In these unprecedented times, people are pooling their efforts toward COVID-19 relief, each contributing their own expertise. Some have been creating PPE with their 3D printers, some have been developing software, and others have been providing technical support for their loved ones or community. In this post, I would like to share how I used a stream of real-time COVID-19 data updates to build an event-driven NodeJS application, using a simple messaging protocol that can also be used to build public-facing applications.
In this application, I will be using the streams that Solace has made available for anyone to subscribe to. Documentation on how to use the streams can be found in this GitHub repo.
SolaceLabs / covid19-stream-processors
Stream Information & Example Applications for Processing JHU and CovidTracking.com COVID-19 data available as streams over Solace
Research: A bit of background & requirements
Before building this application, I listed out three basic requirements that I wanted to achieve:
Real-time data updates
Lightweight application (I didn’t want to continuously poll or check for new data)
Reactive to any data change
Options
From a higher level perspective, there are two different architectures I could've chosen:
A synchronous REST-driven approach or
An Event-driven architecture (EDA) approach
Option 1: REST
With the first option, there are lots of online resources that I could’ve tapped into, including APIs and CSV datasets such as the ones released by Johns Hopkins University (JHU) in their GitHub repo. While this is a viable option with lots of online samples and resources, I wanted something more real-time since
a) the data is most valuable when it’s first released (see figure 1 below) and
b) I wanted an application that reacts to data updates instead of continuously polling for them. For example, the CSV file in the JHU GitHub repo is updated once or twice a day. If I used this dataset, I would have to continuously poll it and check for updates.
Figure 1: Value of data diminishes with passing of time
Also, since I would be dealing with a large data set, I want to react only to new or updated data as changes come in. A REST approach would mean repeatedly polling and re-checking the data, which is neither real-time nor lightweight, so it fails requirements 1 and 2.
Option 2: EDA
With an event-driven architecture, I can use the publish-subscribe (pub-sub) pattern to build my application. What is pub-sub, you may ask? In a nutshell, it boils down to having a “Publisher” of the data (e.g. the COVID-19 data source) and a “Subscriber” to this data (e.g. my NodeJS application) that reacts only when new data is published.
With the PubSub+ COVID-19 Broker that Solace made available to the public, updates on COVID-19 data are published on different streams. So, application developers wanting to build an event-driven application (IoT, mobile/web app) can consume the stream of data by subscribing to any of the available topics. Since the consumption of this data is framework/platform/language agnostic, I could use any of the supported messaging protocols (MQTT, AMQP, JMS) or open APIs (Python, JS, NodeJS…) that support these protocols. I could also use REST!
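To make the pattern concrete, here is a minimal in-process sketch. It uses NodeJS's built-in EventEmitter as a stand-in for a broker, so it is only an illustration of the idea and not how the Solace broker works. The channel name `covid/updates` and the payload shape are made up for the example.

```javascript
// In-process illustration only: EventEmitter plays the role of the broker.
const { EventEmitter } = require('events')

const broker = new EventEmitter()
const received = []

// Subscriber: registers interest once, then simply reacts to new data.
broker.on('covid/updates', (update) => {
  received.push(update)
  console.log('New update:', update)
})

// Publisher: emits only when there is something new to report.
// The channel name and payload are hypothetical placeholders.
broker.emit('covid/updates', { region: 'Ontario', confirmed: 1 })
```

The subscriber never asks "is there anything new?"; its callback runs only when the publisher emits, which is the behaviour the rest of this post relies on.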
Figure 2: High level end-to-end architecture overview
Decision
So, after evaluating the two options above, I decided to take the EDA approach to build my application. Since I wanted a lightweight messaging API, real-time COVID-19 updates, and an application that reacts to those updates, EDA was the clear winner.
Let’s get to business; well, I mean coding.
Based on the languages and protocols that the Solace PubSub+ broker supports, I decided to go with MQTT since there is a native NodeJS API for it.
Figure 3: Languages and protocols supported by Solace
1. Initial setup
Let’s go ahead and start a NodeJS project. Open a new terminal and execute the following command, which creates a new directory for your project, initializes it, and installs the mqtt package:
mkdir covidproject && cd "$_" && npm init -y && npm i mqtt
2. Connect to the Broker
Create a new file
touch index.js
And open it in your favourite text editor. Insert the following:
var mqtt = require('mqtt')
var host = "tcp://mr2r9za6fwi0wf.messaging.solace.cloud:1883"
var config = {
  username: "covid-public-client",
  password: "covid19",
}
var client = mqtt.connect(host, config)
client.on('connect', () => {
  console.log("Connected to COVID PubSub+ Broker!")
})
What you did above is initialize an mqtt client and connect it to the broker URL using the host and config variables. The client object then emits events (signals) that your application can react to with callbacks; in this example, we listen for the “connect” event with client.on('connect'). We will cover more signals later.
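Besides "connect", MQTT.js clients also emit lifecycle events such as "reconnect", "offline", "close" and "error". As a rough sketch, you could log them with a small helper like the one below. The helper name `logLifecycle` and the `log` parameter are my own additions for illustration, not part of the mqtt package.

```javascript
// Attach logging for a few connection lifecycle events.
// `client` is anything with an on(event, handler) method, such as the
// object returned by mqtt.connect(). `logLifecycle` is a hypothetical helper.
function logLifecycle(client, log = console.log) {
  client.on('reconnect', () => log('Reconnecting to the broker...'))
  client.on('offline', () => log('Client went offline'))
  client.on('close', () => log('Connection closed'))
  client.on('error', (err) => log('Connection error: ' + err.message))
  return client
}
```

In index.js you would call logLifecycle(client) right after mqtt.connect(host, config), which makes a wrong password or an unreachable host much easier to spot.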
Note: the broker URL and credentials are from here https://github.com/SolaceLabs/covid19-stream-processors#1-connection-information
Now test out your connection by executing the following from the terminal:
node index.js
You should see Connected to COVID PubSub+ Broker! in the output. Voila!
3. Subscribe to the topic
Now that you are connected to the broker, all you need to do is subscribe to topics. Define the topics in an array and update the connect handler to subscribe to each one:
var topics = [
  "jhu/csse/covid19/raw",
]
client.on('connect', () => {
  console.log("Connected to COVID PubSub+ Broker!")
  topics.forEach( (topic) => {
    console.log("Subscribing to topic: ", topic)
    client.subscribe(topic)
  })
})
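The snippet above fires off the subscriptions without checking whether they succeeded. MQTT.js lets you pass a callback to subscribe() that receives an error if the request failed, so a slightly more defensive sketch could look like this. The helper name `subscribeAll` and the `log` parameter are assumptions made for this example.

```javascript
// Subscribe to each topic and report whether the request succeeded.
// `client` is assumed to expose subscribe(topic, callback) the way MQTT.js
// does, where the callback's first argument is an error (or null).
// `subscribeAll` is a hypothetical helper, not part of the mqtt package.
function subscribeAll(client, topics, log = console.log) {
  topics.forEach((topic) => {
    client.subscribe(topic, (err) => {
      if (err) {
        log('Failed to subscribe to ' + topic + ': ' + err.message)
        return
      }
      log('Subscribed to ' + topic)
    })
  })
}
```

You would call subscribeAll(client, topics) inside the connect handler in place of the forEach loop.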
4. Listen to incoming messages
The second signal we want to listen to is message, as follows:
client.on('message', (topic, message) => {
  console.log("Received message on Topic: ", topic,"\nMessage:\n", JSON.parse(message.toString()))
})
Note that the message received from the broker is in binary format (a Buffer in NodeJS). To turn it into a human-readable form, we call message.toString() and then parse the result as JSON. The messages are sent in JSON format based on the schemas defined in the repo.
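JSON.parse throws if a payload is not valid JSON, and an uncaught exception inside the message handler would crash the application. A minimal sketch of a safer parsing step follows; the helper name `parsePayload` is my own and is not part of the original application.

```javascript
// Convert an MQTT payload (a Buffer) to an object, or return null
// if the payload is not valid JSON. `parsePayload` is a hypothetical helper.
function parsePayload(message) {
  try {
    return JSON.parse(message.toString())
  } catch (err) {
    console.error('Skipping non-JSON payload:', err.message)
    return null
  }
}
```

Inside the message handler you could then call parsePayload(message) and ignore the message when the result is null.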
Your final application looks like this:
var mqtt = require('mqtt')

// Public broker URL and credentials from the SolaceLabs repo
var host = "tcp://mr2r9za6fwi0wf.messaging.solace.cloud:1883"
var config = {
  username: "covid-public-client",
  password: "covid19",
}

// Topics to subscribe to once connected
var topics = [
  "jhu/csse/covid19/raw",
]

var client = mqtt.connect(host, config)

// Subscribe to every topic as soon as the connection is established
client.on('connect', () => {
  console.log("Connected to COVID PubSub+ Broker!")
  topics.forEach( (topic) => {
    console.log("Subscribing to topic: ", topic)
    client.subscribe(topic)
  })
})

// React to each incoming message: convert the Buffer to a string and parse the JSON
client.on('message', (topic, message) => {
  console.log("Received message on Topic: ", topic,"\nMessage:\n", JSON.parse(message.toString()))
})
And we’re done! Your application now connects to the broker, subscribes to the topics defined in the array, and reacts only when new messages are published.
Sample run
I have modified my application to subscribe to all case updates (deaths, active, confirmed and recovered) in Ontario, Canada, the recovered cases in Jordan, and the confirmed cases in all the provinces of the United Kingdom, using the following topics on the test stream:
var topics = [
  "jhu/csse/covid19/test/cases/+/update/Canada/Ontario/#",
  "jhu/csse/covid19/test/cases/recovered/update/Jordan/#",
  "jhu/csse/covid19/test/cases/confirmed/update/United Kingdom/#"
]
Notice the use of the MQTT wildcards '+' and '#' for single-level and multi-level matches, respectively.
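To see how the wildcards behave, here is a small sketch of the matching rules, where '+' stands for exactly one topic level and '#' stands for everything from that level onward. The broker does this matching for you, so the function below is purely illustrative. It ignores special cases such as topics starting with '$', and the example topics in the usage note are assumptions about what the stream publishes.

```javascript
// Return true if `topic` matches the MQTT subscription `filter`.
// '+' matches exactly one level; '#' matches the remainder of the topic
// (including the parent level itself).
function topicMatches(filter, topic) {
  const f = filter.split('/')
  const t = topic.split('/')
  for (let i = 0; i < f.length; i++) {
    if (f[i] === '#') return true
    if (i >= t.length) return false
    if (f[i] !== '+' && f[i] !== t[i]) return false
  }
  return f.length === t.length
}
```

For example, the first filter above would match a topic such as jhu/csse/covid19/test/cases/deaths/update/Canada/Ontario, because '+' stands in for "deaths", while the Jordan filter would not match a "confirmed" update because its case-type level is fixed to "recovered".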
You can check out https://www.marcd.dev/COVIDStreamViewer/mqtt/mqttListener.html and subscribe to the jhu/csse/covid19/raw topic for a sample stream viewer.
Next steps
When you look at the topic hierarchy, you can subscribe to different topics and use MQTT wildcards to further customize how your client application consumes the event streams.
I would be happy to see your ideas, so feel free to share them and create a pull request to the SolaceLabs GitHub repo! If you have any questions, leave them in the comments section below. And feel free to check out my attempt to build a Python application with the same approach in this blog post!