Building a Modern Data Warehouse with Hudi and Star Schema
What is Lake House Architecture?¶
- Lake House Architecture is a modern data storage architecture that combines the best of both data warehouse and data lake technologies. It allows organizations to store structured and unstructured data together in a single location, providing a unified view of data for analysis and reporting. With Lake House Architecture, data can be stored in a cost-effective manner and can be easily accessed and processed by data analysts and data scientists.
What is a Transactional Data Lake?¶
- A Transactional Data Lake is a type of data lake that allows for both batch and real-time data processing, making it ideal for use in modern data processing and analytics. It provides transactional capabilities for data operations such as insert, update, and delete, ensuring that data is always up-to-date and accurate. This makes it possible for organizations to have a single source of truth for all their data, reducing data silos and improving data consistency and accuracy.
Advantages of Star Schema with Lake House Architecture and Hudi¶
The Star Schema is a popular design pattern for data warehousing that works well with Lake House Architecture and Hudi. The Star Schema organizes data into a central fact table and related dimension tables, providing a simple and intuitive structure for querying data. Using the Star Schema with Lake House Architecture and Hudi offers several advantages, including:
- Improved query performance: The Star Schema's simplified structure allows for faster and more efficient querying of large data sets, making it easier for analysts to extract insights from data (see the example query after this list).
- Easier to maintain: The Star Schema is easy to maintain and update, making it ideal for organizations that need to make frequent changes to their data models.
- Scalability: Lake House Architecture and Hudi allow for horizontal scaling, enabling organizations to easily scale their data processing and storage up or down as their business grows.
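To make these benefits concrete, here is an illustrative query sketch (not part of the original walkthrough) that assumes the orders_fact, customers, and date_dim views registered later in this post; it joins the fact table to two dimensions and aggregates a measure:

monthly_revenue = spark.sql("""
    SELECT c.state,
           d.year,
           d.month,
           SUM(CAST(f.order_value AS DOUBLE)) AS total_order_value
    FROM orders_fact f
    JOIN customers c ON f.customer_id = c.customer_id
    JOIN date_dim  d ON f.date_key    = d.date_key
    GROUP BY c.state, d.year, d.month
    ORDER BY d.year, d.month
""")
monthly_revenue.show()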
Step 1: Define imports¶
try:
    import os
    import sys
    import uuid
    import random
    import pyspark
    from pyspark.sql import SparkSession
    from pyspark import SparkConf, SparkContext
    from pyspark.sql.functions import col, asc, desc, to_timestamp, monotonically_increasing_id, to_date, when, udf
    from pyspark.sql.types import *
    from functools import reduce
    from faker import Faker
    import pandas as pd
    import boto3
    import json
    from datetime import datetime, date, timedelta
    from pyspark.sql.functions import year, quarter, month, dayofmonth, weekofyear
    from pyspark.sql import functions as F
except Exception as e:
    print(e)
Step 2: Define Spark Session¶
SUBMIT_ARGS = "--packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.12.1 pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
bucket = 'delta-streamer-demo-hudi'
os.environ['AWS_REGION'] = 'us-east-1'
spark = SparkSession.builder \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
    .config('className', 'org.apache.hudi') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .getOrCreate()
spark
SparkSession - in-memory
Step 3: Data Generator Class¶
global faker
faker = Faker()

def get_customer_data(total_customers=2):
    customers_array = []
    for i in range(0, total_customers):
        customer_data = {
            "customer_id": str(uuid.uuid4()),
            "name": faker.name(),
            "state": faker.state(),
            "city": faker.city(),
            "email": faker.email(),
            "created_at": datetime.now().isoformat(),
            "address": faker.address()
        }
        customers_array.append(customer_data)
    return customers_array
def get_orders_data(customer_ids, order_data_sample_size=3):
    orders_array = []
    for i in range(0, order_data_sample_size):
        try:
            order_id = str(uuid.uuid4())
            customer_id = random.choice(customer_ids)
            order_data = {
                "order_id": order_id,
                "name": faker.text(max_nb_chars=20),
                "order_value": str(random.randint(10, 1000)),
                "priority": random.choice(["LOW", "MEDIUM", "HIGH"]),
                "order_date": faker.date_between(start_date='-30d', end_date='today').strftime('%Y-%m-%d'),
                "customer_id": customer_id
            }
            orders_array.append(order_data)
        except Exception as e:
            print(e)
    return orders_array
Step 4: Method to Upsert into Hudi tables¶
def upsert_hudi_table(
    db_name,
    table_name,
    record_id,
    precomb_key,
    spark_df,
    table_type='COPY_ON_WRITE',
    method='upsert',
):
    # Local path layout: one folder per table under file:///C:/tmp/<db>/<table>
    path = f"file:///C:/tmp/{db_name}/{table_name}"
    print("path", path, end="\n")

    hudi_options = {
        'hoodie.table.name': table_name,
        'hoodie.datasource.write.table.type': table_type,
        'hoodie.datasource.write.recordkey.field': record_id,
        'hoodie.datasource.write.table.name': table_name,
        'hoodie.datasource.write.operation': method,
        'hoodie.datasource.write.precombine.field': precomb_key,
    }

    spark_df.write.format("hudi") \
        .options(**hudi_options) \
        .mode("append") \
        .save(path)
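The helper above only writes. Later steps read tables back with spark.read.format("hudi").load(path); if you want a symmetric read path, a minimal hypothetical companion (read_hudi_table is an assumed name, not in the original post) could look like this:

def read_hudi_table(db_name, table_name):
    # Hypothetical helper mirroring upsert_hudi_table: loads a Hudi table
    # from the same local path layout used above.
    path = f"file:///C:/tmp/{db_name}/{table_name}"
    return spark.read.format("hudi").load(path)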
Sample Preview of Customer data¶
global total_customers, order_data_sample_size
total_customers = 2
order_data_sample_size = 3
customer_data = get_customer_data(total_customers=total_customers)
spark_df_customers = spark.createDataFrame(data=[tuple(i.values()) for i in customer_data],
                                           schema=list(customer_data[0].keys()))
spark_df_customers.show()
+--------------------+---------------+--------+-----------+--------------------+--------------------+--------------------+
|         customer_id|           name|   state|       city|               email|          created_at|             address|
+--------------------+---------------+--------+-----------+--------------------+--------------------+--------------------+
|bb559489-b346-46b...|Victoria Romero|Maryland| East Stacy|travis61@example.org|2023-04-26T14:09:...|21179 Audrey Lock...|
|1298fe7d-6586-455...| Nathaniel Tate| Florida|North Kelly|sharon99@example.com|2023-04-26T14:09:...|591 Thomas Height...|
+--------------------+---------------+--------+-----------+--------------------+--------------------+--------------------+
Sample Preview of Order Data¶
order_data = get_orders_data(order_data_sample_size=order_data_sample_size, customer_ids=[i.get("customer_id") for i in customer_data])
spark_df_orders = spark.createDataFrame(data=[tuple(i.values()) for i in order_data],
                                        schema=list(order_data[0].keys()))
spark_df_orders.show()
+--------------------+-------------------+-----------+--------+----------+--------------------+
|            order_id|               name|order_value|priority|order_date|         customer_id|
+--------------------+-------------------+-----------+--------+----------+--------------------+
|79e3edc5-dcbc-460...|    Maybe resource.|        282|     LOW|2023-04-23|1298fe7d-6586-455...|
|1e6f81e2-f1b4-464...|Half see situation.|        568|    HIGH|2023-04-21|bb559489-b346-46b...|
|e8939a8f-1476-4b4...|      Sister apply.|        862|    HIGH|2023-04-08|bb559489-b346-46b...|
+--------------------+-------------------+-----------+--------+----------+--------------------+
UPSERT into Hudi tables¶
Upserting into Customer Hudi Tables¶
upsert_hudi_table(
db_name='hudidb',
table_name='customers',
record_id='customer_id',
precomb_key='created_at',
spark_df=spark_df_customers,
table_type='COPY_ON_WRITE',
method='upsert',
)
path file:///C:/tmp/hudidb/customers
Upserting into Order Hudi Tables¶
upsert_hudi_table(
db_name='hudidb',
table_name='orders',
record_id='order_id',
precomb_key='order_date',
spark_df=spark_df_orders,
table_type='COPY_ON_WRITE',
method='upsert',
)
path file:///C:/tmp/hudidb/orders
Building the Hudi Date Dimension¶
min_date = '2020-01-01'
max_date = '2025-01-01'
date_range = pd.date_range(start=min_date, end=max_date)
date_data = [(int(day.strftime('%Y%m%d')), day.year, day.month, day.day, str((day.month - 1) // 3 + 1),
              day.strftime('%A'), day.weekday()) for day in date_range]
date_schema = ['date_key', 'year', 'month', 'day', 'quarter', 'weekday', 'weekday_number']
date_dim_df = spark.createDataFrame(date_data, schema=date_schema)
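As an alternative sketch (not used in the original post), the same dimension can be built natively in Spark with the year/quarter/month/dayofmonth functions imported in Step 1; the (dayofweek + 5) % 7 expression reproduces the Monday=0 convention of Python's weekday() used above:

date_dim_spark_df = (
    spark.sql("SELECT explode(sequence(to_date('2020-01-01'), to_date('2025-01-01'), interval 1 day)) AS d")
    .select(
        F.date_format("d", "yyyyMMdd").cast("int").alias("date_key"),
        F.year("d").alias("year"),
        F.month("d").alias("month"),
        F.dayofmonth("d").alias("day"),
        F.quarter("d").cast("string").alias("quarter"),         # string, matching the pandas version above
        F.date_format("d", "EEEE").alias("weekday"),
        ((F.dayofweek("d") + 5) % 7).alias("weekday_number"),   # Monday = 0, like Python's weekday()
    )
)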
Upsert date dimension into Hudi¶
upsert_hudi_table(
db_name='hudidb',
table_name='dim_date',
record_id='date_key',
precomb_key='date_key',
spark_df=date_dim_df,
table_type='COPY_ON_WRITE',
method='upsert',
)
path file:///C:/tmp/hudidb/dim_date
Creating Hudi Snapshots¶
customers = "file:///C:/tmp/hudidb/customers"
order_hudi = "file:///C:/tmp/hudidb/orders"
date_dim = 'file:///C:/tmp/hudidb/dim_date'
spark.read.format("hudi").load(customers).createOrReplaceTempView("customers")
spark.read.format("hudi").load(order_hudi).createOrReplaceTempView("orders")
spark.read.format("hudi").load(date_dim).createOrReplaceTempView("date_dim")
Preview of Hudi Dataset¶
- orders
- customers
- date dimensions
from pyspark.sql.functions import col

# Columns to exclude
exclude_cols = ['_hoodie_commit_time',
                '_hoodie_commit_seqno',
                '_hoodie_record_key',
                '_hoodie_partition_path',
                '_hoodie_file_name']

# Define function to exclude columns and display DF in tabular format
def display_df(df):
    for col_name in exclude_cols:
        df = df.drop(col_name)
    df.show()
# Display DataFrames
display_df(spark.sql("SELECT * FROM customers"))
display_df(spark.sql("SELECT * FROM orders"))
display_df(spark.sql("SELECT * FROM date_dim"))
+--------------------+---------------+--------+-----------+--------------------+--------------------+--------------------+
|         customer_id|           name|   state|       city|               email|          created_at|             address|
+--------------------+---------------+--------+-----------+--------------------+--------------------+--------------------+
|bb559489-b346-46b...|Victoria Romero|Maryland| East Stacy|travis61@example.org|2023-04-26T14:09:...|21179 Audrey Lock...|
|1298fe7d-6586-455...| Nathaniel Tate| Florida|North Kelly|sharon99@example.com|2023-04-26T14:09:...|591 Thomas Height...|
+--------------------+---------------+--------+-----------+--------------------+--------------------+--------------------+

+--------------------+-------------------+-----------+--------+----------+--------------------+
|            order_id|               name|order_value|priority|order_date|         customer_id|
+--------------------+-------------------+-----------+--------+----------+--------------------+
|1e6f81e2-f1b4-464...|Half see situation.|        568|    HIGH|2023-04-21|bb559489-b346-46b...|
|e8939a8f-1476-4b4...|      Sister apply.|        862|    HIGH|2023-04-08|bb559489-b346-46b...|
|79e3edc5-dcbc-460...|    Maybe resource.|        282|     LOW|2023-04-23|1298fe7d-6586-455...|
+--------------------+-------------------+-----------+--------+----------+--------------------+

+--------+----+-----+---+-------+---------+--------------+
|date_key|year|month|day|quarter|  weekday|weekday_number|
+--------+----+-----+---+-------+---------+--------------+
|20220410|2022|    4| 10|      2|   Sunday|             6|
|20241002|2024|   10|  2|      4|Wednesday|             2|
|20240430|2024|    4| 30|      2|  Tuesday|             1|
|20210103|2021|    1|  3|      1|   Sunday|             6|
|20210411|2021|    4| 11|      2|   Sunday|             6|
|20200313|2020|    3| 13|      1|   Friday|             4|
|20231014|2023|   10| 14|      4| Saturday|             5|
|20220531|2022|    5| 31|      2|  Tuesday|             1|
|20211126|2021|   11| 26|      4|   Friday|             4|
|20240908|2024|    9|  8|      3|   Sunday|             6|
|20201006|2020|   10|  6|      4|  Tuesday|             1|
|20230909|2023|    9|  9|      3| Saturday|             5|
|20220520|2022|    5| 20|      2|   Friday|             4|
|20200324|2020|    3| 24|      1|  Tuesday|             1|
|20220311|2022|    3| 11|      1|   Friday|             4|
|20200610|2020|    6| 10|      2|Wednesday|             2|
|20210125|2021|    1| 25|      1|   Monday|             0|
|20241211|2024|   12| 11|      4|Wednesday|             2|
|20211005|2021|   10|  5|      4|  Tuesday|             1|
|20210312|2021|    3| 12|      1|   Friday|             4|
+--------+----+-----+---+-------+---------+--------------+
only showing top 20 rows
Creating Hudi fact table¶
order_fact_df = spark.sql("""
SELECT
o.order_id,
c.customer_id,
o.order_value,
o.priority,
d.date_key
FROM
orders o
JOIN customers c ON o.customer_id = c.customer_id
JOIN (
SELECT date_key, to_date(date_key, 'yyyyMMdd') as date
FROM date_dim
) d ON to_date(o.order_date, 'yyyy-MM-dd') = d.date
""")
fact table
+--------------------+--------------------+-----------+--------+--------+
|            order_id|         customer_id|order_value|priority|date_key|
+--------------------+--------------------+-----------+--------+--------+
|1e6f81e2-f1b4-464...|bb559489-b346-46b...|        568|    HIGH|20230421|
|e8939a8f-1476-4b4...|bb559489-b346-46b...|        862|    HIGH|20230408|
|79e3edc5-dcbc-460...|1298fe7d-6586-455...|        282|     LOW|20230423|
+--------------------+--------------------+-----------+--------+--------+
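As an aside (a sketch, not part of the original post), the same fact table could be assembled with the DataFrame API by deriving date_key directly from order_date, which avoids the to_date subquery on date_dim:

# Alternative sketch: derive the yyyyMMdd surrogate key straight from order_date.
order_fact_alt_df = (
    spark.table("orders").alias("o")
    .join(spark.table("customers").alias("c"), F.col("o.customer_id") == F.col("c.customer_id"))
    .select(
        "o.order_id",
        "c.customer_id",
        "o.order_value",
        "o.priority",
        F.date_format(F.to_date("o.order_date", "yyyy-MM-dd"), "yyyyMMdd").cast("int").alias("date_key"),
    )
)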
Writing into Hudi Fact table¶
upsert_hudi_table(
db_name='hudidb',
table_name='orders_fact',
record_id='order_id,customer_id',
precomb_key='order_id',
spark_df=order_fact_df,
table_type='COPY_ON_WRITE',
method='upsert',
)
path file:///C:/tmp/hudidb/orders_fact
Reading from Fact Table¶
customers = "file:///C:/tmp/hudidb/customers"
order_hudi = "file:///C:/tmp/hudidb/orders"
date_dim = 'file:///C:/tmp/hudidb/dim_date'
order_fact = "file:///C:/tmp/hudidb/orders_fact"
spark.read.format("hudi").load(order_fact).createOrReplaceTempView("orders_fact")
# Reuse the exclude_cols list and display_df helper defined earlier
# Display DataFrames
print("customers table", end="\n")
display_df(spark.sql("SELECT * FROM customers"))
print("orders table", end="\n")
display_df(spark.sql("SELECT * FROM orders"))
print("Date Dimension ", end="\n")
display_df(spark.sql("SELECT * FROM date_dim"))
print("Fact table", end="\n")
display_df(spark.sql("SELECT * FROM orders_fact"))
customers table
+--------------------+---------------+--------+-----------+--------------------+--------------------+--------------------+
|         customer_id|           name|   state|       city|               email|          created_at|             address|
+--------------------+---------------+--------+-----------+--------------------+--------------------+--------------------+
|bb559489-b346-46b...|Victoria Romero|Maryland| East Stacy|travis61@example.org|2023-04-26T14:09:...|21179 Audrey Lock...|
|1298fe7d-6586-455...| Nathaniel Tate| Florida|North Kelly|sharon99@example.com|2023-04-26T14:09:...|591 Thomas Height...|
+--------------------+---------------+--------+-----------+--------------------+--------------------+--------------------+

orders table
+--------------------+-------------------+-----------+--------+----------+--------------------+
|            order_id|               name|order_value|priority|order_date|         customer_id|
+--------------------+-------------------+-----------+--------+----------+--------------------+
|1e6f81e2-f1b4-464...|Half see situation.|        568|    HIGH|2023-04-21|bb559489-b346-46b...|
|e8939a8f-1476-4b4...|      Sister apply.|        862|    HIGH|2023-04-08|bb559489-b346-46b...|
|79e3edc5-dcbc-460...|    Maybe resource.|        282|     LOW|2023-04-23|1298fe7d-6586-455...|
+--------------------+-------------------+-----------+--------+----------+--------------------+

Date Dimension
+--------+----+-----+---+-------+---------+--------------+
|date_key|year|month|day|quarter|  weekday|weekday_number|
+--------+----+-----+---+-------+---------+--------------+
|20220410|2022|    4| 10|      2|   Sunday|             6|
|20241002|2024|   10|  2|      4|Wednesday|             2|
|20240430|2024|    4| 30|      2|  Tuesday|             1|
|20210103|2021|    1|  3|      1|   Sunday|             6|
|20210411|2021|    4| 11|      2|   Sunday|             6|
|20200313|2020|    3| 13|      1|   Friday|             4|
|20231014|2023|   10| 14|      4| Saturday|             5|
|20220531|2022|    5| 31|      2|  Tuesday|             1|
|20211126|2021|   11| 26|      4|   Friday|             4|
|20240908|2024|    9|  8|      3|   Sunday|             6|
|20201006|2020|   10|  6|      4|  Tuesday|             1|
|20230909|2023|    9|  9|      3| Saturday|             5|
|20220520|2022|    5| 20|      2|   Friday|             4|
|20200324|2020|    3| 24|      1|  Tuesday|             1|
|20220311|2022|    3| 11|      1|   Friday|             4|
|20200610|2020|    6| 10|      2|Wednesday|             2|
|20210125|2021|    1| 25|      1|   Monday|             0|
|20241211|2024|   12| 11|      4|Wednesday|             2|
|20211005|2021|   10|  5|      4|  Tuesday|             1|
|20210312|2021|    3| 12|      1|   Friday|             4|
+--------+----+-----+---+-------+---------+--------------+
only showing top 20 rows

Fact table
+--------------------+--------------------+-----------+--------+--------+
|            order_id|         customer_id|order_value|priority|date_key|
+--------------------+--------------------+-----------+--------+--------+
|79e3edc5-dcbc-460...|1298fe7d-6586-455...|        282|     LOW|20230423|
|1e6f81e2-f1b4-464...|bb559489-b346-46b...|        568|    HIGH|20230421|
|e8939a8f-1476-4b4...|bb559489-b346-46b...|        862|    HIGH|20230408|
+--------------------+--------------------+-----------+--------+--------+
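With the views registered, a quick star-schema query (an illustrative sketch, not in the original post) shows the payoff of the model: for example, total order value and order count per customer.

spark.sql("""
    SELECT c.name,
           COUNT(*) AS orders,
           SUM(CAST(f.order_value AS DOUBLE)) AS total_order_value
    FROM orders_fact f
    JOIN customers c ON f.customer_id = c.customer_id
    GROUP BY c.name
    ORDER BY total_order_value DESC
""").show()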
Conclusion¶
- In conclusion, Lake House Architecture and Hudi provide a flexible and powerful data storage and processing solution for modern businesses. The Star Schema design pattern enhances the benefits of these technologies, providing a simple and effective way to query and analyze large data sets.