DEV Community

Michael Rosario
Michael Rosario

Posted on

Power Long-Running Python Jobs using RabbitMQ

In any distributed service solution, you have many options for enabling services to communicate with each other. In the python world, you could use HTTP requests and the Flask framework. Other options include gRPC, XML web services, or raw sockets. In some of my personal Python hacking, I wanted to experiment with longer running Python jobs. This post provides some motivations for communication in distributed systems using message queues. I'm hoping this pattern will help machine learning jobs, IoT data captures, or data transformation processes.

https://blog.iron.io/introduction-to-message-queue-architecture/

Install Python tools

In your working folder, let's start by installing tools.
In this tutorial, we're using natural language toolkit(nltk) to execute part of speech tagging of sentences. We're using the pandas data library to manage simple tables of data. For this recipe, we will use RabbitMQ and the related pika library to provide inter-process communication.

pip install nltk
python -m nltk.downloader popular
pip install pandas
pip install pika
Enter fullscreen mode Exit fullscreen mode

Install RabbitMQ using docker-compose

Using the following docker-compose file, we can start a test instance of RabbitMQ
Make sure that you have installed Docker compose tools properly.

version: "3.2"
services:
  rabbitmq:
    image: rabbitmq:3-management-alpine
    container_name: 'rabbitmq'
    ports:
        - 5672:5672
        - 15672:15672
    volumes:
        - ~/.docker-conf/rabbitmq/data/:/someDataFolderOnHostComputer/
        - ~/.docker-conf/rabbitmq/log/:/someDataFolderOnHostComputer
    networks:
        - rabbitmq_go_net

networks:
  rabbitmq_go_net:
    driver: bridge

Enter fullscreen mode Exit fullscreen mode

Let's make test data.

For this exercise, let's make a test data file called "data.json."

[
    {"sentence": "Cheese burgers and ham are yummy!", "foo": 42},
    {"sentence": "We don't talk about bruno", "foo": 42},
    {"sentence": "I see you!", "foo": 42}
]
Enter fullscreen mode Exit fullscreen mode

How to send data for processing

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
queue = "input_queue"

file = open('data.json','r')
json_text = file.read()

channel.queue_declare(queue=queue)
channel.basic_publish(exchange='', routing_key=queue, body=json_text)
print(" [x] Sent json")
connection.close()
Enter fullscreen mode Exit fullscreen mode

Using the pika library, we start a connection to the RabbitMQ message on localhost. After reading the input data (data.json) into a variable, we send a message to RabbitMQ with the data. In the send operation, we needed to specify the queue name, an empty exchange value, and the message body.

Receiving data and processing it

#!/usr/bin/env python
import pandas as pd
import json
import nltk
import pika, sys, os

input_queue = "input_queue"
output_queue = "output_queue"

def processSentencesJson(json_text):
    df = pd.read_json(json_text)
    results = []
    for index, row in df.iterrows():
        sentence = row[0]     
        tokens = nltk.word_tokenize(sentence)
        tagged = nltk.pos_tag(tokens)
        results.append(tagged)

    return results

def main():
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()

    channel.queue_declare(queue=input_queue)

    def callback(ch, method, properties, body):
        print(" [x] Received json")
        results = processSentencesJson(body)
        channel.basic_publish(exchange='', routing_key=output_queue, body=json.dumps(results))

    channel.basic_consume(queue=input_queue, on_message_callback=callback, auto_ack=True)

    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()

if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        print('Interrupted')
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)

Enter fullscreen mode Exit fullscreen mode

In the receiving python process, we start things off by naming the input and output message queue names.

input_queue = "input_queue"
output_queue = "output_queue"
Enter fullscreen mode Exit fullscreen mode

In the main function, we establish a connection to the RabbitMQ. We also define a call back function to execute when we receive a message on the input queue. In the call back function, we execute the "processSentencesJson" function and return results to the output message queue.

I do admire the brief and concise nature of this solution.

def main():
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()

    channel.queue_declare(queue=input_queue)

    def callback(ch, method, properties, body):
        print(" [x] Received json")
        results = processSentencesJson(body)
        channel.basic_publish(exchange='', routing_key=output_queue, body=json.dumps(results))

    channel.basic_consume(queue=input_queue, on_message_callback=callback, auto_ack=True)

    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()
Enter fullscreen mode Exit fullscreen mode

In the following function, we accept a JSON string of sentences. Using the pandas library, we convert the JSON string into a data frame structure. We loop over the data frame rows and execute part of speech tagging using NLTK. For each iteration of the loop, we contribute tagged sentences into the results array.


def processSentencesJson(json_text):
    df = pd.read_json(json_text)
    results = []
    for index, row in df.iterrows():
        sentence = row[0]     
        tokens = nltk.word_tokenize(sentence)
        tagged = nltk.pos_tag(tokens)
        print(tagged)
        results.append(tagged)

    return results

Enter fullscreen mode Exit fullscreen mode

The system will execute part of speech tagging for each sentence using the following format.

[('We', 'PRP'), ('do', 'VBP'), ("n't", 'RB'), ('talk', 'VB'), ('about', 'IN'), ('bruno', 'NN')]
Enter fullscreen mode Exit fullscreen mode

Photo credit to Rgourley on Flickr

Top comments (0)