Sunday, July 14, 2024

Implementing Keyword Search in Apache Hudi: Building Inverted Indexes and Scaling to Terabytes of Text Data with the Record Level Index, Metadata Indexing, and Point Lookups


Step 1: Creating the Spark Session

In [49]:
!pip install pyfarmhash
!pip install nltk
Requirement already satisfied: pyfarmhash in /opt/anaconda3/lib/python3.11/site-packages (0.3.2)
Requirement already satisfied: nltk in /opt/anaconda3/lib/python3.11/site-packages (3.8.1)
Requirement already satisfied: click in /opt/anaconda3/lib/python3.11/site-packages (from nltk) (8.1.7)
Requirement already satisfied: joblib in /opt/anaconda3/lib/python3.11/site-packages (from nltk) (1.2.0)
Requirement already satisfied: regex>=2021.8.3 in /opt/anaconda3/lib/python3.11/site-packages (from nltk) (2023.10.3)
Requirement already satisfied: tqdm in /opt/anaconda3/lib/python3.11/site-packages (from nltk) (4.65.0)
In [1]:
import os
import sys
import uuid
import pyspark
import datetime
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext

from pyspark.sql.functions import explode, split, col, collect_list, udf
from pyspark.sql.types import StringType
import hashlib
import farmhash

import nltk
nltk.download('stopwords')
from nltk.corpus import stopwords


HUDI_VERSION = '1.0.0-beta1'
SPARK_VERSION = '3.4'

os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@11"
SUBMIT_ARGS = f"--packages org.apache.hudi:hudi-spark{SPARK_VERSION}-bundle_2.12:{HUDI_VERSION} pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable

# Spark session
spark = SparkSession.builder \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
    .config('className', 'org.apache.hudi') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .getOrCreate()
[nltk_data] Downloading package stopwords to
[nltk_data]     /Users/soumilshah/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
Warning: Ignoring non-Spark config property: className
Ivy Default Cache set to: /Users/soumilshah/.ivy2/cache
The jars for the packages stored in: /Users/soumilshah/.ivy2/jars
org.apache.hudi#hudi-spark3.4-bundle_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-cac8bc29-d08c-4220-9d43-b3e8423e91b3;1.0
	confs: [default]
:: loading settings :: url = jar:file:/opt/anaconda3/lib/python3.11/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml
	found org.apache.hudi#hudi-spark3.4-bundle_2.12;1.0.0-beta1 in central
:: resolution report :: resolve 60ms :: artifacts dl 2ms
	:: modules in use:
	org.apache.hudi#hudi-spark3.4-bundle_2.12;1.0.0-beta1 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   1   |   0   |   0   |   0   ||   1   |   0   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-cac8bc29-d08c-4220-9d43-b3e8423e91b3
	confs: [default]
	0 artifacts copied, 1 already retrieved (0kB/2ms)
24/07/14 17:00:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).

Create the Source DataFrame

In [99]:
from pyspark.sql.types import StringType, StructType, StructField

schema = StructType([
    StructField("text", StringType(), True),
    StructField("_id", StringType(), False)  # Assuming _id is a string type
])

# Define the sample texts with hard-coded IDs
texts = [
    ("I like to eat broccoli and bananas.", "1"),
    ("I ate a banana and spinach smoothie for breakfast.", "2"),
    ("I eat a pizza", "3"),
    ("i had coffee ", "4")
]

# Create DataFrame
df = spark.createDataFrame(texts, schema)
df.show(truncate=False)
+--------------------------------------------------+---+
|text                                              |_id|
+--------------------------------------------------+---+
|I like to eat broccoli and bananas.               |1  |
|I ate a banana and spinach smoothie for breakfast.|2  |
|I eat a pizza                                     |3  |
|i had coffee                                      |4  |
+--------------------------------------------------+---+

Function to UPSERT into Hudi

In [100]:
def write_to_hudi(spark_df, 
                  table_name, 
                  db_name, 
                  method='upsert',
                  table_type='COPY_ON_WRITE',
                  recordkey='',
                  precombine='',
                  partition_fields='',
                  index_type='RECORD_INDEX'
                 ):

    path = f"file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database={db_name}/table_name{table_name}"

    hudi_options = {
        'hoodie.table.name': table_name,
        'hoodie.datasource.write.table.type': table_type,
        'hoodie.datasource.write.table.name': table_name,
        'hoodie.datasource.write.operation': method,
        'hoodie.datasource.write.recordkey.field': recordkey,
        'hoodie.datasource.write.precombine.field': precombine,
        'hoodie.datasource.write.partitionpath.field': partition_fields,
        'hoodie.index.type': index_type,
        'hoodie.enable.data.skipping': 'true',
        'hoodie.metadata.enable': 'true',
        'hoodie.metadata.index.column.stats.enable': 'true',
        'hoodie.write.concurrency.mode': 'optimistic_concurrency_control',
        'hoodie.write.lock.provider': 'org.apache.hudi.client.transaction.lock.InProcessLockProvider',
        'hoodie.metadata.record.index.enable': 'true',
        'hoodie.parquet.max.file.size': 512 * 1024 * 1024,    # 512 MB
        'hoodie.parquet.small.file.limit': 100 * 1024 * 1024  # 100 MB
    }

    print("\n")
    print(path)
    print("\n")
    
    spark_df.write.format("hudi"). \
        options(**hudi_options). \
        mode("append"). \
        save(path)
In [101]:
write_to_hudi(
    spark_df=df,
    db_name="default",
    table_name="bronze_documents",
    recordkey="_id",
    precombine="_id",
    partition_fields="",
    index_type="RECORD_INDEX"
)

file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_namebronze_documents


24/07/14 10:37:27 WARN SparkMetadataTableRecordIndex: Record index not initialized so falling back to GLOBAL_SIMPLE for tagging records
24/07/14 10:37:29 WARN DAGScheduler: Broadcasting large task binary with size 1130.8 KiB
24/07/14 10:37:29 WARN DAGScheduler: Broadcasting large task binary with size 1231.9 KiB
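
Since the bronze table is written with RECORD_INDEX and the metadata table enabled, a point lookup on the Hudi record key (which is _id here) can prune files instead of scanning the whole table. The cell below is a minimal sketch of such a lookup, reusing the same reader options as the rest of this post; whether the record-level index actually prunes files for this filter depends on the Hudi version and reader configuration.

In [ ]:
# Sketch: point lookup on the bronze table by record key (here _id = "1").
bronze_path = "file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_namebronze_documents"

point_lookup_df = (
    spark.read.format("hudi")
    .option("hoodie.metadata.enable", "true")
    .option("hoodie.enable.data.skipping", "true")
    .load(bronze_path)
    .filter(col("_hoodie_record_key") == "1")
)
point_lookup_df.select("_id", "text").show(truncate=False)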

Define a UDF to remove stop words, clean the text, and hash keywords

In [102]:
# Define stop words
stop_words = set(stopwords.words('english'))

# Function to clean and hash keywords
def clean_and_hash_keyword(keyword):
    keyword = keyword.lower().strip()
    if keyword not in stop_words and keyword.isalpha():  # Remove stop words and non-alphabetic tokens
        return str(farmhash.hash32(keyword))
    return None
    
# Register UDF for cleaning and hashing keywords
clean_and_hash_udf = udf(clean_and_hash_keyword, StringType())
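
As a quick driver-side sanity check of the helper (values are computed at runtime, not hard-coded): "and" is a stop word and "pizza!" is not purely alphabetic, so both are dropped, while "Eat" is lower-cased and hashed.

In [ ]:
# Illustrative check of clean_and_hash_keyword on a few tokens.
for token in ["Eat", "and", "pizza!"]:
    print(token, "->", clean_and_hash_keyword(token))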

Read from the Hudi table and apply tokenization

In [103]:
path = "file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_namebronze_documents"

query = f"""
SELECT 
    text,_hoodie_commit_time,_id
FROM 
     hudi_table_changes('{path}', 'latest_state', 'earliest');
"""
df = spark.sql(query)
df.show()

# Tokenize text into keywords, clean and hash them
tokenized_df = df.withColumn("keyword", explode(split(col("text"), "\\s+"))) \
    .select("_id", "keyword") \
    .withColumn("keyword_hash", clean_and_hash_udf(col("keyword"))) \
    .filter(col("keyword_hash").isNotNull())

# Show the cleaned and hashed tokenized DataFrame
tokenized_df.show(truncate=False)

# Create inverted index
inverted_index_df = tokenized_df.groupBy("keyword", "keyword_hash") \
    .agg(collect_list("_id").alias("document_ids"))

# Show the inverted index
inverted_index_df.show(truncate=False)
+--------------------+-------------------+---+
|                text|_hoodie_commit_time|_id|
+--------------------+-------------------+---+
|I like to eat bro...|  20240714103726075|  1|
|I ate a banana an...|  20240714103726075|  2|
|       I eat a pizza|  20240714103726075|  3|
|       i had coffee |  20240714103726075|  4|
+--------------------+-------------------+---+

+---+--------+------------+
|_id|keyword |keyword_hash|
+---+--------+------------+
|1  |like    |2323132262  |
|1  |eat     |2300330484  |
|1  |broccoli|2544305653  |
|2  |ate     |2636712687  |
|2  |banana  |3268754245  |
|2  |spinach |1095938413  |
|2  |smoothie|3187206701  |
|3  |eat     |2300330484  |
|3  |pizza   |4273259341  |
|4  |coffee  |187088678   |
+---+--------+------------+

+--------+------------+------------+
|keyword |keyword_hash|document_ids|
+--------+------------+------------+
|smoothie|3187206701  |[2]         |
|spinach |1095938413  |[2]         |
|broccoli|2544305653  |[1]         |
|like    |2323132262  |[1]         |
|pizza   |4273259341  |[3]         |
|coffee  |187088678   |[4]         |
|banana  |3268754245  |[2]         |
|ate     |2636712687  |[2]         |
|eat     |2300330484  |[1, 3]      |
+--------+------------+------------+

Create the inverted index Hudi table

In [104]:
write_to_hudi(
    spark_df=inverted_index_df,
    db_name="default",
    table_name="inverted_index",
    recordkey="keyword_hash",
    precombine="keyword_hash",
    partition_fields="",
    index_type="RECORD_INDEX"
)

file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_nameinverted_index


24/07/14 10:37:37 WARN SparkMetadataTableRecordIndex: Record index not initialized so falling back to GLOBAL_SIMPLE for tagging records
24/07/14 10:37:39 WARN DAGScheduler: Broadcasting large task binary with size 1189.8 KiB
24/07/14 10:37:40 WARN DAGScheduler: Broadcasting large task binary with size 1290.8 KiB
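
At terabyte scale you would not want to rebuild the index from 'earliest' on every run. One option (a sketch, not part of the run above) is to checkpoint the last processed commit time and pass it to hudi_table_changes so only newly committed documents get tokenized. Note that a plain upsert of the rebuilt rows would replace document_ids for keywords that already exist, so a merge step that unions the old and new lists (or a custom payload) would be needed before writing.

In [ ]:
# Sketch: incremental refresh of the inverted index. The commit time below is
# the one shown earlier in this post and stands in for a stored checkpoint;
# exact inclusivity of the start instant depends on the Hudi version.
last_processed_commit = "20240714103726075"

incremental_query = f"""
SELECT text, _hoodie_commit_time, _id
FROM hudi_table_changes('{path}', 'latest_state', '{last_processed_commit}')
"""
incremental_df = spark.sql(incremental_query)

# Tokenize only these new/changed documents, union their document_ids with the
# existing inverted index rows, then upsert (merge step not shown here).
incremental_df.show(truncate=False)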

Searching documents by a single keyword

In [113]:
# Define the keyword to search
keyword = "eat"
keyword_hash = str(farmhash.hash32(keyword))

print(f"""
--------------
keyword : {keyword}
keyword_hash : {keyword_hash}

""")

path = "file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_nameinverted_index"
spark.read.format("hudi") \
    .option("hoodie.enable.data.skipping", "true") \
    .option("hoodie.metadata.enable", "true") \
    .option("hoodie.metadata.index.column.stats.enable", "true") \
    .load(path) \
    .createOrReplaceTempView("inverted_index")


docs_path = "file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_namebronze_documents"
spark.read.format("hudi") \
    .option("hoodie.enable.data.skipping", "true") \
    .option("hoodie.metadata.enable", "true") \
    .option("hoodie.metadata.index.column.stats.enable", "true") \
    .load(docs_path) \
    .createOrReplaceTempView("bronze_documents")



documents_query = f"""
    SELECT *
    FROM bronze_documents
    WHERE _hoodie_record_key IN (SELECT EXPLODE(document_ids) AS _hoodie_record_key FROM inverted_index WHERE keyword_hash = '{keyword_hash}')
"""

# Execute SQL query to fetch documents
documents_result = spark.sql(documents_query)

# Show the documents
documents_result.show()
--------------
keyword : eat
keyword_hash : 2300330484


+-------------------+--------------------+------------------+----------------------+--------------------+--------------------+---+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|                text|_id|
+-------------------+--------------------+------------------+----------------------+--------------------+--------------------+---+
|  20240714103726075|20240714103726075...|                 1|                      |53df7c4b-cbb2-401...|I like to eat bro...|  1|
|  20240714103726075|20240714103726075...|                 3|                      |53df7c4b-cbb2-401...|       I eat a pizza|  3|
+-------------------+--------------------+------------------+----------------------+--------------------+--------------------+---+
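
Because keyword_hash is also the record key of the inverted index table, the same lookup can be phrased directly against _hoodie_record_key, which is exactly the access pattern the record-level index is built for. A small sketch against the temp view registered above:

In [ ]:
# Sketch: point lookup on the inverted index by its record key (keyword_hash).
point_query = f"""
    SELECT keyword, document_ids
    FROM inverted_index
    WHERE _hoodie_record_key = '{keyword_hash}'
"""
spark.sql(point_query).show(truncate=False)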

Searching documents with multi-keyword sentences

In [139]:
# Define stop words
stop_words = set(stopwords.words('english'))

# Example sentence
sentence = "i had food today and coffee and banana"

# Split sentence into individual words and filter out stop words
words = [word.lower().strip() for word in sentence.split() if word.lower().strip() not in stop_words]

# Create DataFrame from words
words_df = spark.createDataFrame([(word,) for word in words], ["keyword"])

# Apply UDF to clean and hash the keywords
hashed_keywords_df = words_df.withColumn("hashed_keyword", clean_and_hash_udf("keyword"))

# Show the filtered and hashed keywords DataFrame
hashed_keywords_df.show(truncate=False)
+-------+--------------+
|keyword|hashed_keyword|
+-------+--------------+
|food   |1335030861    |
|today  |1533641318    |
|coffee |187088678     |
|banana |3268754245    |
+-------+--------------+

In [145]:
# Clean and hash the keywords
cleaned_and_hashed_keywords = [clean_and_hash_keyword(word) for word in words]
cleaned_and_hashed_keywords = [hash_val for hash_val in cleaned_and_hashed_keywords if hash_val is not None]

# Convert to a format suitable for SQL query
keyword_hash_list = ",".join(["'{}'".format(hash_val) for hash_val in cleaned_and_hashed_keywords])

# Query inverted index to fetch documents based on hashed keywords
inverted_index_query = f"""
    SELECT EXPLODE(document_ids) AS _hoodie_record_key
    FROM inverted_index
    WHERE keyword_hash IN ({keyword_hash_list})
"""

# Execute SQL query to fetch documents from inverted index
inverted_index_result = spark.sql(inverted_index_query)

# Show the matching document IDs from the inverted index
inverted_index_result.show()


# Join with bronze_documents to get actual document details
documents_query = f"""
    SELECT bd.text,bd._id
    FROM bronze_documents bd
    JOIN (
        {inverted_index_query}
    ) AS ii
    ON bd._hoodie_record_key = ii._hoodie_record_key
"""

# Execute SQL query to fetch documents
documents_result = spark.sql(documents_query)

# Show the documents
documents_result.show(truncate=False)
+------------------+
|_hoodie_record_key|
+------------------+
|                 4|
|                 2|
+------------------+

+--------------------------------------------------+---+
|text                                              |_id|
+--------------------------------------------------+---+
|I ate a banana and spinach smoothie for breakfast.|2  |
|i had coffee                                      |4  |
+--------------------------------------------------+---+
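
A natural refinement, sketched below rather than taken from the run above, is to rank the matches by how many of the query keywords each document contains, so documents hitting more keywords come first. It reuses keyword_hash_list and the two temp views registered earlier.

In [ ]:
# Sketch: rank matched documents by the number of distinct query keywords hit.
ranked_query = f"""
    SELECT bd._id, bd.text, COUNT(DISTINCT ii.keyword_hash) AS matched_keywords
    FROM (
        SELECT keyword_hash, EXPLODE(document_ids) AS doc_id
        FROM inverted_index
        WHERE keyword_hash IN ({keyword_hash_list})
    ) AS ii
    JOIN bronze_documents bd
      ON bd._hoodie_record_key = ii.doc_id
    GROUP BY bd._id, bd.text
    ORDER BY matched_keywords DESC
"""
spark.sql(ranked_query).show(truncate=False)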
