loading...
Cover image for How I used COVID Tracking data to build an event-driven application and learn more Python
Solace Developers

How I used COVID Tracking data to build an event-driven application and learn more Python

tweettamimi profile image Tamimi Ahmad Updated on 6 min read

TL;DR I built a python event driven application leveraging a publicly available real-time stream of COVID-19 data updates using Solace PubSub+ Event Broker as the messaging infrastructure. Check out the source code in this github repo under samples/python

GitHub logo SolaceLabs / covid19-stream-processors

Stream Information & Example Applications for Processing JHU and CovidTracking.com COVID-19 data available as streams over Solace

Phew! Now that the TL;DR elephant in the room is out of the way, lets get to the core of it

Right now with the situation the world is currently in, some of us are probably thinking hmm what can I do to contribute back to society? If I can't directly contribute something, what can I learn during quarantine to polish my skills and at the same time build something useful? I dont know about you, but I definitely enhanced my baking skills with the amount of banana bread loaves I baked (and ate). Apart from baking, I took this situation as an opportunity to learn new technologies and have something relatable to implement it on (i.e. COVID). I chose Python as the programming language of choice to learn and complement all what I know in event driven architectures.

In a previous blog post, I discussed how I built an event-driven NodeJS application on real-time COVID-19 data streams. I also covered some background on what event-driven is, why is it important and how to use the publish/subscribe messaging pattern to approach building and EDA application, so I recommend reading it if you havent already . If JavaScript/NodeJs is not your cup of tea, then worry not! In this blogpost, I will be sharing how I used Python to build a simple event driven application using the same COVID-19 streams of data with the same three basic requirements.

Tech Stack

Before building any application and work on a project, I start by drafting my architecture design, the different tools to be used and the programming language I need to successfully get an MVP up and running. With this approach in mind, I will be using the following tech stack:

  1. Programming Language: Python
  2. Messaging Protocol: MQTT
  3. Message Broker: Solace PubSub+
  4. Data source: COVID-19 data provided by the publisher application as per this documentation.

Note that in my tech stack, I didnt list any REST API or client/server architecture Done. Next!

1. Environment Setup & Prerequisites

To setup my Python environment, I will be using virtualenv. In a nutshell, virtualenv is a tool used to isolate Python environments by creating a directory structure encapsulating all the installed packages you need in the application youre building. This way, you wont be contaminating your global Python packages with anything required by your application. You can read more about it here and on why it is a recommended approach for Python development.

Note: If you come from a NodeJS background, think in terms of node_modules.

In a new terminal, execute the following command to create a new directory for your project, setup the Python virtual environment and activate it:

mkdir covid_python && cd "$_" && virtualenv env && source env/bin/activate

Now install the paho-mqtt python package:

pip install paho_mqtt

2. Skeleton code setup

Create a new covid.py file and initialize it with the following:

import paho.mqtt.client as mqtt

# Solace Broker Info
url = "mr2r9za6fwi0wf.messaging.solace.cloud"
username = "covid-public-client"
password = "covid19"

def on_connect(client, data, flags, rc):
    assert (rc == 0), "Error Connecting. Return code: " + str(rc)
    print("Connected to the COVID PubSub+ Solace Broker!")


client = mqtt.Client()
client.username_pw_set(username=username, password=password)

client.on_connect = on_connect

client.connect(url)
client.loop_forever()

What you did above is import the paho-mqtt package and initialized an mqtt client using the configuration parameters.

Its important to note the following:

  • The client.loop_forever() is a blocking function call that processes network traffic, dispatched callbacks and handles reconnecting. You can read more about other loop*() functions in the main documentation page.

  • .on_connect is a function called when the broker responds to our connection request. We assign this callback to the on_connect(client, userdata, flags, rc) function that will get executed upon a successful connection.

You now have a runnable skeleton to test out our connection. From your terminal execute the following

python covid.py

You should see Connected to the COVID PubSub+ Solace Broker! on your terminal. Saweeet! Connection established . To stop the execution of the program, press CMD+c (or Ctrl+c if you swim in the Windows domain).

3. Handle events

To handle the incoming messages from the PubSub+ Broker in an event-driven manner, we have to first subscribe to the topic of interest (according to the available topics) and handle the messages with the second type of callback function: .on_message.

import paho.mqtt.client as mqtt

def on_connect(client, data, flags, rc):
    assert (rc == 0), "Error Connecting. Return code: " + str(rc)
    print("Connected to the COVID PubSub+ Solace Broker!")

    for t in topics:
        print("Subscribing to: " + t)
        client.subscribe(t)

def on_message(client, data, msg):
    print("Received message on: %s\n %s" % (msg.topic, msg.payload.decode('ascii')))

# Broker Info
url = "mr2r9za6fwi0wf.messaging.solace.cloud"
username = "covid-public-client"
password = "covid19"

topics = [
    "jhu/csse/covid19/test/cases/+/update/Canada/Ontario/#",
    "com/covidtracking/states/current/update/New York",
]

client = mqtt.Client()
client.username_pw_set(username=username, password=password)

client.on_connect = on_connect
client.on_message = on_message

client.connect(url)
client.loop_forever()

In the above code program:

  • I added an on_message(client, data, msg) function to handle the .on_message callback from the client. Note the decoding of the received message payload to ascii since we receive the payload in binary format.
  • There are subscriptions to two different topics
    1. All cases updates in Ontario, Canada obtained from the John Hopkins University data source. I used a test stream.
    2. All cases updates in New York obtained from The Covid Tracking project data source. Note you can use the topic com/covidtracking/states/current/get/raw to get raw updates from all the states in the United States

Run the code and wait for updates to come in!

The Matrix binary flow

A note on JSON

Since the returned format of the message payload is in JSON, you can use the python json package to handle, manipulate and BEAUTIFY your output. Here is the full code for your application

import paho.mqtt.client as mqtt
import json

def on_connect(client, data, flags, rc):
    assert (rc == 0), "Error Connecting. Return code: " + str(rc)
    print("Connected to the COVID PubSub+ Solace Broker!")

    for t in topics:
        print("Subscribing to: " + t)
        client.subscribe(t)

def on_message(client, data, msg):
    msg_json_object = json.loads(msg.payload.decode('ascii'))
    msg_json_pretty = json.dumps(msg_json_object, indent=2)
    print("Received message on: %s\n %s" % (msg.topic, msg_json_pretty))

# Broker Info
url = "mr2r9za6fwi0wf.messaging.solace.cloud"
username = "covid-public-client"
password = "covid19"

topics = [
    "jhu/csse/covid19/test/cases/+/update/Canada/Ontario/#",
    "com/covidtracking/states/current/update/New York",
]

client = mqtt.Client()
client.username_pw_set(username=username, password=password)

client.on_connect = on_connect
client.on_message = on_message

client.connect(url)
client.loop_forever()

Whats Next

I would like to explore more front-end frameworks that I can use in conjunction with Python. So ideally, I will have my Python application communicate with the Solace PubSub+ Event Broker as the backend piece of the application infrastructure and a cool front-end framework to present it. Hey whats up Django?

Closing Remarks

So here is the real question, what did YOU learn during quarantine? Share in the comments below and lets start a discussion!

Oh and if you have any recommendations for front end frameworks I would love to what you have and your suggestions.

And to end on a good note, Id like to leave you with one of my favorite gifs
Hacker nerd programming

Posted on by:

tweettamimi profile

Tamimi Ahmad

@tweettamimi

Just another developer chillin in an event-driven world of applications

Solace Developers

We make PubSub+, the only unified message broker supporting pub/sub, queueing, request/reply and streaming across hybrid cloud and IoT environments.

Discussion

pic
Editor guide
 

Nice! Thanks for sharing. I need to update my python game as well.

 

Thanks! There's alot I want to learn in python as well