DEV Community

matias yoon
matias yoon

Posted on

RAG 시스템 실전 구축 (v46)

RAG 시스템 실전 구축 (v46)

1. RAG 기본 원리 (Retrieval → Augmentation → Generation)

RAG(Retrieval-Augmented Generation, 검색 증강 생성)는 질의와 관련된 문서를 외부 데이터에서 검색한 뒤, 그 내용을 프롬프트에 덧붙여 LLM이 답변을 생성하도록 하는 아키텍처입니다. 핵심 루프는 다음과 같습니다:

# RAG loop implementation
class RAGPipeline:
    """Minimal retrieve -> augment -> generate pipeline.

    The three collaborators are duck-typed:
      * ``embedder.embed(query)`` turns the query into an embedding,
      * ``retriever.search(embedding)`` returns an iterable of document strings,
      * ``generator.generate(prompt)`` produces the final response.
    """

    def __init__(self, embedder, retriever, generator):
        self.embedder = embedder
        self.retriever = retriever
        self.generator = generator

    def process(self, query):
        """Run the full loop for ``query`` and return the generator's response."""
        # Steps 1 and 2: embed the query, then look up documents for that embedding.
        vector = self.embedder.embed(query)
        hits = self.retriever.search(vector)

        # Steps 3 and 4: fold the hits into a prompt and hand it to the generator.
        prompt = self._augment_context(query, hits)
        return self.generator.generate(prompt)

    def _augment_context(self, query, docs):
        """Build the prompt: the query, then the documents separated by blank lines."""
        header = f"Query: {query}\n\nRelevant Docs:\n"
        body = "\n\n".join(docs)
        return header + body
Enter fullscreen mode Exit fullscreen mode

2. 청킹 전략

2.1 의미 기반 청킹 (Semantic Chunking)

from sentence_transformers import SentenceTransformer
import numpy as np
from sklearn.cluster import KMeans

class SemanticChunker:
    """Group the sentences of a text into chunks by embedding similarity.

    Sentences are embedded with a SentenceTransformer model and clustered with
    KMeans; the sentences that land in the same cluster are joined into one
    chunk.
    """

    def __init__(self, model_name="all-MiniLM-L6-v2"):
        self.model = SentenceTransformer(model_name)

    def chunk_by_semantic(self, text, threshold=0.7):
        """Split ``text`` into chunks of similar sentences.

        Args:
            text: Input text; sentences are delimited by ``'. '``.
            threshold: Currently unused. Kept so existing callers that pass it
                keep working.

        Returns:
            A list of chunk strings, ordered by the first appearance of each
            cluster in the text. Returns ``[]`` when ``text`` contains no
            non-blank sentence.
        """
        # Split into sentences, dropping blank fragments so they are neither
        # embedded nor counted when sizing the clustering.
        sentences = [
            part for part in (raw.strip() for raw in text.split('. ')) if part
        ]
        if not sentences:
            return []

        embeddings = self.model.encode(sentences)

        # Roughly three sentences per cluster, never fewer than one cluster.
        # A fixed seed and explicit n_init make repeated runs over the same
        # text produce the same chunks.
        kmeans = KMeans(
            n_clusters=max(1, len(sentences) // 3),
            n_init=10,
            random_state=0,
        )
        kmeans.fit(embeddings)

        # Merge sentences that share a cluster label; dict insertion order
        # keeps clusters in order of first appearance.
        grouped_chunks = {}
        for sentence, cluster in zip(sentences, kmeans.labels_):
            grouped_chunks.setdefault(int(cluster), []).append(sentence)

        return [' '.join(chunk_list) for chunk_list in grouped_chunks.values()]
Enter fullscreen mode Exit fullscreen mode

2.2 재귀적 청킹 (Recursive Chunking)

class RecursiveChunker:
    """Split text into overlapping, size-bounded chunks, preferring sentence ends.

    Args:
        chunk_size: Maximum number of characters per chunk (must be positive).
        overlap: Number of trailing characters of a chunk that are repeated at
            the start of the next one (``0 <= overlap < chunk_size``).

    Raises:
        ValueError: If ``chunk_size`` or ``overlap`` is out of range.
    """

    def __init__(self, chunk_size=512, overlap=50):
        if chunk_size <= 0:
            raise ValueError("chunk_size must be positive")
        if not 0 <= overlap < chunk_size:
            raise ValueError("overlap must satisfy 0 <= overlap < chunk_size")
        self.chunk_size = chunk_size
        self.overlap = overlap

    def chunk_recursive(self, text):
        """Return the list of chunks for ``text`` (``[]`` for empty text).

        Each chunk is at most ``chunk_size`` characters. When a chunk does not
        reach the end of the text, it is cut just after the last ``.``, ``!``
        or ``?`` inside the window, provided that cut still lets the next
        chunk start further into the text; otherwise it is cut at
        ``chunk_size``.
        """
        chunks = []
        text_len = len(text)
        start = 0

        while start < text_len:
            end = min(start + self.chunk_size, text_len)

            if end < text_len:
                # Last sentence terminator inside the window, or -1 if none.
                boundary = max(text.rfind(mark, start, end) for mark in '.!?')
                # Only cut at the boundary if, after stepping back by the
                # overlap, the next chunk still starts past the current one.
                if boundary + 1 - self.overlap > start:
                    end = boundary + 1  # keep the terminator in this chunk

            chunks.append(text[start:end])

            # The final chunk has been emitted; stepping back by the overlap
            # here would re-emit the tail forever.
            if end >= text_len:
                break

            # Always advance by at least one character, even if the
            # attributes were changed to an invalid combination after init.
            start = max(end - self.overlap, start + 1)

        return chunks
Enter fullscreen mode Exit fullscreen mode

3. 임베딩 모델 선택 및 비교

import torch
from sentence_transformers import SentenceTransformer
from transformers import AutoTokenizer, AutoModel
import numpy as np

class EmbeddingBenchmark:
    """Compare embedding models on encode time, dimensionality and output size."""

    def __init__(self):
        # Values are either a loaded SentenceTransformer or a marker string
        # for models that are not loaded locally.
        self.models = {
            'all-MiniLM-L6-v2': SentenceTransformer('all-MiniLM-L6-v2'),
            'all-mpnet-base-v2': SentenceTransformer('all-mpnet-base-v2'),
            'text-embedding-3-small': 'openai',  # call the API when needed
        }

    def benchmark_models(self, texts, repeats=1):
        """Benchmark every locally loaded model on ``texts``.

        Args:
            texts: Sequence of strings to encode.
            repeats: Number of timed ``encode`` runs averaged into ``avg_time``.

        Returns:
            Dict mapping model name to a dict with ``avg_time`` (seconds),
            ``dimensionality`` (second axis of the embedding array) and
            ``memory_usage`` (``nbytes`` of the embedding array). Entries that
            are not SentenceTransformer instances are omitted.

        Raises:
            ValueError: If ``repeats`` is less than 1.
        """
        results = {}
        for name, model in self.models.items():
            if not isinstance(model, SentenceTransformer):
                # API-backed entries (e.g. OpenAI) are not benchmarked here.
                continue

            # Encode once up front to obtain the embeddings; the timed runs
            # happen afterwards in _measure_time.
            embeddings = model.encode(texts)
            results[name] = {
                'avg_time': self._measure_time(model, texts, repeats),
                'dimensionality': embeddings.shape[1],
                'memory_usage': embeddings.nbytes,
            }
        return results

    def _measure_time(self, model, texts, repeats=1):
        """Return the mean duration in seconds of ``model.encode(texts)``.

        Raises:
            ValueError: If ``repeats`` is less than 1.
        """
        import time

        if repeats < 1:
            raise ValueError("repeats must be at least 1")

        # perf_counter is monotonic and high-resolution, unlike time.time(),
        # which can jump when the system clock is adjusted.
        start = time.perf_counter()
        for _ in range(repeats):
            model.encode(texts)
        return (time.perf_counter() - start) / repeats

# Usage example
# Constructing EmbeddingBenchmark instantiates the SentenceTransformer models
# listed in its __init__; benchmark_models then encodes the sample texts.
benchmark = EmbeddingBenchmark()
texts = ["Python은 프로그래밍 언어입니다.", "AI는 인공지능을 의미합니다."]
results = benchmark.benchmark_models(texts)
Enter fullscreen mode Exit fullscreen mode

4. 벡터 데이터베이스 비교

4.1 Chroma (로컬)

import chromadb
from chromadb.config import Settings

class ChromaVectorDB:
    """Local Chroma-backed vector store using the "rag_collection" collection."""

    def __init__(self, path="./chroma_db"):
        """Open (or create) a Chroma database persisted under ``path``."""
        if hasattr(chromadb, "PersistentClient"):
            # chromadb >= 0.4: on-disk storage is configured via PersistentClient.
            self.client = chromadb.PersistentClient(path=path)
        else:
            # Legacy chromadb (0.3.x): persistence is configured through
            # Settings, with the directory given as ``persist_directory``.
            self.client = chromadb.Client(Settings(chroma_db_impl="duckdb+parquet",
                                                  persist_directory=path))
        self.collection = self.client.get_or_create_collection("rag_collection")

    def add_documents(self, documents, embeddings, ids):
        """Add ``documents`` with their ``embeddings`` under the given ``ids``.

        The three arguments are passed to the collection as parallel sequences.
        """
        self.collection.add(
            documents=documents,
            embeddings=embeddings,
            ids=ids
        )

    def search(self, query_embedding, limit=5):
        """Return the document texts of up to ``limit`` matches for one query embedding."""
        results = self.collection.query(
            query_embeddings=[query_embedding],
            n_results=limit
        )
        # One query was sent, so take the first (only) per-query result list.
        return results['documents'][0]
Enter fullscreen mode Exit fullscreen mode

4.2 Qdrant (HTTP)

from qdrant_client import QdrantClient
from qdrant_client.models import Filter, FieldCondition, MatchValue

class QdrantVectorDB:
    """Wrapper around a Qdrant collection named "rag_collection"."""

    def __init__(self, host="localhost", port=6333):
        """Connect to the Qdrant server at ``host``:``port``."""
        self.client = QdrantClient(host=host, port=port)
        self.collection_name = "rag_collection"

    def create_collection(self, vector_size=384):
        """Recreate the collection for ``vector_size``-dimensional cosine vectors."""
        config = {"size": vector_size, "distance": "Cosine"}
        self.client.recreate_collection(
            collection_name=self.collection_name,
            vectors_config=config,
        )

    def add_documents(self, documents, embeddings, ids):
        """Upsert one point per (id, document, embedding) triple.

        The document text is stored in the point payload under ``"text"``.
        """
        batch = []
        for point_id, text, vector in zip(ids, documents, embeddings):
            batch.append({"id": point_id, "vector": vector, "payload": {"text": text}})

        self.client.upsert(collection_name=self.collection_name, points=batch)

    def search(self, query_vector, limit=5):
        """Return the payload texts of up to ``limit`` hits for ``query_vector``."""
        hits = self.client.search(
            collection_name=self.collection_name,
            query_vector=query_vector,
            limit=limit,
        )
        return [hit.payload["text"] for hit in hits]
Enter fullscreen mode Exit fullscreen mode

4.3 pgvector (PostgreSQL)

import psycopg2
from psycopg2.extras import Json

class PGVectorDB:
    """PostgreSQL + pgvector store backed by the ``rag_documents`` table.

    Embeddings are sent to the server as pgvector text literals
    (``'[x1,x2,...]'``) and cast with ``::vector``, so plain lists and numpy
    arrays both work without registering a psycopg2 adapter.
    """

    def __init__(self, connection_string):
        """Connect using ``connection_string`` and ensure the schema exists."""
        self.conn = psycopg2.connect(connection_string)
        self._create_table()

    @staticmethod
    def _to_vector_literal(vector):
        """Format an iterable of numbers as a pgvector text literal."""
        return "[" + ",".join(str(float(x)) for x in vector) + "]"

    def _create_table(self):
        """Create the extension, table and cosine index if they are missing."""
        try:
            with self.conn.cursor() as cur:
                # The VECTOR column type only exists once the extension is installed.
                cur.execute("CREATE EXTENSION IF NOT EXISTS vector")
                cur.execute("""
                    CREATE TABLE IF NOT EXISTS rag_documents (
                        id UUID PRIMARY KEY,
                        content TEXT,
                        embedding VECTOR(384)
                    )
                """)
                cur.execute("""
                    CREATE INDEX IF NOT EXISTS idx_embedding 
                    ON rag_documents USING ivfflat (embedding vector_cosine_ops)
                """)
            self.conn.commit()
        except Exception:
            # Leave the connection usable instead of stuck in an aborted transaction.
            self.conn.rollback()
            raise

    def add_documents(self, documents, embeddings, ids):
        """Insert one row per (id, document, embedding) triple in one transaction.

        Nothing is committed if any insert fails; the error is re-raised after
        the transaction is rolled back.
        """
        rows = [
            (id_, doc, self._to_vector_literal(emb))
            for id_, doc, emb in zip(ids, documents, embeddings)
        ]
        if not rows:
            return

        try:
            with self.conn.cursor() as cur:
                cur.executemany(
                    "INSERT INTO rag_documents (id, content, embedding) "
                    "VALUES (%s, %s, %s::vector)",
                    rows,
                )
            self.conn.commit()
        except Exception:
            self.conn.rollback()
            raise

    def search(self, query_vector, limit=5):
        """Return the ``content`` of up to ``limit`` rows nearest to ``query_vector``.

        Uses cosine distance (``<=>``), the operator that matches the
        ``vector_cosine_ops`` index created in ``_create_table``.
        """
        with self.conn.cursor() as cur:
            cur.execute("""
                SELECT content FROM rag_documents 
                ORDER BY embedding <=> %s::vector 
                LIMIT %s
            """, (self._to_vector_literal(query_vector), limit))
            return [row[0] for row in cur.fetchall()]
Enter fullscreen mode Exit fullscreen mode

5. 완전한 RAG 파이프라인 코드


python
import uuid
import numpy as np
from sentence_transformers import SentenceTransformer
from chromadb import Client
from chromadb.config import Settings

class CompleteRAGPipeline:
    """End-to-end RAG pipeline: chunk documents, embed the chunks, index them in Chroma.

    NOTE(review): only ``__init__`` and the first part of ``add_documents`` are
    present in this source; the remainder of the class is not visible here.
    """

    def __init__(self, embedding_model="all-MiniLM-L6-v2"):
        # Initialization: embedding model, Chroma client and "rag_docs"
        # collection, and a RecursiveChunker (chunk_size=512, overlap=50).
        self.embedder = SentenceTransformer(embedding_model)
        self.client = Client(Settings(chroma_db_impl="duckdb", chroma_location="./chroma_db"))
        self.collection = self.client.get_or_create_collection("rag_docs")
        self.chunker = RecursiveChunker(chunk_size=512, overlap=50)

    def add_documents(self, documents):
        """Add documents and index them."""
        # Parallel accumulators: ids[i] and chunk_embeddings[i] belong to chunks[i].
        ids = []
        chunks = []
        chunk_embeddings = []

        # Chunk each document
        for doc in documents:
            # Fresh random id per document; chunk ids are derived from it below.
            doc_id = str(uuid.uuid4())
            doc_chunks = self.chunker.chunk_recursive(doc)

            # Create an embedding for each chunk
            chunk_embeddings.extend(self.embedder.encode(doc_chunks))
            chunks.extend(doc_chunks)
            # Chunk ids have the form "<doc uuid>_<chunk index>".
            ids.extend([f"{doc_id}_{i}" for i in range(len(doc_chunks))])

        # Store vectors
        # NOTE(review): the visible source ends here, so nothing is written to
        # self.collection yet -- the storage call is missing from this excerpt.

---

📥 **Get the full guide on Gumroad**: https://gumroad.com/l/auto ($7)
Enter fullscreen mode Exit fullscreen mode

Top comments (0)