Flink CDC Connector: Joining and Upserting CDC Events to Hudi¶
Step 1: Spin up Postgres Database¶
Mac¶
version: "3.7"
services:
  postgres:
    image: arm64v8/postgres:13   # Use a compatible ARM64 image
    ports:
      - 5432:5432
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
      - POSTGRES_DB=postgres
Windows¶
version: "3.7"
services:
  postgres:
    image: postgres:13   # Use a compatible x86_64 image
    ports:
      - 5432:5432
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
      - POSTGRES_DB=postgres
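With either file saved as docker-compose.yml, the container can also be brought up directly from the notebook. A minimal sketch, assuming Docker Compose v2 is installed and on the PATH:

import subprocess, socket, time

# Start the Postgres container defined in docker-compose.yml (detached).
subprocess.run(["docker", "compose", "up", "-d"], check=True)

# Poll port 5432 until Postgres accepts connections (give up after ~30s).
for _ in range(30):
    try:
        with socket.create_connection(("localhost", 5432), timeout=1):
            print("Postgres is accepting connections.")
            break
    except OSError:
        time.sleep(1)
else:
    raise RuntimeError("Postgres did not come up in time.")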
SQL Commands for Creating Tables, Inserting Data, and Enabling Replica Identity¶
-- Create the "customers" table if it does not exist
CREATE TABLE IF NOT EXISTS public.customers (
    customer_id INT PRIMARY KEY,
    name        TEXT,
    city        TEXT,
    state       TEXT
);

-- Create the "orders" table if it does not exist
CREATE TABLE IF NOT EXISTS public.orders (
    order_id    INT PRIMARY KEY,
    order_value DOUBLE PRECISION,
    customer_id INT
);

-- Insert data into the "customers" table in the public schema
INSERT INTO public.customers (customer_id, name, city, state)
VALUES
    (1, 'Alice', 'New York', 'NY'),
    (2, 'Bob', 'Los Angeles', 'CA'),
    (3, 'Charlie', 'Chicago', 'IL');

-- Insert data into the "orders" table in the public schema
INSERT INTO public.orders (order_id, order_value, customer_id)
VALUES
    (101, 100.50, 1),
    (102, 75.25, 2),
    (103, 200.75, 1);

-- Enable full replica identity for the "customers" table
ALTER TABLE public.customers REPLICA IDENTITY FULL;

-- Enable full replica identity for the "orders" table
ALTER TABLE public.orders REPLICA IDENTITY FULL;
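Before moving on, it is worth confirming that logical decoding is available: the postgres-cdc connector reads the write-ahead log through the pgoutput plugin and requires wal_level=logical, while the stock postgres image defaults to replica (you may need to start the container with command: postgres -c wal_level=logical in the compose file). A small verification sketch, assuming psycopg2 is installed and the compose credentials above:

import psycopg2

# Connect with the same credentials used in docker-compose.yml.
conn = psycopg2.connect(
    host="localhost", port=5432,
    user="postgres", password="postgres", dbname="postgres",
)
with conn, conn.cursor() as cur:
    cur.execute("SHOW wal_level;")
    print("wal_level:", cur.fetchone()[0])  # should print 'logical' for CDC

    # relreplident = 'f' means REPLICA IDENTITY FULL took effect.
    cur.execute("""
        SELECT relname, relreplident
        FROM pg_class
        WHERE relname IN ('customers', 'orders');
    """)
    for name, ident in cur.fetchall():
        print(name, "replica identity:", ident)
conn.close()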
In [2]:
import os

# AWS credentials used by Flink's S3A filesystem and boto3 (placeholders; replace with real values).
os.environ['AWS_ACCESS_KEY_ID'] = "XXX"
os.environ['AWS_ACCESS_KEY'] = "XXX"
os.environ['AWS_SECRET_ACCESS_KEY'] = "XXXF"
os.environ['AWS_SECRET_KEY'] = "XXXX"
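A quick sanity check that the exported credentials resolve before Flink tries to write to S3 (a sketch, assuming boto3 is installed):

import boto3

# STS returns the identity the current credentials authenticate as.
sts = boto3.client("sts")
identity = sts.get_caller_identity()
print("Authenticated as:", identity["Arn"])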
Flink Code¶
Creating the Table Environment and Loading JAR Files¶
In [3]:
from pyflink.table import EnvironmentSettings, TableEnvironment
import os

# Create a streaming TableEnvironment
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)

# Get the current working directory
CURRENT_DIR = os.getcwd()

# Define a list of JAR file names you want to add
jar_files = [
    "flink-sql-connector-postgres-cdc-2.4.1.jar",
    "postgresql-42.6.0.jar",
    "flink-connector-jdbc-1.16.1.jar",
    "flink-s3-fs-hadoop-1.16.1.jar",
    "hudi-flink1.16-bundle-0.13.1.jar"
]

# Build the list of JAR URLs by prepending 'file:///' to each file name
jar_urls = [f"file:///{CURRENT_DIR}/{jar_file}" for jar_file in jar_files]

table_env.get_config().get_configuration().set_string(
    "pipeline.jars",
    ";".join(jar_urls)
)

# Checkpoint every 5 seconds so the CDC sources can commit their progress
table_env.get_config().get_configuration().set_string(
    "execution.checkpointing.interval",
    "5000"
)

table_env.get_config().get_configuration().set_string(
    "parallelism.default",
    "4"
)
Out[3]:
<pyflink.common.configuration.Configuration at 0x13022ca90>
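Optionally, read the configuration back to confirm the JARs and checkpointing settings were applied (a small sketch using the same keys set above):

# Sanity check: print the values just written to the TableEnvironment configuration.
conf = table_env.get_config().get_configuration()
print("pipeline.jars:", conf.get_string("pipeline.jars", ""))
print("checkpoint interval:", conf.get_string("execution.checkpointing.interval", ""))
print("parallelism:", conf.get_string("parallelism.default", ""))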
Create Sources¶
In [4]:
# Create a source for the "customer_source" table
customer_source = f"""
CREATE TABLE IF NOT EXISTS customer_source (
    customer_id INT,
    name STRING,
    city STRING,
    state STRING,
    db_name STRING METADATA FROM 'database_name' VIRTUAL,
    table_name STRING METADATA FROM 'table_name' VIRTUAL,
    operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
    PRIMARY KEY (customer_id) NOT ENFORCED
) WITH (
    'connector' = 'postgres-cdc',
    'hostname' = 'localhost',
    'port' = '5432',
    'username' = 'postgres',
    'password' = 'postgres',
    'database-name' = 'postgres',
    'schema-name' = 'public',
    'slot.name' = 'customer_slot',
    'decoding.plugin.name' = 'pgoutput',
    'table-name' = 'customers'
);
"""

# Create a source for the "order_source" table
order_source = f"""
CREATE TABLE IF NOT EXISTS order_source (
    order_id INT,
    order_value DOUBLE PRECISION,
    customer_id INT,
    db_name STRING METADATA FROM 'database_name' VIRTUAL,
    table_name STRING METADATA FROM 'table_name' VIRTUAL,
    operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'postgres-cdc',
    'hostname' = 'localhost',
    'port' = '5432',
    'username' = 'postgres',
    'password' = 'postgres',
    'database-name' = 'postgres',
    'schema-name' = 'public',
    'slot.name' = 'order_slot',
    'decoding.plugin.name' = 'pgoutput',
    'table-name' = 'orders'
);
"""
# Execute the SQL to create the sources
table_env.execute_sql(customer_source)
table_env.execute_sql(order_source)
print("Created customer_source and order_source tables.")
print("Created order_source and order_source tables.")
Created customer_source and order_source tables. Created order_source and order_source tables.
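An optional check before wiring up the join: DESCRIBE shows each source schema, including the virtual metadata columns exposed by the CDC connector.

# Inspect the registered CDC source schemas.
table_env.execute_sql("DESCRIBE customer_source").print()
table_env.execute_sql("DESCRIBE order_source").print()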
Joining CDC Streams Preview¶
In [5]:
# table_env.execute_sql("""
# SELECT DISTINCT
# CAST(o.order_id AS STRING) AS order_id,
# CAST(o.operation_ts AS STRING) AS operation_ts,
# CAST(c.name AS STRING) AS customer_name,
# CAST(c.city AS STRING) AS customer_city,
# CAST(c.state AS STRING) AS customer_state,
# CAST(o.order_value AS STRING) AS price
# FROM
# customer_source AS c
# INNER JOIN
# order_source AS o
# ON
# c.customer_id = o.customer_id;
# """).wait()
# print("Job started.")
Output preview¶
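To reproduce this preview interactively (instead of the commented-out cell above), the same join can be built with sql_query and printed to the console; a sketch that streams +I/-U/+U changelog rows until the cell is interrupted:

# Preview the joined changelog stream in the notebook console.
preview = table_env.sql_query("""
    SELECT DISTINCT
        CAST(o.order_id AS STRING)     AS order_id,
        CAST(o.operation_ts AS STRING) AS operation_ts,
        CAST(c.name AS STRING)         AS customer_name,
        CAST(c.city AS STRING)         AS customer_city,
        CAST(c.state AS STRING)        AS customer_state,
        CAST(o.order_value AS STRING)  AS price
    FROM customer_source AS c
    INNER JOIN order_source AS o
        ON c.customer_id = o.customer_id
""")
preview.execute().print()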
Hudi Sink¶
In [6]:
# Define the Hudi sink table schema to match the query result schema
hudi_output_path = 's3a://XXX/enriched_orders'

hudi_sink = f"""
CREATE TABLE order_enriched_hudi_sink (
    order_id STRING,
    operation_ts STRING,
    customer_name STRING,
    customer_city STRING,
    customer_state STRING,
    price STRING,
    PRIMARY KEY (order_id) NOT ENFORCED
)
PARTITIONED BY (`customer_state`)
WITH (
    'connector' = 'hudi',
    'path' = '{hudi_output_path}',
    'table.type' = 'MERGE_ON_READ',
    'hoodie.datasource.write.recordkey.field' = 'order_id',       -- Specify record key
    'hoodie.datasource.write.precombine.field' = 'operation_ts',  -- Use "operation_ts" as precombine key
    'hoodie.datasource.write.operation' = 'upsert'                -- Use UPSERT method
);
"""
# Execute the SQL to create the Hudi table
table_env.execute_sql(hudi_sink)
Out[6]:
<pyflink.table.table_result.TableResult at 0x13023b820>
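Optionally, confirm that all three tables (customer_source, order_source, order_enriched_hudi_sink) are registered in the current catalog before starting the pipeline:

# List the tables registered in the TableEnvironment.
table_env.execute_sql("SHOW TABLES").print()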
Read from Sources and Write into the Sink¶
In [ ]:
table_env.execute_sql("""
INSERT INTO order_enriched_hudi_sink
SELECT DISTINCT
    CAST(o.order_id AS STRING) AS order_id,
    CAST(o.operation_ts AS STRING) AS operation_ts,
    CAST(c.name AS STRING) AS customer_name,
    CAST(c.city AS STRING) AS customer_city,
    CAST(c.state AS STRING) AS customer_state,
    CAST(o.order_value AS STRING) AS price
FROM
    customer_source AS c
INNER JOIN
    order_source AS o
ON
    c.customer_id = o.customer_id;
""").wait()
print("Job started.")
2023-09-26 20:13:16,235 WARN org.apache.hadoop.metrics2.impl.MetricsConfig [] - Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
2023-09-26 20:13:16,268 INFO org.apache.hadoop.metrics2.impl.MetricsSystemImpl [] - Scheduled Metric snapshot period at 10 second(s).
2023-09-26 20:13:16,269 INFO org.apache.hadoop.metrics2.impl.MetricsSystemImpl [] - s3a-file-system metrics system started
2023-09-26 20:13:29,380 WARN org.apache.hadoop.metrics2.impl.MetricsConfig [] - Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
2023-09-26 20:13:29,393 WARN org.apache.hadoop.metrics2.util.MBeans [] - Failed to register MBean "Hadoop:service=s3a-file-system,name=MetricsSystem,sub=Stats": Instance already exists.
2023-09-26 20:13:29,394 INFO org.apache.hadoop.metrics2.impl.MetricsSystemImpl [] - Scheduled Metric snapshot period at 10 second(s).
2023-09-26 20:13:29,395 INFO org.apache.hadoop.metrics2.impl.MetricsSystemImpl [] - s3a-file-system metrics system started
2023-09-26 20:13:29,400 WARN org.apache.hadoop.metrics2.util.MBeans [] - Failed to register MBean "Hadoop:service=s3a-file-system,name=MetricsSystem,sub=Control": Instance already exists.
2023-09-26 20:13:29,402 WARN org.apache.hadoop.metrics2.util.MBeans [] - Failed to register MBean "Hadoop:service=s3a-file-system,name=S3AMetrics1-datateam-sandbox-qa-demo": Instance already exists.
2023-09-26 20:13:39,978 WARN org.apache.hadoop.metrics2.impl.MetricsConfig [] - Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
2023-09-26 20:13:40,020 WARN org.apache.hadoop.metrics2.util.MBeans [] - Failed to register MBean "Hadoop:service=s3a-file-system,name=MetricsSystem,sub=Stats": Instance already exists.
2023-09-26 20:13:45,028 INFO org.apache.hadoop.metrics2.impl.MetricsSystemImpl [] - Scheduled Metric snapshot period at 10 second(s).
2023-09-26 20:13:45,029 INFO org.apache.hadoop.metrics2.impl.MetricsSystemImpl [] - s3a-file-system metrics system started
2023-09-26 20:13:45,036 WARN org.apache.hadoop.metrics2.util.MBeans [] - Failed to register MBean "Hadoop:service=s3a-file-system,name=MetricsSystem,sub=Control": Instance already exists.
2023-09-26 20:13:45,110 WARN org.apache.hadoop.metrics2.util.MBeans [] - Failed to register MBean "Hadoop:service=s3a-file-system,name=S3AMetrics1-datateam-sandbox-qa-demo": Instance already exists.
# WARNING: Unable to get Instrumentation. Dynamic Attach failed. You may add this JAR as -javaagent manually, or supply -Djdk.attach.allowAttachSelf
# WARNING: Unable to get Instrumentation. Dynamic Attach failed. You may add this JAR as -javaagent manually, or supply -Djdk.attach.allowAttachSelf
# WARNING: Unable to attach Serviceability Agent. Unable to attach even with module exceptions: [org.apache.hudi.org.openjdk.jol.vm.sa.SASupportException: Sense failed., org.apache.hudi.org.openjdk.jol.vm.sa.SASupportException: Sense failed., org.apache.hudi.org.openjdk.jol.vm.sa.SASupportException: Sense failed.]
# WARNING: Unable to attach Serviceability Agent. Unable to attach even with module exceptions: [org.apache.hudi.org.openjdk.jol.vm.sa.SASupportException: Sense failed., org.apache.hudi.org.openjdk.jol.vm.sa.SASupportException: Sense failed., org.apache.hudi.org.openjdk.jol.vm.sa.SASupportException: Sense failed.]
2023-09-26 20:15:39,041 WARN org.apache.hadoop.fs.s3a.S3ABlockOutputStream [] - Application invoked the Syncable API against stream writing to enriched_orders/NY/.hoodie_partition_metadata_2. This is unsupported
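Note that .wait() blocks the notebook cell for as long as the streaming job runs, so the final print only fires once the job stops. A non-blocking sketch using the same INSERT statement keeps a handle to the submitted job instead:

# Submit without blocking; keep the TableResult so the job can be inspected or cancelled later.
insert_sql = """
INSERT INTO order_enriched_hudi_sink
SELECT DISTINCT
    CAST(o.order_id AS STRING) AS order_id,
    CAST(o.operation_ts AS STRING) AS operation_ts,
    CAST(c.name AS STRING) AS customer_name,
    CAST(c.city AS STRING) AS customer_city,
    CAST(c.state AS STRING) AS customer_state,
    CAST(o.order_value AS STRING) AS price
FROM customer_source AS c
INNER JOIN order_source AS o
    ON c.customer_id = o.customer_id
"""
result = table_env.execute_sql(insert_sql)
job_client = result.get_job_client()
if job_client is not None:
    print("Submitted job:", job_client.get_job_id())
    # To stop the pipeline later: job_client.cancel()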
Output¶
Clean Up¶
In [7]:
import boto3

def delete_all_items_in_bucket(bucket_name):
    # Initialize the S3 client
    s3 = boto3.client('s3')

    # List all objects in the bucket
    objects = s3.list_objects_v2(Bucket=bucket_name)

    # Check if the bucket is empty
    if 'Contents' in objects:
        # Delete each object in the bucket
        for obj in objects['Contents']:
            key = obj['Key']
            s3.delete_object(Bucket=bucket_name, Key=key)
            print(f"Deleted: {key}")
    else:
        print(f"The bucket {bucket_name} is already empty.")

# Usage
bucket_name = 'dXXX'
delete_all_items_in_bucket(bucket_name)
Deleted: silver/enriched_orders/.hoodie/.aux/
Deleted: silver/enriched_orders/.hoodie/.schema/
Deleted: silver/enriched_orders/.hoodie/.temp/
Deleted: silver/enriched_orders/.hoodie/archived/
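Since list_objects_v2 returns at most 1,000 keys per call, a paginated variant with batched deletes is safer for larger tables; a sketch against the same placeholder bucket:

import boto3

def delete_all_items_paginated(bucket_name):
    # Walk every page of the listing and delete keys in batches of up to 1,000.
    s3 = boto3.client('s3')
    paginator = s3.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=bucket_name):
        keys = [{'Key': obj['Key']} for obj in page.get('Contents', [])]
        if keys:
            s3.delete_objects(Bucket=bucket_name, Delete={'Objects': keys})
            print(f"Deleted {len(keys)} objects")

# Usage
# delete_all_items_paginated(bucket_name)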