Tuesday, March 12, 2024

How to Properly Handle Updates and Deletes in Your Glue Hudi Jobs When Running in UPSERT Mode with _hoodie_is_deleted Flag

Step 1: Define Imports

In [15]:
try:
    import os
    import sys
    import uuid
    import random
    import pyspark
    from datetime import datetime
    from pyspark.sql import SparkSession
    from pyspark import SparkConf, SparkContext
    from pyspark.sql.types import StructType, StructField, StringType
    from pyspark.sql.functions import lit
    from faker import Faker
    import pandas as pd  # Pandas for pretty printing
    print("Imports loaded ")
except Exception as e:
    print("error", e)
Imports loaded 
In [16]:
os.environ['JAVA_HOME'] = '/opt/homebrew/opt/openjdk@11'

Step 2: Define Spark Session

In [17]:
HUDI_VERSION = '0.14.0'
SPARK_VERSION = '3.4'

SUBMIT_ARGS = f"--packages org.apache.hudi:hudi-spark{SPARK_VERSION}-bundle_2.12:{HUDI_VERSION} pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable

spark = SparkSession.builder \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
    .config('className', 'org.apache.hudi') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .getOrCreate()
In [18]:
spark
Out[18]:
SparkSession - in-memory
SparkContext
Version: v3.4.0
Master: local[*]
AppName: pyspark-shell
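
As a quick sanity check (not part of the original notebook), you can confirm that the Hudi SQL extension was actually picked up before writing any data:

# Optional sanity check: the value should show HoodieSparkSessionExtension
spark.sql("SET spark.sql.extensions").show(truncate=False)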

Step 3: Define Sample Dataset

In [19]:
# Define customer data
customers_data = [
    ("1", "John Doe", "New York", "john@example.com", "2022-01-01", "123 Main St", "NY", "I"),
    ("2", "Jane Smith", "Los Angeles", "jane@example.com", "2022-01-02", "456 Elm St", "CA", "I"),
    ("3", "Alice Johnson", "Chicago", "alice@example.com", "2022-01-03", "789 Oak St", "IL", "I")
]

customers_schema = StructType([
    StructField("customer_id", StringType(), nullable=False),
    StructField("name", StringType(), nullable=True),
    StructField("city", StringType(), nullable=False),
    StructField("email", StringType(), nullable=False),
    StructField("created_at", StringType(), nullable=False),
    StructField("address", StringType(), nullable=False),
    StructField("state", StringType(), nullable=False),
    StructField("OP", StringType(), nullable=False)
])

# Create DataFrame
customers_df = spark.createDataFrame(data=customers_data, schema=customers_schema)
# customers_df = customers_df.withColumn("_hoodie_is_deleted", lit(False))

# Show DataFrame
customers_df.show()
+-----------+-------------+-----------+-----------------+----------+-----------+-----+---+
|customer_id|         name|       city|            email|created_at|    address|state| OP|
+-----------+-------------+-----------+-----------------+----------+-----------+-----+---+
|          1|     John Doe|   New York| john@example.com|2022-01-01|123 Main St|   NY|  I|
|          2|   Jane Smith|Los Angeles| jane@example.com|2022-01-02| 456 Elm St|   CA|  I|
|          3|Alice Johnson|    Chicago|alice@example.com|2022-01-03| 789 Oak St|   IL|  I|
+-----------+-------------+-----------+-----------------+----------+-----------+-----+---+
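
The OP column mimics a CDC-style operation flag: 'I' for inserts and 'D' for deletes. Before the data reaches Hudi it will be translated into the boolean _hoodie_is_deleted column. As a rough sketch of that mapping (not part of the original notebook):

from pyspark.sql.functions import expr

# Sketch: derive the boolean _hoodie_is_deleted flag from the OP column
customers_df.withColumn("_hoodie_is_deleted", expr("OP = 'D'")).show()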

Step 4: Function to Write Data into Hudi Tables

Hudi's default record payload honors a boolean _hoodie_is_deleted column: when a record arrives through an upsert with the flag set to true, Hudi removes it from the table instead of updating it. The helper below can optionally run a SQL transformer over the incoming batch so that a CDC-style OP column ('I'/'D') is translated into that flag at write time.

In [20]:
def write_to_hudi(spark_df,
                  table_name,
                  db_name,
                  method='upsert',
                  table_type='COPY_ON_WRITE',
                  recordkey='',
                  precombine='',
                  partition_fields='',
                  sql_transformer_query="",
                  use_sql_transformer=False
                 ):

    path = f"file:///Users/soumilshah/IdeaProjects/SparkProject/tem/{db_name}/{table_name}"

    hudi_options = {
        'hoodie.table.name': table_name,
        'hoodie.datasource.write.table.type': table_type,
        'hoodie.datasource.write.table.name': table_name,
        'hoodie.datasource.write.operation': method,
        'hoodie.datasource.write.recordkey.field': recordkey,
        'hoodie.datasource.write.precombine.field': precombine,
        'hoodie.datasource.write.partitionpath.field': partition_fields,
    }

    print("\n")
    print(path)
    print("\n")

    # Optionally run a SQL transformer over the incoming batch,
    # e.g. to derive the _hoodie_is_deleted flag from the OP column
    if spark_df.count() > 0:
        if str(use_sql_transformer).lower() == "true":
            spark_df.createOrReplaceTempView("temp")
            spark_df = spark.sql(sql_transformer_query)

    spark_df.write.format("hudi"). \
        options(**hudi_options). \
        mode("append"). \
        save(path)
In [21]:
write_to_hudi(
    spark_df=customers_df,
    db_name="hudidb",
    table_name="customers",
    recordkey="customer_id",
    precombine="created_at",
    partition_fields="state",
    use_sql_transformer=True,
    sql_transformer_query="SELECT *, CASE WHEN OP = 'D' THEN true ELSE false END AS _hoodie_is_deleted FROM temp"
)

file:///Users/soumilshah/IdeaProjects/SparkProject/tem/hudidb/customers
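
The sql_transformer_query is what stamps the _hoodie_is_deleted flag onto each record at write time, so the source DataFrame never has to carry the column itself. A rough equivalent of what the helper runs internally, shown only for illustration:

# Illustration: the transform applied inside write_to_hudi before saving
customers_df.createOrReplaceTempView("temp")
spark.sql(
    "SELECT *, CASE WHEN OP = 'D' THEN true ELSE false END AS _hoodie_is_deleted FROM temp"
).printSchema()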


Step 5: Read From the Hudi Table

In [22]:
path = "file:///Users/soumilshah/IdeaProjects/SparkProject/tem/hudidb/customers"
spark.read.format("hudi").load(path).createOrReplaceTempView("hudi_snapshot")
spark.sql("select customer_id from hudi_snapshot").show()
print("\n")
+-----------+
|customer_id|
+-----------+
|          3|
|          2|
|          1|
+-----------+
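
Because the transformer added _hoodie_is_deleted as an ordinary column, it is stored in the table and can be inspected directly; at this point every row should carry false. A quick check (not in the original notebook):

spark.sql("select customer_id, OP, _hoodie_is_deleted from hudi_snapshot").show()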



Step 6: Deletes Arriving

In [23]:
# Define customer data
delete_customers_data = [
    ("1", "John Doe", "New York", "john@example.com", "2022-01-01", "123 Main St", "NY", "D"),
]
delete_customers_df = spark.createDataFrame(data=delete_customers_data, schema=customers_schema)
delete_customers_df.show()
+-----------+--------+--------+----------------+----------+-----------+-----+---+
|customer_id|    name|    city|           email|created_at|    address|state| OP|
+-----------+--------+--------+----------------+----------+-----------+-----+---+
|          1|John Doe|New York|john@example.com|2022-01-01|123 Main St|   NY|  D|
+-----------+--------+--------+----------------+----------+-----------+-----+---+

In [24]:
write_to_hudi(
    spark_df=delete_customers_df,
    db_name="hudidb",
    table_name="customers",
    recordkey="customer_id",
    precombine="created_at",
    partition_fields="state",
    use_sql_transformer=True,
    sql_transformer_query="SELECT *, CASE WHEN OP = 'D' THEN true ELSE false END AS _hoodie_is_deleted FROM temp"
)

file:///Users/soumilshah/IdeaProjects/SparkProject/tem/hudidb/customers
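
If an incoming batch already carries the flag, the SQL transformer step is unnecessary: the same delete can be issued by setting _hoodie_is_deleted directly. A sketch under the same table settings (not part of the original notebook):

# Sketch: set the boolean flag directly instead of deriving it from OP
hard_flagged_df = delete_customers_df.withColumn("_hoodie_is_deleted", lit(True))

write_to_hudi(
    spark_df=hard_flagged_df,
    db_name="hudidb",
    table_name="customers",
    recordkey="customer_id",
    precombine="created_at",
    partition_fields="state",
    use_sql_transformer=False
)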


Step 7: Read After the Delete

In [25]:
path = "file:///Users/soumilshah/IdeaProjects/SparkProject/tem/hudidb/customers"
spark.read.format("hudi").load(path).createOrReplaceTempView("hudi_snapshot")
spark.sql("select customer_id from hudi_snapshot").show()
print("\n")
+-----------+
|customer_id|
+-----------+
|          3|
|          2|
+-----------+
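
Customer 1 no longer appears in the snapshot: because the record arrived through an upsert with _hoodie_is_deleted set to true, Hudi dropped it instead of updating it. A final check (not in the original notebook) confirms the key is gone:

# Verify the deleted key is no longer present in the snapshot
spark.sql("select count(*) as remaining from hudi_snapshot where customer_id = '1'").show()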


