Mastering Slowly Changing Dimension with Hudi: A Step-by-Step Guide to Efficient Data Management¶
A slowly changing dimension (SCD) is a key concept in data management that helps track changes in data over time. It is particularly useful in data warehousing and data analytics, where historical data is essential for trend analysis, forecasting, and identifying patterns. In this readme, we will discuss the advantages of using slowly changing dimensions in the context of lakehouse architecture and how they can be implemented.
Advantages of Slowly Changing Dimension in Lakehouse Architecture¶
Lakehouse architecture is a newer paradigm that combines the best of both worlds: data warehouses and data lakes. It is designed to handle large-scale data workloads and enables faster data processing and analytics. Slowly changing dimensions play a crucial role in lakehouse architecture by providing the following benefits:
1. Improved Data Integrity¶
Slowly changing dimensions help maintain data integrity by ensuring that the data is accurate and consistent over time. This is particularly important in lakehouse architecture, where data is stored in multiple formats and may be updated frequently. With slowly changing dimensions, you can track changes in data and ensure that the data remains consistent.
2. Efficient Data Processing¶
Lakehouse architecture is designed to handle large-scale data workloads, and slowly changing dimensions help in processing the data efficiently. By tracking changes in data, you can avoid processing unnecessary data and focus on the relevant data.
3. Simplified Data Analysis¶
Slowly changing dimensions simplify data analysis by providing a comprehensive view of the data. With SCD, you can easily track changes in data and analyze the data over time. This is particularly useful in data warehousing and data analytics, where historical data is essential for trend analysis, forecasting, and identifying patterns.
Implementation of Slowly Changing Dimension in Lakehouse Architecture¶
Slowly changing dimension can be implemented in lakehouse architecture using different approaches. Here are three common approaches:
1. Type 1 SCD¶
Type 1 SCD involves replacing old data with new data. This approach is suitable for situations where data changes are infrequent and historical data is not required. In this approach, the old data is overwritten with new data, and the changes are not tracked.
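As a rough sketch (not part of the original walkthrough), a Type 1 overwrite maps naturally onto a Hudi upsert keyed on the natural key, so an incoming change simply replaces the stored row. The table name, path, and changes_df below are hypothetical illustrations:
# Type 1: key the table on the natural key so an upsert overwrites the row in place.
type1_options = {
    'hoodie.table.name': 'dim_customers_type1',
    'hoodie.datasource.write.recordkey.field': 'customer_id',   # natural key, not a surrogate
    'hoodie.datasource.write.precombine.field': 'created_at',
    'hoodie.datasource.write.operation': 'upsert',
}
# changes_df is hypothetical: any DataFrame holding the updated customer rows.
# changes_df.write.format("hudi").options(**type1_options) \
#     .mode("append").save("file:///C:/tmp/hudidb/dim_customers_type1")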
2. Type 2 SCD¶
Type 2 SCD involves creating a new record for the changed data, and the old record is kept as it is. This approach is suitable for situations where historical data is required. In this approach, the new record has a new primary key value and a start and end date that represent the time period during which it is valid.
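The walkthrough below implements this pattern with a surrogate customer_dim_key and an is_current flag. As a minimal sketch of stamping the new version of a changed row, assuming changed_df holds the incoming changes (the valid_from/valid_to columns are shown only to illustrate the start/end dates mentioned above; the post itself uses a Python UDF for the surrogate key and relies on is_current alone):
from pyspark.sql import functions as F

# changed_df stands for whatever DataFrame holds the incoming changed rows.
new_version = (changed_df
    .withColumn("customer_dim_key", F.expr("uuid()"))         # new surrogate key per version
    .withColumn("valid_from", F.current_timestamp())
    .withColumn("valid_to", F.lit(None).cast("timestamp"))    # open-ended until superseded
    .withColumn("is_current", F.lit(True)))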
3. Type 3 SCD¶
Type 3 SCD involves adding a column that stores the previous value alongside the current one, so only a limited history (typically the current and immediately prior value) is maintained. This approach is suitable for situations where a full change history is not required and comparing the current value with the one before it is enough.
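A minimal Type 3 sketch (not used later in this post): keep the prior value in its own column by joining the current dimension with the incoming change. old_df and changed_df are hypothetical DataFrames:
from pyspark.sql import functions as F

# old_df / changed_df stand for the current dimension and the incoming changes.
type3_df = (old_df.alias("o")
    .join(changed_df.alias("n"), on="customer_id")
    .select("customer_id",
            F.col("n.state").alias("state"),             # new value
            F.col("o.state").alias("previous_state")))   # single level of history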
Step 1: Define Imports¶
try:
    import os
    import sys
    import uuid
    import time
    from datetime import datetime

    import pyspark
    from pyspark import SparkConf, SparkContext
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, to_timestamp, monotonically_increasing_id, to_date, when, udf, lit
    from pyspark.sql.types import StringType

    from faker import Faker
except Exception as e:
    print(e)
Step 2: Create Spark Session¶
SUBMIT_ARGS = "--packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.12.1 pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
spark = SparkSession.builder \
.config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
.config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
.config('className', 'org.apache.hudi') \
.config('spark.sql.hive.convertMetastoreParquet', 'false') \
.getOrCreate()
spark
Step 3: Creating Data Generator class¶
faker = Faker()

def get_customer_data(total_customers=2):
    """Generate a list of synthetic customer records using Faker."""
    customers_array = []
    for i in range(0, total_customers):
        customer_data = {
            "customer_id": str(uuid.uuid4()),
            "name": faker.name(),
            "state": faker.state(),
            "city": faker.city(),
            "email": faker.email(),
            "created_at": datetime.now().isoformat()
        }
        customers_array.append(customer_data)
    return customers_array
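A quick way to sanity-check the generator before building a Spark DataFrame:
# Generate one record and inspect its shape (the keys become the schema below).
sample = get_customer_data(total_customers=1)
print(sample[0])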
Step 4: Method to Upsert into Hudi tables¶
def upsert_hudi_table(
        db_name,
        table_name,
        record_id,
        precomb_key,
        spark_df,
        table_type='COPY_ON_WRITE',
        method='upsert',
):
    """Upsert the given DataFrame into a Hudi table under a local path."""
    path = f"file:///C:/tmp/{db_name}/{table_name}"
    print("path", path, end="\n")

    hudi_options = {
        'hoodie.table.name': table_name,
        'hoodie.datasource.write.table.type': table_type,
        'hoodie.datasource.write.recordkey.field': record_id,
        'hoodie.datasource.write.table.name': table_name,
        'hoodie.datasource.write.operation': method,
        'hoodie.datasource.write.precombine.field': precomb_key,
    }

    spark_df.write.format("hudi") \
        .options(**hudi_options) \
        .mode("append") \
        .save(path)
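For convenience when verifying writes, a small read helper can mirror the same path convention. This is only a sketch; later in the post we call spark.read.format("hudi").load(...) directly:
def read_hudi_table(db_name, table_name):
    # Mirrors the local path convention used by upsert_hudi_table above.
    path = f"file:///C:/tmp/{db_name}/{table_name}"
    return spark.read.format("hudi").load(path)

# Example: read_hudi_table('hudidb', 'dim_customers').show()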
Sample Preview of Customer Dataframe¶
total_customers = 5

customer_data = get_customer_data(total_customers=total_customers)

spark_df_customers = spark.createDataFrame(
    data=[tuple(i.values()) for i in customer_data],
    schema=list(customer_data[0].keys())
)
random_udf = udf(lambda: str(uuid.uuid4()), StringType())
spark_df_customers = spark_df_customers.withColumn("customer_dim_key", random_udf())
spark_df_customers = spark_df_customers.withColumn("is_current", lit(True))
spark_df_customers.select([
'customer_id',
'name',
'state',
'city',
'created_at',
'customer_dim_key',
'is_current'
]).show()
+--------------------+-----------------+------------+----------------+--------------------+--------------------+----------+
|         customer_id|             name|       state|            city|          created_at|    customer_dim_key|is_current|
+--------------------+-----------------+------------+----------------+--------------------+--------------------+----------+
|dcf40644-5147-4ed...|Reginald Copeland|North Dakota|      Greerburgh|2023-05-03T07:24:...|6850c73a-d3ea-434...|      true|
|893f92b3-77b3-443...|      Kelly Perez|    Illinois|      Lake Megan|2023-05-03T07:24:...|d3d3b269-79b3-4d6...|      true|
|2275e56a-40d3-4c6...| Marisa Rodriguez|      Oregon|South Rickymouth|2023-05-03T07:24:...|693ba9dd-affa-481...|      true|
|26b09f56-fb1f-47b...|     Shawn Bailey| Mississippi|    West Shannon|2023-05-03T07:24:...|0ddcb1fc-495c-46b...|      true|
|04003093-100b-48f...|    Maria Wallace|        Utah|Lake Michaelbury|2023-05-03T07:24:...|574d130b-01d5-452...|      true|
+--------------------+-----------------+------------+----------------+--------------------+--------------------+----------+
Upserting into Hudi tables¶
upsert_hudi_table(
db_name='hudidb',
table_name='dim_customers',
record_id='customer_dim_key',
precomb_key='created_at',
spark_df=spark_df_customers,
table_type='COPY_ON_WRITE',
method='upsert',
)
path file:///C:/tmp/hudidb/dim_customers
If a customer has updated their information, we would need to apply Slowly Changing Dimension Type 2.¶
Previous Information¶
customer_data[1]
{'customer_id': '893f92b3-77b3-443b-a764-362a3f2a2300', 'name': 'Kelly Perez', 'state': 'Illinois', 'city': 'Lake Megan', 'email': 'harrisonelizabeth@example.org', 'created_at': '2023-05-03T07:24:52.250777'}
New information¶
customer_data[1]['city'] = 'Stamford'
customer_data[1]['state'] = 'CT'
changed_item =[customer_data[1]]
changed_item
[{'customer_id': '893f92b3-77b3-443b-a764-362a3f2a2300', 'name': 'Kelly Perez', 'state': 'CT', 'city': 'Stamford', 'email': 'harrisonelizabeth@example.org', 'created_at': '2023-05-03T07:24:52.250777'}]
update_df_customers = spark.createDataFrame(data=[tuple(i.values()) for i in changed_item],
schema=list(changed_item[0].keys()))
update_df_customers = update_df_customers.withColumn("customer_dim_key", random_udf())
update_df_customers = update_df_customers.withColumn("is_current", lit(True))
update_df_customers.select([
'customer_id',
'name',
'state',
'city',
'created_at',
'customer_dim_key',
'is_current'
]).show()
+--------------------+-----------+-----+--------+--------------------+--------------------+----------+
|         customer_id|       name|state|    city|          created_at|    customer_dim_key|is_current|
+--------------------+-----------+-----+--------+--------------------+--------------------+----------+
|893f92b3-77b3-443...|Kelly Perez|   CT|Stamford|2023-05-03T07:24:...|56d6250a-33f4-4d8...|      true|
+--------------------+-----------+-----+--------+--------------------+--------------------+----------+
Creating Snapshots¶
customers = "file:///C:/tmp/hudidb/dim_customers"
update_df_customers.createOrReplaceTempView("new_customer")
spark.read.format("hudi").load(customers).createOrReplaceTempView("old_customer")
Marking the Old Record's is_current Flag as False¶
query = """
SELECT
old_customer.customer_id,
old_customer.name,
old_customer.state,
old_customer.city,
old_customer.email,
old_customer.created_at,
old_customer.customer_dim_key,
CAST(false AS BOOLEAN) AS is_current
FROM
old_customer
INNER JOIN
new_customer
ON
old_customer.customer_id = new_customer.customer_id
AND
old_customer.is_current = true
"""
customers_to_update_df = spark.sql(query)
print("----------customers_to_update_df---------------")
customers_to_update_df.show()
----------customers_to_update_df---------------
+--------------------+-----------+--------+----------+--------------------+--------------------+--------------------+----------+
|         customer_id|       name|   state|      city|               email|          created_at|    customer_dim_key|is_current|
+--------------------+-----------+--------+----------+--------------------+--------------------+--------------------+----------+
|893f92b3-77b3-443...|Kelly Perez|Illinois|Lake Megan|harrisonelizabeth...|2023-05-03T07:24:...|f86f4824-f147-4e4...|     false|
+--------------------+-----------+--------+----------+--------------------+--------------------+--------------------+----------+
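If you prefer the DataFrame API over embedded SQL, the same flag flip can be expressed as a join followed by a column override. This is a sketch equivalent in intent to the query above, reusing the customers path and update_df_customers defined earlier:
from pyspark.sql import functions as F

old_df = spark.read.format("hudi").load(customers).select(
    "customer_id", "name", "state", "city", "email",
    "created_at", "customer_dim_key", "is_current")

# Keep only currently-active rows that have a matching incoming change,
# then flip their is_current flag to false.
customers_to_update_alt = (old_df
    .join(update_df_customers.select("customer_id"), on="customer_id", how="inner")
    .filter(F.col("is_current"))
    .withColumn("is_current", F.lit(False)))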
Showing Both DataFrames¶
print("----------old record we marked is current to false---------------")
customers_to_update_df.show()
print("----------new record we marked is current to true---------------")
update_df_customers.show()
----------old record we marked is current to false---------------
+--------------------+-----------+--------+----------+--------------------+--------------------+--------------------+----------+
|         customer_id|       name|   state|      city|               email|          created_at|    customer_dim_key|is_current|
+--------------------+-----------+--------+----------+--------------------+--------------------+--------------------+----------+
|893f92b3-77b3-443...|Kelly Perez|Illinois|Lake Megan|harrisonelizabeth...|2023-05-03T07:24:...|f86f4824-f147-4e4...|     false|
+--------------------+-----------+--------+----------+--------------------+--------------------+--------------------+----------+
----------new record we marked is current to true---------------
+--------------------+-----------+-----+--------+--------------------+--------------------+--------------------+----------+
|         customer_id|       name|state|    city|               email|          created_at|    customer_dim_key|is_current|
+--------------------+-----------+-----+--------+--------------------+--------------------+--------------------+----------+
|893f92b3-77b3-443...|Kelly Perez|   CT|Stamford|harrisonelizabeth...|2023-05-03T07:24:...|eb8414d8-54d4-4c9...|      true|
+--------------------+-----------+-----+--------+--------------------+--------------------+--------------------+----------+
Merging Both the records¶
merged_customers_df = update_df_customers.unionByName(customers_to_update_df)
merged_customers_df.show()
+--------------------+-----------+--------+----------+--------------------+--------------------+--------------------+----------+
|         customer_id|       name|   state|      city|               email|          created_at|    customer_dim_key|is_current|
+--------------------+-----------+--------+----------+--------------------+--------------------+--------------------+----------+
|893f92b3-77b3-443...|Kelly Perez|      CT|  Stamford|harrisonelizabeth...|2023-05-03T07:24:...|30ffe5cd-ce2f-403...|      true|
|893f92b3-77b3-443...|Kelly Perez|Illinois|Lake Megan|harrisonelizabeth...|2023-05-03T07:24:...|f86f4824-f147-4e4...|     false|
+--------------------+-----------+--------+----------+--------------------+--------------------+--------------------+----------+
Upsert into Hudi DIM¶
upsert_hudi_table(
db_name='hudidb',
table_name='dim_customers',
record_id='customer_dim_key',
precomb_key='created_at',
spark_df=merged_customers_df,
table_type='COPY_ON_WRITE',
method='upsert',
)
path file:///C:/tmp/hudidb/dim_customers
Reading from Hudi¶
customers = "file:///C:/tmp/hudidb/dim_customers"
spark.read.format("hudi").load(customers).createOrReplaceTempView("customer")
spark.sql("""
select customer_id,
name,
state,
city,
created_at,
is_current
from customer
where name = 'Kelly Perez'
""").show()
+--------------------+-----------+--------+----------+--------------------+----------+
|         customer_id|       name|   state|      city|          created_at|is_current|
+--------------------+-----------+--------+----------+--------------------+----------+
|893f92b3-77b3-443...|Kelly Perez|Illinois|Lake Megan|2023-05-03T07:24:...|     false|
|893f92b3-77b3-443...|Kelly Perez|      CT|  Stamford|2023-05-03T07:24:...|      true|
+--------------------+-----------+--------+----------+--------------------+----------+
Mission Accomplished¶
Conclusion¶
Slowly changing dimensions play a crucial role in data warehousing and data analytics by providing a comprehensive view of the data. In the context of lakehouse architecture, SCDs provide numerous benefits, including improved data integrity, efficient data processing, and simplified data analysis. By implementing slowly changing dimensions in lakehouse architecture, organizations can achieve better data management and faster data processing.