Saturday, June 22, 2024

Universal Datalakes: Interoperability with Hudi, Iceberg, and Delta Tables with AWS Glue Notebooks


Creating Sample Hudi Tables

In [8]:
%%configure -f
{
"conf": {
"spark.serializer": "org.apache.spark.serializer.KryoSerializer",
"spark.sql.hive.convertMetastoreParquet": "false",
"spark.sql.catalog.spark_catalog": "org.apache.spark.sql.hudi.catalog.HoodieCatalog",
"spark.sql.legacy.pathOptionBehavior.enabled": "true",
"spark.sql.extensions": "org.apache.spark.sql.hudi.HoodieSparkSessionExtension"
}
}
Starting Spark application
Session ID: 5 | YARN Application ID: None | Kind: pyspark | State: idle
SparkSession available as 'spark'.
Current session configs: {'conf': {'spark.serializer': 'org.apache.spark.serializer.KryoSerializer', 'spark.sql.hive.convertMetastoreParquet': 'false', 'spark.sql.catalog.spark_catalog': 'org.apache.spark.sql.hudi.catalog.HoodieCatalog', 'spark.sql.legacy.pathOptionBehavior.enabled': 'true', 'spark.sql.extensions': 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension'}, 'kind': 'pyspark'}

Creating Sample Table

In [15]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *

# Initialize Spark session
spark = SparkSession.builder \
    .appName("HudiExample") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.sql.hive.convertMetastoreParquet", "false") \
    .getOrCreate()

# Table name and S3 base path for the Hudi table
table_name = "people"
base_path = "s3://soumilshah-dev-1995/hudi-dataset"

# Define the records
records = [
   (1, 'John', 25, 'NYC', '2023-09-28 00:00:00'),
   (2, 'Emily', 30, 'SFO', '2023-09-28 00:00:00'),
   (3, 'Michael', 35, 'ORD', '2023-09-28 00:00:00'),
   (4, 'Andrew', 40, 'NYC', '2023-10-28 00:00:00'),
   (5, 'Bob', 28, 'SEA', '2023-09-23 00:00:00'),
   (6, 'Charlie', 31, 'DFW', '2023-08-29 00:00:00')
]

# Define the schema
schema = StructType([
   StructField("id", IntegerType(), False),
   StructField("name", StringType(), True),
   StructField("age", IntegerType(), True),
   StructField("city", StringType(), True),
   StructField("create_ts", StringType(), True)
])

# Create a DataFrame
df = spark.createDataFrame(records, schema)

# Define Hudi options
hudi_options = {
   'hoodie.table.name': table_name,
   'hoodie.datasource.write.recordkey.field': 'id',
   'hoodie.datasource.write.precombine.field': 'create_ts',
   'hoodie.datasource.write.partitionpath.field': 'city',
   'hoodie.datasource.write.hive_style_partitioning': 'true'
}

# Write the DataFrame to Hudi
(
   df.write
   .format("hudi")
   .options(**hudi_options)
   .mode("overwrite")  # Use overwrite mode if you want to replace the existing table
   .save(f"{base_path}/{table_name}")
)
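To sanity-check the write, you can read the table straight back from S3. A minimal sketch reusing the variables defined above:

# Read the Hudi table back to verify the write; Hudi adds metadata
# columns such as _hoodie_commit_time alongside the data columns
people_df = spark.read.format("hudi").load(f"{base_path}/{table_name}")
people_df.select("_hoodie_commit_time", "id", "name", "city").show(truncate=False)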

Building Universal Data Lakes


Download these JAR files into your working directory; a download sketch follows the list.


Jar Files

* iceberg-aws-1.3.1.jar  https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-aws/1.3.1/iceberg-aws-1.3.1.jar


* bundle-2.23.9.jar https://mvnrepository.com/artifact/software.amazon.awssdk/bundle/2.23.9


* utilities-0.1.0-beta1-bundled.jar  https://github.com/apache/incubator-xtable/packages/1986830
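If you want to fetch the Maven-hosted JARs from the notebook itself, a minimal sketch with urllib is below. The direct URL for the AWS SDK bundle is an assumption based on the standard Maven Central layout (the link above points to the mvnrepository page), and the XTable utilities bundle lives on GitHub Packages, which typically requires authentication, so download that one manually.

import urllib.request

# Direct-download URLs; the bundle URL is inferred from the standard
# Maven Central directory layout (assumption)
jars = {
    "iceberg-aws-1.3.1.jar": "https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-aws/1.3.1/iceberg-aws-1.3.1.jar",
    "bundle-2.23.9.jar": "https://repo1.maven.org/maven2/software/amazon/awssdk/bundle/2.23.9/bundle-2.23.9.jar",
}

for name, url in jars.items():
    urllib.request.urlretrieve(url, name)  # saves into the current directory
    print(f"downloaded {name}")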

Creating a Glue Database called icebergdb


aws glue create-database --database-input "{\"Name\":\"icebergdb\"}"
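If you prefer to stay inside the notebook, the equivalent call with boto3 (assuming the notebook role has glue:CreateDatabase permission):

import boto3

# Equivalent of the CLI command above
glue = boto3.client("glue")
glue.create_database(DatabaseInput={"Name": "icebergdb"})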

Define catalog.yaml


catalogImpl: org.apache.iceberg.aws.glue.GlueCatalog
catalogName: onetable
catalogOptions:
    io-impl: org.apache.iceberg.aws.s3.S3FileIO
    warehouse: s3://soumilshah-dev-1995/warehouse

Define my_config.yaml


sourceFormat: HUDI
targetFormats:
  - ICEBERG
  - DELTA
datasets:
  - tableBasePath: s3://soumilshah-dev-1995/hudi-dataset/people/
    tableName: people
    partitionSpec: city:VALUE
    namespace: icebergdb
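If you sync many tables, it can be easier to generate this file from Python than to hand-edit YAML. A minimal sketch using PyYAML (assumed available in the environment):

import yaml

# Mirrors my_config.yaml above; append more entries to "datasets" as needed
config = {
    "sourceFormat": "HUDI",
    "targetFormats": ["ICEBERG", "DELTA"],
    "datasets": [
        {
            "tableBasePath": "s3://soumilshah-dev-1995/hudi-dataset/people/",
            "tableName": "people",
            "partitionSpec": "city:VALUE",
            "namespace": "icebergdb",
        }
    ],
}

with open("my_config.yaml", "w") as f:
    yaml.safe_dump(config, f, sort_keys=False)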

Running the OneTable Sync Command

Write metadata only

In [16]:
%%bash 
java -jar ./utilities-0.1.0-beta1-bundled.jar --dataset ./my_config.yaml
SLF4J: No SLF4J providers were found.
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See https://www.slf4j.org/codes.html#noProviders for further details.
SLF4J: Class path contains SLF4J bindings targeting slf4j-api versions 1.7.x or earlier.
SLF4J: Ignoring binding found at [jar:file:/home/glue_user/workspace/jupyter_workspace/utilities-0.1.0-beta1-bundled.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See https://www.slf4j.org/codes.html#ignoredBindings for an explanation.
2024-06-22 14:54:16 INFO  io.onetable.utilities.RunSync:141 - Running sync for basePath s3://soumilshah-dev-1995/hudi-dataset/people/ for following table formats [ICEBERG, DELTA]
2024-06-22 14:54:25 INFO  io.onetable.client.OneTableClient:127 - OneTable Sync is successful for the following formats [ICEBERG, DELTA]
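After this sync, the table directory on S3 should hold metadata for all three formats side by side: .hoodie/ for Hudi, metadata/ for Iceberg, and _delta_log/ for Delta, with the Parquet data files shared among them. A quick boto3 sketch to confirm:

import boto3

# List top-level prefixes under the table path; expect .hoodie/,
# metadata/, _delta_log/, and the city=... partition directories
s3 = boto3.client("s3")
resp = s3.list_objects_v2(
    Bucket="soumilshah-dev-1995",
    Prefix="hudi-dataset/people/",
    Delimiter="/",
)
for p in resp.get("CommonPrefixes", []):
    print(p["Prefix"])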

Write metadata and sync to the Glue Data Catalog

In [17]:
%%bash 
java -cp "utilities-0.1.0-beta1-bundled.jar:iceberg-aws-1.3.1.jar:bundle-2.23.9.jar" io.onetable.utilities.RunSync --datasetConfig my_config.yaml --icebergCatalogConfig catalog.yaml
SLF4J: No SLF4J providers were found.
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See https://www.slf4j.org/codes.html#noProviders for further details.
SLF4J: Class path contains SLF4J bindings targeting slf4j-api versions 1.7.x or earlier.
SLF4J: Ignoring binding found at [jar:file:/home/glue_user/workspace/jupyter_workspace/utilities-0.1.0-beta1-bundled.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See https://www.slf4j.org/codes.html#ignoredBindings for an explanation.
2024-06-22 14:54:25 INFO  io.onetable.utilities.RunSync:141 - Running sync for basePath s3://soumilshah-dev-1995/hudi-dataset/people/ for following table formats [ICEBERG, DELTA]
2024-06-22 14:54:33 INFO  io.onetable.client.OneTableClient:264 - No previous OneTable sync for target. Falling back to snapshot sync.
# WARNING: Unable to get Instrumentation. Dynamic Attach failed. You may add this JAR as -javaagent manually, or supply -Djdk.attach.allowAttachSelf
# WARNING: Unable to attach Serviceability Agent. Unable to attach even with module exceptions: [org.openjdk.jol.vm.sa.SASupportException: Sense failed., org.openjdk.jol.vm.sa.SASupportException: Sense failed., org.openjdk.jol.vm.sa.SASupportException: Sense failed.]
2024-06-22 14:54:39 INFO  io.onetable.client.OneTableClient:127 - OneTable Sync is successful for the following formats [ICEBERG, DELTA]
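This run also registers the Iceberg table in the Glue Data Catalog. You can confirm the registration with boto3 (a sketch; the table_type and metadata_location parameters are how Glue marks Iceberg tables):

import boto3

# Look up the table that the sync registered in the icebergdb database
glue = boto3.client("glue")
table = glue.get_table(DatabaseName="icebergdb", Name="people")["Table"]
print(table["Parameters"].get("table_type"))         # expected: ICEBERG
print(table["Parameters"].get("metadata_location"))  # latest Iceberg metadata file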

Create Spark Session

In [25]:
catalog_name = "glue_catalog"
bucket_name = "soumilshah-dev-1995"
bucket_prefix = "hudi-dataset/people/"
database_name = "icebergdb"
table_name = "people"
warehouse_path = f"s3://{bucket_name}/{bucket_prefix}"
print(warehouse_path)



spark = SparkSession.builder \
    .config(f"spark.sql.catalog.{catalog_name}", "org.apache.iceberg.spark.SparkCatalog") \
    .config(f"spark.sql.catalog.{catalog_name}.warehouse", f"{warehouse_path}") \
    .config(f"spark.sql.catalog.{catalog_name}.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \
    .config(f"spark.sql.catalog.{catalog_name}.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \
    .config("spark.sql.extensions","org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .getOrCreate()
s3://soumilshah-dev-1995/hudi-dataset/people/

Read Universal Tables as Iceberg | Delta | Hudi

Read as Iceberg

In [26]:
spark.table(f"{catalog_name}.{database_name}.{table_name}").createOrReplaceTempView("iceberg_snapshot")
query = f"SELECT _hoodie_commit_time,id FROM iceberg_snapshot "
spark.sql(query).show(truncate=False)
+-------------------+---+
|_hoodie_commit_time|id |
+-------------------+---+
|20240622145110517  |2  |
|20240622145110517  |3  |
|20240622145110517  |6  |
|20240622145110517  |5  |
|20240622145110517  |1  |
|20240622145110517  |4  |
+-------------------+---+

Read as Hudi

In [27]:
spark.read.format("hudi").load(warehouse_path).createOrReplaceTempView("hudi_snapshot")
query = f"SELECT _hoodie_commit_time,id  FROM hudi_snapshot "
spark.sql(query).show(truncate=False)
+-------------------+---+
|_hoodie_commit_time|id |
+-------------------+---+
|20240622145110517  |1  |
|20240622145110517  |4  |
|20240622145110517  |6  |
|20240622145110517  |3  |
|20240622145110517  |2  |
|20240622145110517  |5  |
+-------------------+---+

Read as Delta

In [28]:
spark.read.format("delta").load(warehouse_path).createOrReplaceTempView("delta_snapshot")
query = f"SELECT _hoodie_commit_time,id  FROM delta_snapshot "
spark.sql(query).show(truncate=False)
+-------------------+---+
|_hoodie_commit_time|id |
+-------------------+---+
|20240622145110517  |1  |
|20240622145110517  |4  |
|20240622145110517  |6  |
|20240622145110517  |3  |
|20240622145110517  |2  |
|20240622145110517  |5  |
+-------------------+---+
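Since all three readers point at the same Parquet files on S3, the row sets should match exactly. A quick consistency check over the temp views registered above:

# Each view reads the same underlying data files, so the counts
# (and contents, up to row order) should be identical
for view in ["iceberg_snapshot", "hudi_snapshot", "delta_snapshot"]:
    print(f"{view}: {spark.table(view).count()} rows")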
