German Yamil
Building a Retry System with Exponential Backoff for LLM API Calls in Python

LLM APIs fail. Rate limits hit at inconvenient times, 503s appear during high-traffic periods, and timeouts spike when the model is under load. If your pipeline doesn't have a retry strategy, a transient failure kills an hours-long job. This article builds a complete retry system: a generic backoff decorator, specific handling for 429 rate-limit responses, and a circuit breaker to stop retrying when a service is genuinely down.

Why LLM APIs Fail

Understanding the failure modes shapes your retry strategy:

  • 429 Too Many Requests: you've exceeded your rate limit. The Retry-After header tells you exactly how long to wait. Exponential backoff without reading this header wastes time or retries too early.
  • 503 Service Unavailable: the provider is overloaded. Retryable — back off and try again.
  • 500 Internal Server Error: server-side bug. Sometimes retryable (transient), sometimes not. Retry with a cap.
  • 408 Request Timeout: the request took too long. Retryable.
  • 400 Bad Request: your payload is malformed. Not retryable — retrying won't fix a broken prompt.
  • 401 Unauthorized / 403 Forbidden: auth issue. Not retryable — fix your API key first.
  • 422 Unprocessable Entity: your request is semantically invalid. Not retryable.

The rule of thumb: 4xx errors are generally your fault (except 429 and 408) and should fail fast; 5xx errors are the server's fault and are usually worth retrying.
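Encoded as a small predicate (a standalone sketch mirroring the status set used in the code below):

```python
RETRYABLE_STATUS_CODES = {408, 429, 500, 502, 503, 504}


def is_retryable(status_code: int) -> bool:
    """True for errors worth retrying: timeouts, rate limits, server faults."""
    return status_code in RETRYABLE_STATUS_CODES


print([code for code in (400, 401, 408, 429, 500, 503) if is_retryable(code)])
# [408, 429, 500, 503]
```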

Generic Retry Decorator with Exponential Backoff + Jitter

import time
import random
import logging
import functools
from typing import Callable, TypeVar, Any

log = logging.getLogger("pipeline.retry")

RETRYABLE_STATUS_CODES = {408, 429, 500, 502, 503, 504}

F = TypeVar("F", bound=Callable[..., Any])


class RetryableError(Exception):
    """A transient failure (rate limit, timeout, 5xx) worth retrying."""

    def __init__(self, message: str, status_code: int, retry_after: float | None = None):
        super().__init__(message)
        self.status_code = status_code
        self.retry_after = retry_after


class NonRetryableError(Exception):
    """A permanent failure (bad payload, auth) that retrying cannot fix."""

    def __init__(self, message: str, status_code: int):
        super().__init__(message)
        self.status_code = status_code


def with_retry(
    max_attempts: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    backoff_factor: float = 2.0,
    jitter: bool = True,
) -> Callable[[F], F]:
    """
    Decorator that retries a function on retryable HTTP errors.
    Uses exponential backoff with optional full jitter.
    """
    def decorator(func: F) -> F:
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            attempt = 0
            while attempt < max_attempts:
                try:
                    return func(*args, **kwargs)

                except RetryableError as e:
                    attempt += 1
                    if attempt >= max_attempts:
                        log.error(f"Giving up after {max_attempts} attempts: {e}")
                        raise

                    delay = min(base_delay * (backoff_factor ** (attempt - 1)), max_delay)
                    if jitter:
                        # Full jitter: uniform random between 0 and delay.
                        # Prevents a thundering herd when many workers retry at once.
                        delay = random.uniform(0, delay)

                    # Never wait less than the server explicitly asked for.
                    if e.retry_after is not None:
                        delay = max(delay, e.retry_after)

                    log.warning(
                        f"Attempt {attempt}/{max_attempts} failed ({e}). "
                        f"Retrying in {delay:.1f}s"
                    )
                    time.sleep(delay)

                except NonRetryableError:
                    raise  # no point retrying

        return wrapper  # type: ignore
    return decorator
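Before wiring the decorator into a pipeline, it helps to see the schedule the backoff formula produces. With the defaults (1s base, factor 2, 60s cap) and jitter off, the delay before each retry looks like this:

```python
base_delay, backoff_factor, max_delay = 1.0, 2.0, 60.0

# Capped exponential schedule for attempts 1 through 7.
delays = [
    min(base_delay * backoff_factor ** (attempt - 1), max_delay)
    for attempt in range(1, 8)
]
print(delays)  # [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0]
```

With full jitter enabled, each of those values becomes the upper bound of a uniform random draw, so actual sleeps are shorter on average but still grow with each attempt.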

Specific Handling for 429: Read the Retry-After Header

The Retry-After header is either a number of seconds or an HTTP date. Parse both:

from email.utils import parsedate_to_datetime
from datetime import datetime, timezone


def parse_retry_after(header_value: str | None) -> float | None:
    """
    Parse the Retry-After header.
    Returns seconds to wait, or None if the header is absent/unparseable.
    """
    if not header_value:
        return None

    # Try it as a plain number of seconds first
    try:
        return float(header_value)
    except ValueError:
        pass

    # Try it as an HTTP date
    try:
        retry_at = parsedate_to_datetime(header_value)
        now = datetime.now(tz=timezone.utc)
        seconds = (retry_at - now).total_seconds()
        return max(0.0, seconds)
    except Exception:
        return None


def classify_http_error(status_code: int, headers: dict, body: str) -> Exception:
    """Convert an HTTP error response into the appropriate exception type."""
    if status_code in RETRYABLE_STATUS_CODES:
        retry_after = None
        if status_code == 429:
            retry_after = parse_retry_after(headers.get("Retry-After"))
        return RetryableError(
            f"HTTP {status_code}: {body[:200]}",
            status_code=status_code,
            retry_after=retry_after,
        )
    else:
        return NonRetryableError(
            f"HTTP {status_code}: {body[:200]}",
            status_code=status_code,
        )
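The HTTP-date branch is easy to sanity-check in isolation. This quick sketch pins "now" to a fixed instant instead of the wall clock so the arithmetic is deterministic:

```python
from email.utils import parsedate_to_datetime
from datetime import datetime, timezone

# An RFC 7231 HTTP-date two minutes after our fixed "now".
retry_at = parsedate_to_datetime("Wed, 21 Oct 2015 07:30:00 GMT")
now = datetime(2015, 10, 21, 7, 28, 0, tzinfo=timezone.utc)

print((retry_at - now).total_seconds())  # 120.0
```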

Circuit Breaker: Stop Retrying After N Consecutive Failures

A circuit breaker prevents your pipeline from hammering an API that's genuinely down. After N consecutive failures it "opens" and fast-fails all calls. After a cooldown period it enters "half-open" state and allows one test call through.

import threading
from enum import Enum

class CircuitState(Enum):
    CLOSED = "closed"        # normal operation
    OPEN = "open"            # failing fast
    HALF_OPEN = "half_open"  # testing recovery


class CircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 60.0,
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self._state = CircuitState.CLOSED
        self._failure_count = 0
        self._last_failure_time: float | None = None
        self._lock = threading.Lock()

    @property
    def state(self) -> CircuitState:
        with self._lock:
            if self._state == CircuitState.OPEN:
                if (
                    self._last_failure_time is not None
                    and time.monotonic() - self._last_failure_time >= self.recovery_timeout
                ):
                    self._state = CircuitState.HALF_OPEN
                    log.info("Circuit breaker entering HALF_OPEN — testing recovery")
            return self._state

    def record_success(self):
        with self._lock:
            self._failure_count = 0
            self._state = CircuitState.CLOSED

    def record_failure(self):
        with self._lock:
            self._failure_count += 1
            self._last_failure_time = time.monotonic()
            if self._failure_count >= self.failure_threshold:
                if self._state != CircuitState.OPEN:
                    log.error(
                        f"Circuit breaker OPEN after {self._failure_count} consecutive failures"
                    )
                self._state = CircuitState.OPEN

    def call(self, func: Callable, *args, **kwargs):
        if self.state == CircuitState.OPEN:
            raise RuntimeError("Circuit breaker is OPEN — not attempting call")
        try:
            result = func(*args, **kwargs)
            self.record_success()
            return result
        except Exception:
            # Every exception counts toward opening the circuit,
            # including non-retryable ones like a 400.
            self.record_failure()
            raise
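To watch the state machine move without threads or real API calls, here is a stripped-down, non-thread-safe sketch of the same transitions (the name `MiniBreaker` and the tiny timeouts are illustrative, not part of the class above):

```python
import time


class MiniBreaker:
    """Minimal closed -> open -> half_open -> closed state machine."""

    def __init__(self, threshold: int = 3, recovery: float = 0.05):
        self.threshold, self.recovery = threshold, recovery
        self.failures = 0
        self.opened_at: float | None = None

    def state(self) -> str:
        if self.opened_at is None:
            return "closed"
        if time.monotonic() - self.opened_at >= self.recovery:
            return "half_open"
        return "open"

    def failure(self):
        self.failures += 1
        if self.failures >= self.threshold:
            self.opened_at = time.monotonic()

    def success(self):
        self.failures, self.opened_at = 0, None


b = MiniBreaker()
for _ in range(3):
    b.failure()
print(b.state())  # open
time.sleep(0.06)
print(b.state())  # half_open (cooldown elapsed, test call allowed)
b.success()
print(b.state())  # closed
```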

Putting It All Together

import requests

circuit_breaker = CircuitBreaker(failure_threshold=5, recovery_timeout=60.0)


@with_retry(max_attempts=5, base_delay=1.0, max_delay=60.0)
def call_llm_api(prompt: str, model: str, api_key: str) -> str:
    """Make a retryable LLM API call wrapped in a circuit breaker."""
    def _call():
        response = requests.post(
            "https://api.anthropic.com/v1/messages",
            headers={
                "x-api-key": api_key,
                "anthropic-version": "2023-06-01",
                "content-type": "application/json",
            },
            json={
                "model": model,
                "max_tokens": 4096,
                "messages": [{"role": "user", "content": prompt}],
            },
            timeout=120,
        )

        if not response.ok:
            raise classify_http_error(
                response.status_code,
                dict(response.headers),
                response.text,
            )

        return response.json()["content"][0]["text"]

    return circuit_breaker.call(_call)

With this setup: transient 503s retry automatically with backoff, 429s wait the exact number of seconds the server requests, 400s fail immediately without wasting retries, and five consecutive failures open the circuit and surface a clear error to your pipeline rather than thrashing for minutes.

Full pipeline + source code: germy5.gumroad.com/l/xhxkzz — $19.99, 30-day refund.
