DEV Community

Cover image for Cómo construí un pipeline RAG de producción con FastAPI, pgvector y cross-encoder reranking
Martin Palopoli
Martin Palopoli

Posted on

Cómo construí un pipeline RAG de producción con FastAPI, pgvector y cross-encoder reranking

TL;DR

Construí un motor RAG de producción que combina búsqueda híbrida (pgvector + BM25), reranking con cross-encoder, diversidad MMR, cache semántico y detección automática de idioma — todo sobre FastAPI async y PostgreSQL. En este artículo comparto la arquitectura real, las decisiones técnicas, y el código clave.


Por qué otro artículo sobre RAG

La mayoría de los tutoriales RAG se quedan en "embed → cosine search → prompt". Eso funciona para una demo, pero en producción te vas a encontrar con:

  • Queries en español que matchean chunks en inglés (o viceversa)
  • Los 10 chunks más similares vienen del mismo documento
  • La búsqueda semántica falla con nombres propios o códigos exactos
  • Respuestas repetidas consumen tokens innecesariamente
  • No tenés forma de saber si la respuesta es confiable o no

Este artículo muestra cómo resolví cada uno de esos problemas en un sistema real multi-tenant.


El stack

Componente Tecnología
Backend Python 3.12 + FastAPI (100% async)
Base de datos PostgreSQL 16 + pgvector + tsvector
Embeddings paraphrase-multilingual-MiniLM-L12-v2 (384d)
Reranker cross-encoder/ms-marco-MiniLM-L-6-v2
LLM Groq (llama-3.3-70b-versatile)
Cache Redis + PostgreSQL (cache semántico)
Streaming Server-Sent Events (SSE)

Arquitectura del pipeline

El pipeline tiene 3 etapas fijas con filtros pre y post:

Query del usuario
     │
     ▼
┌─────────────┐
│  FAQ Match   │──→ Si score ≥ 0.75: respuesta instantánea (costo LLM = 0)
└─────┬───────┘
      │ No match
      ▼
┌─────────────┐
│ Cache Check  │──→ Si similaridad ≥ 0.95: respuesta cacheada
└─────┬───────┘
      │ Cache miss
      ▼
┌─────────────────────────────────────┐
│  ETAPA 1: Búsqueda Híbrida         │
│  pgvector (semántica) + BM25 (léxica)│
│  Fusión con Reciprocal Rank Fusion  │
└─────────────┬───────────────────────┘
              │
              ▼
┌─────────────────────────────────────┐
│  ETAPA 2: Cross-Encoder Reranking   │
│  Re-evalúa relevancia real          │
│  Calcula confidence score           │
└─────────────┬───────────────────────┘
              │
              ▼
┌─────────────────────────────────────┐
│  ETAPA 3: MMR (Diversidad)          │
│  Balancea relevancia vs variedad    │
│  Máx 3 chunks por documento         │
└─────────────┬───────────────────────┘
              │
              ▼
         LLM + SSE streaming
Enter fullscreen mode Exit fullscreen mode

Veamos cada parte en detalle.


Etapa 1: Búsqueda Híbrida (Vector + BM25)

El problema con solo búsqueda vectorial

La búsqueda por embeddings es excelente para capturar significado semántico, pero falla con:

  • Nombres propios: "¿Qué dijo Juan Pérez?"
  • Códigos o IDs: "Error HTTP 403"
  • Términos técnicos exactos que el modelo de embeddings no conoce bien

La solución: búsqueda híbrida

Ejecuto dos búsquedas en paralelo y fusiono los resultados:

async def search_chunks(self, query: str, kb_ids: list[int],
                        embedding: list[float], config: RetrievalConfig):
    # Búsqueda vectorial (pgvector HNSW, distancia coseno)
    vector_results = await self._vector_search(
        embedding, kb_ids, config.candidate_k
    )

    # Búsqueda léxica (PostgreSQL tsvector + ts_rank_cd)
    bm25_results = await self._bm25_search(
        query, kb_ids, config.candidate_k
    )

    # Fusión con Reciprocal Rank Fusion
    merged = self._rrf_merge(vector_results, bm25_results, config.bm25_weight)

    return merged
Enter fullscreen mode Exit fullscreen mode

Reciprocal Rank Fusion (RRF)

RRF es elegante en su simplicidad. En vez de comparar scores de sistemas diferentes (que tienen escalas distintas), usa posiciones en el ranking:

def _rrf_merge(self, vector_results, bm25_results, bm25_weight=0.3):
    k = 60  # Constante de suavizado
    vector_weight = 1.0 - bm25_weight
    scores = {}

    for rank, chunk in enumerate(vector_results):
        scores[chunk.id] = vector_weight / (k + rank + 1)

    for rank, chunk in enumerate(bm25_results):
        scores[chunk.id] = scores.get(chunk.id, 0) + bm25_weight / (k + rank + 1)

    return sorted(scores.items(), key=lambda x: x[1], reverse=True)
Enter fullscreen mode Exit fullscreen mode

¿Por qué k=60? Evita que el chunk #1 tenga un score desproporcionadamente alto respecto al #2. Con k=60, la diferencia entre posición 1 y 2 es mínima, lo que hace la fusión más estable.

¿Por qué 70/30? En mis pruebas, la búsqueda semántica es más robusta para la mayoría de queries. El 30% de BM25 rescata los casos donde necesitás match léxico exacto. Esto es configurable por Knowledge Base.

BM25 con fallback

Un detalle importante: la búsqueda BM25 primero intenta un AND query, y si no encuentra resultados, hace fallback a OR:

# Primero: AND (todos los términos deben estar presentes)
ts_query = func.plainto_tsquery('spanish', query)

# Si no hay resultados: OR (cualquier término)
words = [w for w in query.split() if w.lower() not in STOP_WORDS]
or_query = " | ".join(words)
ts_query = func.to_tsquery('spanish', or_query)
Enter fullscreen mode Exit fullscreen mode

Etapa 2: Cross-Encoder Reranking

Por qué no alcanza con embeddings

Los embeddings bi-encoder (como MiniLM) codifican query y documento por separado. Son rápidos pero pierden interacciones sutiles entre las palabras del query y del chunk.

Un cross-encoder procesa query + chunk juntos como un solo input, capturando relaciones cruzadas. Es más lento pero significativamente más preciso.

Implementación

from sentence_transformers import CrossEncoder

reranker = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")

def rerank(self, query: str, candidates: list[dict]) -> list[dict]:
    pairs = [(query, c["content"]) for c in candidates]
    scores = self.reranker.predict(pairs)

    for candidate, score in zip(candidates, scores):
        candidate["rerank_score"] = float(score)

    # Ordenar por rerank_score descendente
    return sorted(candidates, key=lambda x: x["rerank_score"], reverse=True)
Enter fullscreen mode Exit fullscreen mode

Reducimos de ~60 candidatos (resultado de RRF) a los top 15 después del reranking.

Confidence Score: ¿qué tan confiable es la respuesta?

Con los scores del cross-encoder, calculo un confidence score que indica qué tan seguro estoy de que los chunks recuperados son relevantes:

from math import exp

def compute_confidence(sources: list[dict]) -> float:
    scores = sorted([s["rerank_score"] for s in sources], reverse=True)

    max_score = scores[0]
    top3_avg = sum(scores[:3]) / min(len(scores), 3)
    median = scores[len(scores) // 2]

    # Blend: 40% mejor score, 30% promedio top-3, 30% mediana
    blended = 0.4 * max_score + 0.3 * top3_avg + 0.3 * median

    # Sigmoid → [0, 1]
    return 1.0 / (1.0 + exp(-blended))
Enter fullscreen mode Exit fullscreen mode

¿Por qué un blend y no solo el max? Si el mejor chunk tiene score 0.9 pero el resto está en -0.5, la respuesta probablemente es débil — un solo chunk bueno no compensa contexto malo. El blend captura la calidad general del contexto recuperado.

El confidence score se envía al frontend, que lo usa para:

  • Mostrar indicadores visuales de confianza
  • Decidir si sugerir FAQs alternativas
  • Trigger de escalación automática cuando está por debajo del umbral

Etapa 3: MMR (Maximum Marginal Relevance)

El problema de la redundancia

Sin MMR, los 10 chunks más relevantes podrían venir del mismo documento o ser párrafos consecutivos que dicen prácticamente lo mismo. Eso desperdicia contexto del LLM.

La fórmula

MMR(chunk) = λ × relevancia(chunk) - (1-λ) × max_similaridad(chunk, ya_seleccionados) - penalidad_documento
Enter fullscreen mode Exit fullscreen mode
def _mmr_select(self, candidates, query_embedding, config):
    selected = []
    remaining = list(candidates)
    lambda_param = config.lambda_param  # Default: 0.6
    doc_counts = {}

    for _ in range(config.top_k):  # Default: 10
        best_score = -float('inf')
        best_idx = 0

        for i, candidate in enumerate(remaining):
            # Relevancia vs query
            relevance = cosine_similarity(query_embedding, candidate["embedding"])

            # Máxima similaridad con chunks ya seleccionados
            max_sim = max(
                (cosine_similarity(candidate["embedding"], s["embedding"])
                 for s in selected),
                default=0
            )

            # Penalidad por documento repetido
            doc_name = candidate["document_name"]
            penalty = doc_counts.get(doc_name, 0) * 0.1

            # Hard cap: máximo 3 chunks por documento
            if doc_counts.get(doc_name, 0) >= config.max_per_doc:
                continue

            score = lambda_param * relevance - (1 - lambda_param) * max_sim - penalty

            if score > best_score:
                best_score = score
                best_idx = i

        chosen = remaining.pop(best_idx)
        selected.append(chosen)
        doc_counts[chosen["document_name"]] = doc_counts.get(chosen["document_name"], 0) + 1

    return selected
Enter fullscreen mode Exit fullscreen mode

λ = 0.6 significa 60% relevancia, 40% diversidad. Es el sweet spot que encontré: suficiente diversidad para no repetir, pero sin sacrificar chunks realmente relevantes.


Cache Semántico: no repitas trabajo

El concepto

Si alguien pregunta "¿Cómo reseteo mi contraseña?" y hace 2 horas otro usuario preguntó "¿Cómo puedo cambiar mi password?", la respuesta es la misma. ¿Para qué ejecutar todo el pipeline de nuevo?

Implementación

async def lookup_cache(self, query_embedding, tenant_id, kb_ids, config_hash):
    result = await db.execute(
        select(ResponseCache)
        .where(
            ResponseCache.tenant_id == tenant_id,
            ResponseCache.rag_config_hash == config_hash,
            ResponseCache.expires_at > func.now(),
        )
        .order_by(
            ResponseCache.query_embedding.cosine_distance(query_embedding)
        )
        .limit(1)
    )
    cache_entry = result.scalar_one_or_none()

    if cache_entry:
        similarity = 1 - cosine_distance(query_embedding, cache_entry.query_embedding)
        if similarity >= 0.95:
            return cache_entry  # Cache hit

    return None  # Cache miss → ejecutar pipeline completo
Enter fullscreen mode Exit fullscreen mode

Decisiones clave

  • Threshold 0.95: Muy alto a propósito. Prefiero un cache miss a servir una respuesta incorrecta.
  • Config hash: El cache se invalida si cambian los parámetros de RAG. Cambiar top_k de 10 a 5 produce respuestas diferentes.
  • Fire-and-forget store: Después de generar la respuesta del LLM, guardo en cache sin bloquear el streaming.
  • TTL 7 días: Configurable por KB. Documentos que cambian seguido pueden usar TTL más corto.
  • Invalidación proactiva: Cuando se sube o reprocesa un documento, se invalida el cache de esa KB.
# Almacenar (no bloquea el response)
async def store_cache(self, query_embedding, response, sources, confidence, ...):
    if confidence < config.min_confidence:
        return  # No cachear respuestas de baja confianza

    cache_entry = ResponseCache(
        query_embedding=query_embedding,
        response_text=response,
        sources=sources,
        confidence=confidence,
        rag_config_hash=config_hash,
        expires_at=datetime.utcnow() + timedelta(hours=config.cache_ttl_hours),
    )
    db.add(cache_entry)
    await db.commit()
Enter fullscreen mode Exit fullscreen mode

FAQ Match: la shortcut más valiosa

Antes de todo el pipeline, verifico si existe una FAQ cuyo embedding sea suficientemente similar al query:

async def match_faq(self, query_embedding, kb_ids, threshold=0.75):
    result = await db.execute(
        select(FAQ.id, FAQ.question, FAQ.answer,
               (1 - FAQ.embedding.cosine_distance(query_embedding)).label("score"))
        .where(FAQ.knowledge_base_id.in_(kb_ids), FAQ.is_active == True)
        .order_by(FAQ.embedding.cosine_distance(query_embedding))
        .limit(1)
    )
    faq = result.first()

    if faq and faq.score >= threshold:
        # Incrementar hit_count atómicamente
        await db.execute(
            update(FAQ).where(FAQ.id == faq.id)
            .values(hit_count=FAQ.hit_count + 1)
        )
        return faq

    return None
Enter fullscreen mode Exit fullscreen mode

¿Por qué es tan valioso?

  • Costo cero: No llama al LLM, no consume tokens
  • Latencia mínima: Una sola query SQL con índice HNSW
  • Respuestas curadas: Las FAQs son redactadas por humanos, siempre correctas
  • Siempre disponible: Incluso cuando el usuario agotó su cuota mensual de queries LLM, las FAQs siguen funcionando

Detección de idioma sin APIs externas

Mi sistema soporta español, inglés y portugués. En vez de llamar a una API de detección (latencia extra + costo), uso detección heurística:

EN_WORDS = {"how", "what", "the", "is", "can", "do", "where", "when", "why", ...}  # 60+
PT_WORDS = {"como", "onde", "você", "qual", "para", "não", "uma", ...}  # 45+
ES_WORDS = {"qué", "cómo", "dónde", "ayuda", "puedo", "quiero", ...}  # 40+

def detect_language(query: str) -> str:
    words = set(query.lower().split())
    scores = {
        "en": len(words & EN_WORDS),
        "pt": len(words & PT_WORDS),
        "es": len(words & ES_WORDS),
    }
    best = max(scores, key=scores.get)
    return best if scores[best] >= 2 else "es"  # Default: español
Enter fullscreen mode Exit fullscreen mode

¿Es perfecto? No. Pero tiene latencia cero, no depende de servicios externos, y para queries cortas de 5-15 palabras funciona sorprendentemente bien. El idioma detectado se inyecta en el system prompt del LLM para que responda en el mismo idioma.


Smart Routing: elegir la KB correcta automáticamente

Cuando un tenant tiene múltiples Knowledge Bases (ej: "Ventas", "Soporte Técnico", "RRHH"), el usuario no debería tener que elegir manualmente dónde buscar.

Centroides de KB

Para cada KB, calculo el centroide (promedio de todos los embeddings de sus chunks):

SELECT AVG(embedding)::text AS centroid FROM chunks WHERE knowledge_base_id = :kb_id
Enter fullscreen mode Exit fullscreen mode

Routing por similaridad

async def route_query(self, query_embedding, available_kbs):
    scores = []
    for kb in available_kbs:
        centroid = await self._get_centroid(kb.id)  # Redis cache 24h
        similarity = cosine_similarity(query_embedding, centroid)
        if similarity >= 0.15:  # Umbral mínimo
            scores.append((kb, similarity))

    # Top 3 KBs más relevantes
    scores.sort(key=lambda x: x[1], reverse=True)
    return [kb for kb, _ in scores[:3]]
Enter fullscreen mode Exit fullscreen mode

Si ninguna KB supera el umbral, se busca en todas (fallback seguro).


Números que importan

Métrica Valor
Candidatos iniciales por rama 30 (vector) + 30 (BM25)
Después de RRF merge ~40-60 únicos
Después de reranking Top 15
Después de MMR Top 10 (máx 3 por doc)
Dimensiones de embeddings 384
Threshold FAQ 0.75
Threshold cache 0.95
TTL cache 7 días
Latencia FAQ match ~15ms
Latencia pipeline completo ~2-4s
Ahorro por cache hit ~90% latencia, 100% tokens LLM

Lecciones aprendidas

1. El cross-encoder vale la pena

La diferencia de calidad entre usar solo embeddings vs. embeddings + cross-encoder reranking es enorme. Los embeddings bi-encoder son buenos para recall (encontrar candidatos), pero el cross-encoder es mucho mejor para precision (ordenar por relevancia real).

2. BM25 no murió

Para queries con nombres propios, códigos de error, o términos técnicos específicos, BM25 regularmente supera a la búsqueda vectorial. La combinación híbrida es estrictamente mejor que cualquiera por separado.

3. MMR es más importante de lo que parece

Sin diversidad MMR, el LLM recibe contexto redundante y produce respuestas que orbitan alrededor de un solo punto. Con MMR, las respuestas son más completas y cubren diferentes ángulos del tema.

4. Cache semántico requiere threshold alto

Con 0.90 tuve problemas de respuestas incorrectas servidas del cache. 0.95 es el sweet spot: suficiente para cachear reformulaciones obvias, pero no tan bajo como para matchear queries que realmente necesitan respuestas diferentes.

5. Las FAQs son tu mejor inversión

Cada FAQ que agregás es una respuesta perfecta que se sirve en 15ms sin costo de LLM. Implementé auto-generación de FAQs sugeridas a partir de queries frecuentes sin respuesta, que el admin puede aprobar con un click.


Lo que sigue

  • Evaluation framework: métricas automáticas de calidad (faithfulness, relevance) con datasets de evaluación por KB
  • Chunk contextualization: usar el LLM para agregar contexto del documento al chunk antes de indexar
  • Late interaction models (ColBERT): mejor que cross-encoder para volúmenes altos

Conclusión

Un pipeline RAG de producción no es "embed + search + prompt". Es un sistema de múltiples etapas donde cada componente resuelve una debilidad específica del anterior. La búsqueda híbrida cubre gaps semánticos y léxicos, el cross-encoder ordena por relevancia real, MMR garantiza diversidad, el cache elimina trabajo redundante, y las FAQs cortan camino cuando la respuesta ya existe.

Lo más importante: todo es configurable por Knowledge Base. No existe un set de parámetros universal — cada dominio de conocimiento tiene sus propias características y necesita tuning diferente.


Si te interesa el tema de RAG en producción, dejá un comentario con tu mayor desafío. Y si este artículo te fue útil, un like ayuda a que llegue a más personas.

Top comments (0)