Friday, July 5, 2024

Universal Datalakes: Interoperability with Hudi, Iceberg, and Delta Tables with Unity Catalog

unity

Universal Datalakes: Interoperability with Hudi, Iceberg, and Delta Tables with Unity Catalog

In [1]:
%%configure -f
{
"conf": {
"spark.serializer": "org.apache.spark.serializer.KryoSerializer",
"spark.sql.hive.convertMetastoreParquet": "false",
"spark.sql.catalog.spark_catalog": "org.apache.spark.sql.hudi.catalog.HoodieCatalog",
"spark.sql.legacy.pathOptionBehavior.enabled": "true",
"spark.sql.extensions": "org.apache.spark.sql.hudi.HoodieSparkSessionExtension"
}
}
Current session configs: {'conf': {'spark.serializer': 'org.apache.spark.serializer.KryoSerializer', 'spark.sql.hive.convertMetastoreParquet': 'false', 'spark.sql.catalog.spark_catalog': 'org.apache.spark.sql.hudi.catalog.HoodieCatalog', 'spark.sql.legacy.pathOptionBehavior.enabled': 'true', 'spark.sql.extensions': 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension'}, 'kind': 'pyspark'}
No active sessions.
In [13]:
print("done:")
done:

Creating Sample Table

In [14]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *

# Initialize Spark session
spark = SparkSession.builder \
    .appName("HudiExample") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.sql.hive.convertMetastoreParquet", "false") \
    .getOrCreate()

# Initialize the bucket
table_name = "people"
base_path = "s3://soumilshah-dev-1995/hudi-dataset"

# Define the records
records = [
   (1, 'John', 25, 'NYC', '2023-09-28 00:00:00'),
   (2, 'Emily', 30, 'SFO', '2023-09-28 00:00:00'),
   (3, 'Michael', 35, 'ORD', '2023-09-28 00:00:00'),
   (4, 'Andrew', 40, 'NYC', '2023-10-28 00:00:00'),
   (5, 'Bob', 28, 'SEA', '2023-09-23 00:00:00'),
   (6, 'Charlie', 31, 'DFW', '2023-08-29 00:00:00')
]

# Define the schema
schema = StructType([
   StructField("id", IntegerType(), False),
   StructField("name", StringType(), True),
   StructField("age", IntegerType(), True),
   StructField("city", StringType(), True),
   StructField("create_ts", StringType(), True)
])

# Create a DataFrame
df = spark.createDataFrame(records, schema)

# Define Hudi options
hudi_options = {
   'hoodie.table.name': table_name,
   'hoodie.datasource.write.recordkey.field': 'id',
   'hoodie.datasource.write.precombine.field': 'create_ts',
   'hoodie.datasource.write.partitionpath.field': 'city',
   'hoodie.datasource.write.hive_style_partitioning': 'true'
}

# Write the DataFrame to Hudi
(
   df.write
   .format("hudi")
   .options(**hudi_options)
   .mode("overwrite")  # Use overwrite mode if you want to replace the existing table
   .save(f"{base_path}/{table_name}")
)
In [17]:
print("done")
done

Download these Jar files in Directory

Jar Files

Define my_config.yaml

sourceFormat: HUDI

targetFormats:
  - ICEBERG
  - DELTA
datasets:
  -
    tableBasePath: s3://soumilshah-dev-1995/hudi-dataset/people/
    tableName: people
    partitionSpec: city:VALUE
In [18]:
%%bash 
java -jar  ./utilities-0.1.0-beta1-bundled.jar --dataset ./my_config.yaml
SLF4J: No SLF4J providers were found.
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See https://www.slf4j.org/codes.html#noProviders for further details.
SLF4J: Class path contains SLF4J bindings targeting slf4j-api versions 1.7.x or earlier.
SLF4J: Ignoring binding found at [jar:file:/home/glue_user/workspace/jupyter_workspace/utilities-0.1.0-beta1-bundled.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See https://www.slf4j.org/codes.html#ignoredBindings for an explanation.
2024-07-05 20:42:13 INFO  io.onetable.utilities.RunSync:141 - Running sync for basePath s3://soumilshah-dev-1995/hudi-dataset/people/ for following table formats [ICEBERG, DELTA]
2024-07-05 20:42:17 INFO  io.onetable.client.OneTableClient:264 - No previous OneTable sync for target. Falling back to snapshot sync.
2024-07-05 20:42:17 INFO  io.onetable.client.OneTableClient:264 - No previous OneTable sync for target. Falling back to snapshot sync.
# WARNING: Unable to get Instrumentation. Dynamic Attach failed. You may add this JAR as -javaagent manually, or supply -Djdk.attach.allowAttachSelf
# WARNING: Unable to attach Serviceability Agent. Unable to attach even with module exceptions: [org.openjdk.jol.vm.sa.SASupportException: Sense failed., org.openjdk.jol.vm.sa.SASupportException: Sense failed., org.openjdk.jol.vm.sa.SASupportException: Sense failed.]
2024-07-05 20:42:32 INFO  io.onetable.client.OneTableClient:127 - OneTable Sync is successful for the following formats [ICEBERG, DELTA]

Read as Delta Iceberg and Hudi

In [19]:
catalog_name = "glue_catalog"
bucket_name = "soumilshah-dev-1995"
bucket_prefix = "hudi-dataset/people/"
database_name = "icebergdb"
table_name = "people"
warehouse_path = f"s3://{bucket_name}/{bucket_prefix}"
print(warehouse_path)



spark = SparkSession.builder \
    .config(f"spark.sql.catalog.{catalog_name}", "org.apache.iceberg.spark.SparkCatalog") \
    .config(f"spark.sql.catalog.{catalog_name}.warehouse", f"{warehouse_path}") \
    .config(f"spark.sql.catalog.{catalog_name}.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \
    .config(f"spark.sql.catalog.{catalog_name}.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \
    .config("spark.sql.extensions","org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .getOrCreate()
s3://soumilshah-dev-1995/hudi-dataset/people/
In [20]:
spark.read.format("delta").load(warehouse_path).createOrReplaceTempView("delta_snapshot")
query = f"SELECT _hoodie_commit_time,id   FROM delta_snapshot"
spark.sql(query).show(truncate=False)

spark.read.format("hudi").load(warehouse_path).createOrReplaceTempView("hudi_snapshot")
query = f"SELECT _hoodie_commit_time,id  FROM hudi_snapshot "
spark.sql(query).show(truncate=False)
+-------------------+---+
|_hoodie_commit_time|id |
+-------------------+---+
|20240705203957517  |1  |
|20240705203957517  |4  |
|20240705203957517  |6  |
|20240705203957517  |3  |
|20240705203957517  |2  |
|20240705203957517  |5  |
+-------------------+---+

+-------------------+---+
|_hoodie_commit_time|id |
+-------------------+---+
|20240705203957517  |1  |
|20240705203957517  |4  |
|20240705203957517  |6  |
|20240705203957517  |3  |
|20240705203957517  |2  |
|20240705203957517  |5  |
+-------------------+---+

Unity Catalog


bin/uc catalog create --name unity --comment "Test catalog"

bin/uc schema create --catalog unity --name default --comment "Default schema"



bin/uc table create \
    --full_name unity.default.hudi_table \
    --columns "id INT, name STRING, age INT, city STRING, create_ts STRING, _hoodie_commit_time STRING, _hoodie_commit_seqno STRING, _hoodie_record_key STRING, _hoodie_partition_path STRING, _hoodie_file_name STRING" \
    --storage_location s3://XXX/hudi-dataset/people \
    --format DELTA


bin/uc table read -full_name unity.default.hudi_table


bin/uc table delete --full_name unity.default.hudi_table

No comments:

Post a Comment

How to Use Publish-Audit-Merge Workflow in Apache Iceberg: A Beginner’s Guide

publish How to Use Publish-Audit-Merge Workflow in Apache Iceberg: A Beginner’s Guide ¶ In [24]: from ...