Removing Duplicates in Hudi Partitions with the Insert_Overwrite API and Spark SQL
Step 1: Define Imports
In [1]:
try:
    import os
    import sys
    import uuid
    import pyspark
    from pyspark.sql import SparkSession
    from pyspark import SparkConf, SparkContext
    from pyspark.sql.functions import col, asc, desc
    from pyspark.sql.functions import col, to_timestamp, monotonically_increasing_id, to_date, when
    from pyspark.sql.functions import *
    from pyspark.sql.types import *
    from datetime import datetime
    from functools import reduce
    from faker import Faker
except Exception as e:
    print("error", e)
Step 2: Define Spark Session
In [2]:
SUBMIT_ARGS = "--packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.13.0 pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable
spark = SparkSession.builder \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('className', 'org.apache.hudi') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .getOrCreate()
spark
Out[2]:
SparkSession - in-memory
Step 3: Define function to write into Hudi tables
In [20]:
def upsert_hudi_table(glue_database, table_name, record_id, precomb_key, table_type, spark_df, partition_fields,
                      enable_partition, enable_cleaner, enable_hive_sync, enable_clustering,
                      enable_meta_data_indexing,
                      use_sql_transformer, sql_transformer_query,
                      target_path, index_type, method='upsert', clustering_column='default'):
    """
    Upserts a dataframe into a Hudi table.

    Args:
        glue_database (str): The name of the Glue database.
        table_name (str): The name of the Hudi table.
        record_id (str): The name of the field in the dataframe that will be used as the record key.
        precomb_key (str): The name of the field in the dataframe that will be used for pre-combine.
        table_type (str): The Hudi table type (e.g., COPY_ON_WRITE, MERGE_ON_READ).
        spark_df (pyspark.sql.DataFrame): The dataframe to upsert.
        partition_fields (str): The field(s) used to partition the data.
        enable_partition (bool): Whether or not to enable partitioning.
        enable_cleaner (bool): Whether or not to enable data cleaning.
        enable_hive_sync (bool): Whether or not to enable syncing with Hive.
        enable_clustering (bool): Whether or not to enable inline clustering.
        enable_meta_data_indexing (bool): Whether or not to enable the metadata table and column-stats index.
        use_sql_transformer (bool): Whether or not to use SQL to transform the dataframe before upserting.
        sql_transformer_query (str): The SQL query to use for data transformation.
        target_path (str): The path to the target Hudi table.
        index_type (str): The Hudi index type, e.g. BLOOM or GLOBAL_BLOOM.
        method (str): The Hudi write operation to use (default is 'upsert').
        clustering_column (str): The column to sort by when clustering is enabled.

    Returns:
        None
    """
    # These are the basic settings for the Hudi table
    hudi_final_settings = {
        "hoodie.table.name": table_name,
        "hoodie.datasource.write.table.type": table_type,
        "hoodie.datasource.write.operation": method,
        "hoodie.datasource.write.recordkey.field": record_id,
        "hoodie.datasource.write.precombine.field": precomb_key,
    }

    # These settings enable syncing with Hive
    hudi_hive_sync_settings = {
        "hoodie.parquet.compression.codec": "gzip",
        "hoodie.datasource.hive_sync.enable": "true",
        "hoodie.datasource.hive_sync.database": glue_database,
        "hoodie.datasource.hive_sync.table": table_name,
        "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
        "hoodie.datasource.hive_sync.use_jdbc": "false",
        "hoodie.datasource.hive_sync.mode": "hms",
    }

    # These settings enable automatic cleaning of old data
    hudi_cleaner_options = {
        "hoodie.clean.automatic": "true",
        "hoodie.clean.async": "true",
        "hoodie.cleaner.policy": "KEEP_LATEST_FILE_VERSIONS",
        "hoodie.cleaner.fileversions.retained": "3",
        "hoodie.cleaner.parallelism": "200",
        "hoodie.cleaner.commits.retained": 5,
    }

    # These settings enable partitioning of the data
    partition_settings = {
        "hoodie.datasource.write.partitionpath.field": partition_fields,
        "hoodie.datasource.hive_sync.partition_fields": partition_fields,
        "hoodie.datasource.write.hive_style_partitioning": "true",
    }

    # These settings enable inline clustering
    hudi_clustering = {
        "hoodie.clustering.execution.strategy.class": "org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy",
        "hoodie.clustering.inline": "true",
        "hoodie.clustering.plan.strategy.sort.columns": clustering_column,
        "hoodie.clustering.plan.strategy.target.file.max.bytes": "1073741824",
        "hoodie.clustering.plan.strategy.small.file.limit": "629145600",
    }

    # Define a dictionary with the index settings for Hudi
    hudi_index_settings = {
        "hoodie.index.type": index_type,  # Specify the index type for Hudi
    }

    # Define a dictionary with the file size settings
    hudi_file_size = {
        "hoodie.parquet.max.file.size": 512 * 1024 * 1024,  # 512MB
        "hoodie.parquet.small.file.limit": 104857600,  # 100MB
    }

    # These settings enable the metadata table and column-stats indexing
    hudi_meta_data_indexing = {
        "hoodie.metadata.enable": "true",
        "hoodie.metadata.index.async": "false",
        "hoodie.metadata.index.column.stats.enable": "true",
        "hoodie.metadata.index.check.timeout.seconds": "60",
        "hoodie.write.concurrency.mode": "optimistic_concurrency_control",
        "hoodie.write.lock.provider": "org.apache.hudi.client.transaction.lock.InProcessLockProvider",
    }
    # Add the optional settings to the final settings dictionary
    if enable_meta_data_indexing in (True, "True", "true"):
        hudi_final_settings.update(hudi_meta_data_indexing)

    if enable_clustering in (True, "True", "true"):
        hudi_final_settings.update(hudi_clustering)

    # Add the Hudi index and file size settings to the final settings dictionary
    hudi_final_settings.update(hudi_index_settings)
    hudi_final_settings.update(hudi_file_size)

    # If partitioning is enabled, add the partition settings to the final settings
    if enable_partition in (True, "True", "true"):
        hudi_final_settings.update(partition_settings)

    # If data cleaning is enabled, add the cleaner options to the final settings
    if enable_cleaner in (True, "True", "true"):
        hudi_final_settings.update(hudi_cleaner_options)

    # If Hive syncing is enabled, add the Hive sync settings to the final settings
    if enable_hive_sync in (True, "True", "true"):
        hudi_final_settings.update(hudi_hive_sync_settings)

    # If there is data to write, apply any SQL transformation and write to the target path
    if spark_df.count() > 0:
        if use_sql_transformer in (True, "True", "true"):
            spark_df.createOrReplaceTempView("temp")
            spark_df = spark.sql(sql_transformer_query)

        spark_df.write.format("hudi"). \
            options(**hudi_final_settings). \
            mode("append"). \
            save(target_path)
Create Sample Dummy Data
In [22]:
spark_df = spark.createDataFrame(
    data=[
        (1, "insert 1", "2020-01-06 12:12:12", "IN"),
        (2, "insert 2", "2020-01-06 12:12:13", "US"),
        (3, "insert 3", "2020-01-06 12:12:15", "IN"),
        (4, "insert 4", "2020-01-06 12:13:15", "US"),
    ],
    schema=["uuid", "message", "date", "country"])
spark_df.show()
+----+--------+-------------------+-------+
|uuid| message|               date|country|
+----+--------+-------------------+-------+
|   1|insert 1|2020-01-06 12:12:12|     IN|
|   2|insert 2|2020-01-06 12:12:13|     US|
|   3|insert 3|2020-01-06 12:12:15|     IN|
|   4|insert 4|2020-01-06 12:13:15|     US|
+----+--------+-------------------+-------+
In [24]:
upsert_hudi_table(
    glue_database="hudidb",
    table_name="demo",
    record_id="uuid",
    precomb_key="uuid",
    table_type='COPY_ON_WRITE',
    partition_fields="country",
    method='bulk_insert',
    index_type='BLOOM',
    enable_partition=True,
    enable_cleaner=True,
    enable_hive_sync=False,
    enable_clustering='False',
    clustering_column='default',
    enable_meta_data_indexing='true',
    use_sql_transformer=False,
    sql_transformer_query='default',
    target_path="file:///C:/tmp/hudidb/demo/",
    spark_df=spark_df,
)
Inserting Duplicates into a Particular Partition for Teaching Purposes
In [25]:
spark_df = spark.createDataFrame(
    data=[
        (1, "insert 1**", "2020-01-06 12:13:12", "IN"),
    ],
    schema=["uuid", "message", "date", "country"])
spark_df.show()
+----+----------+-------------------+-------+
|uuid|   message|               date|country|
+----+----------+-------------------+-------+
|   1|insert 1**|2020-01-06 12:13:12|     IN|
+----+----------+-------------------+-------+
In [26]:
upsert_hudi_table(
    glue_database="hudidb",
    table_name="demo",
    record_id="uuid",
    precomb_key="uuid",
    table_type='COPY_ON_WRITE',
    partition_fields="country",
    method='bulk_insert',
    index_type='BLOOM',
    enable_partition=True,
    enable_cleaner=True,
    enable_hive_sync=False,
    enable_clustering='False',
    clustering_column='default',
    enable_meta_data_indexing='true',
    use_sql_transformer=False,
    sql_transformer_query='default',
    target_path="file:///C:/tmp/hudidb/demo/",
    spark_df=spark_df,
)
Reading data from Hudi
In [37]:
path = "file:///C:/tmp/hudidb/demo/"
spark.read.format("hudi").load(path).createOrReplaceTempView("hudi_snapshot")
spark.sql("select * from hudi_snapshot").select(
['_hoodie_commit_time',
'_hoodie_partition_path',
'uuid',
'message',
'date',
'country']
).show()
+-------------------+----------------------+----+----------+-------------------+-------+
|_hoodie_commit_time|_hoodie_partition_path|uuid|   message|               date|country|
+-------------------+----------------------+----+----------+-------------------+-------+
|  20230728081025459|            country=IN|   1|insert 1**|2020-01-06 12:13:12|     IN|
|  20230728080901506|            country=US|   4|  insert 4|2020-01-06 12:13:15|     US|
|  20230728080901506|            country=IN|   3|  insert 3|2020-01-06 12:12:15|     IN|
|  20230728080901506|            country=US|   2|  insert 2|2020-01-06 12:12:13|     US|
|  20230728080901506|            country=IN|   1|  insert 1|2020-01-06 12:12:12|     IN|
+-------------------+----------------------+----+----------+-------------------+-------+
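Notice that uuid 1 now appears twice in the country=IN partition. Before fixing anything, it can help to confirm which record keys are duplicated and where. The snippet below is a minimal sketch (not part of the original run) that queries the hudi_snapshot view registered above and lists keys that appear more than once per partition; here it should flag uuid 1 in country=IN.

# Sketch: list record keys that appear more than once within a partition (assumes the hudi_snapshot view above).
spark.sql("""
    SELECT _hoodie_partition_path, uuid, COUNT(*) AS cnt
    FROM hudi_snapshot
    GROUP BY _hoodie_partition_path, uuid
    HAVING COUNT(*) > 1
""").show()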
Creating the Template
The template reads data from the affected partition, deduplicates it, and performs an insert_overwrite
In [46]:
path = "file:///C:/tmp/hudidb/demo/"
partition_name = "country=IN"
hudi_complete_path = f"{path}{partition_name}/"
spark.read.format("hudi").load(hudi_complete_path).createOrReplaceTempView("hudi_snapshot_dedup")
spark.sql("select * from hudi_snapshot_dedup").select(
['_hoodie_commit_time',
'_hoodie_partition_path',
'uuid',
'message',
'date',
'country']
).show()
+-------------------+----------------------+----+----------+-------------------+-------+
|_hoodie_commit_time|_hoodie_partition_path|uuid|   message|               date|country|
+-------------------+----------------------+----+----------+-------------------+-------+
|  20230728081025459|            country=IN|   1|insert 1**|2020-01-06 12:13:12|     IN|
|  20230728080901506|            country=IN|   3|  insert 3|2020-01-06 12:12:15|     IN|
|  20230728080901506|            country=IN|   1|  insert 1|2020-01-06 12:12:12|     IN|
+-------------------+----------------------+----+----------+-------------------+-------+
In [52]:
hudi_record_key = "uuid"

query = f"""
SELECT
    t1.*
FROM
    hudi_snapshot_dedup t1
LEFT JOIN
    hudi_snapshot_dedup t2
ON
    t1.{hudi_record_key} = t2.{hudi_record_key} AND t1._hoodie_commit_time < t2._hoodie_commit_time
WHERE
    t2.{hudi_record_key} IS NULL
"""
dedup_df = spark.sql(query)
dedup_df.select(
    ['_hoodie_commit_time',
     '_hoodie_partition_path',
     'uuid',
     'message',
     'date',
     'country']
).show()
+-------------------+----------------------+----+----------+-------------------+-------+
|_hoodie_commit_time|_hoodie_partition_path|uuid|   message|               date|country|
+-------------------+----------------------+----+----------+-------------------+-------+
|  20230728081025459|            country=IN|   1|insert 1**|2020-01-06 12:13:12|     IN|
|  20230728080901506|            country=IN|   3|  insert 3|2020-01-06 12:12:15|     IN|
+-------------------+----------------------+----+----------+-------------------+-------+
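The left self-join above keeps, for each record key, only the row with the latest _hoodie_commit_time. An equivalent formulation, sketched below against the same hudi_snapshot_dedup view (the window_query name is just illustrative, not part of the original notebook), ranks each key's rows by commit time with ROW_NUMBER() and keeps only the newest one.

# Sketch: dedup via a window function instead of a self-join; keeps the latest commit per record key.
window_query = """
    SELECT _hoodie_commit_time, _hoodie_partition_path, uuid, message, `date`, country
    FROM (
        SELECT *,
               ROW_NUMBER() OVER (PARTITION BY uuid ORDER BY _hoodie_commit_time DESC) AS rn
        FROM hudi_snapshot_dedup
    ) ranked
    WHERE rn = 1
"""
spark.sql(window_query).show()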
Performing the Insert_Overwrite API
In [53]:
upsert_hudi_table(
    glue_database="hudidb",
    table_name="demo",
    record_id="uuid",
    precomb_key="uuid",
    table_type='COPY_ON_WRITE',
    partition_fields="country",
    method='insert_overwrite',
    index_type='BLOOM',
    enable_partition=True,
    enable_cleaner=True,
    enable_hive_sync=False,
    enable_clustering='False',
    clustering_column='default',
    enable_meta_data_indexing='true',
    use_sql_transformer=False,
    sql_transformer_query='default',
    target_path="file:///C:/tmp/hudidb/demo/",
    spark_df=dedup_df,
)
Let's Read from the Hudi Table
In [54]:
path = "file:///C:/tmp/hudidb/demo/"
spark.read.format("hudi").load(path).createOrReplaceTempView("hudi_snapshot")
spark.sql("select * from hudi_snapshot").select(
['_hoodie_commit_time',
'_hoodie_partition_path',
'uuid',
'message',
'date',
'country']
).show()
+-------------------+----------------------+----+----------+-------------------+-------+
|_hoodie_commit_time|_hoodie_partition_path|uuid|   message|               date|country|
+-------------------+----------------------+----+----------+-------------------+-------+
|  20230728082352211|            country=IN|   1|insert 1**|2020-01-06 12:13:12|     IN|
|  20230728082352211|            country=IN|   3|  insert 3|2020-01-06 12:12:15|     IN|
|  20230728080901506|            country=US|   4|  insert 4|2020-01-06 12:13:15|     US|
|  20230728080901506|            country=US|   2|  insert 2|2020-01-06 12:12:13|     US|
+-------------------+----------------------+----+----------+-------------------+-------+
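The insert_overwrite rewrote only the country=IN partition, so uuid 1 now appears exactly once there while the country=US files keep their original commit time. As a final sanity check (a sketch, not part of the original run), you can group the fresh snapshot by partition and record key and look for keys that still occur more than once; an empty result confirms the duplicate is gone. The snapshot_df name is just illustrative, and col comes from the Step 1 imports.

# Sanity check (sketch): after insert_overwrite, each record key should appear once per partition.
snapshot_df = spark.read.format("hudi").load(path)
snapshot_df.groupBy("country", "uuid").count().filter(col("count") > 1).show()
# An empty result means no duplicate record keys remain in any partition.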