Tuesday, June 18, 2024

Learn How to Process XML Data Files and Build Hudi Datalakes

In [45]:
%%configure -f
{
    "conf": {
        "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
        "spark.sql.hive.convertMetastoreParquet": "false",
        "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.hudi.catalog.HoodieCatalog",
        "spark.sql.legacy.pathOptionBehavior.enabled": "true",
        "spark.sql.extensions": "org.apache.spark.sql.hudi.HoodieSparkSessionExtension"
    }
}
Starting Spark application
ID: 2 | YARN Application ID: None | Kind: pyspark | State: idle
SparkSession available as 'spark'.
Current session configs: {'conf': {'spark.serializer': 'org.apache.spark.serializer.KryoSerializer', 'spark.sql.hive.convertMetastoreParquet': 'false', 'spark.sql.catalog.spark_catalog': 'org.apache.spark.sql.hudi.catalog.HoodieCatalog', 'spark.sql.legacy.pathOptionBehavior.enabled': 'true', 'spark.sql.extensions': 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension'}, 'kind': 'pyspark'}
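
If you are not running inside an interactive Glue/Livy session, the same Hudi-related settings can be applied directly when building the SparkSession. The snippet below is only an illustrative sketch (the application name is arbitrary) and assumes the Hudi Spark bundle jar is already on the classpath:

# Minimal sketch: apply the same settings to a plain SparkSession outside Glue/Livy.
# Assumes the Hudi Spark bundle is available on the classpath.
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("hudi-xml-demo") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.sql.hive.convertMetastoreParquet", "false") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog") \
    .config("spark.sql.legacy.pathOptionBehavior.enabled", "true") \
    .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension") \
    .getOrCreate()
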
In [46]:
%%bash
pip install Faker
Defaulting to user installation because normal site-packages is not writeable
Requirement already satisfied: Faker in /home/glue_user/.local/lib/python3.10/site-packages (25.8.0)
Requirement already satisfied: python-dateutil>=2.4 in /home/glue_user/.local/lib/python3.10/site-packages (from Faker) (2.8.2)
Requirement already satisfied: six>=1.5 in /home/glue_user/.local/lib/python3.10/site-packages (from python-dateutil>=2.4->Faker) (1.16.0)
WARNING: There was an error checking the latest version of pip.

Generate XML Data

In [25]:
import uuid
import random
from faker import Faker
import boto3
print("ok")

def generate_invoice_xml(replicadmstimestamp, invoiceid, itemid, category, price, quantity, orderdate, destinationstate,
                         shippingtype, referral):
    # Create the XML content
    xml_content = f"""
    <response>
        <row>
            <replicadmstimestamp>{replicadmstimestamp}</replicadmstimestamp>
            <invoiceid>{invoiceid}</invoiceid>
            <itemid>{itemid}</itemid>
            <category>{category}</category>
            <price>{price}</price>
            <quantity>{quantity}</quantity>
            <orderdate>{orderdate}</orderdate>
            <destinationstate>{destinationstate}</destinationstate>
            <shippingtype>{shippingtype}</shippingtype>
            <referral>{referral}</referral>
        </row>
    </response>
    """
    return xml_content


def upload_to_s3(xml_content, bucket_name, file_name):
    # Initialize S3 client
    s3 = boto3.client('s3')

    # Upload XML content to S3
    s3.put_object(Body=xml_content, Bucket=bucket_name, Key=file_name)

    print(f"Uploaded XML file '{file_name}' to S3 bucket '{bucket_name}'")


if __name__ == "__main__":
    # Initialize Faker
    fake = Faker()
    print("ok....")

    # S3 bucket information
    S3_BUCKET_NAME = "soumilshah-dev-1995"
    S3_FOLDER_PATH = "xmldata/raw/"

    # Number of entries to generate
    num_entries = 5

    # Generate and upload XML records
    for _ in range(num_entries):
        replicadmstimestamp = fake.date_time_this_year()
        invoiceid = fake.unique.random_number(digits=5)
        itemid = fake.unique.random_number(digits=2)
        category = fake.word()
        price = round(random.uniform(10, 100), 2)
        quantity = random.randint(1, 5)
        orderdate = fake.date_this_decade()
        destinationstate = fake.state_abbr()
        shippingtype = random.choice(['2-Day', '3-Day', 'Standard'])
        referral = fake.word()

        # Generate XML content
        xml_content = generate_invoice_xml(replicadmstimestamp, invoiceid, itemid, category, price, quantity, orderdate,
                                           destinationstate, shippingtype, referral)

        # Define file name
        file_name = f"{S3_FOLDER_PATH}invoice_{uuid.uuid4()}.xml"

        # Upload XML content to S3
        upload_to_s3(xml_content, S3_BUCKET_NAME, file_name)

        # Print XML content
        print(f"XML content for entry {_ + 1}:")
        print(xml_content)
ok
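
Before moving on, it can help to confirm that the generated XML files actually landed in the raw prefix. A small optional check with boto3, assuming the same bucket and prefix as above and credentials that allow listing the bucket:

import boto3

s3 = boto3.client("s3")

# List the objects written under the raw XML prefix
response = s3.list_objects_v2(
    Bucket="soumilshah-dev-1995",
    Prefix="xmldata/raw/"
)

for obj in response.get("Contents", []):
    print(obj["Key"], obj["Size"])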

Hudi Datalakes Code

In [47]:
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Read from S3 with Glue") \
    .getOrCreate()

# Initialize Glue context
glueContext = GlueContext(SparkContext.getOrCreate())

# Path to the XML files folder in S3
S3_SOURCE_XML_FOLDER = "s3://soumilshah-dev-1995/xmldata/raw/"

# Read XML files as a Glue DynamicFrame with specified rowTag
glue_df = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={"paths": [S3_SOURCE_XML_FOLDER]},
    format="xml",
    format_options={"rowTag": "row"}  # Specify the rowTag as "row"
)

# Convert DynamicFrame to Spark DataFrame for further processing
df = glue_df.toDF()

# Show the DataFrame schema and some sample data
df.printSchema()
df.show()
df.count()
root
 |-- shippingtype: string (nullable = true)
 |-- price: double (nullable = true)
 |-- replicadmstimestamp: timestamp (nullable = true)
 |-- referral: string (nullable = true)
 |-- category: string (nullable = true)
 |-- invoiceid: integer (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- itemid: integer (nullable = true)
 |-- destinationstate: string (nullable = true)
 |-- orderdate: string (nullable = true)

+------------+-----+--------------------+----------+--------+---------+--------+------+----------------+----------+
|shippingtype|price| replicadmstimestamp|  referral|category|invoiceid|quantity|itemid|destinationstate| orderdate|
+------------+-----+--------------------+----------+--------+---------+--------+------+----------------+----------+
|       2-Day|45.98|2024-05-24 11:32:...|    others|    rest|     4570|       1|     4|              AK|2020-02-12|
|       2-Day|40.56|2024-01-17 11:51:...|    worker|   store|    81228|       1|    26|              KY|2023-10-28|
|       3-Day|20.79|2024-01-29 02:49:...|     heart| natural|     1045|       4|    66|              OH|2020-08-05|
|       3-Day|81.98|2024-01-30 09:11:...|especially|  become|    24295|       4|    31|              HI|2022-01-18|
|    Standard| 89.4|2024-05-04 16:56:...|     treat|    stop|    67924|       4|    11|              ME|2020-10-26|
+------------+-----+--------------------+----------+--------+---------+--------+------+----------------+----------+

5
/home/glue_user/spark/python/pyspark/sql/context.py:112: FutureWarning: Deprecated in 3.0.0. Use SparkSession.builder.getOrCreate() instead.
  warnings.warn(
/home/glue_user/spark/python/pyspark/sql/dataframe.py:127: UserWarning: DataFrame constructor is internal. Do not directly use it.
  warnings.warn("DataFrame constructor is internal. Do not directly use it.")

Define Helper Function

In [49]:
def upsert_hudi_table(glue_database, table_name, record_id, precomb_key, table_type, spark_df, partition_fields,
                      enable_partition, enable_cleaner, enable_hive_sync, enable_clustering,
                      enable_meta_data_indexing,
                      use_sql_transformer, sql_transformer_query,
                      target_path, index_type, method='upsert', clustering_column='default'):
    """
    Upserts a dataframe into a Hudi table.

    Args:
        glue_database (str): The name of the glue database.
        table_name (str): The name of the Hudi table.
        record_id (str): The name of the field in the dataframe that will be used as the record key.
        precomb_key (str): The name of the field in the dataframe that will be used for pre-combine.
        table_type (str): The Hudi table type (e.g., COPY_ON_WRITE, MERGE_ON_READ).
        spark_df (pyspark.sql.DataFrame): The dataframe to upsert.
        partition_fields (str): The field(s) used to partition the data.
        enable_partition (bool): Whether or not to enable partitioning.
        enable_cleaner (bool): Whether or not to enable data cleaning.
        enable_hive_sync (bool): Whether or not to enable syncing with Hive.
        enable_clustering (bool): Whether or not to enable inline clustering.
        enable_meta_data_indexing (bool): Whether or not to enable metadata table indexing.
        use_sql_transformer (bool): Whether or not to use SQL to transform the dataframe before upserting.
        sql_transformer_query (str): The SQL query to use for data transformation.
        target_path (str): The path to the target Hudi table.
        index_type (str): The Hudi index type to use (e.g., BLOOM, GLOBAL_BLOOM).
        method (str): The Hudi write method to use (default is 'upsert').
        clustering_column (str): The column(s) to sort on when inline clustering is enabled.
    Returns:
        None
    """
    # These are the basic settings for the Hoodie table
    hudi_final_settings = {
        "hoodie.table.name": table_name,
        "hoodie.datasource.write.table.type": table_type,
        "hoodie.datasource.write.operation": method,
        "hoodie.datasource.write.recordkey.field": record_id,
        "hoodie.datasource.write.precombine.field": precomb_key,
    }

    # These settings enable syncing with Hive
    hudi_hive_sync_settings = {
        "hoodie.parquet.compression.codec": "gzip",
        "hoodie.datasource.hive_sync.enable": "true",
        "hoodie.datasource.hive_sync.database": glue_database,
        "hoodie.datasource.hive_sync.table": table_name,
        "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
        "hoodie.datasource.hive_sync.use_jdbc": "false",
        "hoodie.datasource.hive_sync.mode": "hms",
    }

    # These settings enable automatic cleaning of old data
    hudi_cleaner_options = {
        "hoodie.clean.automatic": "true",
        "hoodie.clean.async": "true",
        "hoodie.cleaner.policy": 'KEEP_LATEST_FILE_VERSIONS',
        "hoodie.cleaner.fileversions.retained": "3",
        "hoodie-conf hoodie.cleaner.parallelism": '200',
        'hoodie.cleaner.commits.retained': 5
    }

    # These settings enable partitioning of the data
    partition_settings = {
        "hoodie.datasource.write.partitionpath.field": partition_fields,
        "hoodie.datasource.hive_sync.partition_fields": partition_fields,
        "hoodie.datasource.write.hive_style_partitioning": "true",
    }

    hudi_clustering = {
        "hoodie.clustering.execution.strategy.class": "org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy",
        "hoodie.clustering.inline": "true",
        "hoodie.clustering.plan.strategy.sort.columns": clustering_column,
        "hoodie.clustering.plan.strategy.target.file.max.bytes": "1073741824",
        "hoodie.clustering.plan.strategy.small.file.limit": "629145600"
    }

    # Define a dictionary with the index settings for Hudi
    hudi_index_settings = {
        "hoodie.index.type": index_type,  # Specify the index type for Hudi
    }

    # Define a dictionary with the file size settings
    hudi_file_size = {
        "hoodie.parquet.max.file.size": 512 * 1024 * 1024,  # 512MB
        "hoodie.parquet.small.file.limit": 104857600,  # 100MB
    }

    hudi_meta_data_indexing = {
        "hoodie.metadata.enable": "true",
        "hoodie.metadata.index.async": "true",
        "hoodie.metadata.index.column.stats.enable": "true",
        "hoodie.metadata.index.check.timeout.seconds": "60",
        "hoodie.write.concurrency.mode": "optimistic_concurrency_control",
        "hoodie.write.lock.provider": "org.apache.hudi.client.transaction.lock.InProcessLockProvider"
    }

    # Flags may arrive as booleans or as "True"/"true" strings, so compare on the lowercased string form
    if str(enable_meta_data_indexing).lower() == "true":
        hudi_final_settings.update(hudi_meta_data_indexing)

    if str(enable_clustering).lower() == "true":
        hudi_final_settings.update(hudi_clustering)

    # The index and file size settings always apply
    hudi_final_settings.update(hudi_index_settings)
    hudi_final_settings.update(hudi_file_size)

    # If partitioning is enabled, add the partition settings to the final settings
    if str(enable_partition).lower() == "true":
        hudi_final_settings.update(partition_settings)

    # If data cleaning is enabled, add the cleaner options to the final settings
    if str(enable_cleaner).lower() == "true":
        hudi_final_settings.update(hudi_cleaner_options)

    # If Hive syncing is enabled, add the Hive sync settings to the final settings
    if str(enable_hive_sync).lower() == "true":
        hudi_final_settings.update(hudi_hive_sync_settings)

    # If there is data to write, apply any SQL transformation and write to the target path
    if spark_df.count() > 0:
        if str(use_sql_transformer).lower() == "true":
            spark_df.createOrReplaceTempView("temp")
            spark_df = spark.sql(sql_transformer_query)

        spark_df.write.format("hudi") \
            .options(**hudi_final_settings) \
            .mode("append") \
            .save(target_path)
In [50]:
upsert_hudi_table(
    glue_database="default",
    table_name="invoices",
    record_id="invoiceid,itemid",
    precomb_key="replicadmstimestamp",
    table_type='COPY_ON_WRITE',
    partition_fields="destinationstate",
    method='upsert',
    index_type='BLOOM',
    enable_partition=True,
    enable_cleaner=True,
    enable_hive_sync=True,
    enable_clustering='False',
    clustering_column='default',
    enable_meta_data_indexing='false',
    use_sql_transformer=False,
    sql_transformer_query="",
    target_path="s3://soumilshah-dev-1995/xmldata/silver/table_name=bronze_invoices",
    spark_df=df,
)
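
Because the record key is the composite (invoiceid, itemid) and replicadmstimestamp is the precombine field, re-running the write with changed values updates existing rows in place rather than appending duplicates. The following is an illustrative sketch (not executed above) that bumps the price and advances the precombine timestamp before upserting again with the same helper:

from pyspark.sql import functions as F

# Simulate a change-data batch: raise every price by 10% and advance the
# precombine timestamp so the new version wins during the upsert
updated_df = df \
    .withColumn("price", F.round(F.col("price") * 1.1, 2)) \
    .withColumn("replicadmstimestamp", F.current_timestamp())

upsert_hudi_table(
    glue_database="default",
    table_name="invoices",
    record_id="invoiceid,itemid",
    precomb_key="replicadmstimestamp",
    table_type='COPY_ON_WRITE',
    partition_fields="destinationstate",
    method='upsert',
    index_type='BLOOM',
    enable_partition=True,
    enable_cleaner=True,
    enable_hive_sync=True,
    enable_clustering='False',
    clustering_column='default',
    enable_meta_data_indexing='false',
    use_sql_transformer=False,
    sql_transformer_query="",
    target_path="s3://soumilshah-dev-1995/xmldata/silver/table_name=bronze_invoices",
    spark_df=updated_df,
)
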
In [52]:
print("done")
done

Read From Hudi Tables

In [53]:
path = "s3://soumilshah-dev-1995/xmldata/silver/table_name=bronze_invoices/"


spark.read.format("hudi") \
    .load(path) \
    .createOrReplaceTempView("hudi_snapshot1")

query = f"SELECT invoiceid FROM hudi_snapshot1 "
print(query)
result = spark.sql(query)


result.show(n=result.count(), truncate=False)
SELECT invoiceid FROM hudi_snapshot1 
+---------+
|invoiceid|
+---------+
|24295    |
|67924    |
|81228    |
|1045     |
|4570     |
+---------+
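
Beyond the snapshot view, Hudi also supports incremental queries that return only records written after a given commit. A minimal sketch, assuming you substitute a real commit_time taken from the table's timeline (for example, one reported by the show_commits procedure in the next cell); the begin_time value below is just a placeholder:

# Incremental read sketch: pull only records committed after a given instant.
# Replace begin_time with an actual commit_time from the table's timeline.
begin_time = "20240618000000000"  # hypothetical placeholder instant

incremental_df = spark.read.format("hudi") \
    .option("hoodie.datasource.query.type", "incremental") \
    .option("hoodie.datasource.read.begin.instanttime", begin_time) \
    .load(path)

incremental_df.select("invoiceid", "price", "_hoodie_commit_time").show(truncate=False)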

Execute Stored Procedures

In [54]:
query = "call show_commits(table => 'default.invoices', limit => 10);"

spark.sql(query).show()
+-----------------+-------------------+-----------------+-------------------+------------------------+---------------------+----------------------------+------------+
|      commit_time|total_bytes_written|total_files_added|total_files_updated|total_partitions_written|total_records_written|total_update_records_written|total_errors|
+-----------------+-------------------+-----------------+-------------------+------------------------+---------------------+----------------------------+------------+
|20240618211626533|            2184809|                5|                  0|                       5|                    5|                           0|           0|
+-----------------+-------------------+-----------------+-------------------+------------------------+---------------------+----------------------------+------------+
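
Hudi ships a number of other call procedures whose availability depends on the Hudi version bundled with your Spark/Glue runtime. For example, richer commit metadata can usually be inspected as well; treat the call below as a sketch and check your version's procedure list:

# Inspect detailed commit metadata (availability depends on the Hudi version)
spark.sql("call show_commits_metadata(table => 'default.invoices', limit => 10);").show(truncate=False)
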
In [ ]:
