DEV Community

Romina Mendez


RabbitMQ-Pika

RabbitMQ manages message queues between senders and receivers. This post uses Python's Pika library to implement a simple producer and consumer.


Introduction: What is RabbitMQ?

RabbitMQ is a message broker: it sits between producers and consumers and transfers messages from one to the other through queues.

It is a common component in distributed systems architecture and rests on three key concepts:

1️⃣. Producer: The entity responsible for originating and dispatching messages.
2️⃣. Queue: A reservoir where messages are temporarily stored.
3️⃣. Consumer: The receiving instance that processes messages according to the system's needs.
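
The three roles above can be sketched without a broker at all. The snippet below is only an in-process analogy built on Python's standard `queue.Queue`; it does not use RabbitMQ or Pika, and the name `run_demo` is made up for illustration.

```python
import queue
import threading


def run_demo(orders):
    """In-process analogy of producer -> queue -> consumer (no RabbitMQ involved)."""
    q = queue.Queue()   # the "queue": holds messages until they are consumed
    received = []

    def producer():
        # The "producer": originates messages and puts them on the queue
        for order in orders:
            q.put(order)
        q.put(None)  # sentinel telling the consumer there is nothing more to read

    def consumer():
        # The "consumer": takes messages off the queue and processes them
        while True:
            item = q.get()
            if item is None:
                break
            received.append(item)

    consumer_thread = threading.Thread(target=consumer)
    producer_thread = threading.Thread(target=producer)
    consumer_thread.start()
    producer_thread.start()
    producer_thread.join()
    consumer_thread.join()
    return received


if __name__ == "__main__":
    print(run_demo(["pizza", "burger", "cake"]))
```

With a single producer and a single consumer, the messages come out in the order they went in. RabbitMQ plays the role of `q` here, but as a separate service that the two programs reach over the network.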

With these three elements in mind, the rest of the post shows how they map to code.


Implementation with Pika in Python 🐍

An implementation with Pika consists of two programs: the producer and the consumer.

Pika is a Python client library for RabbitMQ; the examples in this post use its connection and channel objects.

In our practical example, picture the producer as an application that manages food delivery orders 🛵. It sends several messages 📝, one for each food order placed by a user 📱.

The implementation has two parts:

Producer: A program that generates messages 📝 and sends them to the RabbitMQ queue. Each message carries the information for one food order.
Consumer: A program that receives the messages from the queue and processes them according to the system's needs, for example by managing the delivery of each order.
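
The scripts in this post send plain emoji strings, but an order message could also carry structured data. Below is a minimal sketch of one way to do that, assuming the order is serialized as JSON; the helper names `encode_order` and `decode_order` and the field names are hypothetical, not part of Pika or of the original scripts.

```python
import json
from datetime import datetime, timezone


def encode_order(order_id, items, created_at=None):
    """Serialize an order into bytes suitable for a message body (hypothetical format)."""
    created_at = created_at or datetime.now(timezone.utc)
    payload = {
        "order_id": order_id,
        "items": items,
        "created_at": created_at.isoformat(),
    }
    # ensure_ascii=False keeps emoji readable in the encoded payload
    return json.dumps(payload, ensure_ascii=False).encode("utf-8")


def decode_order(body):
    """Turn a received message body back into a dictionary."""
    return json.loads(body.decode("utf-8"))


if __name__ == "__main__":
    body = encode_order(1, ["🍕", "🍺"])
    print(decode_order(body)["items"])
```

The producer would pass the result of `encode_order` as the message body, and the consumer would call `decode_order` on the body it receives.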

Keeping the producer and the consumer as separate programs means each one can be started, stopped, and changed independently of the other.


1️⃣. Install pika

!pip install pika


2️⃣. Create send.py 📄 file

import pika

# Open a connection to the RabbitMQ broker running on this machine
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# Create the queue if it does not exist yet
channel.queue_declare(queue='delivery')

pedidos = ['🍕🍕🍕', '🍔🍔🍔', '🍰🍰🍰', '🍺🍺🍺']

# Publish each order to the 'delivery' queue through the default exchange
for i in pedidos:
    channel.basic_publish(exchange='', routing_key='delivery', body=i)
    print(" [x] Order sent! " + i)

connection.close()
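
The publishing loop can also be wrapped in a function that receives the channel as an argument, which makes it possible to try the logic without a running broker. This is only a sketch: `publish_orders` and `RecordingChannel` are hypothetical names, and `RecordingChannel` is a stand-in that imitates just the two channel methods used above, not a Pika class.

```python
def publish_orders(channel, orders, queue="delivery"):
    """Declare the queue and publish every order to it; returns how many were sent."""
    channel.queue_declare(queue=queue)
    for order in orders:
        # Default exchange ('') routes by queue name, as in send.py
        channel.basic_publish(exchange="", routing_key=queue, body=order)
    return len(orders)


class RecordingChannel:
    """Hypothetical stand-in for a Pika channel that only records the calls it gets."""

    def __init__(self):
        self.declared = []
        self.published = []

    def queue_declare(self, queue):
        self.declared.append(queue)

    def basic_publish(self, exchange, routing_key, body):
        self.published.append((exchange, routing_key, body))


if __name__ == "__main__":
    fake = RecordingChannel()
    sent = publish_orders(fake, ["🍕🍕🍕", "🍔🍔🍔"])
    print(sent, fake.published)
```

In the real script, the `channel` object returned by `connection.channel()` would be passed in place of `RecordingChannel()`.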

3️⃣. Create receive.py 📄 file

import os
import sys

import pika


def main(queue='delivery'):
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    # Declare the queue here too, so the consumer works even if it starts first
    channel.queue_declare(queue=queue)

    # Called by Pika once for every message delivered from the queue
    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body.decode())

    # Consume from the queue passed to main(), not a hard-coded name
    channel.basic_consume(queue=queue, on_message_callback=callback, auto_ack=True)

    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()

if __name__ == '__main__':
    try:
        main(queue='delivery')
    except KeyboardInterrupt:
        print('Interrupted')
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)
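
The consumer above uses `auto_ack=True`, so a message counts as delivered as soon as RabbitMQ hands it over. One possible variation, shown here only as a sketch, is to acknowledge each message manually after it has been processed. `make_callback` and `handle` are hypothetical names; `basic_ack` and `method.delivery_tag` are the Pika channel method and delivery attribute used for manual acknowledgements.

```python
from types import SimpleNamespace


def make_callback(handle):
    """Build a consumer callback that acknowledges a message only after handling it."""

    def callback(ch, method, properties, body):
        handle(body.decode())
        # Runs only if handle() did not raise, so a failed message stays unacknowledged
        ch.basic_ack(delivery_tag=method.delivery_tag)

    return callback


if __name__ == "__main__":
    # Tiny stand-ins so the sketch can be run without a broker (not Pika objects)
    acked = []
    fake_channel = SimpleNamespace(basic_ack=lambda delivery_tag: acked.append(delivery_tag))
    fake_method = SimpleNamespace(delivery_tag=7)

    make_callback(print)(fake_channel, fake_method, None, "🍕🍕🍕".encode())
    print(acked)
```

To use it in the consumer, the callback would be registered with `channel.basic_consume(queue=queue, on_message_callback=make_callback(print), auto_ack=False)`.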



4️⃣. MongoDB + Pika

Next, we modify the consumer script so that it connects to a MongoDB Atlas cluster and inserts each received message into a collection.

import os
import sys
from datetime import datetime

import pika
import pymongo

# Create a connection with MongoClient
client = pymongo.MongoClient("mongodb+srv://NombreUser:PasswordUser@clusterName.moczg.mongodb.net/rabbit?retryWrites=true&w=majority")

# Database
db = client["rabbit"]

# Collection
collection = db["mensajes"]

def main(queue='delivery'):
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    channel.queue_declare(queue=queue)

    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body.decode())
        # Store the message with the time it was received and its source queue
        body_insert = {'fecha': datetime.now(), 'queue': queue, 'message': body.decode()}
        collection.insert_one(body_insert)

    # Consume from the queue passed to main(), not a hard-coded name
    channel.basic_consume(queue=queue, on_message_callback=callback, auto_ack=True)

    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()

if __name__ == '__main__':
    try:
        main(queue='delivery')
    except KeyboardInterrupt:
        print('Interrupted')
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)
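
Two small refinements to the script above are sketched here under stated assumptions: building the stored document in its own function, and reading the connection string from an environment variable instead of writing credentials into the file. The function names, the `MONGODB_URI` variable name, and the local default URI are all hypothetical choices made for illustration.

```python
import os
from datetime import datetime


def build_document(queue, body, now=None):
    """Build the dictionary stored for one message, using the same fields as the script."""
    return {
        "fecha": now or datetime.now(),
        "queue": queue,
        "message": body.decode(),
    }


def mongo_uri(default="mongodb://localhost:27017"):
    """Read the connection string from MONGODB_URI (assumed name), else use the default."""
    return os.environ.get("MONGODB_URI", default)


if __name__ == "__main__":
    print(build_document("delivery", "🍕🍕🍕".encode()))
    print(mongo_uri())
```

In the consumer, the callback would call `collection.insert_one(build_document(queue, body))`, and the client would be created with `pymongo.MongoClient(mongo_uri())`.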

To download the code for these two files, you can do so from the following link.


To learn more about RabbitMQ, you can visit the following sites:
