Soumil Nitin Shah¶
Bachelor's in Electronic Engineering | Master's in Electrical Engineering | Master's in Computer Engineering
- Website : http://soumilshah.com/
- Github: https://github.com/soumilshah1995
- Linkedin: https://www.linkedin.com/in/shah-soumil/
- Blog: https://soumilshah1995.blogspot.com/
- Youtube : https://www.youtube.com/channel/UC_eOodxvwS_H7x2uLQa-svw?view_as=subscriber
- Facebook Page : https://www.facebook.com/soumilshah1995/
- Email : shahsoumil519@gmail.com
projects : https://soumilshah.herokuapp.com/project
I earned a Bachelor of Science in Electronic Engineering and a double master's in Electrical and Computer Engineering. I have extensive expertise in developing scalable, high-performance software applications in Python. I have a YouTube channel where I teach people about data science, machine learning, Elasticsearch, and AWS. I work as a Data Team Lead at Jobtarget, where I spend most of my time developing ingestion frameworks and building microservices and scalable architectures on AWS. I have worked with massive amounts of data, including creating data lakes (1.2T) and optimizing data lake queries through partitioning and the right file formats and compression. I have also developed and worked on a streaming application that ingests real-time streams via Kinesis and Firehose into Elasticsearch.
RFC-18: Hudi Insert Overwrite API¶
- This operation can be faster than upsert for batch ETL jobs that recompute entire target partitions at once (as opposed to incrementally updating the target tables). This is because we are able to completely bypass the indexing, precombining, and other repartitioning steps in the upsert write path.
Advantages of Insert Overwrite API¶
Faster Batch ETL Jobs: If you are recomputing entire target partitions in batch ETL jobs, the "insert overwrite" API can be faster than upsert. This is because it allows you to bypass indexing, precombining, and other repartitioning steps that are required in the upsert write path.
Efficient Data Processing: For tables or partitions where the majority of records change in every cycle, upsert or merge operations can be inefficient. The "insert overwrite" API allows you to ignore all existing data and commit only the new data provided, making the data processing more efficient.
Operational Tasks: The "insert overwrite" API can also be used for certain operational tasks, such as fixing a specific corrupted partition. You can perform an "insert overwrite" operation on that partition with records from the source, which can be much faster than restore and replay for some data sources.
Overall, the "insert overwrite" API in Hudi provides a convenient and efficient way to handle certain use cases, and can help improve the performance of batch ETL jobs and other data processing tasks.
Here is an example scenario where the "insert overwrite" API in Hudi can be used:¶
Suppose you have a Hudi table that stores data for daily transactions. The table is partitioned by date, with each partition representing a single day's worth of transactions. Due to a system error, the partition for a particular date (let's say March 17th) becomes corrupted, and some of the data in that partition is lost.
To fix the corrupted partition, you need to restore the lost data from a backup source. However, replaying the entire backup data for that partition can take a long time and impact the performance of your system.
In this scenario, you can use the "insert overwrite" API to fix the corrupted partition. You can extract the backup data for March 17th from your source and write just that data back with the "insert overwrite" operation, replacing the contents of the existing partition. This swaps the corrupted data for the backup data and fixes the issue.
Overall, the "insert overwrite" API provides a faster and more efficient way to fix corrupted partitions and replace data in certain use cases.
Read More¶
Step 1: Define Imports¶
try:
    import os
    import sys
    import uuid

    import pyspark
    from pyspark.sql import SparkSession
    from pyspark import SparkConf, SparkContext
    from pyspark.sql.functions import col, asc, desc
    from pyspark.sql.functions import col, to_timestamp, monotonically_increasing_id, to_date, when
    from pyspark.sql.functions import *
    from pyspark.sql.types import *

    from datetime import datetime
    from functools import reduce
    from faker import Faker
except Exception as e:
    print("error", e)
Step 2: Define Spark Session¶
SUBMIT_ARGS = "--packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.13.0 pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable
spark = SparkSession.builder \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('className', 'org.apache.hudi') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .getOrCreate()
spark
SparkSession - in-memory
Step 3: Define Settings for Hudi¶
db_name = "hudidb"
table_name = "hudi_table"
recordkey = 'uuid,country'
path = f"file:///C:/tmp/{db_name}/{table_name}"
precombine = "date"
method = 'upsert'
table_type = "COPY_ON_WRITE" # COPY_ON_WRITE | MERGE_ON_READ
PARTITION_FIELD = "country"
hudi_options = {
    'hoodie.table.name': table_name,
    'hoodie.datasource.write.table.type': table_type,
    'hoodie.datasource.write.recordkey.field': recordkey,
    'hoodie.datasource.write.table.name': table_name,
    'hoodie.datasource.write.operation': method,
    'hoodie.datasource.write.precombine.field': precombine,
    'hoodie.datasource.write.partitionpath.field': PARTITION_FIELD
}
Step 4: Performing Inserts on the Data Lake¶
spark_df = spark.createDataFrame(
    data=[
        (1, "insert 1", "2020-01-06 12:12:12", "IN"),
        (2, "insert 2", "2020-01-06 12:12:13", "US"),
        (3, "insert 3", "2020-01-06 12:12:15", "IN"),
        (4, "insert 4", "2020-01-06 12:13:15", "US"),
    ],
    schema=["uuid", "message", "date", "country"])
spark_df.show()

spark_df.write.format("hudi"). \
    options(**hudi_options). \
    mode("append"). \
    save(path)
+----+--------+-------------------+-------+
|uuid| message|               date|country|
+----+--------+-------------------+-------+
|   1|insert 1|2020-01-06 12:12:12|     IN|
|   2|insert 2|2020-01-06 12:12:13|     US|
|   3|insert 3|2020-01-06 12:12:15|     IN|
|   4|insert 4|2020-01-06 12:13:15|     US|
+----+--------+-------------------+-------+
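Optionally, before moving on, we can read the table back to confirm that all four rows landed in the IN and US partitions (this reuses the path defined in Step 3):

spark.read.format("hudi").load(path).select("uuid", "message", "date", "country").show()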
Performing Insert Overwrite¶
db_name = "hudidb"
table_name = "hudi_table"
recordkey = 'uuid,country'
path = f"file:///C:/tmp/{db_name}/{table_name}"
precombine = "date"
method = 'insert_overwrite'
table_type = "COPY_ON_WRITE" # COPY_ON_WRITE | MERGE_ON_READ
PARTITION_FIELD = "country"
hudi_options = {
    'hoodie.table.name': table_name,
    'hoodie.datasource.write.table.type': table_type,
    'hoodie.datasource.write.recordkey.field': recordkey,
    'hoodie.datasource.write.table.name': table_name,
    'hoodie.datasource.write.operation': method,
    'hoodie.datasource.write.precombine.field': precombine,
    'hoodie.datasource.write.partitionpath.field': PARTITION_FIELD
}
spark_df = spark.createDataFrame(
    data=[
        (1, "update 1", "2020-01-06 12:12:12", "IN"),
        (3, "update 3", "2020-01-06 12:12:15", "IN"),
    ],
    schema=["uuid", "message", "date", "country"])
spark_df.show()

spark_df.write.format("hudi"). \
    options(**hudi_options). \
    mode("append"). \
    save(path)
+----+--------+-------------------+-------+
|uuid| message|               date|country|
+----+--------+-------------------+-------+
|   1|update 1|2020-01-06 12:12:12|     IN|
|   3|update 3|2020-01-06 12:12:15|     IN|
+----+--------+-------------------+-------+
df = spark. \
    read. \
    format("hudi"). \
    load(path)
df.select(["uuid", "message", "date", "country"]).show()
+----+--------+-------------------+-------+
|uuid| message|               date|country|
+----+--------+-------------------+-------+
|   2|insert 2|2020-01-06 12:12:13|     US|
|   4|insert 4|2020-01-06 12:13:15|     US|
|   1|update 1|2020-01-06 12:12:12|     IN|
|   3|update 3|2020-01-06 12:12:15|     IN|
+----+--------+-------------------+-------+
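Notice that only the IN partition was replaced by the insert overwrite batch; the US records (uuid 2 and 4) are untouched, because insert_overwrite overwrites only the partitions present in the incoming data (insert_overwrite_table would replace the whole table). As an optional check, the replaced file groups should show up in the table's timeline as a replacecommit; a quick, illustrative look at the local timeline folder used above:

import os

# Timeline folder for the table created above (local path from Step 3)
hoodie_dir = "C:/tmp/hudidb/hudi_table/.hoodie"

# insert_overwrite is expected to be recorded as a *.replacecommit instant
print([f for f in os.listdir(hoodie_dir) if f.endswith(".replacecommit")])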