Sunday, June 23, 2024

Learn How to Ingest Data from Hudi Incrementally (hudi_table_changes) into Postgres Using Spark


Create a Docker Compose file named 'docker-compose.yml'

version: "3.7"

services:
  postgres:
    image: debezium/postgres:13
    ports:
      - 5432:5432
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
      - POSTGRES_DB=postgres
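
Bring the stack up with docker compose up -d; Postgres is then reachable on localhost:5432 with the user, password, and database all set to postgres, matching the JDBC settings used later in this post.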

Create a stored procedure for deduplication

The procedure below deletes duplicate rows from a target table, keeping only the row with the highest value of the dedup column (here _hoodie_commit_time) for each primary key. It will be called after every incremental load into Postgres.

-- Drop procedure if exists to avoid conflicts
DROP PROCEDURE IF EXISTS public.delete_duplicates;

-- Create the procedure
CREATE OR REPLACE PROCEDURE public.delete_duplicates(
    IN p_table_name TEXT,
    IN p_dedup_column TEXT,
    IN p_primary_key TEXT
)
LANGUAGE 'plpgsql'
AS $BODY$
BEGIN
    -- Check if the table exists
    IF EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = p_table_name) THEN
        -- Use CTE to identify duplicates and delete them, keeping the row with the highest _hoodie_commit_time
        EXECUTE FORMAT('
            DELETE FROM %I
            WHERE ctid IN (
                SELECT ctid
                FROM (
                    SELECT ctid,
                           ROW_NUMBER() OVER (PARTITION BY %I ORDER BY %I DESC) AS rn
                    FROM %I
                ) AS sub
                WHERE rn > 1
            )', p_table_name, p_primary_key, p_dedup_column, p_table_name);

        -- Output message
        RAISE NOTICE 'Duplicates deleted from table %', p_table_name;
    ELSE
        -- Table does not exist
        RAISE EXCEPTION 'Table % does not exist', p_table_name;
    END IF;
END;
$BODY$;

-- Grant execute permission to relevant roles (e.g., postgres)
GRANT EXECUTE ON PROCEDURE public.delete_duplicates(TEXT, TEXT, TEXT) TO postgres;
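
The DDL above has to be applied to the running Postgres container before the pipeline runs; any SQL client will do. A minimal sketch using psycopg2 (an assumption on my part; the library and the file name delete_duplicates.sql are not part of the original post):

import psycopg2  # assumption: psycopg2 / psycopg2-binary is installed

# Connect to the dockerized Postgres defined in docker-compose.yml
conn = psycopg2.connect(
    host="localhost",
    port=5432,
    dbname="postgres",
    user="postgres",
    password="postgres",
)
conn.autocommit = True

# delete_duplicates.sql is a hypothetical file containing the DDL shown above
with conn.cursor() as cur, open("delete_duplicates.sql") as f:
    cur.execute(f.read())  # runs the DROP / CREATE PROCEDURE / GRANT statements

conn.close()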


Step 1: Creating Hudi tables

In [3]:
try:

    import os
    import sys
    import uuid
    import pyspark
    from pyspark.sql import SparkSession
    from pyspark import SparkConf, SparkContext
    from pyspark.sql.types import StructType, StructField, IntegerType, StringType

except Exception as e:
    print(f"Error importing modules: {e}")

Create Spark Session

In [4]:
HUDI_VERSION = '0.14.0'
SPARK_VERSION = '3.4'

os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@11"
SUBMIT_ARGS = f"--packages org.apache.hudi:hudi-spark{SPARK_VERSION}-bundle_2.12:{HUDI_VERSION},org.postgresql:postgresql:42.5.4 pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable

# Spark session
spark = SparkSession.builder \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
    .config('className', 'org.apache.hudi') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .getOrCreate()
Warning: Ignoring non-Spark config property: className
Ivy Default Cache set to: /Users/soumilshah/.ivy2/cache
The jars for the packages stored in: /Users/soumilshah/.ivy2/jars
org.apache.hudi#hudi-spark3.4-bundle_2.12 added as a dependency
org.postgresql#postgresql added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-b1121c98-a712-4ddd-8bdb-33624a91f7e9;1.0
	confs: [default]
:: loading settings :: url = jar:file:/opt/anaconda3/lib/python3.11/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml
	found org.apache.hudi#hudi-spark3.4-bundle_2.12;0.14.0 in spark-list
	found org.postgresql#postgresql;42.5.4 in spark-list
	found org.checkerframework#checker-qual;3.5.0 in central
:: resolution report :: resolve 82ms :: artifacts dl 5ms
	:: modules in use:
	org.apache.hudi#hudi-spark3.4-bundle_2.12;0.14.0 from spark-list in [default]
	org.checkerframework#checker-qual;3.5.0 from central in [default]
	org.postgresql#postgresql;42.5.4 from spark-list in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   3   |   0   |   0   |   0   ||   3   |   0   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-b1121c98-a712-4ddd-8bdb-33624a91f7e9
	confs: [default]
	0 artifacts copied, 3 already retrieved (0kB/3ms)
24/06/23 19:31:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
In [11]:
jdbc_properties = {
    'user': 'postgres',
    'password': 'postgres',
    'driver': 'org.postgresql.Driver',
    'url': 'jdbc:postgresql://localhost:5432/postgres'
}

Create Hudi tables

In [5]:
# Define the records
records = [
    (1, 'Soumil', 29, 'NYC', '2023-09-28 00:00:00'),
    (2, 'Nitin', 30, 'SFO', '2023-09-28 00:00:00')
]

# Define the schema
schema = StructType([
    StructField("id", StringType(), True),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("city", StringType(), True),
    StructField("create_ts", StringType(), True)
])

# Create a DataFrame
df = spark.createDataFrame(records, schema)

df.show()
+---+------+---+----+-------------------+
| id|  name|age|city|          create_ts|
+---+------+---+----+-------------------+
|  1|Soumil| 29| NYC|2023-09-28 00:00:00|
|  2| Nitin| 30| SFO|2023-09-28 00:00:00|
+---+------+---+----+-------------------+

                                                                                
In [6]:
import shutil

# Clean up any previous copy of the Hudi table so this run starts from scratch
try:
    shutil.rmtree("/Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=people")
except Exception as e:
    pass
In [7]:
def write_to_hudi(spark_df,
                  table_name,
                  db_name,
                  method='upsert',
                  table_type='COPY_ON_WRITE',
                  recordkey='',
                  precombine='',
                  partition_fields='',
                  index_type='BLOOM'
                  ):
    path = f"file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database={db_name}/table_name={table_name}"
    print(path)

    hudi_options = {
        'hoodie.table.name': table_name,
        'hoodie.datasource.write.table.type': table_type,
        'hoodie.datasource.write.operation': method,
        'hoodie.datasource.write.recordkey.field': recordkey,
        'hoodie.datasource.write.precombine.field': precombine,
        'hoodie.datasource.write.partitionpath.field': partition_fields,
        'hoodie.datasource.hive_sync.partition_fields': partition_fields,
        'hoodie.index.type': index_type
    }

    spark_df.write.format("hudi") \
        .options(**hudi_options) \
        .mode("append") \
        .save(path)
In [8]:
write_to_hudi(
    spark_df=df, 
    method="upsert",
    db_name="default",
    table_name="people",
    recordkey="id",
    precombine="create_ts",
    partition_fields="city"
)
file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=people
24/06/23 19:31:41 WARN DFSPropertiesConfiguration: Cannot find HUDI_CONF_DIR, please set it as the dir of hudi-defaults.conf
24/06/23 19:31:41 WARN DFSPropertiesConfiguration: Properties file file:/etc/hudi/conf/hudi-defaults.conf not found. Ignoring to load props file
24/06/23 19:31:42 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-hbase.properties,hadoop-metrics2.properties
# WARNING: Unable to get Instrumentation. Dynamic Attach failed. You may add this JAR as -javaagent manually, or supply -Djdk.attach.allowAttachSelf
# WARNING: Unable to attach Serviceability Agent. Unable to attach even with module exceptions: [org.apache.hudi.org.openjdk.jol.vm.sa.SASupportException: Sense failed., org.apache.hudi.org.openjdk.jol.vm.sa.SASupportException: Sense failed., org.apache.hudi.org.openjdk.jol.vm.sa.SASupportException: Sense failed.]

Read from Hudi Incrementally

hudi_table_changes is a table-valued function introduced in Hudi 0.14. Its arguments are the table name or base path, a mode ('latest_state' returns the latest state of each changed record), and a begin instant: 'earliest' reads everything from the first commit, while a specific commit timestamp returns only the changes made after that instant.

In [9]:
# Define the path
path = "file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=people"

# Define the SQL query
sql_query = f"""
SELECT 
    CAST(id AS INT) AS id,
    name,
    CAST(age AS INT) AS age,
    city,
    CAST(create_ts AS TIMESTAMP) AS create_ts,
    CAST(_hoodie_commit_time AS BIGINT) AS _hoodie_commit_time
FROM 
     hudi_table_changes('{path}', 'latest_state', 'earliest');

"""

# Execute the query
result_df = spark.sql(sql_query)
result_df.show()
+---+------+---+----+-------------------+-------------------+
| id|  name|age|city|          create_ts|_hoodie_commit_time|
+---+------+---+----+-------------------+-------------------+
|  1|Soumil| 29| NYC|2023-09-28 00:00:00|  20240623193141683|
|  2| Nitin| 30| SFO|2023-09-28 00:00:00|  20240623193141683|
+---+------+---+----+-------------------+-------------------+
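
Before pushing this first batch to Postgres, it is useful to capture the highest commit instant it contains; that value is what gets passed back to hudi_table_changes as the begin instant on the next run (later in this post it is hard-coded as hudi_commit). A small sketch of that bookkeeping (the variable name last_commit is mine):

from pyspark.sql import functions as F

# Highest commit instant in this incremental batch; it becomes the
# begin instant of the next hudi_table_changes call.
last_commit = result_df.agg(F.max("_hoodie_commit_time")).collect()[0][0]
print(last_commit)  # e.g. 20240623193141683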

In [14]:
result_df.write \
    .mode('append') \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5432/postgres") \
    .option("driver", "org.postgresql.Driver") \
    .option("dbtable", "public.people") \
    .option("user", "postgres") \
    .option("password", "postgres") \
    .save()

# Define the SQL command to execute the stored procedure
sql_command = "CALL public.delete_duplicates('people', '_hoodie_commit_time', 'id')"

# Establish JDBC connection and execute the stored procedure
try:
    connection = spark._sc._gateway.jvm.java.sql.DriverManager.getConnection(jdbc_properties['url'], jdbc_properties["user"], jdbc_properties["password"])
    statement = connection.createStatement()
    statement.execute(sql_command)
    connection.close()
    print("Stored procedure executed successfully.")
except Exception as e:
    print("Error executing stored procedure:", e)
Stored procedure executed successfully.
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by py4j.reflection.ReflectionShim (file:/opt/anaconda3/lib/python3.11/site-packages/pyspark/jars/py4j-0.10.9.7.jar) to method java.sql.DriverManager.getConnection(java.lang.String,java.util.Properties,java.lang.Class)
WARNING: Please consider reporting this to the maintainers of py4j.reflection.ReflectionShim
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
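
Every incremental batch goes through the same two moves, appending to Postgres over JDBC and then calling the dedup procedure, so the pattern can be wrapped in a small helper. This is only a sketch assembled from the cells above; the function name sync_to_postgres is mine:

def sync_to_postgres(batch_df, table="public.people"):
    # 1) Append the incremental batch to the Postgres table over JDBC
    batch_df.write \
        .mode("append") \
        .format("jdbc") \
        .option("url", jdbc_properties["url"]) \
        .option("driver", jdbc_properties["driver"]) \
        .option("dbtable", table) \
        .option("user", jdbc_properties["user"]) \
        .option("password", jdbc_properties["password"]) \
        .save()

    # 2) Keep only the newest version of each id (highest _hoodie_commit_time)
    connection = spark._sc._gateway.jvm.java.sql.DriverManager.getConnection(
        jdbc_properties["url"], jdbc_properties["user"], jdbc_properties["password"])
    try:
        connection.createStatement().execute(
            "CALL public.delete_duplicates('people', '_hoodie_commit_time', 'id')")
    finally:
        connection.close()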

New Updates and Inserts Coming into the Hudi Table

In [15]:
# Define the records
records = [
    (1, 'update*', 29, 'NYC', '2023-09-28 00:00:00'),
    (3, 'inserts', 29, 'NYC', '2023-09-28 00:00:00'),
]

# Define the schema
schema = StructType([
    StructField("id", StringType(), True),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("city", StringType(), True),
    StructField("create_ts", StringType(), True)
])

# Create a DataFrame
df = spark.createDataFrame(records, schema)

df.show()

write_to_hudi(
    spark_df=df, 
    method="upsert",
    db_name="default",
    table_name="people",
    recordkey="id",
    precombine="create_ts",
    partition_fields="city"
)
+---+-------+---+----+-------------------+
| id|   name|age|city|          create_ts|
+---+-------+---+----+-------------------+
|  1|update*| 29| NYC|2023-09-28 00:00:00|
|  3|inserts| 29| NYC|2023-09-28 00:00:00|
+---+-------+---+----+-------------------+

file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=people

Snapshot of the Hudi Table

In [16]:
spark.read.format("hudi") \
    .load(path) \
    .createOrReplaceTempView("hudi_snapshot1")

query = f"""
SELECT 
    CAST(id AS INT) AS id,
    name,
    CAST(age AS INT) AS age,
    city,
    CAST(create_ts AS TIMESTAMP) AS create_ts,
    CAST(_hoodie_commit_time AS BIGINT) AS _hoodie_commit_time     
FROM 
    hudi_snapshot1 """

result = spark.sql(query)
result.show(n=result.count(), truncate=False)
+---+-------+---+----+-------------------+-------------------+
|id |name   |age|city|create_ts          |_hoodie_commit_time|
+---+-------+---+----+-------------------+-------------------+
|1  |update*|29 |NYC |2023-09-28 00:00:00|20240623193352966  |
|3  |inserts|29 |NYC |2023-09-28 00:00:00|20240623193352966  |
|2  |Nitin  |30 |SFO |2023-09-28 00:00:00|20240623193141683  |
+---+-------+---+----+-------------------+-------------------+

Incrementally Fetching Deltas

In [18]:
# Define the path
path = "file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=people"
hudi_commit = "20240623193141683"

# Define the SQL query
sql_query = f"""
SELECT 
    CAST(id AS INT) AS id,
    name,
    CAST(age AS INT) AS age,
    city,
    CAST(create_ts AS TIMESTAMP) AS create_ts,
    CAST(_hoodie_commit_time AS BIGINT) AS _hoodie_commit_time
FROM 
     hudi_table_changes('{path}', 'latest_state', '{hudi_commit}');

"""

# Execute the query
result_df = spark.sql(sql_query)
result_df.show()
+---+-------+---+----+-------------------+-------------------+
| id|   name|age|city|          create_ts|_hoodie_commit_time|
+---+-------+---+----+-------------------+-------------------+
|  1|update*| 29| NYC|2023-09-28 00:00:00|  20240623193352966|
|  3|inserts| 29| NYC|2023-09-28 00:00:00|  20240623193352966|
+---+-------+---+----+-------------------+-------------------+
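
In a scheduled job the begin instant would not be hard-coded; it would be persisted after each successful load and read back on the next run. A minimal file-based sketch (the path /tmp/people_last_commit.txt is hypothetical; a checkpoint table in Postgres would work just as well):

import os

CHECKPOINT_FILE = "/tmp/people_last_commit.txt"  # hypothetical checkpoint location

def read_checkpoint(default="earliest"):
    # Fall back to 'earliest' on the very first run
    if os.path.exists(CHECKPOINT_FILE):
        with open(CHECKPOINT_FILE) as f:
            return f.read().strip() or default
    return default

def write_checkpoint(commit_time):
    with open(CHECKPOINT_FILE, "w") as f:
        f.write(str(commit_time))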

In [20]:
jdbc_properties = {
    'user': 'postgres',
    'password': 'postgres',
    'driver': 'org.postgresql.Driver',
    'url': 'jdbc:postgresql://localhost:5432/postgres'
}



result_df.write \
    .mode('append') \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5432/postgres") \
    .option("driver", "org.postgresql.Driver") \
    .option("dbtable", "public.people") \
    .option("user", "postgres") \
    .option("password", "postgres") \
    .save()

# Define the SQL command to execute the stored procedure
sql_command = "CALL public.delete_duplicates('people', '_hoodie_commit_time', 'id')"

# Establish JDBC connection and execute the stored procedure
try:
    connection = spark._sc._gateway.jvm.java.sql.DriverManager.getConnection(jdbc_properties['url'], jdbc_properties["user"], jdbc_properties["password"])
    statement = connection.createStatement()
    statement.execute(sql_command)
    connection.close()
    print("Stored procedure executed successfully.")
except Exception as e:
    print("Error executing stored procedure:", e)
Stored procedure executed successfully.

Final Snapshot of the Hudi Table and Postgres

In [21]:
spark.read.format("hudi") \
    .load(path) \
    .createOrReplaceTempView("hudi_snapshot1")

query = f"""
SELECT 
    CAST(id AS INT) AS id,
    name,
    CAST(age AS INT) AS age,
    city,
    CAST(create_ts AS TIMESTAMP) AS create_ts,
    CAST(_hoodie_commit_time AS BIGINT) AS _hoodie_commit_time     
FROM 
    hudi_snapshot1 """

result = spark.sql(query)
result.show(n=result.count(), truncate=False)
+---+-------+---+----+-------------------+-------------------+
|id |name   |age|city|create_ts          |_hoodie_commit_time|
+---+-------+---+----+-------------------+-------------------+
|1  |update*|29 |NYC |2023-09-28 00:00:00|20240623193352966  |
|3  |inserts|29 |NYC |2023-09-28 00:00:00|20240623193352966  |
|2  |Nitin  |30 |SFO |2023-09-28 00:00:00|20240623193141683  |
+---+-------+---+----+-------------------+-------------------+
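
To confirm that Postgres now holds exactly one row per id, the table can be read back through the same JDBC connection (a quick verification sketch, not one of the original cells):

pg_df = spark.read \
    .format("jdbc") \
    .option("url", jdbc_properties["url"]) \
    .option("driver", jdbc_properties["driver"]) \
    .option("dbtable", "public.people") \
    .option("user", jdbc_properties["user"]) \
    .option("password", jdbc_properties["password"]) \
    .load()

pg_df.orderBy("id").show(truncate=False)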

