Implementing Keyword Search in Apache Hudi: Building Inverted Indexes and Scaling to Terabytes of Text Data with the Record Level Index, Metadata Indexing, and Point Lookups¶
Step 1: Creating the Spark Session¶
In [49]:
!pip install pyfarmhash
!pip install nltk
In [1]:
import os
import sys
import uuid
import pyspark
import datetime
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
from pyspark.sql.functions import explode, split, col, collect_list, udf
from pyspark.sql.types import StringType
import hashlib
import farmhash
import nltk
nltk.download('stopwords')
from nltk.corpus import stopwords
HUDI_VERSION = '1.0.0-beta1'
SPARK_VERSION = '3.4'
os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@11"
SUBMIT_ARGS = f"--packages org.apache.hudi:hudi-spark{SPARK_VERSION}-bundle_2.12:{HUDI_VERSION} pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable
# Spark session
spark = SparkSession.builder \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
    .config('className', 'org.apache.hudi') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .getOrCreate()
Create the Source DataFrame¶
In [99]:
from pyspark.sql.types import StringType, StructType, StructField
schema = StructType([
    StructField("text", StringType(), True),
    StructField("_id", StringType(), False)  # Assuming _id is a string type
])
# Define the sample texts with hard-coded IDs
texts = [
    ("I like to eat broccoli and bananas.", "1"),
    ("I ate a banana and spinach smoothie for breakfast.", "2"),
    ("I eat a pizza", "3"),
    ("i had coffee ", "4")
]
# Create DataFrame
df = spark.createDataFrame(texts, schema)
df.show(truncate=False)
Function to UPSERT into Hudi¶
In [100]:
def write_to_hudi(spark_df,
                  table_name,
                  db_name,
                  method='upsert',
                  table_type='COPY_ON_WRITE',
                  recordkey='',
                  precombine='',
                  partition_fields='',
                  index_type='RECORD_INDEX'
                  ):
    path = f"file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database={db_name}/table_name{table_name}"

    hudi_options = {
        'hoodie.table.name': table_name,
        'hoodie.datasource.write.table.type': table_type,
        'hoodie.datasource.write.table.name': table_name,
        'hoodie.datasource.write.operation': method,
        'hoodie.datasource.write.recordkey.field': recordkey,
        'hoodie.datasource.write.precombine.field': precombine,
        "hoodie.datasource.write.partitionpath.field": partition_fields,
        "hoodie.index.type": index_type,
        "hoodie.enable.data.skipping": "true",
        "hoodie.metadata.enable": "true",
        "hoodie.metadata.index.column.stats.enable": "true",
        "hoodie.write.concurrency.mode": "optimistic_concurrency_control",
        "hoodie.write.lock.provider": "org.apache.hudi.client.transaction.lock.InProcessLockProvider",
        "hoodie.metadata.record.index.enable": "true",
        "hoodie.parquet.max.file.size": 512 * 1024 * 1024,  # 512 MB
        "hoodie.parquet.small.file.limit": 104857600  # 100 MB
    }

    print("\n")
    print(path)
    print("\n")

    spark_df.write.format("hudi"). \
        options(**hudi_options). \
        mode("append"). \
        save(path)
In [101]:
write_to_hudi(
    spark_df=df,
    db_name="default",
    table_name="bronze_documents",
    recordkey="_id",
    precombine="_id",
    partition_fields="",
    index_type="RECORD_INDEX"
)
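Since the table was written with the record level index enabled, a quick point lookup on the record key is a cheap way to confirm the write landed. A minimal sketch, assuming the same path convention that write_to_hudi prints for this table:
In [ ]:
# Optional sanity check: point lookup on the record key, served by the Record Level Index
bronze_path = "file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_namebronze_documents"

spark.read.format("hudi") \
    .option("hoodie.metadata.enable", "true") \
    .option("hoodie.enable.data.skipping", "true") \
    .load(bronze_path) \
    .filter(col("_hoodie_record_key") == "1") \
    .select("_id", "text") \
    .show(truncate=False)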
Define a UDF to Remove Stop Words, Clean Text, and Hash Keywords¶
In [102]:
# Define stop words
stop_words = set(stopwords.words('english'))

# Function to clean and hash keywords
def clean_and_hash_keyword(keyword):
    keyword = keyword.lower().strip()
    # Drop stop words and any token that is not purely alphabetic
    # (tokens carrying punctuation, e.g. "bananas.", are filtered out here)
    if keyword not in stop_words and keyword.isalpha():
        return str(farmhash.hash32(keyword))
    return None

# Register UDF for cleaning and hashing keywords
clean_and_hash_udf = udf(clean_and_hash_keyword, StringType())
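Before wiring the UDF into the pipeline, it can help to see what the helper returns for a few tokens. A quick driver-side check (optional, not part of the pipeline):
In [ ]:
# Stop words and non-alphabetic tokens map to None; everything else to a FarmHash32 string
print(clean_and_hash_keyword("Coffee"))    # a 32-bit hash rendered as a string
print(clean_and_hash_keyword("and"))       # None: English stop word
print(clean_and_hash_keyword("bananas."))  # None: trailing punctuation fails isalpha()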
Read from the Hudi Table and Apply Tokenization¶
In [103]:
path = "file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_namebronze_documents"
query = f"""
SELECT
text,_hoodie_commit_time,_id
FROM
hudi_table_changes('{path}', 'latest_state', 'earliest');
"""
df = spark.sql(query)
df.show()
# Tokenize text into keywords, clean and hash them
tokenized_df = df.withColumn("keyword", explode(split(col("text"), "\\s+"))) \
    .select("_id", "keyword") \
    .withColumn("keyword_hash", clean_and_hash_udf(col("keyword"))) \
    .filter(col("keyword_hash").isNotNull())

# Show the cleaned and hashed tokenized DataFrame
tokenized_df.show(truncate=False)

# Create inverted index
inverted_index_df = tokenized_df.groupBy("keyword", "keyword_hash") \
    .agg(collect_list("_id").alias("document_ids"))

# Show the inverted index
inverted_index_df.show(truncate=False)
Create the Inverted Index Hudi Table¶
In [104]:
write_to_hudi(
spark_df=inverted_index_df,
db_name="default",
table_name="inverted_index",
recordkey="keyword_hash",
precombine="keyword_hash",
partition_fields="",
index_type="RECORD_INDEX"
)
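As new documents are upserted into bronze_documents, the same tokenization can run incrementally instead of over the full table. A rough sketch, assuming you track the last commit time you already indexed (the value '20240101000000000' below is only a placeholder): read just the changed rows with hudi_table_changes and rebuild their posting lists. Note that an upsert keyed on keyword_hash replaces the existing posting list for that keyword, so a production version would first union the new document ids with the current ones before writing.
In [ ]:
# Incremental index maintenance sketch (placeholder start commit; see the note above)
bronze_path = "file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_namebronze_documents"
last_indexed_commit = "20240101000000000"  # placeholder: the last commit already indexed

changed_docs = spark.sql(f"""
    SELECT text, _id
    FROM hudi_table_changes('{bronze_path}', 'latest_state', '{last_indexed_commit}')
""")

new_postings = changed_docs \
    .withColumn("keyword", explode(split(col("text"), "\\s+"))) \
    .withColumn("keyword_hash", clean_and_hash_udf(col("keyword"))) \
    .filter(col("keyword_hash").isNotNull()) \
    .groupBy("keyword", "keyword_hash") \
    .agg(collect_list("_id").alias("document_ids"))

# Merge these ids with the existing posting lists before upserting, otherwise a write
# keyed on keyword_hash would overwrite each keyword's document_ids with only the new ids.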
Searching Documents Based on a Keyword¶
In [113]:
# Define the keyword to search
keyword = "eat"
keyword_hash = str(farmhash.hash32(keyword))
print(f"""
--------------
keyword : {keyword}
keyword_hash : {keyword_hash}
""")
path = "file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_nameinverted_index"
spark.read.format("hudi") \
.option("hoodie.enable.data.skipping", "true") \
.option("hoodie.metadata.enable", "true") \
.option("hoodie.metadata.index.column.stats.enable", "true") \
.load(path) \
.createOrReplaceTempView("inverted_index")
docs_path = "file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_namebronze_documents"
spark.read.format("hudi") \
.option("hoodie.enable.data.skipping", "true") \
.option("hoodie.metadata.enable", "true") \
.option("hoodie.metadata.index.column.stats.enable", "true") \
.load(docs_path) \
.createOrReplaceTempView("bronze_documents")
documents_query = f"""
SELECT *
FROM bronze_documents
WHERE _hoodie_record_key IN (SELECT EXPLODE(document_ids) AS _hoodie_record_key FROM inverted_index WHERE keyword_hash = '{keyword_hash}')
"""
# Execute SQL query to fetch documents
documents_result = spark.sql(documents_query)
# Show the documents
documents_result.show()
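Because keyword_hash is also the Hudi record key of the inverted_index table, the posting list itself can be fetched as a point lookup on _hoodie_record_key, which is where the record level index pays off. A minimal sketch using the views registered above:
In [ ]:
# Point lookup of the posting list for the searched keyword
posting_list = spark.sql(f"""
    SELECT keyword, document_ids
    FROM inverted_index
    WHERE _hoodie_record_key = '{keyword_hash}'
""")
posting_list.show(truncate=False)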
Searching Documents Based on a Full Sentence¶
In [139]:
# Define stop words
stop_words = set(stopwords.words('english'))
# Example sentence
sentence = "i had food today and coffee and banana"
# Split sentence into individual words and filter out stop words
words = [word.lower().strip() for word in sentence.split() if word.lower().strip() not in stop_words]
# Create DataFrame from words
words_df = spark.createDataFrame([(word,) for word in words], ["keyword"])
# Apply UDF to clean and hash the keywords
hashed_keywords_df = words_df.withColumn("hashed_keyword", clean_and_hash_udf("keyword"))
# Show the filtered and hashed keywords DataFrame
hashed_keywords_df.show(truncate=False)
In [145]:
# Clean and hash the keywords
cleaned_and_hashed_keywords = [clean_and_hash_keyword(word) for word in words]
cleaned_and_hashed_keywords = [hash_val for hash_val in cleaned_and_hashed_keywords if hash_val is not None]
# Convert to a format suitable for SQL query
keyword_hash_list = ",".join(["'{}'".format(hash_val) for hash_val in cleaned_and_hashed_keywords])
# Query inverted index to fetch documents based on hashed keywords
inverted_index_query = f"""
SELECT EXPLODE(document_ids) AS _hoodie_record_key
FROM inverted_index
WHERE keyword_hash IN ({keyword_hash_list})
"""
# Execute SQL query to fetch documents from inverted index
inverted_index_result = spark.sql(inverted_index_query)
# Show the documents from inverted index
inverted_index_result.show()
# Join with bronze_documents to get actual document details
documents_query = f"""
SELECT bd.text,bd._id
FROM bronze_documents bd
JOIN (
{inverted_index_query}
) AS ii
ON bd._hoodie_record_key = ii._hoodie_record_key
"""
# Execute SQL query to fetch documents
documents_result = spark.sql(documents_query)
# Show the documents
documents_result.show(truncate=False)
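To reuse this flow, the tokenization, hashing, and join can be wrapped into a single helper. One possible packaging is sketched below; the name search_documents is introduced here (not part of Hudi), and DISTINCT is added so a document matched by several keywords appears only once:
In [ ]:
def search_documents(sentence):
    # Clean and hash the query terms with the same helper used to build the index
    hashes = [clean_and_hash_keyword(w) for w in sentence.split()]
    hashes = [h for h in hashes if h is not None]
    if not hashes:
        return spark.createDataFrame([], "text string, _id string")
    hash_list = ",".join("'{}'".format(h) for h in hashes)
    return spark.sql(f"""
        SELECT DISTINCT bd.text, bd._id
        FROM bronze_documents bd
        JOIN (
            SELECT EXPLODE(document_ids) AS _hoodie_record_key
            FROM inverted_index
            WHERE keyword_hash IN ({hash_list})
        ) AS ii
        ON bd._hoodie_record_key = ii._hoodie_record_key
    """)

search_documents("banana smoothie for breakfast").show(truncate=False)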