TL;DR
Construí un motor RAG de producción que combina búsqueda híbrida (pgvector + BM25), reranking con cross-encoder, diversidad MMR, cache semántico y detección automática de idioma — todo sobre FastAPI async y PostgreSQL. En este artículo comparto la arquitectura real, las decisiones técnicas, y el código clave.
Por qué otro artículo sobre RAG
La mayoría de los tutoriales RAG se quedan en "embed → cosine search → prompt". Eso funciona para una demo, pero en producción te vas a encontrar con:
- Queries en español que matchean chunks en inglés (o viceversa)
- Los 10 chunks más similares vienen del mismo documento
- La búsqueda semántica falla con nombres propios o códigos exactos
- Respuestas repetidas consumen tokens innecesariamente
- No tenés forma de saber si la respuesta es confiable o no
Este artículo muestra cómo resolví cada uno de esos problemas en un sistema real multi-tenant.
El stack
| Componente | Tecnología |
|---|---|
| Backend | Python 3.12 + FastAPI (100% async) |
| Base de datos | PostgreSQL 16 + pgvector + tsvector |
| Embeddings |
paraphrase-multilingual-MiniLM-L12-v2 (384d) |
| Reranker | cross-encoder/ms-marco-MiniLM-L-6-v2 |
| LLM | Groq (llama-3.3-70b-versatile) |
| Cache | Redis + PostgreSQL (cache semántico) |
| Streaming | Server-Sent Events (SSE) |
Arquitectura del pipeline
El pipeline tiene 3 etapas fijas con filtros pre y post:
Query del usuario
│
▼
┌─────────────┐
│ FAQ Match │──→ Si score ≥ 0.75: respuesta instantánea (costo LLM = 0)
└─────┬───────┘
│ No match
▼
┌─────────────┐
│ Cache Check │──→ Si similaridad ≥ 0.95: respuesta cacheada
└─────┬───────┘
│ Cache miss
▼
┌─────────────────────────────────────┐
│ ETAPA 1: Búsqueda Híbrida │
│ pgvector (semántica) + BM25 (léxica)│
│ Fusión con Reciprocal Rank Fusion │
└─────────────┬───────────────────────┘
│
▼
┌─────────────────────────────────────┐
│ ETAPA 2: Cross-Encoder Reranking │
│ Re-evalúa relevancia real │
│ Calcula confidence score │
└─────────────┬───────────────────────┘
│
▼
┌─────────────────────────────────────┐
│ ETAPA 3: MMR (Diversidad) │
│ Balancea relevancia vs variedad │
│ Máx 3 chunks por documento │
└─────────────┬───────────────────────┘
│
▼
LLM + SSE streaming
Veamos cada parte en detalle.
Etapa 1: Búsqueda Híbrida (Vector + BM25)
El problema con solo búsqueda vectorial
La búsqueda por embeddings es excelente para capturar significado semántico, pero falla con:
- Nombres propios: "¿Qué dijo Juan Pérez?"
- Códigos o IDs: "Error HTTP 403"
- Términos técnicos exactos que el modelo de embeddings no conoce bien
La solución: búsqueda híbrida
Ejecuto dos búsquedas en paralelo y fusiono los resultados:
async def search_chunks(self, query: str, kb_ids: list[int],
embedding: list[float], config: RetrievalConfig):
# Búsqueda vectorial (pgvector HNSW, distancia coseno)
vector_results = await self._vector_search(
embedding, kb_ids, config.candidate_k
)
# Búsqueda léxica (PostgreSQL tsvector + ts_rank_cd)
bm25_results = await self._bm25_search(
query, kb_ids, config.candidate_k
)
# Fusión con Reciprocal Rank Fusion
merged = self._rrf_merge(vector_results, bm25_results, config.bm25_weight)
return merged
Reciprocal Rank Fusion (RRF)
RRF es elegante en su simplicidad. En vez de comparar scores de sistemas diferentes (que tienen escalas distintas), usa posiciones en el ranking:
def _rrf_merge(self, vector_results, bm25_results, bm25_weight=0.3):
k = 60 # Constante de suavizado
vector_weight = 1.0 - bm25_weight
scores = {}
for rank, chunk in enumerate(vector_results):
scores[chunk.id] = vector_weight / (k + rank + 1)
for rank, chunk in enumerate(bm25_results):
scores[chunk.id] = scores.get(chunk.id, 0) + bm25_weight / (k + rank + 1)
return sorted(scores.items(), key=lambda x: x[1], reverse=True)
¿Por qué k=60? Evita que el chunk #1 tenga un score desproporcionadamente alto respecto al #2. Con k=60, la diferencia entre posición 1 y 2 es mínima, lo que hace la fusión más estable.
¿Por qué 70/30? En mis pruebas, la búsqueda semántica es más robusta para la mayoría de queries. El 30% de BM25 rescata los casos donde necesitás match léxico exacto. Esto es configurable por Knowledge Base.
BM25 con fallback
Un detalle importante: la búsqueda BM25 primero intenta un AND query, y si no encuentra resultados, hace fallback a OR:
# Primero: AND (todos los términos deben estar presentes)
ts_query = func.plainto_tsquery('spanish', query)
# Si no hay resultados: OR (cualquier término)
words = [w for w in query.split() if w.lower() not in STOP_WORDS]
or_query = " | ".join(words)
ts_query = func.to_tsquery('spanish', or_query)
Etapa 2: Cross-Encoder Reranking
Por qué no alcanza con embeddings
Los embeddings bi-encoder (como MiniLM) codifican query y documento por separado. Son rápidos pero pierden interacciones sutiles entre las palabras del query y del chunk.
Un cross-encoder procesa query + chunk juntos como un solo input, capturando relaciones cruzadas. Es más lento pero significativamente más preciso.
Implementación
from sentence_transformers import CrossEncoder
reranker = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")
def rerank(self, query: str, candidates: list[dict]) -> list[dict]:
pairs = [(query, c["content"]) for c in candidates]
scores = self.reranker.predict(pairs)
for candidate, score in zip(candidates, scores):
candidate["rerank_score"] = float(score)
# Ordenar por rerank_score descendente
return sorted(candidates, key=lambda x: x["rerank_score"], reverse=True)
Reducimos de ~60 candidatos (resultado de RRF) a los top 15 después del reranking.
Confidence Score: ¿qué tan confiable es la respuesta?
Con los scores del cross-encoder, calculo un confidence score que indica qué tan seguro estoy de que los chunks recuperados son relevantes:
from math import exp
def compute_confidence(sources: list[dict]) -> float:
scores = sorted([s["rerank_score"] for s in sources], reverse=True)
max_score = scores[0]
top3_avg = sum(scores[:3]) / min(len(scores), 3)
median = scores[len(scores) // 2]
# Blend: 40% mejor score, 30% promedio top-3, 30% mediana
blended = 0.4 * max_score + 0.3 * top3_avg + 0.3 * median
# Sigmoid → [0, 1]
return 1.0 / (1.0 + exp(-blended))
¿Por qué un blend y no solo el max? Si el mejor chunk tiene score 0.9 pero el resto está en -0.5, la respuesta probablemente es débil — un solo chunk bueno no compensa contexto malo. El blend captura la calidad general del contexto recuperado.
El confidence score se envía al frontend, que lo usa para:
- Mostrar indicadores visuales de confianza
- Decidir si sugerir FAQs alternativas
- Trigger de escalación automática cuando está por debajo del umbral
Etapa 3: MMR (Maximum Marginal Relevance)
El problema de la redundancia
Sin MMR, los 10 chunks más relevantes podrían venir del mismo documento o ser párrafos consecutivos que dicen prácticamente lo mismo. Eso desperdicia contexto del LLM.
La fórmula
MMR(chunk) = λ × relevancia(chunk) - (1-λ) × max_similaridad(chunk, ya_seleccionados) - penalidad_documento
def _mmr_select(self, candidates, query_embedding, config):
selected = []
remaining = list(candidates)
lambda_param = config.lambda_param # Default: 0.6
doc_counts = {}
for _ in range(config.top_k): # Default: 10
best_score = -float('inf')
best_idx = 0
for i, candidate in enumerate(remaining):
# Relevancia vs query
relevance = cosine_similarity(query_embedding, candidate["embedding"])
# Máxima similaridad con chunks ya seleccionados
max_sim = max(
(cosine_similarity(candidate["embedding"], s["embedding"])
for s in selected),
default=0
)
# Penalidad por documento repetido
doc_name = candidate["document_name"]
penalty = doc_counts.get(doc_name, 0) * 0.1
# Hard cap: máximo 3 chunks por documento
if doc_counts.get(doc_name, 0) >= config.max_per_doc:
continue
score = lambda_param * relevance - (1 - lambda_param) * max_sim - penalty
if score > best_score:
best_score = score
best_idx = i
chosen = remaining.pop(best_idx)
selected.append(chosen)
doc_counts[chosen["document_name"]] = doc_counts.get(chosen["document_name"], 0) + 1
return selected
λ = 0.6 significa 60% relevancia, 40% diversidad. Es el sweet spot que encontré: suficiente diversidad para no repetir, pero sin sacrificar chunks realmente relevantes.
Cache Semántico: no repitas trabajo
El concepto
Si alguien pregunta "¿Cómo reseteo mi contraseña?" y hace 2 horas otro usuario preguntó "¿Cómo puedo cambiar mi password?", la respuesta es la misma. ¿Para qué ejecutar todo el pipeline de nuevo?
Implementación
async def lookup_cache(self, query_embedding, tenant_id, kb_ids, config_hash):
result = await db.execute(
select(ResponseCache)
.where(
ResponseCache.tenant_id == tenant_id,
ResponseCache.rag_config_hash == config_hash,
ResponseCache.expires_at > func.now(),
)
.order_by(
ResponseCache.query_embedding.cosine_distance(query_embedding)
)
.limit(1)
)
cache_entry = result.scalar_one_or_none()
if cache_entry:
similarity = 1 - cosine_distance(query_embedding, cache_entry.query_embedding)
if similarity >= 0.95:
return cache_entry # Cache hit
return None # Cache miss → ejecutar pipeline completo
Decisiones clave
- Threshold 0.95: Muy alto a propósito. Prefiero un cache miss a servir una respuesta incorrecta.
-
Config hash: El cache se invalida si cambian los parámetros de RAG. Cambiar
top_kde 10 a 5 produce respuestas diferentes. - Fire-and-forget store: Después de generar la respuesta del LLM, guardo en cache sin bloquear el streaming.
- TTL 7 días: Configurable por KB. Documentos que cambian seguido pueden usar TTL más corto.
- Invalidación proactiva: Cuando se sube o reprocesa un documento, se invalida el cache de esa KB.
# Almacenar (no bloquea el response)
async def store_cache(self, query_embedding, response, sources, confidence, ...):
if confidence < config.min_confidence:
return # No cachear respuestas de baja confianza
cache_entry = ResponseCache(
query_embedding=query_embedding,
response_text=response,
sources=sources,
confidence=confidence,
rag_config_hash=config_hash,
expires_at=datetime.utcnow() + timedelta(hours=config.cache_ttl_hours),
)
db.add(cache_entry)
await db.commit()
FAQ Match: la shortcut más valiosa
Antes de todo el pipeline, verifico si existe una FAQ cuyo embedding sea suficientemente similar al query:
async def match_faq(self, query_embedding, kb_ids, threshold=0.75):
result = await db.execute(
select(FAQ.id, FAQ.question, FAQ.answer,
(1 - FAQ.embedding.cosine_distance(query_embedding)).label("score"))
.where(FAQ.knowledge_base_id.in_(kb_ids), FAQ.is_active == True)
.order_by(FAQ.embedding.cosine_distance(query_embedding))
.limit(1)
)
faq = result.first()
if faq and faq.score >= threshold:
# Incrementar hit_count atómicamente
await db.execute(
update(FAQ).where(FAQ.id == faq.id)
.values(hit_count=FAQ.hit_count + 1)
)
return faq
return None
¿Por qué es tan valioso?
- Costo cero: No llama al LLM, no consume tokens
- Latencia mínima: Una sola query SQL con índice HNSW
- Respuestas curadas: Las FAQs son redactadas por humanos, siempre correctas
- Siempre disponible: Incluso cuando el usuario agotó su cuota mensual de queries LLM, las FAQs siguen funcionando
Detección de idioma sin APIs externas
Mi sistema soporta español, inglés y portugués. En vez de llamar a una API de detección (latencia extra + costo), uso detección heurística:
EN_WORDS = {"how", "what", "the", "is", "can", "do", "where", "when", "why", ...} # 60+
PT_WORDS = {"como", "onde", "você", "qual", "para", "não", "uma", ...} # 45+
ES_WORDS = {"qué", "cómo", "dónde", "ayuda", "puedo", "quiero", ...} # 40+
def detect_language(query: str) -> str:
words = set(query.lower().split())
scores = {
"en": len(words & EN_WORDS),
"pt": len(words & PT_WORDS),
"es": len(words & ES_WORDS),
}
best = max(scores, key=scores.get)
return best if scores[best] >= 2 else "es" # Default: español
¿Es perfecto? No. Pero tiene latencia cero, no depende de servicios externos, y para queries cortas de 5-15 palabras funciona sorprendentemente bien. El idioma detectado se inyecta en el system prompt del LLM para que responda en el mismo idioma.
Smart Routing: elegir la KB correcta automáticamente
Cuando un tenant tiene múltiples Knowledge Bases (ej: "Ventas", "Soporte Técnico", "RRHH"), el usuario no debería tener que elegir manualmente dónde buscar.
Centroides de KB
Para cada KB, calculo el centroide (promedio de todos los embeddings de sus chunks):
SELECT AVG(embedding)::text AS centroid FROM chunks WHERE knowledge_base_id = :kb_id
Routing por similaridad
async def route_query(self, query_embedding, available_kbs):
scores = []
for kb in available_kbs:
centroid = await self._get_centroid(kb.id) # Redis cache 24h
similarity = cosine_similarity(query_embedding, centroid)
if similarity >= 0.15: # Umbral mínimo
scores.append((kb, similarity))
# Top 3 KBs más relevantes
scores.sort(key=lambda x: x[1], reverse=True)
return [kb for kb, _ in scores[:3]]
Si ninguna KB supera el umbral, se busca en todas (fallback seguro).
Números que importan
| Métrica | Valor |
|---|---|
| Candidatos iniciales por rama | 30 (vector) + 30 (BM25) |
| Después de RRF merge | ~40-60 únicos |
| Después de reranking | Top 15 |
| Después de MMR | Top 10 (máx 3 por doc) |
| Dimensiones de embeddings | 384 |
| Threshold FAQ | 0.75 |
| Threshold cache | 0.95 |
| TTL cache | 7 días |
| Latencia FAQ match | ~15ms |
| Latencia pipeline completo | ~2-4s |
| Ahorro por cache hit | ~90% latencia, 100% tokens LLM |
Lecciones aprendidas
1. El cross-encoder vale la pena
La diferencia de calidad entre usar solo embeddings vs. embeddings + cross-encoder reranking es enorme. Los embeddings bi-encoder son buenos para recall (encontrar candidatos), pero el cross-encoder es mucho mejor para precision (ordenar por relevancia real).
2. BM25 no murió
Para queries con nombres propios, códigos de error, o términos técnicos específicos, BM25 regularmente supera a la búsqueda vectorial. La combinación híbrida es estrictamente mejor que cualquiera por separado.
3. MMR es más importante de lo que parece
Sin diversidad MMR, el LLM recibe contexto redundante y produce respuestas que orbitan alrededor de un solo punto. Con MMR, las respuestas son más completas y cubren diferentes ángulos del tema.
4. Cache semántico requiere threshold alto
Con 0.90 tuve problemas de respuestas incorrectas servidas del cache. 0.95 es el sweet spot: suficiente para cachear reformulaciones obvias, pero no tan bajo como para matchear queries que realmente necesitan respuestas diferentes.
5. Las FAQs son tu mejor inversión
Cada FAQ que agregás es una respuesta perfecta que se sirve en 15ms sin costo de LLM. Implementé auto-generación de FAQs sugeridas a partir de queries frecuentes sin respuesta, que el admin puede aprobar con un click.
Lo que sigue
- Evaluation framework: métricas automáticas de calidad (faithfulness, relevance) con datasets de evaluación por KB
- Chunk contextualization: usar el LLM para agregar contexto del documento al chunk antes de indexar
- Late interaction models (ColBERT): mejor que cross-encoder para volúmenes altos
Conclusión
Un pipeline RAG de producción no es "embed + search + prompt". Es un sistema de múltiples etapas donde cada componente resuelve una debilidad específica del anterior. La búsqueda híbrida cubre gaps semánticos y léxicos, el cross-encoder ordena por relevancia real, MMR garantiza diversidad, el cache elimina trabajo redundante, y las FAQs cortan camino cuando la respuesta ya existe.
Lo más importante: todo es configurable por Knowledge Base. No existe un set de parámetros universal — cada dominio de conocimiento tiene sus propias características y necesita tuning diferente.
Si te interesa el tema de RAG en producción, dejá un comentario con tu mayor desafío. Y si este artículo te fue útil, un like ayuda a que llegue a más personas.
Top comments (0)