How to Unlock Data Insights from Hudi Metrics for Your Data Lake using Elasticsearch and Kibana¶
Set Up the Elastic Stack Locally¶
Clone the docker-elk project (https://github.com/deviantony/docker-elk) and bring the stack up:
git clone https://github.com/deviantony/docker-elk.git
docker-compose up setup
docker-compose up
Once the containers are running, open Kibana at http://localhost:5601/app/home#/ and sign in with the default credentials:
username: elastic
password: changeme
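Before wiring up anything else, it helps to confirm Elasticsearch is actually reachable with those credentials. A minimal check, assuming the stock docker-elk port (9200) and the elastic/changeme login shown above:
import requests

# Default docker-elk endpoint and credentials (adjust if you customized the stack)
elasticsearch_url = "http://localhost:9200"
auth = ("elastic", "changeme")

# The root endpoint returns basic cluster info when authentication succeeds
response = requests.get(elasticsearch_url, auth=auth)
print(response.status_code)                     # expect 200
print(response.json().get("cluster_name"))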
Backend¶
The Flask app below is the HTTP endpoint Hudi will call after every commit: it stamps the incoming payload with an ingestionTime field and indexes it into Elasticsearch, using the commit's commitTime as the document id.
import json
import requests
from flask import Flask, request
from datetime import datetime

app = Flask(__name__)

# Elasticsearch server configuration
elasticsearch_url = "http://localhost:9200"
index_name = "hudi"  # Use your desired index name
username = "elastic"
password = "changeme"


@app.route('/', methods=["POST"])
def index():
    json_data = request.json
    print(json_data)
    print(type(json_data))

    # Add an "ingestionTime" field with the current datetime
    json_data["ingestionTime"] = datetime.now().isoformat()

    # Ingest the JSON data into Elasticsearch using a PUT request
    put_url = f"{elasticsearch_url}/{index_name}/_doc/{json_data['commitTime']}"
    headers = {'Content-Type': 'application/json'}

    # Provide Elasticsearch credentials
    auth = (username, password)

    # Send the PUT request to insert the JSON data
    response = requests.put(put_url, data=json.dumps(json_data), headers=headers, auth=auth)

    if response.status_code == 201:
        return 'Data Ingested'
    else:
        return f'Failed to insert data. Status code: {response.status_code}'


if __name__ == "__main__":
    app.run(debug=True)
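Once writes start flowing, you can sanity-check what has landed in the index with the standard _count and _search endpoints. A rough sketch, assuming the same hudi index and credentials used above:
import requests

# Count and peek at the commit-callback documents stored in the "hudi" index
base_url = "http://localhost:9200/hudi"
auth = ("elastic", "changeme")

print(requests.get(f"{base_url}/_count", auth=auth).json())           # total commits ingested
hits = requests.get(f"{base_url}/_search?size=3", auth=auth).json()   # sample a few documents
for hit in hits["hits"]["hits"]:
    print(hit["_id"], list(hit["_source"].keys()))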
Step 1: Define imports¶
In [1]:
try:
    import os
    import sys
    import uuid
    import pyspark
    from pyspark.sql import SparkSession
    from pyspark import SparkConf, SparkContext
    from faker import Faker
    print("Imports loaded")
except Exception as e:
    print("error", e)
Imports loaded
Step 2: Create Spark Session¶
In [2]:
HUDI_VERSION = '0.14.0'
SPARK_VERSION = '3.4'
SUBMIT_ARGS = f"--packages org.apache.hudi:hudi-spark{SPARK_VERSION}-bundle_2.12:{HUDI_VERSION} pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable
spark = SparkSession.builder \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
    .config('className', 'org.apache.hudi') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .getOrCreate()
Warning: Ignoring non-Spark config property: className
:: loading settings :: url = jar:file:/Users/soumilnitinshah/anaconda3/lib/python3.11/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /Users/soumilnitinshah/.ivy2/cache
The jars for the packages stored in: /Users/soumilnitinshah/.ivy2/jars
org.apache.hudi#hudi-spark3.4-bundle_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-d59215b8-7f92-4d80-989d-c6a324727973;1.0
    confs: [default]
    found org.apache.hudi#hudi-spark3.4-bundle_2.12;0.14.0 in central
:: resolution report :: resolve 211ms :: artifacts dl 6ms
    :: modules in use:
    org.apache.hudi#hudi-spark3.4-bundle_2.12;0.14.0 from central in [default]
    ---------------------------------------------------------------------
    |                  |            modules            ||   artifacts   |
    |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
    ---------------------------------------------------------------------
    |      default     |   1   |   0   |   0   |   0   ||   1   |   0   |
    ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-d59215b8-7f92-4d80-989d-c6a324727973
    confs: [default]
    0 artifacts copied, 1 already retrieved (0kB/10ms)
23/10/28 13:57:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Hudi Settings¶
The write options below enable Hudi's HTTP commit callback, so that every commit POSTs its metadata to the Flask endpoint running on port 5000.
In [3]:
db_name = "hudidb"
table_name = "employees"
path = f"file:///Users/soumilnitinshah/Downloads/{db_name}/{table_name}" # Updated path
method = 'upsert'
table_type = "COPY_ON_WRITE"
recordkey = "emp_id"
precombine = "emp_id"
lambda_function_url = "http://127.0.0.1:5000"
partition_fields = "state"
hudi_options = {
    'hoodie.table.name': table_name,
    'hoodie.datasource.write.table.type': table_type,
    'hoodie.datasource.write.table.name': table_name,
    'hoodie.datasource.write.operation': method,
    'hoodie.datasource.write.recordkey.field': recordkey,
    'hoodie.datasource.write.precombine.field': precombine,
    'hoodie.datasource.write.partitionpath.field': partition_fields,

    # Commit callback: after each commit, POST the commit metadata to the Flask receiver
    'hoodie.write.commit.callback.on': 'true',
    'hoodie.write.commit.callback.http.url': lambda_function_url,
    'hoodie.write.commit.callback.http.timeout.seconds': '180'
}
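Before kicking off any Spark writes, it can be worth smoke-testing the Flask receiver directly. The snippet below POSTs a made-up payload containing only the commitTime field the receiver relies on (plus an illustrative tableName); the real callback body sent by Hudi carries the full commit metadata.
import requests
from datetime import datetime

# Hypothetical minimal payload: "commitTime" is the only field the Flask app requires,
# since it is used as the Elasticsearch document id. Hudi's real callback sends more fields.
fake_payload = {
    "commitTime": datetime.now().strftime("%Y%m%d%H%M%S"),
    "tableName": "employees"
}

resp = requests.post("http://127.0.0.1:5000/", json=fake_payload)
print(resp.status_code, resp.text)   # expect 200 and "Data Ingested" if Elasticsearch accepted it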
Data Generator Class¶
In [4]:
import uuid
import random
from faker import Faker

faker = Faker()


class DataGenerator(object):

    @staticmethod
    def get_data(samples):
        return [
            (
                uuid.uuid4().__str__(),
                faker.name(),
                faker.random_element(elements=('IT', 'HR', 'Sales', 'Marketing')),
                faker.state(),
                str(faker.random_int(min=10000, max=150000)),
                str(faker.random_int(min=18, max=60)),
                str(faker.random_int(min=0, max=100000)),
                str(faker.unix_time()),
                faker.email(),
                faker.credit_card_number(card_type='amex'),
                random.choice(["2021", "2022", "2023"]),
                faker.month()
            ) for x in range(samples)
        ]
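A quick way to eyeball the shape of the generated tuples before building a DataFrame (purely a sanity check, not part of the pipeline):
# Generate two sample records and inspect the field order
for row in DataGenerator.get_data(samples=2):
    print(row)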
Inserting Documents into the Data Lake¶
100 samples¶
In [6]:
data = DataGenerator.get_data(samples=100)
columns = ["emp_id", "employee_name", "department", "state", "salary", "age", "bonus", "ts", "email", "credit_card","year", "month"]
spark_df = spark.createDataFrame(data=data, schema=columns)
spark_df.write.format("hudi"). \
    options(**hudi_options). \
    mode("append"). \
    save(path)
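Each successful commit should now fire an HTTP callback to the Flask app, and a new document should appear in the hudi index. You can also read the table back to confirm the write landed; a small check, assuming the same path defined earlier:
# Snapshot-read the Hudi table and confirm the row count grows after each batch
df = spark.read.format("hudi").load(path)
print(df.count())
df.select("emp_id", "employee_name", "state", "salary").show(5, truncate=False)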
1000 samples¶
In [7]:
data = DataGenerator.get_data(samples=1000)
columns = ["emp_id", "employee_name", "department", "state", "salary", "age", "bonus", "ts", "email", "credit_card","year", "month"]
spark_df = spark.createDataFrame(data=data, schema=columns)
spark_df.write.format("hudi"). \
    options(**hudi_options). \
    mode("append"). \
    save(path)
10000 samples¶
In [8]:
data = DataGenerator.get_data(samples=10000)
columns = ["emp_id", "employee_name", "department", "state", "salary", "age", "bonus", "ts", "email", "credit_card","year", "month"]
spark_df = spark.createDataFrame(data=data, schema=columns)
spark_df.write.format("hudi"). \
    options(**hudi_options). \
    mode("append"). \
    save(path)
100000 samples¶
In [30]:
data = DataGenerator.get_data(samples=100000)
columns = ["emp_id", "employee_name", "department", "state", "salary", "age", "bonus", "ts", "email", "credit_card","year", "month"]
spark_df = spark.createDataFrame(data=data, schema=columns)
spark_df.write.format("hudi"). \
    options(**hudi_options). \
    mode("append"). \
    save(path)
23/10/28 12:57:56 WARN TaskSetManager: Stage 611 contains a task of very large size (1196 KiB). The maximum recommended task size is 1000 KiB.
Delete Workloads¶
In [35]:
path = 'file:///Users/soumilnitinshah/Downloads/hudidb/employees'
spark.read.format("hudi").load(path).createOrReplaceTempView("hudi_snapshot")
spark.sql("select * from hudi_snapshot where salary >= 50000 ").count()
Out[35]:
79616
In [36]:
path = 'file:///Users/soumilnitinshah/Downloads/hudidb/employees'
spark.read.format("hudi").load(path).createOrReplaceTempView("hudi_snapshot")
delete_df = spark.sql("select * from hudi_snapshot where salary >= 50000 ")
db_name = "hudidb"
table_name = "employees"
method = 'delete'
table_type = "COPY_ON_WRITE"
recordkey = "emp_id"
precombine = "emp_id"
lambda_function_url = "http://127.0.0.1:5000"
partition_fields = "state"
hudi_options = {
    'hoodie.table.name': table_name,
    'hoodie.datasource.write.table.type': table_type,
    'hoodie.datasource.write.table.name': table_name,
    'hoodie.datasource.write.operation': method,
    'hoodie.datasource.write.recordkey.field': recordkey,
    'hoodie.datasource.write.precombine.field': precombine,
    'hoodie.datasource.write.partitionpath.field': partition_fields,

    # Same commit callback settings as before, now attached to the delete operation
    'hoodie.write.commit.callback.on': 'true',
    'hoodie.write.commit.callback.http.url': lambda_function_url,
    'hoodie.write.commit.callback.http.timeout.seconds': '180'
}

delete_df.write.format("hudi"). \
    options(**hudi_options). \
    mode("append"). \
    save(path)
23/10/28 13:14:05 WARN Executor: Managed memory leak detected; size = 5269600 bytes, task 0.0 in stage 659.0 (TID 3842)
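To confirm the delete commit (and its callback) actually removed those records, re-read the snapshot and re-run the same predicate; the count should now be zero.
# Re-read the table after the delete commit and re-apply the same filter
spark.read.format("hudi").load(path).createOrReplaceTempView("hudi_snapshot")
print(spark.sql("select count(*) from hudi_snapshot where salary >= 50000").collect()[0][0])   # expect 0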