DEV Community

Santanu Mohanta


Adding streaming to my RAG pipeline — three SDKs, three different APIs

In v3 I added a cross-encoder reranker. This time the feature was simpler but touched every layer: streaming responses via Server-Sent Events (SSE).

The goal: instead of waiting 3-5 seconds for the full answer, start showing tokens the moment the LLM generates them. The sources still arrive at the end.

Why streaming matters for RAG

Without streaming, the user experience is: click → wait → wall of text. With streaming, once retrieval is done, the first token arrives in ~200ms. The user starts reading while the model is still generating. It's the same answer, but it feels instant.

For a RAG pipeline specifically, there's a design question: when do you send the sources? You can't stream them inline — the LLM doesn't produce structured source metadata as it generates. So the pattern becomes:

SSE event 1:  {"token": "The"}
SSE event 2:  {"token": " list"}
SSE event 3:  {"token": " price"}
...
SSE final:    {"sources": [{"chunk_id": 4, "text": "...", "page": 2, "score": 0.75}]}

Tokens stream in real-time. Sources are sent as the final event once the LLM is done. The client knows the stream is complete when it receives the sources event.
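On the wire, each event is a `data:` line carrying one JSON object, terminated by a blank line. A minimal sketch of that framing, assuming two hypothetical helpers, `sse_event` and `rag_events` (the endpoint in this post inlines the same f-string instead):

```python
import json
from collections.abc import Iterable, Iterator


def sse_event(payload: dict) -> str:
    """Hypothetical helper: frame one JSON payload as a Server-Sent Events message.

    json.dumps escapes newlines inside strings as backslash-n, so a token that
    contains a line break can never produce the blank line that ends an event.
    """
    return f"data: {json.dumps(payload)}\n\n"


def rag_events(tokens: Iterable[str], sources: list[dict]) -> Iterator[str]:
    """Hypothetical helper: one event per token, then a single final sources event."""
    for token in tokens:
        yield sse_event({"token": token})
    # The sources event doubles as the end-of-stream marker for the client.
    yield sse_event({"sources": sources})


if __name__ == "__main__":
    demo = rag_events(["The", " list\nprice"], [{"chunk_id": 4, "page": 2}])
    print("".join(demo), end="")
```

Because the sources event is always last, a client needs no separate "done" message: seeing the `sources` key is the signal.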

The abstraction

In v3, LLMClient had one method:

from abc import ABC, abstractmethod

class LLMClient(ABC):
    @abstractmethod
    def generate(self, system: str, user: str) -> str: ...

Now it has two:

from abc import ABC, abstractmethod
from collections.abc import Iterator

class LLMClient(ABC):
    @abstractmethod
    def generate(self, system: str, user: str) -> str: ...

    @abstractmethod
    def stream(self, system: str, user: str) -> Iterator[str]: ...

Same inputs, different output shape. generate returns a string. stream yields string chunks. The endpoint decides which to call — /query calls generate, /query/stream calls stream.
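Because both endpoints depend only on the `LLMClient` interface, a provider-free test double is easy to write. A minimal sketch, assuming a hypothetical `FakeLLMClient` that replays canned tokens (the abstract class is repeated so the snippet runs on its own):

```python
from abc import ABC, abstractmethod
from collections.abc import Iterator


class LLMClient(ABC):
    @abstractmethod
    def generate(self, system: str, user: str) -> str: ...

    @abstractmethod
    def stream(self, system: str, user: str) -> Iterator[str]: ...


class FakeLLMClient(LLMClient):
    """Hypothetical test double: replays fixed tokens, makes no network calls."""

    def __init__(self, tokens: list[str]) -> None:
        self.tokens = tokens

    def stream(self, system: str, user: str) -> Iterator[str]:
        # Yield tokens one at a time, the way a real provider would.
        yield from self.tokens

    def generate(self, system: str, user: str) -> str:
        # For a fake, the blocking answer is simply the joined stream.
        return "".join(self.stream(system, user))


llm = FakeLLMClient(["The", " list", " price"])
print(list(llm.stream("system prompt", "question")))
print(llm.generate("system prompt", "question"))
```

Joining the stream is a shortcut that suits a fake; the real clients in this post keep a separate blocking call per SDK.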

This is where it got interesting: the SDKs don't all stream the same way.

Three SDKs, three streaming APIs

Groq and OpenAI (similar)

Both use the OpenAI-compatible stream=True parameter:

def stream(self, system: str, user: str) -> Iterator[str]:
    resp = self.client.chat.completions.create(
        model=self.model,
        messages=[
            {"role": "system", "content": system},
            {"role": "user", "content": user},
        ],
        temperature=0.2,
        max_tokens=800,
        stream=True,
    )
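    for chunk in resp:
        # Skip chunks with no choices (e.g. a usage-only final chunk) or no text.
        if not chunk.choices:
            continue
        delta = chunk.choices[0].delta.content
        if delta:
            yield delta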

The only difference from generate is stream=True and iterating over chunks instead of reading .choices[0].message.content. Groq uses the same API shape since it's OpenAI-compatible.
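The `if delta:` check matters because not every chunk carries text: the opening chunk typically holds only the role, and the closing one only a finish reason, so their `content` is empty or `None`. If usage reporting is requested via `stream_options`, a final chunk can also arrive with an empty `choices` list. A minimal offline sketch of that filtering, using `types.SimpleNamespace` objects as simplified stand-ins for the SDK's chunk objects (`iter_deltas` and `fake_chunk` are hypothetical names, and the chunk shapes are assumptions):

```python
from collections.abc import Iterable, Iterator
from types import SimpleNamespace


def iter_deltas(chunks: Iterable) -> Iterator[str]:
    """Hypothetical helper: yield only the text pieces from OpenAI-style chunks."""
    for chunk in chunks:
        if not chunk.choices:
            continue  # e.g. a trailing usage-only chunk
        delta = chunk.choices[0].delta.content
        if delta:  # skips both None and ""
            yield delta


def fake_chunk(content):
    """Hypothetical stand-in for one streamed chunk (simplified shape)."""
    return SimpleNamespace(choices=[SimpleNamespace(delta=SimpleNamespace(content=content))])


chunks = [
    fake_chunk(""),               # role-only opening chunk
    fake_chunk("The"),
    fake_chunk(" list"),
    fake_chunk(None),             # closing chunk carrying only a finish reason
    SimpleNamespace(choices=[]),  # usage-only chunk
]
print("".join(iter_deltas(chunks)))
```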

Anthropic (different)

Anthropic's SDK has a dedicated streaming context manager:

def stream(self, system: str, user: str) -> Iterator[str]:
    with self.client.messages.stream(
        model=self.model,
        system=system,
        messages=[{"role": "user", "content": user}],
        temperature=0.2,
        max_tokens=800,
    ) as resp:
        for text in resp.text_stream:
            yield text

Instead of passing stream=True to client.messages.create (which Anthropic's SDK also accepts, but which yields raw stream events), the convenient route is client.messages.stream(...), a different method entirely. And instead of parsing chunk.choices[0].delta.content, you iterate resp.text_stream, which yields clean text directly. The with block handles connection cleanup.

It's a cleaner API honestly — no null-checking on deltas, no digging into nested objects. But it means you can't write one streaming implementation and share it across providers.
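A related detail: because the `yield` sits inside the `with` block, cleanup also runs if the consumer stops early. Closing a generator raises `GeneratorExit` at the paused `yield`, which unwinds the `with` and calls `__exit__`. A stdlib-only sketch, with a hypothetical `FakeTextStream` standing in for the SDK's stream manager and a hypothetical `stream_text` mirroring the method above:

```python
from collections.abc import Iterator


class FakeTextStream:
    """Hypothetical stand-in for a streaming context manager exposing text_stream."""

    def __init__(self, texts: list[str]) -> None:
        self.text_stream = iter(texts)
        self.closed = False

    def __enter__(self) -> "FakeTextStream":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        # A real SDK would release the HTTP connection here.
        self.closed = True


def stream_text(manager: FakeTextStream) -> Iterator[str]:
    # Same shape as the Anthropic stream() method: yield inside the with block.
    with manager as resp:
        for text in resp.text_stream:
            yield text


mgr = FakeTextStream(["The", " list", " price"])
gen = stream_text(mgr)
print(next(gen))   # the consumer reads one token...
gen.close()        # ...then stops early
print(mgr.closed)  # __exit__ ran even though the stream was not exhausted
```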

The endpoint

FastAPI's StreamingResponse handles the SSE transport:

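import json
from collections.abc import Iterator

from fastapi.responses import StreamingResponse

@app.post("/query/stream")
def query_stream(req: QueryRequest) -> StreamingResponse:
    # ... same retrieval + reranking as /query (produces `retrieved` and `sources`) ...

    llm = get_llm_client()
    user_prompt = build_user_prompt(req.question, retrieved)

    def event_stream() -> Iterator[str]:
        # One SSE event per token, sent as soon as the model produces it.
        # ensure_ascii=False sends characters like € as-is instead of \u20ac escapes.
        for token in llm.stream(system=SYSTEM_PROMPT, user=user_prompt):
            yield f"data: {json.dumps({'token': token}, ensure_ascii=False)}\n\n"
        # Final event: the sources, which were ready before generation started.
        yield f"data: {json.dumps({'sources': sources}, ensure_ascii=False)}\n\n"

    return StreamingResponse(event_stream(), media_type="text/event-stream")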

The retrieval pipeline (embed → hybrid search → rerank) runs before streaming starts — that's all synchronous work. Only the LLM generation streams. This means the client sees a brief pause (retrieval + reranking), then tokens start flowing.

The sources list is built from the retrieved chunks before the stream starts, so it's ready to send as the final event without any extra processing.
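The pause before the first token has two parts: retrieval plus reranking, and the model's own time to first token. To see the second part in isolation, one option is to wrap the token iterator and record timings. A minimal sketch, assuming a hypothetical `timed_stream` wrapper and a fake `slow_tokens` generator standing in for `llm.stream` (neither is part of the project):

```python
import time
from collections.abc import Iterable, Iterator


def timed_stream(tokens: Iterable[str], stats: dict) -> Iterator[str]:
    """Hypothetical wrapper: pass tokens through unchanged, recording timings.

    stats["ttft"]:  seconds from the first pull to the first token
    stats["total"]: seconds from the first pull until the stream is exhausted
                    (includes time the consumer spends between tokens)
    """
    start = time.perf_counter()  # runs on the first next(), not at creation
    for i, token in enumerate(tokens):
        if i == 0:
            stats["ttft"] = time.perf_counter() - start
        yield token
    stats["total"] = time.perf_counter() - start


def slow_tokens() -> Iterator[str]:
    """Hypothetical fake model stream: waits 50 ms, then emits two tokens."""
    time.sleep(0.05)
    yield "The"
    yield " list"


stats: dict = {}
answer = "".join(timed_stream(slow_tokens(), stats))
print(answer, f"ttft={stats['ttft']:.3f}s total={stats['total']:.3f}s")
```

Inside `event_stream`, the loop could iterate `timed_stream(llm.stream(system=SYSTEM_PROMPT, user=user_prompt), stats)` and log `stats` after the final event. Since `llm.stream` is itself a generator, the provider request only starts on the first pull, which is also when `start` is taken.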

Testing it

curl -N -X POST http://localhost:8000/query/stream \
  -H "Content-Type: application/json" \
  -d '{"question": "What is the list price of the Magpie-7?", "top_k": 3}'

Output:

data: {"token": "The"}

data: {"token": " list"}

data: {"token": " price"}

data: {"token": " of"}

data: {"token": " the"}

data: {"token": " Magpie"}

data: {"token": "-7"}

data: {"token": " is"}

data: {"token": " €"}

data: {"token": "68"}

data: {"token": ",400"}

data: {"token": " per"}

data: {"token": " unit."}

data: {"sources": [{"chunk_id": 4, "text": "...", "page": 2, "score": 0.7542}]}

The -N flag disables curl's output buffering so you see tokens as they arrive.
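Browsers' built-in `EventSource` only issues GET requests, so a POST endpoint like this one needs a client that reads the response body itself. A minimal stdlib sketch of the curl call above, assuming the server is running on `localhost:8000`. It handles only the single-line `data:` events this endpoint emits, not the full SSE grammar, and `parse_events` and `ask` are hypothetical names:

```python
import json
import urllib.request
from collections.abc import Iterable, Iterator


def parse_events(lines: Iterable[bytes]) -> Iterator[dict]:
    """Hypothetical parser: decode `data: {...}` lines, skipping blank separators."""
    for raw in lines:
        line = raw.decode("utf-8").strip()
        if line.startswith("data:"):
            # json.loads tolerates the leading space after "data:".
            yield json.loads(line[len("data:"):])


def ask(question: str, top_k: int = 3,
        url: str = "http://localhost:8000/query/stream") -> tuple[str, list[dict]]:
    """Hypothetical client: print tokens as they arrive, return (answer, sources)."""
    body = json.dumps({"question": question, "top_k": top_k}).encode("utf-8")
    req = urllib.request.Request(
        url, data=body, headers={"Content-Type": "application/json"}, method="POST"
    )
    answer: list[str] = []
    with urllib.request.urlopen(req) as resp:
        for event in parse_events(resp):  # iterating the response yields lines
            if "token" in event:
                print(event["token"], end="", flush=True)
                answer.append(event["token"])
            elif "sources" in event:
                print()
                # The sources event marks the end of the stream.
                return "".join(answer), event["sources"]
    raise RuntimeError("stream ended without a sources event")
```

With the server up, `answer, sources = ask("What is the list price of the Magpie-7?")` prints the tokens as they arrive, like `curl -N`, and returns the sources once the final event lands.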

The pipeline now

PDF ─► extract text ─► chunk ─► embed (MiniLM-L6-v2)
                                        │
                                        ▼
question ─► FAISS + BM25 (RRF) ─► cross-encoder rerank
                                        │
                                        ├─► LLM generate (blocking) ─► /query        ─► {answer, sources}
                                        └─► LLM stream (SSE)        ─► /query/stream ─► token events + sources

Same retrieval pipeline, two output modes. The client picks which endpoint to call.

What I learned

  1. Streaming is a UX feature, not an accuracy feature. The model generates the same way either way; streaming just changes when the user sees the text. But the perceived latency difference is dramatic.

  2. SDK divergence is real. Groq and OpenAI share the same streaming interface (OpenAI-compatible). Anthropic's convenient route is a different pattern: a separate stream() method used as a context manager. If you're building a multi-provider abstraction, streaming is where it gets messy. The LLMClient abstract class earns its keep here.

  3. Sources and tokens are separate concerns. In a RAG pipeline, you know the sources before the LLM starts generating. Streaming them as the final SSE event is a clean separation — the client can render tokens immediately and append source citations when the stream ends.

  4. FastAPI makes SSE trivial. StreamingResponse with a generator function and text/event-stream media type — that's it. No WebSocket setup, no special middleware.

What's next

  • Conversation memory (multi-turn follow-ups)
  • Possibly a Streamlit UI

Try it yourself

uv sync
cp .env.example .env   # set your API key
uv run uvicorn app.main:app --reload

Open http://localhost:8000/docs, upload the sample PDF, and try /query/stream — watch the tokens arrive one by one.


If you're building multi-provider streaming, I'd love to hear how you handled the SDK differences.

I'm Santanu Mohanta — connect with me on LinkedIn or check out my projects on GitHub.
