Efficient real-time data pipelines with event-sourced microservices and the outbox pattern
In modern distributed systems, you often need near-real-time data synchronization across services without sacrificing correctness. This tutorial shows how to design and implement an event-sourced microservice architecture that uses the outbox pattern to reliably publish integration events to downstream consumers. You’ll get practical guidance, architecture decisions, and concrete code you can adapt.
Key goals
- Build a simple, observable event-sourced service
- Ensure at-least-once delivery of events to a message broker
- Decouple services using an outbox table and background workers
- Achieve idempotent event handling in consumers
- Monitor, test, and operate the pipeline in production
Scenario
We’ll model a small domain: an order service that persists domain events (OrderCreated, OrderPaid, OrderShipped) in an event store and publishes corresponding integration events to a message broker (RabbitMQ). We’ll implement the outbox pattern to guarantee delivery even if the producer crashes, and we’ll show a robust consumer design with idempotent handlers.
What you’ll learn
- Event sourcing basics and why it helps with replayability and auditability
- The outbox pattern: using a local outbox table to reliably publish messages
- Idempotent consumers: ensuring repeated deliveries don’t cause duplicates
- Practical code in a modern language (Node.js with TypeScript and PostgreSQL)
- Testing strategies: unit, integration, and end-to-end tests for the pipeline
- Observability: tracing, metrics, and alerting hooks
Architecture overview
- Order service (write model)
  - PostgreSQL stores aggregates and events
  - Outbox table persists integration events with a status column ('pending', 'sent', 'failed') and a published_at timestamp
  - Event publishing worker reads pending outbox entries, publishes them to RabbitMQ, then marks them as sent
- Message broker (RabbitMQ)
  - One durable exchange ("orders") with a routing key per event type, and a queue per consumer
  - Durable queues, persistent messages
- Downstream services (read models)
  - Subscribed to integration events
  - Idempotent event handlers update read models
- Observability
  - Structured logs, correlation IDs across services
  - Metrics: outbox table size, publish success rate, consumer lag
Minimum viable data model
- orders table: order_id (PK), customer_id, status, total_amount, created_at
- events table: event_id (PK), order_id, event_type, payload (JSON), occurred_at
- outbox table: outbox_id (PK), event_type, payload (JSON), occurred_at, published_at (nullable), status (enum: 'pending', 'sent', 'failed'), retry_count
Step 1: Define domain events and event store
- Event types: OrderCreated, OrderPaid, OrderShipped
- Each domain event includes a payload with the relevant data; a version field can be added to the payload to support schema evolution (the type sketch below omits it for brevity)
Code sketch (TypeScript with Node.js and PostgreSQL)
- Setup: Use PostgreSQL via pg and TypeORM or Prisma. For clarity, this example uses plain SQL with a small helper.
1) Database schema (simplified)
CREATE TABLE orders (
  order_id UUID PRIMARY KEY,
  customer_id UUID NOT NULL,
  status TEXT NOT NULL,
  total_amount NUMERIC(12,2) NOT NULL,
  created_at TIMESTAMP WITHOUT TIME ZONE DEFAULT now()
);
CREATE TABLE events (
  event_id UUID PRIMARY KEY,
  order_id UUID NOT NULL REFERENCES orders(order_id),
  event_type TEXT NOT NULL,
  payload JSONB NOT NULL,
  occurred_at TIMESTAMP WITHOUT TIME ZONE DEFAULT now()
);
CREATE TABLE outbox (
  outbox_id UUID PRIMARY KEY,
  event_type TEXT NOT NULL,
  payload JSONB NOT NULL,
  occurred_at TIMESTAMP WITHOUT TIME ZONE DEFAULT now(),
  published_at TIMESTAMP WITHOUT TIME ZONE,
  status TEXT NOT NULL DEFAULT 'pending',
  retry_count INT NOT NULL DEFAULT 0
);
-- Partial index ordered by occurred_at, matching the worker's "oldest pending first" query
CREATE INDEX idx_outbox_pending ON outbox (occurred_at) WHERE status = 'pending';
CREATE INDEX idx_outbox_published ON outbox (published_at);
2) Domain event interfaces
type DomainEvent =
  | { type: 'OrderCreated'; data: { orderId: string; customerId: string; totalAmount: number } }
  | { type: 'OrderPaid'; data: { orderId: string; paidAt: string } }
  | { type: 'OrderShipped'; data: { orderId: string; shippedAt: string; trackingCode?: string } };
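To make the replayability benefit concrete, here is a minimal sketch of rebuilding an order's current state by folding over its event history. The OrderState type and the applyEvent and replay functions are hypothetical names introduced for illustration; they are not part of the schema or code in this tutorial.

```typescript
// Re-declared so the sketch is self-contained; same shape as the DomainEvent type above.
type DomainEvent =
  | { type: 'OrderCreated'; data: { orderId: string; customerId: string; totalAmount: number } }
  | { type: 'OrderPaid'; data: { orderId: string; paidAt: string } }
  | { type: 'OrderShipped'; data: { orderId: string; shippedAt: string; trackingCode?: string } };

// Hypothetical in-memory view of an order (an assumption, not a table in this tutorial).
type OrderState = { orderId: string; status: 'created' | 'paid' | 'shipped'; totalAmount: number };

// Applies a single event to the current state and returns the new state.
function applyEvent(state: OrderState | null, event: DomainEvent): OrderState {
  switch (event.type) {
    case 'OrderCreated':
      return { orderId: event.data.orderId, status: 'created', totalAmount: event.data.totalAmount };
    case 'OrderPaid':
    case 'OrderShipped': {
      if (state === null) throw new Error(`${event.type} received before OrderCreated`);
      return { ...state, status: event.type === 'OrderPaid' ? 'paid' : 'shipped' };
    }
    default: {
      // Compile-time check that every event type is handled.
      const unreachable: never = event;
      throw new Error(`Unknown event: ${JSON.stringify(unreachable)}`);
    }
  }
}

// Rebuilds the current state by applying the events in the order they occurred.
function replay(events: DomainEvent[]): OrderState | null {
  return events.reduce<OrderState | null>((state, e) => applyEvent(state, e), null);
}
```

Because the state is derived purely from the events, the same fold can rebuild a read model from scratch or answer "what did this order look like at time T" by replaying a prefix of the history.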
3) Persisting domain events
import { randomUUID } from 'node:crypto';
import type { PoolClient } from 'pg';

// Shape inferred from the fields this function reads.
type NewOrder = { id: string; customerId: string; status: string; total: number; events: DomainEvent[] };

// db must be one dedicated connection (for example from pool.connect()) so that
// BEGIN, the inserts, and COMMIT all run in the same transaction.
async function saveOrderAndEvents(db: PoolClient, order: NewOrder): Promise<void> {
  await db.query('BEGIN');
  try {
    await db.query(
      'INSERT INTO orders (order_id, customer_id, status, total_amount) VALUES ($1,$2,$3,$4)',
      [order.id, order.customerId, order.status, order.total]
    );
    for (const e of order.events) {
      const payload = JSON.stringify(e.data);
      await db.query(
        'INSERT INTO events (event_id, order_id, event_type, payload) VALUES ($1,$2,$3,$4)',
        [randomUUID(), order.id, e.type, payload]
      );
      // enqueue an outbox row for the event; occurred_at falls back to the column default now()
      await db.query(
        'INSERT INTO outbox (outbox_id, event_type, payload) VALUES ($1,$2,$3)',
        [randomUUID(), e.type, payload]
      );
    }
    await db.query('COMMIT');
  } catch (err) {
    await db.query('ROLLBACK');
    throw err;
  }
}
4) Outbox publishing workflow
- Worker polls pending outbox rows with a small batch, publishes to broker, then marks as published or updates retry count.
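Code sketch (TypeScript)
import type { PoolClient } from 'pg';
import type { ConfirmChannel } from 'amqplib';

// pgClient must be a single dedicated connection: the row locks taken by
// FOR UPDATE SKIP LOCKED are released as soon as the transaction ends.
async function publishOutboxBatch(pgClient: PoolClient, mqChannel: ConfirmChannel, batchSize = 50) {
  await pgClient.query('BEGIN');
  try {
    const res = await pgClient.query(
      `SELECT outbox_id, event_type, payload
         FROM outbox
        WHERE status = 'pending'
        ORDER BY occurred_at ASC
        LIMIT $1
          FOR UPDATE SKIP LOCKED`,
      [batchSize]
    );
    for (const row of res.rows) {
      try {
        const routingKey = row.event_type;
        // pg returns JSONB columns as parsed objects, so serialize again before publishing
        const message = Buffer.from(JSON.stringify(row.payload));
        mqChannel.publish('orders', routingKey, message, { persistent: true });
        // on a confirm channel this resolves once the broker has acknowledged the message
        await mqChannel.waitForConfirms();
        // mark published
        await pgClient.query(
          'UPDATE outbox SET status = $1, published_at = NOW() WHERE outbox_id = $2',
          ['sent', row.outbox_id]
        );
      } catch (err) {
        // retry strategy: the row stays 'pending' and is picked up again by a later poll
        await pgClient.query(
          'UPDATE outbox SET retry_count = retry_count + 1 WHERE outbox_id = $1',
          [row.outbox_id]
        );
      }
    }
    await pgClient.query('COMMIT');
  } catch (err) {
    await pgClient.query('ROLLBACK');
    throw err;
  }
}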
Notes:
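- We use FOR UPDATE SKIP LOCKED so that concurrent workers claim disjoint batches instead of blocking on each other's rows. The locks hold only for the duration of the surrounding transaction, so the select, publish, and update must run inside one.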
- We publish to an exchange named 'orders' with routing keys matching event_type.
- The consumer side must be idempotent.
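The worker described above needs something to call it on a schedule. A minimal polling loop might look like the following sketch. The names runOutboxWorker, PollOptions, and shouldStop are hypothetical, and publishBatch stands in for a function such as publishOutboxBatch with its arguments already bound.

```typescript
// Hypothetical options; tune the interval for your workload.
type PollOptions = { intervalMs: number; shouldStop: () => boolean };

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

// Calls publishBatch repeatedly, pausing between polls, until shouldStop() returns true.
async function runOutboxWorker(publishBatch: () => Promise<void>, opts: PollOptions): Promise<void> {
  while (!opts.shouldStop()) {
    try {
      await publishBatch();
    } catch (err) {
      // A failed batch should not kill the worker; log it and try again on the next poll.
      console.error('outbox batch failed', err);
    }
    await sleep(opts.intervalMs);
  }
}
```

In a real service, shouldStop would typically flip to true on SIGTERM so the worker finishes its current batch before the process exits.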
Step 2: Publishing events to RabbitMQ
- Use a durable exchange (direct or topic). For simplicity, use a direct exchange named "orders".
- Each event type maps to a routing key. This tutorial uses the event type name itself as the key, e.g., OrderCreated -> "OrderCreated".
Producer setup (Node.js with amqplib)
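import amqplib from 'amqplib';

const connection = await amqplib.connect('amqp://localhost');
// a confirm channel lets the producer wait for broker acknowledgements
const channel = await connection.createConfirmChannel();
await channel.assertExchange('orders', 'direct', { durable: true });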
Then publish:
channel.publish('orders', 'OrderCreated', Buffer.from(JSON.stringify(payload)), { persistent: true });
Step 3: Consumer design (idempotent handlers)
Downstream services consume events by binding queues to the exchange with appropriate routing keys.
Example consumer for read model
- Bind queue "orders-readmodel" to exchange "orders" with routing keys: "OrderCreated", "OrderPaid", "OrderShipped"
- Pick a single mechanism for idempotence: record each processed event_id in a processed_events table, or maintain an offsets table.
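The queue binding described above can be sketched as follows. To keep the example self-contained, it declares a small ConsumerChannel interface that mirrors the amqplib Channel methods it uses (assertQueue, bindQueue, consume, ack, nack); the interface, the Msg type, and the function name startReadModelConsumer are assumptions made for this sketch. A channel from amqplib's createChannel() is expected to fit this shape, but check that against your amqplib version.

```typescript
// Local stand-ins for the parts of an amqplib message and channel used here.
type Msg = { content: Buffer; fields: { routingKey: string } };
interface ConsumerChannel {
  assertQueue(queue: string, options: { durable: boolean }): Promise<unknown>;
  bindQueue(queue: string, exchange: string, routingKey: string): Promise<unknown>;
  consume(queue: string, onMessage: (msg: Msg | null) => void): Promise<unknown>;
  ack(msg: Msg): void;
  nack(msg: Msg, allUpTo?: boolean, requeue?: boolean): void;
}

type IncomingEvent = { event_type: string; payload: unknown };

async function startReadModelConsumer(
  channel: ConsumerChannel,
  handle: (evt: IncomingEvent) => Promise<void>
): Promise<void> {
  const queue = 'orders-readmodel';
  await channel.assertQueue(queue, { durable: true });
  for (const key of ['OrderCreated', 'OrderPaid', 'OrderShipped']) {
    await channel.bindQueue(queue, 'orders', key);
  }
  await channel.consume(queue, (msg) => {
    if (msg === null) return; // the broker cancelled this consumer
    const delivered: Msg = msg;
    const evt = {
      event_type: delivered.fields.routingKey,
      payload: JSON.parse(delivered.content.toString()),
    };
    handle(evt).then(
      // Acknowledge only after the handler has finished its work.
      () => channel.ack(delivered),
      // Requeue on failure; a real service would cap retries or dead-letter the message.
      () => channel.nack(delivered, false, true)
    );
  });
}
```

Acknowledging after the handler completes is what makes delivery at-least-once on the consumer side: a crash between handling and ack leads to a redelivery, which is why the handlers must be idempotent.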
Code sketch (TypeScript)
import type { PoolClient } from 'pg';

type ReadModelState = { orderId: string; status: string; total: number; customerId: string };
type IntegrationEvent = { event_type: string; payload: any };

async function handleEvent(evt: IntegrationEvent, db: PoolClient): Promise<void> {
  const { event_type, payload } = evt;
  switch (event_type) {
    case 'OrderCreated':
      // create the read-model entry; DO NOTHING keeps a redelivered OrderCreated
      // from resetting an order that has already moved to 'paid' or 'shipped'
      await db.query(
        `INSERT INTO read_models (order_id, status, total, customer_id) VALUES ($1,$2,$3,$4)
         ON CONFLICT (order_id) DO NOTHING`,
        [payload.orderId, 'created', payload.totalAmount, payload.customerId]
      );
      break;
    case 'OrderPaid':
      // only advance from 'created', so a redelivered OrderPaid cannot undo 'shipped'
      await db.query(
        "UPDATE read_models SET status = $1 WHERE order_id = $2 AND status = 'created'",
        ['paid', payload.orderId]
      );
      break;
    case 'OrderShipped':
      await db.query(
        'UPDATE read_models SET status = $1, tracking_code = $2 WHERE order_id = $3',
        ['shipped', payload.trackingCode ?? null, payload.orderId]
      );
      break;
  }
  // Idempotence: each statement above gives the same result when repeated. To skip
  // duplicates outright, also record a processed_event_id and check it before applying.
}
Guidance for idempotence
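- Carry a unique event_id in each outbox row and published message so consumers can deduplicate and replays stay safe (the simplified outbox schema above would need an event_id column for this)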
- The consumer should check a processed_events table to skip duplicates
- Use upsert semantics or a compensation mechanism if needed
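The processed_events check can be sketched like this. It assumes a table processed_events (event_id UUID PRIMARY KEY), which is not part of the schema shown earlier, and a hypothetical Queryable interface shaped after the query method of a pg client. The function name handleOnce is also an assumption made for this sketch.

```typescript
// Hypothetical minimal DB interface, shaped after the query method of a pg client.
interface Queryable {
  query(sql: string, params?: unknown[]): Promise<{ rowCount: number | null }>;
}

// Runs apply() at most once per eventId.
// Returns true if the handler ran, false if the event had already been processed.
async function handleOnce(db: Queryable, eventId: string, apply: () => Promise<void>): Promise<boolean> {
  await db.query('BEGIN');
  try {
    // The primary key turns this insert into a no-op for an event_id seen before.
    const res = await db.query(
      'INSERT INTO processed_events (event_id) VALUES ($1) ON CONFLICT (event_id) DO NOTHING',
      [eventId]
    );
    const firstDelivery = res.rowCount === 1;
    if (firstDelivery) await apply();
    await db.query('COMMIT');
    return firstDelivery;
  } catch (err) {
    // If apply() fails, the processed_events row is rolled back too, so a retry can run again.
    await db.query('ROLLBACK');
    throw err;
  }
}
```

For the deduplication to be atomic with the read-model update, apply() has to issue its queries on the same connection that opened the transaction.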
Step 4: Testing strategy
- Unit tests for domain logic: aggregates and event generation
- Integration tests for outbox publish path:
  - Run against a disposable PostgreSQL and RabbitMQ where you can (row locking is hard to reproduce with a mock), or mock the database and broker for fast checks
  - Verify that an insert into outbox results in a published message and a published_at timestamp
- End-to-end tests:
  - Create an order, pay it, ship it
  - Assert read model reflects the final state
  - Verify messages reach downstream services and handlers run idempotently
Sample test outline (pseudo)
- Given a new order with OrderCreated
- When outbox worker processes
- Then RabbitMQ receives a message with routing key OrderCreated
- And read_model shows status 'created'
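As a rough illustration, the outline above can be expressed against in-memory stand-ins. Everything here (the outbox array, the published list, the readModel map, and both functions) is a hypothetical model of the flow, not the real worker or consumer; an actual integration test would exercise PostgreSQL and RabbitMQ.

```typescript
type OutboxRow = { id: string; eventType: string; payload: { orderId: string }; status: 'pending' | 'sent' };

// In-memory stand-ins for the outbox table, the broker, and the read model.
const outbox: OutboxRow[] = [];
const published: { routingKey: string; body: string }[] = [];
const readModel = new Map<string, string>();

// Worker step: publish each pending row, then mark it as sent.
function processOutbox(): void {
  for (const row of outbox.filter((r) => r.status === 'pending')) {
    published.push({ routingKey: row.eventType, body: JSON.stringify(row.payload) });
    row.status = 'sent';
  }
}

// Consumer step: project published messages into the read model, skipping known orders.
function projectAll(): void {
  for (const msg of published) {
    const { orderId } = JSON.parse(msg.body) as { orderId: string };
    if (msg.routingKey === 'OrderCreated' && !readModel.has(orderId)) {
      readModel.set(orderId, 'created');
    }
  }
}

// Given a new order with OrderCreated
outbox.push({ id: '1', eventType: 'OrderCreated', payload: { orderId: 'o1' }, status: 'pending' });
// When the outbox worker and the consumer run
processOutbox();
projectAll();
```

The Then steps become assertions on published (one message with routing key OrderCreated) and on readModel (order o1 has status 'created').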
Step 5: Observability and operations
- Correlate logs with a request or operation id across services
- Collect metrics:
  - Outbox pending count
  - Publish success/failure counts
  - Consumer lag (time since last processed event)
- Health checks:
  - Outbox worker heartbeat
  - DB connection pool health
  - RabbitMQ connection status
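The outbox pending count, together with the age of the oldest pending row, can be read with one query against the outbox table defined earlier. The sketch below is one way to do it; the Queryable interface, the OutboxMetrics type, and the function name collectOutboxMetrics are assumptions, and how you export the numbers (Prometheus, StatsD, logs) is left open.

```typescript
// Hypothetical minimal DB interface, shaped after the query method of a pg client.
interface Queryable {
  query(sql: string): Promise<{ rows: Array<Record<string, unknown>> }>;
}

type OutboxMetrics = { pendingCount: number; oldestPendingAgeSeconds: number };

async function collectOutboxMetrics(db: Queryable): Promise<OutboxMetrics> {
  const res = await db.query(
    `SELECT count(*) AS pending_count,
            COALESCE(EXTRACT(EPOCH FROM (now() - min(occurred_at))), 0) AS oldest_age_seconds
       FROM outbox
      WHERE status = 'pending'`
  );
  const row = res.rows[0];
  // pg returns bigint and numeric values as strings, so convert explicitly.
  return {
    pendingCount: Number(row.pending_count),
    oldestPendingAgeSeconds: Number(row.oldest_age_seconds),
  };
}
```

The age of the oldest pending row is often a better alerting signal than the raw count, because it keeps growing when the worker is stuck even if traffic is low.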
Step 6: Deployment considerations
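- Write events and outbox rows in the same database transaction; that atomicity is what the outbox pattern relies on. If the two must live in different stores, you need a coordination mechanism such as two-phase commit, or an append-only design in which the event log itself is the source for publication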
- Ensure retries are bounded with exponential backoff
- Use feature flags to switch to alternative publication strategies if needed
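Bounded retries with exponential backoff can be computed from the retry_count column. This is a minimal sketch; the function names and the default values (500 ms base, 60 s cap, 8 attempts) are assumptions to adjust for your system.

```typescript
// Delay before the next attempt: doubles with each retry, capped at maxMs.
function backoffDelayMs(retryCount: number, baseMs = 500, maxMs = 60000): number {
  return Math.min(maxMs, baseMs * 2 ** retryCount);
}

// Status to store after a failed publish: give up once maxRetries is reached.
function statusAfterFailure(retryCount: number, maxRetries = 8): 'pending' | 'failed' {
  return retryCount >= maxRetries ? 'failed' : 'pending';
}
```

A worker could use backoffDelayMs to skip rows whose last attempt was too recent, and statusAfterFailure to move exhausted rows to 'failed' so they stop being polled and can be alerted on.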
Illustrative example: end-to-end flow
- Customer creates an order: write to orders, generate OrderCreated event, write to events table, enqueue outbox entry
- Order service’s outbox worker reads the outbox entry, publishes an OrderCreated event to RabbitMQ, marks as sent
- Read-model service subscribes to OrderCreated, updates its local read model
- If the Order is paid and shipped, subsequent events flow similarly, updating the read model
What to watch for
- Idempotence is essential at the consumer level; messages can be redelivered
- Outbox size can grow; implement purging or archiving after a retention period
- Monitoring and retries prevent silent failures from snowballing
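Purging can be as simple as a scheduled delete of rows that were sent longer ago than the retention period. The sketch below assumes a 7-day default and a hypothetical Queryable interface shaped after the query method of a pg client; the function name purgeSentOutbox is also an assumption. Archiving instead would copy the rows elsewhere before deleting them.

```typescript
// Hypothetical minimal DB interface, shaped after the query method of a pg client.
interface Queryable {
  query(sql: string, params?: unknown[]): Promise<{ rowCount: number | null }>;
}

// Deletes outbox rows that were published more than retentionDays ago.
// Returns the number of rows removed.
async function purgeSentOutbox(db: Queryable, retentionDays = 7): Promise<number> {
  const res = await db.query(
    `DELETE FROM outbox
      WHERE status = 'sent'
        AND published_at < now() - make_interval(days => $1::int)`,
    [retentionDays]
  );
  return res.rowCount ?? 0;
}
```

Only 'sent' rows are removed, so pending and failed rows remain available for retries and investigation.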
Concrete starter repo structure
- src/
  - domain/
    - events.ts // Domain event definitions and serializers
    - order.ts // Order aggregate with event generation
  - infra/
    - db.ts // Postgres client, migrations
    - outbox.ts // Outbox read/publish logic
    - mq.ts // RabbitMQ connection and channels
  - consumers/
    - readmodel.ts // Idempotent event handlers
- tests/
  - unit/
  - integration/
  - e2e/
- migrations/
  - 001_create_schema.sql
  - 002_add_read_model.sql
Example code snippet: simple outbox record publication (TypeScript)
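import type { PoolClient } from 'pg';
import type { Channel } from 'amqplib';

// db must be a single dedicated connection; the row locks last until COMMIT or ROLLBACK
async function publishPendingOutboxes(db: PoolClient, mq: { channel: Channel }) {
  await db.query('BEGIN');
  try {
    const batch = await db.query(
      "SELECT outbox_id, event_type, payload FROM outbox WHERE status = 'pending' ORDER BY occurred_at ASC LIMIT 25 FOR UPDATE SKIP LOCKED"
    );
    const channel = mq.channel;
    for (const row of batch.rows) {
      try {
        // pg returns JSONB as a parsed object, so serialize it again for the message body
        channel.publish('orders', row.event_type, Buffer.from(JSON.stringify(row.payload)), { persistent: true });
        await db.query('UPDATE outbox SET status = $1, published_at = NOW() WHERE outbox_id = $2', ['sent', row.outbox_id]);
      } catch (e) {
        await db.query('UPDATE outbox SET retry_count = retry_count + 1 WHERE outbox_id = $1', [row.outbox_id]);
      }
    }
    await db.query('COMMIT');
  } catch (e) {
    await db.query('ROLLBACK');
    throw e;
  }
}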
Why this approach helps practical software engineering
- It gives you a reliable backbone for real-time data propagation with a clear boundary between write and read models
- The outbox pattern decouples the database transaction from the messaging system, reducing the risk of message loss due to crash or partial failures
- Idempotent consumer design reduces the complexity of retry semantics and makes the system robust in real-world distributed environments
Next steps and refinements
- Introduce a dedicated event store (append-only log) for full auditability
- Add snapshots for faster rebuilds of read models
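- Consider using a streaming platform (Kafka) for high-throughput scenarios. Its idempotent producers and transactions can provide exactly-once processing within Kafka, but consumers that write to external stores still need idempotent handlers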
- Add schema versioning to events to evolve the system without breaking consumers
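One common way to version events is to carry a version number in the envelope and upcast older payloads when reading them. The sketch below is illustrative only: the version field, the v2 currency field, and the 'USD' default are all hypothetical and do not appear in the schema or types used in this tutorial.

```typescript
type OrderCreatedV1 = { orderId: string; customerId: string; totalAmount: number };
// Hypothetical v2 payload that adds a currency field.
type OrderCreatedV2 = OrderCreatedV1 & { currency: string };

type OrderCreatedEnvelope =
  | { type: 'OrderCreated'; version: 1; data: OrderCreatedV1 }
  | { type: 'OrderCreated'; version: 2; data: OrderCreatedV2 };

// Converts any known version of OrderCreated to the latest shape.
function upcastOrderCreated(evt: OrderCreatedEnvelope): OrderCreatedV2 {
  if (evt.version === 2) return evt.data;
  // v1 events predate the currency field; 'USD' is an assumed default for this sketch.
  return { ...evt.data, currency: 'USD' };
}
```

With an upcaster at the consumer boundary, handlers only ever see the latest payload shape, and old events in the store or in flight remain readable.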
Rizwan Saleem | https://rizwansaleem.co