Title: Building a RabbitMQ Consumer Base Class
Table of Contents:
- Introduction
- Code
- Code Explanation
- Conclusion
Introduction
In this blog post, we will explore a code snippet that implements a base class for a RabbitMQ consumer. The RabbitMQConsumerBase class provides a foundation for building custom RabbitMQ consumers by defining the necessary methods and functionality to consume messages from a queue.
Code
from abc import ABC, abstractmethod

# Project-local helper modules (not shown in this post)
from connection import RabbitMQConnection
from config import Config


class RabbitMQConsumerBase(ABC):
    def __init__(self, queue_name, binding_key, host, port, username, password):
        self.queue_name = queue_name
        self.binding_key = binding_key
        self.host = host
        self.port = port
        self.username = username
        self.password = password

    @abstractmethod
    def process_message(self, channel, method, properties, body):
        pass

    def on_message_callback(self, channel, method, properties, body):
        # Acknowledge only after process_message returns without raising
        self.process_message(channel, method, properties, body)
        channel.basic_ack(delivery_tag=method.delivery_tag)

    def consume(self):
        config = Config()
        with RabbitMQConnection(self.host, self.port, self.username, self.password) as connection:
            channel = connection.get_channel()
            # Declare the topic exchange and the queue, then bind them with the binding key
            channel.exchange_declare(exchange=config.EXCHANGE_NAME, exchange_type="topic")
            channel.queue_declare(queue=self.queue_name)
            channel.queue_bind(queue=self.queue_name, exchange=config.EXCHANGE_NAME, routing_key=self.binding_key)
            # Register the callback; manual acknowledgment is used (auto_ack=False)
            channel.basic_consume(
                queue=self.queue_name,
                auto_ack=False,
                on_message_callback=self.on_message_callback,
            )
            print(f"Started consuming messages from queue: {self.queue_name}")
            try:
                # Blocks until the consumer is stopped
                channel.start_consuming()
            except KeyboardInterrupt:
                channel.stop_consuming()
            print("Consumer stopped...")
Code Explanation
The code snippet introduces the RabbitMQConsumerBase class, which serves as a base class for implementing RabbitMQ consumers. Here's a detailed explanation of the code:
- Class Structure
The RabbitMQConsumerBase class is defined as an abstract base class (ABC) using the abc module. This means it cannot be instantiated directly and must be subclassed to provide specific implementations.
The class constructor (__init__) accepts parameters such as queue_name, binding_key, host, port, username, and password, which define the necessary connection and queue details.
- Abstract Method
The class defines an abstract method called process_message. Subclasses must override this method to provide custom processing logic for received messages. The method takes the channel, method, properties, and body as parameters.
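As a rough illustration of the subclassing contract, the sketch below uses a reduced stand-in for the base class (ConsumerBase) so that it runs without a broker. OrderConsumer, its seen list, and the JSON payload are hypothetical names invented for this example. It also assumes the pika client, which the method names in the post suggest and which delivers the body as bytes.

```python
import json
from abc import ABC, abstractmethod


class ConsumerBase(ABC):
    """Stand-in for RabbitMQConsumerBase, reduced to the abstract hook."""

    @abstractmethod
    def process_message(self, channel, method, properties, body):
        ...


class OrderConsumer(ConsumerBase):
    """Hypothetical subclass: decodes a JSON body and records it."""

    def __init__(self):
        self.seen = []

    def process_message(self, channel, method, properties, body):
        # Assumption: the body arrives as bytes, so decode before parsing.
        order = json.loads(body.decode("utf-8"))
        self.seen.append(order)


# The abstract base itself cannot be instantiated.
try:
    ConsumerBase()
except TypeError as exc:
    print(f"Cannot instantiate base: {exc}")

# A subclass that overrides process_message can be.
consumer = OrderConsumer()
consumer.process_message(None, None, None, b'{"id": 7, "status": "paid"}')
print(consumer.seen)
```

With the real base class, the same subclass would also pass the queue and connection arguments through to the parent constructor.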
- Message Processing
The on_message_callback method is the callback that message deliveries are routed to. It receives the channel, method, properties, and body of each message, passes them unchanged to the subclass's process_message method, and then acknowledges the message with channel.basic_ack to tell RabbitMQ that it has been processed.
Because the acknowledgment comes after process_message returns, a message whose processing raises an exception is never acknowledged. With auto_ack=False, RabbitMQ keeps such a message and redelivers it once the channel or connection closes.
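The process-then-acknowledge order can be tried without a broker. In the sketch below, FakeChannel, ok_handler, and failing_handler are hypothetical test doubles, and on_message_callback is rewritten as a plain function that takes the handler as an argument. Only the order of the two calls is taken from the post.

```python
from types import SimpleNamespace


class FakeChannel:
    """Test double that records acknowledged delivery tags."""

    def __init__(self):
        self.acked = []

    def basic_ack(self, delivery_tag):
        self.acked.append(delivery_tag)


def on_message_callback(process_message, channel, method, properties, body):
    # Same order as in the post: process first, acknowledge second.
    process_message(channel, method, properties, body)
    channel.basic_ack(delivery_tag=method.delivery_tag)


def ok_handler(channel, method, properties, body):
    pass


def failing_handler(channel, method, properties, body):
    raise ValueError("could not process message")


channel = FakeChannel()

on_message_callback(ok_handler, channel, SimpleNamespace(delivery_tag=1), None, b"a")

try:
    on_message_callback(failing_handler, channel, SimpleNamespace(delivery_tag=2), None, b"b")
except ValueError:
    pass  # the exception propagates, so basic_ack was never reached

print(channel.acked)
```

Only the first delivery tag ends up acknowledged. A subclass that wants to reject or skip bad messages instead of leaving them unacknowledged would need to handle exceptions inside process_message.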
- Consumption Logic
The consume method encapsulates the logic for consuming messages from the RabbitMQ queue.
- It creates an instance of the Config class to retrieve configuration settings.
- Within a with block, a RabbitMQConnection instance is created using the provided connection details.
- The get_channel method is called to obtain a channel for communication with RabbitMQ.
- The channel declares the topic exchange and the queue, then binds the queue to the exchange with the binding key.
- The basic_consume method registers on_message_callback as the consumer for the specified queue, with automatic acknowledgment turned off (auto_ack=False).
- The consumer then enters a blocking consuming loop using channel.start_consuming().
- The loop can be interrupted by a keyboard interrupt (KeyboardInterrupt), which triggers the channel.stop_consuming() method.
- Once the loop is exited, a message is printed to indicate that the consumer has stopped.
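The post imports Config and RabbitMQConnection from modules it does not show. The following is one possible sketch of what they might look like, inferred only from how consume uses them. It assumes the pika client. The default exchange name, the connect parameter, and FakeConnection are made up so the sketch can be tried without a broker.

```python
import os


class Config:
    """Hypothetical config: the post only shows that EXCHANGE_NAME exists."""

    EXCHANGE_NAME = os.environ.get("EXCHANGE_NAME", "events")  # default is made up


class RabbitMQConnection:
    """Hypothetical context manager matching how consume() uses it."""

    def __init__(self, host, port, username, password, connect=None):
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        # `connect` is an optional factory added for this sketch only.
        self._connect = connect or self._connect_with_pika
        self.connection = None

    def _connect_with_pika(self):
        import pika  # imported lazily; only needed for a real broker

        credentials = pika.PlainCredentials(self.username, self.password)
        parameters = pika.ConnectionParameters(
            host=self.host, port=self.port, credentials=credentials
        )
        return pika.BlockingConnection(parameters)

    def __enter__(self):
        self.connection = self._connect()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Close the connection even if the with body raised.
        if self.connection is not None and self.connection.is_open:
            self.connection.close()
        return False  # do not swallow exceptions

    def get_channel(self):
        return self.connection.channel()


class FakeConnection:
    """Stand-in for a broker connection, used only to exercise the sketch."""

    is_open = True

    def channel(self):
        return "fake-channel"

    def close(self):
        self.is_open = False


fake = FakeConnection()
with RabbitMQConnection("localhost", 5672, "guest", "guest", connect=lambda: fake) as conn:
    channel = conn.get_channel()

print(channel, fake.is_open)
```

Wrapping the connection in a context manager means it is closed when consume leaves the with block, whether the loop ended normally or through an exception.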
Conclusion
The RabbitMQConsumerBase class provides a foundation for implementing RabbitMQ consumers. By subclassing this base class and overriding the process_message method, you can define custom logic for processing received messages. The class abstracts away the complexities of establishing a RabbitMQ connection, declaring exchanges and queues, and handling message consumption, allowing you to focus on implementing the specific business logic of your consumer.