RabbitMQ is a well-known message broker that implements the AMQP protocol. The documentation covers a wide range of use cases that should suffice for most real-world scenarios. However, the problem I encountered didn't have a straightforward solution. After researching and reading around the protocol, I devised a solution that leverages multiple powerful RabbitMQ features and combines them to address the problem.
What was the problem?
Consider the following scenario: I want to retry a message in two waysβfirst, retry after 1 minute, and second, do this a maximum of 5 times. If the service isn't ready to consume the message within approximately 5 minutes (5 retries, each after a minute), then just drop the message altogether.
Achieving the delay is easy with the queue by using TTL (time to live) (explained later).
The maximum retry was a bit tricky. It turned out I had to update the message header to keep track of the retry count (explained later).
The actual challenge was the combined approach but How? π€
Let's delve into the solution.
Role of each queue
- job_request_queue:
This queue serves as the entry point for messages from external sources and also functions as a dead-letter queue for the delayed_queue.
- job_request_dlq:
This queue acts as the dead-letter queue of the job_request_queue. The listener of this queue is responsible for keeping track of the retry count.
- delayed_queue:
The primary purpose of this queue is to delay message retransmission. Once the delay constraint is satisfied, it will forward the message to the job_request_queue again with the retry counter that was calculated by the job_request_dlq.
Enough of the theory π, Let's deep dive into the code π
I will be using springboot for demonstration purposes and the entire implementation can be found here on GitHub.
π Configuration:
The following snapshots can be found under package com.kbhimani.rabbitmq.config
Simple Topic exchange declaration
@Bean
@Qualifier("topicExchange")
TopicExchange topicExchange() {
return ExchangeBuilder.topicExchange(rabbitConfig.getExchangeName()).durable(true).build();
}
job_request_queue, notice the dead letter configuration, follows the explanation above.
@Bean
@Qualifier("jobRequestQueue")
@DependsOn({"topicExchange", "jobRequestDLQQueue"})
public Queue jobRequestQueue() {
return QueueBuilder
.durable(rabbitConfig.getJobRequestQueueName())
.deadLetterExchange(rabbitConfig.getExchangeName())
.deadLetterRoutingKey(rabbitConfig.getJobRequestDLQRoutingKey())
.build();
}
@Bean
public Binding jobRequestQueueBinding() {
return new Binding(rabbitConfig.getJobRequestQueueName(), Binding.DestinationType.QUEUE,
rabbitConfig.getExchangeName(), rabbitConfig.getJobRequestRoutingKey(), null);
}
job_request_dlq, bound to the dead letter routing key
@Bean
@Qualifier("jobRequestDLQQueue")
@DependsOn({"topicExchange"})
public Queue jobRequestDLQQueue() {
return QueueBuilder
.durable(rabbitConfig.getJobRequestDLQName())
.build();
}
@Bean
public Binding jobRequestDLQQueueBinding() {
return new Binding(rabbitConfig.getJobRequestDLQName(), Binding.DestinationType.QUEUE,
rabbitConfig.getExchangeName(), rabbitConfig.getJobRequestDLQRoutingKey(), null);
}
Following is a crucial configuration. Note how the delayed_queue is set up with a TTL and has job_request_queue designated as its dead-letter queue. The interplay of these configurations enables the retransmission of a message to the job_request_queue once the TTL expires for an individual message in the delayed_queue
@Bean
@Qualifier("jobDelayedQueue")
@DependsOn({"topicExchange"})
public Queue jobDelayedQueue() {
return QueueBuilder.durable(rabbitConfig.getJobDelayedQueueName())
.deadLetterExchange(rabbitConfig.getExchangeName())
.deadLetterRoutingKey(rabbitConfig.getJobRequestRoutingKey())
.ttl(Integer.parseInt(rabbitConfig.getJobDelayedQueueTTL()))
.build();
}
@Bean
public Binding delayedQueueBinding() {
return new Binding(rabbitConfig.getJobDelayedQueueName(), Binding.DestinationType.QUEUE,
rabbitConfig.getExchangeName(), rabbitConfig.getJobDelayedRoutingKey(), null);
}
Application properties(envrionment valriables) as is:
spring.rabbitmq.host=localhost
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.port=5672
application.rabbitConfig.exchangeName=topic.exchange
application.rabbitConfig.jobRequestQueueName=job_request_queue
application.rabbitConfig.jobRequestRoutingKey=job.run
application.rabbitConfig.jobRequestDLQName=job_request_dlq
application.rabbitConfig.jobRequestDLQRoutingKey=job.error
application.rabbitConfig.maxJobRetryCount=5
application.rabbitConfig.jobDelayedQueueName=delayed_queue
application.rabbitConfig.jobDelayedRoutingKey=job.delayed
application.rabbitConfig.jobDelayedQueueTTL=60000 // 60 seconds
π Listeners:
All the snapshots in this section can be found under the package com.kbhimani.rabbitmq.api.AmqpMessageHandler;
The following is the listener for the job_request_queue, responsible for executing the job. In the event that the job needs to be retried due to failure or any other reason, it throws an AmqpRejectAndDontRequeueException. This action activates the dead-lettering feature of the job_request_queue, consequently facilitating our workflow.
@RabbitListener(queues = {"${application.rabbitConfig.jobRequestQueueName}"}, messageConverter = "messageConverter", errorHandler = "errorHandler")
public void handleNewJobRequest(@Payload final Job jobRequest, @Headers final Map<String, Object> headers) {
// handle job request from the queue
if (jobRequest.getId() % 2 == 0) {
throw new AmqpRejectAndDontRequeueException("Operation failed for job with id: %d".formatted(jobRequest.getId()));
}
}
This is another crucial listener that monitors messages in the job_request_dlq and maintains the retry count through the x-retries-count header for each message. If the count is within the permitted range, it forwards the message to the delayed_queue; otherwise, it drops the message.
@RabbitListener(queues = {"${application.rabbitConfig.jobRequestDLQName}"}, messageConverter = "messageConverter", errorHandler = "errorHandler")
public void handleDLQRequest(@Payload Message failedMessage) {
// update the retry counter and drop if exceeded the threshold
var currRetryCount = Optional.ofNullable((Integer) failedMessage.getMessageProperties().getHeaders().get("x-retries-count")).orElse(1);
var maxRetryCount = Integer.parseInt(rabbitConfig.getMaxJobRetryCount());
if (currRetryCount > maxRetryCount) {
return;
}
failedMessage.getMessageProperties().getHeaders().put("x-retries-count", ++currRetryCount);
rabbitMqTemplate.convertAndSend(rabbitConfig.getExchangeName(), rabbitConfig.getJobDelayedRoutingKey(), failedMessage);
}
π Error Handler:
Can be found under the package com.kbhimani.rabbitmq.api.AmqpErrorHandler;
@Component(value = "errorHandler")
public class AmqpErrorHandler implements RabbitListenerErrorHandler {
@Override
public Object handleError(Message amqpMessage, org.springframework.messaging.Message<?> message,
ListenerExecutionFailedException exception) throws Exception {
if (exception.getCause() instanceof AmqpRejectAndDontRequeueException) {
throw (AmqpRejectAndDontRequeueException) exception.getCause();
}
return null;
}
}
Max-retry and Delay duration, both the properties are configurable through environment variable.
Changing delay duration or TTL of a queue is only possible by deleting and recreating the queue with new TTL.
And that's a wrap π
Top comments (0)