Efficiently Managing Ride and Late-Arriving Tips Data with Incremental ETL Using Apache Hudi: A Step-by-Step Guide
- I earned a Bachelor of Science in Electronic Engineering and a double Master's in Electrical and Computer Engineering. I have extensive expertise in developing scalable, high-performance software applications in Python, and I run a YouTube channel where I teach Data Science, Machine Learning, Elasticsearch, and AWS. I work as a Lead Data Engineer, where I spend most of my time developing ingestion frameworks and building microservices and scalable architectures on AWS. I have worked with massive amounts of data, including creating data lakes (1.2T) and optimizing data lake queries through partitioning and the right file formats and compression. I have also developed streaming applications that ingest real-time data via Kinesis and Firehose into Elasticsearch.
Divyansh Patel¶
- I'm a highly skilled and motivated professional with a Master's degree in Computer Science and extensive experience in Data Engineering and AWS Cloud Engineering. I'm currently working with the renowned industry expert Soumil Shah and thrive on tackling complex problems and delivering innovative solutions. My passion for problem-solving and commitment to excellence enable me to make a positive impact on any project or team I work with. I look forward to connecting and collaborating with like-minded professionals.
What is Incremental ETL?¶
- Incremental ETL is the process of extracting, transforming, and loading only the data that has been updated since the last data processing job. This approach is in contrast to full ETL, where all data is processed every time a job runs, regardless of whether it has changed or not.
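To make the idea concrete, here is a minimal sketch of an incremental pull against a Hudi table. It assumes an existing Spark session and Hudi table (the path and commit time below are hypothetical placeholders); the same pattern is implemented properly, with a checkpoint stored on S3, by the helper class later in this post.
# Minimal sketch of an incremental pull with Apache Hudi (illustrative only).
# `table_path` and `last_processed_commit` are placeholders for this example.
table_path = "file:///C:/tmp/hudidb/rides"
last_processed_commit = "20230428100909112"  # commit time recorded by the previous run

incremental_read_options = {
    'hoodie.datasource.query.type': 'incremental',
    'hoodie.datasource.read.begin.instanttime': last_processed_commit,
}

# Only records committed after `last_processed_commit` are returned,
# instead of re-reading the full table as a full ETL job would.
changed_df = (
    spark.read.format("hudi")
    .options(**incremental_read_options)
    .load(table_path)
)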
Advantages of Incremental ETL¶
The advantages of incremental ETL include:
Reduced processing time: Since only the updated data is processed, incremental ETL significantly reduces the amount of time required to complete data processing jobs.
Improved data accuracy: Incremental ETL ensures that only the most up-to-date data is processed, reducing the risk of errors that can occur when processing outdated data.
Cost-effective: Because only the updated data is processed on each run, incremental ETL reduces compute and I/O costs compared with reprocessing the entire dataset every time a job runs.
Step 1: Define imports¶
try:
    import os
    import sys
    import uuid
    import random
    import pyspark
    from pyspark.sql import SparkSession
    from pyspark import SparkConf, SparkContext
    from pyspark.sql.functions import col, asc, desc, to_timestamp, monotonically_increasing_id, to_date, when, udf
    from pyspark.sql.types import *
    from functools import reduce
    from faker import Faker
    import pandas as pd
    import boto3
    import json
    from datetime import datetime, date, timedelta
    from pyspark.sql.functions import year, quarter, month, dayofmonth, weekofyear
    from pyspark.sql import functions as F
except Exception as e:
    print(e)
Step 2: Define Spark Session¶
with open("dev.env.txt", "r") as f:
    data = f.read()

for items in data.split("\n"):
    os.environ[items.split("=")[0]] = items.split("=")[1]

SUBMIT_ARGS = "--packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.12.1 pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

bucket = 'delta-streamer-demo-hudi'

os.environ['AWS_ACCESS_KEY'] = os.getenv("DEV_ACCESS_KEY")
os.environ['AWS_SECRET_KEY'] = os.getenv("DEV_SECRET_KEY")
os.environ['AWS_REGION'] = 'us-east-1'
spark = SparkSession.builder \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
    .config('className', 'org.apache.hudi') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .getOrCreate()
spark
SparkSession - in-memory
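The session above expects AWS credentials in a local dev.env.txt file with one KEY=VALUE pair per line. The key names come from the os.getenv calls in the cell; the values below are placeholders, so substitute your own:
DEV_ACCESS_KEY=<your-aws-access-key-id>
DEV_SECRET_KEY=<your-aws-secret-access-key>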
Step 3: Data generator¶
from faker import Faker
import random

# Create an instance of the Faker class
fake = Faker()

# Function to generate fake data for the ride table
def generate_ride_data(num_records, drivers):
    rides = []
    for i in range(num_records):
        ride = {
            "ride_id": i + 1,
            "user_id": fake.random_int(min=1, max=100),
            "driver_id": random.choice(drivers)['driver_id'],
            "ride_date": fake.date_between(start_date='-30d', end_date='today'),
            "ride_time": fake.time(pattern='%H:%M:%S', end_datetime=None),
            "pickup_location": fake.address(),
            "dropoff_location": fake.address(),
            "distance_travelled": round(random.uniform(0.5, 10.0), 2),
            "ride_duration": fake.time(pattern='%H:%M:%S', end_datetime=None),
            "fare": round(random.uniform(5.0, 50.0), 2)
        }
        rides.append(ride)
    return rides

# Function to generate fake data for the tips table
def generate_tip_data(num_records, ride_ids, drivers):
    tips = []
    for i in range(num_records):
        tip = {
            "tip_id": i + 1,
            "ride_id": random.choice(ride_ids),
            "tip_amount": round(random.uniform(0.0, 5.0), 2),
            "tipping_date": fake.date_between(start_date='-30d', end_date='today'),
            "tipping_time": fake.time(pattern='%H:%M:%S', end_datetime=None)
        }
        tips.append(tip)
    return tips

def generate_driver_data(num_records):
    drivers = []
    for i in range(num_records):
        driver = {
            "driver_id": i + 1,
            "name": fake.name(),
            "phone_number": fake.phone_number(),
            "email": fake.email(),
            "city": fake.city(),
            "state": fake.state(),
            "car_make": fake.word(),
            "car_model": fake.word(),
            "license_plate": fake.license_plate(),
            "avg_rating": round(random.uniform(2.0, 5.0), 2),
            "num_ratings": random.randint(0, 100),
        }
        drivers.append(driver)
    return drivers

# Example usage
drivers = generate_driver_data(5)
rides = generate_ride_data(3, drivers)
ride_ids = [ride['ride_id'] for ride in rides]
tips = generate_tip_data(2, ride_ids, drivers)
Step 4: Define Method to Perform upsert into Hudi tables¶
def upsert_hudi_table(
        db_name,
        table_name,
        record_id,
        precomb_key,
        spark_df,
        table_type='COPY_ON_WRITE',
        method='upsert',
):
    path = f"file:///C:/tmp/{db_name}/{table_name}"
    print("path", path, end="\n")

    hudi_options = {
        'hoodie.table.name': table_name,
        'hoodie.datasource.write.table.type': table_type,
        'hoodie.datasource.write.recordkey.field': record_id,
        'hoodie.datasource.write.table.name': table_name,
        'hoodie.datasource.write.operation': method,
        'hoodie.datasource.write.precombine.field': precomb_key,
    }

    spark_df.write.format("hudi"). \
        options(**hudi_options). \
        mode("append"). \
        save(path)
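As an optional sanity check (not part of the original walkthrough), once a table has been written you can read it back with a snapshot query and inspect the metadata columns Hudi adds on every upsert:
# Hypothetical verification step: snapshot-read a table written by upsert_hudi_table.
# Hudi adds metadata columns such as _hoodie_commit_time and _hoodie_record_key.
snapshot_df = spark.read.format("hudi").load("file:///C:/tmp/hudidb/rides")
snapshot_df.select("_hoodie_commit_time", "_hoodie_record_key", "ride_id", "fare").show()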
Step 5: Sample preview of dataset¶
Preview of Rides Dataset¶
rides_df = spark.createDataFrame(data=[tuple(i.values()) for i in rides],
schema=list(rides[0].keys()))
rides_df.select(['ride_id',
'user_id',
'driver_id',
'ride_date',
'fare']).show()
+-------+-------+---------+----------+-----+
|ride_id|user_id|driver_id| ride_date| fare|
+-------+-------+---------+----------+-----+
|      1|     75|        4|2023-04-22| 47.1|
|      2|     93|        4|2023-04-07|28.86|
|      3|     65|        1|2023-04-01|23.16|
+-------+-------+---------+----------+-----+
Preview of Tips Dataset, Which Is Late Arriving¶
tips_df = spark.createDataFrame(data=[tuple(i.values()) for i in tips],
schema=list(tips[0].keys()))
tips_df.show()
+------+-------+----------+------------+------------+
|tip_id|ride_id|tip_amount|tipping_date|tipping_time|
+------+-------+----------+------------+------------+
|     1|      3|      2.48|  2023-04-03|    11:18:31|
|     2|      3|      1.46|  2023-04-26|    06:08:33|
+------+-------+----------+------------+------------+
Preview of Driver Dimensions¶
driver_df = spark.createDataFrame(data=[tuple(i.values()) for i in drivers],
schema=list(drivers[0].keys()))
driver_df.select(['driver_id',
'name',
'city',
'state',
'car_model',
'license_plate',
'avg_rating']).show()
+---------+---------------+----------------+-------------+---------+-------------+----------+
|driver_id|           name|            city|        state|car_model|license_plate|avg_rating|
+---------+---------------+----------------+-------------+---------+-------------+----------+
|        1|    Nancy Weber|New Tammyborough| Rhode Island|    south|      H86-YTS|      2.59|
|        2|Steven Williams|       Walshport|New Hampshire|    truth|        7XQ04|      3.06|
|        3|  Nicholas Ward|    East Eduardo|    Louisiana|       if|     JI 15967|      2.62|
|        4|  Nathan Morris|   Lake Ryantown|       Nevada|     must|      0RZ 545|       2.9|
|        5|  Kevin Gardner|       Laneburgh|     Virginia|  prepare|      O04 1BA|      3.16|
+---------+---------------+----------------+-------------+---------+-------------+----------+
Preview of Date Dimensions¶
min_date = '2020-01-01'
max_date = '2025-01-01'
date_range = pd.date_range(start=min_date, end=max_date)
date_data = [(int(day.strftime('%Y%m%d')), day.year, day.month, day.day, str((day.month-1)//3+1),
day.strftime('%A'), day.weekday()) for day in date_range]
date_schema = ['date_key', 'year', 'month', 'day', 'quarter', 'weekday', 'weekday_number']
date_dim_df = spark.createDataFrame(date_data, schema=date_schema)
date_dim_df.show()
+--------+----+-----+---+-------+---------+--------------+
|date_key|year|month|day|quarter|  weekday|weekday_number|
+--------+----+-----+---+-------+---------+--------------+
|20200101|2020|    1|  1|      1|Wednesday|             2|
|20200102|2020|    1|  2|      1| Thursday|             3|
|20200103|2020|    1|  3|      1|   Friday|             4|
|20200104|2020|    1|  4|      1| Saturday|             5|
|20200105|2020|    1|  5|      1|   Sunday|             6|
|20200106|2020|    1|  6|      1|   Monday|             0|
|20200107|2020|    1|  7|      1|  Tuesday|             1|
|20200108|2020|    1|  8|      1|Wednesday|             2|
|20200109|2020|    1|  9|      1| Thursday|             3|
|20200110|2020|    1| 10|      1|   Friday|             4|
|20200111|2020|    1| 11|      1| Saturday|             5|
|20200112|2020|    1| 12|      1|   Sunday|             6|
|20200113|2020|    1| 13|      1|   Monday|             0|
|20200114|2020|    1| 14|      1|  Tuesday|             1|
|20200115|2020|    1| 15|      1|Wednesday|             2|
|20200116|2020|    1| 16|      1| Thursday|             3|
|20200117|2020|    1| 17|      1|   Friday|             4|
|20200118|2020|    1| 18|      1| Saturday|             5|
|20200119|2020|    1| 19|      1|   Sunday|             6|
|20200120|2020|    1| 20|      1|   Monday|             0|
+--------+----+-----+---+-------+---------+--------------+
only showing top 20 rows
Creating Hudi Tables for Rides, Tips, Driver, and Date Dims¶
upsert_hudi_table(
    db_name='hudidb',
    table_name='rides',
    record_id='ride_id',
    precomb_key='ride_date',
    spark_df=rides_df,
    table_type='COPY_ON_WRITE',
    method='upsert',
)

upsert_hudi_table(
    db_name='hudidb',
    table_name='tips',
    record_id='tip_id',
    precomb_key='tipping_date',
    spark_df=tips_df,
    table_type='COPY_ON_WRITE',
    method='upsert',
)

upsert_hudi_table(
    db_name='hudidb',
    table_name='dim_date',
    record_id='date_key',
    precomb_key='date_key',
    spark_df=date_dim_df,
    table_type='COPY_ON_WRITE',
    method='upsert',
)

upsert_hudi_table(
    db_name='hudidb',
    table_name='driver',
    record_id='driver_id',
    precomb_key='driver_id',
    spark_df=driver_df,
    table_type='COPY_ON_WRITE',
    method='upsert',
)
path file:///C:/tmp/hudidb/rides
path file:///C:/tmp/hudidb/tips
path file:///C:/tmp/hudidb/dim_date
We Need a Dashboard That Shows Driver Earnings with Tips, Which Can Arrive as Late Updates¶
- We can incrementally pull data from the rides table and continuously update the fact table, initially setting the tip amount to 0. A second job iterates over the tips table incrementally: if a matching record is found in the fact table, we update it with the tip amount; if a record is not found, we retrieve it from the rides table and insert it into the fact table. A condensed sketch of this two-path flow is shown below.
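The sketch below outlines the two incremental paths at a high level. It assumes the rides, tips, and driver_earnings Hudi tables and the incremental-pull helper introduced in the next sections; the two functions named here are hypothetical placeholders for the SQL and upsert steps, and the full runnable version appears under "Entire Main logic".
# High-level sketch of the two incremental paths (condensed; see "Entire Main logic").
inc_rides_df = rides_helper.read()   # new/updated rides since the last checkpoint
inc_tips_df = tips_helper.read()     # late-arriving tips since the last checkpoint

if inc_rides_df.count() > 0:
    # Path 1: new rides -> build fact rows with tip_amount defaulted to 0
    # (a LEFT JOIN against incremental tips fills the tip if it already arrived),
    # then upsert them into driver_earnings.
    build_and_upsert_earning_fact(inc_rides_df, inc_tips_df)   # hypothetical helper

if inc_tips_df.count() > 0:
    # Path 2: late-arriving tips -> look up the existing fact rows by ride_id and
    # upsert them with the new tip_amount and a recomputed total_amount.
    update_earning_fact_with_tips(inc_tips_df)                 # hypothetical helper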
Python Utility Helper class for Incremental Data Extraction¶
import ast

try:
    import sys
    import os
    import datetime
    from ast import literal_eval
    import re
    import boto3
    from pyspark.sql import SparkSession
    from pyspark import SparkConf, SparkContext
    import json
    from dataclasses import dataclass
    from datetime import datetime
except Exception as e:
    pass
class AWSS3(object):
    """Helper class that adds functionality on top of boto3."""

    def __init__(self, bucket):
        self.BucketName = bucket
        self.client = boto3.client(
            "s3",
            aws_access_key_id=os.getenv("AWS_ACCESS_KEY"),
            aws_secret_access_key=os.getenv("AWS_SECRET_KEY"),
            region_name=os.getenv("AWS_REGION")
        )

    def put_files(self, Response=None, Key=None):
        """
        Put the file on S3.
        :return: Bool
        """
        try:
            response = self.client.put_object(
                Body=Response, Bucket=self.BucketName, Key=Key
            )
            return "ok"
        except Exception as e:
            raise Exception("Error : {} ".format(e))

    def item_exists(self, Key):
        """Given a key, check if the item exists on AWS S3."""
        try:
            response_new = self.client.get_object(Bucket=self.BucketName, Key=str(Key))
            return True
        except Exception as e:
            return False

    def get_item(self, Key):
        """Get the bytes data from AWS S3."""
        try:
            response_new = self.client.get_object(Bucket=self.BucketName, Key=str(Key))
            return response_new["Body"].read()
        except Exception as e:
            print("Error :{}".format(e))
            return False

    def find_one_update(self, data=None, key=None):
        """
        If the key exists on S3, return the data from S3;
        otherwise store the data on S3 and return it.
        """
        flag = self.item_exists(Key=key)

        if flag:
            data = self.get_item(Key=key)
            return data
        else:
            self.put_files(Key=key, Response=data)
            return data

    def delete_object(self, Key):
        response = self.client.delete_object(Bucket=self.BucketName, Key=Key)
        return response

    def get_all_keys(self, Prefix=""):
        """
        :param Prefix: Prefix string
        :return: Keys List
        """
        try:
            paginator = self.client.get_paginator("list_objects_v2")
            pages = paginator.paginate(Bucket=self.BucketName, Prefix=Prefix)

            tmp = []
            for page in pages:
                for obj in page["Contents"]:
                    tmp.append(obj["Key"])

            return tmp
        except Exception as e:
            return []

    def print_tree(self):
        keys = self.get_all_keys()
        for key in keys:
            print(key)
        return None

    def find_one_similar_key(self, searchTerm=""):
        keys = self.get_all_keys()
        return [key for key in keys if re.search(searchTerm, key)]

    def __repr__(self):
        return "AWS S3 Helper class"
@dataclass
class HUDISettings:
    """Settings for a Hudi table: its name and storage path."""

    table_name: str
    path: str
class HUDIIncrementalReader(AWSS3):
    def __init__(self, bucket, hudi_settings, spark_session):
        AWSS3.__init__(self, bucket=bucket)
        if type(hudi_settings).__name__ != "HUDISettings":
            raise Exception("please pass correct settings")
        self.hudi_settings = hudi_settings
        self.spark = spark_session

    def __check_meta_data_file(self):
        """
        Check if the metadata (checkpoint) file for the table exists on S3.
        :return: Bool
        """
        file_name = f"metadata/{self.hudi_settings.table_name}.json"
        return self.item_exists(Key=file_name)

    def __read_meta_data(self):
        file_name = f"metadata/{self.hudi_settings.table_name}.json"
        return ast.literal_eval(self.get_item(Key=file_name).decode("utf-8"))

    def __push_meta_data(self, json_data):
        file_name = f"metadata/{self.hudi_settings.table_name}.json"
        self.put_files(
            Key=file_name, Response=json.dumps(json_data)
        )

    def clean_check_point(self):
        file_name = f"metadata/{self.hudi_settings.table_name}.json"
        self.delete_object(Key=file_name)

    def __get_begin_commit(self):
        self.spark.read.format("hudi").load(self.hudi_settings.path).createOrReplaceTempView("hudi_snapshot")
        commits = list(map(lambda row: row[0], self.spark.sql(
            "select distinct(_hoodie_commit_time) as commitTime from hudi_snapshot order by commitTime asc").limit(
            50).collect()))

        # Begin from the very first commit
        begin_time = int(commits[0]) - 1
        return begin_time

    def __read_inc_data(self, commit_time):
        incremental_read_options = {
            'hoodie.datasource.query.type': 'incremental',
            'hoodie.datasource.read.begin.instanttime': commit_time,
        }
        incremental_df = self.spark.read.format("hudi").options(**incremental_read_options).load(
            self.hudi_settings.path).createOrReplaceTempView("hudi_incremental")

        df = self.spark.sql("select * from hudi_incremental")

        return df

    def __get_last_commit(self):
        commits = list(map(lambda row: row[0], self.spark.sql(
            "select distinct(_hoodie_commit_time) as commitTime from hudi_incremental order by commitTime asc").limit(
            50).collect()))
        last_commit = commits[len(commits) - 1]
        return last_commit

    def __run(self):
        # Check whether the checkpoint (metadata) file exists
        flag = self.__check_meta_data_file()

        # If the metadata file exists, load the last commit and start incremental loading from that commit
        if flag:
            meta_data = json.loads(self.__read_meta_data())
            print(f"""
            ******************LOGS******************
            meta_data {meta_data}
            last_processed_commit : {meta_data.get("last_processed_commit")}
            ***************************************
            """)

            read_commit = str(meta_data.get("last_processed_commit"))
            df = self.__read_inc_data(commit_time=read_commit)

            # If there is no incremental data, an empty DF is returned and the checkpoint is left untouched
            if not df.rdd.isEmpty():
                last_commit = self.__get_last_commit()
                self.__push_meta_data(json_data=json.dumps({
                    "last_processed_commit": last_commit,
                    "table_name": self.hudi_settings.table_name,
                    "path": self.hudi_settings.path,
                    "inserted_time": datetime.now().__str__(),
                }))
                return df
            else:
                return df

        else:
            # The metadata file does not exist, so create it on S3 and start reading from the beginning commit
            read_commit = self.__get_begin_commit()

            df = self.__read_inc_data(commit_time=read_commit)
            last_commit = self.__get_last_commit()

            self.__push_meta_data(json_data=json.dumps({
                "last_processed_commit": last_commit,
                "table_name": self.hudi_settings.table_name,
                "path": self.hudi_settings.path,
                "inserted_time": datetime.now().__str__(),
            }))

            return df

    def read(self):
        """
        Reads incremental data and returns a Spark DF.
        :return:
        """
        return self.__run()
Main logic¶
Create Instances of the Incremental Pull Helper Class¶
rides_path = "file:///C:/tmp/hudidb/rides"
tips_path = "file:///C:/tmp/hudidb/tips"
date_path = "file:///C:/tmp/hudidb/dim_date"
rides_helper = HUDIIncrementalReader(
    bucket=bucket,
    hudi_settings=HUDISettings(
        table_name='rides',
        path=rides_path),
    spark_session=spark
)

tips_helper = HUDIIncrementalReader(
    bucket=bucket,
    hudi_settings=HUDISettings(
        table_name='tips',
        path=tips_path),
    spark_session=spark
)
Can be used to flush checkpoints for testing and debugging¶
# rides_helper.clean_check_point()
# tips_helper.clean_check_point()
inc_rides_df = rides_helper.read()
inc_tips_df = tips_helper.read()
print("count rides", inc_rides_df.count())
print("count tips", inc_tips_df.count())
count rides 3
count tips 2
Creating Snapshots¶
inc_rides_df.createOrReplaceTempView("rides")
inc_tips_df.createOrReplaceTempView("tips")
date_dim = 'file:///C:/tmp/hudidb/dim_date'
spark.read.format("hudi").load(date_dim).createOrReplaceTempView("date_dim")
Inserting into the Fact Table¶
earning_fact_df = spark.sql("""
SELECT
    r.driver_id,
    r.ride_id,
    r.fare,
    COALESCE(t.tip_amount, 0) AS tip_amount,
    (r.fare + COALESCE(t.tip_amount, 0)) AS total_amount,
    d.date_key as earning_date_key
FROM
    rides r
LEFT JOIN
    tips t ON r.ride_id = t.ride_id
JOIN (
    SELECT
        date_key,
        to_date(date_key, 'yyyyMMdd') as date
    FROM
        date_dim
) d ON to_date(r.ride_date, 'yyyy-MM-dd') = d.date
""")
earning_fact_df.show()
+---------+-------+-----+----------+------------+----------------+
|driver_id|ride_id| fare|tip_amount|total_amount|earning_date_key|
+---------+-------+-----+----------+------------+----------------+
|        4|      1| 47.1|       0.0|        47.1|        20230422|
|        1|      3|23.16|      1.46|       24.62|        20230401|
|        1|      3|23.16|      2.48|       25.64|        20230401|
|        4|      2|28.86|       0.0|       28.86|        20230407|
+---------+-------+-----+----------+------------+----------------+
Upsert into the Fact Table¶
upsert_hudi_table(
    db_name='hudidb',
    table_name='driver_earnings',
    record_id='driver_id,ride_id',
    precomb_key='ride_id',
    spark_df=earning_fact_df,
    table_type='COPY_ON_WRITE',
    method='upsert',
)
path file:///C:/tmp/hudidb/driver_earnings
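With the fact table in place, the dashboard itself becomes a simple aggregation. The query below is a hypothetical example (not part of the original notebook) that totals earnings per driver by joining the driver_earnings fact table with the driver dimension written earlier:
# Hypothetical dashboard query: total earnings (fares + tips) per driver.
driver_earnings_path = "file:///C:/tmp/hudidb/driver_earnings"
driver_path = "file:///C:/tmp/hudidb/driver"

spark.read.format("hudi").load(driver_earnings_path).createOrReplaceTempView("driver_earnings")
spark.read.format("hudi").load(driver_path).createOrReplaceTempView("driver")

spark.sql("""
    SELECT
        d.driver_id,
        d.name,
        SUM(de.fare)         AS total_fares,
        SUM(de.tip_amount)   AS total_tips,
        SUM(de.total_amount) AS total_earnings
    FROM driver_earnings de
    JOIN driver d ON de.driver_id = d.driver_id
    GROUP BY d.driver_id, d.name
""").show()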
Say You Receive Late-Arriving Tips, Which Is Common Since Riders Often Add a Tip the Next Day¶
late_arrving_updates = [
    {
        'tip_id': 3,
        'ride_id': 2,
        'driver_id': 4,
        'tip_amount': 1.3,
        "tipping_date": fake.date_between(start_date='-30d', end_date='today'),
        "tipping_time": fake.time(pattern='%H:%M:%S', end_datetime=None)
    }
]

late_arrving_updates_df = spark.createDataFrame(data=[tuple(i.values()) for i in late_arrving_updates],
                                                schema=list(late_arrving_updates[0].keys()))
late_arrving_updates_df.show()
+------+-------+---------+----------+------------+------------+
|tip_id|ride_id|driver_id|tip_amount|tipping_date|tipping_time|
+------+-------+---------+----------+------------+------------+
|     3|      2|        4|       1.3|  2023-03-29|    02:12:49|
+------+-------+---------+----------+------------+------------+
New Tips Arrive in the Tips Hudi Table¶
upsert_hudi_table(
    db_name='hudidb',
    table_name='tips',
    record_id='tip_id',
    precomb_key='tipping_date',
    spark_df=late_arrving_updates_df,
    table_type='COPY_ON_WRITE',
    method='upsert',
)
path file:///C:/tmp/hudidb/tips
inc_rides_df = rides_helper.read()
inc_tips_df = tips_helper.read()
print("count rides", inc_rides_df.count())
print("count tips", inc_tips_df.count())
******************LOGS******************
meta_data {'last_processed_commit': '20230428100909112', 'table_name': 'rides', 'path': 'file:///C:/tmp/hudidb/rides', 'inserted_time': '2023-04-28 10:10:13.307984'}
last_processed_commit : 20230428100909112
***************************************

******************LOGS******************
meta_data {'last_processed_commit': '20230428100926498', 'table_name': 'tips', 'path': 'file:///C:/tmp/hudidb/tips', 'inserted_time': '2023-04-28 10:10:13.838387'}
last_processed_commit : 20230428100926498
***************************************

count rides 0
count tips 1
inc_tips_df.select([
'tip_id',
'ride_id',
'driver_id',
'tip_amount',
'tipping_date',
'tipping_time'
]).show()
+------+-------+---------+----------+------------+------------+
|tip_id|ride_id|driver_id|tip_amount|tipping_date|tipping_time|
+------+-------+---------+----------+------------+------------+
|     3|      2|        4|       1.3|  2023-03-29|    02:12:49|
+------+-------+---------+----------+------------+------------+
Snapshots¶
inc_rides_df.createOrReplaceTempView("rides")
inc_tips_df.createOrReplaceTempView("tips")
date_dim = 'file:///C:/tmp/hudidb/dim_date'
earning_fact = "file:///C:/tmp/hudidb/driver_earnings"
spark.read.format("hudi").load(date_dim).createOrReplaceTempView("date_dim")
spark.read.format("hudi").load(earning_fact).createOrReplaceTempView("driver_earnings")
Joining the Late-Arriving Tip with the Existing Fact Record¶
earning_fact_new_df = spark.sql("""
SELECT
    de.driver_id,
    de.ride_id,
    de.fare,
    COALESCE(t.tip_amount, 0) AS tip_amount,
    (de.fare + COALESCE(t.tip_amount, 0)) AS total_amount,
    de.earning_date_key
FROM
    driver_earnings de
JOIN
    tips t ON de.ride_id = t.ride_id
""")
earning_fact_new_df.show()
+---------+-------+-----+----------+------------+----------------+
|driver_id|ride_id| fare|tip_amount|total_amount|earning_date_key|
+---------+-------+-----+----------+------------+----------------+
|        4|      2|28.86|       1.3|       30.16|        20230407|
+---------+-------+-----+----------+------------+----------------+
Updating the Tip for the Driver in the Fact Table¶
upsert_hudi_table(
    db_name='hudidb',
    table_name='driver_earnings',
    record_id='driver_id,ride_id',
    precomb_key='ride_id',
    spark_df=earning_fact_new_df,
    table_type='COPY_ON_WRITE',
    method='upsert',
)
path file:///C:/tmp/hudidb/driver_earnings
Entire Main logic¶
rides_path = "file:///C:/tmp/hudidb/rides"
tips_path = "file:///C:/tmp/hudidb/tips"
date_path = "file:///C:/tmp/hudidb/dim_date"
rides_helper = HUDIIncrementalReader(
    bucket=bucket,
    hudi_settings=HUDISettings(
        table_name='rides',
        path=rides_path),
    spark_session=spark
)

tips_helper = HUDIIncrementalReader(
    bucket=bucket,
    hudi_settings=HUDISettings(
        table_name='tips',
        path=tips_path),
    spark_session=spark
)
inc_rides_df = rides_helper.read()
inc_tips_df = tips_helper.read()
inc_rides_df.createOrReplaceTempView("rides")
inc_tips_df.createOrReplaceTempView("tips")
spark.read.format("hudi").load(date_path).createOrReplaceTempView("date_dim")
if inc_rides_df.count() > 0:
    earning_fact_df = spark.sql("""
        SELECT
            r.driver_id,
            r.ride_id,
            r.fare,
            COALESCE(t.tip_amount, 0) AS tip_amount,
            (r.fare + COALESCE(t.tip_amount, 0)) AS total_amount,
            d.date_key as earning_date_key
        FROM
            rides r
        LEFT JOIN
            tips t ON r.ride_id = t.ride_id
        JOIN (
            SELECT
                date_key,
                to_date(date_key, 'yyyyMMdd') as date
            FROM
                date_dim
        ) d ON to_date(r.ride_date, 'yyyy-MM-dd') = d.date
    """)

    upsert_hudi_table(
        db_name='hudidb',
        table_name='driver_earnings',
        record_id='driver_id,ride_id',
        precomb_key='ride_id',
        spark_df=earning_fact_df,
        table_type='COPY_ON_WRITE',
        method='upsert',
    )
if inc_rides_df.count() == 0 and inc_tips_df.count() > 0:
    earning_fact = "file:///C:/tmp/hudidb/driver_earnings"
    spark.read.format("hudi").load(earning_fact).createOrReplaceTempView("driver_earnings")

    earning_fact_new_df = spark.sql("""
        SELECT
            de.driver_id,
            de.ride_id,
            de.fare,
            COALESCE(t.tip_amount, 0) AS tip_amount,
            (de.fare + COALESCE(t.tip_amount, 0)) AS total_amount,
            de.earning_date_key
        FROM
            driver_earnings de
        JOIN
            tips t ON de.ride_id = t.ride_id
    """)

    upsert_hudi_table(
        db_name='hudidb',
        table_name='driver_earnings',
        record_id='driver_id,ride_id',
        precomb_key='ride_id',
        spark_df=earning_fact_new_df,
        table_type='COPY_ON_WRITE',
        method='upsert',
    )
Special Thanks¶
We would like to express our sincere gratitude to Uber for their insightful article on Uber's Lakehouse Architecture, which we found to be informative and engaging. The article sheds light on the innovative approach Uber has taken to handle their vast amounts of data, providing valuable insights into the world of big data and the challenges that organizations face in managing it effectively.
We greatly appreciate Uber's willingness to share their knowledge and experience in this field, which will undoubtedly benefit professionals and organizations working in data management and analytics. We commend Uber for their dedication to staying at the forefront of technology and innovation, and for their commitment to providing high-quality services to their customers.
Once again, we would like to extend our special thanks to Uber for publishing such a valuable resource, and we look forward to learning more from their future contributions in the field.