Incremental Data Extraction from Postgres using Triggers and PySpark¶
Create a Table on Postgres¶
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
CREATE TABLE IF NOT EXISTS public.sales
(
salesid UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
invoiceid integer,
itemid integer,
category text COLLATE pg_catalog."default",
price numeric(10,2),
quantity integer,
orderdate date,
destinationstate text COLLATE pg_catalog."default",
shippingtype text COLLATE pg_catalog."default",
referral text COLLATE pg_catalog."default",
updated_at TIMESTAMP DEFAULT NOW()
);
INSERT INTO public.sales (invoiceid, itemid, category, price, quantity, orderdate, destinationstate, shippingtype, referral)
VALUES (123456, 789, 'Electronics', 599.99, 2, '2023-07-01', 'California', 'Standard', 'Website');
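Note: the sample runs shown later in this post were executed against a table seeded with additional rows, not just the single insert above. Something along the following lines reproduces that data (the values are reconstructed from the Spark output further down, so treat it as an illustrative seed rather than the exact statement used):
-- illustrative seed rows; values taken from the Spark output shown later in this post
INSERT INTO public.sales (invoiceid, itemid, category, price, quantity, orderdate, destinationstate, shippingtype, referral)
VALUES
    (19450, 90, 'Kitchen', 37.00, 4, '2016-06-09', 'IA', '3-Day', 'Repeat Customer'),
    (8405, 88, 'Office', 98.00, 1, '2016-09-19', 'IA', 'Free', 'Other');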
Create Trigger¶
CREATE OR REPLACE FUNCTION update_sales_updated_at()
RETURNS TRIGGER AS $$
BEGIN
NEW.updated_at = CURRENT_TIMESTAMP;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER update_sales_updated_at_trigger
BEFORE UPDATE ON public.sales
FOR EACH ROW
EXECUTE FUNCTION update_sales_updated_at();
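A quick way to confirm the trigger fires is to update the row inserted above and read back its updated_at (a minimal manual check, assuming that row still exists):
-- any column change should cause the BEFORE UPDATE trigger to refresh updated_at
UPDATE public.sales SET quantity = 3 WHERE invoiceid = 123456;
SELECT invoiceid, quantity, updated_at FROM public.sales WHERE invoiceid = 123456;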
Step 1: Define Imports¶
In [1]:
try:
    import os
    import sys
    from pyspark.sql import SparkSession
    from pyspark import SparkConf, SparkContext
    from datetime import datetime
    from pyspark.sql.functions import from_json, col
    import pyspark.sql.functions as F
    from dataclasses import dataclass, field
except Exception as e:
    print(f"Error importing libraries: {e}")
Step 2: Define Settings¶
In [2]:
# ======================================= Settings ===============================
global spark, jdbc_url, table_name, user, password, pk, updated_at_column_name, driver
SUBMIT_ARGS = "--packages org.postgresql:postgresql:42.5.4 pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
jdbc_url = 'jdbc:postgresql://localhost:5432/postgres'
table_name = 'public.sales'
user = 'postgres'
password = 'postgres'
pk = "salesid"
updated_at_column_name = "updated_at"
driver = 'org.postgresql.Driver'
spark = SparkSession.builder.appName('IncrementalJDBC').getOrCreate()
spark
Out[2]:
SparkSession - in-memory
Step 3: Define Helper Classes¶
In [3]:
class Parameters:
    def __init__(self):
        self.first_time_read_flag = False
        self.prev_updated_date = "2000-01-01"
        self.insert_data_exists_flag = False
        self.update_data_exists_flag = False


class Checkpoints(Parameters):
    def __init__(self, directory="./checkpoint/max_id", table_name="sales"):
        self.directory = directory
        self.table_name = table_name
        Parameters.__init__(self)

    def read(self):
        # Load the previously committed watermark if a checkpoint already exists
        if self.is_exists():
            self.prev_commit, self.prev_updated_date, self.table_name = spark.read.csv(self.directory).collect()[0]
            return True
        else:
            self.first_time_read_flag = True
            return False

    def write(self):
        # Persist the watermark so the next run only picks up newer rows
        spark_df = spark.createDataFrame(
            data=[(str(self.prev_commit), str(self.prev_updated_date), self.table_name)],
            schema=['prev_commit', "prev_updated_date", "table_name"])
        spark_df.write.mode("overwrite").csv(self.directory)
        return True

    def is_exists(self):
        if os.path.exists(self.directory):
            print("Checkpoint found")
            return True
        else:
            print("Checkpoint Not found")
            return False


class QuerySource(object):
    def __init__(self, updated_at_column_name, check_point_instance):
        self.updated_at_column_name = updated_at_column_name
        self.check_point_instance = check_point_instance

    def get_inc_update(self):
        # Pull only the rows whose updated_at is newer than the checkpointed watermark
        query = f"SELECT * FROM {self.check_point_instance.table_name} WHERE {self.updated_at_column_name} > '{self.check_point_instance.prev_updated_date}'"
        df = spark.read.format('jdbc').options(
            url=jdbc_url,
            query=query,
            user=user,
            password=password,
            driver=driver
        ).load()
        if df.count() > 0:
            self.check_point_instance.update_data_exists_flag = True
        return df
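For a concrete sense of what get_inc_update pushes down to Postgres: with the settings above and the default watermark from Parameters, the generated query is simply
SELECT * FROM public.sales WHERE updated_at > '2000-01-01'
On later runs the timestamp literal is replaced by the checkpointed max updated_at, so only rows inserted or updated since the previous run come back.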
Step 4: Define the Main Function¶
In [4]:
def main():
    helper_check_points = Checkpoints(
        directory=f"./checkpoint/{table_name}",
        table_name=table_name,
    )
    check_points_flag = helper_check_points.read()
    print("check_points_flag", check_points_flag)

    query_instance = QuerySource(
        updated_at_column_name=updated_at_column_name,
        check_point_instance=helper_check_points
    )

    if helper_check_points.first_time_read_flag:
        # First run: read everything and record the max updated_at as the new watermark
        update_inc_df = query_instance.get_inc_update()
        helper_check_points.prev_commit = update_inc_df.agg({updated_at_column_name: "max"}).collect()[0][0]
        if helper_check_points.prev_commit is not None:
            helper_check_points.prev_updated_date = helper_check_points.prev_commit
        helper_check_points.write()
        return update_inc_df
    else:
        # Later runs: read rows newer than the watermark and advance it if anything changed
        update_inc_df = query_instance.get_inc_update()
        current_updated_date = update_inc_df.agg({updated_at_column_name: "max"}).collect()[0][0]
        if current_updated_date is not None and helper_check_points.update_data_exists_flag:
            if str(current_updated_date) > str(helper_check_points.prev_updated_date):
                helper_check_points.prev_updated_date = current_updated_date
                helper_check_points.write()
        return update_inc_df
Results¶
Calling the Function for the First Time¶
Image of Postgres¶
In [5]:
print(main().show())
Checkpoint Not found
check_points_flag False
+--------------------+---------+------+--------+-----+--------+----------+----------------+------------+---------------+--------------------+
|             salesid|invoiceid|itemid|category|price|quantity| orderdate|destinationstate|shippingtype|       referral|          updated_at|
+--------------------+---------+------+--------+-----+--------+----------+----------------+------------+---------------+--------------------+
|73769008-8fd1-44a...|    19450|    90| Kitchen|37.00|       4|2016-06-09|              IA|       3-Day|Repeat Customer|2023-07-08 17:57:...|
|50717ac3-a278-403...|     8405|    88|  Office|98.00|       1|2016-09-19|              IA|        Free|          Other|2023-07-08 17:57:...|
+--------------------+---------+------+--------+-----+--------+----------+----------------+------------+---------------+--------------------+
None
Great, we were able to pull the data. Let's run the template again; we should not see any data, since we have already processed it¶
In [7]:
print(main().show())
Checkpoint found
check_points_flag True
+-------+---------+------+--------+-----+--------+---------+----------------+------------+--------+----------+
|salesid|invoiceid|itemid|category|price|quantity|orderdate|destinationstate|shippingtype|referral|updated_at|
+-------+---------+------+--------+-----+--------+---------+----------------+------------+--------+----------+
+-------+---------+------+--------+-----+--------+---------+----------------+------------+--------+----------+
None
Let's perform an update¶
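The exact UPDATE statement is not shown here, but judging from the result below it was something like this (the category value is inferred from the output, so treat it as a hypothetical example; the trigger then bumps updated_at automatically):
-- hypothetical update; the new category value is inferred from the result shown below
UPDATE public.sales SET category = 'Office **' WHERE invoiceid = 8405;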
Let's call the template¶
In [10]:
print(main().show())
Checkpoint found
check_points_flag True
+--------------------+---------+------+---------+-----+--------+----------+----------------+------------+--------+--------------------+
|             salesid|invoiceid|itemid| category|price|quantity| orderdate|destinationstate|shippingtype|referral|          updated_at|
+--------------------+---------+------+---------+-----+--------+----------+----------------+------------+--------+--------------------+
|50717ac3-a278-403...|     8405|    88|Office **|98.00|       1|2016-09-19|              IA|        Free|   Other|2023-07-08 18:37:...|
+--------------------+---------+------+---------+-----+--------+----------+----------------+------------+--------+--------------------+
None
If I run it again, I should not see any data¶
In [11]:
print(main().show())
Checkpoint found
check_points_flag True
+-------+---------+------+--------+-----+--------+---------+----------------+------------+--------+----------+
|salesid|invoiceid|itemid|category|price|quantity|orderdate|destinationstate|shippingtype|referral|updated_at|
+-------+---------+------+--------+-----+--------+---------+----------------+------------+--------+----------+
+-------+---------+------+--------+-----+--------+---------+----------------+------------+--------+----------+
None