Tamimi for Solace Developers


How I built an event-driven NodeJS app on realtime COVID-19 data streams

In these unprecedented times, people are coming together to put their skills toward COVID-19 relief efforts that benefit the public. Everyone is pitching in with their own expertise: some have been making PPE with their 3D printers, some have been developing software, and others have been providing technical support for their loved ones or community. In this post, I would like to share how I used a stream of real-time COVID-19 data updates to build an event-driven NodeJS application, using a simple messaging protocol that can also be used to build public-facing applications.

In this application, I will be using the different streams that Solace made available for anyone to subscribe to. Documentation on how to use the streams can be found in this GitHub repo.

GitHub logo SolaceLabs / covid19-stream-processors

Stream Information & Example Applications for Processing JHU and CovidTracking.com COVID-19 data available as streams over Solace

Research: A bit of background & requirements

Before building this application, I listed out three basic requirements that I wanted to achieve:

  1. Real-time data updates

  2. Lightweight application (I didn’t want to continuously poll or check for new data)

  3. Reactive to any data change

Options

From a higher level perspective, there are two different architectures I could've chosen:

  1. A synchronous REST-driven approach or

  2. An Event-driven architecture (EDA) approach

Option 1: REST

With the first option, there are lots of online resources that I could’ve tapped into, including APIs and CSV datasets such as the ones released by Johns Hopkins University in their GitHub repo. While this is a viable option with lots of online samples and resources, I wanted something more real-time since

a) the data is most valuable when it’s first released (see figure 1 below) and

b) I wanted an application that reacts to data updates instead of continuously polling for updates. For example, the CSV file in the JHU GitHub repo is updated once or twice a day. If I used this dataset, I would have to continuously poll it and check for updates.

Gartner Stream Processing Figure 1: Value of data diminishes with passing of time

Also, since I would be dealing with a large data set, I want to react only to new or updated data when changes come in. A REST approach would mean repeatedly downloading and comparing the data, so it would not be a lightweight implementation. This fails requirements 1 and 2.
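
To illustrate the cost of polling, here is a minimal sketch. The fetchSnapshot function and the 'v1'/'v2' values are hypothetical stand-ins for downloading the CSV file; the point is only that every poll costs a fetch, whether or not anything changed.

```javascript
// Hypothetical data source: returns the same snapshot until the data is updated.
// In a real app this would be an HTTP request for the CSV file.
let snapshot = 'v1'
let fetchCount = 0
function fetchSnapshot() {
    fetchCount++
    return snapshot
}

// One polling cycle: fetch, compare with the last seen value, report whether it changed
let lastSeen = null
function pollOnce() {
    const current = fetchSnapshot()
    const changed = current !== lastSeen
    lastSeen = current
    return changed
}

// Five polls, but the data changes only once (just before the fourth poll)
const results = []
for (let i = 0; i < 5; i++) {
    if (i === 3) snapshot = 'v2'
    results.push(pollOnce())
}
console.log(results)     // [ true, false, false, true, false ]
console.log(fetchCount)  // 5
```

Three of the five fetches returned nothing new, and with a large data set each of those wasted fetches is expensive.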

Option 2: EDA

With an event-driven architecture, I can use a publish-subscribe pattern to build my application. What is pub-sub, you may ask? In a nutshell, it boils down to having a “Publisher” of the data (e.g. the COVID-19 data source) and a “Subscriber” to this data (e.g. my NodeJS application) that reacts only when new data is published.
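
To make the pattern concrete, here is a minimal in-process sketch that uses Node's built-in EventEmitter as a stand-in for a broker. The topic name and the payload are made up for illustration; a real broker such as PubSub+ sits between separate applications instead of living inside one process.

```javascript
const { EventEmitter } = require('events')

// Stand-in for a broker: an in-process event bus (an assumption, for illustration only)
const bus = new EventEmitter()
const received = []

// Subscriber: this callback runs only when something is published on the topic
bus.on('covid/updates', (update) => {
    received.push(update)
    console.log('New update received:', update)
})

// Publisher: emits an event whenever new data exists (hypothetical payload)
bus.emit('covid/updates', { region: 'Ontario', confirmed: 100 })
```

The subscriber never asks "is there anything new?"; it simply gets called when the publisher has something to say.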

With the PubSub+ COVID-19 Broker that Solace made available to the public, updates on COVID-19 data are published on different streams. Application developers who want to build an event-driven application (IoT, mobile or web app) can consume the data by subscribing to any of the available topics. Since consuming this data is framework, platform and language agnostic, I could use any of the supported messaging protocols (MQTT, AMQP, JMS) or open APIs (Python, JS, NodeJS…) that support these protocols. I could also use REST!

 Solace COVID architecture diagram Figure 2: High level end-to-end architecture overview

Decision

So, after evaluating the two options above, I decided to take the EDA approach to build my application. Since I wanted a lightweight messaging API, real-time COVID-19 updates, and an application that reacts to those updates, EDA was the clear winner.

Let’s get to business; well, I mean coding.


Based on the languages and protocols the Solace PubSub+ broker supports, I decided to use MQTT since there is a native NodeJS library for it (the mqtt package on npm).

Languages and protocols Solace supports Figure 3: Languages and protocols supported by Solace

1. Initial setup

Let’s go ahead and start a NodeJS project. Open a new terminal and execute the following command, which creates a new directory for your project, initializes it, and installs the mqtt package:


mkdir covidproject && cd "$_" && npm init -y && npm i mqtt 


2. Connect to the Broker

Create a new file:


touch index.js 


Then open it in your favourite text editor and insert the following:


var mqtt = require('mqtt')

// Broker URL and public credentials
var host = "tcp://mr2r9za6fwi0wf.messaging.solace.cloud:1883"
var config = {
    username: "covid-public-client",
    password: "covid19",
}

// Open the connection to the broker
var client = mqtt.connect(host, config)

// Runs once the broker has accepted the connection
client.on('connect', () => {
    console.log("Connected to COVID PubSub+ Broker!")
})


What you did above is initialize an MQTT client and connect it to the broker URL using the host and config variables. The client object then emits signals (events) that your application can react to with callbacks; in this example we listen to the “connect” signal with client.on('connect'). We will cover more signals later.

Note: the broker URL and credentials are from here https://github.com/SolaceLabs/covid19-stream-processors#1-connection-information

Now test out your connection by executing the following from the terminal:


node index.js 


You should see Connected to COVID PubSub+ Broker! in the output. Voila!
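
If nothing is printed, it helps to listen for the other lifecycle signals the mqtt client emits, such as error, reconnect, offline and close. Here is a minimal sketch; attachLifecycleLogging is a hypothetical helper name of my own, and the demo uses a plain EventEmitter in place of the real client so it runs without a network connection.

```javascript
const { EventEmitter } = require('events')

// Hypothetical helper: logs the connection lifecycle events of an mqtt-style client
function attachLifecycleLogging(client, log = console.log) {
    client.on('error', (err) => log('Connection error: ' + err.message))
    client.on('reconnect', () => log('Reconnecting...'))
    client.on('offline', () => log('Client is offline'))
    client.on('close', () => log('Connection closed'))
    return client
}

// Demo with a plain EventEmitter standing in for the real client.
// In index.js you would call attachLifecycleLogging(client) after mqtt.connect(...)
const lines = []
const fakeClient = attachLifecycleLogging(new EventEmitter(), (line) => lines.push(line))
fakeClient.emit('error', new Error('Connection refused'))
fakeClient.emit('close')
console.log(lines)
```

With these handlers in place, a wrong host name or bad credentials shows up as a log line instead of a silent terminal.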

3. Subscribe to the topic

Now that you are connected to the broker, all you need to do is subscribe to topics.


var topics = [
    "jhu/csse/covid19/raw",
]

client.on('connect', () => {
    console.log("Connected to COVID PubSub+ Broker!")
    // Subscribe to every topic in the list once the connection is up
    topics.forEach((topic) => {
        console.log("Subscribing to topic: ", topic)
        client.subscribe(topic)
    })
})
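
The subscribe call also accepts a callback that receives an error (if any) and the list of granted subscriptions, which is handy for confirming that a subscription went through. A minimal sketch, where subscribeWithLogging is a hypothetical helper of my own and the demo uses a stub client so it runs without a broker:

```javascript
// Hypothetical helper: subscribes and reports the outcome via the subscribe callback
function subscribeWithLogging(client, topic, log = console.log) {
    client.subscribe(topic, (err, granted) => {
        if (err) {
            log('Failed to subscribe to ' + topic + ': ' + err.message)
            return
        }
        granted.forEach((g) => log('Subscribed to ' + g.topic + ' (QoS ' + g.qos + ')'))
    })
}

// Demo with a stub client that grants every subscription at QoS 0
const stubClient = {
    subscribe: (topic, callback) => callback(null, [{ topic: topic, qos: 0 }]),
}
const subLog = []
subscribeWithLogging(stubClient, 'jhu/csse/covid19/raw', (line) => subLog.push(line))
console.log(subLog)
```

In the real application you would pass the client returned by mqtt.connect instead of the stub.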




4. Listen to incoming messages

The second signal we want to listen to is “message”, which fires whenever a message arrives on a subscribed topic:


client.on('message', (topic, message) => {
    // message is a Buffer: convert it to a string, then parse the JSON
    console.log("Received message on Topic: ", topic, "\nMessage:\n", JSON.parse(message.toString()))
})




Note that the message received from the broker arrives in binary format (a NodeJS Buffer). To turn it into a human-readable form, we call message.toString() and then parse the result as JSON. The messages are sent in JSON format based on the schemas defined in the repo.
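
One thing to keep in mind is that JSON.parse throws on malformed input, which would crash the message handler. A small defensive sketch, assuming you would rather skip a bad message than stop the app (parsePayload is a hypothetical helper name and the payloads are made up):

```javascript
// Hypothetical helper: converts an MQTT payload (a Buffer) into an object,
// returning null instead of throwing when the payload is not valid JSON
function parsePayload(message) {
    try {
        return JSON.parse(message.toString())
    } catch (err) {
        return null
    }
}

const ok = parsePayload(Buffer.from('{"confirmed": 10}'))
const bad = parsePayload(Buffer.from('not json'))
console.log(ok)   // { confirmed: 10 }
console.log(bad)  // null
```

Inside the message handler you could then call parsePayload(message) and ignore the message when it returns null.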

Your final application looks like this:


var mqtt = require('mqtt')

// Broker URL and public credentials
var host = "tcp://mr2r9za6fwi0wf.messaging.solace.cloud:1883"
var config = {
    username: "covid-public-client",
    password: "covid19",
}

// Topics to subscribe to once connected
var topics = [
    "jhu/csse/covid19/raw",
]

var client = mqtt.connect(host, config)

// Subscribe to every topic in the list once the connection is up
client.on('connect', () => {
    console.log("Connected to COVID PubSub+ Broker!")
    topics.forEach((topic) => {
        console.log("Subscribing to topic: ", topic)
        client.subscribe(topic)
    })
})

// Print each incoming message (a Buffer) as parsed JSON
client.on('message', (topic, message) => {
    console.log("Received message on Topic: ", topic, "\nMessage:\n", JSON.parse(message.toString()))
})


And we’re done! Your application is now connected to the broker and subscribes to one or more topics defined in the array and reacts only when new messages are published.

Sample run

I have modified my application to subscribe to all case updates (deaths, active, confirmed and recovered) in Ontario, Canada, the recovered cases in Jordan, and the confirmed cases in all the provinces of the United Kingdom, using the following topics on the test stream:

var topics = [
    "jhu/csse/covid19/test/cases/+/update/Canada/Ontario/#",
    "jhu/csse/covid19/test/cases/recovered/update/Jordan/#",
    "jhu/csse/covid19/test/cases/confirmed/update/United Kingdom/#"
]

Notice the use of the MQTT wildcards: '+' matches exactly one topic level and '#' matches any number of remaining levels.
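
To see how a filter is compared against a concrete topic, here is a simplified sketch of the matching rule. The broker does this matching for you; topicMatches is a hypothetical helper for illustration only, it ignores special cases such as topics starting with '$', and the concrete topic names in the demo are assumed examples derived from the filters above.

```javascript
// Returns true when an MQTT topic filter matches a concrete topic name.
// '+' matches exactly one level; '#' matches all remaining levels (including none).
function topicMatches(filter, topic) {
    const filterLevels = filter.split('/')
    const topicLevels = topic.split('/')
    for (let i = 0; i < filterLevels.length; i++) {
        if (filterLevels[i] === '#') return true
        if (i >= topicLevels.length) return false
        if (filterLevels[i] !== '+' && filterLevels[i] !== topicLevels[i]) return false
    }
    return filterLevels.length === topicLevels.length
}

const ontarioFilter = 'jhu/csse/covid19/test/cases/+/update/Canada/Ontario/#'

// Assumed example topics, built from the filters used in this post
const matchesDeaths = topicMatches(ontarioFilter, 'jhu/csse/covid19/test/cases/deaths/update/Canada/Ontario')
const matchesJordan = topicMatches(ontarioFilter, 'jhu/csse/covid19/test/cases/recovered/update/Jordan')
console.log(matchesDeaths)  // true
console.log(matchesJordan)  // false
```

The '+' in the sixth level is what lets a single subscription cover deaths, active, confirmed and recovered updates at once.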


You can check out https://www.marcd.dev/COVIDStreamViewer/mqtt/mqttListener.html and subscribe to the jhu/csse/covid19/raw topic for a sample stream viewer.

Next steps

When you look at the topic hierarchy, you can subscribe to different topics and use MQTT wildcards to further customize how your client application consumes the event streams.
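
Because the topic itself carries information, your message handler can also read details straight from it. A rough sketch, assuming the level layout that the sample-run filters suggest (case type at the sixth level, country at the eighth, province at the ninth); check the topic hierarchy in the repo before relying on these positions. describeTopic is a hypothetical helper name.

```javascript
// Assumed layout, inferred from the filters used in the sample run:
// jhu/csse/covid19/test/cases/<type>/update/<country>/<province>
function describeTopic(topic) {
    const levels = topic.split('/')
    return {
        caseType: levels[5],
        country: levels[7],
        province: levels[8] || null, // null when the topic has no province level
    }
}

const info = describeTopic('jhu/csse/covid19/test/cases/recovered/update/Jordan')
console.log(info)
```

You could call describeTopic(topic) inside the message handler to route updates by country or case type without parsing the payload first.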

I would be happy to see your ideas, so feel free to share them and create a pull request to the SolaceLabs GitHub repo! If you have any questions, leave them in the comments section below. And feel free to check out my attempt to build a Python application with the same approach in this blog post!
