Performance Evaluation of Hudi 0.14 and Spark 3.4.1: Record Level Index vs. Global Bloom & Global Simple Indexes¶
Setup Elastic Stack Locally¶
https://github.com/deviantony/docker-elk
git clone https://github.com/deviantony/docker-elk.git
docker-compose up setup
docker-compose up
http://localhost:5601/app/home#/
username elastic
password changeme
Backend¶
import json
import requests
from flask import Flask, request
from datetime import datetime
app = Flask(__name__)
# Elasticsearch server configuration
elasticsearch_url = "http://localhost:9200"
index_name = "hudi" # Use your desired index name
username = "elastic"
password = "changeme"
@app.route('/', methods=["POST"])
def index():
json_data = request.json
print(json_data)
print(type(json_data))
# Add an "ingestionTime" field with the current datetime
json_data["ingestionTime"] = datetime.now().isoformat()
# Ingest the JSON data into Elasticsearch using a PUT request
put_url = f"{elasticsearch_url}/{index_name}/_doc/{json_data['commitTime']}"
headers = {'Content-Type': 'application/json'}
# Provide Elasticsearch credentials
auth = (username, password)
# Send the PUT request to insert the JSON data
response = requests.put(put_url, data=json.dumps(json_data), headers=headers, auth=auth)
if response.status_code == 201:
return 'Data Ingested'
else:
return f'Failed to insert data. Status code: {response.status_code}'
if __name__ == "__main__":
app.run(debug=True)
Step 1: Define imports¶
In [1]:
try:
import os
import sys
import uuid
import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
from faker import Faker
print("Imports loaded ")
except Exception as e:
print("error",e)
Imports loaded
Step 2: Create Spark Session¶
In [2]:
HUDI_VERSION = '0.14.0'
SPARK_VERSION = '3.4'
SUBMIT_ARGS = f"--packages org.apache.hudi:hudi-spark{SPARK_VERSION}-bundle_2.12:{HUDI_VERSION} pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable
spark = SparkSession.builder \
.config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
.config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
.config('className', 'org.apache.hudi') \
.config('spark.sql.hive.convertMetastoreParquet', 'false') \
.getOrCreate()
Warning: Ignoring non-Spark config property: className
:: loading settings :: url = jar:file:/Users/soumilnitinshah/anaconda3/lib/python3.11/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /Users/soumilnitinshah/.ivy2/cache The jars for the packages stored in: /Users/soumilnitinshah/.ivy2/jars org.apache.hudi#hudi-spark3.4-bundle_2.12 added as a dependency :: resolving dependencies :: org.apache.spark#spark-submit-parent-38b74d3e-4071-4678-908e-f0f675975cd0;1.0 confs: [default] found org.apache.hudi#hudi-spark3.4-bundle_2.12;0.14.0 in central :: resolution report :: resolve 216ms :: artifacts dl 6ms :: modules in use: org.apache.hudi#hudi-spark3.4-bundle_2.12;0.14.0 from central in [default] --------------------------------------------------------------------- | | modules || artifacts | | conf | number| search|dwnlded|evicted|| number|dwnlded| --------------------------------------------------------------------- | default | 1 | 0 | 0 | 0 || 1 | 0 | --------------------------------------------------------------------- :: retrieving :: org.apache.spark#spark-submit-parent-38b74d3e-4071-4678-908e-f0f675975cd0 confs: [default] 0 artifacts copied, 1 already retrieved (0kB/10ms) 23/10/29 14:00:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Data Generator Class¶
In [3]:
import faker
import uuid
import random
global faker
import datetime
import time
faker = Faker()
class DataGenerator(object):
@staticmethod
def get_data(samples):
return [
(
uuid.uuid4().__str__(),
faker.name(),
faker.random_element(elements=('IT', 'HR', 'Sales', 'Marketing')),
faker.state(),
str(faker.random_int(min=10000, max=150000)),
str(faker.random_int(min=18, max=60)),
str(faker.random_int(min=0, max=100000)),
str(faker.unix_time()),
faker.email(),
faker.credit_card_number(card_type='amex'),
random.choice(["2021","2022","2023"]),
faker.month()
) for x in range(samples)
]
Creating Spark DF for Testing with 0.1M samples¶
In [4]:
data = DataGenerator.get_data(samples=100000)
columns = ["emp_id", "employee_name", "department", "state", "salary", "age", "bonus", "ts", "email", "credit_card","year", "month"]
spark_df = spark.createDataFrame(data=data, schema=columns)
spark_df.count()
23/10/29 14:01:07 WARN TaskSetManager: Stage 0 contains a task of very large size (1197 KiB). The maximum recommended task size is 1000 KiB.
Out[4]:
100000
Helper Methods¶
In [5]:
def write_to_hudi(spark_df, table_name, method, index_type='BLOOM'):
path = f"file:///Users/soumilnitinshah/Downloads/hudidb/{table_name}"
hudi_options = {
'hoodie.table.name': table_name,
'hoodie.datasource.write.table.type': 'COPY_ON_WRITE',
'hoodie.datasource.write.table.name': table_name,
'hoodie.datasource.write.operation': method,
'hoodie.datasource.write.recordkey.field': 'emp_id',
'hoodie.datasource.write.precombine.field': 'ts',
'hoodie.datasource.write.partitionpath.field': 'state',
'hoodie.write.commit.callback.http.url': 'http://127.0.0.1:5000',
'hoodie.write.commit.callback.on': 'true',
'hoodie.write.commit.callback.http.timeout.seconds': '300'
}
if index_type == 'RECORD_INDEX':
hudi_options['hoodie.index.type'] = index_type
hudi_options['hoodie.metadata.record.index.enable'] = 'true'
else:
hudi_options['hoodie.index.type'] = index_type
spark_df.write.format("hudi").options(**hudi_options).mode("append").save(path)
def process_and_write_to_hudi(spark_df, table_name, index_type='BLOOM'):
write_to_hudi(spark_df, table_name, method="upsert", index_type=index_type)
def process_and_delete_from_hudi(table_name, condition, index_type='BLOOM'):
path = f"file:///Users/soumilnitinshah/Downloads/hudidb/{table_name}"
spark.read.format("hudi").load(path).createOrReplaceTempView("hudi_snapshot")
delete_df = spark.sql(f"SELECT * FROM hudi_snapshot WHERE {condition}")
write_to_hudi(delete_df, table_name, method="delete", index_type=index_type)
def process_and_update_hudi(table_name, condition, updates, index_type='BLOOM'):
path = f"file:///Users/soumilnitinshah/Downloads/hudidb/{table_name}"
spark.read.format("hudi").load(path).createOrReplaceTempView("hudi_snapshot")
update_df = spark.sql(f"SELECT * FROM hudi_snapshot WHERE {condition}")
# Apply updates to the DataFrame update_df as needed
write_to_hudi(update_df, table_name, method="upsert", index_type=index_type)
Perform Test¶
In [6]:
# Define a list of table configurations
table_configs = [
{"table_name": "global_bloom_index", "index_type": "GLOBAL_BLOOM"},
{"table_name": "global_simple_index", "index_type": "GLOBAL_SIMPLE"},
{"table_name": "record_level_index", "index_type": "RECORD_INDEX"},
]
for config in table_configs:
process_and_write_to_hudi(spark_df, config["table_name"], config["index_type"])
delete_configs = [
{
"table_name": "global_bloom_index",
"condition": "salary >= 50000",
},
{
"table_name": "global_simple_index",
"condition": "salary >= 50000",
},
{
"table_name": "record_level_index",
"condition": "salary >= 50000",
}
]
update_configs = [
{
"table_name": "global_bloom_index",
"condition": "salary >= 5000",
"updates": {"salary": "salary + 100"},
},
{
"table_name": "global_simple_index",
"condition": "salary >= 5000",
"updates": {"salary": "salary + 100"},
},
{
"table_name": "record_level_index",
"condition": "salary >= 5000",
"updates": {"salary": "salary + 100"},
}
]
# Loop through deletion configurations and process/delete from Hudi
for config in delete_configs:
process_and_delete_from_hudi(config["table_name"], config["condition"])
# Loop through update configurations and process/update Hudi
for config in update_configs:
process_and_update_hudi(config["table_name"], config["condition"], config["updates"])
23/10/29 14:01:13 WARN DFSPropertiesConfiguration: Cannot find HUDI_CONF_DIR, please set it as the dir of hudi-defaults.conf 23/10/29 14:01:13 WARN DFSPropertiesConfiguration: Properties file file:/etc/hudi/conf/hudi-defaults.conf not found. Ignoring to load props file 23/10/29 14:01:16 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-hbase.properties,hadoop-metrics2.properties 23/10/29 14:01:17 WARN TaskSetManager: Stage 12 contains a task of very large size (1197 KiB). The maximum recommended task size is 1000 KiB.
# WARNING: Unable to get Instrumentation. Dynamic Attach failed. You may add this JAR as -javaagent manually, or supply -Djdk.attach.allowAttachSelf # WARNING: Unable to attach Serviceability Agent. Unable to attach even with module exceptions: [org.apache.hudi.org.openjdk.jol.vm.sa.SASupportException: Sense failed., org.apache.hudi.org.openjdk.jol.vm.sa.SASupportException: Sense failed., org.apache.hudi.org.openjdk.jol.vm.sa.SASupportException: Sense failed.]
23/10/29 14:01:56 WARN TaskSetManager: Stage 69 contains a task of very large size (1197 KiB). The maximum recommended task size is 1000 KiB. 23/10/29 14:02:19 WARN SparkMetadataTableRecordIndex: Record index not initialized so falling back to GLOBAL_SIMPLE for tagging records 23/10/29 14:02:19 WARN TaskSetManager: Stage 121 contains a task of very large size (1197 KiB). The maximum recommended task size is 1000 KiB. 23/10/29 14:02:39 WARN DAGScheduler: Broadcasting large task binary with size 1123.3 KiB 23/10/29 14:02:41 WARN DAGScheduler: Broadcasting large task binary with size 1223.6 KiB 23/10/29 14:03:51 WARN DAGScheduler: Broadcasting large task binary with size 1134.9 KiB 23/10/29 14:03:52 WARN DAGScheduler: Broadcasting large task binary with size 1235.1 KiB 23/10/29 14:04:52 WARN DAGScheduler: Broadcasting large task binary with size 1137.3 KiB 23/10/29 14:04:54 WARN DAGScheduler: Broadcasting large task binary with size 1237.5 KiB
Comparative Performance¶
Total Upsert Time:¶
- Global Bloom Index: 550.18
- Global Simple Index: 403.153
- Record Level Index: 301.28
Now, let's calculate the percentage by which the Record Level Index (RLI) is faster than the other indexing methods for upsert operations:
- RLI is approximately 45.23% faster than the Global Bloom Index.
- RLI is approximately 25.19% faster than the Global Simple Index.