AI智能体平台开发文档

从零开始,手把手教你搭建一套生产级 AI 智能体管理平台
本文档将完整讲解框架设计、目录结构、代码逻辑、模块实现、扩展方法


目录


第一章 项目总览

1.1 项目目标

构建一个开箱即用的 AI 智能体管理平台,提供以下能力:

  • 智能体管理 - 创建/删除/对话多个独立 AI 角色
  • 技能/工具系统 - 函数注册 + Function Calling
  • RAG 知识库 - Milvus Lite + Embedding 实现文档问答
  • 多智能体协作 - 角色分工 + 自动任务流
  • 反思式推理 - 三步推理提升准确性
  • 多格式文档解析 - TXT/PDF/DOCX/XLSX/ZIP/MD

1.2 核心特性

特性 说明
零数据库 内置 SQLite,开箱即用
零向量库服务 Milvus Lite 嵌入式
零配置 .env 一份配置文件
跨平台 Windows / macOS / Linux
异步高性能 FastAPI async + httpx
完整 REST API OpenAPI 文档自动生成

1.3 技术栈

后端

  • Python 3.10+
  • FastAPI(Web 框架)
  • SQLAlchemy(ORM)
  • Milvus Lite(向量库)
  • httpx(异步 HTTP 客户端)
  • loguru(日志)

前端

  • Vue 3(CDN 引入)
  • Axios(HTTP 客户端)

第二章 架构设计

2.1 分层架构

┌─────────────────────────────────────────────────────────┐
│                 表现层 (Presentation)                    │
│              浏览器 / Vue 3 / Axios                      │
└────────────────────────┬────────────────────────────────┘
                         │ HTTP
┌────────────────────────▼────────────────────────────────┐
│                  路由层 (Routes)                         │
│                    FastAPI                               │
│  /agents  /skills  /rag  /crewai  /rerct  /system       │
└────────────────────────┬────────────────────────────────┘
                         │
┌────────────────────────▼────────────────────────────────┐
│                业务逻辑层 (Services)                      │
│  ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐   │
│  │  Agent   │ │  Skill   │ │   RAG    │ │  CrewAI  │   │
│  │ Manager  │ │ Registry │ │  System  │ │  /ReRCT  │   │
│  └──────────┘ └──────────┘ └──────────┘ └──────────┘   │
└────────────────────────┬────────────────────────────────┘
                         │
┌────────────────────────▼────────────────────────────────┐
│                 基础设施层 (Infrastructure)                │
│  ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐   │
│  │ SQLite   │ │ Milvus   │ │   LLM    │ │   日志   │   │
│  │ (ORM)    │ │  Lite    │ │  (httpx) │ │ (loguru) │   │
│  └──────────┘ └──────────┘ └──────────┘ └──────────┘   │
└─────────────────────────────────────────────────────────┘

2.2 核心设计模式

  • 单例模式 - RAGSystemAgentManagerEmbeddingService 全局单例
  • 注册表模式 - SkillRegistry 动态注册技能
  • 工厂模式 - DocumentParserFactory 多格式解析
  • 策略模式 - 不同 Embedding 策略可切换
  • 模板方法 - SkillBase 抽象基类,子类实现具体逻辑

2.3 数据流

智能体对话流程
用户输入
  │
  ▼
[可选] RAG 检索 ──── 查询 Milvus Lite ──── 返回相关文档
  │
  ▼
构造 prompt(系统提示 + 历史 + 用户消息 + RAG 上下文)
  │
  ▼
调用 LLM(OpenAI 兼容 API via httpx)
  │
  ▼
[如有] LLM 触发 Function Calling ──── 调用 Skill Registry ──── 执行具体工具
  │
  ▼
工具结果返回给 LLM ──── 重新生成最终回答
  │
  ▼
保存到对话历史 ──── 返回用户
文档上传与检索流程
用户上传文件
  │
  ▼
DocumentParserFactory 解析 ──── TXT/PDF/DOCX/XLSX/ZIP
  │
  ▼
TextChunker 切分(500 字/片,50 字 overlap)
  │
  ▼
EmbeddingService 向量化(OpenAI API 或本地模型)
  │
  ▼
存入 Milvus Lite collection
  │
  ▼
[查询] Embedding 问题 ──── Milvus 检索 top-k ──── 返回相关文档

第三章 环境搭建

3.1 Python 版本

要求 Python 3.10+:

python --version
# Python 3.10.x 或更高

3.2 创建虚拟环境(推荐)

# Windows
python -m venv venv
venv\Scripts\activate

# macOS / Linux
python3 -m venv venv
source venv/bin/activate

3.3 安装依赖

cd backend
pip install -r requirements.txt

关键依赖说明

用途
fastapi Web 框架
uvicorn ASGI 服务器
pydantic / pydantic-settings 数据验证与配置
sqlalchemy ORM
httpx 异步 HTTP 客户端
pymilvus[milvus-lite] 嵌入式向量库
loguru 日志库
pypdf / pdfplumber PDF 解析
python-docx DOCX 解析
openpyxl XLSX 解析

3.4 配置文件

cp .env.example .env

编辑 .env

# 大模型配置
OPENAI_API_KEY=sk-xxxxxxxx
OPENAI_BASE_URL=https://api.openai.com/v1
OPENAI_MODEL=gpt-3.5-turbo

# 向量库(Milvus Lite 嵌入式)
MILVUS_DB_PATH=./data/milvus_lite.db
EMBEDDING_DIM=1536
EMBEDDING_MODEL=text-embedding-ada-002
USE_LOCAL_EMBEDDING=false

3.5 启动服务

python main.py

访问 http://localhost:8000/docs 查看 API 文档。


第四章 目录结构详解

ai-platform/
├── backend/                          # Python 后端
│   ├── main.py                       # FastAPI 入口
│   ├── requirements.txt              # 依赖清单
│   ├── .env.example                  # 配置模板
│   │
│   ├── core/                         # 核心配置
│   │   └── config.py                 # Settings(pydantic-settings)
│   │
│   ├── db/                           # 数据库层
│   │   └── models.py                 # SQLAlchemy 模型定义
│   │
│   ├── agents/                       # 智能体核心
│   │   ├── manager.py                # AgentManager 单例
│   │   ├── crewai.py                 # 多智能体协作
│   │   └── rerct.py                  # 反思式推理
│   │
│   ├── skills/                       # 技能系统
│   │   └── registry.py               # SkillRegistry + SkillBase
│   │
│   ├── rag/                          # 知识库
│   │   ├── rag_system.py             # RAGSystem + EmbeddingService
│   │   └── document_parser.py        # DocumentParserFactory
│   │
│   └── scheduler/                    # (已移除)
│
├── frontend/                         # 前端
│   ├── index.html                    # Vue 3 单页应用
│   └── styles.css                    # 样式
│
├── data/                             # 数据目录(自动创建)
│   ├── uploads/                      # 上传文件
│   ├── platform.db                   # SQLite 数据库
│   └── milvus_lite.db                # Milvus Lite 数据
│
└── docs/                             # 文档

第五章 配置系统

5.1 使用 pydantic-settings

backend/core/config.py 核心代码:

from pydantic_settings import BaseSettings
from pydantic import Field
from pathlib import Path


class Settings(BaseSettings):
    # ==================== 应用基础 ====================
    APP_NAME: str = "AI智能体平台"
    APP_VERSION: str = "1.0.0"
    DEBUG: bool = True
    HOST: str = "0.0.0.0"
    PORT: int = 8000

    # ==================== LLM ====================
    OPENAI_API_KEY: str = "sk-xxx"
    OPENAI_BASE_URL: str = "https://api.openai.com/v1"
    OPENAI_MODEL: str = "gpt-3.5-turbo"
    LLM_TIMEOUT: int = 60

    # ==================== Milvus Lite ====================
    MILVUS_DB_PATH: str = "./data/milvus_lite.db"
    MILVUS_COLLECTION_PREFIX: str = "ai_platform_"

    # ==================== Embedding ====================
    EMBEDDING_DIM: int = 1536
    USE_LOCAL_EMBEDDING: bool = False
    EMBEDDING_MODEL: str = "text-embedding-ada-002"

    # ==================== 路径 ====================
    DATA_DIR: Path = Field(default_factory=lambda: Path("./data"))

    class Config:
        env_file = ".env"          # 自动读取 .env
        env_file_encoding = "utf-8"
        case_sensitive = True

    def init_dirs(self):
        """启动时自动创建目录"""
        for d in [self.DATA_DIR, self.UPLOADS_DIR, self.LOGS_DIR]:
            d.mkdir(parents=True, exist_ok=True)


# 全局单例
settings = Settings()
settings.init_dirs()

5.2 使用方式

from core.config import settings

print(settings.OPENAI_MODEL)        # gpt-3.5-turbo
print(settings.DATA_DIR)            # PosixPath('.../data')

设计要点

  • 使用 pydantic-settings 自动从环境变量和 .env 读取
  • 类型安全(自动类型转换)
  • 集中管理所有配置
  • init_dirs() 在启动时确保目录存在

第六章 数据库层

6.1 SQLAlchemy + SQLite

backend/db/models.py 设计了 3 张表:

6.1.1 智能体配置表(agent_configs)
class AgentConfig(Base):
    __tablename__ = "agent_configs"

    id = Column(Integer, primary_key=True)
    agent_id = Column(String(50), unique=True)        # 业务ID
    name = Column(String(100))
    description = Column(Text)
    system_prompt = Column(Text)                       # 系统提示词
    model = Column(String(50))                         # 使用的 LLM
    temperature = Column(Integer)                      # 0-100
    skills = Column(Text)                              # JSON 数组
    created_at = Column(DateTime)
    updated_at = Column(DateTime)
6.1.2 聊天记录表(chat_histories)
class ChatHistory(Base):
    __tablename__ = "chat_histories"

    id = Column(Integer, primary_key=True)
    session_id = Column(String(50), index=True)        # 会话ID
    agent_id = Column(String(50), index=True)
    role = Column(String(20))                          # user / assistant
    content = Column(Text)
    tool_calls = Column(Text)                          # JSON
    created_at = Column(DateTime)
6.1.3 知识库表(knowledge_bases)
class KnowledgeBase(Base):
    __tablename__ = "knowledge_bases"

    id = Column(Integer, primary_key=True)
    kb_id = Column(String(50), unique=True)
    name = Column(String(100))
    description = Column(Text)
    document_count = Column(Integer)
    chunk_count = Column(Integer)
    created_at = Column(DateTime)

6.2 数据库初始化

def init_db():
    """创建所有表"""
    Base.metadata.create_all(bind=engine)


def get_db():
    """FastAPI 依赖注入"""
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

6.3 在 FastAPI 中使用

from db.models import get_db
from sqlalchemy.orm import Session


@app.get("/api/agents")
async def list_agents(db: Session = Depends(get_db)):
    agents = db.query(AgentConfig).all()
    return {"agents": [...]}

注意db: Session = Depends(get_db) 会在请求结束时自动关闭连接。


第七章 智能体核心

7.1 AgentManager 设计

backend/agents/manager.py 是整个系统的核心。

设计思想

  1. 单例模式 - 全局唯一管理器
  2. 内存池 - 所有 AgentRuntime 存在 _agents 字典
  3. 会话隔离 - 每个 agent 的对话历史按 session_id 分组
  4. 异步执行 - 全程使用 async/await

7.2 核心类

class AgentManager:
    _instance = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._init()
        return cls._instance

    def _init(self):
        self._agents: Dict[str, AgentRuntime] = {}

    def create_agent(self, agent_id, name, system_prompt, skills, model):
        runtime = AgentRuntime(agent_id, name, system_prompt, skills, model)
        self._agents[agent_id] = runtime
        return runtime

    def get_agent(self, agent_id):
        return self._agents.get(agent_id)

    def list_agents(self):
        return list(self._agents.values())

    async def send_message_between_agents(self, from_id, to_id, message):
        """A2A 通信"""
        from_agent = self.get_agent(from_id)
        to_agent = self.get_agent(to_id)
        # 转发消息到对方上下文
        response = await to_agent.process_internal(message, from_id)
        return response


# 全局单例
agent_manager = AgentManager()

7.3 AgentRuntime 核心

class AgentRuntime:
    def __init__(self, agent_id, name, system_prompt, skills, model):
        self.agent_id = agent_id
        self.name = name
        self.system_prompt = system_prompt
        self.skills = skills or []
        self.model = model or settings.OPENAI_MODEL

        # 按 session_id 分组的对话历史
        # 结构: {session_id: [{"role": "user", "content": "..."}]}
        self.sessions: Dict[str, List[Dict]] = {}

    async def chat(self, message, session_id="default"):
        """用户对话"""
        # 1. 初始化会话
        if session_id not in self.sessions:
            self.sessions[session_id] = []

        # 2. 添加用户消息
        self.sessions[session_id].append({"role": "user", "content": message})

        # 3. 调用 LLM
        messages = [{"role": "system", "content": self.system_prompt}] + \
                   self.sessions[session_id]

        response = await self._call_llm(messages)

        # 4. 保存回复
        self.sessions[session_id].append({"role": "assistant", "content": response})

        return response

    async def _call_llm(self, messages):
        """调用 OpenAI 兼容 API"""
        import httpx

        url = f"{settings.OPENAI_BASE_URL}/chat/completions"
        headers = {
            "Authorization": f"Bearer {settings.OPENAI_API_KEY}",
            "Content-Type": "application/json"
        }
        payload = {
            "model": self.model,
            "messages": messages,
            "temperature": 0.7
        }

        # 如果绑定了技能,传入工具定义
        if self.skills:
            tools = registry.get_tools_for_skills(self.skills)
            payload["tools"] = tools

        async with httpx.AsyncClient(timeout=settings.LLM_TIMEOUT) as client:
            response = await client.post(url, json=payload, headers=headers)
            data = response.json()

        return data["choices"][0]["message"]["content"]

7.4 Function Calling 处理

当 LLM 返回 tool_calls 时,需要循环调用工具直到拿到最终答案:

async def _call_llm_with_tools(self, messages):
    """支持 Function Calling 的 LLM 调用"""
    max_iterations = 5

    for _ in range(max_iterations):
        response = await self._call_llm(messages)
        message = response.choices[0].message

        # 没有工具调用,直接返回
        if not message.tool_calls:
            return message.content

        # 执行工具
        messages.append(message)
        for tool_call in message.tool_calls:
            tool_name = tool_call.function.name
            arguments = json.loads(tool_call.function.arguments)

            # 调用工具
            result = await registry.execute_tool(tool_name, arguments)

            # 工具结果加入消息
            messages.append({
                "role": "tool",
                "tool_call_id": tool_call.id,
                "content": json.dumps(result, ensure_ascii=False)
            })

    return response.choices[0].message.content

7.5 智能体互通(A2A)

async def process_internal(self, message, sender_id):
    """接收来自其他 Agent 的消息"""
    system_msg = f"[系统] 你收到了来自 {sender_id} 的消息"
    return await self.chat(f"{system_msg}\n{message}", session_id=f"a2a_{sender_id}")

A2A 使用场景

  • 研究员 Agent 找到资料 → 转给写作 Agent
  • 客服 Agent 收集需求 → 转给订单 Agent

第八章 技能系统

8.1 核心设计

技能系统基于注册表模式 + Function Calling

backend/skills/registry.py

class SkillBase:
    """技能基类"""

    @property
    def name(self) -> str:
        raise NotImplementedError

    @property
    def description(self) -> str:
        raise NotImplementedError

    def get_tools(self) -> List[Dict]:
        """返回 OpenAI Function Calling 格式的工具定义"""
        raise NotImplementedError

    async def execute(self, tool_name: str, arguments: Dict) -> Any:
        """执行具体工具"""
        raise NotImplementedError

8.2 内置技能:Calculator

class CalculatorSkill(SkillBase):
    @property
    def name(self): return "calculator"

    @property
    def description(self): return "执行数学计算,支持四则运算、函数"

    def get_tools(self):
        return [{
            "type": "function",
            "function": {
                "name": "calculate",
                "description": "计算数学表达式",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "expression": {
                            "type": "string",
                            "description": "数学表达式,如 1+2*3"
                        }
                    },
                    "required": ["expression"]
                }
            }
        }]

    async def execute(self, tool_name, arguments):
        if tool_name == "calculate":
            expr = arguments.get("expression", "")
            # 安全过滤:只允许数字和运算符
            safe = "".join(c for c in expr if c in "0123456789.+-*/()% ")
            try:
                return {"result": eval(safe)}
            except Exception as e:
                return {"error": str(e)}

8.3 技能注册表

class SkillRegistry:
    _instance = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._init()
        return cls._instance

    def _init(self):
        self._skills: Dict[str, SkillBase] = {}
        # 注册内置技能
        self.register(CalculatorSkill())
        self.register(WebSearchSkill())
        self.register(DateTimeSkill())

    def register(self, skill: SkillBase):
        self._skills[skill.name] = skill

    def list_skills(self):
        return [
            {"name": s.name, "description": s.description}
            for s in self._skills.values()
        ]

    def get_tools_for_skills(self, skill_names):
        """获取指定技能的工具定义"""
        tools = []
        for name in skill_names:
            if name in self._skills:
                tools.extend(self._skills[name].get_tools())
        return tools

    async def execute_tool(self, tool_name, arguments):
        """根据工具名找到对应技能并执行"""
        for skill in self._skills.values():
            for tool in skill.get_tools():
                if tool["function"]["name"] == tool_name:
                    return await skill.execute(tool_name, arguments)
        return {"error": f"工具 {tool_name} 不存在"}


# 全局注册表
registry = SkillRegistry()

8.4 如何添加自定义技能

# 在 skills/registry.py 中添加
class WeatherSkill(SkillBase):
    @property
    def name(self): return "weather"

    @property
    def description(self): return "查询天气"

    def get_tools(self):
        return [{
            "type": "function",
            "function": {
                "name": "get_weather",
                "description": "查询某城市天气",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "city": {"type": "string", "description": "城市名"}
                    },
                    "required": ["city"]
                }
            }
        }]

    async def execute(self, tool_name, arguments):
        if tool_name == "get_weather":
            city = arguments.get("city")
            # 调用天气 API
            return {"city": city, "weather": "晴"}


# 注册
registry.register(WeatherSkill())

扩展点:技能系统是平台最易扩展的部分,可以快速接入:

  • 邮件发送
  • 日历管理
  • 数据库查询
  • 第三方 API
  • 代码执行(沙箱)

第九章 RAG 知识库

9.1 整体架构

┌──────────────┐
│ 文档上传      │
└──────┬───────┘
       │
       ▼
┌──────────────┐
│DocumentParser│ (TXT/PDF/DOCX/XLSX/ZIP/MD)
└──────┬───────┘
       │ 纯文本
       ▼
┌──────────────┐
│TextChunker   │ (智能分片 500+50)
└──────┬───────┘
       │ 文本片段列表
       ▼
┌──────────────┐
│Embedding     │ (OpenAI/本地模型)
│Service       │
└──────┬───────┘
       │ 向量列表
       ▼
┌──────────────┐
│Milvus Lite   │ (IVF_FLAT + COSINE)
│(嵌入式)      │
└──────────────┘

[检索] 用户问题 → Embedding → Milvus search → top-k 文档

9.2 EmbeddingService

backend/rag/rag_system.py 核心代码:

class EmbeddingService:
    def __init__(self):
        from core.config import settings
        self.use_local = settings.USE_LOCAL_EMBEDDING
        self.dim = settings.EMBEDDING_DIM
        self.model_name = settings.EMBEDDING_MODEL

        if self.use_local:
            from sentence_transformers import SentenceTransformer
            self._local_model = SentenceTransformer(self.model_name)
            actual_dim = self._local_model.get_sentence_embedding_dimension()
            if actual_dim != self.dim:
                self.dim = actual_dim

    async def embed_texts(self, texts):
        if self.use_local:
            return self._embed_local(texts)
        return await self._embed_openai(texts)

    async def _embed_openai(self, texts):
        import httpx
        url = f"{settings.OPENAI_BASE_URL}/embeddings"
        headers = {"Authorization": f"Bearer {settings.OPENAI_API_KEY}"}
        payload = {"model": self.model_name, "input": texts}

        async with httpx.AsyncClient(timeout=60) as client:
            response = await client.post(url, json=payload, headers=headers)
            data = response.json()

        return [item["embedding"] for item in data["data"]]

双模式

  1. OpenAI Embedding - 在线,准确度更高,需要 API Key
  2. 本地 sentence-transformers - 离线免费,首次需下载模型

9.3 RAGSystem 核心

class RAGSystem:
    _instance = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._init()
        return cls._instance

    def _init(self):
        from core.config import settings
        from pymilvus import MilvusClient, DataType

        # Milvus Lite 嵌入式
        db_path = Path(settings.MILVUS_DB_PATH).resolve()
        db_path.parent.mkdir(parents=True, exist_ok=True)
        self.client = MilvusClient(uri=str(db_path))

        self.embedding = EmbeddingService()
        self.prefix = settings.MILVUS_COLLECTION_PREFIX
        self.dim = self.embedding.dim

    def _collection_name(self, kb_id):
        return f"{self.prefix}{kb_id}".replace("-", "_")

    def create_kb(self, kb_id, name, description=""):
        collection_name = self._collection_name(kb_id)

        if self.client.has_collection(collection_name):
            return {"status": "exists"}

        # 定义 schema
        schema = self.client.create_schema(auto_id=False, enable_dynamic_field=True)
        schema.add_field("id", DataType.VARCHAR, max_length=64, is_primary=True)
        schema.add_field("vector", DataType.FLOAT_VECTOR, dim=self.dim)
        schema.add_field("text", DataType.VARCHAR, max_length=65535)
        schema.add_field("source", DataType.VARCHAR, max_length=512)

        # 创建 collection
        self.client.create_collection(collection_name=collection_name, schema=schema)

        # 创建向量索引
        index_params = self.client.prepare_index_params()
        index_params.add_index(
            field_name="vector",
            index_type="IVF_FLAT",
            metric_type="COSINE",
            params={"nlist": 64}
        )
        self.client.create_index(collection_name=collection_name, index_params=index_params)

    async def add_documents(self, kb_id, texts, metadatas=None, ids=None):
        collection_name = self._collection_name(kb_id)

        if ids is None:
            ids = [str(uuid.uuid4()) for _ in texts]

        # 向量化
        vectors = await self.embedding.embed_texts(texts)

        # 构造数据
        data = []
        for doc_id, text, vector, meta in zip(ids, texts, vectors, metadatas or [{}]*len(texts)):
            data.append({
                "id": doc_id,
                "vector": vector,
                "text": text,
                "source": meta.get("filename", "")
            })

        # 插入
        self.client.insert(collection_name=collection_name, data=data)
        self.client.load_collection(collection_name)

        return ids

    async def query(self, kb_id, query_text, top_k=5):
        collection_name = self._collection_name(kb_id)

        if not self.client.has_collection(collection_name):
            return []

        # 向量化查询
        query_vector = await self.embedding.embed_query(query_text)

        # 检索
        results = self.client.search(
            collection_name=collection_name,
            data=[query_vector],
            limit=top_k,
            output_fields=["text", "source"]
        )

        # 格式化
        formatted = []
        for hit in results[0]:
            distance = hit.get("distance", 0)
            score = max(0, 1 - distance)  # COSINE 距离转相似度
            entity = hit.get("entity", {})
            formatted.append({
                "id": hit.get("id"),
                "text": entity.get("text"),
                "source": entity.get("source"),
                "score": score
            })

        return formatted

9.4 文档解析器工厂

backend/rag/document_parser.py

class DocumentParser:
    """解析器基类"""
    @classmethod
    def can_parse(cls, filename):
        raise NotImplementedError

    @classmethod
    def parse(cls, file_path):
        raise NotImplementedError


class TXTParser(DocumentParser):
    @classmethod
    def can_parse(cls, filename):
        return filename.endswith(('.txt', '.md'))

    @classmethod
    def parse(cls, file_path):
        return {"content": Path(file_path).read_text(encoding='utf-8')}


class PDFParser(DocumentParser):
    @classmethod
    def can_parse(cls, filename):
        return filename.endswith('.pdf')

    @classmethod
    def parse(cls, file_path):
        import pdfplumber
        text = ""
        with pdfplumber.open(file_path) as pdf:
            for page in pdf.pages:
                text += page.extract_text() or ""
        return {"content": text}


class DocumentParserFactory:
    _parsers = [TXTParser, PDFParser, DOCXParser, XLSXParser, ZIPParser]

    @classmethod
    def parse(cls, file_path):
        for parser in cls._parsers:
            if parser.can_parse(file_path):
                return parser.parse(file_path)
        raise ValueError(f"不支持的文件类型: {file_path}")

9.5 文本分片

class TextChunker:
    def __init__(self, chunk_size=500, overlap=50):
        self.chunk_size = chunk_size
        self.overlap = overlap

    def split(self, text):
        """智能分片:按段落优先,长段落按句子切"""
        text = text.strip()
        if not text:
            return []

        paragraphs = text.split("\n\n")
        chunks = []
        current = ""

        for para in paragraphs:
            if len(current) + len(para) + 2 > self.chunk_size:
                if current:
                    chunks.append(current.strip())
                    # 保留 overlap
                    current = current[-self.overlap:] if len(current) > self.overlap else ""
                current += para + "\n\n"
            else:
                current += para + "\n\n"

        if current.strip():
            chunks.append(current.strip())

        return chunks

分片策略

  • 按段落切分(保留语义完整性)
  • 长段落按句子切分
  • 50 字 overlap 避免关键信息丢失
  • 默认 500 字/片(平衡精度和召回率)

第十章 多智能体协作

10.1 CrewAI 模式

backend/agents/crewai.py 实现一个简化版 CrewAI:

用户任务
  │
  ▼
[信息研究员] - InformationResearcher
  │  ├ 联网搜索
  │  └ RAG 检索
  │
  ▼
[文稿撰写员] - ReportWriter
  │  └ 基于研究材料生成报告
  │
  ▼
最终报告

10.2 角色定义

class InformationResearcher:
    """信息研究员"""

    async def research(self, topic):
        results = []

        # 1. 联网搜索
        try:
            from duckduckgo_search import DDGS
            with DDGS() as ddgs:
                search_results = ddgs.text(topic, max_results=5)
                for r in search_results:
                    results.append({
                        "type": "web",
                        "title": r.get("title"),
                        "content": r.get("body")
                    })
        except Exception as e:
            logger.warning(f"联网搜索失败: {e}")

        # 2. RAG 检索(如有知识库)
        try:
            kbs = rag_system.list_kbs()
            for kb_id in kbs[:2]:  # 取前2个
                docs = await rag_system.query(kb_id, topic, top_k=3)
                for doc in docs:
                    results.append({
                        "type": "rag",
                        "content": doc["text"]
                    })
        except Exception as e:
            logger.warning(f"RAG 检索失败: {e}")

        return results


class ReportWriter:
    """文稿撰写员"""

    async def write(self, topic, research_data):
        # 构造 prompt
        context = "\n\n".join([
            f"[{r.get('type', 'doc')}] {r.get('title', '')}\n{r.get('content', '')}"
            for r in research_data
        ])

        prompt = f"""基于以下研究材料,写一份关于 "{topic}" 的专业报告。

研究材料:
{context}

要求:
1. 结构清晰,使用标题分节
2. 数据准确,引用具体来源
3. 不少于 800 字
"""

        # 调用 LLM
        async with httpx.AsyncClient(timeout=60) as client:
            response = await client.post(
                f"{settings.OPENAI_BASE_URL}/chat/completions",
                headers={"Authorization": f"Bearer {settings.OPENAI_API_KEY}"},
                json={
                    "model": settings.OPENAI_MODEL,
                    "messages": [{"role": "user", "content": prompt}],
                    "temperature": 0.5
                }
            )
            data = response.json()

        return data["choices"][0]["message"]["content"]

10.3 编排器

class CrewAIOrchestrator:
    def __init__(self):
        self.researcher = InformationResearcher()
        self.writer = ReportWriter()
        self.history = []

    async def execute_task(self, task, steps=None):
        # 1. 信息搜集
        logger.info(f"[Crew] 信息研究员开始: {task}")
        research_data = await self.researcher.research(task)

        # 2. 报告撰写
        logger.info(f"[Crew] 文稿撰写员开始")
        final_report = await self.writer.write(task, research_data)

        # 3. 记录历史
        result = {
            "task": task,
            "research_count": len(research_data),
            "final_report": final_report,
            "created_at": datetime.now().isoformat()
        }
        self.history.append(result)

        return result


# 全局实例
crew = CrewAIOrchestrator()

第十一章 反思式推理

11.1 ReRCT 思想

ReRCT = Reasoning + Reflection + Criticism + reThinking

通过三步推理提升 LLM 回答质量:

  1. 初步回答 - 直接给出第一反应答案
  2. 自我反思 - 审查答案的不足
  3. 优化输出 - 基于反思重新生成

11.2 实现代码

backend/agents/rerct.py

class ReRCTReasoner:
    def __init__(self):
        self.model = settings.OPENAI_MODEL

    async def reason(self, question, context=None):
        # 步骤 1: 初步回答
        initial = await self._initial_response(question, context)

        # 步骤 2: 自我反思
        reflection = await self._reflect(question, initial, context)

        # 步骤 3: 优化输出
        final = await self._refine(question, initial, reflection, context)

        return {
            "question": question,
            "initial": initial,
            "reflection": reflection,
            "final": final
        }

    async def _initial_response(self, question, context):
        prompt = f"""请回答以下问题,给出你的第一反应答案。
问题:{question}
{f"参考信息:{context}" if context else ""}
"""
        return await self._call_llm(prompt)

    async def _reflect(self, question, initial, context):
        prompt = f"""请审查以下答案,指出其中的不足、错误或不完整之处。
问题:{question}
初步答案:{initial}

请从以下角度审查:
1. 准确性 - 是否有事实错误?
2. 完整性 - 是否遗漏关键信息?
3. 逻辑性 - 推理是否合理?
4. 相关性 - 是否切题?
"""
        return await self._call_llm(prompt)

    async def _refine(self, question, initial, reflection, context):
        prompt = f"""基于以下信息,给出最终优化答案。
问题:{question}
初步答案:{initial}
审查反馈:{reflection}
{f"参考信息:{context}" if context else ""}

请提供一份更准确、更完整、更专业的答案。
"""
        return await self._call_llm(prompt)

    async def _call_llm(self, prompt):
        async with httpx.AsyncClient(timeout=60) as client:
            response = await client.post(
                f"{settings.OPENAI_BASE_URL}/chat/completions",
                headers={"Authorization": f"Bearer {settings.OPENAI_API_KEY}"},
                json={
                    "model": self.model,
                    "messages": [{"role": "user", "content": prompt}],
                    "temperature": 0.5
                }
            )
            data = response.json()
        return data["choices"][0]["message"]["content"]


# 全局实例
reasoner = ReRCTReasoner()

为什么有效

  • 让 LLM 有机会"自我批评"
  • 揭示隐藏的逻辑漏洞
  • 提升 30-50% 的准确性

第十二章 FastAPI 路由层

12.1 路由设计

backend/main.py 定义所有 API:

路由前缀 功能
/api/agents 智能体管理
/api/agents/chat 对话接口
/api/agents/a2a 智能体互通
/api/skills 技能管理
/api/rag/kbs 知识库管理
/api/rag/upload 文档上传
/api/rag/query RAG 检索
/api/crewai/execute 多智能体任务
/api/rerct/reason 反思推理
/api/system/info 系统信息

12.2 标准接口模式

from pydantic import BaseModel


class CreateAgentRequest(BaseModel):
    """请求体模型"""
    name: str
    description: str = ""
    system_prompt: str = "你是一个有帮助的AI助手。"
    skills: List[str] = []
    model: Optional[str] = None
    temperature: float = 0.7


@app.post("/api/agents")
async def create_agent(
    request: CreateAgentRequest,
    db: Session = Depends(get_db)
):
    """创建智能体"""
    # 1. 业务逻辑
    agent_id = f"agent_{uuid.uuid4().hex[:12]}"

    # 2. 持久化
    agent_config = AgentConfig(
        agent_id=agent_id,
        name=request.name,
        ...
    )
    db.add(agent_config)
    db.commit()

    # 3. 内存中创建
    agent_manager.create_agent(...)

    return {"agent_id": agent_id, "status": "created"}

12.3 文件上传

@app.post("/api/rag/upload")
async def upload_document(
    kb_id: str,
    file: UploadFile = File(...)
):
    # 1. 读取文件
    content_bytes = await file.read()

    # 2. 保存到磁盘
    file_path = settings.UPLOADS_DIR / file.filename
    with open(file_path, "wb") as f:
        f.write(content_bytes)

    # 3. 解析
    text = DocumentParserFactory.parse(str(file_path))

    # 4. 分片
    chunks = TextChunker().split(text["content"])

    # 5. 向量化并存入向量库
    await rag_system.add_documents(kb_id, chunks, metadatas=[...])

    return {"filename": file.filename, "chunks": len(chunks)}

12.4 CORS 配置

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

为什么 *:开发期方便前后端分离调试。生产环境应指定具体域名。

12.5 启动事件

@app.on_event("startup")
async def startup_event():
    logger.info(f"🚀 {settings.APP_NAME} 启动中...")
    init_db()
    logger.info("✅ 系统启动完成")

第十三章 前端实现

13.1 整体结构

frontend/index.html 是一个单页应用(SPA),使用 Vue 3 CDN 引入。

核心模块

  • 侧边栏导航
  • 6 个视图:智能体 / RAG / 多智能体 / 反思 / 技能
  • 模态框:创建智能体

13.2 Vue 3 Setup 模式

const { createApp, ref, onMounted } = Vue;

createApp({
    setup() {
        // 响应式数据
        const agents = ref([]);
        const currentView = ref('agents');

        // 方法
        const loadAll = async () => {
            agents.value = (await callApi('get', '/api/agents'))?.agents || [];
        };

        // 生命周期
        onMounted(() => {
            loadAll();
        });

        // 暴露给模板
        return { agents, currentView, loadAll };
    }
}).mount('#app');

13.3 Axios 封装

const api = axios.create({
    baseURL: window.location.origin,
});

const callApi = async (method, path, data = null) => {
    try {
        const response = await api({ method, url: path, data });
        return response.data;
    } catch (err) {
        alert(err.response?.data?.detail || '请求失败');
        return null;
    }
};

13.4 各模块实现

智能体对话
<div v-if="selectedAgent" class="chat-area">
    <div class="messages">
        <div v-for="(m, i) in messages" :key="i" :class="['message', m.role]">
            <div class="message-content">{{ m.content }}</div>
        </div>
    </div>
    <div class="chat-input">
        <input v-model="inputMessage" @keyup.enter="sendMessage">
        <button @click="sendMessage">发送</button>
    </div>
</div>
const sendMessage = async () => {
    if (!inputMessage.value || !selectedAgent.value) return;
    const userMsg = inputMessage.value;
    messages.value.push({ role: 'user', content: userMsg });
    inputMessage.value = '';

    const res = await callApi('post', '/api/agents/chat', {
        agent_id: selectedAgent.value.agent_id,
        message: userMsg
    });
    if (res) {
        messages.value.push({ role: 'assistant', content: res.message });
    }
};
文档上传
const uploadFile = async () => {
    const formData = new FormData();
    formData.append('file', selectedFile.value);
    const res = await api.post(`/api/rag/upload?kb_id=${selectedKb.value}`, formData, {
        headers: { 'Content-Type': 'multipart/form-data' }
    });
    alert(`上传成功!切分为 ${res.data.chunks} 个片段`);
};

第十四章 启动与部署

14.1 本地启动

# 1. 进入项目目录
cd ai-platform

# 2. 安装依赖
cd backend
pip install -r requirements.txt

# 3. 复制配置
cp .env.example .env
# 编辑 .env 填入 OPENAI_API_KEY

# 4. 启动
python main.py

14.2 访问

  • API 文档:http://localhost:8000/docs
  • 前端页面:浏览器打开 frontend/index.html
  • 健康检查:http://localhost:8000/

14.3 生产部署

# 使用 Gunicorn + Uvicorn workers
pip install gunicorn

gunicorn main:app \
    --workers 4 \
    --worker-class uvicorn.workers.UvicornWorker \
    --bind 0.0.0.0:8000 \
    --access-logfile - \
    --error-logfile -

14.4 Nginx 反向代理

server {
    listen 80;
    server_name api.example.com;

    location / {
        proxy_pass http://127.0.0.1:8000;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
    }
}

第十五章 扩展开发指南

15.1 添加新技能

# 在 backend/skills/registry.py 中追加
class MySkill(SkillBase):
    @property
    def name(self): return "my_skill"

    @property
    def description(self): return "我的技能"

    def get_tools(self):
        return [{
            "type": "function",
            "function": {
                "name": "my_tool",
                "description": "工具说明",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "param1": {"type": "string"}
                    }
                }
            }
        }]

    async def execute(self, tool_name, arguments):
        return {"result": "..."}


# 注册
registry.register(MySkill())

15.2 添加新文档格式

# 在 backend/rag/document_parser.py 中追加
class MyFormatParser(DocumentParser):
    @classmethod
    def can_parse(cls, filename):
        return filename.endswith('.myext')

    @classmethod
    def parse(cls, file_path):
        # 解析逻辑
        return {"content": "..."}

# 添加到工厂
DocumentParserFactory._parsers.append(MyFormatParser)

15.3 添加新 LLM 提供商

修改 core/config.py

class Settings(BaseSettings):
    # 选择使用的提供商
    LLM_PROVIDER: str = "openai"  # openai / anthropic / gemini
    ...

修改 agents/manager.py_call_llm

async def _call_llm(self, messages):
    if settings.LLM_PROVIDER == "anthropic":
        # 调用 Anthropic API
        ...
    elif settings.LLM_PROVIDER == "openai":
        # 默认 OpenAI
        ...

15.4 添加新数据库后端

修改 db/models.py

# 将 SQLite 改为 PostgreSQL
engine = create_engine(
    "postgresql://user:pass@localhost/dbname",
    pool_size=10
)

15.5 性能优化方向

  • 缓存层 - 引入 Redis 缓存 RAG 结果
  • 异步批处理 - Embedding 批量提交
  • 流式响应 - LLM 流式返回(SSE)
  • 连接池 - httpx 连接复用
  • 后台任务 - 文档解析异步化

附录:常见问题

Q1: 没有 OpenAI API Key 能跑吗?

A: 可以启动并创建知识库。LLM 调用会失败,但 RAG 检索和文档解析功能可用。

Q2: Milvus Lite 数据存在哪?

A: 默认 ./data/milvus_lite.db,删除该文件即可重置向量库。

Q3: 如何重置整个系统?

A: 删除 data/ 目录下的所有文件即可。

Q4: 为什么 Embedding 维度要对齐?

A: 写入和查询必须使用同一维度的向量模型。OpenAI 是 1536,本地 MiniLM 是 384。

Q5: 如何备份知识库?

A: 复制 data/milvus_lite.db 文件即可。


总结

本平台从零搭建了一个生产级 AI 智能体管理平台,覆盖了:

  • ✅ 配置管理(pydantic-settings)
  • ✅ 数据持久化(SQLAlchemy + SQLite)
  • ✅ 智能体核心(Agent + A2A + Function Calling)
  • ✅ 技能系统(注册表模式)
  • ✅ RAG 知识库(Milvus Lite + Embedding)
  • ✅ 多智能体协作(CrewAI 模式)
  • ✅ 反思式推理(ReRCT)
  • ✅ 多格式文档解析
  • ✅ REST API(FastAPI)
  • ✅ Vue 3 前端

通过模块化设计,每个组件都可独立替换和扩展。后续可在此基础上:

  • 添加用户系统
  • 接入更多 LLM 提供商
  • 实现工作流编排
  • 加入监控告警
  • 部署到 Kubernetes
Logo

这里是“一人公司”的成长家园。我们提供从产品曝光、技术变现到法律财税的全栈内容,并连接云服务、办公空间等稀缺资源,助你专注创造,无忧运营。

更多推荐