It's easy to build a RAG prototype that impresses in a notebook. It's much harder to build one that holds up in production — one that handles 100,000 documents instead of a hundred, recovers gracefully from failures, and gives you actual visibility into what's going wrong when it does.
This is the article for the second kind.
## What "Production-Grade" Actually Means
Before we write any code, it's worth being precise about the target. A demo RAG system works on your laptop, handles a small corpus, and "looks right" to whoever's watching. A production RAG system does something fundamentally different: it's measured, monitored, and improvable. It handles load, recovers from failures, and can be understood by a teammate who didn't build it.
The architecture that gets you there has four layers:
```
┌─────────────────────────────────────────┐
│            DOCUMENT PIPELINE            │
│     Ingest → Chunk → Embed → Index      │
│   (Batch jobs, idempotent, monitored)   │
└─────────────────────────────────────────┘
                    ↓
┌─────────────────────────────────────────┐
│             RETRIEVAL LAYER             │
│     Query → Embed → Search → Rerank     │
│       (Cached, filtered, logged)        │
└─────────────────────────────────────────┘
                    ↓
┌─────────────────────────────────────────┐
│            GENERATION LAYER             │
│  Prompt → LLM → Post-process → Stream   │
│    (Guardrailed, traced, evaluated)     │
└─────────────────────────────────────────┘
                    ↓
┌─────────────────────────────────────────┐
│              OBSERVABILITY              │
│     Metrics → Logs → Evals → Alerts     │
│   (You actually know when it breaks)    │
└─────────────────────────────────────────┘
```
Let's build each one properly.
## Part 1: Document Ingestion Pipeline

### Chunking: The Strategy Nobody Thinks About Until It's Too Late
Most people grab a text splitter, pick an arbitrary chunk size, and move on. This works until you're debugging why your system can't answer questions the documents clearly contain.
The right mental model: one chunk = one answerable unit. A chunk should contain enough context to stand alone as the answer to some question. Too small and you lose context; too large and you dilute the signal.
```python
from langchain.text_splitter import RecursiveCharacterTextSplitter

splitter = RecursiveCharacterTextSplitter(
    chunk_size=500,     # Characters, not tokens
    chunk_overlap=50,   # Preserves context across boundaries
    separators=["\n\n", "\n", ". ", " ", ""],  # Tries these in order
    length_function=len,
)

chunks = splitter.split_text(long_document)
```
The RecursiveCharacterTextSplitter is the right default: it respects document structure, splitting on paragraphs before sentences before words. Fixed-size splitters will happily cleave a sentence in half.
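The separator-priority idea is worth internalizing, and it's simple enough to sketch without the library. This toy version — a sketch for intuition, not LangChain's actual implementation, and unlike the library it discards the separator text — tries each separator in order and only falls through to finer ones for pieces that are still too long:

```python
def recursive_split(text: str, chunk_size: int, separators: list) -> list:
    """Toy recursive splitter: split on the coarsest separator first,
    then recurse into any piece that is still too long."""
    if len(text) <= chunk_size or not separators:
        return [text]
    sep = separators[0]
    pieces = text.split(sep) if sep else list(text)
    chunks = []
    for piece in pieces:
        if len(piece) <= chunk_size:
            chunks.append(piece)
        else:
            chunks.extend(recursive_split(piece, chunk_size, separators[1:]))
    return chunks

doc = "First paragraph.\n\nSecond, much longer paragraph. With two sentences."
print(recursive_split(doc, 40, ["\n\n", ". ", " "]))
# → ['First paragraph.', 'Second, much longer paragraph', 'With two sentences.']
```

Note how the short first paragraph survives intact while only the over-long piece gets split at the sentence level — that's the structure-respecting behavior the fixed-size splitters lack.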
### Metadata: Store It Now, Thank Yourself Later
Every chunk needs metadata attached at ingestion time. You will want to filter by source, date, and document type in production, and retrofitting that metadata later is painful.
```python
def process_document(doc: dict) -> list:
    chunks = splitter.split_text(doc["content"])
    return [
        {
            "id": f"{doc['source_id']}_{i}",
            "text": chunk,
            "metadata": {
                "source": doc["source"],
                "created_at": doc["timestamp"],
                "chunk_index": i,
                "total_chunks": len(chunks),
                "section": extract_heading(chunk),
                "doc_type": classify_doc_type(chunk),  # FAQ, tutorial, reference, etc.
            },
        }
        for i, chunk in enumerate(chunks)
    ]
```
### Embedding: Batch and Cache
Embedding is where your API costs live. Two habits that pay off immediately: batching and caching.
```python
from openai import OpenAI
import hashlib
import diskcache

client = OpenAI()
cache = diskcache.Cache("./embedding_cache")

def embed_with_cache(texts: list) -> list:
    embeddings = [None] * len(texts)  # Preserve input order
    to_embed = []                     # (position, cache_key, text)
    for i, text in enumerate(texts):
        key = hashlib.md5(text.encode()).hexdigest()
        if key in cache:
            embeddings[i] = cache[key]
        else:
            to_embed.append((i, key, text))
    if to_embed:
        response = client.embeddings.create(
            model="text-embedding-3-large",
            input=[text for _, _, text in to_embed],
        )
        for (i, key, _), item in zip(to_embed, response.data):
            cache[key] = item.embedding
            embeddings[i] = item.embedding
    return embeddings
```
The sweet spot for batch size is 100–500 texts per API call. Don't embed one text at a time.
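Batching is mechanical enough to wrap once and reuse. This helper is a sketch — `embed_fn` stands in for `embed_with_cache` above, or any function that takes a list of texts and returns a list of vectors:

```python
def embed_in_batches(texts: list, embed_fn, batch_size: int = 256) -> list:
    """Embed a large corpus in fixed-size batches (one call per batch)."""
    embeddings = []
    for start in range(0, len(texts), batch_size):
        embeddings.extend(embed_fn(texts[start:start + batch_size]))
    return embeddings
```

With `batch_size=256`, a 600-document corpus becomes three API calls instead of 600.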
### Choosing a Vector Store
| Store | Best For |
|---|---|
| Chroma | Prototyping and smaller corpora (<100K docs) |
| Pinecone | Managed production scale with metadata filtering |
| Weaviate | Complex graph-like queries |
| pgvector | When you already have Postgres and want one database |
| FAISS | Batch/research use cases needing GPU acceleration |
For most teams starting out, Chroma gets you running fast. Pinecone is the natural migration target when you need managed scale.
### Idempotent Ingestion
Re-running your ingestion pipeline shouldn't create duplicates. This sounds obvious, but it's the kind of thing that bites you the first time you need to re-index after a bug fix.
```python
def ingest_documents(new_docs: list):
    existing = collection.get(ids=[d["id"] for d in new_docs])
    existing_ids = set(existing["ids"])

    to_add = [d for d in new_docs if d["id"] not in existing_ids]
    to_update = [d for d in new_docs if d["id"] in existing_ids]

    if to_add:
        collection.add(
            ids=[d["id"] for d in to_add],
            documents=[d["text"] for d in to_add],
            metadatas=[d["metadata"] for d in to_add],
        )
    for doc in to_update:
        if content_changed(doc):  # Compare hashes
            collection.delete(ids=[doc["id"]])
            collection.add(
                ids=[doc["id"]],
                documents=[doc["text"]],
                metadatas=[doc["metadata"]],
            )
```
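`content_changed` is doing a hash comparison. A minimal version fingerprints each chunk's text at ingestion time and compares on re-runs — `content_hash` is an assumed metadata field here, not part of the schema shown earlier:

```python
import hashlib

def content_hash(text: str) -> str:
    """Stable fingerprint of a chunk's text."""
    return hashlib.sha256(text.encode("utf-8")).hexdigest()

def content_changed(doc: dict, existing_metadata: dict) -> bool:
    """True when the stored hash no longer matches the incoming text."""
    return existing_metadata.get("content_hash") != content_hash(doc["text"])
```

SHA-256 over the raw text is cheap relative to re-embedding, which is the whole point: unchanged chunks skip the delete/re-add path entirely.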
## Part 2: Retrieval Layer

### Over-Fetch, Then Rerank
Vector similarity is good at finding roughly relevant chunks. It's not as good at ranking them. The solution is to over-fetch — grab 2–3x more candidates than you need — and then rerank with a cross-encoder.
```python
class RetrievalEngine:
    def search(self, query: str, filters: dict = None, top_k: int = 10) -> list:
        query_emb = self.embedder.embed(query)
        results = self.collection.query(
            query_embeddings=[query_emb],
            n_results=top_k * 2,  # Over-fetch for the reranker
            where=filters,
            include=["documents", "metadatas", "distances"],
        )
        # Flatten Chroma's lists-of-lists into one dict per candidate
        candidates = [
            {"id": cid, "text": doc, "metadata": meta, "distance": dist}
            for cid, doc, meta, dist in zip(
                results["ids"][0],
                results["documents"][0],
                results["metadatas"][0],
                results["distances"][0],
            )
        ]
        reranked = self.rerank(query, candidates, top_k)
        self.log_query(query, candidates, reranked)
        return reranked
```
A cross-encoder scores each query–document pair jointly, which is more accurate than a bi-encoder embedding comparison. The tradeoff is speed, but since you're only reranking a small candidate set, it's fast enough:
```python
from sentence_transformers import CrossEncoder

reranker = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")

def rerank(query: str, candidates: list, top_k: int) -> list:
    pairs = [(query, doc["text"]) for doc in candidates]
    scores = reranker.predict(pairs)
    for doc, score in zip(candidates, scores):
        doc["rerank_score"] = score
    return sorted(candidates, key=lambda x: x["rerank_score"], reverse=True)[:top_k]
```
In most benchmarks, reranking improves precision@5 by 15–25%. It's one of the highest-ROI improvements you can make.
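That claim is cheap to verify on your own data. A precision@k helper — run it over a handful of labeled queries with the reranker on and off, where `relevant_ids` comes from your own labels:

```python
def precision_at_k(retrieved_ids: list, relevant_ids: set, k: int = 5) -> float:
    """Precision@k: fraction of the top-k slots filled with relevant IDs."""
    hits = sum(1 for doc_id in retrieved_ids[:k] if doc_id in relevant_ids)
    return hits / k
```

If the reranked ordering doesn't beat raw vector order on this number, the cross-encoder isn't earning its latency.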
### Query Rewriting for Conversational Context
Users in a multi-turn conversation say things like "how do I fix it?" without specifying what "it" is. Retrieval breaks down on pronouns and context-dependent references.
The fix is a short LLM call that rewrites the query to be self-contained before searching:
```python
def rewrite_query(query: str, conversation_history: list) -> str:
    prompt = f"""
Rewrite this query to be self-contained and specific for search.

History: {conversation_history[-3:]}
Current query: {query}

Rules:
- Replace "this", "it", "that" with specific nouns from history
- Add relevant context from the conversation
- Make it keyword-friendly, not conversational

Rewritten query:
"""
    return llm.generate(prompt)
```
So "How do I fix it?" becomes "How to fix Docker build failure: no space left on device" — something the vector store can actually work with.
## Part 3: Generation Layer

### Prompt Structure Beats Prompt Cleverness
There's a lot of mythology around prompt engineering. In practice, the highest-value thing you can do for RAG prompts is give the model clear, structured instructions with explicit fallback behavior:
```python
RAG_PROMPT = """You are a helpful assistant. Answer based on the provided context.

CONTEXT:
{context}

USER QUESTION:
{question}

INSTRUCTIONS:
1. Answer using ONLY the context provided
2. If the context doesn't contain the answer, say "I don't have that information"
3. Cite your sources with [1], [2], etc.
4. Be concise but complete

ANSWER:
"""

def format_context(docs: list) -> str:
    return "\n\n".join(
        f"[{i+1}] {d['metadata']['source']}: {d['text'][:500]}"
        for i, d in enumerate(docs)
    )
```
The explicit "say you don't know" instruction is critical. Without it, models will hallucinate confident answers from thin context.
### Guardrails: Catch Bad Outputs Before Users See Them
A guardrail layer runs checks on every response before it goes to the user. Start simple — you can make this as sophisticated as you need over time:
```python
import re

class OutputGuardrail:
    def check(self, response: str, sources: list) -> dict:
        issues = []

        # Hallucinated citations (model invented a source number that doesn't exist)
        citations = re.findall(r"\[(\d+)\]", response)
        for c in citations:
            if int(c) > len(sources):
                issues.append(f"Invalid citation [{c}]")

        # Excessive hedging (often signals the model is guessing)
        weasel_words = ["might", "maybe", "possibly", "could be"]
        if sum(w in response.lower() for w in weasel_words) > 2:
            issues.append("Low confidence language detected")

        # Suspiciously short responses
        if len(response) < 20:
            issues.append("Response too short")

        return {
            "passed": len(issues) == 0,
            "issues": issues,
            "suggested_action": "retry" if issues else "proceed",
        }
```
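One way to act on `suggested_action` is a bounded retry loop. This sketch assumes a `generate(prompt)` callable and a guardrail object with the `check` interface above, and falls back to a safe refusal once the retry budget is spent:

```python
def generate_with_guardrail(prompt, sources, generate, guardrail, max_retries=2):
    """Regenerate on guardrail failure, up to max_retries extra attempts."""
    for _ in range(max_retries + 1):
        response = generate(prompt)
        if guardrail.check(response, sources)["passed"]:
            return response
    # Retry budget exhausted: refuse rather than ship a flagged answer
    return "I need to verify some details before I can answer this confidently."
```

Keep the budget small — each retry is a full LLM call, and an answer that fails the guardrail three times is usually a retrieval problem, not a generation problem.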
### Streaming Makes Everything Feel Faster
Users perceive a system that starts showing output immediately as dramatically faster than one that makes them wait for a complete response — even if total latency is similar.
```python
def generate_streaming(prompt: str):
    stream = client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": prompt}],
        stream=True,
    )
    for chunk in stream:
        if chunk.choices[0].delta.content:
            yield chunk.choices[0].delta.content
```
First token in ~200ms instead of a 2-second wait. This is a perception win, not a performance hack.
## Part 4: Observability
If you don't measure it, you can't improve it. Here are the metrics that actually matter for RAG:
| Category | Metric | Why It Matters |
|---|---|---|
| Retrieval | MRR, NDCG@5, Precision@K | Is search finding the right chunks? |
| Generation | Faithfulness, citation accuracy | Is the LLM staying grounded? |
| Latency | P50, P95, time-to-first-token | Is it fast enough for real use? |
| Business | User satisfaction, task completion | Is it actually useful? |
| Cost | Tokens per query, embedding costs | Can you afford to run it? |
### Structured Logging You Can Actually Query
Write logs as NDJSON. Every line is a complete, valid JSON object. BigQuery, Elasticsearch, and most log aggregators love this format.
```python
import hashlib
import json
from datetime import datetime, timezone

def log_interaction(query: str, retrieved: list, response: str, latency: float):
    log_entry = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "query": query,
        "query_hash": hashlib.md5(query.encode()).hexdigest(),
        "num_retrieved": len(retrieved),
        "retrieved_sources": [r["metadata"]["source"] for r in retrieved],
        "response_length": len(response),
        "latency_ms": latency,
        "guardrail_issues": check_guardrails(response, retrieved),
    }
    with open("rag_logs.ndjson", "a") as f:
        f.write(json.dumps(log_entry) + "\n")
```
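The payoff shows up in ad-hoc analysis: answers like "what's our p95 latency?" come straight from the log file with nothing but the standard library. A sketch using the nearest-rank percentile method (swap in numpy if you prefer):

```python
import json
import math

def p95_latency(log_path: str) -> float:
    """Nearest-rank 95th percentile over the latency_ms field of an NDJSON log."""
    latencies = sorted(
        json.loads(line)["latency_ms"]
        for line in open(log_path)
        if line.strip()
    )
    idx = max(0, math.ceil(0.95 * len(latencies)) - 1)
    return latencies[idx]
```

The same three-line pattern — parse each line, pull a field, aggregate — covers most of the questions you'll actually ask of these logs.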
### Automated Daily Evaluation
Build a golden dataset of (query, expected relevant document IDs) pairs. Run it daily. Alert on regression.
```python
import numpy as np

class RAGEvaluator:
    def evaluate_retrieval(self) -> dict:
        scores = []
        for item in self.golden:
            results = retrieval_engine.search(item["query"])
            retrieved_ids = [r["id"] for r in results]
            scores.append(calculate_mrr(retrieved_ids, item["relevant_ids"]))
        return {
            "mrr_mean": np.mean(scores),
            "mrr_p10": np.percentile(scores, 10),
            "mrr_p90": np.percentile(scores, 90),
        }

    def run_daily_eval(self):
        metrics = self.evaluate_retrieval()
        if metrics["mrr_mean"] < BASELINE_MRR * 0.95:
            send_alert(f"Retrieval MRR dropped to {metrics['mrr_mean']:.3f}")
        log_to_datadog(metrics)
```
A 5% regression threshold is a reasonable starting point. Tighten it as your system matures and baselines stabilize.
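For completeness, the `calculate_mrr` used in the evaluator is the standard reciprocal-rank computation — 1/rank of the first relevant hit, zero if none appear:

```python
def calculate_mrr(retrieved_ids: list, relevant_ids: set) -> float:
    """Reciprocal rank of the first relevant result (0.0 if none retrieved)."""
    for rank, doc_id in enumerate(retrieved_ids, start=1):
        if doc_id in relevant_ids:
            return 1.0 / rank
    return 0.0
```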
## Putting It All Together
Here's the full query path, assembled:
```python
import time

class ProductionRAG:
    def query(self, user_query: str, conversation_history: list = None) -> dict:
        start_time = time.time()

        # Rewrite query if we have conversation context
        search_query = (
            rewrite_query(user_query, conversation_history)
            if conversation_history
            else user_query
        )

        # Retrieve with reranking
        retrieved = self.retrieval.search(search_query, top_k=5)

        # Build and run prompt with streaming
        prompt = RAG_PROMPT.format(
            context=format_context(retrieved),
            question=user_query,
        )
        response = "".join(generate_streaming(prompt))

        # Guardrail check
        guardrail_result = self.guardrail.check(response, retrieved)
        if not guardrail_result["passed"]:
            response = "I need to verify some details before I can answer this confidently."

        # Log everything
        latency = (time.time() - start_time) * 1000
        log_interaction(user_query, retrieved, response, latency)

        return {
            "response": response,
            "sources": [r["metadata"]["source"] for r in retrieved],
            "latency_ms": latency,
            "guardrail_passed": guardrail_result["passed"],
        }
```
## Before You Ship: The Checklist
- [ ] Chunking strategy documented and tested against real queries
- [ ] Metadata schema versioned and consistent across documents
- [ ] Idempotent ingestion — re-running never creates duplicates
- [ ] Embedding cache reducing API costs on repeated content
- [ ] Reranking improving precision over raw vector similarity
- [ ] Query rewriting handling ambiguous, conversational queries
- [ ] Guardrails catching bad outputs before users see them
- [ ] Streaming enabled for perceived performance
- [ ] Structured NDJSON logging queryable in your data stack
- [ ] Daily automated evaluation against a golden dataset
- [ ] Alerts configured for metric regression
- [ ] Health checks for load balancer integration
- [ ] Runbook written for the three most likely failure modes
## Where to Go From Here
Once this foundation is solid, the natural next steps are:
Hybrid search — combine vector search with BM25 keyword search. Purely vector-based retrieval underperforms on keyword-heavy queries (product names, error codes, proper nouns).
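A common way to merge the two result lists is reciprocal rank fusion, which needs only ranks, never comparable scores. A sketch (k=60 is the conventional constant from the RRF literature):

```python
def reciprocal_rank_fusion(rankings: list, k: int = 60, top_n: int = 10) -> list:
    """Fuse several ranked ID lists; each list contributes 1/(k + rank)."""
    scores = {}
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    return sorted(scores, key=scores.get, reverse=True)[:top_n]

fused = reciprocal_rank_fusion([
    ["a", "b", "c"],  # vector search order
    ["c", "a", "d"],  # BM25 order
])
# → ['a', 'c', 'b', 'd'] — documents ranked well by both lists rise to the top
```

Because RRF operates on ranks alone, you never have to normalize cosine similarities against BM25 scores — the usual failure mode of naive score mixing.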
Multi-tenancy — separate collections per customer, with per-tenant metadata filtering. Don't let one customer's documents bleed into another's retrieval results.
Continuous indexing — webhook-driven updates instead of scheduled batch jobs. New documents show up in retrieval within seconds, not hours.
A/B testing — route 10% of traffic to a new embedding model and measure retrieval metrics before committing. This is the only rigorous way to evaluate embedding model changes.
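Assignment should be deterministic: hash the user ID rather than flipping a coin per request, so each user stays on one arm for the whole experiment. A sketch, where the experiment name and 10% split are illustrative:

```python
import hashlib

def ab_arm(user_id: str, experiment: str = "embedding-v2", treatment_pct: int = 10) -> str:
    """Deterministically route a user to 'treatment' or 'control'."""
    digest = hashlib.sha256(f"{experiment}:{user_id}".encode()).hexdigest()
    bucket = int(digest, 16) % 100  # Stable bucket in [0, 100)
    return "treatment" if bucket < treatment_pct else "control"
```

Salting the hash with the experiment name means buckets reshuffle between experiments, so the same 10% of users aren't your guinea pigs every time.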
The difference between a RAG prototype and a RAG system is mostly about the plumbing nobody sees: idempotent pipelines, structured logs, evaluation harnesses, guardrails. It's less glamorous than the retrieval algorithm, but it's what determines whether the thing is still working correctly six months after you shipped it.
Build the plumbing first.