It is 2am. Anthropic is having an incident. Your agent is failing every call with 503s. You have OpenAI credentials, so you could fail over manually, but the incident lasts 40 minutes and you are asleep.
Or a subtler problem: you are rate limited on Anthropic. A burst of requests exhausted your RPM quota. OpenAI has spare capacity. You want requests to automatically route to the next available provider.
llm-fallback-chain is an ordered provider failover chain: try provider 1, on failure try provider 2, then provider 3.
The Shape of the Fix
```python
from llm_fallback_chain import FallbackChain, Provider

# anthropic_client, openai_client, and google_client are SDK clients created elsewhere.
# The chain passes the same keyword arguments (plus model=...) to every fn.
chain = FallbackChain([
    Provider("anthropic", fn=anthropic_client.messages.create, model="claude-sonnet-4-6"),
    Provider("openai", fn=openai_client.chat.completions.create, model="gpt-4o"),
    Provider("google", fn=google_client.generate_content, model="gemini-2.0-flash"),
])

result, trace = chain.call(
    messages=[{"role": "user", "content": "Summarize this document..."}],
    max_tokens=1024,
)

print(f"Responded from: {trace.successful_provider}")
for attempt in trace.attempts:
    print(f" {attempt.provider}: {attempt.error or 'success'} ({attempt.duration_ms:.0f}ms)")
```
If Anthropic returns a 503, the chain tries OpenAI. If OpenAI is also down, it tries Google. The trace records every attempt for logging.
What It Does NOT Do
llm-fallback-chain does not normalize request or response formats across providers. Anthropic's messages API, OpenAI's chat.completions API, and Google's generate_content API have different schemas. You are responsible for adapting the request format and response parsing for each provider.
It does not handle partial failures. If Anthropic returns a partial response (streaming cut off mid-way), the chain does not detect this as a failure. You need to detect partial responses in your own code and handle them before falling back.
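Since the chain moves on whenever a provider's `fn` raises, one option is to do the detection inside the callable you hand to `Provider`. The sketch below is an assumption about how you might do that, not library code: `require_complete`, `PartialResponseError`, and `has_stop_reason` are hypothetical names. The completeness check shown relies on Anthropic responses carrying a `stop_reason` field; for OpenAI the analogous field is `finish_reason` on each choice. For a streamed response, the wrapped function would need to consume the stream itself before returning.

```python
from types import SimpleNamespace
from typing import Any, Callable


class PartialResponseError(Exception):
    """Raised when a provider returns a response that looks cut off."""


def require_complete(
    fn: Callable[..., Any], is_complete: Callable[[Any], bool]
) -> Callable[..., Any]:
    """Wrap a provider call so incomplete results raise instead of returning."""

    def wrapped(**kwargs: Any) -> Any:
        result = fn(**kwargs)
        if not is_complete(result):
            raise PartialResponseError("response ended without a completion marker")
        return result

    return wrapped


def has_stop_reason(result: Any) -> bool:
    """Anthropic-style check: a finished message reports why it stopped."""
    return getattr(result, "stop_reason", None) is not None


# Stand-in provider calls so the sketch runs without an SDK.
def finished_call(**kwargs: Any) -> Any:
    return SimpleNamespace(stop_reason="end_turn")


def truncated_call(**kwargs: Any) -> Any:
    return SimpleNamespace(stop_reason=None)


safe_call = require_complete(finished_call, has_stop_reason)
```

Passing `fn=require_complete(anthropic_client.messages.create, has_stop_reason)` to a `Provider` would then turn a truncated reply into an exception the chain can fall back on.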
It does not maintain state across failover. The provider APIs are stateless, so nothing is lost on Anthropic's side if it handles the first message of a multi-turn conversation and then fails. The flip side is that OpenAI sees only the messages you pass, so send the full conversation history on every call.
Inside the Library
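The chain iterates providers in order and moves to the next one when a call raises a retryable exception. By default every exception counts as retryable; pass retryable_exceptions to narrow that: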
```python
import time
from typing import Any

# Status codes that usually indicate a transient provider-side problem.
# Not referenced in this excerpt.
RETRYABLE_STATUS_CODES = {429, 500, 502, 503, 504}


class FallbackChain:
    def __init__(self, providers: list[Provider], retryable_exceptions: tuple | None = None):
        self._providers = providers
        # With no override, any exception moves the chain to the next provider.
        self._retryable = retryable_exceptions or (Exception,)

    def call(self, **kwargs) -> tuple[Any, CallTrace]:
        trace = CallTrace(attempts=[])
        last_error = None
        for provider in self._providers:
            start = time.monotonic()
            try:
                result = provider.fn(model=provider.model, **kwargs)
                duration_ms = (time.monotonic() - start) * 1000
                trace.attempts.append(Attempt(
                    provider=provider.name,
                    duration_ms=duration_ms,
                    error=None,
                ))
                trace.successful_provider = provider.name
                return result, trace
            except Exception as e:
                duration_ms = (time.monotonic() - start) * 1000
                trace.attempts.append(Attempt(
                    provider=provider.name,
                    duration_ms=duration_ms,
                    error=str(e),
                ))
                last_error = e
                # Check if this error is retryable
                if not isinstance(e, self._retryable):
                    raise  # Non-retryable: don't try next provider
        raise AllProvidersExhausted(
            attempts=trace.attempts,
            last_error=last_error,
        )

    async def async_call(self, **kwargs) -> tuple[Any, CallTrace]:
        trace = CallTrace(attempts=[])
        last_error = None
        for provider in self._providers:
            start = time.monotonic()
            try:
                result = await provider.fn(model=provider.model, **kwargs)
                trace.attempts.append(Attempt(provider=provider.name, duration_ms=(time.monotonic() - start) * 1000, error=None))
                trace.successful_provider = provider.name
                return result, trace
            except Exception as e:
                trace.attempts.append(Attempt(provider=provider.name, duration_ms=(time.monotonic() - start) * 1000, error=str(e)))
                last_error = e
                # Same rule as call(): stop on non-retryable errors
                if not isinstance(e, self._retryable):
                    raise
        raise AllProvidersExhausted(attempts=trace.attempts, last_error=last_error)
```
When to Use It
Use it for any production agent where availability matters. A single-provider agent is down whenever its provider is down. A two-provider fallback chain stays up unless both providers are down at the same time, which is far less likely as long as their outages are largely independent.
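To put a rough number on that, here is a back-of-the-envelope calculation. It assumes provider outages are statistically independent, which is optimistic (shared cloud regions or DNS problems can take several providers down together), and the 99.9% figures are made-up inputs, not measured availability for any vendor.

```python
import math


def combined_availability(availabilities: list[float]) -> float:
    """Probability that at least one provider is up, assuming independent outages."""
    all_down = math.prod(1.0 - a for a in availabilities)
    return 1.0 - all_down


# Two hypothetical providers, each available 99.9% of the time.
two_provider = combined_availability([0.999, 0.999])
```

Under these assumptions the chain is unavailable roughly one millionth of the time, compared with one thousandth for either provider alone.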
Use it for rate limit failover. Set the first provider to your primary (Anthropic), the second to your secondary (OpenAI). When Anthropic's rate limit is hit, requests automatically continue on OpenAI.
Use it with llm-circuit-breaker-py, one breaker per provider. When a provider accumulates too many failures, its circuit breaker opens. The chain then skips that provider and goes straight to the next one instead of waiting for a timeout.
Skip it for cost-sensitive paths where you need predictable billing. Failover means a more expensive provider might handle requests when the primary is rate limited. If you need cost predictability, add explicit cost controls before falling back.
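One simple form of cost control is a cap on how often the fallback provider may be used. The sketch below is an assumption about how you might build that yourself, not a feature of the library: `with_call_budget` and `FallbackBudgetExceeded` are hypothetical names. It counts calls rather than dollars and is not thread-safe.

```python
from typing import Any, Callable


class FallbackBudgetExceeded(Exception):
    """Raised when the fallback provider has used up its call allowance."""


def with_call_budget(fn: Callable[..., Any], max_calls: int) -> Callable[..., Any]:
    """Allow at most `max_calls` invocations of `fn`, then refuse."""
    calls_made = 0

    def guarded(**kwargs: Any) -> Any:
        nonlocal calls_made
        if calls_made >= max_calls:
            raise FallbackBudgetExceeded(f"fallback budget of {max_calls} calls spent")
        calls_made += 1
        return fn(**kwargs)

    return guarded


# Stand-in for an expensive provider call.
def expensive_call(**kwargs: Any) -> str:
    return "ok"


guarded_call = with_call_budget(expensive_call, max_calls=2)
```

Wrapping only the secondary provider's `fn` this way leaves the primary path untouched and makes the worst-case fallback spend bounded.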
Install
```bash
pip install git+https://github.com/MukundaKatta/llm-fallback-chain
# Or from PyPI
pip install llm-fallback-chain
```
A fuller example, combining the chain with per-provider circuit breakers:

```python
from llm_fallback_chain import FallbackChain, Provider
from llm_circuit_breaker_py import CircuitBreaker

# Per-provider circuit breakers
anthropic_breaker = CircuitBreaker(failure_threshold=5, recovery_timeout=60)
openai_breaker = CircuitBreaker(failure_threshold=5, recovery_timeout=60)

# async_call awaits each fn, so this assumes async SDK clients and a
# breaker whose call() hands back the awaitable.
chain = FallbackChain([
    Provider(
        "anthropic",
        fn=lambda **kw: anthropic_breaker.call(anthropic_client.messages.create, **kw),
        model="claude-sonnet-4-6",
    ),
    Provider(
        "openai",
        fn=lambda **kw: openai_breaker.call(openai_client.chat.completions.create, **kw),
        model="gpt-4o",
    ),
])


async def resilient_call(messages: list[dict]) -> str:
    result, trace = await chain.async_call(messages=messages, max_tokens=1024)
    if trace.successful_provider != "anthropic":
        # logger is assumed to accept key-value pairs (structlog style).
        logger.info("provider_failover", provider=trace.successful_provider)
    # extract_text_from_result is your own per-provider response parser.
    return extract_text_from_result(result, provider=trace.successful_provider)
```
Sibling Libraries
| Library | What it solves |
|---|---|
| llm-circuit-breaker-py | Per-provider circuit breaker that opens on repeated failures |
| llm-retry | Exponential backoff retry before falling back |
| llm-rate-limit-bucket | Rate limiting to avoid triggering failover |
| agentsnap | Track which provider handled each call and at what cost |
| llm-pretty-error | Normalize errors across providers for consistent handling |
The reliability stack: llm-rate-limit-bucket to prevent 429s, llm-retry for transient errors, llm-circuit-breaker-py per provider, llm-fallback-chain for cross-provider failover.
What's Next
Priority-based routing: add weight/priority to each provider. Route 80% to Anthropic, 20% to OpenAI for load distribution without requiring failure. Fall back to 100% if one fails.
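Until that lands, a similar effect can be approximated outside the library by choosing the provider order per request. The sketch below is one possible approach, not the planned design: `order_by_weight` is a hypothetical helper that picks a primary with `random.choices` and keeps the remaining providers as fallbacks in their original order.

```python
import random


def order_by_weight(names: list[str], weights: list[float], rng: random.Random) -> list[str]:
    """Pick a primary by weight, then keep the rest as fallbacks in original order."""
    primary = rng.choices(names, weights=weights, k=1)[0]
    return [primary] + [n for n in names if n != primary]


rng = random.Random(0)  # seeded only to make the sketch reproducible
# Roughly 80% of calls would try Anthropic first, 20% OpenAI first.
order = order_by_weight(["anthropic", "openai"], [0.8, 0.2], rng)
```

The returned order could then be used to build the `Provider` list for that request, so every request still has a full fallback path if its chosen primary fails.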
Health checking: periodic background pings to each provider (1-token completion check) that update provider health status. Skip unhealthy providers before the first failure, rather than discovering they are down during a real request.
Cost-aware routing: route to the cheapest available provider for simple tasks. If a task is estimated to cost under $0.001, use the cheapest model. Route to the higher-quality model only for complex tasks.
Built as part of the agent-stack family: composable Python primitives for production LLM agents.