Saturday, January 20, 2024

Learn How to Move Data From MongoDB to Apache Hudi Using PySpark


Spin up MongoDB


version: '3.7'

services:
  mongodb_container:
    image: mongo:latest
    environment:
      MONGO_INITDB_ROOT_USERNAME: root
      MONGO_INITDB_ROOT_PASSWORD: rootpassword
    ports:
      - 27017:27017
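
With this compose file saved (for example as docker-compose.yml), start the container with docker compose up -d (or docker-compose up -d on older installs). The root credentials defined above are the ones the notebook uses to connect in the cells below.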
In [1]:
!pip install pymongo
Requirement already satisfied: pymongo in /opt/homebrew/lib/python3.11/site-packages (4.6.1)
Requirement already satisfied: dnspython<3.0.0,>=1.16.0 in /opt/homebrew/lib/python3.11/site-packages (from pymongo) (2.5.0)

Define imports

In [2]:
try:
    import os
    import pandas as pd
    import sys
    import io
    import pymongo
    import json
    from pymongo import MongoClient
    from bson.objectid import ObjectId

    print("All Modules loaded ")
except Exception as e:
    print("Error : {} ".format(e))
All Modules loaded 

Insert sample data into MongoDB

In [3]:
CONNECTION_URL = "mongodb://root:rootpassword@localhost:27017"
client = MongoClient(host=CONNECTION_URL)
client.list_database_names()
Out[3]:
['admin', 'config', 'db', 'local']

Insert documents

In [4]:
client['db']['orders'].insert_one(
    {
    'order_id': 'e23cdeec-5f1b-4b2b-8ed7-7d1a505603d5',
    'name': 'Show beat federal.',
    'order_value': '898',
    'priority': 'MEDIUM',
    'order_date': '2023-12-21',
    'customer_id': '2ac220b5-2524-4977-95fc-c93080cf0e6a',
    'ts': '170549934'
}
)
Out[4]:
InsertOneResult(ObjectId('65ad2d97bfce23c68c91353b'), acknowledged=True)
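
A single document is enough for this walkthrough. If you want a slightly larger sample, insert_many can load several documents in one call; a minimal sketch, with made-up field values shaped like the order above:

import uuid
from datetime import datetime, timezone

# Hypothetical sample orders, shaped like the single document inserted above
sample_orders = [
    {
        'order_id': str(uuid.uuid4()),
        'name': f'Sample order {i}',
        'order_value': str(100 + i),
        'priority': 'LOW',
        'order_date': '2023-12-22',
        'customer_id': str(uuid.uuid4()),
        'ts': str(int(datetime.now(timezone.utc).timestamp())),
    }
    for i in range(5)
]

client['db']['orders'].insert_many(sample_orders)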

Read the documents

In [5]:
for x in client['db']['orders'].find({}):
    print(x)
    break
{'_id': ObjectId('65ac2284799d6cf112fba9f4'), 'order_id': 'e23cdeec-5f1b-4b2b-8ed7-7d1a505603d5', 'name': 'Show beat federal.', 'order_value': '898', 'priority': 'MEDIUM', 'order_date': '2023-12-21', 'customer_id': '2ac220b5-2524-4977-95fc-c93080cf0e6a', 'ts': '170549934'}
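
Filters and projections work the same way against this collection; for example, counting all documents and pulling back only a couple of fields for MEDIUM-priority orders:

orders = client['db']['orders']

# Total number of documents in the collection
print(orders.count_documents({}))

# Only MEDIUM-priority orders, projecting a few fields and dropping _id
for doc in orders.find({'priority': 'MEDIUM'}, {'order_id': 1, 'order_value': 1, '_id': 0}):
    print(doc)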

Write MongoDB Data to Apache Hudi With PySpark

In [39]:
import os
import sys
from pyspark.sql import SparkSession

# Define Spark and Hudi versions
SPARK_VERSION = '3.4'
HUDI_VERSION = '0.14.0'



def create_spark_session():
    MONGO_DB_VERSION = "3.0.0"  # Version of the mongo-spark-connector package pulled in below

    SUBMIT_ARGS = (
        f"--packages org.apache.hudi:hudi-spark{SPARK_VERSION}-bundle_2.12:{HUDI_VERSION},"
        f"org.mongodb.spark:mongo-spark-connector_2.12:{MONGO_DB_VERSION} pyspark-shell"
    )

    os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
    os.environ['PYSPARK_PYTHON'] = sys.executable

    spark = SparkSession.builder \
        .appName("mongodb_load") \
        .master('local') \
        .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
        .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
        .config('className', 'org.apache.hudi') \
        .config('spark.sql.hive.convertMetastoreParquet', 'false') \
        .getOrCreate()

    return spark


spark = create_spark_session()


def upsert_hudi_table(glue_database, table_name, record_id, precomb_key, table_type, spark_df, partition_fields,
                      enable_partition, enable_cleaner, enable_hive_sync, enable_clustering,
                      enable_meta_data_indexing,
                      use_sql_transformer, sql_transformer_query,
                      target_path, index_type, method='upsert', clustering_column='default'):
    """
    Upserts a dataframe into a Hudi table.

    Args:
        glue_database (str): The name of the glue database.
        table_name (str): The name of the Hudi table.
        record_id (str): The name of the field in the dataframe that will be used as the record key.
        precomb_key (str): The name of the field in the dataframe that will be used for pre-combine.
        table_type (str): The Hudi table type (e.g., COPY_ON_WRITE, MERGE_ON_READ).
        spark_df (pyspark.sql.DataFrame): The dataframe to upsert.
        partition_fields (str): The field(s) of the dataframe used to partition the data.
        enable_partition (bool): Whether or not to enable partitioning.
        enable_cleaner (bool): Whether or not to enable data cleaning.
        enable_hive_sync (bool): Whether or not to enable syncing with Hive.
        use_sql_transformer (bool): Whether or not to use SQL to transform the dataframe before upserting.
        sql_transformer_query (str): The SQL query to use for data transformation.
        target_path (str): The path to the target Hudi table.
        method (str): The Hudi write method to use (default is 'upsert').
        index_type (str): The Hudi index type to use (e.g., BLOOM or GLOBAL_BLOOM).
    Returns:
        None
    """
    # These are the basic settings for the Hoodie table
    hudi_final_settings = {
        "hoodie.table.name": table_name,
        "hoodie.datasource.write.table.type": table_type,
        "hoodie.datasource.write.operation": method,
        "hoodie.datasource.write.recordkey.field": record_id,
        "hoodie.datasource.write.precombine.field": precomb_key,
    }

    # These settings enable syncing with Hive
    hudi_hive_sync_settings = {
        "hoodie.parquet.compression.codec": "gzip",
        "hoodie.datasource.hive_sync.enable": "true",
        "hoodie.datasource.hive_sync.database": glue_database,
        "hoodie.datasource.hive_sync.table": table_name,
        "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
        "hoodie.datasource.hive_sync.use_jdbc": "false",
        "hoodie.datasource.hive_sync.mode": "hms",
    }

    # These settings enable automatic cleaning of old data
    hudi_cleaner_options = {
        "hoodie.clean.automatic": "true",
        "hoodie.clean.async": "true",
        "hoodie.cleaner.policy": 'KEEP_LATEST_FILE_VERSIONS',
        "hoodie.cleaner.fileversions.retained": "3",
        "hoodie-conf hoodie.cleaner.parallelism": '200',
        'hoodie.cleaner.commits.retained': 5
    }

    # These settings enable partitioning of the data
    partition_settings = {
        "hoodie.datasource.write.partitionpath.field": partition_fields,
        "hoodie.datasource.hive_sync.partition_fields": partition_fields,
        "hoodie.datasource.write.hive_style_partitioning": "true",
    }

    hudi_clustering = {
        "hoodie.clustering.execution.strategy.class": "org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy",
        "hoodie.clustering.inline": "true",
        "hoodie.clustering.plan.strategy.sort.columns": clustering_column,
        "hoodie.clustering.plan.strategy.target.file.max.bytes": "1073741824",
        "hoodie.clustering.plan.strategy.small.file.limit": "629145600"
    }

    # Define a dictionary with the index settings for Hudi
    hudi_index_settings = {
        "hoodie.index.type": index_type,  # Specify the index type for Hudi
    }

    # Define a dictionary with the file size settings
    hudi_file_size = {
        "hoodie.parquet.max.file.size": 512 * 1024 * 1024,  # 512MB
        "hoodie.parquet.small.file.limit": 104857600,  # 100MB
    }

    hudi_meta_data_indexing = {
        "hoodie.metadata.enable": "true",
        "hoodie.metadata.index.async": "true",
        "hoodie.metadata.index.column.stats.enable": "true",
        "hoodie.metadata.index.check.timeout.seconds": "60",
        "hoodie.write.concurrency.mode": "optimistic_concurrency_control",
        "hoodie.write.lock.provider": "org.apache.hudi.client.transaction.lock.InProcessLockProvider"
    }

    if enable_meta_data_indexing == True or enable_meta_data_indexing == "True" or enable_meta_data_indexing == "true":
        for key, value in hudi_meta_data_indexing.items():
            hudi_final_settings[key] = value  # Add the key-value pair to the final settings dictionary

    if enable_clustering == True or enable_clustering == "True" or enable_clustering == "true":
        for key, value in hudi_clustering.items():
            hudi_final_settings[key] = value  # Add the key-value pair to the final settings dictionary

    # Add the Hudi index settings to the final settings dictionary
    for key, value in hudi_index_settings.items():
        hudi_final_settings[key] = value  # Add the key-value pair to the final settings dictionary

    for key, value in hudi_file_size.items():
        hudi_final_settings[key] = value  # Add the key-value pair to the final settings dictionary

    # If partitioning is enabled, add the partition settings to the final settings
    if enable_partition == "True" or enable_partition == "true" or enable_partition == True:
        for key, value in partition_settings.items(): hudi_final_settings[key] = value

    # If data cleaning is enabled, add the cleaner options to the final settings
    if enable_cleaner == "True" or enable_cleaner == "true" or enable_cleaner == True:
        for key, value in hudi_cleaner_options.items(): hudi_final_settings[key] = value

    # If Hive syncing is enabled, add the Hive sync settings to the final settings
    if enable_hive_sync == "True" or enable_hive_sync == "true" or enable_hive_sync == True:
        for key, value in hudi_hive_sync_settings.items(): hudi_final_settings[key] = value

    # If there is data to write, apply any SQL transformations and write to the target path
    if spark_df.count() > 0:
        if use_sql_transformer == "True" or use_sql_transformer == "true" or use_sql_transformer == True:
            spark_df.createOrReplaceTempView("temp")
            spark_df = spark.sql(sql_transformer_query)

        spark_df.write.format("hudi"). \
            options(**hudi_final_settings). \
            mode("append"). \
            save(target_path)


def load_mongodb_data(spark, source_config):
    MONGO_DB_HOST = source_config.get("host", "localhost")
    MONGO_DB_DATABASE = source_config.get("database", "db")
    MONGO_DB_COLLECTION = source_config.get("collection", "orders")
    MONGO_DB_USERNAME = source_config.get("username", "root")
    MONGO_DB_PASSWORD = source_config.get("password", "rootpassword")

    mongodb_uri = (
        f"mongodb://{MONGO_DB_USERNAME}:{MONGO_DB_PASSWORD}@"
        f"{MONGO_DB_HOST}:27017"
    )

    df = spark.read.format("mongo") \
        .option("uri", mongodb_uri) \
        .option("database", MONGO_DB_DATABASE) \
        .option("collection", MONGO_DB_COLLECTION) \
        .load().createTempView(MONGO_DB_COLLECTION)

    print(f"loaded table {MONGO_DB_DATABASE}.{MONGO_DB_COLLECTION}")


def main():
    json_config = {
        "sources": [
            {
                "host": "localhost",
                "database": "db",
                "collection": "orders",
                "username": "root",
                "password": "rootpassword",
                "version": "3.0.0"
            }
        ],
    }

    for source in json_config.get("sources", []):
        load_mongodb_data(spark, source)

    df = spark.sql("""
  SELECT 
    _id.oid AS _id,
    customer_id,
    name,
    order_date,
    order_id,
    order_value,
    priority,
    ts
  FROM 
    orders
""")
    df.show()
    upsert_hudi_table(
        glue_database="default",
        table_name="orders",
        record_id="order_id",
        precomb_key="order_date",
        table_type='COPY_ON_WRITE',
        partition_fields="order_date",
        method='upsert',
        index_type='BLOOM',
        enable_partition=True,
        enable_cleaner=True,
        enable_hive_sync=False,
        enable_clustering='False',
        clustering_column='default',
        enable_meta_data_indexing='false',
        use_sql_transformer=False,
        sql_transformer_query='default',
        target_path="file:////Users/soumilshah/IdeaProjects/SparkProject/mongodblabs/hudi/",
        spark_df=df,
    )


main()
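
Because the write operation is upsert keyed on order_id (with order_date as the precombine field), re-running the pipeline after a document changes in MongoDB updates the existing Hudi record instead of adding a duplicate. A small sketch of that round trip, assuming the pymongo client from the earlier cells is still available:

# Change the sample order in MongoDB ...
client['db']['orders'].update_one(
    {'order_id': 'e23cdeec-5f1b-4b2b-8ed7-7d1a505603d5'},
    {'$set': {'order_value': '999'}}
)

# ... then re-run the pipeline; Hudi matches the incoming row on the
# record key (order_id) and keeps a single, updated copy of the order.
main()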

Read Data From Hudi Tables

In [40]:
try:
    import os
    import sys
    import uuid
    import pyspark
    from pyspark.sql import SparkSession
    from pyspark import SparkConf, SparkContext
    from faker import Faker
    import pandas as pd  # Import Pandas library for pretty printing

    print("Imports loaded ")

except Exception as e:
    print("error", e)

HUDI_VERSION = '0.14.0'
SPARK_VERSION = '3.4'

SUBMIT_ARGS = f"--packages org.apache.hudi:hudi-spark{SPARK_VERSION}-bundle_2.12:{HUDI_VERSION} pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable

spark = SparkSession.builder \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
    .config('className', 'org.apache.hudi') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .getOrCreate()


path = "file:///Users/soumilshah/IdeaProjects/SparkProject/mongodblabs/hudi/"
spark.read.format("org.apache.hudi").load(path).createOrReplaceTempView("hudi_snapshot")

print("\n")
spark.sql("SELECT * FROM hudi_snapshot LIMIT 1").printSchema()
print("\n")

print("\n")
spark.sql("SELECT * FROM hudi_snapshot limit 10;").show()
print("\n")

print("Printing List of Columns")
print("=======================", end="\n")
for column in spark.sql("SELECT * FROM hudi_snapshot LIMIT 1").columns:
    print("column name: ", column)
print("=======================", end="\n")
Imports loaded 
24/01/20 15:45:50 WARN DFSPropertiesConfiguration: Cannot find HUDI_CONF_DIR, please set it as the dir of hudi-defaults.conf
24/01/20 15:45:50 WARN DFSPropertiesConfiguration: Properties file file:/etc/hudi/conf/hudi-defaults.conf not found. Ignoring to load props file

root
 |-- _hoodie_commit_time: string (nullable = true)
 |-- _hoodie_commit_seqno: string (nullable = true)
 |-- _hoodie_record_key: string (nullable = true)
 |-- _hoodie_partition_path: string (nullable = true)
 |-- _hoodie_file_name: string (nullable = true)
 |-- _id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- order_value: string (nullable = true)
 |-- priority: string (nullable = true)
 |-- ts: string (nullable = true)
 |-- order_date: string (nullable = true)





+-------------------+--------------------+--------------------+----------------------+--------------------+--------------------+--------------------+------------------+--------------------+-----------+--------+---------+----------+
|_hoodie_commit_time|_hoodie_commit_seqno|  _hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|                 _id|         customer_id|              name|            order_id|order_value|priority|       ts|order_date|
+-------------------+--------------------+--------------------+----------------------+--------------------+--------------------+--------------------+------------------+--------------------+-----------+--------+---------+----------+
|  20240120153657937|20240120153657937...|65ac2284799d6cf11...|  order_date=2023-1...|c6e11a39-4390-4f5...|65ac2284799d6cf11...|2ac220b5-2524-497...|Show beat federal.|e23cdeec-5f1b-4b2...|        898|  MEDIUM|170549934|2023-12-21|
+-------------------+--------------------+--------------------+----------------------+--------------------+--------------------+--------------------+------------------+--------------------+-----------+--------+---------+----------+



Printing List of Columns
=======================
column name:  _hoodie_commit_time
column name:  _hoodie_commit_seqno
column name:  _hoodie_record_key
column name:  _hoodie_partition_path
column name:  _hoodie_file_name
column name:  _id
column name:  customer_id
column name:  name
column name:  order_id
column name:  order_value
column name:  priority
column name:  ts
column name:  order_date
=======================
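
The query above is a snapshot read. Hudi also supports incremental queries that return only records written after a given commit; a minimal sketch against the same path, where the begin instant is a placeholder (in practice, take it from the _hoodie_commit_time column above or the table's .hoodie timeline):

# Incremental read: only records committed after the given instant time.
incremental_read_options = {
    'hoodie.datasource.query.type': 'incremental',
    'hoodie.datasource.read.begin.instanttime': '20240120000000000',  # placeholder commit time
}

inc_df = spark.read.format("org.apache.hudi") \
    .options(**incremental_read_options) \
    .load(path)

inc_df.show()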
