DEV Community

Moon Robert
Moon Robert

Posted on

Construir Aplicaciones RAG Listas para Producción con Bases de Datos Vectoriales

Construir Aplicaciones RAG Listas para Producción con Bases de Datos Vectoriales

La generación aumentada por recuperación (RAG) ha pasado de ser un concepto académico a convertirse en el patrón arquitectónico más adoptado para construir aplicaciones de IA confiables. Si ya tienes un prototipo funcionando y quieres llevarlo a producción sin que se caiga a los tres días, este tutorial RAG es exactamente lo que necesitas.

En este artículo vas a aprender a diseñar, implementar y escalar un sistema RAG completo usando bases de datos vectoriales como Pinecone y Weaviate, con decisiones de arquitectura justificadas para entornos reales.


¿Qué es la Generación Aumentada por Recuperación y por qué importa?

La generación aumentada por recuperación es una técnica que combina dos capacidades: la búsqueda semántica sobre una base de conocimiento propia y la capacidad generativa de un LLM. En lugar de depender únicamente del conocimiento que el modelo adquirió durante el entrenamiento, el sistema recupera fragmentos de texto relevantes en tiempo real y los incluye en el contexto de la consulta.

El resultado práctico es que puedes construir un chatbot que responde con información actualizada de tu documentación interna, un asistente legal que cita contratos específicos o un sistema de soporte que sabe exactamente qué versión del producto tiene el usuario.

El problema con los tutoriales convencionales de RAG es que se detienen en el prototipo. Un pipeline RAG de producción necesita manejar:

  • Ingestión incremental de documentos sin tiempos de caída
  • Búsqueda con baja latencia bajo carga concurrente
  • Filtrado por metadatos para multitenancy
  • Monitorización de calidad de respuestas
  • Estrategias de chunking que no destruyan el contexto

Vamos a ver todo esto paso a paso.


La Arquitectura de un Sistema RAG Listo para Producción

Antes de escribir una sola línea de código, necesitas entender los componentes y sus responsabilidades.

[Documentos] → [Preprocesamiento] → [Chunking] → [Embedding] → [Base de datos vectorial]
                                                                         ↓
[Usuario] → [Query] → [Embedding de query] → [Búsqueda vectorial] → [Contexto]
                                                                         ↓
                                                               [LLM + Prompt] → [Respuesta]
Enter fullscreen mode Exit fullscreen mode

Cada flecha en ese diagrama es un punto de fallo potencial. Una arquitectura robusta debe gestionar errores, reintentos y degradación controlada en cada uno de ellos.

Componentes críticos

Modelo de embedding: Transforma texto en vectores densos. La elección aquí afecta directamente la calidad de recuperación. text-embedding-3-small de OpenAI ofrece un buen equilibrio entre coste y rendimiento. Para casos donde la privacidad es crítica, nomic-embed-text corriendo localmente es una alternativa sólida.

Base de datos vectorial: Almacena embeddings y permite búsqueda por similitud a escala. Pinecone y Weaviate son las opciones más maduras para producción, aunque con filosofías distintas.

LLM: Genera la respuesta final. GPT-4o, Claude 3.5 Sonnet o Llama 3.1 70B dependiendo de tus restricciones de coste y latencia.


Pinecone vs. Weaviate: Eligiendo tu Base de Datos Vectorial

Esta decisión tiene consecuencias a largo plazo. Aquí tienes las diferencias que realmente importan en producción.

Pinecone

Pinecone es un servicio gestionado que abstrae completamente la infraestructura. No gestionas índices, no configuras shards, no te preocupas por la replicación.

Cuándo usar Pinecone:

  • Tu equipo no tiene experiencia en bases de datos distribuidas
  • Necesitas escalar rápido sin overhead operacional
  • El coste predecible es más importante que el coste absoluto
from pinecone import Pinecone, ServerlessSpec

pc = Pinecone(api_key="tu_api_key")

# Crear índice serverless
pc.create_index(
    name="documentos-produccion",
    dimension=1536,  # dimensión de text-embedding-3-small
    metric="cosine",
    spec=ServerlessSpec(cloud="aws", region="us-east-1")
)

index = pc.Index("documentos-produccion")

# Upsert con metadatos para filtrado
vectors = [
    {
        "id": "doc_001_chunk_0",
        "values": embedding_vector,
        "metadata": {
            "tenant_id": "empresa_abc",
            "documento": "politica_privacidad.pdf",
            "pagina": 1,
            "fecha_ingestion": "2025-01-15"
        }
    }
]

index.upsert(vectors=vectors, namespace="tenant_empresa_abc")
Enter fullscreen mode Exit fullscreen mode

Weaviate

Weaviate es un sistema open-source con una arquitectura más rica. Soporta búsqueda híbrida (vectorial + BM25 keyword) de forma nativa y tiene un esquema flexible orientado a objetos.

Cuándo usar Weaviate:

  • Necesitas búsqueda híbrida sin construirla tú mismo
  • Quieres autohosting para cumplimiento normativo
  • Tus documentos tienen estructura compleja con relaciones entre entidades
import weaviate
from weaviate.classes.config import Configure, Property, DataType

client = weaviate.connect_to_weaviate_cloud(
    cluster_url="tu-cluster.weaviate.network",
    auth_credentials=weaviate.auth.AuthApiKey("tu_api_key")
)

# Definir colección con vectorizador
client.collections.create(
    name="Documentos",
    vectorizer_config=Configure.Vectorizer.text2vec_openai(
        model="text-embedding-3-small"
    ),
    properties=[
        Property(name="contenido", data_type=DataType.TEXT),
        Property(name="tenant_id", data_type=DataType.TEXT),
        Property(name="fuente", data_type=DataType.TEXT),
        Property(name="fecha", data_type=DataType.DATE),
    ]
)
Enter fullscreen mode Exit fullscreen mode

Implementando el Pipeline de Ingestión

El pipeline de ingestión es donde más proyectos de IA en producción cometen errores. La clave está en el chunking y en el diseño para actualizaciones incrementales.

Estrategia de Chunking

El chunking ingenuo divide el texto cada N caracteres. Esto rompe frases, separa preguntas de sus respuestas y destruye el contexto que el LLM necesita para responder bien.

Una estrategia mejor usa chunking semántico con superposición controlada:

from langchain.text_splitter import RecursiveCharacterTextSplitter
from typing import List, Dict
import hashlib

def chunk_documento(texto: str, metadatos: Dict) -> List[Dict]:
    """
    Divide un documento en chunks con superposición semántica.
    Retorna lista de chunks con metadatos enriquecidos.
    """
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=512,
        chunk_overlap=64,
        separators=["\n\n", "\n", ". ", "! ", "? ", " "],
        length_function=len,
    )

    chunks = splitter.split_text(texto)
    resultado = []

    for i, chunk in enumerate(chunks):
        # ID determinístico: si el contenido no cambia, el ID tampoco
        chunk_id = hashlib.md5(
            f"{metadatos['doc_id']}_{i}_{chunk[:50]}".encode()
        ).hexdigest()

        resultado.append({
            "id": chunk_id,
            "contenido": chunk,
            "chunk_index": i,
            "total_chunks": len(chunks),
            **metadatos
        })

    return resultado
Enter fullscreen mode Exit fullscreen mode

Los IDs determinísticos son fundamentales para la ingestión incremental. Si el documento cambia, solo reingestas los chunks modificados.

Pipeline de Ingestión Asíncrono

En producción, la ingestión no puede bloquear el servidor principal. Usa una cola de tareas:

import asyncio
import openai
from typing import List
import backoff

openai_client = openai.AsyncOpenAI()

@backoff.on_exception(
    backoff.expo,
    openai.RateLimitError,
    max_tries=5
)
async def generar_embedding(texto: str) -> List[float]:
    """Genera embedding con reintentos automáticos ante rate limits."""
    response = await openai_client.embeddings.create(
        input=texto,
        model="text-embedding-3-small"
    )
    return response.data[0].embedding

async def ingestar_chunks_batch(chunks: List[Dict], index) -> None:
    """Procesa chunks en batches para respetar límites de API."""
    BATCH_SIZE = 100

    for i in range(0, len(chunks), BATCH_SIZE):
        batch = chunks[i:i + BATCH_SIZE]

        # Generar embeddings en paralelo dentro del batch
        embeddings = await asyncio.gather(*[
            generar_embedding(chunk["contenido"])
            for chunk in batch
        ])

        vectors = [
            {
                "id": chunk["id"],
                "values": embedding,
                "metadata": {k: v for k, v in chunk.items()
                             if k not in ("id", "contenido")}
            }
            for chunk, embedding in zip(batch, embeddings)
        ]

        index.upsert(vectors=vectors)
        print(f"Batch {i // BATCH_SIZE + 1} completado: {len(vectors)} chunks")
Enter fullscreen mode Exit fullscreen mode

El Pipeline de Recuperación y Generación

La recuperación es donde la mayoría de los sistemas RAG dejan puntos de mejora sobre la mesa. Una búsqueda vectorial básica devuelve los K vecinos más cercanos, pero en producción necesitas más control.

Búsqueda con Filtrado por Metadatos

El filtrado por metadatos es esencial para multitenancy. Sin él, un usuario podría recuperar documentos de otro cliente.

async def recuperar_contexto(
    query: str,
    tenant_id: str,
    index,
    top_k: int = 5,
    score_threshold: float = 0.7
) -> List[Dict]:
    """
    Recupera fragmentos relevantes con filtrado por tenant y umbral de score.
    """
    query_embedding = await generar_embedding(query)

    resultados = index.query(
        vector=query_embedding,
        top_k=top_k,
        namespace=f"tenant_{tenant_id}",
        filter={"tenant_id": {"$eq": tenant_id}},
        include_metadata=True
    )

    # Filtrar por score mínimo para evitar contexto irrelevante
    fragmentos_relevantes = [
        match for match in resultados.matches
        if match.score >= score_threshold
    ]

    if not fragmentos_relevantes:
        return []

    return [
        {
            "contenido": match.metadata.get("contenido", ""),
            "fuente": match.metadata.get("fuente", ""),
            "score": match.score
        }
        for match in fragmentos_relevantes
    ]
Enter fullscreen mode Exit fullscreen mode

Construcción del Prompt y Generación

El prompt es la interfaz entre la recuperación y el LLM. Un prompt mal diseñado anula todo el trabajo anterior.

from anthropic import AsyncAnthropic

anthropic_client = AsyncAnthropic()

SYSTEM_PROMPT = """Eres un asistente especializado que responde preguntas basándose \
exclusivamente en la información proporcionada en el contexto.

Reglas:
1. Usa SOLO la información del contexto para responder.
2. Si el contexto no contiene información suficiente, indícalo explícitamente.
3. Cita la fuente cuando sea relevante.
4. Sé conciso y directo.
5. No inventes información que no esté en el contexto."""

async def generar_respuesta_rag(
    query: str,
    fragmentos: List[Dict]
) -> str:
    """Genera respuesta usando el contexto recuperado."""

    if not fragmentos:
        return "No encontré información relevante en la base de conocimiento para responder tu pregunta."

    # Construir contexto formateado
    contexto = "\n\n---\n\n".join([
        f"[Fuente: {f['fuente']} | Relevancia: {f['score']:.2f}]\n{f['contenido']}"
        for f in fragmentos
    ])

    mensaje_usuario = f"""Contexto disponible:
{contexto}

Pregunta: {query}"""

    response = await anthropic_client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=1024,
        system=SYSTEM_PROMPT,
        messages=[
            {"role": "user", "content": mensaje_usuario}
        ]
    )

    return response.content[0].text
Enter fullscreen mode Exit fullscreen mode

Orquestando el Pipeline Completo

async def rag_query(
    query: str,
    tenant_id: str,
    index
) -> Dict:
    """
    Pipeline RAG completo: recuperación + generación + metadatos de debug.
    """
    import time
    inicio = time.time()

    # Recuperación
    fragmentos = await recuperar_contexto(query, tenant_id, index)
    tiempo_recuperacion = time.time() - inicio

    # Generación
    respuesta = await generar_respuesta_rag(query, fragmentos)
    tiempo_total = time.time() - inicio

    return {
        "respuesta": respuesta,
        "fuentes": [f["fuente"] for f in fragmentos],
        "fragmentos_recuperados": len(fragmentos),
        "latencia_recuperacion_ms": round(tiempo_recuperacion * 1000),
        "latencia_total_ms": round(tiempo_total * 1000)
    }
Enter fullscreen mode Exit fullscreen mode

Evaluación y Monitorización: Lo que Separa el Prototipo de la Producción

Puedes tener el mejor pipeline RAG del mundo, pero si no mides la calidad de las respuestas, no sabrás cuándo falla.

Métricas Clave

Faithfulness (fidelidad): ¿La respuesta generada está respaldada por el contexto recuperado? Una respuesta fiel no añade información que no esté en los fragmentos.

Answer Relevance: ¿La respuesta aborda la pregunta del usuario? Puedes medirla generando preguntas a partir de la respuesta y verificando que coincidan con la consulta original.

Context Recall: ¿El contexto recuperado contiene la información necesaria para responder?

Implementación con RAGAS

RAGAS es la librería de referencia para evaluar sistemas RAG de forma automática:

from ragas import evaluate
from ragas.metrics import faithfulness, answer_relevancy, context_recall
from datasets import Dataset

# Preparar dataset de evaluación
datos_evaluacion = {
    "question": ["¿Cuál es la política de devoluciones?"],
    "answer": [respuesta_generada],
    "contexts": [[f["contenido"] for f in fragmentos]],
    "ground_truth": ["La política de devoluciones permite..."]  # respuesta ideal
}

dataset = Dataset.from_dict(datos_evaluacion)

resultados = evaluate(
    dataset=dataset,
    metrics=[faithfulness, answer_relevancy, context_recall]
)

print(resultados)
# Output: {'faithfulness': 0.92, 'answer_relevancy': 0.88, 'context_recall': 0.85}
Enter fullscreen mode Exit fullscreen mode

Logging Estructurado para Debugging

En producción, cada query debe generar un log estructurado que permita diagnosticar fallos:

import structlog
import uuid

logger = structlog.get_logger()

async def rag_query_con_logging(query: str, tenant_id: str, index) -> Dict:
    request_id = str(uuid.uuid4())

    log = logger.bind(
        request_id=request_id,
        tenant_id=tenant_id,
        query_length=len(query)
    )

    log.info("rag_query_iniciada")

    try:
        resultado = await rag_query(query, tenant_id, index)

        log.info(
            "rag_query_completada",
            fragmentos_recuperados=resultado["fragmentos_recuperados"],
            latencia_total_ms=resultado["latencia_total_ms"]
        )

        return {**resultado, "request_id": request_id}

    except Exception as e:
        log.error("rag_query_fallida", error=str(e), error_type=type(e).__name__)
        raise
Enter fullscreen mode Exit fullscreen mode

Optimizaciones para Escala

Una vez que el sistema funciona correctamente en local, estos son los cuellos de botella que vas a encontrar primero.

Caché de Embeddings

Generar embeddings tiene coste y latencia. Si los mismos documentos se consultan repetidamente, cachea los embeddings:

import redis
import json
import hashlib

redis_client = redis.Redis(host="localhost", port=6379, decode_responses=True)

async def generar_embedding_con_cache(texto: str) -> List[float]:
    cache_key = f"emb:{hashlib.md5(texto.encode()).hexdigest()}"

    cached = redis_client.get(cache_key)
    if cached:
        return json.loads(cached)

    embedding = await generar_embedding(texto)

    # TTL de 24 horas para embeddings de queries frecuentes
    redis_client.setex(cache_key, 86400, json.dumps(embedding))
    return embedding
Enter fullscreen mode Exit fullscreen mode

Búsqueda Híbrida en Weaviate

Para documentos con terminología técnica específica, la búsqueda puramente vectorial puede fallar con nombres propios o códigos de producto. La búsqueda híbrida combina vectorial con BM25:

def busqueda_hibrida_weaviate(client, query: str, tenant_id: str, limit: int = 5):
    coleccion = client.collections.get("Documentos")

    resultados = coleccion.query.hybrid(
        query=query,
        alpha=0.75,  # 0 = solo BM25, 1 = solo vectorial
        limit=limit,
        filters=weaviate.classes.query.Filter.by_property("tenant_id").equal(tenant_id),
        return_metadata=weaviate.classes.query.MetadataQuery(score=True)
    )

    return [
        {
            "contenido": obj.properties["contenido"],
            "fuente": obj.properties["fuente"],
            "score": obj.metadata.score
        }
        for obj in resultados.objects
    ]
Enter fullscreen mode Exit fullscreen mode

Conclusiones y Próximos Pasos

Construir un sistema RAG para producción no es difícil, pero requiere pensar en cada componente más allá del caso feliz. El chunking determina qué tan bien se recupera la información. Los IDs determinísticos permiten actualizaciones incrementales sin reingestar todo. Los filtros por metadatos son imprescindibles para multitenancy. Y la evaluación continua con RAGAS te permite detectar degradaciones antes de que los usuarios las reporten.

El siguiente paso recomendado para llevar tu sistema al siguiente nivel es implementar reranking: después de recuperar los top-K fragmentos, pasa los resultados por un modelo de reranking (como Cohere Rerank o un cross-encoder local) antes de enviarlo al LLM. Esto mejora significativamente la precision de los fragmentos seleccionados.

La IA en producción no es un estado que alcanzas, es una práctica continua de medición, iteración y mejora. Con la arquitectura correcta desde el principio, cada iteración te cuesta tiempo de ingeniería, no tiempo de caída.


¿Tienes preguntas sobre cómo adaptar este tutorial RAG a tu caso de uso específico? Los comentarios están abiertos.

Top comments (0)