Learn How to Ingest Data from PostgreSQL into Hudi Tables on S3 with Apache Flink and Flink PostgreSQL CDC Connector¶
Step 1: Spin up Postgres Database¶
Mac¶
version: "3.7"
services:
  postgres:
    image: arm64v8/postgres:13 # Use a compatible ARM64 image
    ports:
      - 5432:5432
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
      - POSTGRES_DB=postgres
Windows¶
version: "3.7"
services:
  postgres:
    image: postgres:13 # Use a compatible x86_64 image
    ports:
      - 5432:5432
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
      - POSTGRES_DB=postgres
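Save either compose file as docker-compose.yml and start the container. A minimal sketch, assuming Docker Compose v2 is installed and the file sits in the current directory (you can equally run these commands straight from a terminal):

import subprocess

# Start the Postgres container defined in docker-compose.yml (detached mode).
subprocess.run(["docker", "compose", "up", "-d"], check=True)

# Confirm the container is running and port 5432 is mapped.
subprocess.run(["docker", "compose", "ps"], check=True)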
Step 2: Create Table on Postgres¶
-- Create a table named 'shipments' with the following columns:
CREATE TABLE shipments (
    shipment_id SERIAL NOT NULL PRIMARY KEY,  -- Auto-incremented shipment ID, primary key
    order_id SERIAL NOT NULL,                 -- Auto-incremented order ID, not null
    origin VARCHAR(255) NOT NULL,             -- Origin location, not null
    destination VARCHAR(255) NOT NULL,        -- Destination location, not null
    is_arrived BOOLEAN NOT NULL               -- Boolean indicating if the shipment has arrived, not null
);
-- Reset the sequence for the 'shipment_id' column to start from 1001
ALTER SEQUENCE public.shipments_shipment_id_seq RESTART WITH 1001;
-- Set the REPLICA IDENTITY for the 'shipments' table to FULL, which allows replica tables to store full row values.
ALTER TABLE public.shipments REPLICA IDENTITY FULL;
-- Insert three records into the 'shipments' table with default values for 'shipment_id', and specific values for other columns.
INSERT INTO shipments
VALUES (default, 10001, 'Beijing', 'Shanghai', false),   -- Shipment from Beijing to Shanghai, not arrived
       (default, 10002, 'Hangzhou', 'Shanghai', false),  -- Shipment from Hangzhou to Shanghai, not arrived
       (default, 10003, 'Shanghai', 'Hangzhou', false);  -- Shipment from Shanghai to Hangzhou, not arrived
-- Enable logical decoding so the CDC connector can read changes (requires a Postgres restart to take effect).
ALTER SYSTEM SET wal_level = 'logical';
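After restarting Postgres, a quick sketch, assuming psycopg2 is installed, confirms logical decoding is enabled before the Flink job starts:

import psycopg2

# Connect with the credentials from the compose file above.
conn = psycopg2.connect(
    host="localhost", port=5432,
    user="postgres", password="postgres", dbname="postgres"
)
with conn.cursor() as cur:
    # The postgres-cdc connector requires wal_level = 'logical'.
    cur.execute("SHOW wal_level;")
    print(cur.fetchone()[0])  # expected output: logical
conn.close()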
In [8]:
import os

# AWS credentials used for S3 access; replace the placeholders with real keys.
os.environ['AWS_ACCESS_KEY_ID'] = "XXX"
os.environ['AWS_ACCESS_KEY'] = "XXX"
os.environ['AWS_SECRET_ACCESS_KEY'] = "XXX"
os.environ['AWS_SECRET_KEY'] = "XXXX"
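Hardcoded keys are fine for a throwaway demo, but a slightly safer variant, assuming credentials are already configured locally (for example via aws configure), resolves them through boto3's default credential chain instead:

import os
import boto3

# Pull credentials from the standard AWS chain (env vars, ~/.aws/credentials, instance profile, ...).
creds = boto3.Session().get_credentials()
if creds is not None:
    os.environ['AWS_ACCESS_KEY_ID'] = creds.access_key
    os.environ['AWS_SECRET_ACCESS_KEY'] = creds.secret_key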
Flink Code¶
Creating the Table Environment and Loading JAR Files¶
In [2]:
from pyflink.table import EnvironmentSettings, TableEnvironment
import os
from faker import Faker

# Create a streaming TableEnvironment
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)

# Get the current working directory
CURRENT_DIR = os.getcwd()

# Define a list of JAR file names you want to add
jar_files = [
    "flink-sql-connector-postgres-cdc-2.4.1.jar",
    "postgresql-42.6.0.jar",
    "flink-connector-jdbc-1.16.1.jar",
    "flink-s3-fs-hadoop-1.16.1.jar",
    "hudi-flink1.16-bundle-0.13.1.jar"
]

# Build the list of JAR URLs by prepending 'file:///' to each file name
jar_urls = [f"file:///{CURRENT_DIR}/{jar_file}" for jar_file in jar_files]

table_env.get_config().get_configuration().set_string(
    "pipeline.jars",
    ";".join(jar_urls)
)
table_env.get_config().get_configuration().set_string(
    "execution.checkpointing.interval",
    "5000"
)
Out[2]:
<pyflink.common.configuration.Configuration at 0x1081fd6a0>
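The job will fail at runtime if any of the connector JARs is missing, so it is worth checking that the files listed above actually exist in the working directory. A small sketch over the same jar_files list:

import os

# Fail fast if a required connector JAR is not in the working directory.
missing = [j for j in jar_files if not os.path.isfile(os.path.join(CURRENT_DIR, j))]
if missing:
    raise FileNotFoundError(f"Missing JAR files: {missing}")
print("All connector JARs found.")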
Create Source¶
In [3]:
postgres_source = f"""
CREATE TABLE shipments_source (
    shipment_id INT,
    order_id INT,
    origin STRING,
    destination STRING,
    is_arrived BOOLEAN,
    PRIMARY KEY (shipment_id) NOT ENFORCED
) WITH (
    'connector' = 'postgres-cdc',
    'hostname' = 'localhost',
    'port' = '5432',
    'username' = 'postgres',
    'password' = 'postgres',
    'database-name' = 'postgres',
    'schema-name' = 'public',
    'slot.name' = 'sales',
    'decoding.plugin.name' = 'pgoutput',
    'table-name' = 'shipments'
);
"""

# Execute the SQL to create the Postgres CDC source table
table_env.execute_sql(postgres_source)
Out[3]:
<pyflink.table.table_result.TableResult at 0x1081fd760>
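To sanity-check the CDC source before wiring it to the sink, you can run an ad-hoc query against it. This is an unbounded streaming query that keeps printing changelog rows until the cell is interrupted, so treat it as an optional sketch rather than part of the pipeline:

# Streams the initial snapshot plus ongoing changes from Postgres to stdout.
# Interrupt the cell to stop the query.
table_env.execute_sql("SELECT * FROM shipments_source").print()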
Hudi Sink¶
In [4]:
hudi_output_path = 's3a://datateam-sandbox-qa-demo/tmp100/shipments/'
hudi_sink = f"""
CREATE TABLE shipments_hudi_sink (
    shipment_id INT,
    order_id INT,
    origin STRING,
    destination STRING,
    is_arrived BOOLEAN,
    PRIMARY KEY (shipment_id) NOT ENFORCED
)
PARTITIONED BY (`destination`)
WITH (
    'connector' = 'hudi',
    'path' = '{hudi_output_path}',
    'table.type' = 'MERGE_ON_READ'
);
"""

# Execute the SQL to create the Hudi table
table_env.execute_sql(hudi_sink)
Out[4]:
<pyflink.table.table_result.TableResult at 0x1392245e0>
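As an optional check that the sink was registered with the expected schema and partition column, describe it before starting the job:

# Print the resolved schema of the Hudi sink table.
table_env.execute_sql("DESCRIBE shipments_hudi_sink").print()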
Execute the Cell to Stream Real-Time CDC Data from Postgres¶
Read from the Source and Write into the Sink¶
In [ ]:
table_env.execute_sql("""
INSERT INTO shipments_hudi_sink
(shipment_id, order_id, origin, destination, is_arrived)
SELECT
shipment_id,
order_id,
origin,
destination,
is_arrived
FROM shipments_source
""").wait()
print("Job started.")
Clean Up¶
In [1]:
import boto3

def delete_all_items_in_bucket(bucket_name):
    # Initialize the S3 client
    s3 = boto3.client('s3')

    # List all objects in the bucket
    objects = s3.list_objects_v2(Bucket=bucket_name)

    # Check if the bucket is empty
    if 'Contents' in objects:
        # Delete each object in the bucket
        for obj in objects['Contents']:
            key = obj['Key']
            s3.delete_object(Bucket=bucket_name, Key=key)
            print(f"Deleted: {key}")
    else:
        print(f"The bucket {bucket_name} is already empty.")

# Usage
bucket_name = 'datateam-sandbox-qa-demo'
delete_all_items_in_bucket(bucket_name)
Deleted: flink-s3-fs-hadoop-1.15.0.jar
Deleted: hudi-flink1.15-bundle-0.13.1.jar
Deleted: tmp100/shipments/.hoodie/.aux/.bootstrap/.fileids/
Deleted: tmp100/shipments/.hoodie/.aux/.bootstrap/.partitions/
Deleted: tmp100/shipments/.hoodie/.aux/ckp_meta/
Deleted: tmp100/shipments/.hoodie/.aux/view_storage_conf.properties
Deleted: tmp100/shipments/.hoodie/.schema/
Deleted: tmp100/shipments/.hoodie/.temp/
Deleted: tmp100/shipments/.hoodie/archived/
Deleted: tmp100/shipments/.hoodie/hoodie.properties
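Note that list_objects_v2 returns at most 1,000 keys per call, so the helper above only clears the first page of a large bucket. A sketch of a paginated variant using boto3's built-in paginator:

import boto3

def delete_all_items_in_bucket_paginated(bucket_name):
    s3 = boto3.client('s3')
    paginator = s3.get_paginator('list_objects_v2')
    # Walk every page of results instead of only the first 1,000 keys.
    for page in paginator.paginate(Bucket=bucket_name):
        for obj in page.get('Contents', []):
            s3.delete_object(Bucket=bucket_name, Key=obj['Key'])
            print(f"Deleted: {obj['Key']}")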