Fast GeoSearch on Data Lakes: Learn How to Efficiently Build a Geo Search Using Apache Hudi for Lightning-Fast Data Retrieval Based on Geohashing

Geohashes can look complicated, but the key thing to know is that geohashing divides the world into a grid of cells. This lets us eliminate most of the data up front and focus only on the cell our potential targets are in. It's a pretty elegant solution.


Geohash Precision Table

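A geohash is a string of base-32 characters, usually up to 12 long. Each character adds 5 bits that alternately halve the longitude and latitude ranges, so the longer the hash, the smaller the cell it references.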

  • The first character of the hash identifies one of 32 cells in the grid, each roughly 5,000 km x 5,000 km.
  • The second character identifies one of 32 sub-cells within that cell, so the two characters together resolve to roughly 1,250 km x 625 km.
  • This continues up to the twelfth character, which identifies a cell of roughly 3.7 cm x 1.9 cm, about one square inch.
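The bullet points above can be made concrete with a small encoder. This is a plain-Python sketch of the standard geohash algorithm (not the geohash2 library the notebook uses later): it repeatedly bisects the longitude and latitude ranges, records one bit per bisection, and turns every 5 bits into a base-32 character.

```python
BASE32 = "0123456789bcdefghjkmnpqrstuvwxyz"  # geohash alphabet (no a, i, l, o)

def geohash_encode(lat: float, lon: float, precision: int = 12) -> str:
    """Encode a point as a geohash of `precision` characters."""
    lat_rng, lon_rng = [-90.0, 90.0], [-180.0, 180.0]
    chars, ch, nbits = [], 0, 0
    use_lon = True  # bits alternate, starting with longitude
    while len(chars) < precision:
        rng, val = (lon_rng, lon) if use_lon else (lat_rng, lat)
        mid = (rng[0] + rng[1]) / 2
        if val > mid:          # point is in the upper half: emit 1
            ch = (ch << 1) | 1
            rng[0] = mid
        else:                  # point is in the lower half: emit 0
            ch = ch << 1
            rng[1] = mid
        use_lon = not use_lon
        nbits += 1
        if nbits == 5:         # every 5 bits become one base-32 character
            chars.append(BASE32[ch])
            ch, nbits = 0, 0
    return "".join(chars)

print(geohash_encode(37.649585, -122.40607, 5))  # 180 S Airport Blvd -> 9q8yn
```

Because each extra character only refines the previous cell, geohash_encode(lat, lon, 5) is always a prefix of geohash_encode(lat, lon, 8), which is what makes prefix-based grouping of nearby points possible.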
Approximate cell sizes at the equator (width x height; widths shrink toward the poles):

| Geohash Length | Grid Cell Size (Approximate) | Example Geohash | Area Description |
|---|---|---|---|
| 1 | 5,000 km x 5,000 km | d | Large region, e.g., a country |
| 2 | 1,250 km x 625 km | dp | Multi-state region |
| 3 | 156 km x 156 km | dpz | Smaller city or large metropolitan area |
| 4 | 39 km x 19.5 km | dpzr | City district or large neighborhood |
| 5 | 4.9 km x 4.9 km | dpzrt | Neighborhood or small city area |
| 6 | 1.2 km x 610 m | dpzrtr | Several city blocks |
| 7 | 153 m x 153 m | dpzrtrx | A block or large building |
| 8 | 38 m x 19 m | dpzrtrxy | Building or property |
| 9 | 4.8 m x 4.8 m | dpzrtrxyz | Small property or structure |
| 10 | 1.2 m x 60 cm | dpzrtrxyzp | Room or smaller structure |
| 11 | 15 cm x 15 cm | dpzrtrxyzpb | Detailed area within a room |
| 12 | 3.7 cm x 1.9 cm | dpzrtrxyzpbg | Highly detailed area, like a tile |
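The sizes in the table follow from the bit budget: a geohash with n characters carries 5n bits, longitude gets ceil(5n/2) of them and latitude floor(5n/2), and each bit halves the range. A short sketch that reproduces the table; the kilometres-per-degree constants are approximations and the helper names are illustrative:

```python
import math
from typing import Tuple

KM_PER_DEG_LON_AT_EQUATOR = 40075.017 / 360  # ~111.32 km (equatorial circumference / 360)
KM_PER_DEG_LAT = 20003.93 / 180              # ~111.13 km (pole-to-pole meridian length / 180)

def cell_degrees(precision: int) -> Tuple[float, float]:
    """Width and height in degrees of a geohash cell with `precision` characters."""
    total_bits = 5 * precision
    lon_bits = (total_bits + 1) // 2  # longitude gets the extra bit when the total is odd
    lat_bits = total_bits // 2
    return 360.0 / 2 ** lon_bits, 180.0 / 2 ** lat_bits

def cell_size_km(precision: int, latitude: float = 0.0) -> Tuple[float, float]:
    """Approximate width x height in km; the width shrinks with cos(latitude)."""
    width_deg, height_deg = cell_degrees(precision)
    width_km = width_deg * KM_PER_DEG_LON_AT_EQUATOR * math.cos(math.radians(latitude))
    return width_km, height_deg * KM_PER_DEG_LAT

for p in range(1, 13):
    w, h = cell_size_km(p)
    print(f"{p:2d} chars: {w:,.4f} km x {h:,.4f} km")
```

At the latitude of the Bay Area stores below (about 37.7°), cell_size_km(8, 37.7) gives cells roughly 30 m wide rather than the 38 m listed for the equator.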

Load the full Dunkin Donuts store dataset into a Spark DataFrame

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, explode
from pyspark.sql.types import LongType
import geohash2
import os, sys

# Set Java home and the Python interpreter if needed
os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@11"
os.environ['PYSPARK_PYTHON'] = sys.executable

# SPARK_VERSION must match your Spark installation ('3.4' is an assumption)
SPARK_VERSION = '3.4'
HUDI_VERSION = '1.0.0-beta1'

# Pull the Hudi Spark bundle when the session starts
SUBMIT_ARGS = f"--packages org.apache.hudi:hudi-spark{SPARK_VERSION}-bundle_2.12:{HUDI_VERSION} pyspark-shell"
os.environ['PYSPARK_SUBMIT_ARGS'] = SUBMIT_ARGS

# Spark session with the Hudi SQL extension
spark = SparkSession.builder \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
    .config('className', 'org.apache.hudi') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .getOrCreate()

# Load JSON data into DataFrame (replace './dunkinDonuts.json' with your actual JSON file path).
# multiLine assumes the file is a single JSON document holding a 'data' array.
df = spark.read.option("multiLine", "true").json('./dunkinDonuts.json')

# Explode array column 'data' to individual rows
df_flat = df.select(explode(col("data")).alias("store"))

# Select relevant columns
df_flat = df_flat.select("store.address", "store.city", "store.country",
                         "store.lat", "store.lng", "store.name")

# Create DataFrame for further processing
df = df_flat
df.show()
|             address|               city|country|      lat|        lng|         name|
|  180 S Airport Blvd|South San Francisco|     US|37.649585| -122.40607|Dunkin Donuts|
|  451 Hegenberger Rd|            Oakland|     US|37.737457|-122.197412|Dunkin Donuts|
|      268 Jackson St|            Hayward|     US|37.660518|-122.085634|Dunkin Donuts|
|  240 El Camino Real|         San Carlos|     US|37.510929|-122.265387|Dunkin Donuts|
|     1250 Newell Ave|       Walnut Creek|     US|37.893844|-122.054286|Dunkin Donuts|
|     593 Woodside Rd|       Redwood City|     US|37.469419|-122.223163|Dunkin Donuts|
|     4383 Clayton Rd|            Concord|     US|37.963967|-121.989257|Dunkin Donuts|
|      5255 Mowry Ave|            Fremont|     US|37.534275|-122.000694|Dunkin Donuts|
|410 Napa Junction Rd|    American Canyon|     US|38.185169|-122.254537|Dunkin Donuts|
| 435 N McDowell Blvd|           Petaluma|     US|38.254609|-122.636133|Dunkin Donuts|
|4701 Century Boul...|          Pittsburg|     US|38.006175|-121.835564|Dunkin Donuts|
|852 E El Camino Real|          Sunnyvale|     US|37.354843|-122.017367|Dunkin Donuts|
|1412 S. Park Vict...|           Milpitas|     US|37.416778|-121.874026|Dunkin Donuts|
|1701 Airport Boul...|           San Jose|     US|37.369339|-121.929265|Dunkin Donuts|
|      282 Sunset Ave|        Suisun City|     US|38.245503|-122.020698|Dunkin Donuts|
|1659 Airport Boul...|           San Jose|     US|37.364815|-121.922516|Dunkin Donuts|
| 1477 Berryessa Road|           San Jose|     US|37.369457|    -121.88|Dunkin Donuts|
|844 S Winchester ...|           San Jose|     US|37.312062|-121.949697|Dunkin Donuts|
|  1110 Foxworthy Ave|           San Jose|     US| 37.27779|-121.880569|Dunkin Donuts|
| 2739 Santa Rosa Ave|         Santa Rosa|     US|38.411042|-122.713859|Dunkin Donuts|
only showing top 20 rows

Calculating the geohash

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, explode, lit
from pyspark.sql.types import LongType
import geohash2
import os

# Base 32 characters used in geohashing
base32 = '0123456789bcdefghjkmnpqrstuvwxyz'

# Create a dictionary for quick look-up of character values
base32_map = {char: idx for idx, char in enumerate(base32)}

# Compute a geohash with `precision` characters and pack it into a long (5 bits per base-32 character)
def calculate_geohash(lat, lon, precision):
    hash_str = geohash2.encode(float(lat), float(lon), precision=precision)
    hash_long = 0
    for char in hash_str:
        hash_long = (hash_long << 5) + base32_map[char]
    return hash_long

# Register UDF for calculating geohash with variable precision
calculate_geohash_udf = udf(lambda lat, lon, precision: calculate_geohash(lat, lon, precision), LongType())

precision = 8

# Use lit() to convert precision into a literal
df = df.withColumn("geo_hash", calculate_geohash_udf(col("lat"), col("lng"), lit(precision)))
df.printSchema()
df.show()

 |-- address: string (nullable = true)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)
 |-- lat: string (nullable = true)
 |-- lng: string (nullable = true)
 |-- name: string (nullable = true)
 |-- geo_hash: long (nullable = true)

|             address|               city|country|      lat|        lng|         name|    geo_hash|
|  180 S Airport Blvd|South San Francisco|     US|37.649585| -122.40607|Dunkin Donuts|333160539608|
|  451 Hegenberger Rd|            Oakland|     US|37.737457|-122.197412|Dunkin Donuts|333183372760|
|      268 Jackson St|            Hayward|     US|37.660518|-122.085634|Dunkin Donuts|333183516154|
|  240 El Camino Real|         San Carlos|     US|37.510929|-122.265387|Dunkin Donuts|333179906352|
|     1250 Newell Ave|       Walnut Creek|     US|37.893844|-122.054286|Dunkin Donuts|333184905423|
|     593 Woodside Rd|       Redwood City|     US|37.469419|-122.223163|Dunkin Donuts|333179938479|
|     4383 Clayton Rd|            Concord|     US|37.963967|-121.989257|Dunkin Donuts|333186421904|
|      5255 Mowry Ave|            Fremont|     US|37.534275|-122.000694|Dunkin Donuts|333180742010|
|410 Napa Junction Rd|    American Canyon|     US|38.185169|-122.254537|Dunkin Donuts|333230265434|
| 435 N McDowell Blvd|           Petaluma|     US|38.254609|-122.636133|Dunkin Donuts|333207357601|
|4701 Century Boul...|          Pittsburg|     US|38.006175|-121.835564|Dunkin Donuts|333231354308|
|852 E El Camino Real|          Sunnyvale|     US|37.354843|-122.017367|Dunkin Donuts|333179685437|
|1412 S. Park Vict...|           Milpitas|     US|37.416778|-121.874026|Dunkin Donuts|333181302656|
|1701 Airport Boul...|           San Jose|     US|37.369339|-121.929265|Dunkin Donuts|333181131642|
|      282 Sunset Ave|        Suisun City|     US|38.245503|-122.020698|Dunkin Donuts|333231069424|
|1659 Airport Boul...|           San Jose|     US|37.364815|-121.922516|Dunkin Donuts|333181137033|
| 1477 Berryessa Road|           San Jose|     US|37.369457|    -121.88|Dunkin Donuts|333181235576|
|844 S Winchester ...|           San Jose|     US|37.312062|-121.949697|Dunkin Donuts|333180906206|
|  1110 Foxworthy Ave|           San Jose|     US| 37.27779|-121.880569|Dunkin Donuts|333180973110|
| 2739 Santa Rosa Ave|         Santa Rosa|     US|38.411042|-122.713859|Dunkin Donuts|333209051522|
only showing top 20 rows
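Because each character contributes exactly 5 bits, a packed geo_hash value can be unpacked back into its geohash string, which is handy when sanity-checking the column above. A minimal plain-Python sketch; pack_geohash mirrors the loop in calculate_geohash, and both helper names are made up for illustration:

```python
BASE32 = "0123456789bcdefghjkmnpqrstuvwxyz"
BASE32_MAP = {ch: idx for idx, ch in enumerate(BASE32)}

def pack_geohash(hash_str: str) -> int:
    """Pack a geohash string into an int, 5 bits per character (same scheme as calculate_geohash)."""
    value = 0
    for ch in hash_str:
        value = (value << 5) | BASE32_MAP[ch]
    return value

def unpack_geohash(value: int, precision: int) -> str:
    """Recover the string; precision is needed because leading '0' characters pack to zero bits."""
    chars = []
    for _ in range(precision):
        chars.append(BASE32[value & 0b11111])  # the lowest 5 bits hold the last character
        value >>= 5
    return "".join(reversed(chars))

print(unpack_geohash(333160539608, 8))  # geo_hash of 180 S Airport Blvd -> 9q8yntfs
```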

In [3]:
# Define a UDF that derives a coarser bucket key from the packed geohash
# by keeping its first `precision` decimal digits
def extract_hash_key(geohash, precision):
    hash_key_str = str(abs(geohash))[:precision]  # Take first 'precision' decimal digits of the absolute geohash
    return int(hash_key_str)

extract_hash_key_udf = udf(lambda geohash, precision: extract_hash_key(geohash, precision), LongType())

# Use lit() to pass precision as a literal column
df = df.withColumn("hash_key", extract_hash_key_udf(col("geo_hash"), lit(precision)))
df.printSchema()
df.show(truncate=False)

 |-- address: string (nullable = true)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)
 |-- lat: string (nullable = true)
 |-- lng: string (nullable = true)
 |-- name: string (nullable = true)
 |-- geo_hash: long (nullable = true)
 |-- hash_key: long (nullable = true)

|address                  |city               |country|lat      |lng        |name         |geo_hash    |hash_key|
|180 S Airport Blvd       |South San Francisco|US     |37.649585|-122.40607 |Dunkin Donuts|333160539608|33316053|
|451 Hegenberger Rd       |Oakland            |US     |37.737457|-122.197412|Dunkin Donuts|333183372760|33318337|
|268 Jackson St           |Hayward            |US     |37.660518|-122.085634|Dunkin Donuts|333183516154|33318351|
|240 El Camino Real       |San Carlos         |US     |37.510929|-122.265387|Dunkin Donuts|333179906352|33317990|
|1250 Newell Ave          |Walnut Creek       |US     |37.893844|-122.054286|Dunkin Donuts|333184905423|33318490|
|593 Woodside Rd          |Redwood City       |US     |37.469419|-122.223163|Dunkin Donuts|333179938479|33317993|
|4383 Clayton Rd          |Concord            |US     |37.963967|-121.989257|Dunkin Donuts|333186421904|33318642|
|5255 Mowry Ave           |Fremont            |US     |37.534275|-122.000694|Dunkin Donuts|333180742010|33318074|
|410 Napa Junction Rd     |American Canyon    |US     |38.185169|-122.254537|Dunkin Donuts|333230265434|33323026|
|435 N McDowell Blvd      |Petaluma           |US     |38.254609|-122.636133|Dunkin Donuts|333207357601|33320735|
|4701 Century Boulevard   |Pittsburg          |US     |38.006175|-121.835564|Dunkin Donuts|333231354308|33323135|
|852 E El Camino Real     |Sunnyvale          |US     |37.354843|-122.017367|Dunkin Donuts|333179685437|33317968|
|1412 S. Park Victoria Dr.|Milpitas           |US     |37.416778|-121.874026|Dunkin Donuts|333181302656|33318130|
|1701 Airport Boulevard   |San Jose           |US     |37.369339|-121.929265|Dunkin Donuts|333181131642|33318113|
|282 Sunset Ave           |Suisun City        |US     |38.245503|-122.020698|Dunkin Donuts|333231069424|33323106|
|1659 Airport Boulevard   |San Jose           |US     |37.364815|-121.922516|Dunkin Donuts|333181137033|33318113|
|1477 Berryessa Road      |San Jose           |US     |37.369457|-121.88    |Dunkin Donuts|333181235576|33318123|
|844 S Winchester Blvd    |San Jose           |US     |37.312062|-121.949697|Dunkin Donuts|333180906206|33318090|
|1110 Foxworthy Ave       |San Jose           |US     |37.27779 |-121.880569|Dunkin Donuts|333180973110|33318097|
|2739 Santa Rosa Ave      |Santa Rosa         |US     |38.411042|-122.713859|Dunkin Donuts|333209051522|33320905|
only showing top 20 rows
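The hash_key above is a decimal prefix, not a geohash prefix: for these 12-digit values it groups runs of 10,000 consecutive geo_hash numbers, and those runs do not line up with geohash cell boundaries. If you want keys that correspond exactly to geohash cells, one option (an alternative to the approach above, not what this notebook does) is to drop whole 5-bit characters with a right shift. A sketch with hypothetical helper names:

```python
BASE32_MAP = {ch: i for i, ch in enumerate("0123456789bcdefghjkmnpqrstuvwxyz")}

def pack_geohash(hash_str: str) -> int:
    """Pack a geohash string into an int, 5 bits per character."""
    value = 0
    for ch in hash_str:
        value = (value << 5) | BASE32_MAP[ch]
    return value

def decimal_prefix_key(geo_long: int, digits: int) -> int:
    """Same logic as extract_hash_key above: keep the first `digits` decimal digits."""
    return int(str(abs(geo_long))[:digits])

def geohash_prefix_key(geo_long: int, precision: int, key_chars: int) -> int:
    """Keep the first `key_chars` geohash characters by dropping 5 bits per removed character."""
    if not 1 <= key_chars <= precision:
        raise ValueError("key_chars must be between 1 and precision")
    return geo_long >> (5 * (precision - key_chars))

# Two corners of the same 5-character cell "9q8yn"
low, high = pack_geohash("9q8yn000"), pack_geohash("9q8ynzzz")
print(decimal_prefix_key(low, 8), decimal_prefix_key(high, 8))          # two different buckets
print(geohash_prefix_key(low, 8, 5) == geohash_prefix_key(high, 8, 5))  # same cell, same key
```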

In [9]:
import uuid
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Assuming 'df' is your DataFrame

# Function to generate random UUID
def generate_random_uuid():
    return str(uuid.uuid4())

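# Register UDF (User Defined Function); mark it non-deterministic so Spark
# does not assume repeated calls return the same value
generate_id_udf = udf(generate_random_uuid, StringType()).asNondeterministic()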

# Add UUID column to DataFrame
df = df.withColumn("store_id", generate_id_udf())
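Because uuid4 is random, re-running this cell assigns new store_id values to the same stores. If store_id is used as the Hudi record key, a later upsert of the same data would insert duplicates instead of updating existing rows. A deterministic alternative (a sketch, not what the notebook does) derives the ID from stable store attributes with a name-based UUID; the namespace string and the choice of fields are assumptions:

```python
import uuid

# Hypothetical fixed namespace; any UUID works as long as it never changes between runs.
STORE_NAMESPACE = uuid.uuid5(uuid.NAMESPACE_DNS, "stores.example.com")

def deterministic_store_id(name, address, lat, lng) -> str:
    """Name-based UUID (version 5): the same store attributes always yield the same ID."""
    # Normalize so trivial whitespace or case differences do not produce new IDs
    key = "|".join(str(part).strip().lower() for part in (name, address, lat, lng))
    return str(uuid.uuid5(STORE_NAMESPACE, key))

# In Spark this could be wrapped like the UDF above, e.g.
# udf(deterministic_store_id, StringType())(col("name"), col("address"), col("lat"), col("lng"))
a = deterministic_store_id("Dunkin Donuts", "180 S Airport Blvd", "37.649585", "-122.40607")
b = deterministic_store_id("Dunkin Donuts", "180 S Airport Blvd", "37.649585", "-122.40607")
print(a == b)
```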

Define a function to UPSERT into Hudi

In [12]:
def write_to_hudi(spark_df,
                  table_name,
                  db_name,
                  method,
                  table_type,
                  recordkey,
                  precombine,
                  partition_fields,
                  index_type):
    # Local table location: one folder per database and table
    path = f"file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database={db_name}/table_name={table_name}"

    hudi_options = {
        'hoodie.table.name': table_name,
        'hoodie.datasource.write.table.type': table_type,
        'hoodie.datasource.write.table.name': table_name,
        'hoodie.datasource.write.operation': method,
        'hoodie.datasource.write.recordkey.field': recordkey,
        'hoodie.datasource.write.precombine.field': precombine,
        "hoodie.datasource.write.partitionpath.field": partition_fields,
        "hoodie.index.type": index_type,
        "hoodie.enable.data.skipping": "true",
        "hoodie.metadata.enable": "true",
        "hoodie.metadata.index.column.stats.enable": "true",
        "hoodie.write.concurrency.mode": "optimistic_concurrency_control",
        "hoodie.write.lock.provider": "org.apache.hudi.client.transaction.lock.InProcessLockProvider",
        "hoodie.metadata.record.index.enable": "true",
        "hoodie.parquet.max.file.size": 512 * 1024 * 1024,  # 512MB
        "hoodie.parquet.small.file.limit": 104857600,  # 100MB
    }

    spark_df.write.format("hudi") \
        .options(**hudi_options) \
        .mode("append") \
        .save(path)

Create Hudi tables

In [13]:


In [16]:
print("Ingestion complete ")
Ingestion complete 

Read From Hudi

In [26]:
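path = "file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=silver_stores"

# Read with data skipping, backed by the metadata table's column stats index
hudi_df = spark.read.format("hudi") \
    .option("hoodie.enable.data.skipping", "true") \
    .option("hoodie.metadata.enable", "true") \
    .option("hoodie.metadata.index.column.stats.enable", "true") \
    .load(path)

# Register the table so the search below can query it with Spark SQL
hudi_df.createOrReplaceTempView("stores_temp_view")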

Search process: compute the hash key and query the matching partition directly

In [31]:
from pyspark.sql.functions import col, radians, sin, cos, asin, sqrt, lit

target_lat = 41.0534
target_lng = -73.5387
precision = 8
search_radius = 0.5  # in miles

target_geo_hash = calculate_geohash(target_lat, target_lng, precision)
target_hash_key = extract_hash_key(target_geo_hash, precision)

# Assuming 'stores_temp_view' is your registered temporary view or table.
# 3956 is the Earth's radius in miles, so the haversine distance below is in miles.
sql_query = f"""
SELECT name, address, geo_hash, hash_key, lat, lng,
       3956 * 2 * ASIN(SQRT(POW(SIN(RADIANS({target_lat} - lat) / 2), 2) +
                          COS(RADIANS({target_lat})) * COS(RADIANS(lat)) *
                          POW(SIN(RADIANS({target_lng} - lng) / 2), 2))) AS distance
FROM stores_temp_view
WHERE hash_key = {target_hash_key}
  AND 3956 * 2 * ASIN(SQRT(POW(SIN(RADIANS({target_lat} - lat) / 2), 2) +
                           COS(RADIANS({target_lat})) * COS(RADIANS(lat)) *
                           POW(SIN(RADIANS({target_lng} - lng) / 2), 2))) <= {search_radius}
"""

# Execute the SQL query
df_filtered_sql = spark.sql(sql_query)

# Show the filtered DataFrame
df_filtered_sql.show(truncate=False)
|name         |address        |geo_hash    |hash_key|lat      |lng       |distance           |
|Dunkin Donuts|33 W Broad St  |437258162044|43725816|41.055604|-73.545717|0.3957724791444152 |
|Dunkin Donuts|450 Main Street|437258167594|43725816|41.053227|-73.539765|0.05672320742692224|
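The query above only looks inside the target's own hash_key bucket, so a store within the radius that falls just across a bucket boundary would be missed. A common remedy is to collect every key whose cell overlaps the search circle's bounding box, filter with hash_key IN (...), and then apply the exact distance check. The sketch below assumes a hypothetical hash_key column holding geohash prefix strings (not the decimal-digit key used in this notebook) and ignores the poles and the ±180° meridian:

```python
import math

BASE32 = "0123456789bcdefghjkmnpqrstuvwxyz"
EARTH_RADIUS_MILES = 3956  # same constant as the SQL query above

def geohash_encode(lat, lon, precision):
    """Standard geohash: alternate longitude/latitude bisection, 5 bits per character."""
    lat_rng, lon_rng = [-90.0, 90.0], [-180.0, 180.0]
    out, ch, nbits, use_lon = [], 0, 0, True
    while len(out) < precision:
        rng, val = (lon_rng, lon) if use_lon else (lat_rng, lat)
        mid = (rng[0] + rng[1]) / 2
        if val > mid:
            ch = (ch << 1) | 1
            rng[0] = mid  # keep the upper half
        else:
            ch = ch << 1
            rng[1] = mid  # keep the lower half
        use_lon = not use_lon
        nbits += 1
        if nbits == 5:
            out.append(BASE32[ch])
            ch, nbits = 0, 0
    return "".join(out)

def haversine_miles(lat1, lon1, lat2, lon2):
    """Great-circle distance in miles (same formula as the SQL query)."""
    dlat = math.radians(lat2 - lat1)
    dlon = math.radians(lon2 - lon1)
    a = (math.sin(dlat / 2) ** 2
         + math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) * math.sin(dlon / 2) ** 2)
    return EARTH_RADIUS_MILES * 2 * math.asin(math.sqrt(a))

def _samples(lo, hi, step):
    """Points from lo to hi, at most `step` apart, always including hi."""
    points, x = [], lo
    while x < hi:
        points.append(x)
        x += step
    points.append(hi)
    return points

def covering_prefixes(lat, lon, radius_miles, key_chars):
    """Geohash prefixes of length key_chars whose cells overlap the circle's bounding box."""
    ang = radius_miles / EARTH_RADIUS_MILES  # angular radius in radians
    dlat = math.degrees(ang)
    dlon = math.degrees(math.asin(math.sin(ang) / math.cos(math.radians(lat))))
    bits = 5 * key_chars
    cell_w = 360.0 / 2 ** ((bits + 1) // 2)  # longitude gets the odd bit
    cell_h = 180.0 / 2 ** (bits // 2)
    # Samples spaced no wider than one cell land in every cell the box overlaps
    return {
        geohash_encode(a, b, key_chars)
        for a in _samples(lat - dlat, lat + dlat, cell_h)
        for b in _samples(lon - dlon, lon + dlon, cell_w)
    }

keys = covering_prefixes(41.0534, -73.5387, 0.5, key_chars=5)
predicate = "hash_key IN ({})".format(", ".join(f"'{k}'" for k in sorted(keys)))
print(predicate)
```

Rows that pass the IN filter still go through the haversine check, as in the query above. Choosing a key length whose cells are comparable to or larger than the typical search radius keeps the key list short (at most four keys here).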

