Learn How to Move Data From MongoDB to Apache Hudi Using PySpark¶
Spin up MongoDB¶
version: '3.7'
services:
  mongodb_container:
    image: mongo:latest
    environment:
      MONGO_INITDB_ROOT_USERNAME: root
      MONGO_INITDB_ROOT_PASSWORD: rootpassword
    ports:
      - 27017:27017
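Save this as docker-compose.yml and start MongoDB with (assuming Docker Compose v2; older installations use the docker-compose binary instead):

docker compose up -d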
In [1]:
!pip install pymongo
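The later cells also rely on pyspark and faker (imported in the read section). If they are not already available in the notebook environment, something like the following should work; the pinned Spark version is an assumption matching the SPARK_VERSION used below:

!pip install "pyspark==3.4.*" faker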
Define imports¶
In [2]:
try:
    import os
    import sys
    import io
    import json
    import pandas as pd
    import pymongo
    from pymongo import MongoClient
    from bson.objectid import ObjectId
    print("All modules loaded")
except Exception as e:
    print("Error : {} ".format(e))
Insert sample data into MongoDB¶
In [3]:
CONNECTION_URL = "mongodb://root:rootpassword@localhost:27017"
client = MongoClient(host=CONNECTION_URL)
client.list_database_names()
Out[3]:
Insert documents¶
In [4]:
client['db']['orders'].insert_one(
    {
        'order_id': 'e23cdeec-5f1b-4b2b-8ed7-7d1a505603d5',
        'name': 'Show beat federal.',
        'order_value': '898',
        'priority': 'MEDIUM',
        'order_date': '2023-12-21',
        'customer_id': '2ac220b5-2524-4977-95fc-c93080cf0e6a',
        'ts': '170549934'
    }
)
Out[4]:
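Optionally, a few more sample orders can be added in one call with insert_many so the Hudi table has more than one row to work with; the values below are made up purely for illustration:

import uuid

client['db']['orders'].insert_many([
    {
        'order_id': str(uuid.uuid4()),
        'name': f'Sample order {i}',
        'order_value': str(100 * i),
        'priority': 'LOW',
        'order_date': '2023-12-22',
        'customer_id': str(uuid.uuid4()),
        'ts': '170549935'
    }
    for i in range(1, 4)
])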
Read the documents¶
In [5]:
for x in client['db']['orders'].find({}):
    print(x)
    break
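find also accepts a filter document and a projection. As a minimal sketch, the query below returns only the MEDIUM-priority orders and hides MongoDB's internal _id field:

for doc in client['db']['orders'].find(
        {'priority': 'MEDIUM'},
        {'_id': 0, 'order_id': 1, 'order_value': 1, 'priority': 1}
):
    print(doc)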
Apache PySpark¶
In [39]:
import os
import sys

from pyspark.sql import SparkSession

# Define Spark and Hudi versions
SPARK_VERSION = '3.4'
HUDI_VERSION = '0.14.0'


def create_spark_session():
    MONGO_DB_VERSION = "3.0.0"  # version of the mongo-spark-connector package, not the MongoDB server

    SUBMIT_ARGS = (
        f"--packages org.apache.hudi:hudi-spark{SPARK_VERSION}-bundle_2.12:{HUDI_VERSION},"
        f"org.mongodb.spark:mongo-spark-connector_2.12:{MONGO_DB_VERSION} pyspark-shell"
    )
    os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
    os.environ['PYSPARK_PYTHON'] = sys.executable

    spark = SparkSession.builder \
        .appName("mongodb_load") \
        .master('local') \
        .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
        .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
        .config('className', 'org.apache.hudi') \
        .config('spark.sql.hive.convertMetastoreParquet', 'false') \
        .getOrCreate()
    return spark


spark = create_spark_session()
def upsert_hudi_table(glue_database, table_name, record_id, precomb_key, table_type, spark_df, partition_fields,
                      enable_partition, enable_cleaner, enable_hive_sync, enable_clustering,
                      enable_meta_data_indexing,
                      use_sql_transformer, sql_transformer_query,
                      target_path, index_type, method='upsert', clustering_column='default'):
    """
    Upserts a dataframe into a Hudi table.

    Args:
        glue_database (str): The name of the glue database.
        table_name (str): The name of the Hudi table.
        record_id (str): The name of the field in the dataframe that will be used as the record key.
        precomb_key (str): The name of the field in the dataframe that will be used for pre-combine.
        table_type (str): The Hudi table type (e.g., COPY_ON_WRITE, MERGE_ON_READ).
        spark_df (pyspark.sql.DataFrame): The dataframe to upsert.
        partition_fields (str): The field(s) used to partition the data.
        enable_partition (bool): Whether or not to enable partitioning.
        enable_cleaner (bool): Whether or not to enable data cleaning.
        enable_hive_sync (bool): Whether or not to enable syncing with Hive.
        enable_clustering (bool): Whether or not to enable inline clustering.
        enable_meta_data_indexing (bool): Whether or not to enable metadata indexing.
        use_sql_transformer (bool): Whether or not to use SQL to transform the dataframe before upserting.
        sql_transformer_query (str): The SQL query to use for data transformation.
        target_path (str): The path to the target Hudi table.
        index_type (str): The Hudi index type (e.g., BLOOM or GLOBAL_BLOOM).
        method (str): The Hudi write method to use (default is 'upsert').
        clustering_column (str): The column(s) to sort on when clustering is enabled.

    Returns:
        None
    """
    # These are the basic settings for the Hoodie table
    hudi_final_settings = {
        "hoodie.table.name": table_name,
        "hoodie.datasource.write.table.type": table_type,
        "hoodie.datasource.write.operation": method,
        "hoodie.datasource.write.recordkey.field": record_id,
        "hoodie.datasource.write.precombine.field": precomb_key,
    }

    # These settings enable syncing with Hive
    hudi_hive_sync_settings = {
        "hoodie.parquet.compression.codec": "gzip",
        "hoodie.datasource.hive_sync.enable": "true",
        "hoodie.datasource.hive_sync.database": glue_database,
        "hoodie.datasource.hive_sync.table": table_name,
        "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
        "hoodie.datasource.hive_sync.use_jdbc": "false",
        "hoodie.datasource.hive_sync.mode": "hms",
    }

    # These settings enable automatic cleaning of old data
    hudi_cleaner_options = {
        "hoodie.clean.automatic": "true",
        "hoodie.clean.async": "true",
        "hoodie.cleaner.policy": "KEEP_LATEST_FILE_VERSIONS",
        "hoodie.cleaner.fileversions.retained": "3",
        "hoodie.cleaner.parallelism": "200",
        "hoodie.cleaner.commits.retained": 5,
    }

    # These settings enable partitioning of the data
    partition_settings = {
        "hoodie.datasource.write.partitionpath.field": partition_fields,
        "hoodie.datasource.hive_sync.partition_fields": partition_fields,
        "hoodie.datasource.write.hive_style_partitioning": "true",
    }

    # These settings enable inline clustering on the given sort column(s)
    hudi_clustering = {
        "hoodie.clustering.execution.strategy.class": "org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy",
        "hoodie.clustering.inline": "true",
        "hoodie.clustering.plan.strategy.sort.columns": clustering_column,
        "hoodie.clustering.plan.strategy.target.file.max.bytes": "1073741824",
        "hoodie.clustering.plan.strategy.small.file.limit": "629145600",
    }

    # Define a dictionary with the index settings for Hudi
    hudi_index_settings = {
        "hoodie.index.type": index_type,  # Specify the index type for Hudi
    }

    # Define a dictionary with the file size settings
    hudi_file_size = {
        "hoodie.parquet.max.file.size": 512 * 1024 * 1024,  # 512MB
        "hoodie.parquet.small.file.limit": 104857600,  # 100MB
    }

    # These settings enable the metadata table and async column-stats indexing
    hudi_meta_data_indexing = {
        "hoodie.metadata.enable": "true",
        "hoodie.metadata.index.async": "true",
        "hoodie.metadata.index.column.stats.enable": "true",
        "hoodie.metadata.index.check.timeout.seconds": "60",
        "hoodie.write.concurrency.mode": "optimistic_concurrency_control",
        "hoodie.write.lock.provider": "org.apache.hudi.client.transaction.lock.InProcessLockProvider",
    }

    if enable_meta_data_indexing in (True, "True", "true"):
        for key, value in hudi_meta_data_indexing.items():
            hudi_final_settings[key] = value  # Add the key-value pair to the final settings dictionary

    if enable_clustering in (True, "True", "true"):
        for key, value in hudi_clustering.items():
            hudi_final_settings[key] = value  # Add the key-value pair to the final settings dictionary

    # Add the Hudi index settings to the final settings dictionary
    for key, value in hudi_index_settings.items():
        hudi_final_settings[key] = value

    # Add the file size settings to the final settings dictionary
    for key, value in hudi_file_size.items():
        hudi_final_settings[key] = value

    # If partitioning is enabled, add the partition settings to the final settings
    if enable_partition in (True, "True", "true"):
        for key, value in partition_settings.items():
            hudi_final_settings[key] = value

    # If data cleaning is enabled, add the cleaner options to the final settings
    if enable_cleaner in (True, "True", "true"):
        for key, value in hudi_cleaner_options.items():
            hudi_final_settings[key] = value

    # If Hive syncing is enabled, add the Hive sync settings to the final settings
    if enable_hive_sync in (True, "True", "true"):
        for key, value in hudi_hive_sync_settings.items():
            hudi_final_settings[key] = value

    # If there is data to write, apply any SQL transformations and write to the target path
    if spark_df.count() > 0:
        if use_sql_transformer in (True, "True", "true"):
            spark_df.createOrReplaceTempView("temp")
            spark_df = spark.sql(sql_transformer_query)

        spark_df.write.format("hudi"). \
            options(**hudi_final_settings). \
            mode("append"). \
            save(target_path)
def load_mongodb_data(spark, source_config):
    MONGO_DB_HOST = source_config.get("host", "localhost")
    MONGO_DB_DATABASE = source_config.get("database", "db")
    MONGO_DB_COLLECTION = source_config.get("collection", "orders")
    MONGO_DB_USERNAME = source_config.get("username", "root")
    MONGO_DB_PASSWORD = source_config.get("password", "rootpassword")

    mongodb_uri = (
        f"mongodb://{MONGO_DB_USERNAME}:{MONGO_DB_PASSWORD}@"
        f"{MONGO_DB_HOST}:27017"
    )

    # Register the collection as a temporary view so it can be queried with Spark SQL
    spark.read.format("mongo") \
        .option("uri", mongodb_uri) \
        .option("database", MONGO_DB_DATABASE) \
        .option("collection", MONGO_DB_COLLECTION) \
        .load() \
        .createOrReplaceTempView(MONGO_DB_COLLECTION)

    print(f"loaded table {MONGO_DB_DATABASE}.{MONGO_DB_COLLECTION}")
def main():
    json_config = {
        "sources": [
            {
                "host": "localhost",
                "database": "db",
                "collection": "orders",
                "username": "root",
                "password": "rootpassword",
                "version": "3.0.0"
            }
        ],
    }

    # Expose each MongoDB collection as a Spark SQL view
    for source in json_config.get("sources", []):
        load_mongodb_data(spark, source)

    # Flatten the Mongo _id struct and select the order fields
    df = spark.sql("""
        SELECT
            _id.oid AS _id,
            customer_id,
            name,
            order_date,
            order_id,
            order_value,
            priority,
            ts
        FROM
            orders
    """)
    df.show()

    upsert_hudi_table(
        glue_database="default",
        table_name="orders",
        record_id="order_id",
        precomb_key="order_date",
        table_type='COPY_ON_WRITE',
        partition_fields="order_date",
        method='upsert',
        index_type='BLOOM',
        enable_partition=True,
        enable_cleaner=True,
        enable_hive_sync=False,
        enable_clustering='False',
        clustering_column='default',
        enable_meta_data_indexing='false',
        use_sql_transformer=False,
        sql_transformer_query='default',
        target_path="file:///Users/soumilshah/IdeaProjects/SparkProject/mongodblabs/hudi/",
        spark_df=df,
    )


main()
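Because the write above uses the upsert operation with order_id as the record key, running the pipeline again after modifying a document in MongoDB updates the matching Hudi record in place instead of adding a duplicate. A quick, illustrative way to see this (the new order_value and ts are arbitrary) is to change the document with pymongo and call main() again:

client['db']['orders'].update_one(
    {'order_id': 'e23cdeec-5f1b-4b2b-8ed7-7d1a505603d5'},
    {'$set': {'order_value': '999', 'ts': '170549999'}}
)
main()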
Read Data From Hudi Tables¶
In [40]:
try:
    import os
    import sys
    import uuid

    import pyspark
    from pyspark.sql import SparkSession
    from pyspark import SparkConf, SparkContext
    from faker import Faker
    import pandas as pd  # Import Pandas library for pretty printing
    print("Imports loaded ")
except Exception as e:
    print("error", e)
HUDI_VERSION = '0.14.0'
SPARK_VERSION = '3.4'
SUBMIT_ARGS = f"--packages org.apache.hudi:hudi-spark{SPARK_VERSION}-bundle_2.12:{HUDI_VERSION} pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable
spark = SparkSession.builder \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
    .config('className', 'org.apache.hudi') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .getOrCreate()
path = "file:///Users/soumilshah/IdeaProjects/SparkProject/mongodblabs/hudi/"
spark.read.format("org.apache.hudi").load(path).createOrReplaceTempView("hudi_snapshot")
print("\n")
spark.sql("SELECT * FROM hudi_snapshot LIMIT 1").printSchema()
print("\n")
print("\n")
spark.sql("SELECT * FROM hudi_snapshot limit 10;").show()
print("\n")
print("Printing List of Columns")
print("=======================", end="\n")
for column in spark.sql("SELECT * FROM hudi_snapshot LIMIT 1").columns:
    print("column name: ", column)
print("=======================", end="\n")