Building a Scalable and Resilient Streaming ETL Pipeline with Hudi's Incremental Processing¶
The streaming incremental ETL pattern with Hudi is a powerful technique for performing ETL that combines streaming data sources with static raw data sources. The approach processes only the new data that arrives from the streaming source and merges it with the existing data in the data lakehouse.
Advantages of Using the Streaming Incremental ETL Pattern¶
The main advantage of the streaming incremental ETL pattern is that it processes only the new data, which reduces processing time and improves overall performance. This is especially useful when dealing with large datasets that are constantly updated in real time.
Spark for Big Data Processing¶
Spark is an essential tool for processing big data, thanks to its ability to handle large datasets and perform distributed computing. Spark allows you to process data in parallel across multiple nodes, which helps to improve the speed and efficiency of your ETL pipeline.
Example Use Case¶
Suppose you want to join streaming orders data with static raw customer data and a date dimension before inserting the result into an orders fact table. The streaming incremental ETL pattern with Hudi lets you perform this task efficiently.
You can ingest the streaming orders data using Spark Streaming, perform the necessary transformations, and join it with the static raw customer data and date dimension data. Hudi can then be used to store the data efficiently in a distributed manner in a data lakehouse architecture.
As new data arrives in the streaming source, you can use Hudi to perform incremental processing on the data, only processing the new data and updating the existing data in the data lakehouse. This approach helps to reduce processing time and improve overall performance, making it ideal for handling large and constantly updating datasets.
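The key enabler is that Hudi records every commit on a table, so a consumer can ask for only the records written after the last commit instant it processed. As a minimal sketch (not part of the notebook below), a pull-based incremental read with the Hudi Spark DataSource looks roughly like this; table_path and last_processed_instant are placeholder names:
# Minimal sketch of a Hudi incremental read (assumes an existing Hudi table at
# `table_path` and a previously recorded commit instant `last_processed_instant`;
# both names are placeholders, not from the notebook below).
incremental_read_options = {
    'hoodie.datasource.query.type': 'incremental',
    'hoodie.datasource.read.begin.instanttime': last_processed_instant,
}
new_records_df = spark.read.format("hudi") \
    .options(**incremental_read_options) \
    .load(table_path)
# Only records committed after `last_processed_instant` come back, so downstream
# joins and writes touch just the new data.
The pipeline in this post achieves the same effect with Spark Structured Streaming and foreachBatch, which tracks its position through a checkpoint instead of an explicit begin instant.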
Step 1: Define Imports¶
try:
    import os
    import sys
    import uuid
    import random
    import pyspark
    from pyspark.sql import SparkSession
    from pyspark import SparkConf, SparkContext
    from pyspark.sql.functions import col, asc, desc, to_timestamp, monotonically_increasing_id, to_date, when, udf
    from pyspark.sql.types import *
    from functools import reduce
    from faker import Faker
    import pandas as pd
    import boto3
    import json
    from datetime import datetime, date, timedelta
    from pyspark.sql.functions import year, quarter, month, dayofmonth, weekofyear
    from pyspark.sql import functions as F
except Exception as e:
    print(e)
Step 2: Define Spark Session¶
SUBMIT_ARGS = "--packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.12.1 pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
spark = SparkSession.builder \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
    .config('className', 'org.apache.hudi') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .getOrCreate()
spark
Step 3: Data Generator Functions¶
global faker
faker = Faker()

import uuid

def get_customer_data(total_customers=2):
    # generate fake customer records with Faker
    customers_array = []
    for i in range(0, total_customers):
        customer_data = {
            "customer_id": str(uuid.uuid4()),
            "name": faker.name(),
            "state": faker.state(),
            "city": faker.city(),
            "email": faker.email(),
            "created_at": datetime.now().isoformat().__str__(),
            "address": faker.address(),
        }
        customers_array.append(customer_data)
    return customers_array
def get_orders_data(customer_ids, order_data_sample_size=3):
    # generate fake orders, each linked to a randomly chosen customer_id
    orders_array = []
    for i in range(0, order_data_sample_size):
        try:
            order_id = uuid.uuid4().__str__()
            customer_id = random.choice(customer_ids)
            order_data = {
                "order_id": order_id,
                "name": faker.text(max_nb_chars=20),
                "order_value": random.randint(10, 1000).__str__(),
                "priority": random.choice(["LOW", "MEDIUM", "HIGH"]),
                "order_date": faker.date_between(start_date='-30d', end_date='today').strftime('%Y-%m-%d'),
                "customer_id": customer_id,
            }
            orders_array.append(order_data)
        except Exception as e:
            print(e)
    return orders_array
Step 4: Method to Upsert into Hudi tables¶
def upsert_hudi_table(
    db_name,
    table_name,
    record_id,
    precomb_key,
    spark_df,
    table_type='COPY_ON_WRITE',
    method='upsert',
):
    path = f"file:///C:/tmp/{db_name}/{table_name}"
    print("path", path, end="\n")

    # core Hudi write settings: record key, precombine field, table type and write operation
    hudi_options = {
        'hoodie.table.name': table_name,
        'hoodie.datasource.write.table.type': table_type,
        'hoodie.datasource.write.recordkey.field': record_id,
        'hoodie.datasource.write.table.name': table_name,
        'hoodie.datasource.write.operation': method,
        'hoodie.datasource.write.precombine.field': precomb_key,
    }

    spark_df.write.format("hudi"). \
        options(**hudi_options). \
        mode("append"). \
        save(path)
Sample Preview of data¶
min_date = '2020-01-01'
max_date = '2025-01-01'
global total_customers, order_data_sample_size
total_customers = 50
order_data_sample_size = 100
date_range = pd.date_range(start=min_date, end=max_date)
date_data = [(int(day.strftime('%Y%m%d')), day.year, day.month, day.day, str((day.month - 1) // 3 + 1),
              day.strftime('%A'), day.weekday()) for day in date_range]
date_schema = ['date_key', 'year', 'month', 'day', 'quarter', 'weekday', 'weekday_number']
date_dim_df = spark.createDataFrame(date_data, schema=date_schema)

customer_data = get_customer_data(total_customers=total_customers)
spark_df_customers = spark.createDataFrame(data=[tuple(i.values()) for i in customer_data],
                                           schema=list(customer_data[0].keys()))
print("""
----------------
date Dimension
----------------
""")
date_dim_df.show(3)
print("""
----------------
RAW customers Data
----------------
""")
spark_df_customers.select(['customer_id','name','state','city','email','created_at']).show(3)
----------------
date Dimension
----------------

+--------+----+-----+---+-------+---------+--------------+
|date_key|year|month|day|quarter|  weekday|weekday_number|
+--------+----+-----+---+-------+---------+--------------+
|20200101|2020|    1|  1|      1|Wednesday|             2|
|20200102|2020|    1|  2|      1| Thursday|             3|
|20200103|2020|    1|  3|      1|   Friday|             4|
+--------+----+-----+---+-------+---------+--------------+
only showing top 3 rows

----------------
RAW customers Data
----------------

+--------------------+----------------+--------------+--------------+--------------------+--------------------+
|         customer_id|            name|         state|          city|               email|          created_at|
+--------------------+----------------+--------------+--------------+--------------------+--------------------+
|5ddb43fe-3adb-409...|    Shane Barnes|  North Dakota| North Jeffrey|arnoldryan@exampl...|2023-04-30T17:45:...|
|9544780d-2e2d-4dc...|    Sonya Harper|   Mississippi|West Elizabeth|guzmancheryl@exam...|2023-04-30T17:45:...|
|adc07f8f-b073-4c5...|Ruben George Jr.|South Carolina|    Floresport| cmiller@example.org|2023-04-30T17:45:...|
+--------------------+----------------+--------------+--------------+--------------------+--------------------+
only showing top 3 rows
Upserting the Date Dimension and Customers Hudi Tables¶
upsert_hudi_table(
    db_name='hudidb',
    table_name='customers',
    record_id='customer_id',
    precomb_key='created_at',
    spark_df=spark_df_customers,
    table_type='COPY_ON_WRITE',
    method='upsert',
)

upsert_hudi_table(
    db_name='hudidb',
    table_name='dim_date',
    record_id='date_key',
    precomb_key='date_key',
    spark_df=date_dim_df,
    table_type='COPY_ON_WRITE',
    method='upsert',
)
path file:///C:/tmp/hudidb/customers
path file:///C:/tmp/hudidb/dim_date
Streaming Orders into the Orders Hudi Table¶
for i in range(1, 10):
    order_data = get_orders_data(order_data_sample_size=order_data_sample_size,
                                 customer_ids=[c.get("customer_id") for c in customer_data])
    spark_df_orders = spark.createDataFrame(data=[tuple(o.values()) for o in order_data],
                                            schema=list(order_data[0].keys()))
    spark_df_orders.show()

    upsert_hudi_table(
        db_name='hudidb',
        table_name='orders',
        record_id='order_id',
        precomb_key='order_date',
        spark_df=spark_df_orders,
        table_type='COPY_ON_WRITE',
        method='upsert',
    )
    break
+--------------------+--------------------+-----------+--------+----------+--------------------+
|            order_id|                name|order_value|priority|order_date|         customer_id|
+--------------------+--------------------+-----------+--------+----------+--------------------+
|a4e734d2-7483-421...| Something may hard.|        635|     LOW|2023-04-07|e236b2d4-36a8-465...|
|677c9951-ba48-420...|Bag church increase.|        114|    HIGH|2023-04-02|558e5573-3160-4b6...|
|bec2aa1d-365b-4cb...|   Bed camera image.|         63|     LOW|2023-04-24|0a555f67-9c75-499...|
|03749872-fef1-42f...|   Effect ball dark.|        680|    HIGH|2023-04-26|b883d163-afdb-4f7...|
|9074c6ba-dd37-4e4...|       Throw artist.|        412|  MEDIUM|2023-04-10|987ff89d-56f8-44c...|
|75fcb5c5-c0da-4a9...|Partner home as eat.|        718|    HIGH|2023-04-30|44056894-7328-47d...|
|0e2b39dc-0861-402...|     Anything piece.|        471|  MEDIUM|2023-04-04|211e7b60-a347-4d1...|
|2b44e7d1-c2fd-440...|  Thank state music.|        622|  MEDIUM|2023-04-16|89b50e2d-724b-4c1...|
|896f3b20-d53c-4de...| Great particularly.|        209|     LOW|2023-04-25|a1735af7-946e-48e...|
|13a412cc-4211-4db...|    Reduce door why.|        929|     LOW|2023-04-09|4b8d2b02-3c4b-49d...|
|a75f735b-30ca-4b1...|         Almost off.|        284|     LOW|2023-04-18|09ee50e2-0631-481...|
|14e14772-3d27-4f3...|    Fact career may.|         85|    HIGH|2023-04-05|686c2e79-1650-4a6...|
|80188ef1-19f0-403...|      Western south.|        917|     LOW|2023-04-16|7eb58f14-b569-40d...|
|9577dae9-5656-411...| Agreement decision.|        871|     LOW|2023-04-06|d4d02403-22ca-442...|
|80a122ee-beb1-460...|    Travel boy size.|        994|  MEDIUM|2023-04-28|75add80a-3450-436...|
|54fd25f4-ad9a-469...| Police record free.|        905|    HIGH|2023-04-26|7f980d34-ebea-4b0...|
|45d6c0a9-3d7e-46c...| Agreement election.|        285|  MEDIUM|2023-04-30|211e7b60-a347-4d1...|
|f3b2f84f-cf33-47e...|You its nearly also.|        135|     LOW|2023-04-02|8fa1741d-984c-4a5...|
|4786c8b8-fb3a-4bd...| Simple interesting.|        523|     LOW|2023-04-21|09ee50e2-0631-481...|
|4e82adb0-e255-49d...|  American strategy.|        177|  MEDIUM|2023-04-26|9c8b1e05-7776-46f...|
+--------------------+--------------------+-----------+--------+----------+--------------------+
only showing top 20 rows

path file:///C:/tmp/hudidb/orders
Streaming ETL job to UPSERT into Fact table¶
try:
    import os
    import sys
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, to_date, date_format
except Exception as e:
    print(e)
SUBMIT_ARGS = "--packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.12.1 pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
spark = SparkSession.builder \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
    .config('className', 'org.apache.hudi') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .getOrCreate()
# =====================================================
customers = "file:///C:/tmp/hudidb/customers"
order_hudi = "file:///C:/tmp/hudidb/orders"
date_dim = 'file:///C:/tmp/hudidb/dim_date'
order_fact = "file:///C:/tmp/hudidb/orders_fact"
check_point_path = "file:///C:/tmp/hudidb/checkpoint"
# read the static Hudi tables (customers and date dimension) into DataFrames
date_dim = spark.read.format("hudi").load(date_dim)
customers_df = spark.read.format("hudi").load(customers)
date_dim = date_dim.withColumn('date_key', to_date(col('date_key').cast('string'), 'yyyyMMdd'))
# =====================================================
# read stream to streaming df
order_stream_df = spark.readStream \
    .format("hudi") \
    .load(order_hudi)
def upsert_hudi_table(
    db_name,
    table_name,
    record_id,
    precomb_key,
    spark_df,
    table_type='COPY_ON_WRITE',
    method='upsert',
):
    path = f"file:///C:/tmp/{db_name}/{table_name}"
    print("path", path, end="\n")

    hudi_options = {
        'hoodie.table.name': table_name,
        'hoodie.datasource.write.table.type': table_type,
        'hoodie.datasource.write.recordkey.field': record_id,
        'hoodie.datasource.write.table.name': table_name,
        'hoodie.datasource.write.operation': method,
        'hoodie.datasource.write.precombine.field': precomb_key,
    }

    spark_df.write.format("hudi"). \
        options(**hudi_options). \
        mode("append"). \
        save(path)
def process_batch(order_df, batch_num):
    try:
        # enrich each micro-batch of orders with customer attributes and the date dimension
        # (date_dim.date_key was already converted to a date above, so we only need to
        # parse order_date before joining)
        order_fact_df = order_df.join(customers_df, "customer_id") \
            .withColumn("order_date", to_date(col("order_date"), "yyyy-MM-dd")) \
            .join(date_dim, col("order_date") == date_dim.date_key) \
            .select("order_id", "customer_id", "order_value", "priority",
                    date_format("date_key", "yyyyMMdd").alias("date_key"))

        order_fact_df.show()

        if order_fact_df.count() > 0:
            upsert_hudi_table(
                db_name='hudidb',
                table_name='orders_fact',
                record_id='order_id,customer_id',
                precomb_key='order_id',
                spark_df=order_fact_df,
                table_type='COPY_ON_WRITE',
                method='upsert',
            )
    except Exception as e:
        print("Error in processing batch:", batch_num)
        print(e)
query = order_stream_df.writeStream \
    .format("console") \
    .option("checkpointLocation", check_point_path) \
    .foreachBatch(process_batch) \
    .start()
# wait for the stream to finish
query.awaitTermination()
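Once the streaming query has committed a few micro-batches, you can sanity-check the fact table with a regular snapshot read. The following is a minimal sketch, assumed to run in a separate session (or after stopping the query), since awaitTermination() blocks:
# Snapshot read of the fact table to verify the upserted rows (run this in a
# separate session, because the streaming query above blocks on awaitTermination).
fact_df = spark.read.format("hudi").load("file:///C:/tmp/hudidb/orders_fact")
fact_df.select("order_id", "customer_id", "order_value", "priority", "date_key").show(5)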
Result¶
Conclusion¶
In conclusion, the Streaming incremental ETL pattern with Hudi is a powerful technique that enables you to efficiently process real-time data and combine it with existing data in a data lakehouse architecture. By using Spark and Hudi, you can build scalable and efficient ETL pipelines for handling big data, providing valuable insights into your business operations.