Sunday, July 14, 2024

Implementing Keyword Search in Apache Hudi: Building Inverted Indexes and Scaling to Terabytes of Text Data with the Record Level Index, Metadata Indexing, and Point Lookups


Step 1: Creating the Spark Session

In [49]:
!pip install pyfarmhash
!pip install nltk
Requirement already satisfied: pyfarmhash in /opt/anaconda3/lib/python3.11/site-packages (0.3.2)
Requirement already satisfied: nltk in /opt/anaconda3/lib/python3.11/site-packages (3.8.1)
Requirement already satisfied: click in /opt/anaconda3/lib/python3.11/site-packages (from nltk) (8.1.7)
Requirement already satisfied: joblib in /opt/anaconda3/lib/python3.11/site-packages (from nltk) (1.2.0)
Requirement already satisfied: regex>=2021.8.3 in /opt/anaconda3/lib/python3.11/site-packages (from nltk) (2023.10.3)
Requirement already satisfied: tqdm in /opt/anaconda3/lib/python3.11/site-packages (from nltk) (4.65.0)
In [1]:
import os
import sys
import uuid
import pyspark
import datetime
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
from pyspark.sql.functions import explode, split, col, collect_list, udf
from pyspark.sql.types import StringType
import hashlib
import farmhash

import nltk
nltk.download('stopwords')
from nltk.corpus import stopwords


HUDI_VERSION = '1.0.0-beta1'
SPARK_VERSION = '3.4'

os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@11"
SUBMIT_ARGS = f"--packages org.apache.hudi:hudi-spark{SPARK_VERSION}-bundle_2.12:{HUDI_VERSION} pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable

# Spark session
spark = SparkSession.builder \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
    .config('className', 'org.apache.hudi') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .getOrCreate()
[nltk_data] Downloading package stopwords to
[nltk_data]     /Users/soumilshah/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
Warning: Ignoring non-Spark config property: className
Ivy Default Cache set to: /Users/soumilshah/.ivy2/cache
The jars for the packages stored in: /Users/soumilshah/.ivy2/jars
org.apache.hudi#hudi-spark3.4-bundle_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-cac8bc29-d08c-4220-9d43-b3e8423e91b3;1.0
	confs: [default]
:: loading settings :: url = jar:file:/opt/anaconda3/lib/python3.11/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml
	found org.apache.hudi#hudi-spark3.4-bundle_2.12;1.0.0-beta1 in central
:: resolution report :: resolve 60ms :: artifacts dl 2ms
	:: modules in use:
	org.apache.hudi#hudi-spark3.4-bundle_2.12;1.0.0-beta1 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   1   |   0   |   0   |   0   ||   1   |   0   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-cac8bc29-d08c-4220-9d43-b3e8423e91b3
	confs: [default]
	0 artifacts copied, 1 already retrieved (0kB/2ms)
24/07/14 17:00:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).

Create a Spark DataFrame of Sample Documents

In [99]:
from pyspark.sql.types import StringType, StructType, StructField

schema = StructType([
    StructField("text", StringType(), True),
    StructField("_id", StringType(), False)  # Assuming _id is a string type
])

# Define the sample texts with hard-coded IDs
texts = [
    ("I like to eat broccoli and bananas.", "1"),
    ("I ate a banana and spinach smoothie for breakfast.", "2"),
    ("I eat a pizza", "3"),
    ("i had coffee ", "4")
]

# Create DataFrame
df = spark.createDataFrame(texts, schema)
df.show(truncate=False)
+--------------------------------------------------+---+
|text                                              |_id|
+--------------------------------------------------+---+
|I like to eat broccoli and bananas.               |1  |
|I ate a banana and spinach smoothie for breakfast.|2  |
|I eat a pizza                                     |3  |
|i had coffee                                      |4  |
+--------------------------------------------------+---+

Function to UPSERT into Hudi

In [100]:
def write_to_hudi(spark_df,
                  table_name,
                  db_name,
                  method='upsert',
                  table_type='COPY_ON_WRITE',
                  recordkey='',
                  precombine='',
                  partition_fields='',
                  index_type='RECORD_INDEX'
                  ):

    path = f"file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database={db_name}/table_name{table_name}"

    hudi_options = {
        'hoodie.table.name': table_name,
        'hoodie.datasource.write.table.type': table_type,
        'hoodie.datasource.write.table.name': table_name,
        'hoodie.datasource.write.operation': method,
        'hoodie.datasource.write.recordkey.field': recordkey,
        'hoodie.datasource.write.precombine.field': precombine,
        'hoodie.datasource.write.partitionpath.field': partition_fields,
        'hoodie.index.type': index_type,
        'hoodie.enable.data.skipping': 'true',
        'hoodie.metadata.enable': 'true',
        'hoodie.metadata.index.column.stats.enable': 'true',
        'hoodie.metadata.record.index.enable': 'true',
        'hoodie.write.concurrency.mode': 'optimistic_concurrency_control',
        'hoodie.write.lock.provider': 'org.apache.hudi.client.transaction.lock.InProcessLockProvider',
        'hoodie.parquet.max.file.size': 512 * 1024 * 1024,  # 512 MB
        'hoodie.parquet.small.file.limit': 104857600  # 100 MB
    }

    print("\n")
    print(path)
    print("\n")

    spark_df.write.format("hudi"). \
        options(**hudi_options). \
        mode("append"). \
        save(path)
In [101]:
write_to_hudi(
    spark_df=df,
    db_name="default",
    table_name="bronze_documents",
    recordkey="_id",
    precombine="_id",
    partition_fields="",
    index_type="RECORD_INDEX"
)

file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_namebronze_documents


24/07/14 10:37:27 WARN SparkMetadataTableRecordIndex: Record index not initialized so falling back to GLOBAL_SIMPLE for tagging records
24/07/14 10:37:29 WARN DAGScheduler: Broadcasting large task binary with size 1130.8 KiB
24/07/14 10:37:29 WARN DAGScheduler: Broadcasting large task binary with size 1231.9 KiB

Define a UDF to Remove Stop Words, Clean Text, and Hash Keywords

In [102]:
# Define stop words
stop_words = set(stopwords.words('english'))

# Function to clean and hash keywords
def clean_and_hash_keyword(keyword):
    keyword = keyword.lower().strip()
    if keyword not in stop_words and keyword.isalpha():  # Remove stop words and non-alphabetic tokens
        return str(farmhash.hash32(keyword))
    return None
    
# Register UDF for cleaning and hashing keywords
clean_and_hash_udf = udf(clean_and_hash_keyword, StringType())
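
Because clean_and_hash_keyword keeps only tokens that pass isalpha(), any token carrying punctuation (for example "bananas." or "breakfast." with their trailing periods) is dropped along with stop words, which is why those words never show up in the tokenized output further down. A quick local sanity check of the function, run outside the Spark pipeline:

# Local sanity check of the cleaning/hashing logic (plain Python, no Spark needed).
for token in ["Broccoli", "bananas.", "the", "coffee"]:
    print(token, "->", clean_and_hash_keyword(token))
# "Broccoli" and "coffee" return 32-bit FarmHash values as strings;
# "bananas." (punctuation fails isalpha) and "the" (stop word) return None.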

Read from the Hudi Table and Apply Tokenization

In [103]:
path = "file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_namebronze_documents"

query = f"""
SELECT 
    text,_hoodie_commit_time,_id
FROM 
     hudi_table_changes('{path}', 'latest_state', 'earliest');
"""
df = spark.sql(query)
df.show()

# Tokenize text into keywords, clean and hash them
tokenized_df = df.withColumn("keyword", explode(split(col("text"), "\\s+"))) \
    .select("_id", "keyword") \
    .withColumn("keyword_hash", clean_and_hash_udf(col("keyword"))) \
    .filter(col("keyword_hash").isNotNull())

# Show the cleaned and hashed tokenized DataFrame
tokenized_df.show(truncate=False)

# Create inverted index
inverted_index_df = tokenized_df.groupBy("keyword", "keyword_hash") \
    .agg(collect_list("_id").alias("document_ids"))

# Show the inverted index
inverted_index_df.show(truncate=False)
+--------------------+-------------------+---+
|                text|_hoodie_commit_time|_id|
+--------------------+-------------------+---+
|I like to eat bro...|  20240714103726075|  1|
|I ate a banana an...|  20240714103726075|  2|
|       I eat a pizza|  20240714103726075|  3|
|       i had coffee |  20240714103726075|  4|
+--------------------+-------------------+---+

+---+--------+------------+
|_id|keyword |keyword_hash|
+---+--------+------------+
|1  |like    |2323132262  |
|1  |eat     |2300330484  |
|1  |broccoli|2544305653  |
|2  |ate     |2636712687  |
|2  |banana  |3268754245  |
|2  |spinach |1095938413  |
|2  |smoothie|3187206701  |
|3  |eat     |2300330484  |
|3  |pizza   |4273259341  |
|4  |coffee  |187088678   |
+---+--------+------------+

+--------+------------+------------+
|keyword |keyword_hash|document_ids|
+--------+------------+------------+
|smoothie|3187206701  |[2]         |
|spinach |1095938413  |[2]         |
|broccoli|2544305653  |[1]         |
|like    |2323132262  |[1]         |
|pizza   |4273259341  |[3]         |
|coffee  |187088678   |[4]         |
|banana  |3268754245  |[2]         |
|ate     |2636712687  |[2]         |
|eat     |2300330484  |[1, 3]      |
+--------+------------+------------+

Create the Inverted Index Hudi Table

In [104]:
write_to_hudi(
    spark_df=inverted_index_df,
    db_name="default",
    table_name="inverted_index",
    recordkey="keyword_hash",
    precombine="keyword_hash",
    partition_fields="",
    index_type="RECORD_INDEX"
)

file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_nameinverted_index


24/07/14 10:37:37 WARN SparkMetadataTableRecordIndex: Record index not initialized so falling back to GLOBAL_SIMPLE for tagging records
24/07/14 10:37:39 WARN DAGScheduler: Broadcasting large task binary with size 1189.8 KiB
24/07/14 10:37:40 WARN DAGScheduler: Broadcasting large task binary with size 1290.8 KiB

Searching Items Based on a Keyword

In [113]:
# Define the keyword to search
keyword = "eat"
keyword_hash = str(farmhash.hash32(keyword))

print(f"""
--------------
keyword : {keyword}
keyword_hash : {keyword_hash}

""")

path = "file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_nameinverted_index"
spark.read.format("hudi") \
    .option("hoodie.enable.data.skipping", "true") \
    .option("hoodie.metadata.enable", "true") \
    .option("hoodie.metadata.index.column.stats.enable", "true") \
    .load(path) \
    .createOrReplaceTempView("inverted_index")


docs_path = "file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_namebronze_documents"
spark.read.format("hudi") \
    .option("hoodie.enable.data.skipping", "true") \
    .option("hoodie.metadata.enable", "true") \
    .option("hoodie.metadata.index.column.stats.enable", "true") \
    .load(docs_path) \
    .createOrReplaceTempView("bronze_documents")



documents_query = f"""
    SELECT *
    FROM bronze_documents
    WHERE _hoodie_record_key IN (SELECT EXPLODE(document_ids) AS _hoodie_record_key FROM inverted_index WHERE keyword_hash = '{keyword_hash}')
"""

# Execute SQL query to fetch documents
documents_result = spark.sql(documents_query)

# Show the documents
documents_result.show()
--------------
keyword : eat
keyword_hash : 2300330484


+-------------------+--------------------+------------------+----------------------+--------------------+--------------------+---+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|                text|_id|
+-------------------+--------------------+------------------+----------------------+--------------------+--------------------+---+
|  20240714103726075|20240714103726075...|                 1|                      |53df7c4b-cbb2-401...|I like to eat bro...|  1|
|  20240714103726075|20240714103726075...|                 3|                      |53df7c4b-cbb2-401...|       I eat a pizza|  3|
+-------------------+--------------------+------------------+----------------------+--------------------+--------------------+---+
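
Since keyword_hash is the record key of the inverted index table and it was written with the RECORD_INDEX type, the keyword lookup can also be phrased as a point lookup on _hoodie_record_key. A minimal sketch, assuming the record-level index in the metadata table (together with data skipping) is what prunes files for this equality predicate:

# Point lookup on the inverted index by record key (keyword_hash is the record key).
# Assumption: hoodie.metadata.enable + hoodie.enable.data.skipping let the
# record-level index prune files for this equality filter.
point_lookup_df = (
    spark.read.format("hudi")
    .option("hoodie.metadata.enable", "true")
    .option("hoodie.enable.data.skipping", "true")
    .load(path)
    .filter(col("_hoodie_record_key") == keyword_hash)
)
point_lookup_df.select("keyword", "keyword_hash", "document_ids").show(truncate=False)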

Searching Items Based on a Complex Sentence

In [139]:
# Define stop words
stop_words = set(stopwords.words('english'))

# Example sentence
sentence = "i had food today and coffee and banana"

# Split sentence into individual words and filter out stop words
words = [word.lower().strip() for word in sentence.split() if word.lower().strip() not in stop_words]

# Create DataFrame from words
words_df = spark.createDataFrame([(word,) for word in words], ["keyword"])

# Apply UDF to clean and hash the keywords
hashed_keywords_df = words_df.withColumn("hashed_keyword", clean_and_hash_udf("keyword"))

# Show the filtered and hashed keywords DataFrame
hashed_keywords_df.show(truncate=False)
+-------+--------------+
|keyword|hashed_keyword|
+-------+--------------+
|food   |1335030861    |
|today  |1533641318    |
|coffee |187088678     |
|banana |3268754245    |
+-------+--------------+

In [145]:
# Clean and hash the keywords
cleaned_and_hashed_keywords = [clean_and_hash_keyword(word) for word in words]
cleaned_and_hashed_keywords = [hash_val for hash_val in cleaned_and_hashed_keywords if hash_val is not None]

# Convert to a format suitable for SQL query
keyword_hash_list = ",".join(["'{}'".format(hash_val) for hash_val in cleaned_and_hashed_keywords])

# Query inverted index to fetch documents based on hashed keywords
inverted_index_query = f"""
    SELECT EXPLODE(document_ids) AS _hoodie_record_key
    FROM inverted_index
    WHERE keyword_hash IN ({keyword_hash_list})
"""

# Execute SQL query to fetch documents from inverted index
inverted_index_result = spark.sql(inverted_index_query)

# Show the documents from inverted index

inverted_index_result.show()


# Join with bronze_documents to get actual document details
documents_query = f"""
    SELECT bd.text,bd._id
    FROM bronze_documents bd
    JOIN (
        {inverted_index_query}
    ) AS ii
    ON bd._hoodie_record_key = ii._hoodie_record_key
"""

# Execute SQL query to fetch documents
documents_result = spark.sql(documents_query)

# Show the documents
documents_result.show(truncate=False)
+------------------+
|_hoodie_record_key|
+------------------+
|                 4|
|                 2|
+------------------+

+--------------------------------------------------+---+
|text                                              |_id|
+--------------------------------------------------+---+
|I ate a banana and spinach smoothie for breakfast.|2  |
|i had coffee                                      |4  |
+--------------------------------------------------+---+
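
The pipeline above rebuilds the inverted index from the 'earliest' commit every time. To keep it fresh as new documents land in bronze_documents (the part that matters once the corpus grows toward terabytes), the same hudi_table_changes function can be pointed at the last processed commit time instead of 'earliest', so only new or changed documents are re-tokenized and their postings merged into the existing document_ids arrays. The sketch below is an illustrative outline, not part of the run above; it assumes the checkpoint commit time is tracked externally (here hard-coded to the commit shown earlier) and reuses clean_and_hash_udf and write_to_hudi from the cells above.

from pyspark.sql.functions import array_distinct, concat, when

docs_path = "file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_namebronze_documents"
index_path = "file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_nameinverted_index"

# Assumption: the last commit already folded into the index is tracked externally.
last_commit_time = "20240714103726075"

# Pull only the documents written after the checkpoint commit.
incremental_df = spark.sql(f"""
    SELECT text, _id
    FROM hudi_table_changes('{docs_path}', 'latest_state', '{last_commit_time}')
""")

# Tokenize, clean, and hash just the new/changed documents.
new_postings = (incremental_df
    .withColumn("keyword", explode(split(col("text"), "\\s+")))
    .withColumn("keyword_hash", clean_and_hash_udf(col("keyword")))
    .filter(col("keyword_hash").isNotNull())
    .groupBy("keyword", "keyword_hash")
    .agg(collect_list("_id").alias("new_ids")))

# Merge the new postings into the existing document_ids lists.
existing_postings = (spark.read.format("hudi").load(index_path)
    .select(col("keyword_hash").alias("kh"), col("document_ids").alias("old_ids")))

merged = (new_postings
    .join(existing_postings, new_postings.keyword_hash == existing_postings.kh, "left")
    .withColumn("document_ids",
                when(col("old_ids").isNull(), col("new_ids"))
                .otherwise(array_distinct(concat(col("old_ids"), col("new_ids")))))
    .select("keyword", "keyword_hash", "document_ids"))

# Upsert the merged postings; keyword_hash stays the record key.
write_to_hudi(
    spark_df=merged,
    db_name="default",
    table_name="inverted_index",
    recordkey="keyword_hash",
    precombine="keyword_hash",
    partition_fields="",
    index_type="RECORD_INDEX"
)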

