Tuesday, June 25, 2024

How to Execute Postgres Stored Procedures in Spark | Hands-On Guide
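
Spark's JDBC data source can only run queries, so calling a stored procedure means opening a raw JDBC connection through Spark's py4j gateway. This post walks through the full setup: a dockerized Postgres, a dedup stored procedure, and the PySpark code that calls it.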


Labs

docker-compose.yaml

version: "3.7"

services:
  postgres:
    image: debezium/postgres:13
    ports:
      - 5432:5432
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
      - POSTGRES_DB=postgres
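
Save this as docker-compose.yaml and bring the database up with docker compose up -d. Postgres will then be listening on localhost:5432, with the user, password, and database all set to postgres, matching the JDBC settings used later in this post.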

Create the stored procedure

-- Drop procedure if exists to avoid conflicts
DROP PROCEDURE IF EXISTS public.delete_duplicates;

-- Create the procedure
CREATE OR REPLACE PROCEDURE public.delete_duplicates(
    IN p_table_name TEXT,
    IN p_dedup_column TEXT,
    IN p_primary_key TEXT
)
LANGUAGE plpgsql
AS $BODY$
BEGIN
    -- Check that the table exists in the public schema before deleting
    IF EXISTS (SELECT 1 FROM information_schema.tables
               WHERE table_schema = 'public' AND table_name = p_table_name) THEN
        -- Rank rows within each primary-key group and delete all but the one
        -- with the highest dedup-column value (e.g. the latest _hoodie_commit_time)
        EXECUTE FORMAT('
            DELETE FROM %I
            WHERE ctid IN (
                SELECT ctid
                FROM (
                    SELECT ctid,
                           ROW_NUMBER() OVER (PARTITION BY %I ORDER BY %I DESC) AS rn
                    FROM %I
                ) AS sub
                WHERE rn > 1
            )', p_table_name, p_primary_key, p_dedup_column, p_table_name);

        -- Output message
        RAISE NOTICE 'Duplicates deleted from table %', p_table_name;
    ELSE
        -- Table does not exist
        RAISE EXCEPTION 'Table % does not exist', p_table_name;
    END IF;
END;
$BODY$;

-- Grant execute permission to relevant roles (e.g., postgres)
GRANT EXECUTE ON PROCEDURE public.delete_duplicates(TEXT, TEXT, TEXT) TO postgres;
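
A note on safety: FORMAT's %I placeholder quotes each argument as a SQL identifier, which protects the dynamically built DELETE from identifier injection. If you want to smoke-test the procedure before involving Spark, here is a minimal sketch using psycopg2 (an assumption; it is not used elsewhere in this post and needs pip install psycopg2-binary), run against the people table used later in this post:

```
import psycopg2

# Hypothetical standalone test of the procedure, no Spark involved.
# Connection settings mirror the docker-compose file above.
conn = psycopg2.connect(
    host="localhost", port=5432,
    dbname="postgres", user="postgres", password="postgres",
)
conn.autocommit = True  # let the CALL commit its own DELETE
with conn.cursor() as cur:
    cur.execute("CALL public.delete_duplicates('people', '_hoodie_commit_time', 'id')")
conn.close()
```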


Before running the stored procedure

[Screenshot: the people table with duplicate rows, before running the procedure]

Running the stored procedure from Spark

import os
import sys

from pyspark.sql import SparkSession

# Point PySpark at the local JDK and pull in the Postgres JDBC driver.
# PYSPARK_SUBMIT_ARGS must be set before the SparkSession is created.
os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@11"
SUBMIT_ARGS = "--packages org.postgresql:postgresql:42.5.4 pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ["PYSPARK_PYTHON"] = sys.executable

# Spark session
spark = SparkSession.builder \
    .getOrCreate()

# JDBC properties
jdbc_properties = {
    'user': 'postgres',
    'password': 'postgres',
    'driver': 'org.postgresql.Driver',
    'url': 'jdbc:postgresql://localhost:5432/postgres'
}

# CALL arguments: table name, dedup (ordering) column, primary-key column
sql_command = "CALL public.delete_duplicates('people', '_hoodie_commit_time', 'id')"


try:
    # A dummy spark.read.jdbc query forces Spark to load the Postgres
    # JDBC driver into the JVM before we use DriverManager directly
    spark.read.jdbc(
        url=jdbc_properties['url'],
        table="(SELECT 1) as tmp",
        properties=jdbc_properties
    ).count()

    # spark.read.jdbc can only run queries, so execute the CALL over
    # a raw JDBC connection obtained through the py4j gateway
    connection = spark._sc._gateway.jvm.java.sql.DriverManager.getConnection(
        jdbc_properties['url'],
        jdbc_properties["user"],
        jdbc_properties["password"]
    )
    statement = connection.createStatement()
    statement.execute(sql_command)
    statement.close()
    connection.close()
    
    print("Stored procedure executed successfully.")
except Exception as e:
    print("Error executing stored procedure:", e)

Output:

Stored procedure executed successfully.

After running the stored procedure

[Screenshot: the people table after running the procedure, duplicates removed]
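
To double-check from Spark, read the table back over the same JDBC connection and confirm that no primary key appears more than once. A minimal sketch, reusing the spark session and jdbc_properties from above (assumes the people table keys on id, as in the CALL):

```
from pyspark.sql import functions as F

# Read the deduplicated table back over JDBC
people = spark.read.jdbc(
    url=jdbc_properties['url'],
    table="people",
    properties=jdbc_properties
)

# Any id that still appears more than once is a leftover duplicate
leftover = people.groupBy("id").count().filter(F.col("count") > 1)
print("Remaining duplicate ids:", leftover.count())  # expect 0
```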
