- Book: Event-Driven Architecture Pocket Guide: Saga, CQRS, Outbox, and the Traps Nobody Warns You About
- My project: Hermes IDE | GitHub — an IDE for developers who ship with Claude Code and other AI coding tools
- Me: xgabriel.com | GitHub
You discover the bug at 2:14 on a Tuesday. The orders_read projection has been flattening line items into a JSON blob for eighteen months, and the new "items shipped in last 30 days" dashboard needs to filter on SKU. The schema is wrong. There are 84 million rows. Rebuilding the projection from the event log takes about three hours on a warm replica. Your SLO budget for the quarter is 22 minutes and you've already burned 11.
You can't take a four-hour outage to rebuild the read side. You also can't ship the new query against the wrong shape. This is the boring problem CQRS was supposed to make easy, and it is, once you stop thinking about the rebuild as an outage and start thinking about it as a parallel deployment. Two read models, one event log, a flag, and a parity check. That's the whole pattern.
The pieces below assume Postgres for the read side and Kafka as the durable event log. The shape transfers to any pair where the log keeps full history and the read store can be re-populated from it. Marten's Critter Stack shipped this in March 2025 as a first-class blue-green; the same pattern works by hand if you don't have framework support.
The shape of the rebuild
Five steps, in this order. Skip none.
- Stand up
orders_read_v2next toorders_read_v1. New schema, new table, same database is fine. - Dual-write the projection: every event the consumer handles updates both v1 and v2.
- Replay the historical events from the log into v2 only. v1 is already correct; don't touch it.
- Shadow-read: every query hits v1 to serve the response and v2 to compare. Log mismatches.
- When the mismatch rate is zero for a sustained window, flip the read flag to v2. Wait. Drop v1.
The replay and the live tail meet somewhere in the middle. The dual-write is what makes that meeting safe. Without it, events that arrive during the replay race with the historical events and you get gaps. With it, every event lands in v2 twice (once from replay, once from the live consumer) and your projection has to be idempotent. That's the price of admission and it's cheaper than an outage.
Here are the two tables side by side. v1 is what you already have; v2 is the target.
-- v1: the existing, wrong shape.
CREATE TABLE orders_read_v1 (
order_id uuid PRIMARY KEY,
customer_id uuid NOT NULL,
status text NOT NULL,
items_json jsonb NOT NULL,
total_cents bigint NOT NULL,
updated_at timestamptz NOT NULL,
last_event_id bigint NOT NULL
);
-- v2: the new, correct shape. Items get their own table so
-- you can filter on sku without jsonb gymnastics.
CREATE TABLE orders_read_v2 (
order_id uuid PRIMARY KEY,
customer_id uuid NOT NULL,
status text NOT NULL,
total_cents bigint NOT NULL,
shipped_at timestamptz,
updated_at timestamptz NOT NULL,
last_event_id bigint NOT NULL
);
CREATE TABLE order_items_read_v2 (
order_id uuid NOT NULL
REFERENCES orders_read_v2(order_id) ON DELETE CASCADE,
line_no int NOT NULL,
sku text NOT NULL,
qty int NOT NULL,
PRIMARY KEY (order_id, line_no)
);
CREATE INDEX ON order_items_read_v2 (sku);
The last_event_id column on each row is the load-bearing detail. It's the offset of the last event applied to that aggregate. The projection writes it on every update, and the replay refuses to apply an event whose offset is less than or equal to the current last_event_id. That's how you make the projection idempotent without bolting on a separate dedup table.
Dual-write the projection
Your consumer was already writing to v1. Add v2 in the same transaction. Both writes succeed or neither does. That's non-negotiable; otherwise the parity check below is meaningless.
# consumer.py: runs against the live tail of the Kafka topic.
import psycopg
from confluent_kafka import Consumer
def apply(conn, event):
eid = event["offset"]
with conn.transaction():
apply_v1(conn, event, eid)
apply_v2(conn, event, eid)
def apply_v2(conn, event, eid):
if event["type"] == "OrderPlaced":
conn.execute("""
INSERT INTO orders_read_v2
(order_id, customer_id, status,
total_cents, updated_at, last_event_id)
VALUES (%s, %s, 'placed', %s, now(), %s)
ON CONFLICT (order_id) DO UPDATE
SET last_event_id = EXCLUDED.last_event_id
WHERE orders_read_v2.last_event_id < EXCLUDED.last_event_id
""", (event["order_id"], event["customer_id"],
event["total_cents"], eid))
for i, item in enumerate(event["items"]):
conn.execute("""
INSERT INTO order_items_read_v2
(order_id, line_no, sku, qty)
VALUES (%s, %s, %s, %s)
ON CONFLICT (order_id, line_no) DO NOTHING
""", (event["order_id"], i, item["sku"], item["qty"]))
The WHERE orders_read_v2.last_event_id < EXCLUDED.last_event_id clause is the idempotency guard. An event that's already been applied (because the replay raced ahead of the live tail, or because Kafka redelivered) becomes a no-op. Run the same event ten times; the row settles at the same state.
The replay
Replay reads from the beginning of the topic and writes only to v2. It uses a separate consumer group so it doesn't poison the live consumer's offsets. Run it on a worker pool. The throughput ceiling is Postgres write bandwidth, not Kafka.
# replay.py: one-shot job, separate consumer group.
from confluent_kafka import Consumer, TopicPartition
cfg = {
"bootstrap.servers": "kafka:9092",
"group.id": "orders-read-v2-replay",
"enable.auto.commit": False,
"auto.offset.reset": "earliest",
}
c = Consumer(cfg)
c.assign([TopicPartition("orders", p, 0) for p in range(12)])
with psycopg.connect(DSN) as conn:
while True:
msg = c.poll(1.0)
if msg is None:
if all_caught_up(c):
break
continue
event = decode(msg)
with conn.transaction():
apply_v2(conn, event, msg.offset())
Two details that bite teams running this for the first time. First, batch your transactions. Committing every event drops throughput by 10x or more. Group 500 events per transaction and the replay finishes in a fraction of the time. Second, monitor the high-water mark of v2's last_event_id against the topic head. When the gap stops shrinking, you're caught up. That's the signal to start shadow-reads, not a fixed timer.
Shadow-read for parity
The query layer reads from v1 to serve the response. In the same request, asynchronously, it reads from v2 and compares. Mismatches go to a log with the event offsets that got us here.
# query_layer.py: flag-gated, instrumented.
def get_order(order_id):
v1 = fetch_v1(order_id)
if FLAG.get("orders_read_v2_shadow"):
try:
v2 = fetch_v2(order_id)
if not equivalent(v1, v2):
log.warning("parity_mismatch",
order_id=order_id,
v1=v1, v2=v2)
except Exception as e:
log.error("v2_shadow_failed", err=str(e))
return v1
The equivalent function is where the work hides. v1's items_json is unordered; v2's order_items_read_v2 is ordered by line_no. Normalize both sides before comparing. Ignore updated_at skew within a few hundred milliseconds. Don't compare last_event_id; it will legitimately differ between the two if v2 caught a later event the live consumer hasn't written to v1 yet.
The parity-check script
Shadow-read covers what users query. It does not cover the long tail of orders nobody opened in the last week. Run a sweep.
# parity_check.py: paginates through orders, flags drift.
import psycopg, json
BATCH = 5000
def normalize(items):
return sorted(
[(i["sku"], i["qty"]) for i in items],
key=lambda x: x[0],
)
def check_batch(conn, after):
rows = conn.execute("""
SELECT v1.order_id, v1.status, v1.total_cents,
v1.items_json, v2.status, v2.total_cents,
COALESCE(json_agg(json_build_object(
'sku', i.sku, 'qty', i.qty)), '[]'::json)
FROM orders_read_v1 v1
JOIN orders_read_v2 v2 USING (order_id)
LEFT JOIN order_items_read_v2 i USING (order_id)
WHERE v1.order_id > %s
GROUP BY v1.order_id, v2.order_id
ORDER BY v1.order_id
LIMIT %s
""", (after, BATCH)).fetchall()
return rows
def diff(rows):
drift = []
for r in rows:
oid, s1, t1, items1, s2, t2, items2 = r
if s1 != s2 or t1 != t2:
drift.append((oid, "header", s1, s2, t1, t2))
continue
if normalize(items1) != normalize(items2):
drift.append((oid, "items"))
return drift
Run the sweep nightly during the rebuild. The drift count should fall to zero and stay there. If it doesn't, you have a bug in apply_v2 and the cutover waits. The most common cause is an event type the v2 projection forgot to handle: OrderRefunded, OrderItemReplaced, the one that gets emitted twice a month. Your test suite missed it because your test corpus is two weeks of fixtures.
The cutover
A flag flip, a hold, a drop. In that order.
# At T=0: flip read traffic to v2.
FLAG.set("orders_read_source", "v2")
# At T+24h: stop dual-writing if parity has held.
FLAG.set("orders_read_dual_write", False)
# At T+7d: drop v1.
# DROP TABLE orders_read_v1;
The 24-hour hold is for rollback. If a v2 query returns garbage for a customer that nobody saw during shadow-read (a regional edge case, a legacy aggregate from 2019), you flip the flag back to v1 and v1 still has the latest state because dual-write was on. The seven-day hold before the drop is for the auditor who shows up on day five asking why the report shape changed. Keep v1 readable until that conversation is done.
The pattern compresses well in your head. Two tables, one transaction, one replay, one parity sweep, one flag with three positions. The work that takes a week is rarely the code; it's the schema migration, the equivalence function, and the long tail of event types the v1 projection silently absorbed and the v2 projection has to reproduce. Plan the calendar around that, not the replay job.
If this was useful
Chapter 6 of Event-Driven Architecture Pocket Guide walks the CQRS read-side patterns end to end: projection idempotency, replay tooling, the parity-check checklist this post compresses, and the failure modes that show up around the dual-write window. If you're staring at a read model you can't afford to rebuild in place, it's the chapter to read first.

Top comments (0)