Universal Datalakes: Interoperability Across Hudi, Iceberg, and Delta Tables with AWS Glue Notebooks¶
Creating Sample Hudi Tables¶
In [8]:
%%configure -f
{
"conf": {
"spark.serializer": "org.apache.spark.serializer.KryoSerializer",
"spark.sql.hive.convertMetastoreParquet": "false",
"spark.sql.catalog.spark_catalog": "org.apache.spark.sql.hudi.catalog.HoodieCatalog",
"spark.sql.legacy.pathOptionBehavior.enabled": "true",
"spark.sql.extensions": "org.apache.spark.sql.hudi.HoodieSparkSessionExtension"
}
}
Creating Sample Table¶
In [15]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
# Initialize Spark session
spark = SparkSession.builder \
.appName("HudiExample") \
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
.config("spark.sql.hive.convertMetastoreParquet", "false") \
.getOrCreate()
# Initialize the bucket
table_name = "people"
base_path = "s3://soumilshah-dev-1995/hudi-dataset"
# Define the records
records = [
(1, 'John', 25, 'NYC', '2023-09-28 00:00:00'),
(2, 'Emily', 30, 'SFO', '2023-09-28 00:00:00'),
(3, 'Michael', 35, 'ORD', '2023-09-28 00:00:00'),
(4, 'Andrew', 40, 'NYC', '2023-10-28 00:00:00'),
(5, 'Bob', 28, 'SEA', '2023-09-23 00:00:00'),
(6, 'Charlie', 31, 'DFW', '2023-08-29 00:00:00')
]
# Define the schema
schema = StructType([
StructField("id", IntegerType(), False),
StructField("name", StringType(), True),
StructField("age", IntegerType(), True),
StructField("city", StringType(), True),
StructField("create_ts", StringType(), True)
])
# Create a DataFrame
df = spark.createDataFrame(records, schema)
# Define Hudi options
hudi_options = {
'hoodie.table.name': table_name,
'hoodie.datasource.write.recordkey.field': 'id',
'hoodie.datasource.write.precombine.field': 'create_ts',
'hoodie.datasource.write.partitionpath.field': 'city',
'hoodie.datasource.write.hive_style_partitioning': 'true'
}
# Write the DataFrame to Hudi
(
df.write
.format("hudi")
.options(**hudi_options)
.mode("overwrite") # Use overwrite mode if you want to replace the existing table
.save(f"{base_path}/{table_name}")
)
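Optionally, you can read the table straight back from S3 to confirm the write landed as expected. This quick sanity check is not part of the original flow; it just reuses the same base path and Spark session from the cell above.
# Optional sanity check: read the freshly written Hudi table back from S3
# and inspect a few columns, including the Hudi commit-time meta column.
hudi_df = spark.read.format("hudi").load(f"{base_path}/{table_name}")
hudi_df.select("_hoodie_commit_time", "id", "name", "city").orderBy("id").show(truncate=False)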
Building Universal Datalakes¶
Download these JAR files into your working directory¶
Jar Files
* iceberg-aws-1.3.1.jar https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-aws/1.3.1/iceberg-aws-1.3.1.jar
* bundle-2.23.9.jar https://mvnrepository.com/artifact/software.amazon.awssdk/bundle/2.23.9
* utilities-0.1.0-beta1-bundled.jar https://github.com/apache/incubator-xtable/packages/1986830
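If you prefer to script the downloads, here is a minimal Python sketch. The direct repo1.maven.org path for the AWS SDK bundle is an assumption derived from its Maven coordinates; the XTable utilities bundle is hosted on GitHub Packages, which requires an authenticated download, so grab that one manually from the link above.
import urllib.request

# Hypothetical download helper (not in the original post).
# The bundle-2.23.9.jar URL below is assumed from the artifact coordinates.
jars = {
    "iceberg-aws-1.3.1.jar": "https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-aws/1.3.1/iceberg-aws-1.3.1.jar",
    "bundle-2.23.9.jar": "https://repo1.maven.org/maven2/software/amazon/awssdk/bundle/2.23.9/bundle-2.23.9.jar",
}
for name, url in jars.items():
    urllib.request.urlretrieve(url, name)  # utilities-0.1.0-beta1-bundled.jar: download manually
    print("downloaded", name)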
Creating a Glue Database called icebergdb¶
aws glue create-database --database-input "{\"Name\":\"icebergdb\"}"
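If you would rather stay inside the notebook, the same database can be created from Python with boto3. This is an equivalent sketch, assuming the session's IAM role allows glue:CreateDatabase and uses the default region/credentials.
import boto3

# Equivalent of the CLI call above; skip if the database already exists.
glue = boto3.client("glue")
glue.create_database(DatabaseInput={"Name": "icebergdb"})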
Define catalog.yaml¶
catalogImpl: org.apache.iceberg.aws.glue.GlueCatalog
catalogName: onetable
catalogOptions:
  io-impl: org.apache.iceberg.aws.s3.S3FileIO
  warehouse: s3://soumilshah-dev-1995/warehouse
Define my_config.yaml¶
sourceFormat: HUDI
targetFormats:
  - ICEBERG
  - DELTA
datasets:
  -
    tableBasePath: s3://soumilshah-dev-1995/hudi-dataset/people/
    tableName: people
    partitionSpec: city:VALUE
    namespace: icebergdb
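The sync JAR reads both YAML files from the working directory, so one option is to write them from the notebook itself. A minimal sketch, assuming a writable local working directory on the driver; the content simply mirrors the YAML shown above.
# Write the two config files used by the sync commands below.
# Adjust the bucket name and paths for your environment.
catalog_yaml = """catalogImpl: org.apache.iceberg.aws.glue.GlueCatalog
catalogName: onetable
catalogOptions:
  io-impl: org.apache.iceberg.aws.s3.S3FileIO
  warehouse: s3://soumilshah-dev-1995/warehouse
"""

my_config_yaml = """sourceFormat: HUDI
targetFormats:
  - ICEBERG
  - DELTA
datasets:
  -
    tableBasePath: s3://soumilshah-dev-1995/hudi-dataset/people/
    tableName: people
    partitionSpec: city:VALUE
    namespace: icebergdb
"""

with open("catalog.yaml", "w") as f:
    f.write(catalog_yaml)
with open("my_config.yaml", "w") as f:
    f.write(my_config_yaml)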
Running OneTable Sync Command¶
Write metadata only¶
In [16]:
%%bash
java -jar ./utilities-0.1.0-beta1-bundled.jar --dataset ./my_config.yaml
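After this run, the Iceberg and Delta metadata should sit next to the existing Hudi files under the same table base path. A rough boto3 check (an assumption on my part, not in the original post: list permissions on the bucket and the usual .hoodie/, metadata/, _delta_log/ layout):
import boto3

# Count a few objects under each metadata prefix to confirm the sync wrote them.
s3 = boto3.client("s3")
for prefix in ["hudi-dataset/people/.hoodie/",
               "hudi-dataset/people/metadata/",
               "hudi-dataset/people/_delta_log/"]:
    resp = s3.list_objects_v2(Bucket="soumilshah-dev-1995", Prefix=prefix, MaxKeys=5)
    print(prefix, "->", resp.get("KeyCount", 0), "objects found")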
Write metadata and sync to the Glue Catalog¶
In [17]:
%%bash
java -cp "utilities-0.1.0-beta1-bundled.jar:iceberg-aws-1.3.1.jar:bundle-2.23.9.jar" io.onetable.utilities.RunSync --datasetConfig my_config.yaml --icebergCatalogConfig catalog.yaml
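Once the catalog sync completes, the table should be registered in the icebergdb Glue database. A quick boto3 check, assuming glue:GetTable permission (this verification step is mine, not part of the original walkthrough):
import boto3

# Confirm the synced table exists in Glue and inspect its parameters.
glue = boto3.client("glue")
table = glue.get_table(DatabaseName="icebergdb", Name="people")["Table"]
print(table["Name"], table.get("Parameters", {}))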
Create Spark Session¶
In [25]:
catalog_name = "glue_catalog"
bucket_name = "soumilshah-dev-1995"
bucket_prefix = "hudi-dataset/people/"
database_name = "icebergdb"
table_name = "people"
warehouse_path = f"s3://{bucket_name}/{bucket_prefix}"
print(warehouse_path)
spark = SparkSession.builder \
.config(f"spark.sql.catalog.{catalog_name}", "org.apache.iceberg.spark.SparkCatalog") \
.config(f"spark.sql.catalog.{catalog_name}.warehouse", f"{warehouse_path}") \
.config(f"spark.sql.catalog.{catalog_name}.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \
.config(f"spark.sql.catalog.{catalog_name}.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \
.config("spark.sql.extensions","org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
.getOrCreate()
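Before querying, you can list what the Glue-backed Iceberg catalog sees; a small check using the catalog and database names defined above.
# List the tables visible through the Iceberg Glue catalog configured above.
spark.sql(f"SHOW TABLES IN {catalog_name}.{database_name}").show(truncate=False)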
Read Universal Tables as Iceberg | Delta | Hudi¶
Read as Iceberg¶
In [26]:
spark.table(f"{catalog_name}.{database_name}.{table_name}").createOrReplaceTempView("iceberg_snapshot")
query = f"SELECT _hoodie_commit_time,id FROM iceberg_snapshot "
spark.sql(query).show(truncate=False)
Read as Hudi Tables¶
In [27]:
spark.read.format("hudi").load(warehouse_path).createOrReplaceTempView("hudi_snapshot")
query = f"SELECT _hoodie_commit_time,id FROM hudi_snapshot "
spark.sql(query).show(truncate=False)
Read as Delta Tables¶
In [28]:
spark.read.format("delta").load(warehouse_path).createOrReplaceTempView("delta_snapshot")
query = f"SELECT _hoodie_commit_time,id FROM delta_snapshot "
spark.sql(query).show(truncate=False)
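All three views are served from the same underlying Parquet files, so a row-count comparison is a cheap way to confirm the formats agree; a small sketch using the temp views created above.
# Compare row counts across the three snapshots; they should all match.
for view in ["iceberg_snapshot", "hudi_snapshot", "delta_snapshot"]:
    print(view, "->", spark.table(view).count(), "rows")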