Building a robust server-side event bus with exactly-once delivery in a microservice architecture
In modern distributed systems, delivering each domain event to every interested service exactly once is a practical challenge. This tutorial walks through designing and implementing a lightweight, server-side event bus that provides exactly-once processing semantics. Delivery itself is at-least-once, and idempotent handlers backed by durable storage turn repeated deliveries into no-ops. We’ll cover architectural choices, data modeling, idempotency keys, and concrete code examples in Python, with PostgreSQL as the durable store and Redis as a fast in-memory queue. You’ll leave with a practical blueprint you can adapt to real projects.
Prerequisites
- Familiarity with microservice architecture concepts
- Basic Python knowledge (async I/O is optional but recommended)
- PostgreSQL and Redis installed or accessible
- Docker (optional but helpful for local setup)
Overview of the approach
- Build a central event bus service responsible for publishing events and delivering them to subscribed services.
- Use a durable event log in PostgreSQL to store published events with unique event IDs and metadata.
- Achieve exactly-once processing by pairing at-least-once delivery with a per-consumer idempotency key (consumer_id, event_id), persisted processing state, and delivery-attempt tracking.
- Use Redis as a work queue to decouple publishing from delivery while maintaining ordering guarantees per consumer where appropriate.
- Provide a consumer protocol that supports replay and recovery, along with at-least-once retries that are bounded and idempotent.
- Include testing strategies and operational considerations (metrics, monitoring, backpressure, and failure modes).
System architecture diagram (conceptual)
- Event Publisher -> Event Bus (PostgreSQL-backed log, durable storage)
- Event Bus -> Delivery Workers (Redis queue) -> Consumer services
- Each consumer maintains an Idempotency Table to track processed events (consumer_id, event_id, status)
- Event Bus exposes a REST/gRPC API for publishers and a delivery endpoint for consumers (or use webhook-type callbacks with retries)
- Operational concerns: dead-letter queue, retry policy, backoff, metrics, and alerting
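The bus and its delivery workers share one contract: the small JSON message pushed onto each consumer's Redis list. As a minimal sketch, that message could be modeled as a typed object. `DeliveryTask` and its `consumer_id` field are hypothetical additions; the tutorial's worker code uses plain dicts that carry only `event_id` and `attempt_id`.

```python
import json
from dataclasses import asdict, dataclass


@dataclass(frozen=True)
class DeliveryTask:
    """One unit of work on a per-consumer Redis list (hypothetical type)."""
    event_id: str
    consumer_id: str
    attempt_id: str

    def queue_key(self) -> str:
        # Mirrors the "deliveries:{consumer_id}" key used by the delivery worker,
        # so each consumer gets its own FIFO list.
        return f"deliveries:{self.consumer_id}"

    def to_json(self) -> str:
        return json.dumps(asdict(self), sort_keys=True)

    @classmethod
    def from_json(cls, raw: str) -> "DeliveryTask":
        data = json.loads(raw)
        # Reject malformed messages up front instead of failing later in the worker.
        missing = {"event_id", "consumer_id", "attempt_id"} - set(data)
        if missing:
            raise ValueError(f"delivery task missing fields: {sorted(missing)}")
        return cls(data["event_id"], data["consumer_id"], data["attempt_id"])
```

Validating on decode keeps a bad message from reaching the consumer call. The worker can then route the raw message to a dead-letter list instead of crashing.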
Data model design
- events table: stores all published events
  - event_id (UUID, primary key)
  - publisher_id (string)
  - topic (string)
  - payload (jsonb)
  - created_at (timestamp)
  - status (enum: 'published', 'failed', etc.)
  - version (int, for event schema versioning)
- delivery_attempts table: tracks deliveries to each consumer
  - id (bigint identity, primary key)
  - event_id (UUID, foreign key)
  - consumer_id (string)
  - attempt_id (UUID, unique per attempt)
  - status (enum: 'in_progress', 'delivered', 'failed', 'skipped')
  - next_redeliver_at (timestamp)
  - created_at (timestamp)
  - updated_at (timestamp)
- consumer_idempotency table: per-consumer idempotency registry
  - consumer_id (string)
  - event_id (UUID)
  - status (enum: 'processed', 'failed', 'in_progress')
  - processed_at (timestamp)
  - primary key (consumer_id, event_id)
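The data model describes the status columns as enums, while the SQL schema below stores them as TEXT. One way to keep the two in sync is to generate a CHECK constraint from each enum, assuming the Python enums are the source of truth. The enum classes and `check_constraint` are hypothetical helpers; `EventStatus` lists only the two values the model names explicitly.

```python
from enum import Enum


class EventStatus(str, Enum):
    PUBLISHED = "published"
    FAILED = "failed"  # extend with the data model's "etc." values as needed


class DeliveryStatus(str, Enum):
    IN_PROGRESS = "in_progress"
    DELIVERED = "delivered"
    FAILED = "failed"
    SKIPPED = "skipped"


class IdempotencyStatus(str, Enum):
    PROCESSED = "processed"
    FAILED = "failed"
    IN_PROGRESS = "in_progress"


def check_constraint(column: str, enum_cls: type[Enum]) -> str:
    """Build a CHECK clause so a TEXT column only accepts the enum's values."""
    values = ", ".join(f"'{member.value}'" for member in enum_cls)
    return f"CHECK ({column} IN ({values}))"
```

For example, `check_constraint("status", DeliveryStatus)` produces a clause you could append to the `status TEXT NOT NULL` column of delivery_attempts. Because the enums subclass `str`, members also compare equal to the plain strings used in the SQL.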
End-to-end flow
- Publisher creates an event in the event bus: insert into events with status 'published' and a generated event_id.
- A delivery worker reads the event and enqueues a delivery for each subscribed consumer, recorded in delivery_attempts with status 'in_progress' and a calculated next_redeliver_at.
- Consumer receives the event (via an HTTP POST or webhook) with a unique delivery attempt_id and the event_id.
- Consumer processes the event idempotently:
  - Check the consumer_idempotency table for (consumer_id, event_id). If already 'processed', respond with success and skip re-processing.
  - If not yet processed, apply the event to the consumer's state deterministically.
  - On success, write to consumer_idempotency (status 'processed', processed_at), ideally in the same transaction as the state change so a crash cannot leave the change applied but unrecorded, and update delivery_attempts status to 'delivered'.
  - On transient failure, update delivery_attempts status to 'failed' and set next_redeliver_at with exponential backoff.
- Monitoring and compensation:
  - If a consumer remains unresponsive beyond a threshold, move the event to a dead-letter queue or emit a separate alert.
  - Allow operators to replay events by re-enqueuing an event for a consumer; the idempotency checks prevent duplicate processing.
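The flow calls for exponential backoff on transient failures and for a dead-letter queue once a consumer stays unresponsive. Here is a minimal sketch of that retry policy. `BASE_DELAY`, `MAX_DELAY`, `MAX_ATTEMPTS` and `compute_next_redeliver_at` are hypothetical names and values, not part of the tutorial's code. A return value of `None` signals that the attempt should be dead-lettered rather than rescheduled.

```python
from datetime import datetime, timedelta
from typing import Optional

# Hypothetical policy values; tune them per consumer.
BASE_DELAY = timedelta(seconds=30)
MAX_DELAY = timedelta(minutes=30)
MAX_ATTEMPTS = 10


def compute_next_redeliver_at(attempt_number: int, now: datetime) -> Optional[datetime]:
    """Given the 1-based number of the attempt that just failed, return when to
    retry, or None once the event should move to the dead-letter queue."""
    if attempt_number >= MAX_ATTEMPTS:
        return None
    # Exponential backoff: 30s, 60s, 120s, ... capped at MAX_DELAY.
    delay = min(BASE_DELAY * 2 ** (attempt_number - 1), MAX_DELAY)
    return now + delay
```

The result can be stored directly in delivery_attempts.next_redeliver_at. In production you would typically also add random jitter to the delay so that many failed deliveries do not retry in lockstep.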
Practical code examples
- Tech stack: Python 3.11, async SQLAlchemy (asyncpg driver) for PostgreSQL, redis-py's asyncio client (redis.asyncio, which absorbed the aioredis project) for Redis, and FastAPI for HTTP endpoints.
- Note: This example focuses on the core patterns. In real deployments you’ll want proper migrations, connection pooling, and production-grade error handling.
1) Database schema (SQL)
```sql
-- events table
CREATE TABLE events (
    event_id UUID PRIMARY KEY,
    publisher_id TEXT NOT NULL,
    topic TEXT NOT NULL,
    payload JSONB NOT NULL,
    created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    status TEXT NOT NULL DEFAULT 'published',
    version INT NOT NULL DEFAULT 1
);

-- delivery_attempts table
CREATE TABLE delivery_attempts (
    id BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY,
    event_id UUID NOT NULL REFERENCES events(event_id),
    consumer_id TEXT NOT NULL,
    attempt_id UUID NOT NULL,
    status TEXT NOT NULL,
    next_redeliver_at TIMESTAMPTZ,
    created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    UNIQUE (event_id, consumer_id, attempt_id)
);

-- consumer idempotency table
CREATE TABLE consumer_idempotency (
    consumer_id TEXT NOT NULL,
    event_id UUID NOT NULL,
    status TEXT NOT NULL,
    processed_at TIMESTAMPTZ,
    PRIMARY KEY (consumer_id, event_id)
);
```
2) Publisher client (publish_event.py)
```python
import asyncio
import json
import uuid

from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine

DATABASE_URL = "postgresql+asyncpg://user:pass@localhost/events"
engine = create_async_engine(DATABASE_URL)


async def publish_event(publisher_id: str, topic: str, payload: dict, version: int = 1) -> uuid.UUID:
    """Append one event to the durable log and return its event_id."""
    event_id = uuid.uuid4()
    async with AsyncSession(engine) as session:
        await session.execute(
            text("""
                INSERT INTO events (event_id, publisher_id, topic, payload, created_at, status, version)
                VALUES (:event_id, :publisher_id, :topic, :payload, NOW(), 'published', :version)
            """),
            {
                "event_id": event_id,
                "publisher_id": publisher_id,
                "topic": topic,
                "payload": json.dumps(payload),  # serialized for the JSONB column
                "version": version,
            },
        )
        await session.commit()
    return event_id


async def main():
    event_id = await publish_event("order-service", "order.created", {"order_id": 1234, "amount": 99.99})
    print(f"Published event {event_id}")
    await engine.dispose()  # close pooled connections before the event loop shuts down


if __name__ == "__main__":
    asyncio.run(main())
```
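`publish_event` generates a fresh `uuid4` on every call. A publisher that retries after a timeout can therefore store the same business event twice. One way to make publishing idempotent is to derive event_id deterministically with `uuid5` from the publisher and a caller-supplied idempotency key. This is an assumption sketched here, not part of the tutorial's design; `EVENT_NAMESPACE` and `derive_event_id` are hypothetical.

```python
import uuid

# Hypothetical namespace for this event bus; any fixed UUID works,
# as long as every publisher uses the same one.
EVENT_NAMESPACE = uuid.UUID("6f1c2a4e-0000-4000-8000-000000000001")


def derive_event_id(publisher_id: str, idempotency_key: str) -> uuid.UUID:
    """Map (publisher, caller-supplied key) to a stable event_id.

    Retrying with the same key yields the same UUID, so an
    INSERT ... ON CONFLICT (event_id) DO NOTHING makes the retry a no-op.
    Assumes publisher_id never contains ':'.
    """
    return uuid.uuid5(EVENT_NAMESPACE, f"{publisher_id}:{idempotency_key}")
```

With this approach, `publish_event` would take the key as a parameter and add `ON CONFLICT (event_id) DO NOTHING` to its INSERT, so a retried publish leaves the log unchanged.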
3) Delivery worker (deliveries.py)
```python
import asyncio
import json
import random
import uuid
from datetime import datetime, timedelta, timezone

# redis-py >= 4.2 ships the former aioredis API as redis.asyncio; the standalone
# aioredis package fails to import on Python 3.11.
import redis.asyncio as aioredis
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine

DATABASE_URL = "postgresql+asyncpg://user:pass@localhost/events"
REDIS_URL = "redis://localhost"
engine = create_async_engine(DATABASE_URL)
redis = aioredis.from_url(REDIS_URL, decode_responses=True)

# In a real system, resolve subscribed consumers (per topic) from a registry
CONSUMERS = ["warehouse-service", "billing-service", "analytics-service"]


async def enqueue_delivery(event_id: str, consumer_id: str, attempt_id: str):
    # One list per consumer; LPUSH here plus BRPOP in the worker gives FIFO order.
    await redis.lpush(f"deliveries:{consumer_id}", json.dumps({
        "event_id": event_id,
        "attempt_id": attempt_id,
    }))


async def schedule_deliveries():
    to_enqueue = []
    async with AsyncSession(engine) as session:
        # SKIP LOCKED lets several schedulers run without fanning out the same event.
        result = await session.execute(text("""
            SELECT event_id, topic FROM events
            WHERE status = 'published'
            FOR UPDATE SKIP LOCKED
        """))
        for row in result.all():
            event_id = str(row.event_id)
            for consumer_id in CONSUMERS:
                attempt_id = str(uuid.uuid4())
                await session.execute(text("""
                    INSERT INTO delivery_attempts (event_id, consumer_id, attempt_id, status)
                    VALUES (:event_id, :consumer_id, :attempt_id, 'in_progress')
                """), {
                    "event_id": event_id,
                    "consumer_id": consumer_id,
                    "attempt_id": attempt_id,
                })
                to_enqueue.append((event_id, consumer_id, attempt_id))
            # A fresh attempt_id never conflicts, so mark the event as fanned out
            # ('dispatched' extends 'published', 'failed', etc.) to stop re-scheduling it.
            await session.execute(
                text("UPDATE events SET status = 'dispatched' WHERE event_id = :event_id"),
                {"event_id": event_id},
            )
        await session.commit()
    # Enqueue only after the commit so workers never see an attempt the DB lacks.
    for event_id, consumer_id, attempt_id in to_enqueue:
        await enqueue_delivery(event_id, consumer_id, attempt_id)


async def delivery_worker_loop(consumer_id: str = "warehouse-service"):
    queue_key = f"deliveries:{consumer_id}"
    while True:
        # BRPOP returns (key, value), or None when the timeout expires
        item = await redis.brpop(queue_key, timeout=5)
        if item is None:
            continue
        _, raw = item
        task = json.loads(raw)
        event_id = task["event_id"]
        attempt_id = task["attempt_id"]
        # Here you would call the consumer's endpoint or a webhook invoker;
        # for demonstration, simulate a delivery
        success = await simulate_delivery(event_id, consumer_id, attempt_id)
        status = "delivered" if success else "failed"
        # Fixed 5-minute retry delay on failure; NULL once delivered
        next_redeliver_at = None if success else datetime.now(timezone.utc) + timedelta(minutes=5)
        async with AsyncSession(engine) as session:
            await session.execute(text("""
                UPDATE delivery_attempts
                SET status = :status, updated_at = NOW(), next_redeliver_at = :next_redeliver_at
                WHERE event_id = :event_id AND consumer_id = :consumer_id AND attempt_id = :attempt_id
            """), {
                "status": status,
                "next_redeliver_at": next_redeliver_at,
                "event_id": event_id,
                "consumer_id": consumer_id,
                "attempt_id": attempt_id,
            })
            if success:
                # Demo shortcut: in a real deployment the consumer owns this record
                await session.execute(text("""
                    INSERT INTO consumer_idempotency (consumer_id, event_id, status, processed_at)
                    VALUES (:consumer_id, :event_id, 'processed', NOW())
                    ON CONFLICT (consumer_id, event_id) DO UPDATE SET status = 'processed', processed_at = NOW()
                """), {
                    "consumer_id": consumer_id,
                    "event_id": event_id,
                })
            await session.commit()
        await asyncio.sleep(0.1)


async def simulate_delivery(event_id: str, consumer_id: str, attempt_id: str) -> bool:
    # Placeholder for a real HTTP call to the consumer
    await asyncio.sleep(0.05)
    # Randomize success for demonstration (roughly 80% succeed)
    return random.random() > 0.2


async def main():
    # Use one event loop for both phases: pooled DB connections are bound to the
    # loop that opened them, so a second asyncio.run() call can fail.
    await schedule_deliveries()
    await delivery_worker_loop("warehouse-service")


if __name__ == "__main__":
    asyncio.run(main())
```
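The worker sets next_redeliver_at on failed attempts, but nothing in the worker code picks those attempts up again. A sweeper that periodically selects due attempts would close that loop. The sketch below keeps the selection logic in a pure function so it can be tested without a database. `Attempt`, `due_for_redelivery` and `DUE_SQL` are hypothetical names.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional


@dataclass
class Attempt:
    # Mirrors the delivery_attempts columns the sweeper needs.
    event_id: str
    consumer_id: str
    attempt_id: str
    status: str
    next_redeliver_at: Optional[datetime]


def due_for_redelivery(attempts: list[Attempt], now: datetime) -> list[Attempt]:
    """Pick failed attempts whose backoff window has elapsed, oldest first."""
    due = [
        a for a in attempts
        if a.status == "failed"
        and a.next_redeliver_at is not None
        and a.next_redeliver_at <= now
    ]
    return sorted(due, key=lambda a: a.next_redeliver_at)


# Roughly the same selection in PostgreSQL; SKIP LOCKED lets several
# sweepers run concurrently without claiming the same rows.
DUE_SQL = """
SELECT event_id, consumer_id, attempt_id
FROM delivery_attempts
WHERE status = 'failed' AND next_redeliver_at <= NOW()
ORDER BY next_redeliver_at
LIMIT 100
FOR UPDATE SKIP LOCKED
"""
```

For each due row, the sweeper would do three things in order. In the same transaction, it sets status back to 'in_progress' and clears next_redeliver_at. It then commits. Only after the commit does it LPUSH the task onto `deliveries:{consumer_id}`, so the same row is never picked twice.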
4) Consumer API (FastAPI) for idempotent processing
```python
import asyncio

from fastapi import FastAPI, HTTPException
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine

DATABASE_URL = "postgresql+asyncpg://user:pass@localhost/events"
engine = create_async_engine(DATABASE_URL)
app = FastAPI(title="Event Consumer Endpoint")


class Envelope(BaseModel):
    event_id: str
    consumer_id: str
    payload: dict
    attempt_id: str


async def is_idempotent(consumer_id: str, event_id: str) -> bool:
    """Return True if this consumer has already processed this event."""
    async with AsyncSession(engine) as session:
        result = await session.execute(text("""
            SELECT status FROM consumer_idempotency
            WHERE consumer_id = :consumer_id AND event_id = :event_id
        """), {"consumer_id": consumer_id, "event_id": event_id})
        row = result.fetchone()
        # fetchone() returns a Row or None; compare its status column, not the Row itself
        return row is not None and row.status == "processed"


async def mark_processed(consumer_id: str, event_id: str) -> None:
    async with AsyncSession(engine) as session:
        await session.execute(text("""
            INSERT INTO consumer_idempotency (consumer_id, event_id, status, processed_at)
            VALUES (:consumer_id, :event_id, 'processed', NOW())
            ON CONFLICT (consumer_id, event_id) DO UPDATE SET status = 'processed', processed_at = NOW()
        """), {"consumer_id": consumer_id, "event_id": event_id})
        await session.commit()


@app.post("/consume")
async def consume(envelope: Envelope):
    if await is_idempotent(envelope.consumer_id, envelope.event_id):
        return JSONResponse(content={"status": "skipped", "reason": "already processed"}, status_code=200)
    try:
        # Process the event payload (domain-specific logic goes here)
        await asyncio.sleep(0.01)
        # The check, the processing, and this write use separate transactions,
        # so two concurrent deliveries of one event can both get this far.
        await mark_processed(envelope.consumer_id, envelope.event_id)
        return {"status": "ok", "event_id": envelope.event_id, "consumer": envelope.consumer_id}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


# If you deploy this service, ensure proper authentication and validation for incoming events.
```
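The consumer endpoint checks, processes, and marks in three separate steps. Two concurrent deliveries of the same event can therefore both pass the check. A crash after processing but before `mark_processed` also leaves the work done but unrecorded. A tighter pattern claims the idempotency key and applies the state change in one transaction.

The sketch uses Python's built-in `sqlite3` as a stand-in for PostgreSQL so it runs anywhere; `handle_event` and the `stock` table are hypothetical. In PostgreSQL, the equivalent is `INSERT ... ON CONFLICT DO NOTHING RETURNING` inside the same transaction as the domain update. A concurrent duplicate insert then waits for the first transaction and conflicts. This only works when the consumer's state lives in the same database as its idempotency table.

```python
import sqlite3

# consumer_idempotency matches the tutorial's schema; stock is a hypothetical domain table.
SCHEMA = """
CREATE TABLE consumer_idempotency (
    consumer_id TEXT NOT NULL,
    event_id TEXT NOT NULL,
    status TEXT NOT NULL,
    processed_at TEXT,
    PRIMARY KEY (consumer_id, event_id)
);
CREATE TABLE stock (sku TEXT PRIMARY KEY, reserved INTEGER NOT NULL);
"""


def handle_event(conn: sqlite3.Connection, consumer_id: str, event_id: str,
                 sku: str, fail: bool = False) -> str:
    """Claim the (consumer_id, event_id) key and apply the side effect atomically."""
    with conn:  # commits on normal exit, rolls back if anything below raises
        cur = conn.execute(
            "INSERT OR IGNORE INTO consumer_idempotency "
            "(consumer_id, event_id, status, processed_at) "
            "VALUES (?, ?, 'processed', datetime('now'))",
            (consumer_id, event_id),
        )
        if cur.rowcount == 0:
            return "skipped"  # key already present: duplicate or replayed delivery
        # Hypothetical domain side effect: reserve one unit of stock.
        conn.execute("UPDATE stock SET reserved = reserved + 1 WHERE sku = ?", (sku,))
        if fail:
            raise RuntimeError("simulated crash")  # the claim is rolled back too
        return "ok"
```

After a simulated crash, both the claim and the side effect roll back, so the retry succeeds. A later duplicate, including an operator replay that carries a new attempt_id, returns "skipped". Each consumer's key is independent.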
Operational considerations
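- The exactly-once guarantee rests on strict idempotency checks on the consumer side; the transport itself remains at-least-once. Both the producing side (the bus, via delivery_attempts) and the consumer (via consumer_idempotency) should keep idempotency records for every event-consumer pair.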
- Dead-letter handling: add a dead-letter queue table for failed deliveries after a maximum number of retries to prevent unbounded retry loops.
- Backpressure and flow control: cap the number of concurrent deliveries per consumer to avoid overwhelming slow consumers.
- Observability: emit metrics for total events published, delivery attempts, success rate, failure rate, and latency. Use a centralized dashboard (Prometheus/Grafana) and log correlation IDs across services.
- Recovery and replay: to replay an event, create a new delivery_attempts row with a new attempt_id; the consumer's idempotency check prevents duplicate processing.
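For backpressure, a per-consumer cap on in-flight deliveries can be as small as one `asyncio.Semaphore` per consumer_id. This is a minimal sketch: `ConsumerLimiter` and `demo` are hypothetical, and `fake_delivery` stands in for the HTTP call to a consumer.

```python
import asyncio
from collections import defaultdict


class ConsumerLimiter:
    """Caps in-flight deliveries per consumer (hypothetical helper)."""

    def __init__(self, max_in_flight: int) -> None:
        # One semaphore per consumer_id, created lazily on first use.
        self._semaphores: dict[str, asyncio.Semaphore] = defaultdict(
            lambda: asyncio.Semaphore(max_in_flight)
        )

    async def run(self, consumer_id, deliver, *args):
        # Waits here while this consumer already has max_in_flight deliveries running.
        async with self._semaphores[consumer_id]:
            return await deliver(*args)


async def demo(n_tasks: int = 10, limit: int = 2):
    """Fire n_tasks deliveries at one consumer and record peak concurrency."""
    limiter = ConsumerLimiter(limit)
    in_flight = peak = 0

    async def fake_delivery(i: int) -> int:
        nonlocal in_flight, peak
        in_flight += 1
        peak = max(peak, in_flight)
        await asyncio.sleep(0.01)  # stands in for the HTTP call to the consumer
        in_flight -= 1
        return i

    results = await asyncio.gather(
        *(limiter.run("billing-service", fake_delivery, i) for i in range(n_tasks))
    )
    return peak, results
```

A slow consumer only blocks its own semaphore, so other consumers' deliveries keep flowing. A worker loop would wrap each delivery call in `limiter.run(consumer_id, ...)`.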
Testing strategies
- Unit tests: test idempotency checks with in-memory databases, mock HTTP calls to consumers, and ensure that repeated deliveries don’t cause side effects.
- Integration tests: simulate end-to-end publishing and delivery with multiple consumers, verifying exactly-once guarantees.
- Chaos testing: inject delays, partial outages, and network failures to observe behavior under real-world disturbances.
- Replay tests: validate that replayed events are processed correctly when idempotency records are consulted.
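The chaos and replay tests can start as a seeded, in-memory simulation before you wire up PostgreSQL and Redis. In this sketch (`simulate` and its drop/duplicate rates are hypothetical), the bus retries until it sees an ack. Requests and acks are lost at random, and some deliveries are sent twice. The set-based dedup stands in for your consumer's idempotent handler.

```python
import random
from collections import deque


def simulate(events, consumers, drop_rate=0.3, dup_rate=0.3, seed=7):
    """Return (applied, reached): side-effect counts per (consumer_id, event_id)
    and the number of deliveries that actually reached a consumer."""
    rng = random.Random(seed)  # seeded so any failure is reproducible
    processed = set()  # the consumers' idempotency registry
    applied = {}
    reached = 0
    pending = deque((c, e) for e in events for c in consumers)
    while pending:
        key = pending.popleft()
        sends = 2 if rng.random() < dup_rate else 1  # accidental duplicate send
        acked = False
        for _ in range(sends):
            if rng.random() < drop_rate:
                continue  # request lost before reaching the consumer
            reached += 1
            if key not in processed:  # idempotency check
                processed.add(key)
                applied[key] = applied.get(key, 0) + 1
            if rng.random() < drop_rate:
                continue  # work done, but the ack was lost
            acked = True
        if not acked:
            pending.append(key)  # at-least-once: the bus retries later
    return applied, reached
```

The assertions to make are that every pair was applied once and that more deliveries reached consumers than there are pairs. The second check confirms that duplicates were actually exercised.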
Operational tip: start simple
- Begin with a single consumer and a durable event log. Add more consumers and cross-service delivery once the core idempotency and retry loop are solid.
Would you like this example adapted to a different tech stack (e.g., Go or Node.js), or scaled to include multiple independent event buses with a centralized broker?
Rizwan Saleem | https://rizwansaleem.co