DEV Community

Moon Robert
Moon Robert

Posted on • Originally published at blog.rebalai.com

Construyendo Pipelines de IA en Producción: Lecciones de 10K+ Generaciones

El viernes a las 3pm del segundo sprint que tuvimos con IA corriendo en producción, empujé un "cambio menor" al pipeline de generación de contenido. Sin suficientes tests de carga. Sin entender bien qué pasaba cuando el proveedor respondía con un 529.

A las 11pm teníamos 400 jobs atascados en cola, el equipo de soporte recibiendo tickets, y yo con tres terminales abiertas tratando de entender por qué nuestro sistema de reintentos estaba atacando la API como si quisiera tumbársela.

Eso fue hace ocho meses. Desde entonces hemos procesado más de 10,000 generaciones en producción para un producto B2B que analiza documentos legales. Somos cuatro backend developers. No tenemos un equipo de ML. Lo que aprendimos en ese proceso es lo que quiero compartir aquí.

El modelo no era el cuello de botella (yo creí que sí)

Cuando empezamos, dedicamos demasiado tiempo eligiendo modelos, afinando temperatura, discutiendo prompts. Esas cosas importan, pero el cuello de botella real en producción era la infraestructura alrededor del modelo, no el modelo en sí.

El primer problema serio fue la cola de jobs. Usamos Celery con Redis. El worker no era el culpable — el problema era que cuando un job tardaba 90 segundos (timeout del proveedor), el worker quedaba bloqueado y no procesaba nada más. Pasamos a workers con asyncio y el throughput casi se triplicó. Un cambio de arquitectura de dos días que debimos hacer desde el principio.

Después vino la falta de idempotencia. Un job podía ejecutarse dos veces si el ACK llegaba tarde al broker. Resultado: doble costo, datos duplicados en base de datos, lógica de deduplicación que tuvimos que agregar a posteriori. Solucionable, pero tardamos más de lo que debería haber tomado reconocer el problema.

Y los logs. Durante el primer mes teníamos print("Error") en varios lugares del código. Muy profesional. Cuando algo fallaba a las 2am era imposible reconstruir qué había pasado.

La conclusión que tardamos demasiado en sacar: antes de optimizar el prompt, verifica que tu infraestructura pueda manejar fallos del proveedor sin hacer colapsar todo lo demás. La arquitectura que terminamos usando — FastAPI como gateway, Celery async, PostgreSQL para estado de jobs, Redis para cola y rate limiting — no tiene nada de especial. Pero funciona, y sobrevive a errores transitorios sin drama.

Gestión de costos: el susto del primer billing statement

El primer mes de producción real llegó la factura: $340 USD. Para un proyecto interno en fase de validación, eso era mucho. Y lo peor era que no teníamos idea de dónde venía exactamente ese gasto.

Después de dos días revisando logs (que ya habíamos mejorado un poco para entonces), descubrimos que el 60% del costo venía de reintentos fallidos que mandaban el prompt completo cada vez, incluyendo contexto que no necesitaba replicarse. Otro 15% venía de jobs de prueba que yo mismo había corrido durante el desarrollo y que olvidé desactivar en staging.

Me dio vergüenza. Pero bueno, así se aprende.

Lo que implementamos para tener visibilidad real desde esa semana:

from dataclasses import dataclass, field
from typing import Dict

# Precios en USD por millón de tokens — revisa la página de pricing periódicamente
COSTO_POR_MTOKEN: Dict[str, Dict[str, float]] = {
    "claude-sonnet-4-6":        {"input": 3.00, "output": 15.00},
    "claude-haiku-4-5-20251001": {"input": 0.80, "output":  4.00},
    "gpt-4o":                    {"input": 2.50, "output": 10.00},
}

@dataclass
class TrackerCosto:
    modelo: str
    tokens_entrada: int = 0
    tokens_salida: int = 0
    costo_usd: float = 0.0

    def registrar(self, usage) -> float:
        """
        Recibe el objeto 'usage' de la respuesta de la API.
        Retorna el costo incremental de esta sola llamada.
        """
        precios = COSTO_POR_MTOKEN.get(self.modelo, {"input": 0, "output": 0})
        costo = (
            usage.input_tokens  * precios["input"]  / 1_000_000 +
            usage.output_tokens * precios["output"] / 1_000_000
        )
        self.tokens_entrada += usage.input_tokens
        self.tokens_salida  += usage.output_tokens
        self.costo_usd      += costo
        return costo

    def resumen(self) -> dict:
        return {
            "modelo":        self.modelo,
            "tokens_entrada": self.tokens_entrada,
            "tokens_salida":  self.tokens_salida,
            "costo_usd":     round(self.costo_usd, 6),
        }
Enter fullscreen mode Exit fullscreen mode

Cada worker instancia un tracker, registra cada llamada, y al terminar el job persiste el resumen en PostgreSQL junto al resultado. Ahora tenemos dashboards por tipo de documento, por usuario, por semana. El costo promedio por generación con Sonnet nos quedó en $0.0031 USD. Con Haiku bajamos a $0.0008 para los pasos donde la calidad no necesita ser máxima — clasificación del tipo de documento, por ejemplo.

Un gotcha que no anticipé: los tokens de entrada crecen rápido si tienes un system prompt largo. Teníamos uno de 800 palabras que se mandaba en cada llamada. Lo comprimimos a 340 palabras sin perder calidad apreciable, y eso redujo el costo de entrada un 12%. No enorme, pero tampoco despreciable a escala.

Reintentos y timeouts: por qué el viernes fue un desastre

El problema con reintentos ingenuos — y este fue exactamente mi error de aquel viernes — es que cuando el proveedor está bajo carga, un sistema que reintenta agresivamente puede empeorar la situación para todos. Literalmente contribuyes al problema que intentas resolver.

Hay dos categorías de error que se deben tratar distinto: los transitorios (timeouts, 429, 529, 503) y los definitivos (400, 401, 403). Para los primeros, backoff exponencial con jitter. Para los segundos, ni lo intentes.

import asyncio
import time
import logging
from anthropic import Anthropic, APIStatusError, APITimeoutError, APIConnectionError

logger = logging.getLogger(__name__)
cliente = Anthropic()

async def generar_con_reintento(
    prompt: str,
    modelo: str = "claude-sonnet-4-6",
    max_intentos: int = 3,
    timeout_seg: float = 30.0,
) -> str:
    """
    Wrapper de producción. Notas importantes:
    - timeout_seg=30 viene de medir el p95 real durante 2 semanas (era ~18s).
      Empezamos con 60s y los workers lentos bloqueaban todo el pipeline.
    - El jitter en backoff es no-negociable: sin él, múltiples workers
      reintentan exactamente al mismo tiempo y la cosa empeora.
    - Solo reintentamos 5xx y errores de red. Los 4xx (salvo 429) son bugs.
    """
    for intento in range(max_intentos):
        try:
            respuesta = await asyncio.wait_for(
                asyncio.to_thread(
                    cliente.messages.create,
                    model=modelo,
                    max_tokens=1024,
                    messages=[{"role": "user", "content": prompt}],
                ),
                timeout=timeout_seg,
            )
            return respuesta.content[0].text

        except asyncio.TimeoutError:
            # asyncio.wait_for lanza esto — diferente a APITimeoutError
            logger.warning("Timeout local en intento %d/%d", intento + 1, max_intentos)
            if intento == max_intentos - 1:
                raise

        except APITimeoutError:
            # La librería lo lanza por sus propios mecanismos internos
            logger.warning("APITimeout en intento %d/%d", intento + 1, max_intentos)
            if intento == max_intentos - 1:
                raise

        except APIStatusError as e:
            if e.status_code in (429, 529):
                espera = min(60, 10 * (2 ** intento))
                logger.info("Rate limit %d, esperando %.1fs", e.status_code, espera)
                await asyncio.sleep(espera)
                continue
            elif e.status_code >= 500:
                pass  # Backoff normal abajo
            else:
                raise  # 400, 401, 403: no reintentar, es un bug nuestro

        except APIConnectionError:
            logger.warning("Error de conexión en intento %d/%d", intento + 1, max_intentos)

        if intento < max_intentos - 1:
            # Jitter: fracción de segundo aleatoria basada en monotonic clock
            espera = (2 ** intento) + (time.monotonic() % 1.0)
            await asyncio.sleep(espera)

    raise RuntimeError(f"Fallaron todos los {max_intentos} intentos para modelo={modelo}")
Enter fullscreen mode Exit fullscreen mode

Lo que no esperaba — y esto me costó una hora de debugging ese viernes — es que asyncio.TimeoutError y APITimeoutError son dos excepciones distintas que hay que capturar por separado. El primero lo lanza asyncio.wait_for cuando se agota el timeout local. El segundo lo lanza la librería de Anthropic si usa sus propios mecanismos internos de timeout. Si solo capturas uno, el otro se te escapa silenciosamente hacia arriba en el stack y termina matando el worker.

El timeout de 30 segundos tampoco es arbitrario. Medimos el p95 de latencia durante dos semanas con histogramas de Prometheus, y estaba en 18 segundos. El p99, que es donde la cosa se pone interesante, llegaba a 45 segundos en momentos de alta carga del proveedor. Para análisis de documentos legales — no tiempo real — el p99 alto es aceptable. Para un chatbot interactivo, definitivamente no.

Observabilidad: ver para creer

Durante los primeros dos meses teníamos observabilidad terrible. Sabíamos si un job terminaba o fallaba, pero no por qué fallaba, cuánto tardaba en cada paso, ni cuál era la distribución real de latencias.

El cambio más importante fue el structured logging. Cada evento del pipeline emite un JSON con campos fijos: job_id, modelo, paso, duracion_ms, tokens_entrada, tokens_salida, costo_usd, y error si aplica. Eso nos permite hacer queries en CloudWatch Logs Insights sin necesitar un sistema de observabilidad más caro. Cada job tiene un ID único que viaja por todo el pipeline — cuando algo falla, busco ese ID y veo exactamente qué pasó en orden.

Agregamos también una alerta que se dispara cuando el costo acumulado del día supera el 120% del promedio de los últimos siete días. Parece simple. Nos salvó dos veces de bugs que mandaban jobs duplicados sin que nadie se diera cuenta hasta el día siguiente.

Algo que me arrepiento de no haber hecho desde el inicio: integrar OpenTelemetry para trazas distribuidas. Ahora sería trabajo retrofitear. Si ya tienes Jaeger o Honeycomb en tu stack, hazlo desde el día uno. No es mucho trabajo en ese momento, y es muchísimo trabajo después.

Sin rodeos: lo que haría si empezara mañana

Tracking de costos desde el día uno. No el día que llegue la primera factura sorpresa — desde el principio. Es una hora de trabajo que se paga sola en la primera semana. Ir a ciegas en lo que gastas lleva directo a una conversación incómoda con quien paga las facturas — y, por experiencia, esa conversación es mucho menos divertida que un incidente de producción a las 11pm.

Después, modelos más baratos donde la calidad no sea crítica. El 40% de nuestro pipeline corre en Haiku sin pérdida apreciable en el resultado final, lo que reduce el costo total un 30%. Vale la pena medir cuáles pasos realmente necesitan el modelo caro.

Para el manejo de errores: trata los fallos del proveedor como parte del diseño normal, no como casos extremos. Backoff con jitter, distinción clara entre errores transitorios y definitivos, timeouts basados en métricas reales — no en corazonadas. Ese viernes me enseñó que un sistema que reintenta sin considerar el impacto colectivo es peor que uno que simplemente falla limpio.

Y lo que más contradice el instinto inicial: no toques los prompts hasta tener métricas. Pasamos semanas ajustándolos a mano antes de tener evaluación automatizada. No sabíamos si los cambios ayudaban. Mide primero, optimiza después.

No tengo certeza de que todo esto escale sin ajustes más allá de unas decenas de miles de generaciones por día — no hemos llegado a ese volumen todavía. Pero los principios se mantienen: observabilidad desde el inicio, manejo explícito de errores, control de costos por llamada. Lo demás es afinar.

Top comments (0)