Abstract
In this article, we'll see how to use the Apache Spark GraphFrames package with SingleStore by using historical data about the London Underground network. We'll store the data as stations (vertices) and line connections (edges) in SingleStore and then load the data into a notebook environment and perform some queries on the data using GraphFrames.
The notebook file used in this article is available on GitHub.
Create a SingleStore Cloud account
A previous article showed the steps to create a free SingleStore Cloud account. We'll use the following settings:
- Workspace Group Name: Spark Demo Group
- Cloud Provider: AWS
- Region: US East 1 (N. Virginia)
- Workspace Name: spark-demo
- Size: S-00
We'll make a note of the password and store it in the secrets vault using the name password.
Create a new notebook
From the left navigation pane in the cloud portal, we'll select DEVELOP > Data Studio.
In the top right of the web page, we'll select New Notebook > New Notebook, as shown in Figure 1.
We'll call the notebook spark_graphframes_demo, select a Blank notebook template from the available options, and save it in the Personal location.
Fill out the notebook
First, let's install Java:
!conda install -y --quiet -c conda-forge openjdk
Next, we'll install some libraries:
!pip install folium --quiet
!pip install graphframes --quiet
!pip install pyspark --quiet
In previous articles in our Apache Spark series using notebooks, we've downloaded jar files into a local directory. However, in this article, we'll use spark.jars.packages and create our SparkSession as follows:
from pyspark.sql import SparkSession
# List of Maven coordinates for all required packages
maven_packages = [
    "graphframes:graphframes:0.8.4-spark3.5-s_2.12",
    "org.scala-lang:scala-library:2.12",
    "com.singlestore:singlestore-jdbc-client:1.2.4",
    "com.singlestore:singlestore-spark-connector_2.12:4.1.8-spark-3.5.0",
    "org.apache.commons:commons-dbcp2:2.12.0",
    "org.apache.commons:commons-pool2:2.12.0",
    "io.spray:spray-json_3:1.3.6"
]
# Create Spark session with all required packages
spark = (SparkSession
    .builder
    .config("spark.jars.packages", ",".join(maven_packages))
    .appName("Spark GraphFrames Test")
    .getOrCreate()
)
spark.sparkContext.setLogLevel("ERROR")
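Each entry in spark.jars.packages is a Maven coordinate of the form groupId:artifactId:version, and Spark expects all entries as a single comma-separated string. As a minimal sketch (the helper name check_coordinates is hypothetical and not part of Spark), a quick sanity check before building the session might look like this:

```python
def check_coordinates(packages):
    """Return the comma-separated string Spark expects, or raise on a malformed entry."""
    for coordinate in packages:
        parts = coordinate.split(":")
        # A coordinate needs exactly three non-empty parts: group, artifact, version
        if len(parts) != 3 or not all(parts):
            raise ValueError(f"Malformed Maven coordinate: {coordinate!r}")
    return ",".join(packages)


# Two of the coordinates from the list above, used as sample input
packages_arg = check_coordinates([
    "graphframes:graphframes:0.8.4-spark3.5-s_2.12",
    "com.singlestore:singlestore-jdbc-client:1.2.4",
])
print(packages_arg)
```

This only checks the shape of each coordinate. It does not confirm that the artifact or version exists in a Maven repository.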
A database is required, so we'll create one:
DROP DATABASE IF EXISTS spark_demo;
CREATE DATABASE IF NOT EXISTS spark_demo;
We'll also create tables to store the data:
USE spark_demo;
DROP TABLE IF EXISTS connections;
CREATE ROWSTORE TABLE IF NOT EXISTS connections (
    src INT,
    dst INT,
    line VARCHAR(32),
    colour VARCHAR(8),
    time INT,
    PRIMARY KEY(src, dst, line)
);
DROP TABLE IF EXISTS stations;
CREATE ROWSTORE TABLE IF NOT EXISTS stations (
    id INT PRIMARY KEY,
    latitude DOUBLE,
    longitude DOUBLE,
    name VARCHAR(32),
    zone FLOAT,
    total_lines INT,
    rail INT
);
Now we'll download the historical data for the London Underground into Pandas. We'll also perform some adjustments for GraphFrames and some merging of the data to mirror the schema of the tables we defined above:
import pandas as pd
connections_url = "https://raw.githubusercontent.com/VeryFatBoy/singlestore-geospatial-example/main/csv/london_connections.csv"
stations_url = "https://raw.githubusercontent.com/VeryFatBoy/singlestore-geospatial-example/main/csv/london_stations.csv"
lines_url = "https://raw.githubusercontent.com/VeryFatBoy/singlestore-geospatial-example/main/csv/london_lines.csv"
# GraphFrames expects the edge columns to be named src and dst
connections_df = pd.read_csv(connections_url)
connections_df.rename(
    columns = {"station1": "src", "station2": "dst"},
    inplace = True
)
stations_df = pd.read_csv(stations_url)
stations_df.drop(
    "display_name",
    axis = 1,
    inplace = True
)
lines_df = pd.read_csv(lines_url)
lines_df.drop(
    "stripe",
    axis = 1,
    inplace = True
)
# Attach each line's name and colour to its connections
connections_df = pd.merge(
    connections_df,
    lines_df,
    on = "line",
    how = "left"
)
# Replace the line key with the line name
connections_df.drop(
    "line",
    axis = 1,
    inplace = True
)
connections_df.rename(
    columns = {"name": "line"},
    inplace = True
)
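To make the reshaping above easier to follow, here is a small sketch that applies the same rename, merge, drop, and rename steps to a couple of made-up rows. The column names are the ones the code above relies on; the sample values, including the numeric line keys, are assumptions for illustration and are not taken from the CSV files.

```python
import pandas as pd

# Made-up sample rows using the column names the article's code expects
connections_df = pd.DataFrame({
    "station1": [11, 49],
    "station2": [163, 151],
    "line": [1, 9],
    "time": [1, 2],
})
lines_df = pd.DataFrame({
    "line": [1, 9],
    "name": ["Bakerloo Line", "Northern Line"],
    "colour": ["#B36305", "#000000"],
})

# GraphFrames looks for edge columns named src and dst
connections_df = connections_df.rename(
    columns={"station1": "src", "station2": "dst"}
)

# Attach each line's name and colour, then swap the line key for the name
connections_df = (
    connections_df
    .merge(lines_df, on="line", how="left")
    .drop(columns="line")
    .rename(columns={"name": "line"})
)

print(connections_df)
```

The resulting columns (src, dst, time, line, colour) match the columns of the connections table defined earlier, which is what allows the DataFrame to be appended to it by column name.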
Just before we save our data into SingleStore, we'll create a map of the London Underground using Folium:
import folium
London = [51.509865, -0.118092]
mymap = folium.Map(location = London, zoom_start = 12)
# Add markers for stations
for idx, row in stations_df.iterrows():
    folium.Marker(
        [row["latitude"], row["longitude"]],
        popup = row["name"]
    ).add_to(mymap)
# Add lines with colours
for idx, row in connections_df.iterrows():
    source = stations_df.loc[stations_df["id"] == row["src"]]
    target = stations_df.loc[stations_df["id"] == row["dst"]]
    # Extract latitude and longitude
    source_coords = (float(source["latitude"].iloc[0]), float(source["longitude"].iloc[0]))
    target_coords = (float(target["latitude"].iloc[0]), float(target["longitude"].iloc[0]))
    folium.PolyLine(
        locations = [source_coords, target_coords],
        color = row["colour"]
    ).add_to(mymap)
html_content = mymap._repr_html_()
and save the map to Stage for download, as follows:
with nb.stage.open("map.html", "w") as st:
    st.write(html_content)
This produces a map, as shown in Figure 2. We can scroll and zoom the map. Clicking a marker shows the station name, and the lines are coloured according to the London Underground scheme.
We'll now prepare the connection to SingleStore:
from sqlalchemy import create_engine
# connection_url is provided by the SingleStore notebook environment
db_connection = create_engine(connection_url)
url = db_connection.url
and write the connections data:
connections_df.to_sql(
    "connections",
    con = db_connection,
    if_exists = "append",
    index = False,
    chunksize = 1000
)
and stations data:
stations_df.to_sql(
    "stations",
    con = db_connection,
    if_exists = "append",
    index = False,
    chunksize = 1000
)
We can check the data in the connections table:
SELECT * FROM connections LIMIT 5;
and stations table:
SELECT * FROM stations LIMIT 5;
Now we'll create the Spark connection to SingleStore:
password = get_secret("password")
host = url.host
port = url.port
cluster = host + ":" + str(port)
We also need to set some configuration parameters:
spark.conf.set("spark.datasource.singlestore.ddlEndpoint", cluster)
spark.conf.set("spark.datasource.singlestore.user", "admin")
spark.conf.set("spark.datasource.singlestore.password", password)
spark.conf.set("spark.datasource.singlestore.disablePushdown", "false")
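The ddlEndpoint setting above takes a host:port string. The article derives it from the SQLAlchemy engine URL; as a standard-library illustration of the same idea, the sketch below splits a hypothetical connection URL into its host and port. The URL, credentials, and host name here are invented for the example.

```python
from urllib.parse import urlsplit

# Hypothetical connection URL; the real scheme, host, and credentials will differ
example_url = "singlestoredb://admin:secret@svc-example.singlestore.com:3306/spark_demo"

parts = urlsplit(example_url)

# Combine host and port in the host:port form used for the endpoint
cluster = f"{parts.hostname}:{parts.port}"
print(cluster)
```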
With the data safely stored in SingleStore, we can now read it back into Spark and use GraphFrames. First the connections data:
connections = (spark.read
    .format("singlestore")
    .load("spark_demo.connections")
)
and then the stations data:
stations = (spark.read
    .format("singlestore")
    .load("spark_demo.stations")
)
Now we'll create a GraphFrame:
from graphframes import GraphFrame
underground = GraphFrame(stations, connections)
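GraphFrames requires the vertex DataFrame to have a column named id and the edge DataFrame to have columns named src and dst, which is why the station columns were renamed earlier. The following sketch shows one way to check this before constructing the graph; the helper name missing_graph_columns is hypothetical and not part of GraphFrames.

```python
def missing_graph_columns(vertex_columns, edge_columns):
    """Return the required GraphFrame column names that are absent."""
    missing = []
    if "id" not in vertex_columns:
        missing.append("vertices.id")
    for name in ("src", "dst"):
        if name not in edge_columns:
            missing.append(f"edges.{name}")
    return missing


# Column names from the stations and connections tables defined earlier
station_columns = ["id", "latitude", "longitude", "name", "zone", "total_lines", "rail"]
connection_columns = ["src", "dst", "line", "colour", "time"]
ok_result = missing_graph_columns(station_columns, connection_columns)
print(ok_result)

# The original CSV column names would not pass the check
raw_result = missing_graph_columns(station_columns, ["station1", "station2", "line", "time"])
print(raw_result)
```

In the notebook, the same check could be called with stations.columns and connections.columns, since a Spark DataFrame exposes its column names as a list.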
We can show the vertices:
underground.vertices.show(5)
Example output:
+---+--------+---------+---------------+----+-----------+----+
| id|latitude|longitude|           name|zone|total_lines|rail|
+---+--------+---------+---------------+----+-----------+----+
| 25|  51.512|  -0.1031|    Blackfriars| 1.0|          2|   0|
| 39| 51.5481|  -0.1188|Caledonian Road| 2.0|          1|   0|
| 43| 51.5147|   0.0082|   Canning Town| 3.0|          2|   0|
| 50| 51.7052|   -0.611|        Chesham|10.0|          1|   0|
| 60| 51.5129|  -0.1243|  Covent Garden| 1.0|          1|   0|
+---+--------+---------+---------------+----+-----------+----+
only showing top 5 rows
We can show the edges:
underground.edges.show(5)
Example output:
+---+---+--------------------+-------+----+
|src|dst|                line| colour|time|
+---+---+--------------------+-------+----+
|  7|145|       Northern Line|#000000|   2|
| 11|163|       Bakerloo Line|#B36305|   1|
| 19| 97|Docklands Light R...|#00A4A7|   2|
| 28|192|        Central Line|#E32017|   1|
| 49|151|       Northern Line|#000000|   2|
+---+---+--------------------+-------+----+
only showing top 5 rows
We can check how many stations are in each London Underground Zone:
(underground
    .vertices
    .groupBy("zone")
    .count()
    .orderBy("count", ascending = False)
    .show()
)
Example output:
+----+-----+
|zone|count|
+----+-----+
| 2.0|   75|
| 1.0|   60|
| 3.0|   47|
| 4.0|   38|
| 5.0|   28|
| 6.0|   18|
| 2.5|   17|
| 3.5|    6|
| 1.5|    4|
| 8.0|    2|
|10.0|    2|
| 7.0|    2|
| 9.0|    1|
| 6.5|    1|
| 5.5|    1|
+----+-----+
It may be useful to find the number of connections on a particular line, such as the District Line:
(underground
    .edges
    .filter("line = 'District Line'")
    .count()
)
Example output:
59
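Because each edge row is a single connection between two stations, the same idea extends to tallying connections for every line at once. In Spark, that would follow the groupBy and count pattern used for zones earlier; the plain-Python sketch below shows the equivalent tally on a few rows copied from the example edge output, so the counts apply to this sample only.

```python
from collections import Counter

# A few (src, dst, line) rows taken from the example edge output
sample_edges = [
    (7, 145, "Northern Line"),
    (11, 163, "Bakerloo Line"),
    (28, 192, "Central Line"),
    (49, 151, "Northern Line"),
]

# Each row is one connection, so this counts connections per line
connections_per_line = Counter(line for _, _, line in sample_edges)
print(connections_per_line.most_common())
```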
It could be interesting to know the maximum number of lines running through a station:
(underground
    .vertices
    .groupBy()
    .max("total_lines")
    .show()
)
Example output:
+----------------+
|max(total_lines)|
+----------------+
|               6|
+----------------+
and to find the station with the most lines running through it:
(underground
    .vertices
    .filter("total_lines == 6")
    .show()
)
Example output:
+---+--------+---------+--------------------+----+-----------+----+
| id|latitude|longitude|                name|zone|total_lines|rail|
+---+--------+---------+--------------------+----+-----------+----+
|145| 51.5308|  -0.1238|King's Cross St. ...| 1.0|          6|   1|
+---+--------+---------+--------------------+----+-----------+----+
Many more types of queries are possible. The GraphFrames documentation is a good place to start.
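One example of such a query is finding a route between two stations with the fewest stops. GraphFrames offers bfs and shortestPaths methods for this kind of question. The plain-Python sketch below only illustrates the underlying breadth-first idea on a made-up edge list. It is not how GraphFrames implements it, the function name fewest_hops is hypothetical, and it assumes connections can be travelled in both directions.

```python
from collections import deque


def fewest_hops(edges, start, goal):
    """Breadth-first search over undirected edges; returns a list of ids or None."""
    neighbours = {}
    for src, dst in edges:
        neighbours.setdefault(src, []).append(dst)
        neighbours.setdefault(dst, []).append(src)

    queue = deque([[start]])
    seen = {start}
    while queue:
        path = queue.popleft()
        if path[-1] == goal:
            return path
        for nxt in neighbours.get(path[-1], []):
            if nxt not in seen:
                seen.add(nxt)
                queue.append(path + [nxt])
    return None


# Made-up station ids and connections, for illustration only
sample_edges = [(1, 2), (2, 3), (3, 4), (1, 5), (5, 4)]
route = fewest_hops(sample_edges, 1, 4)
print(route)
```

The sketch counts stops rather than minutes. Taking the time column into account would need a weighted search instead.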
Finally, we can stop Spark:
spark.stop()
Summary
In this short article, we've seen the ease with which we can store graph data in SingleStore and how we can use GraphFrames to perform various queries on the data.