Thursday, June 20, 2024

4 Different Ways to Fetch the Apache Hudi Commit Time in Python and PySpark


In [2]:
%%configure -f
{
    "conf": {
        "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
        "spark.sql.hive.convertMetastoreParquet": "false",
        "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.hudi.catalog.HoodieCatalog",
        "spark.sql.legacy.pathOptionBehavior.enabled": "true",
        "spark.sql.extensions": "org.apache.spark.sql.hudi.HoodieSparkSessionExtension"
    }
}
Starting Spark application
SparkSession available as 'spark'.

Method 1: Stored Procedure: Calling Hudi's show_commits procedure via Spark SQL.

In [3]:
# Define the table and limit
table = "default.invoices"
limit = 100

# Construct the query
query = f"CALL show_commits(table => '{table}', limit => {limit});"

# Execute the query and store the results in a DataFrame
commit_time_array = spark.sql(query).select("commit_time").rdd.flatMap(lambda x: x).collect()


# Show the commit_time array
print(commit_time_array)
['20240620203954176', '20240619134307860']
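
Since Hudi instant times are fixed-width yyyyMMddHHmmssSSS strings, lexicographic order matches chronological order. A quick follow-up sketch, reusing the commit_time_array from above, to pick out the most recent commit:

# Instant times sort lexicographically in chronological order,
# so max() yields the newest commit.
latest_commit = max(commit_time_array)
print(latest_commit)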

Method 2: Java Classes via Py4J: Direct access to Hudi's internal metadata.

In [4]:
from py4j.java_gateway import java_import

# Import Hudi's timeline classes into the gateway's JVM view
java_import(sc._jvm, "org.apache.hudi.common.table.HoodieTableMetaClient")
java_import(sc._jvm, "org.apache.hudi.common.table.timeline.HoodieTimeline")

# Define the base path for the Hudi table
base_path = "s3://<BUCKET>/xmldata/silver/table_name=invoices/"

# Create HoodieTableMetaClient
meta_client = sc._jvm.HoodieTableMetaClient.builder().setBasePath(base_path).setConf(sc._jsc.hadoopConfiguration()).build()

# Get completed instants
completed_instants = meta_client.reloadActiveTimeline().getWriteTimeline().filterCompletedInstants().getInstants().toArray()

# Extract commit times from completed instants
commit_times = [instant.getTimestamp() for instant in completed_instants]

# Print the commit times
print(commit_times)
['20240619134307860', '20240620203954176']
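
If only the most recent completed commit is needed, the timeline can be queried for it directly. A minimal sketch, assuming the meta_client built above and that lastInstant() returns a Hudi Option:

# Fetch just the latest completed instant from the write timeline;
# lastInstant() returns a Hudi Option, so check isPresent() first.
timeline = meta_client.reloadActiveTimeline().getWriteTimeline().filterCompletedInstants()
last_instant = timeline.lastInstant()
if last_instant.isPresent():
    print(last_instant.get().getTimestamp())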

Method 3: Reading from Hudi Path: Using Spark SQL to extract commit times from the Hudi table.

In [5]:
path = "s3://<BUCKET>/xmldata/silver/table_name=invoices/"
limit = 100

spark.read.format("hudi").load(path).createOrReplaceTempView("hudi_snapshot")

commits = list(map(lambda row: row[0], spark.sql(
    "select distinct(_hoodie_commit_time) as commitTime from hudi_snapshot order by commitTime asc"
).limit(limit).collect()))
commits
['20240620203954176']
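
Note that a snapshot query only surfaces the _hoodie_commit_time values carried by the latest file slices, which is why just one of the table's two commits appears here; use Methods 1, 2, or 4 when the full timeline is needed. Once a commit time is in hand, a common use is seeding an incremental query. A minimal sketch, reusing the path and commits variables from the cell above (the begin instant is exclusive, so this returns only records written after it):

# Incremental read starting after the fetched commit time.
incremental_df = (
    spark.read.format("hudi")
    .option("hoodie.datasource.query.type", "incremental")
    .option("hoodie.datasource.read.begin.instanttime", commits[0])
    .load(path)
)
incremental_df.show()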

Method 4: Using boto3: Interacting with AWS S3 directly to fetch and parse commit metadata.

In [6]:
import boto3

# Initialize a boto3 client for S3
s3_client = boto3.client('s3')

# Define the bucket name and base path
bucket_name = '<BUCKET>'
base_path = 'xmldata/silver/table_name=invoices/'

# Function to list objects in a given S3 path
def list_s3_objects(bucket, prefix):
    objects = []
    paginator = s3_client.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        if 'Contents' in page:
            objects.extend(page['Contents'])
    return objects

# Function to extract commit times from the objects
def extract_commit_times(objects):
    commit_times = []
    for obj in objects:
        key = obj['Key']
        if key.endswith('.commit'):
            # Extract commit timestamp from the file name
            commit_time = key.split('/')[-1].replace('.commit', '')
            commit_times.append(commit_time)
    return commit_times

# List objects in the Hudi commit directory
commit_dir = f"{base_path}.hoodie"
objects = list_s3_objects(bucket_name, commit_dir)

# Extract commit times from the listed objects
commit_times = extract_commit_times(objects)

# Print the commit times
print(commit_times)
['20240619134307860', '20240620203954176']
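
The filter above only matches .commit files, which covers copy-on-write tables; merge-on-read tables record delta commits as .deltacommit files, and clustering or insert-overwrite operations write .replacecommit files. A hedged variant covering all three suffixes (the suffix list is an assumption based on Hudi's timeline file naming):

# Broader suffix set: .commit (COW), .deltacommit (MOR), and
# .replacecommit (clustering / insert overwrite); assumed naming
# based on Hudi's timeline conventions.
COMMIT_SUFFIXES = ('.commit', '.deltacommit', '.replacecommit')

def extract_all_commit_times(objects):
    times = []
    for obj in objects:
        name = obj['Key'].split('/')[-1]
        for suffix in COMMIT_SUFFIXES:
            if name.endswith(suffix):
                times.append(name[:-len(suffix)])
                break
    return sorted(times)

print(extract_all_commit_times(objects))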
