Soumil Nitin Shah¶
Bachelor's in Electronic Engineering | Master's in Electrical Engineering | Master's in Computer Engineering
- Website : http://soumilshah.com/
- Github: https://github.com/soumilshah1995
- Linkedin: https://www.linkedin.com/in/shah-soumil/
- Blog: https://soumilshah1995.blogspot.com/
- Youtube : https://www.youtube.com/channel/UC_eOodxvwS_H7x2uLQa-svw?view_as=subscriber
- Facebook Page : https://www.facebook.com/soumilshah1995/
- Email : shahsoumil519@gmail.com
- Projects : https://soumilshah.herokuapp.com/project
I earned a Bachelor of Science in Electronic Engineering and a double master's in Electrical and Computer Engineering. I have extensive expertise in developing scalable, high-performance software applications in Python. I run a YouTube channel where I teach Data Science, Machine Learning, Elasticsearch, and AWS. I work as a Data Team Lead at JobTarget, where I spend most of my time developing ingestion frameworks and building microservices and scalable architectures on AWS. I have worked with massive amounts of data, including building data lakes (1.2T) and optimizing data lake queries through partitioning, the right file formats, and compression. I have also developed streaming applications that ingest real-time data into Elasticsearch via Kinesis and Firehose.
What is Consistent Hashing?¶
- Consistent Hashing is a distributed hashing scheme that operates independently of the number of servers or objects in a distributed hash table by assigning them a position on an abstract circle, or hash ring. This allows servers and objects to scale without affecting the overall system.
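To make the idea concrete, here is a minimal, self-contained sketch (not Hudi internals; the node names and MD5-based hash are only illustrative) that places servers and keys on a hash ring and routes each key to the first server clockwise from its position:
import bisect
import hashlib

def ring_hash(value: str) -> int:
    # Map any string to a position on the ring (0 .. 2^32 - 1)
    return int(hashlib.md5(value.encode()).hexdigest(), 16) % (2 ** 32)

class ConsistentHashRing:
    def __init__(self, nodes):
        # Sorted (position, node) pairs form the abstract circle
        self.ring = sorted((ring_hash(n), n) for n in nodes)

    def get_node(self, key: str):
        # Walk clockwise: first node at or after the key's position,
        # wrapping back to the start of the ring if necessary
        pos = ring_hash(key)
        idx = bisect.bisect_left([p for p, _ in self.ring], pos)
        return self.ring[idx % len(self.ring)][1]

ring = ConsistentHashRing(["node-a", "node-b", "node-c"])
print(ring.get_node("emp_id_123"))
Because each key only depends on its nearest node on the circle, adding or removing a node remaps only the keys in that node's neighborhood rather than rehashing everything.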
Hudi supports the Upsert operation to de-duplicate records in a table, which depends on indexing schemes to perform record location lookup. Among the many index options, the bucket index (in progress, RFC-29) achieves promising Upsert performance, with roughly a 3x improvement in throughput compared to using a Bloom Filter. However, it requires pre-configuring a fixed bucket number, which cannot be changed afterwards.
Combined with the design of a one-to-one mapping between hash buckets and file groups, Hudi tables with a bucket index have some practical issues, such as data skew and unbounded file group size, which today can only be resolved by resetting a suitable bucket number and re-writing the whole table.
- These problems can be solved by introducing the Consistent Hashing Index. It achieves bucket resizing by splitting or merging a few local buckets (i.e., only large file groups) while leaving most buckets untouched. This allows the bucket number to be adjusted dynamically by a background service with minimal impact on downstream systems relying on Hudi; for example, concurrent readers and writers are not blocked during the resizing. A toy sketch of the idea follows below.
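The sketch below is purely illustrative (my own toy model, not Hudi code): imagine each bucket owns a contiguous hash range, so splitting one oversized bucket rewrites only that bucket's file group while every other bucket keeps its range and data untouched.
# Toy model of consistent-hashing bucket resizing (illustrative only).
# Each bucket owns a hash range [start, end); splitting one bucket
# leaves all other ranges, and hence their file groups, untouched.
buckets = {
    "bucket-0": (0, 1_000_000),
    "bucket-1": (1_000_000, 2_000_000),
    "bucket-2": (2_000_000, 3_000_000),
}

def split_bucket(buckets, name):
    start, end = buckets.pop(name)
    mid = (start + end) // 2
    buckets[f"{name}-a"] = (start, mid)   # child buckets inherit halves
    buckets[f"{name}-b"] = (mid, end)     # of the parent's hash range
    return buckets

# Only bucket-1 (e.g. an oversized file group) is rewritten; the rest are unaffected.
print(split_bucket(dict(buckets), "bucket-1"))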
Step 1: Define Imports¶
try:
    import os
    import sys
    import uuid

    import pyspark
    from pyspark.sql import SparkSession
    from pyspark import SparkConf, SparkContext
    from pyspark.sql.functions import *
    from pyspark.sql.types import *

    import datetime
    from functools import reduce
    from faker import Faker
except Exception as e:
    print("Error importing modules:", e)
Step 2: Define Spark Session¶
# Pull in the Hudi Spark 3.3 bundle and point PySpark at the current Python interpreter
SUBMIT_ARGS = "--packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.13.0 pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

spark = SparkSession.builder \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('className', 'org.apache.hudi') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .getOrCreate()
spark
Data Generator¶
import uuid
import random
from faker import Faker

faker = Faker()

class DataGenerator(object):

    @staticmethod
    def get_data(samples):
        # Each tuple matches the schema: emp_id, employee_name, department, state,
        # salary, age, bonus, ts, email, credit_card, year, month
        return [
            (
                uuid.uuid4().__str__(),
                faker.name(),
                faker.random_element(elements=('IT', 'HR', 'Sales', 'Marketing')),
                faker.random_element(elements=('CA', 'NY', 'TX', 'FL', 'IL', 'RJ')),
                str(faker.random_int(min=10000, max=150000)),
                str(faker.random_int(min=18, max=60)),
                str(faker.random_int(min=0, max=100000)),
                str(faker.unix_time()),
                faker.email(),
                faker.credit_card_number(card_type='amex'),
                random.choice(["2020", "2021", "2022", "2023"]),
                faker.month()
            ) for x in range(samples)
        ]
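Before wiring the generator into Hudi writes, a quick sanity check; the column names mirror the schema used in the upsert loop below.
# Preview a few generated rows as a Spark DataFrame
columns = ["emp_id", "employee_name", "department", "state", "salary",
           "age", "bonus", "ts", "email", "credit_card", "year", "month"]
preview_df = spark.createDataFrame(data=DataGenerator.get_data(5), schema=columns)
preview_df.show(truncate=False)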
Step 3: Define Hudi Settings¶
db_name = "hudidb"
table_name = "hudi_bucket_consistent_hasing_index"

recordkey = 'emp_id,state'
path = f"file:///C:/tmp/{db_name}/{table_name}"
precombine = "ts"
method = 'upsert'
table_type = "MERGE_ON_READ"
PARTITION_FIELD = 'year'

hudi_options = {
    'hoodie.table.name': table_name,
    'hoodie.datasource.write.table.type': table_type,
    'hoodie.datasource.write.recordkey.field': recordkey,
    'hoodie.datasource.write.table.name': table_name,
    'hoodie.datasource.write.operation': method,
    'hoodie.datasource.write.precombine.field': precombine,
    'hoodie.datasource.write.partitionpath.field': PARTITION_FIELD,
    'hoodie.datasource.write.row.writer.enable': 'false',
    'hoodie.metadata.enable': 'false',

    # Consistent hashing bucket index
    'hoodie.index.type': 'BUCKET',
    'hoodie.index.bucket.engine': 'CONSISTENT_HASHING',
    'hoodie.bucket.index.max.num.buckets': 128,
    'hoodie.bucket.index.min.num.buckets': 32,
    # split a bucket if its size reaches 1.5 * max_file_size
    'hoodie.bucket.index.split.threshold': 1.5,
    # merge a bucket if its size falls below 0.1 * max_file_size
    'hoodie.bucket.index.merge.threshold': 0.1,

    # Inline clustering performs the bucket split/merge (resizing)
    'hoodie.clustering.inline': 'true',
    'hoodie.clustering.inline.max.commits': 2,
    'hoodie.clustering.plan.strategy.target.file.max.bytes': '1073741824',
    'hoodie.clustering.plan.strategy.small.file.limit': '629145600',
    'hoodie.clustering.plan.strategy.class': 'org.apache.hudi.client.clustering.plan.strategy.SparkConsistentBucketClusteringPlanStrategy',
    'hoodie.clustering.execution.strategy.class': 'org.apache.hudi.client.clustering.run.strategy.SparkConsistentBucketClusteringExecutionStrategy',
    'hoodie.clustering.updates.strategy': 'org.apache.hudi.client.clustering.update.strategy.SparkConsistentBucketDuplicateUpdateStrategy',

    # Cleaning
    'hoodie.clean.automatic': 'true',
    'hoodie.clean.async': 'true',
    'hoodie.cleaner.policy': 'KEEP_LATEST_FILE_VERSIONS',
    'hoodie.cleaner.fileversions.retained': '3',
    'hoodie.cleaner.parallelism': '200',
    'hoodie.cleaner.commits.retained': 2
}
Performing UPSERT¶
for i in range(1, 10):
    print("Batch :{} ".format(i))

    data = DataGenerator.get_data(100)
    columns = ["emp_id", "employee_name", "department", "state", "salary", "age",
               "bonus", "ts", "email", "credit_card", "year", "month"]
    spark_df = spark.createDataFrame(data=data, schema=columns)

    start = datetime.datetime.now()
    spark_df.write.format("hudi"). \
        options(**hudi_options). \
        mode("append"). \
        save(path)
    end = datetime.datetime.now()

    print(f"Execution Time {end - start}")
Batch :1
Execution Time 0:00:23.924048
Batch :2
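To verify the upserted records, you can read the table back with the standard Hudi Spark reader (a generic snapshot read, nothing specific to the consistent hashing index):
# Snapshot read of the Hudi table written above
read_df = spark.read.format("hudi").load(path)
print("record count:", read_df.count())
read_df.select("emp_id", "state", "salary", "year").show(5, truncate=False)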
Consistent Hashing Index is still an evolving feature, and as of 0.13.0 there are some limitations to using it:¶
- This index is supported only for the Spark engine with a MOR table.
- It does not work with the metadata table enabled.
- To scale up or shrink the buckets, users have to manually trigger clustering using the above configs (at some cadence), but they cannot have compaction running concurrently.
So, if compaction is enabled on your regular write pipeline, please follow this recommendation: you can choose to trigger the scale/shrink once every 12 hours. In that case, once every 12 hours you might need to disable compaction, stop your write pipeline, and enable clustering. Take extreme care not to run both concurrently, because it might result in conflicts and a failed pipeline. Once clustering is complete, you can resume your regular write pipeline, which has compaction enabled. A rough sketch of this switch is shown below.
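The options below are my own interpretation of that cadence (a sketch only, reusing the hudi_options defined earlier): the maintenance run keeps compaction disabled while inline clustering performs the bucket resizing, and the regular pipeline switches compaction back on afterwards.
# Hypothetical maintenance-window options (sketch only): keep compaction off
# while inline clustering resizes the consistent-hashing buckets.
maintenance_options = {
    **hudi_options,
    "hoodie.compact.inline": "false",            # no compaction during the resize
    "hoodie.compact.schedule.inline": "false",
    "hoodie.clustering.inline": "true",          # clustering performs the split/merge
    "hoodie.clustering.inline.max.commits": 2,
}

# The regular pipeline would re-enable compaction once clustering is complete, e.g.
# {**hudi_options, "hoodie.clustering.inline": "false", "hoodie.compact.inline": "true"}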
References¶
- https://cwiki.apache.org/confluence/display/HUDI/RFC+-+29%3A+Hash+Index#RFC29:HashIndex-Bucketingtableandhashindex
- https://hudi.apache.org/blog/2020/11/11/hudi-indexing-mechanisms/
- https://github.com/apache/hudi/blob/master/rfc/rfc-42/rfc-42.md
- https://aws.amazon.com/blogs/big-data/part-1-build-your-apache-hudi-data-lake-on-aws-using-amazon-emr/
- https://hudi.apache.org/releases/release-0.13.0/