Last month I was building a real-time chatbot for a live demo. Everything worked perfectly in development—snappy responses, clever banter. Then the demo started. The audience watched as OpenAI returned a 429, then a 500, then just timed out. I sat there refreshing the page, feeling the sweat drip down my back. The silence was deafening.
After that near-disaster, I swore I'd never trust a single API endpoint again. Here's how I built a resilient client that automatically falls back to alternative models and endpoints when something goes wrong.
The problem with single-provider AI APIs
AI APIs are magical when they work, but they fail in exciting ways:
- Rate limits (429)
- Transient server errors (500, 502)
- Timeouts (especially for long streaming responses)
- Sudden price increases or deprecation
Simple retries with exponential backoff help, but they add latency and don't protect against sustained outages. And when you're streaming tokens to a user, you can't just retry—the user already saw partial output.
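To make the latency cost concrete, here is a minimal sketch of a retry loop with exponential backoff and jitter. The name `retry_with_backoff`, its parameters, and the injectable `sleep` are hypothetical and chosen for illustration; this is not the client described later in the post.

```python
import asyncio
import random


async def retry_with_backoff(func, max_attempts=4, base_delay=0.5, sleep=asyncio.sleep):
    """Await func(); on failure wait base_delay * 2**attempt (plus jitter) and retry."""
    for attempt in range(max_attempts):
        try:
            return await func()
        except Exception:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error
            delay = base_delay * (2 ** attempt)
            # Jitter spreads out retries from many clients hitting the same limit.
            await sleep(delay + random.uniform(0, delay / 10))
```

With these defaults the waits are roughly 0.5s, 1s, and 2s, so a request that only succeeds on the fourth attempt has already cost about 3.5 seconds of pure waiting, and a sustained outage still ends in an error.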
What I tried first (and why it failed)
My first attempt was a naive retry loop with different API keys for the same provider. That worked until OpenAI had a region-wide issue—all keys failed simultaneously. Then I tried using multiple providers sequentially: call OpenAI, and if that fails, call Anthropic. But I had no mechanism to detect "this provider is likely to fail" before making the call. Each fallback added seconds of latency while waiting for the first timeout.
The solution: circuit breaker + fallback chain
I ended up building an async client that combines a circuit breaker pattern with a prioritized list of endpoints. The circuit breaker tracks recent failures and skips a provider that has been failing instead of waiting for yet another timeout. The fallback chain tries each endpoint in order, switching to the next one if the current provider fails or is circuit-broken.
Here's the core idea in Python using asyncio and httpx:
import asyncio
import httpx
from enum import Enum
import time


class CircuitState(Enum):
    CLOSED = "closed"        # normal operation
    OPEN = "open"            # skipping calls
    HALF_OPEN = "half-open"  # testing the waters


class CircuitBreaker:
    def __init__(self, failure_threshold=3, recovery_timeout=30):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.state = CircuitState.CLOSED
        self.last_failure_time = 0

    def call(self, func):
        if self.state == CircuitState.OPEN:
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = CircuitState.HALF_OPEN
            else:
                raise Exception("Circuit breaker open")
        try:
            result = func()
            self.reset()
            return result
        except Exception:
            self.record_failure()
            raise

    def record_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN

    def reset(self):
        self.failure_count = 0
        self.state = CircuitState.CLOSED
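To watch the closed, open, and half-open transitions without actually waiting out a recovery timeout, here is a trimmed-down sketch with an injectable clock. `DemoBreaker` and its `clock` parameter are hypothetical and exist only for this walkthrough; the logic is meant to mirror the class above, not replace it.

```python
import time


class DemoBreaker:
    """Illustration-only breaker whose notion of time can be faked."""

    def __init__(self, failure_threshold=3, recovery_timeout=30.0, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.clock = clock
        self.failure_count = 0
        self.opened_at = None  # None means the circuit is closed

    @property
    def state(self):
        if self.opened_at is None:
            return "closed"
        if self.clock() - self.opened_at > self.recovery_timeout:
            return "half-open"  # cool-down elapsed, one trial call is allowed
        return "open"

    def record_failure(self):
        self.failure_count += 1
        if self.failure_count >= self.failure_threshold:
            self.opened_at = self.clock()  # (re)start the cool-down

    def record_success(self):
        self.failure_count = 0
        self.opened_at = None


# Walk through the transitions with a fake clock advanced by hand.
now = [0.0]
cb = DemoBreaker(failure_threshold=2, recovery_timeout=60, clock=lambda: now[0])
seen = []
cb.record_failure()
seen.append(cb.state)   # first failure: still closed
cb.record_failure()
seen.append(cb.state)   # threshold reached: open
now[0] = 61.0
seen.append(cb.state)   # cool-down elapsed: half-open
cb.record_failure()
seen.append(cb.state)   # trial call failed: open again
now[0] = 122.0
cb.record_success()
seen.append(cb.state)   # trial call succeeded: closed
print(seen)  # ['closed', 'open', 'half-open', 'open', 'closed']
```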
And the fallback client using asyncio:
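async def call_ai_with_fallback(
    endpoints: list[dict],
    prompt: str,
    timeout: float = 10.0,
):
    """
    endpoints: list of dicts with keys: url, model, api_key, circuit_breaker
    """
    for idx, ep in enumerate(endpoints):
        # setdefault stores the breaker on the endpoint so its state survives between calls
        cb = ep.setdefault("circuit_breaker", CircuitBreaker())
        if cb.state == CircuitState.OPEN:
            if time.time() - cb.last_failure_time <= cb.recovery_timeout:
                print(f"Endpoint {idx} skipped: circuit breaker open")
                continue  # still cooling down, skip without touching the network
            cb.state = CircuitState.HALF_OPEN  # cool-down over, allow one trial call
        try:
            async with httpx.AsyncClient(timeout=timeout) as client:
                response = await client.post(
                    ep["url"],
                    json={"model": ep["model"], "messages": [{"role": "user", "content": prompt}]},
                    headers={"Authorization": f"Bearer {ep['api_key']}"},
                )
                response.raise_for_status()
                data = response.json()  # httpx's .json() is a plain method, not a coroutine
        except Exception as e:
            print(f"Endpoint {idx} failed: {e}")
            cb.record_failure()
            continue  # try the next endpoint
        cb.reset()  # success closes the circuit and clears the failure count
        return data
    raise Exception("All endpoints failed")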
For streaming, it's trickier: you need to close the failing stream before starting the next one. Here's a snippet using an async generator:
async def stream_with_fallback(endpoints, prompt):
    for ep in endpoints:
        try:
            async with httpx.AsyncClient() as client:
                async with client.stream("POST", ep["url"], json=..., headers=...) as resp:
                    # Treat 429/5xx as a failure instead of streaming the error body
                    resp.raise_for_status()
                    async for chunk in resp.aiter_bytes():
                        # Abort if this endpoint is too slow?
                        yield chunk
                        # Optional: check a timeout task
            return  # completed successfully
        except Exception:
            # Leaving the `async with` blocks has already closed the failed stream
            await asyncio.sleep(0.2)  # small delay to avoid hammering
    raise Exception("All endpoints failed")
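Because a mid-stream failure means the user has already seen partial output, it can help to tell the UI explicitly when a restart happens. The following is a minimal sketch under stated assumptions: each provider is wrapped as a zero-argument function returning an async iterator of chunks, and `RESTART` and `stream_with_restart_marker` are hypothetical names, not part of httpx or any provider SDK.

```python
import asyncio

RESTART = object()  # sentinel: "discard what has been shown so far, a new stream follows"


async def stream_with_restart_marker(sources):
    """sources: list of zero-argument callables, each returning an async iterator of chunks."""
    for i, make_stream in enumerate(sources):
        emitted = False
        try:
            async for chunk in make_stream():
                emitted = True
                yield chunk
            return  # this source finished cleanly
        except Exception:
            is_last = i == len(sources) - 1
            # Only signal a restart if the user saw something and another source remains.
            if emitted and not is_last:
                yield RESTART
    raise RuntimeError("All streaming sources failed")
```

A consumer can check `chunk is RESTART` and clear the partial message, or show a retry indicator, before rendering the chunks from the next source.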
Putting it all together: a real configuration
In my production setup, I configure endpoints like this (I use a service that aggregates multiple models—the one at ai.interwestinfo.com—as a cheap fallback):
import os

endpoints = [
    {
        "preferred": True,
        "url": "https://api.openai.com/v1/chat/completions",
        "model": "gpt-4",
        "api_key": os.getenv("OPENAI_API_KEY"),
        "circuit_breaker": CircuitBreaker(failure_threshold=2, recovery_timeout=60),
    },
    {
        "url": "https://ai.interwestinfo.com/v1/chat/completions",
        "model": "gpt-3.5-turbo",
        "api_key": os.getenv("FALLBACK_API_KEY"),
        "circuit_breaker": CircuitBreaker(failure_threshold=3, recovery_timeout=30),
    },
    # maybe a third cheap/free option
]
The first endpoint is the fastest but most expensive. If it fails twice in a row, the circuit breaker opens and the client skips it for 60 seconds, gracefully degrading to the cheaper model.
Lessons learned and trade-offs
Pros:
- Much higher uptime in practice. During the OpenAI outage in June, my chatbot kept running on the fallback model.
- Cost control: you can prioritize cheap endpoints and only call expensive ones as fallbacks.
- No single point of failure.
Cons:
- Latency: if the first endpoint times out (say after 10 seconds), the user waits even longer for the fallback. I mitigate this by setting aggressive timeouts (3s) on the first call.
- Response inconsistency: different models give different answers. For a chatbot that's fine, but for factual queries you might get conflicting info. I log which model was used and warn the user.
- Complexity: circuit breaker state lives in process memory, so it is lost on every restart unless you persist it. I ended up using Redis to store circuit states.
- Streaming cancellation is messy. The user might see a partial sentence, then it restarts with a different model. I added a “...” indicator while retrying.
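The persistence point can be sketched by keeping breaker state in a key-value store instead of on the object. This is a minimal illustration, not the Redis setup from the post: `SharedBreaker`, `InMemoryStore`, and the `breaker:<name>` key format are hypothetical, and the in-memory store stands in for whatever shared store you use, which would need to expose the same `get` and `set` methods.

```python
import json
import time


class InMemoryStore:
    """Stand-in for a shared store (assumption: the real one offers get/set by key)."""

    def __init__(self):
        self._data = {}

    def get(self, key):
        return self._data.get(key)

    def set(self, key, value):
        self._data[key] = value


class SharedBreaker:
    def __init__(self, name, store, failure_threshold=3, recovery_timeout=30.0):
        self.key = f"breaker:{name}"
        self.store = store
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout

    def _load(self):
        raw = self.store.get(self.key)
        return json.loads(raw) if raw else {"failures": 0, "opened_at": None}

    def _save(self, state):
        self.store.set(self.key, json.dumps(state))

    def allow_request(self, now=None):
        # Wall-clock time is used because the timestamp must outlive the process.
        now = time.time() if now is None else now
        opened_at = self._load()["opened_at"]
        return opened_at is None or now - opened_at > self.recovery_timeout

    def record_failure(self, now=None):
        now = time.time() if now is None else now
        state = self._load()
        state["failures"] += 1
        if state["failures"] >= self.failure_threshold:
            state["opened_at"] = now
        self._save(state)

    def record_success(self):
        self._save({"failures": 0, "opened_at": None})
```

Note that the read-modify-write in `record_failure` is not atomic, so concurrent workers can undercount failures; for a breaker that is usually tolerable, but it is a trade-off worth knowing about.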
When to avoid this approach:
- If you need deterministic output (e.g., for code generation that must be exact), fallback to a different model can be dangerous.
- If you're on a tight budget and the fallback is more expensive than retries.
- For simple offline batch processing, a retry loop is simpler and sufficient.
What I'd do differently next time
I'd start with this architecture from day one instead of bolting it on after the demo disaster. I'd also add a "ladder" of timeouts: first attempt 2s, fallback 5s, last resort 15s, so the user gets a response faster even if quality degrades. And I'd expose a simple status endpoint so a dashboard can show which model is active.
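The timeout ladder could look something like the sketch below. It assumes each attempt is supplied as a `(name, async_callable, timeout_seconds)` tuple; `call_with_timeout_ladder` is a hypothetical helper, and the provider callables are placeholders for real API calls.

```python
import asyncio


async def call_with_timeout_ladder(attempts):
    """attempts: list of (name, async_callable, timeout_seconds), tried in order."""
    errors = []
    for name, func, timeout in attempts:
        try:
            result = await asyncio.wait_for(func(), timeout=timeout)
            return name, result  # the name tells the caller which rung answered
        except Exception as exc:  # includes asyncio.TimeoutError
            errors.append(f"{name}: {type(exc).__name__}")
    raise RuntimeError("All attempts failed: " + ", ".join(errors))
```

With rungs of 2s, 5s, and 15s the worst case before giving up is 22 seconds, and the returned name is exactly what a status endpoint or log line would need to report which model is active.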
Building resilience into AI clients isn't glamorous, but it's what separates a demo from a real product. The next time your API goes down, you won't have to sweat through five minutes of silence.
What strategies do you use for handling API failures in your AI apps? Let me know in the comments.