Join Two Postgres Tables in Python Flink Using Postgres CDC Connector and Build Real Time View Using JDBC Sink¶
Step 1: Spin up Postgres Database¶
Mac¶
# Docker Compose definition for Mac: a single PostgreSQL 13 container
# (ARM64 image) published on host port 5432.
version: "3.7"
services:
  postgres:
    image: arm64v8/postgres:13  # Use a compatible ARM64 image
    ports:
      # Quoted so the host:container mapping is always parsed as a string.
      - "5432:5432"
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
      - POSTGRES_DB=postgres
Windows¶
# Docker Compose definition for Windows: a single PostgreSQL 13 container
# (x86_64 image) published on host port 5432.
version: "3.7"
services:
  postgres:
    image: postgres:13  # Use a compatible x86_64 image
    ports:
      # Quoted so the host:container mapping is always parsed as a string.
      - "5432:5432"
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
      - POSTGRES_DB=postgres
SQL Commands for Creating Tables, Inserting Data, and Enabling Replica Identity¶
-- customers: one row per customer, keyed by customer_id.
-- Safe to re-run: the table is only created when it is missing.
CREATE TABLE IF NOT EXISTS public.customers (
    customer_id INTEGER,
    name        TEXT,
    city        TEXT,
    state       TEXT,
    PRIMARY KEY (customer_id)
);

-- orders: one row per order, keyed by order_id.
-- customer_id carries the customer's id; no foreign key is declared here.
CREATE TABLE IF NOT EXISTS public.orders (
    order_id    INTEGER,
    order_value FLOAT8,
    customer_id INTEGER,
    PRIMARY KEY (order_id)
);
-- Seed the "customers" table in the public schema.
-- ON CONFLICT ... DO NOTHING skips rows whose primary key already exists,
-- so this script can be re-run without duplicate-key errors.
INSERT INTO public.customers (customer_id, name, city, state)
VALUES
    (1, 'Alice', 'New York', 'NY'),
    (2, 'Bob', 'Los Angeles', 'CA'),
    (3, 'Charlie', 'Chicago', 'IL')
ON CONFLICT (customer_id) DO NOTHING;

-- Seed the "orders" table in the public schema (same re-run safety as above).
INSERT INTO public.orders (order_id, order_value, customer_id)
VALUES
    (101, 100.50, 1),
    (102, 75.25, 2),
    (103, 200.75, 1)
ON CONFLICT (order_id) DO NOTHING;
-- Enable full replica identity for the "customers" table
-- NOTE(review): presumably so the CDC source receives complete row images
-- for updates and deletes; confirm against the connector documentation.
ALTER TABLE public.customers REPLICA IDENTITY FULL;
-- Enable full replica identity for the "orders" table
ALTER TABLE public.orders REPLICA IDENTITY FULL;
-- Set the server-wide WAL level to 'logical'. The Flink sources below use the
-- 'pgoutput' decoding plugin. The restart step that follows is needed for this
-- setting to be picked up.
ALTER SYSTEM SET wal_level = 'logical';
### Restart your Docker container so that the new wal_level setting takes effect
Flink Code¶
Creating the Table Environment and Loading JAR Files¶
In [2]:
from pyflink.table import EnvironmentSettings, TableEnvironment
import os
from faker import Faker
from pathlib import Path

# Create a streaming TableEnvironment (the settings below use streaming mode,
# not batch mode).
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)

# Get the current working directory; the JAR files listed below are resolved
# relative to it.
CURRENT_DIR = os.getcwd()

# Define a list of JAR file names you want to add
jar_files = [
    "flink-sql-connector-postgres-cdc-2.4.1.jar",
    "postgresql-42.6.0.jar",
    "flink-connector-jdbc-1.16.1.jar",
    "flink-s3-fs-hadoop-1.16.1.jar",
    "hudi-flink1.16-bundle-0.13.1.jar"
]

# Build file:// URIs for the JARs. Path.as_uri() produces a well-formed URI on
# both macOS/Linux ("/Users/...") and Windows ("C:\\...") working directories,
# which manual "file:///" + path concatenation does not.
jar_urls = [Path(CURRENT_DIR, jar_file).as_uri() for jar_file in jar_files]

# All options are written to the same underlying Flink configuration object.
config = table_env.get_config().get_configuration()

config.set_string("pipeline.jars", ";".join(jar_urls))

# Configure checkpointing: interval is given in milliseconds.
config.set_string("execution.checkpointing.interval", "5000")
config.set_string("execution.checkpointing.mode", "EXACTLY_ONCE")

config.set_string("parallelism.default", "4")

# Store checkpoints in the current directory. The Flink 1.16 option is
# "state.checkpoints.dir" and it expects a URI with a scheme (file://...).
config.set_string("state.checkpoints.dir", Path(CURRENT_DIR).as_uri())
Out[2]:
<pyflink.common.configuration.Configuration at 0x15587bf10>
Create Source¶
In [3]:
# DDL for the "customer_source" table: a postgres-cdc source over
# public.customers, exposing database name, table name and operation
# timestamp as virtual metadata columns.
# (Plain string: the DDL has no placeholders, so no f-string is needed.)
customer_source = """
CREATE TABLE IF NOT EXISTS customer_source (
customer_id INT,
name STRING,
city STRING,
state STRING,
db_name STRING METADATA FROM 'database_name' VIRTUAL,
table_name STRING METADATA FROM 'table_name' VIRTUAL,
operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
PRIMARY KEY (customer_id) NOT ENFORCED
) WITH (
'connector' = 'postgres-cdc',
'hostname' = 'localhost',
'port' = '5432',
'username' = 'postgres',
'password' = 'postgres',
'database-name' = 'postgres',
'schema-name' = 'public',
'slot.name' = 'customer_slot',
'decoding.plugin.name' = 'pgoutput',
'scan.incremental.snapshot.enabled' = 'true',
'table-name' = 'customers'
);
"""

# DDL for the "order_source" table: a postgres-cdc source over public.orders.
# It uses its own replication slot name ('order_slot'), distinct from the
# customer source's slot.
order_source = """
CREATE TABLE IF NOT EXISTS order_source (
order_id INT,
order_value DOUBLE PRECISION,
customer_id INT,
db_name STRING METADATA FROM 'database_name' VIRTUAL,
table_name STRING METADATA FROM 'table_name' VIRTUAL,
operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'postgres-cdc',
'hostname' = 'localhost',
'port' = '5432',
'username' = 'postgres',
'password' = 'postgres',
'database-name' = 'postgres',
'schema-name' = 'public',
'slot.name' = 'order_slot',
'decoding.plugin.name' = 'pgoutput',
'scan.incremental.snapshot.enabled' = 'true',
'table-name' = 'orders'
);
"""

# Execute the SQL to register both sources in the table environment
table_env.execute_sql(customer_source)
table_env.execute_sql(order_source)

# A single confirmation covers both tables (the former second message was a
# duplicate that named "order_source" twice).
print("Created customer_source and order_source tables.")
Created customer_source and order_source tables. Created order_source and order_source tables.
Joining CDC Streams Preview¶
In [4]:
# Optional preview of the joined CDC streams: uncomment to print the same
# customer/order join that is later written to the JDBC sink.
# NOTE(review): presumably left disabled because printing an unbounded
# streaming result keeps the cell running; confirm before enabling.
# table_env.execute_sql("""
# SELECT DISTINCT
# CAST(o.order_id AS STRING) AS order_id,
# CAST(o.operation_ts AS STRING) AS operation_ts,
# CAST(c.name AS STRING) AS customer_name,
# CAST(c.city AS STRING) AS customer_city,
# CAST(c.state AS STRING) AS customer_state,
# CAST(o.order_value AS STRING) AS price
# FROM
# customer_source AS c
# INNER JOIN
# order_source AS o
# ON
# c.customer_id = o.customer_id;
# """).print()
# print("Job started.")
JDBC SINK¶
In [5]:
# Define the JDBC sink table; its columns match the result schema of the
# customer/order join (every column is cast to STRING by that query).
# IF NOT EXISTS keeps the cell re-runnable, consistent with the source tables.
jdbc_sink = """
CREATE TABLE IF NOT EXISTS order_enriched_jdbc_sink (
order_id STRING,
operation_ts STRING,
customer_name STRING,
customer_city STRING,
customer_state STRING,
price STRING,
PRIMARY KEY (order_id) NOT ENFORCED
)
WITH (
'connector' = 'jdbc', -- Use JDBC connector
'url' = 'jdbc:postgresql://localhost:5432/postgres',
'table-name' = 'order_enriched',
'driver' = 'org.postgresql.Driver',
'username' = 'postgres',
'password' = 'postgres',
'sink.buffer-flush.max-rows' = '1000', -- Adjust batch size as needed
'sink.buffer-flush.interval' = '2s' -- Adjust flush interval as needed
);
"""

# Execute the SQL to create the JDBC sink table
table_env.execute_sql(jdbc_sink)
Out[5]:
<pyflink.table.table_result.TableResult at 0x119669a30>
Read from Source and Write into SINK¶
Create Table in Postgres¶
-- Target table for the Flink JDBC sink: public.order_enriched
-- (all columns are text; order_id is the primary key).
-- To rebuild from scratch, run first:
--   DROP TABLE IF EXISTS public.order_enriched;
CREATE TABLE IF NOT EXISTS public.order_enriched
(
    order_id       VARCHAR(255) COLLATE pg_catalog."default" NOT NULL,
    operation_ts   VARCHAR(255) COLLATE pg_catalog."default",
    customer_name  VARCHAR(255) COLLATE pg_catalog."default",
    customer_city  VARCHAR(255) COLLATE pg_catalog."default",
    customer_state VARCHAR(255) COLLATE pg_catalog."default",
    price          VARCHAR(255) COLLATE pg_catalog."default",
    CONSTRAINT order_enriched_pkey PRIMARY KEY (order_id)
)
TABLESPACE pg_default;

-- Hand ownership of the table to the postgres role.
ALTER TABLE IF EXISTS public.order_enriched
    OWNER TO postgres;
Read from Source and Write into SINK¶
In [ ]:
# Submit the job that joins the two CDC sources and writes the enriched rows
# into the JDBC sink table.
insert_result = table_env.execute_sql("""
INSERT INTO order_enriched_jdbc_sink
SELECT DISTINCT
CAST(o.order_id AS STRING) AS order_id,
CAST(o.operation_ts AS STRING) AS operation_ts,
CAST(c.name AS STRING) AS customer_name,
CAST(c.city AS STRING) AS customer_city,
CAST(c.state AS STRING) AS customer_state,
CAST(o.order_value AS STRING) AS price
FROM
customer_source AS c
INNER JOIN
order_source AS o
ON
c.customer_id = o.customer_id;
""")

# Announce the start before blocking: wait() only returns once the job has
# finished, so a message printed after it would not appear while the job runs.
print("Job started.")

# Keep the cell attached to the running job.
insert_result.wait()