RAG 시스템 실전 구축 (v49)
목차
- RAG 기초 개념
- 청킹 전략
- 임베딩 모델 선택
- 벡터 DB 비교
- 전체 RAG 파이프라인 구현
- 고급 기술
- 평가 및 개선
- 운영 고려사항
1. RAG 기초 개념
RAG(Retrieval-Augmented Generation, 검색 증강 생성)는 LLM이 답변을 만들기 전에 외부 문서에서 관련 내용을 검색하고, 검색된 내용을 프롬프트에 덧붙여 이를 근거로 답변을 생성하는 방식입니다.
검색-증강-생성 루프
# 간단한 RAG 루프 구현
class SimpleRAG:
    """Minimal retrieve-augment-generate loop.

    The collaborators are duck-typed; this class only calls
    ``embedding_model.encode(query)``, ``vector_db.search(embedding, k)``
    and ``llm.generate(prompt)``.
    """

    def __init__(self, embedding_model, vector_db, llm):
        """Store the three collaborators used by the loop."""
        self.embedding_model = embedding_model
        self.vector_db = vector_db
        self.llm = llm

    def retrieve(self, query, k=5):
        """Embed ``query`` and return the vector DB's top-``k`` results."""
        # 1. Embed the query.
        query_embedding = self.embedding_model.encode(query)
        # 2. Look up similar documents in the vector DB.
        return self.vector_db.search(query_embedding, k)

    def generate(self, query, context):
        """Build the augmented prompt and return ``llm.generate(prompt)``.

        Args:
            query: The user question.
            context: Either a ready-made string or a list/tuple of passages.
                Sequences are joined with blank lines so the prompt holds
                plain text rather than a Python list repr such as
                ``['doc one', 'doc two']``.
        """
        if isinstance(context, (list, tuple)):
            context = "\n\n".join(str(passage) for passage in context)
        # 3. Augment the question with the retrieved context.
        prompt = f"Context: {context}\n\nQuestion: {query}"
        return self.llm.generate(prompt)

    def process(self, query, k=5):
        """Run the full loop: retrieve ``k`` passages, then generate.

        ``k`` defaults to 5, which is what ``retrieve`` used when ``process``
        did not forward it.
        """
        context = self.retrieve(query, k=k)
        return self.generate(query, context)
2. 청킹 전략
2.1 의미 기반 청킹 (Semantic Chunking)
import numpy as np
from sentence_transformers import SentenceTransformer
from sklearn.cluster import KMeans
class SemanticChunker:
    """Split a document into chunks guided by sentence-embedding clusters.

    Sentences are embedded with a SentenceTransformer model and clustered
    with KMeans; consecutive sentences are packed into chunks, and a change
    of cluster label is treated as a topic boundary.
    """

    def __init__(self, model_name="all-MiniLM-L6-v2"):
        """Load the sentence-embedding model named ``model_name``."""
        self.model = SentenceTransformer(model_name)

    def chunk_document(self, text, min_chunk_size=100, max_chunk_size=500):
        """Return the list of chunks for ``text``.

        Args:
            text: Document to split.
            min_chunk_size: Chunks shorter than this many characters are
                dropped from the result.
            max_chunk_size: A chunk is closed before adding a sentence that
                would push its joined length past this many characters.

        Returns:
            A list of chunk strings; empty when ``text`` has no sentences.
        """
        sentences = self._split_sentences(text)
        if not sentences:
            # Nothing to embed, and KMeans cannot be fitted on zero samples.
            return []
        embeddings = self.model.encode(sentences)
        return self._cluster_chunks(
            sentences, embeddings, min_chunk_size, max_chunk_size
        )

    def _split_sentences(self, text):
        """Split on runs of ``.``, ``!`` or ``?`` and drop empty pieces.

        The terminating punctuation is not kept in the returned sentences.
        """
        import re

        pieces = re.split(r'[.!?]+', text)
        return [piece.strip() for piece in pieces if piece.strip()]

    def _cluster_chunks(self, sentences, embeddings, min_size, max_size):
        """Pack consecutive sentences into chunks using cluster labels.

        A chunk is closed when the next sentence would make it longer than
        ``max_size`` (counting the joining space), or when the cluster label
        changes and the chunk has already reached ``min_size``. Chunks that
        end up shorter than ``min_size`` are filtered out of the result.
        """
        if not sentences:
            return []

        # Roughly one cluster per five sentences, never fewer than one.
        n_clusters = max(1, len(sentences) // 5)
        kmeans = KMeans(n_clusters=n_clusters, random_state=42)
        labels = kmeans.fit_predict(embeddings)

        chunks = []
        current_chunk = []
        current_length = 0
        previous_label = None
        for sentence, label in zip(sentences, labels):
            # Length of the chunk if this sentence were appended, including
            # the single space that ' '.join will insert.
            joined_length = (
                current_length + len(sentence) + (1 if current_chunk else 0)
            )
            too_long = joined_length > max_size
            topic_changed = (
                previous_label is not None
                and label != previous_label
                and current_length >= min_size
            )
            if current_chunk and (too_long or topic_changed):
                chunks.append(' '.join(current_chunk))
                current_chunk = [sentence]
                current_length = len(sentence)
            else:
                current_chunk.append(sentence)
                current_length = joined_length
            previous_label = label

        if current_chunk:
            chunks.append(' '.join(current_chunk))
        return [chunk for chunk in chunks if len(chunk) >= min_size]
2.2 재귀적 청킹 (Recursive Chunking)
class RecursiveChunker:
    """Fixed-size character chunker with overlap between adjacent chunks."""

    def __init__(self, chunk_size=500, overlap=50):
        """Configure the window.

        Args:
            chunk_size: Maximum number of characters per chunk; must be > 0.
            overlap: Number of characters shared by consecutive chunks; must
                satisfy ``0 <= overlap < chunk_size``.

        Raises:
            ValueError: If the sizes would make the window unable to advance.
        """
        if chunk_size <= 0:
            raise ValueError("chunk_size must be positive")
        if not 0 <= overlap < chunk_size:
            raise ValueError("overlap must satisfy 0 <= overlap < chunk_size")
        self.chunk_size = chunk_size
        self.overlap = overlap

    def chunk_text(self, text):
        """Return consecutive windows of ``text``.

        Each window is at most ``chunk_size`` characters long and starts
        ``chunk_size - overlap`` characters after the previous one, so
        neighbouring chunks share exactly ``overlap`` characters.

        Raises:
            ValueError: If the attributes were changed so that the window
                can no longer advance.
        """
        step = self.chunk_size - self.overlap
        if step <= 0:
            raise ValueError("overlap must be smaller than chunk_size")

        chunks = []
        start = 0
        text_length = len(text)
        while start < text_length:
            end = min(start + self.chunk_size, text_length)
            chunks.append(text[start:end])
            if end == text_length:
                # The tail is covered. Stepping back by the overlap here
                # would re-emit the same window forever.
                break
            start += step
        return chunks
2.3 에이전트 기반 청킹
class AgentChunker:
    """Pack sentences into chunks bounded by a character budget."""

    def __init__(self, model):
        """Store ``model``; ``smart_chunk`` as written does not call it."""
        self.model = model

    def smart_chunk(self, text, context_length=1000):
        """Split ``text`` into sentence-aligned chunks.

        Sentences are appended to the current chunk until adding the next
        one (plus its joining space) would exceed ``context_length``
        characters. A single sentence longer than the budget is kept whole
        as its own chunk.

        Returns:
            A list of chunk strings; empty when ``text`` has no sentences.
        """
        chunks = []
        current_chunk = []
        current_length = 0
        for sentence in self._split_sentences(text):
            # Count the space that ' '.join inserts between sentences.
            joined_length = (
                current_length + len(sentence) + (1 if current_chunk else 0)
            )
            if current_chunk and joined_length > context_length:
                chunks.append(' '.join(current_chunk))
                current_chunk = [sentence]
                current_length = len(sentence)
            else:
                current_chunk.append(sentence)
                current_length = joined_length
        if current_chunk:
            chunks.append(' '.join(current_chunk))
        return chunks

    def _split_sentences(self, text):
        """Split on runs of ``.``, ``!`` or ``?`` and drop empty pieces.

        ``smart_chunk`` depends on this helper; without it the call raises
        AttributeError.
        """
        import re

        pieces = re.split(r'[.!?]+', text)
        return [piece.strip() for piece in pieces if piece.strip()]
3. 임베딩 모델 선택 및 비교
모델 비교 클래스
import torch
from sentence_transformers import SentenceTransformer
from transformers import AutoTokenizer, AutoModel
import numpy as np
class EmbeddingComparison:
    """Load several sentence-embedding models and compare them on sample text."""

    # Display name -> identifier handed to SentenceTransformer. 'gte-small'
    # needs its own checkpoint; pointing it at 'sentence-t5-base' would just
    # load that model a second time under a different label.
    MODEL_IDS = {
        'all-MiniLM-L6-v2': 'all-MiniLM-L6-v2',
        'all-mpnet-base-v2': 'all-mpnet-base-v2',
        'sentence-t5-base': 'sentence-t5-base',
        'gte-small': 'thenlper/gte-small',
    }

    def __init__(self, model_names=None):
        """Load the requested models into ``self.models``.

        Args:
            model_names: Optional iterable of keys from ``MODEL_IDS``. When
                omitted, every model in the table is loaded.

        Raises:
            KeyError: If a requested name is not in ``MODEL_IDS``.
        """
        names = self.MODEL_IDS if model_names is None else model_names
        self.models = {
            name: SentenceTransformer(self.MODEL_IDS[name]) for name in names
        }

    def compare_models(self, texts, model_names):
        """Return per-model statistics for encoding ``texts``.

        For each name the result holds:
            ``mean_similarity``: mean raw dot product over the embedding
                pairs (0, 1), (2, 3), ...; NaN when there are fewer than two
                embeddings. This equals cosine similarity only if the model
                emits normalized vectors.
            ``embedding_dim``: second dimension of the embedding array.
            ``latency``: seconds taken by one extra ``encode`` call.

        Raises:
            KeyError: If a name is not present in ``self.models``.
        """
        results = {}
        for name in model_names:
            model = self.models[name]
            embeddings = model.encode(texts)
            pair_scores = [
                np.dot(embeddings[i], embeddings[i + 1])
                for i in range(0, len(embeddings) - 1, 2)
            ]
            # np.mean([]) would emit a RuntimeWarning before returning NaN.
            mean_similarity = np.mean(pair_scores) if pair_scores else float('nan')
            results[name] = {
                'mean_similarity': mean_similarity,
                'embedding_dim': embeddings.shape[1],
                'latency': self._measure_latency(model, texts),
            }
        return results

    def _measure_latency(self, model, texts):
        """Return the wall-clock seconds one ``model.encode(texts)`` call takes."""
        import time

        # perf_counter is monotonic, so clock adjustments cannot skew the result.
        start = time.perf_counter()
        model.encode(texts)
        return time.perf_counter() - start
# Usage example: compare two of the models on a pair of sample sentences
# and print the resulting statistics dictionary.
comparator = EmbeddingComparison()
texts = ["This is a sample text.", "Another example text."]
results = comparator.compare_models(texts, ['all-MiniLM-L6-v2', 'all-mpnet-base-v2'])
print(results)
성능 기준
# 최적의 모델 선택
class OptimalEmbeddingSelector:
    """Pick the model with the best weighted accuracy/latency/memory score."""

    # Weights of the three score terms.
    ACCURACY_WEIGHT = 0.5
    LATENCY_WEIGHT = 0.3
    MEMORY_WEIGHT = 0.2

    def __init__(self, models_config):
        """Store ``models_config``.

        Args:
            models_config: Mapping of model name to a dict with the keys
                ``'accuracy'``, ``'latency'`` and ``'memory'``.
        """
        self.models_config = models_config

    def select_best_model(self, benchmark_data):
        """Return the name of the highest-scoring model.

        The score is ``accuracy * 0.5 + (1 / latency) * 0.3 +
        (1 / memory) * 0.2``, so lower latency and memory raise it.

        Args:
            benchmark_data: Accepted for interface compatibility; the score
                is computed from ``models_config`` only.

        Raises:
            ValueError: If ``models_config`` is empty, or a model reports a
                non-positive latency or memory (the reciprocal would divide
                by zero or reward a negative measurement).
            KeyError: If a model's config lacks one of the three keys.
        """
        if not self.models_config:
            raise ValueError("models_config is empty; there is no model to select")

        scores = {}
        for model_name, config in self.models_config.items():
            latency = config['latency']
            memory = config['memory']
            if latency <= 0 or memory <= 0:
                raise ValueError(
                    f"latency and memory must be positive for model {model_name!r}"
                )
            # Score from accuracy, speed and memory footprint.
            scores[model_name] = (
                config['accuracy'] * self.ACCURACY_WEIGHT +
                (1 / latency) * self.LATENCY_WEIGHT +
                (1 / memory) * self.MEMORY_WEIGHT
            )
        return max(scores, key=scores.get)
4. 벡터 DB 비교
Chroma vs Qdrant vs pgvector
python
# Chroma 구현
import chromadb
from chromadb.config import Settings
class ChromaVectorDB:
    """Thin wrapper around one Chroma collection."""

    def __init__(self, collection_name="rag_collection"):
        """Create a Chroma client and get or create ``collection_name``."""
        self.client = chromadb.Client()
        self.collection = self.client.get_or_create_collection(collection_name)

    def add_documents(self, documents, embeddings, ids):
        """Add ``documents`` with their ``embeddings`` and ``ids``.

        Array-like embeddings (anything exposing ``tolist()``) are converted
        to plain lists before being handed to the collection.
        """
        self.collection.add(
            documents=documents,
            embeddings=self._to_plain_lists(embeddings),
            ids=ids,
        )

    def search(self, query_embedding, k=5):
        """Return the documents of the ``k`` nearest neighbours.

        Args:
            query_embedding: One query vector, as a list or an array-like
                object exposing ``tolist()``.
            k: Number of results requested from the collection.

        Returns:
            The ``'documents'`` entry for the single query that was sent.
        """
        results = self.collection.query(
            query_embeddings=[self._to_plain_lists(query_embedding)],
            n_results=k,
        )
        # One query was sent, so the first row holds its documents.
        return results['documents'][0]

    @staticmethod
    def _to_plain_lists(vectors):
        """Convert array-like values (and sequences of them) to nested lists."""
        to_list = getattr(vectors, "tolist", None)
        if callable(to_list):
            return to_list()
        if isinstance(vectors, (list, tuple)):
            return [ChromaVectorDB._to_plain_lists(item) for item in vectors]
        return vectors
# Qdrant 구현
from qdrant_client import QdrantClient
from qdrant_client.models import VectorParams, Filter, FieldCondition
class QdrantVectorDB:
def __init__(self, host="localhost", port=6333, collection_name="rag_collection"):
self.client = QdrantClient(host=host, port=port)
self.collection_name = collection_name
# 컬렉션 생성
if not self.client.collection_exists(collection_name):
self.client.recreate_collection(
collection_name=collection_name,
vectors_config=VectorParams(size=384, distance="Cosine")
)
def add_documents(self, documents, embeddings, ids):
points = [
{
"id": id,
"vector": embedding.tolist(),
"payload": {"text": doc}
}
for id, doc, embedding in zip(ids, documents, embeddings)
]
self.client.upsert(
collection_name=self.collection_name,
points=points
)
def search(self, query_embedding, k=5):
results = self.client.search(
collection_name=self.collection_name,
query_vector=query_embedding.tolist(),
limit=k
)
return [hit
---
📥 **Get the full guide on Gumroad**: https://gumroad.com/l/auto ($7)
Top comments (0)