Gabriel Anhaia

Rebuild a CQRS Read Model With Zero Downtime


You discover the bug at 2:14 on a Tuesday. The orders_read projection has been flattening line items into a JSON blob for eighteen months, and the new "items shipped in last 30 days" dashboard needs to filter on SKU. The schema is wrong. There are 84 million rows. Rebuilding the projection from the event log takes about three hours on a warm replica. Your SLO budget for the quarter is 22 minutes and you've already burned 11.

You can't take a three-hour outage to rebuild the read side. You also can't ship the new query against the wrong shape. This is the boring problem CQRS was supposed to make easy, and it is, once you stop thinking about the rebuild as an outage and start thinking about it as a parallel deployment. Two read models, one event log, a flag, and a parity check. That's the whole pattern.

The pieces below assume Postgres for the read side and Kafka as the durable event log. The shape transfers to any pair where the log keeps full history and the read store can be re-populated from it. Marten's Critter Stack shipped this in March 2025 as a first-class blue-green; the same pattern works by hand if you don't have framework support.

The shape of the rebuild

Five steps, in this order. Skip none.

  1. Stand up orders_read_v2 next to orders_read_v1. New schema, new table, same database is fine.
  2. Dual-write the projection: every event the consumer handles updates both v1 and v2.
  3. Replay the historical events from the log into v2 only. v1 is already correct; don't touch it.
  4. Shadow-read: every query hits v1 to serve the response and v2 to compare. Log mismatches.
  5. When the mismatch rate is zero for a sustained window, flip the read flag to v2. Wait. Drop v1.

The replay and the live tail meet somewhere in the middle. The dual-write is what makes that meeting safe. Without it, events that arrive during the replay race with the historical events and you get gaps. With it, every event published after dual-write starts reaches v2 twice (once from the live consumer, once when the replay gets to it), so your projection has to be idempotent. That's the price of admission and it's cheaper than an outage.

Here are the two tables side by side. v1 is what you already have; v2 is the target.

-- v1: the existing, wrong shape.
CREATE TABLE orders_read_v1 (
  order_id      uuid PRIMARY KEY,
  customer_id   uuid NOT NULL,
  status        text NOT NULL,
  items_json    jsonb NOT NULL,
  total_cents   bigint NOT NULL,
  updated_at    timestamptz NOT NULL,
  last_event_id bigint NOT NULL
);
-- v2: the new, correct shape. Items get their own table so
-- you can filter on sku without jsonb gymnastics.
CREATE TABLE orders_read_v2 (
  order_id      uuid PRIMARY KEY,
  customer_id   uuid NOT NULL,
  status        text NOT NULL,
  total_cents   bigint NOT NULL,
  shipped_at    timestamptz,
  updated_at    timestamptz NOT NULL,
  last_event_id bigint NOT NULL
);

CREATE TABLE order_items_read_v2 (
  order_id   uuid NOT NULL
             REFERENCES orders_read_v2(order_id) ON DELETE CASCADE,
  line_no    int  NOT NULL,
  sku        text NOT NULL,
  qty        int  NOT NULL,
  PRIMARY KEY (order_id, line_no)
);

CREATE INDEX ON order_items_read_v2 (sku);
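To make the payoff concrete, here is a minimal sketch of the dashboard query the v2 shape makes possible. It uses Python's built-in sqlite3 as a stand-in for Postgres so it runs anywhere. The trimmed column list, the sample rows, and the shipped_qty_by_sku helper are illustrative assumptions, not part of the original projection.

```python
import sqlite3

def make_db():
    """Build a cut-down copy of the v2 tables in an in-memory SQLite database."""
    conn = sqlite3.connect(":memory:")
    conn.executescript("""
        CREATE TABLE orders_read_v2 (
            order_id   TEXT PRIMARY KEY,
            status     TEXT NOT NULL,
            shipped_at TEXT            -- ISO-8601 UTC string in this sketch
        );
        CREATE TABLE order_items_read_v2 (
            order_id TEXT NOT NULL REFERENCES orders_read_v2(order_id),
            line_no  INTEGER NOT NULL,
            sku      TEXT NOT NULL,
            qty      INTEGER NOT NULL,
            PRIMARY KEY (order_id, line_no)
        );
        CREATE INDEX items_sku ON order_items_read_v2 (sku);
    """)
    return conn

def shipped_qty_by_sku(conn, sku, since):
    """Total quantity of one SKU on orders shipped at or after `since`."""
    row = conn.execute("""
        SELECT COALESCE(SUM(i.qty), 0)
        FROM order_items_read_v2 i
        JOIN orders_read_v2 o USING (order_id)
        WHERE i.sku = ? AND o.shipped_at >= ?
    """, (sku, since)).fetchone()
    return row[0]

conn = make_db()
conn.executemany("INSERT INTO orders_read_v2 VALUES (?, ?, ?)", [
    ("o1", "shipped", "2025-05-20T10:00:00Z"),
    ("o2", "shipped", "2025-03-01T10:00:00Z"),   # outside the window
    ("o3", "placed", None),                      # not shipped yet
])
conn.executemany("INSERT INTO order_items_read_v2 VALUES (?, ?, ?, ?)", [
    ("o1", 0, "SKU-1", 2), ("o1", 1, "SKU-2", 1),
    ("o2", 0, "SKU-1", 5), ("o3", 0, "SKU-1", 7),
])
total = shipped_qty_by_sku(conn, "SKU-1", "2025-05-01T00:00:00Z")
```

Against Postgres the cutoff would more likely be computed in SQL (for example now() minus a 30-day interval) than passed in as a string. The point is only that the filter on sku becomes an indexed equality predicate instead of a scan over items_json.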

The last_event_id column on each row is the load-bearing detail. It's the offset of the last event applied to that aggregate. Kafka offsets are only ordered within a partition, so this relies on every event for a given order being keyed to the same partition. The projection writes it on every update, and the replay refuses to apply an event whose offset is less than or equal to the current last_event_id. That's how you make the projection idempotent without bolting on a separate dedup table.

Dual-write the projection

Your consumer was already writing to v1. Add v2 in the same transaction. Both writes succeed or neither does. That's non-negotiable; otherwise the parity check below is meaningless.

# consumer.py: runs against the live tail of the Kafka topic.
import psycopg
from confluent_kafka import Consumer

def apply(conn, event):
    eid = event["offset"]
    with conn.transaction():
        apply_v1(conn, event, eid)
        apply_v2(conn, event, eid)
def apply_v2(conn, event, eid):
    if event["type"] == "OrderPlaced":
        conn.execute("""
          INSERT INTO orders_read_v2
            (order_id, customer_id, status,
             total_cents, updated_at, last_event_id)
          VALUES (%s, %s, 'placed', %s, now(), %s)
          ON CONFLICT (order_id) DO UPDATE
          SET last_event_id = EXCLUDED.last_event_id
          WHERE orders_read_v2.last_event_id < EXCLUDED.last_event_id
        """, (event["order_id"], event["customer_id"],
              event["total_cents"], eid))
        for i, item in enumerate(event["items"]):
            conn.execute("""
              INSERT INTO order_items_read_v2
                (order_id, line_no, sku, qty)
              VALUES (%s, %s, %s, %s)
              ON CONFLICT (order_id, line_no) DO NOTHING
            """, (event["order_id"], i, item["sku"], item["qty"]))

The WHERE orders_read_v2.last_event_id < EXCLUDED.last_event_id clause is the idempotency guard. An event that's already been applied (because the replay raced ahead of the live tail, or because Kafka redelivered) becomes a no-op. Run the same event ten times; the row settles at the same state.
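You can watch the guard do its job without a Postgres instance. The sketch below runs the same upsert-with-WHERE shape against Python's built-in sqlite3, which accepts this ON CONFLICT syntax from SQLite 3.24 onward. The three-column table and the apply_status helper are assumptions made for the demo, and the status update stands in for whatever a later event type would change.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE orders_read_v2 (
        order_id      TEXT PRIMARY KEY,
        status        TEXT NOT NULL,
        last_event_id INTEGER NOT NULL
    )
""")

def apply_status(conn, order_id, status, eid):
    """Upsert the order row, but only if eid is newer than what is stored."""
    conn.execute("""
        INSERT INTO orders_read_v2 (order_id, status, last_event_id)
        VALUES (?, ?, ?)
        ON CONFLICT (order_id) DO UPDATE
        SET status = excluded.status,
            last_event_id = excluded.last_event_id
        WHERE orders_read_v2.last_event_id < excluded.last_event_id
    """, (order_id, status, eid))

def row(conn, order_id):
    """Return (status, last_event_id) for one order."""
    return conn.execute(
        "SELECT status, last_event_id FROM orders_read_v2 WHERE order_id = ?",
        (order_id,),
    ).fetchone()

# Live tail applies offsets 5 and 9, then the replay re-delivers offset 5.
apply_status(conn, "o1", "placed", 5)
apply_status(conn, "o1", "shipped", 9)
for _ in range(10):
    apply_status(conn, "o1", "placed", 5)   # stale: guard turns it into a no-op

final = row(conn, "o1")
```

The stale event never overwrites the newer state, which is exactly the property the replay depends on.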

The replay

Replay reads from the beginning of the topic and writes only to v2. It uses a separate consumer group so it doesn't poison the live consumer's offsets. Run it on a worker pool. The throughput ceiling is Postgres write bandwidth, not Kafka.

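# replay.py: one-shot job, separate consumer group.
import psycopg
from confluent_kafka import Consumer, TopicPartition

# DSN, decode, all_caught_up and apply_v2 are assumed to be defined
# elsewhere in the project.
cfg = {
    "bootstrap.servers": "kafka:9092",
    "group.id": "orders-read-v2-replay",
    "enable.auto.commit": False,
    "auto.offset.reset": "earliest",
}
c = Consumer(cfg)
# Pin every partition to offset 0 so the replay starts at the
# beginning of the log instead of at a committed offset.
c.assign([TopicPartition("orders", p, 0) for p in range(12)])

try:
    with psycopg.connect(DSN) as conn:
        while True:
            msg = c.poll(1.0)
            if msg is None:
                if all_caught_up(c):
                    break
                continue
            if msg.error():
                # Fail loudly; a skipped message is a gap in v2.
                raise RuntimeError(msg.error())
            event = decode(msg)
            with conn.transaction():
                apply_v2(conn, event, msg.offset())
finally:
    c.close()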
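The loop above calls all_caught_up without showing it. One way to think about that check, sketched here as pure functions so it can be tested without a broker: compare the replay consumer's position on each partition with that partition's high watermark. The function names and the dict-based inputs are assumptions; with confluent_kafka the numbers would likely come from Consumer.position() and Consumer.get_watermark_offsets().

```python
def remaining_lag(positions, high_watermarks):
    """Per-partition count of messages the replay has not read yet.

    positions:        {partition: next offset the replay will read}
    high_watermarks:  {partition: offset one past the last message}
    """
    lag = {}
    for partition, high in high_watermarks.items():
        pos = positions.get(partition, 0)
        if pos < 0:
            # Clients may report a negative sentinel before the first read.
            pos = 0
        lag[partition] = max(high - pos, 0)
    return lag

def caught_up(positions, high_watermarks):
    """True when the replay has reached the head of every partition."""
    return all(v == 0 for v in remaining_lag(positions, high_watermarks).values())

behind = remaining_lag({0: 100, 1: 40}, {0: 100, 1: 50})
done = caught_up({0: 100, 1: 50}, {0: 100, 1: 50})
```

The head keeps moving while live traffic flows, so "caught up" is a moment, not a state. That is fine here: anything published after that moment is already covered by the dual-write.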

Two details that bite teams running this for the first time. First, batch your transactions. Committing every event, as the loop above does for clarity, can cut throughput by 10x or more. Group around 500 events per transaction and the replay finishes in a fraction of the time. Second, monitor the high-water mark of v2's last_event_id against the topic head, partition by partition, because Kafka offsets are only comparable within a partition. When the gap has closed to the live consumer's normal lag and stops shrinking, you're caught up. That's the signal to start shadow-reads, not a fixed timer.
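The batching advice is easy to sketch in isolation. The helper below groups an event stream into fixed-size chunks and runs each chunk inside one transaction. The names chunks, replay_in_batches and run_in_transaction are hypothetical, and the transaction is injected as a callable so the example runs without a database.

```python
from itertools import islice

def chunks(iterable, size):
    """Yield lists of up to `size` items until the iterable is exhausted."""
    it = iter(iterable)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            return
        yield chunk

def replay_in_batches(events, apply_one, run_in_transaction, size=500):
    """Apply events in batches, one transaction per batch.

    Returns the number of transactions that were run.
    """
    committed = 0
    for chunk in chunks(events, size):
        def work(chunk=chunk):
            for event in chunk:
                apply_one(event)
        run_in_transaction(work)
        committed += 1
    return committed

# Demo with stand-ins: "applying" an event appends it to a list, and the
# "transaction" just calls the work function.
applied = []
transactions = replay_in_batches(
    range(1200), applied.append, lambda work: work(), size=500
)
```

With psycopg, run_in_transaction would wrap the call to work in a conn.transaction() block. Because apply_v2 is idempotent, a batch that fails halfway can simply be retried from its first event.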

Shadow-read for parity

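The query layer reads from v1 to serve the response. In the same request it also reads from v2 and compares. The snippet below does this inline for brevity; in production the shadow read belongs off the request path, so v2 latency and v2 failures never reach the user. Mismatches go to a log along with both rows, which carry the event offsets that got us here.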

# query_layer.py: flag-gated, instrumented.
def get_order(order_id):
    v1 = fetch_v1(order_id)
    if FLAG.get("orders_read_v2_shadow"):
        try:
            v2 = fetch_v2(order_id)
            if not equivalent(v1, v2):
                log.warning("parity_mismatch",
                            order_id=order_id,
                            v1=v1, v2=v2)
        except Exception as e:
            log.error("v2_shadow_failed", err=str(e))
    return v1
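If you want the comparison off the request path, one option is to hand it to a small thread pool and return v1 immediately. This is a sketch under assumptions: get_order_shadowed, the injected fetch and report callables, and the pool size of 4 are all illustrative, and a real service would also want a bounded queue and sampling.

```python
from concurrent.futures import ThreadPoolExecutor

# Small dedicated pool so shadow reads cannot starve request threads.
shadow_pool = ThreadPoolExecutor(max_workers=4)

def get_order_shadowed(order_id, fetch_v1, fetch_v2, equivalent, report):
    """Serve from v1; compare against v2 in the background."""
    v1 = fetch_v1(order_id)

    def shadow():
        try:
            v2 = fetch_v2(order_id)
            if not equivalent(v1, v2):
                report("parity_mismatch", order_id, v1, v2)
        except Exception as exc:
            # A broken v2 must never surface to the caller.
            report("v2_shadow_failed", order_id, v1, exc)

    shadow_pool.submit(shadow)
    return v1

# Demo with in-memory stand-ins for the two read models.
reports = []
result = get_order_shadowed(
    "o1",
    fetch_v1=lambda oid: {"status": "placed"},
    fetch_v2=lambda oid: {"status": "shipped"},
    equivalent=lambda a, b: a == b,
    report=lambda *args: reports.append(args),
)
shadow_pool.shutdown(wait=True)   # demo only: wait for the background check
```

The caller gets v1 back before the comparison finishes; the mismatch shows up in reports once the pool drains.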

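The equivalent function is where the work hides. v1's items_json is unordered; v2's order_items_read_v2 is ordered by line_no. Normalize both sides before comparing. Ignore updated_at skew within a few hundred milliseconds. That tolerance only holds if both projections stamp updated_at from the event's own timestamp; a row the replay writes with now() will differ from v1 by however old the event is, so either set updated_at from the event or leave it out of the comparison. Don't compare last_event_id; it will legitimately differ between the two if v2 caught a later event the live consumer hasn't written to v1 yet.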
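Here is one possible shape for equivalent, written against plain dicts. The key names (items_json on the v1 side, items on the v2 side), the 500 ms tolerance, and the assumption that updated_at comes from event time rather than the wall clock are all choices made for this sketch.

```python
from datetime import datetime, timedelta, timezone

SKEW = timedelta(milliseconds=500)   # assumed tolerance for updated_at

def normalize_items(items):
    """Order-insensitive view of line items as sorted (sku, qty) pairs."""
    return sorted((i["sku"], i["qty"]) for i in items)

def equivalent(v1, v2, skew=SKEW):
    """Compare a v1 row and a v2 row, ignoring last_event_id."""
    if v1 is None or v2 is None:
        return v1 is v2   # equal only if the order is missing on both sides
    if (v1["status"], v1["total_cents"]) != (v2["status"], v2["total_cents"]):
        return False
    if abs(v1["updated_at"] - v2["updated_at"]) > skew:
        return False
    return normalize_items(v1["items_json"]) == normalize_items(v2["items"])

t = datetime(2025, 5, 20, 10, 0, tzinfo=timezone.utc)
v1 = {
    "status": "placed", "total_cents": 1500, "updated_at": t,
    "last_event_id": 7,
    "items_json": [{"sku": "B", "qty": 1}, {"sku": "A", "qty": 2}],
}
v2 = {
    "status": "placed", "total_cents": 1500,
    "updated_at": t + timedelta(milliseconds=120),
    "last_event_id": 9,
    "items": [{"sku": "A", "qty": 2}, {"sku": "B", "qty": 1}],
}
same = equivalent(v1, v2)
```

The two rows differ in item order, in updated_at by 120 ms, and in last_event_id, and still count as equal. A changed quantity or a two-second skew would not.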

The parity-check script

Shadow-read covers what users query. It does not cover the long tail of orders nobody opened in the last week. Run a sweep.

# parity_check.py: paginates through orders, flags drift.
import psycopg

BATCH = 5000

def normalize(items):
    # Sort on the full (sku, qty) tuple so duplicate SKUs compare stably.
    return sorted((i["sku"], i["qty"]) for i in items)

def check_batch(conn, after):
    # LEFT JOIN to v2 so an order missing from v2 shows up as drift
    # (NULL v2 columns) instead of silently dropping out of the sweep.
    # The FILTER keeps an order with no items from aggregating to
    # [{"sku": null, "qty": null}].
    rows = conn.execute("""
      SELECT v1.order_id, v1.status, v1.total_cents,
             v1.items_json, v2.status, v2.total_cents,
             COALESCE(json_agg(json_build_object(
               'sku', i.sku, 'qty', i.qty))
               FILTER (WHERE i.order_id IS NOT NULL), '[]'::json)
      FROM orders_read_v1 v1
      LEFT JOIN orders_read_v2 v2 USING (order_id)
      LEFT JOIN order_items_read_v2 i USING (order_id)
      WHERE v1.order_id > %s
      GROUP BY v1.order_id, v2.order_id
      ORDER BY v1.order_id
      LIMIT %s
    """, (after, BATCH)).fetchall()
    return rows
def diff(rows):
    drift = []
    for r in rows:
        oid, s1, t1, items1, s2, t2, items2 = r
        if s1 != s2 or t1 != t2:
            drift.append((oid, "header", s1, s2, t1, t2))
            continue
        if normalize(items1) != normalize(items2):
            drift.append((oid, "items"))
    return drift
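The two functions above handle one page; something still has to walk the whole table. A minimal driver, assuming keyset pagination on the first column of each row, might look like this. The sweep name, the injected fetch_batch and diff callables, and the in-memory demo data are hypothetical.

```python
def sweep(fetch_batch, diff, start):
    """Walk every page in key order and collect drift.

    fetch_batch(after) must return rows ordered by key, with the key
    in column 0, and an empty list once the table is exhausted.
    """
    after = start
    drift = []
    scanned = 0
    while True:
        rows = fetch_batch(after)
        if not rows:
            break
        drift.extend(diff(rows))
        scanned += len(rows)
        after = rows[-1][0]   # resume after the last key on this page
    return scanned, drift

# Demo: 11 fake orders as (order_id, v1_status, v2_status); order 7 drifts.
DATA = [(i, "placed", "placed") for i in range(1, 12)]
DATA[6] = (7, "shipped", "placed")

def fake_fetch(after, batch=5):
    return [r for r in DATA if r[0] > after][:batch]

def fake_diff(rows):
    return [r[0] for r in rows if r[1] != r[2]]

scanned, drift = sweep(fake_fetch, fake_diff, start=0)
```

Wired to the real functions, fetch_batch would be something like lambda after: check_batch(conn, after), and start would be the all-zero UUID, which sorts below any other order_id.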

Run the sweep nightly during the rebuild. The drift count should fall to zero and stay there. If it doesn't, you have a bug in apply_v2 and the cutover waits. The most common cause is an event type the v2 projection forgot to handle: OrderRefunded, OrderItemReplaced, the one that gets emitted twice a month. Your test suite missed it because your test corpus is two weeks of fixtures.
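A cheap way to catch the forgotten event type before the sweep does is to count the types that actually appear in the log and subtract the ones apply_v2 handles. The helper name and the sample events below are assumptions for illustration.

```python
from collections import Counter

def unhandled_event_types(events, handled):
    """Return {event_type: count} for types seen in the log but not handled."""
    seen = Counter(e["type"] for e in events)
    return {t: n for t, n in seen.items() if t not in handled}

# Demo: a tiny log where the rare type slipped past the v2 projection.
log_sample = [
    {"type": "OrderPlaced"},
    {"type": "OrderPlaced"},
    {"type": "OrderShipped"},
    {"type": "OrderRefunded"},
    {"type": "OrderRefunded"},
]
missing = unhandled_event_types(log_sample, handled={"OrderPlaced", "OrderShipped"})
```

Run it over the full topic during the replay rather than over fixtures; the types that matter are the ones that only show up twice a month.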

The cutover

A flag flip, a hold, a drop. In that order.

# At T=0: flip read traffic to v2.
FLAG.set("orders_read_source", "v2")

# At T+24h: stop dual-writing if parity has held.
FLAG.set("orders_read_dual_write", False)

# At T+7d: drop v1.
# DROP TABLE orders_read_v1;

The 24-hour hold is for rollback. If a v2 query returns garbage for a customer that nobody saw during shadow-read (a regional edge case, a legacy aggregate from 2019), you flip the flag back to v1 and v1 still has the latest state because dual-write was on. The seven-day hold before the drop is for the auditor who shows up on day five asking why the report shape changed. Keep v1 readable until that conversation is done.
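The rollback only works if the query layer resolves the flag on every read. A minimal sketch of that routing, with an in-memory Flags class standing in for whatever flag service you use (the class, the v1 default, and fetch_order are assumptions):

```python
class Flags:
    """In-memory stand-in for a feature-flag client."""

    def __init__(self, **values):
        self._values = dict(values)

    def get(self, key, default=None):
        return self._values.get(key, default)

    def set(self, key, value):
        self._values[key] = value

def fetch_order(order_id, flags, fetch_v1, fetch_v2):
    """Route the read by flag; anything other than 'v2' falls back to v1."""
    source = flags.get("orders_read_source", "v1")
    if source == "v2":
        return fetch_v2(order_id)
    return fetch_v1(order_id)

flags = Flags()
read_v1 = lambda oid: ("v1", oid)
read_v2 = lambda oid: ("v2", oid)

before = fetch_order("o1", flags, read_v1, read_v2)
flags.set("orders_read_source", "v2")        # cutover
after = fetch_order("o1", flags, read_v1, read_v2)
flags.set("orders_read_source", "v1")        # rollback
rolled_back = fetch_order("o1", flags, read_v1, read_v2)
```

Defaulting to v1 means a missing or unreadable flag fails toward the model you already trust.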

The pattern compresses well in your head. Two tables, one transaction, one replay, one parity sweep, one flag with three positions. The work that takes a week is rarely the code; it's the schema migration, the equivalence function, and the long tail of event types the v1 projection silently absorbed and the v2 projection has to reproduce. Plan the calendar around that, not the replay job.


If this was useful

Chapter 6 of Event-Driven Architecture Pocket Guide walks the CQRS read-side patterns end to end: projection idempotency, replay tooling, the parity-check checklist this post compresses, and the failure modes that show up around the dual-write window. If you're staring at a read model you can't afford to rebuild in place, it's the chapter to read first.

