DEV Community

Bahman Shadmehr

Creating the base consumer class

Title: Building a RabbitMQ Consumer Base Class

Table of Contents:

  1. Introduction
  2. Code
  3. Code Explanation
  4. Conclusion

Introduction

In this blog post, we walk through a base class for RabbitMQ consumers. The RabbitMQConsumerBase class handles the connection, the exchange and queue setup, and message acknowledgement, and leaves the message-handling logic to subclasses.

Code

from abc import ABC, abstractmethod
from connection import RabbitMQConnection
from config import Config


class RabbitMQConsumerBase(ABC):
    def __init__(self, queue_name, binding_key, host, port, username, password):
        self.queue_name = queue_name
        self.binding_key = binding_key
        self.host = host
        self.port = port
        self.username = username
        self.password = password

    @abstractmethod
    def process_message(self, channel, method, properties, body):
        # Subclasses implement their message-handling logic here.
        pass

    def on_message_callback(self, channel, method, properties, body):
        # Acknowledge only after process_message returns without raising.
        self.process_message(channel, method, properties, body)
        channel.basic_ack(delivery_tag=method.delivery_tag)

    def consume(self):
        config = Config()
        with RabbitMQConnection(self.host, self.port, self.username, self.password) as connection:
            channel = connection.get_channel()
            channel.exchange_declare(exchange=config.EXCHANGE_NAME, exchange_type="topic")
            channel.queue_declare(queue=self.queue_name)
            channel.queue_bind(queue=self.queue_name, exchange=config.EXCHANGE_NAME, routing_key=self.binding_key)
            channel.basic_consume(
                queue=self.queue_name,
                auto_ack=False,
                on_message_callback=self.on_message_callback,
            )
            print(f"Started consuming messages from queue: {self.queue_name}")

            try:
                channel.start_consuming()
            except KeyboardInterrupt:
                channel.stop_consuming()

            print("Consumer stopped...")

Code Explanation

The code snippet introduces the RabbitMQConsumerBase class, which serves as a base class for implementing RabbitMQ consumers. Here's a detailed explanation of the code:

  1. Class Structure

The RabbitMQConsumerBase class is defined as an abstract base class (ABC) using the abc module. This means it cannot be instantiated directly and must be subclassed to provide specific implementations.

The class constructor (__init__) accepts parameters such as queue_name, binding_key, host, port, username, and password, which define the necessary connection and queue details.

  2. Abstract Method

The class defines an abstract method called process_message. Subclasses must override this method to provide custom processing logic for received messages. The method takes the channel, method, properties, and body as parameters.
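
As an illustration, a subclass might decode each message body as JSON. The sketch below uses a trimmed stand-in for RabbitMQConsumerBase (constructor and abstract method only, no broker access) so that it runs on its own. OrderConsumer, the queue name, the binding key, and the credentials are hypothetical.

```python
import json
from abc import ABC, abstractmethod


class RabbitMQConsumerBase(ABC):
    """Trimmed stand-in for the base class above (no broker access)."""

    def __init__(self, queue_name, binding_key, host, port, username, password):
        self.queue_name = queue_name
        self.binding_key = binding_key
        self.host = host
        self.port = port
        self.username = username
        self.password = password

    @abstractmethod
    def process_message(self, channel, method, properties, body):
        pass


class OrderConsumer(RabbitMQConsumerBase):
    """Hypothetical consumer that expects JSON-encoded message bodies."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.seen = []  # decoded payloads, kept only for demonstration

    def process_message(self, channel, method, properties, body):
        # The body arrives as bytes; json.loads accepts bytes directly.
        self.seen.append(json.loads(body))


# Hypothetical connection and queue details.
args = ("orders", "order.*", "localhost", 5672, "guest", "guest")

# The abstract base class refuses to be instantiated.
try:
    RabbitMQConsumerBase(*args)
except TypeError as exc:
    print(f"Cannot instantiate the base class: {exc}")

# The subclass can be instantiated because it overrides process_message.
consumer = OrderConsumer(*args)
consumer.process_message(None, None, None, b'{"order_id": 1}')
print(consumer.seen)
```

Here process_message is called by hand with placeholder arguments. In a running consumer it would be invoked through on_message_callback.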

  3. Message Processing

The on_message_callback method is the callback registered with basic_consume, and it is invoked once for each delivered message. It receives the channel, method, properties, and body, passes them unchanged to the subclass's process_message method, and then calls channel.basic_ack with the message's delivery tag to inform RabbitMQ that the message has been successfully processed. Because the acknowledgement comes after process_message, a message whose processing raises an exception is not acknowledged.
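
The acknowledgement order can be checked without a broker. The sketch below copies on_message_callback into a small stand-in class and passes unittest.mock.MagicMock objects in place of the channel and the method frame. CallbackOnly, EchoConsumer, FailingConsumer, and the delivery tag value are hypothetical names used only for this demonstration.

```python
from unittest.mock import MagicMock


class CallbackOnly:
    """Stand-in holding just the callback logic from the base class."""

    def process_message(self, channel, method, properties, body):
        raise NotImplementedError

    def on_message_callback(self, channel, method, properties, body):
        self.process_message(channel, method, properties, body)
        channel.basic_ack(delivery_tag=method.delivery_tag)


class EchoConsumer(CallbackOnly):
    def process_message(self, channel, method, properties, body):
        print(f"Received: {body!r}")


class FailingConsumer(CallbackOnly):
    def process_message(self, channel, method, properties, body):
        raise ValueError("cannot handle this message")


# Mock standing in for the delivery metadata object.
method = MagicMock(delivery_tag=7)

# Successful processing: the message is acknowledged exactly once.
ok_channel = MagicMock()
EchoConsumer().on_message_callback(ok_channel, method, None, b"hello")
ok_channel.basic_ack.assert_called_once_with(delivery_tag=7)

# Failed processing: the exception propagates and basic_ack is never reached.
failed_channel = MagicMock()
try:
    FailingConsumer().on_message_callback(failed_channel, method, None, b"hello")
except ValueError:
    pass
failed_channel.basic_ack.assert_not_called()
```

In the second case the exception leaves the callback, so the message stays unacknowledged. Whether to catch such errors and reject or requeue the message is a design decision the base class leaves open.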

  4. Consumption Logic

The consume method encapsulates the logic for consuming messages from the RabbitMQ queue.

  • It creates an instance of the Config class to retrieve configuration settings.
  • Within a with block, a RabbitMQConnection instance is created, using the provided connection details.
  • The get_channel method is called to obtain a channel for communication with RabbitMQ.

  • The necessary exchange, queue, and binding configurations are performed using the channel.
  • The basic_consume method registers on_message_callback as the handler for the specified queue, with auto_ack=False so that messages are acknowledged explicitly after processing.
  • The consumer then enters a blocking consuming loop using channel.start_consuming().
  • The loop can be interrupted by a keyboard interrupt (KeyboardInterrupt), which triggers the channel.stop_consuming() method.
  • Once the loop is exited, a message is printed to indicate that the consumer has stopped.
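
The connection and config modules imported by the base class are not shown in this post. The following is a minimal sketch of what compatible versions could look like, assuming the pika client library. The class bodies, the EXCHANGE_NAME environment variable, and its default value are assumptions made for illustration, not the author's actual modules.

```python
import os


class Config:
    """Hypothetical settings object; consume() only reads EXCHANGE_NAME."""

    # The environment variable name and the default are assumptions.
    EXCHANGE_NAME = os.environ.get("EXCHANGE_NAME", "events")


class RabbitMQConnection:
    """Hypothetical context manager matching the calls made in consume()."""

    def __init__(self, host, port, username, password):
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        self.connection = None

    def __enter__(self):
        # Imported here so that defining the class does not require pika.
        import pika

        credentials = pika.PlainCredentials(self.username, self.password)
        parameters = pika.ConnectionParameters(
            host=self.host, port=self.port, credentials=credentials
        )
        self.connection = pika.BlockingConnection(parameters)
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Close the connection even if the with block raised.
        if self.connection is not None and self.connection.is_open:
            self.connection.close()
        return False  # do not suppress exceptions

    def get_channel(self):
        # Each call opens a new channel on the underlying connection.
        return self.connection.channel()
```

With a wrapper along these lines, the with block in consume opens the connection on entry and closes it on exit, including when the loop ends through a KeyboardInterrupt.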

Conclusion

The RabbitMQConsumerBase class provides a foundation for implementing RabbitMQ consumers. By subclassing this base class and overriding the process_message method, you can define custom logic for processing received messages. The class abstracts away the complexities of establishing a RabbitMQ connection, declaring exchanges and queues, and handling message consumption, allowing you to focus on implementing the specific business logic of your consumer.
