Retrieval-Augmented Generation is now the default architecture for adding knowledge to LLM-powered applications. When a user asks a question, you retrieve relevant context from your own data, pass that context to the model alongside the question, and the model answers based on what you gave it.
Most RAG tutorials reach for LangChain immediately. This guide skips the framework and builds the pipeline from scratch: pgvector for vector storage, the OpenAI Python SDK for embeddings and generation, and psycopg for the database connection.
Originally published at rivestack.io
Database Setup
-- Enable pgvector (installed separately from core Postgres)
CREATE EXTENSION IF NOT EXISTS vector;

-- One row per source document
CREATE TABLE documents (
    id BIGSERIAL PRIMARY KEY,
    title TEXT NOT NULL,
    source TEXT
);

-- One row per chunk; 1536 is the dimensionality of text-embedding-3-small
CREATE TABLE chunks (
    id BIGSERIAL PRIMARY KEY,
    document_id BIGINT REFERENCES documents(id) ON DELETE CASCADE,
    content TEXT NOT NULL,
    token_count INTEGER,
    embedding VECTOR(1536)
);

-- HNSW index for approximate nearest-neighbor search under cosine distance
CREATE INDEX ON chunks USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64);
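One query-time knob worth knowing about: pgvector's hnsw.ef_search setting (default 40) controls the size of the candidate list an HNSW scan keeps, trading latency for recall. Raising it per session is a quick check when results look off; the value below is just an example, not a recommendation from this post:

-- Widen the HNSW search beam for this session; higher = better recall, slower queries
SET hnsw.ef_search = 100;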
Chunking and Embedding
import os
import psycopg
from openai import OpenAI

client = OpenAI()
EMBEDDING_MODEL = "text-embedding-3-small"

def chunk_text(text, chunk_size=400, overlap=50):
    """Split text into word-based chunks with a sliding overlap."""
    words = text.split()
    start = 0
    while start < len(words):
        end = min(start + chunk_size, len(words))
        yield " ".join(words[start:end])
        if end == len(words):
            break
        start += chunk_size - overlap

def embed_texts(texts):
    """Embed a batch of strings; the API returns vectors in input order."""
    response = client.embeddings.create(input=texts, model=EMBEDDING_MODEL)
    return [item.embedding for item in response.data]

def ingest_document(title, source, text):
    chunks = list(chunk_text(text))

    # Embed in batches of 256 to stay well under the per-request input limit
    embeddings = []
    for i in range(0, len(chunks), 256):
        embeddings.extend(embed_texts(chunks[i:i + 256]))

    with psycopg.connect(os.environ["DATABASE_URL"]) as conn:
        with conn.cursor() as cur:
            cur.execute(
                "INSERT INTO documents (title, source) VALUES (%s, %s) RETURNING id",
                (title, source),
            )
            doc_id = cur.fetchone()[0]
            for chunk, emb in zip(chunks, embeddings):
                # token_count stores the word count as a cheap proxy for tokens
                cur.execute(
                    "INSERT INTO chunks (document_id, content, token_count, embedding) "
                    "VALUES (%s, %s, %s, %s::vector)",
                    (doc_id, chunk, len(chunk.split()), emb),
                )
        # Explicit commit; psycopg's connection context manager also commits on clean exit
        conn.commit()
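To try it out (the file path and titles below are placeholders, not from the original post):

# Hypothetical example: ingest a local plain-text file
with open("docs/q3-report.txt") as f:
    ingest_document(
        title="Q3 Report",
        source="internal-reports",
        text=f.read(),
    )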
Retrieval and Generation
def retrieve_chunks(question, limit=5):
    """Return the chunks closest to the question by cosine distance."""
    q_emb = embed_texts([question])[0]
    with psycopg.connect(os.environ["DATABASE_URL"]) as conn:
        with conn.cursor() as cur:
            # <=> is pgvector's cosine distance operator; 1 - distance = similarity
            cur.execute("""
                SELECT c.content, d.title,
                       1 - (c.embedding <=> %s::vector) AS similarity
                FROM chunks c
                JOIN documents d ON d.id = c.document_id
                ORDER BY c.embedding <=> %s::vector
                LIMIT %s
            """, (q_emb, q_emb, limit))
            return [{"content": r[0], "title": r[1], "sim": r[2]} for r in cur.fetchall()]

def answer_question(question):
    chunks = retrieve_chunks(question)
    # Label each chunk with its document title so the model can attribute sources
    context = "\n\n---\n\n".join(
        f"Source: {c['title']}\n{c['content']}" for c in chunks
    )
    resp = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": f"Answer using only this context:\n\n{context}"},
            {"role": "user", "content": question},
        ],
        temperature=0.2,
    )
    return resp.choices[0].message.content
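Wiring it together looks like this (the question is just an illustrative example):

# End-to-end query against whatever has been ingested
print(answer_question("What were the main risks flagged in the Q3 report?"))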
Why pgvector Over a Dedicated Vector Store
Since embeddings live in PostgreSQL, metadata filtering is just a WHERE clause:
WHERE d.source = 'user-123-docs'
No separate metadata index. No extra service to keep in sync. Transactions, JOINs, and your existing backup setup all work automatically.
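For example, scoping retrieval to a single source is the same query used in retrieve_chunks with one extra condition (the source value is illustrative):

SELECT c.content, d.title,
       1 - (c.embedding <=> %s::vector) AS similarity
FROM chunks c
JOIN documents d ON d.id = c.document_id
WHERE d.source = 'user-123-docs'  -- ordinary SQL filter alongside the vector search
ORDER BY c.embedding <=> %s::vector
LIMIT 5;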
Production Notes
Batch embedding calls. The API accepts up to 2048 inputs per call. Batching 256 at a time keeps latency low without hitting rate limits.
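If a burst of ingestion does trip the rate limiter anyway, a thin retry wrapper with exponential backoff around embed_texts is usually enough. This is a sketch, not part of the original pipeline; embed_texts_with_retry is a hypothetical helper:

import time
from openai import RateLimitError

def embed_texts_with_retry(texts, max_retries=5):
    """Call embed_texts, backing off exponentially on rate-limit errors."""
    for attempt in range(max_retries):
        try:
            return embed_texts(texts)
        except RateLimitError:
            if attempt == max_retries - 1:
                raise
            time.sleep(2 ** attempt)  # 1s, 2s, 4s, ...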
Track embedding model per row. Add an embedding_model column so you can identify and re-embed stale rows when switching models.
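Against the schema above, that can be as simple as the following (the model name is the one used in this post):

ALTER TABLE chunks ADD COLUMN embedding_model TEXT;

-- After switching models, find rows that still carry old embeddings
SELECT id FROM chunks
WHERE embedding_model IS DISTINCT FROM 'text-embedding-3-small';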
Limit by tokens, not chunk count. In practice, 3 to 8 highly relevant chunks tend to outperform 20 loosely related ones. Cap context by token count, not by number of chunks.
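A sketch of what that can look like with tiktoken; the 1,500-token budget and the cl100k_base encoding are illustrative choices, not values from the original post:

import tiktoken

def build_context(chunks, max_tokens=1500):
    """Add retrieved chunks to the context until a token budget is exhausted."""
    enc = tiktoken.get_encoding("cl100k_base")
    picked, used = [], 0
    for c in chunks:  # chunks arrive sorted by similarity
        n = len(enc.encode(c["content"]))
        if used + n > max_tokens:
            break
        picked.append(f"Source: {c['title']}\n{c['content']}")
        used += n
    return "\n\n---\n\n".join(picked)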
For the full guide including HNSW index tuning, async ingestion patterns, and similarity thresholds, see the complete post at rivestack.io.