DEV Community

dirceu junior

Consuming multiple messages from one channel with the official RabbitMQ dotnet client

In this post, you'll learn how to consume multiple messages on a single channel using the official RabbitMQ.Client for dotnet with the proper caution to avoid concurrency problems or freezing the consumer.

We'll create a consumer that receives strings and prints them to the console n times, once per second. The received messages will look like message 5, meaning the word message will be printed five times at one-second intervals.

I'll assume that you have a RabbitMQ instance running locally and some basic knowledge of it.

Creating the producer

First of all, we need to create a console app and set up a producer to send those messages.

Our producer will send the messages to the queue multiple_consumers_single_channel using the default exchange.

Here's the producer code:

using System.Text.Json;
using RabbitMQ.Client;

// creating the connection and getting a channel
var connFac = new ConnectionFactory
     {
         HostName = "localhost"
     };
using var conn = connFac.CreateConnection();
using var channel = conn.CreateModel();

// reusing the message properties
var defaultProps = channel.CreateBasicProperties();

while (true)
{
    var msg = Console.ReadLine();
    if ("exit".Equals(msg))
        break;

    var body = JsonSerializer.SerializeToUtf8Bytes(msg);
    channel.BasicPublish(
        exchange: "", 
        routingKey: "multiple_consumers_single_channel", 
        defaultProps, body
        );
}

First, we establish a connection to the local RabbitMQ instance. If the UserName and Password properties of the connection factory aren't set, the default values, guest and guest, are used.

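As you may have noticed, the producer doesn't declare the queue; it'll be declared by the consumer. Because of that, start the consumer before sending messages: a message published to the default exchange with no matching queue is dropped.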

Then we save the message properties outside the loop to avoid unnecessary allocations every time we type a new message to send.

Finally, we have an endless loop that exits only if you type exit. Inside the loop, the message is serialized as JSON into a UTF-8 byte array and sent using the channel's BasicPublish method.

Creating the consumer

With the producer code done, let's start the consumer.

Establishing the connection

After creating a new console app, we need to do the same as we did in the producer: get a connection to RabbitMQ and a channel to listen for incoming messages.

Let's start with the following code:

var connFac = new ConnectionFactory 
    {
       HostName = "localhost",
       DispatchConsumersAsync = true
    };
using var conn = connFac.CreateConnection();
using var channel = conn.CreateModel();

The main difference from the producer code is DispatchConsumersAsync = true. It's needed for the asynchronous consumer, whose handlers return a Task, and we want that because every new message will start a new Task to run on the ThreadPool (we'll code this in the next steps).
The default synchronous consumer is based on void events (events without a return value), while the asynchronous consumer is based on Task events (events that return a Task), as seen in the image below, taken from the official client. Using it saves us from the misuse of async void described in the Async Guidance written by David Fowler.


RabbitMQ Official Client Async Event Handler

Setting up the queue

With the channel, we can then declare our queue and use the BasicQos method from the channel to set the prefetchCount to the number of messages that we want to process in parallel at the consumer.

Let's do it by adding the following code:

channel.QueueDeclare("multiple_consumers_single_channel");
channel.BasicQos(
    prefetchSize: 0,
    prefetchCount: 2,
    global: false);

The prefetchCount is basically how many messages a consumer can hold without acknowledging. In our case, we have defined the value 2, meaning we'll get at most two messages at a time to process.

The value of prefetchSize should be 0 (no size limit), because RabbitMQ doesn't implement a size-based prefetch limit.

Listening to the messages

Now with the following code, the async consumer is created and subscribed to the queue.

var consumer = new AsyncEventingBasicConsumer(channel);
consumer.Received += (_, ea) => HandleMessage(ea);

channel.BasicConsume(
    "multiple_consumers_single_channel",
    autoAck: false,
    consumer);
Console.WriteLine("Listening for new messages");
Console.ReadKey();

The HandleMessage method, which handles the incoming messages, will be created in the next step. For now, let's talk about BasicConsume. The key point is to pass autoAck: false, so that every message must be acknowledged manually. This lets us apply backpressure and process only two messages at a time, as defined previously with BasicQos; with automatic acknowledgements, the prefetch limit would not apply.

Once the consumer is subscribed, the application must keep running; that's why Console.ReadKey() is placed at the end of the code.

Processing messages in parallel

Finally, we have arrived at parallel message processing.

Let's first take a look at the HandleMessage and then I'll explain it.

Task HandleMessage(BasicDeliverEventArgs ea)
{
    // copying the message body before passing it to the task
    var msgBody = ea.Body.Span.ToArray();

    // starting the task
    Task.Run(async () =>
    {
        var body = JsonSerializer
                      .Deserialize<string>(msgBody);
        var splittedMsg = body.Split(" ");
        var msg = splittedMsg[0];
        var timesToRepeat = int.Parse(splittedMsg[1]);

        for (int i = 0; i < timesToRepeat; i++)
        {
            Console
               .WriteLine($"{i}-{DateTime.Now}-{msg}");
            await Task.Delay(1000);
        }

        channel.BasicAck(ea.DeliveryTag, false);
    });

    return Task.CompletedTask;
}

The above code isn't thread-safe; keep reading to see how to fix this.

First, we copy the message body from the event args before the handler method returns with the .ToArray method. As stated in the official dotnet API guide this has to be done because the message body can be deallocated at any moment after the handler returns.
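Why the copy matters can be illustrated without RabbitMQ. The sketch below is only an illustration under an assumption: it simulates a body that lives in a rented, reusable buffer (via ArrayPool) and is not a description of the client's internals. The names payload, rented and copy are hypothetical.

```csharp
using System;
using System.Buffers;
using System.Text;

// Simulate an incoming body that lives in a reusable (pooled) buffer.
byte[] payload = Encoding.UTF8.GetBytes("message 5");
byte[] rented = ArrayPool<byte>.Shared.Rent(payload.Length);
payload.CopyTo(rented, 0);
ReadOnlyMemory<byte> body = rented.AsMemory(0, payload.Length);

// What the handler does: take a private copy before returning.
byte[] copy = body.Span.ToArray();

// Simulate the buffer being reused after the handler returned.
Array.Clear(rented, 0, rented.Length);

Console.WriteLine(Encoding.UTF8.GetString(copy)); // message 5
Console.WriteLine(body.Span[0]);                  // 0 (the original view no longer holds the message)

ArrayPool<byte>.Shared.Return(rented);
```

The copy costs one allocation per message, which is the price of letting the work outlive the handler.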

A new background Task is started without waiting for its completion, and then we return from HandleMessage.

Inside the Task the message is acknowledged with the BasicAck method. The problem with this is that the channel can be accessed simultaneously by different threads, leading to errors in processing messages.

channel.BasicAck(ea.DeliveryTag, false);

A channel must be accessed by only one thread at a time. To guarantee this, we'll use a lock around the BasicAck call. Because the channel object is also used internally by the library, we shouldn't lock on the channel itself; instead, we create a dedicated object just to control this lock, which is a best practice when using locks.

Now the code, with the lock around the channel access, looks like this:

Task HandleMessage(BasicDeliverEventArgs ea,
                   object channelLockObject)
{
    var msgBody = ea.Body.Span.ToArray();
    Task.Run(async () =>
    {
        var body = JsonSerializer
                      .Deserialize<string>(msgBody);
        var splittedMsg = body.Split(" ");
        var msg = splittedMsg[0];
        var timesToRepeat = int.Parse(splittedMsg[1]);

        for (int i = 0; i < timesToRepeat; i++)
        {
            Console
               .WriteLine($"{i}-{DateTime.Now}-{msg}");
            await Task.Delay(1000);
        }

        lock (channelLockObject)
        {
            channel.BasicAck(ea.DeliveryTag, false);
        }
    });

    return Task.CompletedTask;
}

Avoiding consumer freezing

The code looks almost done, but we have one last problem. Let's run our publisher and consumer and send some messages.
I started by sending two valid messages, and everything worked fine. But when I sent a message with an invalid value, nothing happened; I sent another invalid message, and again nothing happened. When I then tried to send valid messages again, still nothing happened.


Frozen consumer

In the RabbitMQ management interface, there were two unacked messages and two messages ready to be consumed.
The two unacked messages are the invalid ones: when the consumer tries to parse invalidValue to an integer, an exception is thrown, and the Task finishes without ever acking or nacking the message.
Since prefetchCount is 2 and both slots are held by unacked messages, the broker delivers nothing else, and the consumer freezes.
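The failure described above can be reproduced without RabbitMQ. This is a minimal sketch, assuming a boolean flag (acked, a hypothetical stand-in for the BasicAck call) in place of the real channel:

```csharp
using System;
using System.Threading.Tasks;

var acked = false; // hypothetical stand-in for channel.BasicAck

var task = Task.Run(() =>
{
    var parts = "message invalidValue".Split(" ");
    var timesToRepeat = int.Parse(parts[1]); // throws FormatException
    Console.WriteLine(timesToRepeat);
    acked = true; // never reached
});

// Task.WhenAny waits for completion without rethrowing the exception.
await Task.WhenAny(task);

Console.WriteLine(task.Status);                                    // Faulted
Console.WriteLine(task.Exception?.InnerException?.GetType().Name); // FormatException
Console.WriteLine(acked);                                          // False
```

Because nobody awaits the fire-and-forget Task in the consumer, the exception goes unobserved and the line that would acknowledge the message never runs.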


Unacked messages at RabbitMQ management interface

To fix this, we could wrap the Task code in a try-catch block, and inside the catch we could nack the message, log the error, write it to a database, send it to another queue, or something else. In this example, we'll only log the error and nack the message.
We'll go a less usual way: a Task continuation that runs only if the Task finishes in a faulted state. I think this is more elegant than a try-catch.
At the end of the Task.Run call, append the following code:

.ContinueWith(faultedTask =>
{
    var msgs = faultedTask.Exception!
        .InnerExceptions.Select(e => e.Message);
    Console.WriteLine(string.Join(", ", msgs));
    lock (channelLockObject)
    {
        channel.BasicNack(
                 ea.DeliveryTag,
                 multiple: false,
                 requeue: false);
    }
}, TaskContinuationOptions.OnlyOnFaulted);

In this continuation, the messages of the inner exceptions are joined into one string and logged to the console. Then the message is nacked, inside a lock to avoid concurrent access to the channel. Also, notice that the requeue parameter is set to false, to avoid reprocessing the invalid message indefinitely.
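For comparison, the try-catch alternative mentioned earlier could be sketched roughly as follows. To keep the sketch runnable without a broker, the ack and nack calls are replaced by entries in a list; Process, log and the 10 ms delay are hypothetical names and values chosen for illustration, not part of the original code.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

var channelLock = new object();
var log = new List<string>(); // hypothetical stand-in for the channel

Task Process(string body, ulong deliveryTag)
{
    return Task.Run(async () =>
    {
        try
        {
            var parts = body.Split(" ");
            var timesToRepeat = int.Parse(parts[1]);
            for (var i = 0; i < timesToRepeat; i++)
                await Task.Delay(10); // shortened delay for the sketch

            // the real consumer would call channel.BasicAck here
            lock (channelLock) { log.Add($"ack {deliveryTag}"); }
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.Message);
            // the real consumer would call channel.BasicNack with requeue: false here
            lock (channelLock) { log.Add($"nack {deliveryTag}"); }
        }
    });
}

await Task.WhenAll(
    Process("message 2", 1),
    Process("message invalidValue", 2));

Console.WriteLine(string.Join(", ", log));
```

Both variants give every message a response; which one reads better is largely a matter of taste.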

Summing up

The final consumer code:

using System.Text.Json;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

var connFac = new ConnectionFactory 
    {
        HostName = "localhost",
        DispatchConsumersAsync = true
    };
using var conn = connFac.CreateConnection();
using var channel = conn.CreateModel();

channel.QueueDeclare("multiple_consumers_single_channel");
channel.BasicQos(prefetchSize: 0,
                 prefetchCount: 2, 
                 global: false);

var consumer = new AsyncEventingBasicConsumer(channel);
var channelLock = new object();

consumer.Received += (_, ea) => 
                    HandleMessage(ea, channelLock);

channel.BasicConsume("multiple_consumers_single_channel",
                     autoAck: false,
                     consumer);
Console.WriteLine("Listening for new messages");
Console.ReadKey();

Task HandleMessage(BasicDeliverEventArgs ea, 
                   object channelLockObject)
{
    var msgBody = ea.Body.Span.ToArray();
    Task.Run(async () =>
    {
        var body = JsonSerializer
                    .Deserialize<string>(msgBody);
        var splittedMsg = body.Split(" ");
        var msg = splittedMsg[0];
        var timesToRepeat = int.Parse(splittedMsg[1]);

        for (int i = 0; i < timesToRepeat; i++)
        {
            Console
                .WriteLine($"{i}-{DateTime.Now}-{msg}");
            await Task.Delay(1000);
        }

        lock (channelLockObject)
        {
            channel.BasicAck(ea.DeliveryTag, false);
        }
    }).ContinueWith(faultedTask =>
    {
        var msgs = faultedTask.Exception!
            .InnerExceptions.Select(e => e.Message);
        Console.WriteLine(string.Join(", ", msgs));
        lock (channelLockObject)
        {
            channel.BasicNack(ea.DeliveryTag, 
                              multiple: false,
                              requeue: false);
        }
    }, TaskContinuationOptions.OnlyOnFaulted);

    return Task.CompletedTask;
}

Conclusion

The key points of processing multiple messages in parallel using one channel are:

  • Handle the concurrent access to the channel;
  • Handle what happens when the consumer tries to process an invalid message.

In this post, background Tasks were spawned directly from the Received event. Another approach would be to use the Received event only to send the message body to a .NET Channel (System.Threading.Channels), which works like an in-memory queue, and then have consumers listening to this in-memory channel process the messages in parallel.
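A minimal sketch of that alternative, independent of RabbitMQ, might look like the code below. It assumes two workers and a bounded capacity of 2 to mirror the prefetchCount used in this post; the names queue, workers and processed are hypothetical, and the loop that writes messages stands in for what the Received handler would do.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Text;
using System.Threading.Channels;
using System.Threading.Tasks;

// In-memory queue; writers wait when it already holds 2 items.
var queue = Channel.CreateBounded<byte[]>(capacity: 2);
var processed = new ConcurrentBag<string>();

// Two workers reading from the same in-memory channel.
var workers = Enumerable.Range(0, 2).Select(id => Task.Run(async () =>
{
    await foreach (var body in queue.Reader.ReadAllAsync())
    {
        var text = Encoding.UTF8.GetString(body);
        processed.Add(text);
        Console.WriteLine($"worker {id}: {text}");
    }
})).ToArray();

// In the real consumer, this write would happen inside the Received handler.
foreach (var text in new[] { "message 1", "message 2", "message 3" })
    await queue.Writer.WriteAsync(Encoding.UTF8.GetBytes(text));

queue.Writer.Complete(); // lets the workers finish once the queue is drained
await Task.WhenAll(workers);
```

The workers would still need to ack or nack on the RabbitMQ channel, so the rule about accessing it from one thread at a time continues to apply.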
