DEV Community

lijesom9-create
lijesom9-create

Posted on

RAG Knowledge Base: Building with LangGraph and ChromaDB

RAG知识库实战:LangGraph + ChromaDB从零搭建个人知识助手

RAG(检索增强生成)是让AI拥有"外部记忆"的关键技术。本文基于education-agent项目的完整实现,结合LangChain官方最佳实践,手把手教你搭建一个支持文档上传、智能问答的个人知识库。

前言

你有没有遇到过这个问题:

问ChatGPT:"我们公司的请假流程是什么?"
ChatGPT:"抱歉,我不知道你们公司的具体政策..."

为什么?因为大模型的知识是"冻结"在训练数据里的,它不知道你公司的内部文档。

RAG(Retrieval-Augmented Generation)就是解决方案。

简单说:先从你的文档里找到相关内容,再让AI基于这些内容回答问题。

RAG架构概览

用户问题
    │
    ▼
┌─────────────────────────────────────┐
│           问题理解模块              │
│  • 意图识别                          │
│  • 问题改写                          │
│  • 子问题拆解                        │
└─────────────┬───────────────────────┘
              │
              ▼
┌─────────────────────────────────────┐
│           检索模块                  │
│  • 关键词检索 (50%)                  │
│  • 向量检索 (30%)                    │
│  • BM25检索 (20%)                    │
└─────────────┬───────────────────────┘
              │
              ▼
┌─────────────────────────────────────┐
│           重排序模块                │
│  • CrossEncoder重排序                │
│  • 去重、过滤                        │
└─────────────┬───────────────────────┘
              │
              ▼
┌─────────────────────────────────────┐
│           生成模块                  │
│  • 基于检索结果生成答案              │
│  • 引用溯源                          │
└─────────────────────────────────────┘
Enter fullscreen mode Exit fullscreen mode

文档处理Pipeline

第一步:文档解析

支持多种格式的文档:

# document/parser.py
from pathlib import Path
import PyPDF2
from docx import Document

class DocumentParser:
    """文档解析器"""

    def parse(self, file_path: str) -> str:
        """解析文档,返回纯文本"""
        path = Path(file_path)
        suffix = path.suffix.lower()

        parsers = {
            ".pdf": self._parse_pdf,
            ".docx": self._parse_docx,
            ".txt": self._parse_txt,
            ".md": self._parse_markdown,
        }

        parser = parsers.get(suffix)
        if not parser:
            raise ValueError(f"不支持的文件格式: {suffix}")

        return parser(file_path)

    def _parse_pdf(self, path: str) -> str:
        """解析PDF"""
        with open(path, 'rb') as f:
            reader = PyPDF2.PdfReader(f)
            text = ""
            for page in reader.pages:
                text += page.extract_text() + "\n"
        return text

    def _parse_docx(self, path: str) -> str:
        """解析Word文档"""
        doc = Document(path)
        return "\n".join([para.text for para in doc.paragraphs])

    def _parse_txt(self, path: str) -> str:
        """解析纯文本"""
        with open(path, 'r', encoding='utf-8') as f:
            return f.read()

    def _parse_markdown(self, path: str) -> str:
        """解析Markdown"""
        with open(path, 'r', encoding='utf-8') as f:
            return f.read()
Enter fullscreen mode Exit fullscreen mode

第二步:智能分块

分块是RAG的关键环节。分块不好,检索效果大打折扣。

# document/chunker.py
class SmartChunker:
    """智能分块器"""

    def __init__(self, chunk_size=500, overlap=50):
        self.chunk_size = chunk_size
        self.overlap = overlap

    def chunk(self, text: str, metadata: dict = None) -> list[dict]:
        """智能分块"""
        # 先按段落分割
        paragraphs = text.split("\n\n")

        chunks = []
        current_chunk = ""

        for para in paragraphs:
            para = para.strip()
            if not para:
                continue

            # 如果当前块加上新段落超过限制,保存当前块
            if len(current_chunk) + len(para) > self.chunk_size:
                if current_chunk:
                    chunks.append(self._create_chunk(current_chunk, metadata))

                # 新块包含上一块的末尾(overlap)
                if self.overlap > 0 and current_chunk:
                    overlap_text = current_chunk[-self.overlap:]
                    current_chunk = overlap_text + "\n\n" + para
                else:
                    current_chunk = para
            else:
                current_chunk += "\n\n" + para if current_chunk else para

        # 保存最后一块
        if current_chunk:
            chunks.append(self._create_chunk(current_chunk, metadata))

        return chunks

    def _create_chunk(self, text: str, metadata: dict = None) -> dict:
        """创建块"""
        return {
            "text": text.strip(),
            "metadata": metadata or {},
            "length": len(text)
        }
Enter fullscreen mode Exit fullscreen mode

第三步:向量化

# document/embedder.py
from sentence_transformers import SentenceTransformer

class Embedder:
    """向量化器"""

    def __init__(self, model_name: str = "all-MiniLM-L6-v2"):
        self.model = SentenceTransformer(model_name)

    def embed(self, texts: list[str]) -> list[list[float]]:
        """批量向量化"""
        return self.model.encode(texts).tolist()

    def embed_single(self, text: str) -> list[float]:
        """单条向量化"""
        return self.model.encode([text])[0].tolist()
Enter fullscreen mode Exit fullscreen mode

第四步:存储到ChromaDB

# vectorstore/chroma_store.py
import chromadb

class ChromaVectorStore:
    """ChromaDB向量存储"""

    def __init__(self, collection_name: str = "knowledge"):
        self.client = chromadb.PersistentClient(path="./chroma_db")
        self.collection = self.client.get_or_create_collection(
            name=collection_name,
            metadata={"hnsw:space": "cosine"}
        )

    def add_documents(self, chunks: list[dict], embeddings: list[list[float]]):
        """添加文档块"""
        ids = [f"doc_{i}" for i in range(len(chunks))]
        documents = [chunk["text"] for chunk in chunks]
        metadatas = [chunk["metadata"] for chunk in chunks]

        self.collection.add(
            ids=ids,
            embeddings=embeddings,
            documents=documents,
            metadatas=metadatas
        )

    def search(self, query_embedding: list[float], top_k: int = 5) -> list[dict]:
        """向量检索"""
        results = self.collection.query(
            query_embeddings=[query_embedding],
            n_results=top_k
        )

        return [
            {
                "text": doc,
                "score": score,
                "metadata": meta
            }
            for doc, score, meta in zip(
                results["documents"][0],
                results["distances"][0],
                results["metadatas"][0]
            )
        ]
Enter fullscreen mode Exit fullscreen mode

混合检索策略

为什么需要混合检索?

单一检索方式有局限:

  • 关键词检索:精确匹配,但不懂语义
  • 向量检索:懂语义,但可能漏掉精确匹配
  • BM25检索:统计方法,平衡但不极致

混合检索取长补短:

# retrieval/hybrid_retriever.py
from rank_bm25 import BM25Okapi
import numpy as np

class HybridRetriever:
    """混合检索器"""

    def __init__(self, vector_store, keyword_weight=0.5, 
                 vector_weight=0.3, bm25_weight=0.2):
        self.vector_store = vector_store
        self.keyword_weight = keyword_weight
        self.vector_weight = vector_weight
        self.bm25_weight = bm25_weight

        # BM25索引
        self.bm25 = None
        self.corpus = []

    def build_index(self, documents: list[str]):
        """构建BM25索引"""
        self.corpus = documents
        tokenized_corpus = [doc.split() for doc in documents]
        self.bm25 = BM25Okapi(tokenized_corpus)

    def search(self, query: str, query_embedding: list[float], 
               top_k: int = 5) -> list[dict]:
        """混合检索"""
        # 1. 关键词检索
        keyword_results = self._keyword_search(query, top_k * 2)

        # 2. 向量检索
        vector_results = self._vector_search(query_embedding, top_k * 2)

        # 3. BM25检索
        bm25_results = self._bm25_search(query, top_k * 2)

        # 4. 融合分数
        merged = self._merge_scores(keyword_results, vector_results, bm25_results)

        # 5. 排序返回
        sorted_results = sorted(merged.items(), key=lambda x: x[1], reverse=True)

        return [
            {"text": doc, "score": score}
            for doc, score in sorted_results[:top_k]
        ]

    def _keyword_search(self, query: str, top_k: int) -> dict:
        """关键词检索"""
        results = {}
        query_terms = query.lower().split()

        for doc in self.corpus:
            score = sum(1 for term in query_terms if term in doc.lower())
            if score > 0:
                results[doc] = score / len(query_terms)

        return dict(sorted(results.items(), key=lambda x: x[1], reverse=True)[:top_k])

    def _vector_search(self, query_embedding: list[float], top_k: int) -> dict:
        """向量检索"""
        results = self.vector_store.search(query_embedding, top_k)
        return {r["text"]: 1 - r["score"] for r in results}  # 转换为相似度

    def _bm25_search(self, query: str, top_k: int) -> dict:
        """BM25检索"""
        if not self.bm25:
            return {}

        scores = self.bm25.get_scores(query.split())
        doc_scores = list(zip(self.corpus, scores))
        doc_scores.sort(key=lambda x: x[1], reverse=True)

        return {doc: score for doc, score in doc_scores[:top_k]}

    def _merge_scores(self, keyword, vector, bm25) -> dict:
        """融合分数"""
        all_docs = set(keyword.keys()) | set(vector.keys()) | set(bm25.keys())

        merged = {}
        for doc in all_docs:
            score = (
                keyword.get(doc, 0) * self.keyword_weight +
                vector.get(doc, 0) * self.vector_weight +
                bm25.get(doc, 0) * self.bm25_weight
            )
            merged[doc] = score

        return merged
Enter fullscreen mode Exit fullscreen mode

重排序

CrossEncoder重排序

# retrieval/reranker.py
from sentence_transformers import CrossEncoder

class CrossEncoderReranker:
    """CrossEncoder重排序器"""

    def __init__(self, model_name: str = "cross-encoder/ms-marco-MiniLM-L-6-v2"):
        self.model = CrossEncoder(model_name)

    def rerank(self, query: str, documents: list[dict], top_k: int = 3) -> list[dict]:
        """重排序"""
        # 构建查询-文档对
        pairs = [(query, doc["text"]) for doc in documents]

        # 计算相关性分数
        scores = self.model.predict(pairs)

        # 按分数排序
        scored_docs = list(zip(documents, scores))
        scored_docs.sort(key=lambda x: x[1], reverse=True)

        return [
            {**doc, "rerank_score": float(score)}
            for doc, score in scored_docs[:top_k]
        ]
Enter fullscreen mode Exit fullscreen mode

问题理解

意图识别

# retrieval/query_understanding.py
class QueryUnderstanding:
    """问题理解模块"""

    def __init__(self, llm):
        self.llm = llm

    async def understand(self, query: str) -> dict:
        """理解问题"""
        prompt = f"""请分析以下问题:

问题:{query}

请返回:
1. 意图类型(factual/how-to/comparison/opinion)
2. 关键实体
3. 改写后的查询(更适合检索的版本)
4. 是否需要拆解为子问题"""

        response = await self.llm.ainvoke(prompt)
        return parse_understanding(response)

    async def rewrite_query(self, query: str) -> str:
        """改写查询,更适合检索"""
        prompt = f"""请将以下问题改写为更适合搜索的版本:

原始问题:{query}

要求:
1. 去除口语化表达
2. 添加关键词
3. 保持原意"""

        response = await self.llm.ainvoke(prompt)
        return response.strip()
Enter fullscreen mode Exit fullscreen mode

完整的RAG Pipeline

# rag/pipeline.py
class RAGPipeline:
    """完整的RAG Pipeline"""

    def __init__(self):
        self.parser = DocumentParser()
        self.chunker = SmartChunker(chunk_size=500, overlap=50)
        self.embedder = Embedder()
        self.vector_store = ChromaVectorStore()
        self.retriever = HybridRetriever(self.vector_store)
        self.reranker = CrossEncoderReranker()
        self.query_understander = QueryUnderstanding(llm)
        self.llm = ChatOpenAI(model="gpt-4")

    async def ingest_document(self, file_path: str):
        """摄入文档"""
        print(f"📄 解析文档: {file_path}")
        text = self.parser.parse(file_path)

        print("✂️ 智能分块...")
        chunks = self.chunker.chunk(text, metadata={"source": file_path})

        print("🔢 向量化...")
        embeddings = self.embedder.embed([c["text"] for c in chunks])

        print("💾 存储到向量数据库...")
        self.vector_store.add_documents(chunks, embeddings)

        # 更新BM25索引
        self.retriever.build_index([c["text"] for c in chunks])

        print(f"✅ 完成!共处理 {len(chunks)} 个文档块")

    async def query(self, question: str) -> str:
        """查询"""
        # 1. 理解问题
        understanding = await self.query_understander.understand(question)
        rewritten_query = await self.query_understander.rewrite_query(question)

        # 2. 向量化查询
        query_embedding = self.embedder.embed_single(rewritten_query)

        # 3. 混合检索
        results = self.retriever.search(rewritten_query, query_embedding, top_k=10)

        # 4. 重排序
        reranked = self.reranker.rerank(question, results, top_k=3)

        # 5. 生成答案
        context = "\n\n".join([r["text"] for r in reranked])

        prompt = f"""基于以下参考资料回答问题。

参考资料:
{context}

问题:{question}

要求:
1. 基于参考资料回答,不要编造
2. 引用来源(标注来自哪个文档)
3. 如果参考资料不足以回答,说明需要更多信息"""

        answer = await self.llm.ainvoke(prompt)

        return {
            "answer": answer,
            "sources": [r["metadata"]["source"] for r in reranked],
            "confidence": sum(r["rerank_score"] for r in reranked) / len(reranked)
        }
Enter fullscreen mode Exit fullscreen mode

实际使用

# 使用示例
rag = RAGPipeline()

# 摄入文档
await rag.ingest_document("公司制度.pdf")
await rag.ingest_document("技术文档.md")
await rag.ingest_document("会议记录.docx")

# 查询
result = await rag.query("公司的请假流程是什么?")

print(result["answer"])
# 根据《公司制度.pdf》第3章规定:
# 1. 员工请假需提前3天申请
# 2. 3天以内由直属主管审批
# 3. 3天以上需部门经理审批
# ...

print(result["sources"])
# ['公司制度.pdf']

print(result["confidence"])
# 0.85
Enter fullscreen mode Exit fullscreen mode

优化技巧

1. 分块大小选择

分块大小 适用场景 优点 缺点
200-300 精确问答 检索精确 可能丢失上下文
500-800 通用问答 平衡 适中
1000+ 长文档理解 上下文完整 检索不够精确

2. 向量模型选择

模型 维度 特点
all-MiniLM-L6-v2 384 速度快,效果好
text-embedding-ada-002 1536 OpenAI,效果最好
bge-large-zh-v1.5 1024 中文优化

3. 检索权重调整

# 根据场景调整权重
# 精确匹配场景(如代码搜索)
retriever = HybridRetriever(
    keyword_weight=0.7,  # 提高关键词权重
    vector_weight=0.2,
    bm25_weight=0.1
)

# 语义理解场景(如开放问答)
retriever = HybridRetriever(
    keyword_weight=0.3,
    vector_weight=0.5,  # 提高向量权重
    bm25_weight=0.2
)
Enter fullscreen mode Exit fullscreen mode

总结

RAG系统的核心组件:

组件 作用 关键技术
文档解析 多格式支持 PyPDF2, python-docx
智能分块 保留语义 按段落分块,overlap
向量化 语义表示 Sentence Transformers
向量存储 高效检索 ChromaDB
混合检索 取长补短 关键词+向量+BM25
重排序 精排相关性 CrossEncoder
问题理解 优化查询 意图识别,查询改写

下一篇预告

《混合检索的威力:关键词+向量+BM25三路融合详解》— 我们会深入每种检索算法的原理,并对比它们的效果。

参考资料


RAG是让AI拥有"外部记忆"的关键技术。掌握了RAG,AI就不再是一个"失忆"的聊天机器人。

tags: rag, langchain, chromadb, vector-search, python
series: rag-knowledge-system

Top comments (0)