Saturday, June 22, 2024

Universal Datalakes: Interoperability with Hudi, Iceberg, and Delta Tables with AWS Glue Notebooks

Untitled1

Universal Datalakes: Interoperability with Hudi, Iceberg, and Delta Tables with AWS Glue Notebooks

image.png

Creating Sample Hudi Tables

In [8]:
%%configure -f
{
"conf": {
"spark.serializer": "org.apache.spark.serializer.KryoSerializer",
"spark.sql.hive.convertMetastoreParquet": "false",
"spark.sql.catalog.spark_catalog": "org.apache.spark.sql.hudi.catalog.HoodieCatalog",
"spark.sql.legacy.pathOptionBehavior.enabled": "true",
"spark.sql.extensions": "org.apache.spark.sql.hudi.HoodieSparkSessionExtension"
}
}
Starting Spark application
IDYARN Application IDKindStateSpark UIDriver logUserCurrent session?
5NonepysparkidleNone
SparkSession available as 'spark'.
Current session configs: {'conf': {'spark.serializer': 'org.apache.spark.serializer.KryoSerializer', 'spark.sql.hive.convertMetastoreParquet': 'false', 'spark.sql.catalog.spark_catalog': 'org.apache.spark.sql.hudi.catalog.HoodieCatalog', 'spark.sql.legacy.pathOptionBehavior.enabled': 'true', 'spark.sql.extensions': 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension'}, 'kind': 'pyspark'}
IDYARN Application IDKindStateSpark UIDriver logUserCurrent session?
5NonepysparkidleNone

Creating Sample Table

In [15]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *

# Initialize Spark session
spark = SparkSession.builder \
    .appName("HudiExample") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.sql.hive.convertMetastoreParquet", "false") \
    .getOrCreate()

# Initialize the bucket
table_name = "people"
base_path = "s3://soumilshah-dev-1995/hudi-dataset"

# Define the records
records = [
   (1, 'John', 25, 'NYC', '2023-09-28 00:00:00'),
   (2, 'Emily', 30, 'SFO', '2023-09-28 00:00:00'),
   (3, 'Michael', 35, 'ORD', '2023-09-28 00:00:00'),
   (4, 'Andrew', 40, 'NYC', '2023-10-28 00:00:00'),
   (5, 'Bob', 28, 'SEA', '2023-09-23 00:00:00'),
   (6, 'Charlie', 31, 'DFW', '2023-08-29 00:00:00')
]

# Define the schema
schema = StructType([
   StructField("id", IntegerType(), False),
   StructField("name", StringType(), True),
   StructField("age", IntegerType(), True),
   StructField("city", StringType(), True),
   StructField("create_ts", StringType(), True)
])

# Create a DataFrame
df = spark.createDataFrame(records, schema)

# Define Hudi options
hudi_options = {
   'hoodie.table.name': table_name,
   'hoodie.datasource.write.recordkey.field': 'id',
   'hoodie.datasource.write.precombine.field': 'create_ts',
   'hoodie.datasource.write.partitionpath.field': 'city',
   'hoodie.datasource.write.hive_style_partitioning': 'true'
}

# Write the DataFrame to Hudi
(
   df.write
   .format("hudi")
   .options(**hudi_options)
   .mode("overwrite")  # Use overwrite mode if you want to replace the existing table
   .save(f"{base_path}/{table_name}")
)

Bulding Universal Datalakes


Download these Jar files in Directory


Jar Files

* iceberg-aws-1.3.1.jar  https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-aws/1.3.1/iceberg-aws-1.3.1.jar


* bundle-2.23.9.jar https://mvnrepository.com/artifact/software.amazon.awssdk/bundle/2.23.9


* utilities-0.1.0-beta1-bundled.jar  https://github.com/apache/incubator-xtable/packages/1986830

Creating a Glue Database called icebergdb


aws glue create-database --database-input "{\"Name\":\"icebergdb\"}"

Define catlog.yaml


catalogImpl: org.apache.iceberg.aws.glue.GlueCatalog
catalogName: onetable
catalogOptions:
    io-impl: org.apache.iceberg.aws.s3.S3FileIO
    warehouse: s3://soumilshah-dev-1995/warehouse

Define my_config.yaml


sourceFormat: HUDI

targetFormats:
  - ICEBERG
  - DELTA
datasets:
  -
    tableBasePath: s3://soumilshah-dev-1995/hudi-dataset/people/
    tableName: people
    partitionSpec: city:VALUE
    namespace: icebergdb

Running OneTable Sync Command

Just Write metadata

In [16]:
%%bash 
java -jar  ./utilities-0.1.0-beta1-bundled.jar --dataset ./my_config.yaml
SLF4J: No SLF4J providers were found.
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See https://www.slf4j.org/codes.html#noProviders for further details.
SLF4J: Class path contains SLF4J bindings targeting slf4j-api versions 1.7.x or earlier.
SLF4J: Ignoring binding found at [jar:file:/home/glue_user/workspace/jupyter_workspace/utilities-0.1.0-beta1-bundled.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See https://www.slf4j.org/codes.html#ignoredBindings for an explanation.
2024-06-22 14:54:16 INFO  io.onetable.utilities.RunSync:141 - Running sync for basePath s3://soumilshah-dev-1995/hudi-dataset/people/ for following table formats [ICEBERG, DELTA]
2024-06-22 14:54:25 INFO  io.onetable.client.OneTableClient:127 - OneTable Sync is successful for the following formats [ICEBERG, DELTA]

Write metadata with Sync Glue Catlog

In [17]:
%%bash 
java -cp "utilities-0.1.0-beta1-bundled.jar:iceberg-aws-1.3.1.jar:bundle-2.23.9.jar" io.onetable.utilities.RunSync --datasetConfig my_config.yaml --icebergCatalogConfig catalog.yaml
SLF4J: No SLF4J providers were found.
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See https://www.slf4j.org/codes.html#noProviders for further details.
SLF4J: Class path contains SLF4J bindings targeting slf4j-api versions 1.7.x or earlier.
SLF4J: Ignoring binding found at [jar:file:/home/glue_user/workspace/jupyter_workspace/utilities-0.1.0-beta1-bundled.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See https://www.slf4j.org/codes.html#ignoredBindings for an explanation.
2024-06-22 14:54:25 INFO  io.onetable.utilities.RunSync:141 - Running sync for basePath s3://soumilshah-dev-1995/hudi-dataset/people/ for following table formats [ICEBERG, DELTA]
2024-06-22 14:54:33 INFO  io.onetable.client.OneTableClient:264 - No previous OneTable sync for target. Falling back to snapshot sync.
# WARNING: Unable to get Instrumentation. Dynamic Attach failed. You may add this JAR as -javaagent manually, or supply -Djdk.attach.allowAttachSelf
# WARNING: Unable to attach Serviceability Agent. Unable to attach even with module exceptions: [org.openjdk.jol.vm.sa.SASupportException: Sense failed., org.openjdk.jol.vm.sa.SASupportException: Sense failed., org.openjdk.jol.vm.sa.SASupportException: Sense failed.]
2024-06-22 14:54:39 INFO  io.onetable.client.OneTableClient:127 - OneTable Sync is successful for the following formats [ICEBERG, DELTA]

Create Spark Session

In [25]:
catalog_name = "glue_catalog"
bucket_name = "soumilshah-dev-1995"
bucket_prefix = "hudi-dataset/people/"
database_name = "icebergdb"
table_name = "people"
warehouse_path = f"s3://{bucket_name}/{bucket_prefix}"
print(warehouse_path)



spark = SparkSession.builder \
    .config(f"spark.sql.catalog.{catalog_name}", "org.apache.iceberg.spark.SparkCatalog") \
    .config(f"spark.sql.catalog.{catalog_name}.warehouse", f"{warehouse_path}") \
    .config(f"spark.sql.catalog.{catalog_name}.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \
    .config(f"spark.sql.catalog.{catalog_name}.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \
    .config("spark.sql.extensions","org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .getOrCreate()
s3://soumilshah-dev-1995/hudi-dataset/people/

Read Universal Tables as iceberg | Delta | Hudi

Read as Iceberg

In [26]:
spark.table(f"{catalog_name}.{database_name}.{table_name}").createOrReplaceTempView("iceberg_snapshot")
query = f"SELECT _hoodie_commit_time,id FROM iceberg_snapshot "
spark.sql(query).show(truncate=False)
+-------------------+---+
|_hoodie_commit_time|id |
+-------------------+---+
|20240622145110517  |2  |
|20240622145110517  |3  |
|20240622145110517  |6  |
|20240622145110517  |5  |
|20240622145110517  |1  |
|20240622145110517  |4  |
+-------------------+---+

Read as Hudi Tables

In [27]:
spark.read.format("hudi").load(warehouse_path).createOrReplaceTempView("hudi_snapshot")
query = f"SELECT _hoodie_commit_time,id  FROM hudi_snapshot "
spark.sql(query).show(truncate=False)
+-------------------+---+
|_hoodie_commit_time|id |
+-------------------+---+
|20240622145110517  |1  |
|20240622145110517  |4  |
|20240622145110517  |6  |
|20240622145110517  |3  |
|20240622145110517  |2  |
|20240622145110517  |5  |
+-------------------+---+

Read as Delta Tables

In [28]:
spark.read.format("delta").load(warehouse_path).createOrReplaceTempView("delta_snapshot")
query = f"SELECT _hoodie_commit_time,id  FROM delta_snapshot "
spark.sql(query).show(truncate=False)
+-------------------+---+
|_hoodie_commit_time|id |
+-------------------+---+
|20240622145110517  |1  |
|20240622145110517  |4  |
|20240622145110517  |6  |
|20240622145110517  |3  |
|20240622145110517  |2  |
|20240622145110517  |5  |
+-------------------+---+

No comments:

Post a Comment

How to Use Publish-Audit-Merge Workflow in Apache Iceberg: A Beginner’s Guide

publish How to Use Publish-Audit-Merge Workflow in Apache Iceberg: A Beginner’s Guide ¶ In [24]: from ...