Apache Hudi 0.14 Announces hudi_table_changes: Making CDC and Incremental Queries Easier¶
Step 1: Define Imports¶
In [26]:
try:
    import os
    import sys
    import uuid
    import pyspark
    import pandas as pd
    from pyspark.sql import SparkSession
    from pyspark import SparkConf, SparkContext
    from faker import Faker

    print("Imports loaded")
except Exception as e:
    print("error", e)
Imports loaded
Step 2: Create Spark Session¶
In [27]:
HUDI_VERSION = '0.14.0'
SPARK_VERSION = '3.4'
SUBMIT_ARGS = f"--packages org.apache.hudi:hudi-spark{SPARK_VERSION}-bundle_2.12:{HUDI_VERSION} pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable
spark = SparkSession.builder \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
    .config('className', 'org.apache.hudi') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .getOrCreate()
Define Hudi Settings¶
In [28]:
db_name = "hudidb"
table_name = "sample_table"
path = f"file:///Users/soumilnitinshah/Downloads/{db_name}/{table_name}" # Updated path
method = 'upsert'
table_type = "COPY_ON_WRITE"
recordkey = "emp_id"
precombine = "emp_id"
hudi_options = {
    'hoodie.table.name': table_name,
    'hoodie.datasource.write.table.type': table_type,
    'hoodie.datasource.write.table.name': table_name,
    'hoodie.datasource.write.operation': method,
    'hoodie.datasource.write.recordkey.field': recordkey,
    'hoodie.datasource.write.precombine.field': precombine,
    'hoodie.metadata.record.index.enable': 'true',
    'hoodie.index.type': 'RECORD_INDEX'
}
In [33]:
# Define the data for the two records with only "emp_id" and "employee_name" columns
data = [
    {"emp_id": 1, "employee_name": "John Doe"},
    {"emp_id": 2, "employee_name": "Jane Smith"}
]

# Define the schema for the DataFrame with "emp_id" and "employee_name" columns
columns = ["emp_id", "employee_name"]

# Create the Spark DataFrame
spark_df = spark.createDataFrame(data=data, schema=columns)

# Show the DataFrame
spark_df.show()

spark_df.write.format("hudi"). \
    options(**hudi_options). \
    mode("append"). \
    save(path)
+------+-------------+
|emp_id|employee_name|
+------+-------------+
|     1|     John Doe|
|     2|   Jane Smith|
+------+-------------+
23/10/07 18:42:50 WARN SparkMetadataTableRecordIndex: Record index not initialized so falling back to GLOBAL_SIMPLE for tagging records
23/10/07 18:42:54 WARN DAGScheduler: Broadcasting large task binary with size 1110.4 KiB
23/10/07 18:42:54 WARN DAGScheduler: Broadcasting large task binary with size 1210.7 KiB
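As an aside, the Faker import from Step 1 is not actually used in this walkthrough. Here is a minimal sketch of how it could generate a larger batch for the same table; the id range and generated names are illustrative, not part of the run above.

# Illustrative sketch only: use the Faker import from Step 1 to build a bigger batch.
faker = Faker()
big_batch = [{"emp_id": i, "employee_name": faker.name()} for i in range(100, 200)]
# spark.createDataFrame(data=big_batch, schema=columns).write.format("hudi") \
#     .options(**hudi_options).mode("append").save(path)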
Read All Changes From the Earliest Commit¶
In [34]:
db_name = "hudidb"
table_name = "sample_table"
path = f"file:///Users/soumilnitinshah/Downloads/{db_name}/{table_name}"  # Updated path

sql_query = f"""
SELECT
    emp_id,
    employee_name,
    _hoodie_commit_time
FROM
    hudi_table_changes('{path}', 'latest_state', 'earliest');
"""

spark.read.format("org.apache.hudi").load(path).createOrReplaceTempView("hudi_snapshot")

result_df = spark.sql(sql_query)
result_df.show()
+------+-------------+-------------------+
|emp_id|employee_name|_hoodie_commit_time|
+------+-------------+-------------------+
|     1|     John Doe|  20231007184248212|
|     2|   Jane Smith|  20231007184248212|
+------+-------------+-------------------+
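The hudi_table_changes table-valued function introduced in Hudi 0.14 takes a table path (or table name), a change type, and a begin instant time ('earliest' above). Besides 'latest_state', it also accepts 'cdc', which returns before/after images of each changed record. The sketch below is illustrative only, since it assumes a table written with 'hoodie.table.cdc.enabled' set to 'true', which the sample_table above was not.

# Illustrative only: CDC-style changes require the table to have been written
# with 'hoodie.table.cdc.enabled': 'true' (not the case for sample_table above).
cdc_query = f"""
SELECT *
FROM
    hudi_table_changes('{path}', 'cdc', 'earliest');
"""
# spark.sql(cdc_query).show(truncate=False)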
Inserting 2 More Records¶
In [35]:
# Define the data for the two records with only "emp_id" and "employee_name" columns
data = [
    {"emp_id": 3, "employee_name": "Soumil"},
    {"emp_id": 4, "employee_name": "Nitin"}
]

# Define the schema for the DataFrame with "emp_id" and "employee_name" columns
columns = ["emp_id", "employee_name"]

# Create the Spark DataFrame
spark_df = spark.createDataFrame(data=data, schema=columns)

# Show the DataFrame
spark_df.show()

spark_df.write.format("hudi"). \
    options(**hudi_options). \
    mode("append"). \
    save(path)
+------+-------------+
|emp_id|employee_name|
+------+-------------+
|     3|       Soumil|
|     4|        Nitin|
+------+-------------+
23/10/07 18:43:56 WARN DAGScheduler: Broadcasting large task binary with size 1117.1 KiB
23/10/07 18:43:56 WARN DAGScheduler: Broadcasting large task binary with size 1217.4 KiB
Reading Changes After the First Commit¶
In [37]:
db_name = "hudidb"
table_name = "sample_table"
path = f"file:///Users/soumilnitinshah/Downloads/{db_name}/{table_name}"  # Updated path

sql_query = f"""
SELECT
    emp_id,
    employee_name,
    _hoodie_commit_time
FROM
    hudi_table_changes('{path}', 'latest_state', '20231007184248212');
"""

spark.read.format("org.apache.hudi").load(path).createOrReplaceTempView("hudi_snapshot")

result_df = spark.sql(sql_query)
result_df.show()
+------+-------------+-------------------+
|emp_id|employee_name|_hoodie_commit_time|
+------+-------------+-------------------+
|     3|       Soumil|  20231007184352682|
|     4|        Nitin|  20231007184352682|
+------+-------------+-------------------+
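Passing the first commit time as the begin instant returns only the records written by the later commit, which is what makes repeated incremental pulls easy: each run starts from the last commit time already consumed. The checkpointing sketch below is an assumption on top of the post, not something shown in the original cells.

# Sketch: carry the highest commit time seen forward as the begin instant of the next pull.
last_commit = result_df.agg({"_hoodie_commit_time": "max"}).collect()[0][0]

next_query = f"""
SELECT
    emp_id,
    employee_name,
    _hoodie_commit_time
FROM
    hudi_table_changes('{path}', 'latest_state', '{last_commit}');
"""
# spark.sql(next_query).show()  # returns only records committed after last_commit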