This notebook shows you how to use spatial queries in Spark environments. The notebook uses the spatio-temporal library that is pre-installed on all Spark environments in Watson Studio. You will learn how to perform common spatial queries in Spark.
The types of spatial queries you will learn to use are:
Often, a spatial function has one parameter that refers to a spatial column in one table and a second parameter that refers to a spatial constant or to a spatial column in another table. This notebook shows you how to use functions to access and combine data of different types to perform spatial queries.
This notebook runs on Python and Spark.
spark._jvm.org.apache.spark.sql.types.SqlGeometry.registerAll(spark._jsparkSession)
This notebook uses a sample data set that is available in the IBM Watson Studio Gallery. Direct links are used by default to make sure this notebook is publicly runnable.
When working with your own data, use your preferred way of loading it into a Spark data frame, depending on where your data source resides.
Here are some hints if you are using IBM Cloud Object Storage:
- Click the Code snippets button in the menu bar, find your data set, and then, under Insert as, select SparkSession DataFrame. Code that adds a Spark data frame is generated automatically.
- Use ibmos2spark to read the data into a Spark data frame.
Read the hospital data where each hospital's location is a latitude-longitude point:
import pandas as pd
from pyspark.sql.types import *
from urllib.request import Request, urlopen
req = Request('https://api.dataplatform.cloud.ibm.com/v2/gallery-assets/entries/5562ced564e776edc5f91e13d48d8309/data?accessKey=466875ad0187d4ea757478e5c1130b59')
req.add_header('User-Agent', 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:77.0) Gecko/20100101 Firefox/77.0')
content = urlopen(req)
hospital_pdf = pd.read_csv(content)
print(hospital_pdf)
If the code above fails because the link cannot be found, download the hospitals.csv data set from the Watson Studio gallery and insert it manually using the method described above.
hospital_schema = StructType([StructField('id', IntegerType()),
StructField('name', StringType()),
StructField('city', StringType()),
StructField('state', StringType()),
StructField('lon', DoubleType()),
StructField('lat', DoubleType())])
hospital_df = spark.createDataFrame(hospital_pdf, hospital_schema)
hospital_df.show(3)
Read the county data where each county is a polygon/multipolygon:
from urllib.request import Request, urlopen # Python 3
req = Request('https://api.dataplatform.cloud.ibm.com/v2/gallery-assets/entries/c8cc28f4c30dc4d8c0b13f18c50c3244/data?accessKey=c8cc28f4c30dc4d8c0b13f18c50fa2d5')
req.add_header('User-Agent', 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:77.0) Gecko/20100101 Firefox/77.0')
content = urlopen(req)
counties_pdf = pd.read_csv(content)[['NAME', 'STATE_NAME', 'POP2000', 'shape_WKT']]
print(counties_pdf)
counties_schema = StructType([StructField('NAME', StringType()),
StructField('STATE_NAME', StringType()),
StructField('POP2000', IntegerType()),
StructField('shape_WKT', StringType())])
counties_df = spark.createDataFrame(counties_pdf, counties_schema)
counties_df.show(3)
The raw spatial data in the data frame can come in various forms, for example, separate latitude and longitude columns, or a column containing the WKT string of the geometry.
Therefore, the first step is to use a spatial function to generate a new spatial column that combines the data in these columns.
For example, use the function:
- ST_Point(lon_col, lat_col) if the raw spatial data is in a latitude column and a longitude column
- ST_WKTToSQL(wkt_col) if the raw spatial data is in a column containing the WKT string form of the geometry
For the full list of possible query functions, see Geospatial Toolkit functions.
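As a plain-Python aside, the lon/lat ordering that ST_Point expects (x is the longitude, y is the latitude) can be made concrete by writing a point out in its WKT form. This sketch is for intuition only; it is not how the library represents geometries internally:

```python
# Illustration only: ST_Point(lon, lat) builds a point geometry whose WKT
# form looks like this. Note the order: x (longitude) first, y (latitude) second.
def to_wkt_point(lon, lat):
    """Format a longitude/latitude pair as a WKT POINT string."""
    return f"POINT ({lon} {lat})"

print(to_wkt_point(-77.574722, 43.146732))  # POINT (-77.574722 43.146732)
```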
Create a geometry column for the hospital data using ST_Point(lon, lat)
:
hospital_df.createOrReplaceTempView("hospitals")
hospital_df = spark.sql("SELECT *, ST_Point(lon, lat) as location from hospitals")
hospital_df.show(3, False)
Create a geometry column for the county data using ST_WKTToSQL(wkt_string)
:
counties_df.createOrReplaceTempView('counties')
counties_df = spark.sql("SELECT NAME, STATE_NAME, POP2000, ST_WKTToSQL(shape_WKT) as shape from counties")
counties_df.show(3)
A data frame can also be used to create a temporary view. Registering a data frame as a table allows you to run SQL queries over its data. Register the hospital and county data frames as a temporary view:
spark.conf.set("spark.sql.legacy.storeAnalyzedPlanForView","False")
print(spark.conf.get("spark.sql.legacy.storeAnalyzedPlanForView"))
hospital_df.createOrReplaceTempView('hospitals_temp')
counties_df.createOrReplaceTempView('counties_temp')
This sample query shows you how to find the hospitals that are within a certain distance of a given location (constructed using ST_Point).
spark.sql("""
SELECT name, city, state
FROM hospitals_temp
WHERE ST_Distance(location, ST_Point(-77.574722, 43.146732)) < 10000.0
""").show()
The following sample queries show you how to use spatial functions to determine which polygon contains a given point. The examples use the following functions:
- ST_Contains(geom1, geom2): returns TRUE if the geom2 values are completely contained by the polygons identified by geom1.
- ST_Within(geom1, geom2): returns TRUE if the geom1 values are within the polygons identified by geom2.
- ST_Intersects(geom1, geom2): returns TRUE if geom1 and geom2 intersect spatially in any way; they may touch, cross, or contain one another.
spark.sql("""
SELECT NAME
FROM counties_temp
WHERE ST_Contains(shape, ST_Point(-74.237, 42.037))
""").show()
spark.sql("""
SELECT NAME
FROM counties_temp
WHERE
ST_Within(ST_Point(-74.237, 42.037), shape)
""").show()
spark.sql("""
SELECT NAME
FROM counties_temp
WHERE
ST_Intersects(shape, ST_Point(-74.237, 42.037))
""").show()
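The containment predicates above can be understood through the classic ray-casting test: count how often a ray from the query point crosses the polygon boundary. The following is a minimal plain-Python sketch of that idea, not how the library implements ST_Contains (which works on geodetic coordinates and handles boundary cases this sketch ignores):

```python
def point_in_polygon(x, y, vertices):
    """Ray-casting test: cast a ray from (x, y) to the right and count
    how many polygon edges it crosses. An odd count means inside.
    `vertices` is a list of (x, y) tuples forming a closed ring.
    Planar approximation only -- illustration, not the library's algorithm."""
    inside = False
    n = len(vertices)
    for i in range(n):
        x1, y1 = vertices[i]
        x2, y2 = vertices[(i + 1) % n]
        # Does this edge straddle the horizontal line through y?
        if (y1 > y) != (y2 > y):
            # x-coordinate where the edge crosses that line
            x_cross = x1 + (y - y1) * (x2 - x1) / (y2 - y1)
            if x < x_cross:
                inside = not inside
    return inside

square = [(-74.0, 42.0), (-73.0, 42.0), (-73.0, 43.0), (-74.0, 43.0)]
print(point_in_polygon(-73.5, 42.5, square))  # True
print(point_in_polygon(-75.0, 42.5, square))  # False
```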
Each of the following queries determines which hospitals are located within the specified polygon, which is defined as a constant using the well-known text (WKT) representation. The polygon definition consists of the keyword POLYGON followed by a comma-separated list of vertices, where each vertex is a pair of $x$ and $y$ coordinates separated by a space. The entire list of coordinate pairs must be enclosed in parentheses.
spark.sql("""
SELECT name
FROM hospitals_temp
WHERE
ST_Contains(ST_WKTToSQL('POLYGON ((-74.0 42.0, -73.0 42.0, -73.0 43.0, -74.0 43.0, -74.0 42.0))'), location)
""").show(3)
spark.sql("""
SELECT name
FROM hospitals_temp
WHERE ST_Within(location, ST_WKTToSQL('POLYGON ((-74.0 42.0, -73.0 42.0, -73.0 43.0, -74.0 43.0, -74.0 42.0))'))
""").show(3)
spark.sql("""
SELECT name
FROM hospitals_temp
WHERE ST_Intersects(location, ST_WKTToSQL('POLYGON ((-74.0 42.0, -73.0 42.0, -73.0 43.0, -74.0 43.0, -74.0 42.0))'))
""").show(3)
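The WKT grammar described above is simple enough to unpack by hand. The following rough plain-Python parser handles only the single-ring POLYGON constants used in this notebook; real applications should use a proper WKT library or the ST_WKTToSQL function itself:

```python
def parse_simple_polygon(wkt):
    """Extract (x, y) vertex tuples from a single-ring WKT POLYGON.
    Handles only the simple 'POLYGON ((x y, x y, ...))' form used here."""
    coords = wkt.strip().removeprefix("POLYGON").strip().strip("()")
    return [tuple(float(v) for v in pair.split()) for pair in coords.split(",")]

ring = parse_simple_polygon(
    "POLYGON ((-74.0 42.0, -73.0 42.0, -73.0 43.0, -74.0 43.0, -74.0 42.0))")
print(ring[0])    # (-74.0, 42.0)
print(len(ring))  # 5 vertices; the first and last coincide to close the ring
```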
Just as a regular join function can join two tables based on the values in columns that contain character or numeric data, spatial join functions can be used to join tables based on the values in the columns that contain spatial data. The following examples use the counties and hospitals tables.
You can use a spatial join to find the hospitals located within a specific county. For example, the following query returns a list of all the hospitals in Dutchess County:
spark.sql("""
SELECT c.NAME, h.name
FROM counties_temp AS c, hospitals_temp AS h
WHERE c.NAME = 'Dutchess'
AND ST_Intersects(c.shape, h.location)
""").show()
Alternatively, you can use the SQL JOIN ... ON ...
notation, which is equivalent to a spatial predicate in the WHERE
clause. For example, the following query produces the same result set as the previous query:
spark.sql("""
SELECT h.name, c.NAME
FROM counties_temp AS c
JOIN hospitals_temp AS h
ON c.NAME = 'Dutchess'
AND ST_Intersects(h.location, c.shape)
""").show()
The following query returns the name of the county in which a particular hospital is located:
spark.sql("""
SELECT c.NAME, h.name
FROM hospitals_temp AS h, counties_temp AS c
WHERE ST_Intersects(h.location, c.shape)
AND h.name = 'Vassar Brothers Hospital'
""").show()
This section shows you how to use spatial joins in conjunction with additional predicates and aggregation to address business problems. These examples continue to use the hospitals and counties tables, but the same principles can be applied to any other type of data.
The following example queries the hospitals within each county in New York state, qualifying by the state name in the counties table.
spark.sql("""
SELECT c.NAME, h.name
FROM counties_temp AS c, hospitals_temp AS h
WHERE ST_Intersects(h.location, c.shape)
AND c.STATE_NAME='New York'
ORDER BY c.NAME, h.name
""").show(3)
The same results can be obtained by rewriting the above query and using the fields from the hospitals table:
spark.sql("""
SELECT c.NAME, h.name
FROM hospitals_temp AS h, counties_temp AS c
WHERE ST_Intersects(h.location, c.shape)
AND h.state='NY'
ORDER BY c.NAME, h.name
""").show(3)
The following example lists the number of hospitals per county in New York:
spark.sql("""
SELECT c.NAME, COUNT(h.name) AS hospital_count
FROM counties_temp AS c, hospitals_temp AS h
WHERE ST_Intersects(h.location, c.shape)
AND c.STATE_NAME='New York'
GROUP BY c.NAME
""").show(3)
To identify counties where the population is underserved by hospitals, an interesting metric might be the number of people per hospital in each county. Using the population of each county in the year 2000, you can calculate this number.
spark.sql("""
SELECT c.NAME,
COUNT(h.name) AS hospital_count,
c.POP2000 AS Population,
c.POP2000/COUNT(h.name) AS people_per_hospital
FROM counties_temp AS c, hospitals_temp AS h
WHERE c.STATE_NAME='New York'
AND ST_Intersects(h.location, c.shape)
GROUP BY c.NAME, c.POP2000
ORDER BY people_per_hospital DESC
""").show(3)
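The aggregation above amounts to counting hospitals per county and dividing the county population by that count. The same logic, sketched in plain Python on made-up sample rows (the county names, counts, and populations below are hypothetical, for illustration only):

```python
# Hypothetical (county, hospital) pairs, as produced by the spatial join:
joined = [
    ("Dutchess", "Hospital A"), ("Dutchess", "Hospital B"),
    ("Ulster", "Hospital C"),
]
pop2000 = {"Dutchess": 280150, "Ulster": 177749}  # hypothetical populations

# GROUP BY county, COUNT(hospital):
counts = {}
for county, _ in joined:
    counts[county] = counts.get(county, 0) + 1

# POP2000 / COUNT(hospital):
people_per_hospital = {c: pop2000[c] / n for c, n in counts.items()}
print(people_per_hospital)  # {'Dutchess': 140075.0, 'Ulster': 177749.0}
```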
With additional detail, such as the number of beds or the number of doctors per hospital, you could derive a better measure of health care coverage per state and population.
A common use case for mapping applications, and in particular for web mapping, is to select objects that fall within a specific rectangular region. This can be done by creating a polygon to represent the rectangle and using the ST_Intersects
spatial predicate.
spark.sql("""
SELECT name
FROM hospitals_temp
WHERE ST_Intersects(location, ST_WKTToSQL(
'POLYGON ((-74.0 42.0, -73.0 42.0, -73.0 43.0, -74.0 43.0, -74.0 42.0))'))
""").show(3)
Another common spatial query is to find things within a specified distance of a particular location. You have probably used web-mapping applications to get this kind of information. You can issue SQL queries from your application for questions like:
The spatial function used for these queries is ST_Distance
, which computes the distance between the spatial values and returns a result in meters.
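To get a feel for the meters-scale results that distance functions on geographic coordinates return, here is a plain-Python haversine sketch. It uses a spherical approximation; the library's ST_Distance may use a more precise ellipsoidal model, so values can differ slightly:

```python
import math

def haversine_m(lon1, lat1, lon2, lat2):
    """Great-circle distance in meters between two lon/lat points.
    Spherical approximation, for illustration only."""
    r = 6371008.8  # mean Earth radius in meters
    p1, p2 = math.radians(lat1), math.radians(lat2)
    dlat = p2 - p1
    dlon = math.radians(lon2 - lon1)
    a = math.sin(dlat / 2) ** 2 + math.cos(p1) * math.cos(p2) * math.sin(dlon / 2) ** 2
    return 2 * r * math.asin(math.sqrt(a))

# One degree of latitude is roughly 111 km:
print(haversine_m(-74.237, 42.037, -74.237, 43.037))  # ~111195 meters
```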
The following query generates eight results:
spark.sql("""
SELECT name
FROM hospitals_temp
WHERE ST_Distance(location, ST_Point(-74.237, 42.037)) < 46800.0
""").show()
A different way of querying the same location is to use the ST_Buffer function, which creates a circular buffer around a given geometry so that the geometries falling within that buffer can be determined. The ST_Buffer function takes as parameters a spatial geometry and a distance in meters for the buffer around that spatial value. The results are the same as when you use ST_Distance.
spark.sql("""
SELECT name
FROM hospitals_temp
WHERE
ST_Intersects(location,
ST_Buffer(ST_Point(-74.237, 42.037), 46800.0))
ORDER BY name
""").show(3)
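Conceptually, a circular buffer around a point can be approximated by a ring of vertices on a circle. The following plain-Python sketch builds such a polygon in WKT form; the equirectangular meters-to-degrees conversion and the 32-vertex ring are crude simplifications, whereas the library's ST_Buffer computes the buffer geodetically:

```python
import math

def buffer_point_wkt(lon, lat, radius_m, n=32):
    """Approximate a circular buffer around (lon, lat) as an n-vertex
    WKT POLYGON. Uses a rough equirectangular meters-to-degrees
    conversion -- illustration only, not the library's geodetic buffer."""
    dlat = radius_m / 111320.0                                 # meters per degree of latitude
    dlon = radius_m / (111320.0 * math.cos(math.radians(lat)))  # shrink with latitude
    pts = []
    for i in range(n + 1):  # repeat the first point to close the ring
        t = 2 * math.pi * i / n
        pts.append(f"{lon + dlon * math.cos(t):.6f} {lat + dlat * math.sin(t):.6f}")
    return "POLYGON ((" + ", ".join(pts) + "))"

wkt = buffer_point_wkt(-74.237, 42.037, 46800.0)
print(wkt[:60], "...")
```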
The following query returns the distance from a specified point to each object within a roughly 30-mile (46,800 m) radius:
spark.sql("""
SELECT name, ST_Distance(location, ST_Point(-74.237, 42.037)) AS distance
FROM hospitals_temp
WHERE ST_Distance(location, ST_Point(-74.237, 42.037)) < 46800.0
ORDER BY distance
""").show(3)
You could also use ST_Buffer to compute the spatial relation and then determine the distance, as shown in the following query:
spark.sql("""
SELECT name, ST_Distance(location, ST_Point(-74.237, 42.037)) AS distance
FROM hospitals_temp
WHERE
ST_Intersects(location,
ST_Buffer(ST_Point(-74.237, 42.037), 46800.0))
ORDER BY distance
""").show(3)
A key difference to note here is that the ST_Buffer function in this package supports buffering of arbitrary geometries, not just points. Note that an ST_Buffer query on large geometries can be expensive.
spark.sql("""
SELECT name, ST_Distance(location, ST_WKTToSQL(
'LINESTRING (-74.0 42.0, -73.0 42.0)'))
FROM hospitals_temp
WHERE ST_Intersects(location, ST_Buffer(ST_WKTToSQL(
'LINESTRING (-74.0 42.0, -73.0 42.0)'), 46800.0))
""").show(3)
In this notebook, you learned how to query spatial data you downloaded from the IBM Watson Studio Gallery. You registered each data frame (one with data on hospitals and another with county information) as a table to run your queries on. The sample queries showed you how to determine the hospitals within a certain distance or in a polygon, to find the name of the county in which a hospital is located, or to identify the counties where the population is underserved by hospitals. The sample queries showed you how to use and combine the most common Spark SQL spatial functions in queries.
Linsong Chu, Research Engineer at IBM Research
Copyright © 2019 IBM. This notebook and its source code are released under the terms of the MIT License.