Partial updates in Apache Hudi¶
Partial updates in Apache Hudi refer to the capability of updating specific fields or attributes within a record or a dataset, rather than updating the entire record. Apache Hudi is an open-source data management framework that provides efficient and scalable incremental processing for big data workloads.¶
Advantages of using partial updates in Apache Hudi include:¶
Reduced data movement: Partial updates minimize the need to transfer and rewrite large amounts of data. Instead of updating the entire record, only the changed fields or attributes are updated, resulting in reduced network traffic and improved overall performance.
Efficient storage utilization: With partial updates, only the modified fields are stored, leading to improved storage efficiency. This is particularly beneficial when dealing with large datasets, as it reduces the amount of disk space required to store the data.
Faster processing: Partial updates enable faster data processing and analytics. Since only the changed fields are processed, there is a reduction in the amount of data that needs to be read and processed, resulting in faster query response times and improved overall performance.
Data consistency: Partial updates can help maintain data consistency when dealing with concurrent updates. By updating specific fields within a record, Apache Hudi ensures that only the modified fields are affected, minimizing the chances of conflicts and ensuring data integrity.
Optimized resource utilization: Partial updates can help optimize resource utilization in distributed computing environments. By updating only the necessary fields, computational resources such as CPU and memory are utilized more efficiently, leading to improved system performance and scalability.
Easier data maintenance: Partial updates simplify the process of managing and maintaining large datasets. Since only the changed fields are updated, it becomes easier to track and manage changes over time, making it more manageable to handle data updates and ensuring data accuracy.
Step 1: Define Import and Spark Session¶
# https://hudi.apache.org/docs/next/record_payload/#overwritenondefaultswithlatestavropayload
try:
import os
import sys
import uuid
import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_timestamp, monotonically_increasing_id, to_date, when
import faker
import datetime
from datetime import datetime
from faker import Faker
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
import time
from pyspark.sql.functions import udf, lit
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
except Exception as e:
print(e)
SUBMIT_ARGS = "--packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.13.0 pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
spark = SparkSession.builder \
.config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
.config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
.config('className', 'org.apache.hudi') \
.config('spark.sql.hive.convertMetastoreParquet', 'false') \
.getOrCreate()
spark
SparkSession - in-memory
Define Data Generator Class¶
import random
import datetime
from faker import Faker
faker = Faker()
def get_customer_data(total_customers=2):
customers_array = []
for i in range(total_customers):
customer_data = {
"customer_id": str(i),
"name": faker.name(),
"state": faker.state(),
"city": faker.city(),
"email": faker.email(),
"created_at": datetime.datetime.now().isoformat().__str__(),
"salary": get_random_salary()
}
customers_array.append(customer_data)
return customers_array
def get_random_salary():
if random.random() < 0.5: # 50% chance of returning None
return None
else:
return random.randint(20000, 80000) # Generate a random salary between 20000 and 80000
Method to Upsert into Hudi tables¶
def upsert_hudi_table(
db_name,
table_name,
record_id,
precomb_key,
spark_df,
table_type='COPY_ON_WRITE',
method='upsert',
):
path = f"file:///C:/tmp/{db_name}/{table_name}"
print("path", path, end="\n")
hudi_options = {
'hoodie.table.name': table_name,
'hoodie.datasource.write.table.type': table_type,
'hoodie.datasource.write.recordkey.field': record_id,
'hoodie.datasource.write.table.name': table_name,
'hoodie.datasource.write.operation': method,
'hoodie.datasource.write.precombine.field': precomb_key,
'hoodie.datasource.write.payload.class': 'org.apache.hudi.common.model.PartialUpdateAvroPayload'
}
spark_df.write.format("hudi"). \
options(**hudi_options). \
mode("append"). \
save(path)
global total_customers, order_data_sample_size
total_customers = 5
table_schema = StructType([
StructField("customer_id", StringType(), nullable=False),
StructField("name", StringType(), nullable=True), # Change nullable property to True
StructField("state", StringType(), nullable=True),
StructField("city", StringType(), nullable=True),
StructField("created_at", StringType(), nullable=True),
StructField("email", StringType(), nullable=True),
StructField("salary", IntegerType(), nullable=True)
])
customer_data = get_customer_data(total_customers=total_customers)
spark_df_customers = spark.createDataFrame(data=[tuple(i.values()) for i in customer_data],
schema=table_schema)
spark_df_customers.select([
'customer_id',
'name',
'state',
'city',
'created_at',
"salary"
]).show(truncate=False)
+-----------+------------------+--------------+------------------+---------------------------+------+ |customer_id|name |state |city |created_at |salary| +-----------+------------------+--------------+------------------+---------------------------+------+ |0 |Carolyn Calderon |South Carolina|West Geraldborough|porterchristina@example.org|69285 | |1 |Mackenzie Martinez|Maryland |Lake Nicole |tdean@example.org |34853 | |2 |Sydney Ayala |Rhode Island |Port Patrickstad |kevin20@example.com |52640 | |3 |Elizabeth Jones |Michigan |Clarkborough |maurice87@example.net |null | |4 |Jacob Barnes |Arizona |Michellehaven |lewisjennifer@example.net |48149 | +-----------+------------------+--------------+------------------+---------------------------+------+
UPSERT into Hudi tables¶
upsert_hudi_table(
db_name='hudidb',
table_name='dim_customers',
record_id='customer_id',
precomb_key='customer_id',
spark_df=spark_df_customers,
table_type='COPY_ON_WRITE',
method='upsert',
)
path = "file:///C:/tmp/hudidb/dim_customers"
spark.read.format("hudi").load(path).createOrReplaceTempView("snapshot")
spark.sql("select * from snapshot").select([
'customer_id',
'name',
'state',
'city',
'created_at',
"salary",
"_hoodie_commit_time"
]).show(truncate=False)
path file:///C:/tmp/hudidb/dim_customers +-----------+------------------+--------------+------------------+---------------------------+------+-------------------+ |customer_id|name |state |city |created_at |salary|_hoodie_commit_time| +-----------+------------------+--------------+------------------+---------------------------+------+-------------------+ |1 |Mackenzie Martinez|Maryland |Lake Nicole |tdean@example.org |34853 |20230518073201741 | |0 |Carolyn Calderon |South Carolina|West Geraldborough|porterchristina@example.org|69285 |20230518073201741 | |4 |Jacob Barnes |Arizona |Michellehaven |lewisjennifer@example.net |48149 |20230518073201741 | |3 |Elizabeth Jones |Michigan |Clarkborough |maurice87@example.net |null |20230518073201741 | |2 |Sydney Ayala |Rhode Island |Port Patrickstad |kevin20@example.com |52640 |20230518073201741 | +-----------+------------------+--------------+------------------+---------------------------+------+-------------------+
Assume i want to update Customer ID 1¶
# =====================partial updates ================
changed_item = [{
'customer_id': '1',
"state":"CT",
'city': 'stamford'
}]
update_df_customers = spark.createDataFrame(data=changed_item, schema=table_schema)
update_df_customers.show()
+-----------+----+-----+--------+----------+-----+------+ |customer_id|name|state| city|created_at|email|salary| +-----------+----+-----+--------+----------+-----+------+ | 1|null| CT|stamford| null| null| null| +-----------+----+-----+--------+----------+-----+------+
Upserting into Hudi¶
upsert_hudi_table(
db_name='hudidb',
table_name='dim_customers',
record_id='customer_id',
precomb_key='customer_id',
spark_df=update_df_customers,
table_type='COPY_ON_WRITE',
method='upsert',
)
path file:///C:/tmp/hudidb/dim_customers
path = "file:///C:/tmp/hudidb/dim_customers"
spark.read.format("hudi").load(path).createOrReplaceTempView("new_snapshot")
spark.sql("select * from new_snapshot").select([
'customer_id',
'name',
'state',
'city',
'created_at',
"salary",
"_hoodie_commit_time"
]).show(truncate=False)
+-----------+------------------+--------------+------------------+---------------------------+------+-------------------+ |customer_id|name |state |city |created_at |salary|_hoodie_commit_time| +-----------+------------------+--------------+------------------+---------------------------+------+-------------------+ |1 |Mackenzie Martinez|CT |stamford |tdean@example.org |34853 |20230518073305092 | |0 |Carolyn Calderon |South Carolina|West Geraldborough|porterchristina@example.org|69285 |20230518073201741 | |4 |Jacob Barnes |Arizona |Michellehaven |lewisjennifer@example.net |48149 |20230518073201741 | |3 |Elizabeth Jones |Michigan |Clarkborough |maurice87@example.net |null |20230518073201741 | |2 |Sydney Ayala |Rhode Island |Port Patrickstad |kevin20@example.com |52640 |20230518073201741 | +-----------+------------------+--------------+------------------+---------------------------+------+-------------------+
Lets say i want to update the salary where its Null and i want to set default value¶
from pyspark.sql.types import StringType, IntegerType, StructType, StructField
path = "file:///C:/tmp/hudidb/dim_customers"
# Read data into a DataFrame
df = spark.read.format("hudi").load(path).createOrReplaceTempView("new_snapshots")
fill_df = spark.sql("""
SELECT customer_id,
COALESCE(name, '') AS name,
COALESCE(state, '') AS state,
COALESCE(city, '') AS city,
COALESCE(created_at, '') AS created_at,
COALESCE(email, '') AS email,
COALESCE(salary, 0) AS salary
FROM new_snapshots
WHERE salary IS NULL
""")
fill_df.show()
+-----------+---------------+--------+------------+--------------------+--------------------+------+ |customer_id| name| state| city| created_at| email|salary| +-----------+---------------+--------+------------+--------------------+--------------------+------+ | 3|Elizabeth Jones|Michigan|Clarkborough|maurice87@example...|2023-05-18T07:25:...| 0| +-----------+---------------+--------+------------+--------------------+--------------------+------+
upserting this into Hudi to fill 0 where salary is null¶
upsert_hudi_table(
db_name='hudidb',
table_name='dim_customers',
record_id='customer_id',
precomb_key='customer_id',
spark_df=fill_df,
table_type='COPY_ON_WRITE',
method='upsert',
)
path file:///C:/tmp/hudidb/dim_customers
Reading from hudi tables¶
path = "file:///C:/tmp/hudidb/dim_customers"
spark.read.format("hudi").load(path).createOrReplaceTempView("snapshot")
spark.sql("select * from snapshot").select([
'customer_id',
'name',
'state',
'city',
'created_at',
"salary",
"_hoodie_commit_time"
]).show(truncate=False)
+-----------+------------------+--------------+------------------+---------------------------+------+-------------------+ |customer_id|name |state |city |created_at |salary|_hoodie_commit_time| +-----------+------------------+--------------+------------------+---------------------------+------+-------------------+ |1 |Mackenzie Martinez|CT |stamford |tdean@example.org |34853 |20230518075732194 | |0 |Carolyn Calderon |South Carolina|West Geraldborough|porterchristina@example.org|69285 |20230518073201741 | |4 |Jacob Barnes |Arizona |Michellehaven |lewisjennifer@example.net |48149 |20230518073201741 | |3 |Elizabeth Jones |Michigan |Clarkborough |maurice87@example.net |0 |20230518075806943 | |2 |Sydney Ayala |Rhode Island |Port Patrickstad |kevin20@example.com |52640 |20230518073201741 | +-----------+------------------+--------------+------------------+---------------------------+------+-------------------+
No comments:
Post a Comment