Saturday, March 30, 2024

Record Level Indexing in Apache Hudi Delivers 70% Faster Point Lookups

Untitled2

Record Level Indexing in Apache Hudi Delivers 70% Faster Point Lookups

Define Tables

In [1]:
try:
    import os
    import sys
    import uuid
    import pyspark
    import datetime
    from pyspark.sql import SparkSession
    from pyspark import SparkConf, SparkContext
    from faker import Faker
    import datetime
    from datetime import datetime
    import random 
    import pandas as pd  # Import Pandas library for pretty printing

    print("Imports loaded ")

except Exception as e:
    print("error", e)
Imports loaded 

Define Spark Session

In [2]:
HUDI_VERSION = '1.0.0-beta1'
SPARK_VERSION = '3.4'

os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@11"
SUBMIT_ARGS = f"--packages org.apache.hudi:hudi-spark{SPARK_VERSION}-bundle_2.12:{HUDI_VERSION} pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable

# Spark session
spark = SparkSession.builder \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
    .config('className', 'org.apache.hudi') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .getOrCreate()
Warning: Ignoring non-Spark config property: className
Ivy Default Cache set to: /Users/soumilshah/.ivy2/cache
The jars for the packages stored in: /Users/soumilshah/.ivy2/jars
org.apache.hudi#hudi-spark3.4-bundle_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-9ab02712-3901-40ef-8adb-b8dc3c981acf;1.0
	confs: [default]
:: loading settings :: url = jar:file:/opt/homebrew/lib/python3.11/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml
	found org.apache.hudi#hudi-spark3.4-bundle_2.12;1.0.0-beta1 in central
:: resolution report :: resolve 57ms :: artifacts dl 1ms
	:: modules in use:
	org.apache.hudi#hudi-spark3.4-bundle_2.12;1.0.0-beta1 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   1   |   0   |   0   |   0   ||   1   |   0   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-9ab02712-3901-40ef-8adb-b8dc3c981acf
	confs: [default]
	0 artifacts copied, 1 already retrieved (0kB/2ms)
24/03/29 18:33:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
In [3]:
spark
Out[3]:

SparkSession - in-memory

SparkContext

Spark UI

Version
v3.4.0
Master
local[*]
AppName
pyspark-shell

Define data generator Class

In [6]:
global faker
faker = Faker()


def get_customer_data(total_customers=2):
    customers_array = []
    for i in range(0, total_customers):
        customer_data = {
            "customer_id": str(uuid.uuid4()),
            "name": faker.name(),
            "state": faker.state(),
            "city": faker.city(),
            "email": faker.email(),
            "created_at": datetime.now().isoformat().__str__(),
            "adqdress": faker.address(),
           "salary": faker.random_int(min=30000, max=100000) 
        }
        customers_array.append(customer_data)
    return customers_array
In [7]:
global total_customers, order_data_sample_size
total_customers = 500000
customer_data = get_customer_data(total_customers=total_customers)

Create Spark DF

In [14]:
spark_df_customers = spark.createDataFrame(data=[tuple(i.values()) for i in customer_data],
                                           schema=list(customer_data[0].keys()))
spark_df_customers.show(1, truncate=False)
spark_df_customers.printSchema()
24/03/29 18:29:06 WARN TaskSetManager: Stage 111 contains a task of very large size (7313 KiB). The maximum recommended task size is 1000 KiB.
[Stage 111:>                                                        (0 + 1) / 1]
+------------------------------------+------------------+-------+--------------+---------------------+--------------------------+------------------------------------------------+------+
|customer_id                         |name              |state  |city          |email                |created_at                |address                                         |salary|
+------------------------------------+------------------+-------+--------------+---------------------+--------------------------+------------------------------------------------+------+
|4229f5df-0b5e-47d9-9271-cbd7083c1a5e|Kimberly Rodriguez|Georgia|North Jennifer|bbautista@example.org|2024-03-29T18:17:50.473035|84391 Sarah Roads\nNew Kathleenchester, AR 19215|78136 |
+------------------------------------+------------------+-------+--------------+---------------------+--------------------------+------------------------------------------------+------+
only showing top 1 row

root
 |-- customer_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- state: string (nullable = true)
 |-- city: string (nullable = true)
 |-- email: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- address: string (nullable = true)
 |-- salary: long (nullable = true)

24/03/29 18:29:10 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 111 (TID 1217): Attempting to kill Python Worker
                                                                                
In [11]:
def write_to_hudi(spark_df, 
                  table_name, 
                  db_name, 
                  method='upsert',
                  table_type='COPY_ON_WRITE',
                  recordkey='',
                  precombine='',
                  partition_fields='',
                  index_type='BLOOM'
                 ):

    path = f"file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database={db_name}/table_name{table_name}"

    hudi_options = {
        'hoodie.table.name': table_name,
        'hoodie.datasource.write.table.type': table_type,
        'hoodie.datasource.write.table.name': table_name,
        'hoodie.datasource.write.operation': method,
        'hoodie.datasource.write.recordkey.field': recordkey,
        'hoodie.datasource.write.precombine.field': precombine,
        "hoodie.datasource.write.partitionpath.field": partition_fields,
         "hoodie.index.type": index_type,
    }

    if index_type == 'RECORD_INDEX':
        hudi_options.update({
            "hoodie.enable.data.skipping": "true",
            "hoodie.metadata.enable": "true",
            "hoodie.metadata.index.column.stats.enable": "true",
            "hoodie.write.concurrency.mode": "optimistic_concurrency_control",
            "hoodie.write.lock.provider": "org.apache.hudi.client.transaction.lock.InProcessLockProvider",
            "hoodie.metadata.record.index.enable": "true"
        })
        

    print("\n")
    print(path)
    print("\n")
    
    spark_df.write.format("hudi"). \
        options(**hudi_options). \
        mode("append"). \
        save(path)

Create Two Hudi tables

Table A

In [12]:
write_to_hudi(
    spark_df=spark_df_customers,
    db_name="default",
    table_name="customers_withouth_rli",
    recordkey="customer_id",
    precombine="created_at",
    partition_fields="state",
    index_type="BLOOM"
)

file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_namecustomers_withouth_rli


24/03/29 18:27:41 WARN DFSPropertiesConfiguration: Cannot find HUDI_CONF_DIR, please set it as the dir of hudi-defaults.conf
24/03/29 18:27:41 WARN DFSPropertiesConfiguration: Properties file file:/etc/hudi/conf/hudi-defaults.conf not found. Ignoring to load props file
24/03/29 18:27:42 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-hbase.properties,hadoop-metrics2.properties
24/03/29 18:27:43 WARN TaskSetManager: Stage 11 contains a task of very large size (7313 KiB). The maximum recommended task size is 1000 KiB.
24/03/29 18:27:46 WARN MemoryStore: Not enough space to cache rdd_39_6 in memory! (computed 2.3 MiB so far)
24/03/29 18:27:46 WARN MemoryStore: Not enough space to cache rdd_39_3 in memory! (computed 2.3 MiB so far)
24/03/29 18:27:46 WARN BlockManager: Persisting block rdd_39_6 to disk instead.
24/03/29 18:27:46 WARN MemoryStore: Not enough space to cache rdd_39_9 in memory! (computed 2.3 MiB so far)
24/03/29 18:27:46 WARN BlockManager: Persisting block rdd_39_9 to disk instead.
24/03/29 18:27:46 WARN MemoryStore: Not enough space to cache rdd_39_8 in memory! (computed 2.3 MiB so far)
24/03/29 18:27:46 WARN BlockManager: Persisting block rdd_39_8 to disk instead.
24/03/29 18:27:46 WARN MemoryStore: Not enough space to cache rdd_39_2 in memory! (computed 1028.1 KiB so far)
24/03/29 18:27:46 WARN BlockManager: Persisting block rdd_39_2 to disk instead.
24/03/29 18:27:46 WARN BlockManager: Persisting block rdd_39_3 to disk instead.
24/03/29 18:27:46 WARN MemoryStore: Not enough space to cache rdd_39_5 in memory! (computed 2.3 MiB so far)
24/03/29 18:27:46 WARN BlockManager: Persisting block rdd_39_5 to disk instead.
24/03/29 18:27:46 WARN MemoryStore: Not enough space to cache rdd_39_4 in memory! (computed 2.3 MiB so far)
24/03/29 18:27:46 WARN MemoryStore: Not enough space to cache rdd_39_10 in memory! (computed 1548.1 KiB so far)
24/03/29 18:27:46 WARN MemoryStore: Not enough space to cache rdd_39_7 in memory! (computed 1548.1 KiB so far)
24/03/29 18:27:46 WARN BlockManager: Persisting block rdd_39_7 to disk instead.
24/03/29 18:27:46 WARN BlockManager: Persisting block rdd_39_10 to disk instead.
24/03/29 18:27:46 WARN MemoryStore: Not enough space to cache rdd_39_0 in memory! (computed 2.3 MiB so far)
24/03/29 18:27:46 WARN BlockManager: Persisting block rdd_39_4 to disk instead.
24/03/29 18:27:46 WARN BlockManager: Persisting block rdd_39_0 to disk instead.
24/03/29 18:27:46 WARN MemoryStore: Not enough space to cache rdd_39_11 in memory! (computed 1548.5 KiB so far)
24/03/29 18:27:46 WARN BlockManager: Persisting block rdd_39_11 to disk instead.
24/03/29 18:27:46 WARN MemoryStore: Not enough space to cache rdd_39_1 in memory! (computed 2.3 MiB so far)
24/03/29 18:27:46 WARN BlockManager: Persisting block rdd_39_1 to disk instead.
24/03/29 18:27:48 WARN MemoryStore: Failed to reserve initial memory threshold of 1024.0 KiB for computing block rdd_56_4 in memory.
24/03/29 18:27:48 WARN MemoryStore: Not enough space to cache rdd_56_4 in memory! (computed 0.0 B so far)
24/03/29 18:27:48 WARN MemoryStore: Failed to reserve initial memory threshold of 1024.0 KiB for computing block rdd_56_2 in memory.
24/03/29 18:27:48 WARN MemoryStore: Not enough space to cache rdd_56_2 in memory! (computed 0.0 B so far)
24/03/29 18:27:48 WARN BlockManager: Persisting block rdd_56_2 to disk instead.
24/03/29 18:27:48 WARN BlockManager: Persisting block rdd_56_4 to disk instead.
24/03/29 18:27:48 WARN MemoryStore: Failed to reserve initial memory threshold of 1024.0 KiB for computing block rdd_56_7 in memory.
24/03/29 18:27:48 WARN MemoryStore: Not enough space to cache rdd_56_7 in memory! (computed 0.0 B so far)
24/03/29 18:27:48 WARN BlockManager: Persisting block rdd_56_7 to disk instead.
24/03/29 18:27:48 WARN MemoryStore: Failed to reserve initial memory threshold of 1024.0 KiB for computing block rdd_56_10 in memory.
24/03/29 18:27:48 WARN MemoryStore: Not enough space to cache rdd_56_10 in memory! (computed 0.0 B so far)
24/03/29 18:27:48 WARN BlockManager: Persisting block rdd_56_10 to disk instead.
24/03/29 18:27:48 WARN MemoryStore: Failed to reserve initial memory threshold of 1024.0 KiB for computing block rdd_56_6 in memory.
24/03/29 18:27:48 WARN MemoryStore: Not enough space to cache rdd_56_6 in memory! (computed 0.0 B so far)
24/03/29 18:27:48 WARN BlockManager: Persisting block rdd_56_6 to disk instead.
24/03/29 18:27:48 WARN MemoryStore: Failed to reserve initial memory threshold of 1024.0 KiB for computing block rdd_56_0 in memory.
24/03/29 18:27:48 WARN MemoryStore: Not enough space to cache rdd_56_0 in memory! (computed 0.0 B so far)
24/03/29 18:27:48 WARN BlockManager: Persisting block rdd_56_0 to disk instead.
24/03/29 18:27:48 WARN MemoryStore: Failed to reserve initial memory threshold of 1024.0 KiB for computing block rdd_56_8 in memory.
24/03/29 18:27:48 WARN MemoryStore: Not enough space to cache rdd_56_8 in memory! (computed 0.0 B so far)
24/03/29 18:27:48 WARN BlockManager: Persisting block rdd_56_8 to disk instead.
24/03/29 18:27:48 WARN MemoryStore: Not enough space to cache rdd_56_1 in memory! (computed 1027.4 KiB so far)
24/03/29 18:27:48 WARN BlockManager: Persisting block rdd_56_1 to disk instead.
24/03/29 18:27:48 WARN MemoryStore: Not enough space to cache rdd_56_5 in memory! (computed 1027.6 KiB so far)
24/03/29 18:27:48 WARN BlockManager: Persisting block rdd_56_5 to disk instead.
24/03/29 18:27:48 WARN MemoryStore: Not enough space to cache rdd_56_9 in memory! (computed 1549.1 KiB so far)
24/03/29 18:27:48 WARN BlockManager: Persisting block rdd_56_9 to disk instead.
24/03/29 18:27:48 WARN MemoryStore: Not enough space to cache rdd_56_11 in memory! (computed 2.3 MiB so far)
24/03/29 18:27:48 WARN BlockManager: Persisting block rdd_56_11 to disk instead.
24/03/29 18:27:48 WARN MemoryStore: Not enough space to cache rdd_56_3 in memory! (computed 3.4 MiB so far)
24/03/29 18:27:48 WARN BlockManager: Persisting block rdd_56_3 to disk instead.
                                                                                
# WARNING: Unable to get Instrumentation. Dynamic Attach failed. You may add this JAR as -javaagent manually, or supply -Djdk.attach.allowAttachSelf
# WARNING: Unable to attach Serviceability Agent. Unable to attach even with module exceptions: [org.apache.hudi.org.openjdk.jol.vm.sa.SASupportException: Sense failed., org.apache.hudi.org.openjdk.jol.vm.sa.SASupportException: Sense failed., org.apache.hudi.org.openjdk.jol.vm.sa.SASupportException: Sense failed.]
                                                                                

Table B

In [13]:
write_to_hudi(
    spark_df=spark_df_customers,
    db_name="default",
    table_name="customers_with_rli",
    recordkey="customer_id",
    precombine="created_at",
    partition_fields="state",
    index_type="RECORD_INDEX"
)

file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_namecustomers_with_rli


24/03/29 18:27:58 WARN SparkMetadataTableRecordIndex: Record index not initialized so falling back to GLOBAL_SIMPLE for tagging records
24/03/29 18:27:58 WARN TaskSetManager: Stage 60 contains a task of very large size (7313 KiB). The maximum recommended task size is 1000 KiB.
24/03/29 18:28:01 WARN MemoryStore: Not enough space to cache rdd_180_67 in memory! (computed 2.3 MiB so far)
24/03/29 18:28:01 WARN BlockManager: Persisting block rdd_180_67 to disk instead.
24/03/29 18:28:01 WARN MemoryStore: Not enough space to cache rdd_180_60 in memory! (computed 3.4 MiB so far)
24/03/29 18:28:01 WARN BlockManager: Persisting block rdd_180_60 to disk instead.
24/03/29 18:28:01 WARN MemoryStore: Not enough space to cache rdd_180_65 in memory! (computed 2.3 MiB so far)
24/03/29 18:28:01 WARN BlockManager: Persisting block rdd_180_65 to disk instead.
24/03/29 18:28:01 WARN MemoryStore: Failed to reserve initial memory threshold of 1024.0 KiB for computing block rdd_180_61 in memory.
24/03/29 18:28:01 WARN MemoryStore: Not enough space to cache rdd_180_61 in memory! (computed 0.0 B so far)
24/03/29 18:28:01 WARN BlockManager: Persisting block rdd_180_61 to disk instead.
24/03/29 18:28:01 WARN MemoryStore: Not enough space to cache rdd_180_70 in memory! (computed 3.4 MiB so far)
24/03/29 18:28:01 WARN BlockManager: Persisting block rdd_180_70 to disk instead.
24/03/29 18:28:01 WARN MemoryStore: Not enough space to cache rdd_180_69 in memory! (computed 1027.9 KiB so far)
24/03/29 18:28:01 WARN BlockManager: Persisting block rdd_180_69 to disk instead.
24/03/29 18:28:01 WARN MemoryStore: Not enough space to cache rdd_180_66 in memory! (computed 3.4 MiB so far)
24/03/29 18:28:01 WARN BlockManager: Persisting block rdd_180_66 to disk instead.
24/03/29 18:28:01 WARN MemoryStore: Not enough space to cache rdd_180_68 in memory! (computed 1547.6 KiB so far)
24/03/29 18:28:01 WARN BlockManager: Persisting block rdd_180_68 to disk instead.
24/03/29 18:28:11 WARN DAGScheduler: Broadcasting large task binary with size 1141.4 KiB
24/03/29 18:28:14 WARN DAGScheduler: Broadcasting large task binary with size 1242.5 KiB
                                                                                

Point Lookup

In [4]:
look_up = "4229f5df-0b5e-47d9-9271-cbd7083c1a5e"
path = "file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_namecustomers_withouth_rli"


spark.read.format("hudi") \
    .load(path) \
    .createOrReplaceTempView("hudi_snapshot")

query = f"SELECT * FROM hudi_snapshot where customer_id='{look_up}'"
print(query)
spark.sql(query).show(truncate=False)
24/03/29 18:33:37 WARN DFSPropertiesConfiguration: Cannot find HUDI_CONF_DIR, please set it as the dir of hudi-defaults.conf
24/03/29 18:33:37 WARN DFSPropertiesConfiguration: Properties file file:/etc/hudi/conf/hudi-defaults.conf not found. Ignoring to load props file
SELECT * FROM hudi_snapshot where customer_id='4229f5df-0b5e-47d9-9271-cbd7083c1a5e'
+-------------------+------------------------+------------------------------------+----------------------+--------------------------------------------------------------------------+------------------------------------+------------------+--------------+---------------------+--------------------------+------------------------------------------------+------+-------+
|_hoodie_commit_time|_hoodie_commit_seqno    |_hoodie_record_key                  |_hoodie_partition_path|_hoodie_file_name                                                         |customer_id                         |name              |city          |email                |created_at                |address                                         |salary|state  |
+-------------------+------------------------+------------------------------------+----------------------+--------------------------------------------------------------------------+------------------------------------+------------------+--------------+---------------------+--------------------------+------------------------------------------------+------+-------+
|20240329182741753  |20240329182741753_17_723|4229f5df-0b5e-47d9-9271-cbd7083c1a5e|Georgia               |84476f51-4133-45d8-a8be-428829ec1c16-0_17-25-212_20240329182741753.parquet|4229f5df-0b5e-47d9-9271-cbd7083c1a5e|Kimberly Rodriguez|North Jennifer|bbautista@example.org|2024-03-29T18:17:50.473035|84391 Sarah Roads\nNew Kathleenchester, AR 19215|78136 |Georgia|
+-------------------+------------------------+------------------------------------+----------------------+--------------------------------------------------------------------------+------------------------------------+------------------+--------------+---------------------+--------------------------+------------------------------------------------+------+-------+

Screenshot 2024-03-29 at 6.35.11 PM.png

WITH RLI as Point lookup

In [5]:
look_up = "4229f5df-0b5e-47d9-9271-cbd7083c1a5e"
path = "file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_namecustomers_with_rli"


spark.read.format("hudi") \
    .option("hoodie.enable.data.skipping", "true") \
    .option("hoodie.metadata.enable", "true") \
    .option("hoodie.metadata.index.column.stats.enable", "true") \
    .option("hoodie.metadata.record.index.enable", "true")  \
    .load(path) \
    .createOrReplaceTempView("hudi_snapshot1")

query = f"SELECT * FROM hudi_snapshot1 where customer_id='{look_up}'"
print(query)
spark.sql(query).show(truncate=False)
SELECT * FROM hudi_snapshot1 where customer_id='4229f5df-0b5e-47d9-9271-cbd7083c1a5e'
# WARNING: Unable to get Instrumentation. Dynamic Attach failed. You may add this JAR as -javaagent manually, or supply -Djdk.attach.allowAttachSelf
# WARNING: Unable to attach Serviceability Agent. Unable to attach even with module exceptions: [org.apache.hudi.org.openjdk.jol.vm.sa.SASupportException: Sense failed., org.apache.hudi.org.openjdk.jol.vm.sa.SASupportException: Sense failed., org.apache.hudi.org.openjdk.jol.vm.sa.SASupportException: Sense failed.]
24/03/29 18:36:43 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-hbase.properties,hadoop-metrics2.properties
+-------------------+-------------------------+------------------------------------+----------------------+--------------------------------------------------------------------------+------------------------------------+------------------+--------------+---------------------+--------------------------+------------------------------------------------+------+-------+
|_hoodie_commit_time|_hoodie_commit_seqno     |_hoodie_record_key                  |_hoodie_partition_path|_hoodie_file_name                                                         |customer_id                         |name              |city          |email                |created_at                |address                                         |salary|state  |
+-------------------+-------------------------+------------------------------------+----------------------+--------------------------------------------------------------------------+------------------------------------+------------------+--------------+---------------------+--------------------------+------------------------------------------------+------+-------+
|20240329182757101  |20240329182757101_17_9377|4229f5df-0b5e-47d9-9271-cbd7083c1a5e|Georgia               |74952959-fb84-4fc7-96d8-eeabb185a9d4-0_17-77-807_20240329182757101.parquet|4229f5df-0b5e-47d9-9271-cbd7083c1a5e|Kimberly Rodriguez|North Jennifer|bbautista@example.org|2024-03-29T18:17:50.473035|84391 Sarah Roads\nNew Kathleenchester, AR 19215|78136 |Georgia|
+-------------------+-------------------------+------------------------------------+----------------------+--------------------------------------------------------------------------+------------------------------------+------------------+--------------+---------------------+--------------------------+------------------------------------------------+------+-------+

image.png#

In [ ]:

Result

1711802716408.png

Conclusion:

The introduction of Record Level Indexing (RLI) brought about notable enhancements in data lookup performance. Despite the minor increase in metadata processing time, the overall impact on lookup speed was remarkable. With RLI, the duration for point lookup improved significantly by 0.5 seconds, representing a 71.4% reduction compared to the lookup without RLI. This substantial improvement underscores the effectiveness of RLI in accelerating data retrieval processes.

Furthermore, the utilization of RLI resulted in a noteworthy reduction in the number of files read, decreasing from 50 to just 1. This reduction indicates a significant enhancement in efficiency, as fewer file scans are required during lookup operations.

While the implementation of RLI introduced a slight overhead in metadata processing, its overall benefits far outweigh this drawback. The profound improvement in lookup speed and the substantial reduction in file scans demonstrate the efficiency and effectiveness of RLI in enhancing data management processes

Learn How to configure your Spark Session to Join Managed (S3 Table Buckets) and Unmanaged Iceberg Tables | Hands on Labs

test-tble-bucket-joins Learn How to configure your Spark Session to Join Managed (S...