When you chain language model calls into a pipeline, things break in unexpected ways. A step times out, the API returns a 429, or the model outputs malformed JSON that crashes your parser two steps later. Without deliberate retry logic, one bad response kills the whole run — and re-running from scratch wastes time and money. This article walks through building a multi-step LLM pipeline in Python with per-step retry, exponential backoff, and clean error propagation.
What a Multi-Step AI Pipeline Looks Like
A pipeline here means: take raw input → process through N language model calls, each depending on the previous → produce a final structured result. A realistic example:
- Extract entities from a user document
- Classify each entity by type and risk level
- Summarize the findings into a final report
Each step has its own prompt, its own expected output format, and its own failure modes. Step 2 failing should not force you to re-run step 1. You need per-step isolation, not a single try/except wrapped around the whole chain.
The Naive Approach and Why It Fails
Most tutorials wrap every call in a try/except and call it done:
def call_llm(prompt: str) -> str:
try:
response = client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": prompt}]
)
return response.choices[0].message.content
except Exception as e:
raise RuntimeError(f"LLM call failed: {e}")
This silently swallows all failures in a single bucket. A transient 503 and a hard authentication error look identical. You cannot retry one without retrying the other, and you cannot tell which step caused the failure downstream.
The real failure mode: a 429 at step 3 after 45 seconds of processing means you either crash the whole run or retry from step 1. Neither is acceptable in production.
Per-Step Retry with Exponential Backoff
The right pattern gives each step its own retry budget. Here is a decorator that does it:
import time
import random
from functools import wraps
from typing import Callable, TypeVar
T = TypeVar("T")
def with_retry(
max_attempts: int = 3,
base_delay: float = 1.0,
max_delay: float = 30.0,
retryable_exceptions: tuple = (TimeoutError, ConnectionError),
):
def decorator(fn: Callable[..., T]) -> Callable[..., T]:
@wraps(fn)
def wrapper(*args, **kwargs) -> T:
for attempt in range(max_attempts):
try:
return fn(*args, **kwargs)
except retryable_exceptions as e:
if attempt + 1 >= max_attempts:
raise
delay = min(
base_delay * (2 ** (attempt + 1)) + random.uniform(0, 1),
max_delay,
)
print(f"[retry] {fn.__name__} attempt {attempt+1}/{max_attempts} in {delay:.1f}s: {e}")
time.sleep(delay)
return wrapper
return decorator
The jitter (random.uniform(0, 1)) prevents thundering herd when multiple pipeline instances retry simultaneously after an outage. Without it, all instances wake up at the same moment and hit the API together.
Usage:
@with_retry(max_attempts=3, retryable_exceptions=(TimeoutError, ConnectionError))
def extract_entities(text: str) -> list[str]:
# your LLM call here, returns a parsed list
...
Tracking Pipeline State
Beyond per-step retry, you need to know where a pipeline failed and what each step produced. A simple state object makes this explicit:
from dataclasses import dataclass, field
from typing import Optional, Any
@dataclass
class StepResult:
name: str
success: bool
output: Optional[Any] = None
error: Optional[str] = None
attempts: int = 0
@dataclass
class PipelineState:
input: str
steps: list[StepResult] = field(default_factory=list)
def last_output(self) -> Any:
for step in reversed(self.steps):
if step.success:
return step.output
return self.input
def first_failure(self) -> Optional[StepResult]:
return next((s for s in self.steps if not s.success), None)
Then a run_step helper that wraps retry logic and records the outcome:
import httpx
RETRYABLE = (httpx.TimeoutException, httpx.ConnectError, httpx.RemoteProtocolError)
def run_step(state: PipelineState, name: str, fn, *args) -> PipelineState:
for attempt in range(3):
try:
result = fn(*args)
state.steps.append(StepResult(name, success=True, output=result, attempts=attempt + 1))
return state
except RETRYABLE as e:
if attempt == 2:
state.steps.append(StepResult(name, success=False, error=str(e), attempts=3))
return state
time.sleep(min(1.0 * (2 ** (attempt + 1)), 30))
except Exception as e:
# Non-retryable: parse failures, auth errors, validation issues
state.steps.append(StepResult(name, success=False, error=str(e), attempts=attempt + 1))
return state
return state
Validation errors like json.JSONDecodeError fall through to the non-retryable branch. Retrying a step that produced malformed output with the same prompt rarely fixes anything — you need a different prompt or a fallback parser.
Wiring the Full Pipeline
def run_pipeline(raw_input: str) -> str:
state = PipelineState(input=raw_input)
state = run_step(state, "extract", extract_entities, raw_input)
if state.first_failure():
raise RuntimeError(f"Pipeline aborted at 'extract': {state.first_failure().error}")
state = run_step(state, "classify", classify_entities, state.last_output())
if state.first_failure():
raise RuntimeError(f"Pipeline aborted at 'classify': {state.first_failure().error}")
state = run_step(state, "summarize", summarize, state.last_output())
if state.first_failure():
raise RuntimeError(f"Pipeline aborted at 'summarize': {state.first_failure().error}")
return state.last_output()
Each step failure short-circuits with a precise message: which step failed, what error was raised, and how many attempts were made before giving up.
Handling Rate Limits Correctly
LLM APIs return HTTP 429 with a Retry-After header when you hit quota. Ignoring this header and using fixed backoff can get your account throttled further. Read it:
import httpx
def call_with_rate_limit_awareness(prompt: str, client) -> str:
for attempt in range(4):
try:
return client.complete(prompt)
except httpx.HTTPStatusError as e:
status = e.response.status_code
if status == 429:
wait = int(e.response.headers.get("retry-after", 5))
print(f"Rate limited. Respecting retry-after: {wait}s")
time.sleep(wait)
elif status >= 500:
time.sleep(2 ** attempt)
else:
raise # 4xx that is not 429: request is broken, do not retry
raise RuntimeError("Exceeded retry budget")
The key distinction: 4xx errors (except 429) mean your request is malformed or unauthorized. Retrying will not fix them. 5xx and 429 are transient — retry with respect for server-side signals.
The Takeaway
Multi-step LLM pipelines fail in predictable, manageable ways. The pattern here — per-step retry with jittered backoff, explicit state tracking, and differentiated error handling — covers the common failure modes without overengineering.
What matters most:
- Retry transient errors only — do not retry parse failures or auth errors
- Track state per step — know exactly where and why a pipeline failed
-
Honor
Retry-Afterheaders — ignoring them makes throttling worse
In production, add structured logging per step and consider persisting PipelineState to a database for runs longer than a few seconds. This lets you resume mid-run after a crash instead of starting over from step 1.
If you are hardening the security layer around your LLM stack, our free security hardening checklists cover API key rotation, rate limit handling, and output validation patterns for production deployments.
I run AYI NEDJIMI Consultants, a cybersecurity consulting firm. We publish free security hardening checklists — PDF and Excel.
Top comments (0)