How to Use OpenAI Vector Embeddings and Store Large Vectors in Apache Hudi for Cost-Effective Data Storage with MinIO, Empowering AI Applications¶
Why Apache Hudi for Vector Embeddings?¶
Apache Hudi is a powerful data lake technology that offers:¶
- Cost-Effective Storage: Utilizes efficient storage formats and strategies, such as Copy-on-Write (COW), enabling significant cost savings when storing large datasets.
- Incremental Data Ingestion: Supports incremental updates and efficient querying of changed data, perfect for managing vector embeddings updated over time.
- Metadata Management: Facilitates metadata management, essential for tracking vector metadata alongside embeddings for versioning and lineage.
- Integration Flexibility: Integrates seamlessly with downstream applications such as Elasticsearch, Pinecone, or PostgreSQL for AI model inference.
docker-compose.yaml¶
version: "3"

services:
  metastore_db:
    image: postgres:11
    hostname: metastore_db
    ports:
      - 5432:5432
    environment:
      POSTGRES_USER: hive
      POSTGRES_PASSWORD: hive
      POSTGRES_DB: metastore

  hive-metastore:
    hostname: hive-metastore
    image: 'starburstdata/hive:3.1.2-e.18'
    ports:
      - '9083:9083' # Metastore Thrift
    environment:
      HIVE_METASTORE_DRIVER: org.postgresql.Driver
      HIVE_METASTORE_JDBC_URL: jdbc:postgresql://metastore_db:5432/metastore
      HIVE_METASTORE_USER: hive
      HIVE_METASTORE_PASSWORD: hive
      HIVE_METASTORE_WAREHOUSE_DIR: s3://datalake/
      S3_ENDPOINT: http://minio:9000
      S3_ACCESS_KEY: admin
      S3_SECRET_KEY: password
      S3_PATH_STYLE_ACCESS: "true"
      REGION: ""
      GOOGLE_CLOUD_KEY_FILE_PATH: ""
      AZURE_ADL_CLIENT_ID: ""
      AZURE_ADL_CREDENTIAL: ""
      AZURE_ADL_REFRESH_URL: ""
      AZURE_ABFS_STORAGE_ACCOUNT: ""
      AZURE_ABFS_ACCESS_KEY: ""
      AZURE_WASB_STORAGE_ACCOUNT: ""
      AZURE_ABFS_OAUTH: ""
      AZURE_ABFS_OAUTH_TOKEN_PROVIDER: ""
      AZURE_ABFS_OAUTH_CLIENT_ID: ""
      AZURE_ABFS_OAUTH_SECRET: ""
      AZURE_ABFS_OAUTH_ENDPOINT: ""
      AZURE_WASB_ACCESS_KEY: ""
      HIVE_METASTORE_USERS_IN_ADMIN_ROLE: "admin"
    depends_on:
      - metastore_db
    healthcheck:
      test: bash -c "exec 6<> /dev/tcp/localhost/9083"

  minio:
    image: minio/minio
    environment:
      - MINIO_ROOT_USER=admin
      - MINIO_ROOT_PASSWORD=password
      - MINIO_DOMAIN=minio
    networks:
      default:
        aliases:
          - warehouse.minio
    ports:
      - 9001:9001
      - 9000:9000
    command: ["server", "/data", "--console-address", ":9001"]

  mc:
    depends_on:
      - minio
    image: minio/mc
    environment:
      - AWS_ACCESS_KEY_ID=admin
      - AWS_SECRET_ACCESS_KEY=password
      - AWS_REGION=us-east-1
    entrypoint: >
      /bin/sh -c "
      until (/usr/bin/mc config host add minio http://minio:9000 admin password) do echo '...waiting...' && sleep 1; done;
      /usr/bin/mc rm -r --force minio/warehouse;
      /usr/bin/mc mb minio/warehouse;
      /usr/bin/mc policy set public minio/warehouse;
      tail -f /dev/null
      "

volumes:
  hive-metastore-postgresql:

networks:
  default:
    name: hudi
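After saving the file and starting the stack with docker compose up -d, it is worth confirming that the MinIO and Hive Metastore ports are actually reachable before wiring up Spark. The snippet below is a minimal sanity check, not part of the original walkthrough; it uses only the Python standard library and assumes the port mappings shown above (9000/9001 for MinIO, 9083 for the metastore).

import socket

# Sanity check: confirm the containers from the compose file are listening locally.
# Ports assume the mappings above (MinIO API 9000, MinIO console 9001, Hive Metastore 9083).
for name, port in [("minio-api", 9000), ("minio-console", 9001), ("hive-metastore", 9083)]:
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.settimeout(2)
        status = "open" if s.connect_ex(("127.0.0.1", port)) == 0 else "closed"
        print(f"{name:15s} 127.0.0.1:{port} -> {status}")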
Step 1: Creating the Spark Session¶
import os, sys, uuid, pyspark
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
# Define environment variables
HUDI_VERSION = '0.14.0'
SPARK_VERSION = '3.4'
JAVA_HOME = "/opt/homebrew/opt/openjdk@11"
# Set environment variables
os.environ["JAVA_HOME"] = JAVA_HOME
os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["OBJC_DISABLE_INITIALIZE_FORK_SAFETY"] = "YES"
# Spark configuration for Hudi and S3
SUBMIT_ARGS = f"--packages org.apache.hudi:hudi-spark{SPARK_VERSION}-bundle_2.12:{HUDI_VERSION},org.apache.hadoop:hadoop-aws:3.3.2 pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
# Spark session initialization
spark = SparkSession.builder \
    .appName("Hudi Example") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog") \
    .config("spark.executor.memory", "2g") \
    .config("spark.executor.cores", "2") \
    .config("spark.cores.max", "2") \
    .config("spark.sql.shuffle.partitions", "2") \
    .config("spark.python.worker.reuse", "false") \
    .config("spark.hadoop.fs.s3a.endpoint", "http://127.0.0.1:9000/") \
    .config("spark.hadoop.fs.s3a.access.key", "admin") \
    .config("spark.hadoop.fs.s3a.secret.key", "password") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") \
    .getOrCreate()
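Before calling OpenAI, it helps to verify that this session can actually talk to MinIO through the s3a connector. The snippet below is a small smoke test added for convenience, not part of the original walkthrough; s3a://warehouse/_smoke_test is just a hypothetical scratch prefix inside the bucket created by the mc container.

# Smoke test: write and read a tiny DataFrame against MinIO via s3a.
smoke_path = "s3a://warehouse/_smoke_test"
spark.range(5).write.mode("overwrite").parquet(smoke_path)
print(spark.read.parquet(smoke_path).count())  # expect 5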
Create a UDF to Call the OpenAI API¶
from pyspark.sql.functions import udf
import json, requests
from pyspark.sql.types import StringType, StructType, StructField

def get_embedding(text):
    url = "https://api.openai.com/v1/embeddings"
    payload = json.dumps({
        "model": "text-embedding-ada-002",
        "input": text
    })
    headers = {
        'Content-Type': 'application/json',
        'Authorization': 'Bearer XXXXX'
    }
    response = requests.request("POST", url, headers=headers, data=payload)
    vector = response.json()["data"][0]['embedding']
    return str(vector)

embedding_udf = udf(get_embedding, StringType())
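The UDF above sends one synchronous request per row and hard-codes the bearer token. For anything beyond a demo, a slightly hardened variant is worth considering; the sketch below is an illustration rather than the post's original code, and it assumes the key is exposed through an OPENAI_API_KEY environment variable visible to the Spark executors.

import os, time, json, requests
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def get_embedding_safe(text, retries=3):
    # Assumes OPENAI_API_KEY is set in the executor environment (hypothetical setup).
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {os.environ['OPENAI_API_KEY']}",
    }
    payload = json.dumps({"model": "text-embedding-ada-002", "input": text})
    for attempt in range(retries):
        try:
            resp = requests.post("https://api.openai.com/v1/embeddings",
                                 headers=headers, data=payload, timeout=30)
            resp.raise_for_status()
            return str(resp.json()["data"][0]["embedding"])
        except requests.RequestException:
            if attempt == retries - 1:
                raise
            time.sleep(2 ** attempt)  # simple exponential backoff on transient failures

embedding_safe_udf = udf(get_embedding_safe, StringType())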
Sample Dataset¶
schema = StructType([
    StructField("text", StringType(), True),
    StructField("_id", StringType(), False)  # Assuming _id is a string type
])
# Define the sample texts
texts = [
    "I like to eat broccoli and bananas.",
    "I ate a banana and spinach smoothie for breakfast."
]
# Create DataFrame
df = spark.createDataFrame([(text, str(uuid.uuid4())) for text in texts], schema)
df_with_embeddings = df.withColumn("embedding", embedding_udf(df["text"]))
df_with_embeddings.show()
df_with_embeddings.createOrReplaceTempView("tmp_df_with_embeddings")
Create Hudi Tables¶
# Enable Hudi data skipping, column stats, the metadata table, and the record-level index
spark.sql("SET hoodie.enable.data.skipping=true")
spark.sql("SET hoodie.metadata.column.stats.enable=true")
spark.sql("SET hoodie.metadata.enable=true")
spark.sql("SET hoodie.metadata.record.index.enable=true")
query_drop = """
DROP TABLE IF EXISTS default.bronze_text_embeddings;
"""
spark.sql(query_drop)
create_table_query = """
CREATE TABLE bronze_text_embeddings (
    id STRING,
    text STRING,
    embedding STRING
)
USING hudi
OPTIONS (
    primaryKey = 'id',
    preCombineField = 'id',
    path = 's3a://warehouse/default/table_name=bronze_text_embeddings'
)
"""
# Execute the CREATE TABLE statement using spark.sql()
spark.sql(create_table_query)
Insert into Hudi tables¶
# Define the SQL insert statement
insert_query = """
INSERT INTO bronze_text_embeddings
SELECT _id AS id, text, embedding
FROM tmp_df_with_embeddings
"""
# Execute the insert query using spark.sql()
spark.sql(insert_query)
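The same result can also be achieved without SQL by using the Hudi DataFrame writer directly. The options below are standard Hudi write configs; treat this as an equivalent sketch of the CREATE + INSERT path above rather than a drop-in replacement. Note that the operation is set to upsert, so re-embedding a row with the same id replaces the previous record.

# Alternative: write the embeddings DataFrame straight to the Hudi table path.
hudi_options = {
    "hoodie.table.name": "bronze_text_embeddings",
    "hoodie.datasource.write.recordkey.field": "id",
    "hoodie.datasource.write.precombine.field": "id",
    "hoodie.datasource.write.operation": "upsert",
    "hoodie.datasource.write.table.type": "COPY_ON_WRITE",
    "hoodie.metadata.enable": "true",
    "hoodie.metadata.record.index.enable": "true",
}
(df_with_embeddings
    .withColumnRenamed("_id", "id")
    .write.format("hudi")
    .options(**hudi_options)
    .mode("append")
    .save("s3a://warehouse/default/table_name=bronze_text_embeddings"))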
Fetch vectors incrementally from Hudi tables for downstream processing to Elasticsearch, Pinecone, or PostgreSQL.¶
query = """
SELECT text, _hoodie_commit_time, embedding, id
FROM hudi_table_changes('default.bronze_text_embeddings', 'latest_state', 'earliest')
"""
spark.sql(query).show()
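In a real pipeline, the commit time of the last processed batch drives the next pull: you checkpoint the highest _hoodie_commit_time you have seen and pass it as the start point of the next hudi_table_changes call. The sketch below illustrates that pattern, plus turning the stringified embedding back into a list of floats before handing rows to a downstream sink; last_commit_time and index_downstream are hypothetical names, not from the original post.

import ast

# 1. Pull everything after the last checkpointed commit (hypothetical checkpoint value).
last_commit_time = "20240101000000000"  # stored from the previous run
changes_df = spark.sql(f"""
    SELECT id, text, embedding, _hoodie_commit_time
    FROM hudi_table_changes('default.bronze_text_embeddings', 'latest_state', '{last_commit_time}')
""")

# 2. Convert the stringified vector back to floats and push rows to the downstream store.
for row in changes_df.collect():
    vector = ast.literal_eval(row["embedding"])   # "[0.1, 0.2, ...]" -> [0.1, 0.2, ...]
    # index_downstream(row["id"], row["text"], vector)  # e.g. Elasticsearch / Pinecone / pgvector client
    print(row["id"], len(vector))

# 3. Checkpoint the newest commit time for the next incremental run.
new_checkpoint = changes_df.agg({"_hoodie_commit_time": "max"}).collect()[0][0]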
Related resources¶
- Learn How to Ingest Data from Hudi Incrementally (hudi_table_changes) into Postgres Using Spark
- Power your Down Stream Elastic Search Stack From Apache Hudi Transaction Datalake with CDC | DeepDive
  - https://www.youtube.com/watch?v=rr2V5xhgPeM
  - https://github.com/soumilshah1995/Power-your-Down-Stream-Elastic-Search-Stack-From-Apache-Hudi-Transaction-Datalake-with-CDC
- Develop Full Text Search (Semantics Search) with Postgres (PGVector) and Python Hands on Lab
Get Vector Based on Point Lookup¶
spark.sql("""
SELECT embedding FROM default.bronze_text_embeddings WHERE id = 'b66c6e90-ab61-4a1a-a3a5-eeb961dfe3fe'
""").show()
Record Level Indexing in Apache Hudi Delivers 70% Faster Point Lookups¶
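The same lookup can also be issued through the DataFrame API; with the metadata table and record-level index enabled (as set earlier), an equality predicate on the record key lets Hudi prune files instead of scanning the whole table. The read options below are standard Hudi configs, but whether they are strictly required on the read side depends on your Hudi version, so treat this as a sketch.

# Point lookup via the DataFrame API, relying on the record-level index for file pruning.
lookup_df = (spark.read.format("hudi")
    .option("hoodie.metadata.enable", "true")
    .option("hoodie.enable.data.skipping", "true")
    .load("s3a://warehouse/default/table_name=bronze_text_embeddings")
    .where("id = 'b66c6e90-ab61-4a1a-a3a5-eeb961dfe3fe'")
    .select("embedding"))
lookup_df.show()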
Enhancing Your Hudi Tables¶
To enhance your Apache Hudi tables, consider adding columns such as tags, created_at, or other relevant metadata. These additions significantly improve filtering and retrieval in your data lake, ensuring that only relevant vector embeddings are loaded into your target systems; a sketch of such an enriched table follows below.
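For example, the bronze table could carry a tag and an ingestion timestamp alongside the embedding. The variant below is only an illustrative schema (silver_text_embeddings is a made-up name), not something from the original post.

# Hypothetical enriched table: same layout as bronze_text_embeddings plus filtering metadata.
spark.sql("""
CREATE TABLE IF NOT EXISTS silver_text_embeddings (
    id STRING,
    text STRING,
    embedding STRING,
    tags STRING,
    created_at TIMESTAMP
)
USING hudi
OPTIONS (
    primaryKey = 'id',
    preCombineField = 'created_at',
    path = 's3a://warehouse/default/table_name=silver_text_embeddings'
)
""")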
Example Use Cases¶
- Temporal Filtering: Store embeddings for the last 30 days and query incrementally to load only recent data into your AI model inference pipeline (see the sketch after this list).
- Metadata-Driven Queries: Use tags or categories to selectively load embeddings relevant to specific domains or applications, improving query efficiency.
- Versioning and Lineage: Track changes in embeddings over time using Apache Hudi's metadata capabilities, ensuring reproducibility and traceability in AI model development.
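As a concrete illustration of the first two use cases, the incremental pull can be combined with ordinary predicates on the metadata columns. The query below assumes the enriched silver_text_embeddings table sketched above and an illustrative 'recommendations' tag.

# Incremental pull restricted to the last 30 days and to a single tag (illustrative only).
spark.sql("""
SELECT id, text, embedding, _hoodie_commit_time
FROM hudi_table_changes('default.silver_text_embeddings', 'latest_state', 'earliest')
WHERE created_at >= current_timestamp() - INTERVAL 30 DAYS
  AND tags = 'recommendations'
""").show()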
Conclusion¶
In this blog, we explored how Apache Hudi can efficiently store and manage vector embeddings, leveraging cost-effective storage options like MinIO. By utilizing Hudi's incremental querying capabilities, you can power downstream AI applications with fresh and relevant data while optimizing storage costs. This setup ensures scalable and performant management of vector embeddings, supporting a wide range of use cases from recommendation engines to semantic search systems.
References¶
- https://www.onehouse.ai/blog/managing-ai-vector-embeddings-with-onehouse