Universal Data Lakes: Interoperability Across Hudi, Iceberg, and Delta Tables with Unity Catalog¶
In [1]:
%%configure -f
{
    "conf": {
        "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
        "spark.sql.hive.convertMetastoreParquet": "false",
        "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.hudi.catalog.HoodieCatalog",
        "spark.sql.legacy.pathOptionBehavior.enabled": "true",
        "spark.sql.extensions": "org.apache.spark.sql.hudi.HoodieSparkSessionExtension"
    }
}
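This %%configure cell assumes an interactive Spark session (for example, an AWS Glue or EMR notebook) that already has the Hudi Spark bundle on its classpath; it wires in the Hudi catalog and session extension and disables the Parquet metastore conversion so tables are read through Hudi's own code path.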
In [13]:
print("done")
Creating a Sample Hudi Table¶
In [14]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *

# Initialize Spark session
spark = SparkSession.builder \
    .appName("HudiExample") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.sql.hive.convertMetastoreParquet", "false") \
    .getOrCreate()

# Define the table name and S3 base path
table_name = "people"
base_path = "s3://soumilshah-dev-1995/hudi-dataset"

# Define the records
records = [
    (1, 'John', 25, 'NYC', '2023-09-28 00:00:00'),
    (2, 'Emily', 30, 'SFO', '2023-09-28 00:00:00'),
    (3, 'Michael', 35, 'ORD', '2023-09-28 00:00:00'),
    (4, 'Andrew', 40, 'NYC', '2023-10-28 00:00:00'),
    (5, 'Bob', 28, 'SEA', '2023-09-23 00:00:00'),
    (6, 'Charlie', 31, 'DFW', '2023-08-29 00:00:00')
]

# Define the schema
schema = StructType([
    StructField("id", IntegerType(), False),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("city", StringType(), True),
    StructField("create_ts", StringType(), True)
])

# Create a DataFrame
df = spark.createDataFrame(records, schema)

# Define Hudi write options: record key, precombine field, and partitioning
hudi_options = {
    'hoodie.table.name': table_name,
    'hoodie.datasource.write.recordkey.field': 'id',
    'hoodie.datasource.write.precombine.field': 'create_ts',
    'hoodie.datasource.write.partitionpath.field': 'city',
    'hoodie.datasource.write.hive_style_partitioning': 'true'
}

# Write the DataFrame to Hudi
(
    df.write
    .format("hudi")
    .options(**hudi_options)
    .mode("overwrite")  # Overwrite mode replaces the existing table
    .save(f"{base_path}/{table_name}")
)
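To verify the write, the table can be read straight back from its base path; a minimal sanity check using the variables defined above:
In [ ]:
# Read the Hudi table back from the base path to confirm the write
people_df = spark.read.format("hudi").load(f"{base_path}/{table_name}")
people_df.select("_hoodie_commit_time", "id", "name", "city").show(truncate=False)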
In [17]:
print("done")
Download This JAR File into Your Working Directory¶
JAR file
- utilities-0.1.0-beta1-bundled.jar: https://github.com/apache/incubator-xtable/packages/1986830
Define my_config.yaml
sourceFormat: HUDI
targetFormats:
  - ICEBERG
  - DELTA
datasets:
  -
    tableBasePath: s3://soumilshah-dev-1995/hudi-dataset/people/
    tableName: people
    partitionSpec: city:VALUE
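The partitionSpec entry tells XTable that city is the source table's partition field and that its raw value is used as the partition value (the VALUE transform); XTable also supports time-based transforms such as YEAR, MONTH, and DAY for timestamp partitions.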
In [18]:
%%bash
java -jar ./utilities-0.1.0-beta1-bundled.jar --dataset ./my_config.yaml
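This single command reads the Hudi timeline under tableBasePath and writes Iceberg and Delta metadata (a metadata/ directory and a _delta_log/) alongside the existing Parquet files, so all three formats share one copy of the data.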
Read as Delta, Iceberg, and Hudi¶
In [19]:
catalog_name = "glue_catalog"
bucket_name = "soumilshah-dev-1995"
bucket_prefix = "hudi-dataset/people/"
database_name = "icebergdb"
table_name = "people"
warehouse_path = f"s3://{bucket_name}/{bucket_prefix}"
print(warehouse_path)

# Configure an Iceberg catalog backed by AWS Glue
spark = SparkSession.builder \
    .config(f"spark.sql.catalog.{catalog_name}", "org.apache.iceberg.spark.SparkCatalog") \
    .config(f"spark.sql.catalog.{catalog_name}.warehouse", warehouse_path) \
    .config(f"spark.sql.catalog.{catalog_name}.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \
    .config(f"spark.sql.catalog.{catalog_name}.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .getOrCreate()
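Note that getOrCreate() returns the session already running in this notebook, so settings added here (spark.sql.extensions in particular) may not take effect on an existing session; in a Glue interactive session the dependable place for such options is the %%configure cell at the top.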
In [20]:
# Read the Delta metadata produced by XTable
spark.read.format("delta").load(warehouse_path).createOrReplaceTempView("delta_snapshot")
query = "SELECT _hoodie_commit_time, id FROM delta_snapshot"
spark.sql(query).show(truncate=False)

# Read the same files through the original Hudi metadata
spark.read.format("hudi").load(warehouse_path).createOrReplaceTempView("hudi_snapshot")
query = "SELECT _hoodie_commit_time, id FROM hudi_snapshot"
spark.sql(query).show(truncate=False)
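The Iceberg copy can be queried through the Glue catalog configured above. A minimal sketch, assuming the Iceberg table has been registered in Glue as icebergdb.people (for instance via a catalog sync or a crawler over the metadata XTable wrote):
In [ ]:
# Query the Iceberg metadata through the Glue catalog
# (assumes a Glue registration icebergdb.people; not created in this notebook)
query = f"SELECT _hoodie_commit_time, id FROM {catalog_name}.{database_name}.{table_name}"
spark.sql(query).show(truncate=False)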
Unity Catalog¶
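Because XTable also produced Delta metadata, the same table can be registered with a local Unity Catalog server as an external Delta table. The CLI commands below create a catalog and schema, register the table (the Hudi metadata columns must be declared explicitly, since they are part of the Parquet schema), then read and finally drop it: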
bin/uc catalog create --name unity --comment "Test catalog"

bin/uc schema create --catalog unity --name default --comment "Default schema"

bin/uc table create \
    --full_name unity.default.hudi_table \
    --columns "id INT, name STRING, age INT, city STRING, create_ts STRING, _hoodie_commit_time STRING, _hoodie_commit_seqno STRING, _hoodie_record_key STRING, _hoodie_partition_path STRING, _hoodie_file_name STRING" \
    --storage_location s3://XXX/hudi-dataset/people \
    --format DELTA

bin/uc table read --full_name unity.default.hudi_table

bin/uc table delete --full_name unity.default.hudi_table
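Since the table is registered against an external storage_location, deleting it should only remove the Unity Catalog entry; the underlying files in S3 stay in place.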