DEV Community

matias yoon
matias yoon

Posted on

RAG 시스템 실전 구축 (v40)

RAG 시스템 실전 구축 (v40)

개요

RAG(Retrieval-Augmented Generation) 시스템은 대규모 언어 모델(LLM)의 지식 범위를 확장하고, 정확한 정보를 기반으로 출력을 생성하기 위한 핵심 아키텍처입니다. 이 가이드는 실전에서 구현 가능한 RAG 시스템을 구축하고 최적화하는 방법을 다룹니다.

1. RAG 기본 구조

RAG 시스템은 다음과 같은 루프로 작동합니다:

  1. 검색(Retrieval): 사용자 질문과 유사한 문서 검색
  2. 증강(Augmentation): 검색된 문서와 질문을 결합하여 프롬프트 생성
  3. 생성(Generation): LLM이 결합된 프롬프트를 기반으로 답변 생성
# RAG 루프 기본 구조
class BasicRAG:
    """Minimal retrieve -> augment -> generate pipeline.

    The collaborators are duck-typed; this class only calls
    ``embedding_model.encode(query)``, ``vector_db.search(embedding, k=5)``
    and ``llm.generate(prompt)``.
    """

    def __init__(self, embedding_model, vector_db, llm):
        """Store the three collaborators used by :meth:`process_query`."""
        self.embedding_model = embedding_model
        self.vector_db = vector_db
        self.llm = llm

    def process_query(self, query):
        """Answer *query* using retrieved context.

        Returns:
            Whatever ``llm.generate`` returns for the assembled prompt.
        """
        # 1. Embed the question.
        query_embedding = self.embedding_model.encode(query)

        # 2. Retrieve the five nearest documents.
        relevant_docs = self.vector_db.search(query_embedding, k=5)

        # 3. Combine question and retrieved text into one prompt.
        prompt = self._build_prompt(query, relevant_docs)

        # 4. Generate the answer.
        return self.llm.generate(prompt)

    def _build_prompt(self, query, docs):
        """Join the retrieved documents into a context block below the question.

        Each item in *docs* may be an object exposing ``.content`` or a plain
        string; the vector-store wrappers in this file return plain strings
        from ``search``, which previously raised ``AttributeError`` here.
        """
        context = "\n\n".join(getattr(doc, "content", doc) for doc in docs)
        return f"질문: {query}\n\n문맥: {context}"
Enter fullscreen mode Exit fullscreen mode

2. 청킹 전략

청킹은 문서를 모델이 처리할 수 있는 크기로 나누는 과정입니다.

2.1 의미적 청킹

from langchain.text_splitter import RecursiveCharacterTextSplitter
from sentence_transformers import SentenceTransformer
import numpy as np

class SemanticChunker:
    """Split documents into character chunks and embed each chunk.

    The embedding-based merge step is a placeholder: ``_semantic_merge``
    currently returns the chunks unchanged.
    """

    def __init__(self, model_name="all-MiniLM-L6-v2"):
        """Load the sentence-embedding model and configure the splitter.

        Args:
            model_name: Name passed to ``SentenceTransformer``.
        """
        self.model = SentenceTransformer(model_name)
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=500,
            chunk_overlap=50,
            separators=["\n\n", "\n", " ", ""]
        )

    def chunk_documents(self, documents):
        """Return one flat list of chunks for all *documents*.

        Args:
            documents: Iterable of objects exposing a ``content`` string.
        """
        chunks = []
        for doc in documents:
            # Split the document into small chunks.
            doc_chunks = self.text_splitter.split_text(doc.content)

            # Embed every chunk of this document in one call.
            chunk_embeddings = self.model.encode(doc_chunks)

            # Hand chunks and embeddings to the (placeholder) merge step.
            chunks.extend(self._semantic_merge(doc_chunks, chunk_embeddings))
        return chunks

    def _semantic_merge(self, chunks, embeddings):
        """Placeholder for similarity-based merging; returns *chunks* as-is.

        The intended behaviour (per the original note) is to merge chunks
        whose similarity is 0.8 or higher. That logic is not implemented, so
        *embeddings* is currently unused.
        """
        # TODO: implement the similarity-based merge described above.
        return chunks
Enter fullscreen mode Exit fullscreen mode

2.2 재귀적 청킹

class RecursiveChunker:
    """Cut text into fixed-size windows that overlap by a set number of chars.

    Despite the name, the implementation is an iterative sliding window.
    """

    def __init__(self, chunk_size=1000, chunk_overlap=100):
        """Store the window size and the overlap between adjacent windows."""
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap

    def chunk_recursive(self, text):
        """Return the overlapping windows of *text* in order.

        Args:
            text: The string to split.

        Returns:
            A list of substrings, each at most ``chunk_size`` long; adjacent
            entries share ``chunk_overlap`` characters. Empty text gives ``[]``.

        Raises:
            ValueError: If *text* is non-empty and ``chunk_overlap`` is not
                smaller than ``chunk_size`` (the window could never advance).
        """
        if text and self.chunk_size - self.chunk_overlap <= 0:
            raise ValueError("chunk_overlap must be smaller than chunk_size")

        chunks = []
        start = 0
        text_length = len(text)

        while start < text_length:
            end = min(start + self.chunk_size, text_length)
            chunks.append(text[start:end])

            # The last window reaches the end of the text. Stepping back by
            # the overlap from here would keep start < len(text) forever.
            if end == text_length:
                break

            # Begin the next window chunk_overlap characters before this end.
            start = max(0, end - self.chunk_overlap)

        return chunks
Enter fullscreen mode Exit fullscreen mode

2.3 에이전트 기반 청킹

class AgentBasedChunker:
    """Delegate chunking to an LLM and split its reply on blank lines."""

    def __init__(self, llm):
        """Keep the LLM client; only its ``generate(prompt)`` method is used."""
        self.llm = llm

    def chunk_with_agent(self, text):
        """Ask the LLM to segment *text* and return the reply's paragraphs.

        The prompt (in Korean) asks for meaningful units of at most 500
        characters. The reply is split on blank lines without any further
        cleanup.
        """
        instruction = f"""
        문서를 의미 있는 단위로 나누어주세요:
        {text}

        각 청크는 500자 이내로 구성하고, 의미 단위를 유지하세요.
        """
        llm_reply = self.llm.generate(instruction)

        # Example strategy: every blank line in the reply is a chunk boundary.
        pieces = llm_reply.split("\n\n")
        return pieces
Enter fullscreen mode Exit fullscreen mode

3. 임베딩 모델 선택과 비교

import torch
from sentence_transformers import SentenceTransformer
from transformers import AutoTokenizer, AutoModel
import numpy as np

class EmbeddingBenchmark:
    """Compare several sentence-embedding models on the same texts."""

    def __init__(self):
        """Load the three SentenceTransformer models to compare."""
        self.models = {
            "all-MiniLM-L6-v2": SentenceTransformer("all-MiniLM-L6-v2"),
            "all-mpnet-base-v2": SentenceTransformer("all-mpnet-base-v2"),
            "sentence-t5-base": SentenceTransformer("sentence-t5-base")
        }

    def benchmark_models(self, texts):
        """Encode *texts* with every model and collect basic statistics.

        Returns:
            A dict keyed by model name. Each value holds ``embedding_shape``
            and ``memory_usage`` (the ``shape`` and ``nbytes`` of the encoded
            array) and ``speed`` (seconds taken by a second, timed ``encode``
            call).
        """
        results = {}
        for name, model in self.models.items():
            # This first encode supplies the shape/size figures; the timed
            # run below is a separate call.
            embeddings = model.encode(texts)
            results[name] = {
                "embedding_shape": embeddings.shape,
                "memory_usage": embeddings.nbytes,
                "speed": self._measure_inference_time(model, texts)
            }
        return results

    def _measure_inference_time(self, model, texts):
        """Return the elapsed seconds for one ``model.encode(texts)`` call."""
        import time

        # perf_counter is monotonic and high-resolution; time.time() follows
        # the wall clock and can jump if the system clock is adjusted.
        start = time.perf_counter()
        model.encode(texts)
        return time.perf_counter() - start

# Usage example: constructing EmbeddingBenchmark loads all three models,
# then each model is run over the sample texts below.
benchmark = EmbeddingBenchmark()
texts = ["문서 내용 1", "문서 내용 2", "문서 내용 3"]
results = benchmark.benchmark_models(texts)
Enter fullscreen mode Exit fullscreen mode

4. 벡터 데이터베이스 비교

4.1 Chroma

import chromadb
from chromadb import Client

class ChromaVectorDB:
    """Thin wrapper around a Chroma collection configured for cosine distance."""

    def __init__(self, collection_name="rag_collection"):
        """Create a Chroma client and get or create the named collection."""
        self.client = Client()
        self.collection = self.client.get_or_create_collection(
            name=collection_name,
            metadata={"hnsw:space": "cosine"}
        )

    def add_documents(self, documents, embeddings):
        """Add *documents* and their *embeddings* to the collection.

        Args:
            documents: Sequence of objects exposing a ``content`` string.
            embeddings: One embedding per document, in the same order.

        IDs continue from the current collection size, so repeated calls do
        not reuse the IDs of earlier batches. The first batch still gets
        "0", "1", ... as before.
        """
        if not documents:
            # Nothing to store; skip the call to the collection entirely.
            return

        first_id = self.collection.count()
        ids = [str(first_id + offset) for offset in range(len(documents))]
        self.collection.add(
            ids=ids,
            embeddings=embeddings,
            documents=[doc.content for doc in documents]
        )

    def search(self, query_embedding, k=5):
        """Return the stored texts of the *k* nearest documents.

        The query is sent as a single-element batch, so the result list for
        that one query is unwrapped with ``[0]``.
        """
        results = self.collection.query(
            query_embeddings=[query_embedding],
            n_results=k
        )
        return results["documents"][0]
Enter fullscreen mode Exit fullscreen mode

4.2 Qdrant

from qdrant_client import QdrantClient
from qdrant_client.models import Filter, FieldCondition, MatchValue

class QdrantVectorDB:
    """Small wrapper around a Qdrant collection named ``rag_collection``."""

    def __init__(self, host="localhost", port=6333):
        """Connect to Qdrant at *host*:*port* and fix the collection name."""
        self.client = QdrantClient(host=host, port=port)
        self.collection_name = "rag_collection"

    def create_collection(self):
        """Create the collection with 384-dimensional cosine vectors."""
        vector_settings = {"size": 384, "distance": "Cosine"}
        self.client.create_collection(
            collection_name=self.collection_name,
            vectors_config=vector_settings,
        )

    def add_documents(self, documents, embeddings):
        """Upsert one point per (document, embedding) pair.

        Point IDs are the pair's position in this call (0, 1, ...). Each
        embedding must provide ``tolist()`` and each document ``content``.
        """
        batch = []
        for point_id, (doc, vector) in enumerate(zip(documents, embeddings)):
            batch.append(
                {
                    "id": point_id,
                    "vector": vector.tolist(),
                    "payload": {"content": doc.content},
                }
            )
        self.client.upsert(collection_name=self.collection_name, points=batch)

    def search(self, query_embedding, k=5):
        """Return the ``content`` payload of up to *k* nearest points."""
        hits = self.client.search(
            collection_name=self.collection_name,
            query_vector=query_embedding.tolist(),
            limit=k,
        )
        contents = []
        for hit in hits:
            contents.append(hit.payload["content"])
        return contents
Enter fullscreen mode Exit fullscreen mode

4.3 pgvector


python
import psycopg2
from psycopg2.extras import Json
import numpy as np

class PGVectorDB:
    """Store and query document embeddings in PostgreSQL via pgvector."""

    def __init__(self, connection_string):
        """Open a psycopg2 connection using *connection_string*."""
        self.conn = psycopg2.connect(connection_string)

    def create_table(self):
        """Create the ``rag_documents`` table and its cosine index if absent."""
        with self.conn.cursor() as cur:
            cur.execute("""
                CREATE TABLE IF NOT EXISTS rag_documents (
                    id SERIAL PRIMARY KEY,
                    content TEXT,
                    embedding VECTOR(384),
                    metadata JSONB
                )
            """)
            cur.execute("""
                CREATE INDEX IF NOT EXISTS idx_embedding 
                ON rag_documents USING ivfflat (embedding vector_cosine_ops)
            """)
        self.conn.commit()

    def add_document(self, content, embedding, metadata=None):
        """Insert one document row and commit.

        Args:
            content: Document text.
            embedding: Vector exposing ``tolist()``.
            metadata: Optional JSON-serialisable value for the JSONB column.
        """
        with self.conn.cursor() as cur:
            cur.execute("""
                INSERT INTO rag_documents (content, embedding, metadata)
                VALUES (%s, %s, %s)
            """, (content, embedding.tolist(), Json(metadata)))
        self.conn.commit()

    def search(self, query_embedding, k=5):
        """Return the ``content`` of the *k* rows nearest to *query_embedding*.

        Args:
            query_embedding: Vector exposing ``tolist()``.
            k: Maximum number of rows to return.

        Returns:
            A list of content strings, nearest first.
        """
        with self.conn.cursor() as cur:
            # <=> is pgvector's cosine-distance operator, matching the
            # vector_cosine_ops index created in create_table. The parameter
            # is bound as a Postgres array, so it is cast to vector for the
            # operator to resolve.
            cur.execute("""
                SELECT content FROM rag_documents
                ORDER BY embedding <=> %s::vector
                LIMIT %s
            """, (query_embedding.tolist(), k))
            rows = cur.fetchall()
        return [row[0] for row in rows]


---

📥 **Get the full guide on Gumroad**: https://gumroad.com/l/auto ($7)
Enter fullscreen mode Exit fullscreen mode

Top comments (0)