DEV Community

Ayi NEDJIMI

Building a Multi-Step AI Pipeline with Automatic Retry Logic

When you chain language model calls into a pipeline, things break in unexpected ways. A step times out, the API returns a 429, or the model outputs malformed JSON that crashes your parser two steps later. Without deliberate retry logic, one bad response kills the whole run — and re-running from scratch wastes time and money. This article walks through building a multi-step LLM pipeline in Python with per-step retry, exponential backoff, and clean error propagation.

What a Multi-Step AI Pipeline Looks Like

A pipeline here means: take raw input → process through N language model calls, each depending on the previous → produce a final structured result. A realistic example:

  1. Extract entities from a user document
  2. Classify each entity by type and risk level
  3. Summarize the findings into a final report

Each step has its own prompt, its own expected output format, and its own failure modes. Step 2 failing should not force you to re-run step 1. You need per-step isolation, not a single try/except wrapped around the whole chain.
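Here is a sketch of per-step isolation: the three steps as separate functions, each with its own prompt and its own output check. The prompts, the JSON shapes, and the keyword-only llm parameter are assumptions, not the author's code. The llm parameter is a hypothetical stand-in for whatever function sends a prompt and returns the model's text. Injecting the model call keeps the sketch runnable without an API key.

```python
import json
from typing import Callable

# Hypothetical: any function that sends one prompt and returns the model's raw text.
LLM = Callable[[str], str]

def extract_entities(text: str, *, llm: LLM) -> list[str]:
    """Step 1: expect a JSON array of entity names."""
    prompt = f"List the named entities in this document as a JSON array of strings:\n\n{text}"
    entities = json.loads(llm(prompt))
    if not isinstance(entities, list):
        raise ValueError("expected a JSON array of entity names")
    return [str(e) for e in entities]

def classify_entities(entities: list[str], *, llm: LLM) -> list[dict]:
    """Step 2: expect one object per entity with a valid risk level."""
    prompt = (
        "For each entity, return a JSON array of objects with keys "
        f'"name", "type" and "risk" (one of "low", "medium", "high"):\n{json.dumps(entities)}'
    )
    rows = json.loads(llm(prompt))
    if not isinstance(rows, list):
        raise ValueError("expected a JSON array of classified entities")
    for row in rows:
        if not isinstance(row, dict) or row.get("risk") not in {"low", "medium", "high"}:
            raise ValueError(f"invalid classification: {row!r}")
    return rows

def summarize(classified: list[dict], *, llm: LLM) -> str:
    """Step 3: free text, so the only check is that something came back."""
    report = llm(f"Write a short risk report for these findings:\n{json.dumps(classified)}").strip()
    if not report:
        raise ValueError("model returned an empty summary")
    return report
```

Because llm is keyword-only, you can bind a step to a real client before handing it to a helper that passes a single positional argument. For example: functools.partial(extract_entities, llm=call_llm).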

The Naive Approach and Why It Fails

Most tutorials wrap every call in a try/except and call it done:

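from openai import OpenAI

client = OpenAI()  # reads OPENAI_API_KEY from the environment

def call_llm(prompt: str) -> str:
    try:
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": prompt}]
        )
        return response.choices[0].message.content
    except Exception as e:
        # Anti-pattern: timeouts, 429s, 5xx and auth errors all become the same RuntimeError
        raise RuntimeError(f"LLM call failed: {e}")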

This collapses every failure into a single bucket. A transient 503 and a hard authentication error both surface as the same RuntimeError. As a result, you cannot retry one without retrying the other, and nothing downstream tells you which step caused the failure.

The real failure mode: a 429 at step 3 after 45 seconds of processing means you either crash the whole run or retry from step 1. Neither is acceptable in production.
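One way to keep those failure kinds apart is to give them distinct exception types and tag each with the step that raised it. The class names, call_step, and the TRANSIENT tuple below are hypothetical. Which exceptions count as transient depends on the client library you use.

```python
from typing import Any, Callable

class StepError(Exception):
    """Hypothetical base class: a failure attributed to a named pipeline step."""
    def __init__(self, step: str, message: str) -> None:
        super().__init__(f"step {step!r} failed: {message}")
        self.step = step

class TransientStepError(StepError):
    """Worth retrying: timeouts, dropped connections, rate limits, 5xx."""

class PermanentStepError(StepError):
    """Not worth retrying with the same input: auth errors, bad requests, unparseable output."""

# Assumption: these built-ins are the transient cases. Extend the tuple with the
# exception types your LLM client actually raises.
TRANSIENT: tuple[type[BaseException], ...] = (TimeoutError, ConnectionError)

def call_step(step: str, fn: Callable[..., Any], *args: Any) -> Any:
    try:
        return fn(*args)
    except TRANSIENT as e:
        # "from e" chains the original exception as __cause__, so its traceback survives
        raise TransientStepError(step, str(e)) from e
    except Exception as e:
        raise PermanentStepError(step, str(e)) from e
```

A retry loop can then catch TransientStepError alone, while a PermanentStepError propagates immediately with the step name already in its message.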

Per-Step Retry with Exponential Backoff

The right pattern gives each step its own retry budget. Here is a decorator that does it:

import time
import random
from functools import wraps
from typing import Callable, TypeVar

T = TypeVar("T")

def with_retry(
    max_attempts: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    retryable_exceptions: tuple[type[BaseException], ...] = (TimeoutError, ConnectionError),
):
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")

    def decorator(fn: Callable[..., T]) -> Callable[..., T]:
        @wraps(fn)
        def wrapper(*args, **kwargs) -> T:
            for attempt in range(max_attempts):
                try:
                    return fn(*args, **kwargs)
                except retryable_exceptions as e:  # anything else propagates immediately
                    if attempt + 1 >= max_attempts:
                        raise  # retry budget exhausted: surface the last error
                    # Exponential backoff (2s, 4s, 8s, ... with the defaults)
                    # plus up to 1s of random jitter, capped at max_delay
                    delay = min(
                        base_delay * (2 ** (attempt + 1)) + random.uniform(0, 1),
                        max_delay,
                    )
                    print(f"[retry] {fn.__name__} attempt {attempt + 1}/{max_attempts} failed, "
                          f"retrying in {delay:.1f}s: {e}")
                    time.sleep(delay)
        return wrapper
    return decorator

The jitter (random.uniform(0, 1)) spreads out the retries of pipeline instances that fail at the same time, for example after an outage. This avoids a thundering herd. Without jitter, every instance computes the same delay, wakes up at the same moment, and hits the API together.
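To see what the decorator's delay formula produces, here it is pulled out into a standalone function. backoff_delay is a hypothetical name. The example runs the function for two instances that start retrying at the same moment. Seeded random.Random objects make each instance's jitter reproducible.

```python
import random
from typing import Optional

def backoff_delay(
    attempt: int,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    rng: Optional[random.Random] = None,
) -> float:
    """Same formula as with_retry: exponential growth plus up to 1s of jitter, capped."""
    rng = rng or random.Random()
    return min(base_delay * (2 ** (attempt + 1)) + rng.uniform(0, 1), max_delay)

# Two instances that failed together: without jitter they would sleep for exactly
# the same time; with it, their retries drift apart.
a, b = random.Random(1), random.Random(2)
for attempt in range(5):
    print(f"retry {attempt + 1}: instance A waits {backoff_delay(attempt, rng=a):.2f}s, "
          f"instance B waits {backoff_delay(attempt, rng=b):.2f}s")
```

From the fifth retry on, the exponential term exceeds the 30-second cap, so the jitter disappears there. If you expect many retries at the cap, you may want to apply jitter after capping instead.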

Usage:

@with_retry(max_attempts=3, retryable_exceptions=(TimeoutError, ConnectionError))
def extract_entities(text: str) -> list[str]:
    # your LLM call here, returns a parsed list
    ...

Tracking Pipeline State

Beyond per-step retry, you need to know where a pipeline failed and what each step produced. A simple state object makes this explicit:

from dataclasses import dataclass, field
from typing import Optional, Any

@dataclass
class StepResult:
    name: str
    success: bool
    output: Optional[Any] = None
    error: Optional[str] = None
    attempts: int = 0

@dataclass
class PipelineState:
    input: str
    steps: list[StepResult] = field(default_factory=list)

    def last_output(self) -> Any:
        for step in reversed(self.steps):
            if step.success:
                return step.output
        return self.input

    def first_failure(self) -> Optional[StepResult]:
        return next((s for s in self.steps if not s.success), None)

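Next, a run_step helper applies the same retry idea inline and records each step's outcome. If a step function is already wrapped with with_retry, let only one layer retry any given exception type. Otherwise the attempts multiply: three decorator attempts per run_step attempt means up to nine calls.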

import time
import httpx

# Network-level failures worth retrying. If your step functions use the OpenAI SDK,
# note that it wraps these in its own exception classes (e.g. openai.APITimeoutError).
RETRYABLE = (httpx.TimeoutException, httpx.ConnectError, httpx.RemoteProtocolError)
MAX_ATTEMPTS = 3

def run_step(state: PipelineState, name: str, fn, *args) -> PipelineState:
    for attempt in range(1, MAX_ATTEMPTS + 1):
        try:
            result = fn(*args)
            state.steps.append(StepResult(name, success=True, output=result, attempts=attempt))
            return state
        except RETRYABLE as e:
            if attempt == MAX_ATTEMPTS:
                state.steps.append(StepResult(name, success=False, error=str(e), attempts=attempt))
                return state
            time.sleep(min(1.0 * (2 ** attempt), 30))  # 2s, 4s, ... capped at 30s
        except Exception as e:
            # Non-retryable: parse failures, auth errors, validation issues
            state.steps.append(StepResult(name, success=False, error=str(e), attempts=attempt))
            return state
    return state

Validation errors like json.JSONDecodeError fall through to the non-retryable branch. Retrying a step that produced malformed output with the same prompt rarely fixes anything — you need a different prompt or a fallback parser.
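A fallback parser can recover from the two most common ways models wrap otherwise valid JSON: a Markdown code fence, or a sentence of preamble before the payload. The following sketches one such parser; parse_model_json is a hypothetical helper. It only rescues output that already contains valid JSON, so a genuinely malformed response still raises and still needs a different prompt. The error is a ValueError, which run_step's non-retryable branch records as a failure.

```python
import json
import re
from typing import Any

TICKS = "`" * 3  # the marker that opens and closes a Markdown code fence
_FENCED = re.compile(TICKS + r"(?:json)?\s*(.*?)" + TICKS, re.DOTALL)

def parse_model_json(raw: str) -> Any:
    """Try strict JSON first, then common wrappers; raise ValueError if nothing parses."""
    candidates = [raw.strip()]
    # 1. The body of any fenced block, with or without a "json" language tag
    candidates += [block.strip() for block in _FENCED.findall(raw)]
    # 2. The outermost [...] or {...} span, whichever opens first, to skip a preamble
    spans = []
    for open_ch, close_ch in (("[", "]"), ("{", "}")):
        start, end = raw.find(open_ch), raw.rfind(close_ch)
        if start != -1 and end > start:
            spans.append((start, raw[start:end + 1]))
    candidates += [text for _, text in sorted(spans)]
    for candidate in candidates:
        try:
            return json.loads(candidate)
        except json.JSONDecodeError:
            continue
    raise ValueError(f"no parseable JSON in model output: {raw[:80]!r}")
```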

Wiring the Full Pipeline

def run_pipeline(raw_input: str) -> str:
    state = PipelineState(input=raw_input)

    state = run_step(state, "extract", extract_entities, raw_input)
    if failure := state.first_failure():
        raise RuntimeError(f"Pipeline aborted at 'extract' after {failure.attempts} attempt(s): {failure.error}")

    state = run_step(state, "classify", classify_entities, state.last_output())
    if failure := state.first_failure():
        raise RuntimeError(f"Pipeline aborted at 'classify' after {failure.attempts} attempt(s): {failure.error}")

    state = run_step(state, "summarize", summarize, state.last_output())
    if failure := state.first_failure():
        raise RuntimeError(f"Pipeline aborted at 'summarize' after {failure.attempts} attempt(s): {failure.error}")

    return state.last_output()

Each step failure short-circuits with a precise message: which step failed, what error was raised, and how many attempts were made before giving up.

Handling Rate Limits Correctly

LLM APIs typically return HTTP 429 when you exceed a rate limit or quota, often with a Retry-After header that says how long to wait. If you ignore that header and retry on a fixed schedule, you keep hitting the limit and can prolong the throttling. Read it:

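import time
import httpx

def call_with_rate_limit_awareness(prompt: str, client, max_attempts: int = 4) -> str:
    # client.complete stands in for any call that raises httpx.HTTPStatusError
    # on an error response (e.g. via response.raise_for_status())
    for attempt in range(max_attempts):
        try:
            return client.complete(prompt)
        except httpx.HTTPStatusError as e:
            status = e.response.status_code
            if status != 429 and status < 500:
                raise  # 4xx that is not 429: request is broken, do not retry
            if attempt + 1 == max_attempts:
                raise RuntimeError("Exceeded retry budget") from e  # no pointless final sleep
            if status == 429:
                try:
                    wait = max(0.0, float(e.response.headers.get("retry-after", 5)))
                except ValueError:
                    wait = 5.0  # Retry-After can also be an HTTP-date; fall back to a default
                print(f"Rate limited. Respecting retry-after: {wait}s")
                time.sleep(wait)
            else:
                time.sleep(2 ** attempt)  # 5xx: exponential backoff
    raise RuntimeError("Exceeded retry budget")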

The key distinction: 4xx errors other than 429 mean your request is malformed or unauthorized, and retrying will not fix them. 5xx and 429 responses are usually transient, so retry them, and honor server-side signals such as Retry-After when the server sends them.
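HTTP allows Retry-After to carry either a number of seconds or an HTTP-date, so a parser that only handles numbers will crash or fall back to a guess on the date form. The sketch below handles both forms, alongside a status check that encodes the rule above. Both function names are hypothetical, and the 5-second default is an assumption carried over from the rate-limit example.

```python
import email.utils
from datetime import datetime, timezone
from typing import Optional

def is_retryable_status(status: int) -> bool:
    """429 and 5xx are worth retrying; any other 4xx means the request itself is wrong."""
    return status == 429 or 500 <= status <= 599

def parse_retry_after(value: Optional[str], default: float = 5.0,
                      now: Optional[datetime] = None) -> float:
    """Seconds to wait, given a Retry-After value in either form HTTP allows:
    delay-seconds ("120") or an HTTP-date ("Wed, 21 Oct 2015 07:28:00 GMT")."""
    if value is None:
        return default
    value = value.strip()
    if value.isdecimal():
        return float(value)
    try:
        when = email.utils.parsedate_to_datetime(value)
    except (TypeError, ValueError):  # unparseable; the exception type varies by Python version
        return default
    if when.tzinfo is None:
        when = when.replace(tzinfo=timezone.utc)  # HTTP-dates are always expressed in GMT
    now = now or datetime.now(timezone.utc)
    return max(0.0, (when - now).total_seconds())
```

Inside the 429 branch, the wait could then be computed as parse_retry_after(e.response.headers.get("retry-after")).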

The Takeaway

Multi-step LLM pipelines fail in predictable, manageable ways. The pattern here — per-step retry with jittered backoff, explicit state tracking, and differentiated error handling — covers the common failure modes without overengineering.

What matters most:

  • Retry transient errors only — do not retry parse failures or auth errors
  • Track state per step — know exactly where and why a pipeline failed
  • Honor Retry-After headers — ignoring them makes throttling worse

In production, add structured logging per step and consider persisting PipelineState to a database for runs longer than a few seconds. This lets you resume mid-run after a crash instead of starting over from step 1.
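Persisting state can start simpler than a database. The runner below is a sketch that assumes step outputs are JSON-serializable. It writes each step's output to a JSON checkpoint file as soon as the step succeeds, and on the next run it skips steps that already completed. run_resumable and the checkpoint layout are hypothetical, and the JSON file stands in for the database suggested above.

```python
import json
from pathlib import Path
from typing import Any, Callable

# Hypothetical checkpoint layout: {"input": <raw input>, "outputs": {<step name>: <output>}}
Step = tuple[str, Callable[[Any], Any]]

def run_resumable(raw_input: str, steps: list[Step], checkpoint: Path) -> Any:
    """Run steps in order, reusing any output already saved in the checkpoint file."""
    if checkpoint.exists():
        saved = json.loads(checkpoint.read_text())
        if saved.get("input") != raw_input:
            raise ValueError(f"{checkpoint} belongs to a different input")
    else:
        saved = {"input": raw_input, "outputs": {}}

    value: Any = raw_input
    for name, fn in steps:
        if name in saved["outputs"]:
            value = saved["outputs"][name]  # finished in an earlier run: skip the model call
            continue
        value = fn(value)  # if this raises, earlier outputs are already on disk
        saved["outputs"][name] = value
        tmp = checkpoint.with_suffix(".tmp")
        tmp.write_text(json.dumps(saved))
        tmp.replace(checkpoint)  # swap in the new checkpoint with a single rename
    return value
```

Writing to a temporary file and then renaming it over the checkpoint means a crash mid-write leaves the previous checkpoint intact rather than a truncated one.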

If you are hardening the security layer around your LLM stack, our free security hardening checklists cover API key rotation, rate limit handling, and output validation patterns for production deployments.


I run AYI NEDJIMI Consultants, a cybersecurity consulting firm. We publish free security hardening checklists — PDF and Excel.
