DEV Community

zhongqiyue


How I Fixed My AI Chatbot's Timeout Nightmare

I spent three weeks debugging an AI chatbot that kept timing out. It wasn't the API itself—it was how I was calling it. Here's what I learned.

The Problem

Last quarter, I was building a customer support chatbot for a SaaS product. The idea was simple: users ask questions, an AI model returns natural language answers. We picked an AI API that seemed solid—decent latency, good accuracy. But in production, everything fell apart.

Users would type a question, wait... and wait... then get a 504 Gateway Timeout. Our logs showed that about 15% of requests were failing because the API response took longer than our 30-second timeout. Even when it worked, the answer arrived in one big chunk after 10-20 seconds. Users started leaving the chat mid-response.

This wasn't a theoretical problem. It was happening to real people, and my boss was not happy.

What I Tried That Didn't Work

My first instinct was to crank up the timeout. I set it to 60 seconds. That just meant failures took longer. Users hated it more.

Next, I tried synchronous retries with exponential backoff. That made things worse: if the first attempt timed out, the retry also often timed out, and the whole request could take minutes. Plus, our server couldn't handle the backlog of pending requests—it started queueing, and memory usage spiked.
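To see why blocking retries stretch into minutes, it helps to add up the worst case. The sketch below is only an illustration: the attempt count and the 1-second initial backoff are hypothetical values, not the settings from this project.

```python
def worst_case_wait(timeout_s: float, attempts: int, base_delay_s: float) -> float:
    """Upper bound on the user-visible wait when every attempt times out."""
    # Each attempt can burn the full timeout before it fails.
    time_in_attempts = attempts * timeout_s
    # One backoff sleep sits between consecutive attempts: base, 2*base, 4*base, ...
    time_in_backoff = sum(base_delay_s * (2 ** i) for i in range(attempts - 1))
    return time_in_attempts + time_in_backoff


# Hypothetical settings: 60 s timeout, 3 attempts, 1 s initial backoff.
print(worst_case_wait(60, 3, 1))  # 183
```

Almost all of that time is spent inside the timeouts themselves, so tuning the backoff barely changes the total.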

I considered switching to a different model, but our product was already tied to this API's unique fine-tuning. We were stuck.

I even tried polling: send the request, get a task ID, poll every second for the result. But the API didn't support async tasks—it required a single open connection.

At this point, I was ready to roll back to a simple FAQ lookup. Then I remembered a colleague mentioning "streaming" at a meetup. I hadn't paid attention, but now it sounded like a lifeline.

What Eventually Worked: Streaming + Smart Retry

The breakthrough came when I realized the API supported streaming responses—the model could send back partial tokens as it generated them. Instead of waiting for the full answer, I could start displaying text to the user immediately. This solved two problems:

  • Perceived latency dropped to near zero. The first token arrived within 200ms, even if the full response took 15 seconds.
  • Timeouts became manageable. If the stream stopped unexpectedly, I could reconnect without losing the partial response.

But streaming alone wasn't enough. The connection would sometimes drop mid-stream. I needed a robust retry mechanism that could resume from the last received token.

Here's the approach I settled on:

  1. Open a streaming connection using aiohttp in Python.
  2. Read chunks as they arrive and feed them to the user in real time.
  3. Track the last token index (or character count).
  4. If the connection drops, wait before retrying (100ms initially, doubling each time, capped at 5s).
  5. Reconnect and send a 'resume' parameter (if the API supports it) to continue from where we left off.
  6. Cap total attempts at 3.

Not all APIs support resumption, so check the docs before relying on it. If yours doesn't, you can just restart the request and replace the partial text. The user already saw some output, so the experience is still better than a timeout.
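The retry schedule from the steps above can be sketched as a small helper. The function name and defaults are illustrative assumptions; only the 100ms base and 5s cap come from the list.

```python
from typing import List


def backoff_delays(attempts: int, base: float = 0.1, cap: float = 5.0) -> List[float]:
    """Sleep durations between attempts: base, 2*base, 4*base, ... capped at `cap`."""
    # There is one sleep fewer than there are attempts.
    return [min(base * (2 ** i), cap) for i in range(attempts - 1)]


print(backoff_delays(3))  # [0.1, 0.2]
print(backoff_delays(8))  # [0.1, 0.2, 0.4, 0.8, 1.6, 3.2, 5.0]
```

With only three attempts the 5s cap is never reached. It only matters if the attempt limit is raised later.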

Code Walkthrough

Here's a simplified version of what I wrote. It's async Python using aiohttp and asyncio.

import asyncio
import json
from typing import AsyncIterator

import aiohttp


class AIStreamClient:
    def __init__(self, api_url: str, api_key: str):
        self.api_url = api_url
        self.api_key = api_key
        # Create the client inside a running event loop (e.g. within main()).
        self.session = aiohttp.ClientSession()

    async def stream_completion(self, prompt: str) -> AsyncIterator[str]:
        """Stream tokens from the AI API with retry logic."""
        max_retries = 3
        base_delay = 0.1  # 100ms
        last_position = 0

        for attempt in range(max_retries):
            try:
                headers = {
                    "Authorization": f"Bearer {self.api_key}",
                    "Accept": "text/event-stream",
                }
                payload = {
                    "prompt": prompt,
                    "stream": True,
                    "resume_from": last_position  # if supported
                }

                async with self.session.post(
                    self.api_url,
                    json=payload,
                    headers=headers,
                    # total=30 caps the whole stream; sock_read would cap idle gaps instead.
                    timeout=aiohttp.ClientTimeout(total=30)
                ) as response:
                    # Turn HTTP error statuses into exceptions so they hit the retry path.
                    response.raise_for_status()
                    # response.content yields the body one line at a time.
                    async for chunk in response.content:
                        text = chunk.decode("utf-8").strip()
                        if not text:
                            continue  # skip blank separator lines
                        # Assume each line is a JSON object with "token" and "position"
                        # In reality, you'd parse SSE format
                        data = json.loads(text)
                        token = data.get("token", "")
                        position = data.get("position", last_position)
                        # Only emit tokens we have not already shown.
                        if position > last_position:
                            yield token
                            last_position = position
                # Stream finished cleanly: stop instead of re-sending the request.
                return

            except (aiohttp.ClientError, asyncio.TimeoutError) as e:
                print(f"Stream error on attempt {attempt+1}: {e}")
                if attempt == max_retries - 1:
                    raise
                delay = base_delay * (2 ** attempt)
                await asyncio.sleep(min(delay, 5))

    async def close(self):
        await self.session.close()

How to use it:

async def main():
    client = AIStreamClient(
        api_url="https://ai.interwestinfo.com/v1/completions",  # example API
        api_key="sk-..."
    )
    try:
        async for token in client.stream_completion("Explain quantum computing"):
            print(token, end="", flush=True)
    finally:
        # Close the session even if the stream raises.
        await client.close()

asyncio.run(main())

This is a proof-of-concept. In production, you'd handle partial tokens more carefully, parse SSE properly, and cancel or throttle the stream if the user sends new input while it is still running.
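For the SSE part, a minimal parser might look like the sketch below. It assumes the API sends standard `data:` lines with events separated by blank lines, and it reuses the hypothetical `token`/`position` JSON shape from the walkthrough. The function name is made up for illustration.

```python
import json
from typing import Iterable, Iterator, List


def iter_sse_data(lines: Iterable[bytes]) -> Iterator[str]:
    """Yield the data payload of each complete SSE event from raw lines."""
    data_parts: List[str] = []
    for raw in lines:
        line = raw.decode("utf-8").rstrip("\r\n")
        if line == "":
            # A blank line terminates the current event.
            if data_parts:
                yield "\n".join(data_parts)
                data_parts = []
        elif line.startswith(":"):
            continue  # comment or keep-alive line
        elif line.startswith("data:"):
            value = line[len("data:"):]
            # A single leading space after the colon is not part of the payload.
            data_parts.append(value[1:] if value.startswith(" ") else value)
        # Other fields (event:, id:, retry:) are ignored in this sketch.


sample = [
    b": keep-alive\n",
    b'data: {"token": "Hel", "position": 1}\n',
    b"\n",
    b'data: {"token": "lo", "position": 2}\n',
    b"\n",
]
tokens = [json.loads(payload)["token"] for payload in iter_sse_data(sample)]
print("".join(tokens))  # Hello
```

An event that is still incomplete when the connection drops is discarded rather than yielded, which matches how SSE clients are expected to behave. With aiohttp the same loop would run under `async for` over `response.content`.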

Trade-offs & Limitations

Streaming isn't a silver bullet. Here's what I discovered:

  • API dependency: Not every AI API supports streaming or resumption. Check the docs first. If yours doesn't, you're stuck with polling or falling back to non-streaming.
  • Complexity: The retry logic, state tracking, and chunk parsing add significant code. For simple use cases, a straightforward timeout bump might be enough.
  • Resource usage: Every open stream holds a connection and its buffers for the full length of the response, so many concurrent streams can hit memory limits. Reuse one pooled session and cap how many streams run at once.
  • User experience: If you display tokens as they arrive, you need to handle backspace, edits, or interruptions gracefully. Also, streaming can make it obvious the model is "thinking"—some users find that cool, others annoying.
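On the resource-usage point, one way to cap concurrent streams is a semaphore around each stream. This is a sketch under assumptions: `limited_stream`, `fake_stream`, and the limit of 2 are illustrative, and `fake_stream` stands in for a real API call.

```python
import asyncio
from typing import AsyncIterator, Callable, List

MAX_CONCURRENT_STREAMS = 2  # hypothetical limit; tune for your service


async def limited_stream(
    sem: asyncio.Semaphore,
    open_stream: Callable[[], AsyncIterator[str]],
) -> AsyncIterator[str]:
    """Relay a stream while holding a semaphore slot for its whole lifetime."""
    async with sem:
        async for token in open_stream():
            yield token


async def fake_stream() -> AsyncIterator[str]:
    # Stand-in for a real API stream.
    for token in ("a", "b", "c"):
        await asyncio.sleep(0.01)
        yield token


async def demo() -> List[str]:
    sem = asyncio.Semaphore(MAX_CONCURRENT_STREAMS)

    async def consume() -> str:
        return "".join([t async for t in limited_stream(sem, fake_stream)])

    # Five callers, but only two streams hold a slot at any moment.
    return await asyncio.gather(*(consume() for _ in range(5)))


results = asyncio.run(demo())
print(results)  # ['abc', 'abc', 'abc', 'abc', 'abc']
```

The slot is held until the generator finishes or is closed, so a consumer that abandons a stream early should close it explicitly to free the slot.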

When NOT to use streaming:

  • If your API returns complete responses in under 2 seconds consistently.
  • If you're building offline batch processing.
  • If the AI model is on a local GPU with low latency.

What I'd Do Differently

Hindsight is 20/20. If I could start over:

  1. Read the API docs thoroughly before coding. I skimmed the streaming section and assumed it wasn't important.
  2. Add observability early. I had no metrics on response latency or failure rates until users complained. Now I log every chunk arrival time.
  3. Design for failure from day one. Every network call should handle partial failures, not just timeouts.
  4. Use a library instead of reinventing. There are great Python packages like httpx with built-in streaming and connection-level retries. I should have started there.
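For the observability point, chunk arrival times can be recorded by wrapping the stream in another async generator. The wrapper and the stat names below are hypothetical, and `fake_stream` stands in for the real client.

```python
import asyncio
import time
from typing import AsyncIterator, Dict, List


async def timed_stream(
    source: AsyncIterator[str], stats: Dict[str, object]
) -> AsyncIterator[str]:
    """Pass chunks through unchanged while recording arrival timings."""
    start = time.monotonic()
    previous = start
    gaps: List[float] = []
    async for chunk in source:
        now = time.monotonic()
        if "first_chunk_s" not in stats:
            stats["first_chunk_s"] = now - start  # time to first token
        gaps.append(now - previous)
        previous = now
        yield chunk
    # Summary values are written only when the stream runs to completion.
    stats["chunks"] = len(gaps)
    stats["max_gap_s"] = max(gaps, default=0.0)
    stats["total_s"] = time.monotonic() - start


async def fake_stream() -> AsyncIterator[str]:
    # Stand-in for a real API stream.
    for token in ("Hel", "lo"):
        await asyncio.sleep(0.01)
        yield token


async def demo() -> Dict[str, object]:
    stats: Dict[str, object] = {}
    stats["text"] = "".join([c async for c in timed_stream(fake_stream(), stats)])
    return stats


stats = asyncio.run(demo())
print(stats["text"], stats["chunks"])  # Hello 2
```

The gaps include time the consumer spends handling each chunk, so a slow UI shows up in these numbers as well as a slow API.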

The Result

After deploying the streaming version, timeout errors dropped from 15% to less than 0.5%. User satisfaction scores went up, and I stopped getting paged at 2 AM. The code is now used across three microservices.

But I'm still paranoid. Every AI API is different, and production has a way of surprising you.

What does your setup look like? How do you handle unreliable AI responses? I'd love to hear what's worked (or failed) for you.
