Efficiently Building Read-Optimized Views with Apache Hudi's Incremental ETL and Querying Pattern¶
Processing large, raw datasets is challenging, especially when the data is constantly being updated. Apache Hudi provides a pattern that lets you process such data efficiently and build a read-optimized view on top of it. In this post, we'll implement incremental data processing and querying with Apache Hudi for customer and order data.
The Customers and Orders Hudi tables both sit in the raw zone, where data is ingested regularly. We want to build a read-optimized view of this data by joining the two tables. However, scanning the entire Orders table every time new orders arrive does not scale. This is where the incremental processing and querying pattern comes in.
To implement this pattern, we start by creating a Hudi table for the Orders data. New orders can be ingested daily as upserts while the table remains queryable at all times. Similarly, we create a Hudi table for the Customers data as a read-optimized view over the Customers raw zone.
Next, we define a checkpointing mechanism that keeps track of the last successful pipeline execution. With this in place, the pipeline incrementally queries the Orders table since the last checkpoint and joins the result with the Customers table. We therefore only process data that is new or changed since the last successful run, instead of reprocessing the entire dataset every time.
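Concretely, the checkpoint used later in this post is a small JSON document stored on S3, recording the last Hudi commit that was processed. The field names below match the helper class further down; the values are taken from the example run in this post:
checkpoint = {
    "last_processed_commit": "20230425191054168",
    "table_name": "orders",
    "path": "file:///C:/tmp/hudidb/orders",
    "inserted_time": "2023-04-25 19:12:35.910837",
}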
The output of this pipeline can itself be stored as a Hudi table, which serves as the read-optimized view over the joined Customers and Orders data. By following this pattern, we can process new Orders data daily and join it with the Customers data without scanning the entire Orders table, which significantly improves the performance and scalability of the pipeline.
In conclusion, Apache Hudi provides a powerful pattern for incrementally querying datasets and performing joins. By applying it to customer and order data, we can process the data efficiently, build a read-optimized view of it, and keep the pipeline fast as the data grows.
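The heart of the pattern is Hudi's incremental query mode: instead of a full snapshot, Spark reads only the commits after a given instant. A minimal sketch of just that piece (it assumes an active SparkSession named spark, as created in Step 2 below; the commit time shown is a placeholder, and the full helper class appears later in the post):
# Read only the records committed after the given instant (placeholder value)
incremental_read_options = {
    'hoodie.datasource.query.type': 'incremental',
    'hoodie.datasource.read.begin.instanttime': '20230425191054168',
}
inc_df = (
    spark.read.format("hudi")
    .options(**incremental_read_options)
    .load("file:///C:/tmp/hudidb/orders")
)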
Step 1: Define Imports¶
try:
    import os
    import sys
    import uuid
    import json
    import random  # used by the order data generator below
    import boto3
    import pandas as pd
    import pyspark
    from pyspark.sql import SparkSession
    from pyspark import SparkConf, SparkContext
    from pyspark.sql.functions import col, asc, desc, to_timestamp, monotonically_increasing_id, to_date, when
    from pyspark.sql.types import *
    from datetime import datetime
    from functools import reduce
    from dataclasses import dataclass
    from faker import Faker
except Exception as e:
    print(e)
Step 2: Create Spark Session¶
SUBMIT_ARGS = "--packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.12.1 pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
bucket = 'delta-streamer-demo-hudi'
os.environ['AWS_ACCESS_KEY'] = 'XXX'
os.environ['AWS_SECRET_KEY'] = 'vXXXXXXXQ'
os.environ['AWS_REGION'] = 'us-east-1'
spark = SparkSession.builder \
.config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
.config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
.config('className', 'org.apache.hudi') \
.config('spark.sql.hive.convertMetastoreParquet', 'false') \
.getOrCreate()
spark
SparkSession - in-memory
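As a side note, the same Hudi bundle can be requested through the builder itself instead of PYSPARK_SUBMIT_ARGS. A minimal sketch of that alternative (the appName value is arbitrary and assumed; to my knowledge spark.jars.packages behaves like --packages for a local session):
spark = SparkSession.builder \
    .appName('hudi-incremental-demo') \
    .config('spark.jars.packages', 'org.apache.hudi:hudi-spark3.3-bundle_2.12:0.12.1') \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .getOrCreate()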
Step 3: Define Data Generator¶
global faker
faker = Faker()

def get_customer_data(total_customers=2):
    customers_array = []
    for i in range(0, total_customers):
        customer_data = {
            "customer_id": i,
            "name": faker.name(),
            "state": faker.state(),
            "city": faker.city(),
            "email": faker.email(),
            "ts": datetime.now().isoformat().__str__()
        }
        customers_array.append(customer_data)
    return customers_array
def get_orders_data(order_data_sample_size=3):
    orders_array = []
    for i in range(0, order_data_sample_size):
        try:
            order_id = uuid.uuid4().__str__()
            # pick a random existing customer_id in the range [0, total_customers - 1]
            customer_id = random.randint(0, total_customers - 1)
            order_data = {
                "orderid": order_id,
                "customer_id": customer_id,
                "ts": datetime.now().isoformat().__str__(),
                "order_value": random.randint(10, 1000).__str__(),
                "priority": random.choice(["LOW", "MEDIUM", "URGENT"])
            }
            orders_array.append(order_data)
        except Exception as e:
            pass
    return orders_array
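A quick sanity check of the generators (a sketch; get_orders_data reads the module-level total_customers, which Step 5 defines):
total_customers = 2
print(get_customer_data(total_customers=total_customers))
print(get_orders_data(order_data_sample_size=1))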
Step 4: Define a Function to Perform Upserts into Hudi Tables¶
def upsert_hudi_table(
        db_name,
        table_name,
        record_id,
        precomb_key,
        spark_df,
        table_type='COPY_ON_WRITE',
        method='upsert',
):
    path = f"file:///C:/tmp/{db_name}/{table_name}"
    print("path", path, end="\n")

    hudi_options = {
        'hoodie.table.name': table_name,
        'hoodie.datasource.write.table.type': table_type,
        'hoodie.datasource.write.recordkey.field': record_id,
        'hoodie.datasource.write.table.name': table_name,
        'hoodie.datasource.write.operation': method,
        'hoodie.datasource.write.precombine.field': precomb_key,
    }

    spark_df.write.format("hudi"). \
        options(**hudi_options). \
        mode("append"). \
        save(path)
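If the tables needed to be partitioned (they are not in this demo), the same helper could be extended with Hudi's partition-path write option. A sketch of the options dictionary under that assumption, partitioning orders by priority purely for illustration:
# Assumption for illustration only: partition the orders table by priority
hudi_options_partitioned = {
    'hoodie.table.name': 'orders',
    'hoodie.datasource.write.table.type': 'COPY_ON_WRITE',
    'hoodie.datasource.write.recordkey.field': 'orderid',
    'hoodie.datasource.write.precombine.field': 'ts',
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.datasource.write.partitionpath.field': 'priority',
}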
Step 5: Sample Preview of DF¶
global total_customers, order_data_sample_size
total_customers = 2
order_data_sample_size = 3
customer_data = get_customer_data(total_customers=total_customers)
order_data = get_orders_data(order_data_sample_size=order_data_sample_size)
spark_df_orders = spark.createDataFrame(data=[tuple(i.values()) for i in order_data],
schema=list(order_data[0].keys()))
spark_df_customers = spark.createDataFrame(data=[tuple(i.values()) for i in customer_data],
schema=list(customer_data[0].keys()))
spark_df_orders.show(truncate=False)
spark_df_customers.show(truncate=False)
+------------------------------------+-----------+--------------------------+-----------+--------+
|orderid                             |customer_id|ts                        |order_value|priority|
+------------------------------------+-----------+--------------------------+-----------+--------+
|723e86ea-ce56-4d2f-b446-c6db49107801|0          |2023-04-25T19:10:13.625090|626        |MEDIUM  |
|f46c9fb1-d144-4eb2-8a73-cedf44345d3a|1          |2023-04-25T19:10:13.625090|363        |MEDIUM  |
|92e05b41-647b-49e5-864c-82bbec7d0394|0          |2023-04-25T19:10:13.625090|245        |LOW     |
+------------------------------------+-----------+--------------------------+-----------+--------+

+-----------+----------------+------------+--------------+---------------------+--------------------------+
|customer_id|name            |state       |city          |email                |ts                        |
+-----------+----------------+------------+--------------+---------------------+--------------------------+
|0          |Brian Holt      |California  |Pattersonshire|pjones@example.net   |2023-04-25T19:10:13.624079|
|1          |Alexander Meyers|Rhode Island|New Ronald    |sandypaul@example.com|2023-04-25T19:10:13.625090|
+-----------+----------------+------------+--------------+---------------------+--------------------------+
Insert into Hudi tables¶
A) Upsert Customer Data into the Customers Table¶
upsert_hudi_table(
db_name='hudidb',
table_name='customers',
record_id='customer_id',
precomb_key='ts',
spark_df=spark_df_customers,
table_type='COPY_ON_WRITE',
method='upsert',
)
path file:///C:/tmp/hudidb/customers
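After the write, the table directory holds the Parquet data files plus a .hoodie folder containing the commit timeline. A quick way to peek at it on the local path used above (a sketch, assuming the Windows path from this demo):
import os
# List the Hudi timeline/metadata files for the customers table
for name in os.listdir(r"C:\tmp\hudidb\customers\.hoodie"):
    print(name)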
B) Upsert Order Data into the Orders Table¶
upsert_hudi_table(
db_name='hudidb',
table_name='orders',
record_id='orderid',
precomb_key='ts',
spark_df=spark_df_orders,
table_type='COPY_ON_WRITE',
method='upsert',
)
path file:///C:/tmp/hudidb/orders
Taking a Look at the Customers and Orders Tables¶
customers = "file:///C:/tmp/hudidb/customers"
order_hudi = "file:///C:/tmp/hudidb/orders"
customer_df = spark. \
    read. \
    format("hudi"). \
    load(customers)

order_df = spark. \
    read. \
    format("hudi"). \
    load(order_hudi)

customer_df.select(['_hoodie_commit_time', 'customer_id', 'name', 'state', 'city', 'email', 'ts']).show()
order_df.select(['_hoodie_commit_time', 'orderid', 'customer_id', 'ts', 'order_value', 'priority']).show()
+-------------------+-----------+----------------+------------+--------------+--------------------+--------------------+
|_hoodie_commit_time|customer_id|            name|       state|          city|               email|                  ts|
+-------------------+-----------+----------------+------------+--------------+--------------------+--------------------+
|  20230425191036087|          1|Alexander Meyers|Rhode Island|    New Ronald|sandypaul@example...|2023-04-25T19:10:...|
|  20230425191036087|          0|      Brian Holt|  California|Pattersonshire|  pjones@example.net|2023-04-25T19:10:...|
+-------------------+-----------+----------------+------------+--------------+--------------------+--------------------+

+-------------------+--------------------+-----------+--------------------+-----------+--------+
|_hoodie_commit_time|             orderid|customer_id|                  ts|order_value|priority|
+-------------------+--------------------+-----------+--------------------+-----------+--------+
|  20230425191054168|92e05b41-647b-49e...|          0|2023-04-25T19:10:...|        245|     LOW|
|  20230425191054168|723e86ea-ce56-4d2...|          0|2023-04-25T19:10:...|        626|  MEDIUM|
|  20230425191054168|f46c9fb1-d144-4eb...|          1|2023-04-25T19:10:...|        363|  MEDIUM|
+-------------------+--------------------+-----------+--------------------+-----------+--------+
Incrementally Querying the Orders Table and Joining with the Raw Hudi Customers Table¶
Incremental Pull Python Utility Class¶
Helper Class¶
import ast

try:
    import sys
    import os
    import datetime
    from ast import literal_eval
    import re
    import boto3
    from pyspark.sql import SparkSession
    from pyspark import SparkConf, SparkContext
    import json
    from dataclasses import dataclass
    from datetime import datetime
except Exception as e:
    pass
class AWSS3(object):
    """Helper class that adds functionality on top of boto3"""

    def __init__(self, bucket):
        self.BucketName = bucket
        self.client = boto3.client(
            "s3",
            aws_access_key_id=os.getenv("AWS_ACCESS_KEY"),
            aws_secret_access_key=os.getenv("AWS_SECRET_KEY"),
            region_name=os.getenv("AWS_REGION")
        )

    def put_files(self, Response=None, Key=None):
        """
        Put the file on S3
        :return: Bool
        """
        try:
            response = self.client.put_object(
                Body=Response, Bucket=self.BucketName, Key=Key
            )
            return "ok"
        except Exception as e:
            raise Exception("Error : {} ".format(e))

    def item_exists(self, Key):
        """Given a key, check if the item exists on AWS S3"""
        try:
            response_new = self.client.get_object(Bucket=self.BucketName, Key=str(Key))
            return True
        except Exception as e:
            return False

    def get_item(self, Key):
        """Gets the bytes data from AWS S3"""
        try:
            response_new = self.client.get_object(Bucket=self.BucketName, Key=str(Key))
            return response_new["Body"].read()
        except Exception as e:
            print("Error :{}".format(e))
            return False

    def find_one_update(self, data=None, key=None):
        """
        If the key exists on S3, return its data;
        otherwise store the data on S3 and return it
        """
        flag = self.item_exists(Key=key)
        if flag:
            data = self.get_item(Key=key)
            return data
        else:
            self.put_files(Key=key, Response=data)
            return data

    def delete_object(self, Key):
        response = self.client.delete_object(Bucket=self.BucketName, Key=Key)
        return response

    def get_all_keys(self, Prefix=""):
        """
        :param Prefix: Prefix string
        :return: Keys List
        """
        try:
            paginator = self.client.get_paginator("list_objects_v2")
            pages = paginator.paginate(Bucket=self.BucketName, Prefix=Prefix)
            tmp = []
            for page in pages:
                for obj in page["Contents"]:
                    tmp.append(obj["Key"])
            return tmp
        except Exception as e:
            return []

    def print_tree(self):
        keys = self.get_all_keys()
        for key in keys:
            print(key)
        return None

    def find_one_similar_key(self, searchTerm=""):
        keys = self.get_all_keys()
        return [key for key in keys if re.search(searchTerm, key)]

    def __repr__(self):
        return "AWS S3 Helper class"
@dataclass
class HUDISettings:
    """Class for keeping track of the Hudi table settings used by the incremental reader."""

    table_name: str
    path: str
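HUDISettings simply bundles the table name and base path that the incremental reader needs, for example (mirroring the orders table used later in this post):
settings = HUDISettings(table_name='orders', path='file:///C:/tmp/hudidb/orders')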
class HUDIIncrementalReader(AWSS3):
    def __init__(self, bucket, hudi_settings, spark_session):
        AWSS3.__init__(self, bucket=bucket)
        if type(hudi_settings).__name__ != "HUDISettings":
            raise Exception("please pass correct settings")
        self.hudi_settings = hudi_settings
        self.spark = spark_session

    def __check_meta_data_file(self):
        """
        Check if the metadata (checkpoint) file for the table exists
        :return: Bool
        """
        file_name = f"metadata/{self.hudi_settings.table_name}.json"
        return self.item_exists(Key=file_name)

    def __read_meta_data(self):
        file_name = f"metadata/{self.hudi_settings.table_name}.json"
        return ast.literal_eval(self.get_item(Key=file_name).decode("utf-8"))

    def __push_meta_data(self, json_data):
        file_name = f"metadata/{self.hudi_settings.table_name}.json"
        self.put_files(
            Key=file_name, Response=json.dumps(json_data)
        )

    def clean_check_point(self):
        file_name = f"metadata/{self.hudi_settings.table_name}.json"
        self.delete_object(Key=file_name)

    def __get_begin_commit(self):
        self.spark.read.format("hudi").load(self.hudi_settings.path).createOrReplaceTempView("hudi_snapshot")
        commits = list(map(lambda row: row[0], self.spark.sql(
            "select distinct(_hoodie_commit_time) as commitTime from hudi_snapshot order by commitTime asc").limit(
            50).collect()))

        # begin just before the very first commit
        begin_time = int(commits[0]) - 1
        return begin_time

    def __read_inc_data(self, commit_time):
        # Hudi incremental query: only commits after `commit_time` are returned
        incremental_read_options = {
            'hoodie.datasource.query.type': 'incremental',
            'hoodie.datasource.read.begin.instanttime': commit_time,
        }
        self.spark.read.format("hudi").options(**incremental_read_options).load(
            self.hudi_settings.path).createOrReplaceTempView("hudi_incremental")
        df = self.spark.sql("select * from hudi_incremental")
        return df

    def __get_last_commit(self):
        commits = list(map(lambda row: row[0], self.spark.sql(
            "select distinct(_hoodie_commit_time) as commitTime from hudi_incremental order by commitTime asc").limit(
            50).collect()))
        last_commit = commits[len(commits) - 1]
        return last_commit

    def __run(self):
        # check whether a checkpoint (metadata file) already exists for this table
        flag = self.__check_meta_data_file()

        # if the metadata file exists, load the last commit and resume incremental loading from there
        if flag:
            # the checkpoint is stored double-encoded, hence literal_eval followed by json.loads
            meta_data = json.loads(self.__read_meta_data())
            print(f"""
            ******************LOGS******************
            meta_data {meta_data}
            last_processed_commit : {meta_data.get("last_processed_commit")}
            ***************************************
            """)

            read_commit = str(meta_data.get("last_processed_commit"))
            df = self.__read_inc_data(commit_time=read_commit)

            # if there is no incremental data, an empty DF is returned and the checkpoint is left untouched
            if not df.rdd.isEmpty():
                last_commit = self.__get_last_commit()
                self.__push_meta_data(json_data=json.dumps({
                    "last_processed_commit": last_commit,
                    "table_name": self.hudi_settings.table_name,
                    "path": self.hudi_settings.path,
                    "inserted_time": datetime.now().__str__(),
                }))
                return df
            else:
                return df

        else:
            # the metadata file does not exist: create it on S3 and read from the beginning commit
            read_commit = self.__get_begin_commit()

            df = self.__read_inc_data(commit_time=read_commit)
            last_commit = self.__get_last_commit()
            self.__push_meta_data(json_data=json.dumps({
                "last_processed_commit": last_commit,
                "table_name": self.hudi_settings.table_name,
                "path": self.hudi_settings.path,
                "inserted_time": datetime.now().__str__(),
            }))

            return df

    def read(self):
        """
        Reads the incremental data and returns a Spark DF
        :return:
        """
        return self.__run()
Define Parameters to Perform the Incremental Pull¶
order_hudi_path = "file:///C:/tmp/hudidb/orders"
customers_hudi_path = "file:///C:/tmp/hudidb/customers"
Incrementally Fetching Data from the Orders Table and Joining with Customers¶
helper = HUDIIncrementalReader(
bucket=bucket,
hudi_settings=HUDISettings(
table_name='orders',
path=order_hudi_path),
spark_session=spark
)
view_name_orders = "inc_order"
df = helper.read()
df.createOrReplaceTempView(view_name_orders)
spark. \
read. \
format("hudi"). \
load(customers). \
createOrReplaceTempView("hudi_snapshot_customers")
query = """
SELECT
tbl_orders.orderid,
tbl_orders.order_value,
tbl_orders.priority,
tbl_customers.state,
tbl_customers.name,
tbl_customers.city,
tbl_customers.customer_id,
tbl_customers.email
FROM inc_order as tbl_orders
LEFT JOIN hudi_snapshot_customers as tbl_customers ON tbl_orders.customer_id = tbl_customers.customer_id
"""
df = spark.sql(query)
df.show()
+--------------------+-----------+--------+------------+----------------+--------------+-----------+--------------------+
|             orderid|order_value|priority|       state|            name|          city|customer_id|               email|
+--------------------+-----------+--------+------------+----------------+--------------+-----------+--------------------+
|92e05b41-647b-49e...|        245|     LOW|  California|      Brian Holt|Pattersonshire|          0|  pjones@example.net|
|723e86ea-ce56-4d2...|        626|  MEDIUM|  California|      Brian Holt|Pattersonshire|          0|  pjones@example.net|
|f46c9fb1-d144-4eb...|        363|  MEDIUM|Rhode Island|Alexander Meyers|    New Ronald|          1|sandypaul@example...|
+--------------------+-----------+--------+------------+----------------+--------------+-----------+--------------------+
#helper.clean_check_point()
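To complete the pattern from the introduction, the joined result could itself be persisted as a read-optimized Hudi table using the same upsert helper from Step 4. A minimal sketch under two assumptions: the customer_orders table name is made up for illustration, and the query above would also need to select tbl_orders.ts so it can serve as the precombine field:
# Hypothetical read-optimized view over the joined data (not part of the original demo)
upsert_hudi_table(
    db_name='hudidb',
    table_name='customer_orders',   # assumed table name
    record_id='orderid',
    precomb_key='ts',               # requires tbl_orders.ts in the SELECT list
    spark_df=df,
    table_type='COPY_ON_WRITE',
    method='upsert',
)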
We Have Already Processed the Data; Executing the Code Above Again Should Return an Empty DF¶
df = helper.read()
df.createOrReplaceTempView(view_name_orders)
spark. \
read. \
format("hudi"). \
load(customers). \
createOrReplaceTempView("hudi_snapshot_customers")
query = """
SELECT
tbl_orders.orderid,
tbl_orders.order_value,
tbl_orders.priority,
tbl_customers.state,
tbl_customers.name,
tbl_customers.city,
tbl_customers.customer_id,
tbl_customers.email
FROM inc_order as tbl_orders
LEFT JOIN hudi_snapshot_customers as tbl_customers ON tbl_orders.customer_id = tbl_customers.customer_id
"""
df = spark.sql(query)
df.show()
******************LOGS******************
meta_data {'last_processed_commit': '20230425191054168', 'table_name': 'orders', 'path': 'file:///C:/tmp/hudidb/orders', 'inserted_time': '2023-04-25 19:12:35.910837'}
last_processed_commit : 20230425191054168
***************************************

+-------+-----------+--------+-----+----+----+-----------+-----+
|orderid|order_value|priority|state|name|city|customer_id|email|
+-------+-----------+--------+-----+----+----+-----------+-----+
+-------+-----------+--------+-----+----+----+-----------+-----+
Let's Perform New Inserts into the Orders Table, Simulating New Orders Coming In¶
Performing a New Upsert on the Orders Table¶
import random
import datetime
from datetime import datetime
global order_data_sample_size,total_customers
total_customers = 2
order_data_sample_size = 3
order_data = get_orders_data(order_data_sample_size=order_data_sample_size)
spark_df_orders = spark.createDataFrame(data=[tuple(i.values()) for i in order_data],
schema=list(order_data[0].keys()))
upsert_hudi_table(
db_name='hudidb',
table_name='orders',
record_id='orderid',
precomb_key='ts',
spark_df=spark_df_orders,
table_type='COPY_ON_WRITE',
method='upsert',
)
path file:///C:/tmp/hudidb/orders
Incrementally Fetching the New Orders and Joining with the Raw Customers Table¶
helper = HUDIIncrementalReader(
bucket=bucket,
hudi_settings=HUDISettings(
table_name='orders',
path=order_hudi_path),
spark_session=spark
)
view_name_orders = "inc_order"
df = helper.read()
df.createOrReplaceTempView(view_name_orders)
spark. \
read. \
format("hudi"). \
load(customers). \
createOrReplaceTempView("hudi_snapshot_customers")
query = """
SELECT
tbl_orders.orderid,
tbl_orders.order_value,
tbl_orders.priority,
tbl_customers.state,
tbl_customers.name,
tbl_customers.city,
tbl_customers.customer_id,
tbl_customers.email
FROM inc_order as tbl_orders
LEFT JOIN hudi_snapshot_customers as tbl_customers ON tbl_orders.customer_id = tbl_customers.customer_id
"""
df = spark.sql(query)
df.show()
******************LOGS******************
meta_data {'last_processed_commit': '20230425191054168', 'table_name': 'orders', 'path': 'file:///C:/tmp/hudidb/orders', 'inserted_time': '2023-04-25 19:12:35.910837'}
last_processed_commit : 20230425191054168
***************************************

+--------------------+-----------+--------+----------+----------+--------------+-----------+------------------+
|             orderid|order_value|priority|     state|      name|          city|customer_id|             email|
+--------------------+-----------+--------+----------+----------+--------------+-----------+------------------+
|eeeac755-264f-491...|        389|  URGENT|California|Brian Holt|Pattersonshire|          0|pjones@example.net|
|c44a096d-d638-46a...|         68|  MEDIUM|California|Brian Holt|Pattersonshire|          0|pjones@example.net|
|0902a8b9-0693-439...|        695|  URGENT|California|Brian Holt|Pattersonshire|          0|pjones@example.net|
+--------------------+-----------+--------+----------+----------+--------------+-----------+------------------+