Implementing Keyword Search in Apache Hudi: Building Inverted Indexes and Scaling to Terabytes of Text Data with Record Level Index, Metadata Indexing, and Point Lookups¶
Step 1: Creating the Spark Session¶
In [49]:
!pip install pyfarmhash
!pip install nltk
In [1]:
import os
import sys
import uuid
import pyspark
import datetime
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
from pyspark.sql.functions import explode, split, col, collect_list, udf
from pyspark.sql.types import StringType
import hashlib
import farmhash
import nltk
nltk.download('stopwords')
from nltk.corpus import stopwords
HUDI_VERSION = '1.0.0-beta1'
SPARK_VERSION = '3.4'
os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@11"
SUBMIT_ARGS = f"--packages org.apache.hudi:hudi-spark{SPARK_VERSION}-bundle_2.12:{HUDI_VERSION} pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable
# Spark session
spark = SparkSession.builder \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
    .config('className', 'org.apache.hudi') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .getOrCreate()
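A quick sanity check (a minimal sketch, not part of the original flow) can confirm the session came up with the Hudi SQL extension registered before any tables are written:
# Optional sanity check: verify Spark started and the Hudi extension is wired in
print("Spark version:", spark.version)
spark.sql("SET spark.sql.extensions").show(truncate=False)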
Create Hudi DataFrame¶
In [99]:
from pyspark.sql.types import StringType, StructType, StructField
schema = StructType([
    StructField("text", StringType(), True),
    StructField("_id", StringType(), False)  # Assuming _id is a string type
])
# Define the sample texts with hard-coded IDs
texts = [
    ("I like to eat broccoli and bananas.", "1"),
    ("I ate a banana and spinach smoothie for breakfast.", "2"),
    ("I eat a pizza", "3"),
    ("i had coffee ", "4")
]
# Create DataFrame
df = spark.createDataFrame(texts, schema)
df.show(truncate=False)
Function to UPSERT into Hudi¶
In [100]:
def write_to_hudi(spark_df,
                  table_name,
                  db_name,
                  method='upsert',
                  table_type='COPY_ON_WRITE',
                  recordkey='',
                  precombine='',
                  partition_fields='',
                  index_type='RECORD_INDEX'
                 ):
    path = f"file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database={db_name}/table_name{table_name}"

    hudi_options = {
        'hoodie.table.name': table_name,
        'hoodie.datasource.write.table.type': table_type,
        'hoodie.datasource.write.table.name': table_name,
        'hoodie.datasource.write.operation': method,
        'hoodie.datasource.write.recordkey.field': recordkey,
        'hoodie.datasource.write.precombine.field': precombine,
        'hoodie.datasource.write.partitionpath.field': partition_fields,
        # Record Level Index + metadata table for fast point lookups and data skipping
        'hoodie.index.type': index_type,
        'hoodie.enable.data.skipping': 'true',
        'hoodie.metadata.enable': 'true',
        'hoodie.metadata.index.column.stats.enable': 'true',
        'hoodie.metadata.record.index.enable': 'true',
        # Optimistic concurrency with an in-process lock provider (single-writer, local runs)
        'hoodie.write.concurrency.mode': 'optimistic_concurrency_control',
        'hoodie.write.lock.provider': 'org.apache.hudi.client.transaction.lock.InProcessLockProvider',
        # File sizing
        'hoodie.parquet.max.file.size': 512 * 1024 * 1024,  # 512 MB
        'hoodie.parquet.small.file.limit': 104857600         # 100 MB
    }

    print("\n")
    print(path)
    print("\n")

    spark_df.write.format("hudi") \
        .options(**hudi_options) \
        .mode("append") \
        .save(path)
In [101]:
write_to_hudi(
    spark_df=df,
    db_name="default",
    table_name="bronze_documents",
    recordkey="_id",
    precombine="_id",
    partition_fields="",
    index_type="RECORD_INDEX"
)
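As a quick verification (a minimal sketch; the path below matches what write_to_hudi prints for this table), the bronze table can be read back to confirm the four records landed along with Hudi's metadata columns:
# Read the bronze table back to verify the upsert
bronze_path = "file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_namebronze_documents"
spark.read.format("hudi").load(bronze_path) \
    .select("_hoodie_record_key", "_id", "text") \
    .show(truncate=False)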
Define UDF to Remove Stop Words, Clean Text, and Hash Keywords¶
In [102]:
# Define stop words
stop_words = set(stopwords.words('english'))
# Function to clean and hash keywords
def clean_and_hash_keyword(keyword):
    keyword = keyword.lower().strip()
    if keyword not in stop_words and keyword.isalpha():  # Remove stop words and non-alphabetic tokens
        return str(farmhash.hash32(keyword))
    return None
    
# Register UDF for cleaning and hashing keywords
clean_and_hash_udf = udf(clean_and_hash_keyword, StringType())
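A couple of illustrative calls (hash values depend on farmhash, so they are not reproduced here) show how the function behaves: stop words and tokens containing non-alphabetic characters come back as None, while content words come back as a hash string:
# Illustrative checks of the cleaning/hashing logic
print(clean_and_hash_keyword("and"))        # None: stop word
print(clean_and_hash_keyword("bananas."))   # None: trailing period fails isalpha()
print(clean_and_hash_keyword("Broccoli"))   # farmhash hash string of "broccoli"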
Read from Hudi Table and Apply Tokenization¶
In [103]:
path = "file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_namebronze_documents"
query = f"""
SELECT 
    text,_hoodie_commit_time,_id
FROM 
     hudi_table_changes('{path}', 'latest_state', 'earliest');
"""
df = spark.sql(query)
df.show()
# Tokenize text into keywords, clean and hash them
tokenized_df = df.withColumn("keyword", explode(split(col("text"), "\\s+"))) \
    .select("_id", "keyword") \
    .withColumn("keyword_hash", clean_and_hash_udf(col("keyword"))) \
    .filter(col("keyword_hash").isNotNull())
# Show the cleaned and hashed tokenized DataFrame
tokenized_df.show(truncate=False)
# Create inverted index
inverted_index_df = tokenized_df.groupBy("keyword", "keyword_hash") \
    .agg(collect_list("_id").alias("document_ids"))
# Show the inverted index
inverted_index_df.show(truncate=False)
Create Inverted Index Hudi Table¶
In [104]:
write_to_hudi(
    spark_df=inverted_index_df,
    db_name="default",
    table_name="inverted_index",
    recordkey="keyword_hash",
    precombine="keyword_hash",
    partition_fields="",
    index_type="RECORD_INDEX"
)
Searching Items Based on a Keyword¶
In [113]:
# Define the keyword to search
keyword = "eat"
keyword_hash = str(farmhash.hash32(keyword))
print(f"""
--------------
keyword : {keyword}
keyword_hash : {keyword_hash}
""")
path = "file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_nameinverted_index"
spark.read.format("hudi") \
    .option("hoodie.enable.data.skipping", "true") \
    .option("hoodie.metadata.enable", "true") \
    .option("hoodie.metadata.index.column.stats.enable", "true") \
    .load(path) \
    .createOrReplaceTempView("inverted_index")
docs_path = "file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_namebronze_documents"
spark.read.format("hudi") \
    .option("hoodie.enable.data.skipping", "true") \
    .option("hoodie.metadata.enable", "true") \
    .option("hoodie.metadata.index.column.stats.enable", "true") \
    .load(docs_path) \
    .createOrReplaceTempView("bronze_documents")
documents_query = f"""
    SELECT *
    FROM bronze_documents
    WHERE _hoodie_record_key IN (SELECT EXPLODE(document_ids) AS _hoodie_record_key FROM inverted_index WHERE keyword_hash = '{keyword_hash}')
"""
# Execute SQL query to fetch documents
documents_result = spark.sql(documents_query)
# Show the documents
documents_result.show()
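Since this point lookup will likely be reused, the same two steps can be wrapped into a small helper. This is a sketch (the name search_by_keyword is ours) that assumes the inverted_index and bronze_documents temp views registered above are still available:
def search_by_keyword(keyword):
    """Hash the keyword, probe the inverted index, and fetch the matching documents."""
    keyword_hash = str(farmhash.hash32(keyword.lower().strip()))
    return spark.sql(f"""
        SELECT bd.text, bd._id
        FROM bronze_documents bd
        WHERE bd._hoodie_record_key IN (
            SELECT EXPLODE(document_ids) AS _hoodie_record_key
            FROM inverted_index
            WHERE keyword_hash = '{keyword_hash}'
        )
    """)

search_by_keyword("banana").show(truncate=False)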
Searching Items Based on Complex Sentences¶
In [139]:
# Define stop words
stop_words = set(stopwords.words('english'))
# Example sentence
sentence = "i had food today and coffee and banana"
# Split sentence into individual words and filter out stop words
words = [word.lower().strip() for word in sentence.split() if word.lower().strip() not in stop_words]
# Create DataFrame from words
words_df = spark.createDataFrame([(word,) for word in words], ["keyword"])
# Apply UDF to clean and hash the keywords
hashed_keywords_df = words_df.withColumn("hashed_keyword", clean_and_hash_udf("keyword"))
# Show the filtered and hashed keywords DataFrame
hashed_keywords_df.show(truncate=False)
In [145]:
# Clean and hash the keywords
cleaned_and_hashed_keywords = [clean_and_hash_keyword(word) for word in words]
cleaned_and_hashed_keywords = [hash_val for hash_val in cleaned_and_hashed_keywords if hash_val is not None]
# Convert to a format suitable for SQL query
keyword_hash_list = ",".join(["'{}'".format(hash_val) for hash_val in cleaned_and_hashed_keywords])
# Query inverted index to fetch documents based on hashed keywords
inverted_index_query = f"""
    SELECT EXPLODE(document_ids) AS _hoodie_record_key
    FROM inverted_index
    WHERE keyword_hash IN ({keyword_hash_list})
"""
# Execute SQL query to fetch documents from inverted index
inverted_index_result = spark.sql(inverted_index_query)
# Show the documents from inverted index
inverted_index_result.show()
# Join with bronze_documents to get actual document details
documents_query = f"""
    SELECT bd.text,bd._id
    FROM bronze_documents bd
    JOIN (
        {inverted_index_query}
    ) AS ii
    ON bd._hoodie_record_key = ii._hoodie_record_key
"""
# Execute SQL query to fetch documents
documents_result = spark.sql(documents_query)
# Show the documents
documents_result.show(truncate=False)
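The sentence-based flow can be packaged the same way. The helper below is a sketch (multi_keyword_search is our name) that reuses clean_and_hash_keyword and the temp views from the previous sections, and simply returns an empty result when no token survives cleaning:
def multi_keyword_search(sentence):
    """Clean and hash each word, probe the inverted index, and return the matching documents."""
    words = [w.lower().strip() for w in sentence.split()]
    hashes = [h for h in (clean_and_hash_keyword(w) for w in words) if h is not None]
    if not hashes:
        return spark.createDataFrame([], "text STRING, _id STRING")
    hash_list = ",".join(f"'{h}'" for h in hashes)
    return spark.sql(f"""
        SELECT bd.text, bd._id
        FROM bronze_documents bd
        JOIN (
            SELECT EXPLODE(document_ids) AS _hoodie_record_key
            FROM inverted_index
            WHERE keyword_hash IN ({hash_list})
        ) AS ii
        ON bd._hoodie_record_key = ii._hoodie_record_key
    """)

multi_keyword_search("i had food today and coffee and banana").show(truncate=False)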
 