Thursday, July 18, 2024

Fast GeoSearch on Data Lakes: Learn How to Efficiently Build a Geo Search Using Apache Hudi for Lightning-Fast Data Retrieval Based on Geohashing

Geohashes are a little complicated, but the key thing to know is that a geohash divides the world into a grid of cells. By hashing each location, we can eliminate most of the data upfront and focus only on the cell our potential targets fall in. It's a pretty elegant solution.

Geohash Precision Table

The hash is up to 12 characters long; the longer the hash, the smaller the cell it references (a short encoding example follows the table below).

  • The first character of the hash identifies one of 32 cells in the grid, each roughly 5,000 km x 5,000 km.
  • The second character identifies one of 32 sub-cells inside that first cell, so two characters combined resolve to about 1,250 km x 1,250 km.
  • This continues down to the twelfth character, which can identify an area only a few inches across.
Geohash Length | Grid Cell Size (Approximate) | Example Geohash | Area Description
---------------|------------------------------|-----------------|-----------------------------------
1              | 5,000 km x 5,000 km          | d               | Large region, e.g., a country
2              | 1,250 km x 1,250 km          | dp              | Large city, e.g., New York area
3              | 156 km x 156 km              | dpz             | Smaller city or large metropolitan
4              | 39 km x 39 km                | dpzr            | City district or large neighborhood
5              | 4.8 km x 4.8 km              | dpzrt           | Neighborhood or small city area
6              | 1.2 km x 1.2 km              | dpzrtr          | Several city blocks
7              | 152 m x 152 m                | dpzrtrx         | A block or large building
8              | 38 m x 38 m                  | dpzrtrxy        | Building or property
9              | 4.8 m x 4.8 m                | dpzrtrxyz       | Small property or structure
10             | 1.2 m x 1.2 m                | dpzrtrxyzp      | Room or smaller structure
11             | 30 cm x 30 cm                | dpzrtrxyzpb     | Detailed area within a room
12             | 7.6 cm x 7.6 cm              | dpzrtrxyzpbg    | Highly detailed area, like a tile
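
To make the precision table concrete, here is a small illustrative snippet (it assumes the same geohash2 library used in the notebook below and an arbitrary example coordinate) that encodes one point at several precisions. Each longer hash simply extends the shorter one, which is why truncating a hash to N characters names the enclosing cell at that precision.

import geohash2

# Illustrative only: encode an arbitrary point at increasing precision.
lat, lng = 40.7580, -73.9855
for precision in (1, 2, 4, 6, 8, 12):
    print(precision, geohash2.encode(lat, lng, precision=precision))

# Each printed hash is a prefix of the next one, so a shorter hash identifies
# the larger cell that contains all of its longer extensions.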

Load the Dunkin' coffee shop dataset into Spark as a DataFrame

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, explode
from pyspark.sql.types import LongType
import geohash2
import os,sys

# Set JAVA_HOME if your environment needs it
os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@11"

HUDI_VERSION = '1.0.0-beta1'
SPARK_VERSION = '3.4'

SUBMIT_ARGS = f"--packages org.apache.hudi:hudi-spark{SPARK_VERSION}-bundle_2.12:{HUDI_VERSION} pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable

# Spark session
spark = SparkSession.builder \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
    .config('className', 'org.apache.hudi') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .getOrCreate()


# Load JSON data into DataFrame (replace './dunkinDonuts.json' with your actual JSON file path)
df = spark.read.json('./dunkinDonuts.json')

# Explode array column 'data' to individual rows
df_flat = df.select(explode(df.data).alias("store"))

# Select relevant columns
df_flat = df_flat.select(
    "store.address",
    "store.city",
    "store.country",
    "store.lat",
    "store.lng",
    "store.name"
)

# Create DataFrame for further processing
df = df_flat
df_flat.show()
Warning: Ignoring non-Spark config property: className
Ivy Default Cache set to: /Users/soumilshah/.ivy2/cache
The jars for the packages stored in: /Users/soumilshah/.ivy2/jars
org.apache.hudi#hudi-spark3.4-bundle_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-aa8a82f6-1770-46fb-887c-6972cdce1030;1.0
	confs: [default]
:: loading settings :: url = jar:file:/opt/anaconda3/lib/python3.11/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml
	found org.apache.hudi#hudi-spark3.4-bundle_2.12;1.0.0-beta1 in central
:: resolution report :: resolve 60ms :: artifacts dl 2ms
	:: modules in use:
	org.apache.hudi#hudi-spark3.4-bundle_2.12;1.0.0-beta1 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   1   |   0   |   0   |   0   ||   1   |   0   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-aa8a82f6-1770-46fb-887c-6972cdce1030
	confs: [default]
	0 artifacts copied, 1 already retrieved (0kB/2ms)
24/07/18 10:52:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/07/18 10:53:01 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
+--------------------+-------------------+-------+---------+-----------+-------------+
|             address|               city|country|      lat|        lng|         name|
+--------------------+-------------------+-------+---------+-----------+-------------+
|  180 S Airport Blvd|South San Francisco|     US|37.649585| -122.40607|Dunkin Donuts|
|  451 Hegenberger Rd|            Oakland|     US|37.737457|-122.197412|Dunkin Donuts|
|      268 Jackson St|            Hayward|     US|37.660518|-122.085634|Dunkin Donuts|
|  240 El Camino Real|         San Carlos|     US|37.510929|-122.265387|Dunkin Donuts|
|     1250 Newell Ave|       Walnut Creek|     US|37.893844|-122.054286|Dunkin Donuts|
|     593 Woodside Rd|       Redwood City|     US|37.469419|-122.223163|Dunkin Donuts|
|     4383 Clayton Rd|            Concord|     US|37.963967|-121.989257|Dunkin Donuts|
|      5255 Mowry Ave|            Fremont|     US|37.534275|-122.000694|Dunkin Donuts|
|410 Napa Junction Rd|    American Canyon|     US|38.185169|-122.254537|Dunkin Donuts|
| 435 N McDowell Blvd|           Petaluma|     US|38.254609|-122.636133|Dunkin Donuts|
|4701 Century Boul...|          Pittsburg|     US|38.006175|-121.835564|Dunkin Donuts|
|852 E El Camino Real|          Sunnyvale|     US|37.354843|-122.017367|Dunkin Donuts|
|1412 S. Park Vict...|           Milpitas|     US|37.416778|-121.874026|Dunkin Donuts|
|1701 Airport Boul...|           San Jose|     US|37.369339|-121.929265|Dunkin Donuts|
|      282 Sunset Ave|        Suisun City|     US|38.245503|-122.020698|Dunkin Donuts|
|1659 Airport Boul...|           San Jose|     US|37.364815|-121.922516|Dunkin Donuts|
| 1477 Berryessa Road|           San Jose|     US|37.369457|    -121.88|Dunkin Donuts|
|844 S Winchester ...|           San Jose|     US|37.312062|-121.949697|Dunkin Donuts|
|  1110 Foxworthy Ave|           San Jose|     US| 37.27779|-121.880569|Dunkin Donuts|
| 2739 Santa Rosa Ave|         Santa Rosa|     US|38.411042|-122.713859|Dunkin Donuts|
+--------------------+-------------------+-------+---------+-----------+-------------+
only showing top 20 rows

Calculating the geohash for each store

In [2]:
from pyspark.sql.functions import udf, col, lit
from pyspark.sql.types import LongType
import geohash2


# Base 32 characters used in geohashing
base32 = '0123456789bcdefghjkmnpqrstuvwxyz'

# Create a dictionary for quick look-up of character values
base32_map = {char: idx for idx, char in enumerate(base32)}

# Define UDF to calculate geohash with precision
def calculate_geohash(lat, lon, precision):
    hash_str = geohash2.encode(float(lat), float(lon), precision=precision)
    hash_long = 0
    for char in hash_str:
        hash_long = (hash_long << 5) + base32_map[char]
    return hash_long

# Register UDF for calculating geohash with variable precision
calculate_geohash_udf = udf(lambda lat, lon, precision: calculate_geohash(lat, lon, precision), LongType())


precision = 8

# Use lit() to convert precision into a literal
df = df.withColumn("geo_hash", calculate_geohash_udf(col("lat"), col("lng"), lit(precision)))

df.printSchema()
df.show()
root
 |-- address: string (nullable = true)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)
 |-- lat: string (nullable = true)
 |-- lng: string (nullable = true)
 |-- name: string (nullable = true)
 |-- geo_hash: long (nullable = true)

+--------------------+-------------------+-------+---------+-----------+-------------+------------+
|             address|               city|country|      lat|        lng|         name|    geo_hash|
+--------------------+-------------------+-------+---------+-----------+-------------+------------+
|  180 S Airport Blvd|South San Francisco|     US|37.649585| -122.40607|Dunkin Donuts|333160539608|
|  451 Hegenberger Rd|            Oakland|     US|37.737457|-122.197412|Dunkin Donuts|333183372760|
|      268 Jackson St|            Hayward|     US|37.660518|-122.085634|Dunkin Donuts|333183516154|
|  240 El Camino Real|         San Carlos|     US|37.510929|-122.265387|Dunkin Donuts|333179906352|
|     1250 Newell Ave|       Walnut Creek|     US|37.893844|-122.054286|Dunkin Donuts|333184905423|
|     593 Woodside Rd|       Redwood City|     US|37.469419|-122.223163|Dunkin Donuts|333179938479|
|     4383 Clayton Rd|            Concord|     US|37.963967|-121.989257|Dunkin Donuts|333186421904|
|      5255 Mowry Ave|            Fremont|     US|37.534275|-122.000694|Dunkin Donuts|333180742010|
|410 Napa Junction Rd|    American Canyon|     US|38.185169|-122.254537|Dunkin Donuts|333230265434|
| 435 N McDowell Blvd|           Petaluma|     US|38.254609|-122.636133|Dunkin Donuts|333207357601|
|4701 Century Boul...|          Pittsburg|     US|38.006175|-121.835564|Dunkin Donuts|333231354308|
|852 E El Camino Real|          Sunnyvale|     US|37.354843|-122.017367|Dunkin Donuts|333179685437|
|1412 S. Park Vict...|           Milpitas|     US|37.416778|-121.874026|Dunkin Donuts|333181302656|
|1701 Airport Boul...|           San Jose|     US|37.369339|-121.929265|Dunkin Donuts|333181131642|
|      282 Sunset Ave|        Suisun City|     US|38.245503|-122.020698|Dunkin Donuts|333231069424|
|1659 Airport Boul...|           San Jose|     US|37.364815|-121.922516|Dunkin Donuts|333181137033|
| 1477 Berryessa Road|           San Jose|     US|37.369457|    -121.88|Dunkin Donuts|333181235576|
|844 S Winchester ...|           San Jose|     US|37.312062|-121.949697|Dunkin Donuts|333180906206|
|  1110 Foxworthy Ave|           San Jose|     US| 37.27779|-121.880569|Dunkin Donuts|333180973110|
| 2739 Santa Rosa Ave|         Santa Rosa|     US|38.411042|-122.713859|Dunkin Donuts|333209051522|
+--------------------+-------------------+-------+---------+-----------+-------------+------------+
only showing top 20 rows
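
Because the UDF packs each base-32 character into 5 bits, the original geohash string can be recovered from the long value as long as you know the precision. The helper below is a hypothetical sanity check, not part of the pipeline:

# Hypothetical helper: unpack a geo_hash long back into its base-32 string.
def decode_geohash_long(hash_long, precision=8):
    chars = []
    for _ in range(precision):
        chars.append(base32[hash_long & 0b11111])  # lowest 5 bits = last character
        hash_long >>= 5
    return ''.join(reversed(chars))

# Round-trip check against the library encoding for the first store above
lat, lng = 37.649585, -122.40607
packed = calculate_geohash(lat, lng, 8)
assert decode_geohash_long(packed, 8) == geohash2.encode(lat, lng, precision=8)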

In [3]:
# Define a UDF to extract hash key from geohash with specified precision
def extract_hash_key(geohash, precision):
    hash_key_str = str(abs(geohash))[:precision]  # Take first 'precision' digits of the absolute geohash
    return int(hash_key_str)

extract_hash_key_udf = udf(lambda geohash, precision: extract_hash_key(geohash, precision), LongType())

# Use lit() to pass precision as a literal column
df = df.withColumn("hash_key", extract_hash_key_udf(col("geo_hash"), lit(precision)))

df.printSchema()
df.show(truncate=False)
root
 |-- address: string (nullable = true)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)
 |-- lat: string (nullable = true)
 |-- lng: string (nullable = true)
 |-- name: string (nullable = true)
 |-- geo_hash: long (nullable = true)
 |-- hash_key: long (nullable = true)

+-------------------------+-------------------+-------+---------+-----------+-------------+------------+--------+
|address                  |city               |country|lat      |lng        |name         |geo_hash    |hash_key|
+-------------------------+-------------------+-------+---------+-----------+-------------+------------+--------+
|180 S Airport Blvd       |South San Francisco|US     |37.649585|-122.40607 |Dunkin Donuts|333160539608|33316053|
|451 Hegenberger Rd       |Oakland            |US     |37.737457|-122.197412|Dunkin Donuts|333183372760|33318337|
|268 Jackson St           |Hayward            |US     |37.660518|-122.085634|Dunkin Donuts|333183516154|33318351|
|240 El Camino Real       |San Carlos         |US     |37.510929|-122.265387|Dunkin Donuts|333179906352|33317990|
|1250 Newell Ave          |Walnut Creek       |US     |37.893844|-122.054286|Dunkin Donuts|333184905423|33318490|
|593 Woodside Rd          |Redwood City       |US     |37.469419|-122.223163|Dunkin Donuts|333179938479|33317993|
|4383 Clayton Rd          |Concord            |US     |37.963967|-121.989257|Dunkin Donuts|333186421904|33318642|
|5255 Mowry Ave           |Fremont            |US     |37.534275|-122.000694|Dunkin Donuts|333180742010|33318074|
|410 Napa Junction Rd     |American Canyon    |US     |38.185169|-122.254537|Dunkin Donuts|333230265434|33323026|
|435 N McDowell Blvd      |Petaluma           |US     |38.254609|-122.636133|Dunkin Donuts|333207357601|33320735|
|4701 Century Boulevard   |Pittsburg          |US     |38.006175|-121.835564|Dunkin Donuts|333231354308|33323135|
|852 E El Camino Real     |Sunnyvale          |US     |37.354843|-122.017367|Dunkin Donuts|333179685437|33317968|
|1412 S. Park Victoria Dr.|Milpitas           |US     |37.416778|-121.874026|Dunkin Donuts|333181302656|33318130|
|1701 Airport Boulevard   |San Jose           |US     |37.369339|-121.929265|Dunkin Donuts|333181131642|33318113|
|282 Sunset Ave           |Suisun City        |US     |38.245503|-122.020698|Dunkin Donuts|333231069424|33323106|
|1659 Airport Boulevard   |San Jose           |US     |37.364815|-121.922516|Dunkin Donuts|333181137033|33318113|
|1477 Berryessa Road      |San Jose           |US     |37.369457|-121.88    |Dunkin Donuts|333181235576|33318123|
|844 S Winchester Blvd    |San Jose           |US     |37.312062|-121.949697|Dunkin Donuts|333180906206|33318090|
|1110 Foxworthy Ave       |San Jose           |US     |37.27779 |-121.880569|Dunkin Donuts|333180973110|33318097|
|2739 Santa Rosa Ave      |Santa Rosa         |US     |38.411042|-122.713859|Dunkin Donuts|333209051522|33320905|
+-------------------------+-------------------+-------+---------+-----------+-------------+------------+--------+
only showing top 20 rows
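
As a quick worked example, the first row above has geo_hash 333160539608; keeping its first 8 decimal digits yields the hash_key 33316053 that the row is partitioned under. Nearby stores produce geo_hash values that agree in their leading digits, so they land in the same partition, which is what the search step later exploits.

# Worked example using the first row of the output above
print(extract_hash_key(333160539608, 8))  # -> 33316053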

In [9]:
import uuid
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Assuming 'df' is your DataFrame

# Function to generate random UUID
def generate_random_uuid():
    return str(uuid.uuid4())

# Register UDF (User Defined Function)
generate_id_udf = udf(generate_random_uuid, StringType())

# Add UUID column to DataFrame
df = df.withColumn("store_id", generate_id_udf())

Define a function to upsert into Hudi

In [12]:
def write_to_hudi(spark_df, 
                  table_name, 
                  db_name, 
                  method='upsert',
                  table_type='COPY_ON_WRITE',
                  recordkey='',
                  precombine='',
                  partition_fields='',
                  index_type='RECORD_INDEX'
                 ):

    path = f"file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database={db_name}/table_name={table_name}"

    hudi_options = {
        'hoodie.table.name': table_name,
        'hoodie.datasource.write.table.type': table_type,
        'hoodie.datasource.write.table.name': table_name,
        'hoodie.datasource.write.operation': method,
        'hoodie.datasource.write.recordkey.field': recordkey,
        'hoodie.datasource.write.precombine.field': precombine,
        'hoodie.datasource.write.partitionpath.field': partition_fields,
        'hoodie.index.type': index_type,
        'hoodie.enable.data.skipping': 'true',
        'hoodie.metadata.enable': 'true',
        'hoodie.metadata.index.column.stats.enable': 'true',
        'hoodie.metadata.record.index.enable': 'true',
        'hoodie.write.concurrency.mode': 'optimistic_concurrency_control',
        'hoodie.write.lock.provider': 'org.apache.hudi.client.transaction.lock.InProcessLockProvider',
        'hoodie.parquet.max.file.size': 512 * 1024 * 1024,  # 512 MB
        'hoodie.parquet.small.file.limit': 104857600,       # 100 MB
    }

    print("\n")
    print(path)
    print("\n")
    
    spark_df.write.format("hudi"). \
        options(**hudi_options). \
        mode("append"). \
        save(path)

Create the Hudi table

In [13]:
write_to_hudi(
    spark_df=df,
    db_name="default",
    table_name="silver_stores",
    recordkey="store_id",
    precombine="store_id",
    partition_fields="hash_key",
    index_type="RECORD_INDEX"
)

file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=silver_stores


24/07/18 10:56:03 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-hbase.properties,hadoop-metrics2.properties
24/07/18 10:56:05 WARN SparkMetadataTableRecordIndex: Record index not initialized so falling back to GLOBAL_SIMPLE for tagging records
24/07/18 10:56:06 WARN DAGScheduler: Broadcasting large task binary with size 1102.2 KiB
24/07/18 10:56:46 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
24/07/18 10:56:47 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
24/07/18 11:02:11 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
24/07/18 11:03:15 WARN DAGScheduler: Broadcasting large task binary with size 2.5 MiB
24/07/18 11:05:12 WARN TaskSetManager: Stage 76 contains a task of very large size (1133 KiB). The maximum recommended task size is 1000 KiB.
24/07/18 11:05:16 WARN DAGScheduler: Broadcasting large task binary with size 2.7 MiB
24/07/18 11:07:33 WARN TaskSetManager: Stage 87 contains a task of very large size (1133 KiB). The maximum recommended task size is 1000 KiB.
24/07/18 11:07:33 WARN DAGScheduler: Broadcasting large task binary with size 2.8 MiB
                                                                                
In [16]:
print("Ingestion complete ")
Ingestion complete 

Read From Hudi

In [26]:
path = "file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=silver_stores"
spark.read.format("hudi") \
    .option("hoodie.enable.data.skipping", "true") \
    .option("hoodie.metadata.enable", "true") \
    .option("hoodie.metadata.index.column.stats.enable", "true") \
    .load(path) \
    .createOrReplaceTempView("stores_temp_view")
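
Before running the geo query, an optional sanity check (illustrative, not from the original notebook) confirms the view came back with the hash_key partition column populated:

# Optional: count stores per hash_key partition in the temp view
spark.sql("""
    SELECT hash_key, COUNT(*) AS store_count
    FROM stores_temp_view
    GROUP BY hash_key
    ORDER BY store_count DESC
""").show(5)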

Search Process | Compute the hash key and query the matching partition directly

In [31]:
from pyspark.sql.functions import col, radians, sin, cos, asin, sqrt, lit


target_lat = 41.0534
target_lng = -73.5387
precision = 8
search_radius = 0.5  # in miles

target_geo_hash = calculate_geohash(target_lat, target_lng, precision)
target_hash_key = extract_hash_key(target_geo_hash, precision)

# Assuming 'stores_temp_view' is your registered temporary view or table
sql_query = f"""
SELECT name, address, geo_hash, hash_key, lat, lng,
       3956 * 2 * ASIN(SQRT(POW(SIN(RADIANS({target_lat} - lat) / 2), 2) +
                          COS(RADIANS({target_lat})) * COS(RADIANS(lat)) *
                          POW(SIN(RADIANS({target_lng} - lng) / 2), 2))) AS distance
FROM stores_temp_view
WHERE hash_key = '{target_hash_key}'
  AND 3956 * 2 * ASIN(SQRT(POW(SIN(RADIANS({target_lat} - lat) / 2), 2) +
                           COS(RADIANS({target_lat})) * COS(RADIANS(lat)) *
                           POW(SIN(RADIANS({target_lng} - lng) / 2), 2))) <= {search_radius}
"""

# Execute the SQL query
df_filtered_sql = spark.sql(sql_query)

# Show the filtered DataFrame
df_filtered_sql.show(truncate=False)
+-------------+---------------+------------+--------+---------+----------+-------------------+
|name         |address        |geo_hash    |hash_key|lat      |lng       |distance           |
+-------------+---------------+------------+--------+---------+----------+-------------------+
|Dunkin Donuts|33 W Broad St  |437258162044|43725816|41.055604|-73.545717|0.3957724791444152 |
|Dunkin Donuts|450 Main Street|437258167594|43725816|41.053227|-73.539765|0.05672320742692224|
+-------------+---------------+------------+--------+---------+----------+-------------------+
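
The WHERE clause above does two things: the hash_key filter prunes every partition except the target cell, and the haversine expression (3956 is the Earth's radius in miles) keeps only stores within the 0.5-mile radius. To verify a distance outside Spark, a plain-Python equivalent of the same formula looks like this (illustrative only):

import math

def haversine_miles(lat1, lng1, lat2, lng2):
    """Great-circle distance in miles, mirroring the SQL expression above."""
    earth_radius_miles = 3956
    dlat = math.radians(lat2 - lat1)
    dlng = math.radians(lng2 - lng1)
    a = (math.sin(dlat / 2) ** 2
         + math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) * math.sin(dlng / 2) ** 2)
    return 2 * earth_radius_miles * math.asin(math.sqrt(a))

# Distance from the target point to the 450 Main Street store in the result set
print(haversine_miles(41.0534, -73.5387, 41.053227, -73.539765))  # ~0.057 miles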
