Sunday, June 23, 2024

Learn How to Ingest Data from Hudi Incrementally (hudi_table_changes) into Postgres Using Spark


Create a Docker Compose file 'docker-compose.yml'

version: "3.7"

services:
  postgres:
    image: debezium/postgres:13
    ports:
      - 5432:5432
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
      - POSTGRES_DB=postgres

Start the Postgres container with 'docker compose up -d', then create a stored procedure that deletes duplicate rows for a given primary key, keeping the row with the highest value of the dedup column (here _hoodie_commit_time):

-- Drop procedure if exists to avoid conflicts
DROP PROCEDURE IF EXISTS public.delete_duplicates;

-- Create the procedure
CREATE OR REPLACE PROCEDURE public.delete_duplicates(
    IN p_table_name TEXT,
    IN p_dedup_column TEXT,
    IN p_primary_key TEXT
)
LANGUAGE 'plpgsql'
AS $BODY$
BEGIN
    -- Check if the table exists
    IF EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = p_table_name) THEN
        -- Use CTE to identify duplicates and delete them, keeping the row with the highest _hoodie_commit_time
        EXECUTE FORMAT('
            DELETE FROM %I
            WHERE ctid IN (
                SELECT ctid
                FROM (
                    SELECT ctid,
                           ROW_NUMBER() OVER (PARTITION BY %I ORDER BY %I DESC) AS rn
                    FROM %I
                ) AS sub
                WHERE rn > 1
            )', p_table_name, p_primary_key, p_dedup_column, p_table_name);

        -- Output message
        RAISE NOTICE 'Duplicates deleted from table %', p_table_name;
    ELSE
        -- Table does not exist
        RAISE EXCEPTION 'Table % does not exist', p_table_name;
    END IF;
END;
$BODY$;

-- Grant execute permission to relevant roles (e.g., postgres)
GRANT EXECUTE ON PROCEDURE public.delete_duplicates(TEXT, TEXT, TEXT) TO postgres;
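
The procedure can be created from psql, or directly from Python. Below is a minimal sketch using psycopg2; the dependency and the delete_duplicates.sql file name are assumptions, not part of the rest of this post:

# Sketch: install the stored procedure from Python with psycopg2
# (assumes `pip install psycopg2-binary` and that the SQL above is saved as delete_duplicates.sql)
import psycopg2

with open("delete_duplicates.sql") as f:
    ddl = f.read()

conn = psycopg2.connect(
    host="localhost", port=5432,
    dbname="postgres", user="postgres", password="postgres"
)
conn.autocommit = True
with conn.cursor() as cur:
    cur.execute(ddl)
conn.close()
print("Stored procedure created.")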


Step 1: Creating Hudi Tables

In [3]:
try:
    import os
    import sys
    import uuid
    import pyspark
    from pyspark.sql import SparkSession
    from pyspark import SparkConf, SparkContext
    from pyspark.sql.types import StructType, StructField, IntegerType, StringType
except Exception as e:
    print("Error importing libraries:", e)

Create Spark Session

In [4]:
HUDI_VERSION = '0.14.0'
SPARK_VERSION = '3.4'

os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@11"
SUBMIT_ARGS = f"--packages org.apache.hudi:hudi-spark{SPARK_VERSION}-bundle_2.12:{HUDI_VERSION},org.postgresql:postgresql:42.5.4 pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable

# Spark session
spark = SparkSession.builder \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
    .config('className', 'org.apache.hudi') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .getOrCreate()
Warning: Ignoring non-Spark config property: className
Ivy Default Cache set to: /Users/soumilshah/.ivy2/cache
The jars for the packages stored in: /Users/soumilshah/.ivy2/jars
org.apache.hudi#hudi-spark3.4-bundle_2.12 added as a dependency
org.postgresql#postgresql added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-b1121c98-a712-4ddd-8bdb-33624a91f7e9;1.0
	confs: [default]
:: loading settings :: url = jar:file:/opt/anaconda3/lib/python3.11/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml
	found org.apache.hudi#hudi-spark3.4-bundle_2.12;0.14.0 in spark-list
	found org.postgresql#postgresql;42.5.4 in spark-list
	found org.checkerframework#checker-qual;3.5.0 in central
:: resolution report :: resolve 82ms :: artifacts dl 5ms
	:: modules in use:
	org.apache.hudi#hudi-spark3.4-bundle_2.12;0.14.0 from spark-list in [default]
	org.checkerframework#checker-qual;3.5.0 from central in [default]
	org.postgresql#postgresql;42.5.4 from spark-list in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   3   |   0   |   0   |   0   ||   3   |   0   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-b1121c98-a712-4ddd-8bdb-33624a91f7e9
	confs: [default]
	0 artifacts copied, 3 already retrieved (0kB/3ms)
24/06/23 19:31:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
In [11]:
jdbc_properties = {
    'user': 'postgres',
    'password': 'postgres',
    'driver': 'org.postgresql.Driver',
    'url': 'jdbc:postgresql://localhost:5432/postgres'
}

Create Hudi tables

In [5]:
# Define the records
records = [
    ('1', 'Soumil', 29, 'NYC', '2023-09-28 00:00:00'),
    ('2', 'Nitin', 30, 'SFO', '2023-09-28 00:00:00')
]

# Define the schema
schema = StructType([
    StructField("id", StringType(), True),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("city", StringType(), True),
    StructField("create_ts", StringType(), True)
])

# Create a DataFrame
df = spark.createDataFrame(records, schema)

df.show()
+---+------+---+----+-------------------+
| id|  name|age|city|          create_ts|
+---+------+---+----+-------------------+
|  1|Soumil| 29| NYC|2023-09-28 00:00:00|
|  2| Nitin| 30| SFO|2023-09-28 00:00:00|
+---+------+---+----+-------------------+

                                                                                
In [6]:
import shutil

# Clean up any previous copy of the Hudi table on disk
try:
    shutil.rmtree("/Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=people")
except Exception as e:
    pass
In [7]:
def write_to_hudi(spark_df,
                  table_name,
                  db_name,
                  method='upsert',
                  table_type='COPY_ON_WRITE',
                  recordkey='',
                  precombine='',
                  partition_fields='',
                  index_type='BLOOM'
                  ):
    path = f"file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database={db_name}/table_name={table_name}"
    print(path)

    hudi_options = {
        'hoodie.table.name': table_name,
        'hoodie.datasource.write.table.type': table_type,
        'hoodie.datasource.write.operation': method,
        'hoodie.datasource.write.recordkey.field': recordkey,
        'hoodie.datasource.write.precombine.field': precombine,
        "hoodie.datasource.write.partitionpath.field": partition_fields,
        'hoodie.datasource.hive_sync.partition_fields': partition_fields
    }

    spark_df.write.format("hudi") \
        .options(**hudi_options) \
        .mode("append") \
        .save(path)
In [8]:
write_to_hudi(
    spark_df=df, 
    method="upsert",
    db_name="default",
    table_name="people",
    recordkey="id",
    precombine="create_ts",
    partition_fields="city"
)
file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=people
24/06/23 19:31:41 WARN DFSPropertiesConfiguration: Cannot find HUDI_CONF_DIR, please set it as the dir of hudi-defaults.conf
24/06/23 19:31:41 WARN DFSPropertiesConfiguration: Properties file file:/etc/hudi/conf/hudi-defaults.conf not found. Ignoring to load props file
24/06/23 19:31:42 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-hbase.properties,hadoop-metrics2.properties
# WARNING: Unable to get Instrumentation. Dynamic Attach failed. You may add this JAR as -javaagent manually, or supply -Djdk.attach.allowAttachSelf
# WARNING: Unable to attach Serviceability Agent. Unable to attach even with module exceptions: [org.apache.hudi.org.openjdk.jol.vm.sa.SASupportException: Sense failed., org.apache.hudi.org.openjdk.jol.vm.sa.SASupportException: Sense failed., org.apache.hudi.org.openjdk.jol.vm.sa.SASupportException: Sense failed.]

Read from Hudi Incrementally

The hudi_table_changes table-valued function takes the table path (or name), a changes mode ('latest_state' returns the latest state of each changed record), and a starting commit; passing 'earliest' reads all changes from the beginning of the timeline.

In [9]:
# Define the path
path = "file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=people"

# Define the SQL query
sql_query = f"""
SELECT 
    CAST(id AS INT) AS id,
    name,
    CAST(age AS INT) AS age,
    city,
    CAST(create_ts AS TIMESTAMP) AS create_ts,
    CAST(_hoodie_commit_time AS BIGINT) AS _hoodie_commit_time
FROM 
     hudi_table_changes('{path}', 'latest_state', 'earliest');

"""

# Execute the query
result_df = spark.sql(sql_query)
result_df.show()
+---+------+---+----+-------------------+-------------------+
| id|  name|age|city|          create_ts|_hoodie_commit_time|
+---+------+---+----+-------------------+-------------------+
|  1|Soumil| 29| NYC|2023-09-28 00:00:00|  20240623193141683|
|  2| Nitin| 30| SFO|2023-09-28 00:00:00|  20240623193141683|
+---+------+---+----+-------------------+-------------------+

In [14]:
result_df.write \
    .mode('append') \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5432/postgres") \
    .option("driver", "org.postgresql.Driver") \
    .option("dbtable", "public.people") \
    .option("user", "postgres") \
    .option("password", "postgres") \
    .save()

# Define the SQL command to execute the stored procedure
sql_command = "CALL public.delete_duplicates('people', '_hoodie_commit_time', 'id')"

# Establish JDBC connection and execute the stored procedure
try:
    connection = spark._sc._gateway.jvm.java.sql.DriverManager.getConnection(jdbc_properties['url'], jdbc_properties["user"], jdbc_properties["password"])
    statement = connection.createStatement()
    statement.execute(sql_command)
    connection.close()
    print("Stored procedure executed successfully.")
except Exception as e:
    print("Error executing stored procedure:", e)
Stored procedure executed successfully.
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by py4j.reflection.ReflectionShim (file:/opt/anaconda3/lib/python3.11/site-packages/pyspark/jars/py4j-0.10.9.7.jar) to method java.sql.DriverManager.getConnection(java.lang.String,java.util.Properties,java.lang.Class)
WARNING: Please consider reporting this to the maintainers of py4j.reflection.ReflectionShim
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
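
This append-then-dedupe pattern repeats for every incremental batch, so it can be wrapped in a small helper. The sketch below only reorganizes the code above; the name upsert_to_postgres is mine, not a Spark or Hudi API:

# Sketch: reusable "append then dedupe" upsert into Postgres,
# wrapping the JDBC write and the delete_duplicates call shown above.
def upsert_to_postgres(df, table, dedup_column="_hoodie_commit_time", primary_key="id"):
    # 1. Append the incremental batch via JDBC
    df.write \
        .mode("append") \
        .format("jdbc") \
        .option("url", jdbc_properties["url"]) \
        .option("driver", jdbc_properties["driver"]) \
        .option("dbtable", f"public.{table}") \
        .option("user", jdbc_properties["user"]) \
        .option("password", jdbc_properties["password"]) \
        .save()

    # 2. Remove older versions of each key with the stored procedure
    sql_command = f"CALL public.delete_duplicates('{table}', '{dedup_column}', '{primary_key}')"
    connection = spark._sc._gateway.jvm.java.sql.DriverManager.getConnection(
        jdbc_properties["url"], jdbc_properties["user"], jdbc_properties["password"])
    statement = connection.createStatement()
    statement.execute(sql_command)
    statement.close()
    connection.close()

# Example: upsert_to_postgres(result_df, "people")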

New Updates Coming into the Apache Hudi Table

In [15]:
# Define the records
records = [
    ('1', 'update*', 29, 'NYC', '2023-09-28 00:00:00'),
    ('3', 'inserts', 29, 'NYC', '2023-09-28 00:00:00'),
]

# Define the schema
schema = StructType([
    StructField("id", StringType(), True),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("city", StringType(), True),
    StructField("create_ts", StringType(), True)
])

# Create a DataFrame
df = spark.createDataFrame(records, schema)

df.show()

write_to_hudi(
    spark_df=df, 
    method="upsert",
    db_name="default",
    table_name="people",
    recordkey="id",
    precombine="create_ts",
    partition_fields="city"
)
+---+-------+---+----+-------------------+
| id|   name|age|city|          create_ts|
+---+-------+---+----+-------------------+
|  1|update*| 29| NYC|2023-09-28 00:00:00|
|  3|inserts| 29| NYC|2023-09-28 00:00:00|
+---+-------+---+----+-------------------+

file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=people

Snapshot of the Hudi Table

In [16]:
spark.read.format("hudi") \
    .load(path) \
    .createOrReplaceTempView("hudi_snapshot1")

query = f"""
SELECT 
    CAST(id AS INT) AS id,
    name,
    CAST(age AS INT) AS age,
    city,
    CAST(create_ts AS TIMESTAMP) AS create_ts,
    CAST(_hoodie_commit_time AS BIGINT) AS _hoodie_commit_time     
FROM 
    hudi_snapshot1 """

result = spark.sql(query)
result.show(n=result.count(), truncate=False)
+---+-------+---+----+-------------------+-------------------+
|id |name   |age|city|create_ts          |_hoodie_commit_time|
+---+-------+---+----+-------------------+-------------------+
|1  |update*|29 |NYC |2023-09-28 00:00:00|20240623193352966  |
|3  |inserts|29 |NYC |2023-09-28 00:00:00|20240623193352966  |
|2  |Nitin  |30 |SFO |2023-09-28 00:00:00|20240623193141683  |
+---+-------+---+----+-------------------+-------------------+

Incrementally Fetching Deltas

This time, the last commit that was already loaded into Postgres (20240623193141683) is passed as the starting point, so only the newer changes come back.

In [18]:
# Define the path
path = "file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=people"
hudi_commit = "20240623193141683"

# Define the SQL query
sql_query = f"""
SELECT 
    CAST(id AS INT) AS id,
    name,
    CAST(age AS INT) AS age,
    city,
    CAST(create_ts AS TIMESTAMP) AS create_ts,
    CAST(_hoodie_commit_time AS BIGINT) AS _hoodie_commit_time
FROM 
     hudi_table_changes('{path}', 'latest_state', '{hudi_commit}');

"""

# Execute the query
result_df = spark.sql(sql_query)
result_df.show()
+---+-------+---+----+-------------------+-------------------+
| id|   name|age|city|          create_ts|_hoodie_commit_time|
+---+-------+---+----+-------------------+-------------------+
|  1|update*| 29| NYC|2023-09-28 00:00:00|  20240623193352966|
|  3|inserts| 29| NYC|2023-09-28 00:00:00|  20240623193352966|
+---+-------+---+----+-------------------+-------------------+
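
The start commit is hardcoded above for clarity. In a real pipeline you would persist the last commit you processed and use it as the starting point of the next incremental read. A minimal sketch, assuming a local checkpoint file (the name people_checkpoint.txt is made up):

# Sketch: persist the last processed commit so the next run reads only new changes.
import os

CHECKPOINT_FILE = "people_checkpoint.txt"  # hypothetical local checkpoint store

def read_checkpoint(default="earliest"):
    # Return the last saved commit, or read from the beginning on the first run
    if os.path.exists(CHECKPOINT_FILE):
        with open(CHECKPOINT_FILE) as f:
            return f.read().strip()
    return default

def write_checkpoint(df):
    # Save the max commit time seen in this batch
    last_commit = df.agg({"_hoodie_commit_time": "max"}).collect()[0][0]
    if last_commit is not None:
        with open(CHECKPOINT_FILE, "w") as f:
            f.write(str(last_commit))

start_commit = read_checkpoint()
incr_df = spark.sql(f"""
    SELECT * FROM hudi_table_changes('{path}', 'latest_state', '{start_commit}')
""")
# ... write incr_df to Postgres as shown above ...
write_checkpoint(incr_df)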

In [20]:
jdbc_properties = {
    'user': 'postgres',
    'password': 'postgres',
    'driver': 'org.postgresql.Driver',
    'url': 'jdbc:postgresql://localhost:5432/postgres'
}



result_df.write \
    .mode('append') \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5432/postgres") \
    .option("driver", "org.postgresql.Driver") \
    .option("dbtable", "public.people") \
    .option("user", "postgres") \
    .option("password", "postgres") \
    .save()

# Define the SQL command to execute the stored procedure
sql_command = "CALL public.delete_duplicates('people', '_hoodie_commit_time', 'id')"

# Establish JDBC connection and execute the stored procedure
try:
    connection = spark._sc._gateway.jvm.java.sql.DriverManager.getConnection(jdbc_properties['url'], jdbc_properties["user"], jdbc_properties["password"])
    statement = connection.createStatement()
    statement.execute(sql_command)
    connection.close()
    print("Stored procedure executed successfully.")
except Exception as e:
    print("Error executing stored procedure:", e)
Stored procedure executed successfully.

Final Snapshot of the Hudi Table and Postgres

In [21]:
spark.read.format("hudi") \
    .load(path) \
    .createOrReplaceTempView("hudi_snapshot1")

query = f"""
SELECT 
    CAST(id AS INT) AS id,
    name,
    CAST(age AS INT) AS age,
    city,
    CAST(create_ts AS TIMESTAMP) AS create_ts,
    CAST(_hoodie_commit_time AS BIGINT) AS _hoodie_commit_time     
FROM 
    hudi_snapshot1 """

result = spark.sql(query)
result.show(n=result.count(), truncate=False)
+---+-------+---+----+-------------------+-------------------+
|id |name   |age|city|create_ts          |_hoodie_commit_time|
+---+-------+---+----+-------------------+-------------------+
|1  |update*|29 |NYC |2023-09-28 00:00:00|20240623193352966  |
|3  |inserts|29 |NYC |2023-09-28 00:00:00|20240623193352966  |
|2  |Nitin  |30 |SFO |2023-09-28 00:00:00|20240623193141683  |
+---+-------+---+----+-------------------+-------------------+
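
To confirm that Postgres now matches the Hudi snapshot, the table can be read back over the same JDBC connection; a quick check along these lines:

# Read public.people back from Postgres to verify the final state
postgres_df = spark.read \
    .format("jdbc") \
    .option("url", jdbc_properties["url"]) \
    .option("driver", jdbc_properties["driver"]) \
    .option("dbtable", "public.people") \
    .option("user", jdbc_properties["user"]) \
    .option("password", jdbc_properties["password"]) \
    .load()

postgres_df.orderBy("id").show(truncate=False)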
