Learn How to Process XML Data Files and Build Hudi Datalakes¶
In [45]:
%%configure -f
{
"conf": {
"spark.serializer": "org.apache.spark.serializer.KryoSerializer",
"spark.sql.hive.convertMetastoreParquet": "false",
"spark.sql.catalog.spark_catalog": "org.apache.spark.sql.hudi.catalog.HoodieCatalog",
"spark.sql.legacy.pathOptionBehavior.enabled": "true",
"spark.sql.extensions": "org.apache.spark.sql.hudi.HoodieSparkSessionExtension"
}
}
In [46]:
%%bash
pip install Faker
Generate XML Data¶
In [25]:
import uuid
import random
from faker import Faker
import boto3
print("ok")
def generate_invoice_xml(replicadmstimestamp, invoiceid, itemid, category, price, quantity, orderdate, destinationstate,
shippingtype, referral):
# Create the XML content
xml_content = f"""
<response>
<row>
<replicadmstimestamp>{replicadmstimestamp}</replicadmstimestamp>
<invoiceid>{invoiceid}</invoiceid>
<itemid>{itemid}</itemid>
<category>{category}</category>
<price>{price}</price>
<quantity>{quantity}</quantity>
<orderdate>{orderdate}</orderdate>
<destinationstate>{destinationstate}</destinationstate>
<shippingtype>{shippingtype}</shippingtype>
<referral>{referral}</referral>
</row>
</response>
"""
return xml_content
def upload_to_s3(xml_content, bucket_name, file_name):
# Initialize S3 client
s3 = boto3.client('s3')
# Upload XML content to S3
s3.put_object(Body=xml_content, Bucket=bucket_name, Key=file_name)
print(f"Uploaded XML file '{file_name}' to S3 bucket '{bucket_name}'")
if __name__ == "__main__":
# Initialize Faker
fake = Faker()
print("ok....")
# S3 bucket information
S3_BUCKET_NAME = "soumilshah-dev-1995"
S3_FOLDER_PATH = "xmldata/raw/"
# Number of entries to generate
num_entries = 5
# Generate and upload XML records
for _ in range(num_entries):
replicadmstimestamp = fake.date_time_this_year()
invoiceid = fake.unique.random_number(digits=5)
itemid = fake.unique.random_number(digits=2)
category = fake.word()
price = round(random.uniform(10, 100), 2)
quantity = random.randint(1, 5)
orderdate = fake.date_this_decade()
destinationstate = fake.state_abbr()
shippingtype = random.choice(['2-Day', '3-Day', 'Standard'])
referral = fake.word()
# Generate XML content
xml_content = generate_invoice_xml(replicadmstimestamp, invoiceid, itemid, category, price, quantity, orderdate,
destinationstate, shippingtype, referral)
# Define file name
file_name = f"{S3_FOLDER_PATH}invoice_{uuid.uuid4()}.xml"
# Upload XML content to S3
upload_to_s3(xml_content, S3_BUCKET_NAME, file_name)
# Print XML content
print(f"XML content for entry {_ + 1}:")
print(xml_content)
Hudi Datalakes Code¶
In [47]:
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder \
.appName("Read from S3 with Glue") \
.getOrCreate()
# Initialize Glue context
glueContext = GlueContext(SparkContext.getOrCreate())
# Path to the XML files folder in S3
S3_SOURCE_XML_FOLDER = "s3://soumilshah-dev-1995/xmldata/raw/"
# Read XML files as a Glue DynamicFrame with specified rowTag
glue_df = glueContext.create_dynamic_frame.from_options(
connection_type="s3",
connection_options={"paths": [S3_SOURCE_XML_FOLDER]},
format="xml",
format_options={"rowTag": "row"} # Specify the rowTag as "row"
)
# Convert DynamicFrame to Spark DataFrame for further processing
df = glue_df.toDF()
# Show the DataFrame schema and some sample data
df.printSchema()
df.show()
df.count()
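With rowTag set to "row", each <row> element becomes one record and each child element (invoiceid, itemid, price, and so on) becomes a column; the exact types depend on Glue's schema inference. Before writing to Hudi it can help to confirm that the fields used later as the record key, pre-combine key, and partition path actually came through the parse. The cell below is a minimal sketch against the df created above, not part of the original run:
In [ ]:
# Sanity-check sketch: the Hudi write below relies on these columns existing
expected_cols = ["invoiceid", "itemid", "replicadmstimestamp", "destinationstate"]
missing = [c for c in expected_cols if c not in df.columns]
if missing:
    raise ValueError(f"Columns missing after XML parse: {missing}")
df.select(*expected_cols).show(5, truncate=False)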
Define Helper Function¶
In [49]:
def upsert_hudi_table(glue_database, table_name, record_id, precomb_key, table_type, spark_df, partition_fields,
enable_partition, enable_cleaner, enable_hive_sync, enable_clustering,
enable_meta_data_indexing,
use_sql_transformer, sql_transformer_query,
target_path, index_type, method='upsert', clustering_column='default'):
"""
Upserts a dataframe into a Hudi table.
Args:
glue_database (str): The name of the glue database.
table_name (str): The name of the Hudi table.
record_id (str): The name of the field in the dataframe that will be used as the record key.
precomb_key (str): The name of the field in the dataframe that will be used for pre-combine.
table_type (str): The Hudi table type (e.g., COPY_ON_WRITE, MERGE_ON_READ).
spark_df (pyspark.sql.DataFrame): The dataframe to upsert.
        partition_fields (str): The field(s) used to partition the data.
enable_partition (bool): Whether or not to enable partitioning.
enable_cleaner (bool): Whether or not to enable data cleaning.
enable_hive_sync (bool): Whether or not to enable syncing with Hive.
use_sql_transformer (bool): Whether or not to use SQL to transform the dataframe before upserting.
sql_transformer_query (str): The SQL query to use for data transformation.
target_path (str): The path to the target Hudi table.
method (str): The Hudi write method to use (default is 'upsert').
        index_type (str): The Hudi index type to use (e.g., BLOOM or GLOBAL_BLOOM).
Returns:
None
"""
# These are the basic settings for the Hoodie table
hudi_final_settings = {
"hoodie.table.name": table_name,
"hoodie.datasource.write.table.type": table_type,
"hoodie.datasource.write.operation": method,
"hoodie.datasource.write.recordkey.field": record_id,
"hoodie.datasource.write.precombine.field": precomb_key,
}
# These settings enable syncing with Hive
hudi_hive_sync_settings = {
"hoodie.parquet.compression.codec": "gzip",
"hoodie.datasource.hive_sync.enable": "true",
"hoodie.datasource.hive_sync.database": glue_database,
"hoodie.datasource.hive_sync.table": table_name,
"hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
"hoodie.datasource.hive_sync.use_jdbc": "false",
"hoodie.datasource.hive_sync.mode": "hms",
}
# These settings enable automatic cleaning of old data
hudi_cleaner_options = {
"hoodie.clean.automatic": "true",
"hoodie.clean.async": "true",
"hoodie.cleaner.policy": 'KEEP_LATEST_FILE_VERSIONS',
"hoodie.cleaner.fileversions.retained": "3",
"hoodie-conf hoodie.cleaner.parallelism": '200',
'hoodie.cleaner.commits.retained': 5
}
# These settings enable partitioning of the data
partition_settings = {
"hoodie.datasource.write.partitionpath.field": partition_fields,
"hoodie.datasource.hive_sync.partition_fields": partition_fields,
"hoodie.datasource.write.hive_style_partitioning": "true",
}
hudi_clustering = {
"hoodie.clustering.execution.strategy.class": "org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy",
"hoodie.clustering.inline": "true",
"hoodie.clustering.plan.strategy.sort.columns": clustering_column,
"hoodie.clustering.plan.strategy.target.file.max.bytes": "1073741824",
"hoodie.clustering.plan.strategy.small.file.limit": "629145600"
}
# Define a dictionary with the index settings for Hudi
hudi_index_settings = {
"hoodie.index.type": index_type, # Specify the index type for Hudi
}
    # Define a dictionary with the file size settings
hudi_file_size = {
"hoodie.parquet.max.file.size": 512 * 1024 * 1024, # 512MB
"hoodie.parquet.small.file.limit": 104857600, # 100MB
}
hudi_meta_data_indexing = {
"hoodie.metadata.enable": "true",
"hoodie.metadata.index.async": "true",
"hoodie.metadata.index.column.stats.enable": "true",
"hoodie.metadata.index.check.timeout.seconds": "60",
"hoodie.write.concurrency.mode": "optimistic_concurrency_control",
"hoodie.write.lock.provider": "org.apache.hudi.client.transaction.lock.InProcessLockProvider"
}
    # The index and file size settings always apply
    hudi_final_settings.update(hudi_index_settings)
    hudi_final_settings.update(hudi_file_size)
    # Optional features accept booleans or the strings "True"/"true"
    if str(enable_meta_data_indexing).lower() == "true":
        hudi_final_settings.update(hudi_meta_data_indexing)
    if str(enable_clustering).lower() == "true":
        hudi_final_settings.update(hudi_clustering)
    # If partitioning is enabled, add the partition settings to the final settings
    if str(enable_partition).lower() == "true":
        hudi_final_settings.update(partition_settings)
    # If data cleaning is enabled, add the cleaner options to the final settings
    if str(enable_cleaner).lower() == "true":
        hudi_final_settings.update(hudi_cleaner_options)
    # If Hive syncing is enabled, add the Hive sync settings to the final settings
    if str(enable_hive_sync).lower() == "true":
        hudi_final_settings.update(hudi_hive_sync_settings)
# If there is data to write, apply any SQL transformations and write to the target path
if spark_df.count() > 0:
if use_sql_transformer == "True" or use_sql_transformer == "true" or use_sql_transformer == True:
spark_df.createOrReplaceTempView("temp")
spark_df = spark.sql(sql_transformer_query)
spark_df.write.format("hudi"). \
options(**hudi_final_settings). \
mode("append"). \
save(target_path)
In [50]:
upsert_hudi_table(
glue_database="default",
table_name="invoices",
record_id="invoiceid,itemid",
precomb_key="replicadmstimestamp",
table_type='COPY_ON_WRITE',
partition_fields="destinationstate",
method='upsert',
index_type='BLOOM',
enable_partition=True,
enable_cleaner=True,
enable_hive_sync=True,
enable_clustering='False',
clustering_column='default',
enable_meta_data_indexing='false',
use_sql_transformer=False,
sql_transformer_query="",
target_path="s3://soumilshah-dev-1995/xmldata/silver/table_name=bronze_invoices",
spark_df=df,
)
In [52]:
print("done")
Read From Hudi Tables¶
In [53]:
path = "s3://soumilshah-dev-1995/xmldata/silver/table_name=bronze_invoices/"
spark.read.format("hudi") \
.load(path) \
.createOrReplaceTempView("hudi_snapshot1")
query = f"SELECT invoiceid FROM hudi_snapshot1 "
print(query)
result = spark.sql(query)
result.show(n=result.count(), truncate=False)
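Besides the snapshot view above, Hudi tables can also be read incrementally, returning only records committed after a given instant. The sketch below is illustrative: <begin_instant> is a placeholder for a commit time, for example one taken from the show_commits call in the next section.
In [ ]:
# Incremental read sketch: replace <begin_instant> with a real commit time from show_commits
incremental_df = spark.read.format("hudi") \
    .option("hoodie.datasource.query.type", "incremental") \
    .option("hoodie.datasource.read.begin.instanttime", "<begin_instant>") \
    .load(path)
incremental_df.select("invoiceid", "itemid", "replicadmstimestamp").show(truncate=False)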
Execute Stored Procedures¶
In [54]:
query = "call show_commits(table => 'default.invoices', limit => 10);"
spark.sql(query).show()
In [ ]: