Install Flink and Python¶
conda info --envs
# Create ENV
conda create -n my-flink-environment pip python=3.8
# Activate ENV
conda activate my-flink-environment
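# Install Flink (pinned to 1.17.1 to match the Flink 1.17 JARs loaded later in this notebook)
pip install apache-flink==1.17.1
# Install Jupyter Notebook and Faker (Faker is imported by the notebook)
pip install jupyter faker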
# Make sure java 11 is installed
java -version
In [1]:
! pip show apache-flink
! java -version
! Python --version
In [2]:
import os

# Export JAVA_HOME (Homebrew OpenJDK 11 location) for this kernel process
# before the table environment is created in a later cell.
os.environ.update(JAVA_HOME='/opt/homebrew/opt/openjdk@11')
Flink¶
Step 1: Create the Table Environment¶
In [3]:
from pyflink.table import EnvironmentSettings, TableEnvironment
import os
from faker import Faker
# Build batch-mode settings through the explicit builder, then create the
# TableEnvironment that every later cell uses to run SQL.
env_settings = (
    EnvironmentSettings
    .new_instance()
    .in_batch_mode()
    .build()
)
table_env = TableEnvironment.create(env_settings)
In [4]:
# Show the JAR files available in ./jar.
# Match on the ".jar" extension: a plain substring test ("jar" in name) would also
# list unrelated entries whose name merely contains "jar". Sorted for stable output.
for file in sorted(os.listdir("jar")):
    if file.endswith(".jar"):
        print(file)
Load Jar¶
In [5]:
from pathlib import Path

CURRENT_DIR = os.getcwd()

# JAR files (relative to CURRENT_DIR) to register with the Flink pipeline.
# Commented-out entries are kept for reference and are not loaded.
jar_files = [
    "jar/flink-s3-fs-hadoop-1.17.1.jar",
    "jar/hudi-flink1.17-bundle-0.14.0.jar",
    # "jar/flink-s3-fs-presto-1.17.1.jar",
    # "jar/hadoop-aws-2.9.0.jar",
    # "jar/hadoop-common-2.8.3.jar",
    # "jar/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar",
    # "jar/bundle-2.20.18.jar"
]

# Fail fast with a clear message instead of a connector-lookup error later on.
missing_jars = [
    jar_file for jar_file in jar_files
    if not Path(CURRENT_DIR, jar_file).is_file()
]
if missing_jars:
    raise FileNotFoundError(
        f"Missing JAR file(s) under {CURRENT_DIR}: {missing_jars}"
    )

# Path.as_uri() produces a well-formed, percent-encoded file:///... URL.
# Hand-formatting "file:///" + an absolute path yields "file:////..." and breaks
# on paths containing spaces.
jar_urls = [Path(CURRENT_DIR, jar_file).as_uri() for jar_file in jar_files]

# "pipeline.jars" takes a semicolon-separated list of JAR URLs.
table_env.get_config().get_configuration().set_string(
    "pipeline.jars",
    ";".join(jar_urls)
)
Out[5]:
Creating Hudi table¶
In [6]:
from pathlib import Path

# Store the Hudi table under ./hudi of the notebook's working directory instead of
# a hard-coded, user-specific absolute path, so the notebook runs on any machine.
# as_uri() yields a well-formed file:///... URL.
hudi_output_path = (Path.cwd() / "hudi").as_uri()

# DDL for a MERGE_ON_READ Hudi table keyed by `uuid` and partitioned by `city`.
hudi_sink = f"""
CREATE TABLE hudi_table(
    ts BIGINT,
    uuid VARCHAR(40) PRIMARY KEY NOT ENFORCED,
    rider VARCHAR(20),
    driver VARCHAR(20),
    fare DOUBLE,
    city VARCHAR(20)
)
PARTITIONED BY (`city`)
WITH (
    'connector' = 'hudi',
    'path' = '{hudi_output_path}' ,
    'table.type' = 'MERGE_ON_READ'
);
"""

# Execute the SQL to create the Hudi table
table_env.execute_sql(hudi_sink)
Out[6]:
Inserting Data¶
In [7]:
# Insert eight sample trip records into the Hudi table.
query = """
INSERT INTO hudi_table
VALUES
(1695159649087,'334e26e9-8355-45cc-97c6-c31daf0df330','rider-A','driver-K',19.10,'san_francisco'),
(1695091554788,'e96c4396-3fad-413a-a942-4cb36106d721','rider-C','driver-M',27.70 ,'san_francisco'),
(1695046462179,'9909a8b1-2d15-4d3d-8ec9-efc48c536a00','rider-D','driver-L',33.90 ,'san_francisco'),
(1695332066204,'1dced545-862b-4ceb-8b43-d2a568f6616b','rider-E','driver-O',93.50,'san_francisco'),
(1695516137016,'e3cf430c-889d-4015-bc98-59bdce1e530c','rider-F','driver-P',34.15,'sao_paulo'),
(1695376420876,'7a84095f-737f-40bc-b62f-6b69664712d2','rider-G','driver-Q',43.40 ,'sao_paulo'),
(1695173887231,'3eeb61f7-c2b0-4636-99bd-5d7a5a1d2c04','rider-I','driver-S',41.06 ,'chennai'),
(1695115999911,'c8abbe79-8d89-47ea-b4ce-4d224bae5bfa','rider-J','driver-T',17.85,'chennai');
"""

# execute_sql() submits the INSERT job asynchronously and returns immediately.
# wait() blocks until the job finishes, so the queries in the following cells
# are guaranteed to see the inserted rows.
table_env.execute_sql(query).wait()
Out[7]:
Querying Hudi Tables¶
In [8]:
# Look up a single record in the Hudi table by its primary key and print it.
query = (
    "\n"
    "SELECT * FROM hudi_table where uuid = '334e26e9-8355-45cc-97c6-c31daf0df330';\n"
)
lookup_result = table_env.execute_sql(query)
lookup_result.print()
Updating Hudi tables¶
In [9]:
# Set the fare of one record (selected by uuid) to 25.0 and print the statement result.
query = (
    "\n"
    "UPDATE hudi_table SET fare = 25.0 WHERE uuid = '334e26e9-8355-45cc-97c6-c31daf0df330';\n"
)
update_result = table_env.execute_sql(query)
update_result.print()
In [10]:
# Read the same record back by uuid so the fare written by the UPDATE can be inspected.
query = "\n".join([
    "",
    "SELECT * FROM hudi_table where uuid = '334e26e9-8355-45cc-97c6-c31daf0df330';",
    "",
])
verify_result = table_env.execute_sql(query)
verify_result.print()
Deleting from Hudi tables¶
In [11]:
# Delete every record whose city is 'chennai' and print the statement result.
query = (
    "\n"
    "DELETE FROM hudi_table where city='chennai'\n"
)
delete_result = table_env.execute_sql(query)
delete_result.print()
In [12]:
# Print all rows currently in the Hudi table.
query = "\n".join(["", "SELECT * FROM hudi_table", ""])
remaining_rows = table_env.execute_sql(query)
remaining_rows.print()