Thursday, January 25, 2024

Learn How to Use the Apache Hudi Data Skipping Feature | Hands-on Lab


Define Imports

In [1]:
try:
    import os
    import sys
    import uuid
    import random
    from datetime import datetime  # the class, used for the created_at timestamps

    import pandas as pd  # Pandas, for pretty printing
    import pyspark
    from pyspark.sql import SparkSession
    from pyspark import SparkConf, SparkContext
    from faker import Faker

    print("Imports loaded ")

except Exception as e:
    print("error", e)
Imports loaded 

Create Spark Session

In [2]:
HUDI_VERSION = '0.14.0'
SPARK_VERSION = '3.4'

SUBMIT_ARGS = f"--packages org.apache.hudi:hudi-spark{SPARK_VERSION}-bundle_2.12:{HUDI_VERSION} pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable

spark = SparkSession.builder \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .getOrCreate()
Ivy Default Cache set to: /Users/soumilshah/.ivy2/cache
The jars for the packages stored in: /Users/soumilshah/.ivy2/jars
org.apache.hudi#hudi-spark3.4-bundle_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-c15d1315-ed47-4c52-a40b-159d4f16824e;1.0
	confs: [default]
:: loading settings :: url = jar:file:/opt/homebrew/lib/python3.11/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml
	found org.apache.hudi#hudi-spark3.4-bundle_2.12;0.14.0 in spark-list
:: resolution report :: resolve 58ms :: artifacts dl 2ms
	:: modules in use:
	org.apache.hudi#hudi-spark3.4-bundle_2.12;0.14.0 from spark-list in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   1   |   0   |   0   |   0   ||   1   |   0   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-c15d1315-ed47-4c52-a40b-159d4f16824e
	confs: [default]
	0 artifacts copied, 1 already retrieved (0kB/2ms)
24/01/25 17:32:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
In [3]:
spark
Out[3]:

SparkSession - in-memory

SparkContext

Spark UI

Version
v3.4.0
Master
local[*]
AppName
pyspark-shell

Data generator class

In [4]:
# Shared Faker instance used by the generator below
faker = Faker()


def get_customer_data(total_customers=2):
    """Return a list of dicts, one fake customer record per entry."""
    customers_array = []
    for _ in range(total_customers):
        customer_data = {
            "customer_id": str(uuid.uuid4()),
            "name": faker.name(),
            "state": faker.state(),
            "city": faker.city(),
            "email": faker.email(),
            "created_at": datetime.now().isoformat(),
            "address": faker.address(),
            "salary": faker.random_int(min=30000, max=100000)
        }
        customers_array.append(customer_data)
    return customers_array

Generate Fake Data

In [5]:
total_customers = 500000
customer_data = get_customer_data(total_customers=total_customers)

Create Spark DF

In [6]:
spark_df_customers = spark.createDataFrame(data=[tuple(i.values()) for i in customer_data],
                                           schema=list(customer_data[0].keys()))
spark_df_customers.show(3)
24/01/25 17:24:20 WARN TaskSetManager: Stage 0 contains a task of very large size (7315 KiB). The maximum recommended task size is 1000 KiB.
[Stage 0:>                                                          (0 + 1) / 1]
+--------------------+----------------+--------+-----------------+--------------------+--------------------+--------------------+------+
|         customer_id|            name|   state|             city|               email|          created_at|             address|salary|
+--------------------+----------------+--------+-----------------+--------------------+--------------------+--------------------+------+
|39c68a46-048f-41e...|Danielle Jackson|Virginia|South Josephmouth|amydillon@example...|2024-01-25T17:22:...|07310 Gomez Pine ...| 41297|
|0f4114d0-4452-401...| Michelle Harris| Georgia|   West Marthaton|rodgersandrea@exa...|2024-01-25T17:22:...|700 William Park\...| 44465|
|d71f9a48-df45-4fb...|    Rita Esparza|Oklahoma| West Roberthaven|lisahall@example.net|2024-01-25T17:22:...|61135 Morgan Summ...| 73886|
+--------------------+----------------+--------+-----------------+--------------------+--------------------+--------------------+------+
only showing top 3 rows

24/01/25 17:24:25 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 0 (TID 0): Attempting to kill Python Worker
                                                                                

Define Hudi Settings

In [7]:
def write_to_hudi(spark_df,
                  table_name,
                  db_name,
                  method='upsert',
                  table_type='COPY_ON_WRITE',
                  recordkey='',
                  precombine='',
                  partition_fields='',
                  metadata_column_stats=''):

    path = f"file:///Users/soumilshah/IdeaProjects/SparkProject/tem/{db_name}/{table_name}"

    hudi_options = {
        'hoodie.table.name': table_name,
        'hoodie.datasource.write.table.type': table_type,
        'hoodie.datasource.write.table.name': table_name,
        'hoodie.datasource.write.operation': method,
        'hoodie.datasource.write.recordkey.field': recordkey,
        'hoodie.datasource.write.precombine.field': precombine,
        'hoodie.datasource.write.partitionpath.field': partition_fields,

        # Metadata table, column stats index, and data skipping
        'hoodie.enable.data.skipping': 'true',
        'hoodie.metadata.enable': 'true',
        'hoodie.metadata.index.column.stats.enable': 'true',
        'hoodie.metadata.index.column.stats.column.list': metadata_column_stats,

        # Optimistic concurrency control with an in-process lock provider
        'hoodie.write.concurrency.mode': 'optimistic_concurrency_control',
        'hoodie.write.lock.provider': 'org.apache.hudi.client.transaction.lock.InProcessLockProvider'
    }

    print("\n")
    print(path)
    print("\n")
    
    spark_df.write.format("hudi"). \
        options(**hudi_options). \
        mode("append"). \
        save(path)
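
The writer above passes empty strings for any field the caller leaves out. As a minimal sketch of one alternative, the hypothetical helper below (build_hudi_options is not part of Hudi or of the original notebook) assembles a subset of the same option keys and drops any option whose value is an empty string, so that Hudi's own defaults apply instead. Whether an omitted option behaves identically to an empty one is an assumption worth checking against your Hudi version.

```python
def build_hudi_options(table_name, recordkey="", precombine="", partition_fields="",
                       metadata_column_stats="", method="upsert",
                       table_type="COPY_ON_WRITE"):
    """Hypothetical helper: build Hudi write options, dropping empty-string values."""
    candidates = {
        "hoodie.table.name": table_name,
        "hoodie.datasource.write.table.type": table_type,
        "hoodie.datasource.write.operation": method,
        "hoodie.datasource.write.recordkey.field": recordkey,
        "hoodie.datasource.write.precombine.field": precombine,
        "hoodie.datasource.write.partitionpath.field": partition_fields,
        "hoodie.metadata.enable": "true",
        "hoodie.metadata.index.column.stats.enable": "true",
        "hoodie.metadata.index.column.stats.column.list": metadata_column_stats,
    }
    # Keep only the options that actually carry a value.
    return {key: value for key, value in candidates.items() if value != ""}


options = build_hudi_options(
    table_name="customers",
    recordkey="customer_id",
    precombine="created_at",
    partition_fields="state",
    metadata_column_stats="salary",
)
for key, value in sorted(options.items()):
    print(f"{key} = {value}")
```

The returned dict can be passed to the DataFrame writer with options(**options), in the same way write_to_hudi does.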

Create Hudi tables

In [8]:
write_to_hudi(
    spark_df=spark_df_customers,
    db_name="hudidb",
    table_name="customers",
    recordkey="customer_id",
    precombine="created_at",
    partition_fields="state",
    metadata_column_stats="salary"
)

file:///Users/soumilshah/IdeaProjects/SparkProject/tem/hudidb/customers


24/01/25 17:27:29 WARN DFSPropertiesConfiguration: Cannot find HUDI_CONF_DIR, please set it as the dir of hudi-defaults.conf
24/01/25 17:27:29 WARN DFSPropertiesConfiguration: Properties file file:/etc/hudi/conf/hudi-defaults.conf not found. Ignoring to load props file
24/01/25 17:27:29 WARN TaskSetManager: Stage 1 contains a task of very large size (7315 KiB). The maximum recommended task size is 1000 KiB.
24/01/25 17:27:33 WARN MemoryStore: Not enough space to cache rdd_14_5 in memory! (computed 1549.0 KiB so far)
24/01/25 17:27:33 WARN MemoryStore: Not enough space to cache rdd_14_10 in memory! (computed 2.3 MiB so far)
24/01/25 17:27:33 WARN MemoryStore: Not enough space to cache rdd_14_4 in memory! (computed 2.3 MiB so far)
24/01/25 17:27:33 WARN MemoryStore: Not enough space to cache rdd_14_0 in memory! (computed 3.4 MiB so far)
24/01/25 17:27:33 WARN MemoryStore: Not enough space to cache rdd_14_2 in memory! (computed 1548.5 KiB so far)
24/01/25 17:27:33 WARN BlockManager: Persisting block rdd_14_2 to disk instead.
24/01/25 17:27:33 WARN MemoryStore: Not enough space to cache rdd_14_9 in memory! (computed 1547.4 KiB so far)
24/01/25 17:27:33 WARN BlockManager: Persisting block rdd_14_9 to disk instead.
24/01/25 17:27:33 WARN BlockManager: Persisting block rdd_14_5 to disk instead.
24/01/25 17:27:33 WARN BlockManager: Persisting block rdd_14_0 to disk instead.
24/01/25 17:27:33 WARN BlockManager: Persisting block rdd_14_4 to disk instead.
24/01/25 17:27:33 WARN BlockManager: Persisting block rdd_14_10 to disk instead.
24/01/25 17:27:33 WARN MemoryStore: Not enough space to cache rdd_14_6 in memory! (computed 2.3 MiB so far)
24/01/25 17:27:33 WARN BlockManager: Persisting block rdd_14_6 to disk instead.
24/01/25 17:27:33 WARN MemoryStore: Not enough space to cache rdd_14_1 in memory! (computed 2.3 MiB so far)
24/01/25 17:27:33 WARN BlockManager: Persisting block rdd_14_1 to disk instead.
24/01/25 17:27:33 WARN MemoryStore: Not enough space to cache rdd_14_8 in memory! (computed 2.3 MiB so far)
24/01/25 17:27:33 WARN MemoryStore: Not enough space to cache rdd_14_11 in memory! (computed 2.3 MiB so far)
24/01/25 17:27:33 WARN BlockManager: Persisting block rdd_14_11 to disk instead.
24/01/25 17:27:33 WARN BlockManager: Persisting block rdd_14_8 to disk instead.
24/01/25 17:27:33 WARN MemoryStore: Not enough space to cache rdd_14_3 in memory! (computed 2.3 MiB so far)
24/01/25 17:27:33 WARN BlockManager: Persisting block rdd_14_3 to disk instead.
24/01/25 17:27:33 WARN MemoryStore: Not enough space to cache rdd_14_7 in memory! (computed 3.4 MiB so far)
24/01/25 17:27:33 WARN BlockManager: Persisting block rdd_14_7 to disk instead.
                                                                                
# WARNING: Unable to get Instrumentation. Dynamic Attach failed. You may add this JAR as -javaagent manually, or supply -Djdk.attach.allowAttachSelf
[Stage 4:>                                                        (0 + 12) / 50]
# WARNING: Unable to attach Serviceability Agent. Unable to attach even with module exceptions: [org.apache.hudi.org.openjdk.jol.vm.sa.SASupportException: Sense failed., org.apache.hudi.org.openjdk.jol.vm.sa.SASupportException: Sense failed., org.apache.hudi.org.openjdk.jol.vm.sa.SASupportException: Sense failed.]
24/01/25 17:27:35 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-hbase.properties,hadoop-metrics2.properties
24/01/25 17:27:37 WARN MemoryStore: Failed to reserve initial memory threshold of 1024.0 KiB for computing block rdd_31_2 in memory.
24/01/25 17:27:37 WARN MemoryStore: Not enough space to cache rdd_31_2 in memory! (computed 0.0 B so far)
24/01/25 17:27:37 WARN BlockManager: Persisting block rdd_31_2 to disk instead.
24/01/25 17:27:37 WARN MemoryStore: Not enough space to cache rdd_31_6 in memory! (computed 1028.6 KiB so far)
24/01/25 17:27:37 WARN BlockManager: Persisting block rdd_31_6 to disk instead.
                                                                                

Read Data from Hudi

Case A

Without Data Skipping

In [4]:
path = "file:///Users/soumilshah/IdeaProjects/SparkProject/tem/hudidb/customers"
spark.read.format("hudi").load(path).createOrReplaceTempView("hudi_snapshot2")
# Generated salaries fall between 30000 and 100000, so this predicate matches no rows.
spark.sql("SELECT * FROM hudi_snapshot2 WHERE salary >= 50000000").show()
print("\n")
24/01/25 17:29:10 WARN DFSPropertiesConfiguration: Cannot find HUDI_CONF_DIR, please set it as the dir of hudi-defaults.conf
24/01/25 17:29:10 WARN DFSPropertiesConfiguration: Properties file file:/etc/hudi/conf/hudi-defaults.conf not found. Ignoring to load props file
+-------------------+--------------------+------------------+----------------------+-----------------+-----------+----+----+-----+----------+-------+------+-----+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|_hoodie_file_name|customer_id|name|city|email|created_at|address|salary|state|
+-------------------+--------------------+------------------+----------------------+-----------------+-----------+----+----+-----+----------+-------+------+-----+
+-------------------+--------------------+------------------+----------------------+-----------------+-----------+----+----+-----+----------+-------+------+-----+
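
One simple way to compare this query with the data skipping variant in Case B is to measure wall-clock time for each. The sketch below uses a hypothetical helper named timed, which is not part of the original notebook, and a stand-in workload so that it runs without Spark. On a small local table the difference may be dominated by noise, so treat the numbers as a rough indication only.

```python
import time


def timed(label, action):
    """Run a zero-argument callable and return (result, elapsed_seconds)."""
    start = time.perf_counter()
    result = action()
    elapsed = time.perf_counter() - start
    print(f"{label}: {elapsed:.3f}s")
    return result, elapsed


# Stand-in workload so the sketch runs on its own. In the notebook, the callable
# would wrap the query instead, for example:
#   lambda: spark.sql("SELECT * FROM hudi_snapshot2 WHERE salary >= 50000000").collect()
result, elapsed = timed("stand-in workload", lambda: sum(range(1000)))
```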




Case B

With Data Skipping

In [4]:
path = "file:///Users/soumilshah/IdeaProjects/SparkProject/tem/hudidb/customers"

spark.read.format("hudi") \
    .option("hoodie.enable.data.skipping", "true") \
    .option("hoodie.metadata.enable", "true") \
    .option("hoodie.metadata.index.column.stats.enable", "true") \
    .load(path) \
    .createOrReplaceTempView("hudi_snapshot1")

# Same predicate as Case A: no generated salary reaches 50000000, so no rows match.
spark.sql("SELECT * FROM hudi_snapshot1 WHERE salary >= 50000000").show()
24/01/25 17:32:40 WARN DFSPropertiesConfiguration: Cannot find HUDI_CONF_DIR, please set it as the dir of hudi-defaults.conf
24/01/25 17:32:40 WARN DFSPropertiesConfiguration: Properties file file:/etc/hudi/conf/hudi-defaults.conf not found. Ignoring to load props file
# WARNING: Unable to get Instrumentation. Dynamic Attach failed. You may add this JAR as -javaagent manually, or supply -Djdk.attach.allowAttachSelf
# WARNING: Unable to attach Serviceability Agent. Unable to attach even with module exceptions: [org.apache.hudi.org.openjdk.jol.vm.sa.SASupportException: Sense failed., org.apache.hudi.org.openjdk.jol.vm.sa.SASupportException: Sense failed., org.apache.hudi.org.openjdk.jol.vm.sa.SASupportException: Sense failed.]
24/01/25 17:32:42 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-hbase.properties,hadoop-metrics2.properties
+-------------------+--------------------+------------------+----------------------+-----------------+-----------+----+----+-----+----------+-------+------+-----+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|_hoodie_file_name|customer_id|name|city|email|created_at|address|salary|state|
+-------------------+--------------------+------------------+----------------------+-----------------+-----------+----+----+-----+----------+-------+------+-----+
+-------------------+--------------------+------------------+----------------------+-----------------+-----------+----+----+-----+----------+-------+------+-----+
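
The idea behind data skipping with column statistics can be illustrated in plain Python. This is a conceptual sketch only, not Hudi's implementation: the FileStats class, the file names, and the salary ranges are all made up for illustration. Each file carries a minimum and maximum for the indexed column, and a file is read only if its range can contain a matching value.

```python
from dataclasses import dataclass


@dataclass
class FileStats:
    """Made-up per-file statistics for the salary column."""
    file_name: str
    min_salary: int
    max_salary: int


def files_to_scan(stats, lower_bound):
    """Keep only files whose salary range can contain a value >= lower_bound."""
    return [s.file_name for s in stats if s.max_salary >= lower_bound]


stats = [
    FileStats("file-a.parquet", 30000, 55000),
    FileStats("file-b.parquet", 55001, 80000),
    FileStats("file-c.parquet", 80001, 100000),
]

print(files_to_scan(stats, 90000))     # ['file-c.parquet']
print(files_to_scan(stats, 50000000))  # []
```

With the predicate used in this lab (salary >= 50000000), every file's maximum is below the bound, so in this toy model no file needs to be opened at all.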

