Yasser B.

Posted on • Originally published at rivestack.io

Building a RAG Application with pgvector and the OpenAI API

Retrieval-Augmented Generation is now the default architecture for adding knowledge to LLM-powered applications. When a user asks a question, you retrieve relevant context from your own data, pass that context to the model alongside the question, and the model answers based on what you gave it.

Most RAG tutorials reach for LangChain immediately. This guide skips the framework and builds the pipeline from scratch: pgvector for vector storage, the OpenAI Python SDK for embeddings and generation, and psycopg for the database connection.

Database Setup

CREATE EXTENSION IF NOT EXISTS vector;

CREATE TABLE documents (
  id BIGSERIAL PRIMARY KEY,
  title TEXT NOT NULL,
  source TEXT
);

CREATE TABLE chunks (
  id BIGSERIAL PRIMARY KEY,
  document_id BIGINT REFERENCES documents(id) ON DELETE CASCADE,
  content TEXT NOT NULL,
  token_count INTEGER,
  embedding VECTOR(1536)
);

CREATE INDEX ON chunks USING hnsw (embedding vector_cosine_ops)
  WITH (m = 16, ef_construction = 64);
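Because the column is declared as VECTOR(1536), pgvector rejects vectors of any other length, so it can help to validate embeddings before they reach the database. The following is a minimal sketch rather than part of the original pipeline: `to_vector_literal` is a hypothetical helper that checks the dimension and formats a sequence of floats in pgvector's text form (`[x1,x2,...]`), which can then be passed as a string parameter and cast with `::vector`.

```python
EXPECTED_DIM = 1536  # matches VECTOR(1536) in the chunks table


def to_vector_literal(embedding, expected_dim=EXPECTED_DIM):
    """Validate length and format floats as a pgvector text literal.

    Hypothetical helper for illustration; not part of pgvector or psycopg.
    """
    if len(embedding) != expected_dim:
        raise ValueError(
            f"expected {expected_dim} dimensions, got {len(embedding)}"
        )
    # pgvector's text input format is a bracketed, comma-separated list
    return "[" + ",".join(str(float(x)) for x in embedding) + "]"
```

Failing early in Python gives a clearer error than a rejected INSERT halfway through a batch.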

Chunking and Embedding

import os

import psycopg
from openai import OpenAI

client = OpenAI()
EMBEDDING_MODEL = "text-embedding-3-small"

def chunk_text(text, chunk_size=400, overlap=50):
    words = text.split()
    start = 0
    while start < len(words):
        end = min(start + chunk_size, len(words))
        yield " ".join(words[start:end])
        if end == len(words):
            break
        start += chunk_size - overlap

def embed_texts(texts):
    return [i.embedding for i in client.embeddings.create(
        input=texts, model=EMBEDDING_MODEL).data]

def ingest_document(title, source, text):
    chunks = list(chunk_text(text))
    embeddings = []
    for i in range(0, len(chunks), 256):
        embeddings.extend(embed_texts(chunks[i:i+256]))
    with psycopg.connect(os.environ["DATABASE_URL"]) as conn:
        with conn.cursor() as cur:
            cur.execute("INSERT INTO documents (title, source) VALUES (%s, %s) RETURNING id", (title, source))
            doc_id = cur.fetchone()[0]
            for chunk, emb in zip(chunks, embeddings):
                # token_count stores a whitespace word count, a rough proxy for model tokens
                cur.execute("INSERT INTO chunks (document_id, content, token_count, embedding) VALUES (%s, %s, %s, %s)",
                            (doc_id, chunk, len(chunk.split()), emb))
        conn.commit()
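To see how the sliding window in `chunk_text` advances, it can help to look at word offsets instead of text. The sketch below mirrors the same loop; `chunk_spans` is a hypothetical helper introduced only for illustration, and the guard against `overlap >= chunk_size` is an addition (with such values the original loop would never advance).

```python
def chunk_spans(n_words, chunk_size=400, overlap=50):
    """Return the (start, end) word offsets that chunk_text would slice."""
    if overlap >= chunk_size:
        raise ValueError("overlap must be smaller than chunk_size")
    spans = []
    start = 0
    while start < n_words:
        end = min(start + chunk_size, n_words)
        spans.append((start, end))
        if end == n_words:
            break
        # Each new chunk starts `overlap` words before the previous one ended
        start += chunk_size - overlap
    return spans


print(chunk_spans(1000))
# [(0, 400), (350, 750), (700, 1000)]
```

With the defaults, each chunk starts 350 words after the previous one, so a 1000-word document yields three chunks and the last one is shorter.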

Retrieval and Generation

def retrieve_chunks(question, limit=5):
    q_emb = embed_texts([question])[0]
    with psycopg.connect(os.environ["DATABASE_URL"]) as conn:
        with conn.cursor() as cur:
            cur.execute("""
                SELECT c.content, d.title,
                  1 - (c.embedding <=> %s::vector) AS similarity
                FROM chunks c
                JOIN documents d ON d.id = c.document_id
                ORDER BY c.embedding <=> %s::vector LIMIT %s
            """, (q_emb, q_emb, limit))
            return [{"content": r[0], "title": r[1], "sim": r[2]} for r in cur.fetchall()]

def answer_question(question):
    chunks = retrieve_chunks(question)
    context = "\n\n---\n\n".join(f"Source: {c['title']}\n{c['content']}" for c in chunks)
    resp = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": f"Answer using only this context:\n\n{context}"},
            {"role": "user", "content": question},
        ],
        temperature=0.2,
    )
    return resp.choices[0].message.content
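The `<=>` operator returns cosine distance, so `1 - (c.embedding <=> %s::vector)` in the query is cosine similarity. As a rough illustration of what that number means, here is the same quantity in plain Python, plus a small filter over the dictionaries returned by `retrieve_chunks`. Both functions are hypothetical helpers, and the `min_sim=0.3` default is an arbitrary placeholder, not a recommended value.

```python
import math


def cosine_similarity(a, b):
    """Cosine similarity of two equal-length, non-zero vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


def filter_by_similarity(chunks, min_sim=0.3):
    """Drop retrieved chunks whose 'sim' score is below the threshold."""
    return [c for c in chunks if c["sim"] >= min_sim]
```

A threshold like this lets `answer_question` fall back to "no relevant context found" instead of passing weak matches to the model.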

Why pgvector Over a Dedicated Vector Store

Since embeddings live in PostgreSQL, metadata filtering is just a WHERE clause:

WHERE d.source = 'user-123-docs'

No separate metadata index. No extra service to keep in sync. Transactions, JOINs, and your existing backup setup all work automatically.
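One way to wire that filter into the retrieval query is sketched below. `build_retrieval_query` is a hypothetical helper that returns the SQL and its parameters, so the filter value is always passed as a bound parameter and never interpolated into the string; only a fixed WHERE fragment is added to the query text.

```python
def build_retrieval_query(q_emb, limit=5, source=None):
    """Build the similarity query, optionally filtered by documents.source."""
    where = ""
    params = [q_emb]  # first %s: similarity expression in SELECT
    if source is not None:
        where = "WHERE d.source = %s"
        params.append(source)  # second %s: metadata filter
    params.extend([q_emb, limit])  # ORDER BY distance, then LIMIT
    sql = f"""
        SELECT c.content, d.title,
          1 - (c.embedding <=> %s::vector) AS similarity
        FROM chunks c
        JOIN documents d ON d.id = c.document_id
        {where}
        ORDER BY c.embedding <=> %s::vector LIMIT %s
    """
    return sql, tuple(params)
```

The result can be passed straight to `cur.execute(sql, params)`. Note that with an HNSW index the filter is applied after the index scan, so a very selective filter may return fewer rows than `limit`.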

Production Notes

Batch embedding calls. The API accepts up to 2048 inputs per call. Batching 256 at a time keeps latency low without hitting rate limits.
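The batching loop inside `ingest_document` can be pulled out into a reusable helper. This is a sketch under the assumption that `embed_fn` behaves like `embed_texts` above (a list of strings in, a list of vectors out, in the same order); both function names here are hypothetical.

```python
def batched(items, batch_size=256):
    """Yield consecutive slices of at most batch_size items."""
    for i in range(0, len(items), batch_size):
        yield items[i:i + batch_size]


def embed_in_batches(texts, embed_fn, batch_size=256):
    """Embed texts batch by batch, preserving input order."""
    embeddings = []
    for batch in batched(texts, batch_size):
        embeddings.extend(embed_fn(batch))
    return embeddings
```

Passing the embedding function in as an argument also makes the ingestion path easy to test without calling the API.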

Track embedding model per row. Add an embedding_model column so you can identify and re-embed stale rows when switching models.
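A minimal sketch of that idea follows, assuming the column is named `embedding_model` and stored as TEXT on the `chunks` table. The constant and function names are hypothetical; the SQL would be run through the same psycopg connection used elsewhere in this guide.

```python
# One-time migration: add the tracking column if it is not there yet
ADD_MODEL_COLUMN = (
    "ALTER TABLE chunks ADD COLUMN IF NOT EXISTS embedding_model TEXT"
)


def stale_chunks_query(current_model):
    """Return (sql, params) selecting rows embedded with a different model.

    IS DISTINCT FROM also matches rows where embedding_model is NULL,
    i.e. rows written before the column existed.
    """
    sql = (
        "SELECT id, content FROM chunks "
        "WHERE embedding_model IS DISTINCT FROM %s"
    )
    return sql, (current_model,)
```

Rows returned by this query can be re-embedded in batches and updated together with the new model name.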

Limit by tokens, not chunk count. A few highly relevant chunks (roughly 3 to 8) tend to outperform 20 loosely related ones, so cap the context by a token budget rather than by a fixed number of chunks.
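A token budget can be applied to the output of `retrieve_chunks` before the prompt is built. In this sketch, `cap_context` is a hypothetical helper, `max_tokens=2000` is an arbitrary placeholder, and the default counter is a whitespace word count standing in for a real tokenizer, which can be supplied through `count_tokens`.

```python
def _word_count(text):
    # Rough proxy for tokens; swap in a real tokenizer for accurate budgets
    return len(text.split())


def cap_context(chunks, max_tokens=2000, count_tokens=_word_count):
    """Keep the best-ranked chunks that fit within the token budget.

    Assumes chunks are already ordered from most to least similar.
    """
    selected = []
    used = 0
    for chunk in chunks:
        cost = count_tokens(chunk["content"])
        if used + cost > max_tokens:
            break  # stop at the first chunk that would exceed the budget
        selected.append(chunk)
        used += cost
    return selected
```

Stopping at the first chunk that does not fit keeps the selection a strict best-first prefix, which is simpler to reason about than skipping ahead to smaller chunks.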

For the full guide including HNSW index tuning, async ingestion patterns, and similarity thresholds, see the complete post at rivestack.io.
