We had a feature in production where a single user request could run for five-plus minutes — fetch documents, chunk them, hit an LLM per chunk, synthesize a final answer. We did the obvious thing first: a FastAPI handler that ran the pipeline and streamed progress back to the browser over Server-Sent Events.
It looked like this:
# Naive in-handler version — what we wrote first, before learning the
# work inside an SSE generator dies with the connection. The helpers
# (fetch_text, chunk_text, summarize_chunk) are real — they live in
# app/pipeline.py and survived the refactor. What got replaced is the
# in-handler shape of the orchestrator below.
@app.get("/jobs/stream")
async def stream(url: str):
async def gen():
yield {"event": "started", "data": url}
text = await fetch_text(url)
yield {"event": "fetched", "data": f"{len(text)} chars"}
chunks = chunk_text(text, size=1500, max_chunks=3)
summaries: list[str] = []
for i, chunk in enumerate(chunks):
summary = await summarize_chunk(chunk, index=i) # ← ~30s each
summaries.append(summary)
yield {"event": "chunk", "data": f"{i + 1}/{len(chunks)}"}
# Naive synthesis: one more pass through the same helper.
final = await summarize_chunk("\n".join(summaries), index=-1)
yield {"event": "done", "data": final}
return EventSourceResponse(gen())
It worked beautifully — until users started doing user things. Closing the tab. Refreshing the page. Switching networks on the train. Every disconnect cost us:
- The whole 5-minute pipeline got cancelled — work the LLM had already done was thrown away.
- Any progress events not yet flushed to the browser vanished.
- When the user came back two minutes later, there was nothing to come back to. They just saw "Loading…" forever.
This post is about the architecture we ended up with to fix that, and a couple of subtle bugs we hit along the way. The whole thing is stack-agnostic — I'll show one Python implementation, but the same shape works anywhere you have background workers and concurrent event producers (Go, Node, Rust, JVM, .NET — same pattern). Working repo: github.com/AkshatSoni26/longshot.
What — what we wanted
A pipeline that satisfies four properties:
- The work outlives the connection. Closing the tab must not cancel a 5-minute LLM job.
- Progress survives disconnects. A client that comes back ten seconds — or ten minutes — later sees everything that happened while they were away.
- Duplicate-delivery suppression. If the same task is delivered twice (it will be — more on that later), the second delivery becomes a silent no-op. True exactly-once over partial failures needs checkpointing, which we'll be honest about at the end.
- Live updates while the client is connected. Polling `/status` every two seconds is a non-answer.
The trick is the separation: the work flows through the queue and the worker, the client connects to the stream of events the worker produces. Client and worker never share a request lifecycle.
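To make that separation concrete, here is roughly what the submission side looks like. This sketch is not lifted from the repo: the request model, the uuid choice, and the hard-coded chunk settings are illustrative, but `run_job` is the TaskIQ task shown later in the post and `.kiq()` is TaskIQ's enqueue call.

```python
# Sketch of the submit endpoint: generate a session id, enqueue the work,
# return immediately. The handler never runs the pipeline itself.
import uuid
from pydantic import BaseModel

class JobRequest(BaseModel):
    input: str          # the payload field the task expects
    mode: str = "url"   # validated into a JobMode enum by the real task

@app.post("/jobs", status_code=202)
async def submit_job(body: JobRequest):
    session_id = uuid.uuid4().hex
    # .kiq() pushes onto the broker (Redis Streams here); a worker process
    # picks it up out-of-band, so this request returns in milliseconds.
    await run_job.kiq(
        session_id=session_id,
        input=body.input,
        mode=body.mode,
        chunk_size_chars=1500,
        max_chunks=8,
    )
    return {"session_id": session_id, "stream": f"/jobs/{session_id}/stream"}
```

The client keeps only the `session_id`; everything else about the job lives on the server side of that boundary.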
Why — why naive SSE can't fix this
Before we dig into the architecture, it's worth being precise about why the naive handler above doesn't survive disconnects.
EventSourceResponse (and the equivalent in any other framework) is an async generator running inside the request's task. When the underlying TCP connection closes — tab close, refresh, network blip — the framework stops driving the generator. The next await inside it raises (the exact exception is CancelledError, anyio.ClosedResourceError, or ClientDisconnect depending on framework version) and the generator unwinds.
If your business logic lives inside that generator (like the snippet above), it dies with it. The LLM call mid-flight is cancelled. The state in the function's locals is gone. The next reconnect creates a fresh request. (The HTML5 EventSource spec lets the browser send a Last-Event-ID header on auto-reconnect — but it's a hint the server has to implement against, not built-in state continuity. We're going to build that server-side mechanism.) Each request is otherwise independent.
This is correct HTTP behavior. You can't change it, and you shouldn't want to. The fix is to make sure the work isn't inside the request task in the first place. That's the wedge the rest of the architecture drives in.
A less naive variant gets you closer but still loses progress: move the work to a background task (Celery, BullMQ, TaskIQ, Sidekiq, whatever), publish progress to a Redis Pub/Sub channel, have the SSE endpoint subscribe to that channel. Now the worker survives the disconnect. But Pub/Sub is at-most-once and has no replay. If a client is offline when an event fires, that event is gone forever. The user comes back, the SSE endpoint subscribes, and the next thing they see is whatever the worker emits next — with no idea what they missed.
That trichotomy of needs — durable history, latest-state snapshot, live tail — is what drives the design.
How — three storage shapes for one event
For every progress event the worker emits, we write it to three Redis structures in one pipelined round-trip:
| Shape | Key | Why |
|---|---|---|
| Replay list | `RPUSH events:{sid}` | A reconnecting client reads the whole list to catch up. (How "durable" this is depends on your Redis persistence config — AOF/RDB. The events list is a streaming buffer, not a journal of record; for that, write to your application database.) |
| Snapshot | `SET snapshot:{sid} EX 3600` | A late joiner can fetch current state in one call without scanning the list |
| Live channel | `PUBLISH channel:{sid}` | Already-connected clients get the new event instantly |
Plus an INCR seq:{sid} for a monotonic per-session sequence number — the client uses this to detect gaps and dedupe between the replay and the live tail.
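Client-side, that seq bookkeeping is only a few lines. A transport-agnostic sketch (the `SeqTracker` name and the gap-handling policy are mine, not the repo's):

```python
# Hypothetical client-side dedupe + gap detection keyed on the seq field.
import json

class SeqTracker:
    def __init__(self) -> None:
        self.seen: set[int] = set()
        self.highest = 0

    def accept(self, raw: str) -> dict | None:
        event = json.loads(raw)
        seq = event["seq"]
        if seq in self.seen:
            return None                       # duplicate delivery: drop it
        if seq > self.highest + 1:
            # Missed events between highest and seq: simplest recovery is
            # to reconnect and let the replay phase fill the hole.
            raise RuntimeError(f"gap before seq={seq}")
        self.seen.add(seq)
        self.highest = max(self.highest, seq)
        return event                          # new event: render it
```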
A note on the snapshot key. The demo writes the snapshot for completeness — it's the third leg of the trichotomy and a pattern worth showing. But the SSE endpoint we'll build below replays the list (the full timeline), not the snapshot. The snapshot is more useful when you have a cheap status endpoint that returns "what's happening right now" without streaming, e.g. a polling-only dashboard or a job-overview screen. Drop it if your client-side never calls anything but the streaming endpoint.
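If you do keep the snapshot, the cheap status endpoint it enables is tiny. A hypothetical sketch (this route is not part of the demo's API; the key name matches the `emit` sketch below):

```python
# One GET, one Redis read, no streaming: "what's happening right now".
import json

@app.get("/jobs/{session_id}/status")
async def job_status(session_id: str):
    redis = await get_redis()
    raw = await redis.get(f"snapshot:{session_id}")
    if raw is None:
        return {"status": "unknown_or_expired"}
    return json.loads(raw)
```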
None of this is Redis-specific. The same trichotomy is "Kafka log + materialized view + websocket fanout", "Postgres LISTEN/NOTIFY + a state row + a streaming view", or "NATS JetStream + KV bucket + subject". I'll use Redis throughout for concreteness; substitute as needed.
The producer side is conceptually:
async def emit(redis, session_id, event_type, data):
seq = await redis.incr(f"seq:{session_id}")
payload = json.dumps({"seq": seq, "type": event_type, **data})
await redis.rpush(f"events:{session_id}", payload)
await redis.set(f"snapshot:{session_id}", payload, ex=3600)
await redis.publish(f"channel:{session_id}", payload)
(There's a subtle bug in there. We'll come back to it.)
How — the replay-then-tail SSE endpoint
This is the half that solves the disconnect problem. The SSE handler does two phases, but the order matters in a way that's easy to get wrong:
# from app/api.py — verbatim. keys.X helpers live in app/keys.py;
# settings.X comes from a Pydantic BaseSettings in app/settings.py.
@app.get("/jobs/{session_id}/stream")
async def stream_job(session_id: str, request: Request):
redis = await get_redis()
settings = get_settings()
async def event_source():
# Order matters: SUBSCRIBE first, LRANGE second.
#
# If we LRANGE first then SUBSCRIBE, events published in the gap
# between the two operations are *lost* — Redis Pub/Sub has no replay,
# and the events are gone from the channel before we ever attached.
# By subscribing first, redis-py starts buffering messages on the
# connection. We then read the durable history with LRANGE; any event
# that appears in BOTH the list and the buffered channel feed is
# caught by the dedupe set below. Result: nothing is lost, nothing is
# duplicated, regardless of how the producer's RPUSH/PUBLISH calls
# interleave with our subscribe and read.
pubsub = redis.pubsub()
await pubsub.subscribe(keys.channel(session_id))
loop = asyncio.get_event_loop()
heartbeat_at = loop.time() + settings.sse_heartbeat_seconds
try:
# Phase 1: REPLAY everything that's already happened. The list is
# capped by TTL but otherwise complete — clients reconnecting
# mid-task see the full history before the live tail starts.
replayed_seqs: set[int] = set()
backlog_raw: list[bytes | str] = await redis.lrange(
keys.events_list(session_id), 0, -1
)
for raw in backlog_raw:
event = EVENT_ADAPTER.validate_json(raw)
replayed_seqs.add(event.seq)
yield {"event": event.type.value, "data": event.model_dump_json()}
if event.type in TERMINAL_TYPES:
return
# Phase 2: TAIL — drain the buffered messages + live ones. Dedupe
# by seq for events that landed in both the list (from LRANGE)
# and the channel buffer (between SUBSCRIBE and LRANGE).
#
# Design choice: we deliberately do NOT cancel the worker when the
# SSE client disconnects. The whole point of replay-then-tail is
# that the work outlives the connection — closing a tab during a
# 5-minute job must not kill the job. The user can reconnect to
# the same session_id and pick up exactly where they left off.
# Explicit cancellation is via DELETE /jobs/{session_id} only.
while True:
if await request.is_disconnected():
# Just exit the SSE generator. Do NOT set the cancel flag —
# the worker keeps running, and the events list keeps
# accumulating for the next reconnect.
return
timeout = max(0.1, heartbeat_at - loop.time())
msg = await pubsub.get_message(ignore_subscribe_messages=True, timeout=timeout)
if msg is None:
yield {"event": "keepalive", "data": "{}"}
heartbeat_at = loop.time() + settings.sse_heartbeat_seconds
continue
event = EVENT_ADAPTER.validate_json(msg["data"])
if event.seq in replayed_seqs:
continue
yield {"event": event.type.value, "data": event.model_dump_json()}
if event.type in TERMINAL_TYPES:
return
finally:
await pubsub.unsubscribe(keys.channel(session_id))
await pubsub.aclose()
return EventSourceResponse(event_source())
Three production details that earn their lines:
- Keepalives. SSE over a load balancer or reverse proxy will get killed silently by an idle-connection timeout if nothing flows for 30–60 seconds. Emitting a `keepalive` event when the channel is quiet (the `msg is None` branch) keeps the socket warm.
- Disconnect detection without cancel. When the client goes away we exit the SSE generator immediately, but we do not set a cancel flag for the worker. The worker keeps running; the events list keeps accumulating; the user comes back, hits the same `/jobs/{id}/stream`, and gets the full replay. Tab-close isn't a kill signal — that's the whole thesis of the post.
- `finally` cleanup. The pubsub object holds a Redis connection; without explicit unsubscribe + close we'd leak it on every disconnect.
Why subscribe-first matters. If you LRANGE first then SUBSCRIBE — which is how almost everyone writes this on the first try, including me — there's a race window between the two operations. Any event published in that window is gone forever: Pub/Sub has no replay, and the message is dropped because nobody was listening yet. Subscribing first means the connection holds incoming messages until we read them, so the LRANGE-then-drain order catches everything. The dedupe set then handles events that landed in both the list (via LRANGE) and the connection's queue (via the live subscription) — which is the only case where the same event shows up twice. If you skip the subscribe-first step, the dedupe never actually fires; it's protecting against the wrong race.
This is the code that fixes the disconnect bug we started with. Walk through what happens to a user whose tab closed at minute 3:
- Worker is on chunk 5 of 8. The SSE generator in their first request died at minute 3, but the worker is still running because it's a separate process.
- User opens the tab again. Browser hits `/jobs/{sid}/stream` — a brand new request, brand new generator.
- The handler subscribes to the channel first (start buffering anything new).
- Phase 1 reads the events list from the start. The user instantly sees `started`, `fetch_started`, `fetch_done`, `chunk_summarized` (×4) — everything that happened while they were gone.
- Phase 2 drains the channel buffer and tails live. They start receiving `chunk_summarized` for chunk 5, then 6, 7, 8, then `final_started`, `final_done`, `done` as they happen live.
- The dedupe set catches anything that was in both the LRANGE result and the channel buffer — for example, an event that fired exactly between the SUBSCRIBE and the LRANGE call.
They missed nothing. They didn't even notice the disconnect happened.
How — at-least-once means duplicates will arrive on a Tuesday
There's a piece I haven't shown yet that's invisible but load-bearing. Putting the work behind a durable queue is great, but it introduces a new problem: at-least-once delivery. Redis Streams doesn't auto-redeliver on its own — but with a consumer group, a message that's been read but not ACKed enters a pending state, claimable by another consumer via XAUTOCLAIM (or XPENDING + XCLAIM) after idle_timeout. The broker's reclaim loop is what actually moves the message back into circulation. Same pattern in Kafka (offset commits), RabbitMQ (manual ack), BullMQ (stalled-job recovery), Sidekiq, most decent queues. Most retry middlewares (TaskIQ's SmartRetryMiddleware, Celery's autoretry_for, etc.) add another layer of redelivery on top of that for transient errors.
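Concretely, the reclaim side looks roughly like this with raw redis-py. The stream, group, and consumer names are made up, and TaskIQ's broker does the equivalent for you; the sketch is just what at-least-once looks like at the Redis level.

```python
# Hedged sketch of a reclaim loop: take over messages another consumer
# read but never ACKed (it stalled or died), then process and ACK them.
async def reclaim_stalled(redis, stream="jobs", group="workers", me="worker-2"):
    next_id, messages, *_ = await redis.xautoclaim(
        stream, group, me,
        min_idle_time=60_000,   # pending for > 60s counts as stalled
        start_id="0-0",
        count=10,
    )
    for msg_id, fields in messages:
        await handle(fields)    # this may well be a *re*-delivery
        await redis.xack(stream, group, msg_id)
```

`handle` stands in for whatever runs the task body; the idempotency lock below is what keeps that redelivery harmless.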
You will get the same task fired twice. Not theoretically. In production. On a Tuesday.
The fix is one line at the top of every task:
# from app/tasks.py — verbatim.
@broker.task(task_name="run_job")
async def run_job(
session_id: str,
input: str, # noqa: A002 the payload field name in the API is also `input`
mode: JobMode,
chunk_size_chars: int,
max_chunks: int,
redis: Redis = TaskiqDepends(redis_dependency), # noqa: B008 TaskIQ DI mirrors FastAPI Depends
) -> TaskResult:
"""Dispatch on ``mode``. Both branches share the lock + cancel + timeout
skeleton — only the inner pipeline differs."""
settings = get_settings()
publisher = get_publisher(redis)
# 1. Idempotency lock. If we lose the race, this is a redelivery — exit clean.
got_lock = await redis.set(
keys.idempotency_lock(session_id),
"1",
nx=True,
ex=settings.idempotency_lock_ttl_seconds,
)
if not got_lock:
return TaskResult(
status=TaskStatus.DUPLICATE_DELIVERY, session_id=session_id, mode=mode
)
# 2. ... pipeline work ...
SET ... NX EX is atomic on the Redis side — only the first delivery wins the lock. While the lock is alive, every subsequent delivery observes it and exits silently. This is duplicate-delivery suppression, not true exactly-once. It guarantees the task body doesn't re-enter while a previous delivery is still working, which is what protects you from the common case (a worker briefly stalls, the broker redelivers, both versions are now alive). It does not protect against:
- A worker crashing after partially calling an LLM (the charge is already incurred); a future redelivery wouldn't repeat the charge while the lock holds, but the partial work is lost.
- The lock TTL expiring while the original task is still running (rare in practice if you tune the TTL > worst-case task duration, but possible).
- Side effects already committed before a crash being "rolled back" — they aren't.
For the demo's failure modes, duplicate-delivery suppression is enough. True exactly-once over partial failures needs a checkpointed state machine, which is covered in "What this isn't" below.
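One way to shrink the TTL-expiry window, not in the repo but worth knowing: have the running task refresh its own lock, so the TTL only needs to outlive your crash-detection interval instead of the worst-case job duration.

```python
# Hypothetical lock-refresh helper. While the task runs, a background
# coroutine keeps pushing the lock's expiry forward; if the worker dies,
# the lock lapses ~ttl later and a redelivery can proceed.
import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def refreshed_lock(redis, key: str, ttl: int = 120):
    async def heartbeat() -> None:
        while True:
            await asyncio.sleep(ttl / 3)
            await redis.expire(key, ttl)

    task = asyncio.create_task(heartbeat())
    try:
        yield
    finally:
        task.cancel()
```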
How — and now, the subtle bug from earlier
Remember this snippet from a few sections back?
async def emit(redis, session_id, event_type, data):
seq = await redis.incr(f"seq:{session_id}")
payload = json.dumps({"seq": seq, "type": event_type, **data})
await redis.rpush(f"events:{session_id}", payload)
await redis.set(f"snapshot:{session_id}", payload, ex=3600)
await redis.publish(f"channel:{session_id}", payload)
Once we deployed the architecture above and started actually using it, we noticed something weird in the event log:
seq=1 started
seq=2 fetch_started
seq=4 chunk_summarized index=0
seq=3 fetch_done
seq=5 chunk_summarized index=1
seq=4 arrived before seq=3.
It's a quiet bug. The browser still rendered something. The summary still ended up correct. The dedupe in the SSE handler still worked. But the contract our API advertised — strictly monotonic seqs so the client can detect gaps — was silently broken.
The cause is concurrency between emit calls. The mechanism differs by runtime — Python's asyncio and Node's event loop interleave at every await on a single thread; Go's scheduler runs goroutines on multiple OS threads; Tokio's default runtime is multi-thread too — but the bug shape is the same in all of them. If the only thing your worker does is emit events sequentially, you're fine. The instant anything else creates concurrency — a heartbeat coroutine, parallel chunk processing, an LLM streaming response triggering token events alongside another emit source — two emit calls can be in flight at the same time:
coroutine A: INCR -> 3
coroutine B: INCR -> 4
coroutine B: RPUSH (seq=4 lands in the list first)
coroutine A: RPUSH (seq=3 lands second)
Same bug shape in Go with goroutines, Node with Promise.all, Rust with tokio::spawn. It's a property of any pattern where (a) multiple producers can emit, (b) the seq is generated before the writes, and (c) the writes aren't instantaneous.
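If you want to see the race without waiting for a Tuesday, it reproduces in a dozen lines with a fake backend standing in for Redis. Nothing here is repo code; the sleep plays the role of network latency.

```python
# Deterministic repro: the coroutine that takes seq=1 is slow to write,
# the one that takes seq=2 is fast, so seq=2 lands in the list first.
import asyncio, json

class FakeRedis:
    def __init__(self) -> None:
        self.seq = 0
        self.events: list[str] = []

    async def incr(self, key: str) -> int:
        self.seq += 1
        return self.seq

    async def rpush(self, key: str, payload: str, delay: float) -> None:
        await asyncio.sleep(delay)        # simulated network latency
        self.events.append(payload)

async def emit(redis: FakeRedis, event_type: str, delay: float) -> None:
    seq = await redis.incr("seq:demo")    # seq is taken here...
    payload = json.dumps({"seq": seq, "type": event_type})
    await redis.rpush("events:demo", payload, delay)  # ...but lands here

async def main() -> None:
    redis = FakeRedis()
    await asyncio.gather(
        emit(redis, "fetch_done", delay=0.05),
        emit(redis, "chunk_summarized", delay=0.0),
    )
    print([json.loads(e)["seq"] for e in redis.events])  # -> [2, 1]

asyncio.run(main())
```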
How — three fixes that look right but aren't
When developers first hit this, they reach for one of three tools.
Wrap the writes in MULTI/EXEC. Doesn't help — the seq is generated outside the transaction. Two emits are still independent transactions that interleave at the boundary.
Move it to a Lua script. Works, but it's the wrong tool. Each session's events are produced by exactly one worker (the one that won the message from the consumer group). There's no distributed problem; reaching for a server-side primitive to fix an in-process race is operationally heavy and obscures the design issue.
Distributed lock around each emit. Same critique, more so. Network round-trip + lock-TTL failure mode (what if the worker crashes holding it?) for a thing your runtime already gives you tools for.
How — a single-consumer drainer
The race only happens because multiple coroutines call into the publish path concurrently. Restrict that to one, and the race vanishes.
The principle works in any language:
| Runtime | Queue | Single-consumer task |
|---|---|---|
| Python | `asyncio.Queue` | `asyncio.create_task(drain(...))` |
| Go | unbuffered or buffered `chan` | `go drain(ch)` |
| Node | `p-queue` with `concurrency: 1`, or a hand-rolled async iterator | a single consumer awaiting `for await (const ev of queue)` |
| Rust | `tokio::sync::mpsc` | `tokio::spawn(drain(rx))` |
Producers enqueue events; a single drainer dequeues them in FIFO order. The seq is generated inside the drainer, after the dequeue, before the writes. There's exactly one place in the program that touches the seq counter, so no race is possible.
The Python implementation (real code from app/publish.py):
# from app/publish.py — verbatim.
from __future__ import annotations
import asyncio
from dataclasses import dataclass
from redis.asyncio import Redis
from app import keys
from app.events import TERMINAL_TYPES, BaseEvent
from app.settings import get_settings
@dataclass(frozen=True, slots=True)
class _DrainerStop:
"""Typed sentinel: tells a session's drainer to flush and exit.
Single instance below — comparing identity (``is _DRAINER_STOP``) costs
nothing and lets the queue stay strictly typed as
``BaseEvent | _DrainerStop`` (no ``object``, no ``Any``).
"""
_DRAINER_STOP: _DrainerStop = _DrainerStop()
_QueueItem = BaseEvent | _DrainerStop
@dataclass
class _SessionPublisher:
queue: asyncio.Queue[_QueueItem]
drainer: asyncio.Task[None]
class ProgressPublisher:
"""One instance per worker process. Holds a drainer per active session."""
def __init__(self, redis: Redis) -> None:
self._redis = redis
self._sessions: dict[str, _SessionPublisher] = {}
self._lock = asyncio.Lock()
self._settings = get_settings()
async def emit(self, event: BaseEvent) -> None:
"""Enqueue a typed event. ``event.session_id`` routes to the right drainer.
``event.seq`` is rewritten by the drainer just before the write — it's
fine (and expected) to leave it at the ``0`` default when constructing.
"""
publisher = await self._get_or_create(event.session_id)
await publisher.queue.put(event)
async def close(self, session_id: str) -> None:
"""Drain remaining items, stop the drainer. Call after a terminal event."""
publisher = self._sessions.pop(session_id, None)
if publisher is None:
return
await publisher.queue.put(_DRAINER_STOP)
try:
await asyncio.wait_for(publisher.drainer, timeout=5)
except TimeoutError:
publisher.drainer.cancel()
async def _get_or_create(self, session_id: str) -> _SessionPublisher:
if session_id in self._sessions:
return self._sessions[session_id]
async with self._lock:
if session_id in self._sessions:
return self._sessions[session_id]
queue: asyncio.Queue[_QueueItem] = asyncio.Queue()
drainer = asyncio.create_task(
self._drain(session_id, queue), name=f"drain:{session_id}"
)
self._sessions[session_id] = _SessionPublisher(queue=queue, drainer=drainer)
return self._sessions[session_id]
Producers don't touch Redis. They put events on a queue. The drainer is the only consumer:
async def _drain(self, session_id: str, queue: asyncio.Queue[_QueueItem]) -> None:
"""The single writer for this session. FIFO guarantees ordered seqs."""
list_key = keys.events_list(session_id)
snap_key = keys.progress_snapshot(session_id)
chan_key = keys.channel(session_id)
seq_key = keys.sequence(session_id)
ttl = self._settings.progress_snapshot_ttl_seconds
while True:
item = await queue.get()
if isinstance(item, _DrainerStop):
return
seq = int(await self._redis.incr(seq_key))
event = item.model_copy(update={"seq": seq})
payload = event.model_dump_json()
# One pipelined round-trip for the durable writes + channel publish.
pipe = self._redis.pipeline(transaction=False)
pipe.rpush(list_key, payload)
pipe.expire(list_key, ttl)
pipe.set(snap_key, payload, ex=ttl)
pipe.publish(chan_key, payload)
await pipe.execute()
if event.type in TERMINAL_TYPES:
# Best-effort: keep terminal events around longer than mid-stream
# ones so a late SSE client still sees the outcome.
await self._redis.expire(list_key, ttl)
Read the loop carefully. Inside one iteration:
1. Pop one event from the queue.
2. `INCR` the sequence counter.
3. `model_copy` the event with the fresh seq (events are constructed by producers with a `seq=0` placeholder).
4. Pipeline four writes in one round-trip.
The next event from the queue cannot start step 2 until step 4 finishes. There is no other consumer of this queue. asyncio.Queue is FIFO. Sequence numbers come out monotonic by construction.
What's not there:
- No `transaction=True` on the pipeline. The pipeline is purely a round-trip optimization — it batches the four commands into one network call. We don't need `MULTI`/`EXEC` for the ordering guarantee, because the drainer is the only actor writing this session's event list, snapshot, channel publish, and seq counter; ordering is enforced by the single-consumer property of the loop, not by Redis. (A non-transactional pipeline is not atomic — if your product needs cross-key consistency, e.g. the snapshot always agreeing with the latest list entry from any reader's perspective, you'd need a transaction or Lua script. We don't, so we don't.)
- No Redis-side script, no `WATCH`/`MULTI`/`EXEC`.
- No distributed lock.
A small implementation detail worth flagging: each event resets the list's TTL with `pipe.expire(list_key, ttl)`. That means the events list lives for one TTL window (an hour with the demo's settings) after the last event, not an hour from creation — a rolling window. Active sessions stay around as long as they're producing; abandoned ones expire on schedule.
Why a typed sentinel and not `None` or `object()`? Because the queue type is `asyncio.Queue[BaseEvent | _DrainerStop]`, fully type-checked. A bare `object()` would type the queue as `Queue[object]` and gut every guarantee on the producer side. Small thing; saves you a debugging session three months from now.
How — typed events at the boundary
One last piece. Every byte coming back through the SSE boundary is validated against a Pydantic discriminated union:
class StartedEvent(BaseEvent):
type: Literal[EventType.STARTED] = EventType.STARTED
input: str
class TokenEvent(BaseEvent):
type: Literal[EventType.TOKEN] = EventType.TOKEN
delta: str
# ... one concrete subclass per event type ...
AnyEvent = Annotated[
StartedEvent | TokenEvent | ChunkSummarizedEvent | ...,
Field(discriminator="type"),
]
EVENT_ADAPTER = TypeAdapter(AnyEvent)
Equivalents in other languages: Zod discriminated unions in TypeScript, serde with #[serde(tag = "type")] in Rust, sealed classes in Kotlin/Java. The principle: misspelled keys fail at the boundary, not three layers down.
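To make the fail-at-the-boundary claim concrete, here is the hypothetical shape of a bad payload hitting the adapter. The exact `BaseEvent` fields and `EventType` values are assumptions for the example.

```python
from pydantic import ValidationError

# A well-formed payload parses into the right concrete event class.
ok = EVENT_ADAPTER.validate_json(
    '{"seq": 1, "session_id": "abc", "type": "token", "delta": "Hel"}'
)
assert isinstance(ok, TokenEvent)

# A misspelled discriminator (or a missing field) fails right here,
# at the SSE boundary, not three layers down in the UI.
try:
    EVENT_ADAPTER.validate_json(
        '{"seq": 2, "session_id": "abc", "type": "tokn", "delta": "lo"}'
    )
except ValidationError as exc:
    print(exc.error_count(), "validation error(s)")
```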
The failure matrix
The architecture above closes the loop on a lot of failure modes. Here's the full set, with where each one is caught and what the client actually sees:
| Failure mode | Where it's caught | What the client sees |
|---|---|---|
| Client closes tab mid-task | Work is in worker, not request — keeps running | Reconnect = full replay |
| Network drops for 30s | Same as above | Reconnect = full replay |
| Worker process crashes mid-task | Stream pending-message claim (`XAUTOCLAIM` after idle timeout) + idempotency lock | No double work — but session doesn't auto-resume; client must resubmit (true resumption is in What this isn't) |
| Same task delivered twice | `SET NX EX` lock at task entry | Second delivery is a silent no-op |
| Known pipeline error (`FetchError`, `SummarizeError` — e.g. LLM unreachable, HTTP fetch timeout) | Inner pipeline emits `ErrorEvent`; `run_job` catches and returns `TaskResult(status=ERROR)` — no broker retry | SSE `error` event, then close |
| Unknown exception | Emits `ErrorEvent`, then re-raises to the broker (retry middleware may redeliver; idempotency lock catches the dupe) | SSE `error` event, then close |
| Task hangs forever | `asyncio.wait_for` hard timeout | `error` event with `reason=timeout` |
| User clicks Cancel | `DELETE /jobs/{id}` sets cancel flag | Worker exits at next checkpoint, emits `cancelled` |
| Concurrent emits (the seq bug) | Single-consumer drainer | Strictly monotonic seqs |
The interesting column is the third. Most worker-side failures become events the client can observe — error, cancelled, timeout. The two that don't are by design: a client disconnect is just a generator exit (the worker keeps going, reconnect replays everything), and a duplicate delivery is a silent no-op (that's the whole point of the lock). No HTTP error mid-stream (SSE clients don't see those). No silent retries that rewrite history. The client either sees the truth or gets a clean termination — never a confused middle state.
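The cancel row deserves a sketch, since the post doesn't show that path. The key name and checkpoint placement here are assumptions; the repo's real version lives in `app/api.py` and `app/tasks.py`.

```python
# Hypothetical cancel path: the API sets a flag key, the worker polls it
# at its checkpoints (between chunks) and exits cleanly if it's set.
@app.delete("/jobs/{session_id}")
async def cancel_job(session_id: str):
    redis = await get_redis()
    await redis.set(f"cancel:{session_id}", "1", ex=3600)
    return {"status": "cancel_requested"}

async def is_cancelled(redis, session_id: str) -> bool:
    return await redis.exists(f"cancel:{session_id}") == 1

# Inside the pipeline loop, between chunk summaries:
#     if await is_cancelled(redis, session_id):
#         await publisher.emit(CancelledEvent(session_id=session_id))
#         return TaskResult(status=TaskStatus.CANCELLED, ...)
```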
What this isn't
A few things this design explicitly does not solve, and shouldn't:
- Resumption from a checkpoint. "Worker crash → no double work" is not the same as "task picks up where it left off." Real resumption means the task itself is structured around a state machine you can re-enter — a much larger design change. The idempotency lock just guarantees one task body executes per session, not that it executes correctly after partial progress.
- Multi-worker fan-out for a single session. The drainer assumes one worker owns one session at a time. If multiple workers need to publish into the same session, you're back to needing a server-side primitive.
- Persistence beyond an hour. The events list and snapshot are TTL'd. They're a streaming buffer, not your source of truth — keep your application's record in your database.
The stack at a glance
The demo repo uses these tools, but none of them are essential to the pattern:
| Concern | What I used | Common alternatives |
|---|---|---|
| Web framework | FastAPI | Express, Axum, Gin, Spring |
| Background queue | TaskIQ + Redis Streams | Celery, BullMQ, Sidekiq, RabbitMQ Streams, Kafka |
| Live channel | Redis Pub/Sub | NATS, Kafka, Postgres LISTEN/NOTIFY |
| Client transport | Server-Sent Events | WebSockets, gRPC server streaming, long polling |
| Schema | Pydantic discriminated unions | Zod, serde, sealed classes |
| Local LLM (optional) | LM Studio | Ollama, vLLM, llama.cpp |
The architecture — separate the work from the connection, three storage shapes per event, replay-then-tail, idempotency at the task entry, single-consumer drainer for ordering — is the core idea. Stack swaps are mechanical.
Try it
Full implementation on GitHub:
- 🔗 Repo: github.com/AkshatSoni26/longshot
- 🔗 The drainer: `app/publish.py`
- 🔗 The replay-then-tail SSE endpoint: `app/api.py`
- 🔗 Idempotency lock + cancel + timeout: `app/tasks.py`
git clone https://github.com/AkshatSoni26/longshot
cd longshot
make up # docker compose: redis + api + worker
bash demo/happy_path.sh # POST a job, watch SSE events stream
The repo includes three modes for the LLM call: a deterministic mock (no network), Anthropic, and LM Studio (free, local, OpenAI-compatible). LM Studio mode handles thinking models like Qwen3-Thinking with <think> blocks stripped during the streaming pass.
What I'd do differently next time
Honest list:
- Real integration tests against a Redis container. The smoke tests verify wiring. They don't verify the drainer's invariants under contention, and they don't verify the SSE replay/dedupe path. Both are testable with `pytest-asyncio` + the `testcontainers` Redis module.
- A property-based test on the drainer. Spin up N producers emitting random events, assert the consumer sees strictly increasing seqs. The drainer makes this true by construction; a Hypothesis test would prove it under contention.
- Resumable tasks. Real resumption is a state-machine rewrite — task body broken into idempotent steps, each step writing a checkpoint key, the lock check skipping completed steps on retry. Big design change, worth its own post.
The pattern in this post comes from production code I built for a long-running AI-orchestration system, after we got tired of users losing five minutes of progress every time their VPN reconnected. The repo is a sanitized, dependency-light extraction — a reference implementation, not a copy of the real thing — designed to be small enough to read end-to-end. If you build something on top of it, open an issue — I'd love to see what you do with it.
🤝 Sharing thoughts, open to corrections
I'm a few years into my career — not the staff engineer who's seen every production failure mode. I'm sharing this because writing it down forced me to understand the design more carefully than building it did, and I'd rather have it reviewed in public than convince myself the demo is bulletproof.
If something here is off, overclaimed, or naive in a way I'll cringe at in five years, please tell me. Open an issue on the repo or drop a comment below — corrections from people who've shipped this kind of system at scale are exactly what I want. The whole point of writing publicly is getting better, not being right. 🙌