4 Different Ways to fetch Apache Hudi Commit time in Python and PySpark¶
In [2]:
%%configure -f
{
"conf": {
"spark.serializer": "org.apache.spark.serializer.KryoSerializer",
"spark.sql.hive.convertMetastoreParquet": "false",
"spark.sql.catalog.spark_catalog": "org.apache.spark.sql.hudi.catalog.HoodieCatalog",
"spark.sql.legacy.pathOptionBehavior.enabled": "true",
"spark.sql.extensions": "org.apache.spark.sql.hudi.HoodieSparkSessionExtension"
}
}
Method 1: Stored Proc¶
In [3]:
# Method 1: call Hudi's show_commits stored procedure via Spark SQL.
# Define the table and limit
table = "default.invoices"
limit = 100
# Construct the procedure call (Hudi SQL procedure syntax)
query = f"CALL show_commits(table => '{table}', limit => {limit});"
# Collect commit times straight from the DataFrame rows. Dropping to the
# RDD API (rdd.flatMap) just to flatten one column is unnecessary and is
# not supported under Spark Connect; a comprehension over collected Rows
# does the same thing.
commit_time_array = [row["commit_time"] for row in spark.sql(query).select("commit_time").collect()]
# Show the commit_time array
print(commit_time_array)
Method 2: Java Classes via Py4J: Direct access to Hudi's internal metadata.¶
In [4]:
from py4j.java_gateway import java_import
# Import Hudi's metadata classes into the gateway's JVM view so they are
# addressable as sc._jvm.<ClassName> below.
java_import(sc._jvm, "org.apache.hudi.common.table.HoodieTableMetaClient")
java_import(sc._jvm, "org.apache.hudi.common.table.timeline.HoodieTimeline")
# Define the base path for the Hudi table (replace <BUCKET> with the real bucket)
base_path = "s3://<BUCKET>/xmldata/silver/table_name=invoices/"
# Create HoodieTableMetaClient, reusing Spark's Hadoop configuration so the
# S3 filesystem/credential settings from the session apply.
# NOTE(review): setConf() taking a Hadoop Configuration matches older Hudi
# releases — confirm against the Hudi version actually deployed.
meta_client = sc._jvm.HoodieTableMetaClient.builder().setBasePath(base_path).setConf(sc._jsc.hadoopConfiguration()).build()
# Get completed instants from the write timeline; reloadActiveTimeline()
# refreshes the view so recently finished commits are included.
completed_instants = meta_client.reloadActiveTimeline().getWriteTimeline().filterCompletedInstants().getInstants().toArray()
# Extract commit times (instant timestamps) from completed instants
commit_times = [instant.getTimestamp() for instant in completed_instants]
# Print the commit times
print(commit_times)
Method 3: Reading from Hudi Path: Using Spark SQL to extract commit times from the Hudi table.¶
In [5]:
# Method 3: read the Hudi table and query its _hoodie_commit_time
# metadata column through a temporary SQL view.
path = "s3://<BUCKET>/xmldata/silver/table_name=invoices/"
limit = 100  # cap on how many distinct commit times to return
# Register a snapshot view of the table for SQL access
spark.read.format("hudi").load(path).createOrReplaceTempView("hudi_snapshot")
# A list comprehension over the collected Rows is clearer than
# list(map(lambda row: row[0], ...)); the SQL text is unchanged.
commits = [
    row[0]
    for row in spark.sql(
        "select distinct(_hoodie_commit_time) as commitTime "
        "from hudi_snapshot order by commitTime asc"
    ).limit(limit).collect()
]
# Last expression displays the result in the notebook
commits
Method 4: Using boto3: Interacting with AWS S3 directly to fetch and parse commit metadata.¶
In [6]:
import boto3
# Initialize a boto3 client for S3 using the default credential chain
s3_client = boto3.client('s3')
# Define the bucket name and base path; <BUCKET> is a placeholder to fill in
bucket_name = '<BUCKET>'
base_path = 'xmldata/silver/table_name=invoices/'
# Function to list objects in a given S3 path
# Function to list objects in a given S3 path
def list_s3_objects(bucket, prefix):
    """Return every object under ``prefix`` in ``bucket``.

    Uses the ``list_objects_v2`` paginator so listings larger than one
    page (1000 keys) are fully returned.

    Args:
        bucket: S3 bucket name.
        prefix: Key prefix to list under.

    Returns:
        List of object dicts as returned by S3 (each contains 'Key', etc.).
    """
    objects = []
    paginator = s3_client.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        # 'Contents' is absent on empty pages; .get with a default
        # replaces the explicit membership test.
        objects.extend(page.get('Contents', []))
    return objects
# Function to extract commit times from the objects
# Function to extract commit times from the objects
def extract_commit_times(objects):
    """Pull commit timestamps out of S3 object listings.

    Keeps only keys ending in ``.commit`` and strips that extension from
    the file name, leaving the commit timestamp string.

    Args:
        objects: Iterable of S3 object dicts, each with a 'Key' entry.

    Returns:
        List of commit-time strings, in listing order.
    """
    suffix = '.commit'
    return [
        obj['Key'].split('/')[-1].replace(suffix, '')
        for obj in objects
        if obj['Key'].endswith(suffix)
    ]
# List objects in the Hudi commit directory
# (base_path already ends with '/', so this yields '<base_path>.hoodie')
commit_dir = f"{base_path}.hoodie"
objects = list_s3_objects(bucket_name, commit_dir)
# Extract commit times from the listed objects
# NOTE(review): only '.commit' files are matched — delta commits of
# merge-on-read tables ('.deltacommit') would be skipped; confirm table type.
commit_times = extract_commit_times(objects)
# Print the commit times
print(commit_times)
In [ ]:
No comments:
Post a Comment