Pablo Ifrán

Building Production RAG Systems with PostgreSQL: Complete Implementation Guide

Most RAG (Retrieval-Augmented Generation) systems fail in production for predictable technical reasons: poor retrieval quality, lack of source attribution, and inability to handle real-world query variations. This guide shows you how to build a production-ready RAG system that actually works.

Why Most RAG Systems Fail

Production RAG failures stem from three core technical problems:

1. Pure Vector Search Limitations
Vector similarity doesn't always match human relevance. A query for "API rate limits" might retrieve "request throttling guidelines" when users want exact information about "1000 requests/hour."

2. No Source Attribution
Users don't trust answers they can't verify. Without clear source citations, even correct answers feel unreliable.

3. Single Search Strategy
Relying only on vector search or only on keyword search misses important results. Real questions require both semantic understanding and exact term matching.

Architecture Overview: PostgreSQL + pgvector

Our architecture uses PostgreSQL with the pgvector extension to handle both vector and traditional search in a single, reliable database:

┌─────────────────┐    ┌──────────────────┐    ┌─────────────────┐
│   Document      │───▶│   PostgreSQL     │───▶│   Answer        │
│   Processing    │    │   + pgvector     │    │   Generation    │
└─────────────────┘    └──────────────────┘    └─────────────────┘

Step 1: Database Schema

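-- Enable pgvector (provides the vector type and its index operator classes)
CREATE EXTENSION IF NOT EXISTS vector;

-- Full documents with complete context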
CREATE TABLE documents (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    title TEXT NOT NULL,
    content TEXT NOT NULL,
    url TEXT UNIQUE NOT NULL,
    category TEXT NOT NULL,
    last_updated TIMESTAMP DEFAULT NOW(),
    content_hash TEXT NOT NULL,
    embedding vector(1536),
    metadata JSONB DEFAULT '{}'
);

-- Document chunks for precise retrieval
CREATE TABLE document_chunks (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    document_id UUID REFERENCES documents(id) ON DELETE CASCADE,
    chunk_index INTEGER NOT NULL,
    content TEXT NOT NULL,
    embedding vector(1536),
    token_count INTEGER NOT NULL,
    section_title TEXT,
    start_position INTEGER,
    end_position INTEGER,
    metadata JSONB DEFAULT '{}',
    UNIQUE(document_id, chunk_index)
);
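
The schema stores a content_hash for every document, but the article does not spell out how it is used. One plausible use is skipping re-embedding when a document has not changed. A minimal sketch under that assumption, using SHA-256 as process_document does in Step 3; `content_hash` and `needs_reindex` are hypothetical helpers, not part of the original code:

```python
import hashlib
from typing import Optional


def content_hash(content: str) -> str:
    """Return the SHA-256 hex digest of the document body."""
    return hashlib.sha256(content.encode("utf-8")).hexdigest()


def needs_reindex(new_content: str, stored_hash: Optional[str]) -> bool:
    """True when the document is new or its content changed since the last run."""
    return stored_hash is None or content_hash(new_content) != stored_hash


stored = content_hash("Rate limit: 1000 requests/hour")
print(needs_reindex("Rate limit: 1000 requests/hour", stored))  # False: unchanged
print(needs_reindex("Rate limit: 2000 requests/hour", stored))  # True: edited
print(needs_reindex("Brand new page", None))                    # True: never indexed
```

In a pipeline, stored_hash would come from a lookup of documents.content_hash by url before any embedding call is made.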

Create essential indexes:

-- Full-text search for keyword matching
CREATE INDEX idx_chunks_fts 
ON document_chunks USING gin (to_tsvector('english', content));

-- Vector search for semantic matching
-- vector_cosine_ops matches the <=> (cosine distance) operator used in hybrid_search
CREATE INDEX idx_chunks_embedding 
ON document_chunks USING hnsw (embedding vector_cosine_ops) 
WITH (m = 16, ef_construction = 64);
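
pgvector pairs each distance operator with an index operator class (`<->` with vector_l2_ops, `<=>` with vector_cosine_ops), and an index only speeds up queries that order by its matching operator. The two distances are not interchangeable in general. The sketch below uses made-up 2-dimensional vectors to show that they can rank the same candidates differently:

```python
import math
from typing import Sequence


def l2_distance(a: Sequence[float], b: Sequence[float]) -> float:
    """Euclidean distance, the quantity pgvector's <-> operator computes."""
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))


def cosine_distance(a: Sequence[float], b: Sequence[float]) -> float:
    """1 - cosine similarity, the quantity pgvector's <=> operator computes."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / (norm_a * norm_b)


query = [1.0, 0.0]
candidates = {
    "same_direction_far": [10.0, 1.0],       # points the same way, large magnitude
    "different_direction_near": [0.5, 0.5],  # 45 degrees off, but close in space
}

by_l2 = sorted(candidates, key=lambda n: l2_distance(query, candidates[n]))
by_cosine = sorted(candidates, key=lambda n: cosine_distance(query, candidates[n]))

print(by_l2)      # ['different_direction_near', 'same_direction_far']
print(by_cosine)  # ['same_direction_far', 'different_direction_near']
```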

Step 2: Hybrid Search Implementation

The core of our system is a function that combines semantic and keyword search:

CREATE OR REPLACE FUNCTION hybrid_search(
    query_text TEXT,
    query_embedding vector(1536),
    limit_count INTEGER DEFAULT 10,
    semantic_weight FLOAT DEFAULT 0.6,
    keyword_weight FLOAT DEFAULT 0.4
) 
RETURNS TABLE (
    chunk_id UUID,
    content TEXT,
    semantic_score DOUBLE PRECISION,
    keyword_score DOUBLE PRECISION,
    combined_score DOUBLE PRECISION,
    document_title TEXT,
    document_url TEXT
) AS $$
BEGIN
    RETURN QUERY
    WITH semantic_results AS (
        SELECT 
            c.id as chunk_id,
            c.content,
            (1 - (c.embedding <=> query_embedding))::DOUBLE PRECISION as semantic_score,
            d.title as document_title,
            d.url as document_url
        FROM document_chunks c
        JOIN documents d ON c.document_id = d.id
        ORDER BY c.embedding <=> query_embedding
        LIMIT limit_count * 3
    ),
    keyword_results AS (
        SELECT 
            c.id as chunk_id,
            ts_rank_cd(to_tsvector('english', c.content), 
                       plainto_tsquery('english', query_text))::DOUBLE PRECISION as keyword_score
        FROM document_chunks c
        WHERE to_tsvector('english', c.content) @@ plainto_tsquery('english', query_text)
    )
    SELECT 
        s.chunk_id,
        s.content,
        s.semantic_score,
        COALESCE(k.keyword_score, 0.0) as keyword_score,
        (semantic_weight * s.semantic_score + 
         keyword_weight * COALESCE(k.keyword_score, 0.0))::DOUBLE PRECISION as combined_score,
        s.document_title,
        s.document_url
    FROM semantic_results s
    LEFT JOIN keyword_results k ON s.chunk_id = k.chunk_id
    WHERE (semantic_weight * s.semantic_score + 
           keyword_weight * COALESCE(k.keyword_score, 0.0)) > 0.3
    ORDER BY combined_score DESC
    LIMIT limit_count;
END;
$$ LANGUAGE plpgsql;
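
To make the scoring step easier to reason about, here is a pure-Python sketch of what the final SELECT in hybrid_search computes: a weighted sum, a 0.3 cutoff, then a sort and limit. `ScoredChunk` and `combine_scores` are hypothetical names, and the scores are invented for illustration (real ts_rank_cd values are not bounded to the 0 to 1 range and are often much smaller):

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class ScoredChunk:
    chunk_id: str
    semantic_score: float
    keyword_score: float
    combined_score: float


def combine_scores(
    semantic: Dict[str, float],
    keyword: Dict[str, float],
    limit_count: int = 10,
    semantic_weight: float = 0.6,
    keyword_weight: float = 0.4,
    min_score: float = 0.3,
) -> List[ScoredChunk]:
    results = []
    # Like the LEFT JOIN, only chunks from the semantic candidate set are kept
    for chunk_id, sem in semantic.items():
        kw = keyword.get(chunk_id, 0.0)  # COALESCE(k.keyword_score, 0.0)
        combined = semantic_weight * sem + keyword_weight * kw
        if combined > min_score:
            results.append(ScoredChunk(chunk_id, sem, kw, combined))
    results.sort(key=lambda r: r.combined_score, reverse=True)
    return results[:limit_count]


semantic = {"a": 0.82, "b": 0.55, "c": 0.40}
keyword = {"b": 0.9, "d": 0.7}

for r in combine_scores(semantic, keyword):
    print(r.chunk_id, round(r.combined_score, 3))
```

Chunk "b" overtakes "a" because of its keyword match, "c" falls below the cutoff, and "d" never appears: a chunk that matches keywords but is missing from the semantic candidates is dropped by this join shape.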

Step 3: Document Processing Pipeline

Smart chunking that preserves context:

import asyncpg
import hashlib
import openai
from typing import List, Dict
import re
from dataclasses import dataclass

@dataclass
class DocumentChunk:
    content: str
    section_title: str
    chunk_index: int
    token_count: int

class DocumentProcessor:
    def __init__(self, db_url: str, openai_api_key: str):
        self.db_url = db_url
        self.client = openai.AsyncOpenAI(api_key=openai_api_key)

    def _create_smart_chunks(self, content: str, max_tokens: int = 512) -> List[DocumentChunk]:
        """Create chunks that respect document structure."""
        chunks = []

        # Split by major sections (headers)
        sections = re.split(r'\n\s*#{1,3}\s+', content)
        current_section = "Introduction"

        for section in sections:
            if not section.strip():
                continue

            # Extract section title
            lines = section.split('\n', 1)
            if len(lines) > 1 and len(lines[0]) < 100:
                current_section = lines[0].strip()
                section_content = lines[1] if len(lines) > 1 else ""
            else:
                section_content = section

            # Split section into chunks by paragraphs
            paragraphs = [p.strip() for p in section_content.split('\n\n') if p.strip()]

            current_chunk = ""
            current_tokens = 0

            for paragraph in paragraphs:
                paragraph_tokens = len(paragraph) // 4  # Rough token estimation

                if current_tokens + paragraph_tokens > max_tokens and current_chunk:
                    chunks.append(DocumentChunk(
                        content=current_chunk.strip(),
                        section_title=current_section,
                        chunk_index=len(chunks),
                        token_count=current_tokens
                    ))
                    current_chunk = paragraph
                    current_tokens = paragraph_tokens
                else:
                    if current_chunk:
                        current_chunk += "\n\n" + paragraph
                    else:
                        current_chunk = paragraph
                    current_tokens += paragraph_tokens

            # Add final chunk
            if current_chunk.strip():
                chunks.append(DocumentChunk(
                    content=current_chunk.strip(),
                    section_title=current_section,
                    chunk_index=len(chunks),
                    token_count=current_tokens
                ))

        return chunks

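    @staticmethod
    def _to_vector_literal(embedding: List[float]) -> str:
        """Format an embedding in pgvector's text form, e.g. '[0.1,0.2]'."""
        return "[" + ",".join(str(x) for x in embedding) + "]"

    async def process_document(self, title: str, content: str, url: str, category: str) -> bool:
        """Process a complete document into the RAG system."""
        conn = None
        try:
            # Generate document embedding
            doc_embedding = await self._generate_embedding(f"{title}\n\n{content}")

            # asyncpg.connect() returns a connection, not an async context manager
            conn = await asyncpg.connect(self.db_url)

            # One transaction, so a failure cannot leave a half-indexed document
            async with conn.transaction():
                # Insert document; the embedding is sent as text and cast server-side
                doc_id = await conn.fetchval("""
                    INSERT INTO documents (title, content, url, category, content_hash, embedding)
                    VALUES ($1, $2, $3, $4, $5, $6::text::vector)
                    RETURNING id
                """, title, content, url, category,
                hashlib.sha256(content.encode()).hexdigest(),
                self._to_vector_literal(doc_embedding))

                # Create and process chunks
                chunks = self._create_smart_chunks(content)

                for chunk in chunks:
                    chunk_embedding = await self._generate_embedding(chunk.content)

                    await conn.execute("""
                        INSERT INTO document_chunks 
                        (document_id, chunk_index, content, embedding, token_count, section_title)
                        VALUES ($1, $2, $3, $4::text::vector, $5, $6)
                    """, doc_id, chunk.chunk_index, chunk.content,
                    self._to_vector_literal(chunk_embedding),
                    chunk.token_count, chunk.section_title)

            return True

        except Exception as e:
            print(f"Error processing document: {e}")
            return False
        finally:
            if conn is not None:
                await conn.close()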

    async def _generate_embedding(self, text: str) -> List[float]:
        response = await self.client.embeddings.create(
            model="text-embedding-3-large",
            input=text,
            # The model returns 3072 dimensions by default; the schema uses vector(1536)
            dimensions=1536
        )
        return response.data[0].embedding
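
process_document awaits one embeddings request per chunk. The embeddings endpoint also accepts a list of strings as `input`, so chunks could be grouped before they are sent. The sketch below covers only the grouping step and reuses the article's rough estimate of four characters per token; `batch_texts` and both limits are hypothetical choices, not documented API limits:

```python
from typing import Iterator, List


def estimate_tokens(text: str) -> int:
    """Rough token estimate: about four characters per token."""
    return len(text) // 4


def batch_texts(
    texts: List[str],
    max_batch_size: int = 64,      # assumed cap on inputs per request
    max_batch_tokens: int = 8000,  # assumed cap on estimated tokens per request
) -> Iterator[List[str]]:
    """Yield consecutive groups of texts that stay within both limits."""
    batch: List[str] = []
    batch_tokens = 0
    for text in texts:
        tokens = estimate_tokens(text)
        too_many = len(batch) >= max_batch_size
        too_large = batch_tokens + tokens > max_batch_tokens
        if batch and (too_many or too_large):
            yield batch
            batch, batch_tokens = [], 0
        # A single oversized text still gets a batch of its own
        batch.append(text)
        batch_tokens += tokens
    if batch:
        yield batch


chunk_texts = ["x" * 400] * 5  # five chunks of roughly 100 tokens each
print([len(b) for b in batch_texts(chunk_texts, max_batch_size=2)])  # [2, 2, 1]
```

Each batch could then go out in a single embeddings.create(..., input=batch) call, with the vectors read back from response.data in input order.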

Step 4: Query Pipeline with Source Attribution

import json
from typing import List, Dict

import asyncpg
import openai

class ProductionRAGSystem:
    def __init__(self, db_url: str, openai_api_key: str):
        self.db_url = db_url
        self.client = openai.AsyncOpenAI(api_key=openai_api_key)

    async def answer_question(self, question: str) -> Dict:
        """Main entry point for answering user questions."""

        # Generate query embedding
        embedding_response = await self.client.embeddings.create(
            model="text-embedding-3-large",
            input=question,
            # Must match the dimension used at indexing time and the vector(1536) columns
            dimensions=1536
        )
        query_embedding = embedding_response.data[0].embedding

        # Retrieve relevant chunks
        chunks = await self._retrieve_chunks(question, query_embedding)

        # Quality gate - ensure we have good sources
        high_quality_chunks = [c for c in chunks if c['combined_score'] > 0.5]

        if len(high_quality_chunks) < 2:
            return {
                "answer": "I couldn't find enough relevant information to answer your question confidently.",
                "confidence": 0.0,
                "sources": [],
                "suggestion": "Try rephrasing your question with more specific terms."
            }

        # Generate answer with attribution
        return await self._generate_attributed_answer(question, high_quality_chunks[:5])

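    async def _retrieve_chunks(self, query: str, embedding: List[float]) -> List[Dict]:
        # asyncpg.connect() returns a connection, not an async context manager
        conn = await asyncpg.connect(self.db_url)
        try:
            # Send the embedding in pgvector's text form and cast it server-side,
            # so no custom asyncpg codec is needed
            vector_literal = "[" + ",".join(str(x) for x in embedding) + "]"
            rows = await conn.fetch("""
                SELECT * FROM hybrid_search($1, $2::text::vector, 15, 0.6, 0.4)
            """, query, vector_literal)

            return [dict(row) for row in rows]
        finally:
            await conn.close()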

    async def _generate_attributed_answer(self, question: str, chunks: List[Dict]) -> Dict:
        """Generate answer with full source attribution."""

        # Build context with source numbering
        context_parts = []
        sources_info = {}

        for i, chunk in enumerate(chunks, 1):
            context_parts.append(
                f"[Source {i}: {chunk['document_title']}]\n"
                f"URL: {chunk['document_url']}\n"
                f"Content: {chunk['content']}\n"
            )

            sources_info[i] = {
                "title": chunk['document_title'],
                "url": chunk['document_url'],
                "relevance_score": chunk['combined_score'],
                "preview": chunk['content'][:200] + "..."
            }

        context = "\n---\n".join(context_parts)

        # System prompt for accurate, attributed answers
        system_prompt = """You are a documentation assistant that provides accurate answers based solely on provided sources.

REQUIREMENTS:
1. Answer ONLY using information from the provided sources
2. Always cite sources using [Source X] format
3. If sources don't fully answer the question, clearly state what's missing
4. Provide a confidence score (0.0-1.0) based on source coverage

Response format (JSON):
{
    "answer": "Complete answer with [Source X] citations",
    "confidence": 0.85,
    "reasoning": "Brief explanation of confidence score"
}"""

        response = await self.client.chat.completions.create(
            model="gpt-4-turbo-preview",
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": f"Question: {question}\n\nSources:\n{context}"}
            ],
            response_format={"type": "json_object"},
            temperature=0.1
        )

        result = json.loads(response.choices[0].message.content)

        return {
            "answer": result.get("answer", ""),
            "confidence": float(result.get("confidence", 0.0)),
            "reasoning": result.get("reasoning", ""),
            "sources": sources_info,
            "query_processed": question
        }

# Usage example
async def main():
    rag_system = ProductionRAGSystem(
        db_url="postgresql://user:pass@localhost/ragdb",
        openai_api_key="your-openai-key"
    )

    result = await rag_system.answer_question(
        "How do I update my API rate limits?"
    )

    print(f"Answer: {result['answer']}")
    print(f"Confidence: {result['confidence']}")
    print(f"Sources: {len(result['sources'])}")

if __name__ == "__main__":
    import asyncio
    asyncio.run(main())
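
The system prompt asks the model to cite with the [Source X] format, but nothing in the pipeline checks that those citations point at sources that were actually provided. A minimal sketch of such a check follows; `check_citations` is a hypothetical helper, and it assumes the model follows the exact "[Source N]" form (variants such as "[Source 1, 2]" would not be recognized):

```python
import re
from typing import Dict, List

CITATION_PATTERN = re.compile(r"\[Source (\d+)\]")


def check_citations(answer: str, sources: Dict[int, dict]) -> Dict[str, List[int]]:
    """Compare the source numbers cited in the answer with the sources provided."""
    cited = {int(n) for n in CITATION_PATTERN.findall(answer)}
    available = set(sources)
    return {
        "cited": sorted(cited & available),     # valid citations
        "unknown": sorted(cited - available),   # cited but never provided
        "unused": sorted(available - cited),    # provided but never cited
    }


answer = (
    "The default limit is 1000 requests/hour [Source 1]. "
    "Contact support to raise it [Source 4]."
)
sources = {1: {"title": "Rate limits"}, 2: {"title": "Billing"}}

print(check_citations(answer, sources))
# {'cited': [1], 'unknown': [4], 'unused': [2]}
```

A non-empty "unknown" list could be treated as a reason to lower the reported confidence or to reject the answer.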

Step 5: Production Optimization

PostgreSQL Configuration:

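-- Memory settings for vector operations
-- Example values for a dedicated server with ample RAM; scale them to your hardware.
-- work_mem applies per sort/hash operation in each session, so a high global value can exhaust memory
ALTER SYSTEM SET work_mem = '1GB';
-- Used by index builds such as CREATE INDEX ... USING hnsw
ALTER SYSTEM SET maintenance_work_mem = '8GB';
-- Changing shared_buffers requires a server restart
ALTER SYSTEM SET shared_buffers = '4GB';

-- SSD optimization
ALTER SYSTEM SET random_page_cost = 1.1;

-- Apply the settings that do not need a restart
SELECT pg_reload_conf();

-- Enable monitoring (pg_stat_statements must also be listed in shared_preload_libraries)
CREATE EXTENSION IF NOT EXISTS pg_stat_statements;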

Application-Level Caching:

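import hashlib
import time
from typing import Dict, Tuple

class OptimizedRAGSystem(ProductionRAGSystem):
    def __init__(self, db_url: str, openai_api_key: str):
        super().__init__(db_url, openai_api_key)
        # In-process stand-in for Redis: cache key -> (expiry time, cached result)
        self._cache: Dict[str, Tuple[float, Dict]] = {}

    @staticmethod
    def _get_query_cache_key(query: str) -> str:
        # Hashing is cheap, so the key itself does not need memoizing
        normalized = query.lower().strip()
        return hashlib.sha256(normalized.encode()).hexdigest()[:16]

    async def answer_question_cached(self, question: str, cache_ttl: int = 3600) -> Dict:
        # Check cache first, fall back to generation
        key = self._get_query_cache_key(question)
        cached = self._cache.get(key)
        if cached is not None and cached[0] > time.monotonic():
            return cached[1]
        result = await self.answer_question(question)
        self._cache[key] = (time.monotonic() + cache_ttl, result)
        return result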

Step 6: Testing & Validation

Create comprehensive test suites to ensure accuracy:

@dataclass
class TestCase:
    question: str
    expected_topics: List[str]
    expected_sources: List[str]
    min_confidence: float

async def run_evaluation(rag_system: ProductionRAGSystem) -> None:
    test_cases = [
        TestCase(
            question="How do I update my billing address?",
            expected_topics=["billing", "address", "update"],
            expected_sources=["billing-management"],
            min_confidence=0.8
        )
    ]

    for test_case in test_cases:
        result = await rag_system.answer_question(test_case.question)
        # Evaluate accuracy, source quality, confidence
        confident = result["confidence"] >= test_case.min_confidence
        print(f"{'PASS' if confident else 'FAIL'}: {test_case.question}")
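
The evaluation step itself is left open above. One possible way to score a single result against a test case is sketched here. `EvalCase` mirrors the fields of TestCase, `evaluate_result` is a hypothetical helper, and matching topics by substring and sources by URL fragment is an assumption about what "expected" means, not the article's definition:

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class EvalCase:
    question: str
    expected_topics: List[str]
    expected_sources: List[str]
    min_confidence: float


def evaluate_result(case: EvalCase, result: Dict) -> Dict[str, object]:
    """Score one answer: topic coverage, source recall, and confidence."""
    answer = result.get("answer", "").lower()
    topics_found = [t for t in case.expected_topics if t.lower() in answer]

    # The "not enough information" response returns sources as an empty list
    sources = result.get("sources") or {}
    urls = [s.get("url", "") for s in sources.values()]
    sources_found = [e for e in case.expected_sources if any(e in u for u in urls)]

    topic_coverage = len(topics_found) / len(case.expected_topics) if case.expected_topics else 1.0
    source_recall = len(sources_found) / len(case.expected_sources) if case.expected_sources else 1.0
    confidence_ok = result.get("confidence", 0.0) >= case.min_confidence

    return {
        "topic_coverage": topic_coverage,
        "source_recall": source_recall,
        "confidence_ok": confidence_ok,
        "passed": topic_coverage == 1.0 and source_recall == 1.0 and confidence_ok,
    }


case = EvalCase(
    question="How do I update my billing address?",
    expected_topics=["billing", "address", "update"],
    expected_sources=["billing-management"],
    min_confidence=0.8,
)
result = {
    "answer": "To update your billing address, open Settings [Source 1].",
    "confidence": 0.9,
    "sources": {1: {"url": "https://docs.example.com/billing-management"}},
}
print(evaluate_result(case, result)["passed"])  # True
```

Aggregating these per-case dictionaries over a larger question set gives pass rates that can be tracked as the chunking or weights change.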

Key Implementation Takeaways

Building production RAG systems requires:

  1. Hybrid Search: Combine semantic and keyword search for better relevance
  2. Smart Chunking: Preserve document structure and context
  3. Source Attribution: Always provide verifiable citations
  4. Quality Gates: Set confidence thresholds to prevent bad answers
  5. Performance Optimization: Caching, connection pooling, and database tuning
  6. Systematic Testing: Comprehensive evaluation with real metrics

This implementation provides a solid foundation for RAG systems that actually work in production, handling real user queries with high accuracy and transparency.

The complete code is available as working examples that you can deploy and customize for your specific use case. Focus on getting the fundamentals right—retrieval quality, source attribution, and user trust—before optimizing for advanced features.
