Saturday, January 20, 2024

Learn How to Move Data From MongoDB to Apache Hudi Using PySpark


Spin up MongoDB
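
The quickest way to get a local MongoDB instance is Docker Compose. The file below runs the official mongo image with root / rootpassword credentials and exposes the default port 27017.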


version: '3.7'

services:
  mongodb_container:
    image: mongo:latest
    environment:
      MONGO_INITDB_ROOT_USERNAME: root
      MONGO_INITDB_ROOT_PASSWORD: rootpassword
    ports:
      - 27017:27017
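
Save this as docker-compose.yaml and start the container with docker compose up -d before running the cells below.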
In [1]:
!pip install pymongo
Requirement already satisfied: pymongo in /opt/homebrew/lib/python3.11/site-packages (4.6.1)
Requirement already satisfied: dnspython<3.0.0,>=1.16.0 in /opt/homebrew/lib/python3.11/site-packages (from pymongo) (2.5.0)

Define imports

In [2]:
try:
    import os
    import pandas as pd
    import sys
    import io
    import pymongo
    import json
    from pymongo import MongoClient
    from bson.objectid import ObjectId

    print("All Modules loaded ")
except Exception as e:
    print("Error : {} ".format(e))
All Modules loaded 

Insert sample data into MongoDB

In [3]:
CONNECTION_URL = "mongodb://root:rootpassword@localhost:27017"
client = MongoClient(host=CONNECTION_URL)
client.list_database_names()
Out[3]:
['admin', 'config', 'db', 'local']

Insert a sample document

In [4]:
client['db']['orders'].insert_one(
    {
    'order_id': 'e23cdeec-5f1b-4b2b-8ed7-7d1a505603d5',
    'name': 'Show beat federal.',
    'order_value': '898',
    'priority': 'MEDIUM',
    'order_date': '2023-12-21',
    'customer_id': '2ac220b5-2524-4977-95fc-c93080cf0e6a',
    'ts': '170549934'
}
)
Out[4]:
InsertOneResult(ObjectId('65ad2d97bfce23c68c91353b'), acknowledged=True)
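
One document is enough for this walkthrough, but if you want a few more rows to experiment with, a small sketch along these lines (the field values are made up, and uuid is from the standard library) seeds extra orders with insert_many:

import uuid
from datetime import datetime, timedelta

# Hypothetical sample records; the shape mirrors the order document above
sample_orders = [
    {
        'order_id': str(uuid.uuid4()),
        'name': 'Sample order {}'.format(i),
        'order_value': str(100 + i),
        'priority': 'LOW' if i % 2 == 0 else 'HIGH',
        'order_date': (datetime(2023, 12, 1) + timedelta(days=i)).strftime('%Y-%m-%d'),
        'customer_id': str(uuid.uuid4()),
        'ts': str(1705499340 + i)
    }
    for i in range(5)
]

client['db']['orders'].insert_many(sample_orders)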

Read the documents

In [5]:
for x in client['db']['orders'].find({}):
    print(x)
    break
{'_id': ObjectId('65ac2284799d6cf112fba9f4'), 'order_id': 'e23cdeec-5f1b-4b2b-8ed7-7d1a505603d5', 'name': 'Show beat federal.', 'order_value': '898', 'priority': 'MEDIUM', 'order_date': '2023-12-21', 'customer_id': '2ac220b5-2524-4977-95fc-c93080cf0e6a', 'ts': '170549934'}
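
The same find API accepts a filter and a projection; for example, this sketch pulls only MEDIUM-priority orders and hides the internal _id field (the field names match the document inserted above):

# Filter on priority and project away the MongoDB _id
for doc in client['db']['orders'].find(
        {'priority': 'MEDIUM'},
        {'_id': 0, 'order_id': 1, 'order_value': 1, 'order_date': 1}):
    print(doc)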

Apache PySpark
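
The next cell builds a local SparkSession with the Hudi Spark bundle and the MongoDB Spark connector pulled in through PYSPARK_SUBMIT_ARGS, defines a reusable upsert_hudi_table helper that assembles the Hudi write options, reads the orders collection into a temporary view, flattens the MongoDB _id struct with Spark SQL, and writes the result to a COPY_ON_WRITE Hudi table on the local filesystem, partitioned by order_date.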

In [39]:
import os
import sys
from pyspark.sql import SparkSession

# Define Spark and Hudi versions
SPARK_VERSION = '3.4'
HUDI_VERSION = '0.14.0'


def create_spark_session():
    MONGO_SPARK_CONNECTOR_VERSION = "3.0.0"  # version of the mongo-spark-connector package

    SUBMIT_ARGS = (
        f"--packages org.apache.hudi:hudi-spark{SPARK_VERSION}-bundle_2.12:{HUDI_VERSION},"
        f"org.mongodb.spark:mongo-spark-connector_2.12:{MONGO_SPARK_CONNECTOR_VERSION} pyspark-shell"
    )

    os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
    os.environ['PYSPARK_PYTHON'] = sys.executable

    spark = SparkSession.builder \
        .appName("mongodb_load") \
        .master('local') \
        .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
        .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
        .config('className', 'org.apache.hudi') \
        .config('spark.sql.hive.convertMetastoreParquet', 'false') \
        .getOrCreate()

    return spark


spark = create_spark_session()


def upsert_hudi_table(glue_database, table_name, record_id, precomb_key, table_type, spark_df, partition_fields,
                      enable_partition, enable_cleaner, enable_hive_sync, enable_clustering,
                      enable_meta_data_indexing,
                      use_sql_transformer, sql_transformer_query,
                      target_path, index_type, method='upsert', clustering_column='default'):
    """
    Upserts a dataframe into a Hudi table.

    Args:
        glue_database (str): The name of the glue database.
        table_name (str): The name of the Hudi table.
        record_id (str): The name of the field in the dataframe that will be used as the record key.
        precomb_key (str): The name of the field in the dataframe that will be used for pre-combine.
        table_type (str): The Hudi table type (e.g., COPY_ON_WRITE, MERGE_ON_READ).
        spark_df (pyspark.sql.DataFrame): The dataframe to upsert.
        partition_fields (str): The field(s) used to partition the data.
        enable_partition (bool): Whether or not to enable partitioning.
        enable_cleaner (bool): Whether or not to enable data cleaning.
        enable_hive_sync (bool): Whether or not to enable syncing with Hive.
        use_sql_transformer (bool): Whether or not to use SQL to transform the dataframe before upserting.
        sql_transformer_query (str): The SQL query to use for data transformation.
        target_path (str): The path to the target Hudi table.
        method (str): The Hudi write method to use (default is 'upsert').
        index_type (str): The Hudi index type to use (e.g., BLOOM, GLOBAL_BLOOM).
    Returns:
        None
    """
    # These are the basic settings for the Hoodie table
    hudi_final_settings = {
        "hoodie.table.name": table_name,
        "hoodie.datasource.write.table.type": table_type,
        "hoodie.datasource.write.operation": method,
        "hoodie.datasource.write.recordkey.field": record_id,
        "hoodie.datasource.write.precombine.field": precomb_key,
    }

    # These settings enable syncing with Hive
    hudi_hive_sync_settings = {
        "hoodie.parquet.compression.codec": "gzip",
        "hoodie.datasource.hive_sync.enable": "true",
        "hoodie.datasource.hive_sync.database": glue_database,
        "hoodie.datasource.hive_sync.table": table_name,
        "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
        "hoodie.datasource.hive_sync.use_jdbc": "false",
        "hoodie.datasource.hive_sync.mode": "hms",
    }

    # These settings enable automatic cleaning of old data
    hudi_cleaner_options = {
        "hoodie.clean.automatic": "true",
        "hoodie.clean.async": "true",
        "hoodie.cleaner.policy": 'KEEP_LATEST_FILE_VERSIONS',
        "hoodie.cleaner.fileversions.retained": "3",
        "hoodie-conf hoodie.cleaner.parallelism": '200',
        'hoodie.cleaner.commits.retained': 5
    }

    # These settings enable partitioning of the data
    partition_settings = {
        "hoodie.datasource.write.partitionpath.field": partition_fields,
        "hoodie.datasource.hive_sync.partition_fields": partition_fields,
        "hoodie.datasource.write.hive_style_partitioning": "true",
    }

    hudi_clustering = {
        "hoodie.clustering.execution.strategy.class": "org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy",
        "hoodie.clustering.inline": "true",
        "hoodie.clustering.plan.strategy.sort.columns": clustering_column,
        "hoodie.clustering.plan.strategy.target.file.max.bytes": "1073741824",
        "hoodie.clustering.plan.strategy.small.file.limit": "629145600"
    }

    # Define a dictionary with the index settings for Hudi
    hudi_index_settings = {
        "hoodie.index.type": index_type,  # Specify the index type for Hudi
    }

    # Define a dictionary with the file size settings
    hudi_file_size = {
        "hoodie.parquet.max.file.size": 512 * 1024 * 1024,  # 512MB
        "hoodie.parquet.small.file.limit": 104857600,  # 100MB
    }

    hudi_meta_data_indexing = {
        "hoodie.metadata.enable": "true",
        "hoodie.metadata.index.async": "true",
        "hoodie.metadata.index.column.stats.enable": "true",
        "hoodie.metadata.index.check.timeout.seconds": "60",
        "hoodie.write.concurrency.mode": "optimistic_concurrency_control",
        "hoodie.write.lock.provider": "org.apache.hudi.client.transaction.lock.InProcessLockProvider"
    }

    if enable_meta_data_indexing == True or enable_meta_data_indexing == "True" or enable_meta_data_indexing == "true":
        hudi_final_settings.update(hudi_meta_data_indexing)

    if enable_clustering == True or enable_clustering == "True" or enable_clustering == "true":
        hudi_final_settings.update(hudi_clustering)

    # The index and file size settings always apply
    hudi_final_settings.update(hudi_index_settings)
    hudi_final_settings.update(hudi_file_size)

    # If partitioning is enabled, add the partition settings to the final settings
    if enable_partition == "True" or enable_partition == "true" or enable_partition == True:
        hudi_final_settings.update(partition_settings)

    # If data cleaning is enabled, add the cleaner options to the final settings
    if enable_cleaner == "True" or enable_cleaner == "true" or enable_cleaner == True:
        hudi_final_settings.update(hudi_cleaner_options)

    # If Hive syncing is enabled, add the Hive sync settings to the final settings
    if enable_hive_sync == "True" or enable_hive_sync == "true" or enable_hive_sync == True:
        hudi_final_settings.update(hudi_hive_sync_settings)

    # If there is data to write, apply any SQL transformations and write to the target path
    if spark_df.count() > 0:
        if use_sql_transformer == "True" or use_sql_transformer == "true" or use_sql_transformer == True:
            spark_df.createOrReplaceTempView("temp")
            spark_df = spark.sql(sql_transformer_query)

        spark_df.write.format("hudi"). \
            options(**hudi_final_settings). \
            mode("append"). \
            save(target_path)


def load_mongodb_data(spark, source_config):
    MONGO_DB_HOST = source_config.get("host", "localhost")
    MONGO_DB_DATABASE = source_config.get("database", "db")
    MONGO_DB_COLLECTION = source_config.get("collection", "orders")
    MONGO_DB_USERNAME = source_config.get("username", "root")
    MONGO_DB_PASSWORD = source_config.get("password", "rootpassword")

    mongodb_uri = (
        f"mongodb://{MONGO_DB_USERNAME}:{MONGO_DB_PASSWORD}@"
        f"{MONGO_DB_HOST}:27017"
    )

    df = spark.read.format("mongo") \
        .option("uri", mongodb_uri) \
        .option("database", MONGO_DB_DATABASE) \
        .option("collection", MONGO_DB_COLLECTION) \
        .load()

    df.createOrReplaceTempView(MONGO_DB_COLLECTION)

    print(f"loaded table {MONGO_DB_DATABASE}.{MONGO_DB_COLLECTION}")


def main():
    json_config = {
        "sources": [
            {
                "host": "localhost",
                "database": "db",
                "collection": "orders",
                "username": "root",
                "password": "rootpassword",
                "version": "3.0.0"
            }
        ],
    }

    for source in json_config.get("sources", []):
        load_mongodb_data(spark, source)

    df = spark.sql("""
        SELECT
            _id.oid AS _id,
            customer_id,
            name,
            order_date,
            order_id,
            order_value,
            priority,
            ts
        FROM orders
    """)
    df.show()
    upsert_hudi_table(
        glue_database="default",
        table_name="orders",
        record_id="order_id",
        precomb_key="order_date",
        table_type='COPY_ON_WRITE',
        partition_fields="order_date",
        method='upsert',
        index_type='BLOOM',
        enable_partition=True,
        enable_cleaner=True,
        enable_hive_sync=False,
        enable_clustering='False',
        clustering_column='default',
        enable_meta_data_indexing='false',
        use_sql_transformer=False,
        sql_transformer_query='default',
        target_path="file:////Users/soumilshah/IdeaProjects/SparkProject/mongodblabs/hudi/",
        spark_df=df,
    )


main()
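
Because the write operation is an upsert keyed on order_id, re-running the pipeline after changing a document in MongoDB should update the matching row in the Hudi table rather than add a duplicate. A rough sketch of that round trip (the new order_value is arbitrary):

# Change a field on the source document in MongoDB
client['db']['orders'].update_one(
    {'order_id': 'e23cdeec-5f1b-4b2b-8ed7-7d1a505603d5'},
    {'$set': {'order_value': '1250'}}
)

# Re-read the collection and upsert into the Hudi table
main()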

Read Data From Hudi Tables
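
The cell below starts a fresh SparkSession with only the Hudi bundle, loads the table from the local path into a temporary view called hudi_snapshot, and prints its schema, a sample of rows, and the full column list, including the _hoodie_* metadata columns Hudi adds to every record.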

In [40]:
try:
    import os
    import sys
    import uuid
    import pyspark
    from pyspark.sql import SparkSession
    from pyspark import SparkConf, SparkContext
    from faker import Faker
    import pandas as pd  # Import Pandas library for pretty printing

    print("Imports loaded ")

except Exception as e:
    print("error", e)

HUDI_VERSION = '0.14.0'
SPARK_VERSION = '3.4'

SUBMIT_ARGS = f"--packages org.apache.hudi:hudi-spark{SPARK_VERSION}-bundle_2.12:{HUDI_VERSION} pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable

spark = SparkSession.builder \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
    .config('className', 'org.apache.hudi') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .getOrCreate()


path = "file:///Users/soumilshah/IdeaProjects/SparkProject/mongodblabs/hudi/"
spark.read.format("org.apache.hudi").load(path).createOrReplaceTempView("hudi_snapshot")

print("\n")
spark.sql("SELECT * FROM hudi_snapshot LIMIT 1").printSchema()
print("\n")

print("\n")
spark.sql("SELECT * FROM hudi_snapshot limit 10;").show()
print("\n")

print("Printing List of Columns")
print("=======================", end="\n")
for column in spark.sql("SELECT * FROM hudi_snapshot LIMIT 1").columns:
    print("column name: ", column)
print("=======================", end="\n")
Imports loaded 
24/01/20 15:45:50 WARN DFSPropertiesConfiguration: Cannot find HUDI_CONF_DIR, please set it as the dir of hudi-defaults.conf
24/01/20 15:45:50 WARN DFSPropertiesConfiguration: Properties file file:/etc/hudi/conf/hudi-defaults.conf not found. Ignoring to load props file

root
 |-- _hoodie_commit_time: string (nullable = true)
 |-- _hoodie_commit_seqno: string (nullable = true)
 |-- _hoodie_record_key: string (nullable = true)
 |-- _hoodie_partition_path: string (nullable = true)
 |-- _hoodie_file_name: string (nullable = true)
 |-- _id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- order_value: string (nullable = true)
 |-- priority: string (nullable = true)
 |-- ts: string (nullable = true)
 |-- order_date: string (nullable = true)





+-------------------+--------------------+--------------------+----------------------+--------------------+--------------------+--------------------+------------------+--------------------+-----------+--------+---------+----------+
|_hoodie_commit_time|_hoodie_commit_seqno|  _hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|                 _id|         customer_id|              name|            order_id|order_value|priority|       ts|order_date|
+-------------------+--------------------+--------------------+----------------------+--------------------+--------------------+--------------------+------------------+--------------------+-----------+--------+---------+----------+
|  20240120153657937|20240120153657937...|65ac2284799d6cf11...|  order_date=2023-1...|c6e11a39-4390-4f5...|65ac2284799d6cf11...|2ac220b5-2524-497...|Show beat federal.|e23cdeec-5f1b-4b2...|        898|  MEDIUM|170549934|2023-12-21|
+-------------------+--------------------+--------------------+----------------------+--------------------+--------------------+--------------------+------------------+--------------------+-----------+--------+---------+----------+



Printing List of Columns
=======================
column name:  _hoodie_commit_time
column name:  _hoodie_commit_seqno
column name:  _hoodie_record_key
column name:  _hoodie_partition_path
column name:  _hoodie_file_name
column name:  _id
column name:  customer_id
column name:  name
column name:  order_id
column name:  order_value
column name:  priority
column name:  ts
column name:  order_date
=======================
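
Beyond snapshot reads, Hudi also supports incremental queries that return only the records committed after a given instant. A minimal sketch against the same path (use '000' to read everything from the start of the timeline, or one of the _hoodie_commit_time values shown above to read only newer commits):

# Incremental read: only records committed after the begin instant are returned
incremental_df = (
    spark.read.format("hudi")
    .option("hoodie.datasource.query.type", "incremental")
    .option("hoodie.datasource.read.begin.instanttime", "000")
    .load(path)
)

incremental_df.show()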
