Getting Started with Open Data Lineage | Marquez Project | Apache Hudi Spark Jobs¶
Clone the project¶
git clone git@github.com:MarquezProject/marquez.git && cd marquez
cd docker
./up.sh --api-port 9000
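up.sh brings up Marquez (API, web UI, and database) with Docker Compose; here the API is exposed on port 9000. Before wiring Spark to it, it can help to confirm the API is reachable. A minimal check from Python (assumes the default Marquez namespaces endpoint; any HTTP client works):

import urllib.request

# Expect HTTP 200 if the Marquez lineage API is up on port 9000
with urllib.request.urlopen("http://localhost:9000/api/v1/namespaces") as resp:
    print("Marquez API status:", resp.status)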
Step 1: Define Imports¶
In [1]:
try:
    import os
    import sys
    import uuid
    import random
    import pyspark
    from pyspark.sql import SparkSession
    from pyspark import SparkConf, SparkContext
    from faker import Faker
    from datetime import datetime
    import pandas as pd  # Pandas is handy for pretty-printing results
    print("Imports loaded")
except Exception as e:
    print("error", e)
Step 2: Define Spark Session¶
In [2]:
# Point JAVA_HOME at a local JDK 11 install (this path is machine-specific; adjust for your setup)
os.environ['JAVA_HOME'] = '/opt/homebrew/Cellar/openjdk@11/11.0.22/libexec/openjdk.jdk/Contents/Home'
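The path above points at a Homebrew install of OpenJDK 11. If you would rather not hard-code it, macOS ships a helper that can resolve a JDK 11 home for you (a small sketch; the hard-coded path works just as well):

import os
import subprocess

# macOS only: ask the system for a JDK 11 home instead of hard-coding the Homebrew path
os.environ['JAVA_HOME'] = subprocess.check_output(
    ['/usr/libexec/java_home', '-v', '11'], text=True).strip()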
In [3]:
import os
import sys
import uuid

HUDI_VERSION = '0.14.0'
SPARK_VERSION = '3.4'
OPENLINEAGE_SPARK_VERSION = '1.8.0'

openlineage_package = f"io.openlineage:openlineage-spark:{OPENLINEAGE_SPARK_VERSION}"

# Append the OpenLineage package to the Hudi Spark bundle
SUBMIT_ARGS = f"--packages org.apache.hudi:hudi-spark{SPARK_VERSION}-bundle_2.12:{HUDI_VERSION},{openlineage_package} pyspark-shell"

# Setting environment variables
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable

# Now you can continue with your SparkSession creation
spark = SparkSession.builder \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
    .config('className', 'org.apache.hudi') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .config("spark.extraListeners", "io.openlineage.spark.agent.OpenLineageSparkListener") \
    .config("spark.openlineage.transport.type", "http") \
    .config("spark.openlineage.transport.url", "http://localhost:9000/api/v1/lineage") \
    .config("spark.openlineage.namespace", "apache-hudi") \
    .config("spark.openlineage.parentJobName", "customer-job") \
    .config("spark.openlineage.parentRunId", str(uuid.uuid4())) \
    .getOrCreate()
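As an optional sanity check, read the OpenLineage-related settings back from the live session to confirm the listener and HTTP transport were picked up (keys match the configs set above):

# Optional: confirm the OpenLineage listener and transport settings took effect
for key in ("spark.extraListeners",
            "spark.openlineage.transport.url",
            "spark.openlineage.namespace"):
    print(key, "=", spark.sparkContext.getConf().get(key, "<not set>"))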
Data Generator Function¶
In [4]:
global faker
faker = Faker()

def get_customer_data(total_customers=2):
    customers_array = []
    for i in range(0, total_customers):
        customer_data = {
            "customer_id": str(uuid.uuid4()),
            "name": faker.name(),
            "state": faker.state(),
            "city": faker.city(),
            "email": faker.email(),
            "created_at": datetime.now().isoformat(),
            "address": faker.address(),
            "salary": faker.random_int(min=30000, max=100000)
        }
        customers_array.append(customer_data)
    return customers_array
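A quick smoke test of the generator before scaling it up to thousands of rows:

# Generate two fake customers and inspect one record
sample = get_customer_data(total_customers=2)
print(sample[0])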
In [5]:
global total_customers, order_data_sample_size
total_customers = 10000

customer_data = get_customer_data(total_customers=total_customers)

spark_df_customers = spark.createDataFrame(data=[tuple(i.values()) for i in customer_data],
                                           schema=list(customer_data[0].keys()))
spark_df_customers.show(3)
Define Method to Write Data¶
In [6]:
def write_to_hudi(spark_df,
                  table_name,
                  db_name,
                  method='upsert',
                  table_type='COPY_ON_WRITE',
                  recordkey='',
                  precombine='',
                  partition_fields='',
                  metadata_column_stats=""
                  ):
    path = f"file:///Users/soumilshah/IdeaProjects/SparkProject/tem/{db_name}/{table_name}"

    hudi_options = {
        'hoodie.table.name': table_name,
        'hoodie.datasource.write.table.type': table_type,
        'hoodie.datasource.write.table.name': table_name,
        'hoodie.datasource.write.operation': method,
        'hoodie.datasource.write.recordkey.field': recordkey,
        'hoodie.datasource.write.precombine.field': precombine,
        "hoodie.datasource.write.partitionpath.field": partition_fields,
    }

    print("\n")
    print(path)
    print("\n")

    spark_df.write.format("hudi"). \
        options(**hudi_options). \
        mode("append"). \
        save(path)
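Note that the metadata_column_stats argument is accepted but never added to hudi_options, so it currently has no effect. If you want column statistics tracked in Hudi's metadata table, one way to wire it in (a sketch placed inside write_to_hudi before the write; the keys are the Hudi 0.14 column-stats index settings, verify them against your Hudi version):

# Sketch: enable the column-stats index for the columns named in metadata_column_stats
if metadata_column_stats:
    hudi_options.update({
        "hoodie.metadata.enable": "true",
        "hoodie.metadata.index.column.stats.enable": "true",
        "hoodie.metadata.index.column.stats.column.list": metadata_column_stats,
    })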
Write to Hudi¶
In [7]:
write_to_hudi(
    spark_df=spark_df_customers,
    db_name="hudidb",
    table_name="customers",
    recordkey="customer_id",
    precombine="created_at",
    partition_fields="state",
    metadata_column_stats="salary"
)
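Optionally, read the table back before stopping the session to confirm the write landed (the path is the one write_to_hudi builds from db_name and table_name):

# Optional read-back check of the freshly written Hudi table
customers_read = spark.read.format("hudi") \
    .load("file:///Users/soumilshah/IdeaProjects/SparkProject/tem/hudidb/customers")
print("rows written:", customers_read.count())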
In [8]:
spark.stop()
New Dataset: Average Salary per State¶
In [9]:
try:
    import os
    import sys
    import uuid
    import random
    import pyspark
    from pyspark.sql import SparkSession
    from pyspark import SparkConf, SparkContext
    from faker import Faker
    from datetime import datetime
    import pandas as pd  # Pandas is handy for pretty-printing results
    print("Imports loaded")
except Exception as e:
    print("error", e)
# Point JAVA_HOME at a local JDK 11 install (machine-specific; adjust for your setup)
os.environ['JAVA_HOME'] = '/opt/homebrew/Cellar/openjdk@11/11.0.22/libexec/openjdk.jdk/Contents/Home'

import os
import sys
import uuid

HUDI_VERSION = '0.14.0'
SPARK_VERSION = '3.4'
OPENLINEAGE_SPARK_VERSION = '1.8.0'

openlineage_package = f"io.openlineage:openlineage-spark:{OPENLINEAGE_SPARK_VERSION}"

# Append the OpenLineage package to the Hudi Spark bundle
SUBMIT_ARGS = f"--packages org.apache.hudi:hudi-spark{SPARK_VERSION}-bundle_2.12:{HUDI_VERSION},{openlineage_package} pyspark-shell"

# Setting environment variables
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable

# Now you can continue with your SparkSession creation
spark = SparkSession.builder \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
    .config('className', 'org.apache.hudi') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .config("spark.extraListeners", "io.openlineage.spark.agent.OpenLineageSparkListener") \
    .config("spark.openlineage.transport.type", "http") \
    .config("spark.openlineage.transport.url", "http://localhost:9000/api/v1/lineage") \
    .config("spark.openlineage.namespace", "apache-hudi") \
    .config("spark.openlineage.parentJobName", "customer-aggregate-job") \
    .config("spark.openlineage.parentRunId", str(uuid.uuid4())) \
    .getOrCreate()
path = "file:///Users/soumilshah/IdeaProjects/SparkProject/tem/hudidb/customers"

# Load data and create temporary view
spark.read.format("hudi") \
    .load(path) \
    .createOrReplaceTempView("hudi_snapshot1")

# Execute SQL query to calculate average salary per state
result_df = spark.sql("""
    SELECT state, AVG(salary) AS average_salary
    FROM hudi_snapshot1
    GROUP BY state
""")

output_path = "./average_salary"

result_df.write \
    .format("parquet") \
    .mode("overwrite") \
    .save(output_path)

spark.stop()
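With the Spark session stopped, you can still inspect the aggregated Parquet output directly with pandas (requires pyarrow or fastparquet to be installed):

import pandas as pd

# Read the average-salary output back outside Spark and peek at a few rows
avg_salary_df = pd.read_parquet("./average_salary")
print(avg_salary_df.head())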