Step 1: Spin up Postgres Database¶
Mac¶
version: "3.7"
services:
  postgres:
    # ARM64 (Apple Silicon) Postgres image
    image: arm64v8/postgres:13
    # Enable logical replication at startup so the CDC connector works
    # immediately — avoids the ALTER SYSTEM + container restart dance.
    command: ["postgres", "-c", "wal_level=logical"]
    ports:
      - "5432:5432"  # quote port mappings so YAML never misparses them
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
      - POSTGRES_DB=postgres
Windows¶
version: "3.7"
services:
  postgres:
    # x86_64 Postgres image
    image: postgres:13
    # Enable logical replication at startup so the CDC connector works
    # immediately — avoids the ALTER SYSTEM + container restart dance.
    command: ["postgres", "-c", "wal_level=logical"]
    ports:
      - "5432:5432"  # quote port mappings so YAML never misparses them
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
      - POSTGRES_DB=postgres
Step 2: Create Table on Postgres¶
-- Create the 'shipments' table that will act as the CDC source.
CREATE TABLE shipments (
  shipment_id SERIAL NOT NULL PRIMARY KEY, -- Auto-incremented surrogate key
  order_id INT NOT NULL,                   -- Order reference; inserts below supply explicit
                                           -- values, so SERIAL (and its extra, unused
                                           -- sequence) was unnecessary here
  origin VARCHAR(255) NOT NULL,            -- Origin location
  destination VARCHAR(255) NOT NULL,       -- Destination location
  is_arrived BOOLEAN NOT NULL              -- Whether the shipment has arrived
);
-- Reset the sequence for the 'shipment_id' column to start from 1001
ALTER SEQUENCE public.shipments_shipment_id_seq RESTART WITH 1001;
-- Set the REPLICA IDENTITY for the 'shipments' table to FULL, which allows replica tables to store full row values.
-- (FULL means UPDATE/DELETE WAL records carry the complete old row — the CDC
-- connector needs this to emit -U/-D changelog rows with all columns.)
ALTER TABLE public.shipments REPLICA IDENTITY FULL;
-- Insert three records into the 'shipments' table with default values for 'shipment_id', and specific values for other columns.
INSERT INTO shipments
VALUES (default, 10001, 'Beijing', 'Shanghai', false), -- Insert shipment from Beijing to Shanghai, not arrived
(default, 10002, 'Hangzhou', 'Shanghai', false), -- Insert shipment from Hangzhou to Shanghai, not arrived
(default, 10003, 'Shanghai', 'Hangzhou', false); -- Insert shipment from Shanghai to Hangzhou, not arrived
-- NOTE(review): ALTER SYSTEM only writes postgresql.auto.conf; wal_level does
-- not change until the Postgres server is restarted. Restart the container
-- after running this, or set wal_level=logical on the container command line.
ALTER SYSTEM SET wal_level = 'logical';
Flink Code¶
Creating Table Environment and Loading JAR Files¶
In [7]:
from pyflink.table import EnvironmentSettings, TableEnvironment
import os
from faker import Faker  # NOTE(review): unused in this snippet; kept as-is

# Create a *streaming* TableEnvironment — CDC is an unbounded changelog
# source, so streaming mode is required (the original comment said "batch").
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)

# Directory containing the connector JARs (assumed to be the CWD).
CURRENT_DIR = os.getcwd()

# Connector JARs required for the Postgres CDC pipeline.
jar_files = [
    "flink-sql-connector-postgres-cdc-2.4.1.jar",
    "postgresql-42.6.0.jar",
    "flink-connector-jdbc-1.16.1.jar",
]

# Build file:// URLs for each JAR. On POSIX, os.getcwd() is absolute and
# already starts with '/', so 'file://' yields a well-formed 'file:///...'
# URL; the original 'file:///' prefix produced a malformed 'file:////...'.
jar_urls = [f"file://{CURRENT_DIR}/{jar_file}" for jar_file in jar_files]

# Register the JARs with the Flink pipeline (semicolon-separated list).
table_env.get_config().get_configuration().set_string(
    "pipeline.jars",
    ";".join(jar_urls),
)
Out[7]:
<pyflink.common.configuration.Configuration at 0x13c898460>
Create CDC Source Table¶ (note: 'postgres-cdc' is a source connector, not a sink)
In [ ]:
# DDL for a Flink table backed by the Postgres CDC connector.
# NOTE(review): despite the variable name, 'postgres-cdc' is a *source*
# connector — this table is read from, not written to. The name is kept
# so later cells that reference it keep working.
# (Plain string, not f-string: there are no placeholders to interpolate.)
postgres_sink = """
CREATE TABLE shipments (
    shipment_id INT,
    order_id INT,
    origin STRING,
    destination STRING,
    is_arrived BOOLEAN,
    PRIMARY KEY (shipment_id) NOT ENFORCED
) WITH (
    'connector' = 'postgres-cdc',
    'hostname' = 'localhost',
    'port' = '5432',
    'username' = 'postgres',
    'password' = 'postgres',
    'database-name' = 'postgres',
    'schema-name' = 'public',
    'slot.name' = 'sales',
    'decoding.plugin.name' = 'pgoutput',
    'table-name' = 'shipments'
);
"""
# Register the CDC source table in the catalog (this was mislabeled
# "Hudi table" in the original — no Hudi is involved here).
table_env.execute_sql(postgres_sink)
# Continuously stream the table's changelog to stdout (runs until cancelled).
table_env.execute_sql("SELECT * FROM shipments").print()
Full code¶
In [ ]:
from pyflink.table import EnvironmentSettings, TableEnvironment
import os

# Create a *streaming* TableEnvironment — CDC is an unbounded changelog
# source, so streaming mode is required (the original comment said "batch").
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)

# Directory containing the connector JARs (assumed to be the CWD).
CURRENT_DIR = os.getcwd()

# Connector JARs required for the Postgres CDC pipeline.
jar_files = [
    "flink-sql-connector-postgres-cdc-2.4.1.jar",
    "postgresql-42.6.0.jar",
    "flink-connector-jdbc-1.16.1.jar",
]

# Build file:// URLs for each JAR. On POSIX, os.getcwd() is absolute and
# already starts with '/', so 'file://' yields a well-formed 'file:///...'
# URL; the original 'file:///' prefix produced a malformed 'file:////...'.
jar_urls = [f"file://{CURRENT_DIR}/{jar_file}" for jar_file in jar_files]

# Register the JARs with the Flink pipeline (semicolon-separated list).
table_env.get_config().get_configuration().set_string(
    "pipeline.jars",
    ";".join(jar_urls),
)

# DDL for a Flink table backed by the Postgres CDC *source* connector
# (the variable name "sink" is historical; kept for compatibility).
# NOTE(review): slot.name here is 'shipments' while the earlier cell used
# 'sales' — each concurrently running job needs its own replication slot.
# (Plain string, not f-string: there are no placeholders to interpolate.)
postgres_sink = """
CREATE TABLE shipments (
    shipment_id INT,
    order_id INT,
    origin STRING,
    destination STRING,
    is_arrived BOOLEAN,
    PRIMARY KEY (shipment_id) NOT ENFORCED
) WITH (
    'connector' = 'postgres-cdc',
    'hostname' = 'localhost',
    'port' = '5432',
    'username' = 'postgres',
    'password' = 'postgres',
    'database-name' = 'postgres',
    'schema-name' = 'public',
    'table-name' = 'shipments',
    'slot.name' = 'shipments',
    'decoding.plugin.name' = 'pgoutput'
);
"""
# Register the CDC source table in the catalog (this was mislabeled
# "Hudi table" in the original — no Hudi is involved here).
table_env.execute_sql(postgres_sink)
# Continuously stream the table's changelog to stdout (runs until cancelled).
table_env.execute_sql("SELECT * FROM shipments").print()
+----+-------------+----------+------------------+-------------+------------+
| op | shipment_id | order_id | origin           | destination | is_arrived |
+----+-------------+----------+------------------+-------------+------------+
| +I |        1003 |    10003 | Shanghai test    | Hangzhou    |      FALSE |
| +I |        1002 |    10002 | Hangzhoue update | Shanghai    |      FALSE |
| +I |        1001 |    10001 | Beijin           | Shanghai    |      FALSE |
| -U |        1001 |    10001 | Beijin           | Shanghai    |      FALSE |
| +U |        1001 |    10001 | Beijin *         | Shanghai    |      FALSE |
No comments:
Post a Comment