Push Hudi Commit Notifications to an HTTP URI with a Callback
Apache Hudi can post a callback notification after a write commit. This is useful when you need an event notification stream so that other services can react once a Hudi write commits.
When a Hudi write operation completes successfully, Hudi generates a callback payload with information about the write, such as the commit time, the number of records written, and the path to the Hudi dataset.
Once the write commit callback is configured, Hudi sends this payload to the specified HTTP endpoint or Kafka topic after each successful write commit. It can be used to trigger downstream processes such as refreshing materialized views, updating search indexes, or sending notifications; a Kafka variant of the configuration is sketched after Step 3.
Step 1: Define Imports
try:
    import os
    import sys
    import uuid
    from datetime import datetime
    from functools import reduce

    import pyspark
    from pyspark import SparkConf, SparkContext
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, to_timestamp, monotonically_increasing_id, to_date, when, asc, desc
    from pyspark.sql.types import *
    from faker import Faker
except Exception as e:
    print(f"Error importing dependencies: {e}")
Step 2: Define Spark Session
# Pull in the Hudi Spark bundle and point PySpark at the current interpreter
SUBMIT_ARGS = "--packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.13.0 pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

spark = SparkSession.builder \
    .appName("hudi-commit-callback-demo") \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .getOrCreate()
Step 3: Define Settings for Hudi
db_name = "hudidb"
table_name = "hudi_table"
path = f"file:///C:/tmp/{db_name}/{table_name}"

recordkey = 'uuid,country'      # composite record key
precombine = "date"             # column used to break ties between records with the same key
method = 'upsert'
table_type = "COPY_ON_WRITE"    # COPY_ON_WRITE | MERGE_ON_READ
PARTITION_FIELD = "country"

hudi_options = {
    'hoodie.table.name': table_name,
    'hoodie.datasource.write.table.type': table_type,
    'hoodie.datasource.write.recordkey.field': recordkey,
    'hoodie.datasource.write.table.name': table_name,
    'hoodie.datasource.write.operation': method,
    'hoodie.datasource.write.precombine.field': precombine,
    'hoodie.datasource.write.partitionpath.field': PARTITION_FIELD,

    # Commit callback: POST the commit metadata to this endpoint after each successful write
    'hoodie.write.commit.callback.on': 'true',
    'hoodie.write.commit.callback.http.url': 'http://localhost:5000/',
    'hoodie.write.commit.callback.http.timeout.seconds': '180',
}
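As mentioned earlier, Hudi can also deliver the callback to Kafka instead of HTTP. The following is only a sketch of the equivalent options, assuming the hudi-utilities bundle is on the classpath; the broker address and topic name are illustrative:
kafka_callback_options = {
    # Switch the callback transport from the default HTTP callback to Kafka
    'hoodie.write.commit.callback.on': 'true',
    'hoodie.write.commit.callback.class': 'org.apache.hudi.utilities.callback.kafka.HoodieWriteCommitKafkaCallback',
    'hoodie.write.commit.callback.kafka.bootstrap.servers': 'localhost:9092',  # assumption: local broker
    'hoodie.write.commit.callback.kafka.topic': 'hudi-commits',                # hypothetical topic name
}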
Step 4: Performing Inserts on the Data Lake
spark_df = spark.createDataFrame(
data=[
(1, "insert 1", "2020-01-06 12:12:12", "IN"),
(2, "insert 2", "2020-01-06 12:12:13", "US"),
(3, "insert 3", "2020-01-06 12:12:15", "IN"),
(4, "insert 4", "2020-01-06 12:13:15", "US"),
],
schema=["uuid", "message", "date", "country"])
spark_df.show()
spark_df.write.format("hudi"). \
options(**hudi_options). \
mode("append"). \
save(path)
+----+--------+-------------------+-------+
|uuid| message|               date|country|
+----+--------+-------------------+-------+
|   1|insert 1|2020-01-06 12:12:12|     IN|
|   2|insert 2|2020-01-06 12:12:13|     US|
|   3|insert 3|2020-01-06 12:12:15|     IN|
|   4|insert 4|2020-01-06 12:13:15|     US|
+----+--------+-------------------+-------+
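To confirm the rows landed (and to see the _hoodie metadata columns Hudi adds to every record), the table can be read back from the base path; a minimal sketch:
# Read the Hudi table back to verify the insert
df = spark.read.format("hudi").load(path)
df.select("uuid", "message", "date", "country").orderBy("uuid").show()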
Performing Updates on the Data Lake
spark_df = spark.createDataFrame(
data=[
(1, "update 1", "2020-01-06 12:12:12", "IN"),
],
schema=["uuid", "message", "date", "country"])
spark_df.show()
spark_df.write.format("hudi"). \
options(**hudi_options). \
mode("append"). \
save(path)
+----+--------+-------------------+-------+
|uuid| message|               date|country|
+----+--------+-------------------+-------+
|   1|update 1|2020-01-06 12:12:12|     IN|
+----+--------+-------------------+-------+
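Because uuid and country form the record key, this write updates the existing row for uuid=1 in partition IN rather than inserting a duplicate. Reading the row back confirms the upsert (a sketch):
# Verify the upsert replaced the original record for uuid=1
spark.read.format("hudi").load(path) \
    .filter(col("uuid") == 1) \
    .select("uuid", "message", "date", "country").show()
# Expect a single row for uuid=1 with message == "update 1"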
Flask App
import json

from flask import Flask, request

app = Flask(__name__)


@app.route('/', methods=["GET", "POST"])
def index():
    # Hudi POSTs the commit metadata as JSON; log it for inspection
    json_data = request.get_json(silent=True)
    print(json.dumps(json_data, indent=3))
    return 'Web App with Python Flask!'


if __name__ == "__main__":
    app.run(debug=True)
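Before running a Hudi write, the endpoint can be smoke-tested with a dummy payload. A sketch using the requests library; the payload body here is made up, Hudi will send real commit metadata like the example below:
import requests

resp = requests.post("http://localhost:5000/", json={"commitTime": "test", "tableName": "hudi_table"})
print(resp.status_code, resp.text)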
Payload Received
{
"commitTime": "20230318110232555",
"tableName": "hudi_table",
"basePath": "file:///C:/tmp/hudidb/hudi_table",
"hoodieWriteStat": [
{
"fileId": "89b25969-2243-4597-9792-e92071abcf91-0",
"path": "IN/89b25969-2243-4597-9792-e92071abcf91-0_0-1275-5246_20230318110232555.parquet",
"cdcStats": null,
"prevCommit": "null",
"numWrites": 2,
"numDeletes": 0,
"numUpdateWrites": 0,
"numInserts": 2,
"totalWriteBytes": 435290,
"totalWriteErrors": 0,
"tempPath": null,
"partitionPath": "IN",
"totalLogRecords": 0,
"totalLogFilesCompacted": 0,
"totalLogSizeCompacted": 0,
"totalUpdatedRecordsCompacted": 0,
"totalLogBlocks": 0,
"totalCorruptLogBlock": 0,
"totalRollbackBlocks": 0,
"fileSizeInBytes": 435290,
"minEventTime": null,
"maxEventTime": null,
"runtimeStats": {
"totalScanTime": 0,
"totalUpsertTime": 0,
"totalCreateTime": 755
}
},
{
"fileId": "6c8531bf-b654-4aaa-b93e-0fd39c58f8ea-0",
"path": "US/6c8531bf-b654-4aaa-b93e-0fd39c58f8ea-0_1-1275-5247_20230318110232555.parquet",
"cdcStats": null,
"prevCommit": "null",
"numWrites": 2,
"numDeletes": 0,
"numUpdateWrites": 0,
"numInserts": 2,
"totalWriteBytes": 435296,
"totalWriteErrors": 0,
"tempPath": null,
"partitionPath": "US",
"totalLogRecords": 0,
"totalLogFilesCompacted": 0,
"totalLogSizeCompacted": 0,
"totalUpdatedRecordsCompacted": 0,
"totalLogBlocks": 0,
"totalCorruptLogBlock": 0,
"totalRollbackBlocks": 0,
"fileSizeInBytes": 435296,
"minEventTime": null,
"maxEventTime": null,
"runtimeStats": {
"totalScanTime": 0,
"totalUpsertTime": 0,
"totalCreateTime": 756
}
}
]
}
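In a real pipeline the handler would act on this payload rather than just print it. A minimal sketch (a hypothetical helper, using only field names visible in the payload above) that aggregates write stats per partition:
def summarize_commit(payload: dict) -> dict:
    """Aggregate per-partition write stats from a Hudi commit callback payload."""
    summary = {}
    for stat in payload.get("hoodieWriteStat", []):
        summary[stat.get("partitionPath")] = {
            "inserts": stat.get("numInserts", 0),
            "updates": stat.get("numUpdateWrites", 0),
            "bytes": stat.get("totalWriteBytes", 0),
        }
    return summary

# For the payload above this yields:
# {'IN': {'inserts': 2, 'updates': 0, 'bytes': 435290},
#  'US': {'inserts': 2, 'updates': 0, 'bytes': 435296}}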