I noticed a weird pattern in the reminders pipeline: the same work would go quiet for a while, then show up again later—like a sticky note that fell behind the desk and got kicked back out.
Nothing was actually disappearing. The work was getting claimed and then never unclaimed because a worker died at the wrong moment.
This is Part 6 of my series How to Architect an Enterprise AI System (And Why the Engineer Still Matters). Part 5 (Feature Flags as Guardrails) was about preventing unsafe behavior from ever shipping. This part is the opposite kind of guardrail: the one you build when you want concurrency, retries, and recovery to be boringly deterministic.
## The key insight (and the bug it prevents)
For a large class of workloads, I do not treat a queue as a separate infrastructure primitive. I treat it as a table with state, and I let PostgreSQL enforce the one thing distributed workers are worst at enforcing themselves:
- Only one worker can own a unit of work at a time
That sounds obvious until you run multiple workers and hit the classic failure mode:
- Worker A fetches next job
- Worker B fetches the same next job
- Both do side effects
- You now have duplicate sends, double writes, or a corrupted lifecycle
The concurrency primitive that eliminates that entire class of bug is:
`SELECT ... FOR UPDATE SKIP LOCKED`
This is not a vague notion of locking. It is specific behavior with teeth: if another worker already holds the row lock, Postgres skips the row and lets you claim something else.
So instead of building a coordination system out of timeouts and hope, I let the database do what it already does well: transactional exclusion with first-class observability.
## The worker state machine I actually run
I model work as a state machine stored in SQL.
When people hear that, they often assume I built a big framework. I didn’t. I built a few columns that make ownership, time, and failure explicit.
At a high level, the transitions are:
- `pending → claimed`
- `claimed → success`
- `claimed → pending` (unclaim + `attempts` incremented)
- `claimed → pending` (stale claim recovery after a threshold)
- `attempts >= max_attempts → inactive`
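Those transitions can be sketched as a small pure function. This is illustrative Python, not the production worker; the `Job` fields here mirror the columns described later in the schema:

```python
from dataclasses import dataclass


@dataclass
class Job:
    # Illustrative mirror of the queue row's lifecycle fields.
    claimed: bool = False
    inactive: bool = False
    attempts: int = 0
    max_attempts: int = 5


def on_claim(job: Job) -> None:
    # pending -> claimed
    job.claimed = True


def on_success(job: Job) -> None:
    # claimed -> success: finalize and retire the row from the queue
    job.claimed = False
    job.inactive = True


def on_failure(job: Job) -> None:
    # claimed -> pending with attempts incremented; once
    # attempts >= max_attempts the job is retired instead.
    job.claimed = False
    job.attempts += 1
    if job.attempts >= job.max_attempts:
        job.inactive = True
```

Running `on_failure` five times on a fresh job retires it, which is exactly the `attempts >= max_attempts → inactive` edge.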
Exactly one diagram, because this is the whole story:
```mermaid
flowchart TD
    pending[pending] --> claimed[claimed]
    claimed -->|success| success[success]
    claimed -->|failure| unclaimRetry[unclaim and increment attempts]
    unclaimRetry --> pending
    claimed -->|claimed too long| staleRecovery[stale claim recovery]
    staleRecovery --> pending
    unclaimRetry -->|attempts >= max_attempts| inactive[inactive]
```
I like this representation because it is a queue you can query like a ledger. You can answer, with SQL:
- What’s stuck?
- Who claimed it?
- How long has it been claimed?
- How many times did it fail?
- What was the last failure reason?
- When is it eligible to run again?
And you can answer those questions without reconstructing message histories or spelunking logs.
## What I store in the table (the columns that make it survivable)
If you want a table-backed queue to behave like a real queue under failure, you need to write down the invariants.
Here’s the minimal schema shape I’ve found reliable across reminder scheduling, enrichment jobs, and sync workers:
- A claim timestamp (`claimed_at`) and identity (`claimed_by`)
- A **lease token** (`claim_token`) so a stale worker cannot finalize a job it no longer owns
- Attempt tracking (`attempts`, `max_attempts`)
- Eligibility (`run_at` or `remind_at`) so you can schedule into the future
- A place to put the last failure (`last_error`, `last_error_at`)
- A retirement switch (`inactive`) so poison pills stop consuming worker time
A concrete example DDL looks like this:
```sql
CREATE TABLE email_snoozes (
    id BIGSERIAL PRIMARY KEY,
    user_email TEXT NOT NULL,
    message_id TEXT NOT NULL,
    subject TEXT,
    sender_email TEXT,
    remind_at TIMESTAMPTZ NOT NULL,
    snooze_duration_hours INT,
    claimed_at TIMESTAMPTZ,
    claimed_by TEXT,
    claim_token UUID,
    attempts INT NOT NULL DEFAULT 0,
    max_attempts INT NOT NULL DEFAULT 5,
    inactive BOOLEAN NOT NULL DEFAULT FALSE,
    last_error TEXT,
    last_error_at TIMESTAMPTZ,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX idx_email_snoozes_due
    ON email_snoozes (inactive, claimed_at, remind_at);

CREATE INDEX idx_email_snoozes_claimed
    ON email_snoozes (claimed_at)
    WHERE claimed_at IS NOT NULL;
```

Two notes that matter:
- I index for the claim query I actually run: due rows, not claimed, not inactive.
- `claim_token` is not decorative. It is what makes stale recovery safe.

Without a token, you can do stale recovery and still get the nastiest bug in this whole family: the original worker wakes up late and marks success, overwriting the actual owner’s work. The token turns finalization into a compare-and-swap.
## Layer 1: Atomic claim with `FOR UPDATE SKIP LOCKED` (copy-pastable and correct)
The biggest trap with database queues is teaching people a pattern they accidentally split across transactions.
If you do:
```sql
SELECT ... FOR UPDATE SKIP LOCKED LIMIT 1;
```

and then, later, outside the same transaction:

```sql
UPDATE ... SET claimed_at = NOW() ...;
```

…you reintroduced a race.
So the claim pattern I ship is a single statement that both selects and marks rows as claimed, and returns the claimed rows to the worker.
This is the same shape I use in the reminder scheduler: a CTE that selects due work with `FOR UPDATE SKIP LOCKED`, then an `UPDATE ... FROM` that stamps the claim.
```sql
WITH due_reminders AS (
    SELECT id
    FROM email_snoozes
    WHERE inactive = FALSE
      AND claimed_at IS NULL
      AND remind_at <= CURRENT_TIMESTAMP
      AND attempts < max_attempts
    ORDER BY remind_at ASC
    LIMIT $1
    FOR UPDATE SKIP LOCKED
)
UPDATE email_snoozes
SET claimed_at = CURRENT_TIMESTAMP,
    claimed_by = $2,
    claim_token = gen_random_uuid(),
    updated_at = CURRENT_TIMESTAMP
FROM due_reminders
WHERE email_snoozes.id = due_reminders.id
RETURNING
    email_snoozes.id,
    email_snoozes.user_email,
    email_snoozes.message_id,
    email_snoozes.subject,
    email_snoozes.sender_email,
    email_snoozes.remind_at,
    email_snoozes.claimed_at,
    email_snoozes.claimed_by,
    email_snoozes.claim_token,
    email_snoozes.attempts,
    email_snoozes.max_attempts;
```
Why this calms the whole system down:
- Multiple workers can run the same claim query concurrently.
- Each worker gets a disjoint set of rows.
- No worker blocks on locks for long; it just skips locked rows.
- The claim boundary is one atomic decision point.
This is the moment where contention stops producing weird behavior and starts producing predictable throughput.
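The disjointness property can be seen in a toy in-memory simulation. This is only an analogy (Python `threading`, with a single lock standing in for the row locks Postgres takes), not how Postgres works internally, but it shows what the atomic claim buys you: concurrent claimers end up with disjoint batches.

```python
import threading

rows = list(range(100))        # pending row ids
claims = {}                    # worker -> claimed ids
claim_lock = threading.Lock()  # stands in for Postgres row locks


def claim_batch(worker: str, limit: int) -> list:
    # One atomic decision point: select and mark-claimed together,
    # mirroring the CTE + UPDATE ... FROM pattern in the SQL above.
    with claim_lock:
        batch = rows[:limit]
        del rows[:limit]
    claims[worker] = batch
    return batch


threads = [
    threading.Thread(target=claim_batch, args=(f"w{i}", 10)) for i in range(5)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

After the run, the five workers hold fifty distinct ids between them and fifty rows remain pending; no id appears in two batches.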
## Choosing ordering (don’t pretend FIFO if you don’t mean FIFO)
Notice I order by `remind_at`, not `id`.
If you always `ORDER BY id`, you get fairness by insertion order, but that is not the same as priority. In schedulers, priority is usually time.
I treat ordering as part of the contract:
- If the job is scheduled work, order by the schedule time.
- If the job is backfill, order by the oldest unprocessed range.
- If the job is user-visible, order by the user-impacting field.
Write down what next means, then encode it.
## Layer 2: Eligibility is a predicate, not a separate system
A lot of queue systems drift into hidden complexity because they create a separate status enum, then bolt on timestamps, then bolt on retry metadata.
I flip it around. Pending means:
- not inactive
- not claimed
- due
- under retry limit
That is one SQL predicate, and it makes the queue legible.
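The same predicate, written as worker-side Python for illustration (the field names mirror the schema above; this is a sketch, not the production check):

```python
from datetime import datetime, timezone


def is_pending(row: dict, now: datetime) -> bool:
    # The entire definition of "pending", as one predicate over the row:
    # not inactive, not claimed, due, and under the retry limit.
    return (
        not row["inactive"]
        and row["claimed_at"] is None
        and row["remind_at"] <= now
        and row["attempts"] < row["max_attempts"]
    )
```

Setting `claimed_at`, flipping `inactive`, or exhausting `attempts` each flips the predicate off, which is why the ad-hoc operations below are safe.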
In practice, this also makes ad-hoc operations safe:
- Want to pause a class of jobs? Set `inactive = TRUE` for a filter.
- Want to re-run a subset? Set `attempts = 0` and clear the error fields.
- Want to delay? Push `remind_at` forward.
No separate queue service admin UI required.
## Layer 3: Unclaim-on-failure with attempts and backoff
Naive retry logic is:
- catch exception
- sleep
- try again
That is how you get infinite loops, noisy dependencies, and no paper trail.
My failure path is explicit:
- record the failure reason
- increment attempts
- unclaim
- optionally push the schedule forward (backoff)
A clean unclaim looks like this:
```sql
UPDATE email_snoozes
SET claimed_at = NULL,
    claimed_by = NULL,
    claim_token = NULL,
    attempts = attempts + 1,
    last_error = $2,
    last_error_at = CURRENT_TIMESTAMP,
    remind_at = GREATEST(remind_at, CURRENT_TIMESTAMP + ($3::int || ' seconds')::interval),
    updated_at = CURRENT_TIMESTAMP
WHERE id = $1;
```
A few details are deliberate:
- I clear the token on unclaim so a stale worker can’t finalize.
- I store a human-readable error so the queue becomes the first debugging surface.
- I do backoff by moving `remind_at` forward, which preserves the same claim predicate.
Backoff can be linear, exponential, or dependency-specific. The design point is that it is a field in the row, not a hidden sleep inside a worker.
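Because backoff is a field, the worker only has to compute a number of seconds and pass it as the interval parameter (`$3`) of the unclaim statement. A hedged sketch of the exponential flavor, with made-up base and cap values:

```python
def backoff_seconds(attempts: int, base: int = 30, cap: int = 3600) -> int:
    # Exponential backoff with a ceiling: 30s, 60s, 120s, ... capped at 1h.
    # The base and cap here are illustrative; tune them per dependency.
    return min(cap, base * (2 ** attempts))
```

Linear or dependency-specific variants are just different bodies for the same one-line contract.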
## Finalization is token-checked (the stale-worker antidote)
When processing succeeds, I finalize only if I still own the lease:
```sql
UPDATE email_snoozes
SET inactive = TRUE,
    claimed_at = NULL,
    claimed_by = NULL,
    claim_token = NULL,
    updated_at = CURRENT_TIMESTAMP
WHERE id = $1
  AND claim_token = $2;
```
That `AND claim_token = $2` is the difference between:
- correct behavior under stale recovery
- rare, impossible-to-reproduce corruption when a worker resumes after a pause
If the update affects zero rows, I treat it as informational: I no longer own the job.
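Here is the compare-and-swap idea in miniature. This is an in-memory analogy, not the SQL path: a stale worker's token no longer matches the row, so its late finalize is a no-op.

```python
import uuid

row = {"id": 1, "claim_token": None, "inactive": False}


def claim(row: dict) -> str:
    # Stamping a fresh lease token on claim (gen_random_uuid() in the SQL).
    token = str(uuid.uuid4())
    row["claim_token"] = token
    return token


def finalize(row: dict, token: str) -> bool:
    # Mirrors UPDATE ... WHERE id = $1 AND claim_token = $2:
    # only the current lease holder may finalize.
    if row["claim_token"] != token:
        return False  # zero rows affected: we no longer own the job
    row["inactive"] = True
    row["claim_token"] = None
    return True


stale = claim(row)   # worker A claims, then stalls
fresh = claim(row)   # stale recovery + re-claim hands the lease to worker B
assert not finalize(row, stale)   # A's late finalize is rejected
assert finalize(row, fresh)       # B's finalize succeeds
```

The rejected finalize is exactly the zero-rows-affected case treated as informational.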
## Layer 4: Stale claim recovery (leases, not wishes)
The moment that made this whole design click was realizing that claiming is not completing.
A worker can die mid-flight. A container can get evicted. A network partition can freeze progress. In all those cases, `claimed_at` will stay non-null forever unless you explicitly recover it.
The stale recovery job is simple: find claims older than a threshold and reset them.
```sql
UPDATE email_snoozes
SET claimed_at = NULL,
    claimed_by = NULL,
    claim_token = NULL,
    last_error = COALESCE(last_error, 'stale claim recovered'),
    last_error_at = CURRENT_TIMESTAMP,
    updated_at = CURRENT_TIMESTAMP
WHERE inactive = FALSE
  AND claimed_at IS NOT NULL
  AND claimed_at < CURRENT_TIMESTAMP - INTERVAL '5 minutes';
```
The exact threshold is workload-dependent. The important part is conceptual: claiming is a lease.
## Stale recovery is a contract, not a cleanup script
Once you introduce stale recovery, you are making a promise:
- A job should not legitimately be claimed longer than the threshold without reporting progress.
If you violate the promise, you will get duplicate processing.
There are three ways I handle long-running jobs:

1. Increase the lease threshold for that queue.
2. Heartbeat: update `claimed_at` periodically while processing.
3. Split the work: make each row represent a smaller unit.

I prefer (3). Heartbeats are workable, but they are also another failure mode if you do not make the heartbeat itself robust.
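For completeness, option (2) can be sketched as a background timer that refreshes the lease while the real work runs. This is illustrative Python; in production the refresh would be an `UPDATE` of `claimed_at`, and the interval must be well inside the stale threshold:

```python
import threading
import time


def with_heartbeat(row: dict, work, interval: float = 0.05):
    # Refresh the lease while `work` runs so a long job never looks stale.
    # If this thread dies with the worker, the lease expires as intended.
    stop = threading.Event()

    def beat():
        while not stop.wait(interval):
            row["claimed_at"] = time.time()  # stands in for the UPDATE

    t = threading.Thread(target=beat, daemon=True)
    t.start()
    try:
        return work()
    finally:
        stop.set()
        t.join()
```

Note the failure mode baked into this sketch: if the heartbeat thread itself wedges, the job gets recovered and reprocessed, which is why idempotent side effects (discussed below) stay mandatory.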
## Layer 5: Infinite requeue prevention (retire poison pills on purpose)
If you build unclaim-and-retry but never cap it, you built a perpetual motion machine powered by your on-call time.
In my tables I carry a `max_attempts` column (and I allow it to be set per row). That lets me do two things:
- Cap retries without hardcoding a magic number in worker code
- Treat certain classes of jobs as higher tolerance without splitting tables
Retiring jobs is then a simple predicate:
```sql
UPDATE email_snoozes
SET inactive = TRUE,
    updated_at = CURRENT_TIMESTAMP
WHERE inactive = FALSE
  AND attempts >= max_attempts;
```
I’m opinionated about what this means operationally: once a job is retired, it is no longer work—it is a case file.
That changes team behavior in a good way. Instead of burning cycles retrying the same failure forever, you look at the error, fix the upstream issue, and then selectively reactivate.
## The instrumentation I rely on (real OpenTelemetry code, not vibes)
A queue you cannot measure is a queue you cannot trust.
I instrument at the claim boundary and at the outcome boundary. That gives me a live view of:
- how much work is being claimed
- how fast it is being processed
- how many failures are happening
- how often stale recovery is firing
- how many items are being retired
Below is a runnable, minimal version of the metric and trace wiring I use in the worker loop. It uses a console exporter so you can run it locally, but the API calls are the same ones I use when exporting to my real telemetry backend.
```python
import time
import random
from dataclasses import dataclass

from opentelemetry import metrics, trace
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import ConsoleMetricExporter, PeriodicExportingMetricReader
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter


@dataclass
class Job:
    job_id: int
    claim_token: str


def setup_otel(service_name: str = "email-reminder-worker"):
    resource = Resource.create({"service.name": service_name})

    metric_reader = PeriodicExportingMetricReader(
        ConsoleMetricExporter(), export_interval_millis=2000
    )
    meter_provider = MeterProvider(resource=resource, metric_readers=[metric_reader])
    metrics.set_meter_provider(meter_provider)

    tracer_provider = TracerProvider(resource=resource)
    tracer_provider.add_span_processor(BatchSpanProcessor(ConsoleSpanExporter()))
    trace.set_tracer_provider(tracer_provider)

    meter = metrics.get_meter(service_name)
    tracer = trace.get_tracer(service_name)

    m_claimed = meter.create_counter(
        name="queue.claimed",
        description="Rows claimed from the queue",
        unit="1",
    )
    m_processed = meter.create_counter(
        name="queue.processed",
        description="Rows processed successfully",
        unit="1",
    )
    m_failed = meter.create_counter(
        name="queue.failed",
        description="Rows that failed processing",
        unit="1",
    )
    m_stale_recovered = meter.create_counter(
        name="queue.stale_recovered",
        description="Rows recovered from stale claims",
        unit="1",
    )
    m_retired = meter.create_counter(
        name="queue.retired",
        description="Rows retired after exceeding max attempts",
        unit="1",
    )
    h_duration_ms = meter.create_histogram(
        name="queue.job_duration_ms",
        description="Time spent processing a job",
        unit="ms",
    )

    return tracer, {
        "claimed": m_claimed,
        "processed": m_processed,
        "failed": m_failed,
        "stale_recovered": m_stale_recovered,
        "retired": m_retired,
        "duration_ms": h_duration_ms,
    }


def process_job(job: Job) -> None:
    # Simulate real work.
    time.sleep(random.uniform(0.02, 0.15))
    if random.random() < 0.25:
        raise RuntimeError("simulated downstream failure")


def main():
    tracer, m = setup_otel()
    worker_id = "worker-1"

    # Simulated claim batch
    claimed = [Job(job_id=i, claim_token=f"token-{i}") for i in range(1, 9)]
    m["claimed"].add(len(claimed), {"queue": "email_snoozes", "worker": worker_id})

    for job in claimed:
        start = time.perf_counter()
        with tracer.start_as_current_span("queue.process", attributes={
            "queue": "email_snoozes",
            "worker": worker_id,
            "job.id": job.job_id,
            "job.claim_token": job.claim_token,
        }):
            try:
                process_job(job)
                m["processed"].add(1, {"queue": "email_snoozes"})
            except Exception as e:
                m["failed"].add(1, {"queue": "email_snoozes", "error": type(e).__name__})
            finally:
                dur_ms = (time.perf_counter() - start) * 1000.0
                m["duration_ms"].record(dur_ms, {"queue": "email_snoozes"})

    # Keep the process alive long enough for the periodic metric reader to export.
    time.sleep(3)


if __name__ == "__main__":
    main()
```
The main design choice is where the counters sit:
- I count claims when I claim.
- I count outcomes when I finalize.
That makes failure modes visible early. If claims are happening but `processed` is flat, something is stuck inside the worker. If stale recovery spikes, workers are dying or hanging. If `retired` climbs, you have a poison-pill class that needs attention.
## Why table-backed queues instead of a message bus (most of the time)
I do use a message bus in this system in one place: for enrichment work that is naturally message-shaped and benefits from decoupling.
For example, I have an enrichment worker that reads from a Service Bus queue (named `vault-candidate-enrichment`) and pulls additional data from an external system of record before upserting into Postgres.
The message format is intentionally small and stable:
```json
{
  "crm_id": "6221978000106823002",
  "twav": "TWAV118559",
  "full_name": "Example Person",
  "enqueued_at": "2026-03-16T12:00:00Z"
}
```
That is a good fit for a message bus because:
- the payload is a pointer, not a giant document
- the consumer can be deployed independently
- the work is naturally buffered and bursty
But for most worker workloads inside the platform, I chose Postgres.
The reason was friction, not ideology. In one place, I needed:
- conflict detection via modified-time comparisons
- stale claim recovery as a first-class mechanism
- attempt tracking with counts
- human-readable failure reasons stored next to the work
- monitoring queries that the team can run directly
Those are all native to SQL.
Message queues are excellent at delivery. They are awkward at introspection. You can bolt on dead-letter queues and dashboards, but you still end up with two sources of truth:
- the database where the entity lives
- the queue where the work is in flight
With table-backed queues, there is one source of truth: the row.
## The extra safety layer: a circuit breaker FSM
Retries and stale recovery solve the local failure modes. They do not solve the systemic one: a dependency that is down (or half-down) causing your entire worker fleet to spin.
So I add a circuit breaker above the queue processor.
The rule is simple:
- if failures are consecutive beyond a threshold, open the circuit
- while open, do not claim new work for a cooldown period
- after cooldown, allow a limited number of test executions
This prevents the two worst outcomes:
- retry storms that amplify an outage
- noisy failure that drowns the signal you need to debug
In practice, I store the breaker state in the database as well (so it survives restarts and is visible), but the important part is behavioral: when the breaker is open, workers stop claiming.
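A minimal sketch of that breaker FSM, with illustrative thresholds (as noted above, the real state lives in the database; timestamps here are plain floats to keep the logic visible):

```python
class CircuitBreaker:
    # Three behavioral states: closed (claim normally), open (stop
    # claiming during cooldown), half-open (cooldown elapsed; probe).
    def __init__(self, threshold: int = 5, cooldown: float = 30.0):
        self.threshold = threshold
        self.cooldown = cooldown
        self.consecutive_failures = 0
        self.opened_at = None  # None means the circuit is closed

    def allow_claim(self, now: float) -> bool:
        if self.opened_at is None:
            return True                          # closed: claim normally
        if now - self.opened_at >= self.cooldown:
            return True                          # half-open: allow a probe
        return False                             # open: do not claim

    def record_success(self) -> None:
        self.consecutive_failures = 0
        self.opened_at = None                    # close the circuit

    def record_failure(self, now: float) -> None:
        self.consecutive_failures += 1
        if self.consecutive_failures >= self.threshold:
            self.opened_at = now                 # open (or re-open) the circuit
```

The worker loop calls `allow_claim` before running the claim query, so an open breaker stops new work at the source instead of amplifying the outage.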
## Nuances that matter in practice
### 1) `SKIP LOCKED` does not fix bad idempotency
`SKIP LOCKED` ensures one worker owns one row at a time. It does not guarantee your side effects are safe if the row is reprocessed later (because it will be, under stale recovery or retry).
That means side effects must be idempotent.
For reminders, that typically means one of:

- a unique constraint on a send record keyed by `(user, message_id, remind_at)`
- a status row that moves forward monotonically and refuses to regress
- an outbox table where you write the intent once and dispatch separately
The database queue gives you ownership. You still need idempotency discipline at the edges.
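As a toy illustration of the unique-constraint approach, with a set standing in for the constraint (in SQL this would be an `INSERT ... ON CONFLICT DO NOTHING` whose affected-row count tells you whether to actually send):

```python
# Stands in for a send-record table with a unique constraint on
# (user, message_id, remind_at).
sent = set()


def send_reminder(user: str, message_id: str, remind_at: str) -> bool:
    # Returns True only on the first delivery for this key. A retried or
    # re-claimed job hits the existing key and becomes a no-op.
    key = (user, message_id, remind_at)
    if key in sent:
        return False
    sent.add(key)
    return True
```

Reprocessing the same row twice then sends exactly once, which is the discipline the queue itself cannot give you.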
### 2) Batch claiming changes your failure shape
I often claim in batches (note the `$1` limit in the claim query). That reduces transaction overhead and improves throughput.
But it changes the failure profile:
- if the worker dies after claiming 50 rows, you now have 50 stale leases
This is why stale recovery is not optional. It is part of the batch claim contract.
### 3) Stale recovery without a token causes rare corruption
If you take only one thing from this post beyond SKIP LOCKED, it is this:
- stale recovery requires a lease token
Without a token, a worker can complete after the row was recovered and re-claimed, and you will silently accept the late finalize. You will only notice it when your counts are wrong or a user gets duplicate behavior.
### 4) Retry caps create a real workflow: triage
The moment you retire jobs, you have created a new category of work: investigation.
That is good. It forces reality into the system.
A retired row carries:
- attempts
- last error
- last error time
- payload pointer
That is everything you need to decide whether to:
- fix data
- fix code
- increase max attempts for a subset
- or mark the job as permanently invalid
## What went wrong (and why this design exists)
The failure that forced this architecture was not a theoretical race condition. It was the boring kind: a worker process dying mid-task.
- Without stale claim recovery, the job was gone until someone noticed.
- Without attempt tracking, the failures were invisible.
- Without retry caps, the system would have tried forever.
- Without `FOR UPDATE SKIP LOCKED`, scaling workers would have meant gambling with duplicates.
This design looks like extra machinery until you have lived through the alternative.
## Closing
I don’t use table-backed queues because they’re trendy. I use them because they let me treat work like a first-class record—with ownership, timestamps, attempts, and a paper trail—so when something breaks at the boundary between AI decisions and enterprise reality, I can debug it with a SQL query instead of guesswork.
Part 7 is where this gets even less forgiving: syncing against an external sales-system API with undocumented behaviors, where the only thing worse than a failed write is a write that half-worked and left the record in a state your models will confidently misinterpret.
🎧 Listen to the Enterprise AI Architecture audiobook
📖 Read the full 13-part series with an AI assistant