Kishan Bhimani

RabbitMQ Retry Strategy: Delay with Retry Threshold

RabbitMQ is a well-known message broker that implements AMQP. Its documentation covers a wide range of use cases that should suffice for most real-world scenarios. However, the problem I encountered didn't have a straightforward solution. After researching and reading around the protocol, I devised a solution that combines several RabbitMQ features to address the problem.

What was the problem?

Consider the following scenario. I want a failed message to be retried under two constraints: each retry happens after a 1-minute delay, and there are at most 5 retries. If the service still can't consume the message after approximately 5 minutes (5 retries, one minute apart), the message is dropped altogether.

Achieving the delay is easy with a queue-level message TTL (time to live), explained later.
The maximum retry count was a bit trickier. It turned out I had to update a message header to keep track of the retry count (also explained later).

The actual challenge was combining the two, but how? 🤔

Let's delve into the solution.

RabbitMQ Retry strategy: delay and max retry threshold

Role of each queue

  • job_request_queue:

This queue serves as the entry point for messages from external sources and also functions as a dead-letter queue for the delayed_queue.

  • job_request_dlq:

This queue acts as the dead-letter queue of the job_request_queue. The listener of this queue is responsible for keeping track of the retry count.

  • delayed_queue:

The primary purpose of this queue is to delay message retransmission. Once the delay constraint is satisfied, it will forward the message to the job_request_queue again with the retry counter that was calculated by the job_request_dlq.
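To make the loop between the three queues concrete, here is a minimal in-memory sketch in plain Java. It does not talk to a broker and skips the actual TTL wait. The class name RetryFlowSimulation and its run method are hypothetical. The counter logic mirrors the x-retries-count handling used by the job_request_dlq listener later in this post.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// In-memory model of the three-queue loop; no broker involved, TTL wait skipped.
class RetryFlowSimulation {

    static final String RETRY_HEADER = "x-retries-count";

    // Sends one always-failing message through the loop and returns a log of events.
    // A message is modelled only by its headers (hypothetical simplification).
    static List<String> run(int maxRetries) {
        Deque<Map<String, Object>> jobRequestQueue = new ArrayDeque<>();
        Deque<Map<String, Object>> jobRequestDlq = new ArrayDeque<>();
        Deque<Map<String, Object>> delayedQueue = new ArrayDeque<>();
        List<String> log = new ArrayList<>();

        jobRequestQueue.add(new HashMap<>());

        while (!jobRequestQueue.isEmpty()) {
            // 1. The job listener rejects the message, so it is dead-lettered to job_request_dlq.
            Map<String, Object> headers = jobRequestQueue.poll();
            log.add("attempt");
            jobRequestDlq.add(headers);

            // 2. The DLQ listener either drops the message or bumps the counter.
            Map<String, Object> failed = jobRequestDlq.poll();
            int count = (Integer) failed.getOrDefault(RETRY_HEADER, 1);
            if (count > maxRetries) {
                log.add("dropped");
                continue;
            }
            failed.put(RETRY_HEADER, count + 1);
            delayedQueue.add(failed);

            // 3. The TTL expires on delayed_queue, so the message is dead-lettered
            //    back to job_request_queue.
            jobRequestQueue.add(delayedQueue.poll());
        }
        return log;
    }

    public static void main(String[] args) {
        List<String> log = run(5);
        long attempts = log.stream().filter("attempt"::equals).count();
        System.out.println("deliveries: " + attempts + ", last event: " + log.get(log.size() - 1));
    }
}
```

With a threshold of 5, the sketch records six deliveries in total (the original attempt plus five retries) before the message is dropped.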


Enough theory 😓. Let's dive into the code 😋

I will be using Spring Boot for demonstration purposes; the entire implementation is available on GitHub.

👉 Configuration:

The following snippets can be found under the package com.kbhimani.rabbitmq.config

Simple Topic exchange declaration

@Bean
@Qualifier("topicExchange")
TopicExchange topicExchange() {
    return ExchangeBuilder.topicExchange(rabbitConfig.getExchangeName()).durable(true).build();
}

The job_request_queue. Notice the dead-letter configuration, which follows the explanation above.

@Bean
@Qualifier("jobRequestQueue")
@DependsOn({"topicExchange", "jobRequestDLQQueue"})
public Queue jobRequestQueue() {
    return QueueBuilder
            .durable(rabbitConfig.getJobRequestQueueName())
            .deadLetterExchange(rabbitConfig.getExchangeName())
            .deadLetterRoutingKey(rabbitConfig.getJobRequestDLQRoutingKey())
            .build();
}

@Bean
public Binding jobRequestQueueBinding() {
    return new Binding(rabbitConfig.getJobRequestQueueName(), Binding.DestinationType.QUEUE,
            rabbitConfig.getExchangeName(), rabbitConfig.getJobRequestRoutingKey(), null);
}

job_request_dlq, bound to the dead letter routing key

@Bean
@Qualifier("jobRequestDLQQueue")
@DependsOn({"topicExchange"})
public Queue jobRequestDLQQueue() {
    return QueueBuilder
            .durable(rabbitConfig.getJobRequestDLQName())
            .build();
}

@Bean
public Binding jobRequestDLQQueueBinding() {
    return new Binding(rabbitConfig.getJobRequestDLQName(), Binding.DestinationType.QUEUE,
            rabbitConfig.getExchangeName(), rabbitConfig.getJobRequestDLQRoutingKey(), null);
}

The following configuration is crucial. Note how the delayed_queue is set up with a TTL and has job_request_queue designated as its dead-letter target. Together, these settings cause a message to be routed back to the job_request_queue once its TTL expires in the delayed_queue.

@Bean
@Qualifier("jobDelayedQueue")
@DependsOn({"topicExchange"})
public Queue jobDelayedQueue() {
    return QueueBuilder.durable(rabbitConfig.getJobDelayedQueueName())
            .deadLetterExchange(rabbitConfig.getExchangeName())
            .deadLetterRoutingKey(rabbitConfig.getJobRequestRoutingKey())
            .ttl(Integer.parseInt(rabbitConfig.getJobDelayedQueueTTL()))
            .build();
}

@Bean
public Binding delayedQueueBinding() {
    return new Binding(rabbitConfig.getJobDelayedQueueName(), Binding.DestinationType.QUEUE,
            rabbitConfig.getExchangeName(), rabbitConfig.getJobDelayedRoutingKey(), null);
}
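For readers not using Spring, the builder calls above boil down to three optional queue arguments that RabbitMQ recognizes: x-dead-letter-exchange, x-dead-letter-routing-key, and x-message-ttl. The following plain-Java sketch only assembles that argument map; the class name DelayedQueueArguments and the build method are hypothetical, and the values are taken from the application properties used in this post.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class DelayedQueueArguments {

    // Builds the optional arguments that make a queue hold messages for ttlMillis
    // and then dead-letter them to the given exchange with the given routing key.
    static Map<String, Object> build(String exchange, String routingKey, int ttlMillis) {
        Map<String, Object> arguments = new LinkedHashMap<>();
        arguments.put("x-dead-letter-exchange", exchange);
        arguments.put("x-dead-letter-routing-key", routingKey);
        arguments.put("x-message-ttl", ttlMillis);
        return arguments;
    }

    public static void main(String[] args) {
        // Expired messages are routed with job.run, i.e. back to job_request_queue.
        System.out.println(build("topic.exchange", "job.run", 60_000));
    }
}
```

A map like this is what you would pass as the arguments of a queue declaration in a client library. No consumer is attached to delayed_queue, so expiry is the only way a message leaves it.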

Application properties (environment variables) as is:

spring.rabbitmq.host=localhost
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.port=5672

application.rabbitConfig.exchangeName=topic.exchange

application.rabbitConfig.jobRequestQueueName=job_request_queue
application.rabbitConfig.jobRequestRoutingKey=job.run

application.rabbitConfig.jobRequestDLQName=job_request_dlq
application.rabbitConfig.jobRequestDLQRoutingKey=job.error
application.rabbitConfig.maxJobRetryCount=5  

application.rabbitConfig.jobDelayedQueueName=delayed_queue
application.rabbitConfig.jobDelayedRoutingKey=job.delayed
# 60 seconds, in milliseconds
application.rabbitConfig.jobDelayedQueueTTL=60000
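
A note on comments in .properties files: only lines that start with # or ! are comments, so anything after the = sign, including a trailing // remark, becomes part of the value. The sketch below checks this with java.util.Properties. Spring Boot uses its own loader, so treat this as an illustration of the file format rather than of Spring's exact behavior. The class and method names are hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

class PropertiesCommentCheck {

    // Loads a small snippet and returns the raw value stored under the key "ttl".
    static String ttlValue(String snippet) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(snippet));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props.getProperty("ttl");
    }

    // True when the value can be handed to Integer.parseInt, as the queue config does.
    static boolean parsesAsInt(String value) {
        try {
            Integer.parseInt(value);
            return true;
        } catch (NumberFormatException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String inline = ttlValue("ttl=60000 // 60 seconds\n");
        String ownLine = ttlValue("# 60 seconds\nttl=60000\n");
        System.out.println("[" + inline + "] parses: " + parsesAsInt(inline));
        System.out.println("[" + ownLine + "] parses: " + parsesAsInt(ownLine));
    }
}
```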

👉 Listeners:

All the snippets in this section can be found in the class com.kbhimani.rabbitmq.api.AmqpMessageHandler

The following is the listener for the job_request_queue, responsible for executing the job. If the job needs to be retried, because of a failure or for any other reason, the listener throws an AmqpRejectAndDontRequeueException. The message is then rejected without requeueing, which triggers dead-lettering on the job_request_queue and starts the retry workflow. In this demo, jobs with an even id are made to fail.

@RabbitListener(queues = {"${application.rabbitConfig.jobRequestQueueName}"}, messageConverter = "messageConverter", errorHandler = "errorHandler")
public void handleNewJobRequest(@Payload final Job jobRequest, @Headers final Map<String, Object> headers) {
    // handle job request from the queue
    if (jobRequest.getId() % 2 == 0) {
        throw new AmqpRejectAndDontRequeueException("Operation failed for job with id: %d".formatted(jobRequest.getId()));
    }
}

This is another crucial listener that monitors messages in the job_request_dlq and maintains the retry count through the x-retries-count header for each message. If the count is within the permitted range, it forwards the message to the delayed_queue; otherwise, it drops the message.

@RabbitListener(queues = {"${application.rabbitConfig.jobRequestDLQName}"}, messageConverter = "messageConverter", errorHandler = "errorHandler")
public void handleDLQRequest(@Payload Message failedMessage) {
    // update the retry counter and drop if exceeded the threshold
    var currRetryCount = Optional.ofNullable((Integer) failedMessage.getMessageProperties().getHeaders().get("x-retries-count")).orElse(1);
    var maxRetryCount = Integer.parseInt(rabbitConfig.getMaxJobRetryCount());

    if (currRetryCount > maxRetryCount) {
        return;
    }

    failedMessage.getMessageProperties().getHeaders().put("x-retries-count", ++currRetryCount);
    rabbitMqTemplate.convertAndSend(rabbitConfig.getExchangeName(), rabbitConfig.getJobDelayedRoutingKey(), failedMessage);
}
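
The counter decision in this listener can also be isolated into a small pure function, which makes it easy to unit test without a broker. The sketch below is one possible variant, not the code from the repository: the class RetryCounter and method nextCount are hypothetical, and it reads the header as a Number rather than casting to Integer, on the assumption that some publisher might write the header as a long.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalInt;

class RetryCounter {

    static final String RETRY_HEADER = "x-retries-count";

    // Returns the value to store in the header before re-publishing to the delayed
    // queue, or an empty result when the threshold is exceeded and the message
    // should be dropped. A missing header counts as 1, as in the listener above.
    static OptionalInt nextCount(Map<String, Object> headers, int maxRetries) {
        Object raw = headers.get(RETRY_HEADER);
        int current = (raw instanceof Number) ? ((Number) raw).intValue() : 1;
        if (current > maxRetries) {
            return OptionalInt.empty();
        }
        return OptionalInt.of(current + 1);
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        System.out.println("first failure -> " + nextCount(headers, 5));

        headers.put(RETRY_HEADER, 6L); // header stored as a long
        System.out.println("after five retries -> " + nextCount(headers, 5));
    }
}
```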

👉 Error Handler:

Can be found in the class com.kbhimani.rabbitmq.api.AmqpErrorHandler. It rethrows an AmqpRejectAndDontRequeueException so the rejection reaches the container, and swallows any other listener failure.

@Component(value = "errorHandler")
public class AmqpErrorHandler implements RabbitListenerErrorHandler {

    @Override
    public Object handleError(Message amqpMessage, org.springframework.messaging.Message<?> message,
                              ListenerExecutionFailedException exception) throws Exception {
        if (exception.getCause() instanceof AmqpRejectAndDontRequeueException) {
            throw (AmqpRejectAndDontRequeueException) exception.getCause();
        }
        return null;
    }

}

Both the maximum retry count and the delay duration are configurable through environment variables.

Because the TTL is declared as a queue argument, changing the delay duration requires deleting the queue and recreating it with the new TTL; RabbitMQ rejects a redeclaration of an existing queue with different arguments.

And that's a wrap 🎉