DEV Community

Rizwan Saleem

Efficient real-time data pipelines with event-sourced microservices and the outbox pattern

In modern distributed systems, you often need near-real-time data synchronization across services without sacrificing correctness. This tutorial shows how to design and implement an event-sourced microservice architecture that uses the outbox pattern to reliably publish integration events to downstream consumers. You’ll get practical guidance, architecture decisions, and concrete code you can adapt.

Key goals

  • Build a simple, observable event-sourced service
  • Ensure at-least-once delivery of events to a message broker
  • Decouple services using an outbox table and background workers
  • Achieve idempotent event handling in consumers
  • Monitor, test, and operate the pipeline in production

Scenario
We’ll model a small domain: an order service that persists domain events (OrderCreated, OrderPaid, OrderShipped) in an event store and publishes corresponding integration events to a message broker (RabbitMQ). We’ll implement the outbox pattern to guarantee delivery even if the producer crashes, and we’ll show a robust consumer design with idempotent handlers.

What you’ll learn

  • Event sourcing basics and why it helps with replayability and auditability
  • The outbox pattern: using a local outbox table to reliably publish messages
  • Idempotent consumers: ensuring repeated deliveries don’t cause duplicates
  • Practical code in a modern language (Node.js with TypeScript and PostgreSQL)
  • Testing strategies: unit, integration, and end-to-end tests for the pipeline
  • Observability: tracing, metrics, and alerting hooks

Architecture overview

  • Order service (write model)
    • PostgreSQL stores aggregates and events
    • Outbox table persists integration events with a status column ('pending', 'sent', 'failed') and a published_at timestamp
    • Event publishing worker reads outbox entries, publishes to RabbitMQ, then marks as published
  • Message broker (RabbitMQ)
    • Exchanges and queues per event type
    • Durable queues, persistent messages
  • Downstream services (read models)
    • Subscribed to integration events
    • Idempotent event handlers update read models
  • Observability
    • Structured logs, correlation IDs across services
    • Metrics: outbox table size, publish success rate, consumer lag

Minimum viable data model

  • orders table: order_id (PK), customer_id, status, total_amount, created_at
  • events table: event_id (PK), order_id, event_type, payload (JSON), occurred_at
  • outbox table: outbox_id (PK), event_type, payload (JSON), occurred_at, published_at (nullable), status (enum: 'pending', 'sent', 'failed'), retry_count

Step 1: Define domain events and event store

  • Event types: OrderCreated, OrderPaid, OrderShipped
  • Each domain event includes a payload with the relevant data and a snapshot version

Code sketch (TypeScript with Node.js and PostgreSQL)

  • Setup: Use PostgreSQL via pg, TypeORM, or Prisma. For clarity, this example uses plain SQL through pg with a small helper.

1) Database schema (simplified)
CREATE TABLE orders (
order_id UUID PRIMARY KEY,
customer_id UUID NOT NULL,
status TEXT NOT NULL,
total_amount NUMERIC(12,2) NOT NULL,
created_at TIMESTAMP WITHOUT TIME ZONE DEFAULT now()
);

CREATE TABLE events (
event_id UUID PRIMARY KEY,
order_id UUID NOT NULL REFERENCES orders(order_id),
event_type TEXT NOT NULL,
payload JSONB NOT NULL,
occurred_at TIMESTAMP WITHOUT TIME ZONE DEFAULT now()
);

CREATE TABLE outbox (
outbox_id UUID PRIMARY KEY,
event_type TEXT NOT NULL,
payload JSONB NOT NULL,
occurred_at TIMESTAMP WITHOUT TIME ZONE DEFAULT now(),
published_at TIMESTAMP WITHOUT TIME ZONE,
status TEXT NOT NULL DEFAULT 'pending',
retry_count INT NOT NULL DEFAULT 0
);

CREATE INDEX idx_outbox_pending ON outbox (occurred_at) WHERE status = 'pending';
CREATE INDEX idx_outbox_published ON outbox (published_at);

2) Domain event interfaces
type DomainEvent =
| { type: 'OrderCreated'; data: { orderId: string; customerId: string; totalAmount: number } }
| { type: 'OrderPaid'; data: { orderId: string; paidAt: string } }
| { type: 'OrderShipped'; data: { orderId: string; shippedAt: string; trackingCode?: string } };
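
A discriminated union like this lets the compiler check that every event type is handled. The sketch below is one way to use it, assuming a hypothetical statusAfter helper and OrderStatus type that are not part of the original code; it also shows replay in miniature, which is the core idea of event sourcing.

```typescript
type DomainEvent =
  | { type: 'OrderCreated'; data: { orderId: string; customerId: string; totalAmount: number } }
  | { type: 'OrderPaid'; data: { orderId: string; paidAt: string } }
  | { type: 'OrderShipped'; data: { orderId: string; shippedAt: string; trackingCode?: string } };

// Hypothetical type: the statuses an order moves through in this tutorial.
type OrderStatus = 'created' | 'paid' | 'shipped';

// Hypothetical helper: maps each event to the order status it produces.
function statusAfter(event: DomainEvent): OrderStatus {
  switch (event.type) {
    case 'OrderCreated':
      return 'created';
    case 'OrderPaid':
      return 'paid';
    case 'OrderShipped':
      return 'shipped';
    default: {
      // If a new variant is added to DomainEvent, this assignment stops compiling.
      const unreachable: never = event;
      throw new Error(`Unhandled event: ${JSON.stringify(unreachable)}`);
    }
  }
}

// Replaying an order's events in sequence yields its current status.
function replayStatus(events: DomainEvent[]): OrderStatus | undefined {
  let status: OrderStatus | undefined;
  for (const e of events) {
    status = statusAfter(e);
  }
  return status;
}
```

Calling replayStatus with an order's stored events, oldest first, rebuilds its status without reading the orders table.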

3) Persisting domain events
import { randomUUID } from 'crypto';
import type { Client } from 'pg';

// db must be a single client (not a pool) so every statement shares one transaction
async function saveOrderAndEvents(
  db: Client,
  order: { id: string; customerId: string; status: string; total: number; events: DomainEvent[] }
) {
  await db.query('BEGIN');
  try {
    await db.query(
      'INSERT INTO orders (order_id, customer_id, status, total_amount) VALUES ($1,$2,$3,$4)',
      [order.id, order.customerId, order.status, order.total]
    );
    for (const e of order.events) {
      const payload = JSON.stringify(e.data);
      // append the domain event to the event store
      await db.query(
        'INSERT INTO events (event_id, order_id, event_type, payload) VALUES ($1,$2,$3,$4)',
        [randomUUID(), order.id, e.type, payload]
      );
      // enqueue the matching outbox row in the same transaction
      await db.query(
        'INSERT INTO outbox (outbox_id, event_type, payload, occurred_at) VALUES ($1,$2,$3,$4)',
        [randomUUID(), e.type, payload, new Date()]
      );
    }
    await db.query('COMMIT');
  } catch (err) {
    // nothing is persisted unless the order, its events, and the outbox rows all succeed
    await db.query('ROLLBACK');
    throw err;
  }
}

4) Outbox publishing workflow

  • Worker polls pending outbox rows with a small batch, publishes to broker, then marks as published or updates retry count.

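Sketch (Node.js with TypeScript)
import type { Client } from 'pg';
import type { ConfirmChannel } from 'amqplib';

async function publishOutboxBatch(pgClient: Client, mqChannel: ConfirmChannel, batchSize = 50) {
  // FOR UPDATE locks last only until the transaction ends, so wrap the batch in one
  await pgClient.query('BEGIN');
  try {
    const res = await pgClient.query(
      `SELECT outbox_id, event_type, payload
         FROM outbox
        WHERE status = 'pending'
        ORDER BY occurred_at ASC
        LIMIT $1 FOR UPDATE SKIP LOCKED`,
      [batchSize]
    );
    for (const row of res.rows) {
      try {
        const routingKey = row.event_type;
        // pg returns JSONB as a parsed object, so serialize it again before publishing
        const message = Buffer.from(JSON.stringify(row.payload));
        mqChannel.publish('orders', routingKey, message, { persistent: true });
        // on a confirm channel this resolves once the broker has acknowledged the message
        await mqChannel.waitForConfirms();

        // mark published
        await pgClient.query(
          'UPDATE outbox SET status = $1, published_at = NOW() WHERE outbox_id = $2',
          ['sent', row.outbox_id]
        );
      } catch (err) {
        // retry strategy: leave the row pending and count the attempt
        await pgClient.query(
          'UPDATE outbox SET retry_count = retry_count + 1 WHERE outbox_id = $1',
          [row.outbox_id]
        );
      }
    }
    await pgClient.query('COMMIT');
  } catch (err) {
    await pgClient.query('ROLLBACK');
    throw err;
  }
}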

Notes:

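  • FOR UPDATE SKIP LOCKED lets concurrent workers claim disjoint batches without blocking each other. The row locks last only until the surrounding transaction ends, so the SELECT and the status updates must run in one transaction.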
  • We publish to an exchange named 'orders' with routing keys matching event_type.
  • The consumer side must be idempotent.
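
The batch function still needs something to call it repeatedly. A minimal polling loop might look like the sketch below. The names runOutboxWorker, PublishBatch, and WorkerOptions are hypothetical, and the contract that the batch function resolves to the number of rows it handled is an assumption (the function shown earlier returns nothing).

```typescript
// Assumption: publishBatch resolves to the number of outbox rows it handled.
type PublishBatch = () => Promise<number>;

interface WorkerOptions {
  idleDelayMs: number; // pause when the outbox had no pending rows
  shouldStop: () => boolean; // lets the caller end the loop cleanly
  sleep?: (ms: number) => Promise<void>; // injectable for tests
}

const defaultSleep = (ms: number): Promise<void> =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

async function runOutboxWorker(publishBatch: PublishBatch, opts: WorkerOptions): Promise<void> {
  const sleep = opts.sleep ?? defaultSleep;
  while (!opts.shouldStop()) {
    let handled = 0;
    try {
      handled = await publishBatch();
    } catch (err) {
      // A failed batch should not kill the worker; log it and retry after the idle delay.
      console.error('outbox batch failed', err);
    }
    // Poll again immediately while there is work; otherwise wait before the next poll.
    if (handled === 0) {
      await sleep(opts.idleDelayMs);
    }
  }
}
```

Wiring shouldStop to a flag set by a SIGTERM handler would let the worker finish its current batch before shutting down.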

Step 2: Publishing events to RabbitMQ

  • Use a durable exchange (direct or topic). For simplicity, use a direct exchange named "orders".
  • Each event type maps to a routing key. The code in this tutorial uses the event type name itself, e.g., OrderCreated -> "OrderCreated".

Producer setup (Node.js with amqplib)
import amqplib from 'amqplib';

const connection = await amqplib.connect('amqp://localhost');
// a confirm channel lets the producer wait for broker acknowledgements
const channel = await connection.createConfirmChannel();
await channel.assertExchange('orders', 'direct', { durable: true });

Then publish:
channel.publish('orders', 'OrderCreated', Buffer.from(JSON.stringify(payload)), { persistent: true });
await channel.waitForConfirms();

Step 3: Consumer design (idempotent handlers)
Downstream services consume events by binding queues to the exchange with appropriate routing keys.

Example consumer for read model

  • Bind queue "orders-readmodel" to exchange "orders" with routing keys: "OrderCreated", "OrderPaid", "OrderShipped"
  • Use a single source of truth for idempotence: store a processed_event_id or maintain an offsets table.

Code sketch (TypeScript)
type ReadModelState = { orderId: string; status: string; total: number; customerId: string };

import type { Client } from 'pg';

async function handleEvent(evt: { event_type: string; payload: any }, db: Client) {
  const { event_type, payload } = evt;
  switch (event_type) {
    case 'OrderCreated':
      // create the read-model entry; DO NOTHING keeps a redelivered OrderCreated
      // from resetting a status that later events have already advanced
      await db.query(
        `INSERT INTO read_models (order_id, status, total, customer_id) VALUES ($1,$2,$3,$4)
         ON CONFLICT (order_id) DO NOTHING`,
        [payload.orderId, 'created', payload.totalAmount, payload.customerId]
      );
      break;
    case 'OrderPaid':
      await db.query(
        'UPDATE read_models SET status = $1 WHERE order_id = $2',
        ['paid', payload.orderId]
      );
      break;
    case 'OrderShipped':
      await db.query(
        'UPDATE read_models SET status = $1, tracking_code = $2 WHERE order_id = $3',
        ['shipped', payload.trackingCode ?? null, payload.orderId]
      );
      break;
    default:
      // ignore event types this read model does not track
      break;
  }
  // Idempotence: ensure you don't apply the same event twice. A redelivered OrderPaid that
  // arrives after OrderShipped would move the status backwards, so also store a processed_event_id.
}

Guidance for idempotence

  • Include event_id in each outbox row and in the published message, so consumers have a unique key for deduplication and safe replay
  • The consumer should check a processed_events table to skip duplicates
  • Use upsert semantics or a compensation mechanism if needed
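
The processed_events check can be sketched as follows. This is a minimal illustration, not a definitive implementation: the handleOnce function, the Queryable interface, and the processed_events table layout are all assumptions introduced here, and the event id is assumed to arrive with the message.

```typescript
// Minimal shape of a pg-style client; only what this sketch needs.
interface Queryable {
  query(sql: string, params?: unknown[]): Promise<{ rowCount: number | null }>;
}

// Assumption: a table such as
//   CREATE TABLE processed_events (event_id UUID PRIMARY KEY, processed_at TIMESTAMP DEFAULT now());
async function handleOnce(
  db: Queryable,
  eventId: string,
  apply: () => Promise<void>
): Promise<'applied' | 'duplicate'> {
  await db.query('BEGIN');
  try {
    // The insert claims the event; on a redelivery the conflict inserts no row.
    const claim = await db.query(
      'INSERT INTO processed_events (event_id) VALUES ($1) ON CONFLICT (event_id) DO NOTHING',
      [eventId]
    );
    if (claim.rowCount === 0) {
      await db.query('ROLLBACK');
      return 'duplicate';
    }
    // The read-model update must use the same client so it commits with the claim.
    await apply();
    await db.query('COMMIT');
    return 'applied';
  } catch (err) {
    await db.query('ROLLBACK');
    throw err;
  }
}
```

Because the claim and the read-model update commit together, a crash between them leaves no trace, and the redelivered message is processed as if it were new.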

Step 4: Testing strategy

  • Unit tests for domain logic: aggregates and event generation
  • Integration tests for outbox publish path:
    • Mock database and broker
    • Verify that an insert into outbox results in a published message and a published_at timestamp
  • End-to-end tests:
    • Create an order, pay it, ship it
    • Assert read model reflects the final state
    • Verify messages reach downstream services and handlers run idempotently

Sample test outline (pseudo)

  • Given a new order with OrderCreated
  • When outbox worker processes
  • Then RabbitMQ receives a message with routing key OrderCreated
  • And read_model shows status 'created'
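
The outline above can be exercised without any infrastructure by swapping in in-memory stand-ins. Everything in this sketch (FakeBroker, processOutbox, projectReadModel, and the row shapes) is hypothetical test scaffolding, not the production worker; it only mirrors the given/when/then steps.

```typescript
interface OutboxRow {
  outboxId: string;
  eventType: string;
  payload: unknown;
  status: 'pending' | 'sent';
}

interface PublishedMessage {
  routingKey: string;
  body: string;
}

// In-memory stand-in for the RabbitMQ exchange.
class FakeBroker {
  readonly messages: PublishedMessage[] = [];
  publish(routingKey: string, body: string): void {
    this.messages.push({ routingKey, body });
  }
}

// Simplified worker: publish every pending row, then mark it sent.
function processOutbox(rows: OutboxRow[], broker: FakeBroker): void {
  for (const row of rows) {
    if (row.status !== 'pending') continue;
    broker.publish(row.eventType, JSON.stringify(row.payload));
    row.status = 'sent';
  }
}

// Simplified consumer: project OrderCreated messages into a status map.
function projectReadModel(messages: PublishedMessage[]): Map<string, string> {
  const statusByOrder = new Map<string, string>();
  for (const m of messages) {
    const data = JSON.parse(m.body) as { orderId: string };
    if (m.routingKey === 'OrderCreated') statusByOrder.set(data.orderId, 'created');
  }
  return statusByOrder;
}

// Given a new order with OrderCreated
const outbox: OutboxRow[] = [
  { outboxId: 'o-1', eventType: 'OrderCreated', payload: { orderId: 'order-1' }, status: 'pending' },
];
const broker = new FakeBroker();
// When the outbox worker processes
processOutbox(outbox, broker);
// Then the broker holds a message with routing key OrderCreated
const routingKeys = broker.messages.map((m) => m.routingKey);
// And the read model shows status 'created'
const readModel = projectReadModel(broker.messages);
```

An integration test would keep the same steps but replace the fakes with a real PostgreSQL database and RabbitMQ broker.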

Step 5: Observability and operations

  • Correlate logs with a request or operation id across services
  • Collect metrics:
    • Outbox pending count
    • Publish success/failure counts
    • Consumer lag (time since last processed event)
  • Health checks:
    • Outbox worker heartbeat
    • DB connection pool health
    • RabbitMQ connection status
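
The metrics listed above can start as a few in-process counters. The sketch below assumes a hypothetical PipelineMetrics class; a real service would export these values to whatever metrics backend it already uses. The pending count is a plain query against the outbox table.

```typescript
// Query for the "outbox pending count" gauge; run it on a schedule.
const PENDING_COUNT_SQL = "SELECT count(*) AS pending FROM outbox WHERE status = 'pending'";

// Hypothetical in-process metrics holder.
class PipelineMetrics {
  private published = 0;
  private failed = 0;
  private lastProcessedAtMs: number | null = null;

  // Call once per publish attempt in the outbox worker.
  recordPublish(ok: boolean): void {
    if (ok) this.published += 1;
    else this.failed += 1;
  }

  // Call in the consumer after each successfully handled event.
  recordProcessed(atMs: number): void {
    this.lastProcessedAtMs = atMs;
  }

  // Share of successful publishes, or null before any attempt.
  publishSuccessRate(): number | null {
    const total = this.published + this.failed;
    return total === 0 ? null : this.published / total;
  }

  // Consumer lag as time since the last processed event, or null if none yet.
  consumerLagMs(nowMs: number): number | null {
    return this.lastProcessedAtMs === null ? null : nowMs - this.lastProcessedAtMs;
  }
}
```

Alerting on a growing pending count together with a falling success rate usually points at the broker connection rather than the database.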

Step 6: Deployment considerations

  • Keep the events and outbox writes in the same database transaction; the outbox pattern depends on that shared commit. If they must live in separate stores, you need a two-phase commit or an append-only design, and both add complexity
  • Ensure retries are bounded with exponential backoff
  • Use feature flags to switch to alternative publication strategies if needed
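
Bounded retries with exponential backoff come down to a small pure function. The sketch below assumes a hypothetical RetryPolicy shape and uses the retry_count column from the outbox table as its input; the specific delays are illustrative only.

```typescript
interface RetryPolicy {
  baseDelayMs: number; // delay before the first retry
  maxDelayMs: number; // cap so delays stop growing
  maxRetries: number; // retry budget per outbox row
}

// Returns the delay before the next attempt, or null once the retry budget is spent.
// On null, the caller would set the outbox row's status to 'failed'.
function nextRetryDelayMs(retryCount: number, policy: RetryPolicy): number | null {
  if (retryCount >= policy.maxRetries) return null;
  // Double the delay on every retry: base, 2x base, 4x base, ...
  const delay = policy.baseDelayMs * Math.pow(2, retryCount);
  return Math.min(delay, policy.maxDelayMs);
}
```

With a base of 1000 ms and a cap of 30000 ms, the delays run 1 s, 2 s, 4 s, 8 s, 16 s, then stay at 30 s until the budget runs out. Adding random jitter to each delay helps avoid many workers retrying at the same instant.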

Illustrative example: end-to-end flow

  • Customer creates an order: write to orders, generate OrderCreated event, write to events table, enqueue outbox entry
  • Order service’s outbox worker reads the outbox entry, publishes an OrderCreated event to RabbitMQ, marks as sent
  • Read-model service subscribes to OrderCreated, updates its local read model
  • If the Order is paid and shipped, subsequent events flow similarly, updating the read model

What to watch for

  • Idempotence is essential at the consumer level; messages can be redelivered
  • Outbox size can grow; implement purging or archiving after a retention period
  • Monitoring and retries prevent silent failures from snowballing
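
Purging can be a scheduled DELETE over rows that were published long enough ago. The sketch below assumes a hypothetical purgeSentOutbox function and a pg-style client reduced to a Queryable interface; the retention period is a parameter you would choose for your own audit needs.

```typescript
// Minimal shape of a pg-style client; only what this sketch needs.
interface Queryable {
  query(sql: string, params?: unknown[]): Promise<{ rowCount: number | null }>;
}

// Deletes rows published more than retentionDays ago and returns how many were removed.
// Pending and failed rows are never touched.
async function purgeSentOutbox(db: Queryable, retentionDays: number): Promise<number> {
  if (!Number.isInteger(retentionDays) || retentionDays < 1) {
    throw new Error('retentionDays must be a positive integer');
  }
  const res = await db.query(
    `DELETE FROM outbox
      WHERE status = 'sent'
        AND published_at < NOW() - make_interval(days => $1)`,
    [retentionDays]
  );
  return res.rowCount ?? 0;
}
```

If the rows must be kept for auditing, the same WHERE clause can drive an INSERT ... SELECT into an archive table before the delete.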

Concrete starter repo structure

  • src/
    • domain/
      • events.ts // Domain event definitions and serializers
      • order.ts // Order aggregate with event generation
    • infra/
      • db.ts // Postgres client, migrations
      • outbox.ts // Outbox read/publish logic
      • mq.ts // RabbitMQ connection and channels
    • consumers/
      • readmodel.ts // Idempotent event handlers
    • tests/
      • unit/
      • integration/
      • e2e/
  • migrations/
    • 001_create_schema.sql
    • 002_add_read_model.sql

Example code snippet: simple outbox record publication (TypeScript)
async function publishPendingOutboxes(db, mq) {
  const channel = mq.channel;
  await db.query('BEGIN'); // FOR UPDATE locks are held only inside a transaction
  try {
    const batch = await db.query(
      `SELECT outbox_id, event_type, payload FROM outbox
        WHERE status = 'pending' ORDER BY occurred_at ASC LIMIT 25 FOR UPDATE SKIP LOCKED`
    );
    for (const row of batch.rows) {
      try {
        // pg parses JSONB into an object, so serialize it before publishing
        channel.publish('orders', row.event_type, Buffer.from(JSON.stringify(row.payload)), { persistent: true });
        await db.query('UPDATE outbox SET status = $1, published_at = NOW() WHERE outbox_id = $2', ['sent', row.outbox_id]);
      } catch (e) {
        await db.query('UPDATE outbox SET retry_count = retry_count + 1 WHERE outbox_id = $1', [row.outbox_id]);
      }
    }
    await db.query('COMMIT');
  } catch (e) {
    await db.query('ROLLBACK');
    throw e;
  }
}

Why this approach helps practical software engineering

  • It gives you a reliable backbone for real-time data propagation with a clear boundary between write and read models
  • The outbox pattern decouples the database transaction from the messaging system, reducing the risk of message loss due to crash or partial failures
  • Idempotent consumer design reduces the complexity of retry semantics and makes the system robust in real-world distributed environments

Next steps and refinements

  • Introduce a dedicated event store (append-only log) for full auditability
  • Add snapshots for faster rebuilds of read models
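  • Consider using a streaming platform (Kafka) for high-throughput scenarios; its idempotent producers and transactions can approximate exactly-once processing, though consumers that write to external stores still need idempotent handlers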
  • Add schema versioning to events to evolve the system without breaking consumers
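
One common way to version events is an "upcaster" that converts old payloads to the newest shape before handlers see them. The sketch below is illustrative only: the schemaVersion field, the v1 and v2 shapes, and the default currency are all assumptions, since the events in this tutorial carry no version yet.

```typescript
// Assumption: each stored payload carries a schemaVersion field.
interface OrderCreatedV1 {
  schemaVersion: 1;
  orderId: string;
  customerId: string;
  total: number;
}

interface OrderCreatedV2 {
  schemaVersion: 2;
  orderId: string;
  customerId: string;
  totalAmount: number; // renamed from total
  currency: string; // new in v2
}

type StoredOrderCreated = OrderCreatedV1 | OrderCreatedV2;

// Converts any stored version to the latest one so handlers deal with a single shape.
function upcastOrderCreated(event: StoredOrderCreated): OrderCreatedV2 {
  if (event.schemaVersion === 2) return event;
  return {
    schemaVersion: 2,
    orderId: event.orderId,
    customerId: event.customerId,
    totalAmount: event.total,
    // v1 had no currency; this default is an assumption for illustration
    currency: 'USD',
  };
}
```

Old events stay untouched in the event store; only the in-memory representation changes, so replays keep working after the schema evolves.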

If you’d like, I can tailor the code to a specific tech stack you’re using (e.g., Python with PostgreSQL and Kafka, or Go with NATS), or provide a complete repository scaffold with migrations, tests, and CI setup. Would you prefer a Node.js/TypeScript implementation with PostgreSQL and RabbitMQ, or another stack?

-

Rizwan Saleem | https://rizwansaleem.co
