Getting Started with Apache Flink (PyFlink) and Apache Hudi | Hands-on Session¶
Installation Steps¶
# List existing conda environments
conda info --envs
# Create ENV
conda create -n my-flink-environment pip python=3.8
# Activate ENV
conda activate my-flink-environment
# Install Flink
pip install apache-flink
# Install Jupyter Notebook
pip install jupyter
# Make sure Java 11 is installed
java -version
## Output
openjdk version "11.0.11" 2021-04-20
OpenJDK Runtime Environment AdoptOpenJDK-11.0.11+9 (build 11.0.11+9)
OpenJDK 64-Bit Server VM AdoptOpenJDK-11.0.11+9 (build 11.0.11+9, mixed mode)
# Launch Jupyter Notebook
jupyter notebook
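Before moving on, it can help to confirm that PyFlink can actually start a local job. The snippet below is a minimal sanity check, not part of the original walkthrough:

from pyflink.table import EnvironmentSettings, TableEnvironment

# Create a throwaway batch TableEnvironment and run a trivial query
env_settings = EnvironmentSettings.in_batch_mode()
table_env = TableEnvironment.create(env_settings)
table_env.execute_sql("SELECT 1 AS ok").print()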
Step 1: Install Libraries and Packages¶
In [3]:
! pip install Faker
Collecting Faker
  Obtaining dependency information for Faker from https://files.pythonhosted.org/packages/73/51/cbc859707aa0fc0ad3819ffb3bdaeee28d10d5ef30150ed9d16691ac3795/Faker-19.6.1-py3-none-any.whl.metadata
  Using cached Faker-19.6.1-py3-none-any.whl.metadata (15 kB)
Requirement already satisfied: python-dateutil>=2.4 in /Users/soumilnitinshah/anaconda3/envs/my-flink-environment/lib/python3.8/site-packages (from Faker) (2.8.2)
Requirement already satisfied: typing-extensions>=3.10.0.1 in /Users/soumilnitinshah/anaconda3/envs/my-flink-environment/lib/python3.8/site-packages (from Faker) (4.8.0)
Requirement already satisfied: six>=1.5 in /Users/soumilnitinshah/anaconda3/envs/my-flink-environment/lib/python3.8/site-packages (from python-dateutil>=2.4->Faker) (1.16.0)
Using cached Faker-19.6.1-py3-none-any.whl (1.7 MB)
Installing collected packages: Faker
Successfully installed Faker-19.6.1
In [8]:
! pip show apache-flink
Name: apache-flink
Version: 1.17.1
Summary: Apache Flink Python API
Home-page: https://flink.apache.org
Author: Apache Software Foundation
Author-email: dev@flink.apache.org
License: https://www.apache.org/licenses/LICENSE-2.0
Location: /Users/soumilnitinshah/anaconda3/envs/my-new-environment/lib/python3.8/site-packages
Requires: apache-beam, apache-flink-libraries, avro-python3, cloudpickle, fastavro, httplib2, numpy, pandas, pemja, protobuf, py4j, pyarrow, python-dateutil, pytz, requests
Required-by:
In [1]:
! java -version
openjdk version "11.0.11" 2021-04-20
OpenJDK Runtime Environment AdoptOpenJDK-11.0.11+9 (build 11.0.11+9)
OpenJDK 64-Bit Server VM AdoptOpenJDK-11.0.11+9 (build 11.0.11+9, mixed mode)
In [46]:
import os

# List the JAR files present in the current working directory
for file in os.listdir():
    if "jar" in file:
        print(file)
hudi-flink-bundle_2.12-0.10.1.jar
flink-s3-fs-hadoop-1.16.1.jar
flink-sql-connector-kinesis-1.16.1.jar
hudi-flink1.16-bundle-0.13.0.jar
Download JAR Files¶
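The notebook expects the connector JARs listed above to be present in the working directory. The snippet below is one way to fetch them; the Maven Central URLs and versions are my assumptions and should be verified against the Flink and Hudi versions you are using:

import urllib.request

# Illustrative download helper: pulls the three connector JARs into the current directory.
# Verify the coordinates/versions before relying on them.
jars = {
    "flink-s3-fs-hadoop-1.16.1.jar":
        "https://repo1.maven.org/maven2/org/apache/flink/flink-s3-fs-hadoop/1.16.1/flink-s3-fs-hadoop-1.16.1.jar",
    "hudi-flink1.16-bundle-0.13.0.jar":
        "https://repo1.maven.org/maven2/org/apache/hudi/hudi-flink1.16-bundle/0.13.0/hudi-flink1.16-bundle-0.13.0.jar",
    "flink-sql-connector-kinesis-1.16.1.jar":
        "https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kinesis/1.16.1/flink-sql-connector-kinesis-1.16.1.jar",
}

for name, url in jars.items():
    urllib.request.urlretrieve(url, name)
    print(f"Downloaded {name}")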
Creating the Flink Table Environment¶
In [9]:
from pyflink.table import EnvironmentSettings, TableEnvironment
import os
from faker import Faker

# Create a batch TableEnvironment
env_settings = EnvironmentSettings.in_batch_mode()
table_env = TableEnvironment.create(env_settings)

# Get the current working directory
CURRENT_DIR = os.getcwd()

# Define a list of JAR file names you want to add
jar_files = [
    "flink-s3-fs-hadoop-1.16.1.jar",
    "hudi-flink1.16-bundle-0.13.0.jar",
    "flink-sql-connector-kinesis-1.16.1.jar"
]

# Build the list of JAR URLs by prepending 'file:///' to each file name
jar_urls = [f"file:///{CURRENT_DIR}/{jar_file}" for jar_file in jar_files]

# Register the JARs with the pipeline so the connectors are on the classpath
table_env.get_config().get_configuration().set_string(
    "pipeline.jars",
    ";".join(jar_urls)
)
Out[9]:
<pyflink.common.configuration.Configuration at 0x147563fd0>
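The batch environment above is enough for a one-shot write. For continuous (streaming) writes into Hudi, a streaming TableEnvironment with checkpointing enabled is typically used instead, since Hudi commits in streaming mode are driven by Flink checkpoints. A rough sketch, assuming the same JAR setup as above (the interval is illustrative):

from pyflink.table import EnvironmentSettings, TableEnvironment

# Streaming mode instead of batch mode
stream_settings = EnvironmentSettings.in_streaming_mode()
stream_table_env = TableEnvironment.create(stream_settings)

# Enable periodic checkpoints so Hudi can commit data regularly
stream_table_env.get_config().get_configuration().set_string(
    "execution.checkpointing.interval", "30 s"
)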
Generating Some Data¶
In [10]:
import uuid
from faker import Faker
# Initialize Faker
fake = Faker()
# Generate fake data; it will be converted into a PyFlink table in the next cell
data = [(str(uuid.uuid4()), fake.name(), fake.city(), fake.state()) for _ in range(10)]  # 10 rows: uuid, name, city, state
# Define column names
column_names = ["uuid", "first_name", "city", "state"]
In [11]:
# Create a PyFlink table with column names
table = table_env.from_elements(data, schema=column_names)
table_env.create_temporary_view('source_table', table)
table_env.execute_sql(f"SELECT * FROM source_table ").print()
+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
|                           uuid |                     first_name |                           city |                          state |
+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
| 7fb332a6-789a-4625-b892-ebe... |               Danielle Sanchez |                  Guerreroburgh |                        Georgia |
| 6a4e24f4-552a-47be-a72d-9e9... |                        Ann Lam |                   Kimberlybury |                  West Virginia |
| 5c1697da-1eaa-4664-bfe7-f18... |                  Bradley Allen |                   Stanleymouth |                       New York |
| 2bb46597-103a-4497-a089-c8c... |                   Kelsey Cross |                       Basstown |                   South Dakota |
| 4c91a485-b73b-437d-a45e-0fe... |                    Jason Jones |                     East Tyler |                         Alaska |
| 532b9dfb-f34b-465b-b8dd-02c... |                 Cynthia Rogers |                      New David |                       Arkansas |
| 77d5a68e-e27f-40a8-9fd1-7c6... |                    Steven Mayo |                      Lake John |                   Rhode Island |
| a7dfcef2-41de-469d-8d48-398... |                 Michael Fisher |                     Susanmouth |                   Rhode Island |
| c698fd50-e109-4f4e-bdcb-082... |                  Adrian Pierce |                     Lake Megan |                      Minnesota |
| 54e64aaa-9016-4f0a-a87c-8fd... |                    David Ortiz |               New Michelleside |                      Minnesota |
+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
10 rows in set
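With schema=column_names, PyFlink infers the column types from the Python values. If you prefer explicit types, the table can also be declared with DataTypes; this variant is just an illustration and is not required for the rest of the walkthrough:

from pyflink.table import DataTypes

# Explicit row schema instead of relying on type inference
explicit_schema = DataTypes.ROW([
    DataTypes.FIELD("uuid", DataTypes.STRING()),
    DataTypes.FIELD("first_name", DataTypes.STRING()),
    DataTypes.FIELD("city", DataTypes.STRING()),
    DataTypes.FIELD("state", DataTypes.STRING()),
])

typed_table = table_env.from_elements(data, schema=explicit_schema)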
Hudi Sink¶
Make sure you have set these environment variables¶
import os

# Replace "XX" with your AWS credentials (both the standard and legacy variable names are set here)
os.environ['AWS_ACCESS_KEY_ID'] = "XX"
os.environ['AWS_ACCESS_KEY'] = "XX"
os.environ['AWS_SECRET_ACCESS_KEY'] = "XX"
os.environ['AWS_SECRET_KEY'] = "XX"
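A small optional check (my addition, not in the original notebook) to fail fast if the standard credential variables are still unset before attempting the S3 write; the S3A filesystem typically picks these up through its environment-variable credential provider:

import os

# Raise early if the standard AWS credential variables are missing
for var in ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"):
    if not os.environ.get(var):
        raise EnvironmentError(f"{var} is not set; the S3 write below will fail to authenticate")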
In [12]:
hudi_output_path = 's3a://dXXX/tmp1/'

hudi_sink = f"""
CREATE TABLE customers(
    uuid VARCHAR(36) PRIMARY KEY NOT ENFORCED,
    first_name VARCHAR(50),
    city VARCHAR(50),
    state VARCHAR(50)
)
PARTITIONED BY (`state`)
WITH (
    'connector' = 'hudi',
    'path' = '{hudi_output_path}',
    'table.type' = 'MERGE_ON_READ'
);
"""

# Execute the SQL to create the Hudi table
table_env.execute_sql(hudi_sink)

# Insert the generated data into the Hudi table and wait for the job to finish
table_env.execute_sql("""
INSERT INTO customers
SELECT * FROM source_table
""").wait()
2023-09-21 19:58:22,508 WARN org.apache.hadoop.metrics2.impl.MetricsConfig [] - Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
2023-09-21 19:58:22,518 WARN org.apache.hadoop.metrics2.util.MBeans [] - Failed to register MBean "Hadoop:service=s3a-file-system,name=MetricsSystem,sub=Stats": Instance already exists.
2023-09-21 19:58:22,520 INFO org.apache.hadoop.metrics2.impl.MetricsSystemImpl [] - Scheduled Metric snapshot period at 10 second(s).
2023-09-21 19:58:22,520 INFO org.apache.hadoop.metrics2.impl.MetricsSystemImpl [] - s3a-file-system metrics system started
2023-09-21 19:58:22,533 WARN org.apache.hadoop.metrics2.util.MBeans [] - Failed to register MBean "Hadoop:service=s3a-file-system,name=MetricsSystem,sub=Control": Instance already exists.
2023-09-21 19:58:22,535 WARN org.apache.hadoop.metrics2.util.MBeans [] - Failed to register MBean "Hadoop:service=s3a-file-system,name=S3AMetrics1-datateam-sandbox-qa-demo": Instance already exists.
... (the same metrics warnings repeat at 19:58:31, plus one for "S3AMetrics5-datateam-sandbox-qa-demo")
# WARNING: Unable to get Instrumentation. Dynamic Attach failed. You may add this JAR as -javaagent manually, or supply -Djdk.attach.allowAttachSelf (repeated)
# WARNING: Unable to attach Serviceability Agent. Unable to attach even with module exceptions: [org.apache.hudi.org.openjdk.jol.vm.sa.SASupportException: Sense failed., ...] (repeated)
2023-09-21 19:58:49,570 WARN org.apache.hadoop.fs.s3a.S3ABlockOutputStream [] - Application invoked the Syncable API against stream writing to tmp1/Arkansas/.hoodie_partition_metadata_1. This is unsupported
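Once the insert completes, the data can be read back by declaring a second Flink table over the same Hudi path and querying it. This is a sketch of the read side, reusing the schema and hudi_output_path from above; what a MERGE_ON_READ snapshot read returns can depend on compaction state:

# Read back the Hudi table written above
hudi_source = f"""
CREATE TABLE customers_read(
    uuid VARCHAR(36) PRIMARY KEY NOT ENFORCED,
    first_name VARCHAR(50),
    city VARCHAR(50),
    state VARCHAR(50)
)
PARTITIONED BY (`state`)
WITH (
    'connector' = 'hudi',
    'path' = '{hudi_output_path}',
    'table.type' = 'MERGE_ON_READ'
);
"""

table_env.execute_sql(hudi_source)
table_env.execute_sql("SELECT * FROM customers_read").print()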