Removing Duplicates in Hudi Partitions with Insert_Overwrite API and Spark SQL¶
Step 1: Define Imports¶
In [1]:
try:
    import os
    import sys
    import uuid
    import pyspark
    from pyspark.sql import SparkSession
    from pyspark import SparkConf, SparkContext
    from pyspark.sql.functions import col, asc, desc, to_timestamp, monotonically_increasing_id, to_date, when
    from pyspark.sql.types import *
    from datetime import datetime
    from functools import reduce
    from faker import Faker

except Exception as e:
    print("error", e)
Step 2: Define Spark Session¶
In [2]:
# Pull in the Hudi Spark bundle and make PySpark use the current Python interpreter
SUBMIT_ARGS = "--packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.13.0 pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable

# Hudi requires Kryo serialization; convertMetastoreParquet is disabled so Hudi's
# own input format is used when the table is read through a metastore
spark = SparkSession.builder \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('className', 'org.apache.hudi') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .getOrCreate()
spark
Out[2]:
SparkSession - in-memory
Step 3: Define function to write into Hudi tables¶
In [20]:
def upsert_hudi_table(glue_database, table_name, record_id, precomb_key, table_type, spark_df, partition_fields,
                      enable_partition, enable_cleaner, enable_hive_sync, enable_clustering,
                      enable_meta_data_indexing,
                      use_sql_transformer, sql_transformer_query,
                      target_path, index_type, method='upsert', clustering_column='default'):
    """
    Upserts a dataframe into a Hudi table.
    Args:
        glue_database (str): The name of the glue database.
        table_name (str): The name of the Hudi table.
        record_id (str): The name of the field in the dataframe that will be used as the record key.
        precomb_key (str): The name of the field in the dataframe that will be used for pre-combine.
        table_type (str): The Hudi table type (e.g., COPY_ON_WRITE, MERGE_ON_READ).
        spark_df (pyspark.sql.DataFrame): The dataframe to upsert.
        partition_fields (str): The field(s) used to partition the data.
        enable_partition (bool): Whether or not to enable partitioning.
        enable_cleaner (bool): Whether or not to enable data cleaning.
        enable_hive_sync (bool): Whether or not to enable syncing with Hive.
        use_sql_transformer (bool): Whether or not to use SQL to transform the dataframe before upserting.
        sql_transformer_query (str): The SQL query to use for data transformation.
        target_path (str): The path to the target Hudi table.
        method (str): The Hudi write operation to use (default is 'upsert').
        index_type (str): The Hudi index type (e.g., BLOOM, GLOBAL_BLOOM).
        enable_clustering (bool): Whether or not to enable inline clustering.
        enable_meta_data_indexing (bool): Whether or not to enable the metadata table and column stats index.
        clustering_column (str): The column to sort on when clustering is enabled.
    Returns:
        None
    """
    # These are the basic settings for the Hoodie table
    hudi_final_settings = {
        "hoodie.table.name": table_name,
        "hoodie.datasource.write.table.type": table_type,
        "hoodie.datasource.write.operation": method,
        "hoodie.datasource.write.recordkey.field": record_id,
        "hoodie.datasource.write.precombine.field": precomb_key,
    }
    # These settings enable syncing with Hive
    hudi_hive_sync_settings = {
        "hoodie.parquet.compression.codec": "gzip",
        "hoodie.datasource.hive_sync.enable": "true",
        "hoodie.datasource.hive_sync.database": glue_database,
        "hoodie.datasource.hive_sync.table": table_name,
        "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
        "hoodie.datasource.hive_sync.use_jdbc": "false",
        "hoodie.datasource.hive_sync.mode": "hms",
    }
    # These settings enable automatic cleaning of old data
    hudi_cleaner_options = {
        "hoodie.clean.automatic": "true",
        "hoodie.clean.async": "true",
        "hoodie.cleaner.policy": 'KEEP_LATEST_FILE_VERSIONS',
        "hoodie.cleaner.fileversions.retained": "3",
        "hoodie-conf hoodie.cleaner.parallelism": '200',
        'hoodie.cleaner.commits.retained': 5
    }
    # These settings enable partitioning of the data
    partition_settings = {
        "hoodie.datasource.write.partitionpath.field": partition_fields,
        "hoodie.datasource.hive_sync.partition_fields": partition_fields,
        "hoodie.datasource.write.hive_style_partitioning": "true",
    }
    hudi_clustering = {
        "hoodie.clustering.execution.strategy.class": "org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy",
        "hoodie.clustering.inline": "true",
        "hoodie.clustering.plan.strategy.sort.columns": clustering_column,
        "hoodie.clustering.plan.strategy.target.file.max.bytes": "1073741824",
        "hoodie.clustering.plan.strategy.small.file.limit": "629145600"
    }
    # Define a dictionary with the index settings for Hudi
    hudi_index_settings = {
        "hoodie.index.type": index_type,  # Specify the index type for Hudi
    }
    # Define a dictionary with the file size settings
    hudi_file_size = {
        "hoodie.parquet.max.file.size": 512 * 1024 * 1024,  # 512MB
        "hoodie.parquet.small.file.limit": 104857600,  # 100MB
    }
    hudi_meta_data_indexing = {
        "hoodie.metadata.enable": "true",
        "hoodie.metadata.index.async": "false",
        "hoodie.metadata.index.column.stats.enable": "true",
        "hoodie.metadata.index.check.timeout.seconds": "60",
        "hoodie.write.concurrency.mode": "optimistic_concurrency_control",
        "hoodie.write.lock.provider": "org.apache.hudi.client.transaction.lock.InProcessLockProvider"
    }
    if enable_meta_data_indexing == True or enable_meta_data_indexing == "True" or enable_meta_data_indexing == "true":
        for key, value in hudi_meta_data_indexing.items():
            hudi_final_settings[key] = value  # Add the key-value pair to the final settings dictionary
    if enable_clustering == True or enable_clustering == "True" or enable_clustering == "true":
        for key, value in hudi_clustering.items():
            hudi_final_settings[key] = value  # Add the key-value pair to the final settings dictionary
    # Add the Hudi index settings to the final settings dictionary
    for key, value in hudi_index_settings.items():
        hudi_final_settings[key] = value  # Add the key-value pair to the final settings dictionary
    for key, value in hudi_file_size.items():
        hudi_final_settings[key] = value  # Add the key-value pair to the final settings dictionary
    # If partitioning is enabled, add the partition settings to the final settings
    if enable_partition == "True" or enable_partition == "true" or enable_partition == True:
        for key, value in partition_settings.items(): hudi_final_settings[key] = value
    # If data cleaning is enabled, add the cleaner options to the final settings
    if enable_cleaner == "True" or enable_cleaner == "true" or enable_cleaner == True:
        for key, value in hudi_cleaner_options.items(): hudi_final_settings[key] = value
    # If Hive syncing is enabled, add the Hive sync settings to the final settings
    if enable_hive_sync == "True" or enable_hive_sync == "true" or enable_hive_sync == True:
        for key, value in hudi_hive_sync_settings.items(): hudi_final_settings[key] = value
    # If there is data to write, apply any SQL transformations and write to the target path
    if spark_df.count() > 0:
        if use_sql_transformer == "True" or use_sql_transformer == "true" or use_sql_transformer == True:
            spark_df.createOrReplaceTempView("temp")
            spark_df = spark.sql(sql_transformer_query)
        spark_df.write.format("hudi"). \
            options(**hudi_final_settings). \
            mode("append"). \
            save(target_path)
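As a side note on the flag handling above: the repeated True / 'True' / 'true' comparisons could be collapsed into a single helper. The sketch below is only a possible simplification (the is_enabled name is made up for illustration), not part of the original function:

def is_enabled(flag):
    """Treat True, 'True' and 'true' (any casing) uniformly as enabled."""
    return str(flag).strip().lower() == "true"

# The partition block above could then be guarded with:
# if is_enabled(enable_partition):
#     hudi_final_settings.update(partition_settings)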
Create Sample Dummy Data¶
In [22]:
spark_df = spark.createDataFrame(
    data=[
    (1, "insert 1", "2020-01-06 12:12:12", "IN"),
    (2, "insert 2", "2020-01-06 12:12:13", "US"),
    (3, "insert 3", "2020-01-06 12:12:15", "IN"),
    (4, "insert 4", "2020-01-06 12:13:15", "US"),
], 
    schema=["uuid", "message",  "date", "country"])
spark_df.show()
+----+--------+-------------------+-------+
|uuid| message|               date|country|
+----+--------+-------------------+-------+
|   1|insert 1|2020-01-06 12:12:12|     IN|
|   2|insert 2|2020-01-06 12:12:13|     US|
|   3|insert 3|2020-01-06 12:12:15|     IN|
|   4|insert 4|2020-01-06 12:13:15|     US|
+----+--------+-------------------+-------+
In [24]:
upsert_hudi_table(
    glue_database="hudidb",
    table_name="demo",
    record_id="uuid",
    precomb_key="uuid",
    table_type='COPY_ON_WRITE',
    partition_fields="country",
    method='bulk_insert',
    index_type='BLOOM',
    enable_partition=True,
    enable_cleaner=True,
    enable_hive_sync=False,
    enable_clustering='False',
    clustering_column='default',
    enable_meta_data_indexing='true',
    use_sql_transformer=False,
    sql_transformer_query='default',
    target_path="file:///C:/tmp/hudidb/demo/",
    spark_df=spark_df,
)
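After this bulk_insert, the local table directory should contain the .hoodie metadata folder plus hive-style partition folders (country=IN, country=US), since hive_style_partitioning is enabled. A quick listing sketch, assuming the notebook runs locally on Windows as the file:/// target path suggests (the listing itself is illustrative):

import os

# Peek at the top-level layout of the local Hudi table directory
print(os.listdir("C:/tmp/hudidb/demo/"))
# Expected entries, roughly: ['.hoodie', 'country=IN', 'country=US']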
Inserting a Duplicate into a Particular Partition for Teaching Purposes¶
Because bulk_insert skips the index lookup, writing the same record key (uuid = 1) again adds a second copy to the country=IN partition instead of updating the existing row.
In [25]:
spark_df = spark.createDataFrame(
    data=[
    (1, "insert 1**", "2020-01-06 12:13:12", "IN"),
], 
    schema=["uuid", "message",  "date", "country"])
spark_df.show()
+----+----------+-------------------+-------+
|uuid|   message|               date|country|
+----+----------+-------------------+-------+
|   1|insert 1**|2020-01-06 12:13:12|     IN|
+----+----------+-------------------+-------+
In [26]:
upsert_hudi_table(
    glue_database="hudidb",
    table_name="demo",
    record_id="uuid",
    precomb_key="uuid",
    table_type='COPY_ON_WRITE',
    partition_fields="country",
    method='bulk_insert',
    index_type='BLOOM',
    enable_partition=True,
    enable_cleaner=True,
    enable_hive_sync=False,
    enable_clustering='False',
    clustering_column='default',
    enable_meta_data_indexing='true',
    use_sql_transformer=False,
    sql_transformer_query='default',
    target_path="file:///C:/tmp/hudidb/demo/",
    spark_df=spark_df,
)
Reading Data from Hudi¶
In [37]:
path = "file:///C:/tmp/hudidb/demo/"
spark.read.format("hudi").load(path).createOrReplaceTempView("hudi_snapshot")
spark.sql("select * from hudi_snapshot").select(
['_hoodie_commit_time',
 '_hoodie_partition_path',
 'uuid',
 'message',
 'date',
 'country']
).show()
+-------------------+----------------------+----+----------+-------------------+-------+
|_hoodie_commit_time|_hoodie_partition_path|uuid|   message|               date|country|
+-------------------+----------------------+----+----------+-------------------+-------+
|  20230728081025459|            country=IN|   1|insert 1**|2020-01-06 12:13:12|     IN|
|  20230728080901506|            country=US|   4|  insert 4|2020-01-06 12:13:15|     US|
|  20230728080901506|            country=IN|   3|  insert 3|2020-01-06 12:12:15|     IN|
|  20230728080901506|            country=US|   2|  insert 2|2020-01-06 12:12:13|     US|
|  20230728080901506|            country=IN|   1|  insert 1|2020-01-06 12:12:12|     IN|
+-------------------+----------------------+----+----------+-------------------+-------+
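Note that uuid 1 now appears twice in country=IN. Before deduplicating, it can help to confirm exactly which keys are duplicated; the query below is a minimal sketch against the hudi_snapshot view registered above, not part of the original notebook:

# Hypothetical check: record keys that appear more than once within a partition
spark.sql("""
SELECT _hoodie_partition_path, uuid, COUNT(*) AS cnt
FROM hudi_snapshot
GROUP BY _hoodie_partition_path, uuid
HAVING COUNT(*) > 1
""").show()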
Creating the Dedup Template¶
The template reads data from the affected partition, deduplicates it, and performs an insert_overwrite on that partition¶
In [46]:
path = "file:///C:/tmp/hudidb/demo/"
partition_name = "country=IN"
hudi_complete_path = f"{path}/{partition_name}/"
spark.read.format("hudi").load(hudi_complete_path).createOrReplaceTempView("hudi_snapshot_dedup")
spark.sql("select * from hudi_snapshot_dedup").select(
['_hoodie_commit_time',
 '_hoodie_partition_path',
 'uuid',
 'message',
 'date',
 'country']
).show()
+-------------------+----------------------+----+----------+-------------------+-------+
|_hoodie_commit_time|_hoodie_partition_path|uuid|   message|               date|country|
+-------------------+----------------------+----+----------+-------------------+-------+
|  20230728081025459|            country=IN|   1|insert 1**|2020-01-06 12:13:12|     IN|
|  20230728080901506|            country=IN|   3|  insert 3|2020-01-06 12:12:15|     IN|
|  20230728080901506|            country=IN|   1|  insert 1|2020-01-06 12:12:12|     IN|
+-------------------+----------------------+----+----------+-------------------+-------+
In [52]:
hudi_record_key = "uuid"
query = f"""
SELECT 
    t1.*
FROM 
    hudi_snapshot_dedup t1
LEFT JOIN 
    hudi_snapshot_dedup t2
ON 
    t1.{hudi_record_key} = t2.{hudi_record_key} AND t1._hoodie_commit_time < t2._hoodie_commit_time
WHERE 
    t2.{hudi_record_key} IS NULL
"""
dedup_df = spark.sql(query)
dedup_df.select(
['_hoodie_commit_time',
 '_hoodie_partition_path',
 'uuid',
 'message',
 'date',
 'country']
).show()
+-------------------+----------------------+----+----------+-------------------+-------+
|_hoodie_commit_time|_hoodie_partition_path|uuid|   message|               date|country|
+-------------------+----------------------+----+----------+-------------------+-------+
|  20230728081025459|            country=IN|   1|insert 1**|2020-01-06 12:13:12|     IN|
|  20230728080901506|            country=IN|   3|  insert 3|2020-01-06 12:12:15|     IN|
+-------------------+----------------------+----+----------+-------------------+-------+
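The self-join keeps, for each record key, only the row whose _hoodie_commit_time is not beaten by any other copy. An equivalent formulation, shown purely as an optional alternative (not part of the original notebook), uses a ROW_NUMBER window to rank the copies per key by commit time and keep the newest; unlike the strict self-join it also breaks ties when two copies share the same commit time:

# Alternative dedup sketch using a window function (illustrative only)
dedup_df_alt = spark.sql(f"""
SELECT * FROM (
    SELECT *,
           ROW_NUMBER() OVER (
               PARTITION BY {hudi_record_key}
               ORDER BY _hoodie_commit_time DESC
           ) AS rn
    FROM hudi_snapshot_dedup
) t
WHERE rn = 1
""").drop("rn")
dedup_df_alt.show()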
Performing the insert_overwrite Operation¶
In [53]:
upsert_hudi_table(
    glue_database="hudidb",
    table_name="demo",
    record_id="uuid",
    precomb_key="uuid",
    table_type='COPY_ON_WRITE',
    partition_fields="country",
    method='insert_overwrite',
    index_type='BLOOM',
    enable_partition=True,
    enable_cleaner=True,
    enable_hive_sync=False,
    enable_clustering='False',
    clustering_column='default',
    enable_meta_data_indexing='true',
    use_sql_transformer=False,
    sql_transformer_query='default',
    target_path="file:///C:/tmp/hudidb/demo/",
    spark_df=dedup_df,
)
Let's Read from the Hudi Table¶
In [54]:
path = "file:///C:/tmp/hudidb/demo/"
spark.read.format("hudi").load(path).createOrReplaceTempView("hudi_snapshot")
spark.sql("select * from hudi_snapshot").select(
['_hoodie_commit_time',
 '_hoodie_partition_path',
 'uuid',
 'message',
 'date',
 'country']
).show()
+-------------------+----------------------+----+----------+-------------------+-------+
|_hoodie_commit_time|_hoodie_partition_path|uuid|   message|               date|country|
+-------------------+----------------------+----+----------+-------------------+-------+
|  20230728082352211|            country=IN|   1|insert 1**|2020-01-06 12:13:12|     IN|
|  20230728082352211|            country=IN|   3|  insert 3|2020-01-06 12:12:15|     IN|
|  20230728080901506|            country=US|   4|  insert 4|2020-01-06 12:13:15|     US|
|  20230728080901506|            country=US|   2|  insert 2|2020-01-06 12:12:13|     US|
+-------------------+----------------------+----+----------+-------------------+-------+
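The country=IN partition now holds a single row per uuid, while country=US was left untouched by the insert_overwrite. As a final sanity check, the duplicate test can be repeated with the DataFrame API; this is a minimal sketch, not part of the original notebook (it should return an empty result):

# Sanity check: no record key should appear more than once within a partition
(
    spark.read.format("hudi").load(path)
    .groupBy("_hoodie_partition_path", "uuid")
    .count()
    .filter(col("count") > 1)
    .show()
)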
 