Outdated Dev

Event-Driven Architecture Part 1: Message Queues and Topics

Hello there! 👋🧔‍♂️ Have you ever wondered how modern applications handle millions of events, scale seamlessly, and stay responsive under heavy load? That's where Event-Driven Architecture (EDA) comes in! This is the first part of a series on EDA, where we'll explore the foundational concepts of message queues and topics.

Whether you're building microservices, handling real-time data streams, or designing systems that need to react to events as they happen, understanding Event-Driven Architecture will help you build better, more scalable applications.

Overview

Event-Driven Architecture is like a postal system for your application: services communicate by sending and receiving messages (events) rather than calling each other directly. This creates systems that are:

  • Loosely coupled - Services don't need to know about each other
  • Scalable - Can handle high volumes of events
  • Resilient - Failures in one service don't cascade to others
  • Responsive - React to events as they happen

In this first part, we'll explore:

  1. Message Queues - Point-to-point messaging for reliable delivery
  2. Topics - Publish-subscribe messaging for broadcasting events

Let's start by understanding what Event-Driven Architecture is and why it matters.

What is Event-Driven Architecture?

Event-Driven Architecture (EDA) is an architectural pattern where services communicate by producing and consuming events. Instead of services calling each other directly (synchronous communication), services emit events that other services can listen to (asynchronous communication).

Key Concepts

Event - A notification that something has happened (e.g., "OrderCreated", "UserRegistered", "PaymentProcessed")

Producer - A service that creates and publishes events

Consumer - A service that listens for and processes events

Event Broker - Middleware that routes events from producers to consumers (message queues, topics, event streams)
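
To make these four roles concrete, here is a minimal in-memory sketch. It is only an illustration: `Event`, `InMemoryBroker`, and the `version` field are hypothetical names, and delivery here is a plain synchronous function call, whereas a real broker such as RabbitMQ delivers asynchronously over the network.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Callable, Dict, List


@dataclass(frozen=True)
class Event:
    """A notification that something has happened."""
    name: str                       # e.g. "OrderCreated"
    payload: dict
    version: int = 1                # hypothetical schema version field
    occurred_at: datetime = field(
        default_factory=lambda: datetime.now(timezone.utc)
    )


class InMemoryBroker:
    """Toy event broker: routes events from producers to registered consumers."""

    def __init__(self) -> None:
        self._handlers: Dict[str, List[Callable[[Event], None]]] = {}

    def subscribe(self, event_name: str, handler: Callable[[Event], None]) -> None:
        self._handlers.setdefault(event_name, []).append(handler)

    def publish(self, event: Event) -> int:
        """Deliver the event to every handler for its name; return how many ran."""
        handlers = self._handlers.get(event.name, [])
        for handler in handlers:
            handler(event)
        return len(handlers)


# Consumer: reacts to events without knowing who produced them.
received: List[Event] = []
broker = InMemoryBroker()
broker.subscribe("OrderCreated", received.append)

# Producer: emits an event without knowing who consumes it.
delivered = broker.publish(Event(name="OrderCreated", payload={"order_id": 42}))
print(delivered, received[0].payload)
```

The producer and the consumer only share the event name and payload shape, which is the loose coupling described above.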

Real-World Analogy

Think of Event-Driven Architecture like a newsroom:

  • Reporters (producers) write stories (events) and submit them
  • The news desk (event broker) organizes and distributes stories
  • Editors (consumers) pick up stories they're interested in and work on them
  • Multiple editors can work on the same story simultaneously
  • If one editor is busy, others can still process stories

Benefits of Event-Driven Architecture

✅ Loose Coupling - Services don't need direct knowledge of each other
✅ Scalability - Can handle high volumes of events
✅ Resilience - Failures are isolated to individual services
✅ Flexibility - Easy to add new consumers without modifying producers
✅ Real-time Processing - React to events as they happen
✅ Better Performance - Asynchronous processing doesn't block

When to Use Event-Driven Architecture

  • High volume of events - Systems generating many events per second
  • Microservices - Services need to communicate without tight coupling
  • Real-time requirements - Need to react to events immediately
  • Event sourcing - Need to maintain event history
  • Integration - Connecting multiple systems or services

1. Message Queues

Message queues provide point-to-point messaging: each message is delivered to exactly one consumer. Think of it like a queue at a bank: each person (message) is served by one teller (consumer).

How Message Queues Work

  1. Producer sends a message to a queue
  2. Queue stores the message
  3. Consumer retrieves and processes the message
  4. Acknowledgment confirms message was processed successfully
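
The four steps above can be sketched with a toy in-memory queue. This is not how RabbitMQ is implemented; `InMemoryQueue` and its method names are made up for illustration, and the sketch only shows why a message stays around until it is acknowledged.

```python
from collections import deque
from typing import Deque, Dict, Optional, Tuple


class InMemoryQueue:
    """Toy point-to-point queue with explicit acknowledgments."""

    def __init__(self) -> None:
        self._ready: Deque[str] = deque()     # messages waiting for a consumer
        self._unacked: Dict[int, str] = {}    # delivered but not yet acknowledged
        self._next_tag = 1

    def send(self, message: str) -> None:
        # Steps 1-2: the producer sends, the queue stores.
        self._ready.append(message)

    def receive(self) -> Optional[Tuple[int, str]]:
        # Step 3: hand the oldest message to a consumer, tracked by a delivery tag.
        if not self._ready:
            return None
        tag, self._next_tag = self._next_tag, self._next_tag + 1
        message = self._ready.popleft()
        self._unacked[tag] = message
        return tag, message

    def ack(self, tag: int) -> None:
        # Step 4: processing succeeded, so the message is removed for good.
        del self._unacked[tag]

    def nack(self, tag: int) -> None:
        # Processing failed: put the message back at the front for redelivery.
        self._ready.appendleft(self._unacked.pop(tag))

    def __len__(self) -> int:
        return len(self._ready) + len(self._unacked)


queue = InMemoryQueue()
queue.send("order-1")
queue.send("order-2")

tag, message = queue.receive()   # "order-1" is delivered first (FIFO)
queue.nack(tag)                  # simulate a failure: the message is requeued
tag, message = queue.receive()   # the same message is delivered again
queue.ack(tag)                   # now it is removed
print(message, len(queue))
```

Note that the failed message is delivered a second time, which is why consumers should be written so that processing the same message twice is harmless.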

Message Queue Characteristics

  • Point-to-point - One message, one consumer
  • FIFO - First In, First Out (usually)
  • Durable - Messages persist until consumed
  • Reliable - At-least-once delivery when consumers acknowledge messages

Message Queue Implementation

C# (.NET) - Using RabbitMQ

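using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events; // EventingBasicConsumer
using System.Text;
using System.Text.Json;

// Order and OrderCreatedEvent are assumed to be simple classes defined elsewhere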

// Producer
public class OrderService
{
    private readonly IConnection _connection;
    private readonly IModel _channel;

    public OrderService()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        _connection = factory.CreateConnection();
        _channel = _connection.CreateModel();
        _channel.QueueDeclare(queue: "orders", durable: true, exclusive: false, autoDelete: false);
    }

    public void CreateOrder(Order order)
    {
        // Business logic to create order
        var orderCreated = new OrderCreatedEvent
        {
            OrderId = order.Id,
            CustomerId = order.CustomerId,
            TotalAmount = order.Total,
            CreatedAt = DateTime.UtcNow
        };

        // Publish event to queue
        var message = JsonSerializer.Serialize(orderCreated);
        var body = Encoding.UTF8.GetBytes(message);

        var properties = _channel.CreateBasicProperties();
        properties.Persistent = true; // Make message durable

        _channel.BasicPublish(exchange: "", routingKey: "orders", basicProperties: properties, body: body);
        Console.WriteLine($"Published order created event: {order.Id}");
    }
}

// Consumer
public class OrderProcessor
{
    private readonly IConnection _connection;
    private readonly IModel _channel;

    public OrderProcessor()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        _connection = factory.CreateConnection();
        _channel = _connection.CreateModel();
        _channel.QueueDeclare(queue: "orders", durable: true, exclusive: false, autoDelete: false);
    }

    public void StartConsuming()
    {
        var consumer = new EventingBasicConsumer(_channel);
        consumer.Received += (model, ea) =>
        {
            try
            {
                // Decode inside the try block so malformed payloads are handled too
                var body = ea.Body.ToArray();
                var message = Encoding.UTF8.GetString(body);
                var orderCreated = JsonSerializer.Deserialize<OrderCreatedEvent>(message);

                // Process the order
                ProcessOrder(orderCreated);

                // Acknowledge message
                _channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
            }
            catch (Exception ex)
            {
                // Reject and requeue on failure. A message that always fails will be
                // redelivered repeatedly, so consider a dead letter queue in production.
                _channel.BasicNack(deliveryTag: ea.DeliveryTag, multiple: false, requeue: true);
                Console.WriteLine($"Error processing order: {ex.Message}");
            }
        };

        _channel.BasicConsume(queue: "orders", autoAck: false, consumer: consumer);
    }

    private void ProcessOrder(OrderCreatedEvent orderCreated)
    {
        // Process order logic
        Console.WriteLine($"Processing order: {orderCreated.OrderId}");
    }
}

Python - Using RabbitMQ

import pika
import json
from datetime import datetime, timezone

# Producer
class OrderService:
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
        self.channel = self.connection.channel()
        self.channel.queue_declare(queue='orders', durable=True)

    def create_order(self, order):
        # Business logic to create order
        order_created = {
            'order_id': order['id'],
            'customer_id': order['customer_id'],
            'total_amount': order['total'],
            # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated since Python 3.12)
            'created_at': datetime.now(timezone.utc).isoformat()
        }

        # Publish event to queue
        message = json.dumps(order_created)
        self.channel.basic_publish(
            exchange='',
            routing_key='orders',
            body=message,
            properties=pika.BasicProperties(
                delivery_mode=2,  # Make message persistent
            )
        )
        print(f"Published order created event: {order['id']}")

# Consumer
class OrderProcessor:
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
        self.channel = self.connection.channel()
        self.channel.queue_declare(queue='orders', durable=True)

    def start_consuming(self):
        def callback(ch, method, properties, body):
            try:
                order_created = json.loads(body)
                self.process_order(order_created)

                # Acknowledge message
                ch.basic_ack(delivery_tag=method.delivery_tag)
            except Exception as e:
                # Reject and requeue on failure
                ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
                print(f"Error processing order: {e}")

        self.channel.basic_consume(queue='orders', on_message_callback=callback, auto_ack=False)
        self.channel.start_consuming()

    def process_order(self, order_created):
        # Process order logic
        print(f"Processing order: {order_created['order_id']}")

TypeScript (Node.js) - Using RabbitMQ

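import amqp from 'amqplib';

// Minimal shapes assumed by this example (field types are illustrative)
interface Order {
    id: string;
    customerId: string;
    total: number;
}

interface OrderCreatedEvent {
    orderId: string;
    customerId: string;
    totalAmount: number;
    createdAt: Date;
}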

// Producer
class OrderService {
    private connection: amqp.Connection | null = null;
    private channel: amqp.Channel | null = null;

    async connect() {
        this.connection = await amqp.connect('amqp://localhost');
        this.channel = await this.connection.createChannel();
        await this.channel.assertQueue('orders', { durable: true });
    }

    async createOrder(order: Order) {
        // Business logic to create order
        const orderCreated: OrderCreatedEvent = {
            orderId: order.id,
            customerId: order.customerId,
            totalAmount: order.total,
            createdAt: new Date()
        };

        // Publish event to queue
        const message = JSON.stringify(orderCreated);
        this.channel?.sendToQueue('orders', Buffer.from(message), {
            persistent: true
        });
        console.log(`Published order created event: ${order.id}`);
    }
}

// Consumer
class OrderProcessor {
    private connection: amqp.Connection | null = null;
    private channel: amqp.Channel | null = null;

    async connect() {
        this.connection = await amqp.connect('amqp://localhost');
        this.channel = await this.connection.createChannel();
        await this.channel.assertQueue('orders', { durable: true });
    }

    async startConsuming() {
        await this.channel?.consume('orders', async (msg) => {
            if (!msg) return;

            try {
                const orderCreated: OrderCreatedEvent = JSON.parse(msg.content.toString());
                await this.processOrder(orderCreated);

                // Acknowledge message
                this.channel?.ack(msg);
            } catch (error) {
                // Reject and requeue on failure
                this.channel?.nack(msg, false, true);
                console.error(`Error processing order: ${error}`);
            }
        });
    }

    private async processOrder(orderCreated: OrderCreatedEvent) {
        // Process order logic
        console.log(`Processing order: ${orderCreated.orderId}`);
    }
}

Message Queue Best Practices

✅ Use durable queues - Survive broker restarts
✅ Acknowledge messages - Confirm successful processing
✅ Handle failures - Implement retry logic and dead letter queues
✅ Idempotency - Make operations safe to retry
✅ Message versioning - Version your event schemas

❌ Don't ignore acknowledgments - Always acknowledge processed messages
❌ Don't process synchronously - Use async processing for better performance
❌ Don't lose messages - Use persistent queues and acknowledgments

2. Topics (Publish-Subscribe)

Topics provide publish-subscribe messaging. Each message is delivered to all subscribers. Think of it like a radio station: the broadcaster (publisher) sends a signal, and all listeners (subscribers) tuned to that frequency receive it.

How Topics Work

  1. Publisher sends a message to a topic
  2. Topic routes the message to all subscribers
  3. Multiple subscribers receive the same message
  4. Each subscriber processes independently
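
The steps above can be sketched as a toy in-memory topic. The class and method names are hypothetical; the point is only that every subscriber gets its own copy of each message and works through it at its own pace.

```python
from collections import deque
from typing import Deque, Dict, Optional


class InMemoryTopic:
    """Toy publish-subscribe topic: each subscriber gets its own copy of every message."""

    def __init__(self) -> None:
        self._queues: Dict[str, Deque[dict]] = {}

    def subscribe(self, subscriber: str) -> None:
        # Each subscriber gets a private queue, so a slow one never blocks the others.
        self._queues.setdefault(subscriber, deque())

    def publish(self, message: dict) -> None:
        # Copy the message into every subscriber queue that exists right now.
        for queue in self._queues.values():
            queue.append(dict(message))

    def poll(self, subscriber: str) -> Optional[dict]:
        """Return the next message for one subscriber, or None if it has caught up."""
        queue = self._queues[subscriber]
        return queue.popleft() if queue else None


topic = InMemoryTopic()
topic.subscribe("email")
topic.subscribe("inventory")
topic.publish({"event": "order.created", "order_id": 7})

email_msg = topic.poll("email")
inventory_msg = topic.poll("inventory")
print(email_msg == inventory_msg, topic.poll("email"))
```

In this sketch a subscriber that joins after a message was published does not receive it, which is also how temporary, server-named queues bound to an exchange behave.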

Topic Characteristics

  • One-to-many - One message, multiple consumers
  • Broadcast - All subscribers receive the message
  • Decoupled - Publishers don't know about subscribers
  • Scalable - Easy to add new subscribers

Topic Implementation

C# (.NET) - Using RabbitMQ Topics

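using System;
using System.Text;
using System.Text.Json;
using RabbitMQ.Client;
using RabbitMQ.Client.Events; // EventingBasicConsumer

// Publisher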
public class OrderPublisher
{
    private readonly IConnection _connection;
    private readonly IModel _channel;

    public OrderPublisher()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        _connection = factory.CreateConnection();
        _channel = _connection.CreateModel();

        // Declare topic exchange
        _channel.ExchangeDeclare(exchange: "order_events", type: ExchangeType.Topic);
    }

    public void PublishOrderCreated(OrderCreatedEvent orderCreated)
    {
        var message = JsonSerializer.Serialize(orderCreated);
        var body = Encoding.UTF8.GetBytes(message);

        // Publish to topic with routing key
        _channel.BasicPublish(
            exchange: "order_events",
            routingKey: "order.created",
            basicProperties: null,
            body: body
        );
    }
}

// Subscriber 1: Email Service
public class EmailService
{
    private readonly IConnection _connection;
    private readonly IModel _channel;

    public EmailService()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        _connection = factory.CreateConnection();
        _channel = _connection.CreateModel();

        _channel.ExchangeDeclare(exchange: "order_events", type: ExchangeType.Topic);

        // Create queue for this service
        var queueName = _channel.QueueDeclare().QueueName;

        // Bind queue to topic with routing key pattern
        _channel.QueueBind(queue: queueName, exchange: "order_events", routingKey: "order.created");

        var consumer = new EventingBasicConsumer(_channel);
        consumer.Received += (model, ea) =>
        {
            var body = ea.Body.ToArray();
            var message = Encoding.UTF8.GetString(body);
            var orderCreated = JsonSerializer.Deserialize<OrderCreatedEvent>(message);

            SendOrderConfirmationEmail(orderCreated);
            _channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
        };

        _channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
    }

    private void SendOrderConfirmationEmail(OrderCreatedEvent orderCreated)
    {
        Console.WriteLine($"Sending confirmation email for order: {orderCreated.OrderId}");
    }
}

// Subscriber 2: Inventory Service
public class InventoryService
{
    private readonly IConnection _connection;
    private readonly IModel _channel;

    public InventoryService()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        _connection = factory.CreateConnection();
        _channel = _connection.CreateModel();

        _channel.ExchangeDeclare(exchange: "order_events", type: ExchangeType.Topic);

        var queueName = _channel.QueueDeclare().QueueName;
        _channel.QueueBind(queue: queueName, exchange: "order_events", routingKey: "order.created");

        var consumer = new EventingBasicConsumer(_channel);
        consumer.Received += (model, ea) =>
        {
            var body = ea.Body.ToArray();
            var message = Encoding.UTF8.GetString(body);
            var orderCreated = JsonSerializer.Deserialize<OrderCreatedEvent>(message);

            UpdateInventory(orderCreated);
            _channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
        };

        _channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
    }

    private void UpdateInventory(OrderCreatedEvent orderCreated)
    {
        Console.WriteLine($"Updating inventory for order: {orderCreated.OrderId}");
    }
}

Python - Using RabbitMQ Topics

import pika
import json

# Publisher
class OrderPublisher:
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
        self.channel = self.connection.channel()
        self.channel.exchange_declare(exchange='order_events', exchange_type='topic')

    def publish_order_created(self, order_created):
        message = json.dumps(order_created)
        self.channel.basic_publish(
            exchange='order_events',
            routing_key='order.created',
            body=message
        )

# Subscriber: Email Service
class EmailService:
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
        self.channel = self.connection.channel()
        self.channel.exchange_declare(exchange='order_events', exchange_type='topic')

        queue_name = self.channel.queue_declare(queue='', exclusive=True).method.queue
        self.channel.queue_bind(exchange='order_events', queue=queue_name, routing_key='order.created')

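        # Use manual acknowledgments so a message is only removed after it is handled
        self.channel.basic_consume(queue=queue_name, on_message_callback=self.callback, auto_ack=False)

    def callback(self, ch, method, properties, body):
        order_created = json.loads(body)
        self.send_order_confirmation_email(order_created)
        ch.basic_ack(delivery_tag=method.delivery_tag)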

    def send_order_confirmation_email(self, order_created):
        print(f"Sending confirmation email for order: {order_created['order_id']}")

    def start_consuming(self):
        self.channel.start_consuming()

TypeScript (Node.js) - Using RabbitMQ Topics

import amqp from 'amqplib';

// Publisher
class OrderPublisher {
    private connection: amqp.Connection | null = null;
    private channel: amqp.Channel | null = null;

    async connect() {
        this.connection = await amqp.connect('amqp://localhost');
        this.channel = await this.connection.createChannel();
        await this.channel.assertExchange('order_events', 'topic', { durable: false });
    }

    async publishOrderCreated(orderCreated: OrderCreatedEvent) {
        const message = JSON.stringify(orderCreated);
        this.channel?.publish('order_events', 'order.created', Buffer.from(message));
    }
}

// Subscriber: Email Service
class EmailService {
    private connection: amqp.Connection | null = null;
    private channel: amqp.Channel | null = null;

    async connect() {
        this.connection = await amqp.connect('amqp://localhost');
        this.channel = await this.connection.createChannel();
        await this.channel.assertExchange('order_events', 'topic', { durable: false });

        const queue = await this.channel.assertQueue('', { exclusive: true });
        await this.channel.bindQueue(queue.queue, 'order_events', 'order.created');

        await this.channel.consume(queue.queue, (msg) => {
            if (!msg) return;

            const orderCreated: OrderCreatedEvent = JSON.parse(msg.content.toString());
            this.sendOrderConfirmationEmail(orderCreated);
            this.channel?.ack(msg);
        });
    }

    private sendOrderConfirmationEmail(orderCreated: OrderCreatedEvent) {
        console.log(`Sending confirmation email for order: ${orderCreated.orderId}`);
    }
}

Topic Best Practices

✅ Use routing keys - Organize events by type (e.g., "order.created", "order.cancelled")
✅ Wildcard patterns - Use patterns like "order.*" to subscribe to all order events
✅ Separate queues per service - Each service should have its own queue
✅ Durable exchanges - Survive broker restarts
✅ Version events - Include version in routing key or message

❌ Don't use one queue for all subscribers - Each subscriber needs its own queue
❌ Don't ignore routing keys - Use them to filter relevant events
❌ Don't couple publishers to subscribers - Publishers shouldn't know about subscribers

Message Queue vs Topics: When to Use What?

| Feature | Message Queue | Topics |
|---|---|---|
| Delivery | One consumer per message | All subscribers receive message |
| Use Case | Task distribution | Broadcasting events |
| Example | Order processing queue | Order created → Email, Inventory, Analytics |
| Coupling | Producer knows consumer exists | Producer doesn't know subscribers |

Message Queues are best when:

  • You need to distribute work among multiple workers
  • Each message should be handled by only one consumer
  • You want load balancing across consumers

Topics are best when:

  • You need to broadcast events to multiple services
  • Multiple services need to react to the same event
  • You want loose coupling between producers and consumers
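
The difference between the two delivery models can be sketched in a few lines. The function names below are hypothetical, and the round-robin assignment is a simplification of how a broker spreads messages across the consumers of one queue.

```python
from itertools import cycle
from typing import Dict, List


def dispatch_queue(messages: List[str], workers: List[str]) -> Dict[str, List[str]]:
    """Point-to-point: each message goes to exactly one worker (round-robin here)."""
    inbox: Dict[str, List[str]] = {worker: [] for worker in workers}
    for message, worker in zip(messages, cycle(workers)):
        inbox[worker].append(message)
    return inbox


def dispatch_topic(messages: List[str], subscribers: List[str]) -> Dict[str, List[str]]:
    """Publish-subscribe: every subscriber receives every message."""
    return {subscriber: list(messages) for subscriber in subscribers}


queue_result = dispatch_queue(["m1", "m2", "m3", "m4"], ["worker-a", "worker-b"])
topic_result = dispatch_topic(["m1", "m2"], ["email", "inventory"])
print(queue_result)
print(topic_result)
```

Adding a worker to the queue shares the same load across more hands, while adding a subscriber to the topic adds a new independent reaction to every event.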

Conclusion

Message queues and topics are foundational concepts in Event-Driven Architecture. By understanding when to use each pattern, you can design systems that are loosely coupled, scalable, and resilient.

Key Takeaways:

  1. Message Queues - Use for point-to-point messaging and task distribution
  2. Topics - Use for broadcasting events to multiple subscribers
  3. Choose wisely - Each pattern has its strengths and use cases
  4. Best practices matter - Follow patterns for durability, reliability, and scalability

In Part 2, we'll explore Event Streaming, advanced Pub/Sub Patterns, and Best Practices for building robust Event-Driven systems.

Stay eventful, and happy coding! 🚀📡
