How I Built a Production RAG Pipeline with FastAPI, pgvector and Cross-Encoder Reranking

I built a production RAG engine that combines hybrid search (pgvector + BM25), cross-encoder reranking, MMR diversity, semantic caching and automatic language detection — all on top of async FastAPI and PostgreSQL. This article covers the real architecture, technical decisions, and key code.


Why Another RAG Article

Most RAG tutorials stop at "embed → cosine search → prompt". That works for a demo, but in production you'll run into:

  • Queries in Spanish that match English chunks (or vice versa)
  • The top 10 most similar chunks come from the same document
  • Semantic search fails with proper nouns or exact codes
  • Repeated answers burn tokens unnecessarily
  • No way to know if the answer is actually reliable

This article shows how I solved each of these problems in a real multi-tenant system.


The Stack

Component    Technology
-----------  --------------------------------------------
Backend      Python 3.12 + FastAPI (100% async)
Database     PostgreSQL 16 + pgvector + tsvector
Embeddings   paraphrase-multilingual-MiniLM-L12-v2 (384d)
Reranker     cross-encoder/ms-marco-MiniLM-L-6-v2
LLM          Groq (llama-3.3-70b-versatile)
Cache        Redis + PostgreSQL (semantic cache)
Streaming    Server-Sent Events (SSE)

Pipeline Architecture

The pipeline has three fixed stages, with pre- and post-filters:

User query
     │
     ▼
┌──────────────┐
│  FAQ Match   │──→ If score ≥ 0.75: instant answer (LLM cost = 0)
└──────┬───────┘
       │ No match
       ▼
┌──────────────┐
│ Cache Check  │──→ If similarity ≥ 0.95: cached response
└──────┬───────┘
       │ Cache miss
       ▼
┌────────────────────────────────────────┐
│  STAGE 1: Hybrid Search                │
│  pgvector (semantic) + BM25 (lexical)  │
│  Fusion with Reciprocal Rank Fusion    │
└───────────────────┬────────────────────┘
                    │
                    ▼
┌────────────────────────────────────────┐
│  STAGE 2: Cross-Encoder Reranking      │
│  Re-evaluates actual relevance         │
│  Computes confidence score             │
└───────────────────┬────────────────────┘
                    │
                    ▼
┌────────────────────────────────────────┐
│  STAGE 3: MMR (Diversity)              │
│  Balances relevance vs variety         │
│  Max 3 chunks per document             │
└───────────────────┬────────────────────┘
                    │
                    ▼
          LLM + SSE streaming

Let's look at each part in detail.


Stage 1: Hybrid Search (Vector + BM25)

The Problem with Vector-Only Search

Embedding-based search is excellent at capturing semantic meaning, but fails with:

  • Proper nouns: "What did John Smith say?"
  • Codes or IDs: "HTTP 403 error"
  • Exact technical terms the embedding model doesn't know well

The Solution: Hybrid Search

I run two searches in parallel and merge the results:

async def search_chunks(self, query: str, kb_ids: list[int],
                        embedding: list[float], config: RetrievalConfig):
    # Vector search (pgvector HNSW, cosine distance)
    vector_results = await self._vector_search(
        embedding, kb_ids, config.candidate_k
    )

    # Lexical search (PostgreSQL tsvector + ts_rank_cd)
    bm25_results = await self._bm25_search(
        query, kb_ids, config.candidate_k
    )

    # Fusion with Reciprocal Rank Fusion
    merged = self._rrf_merge(vector_results, bm25_results, config.bm25_weight)

    return merged
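
The article doesn't show _vector_search itself, so here's a minimal sketch of what it could look like with SQLAlchemy and pgvector; the Chunk model and self.db session are my assumptions:

from sqlalchemy import select

async def _vector_search(self, embedding, kb_ids, limit):
    # pgvector's cosine_distance compiles to the <=> operator,
    # which the HNSW index on Chunk.embedding can serve
    result = await self.db.execute(
        select(Chunk)
        .where(Chunk.knowledge_base_id.in_(kb_ids))
        .order_by(Chunk.embedding.cosine_distance(embedding))
        .limit(limit)
    )
    return result.scalars().all()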

Reciprocal Rank Fusion (RRF)

RRF is elegant in its simplicity. Instead of comparing scores from different systems (which have different scales), it uses ranking positions:

def _rrf_merge(self, vector_results, bm25_results, bm25_weight=0.3):
    k = 60  # Smoothing constant
    vector_weight = 1.0 - bm25_weight
    scores = {}

    # Credit each chunk by its rank in the vector result list
    for rank, chunk in enumerate(vector_results):
        scores[chunk.id] = vector_weight / (k + rank + 1)

    # Add credit for its BM25 rank (a chunk can appear in both lists)
    for rank, chunk in enumerate(bm25_results):
        scores[chunk.id] = scores.get(chunk.id, 0) + bm25_weight / (k + rank + 1)

    # Best-first list of (chunk_id, fused_score) pairs
    return sorted(scores.items(), key=lambda x: x[1], reverse=True)

Why k=60? It prevents the #1 chunk from having a disproportionately high score relative to #2. With k=60, the difference between position 1 and 2 is minimal, making the fusion more stable.
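
A quick back-of-the-envelope check (my numbers, purely illustrative):

# RRF contribution of adjacent ranks, ignoring weights
k = 60
first = 1 / (k + 1)    # ≈ 0.0164
second = 1 / (k + 2)   # ≈ 0.0161, only ~1.6% less
# With k = 0 the same pair would score 1.0 vs 0.5: a 50% drop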

Why 70/30? In my tests, semantic search is more robust for most queries. The 30% BM25 rescues the cases where you need exact lexical matching. This is configurable per Knowledge Base.

BM25 with Fallback

An important detail: BM25 search first tries an AND query, and if it finds no results, falls back to OR:

from sqlalchemy import func

# First pass: AND semantics (all terms must be present)
ts_query = func.plainto_tsquery('spanish', query)

# If that returns no rows, retry with OR semantics (any term matches)
words = [w for w in query.split() if w.lower() not in STOP_WORDS]
or_query = " | ".join(words)
ts_query = func.to_tsquery('spanish', or_query)

Stage 2: Cross-Encoder Reranking

Why Embeddings Aren't Enough

Bi-encoder embeddings (like MiniLM) encode query and document separately. They're fast but miss subtle interactions between query and chunk words.

A cross-encoder processes query + chunk together as a single input, capturing cross-attention relationships. It's slower but significantly more accurate.

Implementation

from sentence_transformers import CrossEncoder

class Reranker:
    def __init__(self):
        # Load once at startup and reuse across requests
        self.reranker = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")

    def rerank(self, query: str, candidates: list[dict]) -> list[dict]:
        # Score each (query, chunk) pair jointly with cross-attention
        pairs = [(query, c["content"]) for c in candidates]
        scores = self.reranker.predict(pairs)

        for candidate, score in zip(candidates, scores):
            candidate["rerank_score"] = float(score)

        # Sort by rerank_score descending
        return sorted(candidates, key=lambda x: x["rerank_score"], reverse=True)

We reduce from ~60 candidates (RRF output) to the top 15 after reranking.
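
In pipeline terms that pruning is a slice after the sort (variable names here are illustrative):

# Keep only the strongest 15 candidates for the MMR stage
reranker = Reranker()
top_candidates = reranker.rerank(query, rrf_candidates)[:15]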

Confidence Score: How Reliable Is the Answer?

Using cross-encoder scores, I compute a confidence score that indicates how confident we are that the retrieved chunks are relevant:

from math import exp

def compute_confidence(sources: list[dict]) -> float:
    if not sources:
        return 0.0  # Nothing retrieved means no confidence

    scores = sorted([s["rerank_score"] for s in sources], reverse=True)

    max_score = scores[0]
    top3_avg = sum(scores[:3]) / min(len(scores), 3)
    median = scores[len(scores) // 2]

    # Blend: 40% best score, 30% top-3 average, 30% median
    blended = 0.4 * max_score + 0.3 * top3_avg + 0.3 * median

    # Sigmoid → [0, 1]
    return 1.0 / (1.0 + exp(-blended))

Why a blend and not just the max? If the best chunk scores 0.9 but the rest are at -0.5, the answer is probably weak — a single good chunk doesn't compensate for bad context. The blend captures the overall quality of the retrieved context.
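
To make that concrete, here are two made-up score profiles (not real pipeline output) run through compute_confidence:

# Uniformly strong context vs one lucky chunk
strong = [{"rerank_score": s} for s in (0.9, 0.85, 0.8, 0.75, 0.7)]
mixed = [{"rerank_score": s} for s in (0.9, -0.5, -0.5, -0.6, -0.7)]

print(compute_confidence(strong))  # ≈ 0.70
print(compute_confidence(mixed))   # ≈ 0.55, while max alone would suggest ≈ 0.71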

The confidence score is sent to the frontend, which uses it to:

  • Display visual confidence indicators
  • Decide whether to suggest alternative FAQs
  • Trigger automatic escalation when below threshold

Stage 3: MMR (Maximum Marginal Relevance)

The Redundancy Problem

Without MMR, the top 10 most relevant chunks could come from the same document or be consecutive paragraphs saying essentially the same thing. That wastes LLM context.

The Formula

MMR(chunk) = λ × relevance(chunk) - (1-λ) × max_similarity(chunk, already_selected) - document_penalty
def _mmr_select(self, candidates, query_embedding, config):
    selected = []
    remaining = list(candidates)
    lambda_param = config.lambda_param  # Default: 0.6
    doc_counts = {}

    for _ in range(config.top_k):  # Default: 10
        if not remaining:
            break

        best_score = -float('inf')
        best_idx = None

        for i, candidate in enumerate(remaining):
            # Hard cap: max 3 chunks per document
            doc_name = candidate["document_name"]
            if doc_counts.get(doc_name, 0) >= config.max_per_doc:
                continue

            # Relevance to query
            relevance = cosine_similarity(query_embedding, candidate["embedding"])

            # Max similarity with already selected chunks
            max_sim = max(
                (cosine_similarity(candidate["embedding"], s["embedding"])
                 for s in selected),
                default=0
            )

            # Soft penalty for repeating a document
            penalty = doc_counts.get(doc_name, 0) * 0.1

            score = lambda_param * relevance - (1 - lambda_param) * max_sim - penalty

            if score > best_score:
                best_score = score
                best_idx = i

        if best_idx is None:
            break  # Every remaining candidate hit the per-doc cap

        chosen = remaining.pop(best_idx)
        selected.append(chosen)
        doc_counts[chosen["document_name"]] = doc_counts.get(chosen["document_name"], 0) + 1

    return selected

λ = 0.6 means 60% relevance, 40% diversity. It's the sweet spot I found: enough diversity to avoid repetition, but without sacrificing truly relevant chunks.


Semantic Cache: Don't Repeat Work

The Concept

If someone asks "How do I reset my password?" and 2 hours ago another user asked "How can I change my password?", the answer is the same. Why run the entire pipeline again?

Implementation

async def lookup_cache(self, query_embedding, tenant_id, kb_ids, config_hash):
    result = await db.execute(
        select(ResponseCache)
        .where(
            ResponseCache.tenant_id == tenant_id,
            ResponseCache.rag_config_hash == config_hash,
            ResponseCache.expires_at > func.now(),
        )
        .order_by(
            ResponseCache.query_embedding.cosine_distance(query_embedding)
        )
        .limit(1)
    )
    cache_entry = result.scalar_one_or_none()

    if cache_entry:
        similarity = 1 - cosine_distance(query_embedding, cache_entry.query_embedding)
        if similarity >= 0.95:
            return cache_entry  # Cache hit

    return None  # Cache miss → run full pipeline

Key Decisions

  • Threshold 0.95: Very high on purpose. I'd rather have a cache miss than serve an incorrect answer.
  • Config hash: Cache is invalidated if RAG parameters change. Changing top_k from 10 to 5 produces different answers (see the hashing sketch after the store snippet below).
  • Fire-and-forget store: After generating the LLM response, I save to cache without blocking the stream.
  • TTL 7 days: Configurable per KB. Documents that change often can use a shorter TTL.
  • Proactive invalidation: When a document is uploaded or reprocessed, the cache for that KB is invalidated.

from datetime import datetime, timedelta

# Store (doesn't block the response)
async def store_cache(self, query_embedding, response, sources, confidence, ...):
    if confidence < config.min_confidence:
        return  # Don't cache low-confidence answers

    cache_entry = ResponseCache(
        query_embedding=query_embedding,
        response_text=response,
        sources=sources,
        confidence=confidence,
        rag_config_hash=config_hash,
        expires_at=datetime.utcnow() + timedelta(hours=config.cache_ttl_hours),
    )
    db.add(cache_entry)
    await db.commit()
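
The post doesn't show how the config hash is built; a minimal sketch (field names are illustrative) could be:

import hashlib
import json

def rag_config_hash(config: dict) -> str:
    # Canonical JSON so the same parameters always hash identically;
    # any change to top_k, weights, etc. yields a new hash and
    # silently retires every cache entry stored under the old one
    canonical = json.dumps(config, sort_keys=True)
    return hashlib.sha256(canonical.encode()).hexdigest()[:16]

config_hash = rag_config_hash({"top_k": 10, "bm25_weight": 0.3, "lambda_param": 0.6})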

FAQ Match: The Most Valuable Shortcut

Before the entire pipeline, I check if there's a FAQ whose embedding is similar enough to the query:

async def match_faq(self, query_embedding, kb_ids, threshold=0.75):
    result = await db.execute(
        select(FAQ.id, FAQ.question, FAQ.answer,
               (1 - FAQ.embedding.cosine_distance(query_embedding)).label("score"))
        .where(FAQ.knowledge_base_id.in_(kb_ids), FAQ.is_active == True)
        .order_by(FAQ.embedding.cosine_distance(query_embedding))
        .limit(1)
    )
    faq = result.first()

    if faq and faq.score >= threshold:
        # Increment hit_count atomically
        await db.execute(
            update(FAQ).where(FAQ.id == faq.id)
            .values(hit_count=FAQ.hit_count + 1)
        )
        return faq

    return None

Why is this so valuable?

  • Zero cost: No LLM call, no token consumption
  • Minimal latency: A single SQL query with HNSW index
  • Curated answers: FAQs are written by humans, always correct
  • Always available: Even when the user has exhausted their monthly LLM query quota, FAQs keep working

Language Detection Without External APIs

The system supports Spanish, English, and Portuguese. Instead of calling a detection API (extra latency + cost), I use heuristic detection:

EN_WORDS = {"how", "what", "the", "is", "can", "do", "where", "when", "why", ...}  # 60+
PT_WORDS = {"como", "onde", "você", "qual", "para", "não", "uma", ...}  # 45+
ES_WORDS = {"qué", "cómo", "dónde", "ayuda", "puedo", "quiero", ...}  # 40+

def detect_language(query: str) -> str:
    words = set(query.lower().split())
    scores = {
        "en": len(words & EN_WORDS),
        "pt": len(words & PT_WORDS),
        "es": len(words & ES_WORDS),
    }
    best = max(scores, key=scores.get)
    return best if scores[best] >= 2 else "es"  # Default: Spanish

Is it perfect? No. But it has zero latency, no external dependencies, and for short queries of 5-15 words it works surprisingly well. The detected language is injected into the LLM's system prompt so it responds in the same language.
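
The injection itself can be a one-liner (a sketch; the prompt wording and LANG_NAMES mapping are mine):

LANG_NAMES = {"es": "Spanish", "en": "English", "pt": "Portuguese"}

def build_system_prompt(query: str) -> str:
    # Pin the answer language based on the heuristic above
    lang = LANG_NAMES[detect_language(query)]
    return f"You are a support assistant. Always answer in {lang}."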


Smart Routing: Automatically Choosing the Right KB

When a tenant has multiple Knowledge Bases (e.g., "Sales", "Technical Support", "HR"), the user shouldn't have to manually choose where to search.

KB Centroids

For each KB, I compute the centroid (average of all chunk embeddings):

SELECT AVG(embedding)::text AS centroid FROM chunks WHERE knowledge_base_id = :kb_id
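
_get_centroid isn't shown in the original; here's a sketch with the 24-hour Redis cache mentioned below (key naming and client attributes are my assumptions):

import json
from sqlalchemy import text

async def _get_centroid(self, kb_id: int) -> list[float]:
    cache_key = f"kb_centroid:{kb_id}"
    if (cached := await self.redis.get(cache_key)):
        return json.loads(cached)

    row = await self.db.execute(
        text("SELECT AVG(embedding)::text AS centroid "
             "FROM chunks WHERE knowledge_base_id = :kb_id"),
        {"kb_id": kb_id},
    )
    # pgvector's text form is "[0.1,0.2,...]", which is valid JSON
    centroid = json.loads(row.scalar_one())
    await self.redis.set(cache_key, json.dumps(centroid), ex=86400)  # 24h TTL
    return centroid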

Similarity-Based Routing

async def route_query(self, query_embedding, available_kbs):
    scores = []
    for kb in available_kbs:
        centroid = await self._get_centroid(kb.id)  # Redis cache 24h
        similarity = cosine_similarity(query_embedding, centroid)
        if similarity >= 0.15:  # Minimum threshold
            scores.append((kb, similarity))

    if not scores:
        return available_kbs  # Safe fallback: search every KB

    # Top 3 most relevant KBs
    scores.sort(key=lambda x: x[1], reverse=True)
    return [kb for kb, _ in scores[:3]]

If no KB exceeds the threshold, search across all of them (safe fallback).


Numbers That Matter

Metric                           Value
-------------------------------  -----------------------------
Initial candidates per branch    30 (vector) + 30 (BM25)
After RRF merge                  ~40-60 unique
After reranking                  Top 15
After MMR                        Top 10 (max 3 per doc)
Embedding dimensions             384
FAQ threshold                    0.75
Cache threshold                  0.95
Cache TTL                        7 days
FAQ match latency                ~15ms
Full pipeline latency            ~2-4s
Savings per cache hit            ~90% latency, 100% LLM tokens

Lessons Learned

1. The Cross-Encoder Is Worth It

The quality difference between using embeddings alone vs. embeddings + cross-encoder reranking is enormous. Bi-encoder embeddings are good for recall (finding candidates), but the cross-encoder is much better for precision (ranking by actual relevance).

2. BM25 Isn't Dead

For queries with proper nouns, error codes, or specific technical terms, BM25 regularly outperforms vector search. In my tests, the hybrid combination consistently beat either approach alone.

3. MMR Is More Important Than It Seems

Without MMR diversity, the LLM receives redundant context and produces answers that orbit around a single point. With MMR, answers are more complete and cover different angles of the topic.

4. Semantic Cache Requires a High Threshold

At 0.90 I had problems with incorrect answers being served from cache. 0.95 is the sweet spot: loose enough to catch obvious reformulations, strict enough not to match queries that actually need different answers.

5. FAQs Are Your Best Investment

Every FAQ you add is a perfect answer served in 15ms with zero LLM cost. I implemented auto-generation of suggested FAQs from frequent unanswered queries, which the admin can approve with a single click.


What's Next

  • Evaluation framework: Automatic quality metrics (faithfulness, relevance) with evaluation datasets per KB
  • Chunk contextualization: Using the LLM to add document context to chunks before indexing
  • Late interaction models (ColBERT): Better than cross-encoder for high volumes

Conclusion

A production RAG pipeline is not "embed + search + prompt". It's a multi-stage system where each component solves a specific weakness of the previous one. Hybrid search covers semantic and lexical gaps, the cross-encoder ranks by actual relevance, MMR ensures diversity, cache eliminates redundant work, and FAQs shortcut when the answer already exists.

Most importantly: everything is configurable per Knowledge Base. There's no universal set of parameters — each knowledge domain has its own characteristics and needs different tuning.


If you're working on production RAG, drop a comment with your biggest challenge. And if this article was useful, a like helps it reach more people.
