Tuesday, June 18, 2024

Learn How to Process XML Data Files and Build Hudi Datalakes


In [45]:
%%configure -f
{
"conf": {
"spark.serializer": "org.apache.spark.serializer.KryoSerializer",
"spark.sql.hive.convertMetastoreParquet": "false",
"spark.sql.catalog.spark_catalog": "org.apache.spark.sql.hudi.catalog.HoodieCatalog",
"spark.sql.legacy.pathOptionBehavior.enabled": "true",
"spark.sql.extensions": "org.apache.spark.sql.hudi.HoodieSparkSessionExtension"
}
}
Starting Spark application
ID: 2 | YARN Application ID: None | Kind: pyspark | State: idle
SparkSession available as 'spark'.
Current session configs: {'conf': {'spark.serializer': 'org.apache.spark.serializer.KryoSerializer', 'spark.sql.hive.convertMetastoreParquet': 'false', 'spark.sql.catalog.spark_catalog': 'org.apache.spark.sql.hudi.catalog.HoodieCatalog', 'spark.sql.legacy.pathOptionBehavior.enabled': 'true', 'spark.sql.extensions': 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension'}, 'kind': 'pyspark'}
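
These settings give the Glue interactive session what Hudi needs: Kryo serialization, the HoodieCatalog as the Spark catalog, and the Hudi SQL extension. If you were running plain PySpark instead of a Glue notebook, roughly the same configuration could be applied on the SparkSession builder; the sketch below is a hypothetical equivalent, not part of the original notebook, and assumes the Hudi Spark bundle jar is already on the classpath.

from pyspark.sql import SparkSession

# Minimal sketch (not from the original notebook): the same Hudi-related settings
# applied directly on a SparkSession builder; assumes the Hudi Spark bundle jar
# is already on the classpath.
spark = (
    SparkSession.builder
    .appName("hudi-xml-demo")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.sql.hive.convertMetastoreParquet", "false")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog")
    .config("spark.sql.legacy.pathOptionBehavior.enabled", "true")
    .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
    .getOrCreate()
)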
In [46]:
%%bash
pip install Faker
Defaulting to user installation because normal site-packages is not writeable
Requirement already satisfied: Faker in /home/glue_user/.local/lib/python3.10/site-packages (25.8.0)
Requirement already satisfied: python-dateutil>=2.4 in /home/glue_user/.local/lib/python3.10/site-packages (from Faker) (2.8.2)
Requirement already satisfied: six>=1.5 in /home/glue_user/.local/lib/python3.10/site-packages (from python-dateutil>=2.4->Faker) (1.16.0)
WARNING: There was an error checking the latest version of pip.

Generate XML Data

In [25]:
import uuid
import random
from faker import Faker
import boto3
print("ok")

def generate_invoice_xml(replicadmstimestamp, invoiceid, itemid, category, price, quantity, orderdate, destinationstate,
                         shippingtype, referral):
    # Create the XML content
    xml_content = f"""
    <response>
        <row>
            <replicadmstimestamp>{replicadmstimestamp}</replicadmstimestamp>
            <invoiceid>{invoiceid}</invoiceid>
            <itemid>{itemid}</itemid>
            <category>{category}</category>
            <price>{price}</price>
            <quantity>{quantity}</quantity>
            <orderdate>{orderdate}</orderdate>
            <destinationstate>{destinationstate}</destinationstate>
            <shippingtype>{shippingtype}</shippingtype>
            <referral>{referral}</referral>
        </row>
    </response>
    """
    return xml_content


def upload_to_s3(xml_content, bucket_name, file_name):
    # Initialize S3 client
    s3 = boto3.client('s3')

    # Upload XML content to S3
    s3.put_object(Body=xml_content, Bucket=bucket_name, Key=file_name)

    print(f"Uploaded XML file '{file_name}' to S3 bucket '{bucket_name}'")


if __name__ == "__main__":
    # Initialize Faker
    fake = Faker()
    print("ok....")

    # S3 bucket information
    S3_BUCKET_NAME = "soumilshah-dev-1995"
    S3_FOLDER_PATH = "xmldata/raw/"

    # Number of entries to generate
    num_entries = 5

    # Generate and upload XML records
    for _ in range(num_entries):
        replicadmstimestamp = fake.date_time_this_year()
        invoiceid = fake.unique.random_number(digits=5)
        itemid = fake.unique.random_number(digits=2)
        category = fake.word()
        price = round(random.uniform(10, 100), 2)
        quantity = random.randint(1, 5)
        orderdate = fake.date_this_decade()
        destinationstate = fake.state_abbr()
        shippingtype = random.choice(['2-Day', '3-Day', 'Standard'])
        referral = fake.word()

        # Generate XML content
        xml_content = generate_invoice_xml(replicadmstimestamp, invoiceid, itemid, category, price, quantity, orderdate,
                                           destinationstate, shippingtype, referral)

        # Define file name
        file_name = f"{S3_FOLDER_PATH}invoice_{uuid.uuid4()}.xml"

        # Upload XML content to S3
        upload_to_s3(xml_content, S3_BUCKET_NAME, file_name)

        # Print XML content
        print(f"XML content for entry {_ + 1}:")
        print(xml_content)
ok
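
Before pushing files to S3, it can be worth confirming that the generated string is well-formed XML. The snippet below is a small sanity check that is not part of the original notebook; it feeds placeholder values (not real data) through generate_invoice_xml and parses the result with the standard library.

import xml.etree.ElementTree as ET

# Sanity check with illustrative placeholder values (not real data): make sure
# the generated string parses and the <row> fields round-trip as expected.
sample = generate_invoice_xml(
    "2024-01-01 00:00:00", 12345, 42, "books", 19.99, 2,
    "2024-01-15", "NY", "Standard", "friend"
)
root = ET.fromstring(sample)              # root element is <response>
row = root.find("row")                    # each file holds a single <row>
print(row.find("invoiceid").text)         # -> 12345
print(row.find("destinationstate").text)  # -> NY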

Hudi Datalakes Code

In [47]:
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Read from S3 with Glue") \
    .getOrCreate()

# Initialize Glue context
glueContext = GlueContext(SparkContext.getOrCreate())

# Path to the XML files folder in S3
S3_SOURCE_XML_FOLDER = "s3://soumilshah-dev-1995/xmldata/raw/"

# Read XML files as a Glue DynamicFrame with specified rowTag
glue_df = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={"paths": [S3_SOURCE_XML_FOLDER]},
    format="xml",
    format_options={"rowTag": "row"}  # Specify the rowTag as "row"
)

# Convert DynamicFrame to Spark DataFrame for further processing
df = glue_df.toDF()

# Show the DataFrame schema and some sample data
df.printSchema()
df.show()
df.count()
root
 |-- shippingtype: string (nullable = true)
 |-- price: double (nullable = true)
 |-- replicadmstimestamp: timestamp (nullable = true)
 |-- referral: string (nullable = true)
 |-- category: string (nullable = true)
 |-- invoiceid: integer (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- itemid: integer (nullable = true)
 |-- destinationstate: string (nullable = true)
 |-- orderdate: string (nullable = true)

+------------+-----+--------------------+----------+--------+---------+--------+------+----------------+----------+
|shippingtype|price| replicadmstimestamp|  referral|category|invoiceid|quantity|itemid|destinationstate| orderdate|
+------------+-----+--------------------+----------+--------+---------+--------+------+----------------+----------+
|       2-Day|45.98|2024-05-24 11:32:...|    others|    rest|     4570|       1|     4|              AK|2020-02-12|
|       2-Day|40.56|2024-01-17 11:51:...|    worker|   store|    81228|       1|    26|              KY|2023-10-28|
|       3-Day|20.79|2024-01-29 02:49:...|     heart| natural|     1045|       4|    66|              OH|2020-08-05|
|       3-Day|81.98|2024-01-30 09:11:...|especially|  become|    24295|       4|    31|              HI|2022-01-18|
|    Standard| 89.4|2024-05-04 16:56:...|     treat|    stop|    67924|       4|    11|              ME|2020-10-26|
+------------+-----+--------------------+----------+--------+---------+--------+------+----------------+----------+

5
/home/glue_user/spark/python/pyspark/sql/context.py:112: FutureWarning: Deprecated in 3.0.0. Use SparkSession.builder.getOrCreate() instead.
  warnings.warn(
/home/glue_user/spark/python/pyspark/sql/dataframe.py:127: UserWarning: DataFrame constructor is internal. Do not directly use it.
  warnings.warn("DataFrame constructor is internal. Do not directly use it.")
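
Here the Glue XML reader does the heavy lifting: rowTag="row" tells it to treat each <row> element as one record, and the resulting DynamicFrame infers the column types shown in the schema above. Outside of Glue, roughly the same read can be done with the spark-xml data source; the sketch below is an assumption (it requires the spark-xml package on the classpath) and is not part of the original notebook.

# Sketch: reading the same files with the spark-xml data source instead of the
# Glue DynamicFrame reader (assumes the spark-xml package is available).
xml_df = (
    spark.read.format("xml")
    .option("rowTag", "row")
    .load("s3://soumilshah-dev-1995/xmldata/raw/")
)
xml_df.printSchema()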

Define Helper Function

In [49]:
def upsert_hudi_table(glue_database, table_name, record_id, precomb_key, table_type, spark_df, partition_fields,
                      enable_partition, enable_cleaner, enable_hive_sync, enable_clustering,
                      enable_meta_data_indexing,
                      use_sql_transformer, sql_transformer_query,
                      target_path, index_type, method='upsert', clustering_column='default'):
    """
    Upserts a dataframe into a Hudi table.

    Args:
        glue_database (str): The name of the glue database.
        table_name (str): The name of the Hudi table.
        record_id (str): The name of the field in the dataframe that will be used as the record key.
        precomb_key (str): The name of the field in the dataframe that will be used for pre-combine.
        table_type (str): The Hudi table type (e.g., COPY_ON_WRITE, MERGE_ON_READ).
        spark_df (pyspark.sql.DataFrame): The dataframe to upsert.
        partition_fields (str): The field(s) used to partition the data.
        enable_partition (bool): Whether or not to enable partitioning.
        enable_cleaner (bool): Whether or not to enable automatic cleaning of old file versions.
        enable_hive_sync (bool): Whether or not to enable syncing with Hive.
        enable_clustering (bool): Whether or not to enable inline clustering.
        enable_meta_data_indexing (bool): Whether or not to enable metadata table indexing.
        use_sql_transformer (bool): Whether or not to use SQL to transform the dataframe before upserting.
        sql_transformer_query (str): The SQL query to use for data transformation.
        target_path (str): The path to the target Hudi table.
        index_type (str): The Hudi index type to use (e.g., BLOOM or GLOBAL_BLOOM).
        method (str): The Hudi write operation to use (default is 'upsert').
        clustering_column (str): The column(s) to sort by when clustering is enabled.
    Returns:
        None
    """
    # These are the basic settings for the Hoodie table
    hudi_final_settings = {
        "hoodie.table.name": table_name,
        "hoodie.datasource.write.table.type": table_type,
        "hoodie.datasource.write.operation": method,
        "hoodie.datasource.write.recordkey.field": record_id,
        "hoodie.datasource.write.precombine.field": precomb_key,
    }

    # These settings enable syncing with Hive
    hudi_hive_sync_settings = {
        "hoodie.parquet.compression.codec": "gzip",
        "hoodie.datasource.hive_sync.enable": "true",
        "hoodie.datasource.hive_sync.database": glue_database,
        "hoodie.datasource.hive_sync.table": table_name,
        "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
        "hoodie.datasource.hive_sync.use_jdbc": "false",
        "hoodie.datasource.hive_sync.mode": "hms",
    }

    # These settings enable automatic cleaning of old data
    hudi_cleaner_options = {
        "hoodie.clean.automatic": "true",
        "hoodie.clean.async": "true",
        "hoodie.cleaner.policy": "KEEP_LATEST_FILE_VERSIONS",
        "hoodie.cleaner.fileversions.retained": "3",
        "hoodie.cleaner.parallelism": "200",
        "hoodie.cleaner.commits.retained": "5"
    }

    # These settings enable partitioning of the data
    partition_settings = {
        "hoodie.datasource.write.partitionpath.field": partition_fields,
        "hoodie.datasource.hive_sync.partition_fields": partition_fields,
        "hoodie.datasource.write.hive_style_partitioning": "true",
    }

    hudi_clustering = {
        "hoodie.clustering.execution.strategy.class": "org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy",
        "hoodie.clustering.inline": "true",
        "hoodie.clustering.plan.strategy.sort.columns": clustering_column,
        "hoodie.clustering.plan.strategy.target.file.max.bytes": "1073741824",
        "hoodie.clustering.plan.strategy.small.file.limit": "629145600"
    }

    # Define a dictionary with the index settings for Hudi
    hudi_index_settings = {
        "hoodie.index.type": index_type,  # Specify the index type for Hudi
    }

    # Define a dictionary with the file size settings
    hudi_file_size = {
        "hoodie.parquet.max.file.size": 512 * 1024 * 1024,  # 512MB
        "hoodie.parquet.small.file.limit": 104857600,  # 100MB
    }

    hudi_meta_data_indexing = {
        "hoodie.metadata.enable": "true",
        "hoodie.metadata.index.async": "true",
        "hoodie.metadata.index.column.stats.enable": "true",
        "hoodie.metadata.index.check.timeout.seconds": "60",
        "hoodie.write.concurrency.mode": "optimistic_concurrency_control",
        "hoodie.write.lock.provider": "org.apache.hudi.client.transaction.lock.InProcessLockProvider"
    }

    if enable_meta_data_indexing == True or enable_meta_data_indexing == "True" or enable_meta_data_indexing == "true":
        for key, value in hudi_meta_data_indexing.items():
            hudi_final_settings[key] = value  # Add the key-value pair to the final settings dictionary

    if enable_clustering == True or enable_clustering == "True" or enable_clustering == "true":
        for key, value in hudi_clustering.items():
            hudi_final_settings[key] = value  # Add the key-value pair to the final settings dictionary

    # Add the Hudi index settings to the final settings dictionary
    for key, value in hudi_index_settings.items():
        hudi_final_settings[key] = value  # Add the key-value pair to the final settings dictionary

    for key, value in hudi_file_size.items():
        hudi_final_settings[key] = value  # Add the key-value pair to the final settings dictionary

    # If partitioning is enabled, add the partition settings to the final settings
    if enable_partition == "True" or enable_partition == "true" or enable_partition == True:
        for key, value in partition_settings.items():
            hudi_final_settings[key] = value

    # If data cleaning is enabled, add the cleaner options to the final settings
    if enable_cleaner == "True" or enable_cleaner == "true" or enable_cleaner == True:
        for key, value in hudi_cleaner_options.items():
            hudi_final_settings[key] = value

    # If Hive syncing is enabled, add the Hive sync settings to the final settings
    if enable_hive_sync == "True" or enable_hive_sync == "true" or enable_hive_sync == True:
        for key, value in hudi_hive_sync_settings.items():
            hudi_final_settings[key] = value

    # If there is data to write, apply any SQL transformations and write to the target path
    if spark_df.count() > 0:
        if use_sql_transformer == "True" or use_sql_transformer == "true" or use_sql_transformer == True:
            spark_df.createOrReplaceTempView("temp")
            spark_df = spark.sql(sql_transformer_query)

        spark_df.write.format("hudi"). \
            options(**hudi_final_settings). \
            mode("append"). \
            save(target_path)
In [50]:
upsert_hudi_table(
    glue_database="default",
    table_name="invoices",
    record_id="invoiceid,itemid",
    precomb_key="replicadmstimestamp",
    table_type='COPY_ON_WRITE',
    partition_fields="destinationstate",
    method='upsert',
    index_type='BLOOM',
    enable_partition=True,
    enable_cleaner=True,
    enable_hive_sync=True,
    enable_clustering='False',
    clustering_column='default',
    enable_meta_data_indexing='false',
    use_sql_transformer=False,
    sql_transformer_query="",
    target_path="s3://soumilshah-dev-1995/xmldata/silver/table_name=bronze_invoices",
    spark_df=df,
)
In [52]:
print("done")
done

Read From Hudi Tables

In [53]:
path = "s3://soumilshah-dev-1995/xmldata/silver/table_name=bronze_invoices/"


spark.read.format("hudi") \
    .load(path) \
    .createOrReplaceTempView("hudi_snapshot1")

query = f"SELECT invoiceid FROM hudi_snapshot1 "
print(query)
result = spark.sql(query)


result.show(n=result.count(), truncate=False)
SELECT invoiceid FROM hudi_snapshot1 
+---------+
|invoiceid|
+---------+
|24295    |
|67924    |
|81228    |
|1045     |
|4570     |
+---------+
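
The snapshot view above returns the latest version of every record. Hudi can also serve incremental queries that return only records committed after a given instant; the sketch below is not part of the original notebook and uses a placeholder begin instant, which in practice you would take from the table's commit timeline.

# Sketch (not in the original notebook): an incremental read that returns only
# records committed after the given instant. The begin instant is a placeholder.
incremental_df = (
    spark.read.format("hudi")
    .option("hoodie.datasource.query.type", "incremental")
    .option("hoodie.datasource.read.begin.instanttime", "20240618000000000")
    .load(path)
)
incremental_df.select("invoiceid", "_hoodie_commit_time").show(truncate=False)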

Execute Stored Procedures

In [54]:
query = "call show_commits(table => 'default.invoices', limit => 10);"

spark.sql(query).show()
+-----------------+-------------------+-----------------+-------------------+------------------------+---------------------+----------------------------+------------+
|      commit_time|total_bytes_written|total_files_added|total_files_updated|total_partitions_written|total_records_written|total_update_records_written|total_errors|
+-----------------+-------------------+-----------------+-------------------+------------------------+---------------------+----------------------------+------------+
|20240618211626533|            2184809|                5|                  0|                       5|                    5|                           0|           0|
+-----------------+-------------------+-----------------+-------------------+------------------------+---------------------+----------------------------+------------+
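
The commit_time returned by show_commits can also drive a time-travel read. The sketch below was not part of the original notebook; it assumes Hudi's as.of.instant read option and reuses the commit instant from the output above.

# Sketch (assumes Hudi's "as.of.instant" time-travel read option): read the table
# as it existed at the commit reported by show_commits above.
time_travel_df = (
    spark.read.format("hudi")
    .option("as.of.instant", "20240618211626533")
    .load("s3://soumilshah-dev-1995/xmldata/silver/table_name=bronze_invoices/")
)
time_travel_df.select("invoiceid").show()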