
Thesius Code

Posted on • Originally published at datanest-stores.pages.dev

Event-Driven Architecture Kit


A comprehensive collection of patterns, schemas, and infrastructure templates for building event-driven systems. This kit covers event sourcing, CQRS, and pub/sub architectures with concrete implementations using Kafka, SQS/SNS, and Azure Event Grid. Each pattern includes schema definitions (Avro/JSON Schema), dead-letter queue handling, exactly-once semantics strategies, and monitoring dashboards. Move beyond request-response architectures and build systems that are decoupled, scalable, and resilient by design.

Key Features

  • 12 Event-Driven Patterns — Fan-out, event sourcing, CQRS, saga orchestration, claim-check, content-based routing, and more
  • Multi-Broker Support — Implementations for Apache Kafka, AWS SQS/SNS, Azure Event Grid, and GCP Pub/Sub
  • Schema Registry — Avro and JSON Schema definitions with compatibility validation and versioning strategy
  • Dead-Letter Queue Handling — DLQ configurations, retry policies, and poison message handlers for every broker
  • Saga Orchestration — Distributed transaction patterns with compensation logic and state machine definitions
  • Exactly-Once Semantics — Idempotency key strategies, transactional outbox pattern, and deduplication middleware
  • Observability — Distributed tracing, event flow dashboards, and consumer lag alerting configurations
  • Infrastructure as Code — Terraform modules for provisioning messaging infrastructure across clouds

Quick Start

# Deploy Kafka topic infrastructure
cd src/terraform/kafka
terraform init
terraform apply -var="environment=staging"

# Deploy AWS SNS/SQS fan-out pattern
cd src/terraform/aws-sns-sqs
terraform apply -var-file="configs/fan-out.tfvars"

# Run the event schema validator
python3 src/tools/schema_validator.py \
  --schema schemas/order-events/v1/order-placed.avsc \
  --payload examples/order-placed.json

Architecture

┌──────────────────────────────────────────────────────────┐
│         Event-Driven Architecture Patterns               │
│                                                          │
│  Fan-Out Pattern:                                        │
│  ┌──────────┐    ┌───────┐    ┌──────────────────────┐   │
│  │ Producer │───►│ Topic │───►│ Consumer A (Email)   │   │
│  │ (Order   │    │(SNS/  │───►│ Consumer B (Billing) │   │
│  │  Service)│    │Kafka) │───►│ Consumer C (Audit)   │   │
│  └──────────┘    └───────┘    └──────────────────────┘   │
│                                                          │
│  CQRS + Event Sourcing:                                  │
│  ┌──────────┐    ┌───────────┐    ┌──────────────────┐   │
│  │ Command  │───►│  Event    │───►│  Read Model      │   │
│  │ Handler  │    │  Store    │    │  Projector       │   │
│  └──────────┘    └───────────┘    └───────┬──────────┘   │
│       │                                   │              │
│       │          ┌───────────┐    ┌───────▼──────────┐   │
│       └─────────►│  Command  │    │  Query Database  │   │
│                  │  Database │    │ (read-optimized) │   │
│                  └───────────┘    └──────────────────┘   │
│                                                          │
│  Saga (Orchestration):                                   │
│  ┌──────┐   ┌──────┐   ┌──────┐   ┌──────┐               │
│  │Step 1│──►│Step 2│──►│Step 3│──►│Done  │               │
│  │Order │   │Pay   │   │Ship  │   │      │               │
│  └──┬───┘   └──┬───┘   └──┬───┘   └──────┘               │
│     │ fail     │ fail     │ fail                         │
│     ▼          ▼          ▼                              │
│  Compensate Compensate Compensate                        │
└──────────────────────────────────────────────────────────┘

Usage Examples

Event Schema — Order Placed (Avro)

{
  "type": "record",
  "name": "OrderPlaced",
  "namespace": "com.acme.orders.events",
  "doc": "Published when a customer completes checkout",
  "fields": [
    {"name": "event_id",    "type": "string", "doc": "Unique event identifier (UUID)"},
    {"name": "event_time",  "type": "long",   "doc": "Unix timestamp in milliseconds"},
    {"name": "order_id",    "type": "string"},
    {"name": "customer_id", "type": "string"},
    {"name": "items", "type": {"type": "array", "items": {
      "type": "record", "name": "OrderItem", "fields": [
        {"name": "product_id", "type": "string"},
        {"name": "quantity",   "type": "int"},
        {"name": "unit_price", "type": "double"}
      ]
    }}},
    {"name": "total_amount", "type": "double"},
    {"name": "currency",     "type": "string", "default": "USD"}
  ]
}
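As a rough illustration of what a schema check like the kit's `src/tools/schema_validator.py` does, here is a minimal stdlib-only sketch that verifies a payload against the record's field list. The type mapping, helper name, and trimmed field list are illustrative, not the kit's actual implementation, which presumably uses a real Avro library.

```python
# Minimal sketch of payload-vs-schema validation for the OrderPlaced record.
# AVRO_TO_PY and validate() are illustrative names, not from the kit.

AVRO_TO_PY = {"string": str, "long": int, "int": int, "double": float, "boolean": bool}

def validate(payload: dict, fields: list[dict]) -> list[str]:
    """Return a list of validation errors (empty list means valid)."""
    errors = []
    for f in fields:
        name, ftype = f["name"], f["type"]
        if name not in payload:
            if "default" not in f:  # fields with defaults may be omitted
                errors.append(f"missing required field: {name}")
            continue
        # Only check primitive types; nested records/arrays are skipped here
        expected = AVRO_TO_PY.get(ftype) if isinstance(ftype, str) else None
        if expected and not isinstance(payload[name], expected):
            errors.append(f"{name}: expected {ftype}, got {type(payload[name]).__name__}")
    return errors

# Trimmed subset of the OrderPlaced fields above
fields = [
    {"name": "event_id",     "type": "string"},
    {"name": "order_id",     "type": "string"},
    {"name": "total_amount", "type": "double"},
    {"name": "currency",     "type": "string", "default": "USD"},
]

print(validate({"event_id": "e-1", "order_id": "o-42", "total_amount": 99.5}, fields))  # []
```

Note that `currency` passes even when omitted because the schema supplies a default, which is exactly what makes adding optional fields a backward-compatible evolution.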

SQS/SNS Fan-Out with Dead-Letter Queue

# src/terraform/aws-sns-sqs/fan-out.tf
resource "aws_sns_topic" "order_events" {
  name = "${var.project}-order-events"
}

resource "aws_sqs_queue" "email_notifications" {
  name                       = "${var.project}-email-notifications"
  visibility_timeout_seconds = 300
  message_retention_seconds  = 1209600   # 14 days
  redrive_policy = jsonencode({
    deadLetterTargetArn = aws_sqs_queue.email_dlq.arn
    maxReceiveCount     = 3              # Move to DLQ after 3 failures
  })
}

resource "aws_sqs_queue" "email_dlq" {
  name                      = "${var.project}-email-notifications-dlq"
  message_retention_seconds = 1209600
}

resource "aws_sns_topic_subscription" "email_sub" {
  topic_arn = aws_sns_topic.order_events.arn
  protocol  = "sqs"
  endpoint  = aws_sqs_queue.email_notifications.arn
  filter_policy = jsonencode({
    event_type = ["OrderPlaced", "OrderShipped"]
  })
}
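The filter policy above matches on message attributes, not the message body, so the publisher must set `MessageAttributes` explicitly. A sketch of building the publish arguments (the helper name is my own; actually sending requires boto3 and AWS credentials):

```python
# Sketch: constructing kwargs for boto3's sns.publish so the event_type
# filter policy can match. build_publish_args is an illustrative helper.
import json

def build_publish_args(topic_arn: str, event_type: str, payload: dict) -> dict:
    """Build kwargs for SNS publish; filter policies match MessageAttributes,
    not the JSON body, so event_type goes in both places deliberately."""
    return {
        "TopicArn": topic_arn,
        "Message": json.dumps(payload),
        "MessageAttributes": {
            "event_type": {"DataType": "String", "StringValue": event_type},
        },
    }

# Usage (requires boto3 and credentials):
# import boto3
# sns = boto3.client("sns", region_name="us-east-1")
# sns.publish(**build_publish_args(topic_arn, "OrderPlaced", {"order_id": "o-42"}))
```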

Transactional Outbox Pattern

# src/patterns/outbox.py
"""Transactional outbox for reliable event publishing.

Writes events to an outbox table in the same transaction as
the business operation, then a separate poller publishes them
to the message broker. Guarantees at-least-once delivery.
"""
import json
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone

@dataclass
class OutboxEvent:
    aggregate_type: str
    aggregate_id: str
    event_type: str
    payload: dict
    event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    created_at: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )
    published: bool = False

    def to_sql_insert(self) -> tuple[str, tuple]:
        """Generate SQL INSERT for the outbox table."""
        sql = """
            INSERT INTO event_outbox
            (event_id, aggregate_type, aggregate_id, event_type, payload, created_at)
            VALUES (%s, %s, %s, %s, %s, %s)
        """
        params = (
            self.event_id, self.aggregate_type, self.aggregate_id,
            self.event_type, json.dumps(self.payload), self.created_at,
        )
        return sql, params
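The other half of the pattern is the poller that relays outbox rows to the broker. Here is a self-contained sketch using sqlite3 so it runs standalone; the kit's snippets target a Postgres-style database (`%s` placeholders), and the table layout and function names here are illustrative.

```python
# Sketch of the outbox relay loop: poll unpublished rows, publish, mark done.
# sqlite3 is used only to make the demo self-contained.
import json
import sqlite3

def relay_outbox(conn, publish, batch_size: int = 100) -> int:
    """Publish pending outbox rows, then mark them published.
    At-least-once: a crash after publish() but before commit re-sends the
    batch, so consumers must deduplicate on event_id."""
    rows = conn.execute(
        "SELECT event_id, event_type, payload FROM event_outbox "
        "WHERE published = 0 ORDER BY created_at LIMIT ?",
        (batch_size,),
    ).fetchall()
    for event_id, event_type, payload in rows:
        publish(event_type, event_id, json.loads(payload))
        conn.execute(
            "UPDATE event_outbox SET published = 1 WHERE event_id = ?",
            (event_id,),
        )
    conn.commit()
    return len(rows)

# Demo with an in-memory DB and a fake broker
conn = sqlite3.connect(":memory:")
conn.execute("""CREATE TABLE event_outbox (
    event_id TEXT PRIMARY KEY, event_type TEXT, payload TEXT,
    created_at TEXT, published INTEGER DEFAULT 0)""")
conn.execute(
    "INSERT INTO event_outbox (event_id, event_type, payload, created_at) "
    "VALUES ('e-1', 'OrderPlaced', '{\"order_id\": \"o-42\"}', '2024-01-01')"
)
sent = []
relay_outbox(conn, lambda topic, key, value: sent.append((topic, key, value)))
print(sent)  # [('OrderPlaced', 'e-1', {'order_id': 'o-42'})]
```

Committing only after publishing is what makes delivery at-least-once rather than at-most-once: the relay can re-send but never silently drop an event.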

Configuration

# configs/event-config.yaml
broker: kafka                       # kafka, sqs-sns, event-grid, pubsub

kafka:
  bootstrap_servers: "broker1.example.com:9092,broker2.example.com:9092"
  schema_registry_url: "https://schema-registry.example.com"
  default_replication_factor: 3
  default_partitions: 12
  retention_hours: 168              # 7 days

sqs_sns:
  region: us-east-1
  dlq_max_receive_count: 3          # Retries before DLQ
  visibility_timeout_seconds: 300

consumer:
  max_poll_records: 500
  enable_auto_commit: false         # Manual commit for exactly-once
  idempotency_window_hours: 24      # Deduplication window
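Loading this file and selecting the active broker section might look like the following sketch. It assumes PyYAML is installed; the function name is illustrative, and the dash-to-underscore mapping mirrors how `sqs-sns` maps to the `sqs_sns` section above.

```python
# Sketch: parse event-config.yaml and return the selected broker's settings.
# Assumes PyYAML (pip install pyyaml); load_broker_config is an illustrative name.
import yaml

SUPPORTED = ("kafka", "sqs-sns", "event-grid", "pubsub")

def load_broker_config(text: str) -> dict:
    """Parse the YAML and merge the active broker's section with its name."""
    cfg = yaml.safe_load(text)
    broker = cfg.get("broker")
    if broker not in SUPPORTED:
        raise ValueError(f"unknown broker: {broker!r}")
    # Section keys use underscores where the broker name has a dash
    section = cfg.get(broker.replace("-", "_"), {})
    return {"broker": broker, **section}

example = """
broker: kafka
kafka:
  bootstrap_servers: "broker1.example.com:9092"
  default_partitions: 12
"""
print(load_broker_config(example)["default_partitions"])  # 12
```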

Best Practices

  • Always use a schema registry — Schema evolution without a registry leads to consumer deserialization failures
  • Design events as facts, not commands — OrderPlaced (past tense) is an event; PlaceOrder is a command
  • Set up dead-letter queues on every consumer — Unhandled messages should be captured, not lost
  • Use the transactional outbox pattern — Dual-writes (DB + broker) will eventually fail; outbox prevents inconsistency
  • Monitor consumer lag — Increasing lag means consumers can't keep up; scale or optimize before it becomes a problem
  • Version your schemas — Use backward-compatible evolution (add optional fields, never remove/rename required ones)

Troubleshooting

  • Consumer receives duplicate messages — Cause: at-least-once delivery (normal behavior). Fix: implement idempotency using event_id as the deduplication key
  • Messages stuck in DLQ — Cause: consumer throws an unrecoverable error. Fix: inspect the DLQ messages, fix the consumer bug, replay from the DLQ
  • Kafka consumer lag increasing — Cause: consumer processing too slow or too few partitions. Fix: add consumer instances (up to the partition count) or increase partitions
  • SNS filter policy not matching — Cause: JSON structure doesn't match message attributes. Fix: ensure the publisher sets MessageAttributes, not just the message body

This is 1 of 11 resources in the Cloud Architecture Pro toolkit. Get the complete Event-Driven Architecture Kit with all files, templates, and documentation for $39.

Get the Full Kit →

Or grab the entire Cloud Architecture Pro bundle (11 products) for $149 — save 30%.

Get the Complete Bundle →

