Wednesday, June 12, 2024

Hudi Cleaning Process

Three configurations control how much history this Hudi table keeps. One belongs to the cleaner, which deletes old file versions; the other two belong to the archiver, which moves old commits out of the active timeline:

hoodie.keep.min.commits: The minimum number of commits the archiver keeps in the active timeline. With a setting of 21, an archival run leaves at least 21 commits in the active timeline.

hoodie.cleaner.commits.retained: The number of recent commits whose file versions the cleaner leaves untouched. With a setting of 20, the data files needed by the 20 most recent commits are kept, which is also how far back incremental queries can read. Hudi expects this value to be smaller than hoodie.keep.min.commits, hence 20 and 21 here.

hoodie.keep.max.commits: The maximum number of commits allowed in the active timeline. Once the timeline grows past 28 commits, the archiver moves the oldest ones to the archived timeline until only hoodie.keep.min.commits remain.
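
How these three settings interact can be sketched in a few lines of plain Python. This is a simplified model, not Hudi's archiver: the helper names check_retention_settings and simulate_active_timeline are hypothetical, and the real archiver applies further conditions (for example, it takes cleaning and pending operations into account). The sketch assumes the ordering cleaner retained < min < max and that archival trims the active timeline down to the minimum as soon as the maximum is exceeded.

```python
def check_retention_settings(cleaner_retained, keep_min, keep_max):
    """Raise ValueError unless cleaner_retained < keep_min < keep_max (assumed ordering)."""
    if cleaner_retained >= keep_min:
        raise ValueError("hoodie.keep.min.commits must exceed hoodie.cleaner.commits.retained")
    if keep_min >= keep_max:
        raise ValueError("hoodie.keep.max.commits must exceed hoodie.keep.min.commits")


def simulate_active_timeline(num_commits, keep_min, keep_max):
    """Return the active-timeline length after each commit (simplified model)."""
    active = 0
    history = []
    for _ in range(num_commits):
        active += 1                 # a new commit lands on the active timeline
        if active > keep_max:       # archival triggers past the maximum...
            active = keep_min       # ...and trims back to the minimum
        history.append(active)
    return history


check_retention_settings(cleaner_retained=20, keep_min=21, keep_max=28)
history = simulate_active_timeline(num_commits=30, keep_min=21, keep_max=28)
print(history[27], history[28], history[29])  # 28 21 22
```

Under this model, the 29th commit pushes the timeline past 28, archival trims it to 21, and the 30th commit brings it to 22.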

Define Imports

In [1]:
try:
    import os
    import sys
    import uuid
    import random
    from datetime import datetime

    import pandas as pd  # Import Pandas library for pretty printing
    import pyspark
    from faker import Faker
    from pyspark import SparkConf, SparkContext
    from pyspark.sql import SparkSession

    print("Imports loaded ")

except ImportError as e:
    # Fail early: the rest of the notebook needs these modules
    print("error", e)
    raise
Imports loaded 

Create Spark Session

In [2]:
HUDI_VERSION = '0.14.0'
SPARK_VERSION = '3.4'

os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@11"
SUBMIT_ARGS = f"--packages org.apache.hudi:hudi-spark{SPARK_VERSION}-bundle_2.12:{HUDI_VERSION} pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable

# Spark session
spark = SparkSession.builder \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .getOrCreate()
Ivy Default Cache set to: /Users/soumilshah/.ivy2/cache
The jars for the packages stored in: /Users/soumilshah/.ivy2/jars
org.apache.hudi#hudi-spark3.4-bundle_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-088a7307-f9d7-40bd-a9b4-ded2ba9c139c;1.0
	confs: [default]
:: loading settings :: url = jar:file:/opt/homebrew/lib/python3.11/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml
	found org.apache.hudi#hudi-spark3.4-bundle_2.12;0.14.0 in spark-list
:: resolution report :: resolve 59ms :: artifacts dl 2ms
	:: modules in use:
	org.apache.hudi#hudi-spark3.4-bundle_2.12;0.14.0 from spark-list in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   1   |   0   |   0   |   0   ||   1   |   0   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-088a7307-f9d7-40bd-a9b4-ded2ba9c139c
	confs: [default]
	0 artifacts copied, 1 already retrieved (0kB/2ms)
24/06/12 19:31:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).

Helper Methods

In [3]:
def write_to_hudi(spark_df,
                  table_name,
                  db_name,
                  method='upsert',
                  table_type='COPY_ON_WRITE',
                  recordkey='',
                  precombine='',
                  partition_fields='',
                  index_type='BLOOM',
                  curr_region='us-east-1'
                  ):
    """Append spark_df to a local Hudi table using the retention settings under test.

    index_type and curr_region are accepted but not used by this helper.
    """
    path = f"file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database={db_name}/table_name={table_name}"

    hudi_options = {
        'hoodie.table.name': table_name,
        'hoodie.datasource.write.table.type': table_type,
        'hoodie.datasource.write.table.name': table_name,
        'hoodie.datasource.write.operation': method,
        'hoodie.datasource.write.recordkey.field': recordkey,
        'hoodie.datasource.write.precombine.field': precombine,
        'hoodie.datasource.write.partitionpath.field': partition_fields,

        # Archival: keep between 21 and 28 commits in the active timeline
        'hoodie.keep.min.commits': '21',
        'hoodie.keep.max.commits': '28',
        # Cleaning: keep the file versions needed by the last 20 commits
        'hoodie.cleaner.commits.retained': '20',

        # Concurrency control
        'hoodie.write.concurrency.mode': 'optimistic_concurrency_control',
        'hoodie.cleaner.policy.failed.writes': 'LAZY',
        'hoodie.write.lock.provider': 'org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider',
    }

    spark_df.write.format("hudi") \
        .options(**hudi_options) \
        .mode("append") \
        .save(path)
In [28]:
import shutil

# Start from a clean slate: remove any table left over from a previous run
shutil.rmtree("/Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=messages", ignore_errors=True)

Write data

In [29]:
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([
    StructField("id", StringType(), True),
    StructField("message", StringType(), True)
])


# Each iteration writes one new record, producing one commit on the timeline
for i in range(1, 31):  # 30 commits in total
    print("Epoch ", str(i))

    # Create the data
    updated_data = [(str(i), "Batch : {} ".format(i))]

    # Create the DataFrame with the new data
    df = spark.createDataFrame(updated_data, schema)

    # Write to Hudi
    write_to_hudi(
        spark_df=df,
        method="upsert",
        db_name="default",
        table_name="messages",
        recordkey="id",
        precombine="message"
    )
Epoch  1
Epoch  2
Epoch  3
Epoch  4
Epoch  5
Epoch  6
Epoch  7
Epoch  8
Epoch  9
Epoch  10
Epoch  11
Epoch  12
Epoch  13
Epoch  14
Epoch  15
Epoch  16
Epoch  17
Epoch  18
Epoch  19
Epoch  20
Epoch  21
Epoch  22
Epoch  23
Epoch  24
Epoch  25
Epoch  26
Epoch  27
Epoch  28
Epoch  29
Epoch  30
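
To see what the 30 writes left behind, one option is to count the files in the table's .hoodie directory. The sketch below assumes the Hudi 0.14 layout, where completed commits appear as <instant>.commit and completed cleans as <instant>.clean directly under .hoodie, and archived instants live under .hoodie/archived. The helper name summarize_timeline is hypothetical, and other Hudi versions may lay the timeline out differently.

```python
import os
from collections import Counter


def summarize_timeline(table_path):
    """Count completed commit/clean instants and archive files under <table_path>/.hoodie."""
    hoodie_dir = os.path.join(table_path, ".hoodie")
    counts = Counter(commit=0, clean=0)
    for name in os.listdir(hoodie_dir):
        if not os.path.isfile(os.path.join(hoodie_dir, name)):
            continue
        instant, _, action = name.partition(".")
        # Completed instants look like "<digits>.commit" or "<digits>.clean";
        # requested/inflight files carry a different suffix and are skipped.
        if instant.isdigit() and action in ("commit", "clean"):
            counts[action] += 1
    archived_dir = os.path.join(hoodie_dir, "archived")
    counts["archived_files"] = len(os.listdir(archived_dir)) if os.path.isdir(archived_dir) else 0
    return dict(counts)
```

Calling summarize_timeline with the local table directory (the path used in write_to_hudi, without the file:// prefix) returns a dictionary of counts that can be compared against the configured minimum and maximum.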

Read From Hudi

In [4]:
path = "file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=messages"


spark.read.format("hudi") \
    .load(path) \
    .createOrReplaceTempView("hudi_snapshot1")

query = "SELECT _hoodie_commit_time, id, message FROM hudi_snapshot1 "
print(query)
result = spark.sql(query)
result.show(n=result.count(), truncate=False)
24/06/12 19:32:09 WARN DFSPropertiesConfiguration: Cannot find HUDI_CONF_DIR, please set it as the dir of hudi-defaults.conf
24/06/12 19:32:09 WARN DFSPropertiesConfiguration: Properties file file:/etc/hudi/conf/hudi-defaults.conf not found. Ignoring to load props file
SELECT _hoodie_commit_time, id, message FROM hudi_snapshot1 
+-------------------+---+-----------+
|_hoodie_commit_time|id |message    |
+-------------------+---+-----------+
|20240612171241235  |1  |Batch : 1  |
|20240612171243771  |2  |Batch : 2  |
|20240612171245381  |3  |Batch : 3  |
|20240612171246787  |4  |Batch : 4  |
|20240612171248983  |5  |Batch : 5  |
|20240612171250800  |6  |Batch : 6  |
|20240612171252204  |7  |Batch : 7  |
|20240612171254399  |8  |Batch : 8  |
|20240612171256422  |9  |Batch : 9  |
|20240612171258074  |10 |Batch : 10 |
|20240612171300264  |11 |Batch : 11 |
|20240612171301990  |12 |Batch : 12 |
|20240612171303569  |13 |Batch : 13 |
|20240612171305018  |14 |Batch : 14 |
|20240612171307552  |15 |Batch : 15 |
|20240612171309817  |16 |Batch : 16 |
|20240612171311562  |17 |Batch : 17 |
|20240612171313725  |18 |Batch : 18 |
|20240612171316436  |19 |Batch : 19 |
|20240612171318368  |20 |Batch : 20 |
|20240612171320352  |21 |Batch : 21 |
|20240612171323553  |22 |Batch : 22 |
|20240612171326576  |23 |Batch : 23 |
|20240612171328966  |24 |Batch : 24 |
|20240612171332836  |25 |Batch : 25 |
|20240612171335824  |26 |Batch : 26 |
|20240612171340885  |27 |Batch : 27 |
|20240612171343489  |28 |Batch : 28 |
|20240612171346737  |29 |Batch : 29 |
|20240612171349787  |30 |Batch : 30 |
+-------------------+---+-----------+

Incremental Query

In [6]:
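# Earlier than the first commit shown in the snapshot above (20240612171241235)
beginTime = "20240612165445452"

incremental_read_options = {
  'hoodie.datasource.query.type': 'incremental',
  'hoodie.datasource.read.begin.instanttime': beginTime,
  # Fall back to a full table scan if files needed for the incremental read have been cleaned
  "hoodie.datasource.read.incr.fallback.fulltablescan.enable":"true"
}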

IncrementalDF = spark.read.format("hudi"). \
  options(**incremental_read_options). \
  load(path)

IncrementalDF.createOrReplaceTempView("hudi_incremental")
query = "SELECT _hoodie_commit_time, id, message FROM hudi_incremental "

result = spark.sql(query)
result.show(n=result.count(), truncate=False)
+-------------------+---+-----------+
|_hoodie_commit_time|id |message    |
+-------------------+---+-----------+
|20240612171241235  |1  |Batch : 1  |
|20240612171243771  |2  |Batch : 2  |
|20240612171245381  |3  |Batch : 3  |
|20240612171246787  |4  |Batch : 4  |
|20240612171248983  |5  |Batch : 5  |
|20240612171250800  |6  |Batch : 6  |
|20240612171252204  |7  |Batch : 7  |
|20240612171254399  |8  |Batch : 8  |
|20240612171256422  |9  |Batch : 9  |
|20240612171258074  |10 |Batch : 10 |
|20240612171300264  |11 |Batch : 11 |
|20240612171301990  |12 |Batch : 12 |
|20240612171303569  |13 |Batch : 13 |
|20240612171305018  |14 |Batch : 14 |
|20240612171307552  |15 |Batch : 15 |
|20240612171309817  |16 |Batch : 16 |
|20240612171311562  |17 |Batch : 17 |
|20240612171313725  |18 |Batch : 18 |
|20240612171316436  |19 |Batch : 19 |
|20240612171318368  |20 |Batch : 20 |
|20240612171320352  |21 |Batch : 21 |
|20240612171323553  |22 |Batch : 22 |
|20240612171326576  |23 |Batch : 23 |
|20240612171328966  |24 |Batch : 24 |
|20240612171332836  |25 |Batch : 25 |
|20240612171335824  |26 |Batch : 26 |
|20240612171340885  |27 |Batch : 27 |
|20240612171343489  |28 |Batch : 28 |
|20240612171346737  |29 |Batch : 29 |
|20240612171349787  |30 |Batch : 30 |
+-------------------+---+-----------+
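
Why does this incremental query return every record? Hudi instant times are timestamps in yyyyMMddHHmmssSSS form, so they can be parsed and compared directly. The sketch below (the helper parse_instant is hypothetical, not a Hudi API) parses the begin time used above and the first commit time from the output to show that the query starts before the table's first commit.

```python
from datetime import datetime


def parse_instant(instant):
    """Parse a Hudi instant time such as '20240612171241235' into a datetime."""
    # %f reads the trailing three digits as a fraction of a second (milliseconds here)
    return datetime.strptime(instant, "%Y%m%d%H%M%S%f")


begin_time = parse_instant("20240612165445452")
first_commit = parse_instant("20240612171241235")

print(begin_time < first_commit)                    # True
print((first_commit - begin_time).total_seconds())  # 1075.783
```

The begin time is roughly 18 minutes older than the first commit, so all 30 commits fall inside the requested range.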
