Building a Transactional Outbox in Python
When your app writes to the database and also needs to publish an event, the transactional outbox pattern keeps those two actions consistent. It solves the classic “saved in one place, failed in the other” problem without requiring a distributed transaction.
Why this pattern matters
A common failure mode is writing an order to your database and then failing before the “order-created” message reaches a queue. If you retry the request naively, you can create duplicate side effects or miss events entirely. The outbox pattern avoids that by storing the event in the same database transaction as the business change, then letting a separate worker publish it later.
This is especially useful for payment systems, notifications, inventory updates, and any workflow where “exactly once” is less realistic than “at least once, but safe to repeat.” Idempotency and retries are the other half of the story, because your worker may publish the same event more than once.
What you will build
You will build a small Python service with these parts:
- An orders table for business data.
- An outbox table for pending events.
- A transaction that writes both records together.
- A publisher worker that reads unsent outbox rows and sends them to a message broker.
- An idempotent consumer that ignores duplicates.
The code below uses SQLite for simplicity, but the same design works with PostgreSQL or MySQL. The important part is the transaction boundary, not the specific database.
Database schema
Start with two tables. The orders table holds your business state, and the outbox table stores events that must eventually be published. The schema also creates a third table, inbox, which the consumer uses later to deduplicate events.
import sqlite3
from contextlib import contextmanager
from datetime import datetime, timezone
import json
import uuid

DB_PATH = "app.db"


@contextmanager
def db():
    # Open a connection, commit on success, roll back on any error.
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()


def init_db():
    with db() as conn:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS orders (
                id TEXT PRIMARY KEY,
                customer_email TEXT NOT NULL,
                amount_cents INTEGER NOT NULL,
                status TEXT NOT NULL,
                created_at TEXT NOT NULL
            )
        """)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS outbox (
                id TEXT PRIMARY KEY,
                aggregate_type TEXT NOT NULL,
                aggregate_id TEXT NOT NULL,
                event_type TEXT NOT NULL,
                payload TEXT NOT NULL,
                created_at TEXT NOT NULL,
                published_at TEXT,
                attempts INTEGER NOT NULL DEFAULT 0
            )
        """)
        # Lets the worker find unpublished rows in creation order.
        conn.execute("""
            CREATE INDEX IF NOT EXISTS idx_outbox_unpublished
            ON outbox(published_at, created_at)
        """)
        # Used by the consumer to remember which events it has handled.
        conn.execute("""
            CREATE TABLE IF NOT EXISTS inbox (
                event_id TEXT PRIMARY KEY,
                processed_at TEXT NOT NULL
            )
        """)
Writing business data and event together
The key rule is simple: if the order is saved, the outbox record must also be saved, in the same transaction. That way, either both survive or neither does.
def now():
    return datetime.now(timezone.utc).isoformat()


def create_order(customer_email, amount_cents):
    order_id = str(uuid.uuid4())
    event_id = str(uuid.uuid4())
    order_row = {
        "id": order_id,
        "customer_email": customer_email,
        "amount_cents": amount_cents,
        "status": "created",
        "created_at": now()
    }
    event_payload = {
        "event_id": event_id,
        "order_id": order_id,
        "customer_email": customer_email,
        "amount_cents": amount_cents
    }
    with db() as conn:
        conn.execute(
            """
            INSERT INTO orders (id, customer_email, amount_cents, status, created_at)
            VALUES (:id, :customer_email, :amount_cents, :status, :created_at)
            """,
            order_row
        )
        conn.execute(
            """
            INSERT INTO outbox (id, aggregate_type, aggregate_id, event_type, payload, created_at)
            VALUES (?, ?, ?, ?, ?, ?)
            """,
            (
                event_id,
                "order",
                order_id,
                "OrderCreated",
                json.dumps(event_payload),
                now()
            )
        )
    return order_id
Notice that the order and the event use the same database transaction. If the insert into outbox fails, the order insert is rolled back too. That prevents the “order exists, but nobody knows about it” problem.
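To see the rollback behavior in isolation, here is a minimal sketch that uses an in-memory SQLite database and a trimmed-down schema. The helper names (make_db, save_order_with_event, count) are made up for this illustration and are not part of the tutorial code. It relies on the sqlite3 connection acting as a context manager that commits on success and rolls back when the block raises.

```python
import sqlite3


def make_db():
    # In-memory database with a reduced version of the two tables.
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE orders (id TEXT PRIMARY KEY, amount_cents INTEGER NOT NULL)")
    conn.execute("CREATE TABLE outbox (id TEXT PRIMARY KEY, payload TEXT NOT NULL)")
    return conn


def save_order_with_event(conn, order_id, event_id, payload):
    # "with conn" commits if the block finishes and rolls back if it raises,
    # so both inserts succeed together or neither is kept.
    with conn:
        conn.execute(
            "INSERT INTO orders (id, amount_cents) VALUES (?, ?)", (order_id, 5000)
        )
        conn.execute(
            "INSERT INTO outbox (id, payload) VALUES (?, ?)", (event_id, payload)
        )


def count(conn, table):
    # Table names come from this sketch only, never from user input.
    return conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]


if __name__ == "__main__":
    conn = make_db()
    try:
        # A NULL payload violates NOT NULL, so the outbox insert fails.
        save_order_with_event(conn, "ord-1", "evt-1", None)
    except sqlite3.IntegrityError:
        pass
    print(count(conn, "orders"), count(conn, "outbox"))
```

Because the second insert fails, the first one is undone as well, and both tables stay empty.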
Publishing from the outbox
Now add a worker that scans for unpublished events and sends them to your broker. In a real app, this could be Kafka, RabbitMQ, SQS, or a webhook endpoint. For the tutorial, we will simulate publishing with a function that may fail randomly.
import random
import time


def publish_to_broker(event_type, payload):
    # Simulated broker: fails roughly 20% of the time.
    if random.random() < 0.2:
        raise RuntimeError("Transient broker failure")
    print(f"Published: {event_type} -> {payload['event_id']}")


def claim_batch(limit=10):
    # Reads the oldest unpublished rows. It does not lock them,
    # so run a single worker unless you add a locking column.
    with db() as conn:
        rows = conn.execute(
            """
            SELECT *
            FROM outbox
            WHERE published_at IS NULL
            ORDER BY created_at
            LIMIT ?
            """,
            (limit,)
        ).fetchall()
    return rows


def mark_published(event_id):
    with db() as conn:
        conn.execute(
            "UPDATE outbox SET published_at = ? WHERE id = ?",
            (now(), event_id)
        )


def increment_attempts(event_id):
    with db() as conn:
        conn.execute(
            "UPDATE outbox SET attempts = attempts + 1 WHERE id = ?",
            (event_id,)
        )


def outbox_worker_once():
    rows = claim_batch()
    for row in rows:
        payload = json.loads(row["payload"])
        try:
            publish_to_broker(row["event_type"], payload)
            mark_published(row["id"])
        except Exception:
            # Leave the row unpublished so the next pass retries it.
            increment_attempts(row["id"])
This worker is allowed to retry. That is normal, because transient failures are expected in real systems. The pattern assumes at-least-once delivery, so downstream consumers must be safe to process duplicate events.
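The retry behavior is easier to follow with a deterministic broker. The sketch below is an illustration only: FlakyBroker, make_outbox, and drain are hypothetical names, the schema is reduced to the columns the loop needs, and the broker fails a fixed number of times instead of randomly.

```python
import sqlite3


def make_outbox(event_ids):
    # Reduced outbox table in an in-memory database.
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE outbox ("
        "id TEXT PRIMARY KEY, published_at TEXT, attempts INTEGER NOT NULL DEFAULT 0)"
    )
    with conn:
        conn.executemany("INSERT INTO outbox (id) VALUES (?)", [(e,) for e in event_ids])
    return conn


class FlakyBroker:
    """Hypothetical stand-in for a broker: the first `failures` sends raise."""

    def __init__(self, failures):
        self.failures = failures
        self.delivered = []

    def send(self, event_id):
        if self.failures > 0:
            self.failures -= 1
            raise RuntimeError("transient broker failure")
        self.delivered.append(event_id)


def drain(conn, broker, max_passes=10):
    """Poll until no unpublished rows remain; return the number of passes that did work."""
    for passes in range(1, max_passes + 1):
        rows = conn.execute(
            "SELECT id FROM outbox WHERE published_at IS NULL ORDER BY rowid"
        ).fetchall()
        if not rows:
            return passes - 1
        for (event_id,) in rows:
            try:
                broker.send(event_id)
                with conn:
                    conn.execute(
                        "UPDATE outbox SET published_at = datetime('now') WHERE id = ?",
                        (event_id,),
                    )
            except RuntimeError:
                # Keep the row pending and record the failed attempt.
                with conn:
                    conn.execute(
                        "UPDATE outbox SET attempts = attempts + 1 WHERE id = ?",
                        (event_id,),
                    )
    return max_passes
```

With two events and one simulated failure, the first event is delivered after the second one. Retries can reorder events, so consumers that care about ordering need their own safeguard.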
Making consumers idempotent
If the publisher retries after a network timeout, the same event may arrive more than once. You prevent duplicate side effects by recording processed event IDs in an inbox table before applying the business action.
def handle_order_created(event):
    event_id = event["event_id"]
    with db() as conn:
        # Skip events that were already recorded in the inbox.
        existing = conn.execute(
            "SELECT 1 FROM inbox WHERE event_id = ?",
            (event_id,)
        ).fetchone()
        if existing:
            return "duplicate_ignored"
        conn.execute(
            "INSERT INTO inbox (event_id, processed_at) VALUES (?, ?)",
            (event_id, now())
        )
        # Safe business action goes here.
        # Example: send a welcome email, create a shipment, or update analytics.
        # If it raises, the transaction rolls back and the inbox row is not kept.
        print(f"Processed order {event['order_id']}")
    return "processed"
This inbox table is a simple deduplication gate. If the same event is delivered twice, the second attempt is ignored before side effects happen. That is the practical way to get reliability without pretending the network will never fail.
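If two consumers can receive the same event at the same moment, a check followed by an insert leaves a small window between the two statements. One possible variation, shown here as a sketch and not as the tutorial's method, lets the primary key do the check: insert first and treat a uniqueness violation as a duplicate. The names make_inbox and process_once, and the action callback, are assumptions for this example.

```python
import sqlite3


def make_inbox():
    # Reduced inbox table in an in-memory database.
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE inbox (event_id TEXT PRIMARY KEY)")
    return conn


def process_once(conn, event_id, action):
    """Run `action` at most once per event_id, using the primary key as the gate."""
    with conn:
        try:
            conn.execute("INSERT INTO inbox (event_id) VALUES (?)", (event_id,))
        except sqlite3.IntegrityError:
            # The row already exists, so this delivery is a duplicate.
            return "duplicate_ignored"
        # If the action raises, "with conn" rolls back the inbox insert,
        # so a later redelivery can try again.
        action()
    return "processed"


if __name__ == "__main__":
    conn = make_inbox()
    print(process_once(conn, "evt-1", lambda: None))
    print(process_once(conn, "evt-1", lambda: None))
```

The trade-off is that the business action and the inbox record now commit together, which suits database-only side effects better than external calls such as sending email.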
End-to-end flow
Here is the whole flow in order:
- The API receives a request to create an order.
- It inserts the order row and outbox row in one transaction.
- A worker polls the outbox table.
- The worker publishes the event to the broker.
- A consumer receives the event and checks its inbox table.
- If the event was not seen before, the consumer processes it exactly once from its perspective.
That sequence gives you durability, retries, and duplicate safety without a distributed transaction. The system becomes easier to reason about because every step has one clear responsibility.
Practical safeguards
There are a few details that make this pattern hold up in production. First, index the unpublished outbox rows so workers can fetch them efficiently. Second, keep outbox payloads compact and versioned. Third, track retry attempts so poison messages do not spin forever.
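As a hedged illustration of the third point, the attempts counter can drive a simple retry-or-give-up decision. The function names, the base delay of 2 seconds, the 300 second cap, and the threshold of 5 attempts are all assumptions picked for the example; tune them for your own broker.

```python
def backoff_seconds(attempts, base=2.0, cap=300.0):
    """Exponential delay before the next try: base * 2**attempts, capped at `cap`."""
    return min(cap, base * (2 ** attempts))


def next_step(attempts, max_attempts=5):
    """Decide what to do with a row that has failed `attempts` times so far."""
    if attempts >= max_attempts:
        # Stop retrying; the caller would move the row to a dead-letter table.
        return ("dead_letter", None)
    return ("retry", backoff_seconds(attempts))


if __name__ == "__main__":
    for attempts in range(7):
        print(attempts, next_step(attempts))
```

A worker could store the computed delay as a "do not retry before" timestamp on the row and skip rows whose timestamp is still in the future.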
A few improvements you can add next:
- Add a locked_at column so multiple workers do not publish the same row at once.
- Add exponential backoff for failed publishes.
- Move failed rows to a dead-letter table after a retry threshold.
- Include schema versioning in the event payload.
- Emit structured logs for order_id, event_id, and attempts.
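The first item in that list can be sketched as a single UPDATE that stamps rows with a lock time and a worker name. This is one possible design under stated assumptions: the locked_by column, the five-minute lock timeout, and the function names are invented for the example, and the schema is reduced to the columns involved.

```python
import sqlite3
from datetime import datetime, timedelta, timezone


def make_outbox(event_ids):
    # Reduced outbox table with the two hypothetical locking columns.
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE outbox ("
        "id TEXT PRIMARY KEY, published_at TEXT, locked_at TEXT, locked_by TEXT)"
    )
    with conn:
        conn.executemany("INSERT INTO outbox (id) VALUES (?)", [(e,) for e in event_ids])
    return conn


def claim_rows(conn, worker_id, limit=10, lock_timeout=timedelta(minutes=5), now=None):
    """Lock up to `limit` unpublished rows for one worker and return their ids."""
    now = now or datetime.now(timezone.utc)
    # Fixed-width UTC timestamps compare correctly as plain strings.
    stamp = now.isoformat(timespec="microseconds")
    stale_before = (now - lock_timeout).isoformat(timespec="microseconds")
    with conn:
        # One UPDATE statement selects and locks the rows in a single step.
        # Locks older than the timeout are treated as abandoned and reclaimed.
        conn.execute(
            """
            UPDATE outbox
            SET locked_at = ?, locked_by = ?
            WHERE id IN (
                SELECT id FROM outbox
                WHERE published_at IS NULL
                  AND (locked_at IS NULL OR locked_at < ?)
                ORDER BY rowid
                LIMIT ?
            )
            """,
            (stamp, worker_id, stale_before, limit),
        )
    rows = conn.execute(
        "SELECT id FROM outbox WHERE locked_by = ? AND locked_at = ? ORDER BY rowid",
        (worker_id, stamp),
    ).fetchall()
    return [row[0] for row in rows]
```

The timeout matters because a worker can crash after claiming rows. Without it, those rows would stay locked and never be published.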
Testing the pattern
You should test three cases. The first is the happy path, where an order is created and one event is published. The second is a crash between the order insert and publish step, which should still leave a row in the outbox. The third is a duplicate event delivery, which should be ignored by the inbox table.
def test_duplicate_event_is_ignored():
    init_db()
    event = {
        # A fresh ID per run, so reruns against the same app.db still pass.
        "event_id": f"evt-{uuid.uuid4()}",
        "order_id": "ord-456",
        "customer_email": "test@example.com",
        "amount_cents": 5000
    }
    first = handle_order_created(event)
    second = handle_order_created(event)
    assert first == "processed"
    assert second == "duplicate_ignored"
This kind of test is valuable because it checks behavior under failure, not just normal execution. That is where the outbox pattern earns its keep.
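The second case, a crash before publishing, can be approximated by closing the connection right after the write and reopening the database file. The sketch below is self-contained and uses a temporary file plus a reduced schema; open_db, pending_event_ids, and this trimmed create_order are stand-ins for the example, not the tutorial's functions.

```python
import os
import sqlite3
import tempfile


def open_db(path):
    conn = sqlite3.connect(path)
    conn.execute("CREATE TABLE IF NOT EXISTS orders (id TEXT PRIMARY KEY)")
    conn.execute("CREATE TABLE IF NOT EXISTS outbox (id TEXT PRIMARY KEY, published_at TEXT)")
    return conn


def create_order(conn, order_id, event_id):
    # Both rows are written in one transaction.
    with conn:
        conn.execute("INSERT INTO orders (id) VALUES (?)", (order_id,))
        conn.execute("INSERT INTO outbox (id) VALUES (?)", (event_id,))


def pending_event_ids(conn):
    rows = conn.execute(
        "SELECT id FROM outbox WHERE published_at IS NULL ORDER BY id"
    ).fetchall()
    return [row[0] for row in rows]


def test_event_survives_crash_before_publish():
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "app.db")
        conn = open_db(path)
        create_order(conn, "ord-1", "evt-1")
        conn.close()  # Simulated crash: the worker never got to publish.

        restarted = open_db(path)
        try:
            # The event is still waiting in the outbox after the restart.
            assert pending_event_ids(restarted) == ["evt-1"]
        finally:
            restarted.close()
```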
When to use it
Use the transactional outbox when a database write must stay in sync with an external message or side effect. It is a strong fit for orders, billing, user notifications, and event-driven integrations.
Do not use it for everything. If you only need a local state change with no downstream event, the pattern adds unnecessary complexity. If your workflow needs strict orchestration across many services, a workflow engine or saga may be a better fit.
Next steps
A good follow-up is to swap the polling worker for a database change stream or CDC-based publisher, which can reduce latency and simplify delivery. Another useful extension is adding metrics for unpublished rows, publish lag, and retry counts so you can see backlog growth early.
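For the metrics idea, a single query over the outbox table can report the backlog size, the age of the oldest pending row, and the highest retry count. This is a rough sketch: outbox_metrics and the keys in the returned dictionary are made-up names, and it assumes created_at holds ISO 8601 UTC strings in one consistent format, as produced by datetime.isoformat().

```python
import sqlite3
from datetime import datetime, timezone


def outbox_metrics(conn, now=None):
    """Summarize unpublished outbox rows for dashboards or alerts."""
    now = now or datetime.now(timezone.utc)
    pending, oldest, max_attempts = conn.execute(
        """
        SELECT COUNT(*), MIN(created_at), COALESCE(MAX(attempts), 0)
        FROM outbox
        WHERE published_at IS NULL
        """
    ).fetchone()
    # MIN over same-format ISO strings gives the earliest timestamp.
    age = (now - datetime.fromisoformat(oldest)).total_seconds() if oldest else 0.0
    return {
        "pending": pending,
        "oldest_pending_age_seconds": age,
        "max_attempts": max_attempts,
    }
```

An alert on the age of the oldest pending row tends to catch a stalled worker sooner than an alert on the row count alone.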
The main idea is straightforward: write the business change and the event together, then publish later, and make every downstream step safe to repeat. That one design choice eliminates a large class of consistency bugs.
Rizwan Saleem | https://rizwansaleem.co