Learning How to Consume Data from Kafka Using Flink
Installation Steps
# List existing conda environments
conda info --envs

# Create ENV
conda create -n my-flink-environment pip python=3.8

# Activate ENV
conda activate my-flink-environment

# Install libraries
pip install apache-flink
pip install kafka-python
pip install jupyter
# Faker and boto3 are used later in this post (data generation and S3 clean-up)
pip install faker boto3

# Make sure Java 11 is installed
java -version
## Output:
openjdk version "11.0.11" 2021-04-20
OpenJDK Runtime Environment AdoptOpenJDK-11.0.11+9 (build 11.0.11+9)
OpenJDK 64-Bit Server VM AdoptOpenJDK-11.0.11+9 (build 11.0.11+9, mixed mode)

# Launch the notebook
jupyter notebook
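Before continuing, it is worth confirming that the key packages import cleanly inside the activated environment. A minimal sanity check (the printed path is informational only):

# Minimal sanity check: these imports should succeed without errors
import pyflink
from kafka import KafkaProducer  # import check only

print("PyFlink installed at:", pyflink.__path__[0])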
Spin Up Kafka Locally Using a Docker Compose File
Windows | Mac
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.0.1
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:7.0.1
    container_name: broker
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://broker:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
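Save the file as docker-compose.yml and bring the stack up with `docker compose up -d`. The topic used throughout this post can then be created up front; here is a minimal sketch using kafka-python's admin client (the topic name matches the producer below, and the single partition / replication factor of 1 are assumptions that fit this single-broker setup):

# Create the Kafka topic ahead of time (single partition, single broker)
from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import TopicAlreadyExistsError

admin = KafkaAdminClient(bootstrap_servers="localhost:9092")
try:
    admin.create_topics([NewTopic(name="topic_customers", num_partitions=1, replication_factor=1)])
except TopicAlreadyExistsError:
    pass  # fine if the topic already exists
finally:
    admin.close()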
In [ ]:
import os

# Hadoop's S3A filesystem and boto3 read credentials from these
# environment variables; replace the placeholders with real values
os.environ['AWS_ACCESS_KEY_ID'] = "XX"
os.environ['AWS_ACCESS_KEY'] = "XX"
os.environ['AWS_SECRET_ACCESS_KEY'] = "XXX"
os.environ['AWS_SECRET_KEY'] = "XXX"
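Optionally, confirm the credentials resolve before the Flink job needs them; a quick check with boto3 (installed earlier):

# Ask STS which account these credentials belong to; fails fast on bad keys
import boto3

print(boto3.client("sts").get_caller_identity()["Account"])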
Create the Flink Table Environment
In [6]:
from pyflink.table import EnvironmentSettings, TableEnvironment
import os

# Create a streaming TableEnvironment
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)

# Get the current working directory
CURRENT_DIR = os.getcwd()

# Define a list of JAR file names you want to add
jar_files = [
    "flink-sql-connector-kafka-1.16.1.jar",
    "flink-s3-fs-hadoop-1.16.1.jar",
    "hudi-flink1.16-bundle-0.13.1.jar",
]

# Build the list of JAR URLs by prepending 'file:///' to each file name
jar_urls = [f"file:///{CURRENT_DIR}/{jar_file}" for jar_file in jar_files]

table_env.get_config().get_configuration().set_string(
    "pipeline.jars",
    ";".join(jar_urls)
)

# Checkpoint every 5 seconds
table_env.get_config().get_configuration().set_string(
    "execution.checkpointing.interval",
    "5000"
)

table_env.get_config().get_configuration().set_string(
    "parallelism.default",
    "4"
)

# Configure checkpointing
table_env.get_config().get_configuration().set_string(
    "execution.checkpointing.mode",
    "EXACTLY_ONCE"
)

# Store checkpoints under the current directory
# (Flink's documented key for the checkpoint location is 'state.checkpoints.dir')
table_env.get_config().get_configuration().set_string(
    "state.checkpoints.dir",
    f"file://{CURRENT_DIR}"
)
Out[6]:
<pyflink.common.configuration.Configuration at 0x1321e5460>
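To double-check that the JARs and checkpointing settings actually took effect, the values can be read back from the same Configuration object:

# Read back a couple of the settings to confirm they registered
conf = table_env.get_config().get_configuration()
print(conf.get_string("pipeline.jars", "not set"))
print(conf.get_string("execution.checkpointing.interval", "not set"))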
Publishing Some Messages to the Kafka Topic
In [7]:
from kafka import KafkaProducer
from faker import Faker
import json
from time import sleep
import uuid

producer = KafkaProducer(bootstrap_servers='localhost:9092')
faker = Faker()


class DataGenerator(object):

    @staticmethod
    def get_data():
        return [
            str(uuid.uuid4()),
            faker.name(),
            faker.random_element(elements=('IT', 'HR', 'Sales', 'Marketing')),
            faker.random_element(elements=('CA', 'NY', 'TX', 'FL', 'IL', 'RJ')),
            faker.random_int(min=10000, max=150000),
            faker.random_int(min=18, max=60),
            faker.random_int(min=0, max=100000),
            faker.unix_time()
        ]


columns = ["emp_id", "employee_name", "department", "state", "salary", "age", "bonus", "ts"]

for _ in range(3):
    data_list = DataGenerator.get_data()
    json_data = dict(zip(columns, data_list))
    _payload = json.dumps(json_data).encode("utf-8")
    producer.send('topic_customers', _payload)
    print(_payload)
    sleep(1)

# Block until all buffered messages have actually been delivered
producer.flush()
b'{"emp_id": "3e77ec90-7e18-453c-b04b-4eb2fa379438", "employee_name": "Lori Martin", "department": "HR", "state": "RJ", "salary": 49661, "age": 50, "bonus": 13016, "ts": 534145525}' b'{"emp_id": "b0c6d323-2d2a-45ab-bc1d-259976224e6f", "employee_name": "Erin Mitchell", "department": "Sales", "state": "FL", "salary": 21833, "age": 43, "bonus": 36340, "ts": 1101831048}' b'{"emp_id": "9af98ca1-4d72-4181-b40c-52a2135a6b80", "employee_name": "Kaitlyn Pacheco", "department": "Marketing", "state": "IL", "salary": 66223, "age": 33, "bonus": 19170, "ts": 442470381}'
Create the Flink Kafka Source
In [8]:
# Create a source table over the "topic_customers" Kafka topic.
# Note: the producer above emits 'salary' but never 'city'; with the default
# JSON format options missing fields become NULL and extra fields are ignored,
# so 'city' will be NULL and 'salary' is dropped.
customer_source = """
CREATE TABLE IF NOT EXISTS customer_source (
    emp_id STRING,
    employee_name STRING,
    department STRING,
    age STRING,
    bonus STRING,
    ts STRING,
    city STRING,
    state STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'topic_customers',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'topic_customers',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
);
"""

# Execute the SQL to create the source
table_env.execute_sql(customer_source)
print("Created customer_source table.")
Created customer_source table.
In [5]:
# Uncomment to preview the stream; .print() blocks until the query is cancelled.
# table_env.execute_sql("""
#     SELECT * FROM customer_source
# """).print()
Sample Output
Create the Hudi Sink
In [9]:
# Define the Hudi sink table schema to match the query result schema
BUCKET = "XXXX"
hudi_output_path = f's3a://{BUCKET}/silver'

hudi_sink = f"""
CREATE TABLE hudi_sink (
    emp_id STRING,
    employee_name STRING,
    department STRING,
    age STRING,
    bonus STRING,
    ts STRING,
    city STRING,
    state STRING,
    PRIMARY KEY (emp_id) NOT ENFORCED
) PARTITIONED BY (`state`)
WITH (
    'connector' = 'hudi',
    'path' = '{hudi_output_path}',
    'table.type' = 'MERGE_ON_READ',
    'hoodie.datasource.write.recordkey.field' = 'emp_id',
    'hoodie.datasource.write.precombine.field' = 'ts',
    'hoodie.datasource.write.operation' = 'upsert'
);
"""

# Execute the SQL to create the Hudi table
table_env.execute_sql(hudi_sink)
Out[9]:
<pyflink.table.table_result.TableResult at 0x1321d65b0>
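Both tables should now be registered in the default catalog, which is easy to confirm:

# List registered tables; expect customer_source and hudi_sink
table_env.execute_sql("SHOW TABLES").print()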
Writing from Source to Sink
In [ ]:
# Submit the streaming INSERT. .wait() blocks until the job terminates,
# so with an unbounded Kafka source this cell runs until it is cancelled.
table_env.execute_sql("""
    INSERT INTO hudi_sink
    SELECT * FROM customer_source
""").wait()
2023-09-27 15:39:25,985 WARN org.apache.hadoop.metrics2.impl.MetricsConfig [] - Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
2023-09-27 15:39:31,019 INFO org.apache.hadoop.metrics2.impl.MetricsSystemImpl [] - Scheduled Metric snapshot period at 10 second(s).
2023-09-27 15:39:31,020 INFO org.apache.hadoop.metrics2.impl.MetricsSystemImpl [] - s3a-file-system metrics system started
2023-09-27 15:39:43,894 WARN org.apache.hadoop.metrics2.impl.MetricsConfig [] - Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
2023-09-27 15:39:43,909 WARN org.apache.hadoop.metrics2.util.MBeans [] - Failed to register MBean "Hadoop:service=s3a-file-system,name=MetricsSystem,sub=Stats": Instance already exists.
2023-09-27 15:39:43,911 INFO org.apache.hadoop.metrics2.impl.MetricsSystemImpl [] - Scheduled Metric snapshot period at 10 second(s).
2023-09-27 15:39:43,912 INFO org.apache.hadoop.metrics2.impl.MetricsSystemImpl [] - s3a-file-system metrics system started
2023-09-27 15:39:43,917 WARN org.apache.hadoop.metrics2.util.MBeans [] - Failed to register MBean "Hadoop:service=s3a-file-system,name=MetricsSystem,sub=Control": Instance already exists.
2023-09-27 15:39:43,919 WARN org.apache.hadoop.metrics2.util.MBeans [] - Failed to register MBean "Hadoop:service=s3a-file-system,name=S3AMetrics1-datateam-sandbox-qa-demo": Instance already exists.
2023-09-27 15:39:52,874 WARN org.apache.hadoop.metrics2.impl.MetricsConfig [] - Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
2023-09-27 15:39:52,906 WARN org.apache.hadoop.metrics2.util.MBeans [] - Failed to register MBean "Hadoop:service=s3a-file-system,name=MetricsSystem,sub=Stats": Instance already exists.
2023-09-27 15:39:57,911 INFO org.apache.hadoop.metrics2.impl.MetricsSystemImpl [] - Scheduled Metric snapshot period at 10 second(s).
2023-09-27 15:39:57,912 INFO org.apache.hadoop.metrics2.impl.MetricsSystemImpl [] - s3a-file-system metrics system started
2023-09-27 15:39:57,925 WARN org.apache.hadoop.metrics2.util.MBeans [] - Failed to register MBean "Hadoop:service=s3a-file-system,name=MetricsSystem,sub=Control": Instance already exists.
2023-09-27 15:39:57,939 WARN org.apache.hadoop.metrics2.util.MBeans [] - Failed to register MBean "Hadoop:service=s3a-file-system,name=S3AMetrics1-datateam-sandbox-qa-demo": Instance already exists.
# WARNING: Unable to get Instrumentation. Dynamic Attach failed. You may add this JAR as -javaagent manually, or supply -Djdk.attach.allowAttachSelf
  (repeated four times)
# WARNING: Unable to attach Serviceability Agent. Unable to attach even with module exceptions: [org.apache.hudi.org.openjdk.jol.vm.sa.SASupportException: Sense failed., org.apache.hudi.org.openjdk.jol.vm.sa.SASupportException: Sense failed., org.apache.hudi.org.openjdk.jol.vm.sa.SASupportException: Sense failed.]
  (repeated four times)
2023-09-27 15:40:04,290 WARN org.apache.hadoop.fs.s3a.S3ABlockOutputStream [] - Application invoked the Syncable API against stream writing to silver/NY/.hoodie_partition_metadata_3. This is unsupported
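Since .wait() blocks the notebook for as long as the streaming job runs, an alternative is to submit the job and keep a handle to it instead; a sketch using PyFlink's job client:

# Non-blocking submission: keep a handle to the running job instead of waiting
result = table_env.execute_sql("""
    INSERT INTO hudi_sink
    SELECT * FROM customer_source
""")
job_client = result.get_job_client()
if job_client is not None:
    print("Submitted Flink job:", job_client.get_job_id())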
Clean Up
In [10]:
import boto3


def delete_all_items_in_bucket(bucket_name):
    # Initialize the S3 client
    s3 = boto3.client('s3')

    # List the objects in the bucket
    # (list_objects_v2 returns at most 1,000 keys per call; use a
    # paginator if the bucket may hold more)
    objects = s3.list_objects_v2(Bucket=bucket_name)

    # Check if the bucket is empty
    if 'Contents' in objects:
        # Delete each object in the bucket
        for obj in objects['Contents']:
            key = obj['Key']
            s3.delete_object(Bucket=bucket_name, Key=key)
            print(f"Deleted: {key}")
    else:
        print(f"The bucket {bucket_name} is already empty.")


# Usage
delete_all_items_in_bucket(BUCKET)
Deleted: silver/.hoodie/.aux/.bootstrap/.fileids/
Deleted: silver/.hoodie/.aux/.bootstrap/.partitions/
Deleted: silver/.hoodie/.aux/ckp_meta/20230927152626514.COMPLETED
Deleted: silver/.hoodie/.aux/ckp_meta/20230927152626514.INFLIGHT
Deleted: silver/.hoodie/.aux/ckp_meta/20230927152641226.INFLIGHT
Deleted: silver/.hoodie/.aux/view_storage_conf.properties
Deleted: silver/.hoodie/.schema/
Deleted: silver/.hoodie/.temp/
Deleted: silver/.hoodie/20230927152626514.deltacommit
Deleted: silver/.hoodie/20230927152626514.deltacommit.inflight
Deleted: silver/.hoodie/20230927152626514.deltacommit.requested
Deleted: silver/.hoodie/20230927152641226.deltacommit.inflight
Deleted: silver/.hoodie/20230927152641226.deltacommit.requested
Deleted: silver/.hoodie/archived/
Deleted: silver/.hoodie/hoodie.properties
Deleted: silver/CA/.34d67f17-082c-4922-8714-1fa57e7ee3ec_20230927152626514.log.1_0-4-0
Deleted: silver/CA/.9e4d7bd1-4e0d-44e5-9082-6bd72b545518_20230927152626514.log.1_2-4-0
Deleted: silver/CA/.hoodie_partition_metadata
Deleted: silver/FL/.0f2f2609-f406-4425-845c-4a158b4ecbfd_20230927152626514.log.1_1-4-0
Deleted: silver/FL/.6c7c67a8-5f8a-44a1-b327-b6fc565e5be8_20230927152626514.log.1_0-4-0
Deleted: silver/FL/.6f3014a8-73a4-4919-b1a0-6f9a8d331011_20230927152626514.log.1_3-4-0
Deleted: silver/FL/.hoodie_partition_metadata
Deleted: silver/IL/.8d85bde1-f320-4aea-88d2-ba1de2aa8334_20230927152626514.log.1_1-4-0
Deleted: silver/IL/.hoodie_partition_metadata
Deleted: silver/NY/.1800d94f-20c4-4b0e-8658-393e34b8cb8a_20230927152626514.log.1_3-4-0
Deleted: silver/NY/.daa34d7c-dbca-4c3c-8e9f-d21ff66c0933_20230927152626514.log.1_2-4-0
Deleted: silver/NY/.hoodie_partition_metadata
Deleted: silver/TX/.1f3388a0-5ba9-463a-ba13-e1d764d9ccfc_20230927152626514.log.1_2-4-0
Deleted: silver/TX/.6f873fb5-4be5-4319-8dd5-21854ca8d0dd_20230927152626514.log.1_3-4-0
Deleted: silver/TX/.921c609e-fde7-4c5a-ba93-c20ed44a7eac_20230927152626514.log.1_0-4-0
Deleted: silver/TX/.hoodie_partition_metadata