Aerospike is a highly scalable key-value database offering best-in-class performance. It is typically deployed into real-time environments managing terabyte-to-petabyte data volumes.
Aerospike is typically run alongside other scalable distributed software, such as Kafka for system coupling or Spark for analytics. The Aerospike Connect product line makes integration as easy as possible.
This article looks at how Aerospike Spark Connect works in practice by offering a comprehensive and easily reproduced end-to-end example using aerospike-ansible.
Database Cluster Setup
First, take a look at Ansible for Aerospike, which explains how to use aerospike-ansible.
In this example I set cluster_instance_type to c5d.18xlarge in vars/cluster-config.yml.
Follow the instructions up to and including one-touch setup. You'll get as far as
ansible-playbook aws-setup-plus-aerospike-install.yml
ansible-playbook aerospike-java-client-setup.yml
which will give you a 3-node cluster by default, plus a client instance with the relevant software installed.
Spark Cluster Setup
This is done via
ansible-playbook spark-cluster-setup.yml
For this example, prior to running, I set spark_instance_type to c5d.4xlarge in vars/cluster-config.yml.
This playbook creates a 3-node Spark cluster of the given instance type, with Spark installed and running. It also installs Aerospike Spark Connect.
Note that you will need to set enterprise: true and provide a path to a valid Aerospike feature key using feature_key: /your/path/to/key in vars/cluster-config.yml. You must therefore be either a licensed Aerospike customer or running an Aerospike trial.
Near the end of the process you will see
TASK [Spark master IP & master internal url] ************************************************************************************************************************************************************************
ok: [localhost] => {
"msg": "Spark master is 3.88.237.103. Spark master internal url is spark://10.0.2.122:7077."
}
Make a note of the Spark master internal URL - it is needed later.
Load Data
Our example makes use of 20m records from the 1bn-record NYC Taxi ride corpus, available in compressed form at https://aerospike-ken-tune.s3.amazonaws.com/nyc-taxi-data/trips_xaa.csv.gz. We load it into Aerospike using aerospike-loader, which is installed on the client machine set up above. First of all, we get the addresses of the hosts in the Aerospike cluster - these are needed later.
source ./scripts/cluster-ip-address-list.sh
Sample output
Adds cluster ips to this array- AERO_CLUSTER_IPS
Use as ${ AERO_CLUSTER_IPS[index]}
There are 3 entries
##########################################################
cluster IP addresses : Public : 3.87.14.39, Private : 10.0.2.58
cluster IP addresses : Public : 3.89.113.231, Private : 10.0.0.234
cluster IP addresses : Public : 23.20.193.64, Private : 10.0.1.95
aerospike-loader requires a config file describing how to load the data into Aerospike. This maps CSV column positions to named and typed bins. A sample entry looks like
{
    "name": "pkup_datetime",
    "value": {
        "column_position": 3,
        "type": "timestamp",
        "encoding": "yyyy-MM-dd hh:mm:ss",
        "dst_type": "integer"
    }
}
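To make the mapping concrete: this entry tells the loader to parse column 3 with the given pattern and store it as epoch seconds in an integer bin. A minimal Scala sketch of the equivalent conversion (toEpochSeconds is a hypothetical helper, shown purely for illustration):
import java.text.SimpleDateFormat

// Parse a raw CSV timestamp with the loader's "encoding" pattern and
// convert it to epoch seconds, as dst_type "integer" implies.
val loaderFormat = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss")
def toEpochSeconds(raw: String): Long =
  loaderFormat.parse(raw).getTime / 1000 // getTime returns milliseconds

toEpochSeconds("2014-01-09 08:45:25") // pickup time as epoch seconds
The getYearFromSeconds UDF we define later in the Spark shell reverses this conversion to recover the year.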
This is provided in the repo at recipes/aerospike-spark-demo/nyc-taxi-data-aero-loader-config.json. We upload this to the client instance.
source ./scripts/client-ip-address-list.sh
scp -i .aws.pem ./recipes/aerospike-spark-demo/nyc-taxi-data-aero-loader-config.json ec2-user@${AERO_CLIENT_IPS[0]}:~
Next, get the data onto the client machine. There's more than one way to do this, but you need to plan ahead as the dataset is 7.6GB when uncompressed. I used the commands below, but the details will depend on your drives and filesystem.
./scripts/client-quick-ssh.sh # to log in, followed by
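# Format and mount the instance's local NVMe drive, then open it up for writing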
sudo mkfs.ext4 /dev/nvme1n1
sudo mkdir /data
sudo mount -t ext4 /dev/nvme1n1 /data
sudo chmod 777 /data
wget -P /data https://aerospike-ken-tune.s3.amazonaws.com/nyc-taxi-data/trips_xaa.csv.gz
gunzip /data/trips_xaa.csv.gz
Finally we load our data in, using the config file we uploaded.
cd ~/aerospike-loader
./run_loader -h 10.0.0.234 -p 3000 -n test -c ~/nyc-taxi-data-aero-loader-config.json /data/trips_xaa.csv
Note we're using one of the cluster IP addresses we recorded earlier (the -h flag), together with the namespace (-n) and the config file we uploaded (-c).
Using Spark
Log into one of the Spark nodes. aerospike-ansible provides a utility script for this
./scripts/spark-quick-ssh.sh
Start up a Spark shell, using the Spark master URL we saw when running the Spark cluster setup playbook.
/spark/bin/spark-shell --master spark://10.0.2.122:7077
Import relevant libraries
import org.apache.spark.sql.{SQLContext, SparkSession, SaveMode}
import org.apache.spark.SparkConf
import java.util.Date
import java.text.SimpleDateFormat
Supply the Aerospike configuration - note we use the cluster IP recorded previously:
spark.conf.set("aerospike.seedhost", "10.0.0.234")
spark.conf.set("aerospike.namespace", "test")
Define a view, and a function we will be using
val sqlContext = spark.sqlContext
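// Register a UDF converting epoch seconds to a year string (x1000 because SimpleDateFormat expects milliseconds)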
sqlContext.udf.register("getYearFromSeconds", (seconds: Long) => (new SimpleDateFormat("yyyy")).format(1000 * seconds))
val taxi = sqlContext.read.format("com.aerospike.spark.sql").option("aerospike.set", "nyc-taxi-data").load
taxi.createOrReplaceTempView("taxi")
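Incidentally, the SaveMode import above hints at the write path: the connector can also save DataFrames back to Aerospike. A minimal sketch, assuming the aerospike.updateByKey option (which, as I recall from the connector docs, names the column to use as the record key) and a hypothetical target set cab-type-counts - check both against your connector version:
// Sketch: write a derived DataFrame back to Aerospike, keyed by cab_type
val byCabType = sqlContext.sql("SELECT cab_type, count(*) count FROM taxi GROUP BY cab_type")
byCabType.write.
  mode(SaveMode.Append).
  format("com.aerospike.spark.sql").
  option("aerospike.set", "cab-type-counts").
  option("aerospike.updateByKey", "cab_type").
  save()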
Finally, we run our queries
// Journeys grouped by cab type
val result = sqlContext.sql("SELECT cab_type,count(*) count FROM taxi GROUP BY cab_type")
result.show()
+--------+--------+
|cab_type| count|
+--------+--------+
| green|20000000|
+--------+--------+
// Average fare by passenger count
val result = sqlContext.sql("SELECT passenger_cnt, round(avg(total_amount),2) avg_amount FROM taxi GROUP BY passenger_cnt ORDER BY passenger_cnt")
result.show()
+-------------+----------+
|passenger_cnt|avg_amount|
+-------------+----------+
| 0| 10.86|
| 1| 14.63|
| 2| 15.75|
| 3| 15.87|
| 4| 15.85|
| 5| 14.76|
| 6| 15.42|
| 7| 23.74|
| 8| 19.52|
| 9| 34.9|
+-------------+----------+
// Number of journeys by passenger count and year
val result = sqlContext.sql("SELECT passenger_cnt, getYearFromSeconds(pkup_datetime) trip_year, count(*) count FROM taxi GROUP BY passenger_cnt, getYearFromSeconds(pkup_datetime) ORDER BY passenger_cnt")
result.show()
+-------------+---------+--------+
|passenger_cnt|trip_year| count|
+-------------+---------+--------+
| 0| 2014| 4106|
| 1| 2014|16557518|
| 2| 2014| 1473578|
| 3| 2014| 507862|
| 4| 2014| 160714|
| 5| 2014| 939276|
| 6| 2014| 355846|
| 7| 2014| 492|
| 8| 2014| 494|
| 9| 2014| 114|
+-------------+---------+--------+
// Number of trips for each passenger count/distance combination
// Ordered by trip count, descending
val result = sqlContext.sql("SELECT passenger_cnt,getYearFromSeconds(pkup_datetime) trip_year,round(trip_distance) distance,count(*) trips FROM taxi GROUP BY passenger_cnt,getYearFromSeconds(pkup_datetime),round(trip_distance) ORDER BY trip_year,trips desc")
result.show()
+-------------+---------+--------+-------+
|passenger_cnt|trip_year|distance| trips|
+-------------+---------+--------+-------+
| 1| 2014| 1.0|5321230|
| 1| 2014| 2.0|3500458|
| 1| 2014| 3.0|2166462|
| 1| 2014| 4.0|1418494|
| 1| 2014| 5.0| 918460|
| 1| 2014| 0.0| 868210|
| 1| 2014| 6.0| 653646|
| 1| 2014| 7.0| 488416|
| 2| 2014| 1.0| 433746|
| 1| 2014| 8.0| 345728|
| 2| 2014| 2.0| 305578|
| 5| 2014| 1.0| 302120|
| 1| 2014| 9.0| 226278|
| 5| 2014| 2.0| 199968|
| 2| 2014| 3.0| 199522|
| 1| 2014| 10.0| 163928|
| 3| 2014| 1.0| 145580|
| 2| 2014| 4.0| 137152|
| 5| 2014| 3.0| 122714|
| 1| 2014| 11.0| 117570|
+-------------+---------+--------+-------+
only showing top 20 rows
Conclusion
This shows how quickly you can get up and running with a large data corpus. The example was done with 20m rows, but it is easily extended to the full corpus. We can also see just how much of the heavy lifting the aerospike-ansible tooling does for you.