Saturday, June 15, 2024

Hudi Time Travel in Action

hudi time travel

Hudi Time Travel Query in Action

Define Imports

In [28]:
# Load every module the notebook uses in a single cell.
try:
    import os
    import sys
    import uuid
    import pyspark
    import datetime
    from pyspark.sql import SparkSession
    from pyspark import SparkConf, SparkContext
    from faker import Faker
    import datetime
    # NOTE: the next line rebinds the name `datetime` to the class, so the two
    # `import datetime` module imports above are shadowed from here on.
    from datetime import datetime
    import random 
    import pandas as pd  # Import Pandas library for pretty printing

    print("Imports loaded ")

# NOTE(review): a failed import is only printed here, not re-raised, so the
# notebook keeps running and later cells would fail on the missing name instead.
except Exception as e:
    print("error", e)
Imports loaded 

Create Spark Session

In [29]:
# Versions used to build the Hudi Spark bundle coordinate passed to --packages.
HUDI_VERSION = '0.14.0'
SPARK_VERSION = '3.4'

# Respect a JAVA_HOME that is already configured; the Homebrew JDK 11 location
# is only a fallback, so the cell no longer clobbers a working environment.
os.environ.setdefault("JAVA_HOME", "/opt/homebrew/opt/openjdk@11")
SUBMIT_ARGS = f"--packages org.apache.hudi:hudi-spark{SPARK_VERSION}-bundle_2.12:{HUDI_VERSION} pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
# Make Spark workers use the same interpreter as this kernel.
os.environ['PYSPARK_PYTHON'] = sys.executable

# Spark session with the Kryo serializer and the Hudi SQL extension enabled.
# NOTE(review): 'className' is not a spark.* property; kept as in the original — confirm it is needed.
spark = (
    SparkSession.builder
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer')
    .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension')
    .config('className', 'org.apache.hudi')
    .config('spark.sql.hive.convertMetastoreParquet', 'false')
    .getOrCreate()
)

Helper Methods

In [10]:
def write_to_hudi(spark_df,
                  table_name,
                  db_name,
                  method='upsert',
                  table_type='COPY_ON_WRITE',
                  recordkey='',
                  precombine='',
                  partition_fields='',
                  index_type='BLOOM',
                  base_path='file:///Users/soumilshah/IdeaProjects/SparkProject/tem'
                  ):
    """Append a DataFrame to a Hudi table stored under ``base_path``.

    The table location is ``{base_path}/database={db_name}/table_name={table_name}``.

    Args:
        spark_df: DataFrame to write; only its ``.write`` interface is used.
        table_name: Hudi table name, also used in the storage path.
        db_name: Database name, used only to build the storage path.
        method: Value for ``hoodie.datasource.write.operation`` (e.g. ``'upsert'``).
        table_type: Value for ``hoodie.datasource.write.table.type``.
        recordkey: Record key field(s), comma-separated for composite keys.
        precombine: Field passed as ``hoodie.datasource.write.precombine.field``.
        partition_fields: Partition path field(s); empty string sets no partition field.
        index_type: Value for ``hoodie.index.type``. This argument was previously
            accepted but ignored; it is now forwarded to the writer.
        base_path: Root location under which the table directory is created.
            Defaults to the location that was previously hard-coded.

    Returns:
        None. The write happens as a side effect.
    """
    path = f"{base_path}/database={db_name}/table_name={table_name}"

    hudi_options = {
        'hoodie.table.name': table_name,
        'hoodie.datasource.write.table.type': table_type,
        'hoodie.datasource.write.table.name': table_name,
        'hoodie.datasource.write.operation': method,
        'hoodie.datasource.write.recordkey.field': recordkey,
        'hoodie.datasource.write.precombine.field': precombine,
        "hoodie.datasource.write.partitionpath.field": partition_fields,
        'hoodie.index.type': index_type,

        # Commit-retention bounds (min < max, cleaner retains fewer than min).
        "hoodie.keep.min.commits": "21",
        "hoodie.cleaner.commits.retained": "20",
        "hoodie.keep.max.commits": "28",

        # Optimistic concurrency control backed by a file-system lock provider.
        "hoodie.write.concurrency.mode": "optimistic_concurrency_control",
        "hoodie.cleaner.policy.failed.writes": "LAZY",
        "hoodie.write.lock.provider": "org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider",
    }

    (
        spark_df.write.format("hudi")
        .options(**hudi_options)
        .mode("append")
        .save(path)
    )
In [30]:
import shutil

# Best-effort removal of a table directory left over from a previous run.
# NOTE(review): this deletes table_name=messages, while the cells below write to
# table_name=job_posting — confirm which directory is meant to be reset.
try:
    shutil.rmtree("/Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=messages")
except FileNotFoundError:
    # Nothing to clean up on a first run.
    pass
except OSError as e:
    # Surface real failures (permissions, busy files) instead of hiding them.
    print("error", e)

Schema

In [59]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, TimestampType
from datetime import datetime

# Column layout of one job-posting record; every column is declared nullable.
schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in [
        ("job_id", StringType()),
        ("timestamp", TimestampType()),
        ("site", StringType()),
        ("clicks", IntegerType()),
        ("cost_per_click", FloatType()),
        ("bid", FloatType()),
        ("spend", FloatType()),
    ]
])

# Five observations for job_id "12345"; each row is stamped with datetime.now()
# at the moment it is built. "Indeed" appears three times on purpose.
mock_job_data = [
    ("12345", datetime.now(), site, clicks, cost_per_click, bid, spend)
    for site, clicks, cost_per_click, bid, spend in [
        ("LinkedIn", 50, 0.25, 0.5, 12.5),
        ("Indeed", 30, 0.15, 0.3, 4.5),
        ("Glassdoor", 70, 0.35, 0.7, 24.5),
        ("Indeed", 40, 0.2, 0.4, 8.0),
        ("Indeed", 60, 0.3, 0.6, 18.0),
    ]
]

# Write the rows one at a time, each through its own write_to_hudi call.
for job_data in mock_job_data:
    # Single-row DataFrame for the current record
    job_df = spark.createDataFrame([job_data], schema)
    job_df.show()

    write_to_hudi(
        table_name="job_posting",
        db_name="default",
        spark_df=job_df,
        recordkey="job_id,site",
        precombine="timestamp",
        method="upsert",
    )
+------+--------------------+--------+------+--------------+---+-----+
|job_id|           timestamp|    site|clicks|cost_per_click|bid|spend|
+------+--------------------+--------+------+--------------+---+-----+
| 12345|2024-06-15 10:58:...|LinkedIn|    50|          0.25|0.5| 12.5|
+------+--------------------+--------+------+--------------+---+-----+

+------+--------------------+------+------+--------------+---+-----+
|job_id|           timestamp|  site|clicks|cost_per_click|bid|spend|
+------+--------------------+------+------+--------------+---+-----+
| 12345|2024-06-15 10:58:...|Indeed|    30|          0.15|0.3|  4.5|
+------+--------------------+------+------+--------------+---+-----+

+------+--------------------+---------+------+--------------+---+-----+
|job_id|           timestamp|     site|clicks|cost_per_click|bid|spend|
+------+--------------------+---------+------+--------------+---+-----+
| 12345|2024-06-15 10:58:...|Glassdoor|    70|          0.35|0.7| 24.5|
+------+--------------------+---------+------+--------------+---+-----+

+------+--------------------+------+------+--------------+---+-----+
|job_id|           timestamp|  site|clicks|cost_per_click|bid|spend|
+------+--------------------+------+------+--------------+---+-----+
| 12345|2024-06-15 10:58:...|Indeed|    40|           0.2|0.4|  8.0|
+------+--------------------+------+------+--------------+---+-----+

+------+--------------------+------+------+--------------+---+-----+
|job_id|           timestamp|  site|clicks|cost_per_click|bid|spend|
+------+--------------------+------+------+--------------+---+-----+
| 12345|2024-06-15 10:58:...|Indeed|    60|           0.3|0.6| 18.0|
+------+--------------------+------+------+--------------+---+-----+

Snapshot of the Table

In [50]:
path = "file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=job_posting"

# Expose the table's current contents to Spark SQL under a temporary view name.
snapshot_df = spark.read.format("hudi").load(path)
snapshot_df.createOrReplaceTempView("hudi_snapshot1")

query = "SELECT _hoodie_commit_time,job_id,site,cost_per_click,bid,spend  FROM hudi_snapshot1 "
print(query)

result = spark.sql(query)
# Count first so that show() prints every row, with no column truncation.
total_rows = result.count()
result.show(n=total_rows, truncate=False)
SELECT _hoodie_commit_time,job_id,site,cost_per_click,bid,spend  FROM hudi_snapshot1 
+-------------------+------+---------+--------------+---+-----+
|_hoodie_commit_time|job_id|site     |cost_per_click|bid|spend|
+-------------------+------+---------+--------------+---+-----+
|20240615104438804  |12345 |LinkedIn |0.2           |0.4|8.0  |
|20240615104441027  |12345 |Indeed   |0.3           |0.6|18.0 |
|20240615104436480  |12345 |Glassdoor|0.35          |0.7|24.5 |
+-------------------+------+---------+--------------+---+-----+

Let's Focus on Indeed as an Example

In [60]:
path = "file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=job_posting"

# Re-register the latest snapshot, then narrow the query to the 'Indeed' rows.
snapshot_df = spark.read.format("hudi").load(path)
snapshot_df.createOrReplaceTempView("hudi_snapshot1")

query = "SELECT _hoodie_commit_time,job_id,site,cost_per_click,bid,spend  FROM hudi_snapshot1 where site='Indeed' "
print(query)

result = spark.sql(query)
# Count first so that show() prints every row, with no column truncation.
total_rows = result.count()
result.show(n=total_rows, truncate=False)
SELECT _hoodie_commit_time,job_id,site,cost_per_click,bid,spend  FROM hudi_snapshot1 where site='Indeed' 
+-------------------+------+------+--------------+---+-----+
|_hoodie_commit_time|job_id|site  |cost_per_click|bid|spend|
+-------------------+------+------+--------------+---+-----+
|20240615105840527  |12345 |Indeed|0.3           |0.6|18.0 |
+-------------------+------+------+--------------+---+-----+

Time Travel

In [ ]:
# Earlier commit instant, reused below as the second `as.of.instant` value: 20240615105838555
In [79]:
path = "file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=job_posting"

# Read the table as of a fixed instant and expose it as the "time_travel" view.
# The instant is a literal taken from the recorded run of this notebook.
as_of_reader = spark.read.format("hudi").option("as.of.instant", "20240615105840526")
as_of_reader.load(path).createOrReplaceTempView("time_travel")

query = """
    SELECT 
    _hoodie_commit_time,job_id,site,cost_per_click,bid,spend  
FROM 
    time_travel 
where site='Indeed'

"""

result = spark.sql(query)
# Count first so that show() prints every row, with no column truncation.
total_rows = result.count()
result.show(n=total_rows, truncate=False)
+-------------------+------+------+--------------+---+-----+
|_hoodie_commit_time|job_id|site  |cost_per_click|bid|spend|
+-------------------+------+------+--------------+---+-----+
|20240615105834896  |12345 |Indeed|0.15          |0.3|4.5  |
+-------------------+------+------+--------------+---+-----+

Time Travel: Comparing Two Instants

In [86]:
path = "file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=job_posting"

# Register one temp view per instant: first timestamp, then second timestamp.
for instant, view_name in (
    ("20240615105840526", "time_travel_1"),
    ("20240615105838555", "time_travel_2"),
):
    as_of_df = spark.read.format("hudi").option("as.of.instant", instant).load(path)
    as_of_df.createOrReplaceTempView(view_name)

# Stack the 'Indeed' rows seen at both instants into one result set.
query = """
    SELECT 
        _hoodie_commit_time, job_id, site, cost_per_click, bid, spend  
    FROM 
        time_travel_1 
    WHERE 
        site = 'Indeed'
    
    UNION ALL
    
    SELECT 
        _hoodie_commit_time, job_id, site, cost_per_click, bid, spend  
    FROM 
        time_travel_2 
    WHERE 
        site = 'Indeed'
"""

result = spark.sql(query)
# Count first so that show() prints every row, with no column truncation.
total_rows = result.count()
result.show(n=total_rows, truncate=False)
+-------------------+------+------+--------------+---+-----+
|_hoodie_commit_time|job_id|site  |cost_per_click|bid|spend|
+-------------------+------+------+--------------+---+-----+
|20240615105838557  |12345 |Indeed|0.2           |0.4|8.0  |
|20240615105834896  |12345 |Indeed|0.15          |0.3|4.5  |
+-------------------+------+------+--------------+---+-----+

No comments:

Post a Comment

How to Use Publish-Audit-Merge Workflow in Apache Iceberg: A Beginner’s Guide

publish How to Use Publish-Audit-Merge Workflow in Apache Iceberg: A Beginner’s Guide ¶ In [24]: from ...