Friday, February 23, 2024

Getting Started with Open Data Lineage | Marquez Project | Apache Hudi Spark Jobs


Clone the project

git clone git@github.com:MarquezProject/marquez.git && cd marquez
cd docker
./up.sh --api-port 9000
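
Once the containers are up, the Marquez API listens on port 9000 (the web UI is typically served on port 3000). As an optional sanity check that is not part of the original walkthrough, a short Python snippet like the one below can confirm the API is reachable before running any Spark jobs; it assumes the requests package is installed and that you kept --api-port 9000 as above.

import requests

# Hit the Marquez namespaces endpoint; a fresh install should return at least the "default" namespace
resp = requests.get("http://localhost:9000/api/v1/namespaces", timeout=10)
resp.raise_for_status()
print(resp.json())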

Step 1: Define Imports

In [1]:
try:
    import os
    import sys
    import uuid
    import random
    import pyspark
    from pyspark.sql import SparkSession
    from pyspark import SparkConf, SparkContext
    from faker import Faker
    from datetime import datetime
    import pandas as pd  # Pandas is used for pretty-printing sample records
    print("Imports loaded")

except Exception as e:
    print("error", e)
Imports loaded 

Step 2: Define Spark Session

In [2]:
os.environ['JAVA_HOME'] = '/opt/homebrew/Cellar/openjdk@11/11.0.22/libexec/openjdk.jdk/Contents/Home'
In [3]:
import os
import sys
import uuid 


HUDI_VERSION = '0.14.0'
SPARK_VERSION = '3.4'
OPENLINEAGE_SPARK_VERSION = '1.8.0'  # OpenLineage Spark integration version
openlineage_package = f"io.openlineage:openlineage-spark:{OPENLINEAGE_SPARK_VERSION}"

# Appending OpenLineage package to the existing packages
SUBMIT_ARGS = f"--packages org.apache.hudi:hudi-spark{SPARK_VERSION}-bundle_2.12:{HUDI_VERSION},{openlineage_package} pyspark-shell"

# Setting environment variables
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable

# Create the SparkSession with the Hudi and OpenLineage listener configurations
spark = SparkSession.builder \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
    .config('className', 'org.apache.hudi') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .config("spark.extraListeners", "io.openlineage.spark.agent.OpenLineageSparkListener") \
    .config("spark.openlineage.transport.type", "http") \
    .config("spark.openlineage.transport.url", "http://localhost:9000/api/v1/lineage") \
    .config("spark.openlineage.namespace", "apache-hudi") \
    .config("spark.openlineage.parentJobName", "customer-job") \
    .config("spark.openlineage.parentRunId", str(uuid.uuid4())) \
    .getOrCreate()
Warning: Ignoring non-Spark config property: className
Ivy Default Cache set to: /Users/soumilshah/.ivy2/cache
The jars for the packages stored in: /Users/soumilshah/.ivy2/jars
org.apache.hudi#hudi-spark3.4-bundle_2.12 added as a dependency
io.openlineage#openlineage-spark added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-6b31df74-4da1-45e8-bead-37f1b783f9e7;1.0
	confs: [default]
:: loading settings :: url = jar:file:/opt/homebrew/lib/python3.11/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml
	found org.apache.hudi#hudi-spark3.4-bundle_2.12;0.14.0 in spark-list
	found io.openlineage#openlineage-spark;1.8.0 in central
:: resolution report :: resolve 80ms :: artifacts dl 2ms
	:: modules in use:
	io.openlineage#openlineage-spark;1.8.0 from central in [default]
	org.apache.hudi#hudi-spark3.4-bundle_2.12;0.14.0 from spark-list in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   2   |   0   |   0   |   0   ||   2   |   0   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-6b31df74-4da1-45e8-bead-37f1b783f9e7
	confs: [default]
	0 artifacts copied, 2 already retrieved (0kB/4ms)
24/02/23 18:04:41 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
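
At this point Spark has resolved the Hudi and OpenLineage jars and registered the OpenLineageSparkListener, so every subsequent read and write on this session emits lineage events to Marquez. As a quick optional check (not in the original notebook), you can read the listener settings back from the live session:

# Confirm the OpenLineage listener and namespace were picked up by the session
print(spark.sparkContext.getConf().get("spark.extraListeners"))
print(spark.sparkContext.getConf().get("spark.openlineage.namespace"))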

Data Generator Function

In [4]:
faker = Faker()


def get_customer_data(total_customers=2):
    """Generate a list of fake customer records using Faker."""
    customers_array = []
    for _ in range(total_customers):
        customer_data = {
            "customer_id": str(uuid.uuid4()),
            "name": faker.name(),
            "state": faker.state(),
            "city": faker.city(),
            "email": faker.email(),
            "created_at": datetime.now().isoformat(),
            "address": faker.address(),
            "salary": faker.random_int(min=30000, max=100000)
        }
        customers_array.append(customer_data)
    return customers_array
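
Since pandas was imported for pretty printing, a quick way to eyeball a couple of generated records (purely illustrative, not part of the original notebook) is:

# Preview two fake customer records as a pandas DataFrame
print(pd.DataFrame(get_customer_data(total_customers=2)))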
In [5]:
total_customers = 10000
customer_data = get_customer_data(total_customers=total_customers)
spark_df_customers = spark.createDataFrame(data=[tuple(i.values()) for i in customer_data],
                                           schema=list(customer_data[0].keys()))
spark_df_customers.show(3)
+--------------------+---------------+----------+----------------+--------------------+--------------------+--------------------+------+
|         customer_id|           name|     state|            city|               email|          created_at|             address|salary|
+--------------------+---------------+----------+----------------+--------------------+--------------------+--------------------+------+
|1f5a7bd7-2328-410...|Ryan Aguilar MD|New Jersey|    Zacharymouth|wattsbrad@example...|2024-02-23T18:04:...|5543 Ashley Alley...| 47935|
|e69869d3-23cd-41e...|   Timothy Hunt|  Michigan|        Jameston|  rbrown@example.org|2024-02-23T18:04:...|3153 Hodges Exten...| 77380|
|61686ff6-6a4e-4c8...| Michael Jordan|   Indiana|East Nicoleville|brownpaul@example...|2024-02-23T18:04:...|930 Jones Drives ...| 84387|
+--------------------+---------------+----------+----------------+--------------------+--------------------+--------------------+------+
only showing top 3 rows


Define Method to Write Data

In [6]:
def write_to_hudi(spark_df,
                  table_name,
                  db_name,
                  method='upsert',
                  table_type='COPY_ON_WRITE',
                  recordkey='',
                  precombine='',
                  partition_fields='',
                  metadata_column_stats=""):
    """Write a Spark DataFrame to a local Hudi table."""

    path = f"file:///Users/soumilshah/IdeaProjects/SparkProject/tem/{db_name}/{table_name}"

    hudi_options = {
        'hoodie.table.name': table_name,
        'hoodie.datasource.write.table.type': table_type,
        'hoodie.datasource.write.table.name': table_name,
        'hoodie.datasource.write.operation': method,
        'hoodie.datasource.write.recordkey.field': recordkey,
        'hoodie.datasource.write.precombine.field': precombine,
        'hoodie.datasource.write.partitionpath.field': partition_fields,
    }

    # Optionally enable column stats in the metadata table for the given column(s)
    if metadata_column_stats:
        hudi_options['hoodie.metadata.index.column.stats.enable'] = 'true'
        hudi_options['hoodie.metadata.index.column.stats.column.list'] = metadata_column_stats

    print("\n")
    print(path)
    print("\n")

    spark_df.write.format("hudi") \
        .options(**hudi_options) \
        .mode("append") \
        .save(path)

Write to Hudi

In [7]:
write_to_hudi(
    spark_df=spark_df_customers,
    db_name="hudidb",
    table_name="customers",
    recordkey="customer_id",
    precombine="created_at",
    partition_fields="state",
    metadata_column_stats="salary"
)

file:///Users/soumilshah/IdeaProjects/SparkProject/tem/hudidb/customers


24/02/23 18:05:09 WARN DFSPropertiesConfiguration: Cannot find HUDI_CONF_DIR, please set it as the dir of hudi-defaults.conf
24/02/23 18:05:09 WARN DFSPropertiesConfiguration: Properties file file:/etc/hudi/conf/hudi-defaults.conf not found. Ignoring to load props file
24/02/23 18:05:11 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-hbase.properties,hadoop-metrics2.properties
# WARNING: Unable to get Instrumentation. Dynamic Attach failed. You may add this JAR as -javaagent manually, or supply -Djdk.attach.allowAttachSelf
# WARNING: Unable to attach Serviceability Agent. Unable to attach even with module exceptions: [org.apache.hudi.org.openjdk.jol.vm.sa.SASupportException: Sense failed., org.apache.hudi.org.openjdk.jol.vm.sa.SASupportException: Sense failed., org.apache.hudi.org.openjdk.jol.vm.sa.SASupportException: Sense failed.]
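
Optionally, before stopping the session you can read the table back to confirm the write landed. This verification step is a small addition to the original notebook and uses the same path printed above.

# Read the Hudi table back and count the rows that were just written
verify_df = spark.read.format("hudi").load(
    "file:///Users/soumilshah/IdeaProjects/SparkProject/tem/hudidb/customers")
print(verify_df.count())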
In [8]:
spark.stop()

New Job: Average Salary by State

In [9]:
try:
    import os
    import sys
    import uuid
    import random
    import pyspark
    from pyspark.sql import SparkSession
    from pyspark import SparkConf, SparkContext
    from faker import Faker
    from datetime import datetime
    import pandas as pd  # Pandas is used for pretty-printing sample records
    print("Imports loaded")

except Exception as e:
    print("error", e)
os.environ['JAVA_HOME'] = '/opt/homebrew/Cellar/openjdk@11/11.0.22/libexec/openjdk.jdk/Contents/Home'

import os
import sys
import uuid 


HUDI_VERSION = '0.14.0'
SPARK_VERSION = '3.4'
OPENLINEAGE_SPARK_VERSION = '1.8.0'  # OpenLineage Spark integration version
openlineage_package = f"io.openlineage:openlineage-spark:{OPENLINEAGE_SPARK_VERSION}"

# Appending OpenLineage package to the existing packages
SUBMIT_ARGS = f"--packages org.apache.hudi:hudi-spark{SPARK_VERSION}-bundle_2.12:{HUDI_VERSION},{openlineage_package} pyspark-shell"

# Setting environment variables
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable

# Create the SparkSession with the Hudi and OpenLineage listener configurations
spark = SparkSession.builder \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
    .config('className', 'org.apache.hudi') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .config("spark.extraListeners", "io.openlineage.spark.agent.OpenLineageSparkListener") \
    .config("spark.openlineage.transport.type", "http") \
    .config("spark.openlineage.transport.url", "http://localhost:9000/api/v1/lineage") \
    .config("spark.openlineage.namespace", "apache-hudi") \
    .config("spark.openlineage.parentJobName", "customer-aggregate-job") \
    .config("spark.openlineage.parentRunId", str(uuid.uuid4())) \
    .getOrCreate()

path = "file:///Users/soumilshah/IdeaProjects/SparkProject/tem/hudidb/customers"

# Load data and create temporary view
spark.read.format("hudi") \
    .load(path) \
    .createOrReplaceTempView("hudi_snapshot1")

# Execute SQL query to calculate average salary per state
result_df = spark.sql("""
    SELECT state, AVG(salary) AS average_salary
    FROM hudi_snapshot1
    GROUP BY state
""")


output_path = "./average_salary" 

result_df.write \
    .format("parquet") \
    .mode("overwrite") \
    .save(output_path)

spark.stop()
Imports loaded 
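
With both jobs finished, the lineage events from the customer-job and customer-aggregate-job runs should now be visible in the Marquez UI (typically http://localhost:3000) under the apache-hudi namespace. If you prefer to check from Python, a small sketch against the Marquez REST API (assuming the requests package and the ports used above) looks like this:

import requests

# List the jobs Marquez recorded under the namespace set via spark.openlineage.namespace
resp = requests.get("http://localhost:9000/api/v1/namespaces/apache-hudi/jobs", timeout=10)
resp.raise_for_status()
for job in resp.json().get("jobs", []):
    print(job["name"])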
