Cada query RAG cuesta dinero: embedding + tokens de LLM. Implementé tres capas de optimización que reducen el costo real un 30-45%: FAQ matching (respuestas curadas a costo cero), cache semántico (pgvector con similaridad >= 0.95), y auto-generación de FAQs desde queries frecuentes sin respuesta. Todo con código de producción y un fallback que mantiene el servicio activo cuando el presupuesto de LLM se agota.
El problema: cada query cuesta dinero real
En los artículos anteriores construí un pipeline RAG con búsqueda híbrida, reranking y streaming. Funciona bien. Pero en producción:
Query típica: embedding + búsqueda + LLM (~800 tokens) ≈ $0.0003
1,000 queries/día × 30 días = 30,000 queries/mes
30,000 × $0.0003 = ~$9/mes por tenant
50 tenants = ~$450/mes solo en LLM
El insight clave: el 35-40% de las queries son repetidas o muy similares. Cada una es dinero quemado.
Arquitectura de las 3 capas de ahorro
Query del usuario
│
▼
┌─────────────┐
│ FAQ Match │──→ Si score ≥ 0.85: respuesta curada (costo LLM = $0)
└─────┬───────┘
│ No match
▼
┌─────────────────┐
│ Semantic Cache │──→ Si similaridad ≥ 0.95: respuesta cacheada ($0)
└─────┬───────────┘
│ Cache miss
▼
┌─────────────────┐
│ Budget Check │──→ Si presupuesto agotado: fallback FAQ-only
└─────┬───────────┘
│ Budget OK
▼
Pipeline RAG completo → fire-and-forget: guardar en cache
Capa 1: FAQ Matching — la inversión más rentable
async def match_faq(
db: AsyncSession, query: str, kb_ids: list[UUID], threshold: float = 0.85,
) -> dict | None:
query_embedding = embedding_service.embed_query(query)
kb_ids_str = ",".join(f"'{str(kb_id)}'" for kb_id in kb_ids)
stmt = text(f"""
SELECT id, question, answer, attachments,
1 - (embedding <=> CAST(:embedding AS vector)) as score
FROM faqs
WHERE knowledge_base_id IN ({kb_ids_str})
AND is_active = true AND embedding IS NOT NULL
ORDER BY embedding <=> CAST(:embedding AS vector)
LIMIT 1
""")
result = await db.execute(stmt, {"embedding": str(query_embedding)})
row = result.mappings().first()
if not row or float(row["score"]) < threshold:
return None
# Increment hit_count atomically
await db.execute(
update(FAQ).where(FAQ.id == row["id"]).values(hit_count=FAQ.hit_count + 1)
)
await db.commit()
return {"faq_id": str(row["id"]), "question": row["question"],
"answer": row["answer"], "score": float(row["score"])}
El modelo usa un índice HNSW sobre el embedding para que la búsqueda sea ~15ms:
class FAQ(Base):
__tablename__ = "faqs"
embedding = mapped_column(Vector(384), nullable=True)
hit_count = mapped_column(Integer, default=0, server_default="0")
__table_args__ = (
Index("ix_faqs_embedding", "embedding",
postgresql_using="hnsw",
postgresql_ops={"embedding": "vector_cosine_ops"},
postgresql_with={"m": 16, "ef_construction": 64}),
)
Threshold tuning: de 0.75 a 0.85
Empecé con 0.75. Demasiado bajo — obtuve false positives donde "¿Cómo configuro el widget?" matcheaba con "¿Qué es un widget?". Subí a 0.85 y los falsos positivos desaparecieron:
| Query | FAQ almacenada | Score | Match? |
|---|---|---|---|
| "como reseteo mi clave" | "¿Cómo cambio mi contraseña?" | 0.91 | Si |
| "aceptan visa?" | "¿Qué medios de pago aceptan?" | 0.87 | Si |
| "como configuro el widget" | "¿Qué es un widget?" | 0.72 | No |
Cuando hay match, el pipeline completo no se ejecuta:
if faq_match:
yield {"event": "faq_match", "data": json.dumps(faq_match)}
# FAQ matches are free — don't increment billing counters
fire_and_forget_log_usage(
query_type="faq", response_tokens=0, provider=None, ...
)
response_tokens=0 y query_type="faq" — fundamental para las métricas de ahorro.
Capa 2: Cache Semántico — pgvector como cache inteligente
El concepto
Si alguien pregunta "¿Cómo reseteo mi contraseña?" y hace 2 horas otro preguntó "¿Cómo puedo cambiar mi password?", la respuesta es la misma. El cache semántico detecta esa equivalencia comparando embeddings, no strings exactos.
El modelo almacena el embedding del query, la respuesta, las fuentes, y un hash de la configuración RAG:
class ResponseCache(Base):
__tablename__ = "response_cache"
query_embedding = mapped_column(Vector(384), nullable=False)
query_text = mapped_column(String(500), nullable=False)
response_text = mapped_column(Text, nullable=False)
sources = mapped_column(JSONB, nullable=False, default=list)
confidence = mapped_column(Float, nullable=False)
knowledge_base_ids = mapped_column(ARRAY(UUID(as_uuid=True)), nullable=False)
rag_config_hash = mapped_column(String(64), nullable=False)
hit_count = mapped_column(Integer, default=0, server_default="0")
expires_at = mapped_column(DateTime(timezone=True), nullable=False)
Config hash: scoping por configuración
Si el admin cambia parámetros de RAG, las respuestas cacheadas ya no son válidas:
def compute_config_hash(retrieval_config, language: str | None = None) -> str:
key_fields = {
"candidate_k": retrieval_config.candidate_k,
"rerank_top_n": retrieval_config.rerank_top_n,
"top_k": retrieval_config.top_k,
"lambda_param": retrieval_config.lambda_param,
"max_per_doc": retrieval_config.max_per_doc,
"bm25_weight": retrieval_config.bm25_weight,
"language": language or "es",
}
return hashlib.sha256(json.dumps(key_fields, sort_keys=True).encode()).hexdigest()
Lookup: similaridad >= 0.95
CACHE_SIMILARITY_THRESHOLD = 0.95
async def lookup_cache(db, query_embedding, kb_ids, rag_config_hash, tenant_id):
stmt = text(f"""
SELECT id, response_text, sources, confidence,
1 - (query_embedding <=> CAST(:embedding AS vector)) AS similarity
FROM response_cache
WHERE tenant_id = :tid AND expires_at > now()
AND rag_config_hash = :config_hash
AND knowledge_base_ids @> {kb_array}
ORDER BY query_embedding <=> CAST(:embedding AS vector)
LIMIT 1
""")
row = result.mappings().first()
if not row or float(row["similarity"]) < CACHE_SIMILARITY_THRESHOLD:
return None
await db.execute(
text("UPDATE response_cache SET hit_count = hit_count + 1 WHERE id = :cid"),
{"cid": str(row["id"])},
)
return {"response_text": row["response_text"], "sources": row["sources"],
"confidence": float(row["confidence"])}
¿Por qué 0.95? Con 0.90 tuve cache hits incorrectos:
| Query A | Query B | Similaridad | ¿Misma respuesta? |
|---|---|---|---|
| "Cómo exporto a CSV" | "Cómo descargo datos en CSV" | 0.97 | Si |
| "Cómo configuro el webhook" | "Cómo pruebo el webhook" | 0.92 | No |
| "Cómo agrego usuarios" | "Cómo elimino usuarios" | 0.91 | No |
Fire-and-forget storage
No podés bloquear el streaming SSE para guardar en cache:
def fire_and_forget_store_cache(tenant_id, query_embedding, query_text,
kb_ids, rag_config_hash, response_text,
sources, confidence, ttl_hours=168):
async def _store():
try:
async with async_session() as db:
await store_cache(db, tenant_id, query_embedding, ...)
except Exception as e:
logger.error("Failed to store cache: %s", e)
try:
loop = asyncio.get_running_loop()
loop.create_task(_store())
except RuntimeError:
pass # No event loop (tests) — skip
Y la condición para cachear:
if rag_config.retrieval.cache_enabled and kb_ids and not low_confidence:
fire_and_forget_store_cache(...)
not low_confidence: no cacheamos respuestas malas.
Invalidación proactiva: el cache que no miente
async def invalidate_kb_cache(db: AsyncSession, kb_id: UUID) -> int:
result = await db.execute(
text("DELETE FROM response_cache WHERE CAST(:kb_id AS UUID) = ANY(knowledge_base_ids)"),
{"kb_id": str(kb_id)},
)
await db.commit()
return result.rowcount
Se llama automáticamente en cada operación que cambia contenido de una KB:
# En faq_service.py — al crear, actualizar o importar FAQs
async def create_faq(db, kb_id, data):
faq = FAQ(knowledge_base_id=kb_id, ...)
db.add(faq)
await db.commit()
await invalidate_kb_cache(db, kb_id) # Cache muere
return faq
Lo mismo ocurre al subir o reprocesar documentos. Regla simple: si el contenido de una KB cambió, el cache de esa KB muere.
Auto-FAQ: convertir preguntas frecuentes en FAQs reales
El sistema registra queries con baja confianza. Cuando una aparece múltiples veces, el admin puede auto-generar una FAQ:
async def generate_faq_suggestion(db, unanswered_query_id, kb_id, tenant_id):
uq = await db.get(UnansweredQuery, unanswered_query_id)
# Dedup: skip si ya hay sugerencia pendiente o FAQ similar (>= 0.85)
similar_faq = await db.execute(text("""
SELECT 1 - (embedding <=> CAST(:embedding AS vector)) AS score
FROM faqs WHERE knowledge_base_id = :kb_id AND is_active = true
ORDER BY embedding <=> CAST(:embedding AS vector) LIMIT 1
"""), ...)
if faq_row and float(faq_row["score"]) >= 0.85:
return None # Similar FAQ already exists
# Search KB for context, then call LLM
sources = await search_chunks(db, uq.query_text, [kb_id], ...)
result = await generate_playground_response(
query=uq.query_text, sources=sources[:5],
temperature=0.2, max_tokens=512,
system_prompt="Responde SOLO con información del contexto. "
"2-4 oraciones. Si no hay info suficiente: NO_ANSWER"
)
if not result.get("response") or result["response"].strip() == "NO_ANSWER":
return None
return SuggestedFAQ(
tenant_id=tenant_id, knowledge_base_id=kb_id,
question=uq.query_text, generated_answer=result["response"].strip(),
embedding=query_embedding, status="pending",
)
Batch generation prioriza por frecuencia:
async def batch_generate_suggestions(db, tenant_id, kb_id, limit=5):
stmt = text("""
SELECT uq.id FROM unanswered_queries uq
WHERE uq.tenant_id = :tid AND uq.resolved = false
AND NOT EXISTS (
SELECT 1 FROM suggested_faqs sf
WHERE sf.source_query_id = uq.id AND sf.status = 'pending'
)
ORDER BY uq.occurrence_count DESC LIMIT :lim
""")
# Generate suggestions for each...
Al aprobar, se crea la FAQ real, se invalida el cache, y la query se marca como resuelta. Ciclo cerrado.
Fallback FAQ-only: servicio degradado sin corte
Plan Free: 50 queries de IA/mes. Cuando se agotan, las FAQs siguen activas:
async def is_llm_budget_exhausted(db, tenant_id) -> bool:
"""Soft check — enables FAQ-only fallback, does NOT raise."""
plan, tenant = await _get_plan_and_tenant(db, tenant_id)
if not plan:
return False
usage = await get_monthly_usage(db, tenant_id)
if plan.max_messages_month != -1 and usage["messages_month"] >= plan.max_messages_month:
return True
return False
En el flujo principal:
faq_only_mode = await is_llm_budget_exhausted(db, tenant_id)
# FAQ matching funciona siempre
if faq_match:
return faq_response(faq_match) # Gratis
# Sin match + sin presupuesto → upgrade card
if faq_only_mode:
yield {"event": "upgrade_required", "data": json.dumps({
"message": "Has alcanzado el límite de consultas de IA. "
"Las FAQs siguen disponibles sin costo."
})}
Métricas de costo
async def _get_cost_metrics(db, tenant_id, cutoff):
# Tokens by provider
# ...configurable prices from settings...
groq_price = await settings_service.get(db, "cost.groq.price_per_1k_tokens", 0.00027)
# FAQ savings estimation
avg_llm_tokens = total_tokens / max(llm_query_count, 1)
estimated_faq_tokens_saved = int(faq_count * avg_llm_tokens)
estimated_faq_cost_saved = estimated_faq_tokens_saved / 1000 * groq_price
return {
"by_provider": by_provider,
"total_estimated_cost": round(total_cost, 4),
"faq_savings": {
"queries_without_llm": faq_count,
"estimated_tokens_saved": estimated_faq_tokens_saved,
"estimated_cost_saved": round(estimated_faq_cost_saved, 4),
},
"avg_cost_per_conversation": round(total_cost / conv_count, 6),
}
Cache hit rate como 7ma stat card en el dashboard:
async def get_cache_stats(db, tenant_id):
rate_result = await db.execute(text("""
SELECT
COUNT(*) FILTER (WHERE query_type = 'cached') AS cached_queries,
COUNT(*) FILTER (WHERE query_type IN ('chat','widget','cached')) AS total_queries
FROM usage_logs WHERE tenant_id = :tid
"""))
return {"hit_rate": round(cached / total, 3), ...}
El flujo completo en el endpoint de chat
Para ver cómo encajan las 3 capas, este es el orden real en chat.py:
# 1. Budget check (soft — no lanza excepción)
faq_only_mode = await is_llm_budget_exhausted(db, tenant_id)
# 2. FAQ match (funciona siempre, incluso en faq_only_mode)
faq_match = await match_faq(db, data.content, kb_ids, threshold=0.85)
if faq_match:
return EventSourceResponse(faq_event_generator()) # $0
# 3. Budget exhausted + no FAQ match → upgrade card
if faq_only_mode:
yield {"event": "upgrade_required", ...}
return
# 4. Semantic cache lookup
query_embedding = embedding_service.embed_query(effective_query)
config_hash = compute_config_hash(rag_config.retrieval, detected_lang)
cache_hit = await lookup_cache(db, query_embedding, kb_ids, config_hash, tenant_id)
if cache_hit:
return EventSourceResponse(cache_event_generator()) # $0
# 5. Full RAG pipeline (vector + BM25 + rerank + LLM)
# ... streaming response ...
# 6. Fire-and-forget: store in cache for next time
if not low_confidence:
fire_and_forget_store_cache(...)
Números reales
| Métrica | Valor |
|---|---|
| Threshold FAQ | 0.85 |
| Threshold cache | 0.95 |
| TTL cache default | 7 días |
| Latencia FAQ match | ~15ms |
| Latencia cache lookup | ~20ms |
| Latencia pipeline completo | ~2-4s |
| % queries resueltas por FAQ | 15-25% |
| % queries resueltas por cache | 10-20% |
| % queries que llegan al LLM | 55-75% |
| Ahorro estimado total | 30-45% |
Lecciones aprendidas
1. Las FAQs son la mejor inversión
No es sexy. Es una tabla con preguntas y respuestas. Pero cada FAQ es una respuesta perfecta servida en 15ms a costo cero, para siempre.
2. El threshold del cache debe ser muy alto
Con 0.90 tuve falsos positivos que sirvieron respuestas incorrectas. 0.95 es el mínimo seguro.
3. Fire-and-forget es obligatorio para cache storage
El primer intento fue síncrono. Si el INSERT tarda, el último token del streaming se demora. Fire-and-forget elimina esa latencia.
4. La invalidación proactiva vale la pena
Es tentador dejar que el cache expire solo (TTL). Pero el admin que sube un documento espera respuestas actualizadas inmediatamente.
5. Auto-FAQ cierra el ciclo
Sin auto-FAQ, las knowledge gaps se acumulan. Con auto-FAQ, en 2 clicks la pregunta frecuente pasa de gap a FAQ activa.
6. Fallback FAQ-only > cortar el servicio
Los usuarios free siguen haciendo preguntas que matchean FAQs. Reduce churn y da incentivo real para upgradear.
7. Los precios de LLM deben ser configurables
Hardcodear $0.00027/1K tokens es una trampa. Los precios cambian. Guardarlos en una tabla de settings permite ajustarlos sin deploy.
Conclusión
Optimizar costos en RAG es un problema de evitar trabajo innecesario. Las tres capas (FAQ, cache, auto-FAQ) atacan el mismo principio: si la respuesta ya existe, no pagues por generarla de nuevo.
Lo interesante: también mejoran la experiencia. FAQ match en 15ms vs 2-4s del pipeline. Cache hit en 20ms con la misma calidad. Y pgvector ya estaba en el stack — no necesitás Redis ni Elasticsearch.
Este es el quinto artículo de la serie. Si te sirvió, un like ayuda a que llegue a más personas. ¿Tenés preguntas sobre cache semántico o FAQ matching? Dejá un comentario.
Top comments (0)