In v3 I added a cross-encoder reranker. This time the feature was simpler but touched every layer: streaming responses via Server-Sent Events (SSE).
The goal: instead of waiting 3-5 seconds for the full answer, start showing tokens the moment the LLM generates them. The sources still arrive at the end.
Why streaming matters for RAG
Without streaming, the user experience is: click → wait → wall of text. With streaming, the first token arrives in ~200ms. The user starts reading while the model is still generating. It's the same answer, but it feels instant.
For a RAG pipeline specifically, there's a design question: when do you send the sources? You can't stream them inline — the LLM doesn't produce structured source metadata as it generates. So the pattern becomes:
SSE event 1: {"token": "The"}
SSE event 2: {"token": " list"}
SSE event 3: {"token": " price"}
...
SSE final: {"sources": [{"chunk_id": 4, "text": "...", "page": 2, "score": 0.75}]}
Tokens stream in real-time. Sources are sent as the final event once the LLM is done. The client knows the stream is complete when it receives the sources event.
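Here's a minimal sketch of how a client could consume that event sequence. It assumes each event arrives as a single `data: {...}` line and that the sources event is always last, as described above. `consume_sse` is a hypothetical helper name, not something from the repo.

```python
import json
from typing import Iterable


def consume_sse(lines: Iterable[str]) -> tuple[str, list[dict]]:
    """Collect streamed tokens into an answer and return it with the sources."""
    answer_parts: list[str] = []
    sources: list[dict] = []
    for line in lines:
        line = line.rstrip("\r\n")
        # SSE payload lines start with "data:"; blank lines separate events.
        if not line.startswith("data:"):
            continue
        payload = json.loads(line[len("data:"):].strip())
        if "token" in payload:
            answer_parts.append(payload["token"])
        elif "sources" in payload:
            # The sources event is the last one, so stop reading here.
            sources = payload["sources"]
            break
    return "".join(answer_parts), sources


# Usage with a hand-written stream (illustrative data only).
raw = [
    'data: {"token": "The"}', "",
    'data: {"token": " list"}', "",
    'data: {"sources": [{"chunk_id": 4, "page": 2}]}', "",
]
answer, sources = consume_sse(raw)
```

A real UI would render each token as it arrives instead of joining at the end, but the stop condition is the same: the sources event closes the answer.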
The abstraction
In v3, LLMClient had one method:
class LLMClient(ABC):
    @abstractmethod
    def generate(self, system: str, user: str) -> str: ...
Now it has two:
class LLMClient(ABC):
    @abstractmethod
    def generate(self, system: str, user: str) -> str: ...

    @abstractmethod
    def stream(self, system: str, user: str) -> Iterator[str]: ...
Same inputs, different output shape. generate returns a string. stream yields string chunks. The endpoint decides which to call — /query calls generate, /query/stream calls stream.
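One way to see the contract between the two methods is a test double that never touches the network. This is a sketch under my own assumptions: `FakeLLMClient` is a hypothetical class, not part of the repo, and its word-level splitting only imitates how real token chunks tend to look.

```python
from abc import ABC, abstractmethod
from typing import Iterator


class LLMClient(ABC):
    @abstractmethod
    def generate(self, system: str, user: str) -> str: ...

    @abstractmethod
    def stream(self, system: str, user: str) -> Iterator[str]: ...


class FakeLLMClient(LLMClient):
    """Hypothetical test double: returns a canned answer, no network calls."""

    def __init__(self, answer: str) -> None:
        self.answer = answer

    def generate(self, system: str, user: str) -> str:
        return self.answer

    def stream(self, system: str, user: str) -> Iterator[str]:
        # Split on spaces but keep each space attached to the next word,
        # mimicking chunks such as " list" and " price".
        for i, word in enumerate(self.answer.split(" ")):
            yield word if i == 0 else " " + word


client = FakeLLMClient("The list price is 68,400")
streamed = "".join(client.stream(system="", user=""))
```

Joining the streamed chunks gives back exactly what `generate` returns, which is the property any real implementation of the two methods should approximate.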
This is where it got interesting: each SDK streams differently.
Three SDKs, three streaming APIs
Groq and OpenAI (similar)
Both use the OpenAI-compatible stream=True parameter:
def stream(self, system: str, user: str) -> Iterator[str]:
    resp = self.client.chat.completions.create(
        model=self.model,
        messages=[
            {"role": "system", "content": system},
            {"role": "user", "content": user},
        ],
        temperature=0.2,
        max_tokens=800,
        stream=True,
    )
    for chunk in resp:
        delta = chunk.choices[0].delta.content
        if delta:
            yield delta
The only difference from generate is stream=True and iterating over chunks instead of reading .choices[0].message.content. Groq uses the same API shape since it's OpenAI-compatible.
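The `if delta:` check matters because not every chunk carries text. Below is a slightly more defensive sketch of the same loop, pulled out into a standalone function so it can run without an API key. It assumes that, depending on provider and request options, a chunk may also arrive with an empty `choices` list; check that against your SDK. `iter_text_deltas` and `fake_chunk` are hypothetical names for illustration.

```python
from types import SimpleNamespace
from typing import Any, Iterable, Iterator


def iter_text_deltas(chunks: Iterable[Any]) -> Iterator[str]:
    """Yield non-empty text deltas from OpenAI-style stream chunks."""
    for chunk in chunks:
        # Assumption: some chunks carry no choices at all (e.g. usage-only).
        if not chunk.choices:
            continue
        delta = chunk.choices[0].delta.content
        # Chunks without text (None or "") are skipped.
        if delta:
            yield delta


def fake_chunk(content):
    """Hypothetical stand-in for an SDK chunk object, for illustration only."""
    delta = SimpleNamespace(content=content)
    return SimpleNamespace(choices=[SimpleNamespace(delta=delta)])


chunks = [
    fake_chunk(None),
    fake_chunk("The"),
    SimpleNamespace(choices=[]),
    fake_chunk(" list"),
    fake_chunk(""),
]
text = "".join(iter_text_deltas(chunks))
```

Only the two chunks with real text survive the filter; the `None`, empty-string, and empty-`choices` chunks are dropped.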
Anthropic (different)
Anthropic's SDK has a dedicated streaming context manager:
def stream(self, system: str, user: str) -> Iterator[str]:
    with self.client.messages.stream(
        model=self.model,
        system=system,
        messages=[{"role": "user", "content": user}],
        temperature=0.2,
        max_tokens=800,
    ) as resp:
        for text in resp.text_stream:
            yield text
Instead of passing stream=True to client.messages.create(...), this uses client.messages.stream(...), a dedicated helper method. (The SDK also accepts stream=True on create, but that yields raw event objects rather than plain text.) And instead of parsing chunk.choices[0].delta.content, you iterate resp.text_stream, which yields clean text directly. The with block handles connection cleanup.
It's a cleaner API honestly — no null-checking on deltas, no digging into nested objects. But it means you can't write one streaming implementation and share it across providers.
The endpoint
FastAPI's StreamingResponse handles the SSE transport:
# Imports this excerpt relies on
import json
from typing import Iterator
from fastapi.responses import StreamingResponse

@app.post("/query/stream")
def query_stream(req: QueryRequest) -> StreamingResponse:
    # ... same retrieval + reranking as /query ...
    # (`retrieved` and `sources` come from that elided step)
    llm = get_llm_client()
    user_prompt = build_user_prompt(req.question, retrieved)

    def event_stream() -> Iterator[str]:
        # One SSE event per token, then a single final sources event
        for token in llm.stream(system=SYSTEM_PROMPT, user=user_prompt):
            yield f"data: {json.dumps({'token': token})}\n\n"
        yield f"data: {json.dumps({'sources': sources})}\n\n"

    return StreamingResponse(event_stream(), media_type="text/event-stream")
The retrieval pipeline (embed → hybrid search → rerank) runs before streaming starts — that's all synchronous work. Only the LLM generation streams. This means the client sees a brief pause (retrieval + reranking), then tokens start flowing.
The sources list is built from the retrieved chunks before the stream starts, so it's ready to send as the final event without any extra processing.
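To make the wire format concrete, here's a framework-free sketch of the same generator logic. `sse_event` is a hypothetical helper and this `event_stream` takes its inputs as arguments; in the real endpoint those come from the LLM client and the retrieval step.

```python
import json
from typing import Iterable, Iterator


def sse_event(payload: dict) -> str:
    """Serialize one payload as an SSE data event (terminated by a blank line)."""
    return f"data: {json.dumps(payload)}\n\n"


def event_stream(tokens: Iterable[str], sources: list[dict]) -> Iterator[str]:
    """Yield one event per token, then a single final sources event."""
    for token in tokens:
        yield sse_event({"token": token})
    yield sse_event({"sources": sources})


events = list(event_stream(["The", " list"], [{"chunk_id": 4, "page": 2}]))
```

Because `json.dumps` escapes newlines inside strings, each payload stays on a single `data:` line even if a token contains a line break, which keeps the event framing intact.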
Testing it
curl -N -X POST http://localhost:8000/query/stream \
  -H "Content-Type: application/json" \
  -d '{"question": "What is the list price of the Magpie-7?", "top_k": 3}'
Output:
data: {"token": "The"}
data: {"token": " list"}
data: {"token": " price"}
data: {"token": " of"}
data: {"token": " the"}
data: {"token": " Magpie"}
data: {"token": "-7"}
data: {"token": " is"}
data: {"token": " €"}
data: {"token": "68"}
data: {"token": ",400"}
data: {"token": " per"}
data: {"token": " unit."}
data: {"sources": [{"chunk_id": 4, "text": "...", "page": 2, "score": 0.7542}]}
The -N flag disables curl's output buffering so you see tokens as they arrive.
The pipeline now
PDF ─► extract text ─► chunk ─► embed (MiniLM-L6-v2)
│
▼
question ─► FAISS + BM25 (RRF) ─► cross-encoder rerank
                                  ─► LLM generate (blocking) → /query → {answer, sources}
                                  ─► LLM stream (SSE) → /query/stream → token events + sources
Same retrieval pipeline, two output modes. The client picks which endpoint to call.
What I learned
Streaming is a UX feature, not an accuracy feature. The answer is identical — streaming just changes when the user sees it. But the perceived latency difference is dramatic.
SDK divergence is real. Groq and OpenAI share the same streaming interface (OpenAI-compatible). Anthropic uses a fundamentally different pattern. If you're building a multi-provider abstraction, streaming is where it gets messy. The LLMClient abstract class earns its keep here.
Sources and tokens are separate concerns. In a RAG pipeline, you know the sources before the LLM starts generating. Streaming them as the final SSE event is a clean separation: the client can render tokens immediately and append source citations when the stream ends.
FastAPI makes SSE trivial. StreamingResponse with a generator function and text/event-stream media type: that's it. No WebSocket setup, no special middleware.
What's next
- Conversation memory (multi-turn follow-ups)
- Possibly a Streamlit UI
Try it yourself
- v4 (streaming): github.com/santanu2908/chat-with-pdf-rag
- v3 (reranker): github.com/santanu2908/chat-with-pdf-rag/tree/v3
- v2 (hybrid retrieval): github.com/santanu2908/chat-with-pdf-rag/tree/v2
- v1 (pure FAISS): github.com/santanu2908/chat-with-pdf-rag/tree/v1
uv sync
cp .env.example .env # set your API key
uv run uvicorn app.main:app --reload
Open http://localhost:8000/docs, upload the sample PDF, and try /query/stream — watch the tokens arrive one by one.
If you're building multi-provider streaming, I'd love to hear how you handled the SDK differences.
I'm Santanu Mohanta — connect with me on LinkedIn or check out my projects on GitHub.