How to Perform Radius-Based Search using Spark and Haversine Formula for Large-Scale Geospatial Data Analysis¶
Geospatial data analysis has become increasingly important in fields such as transportation, urban planning, and the social sciences. One of the fundamental operations in geospatial analysis is the radius-based search, which finds all points within a given distance of a location. In this blog, we will discuss how to perform radius-based searches using Spark and the Haversine formula for large-scale geospatial data analysis.
What is the Haversine Formula?¶
The Haversine formula is a mathematical formula used to calculate the great-circle distance between two points on a sphere, such as the Earth. Because it accounts for the curvature of the Earth's surface, it is more accurate than a simple Euclidean distance on raw coordinates. The formula is as follows:¶
d = 2R · asin( √( sin²(Δlat/2) + cos(lat1) · cos(lat2) · sin²(Δlong/2) ) )
where lat1, long1, lat2, long2 are the latitudes and longitudes of the two points (in radians), Δlat and Δlong are the differences between the latitudes and longitudes, and R is the radius of the Earth (6,371 km, or roughly 3,956 miles).
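To make the formula concrete, here is a minimal pure-Python sketch of the same calculation the Spark job performs below. The function name, and the choice of miles for the units, are illustrative assumptions, not code from the original pipeline:

```python
import math

EARTH_RADIUS_MILES = 3956  # same constant the Spark expression below uses

def haversine_miles(lat1, lon1, lat2, lon2):
    """Great-circle distance in miles between two (lat, lon) points given in degrees."""
    lat1, lon1, lat2, lon2 = map(math.radians, (lat1, lon1, lat2, lon2))
    dlat = lat2 - lat1
    dlon = lon2 - lon1
    a = math.sin(dlat / 2) ** 2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon / 2) ** 2
    return 2 * EARTH_RADIUS_MILES * math.asin(math.sqrt(a))

# One degree of latitude is roughly 69 miles anywhere on the globe
print(round(haversine_miles(40.0, -73.0, 41.0, -73.0), 1))  # prints 69.0
```

A quick sanity check like the one in the final line is a useful habit before scaling the same expression out to millions of rows in Spark.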
Step 1: Define Imports¶
try:
    import os
    import sys
    import pickle

    import folium
    from folium.plugins import HeatMap

    import pyspark
    from pyspark import SparkConf, SparkContext
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import radians, col, sin, cos, sqrt, pow, lit, asin, max
except Exception as e:
    print(f"Missing dependency: {e}")
Step 2: Define Spark Session¶
SUBMIT_ARGS = "--packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.12.1 pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
spark = SparkSession.builder \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('className', 'org.apache.hudi') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .getOrCreate()
spark
df = spark.read.format("csv") \
    .option("header", True) \
    .option("inferSchema", True) \
    .load("uber.csv")
df.printSchema()
root
 |-- _c0: integer (nullable = true)
 |-- key: timestamp (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- passenger_count: integer (nullable = true)
Radius-Based Search¶
Suppose I want to analyze the number of rides taken near Continental Army Plaza. To achieve this, we will perform a radius search to retrieve all the rides within a one-mile radius of the plaza.¶
Define the center latitude and longitude for the search¶
# define center latitude and longitude
center_latitude = 40.7106
center_longitude = -73.9605
search_radius = 1 # in miles
Add columns for the distance to the search center and whether the ride is within the radius¶
# Add columns for the distance to the search center and whether the ride is within the radius
df = df.withColumn("pickup_lat_diff", radians(lit(center_latitude)) - radians(col("pickup_latitude"))) \
    .withColumn("pickup_long_diff", radians(lit(center_longitude)) - radians(col("pickup_longitude"))) \
    .withColumn("pickup_distance", 3956 * 2 * asin(sqrt(
        pow(sin(col("pickup_lat_diff") / 2), 2)
        + cos(radians(lit(center_latitude))) * cos(radians(col("pickup_latitude")))
        * pow(sin(col("pickup_long_diff") / 2), 2)))) \
    .withColumn("pickup_within_radius", col("pickup_distance") <= search_radius)
columns_to_see = ['_c0', 'pickup_lat_diff', 'pickup_long_diff', 'pickup_distance', 'pickup_within_radius']
df.select(columns_to_see).show(2)
+--------+--------------------+--------------------+------------------+--------------------+
|     _c0|     pickup_lat_diff|    pickup_long_diff|   pickup_distance|pickup_within_radius|
+--------+--------------------+--------------------+------------------+--------------------+
|24238194|-4.84393955085482...|6.862092612296422E-4|  2.81150830067202|               false|
|27835199|-3.07614280664059...|5.908812182626733E-4|2.1493290842207267|               false|
+--------+--------------------+--------------------+------------------+--------------------+
only showing top 2 rows
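A side note on performance: on very large datasets, a cheap bounding-box pre-filter on raw latitude and longitude can skip the trigonometry for most rows before the exact Haversine check. The sketch below computes the box in pure Python; the helper name and the 69-miles-per-degree approximation are assumptions for illustration, not part of the original code:

```python
import math

def bounding_box(center_lat, center_lon, radius_miles):
    """Approximate lat/lon bounds that fully contain a circle of the given radius."""
    miles_per_deg_lat = 69.0                                        # ~length of one degree of latitude
    miles_per_deg_lon = 69.0 * math.cos(math.radians(center_lat))   # shrinks toward the poles
    dlat = radius_miles / miles_per_deg_lat
    dlon = radius_miles / miles_per_deg_lon
    return (center_lat - dlat, center_lat + dlat,
            center_lon - dlon, center_lon + dlon)

lat_min, lat_max, lon_min, lon_max = bounding_box(40.7106, -73.9605, 1)
# Rows outside these bounds cannot be within the radius, so four simple
# comparisons can prune them before the Haversine expression runs.
```

In Spark this would translate to an extra `df.filter()` on those four column comparisons ahead of the distance computation; whether it pays off depends on the data volume and partitioning.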
Filter the Data within the search radius¶
# Filter the rides within the search radius
pickup_within_radius_df = df.filter(col("pickup_within_radius") == True)
columns_to_see_after_filter = ['_c0', 'fare_amount', 'passenger_count', 'pickup_datetime', 'pickup_latitude']
pickup_within_radius_df.select(columns_to_see_after_filter).show(3)
+--------+-----------+---------------+-------------------+-----------------+
|     _c0|fare_amount|passenger_count|    pickup_datetime|  pickup_latitude|
+--------+-----------+---------------+-------------------+-----------------+
|40831100|        6.0|              1|2015-04-20 21:38:11|40.71929550170898|
|39067507|       16.0|              1|2013-08-31 17:01:02|        40.711994|
|25415449|        8.5|              1|2011-08-08 13:12:00|         40.71932|
+--------+-----------+---------------+-------------------+-----------------+
only showing top 3 rows
Plot¶
# Create a map centered on the search location
pickup_map = folium.Map(location=[center_latitude, center_longitude], zoom_start=13)
# Add a circle to represent the search radius
folium.Circle(location=[center_latitude, center_longitude], radius=search_radius*1609.34, color='red', fill=False).add_to(pickup_map)
# Add a heat map layer based on fare amounts
pickup_heat_data = pickup_within_radius_df.select('pickup_latitude', 'pickup_longitude', 'fare_amount').collect()
pickup_heat_data = [[row['pickup_latitude'], row['pickup_longitude'], row['fare_amount']] for row in pickup_heat_data]
# Recent folium versions compute the maximum intensity automatically, so max_val is not needed
pickup_heatmap = HeatMap(pickup_heat_data, name='Fare Amounts', min_opacity=0.2)
pickup_heatmap.add_to(pickup_map)
# Display the map
pickup_map
Conclusion¶
In conclusion, performing radius-based searches on large-scale geospatial data can be a challenging task, but using Spark and the Haversine formula can greatly simplify the process. By leveraging Spark's distributed computing capabilities, we can efficiently process and analyze large volumes of data. The Haversine formula is a well-established and accurate method for calculating distances between two points on a sphere, making it a reliable choice for geospatial analysis.
While Spark and the Haversine formula are excellent tools for performing radius-based searches, there are other alternatives worth considering as well. For example, geohashing and Uber's H3 library are two popular approaches to geospatial indexing and querying. Ultimately, the choice of tool will depend on the specific needs of the project and the characteristics of the data being analyzed.
In summary, performing radius-based searches on large-scale geospatial data requires careful consideration of the available tools and methods. By using Spark and the Haversine formula, we can efficiently and accurately analyze large volumes of data to gain valuable insights and make data-driven decisions.
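To give a flavor of the geohash alternative mentioned above, here is a minimal from-scratch sketch of the standard geohash encoding algorithm (an illustration, not code from any particular library). Points that share a geohash prefix fall in the same grid cell, so candidate rows can be matched on a cheap string prefix before any exact distance check:

```python
BASE32 = "0123456789bcdefghjkmnpqrstuvwxyz"  # geohash alphabet (no a, i, l, o)

def geohash_encode(lat, lon, precision=7):
    """Encode a (lat, lon) pair in degrees into a geohash string of the given length."""
    lat_lo, lat_hi = -90.0, 90.0
    lon_lo, lon_hi = -180.0, 180.0
    bits, bit_count, even = 0, 0, True
    result = []
    while len(result) < precision:
        if even:  # even-positioned bits refine longitude
            mid = (lon_lo + lon_hi) / 2
            if lon >= mid:
                bits, lon_lo = (bits << 1) | 1, mid
            else:
                bits, lon_hi = bits << 1, mid
        else:     # odd-positioned bits refine latitude
            mid = (lat_lo + lat_hi) / 2
            if lat >= mid:
                bits, lat_lo = (bits << 1) | 1, mid
            else:
                bits, lat_hi = bits << 1, mid
        even = not even
        bit_count += 1
        if bit_count == 5:  # every 5 bits becomes one base-32 character
            result.append(BASE32[bits])
            bits, bit_count = 0, 0
    return "".join(result)

# Pickups near the search center share a prefix with the center's geohash
print(geohash_encode(40.7106, -73.9605, 7))
```

A prefix match is only a coarse filter (cells are rectangles, and nearby points can straddle a cell boundary), so in practice it is combined with a neighbor-cell lookup and a final Haversine check, which is exactly what libraries like H3 automate.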