Thursday, July 18, 2024

Fast GeoSearch on Data Lakes: Learn How to Efficiently Build a Geo Search Using Apache Hudi for Lightning-Fast Data Retrieval Based on Geohashing

Geohashes are a little complicated, but the key thing to know is that they divide the world into a grid of squares. By doing this, we can eliminate most of the data upfront and focus only on the square our potential targets fall in. It's a pretty elegant solution.


The hash is up to 12 characters in length — the longer the hash, the smaller the square it references.

  • The first character of the hash identifies one of 32 cells in a grid covering the planet, each roughly 5,000 km x 5,000 km.
  • The second character identifies one of 32 squares within that first cell, so the two characters combined resolve to roughly 1,250 km x 1,250 km.
  • This continues until the twelfth character, which can identify an area as small as just a couple of square inches on Earth (see the short sketch after the table below).
Geohash Precision Table

| Geohash Length | Grid Cell Size (Approximate) | Example Geohash | Area Description                     |
|----------------|------------------------------|-----------------|--------------------------------------|
| 1              | 5,000 km x 5,000 km          | d               | Large region, e.g., a country        |
| 2              | 1,250 km x 1,250 km          | dp              | Large city, e.g., New York area      |
| 3              | 156 km x 156 km              | dpz             | Smaller city or large metropolitan   |
| 4              | 39 km x 39 km                | dpzr            | City district or large neighborhood  |
| 5              | 4.8 km x 4.8 km              | dpzrt           | Neighborhood or small city area      |
| 6              | 1.2 km x 1.2 km              | dpzrtr          | Several city blocks                  |
| 7              | 152 m x 152 m                | dpzrtrx         | A block or large building            |
| 8              | 38 m x 38 m                  | dpzrtrxy        | Building or property                 |
| 9              | 4.8 m x 4.8 m                | dpzrtrxyz       | Small property or structure          |
| 10             | 1.2 m x 1.2 m                | dpzrtrxyzp      | Room or smaller structure            |
| 11             | 30 cm x 30 cm                | dpzrtrxyzpb     | Detailed area within a room          |
| 12             | 7.6 cm x 7.6 cm              | dpzrtrxyzpbg    | Highly detailed area, like a tile    |
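
To see this nesting in action, here is a small sketch using the same geohash2 library that the notebook below relies on (the coordinates are just an example point). Each longer hash starts with the shorter one, which is exactly what lets us prune data by prefix:

import geohash2

# Encode one point at increasing precision; every longer hash
# extends the previous one (e.g. 'd', 'dr', 'dr7', ...)
lat, lng = 41.0534, -73.5387
for precision in range(1, 13):
    print(precision, geohash2.encode(lat, lng, precision=precision))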

Load the Dunkin' coffee shop dataset into Spark as a DataFrame

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, explode
from pyspark.sql.types import LongType
import geohash2
import os,sys

# Set JAVA_HOME if needed (local setup)
os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@11"

HUDI_VERSION = '1.0.0-beta1'
SPARK_VERSION = '3.4'

# Pull the Hudi Spark bundle into the PySpark session
SUBMIT_ARGS = f"--packages org.apache.hudi:hudi-spark{SPARK_VERSION}-bundle_2.12:{HUDI_VERSION} pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable

# Spark session
spark = SparkSession.builder \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
    .config('className', 'org.apache.hudi') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .getOrCreate()


# Load JSON data into DataFrame (replace './dunkinDonuts.json' with your actual JSON file path)
df = spark.read.json('./dunkinDonuts.json')

# Explode array column 'data' to individual rows
df_flat = df.select(explode(df.data).alias("store"))

# Select relevant columns
df_flat = df_flat.select(
    "store.address",
    "store.city",
    "store.country",
    "store.lat",
    "store.lng",
    "store.name"
)

# Create DataFrame for further processing
df = df_flat
df_flat.show()
Warning: Ignoring non-Spark config property: className
Ivy Default Cache set to: /Users/soumilshah/.ivy2/cache
The jars for the packages stored in: /Users/soumilshah/.ivy2/jars
org.apache.hudi#hudi-spark3.4-bundle_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-aa8a82f6-1770-46fb-887c-6972cdce1030;1.0
	confs: [default]
:: loading settings :: url = jar:file:/opt/anaconda3/lib/python3.11/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml
	found org.apache.hudi#hudi-spark3.4-bundle_2.12;1.0.0-beta1 in central
:: resolution report :: resolve 60ms :: artifacts dl 2ms
	:: modules in use:
	org.apache.hudi#hudi-spark3.4-bundle_2.12;1.0.0-beta1 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   1   |   0   |   0   |   0   ||   1   |   0   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-aa8a82f6-1770-46fb-887c-6972cdce1030
	confs: [default]
	0 artifacts copied, 1 already retrieved (0kB/2ms)
24/07/18 10:52:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/07/18 10:53:01 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
+--------------------+-------------------+-------+---------+-----------+-------------+
|             address|               city|country|      lat|        lng|         name|
+--------------------+-------------------+-------+---------+-----------+-------------+
|  180 S Airport Blvd|South San Francisco|     US|37.649585| -122.40607|Dunkin Donuts|
|  451 Hegenberger Rd|            Oakland|     US|37.737457|-122.197412|Dunkin Donuts|
|      268 Jackson St|            Hayward|     US|37.660518|-122.085634|Dunkin Donuts|
|  240 El Camino Real|         San Carlos|     US|37.510929|-122.265387|Dunkin Donuts|
|     1250 Newell Ave|       Walnut Creek|     US|37.893844|-122.054286|Dunkin Donuts|
|     593 Woodside Rd|       Redwood City|     US|37.469419|-122.223163|Dunkin Donuts|
|     4383 Clayton Rd|            Concord|     US|37.963967|-121.989257|Dunkin Donuts|
|      5255 Mowry Ave|            Fremont|     US|37.534275|-122.000694|Dunkin Donuts|
|410 Napa Junction Rd|    American Canyon|     US|38.185169|-122.254537|Dunkin Donuts|
| 435 N McDowell Blvd|           Petaluma|     US|38.254609|-122.636133|Dunkin Donuts|
|4701 Century Boul...|          Pittsburg|     US|38.006175|-121.835564|Dunkin Donuts|
|852 E El Camino Real|          Sunnyvale|     US|37.354843|-122.017367|Dunkin Donuts|
|1412 S. Park Vict...|           Milpitas|     US|37.416778|-121.874026|Dunkin Donuts|
|1701 Airport Boul...|           San Jose|     US|37.369339|-121.929265|Dunkin Donuts|
|      282 Sunset Ave|        Suisun City|     US|38.245503|-122.020698|Dunkin Donuts|
|1659 Airport Boul...|           San Jose|     US|37.364815|-121.922516|Dunkin Donuts|
| 1477 Berryessa Road|           San Jose|     US|37.369457|    -121.88|Dunkin Donuts|
|844 S Winchester ...|           San Jose|     US|37.312062|-121.949697|Dunkin Donuts|
|  1110 Foxworthy Ave|           San Jose|     US| 37.27779|-121.880569|Dunkin Donuts|
| 2739 Santa Rosa Ave|         Santa Rosa|     US|38.411042|-122.713859|Dunkin Donuts|
+--------------------+-------------------+-------+---------+-----------+-------------+
only showing top 20 rows
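
For reference, the loader above expects a JSON document with a top-level data array of store objects. A minimal sample file with at least these fields (names and values taken from the first row of the output above) could be produced like this:

import json

# One store object inside a top-level "data" array, matching the schema above
sample = {
    "data": [
        {
            "address": "180 S Airport Blvd",
            "city": "South San Francisco",
            "country": "US",
            "lat": "37.649585",
            "lng": "-122.40607",
            "name": "Dunkin Donuts"
        }
    ]
}

# json.dump writes a single line, which Spark's default JSON reader can parse
with open("./dunkinDonuts.json", "w") as f:
    json.dump(sample, f)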

Calculating the Geohash

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, explode, lit
from pyspark.sql.types import LongType
import geohash2
import os


# Base 32 characters used in geohashing
base32 = '0123456789bcdefghjkmnpqrstuvwxyz'

# Create a dictionary for quick look-up of character values
base32_map = {char: idx for idx, char in enumerate(base32)}

# Define UDF to calculate geohash with precision
def calculate_geohash(lat, lon, precision):
    hash_str = geohash2.encode(float(lat), float(lon), precision=precision)
    hash_long = 0
    for char in hash_str:
        hash_long = (hash_long << 5) + base32_map[char]
    return hash_long

# Register UDF for calculating geohash with variable precision
calculate_geohash_udf = udf(lambda lat, lon, precision: calculate_geohash(lat, lon, precision), LongType())


precision = 8

# Use lit() to convert precision into a literal
df = df.withColumn("geo_hash", calculate_geohash_udf(col("lat"), col("lng"), lit(precision)))

df.printSchema()
df.show()
root
 |-- address: string (nullable = true)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)
 |-- lat: string (nullable = true)
 |-- lng: string (nullable = true)
 |-- name: string (nullable = true)
 |-- geo_hash: long (nullable = true)

+--------------------+-------------------+-------+---------+-----------+-------------+------------+
|             address|               city|country|      lat|        lng|         name|    geo_hash|
+--------------------+-------------------+-------+---------+-----------+-------------+------------+
|  180 S Airport Blvd|South San Francisco|     US|37.649585| -122.40607|Dunkin Donuts|333160539608|
|  451 Hegenberger Rd|            Oakland|     US|37.737457|-122.197412|Dunkin Donuts|333183372760|
|      268 Jackson St|            Hayward|     US|37.660518|-122.085634|Dunkin Donuts|333183516154|
|  240 El Camino Real|         San Carlos|     US|37.510929|-122.265387|Dunkin Donuts|333179906352|
|     1250 Newell Ave|       Walnut Creek|     US|37.893844|-122.054286|Dunkin Donuts|333184905423|
|     593 Woodside Rd|       Redwood City|     US|37.469419|-122.223163|Dunkin Donuts|333179938479|
|     4383 Clayton Rd|            Concord|     US|37.963967|-121.989257|Dunkin Donuts|333186421904|
|      5255 Mowry Ave|            Fremont|     US|37.534275|-122.000694|Dunkin Donuts|333180742010|
|410 Napa Junction Rd|    American Canyon|     US|38.185169|-122.254537|Dunkin Donuts|333230265434|
| 435 N McDowell Blvd|           Petaluma|     US|38.254609|-122.636133|Dunkin Donuts|333207357601|
|4701 Century Boul...|          Pittsburg|     US|38.006175|-121.835564|Dunkin Donuts|333231354308|
|852 E El Camino Real|          Sunnyvale|     US|37.354843|-122.017367|Dunkin Donuts|333179685437|
|1412 S. Park Vict...|           Milpitas|     US|37.416778|-121.874026|Dunkin Donuts|333181302656|
|1701 Airport Boul...|           San Jose|     US|37.369339|-121.929265|Dunkin Donuts|333181131642|
|      282 Sunset Ave|        Suisun City|     US|38.245503|-122.020698|Dunkin Donuts|333231069424|
|1659 Airport Boul...|           San Jose|     US|37.364815|-121.922516|Dunkin Donuts|333181137033|
| 1477 Berryessa Road|           San Jose|     US|37.369457|    -121.88|Dunkin Donuts|333181235576|
|844 S Winchester ...|           San Jose|     US|37.312062|-121.949697|Dunkin Donuts|333180906206|
|  1110 Foxworthy Ave|           San Jose|     US| 37.27779|-121.880569|Dunkin Donuts|333180973110|
| 2739 Santa Rosa Ave|         Santa Rosa|     US|38.411042|-122.713859|Dunkin Donuts|333209051522|
+--------------------+-------------------+-------+---------+-----------+-------------+------------+
only showing top 20 rows
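
Because each base-32 character occupies exactly 5 bits of the long, the encoding is reversible. A quick sanity check, reusing the base32 string defined above, unpacks the long from the first row back into its geohash:

def decode_geohash_long(hash_long, precision):
    # Reverse of calculate_geohash: pop 5 bits per character, least significant first
    chars = []
    for _ in range(precision):
        chars.append(base32[hash_long & 31])
        hash_long >>= 5
    return ''.join(reversed(chars))

# 333160539608 is the geo_hash of the South San Francisco row above
print(decode_geohash_long(333160539608, 8))  # '9q8yntfs'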

In [3]:
# Define a UDF to extract hash key from geohash with specified precision
def extract_hash_key(geohash, precision):
    hash_key_str = str(abs(geohash))[:precision]  # Take first 'precision' digits of the absolute geohash
    return int(hash_key_str)

extract_hash_key_udf = udf(lambda geohash, precision: extract_hash_key(geohash, precision), LongType())

# Use lit() to pass precision as a literal column
df = df.withColumn("hash_key", extract_hash_key_udf(col("geo_hash"), lit(precision)))

df.printSchema()
df.show(truncate=False)
root
 |-- address: string (nullable = true)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)
 |-- lat: string (nullable = true)
 |-- lng: string (nullable = true)
 |-- name: string (nullable = true)
 |-- geo_hash: long (nullable = true)
 |-- hash_key: long (nullable = true)

+-------------------------+-------------------+-------+---------+-----------+-------------+------------+--------+
|address                  |city               |country|lat      |lng        |name         |geo_hash    |hash_key|
+-------------------------+-------------------+-------+---------+-----------+-------------+------------+--------+
|180 S Airport Blvd       |South San Francisco|US     |37.649585|-122.40607 |Dunkin Donuts|333160539608|33316053|
|451 Hegenberger Rd       |Oakland            |US     |37.737457|-122.197412|Dunkin Donuts|333183372760|33318337|
|268 Jackson St           |Hayward            |US     |37.660518|-122.085634|Dunkin Donuts|333183516154|33318351|
|240 El Camino Real       |San Carlos         |US     |37.510929|-122.265387|Dunkin Donuts|333179906352|33317990|
|1250 Newell Ave          |Walnut Creek       |US     |37.893844|-122.054286|Dunkin Donuts|333184905423|33318490|
|593 Woodside Rd          |Redwood City       |US     |37.469419|-122.223163|Dunkin Donuts|333179938479|33317993|
|4383 Clayton Rd          |Concord            |US     |37.963967|-121.989257|Dunkin Donuts|333186421904|33318642|
|5255 Mowry Ave           |Fremont            |US     |37.534275|-122.000694|Dunkin Donuts|333180742010|33318074|
|410 Napa Junction Rd     |American Canyon    |US     |38.185169|-122.254537|Dunkin Donuts|333230265434|33323026|
|435 N McDowell Blvd      |Petaluma           |US     |38.254609|-122.636133|Dunkin Donuts|333207357601|33320735|
|4701 Century Boulevard   |Pittsburg          |US     |38.006175|-121.835564|Dunkin Donuts|333231354308|33323135|
|852 E El Camino Real     |Sunnyvale          |US     |37.354843|-122.017367|Dunkin Donuts|333179685437|33317968|
|1412 S. Park Victoria Dr.|Milpitas           |US     |37.416778|-121.874026|Dunkin Donuts|333181302656|33318130|
|1701 Airport Boulevard   |San Jose           |US     |37.369339|-121.929265|Dunkin Donuts|333181131642|33318113|
|282 Sunset Ave           |Suisun City        |US     |38.245503|-122.020698|Dunkin Donuts|333231069424|33323106|
|1659 Airport Boulevard   |San Jose           |US     |37.364815|-121.922516|Dunkin Donuts|333181137033|33318113|
|1477 Berryessa Road      |San Jose           |US     |37.369457|-121.88    |Dunkin Donuts|333181235576|33318123|
|844 S Winchester Blvd    |San Jose           |US     |37.312062|-121.949697|Dunkin Donuts|333180906206|33318090|
|1110 Foxworthy Ave       |San Jose           |US     |37.27779 |-121.880569|Dunkin Donuts|333180973110|33318097|
|2739 Santa Rosa Ave      |Santa Rosa         |US     |38.411042|-122.713859|Dunkin Donuts|333209051522|33320905|
+-------------------------+-------------------+-------+---------+-----------+-------------+------------+--------+
only showing top 20 rows
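
Since hash_key is about to become the Hudi partition path, it is worth a quick look at how the stores spread across keys before writing. This is just a sanity check, not part of the original pipeline:

from pyspark.sql.functions import desc

# Count stores per hash_key to see how evenly the partitions are populated
df.groupBy("hash_key").count().orderBy(desc("count")).show(10)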

In [9]:
import uuid
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Assuming 'df' is your DataFrame

# Function to generate random UUID
def generate_random_uuid():
    return str(uuid.uuid4())

# Register UDF (User Defined Function)
generate_id_udf = udf(generate_random_uuid, StringType())

# Add UUID column to DataFrame
df = df.withColumn("store_id", generate_id_udf())
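
As a small aside, if you would rather skip the Python UDF for this step, Spark's built-in uuid() SQL function produces the same kind of random key natively. A possible alternative to the snippet above:

from pyspark.sql.functions import expr

# Alternative to the UDF: let Spark generate the random UUID itself
df = df.withColumn("store_id", expr("uuid()"))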

Define a function to UPSERT into Hudi

In [12]:
def write_to_hudi(spark_df, 
                  table_name, 
                  db_name, 
                  method='upsert',
                  table_type='COPY_ON_WRITE',
                  recordkey='',
                  precombine='',
                  partition_fields='',
                  index_type='RECORD_INDEX'
                 ):

    path = f"file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database={db_name}/table_name={table_name}"

    hudi_options = {
        'hoodie.table.name': table_name,
        'hoodie.datasource.write.table.type': table_type,
        'hoodie.datasource.write.table.name': table_name,
        'hoodie.datasource.write.operation': method,
        'hoodie.datasource.write.recordkey.field': recordkey,
        'hoodie.datasource.write.precombine.field': precombine,
        'hoodie.datasource.write.partitionpath.field': partition_fields,
        'hoodie.index.type': index_type,
        'hoodie.enable.data.skipping': 'true',
        'hoodie.metadata.enable': 'true',
        'hoodie.metadata.index.column.stats.enable': 'true',
        'hoodie.write.concurrency.mode': 'optimistic_concurrency_control',
        'hoodie.write.lock.provider': 'org.apache.hudi.client.transaction.lock.InProcessLockProvider',
        'hoodie.metadata.record.index.enable': 'true',
        'hoodie.parquet.max.file.size': 512 * 1024 * 1024,  # 512 MB
        'hoodie.parquet.small.file.limit': 104857600  # 100 MB
    }

    print("\n")
    print(path)
    print("\n")
    
    spark_df.write.format("hudi"). \
        options(**hudi_options). \
        mode("append"). \
        save(path)

Create Hudi tables

In [13]:
write_to_hudi(
    spark_df=df,
    db_name="default",
    table_name="silver_stores",
    recordkey="store_id",
    precombine="store_id",
    partition_fields="hash_key",
    index_type="RECORD_INDEX"
)

file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=silver_stores


24/07/18 10:56:03 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-hbase.properties,hadoop-metrics2.properties
24/07/18 10:56:05 WARN SparkMetadataTableRecordIndex: Record index not initialized so falling back to GLOBAL_SIMPLE for tagging records
24/07/18 10:56:06 WARN DAGScheduler: Broadcasting large task binary with size 1102.2 KiB
24/07/18 10:56:46 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
24/07/18 10:56:47 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
24/07/18 11:02:11 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
24/07/18 11:03:15 WARN DAGScheduler: Broadcasting large task binary with size 2.5 MiB
24/07/18 11:05:12 WARN TaskSetManager: Stage 76 contains a task of very large size (1133 KiB). The maximum recommended task size is 1000 KiB.
24/07/18 11:05:16 WARN DAGScheduler: Broadcasting large task binary with size 2.7 MiB
24/07/18 11:07:33 WARN TaskSetManager: Stage 87 contains a task of very large size (1133 KiB). The maximum recommended task size is 1000 KiB.
24/07/18 11:07:33 WARN DAGScheduler: Broadcasting large task binary with size 2.8 MiB
                                                                                
In [16]:
print("Ingestion complete ")
Ingestion complete 

Read From Hudi

In [26]:
path = "file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=silver_stores"
spark.read.format("hudi") \
    .option("hoodie.enable.data.skipping", "true") \
    .option("hoodie.metadata.enable", "true") \
    .option("hoodie.metadata.index.column.stats.enable", "true") \
    .load(path) \
    .createOrReplaceTempView("stores_temp_view")
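
Because hash_key is the partition path and the write enabled data skipping plus column stats, a filter on hash_key should only touch files under the matching partition. One way to check this, assuming the temp view registered above, is to look at the physical plan:

# Inspect the plan for a single-partition query; the file scan should be
# restricted to the matching hash_key partition rather than the whole table
spark.sql("""
    SELECT name, address
    FROM stores_temp_view
    WHERE hash_key = 33316053
""").explain(True)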

Search Process | Compute the hash key and query the matching partition directly


In [31]:
# Example target point and search parameters
target_lat = 41.0534
target_lng = -73.5387
precision = 8
search_radius = 0.5  # in miles

# Geohash the target and derive the hash_key of the partition to search
target_geo_hash = calculate_geohash(target_lat, target_lng, precision)
target_hash_key = extract_hash_key(target_geo_hash, precision)

# Query only that partition of 'stores_temp_view', then filter by haversine
# distance in miles (3956 is roughly the Earth's radius in miles)
sql_query = f"""
SELECT name, address, geo_hash, hash_key, lat, lng,
       3956 * 2 * ASIN(SQRT(POW(SIN(RADIANS({target_lat} - lat) / 2), 2) +
                          COS(RADIANS({target_lat})) * COS(RADIANS(lat)) *
                          POW(SIN(RADIANS({target_lng} - lng) / 2), 2))) AS distance
FROM stores_temp_view
WHERE hash_key = '{target_hash_key}'
  AND 3956 * 2 * ASIN(SQRT(POW(SIN(RADIANS({target_lat} - lat) / 2), 2) +
                           COS(RADIANS({target_lat})) * COS(RADIANS(lat)) *
                           POW(SIN(RADIANS({target_lng} - lng) / 2), 2))) <= {search_radius}
"""

# Execute the SQL query
df_filtered_sql = spark.sql(sql_query)

# Show the filtered DataFrame
df_filtered_sql.show(truncate=False)
+-------------+---------------+------------+--------+---------+----------+-------------------+
|name         |address        |geo_hash    |hash_key|lat      |lng       |distance           |
+-------------+---------------+------------+--------+---------+----------+-------------------+
|Dunkin Donuts|33 W Broad St  |437258162044|43725816|41.055604|-73.545717|0.3957724791444152 |
|Dunkin Donuts|450 Main Street|437258167594|43725816|41.053227|-73.539765|0.05672320742692224|
+-------------+---------------+------------+--------+---------+----------+-------------------+
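
To make this reusable, the three steps above (geohash the target, derive its hash_key, filter that single partition by haversine distance) can be wrapped into one helper. This is just a sketch following the query above; find_nearby_stores is my name, not something from the notebook:

def find_nearby_stores(lat, lng, radius_miles, precision=8):
    # Geohash the target point and keep only its hash_key partition,
    # then rank the surviving rows by haversine distance in miles
    geo_hash = calculate_geohash(lat, lng, precision)
    hash_key = extract_hash_key(geo_hash, precision)
    haversine = (f"3956 * 2 * ASIN(SQRT(POW(SIN(RADIANS({lat} - lat) / 2), 2) + "
                 f"COS(RADIANS({lat})) * COS(RADIANS(lat)) * "
                 f"POW(SIN(RADIANS({lng} - lng) / 2), 2)))")
    return spark.sql(f"""
        SELECT * FROM (
            SELECT name, address, lat, lng, {haversine} AS distance
            FROM stores_temp_view
            WHERE hash_key = {hash_key}
        ) candidates
        WHERE distance <= {radius_miles}
        ORDER BY distance
    """)

find_nearby_stores(41.0534, -73.5387, 0.5).show(truncate=False)

As with the query above, this only looks inside the one cell the target falls in; a point sitting right on a cell boundary could have neighbors in adjacent geohash cells that this sketch would miss.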
