RAG 시스템 실전 구축 (v45)
Practical Guide for ML Engineers and Backend Developers
1. RAG Fundamentals: The Retrieval-Augmentation-Generation Loop
Retrieval-Augmented Generation (RAG) is an architecture that combines the strengths of retrieval systems and language models. The core loop operates as follows:
- Retrieval: Query vector is compared against document vectors to find relevant chunks
- Augmentation: Retrieved documents are concatenated with the original query
- Generation: LLM processes the augmented prompt to generate a response
# Basic RAG workflow
def rag_pipeline(query, vector_db, llm):
    """Answer *query* with a retrieve / augment / generate round trip.

    The query is embedded with ``embed_query``, the five nearest documents
    are fetched from ``vector_db``, their text is placed ahead of the
    question in a single prompt, and that prompt is handed to the LLM.

    Args:
        query: The user's question as a string.
        vector_db: Object exposing ``search(query_vector, k=...)``.
        llm: Object exposing ``generate(prompt)``.

    Returns:
        Whatever ``llm.generate`` returns for the augmented prompt.
    """
    # Step 1: Retrieve relevant documents
    query_vector = embed_query(query)
    relevant_docs = vector_db.search(query_vector, k=5)

    # Step 2: Augment prompt
    # A search hit may be an object carrying a ``content`` attribute or a
    # bare string (the vector-store wrappers in this file return strings),
    # so accept both shapes instead of failing on ``str.content``.
    context = "\n".join(getattr(doc, "content", doc) for doc in relevant_docs)
    augmented_prompt = f"Context: {context}\nQuestion: {query}"

    # Step 3: Generate response
    return llm.generate(augmented_prompt)
2. Chunking Strategies for Optimal Retrieval
Effective chunking is crucial for RAG performance. Here are three primary strategies:
Semantic Chunking
Breaks text into semantically coherent pieces using clustering algorithms:
from sentence_transformers import SentenceTransformer
import numpy as np
from sklearn.cluster import KMeans
class SemanticChunker:
    """Group the sentences of a text into chunks by clustering embeddings.

    Sentences are obtained by splitting on ``'. '``, embedded with a
    SentenceTransformer model, and clustered with KMeans using one cluster
    per three sentences (minimum one). Each cluster becomes one or more
    chunks.
    """

    def __init__(self, model_name="all-MiniLM-L6-v2"):
        """Load the sentence-embedding model named *model_name*."""
        self.model = SentenceTransformer(model_name)

    def chunk(self, text, max_tokens=512):
        """Split *text* into clustered chunks of at most *max_tokens* words.

        Args:
            text: The text to chunk.
            max_tokens: Upper bound on whitespace-separated words per chunk.

        Returns:
            A list of chunk strings. Blank input yields an empty list.

        Raises:
            ValueError: If *max_tokens* is smaller than 1.
        """
        if max_tokens < 1:
            raise ValueError("max_tokens must be at least 1")

        # Split text into sentences, ignoring empty fragments so that blank
        # input never reaches the encoder or produces an empty chunk.
        sentences = [s for s in text.split('. ') if s.strip()]
        if not sentences:
            return []

        embeddings = self.model.encode(sentences)

        # Cluster sentences based on similarity
        kmeans = KMeans(n_clusters=max(1, len(sentences) // 3))
        kmeans.fit(embeddings)

        # Group sentence indices by cluster label
        clusters = {}
        for index, label in enumerate(kmeans.labels_):
            clusters.setdefault(label, []).append(index)

        # Create chunks. A cluster that exceeds the budget is split into
        # several chunks rather than being discarded, so no text is lost.
        chunks = []
        for cluster_indices in clusters.values():
            members = [sentences[i] for i in cluster_indices]
            chunks.extend(self._pack_sentences(members, max_tokens))
        return chunks

    @staticmethod
    def _pack_sentences(sentences, max_tokens):
        """Greedily join *sentences* with ``'. '`` into chunks within budget.

        A cluster that already fits is returned as a single chunk. A single
        sentence longer than *max_tokens* words is cut into word windows.
        """
        packed = []
        current = []
        current_words = 0
        for sentence in sentences:
            words = sentence.split()
            if len(words) > max_tokens:
                # Flush what we have, then hard-split the oversized sentence.
                if current:
                    packed.append(". ".join(current))
                    current, current_words = [], 0
                for start in range(0, len(words), max_tokens):
                    packed.append(" ".join(words[start:start + max_tokens]))
                continue
            if current and current_words + len(words) > max_tokens:
                packed.append(". ".join(current))
                current, current_words = [], 0
            current.append(sentence)
            current_words += len(words)
        if current:
            packed.append(". ".join(current))
        return packed
Recursive Chunking
Splits text recursively using multiple delimiters:
def recursive_chunking(text, chunk_size=512, overlap=50):
    """Split *text* into chunks, trying coarse delimiters before fine ones.

    The text is split on the first delimiter it contains (paragraph break,
    line break, sentence punctuation, then single space) and the parts are
    packed greedily into chunks of at most *chunk_size* whitespace-separated
    words. A part that is itself too large is split again with the remaining,
    finer delimiters. Text containing none of the delimiters falls back to
    fixed windows of *chunk_size* characters that overlap by *overlap*
    characters.

    Args:
        text: The text to chunk.
        chunk_size: Maximum words per chunk (characters in the fallback).
        overlap: Characters shared by consecutive fallback windows. It is
            not applied on the delimiter-based path.

    Returns:
        A list of non-empty chunk strings. Blank input yields an empty list.

    Raises:
        ValueError: If *chunk_size* is smaller than 1, or if the fallback is
            needed and *overlap* is not smaller than *chunk_size*.
    """
    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1")
    if not text.strip():
        return []

    # Define delimiters in order of preference
    delimiters = ['\n\n', '\n', '. ', '! ', '? ', ' ']
    chunks = _chunk_by_delimiters(text, chunk_size, delimiters)

    # If no delimiter worked, use character-based splitting
    if not chunks:
        step = chunk_size - overlap
        if step <= 0:
            # A non-positive step would never advance through the text.
            raise ValueError("overlap must be smaller than chunk_size")
        chunks = [text[i:i + chunk_size] for i in range(0, len(text), step)]
    return chunks


def _chunk_by_delimiters(text, chunk_size, delimiters):
    """Pack *text* into word-bounded chunks using the first usable delimiter.

    Returns an empty list when *text* contains none of *delimiters*.
    """
    for position, delimiter in enumerate(delimiters):
        if delimiter not in text:
            continue

        parts = text.split(delimiter)
        # Re-attach the delimiter to every part except the last, so the
        # final chunk does not gain punctuation the text never had.
        pieces = [part + delimiter for part in parts[:-1]] + [parts[-1]]

        chunks = []
        current = ""
        current_words = 0
        for piece in pieces:
            piece_words = len(piece.split())
            if piece_words > chunk_size:
                # Too big on its own: flush, then retry with finer delimiters.
                if current.strip():
                    chunks.append(current.strip())
                current, current_words = "", 0
                finer = _chunk_by_delimiters(
                    piece, chunk_size, delimiters[position + 1:]
                )
                chunks.extend(finer if finer else [piece.strip()])
            elif current_words + piece_words <= chunk_size:
                current += piece
                current_words += piece_words
            else:
                if current.strip():
                    chunks.append(current.strip())
                current, current_words = piece, piece_words
        if current.strip():
            chunks.append(current.strip())
        return chunks
    return []
Agentic Chunking
Uses LLMs to determine optimal chunk boundaries:
class AgenticChunker:
    """Chunk text at boundaries proposed by a language model."""

    def __init__(self, llm):
        """Store *llm*, an object exposing ``generate(prompt)``."""
        self.llm = llm

    def chunk(self, text):
        """Ask the LLM for split indices and cut *text* at them.

        Args:
            text: The text to chunk.

        Returns:
            A list of non-empty, whitespace-stripped chunk strings.

        Raises:
            ValueError: If the LLM response holds no parseable JSON array.
        """
        # Prompt LLM to identify logical boundaries
        prompt = (
            "\n"
            "Please identify logical boundaries in the following text for optimal chunking:\n"
            f"Text: {text}\n"
            "Return chunk boundaries as a JSON array of indices where splits should occur.\n"
        )
        boundaries = self.llm.generate(prompt)
        return self._create_chunks_from_boundaries(text, boundaries)

    def _create_chunks_from_boundaries(self, text, boundaries):
        """Cut *text* at the character indices described by *boundaries*.

        *boundaries* may be the raw LLM response (a string containing a JSON
        array, possibly surrounded by other words) or an iterable of numbers.
        Indices outside ``(0, len(text))`` and non-numeric entries are
        ignored; duplicates are collapsed and the rest are sorted.
        """
        import json
        import re

        if isinstance(boundaries, str):
            # Models often wrap the array in prose, so pull out the first
            # flat ``[...]`` span before parsing.
            found = re.search(r"\[[^\[\]]*\]", boundaries)
            if found is None:
                raise ValueError(
                    "LLM response does not contain a JSON array of indices"
                )
            try:
                boundaries = json.loads(found.group(0))
            except json.JSONDecodeError as err:
                raise ValueError(
                    "LLM response contains an invalid JSON array"
                ) from err

        cut_points = sorted({
            int(value)
            for value in boundaries
            if isinstance(value, (int, float))
            and not isinstance(value, bool)
            and 0 < value < len(text)
        })
        edges = [0, *cut_points, len(text)]
        pieces = (text[start:end].strip() for start, end in zip(edges, edges[1:]))
        return [piece for piece in pieces if piece]
3. Embedding Model Selection and Comparison
Choosing the right embedding model affects both performance and cost:
from sentence_transformers import SentenceTransformer
from transformers import AutoTokenizer, AutoModel
import torch
class EmbeddingModelSelector:
    """Catalogue of candidate embedding models plus a small benchmark."""

    # Number of timed repetitions used when measuring encode latency.
    _LATENCY_RUNS = 10

    def __init__(self):
        """Populate ``self.models`` with the candidate model descriptions."""
        self.models = {
            'all-MiniLM-L6-v2': {
                'size': 'small',
                'quality': 'high',
                'speed': 'fast',
                'cost': 'low',
            },
            'all-mpnet-base-v2': {
                'size': 'medium',
                'quality': 'very-high',
                'speed': 'medium',
                'cost': 'medium',
            },
            'bge-small-en': {
                'size': 'small',
                'quality': 'high',
                'speed': 'fast',
                'cost': 'low',
            },
        }

    def evaluate_models(self, queries, documents):
        """Benchmark every catalogued model on *queries* and *documents*.

        For each model the queries and documents are encoded, the dot
        product of every query/document pair is averaged, and the encode
        latency of the first five queries is measured.

        Args:
            queries: Non-empty sequence of query strings.
            documents: Non-empty sequence of document strings.

        Returns:
            A dict keyed by model name with ``avg_similarity``,
            ``model_config`` and ``latency`` entries.

        Raises:
            ValueError: If *queries* or *documents* is empty.
        """
        if len(queries) == 0 or len(documents) == 0:
            # The mean of an empty similarity matrix would be NaN.
            raise ValueError("queries and documents must be non-empty")

        results = {}
        for model_name, config in self.models.items():
            model = SentenceTransformer(model_name)

            query_vectors = model.encode(queries)
            doc_vectors = model.encode(documents)

            # Pairwise dot products, shape (len(queries), len(documents)).
            similarities = np.dot(query_vectors, doc_vectors.T)

            results[model_name] = {
                'avg_similarity': np.mean(similarities),
                'model_config': config,
                'latency': self._measure_latency(model, queries[:5]),
            }
        return results

    def _measure_latency(self, model, test_queries):
        """Return the mean seconds per ``model.encode(test_queries)`` call."""
        import time

        # perf_counter is monotonic and high-resolution; wall-clock time
        # can jump when the system clock is adjusted.
        start = time.perf_counter()
        for _ in range(self._LATENCY_RUNS):
            model.encode(test_queries)
        return (time.perf_counter() - start) / self._LATENCY_RUNS
4. Vector Database Comparison
Different vector databases offer varying performance characteristics:
# Chroma implementation
import chromadb
from chromadb.config import Settings
class ChromaVectorDB:
    """Thin wrapper around a single Chroma collection named ``rag_collection``."""

    def __init__(self, path="./chroma_db"):
        """Create the client and open (or create) the collection.

        Args:
            path: Directory passed to Chroma as ``persist_directory``.
        """
        # NOTE(review): whether this form of client construction actually
        # persists to disk depends on the installed chromadb release;
        # confirm against the version in use.
        self.client = chromadb.Client(Settings(persist_directory=path))
        self.collection = self.client.get_or_create_collection("rag_collection")

    def add_documents(self, documents, embeddings):
        """Store *documents* with their *embeddings*.

        Ids continue from the current collection size, so a second call
        does not reuse the ids "0".."n-1" handed out by the first one.
        NOTE(review): ids can still repeat if entries were deleted in
        between, because the offset is taken from the current count.

        Args:
            documents: Sequence of document strings.
            embeddings: Sequence of vectors, one per document.

        Raises:
            ValueError: If the two sequences differ in length.
        """
        if len(documents) != len(embeddings):
            raise ValueError("documents and embeddings must have the same length")
        if len(documents) == 0:
            return

        first_id = self.collection.count()
        self.collection.add(
            embeddings=embeddings,
            documents=documents,
            ids=[str(first_id + offset) for offset in range(len(documents))],
        )

    def search(self, query_embedding, k=5):
        """Return the text of the *k* documents nearest to *query_embedding*."""
        results = self.collection.query(
            query_embeddings=[query_embedding],
            n_results=k,
        )
        # One query was sent, so take the first (only) result list.
        return results['documents'][0]
# Qdrant implementation
from qdrant_client import QdrantClient
from qdrant_client.models import Filter, FieldCondition, MatchValue
class QdrantVectorDB:
    """Thin wrapper around a Qdrant collection named ``rag_collection``.

    NOTE(review): nothing here creates the collection; it is presumably
    created elsewhere before ``add_documents`` is called — confirm.
    """

    def __init__(self, host="localhost", port=6333):
        """Connect to the Qdrant server at *host*:*port*."""
        self.client = QdrantClient(host=host, port=port)
        self.collection_name = "rag_collection"

    def add_documents(self, documents, embeddings):
        """Upsert *documents* with their *embeddings*.

        Point ids are the positions within this call (0, 1, 2, ...), so
        calling again sends the same ids a second time.

        Args:
            documents: Sequence of document strings.
            embeddings: Sequence of vectors (lists or NumPy arrays), one
                per document.

        Raises:
            ValueError: If the two sequences differ in length.
        """
        if len(documents) != len(embeddings):
            # zip() would otherwise drop the unmatched tail silently.
            raise ValueError("documents and embeddings must have the same length")
        if len(documents) == 0:
            return

        from qdrant_client.models import PointStruct

        points = [
            PointStruct(
                id=index,
                vector=self._as_float_list(embedding),
                payload={"content": doc},
            )
            for index, (doc, embedding) in enumerate(zip(documents, embeddings))
        ]
        self.client.upsert(collection_name=self.collection_name, points=points)

    def search(self, query_embedding, k=5):
        """Return the ``content`` payload of the *k* nearest points."""
        results = self.client.search(
            collection_name=self.collection_name,
            query_vector=self._as_float_list(query_embedding),
            limit=k,
        )
        return [hit.payload['content'] for hit in results]

    @staticmethod
    def _as_float_list(vector):
        """Convert a NumPy array (or any iterable) into a plain list."""
        if hasattr(vector, "tolist"):
            return vector.tolist()
        return list(vector)
# pgvector implementation
import psycopg2
from psycopg2.extras import Json
class PGVectorDB:
    """Store and search embeddings in a PostgreSQL table via pgvector.

    NOTE(review): the ``vector`` extension is assumed to be installed in
    the target database already; this class does not create it.
    """

    def __init__(self, connection_string, dimension=384):
        """Connect and make sure the table and index exist.

        Args:
            connection_string: DSN passed to ``psycopg2.connect``.
            dimension: Length of the stored vectors; defaults to 384.
        """
        # int() guarantees only a number is ever formatted into the DDL.
        self.dimension = int(dimension)
        self.conn = psycopg2.connect(connection_string)
        self._setup_table()

    def _setup_table(self):
        """Create ``rag_documents`` and its ivfflat L2 index if missing."""
        # ``with self.conn`` commits on success and rolls back on error, so
        # a failed statement does not leave the connection mid-transaction.
        with self.conn, self.conn.cursor() as cur:
            cur.execute(
                "CREATE TABLE IF NOT EXISTS rag_documents ("
                "id SERIAL PRIMARY KEY, "
                "content TEXT, "
                f"embedding VECTOR({self.dimension})"
                ")"
            )
            cur.execute("CREATE INDEX IF NOT EXISTS idx_embedding ON rag_documents USING ivfflat (embedding vector_l2_ops)")

    def add_documents(self, documents, embeddings):
        """Insert *documents* with their *embeddings* in one transaction.

        Raises:
            ValueError: If the two sequences differ in length.
        """
        if len(documents) != len(embeddings):
            raise ValueError("documents and embeddings must have the same length")

        rows = [
            (doc, self._to_vector_literal(embedding))
            for doc, embedding in zip(documents, embeddings)
        ]
        if not rows:
            return
        with self.conn, self.conn.cursor() as cur:
            cur.executemany(
                "INSERT INTO rag_documents (content, embedding) VALUES (%s, %s::vector)",
                rows,
            )

    def search(self, query_embedding, k=5):
        """Return the content of the *k* rows nearest to *query_embedding*.

        Rows are ordered with the ``<->`` operator, matching the
        ``vector_l2_ops`` index created in ``_setup_table``.
        """
        with self.conn, self.conn.cursor() as cur:
            # The explicit cast is required: a Python list would be sent as
            # a SQL array, for which ``<->`` has no matching operator.
            cur.execute(
                """
                SELECT content FROM rag_documents
                ORDER BY embedding <-> %s::vector
                LIMIT %s
                """,
                (self._to_vector_literal(query_embedding), k),
            )
            return [row[0] for row in cur.fetchall()]

    def close(self):
        """Close the underlying database connection."""
        self.conn.close()

    @staticmethod
    def _to_vector_literal(embedding):
        """Render *embedding* in pgvector's text form, e.g. ``[1.0,2.5]``.

        Works for lists and NumPy arrays alike, which psycopg2 cannot adapt
        to the ``vector`` type on its own.
        """
        return "[" + ",".join(repr(float(value)) for value in embedding) + "]"
Performance characteristics (general guidance rather than measured benchmarks):
- Chroma: Fast for small datasets (< 1M vectors)
- Qdrant: Excellent for complex queries and filtering
- PGVector: Best for integration with existing SQL workflows
- Milvus: Highest performance for large-scale deployments
5. Full RAG Pipeline Code from Scratch
python
import numpy as np
from sentence_transformers import SentenceTransformer
import chromadb
from chromadb.config import Settings
from typing import List, Tuple
class RAGPipeline:
def __init__(self, embedding_model="all-MiniLM-L6-v2"):
# Initialize components
self.embedder
---
📥 **Get the full guide on Gumroad**: https://gumroad.com/l/auto ($7)
Top comments (0)