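In 2026, readership of AI Agent content grew 340%, and search interest in Agents overtook "large models" themselves for the first time. This article breaks down the core Agent architecture (planning, memory, tools, collaboration) with working code, covering three paradigms: ReAct, Plan-and-Execute, and multi-Agent collaboration.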

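1. Why 2026 Is the Breakout Year for Agents

1.1 From the "Hundred-Model War" to Agents in Production

2023-2024: The Hundred-Model War
  → Vendors raced to train large models, and model capabilities improved rapidly
  → But real deployments were limited; many models only ever ran in demos

2025: The year RAG went mainstream
  → Retrieval-augmented generation became standard, reducing hallucinations
  → Enterprises began applying AI to knowledge bases, customer service, and similar scenarios

2026: The Agent breakout year ⭐
  → Large models are no longer scarce; the focus shifts to domain fine-tuning + RAG + function calling + workflows
  → AI moves from "passively answering" to "actively completing tasks"
  → Multi-Agent collaboration becomes the mainstream architecture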

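1.2 Core Differences Between Agents and Traditional AI

Traditional AI (RAG mode):
  User → "Check my order status" → [LLM + RAG] → direct answer → done
  Traits: a single request, stateless, cannot carry out multi-step operations

AI Agent mode:
  User → "Please handle this return request"
  ↓
  [Agent] understands the intent
  ↓
  [Plan] breaks the task down: ① look up the order → ② verify its status → ③ create a return order → ④ notify the user
  ↓
  [Execute] calls tools (APIs / code / databases)
  ↓
  [Feedback] returns results + automatically runs the follow-up steps
  ↓
  The whole process needs no human intervention and can operate across systems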

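1.3 Supporting Data (CSDN, 2026)

Metric | Data
Agent content readership growth | 340% (H2 2025)
Agent search interest vs. large models | Higher for the first time
Enterprise Agent deployment rate | Only 11%, but successful adopters are going "Agent-First"
Must-have developer skills | RAG + Agent frameworks + prompt engineering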

2. Core Agent Architecture: The Four-Element Model

2.1 The Four Elements in Detail

┌─────────────────────────────────────────────────────────────┐
│             AI Agent Four-Element Architecture              │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│                    ┌───────────────┐                        │
│                    │   Planning    │  ← task planning &     │
│                    │   (planner)   │    decomposition       │
│                    └───────┬───────┘                        │
│                            │                                │
│          ┌─────────────────┼─────────────────┐              │
│          │                 │                 │              │
│  ┌───────┴───────┐ ┌───────┴───────┐ ┌───────┴───────┐      │
│  │    Memory     │ │     Tools     │ │ Collaboration │      │
│  │               │ │               │ │               │      │
│  │ short-term &  │ │ APIs / code / │ │ multi-Agent   │      │
│  │ long-term     │ │ search;       │ │ messaging;    │      │
│  │ memory;       │ │ function      │ │ A2A protocol /│      │
│  │ session/vector│ │ calling       │ │ HTTP          │      │
│  │ store         │ │               │ │               │      │
│  └───────────────┘ └───────────────┘ └───────────────┘      │
│                                                             │
└─────────────────────────────────────────────────────────────┘

2.2 Agent基类实现

from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional, Callable
from enum import Enum
import json
import time

class AgentState(Enum):
    IDLE = "idle"
    PLANNING = "planning"
    EXECUTING = "executing"
    WAITING = "waiting"
    DONE = "done"
    ERROR = "error"

@dataclass
class Message:
    """A single conversation message"""
    role: str  # "user" | "assistant" | "system" | "tool"
    content: str
    tool_call: Optional[Dict] = None
    tool_result: Optional[str] = None
    timestamp: float = field(default_factory=time.time)

@dataclass
class Tool:
    """Tool definition"""
    name: str
    description: str
    parameters: Dict[str, Any]  # Argument schema for the tool
    func: Callable
    requires_confirmation: bool = False

@dataclass
class PlanStep:
    """A single plan step"""
    step_id: str
    description: str
    tool_name: Optional[str] = None
    tool_args: Optional[Dict] = None
    status: str = "pending"  # pending | running | done | failed
    result: Any = None
    error: Optional[str] = None

class BaseAgent(ABC):
    """
    Agent base class: the four-element architecture
    - Planning: task planning and decomposition
    - Memory: short-term / long-term memory management
    - Tools: tool registration and invocation
    - Collaboration: interface for collaboration between Agents
    """
    def __init__(self, name: str, model_client: Any = None):
        self.name = name
        self.model = model_client
        self.state = AgentState.IDLE
        
        # Core components
        self.tools: Dict[str, Tool] = {}
        self.short_term_memory: List[Message] = []
        self.long_term_memory: Optional[Any] = None  # e.g. a vector database exposing add()/search()
        self.plan: List[PlanStep] = []
        
        # Run statistics
        self.stats = {
            "total_steps": 0,
            "tool_calls": 0,
            "errors": 0,
            "start_time": None,
        }
    
    # ====== Tool management ======
    def register_tool(self, tool: Tool):
        """Register a tool"""
        self.tools[tool.name] = tool
        return self
    
    def register_tools(self, tools: List[Tool]):
        for tool in tools:
            self.register_tool(tool)
        return self
    
    def get_tool_schemas(self) -> List[Dict]:
        """Build tool schemas in the OpenAI "tools" format for the LLM"""
        schemas = []
        for tool in self.tools.values():
            params = tool.parameters
            # Accept the shorthand {"arg": {"type": ...}} used in the examples below
            # and wrap it into the JSON Schema object that function calling expects
            if params.get("type") != "object":
                params = {"type": "object", "properties": params}
            schemas.append({
                "type": "function",
                "function": {
                    "name": tool.name,
                    "description": tool.description,
                    "parameters": params,
                },
            })
        return schemas
    
    def execute_tool(self, tool_name: str, arguments: Dict) -> str:
        """Execute a tool; returns its result as a JSON string, or an error message"""
        self.stats["tool_calls"] += 1
        
        if tool_name not in self.tools:
            return f"Error: Tool '{tool_name}' not found. Available tools: {list(self.tools.keys())}"
        
        tool = self.tools[tool_name]
        
        try:
            result = tool.func(**(arguments or {}))
            return json.dumps(result, ensure_ascii=False, default=str)
        except Exception as e:
            self.stats["errors"] += 1
            return f"Error executing {tool_name}: {str(e)}"
    
    # ====== Memory management ======
    def add_message(self, role: str, content: str, **kwargs):
        """Append a message to short-term memory"""
        msg = Message(role=role, content=content, **kwargs)
        self.short_term_memory.append(msg)
        # Keep only the 50 most recent messages in short-term memory
        if len(self.short_term_memory) > 50:
            self.short_term_memory = self.short_term_memory[-50:]
        return msg
    
    def get_context(self, max_turns: int = 10) -> List[Dict]:
        """Return the most recent messages as conversation context"""
        recent = self.short_term_memory[-max_turns:]
        return [
            {"role": m.role, "content": m.content}
            for m in recent
        ]
    
    def save_to_long_term(self, key: str, value: Any):
        """Save to long-term memory"""
        if self.long_term_memory is not None:
            self.long_term_memory.add(key, value)
    
    def retrieve_from_long_term(self, query: str, top_k: int = 3) -> List[Any]:
        """Retrieve from long-term memory"""
        if self.long_term_memory is not None:
            return self.long_term_memory.search(query, top_k)
        return []
    
    # ====== Core reasoning loop ======
    @abstractmethod
    def plan_action(self, user_input: str) -> List[PlanStep]:
        """Plan the next actions (implemented by subclasses)"""
        pass
    
    @abstractmethod
    def execute_step(self, step: PlanStep) -> Any:
        """Execute a single step (implemented by subclasses)"""
        pass
    
    def run(self, user_input: str, max_iterations: int = 10) -> str:
        """Main Agent loop: plan once, then execute the steps in order"""
        self.stats["start_time"] = time.time()
        self.state = AgentState.PLANNING
        self.add_message("user", user_input)
        
        self.plan = self.plan_action(user_input)
        
        for i, step in enumerate(self.plan):
            if i >= max_iterations:
                return f"Max iterations ({max_iterations}) reached. Partial result: {self.get_context()}"
            
            self.state = AgentState.EXECUTING
            step.status = "running"
            self.stats["total_steps"] += 1
            
            try:
                result = self.execute_step(step)
                step.status = "done"
                step.result = result
                self.add_message("tool", str(result), 
                               tool_call={"name": step.tool_name, "args": step.tool_args})
            except Exception as e:
                step.status = "failed"
                step.error = str(e)
                self.stats["errors"] += 1
        
        self.state = AgentState.DONE
        return self._generate_final_response()
    
    def _generate_final_response(self) -> str:
        """Produce the final response"""
        return "Task completed. Steps executed: " + ", ".join(
            s.description for s in self.plan if s.status == "done"
        )
    
    def get_stats(self) -> Dict:
        """Return run statistics"""
        elapsed = time.time() - self.stats["start_time"] if self.stats["start_time"] else 0
        return {
            **self.stats,
            "elapsed_seconds": round(elapsed, 2),
            "success_rate": f"{(self.stats['total_steps'] - self.stats['errors']) / max(1, self.stats['total_steps']) * 100:.1f}%"
        }
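BaseAgent only assumes that `long_term_memory` exposes `add(key, value)` and `search(query, top_k)`. As a minimal sketch of that interface, here is a hypothetical in-process store, `KeywordMemory`, that ranks entries by shared-word count. It is a stand-in for a real vector database, not an embedding search.

```python
import re
from collections import Counter
from typing import Any, List, Tuple


class KeywordMemory:
    """Hypothetical long-term store with the add(key, value) / search(query, top_k)
    interface BaseAgent expects. Ranks entries by shared-word count, not embeddings."""

    def __init__(self) -> None:
        self._items: List[Tuple[str, Any, Counter]] = []

    @staticmethod
    def _tokens(text: str) -> Counter:
        # Lowercased \w+ tokens; a real store would embed the text instead
        return Counter(re.findall(r"\w+", text.lower()))

    def add(self, key: str, value: Any) -> None:
        self._items.append((key, value, self._tokens(f"{key} {value}")))

    def search(self, query: str, top_k: int = 3) -> List[Any]:
        query_tokens = self._tokens(query)
        scored = []
        for _key, value, tokens in self._items:
            overlap = sum((query_tokens & tokens).values())  # Shared-token count
            if overlap > 0:
                scored.append((overlap, value))
        # Highest overlap first; the sort is stable, so ties keep insertion order
        scored.sort(key=lambda pair: pair[0], reverse=True)
        return [value for _, value in scored[:top_k]]


memory = KeywordMemory()
memory.add("user_pref", "prefers WeChat notifications")
memory.add("last_order", "order ORD20260625001 shipped")
print(memory.search("notifications the user prefers"))
```

Assigning `agent.long_term_memory = KeywordMemory()` is enough for `save_to_long_term` and `retrieve_from_long_term` to work. Note that `\w+` treats an unbroken run of Chinese characters as a single token, so Chinese text would need a word segmenter or, more realistically, real embeddings.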

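3. The ReAct Pattern: Unifying Reasoning and Acting

3.1 How ReAct Works

ReAct (Reason + Act) is one of the most widely used Agent reasoning paradigms in 2026. Its core idea: every step is one "Thought → Action → Observation" cycle.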

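# The core ReAct loop
# Thought → Action → Observation → Thought → ...

"""
User: Is today a good day for a run in Beijing?

Step 1: Thought=I need Beijing's weather before I can decide
        Action=get_weather(city="Beijing")
        Observation={"temp": 28, "AQI": 85, "weather": "cloudy"}
        
Step 2: Thought=28°C is on the warm side and AQI 85 is only moderate, so not ideal
        Action=search_sports_advice(temp=28, aqi=85)
        Observation={"advice": "Run in the evening; on cloudy days, stay hydrated"}

Step 3: Final Answer=Beijing is cloudy today, 28°C, AQI 85.
        Not the best day for running; consider running around 6 pm and stay hydrated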
"""

3.2 ReAct Agent Implementation

from typing import List, Tuple

class ReActAgent(BaseAgent):
    """
    Agent using the ReAct reasoning pattern
    Core: the Thought → Action → Observation loop
    """
    MAX_REACT_STEPS = 15
    
    def __init__(self, name: str, model_client: Any = None):
        super().__init__(name, model_client)
        self.thought_history: List[str] = []
        self.observation_history: List[str] = []
    
    def plan_action(self, user_input: str) -> List[PlanStep]:
        """
        ReAct planning: let the LLM choose the Thought + Action itself
        Returns: a list of PlanSteps carrying the Thought reasoning
        """
        prompt = self._build_react_prompt(user_input)
        response = self.model.chat([{"role": "user", "content": prompt}])
        
        steps = self._parse_react_response(response.content)
        return steps
    
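    def _build_react_prompt(self, user_input: str) -> str:
        """Build the ReAct reasoning prompt"""
        # Accept both a full JSON Schema ({"properties": {...}}) and the shorthand {"arg": {...}}
        tool_schemas = "\n".join([
            f"- {t.name}: {t.description} "
            f"(args: {list(t.parameters.get('properties', t.parameters).keys())})"
            for t in self.tools.values()
        ])
        
        history = ""
        for t, o in zip(self.thought_history, self.observation_history):
            history += f"\nThought: {t}\nObservation: {o}\n"
        
        return f"""You are an intelligent assistant. Answer the user's question through a "Thought-Action-Observation" loop.

Available tools:
{tool_schemas}

History:
{history or "(empty)"}

Current question: {user_input}

Reply in the following format, with only one Action per reply. The system fills in
the Observation with the real tool result, so do not write it yourself:
Thought: your reasoning, i.e. why you are taking this action
Action: tool_name({{"arg": "value"}})

Once the question is solved, reply:
Thought: I have enough information
Final Answer: your complete answer

Begin:"""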
    
    def _parse_react_response(self, response: str) -> List[PlanStep]:
        """Parse a ReAct response into PlanSteps (no side effects on the history)"""
        steps = []
        lines = response.strip().split("\n")
        
        thought = ""  # Guards against an Action line that arrives before any Thought
        step_id = 1
        for line in lines:
            if step_id > self.MAX_REACT_STEPS:
                break
            line = line.strip()
            
            if line.startswith("Thought:"):
                thought = line[len("Thought:"):].strip()
            
            elif line.startswith("Action:"):
                action_line = line[len("Action:"):].strip()
                # Parse "tool_name({json args})"; partition/rsplit keep parentheses inside the JSON intact
                if "(" in action_line:
                    tool_part, _, args_part = action_line.partition("(")
                    tool_part = tool_part.strip()
                    args_part = args_part.rsplit(")", 1)[0].strip()
                    try:
                        args = json.loads(args_part) if args_part else {}
                    except json.JSONDecodeError:
                        args = {}
                    
                    step = PlanStep(
                        step_id=f"step_{step_id}",
                        description=f"{thought} → {tool_part}",
                        tool_name=tool_part,
                        tool_args=args
                    )
                    steps.append(step)
                    step_id += 1
            
            elif line.startswith("Final Answer:"):
                # Final answer: append a finalize step that calls no tool
                final_answer = line[len("Final Answer:"):].strip()
                steps.append(PlanStep(
                    step_id=f"step_{step_id}",
                    description="Generate the final answer",
                    tool_name=None,
                    tool_args={"answer": final_answer}
                ))
                break
        
        return steps
    
    def execute_step(self, step: PlanStep) -> str:
        """Execute one ReAct step"""
        if step.tool_name is None:
            # Final Answer step
            return step.tool_args.get("answer", "")
        
        if step.tool_name not in self.tools:
            return f"Tool '{step.tool_name}' not found"
        
        # Add the latest thought to memory
        if self.thought_history:
            self.add_message("assistant", self.thought_history[-1])
        
        # Execute the tool
        result = self.execute_tool(step.tool_name, step.tool_args)
        
        # Record the observation
        self.observation_history.append(result)
        
        return result
    
    def run(self, user_input: str, max_iterations: int = 15) -> str:
        """ReAct main loop (explicit iteration control)"""
        self.stats["start_time"] = time.time()
        self.thought_history = []
        self.observation_history = []
        
        for iteration in range(max_iterations):
            # Build the ReAct prompt: the original question stays fixed, and
            # earlier Thought/Observation pairs are carried in the history
            prompt = self._build_react_prompt(user_input)
            
            # Call the LLM
            response = self.model.chat([
                {"role": "system", "content": "You are an intelligent assistant that reasons in the ReAct style."},
                {"role": "user", "content": prompt}
            ])
            
            # Parse the response
            steps = self._parse_react_response(response.content)
            
            if not steps:
                return "Could not produce a valid action plan"
            
            # Execute only the first step; the next iteration re-plans from its observation
            step = steps[0]
            
            if step.tool_name is None:
                # Final answer
                return step.tool_args.get("answer", "")
            
            # Execute the tool and record the Thought/Observation pair
            result = self.execute_tool(step.tool_name, step.tool_args)
            self.thought_history.append(
                step.description.split(" → ")[0] if " → " in step.description else step.description
            )
            self.observation_history.append(result)
        
        return "Maximum number of iterations reached"

3.3 Hands-On: A Return-Processing Agent

# Define the return-processing tools (mock implementations returning fixed data)
def get_order(order_id: str) -> dict:
    """Look up order information"""
    return {
        "order_id": order_id,
        "status": "shipped",
        "amount": 299.00,
        "create_time": "2026-06-25",
        "can_return": True,
        "return_deadline": "2026-07-05"
    }

def validate_return_eligibility(order: dict, reason: str) -> dict:
    """Check whether the order is eligible for a return"""
    if not order["can_return"]:
        return {"eligible": False, "reason": "Order status does not allow returns"}
    if "past the return deadline" in reason:
        return {"eligible": False, "reason": "Past the return deadline"}
    return {"eligible": True, "reason": "Return allowed"}

def create_return_order(order_id: str, reason: str) -> dict:
    """Create a return order"""
    return {
        "return_id": f"RO{order_id[3:]}",  # Strip the "ORD" prefix: ORD20260625001 → RO20260625001
        "order_id": order_id,
        "status": "awaiting pickup",
        "pickup_address": "xxx, Chaoyang District, Beijing",
        "logistics_company": "SF Express",
        "pickup_code": "SF12345678"
    }

def send_notification(user_id: str, message: str, channel: str = "wechat") -> dict:
    """Send a notification"""
    return {"sent": True, "channel": channel, "message": message}

# Register the tools and run the Agent
agent = ReActAgent(
    name="Returns Assistant",
    model_client=your_model_client  # Placeholder: any client whose chat(messages) returns an object with .content
)

agent.register_tools([
    Tool("get_order", "Look up order information", {"order_id": {"type": "string"}}, get_order),
    Tool("validate_return", "Check return eligibility", {
        "order": {"type": "object"},
        "reason": {"type": "string"}
    }, validate_return_eligibility),
    Tool("create_return_order", "Create a return order", {
        "order_id": {"type": "string"},
        "reason": {"type": "string"}
    }, create_return_order),
    Tool("send_notification", "Send a notification", {
        "user_id": {"type": "string"},
        "message": {"type": "string"},
        "channel": {"type": "string"}
    }, send_notification),
])

result = agent.run("User ID 12345 wants to return order ORD20260625001 because the item arrived damaged")
print(result)
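`your_model_client` above is a placeholder. For offline testing, a scripted stand-in that only mimics the shape the agents rely on (`chat(messages)` returning an object with `.content`) can replace it. `ScriptedClient` and `ChatReply` are hypothetical names; this is a test double, not any vendor's SDK.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class ChatReply:
    content: str


class ScriptedClient:
    """Hypothetical offline stand-in for `your_model_client`: returns canned replies
    in order, mimicking only the chat(messages) -> object-with-.content shape."""

    def __init__(self, replies: List[str]) -> None:
        self._replies = list(replies)
        self.calls: List[List[Dict[str, str]]] = []  # Prompts received, for inspection

    def chat(self, messages: List[Dict[str, str]]) -> ChatReply:
        self.calls.append(messages)
        if not self._replies:
            raise RuntimeError("ScriptedClient ran out of replies")
        return ChatReply(self._replies.pop(0))


client = ScriptedClient([
    'Thought: look up the order first\nAction: get_order({"order_id": "ORD20260625001"})',
    "Thought: I have enough information\nFinal Answer: Return order created",
])
print(client.chat([{"role": "user", "content": "hi"}]).content.splitlines()[1])
```

Passed as `model_client=ScriptedClient([...])` to the ReActAgent above, these two replies should drive one `get_order` call followed by the final answer, and `client.calls` lets you inspect the exact prompts the agent built.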

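4. Plan-and-Execute: Separating Planning from Execution

4.1 How It Works

The core difference between Plan-and-Execute and ReAct: it plans all the steps up front, then executes them one by one. This suits long, complex tasks and avoids the cost and latency of calling the LLM at every step.

ReAct:  Thought→Action→Observation→Thought→Action→Observation... (think while doing)
        Problem: every step calls the LLM, so cost and latency are high

Plan-and-Execute:
  
  Phase 1 - Plan (one LLM call):
    User: "Build a quarterly sales report with year-over-year and quarter-over-quarter analysis"
    LLM plan:
      Step 1: Query Q2 sales data
      Step 2: Query Q1 sales data (quarter-over-quarter)
      Step 3: Query Q2 sales data for the previous year (year-over-year)
      Step 4: Calculate YoY/QoQ growth rates
      Step 5: Generate charts
      Step 6: Export a PDF report
  
  Phase 2 - Execute (the executor runs each step in turn):
    Executor → query database → compute figures → generate charts → export PDF
    (no LLM call per step; the executor follows the plan)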

4.2 Plan-and-Execute Implementation

class PlanAndExecuteAgent(BaseAgent):
    """
    Plan-and-Execute Agent
    Strengths: low planning cost (one LLM call), efficient execution
    Best for: long, complex tasks and multi-step workflows
    """
    def __init__(self, name: str, model_client: Any = None,
                 replan_threshold: float = 0.3):
        super().__init__(name, model_client)
        self.replan_threshold = replan_threshold  # Re-plan when more than 30% of steps fail
    
    def plan_action(self, user_input: str) -> List[PlanStep]:
        """Planning phase: generate the complete plan in one call"""
        tool_schemas = "\n".join([
            f"- {t.name}: {t.description}"
            for t in self.tools.values()
        ])
        
        prompt = f"""User request: {user_input}

Available tools:
{tool_schemas}

Break this task down into precise execution steps. Each step must:
1. Have a clear goal
2. Name the tool to use
3. Give concrete tool arguments

Return a JSON array:
[
  {{
    "step_id": "step_1",
    "description": "step description",
    "tool_name": "tool name",
    "tool_args": {{"arg": "value"}}
  }},
  ...
]

Return only the JSON, with no other text:"""
        
        response = self.model.chat([
            {"role": "user", "content": prompt}
        ])
        
        try:
            # Extract the JSON, stripping a Markdown code fence if the model added one
            content = response.content
            if "```json" in content:
                content = content.split("```json")[1].split("```")[0]
            elif "```" in content:
                content = content.split("```")[1].split("```")[0]
            
            steps_data = json.loads(content.strip())
            return [PlanStep(**s) for s in steps_data]
        except Exception:
            # Fall back to a sentinel step; run() treats tool_name=None as "no valid plan"
            return [PlanStep(
                step_id="step_1",
                description="Could not generate a valid plan",
                tool_name=None
            )]
    
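    def execute_step(self, step: PlanStep) -> str:
        """Execute a single step; raises on failure so run() can mark the step as failed"""
        if step.tool_name is None:
            return step.description
        
        if step.tool_name not in self.tools:
            raise ValueError(f"Tool '{step.tool_name}' not found")
        
        result = self.execute_tool(step.tool_name, step.tool_args or {})
        # execute_tool reports errors as "Error ..." strings instead of raising;
        # turn them into exceptions so that should_replan() can see the failure
        if result.startswith("Error"):
            raise RuntimeError(result)
        return result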
    
    def should_replan(self) -> bool:
        """Check whether the failure rate calls for re-planning"""
        if not self.plan:
            return False
        
        failed = sum(1 for s in self.plan if s.status == "failed")
        total = len(self.plan)
        
        return (failed / total) > self.replan_threshold
    
    def run(self, user_input: str, max_replan: int = 2) -> str:
        """Plan-and-Execute main loop"""
        self.stats["start_time"] = time.time()
        self.add_message("user", user_input)
        
        for attempt in range(max_replan + 1):
            # ===== Phase 1: plan =====
            self.state = AgentState.PLANNING
            self.plan = self.plan_action(user_input)
            
            if not self.plan or self.plan[0].tool_name is None:
                return "Could not generate a valid plan"
            
            # ===== Phase 2: execute =====
            self.state = AgentState.EXECUTING
            results = []
            
            for step in self.plan:
                step.status = "running"
                self.stats["total_steps"] += 1
                
                try:
                    result = self.execute_step(step)
                    step.status = "done"
                    step.result = result
                    results.append({"step": step.description, "result": result})
                except Exception as e:
                    step.status = "failed"
                    step.error = str(e)
                    self.stats["errors"] += 1
                    results.append({"step": step.description, "error": str(e)})
            
            # ===== Phase 3: check whether re-planning is needed =====
            if not self.should_replan():
                break
            else:
                # Re-plan, telling the LLM which steps failed last time
                failed_steps = [s for s in self.plan if s.status == "failed"]
                context = f"The following steps failed in the previous plan: {[s.description for s in failed_steps]}"
                user_input = f"{user_input}\n\nContext: {context}"
        
        return self._summarize_results(results)
    
    def _summarize_results(self, results: List[Dict]) -> str:
        """Summarize the execution results"""
        summary = "Execution finished:\n"
        for r in results:
            if "error" in r:
                summary += f"- {r['step']}: ❌ {r['error']}\n"
            else:
                summary += f"- {r['step']}: ✅\n"
        
        summary += f"\nSuccess rate: {self.get_stats()['success_rate']}"
        return summary
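PlanAndExecuteAgent passes each step's `tool_args` through exactly as the planner wrote them, so a later step cannot consume an earlier step's output, yet the growth calculation in the report example below needs two queried figures. One way to handle this, sketched here under an assumed `"$step_N"` placeholder convention (not part of the code above), is to resolve references against stored results at execution time. The tools and sales figures are made up for illustration.

```python
import json
from typing import Any, Callable, Dict, List

# Hypothetical tools and made-up figures for the quarterly-report example
SALES = {("Q1", 2026): 100.0, ("Q2", 2026): 120.0}
TOOLS: Dict[str, Callable[..., Any]] = {
    "query_sales_data": lambda quarter, year: SALES[(quarter, year)],
    "calculate_growth": lambda current, previous: round(
        (current - previous) / previous * 100, 1),
}

# Phase 1 output: one planning call returns the whole plan as JSON.
# "$step_N" is an assumed convention meaning "the result of step N".
PLAN_JSON = """[
  {"step_id": "step_1", "tool_name": "query_sales_data",
   "tool_args": {"quarter": "Q2", "year": 2026}},
  {"step_id": "step_2", "tool_name": "query_sales_data",
   "tool_args": {"quarter": "Q1", "year": 2026}},
  {"step_id": "step_3", "tool_name": "calculate_growth",
   "tool_args": {"current": "$step_1", "previous": "$step_2"}}
]"""


def resolve(value: Any, results: Dict[str, Any]) -> Any:
    """Replace a "$step_N" placeholder with that step's stored result."""
    if isinstance(value, str) and value.startswith("$"):
        return results[value[1:]]
    return value


def execute_plan(plan: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Phase 2: run every step in order without calling the LLM again."""
    results: Dict[str, Any] = {}
    for step in plan:
        args = {k: resolve(v, results) for k, v in step["tool_args"].items()}
        results[step["step_id"]] = TOOLS[step["tool_name"]](**args)
    return results


results = execute_plan(json.loads(PLAN_JSON))
print(results["step_3"])  # Quarter-over-quarter growth in percent
```

The planner prompt would also need to document the placeholder convention so that the LLM actually emits `"$step_1"` instead of guessing a number.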

4.3 Hands-On: Automated Quarterly Reports

# Quarterly report Agent
# query_sales_data, calculate_growth, generate_chart, export_pdf and send_email
# are assumed to be defined elsewhere; their implementations are not shown here
report_agent = PlanAndExecuteAgent(
    name="Report Assistant",
    model_client=your_model  # Placeholder for your LLM client
)

report_agent.register_tools([
    Tool("query_sales_data", "Query sales data", {
        "quarter": {"type": "string", "description": "Quarter, e.g. Q2"},
        "year": {"type": "integer"}
    }, query_sales_data),
    Tool("calculate_growth", "Calculate a growth rate", {
        "current": {"type": "number"},
        "previous": {"type": "number"}
    }, calculate_growth),
    Tool("generate_chart", "Generate a chart", {
        "data": {"type": "object"},
        "chart_type": {"type": "string"}
    }, generate_chart),
    Tool("export_pdf", "Export to PDF", {
        "content": {"type": "string"},
        "filename": {"type": "string"}
    }, export_pdf),
    Tool("send_email", "Send an email", {
        "to": {"type": "string"},
        "subject": {"type": "string"},
        "body": {"type": "string"}
    }, send_email),
])

result = report_agent.run(
    "Generate the 2026 Q2 sales report with year-over-year (vs. Q2 2025) and "
    "quarter-over-quarter (vs. Q1 2026) analysis, and send it to ceo@company.com"
)

5. Multi-Agent Collaboration Architectures

5.1 Comparing Collaboration Patterns

┌────────────────────────────────────────────────────────────────┐
│               Multi-Agent Collaboration Patterns               │
├────────────────────────────────────────────────────────────────┤
│                                                                │
│  Pattern 1: Hierarchical                                       │
│  ┌────────────┐                                                │
│  │ Supervisor │ → analyzes the task → dispatches to workers    │
│  │            │ ← collects results ←                           │
│  └─────┬──────┘                                                │
│        │                                                       │
│     ┌──┴─────┬────────┐                                        │
│     ▼        ▼        ▼                                        │
│  ┌─────┐  ┌─────┐  ┌─────┐                                     │
│  │Data │  │Code │  │Doc  │                                     │
│  │Agent│  │Agent│  │Agent│                                     │
│  └─────┘  └─────┘  └─────┘                                     │
│                                                                │
│  Pattern 2: Peer-to-Peer                                       │
│  ┌────────┐    ┌────────┐    ┌────────┐                        │
│  │Research│ ←→ │ Coder  │ ←→ │ Review │                        │
│  │ Agent  │    │ Agent  │    │ Agent  │                        │
│  └───┬────┘    └───┬────┘    └───┬────┘                        │
│      ↑             ↑             ↑                             │
│      └─────────────┴─────────────┘                             │
│               Message Bus                                      │
│                                                                │
│  Pattern 3: Debate                                             │
│  ┌─────────┐       ┌─────────┐                                 │
│  │ Agent A │  vs.  │ Agent B │  → judge LLM arbitrates         │
│  │  (pro)  │       │  (con)  │                                 │
│  └─────────┘       └─────────┘                                 │
│  Judge: weighs both sides and outputs the final verdict        │
│                                                                │
└────────────────────────────────────────────────────────────────┘

5.2 Implementing an A2A-Style Message Protocol

import asyncio
import json
import time
import uuid
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, List, Any, Optional

class MessageType(Enum):
    TASK = "task"              # Assign a task
    RESULT = "result"          # Return a result
    ERROR = "error"            # Report an error
    HEARTBEAT = "heartbeat"    # Liveness heartbeat
    QUERY = "query"            # Query status

@dataclass
class AgentMessage:
    """Message exchanged between Agents"""
    msg_id: str
    sender: str
    receiver: str
    msg_type: MessageType
    content: Any
    task_id: Optional[str] = None
    reply_to: Optional[str] = None
    timestamp: float = field(default_factory=time.time)
    metadata: Dict = field(default_factory=dict)

class AgentMessageBus:
    """
    Agent message bus (an in-process, A2A-like implementation)
    Supports: point-to-point messaging, broadcast, task dispatch
    """
    def __init__(self):
        self.agents: Dict[str, 'BaseAgent'] = {}
        self.inbox: Dict[str, List[AgentMessage]] = {}  # agent_id -> message queue
        self.pending_tasks: Dict[str, Dict] = {}  # task_id -> task status
        self.subscribers: Dict[str, List[str]] = {}  # topic -> agent_ids
    
    def register(self, agent: BaseAgent):
        """Register an Agent"""
        self.agents[agent.name] = agent
        self.inbox[agent.name] = []
    
    async def send(self, message: AgentMessage):
        """Send a message"""
        if message.receiver not in self.inbox:
            raise ValueError(f"Unknown receiver: {message.receiver}")
        
        self.inbox[message.receiver].append(message)
        
        # Track task status
        if message.task_id:
            self.pending_tasks[message.task_id] = {
                "status": "sent",
                "receiver": message.receiver,
                "msg_id": message.msg_id
            }
    
    async def broadcast(self, sender: str, content: Any, 
                       topic: Optional[str] = None):
        """Broadcast to a topic's subscribers, or to every other Agent if no topic is given"""
        if topic and topic in self.subscribers:
            targets = self.subscribers[topic]
        else:
            targets = [a for a in self.agents.keys() if a != sender]
        
        for target in targets:
            msg = AgentMessage(
                msg_id=str(uuid.uuid4()),
                sender=sender,
                receiver=target,
                msg_type=MessageType.TASK,
                content=content
            )
            await self.send(msg)
    
    def subscribe(self, agent_id: str, topic: str):
        """Subscribe to a topic"""
        if topic not in self.subscribers:
            self.subscribers[topic] = []
        self.subscribers[topic].append(agent_id)
    
    async def receive(self, agent_id: str) -> Optional[AgentMessage]:
        """Receive the oldest message (non-blocking; returns None if the inbox is empty)"""
        if not self.inbox.get(agent_id):
            return None
        
        return self.inbox[agent_id].pop(0)
    
    def get_task_status(self, task_id: str) -> Optional[Dict]:
        return self.pending_tasks.get(task_id)


class MultiAgentOrchestrator:
    """
    Multi-Agent orchestrator
    Pattern: hierarchical orchestration + result aggregation
    """
    def __init__(self, message_bus: AgentMessageBus):
        self.bus = message_bus
        self.supervisor: Optional[BaseAgent] = None
        self.worker_agents: Dict[str, BaseAgent] = {}
        self.task_results: Dict[str, Any] = {}
    
    def set_supervisor(self, agent: BaseAgent):
        self.supervisor = agent
    
    def add_worker(self, name: str, agent: BaseAgent):
        # `name` is the routing key ("data", "code", "doc", "review") used by _route_to_worker
        self.worker_agents[name] = agent
        self.bus.register(agent)
    
    async def run_hierarchical_task(self, task: str) -> str:
        """
        Hierarchical task execution
        1. The Supervisor analyzes the task
        2. Subtasks are dispatched to multiple Workers
        3. The results are aggregated
        """
        self.task_results = {}  # Reset results from any previous run
        
        # ===== Step 1: the Supervisor analyzes and plans =====
        plan = self.supervisor.plan_action(task)
        
        # ===== Step 2: dispatch subtasks to Workers =====
        worker_tasks: Dict[str, List[PlanStep]] = {}
        for step in plan:
            # Pick a suitable Worker based on the step's tool name
            worker_name = self._route_to_worker(step)
            worker_tasks.setdefault(worker_name, []).append(step)
        
        async def run_worker_tasks(name: str, steps: List[PlanStep]):
            agent = self.worker_agents[name]
            results = []
            for step in steps:
                # execute_step is synchronous; running it in a thread (Python 3.9+)
                # lets different Workers overlap instead of running back to back
                result = await asyncio.to_thread(agent.execute_step, step)
                results.append({"step": step.description, "result": result})
            self.task_results[name] = results
        
        # Run all Workers concurrently (steps within one Worker stay sequential)
        await asyncio.gather(*[
            run_worker_tasks(name, steps)
            for name, steps in worker_tasks.items()
        ])
        
        # ===== Step 3: the Supervisor aggregates the results =====
        all_results = json.dumps(self.task_results, ensure_ascii=False, default=str)
        final_prompt = f"Original task: {task}\n\nWorker results:\n{all_results}\n\nCombine all of the results into a final answer."
        
        final_response = self.supervisor.model.chat([
            {"role": "user", "content": final_prompt}
        ])
        
        return final_response.content
    
    def _route_to_worker(self, step: PlanStep) -> str:
        """Route a step to a suitable Worker by keyword-matching its tool name"""
        tool_name = (step.tool_name or "").lower()
        
        # Rules are checked in order; "write" appears under both "code" and "doc",
        # so "code" wins whenever it is registered
        routing_rules = {
            "data": ["query", "search", "fetch", "get", "calculate"],
            "code": ["write", "execute", "run", "compile", "test"],
            "doc": ["write", "format", "export", "generate_report"],
            "review": ["check", "validate", "review", "audit"],
        }
        
        for worker, keywords in routing_rules.items():
            # Only route to Workers that were actually registered
            if worker in self.worker_agents and any(kw in tool_name for kw in keywords):
                return worker
        
        return next(iter(self.worker_agents))  # Fall back to the first registered Worker


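class MultiAgentDebate:
    """
    Multi-Agent debate framework
    Agent A (pro) vs Agent B (con) → a judge arbitrates
    """
    def __init__(self, judge_model: Any, num_rounds: int = 3):
        self.judge = judge_model
        self.num_rounds = num_rounds
        self.positive_agent: Optional[BaseAgent] = None
        self.negative_agent: Optional[BaseAgent] = None
    
    def set_agents(self, positive: BaseAgent, negative: BaseAgent):
        self.positive_agent = positive
        self.negative_agent = negative
    
    def run_debate(self, topic: str) -> Dict:
        """Run the debate and return the final verdict"""
        context = {
            "topic": topic,
            "rounds": [],
            "verdict": ""
        }
        
        for round_num in range(1, self.num_rounds + 1):
            # Transcript of earlier rounds, so each side can actually respond to it
            transcript = "\n".join(
                f"Round {r['round']} pro: {r['positive']}\nRound {r['round']} con: {r['negative']}"
                for r in context["rounds"]
            ) or "(none yet)"
            
            # Pro side speaks
            pos_response = self.positive_agent.run(
                f"Debate topic: {topic}\n"
                f"Previous rounds:\n{transcript}\n"
                f"You argue FOR the proposition. Present the strongest supporting arguments.\n"
                f"This is round {round_num}; build on the earlier discussion."
            )
            
            # Con side speaks, seeing this round's pro argument
            neg_response = self.negative_agent.run(
                f"Debate topic: {topic}\n"
                f"Previous rounds:\n{transcript}\n"
                f"Pro argument this round: {pos_response}\n"
                f"You argue AGAINST the proposition. Present the strongest opposing arguments.\n"
                f"This is round {round_num}; rebut the pro side's points."
            )
            
            context["rounds"].append({
                "round": round_num,
                "positive": pos_response,
                "negative": neg_response
            })
        
        # The judge's verdict
        debate_summary = "\n\n".join([
            f"Round {r['round']}:\nPro: {r['positive']}\nCon: {r['negative']}"
            for r in context["rounds"]
        ])
        
        judge_prompt = f"""Debate topic: {topic}

Debate transcript:
{debate_summary}

As the judge, please:
1. Summarize the core arguments of the pro and con sides
2. State which side's arguments are more convincing
3. Give your final verdict and your reasoning

Return JSON in this format:
{{
  "positive_summary": "core pro arguments",
  "negative_summary": "core con arguments",
  "winner": "positive/negative/neutral",
  "reasoning": "reasoning behind the verdict"
}}"""
        
        verdict_response = self.judge.chat([{"role": "user", "content": judge_prompt}])
        
        try:
            context["verdict"] = json.loads(verdict_response.content)
        except json.JSONDecodeError:
            context["verdict"] = {"winner": "neutral", "reasoning": "Failed to parse the verdict"}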
        
        return context
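AgentMessageBus.receive() polls a list and returns None when the inbox is empty, so a worker has to keep re-checking it. As a sketch of one alternative (a swapped-in design, not part of the code above), each inbox can be an `asyncio.Queue`, letting a worker simply `await` its next message. `QueueBus` and `Envelope` are hypothetical names.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Dict


@dataclass
class Envelope:
    sender: str
    receiver: str
    content: Any


class QueueBus:
    """Hypothetical variant of AgentMessageBus with one asyncio.Queue per Agent,
    so receive() waits for a message instead of polling a list."""

    def __init__(self) -> None:
        self.inboxes: Dict[str, asyncio.Queue] = {}

    def register(self, name: str) -> None:
        self.inboxes[name] = asyncio.Queue()

    async def send(self, msg: Envelope) -> None:
        if msg.receiver not in self.inboxes:
            raise ValueError(f"Unknown receiver: {msg.receiver}")
        await self.inboxes[msg.receiver].put(msg)

    async def receive(self, name: str, timeout: float = 1.0) -> Envelope:
        # Waits for a message; raises asyncio.TimeoutError if none arrives in time
        return await asyncio.wait_for(self.inboxes[name].get(), timeout)


async def demo() -> str:
    bus = QueueBus()
    for name in ("supervisor", "data"):
        bus.register(name)

    async def data_worker() -> None:
        task = await bus.receive("data")
        await bus.send(Envelope("data", task.sender, f"done: {task.content}"))

    worker = asyncio.create_task(data_worker())
    await bus.send(Envelope("supervisor", "data", "query Q2 sales"))
    reply = await bus.receive("supervisor")
    await worker
    return reply.content


print(asyncio.run(demo()))
```

The queues are created inside the running event loop (in `demo`), which keeps the sketch safe on older Python versions where `asyncio.Queue` bound itself to a loop at construction time.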

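6. The Tool Ecosystem: Function Calling in Practice

6.1 Comparing Mainstream Frameworks

Framework | Positioning | Characteristics | Best for
LangChain | Full-stack framework | Rich components, but heavyweight | Rapid prototyping
LangGraph | Graph orchestration | State-machine-style Agent flows | Complex workflows
AutoGen | Multi-Agent | From Microsoft; conversational collaboration | Multi-Agent systems
CrewAI | Multi-Agent | Clear role-based division of labor | Enterprise scenarios
In-house | Custom | Fully controllable, no framework dependencies | Production environments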

6.2 OpenAI Function Calling

import json
import openai
from typing import List, Optional

class OpenAIFunctionAgent(BaseAgent):
    """Agent built on OpenAI Function Calling"""
    
    def __init__(self, name: str, api_key: str, model: str = "gpt-4o"):
        super().__init__(name)
        self.client = openai.OpenAI(api_key=api_key)
        self.model = model  # Note: here self.model holds the model name, not a client
    
    def run(self, user_input: str, max_turns: int = 10) -> str:
        """Function Calling main loop"""
        messages = [
            {"role": "system", "content": f"You are {self.name}, an AI assistant."}
        ]
        messages.append({"role": "user", "content": user_input})
        
        for turn in range(max_turns):
            # Call OpenAI with the tool definitions attached
            response = self.client.chat.completions.create(
                model=self.model,
                messages=messages,
                tools=self.get_tool_schemas(),
                tool_choice="auto"
            )
            
            response_message = response.choices[0].message
            
            # Check whether the model wants to call tools
            if response_message.tool_calls:
                messages.append({
                    "role": "assistant",
                    "content": response_message.content or "",
                    "tool_calls": [
                        {
                            "id": tc.id,
                            "type": "function",
                            "function": {
                                "name": tc.function.name,
                                "arguments": tc.function.arguments
                            }
                        }
                        for tc in response_message.tool_calls
                    ]
                })
                
                # Execute each requested tool
                for tc in response_message.tool_calls:
                    tool_name = tc.function.name
                    try:
                        args = json.loads(tc.function.arguments)
                    except json.JSONDecodeError:
                        args = {}  # Malformed arguments; execute_tool will then report an error
                    
                    result = self.execute_tool(tool_name, args)
                    
                    messages.append({
                        "role": "tool",
                        "tool_call_id": tc.id,
                        "content": result
                    })
            
            else:
                # 直接回答
                messages.append({
                    "role": "assistant",
                    "content": response_message.content
                })
                return response_message.content
        
        return "达到最大对话轮次"
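`get_tool_schemas()` and `execute_tool()` are inherited from `BaseAgent` and are not shown in this section. As an assumption about what they might look like, here is a self-contained sketch of one tool schema in the Chat Completions `tools` format plus a dispatcher that turns a tool call's JSON argument string into a result string; `get_weather`, `WEATHER_SCHEMA`, `REGISTRY` and `dispatch` are hypothetical names.

```python
import json
from typing import Any, Callable, Dict


def get_weather(city: str, unit: str = "celsius") -> str:
    """Hypothetical tool: a real one would call a weather API."""
    return json.dumps({"city": city, "temp": 21, "unit": unit})


# Tool definition in the Chat Completions `tools` format (a JSON Schema for the parameters)
WEATHER_SCHEMA: Dict[str, Any] = {
    "type": "function",
    "function": {
        "name": "get_weather",
        "description": "Get the current weather for a city",
        "parameters": {
            "type": "object",
            "properties": {
                "city": {"type": "string", "description": "City name"},
                "unit": {"type": "string", "enum": ["celsius", "fahrenheit"]},
            },
            "required": ["city"],
        },
    },
}

REGISTRY: Dict[str, Callable[..., str]] = {"get_weather": get_weather}


def dispatch(name: str, arguments: str) -> str:
    """Turn one tool call (function name + JSON argument string) into a result string."""
    func = REGISTRY.get(name)
    if func is None:
        return f"Error: unknown tool '{name}'"
    try:
        args = json.loads(arguments)
    except json.JSONDecodeError as exc:
        return f"Error: invalid JSON arguments ({exc})"
    try:
        return func(**args)
    except TypeError as exc:  # missing or unexpected parameters
        return f"Error: bad arguments ({exc})"


print(dispatch("get_weather", '{"city": "Beijing"}'))
# {"city": "Beijing", "temp": 21, "unit": "celsius"}
```

Returning errors as strings instead of raising lets the model see what went wrong and retry with corrected arguments on its next turn.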

6.3 Best Practices for Tool Calling

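# ===== Practice 1: tool calls that require user confirmation =====
class ConfirmedFunctionAgent(OpenAIFunctionAgent):
    """Asks the user before running any tool marked requires_confirmation"""
    
    def _ask_user_confirmation(self, prompt: str) -> bool:
        # Console placeholder; a real service would pause here and wait for the user's answer
        return input(prompt + " ").strip().lower() in ("y", "yes")
    
    def run(self, user_input: str, max_turns: int = 10) -> str:
        messages = [{"role": "user", "content": user_input}]
        
        for _ in range(max_turns):
            response = self.client.chat.completions.create(
                model=self.model,
                messages=messages,
                tools=self.get_tool_schemas()
            )
            
            msg = response.choices[0].message
            
            if not msg.tool_calls:
                return msg.content or ""
            
            # Record the assistant turn that requested the tools
            messages.append({
                "role": "assistant",
                "content": msg.content or "",
                "tool_calls": [
                    {"id": tc.id, "type": "function",
                     "function": {"name": tc.function.name, "arguments": tc.function.arguments}}
                    for tc in msg.tool_calls
                ]
            })
            
            for tc in msg.tool_calls:
                tool = self.tools.get(tc.function.name)
                
                if tool and tool.requires_confirmation:
                    confirmation_prompt = (
                        f"About to run: {tool.name}\n"
                        f"Arguments: {tc.function.arguments}\n"
                        f"Confirm? (yes/no)"
                    )
                    if not self._ask_user_confirmation(confirmation_prompt):
                        # Every tool_call_id still needs a tool message, so report the cancellation
                        messages.append({
                            "role": "tool",
                            "tool_call_id": tc.id,
                            "content": f"The user cancelled the tool call: {tool.name}"
                        })
                        continue
                
                result = self.execute_tool(
                    tc.function.name,
                    json.loads(tc.function.arguments)
                )
                messages.append({
                    "role": "tool",
                    "tool_call_id": tc.id,
                    "content": str(result)
                })
        
        return "Reached the maximum number of turns"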


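# ===== Practice 2: running a batch of tool calls in parallel =====
from concurrent.futures import ThreadPoolExecutor

class ParallelFunctionAgent(OpenAIFunctionAgent):
    """Agent that executes all tool calls from one model turn concurrently"""
    
    def run(self, user_input: str, max_turns: int = 10) -> str:
        messages = [{"role": "user", "content": user_input}]
        
        for _ in range(max_turns):
            response = self.client.chat.completions.create(
                model=self.model,
                messages=messages,
                tools=self.get_tool_schemas()
            )
            
            msg = response.choices[0].message
            
            if msg.tool_calls:
                # Collect every tool call from this turn
                tool_calls = msg.tool_calls
                messages.append({
                    "role": "assistant",
                    "content": msg.content or "",
                    "tool_calls": [
                        {"id": tc.id, "type": "function",
                         "function": {"name": tc.function.name, "arguments": tc.function.arguments}}
                        for tc in tool_calls
                    ]
                })
                
                # execute_tool is synchronous, so run the calls in a thread pool.
                # Passing plain return values to asyncio.gather would not run them concurrently,
                # and asyncio.run() fails when an event loop is already running (e.g. in FastAPI).
                with ThreadPoolExecutor(max_workers=len(tool_calls)) as pool:
                    results = list(pool.map(
                        lambda tc: self.execute_tool(
                            tc.function.name, json.loads(tc.function.arguments)
                        ),
                        tool_calls
                    ))
                
                # pool.map preserves input order, so results line up with tool_calls
                for tc, result in zip(tool_calls, results):
                    messages.append({
                        "role": "tool",
                        "tool_call_id": tc.id,
                        "content": str(result)
                    })
            
            else:
                return msg.content or ""
        
        return "Reached the maximum number of turns"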
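Both practices rest on one API rule: every `tool_call_id` in an assistant turn needs a matching `tool` message, even when the call is cancelled. The sketch below combines confirmation gating and concurrent execution without the OpenAI SDK, assuming tools are plain blocking Python functions; `ToolCall`, `TOOLS`, `NEEDS_CONFIRMATION` and `run_tool_calls` are hypothetical. It uses `asyncio.to_thread` (Python 3.9+) so that blocking tools genuinely overlap.

```python
import asyncio
import json
import time
from dataclasses import dataclass
from typing import Callable, Dict, List, Set


@dataclass
class ToolCall:
    """Hypothetical stand-in for the SDK's tool-call object."""
    id: str
    name: str
    arguments: str  # JSON-encoded arguments, as the model returns them


def slow_lookup(key: str) -> str:
    time.sleep(0.3)  # simulate a blocking API call
    return f"value-of-{key}"


def delete_record(key: str) -> str:
    return f"deleted-{key}"


TOOLS: Dict[str, Callable[..., str]] = {"slow_lookup": slow_lookup, "delete_record": delete_record}
NEEDS_CONFIRMATION: Set[str] = {"delete_record"}


async def run_tool_calls(calls: List[ToolCall],
                         confirm: Callable[[ToolCall], bool]) -> List[Dict[str, str]]:
    """Return exactly one tool message per call, in the original order."""

    async def run_one(call: ToolCall) -> str:
        if call.name in NEEDS_CONFIRMATION and not confirm(call):
            # A cancelled call still gets a tool message for its tool_call_id
            return f"The user cancelled the tool call: {call.name}"
        args = json.loads(call.arguments)
        # to_thread runs the blocking tool in a worker thread
        return await asyncio.to_thread(TOOLS[call.name], **args)

    # gather runs the coroutines concurrently and returns results in input order
    results = await asyncio.gather(*(run_one(c) for c in calls))
    return [{"role": "tool", "tool_call_id": c.id, "content": r}
            for c, r in zip(calls, results)]


calls = [
    ToolCall("call_1", "slow_lookup", '{"key": "a"}'),
    ToolCall("call_2", "slow_lookup", '{"key": "b"}'),
    ToolCall("call_3", "delete_record", '{"key": "c"}'),
]
start = time.perf_counter()
# confirm is a plain callback here; this user declines every confirmation
messages = asyncio.run(run_tool_calls(calls, confirm=lambda call: False))
elapsed = time.perf_counter() - start  # the two 0.3 s lookups overlap
for m in messages:
    print(m["tool_call_id"], m["content"])
```

`asyncio.run()` is only for a script's top level; inside an async server such as FastAPI you would `await run_tool_calls(...)` directly.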

7. Memory System: A Deep-Dive Implementation

7.1 Hybrid Memory Architecture

import uuid
import zlib
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional

import numpy as np

class VectorMemory:
    """
    Vector memory system (long-term memory).
    Uses brute-force vector similarity (in production use Milvus/Pinecone/FAISS).
    """
    def __init__(self, embedding_model: str = "text-embedding-3-small",
                 similarity_threshold: float = 0.7):
        self.embedding_model = embedding_model
        self.similarity_threshold = similarity_threshold
        self.memories: List[Dict] = []  # in-memory storage
        
        # Simple "vector database": embeddings[i] belongs to memories[i]
        self.embeddings: List[np.ndarray] = []
    
    def add(self, content: str, metadata: Optional[Dict] = None) -> str:
        """Add a memory"""
        memory_id = str(uuid.uuid4())
        
        # Compute the embedding
        embedding = self._get_embedding(content)
        
        self.memories.append({
            "id": memory_id,
            "content": content,
            "metadata": metadata or {},
            "created_at": datetime.now().isoformat(),
            "access_count": 0,
            "last_accessed": None
        })
        self.embeddings.append(embedding)
        
        return memory_id
    
    def search(self, query: str, top_k: int = 5) -> List[Dict]:
        """Semantic search"""
        query_embedding = self._get_embedding(query)
        
        # Cosine similarity against every stored embedding
        scores = []
        for emb in self.embeddings:
            score = self._cosine_similarity(query_embedding, emb)
            scores.append(score)
        
        # Sort by score, descending, and keep top_k
        top_indices = np.argsort(scores)[::-1][:top_k]
        
        results = []
        for idx in top_indices:
            if scores[idx] >= self.similarity_threshold:
                self.memories[idx]["access_count"] += 1
                self.memories[idx]["last_accessed"] = datetime.now().isoformat()
                results.append({
                    **self.memories[idx],
                    "score": float(scores[idx])
                })
        
        return results
    
    def _get_embedding(self, text: str) -> np.ndarray:
        """Return an embedding for text (simplified placeholder)"""
        # In production call an embedding API, e.g. with the OpenAI SDK:
        # openai_client.embeddings.create(input=text, model=self.embedding_model).data[0].embedding
        # Placeholder: hashed bag-of-words, so texts sharing words get similar vectors.
        # (Random vectors would make search useless: they almost never reach the threshold.)
        vec = np.zeros(1536)
        for token in text.lower().split():
            vec[zlib.crc32(token.encode("utf-8")) % 1536] += 1.0
        return vec
    
    def _cosine_similarity(self, a: np.ndarray, b: np.ndarray) -> float:
        return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b) + 1e-8))
    
    def forget_old(self, days: int = 30, access_threshold: int = 2):
        """Forget old memories that are rarely accessed"""
        cutoff = datetime.now() - timedelta(days=days)
        
        new_memories = []
        new_embeddings = []
        
        for mem, emb in zip(self.memories, self.embeddings):
            created = datetime.fromisoformat(mem["created_at"])
            if created < cutoff and mem["access_count"] < access_threshold:
                continue  # forget it
            new_memories.append(mem)
            new_embeddings.append(emb)
        
        self.memories = new_memories
        self.embeddings = new_embeddings


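class AgentMemoryManager:
    """
    Agent memory manager: combines short-term, long-term and working memory.
    Message is the (role, content) message type used earlier in this article.
    """
    def __init__(self, vector_memory: VectorMemory):
        self.short_term: List[Message] = []  # current conversation
        self.working: Dict[str, Any] = {}    # scratchpad for the current task
        self.long_term: VectorMemory = vector_memory
        
        self.max_short_term = 50
    
    def remember(self, query: str) -> Dict[str, Any]:
        """Retrieve memories relevant to query from all three stores"""
        # 1. Long-term memory: semantic search
        relevant = self.long_term.search(query, top_k=3)
        
        # 2. Short-term memory: naive keyword match on the first 3 words of the query
        keywords = query.split()[:3]
        short_matches = [
            m for m in self.short_term
            if any(word in m.content for word in keywords)
        ]
        
        return {
            "long_term": relevant,
            "short_term": short_matches[-5:],  # the 5 most recent matches
            "working": self.working
        }
    
    def memorize(self, content: str, importance: str = "normal"):
        """Store a memory"""
        # Everything goes into short-term memory
        self.short_term.append(Message(role="system", content=content))
        
        # Compress once the limit is exceeded
        if len(self.short_term) > self.max_short_term:
            self._compress_short_term()
        
        # High-importance items are also persisted to long-term memory
        if importance == "high":
            self.long_term.add(content, metadata={"importance": "high"})
    
    def _compress_short_term(self, keep_recent: int = 10):
        """Compress short-term memory: keep the latest messages, fold the rest into a summary"""
        if len(self.short_term) <= keep_recent:
            return
        
        recent = self.short_term[-keep_recent:]
        old = self.short_term[:-keep_recent]
        
        # Naive "summary": truncated text of the first 5 old messages.
        # A production system would typically ask the LLM to summarize all of `old`.
        summary = f"[Session summary, {len(old)} messages]: " + " ".join(
            m.content[:100] for m in old[:5]
        )
        
        self.long_term.add(summary, metadata={"type": "session_summary"})
        self.short_term = recent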
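`VectorMemory.search` computes one cosine score per stored vector in a Python loop. A minimal NumPy sketch of the same top-k-with-threshold retrieval done as a single matrix-vector product, assuming the embeddings are stacked as rows of a 2-D array; `top_k_cosine` is a hypothetical helper, and a vector database would replace it in production.

```python
import numpy as np


def top_k_cosine(query: np.ndarray, matrix: np.ndarray, k: int, threshold: float):
    """Return (row_index, score) pairs for the k rows of `matrix` most similar to `query`,
    dropping any whose cosine similarity is below `threshold`."""
    # One matrix-vector product gives every dot product; dividing by the norms turns them into cosines
    norms = np.linalg.norm(matrix, axis=1) * np.linalg.norm(query) + 1e-8
    scores = matrix @ query / norms
    order = np.argsort(-scores)[:k]  # indices sorted by descending score
    return [(int(i), float(scores[i])) for i in order if scores[i] >= threshold]


# Three stored "memories" as rows, and a query pointing along the first axis
memory = np.array([
    [1.0, 0.0, 0.0],
    [0.8, 0.6, 0.0],
    [0.0, 0.0, 1.0],
])
query = np.array([1.0, 0.0, 0.0])

hits = top_k_cosine(query, memory, k=3, threshold=0.7)
print([(i, round(s, 4)) for i, s in hits])  # [(0, 1.0), (1, 0.8)]
```

The third memory is orthogonal to the query (score 0), so the threshold removes it even though `k=3` would otherwise include it.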

8. Production Deployment and Monitoring

8.1 Deploying the Agent as a Service

# FastAPI agent service
import uuid
from typing import Dict, Optional

import uvicorn
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

app = FastAPI(title="AI Agent Service", version="1.0")

# Global agent instances keyed by agent_type (use an object pool in production:
# concurrent requests for the same type currently share one instance).
# BaseAgent is defined earlier; create_agent is a factory that is not shown here.
agent_pool: Dict[str, BaseAgent] = {}

class TaskRequest(BaseModel):
    task: str
    agent_type: str = "react"
    max_iterations: int = 10
    user_id: Optional[str] = None

class TaskResponse(BaseModel):
    result: str
    stats: Dict
    task_id: str

# Declared with plain `def`: agent.run() blocks, and FastAPI runs sync endpoints
# in a worker thread instead of stalling the event loop.
@app.post("/agent/run", response_model=TaskResponse)
def run_agent_task(request: TaskRequest):
    """Run an agent task"""
    task_id = str(uuid.uuid4())
    
    # Look up the agent, creating it on first use
    agent = agent_pool.get(request.agent_type)
    if not agent:
        agent = create_agent(request.agent_type)
        agent_pool[request.agent_type] = agent
    
    try:
        result = agent.run(request.task, max_iterations=request.max_iterations)
        return TaskResponse(
            result=result,
            stats=agent.get_stats(),
            task_id=task_id
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/agent/{agent_type}/stats")
async def get_agent_stats(agent_type: str):
    """Get an agent's runtime statistics"""
    agent = agent_pool.get(agent_type)
    if not agent:
        raise HTTPException(status_code=404, detail="Agent not found")
    return agent.get_stats()

@app.post("/agent/{agent_type}/tool/{tool_name}")
def call_tool(agent_type: str, tool_name: str, args: dict):
    """Call a tool directly (args is read from the JSON request body)"""
    agent = agent_pool.get(agent_type)
    if not agent:
        raise HTTPException(status_code=404, detail="Agent not found")
    
    if tool_name not in agent.tools:
        raise HTTPException(status_code=404, detail=f"Tool '{tool_name}' not found")
    
    result = agent.execute_tool(tool_name, args)
    return {"result": result}

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
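The service keeps a single agent per `agent_type`, so concurrent requests share one instance, and its comment recommends an object pool for production. One minimal way to build such a pool with only the standard library, assuming agents are cheap enough to pre-create and that a request may wait up to a timeout for a free one; `ObjectPool` and `FakeAgent` are hypothetical.

```python
import queue
import threading
import time
from contextlib import contextmanager
from typing import Callable, Generic, Iterator, List, TypeVar

T = TypeVar("T")


class ObjectPool(Generic[T]):
    """Fixed-size pool: each caller borrows an instance exclusively, then returns it."""

    def __init__(self, factory: Callable[[], T], size: int):
        self._items: "queue.Queue[T]" = queue.Queue(maxsize=size)
        for _ in range(size):
            self._items.put(factory())

    @contextmanager
    def borrow(self, timeout: float = 5.0) -> Iterator[T]:
        # Blocks until an instance is free; raises queue.Empty after `timeout` seconds
        item = self._items.get(timeout=timeout)
        try:
            yield item
        finally:
            self._items.put(item)  # return it even if the request failed


class FakeAgent:
    """Hypothetical agent whose per-run state breaks if two requests share it."""

    def __init__(self) -> None:
        self.busy = False

    def run(self, task: str) -> str:
        if self.busy:
            raise RuntimeError("agent shared between concurrent requests")
        self.busy = True
        try:
            time.sleep(0.01)  # simulate work, widening the window for overlap
            return f"done: {task}"
        finally:
            self.busy = False


pool: ObjectPool[FakeAgent] = ObjectPool(FakeAgent, size=2)
results: List[str] = []


def handle(task: str) -> None:
    with pool.borrow() as agent:
        results.append(agent.run(task))


threads = [threading.Thread(target=handle, args=(f"t{i}",)) for i in range(6)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(sorted(results))  # ['done: t0', 'done: t1', 'done: t2', 'done: t3', 'done: t4', 'done: t5']
```

In the endpoint, `with pool.borrow() as agent:` would take the place of the `agent_pool.get(...)` lookup, and a `queue.Empty` after the timeout could be mapped to an HTTP 503.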

8.2 Agent Monitoring Metrics

# Prometheus monitoring metrics
import time

from prometheus_client import Counter, Gauge, Histogram

# Counters
agent_requests_total = Counter(
    'agent_requests_total',
    'Total agent requests',
    ['agent_type', 'status']
)

tool_calls_total = Counter(
    'agent_tool_calls_total',
    'Total tool calls',
    ['tool_name', 'status']
)

# Token usage only ever increases, so it is a Counter rather than a Gauge
# (prometheus_client exports it as agent_token_usage_total)
token_usage = Counter(
    'agent_token_usage',
    'Token usage by agent',
    ['agent_type', 'token_type']  # token_type: input/output
)

# Histogram
agent_duration = Histogram(
    'agent_request_duration_seconds',
    'Agent request duration',
    ['agent_type'],
    buckets=[0.1, 0.5, 1, 2, 5, 10, 30, 60]
)

# Gauge
active_agents = Gauge(
    'agent_active_count',
    'Number of agent requests currently in progress'
)

class AgentMetrics:
    """Agent metrics collector. It stores a single start_time, so use one instance per request."""
    
    def __init__(self, agent_name: str):
        self.agent_name = agent_name
        self.start_time = None
    
    def track_request(self):
        self.start_time = time.perf_counter()
        active_agents.inc()
    
    def track_completion(self, status: str = "success"):
        agent_requests_total.labels(
            agent_type=self.agent_name,
            status=status
        ).inc()
        
        if self.start_time is not None:
            duration = time.perf_counter() - self.start_time
            agent_duration.labels(self.agent_name).observe(duration)
        
        active_agents.dec()
    
    def track_tool_call(self, tool_name: str, status: str):
        tool_calls_total.labels(tool_name=tool_name, status=status).inc()
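`AgentMetrics` only decrements the in-progress gauge if `track_completion` is reached, so an exception in between leaves the gauge too high. `prometheus_client` already provides context managers for this, `Gauge.track_inprogress()` and `Histogram.time()`. A small sketch that uses them plus `try`/`finally` so every request is counted exactly once; the `demo_*` metric names and `run_with_metrics` are illustrative, and a private `CollectorRegistry` avoids clashing with the metrics above.

```python
from typing import Callable, TypeVar

from prometheus_client import CollectorRegistry, Counter, Gauge, Histogram

T = TypeVar("T")

# A private registry so this demo does not collide with the global metrics
registry = CollectorRegistry()
requests_total = Counter("demo_agent_requests", "Agent requests",
                         ["agent_type", "status"], registry=registry)
in_progress = Gauge("demo_agent_in_progress", "Agent requests in progress",
                    registry=registry)
duration = Histogram("demo_agent_duration_seconds", "Agent request duration",
                     ["agent_type"], registry=registry)


def run_with_metrics(agent_type: str, fn: Callable[[], T]) -> T:
    """Run fn() and record in-flight count, latency and outcome, even if fn raises."""
    status = "error"
    # track_inprogress() increments on entry and decrements on exit;
    # time() observes the elapsed seconds on exit, exception or not
    with in_progress.track_inprogress(), duration.labels(agent_type).time():
        try:
            result = fn()
            status = "success"
            return result
        finally:
            requests_total.labels(agent_type=agent_type, status=status).inc()


run_with_metrics("react", lambda: "ok")
try:
    run_with_metrics("react", lambda: 1 / 0)
except ZeroDivisionError:
    pass

print(registry.get_sample_value(
    "demo_agent_requests_total", {"agent_type": "react", "status": "success"}))  # 1.0
```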

9. Summary

Agent Development Roadmap

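Level 1: Basic agent
  - Single agent, ReAct pattern
  - A few tools, fixed workflow
  → Suited for: simple Q&A, customer-service bots

Level 2: Planning agent
  - Plan-and-Execute
  - Multi-step workflows
  - Short-term + long-term memory
  → Suited for: complex business automation, data processing

Level 3: Multi-agent collaboration
  - Communication via the A2A protocol
  - Hierarchical / peer / debate patterns
  - Agent registration and discovery
  → Suited for: enterprise applications, large workflows

Level 4: Production-grade agent system
  - Deployed as a service
  - Thorough monitoring and observability
  - Rate limiting / circuit breaking / retries
  → Suited for: commercial products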
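Level 4 lists rate limiting, circuit breaking and retries without showing them. As a rough illustration of the last two (a sketch, not any particular library's API), here is a retry helper with exponential backoff wrapped around a minimal circuit breaker; `CircuitBreaker`, `CircuitOpenError`, `retry` and `flaky_llm_call` are hypothetical, and the clock and sleep functions are injectable so the behavior can be checked without actually waiting.

```python
import time
from typing import Callable, List, Optional, TypeVar

T = TypeVar("T")


class CircuitOpenError(RuntimeError):
    """Raised when the breaker is open and the call is rejected without being attempted."""


class CircuitBreaker:
    """Opens after `max_failures` consecutive failures, rejects calls for `reset_after`
    seconds, then lets a trial call through (half-open); one more failure re-opens it."""

    def __init__(self, max_failures: int = 3, reset_after: float = 30.0,
                 clock: Callable[[], float] = time.monotonic):
        self.max_failures = max_failures
        self.reset_after = reset_after
        self.clock = clock
        self.failures = 0
        self.opened_at: Optional[float] = None

    def call(self, fn: Callable[[], T]) -> T:
        if self.opened_at is not None:
            if self.clock() - self.opened_at < self.reset_after:
                raise CircuitOpenError("circuit open, call skipped")
            # Half-open: allow this call, but a single failure trips the breaker again
            self.opened_at = None
            self.failures = self.max_failures - 1
        try:
            result = fn()
        except Exception:
            self.failures += 1
            if self.failures >= self.max_failures:
                self.opened_at = self.clock()
            raise
        self.failures = 0  # any success closes the circuit
        return result


def retry(fn: Callable[[], T], attempts: int = 3, base_delay: float = 0.5,
          sleep: Callable[[float], None] = time.sleep) -> T:
    """Call fn up to `attempts` times, waiting base_delay * 2**n between tries."""
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    for attempt in range(attempts):
        try:
            return fn()
        except CircuitOpenError:
            raise  # retrying cannot help while the breaker is open
        except Exception:
            if attempt == attempts - 1:
                raise
            sleep(base_delay * 2 ** attempt)
    raise AssertionError("unreachable")


# Demo with a fake clock and a "sleep" that just records the requested delays
now = [0.0]
breaker = CircuitBreaker(max_failures=2, reset_after=10.0, clock=lambda: now[0])
delays: List[float] = []


def flaky_llm_call() -> str:
    raise TimeoutError("upstream timeout")  # hypothetical always-failing dependency


try:
    retry(lambda: breaker.call(flaky_llm_call), attempts=3, sleep=delays.append)
except CircuitOpenError as exc:
    print("rejected:", exc)  # rejected: circuit open, call skipped
print(delays)  # [0.5, 1.0]
```

Putting the breaker inside the retry loop means retries stop as soon as the breaker opens, instead of piling more load onto a dependency that is already failing.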

2026 Developer Action Checklist

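Priority | Skill | Tools
⭐⭐⭐ | RAG engineering | Milvus/Pinecone
⭐⭐⭐ | Agent frameworks | LangChain/LangGraph/CrewAI
⭐⭐ | Multi-agent collaboration | A2A protocol, message bus
⭐⭐ | Prompt engineering | Structured output, few-shot
 | Model fine-tuning | LoRA/QLoRA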