AI Agent 开发实战教程(四):RAG 检索增强生成
·
目录
- 1. RAG 概述:为什么需要检索增强
- 2. LangChain 文档加载器
- 3. 文本分块策略
- 4. 嵌入模型:让文本变成向量
- 5. 向量检索与混合搜索
- 6. 重排序(Reranking)
- 7. LangChain RAG Chain 完整实现
- 8. Agentic RAG:让 RAG 更智能
- 9. 小结与下一步
1. RAG 概述:为什么需要检索增强
1.1 LLM 的两大局限
| 局限 | 说明 | 例子 |
|---|---|---|
| 知识过时 | 训练数据有截止日期 | 问 2026 年的新闻,LLM 不知道 |
| 缺乏私有知识 | 无法访问企业内部数据 | 问"公司退货政策",LLM 不知道 |
1.2 RAG 的解决思路
1.3 RAG 的完整流程
1.4 安装依赖
# LangChain 核心组件
pip install langchain langchain-core langchain-openai langchain-community langchain-text-splitters
# 文档解析
pip install pymupdf # PDF 解析(轻量快速)
pip install python-docx # Word 文档
pip install beautifulsoup4 # HTML 解析
# 嵌入模型
pip install sentence-transformers # 通用嵌入模型库
# 向量数据库
pip install chromadb # 轻量级向量库
# 搜索增强
pip install rank_bm25 jieba # BM25 关键词搜索 + 中文分词
# 可选:重排序
pip install FlagEmbedding # BGE Reranker
2. LangChain 文档加载器
2.1 LangChain Document Loaders 概览
2.2 统一文档加载器实现
"""
rag/document_loader.py - LangChain 文档加载器
"""
from langchain_community.document_loaders import (
PyPDFLoader,
Docx2txtLoader,
UnstructuredHTMLLoader,
TextLoader,
DirectoryLoader,
)
from langchain_core.documents import Document
from typing import List, Optional
from pathlib import Path
import hashlib
class UnifiedDocumentLoader:
"""
统一文档加载器
基于 LangChain Document Loaders 实现
支持 PDF、Word、HTML、Markdown、TXT 格式
"""
def __init__(self, encoding: str = "utf-8"):
self.encoding = encoding
def load_file(self, file_path: str) -> List[Document]:
"""
加载单个文件
Args:
file_path: 文件路径
Returns:
LangChain Document 列表
"""
path = Path(file_path)
if not path.exists():
raise FileNotFoundError(f"文件不存在: {file_path}")
suffix = path.suffix.lower()
# 根据文件类型选择加载器
if suffix == ".pdf":
loader = PyPDFLoader(file_path)
elif suffix in [".docx", ".doc"]:
loader = Docx2txtLoader(file_path)
elif suffix in [".html", ".htm"]:
loader = UnstructuredHTMLLoader(file_path)
elif suffix in [".md", ".txt"]:
loader = TextLoader(file_path, encoding=self.encoding)
else:
raise ValueError(f"不支持的文件格式: {suffix}")
# 加载文档
documents = loader.load()
# 添加文件哈希和来源信息
file_hash = self._compute_hash(file_path)
for doc in documents:
doc.metadata["source"] = str(path)
doc.metadata["file_type"] = suffix
doc.metadata["file_hash"] = file_hash
return documents
def load_directory(
self,
directory_path: str,
glob_pattern: str = "**/*.pdf",
) -> List[Document]:
"""
批量加载目录中的文件
Args:
directory_path: 目录路径
glob_pattern: 文件匹配模式
Returns:
Document 列表
"""
loader = DirectoryLoader(
directory_path,
glob=glob_pattern,
show_progress=True,
)
return loader.load()
def load_multiple(self, file_paths: List[str]) -> List[Document]:
"""
加载多个文件
Args:
file_paths: 文件路径列表
Returns:
Document 列表
"""
all_documents = []
for path in file_paths:
try:
docs = self.load_file(path)
all_documents.extend(docs)
print(f"✅ {path}: {len(docs)} 页/段")
except Exception as e:
print(f"❌ {path}: {e}")
return all_documents
@staticmethod
def _compute_hash(file_path: str) -> str:
"""计算文件 SHA256 哈希"""
sha256 = hashlib.sha256()
with open(file_path, "rb") as f:
for chunk in iter(lambda: f.read(8192), b""):
sha256.update(chunk)
return sha256.hexdigest()
# === 使用示例 ===
if __name__ == "__main__":
loader = UnifiedDocumentLoader()
# 加载单个文件
# pdf_docs = loader.load_file("example.pdf")
# print(f"PDF: {len(pdf_docs)} 页")
# 加载多个文件
# all_docs = loader.load_multiple(["doc1.pdf", "doc2.docx", "doc3.md"])
# 批量加载目录
# dir_docs = loader.load_directory("./knowledge_base", glob_pattern="**/*.pdf")
3. 文本分块策略
3.1 为什么需要分块
3.2 LangChain Text Splitters 对比
| 策略 | LangChain 类 | 原理 | 适合场景 | 推荐度 |
|---|---|---|---|---|
| 固定大小 | CharacterTextSplitter |
按字符数切分 | 结构均匀的文档 | ⭐⭐ |
| 递归分块 | RecursiveCharacterTextSplitter |
按分隔符层次切分 | 通用场景(生产默认) | ⭐⭐⭐⭐⭐ |
| Markdown 分块 | MarkdownHeaderTextSplitter |
按标题层级切分 | Markdown / 技术文档 | ⭐⭐⭐⭐ |
| 代码分块 | LanguageTextSplitter |
按语法结构切分 | 代码文档 | ⭐⭐⭐⭐ |
| 语义分块 | SemanticChunker |
按语义相似度切分 | 主题频繁切换 | ⭐⭐⭐ |
3.3 LangChain 分块器实现
"""
rag/text_splitter.py - LangChain 文本分割器
"""
from langchain_text_splitters import (
RecursiveCharacterTextSplitter,
MarkdownHeaderTextSplitter,
CharacterTextSplitter,
)
from langchain_core.documents import Document
from typing import List, Optional
class DocumentChunker:
"""
文档分块器
基于 LangChain Text Splitters 实现
"""
def __init__(
self,
chunk_size: int = 800,
chunk_overlap: int = 200,
separators: Optional[List[str]] = None,
):
"""
Args:
chunk_size: 每个块的最大字符数
chunk_overlap: 相邻块之间的重叠字符数
separators: 分隔符列表(优先级从高到低)
"""
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
# 默认分隔符(适合中文)
self.separators = separators or ["\n\n", "\n", "。", ".", " ", ""]
# 创建递归分块器
self.recursive_splitter = RecursiveCharacterTextSplitter(
separators=self.separators,
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
length_function=len,
add_start_index=True,
)
def split_documents(
self,
documents: List[Document],
chunk_strategy: str = "recursive",
) -> List[Document]:
"""
对文档列表进行分块
Args:
documents: LangChain Document 列表
chunk_strategy: 分块策略
- "recursive": 递归分块(默认)
- "markdown": Markdown 分块
- "fixed": 固定大小分块
Returns:
分块后的 Document 列表
"""
all_chunks = []
for doc in documents:
if chunk_strategy == "markdown" and doc.metadata.get("file_type") == ".md":
chunks = self._split_markdown(doc)
else:
chunks = self.recursive_splitter.split_documents([doc])
# 添加块索引
for i, chunk in enumerate(chunks):
chunk.metadata["chunk_index"] = i
all_chunks.extend(chunks)
return all_chunks
def _split_markdown(self, document: Document) -> List[Document]:
"""Markdown 分块(按标题层级)"""
# 按标题分割
headers_to_split_on = [
("#", "header_1"),
("##", "header_2"),
("###", "header_3"),
]
md_splitter = MarkdownHeaderTextSplitter(
headers_to_split_on=headers_to_split_on,
strip_headers=False,
)
md_chunks = md_splitter.split_text(document.page_content)
# 如果块仍然太大,再用递归分块
final_chunks = []
for chunk in md_chunks:
if len(chunk.page_content) > self.chunk_size:
sub_chunks = self.recursive_splitter.split_documents([chunk])
final_chunks.extend(sub_chunks)
else:
final_chunks.append(chunk)
return final_chunks
def split_text(self, text: str) -> List[str]:
"""
对纯文本进行分块
Args:
text: 文本内容
Returns:
分块后的文本列表
"""
return self.recursive_splitter.split_text(text)
# === 使用示例 ===
if __name__ == "__main__":
from rag.document_loader import UnifiedDocumentLoader
# 加载文档
loader = UnifiedDocumentLoader()
# docs = loader.load_file("example.pdf")
# 分块
chunker = DocumentChunker(chunk_size=800, chunk_overlap=200)
# chunks = chunker.split_documents(docs)
# print(f"原始文档: {len(docs)} 页/段")
# print(f"分块结果: {len(chunks)} 块")
# 测试纯文本分块
text = """
人工智能是计算机科学的一个分支,它企图了解智能的实质,
并生产出一种新的能以人类智能相似的方式做出反应的智能机器。
机器学习是人工智能的核心,是使计算机具有智能的根本途径。
机器学习专门研究计算机怎样模拟或实现人类的学习行为,
以获取新的知识或技能,重新组织已有的知识结构使之不断改善自身的性能。
"""
chunks = chunker.split_text(text)
print(f"分块数量: {len(chunks)}")
for i, chunk in enumerate(chunks):
print(f"\n--- 块 {i+1} ({len(chunk)} 字符) ---")
print(chunk[:100] + "...")
3.4 分块参数选择指南
| 场景 | chunk_size | chunk_overlap | 说明 |
|---|---|---|---|
| 事实性问答 | 256-512 | 50-100 | 精准匹配,减少噪音 |
| 通用场景 | 500-1000 | 100-200 | 推荐默认值 |
| 分析性问答 | 1000-2000 | 200-400 | 需要更多上下文 |
| 代码文档 | 500-1500 | 50-100 | 保持代码完整性 |
| 法律/合同 | 1500-3000 | 200 | 保持条款完整 |
💡 经验法则:overlap 设为 chunk_size 的 10%-20%,确保相邻块之间有足够的上下文衔接。
4. 嵌入模型:让文本变成向量
4.1 中文嵌入模型选择
| 模型 | 维度 | 特点 | 推荐场景 |
|---|---|---|---|
BAAI/bge-small-zh-v1.5 |
512 | 轻量快速 | 开发测试、资源受限 |
BAAI/bge-base-zh-v1.5 |
768 | 平衡性能 | 通用推荐 |
BAAI/bge-large-zh-v1.5 |
1024 | 中文效果最佳 | 追求最高质量 |
BAAI/bge-m3 |
1024 | 多语言+多功能 | 中英混合场景 |
4.2 LangChain Embeddings 封装
"""
rag/embeddings.py - LangChain Embedding 模型
"""
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_openai import OpenAIEmbeddings
from langchain_core.embeddings import Embeddings
from typing import List, Optional
import os
import numpy as np
def get_embeddings(
provider: str = "bge",
model_name: Optional[str] = None,
device: str = "cpu",
) -> Embeddings:
"""
获取 LangChain Embeddings 实例
Args:
provider: 模型提供商
- "bge": BGE 中文模型(本地运行,推荐)
- "openai": OpenAI Embeddings
- "deepseek": DeepSeek Embeddings
model_name: 模型名称(可选)
device: 设备,"cpu" 或 "cuda"
Returns:
LangChain Embeddings 实例
"""
if provider == "bge":
model = model_name or "BAAI/bge-base-zh-v1.5"
return HuggingFaceEmbeddings(
model_name=model,
model_kwargs={"device": device},
encode_kwargs={"normalize_embeddings": True},
)
elif provider == "openai":
return OpenAIEmbeddings(
model=model_name or "text-embedding-3-small",
openai_api_key=os.getenv("OPENAI_API_KEY"),
)
elif provider == "deepseek":
# DeepSeek 兼容 OpenAI 接口
return OpenAIEmbeddings(
model=model_name or "text-embedding-3-small",
openai_api_key=os.getenv("DEEPSEEK_API_KEY"),
openai_api_base="https://api.deepseek.com/v1",
)
else:
raise ValueError(f"未知的 Embedding 提供商: {provider}")
class EmbeddingUtils:
"""Embedding 工具类"""
@staticmethod
def similarity(vec1: List[float], vec2: List[float]) -> float:
"""计算余弦相似度"""
arr1 = np.array(vec1)
arr2 = np.array(vec2)
return float(np.dot(arr1, arr2) / (np.linalg.norm(arr1) * np.linalg.norm(arr2)))
@staticmethod
def batch_similarity(
query_vec: List[float],
doc_vecs: List[List[float]],
) -> List[float]:
"""批量计算相似度"""
query = np.array(query_vec)
docs = np.array(doc_vecs)
# 向量已归一化,直接点积
scores = np.dot(docs, query)
return scores.tolist()
# === 使用示例 ===
if __name__ == "__main__":
# 获取 BGE 嵌入模型
embeddings = get_embeddings("bge")
# 嵌入单个查询
query_vec = embeddings.embed_query("什么是机器学习?")
print(f"查询向量维度: {len(query_vec)}")
# 批量嵌入文档
docs = ["机器学习是AI的分支", "深度学习使用神经网络", "今天天气不错"]
doc_vecs = embeddings.embed_documents(docs)
print(f"文档向量数量: {len(doc_vecs)}, 维度: {len(doc_vecs[0])}")
# 计算相似度
score = EmbeddingUtils.similarity(
embeddings.embed_query("机器学习"),
embeddings.embed_query("深度学习"),
)
print(f"相似度: {score:.4f}")
5. 向量检索与混合搜索
5.1 LangChain Retriever 架构
5.2 向量检索器实现
"""
rag/retriever.py - LangChain 检索器
"""
from langchain_community.vectorstores import Chroma
from langchain_community.retrievers import BM25Retriever
from langchain.retrievers import EnsembleRetriever
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from typing import List, Dict, Optional
import jieba
class VectorStoreRetrieverWrapper:
"""
向量存储检索器封装
基于 LangChain Chroma 实现
"""
def __init__(
self,
embeddings: Embeddings,
collection_name: str = "rag_docs",
persist_dir: str = "./chroma_db",
):
self.embeddings = embeddings
self.collection_name = collection_name
self.persist_dir = persist_dir
self.vectorstore: Optional[Chroma] = None
def add_documents(self, documents: List[Document]):
"""添加文档到向量库"""
self.vectorstore = Chroma.from_documents(
documents=documents,
embedding=self.embeddings,
collection_name=self.collection_name,
persist_directory=self.persist_dir,
)
print(f"[VectorStore] 添加了 {len(documents)} 个文档块")
def as_retriever(self, search_kwargs: Optional[Dict] = None):
"""获取 LangChain Retriever"""
if self.vectorstore is None:
raise RuntimeError("请先添加文档")
return self.vectorstore.as_retriever(
search_kwargs=search_kwargs or {"k": 5}
)
def similarity_search(self, query: str, k: int = 5) -> List[Document]:
"""相似度搜索"""
if self.vectorstore is None:
raise RuntimeError("请先添加文档")
return self.vectorstore.similarity_search(query, k=k)
def similarity_search_with_score(
self,
query: str,
k: int = 5,
) -> List[tuple]:
"""带分数的相似度搜索"""
if self.vectorstore is None:
raise RuntimeError("请先添加文档")
return self.vectorstore.similarity_search_with_score(query, k=k)
class HybridRetrieverWrapper:
"""
混合检索器封装
结合向量检索和 BM25 关键词检索
使用 LangChain EnsembleRetriever 实现
"""
def __init__(
self,
embeddings: Embeddings,
alpha: float = 0.5,
):
"""
Args:
embeddings: LangChain Embeddings 实例
alpha: 向量检索权重(1-alpha 为 BM25 权重)
"""
self.embeddings = embeddings
self.alpha = alpha
self.documents: List[Document] = []
self.vectorstore: Optional[Chroma] = None
self.bm25_retriever = None
self.ensemble_retriever = None
def add_documents(self, documents: List[Document]):
"""添加文档"""
self.documents = documents
# 创建向量存储
self.vectorstore = Chroma.from_documents(
documents=documents,
embedding=self.embeddings,
)
# 创建 BM25 检索器
self.bm25_retriever = BM25Retriever.from_documents(
documents,
# 使用 jieba 分词
preprocess_func=lambda text: " ".join(jieba.cut(text)),
)
# 创建混合检索器
vector_retriever = self.vectorstore.as_retriever(search_kwargs={"k": 10})
self.bm25_retriever.k = 10
self.ensemble_retriever = EnsembleRetriever(
retrievers=[vector_retriever, self.bm25_retriever],
weights=[self.alpha, 1 - self.alpha],
)
print(f"[HybridRetriever] 添加了 {len(documents)} 个文档块")
def retrieve(self, query: str, k: int = 5) -> List[Document]:
"""检索"""
if self.ensemble_retriever is None:
raise RuntimeError("请先添加文档")
return self.ensemble_retriever.invoke(query)[:k]
def retrieve_with_scores(self, query: str, k: int = 5) -> List[Dict]:
"""带分数的检索"""
# 向量检索
vector_results = self.vectorstore.similarity_search_with_score(query, k=k * 2)
# 合并结果
results = []
seen = set()
for doc, score in vector_results:
doc_id = doc.page_content[:50]
if doc_id not in seen:
seen.add(doc_id)
results.append({
"document": doc,
"score": float(score),
})
return results[:k]
# === 使用示例 ===
if __name__ == "__main__":
from rag.embeddings import get_embeddings
# 获取嵌入模型
embeddings = get_embeddings("bge")
# 准备测试文档
docs = [
Document(page_content="机器学习是人工智能的一个分支,让计算机从数据中学习"),
Document(page_content="深度学习使用多层神经网络处理数据"),
Document(page_content="Python是一种流行的编程语言,语法简洁"),
Document(page_content="自然语言处理让计算机理解人类语言"),
]
# 向量检索
print("=== 向量检索 ===")
vector_retriever = VectorStoreRetrieverWrapper(embeddings)
vector_retriever.add_documents(docs)
results = vector_retriever.similarity_search("什么是深度学习", k=3)
for doc in results:
print(f" {doc.page_content[:50]}...")
# 混合检索
print("\n=== 混合检索 ===")
hybrid_retriever = HybridRetrieverWrapper(embeddings, alpha=0.7)
hybrid_retriever.add_documents(docs)
results = hybrid_retriever.retrieve("神经网络", k=3)
for doc in results:
print(f" {doc.page_content[:50]}...")
6. 重排序(Reranking)
6.1 为什么需要重排序
研究表明,重排序可以:
- 提升检索准确率 10-30%
- 减少 35% 的 LLM 幻觉
6.2 LangChain Reranker 集成
"""
rag/reranker.py - LangChain 重排序器
"""
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import FlashrankRerank
from langchain_core.documents import Document
from typing import List, Dict, Optional
class BGEReranker:
"""
BGE Reranker 重排序器
安装: pip install FlagEmbedding
模型: BAAI/bge-reranker-v2-m3(多语言,支持中英文)
"""
def __init__(
self,
model_name: str = "BAAI/bge-reranker-v2-m3",
device: str = "cpu",
):
from FlagEmbedding import FlagReranker
self.reranker = FlagReranker(
model_name,
use_fp16=(device == "cuda"),
)
print(f"[Reranker] 模型: {model_name}")
def rerank(
self,
query: str,
documents: List[Document],
top_k: int = 5,
) -> List[Dict]:
"""
对检索结果进行重排序
Args:
query: 查询文本
documents: 候选文档列表
top_k: 返回数量
Returns:
重排序后的结果列表
"""
if not documents:
return []
# 构建 query-doc 对
pairs = [(query, doc.page_content) for doc in documents]
# 批量评分
scores = self.reranker.compute_score(pairs)
# 确保是列表
if not isinstance(scores, list):
scores = [scores]
# 排序
scored = list(zip(documents, scores))
scored.sort(key=lambda x: x[1], reverse=True)
return [
{
"document": doc,
"rerank_score": float(score),
}
for doc, score in scored[:top_k]
]
class FlashrankRerankerWrapper:
"""
Flashrank Reranker 封装
轻量级,适合快速部署
安装: pip install flashrank
"""
def __init__(self, model_name: str = "ms-marco-MiniLM-L-12-v2"):
self.compressor = FlashrankRerank(model_name=model_name)
print(f"[Flashrank] 模型: {model_name}")
def create_compression_retriever(self, base_retriever):
"""
创建压缩检索器
将 Reranker 与检索器结合
"""
return ContextualCompressionRetriever(
base_compressor=self.compressor,
base_retriever=base_retriever,
)
# === 使用示例 ===
if __name__ == "__main__":
# 模拟检索结果
docs = [
Document(page_content="机器学习是人工智能的核心技术之一"),
Document(page_content="今天北京天气晴朗"),
Document(page_content="深度学习是机器学习的子集,使用神经网络"),
Document(page_content="Python由Guido van Rossum创建"),
Document(page_content="强化学习通过奖励信号训练智能体"),
]
# BGE Reranker
reranker = BGEReranker()
results = reranker.rerank("什么是深度学习?", docs, top_k=3)
print("=== 重排序结果 ===")
for r in results:
print(f" [{r['rerank_score']:.4f}] {r['document'].page_content[:50]}...")
7. LangChain RAG Chain 完整实现
7.1 使用 LCEL 构建 RAG Chain
"""
rag/rag_chain.py - LangChain RAG Chain
使用 LCEL 构建完整的 RAG 管道
"""
from langchain_openai import ChatOpenAI
from langchain_community.vectorstores import Chroma
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough, RunnableParallel
from langchain_core.documents import Document
from typing import List, Dict, Optional
import os
def create_rag_chain(
vectorstore: Chroma,
llm_model: str = "deepseek-chat",
llm_base_url: str = "https://api.deepseek.com/v1",
temperature: float = 0.3,
):
"""
创建 RAG Chain
使用 LCEL 语法构建
Args:
vectorstore: LangChain 向量存储
llm_model: LLM 模型名称
llm_base_url: LLM API 地址
temperature: 温度参数
Returns:
RAG Chain
"""
# 1. 创建 LLM
llm = ChatOpenAI(
model=llm_model,
base_url=llm_base_url,
api_key=os.getenv("DEEPSEEK_API_KEY"),
temperature=temperature,
)
# 2. 创建检索器
retriever = vectorstore.as_retriever(search_kwargs={"k": 5})
# 3. 创建 Prompt
prompt = ChatPromptTemplate.from_messages([
("system", """你是一个智能助手,基于提供的参考信息回答问题。
参考信息:
{context}
请遵循以下原则:
1. 只使用参考信息中的内容回答问题
2. 如果参考信息不足以回答问题,请明确说明
3. 回答要准确、简洁、有条理
4. 标注信息来源"""),
("human", "{question}"),
])
# 4. 文档格式化函数
def format_docs(docs: List[Document]) -> str:
formatted = []
for i, doc in enumerate(docs, 1):
source = doc.metadata.get("source", "未知")
page = doc.metadata.get("page", "")
formatted.append(f"[文档{i}] 来源: {source} 第{page}页\n{doc.page_content}")
return "\n\n---\n\n".join(formatted)
# 5. 构建 RAG Chain (LCEL)
rag_chain = (
{"context": retriever | format_docs, "question": RunnablePassthrough()}
| prompt
| llm
| StrOutputParser()
)
return rag_chain, retriever
def create_rag_chain_with_sources(
vectorstore: Chroma,
llm_model: str = "deepseek-chat",
llm_base_url: str = "https://api.deepseek.com/v1",
temperature: float = 0.3,
):
"""
创建带来源追踪的 RAG Chain
返回答案和来源文档
"""
llm = ChatOpenAI(
model=llm_model,
base_url=llm_base_url,
api_key=os.getenv("DEEPSEEK_API_KEY"),
temperature=temperature,
)
retriever = vectorstore.as_retriever(search_kwargs={"k": 5})
prompt = ChatPromptTemplate.from_messages([
("system", """基于参考信息回答问题。
参考信息:
{context}
回答时标注来源编号。"""),
("human", "{question}"),
])
def format_docs(docs: List[Document]) -> str:
return "\n\n".join(
f"[{i}] {doc.page_content}"
for i, doc in enumerate(docs, 1)
)
# 使用 RunnableParallel 同时返回答案和来源
rag_chain_from_docs = (
{"context": retriever | format_docs, "question": RunnablePassthrough()}
| prompt
| llm
| StrOutputParser()
)
# 完整 Chain
rag_chain_with_sources = RunnableParallel(
answer=rag_chain_from_docs,
sources=retriever,
)
return rag_chain_with_sources
# === 使用示例 ===
if __name__ == "__main__":
# 创建嵌入模型
embeddings = HuggingFaceEmbeddings(
model_name="BAAI/bge-base-zh-v1.5",
model_kwargs={"device": "cpu"},
)
# 创建测试文档
docs = [
Document(
page_content="公司退货政策:购买后30天内可无理由退货,需保持商品完好。退货运费由买家承担。",
metadata={"source": "policy.pdf", "page": 1},
),
Document(
page_content="产品保修政策:所有电子产品享受一年官方保修。保修期内非人为损坏免费维修。",
metadata={"source": "policy.pdf", "page": 2},
),
Document(
page_content="配送说明:默认顺丰快递。一线城市次日达,满99元包邮。",
metadata={"source": "policy.pdf", "page": 3},
),
]
# 创建向量存储
vectorstore = Chroma.from_documents(
documents=docs,
embedding=embeddings,
)
# 创建 RAG Chain
rag_chain, retriever = create_rag_chain(vectorstore)
# 查询
question = "退货政策是什么?"
answer = rag_chain.invoke(question)
print(f"问题: {question}")
print(f"答案: {answer}")
# 带来源的查询
print("\n=== 带来源追踪 ===")
rag_chain_sources = create_rag_chain_with_sources(vectorstore)
result = rag_chain_sources.invoke("保修期多久?")
print(f"答案: {result['answer']}")
print(f"来源文档数: {len(result['sources'])}")
7.2 完整 RAG 管道类
"""
rag/rag_pipeline.py - 完整 RAG 管道
"""
from langchain_openai import ChatOpenAI
from langchain_community.vectorstores import Chroma
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_core.documents import Document
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from dataclasses import dataclass
from typing import List, Dict, Optional
import os
import time
@dataclass
class RAGConfig:
"""RAG 配置"""
# 嵌入模型
embedding_model: str = "BAAI/bge-base-zh-v1.5"
# 分块
chunk_size: int = 800
chunk_overlap: int = 200
# 检索
retrieval_k: int = 5
# 重排序
enable_reranker: bool = False
# LLM
llm_model: str = "deepseek-chat"
llm_base_url: str = "https://api.deepseek.com/v1"
llm_temperature: float = 0.3
# 向量库
chroma_dir: str = "./rag_chroma_db"
collection_name: str = "rag_docs"
@dataclass
class RAGResult:
"""RAG 查询结果"""
answer: str
sources: List[Dict]
latency_ms: float = 0
class RAGPipeline:
"""
完整 RAG 管道
整合文档加载、分块、嵌入、检索、生成
"""
def __init__(self, config: Optional[RAGConfig] = None):
self.config = config or RAGConfig()
# 延迟初始化
self._embeddings = None
self._vectorstore = None
self._llm = None
self._reranker = None
@property
def embeddings(self):
if self._embeddings is None:
self._embeddings = HuggingFaceEmbeddings(
model_name=self.config.embedding_model,
model_kwargs={"device": "cpu"},
encode_kwargs={"normalize_embeddings": True},
)
return self._embeddings
@property
def llm(self):
if self._llm is None:
self._llm = ChatOpenAI(
model=self.config.llm_model,
base_url=self.config.llm_base_url,
api_key=os.getenv("DEEPSEEK_API_KEY"),
temperature=self.config.llm_temperature,
)
return self._llm
@property
def reranker(self):
if self._reranker is None and self.config.enable_reranker:
from rag.reranker import BGEReranker
self._reranker = BGEReranker()
return self._reranker
def index_documents(self, documents: List[Document]):
"""
索引文档
Args:
documents: LangChain Document 列表
"""
print(f"[RAG] 索引 {len(documents)} 个文档块...")
self._vectorstore = Chroma.from_documents(
documents=documents,
embedding=self.embeddings,
collection_name=self.config.collection_name,
persist_directory=self.config.chroma_dir,
)
print(f"[RAG] 索引完成")
def index_from_files(self, file_paths: List[str]):
"""
从文件索引
Args:
file_paths: 文件路径列表
"""
from rag.document_loader import UnifiedDocumentLoader
from rag.text_splitter import DocumentChunker
# 加载文档
loader = UnifiedDocumentLoader()
all_docs = []
for path in file_paths:
try:
docs = loader.load_file(path)
all_docs.extend(docs)
print(f"✅ {path}: {len(docs)} 页/段")
except Exception as e:
print(f"❌ {path}: {e}")
# 分块
chunker = DocumentChunker(
chunk_size=self.config.chunk_size,
chunk_overlap=self.config.chunk_overlap,
)
chunks = chunker.split_documents(all_docs)
# 索引
self.index_documents(chunks)
def query(self, question: str) -> RAGResult:
"""
查询
Args:
question: 用户问题
Returns:
RAGResult
"""
start_time = time.time()
if self._vectorstore is None:
return RAGResult(
answer="知识库尚未建立,请先索引文档。",
sources=[],
)
# 检索
retriever = self._vectorstore.as_retriever(
search_kwargs={"k": self.config.retrieval_k}
)
docs = retriever.invoke(question)
# 重排序
if self.config.enable_reranker and self.reranker:
reranked = self.reranker.rerank(question, docs, top_k=3)
docs = [r["document"] for r in reranked]
# 构建上下文
context = "\n\n---\n\n".join(
f"[来源: {d.metadata.get('source', '未知')}]\n{d.page_content}"
for d in docs
)
# 生成答案
prompt = ChatPromptTemplate.from_messages([
("system", """基于参考信息回答问题。
参考信息:
{context}
要求:
1. 只使用参考信息回答
2. 信息不足时明确说明
3. 标注来源"""),
("human", "{question}"),
])
chain = prompt | self.llm | StrOutputParser()
answer = chain.invoke({"context": context, "question": question})
# 构建来源
sources = [
{
"content": d.page_content[:200],
"source": d.metadata.get("source", "未知"),
"page": d.metadata.get("page", ""),
}
for d in docs
]
latency = (time.time() - start_time) * 1000
return RAGResult(
answer=answer,
sources=sources,
latency_ms=round(latency, 2),
)
# === 使用示例 ===
if __name__ == "__main__":
# 配置
config = RAGConfig(
embedding_model="BAAI/bge-base-zh-v1.5",
chunk_size=800,
chunk_overlap=200,
enable_reranker=False,
)
pipeline = RAGPipeline(config)
# 创建测试文档
docs = [
Document(
page_content="公司退货政策:购买后30天内可无理由退货,需保持商品完好。",
metadata={"source": "policy.pdf", "page": 1},
),
Document(
page_content="产品保修政策:所有电子产品享受一年官方保修。",
metadata={"source": "policy.pdf", "page": 2},
),
]
# 索引
pipeline.index_documents(docs)
# 查询
result = pipeline.query("退货政策是什么?")
print(f"答案: {result.answer}")
print(f"来源: {result.sources}")
print(f"耗时: {result.latency_ms}ms")
7.3 RAG 流程图
8. Agentic RAG:让 RAG 更智能
8.1 传统 RAG vs Agentic RAG
| 维度 | 传统 RAG | Agentic RAG |
|---|---|---|
| 查询处理 | 所有查询走相同管道 | 智能路由,按需选择策略 |
| 检索失败 | 返回低质量结果 | 自我纠错,改写查询重试 |
| 知识范围 | 仅限本地知识库 | 可扩展到网络搜索 |
| 多步推理 | 不支持 | 支持多步检索和推理 |
8.2 使用 LangChain 实现 Agentic RAG
"""
rag/agentic_rag.py - Agentic RAG
使用 LangChain Agent 实现智能检索
"""
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
from langchain_core.prompts import ChatPromptTemplate
from langchain.agents import create_tool_calling_agent, AgentExecutor
from langchain_community.vectorstores import Chroma
from langchain_community.embeddings import HuggingFaceEmbeddings
from typing import Optional, List
import os
class AgenticRAG:
"""
Agentic RAG
特性:
- 智能路由:判断查询该走 RAG 还是直接回答
- 自我纠错:检索结果不相关时自动改写查询重试
- 工具调用:可扩展到网络搜索等
"""
def __init__(
self,
vectorstore: Chroma,
llm_model: str = "deepseek-chat",
llm_base_url: str = "https://api.deepseek.com/v1",
):
self.vectorstore = vectorstore
# 创建 LLM
self.llm = ChatOpenAI(
model=llm_model,
base_url=llm_base_url,
api_key=os.getenv("DEEPSEEK_API_KEY"),
temperature=0,
)
# 创建工具
self.tools = self._create_tools()
# 创建 Agent
self.agent_executor = self._create_agent()
def _create_tools(self):
"""创建工具"""
@tool
def search_knowledge_base(query: str) -> str:
"""
搜索知识库,查找与问题相关的信息。
当问题涉及公司政策、产品信息、技术文档等内容时使用此工具。
Args:
query: 搜索查询
Returns:
相关文档内容
"""
retriever = self.vectorstore.as_retriever(search_kwargs={"k": 3})
docs = retriever.invoke(query)
if not docs:
return "未找到相关信息"
return "\n\n---\n\n".join(
f"[来源: {d.metadata.get('source', '未知')}]\n{d.page_content}"
for d in docs
)
@tool
def direct_answer(question: str) -> str:
"""
直接回答简单问题。
当问题是常识性问题、问候、或不需要检索知识库时使用此工具。
Args:
question: 用户问题
Returns:
直接回答
"""
# 简单问题直接让 LLM 回答
return f"这是一个可以直接回答的问题: {question}"
return [search_knowledge_base, direct_answer]
def _create_agent(self):
"""创建 Agent"""
prompt = ChatPromptTemplate.from_messages([
("system", """你是一个智能助手,可以使用工具来回答问题。
工具使用指南:
1. search_knowledge_base: 当问题涉及公司政策、产品信息、技术文档时使用
2. direct_answer: 当问题是常识性问题、问候、或不需要检索时使用
请根据问题类型选择合适的工具。"""),
("human", "{input}"),
("placeholder", "{agent_scratchpad}"),
])
agent = create_tool_calling_agent(self.llm, self.tools, prompt)
return AgentExecutor(
agent=agent,
tools=self.tools,
verbose=True,
handle_parsing_errors=True,
)
def query(self, question: str) -> str:
"""查询"""
result = self.agent_executor.invoke({"input": question})
return result["output"]
# === 使用示例 ===
if __name__ == "__main__":
# 创建嵌入模型
embeddings = HuggingFaceEmbeddings(
model_name="BAAI/bge-base-zh-v1.5",
model_kwargs={"device": "cpu"},
)
# 创建测试文档
from langchain_core.documents import Document
docs = [
Document(
page_content="公司退货政策:购买后30天内可无理由退货。",
metadata={"source": "policy.pdf"},
),
Document(
page_content="产品保修政策:所有电子产品享受一年官方保修。",
metadata={"source": "policy.pdf"},
),
]
# 创建向量存储
vectorstore = Chroma.from_documents(
documents=docs,
embedding=embeddings,
)
# 创建 Agentic RAG
rag = AgenticRAG(vectorstore)
# 测试
print("=== 测试知识库问题 ===")
answer = rag.query("退货政策是什么?")
print(f"答案: {answer}")
print("\n=== 测试简单问题 ===")
answer = rag.query("你好,今天天气怎么样?")
print(f"答案: {answer}")
9. 小结与下一步
9.1 本篇回顾
| 知识点 | 掌握程度 |
|---|---|
| RAG 的原理和完整流程 | ✅ |
| LangChain Document Loaders(PyPDFLoader, Docx2txtLoader) | ✅ |
| LangChain Text Splitters(RecursiveCharacterTextSplitter) | ✅ |
| LangChain Embeddings(HuggingFaceEmbeddings) | ✅ |
| LangChain VectorStore(Chroma) | ✅ |
| LangChain Retriever(VectorStoreRetriever, EnsembleRetriever) | ✅ |
| 使用 LCEL 构建 RAG Chain | ✅ |
| Agentic RAG(智能路由 + 工具调用) | ✅ |
9.2 LangChain RAG 组件总览
9.3 生产级 RAG 决策树
9.4 下一篇预告
第五篇:综合案例(上) 将实战构建一个完整的智能客服系统:
- 需求分析与架构设计
- 使用 LangChain Chain 构建多 Agent 系统
- 集成 RAG 知识库
- 对话管理与人机协作
系列导航 | 第一篇:概述与环境搭建 | 第二篇:Prompt 工程与工具调用 | 第三篇:记忆与数据库集成 | 第四篇:RAG 检索增强生成 | 第五篇:综合案例(上) | 第六篇:综合案例(下)
更多推荐

所有评论(0)