Step 1: Create a Kinesis Stream
In [18]:
import boto3

def create_kinesis_stream(stream_name, shard_count):
    try:
        # Initialize the Kinesis client
        kinesis_client = boto3.client('kinesis')
        # Create the Kinesis stream
        response = kinesis_client.create_stream(
            StreamName=stream_name,
            ShardCount=shard_count
        )
        # Check for successful response
        if response['ResponseMetadata']['HTTPStatusCode'] == 200:
            print(f"Kinesis stream '{stream_name}' created with {shard_count} shard(s)")
        else:
            print("Failed to create Kinesis stream")
    except Exception as e:
        print(f"Error: {str(e)}")

def delete_kinesis_stream(stream_name):
    try:
        # Initialize the Kinesis client
        kinesis_client = boto3.client('kinesis')
        # Delete the Kinesis stream
        response = kinesis_client.delete_stream(
            StreamName=stream_name
        )
        # Check for successful response
        if response['ResponseMetadata']['HTTPStatusCode'] == 200:
            print(f"Kinesis stream '{stream_name}' deleted successfully")
        else:
            print("Failed to delete Kinesis stream")
    except Exception as e:
        print(f"Error: {str(e)}")

create_kinesis_stream("stocks-stream", 1)
Kinesis stream 'stocks-stream' created with 1 shard(s)
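Note that create_stream is asynchronous: a 200 response only means the creation was accepted, and put_record calls made before the stream turns ACTIVE can fail. A minimal sketch using boto3's built-in stream_exists waiter to block until the stream is ready (Delay/MaxAttempts values are illustrative):
In [ ]:
import boto3

# Block until the stream reaches ACTIVE; create_stream alone only
# queues the creation, so publishing immediately can fail
kinesis_client = boto3.client('kinesis')
waiter = kinesis_client.get_waiter('stream_exists')
waiter.wait(
    StreamName="stocks-stream",
    WaiterConfig={'Delay': 5, 'MaxAttempts': 20}
)
print("Stream is ACTIVE")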
Step 2: Publish Dummy Data to the Input Stream
In [23]:
import datetime
import json
import random
import uuid  # For generating a unique ID per record

import boto3

STREAM_NAME = "stocks-stream"

def get_data():
    # Generate a UUID as a string
    unique_id = str(uuid.uuid4())
    return {
        'uuid': unique_id,  # Unique key for each record
        'event_time': datetime.datetime.now().isoformat(),
        'ticker': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']),
        'price': round(random.random() * 100, 2)
    }

def generate(stream_name, kinesis_client, num_samples):
    for _ in range(num_samples):
        data = get_data()
        print(data)
        kinesis_client.put_record(
            StreamName=stream_name,
            Data=json.dumps(data),
            PartitionKey="partitionkey")

if __name__ == '__main__':
    num_samples = 10  # Change this to the desired number of samples
    generate(STREAM_NAME, boto3.client('kinesis'), num_samples)
{'uuid': 'ee586054-bfdf-4fa2-9a2c-6b03d2ef6cd5', 'event_time': '2023-09-25T18:48:37.126292', 'ticker': 'INTC', 'price': 42.42}
{'uuid': 'c719a76d-e10c-4442-92fe-9a1659df03ba', 'event_time': '2023-09-25T18:48:37.556000', 'ticker': 'AAPL', 'price': 51.21}
{'uuid': '0806e3ac-0820-4712-9c47-fbdea3080446', 'event_time': '2023-09-25T18:48:37.657274', 'ticker': 'MSFT', 'price': 93.99}
{'uuid': '372c5a69-5511-4819-9fdf-1f88fb425495', 'event_time': '2023-09-25T18:48:37.755060', 'ticker': 'AAPL', 'price': 86.91}
{'uuid': '8b74aa98-c307-4a1b-a859-1aa68dee0e13', 'event_time': '2023-09-25T18:48:37.849373', 'ticker': 'MSFT', 'price': 25.36}
{'uuid': '5d09f2c1-eaed-4176-b00c-93b109be8bf3', 'event_time': '2023-09-25T18:48:37.945560', 'ticker': 'TBV', 'price': 60.0}
{'uuid': 'a5ecf26c-fb55-45b5-aad3-024626bf4101', 'event_time': '2023-09-25T18:48:38.041130', 'ticker': 'TBV', 'price': 25.0}
{'uuid': '948d30cd-a155-4559-9659-2986f92dc665', 'event_time': '2023-09-25T18:48:38.138086', 'ticker': 'AMZN', 'price': 36.52}
{'uuid': 'd476eac3-6ff0-49fa-9e64-02504700531f', 'event_time': '2023-09-25T18:48:38.234057', 'ticker': 'TBV', 'price': 63.51}
{'uuid': '4ba781fd-c615-49d3-9cc3-f55749204a48', 'event_time': '2023-09-25T18:48:38.331207', 'ticker': 'AAPL', 'price': 77.17}
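To sanity-check that the records actually landed, you can read them back from the shard. A minimal sketch using the standard Kinesis APIs list_shards, get_shard_iterator, and get_records; it reads one batch from the single shard created in Step 1:
In [ ]:
import boto3

kinesis_client = boto3.client('kinesis')

# Fetch the shard ID of the single shard created in Step 1
shard_id = kinesis_client.list_shards(StreamName="stocks-stream")['Shards'][0]['ShardId']

# Read from the start of the shard and print one batch of records
iterator = kinesis_client.get_shard_iterator(
    StreamName="stocks-stream",
    ShardId=shard_id,
    ShardIteratorType='TRIM_HORIZON'
)['ShardIterator']

for record in kinesis_client.get_records(ShardIterator=iterator, Limit=10)['Records']:
    print(record['Data'].decode('utf-8'))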
Step 3: Download and Upload JARs to S3
In [21]:
import boto3
import requests

def download_and_upload_to_s3(url, bucket_name, s3_key):
    # Download the JAR file
    response = requests.get(url)
    if response.status_code == 200:
        jar_content = response.content
        # Upload to S3
        s3_client = boto3.client('s3')
        s3_client.put_object(Bucket=bucket_name, Key=s3_key, Body=jar_content)
        print(f"Uploaded {s3_key} to {bucket_name}")
    else:
        print(f"Failed to download {url}")

if __name__ == "__main__":
    # URLs of the JAR files to download
    jar_urls = [
        "https://repo1.maven.org/maven2/org/apache/hudi/hudi-flink1.15-bundle/0.13.1/hudi-flink1.15-bundle-0.13.1.jar",
        "https://repo1.maven.org/maven2/org/apache/flink/flink-s3-fs-hadoop/1.15.0/flink-s3-fs-hadoop-1.15.0.jar",
    ]
    # S3 bucket name and object keys for the uploaded JARs
    bucket_name = 'XX'
    s3_keys = [
        'hudi-flink1.15-bundle-0.13.1.jar',
        'flink-s3-fs-hadoop-1.15.0.jar',
    ]
    for jar_url, s3_key in zip(jar_urls, s3_keys):
        download_and_upload_to_s3(jar_url, bucket_name, s3_key)
Uploaded hudi-flink1.15-bundle-0.13.1.jar to sample-backup-us-west-1
Uploaded flink-s3-fs-hadoop-1.15.0.jar to sample-backup-us-west-1
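If you want to double-check the uploads, a quick sketch using list_objects_v2 (the 'XX' bucket name is the same placeholder used above; substitute your own):
In [ ]:
import boto3

# List the bucket contents to confirm both JARs landed
s3_client = boto3.client('s3')
for obj in s3_client.list_objects_v2(Bucket='XX').get('Contents', []):
    print(obj['Key'], obj['Size'])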
Step 4: Paste These Commands into a Zeppelin Notebook
In [ ]:
%flink.conf
execution.checkpointing.interval 5000
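This sets Flink's checkpoint interval to 5000 ms (5 seconds). The Hudi Flink writer commits data when a checkpoint completes, so with checkpointing disabled the INSERT job below would never make records visible in the Hudi table.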
In [ ]:
%flink.ssql(type=update)
DROP TABLE IF EXISTS stocks_stream_table;

CREATE TABLE stocks_stream_table (
    uuid VARCHAR,
    ticker VARCHAR(6),
    price DOUBLE,
    event_time TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
)
PARTITIONED BY (ticker)
WITH (
    'connector' = 'kinesis',
    'stream' = 'stocks-stream',
    'aws.region' = 'us-west-1',
    'scan.stream.initpos' = 'TRIM_HORIZON',
    'format' = 'json',
    'json.timestamp-format.standard' = 'ISO-8601'
);
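Before wiring up the Hudi sink, it can help to confirm that records are flowing in from Kinesis. A simple preview query (this runs as a continuous streaming query, so stop the paragraph once you have seen a few rows):
In [ ]:
%flink.ssql(type=update)
-- Preview records arriving from the Kinesis stream
SELECT * FROM stocks_stream_table;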
In [ ]:
%flink.ssql(type=update)
DROP TABLE IF EXISTS stock_table_hudi;

CREATE TABLE IF NOT EXISTS stock_table_hudi (
    uuid VARCHAR PRIMARY KEY NOT ENFORCED,
    ticker VARCHAR,
    price DOUBLE,
    event_time TIMESTAMP(3)
)
PARTITIONED BY (ticker)
WITH (
    'connector' = 'hudi',
    'path' = 's3a://XXXX/tmp/',
    'table.type' = 'MERGE_ON_READ',
    'hoodie.embed.timeline.server' = 'false'
);
In [ ]:
%flink.ssql(type=update)
-- Carry the uuid published with each record so Hudi upserts on the
-- primary key, rather than generating a fresh UUID() per row
INSERT INTO stock_table_hudi
SELECT uuid, ticker, price, event_time FROM stocks_stream_table;
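Once the INSERT job has completed at least one checkpoint, you can query the Hudi table to verify the pipeline end to end. The aggregate below is just one way to check; any SELECT against the table works:
In [ ]:
%flink.ssql(type=update)
-- Count ingested rows per ticker to verify the end-to-end pipeline
SELECT ticker, COUNT(*) AS record_count
FROM stock_table_hudi
GROUP BY ticker;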
Step 5: Clean Up
In [24]:
delete_kinesis_stream("stocks-stream")
Kinesis stream 'stocks-stream' deleted successfully
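Deleting the stream does not remove the JARs uploaded in Step 3 or the Hudi files written under the s3a path. A sketch for removing the JARs (the 'XX' bucket name mirrors the placeholder above; the Hudi data path is elided in the original, so clean it up the same way once you fill it in):
In [ ]:
import boto3

# Remove the JARs uploaded in Step 3 (bucket placeholder as above)
s3_client = boto3.client('s3')
for key in ['hudi-flink1.15-bundle-0.13.1.jar', 'flink-s3-fs-hadoop-1.15.0.jar']:
    s3_client.delete_object(Bucket='XX', Key=key)
    print(f"Deleted {key}")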