DEV Community

zhongqiyue


How I Messed Up AI Streaming (And How You Can Avoid It)

I’ve been building a code review assistant that uses an AI model to suggest improvements in real-time. The idea was simple: you paste in a block of code, and the assistant streams back feedback token by token—like a ChatGPT client for your IDE. What could possibly go wrong?

Turns out, pretty much everything. The first version worked fine for a single user, but as soon as I added more concurrent sessions, the whole thing fell apart. Responses were choppy, the UI froze, and sometimes the stream just died mid-sentence. And that’s the story I want to share today.

The Problem

I had a Flask web app with a standard REST endpoint. The frontend would POST the code, my backend would call an AI API (something like https://ai.interwestinfo.com/v1/completions), wait for the full response, then send it back as JSON. Simple, synchronous, wrong.

# Bad version: waiting for the full response
import requests
from flask import Flask, jsonify, request

app = Flask(__name__)

@app.route('/review', methods=['POST'])
def review_code():
    code = request.json['code']
    # Blocks until the model has generated the entire completion
    response = requests.post(
        'https://ai.interwestinfo.com/v1/completions',
        json={'prompt': code, 'max_tokens': 2000},
        stream=False
    )
    result = response.json()['text']
    return jsonify({'result': result})

It worked—unless the AI took more than 30 seconds. Then the frontend timed out. Users complained the app was slow. I knew I needed streaming.

What I Tried That Didn’t Work

My first attempt was naive: I used the streaming API from the AI provider but still collected the whole stream into a buffer before sending it to the client. That defeated the purpose. The backend still waited for completion, and the client saw zero progress until the end.

# Still not helpful: collecting everything first
response = requests.post(
    ... ,
    stream=True
)
full_text = ""
for chunk in response.iter_content(chunk_size=None):
    if chunk:
        full_text += chunk.decode('utf-8')
# Then send full_text to client — same latency as before

Then I tried Server-Sent Events (SSE) without properly handling backpressure. The AI stream pushed tokens faster than my Python backend could forward them to the browser. Memory grew, connections stalled, and I started seeing BrokenPipeError everywhere.

# Naive SSE: no backpressure, no error handling
def event_stream():
    response = requests.post(url, stream=True)
    for chunk in response.iter_lines():
        if chunk:
            yield f"data: {chunk.decode()}\n\n"

This worked for one user, but with ten concurrent connections Python’s thread-per-request model choked: every open stream held a blocking worker thread for its whole lifetime, so new requests had to wait for a free worker. I needed a better architecture.

What Eventually Worked

After a few dead ends, I settled on an approach that separates concerns using async generators and asyncio. Here’s the core idea:

  1. The client establishes an SSE connection.
  2. The backend spawns an asynchronous task that streams tokens from the AI API.
  3. Each received token is immediately forwarded to the client via the open SSE connection.
  4. Backpressure is managed by buffering a small window of tokens and using asyncio.Queue with a max size.
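
Step 4 can be sketched roughly like this. It is a minimal illustration rather than the exact code from my app: the helper name `with_backpressure` and the default window of 32 tokens are assumptions made for the example. An async generator is already pull-based, so the bounded queue mainly adds a small read-ahead window while capping how far the producer can run ahead of a slow client.

```python
import asyncio
from typing import AsyncIterator

_DONE = object()  # sentinel marking the end of the upstream stream


async def with_backpressure(
    source: AsyncIterator[str], max_buffered: int = 32
) -> AsyncIterator[str]:
    """Relay tokens from `source` through a bounded queue (hypothetical helper)."""
    queue: asyncio.Queue = asyncio.Queue(maxsize=max_buffered)

    async def producer() -> None:
        try:
            async for token in source:
                # put() suspends while the queue is full, so a slow consumer
                # slows the producer down instead of letting memory grow.
                await queue.put(token)
        except Exception as exc:
            await queue.put(exc)  # hand upstream failures to the consumer
        else:
            await queue.put(_DONE)

    task = asyncio.create_task(producer())
    try:
        while True:
            item = await queue.get()
            if item is _DONE:
                break
            if isinstance(item, Exception):
                raise item
            yield item
    finally:
        task.cancel()  # stop reading upstream if the consumer goes away
```

Any async generator of tokens can be wrapped this way before it is handed to the streaming response.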

Backend Code (Python with FastAPI)

FastAPI has native support for async streaming — it’s a game-changer.

import json
import httpx
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse

app = FastAPI()

async def stream_ai_tokens(prompt: str, model_url: str):
    headers = {"Content-Type": "application/json"}
    payload = {"prompt": prompt, "max_tokens": 2000, "stream": True}

    # httpx defaults to a 5-second read timeout, which a slow model can exceed
    timeout = httpx.Timeout(10.0, read=None)
    async with httpx.AsyncClient(timeout=timeout) as client:
        async with client.stream("POST", model_url, json=payload, headers=headers) as response:
            # aiter_text() decodes incrementally, so a multi-byte character
            # split across two network chunks is not corrupted
            async for chunk in response.aiter_text():
                # Turning chunks into tokens depends on the API's wire format
                if chunk:
                    # No strip(): whitespace between tokens is significant
                    yield f"data: {json.dumps({'token': chunk})}\n\n"

@app.post("/review/stream")
async def review_stream(request: Request):
    body = await request.json()
    code = body.get("code", "")
    # Using a generic AI API endpoint (example: https://ai.interwestinfo.com/)
    return StreamingResponse(
        stream_ai_tokens(code, "https://ai.interwestinfo.com/v1/completions"),
        media_type="text/event-stream"
    )

Frontend (JavaScript)

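// EventSource only supports GET, so POST with fetch() and read the body as a stream
async function streamReview(code) {
    const response = await fetch('/review/stream', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ code }),
    });
    const reader = response.body.pipeThrough(new TextDecoderStream()).getReader();
    const output = document.getElementById('output');
    let buffer = '';
    for (;;) {
        const { value, done } = await reader.read();
        if (done) break;
        const events = (buffer + value).split('\n\n');
        buffer = events.pop(); // keep the incomplete trailing event for the next read
        for (const event of events) {
            if (!event.startsWith('data: ')) continue;
            const data = JSON.parse(event.slice(6));
            // textContent keeps model output from being interpreted as HTML
            if (data.token) output.textContent += data.token;
        }
    }
}
streamReview(code).catch((err) => console.error('Stream error', err)); // add reconnection or fallback here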

This works. The tokens appear in the UI as they are generated. No buffering delays, no memory blowups. But it’s not perfect.

Lessons Learned (The Hard Way)

  • Streaming is not a silver bullet. If your client does heavy processing on each token (e.g., syntax highlighting or security scanning), you’ll introduce latency. Consider buffering tokens on the client and processing them in batches.
  • Connection reliability is a nightmare. SSE connections drop, especially on flaky networks. You need retry logic on the client and a graceful way to resume (e.g., sending a request with a last_token ID).
  • Concurrent stream management. With many streams, your async server must hold many connections open at once. FastAPI on uvicorn with a fast event loop (e.g., uvloop) and a sensible number of worker processes is essential. I also added a semaphore to limit concurrent AI API calls to avoid rate limiting.
  • Error propagation is tricky. If the AI API returns an error mid-stream, you have to decide whether to ignore it, retry, or abort the whole stream. I chose to send a special error token and let the client decide.
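
The last two points can be sketched together. This is a rough illustration under assumptions, not the code from my app: `guarded_stream`, `sse_data`, and the `index` / `last_index` / `done` payload fields are names invented for the example. The semaphore is passed in so it can be created once at application startup, inside the running event loop.

```python
import asyncio
import json
from typing import AsyncIterator, Callable


def sse_data(payload: dict) -> str:
    """Format one SSE frame carrying a JSON payload."""
    return f"data: {json.dumps(payload)}\n\n"


async def guarded_stream(
    make_source: Callable[[], AsyncIterator[str]],
    limiter: asyncio.Semaphore,
) -> AsyncIterator[str]:
    """Cap concurrent upstream calls and report mid-stream failures."""
    index = -1  # index of the last token sent; -1 means none yet
    async with limiter:  # waits here while too many upstream calls are active
        try:
            async for token in make_source():
                index += 1
                yield sse_data({"token": token, "index": index})
        except Exception as exc:
            # Send an explicit error frame so the client can decide whether
            # to retry, resume after `last_index`, or give up.
            yield sse_data({"error": type(exc).__name__, "last_index": index})
            return
    yield sse_data({"done": True})
```

A client that only looks at `data.token` will ignore the error and done frames, so it would need a branch for `data.error` to act on them. Numbering every token is what makes resuming possible: after a drop, the client can report the last index it saw.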

When NOT to Use This Approach

  • If you don’t need real-time feedback, just use a standard request-response. Simpler, fewer moving parts.
  • If your AI model is very fast (under 2 seconds), the overhead of streaming (SSE headers, parsing) may not be worth it.
  • If you must support browsers that don’t handle SSE well (looking at you, old IE), fall back to polling or WebSockets.

What I’d Do Differently Next Time

  1. Start with a non‑streaming prototype. Get the logic right, then optimize. I over‑engineered from day one.
  2. Use an event‑driven architecture. Instead of serving SSE directly from the code that calls the AI API, I’d publish tokens to a Redis pub/sub channel and have a separate worker write them to the SSE connection. That lets the AI callers and the connection handlers scale independently.
  3. Test with real network conditions. I only tested on localhost, which masked latency and dropped-connection issues. Use tools like tc to simulate latency and packet loss.
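
To make the decoupling in point 2 concrete, here is a small in-process stand-in for the pub/sub layer. It is only a sketch: `TokenBroker`, the channel name, and the use of `None` as an end-of-stream marker are assumptions for illustration, and a real deployment would put Redis (or another broker) where this class sits. As with real pub/sub, tokens published before a subscriber is listening are lost.

```python
import asyncio
from collections import defaultdict
from typing import AsyncIterator, DefaultDict, Optional, Set


class TokenBroker:
    """In-process stand-in for a pub/sub channel such as Redis (hypothetical)."""

    def __init__(self) -> None:
        self._subscribers: DefaultDict[str, Set[asyncio.Queue]] = defaultdict(set)

    async def publish(self, channel: str, token: Optional[str]) -> None:
        # None marks the end of the stream for this channel.
        for queue in list(self._subscribers[channel]):
            await queue.put(token)

    async def subscribe(self, channel: str) -> AsyncIterator[str]:
        # Registration happens when iteration starts, not when this is called.
        queue: asyncio.Queue = asyncio.Queue(maxsize=64)
        self._subscribers[channel].add(queue)
        try:
            while True:
                token = await queue.get()
                if token is None:
                    break
                yield token
        finally:
            self._subscribers[channel].discard(queue)
```

The worker that calls the AI API would only ever call `publish`, and the SSE handler would only iterate `subscribe`, so neither side needs to know how the other is deployed.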

Streaming AI responses is satisfying when it works, but it’s a rabbit hole of concurrency and reliability. I’m still tweaking my implementation. And you? What’s your approach to handling partial AI responses in production?
