DEV Community

matias yoon
matias yoon

Posted on

RAG 시스템 실전 구축 (v49)

RAG 시스템 실전 구축 (v49)

목차

  1. RAG 기초 개념
  2. 청킹 전략
  3. 임베딩 모델 선택
  4. 벡터 DB 비교
  5. 전체 RAG 파이프라인 구현
  6. 고급 기술
  7. 평가 및 개선
  8. 운영 고려사항

1. RAG 기초 개념

RAG(Retrieval-Augmented Generation, 검색 증강 생성)는 LLM이 답변을 생성하기 전에 외부 문서를 검색하고, 검색된 내용을 프롬프트의 컨텍스트로 활용해 응답을 생성하는 기법입니다.

검색-증강-생성 루프

# 간단한 RAG 루프 구현
class SimpleRAG:
    """Minimal retrieve-then-generate loop.

    The three collaborators are duck-typed:

    * ``embedding_model`` must provide ``encode(query)``.
    * ``vector_db`` must provide ``search(embedding, k)``.
    * ``llm`` must provide ``generate(prompt)``.
    """

    def __init__(self, embedding_model, vector_db, llm):
        self.llm = llm
        self.vector_db = vector_db
        self.embedding_model = embedding_model

    def retrieve(self, query, k=5):
        """Embed ``query`` and return the result of ``vector_db.search`` for ``k`` hits."""
        return self.vector_db.search(self.embedding_model.encode(query), k)

    def generate(self, query, context):
        """Build the augmented prompt from ``context`` and ``query`` and send it to the LLM.

        ``context`` is interpolated with its default string conversion.
        """
        augmented_prompt = f"Context: {context}\n\nQuestion: {query}"
        return self.llm.generate(augmented_prompt)

    def process(self, query):
        """Run the whole loop: retrieve with the default ``k``, then generate."""
        return self.generate(query, self.retrieve(query))
Enter fullscreen mode Exit fullscreen mode

2. 청킹 전략

1. 의미 기반 청킹 (Semantic Chunking)

import numpy as np
from sentence_transformers import SentenceTransformer
from sklearn.cluster import KMeans

class SemanticChunker:
    """Split a document into sentence-aligned chunks bounded by character length.

    NOTE(review): sentences are embedded and clustered with KMeans, but the
    cluster labels do not currently influence where chunk boundaries fall;
    chunks are packed greedily by size alone. Confirm whether label changes
    were meant to act as boundaries.
    """

    def __init__(self, model_name="all-MiniLM-L6-v2"):
        self.model = SentenceTransformer(model_name)

    def chunk_document(self, text, min_chunk_size=100, max_chunk_size=500):
        """Return the list of chunks for ``text``.

        Args:
            text: Document to split.
            min_chunk_size: Chunks shorter than this many characters are dropped.
            max_chunk_size: Character budget used when packing sentences.

        Returns:
            A list of chunk strings; ``[]`` when ``text`` contains no sentences.
        """
        sentences = self._split_sentences(text)
        if not sentences:
            # Nothing to embed or cluster; KMeans would raise on zero samples.
            return []
        embeddings = self.model.encode(sentences)

        # Chunking driven by the clustering step.
        return self._cluster_chunks(sentences, embeddings,
                                    min_chunk_size, max_chunk_size)

    def _split_sentences(self, text):
        """Split on runs of ``.``, ``!`` or ``?`` and drop empty fragments.

        The terminating punctuation is not kept in the returned sentences.
        """
        import re
        fragments = re.split(r'[.!?]+', text)
        return [s.strip() for s in fragments if s.strip()]

    def _cluster_chunks(self, sentences, embeddings, min_size, max_size):
        """Cluster the sentence embeddings, then pack sentences into chunks.

        A new chunk is started whenever adding the next sentence would push
        the running character count above ``max_size``. The count covers the
        sentences only, not the spaces inserted when they are joined.
        """
        if not sentences:
            return []

        n_clusters = max(1, len(sentences) // 5)
        kmeans = KMeans(n_clusters=n_clusters, random_state=42)
        # NOTE(review): labels are computed but not consumed below.
        labels = kmeans.fit_predict(embeddings)

        chunks = []
        current_chunk = []
        current_length = 0

        for sentence in sentences:
            if current_length + len(sentence) > max_size:
                if current_chunk:
                    chunks.append(' '.join(current_chunk))
                current_chunk = [sentence]
                current_length = len(sentence)
            else:
                current_chunk.append(sentence)
                current_length += len(sentence)

        if current_chunk:
            chunks.append(' '.join(current_chunk))

        return [chunk for chunk in chunks if len(chunk) >= min_size]
Enter fullscreen mode Exit fullscreen mode

2. 재귀적 청킹 (Recursive Chunking)

class RecursiveChunker:
    """Fixed-size character chunker with overlap between consecutive chunks.

    Despite the name, the text is walked with a sliding window rather than
    split recursively: each chunk is at most ``chunk_size`` characters and
    shares its first ``overlap`` characters with the end of the previous one.
    """

    def __init__(self, chunk_size=500, overlap=50):
        self.chunk_size = chunk_size
        self.overlap = overlap

    def chunk_text(self, text):
        """Return the list of overlapping chunks covering ``text``.

        Args:
            text: String to split.

        Returns:
            A list of substrings in document order; ``[]`` for empty text.

        Raises:
            ValueError: If ``chunk_size`` is not positive or ``overlap`` is
                not in ``[0, chunk_size)``; such settings cannot advance
                through the text.
        """
        if self.chunk_size <= 0:
            raise ValueError("chunk_size must be positive")
        if not 0 <= self.overlap < self.chunk_size:
            raise ValueError("overlap must satisfy 0 <= overlap < chunk_size")

        # Each window starts ``stride`` characters after the previous one, so
        # consecutive windows share exactly ``overlap`` characters.
        stride = self.chunk_size - self.overlap
        text_length = len(text)
        chunks = []
        start = 0

        while start < text_length:
            end = min(start + self.chunk_size, text_length)
            chunks.append(text[start:end])
            if end == text_length:
                # The tail has been emitted; stepping back by the overlap
                # here would re-emit it indefinitely.
                break
            start += stride

        return chunks
Enter fullscreen mode Exit fullscreen mode

3. 에이전트 기반 청킹

class AgentChunker:
    """Pack sentences into chunks that respect a character budget.

    NOTE(review): ``model`` is stored but not used by any method in this
    class; confirm whether it was meant to judge chunk boundaries.
    """

    def __init__(self, model):
        self.model = model

    def smart_chunk(self, text, context_length=1000):
        """Split ``text`` into sentence-aligned chunks.

        Sentences are appended to the current chunk until adding the next one
        would push the running character count above ``context_length``; the
        count covers the sentences only, not the joining spaces.

        Args:
            text: Document to split.
            context_length: Character budget per chunk.

        Returns:
            A list of chunk strings; ``[]`` when ``text`` has no sentences.
        """
        sentences = self._split_sentences(text)
        chunks = []
        current_chunk = []
        current_length = 0

        for sentence in sentences:
            if current_length + len(sentence) > context_length:
                if current_chunk:
                    chunks.append(' '.join(current_chunk))
                current_chunk = [sentence]
                current_length = len(sentence)
            else:
                current_chunk.append(sentence)
                current_length += len(sentence)

        if current_chunk:
            chunks.append(' '.join(current_chunk))

        return chunks

    def _split_sentences(self, text):
        """Split on runs of ``.``, ``!`` or ``?`` and drop empty fragments."""
        import re
        return [s.strip() for s in re.split(r'[.!?]+', text) if s.strip()]
Enter fullscreen mode Exit fullscreen mode

3. 임베딩 모델 선택 및 비교

모델 비교 클래스

import torch
from sentence_transformers import SentenceTransformer
from transformers import AutoTokenizer, AutoModel
import numpy as np

class EmbeddingComparison:
    """Load several sentence-embedding models and compare them on the same texts."""

    def __init__(self):
        # All models are loaded eagerly when the comparator is constructed.
        self.models = {
            'all-MiniLM-L6-v2': SentenceTransformer('all-MiniLM-L6-v2'),
            'all-mpnet-base-v2': SentenceTransformer('all-mpnet-base-v2'),
            'sentence-t5-base': SentenceTransformer('sentence-t5-base'),
            # This key previously loaded 'sentence-t5-base' a second time.
            'gte-small': SentenceTransformer('thenlper/gte-small'),
        }

    def compare_models(self, texts, model_names):
        """Encode ``texts`` with each named model and collect basic metrics.

        Args:
            texts: Sequence of strings. Similarity is measured between
                consecutive pairs (items 0 and 1, 2 and 3, ...); a trailing
                unpaired item is ignored.
            model_names: Keys of ``self.models`` to evaluate.

        Returns:
            Dict mapping each model name to a dict with ``mean_similarity``
            (mean cosine similarity over the pairs, NaN when there is no
            pair), ``embedding_dim`` and ``latency`` (seconds for one
            ``encode`` call).

        Raises:
            KeyError: If a name is not present in ``self.models``.
        """
        results = {}
        for name in model_names:
            model = self.models[name]
            embeddings = np.asarray(model.encode(texts))
            results[name] = {
                'mean_similarity': self._mean_pair_similarity(embeddings),
                'embedding_dim': embeddings.shape[1],
                'latency': self._measure_latency(model, texts),
            }
        return results

    @staticmethod
    def _mean_pair_similarity(embeddings):
        """Return the mean cosine similarity of rows (0, 1), (2, 3), ..."""
        right = embeddings[1::2]
        left = embeddings[0::2][:len(right)]
        if len(right) == 0:
            return float('nan')
        # Cosine rather than a raw dot product so that models whose outputs
        # are not unit-normalised remain comparable with those that are.
        dots = np.sum(left * right, axis=1)
        norms = np.linalg.norm(left, axis=1) * np.linalg.norm(right, axis=1)
        return np.mean(dots / norms)

    def _measure_latency(self, model, texts):
        """Return the wall-clock seconds taken by one ``model.encode(texts)`` call."""
        import time
        # perf_counter is monotonic and intended for measuring intervals.
        start = time.perf_counter()
        model.encode(texts)
        return time.perf_counter() - start

# Usage example: compare two of the registered models on a pair of sentences.
texts = ["This is a sample text.", "Another example text."]
comparator = EmbeddingComparison()
results = comparator.compare_models(
    texts,
    ['all-MiniLM-L6-v2', 'all-mpnet-base-v2'],
)
print(results)
Enter fullscreen mode Exit fullscreen mode

성능 기준

# 최적의 모델 선택
class OptimalEmbeddingSelector:
    """Choose a model name from pre-computed per-model metrics.

    ``models_config`` maps a model name to a mapping that provides the keys
    ``'accuracy'``, ``'latency'`` and ``'memory'``.
    """

    def __init__(self, models_config):
        self.models_config = models_config

    def select_best_model(self, benchmark_data):
        """Return the name of the model with the highest weighted score.

        The score is 50% accuracy, 30% inverse latency and 20% inverse
        memory. ``benchmark_data`` is accepted but not consulted.
        """
        weighted = {
            name: (
                metrics['accuracy'] * 0.5
                + (1 / metrics['latency']) * 0.3
                + (1 / metrics['memory']) * 0.2
            )
            for name, metrics in self.models_config.items()
        }
        # On equal scores the first model in iteration order is returned.
        best_name, _ = max(weighted.items(), key=lambda entry: entry[1])
        return best_name
Enter fullscreen mode Exit fullscreen mode

4. 벡터 DB 비교

Chroma vs Qdrant vs pgvector


python
# Chroma 구현
import chromadb
from chromadb.config import Settings

class ChromaVectorDB:
    """Thin wrapper around a single Chroma collection."""

    def __init__(self, collection_name="rag_collection"):
        chroma_client = chromadb.Client()
        self.client = chroma_client
        self.collection = chroma_client.get_or_create_collection(collection_name)

    def add_documents(self, documents, embeddings, ids):
        """Add ``documents`` with their ``embeddings`` and ``ids`` to the collection."""
        payload = {"documents": documents, "embeddings": embeddings, "ids": ids}
        self.collection.add(**payload)

    def search(self, query_embedding, k=5):
        """Return the documents of the ``k`` nearest hits for one query embedding."""
        response = self.collection.query(
            query_embeddings=[query_embedding],
            n_results=k,
        )
        # One query was submitted, so only the first result list is relevant.
        documents_per_query = response['documents']
        return documents_per_query[0]

# Qdrant 구현
from qdrant_client import QdrantClient
from qdrant_client.models import VectorParams, Filter, FieldCondition

class QdrantVectorDB:
    def __init__(self, host="localhost", port=6333, collection_name="rag_collection",
                 vector_size=384):
        """Connect to Qdrant and make sure the target collection exists.

        Args:
            host: Qdrant server host.
            port: Qdrant server port.
            collection_name: Collection used by this wrapper.
            vector_size: Dimensionality of the stored vectors; must match the
                embedding model in use. Only applied when the collection is
                created here.
        """
        self.client = QdrantClient(host=host, port=port)
        self.collection_name = collection_name

        # Create the collection only when it is missing. create_collection is
        # used instead of the deprecated recreate_collection, which drops any
        # existing collection of the same name.
        if not self.client.collection_exists(collection_name):
            self.client.create_collection(
                collection_name=collection_name,
                vectors_config=VectorParams(size=vector_size, distance="Cosine")
            )

    def add_documents(self, documents, embeddings, ids):
        """Upsert one point per document into the configured collection.

        Each embedding must provide ``tolist()``; the document text is stored
        in the point payload under the ``"text"`` key.
        """
        points = []
        for point_id, text, vector in zip(ids, documents, embeddings):
            points.append({
                "id": point_id,
                "vector": vector.tolist(),
                "payload": {"text": text},
            })
        self.client.upsert(collection_name=self.collection_name, points=points)

    def search(self, query_embedding, k=5):
        results = self.client.search(
            collection_name=self.collection_name,
            query_vector=query_embedding.tolist(),
            limit=k
        )
        return [hit

---

📥 **Get the full guide on Gumroad**: https://gumroad.com/l/auto ($7)
Enter fullscreen mode Exit fullscreen mode

Top comments (0)