Soumil Nitin Shah¶
Bachelor in Electronic Engineering | Master in Electrical Engineering | Master in Computer Engineering
- Website: http://soumilshah.com/
- Github: https://github.com/soumilshah1995
- Linkedin: https://www.linkedin.com/in/shah-soumil/
- Blog: https://soumilshah1995.blogspot.com/
- Youtube: https://www.youtube.com/channel/UC_eOodxvwS_H7x2uLQa-svw?view_as=subscriber
- Facebook Page: https://www.facebook.com/soumilshah1995/
- Email: shahsoumil519@gmail.com
- Projects: https://soumilshah.herokuapp.com/project
I earned a Bachelor of Science in Electronic Engineering and a double master's in Electrical and Computer Engineering. I have extensive expertise in developing scalable, high-performance software applications in Python. I run a YouTube channel where I teach Data Science, Machine Learning, Elasticsearch, and AWS. I work as a Data Team Lead at JobTarget, where I spend most of my time developing ingestion frameworks and building microservices and scalable architectures on AWS. I have worked with massive amounts of data, including building data lakes (1.2T) and optimizing data lake queries through partitioning, the right file formats, and compression. I have also built streaming applications that ingest real-time data through Kinesis and Firehose into Elasticsearch.
Topic: Bucket Index
Bucket indexes are suitable for upsert use cases on huge datasets with a large number of file groups within partitions, relatively even data distribution across partitions, and relatively even data distribution on the bucket hash field column.
They can deliver better upsert performance in these cases because no index lookup is involved: file groups are located through a hashing mechanism, which is very fast.
This is quite different from both the simple and Bloom indexes, where an explicit index lookup step happens during every write.
Each bucket has a one-to-one mapping with a Hudi file group, and because the total number of buckets (defined by hoodie.bucket.index.num.buckets, default 4) is fixed, it can lead to data skew (data distributed unevenly across buckets) and scalability issues (buckets keep growing in size over time).
These issues will be addressed by the upcoming consistent hashing bucket index, which is a special type of bucket index.
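As a rough mental model (not Hudi's actual hash function), bucket assignment boils down to hashing the configured hash field and taking it modulo the fixed bucket count. The sketch below uses CRC32 purely for illustration, with the state values produced by the data generator later in this post:

import zlib

# Conceptual sketch only: Hudi uses its own internal hashing, not CRC32.
# It illustrates how a fixed bucket count maps each hash-field value to one file group.
def assign_bucket(hash_field_value, num_buckets=4):
    # the same hash-field value always lands in the same bucket (file group)
    return zlib.crc32(hash_field_value.encode("utf-8")) % num_buckets

for state in ("CA", "NY", "TX", "FL", "IL", "RJ"):
    print(state, "-> bucket", assign_bucket(state))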
Step 1: Define Imports
try:
    import os
    import sys
    import uuid
    import datetime

    import pyspark
    from pyspark.sql import SparkSession
    from pyspark import SparkConf, SparkContext
    from pyspark.sql.functions import col, to_timestamp, monotonically_increasing_id, to_date, when
    from pyspark.sql.types import *
    from functools import reduce

    from faker import Faker
except Exception as e:
    print("Error importing modules:", e)
Step 2: Define Spark Session
SUBMIT_ARGS = "--packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.13.0 pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
spark = SparkSession.builder \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('className', 'org.apache.hudi') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .getOrCreate()
spark
Data Generator
faker = Faker()

class DataGenerator(object):

    @staticmethod
    def get_data(samples):
        # each record: emp_id, employee_name, department, state, salary, age,
        # bonus, ts, email, credit_card, year, month
        return [
            (
                uuid.uuid4().__str__(),
                faker.name(),
                faker.random_element(elements=('IT', 'HR', 'Sales', 'Marketing')),
                faker.random_element(elements=('CA', 'NY', 'TX', 'FL', 'IL', 'RJ')),
                str(faker.random_int(min=10000, max=150000)),
                str(faker.random_int(min=18, max=60)),
                str(faker.random_int(min=0, max=100000)),
                str(faker.unix_time()),
                faker.email(),
                faker.credit_card_number(card_type='amex'),
                faker.year(),
                faker.month()
            ) for x in range(samples)
        ]
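A quick, optional sanity check of the generator (hypothetical snippet; the column names mirror the schema used in the upsert step below):

# Peek at a single generated record
columns = ["emp_id", "employee_name", "department", "state", "salary", "age",
           "bonus", "ts", "email", "credit_card", "year", "month"]
sample = DataGenerator.get_data(1)[0]
for name, value in zip(columns, sample):
    print(f"{name}: {value}")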
Step 3: Define Hudi Settings
db_name = "hudidb"
table_name = "hudi_bucket_index"

recordkey = 'emp_id,state'
precombine = "ts"
path = f"file:///C:/tmp/{db_name}/{table_name}"
method = 'upsert'
table_type = "COPY_ON_WRITE"  # COPY_ON_WRITE | MERGE_ON_READ

BUCKET_INDEX_HASH_FIELD = 'state'   # hash field should be a subset of the record key
PARTITION_FIELD = 'year'

hudi_options = {
    'hoodie.table.name': table_name,
    'hoodie.datasource.write.recordkey.field': recordkey,
    'hoodie.datasource.write.table.name': table_name,
    'hoodie.datasource.write.operation': method,
    'hoodie.datasource.write.precombine.field': precombine,
    'hoodie.datasource.write.partitionpath.field': PARTITION_FIELD,
    # "hoodie.upsert.shuffle.parallelism": 100,

    # bucket index settings
    'hoodie.index.type': 'BUCKET',
    'hoodie.index.bucket.engine': 'SIMPLE',
    'hoodie.storage.layout.partitioner.class': 'org.apache.hudi.table.action.commit.SparkBucketIndexPartitioner',
    'hoodie.bucket.index.num.buckets': '4',
    'hoodie.bucket.index.hash.field': BUCKET_INDEX_HASH_FIELD,
}
Performing an UPSERT with the Bucket Index
%%time

data = DataGenerator.get_data(1000)
columns = ["emp_id", "employee_name", "department", "state", "salary", "age",
           "bonus", "ts", "email", "credit_card", "year", "month"]
spark_df = spark.createDataFrame(data=data, schema=columns)

start = datetime.datetime.now()

spark_df.write.format("hudi"). \
    options(**hudi_options). \
    mode("append"). \
    save(path)

end = datetime.datetime.now()
print(f"Execution Time {end - start}")
Execution Time 0:00:30.175152
Wall time: 30.5 s
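To verify the write, the table can be read back from the same path with the Hudi datasource; a minimal sketch, assuming the spark session and path defined above (Hudi's meta columns show the partition and file group each record was written to):

# Read the Hudi table back and inspect a few records
read_df = spark.read.format("hudi").load(path)
read_df.select("_hoodie_partition_path", "_hoodie_file_name", "emp_id", "state").show(5, truncate=False)
print("record count:", read_df.count())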
Tip
To calculate the total number of buckets needed when bucketing a partitioned table in Hive, you need to consider the following factors (the same reasoning applies when sizing hoodie.bucket.index.num.buckets):
Size of the data: The total number of buckets should be proportional to the size of the data. Larger datasets require more buckets to evenly distribute the data.
Memory resources: The total number of buckets also depends on the available memory resources on your system. You need to ensure that you have enough memory to support the number of buckets you are creating.
Hardware configuration: The total number of buckets also depends on your hardware configuration. You need to consider factors such as the number of CPUs, memory, and disk space available.
Data skewness: If your data is skewed, you may need to create more buckets to evenly distribute the data.
- Once you have considered these factors, you can use the following formula to calculate the total number of buckets:
total_number_of_buckets = (size_of_data_in_bytes / block_size) * skew_factor * replication_factor
- size_of_data_in_bytes: The total size of your data in bytes.
- block_size: The Hadoop block size. By default, it is 128 MB.
- skew_factor: A value between 1 and 2. This factor is used to adjust the number of buckets to account for data skewness. If your data is evenly distributed, use a skew_factor of 1. If your data is skewed, use a higher value.
- replication_factor: The number of replicas for each bucket. By default, it is 1.
Example
- For example, if you have a total data size of 10 GB, a block size of 128 MB, a skew_factor of 1.5, and a replication_factor of 1, the total number of buckets needed would be:
total_number_of_buckets = (10 * 1024 * 1024 * 1024) / (128 * 1024 * 1024) * 1.5 * 1 = 80 * 1.5 = 120
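The same back-of-the-envelope calculation as a small helper function (hypothetical name, simply mirroring the formula above):

import math

def total_number_of_buckets(size_of_data_in_bytes, block_size_in_bytes=128 * 1024 * 1024,
                            skew_factor=1.0, replication_factor=1):
    # (size_of_data / block_size) * skew_factor * replication_factor, rounded up
    return math.ceil((size_of_data_in_bytes / block_size_in_bytes) * skew_factor * replication_factor)

# 10 GB of data, 128 MB blocks, skew_factor 1.5, replication_factor 1 -> 120 buckets
print(total_number_of_buckets(10 * 1024 ** 3, skew_factor=1.5))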
References
- https://cwiki.apache.org/confluence/display/HUDI/RFC+-+29%3A+Hash+Index#RFC29:HashIndex-Bucketingtableandhashindex
- https://hudi.apache.org/blog/2020/11/11/hudi-indexing-mechanisms/
- https://github.com/apache/hudi/blob/master/rfc/rfc-42/rfc-42.md
- https://aws.amazon.com/blogs/big-data/part-1-build-your-apache-hudi-data-lake-on-aws-using-amazon-emr/