RAG 시스템 실전 구축 (v35)
개요
RAG(Retrieval-Augmented Generation, 검색 증강 생성)는 대규모 언어 모델(LLM)이 학습 시점 이후의 정보나 비공개 도메인 지식을 알지 못한다는 한계를 외부 문서 검색으로 보완하는 핵심 기술입니다. 이 가이드에서는 실제 개발 환경에서 사용할 수 있는 RAG 시스템을 구축하는 방법을 실전 중심으로 설명합니다.
1. RAG 기본 개념
RAG는 세 가지 핵심 단계로 구성됩니다:
- 검색(Retrieval): 사용자 쿼리와 유사한 문서를 검색
- 보강(Augmentation): 검색된 문서를 프롬프트에 포함
- 생성(Generation): LLM이 검색된 정보를 기반으로 답변 생성
# 기본 RAG 구조
class BasicRAG:
    """Minimal retrieve-augment-generate pipeline.

    The three collaborators are duck-typed:

    * ``embedding_model.encode(text)`` returns the query embedding.
    * ``vector_db.search(embedding, k=...)`` returns the retrieved documents.
    * ``llm.generate(prompt)`` returns the final answer.
    """

    def __init__(self, embedding_model, vector_db, llm):
        self.embedding_model = embedding_model
        self.vector_db = vector_db
        self.llm = llm

    def query(self, user_query, k=5):
        """Answer ``user_query`` using the ``k`` most similar documents.

        Args:
            user_query: The question text; it is embedded and also copied
                verbatim into the prompt.
            k: Number of documents requested from the vector store.

        Returns:
            Whatever ``llm.generate`` returns for the assembled prompt.
        """
        # 1. Embed the query.
        query_embedding = self.embedding_model.encode(user_query)
        # 2. Retrieve the nearest documents.
        hits = self.vector_db.search(query_embedding, k=k)
        # 3. Build the prompt from the retrieved texts.
        context = "\n".join(self._extract_texts(hits))
        prompt = f"질문: {user_query}\n문맥: {context}"
        # 4. Generate the answer.
        return self.llm.generate(prompt)

    @staticmethod
    def _extract_texts(hits):
        """Normalise a search result into a list of text strings.

        Two result shapes are accepted: an iterable of objects exposing a
        ``content`` attribute, or a ``(documents, ids)`` tuple whose first
        element is a list of plain strings.
        """
        if (
            isinstance(hits, tuple)
            and len(hits) == 2
            and isinstance(hits[0], (list, tuple))
        ):
            # (documents, ids) shape: only the documents feed the prompt.
            hits = hits[0]
        texts = []
        for hit in hits:
            content = getattr(hit, "content", None)
            texts.append(str(hit) if content is None else content)
        return texts
2. Chunking 전략
문서를 어떤 단위로 분할하느냐에 따라 검색 품질이 크게 달라집니다. 청크가 너무 크면 관련 없는 내용까지 함께 검색되고, 너무 작으면 문맥이 끊기므로 적절한 크기로 분할하는 것이 중요합니다:
2.1 의미 기반 Chunking
from sentence_transformers import SentenceTransformer
import numpy as np
class SemanticChunker:
    """Group sentences into chunks by size and, optionally, by topic shift."""

    def __init__(self, model_name="all-MiniLM-L6-v2"):
        self.model = SentenceTransformer(model_name)

    def chunk_by_semantic(self, text, max_tokens=512, similarity_threshold=None):
        """Split ``text`` into chunks of whole sentences.

        Sentences are delimited by ``'. '``. A new chunk starts when adding
        the next sentence would exceed ``max_tokens``, or, if
        ``similarity_threshold`` is given, when the cosine similarity between
        two neighbouring sentence embeddings falls below that threshold.

        Args:
            text: Text to split.
            max_tokens: Size budget per chunk. Size is approximated by the
                number of whitespace-separated words, not model tokens.
            similarity_threshold: Optional cosine-similarity cut-off. When
                ``None`` (default) only the size budget is applied and the
                embedding model is not called.

        Returns:
            A list of chunk strings; ``[]`` for empty or blank input. A
            single sentence longer than ``max_tokens`` is kept whole as its
            own chunk.
        """
        sentences = self._split_sentences(text)
        if not sentences:
            return []

        similarities = None
        if similarity_threshold is not None and len(sentences) > 1:
            similarities = self._adjacent_similarities(sentences)

        chunks = []
        current_chunk = []
        current_length = 0
        for index, sentence in enumerate(sentences):
            length = len(sentence.split())
            over_budget = current_length + length > max_tokens
            topic_shift = (
                similarities is not None
                and index > 0
                and similarities[index - 1] < similarity_threshold
            )
            # Only flush a non-empty chunk, so an oversized first sentence
            # does not produce an empty string in the output.
            if current_chunk and (over_budget or topic_shift):
                chunks.append(' '.join(current_chunk))
                current_chunk = []
                current_length = 0
            current_chunk.append(sentence)
            current_length += length
        chunks.append(' '.join(current_chunk))
        return chunks

    @staticmethod
    def _split_sentences(text):
        """Split on ``'. '`` and put back the period the split removed."""
        pieces = text.split('. ')
        last = len(pieces) - 1
        sentences = []
        for position, piece in enumerate(pieces):
            piece = piece.strip()
            if not piece:
                continue
            if position < last:
                # Every piece except the final one lost its trailing '.'.
                piece += '.'
            sentences.append(piece)
        return sentences

    def _adjacent_similarities(self, sentences):
        """Return the cosine similarity of each sentence with its successor."""
        vectors = np.asarray(self.model.encode(sentences), dtype=float)
        norms = np.linalg.norm(vectors, axis=1)
        norms[norms == 0] = 1.0  # avoid dividing by zero for all-zero vectors
        unit = vectors / norms[:, None]
        return np.sum(unit[:-1] * unit[1:], axis=1)
# Usage example: build a chunker with the default model name and split a
# short sample text using the default max_tokens.
chunker = SemanticChunker()
chunks = chunker.chunk_by_semantic("대규모 언어 모델은... 매우 강력한 기술입니다.")
2.2 Recursive Chunking
class RecursiveChunker:
    """Split text on progressively finer separators until chunks fit.

    Attributes are plain values set by ``__init__``: ``chunk_size`` is the
    maximum chunk length in characters and ``chunk_overlap`` is the number
    of trailing characters a chunk may share with the next one.
    """

    DEFAULT_SEPARATORS = ("\n\n", "\n", " ", "")

    def __init__(self, chunk_size=1024, chunk_overlap=128):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap

    def chunk_recursive(self, text, separators=None):
        """Return ``text`` split into chunks of at most ``chunk_size`` chars.

        Args:
            text: Text to split.
            separators: Separators to try, coarsest first. An empty string
                means "fall back to fixed-width slicing". Defaults to
                ``DEFAULT_SEPARATORS``.

        Returns:
            A list of non-empty, whitespace-stripped chunks; ``[]`` for empty
            or blank text.

        Raises:
            ValueError: If ``chunk_size`` is not positive.
        """
        if self.chunk_size <= 0:
            raise ValueError("chunk_size must be positive")
        if separators is None:
            separators = self.DEFAULT_SEPARATORS
        return self._split(text, list(separators))

    def _effective_overlap(self):
        """Clamp ``chunk_overlap`` into ``[0, chunk_size - 1]``."""
        return max(0, min(self.chunk_overlap, self.chunk_size - 1))

    def _split(self, text, separators):
        """Split on the first separator found in ``text`` and merge parts."""
        separator = ""
        remaining = []
        for position, candidate in enumerate(separators):
            # "" is never passed to str.split (it raises ValueError).
            if candidate and candidate in text:
                separator = candidate
                remaining = separators[position + 1:]
                break
        if not separator:
            return self._split_fixed(text)

        overlap = self._effective_overlap()
        sep_len = len(separator)
        chunks = []
        window = []      # parts that make up the chunk being built
        window_len = 0   # len(separator.join(window))
        for part in text.split(separator):
            if not part.strip():
                continue
            if len(part) > self.chunk_size:
                # The part alone is too large: emit what we have and re-split
                # the part with the finer separators.
                if window:
                    chunks.append(separator.join(window).strip())
                    window = []
                    window_len = 0
                chunks.extend(self._split(part, remaining))
                continue
            if window and window_len + sep_len + len(part) > self.chunk_size:
                chunks.append(separator.join(window).strip())
                # Keep a tail of at most `overlap` characters that still
                # leaves room for the incoming part.
                while window and (
                    window_len > overlap
                    or window_len + sep_len + len(part) > self.chunk_size
                ):
                    removed = window.pop(0)
                    window_len -= len(removed) + (sep_len if window else 0)
            window_len += len(part) + (sep_len if window else 0)
            window.append(part)
        if window:
            chunks.append(separator.join(window).strip())
        return chunks

    def _split_fixed(self, text):
        """Slice ``text`` into fixed-width windows, keeping the short tail."""
        step = self.chunk_size - self._effective_overlap()
        chunks = []
        for start in range(0, len(text), step):
            piece = text[start:start + self.chunk_size].strip()
            if piece:
                chunks.append(piece)
            if start + self.chunk_size >= len(text):
                # The window already reached the end; a further window would
                # only repeat overlap text.
                break
        return chunks
# Usage example: split a short sample sentence with the default chunk size
# and overlap. Note that this rebinds the module-level name `chunks`.
recursive_chunker = RecursiveChunker()
chunks = recursive_chunker.chunk_recursive("내용을 분할하는 데 사용됩니다.")
3. Embedding 모델 선택과 비교
from sentence_transformers import SentenceTransformer
import torch
import time
class EmbeddingBenchmark:
    """Measure encode latency and output dimension of embedding models."""

    # Report label -> model identifier passed to SentenceTransformer.
    DEFAULT_MODEL_NAMES = {
        "all-MiniLM-L6-v2": "all-MiniLM-L6-v2",
        "all-mpnet-base-v2": "all-mpnet-base-v2",
        "sentence-t5-xxl": "sentence-t5-xxl",
        "bge-large-en": "BAAI/bge-large-en",
    }

    def __init__(self, models=None):
        """Create the benchmark.

        Args:
            models: Optional mapping of label -> object with an
                ``encode(texts)`` method. When omitted, every model in
                ``DEFAULT_MODEL_NAMES`` is loaded with ``SentenceTransformer``.
        """
        if models is None:
            models = {
                label: SentenceTransformer(identifier)
                for label, identifier in self.DEFAULT_MODEL_NAMES.items()
            }
        self.models = dict(models)

    def benchmark_embeddings(self, texts):
        """Encode ``texts`` once with every model and collect metrics.

        Returns:
            A dict keyed by model label. Each value holds ``"latency"``
            (seconds spent in ``encode``), ``"size"`` (number of embeddings
            returned) and ``"dimension"`` (length of the first embedding, or
            0 when nothing was returned).
        """
        results = {}
        for name, model in self.models.items():
            # perf_counter is monotonic, so the interval cannot be skewed by
            # system clock adjustments the way time.time() can.
            started = time.perf_counter()
            embeddings = model.encode(texts)
            elapsed = time.perf_counter() - started
            count = len(embeddings)
            results[name] = {
                "latency": elapsed,
                "size": count,
                "dimension": len(embeddings[0]) if count > 0 else 0,
            }
        return results
# Usage example: benchmark three sample sentences and print, per model, the
# encode latency in seconds and the embedding dimension.
benchmark = EmbeddingBenchmark()
texts = ["예시 문장 1", "예시 문장 2", "예시 문장 3"]
results = benchmark.benchmark_embeddings(texts)
for model, metrics in results.items():
    print(f"{model}: {metrics['latency']:.2f}s, {metrics['dimension']}차원")
4. Vector Database 비교
4.1 Chroma
import chromadb
from chromadb.config import Settings
class ChromaVectorDB:
    """Thin wrapper around a single Chroma collection."""

    def __init__(self, collection_name="rag_collection", client=None):
        """Open (or create) ``collection_name``.

        Args:
            collection_name: Name of the collection to get or create.
            client: Optional pre-built Chroma client. When omitted, a client
                is created with ``chromadb.Client()`` and its defaults.
        """
        if client is None:
            # NOTE(review): the previous Settings(chroma_db_impl="duckdb")
            # is a legacy option that newer chromadb releases reject; the
            # default client is used instead — confirm against the pinned
            # chromadb version.
            client = chromadb.Client()
        self.client = client
        self.collection = self.client.get_or_create_collection(collection_name)

    @staticmethod
    def _as_list(vector):
        """Convert array-like vectors (anything with ``tolist``) to lists."""
        to_list = getattr(vector, "tolist", None)
        return to_list() if callable(to_list) else list(vector)

    def add_documents(self, documents, embeddings, ids):
        """Add documents with their embeddings and ids to the collection.

        The three arguments are parallel sequences.
        """
        self.collection.add(
            documents=list(documents),
            embeddings=[self._as_list(vector) for vector in embeddings],
            ids=list(ids),
        )

    def search(self, query_embedding, k=5):
        """Return ``(documents, ids)`` of the ``k`` nearest entries."""
        results = self.collection.query(
            query_embeddings=[self._as_list(query_embedding)],
            n_results=k,
        )
        # One query embedding was sent, so only the first result row applies.
        return results["documents"][0], results["ids"][0]
4.2 Qdrant
from qdrant_client import QdrantClient
from qdrant_client.models import Filter, FieldCondition, MatchValue
import numpy as np
class QdrantVectorDB:
    """Thin wrapper around a single Qdrant collection."""

    def __init__(self, host="localhost", port=6333, collection_name="rag_collection",
                 vector_size=384, recreate=True, client=None):
        """Connect to Qdrant and prepare ``collection_name``.

        Args:
            host: Qdrant host, used when ``client`` is not supplied.
            port: Qdrant port, used when ``client`` is not supplied.
            collection_name: Collection to operate on.
            vector_size: Dimension of the stored vectors; must match the
                embedding model in use.
            recreate: When True (default, the original behaviour) the
                collection is recreated via ``recreate_collection`` on every
                construction. NOTE(review): that call presumably discards
                previously stored points — confirm. Pass False to create the
                collection only if it does not exist yet.
            client: Optional pre-built client object to use instead of
                constructing a ``QdrantClient``.
        """
        if client is None:
            client = QdrantClient(host=host, port=port)
        self.client = client
        self.collection_name = collection_name
        vectors_config = {"size": vector_size, "distance": "Cosine"}
        if recreate:
            self.client.recreate_collection(
                collection_name=collection_name,
                vectors_config=vectors_config,
            )
        elif not self.client.collection_exists(collection_name):
            self.client.create_collection(
                collection_name=collection_name,
                vectors_config=vectors_config,
            )

    @staticmethod
    def _as_list(vector):
        """Convert array-like vectors (anything with ``tolist``) to lists."""
        to_list = getattr(vector, "tolist", None)
        return to_list() if callable(to_list) else list(vector)

    def add_documents(self, documents, embeddings, ids):
        """Upsert documents; the three arguments are parallel sequences.

        Each document text is stored in the point payload under
        ``"content"``.
        """
        points = [
            {
                "id": point_id,
                "vector": self._as_list(embedding),
                "payload": {"content": doc},
            }
            for point_id, doc, embedding in zip(ids, documents, embeddings)
        ]
        self.client.upsert(
            collection_name=self.collection_name,
            points=points,
        )

    def search(self, query_embedding, k=5):
        """Return ``(documents, ids)`` of the ``k`` nearest points."""
        results = self.client.search(
            collection_name=self.collection_name,
            query_vector=self._as_list(query_embedding),
            limit=k,
        )
        contents = [point.payload["content"] for point in results]
        point_ids = [point.id for point in results]
        return contents, point_ids
4.3 pgvector
import psycopg2
from psycopg2.extras import Json
import numpy as np
class PGVectorDB:
    """Store and search documents in PostgreSQL using the pgvector extension."""

    def __init__(self, connection_string, dimension=384):
        """Connect and make sure the table and index exist.

        Args:
            connection_string: DSN passed to ``psycopg2.connect``.
            dimension: Length of the stored embedding vectors; must match the
                embedding model in use.
        """
        self.dimension = dimension
        self.conn = psycopg2.connect(connection_string)
        self.create_table()

    @staticmethod
    def _to_vector_literal(embedding):
        """Render an embedding in pgvector's text form, e.g. ``[1.0,2.5]``."""
        to_list = getattr(embedding, "tolist", None)
        values = to_list() if callable(to_list) else list(embedding)
        return "[" + ",".join(repr(float(value)) for value in values) + "]"

    def create_table(self):
        """Create the extension, the table and the cosine index if missing."""
        dimension = int(self.dimension)  # int() keeps the DDL injection-free
        # `with self.conn` commits on success and rolls back on error.
        with self.conn, self.conn.cursor() as cur:
            # The VECTOR type is unavailable until the extension is enabled.
            cur.execute("CREATE EXTENSION IF NOT EXISTS vector")
            cur.execute(f"""
                CREATE TABLE IF NOT EXISTS rag_documents (
                    id UUID PRIMARY KEY,
                    content TEXT,
                    embedding VECTOR({dimension}),
                    metadata JSONB
                )
            """)
            cur.execute("""
                CREATE INDEX IF NOT EXISTS idx_embedding ON rag_documents
                USING ivfflat (embedding vector_cosine_ops)
            """)

    def add_documents(self, documents, embeddings, ids):
        """Insert documents; the three arguments are parallel sequences.

        ``ids`` are sent as strings and must be valid UUIDs because the
        ``id`` column is declared ``UUID``.
        """
        with self.conn, self.conn.cursor() as cur:
            for doc_id, content, embedding in zip(ids, documents, embeddings):
                cur.execute(
                    "INSERT INTO rag_documents (id, content, embedding) "
                    "VALUES (%s, %s, %s::vector)",
                    (str(doc_id), content, self._to_vector_literal(embedding)),
                )

    def search(self, query_embedding, k=5):
        """Return ``(documents, ids)`` of the ``k`` nearest rows.

        Rows are ordered with ``<=>`` (cosine distance), which matches the
        ``vector_cosine_ops`` index created in ``create_table``.
        """
        with self.conn, self.conn.cursor() as cur:
            cur.execute("""
                SELECT id, content FROM rag_documents
                ORDER BY embedding <=> %s::vector
                LIMIT %s
            """, (self._to_vector_literal(query_embedding), k))
            rows = cur.fetchall()
        contents = [row[1] for row in rows]
        row_ids = [str(row[0]) for row in rows]
        return contents, row_ids
5. 완전한 RAG 파이프라인 구현
python
import os
import uuid
from typing import List, Tuple
from sentence_transformers import SentenceTransformer
import chromadb
from chromadb.config import Settings
from langchain_open
---
📥 **Get the full guide on Gumroad**: https://gumroad.com/l/auto ($7)
Top comments (0)