DEV Community

Maria

Building Message Queues and Event Streaming with C#

Robust messaging is a cornerstone of modern distributed systems. Whether you're building microservices, event-driven architectures, or scalable cloud-based systems, message queues and event streaming let components communicate reliably without calling each other directly.

In this blog post, we’ll explore how to implement powerful messaging solutions using RabbitMQ, Apache Kafka, and Azure Service Bus with C#. By the end, you'll have a solid understanding of message patterns, durability, event streaming architectures, and the ability to put theory into practice with hands-on examples.


Why Messaging Systems Matter

Imagine you’re running an e-commerce platform. Every time a customer places an order, several things need to happen: payment processing, inventory updates, shipping notifications, and more. If you rely on direct communication (e.g., HTTP requests) between services, you risk creating tight coupling and bottlenecks. What happens if one service is down? What if traffic spikes?

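This is where messaging systems come in. Services publish messages to a broker and move on, and consumers process those messages at their own pace. A slow or unavailable service then delays work instead of failing the whole request, which improves reliability, scalability, and resilience.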


Messaging Models: Queue vs. Topic-Based

Before we dive into the code, let’s clarify the two primary messaging models and the hybrid option that combines them:

  1. Message Queues (Point-to-Point): Each message is delivered to a single consumer, even when several consumers share the queue. Once that consumer acknowledges the message, it is removed from the queue.

    • Example: RabbitMQ.
  2. Event Streaming (Publish-Subscribe): Events are published to topics, and multiple consumers (in Kafka, consumer groups) can subscribe to the same topic and each read every event. Events remain available for a configured retention period, whether or not they have been read.

    • Example: Kafka.
  3. Hybrid Solutions: Tools like Azure Service Bus support both models (queues for point-to-point, topics with subscriptions for publish-subscribe), allowing flexibility for various use cases.
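
The difference between the two models can be sketched in memory, without any broker. This is a minimal illustration of the semantics only, not how RabbitMQ or Kafka are implemented. The names `InMemoryQueue`, `InMemoryLog`, and the group names are hypothetical, and the sketch assumes .NET 6 or later.

```csharp
using System;
using System.Collections.Generic;

// Point-to-point: receiving a message removes it, so only one consumer sees it.
public class InMemoryQueue
{
    private readonly Queue<string> _messages = new();

    public void Send(string message) => _messages.Enqueue(message);

    public bool TryReceive(out string? message) => _messages.TryDequeue(out message);
}

// Streaming: events stay in an append-only log, and each consumer group
// tracks its own read position (offset).
public class InMemoryLog
{
    private readonly List<string> _events = new();
    private readonly Dictionary<string, int> _offsets = new();

    public void Publish(string evt) => _events.Add(evt);

    public bool TryRead(string group, out string? evt)
    {
        int offset = _offsets.GetValueOrDefault(group); // 0 for a new group
        if (offset >= _events.Count)
        {
            evt = null;
            return false;
        }

        evt = _events[offset];
        _offsets[group] = offset + 1;
        return true;
    }
}

public static class ModelsDemo
{
    public static void Main()
    {
        var queue = new InMemoryQueue();
        queue.Send("Order created: #1234");
        Console.WriteLine(queue.TryReceive(out _)); // True: worker A gets the message
        Console.WriteLine(queue.TryReceive(out _)); // False: nothing left for worker B

        var log = new InMemoryLog();
        log.Publish("Order created: #1234");
        Console.WriteLine(log.TryRead("billing", out _));   // True
        Console.WriteLine(log.TryRead("analytics", out _)); // True: same event, second group
        Console.WriteLine(log.TryRead("billing", out _));   // False: billing is caught up
    }
}
```

The queue hands each message to whoever asks first, while the log lets every group read the full history independently.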


RabbitMQ: A Reliable Message Queue

RabbitMQ is a mature, easy-to-adopt message broker that implements the Advanced Message Queuing Protocol (AMQP 0-9-1). Let’s get started by implementing a simple producer-consumer scenario in C#.

Setting up RabbitMQ

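First, install the RabbitMQ.Client NuGet package. The examples below use the synchronous 6.x API (CreateModel, EventingBasicConsumer). Version 7 of the client replaced it with an async API, so pin a 6.x release:

dotnet add package RabbitMQ.Client --version 6.8.1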

Producer Code

Here’s how to send a message to a queue:

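using RabbitMQ.Client;
using System;
using System.Text;

class Program
{
    static void Main(string[] args)
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using var connection = factory.CreateConnection();
        using var channel = connection.CreateModel();

        // durable: true keeps the queue definition across broker restarts.
        channel.QueueDeclare(queue: "orders",
                             durable: true,
                             exclusive: false,
                             autoDelete: false,
                             arguments: null);

        string message = "Order created: #1234";
        var body = Encoding.UTF8.GetBytes(message);

        // A durable queue alone does not protect its messages. Mark each
        // message persistent so the broker writes it to disk.
        var properties = channel.CreateBasicProperties();
        properties.Persistent = true;

        // The default exchange ("") routes to the queue named by routingKey.
        channel.BasicPublish(exchange: "",
                             routingKey: "orders",
                             basicProperties: properties,
                             body: body);

        Console.WriteLine($" [x] Sent {message}");
    }
}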

Consumer Code

Now, let’s consume messages from the queue:

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

class Program
{
    static void Main(string[] args)
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using var connection = factory.CreateConnection();
        using var channel = connection.CreateModel();

        // Declaring the queue again is safe as long as the arguments match
        // the producer's declaration.
        channel.QueueDeclare(queue: "orders",
                             durable: true,
                             exclusive: false,
                             autoDelete: false,
                             arguments: null);

        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (model, ea) =>
        {
            var body = ea.Body.ToArray();
            var message = Encoding.UTF8.GetString(body);
            Console.WriteLine($" [x] Received {message}");

            // Acknowledge only after processing succeeds, so a crash
            // mid-processing leads to redelivery instead of message loss.
            channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
        };

        // autoAck: false switches to manual acknowledgements.
        channel.BasicConsume(queue: "orders",
                             autoAck: false,
                             consumer: consumer);

        Console.WriteLine(" Press [enter] to exit.");
        Console.ReadLine();
    }
}

Why RabbitMQ?

  • Durability: Messages survive a broker restart, provided the queue is declared durable and the messages are published as persistent.
  • Flexibility: Supports complex routing via exchanges.
  • Ease of Use: Simple setup with a rich ecosystem.
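
To make the routing point concrete, here is a minimal sketch of a topic exchange with two bound queues. It assumes the RabbitMQ.Client 6.x API and a broker on localhost. The exchange name `order_events`, the queue names `shipping` and `audit`, and the routing keys are hypothetical and chosen only for illustration.

```csharp
using RabbitMQ.Client;
using System;
using System.Text;

class ExchangeRoutingSketch
{
    static void Main()
    {
        var factory = new ConnectionFactory { HostName = "localhost" };
        using var connection = factory.CreateConnection();
        using var channel = connection.CreateModel();

        // Hypothetical exchange name, for illustration only.
        const string exchange = "order_events";

        // A topic exchange routes on dot-separated routing keys.
        channel.ExchangeDeclare(exchange: exchange, type: ExchangeType.Topic, durable: true);

        channel.QueueDeclare(queue: "shipping", durable: true, exclusive: false,
                             autoDelete: false, arguments: null);
        channel.QueueDeclare(queue: "audit", durable: true, exclusive: false,
                             autoDelete: false, arguments: null);

        // In binding patterns, "*" matches exactly one word and "#" matches zero or more.
        channel.QueueBind(queue: "shipping", exchange: exchange, routingKey: "order.created");
        channel.QueueBind(queue: "audit", exchange: exchange, routingKey: "order.#");

        var properties = channel.CreateBasicProperties();
        properties.Persistent = true;

        // Matches both bindings, so "shipping" and "audit" each get a copy.
        channel.BasicPublish(exchange: exchange, routingKey: "order.created",
                             basicProperties: properties,
                             body: Encoding.UTF8.GetBytes("Order created: #1234"));

        // Matches only "order.#", so just "audit" receives it.
        channel.BasicPublish(exchange: exchange, routingKey: "order.cancelled",
                             basicProperties: properties,
                             body: Encoding.UTF8.GetBytes("Order cancelled: #1234"));

        Console.WriteLine("Published two events.");
    }
}
```

Producers only need to know the exchange and a routing key. Which queues receive a message is decided by the bindings, so new consumers can be added without touching the producer.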

Apache Kafka: Event Streaming at Scale

Kafka is built for high-throughput, fault-tolerant event streaming. It’s ideal for systems where event history matters, such as analytics pipelines or audit logging.

Setting up Kafka

First, install the Confluent.Kafka NuGet package:

dotnet add package Confluent.Kafka

Producer Code

Here’s how to publish messages to a Kafka topic:

using Confluent.Kafka;
using System;
using System.Threading.Tasks;

class Program
{
    static async Task Main(string[] args)
    {
        var config = new ProducerConfig { BootstrapServers = "localhost:9092" };

        using var producer = new ProducerBuilder<Null, string>(config).Build();

        var message = "Order placed: #1234";

        try
        {
            var deliveryResult = await producer.ProduceAsync("orders_topic", new Message<Null, string>
            {
                Value = message
            });

            Console.WriteLine($"Delivered '{deliveryResult.Value}' to '{deliveryResult.TopicPartitionOffset}'");
        }
        catch (ProduceException<Null, string> e)
        {
            Console.WriteLine($"Delivery failed: {e.Error.Reason}");
        }
    }
}

Consumer Code

Now, consume messages from the topic:

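using Confluent.Kafka;
using System;
using System.Threading;

class Program
{
    static void Main(string[] args)
    {
        var config = new ConsumerConfig
        {
            GroupId = "orders-consumer-group",
            BootstrapServers = "localhost:9092",
            // Start from the oldest retained event when the group has no committed offset.
            AutoOffsetReset = AutoOffsetReset.Earliest
        };

        using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();

        consumer.Subscribe("orders_topic");

        Console.WriteLine("Listening for messages. Press Ctrl+C to stop.");
        using var cts = new CancellationTokenSource();

        // Without this handler the token is never cancelled and Close() is never reached.
        Console.CancelKeyPress += (_, e) =>
        {
            e.Cancel = true; // keep the process alive so the consumer can shut down cleanly
            cts.Cancel();
        };

        try
        {
            while (true)
            {
                var cr = consumer.Consume(cts.Token);
                Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
            }
        }
        catch (OperationCanceledException)
        {
            // Expected when Ctrl+C cancels the token.
        }
        finally
        {
            // Leave the consumer group cleanly so partitions are reassigned promptly.
            consumer.Close();
        }
    }
}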

Azure Service Bus: Enterprise-Grade Messaging

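Azure Service Bus is a fully managed cloud messaging service. It offers queues for point-to-point delivery and topics with subscriptions for publish-subscribe, along with built-in dead-letter queues, which makes it a good fit for enterprise applications that don't want to operate their own broker.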

Setting up Azure Service Bus

Install the Azure.Messaging.ServiceBus NuGet package:

dotnet add package Azure.Messaging.ServiceBus

Producer Code

using Azure.Messaging.ServiceBus;
using System;
using System.Threading.Tasks;

class Program
{
    private const string ConnectionString = "<Your_Service_Bus_Connection_String>";
    private const string QueueName = "orders";

    static async Task Main(string[] args)
    {
        // Both the client and the sender hold network resources, so dispose both.
        await using var client = new ServiceBusClient(ConnectionString);
        await using var sender = client.CreateSender(QueueName);

        string message = "Order placed: #1234";
        await sender.SendMessageAsync(new ServiceBusMessage(message));

        Console.WriteLine("Message sent.");
    }
}

Consumer Code

using Azure.Messaging.ServiceBus;
using System;
using System.Threading.Tasks;

class Program
{
    private const string ConnectionString = "<Your_Service_Bus_Connection_String>";
    private const string QueueName = "orders";

    static async Task Main(string[] args)
    {
        await using var client = new ServiceBusClient(ConnectionString);

        // Auto-completion is on by default. Turn it off so the explicit
        // CompleteMessageAsync call below is what settles each message.
        var options = new ServiceBusProcessorOptions { AutoCompleteMessages = false };
        await using var processor = client.CreateProcessor(QueueName, options);

        processor.ProcessMessageAsync += async messageArgs =>
        {
            string message = messageArgs.Message.Body.ToString();
            Console.WriteLine($"Received message: {message}");

            // Complete only after processing succeeds. An unhandled exception
            // here leaves the message unsettled so it can be redelivered.
            await messageArgs.CompleteMessageAsync(messageArgs.Message);
        };

        processor.ProcessErrorAsync += errorArgs =>
        {
            Console.WriteLine($"Error: {errorArgs.Exception}");
            return Task.CompletedTask;
        };

        await processor.StartProcessingAsync();
        Console.WriteLine("Press any key to stop the processor...");
        Console.ReadKey();

        await processor.StopProcessingAsync();
    }
}
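
The queue examples above cover the point-to-point model. For publish-subscribe, Service Bus uses topics with subscriptions, and the client code changes very little. The following is a minimal sketch, assuming a topic named `order-events` with a subscription named `shipping` already exists in the namespace. Both names are hypothetical.

```csharp
using Azure.Messaging.ServiceBus;
using System;
using System.Threading.Tasks;

class TopicSketch
{
    private const string ConnectionString = "<Your_Service_Bus_Connection_String>";
    private const string TopicName = "order-events";   // hypothetical topic
    private const string SubscriptionName = "shipping"; // hypothetical subscription

    static async Task Main()
    {
        await using var client = new ServiceBusClient(ConnectionString);

        // Each subscription receives its own copy of every message sent to the topic.
        await using var processor = client.CreateProcessor(TopicName, SubscriptionName);

        processor.ProcessMessageAsync += messageArgs =>
        {
            // With the default options, the message is completed automatically
            // when this handler returns without throwing.
            Console.WriteLine($"[{SubscriptionName}] {messageArgs.Message.Body}");
            return Task.CompletedTask;
        };

        processor.ProcessErrorAsync += errorArgs =>
        {
            Console.WriteLine($"Error: {errorArgs.Exception}");
            return Task.CompletedTask;
        };

        await processor.StartProcessingAsync();

        // Senders address the topic, not any individual subscription.
        await using var sender = client.CreateSender(TopicName);
        await sender.SendMessageAsync(new ServiceBusMessage("Order placed: #1234"));

        Console.WriteLine("Press any key to stop the processor...");
        Console.ReadKey();
        await processor.StopProcessingAsync();
    }
}
```

Adding a second subscription, for example one for analytics, would give that consumer its own copy of each message without any change to the sender.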

Common Pitfalls and How to Avoid Them

  1. Message Duplication: With at-least-once delivery, a message can arrive more than once after a retry or reconnect. Design your consumers to be idempotent so that processing the same message again has no additional side effects.
  2. Dead Letter Queues: Route messages that repeatedly fail processing to a dead letter queue (DLQ), where they can be inspected later instead of blocking the main queue or being lost.
  3. Over-Engineering: Avoid over-complicating your messaging system. Start simple, and scale as needed.
  4. Monitoring: Track queue depth, consumer lag, and error rates on your message brokers so you spot bottlenecks before they cause outages.
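
The first two pitfalls can be sketched together in plain C#, independent of any broker. This is an illustration under stated assumptions, not a production pattern: `OrderMessage` and `ResilientConsumer` are hypothetical names, the processed-ID set lives in memory (a real system would persist it), and the dead-letter list stands in for a broker-managed DLQ. It assumes .NET 6 or later.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical message shape with a unique ID used for deduplication.
public record OrderMessage(string Id, string Body);

public class ResilientConsumer
{
    private readonly Action<OrderMessage> _handler;
    private readonly int _maxAttempts;
    private readonly HashSet<string> _processedIds = new();
    private readonly Dictionary<string, int> _attempts = new();

    // Stand-in for a real dead letter queue.
    public List<OrderMessage> DeadLetters { get; } = new();

    public ResilientConsumer(Action<OrderMessage> handler, int maxAttempts = 3)
    {
        _handler = handler;
        _maxAttempts = maxAttempts;
    }

    // Returns true when the message is settled (processed, skipped as a
    // duplicate, or dead-lettered) and false when it should be redelivered.
    public bool Handle(OrderMessage message)
    {
        if (_processedIds.Contains(message.Id))
        {
            return true; // duplicate delivery: already handled, do nothing
        }

        try
        {
            _handler(message);
            _processedIds.Add(message.Id);
            return true;
        }
        catch (Exception)
        {
            int attempts = _attempts.GetValueOrDefault(message.Id) + 1;
            _attempts[message.Id] = attempts;

            if (attempts < _maxAttempts)
            {
                return false; // ask for redelivery
            }

            DeadLetters.Add(message); // give up and park the message
            return true;
        }
    }
}

public static class PitfallsDemo
{
    public static void Main()
    {
        int charged = 0;
        var consumer = new ResilientConsumer(m =>
        {
            if (m.Body == "bad") throw new InvalidOperationException("cannot parse");
            charged++;
        });

        var order = new OrderMessage("1234", "Order created: #1234");
        consumer.Handle(order);
        consumer.Handle(order); // simulated redelivery
        Console.WriteLine(charged); // 1: the duplicate was skipped

        var poison = new OrderMessage("9999", "bad");
        for (int i = 0; i < 3; i++) consumer.Handle(poison);
        Console.WriteLine(consumer.DeadLetters.Count); // 1: parked after three failures
    }
}
```

The return value maps naturally onto broker acknowledgements: settle the message on `true`, and reject or abandon it for redelivery on `false`.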

Key Takeaways and Next Steps

Message queues and event streaming are foundational for modern distributed systems. Here are your next steps:

  1. Experiment with the provided code examples for RabbitMQ, Kafka, and Azure Service Bus.
  2. Explore advanced patterns like delayed messages, message deduplication, and backpressure.
  3. Learn about cloud-native messaging solutions such as AWS SQS/SNS and Google Pub/Sub.
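
As a starting point for the backpressure item, here is a minimal in-process sketch using a bounded `System.Threading.Channels` channel between a fast producer and a slow consumer. It runs without a broker and only illustrates the idea. The helper name `CreateBoundedBuffer`, the capacity of 2, and the 100 ms delay are arbitrary choices for the example, and it assumes .NET 6 or later.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class BackpressureSketch
{
    // A bounded channel holds at most `capacity` items. With FullMode.Wait,
    // writers wait for space instead of growing the buffer without limit.
    public static Channel<string> CreateBoundedBuffer(int capacity) =>
        Channel.CreateBounded<string>(new BoundedChannelOptions(capacity)
        {
            FullMode = BoundedChannelFullMode.Wait
        });

    public static async Task Main()
    {
        var buffer = CreateBoundedBuffer(capacity: 2);

        // Slow consumer: drains the buffer until the writer completes.
        var consumer = Task.Run(async () =>
        {
            await foreach (var message in buffer.Reader.ReadAllAsync())
            {
                await Task.Delay(100); // simulate slow processing
                Console.WriteLine($"Processed {message}");
            }
        });

        // Fast producer: WriteAsync pauses whenever the buffer is full,
        // which slows the producer down to the consumer's pace.
        for (int i = 1; i <= 5; i++)
        {
            await buffer.Writer.WriteAsync($"Order #{i}");
            Console.WriteLine($"Queued Order #{i}");
        }

        buffer.Writer.Complete();
        await consumer;
    }
}
```

The same principle applies with a real broker: limit how much unprocessed work a consumer holds, so a traffic spike queues up in the broker rather than exhausting the consumer's memory.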

Messaging systems empower developers to build scalable, resilient, and decoupled applications. With C#, you have the tools and libraries to implement these solutions effectively. Happy coding!
