# Event-Driven Architecture Kit
A comprehensive collection of patterns, schemas, and infrastructure templates for building event-driven systems. This kit covers event sourcing, CQRS, and pub/sub architectures with concrete implementations using Kafka, SQS/SNS, and Azure Event Grid. Each pattern includes schema definitions (Avro/JSON Schema), dead-letter queue handling, exactly-once semantics strategies, and monitoring dashboards. Move beyond request-response architectures and build systems that are decoupled, scalable, and resilient by design.
## Key Features
- 12 Event-Driven Patterns — Fan-out, event sourcing, CQRS, saga orchestration, claim-check, content-based routing, and more
- Multi-Broker Support — Implementations for Apache Kafka, AWS SQS/SNS, Azure Event Grid, and GCP Pub/Sub
- Schema Registry — Avro and JSON Schema definitions with compatibility validation and versioning strategy
- Dead-Letter Queue Handling — DLQ configurations, retry policies, and poison message handlers for every broker
- Saga Orchestration — Distributed transaction patterns with compensation logic and state machine definitions
- Exactly-Once Semantics — Idempotency key strategies, transactional outbox pattern, and deduplication middleware
- Observability — Distributed tracing, event flow dashboards, and consumer lag alerting configurations
- Infrastructure as Code — Terraform modules for provisioning messaging infrastructure across clouds
## Quick Start

```bash
# Deploy Kafka topic infrastructure
cd src/terraform/kafka
terraform init
terraform apply -var="environment=staging"

# Deploy the AWS SNS/SQS fan-out pattern (from the repo root)
cd src/terraform/aws-sns-sqs
terraform apply -var-file="configs/fan-out.tfvars"

# Run the event schema validator
python3 src/tools/schema_validator.py \
  --schema schemas/order-events/v1/order-placed.avsc \
  --payload examples/order-placed.json
```
## Architecture

```
Event-Driven Architecture Patterns

Fan-Out Pattern:

  ┌──────────┐    ┌───────┐    ┌──────────────────────┐
  │ Producer │───►│ Topic │───►│ Consumer A (Email)   │
  │ (Order   │    │ (SNS/ │───►│ Consumer B (Billing) │
  │ Service) │    │ Kafka)│───►│ Consumer C (Audit)   │
  └──────────┘    └───────┘    └──────────────────────┘

CQRS + Event Sourcing:

  ┌──────────┐    ┌───────────┐    ┌──────────────────┐
  │ Command  │───►│ Event     │───►│ Read Model       │
  │ Handler  │    │ Store     │    │ Projector        │
  └────┬─────┘    └───────────┘    └───────┬──────────┘
       │                                   │
       │    ┌───────────┐          ┌───────▼──────────┐
       └───►│ Command   │          │ Query Database   │
            │ Database  │          │ (read-optimized) │
            └───────────┘          └──────────────────┘

Saga (Orchestration):

  ┌──────┐   ┌──────┐   ┌──────┐   ┌──────┐
  │Step 1│──►│Step 2│──►│Step 3│──►│ Done │
  │Order │   │Pay   │   │Ship  │   │      │
  └──┬───┘   └──┬───┘   └──┬───┘   └──────┘
     │ fail     │ fail     │ fail
     ▼          ▼          ▼
  Compensate Compensate Compensate
```
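The saga flow in the diagram can be sketched as a small orchestrator: steps run in order, and when one fails, the compensations for the steps that already completed run in reverse. This is an illustrative sketch, not the kit's implementation; the names `SagaStep` and `run_saga` are hypothetical.

```python
from dataclasses import dataclass
from typing import Callable

@dataclass
class SagaStep:
    name: str
    action: Callable[[dict], None]      # forward operation (e.g. charge payment)
    compensate: Callable[[dict], None]  # undo, invoked if a later step fails

def run_saga(steps: list[SagaStep], ctx: dict) -> bool:
    """Run steps in order; on failure, compensate completed steps in reverse."""
    completed: list[SagaStep] = []
    for step in steps:
        try:
            step.action(ctx)
            completed.append(step)
        except Exception:
            # Roll back everything that succeeded so far, newest first
            for done in reversed(completed):
                done.compensate(ctx)
            return False
    return True
```

A real orchestrator would persist the saga state between steps so it can resume or compensate after a crash; the state machine definitions in the kit cover that part.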
## Usage Examples

### Event Schema — Order Placed (Avro)

```json
{
  "type": "record",
  "name": "OrderPlaced",
  "namespace": "com.acme.orders.events",
  "doc": "Published when a customer completes checkout",
  "fields": [
    {"name": "event_id", "type": "string", "doc": "Unique event identifier (UUID)"},
    {"name": "event_time", "type": "long", "doc": "Unix timestamp in milliseconds"},
    {"name": "order_id", "type": "string"},
    {"name": "customer_id", "type": "string"},
    {"name": "items", "type": {"type": "array", "items": {
      "type": "record", "name": "OrderItem", "fields": [
        {"name": "product_id", "type": "string"},
        {"name": "quantity", "type": "int"},
        {"name": "unit_price", "type": "double"}
      ]
    }}},
    {"name": "total_amount", "type": "double"},
    {"name": "currency", "type": "string", "default": "USD"}
  ]
}
```
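In production you would validate payloads with a schema-registry client or a library such as fastavro, but the core idea of schema-driven validation fits in a few lines. This stdlib-only sketch (the helper `check_required_fields` is illustrative, not part of the kit) checks the top-level fields of a payload against a schema like the one above, letting defaulted fields be omitted:

```python
# Map primitive Avro types to acceptable Python types (partial, for illustration)
AVRO_TO_PY = {"string": str, "long": int, "int": int, "double": (int, float)}

def check_required_fields(schema: dict, payload: dict) -> list[str]:
    """Return a list of problems; an empty list means the payload passes."""
    problems = []
    for field in schema["fields"]:
        name, ftype = field["name"], field["type"]
        if name not in payload:
            if "default" not in field:  # defaulted fields may be omitted
                problems.append(f"missing field: {name}")
            continue
        expected = AVRO_TO_PY.get(ftype) if isinstance(ftype, str) else None
        if expected and not isinstance(payload[name], expected):
            problems.append(f"wrong type for {name}")
    return problems
```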
### SQS/SNS Fan-Out with Dead-Letter Queue

```hcl
# src/terraform/aws-sns-sqs/fan-out.tf
resource "aws_sns_topic" "order_events" {
  name = "${var.project}-order-events"
}

resource "aws_sqs_queue" "email_notifications" {
  name                       = "${var.project}-email-notifications"
  visibility_timeout_seconds = 300
  message_retention_seconds  = 1209600 # 14 days

  redrive_policy = jsonencode({
    deadLetterTargetArn = aws_sqs_queue.email_dlq.arn
    maxReceiveCount     = 3 # Move to DLQ after 3 failed receives
  })
}

resource "aws_sqs_queue" "email_dlq" {
  name                      = "${var.project}-email-notifications-dlq"
  message_retention_seconds = 1209600
}

resource "aws_sns_topic_subscription" "email_sub" {
  topic_arn = aws_sns_topic.order_events.arn
  protocol  = "sqs"
  endpoint  = aws_sqs_queue.email_notifications.arn

  filter_policy = jsonencode({
    event_type = ["OrderPlaced", "OrderShipped"]
  })
}
```
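On the consumer side, note that SNS wraps the published payload in a JSON envelope before delivering it to SQS (unless raw message delivery is enabled on the subscription), so the queue message body must be unwrapped. A minimal sketch, assuming the standard envelope shape (`unwrap_sns_envelope` is a hypothetical helper, not from the kit):

```python
import json

def unwrap_sns_envelope(sqs_body: str) -> tuple[str, dict]:
    """Return (event_type, payload) from an SNS-to-SQS delivery body."""
    envelope = json.loads(sqs_body)
    # SNS carries message attributes as {"Type": ..., "Value": ...} pairs
    attrs = envelope.get("MessageAttributes", {})
    event_type = attrs.get("event_type", {}).get("Value", "")
    payload = json.loads(envelope["Message"])
    return event_type, payload
```

A boto3 consumer loop would call `sqs.receive_message(...)`, pass each body through this helper, and `sqs.delete_message(...)` only after successful processing, so failures fall through to the redrive policy above.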
### Transactional Outbox Pattern

```python
# src/patterns/outbox.py
"""Transactional outbox for reliable event publishing.

Writes events to an outbox table in the same transaction as the
business operation; a separate poller then publishes them to the
message broker. Guarantees at-least-once delivery.
"""
import json
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone


@dataclass
class OutboxEvent:
    aggregate_type: str
    aggregate_id: str
    event_type: str
    payload: dict
    event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    created_at: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )
    published: bool = False

    def to_sql_insert(self) -> tuple[str, tuple]:
        """Generate the SQL INSERT for the outbox table."""
        sql = """
            INSERT INTO event_outbox
                (event_id, aggregate_type, aggregate_id, event_type, payload, created_at)
            VALUES (%s, %s, %s, %s, %s, %s)
        """
        params = (
            self.event_id, self.aggregate_type, self.aggregate_id,
            self.event_type, json.dumps(self.payload), self.created_at,
        )
        return sql, params
```
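The poller half of the pattern reads unpublished rows, publishes them, and marks them done. A minimal sketch with the storage and broker abstracted as callables (the function name `drain_outbox` and the callable signatures are illustrative, not the kit's API):

```python
def drain_outbox(fetch_unpublished, publish_fn, mark_published) -> int:
    """Publish pending outbox rows in order; return how many were published."""
    published = 0
    for event in fetch_unpublished():
        publish_fn(event)                  # may duplicate if we crash before marking...
        mark_published(event["event_id"])  # ...hence consumers must deduplicate
        published += 1
    return published
```

The crash window between publish and mark is exactly why the pattern only guarantees at-least-once delivery, and why it pairs with the idempotency strategies elsewhere in this kit.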
## Configuration

```yaml
# configs/event-config.yaml
broker: kafka  # kafka, sqs-sns, event-grid, pubsub

kafka:
  bootstrap_servers: "broker1.example.com:9092,broker2.example.com:9092"
  schema_registry_url: "https://schema-registry.example.com"
  default_replication_factor: 3
  default_partitions: 12
  retention_hours: 168  # 7 days

sqs_sns:
  region: us-east-1
  dlq_max_receive_count: 3  # retries before DLQ
  visibility_timeout_seconds: 300

consumer:
  max_poll_records: 500
  enable_auto_commit: false  # manual commit for exactly-once
  idempotency_window_hours: 24  # deduplication window
```
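The `idempotency_window_hours` setting drives deduplication middleware on the consumer side: an event ID seen within the window is treated as a duplicate and skipped. A minimal in-memory sketch (a production version would keep the seen-set in Redis or a database with a TTL; the class name `Deduplicator` is illustrative):

```python
import time

class Deduplicator:
    def __init__(self, window_hours: float = 24, clock=time.time):
        self.window = window_hours * 3600
        self.clock = clock
        self.seen: dict[str, float] = {}  # event_id -> first-seen timestamp

    def is_duplicate(self, event_id: str) -> bool:
        now = self.clock()
        # Expire entries older than the idempotency window
        self.seen = {k: t for k, t in self.seen.items() if now - t < self.window}
        if event_id in self.seen:
            return True
        self.seen[event_id] = now
        return False
```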
## Best Practices

- Always use a schema registry — Schema evolution without a registry leads to consumer deserialization failures
- Design events as facts, not commands — `OrderPlaced` (past tense) is an event; `PlaceOrder` is a command
- Set up dead-letter queues on every consumer — Unhandled messages should be captured, not lost
- Use the transactional outbox pattern — Dual writes (DB + broker) will eventually fail; the outbox prevents inconsistency
- Monitor consumer lag — Growing lag means consumers can't keep up; scale or optimize before it becomes an incident
- Version your schemas — Use backward-compatible evolution (add optional fields; never remove or rename required ones)
## Troubleshooting

| Issue | Cause | Fix |
|---|---|---|
| Consumer receives duplicate messages | At-least-once delivery (normal behavior) | Implement idempotency using `event_id` as the deduplication key |
| Messages stuck in DLQ | Consumer throws an unrecoverable error | Inspect DLQ messages, fix the consumer bug, replay from the DLQ |
| Kafka consumer lag increasing | Consumer processing too slow or too few partitions | Add consumer instances (max = partition count) or increase partitions |
| SNS filter policy not matching | JSON structure doesn't match message attributes | Ensure the publisher sets `MessageAttributes`, not just the message body |
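The filter-policy row trips people up most often: SNS filter policies match on message *attributes*, so the `event_type` must be set as an attribute at publish time, not only embedded in the body. A sketch of building the `sns.publish` arguments with boto3's attribute shape (the topic ARN and helper name are placeholders):

```python
import json

def build_publish_kwargs(topic_arn: str, event_type: str, payload: dict) -> dict:
    """Build kwargs for boto3's sns.publish with a filterable event_type attribute."""
    return {
        "TopicArn": topic_arn,
        "Message": json.dumps(payload),
        "MessageAttributes": {
            # This is what the subscription's filter_policy matches against
            "event_type": {"DataType": "String", "StringValue": event_type},
        },
    }
```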
This is 1 of 11 resources in the Cloud Architecture Pro toolkit. Get the complete [Event-Driven Architecture Kit] with all files, templates, and documentation for $39.
Or grab the entire Cloud Architecture Pro bundle (11 products) for $149 — save 30%.