Ritwika Kancharla
Building a Production-Grade RAG System (Not Just a Demo)

It's easy to build a RAG prototype that impresses in a notebook. It's much harder to build one that holds up in production — one that handles 100,000 documents instead of a hundred, recovers gracefully from failures, and gives you actual visibility into what's going wrong when it does.

This is the article for the second kind.


What "Production-Grade" Actually Means

Before we write any code, it's worth being precise about the target. A demo RAG system works on your laptop, handles a small corpus, and "looks right" to whoever's watching. A production RAG system does something fundamentally different: it's measured, monitored, and improvable. It handles load, recovers from failures, and can be understood by a teammate who didn't build it.

The architecture that gets you there has four layers:

┌─────────────────────────────────────────┐
│           DOCUMENT PIPELINE             │
│  Ingest → Chunk → Embed → Index         │
│  (Batch jobs, idempotent, monitored)    │
└─────────────────────────────────────────┘
                    ↓
┌─────────────────────────────────────────┐
│           RETRIEVAL LAYER               │
│  Query → Embed → Search → Rerank        │
│  (Cached, filtered, logged)             │
└─────────────────────────────────────────┘
                    ↓
┌─────────────────────────────────────────┐
│           GENERATION LAYER              │
│  Prompt → LLM → Post-process → Stream   │
│  (Guardrailed, traced, evaluated)       │
└─────────────────────────────────────────┘
                    ↓
┌─────────────────────────────────────────┐
│           OBSERVABILITY                 │
│  Metrics → Logs → Evals → Alerts        │
│  (You actually know when it breaks)     │
└─────────────────────────────────────────┘

Let's build each one properly.


Part 1: Document Ingestion Pipeline

Chunking: The Strategy Nobody Thinks About Until It's Too Late

Most people grab a text splitter, pick an arbitrary chunk size, and move on. This works until you're debugging why your system can't answer questions whose answers the documents clearly contain.

The right mental model: one chunk = one answerable unit. A chunk should contain enough context to stand alone as the answer to some question. Too small and you lose context; too large and you dilute the signal.

from langchain.text_splitter import RecursiveCharacterTextSplitter

splitter = RecursiveCharacterTextSplitter(
    chunk_size=500,        # Characters, not tokens
    chunk_overlap=50,      # Preserves context across boundaries
    separators=["\n\n", "\n", ". ", " ", ""],  # Tries these in order
    length_function=len,
)

chunks = splitter.split_text(long_document)

The RecursiveCharacterTextSplitter is the right default: it respects document structure, splitting on paragraphs before sentences before words. Fixed-size splitters will happily cleave a sentence in half.
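To see why separator priority matters, here's a toy reimplementation of the recursive strategy in plain Python — not LangChain's actual code, just a sketch of the idea: try each separator in order, pack pieces up to the size limit, and only hard-cut when nothing else works.

```python
def recursive_split(text: str, chunk_size: int,
                    separators=("\n\n", "\n", ". ", " ")) -> list:
    """Split on the highest-priority separator present, recursing into
    any piece that is still too large. Hard-cut only as a last resort."""
    if len(text) <= chunk_size:
        return [text]

    for idx, sep in enumerate(separators):
        if sep not in text:
            continue
        chunks, current = [], ""
        for piece in text.split(sep):
            candidate = (current + sep + piece) if current else piece
            if len(candidate) <= chunk_size:
                current = candidate  # Keep packing pieces into this chunk
            else:
                if current:
                    chunks.append(current)
                if len(piece) > chunk_size:
                    # Piece alone is too big: recurse with lower-priority separators
                    chunks.extend(recursive_split(piece, chunk_size,
                                                  separators[idx + 1:]))
                    current = ""
                else:
                    current = piece
        if current:
            chunks.append(current)
        return chunks

    # No separator found anywhere: cut at fixed offsets
    return [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)]
```

Notice that a fixed-size splitter would take the last branch immediately — that's the sentence-cleaving behavior you're avoiding.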

Metadata: Store It Now, Thank Yourself Later

Every chunk needs metadata attached at ingestion time. You will want to filter by source, date, and document type in production, and retrofitting that metadata later is painful.

def process_document(doc: dict) -> list:
    chunks = splitter.split_text(doc["content"])

    return [
        {
            "id": f"{doc['source_id']}_{i}",
            "text": chunk,
            "metadata": {
                "source": doc["source"],
                "created_at": doc["timestamp"],
                "chunk_index": i,
                "total_chunks": len(chunks),
                "section": extract_heading(chunk),
                "doc_type": classify_doc_type(chunk),  # FAQ, tutorial, reference, etc.
            }
        }
        for i, chunk in enumerate(chunks)
    ]

Embedding: Batch and Cache

Embedding is where your API costs live. Two habits that pay off immediately: batching and caching.

from openai import OpenAI
import hashlib
import diskcache

client = OpenAI()
cache = diskcache.Cache("./embedding_cache")

def embed_with_cache(texts: list) -> list:
    # Preserve input order: fill cached results immediately, leave None
    # placeholders for texts that still need an API call.
    embeddings = [None] * len(texts)
    to_embed = []  # (position, cache_key, text)

    for i, text in enumerate(texts):
        key = hashlib.md5(text.encode()).hexdigest()
        if key in cache:
            embeddings[i] = cache[key]
        else:
            to_embed.append((i, key, text))

    if to_embed:
        response = client.embeddings.create(
            model="text-embedding-3-large",
            input=[t[2] for t in to_embed]
        )
        # The API returns embeddings in input order, so zip is safe here
        for (i, key, _), item in zip(to_embed, response.data):
            cache[key] = item.embedding
            embeddings[i] = item.embedding

    return embeddings

The sweet spot for batch size is 100–500 texts per API call. Don't embed one text at a time.
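Driving those calls in batches is a three-line helper. The default of 256 is an assumption sitting inside that 100–500 range:

```python
def batched(items: list, batch_size: int = 256):
    """Yield successive fixed-size slices of a list; the final batch
    may be smaller."""
    for i in range(0, len(items), batch_size):
        yield items[i:i + batch_size]
```

Feed each batch to `embed_with_cache` instead of looping over individual texts.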

Choosing a Vector Store

| Store | Best For |
| --- | --- |
| Chroma | Prototyping and smaller corpora (<100K docs) |
| Pinecone | Managed production scale with metadata filtering |
| Weaviate | Complex graph-like queries |
| pgvector | When you already have Postgres and want one database |
| FAISS | Batch/research use cases needing GPU acceleration |

For most teams starting out, Chroma gets you running fast. Pinecone is the natural migration target when you need managed scale.

Idempotent Ingestion

Re-running your ingestion pipeline shouldn't create duplicates. This sounds obvious, but it's the kind of thing that bites you the first time you need to re-index after a bug fix.

def ingest_documents(new_docs: list):
    # Chroma's get() returns only the ids it actually found
    existing = collection.get(ids=[d["id"] for d in new_docs])
    existing_ids = set(existing["ids"])

    to_add = [d for d in new_docs if d["id"] not in existing_ids]
    to_update = [d for d in new_docs if d["id"] in existing_ids]

    if to_add:
        collection.add(
            ids=[d["id"] for d in to_add],
            documents=[d["text"] for d in to_add],
            metadatas=[d["metadata"] for d in to_add],
        )

    for doc in to_update:
        if content_changed(doc):  # Compare content hashes
            collection.delete(ids=[doc["id"]])
            collection.add(
                ids=[doc["id"]],
                documents=[doc["text"]],
                metadatas=[doc["metadata"]],
            )
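The `content_changed` check can be a plain hash comparison. This sketch assumes a `stored_hashes` mapping of chunk id to content hash; in a real pipeline you'd record that hash in the chunk's metadata at ingestion time rather than in a module-level dict:

```python
import hashlib

# Assumed bookkeeping: chunk id -> content hash recorded at last ingestion.
# In practice this lives in chunk metadata, not a global dict.
stored_hashes: dict = {}

def content_hash(text: str) -> str:
    return hashlib.sha256(text.encode("utf-8")).hexdigest()

def content_changed(doc: dict) -> bool:
    """True if the chunk's text differs from what was last indexed
    (an unknown id counts as changed)."""
    return content_hash(doc["text"]) != stored_hashes.get(doc["id"])
```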

Part 2: Retrieval Layer

Over-Fetch, Then Rerank

Vector similarity is good at finding roughly relevant chunks. It's not as good at ranking them. The solution is to over-fetch — grab 2–3x more candidates than you need — and then rerank with a cross-encoder.

class RetrievalEngine:
    def search(self, query: str, filters: dict = None, top_k: int = 10) -> list:
        query_emb = self.embedder.embed(query)

        results = self.collection.query(
            query_embeddings=[query_emb],
            n_results=top_k * 2,  # Over-fetch, then rerank down to top_k
            where=filters,
            include=["documents", "metadatas", "distances"]
        )

        # Flatten Chroma's columnar result into one dict per candidate,
        # which is the shape the reranker expects
        candidates = [
            {"id": cid, "text": text, "metadata": meta, "distance": dist}
            for cid, text, meta, dist in zip(
                results["ids"][0],
                results["documents"][0],
                results["metadatas"][0],
                results["distances"][0],
            )
        ]

        reranked = self.rerank(query, candidates, top_k)
        self.log_query(query, candidates, reranked)

        return reranked

A cross-encoder scores each query–document pair jointly, which is more accurate than a bi-encoder embedding comparison. The tradeoff is speed, but since you're only reranking a small candidate set, it's fast enough:

from sentence_transformers import CrossEncoder

reranker = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")

def rerank(query: str, candidates: list, top_k: int) -> list:
    pairs = [(query, doc["text"]) for doc in candidates]
    scores = reranker.predict(pairs)

    for doc, score in zip(candidates, scores):
        doc["rerank_score"] = score

    return sorted(candidates, key=lambda x: x["rerank_score"], reverse=True)[:top_k]

In most benchmarks, reranking improves precision@5 by 15–25%. It's one of the highest-ROI improvements you can make.
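If you want to verify that on your own data rather than take the benchmark numbers on faith, precision@k is a few lines to compute — run it over the candidate list before and after reranking:

```python
def precision_at_k(retrieved_ids: list, relevant_ids: set, k: int = 5) -> float:
    """Fraction of the top-k retrieved documents that are relevant.
    Divides by k even when fewer than k documents were retrieved."""
    top = retrieved_ids[:k]
    return sum(1 for doc_id in top if doc_id in relevant_ids) / k
```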

Query Rewriting for Conversational Context

Users in a multi-turn conversation say things like "how do I fix it?" without specifying what "it" is. Retrieval breaks down on pronouns and context-dependent references.

The fix is a short LLM call that rewrites the query to be self-contained before searching:

def rewrite_query(query: str, conversation_history: list) -> str:
    prompt = f"""
    Rewrite this query to be self-contained and specific for search.

    History: {conversation_history[-3:]}
    Current query: {query}

    Rules:
    - Replace "this", "it", "that" with specific nouns from history
    - Add relevant context from the conversation
    - Make it keyword-friendly, not conversational

    Rewritten query:
    """

    return llm.generate(prompt)

So "How do I fix it?" becomes "How to fix Docker build failure: no space left on device" — something the vector store can actually work with.


Part 3: Generation Layer

Prompt Structure Beats Prompt Cleverness

There's a lot of mythology around prompt engineering. In practice, the highest-value thing you can do for RAG prompts is give the model clear, structured instructions with explicit fallback behavior:

RAG_PROMPT = """You are a helpful assistant. Answer based on the provided context.

CONTEXT:
{context}

USER QUESTION:
{question}

INSTRUCTIONS:
1. Answer using ONLY the context provided
2. If the context doesn't contain the answer, say "I don't have that information"
3. Cite your sources with [1], [2], etc.
4. Be concise but complete

ANSWER:
"""

def format_context(docs: list) -> str:
    return "\n\n".join([
        f"[{i+1}] {d['metadata']['source']}: {d['text'][:500]}"
        for i, d in enumerate(docs)
    ])

The explicit "say you don't know" instruction is critical. Without it, models will hallucinate confident answers from thin context.

Guardrails: Catch Bad Outputs Before Users See Them

A guardrail layer runs checks on every response before it goes to the user. Start simple — you can make this as sophisticated as you need over time:

import re

class OutputGuardrail:
    def check(self, response: str, sources: list) -> dict:
        issues = []

        # Hallucinated citations (model invented a source number that doesn't exist)
        citations = re.findall(r'\[(\d+)\]', response)
        for c in citations:
            if int(c) > len(sources):
                issues.append(f"Invalid citation [{c}]")

        # Excessive hedging (often signals the model is guessing)
        weasel_words = ["might", "maybe", "possibly", "could be"]
        if sum(w in response.lower() for w in weasel_words) > 2:
            issues.append("Low confidence language detected")

        # Suspiciously short responses
        if len(response) < 20:
            issues.append("Response too short")

        return {
            "passed": len(issues) == 0,
            "issues": issues,
            "suggested_action": "retry" if issues else "proceed"
        }

Streaming Makes Everything Feel Faster

Users perceive a system that starts showing output immediately as dramatically faster than one that makes them wait for a complete response — even if total latency is similar.

def generate_streaming(prompt: str):
    stream = client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )

    for chunk in stream:
        if chunk.choices[0].delta.content:
            yield chunk.choices[0].delta.content

First token in ~200ms instead of a 2-second wait. This is a perception win, not a performance hack.
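To actually log time-to-first-token, wrap the token generator. The caller-supplied `metrics` dict here is a hypothetical convention of this sketch for getting the measurement back out of a generator:

```python
import time

def stream_with_ttft(token_stream, metrics: dict):
    """Yield tokens unchanged while recording time-to-first-token
    (milliseconds) into the caller-supplied metrics dict."""
    start = time.perf_counter()
    for i, token in enumerate(token_stream):
        if i == 0:
            metrics["ttft_ms"] = (time.perf_counter() - start) * 1000
        yield token
```

Wire it around `generate_streaming(prompt)` and include `ttft_ms` in your interaction logs.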


Part 4: Observability

If you don't measure it, you can't improve it. Here are the metrics that actually matter for RAG:

| Category | Metric | Why It Matters |
| --- | --- | --- |
| Retrieval | MRR, NDCG@5, Precision@K | Is search finding the right chunks? |
| Generation | Faithfulness, citation accuracy | Is the LLM staying grounded? |
| Latency | P50, P95, time-to-first-token | Is it fast enough for real use? |
| Business | User satisfaction, task completion | Is it actually useful? |
| Cost | Tokens per query, embedding costs | Can you afford to run it? |
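For the latency row, P50 and P95 are just percentiles over your latency samples; a nearest-rank implementation is enough to start with before you reach for a metrics library:

```python
import math

def percentile(values: list, p: float) -> float:
    """Nearest-rank percentile (p in [0, 100]) over a list of samples."""
    if not values:
        raise ValueError("no samples")
    ordered = sorted(values)
    rank = math.ceil(p / 100 * len(ordered))  # 1-based nearest rank
    return ordered[max(rank - 1, 0)]
```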

Structured Logging You Can Actually Query

Write logs as NDJSON. Every line is a complete, valid JSON object. BigQuery, Elasticsearch, and most log aggregators love this format.

import hashlib
import json
from datetime import datetime, timezone

def log_interaction(query: str, retrieved: list, response: str, latency: float):
    log_entry = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "query": query,
        "query_hash": hashlib.md5(query.encode()).hexdigest(),
        "num_retrieved": len(retrieved),
        "retrieved_sources": [r["metadata"]["source"] for r in retrieved],
        "response_length": len(response),
        "latency_ms": latency,
        "guardrail_issues": check_guardrails(response, retrieved),
    }

    with open("rag_logs.ndjson", "a") as f:
        f.write(json.dumps(log_entry) + "\n")
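The payoff of NDJSON is that ad-hoc analysis is a short script even before the logs reach a warehouse. For example, pulling every interaction over a latency threshold:

```python
import json

def slow_queries(log_path: str, threshold_ms: float = 2000.0) -> list:
    """Scan an NDJSON log file line by line and return every entry
    whose latency exceeds the threshold."""
    slow = []
    with open(log_path) as f:
        for line in f:
            entry = json.loads(line)
            if entry["latency_ms"] > threshold_ms:
                slow.append(entry)
    return slow
```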

Automated Daily Evaluation

Build a golden dataset of (query, expected relevant document IDs) pairs. Run it daily. Alert on regression.

import numpy as np

class RAGEvaluator:
    def evaluate_retrieval(self) -> dict:
        scores = []

        for item in self.golden:
            results = retrieval_engine.search(item["query"])
            retrieved_ids = [r["id"] for r in results]
            scores.append(calculate_mrr(retrieved_ids, item["relevant_ids"]))

        return {
            "mrr_mean": np.mean(scores),
            "mrr_p10": np.percentile(scores, 10),
            "mrr_p90": np.percentile(scores, 90),
        }

    def run_daily_eval(self):
        metrics = self.evaluate_retrieval()

        if metrics["mrr_mean"] < BASELINE_MRR * 0.95:
            send_alert(f"Retrieval MRR dropped to {metrics['mrr_mean']:.3f}")

        log_to_datadog(metrics)

A 5% regression threshold is a reasonable starting point. Tighten it as your system matures and baselines stabilize.
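The `calculate_mrr` helper the evaluator relies on is small enough to write yourself — it's the reciprocal rank of the first relevant hit:

```python
def calculate_mrr(retrieved_ids: list, relevant_ids: set) -> float:
    """Reciprocal rank of the first relevant document in the results;
    0.0 if nothing relevant was retrieved."""
    for rank, doc_id in enumerate(retrieved_ids, start=1):
        if doc_id in relevant_ids:
            return 1.0 / rank
    return 0.0
```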


Putting It All Together

Here's the full query path, assembled:

import time

class ProductionRAG:
    def query(self, user_query: str, conversation_history: list = None) -> dict:
        start_time = time.time()

        # Rewrite query if we have conversation context
        search_query = (
            rewrite_query(user_query, conversation_history)
            if conversation_history
            else user_query
        )

        # Retrieve with reranking
        retrieved = self.retrieval.search(search_query, top_k=5)

        # Build and run prompt with streaming
        prompt = RAG_PROMPT.format(
            context=format_context(retrieved),
            question=user_query
        )

        response = "".join(generate_streaming(prompt))

        # Guardrail check
        guardrail_result = self.guardrail.check(response, retrieved)
        if not guardrail_result["passed"]:
            response = "I need to verify some details before I can answer this confidently."

        # Log everything
        latency = (time.time() - start_time) * 1000
        log_interaction(user_query, retrieved, response, latency)

        return {
            "response": response,
            "sources": [r["metadata"]["source"] for r in retrieved],
            "latency_ms": latency,
            "guardrail_passed": guardrail_result["passed"]
        }

Before You Ship: The Checklist

  • [ ] Chunking strategy documented and tested against real queries
  • [ ] Metadata schema versioned and consistent across documents
  • [ ] Idempotent ingestion — re-running never creates duplicates
  • [ ] Embedding cache reducing API costs on repeated content
  • [ ] Reranking improving precision over raw vector similarity
  • [ ] Query rewriting handling ambiguous, conversational queries
  • [ ] Guardrails catching bad outputs before users see them
  • [ ] Streaming enabled for perceived performance
  • [ ] Structured NDJSON logging queryable in your data stack
  • [ ] Daily automated evaluation against a golden dataset
  • [ ] Alerts configured for metric regression
  • [ ] Health checks for load balancer integration
  • [ ] Runbook written for the three most likely failure modes

Where to Go From Here

Once this foundation is solid, the natural next steps are:

Hybrid search — combine vector search with BM25 keyword search. Purely vector-based retrieval underperforms on keyword-heavy queries (product names, error codes, proper nouns).
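A common way to merge the vector and BM25 result lists is reciprocal rank fusion, which needs only ranks — not comparable scores — from each retriever. A sketch (k=60 is a commonly used default for the fusion constant):

```python
def reciprocal_rank_fusion(rankings: list, k: int = 60) -> list:
    """Fuse multiple ranked lists of document ids into one ranking.
    Each document scores sum(1 / (k + rank)) across the lists it
    appears in; higher total score ranks first."""
    scores = {}
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    return sorted(scores, key=scores.get, reverse=True)
```

Documents that appear in both lists get boosted, which is exactly the behavior you want when a query matches both semantically and on keywords.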

Multi-tenancy — separate collections per customer, with per-tenant metadata filtering. Don't let one customer's documents bleed into another's retrieval results.

Continuous indexing — webhook-driven updates instead of scheduled batch jobs. New documents show up in retrieval within seconds, not hours.

A/B testing — route 10% of traffic to a new embedding model and measure retrieval metrics before committing. This is the only rigorous way to evaluate embedding model changes.
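Deterministic, hash-based assignment keeps each user in the same bucket across requests, so retrieval metrics stay attributable to one variant. A sketch of the 10% routing (the function name and `treatment`/`control` labels are my own):

```python
import hashlib

def assign_variant(user_id: str, treatment_pct: int = 10) -> str:
    """Deterministically route a stable percentage of users to the
    new embedding model by hashing their id into one of 100 buckets."""
    bucket = int(hashlib.md5(user_id.encode()).hexdigest(), 16) % 100
    return "treatment" if bucket < treatment_pct else "control"
```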


The difference between a RAG prototype and a RAG system is mostly about the plumbing nobody sees: idempotent pipelines, structured logs, evaluation harnesses, guardrails. It's less glamorous than the retrieval algorithm, but it's what determines whether the thing is still working correctly six months after you shipped it.

Build the plumbing first.
