Hudi Time Travel Query in Action
Define Imports
In [28]:
try:
    import os
    import sys
    import uuid
    import random
    import pyspark
    from pyspark.sql import SparkSession
    from pyspark import SparkConf, SparkContext
    from faker import Faker
    from datetime import datetime
    import pandas as pd  # handy for pretty-printing results in the notebook

    print("Imports loaded")
except Exception as e:
    print("error", e)
Create Spark Session
In [29]:
HUDI_VERSION = '0.14.0'
SPARK_VERSION = '3.4'

os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@11"
SUBMIT_ARGS = f"--packages org.apache.hudi:hudi-spark{SPARK_VERSION}-bundle_2.12:{HUDI_VERSION} pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable

# Spark session with the Hudi bundle and Hudi SQL extensions
spark = SparkSession.builder \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
    .config('className', 'org.apache.hudi') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .getOrCreate()
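A quick optional sanity check (a minimal sketch; the --packages flag from PYSPARK_SUBMIT_ARGS surfaces as the spark.jars.packages config entry):

# Optional: confirm the session is up and the Hudi bundle was resolved
print(spark.version)
print(spark.sparkContext.getConf().get("spark.jars.packages", "not set"))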
Helper Methods
In [10]:
def write_to_hudi(spark_df,
                  table_name,
                  db_name,
                  method='upsert',
                  table_type='COPY_ON_WRITE',
                  recordkey='',
                  precombine='',
                  partition_fields='',
                  index_type='BLOOM'
                  ):
    path = f"file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database={db_name}/table_name={table_name}"

    hudi_options = {
        'hoodie.table.name': table_name,
        'hoodie.datasource.write.table.type': table_type,
        'hoodie.datasource.write.table.name': table_name,
        'hoodie.datasource.write.operation': method,
        'hoodie.datasource.write.recordkey.field': recordkey,
        'hoodie.datasource.write.precombine.field': precombine,
        "hoodie.datasource.write.partitionpath.field": partition_fields,

        # Retention settings: the cleaner keeps file versions for the last 20
        # commits, so time travel can only reach back within that window.
        "hoodie.keep.min.commits": "21",
        "hoodie.cleaner.commits.retained": "20",
        "hoodie.keep.max.commits": "28",

        "hoodie.write.concurrency.mode": "optimistic_concurrency_control",
        "hoodie.cleaner.policy.failed.writes": "LAZY",
        "hoodie.write.lock.provider": "org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider",
    }

    spark_df.write.format("hudi"). \
        options(**hudi_options). \
        mode("append"). \
        save(path)
In [30]:
import shutil

# Start from a clean slate: drop any previous copy of the demo table
try:
    shutil.rmtree("/Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=job_posting")
except Exception:
    pass
Schema
In [59]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, TimestampType
from datetime import datetime

# Define schema for the job-posting data
schema = StructType([
    StructField("job_id", StringType(), True),
    StructField("timestamp", TimestampType(), True),
    StructField("site", StringType(), True),
    StructField("clicks", IntegerType(), True),
    StructField("cost_per_click", FloatType(), True),
    StructField("bid", FloatType(), True),
    StructField("spend", FloatType(), True)
])

# Generate mock job data for job_id "12345"
mock_job_data = [
    ("12345", datetime.now(), "LinkedIn", 50, 0.25, 0.5, 12.5),
    ("12345", datetime.now(), "Indeed", 30, 0.15, 0.3, 4.5),
    ("12345", datetime.now(), "Glassdoor", 70, 0.35, 0.7, 24.5),
    ("12345", datetime.now(), "Indeed", 40, 0.2, 0.4, 8.0),
    ("12345", datetime.now(), "Indeed", 60, 0.3, 0.6, 18.0)
]

# Write each record as a separate upsert so every row lands in its own commit;
# this is what gives us multiple instants to time travel across.
for job_data in mock_job_data:
    job_df = spark.createDataFrame([job_data], schema)
    job_df.show()

    write_to_hudi(
        spark_df=job_df,
        method="upsert",
        db_name="default",
        table_name="job_posting",
        recordkey="job_id,site",
        precombine="timestamp"
    )
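Each loop iteration above committed separately, so the table now carries several instants on its timeline. A minimal sketch for listing them off the local filesystem (assuming the same base path as write_to_hudi; on Hudi 0.14 copy-on-write tables, completed commits appear as <instant>.commit files under the .hoodie folder):

import os

# List completed commit instants from the Hudi timeline folder
hoodie_dir = "/Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=job_posting/.hoodie"
instants = sorted(f.split(".")[0] for f in os.listdir(hoodie_dir) if f.endswith(".commit"))
print(instants)  # candidate values for the as.of.instant option used later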
Snapshot of the Table
In [50]:
path = "file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=job_posting"

spark.read.format("hudi") \
    .load(path) \
    .createOrReplaceTempView("hudi_snapshot1")

query = "SELECT _hoodie_commit_time, job_id, site, cost_per_click, bid, spend FROM hudi_snapshot1"
print(query)
result = spark.sql(query)
result.show(n=result.count(), truncate=False)
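The pandas import from the first cell is useful here: for a small demo table like this, toPandas() gives a notebook-friendly rendering of the same result.

# Small result set, so collecting it to pandas for display is safe here
result.toPandas()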
Let's Focus on Indeed as an Example
In [60]:
path = "file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=job_posting"

spark.read.format("hudi") \
    .load(path) \
    .createOrReplaceTempView("hudi_snapshot1")

# Snapshot reads return only the latest version of each record. The three
# Indeed writes share the record key (job_id, site), so only the most recent
# Indeed upsert comes back here; time travel will recover the earlier versions.
query = "SELECT _hoodie_commit_time, job_id, site, cost_per_click, bid, spend FROM hudi_snapshot1 WHERE site = 'Indeed'"
print(query)
result = spark.sql(query)
result.show(n=result.count(), truncate=False)
Time Travel
In [ ]:
# an earlier commit instant noted down from the table's timeline: 20240615105838555
In [79]:
path = "file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=job_posting"

# Read the table as of a specific commit instant
spark.read.format("hudi") \
    .option("as.of.instant", "20240615105840526") \
    .load(path) \
    .createOrReplaceTempView("time_travel")

query = """
SELECT
    _hoodie_commit_time, job_id, site, cost_per_click, bid, spend
FROM
    time_travel
WHERE
    site = 'Indeed'
"""
result = spark.sql(query)
result.show(n=result.count(), truncate=False)
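The instant does not have to be the raw yyyyMMddHHmmssSSS string; per the Hudi docs, as.of.instant also accepts friendlier timestamp formats:

# Equivalent time travel reads using other supported formats
spark.read.format("hudi") \
    .option("as.of.instant", "2024-06-15 10:58:40.526") \
    .load(path)

spark.read.format("hudi") \
    .option("as.of.instant", "2024-06-15") \
    .load(path)  # a bare date resolves to that day's latest commit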
Comparing Two Instants with Time Travel
In [86]:
path = "file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=job_posting"

# Load data as of the later instant
spark.read.format("hudi") \
    .option("as.of.instant", "20240615105840526") \
    .load(path) \
    .createOrReplaceTempView("time_travel_1")

# Load data as of the earlier instant
spark.read.format("hudi") \
    .option("as.of.instant", "20240615105838555") \
    .load(path) \
    .createOrReplaceTempView("time_travel_2")

# Query both instants side by side
query = """
SELECT
    _hoodie_commit_time, job_id, site, cost_per_click, bid, spend
FROM
    time_travel_1
WHERE
    site = 'Indeed'

UNION ALL

SELECT
    _hoodie_commit_time, job_id, site, cost_per_click, bid, spend
FROM
    time_travel_2
WHERE
    site = 'Indeed'
"""
result = spark.sql(query)
result.show(n=result.count(), truncate=False)
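With both views registered, you can go a step beyond UNION ALL and diff the two points in time directly. A minimal sketch (same views and columns as above) that computes how spend changed for Indeed between the two commits:

# Join the two instants on the record key (job_id, site) and compute the change
delta_query = """
SELECT
    t1.job_id,
    t1.site,
    t2.spend AS spend_earlier,
    t1.spend AS spend_later,
    t1.spend - t2.spend AS spend_delta
FROM time_travel_1 t1
JOIN time_travel_2 t2
    ON t1.job_id = t2.job_id AND t1.site = t2.site
WHERE t1.site = 'Indeed'
"""
spark.sql(delta_query).show(truncate=False)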