Learn How to Use the Apache Hudi Data Skipping Feature | Hands-on Labs¶
Resources
https://hudi.apache.org/docs/next/performance/
https://hudi.apache.org/docs/metadata/
Define Imports¶
In [1]:
try:
    import os
    import sys
    import uuid
    import random
    import pyspark
    from datetime import datetime
    from pyspark.sql import SparkSession
    from pyspark import SparkConf, SparkContext
    from faker import Faker
    import pandas as pd  # Pandas, used for pretty printing
    print("Imports loaded")
except Exception as e:
    print("error", e)
Create Spark Session¶
In [2]:
HUDI_VERSION = '0.14.0'
SPARK_VERSION = '3.4'
SUBMIT_ARGS = f"--packages org.apache.hudi:hudi-spark{SPARK_VERSION}-bundle_2.12:{HUDI_VERSION} pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable
spark = SparkSession.builder \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
    .config('className', 'org.apache.hudi') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .getOrCreate()
In [3]:
spark
Out[3]:
Data generator class¶
In [4]:
faker = Faker()

def get_customer_data(total_customers=2):
    customers_array = []
    for i in range(0, total_customers):
        customer_data = {
            "customer_id": str(uuid.uuid4()),
            "name": faker.name(),
            "state": faker.state(),
            "city": faker.city(),
            "email": faker.email(),
            "created_at": datetime.now().isoformat(),
            "address": faker.address(),
            "salary": faker.random_int(min=30000, max=100000)
        }
        customers_array.append(customer_data)
    return customers_array
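A quick usage check of the generator, printing a single record before producing the full dataset:

print(get_customer_data(total_customers=1)[0])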
Generate Fake Data¶
In [5]:
total_customers = 500000

customer_data = get_customer_data(total_customers=total_customers)
Create Spark DF¶
In [6]:
spark_df_customers = spark.createDataFrame(
    data=[tuple(i.values()) for i in customer_data],
    schema=list(customer_data[0].keys())
)
spark_df_customers.show(3)
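Optionally, inspect the inferred schema; salary comes through as a numeric column, which is the column the stats index will be built on later:

spark_df_customers.printSchema()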
Define Hudi Settings¶
In [7]:
def write_to_hudi(spark_df,
                  table_name,
                  db_name,
                  method='upsert',
                  table_type='COPY_ON_WRITE',
                  recordkey='',
                  precombine='',
                  partition_fields='',
                  metadata_column_stats=""):
    path = f"file:///Users/soumilshah/IdeaProjects/SparkProject/tem/{db_name}/{table_name}"

    hudi_options = {
        'hoodie.table.name': table_name,
        'hoodie.datasource.write.table.type': table_type,
        'hoodie.datasource.write.table.name': table_name,
        'hoodie.datasource.write.operation': method,
        'hoodie.datasource.write.recordkey.field': recordkey,
        'hoodie.datasource.write.precombine.field': precombine,
        'hoodie.datasource.write.partitionpath.field': partition_fields,

        # Data skipping relies on the metadata table and its column stats index
        'hoodie.enable.data.skipping': 'true',
        'hoodie.metadata.enable': 'true',
        'hoodie.metadata.index.column.stats.enable': 'true',
        'hoodie.metadata.index.column.stats.column.list': metadata_column_stats,

        # Concurrency settings: in-process optimistic locking
        'hoodie.write.concurrency.mode': 'optimistic_concurrency_control',
        'hoodie.write.lock.provider': 'org.apache.hudi.client.transaction.lock.InProcessLockProvider'
    }

    print("\n")
    print(path)
    print("\n")

    spark_df.write.format("hudi"). \
        options(**hudi_options). \
        mode("append"). \
        save(path)
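Of the options above, four do the actual work for data skipping; the rest are standard write settings. As a recap (values copied from the function above):

# Recap of the data-skipping-related options used in write_to_hudi above
data_skipping_options = {
    "hoodie.enable.data.skipping": "true",                      # prune files using column stats at read time
    "hoodie.metadata.enable": "true",                            # the metadata table must be enabled
    "hoodie.metadata.index.column.stats.enable": "true",         # build the column stats index
    "hoodie.metadata.index.column.stats.column.list": "salary"   # limit stats to the columns you filter on
}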
Create Hudi tables¶
In [8]:
write_to_hudi(
    spark_df=spark_df_customers,
    db_name="hudidb",
    table_name="customers",
    recordkey="customer_id",
    precombine="created_at",
    partition_fields="state",
    metadata_column_stats="salary"
)
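As a quick sanity check (a sketch, assuming the same local path as above and the standard metadata table layout), you can list the metadata table directory after the write; with column stats enabled it should contain a column_stats partition alongside files:

import os

metadata_dir = "/Users/soumilshah/IdeaProjects/SparkProject/tem/hudidb/customers/.hoodie/metadata"
if os.path.isdir(metadata_dir):
    print(os.listdir(metadata_dir))  # expect entries such as 'files' and 'column_stats'
else:
    print("metadata dir not found:", metadata_dir)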
Read Data from Hudi¶
In [4]:
path = "file:///Users/soumilshah/IdeaProjects/SparkProject/tem/hudidb/customers"
spark.read.format("hudi").load(path).createOrReplaceTempView("hudi_snapshot2")
spark.sql("select * from hudi_snapshot2 where salary >= 50000000 ").show()
print("\n")
In [4]:
path = "file:///Users/soumilshah/IdeaProjects/SparkProject/tem/hudidb/customers"
spark.read.format("hudi") \
.option("hoodie.enable.data.skipping", "true") \
.option("hoodie.metadata.enable", "true") \
.option("hoodie.metadata.index.column.stats.enable", "true") \
.load(path) \
.createOrReplaceTempView("hudi_snapshot1")
spark.sql("SELECT * FROM hudi_snapshot1 WHERE salary >= 50000000 ").show()