At the core of Apache Hudi's Cleaning Process are three key configurations:
hoodie.keep.min.commits: The minimum number of commits to keep on the active timeline before older entries are archived. With a setting of 21, at least the 21 most recent commits remain on the active timeline.
hoodie.cleaner.commits.retained: The number of commits whose data file versions the cleaner retains. Setting it to 20 means the cleaner keeps the file versions needed by the 20 most recent commits; hoodie.keep.min.commits is set one higher (21) so that archival never removes timeline entries the cleaner still depends on.
hoodie.keep.max.commits: The upper limit on the number of commits kept on the active timeline. Once more than 28 commits accumulate, the archiver trims the timeline back down, keeping timeline metadata from growing without bound while still leaving flexibility.
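As a quick reference, the same three settings can be expressed as a plain options dictionary (a minimal sketch; the values mirror the ones passed to write_to_hudi later in this post, and the general rule is that hoodie.cleaner.commits.retained stays below hoodie.keep.min.commits, which in turn must not exceed hoodie.keep.max.commits):

cleaning_and_archival_options = {
    # Cleaner: keep the file versions needed by the last 20 commits
    "hoodie.cleaner.commits.retained": "20",
    # Archival: keep at least 21 and at most 28 commits on the active timeline
    "hoodie.keep.min.commits": "21",
    "hoodie.keep.max.commits": "28",
}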
Define Imports
In [1]:
try:
    import os
    import sys
    import uuid
    import random
    import pyspark
    import pandas as pd  # Pandas library for pretty printing
    from pyspark.sql import SparkSession
    from pyspark import SparkConf, SparkContext
    from faker import Faker
    from datetime import datetime

    print("Imports loaded")
except Exception as e:
    print("error", e)
Create Spark Session
In [2]:
HUDI_VERSION = '0.14.0'
SPARK_VERSION = '3.4'
os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@11"
SUBMIT_ARGS = f"--packages org.apache.hudi:hudi-spark{SPARK_VERSION}-bundle_2.12:{HUDI_VERSION} pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable
# Spark session
spark = SparkSession.builder \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
    .config('className', 'org.apache.hudi') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .getOrCreate()
Helper Methods
In [3]:
def write_to_hudi(spark_df,
                  table_name,
                  db_name,
                  method='upsert',
                  table_type='COPY_ON_WRITE',
                  recordkey='',
                  precombine='',
                  partition_fields='',
                  index_type='BLOOM',
                  curr_region='us-east-1'
                  ):
    path = f"file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database={db_name}/table_name={table_name}"

    hudi_options = {
        'hoodie.table.name': table_name,
        'hoodie.datasource.write.table.type': table_type,
        'hoodie.datasource.write.table.name': table_name,
        'hoodie.datasource.write.operation': method,
        'hoodie.datasource.write.recordkey.field': recordkey,
        'hoodie.datasource.write.precombine.field': precombine,
        "hoodie.datasource.write.partitionpath.field": partition_fields,

        # Cleaning and archival settings discussed above
        "hoodie.keep.min.commits": "21",
        "hoodie.cleaner.commits.retained": "20",
        "hoodie.keep.max.commits": "28",

        # Optimistic concurrency control with a filesystem-based lock provider
        "hoodie.write.concurrency.mode": "optimistic_concurrency_control",
        "hoodie.cleaner.policy.failed.writes": "LAZY",
        "hoodie.write.lock.provider": "org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider",
    }

    spark_df.write.format("hudi"). \
        options(**hudi_options). \
        mode("append"). \
        save(path)
In [28]:
import shutil

# Remove any previous copy of the table so the demo starts fresh
try:
    shutil.rmtree("/Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=messages")
except Exception:
    pass
Write data
In [29]:
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([
    StructField("id", StringType(), True),
    StructField("message", StringType(), True)
])

# Loop to generate data and write to Hudi
for i in range(1, 31):  # 30 writes, enough for the cleaning and archival settings above to kick in
    print("Epoch ", str(i))

    # Generate epoch timestamp
    epoch_time = int(datetime.now().timestamp())

    # Create the data
    updated_data = [(str(i), "Batch : {} ".format(i))]

    # Create the DataFrame with the new data
    df = spark.createDataFrame(updated_data, schema)

    # Write to Hudi
    write_to_hudi(
        spark_df=df,
        method="upsert",
        db_name="default",
        table_name="messages",
        recordkey="id",
        precombine="message"
    )
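With 30 commits written against cleaner/archival limits of 20/21/28, both services should have fired. One informal way to confirm this is to list the timeline files under the table's .hoodie folder; this is just a sketch, assuming the same local table path used in write_to_hudi above:

import os

hoodie_dir = "/Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=messages/.hoodie"

# Active timeline: instant files such as <instant>.commit and <instant>.clean,
# plus their .requested / .inflight states (hoodie.properties and folders filtered out).
active_instants = sorted(
    f for f in os.listdir(hoodie_dir)
    if os.path.isfile(os.path.join(hoodie_dir, f)) and f[0].isdigit()
)

# Archived timeline: instants trimmed off the active timeline by the archiver.
archived_dir = os.path.join(hoodie_dir, "archived")
archived_files = sorted(os.listdir(archived_dir)) if os.path.isdir(archived_dir) else []

print("active instant files :", len(active_instants))
print("archived log files   :", len(archived_files))
print(active_instants[-5:])  # the most recent instants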
Read From Hudi
In [4]:
path = "file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=messages"

spark.read.format("hudi") \
    .load(path) \
    .createOrReplaceTempView("hudi_snapshot1")

query = "SELECT _hoodie_commit_time, id, message FROM hudi_snapshot1"
print(query)

result = spark.sql(query)
result.show(n=result.count(), truncate=False)
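The hard-coded beginTime in the next cell appears to be one of the table's commit instants. A small sketch (using the hudi_snapshot1 view registered above) to list the available commit times so one can be picked as the incremental read's starting point:

# List the distinct commit instants present in the snapshot view; any of these
# can be used as the begin instant for the incremental query below.
commits = spark.sql(
    "SELECT DISTINCT _hoodie_commit_time FROM hudi_snapshot1 ORDER BY _hoodie_commit_time"
).collect()

for row in commits:
    print(row["_hoodie_commit_time"])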
Incremental Query
In [6]:
beginTime = "20240612165445452"

incremental_read_options = {
    'hoodie.datasource.query.type': 'incremental',
    'hoodie.datasource.read.begin.instanttime': beginTime,
    # Fall back to a full table scan if the requested instants have already been cleaned or archived
    "hoodie.datasource.read.incr.fallback.fulltablescan.enable": "true"
}

IncrementalDF = spark.read.format("hudi"). \
    options(**incremental_read_options). \
    load(path)

IncrementalDF.createOrReplaceTempView("hudi_incremental")

query = "SELECT _hoodie_commit_time, id, message FROM hudi_incremental"
result = spark.sql(query)
result.show(n=result.count(), truncate=False)