Spark Your Data Analysis with Hudi: Mastering Join Techniques for Raw Tables and Beyond¶
A JOIN operation in SQL combines rows from two or more tables into a single result set based on a common column between them. It is a fundamental operation in data analysis and is used extensively whenever related data is spread across multiple tables.
Spark SQL is a powerful tool for processing structured data in a distributed computing environment. It provides a unified interface for querying data across a variety of data sources, including Hive tables, Parquet files, and JSON data. With Spark SQL, you can easily perform JOIN operations on large datasets in a highly scalable and efficient way.
One of the biggest advantages of Spark SQL-based joins is the ability to process large datasets in parallel across a cluster of machines. This significantly reduces the time required to process the data, making it ideal for big data applications. In addition, Spark SQL provides a rich set of join operations, including inner joins, left, right, and full outer joins, and cross joins, as shown in the sketch below.
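As a quick refresher, here is a minimal sketch of a few of those join types using the PySpark DataFrame API on two tiny, made-up DataFrames (the table contents and names below are illustrative only and are not the dataset built later in this post):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Two tiny illustrative DataFrames (values are made up)
orders = spark.createDataFrame(
    [("o1", "c1", 100), ("o2", "c2", 250), ("o3", "c9", 75)],
    ["orderid", "customer_id", "order_value"],
)
customers = spark.createDataFrame(
    [("c1", "Texas"), ("c2", "Iowa"), ("c3", "Ohio")],
    ["customer_id", "state"],
)

# Inner join: only orders whose customer_id exists in customers
orders.join(customers, on="customer_id", how="inner").show()

# Left outer join: keep every order, with a NULL state for unknown customers
orders.join(customers, on="customer_id", how="left").show()

# Cross join: every order paired with every customer
orders.crossJoin(customers).show()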
Apache Hudi is an open-source data management framework that provides efficient data ingestion, storage, and management capabilities. Hudi tables are designed to support both batch and streaming workloads, making them a good fit for modern data applications. Hudi also supports incremental queries, which let you process only the data that has changed since the last run, reducing processing time and improving performance; a small sketch of such a read follows.
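Here is a minimal, hedged sketch of an incremental read through the Spark datasource, using Hudi's standard read options and assuming a Hudi-enabled SparkSession like the one created in Step 2 below; the table path and the begin instant are placeholders for illustration:

# Incremental query: return only records committed after the given instant.
# The path and the begin instant time below are placeholders.
incremental_df = (
    spark.read.format("hudi")
    .option("hoodie.datasource.query.type", "incremental")
    .option("hoodie.datasource.read.begin.instanttime", "20230425000000")
    .load("file:///C:/tmp/hudidb/orders")
)
incremental_df.show()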
Combining the power of Spark SQL and Hudi allows you to perform complex join operations on large datasets in a highly efficient and scalable way. By leveraging these technologies, you can quickly process and analyze vast amounts of data, gain valuable insights, and make better data-driven decisions.
Step 1: Define Imports¶
try:
    import os
    import sys
    import uuid
    import random  # used by the data generator below
    import pyspark
    from pyspark.sql import SparkSession
    from pyspark import SparkConf, SparkContext
    from pyspark.sql.functions import col, asc, desc, to_timestamp, monotonically_increasing_id, to_date, when
    from pyspark.sql.functions import *
    from pyspark.sql.types import *
    from datetime import datetime
    from functools import reduce
    from faker import Faker
    import pandas as pd
except Exception as e:
    print(f"Missing dependency: {e}")
Step 2: Create Spark Session¶
# Load the Hudi Spark 3.3 bundle at startup (the PostgreSQL driver in the list is not used in this post)
SUBMIT_ARGS = "--packages org.postgresql:postgresql:42.5.4,org.apache.hudi:hudi-spark3.3-bundle_2.12:0.12.1 pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

spark = SparkSession.builder \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
    .config('className', 'org.apache.hudi') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .getOrCreate()
spark
Step 3: Define Data Generator¶
import random
from faker import Faker

faker = Faker()

def get_dummy_data(samples=100):
    """Generate matching lists of fake orders and customers."""
    customers_array = []
    orders_array = []
    for _ in range(samples):
        order_id = str(uuid.uuid4())
        customer_id = str(uuid.uuid4())
        partition_key = str(uuid.uuid4())  # generated but not used below

        order_data = {
            "orderid": order_id,
            "customer_id": customer_id,
            "ts": datetime.now().isoformat(),
            "order_value": str(random.randint(10, 1000)),
            "priority": random.choice(["LOW", "MEDIUM", "URGENT"])
        }
        customer_data = {
            "customer_id": customer_id,
            "name": faker.name(),
            "state": faker.state(),
            "city": faker.city(),
            "email": faker.email(),
            "ts": datetime.now().isoformat()
        }
        customers_array.append(customer_data)
        orders_array.append(order_data)

    return orders_array, customers_array
Step 4: Define a Function to Perform Upserts into Hudi Tables¶
def upsert_hudi_table(
    db_name,
    table_name,
    record_id,
    precombine_key,
    spark_df,
    table_type='COPY_ON_WRITE',
    method='upsert',
):
    """Write a Spark DataFrame into a Hudi table, creating it if it does not exist."""
    path = f"file:///C:/tmp/{db_name}/{table_name}"
    print("path", path, end="\n")

    hudi_options = {
        'hoodie.table.name': table_name,
        'hoodie.datasource.write.table.type': table_type,
        'hoodie.datasource.write.recordkey.field': record_id,
        'hoodie.datasource.write.table.name': table_name,
        'hoodie.datasource.write.operation': method,
        'hoodie.datasource.write.precombine.field': precombine_key,
    }

    spark_df.write.format("hudi") \
        .options(**hudi_options) \
        .mode("append") \
        .save(path)
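The helper above writes unpartitioned tables. If you want Hudi to lay out the files by a partition column, the standard write option hoodie.datasource.write.partitionpath.field can be added to hudi_options. A minimal sketch, where partition_field is a hypothetical extra parameter rather than part of the helper above:

# Sketch: build Hudi write options with an optional partition column.
# partition_field is a hypothetical parameter added for illustration.
def build_hudi_options(table_name, record_id, precombine_key,
                       table_type='COPY_ON_WRITE', method='upsert',
                       partition_field=None):
    options = {
        'hoodie.table.name': table_name,
        'hoodie.datasource.write.table.type': table_type,
        'hoodie.datasource.write.recordkey.field': record_id,
        'hoodie.datasource.write.table.name': table_name,
        'hoodie.datasource.write.operation': method,
        'hoodie.datasource.write.precombine.field': precombine_key,
    }
    if partition_field:
        options['hoodie.datasource.write.partitionpath.field'] = partition_field
    return options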
Step 5: Sample Preview of DF¶
order_data, customer_data = get_dummy_data()
spark_df_orders = spark.createDataFrame(data=[tuple(i.values()) for i in order_data], schema=list(order_data[0].keys()))
spark_df_customers = spark.createDataFrame(data=[tuple(i.values()) for i in customer_data], schema=list(customer_data[0].keys()))
spark_df_orders.show(2)
spark_df_customers.show(2)
+--------------------+--------------------+--------------------+-----------+--------+
|             orderid|         customer_id|                  ts|order_value|priority|
+--------------------+--------------------+--------------------+-----------+--------+
|0ce1a594-e51f-45a...|4741c933-721d-423...|2023-04-25T11:48:...|        313|  URGENT|
|582ee83f-c149-407...|b3b387cd-79fa-44d...|2023-04-25T11:48:...|        338|  URGENT|
+--------------------+--------------------+--------------------+-----------+--------+
only showing top 2 rows

+--------------------+------------------+-----+----------+--------------------+--------------------+
|         customer_id|              name|state|      city|               email|                  ts|
+--------------------+------------------+-----+----------+--------------------+--------------------+
|4741c933-721d-423...|Richard Sanchez MD|Texas|Carterstad|newmanmatthew@exa...|2023-04-25T11:48:...|
|b3b387cd-79fa-44d...|    Vanessa Dudley| Iowa| Port Tina| hsuarez@example.com|2023-04-25T11:48:...|
+--------------------+------------------+-----+----------+--------------------+--------------------+
only showing top 2 rows
Insert into Hudi tables¶
order_data, customer_data = get_dummy_data()
spark_df_orders = spark.createDataFrame(data=[tuple(i.values()) for i in order_data], schema=list(order_data[0].keys()))
spark_df_customers = spark.createDataFrame(data=[tuple(i.values()) for i in customer_data], schema=list(customer_data[0].keys()))
upsert_hudi_table(
    db_name='hudidb',
    table_name='customers',
    record_id='customer_id',
    precombine_key='ts',
    spark_df=spark_df_customers,
    table_type='COPY_ON_WRITE',
    method='upsert',
)

upsert_hudi_table(
    db_name='hudidb',
    table_name='orders',
    record_id='orderid',
    precombine_key='ts',
    spark_df=spark_df_orders,
    table_type='COPY_ON_WRITE',
    method='upsert',
)
path file:///C:/tmp/hudidb/customers
path file:///C:/tmp/hudidb/orders
Simple Join of Two Hudi Tables¶
customers = "file:///C:/tmp/hudidb/customers"
order_hudi = "file:///C:/tmp/hudidb/orders"
spark.read.format("hudi") \
    .load(customers) \
    .createOrReplaceTempView("hudi_snapshot_customers")

spark.read.format("hudi") \
    .load(order_hudi) \
    .createOrReplaceTempView("hudi_snapshot_orders")
createOrReplaceTempView is a method in Spark SQL that creates a temporary view of a DataFrame or Dataset, which can be queried using SQL statements.
When you call createOrReplaceTempView on a DataFrame or Dataset, it creates a temporary view in the SparkSession's catalog. The view can be accessed using a SQL query as if it were a table in a database. The name of the temporary view must be unique within the SparkSession and can be used in subsequent Spark SQL statements.
The createOrReplaceTempView method is useful when you need to perform ad hoc analysis on a DataFrame or Dataset and want to use SQL to query it. By creating a temporary view, you can leverage the full power of SQL to manipulate and analyze data, without having to write complex code against the DataFrame API.
It's worth noting that the temporary view created by createOrReplaceTempView is only available for the duration of the SparkSession in which it was created. If you need a view that is shared across SparkSessions within the same Spark application, you can create a global temporary view with createOrReplaceGlobalTempView, which is accessed through the reserved global_temp database, as shown below.
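A minimal sketch of both variants, reusing the order_hudi path defined above (the global view name is illustrative):

# Query the session-scoped temporary view registered above
spark.sql("SELECT COUNT(*) AS order_count FROM hudi_snapshot_orders").show()

# A global temporary view lives in the reserved global_temp database and is
# shared across SparkSessions within the same application.
spark.read.format("hudi").load(order_hudi) \
    .createOrReplaceGlobalTempView("hudi_orders_global")
spark.sql("SELECT COUNT(*) AS order_count FROM global_temp.hudi_orders_global").show()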
Performing Joins¶
Show me the top states where orders have been received¶
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
SQL_QUERY = """
SELECT
    tbl_customers.state,
    COUNT(tbl_orders.orderid) AS total_orders
FROM hudi_snapshot_orders AS tbl_orders
JOIN hudi_snapshot_customers AS tbl_customers
    ON tbl_orders.customer_id = tbl_customers.customer_id
GROUP BY tbl_customers.state
ORDER BY total_orders DESC
LIMIT 5
"""
df = spark.sql(SQL_QUERY)
# convert the DataFrame to a pandas DataFrame
pandas_df = df.toPandas()
# plot the result using seaborn
sns.set(style="whitegrid")
ax = sns.barplot(x="state", y="total_orders", data=pandas_df)
plt.show()
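The same top-states aggregation can also be expressed without SQL. Here is a minimal equivalent sketch using the DataFrame API, pulling the registered views back as DataFrames with spark.table:

from pyspark.sql import functions as F

# Equivalent aggregation using the DataFrame API on the registered views
orders_df = spark.table("hudi_snapshot_orders")
customers_df = spark.table("hudi_snapshot_customers")

top_states = (
    orders_df.join(customers_df, on="customer_id", how="inner")
    .groupBy("state")
    .agg(F.count("orderid").alias("total_orders"))
    .orderBy(F.desc("total_orders"))
    .limit(5)
)
top_states.show()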
Show me total order value by state¶
query = """
SELECT
    tbl_customers.state,
    -- order_value was generated as a string, so Spark implicitly casts it for SUM
    SUM(tbl_orders.order_value) AS total_order_value
FROM hudi_snapshot_orders AS tbl_orders
JOIN hudi_snapshot_customers AS tbl_customers
    ON tbl_orders.customer_id = tbl_customers.customer_id
GROUP BY tbl_customers.state
ORDER BY total_order_value DESC
LIMIT 5
"""
# run the SQL query and store the result in a DataFrame
df = spark.sql(query)
# convert the DataFrame to a pandas DataFrame
pandas_df = df.toPandas()
# plot the result using seaborn
sns.set(style="whitegrid")
ax = sns.barplot(x="state", y="total_order_value", data=pandas_df)
plt.title("state vs total order value")
plt.show()
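Beyond inner joins, the outer joins mentioned earlier are handy for spotting gaps. For example, a left join from customers to orders surfaces customers with no orders at all; a minimal sketch against the same views (purely illustrative here, since every generated customer has exactly one order):

# Left join: keep every customer, even those without a matching order
customers_without_orders = spark.sql("""
SELECT
    tbl_customers.customer_id,
    tbl_customers.state
FROM hudi_snapshot_customers AS tbl_customers
LEFT JOIN hudi_snapshot_orders AS tbl_orders
    ON tbl_customers.customer_id = tbl_orders.customer_id
WHERE tbl_orders.orderid IS NULL
""")
customers_without_orders.show()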