Record Level Indexing in Apache Hudi Delivers 70% Faster Point Lookups¶
Define Imports¶
try:
    import os
    import sys
    import uuid
    import random
    import pyspark
    import pandas as pd  # Pandas is used for pretty printing
    from pyspark.sql import SparkSession
    from pyspark import SparkConf, SparkContext
    from faker import Faker
    from datetime import datetime

    print("Imports loaded")
except Exception as e:
    print("error", e)
Define Spark Session¶
HUDI_VERSION = '1.0.0-beta1'
SPARK_VERSION = '3.4'

os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@11"
SUBMIT_ARGS = f"--packages org.apache.hudi:hudi-spark{SPARK_VERSION}-bundle_2.12:{HUDI_VERSION} pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable

# Spark session
spark = SparkSession.builder \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
    .config('className', 'org.apache.hudi') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .getOrCreate()

spark
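As a quick sanity check (this cell is not part of the original notebook), you can confirm the session came up with the Hudi extension registered before writing any data:

# Optional: verify Spark version and that the Hudi SQL extension is set
print(spark.version)
print(spark.conf.get("spark.sql.extensions"))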
Define Data Generator Function¶
faker = Faker()

def get_customer_data(total_customers=2):
    customers_array = []
    for i in range(0, total_customers):
        customer_data = {
            "customer_id": str(uuid.uuid4()),
            "name": faker.name(),
            "state": faker.state(),
            "city": faker.city(),
            "email": faker.email(),
            "created_at": datetime.now().isoformat(),
            "address": faker.address(),
            "salary": faker.random_int(min=30000, max=100000)
        }
        customers_array.append(customer_data)
    return customers_array

total_customers = 500000
customer_data = get_customer_data(total_customers=total_customers)
Create Spark DF¶
spark_df_customers = spark.createDataFrame(
    data=[tuple(i.values()) for i in customer_data],
    schema=list(customer_data[0].keys()))

spark_df_customers.show(1, truncate=False)
spark_df_customers.printSchema()
def write_to_hudi(spark_df,
                  table_name,
                  db_name,
                  method='upsert',
                  table_type='COPY_ON_WRITE',
                  recordkey='',
                  precombine='',
                  partition_fields='',
                  index_type='BLOOM'
                  ):
    path = f"file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database={db_name}/table_name={table_name}"

    hudi_options = {
        'hoodie.table.name': table_name,
        'hoodie.datasource.write.table.type': table_type,
        'hoodie.datasource.write.table.name': table_name,
        'hoodie.datasource.write.operation': method,
        'hoodie.datasource.write.recordkey.field': recordkey,
        'hoodie.datasource.write.precombine.field': precombine,
        "hoodie.datasource.write.partitionpath.field": partition_fields,
        "hoodie.index.type": index_type,
    }

    if index_type == 'RECORD_INDEX':
        # The record-level index lives in Hudi's internal metadata table, so
        # the metadata table and its record index partition must be enabled
        # on the write side for the index to be built.
        hudi_options.update({
            "hoodie.enable.data.skipping": "true",
            "hoodie.metadata.enable": "true",
            "hoodie.metadata.index.column.stats.enable": "true",
            "hoodie.write.concurrency.mode": "optimistic_concurrency_control",
            "hoodie.write.lock.provider": "org.apache.hudi.client.transaction.lock.InProcessLockProvider",
            "hoodie.metadata.record.index.enable": "true"
        })

    print("\n")
    print(path)
    print("\n")

    spark_df.write.format("hudi"). \
        options(**hudi_options). \
        mode("append"). \
        save(path)
Create Two Hudi tables¶
Table A¶
write_to_hudi(
    spark_df=spark_df_customers,
    db_name="default",
    table_name="customers_without_rli",
    recordkey="customer_id",
    precombine="created_at",
    partition_fields="state",
    index_type="BLOOM"
)
Table B¶
write_to_hudi(
    spark_df=spark_df_customers,
    db_name="default",
    table_name="customers_with_rli",
    recordkey="customer_id",
    precombine="created_at",
    partition_fields="state",
    index_type="RECORD_INDEX"
)
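To confirm the record index was actually built, you can inspect the table's internal metadata directory. This check is not in the original notebook and assumes the default metadata table layout, where each index is materialized as a partition under .hoodie/metadata:

import os

# Hypothetical sanity check: list the metadata table partitions on disk.
# With RECORD_INDEX enabled we expect a 'record_index' folder next to 'files'.
metadata_dir = "/Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=customers_with_rli/.hoodie/metadata"
if os.path.isdir(metadata_dir):
    print(os.listdir(metadata_dir))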
Point Lookup without RLI¶
look_up = "4229f5df-0b5e-47d9-9271-cbd7083c1a5e"

path = "file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=customers_without_rli"

spark.read.format("hudi") \
    .load(path) \
    .createOrReplaceTempView("hudi_snapshot")

query = f"SELECT * FROM hudi_snapshot WHERE customer_id = '{look_up}'"
print(query)
spark.sql(query).show(truncate=False)
Point Lookup with RLI¶
look_up = "4229f5df-0b5e-47d9-9271-cbd7083c1a5e"

path = "file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=customers_with_rli"

# Enable metadata-based data skipping and the record index on the read side
spark.read.format("hudi") \
    .option("hoodie.enable.data.skipping", "true") \
    .option("hoodie.metadata.enable", "true") \
    .option("hoodie.metadata.index.column.stats.enable", "true") \
    .option("hoodie.metadata.record.index.enable", "true") \
    .load(path) \
    .createOrReplaceTempView("hudi_snapshot1")

query = f"SELECT * FROM hudi_snapshot1 WHERE customer_id = '{look_up}'"
print(query)
spark.sql(query).show(truncate=False)
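To reproduce the comparison yourself, a minimal timing sketch along these lines can be used (the time_query helper is illustrative, not part of the original post):

import time

def time_query(view_name, customer_id):
    # Run the point lookup against a registered temp view and report wall-clock time
    query = f"SELECT * FROM {view_name} WHERE customer_id = '{customer_id}'"
    start = time.time()
    rows = spark.sql(query).collect()  # collect() forces full execution
    elapsed = time.time() - start
    print(f"{view_name}: {len(rows)} row(s) in {elapsed:.2f}s")
    return elapsed

t_bloom = time_query("hudi_snapshot", look_up)
t_rli = time_query("hudi_snapshot1", look_up)
print(f"RLI lookup is {(1 - t_rli / t_bloom) * 100:.1f}% faster")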
Result¶
Point lookup without RLI: ~0.7 seconds, 50 files read.
Point lookup with RLI: ~0.2 seconds, 1 file read.
Conclusion¶
Record Level Indexing (RLI) delivered a clear improvement in point lookup performance. Despite a minor increase in metadata processing time, the lookup ran 0.5 seconds faster with RLI, a 71.4% reduction compared to the same query without it.
RLI also cut the number of files read from 50 to just 1, meaning far fewer file scans are needed to answer a point lookup.
While RLI introduces a slight overhead in metadata processing, its benefits far outweigh this cost: the large improvement in lookup speed and the substantial reduction in file scans demonstrate its effectiveness in accelerating data retrieval.