目录


1. RAG 概述:为什么需要检索增强

1.1 LLM 的两大局限

局限 说明 例子
知识过时 训练数据有截止日期 问 2026 年的新闻,LLM 不知道
缺乏私有知识 无法访问企业内部数据 问"公司退货政策",LLM 不知道

1.2 RAG 的解决思路

RAG

用户提问

检索相关文档

将文档作为上下文

LLM基于上下文回答

准确✅

传统LLM

用户提问

LLM凭记忆回答

可能不准确❌

1.3 RAG 的完整流程

查询阶段 - 在线

索引阶段 - 离线

向量库

1.文档加载

2.文本分块

3.生成嵌入

4.存储到向量库

5.用户提问

6.检索相关文档

7.重排序优化

8.LLM生成回答

1.4 安装依赖

# LangChain 核心组件
pip install langchain langchain-core langchain-openai langchain-community langchain-text-splitters

# 文档解析
pip install pymupdf           # PDF 解析(轻量快速)
pip install python-docx       # Word 文档
pip install beautifulsoup4    # HTML 解析

# 嵌入模型
pip install sentence-transformers     # 通用嵌入模型库

# 向量数据库
pip install chromadb                  # 轻量级向量库

# 搜索增强
pip install rank_bm25 jieba           # BM25 关键词搜索 + 中文分词

# 可选:重排序
pip install FlagEmbedding             # BGE Reranker

2. LangChain 文档加载器

2.1 LangChain Document Loaders 概览

LangChain Document Loaders

PyPDFLoader

Docx2txtLoader

UnstructuredHTMLLoader

TextLoader

DirectoryLoader

PDF文档

Word文档

HTML网页

纯文本/Markdown

批量加载目录

2.2 统一文档加载器实现

"""
rag/document_loader.py - LangChain 文档加载器
"""
from langchain_community.document_loaders import (
    PyPDFLoader,
    Docx2txtLoader,
    UnstructuredHTMLLoader,
    TextLoader,
    DirectoryLoader,
)
from langchain_core.documents import Document
from typing import List, Optional
from pathlib import Path
import hashlib


class UnifiedDocumentLoader:
    """
    统一文档加载器

    基于 LangChain Document Loaders 实现
    支持 PDF、Word、HTML、Markdown、TXT 格式
    """

    def __init__(self, encoding: str = "utf-8"):
        self.encoding = encoding

    def load_file(self, file_path: str) -> List[Document]:
        """
        加载单个文件

        Args:
            file_path: 文件路径

        Returns:
            LangChain Document 列表
        """
        path = Path(file_path)
        if not path.exists():
            raise FileNotFoundError(f"文件不存在: {file_path}")

        suffix = path.suffix.lower()

        # 根据文件类型选择加载器
        if suffix == ".pdf":
            loader = PyPDFLoader(file_path)
        elif suffix in [".docx", ".doc"]:
            loader = Docx2txtLoader(file_path)
        elif suffix in [".html", ".htm"]:
            loader = UnstructuredHTMLLoader(file_path)
        elif suffix in [".md", ".txt"]:
            loader = TextLoader(file_path, encoding=self.encoding)
        else:
            raise ValueError(f"不支持的文件格式: {suffix}")

        # 加载文档
        documents = loader.load()

        # 添加文件哈希和来源信息
        file_hash = self._compute_hash(file_path)
        for doc in documents:
            doc.metadata["source"] = str(path)
            doc.metadata["file_type"] = suffix
            doc.metadata["file_hash"] = file_hash

        return documents

    def load_directory(
        self,
        directory_path: str,
        glob_pattern: str = "**/*.pdf",
    ) -> List[Document]:
        """
        批量加载目录中的文件

        Args:
            directory_path: 目录路径
            glob_pattern: 文件匹配模式

        Returns:
            Document 列表
        """
        loader = DirectoryLoader(
            directory_path,
            glob=glob_pattern,
            show_progress=True,
        )
        return loader.load()

    def load_multiple(self, file_paths: List[str]) -> List[Document]:
        """
        加载多个文件

        Args:
            file_paths: 文件路径列表

        Returns:
            Document 列表
        """
        all_documents = []
        for path in file_paths:
            try:
                docs = self.load_file(path)
                all_documents.extend(docs)
                print(f"✅ {path}: {len(docs)} 页/段")
            except Exception as e:
                print(f"❌ {path}: {e}")

        return all_documents

    @staticmethod
    def _compute_hash(file_path: str) -> str:
        """计算文件 SHA256 哈希"""
        sha256 = hashlib.sha256()
        with open(file_path, "rb") as f:
            for chunk in iter(lambda: f.read(8192), b""):
                sha256.update(chunk)
        return sha256.hexdigest()


# === 使用示例 ===
if __name__ == "__main__":
    loader = UnifiedDocumentLoader()

    # 加载单个文件
    # pdf_docs = loader.load_file("example.pdf")
    # print(f"PDF: {len(pdf_docs)} 页")

    # 加载多个文件
    # all_docs = loader.load_multiple(["doc1.pdf", "doc2.docx", "doc3.md"])

    # 批量加载目录
    # dir_docs = loader.load_directory("./knowledge_base", glob_pattern="**/*.pdf")

3. 文本分块策略

3.1 为什么需要分块

解决方案

分块处理
每块500-1000字

精准匹配

✅ 适配LLM窗口

✅ 检索精确

✅ 减少噪音

问题

一篇50页PDF
约5万字

整体作为上下文

❌ 超出LLM窗口

❌ 检索不精确

❌ 包含噪音

3.2 LangChain Text Splitters 对比

策略 LangChain 类 原理 适合场景 推荐度
固定大小 CharacterTextSplitter 按字符数切分 结构均匀的文档 ⭐⭐
递归分块 RecursiveCharacterTextSplitter 按分隔符层次切分 通用场景(生产默认 ⭐⭐⭐⭐⭐
Markdown 分块 MarkdownHeaderTextSplitter 按标题层级切分 Markdown / 技术文档 ⭐⭐⭐⭐
代码分块 LanguageTextSplitter 按语法结构切分 代码文档 ⭐⭐⭐⭐
语义分块 SemanticChunker 按语义相似度切分 主题频繁切换 ⭐⭐⭐

3.3 LangChain 分块器实现

"""
rag/text_splitter.py - LangChain 文本分割器
"""
from langchain_text_splitters import (
    RecursiveCharacterTextSplitter,
    MarkdownHeaderTextSplitter,
    CharacterTextSplitter,
)
from langchain_core.documents import Document
from typing import List, Optional


class DocumentChunker:
    """
    文档分块器

    基于 LangChain Text Splitters 实现
    """

    def __init__(
        self,
        chunk_size: int = 800,
        chunk_overlap: int = 200,
        separators: Optional[List[str]] = None,
    ):
        """
        Args:
            chunk_size: 每个块的最大字符数
            chunk_overlap: 相邻块之间的重叠字符数
            separators: 分隔符列表(优先级从高到低)
        """
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap

        # 默认分隔符(适合中文)
        self.separators = separators or ["\n\n", "\n", "。", ".", " ", ""]

        # 创建递归分块器
        self.recursive_splitter = RecursiveCharacterTextSplitter(
            separators=self.separators,
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            length_function=len,
            add_start_index=True,
        )

    def split_documents(
        self,
        documents: List[Document],
        chunk_strategy: str = "recursive",
    ) -> List[Document]:
        """
        对文档列表进行分块

        Args:
            documents: LangChain Document 列表
            chunk_strategy: 分块策略
                - "recursive": 递归分块(默认)
                - "markdown": Markdown 分块
                - "fixed": 固定大小分块

        Returns:
            分块后的 Document 列表
        """
        all_chunks = []

        for doc in documents:
            if chunk_strategy == "markdown" and doc.metadata.get("file_type") == ".md":
                chunks = self._split_markdown(doc)
            else:
                chunks = self.recursive_splitter.split_documents([doc])

            # 添加块索引
            for i, chunk in enumerate(chunks):
                chunk.metadata["chunk_index"] = i

            all_chunks.extend(chunks)

        return all_chunks

    def _split_markdown(self, document: Document) -> List[Document]:
        """Markdown 分块(按标题层级)"""
        # 按标题分割
        headers_to_split_on = [
            ("#", "header_1"),
            ("##", "header_2"),
            ("###", "header_3"),
        ]

        md_splitter = MarkdownHeaderTextSplitter(
            headers_to_split_on=headers_to_split_on,
            strip_headers=False,
        )

        md_chunks = md_splitter.split_text(document.page_content)

        # 如果块仍然太大,再用递归分块
        final_chunks = []
        for chunk in md_chunks:
            if len(chunk.page_content) > self.chunk_size:
                sub_chunks = self.recursive_splitter.split_documents([chunk])
                final_chunks.extend(sub_chunks)
            else:
                final_chunks.append(chunk)

        return final_chunks

    def split_text(self, text: str) -> List[str]:
        """
        对纯文本进行分块

        Args:
            text: 文本内容

        Returns:
            分块后的文本列表
        """
        return self.recursive_splitter.split_text(text)


# === 使用示例 ===
if __name__ == "__main__":
    from rag.document_loader import UnifiedDocumentLoader

    # 加载文档
    loader = UnifiedDocumentLoader()
    # docs = loader.load_file("example.pdf")

    # 分块
    chunker = DocumentChunker(chunk_size=800, chunk_overlap=200)
    # chunks = chunker.split_documents(docs)

    # print(f"原始文档: {len(docs)} 页/段")
    # print(f"分块结果: {len(chunks)} 块")

    # 测试纯文本分块
    text = """
    人工智能是计算机科学的一个分支,它企图了解智能的实质,
    并生产出一种新的能以人类智能相似的方式做出反应的智能机器。

    机器学习是人工智能的核心,是使计算机具有智能的根本途径。
    机器学习专门研究计算机怎样模拟或实现人类的学习行为,
    以获取新的知识或技能,重新组织已有的知识结构使之不断改善自身的性能。
    """

    chunks = chunker.split_text(text)
    print(f"分块数量: {len(chunks)}")
    for i, chunk in enumerate(chunks):
        print(f"\n--- 块 {i+1} ({len(chunk)} 字符) ---")
        print(chunk[:100] + "...")

3.4 分块参数选择指南

场景 chunk_size chunk_overlap 说明
事实性问答 256-512 50-100 精准匹配,减少噪音
通用场景 500-1000 100-200 推荐默认值
分析性问答 1000-2000 200-400 需要更多上下文
代码文档 500-1500 50-100 保持代码完整性
法律/合同 1500-3000 200 保持条款完整

💡 经验法则:overlap 设为 chunk_size 的 10%-20%,确保相邻块之间有足够的上下文衔接。


4. 嵌入模型:让文本变成向量

4.1 中文嵌入模型选择

模型 维度 特点 推荐场景
BAAI/bge-small-zh-v1.5 512 轻量快速 开发测试、资源受限
BAAI/bge-base-zh-v1.5 768 平衡性能 通用推荐
BAAI/bge-large-zh-v1.5 1024 中文效果最佳 追求最高质量
BAAI/bge-m3 1024 多语言+多功能 中英混合场景

4.2 LangChain Embeddings 封装

"""
rag/embeddings.py - LangChain Embedding 模型
"""
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_openai import OpenAIEmbeddings
from langchain_core.embeddings import Embeddings
from typing import List, Optional
import os
import numpy as np


def get_embeddings(
    provider: str = "bge",
    model_name: Optional[str] = None,
    device: str = "cpu",
) -> Embeddings:
    """
    获取 LangChain Embeddings 实例

    Args:
        provider: 模型提供商
            - "bge": BGE 中文模型(本地运行,推荐)
            - "openai": OpenAI Embeddings
            - "deepseek": DeepSeek Embeddings
        model_name: 模型名称(可选)
        device: 设备,"cpu" 或 "cuda"

    Returns:
        LangChain Embeddings 实例
    """
    if provider == "bge":
        model = model_name or "BAAI/bge-base-zh-v1.5"
        return HuggingFaceEmbeddings(
            model_name=model,
            model_kwargs={"device": device},
            encode_kwargs={"normalize_embeddings": True},
        )

    elif provider == "openai":
        return OpenAIEmbeddings(
            model=model_name or "text-embedding-3-small",
            openai_api_key=os.getenv("OPENAI_API_KEY"),
        )

    elif provider == "deepseek":
        # DeepSeek 兼容 OpenAI 接口
        return OpenAIEmbeddings(
            model=model_name or "text-embedding-3-small",
            openai_api_key=os.getenv("DEEPSEEK_API_KEY"),
            openai_api_base="https://api.deepseek.com/v1",
        )

    else:
        raise ValueError(f"未知的 Embedding 提供商: {provider}")


class EmbeddingUtils:
    """Embedding 工具类"""

    @staticmethod
    def similarity(vec1: List[float], vec2: List[float]) -> float:
        """计算余弦相似度"""
        arr1 = np.array(vec1)
        arr2 = np.array(vec2)
        return float(np.dot(arr1, arr2) / (np.linalg.norm(arr1) * np.linalg.norm(arr2)))

    @staticmethod
    def batch_similarity(
        query_vec: List[float],
        doc_vecs: List[List[float]],
    ) -> List[float]:
        """批量计算相似度"""
        query = np.array(query_vec)
        docs = np.array(doc_vecs)
        # 向量已归一化,直接点积
        scores = np.dot(docs, query)
        return scores.tolist()


# === 使用示例 ===
if __name__ == "__main__":
    # 获取 BGE 嵌入模型
    embeddings = get_embeddings("bge")

    # 嵌入单个查询
    query_vec = embeddings.embed_query("什么是机器学习?")
    print(f"查询向量维度: {len(query_vec)}")

    # 批量嵌入文档
    docs = ["机器学习是AI的分支", "深度学习使用神经网络", "今天天气不错"]
    doc_vecs = embeddings.embed_documents(docs)
    print(f"文档向量数量: {len(doc_vecs)}, 维度: {len(doc_vecs[0])}")

    # 计算相似度
    score = EmbeddingUtils.similarity(
        embeddings.embed_query("机器学习"),
        embeddings.embed_query("深度学习"),
    )
    print(f"相似度: {score:.4f}")

5. 向量检索与混合搜索

5.1 LangChain Retriever 架构

LangChain Retriever

VectorStoreRetriever

MultiQueryRetriever

ContextualCompressionRetriever

EnsembleRetriever

向量相似度检索

多查询扩展检索

上下文压缩检索

多检索器融合

5.2 向量检索器实现

"""
rag/retriever.py - LangChain 检索器
"""
from langchain_community.vectorstores import Chroma
from langchain_community.retrievers import BM25Retriever
from langchain.retrievers import EnsembleRetriever
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from typing import List, Dict, Optional
import jieba


class VectorStoreRetrieverWrapper:
    """
    向量存储检索器封装

    基于 LangChain Chroma 实现
    """

    def __init__(
        self,
        embeddings: Embeddings,
        collection_name: str = "rag_docs",
        persist_dir: str = "./chroma_db",
    ):
        self.embeddings = embeddings
        self.collection_name = collection_name
        self.persist_dir = persist_dir
        self.vectorstore: Optional[Chroma] = None

    def add_documents(self, documents: List[Document]):
        """添加文档到向量库"""
        self.vectorstore = Chroma.from_documents(
            documents=documents,
            embedding=self.embeddings,
            collection_name=self.collection_name,
            persist_directory=self.persist_dir,
        )
        print(f"[VectorStore] 添加了 {len(documents)} 个文档块")

    def as_retriever(self, search_kwargs: Optional[Dict] = None):
        """获取 LangChain Retriever"""
        if self.vectorstore is None:
            raise RuntimeError("请先添加文档")
        return self.vectorstore.as_retriever(
            search_kwargs=search_kwargs or {"k": 5}
        )

    def similarity_search(self, query: str, k: int = 5) -> List[Document]:
        """相似度搜索"""
        if self.vectorstore is None:
            raise RuntimeError("请先添加文档")
        return self.vectorstore.similarity_search(query, k=k)

    def similarity_search_with_score(
        self,
        query: str,
        k: int = 5,
    ) -> List[tuple]:
        """带分数的相似度搜索"""
        if self.vectorstore is None:
            raise RuntimeError("请先添加文档")
        return self.vectorstore.similarity_search_with_score(query, k=k)


class HybridRetrieverWrapper:
    """
    混合检索器封装

    结合向量检索和 BM25 关键词检索
    使用 LangChain EnsembleRetriever 实现
    """

    def __init__(
        self,
        embeddings: Embeddings,
        alpha: float = 0.5,
    ):
        """
        Args:
            embeddings: LangChain Embeddings 实例
            alpha: 向量检索权重(1-alpha 为 BM25 权重)
        """
        self.embeddings = embeddings
        self.alpha = alpha
        self.documents: List[Document] = []
        self.vectorstore: Optional[Chroma] = None
        self.bm25_retriever = None
        self.ensemble_retriever = None

    def add_documents(self, documents: List[Document]):
        """添加文档"""
        self.documents = documents

        # 创建向量存储
        self.vectorstore = Chroma.from_documents(
            documents=documents,
            embedding=self.embeddings,
        )

        # 创建 BM25 检索器
        self.bm25_retriever = BM25Retriever.from_documents(
            documents,
            # 使用 jieba 分词
            preprocess_func=lambda text: " ".join(jieba.cut(text)),
        )

        # 创建混合检索器
        vector_retriever = self.vectorstore.as_retriever(search_kwargs={"k": 10})
        self.bm25_retriever.k = 10

        self.ensemble_retriever = EnsembleRetriever(
            retrievers=[vector_retriever, self.bm25_retriever],
            weights=[self.alpha, 1 - self.alpha],
        )

        print(f"[HybridRetriever] 添加了 {len(documents)} 个文档块")

    def retrieve(self, query: str, k: int = 5) -> List[Document]:
        """检索"""
        if self.ensemble_retriever is None:
            raise RuntimeError("请先添加文档")
        return self.ensemble_retriever.invoke(query)[:k]

    def retrieve_with_scores(self, query: str, k: int = 5) -> List[Dict]:
        """带分数的检索"""
        # 向量检索
        vector_results = self.vectorstore.similarity_search_with_score(query, k=k * 2)

        # 合并结果
        results = []
        seen = set()
        for doc, score in vector_results:
            doc_id = doc.page_content[:50]
            if doc_id not in seen:
                seen.add(doc_id)
                results.append({
                    "document": doc,
                    "score": float(score),
                })

        return results[:k]


# === 使用示例 ===
if __name__ == "__main__":
    from rag.embeddings import get_embeddings

    # 获取嵌入模型
    embeddings = get_embeddings("bge")

    # 准备测试文档
    docs = [
        Document(page_content="机器学习是人工智能的一个分支,让计算机从数据中学习"),
        Document(page_content="深度学习使用多层神经网络处理数据"),
        Document(page_content="Python是一种流行的编程语言,语法简洁"),
        Document(page_content="自然语言处理让计算机理解人类语言"),
    ]

    # 向量检索
    print("=== 向量检索 ===")
    vector_retriever = VectorStoreRetrieverWrapper(embeddings)
    vector_retriever.add_documents(docs)
    results = vector_retriever.similarity_search("什么是深度学习", k=3)
    for doc in results:
        print(f"  {doc.page_content[:50]}...")

    # 混合检索
    print("\n=== 混合检索 ===")
    hybrid_retriever = HybridRetrieverWrapper(embeddings, alpha=0.7)
    hybrid_retriever.add_documents(docs)
    results = hybrid_retriever.retrieve("神经网络", k=3)
    for doc in results:
        print(f"  {doc.page_content[:50]}...")

6. 重排序(Reranking)

6.1 为什么需要重排序

向量检索
Top 20

可能包含
不相关结果

Reranker
精排

Top 5
高质量结果✅

研究表明,重排序可以:

  • 提升检索准确率 10-30%
  • 减少 35% 的 LLM 幻觉

6.2 LangChain Reranker 集成

"""
rag/reranker.py - LangChain 重排序器
"""
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import FlashrankRerank
from langchain_core.documents import Document
from typing import List, Dict, Optional


class BGEReranker:
    """
    BGE Reranker 重排序器

    安装: pip install FlagEmbedding
    模型: BAAI/bge-reranker-v2-m3(多语言,支持中英文)
    """

    def __init__(
        self,
        model_name: str = "BAAI/bge-reranker-v2-m3",
        device: str = "cpu",
    ):
        from FlagEmbedding import FlagReranker

        self.reranker = FlagReranker(
            model_name,
            use_fp16=(device == "cuda"),
        )
        print(f"[Reranker] 模型: {model_name}")

    def rerank(
        self,
        query: str,
        documents: List[Document],
        top_k: int = 5,
    ) -> List[Dict]:
        """
        对检索结果进行重排序

        Args:
            query: 查询文本
            documents: 候选文档列表
            top_k: 返回数量

        Returns:
            重排序后的结果列表
        """
        if not documents:
            return []

        # 构建 query-doc 对
        pairs = [(query, doc.page_content) for doc in documents]

        # 批量评分
        scores = self.reranker.compute_score(pairs)

        # 确保是列表
        if not isinstance(scores, list):
            scores = [scores]

        # 排序
        scored = list(zip(documents, scores))
        scored.sort(key=lambda x: x[1], reverse=True)

        return [
            {
                "document": doc,
                "rerank_score": float(score),
            }
            for doc, score in scored[:top_k]
        ]


class FlashrankRerankerWrapper:
    """
    Flashrank Reranker 封装

    轻量级,适合快速部署
    安装: pip install flashrank
    """

    def __init__(self, model_name: str = "ms-marco-MiniLM-L-12-v2"):
        self.compressor = FlashrankRerank(model_name=model_name)
        print(f"[Flashrank] 模型: {model_name}")

    def create_compression_retriever(self, base_retriever):
        """
        创建压缩检索器

        将 Reranker 与检索器结合
        """
        return ContextualCompressionRetriever(
            base_compressor=self.compressor,
            base_retriever=base_retriever,
        )


# === 使用示例 ===
if __name__ == "__main__":
    # 模拟检索结果
    docs = [
        Document(page_content="机器学习是人工智能的核心技术之一"),
        Document(page_content="今天北京天气晴朗"),
        Document(page_content="深度学习是机器学习的子集,使用神经网络"),
        Document(page_content="Python由Guido van Rossum创建"),
        Document(page_content="强化学习通过奖励信号训练智能体"),
    ]

    # BGE Reranker
    reranker = BGEReranker()
    results = reranker.rerank("什么是深度学习?", docs, top_k=3)

    print("=== 重排序结果 ===")
    for r in results:
        print(f"  [{r['rerank_score']:.4f}] {r['document'].page_content[:50]}...")

7. LangChain RAG Chain 完整实现

7.1 使用 LCEL 构建 RAG Chain

"""
rag/rag_chain.py - LangChain RAG Chain
使用 LCEL 构建完整的 RAG 管道
"""
from langchain_openai import ChatOpenAI
from langchain_community.vectorstores import Chroma
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough, RunnableParallel
from langchain_core.documents import Document
from typing import List, Dict, Optional
import os


def create_rag_chain(
    vectorstore: Chroma,
    llm_model: str = "deepseek-chat",
    llm_base_url: str = "https://api.deepseek.com/v1",
    temperature: float = 0.3,
):
    """
    创建 RAG Chain

    使用 LCEL 语法构建

    Args:
        vectorstore: LangChain 向量存储
        llm_model: LLM 模型名称
        llm_base_url: LLM API 地址
        temperature: 温度参数

    Returns:
        RAG Chain
    """
    # 1. 创建 LLM
    llm = ChatOpenAI(
        model=llm_model,
        base_url=llm_base_url,
        api_key=os.getenv("DEEPSEEK_API_KEY"),
        temperature=temperature,
    )

    # 2. 创建检索器
    retriever = vectorstore.as_retriever(search_kwargs={"k": 5})

    # 3. 创建 Prompt
    prompt = ChatPromptTemplate.from_messages([
        ("system", """你是一个智能助手,基于提供的参考信息回答问题。

参考信息:
{context}

请遵循以下原则:
1. 只使用参考信息中的内容回答问题
2. 如果参考信息不足以回答问题,请明确说明
3. 回答要准确、简洁、有条理
4. 标注信息来源"""),
        ("human", "{question}"),
    ])

    # 4. 文档格式化函数
    def format_docs(docs: List[Document]) -> str:
        formatted = []
        for i, doc in enumerate(docs, 1):
            source = doc.metadata.get("source", "未知")
            page = doc.metadata.get("page", "")
            formatted.append(f"[文档{i}] 来源: {source}{page}页\n{doc.page_content}")
        return "\n\n---\n\n".join(formatted)

    # 5. 构建 RAG Chain (LCEL)
    rag_chain = (
        {"context": retriever | format_docs, "question": RunnablePassthrough()}
        | prompt
        | llm
        | StrOutputParser()
    )

    return rag_chain, retriever


def create_rag_chain_with_sources(
    vectorstore: Chroma,
    llm_model: str = "deepseek-chat",
    llm_base_url: str = "https://api.deepseek.com/v1",
    temperature: float = 0.3,
):
    """
    创建带来源追踪的 RAG Chain

    返回答案和来源文档
    """
    llm = ChatOpenAI(
        model=llm_model,
        base_url=llm_base_url,
        api_key=os.getenv("DEEPSEEK_API_KEY"),
        temperature=temperature,
    )

    retriever = vectorstore.as_retriever(search_kwargs={"k": 5})

    prompt = ChatPromptTemplate.from_messages([
        ("system", """基于参考信息回答问题。

参考信息:
{context}

回答时标注来源编号。"""),
        ("human", "{question}"),
    ])

    def format_docs(docs: List[Document]) -> str:
        return "\n\n".join(
            f"[{i}] {doc.page_content}"
            for i, doc in enumerate(docs, 1)
        )

    # 使用 RunnableParallel 同时返回答案和来源
    rag_chain_from_docs = (
        {"context": retriever | format_docs, "question": RunnablePassthrough()}
        | prompt
        | llm
        | StrOutputParser()
    )

    # 完整 Chain
    rag_chain_with_sources = RunnableParallel(
        answer=rag_chain_from_docs,
        sources=retriever,
    )

    return rag_chain_with_sources


# === 使用示例 ===
if __name__ == "__main__":
    # 创建嵌入模型
    embeddings = HuggingFaceEmbeddings(
        model_name="BAAI/bge-base-zh-v1.5",
        model_kwargs={"device": "cpu"},
    )

    # 创建测试文档
    docs = [
        Document(
            page_content="公司退货政策:购买后30天内可无理由退货,需保持商品完好。退货运费由买家承担。",
            metadata={"source": "policy.pdf", "page": 1},
        ),
        Document(
            page_content="产品保修政策:所有电子产品享受一年官方保修。保修期内非人为损坏免费维修。",
            metadata={"source": "policy.pdf", "page": 2},
        ),
        Document(
            page_content="配送说明:默认顺丰快递。一线城市次日达,满99元包邮。",
            metadata={"source": "policy.pdf", "page": 3},
        ),
    ]

    # 创建向量存储
    vectorstore = Chroma.from_documents(
        documents=docs,
        embedding=embeddings,
    )

    # 创建 RAG Chain
    rag_chain, retriever = create_rag_chain(vectorstore)

    # 查询
    question = "退货政策是什么?"
    answer = rag_chain.invoke(question)
    print(f"问题: {question}")
    print(f"答案: {answer}")

    # 带来源的查询
    print("\n=== 带来源追踪 ===")
    rag_chain_sources = create_rag_chain_with_sources(vectorstore)
    result = rag_chain_sources.invoke("保修期多久?")
    print(f"答案: {result['answer']}")
    print(f"来源文档数: {len(result['sources'])}")

7.2 完整 RAG 管道类

"""
rag/rag_pipeline.py - 完整 RAG 管道
"""
from langchain_openai import ChatOpenAI
from langchain_community.vectorstores import Chroma
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_core.documents import Document
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from dataclasses import dataclass
from typing import List, Dict, Optional
import os
import time


@dataclass
class RAGConfig:
    """RAG 配置"""
    # 嵌入模型
    embedding_model: str = "BAAI/bge-base-zh-v1.5"

    # 分块
    chunk_size: int = 800
    chunk_overlap: int = 200

    # 检索
    retrieval_k: int = 5

    # 重排序
    enable_reranker: bool = False

    # LLM
    llm_model: str = "deepseek-chat"
    llm_base_url: str = "https://api.deepseek.com/v1"
    llm_temperature: float = 0.3

    # 向量库
    chroma_dir: str = "./rag_chroma_db"
    collection_name: str = "rag_docs"


@dataclass
class RAGResult:
    """RAG 查询结果"""
    answer: str
    sources: List[Dict]
    latency_ms: float = 0


class RAGPipeline:
    """
    完整 RAG 管道

    整合文档加载、分块、嵌入、检索、生成
    """

    def __init__(self, config: Optional[RAGConfig] = None):
        self.config = config or RAGConfig()

        # 延迟初始化
        self._embeddings = None
        self._vectorstore = None
        self._llm = None
        self._reranker = None

    @property
    def embeddings(self):
        if self._embeddings is None:
            self._embeddings = HuggingFaceEmbeddings(
                model_name=self.config.embedding_model,
                model_kwargs={"device": "cpu"},
                encode_kwargs={"normalize_embeddings": True},
            )
        return self._embeddings

    @property
    def llm(self):
        if self._llm is None:
            self._llm = ChatOpenAI(
                model=self.config.llm_model,
                base_url=self.config.llm_base_url,
                api_key=os.getenv("DEEPSEEK_API_KEY"),
                temperature=self.config.llm_temperature,
            )
        return self._llm

    @property
    def reranker(self):
        if self._reranker is None and self.config.enable_reranker:
            from rag.reranker import BGEReranker
            self._reranker = BGEReranker()
        return self._reranker

    def index_documents(self, documents: List[Document]):
        """
        索引文档

        Args:
            documents: LangChain Document 列表
        """
        print(f"[RAG] 索引 {len(documents)} 个文档块...")

        self._vectorstore = Chroma.from_documents(
            documents=documents,
            embedding=self.embeddings,
            collection_name=self.config.collection_name,
            persist_directory=self.config.chroma_dir,
        )

        print(f"[RAG] 索引完成")

    def index_from_files(self, file_paths: List[str]):
        """
        从文件索引

        Args:
            file_paths: 文件路径列表
        """
        from rag.document_loader import UnifiedDocumentLoader
        from rag.text_splitter import DocumentChunker

        # 加载文档
        loader = UnifiedDocumentLoader()
        all_docs = []
        for path in file_paths:
            try:
                docs = loader.load_file(path)
                all_docs.extend(docs)
                print(f"✅ {path}: {len(docs)} 页/段")
            except Exception as e:
                print(f"❌ {path}: {e}")

        # 分块
        chunker = DocumentChunker(
            chunk_size=self.config.chunk_size,
            chunk_overlap=self.config.chunk_overlap,
        )
        chunks = chunker.split_documents(all_docs)

        # 索引
        self.index_documents(chunks)

    def query(self, question: str) -> RAGResult:
        """
        查询

        Args:
            question: 用户问题

        Returns:
            RAGResult
        """
        start_time = time.time()

        if self._vectorstore is None:
            return RAGResult(
                answer="知识库尚未建立,请先索引文档。",
                sources=[],
            )

        # 检索
        retriever = self._vectorstore.as_retriever(
            search_kwargs={"k": self.config.retrieval_k}
        )
        docs = retriever.invoke(question)

        # 重排序
        if self.config.enable_reranker and self.reranker:
            reranked = self.reranker.rerank(question, docs, top_k=3)
            docs = [r["document"] for r in reranked]

        # 构建上下文
        context = "\n\n---\n\n".join(
            f"[来源: {d.metadata.get('source', '未知')}]\n{d.page_content}"
            for d in docs
        )

        # 生成答案
        prompt = ChatPromptTemplate.from_messages([
            ("system", """基于参考信息回答问题。

参考信息:
{context}

要求:
1. 只使用参考信息回答
2. 信息不足时明确说明
3. 标注来源"""),
            ("human", "{question}"),
        ])

        chain = prompt | self.llm | StrOutputParser()
        answer = chain.invoke({"context": context, "question": question})

        # 构建来源
        sources = [
            {
                "content": d.page_content[:200],
                "source": d.metadata.get("source", "未知"),
                "page": d.metadata.get("page", ""),
            }
            for d in docs
        ]

        latency = (time.time() - start_time) * 1000

        return RAGResult(
            answer=answer,
            sources=sources,
            latency_ms=round(latency, 2),
        )


# === 使用示例 ===
if __name__ == "__main__":
    # 配置
    config = RAGConfig(
        embedding_model="BAAI/bge-base-zh-v1.5",
        chunk_size=800,
        chunk_overlap=200,
        enable_reranker=False,
    )

    pipeline = RAGPipeline(config)

    # 创建测试文档
    docs = [
        Document(
            page_content="公司退货政策:购买后30天内可无理由退货,需保持商品完好。",
            metadata={"source": "policy.pdf", "page": 1},
        ),
        Document(
            page_content="产品保修政策:所有电子产品享受一年官方保修。",
            metadata={"source": "policy.pdf", "page": 2},
        ),
    ]

    # 索引
    pipeline.index_documents(docs)

    # 查询
    result = pipeline.query("退货政策是什么?")
    print(f"答案: {result.answer}")
    print(f"来源: {result.sources}")
    print(f"耗时: {result.latency_ms}ms")

7.3 RAG 流程图

LLM VectorStore RAG Pipeline 用户 LLM VectorStore RAG Pipeline 用户 提交问题 检索相关文档 返回 Top-K 文档 格式化上下文 发送问题+上下文 生成答案 返回答案+来源

8. Agentic RAG:让 RAG 更智能

8.1 传统 RAG vs Agentic RAG

AgenticRAG

简单问题

知识库问题

实时信息

相关

不相关

查询

智能路由

直接回答

RAG检索

网络搜索

评估结果

生成答案

改写查询

传统RAG

所有查询

相同管道

固定流程

维度 传统 RAG Agentic RAG
查询处理 所有查询走相同管道 智能路由,按需选择策略
检索失败 返回低质量结果 自我纠错,改写查询重试
知识范围 仅限本地知识库 可扩展到网络搜索
多步推理 不支持 支持多步检索和推理

8.2 使用 LangChain 实现 Agentic RAG

"""
rag/agentic_rag.py - Agentic RAG
使用 LangChain Agent 实现智能检索
"""
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
from langchain_core.prompts import ChatPromptTemplate
from langchain.agents import create_tool_calling_agent, AgentExecutor
from langchain_community.vectorstores import Chroma
from langchain_community.embeddings import HuggingFaceEmbeddings
from typing import Optional, List
import os


class AgenticRAG:
    """
    Agentic RAG

    特性:
    - 智能路由:判断查询该走 RAG 还是直接回答
    - 自我纠错:检索结果不相关时自动改写查询重试
    - 工具调用:可扩展到网络搜索等
    """

    def __init__(
        self,
        vectorstore: Chroma,
        llm_model: str = "deepseek-chat",
        llm_base_url: str = "https://api.deepseek.com/v1",
    ):
        self.vectorstore = vectorstore

        # 创建 LLM
        self.llm = ChatOpenAI(
            model=llm_model,
            base_url=llm_base_url,
            api_key=os.getenv("DEEPSEEK_API_KEY"),
            temperature=0,
        )

        # 创建工具
        self.tools = self._create_tools()

        # 创建 Agent
        self.agent_executor = self._create_agent()

    def _create_tools(self):
        """创建工具"""

        @tool
        def search_knowledge_base(query: str) -> str:
            """
            搜索知识库,查找与问题相关的信息。

            当问题涉及公司政策、产品信息、技术文档等内容时使用此工具。

            Args:
                query: 搜索查询

            Returns:
                相关文档内容
            """
            retriever = self.vectorstore.as_retriever(search_kwargs={"k": 3})
            docs = retriever.invoke(query)

            if not docs:
                return "未找到相关信息"

            return "\n\n---\n\n".join(
                f"[来源: {d.metadata.get('source', '未知')}]\n{d.page_content}"
                for d in docs
            )

        @tool
        def direct_answer(question: str) -> str:
            """
            直接回答简单问题。

            当问题是常识性问题、问候、或不需要检索知识库时使用此工具。

            Args:
                question: 用户问题

            Returns:
                直接回答
            """
            # 简单问题直接让 LLM 回答
            return f"这是一个可以直接回答的问题: {question}"

        return [search_knowledge_base, direct_answer]

    def _create_agent(self):
        """创建 Agent"""
        prompt = ChatPromptTemplate.from_messages([
            ("system", """你是一个智能助手,可以使用工具来回答问题。

工具使用指南:
1. search_knowledge_base: 当问题涉及公司政策、产品信息、技术文档时使用
2. direct_answer: 当问题是常识性问题、问候、或不需要检索时使用

请根据问题类型选择合适的工具。"""),
            ("human", "{input}"),
            ("placeholder", "{agent_scratchpad}"),
        ])

        agent = create_tool_calling_agent(self.llm, self.tools, prompt)

        return AgentExecutor(
            agent=agent,
            tools=self.tools,
            verbose=True,
            handle_parsing_errors=True,
        )

    def query(self, question: str) -> str:
        """查询"""
        result = self.agent_executor.invoke({"input": question})
        return result["output"]


# === 使用示例 ===
if __name__ == "__main__":
    # 创建嵌入模型
    embeddings = HuggingFaceEmbeddings(
        model_name="BAAI/bge-base-zh-v1.5",
        model_kwargs={"device": "cpu"},
    )

    # 创建测试文档
    from langchain_core.documents import Document

    docs = [
        Document(
            page_content="公司退货政策:购买后30天内可无理由退货。",
            metadata={"source": "policy.pdf"},
        ),
        Document(
            page_content="产品保修政策:所有电子产品享受一年官方保修。",
            metadata={"source": "policy.pdf"},
        ),
    ]

    # 创建向量存储
    vectorstore = Chroma.from_documents(
        documents=docs,
        embedding=embeddings,
    )

    # 创建 Agentic RAG
    rag = AgenticRAG(vectorstore)

    # 测试
    print("=== 测试知识库问题 ===")
    answer = rag.query("退货政策是什么?")
    print(f"答案: {answer}")

    print("\n=== 测试简单问题 ===")
    answer = rag.query("你好,今天天气怎么样?")
    print(f"答案: {answer}")

9. 小结与下一步

9.1 本篇回顾

知识点 掌握程度
RAG 的原理和完整流程
LangChain Document Loaders(PyPDFLoader, Docx2txtLoader)
LangChain Text Splitters(RecursiveCharacterTextSplitter)
LangChain Embeddings(HuggingFaceEmbeddings)
LangChain VectorStore(Chroma)
LangChain Retriever(VectorStoreRetriever, EnsembleRetriever)
使用 LCEL 构建 RAG Chain
Agentic RAG(智能路由 + 工具调用)

9.2 LangChain RAG 组件总览

LangChain RAG 组件

Document Loaders

Text Splitters

Embeddings

VectorStore

Retrievers

RAG Chains

PyPDFLoader

Docx2txtLoader

DirectoryLoader

RecursiveCharacterTextSplitter

MarkdownHeaderTextSplitter

HuggingFaceEmbeddings

OpenAIEmbeddings

Chroma

Milvus

FAISS

VectorStoreRetriever

EnsembleRetriever

MultiQueryRetriever

LCEL RAG Chain

create_retrieval_chain

9.3 生产级 RAG 决策树

PDF/Word

Markdown

混合

中文

英文

多语言

开发/小规模

生产/大规模

开始 RAG 项目

文档类型?

PyPDFLoader/Docx2txtLoader

TextLoader + MarkdownHeaderTextSplitter

DirectoryLoader

RecursiveCharacterTextSplitter
chunk_size=800, overlap=200

嵌入模型选择

BAAI/bge-base-zh-v1.5

text-embedding-3-small

BAAI/bge-m3

向量库选择

Chroma

Milvus

是否需要混合搜索?

EnsembleRetriever
向量 + BM25

VectorStoreRetriever

是否需要重排序?

BGE Reranker

直接使用

LCEL RAG Chain

9.4 下一篇预告

第五篇:综合案例(上) 将实战构建一个完整的智能客服系统:

  • 需求分析与架构设计
  • 使用 LangChain Chain 构建多 Agent 系统
  • 集成 RAG 知识库
  • 对话管理与人机协作

系列导航 | 第一篇:概述与环境搭建 | 第二篇:Prompt 工程与工具调用 | 第三篇:记忆与数据库集成 | 第四篇:RAG 检索增强生成 | 第五篇:综合案例(上) | 第六篇:综合案例(下)

Logo

这里是“一人公司”的成长家园。我们提供从产品曝光、技术变现到法律财税的全栈内容,并连接云服务、办公空间等稀缺资源,助你专注创造,无忧运营。

更多推荐