Unify Your Event Data: A Guide to Mapping Events to a Standardized Format with Incremental ETL using Hudi¶
The project "Unify Your Event Data: A Guide to Mapping Events to a Standardized Format with Incremental ETL using Hudi" aims to streamline event data from IoT devices by mapping various event names to a standardized format. By leveraging incremental ETL (Extract, Transform, Load) techniques with Apache Hudi, the project efficiently processes and integrates incoming events into a centralized data lake. This standardized data enables businesses to analyze and track key performance indicators (KPIs) and metrics effectively.
The project involves ingesting events from IoT devices, received at regular intervals through Kinesis Streams, into a Hudi data lake. These events, such as "Temperature High Alert" or "High Temperature Alert," are mapped to a standardized value, such as "High_Temperature_Alert," using a mapping table. The incremental ETL process extracts new events, joins them with the date dimension and mapping table, and populates the fact table. By adopting incremental ETL, the project optimizes processing time and reduces costs associated with reprocessing entire datasets. The resulting fact table provides a consolidated view of the events, allowing businesses to gain valuable insights and make informed decisions based on standardized and integrated event data.
Step 1: Define Imports¶
try:
    import os
    import sys
    import uuid
    import random
    import pyspark
    from pyspark.sql import SparkSession
    from pyspark import SparkConf, SparkContext
    from pyspark.sql.functions import col, asc, desc, to_timestamp, monotonically_increasing_id, to_date, when, udf
    from pyspark.sql.types import *
    from functools import reduce
    from faker import Faker
    import pandas as pd
    import boto3
    import json
    from datetime import datetime, date, timedelta
    from pyspark.sql.functions import year, quarter, month, dayofmonth, weekofyear
    from pyspark.sql import functions as F
except Exception as e:
    print(e)
Step 2: Define Spark Session¶
SUBMIT_ARGS = "--packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.12.1 pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
spark = SparkSession.builder \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
    .config('className', 'org.apache.hudi') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .getOrCreate()
spark
SparkSession - in-memory
Data Generator¶
import random
import time
import uuid

def generate_iot_sensor_data(total_devices, total_sensors, num_entries):
    sensor_data_list = []
    for _ in range(num_entries):
        # Generate a unique ID (GUID) for the data
        unique_id = str(uuid.uuid4())

        # Generate a random timestamp within the last year
        timestamp = random.randint(int(time.time()) - 3600 * 24 * 365, int(time.time()))

        # Generate the device and sensor IDs
        device_id = random.randint(1, total_devices)
        sensor_id = random.randint(1, total_sensors)

        # Generate the event data
        event = random.choice(["Temperature High Alert", "High Temperature Alert",
                               "Temperature Exceeded", "Temperature Threshold Exceeded",
                               "Temperature Spike Alert", "Temperature Danger Alert",
                               "Temperature Critical Alert"])

        # Create the sensor data dictionary
        sensor_data = {
            "sensor_id": sensor_id,
            "device_id": device_id,
            "timestamp": timestamp,
            "unique_id": unique_id,
            "event": event
        }
        sensor_data_list.append(sensor_data)
    return sensor_data_list
Date Dimension DataFrame¶
min_date = '2020-01-01'
max_date = '2025-01-01'
date_range = pd.date_range(start=min_date, end=max_date)
date_data = [(int(day.strftime('%Y%m%d')), day.year, day.month, day.day, str((day.month-1)//3+1),
              day.strftime('%A'), day.weekday()) for day in date_range]
date_schema = ['date_key', 'year', 'month', 'day', 'quarter', 'weekday', 'weekday_number']
date_dim_df = spark.createDataFrame(date_data, schema=date_schema)
date_dim_df.show()
+--------+----+-----+---+-------+---------+--------------+
|date_key|year|month|day|quarter|  weekday|weekday_number|
+--------+----+-----+---+-------+---------+--------------+
|20200101|2020|    1|  1|      1|Wednesday|             2|
|20200102|2020|    1|  2|      1| Thursday|             3|
|20200103|2020|    1|  3|      1|   Friday|             4|
|20200104|2020|    1|  4|      1| Saturday|             5|
|20200105|2020|    1|  5|      1|   Sunday|             6|
|20200106|2020|    1|  6|      1|   Monday|             0|
|20200107|2020|    1|  7|      1|  Tuesday|             1|
|20200108|2020|    1|  8|      1|Wednesday|             2|
|20200109|2020|    1|  9|      1| Thursday|             3|
|20200110|2020|    1| 10|      1|   Friday|             4|
|20200111|2020|    1| 11|      1| Saturday|             5|
|20200112|2020|    1| 12|      1|   Sunday|             6|
|20200113|2020|    1| 13|      1|   Monday|             0|
|20200114|2020|    1| 14|      1|  Tuesday|             1|
|20200115|2020|    1| 15|      1|Wednesday|             2|
|20200116|2020|    1| 16|      1| Thursday|             3|
|20200117|2020|    1| 17|      1|   Friday|             4|
|20200118|2020|    1| 18|      1| Saturday|             5|
|20200119|2020|    1| 19|      1|   Sunday|             6|
|20200120|2020|    1| 20|      1|   Monday|             0|
+--------+----+-----+---+-------+---------+--------------+
only showing top 20 rows
Event Table¶
total_devices = 5
total_sensors = 3
num_entries = 10
data_list = generate_iot_sensor_data(total_devices, total_sensors, num_entries)
events_data_df = spark.createDataFrame(data=[tuple(i.values()) for i in data_list],schema=list(data_list[0].keys()))
events_data_df.show(truncate=False)
+---------+---------+----------+------------------------------------+------------------------------+
|sensor_id|device_id|timestamp |unique_id                           |event                         |
+---------+---------+----------+------------------------------------+------------------------------+
|2        |3        |1669929574|03dff10d-ef4e-4db6-b710-1995446c2e48|Temperature Exceeded          |
|1        |4        |1653874268|f4cf9dc7-3415-45de-9801-cc240797a7e1|Temperature Spike Alert       |
|2        |4        |1661218823|022f49cd-8498-4e28-ab0a-7290c09741b7|Temperature Exceeded          |
|3        |3        |1676597652|4536e403-378c-4e74-9195-d612a0c96cc0|Temperature Danger Alert      |
|2        |2        |1655529165|bc4a378a-c0e9-42ea-82fa-eb8e4cd2a00b|Temperature Danger Alert      |
|1        |3        |1675115078|8c565b95-ebdf-4f9c-ab0f-4941a720aafa|Temperature Threshold Exceeded|
|2        |4        |1672099724|52f29bbe-7c59-450b-920b-4607b58550a4|High Temperature Alert        |
|1        |3        |1663237495|04da524c-9b56-410f-ac2a-28eb2e747106|Temperature High Alert        |
|1        |4        |1657939739|8e8089af-1972-4cd3-921e-73a43e751c96|Temperature Spike Alert       |
|2        |4        |1673910737|15f13f49-0cdb-4144-af5b-755c15939122|Temperature Exceeded          |
+---------+---------+----------+------------------------------------+------------------------------+
Mapping Table¶
mappings = [
{"id": 1, "event_name": "Temperature High Alert", "standardized_event_name": "High_Temperature_Alert"},
{"id": 2, "event_name": "High Temperature Alert", "standardized_event_name": "High_Temperature_Alert"},
{"id": 3, "event_name": "Temperature Exceeded", "standardized_event_name": "High_Temperature_Alert"},
{"id": 4, "event_name": "Temperature Threshold Exceeded", "standardized_event_name": "High_Temperature_Alert"},
{"id": 5, "event_name": "Temperature Spike Alert", "standardized_event_name": "High_Temperature_Alert"},
{"id": 6, "event_name": "Temperature Danger Alert", "standardized_event_name": "High_Temperature_Alert"},
{"id": 7, "event_name": "Temperature Critical Alert", "standardized_event_name": "High_Temperature_Alert"}
]
mapped_data_df = spark.createDataFrame(data=[tuple(i.values()) for i in mappings],schema=list(mappings[0].keys()))
mapped_data_df.show(truncate=False)
+---+------------------------------+-----------------------+
|id |event_name                    |standardized_event_name|
+---+------------------------------+-----------------------+
|1  |Temperature High Alert        |High_Temperature_Alert |
|2  |High Temperature Alert        |High_Temperature_Alert |
|3  |Temperature Exceeded          |High_Temperature_Alert |
|4  |Temperature Threshold Exceeded|High_Temperature_Alert |
|5  |Temperature Spike Alert       |High_Temperature_Alert |
|6  |Temperature Danger Alert      |High_Temperature_Alert |
|7  |Temperature Critical Alert    |High_Temperature_Alert |
+---+------------------------------+-----------------------+
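Because the mapping table is tiny, the same lookup can also be expressed directly in the DataFrame API with a broadcast join. This is only an illustrative sketch of the idea (the variable names below are illustrative); the post itself performs the mapping later with a Spark SQL join against the Hudi tables.

# Illustrative only: map raw event names to standardized names with a broadcast join.
# Broadcasting the small mapping table avoids shuffling the (much larger) event data.
mapped_events_preview_df = (
    events_data_df
    .join(F.broadcast(mapped_data_df),
          events_data_df.event == mapped_data_df.event_name,
          "left")
    .select(
        "sensor_id", "device_id", "timestamp", "unique_id",
        F.coalesce("standardized_event_name", F.lit("unknown")).alias("standardized_event_name"),
    )
)
mapped_events_preview_df.show(truncate=False)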
Method to UPSERT into Hudi¶
def upsert_hudi_table(
    db_name,
    table_name,
    record_id,
    precomb_key,
    spark_df,
    table_type='COPY_ON_WRITE',
    method='upsert',
):
    path = f"file:///C:/tmp/{db_name}/{table_name}"
    print("path", path, end="\n")

    hudi_options = {
        'hoodie.table.name': table_name,
        'hoodie.datasource.write.table.type': table_type,
        'hoodie.datasource.write.recordkey.field': record_id,
        'hoodie.datasource.write.table.name': table_name,
        'hoodie.datasource.write.operation': method,
        'hoodie.datasource.write.precombine.field': precomb_key,
    }

    spark_df.write.format("hudi"). \
        options(**hudi_options). \
        mode("append"). \
        save(path)
upsert_hudi_table(
    db_name='hudidb',
    table_name='dim_date',
    record_id='date_key',
    precomb_key='date_key',
    spark_df=date_dim_df,
    table_type='COPY_ON_WRITE',
    method='upsert',
)

upsert_hudi_table(
    db_name='hudidb',
    table_name='mappings',
    record_id='id',
    precomb_key='id',
    spark_df=mapped_data_df,
    table_type='COPY_ON_WRITE',
    method='upsert',
)

upsert_hudi_table(
    db_name='hudidb',
    table_name='events',
    record_id='unique_id',
    precomb_key='timestamp',
    spark_df=events_data_df,
    table_type='COPY_ON_WRITE',
    method='upsert',
)
path file:///C:/tmp/hudidb/dim_date
path file:///C:/tmp/hudidb/mappings
path file:///C:/tmp/hudidb/events
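The helper writes with mode("append"), so re-running it with the same record keys updates rows in place rather than duplicating them. A quick sanity check that each write landed is a snapshot read of the same local paths (a minimal sketch, assuming the paths printed above):

# Snapshot read of the freshly written Hudi tables to confirm the upserts landed.
for tbl in ["dim_date", "mappings", "events"]:
    snapshot_df = spark.read.format("hudi").load(f"file:///C:/tmp/hudidb/{tbl}")
    print(tbl, snapshot_df.count())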
Hudi Paths¶
mapping = "file:///C:/tmp/hudidb/mappings"
event = "file:///C:/tmp/hudidb/events"
dim_date = "file:///C:/tmp/hudidb/dim_date"
Incrementally Fetching the Events and Performing Joins with the Mapping and Date Dimension Tables for the Fact Table¶
bucket = 'delta-streamer-demo-hudi'
os.environ['AWS_REGION'] = 'us-east-1'
os.environ['AWS_ACCESS_KEY'] = "XXXX"
os.environ['AWS_SECRET_KEY'] = "XXXX"
events_helper = HUDIIncrementalReader(
    bucket=bucket,
    hudi_settings=HUDISettings(
        table_name='events',
        path=event
    ),
    spark_session=spark
)
inc_events_df = events_helper.read()
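HUDIIncrementalReader and HUDISettings are small helper classes (not shown in this post) that persist the last processed commit in the S3 bucket above and issue Hudi incremental queries against the events table. A minimal sketch of the core idea, using a local JSON file as the checkpoint store instead of S3 (the function and file names here are illustrative, not the helper's actual API):

import json
import os

CHECKPOINT_FILE = "events_checkpoint.json"  # illustrative local checkpoint store

def read_events_incrementally(spark, table_path):
    # Read the last processed commit from the checkpoint, defaulting to the
    # beginning of the Hudi timeline on the first run.
    begin_time = "000"
    if os.path.exists(CHECKPOINT_FILE):
        with open(CHECKPOINT_FILE) as f:
            begin_time = json.load(f)["last_processed_commit"]

    # Hudi incremental query: only records from commits after begin_time are returned.
    df = (
        spark.read.format("hudi")
        .option("hoodie.datasource.query.type", "incremental")
        .option("hoodie.datasource.read.begin.instanttime", begin_time)
        .load(table_path)
    )

    # Advance the checkpoint to the newest commit just consumed.
    if df.count() > 0:
        latest_commit = df.agg(F.max("_hoodie_commit_time")).collect()[0][0]
        with open(CHECKPOINT_FILE, "w") as f:
            json.dump({"last_processed_commit": latest_commit}, f)
    return df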
Creating Snapshots¶
inc_events_df.createOrReplaceTempView("events")
spark.read.format("hudi").load(dim_date).createOrReplaceTempView("dim_date")
spark.read.format("hudi").load(mapping).createOrReplaceTempView("mappings")
Fact Table¶
query = """
SELECT
e.sensor_id,
e.device_id,
d.date_key,
e.unique_id,
COALESCE(m.standardized_event_name, 'unknown') AS standardized_event_name
FROM
events e
LEFT JOIN
mappings m ON e.event = m.event_name
LEFT JOIN
dim_date d ON date_format(from_unixtime(e.timestamp), 'yyyyMMdd') = d.date_key
"""
spark_df = spark.sql(query)
spark_df.show(truncate=False)
+---------+---------+--------+------------------------------------+-----------------------+
|sensor_id|device_id|date_key|unique_id                           |standardized_event_name|
+---------+---------+--------+------------------------------------+-----------------------+
|1        |3        |20230130|8c565b95-ebdf-4f9c-ab0f-4941a720aafa|High_Temperature_Alert |
|2        |3        |20221201|03dff10d-ef4e-4db6-b710-1995446c2e48|High_Temperature_Alert |
|1        |3        |20220915|04da524c-9b56-410f-ac2a-28eb2e747106|High_Temperature_Alert |
|2        |2        |20220618|bc4a378a-c0e9-42ea-82fa-eb8e4cd2a00b|High_Temperature_Alert |
|1        |4        |20220715|8e8089af-1972-4cd3-921e-73a43e751c96|High_Temperature_Alert |
|1        |4        |20220529|f4cf9dc7-3415-45de-9801-cc240797a7e1|High_Temperature_Alert |
|2        |4        |20220822|022f49cd-8498-4e28-ab0a-7290c09741b7|High_Temperature_Alert |
|2        |4        |20221226|52f29bbe-7c59-450b-920b-4607b58550a4|High_Temperature_Alert |
|3        |3        |20230216|4536e403-378c-4e74-9195-d612a0c96cc0|High_Temperature_Alert |
|2        |4        |20230116|15f13f49-0cdb-4144-af5b-755c15939122|High_Temperature_Alert |
+---------+---------+--------+------------------------------------+-----------------------+
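With the event names standardized, KPI-style rollups become straightforward. For example, counting alerts per device and day over the joined result (a small sketch using the spark_df built above; the aggregation shown is just one possible metric):

# Example KPI: number of standardized alerts per device per day.
kpi_df = (
    spark_df
    .groupBy("device_id", "date_key", "standardized_event_name")
    .count()
    .orderBy("device_id", "date_key")
)
kpi_df.show(truncate=False)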
Regular Expression Query¶
query = """
SELECT
e.sensor_id,
e.device_id,
d.date_key,
e.unique_id,
COALESCE(m.standardized_event_name, 'unknown') AS standardized_event_name
FROM
events e
LEFT JOIN
mappings m ON e.event RLIKE m.event_name
LEFT JOIN
dim_date d ON date_format(from_unixtime(e.timestamp), 'yyyyMMdd') = d.date_key
"""
spark_df = spark.sql(query)
spark_df.show(truncate=False)
+---------+---------+--------+------------------------------------+-----------------------+
|sensor_id|device_id|date_key|unique_id                           |standardized_event_name|
+---------+---------+--------+------------------------------------+-----------------------+
|1        |3        |20230130|8c565b95-ebdf-4f9c-ab0f-4941a720aafa|High_Temperature_Alert |
|2        |3        |20221201|03dff10d-ef4e-4db6-b710-1995446c2e48|High_Temperature_Alert |
|1        |3        |20220915|04da524c-9b56-410f-ac2a-28eb2e747106|High_Temperature_Alert |
|2        |2        |20220618|bc4a378a-c0e9-42ea-82fa-eb8e4cd2a00b|High_Temperature_Alert |
|1        |4        |20220715|8e8089af-1972-4cd3-921e-73a43e751c96|High_Temperature_Alert |
|1        |4        |20220529|f4cf9dc7-3415-45de-9801-cc240797a7e1|High_Temperature_Alert |
|2        |4        |20220822|022f49cd-8498-4e28-ab0a-7290c09741b7|High_Temperature_Alert |
|2        |4        |20221226|52f29bbe-7c59-450b-920b-4607b58550a4|High_Temperature_Alert |
|3        |3        |20230216|4536e403-378c-4e74-9195-d612a0c96cc0|High_Temperature_Alert |
|2        |4        |20230116|15f13f49-0cdb-4144-af5b-755c15939122|High_Temperature_Alert |
+---------+---------+--------+------------------------------------+-----------------------+
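RLIKE treats the mapping table's event_name as a regular expression and matches it anywhere inside the incoming event string, so with the exact event names above it returns the same result as the equality join. The practical benefit is that a single pattern row can cover many raw variants. A hypothetical example (the regex mapping row below is not part of the dataset used in this post):

# Hypothetical: one regex-based mapping row covers every "Temperature ..." variant,
# because RLIKE performs a partial (substring) regex match.
regex_mappings = [
    {"id": 1, "event_name": "Temperature", "standardized_event_name": "High_Temperature_Alert"}
]
regex_map_df = spark.createDataFrame(
    data=[tuple(i.values()) for i in regex_mappings],
    schema=list(regex_mappings[0].keys())
)
regex_map_df.createOrReplaceTempView("mappings_regex")

spark.sql("""
    SELECT e.unique_id,
           COALESCE(m.standardized_event_name, 'unknown') AS standardized_event_name
    FROM events e
    LEFT JOIN mappings_regex m ON e.event RLIKE m.event_name
""").show(truncate=False)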
Upserting Fact Table into Hudi¶
upsert_hudi_table(
    db_name='hudidb',
    table_name='mapped_events',
    record_id='unique_id',
    precomb_key='unique_id',
    spark_df=spark_df,
    table_type='COPY_ON_WRITE',
    method='upsert',
)
path file:///C:/tmp/hudidb/mapped_events
Let's try reading again with an incremental query to see whether there is any new data to process; in this case, we should get an empty DataFrame.¶
inc_events_df = events_helper.read()
inc_events_df.show()
******************LOGS******************
meta_data {'last_processed_commit': '20230516185703593', 'table_name': 'events', 'path': 'file:///C:/tmp/hudidb/events', 'inserted_time': '2023-05-16 19:03:44.156305'}
last_processed_commit : 20230516185703593
***************************************
+-------------------+--------------------+------------------+----------------------+-----------------+---------+---------+---------+---------+-----+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|_hoodie_file_name|sensor_id|device_id|timestamp|unique_id|event|
+-------------------+--------------------+------------------+----------------------+-----------------+---------+---------+---------+---------+-----+
+-------------------+--------------------+------------------+----------------------+-----------------+---------+---------+---------+---------+-----+
Perfect¶
TIP:¶
- If any events arrive without a corresponding mapping, they are automatically assigned the value "unknown". You can later retrieve these unknown records, add the missing entries to the mapping table, and then update the fact table with the new mapping information, a process known as backfilling (see the sketch below).
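A minimal sketch of that backfill flow, assuming the missing entries have since been added to the mappings table. For brevity it re-derives date_key directly from the event timestamp instead of re-joining the date dimension; the paths and the upsert_hudi_table helper are the ones defined earlier in this post:

# 1. Find fact rows that were written with an unknown mapping.
fact_path = "file:///C:/tmp/hudidb/mapped_events"
fact_df = spark.read.format("hudi").load(fact_path)
unknown_keys_df = fact_df.filter(F.col("standardized_event_name") == "unknown").select("unique_id")

# 2. Pull the raw events for just those keys and re-join them against the refreshed mapping table.
raw_events_df = spark.read.format("hudi").load(event).join(unknown_keys_df, "unique_id")
refreshed_map_df = spark.read.format("hudi").load(mapping)
backfilled_df = (
    raw_events_df
    .join(refreshed_map_df, raw_events_df.event == refreshed_map_df.event_name, "left")
    .withColumn("date_key", F.date_format(F.from_unixtime("timestamp"), "yyyyMMdd").cast("bigint"))
    .select("sensor_id", "device_id", "date_key", "unique_id",
            F.coalesce("standardized_event_name", F.lit("unknown")).alias("standardized_event_name"))
)

# 3. Upsert the corrected rows; the shared record key (unique_id) updates only the affected rows.
upsert_hudi_table(
    db_name='hudidb',
    table_name='mapped_events',
    record_id='unique_id',
    precomb_key='unique_id',
    spark_df=backfilled_df,
)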
Conclusion¶
In conclusion, the project "Unify Your Event Data: A Guide to Mapping Events to a Standardized Format with Incremental ETL using Hudi" offers a systematic approach to harmonize event data from IoT devices. By mapping diverse event names to a standardized format and employing incremental ETL techniques with Hudi, the project enhances data integration and analysis capabilities. The use of a mapping table and incremental updates significantly reduces processing time and costs, allowing businesses to efficiently handle incoming event streams.
By unifying event data into a centralized data lake and populating a fact table with standardized values, businesses gain a holistic view of their IoT events. This standardized and integrated data empowers organizations to track key performance indicators and metrics accurately, enabling informed decision-making. With the project's emphasis on efficiency, scalability, and improved data quality, businesses can unlock valuable insights from their event data and derive maximum value from their IoT investments.