Hello there!ππ§ββοΈ Welcome back to Part 2 of our Event-Driven Architecture series! In Part 1, we covered the foundational concepts of Message Queues and Topics. Now, we'll dive deeper into Event Streaming, advanced Pub/Sub Patterns, and Best Practices for building robust Event-Driven systems.
Whether you're handling high-throughput event streams, implementing event sourcing, or designing complex event processing workflows, this guide will help you master the advanced aspects of Event-Driven Architecture.
Overview
In this part, we'll explore:
- Event Streaming - Continuous flow of events over time with replay capabilities
- Pub/Sub Patterns - Advanced patterns for event distribution
- Best Practices - Essential patterns and anti-patterns
- Real-World Examples - Putting it all together
Let's dive into Event Streaming, one of the most powerful patterns in Event-Driven Architecture.
1. Event Streaming
Event streaming is a continuous flow of events over time, stored in an append-only log. Think of it like a DVR, recording events are recorded as they happen and can be replayed later.
How Event Streaming Works
- Producers write events to a stream
- Stream stores events in order (append-only log)
- Consumers read events from any position in the stream
- Replay - Consumers can reprocess events from the beginning
Event Streaming Characteristics
- Append-only - Events are never modified, only added
- Ordered - Events maintain their order
- Replayable - Can reprocess events from any point
- Durable - Events persist indefinitely
- Partitioned - Streams can be split into partitions for scalability
Event Streaming Implementation
C# (.NET) - Using Apache Kafka
using Confluent.Kafka;
// Producer
public class OrderEventProducer
{
private readonly IProducer<string, string> _producer;
public OrderEventProducer()
{
var config = new ProducerConfig
{
BootstrapServers = "localhost:9092"
};
_producer = new ProducerBuilder<string, string>(config).Build();
}
public async Task ProduceOrderCreated(OrderCreatedEvent orderCreated)
{
var message = JsonSerializer.Serialize(orderCreated);
var kafkaMessage = new Message<string, string>
{
Key = orderCreated.OrderId.ToString(),
Value = message
};
await _producer.ProduceAsync("order-events", kafkaMessage);
Console.WriteLine($"Produced order created event: {orderCreated.OrderId}");
}
}
// Consumer
public class OrderEventConsumer
{
private readonly IConsumer<string, string> _consumer;
public OrderEventConsumer()
{
var config = new ConsumerConfig
{
BootstrapServers = "localhost:9092",
GroupId = "order-processor",
AutoOffsetReset = AutoOffsetReset.Earliest
};
_consumer = new ConsumerBuilder<string, string>(config).Build();
_consumer.Subscribe("order-events");
}
public void StartConsuming()
{
try
{
while (true)
{
var result = _consumer.Consume(TimeSpan.FromSeconds(1));
if (result != null)
{
var orderCreated = JsonSerializer.Deserialize<OrderCreatedEvent>(result.Message.Value);
ProcessOrder(orderCreated);
// Commit offset
_consumer.Commit(result);
}
}
}
catch (Exception ex)
{
Console.WriteLine($"Error consuming: {ex.Message}");
}
}
private void ProcessOrder(OrderCreatedEvent orderCreated)
{
Console.WriteLine($"Processing order: {orderCreated.OrderId}");
}
}
Python - Using Apache Kafka
from kafka import KafkaProducer, KafkaConsumer
import json
# Producer
class OrderEventProducer:
def __init__(self):
self.producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
def produce_order_created(self, order_created):
self.producer.send('order-events', value=order_created)
self.producer.flush()
print(f"Produced order created event: {order_created['order_id']}")
# Consumer
class OrderEventConsumer:
def __init__(self):
self.consumer = KafkaConsumer(
'order-events',
bootstrap_servers=['localhost:9092'],
group_id='order-processor',
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
auto_offset_reset='earliest'
)
def start_consuming(self):
for message in self.consumer:
order_created = message.value
self.process_order(order_created)
def process_order(self, order_created):
print(f"Processing order: {order_created['order_id']}")
TypeScript (Node.js) - Using Apache Kafka
import { Kafka } from 'kafkajs';
const kafka = new Kafka({
clientId: 'order-service',
brokers: ['localhost:9092']
});
// Producer
class OrderEventProducer {
private producer = kafka.producer();
async connect() {
await this.producer.connect();
}
async produceOrderCreated(orderCreated: OrderCreatedEvent) {
await this.producer.send({
topic: 'order-events',
messages: [{
key: orderCreated.orderId.toString(),
value: JSON.stringify(orderCreated)
}]
});
console.log(`Produced order created event: ${orderCreated.orderId}`);
}
}
// Consumer
class OrderEventConsumer {
private consumer = kafka.consumer({ groupId: 'order-processor' });
async connect() {
await this.consumer.connect();
await this.consumer.subscribe({ topic: 'order-events', fromBeginning: true });
}
async startConsuming() {
await this.consumer.run({
eachMessage: async ({ topic, partition, message }) => {
const orderCreated: OrderCreatedEvent = JSON.parse(message.value?.toString() || '{}');
await this.processOrder(orderCreated);
}
});
}
private async processOrder(orderCreated: OrderCreatedEvent) {
console.log(`Processing order: ${orderCreated.orderId}`);
}
}
Event Streaming Best Practices
β
Use consumer groups - Enable parallel processing and load balancing
β
Handle offsets - Track processing progress
β
Partition wisely - Distribute load across partitions
β
Idempotent producers - Prevent duplicate events
β
Schema registry - Validate event schemas
β Don't lose offsets - Always commit offsets after processing
β Don't ignore ordering - Understand partition ordering guarantees
β Don't process synchronously - Use async processing for better throughput
2. Pub/Sub Patterns
Publish-Subscribe (Pub/Sub) patterns define how events are distributed to subscribers. Let's explore common patterns:
2.1 Fan-Out Pattern
One publisher, many subscribers - All subscribers receive every message.
Publisher β Topic β [Subscriber1, Subscriber2, Subscriber3]
Use Case: Broadcasting notifications to multiple services
Example: When an order is created, notify Email Service, Inventory Service, Analytics Service, and Payment Service simultaneously.
2.2 Topic-Based Pattern
Messages filtered by topic - Subscribers subscribe to specific topics.
Publisher β Topic: "orders" β [OrderService1, OrderService2]
Publisher β Topic: "payments" β [PaymentService1]
Use Case: Organizing events by domain or type
Example: Separate topics for "orders", "payments", "inventory" allow services to subscribe only to relevant events.
2.3 Content-Based Pattern
Messages filtered by content - Subscribers filter based on message content.
Publisher β Topic β [Subscriber1 (filters: amount > 100), Subscriber2 (filters: status = "pending")]
Use Case: Conditional processing based on event data
Example: High-value order processor only processes orders over $1000, while urgent order processor handles orders marked as "urgent".
2.4 Event Sourcing Pattern
Store all events - Maintain complete event history for replay and audit.
// Event Store
public class EventStore
{
private readonly List<Event> _events = new List<Event>();
public void Append(Event evt)
{
_events.Add(evt);
// Also publish to event stream
PublishToStream(evt);
}
public IEnumerable<Event> GetEvents(string aggregateId)
{
return _events.Where(e => e.AggregateId == aggregateId);
}
public T RebuildAggregate<T>(string aggregateId) where T : AggregateRoot, new()
{
var events = GetEvents(aggregateId);
var aggregate = new T();
foreach (var evt in events)
{
aggregate.Apply(evt);
}
return aggregate;
}
}
// Usage
public class OrderAggregate : AggregateRoot
{
public Guid OrderId { get; private set; }
public decimal Total { get; private set; }
public OrderStatus Status { get; private set; }
public void CreateOrder(CreateOrderCommand command)
{
Apply(new OrderCreatedEvent
{
OrderId = command.OrderId,
CustomerId = command.CustomerId,
Total = command.Total
});
}
public void Apply(OrderCreatedEvent evt)
{
OrderId = evt.OrderId;
Total = evt.Total;
Status = OrderStatus.Created;
}
}
Use Case: Audit trails, time travel debugging, rebuilding state
Benefits:
- Complete audit trail
- Ability to replay events
- Time-travel debugging
- Rebuild state from events
Choosing the Right Pattern
| Pattern | Use Case | Example |
|---|---|---|
| Message Queue | Task distribution, one consumer per message | Order processing queue |
| Topics | Broadcasting to multiple services | Order created β Email, Inventory, Analytics |
| Event Streaming | Event sourcing, replay, high throughput | User activity stream, audit log |
| Fan-Out | Broadcasting to all subscribers | System-wide notifications |
| Topic-Based | Organizing by domain | Orders, Payments, Inventory topics |
| Content-Based | Conditional processing | High-value orders, urgent requests |
| Event Sourcing | Complete event history | Audit trails, state rebuilding |
Best Practices for Event-Driven Architecture
General Best Practices
β
Design for failure - Services should handle missing or duplicate events
β
Idempotency - Make operations safe to retry
β
Event versioning - Version your events for backward compatibility
β
Schema evolution - Plan for schema changes
β
Monitoring - Track event throughput, latency, and errors
β
Dead letter queues - Handle failed messages gracefully
β
Event ordering - Understand ordering guarantees
β
Backpressure - Handle slow consumers gracefully
Implementing Idempotency
public class OrderService
{
private readonly IEventStore _eventStore;
public async Task ProcessOrderCreated(OrderCreatedEvent evt)
{
// Check if already processed
if (await _eventStore.HasEventBeenProcessed(evt.EventId))
{
return; // Already processed, skip
}
// Process order
await CreateOrder(evt);
// Mark as processed
await _eventStore.MarkEventAsProcessed(evt.EventId);
}
}
Event Versioning
public class OrderCreatedEventV1
{
public Guid OrderId { get; set; }
public Guid CustomerId { get; set; }
public decimal Total { get; set; }
}
public class OrderCreatedEventV2
{
public Guid OrderId { get; set; }
public Guid CustomerId { get; set; }
public decimal Total { get; set; }
public string Currency { get; set; } // New field
public DateTime CreatedAt { get; set; } // New field
}
// Consumer handles both versions
public class OrderProcessor
{
public void ProcessOrderCreated(object evt)
{
if (evt is OrderCreatedEventV2 v2)
{
ProcessV2(v2);
}
else if (evt is OrderCreatedEventV1 v1)
{
ProcessV1(v1);
}
}
}
Anti-Patterns to Avoid
β Tight coupling - Don't make services depend on specific implementations
β Synchronous calls in async systems - Don't block on event processing
β Ignoring failures - Always handle and retry failures
β No idempotency - Don't assume events are processed exactly once
β Ignoring ordering - Understand when ordering matters
β No monitoring - Monitor event processing health
Real-World Example: E-Commerce System
Let's see how Event-Driven Architecture works in an e-commerce system:
// Order Service (Producer)
public class OrderService
{
private readonly IEventPublisher _eventPublisher;
public async Task<Order> CreateOrder(CreateOrderRequest request)
{
var order = new Order
{
Id = Guid.NewGuid(),
CustomerId = request.CustomerId,
Items = request.Items,
Total = CalculateTotal(request.Items)
};
// Save order
await _orderRepository.Save(order);
// Publish event
await _eventPublisher.PublishAsync(new OrderCreatedEvent
{
OrderId = order.Id,
CustomerId = order.CustomerId,
TotalAmount = order.Total,
Items = order.Items
});
return order;
}
}
// Email Service (Consumer)
public class EmailService : IEventHandler<OrderCreatedEvent>
{
public async Task HandleAsync(OrderCreatedEvent evt)
{
await SendOrderConfirmationEmail(evt.CustomerId, evt.OrderId);
}
}
// Inventory Service (Consumer)
public class InventoryService : IEventHandler<OrderCreatedEvent>
{
public async Task HandleAsync(OrderCreatedEvent evt)
{
foreach (var item in evt.Items)
{
await DecreaseStock(item.ProductId, item.Quantity);
}
}
}
// Payment Service (Consumer)
public class PaymentService : IEventHandler<OrderCreatedEvent>
{
public async Task HandleAsync(OrderCreatedEvent evt)
{
await ProcessPayment(evt.OrderId, evt.TotalAmount);
}
}
// Analytics Service (Consumer)
public class AnalyticsService : IEventHandler<OrderCreatedEvent>
{
public async Task HandleAsync(OrderCreatedEvent evt)
{
await TrackOrderCreated(evt);
}
}
System Flow
-
Order Service creates an order and publishes
OrderCreatedEvent - Email Service receives event β sends confirmation email
- Inventory Service receives event β decreases stock
- Payment Service receives event β processes payment
- Analytics Service receives event β tracks metrics
All services process independently and asynchronously!
Conclusion
Event-Driven Architecture is a powerful pattern for building scalable, resilient, and loosely coupled systems. By understanding Event Streaming, Pub/Sub Patterns, and Best Practices, you can design systems that handle high volumes of events and scale seamlessly.
Key Takeaways:
- Event Streaming - Use for high-throughput, replayable event processing
- Pub/Sub Patterns - Choose the right pattern for your use case
- Event Sourcing - Maintain complete event history for audit and replay
- Design for failure - Always handle failures gracefully
- Idempotency - Make operations safe to retry
- Event versioning - Plan for schema evolution
- Monitor everything - Track event processing health
Remember: Event-Driven Architecture isn't a silver bullet, it adds complexity but provides significant benefits for the right use cases. Use it when you need loose coupling, high scalability, and real-time processing.
Start small, learn from your mistakes, and gradually adopt Event-Driven Architecture where it makes sense. Your systems will thank you!
Stay eventful, and happy coding! ππ‘
Top comments (0)