Thursday, June 20, 2024

4 Different Ways to fetch Apache Hudi Commit time in Python and PySpark


In [2]:
%%configure -f
{
"conf": {
"spark.serializer": "org.apache.spark.serializer.KryoSerializer",
"spark.sql.hive.convertMetastoreParquet": "false",
"spark.sql.catalog.spark_catalog": "org.apache.spark.sql.hudi.catalog.HoodieCatalog",
"spark.sql.legacy.pathOptionBehavior.enabled": "true",
"spark.sql.extensions": "org.apache.spark.sql.hudi.HoodieSparkSessionExtension"
}
}
Starting Spark application
SparkSession available as 'spark'.

Method 1: Stored Procedure: Calling Hudi's show_commits procedure through Spark SQL.

In [3]:
# Define the table and limit
table = "default.invoices"
limit = 100

# Construct the query
query = f"CALL show_commits(table => '{table}', limit => {limit})"

# Execute the query and store the results in a DataFrame
commit_time_array = spark.sql(query).select("commit_time").rdd.flatMap(lambda x: x).collect()


# Show the commit_time array
print(commit_time_array)
['20240620203954176', '20240619134307860']
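
The procedure returns the commit times as plain strings, so it helps to know the format: recent Hudi releases use millisecond-precision instant times (yyyyMMddHHmmssSSS), while older releases used second precision (yyyyMMddHHmmss). A minimal sketch for parsing them into datetime objects, assuming the commit_time_array from the cell above:

from datetime import datetime

def parse_commit_time(commit_time):
    # Millisecond-precision instants are 17 characters (yyyyMMddHHmmssSSS);
    # %f right-pads the 3-digit millis to microseconds
    if len(commit_time) == 17:
        return datetime.strptime(commit_time, "%Y%m%d%H%M%S%f")
    # Fall back to second-precision instants (yyyyMMddHHmmss)
    return datetime.strptime(commit_time, "%Y%m%d%H%M%S")

print([parse_commit_time(ct) for ct in commit_time_array])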

Method 2: Java Classes via Py4J: Direct access to Hudi's internal metadata.

In [4]:
from py4j.java_gateway import java_import
java_import(sc._jvm, "org.apache.hudi.common.table.HoodieTableMetaClient")
java_import(sc._jvm, "org.apache.hudi.common.table.timeline.HoodieTimeline")

# Define the base path for the Hudi table
base_path = "s3://<BUCKET>/xmldata/silver/table_name=invoices/"

# Create HoodieTableMetaClient
meta_client = sc._jvm.HoodieTableMetaClient.builder().setBasePath(base_path).setConf(sc._jsc.hadoopConfiguration()).build()

# Get completed instants
completed_instants = meta_client.reloadActiveTimeline().getWriteTimeline().filterCompletedInstants().getInstants().toArray()

# Extract commit times from completed instants
commit_times = [instant.getTimestamp() for instant in completed_instants]

# Print the commit times
print(commit_times)
['20240619134307860', '20240620203954176']
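
If only the most recent commit is needed, the same timeline object exposes lastInstant(), which returns a Hudi Option. A short sketch, assuming the meta_client built in the cell above is still in scope (the timeline API can differ slightly across Hudi versions):

# Keep only completed write instants, then take the newest one
timeline = meta_client.reloadActiveTimeline().getWriteTimeline().filterCompletedInstants()

# lastInstant() returns org.apache.hudi.common.util.Option<HoodieInstant>
last = timeline.lastInstant()
if last.isPresent():
    print(last.get().getTimestamp())
else:
    print("No completed commits found")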

Method 3: Reading from Hudi Path: Using Spark SQL to extract commit times from the Hudi table.

In [5]:
path = "s3://<BUCKET>/xmldata/silver/table_name=invoices/"
limit = 100

spark.read.format("hudi").load(path).createOrReplaceTempView("hudi_snapshot")

commits = [row[0] for row in spark.sql(
    "select distinct(_hoodie_commit_time) as commitTime from hudi_snapshot order by commitTime asc"
).limit(limit).collect()]
commits
['20240620203954176']
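
Note that a snapshot read only surfaces the _hoodie_commit_time values carried by records in the latest file slices, which is why a single commit shows up here even though the table has two. A common use for these commit times is as checkpoints for Hudi's incremental queries; a minimal sketch using the standard incremental read options, with begin_time standing in for an instant fetched by any of the methods above:

# Read only records written after begin_time (exclusive)
begin_time = "20240619134307860"

incremental_df = spark.read.format("hudi") \
    .option("hoodie.datasource.query.type", "incremental") \
    .option("hoodie.datasource.read.begin.instanttime", begin_time) \
    .load(path)

incremental_df.show()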

Method 4: Using boto3: Interacting with AWS S3 directly to fetch and parse commit metadata.

In [6]:
import boto3

# Initialize a boto3 client for S3
s3_client = boto3.client('s3')

# Define the bucket name and base path
bucket_name = '<BUCKET>'
base_path = 'xmldata/silver/table_name=invoices/'

# Function to list objects in a given S3 path
def list_s3_objects(bucket, prefix):
    objects = []
    paginator = s3_client.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        if 'Contents' in page:
            objects.extend(page['Contents'])
    return objects

# Function to extract commit times from the objects
def extract_commit_times(objects):
    commit_times = []
    for obj in objects:
        key = obj['Key']
        if key.endswith('.commit'):
            # Extract commit timestamp from the file name
            commit_time = key.split('/')[-1].replace('.commit', '')
            commit_times.append(commit_time)
    return commit_times

# List objects in the Hudi commit directory
commit_dir = f"{base_path}.hoodie"
objects = list_s3_objects(bucket_name, commit_dir)

# Extract commit times from the listed objects
commit_times = extract_commit_times(objects)

# Print the commit times
print(commit_times)
['20240619134307860', '20240620203954176']
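
A caveat: only Copy-on-Write commits end in .commit. On a Merge-on-Read table the write commits land as .deltacommit files, and clustering or insert-overwrite operations write .replacecommit files. A hedged variant of extract_commit_times that also picks those up, assuming the pre-1.0 timeline file naming shown above:

# Completed write actions can end in .commit (COW), .deltacommit (MOR),
# or .replacecommit (clustering / insert overwrite)
WRITE_SUFFIXES = ('.commit', '.deltacommit', '.replacecommit')

def extract_write_commit_times(objects):
    commit_times = []
    for obj in objects:
        filename = obj['Key'].split('/')[-1]
        for suffix in WRITE_SUFFIXES:
            if filename.endswith(suffix):
                commit_times.append(filename[:-len(suffix)])
                break
    return sorted(commit_times)

print(extract_write_commit_times(objects))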
