DEV Community

matias yoon
matias yoon

Posted on

RAG 시스템 실전 구축 (v45)

RAG 시스템 실전 구축 (v45)

Practical Guide for ML Engineers and Backend Developers

1. RAG Fundamentals: The Retrieval-Augmented Generation Loop

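Retrieval-Augmented Generation (RAG) is an architecture that combines the strengths of retrieval systems and language models: instead of relying only on what the model memorized during training, it looks up relevant documents at query time and hands them to the model as context. The core loop operates as follows: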

  1. Retrieval: Query vector is compared against document vectors to find relevant chunks
  2. Augmentation: Retrieved documents are concatenated with the original query
  3. Generation: LLM processes the augmented prompt to generate a response
# Basic RAG workflow
def rag_pipeline(query, vector_db, llm):
    """Answer ``query`` with one retrieve -> augment -> generate pass.

    Args:
        query: The user's question as plain text.
        vector_db: Object exposing ``search(query_vector, k=...)`` that
            returns the most relevant documents. Each hit may be a plain
            string or an object with a ``content`` attribute.
        llm: Object exposing ``generate(prompt)``.

    Returns:
        Whatever ``llm.generate`` returns for the augmented prompt.
    """
    # Step 1: Retrieve relevant documents
    # NOTE(review): embed_query is not defined in this snippet; it is assumed
    # to be provided by the surrounding module.
    query_vector = embed_query(query)
    relevant_docs = vector_db.search(query_vector, k=5)

    # Step 2: Augment prompt
    # The vector-store wrappers in this guide return raw strings from
    # ``search``, so accept both strings and objects carrying ``.content``
    # instead of failing with AttributeError on the former.
    context = "\n".join(getattr(doc, "content", doc) for doc in relevant_docs)
    augmented_prompt = f"Context: {context}\nQuestion: {query}"

    # Step 3: Generate response
    return llm.generate(augmented_prompt)
Enter fullscreen mode Exit fullscreen mode

2. Chunking Strategies for Optimal Retrieval

Effective chunking is crucial for RAG performance. Here are three primary strategies:

Semantic Chunking

Breaks text into semantically coherent pieces using clustering algorithms:

from sentence_transformers import SentenceTransformer
import numpy as np
from sklearn.cluster import KMeans

class SemanticChunker:
    """Group the sentences of a text into chunks by embedding similarity.

    Sentences are embedded with a SentenceTransformer model and clustered
    with K-Means. Each cluster becomes one chunk, or several chunks when the
    cluster does not fit into the requested size budget.
    """

    def __init__(self, model_name="all-MiniLM-L6-v2"):
        """Load the sentence-embedding model named ``model_name``."""
        self.model = SentenceTransformer(model_name)

    def chunk(self, text, max_tokens=512):
        """Split ``text`` into semantically grouped chunks.

        Args:
            text: Raw text to chunk.
            max_tokens: Upper bound on the size of a chunk, counted in
                whitespace-separated words (an approximation of model tokens).

        Returns:
            A list of chunk strings, each at most ``max_tokens`` words long.
            Returns an empty list when ``text`` contains no sentences.

        Raises:
            ValueError: If ``max_tokens`` is smaller than 1.
        """
        if max_tokens < 1:
            raise ValueError("max_tokens must be a positive integer")

        # Split text into sentences; blank fragments (empty input, trailing
        # separators) carry no meaning and would only distort the clustering.
        sentences = [piece.strip() for piece in text.split('. ')]
        sentences = [sentence for sentence in sentences if sentence]
        if not sentences:
            return []

        embeddings = self.model.encode(sentences)

        # Cluster sentences based on similarity, aiming at roughly three
        # sentences per cluster. The seed is fixed so that the same text
        # always produces the same chunks.
        n_clusters = max(1, len(sentences) // 3)
        kmeans = KMeans(n_clusters=n_clusters, n_init=10, random_state=0)
        labels = kmeans.fit_predict(embeddings)

        # Group sentence indices by cluster (indices stay in document order)
        clusters = {}
        for index, label in enumerate(labels):
            clusters.setdefault(label, []).append(index)

        # Create chunks. Oversized clusters are split rather than discarded,
        # so no sentence is silently lost.
        chunks = []
        for cluster_indices in clusters.values():
            members = [sentences[i] for i in cluster_indices]
            chunks.extend(self._pack_sentences(members, max_tokens))

        return chunks

    @staticmethod
    def _pack_sentences(sentences, max_tokens):
        """Greedily join ``sentences`` into chunks of at most ``max_tokens`` words."""
        chunks = []
        pending = []
        pending_words = 0

        for sentence in sentences:
            words = sentence.split()

            if len(words) > max_tokens:
                # A single sentence that exceeds the budget is cut into
                # word windows; flush whatever was collected before it.
                if pending:
                    chunks.append(". ".join(pending))
                    pending, pending_words = [], 0
                chunks.extend(
                    " ".join(words[start:start + max_tokens])
                    for start in range(0, len(words), max_tokens)
                )
                continue

            if pending and pending_words + len(words) > max_tokens:
                chunks.append(". ".join(pending))
                pending, pending_words = [], 0

            pending.append(sentence)
            pending_words += len(words)

        if pending:
            chunks.append(". ".join(pending))
        return chunks
Enter fullscreen mode Exit fullscreen mode

Recursive Chunking

Splits text recursively using multiple delimiters:

def recursive_chunking(text, chunk_size=512, overlap=50):
    """Split ``text`` into chunks of at most ``chunk_size`` words.

    The text is split on the coarsest delimiter it contains (paragraph, line,
    sentence, word) and neighbouring parts are merged while they fit. A part
    that is too large on its own is split again with the finer delimiters.

    Args:
        text: Text to chunk.
        chunk_size: Maximum chunk size in whitespace-separated words. For
            text without any delimiter it is the window size in characters.
        overlap: Number of characters shared by consecutive windows. It is
            only used by the character-based fallback; values outside
            ``[0, chunk_size)`` are treated as no overlap.

    Returns:
        A list of non-empty, whitespace-stripped chunks. Blank input yields
        an empty list.

    Raises:
        ValueError: If ``chunk_size`` is smaller than 1.
    """
    if chunk_size < 1:
        raise ValueError("chunk_size must be a positive integer")
    if not text or not text.strip():
        return []

    # Define delimiters in order of preference
    delimiters = ('\n\n', '\n', '. ', '! ', '? ', ' ')

    chunks = _split_on_delimiters(text, delimiters, chunk_size)
    if chunks:
        return chunks

    # If no delimiter worked, use character-based splitting. The step is
    # kept positive so a large overlap can neither raise nor drop the text.
    step = chunk_size - overlap if 0 <= overlap < chunk_size else chunk_size
    return [text[start:start + chunk_size] for start in range(0, len(text), step)]


def _split_on_delimiters(text, delimiters, chunk_size):
    """Chunk ``text`` on the first delimiter it contains, recursing on oversized parts."""
    for position, delimiter in enumerate(delimiters):
        if delimiter not in text:
            continue

        finer_delimiters = delimiters[position + 1:]
        parts = text.split(delimiter)
        # Re-attach the delimiter to every part except the last one, so the
        # chunks keep the original punctuation and nothing is invented at
        # the end of the text.
        pieces = [part + delimiter for part in parts[:-1]] + [parts[-1]]

        chunks = []
        pending = []
        pending_words = 0

        for piece in pieces:
            piece_words = len(piece.split())

            if piece_words > chunk_size:
                # Too big on its own: close the current chunk and split the
                # piece with the finer delimiters. If none of them occurs in
                # the piece, it is kept whole rather than lost.
                _emit_chunk(chunks, pending)
                pending, pending_words = [], 0
                finer = _split_on_delimiters(piece, finer_delimiters, chunk_size)
                chunks.extend(finer or [piece.strip()])
                continue

            if pending_words + piece_words > chunk_size:
                _emit_chunk(chunks, pending)
                pending, pending_words = [], 0

            pending.append(piece)
            pending_words += piece_words

        _emit_chunk(chunks, pending)
        return chunks

    return []


def _emit_chunk(chunks, pieces):
    """Append the stripped concatenation of ``pieces`` to ``chunks`` unless it is blank."""
    joined = "".join(pieces).strip()
    if joined:
        chunks.append(joined)
Enter fullscreen mode Exit fullscreen mode

Agentic Chunking

Uses LLMs to determine optimal chunk boundaries:

class AgenticChunker:
    """Chunk text at boundaries proposed by an LLM."""

    def __init__(self, llm):
        """Store ``llm``, an object exposing ``generate(prompt)``."""
        self.llm = llm

    def chunk(self, text):
        """Ask the LLM for split positions and cut ``text`` at them.

        Args:
            text: Text to chunk.

        Returns:
            A list of non-empty, whitespace-stripped chunks. When the LLM
            reply contains no usable boundaries, the whole text is returned
            as a single chunk.
        """
        # Prompt LLM to identify logical boundaries
        prompt = f"""
        Please identify logical boundaries in the following text for optimal chunking:
        Text: {text}

        Return chunk boundaries as a JSON array of indices where splits should occur.
        """
        boundaries = self.llm.generate(prompt)
        return self._create_chunks_from_boundaries(text, boundaries)

    def _create_chunks_from_boundaries(self, text, boundaries):
        """Cut ``text`` at the character offsets described by ``boundaries``.

        Args:
            text: Text to cut.
            boundaries: Either a list/tuple of integer offsets or the raw LLM
                reply containing a JSON array of offsets.

        Returns:
            The stripped, non-empty slices of ``text`` between the offsets.
        """
        import json  # local import: the snippet has no module-level json import

        if isinstance(boundaries, str):
            # LLM replies often wrap the array in prose; parse only the span
            # from the first '[' to the last ']'.
            start = boundaries.find("[")
            end = boundaries.rfind("]")
            try:
                boundaries = json.loads(boundaries[start:end + 1]) if 0 <= start < end else []
            except ValueError:
                boundaries = []
        if not isinstance(boundaries, (list, tuple)):
            boundaries = []

        # Keep unique, in-range integer offsets in ascending order. bool is a
        # subclass of int and is excluded explicitly.
        cut_points = sorted({
            offset
            for offset in boundaries
            if isinstance(offset, int)
            and not isinstance(offset, bool)
            and 0 < offset < len(text)
        })

        edges = [0, *cut_points, len(text)]
        pieces = (text[begin:stop].strip() for begin, stop in zip(edges, edges[1:]))
        return [piece for piece in pieces if piece]
Enter fullscreen mode Exit fullscreen mode

3. Embedding Model Selection and Comparison

Choosing the right embedding model affects both performance and cost:

from sentence_transformers import SentenceTransformer
from transformers import AutoTokenizer, AutoModel
import torch

class EmbeddingModelSelector:
    """Compare candidate sentence-embedding models on a sample workload."""

    def __init__(self):
        """Register the candidate models with coarse, hand-assigned traits."""
        # Keys are passed verbatim to SentenceTransformer(). Names without an
        # organization prefix are looked up under "sentence-transformers/",
        # so the BGE model needs its "BAAI/" prefix to be resolvable.
        self.models = {
            'all-MiniLM-L6-v2': {
                'size': 'small',
                'quality': 'high',
                'speed': 'fast',
                'cost': 'low'
            },
            'all-mpnet-base-v2': {
                'size': 'medium',
                'quality': 'very-high',
                'speed': 'medium',
                'cost': 'medium'
            },
            'BAAI/bge-small-en': {
                'size': 'small',
                'quality': 'high',
                'speed': 'fast',
                'cost': 'low'
            }
        }

    def evaluate_models(self, queries, documents):
        """Benchmark different embedding models.

        Args:
            queries: Non-empty sequence of query strings.
            documents: Non-empty sequence of document strings.

        Returns:
            A dict keyed by model name. Each value holds ``avg_similarity``
            (mean cosine similarity over all query/document pairs),
            ``model_config`` (the registered traits) and ``latency``
            (average seconds per encode call on up to five queries).

        Raises:
            ValueError: If ``queries`` or ``documents`` is empty.
        """
        if len(queries) == 0 or len(documents) == 0:
            raise ValueError("queries and documents must be non-empty")

        results = {}

        for model_name, config in self.models.items():
            model = SentenceTransformer(model_name)

            # Normalize to unit length: raw dot products scale with each
            # model's embedding norm and are not comparable across models.
            query_vectors = self._unit_rows(model.encode(queries))
            doc_vectors = self._unit_rows(model.encode(documents))

            # Calculate similarity scores
            similarities = np.dot(query_vectors, doc_vectors.T)

            results[model_name] = {
                'avg_similarity': float(np.mean(similarities)),
                'model_config': config,
                'latency': self._measure_latency(model, queries[:5])
            }

        return results

    @staticmethod
    def _unit_rows(vectors):
        """Return ``vectors`` as a 2-D float array with unit-length rows."""
        matrix = np.asarray(vectors, dtype=float)
        norms = np.linalg.norm(matrix, axis=1, keepdims=True)
        # Clip so an all-zero row stays zero instead of dividing by zero.
        return matrix / np.clip(norms, 1e-12, None)

    def _measure_latency(self, model, test_queries):
        """Return the mean wall-clock seconds of ten ``model.encode`` calls."""
        import time

        repetitions = 10
        # perf_counter is monotonic and high-resolution; time.time can jump
        # when the system clock is adjusted.
        start = time.perf_counter()
        for _ in range(repetitions):
            model.encode(test_queries)
        return (time.perf_counter() - start) / repetitions
Enter fullscreen mode Exit fullscreen mode

4. Vector Database Comparison

Different vector databases offer varying performance characteristics:

# Chroma implementation
import chromadb
from chromadb.config import Settings

class ChromaVectorDB:
    """Minimal document store backed by a persistent Chroma collection."""

    def __init__(self, path="./chroma_db"):
        """Open (or create) the on-disk Chroma database at ``path``."""
        # PersistentClient stores data under ``path``. The plain Client with
        # only ``persist_directory`` set does not enable persistence in
        # current Chroma releases.
        self.client = chromadb.PersistentClient(path=path)
        self.collection = self.client.get_or_create_collection("rag_collection")

    def add_documents(self, documents, embeddings):
        """Store ``documents`` together with their ``embeddings``.

        Args:
            documents: Sequence of document strings.
            embeddings: One vector per document, in the same order.
        """
        documents = list(documents)
        if not documents:
            return

        # Continue numbering after the existing entries; restarting at 0 on
        # every call would reuse ids that are already taken.
        # NOTE(review): assumes entries are never deleted from the collection.
        first_id = self.collection.count()
        self.collection.add(
            embeddings=[
                vector.tolist() if hasattr(vector, "tolist") else list(vector)
                for vector in embeddings
            ],
            documents=documents,
            ids=[str(first_id + i) for i in range(len(documents))]
        )

    def search(self, query_embedding, k=5):
        """Return the contents of the ``k`` documents nearest to ``query_embedding``."""
        if hasattr(query_embedding, "tolist"):
            query_embedding = query_embedding.tolist()
        results = self.collection.query(
            query_embeddings=[query_embedding],
            n_results=k
        )
        # One result list per query embedding; only one query was sent.
        return results['documents'][0]

# Qdrant implementation
from qdrant_client import QdrantClient
from qdrant_client.models import Filter, FieldCondition, MatchValue

class QdrantVectorDB:
    """Minimal document store backed by a Qdrant collection."""

    def __init__(self, host="localhost", port=6333):
        """Connect to the Qdrant server at ``host``:``port``."""
        self.client = QdrantClient(host=host, port=port)
        self.collection_name = "rag_collection"

    def add_documents(self, documents, embeddings):
        """Store ``documents`` together with their ``embeddings``.

        The collection is created on first use, sized to the first embedding
        and configured for cosine distance.

        Args:
            documents: Sequence of document strings.
            embeddings: One vector per document, in the same order.
        """
        # Local import: these names are not part of the module-level imports.
        from qdrant_client.models import Distance, PointStruct, VectorParams

        documents = list(documents)
        vectors = [
            vector.tolist() if hasattr(vector, "tolist") else list(vector)
            for vector in embeddings
        ]
        if not documents or not vectors:
            return

        # Upserting into a collection that does not exist fails, so create
        # it lazily once the vector size is known.
        existing = {item.name for item in self.client.get_collections().collections}
        if self.collection_name not in existing:
            self.client.create_collection(
                collection_name=self.collection_name,
                vectors_config=VectorParams(size=len(vectors[0]), distance=Distance.COSINE),
            )

        # Continue numbering after the stored points; restarting at 0 would
        # make upsert overwrite the documents added by earlier calls.
        # NOTE(review): assumes points are never deleted from the collection.
        first_id = self.client.count(
            collection_name=self.collection_name, exact=True
        ).count

        self.client.upsert(
            collection_name=self.collection_name,
            points=[
                PointStruct(
                    id=first_id + i,
                    vector=vector,
                    payload={"content": doc},
                )
                for i, (doc, vector) in enumerate(zip(documents, vectors))
            ]
        )

    def search(self, query_embedding, k=5):
        """Return the contents of the ``k`` points nearest to ``query_embedding``."""
        if hasattr(query_embedding, "tolist"):
            query_embedding = query_embedding.tolist()
        results = self.client.search(
            collection_name=self.collection_name,
            query_vector=query_embedding,
            limit=k
        )
        return [hit.payload['content'] for hit in results]

# pgvector implementation
import psycopg2
from psycopg2.extras import Json

class PGVectorDB:
    """Minimal document store backed by PostgreSQL with the pgvector extension."""

    def __init__(self, connection_string, dimension=384):
        """Connect to the database and make sure the table exists.

        Args:
            connection_string: psycopg2 connection string (DSN).
            dimension: Length of the stored embedding vectors. The default
                of 384 is the value this class previously hard-coded.
        """
        self.conn = psycopg2.connect(connection_string)
        self.dimension = int(dimension)
        self._setup_table()

    @staticmethod
    def _to_vector_literal(embedding):
        """Render ``embedding`` in pgvector's text format, e.g. ``[0.1,0.2]``."""
        # A string literal works for lists, tuples and numpy arrays alike;
        # psycopg2 cannot adapt numpy arrays on its own.
        return "[" + ",".join(repr(float(value)) for value in embedding) + "]"

    def _setup_table(self):
        """Create the extension, table and index if they do not exist yet."""
        # ``with self.conn`` commits on success and rolls back on error, so
        # a failed statement does not leave the connection in an aborted
        # transaction.
        with self.conn, self.conn.cursor() as cur:
            cur.execute("CREATE EXTENSION IF NOT EXISTS vector")
            # self.dimension is coerced to int in __init__, so interpolating
            # it into the DDL is safe (DDL cannot take bound parameters).
            cur.execute(f"""
                CREATE TABLE IF NOT EXISTS rag_documents (
                    id SERIAL PRIMARY KEY,
                    content TEXT,
                    embedding VECTOR({self.dimension})
                )
            """)
            # NOTE(review): the ivfflat index is created before any rows are
            # loaded; confirm against the pgvector docs whether it should be
            # rebuilt after bulk loading.
            cur.execute("CREATE INDEX IF NOT EXISTS idx_embedding ON rag_documents USING ivfflat (embedding vector_l2_ops)")

    def add_documents(self, documents, embeddings):
        """Insert ``documents`` with their ``embeddings`` in one transaction.

        Args:
            documents: Sequence of document strings.
            embeddings: One vector per document, in the same order.
        """
        rows = [
            (doc, self._to_vector_literal(embedding))
            for doc, embedding in zip(documents, embeddings)
        ]
        if not rows:
            return
        with self.conn, self.conn.cursor() as cur:
            cur.executemany(
                "INSERT INTO rag_documents (content, embedding) VALUES (%s, %s::vector)",
                rows
            )

    def search(self, query_embedding, k=5):
        """Return the contents of the ``k`` rows nearest (L2) to ``query_embedding``."""
        with self.conn, self.conn.cursor() as cur:
            # The explicit ::vector cast is required: a bare array parameter
            # has no ``<->`` operator against a vector column.
            cur.execute("""
                SELECT content FROM rag_documents 
                ORDER BY embedding <-> %s::vector 
                LIMIT %s
            """, (self._to_vector_literal(query_embedding), k))
            return [row[0] for row in cur.fetchall()]
Enter fullscreen mode Exit fullscreen mode

Rules of thumb for choosing a vector store (qualitative guidance, not measured benchmarks):

  • Chroma: Fast for small datasets (< 1M vectors)
  • Qdrant: Excellent for complex queries and filtering
  • PGVector: Best for integration with existing SQL workflows
  • Milvus: Highest performance for large-scale deployments

5. Full RAG Pipeline Code from Scratch


python
import numpy as np
from sentence_transformers import SentenceTransformer
import chromadb
from chromadb.config import Settings
from typing import List, Tuple

class RAGPipeline:
    def __init__(self, embedding_model="all-MiniLM-L6-v2"):
        # Initialize components
        self.embedder

---

📥 **Get the full guide on Gumroad**: https://gumroad.com/l/auto ($7)
Enter fullscreen mode Exit fullscreen mode

Top comments (0)