Friday, July 5, 2024

Universal Datalakes: Interoperability with Hudi, Iceberg, and Delta Tables with Unity Catalog


In [1]:
%%configure -f
{
    "conf": {
        "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
        "spark.sql.hive.convertMetastoreParquet": "false",
        "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.hudi.catalog.HoodieCatalog",
        "spark.sql.legacy.pathOptionBehavior.enabled": "true",
        "spark.sql.extensions": "org.apache.spark.sql.hudi.HoodieSparkSessionExtension"
    }
}
Current session configs: {'conf': {'spark.serializer': 'org.apache.spark.serializer.KryoSerializer', 'spark.sql.hive.convertMetastoreParquet': 'false', 'spark.sql.catalog.spark_catalog': 'org.apache.spark.sql.hudi.catalog.HoodieCatalog', 'spark.sql.legacy.pathOptionBehavior.enabled': 'true', 'spark.sql.extensions': 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension'}, 'kind': 'pyspark'}
No active sessions.
In [13]:
print("done:")
done:

Creating a Sample Hudi Table

In [14]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *

# Initialize Spark session
spark = SparkSession.builder \
    .appName("HudiExample") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.sql.hive.convertMetastoreParquet", "false") \
    .getOrCreate()

# Table name and S3 base path for the Hudi dataset
table_name = "people"
base_path = "s3://soumilshah-dev-1995/hudi-dataset"

# Define the records
records = [
    (1, 'John', 25, 'NYC', '2023-09-28 00:00:00'),
    (2, 'Emily', 30, 'SFO', '2023-09-28 00:00:00'),
    (3, 'Michael', 35, 'ORD', '2023-09-28 00:00:00'),
    (4, 'Andrew', 40, 'NYC', '2023-10-28 00:00:00'),
    (5, 'Bob', 28, 'SEA', '2023-09-23 00:00:00'),
    (6, 'Charlie', 31, 'DFW', '2023-08-29 00:00:00')
]

# Define the schema
schema = StructType([
    StructField("id", IntegerType(), False),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("city", StringType(), True),
    StructField("create_ts", StringType(), True)
])

# Create a DataFrame
df = spark.createDataFrame(records, schema)

# Define Hudi options
hudi_options = {
    'hoodie.table.name': table_name,
    'hoodie.datasource.write.recordkey.field': 'id',
    'hoodie.datasource.write.precombine.field': 'create_ts',
    'hoodie.datasource.write.partitionpath.field': 'city',
    'hoodie.datasource.write.hive_style_partitioning': 'true'
}

# Write the DataFrame to Hudi
(
   df.write
   .format("hudi")
   .options(**hudi_options)
   .mode("overwrite")  # Use overwrite mode if you want to replace the existing table
   .save(f"{base_path}/{table_name}")
)
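A quick read-back confirms the write landed before anything is converted. A minimal sketch, using only the base_path and table_name defined above:

# Read the freshly written Hudi table back and inspect a few columns
hudi_df = spark.read.format("hudi").load(f"{base_path}/{table_name}")
hudi_df.select("id", "name", "city", "create_ts").orderBy("id").show(truncate=False)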
In [17]:
print("done")
done

Download the JAR Files into Your Working Directory

The sync step below expects utilities-0.1.0-beta1-bundled.jar (the bundled OneTable utilities JAR; the project is now known as Apache XTable) to be present in the notebook's working directory.

Define my_config.yaml

sourceFormat: HUDI
targetFormats:
  - ICEBERG
  - DELTA
datasets:
  - tableBasePath: s3://soumilshah-dev-1995/hudi-dataset/people/
    tableName: people
    partitionSpec: city:VALUE
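If you would rather keep everything in the notebook, the same config can be written from Python before the sync step runs. A small sketch, assuming PyYAML is available in the session:

import yaml

# Mirror my_config.yaml so the java -jar step below can pick it up
sync_config = {
    "sourceFormat": "HUDI",
    "targetFormats": ["ICEBERG", "DELTA"],
    "datasets": [
        {
            "tableBasePath": "s3://soumilshah-dev-1995/hudi-dataset/people/",
            "tableName": "people",
            "partitionSpec": "city:VALUE",
        }
    ],
}

with open("my_config.yaml", "w") as f:
    yaml.safe_dump(sync_config, f, sort_keys=False)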
In [18]:
%%bash 
java -jar ./utilities-0.1.0-beta1-bundled.jar --dataset ./my_config.yaml
SLF4J: No SLF4J providers were found.
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See https://www.slf4j.org/codes.html#noProviders for further details.
SLF4J: Class path contains SLF4J bindings targeting slf4j-api versions 1.7.x or earlier.
SLF4J: Ignoring binding found at [jar:file:/home/glue_user/workspace/jupyter_workspace/utilities-0.1.0-beta1-bundled.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See https://www.slf4j.org/codes.html#ignoredBindings for an explanation.
2024-07-05 20:42:13 INFO  io.onetable.utilities.RunSync:141 - Running sync for basePath s3://soumilshah-dev-1995/hudi-dataset/people/ for following table formats [ICEBERG, DELTA]
2024-07-05 20:42:17 INFO  io.onetable.client.OneTableClient:264 - No previous OneTable sync for target. Falling back to snapshot sync.
2024-07-05 20:42:17 INFO  io.onetable.client.OneTableClient:264 - No previous OneTable sync for target. Falling back to snapshot sync.
# WARNING: Unable to get Instrumentation. Dynamic Attach failed. You may add this JAR as -javaagent manually, or supply -Djdk.attach.allowAttachSelf
# WARNING: Unable to attach Serviceability Agent. Unable to attach even with module exceptions: [org.openjdk.jol.vm.sa.SASupportException: Sense failed., org.openjdk.jol.vm.sa.SASupportException: Sense failed., org.openjdk.jol.vm.sa.SASupportException: Sense failed.]
2024-07-05 20:42:32 INFO  io.onetable.client.OneTableClient:127 - OneTable Sync is successful for the following formats [ICEBERG, DELTA]
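Before reading anything back, it is worth confirming what the sync actually wrote: after a successful run the table's base path should contain the original Hudi .hoodie/ directory plus an Iceberg metadata/ directory and a Delta _delta_log/ directory. A quick sketch with boto3, assuming the session's IAM role can list the bucket:

import boto3

# List the top-level prefixes under the table's base path
s3 = boto3.client("s3")
resp = s3.list_objects_v2(
    Bucket="soumilshah-dev-1995",
    Prefix="hudi-dataset/people/",
    Delimiter="/",
)

# Expect .hoodie/ (Hudi), metadata/ (Iceberg), and _delta_log/ (Delta)
# alongside the city=... partition directories
for prefix in resp.get("CommonPrefixes", []):
    print(prefix["Prefix"])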

Read as Delta, Iceberg, and Hudi

In [19]:
catalog_name = "glue_catalog"
bucket_name = "soumilshah-dev-1995"
bucket_prefix = "hudi-dataset/people/"
database_name = "icebergdb"
table_name = "people"
warehouse_path = f"s3://{bucket_name}/{bucket_prefix}"
print(warehouse_path)

spark = SparkSession.builder \
    .config(f"spark.sql.catalog.{catalog_name}", "org.apache.iceberg.spark.SparkCatalog") \
    .config(f"spark.sql.catalog.{catalog_name}.warehouse", f"{warehouse_path}") \
    .config(f"spark.sql.catalog.{catalog_name}.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \
    .config(f"spark.sql.catalog.{catalog_name}.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \
    .config("spark.sql.extensions","org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .getOrCreate()
s3://soumilshah-dev-1995/hudi-dataset/people/
In [20]:
spark.read.format("delta").load(warehouse_path).createOrReplaceTempView("delta_snapshot")
query = f"SELECT _hoodie_commit_time,id   FROM delta_snapshot"
spark.sql(query).show(truncate=False)

spark.read.format("hudi").load(warehouse_path).createOrReplaceTempView("hudi_snapshot")
query = f"SELECT _hoodie_commit_time,id  FROM hudi_snapshot "
spark.sql(query).show(truncate=False)
+-------------------+---+
|_hoodie_commit_time|id |
+-------------------+---+
|20240705203957517  |1  |
|20240705203957517  |4  |
|20240705203957517  |6  |
|20240705203957517  |3  |
|20240705203957517  |2  |
|20240705203957517  |5  |
+-------------------+---+

+-------------------+---+
|_hoodie_commit_time|id |
+-------------------+---+
|20240705203957517  |1  |
|20240705203957517  |4  |
|20240705203957517  |6  |
|20240705203957517  |3  |
|20240705203957517  |2  |
|20240705203957517  |5  |
+-------------------+---+
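The two reads above cover Delta and Hudi. To complete the trio, the Iceberg copy can be read through the Glue catalog configured earlier. A sketch, assuming the synced table is registered in Glue as icebergdb.people (XTable can sync the catalog for you, or you can register the table manually):

# Read the same underlying files as an Iceberg table via the Glue catalog
iceberg_df = spark.table(f"{catalog_name}.{database_name}.{table_name}")
iceberg_df.createOrReplaceTempView("iceberg_snapshot")
spark.sql("SELECT id, name, city FROM iceberg_snapshot").show(truncate=False)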

Unity Catalog


With the table now readable as Delta, you can register it with the open-source Unity Catalog server using its uc CLI. First create a catalog and a schema:

bin/uc catalog create --name unity --comment "Test catalog"

bin/uc schema create --catalog unity --name default --comment "Default schema"



Then create the table entry, pointing at the Delta copy's storage location and including Hudi's meta columns in the schema:

bin/uc table create \
    --full_name unity.default.hudi_table \
    --columns "id INT, name STRING, age INT, city STRING, create_ts STRING, _hoodie_commit_time STRING, _hoodie_commit_seqno STRING, _hoodie_record_key STRING, _hoodie_partition_path STRING, _hoodie_file_name STRING" \
    --storage_location s3://XXX/hudi-dataset/people \
    --format DELTA


bin/uc table read --full_name unity.default.hudi_table


bin/uc table delete --full_name unity.default.hudi_table
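The same checks can be scripted against the Unity Catalog server's REST API. A minimal sketch with Python's requests, assuming a locally running open-source Unity Catalog server on its default port 8080 (the endpoint path below is an assumption based on that default setup):

import requests

# Assumed default endpoint of a locally running open-source Unity Catalog server
BASE = "http://localhost:8080/api/2.1/unity-catalog"

# List tables in unity.default and print each table's name and storage location
resp = requests.get(
    f"{BASE}/tables",
    params={"catalog_name": "unity", "schema_name": "default"},
)
resp.raise_for_status()
for table in resp.json().get("tables", []):
    print(table["name"], table.get("storage_location"))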
