Learn How to Ingest Data from Hudi Incrementally (hudi_table_changes) into Postgres Using Spark¶
Create a Docker Compose file 'docker-compose.yml'¶
version: "3.7"
services:
  postgres:
    image: debezium/postgres:13
    ports:
      - 5432:5432
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
      - POSTGRES_DB=postgres
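Once the file is saved, bring the container up (for example with docker compose up -d) and confirm Postgres is reachable on localhost:5432 before moving on.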
Create the Stored Procedure¶
-- Drop the procedure if it exists to avoid conflicts
DROP PROCEDURE IF EXISTS public.delete_duplicates;

-- Create the procedure
CREATE OR REPLACE PROCEDURE public.delete_duplicates(
    IN p_table_name TEXT,
    IN p_dedup_column TEXT,
    IN p_primary_key TEXT
)
LANGUAGE 'plpgsql'
AS $BODY$
BEGIN
    -- Check if the table exists
    IF EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = p_table_name) THEN
        -- Identify duplicates per primary key and delete them,
        -- keeping the row with the highest value of the dedup column (e.g. _hoodie_commit_time)
        EXECUTE FORMAT('
            DELETE FROM %I
            WHERE ctid IN (
                SELECT ctid
                FROM (
                    SELECT ctid,
                           ROW_NUMBER() OVER (PARTITION BY %I ORDER BY %I DESC) AS rn
                    FROM %I
                ) AS sub
                WHERE rn > 1
            )', p_table_name, p_primary_key, p_dedup_column, p_table_name);

        -- Output message
        RAISE NOTICE 'Duplicates deleted from table %', p_table_name;
    ELSE
        -- Table does not exist
        RAISE EXCEPTION 'Table % does not exist', p_table_name;
    END IF;
END;
$BODY$;

-- Grant execute permission to relevant roles (e.g., postgres)
GRANT EXECUTE ON PROCEDURE public.delete_duplicates(TEXT, TEXT, TEXT) TO postgres;
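Run this SQL against the postgres database (with psql or any SQL client) before starting the Spark job; the procedure is called from the notebook after each incremental load to keep only the latest version of every id.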
Step 1: Creating Hudi Tables¶
In [3]:
try:
    import os
    import sys
    import uuid
    import pyspark
    from pyspark.sql import SparkSession
    from pyspark import SparkConf, SparkContext
    from pyspark.sql.types import StructType, StructField, IntegerType, StringType
except Exception as e:
    print("Error importing libraries:", e)
Create Spark Session¶
In [4]:
HUDI_VERSION = '0.14.0'
SPARK_VERSION = '3.4'

os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@11"
SUBMIT_ARGS = f"--packages org.apache.hudi:hudi-spark{SPARK_VERSION}-bundle_2.12:{HUDI_VERSION},org.postgresql:postgresql:42.5.4 pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable

# Spark session
spark = SparkSession.builder \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
    .config('className', 'org.apache.hudi') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .getOrCreate()
In [11]:
jdbc_properties = {
    'user': 'postgres',
    'password': 'postgres',
    'driver': 'org.postgresql.Driver',
    'url': 'jdbc:postgresql://localhost:5432/postgres'
}
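Before writing anything, it can be worth confirming that Spark can actually reach Postgres over JDBC. The cell below is a minimal sanity-check sketch (not part of the original notebook) that runs a trivial query using the connection details defined above:
# Minimal sanity-check sketch: verify Spark can reach Postgres through JDBC
# before any data is written.
spark.read \
    .format("jdbc") \
    .option("url", jdbc_properties["url"]) \
    .option("driver", jdbc_properties["driver"]) \
    .option("user", jdbc_properties["user"]) \
    .option("password", jdbc_properties["password"]) \
    .option("query", "SELECT 1 AS ok") \
    .load() \
    .show()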
Create Hudi tables¶
In [5]:
# Define the records (id is kept as a string to match the schema below)
records = [
    ('1', 'Soumil', 29, 'NYC', '2023-09-28 00:00:00'),
    ('2', 'Nitin', 30, 'SFO', '2023-09-28 00:00:00')
]

# Define the schema
schema = StructType([
    StructField("id", StringType(), True),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("city", StringType(), True),
    StructField("create_ts", StringType(), True)
])
# Create a DataFrame
df = spark.createDataFrame(records, schema)
df.show()
In [6]:
import shutil

# Clean up any previous copy of the table so the example starts fresh
try:
    shutil.rmtree("/Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=people")
except Exception as e:
    pass
In [7]:
def write_to_hudi(spark_df,
                  table_name,
                  db_name,
                  method='upsert',
                  table_type='COPY_ON_WRITE',
                  recordkey='',
                  precombine='',
                  partition_fields='',
                  index_type='BLOOM'
                  ):
    path = f"file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database={db_name}/table_name={table_name}"
    print(path)

    hudi_options = {
        'hoodie.table.name': table_name,
        'hoodie.datasource.write.table.type': table_type,
        'hoodie.datasource.write.operation': method,
        'hoodie.datasource.write.recordkey.field': recordkey,
        'hoodie.datasource.write.precombine.field': precombine,
        'hoodie.index.type': index_type,
        'hoodie.datasource.write.partitionpath.field': partition_fields,
        'hoodie.datasource.hive_sync.partition_fields': partition_fields
    }

    spark_df.write.format("hudi") \
        .options(**hudi_options) \
        .mode("append") \
        .save(path)
In [8]:
write_to_hudi(
    spark_df=df,
    method="upsert",
    db_name="default",
    table_name="people",
    recordkey="id",
    precombine="create_ts",
    partition_fields="city"
)
Read from Hudi Incrementally¶
In [9]:
# Define the path
path = "file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=people"
# Define the SQL query
sql_query = f"""
SELECT
CAST(id AS INT) AS id,
name,
CAST(age AS INT) AS age,
city,
CAST(create_ts AS TIMESTAMP) AS create_ts,
CAST(_hoodie_commit_time AS BIGINT) AS _hoodie_commit_time
FROM
hudi_table_changes('{path}', 'latest_state', 'earliest');
"""
# Execute the query
result_df = spark.sql(sql_query)
result_df.show()
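To avoid hard-coding the commit timestamp used for the next incremental pull (see "Incrementally Fetching Deltas" below), one option is to capture the checkpoint from this batch. A minimal sketch, where last_commit is a hypothetical helper variable not in the original notebook:
# Capture the highest _hoodie_commit_time returned by this batch so it can be
# passed as the start instant of the next hudi_table_changes query.
last_commit = result_df.agg({"_hoodie_commit_time": "max"}).collect()[0][0]
print(last_commit)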
In [14]:
result_df.write \
    .mode('append') \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5432/postgres") \
    .option("driver", "org.postgresql.Driver") \
    .option("dbtable", "public.people") \
    .option("user", "postgres") \
    .option("password", "postgres") \
    .save()

# Define the SQL command to execute the stored procedure
sql_command = "CALL public.delete_duplicates('people', '_hoodie_commit_time', 'id')"

# Establish a JDBC connection and execute the stored procedure
try:
    connection = spark._sc._gateway.jvm.java.sql.DriverManager.getConnection(
        jdbc_properties['url'], jdbc_properties["user"], jdbc_properties["password"]
    )
    statement = connection.createStatement()
    statement.execute(sql_command)
    connection.close()
    print("Stored procedure executed successfully.")
except Exception as e:
    print("Error executing stored procedure:", e)
New Updates Coming into Apache Hudi¶
In [15]:
# Define the records (an update for id 1 and a new insert for id 3;
# id is kept as a string to match the schema below)
records = [
    ('1', 'update*', 29, 'NYC', '2023-09-28 00:00:00'),
    ('3', 'inserts', 29, 'NYC', '2023-09-28 00:00:00'),
]

# Define the schema
schema = StructType([
    StructField("id", StringType(), True),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("city", StringType(), True),
    StructField("create_ts", StringType(), True)
])
# Create a DataFrame
df = spark.createDataFrame(records, schema)
df.show()
write_to_hudi(
    spark_df=df,
    method="upsert",
    db_name="default",
    table_name="people",
    recordkey="id",
    precombine="create_ts",
    partition_fields="city"
)
Snapshot of Hudi Tables¶
In [16]:
spark.read.format("hudi") \
    .load(path) \
    .createOrReplaceTempView("hudi_snapshot1")
query = f"""
SELECT
CAST(id AS INT) AS id,
name,
CAST(age AS INT) AS age,
city,
CAST(create_ts AS TIMESTAMP) AS create_ts,
CAST(_hoodie_commit_time AS BIGINT) AS _hoodie_commit_time
FROM
hudi_snapshot1 """
result = spark.sql(query)
result.show(n=result.count(), truncate=False)
Incrementally Fetching Deltas¶
In [18]:
# Define the path
path = "file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=people"
# Commit instant captured from the previous incremental batch (see the checkpoint
# sketch above); only changes committed after this instant are returned.
hudi_commit = "20240623193141683"
# Define the SQL query
sql_query = f"""
SELECT
CAST(id AS INT) AS id,
name,
CAST(age AS INT) AS age,
city,
CAST(create_ts AS TIMESTAMP) AS create_ts,
CAST(_hoodie_commit_time AS BIGINT) AS _hoodie_commit_time
FROM
hudi_table_changes('{path}', 'latest_state', '{hudi_commit}');
"""
# Execute the query
result_df = spark.sql(sql_query)
result_df.show()
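Because the query starts from the previous commit instant, this batch contains only the records written after it: the update for id 1 and the new insert for id 3.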
In [20]:
jdbc_properties = {
    'user': 'postgres',
    'password': 'postgres',
    'driver': 'org.postgresql.Driver',
    'url': 'jdbc:postgresql://localhost:5432/postgres'
}

result_df.write \
    .mode('append') \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5432/postgres") \
    .option("driver", "org.postgresql.Driver") \
    .option("dbtable", "public.people") \
    .option("user", "postgres") \
    .option("password", "postgres") \
    .save()

# Define the SQL command to execute the stored procedure
sql_command = "CALL public.delete_duplicates('people', '_hoodie_commit_time', 'id')"

# Establish a JDBC connection and execute the stored procedure
try:
    connection = spark._sc._gateway.jvm.java.sql.DriverManager.getConnection(
        jdbc_properties['url'], jdbc_properties["user"], jdbc_properties["password"]
    )
    statement = connection.createStatement()
    statement.execute(sql_command)
    connection.close()
    print("Stored procedure executed successfully.")
except Exception as e:
    print("Error executing stored procedure:", e)
Final Snapshot of Hudi Tables and Postgres¶
In [21]:
spark.read.format("hudi") \
    .load(path) \
    .createOrReplaceTempView("hudi_snapshot1")
query = f"""
SELECT
CAST(id AS INT) AS id,
name,
CAST(age AS INT) AS age,
city,
CAST(create_ts AS TIMESTAMP) AS create_ts,
CAST(_hoodie_commit_time AS BIGINT) AS _hoodie_commit_time
FROM
hudi_snapshot1 """
result = spark.sql(query)
result.show(n=result.count(), truncate=False)
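To complete the comparison promised by this section's title, a small verification sketch (not part of the original notebook) can read public.people back through JDBC and check that, after the stored procedure ran, Postgres holds the same latest-state rows as the Hudi snapshot above:
# Read the Postgres table back through JDBC; after delete_duplicates has run,
# it should contain one row per id with the highest _hoodie_commit_time.
pg_df = spark.read \
    .format("jdbc") \
    .option("url", jdbc_properties["url"]) \
    .option("driver", jdbc_properties["driver"]) \
    .option("user", jdbc_properties["user"]) \
    .option("password", jdbc_properties["password"]) \
    .option("dbtable", "public.people") \
    .load()
pg_df.orderBy("id").show(truncate=False)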