Hello there!ππ§ββοΈ Have you ever wondered how modern applications handle millions of events, scale seamlessly, and stay responsive under heavy load? That's where Event-Driven Architecture (EDA) comes in! This is the first part of a series on EDA, where we'll explore the foundational concepts of message queues and topics.
Whether you're building microservices, handling real-time data streams, or designing systems that need to react to events as they happen, understanding Event-Driven Architecture will help you build better, more scalable applications.
Overview
Event-Driven Architecture is like a postal system for your applicationβservices communicate by sending and receiving messages (events) rather than calling each other directly. This creates systems that are:
- Loosely coupled - Services don't need to know about each other
- Scalable - Can handle high volumes of events
- Resilient - Failures in one service don't cascade to others
- Responsive - React to events as they happen
In this first part, we'll explore:
- Message Queues - Point-to-point messaging for reliable delivery
- Topics - Publish-subscribe messaging for broadcasting events
Let's start by understanding what Event-Driven Architecture is and why it matters.
What is Event-Driven Architecture?
Event-Driven Architecture (EDA) is an architectural pattern where services communicate by producing and consuming events. Instead of services calling each other directly (synchronous communication), services emit events that other services can listen to (asynchronous communication).
Key Concepts
Event - A notification that something has happened (e.g., "OrderCreated", "UserRegistered", "PaymentProcessed")
Producer - A service that creates and publishes events
Consumer - A service that listens for and processes events
Event Broker - Middleware that routes events from producers to consumers (message queues, topics, event streams)
Real-World Analogy
Think of Event-Driven Architecture like a newsroom:
- Reporters (producers) write stories (events) and submit them
- The news desk (event broker) organizes and distributes stories
- Editors (consumers) pick up stories they're interested in and work on them
- Multiple editors can work on the same story simultaneously
- If one editor is busy, others can still process stories
Benefits of Event-Driven Architecture
β
Loose Coupling - Services don't need direct knowledge of each other
β
Scalability - Can handle high volumes of events
β
Resilience - Failures are isolated to individual services
β
Flexibility - Easy to add new consumers without modifying producers
β
Real-time Processing - React to events as they happen
β
Better Performance - Asynchronous processing doesn't block
When to Use Event-Driven Architecture
- High volume of events - Systems generating many events per second
- Microservices - Services need to communicate without tight coupling
- Real-time requirements - Need to react to events immediately
- Event sourcing - Need to maintain event history
- Integration - Connecting multiple systems or services
1. Message Queues
Message queues provide point-to-point messagingβeach message is delivered to exactly one consumer. Think of it like a queue at a bank: each person (message) is served by one teller (consumer).
How Message Queues Work
- Producer sends a message to a queue
- Queue stores the message
- Consumer retrieves and processes the message
- Acknowledgment confirms message was processed successfully
Message Queue Characteristics
- Point-to-point - One message, one consumer
- FIFO - First In, First Out (usually)
- Durable - Messages persist until consumed
- Reliable - Guaranteed delivery (with acknowledgment)
Message Queue Implementation
C# (.NET) - Using RabbitMQ
using RabbitMQ.Client;
using System.Text;
using System.Text.Json;
// Producer
public class OrderService
{
private readonly IConnection _connection;
private readonly IModel _channel;
public OrderService()
{
var factory = new ConnectionFactory() { HostName = "localhost" };
_connection = factory.CreateConnection();
_channel = _connection.CreateModel();
_channel.QueueDeclare(queue: "orders", durable: true, exclusive: false, autoDelete: false);
}
public void CreateOrder(Order order)
{
// Business logic to create order
var orderCreated = new OrderCreatedEvent
{
OrderId = order.Id,
CustomerId = order.CustomerId,
TotalAmount = order.Total,
CreatedAt = DateTime.UtcNow
};
// Publish event to queue
var message = JsonSerializer.Serialize(orderCreated);
var body = Encoding.UTF8.GetBytes(message);
var properties = _channel.CreateBasicProperties();
properties.Persistent = true; // Make message durable
_channel.BasicPublish(exchange: "", routingKey: "orders", basicProperties: properties, body: body);
Console.WriteLine($"Published order created event: {order.Id}");
}
}
// Consumer
public class OrderProcessor
{
private readonly IConnection _connection;
private readonly IModel _channel;
public OrderProcessor()
{
var factory = new ConnectionFactory() { HostName = "localhost" };
_connection = factory.CreateConnection();
_channel = _connection.CreateModel();
_channel.QueueDeclare(queue: "orders", durable: true, exclusive: false, autoDelete: false);
}
public void StartConsuming()
{
var consumer = new EventingBasicConsumer(_channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
var orderCreated = JsonSerializer.Deserialize<OrderCreatedEvent>(message);
try
{
// Process the order
ProcessOrder(orderCreated);
// Acknowledge message
_channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
}
catch (Exception ex)
{
// Reject and requeue on failure
_channel.BasicNack(deliveryTag: ea.DeliveryTag, multiple: false, requeue: true);
Console.WriteLine($"Error processing order: {ex.Message}");
}
};
_channel.BasicConsume(queue: "orders", autoAck: false, consumer: consumer);
}
private void ProcessOrder(OrderCreatedEvent orderCreated)
{
// Process order logic
Console.WriteLine($"Processing order: {orderCreated.OrderId}");
}
}
Python - Using RabbitMQ
import pika
import json
from datetime import datetime
# Producer
class OrderService:
def __init__(self):
self.connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
self.channel = self.connection.channel()
self.channel.queue_declare(queue='orders', durable=True)
def create_order(self, order):
# Business logic to create order
order_created = {
'order_id': order['id'],
'customer_id': order['customer_id'],
'total_amount': order['total'],
'created_at': datetime.utcnow().isoformat()
}
# Publish event to queue
message = json.dumps(order_created)
self.channel.basic_publish(
exchange='',
routing_key='orders',
body=message,
properties=pika.BasicProperties(
delivery_mode=2, # Make message persistent
)
)
print(f"Published order created event: {order['id']}")
# Consumer
class OrderProcessor:
def __init__(self):
self.connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
self.channel = self.connection.channel()
self.channel.queue_declare(queue='orders', durable=True)
def start_consuming(self):
def callback(ch, method, properties, body):
try:
order_created = json.loads(body)
self.process_order(order_created)
# Acknowledge message
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
# Reject and requeue on failure
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
print(f"Error processing order: {e}")
self.channel.basic_consume(queue='orders', on_message_callback=callback, auto_ack=False)
self.channel.start_consuming()
def process_order(self, order_created):
# Process order logic
print(f"Processing order: {order_created['order_id']}")
TypeScript (Node.js) - Using RabbitMQ
import amqp from 'amqplib';
// Producer
class OrderService {
private connection: amqp.Connection | null = null;
private channel: amqp.Channel | null = null;
async connect() {
this.connection = await amqp.connect('amqp://localhost');
this.channel = await this.connection.createChannel();
await this.channel.assertQueue('orders', { durable: true });
}
async createOrder(order: Order) {
// Business logic to create order
const orderCreated: OrderCreatedEvent = {
orderId: order.id,
customerId: order.customerId,
totalAmount: order.total,
createdAt: new Date()
};
// Publish event to queue
const message = JSON.stringify(orderCreated);
this.channel?.sendToQueue('orders', Buffer.from(message), {
persistent: true
});
console.log(`Published order created event: ${order.id}`);
}
}
// Consumer
class OrderProcessor {
private connection: amqp.Connection | null = null;
private channel: amqp.Channel | null = null;
async connect() {
this.connection = await amqp.connect('amqp://localhost');
this.channel = await this.connection.createChannel();
await this.channel.assertQueue('orders', { durable: true });
}
async startConsuming() {
await this.channel?.consume('orders', async (msg) => {
if (!msg) return;
try {
const orderCreated: OrderCreatedEvent = JSON.parse(msg.content.toString());
await this.processOrder(orderCreated);
// Acknowledge message
this.channel?.ack(msg);
} catch (error) {
// Reject and requeue on failure
this.channel?.nack(msg, false, true);
console.error(`Error processing order: ${error}`);
}
});
}
private async processOrder(orderCreated: OrderCreatedEvent) {
// Process order logic
console.log(`Processing order: ${orderCreated.orderId}`);
}
}
Message Queue Best Practices
β
Use durable queues - Survive broker restarts
β
Acknowledge messages - Confirm successful processing
β
Handle failures - Implement retry logic and dead letter queues
β
Idempotency - Make operations safe to retry
β
Message versioning - Version your event schemas
β Don't ignore acknowledgments - Always acknowledge processed messages
β Don't process synchronously - Use async processing for better performance
β Don't lose messages - Use persistent queues and acknowledgments
2. Topics (Publish-Subscribe)
Topics provide publish-subscribe messaging. Each message is delivered to all subscribers. Think of it like a radio station: the broadcaster (publisher) sends a signal, and all listeners (subscribers) tuned to that frequency receive it.
How Topics Work
- Publisher sends a message to a topic
- Topic routes the message to all subscribers
- Multiple subscribers receive the same message
- Each subscriber processes independently
Topic Characteristics
- One-to-many - One message, multiple consumers
- Broadcast - All subscribers receive the message
- Decoupled - Publishers don't know about subscribers
- Scalable - Easy to add new subscribers
Topic Implementation
C# (.NET) - Using RabbitMQ Topics
// Publisher
public class OrderPublisher
{
private readonly IConnection _connection;
private readonly IModel _channel;
public OrderPublisher()
{
var factory = new ConnectionFactory() { HostName = "localhost" };
_connection = factory.CreateConnection();
_channel = _connection.CreateModel();
// Declare topic exchange
_channel.ExchangeDeclare(exchange: "order_events", type: ExchangeType.Topic);
}
public void PublishOrderCreated(OrderCreatedEvent orderCreated)
{
var message = JsonSerializer.Serialize(orderCreated);
var body = Encoding.UTF8.GetBytes(message);
// Publish to topic with routing key
_channel.BasicPublish(
exchange: "order_events",
routingKey: "order.created",
basicProperties: null,
body: body
);
}
}
// Subscriber 1: Email Service
public class EmailService
{
private readonly IConnection _connection;
private readonly IModel _channel;
public EmailService()
{
var factory = new ConnectionFactory() { HostName = "localhost" };
_connection = factory.CreateConnection();
_channel = _connection.CreateModel();
_channel.ExchangeDeclare(exchange: "order_events", type: ExchangeType.Topic);
// Create queue for this service
var queueName = _channel.QueueDeclare().QueueName;
// Bind queue to topic with routing key pattern
_channel.QueueBind(queue: queueName, exchange: "order_events", routingKey: "order.created");
var consumer = new EventingBasicConsumer(_channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
var orderCreated = JsonSerializer.Deserialize<OrderCreatedEvent>(message);
SendOrderConfirmationEmail(orderCreated);
_channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
_channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
}
private void SendOrderConfirmationEmail(OrderCreatedEvent orderCreated)
{
Console.WriteLine($"Sending confirmation email for order: {orderCreated.OrderId}");
}
}
// Subscriber 2: Inventory Service
public class InventoryService
{
private readonly IConnection _connection;
private readonly IModel _channel;
public InventoryService()
{
var factory = new ConnectionFactory() { HostName = "localhost" };
_connection = factory.CreateConnection();
_channel = _connection.CreateModel();
_channel.ExchangeDeclare(exchange: "order_events", type: ExchangeType.Topic);
var queueName = _channel.QueueDeclare().QueueName;
_channel.QueueBind(queue: queueName, exchange: "order_events", routingKey: "order.created");
var consumer = new EventingBasicConsumer(_channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
var orderCreated = JsonSerializer.Deserialize<OrderCreatedEvent>(message);
UpdateInventory(orderCreated);
_channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
_channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
}
private void UpdateInventory(OrderCreatedEvent orderCreated)
{
Console.WriteLine($"Updating inventory for order: {orderCreated.OrderId}");
}
}
Python - Using RabbitMQ Topics
import pika
import json
# Publisher
class OrderPublisher:
def __init__(self):
self.connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
self.channel = self.connection.channel()
self.channel.exchange_declare(exchange='order_events', exchange_type='topic')
def publish_order_created(self, order_created):
message = json.dumps(order_created)
self.channel.basic_publish(
exchange='order_events',
routing_key='order.created',
body=message
)
# Subscriber: Email Service
class EmailService:
def __init__(self):
self.connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
self.channel = self.connection.channel()
self.channel.exchange_declare(exchange='order_events', exchange_type='topic')
queue_name = self.channel.queue_declare(queue='', exclusive=True).method.queue
self.channel.queue_bind(exchange='order_events', queue=queue_name, routing_key='order.created')
self.channel.basic_consume(queue=queue_name, on_message_callback=self.callback, auto_ack=True)
def callback(self, ch, method, properties, body):
order_created = json.loads(body)
self.send_order_confirmation_email(order_created)
def send_order_confirmation_email(self, order_created):
print(f"Sending confirmation email for order: {order_created['order_id']}")
def start_consuming(self):
self.channel.start_consuming()
TypeScript (Node.js) - Using RabbitMQ Topics
import amqp from 'amqplib';
// Publisher
class OrderPublisher {
private connection: amqp.Connection | null = null;
private channel: amqp.Channel | null = null;
async connect() {
this.connection = await amqp.connect('amqp://localhost');
this.channel = await this.connection.createChannel();
await this.channel.assertExchange('order_events', 'topic', { durable: false });
}
async publishOrderCreated(orderCreated: OrderCreatedEvent) {
const message = JSON.stringify(orderCreated);
this.channel?.publish('order_events', 'order.created', Buffer.from(message));
}
}
// Subscriber: Email Service
class EmailService {
private connection: amqp.Connection | null = null;
private channel: amqp.Channel | null = null;
async connect() {
this.connection = await amqp.connect('amqp://localhost');
this.channel = await this.connection.createChannel();
await this.channel.assertExchange('order_events', 'topic', { durable: false });
const queue = await this.channel.assertQueue('', { exclusive: true });
await this.channel.bindQueue(queue.queue, 'order_events', 'order.created');
await this.channel.consume(queue.queue, (msg) => {
if (!msg) return;
const orderCreated: OrderCreatedEvent = JSON.parse(msg.content.toString());
this.sendOrderConfirmationEmail(orderCreated);
this.channel?.ack(msg);
});
}
private sendOrderConfirmationEmail(orderCreated: OrderCreatedEvent) {
console.log(`Sending confirmation email for order: ${orderCreated.orderId}`);
}
}
Topic Best Practices
β
Use routing keys - Organize events by type (e.g., "order.created", "order.cancelled")
β
Wildcard patterns - Use patterns like "order." to subscribe to all order events
β
**Separate queues per service* - Each service should have its own queue
β
Durable exchanges - Survive broker restarts
β
Version events - Include version in routing key or message
β Don't use one queue for all subscribers - Each subscriber needs its own queue
β Don't ignore routing keys - Use them to filter relevant events
β Don't couple publishers to subscribers - Publishers shouldn't know about subscribers
Message Queue vs Topics: When to Use What?
| Feature | Message Queue | Topics |
|---|---|---|
| Delivery | One consumer per message | All subscribers receive message |
| Use Case | Task distribution | Broadcasting events |
| Example | Order processing queue | Order created β Email, Inventory, Analytics |
| Coupling | Producer knows consumer exists | Producer doesn't know subscribers |
Message Queues are best when:
- You need to distribute work among multiple workers
- Each message should be processed exactly once
- You want load balancing across consumers
Topics are best when:
- You need to broadcast events to multiple services
- Multiple services need to react to the same event
- You want loose coupling between producers and consumers
Conclusion
Message queues and topics are foundational concepts in Event-Driven Architecture. By understanding when to use each pattern, you can design systems that are loosely coupled, scalable, and resilient.
Key Takeaways:
- Message Queues - Use for point-to-point messaging and task distribution
- Topics - Use for broadcasting events to multiple subscribers
- Choose wisely - Each pattern has its strengths and use cases
- Best practices matter - Follow patterns for durability, reliability, and scalability
In Part 2, we'll explore Event Streaming, advanced Pub/Sub Patterns, and Best Practices for building robust Event-Driven systems.
Stay eventful, and happy coding! ππ‘
Top comments (0)