How to Execute Postgres Stored Procedures in Spark | Hands-on Guide¶
Labs¶
docker-compose.yaml
version: "3.7"
services:
  postgres:
    image: debezium/postgres:13
    ports:
      - "5432:5432"
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
      - POSTGRES_DB=postgres
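Once the stack is up (docker compose up -d), it is worth confirming that Postgres is actually listening before moving on. A minimal sketch using only the Python standard library; the host and port match the compose file above:

import socket

# Sanity check: confirm the Postgres port from the compose file is reachable
with socket.create_connection(("localhost", 5432), timeout=5) as sock:
    print("Postgres is accepting TCP connections on", sock.getpeername())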
Create the stored procedure¶
-- Drop the procedure if it exists to avoid conflicts
DROP PROCEDURE IF EXISTS public.delete_duplicates;

-- Create the procedure
CREATE OR REPLACE PROCEDURE public.delete_duplicates(
    IN p_table_name   TEXT,
    IN p_dedup_column TEXT,
    IN p_primary_key  TEXT
)
LANGUAGE plpgsql
AS $BODY$
BEGIN
    -- Check that the target table exists
    IF EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = p_table_name) THEN
        -- Rank rows per primary key and delete every row except the one with the
        -- highest dedup-column value (e.g. the latest _hoodie_commit_time)
        EXECUTE FORMAT('
            DELETE FROM %I
            WHERE ctid IN (
                SELECT ctid
                FROM (
                    SELECT ctid,
                           ROW_NUMBER() OVER (PARTITION BY %I ORDER BY %I DESC) AS rn
                    FROM %I
                ) AS sub
                WHERE rn > 1
            )', p_table_name, p_primary_key, p_dedup_column, p_table_name);
        -- Output message
        RAISE NOTICE 'Duplicates deleted from table %', p_table_name;
    ELSE
        -- Table does not exist
        RAISE EXCEPTION 'Table % does not exist', p_table_name;
    END IF;
END;
$BODY$;

-- Grant execute permission to the relevant role (e.g. postgres)
GRANT EXECUTE ON PROCEDURE public.delete_duplicates(TEXT, TEXT, TEXT) TO postgres;
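The procedure needs a table with duplicate rows to act on. A minimal sketch that seeds a hypothetical people table through Spark's JDBC writer; the table name, id, and _hoodie_commit_time columns are assumptions chosen to match the CALL later in this guide:

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    # Assumes the Postgres JDBC driver is available (see the session setup below)
    .config("spark.jars.packages", "org.postgresql:postgresql:42.5.4")
    .getOrCreate()
)

# Two rows share id=1; only the one with the latest _hoodie_commit_time should survive
rows = [
    (1, "alice", "20240101000000"),
    (1, "alice", "20240102000000"),
    (2, "bob",   "20240101000000"),
]
df = spark.createDataFrame(rows, ["id", "name", "_hoodie_commit_time"])

# Write the sample data to Postgres, replacing any existing people table
df.write.jdbc(
    url="jdbc:postgresql://localhost:5432/postgres",
    table="people",
    mode="overwrite",
    properties={
        "user": "postgres",
        "password": "postgres",
        "driver": "org.postgresql.Driver",
    },
)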
Before running the stored procedure¶
After running the stored procedure¶
import os
import sys

from pyspark.sql import SparkSession

# Point Spark at a local JDK and pull in the Postgres JDBC driver
os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@11"
os.environ["PYSPARK_SUBMIT_ARGS"] = "--packages org.postgresql:postgresql:42.5.4 pyspark-shell"
os.environ["PYSPARK_PYTHON"] = sys.executable

# Spark session
spark = SparkSession.builder.getOrCreate()

# JDBC properties
jdbc_properties = {
    "user": "postgres",
    "password": "postgres",
    "driver": "org.postgresql.Driver",
    "url": "jdbc:postgresql://localhost:5432/postgres",
}

# Define the SQL command to execute the stored procedure
sql_command = "CALL public.delete_duplicates('people', '_hoodie_commit_time', 'id')"

try:
    # Run a dummy query through spark.read.jdbc so the driver is loaded
    # and the connection settings are validated
    spark.read.jdbc(
        url=jdbc_properties["url"],
        table="(SELECT 1) AS tmp",
        properties=jdbc_properties,
    ).count()

    # Spark's DataFrame API cannot CALL a procedure, so open a plain JDBC
    # connection through the JVM gateway and execute the statement directly
    connection = spark._sc._gateway.jvm.java.sql.DriverManager.getConnection(
        jdbc_properties["url"],
        jdbc_properties["user"],
        jdbc_properties["password"],
    )
    statement = connection.createStatement()
    statement.execute(sql_command)
    statement.close()
    connection.close()
    print("Stored procedure executed successfully.")
except Exception as e:
    print("Error executing stored procedure:", e)