Fast GeoSearch on Data Lakes: Learn How to Efficiently Build a Geo Search Using Apache Hudi for Lightning-Fast Data Retrieval Based on Geohashing¶
Geohashes are a little complicated, but the key thing to know is that a geohash divides the world into a grid of squares. By doing this, we can eliminate most of the data upfront and focus only on the square our potential targets are in. It's a pretty elegant solution.
Geohash Precision Table¶
A geohash is up to 12 characters long; the longer the hash, the smaller the square it references.
- The first character identifies one of 32 cells in a grid covering the planet, each roughly 5,000 km x 5,000 km.
- The second character identifies one of 32 cells within the first one, so two characters combined resolve to roughly 1,250 km x 1,250 km.
- This continues up to the twelfth character, which can identify an area as small as a couple of square inches (see the short encoding sketch after the table below).
Geohash Length | Grid Cell Size (Approximate) | Example Geohash | Area Description |
---|---|---|---|
1 | 5,000 km x 5,000 km | d | Large region, e.g., a country |
2 | 1,250 km x 1,250 km | dp | Large city, e.g., New York area |
3 | 156 km x 156 km | dpz | Smaller city or large metropolitan area |
4 | 39 km x 39 km | dpzr | City district or large neighborhood |
5 | 4.8 km x 4.8 km | dpzrt | Neighborhood or small city area |
6 | 1.2 km x 1.2 km | dpzrtr | Several city blocks |
7 | 152 m x 152 m | dpzrtrx | A block or large building |
8 | 38 m x 38 m | dpzrtrxy | Building or property |
9 | 4.8 m x 4.8 m | dpzrtrxyz | Small property or structure |
10 | 1.2 m x 1.2 m | dpzrtrxyzp | Room or smaller structure |
11 | 30 cm x 30 cm | dpzrtrxyzpb | Detailed area within a room |
12 | 7.6 cm x 7.6 cm | dpzrtrxyzpbg | Highly detailed area, like a tile |
Load the Dunkin' coffee shop dataset into Spark as a DataFrame¶
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, explode
from pyspark.sql.types import LongType
import geohash2
import os,sys
# Set Java Home environment variable if needed
os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@11"
HUDI_VERSION = '1.0.0-beta1'
SPARK_VERSION = '3.4'
os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@11"
SUBMIT_ARGS = f"--packages org.apache.hudi:hudi-spark{SPARK_VERSION}-bundle_2.12:{HUDI_VERSION} pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable
# Spark session
spark = SparkSession.builder \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
    .config('className', 'org.apache.hudi') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .getOrCreate()
# Load JSON data into DataFrame (replace './dunkinDonuts.json' with your actual JSON file path)
df = spark.read.json('./dunkinDonuts.json')
# Explode array column 'data' to individual rows
df_flat = df.select(explode(df.data).alias("store"))
# Select relevant columns
df_flat = df_flat.select(
    "store.address",
    "store.city",
    "store.country",
    "store.lat",
    "store.lng",
    "store.name"
)
# Create DataFrame for further processing
df = df_flat
df_flat.show()
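The loader above assumes the JSON has a top-level data array whose elements carry the address, city, country, lat, lng, and name fields. If you want to smoke-test the pipeline without the real file, a minimal stand-in (the store values below are made up, and the field types are assumptions) would look like this:

import json

# Hypothetical records shaped like the fields selected above.
sample = {
    "data": [
        {"address": "123 Main St", "city": "Stamford", "country": "US",
         "lat": "41.0534", "lng": "-73.5387", "name": "Dunkin'"},
        {"address": "456 Elm St", "city": "Stamford", "country": "US",
         "lat": "41.0601", "lng": "-73.5445", "name": "Dunkin'"}
    ]
}

# json.dump writes a single line, which spark.read.json handles without the multiline option.
with open("./dunkinDonuts.json", "w") as f:
    json.dump(sample, f)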
Calculating the Geohash¶
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, explode, lit
from pyspark.sql.types import LongType
import geohash2
import os
# Base 32 characters used in geohashing
base32 = '0123456789bcdefghjkmnpqrstuvwxyz'
# Create a dictionary for quick look-up of character values
base32_map = {char: idx for idx, char in enumerate(base32)}
# Define UDF to calculate geohash with precision
def calculate_geohash(lat, lon, precision):
    hash_str = geohash2.encode(float(lat), float(lon), precision=precision)
    hash_long = 0
    for char in hash_str:
        hash_long = (hash_long << 5) + base32_map[char]
    return hash_long
# Register UDF for calculating geohash with variable precision
calculate_geohash_udf = udf(lambda lat, lon, precision: calculate_geohash(lat, lon, precision), LongType())
precision = 8
# Use lit() to convert precision into a literal
df = df.withColumn("geo_hash", calculate_geohash_udf(col("lat"), col("lng"), lit(precision)))
df.printSchema()
df.show()
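Because each base-32 character contributes exactly 5 bits, the long value is reversible when you know the precision. The decode_geohash helper below is not part of the pipeline, just a quick sanity check that the packing round-trips:

# Invert the 5-bits-per-character packing performed by calculate_geohash.
def decode_geohash(hash_long, precision):
    chars = []
    for i in range(precision):
        shift = 5 * (precision - 1 - i)
        chars.append(base32[(hash_long >> shift) & 0b11111])
    return ''.join(chars)

# Round-trip check against geohash2 for an arbitrary point
encoded = calculate_geohash(41.0534, -73.5387, 8)
assert decode_geohash(encoded, 8) == geohash2.encode(41.0534, -73.5387, precision=8)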
# Define a UDF to extract hash key from geohash with specified precision
def extract_hash_key(geohash, precision):
    hash_key_str = str(abs(geohash))[:precision]  # Take the first 'precision' digits of the absolute geohash
    return int(hash_key_str)
extract_hash_key_udf = udf(lambda geohash, precision: extract_hash_key(geohash, precision), LongType())
# Use lit() to pass precision as a literal column
df = df.withColumn("hash_key", extract_hash_key_udf(col("geo_hash"), lit(precision)))
df.printSchema()
df.show(truncate=False)
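Both helpers are plain Python functions, so you can also run them on the driver to preview the values a given coordinate will get; this is exactly what the search step does later:

# Driver-side preview of the values produced by the two UDFs above.
sample_geo_hash = calculate_geohash(41.0534, -73.5387, precision)
sample_hash_key = extract_hash_key(sample_geo_hash, precision)
print("geo_hash:", sample_geo_hash)
print("hash_key (first", precision, "decimal digits of geo_hash):", sample_hash_key)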
import uuid
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
# Assuming 'df' is your DataFrame
# Function to generate random UUID
def generate_random_uuid():
    return str(uuid.uuid4())
# Register UDF (User Defined Function)
generate_id_udf = udf(generate_random_uuid, StringType())
# Add UUID column to DataFrame
df = df.withColumn("store_id", generate_id_udf())
Define a function to UPSERT into Hudi¶
def write_to_hudi(spark_df,
                  table_name,
                  db_name,
                  method='upsert',
                  table_type='COPY_ON_WRITE',
                  recordkey='',
                  precombine='',
                  partition_fields='',
                  index_type='RECORD_INDEX'
                  ):
    path = f"file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database={db_name}/table_name={table_name}"

    hudi_options = {
        'hoodie.table.name': table_name,
        'hoodie.datasource.write.table.type': table_type,
        'hoodie.datasource.write.table.name': table_name,
        'hoodie.datasource.write.operation': method,
        'hoodie.datasource.write.recordkey.field': recordkey,
        'hoodie.datasource.write.precombine.field': precombine,
        "hoodie.datasource.write.partitionpath.field": partition_fields,
        "hoodie.index.type": index_type,
        "hoodie.enable.data.skipping": "true",
        "hoodie.metadata.enable": "true",
        "hoodie.metadata.index.column.stats.enable": "true",
        "hoodie.write.concurrency.mode": "optimistic_concurrency_control",
        "hoodie.write.lock.provider": "org.apache.hudi.client.transaction.lock.InProcessLockProvider",
        "hoodie.metadata.record.index.enable": "true",
        "hoodie.parquet.max.file.size": 512 * 1024 * 1024,  # 512MB
        "hoodie.parquet.small.file.limit": 104857600  # 100MB
    }

    print("\n")
    print(path)
    print("\n")

    spark_df.write.format("hudi"). \
        options(**hudi_options). \
        mode("append"). \
        save(path)
Create the Hudi table¶
write_to_hudi(
    spark_df=df,
    db_name="default",
    table_name="silver_stores",
    recordkey="store_id",
    precombine="store_id",
    partition_fields="hash_key",
    index_type="RECORD_INDEX"
)
print("Ingestion complete ")
Read From Hudi¶
path = "file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=silver_stores"
spark.read.format("hudi") \
.option("hoodie.enable.data.skipping", "true") \
.option("hoodie.metadata.enable", "true") \
.option("hoodie.metadata.index.column.stats.enable", "true") \
.load(path) \
.createOrReplaceTempView("stores_temp_view")
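Before running the search, a quick sanity check that the view resolves and that hash_key landed as expected can be helpful:

# Count stores per hash_key partition to confirm the view and partition column look right.
spark.sql("""
SELECT hash_key, COUNT(*) AS stores
FROM stores_temp_view
GROUP BY hash_key
ORDER BY stores DESC
""").show(10, truncate=False)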
Search Process: Compute the Hash Key and Query the Matching Partition Directly¶
from pyspark.sql.functions import col, radians, sin, cos, asin, sqrt, lit
target_lat = 41.0534
target_lng = -73.5387
precision = 8
search_radius = 0.5 # in miles
target_geo_hash = calculate_geohash(target_lat, target_lng, precision)
target_hash_key = extract_hash_key(target_geo_hash, precision)
# Assuming 'stores_temp_view' is your registered temporary view or table
sql_query = f"""
SELECT name, address, geo_hash, hash_key, lat, lng,
3956 * 2 * ASIN(SQRT(POW(SIN(RADIANS({target_lat} - lat) / 2), 2) +
COS(RADIANS({target_lat})) * COS(RADIANS(lat)) *
POW(SIN(RADIANS({target_lng} - lng) / 2), 2))) AS distance
FROM stores_temp_view
WHERE hash_key = {target_hash_key}
AND 3956 * 2 * ASIN(SQRT(POW(SIN(RADIANS({target_lat} - lat) / 2), 2) +
COS(RADIANS({target_lat})) * COS(RADIANS(lat)) *
POW(SIN(RADIANS({target_lng} - lng) / 2), 2))) <= {search_radius}
"""
# Execute the SQL query
df_filtered_sql = spark.sql(sql_query)
# Show the filtered DataFrame
df_filtered_sql.show(truncate=False)
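If you want to verify the distance column outside Spark, the same haversine formula can be evaluated in plain Python (3956 is the Earth's radius in miles, matching the constant in the query; the second coordinate pair below is only an illustration, swap in a lat/lng returned by df_filtered_sql):

import math

def haversine_miles(lat1, lng1, lat2, lng2):
    # Mirrors the SQL expression above: 2 * R * asin(sqrt(sin^2(dlat/2) + cos(lat1)*cos(lat2)*sin^2(dlng/2)))
    dlat = math.radians(lat1 - lat2)
    dlng = math.radians(lng1 - lng2)
    a = math.sin(dlat / 2) ** 2 + \
        math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) * math.sin(dlng / 2) ** 2
    return 3956 * 2 * math.asin(math.sqrt(a))

# Illustrative second point near the search coordinates
print(haversine_miles(target_lat, target_lng, 41.05, -73.54))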