At the core of Apache Hudi's cleaning and archival process are three key configurations:
hoodie.keep.min.commits: This configuration sets the minimum number of commits to keep on the active timeline before older instants are moved to the archived timeline. With a setting of 21, at least 21 commits always remain active; this value must be greater than hoodie.cleaner.commits.retained.¶
hoodie.cleaner.commits.retained: Here we define how many commits the cleaner retains data for. With a setting of 20, the cleaner keeps the file versions needed by the 20 most recent commits and marks older file versions for deletion.¶
hoodie.keep.max.commits: This configuration sets the upper limit for commits on the active timeline. Once the timeline grows past 28 commits, archival kicks in and trims it back down to the minimum, keeping the active timeline small and fast to load.¶
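For quick reference, the three settings translate directly into Hudi write options, as in the minimal sketch below (the values mirror the walkthrough that follows; retention_options is just an illustrative name):
# Minimal sketch: the three retention knobs as Hudi write options
# Note: hoodie.keep.min.commits must be greater than hoodie.cleaner.commits.retained
retention_options = {
    "hoodie.cleaner.commits.retained": "20",  # cleaner keeps file versions needed by the last 20 commits
    "hoodie.keep.min.commits": "21",          # archival keeps at least 21 commits on the active timeline
    "hoodie.keep.max.commits": "28",          # archival trims the timeline once it grows past 28 commits
}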
Define Imports¶
In [1]:
try:
    import os
    import sys
    import uuid
    import random
    import pyspark
    from pyspark.sql import SparkSession
    from pyspark import SparkConf, SparkContext
    from faker import Faker
    from datetime import datetime
    import pandas as pd  # Pandas, handy for pretty printing results
    print("Imports loaded")
except Exception as e:
    print("error", e)
Create Spark Session¶
In [2]:
HUDI_VERSION = '0.14.0'
SPARK_VERSION = '3.4'
os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@11"
SUBMIT_ARGS = f"--packages org.apache.hudi:hudi-spark{SPARK_VERSION}-bundle_2.12:{HUDI_VERSION} pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable
# Spark session
spark = SparkSession.builder \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
    .config('className', 'org.apache.hudi') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .getOrCreate()
Helper Methods¶
In [3]:
def write_to_hudi(spark_df,
                  table_name,
                  db_name,
                  method='upsert',
                  table_type='COPY_ON_WRITE',
                  recordkey='',
                  precombine='',
                  partition_fields='',
                  index_type='BLOOM',
                  curr_region='us-east-1'
                  ):
    path = f"file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database={db_name}/table_name={table_name}"
    hudi_options = {
        'hoodie.table.name': table_name,
        'hoodie.datasource.write.table.type': table_type,
        'hoodie.datasource.write.table.name': table_name,
        'hoodie.datasource.write.operation': method,
        'hoodie.datasource.write.recordkey.field': recordkey,
        'hoodie.datasource.write.precombine.field': precombine,
        "hoodie.datasource.write.partitionpath.field": partition_fields,
        
        "hoodie.keep.min.commits": "21",
        "hoodie.cleaner.commits.retained": "20",
        "hoodie.keep.max.commits": "28",
        "hoodie.write.concurrency.mode": "optimistic_concurrency_control",
        "hoodie.cleaner.policy.failed.writes": "LAZY",
        "hoodie.write.lock.provider": "org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider",
    }
 
    spark_df.write.format("hudi"). \
        options(**hudi_options). \
        mode("append"). \
        save(path)
In [28]:
import shutil

try:
    shutil.rmtree("/Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=messages")
except Exception:
    pass
Write data¶
In [29]:
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([
    StructField("id", StringType(), True),
    StructField("message", StringType(), True)
])

# Loop to generate data and write to Hudi: 30 commits, enough to exceed the retention thresholds above
for i in range(1, 31):  # Number of iterations
    print("Epoch ", str(i))
    # Create the data for this batch
    updated_data = [(str(i), "Batch : {} ".format(i))]
    # Create the DataFrame with the new data
    df = spark.createDataFrame(updated_data, schema)
    # Write to Hudi
    write_to_hudi(
        spark_df=df,
        method="upsert",
        db_name="default",
        table_name="messages",
        recordkey="id",
        precombine="message"
    )
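After the 30 writes, a quick way to see cleaning and archival at work is to look at the table's .hoodie timeline folder on disk. The sketch below assumes the same local path used above and simply counts timeline files by action suffix; it inspects the on-disk layout rather than a public API, so treat it as a debugging aid. You should typically see .clean instants and fewer active commit files than the 30 writes issued.
import os
from collections import Counter

# Local path (no file:// scheme) to the Hudi timeline of the table written above
timeline_dir = "/Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=messages/.hoodie"

# Count timeline files by action suffix (e.g. 'commit', 'clean');
# subfolders like 'archived' or 'metadata' contain no dot and are skipped
actions = Counter(
    name.split(".", 1)[1]
    for name in os.listdir(timeline_dir)
    if "." in name and not name.startswith(".")
)
print(actions)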
Read From Hudi¶
In [4]:
path = "file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=messages"
spark.read.format("hudi") \
    .load(path) \
    .createOrReplaceTempView("hudi_snapshot1")
query = f"SELECT _hoodie_commit_time, id, message FROM hudi_snapshot1 "
print(query)
result = spark.sql(query)
result.show(n=result.count(), truncate=False)
Incremental Query¶
In [6]:
beginTime = "20240612165445452"
incremental_read_options = {
  'hoodie.datasource.query.type': 'incremental',
  'hoodie.datasource.read.begin.instanttime': beginTime,
  "hoodie.datasource.read.incr.fallback.fulltablescan.enable":"true"
}
IncrementalDF = spark.read.format("hudi"). \
  options(**incremental_read_options). \
  load(path)
IncrementalDF.createOrReplaceTempView("hudi_incremental")
query = f"SELECT _hoodie_commit_time, id, message FROM hudi_incremental "
result = spark.sql(query)
result.show(n=result.count(), truncate=False)
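The begin instant above is hard-coded from an earlier run. As a minimal sketch (assuming the hudi_snapshot1 view registered earlier in this session), the begin time can instead be derived from the commit times visible in the snapshot, for example to start the incremental read five commits back, and then the incremental read above can be re-run with that value. Note that this relies on commit times present in the data; for a general solution you would inspect the timeline itself.
# Derive a begin instant from the commit times in the snapshot view instead of hard-coding it
commit_times = [row[0] for row in spark.sql("""
    SELECT DISTINCT _hoodie_commit_time
    FROM hudi_snapshot1
    ORDER BY _hoodie_commit_time DESC
    LIMIT 5
""").collect()]

beginTime = commit_times[-1]  # commits after this instant will be returned by the incremental read
print("Incremental read begin instant:", beginTime)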
 