Saturday, June 22, 2024

Universal Datalakes: Interoperability with Hudi, Iceberg, and Delta Tables with AWS Glue Notebooks


Creating Sample Hudi Tables¶

In [8]:
%%configure -f
{
    "conf": {
        "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
        "spark.sql.hive.convertMetastoreParquet": "false",
        "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.hudi.catalog.HoodieCatalog",
        "spark.sql.legacy.pathOptionBehavior.enabled": "true",
        "spark.sql.extensions": "org.apache.spark.sql.hudi.HoodieSparkSessionExtension"
    }
}
Starting Spark application
ID: 5 | YARN Application ID: None | Kind: pyspark | State: idle | Current session: ✔
SparkSession available as 'spark'.
Current session configs: {'conf': {'spark.serializer': 'org.apache.spark.serializer.KryoSerializer', 'spark.sql.hive.convertMetastoreParquet': 'false', 'spark.sql.catalog.spark_catalog': 'org.apache.spark.sql.hudi.catalog.HoodieCatalog', 'spark.sql.legacy.pathOptionBehavior.enabled': 'true', 'spark.sql.extensions': 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension'}, 'kind': 'pyspark'}

Creating Sample Table¶

In [15]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *

# Initialize Spark session
spark = SparkSession.builder \
    .appName("HudiExample") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.sql.hive.convertMetastoreParquet", "false") \
    .getOrCreate()

# Table name and S3 base path for the Hudi dataset
table_name = "people"
base_path = "s3://soumilshah-dev-1995/hudi-dataset"

# Define the records
records = [
   (1, 'John', 25, 'NYC', '2023-09-28 00:00:00'),
   (2, 'Emily', 30, 'SFO', '2023-09-28 00:00:00'),
   (3, 'Michael', 35, 'ORD', '2023-09-28 00:00:00'),
   (4, 'Andrew', 40, 'NYC', '2023-10-28 00:00:00'),
   (5, 'Bob', 28, 'SEA', '2023-09-23 00:00:00'),
   (6, 'Charlie', 31, 'DFW', '2023-08-29 00:00:00')
]

# Define the schema
schema = StructType([
   StructField("id", IntegerType(), False),
   StructField("name", StringType(), True),
   StructField("age", IntegerType(), True),
   StructField("city", StringType(), True),
   StructField("create_ts", StringType(), True)
])

# Create a DataFrame
df = spark.createDataFrame(records, schema)

# Define Hudi options
hudi_options = {
   'hoodie.table.name': table_name,
   'hoodie.datasource.write.recordkey.field': 'id',
   'hoodie.datasource.write.precombine.field': 'create_ts',
   'hoodie.datasource.write.partitionpath.field': 'city',
   'hoodie.datasource.write.hive_style_partitioning': 'true'
}

# Write the DataFrame to Hudi
(
   df.write
   .format("hudi")
   .options(**hudi_options)
   .mode("overwrite")  # Use overwrite mode if you want to replace the existing table
   .save(f"{base_path}/{table_name}")
)
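
To verify the write, you can read the table straight back with the Hudi datasource (a minimal sanity check, reusing the same Spark session and the paths defined above):

# Read the freshly written Hudi table back and inspect a few rows
hudi_df = spark.read.format("hudi").load(f"{base_path}/{table_name}")
hudi_df.select("id", "name", "city", "create_ts").show(truncate=False)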

Building Universal Datalakes¶


Download These JAR Files into Your Working Directory¶


JAR files:

* iceberg-aws-1.3.1.jar: https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-aws/1.3.1/iceberg-aws-1.3.1.jar
* bundle-2.23.9.jar: https://mvnrepository.com/artifact/software.amazon.awssdk/bundle/2.23.9
* utilities-0.1.0-beta1-bundled.jar: https://github.com/apache/incubator-xtable/packages/1986830

Creating a Glue Database called icebergdb¶


aws glue create-database --database-input "{\"Name\":\"icebergdb\"}"
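
To confirm the database exists, you can query it back (assuming the AWS CLI is configured with credentials that can read the Glue Data Catalog):

# Returns the database metadata if creation succeeded
aws glue get-database --name icebergdb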

Define catalog.yaml¶


catalogImpl: org.apache.iceberg.aws.glue.GlueCatalog
catalogName: onetable
catalogOptions:
    io-impl: org.apache.iceberg.aws.s3.S3FileIO
    warehouse: s3://soumilshah-dev-1995/warehouse

Define my_config.yaml¶


sourceFormat: HUDI

targetFormats:
  - ICEBERG
  - DELTA
datasets:
  -
    tableBasePath: s3://soumilshah-dev-1995/hudi-dataset/people/
    tableName: people
    partitionSpec: city:VALUE   # VALUE means identity partitioning on the city column
    namespace: icebergdb

Running OneTable Sync Command¶

Write Metadata Only¶

In [16]:
%%bash 
java -jar  ./utilities-0.1.0-beta1-bundled.jar --dataset ./my_config.yaml
SLF4J: No SLF4J providers were found.
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See https://www.slf4j.org/codes.html#noProviders for further details.
SLF4J: Class path contains SLF4J bindings targeting slf4j-api versions 1.7.x or earlier.
SLF4J: Ignoring binding found at [jar:file:/home/glue_user/workspace/jupyter_workspace/utilities-0.1.0-beta1-bundled.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See https://www.slf4j.org/codes.html#ignoredBindings for an explanation.
2024-06-22 14:54:16 INFO  io.onetable.utilities.RunSync:141 - Running sync for basePath s3://soumilshah-dev-1995/hudi-dataset/people/ for following table formats [ICEBERG, DELTA]
2024-06-22 14:54:25 INFO  io.onetable.client.OneTableClient:127 - OneTable Sync is successful for the following formats [ICEBERG, DELTA]
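
After a successful sync, the table's base path holds metadata for all three formats side by side. Listing the prefix is a quick way to confirm (using the bucket from this example):

# Expect .hoodie/ (Hudi), metadata/ (Iceberg), and _delta_log/ (Delta) alongside the data files
aws s3 ls s3://soumilshah-dev-1995/hudi-dataset/people/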

Write Metadata and Sync to the Glue Catalog¶

In [17]:
%%bash 
java -cp "utilities-0.1.0-beta1-bundled.jar:iceberg-aws-1.3.1.jar:bundle-2.23.9.jar" io.onetable.utilities.RunSync --datasetConfig my_config.yaml --icebergCatalogConfig catalog.yaml
SLF4J: No SLF4J providers were found.
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See https://www.slf4j.org/codes.html#noProviders for further details.
SLF4J: Class path contains SLF4J bindings targeting slf4j-api versions 1.7.x or earlier.
SLF4J: Ignoring binding found at [jar:file:/home/glue_user/workspace/jupyter_workspace/utilities-0.1.0-beta1-bundled.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See https://www.slf4j.org/codes.html#ignoredBindings for an explanation.
2024-06-22 14:54:25 INFO  io.onetable.utilities.RunSync:141 - Running sync for basePath s3://soumilshah-dev-1995/hudi-dataset/people/ for following table formats [ICEBERG, DELTA]
2024-06-22 14:54:33 INFO  io.onetable.client.OneTableClient:264 - No previous OneTable sync for target. Falling back to snapshot sync.
# WARNING: Unable to get Instrumentation. Dynamic Attach failed. You may add this JAR as -javaagent manually, or supply -Djdk.attach.allowAttachSelf
# WARNING: Unable to attach Serviceability Agent. Unable to attach even with module exceptions: [org.openjdk.jol.vm.sa.SASupportException: Sense failed., org.openjdk.jol.vm.sa.SASupportException: Sense failed., org.openjdk.jol.vm.sa.SASupportException: Sense failed.]
2024-06-22 14:54:39 INFO  io.onetable.client.OneTableClient:127 - OneTable Sync is successful for the following formats [ICEBERG, DELTA]
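
Because this run passed --icebergCatalogConfig, the Iceberg table should now be registered in Glue as well. A quick CLI check (assuming the same AWS credentials as above):

# Lists tables in the icebergdb database; 'people' should appear
aws glue get-tables --database-name icebergdb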

Create Spark Session¶

In [25]:
catalog_name = "glue_catalog"
bucket_name = "soumilshah-dev-1995"
bucket_prefix = "hudi-dataset/people/"
database_name = "icebergdb"
table_name = "people"
warehouse_path = f"s3://{bucket_name}/{bucket_prefix}"
print(warehouse_path)

spark = SparkSession.builder \
    .config(f"spark.sql.catalog.{catalog_name}", "org.apache.iceberg.spark.SparkCatalog") \
    .config(f"spark.sql.catalog.{catalog_name}.warehouse", f"{warehouse_path}") \
    .config(f"spark.sql.catalog.{catalog_name}.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \
    .config(f"spark.sql.catalog.{catalog_name}.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \
    .config("spark.sql.extensions","org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .getOrCreate()
s3://soumilshah-dev-1995/hudi-dataset/people/
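
With the Glue-backed Iceberg catalog configured, it is worth confirming the synced table is visible before querying it (a small optional check, reusing the variables defined above):

# 'people' should be listed under the icebergdb namespace
spark.sql(f"SHOW TABLES IN {catalog_name}.{database_name}").show()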

Read Universal Tables as Iceberg | Delta | Hudi¶

Read as Iceberg¶

In [26]:
spark.table(f"{catalog_name}.{database_name}.{table_name}").createOrReplaceTempView("iceberg_snapshot")
query = f"SELECT _hoodie_commit_time,id FROM iceberg_snapshot "
spark.sql(query).show(truncate=False)
+-------------------+---+
|_hoodie_commit_time|id |
+-------------------+---+
|20240622145110517  |2  |
|20240622145110517  |3  |
|20240622145110517  |6  |
|20240622145110517  |5  |
|20240622145110517  |1  |
|20240622145110517  |4  |
+-------------------+---+

Read as Hudi¶

In [27]:
spark.read.format("hudi").load(warehouse_path).createOrReplaceTempView("hudi_snapshot")
query = f"SELECT _hoodie_commit_time,id  FROM hudi_snapshot "
spark.sql(query).show(truncate=False)
+-------------------+---+
|_hoodie_commit_time|id |
+-------------------+---+
|20240622145110517  |1  |
|20240622145110517  |4  |
|20240622145110517  |6  |
|20240622145110517  |3  |
|20240622145110517  |2  |
|20240622145110517  |5  |
+-------------------+---+

Read as Delta¶

In [28]:
spark.read.format("delta").load(warehouse_path).createOrReplaceTempView("delta_snapshot")
query = f"SELECT _hoodie_commit_time,id  FROM delta_snapshot "
spark.sql(query).show(truncate=False)
+-------------------+---+
|_hoodie_commit_time|id |
+-------------------+---+
|20240622145110517  |1  |
|20240622145110517  |4  |
|20240622145110517  |6  |
|20240622145110517  |3  |
|20240622145110517  |2  |
|20240622145110517  |5  |
+-------------------+---+
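
Since all three views point at the same underlying data files, a final cross-check is to compare row counts across them (a small sketch using the temp views registered above):

# Counts should match across the Iceberg, Hudi, and Delta views of the same table
for view in ["iceberg_snapshot", "hudi_snapshot", "delta_snapshot"]:
    print(view, spark.table(view).count())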
