Thursday, July 11, 2024

How to Use OpenAI Vector Embeddings and Store Large Vectors in Apache Hudi for Cost-Effective Data Storage with MinIO, Empowering AI Applications

Why Apache Hudi for Vector Embeddings?

Apache Hudi is a powerful data lake technology that offers:

  • Cost-Effective Storage: Efficient storage formats and strategies such as Copy-on-Write (COW) deliver significant savings when storing large datasets.
  • Incremental Data Ingestion: Supports incremental upserts and efficient querying of changed data, ideal for vector embeddings that are updated over time.
  • Metadata Management: Tracks table and record metadata alongside the embeddings, which is essential for versioning and lineage.
  • Integration Flexibility: Integrates seamlessly with downstream systems such as Elasticsearch, Pinecone, or PostgreSQL for AI model inference.

The following docker-compose.yaml brings up a Postgres-backed Hive Metastore and MinIO as the S3-compatible object store for the Hudi tables:

docker-compose.yaml

version: "3"

services:

  metastore_db:
    image: postgres:11
    hostname: metastore_db
    ports:
      - 5432:5432
    environment:
      POSTGRES_USER: hive
      POSTGRES_PASSWORD: hive
      POSTGRES_DB: metastore

  hive-metastore:
    hostname: hive-metastore
    image: 'starburstdata/hive:3.1.2-e.18'
    ports:
      - '9083:9083' # Metastore Thrift
    environment:
      HIVE_METASTORE_DRIVER: org.postgresql.Driver
      HIVE_METASTORE_JDBC_URL: jdbc:postgresql://metastore_db:5432/metastore
      HIVE_METASTORE_USER: hive
      HIVE_METASTORE_PASSWORD: hive
      HIVE_METASTORE_WAREHOUSE_DIR: s3://datalake/
      S3_ENDPOINT: http://minio:9000
      S3_ACCESS_KEY: admin
      S3_SECRET_KEY: password
      S3_PATH_STYLE_ACCESS: "true"
      REGION: ""
      GOOGLE_CLOUD_KEY_FILE_PATH: ""
      AZURE_ADL_CLIENT_ID: ""
      AZURE_ADL_CREDENTIAL: ""
      AZURE_ADL_REFRESH_URL: ""
      AZURE_ABFS_STORAGE_ACCOUNT: ""
      AZURE_ABFS_ACCESS_KEY: ""
      AZURE_WASB_STORAGE_ACCOUNT: ""
      AZURE_ABFS_OAUTH: ""
      AZURE_ABFS_OAUTH_TOKEN_PROVIDER: ""
      AZURE_ABFS_OAUTH_CLIENT_ID: ""
      AZURE_ABFS_OAUTH_SECRET: ""
      AZURE_ABFS_OAUTH_ENDPOINT: ""
      AZURE_WASB_ACCESS_KEY: ""
      HIVE_METASTORE_USERS_IN_ADMIN_ROLE: "admin"
    depends_on:
      - metastore_db
    healthcheck:
      test: bash -c "exec 6<> /dev/tcp/localhost/9083"

  minio:
    image: minio/minio
    environment:
      - MINIO_ROOT_USER=admin
      - MINIO_ROOT_PASSWORD=password
      - MINIO_DOMAIN=minio
    networks:
      default:
        aliases:
          - warehouse.minio
    ports:
      - 9001:9001
      - 9000:9000
    command: ["server", "/data", "--console-address", ":9001"]

  mc:
    depends_on:
      - minio
    image: minio/mc
    environment:
      - AWS_ACCESS_KEY_ID=admin
      - AWS_SECRET_ACCESS_KEY=password
      - AWS_REGION=us-east-1
    entrypoint: >
      /bin/sh -c "
      until (/usr/bin/mc config host add minio http://minio:9000 admin password) do echo '...waiting...' && sleep 1; done;
      /usr/bin/mc rm -r --force minio/warehouse;
      /usr/bin/mc mb minio/warehouse;
      /usr/bin/mc policy set public minio/warehouse;
      tail -f /dev/null
      "

volumes:
  hive-metastore-postgresql:

networks:
  default:
    name: hudi
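
Bring the stack up with docker compose up -d from the directory containing this file. The optional check below is a minimal sketch (not part of the original setup) that assumes boto3 is installed locally; it only confirms that MinIO is reachable on port 9000 and that the mc container created the warehouse bucket.

# Optional sanity check: a sketch assuming boto3 is installed locally.
import boto3

s3 = boto3.client(
    "s3",
    endpoint_url="http://127.0.0.1:9000",   # MinIO API port from the compose file
    aws_access_key_id="admin",
    aws_secret_access_key="password",
)

# Expect "warehouse" to appear in the list once the mc container has finished.
print([bucket["Name"] for bucket in s3.list_buckets()["Buckets"]])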

Step 1: Creating the Spark Session

In [1]:
import os, sys, uuid, pyspark
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Define environment variables
HUDI_VERSION = '0.14.0'
SPARK_VERSION = '3.4'
JAVA_HOME = "/opt/homebrew/opt/openjdk@11"

# Set environment variables
os.environ["JAVA_HOME"] = JAVA_HOME
os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["OBJC_DISABLE_INITIALIZE_FORK_SAFETY"] = "YES"

# Spark configuration for Hudi and S3
SUBMIT_ARGS = f"--packages org.apache.hudi:hudi-spark{SPARK_VERSION}-bundle_2.12:{HUDI_VERSION},org.apache.hadoop:hadoop-aws:3.3.2 pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS

# Spark session initialization
spark = SparkSession.builder \
    .appName("Hudi Example") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog") \
    .config("spark.executor.memory", "2g") \
    .config("spark.executor.cores", "2") \
    .config("spark.cores.max", "2") \
    .config("spark.sql.shuffle.partitions", "2") \
    .config("spark.python.worker.reuse", "false") \
    .config("spark.hadoop.fs.s3a.endpoint", "http://127.0.0.1:9000/") \
    .config("spark.hadoop.fs.s3a.access.key", "admin") \
    .config("spark.hadoop.fs.s3a.secret.key", "password") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") \
    .getOrCreate()
:: loading settings :: url = jar:file:/opt/anaconda3/lib/python3.11/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /Users/soumilshah/.ivy2/cache
The jars for the packages stored in: /Users/soumilshah/.ivy2/jars
org.apache.hudi#hudi-spark3.4-bundle_2.12 added as a dependency
org.apache.hadoop#hadoop-aws added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-78d3ce20-9783-4093-9170-424105490366;1.0
	confs: [default]
	found org.apache.hudi#hudi-spark3.4-bundle_2.12;0.14.0 in spark-list
	found org.apache.hadoop#hadoop-aws;3.3.2 in central
	found com.amazonaws#aws-java-sdk-bundle;1.11.1026 in central
	found org.wildfly.openssl#wildfly-openssl;1.0.7.Final in central
:: resolution report :: resolve 103ms :: artifacts dl 4ms
	:: modules in use:
	com.amazonaws#aws-java-sdk-bundle;1.11.1026 from central in [default]
	org.apache.hadoop#hadoop-aws;3.3.2 from central in [default]
	org.apache.hudi#hudi-spark3.4-bundle_2.12;0.14.0 from spark-list in [default]
	org.wildfly.openssl#wildfly-openssl;1.0.7.Final from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   4   |   0   |   0   |   0   ||   4   |   0   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-78d3ce20-9783-4093-9170-424105490366
	confs: [default]
	0 artifacts copied, 4 already retrieved (0kB/3ms)
24/07/10 18:54:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).

Create a UDF to Call the OpenAI Embeddings API

In [4]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
import json, requests

def get_embedding(text):
    """Call the OpenAI embeddings endpoint and return the vector as a string."""
    url = "https://api.openai.com/v1/embeddings"
    payload = json.dumps({
      "model": "text-embedding-ada-002",
      "input": text
    })
    headers = {
      'Content-Type': 'application/json',
      'Authorization': 'Bearer XXXXX'  # replace with your OpenAI API key
    }
    response = requests.request("POST", url, headers=headers, data=payload)
    response.raise_for_status()  # fail fast on a bad response
    vector = response.json()["data"][0]['embedding']
    # Returned as a string so it can be written to a STRING column in Hudi
    return str(vector)

embedding_udf = udf(get_embedding, StringType())
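
The UDF above makes one HTTP call per row. The /v1/embeddings endpoint also accepts a list of inputs, so for larger datasets you can embed a whole batch of texts per request. The helper below is a minimal sketch (not part of the original post) that assumes the API key is exported as OPENAI_API_KEY.

# A sketch of batched embedding calls; OPENAI_API_KEY is an assumed environment variable.
import os, json, requests

def get_embeddings_batch(texts):
    """Return one embedding per input text, preserving input order."""
    response = requests.post(
        "https://api.openai.com/v1/embeddings",
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Bearer {os.environ['OPENAI_API_KEY']}",
        },
        data=json.dumps({"model": "text-embedding-ada-002", "input": texts}),
    )
    response.raise_for_status()
    # The API returns one item per input with an "index" field; sort to restore order.
    items = sorted(response.json()["data"], key=lambda item: item["index"])
    return [item["embedding"] for item in items]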

Sample Dataset

In [5]:
schema = StructType([
    StructField("text", StringType(), True),
    StructField("_id", StringType(), False)  # Assuming _id is a string type
])


# Define the sample texts
texts = [
    "I like to eat broccoli and bananas.",
    "I ate a banana and spinach smoothie for breakfast."
]

# Create DataFrame
df = spark.createDataFrame([(text, str(uuid.uuid4())) for text in texts], schema)
df_with_embeddings = df.withColumn("embedding", embedding_udf(df["text"]))
df_with_embeddings.show()

df_with_embeddings.createOrReplaceTempView("tmp_df_with_embeddings")
                                                                                
+--------------------+--------------------+--------------------+
|                text|                 _id|           embedding|
+--------------------+--------------------+--------------------+
|I like to eat bro...|b66c6e90-ab61-4a1...|[-0.009960265, -0...|
|I ate a banana an...|324924be-e465-441...|[-0.010535064, -0...|
+--------------------+--------------------+--------------------+

Create Hudi Tables

In [12]:
query = """
SET hoodie.enable.data.skipping=true;
"""
spark.sql(query)

query = """
SET hoodie.metadata.column.stats.enable=true;
"""
spark.sql(query)

query = """
SET hoodie.metadata.enable=true;
"""
spark.sql(query)

query = """
SET hoodie.metadata.record.index.enable=true;
"""
spark.sql(query)


query_drop = """
DROP TABLE IF EXISTS default.bronze_text_embeddings;
"""
spark.sql(query_drop)

multi_statement_query = """
CREATE TABLE bronze_text_embeddings (
    id STRING,
    text STRING,
    embedding STRING
)
USING hudi
OPTIONS (
    primaryKey = 'id',
    preCombineField = 'id',
    path 's3a://warehouse/default/table_name=bronze_text_embeddings'
)
"""

# Execute the multi-statement query using spark.sql()
spark.sql(multi_statement_query)
Out[12]:
DataFrame[]

Insert into Hudi tables

In [13]:
# Define the SQL insert statement
insert_query = """
INSERT INTO bronze_text_embeddings
SELECT _id AS id, text, embedding
FROM tmp_df_with_embeddings
"""

# Execute the insert query using spark.sql()
spark.sql(insert_query)
24/07/10 19:37:36 WARN DAGScheduler: Broadcasting large task binary with size 1131.1 KiB
24/07/10 19:37:36 WARN DAGScheduler: Broadcasting large task binary with size 1238.1 KiB
Out[13]:
DataFrame[]

Fetch vectors incrementally from Hudi tables for downstream processing to Elasticsearch, Pinecone, or PostgreSQL.

In [15]:
query = """
SELECT 
    text,_hoodie_commit_time, embedding,id
FROM 
     hudi_table_changes('default.bronze_text_embeddings', 'latest_state', 'earliest');
"""
spark.sql(query).show()
+--------------------+-------------------+--------------------+--------------------+
|                text|_hoodie_commit_time|           embedding|                  id|
+--------------------+-------------------+--------------------+--------------------+
|I like to eat bro...|  20240710193733552|[-0.009960265, -0...|b66c6e90-ab61-4a1...|
|I ate a banana an...|  20240710193733552|[-0.010535064, -0...|324924be-e465-441...|
+--------------------+-------------------+--------------------+--------------------+
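
Because the embedding column is stored as a string, downstream sinks such as pgvector, Elasticsearch, or Pinecone usually expect a real float array. The snippet below is a minimal sketch (not from the original post) that parses the incremental result back into arrays before handing them to whatever sink you use; the str([...]) form produced by the UDF parses cleanly as JSON.

# A sketch: decode the stringified embeddings into array<double> for downstream loading.
from pyspark.sql.functions import from_json, col

incremental_df = spark.sql("""
    SELECT id, text, embedding, _hoodie_commit_time
    FROM hudi_table_changes('default.bronze_text_embeddings', 'latest_state', 'earliest')
""")

parsed_df = incremental_df.withColumn("embedding_vector", from_json(col("embedding"), "array<double>"))
parsed_df.select("id", "embedding_vector").show()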

Related tutorials:

  • Learn How to Ingest Data from Hudi Incrementally (hudi_table_changes) into Postgres Using Spark
  • Power your Down Stream Elastic Search Stack From Apache Hudi Transaction Datalake with CDC|DeepDive
  • Develop Full Text Search (Semantics Search) with Postgres (PGVector) and Python Hands on Lab

Get a Vector via Point Lookup

In [19]:
spark.sql("""
select embedding from default.bronze_text_embeddings where id  = 'b66c6e90-ab61-4a1a-a3a5-eeb961dfe3fe'
""").show()
+--------------------+
|           embedding|
+--------------------+
|[-0.009960265, -0...|
+--------------------+

Related: Record Level Indexing in Apache Hudi Delivers 70% Faster Point Lookups

Enhancing Your Hudi Tables

To get more out of your Apache Hudi tables, consider adding columns such as tags, created_at, or other relevant metadata. These columns significantly improve filtering and retrieval across your data lake, ensuring that only the relevant vector embeddings are loaded into your target systems. A sketch of such a table follows.
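
As an illustration, following the same DDL pattern used earlier (the table name bronze_text_embeddings_v2 and the tags/created_at columns are illustrative):

# A sketch of a bronze table with extra metadata columns for filtering (illustrative names).
spark.sql("""
CREATE TABLE IF NOT EXISTS bronze_text_embeddings_v2 (
    id STRING,
    text STRING,
    embedding STRING,
    tags STRING,
    created_at TIMESTAMP
)
USING hudi
OPTIONS (
    primaryKey = 'id',
    preCombineField = 'created_at',
    path 's3a://warehouse/default/table_name=bronze_text_embeddings_v2'
)
""")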

Example Use Cases

  • Temporal Filtering: Store embeddings for the last 30 days and query incrementally to load only recent data into your AI model inference pipeline (see the sketch after this list).
  • Metadata-Driven Queries: Use tags or categories to selectively load embeddings relevant to specific domains or applications, improving query efficiency.
  • Versioning and Lineage: Track changes in embeddings over time using Apache Hudi's metadata capabilities, ensuring reproducibility and traceability in AI model development.
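
For example, a minimal sketch of the temporal-filtering pattern: pass a Hudi commit timestamp instead of 'earliest' to hudi_table_changes to pull only the embeddings written after that point. The timestamp below is illustrative; in practice you would derive it from "now minus 30 days" or from the last checkpoint you processed.

# Pull only the records committed after a given Hudi commit time (yyyyMMddHHmmssSSS).
start_commit = "20240610000000000"  # illustrative value

recent_df = spark.sql(f"""
    SELECT id, text, embedding, _hoodie_commit_time
    FROM hudi_table_changes('default.bronze_text_embeddings', 'latest_state', '{start_commit}')
""")
recent_df.show()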

Conclusion

In this blog, we explored how Apache Hudi can efficiently store and manage vector embeddings, leveraging cost-effective storage options like MinIO. By utilizing Hudi's incremental querying capabilities, you can power downstream AI applications with fresh and relevant data while optimizing storage costs. This setup ensures scalable and performant management of vector embeddings, supporting a wide range of use cases from recommendation engines to semantic search systems.

References

https://www.onehouse.ai/blog/managing-ai-vector-embeddings-with-onehouse
