Mastering File Sizing in Hudi: Boosting Performance and Efficiency¶
Hudi Configuration Parameters¶
The following configuration parameters in Apache Hudi control file sizing:
hoodie.parquet.small.file.limit
: This parameter sets the threshold (in bytes) below which a base file is considered "small." The default value is 100MB. During ingestion, Hudi routes new inserts into these small files so they grow toward the target size instead of spawning new file groups.
hoodie.parquet.max.file.size
: This parameter determines the target size for Hudi-managed base files. The default value is 120MB. Larger files generally benefit query performance, since reading fewer, bigger files is faster, but they can increase writer latency during ingestion.
A target file size between 100MB and 500MB generally gives faster Parquet reads, but the optimal size also depends on your cluster configuration. If the cluster has ample resources, larger files can be used; for most scenarios, around 100MB to 200MB is a sensible range.
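As an illustrative sketch (the values below are assumptions for this post, not universal recommendations), both sizing knobs take byte values, so MB targets translate like this:
# Illustrative sizing values (assumed, not prescriptive); tune for your workload.
target_file_size_mb = 200
small_file_limit_mb = 100
sizing_options = {
    # Grow base files toward roughly 200MB.
    "hoodie.parquet.max.file.size": target_file_size_mb * 1024 * 1024,    # 209715200 bytes
    # Files under roughly 100MB are "small" and receive new inserts.
    "hoodie.parquet.small.file.limit": small_file_limit_mb * 1024 * 1024  # 104857600 bytes
}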
Clustering, run in async mode, can also be used to combine smaller files into larger ones and to co-locate related data. This further optimizes query performance, since it reduces the amount of data that needs to be read for a specific query (a configuration sketch follows below).
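A minimal sketch of async clustering settings, assuming the Hudi 0.13 bundle used later in this post; the target sizes and sort columns here are placeholder assumptions:
clustering_options = {
    # Run clustering asynchronously alongside ingestion.
    "hoodie.clustering.async.enabled": "true",
    # Schedule clustering after every N commits (placeholder value).
    "hoodie.clustering.async.max.commits": "4",
    # Combine small files into clustered files of up to ~512MB (placeholder value).
    "hoodie.clustering.plan.strategy.target.file.max.bytes": str(512 * 1024 * 1024),
    # Files below this size are candidates for clustering (placeholder value).
    "hoodie.clustering.plan.strategy.small.file.limit": str(100 * 1024 * 1024),
    # Sort related records together so queries scan less data (assumed columns).
    "hoodie.clustering.plan.strategy.sort.columns": "state,department"
}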
To ensure that new inserts are always routed to new file groups and only updates go to existing ones, set hoodie.parquet.small.file.limit to 0 in your configuration; Test 2 below demonstrates this.
Step 1: Define Imports¶
try:
    import os
    import sys
    import uuid
    import pyspark
    from pyspark.sql import SparkSession
    from pyspark import SparkConf, SparkContext
    from pyspark.sql.functions import col, asc, desc
    from pyspark.sql.functions import to_timestamp, monotonically_increasing_id, to_date, when
    from pyspark.sql.functions import *
    from pyspark.sql.types import *
    from datetime import datetime
    from functools import reduce
    from faker import Faker
except Exception as e:
    print("error", e)
Step 2: Define Spark Session¶
SUBMIT_ARGS = "--packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.13.0 pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable
spark = SparkSession.builder \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
    .config('className', 'org.apache.hudi') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .getOrCreate()
spark
SparkSession - in-memory
Step 3: Class to Generate Fake Data¶
import uuid
import random
from faker import Faker

faker = Faker()

class DataGenerator(object):

    @staticmethod
    def get_data(samples):
        return [
            (
                uuid.uuid4().__str__(),
                faker.name(),
                faker.random_element(elements=('IT', 'HR', 'Sales', 'Marketing')),
                faker.random_element(elements=('CA', 'NY', 'TX', 'FL', 'IL', 'RJ')),
                str(faker.random_int(min=10000, max=150000)),
                str(faker.random_int(min=18, max=60)),
                str(faker.random_int(min=0, max=100000)),
                str(faker.unix_time()),
                faker.email(),
                faker.credit_card_number(card_type='amex'),
                random.choice(["2021", "2022", "2023"]),
                faker.month()
            ) for _ in range(samples)
        ]
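As a quick sanity check of the generator (values are random, so output will differ run to run):
sample = DataGenerator.get_data(samples=2)
print(len(sample))      # 2 records
print(len(sample[0]))   # 12 fields per record, matching the column list used below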
Step 4: Define Hudi Settings¶
Test 1¶
db_name = "hudidb"
table_name = "hudi_large_files_tst_1"
recordkey = 'emp_id'
path = f"file:///C:/tmp/{db_name}/{table_name}"
precombine = "state"
method = 'upsert'
table_type = "COPY_ON_WRITE"
hudi_options = {
    'hoodie.table.name': table_name,
    'hoodie.datasource.write.table.type': table_type,
    'hoodie.datasource.write.recordkey.field': recordkey,
    'hoodie.datasource.write.table.name': table_name,
    'hoodie.datasource.write.operation': method,
    'hoodie.datasource.write.precombine.field': precombine,
    "hoodie.clean.automatic": "true",
    "hoodie.clean.async": "true",
    "hoodie.parquet.max.file.size": 512 * 1024 * 1024,  # 512MB
    "hoodie.parquet.small.file.limit": 104857600        # 100MB
}
for i in range(0, 5):
    data = DataGenerator.get_data(samples=10000)
    columns = ["emp_id", "employee_name", "department", "state", "salary", "age", "bonus", "ts", "email", "credit_card", "year", "month"]
    spark_df = spark.createDataFrame(data=data, schema=columns)

    spark_df.write.format("hudi"). \
        options(**hudi_options). \
        mode("append"). \
        save(path)

    print("i", i)
i 0
i 1
i 2
i 3
i 4
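To see how the data was laid out, you can inspect the Parquet base files Hudi wrote under the table path. The helper below is a small verification sketch, assuming the local path used above (the file:/// scheme is stripped for plain filesystem access); list_parquet_sizes is a hypothetical name introduced here:
import os

def list_parquet_sizes(table_path):
    # Walk the Hudi table directory and print each Parquet base file's size in MB.
    for root, _, files in os.walk(table_path):
        for f in files:
            if f.endswith(".parquet"):
                size_mb = os.path.getsize(os.path.join(root, f)) / (1024 * 1024)
                print(f"{f} : {size_mb:.2f} MB")

list_parquet_sizes(path.replace("file:///", ""))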
Test 2¶
db_name = "hudidb"
table_name = "hudi_large_files_tst_2"
recordkey = 'emp_id'
path = f"file:///C:/tmp/{db_name}/{table_name}"
precombine = "state"
method = 'upsert'
table_type = "COPY_ON_WRITE"
hudi_options = {
    'hoodie.table.name': table_name,
    'hoodie.datasource.write.table.type': table_type,
    'hoodie.datasource.write.recordkey.field': recordkey,
    'hoodie.datasource.write.table.name': table_name,
    'hoodie.datasource.write.operation': method,
    'hoodie.datasource.write.precombine.field': precombine,
    "hoodie.clean.automatic": "true",
    "hoodie.clean.async": "true",
    "hoodie.parquet.max.file.size": 512 * 1024 * 1024,  # 512MB
    "hoodie.parquet.small.file.limit": 0                # 0 disables small-file handling; inserts always go to new file groups
}
for i in range(0, 5):
    data = DataGenerator.get_data(samples=10000)
    columns = ["emp_id", "employee_name", "department", "state", "salary", "age", "bonus", "ts", "email", "credit_card", "year", "month"]
    spark_df = spark.createDataFrame(data=data, schema=columns)

    spark_df.write.format("hudi"). \
        options(**hudi_options). \
        mode("append"). \
        save(path)

    print("i", i)
i 0
i 1
i 2
i 3
i 4
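Reusing the hypothetical list_parquet_sizes helper sketched after Test 1, you can compare the two tables. With the small file limit at 100MB, Test 1 packs each batch of inserts into the existing small file and ends up with fewer, larger base files; with the limit at 0, each commit in Test 2 writes to a new file group, producing more, smaller files:
for tbl in ["hudi_large_files_tst_1", "hudi_large_files_tst_2"]:
    print("----", tbl)
    list_parquet_sizes(f"C:/tmp/{db_name}/{tbl}")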