Tuesday, June 25, 2024

How to Execute Postgres Stored Procedures in Spark | Hands-on Guide
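
This hands-on guide spins up Postgres with Docker Compose, defines a stored procedure that deletes duplicate rows from a table, and then calls that procedure from PySpark over a JDBC connection.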

Labs

docker-compose.yaml

version: "3.7"

services:
  postgres:
    image: debezium/postgres:13
    ports:
      - 5432:5432
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
      - POSTGRES_DB=postgres
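
Save this file as docker-compose.yaml and run docker compose up -d. Postgres comes up on localhost:5432 with the user, password, and database all set to postgres; docker compose down stops and removes the container when you are done.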

Create the stored procedure

-- Drop procedure if exists to avoid conflicts
DROP PROCEDURE IF EXISTS public.delete_duplicates;

-- Create the procedure
CREATE OR REPLACE PROCEDURE public.delete_duplicates(
    IN p_table_name TEXT,
    IN p_dedup_column TEXT,
    IN p_primary_key TEXT
)
LANGUAGE plpgsql
AS $BODY$
BEGIN
    -- Check if the table exists
    IF EXISTS (SELECT 1 FROM information_schema.tables
               WHERE table_schema = 'public' AND table_name = p_table_name) THEN
        -- Delete duplicates, keeping for each primary-key value the row with
        -- the highest value of the dedup column (e.g. _hoodie_commit_time)
        EXECUTE FORMAT('
            DELETE FROM %I
            WHERE ctid IN (
                SELECT ctid
                FROM (
                    SELECT ctid,
                           ROW_NUMBER() OVER (PARTITION BY %I ORDER BY %I DESC) AS rn
                    FROM %I
                ) AS sub
                WHERE rn > 1
            )', p_table_name, p_primary_key, p_dedup_column, p_table_name);

        -- Output message
        RAISE NOTICE 'Duplicates deleted from table %', p_table_name;
    ELSE
        -- Table does not exist
        RAISE EXCEPTION 'Table % does not exist', p_table_name;
    END IF;
END;
$BODY$;

-- Grant execute permission to relevant roles (e.g., postgres)
GRANT EXECUTE ON PROCEDURE public.delete_duplicates(TEXT, TEXT, TEXT) TO postgres;
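
Once the procedure exists, you can smoke-test it straight from psql before involving Spark, e.g. CALL public.delete_duplicates('people', '_hoodie_commit_time', 'id'); (the same call the Spark job issues below). For each value of the primary-key column, the procedure keeps the row with the highest dedup-column value and deletes the rest.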


Before running the stored procedure:

[Screenshot: image.png]

Running the stored procedure from Spark

import os
import sys

from pyspark.sql import SparkSession


os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@11"
SUBMIT_ARGS = f"--packages org.postgresql:postgresql:42.5.4 pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable

# Spark session
spark = SparkSession.builder \
    .getOrCreate()

# JDBC properties
jdbc_properties = {
    'user': 'postgres',
    'password': 'postgres',
    'driver': 'org.postgresql.Driver',
    'url': 'jdbc:postgresql://localhost:5432/postgres'
}

# Define the SQL command to execute the stored procedure
sql_command = "CALL public.delete_duplicates('people', '_hoodie_commit_time', 'id')"


try:
    # Dummy query via spark.read.jdbc so Spark downloads the --packages JDBC
    # driver and registers org.postgresql.Driver with the JVM
    spark.read.jdbc(
        url=jdbc_properties['url'],
        table="(SELECT 1) AS tmp",
        properties=jdbc_properties
    ).count()

    # spark.read.jdbc wraps queries in a SELECT, so a CALL statement has to go
    # through a raw java.sql connection obtained from the Spark JVM
    connection = spark._sc._gateway.jvm.java.sql.DriverManager.getConnection(
        jdbc_properties['url'],
        jdbc_properties["user"],
        jdbc_properties["password"]
    )
    statement = connection.createStatement()
    statement.execute(sql_command)
    statement.close()
    connection.close()
    
    print("Stored procedure executed successfully.")
except Exception as e:
    print("Error executing stored procedure:", e)
Output:

Stored procedure executed successfully.
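
To confirm the cleanup from the same session, you can read the table back over JDBC and look for keys that still appear more than once. A minimal sketch, assuming the people table from the CALL above has an id primary-key column:

from pyspark.sql import functions as F

# Read the deduplicated table back through the same JDBC connection
people = spark.read.jdbc(
    url=jdbc_properties["url"],
    table="public.people",
    properties=jdbc_properties
)

# An empty result means no id occurs more than once
people.groupBy("id").count().where(F.col("count") > 1).show()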

After running the stored procedure:

[Screenshot: image-2.png]
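
If you plan to call more than one procedure this way, it is worth wrapping the raw-connection steps in a small helper. The sketch below mirrors the code above (the name call_postgres_procedure is mine, not a library function) and makes sure the connection is closed even when the CALL fails:

def call_postgres_procedure(spark, jdbc_properties, sql_command):
    """Run a CALL statement over a raw JDBC connection from the Spark JVM."""
    connection = spark._sc._gateway.jvm.java.sql.DriverManager.getConnection(
        jdbc_properties["url"],
        jdbc_properties["user"],
        jdbc_properties["password"]
    )
    try:
        statement = connection.createStatement()
        statement.execute(sql_command)
        statement.close()
    finally:
        # Always release the connection, even if execute() raises
        connection.close()

# Example:
# call_postgres_procedure(
#     spark, jdbc_properties,
#     "CALL public.delete_duplicates('people', '_hoodie_commit_time', 'id')"
# )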
