Event-Driven Architecture Part 2: Event Streaming and Pub/Sub Patterns

Hello there! πŸ‘‹πŸ§”β€β™‚οΈ Welcome back to Part 2 of our Event-Driven Architecture series! In Part 1, we covered the foundational concepts of Message Queues and Topics. Now, we'll dive deeper into Event Streaming, advanced Pub/Sub Patterns, and Best Practices for building robust Event-Driven systems.

Whether you're handling high-throughput event streams, implementing event sourcing, or designing complex event processing workflows, this guide will help you master the advanced aspects of Event-Driven Architecture.

Overview

In this part, we'll explore:

  1. Event Streaming - Continuous flow of events over time with replay capabilities
  2. Pub/Sub Patterns - Advanced patterns for event distribution
  3. Best Practices - Essential patterns and anti-patterns
  4. Real-World Examples - Putting it all together

Let's dive into Event Streaming, one of the most powerful patterns in Event-Driven Architecture.

1. Event Streaming

Event streaming is a continuous flow of events over time, stored in an append-only log. Think of it like a DVR: events are recorded as they happen and can be replayed later.

How Event Streaming Works

  1. Producers write events to a stream
  2. Stream stores events in order (append-only log)
  3. Consumers read events from any position in the stream
  4. Replay - Consumers can reprocess events from the beginning

Event Streaming Characteristics

  • Append-only - Events are never modified, only added
  • Ordered - Events maintain their order
  • Replayable - Can reprocess events from any point
  • Durable - Events are persisted for as long as the retention policy allows, which can be indefinitely
  • Partitioned - Streams can be split into partitions for scalability

Event Streaming Implementation

C# (.NET) - Using Apache Kafka

using Confluent.Kafka;
using System.Text.Json;

// Producer
public class OrderEventProducer
{
    private readonly IProducer<string, string> _producer;

    public OrderEventProducer()
    {
        var config = new ProducerConfig
        {
            BootstrapServers = "localhost:9092"
        };
        _producer = new ProducerBuilder<string, string>(config).Build();
    }

    public async Task ProduceOrderCreated(OrderCreatedEvent orderCreated)
    {
        var message = JsonSerializer.Serialize(orderCreated);

        var kafkaMessage = new Message<string, string>
        {
            Key = orderCreated.OrderId.ToString(),
            Value = message
        };

        await _producer.ProduceAsync("order-events", kafkaMessage);
        Console.WriteLine($"Produced order created event: {orderCreated.OrderId}");
    }
}

// Consumer
public class OrderEventConsumer
{
    private readonly IConsumer<string, string> _consumer;

    public OrderEventConsumer()
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = "localhost:9092",
            GroupId = "order-processor",
            AutoOffsetReset = AutoOffsetReset.Earliest,
            EnableAutoCommit = false // offsets are committed manually after each message is processed
        };
        _consumer = new ConsumerBuilder<string, string>(config).Build();
        _consumer.Subscribe("order-events");
    }

    public void StartConsuming()
    {
        try
        {
            while (true)
            {
                var result = _consumer.Consume(TimeSpan.FromSeconds(1));

                if (result != null)
                {
                    var orderCreated = JsonSerializer.Deserialize<OrderCreatedEvent>(result.Message.Value);
                    ProcessOrder(orderCreated);

                    // Commit offset
                    _consumer.Commit(result);
                }
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Error consuming: {ex.Message}");
        }
        finally
        {
            _consumer.Close(); // leave the consumer group cleanly
        }
    }

    private void ProcessOrder(OrderCreatedEvent orderCreated)
    {
        Console.WriteLine($"Processing order: {orderCreated.OrderId}");
    }
}

Python - Using Apache Kafka

from kafka import KafkaProducer, KafkaConsumer
import json

# Producer
class OrderEventProducer:
    def __init__(self):
        self.producer = KafkaProducer(
            bootstrap_servers=['localhost:9092'],
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )

    def produce_order_created(self, order_created):
        self.producer.send('order-events', value=order_created)
        self.producer.flush()
        print(f"Produced order created event: {order_created['order_id']}")

# Consumer
class OrderEventConsumer:
    def __init__(self):
        self.consumer = KafkaConsumer(
            'order-events',
            bootstrap_servers=['localhost:9092'],
            group_id='order-processor',
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            auto_offset_reset='earliest'
        )

    def start_consuming(self):
        for message in self.consumer:
            order_created = message.value
            self.process_order(order_created)

    def process_order(self, order_created):
        print(f"Processing order: {order_created['order_id']}")

TypeScript (Node.js) - Using Apache Kafka

import { Kafka } from 'kafkajs';

const kafka = new Kafka({
    clientId: 'order-service',
    brokers: ['localhost:9092']
});

// Producer
class OrderEventProducer {
    private producer = kafka.producer();

    async connect() {
        await this.producer.connect();
    }

    async produceOrderCreated(orderCreated: OrderCreatedEvent) {
        await this.producer.send({
            topic: 'order-events',
            messages: [{
                key: orderCreated.orderId.toString(),
                value: JSON.stringify(orderCreated)
            }]
        });
        console.log(`Produced order created event: ${orderCreated.orderId}`);
    }
}

// Consumer
class OrderEventConsumer {
    private consumer = kafka.consumer({ groupId: 'order-processor' });

    async connect() {
        await this.consumer.connect();
        await this.consumer.subscribe({ topic: 'order-events', fromBeginning: true });
    }

    async startConsuming() {
        await this.consumer.run({
            eachMessage: async ({ topic, partition, message }) => {
                const orderCreated: OrderCreatedEvent = JSON.parse(message.value?.toString() || '{}');
                await this.processOrder(orderCreated);
            }
        });
    }

    private async processOrder(orderCreated: OrderCreatedEvent) {
        console.log(`Processing order: ${orderCreated.orderId}`);
    }
}

Event Streaming Best Practices

βœ… Use consumer groups - Enable parallel processing and load balancing
βœ… Handle offsets - Track processing progress
βœ… Partition wisely - Distribute load across partitions
βœ… Idempotent producers - Prevent duplicate events (see the configuration sketch after this list)
βœ… Schema registry - Validate event schemas

❌ Don't lose offsets - Always commit offsets after processing
❌ Don't ignore ordering - Understand partition ordering guarantees
❌ Don't process synchronously - Use async processing for better throughput
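
The idempotent-producer and offset items above map onto concrete Confluent.Kafka settings. Below is a minimal configuration sketch (the broker address, group ID, and topic match the earlier examples and are placeholders): enabling idempotence makes broker-side retries safe, and disabling auto-commit lets you commit offsets only after processing succeeds.

using Confluent.Kafka;

// Producer with idempotence enabled: the broker de-duplicates retried sends,
// so transient errors don't result in duplicate events.
var producerConfig = new ProducerConfig
{
    BootstrapServers = "localhost:9092",
    Acks = Acks.All,          // required for idempotence
    EnableIdempotence = true
};

// Consumer that commits offsets manually, only after a message has been processed,
// so a crash leads to reprocessing (at-least-once) rather than silent loss.
var consumerConfig = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",
    GroupId = "order-processor",
    AutoOffsetReset = AutoOffsetReset.Earliest,
    EnableAutoCommit = false
};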

2. Pub/Sub Patterns

Publish-Subscribe (Pub/Sub) patterns define how events are distributed to subscribers. Let's explore common patterns:

2.1 Fan-Out Pattern

One publisher, many subscribers - All subscribers receive every message.

Publisher β†’ Topic β†’ [Subscriber1, Subscriber2, Subscriber3]

Use Case: Broadcasting notifications to multiple services

Example: When an order is created, notify Email Service, Inventory Service, Analytics Service, and Payment Service simultaneously.
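
With a Kafka-style broker, fan-out is usually achieved by giving each subscribing service its own consumer group: every group receives its own copy of every event on the topic. A minimal sketch, reusing the "order-events" topic from earlier (the group IDs are illustrative):

using Confluent.Kafka;

// Each service subscribes with a distinct GroupId, so each service
// receives every event published to "order-events" (fan-out).
IConsumer<string, string> BuildSubscriber(string groupId)
{
    var config = new ConsumerConfig
    {
        BootstrapServers = "localhost:9092",
        GroupId = groupId, // distinct group per service
        AutoOffsetReset = AutoOffsetReset.Earliest
    };
    var consumer = new ConsumerBuilder<string, string>(config).Build();
    consumer.Subscribe("order-events");
    return consumer;
}

var emailConsumer = BuildSubscriber("email-service");
var inventoryConsumer = BuildSubscriber("inventory-service");
var analyticsConsumer = BuildSubscriber("analytics-service");
var paymentConsumer = BuildSubscriber("payment-service");

Within a single group, partitions are shared between instances for load balancing; across groups, every event is delivered to each group.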

2.2 Topic-Based Pattern

Messages filtered by topic - Subscribers subscribe to specific topics.

Publisher β†’ Topic: "orders" β†’ [OrderService1, OrderService2]
Publisher β†’ Topic: "payments" β†’ [PaymentService1]

Use Case: Organizing events by domain or type

Example: Separate topics for "orders", "payments", "inventory" allow services to subscribe only to relevant events.

2.3 Content-Based Pattern

Messages filtered by content - Subscribers filter based on message content.

Publisher β†’ Topic β†’ [Subscriber1 (filters: amount > 100), Subscriber2 (filters: status = "pending")]

Use Case: Conditional processing based on event data

Example: A high-value order processor only handles orders over $1,000, while an urgent-order processor handles orders marked as "urgent".
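
Kafka itself doesn't filter by payload on the broker side (some brokers can, e.g. RabbitMQ header exchanges or Azure Service Bus SQL filters), so content-based routing is often implemented in the subscriber. A minimal subscriber-side sketch, assuming the OrderCreatedEvent used throughout this post exposes OrderId and Total:

using System;
using System.Threading.Tasks;

// Subscriber-side content filter: only orders above a threshold are processed;
// everything else is acknowledged and skipped.
public class HighValueOrderProcessor
{
    private const decimal Threshold = 1000m;

    public Task HandleAsync(OrderCreatedEvent evt)
    {
        if (evt.Total < Threshold)
        {
            return Task.CompletedTask; // not our concern: skip, but still commit the offset
        }

        Console.WriteLine($"Handling high-value order {evt.OrderId}");
        return ProcessHighValueOrder(evt);
    }

    private Task ProcessHighValueOrder(OrderCreatedEvent evt)
    {
        // Fraud checks, manual-review queue, etc.
        return Task.CompletedTask;
    }
}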

2.4 Event Sourcing Pattern

Store all events - Maintain complete event history for replay and audit.

using System.Linq;

// Event Store (simplified in-memory version; a production store would persist events durably)
public class EventStore
{
    private readonly List<Event> _events = new List<Event>();

    public void Append(Event evt)
    {
        _events.Add(evt);
        // Also publish to event stream
        PublishToStream(evt);
    }

    public IEnumerable<Event> GetEvents(string aggregateId)
    {
        return _events.Where(e => e.AggregateId == aggregateId);
    }

    public T RebuildAggregate<T>(string aggregateId) where T : AggregateRoot, new()
    {
        var events = GetEvents(aggregateId);
        var aggregate = new T();

        foreach (var evt in events)
        {
            // Assumes the AggregateRoot base class routes each event to the matching typed Apply overload (e.g. via dynamic dispatch)
            aggregate.Apply(evt);
        }

        return aggregate;
    }
}

// Usage
public class OrderAggregate : AggregateRoot
{
    public Guid OrderId { get; private set; }
    public decimal Total { get; private set; }
    public OrderStatus Status { get; private set; }

    public void CreateOrder(CreateOrderCommand command)
    {
        // In a full implementation, applying the event would also track it as uncommitted so it can be appended to the EventStore
        Apply(new OrderCreatedEvent
        {
            OrderId = command.OrderId,
            CustomerId = command.CustomerId,
            Total = command.Total
        });
    }

    public void Apply(OrderCreatedEvent evt)
    {
        OrderId = evt.OrderId;
        Total = evt.Total;
        Status = OrderStatus.Created;
    }
}

Use Case: Audit trails, time travel debugging, rebuilding state

Benefits:

  • Complete audit trail
  • Ability to replay events
  • Time-travel debugging
  • Rebuild state from events

Choosing the Right Pattern

| Pattern | Use Case | Example |
| --- | --- | --- |
| Message Queue | Task distribution, one consumer per message | Order processing queue |
| Topics | Broadcasting to multiple services | Order created β†’ Email, Inventory, Analytics |
| Event Streaming | Event sourcing, replay, high throughput | User activity stream, audit log |
| Fan-Out | Broadcasting to all subscribers | System-wide notifications |
| Topic-Based | Organizing by domain | Orders, Payments, Inventory topics |
| Content-Based | Conditional processing | High-value orders, urgent requests |
| Event Sourcing | Complete event history | Audit trails, state rebuilding |

Best Practices for Event-Driven Architecture

General Best Practices

βœ… Design for failure - Services should handle missing or duplicate events
βœ… Idempotency - Make operations safe to retry
βœ… Event versioning - Version your events for backward compatibility
βœ… Schema evolution - Plan for schema changes
βœ… Monitoring - Track event throughput, latency, and errors
βœ… Dead letter queues - Handle failed messages gracefully (a retry-then-DLQ sketch follows this list)
βœ… Event ordering - Understand ordering guarantees
βœ… Backpressure - Handle slow consumers gracefully
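
For the dead-letter-queue item, a common shape is to retry a handful of times and, if processing still fails, publish the original payload plus error metadata to a separate "-dlq" topic for later inspection or replay. A minimal sketch, reusing the Confluent.Kafka producer and the (assumed) OrderCreatedEvent from earlier; the topic name and retry policy are illustrative:

using System;
using System.Text.Json;
using System.Threading.Tasks;
using Confluent.Kafka;

public class DeadLetteringOrderHandler
{
    private const int MaxAttempts = 3;
    private readonly IProducer<string, string> _dlqProducer;

    public DeadLetteringOrderHandler(IProducer<string, string> dlqProducer) => _dlqProducer = dlqProducer;

    public async Task HandleAsync(OrderCreatedEvent evt)
    {
        for (var attempt = 1; attempt <= MaxAttempts; attempt++)
        {
            try
            {
                await ProcessOrder(evt);
                return; // success: stop retrying
            }
            catch (Exception) when (attempt < MaxAttempts)
            {
                // Transient failure: back off and try again
                await Task.Delay(TimeSpan.FromSeconds(Math.Pow(2, attempt)));
            }
            catch (Exception ex)
            {
                // Retries exhausted: park the event on a dead letter topic with error context
                var dlqPayload = JsonSerializer.Serialize(new
                {
                    OriginalEvent = evt,
                    Error = ex.Message,
                    FailedAtUtc = DateTime.UtcNow
                });
                await _dlqProducer.ProduceAsync("order-events-dlq",
                    new Message<string, string> { Key = evt.OrderId.ToString(), Value = dlqPayload });
                return;
            }
        }
    }

    private Task ProcessOrder(OrderCreatedEvent evt) => Task.CompletedTask; // real processing goes here
}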

Implementing Idempotency

public class OrderService
{
    private readonly IEventStore _eventStore;

    public async Task ProcessOrderCreated(OrderCreatedEvent evt)
    {
        // Check if already processed
        if (await _eventStore.HasEventBeenProcessed(evt.EventId))
        {
            return; // Already processed, skip
        }

        // Process order
        await CreateOrder(evt);

        // Mark as processed (ideally in the same transaction as CreateOrder, so the check-and-mark pair is atomic)
        await _eventStore.MarkEventAsProcessed(evt.EventId);
    }
}

Event Versioning

public class OrderCreatedEventV1
{
    public Guid OrderId { get; set; }
    public Guid CustomerId { get; set; }
    public decimal Total { get; set; }
}

public class OrderCreatedEventV2
{
    public Guid OrderId { get; set; }
    public Guid CustomerId { get; set; }
    public decimal Total { get; set; }
    public string Currency { get; set; } // New field
    public DateTime CreatedAt { get; set; } // New field
}

// Consumer handles both versions
public class OrderProcessor
{
    public void ProcessOrderCreated(object evt)
    {
        if (evt is OrderCreatedEventV2 v2)
        {
            ProcessV2(v2);
        }
        else if (evt is OrderCreatedEventV1 v1)
        {
            ProcessV1(v1);
        }
    }
}
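
An alternative to branching on every version in each consumer is to upcast old events to the latest shape at the deserialization boundary, so the rest of the service only ever sees V2. A minimal sketch based on the V1/V2 classes above; the default currency and timestamp are assumptions you would replace with whatever V1 implicitly meant:

using System;

public static class OrderCreatedUpcaster
{
    // Converts a V1 event into the current V2 shape, filling the new fields with explicit defaults.
    public static OrderCreatedEventV2 Upcast(OrderCreatedEventV1 v1) => new OrderCreatedEventV2
    {
        OrderId = v1.OrderId,
        CustomerId = v1.CustomerId,
        Total = v1.Total,
        Currency = "USD",           // assumed default for events produced before Currency existed
        CreatedAt = DateTime.UtcNow // or a timestamp carried in message metadata, if available
    };
}

// Consumers then handle only the latest version:
// var v2 = evt is OrderCreatedEventV1 v1 ? OrderCreatedUpcaster.Upcast(v1) : (OrderCreatedEventV2)evt;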

Anti-Patterns to Avoid

❌ Tight coupling - Don't make services depend on specific implementations
❌ Synchronous calls in async systems - Don't block on event processing
❌ Ignoring failures - Always handle and retry failures
❌ No idempotency - Don't assume events are processed exactly once
❌ Ignoring ordering - Understand when ordering matters
❌ No monitoring - Monitor event processing health

Real-World Example: E-Commerce System

Let's see how Event-Driven Architecture works in an e-commerce system:

// Order Service (Producer)
public class OrderService
{
    private readonly IOrderRepository _orderRepository; // repository abstraction assumed for the Save call below
    private readonly IEventPublisher _eventPublisher;

    public async Task<Order> CreateOrder(CreateOrderRequest request)
    {
        var order = new Order
        {
            Id = Guid.NewGuid(),
            CustomerId = request.CustomerId,
            Items = request.Items,
            Total = CalculateTotal(request.Items)
        };

        // Save order
        await _orderRepository.Save(order);

        // Publish event
        await _eventPublisher.PublishAsync(new OrderCreatedEvent
        {
            OrderId = order.Id,
            CustomerId = order.CustomerId,
            TotalAmount = order.Total,
            Items = order.Items
        });

        return order;
    }
}

// Email Service (Consumer)
public class EmailService : IEventHandler<OrderCreatedEvent>
{
    public async Task HandleAsync(OrderCreatedEvent evt)
    {
        await SendOrderConfirmationEmail(evt.CustomerId, evt.OrderId);
    }
}

// Inventory Service (Consumer)
public class InventoryService : IEventHandler<OrderCreatedEvent>
{
    public async Task HandleAsync(OrderCreatedEvent evt)
    {
        foreach (var item in evt.Items)
        {
            await DecreaseStock(item.ProductId, item.Quantity);
        }
    }
}

// Payment Service (Consumer)
public class PaymentService : IEventHandler<OrderCreatedEvent>
{
    public async Task HandleAsync(OrderCreatedEvent evt)
    {
        await ProcessPayment(evt.OrderId, evt.TotalAmount);
    }
}

// Analytics Service (Consumer)
public class AnalyticsService : IEventHandler<OrderCreatedEvent>
{
    public async Task HandleAsync(OrderCreatedEvent evt)
    {
        await TrackOrderCreated(evt);
    }
}

System Flow

  1. Order Service creates an order and publishes OrderCreatedEvent
  2. Email Service receives event β†’ sends confirmation email
  3. Inventory Service receives event β†’ decreases stock
  4. Payment Service receives event β†’ processes payment
  5. Analytics Service receives event β†’ tracks metrics

All services process independently and asynchronously!
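
The example above relies on IEventPublisher and IEventHandler<T> without showing them. One possible shape is sketched below as a naive in-memory dispatcher; in production the publish side would hand the event to a broker (Kafka, RabbitMQ, etc.) instead of invoking handlers directly, and these interfaces are assumptions rather than a specific framework's API.

using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public interface IEventHandler<TEvent>
{
    Task HandleAsync(TEvent evt);
}

public interface IEventPublisher
{
    Task PublishAsync<TEvent>(TEvent evt);
}

// Naive in-memory publisher: fans each published event out to every registered handler.
public class InMemoryEventPublisher : IEventPublisher
{
    private readonly Dictionary<Type, List<Func<object, Task>>> _handlers = new();

    public void Subscribe<TEvent>(IEventHandler<TEvent> handler)
    {
        if (!_handlers.TryGetValue(typeof(TEvent), out var list))
        {
            list = new List<Func<object, Task>>();
            _handlers[typeof(TEvent)] = list;
        }
        list.Add(evt => handler.HandleAsync((TEvent)evt));
    }

    public async Task PublishAsync<TEvent>(TEvent evt)
    {
        if (_handlers.TryGetValue(typeof(TEvent), out var handlers))
        {
            foreach (var handle in handlers)
            {
                await handle(evt!); // every subscriber gets the event (fan-out)
            }
        }
    }
}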

Conclusion

Event-Driven Architecture is a powerful pattern for building scalable, resilient, and loosely coupled systems. By understanding Event Streaming, Pub/Sub Patterns, and Best Practices, you can design systems that handle high volumes of events and scale seamlessly.

Key Takeaways:

  1. Event Streaming - Use for high-throughput, replayable event processing
  2. Pub/Sub Patterns - Choose the right pattern for your use case
  3. Event Sourcing - Maintain complete event history for audit and replay
  4. Design for failure - Always handle failures gracefully
  5. Idempotency - Make operations safe to retry
  6. Event versioning - Plan for schema evolution
  7. Monitor everything - Track event processing health

Remember: Event-Driven Architecture isn't a silver bullet. It adds complexity, but it provides significant benefits for the right use cases. Use it when you need loose coupling, high scalability, and real-time processing.

Start small, learn from your mistakes, and gradually adopt Event-Driven Architecture where it makes sense. Your systems will thank you!

Stay eventful, and happy coding! πŸš€πŸ“‘
