前言

💡 痛点: AI Agent Demo 很炫但上线就崩?多轮对话上下文丢失?Agent 调用工具链成功率低?生产环境成本失控?如何从 PoC 走到百万用户级部署?

🎯 解决方案: 从 Agent 架构设计→工具链编排→状态管理→成本优化→可观测性→灰度发布→多租户→生产级安全,覆盖 Agent 商业落地的每个环节。

部署层

治理层

运行层

开发层

Agent 设计
规划/记忆/工具

工具链编排
LangGraph/CrewAI

评估测试
准确率/延迟/成本

状态管理
会话/记忆/持久化

任务队列
优先级/重试

路由调度
模型路由/负载均衡

可观测性
Tracing/Metrics/日志

成本控制
Token预算/缓存/模型降级

安全护栏
输入校验/输出审核

灰度发布
A/B测试/金丝雀

弹性伸缩
HPA/KEDA

多租户
隔离/配额

2026 AI Agent 生产部署格局:

| 方案 | 适用场景 | 规模 | 成本 |
| --- | --- | --- | --- |
| LangGraph Cloud | 复杂多步 Agent | 中大型 | $0.05-0.5/次 |
| Dify Cloud | 低代码 Agent 平台 | 中小型 | 按用量 |
| 自建(FastAPI+vLLM) | 完全定制 | 大型 | 自控 |
| AWS Bedrock Agents | 云原生 | 大型 | 按用量 |
| 阿里云百炼 | 国内企业 | 大型 | 按用量 |

一、Agent 生产架构设计

1.1 生产级 Agent 框架

# ===== 生产级 Agent 框架 =====

from __future__ import annotations
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Callable
from enum import Enum
import time
import asyncio
import json
from datetime import datetime
import hashlib

class AgentRole(Enum):
    """Role an agent plays; each member's value is its lowercase name."""

    PLANNER = "planner"  # planning
    EXECUTOR = "executor"  # execution
    REVIEWER = "reviewer"  # review
    ROUTER = "router"  # routing

@dataclass
class AgentMessage:
    """One message in an agent conversation, with usage accounting.

    Only ``role`` and ``content`` are required. The list/dict fields get a
    fresh container per instance, and ``timestamp`` defaults to
    ``time.time()`` at construction.
    """

    role: str
    content: str
    # Tool invocations attached to this message, and their results.
    tool_calls: List[Dict] = field(default_factory=list)
    tool_results: List[Dict] = field(default_factory=list)
    # Free-form extra data.
    metadata: Dict[str, Any] = field(default_factory=dict)
    # Creation time, seconds since the epoch.
    timestamp: float = field(default_factory=time.time)
    # Usage attributed to this message.
    token_count: int = 0
    cost_usd: float = 0.0

@dataclass
class AgentState:
    """Mutable state carried through one agent run.

    Only ``session_id`` is required; every other field starts from the
    default shown below.
    """

    session_id: str
    # Conversation so far.
    messages: List[AgentMessage] = field(default_factory=list)
    # Scratch space shared between steps and agents.
    variables: Dict[str, Any] = field(default_factory=dict)
    # Step counter and its ceiling.
    current_step: int = 0
    max_steps: int = 10
    # Accumulated usage.
    total_tokens: int = 0
    total_cost: float = 0.0
    # One of: idle / running / completed / failed / cancelled
    status: str = "idle"

class BaseAgent(ABC):
    """Abstract base class for production agents.

    Stores the agent's identity and limits, a registry of callable tools
    together with their function-calling schemas, and lifecycle hooks.
    Subclasses implement :meth:`run`.
    """

    def __init__(
        self,
        name: str,
        role: AgentRole,
        model: str = "gpt-4o",
        max_tokens_per_turn: int = 4096,
        max_steps: int = 10,
        budget_usd: float = 1.0,
    ):
        """Initialise the agent.

        Args:
            name: Display name of the agent.
            role: The agent's role.
            model: Model identifier to use.
            max_tokens_per_turn: Stored as ``self.max_tokens``.
            max_steps: Stored as ``self.max_steps``.
            budget_usd: Spending ceiling checked by :meth:`_check_budget`.
        """
        self.name = name
        self.role = role
        self.model = model
        self.max_tokens = max_tokens_per_turn
        self.max_steps = max_steps
        self.budget = budget_usd

        # Tool registry: name -> callable, plus one schema per tool name.
        self.tools: Dict[str, Callable] = {}
        self.tool_schemas: List[Dict] = []

        # Lifecycle hooks, keyed by event name.
        self._hooks: Dict[str, List[Callable]] = {
            "before_run": [],
            "after_run": [],
            "before_tool_call": [],
            "after_tool_call": [],
            "on_error": [],
        }

    def register_tool(
        self,
        name: str,
        func: Callable,
        description: str,
        parameters: Dict,
    ):
        """Register (or replace) a tool and its function-calling schema.

        Re-registering an existing name replaces both the handler and the
        schema in place, so ``tool_schemas`` never holds two entries with
        the same function name.
        """
        schema = {
            "type": "function",
            "function": {
                "name": name,
                "description": description,
                "parameters": parameters,
            }
        }
        for index, existing in enumerate(self.tool_schemas):
            if existing["function"]["name"] == name:
                self.tool_schemas[index] = schema
                break
        else:
            self.tool_schemas.append(schema)
        self.tools[name] = func

    def add_hook(self, event: str, callback: Callable):
        """Attach ``callback`` to a lifecycle event.

        Unknown event names are ignored. The callback is invoked as
        ``callback(agent, **kwargs)`` and may be sync or async.
        """
        if event in self._hooks:
            self._hooks[event].append(callback)

    async def _call_hooks(self, event: str, **kwargs):
        """Run every hook registered for ``event``, in registration order."""
        import inspect

        for hook in self._hooks.get(event, []):
            outcome = hook(self, **kwargs)
            # Only await real awaitables, so plain functions work as hooks too.
            if inspect.isawaitable(outcome):
                await outcome

    @abstractmethod
    async def run(self, state: AgentState) -> AgentState:
        """Run the agent on ``state`` and return it (implemented by subclasses)."""
        pass

    def _check_budget(self, state: AgentState) -> bool:
        """Return True while the state's accumulated cost is below the budget."""
        return state.total_cost < self.budget

    def _check_steps(self, state: AgentState) -> bool:
        """Return True while the state has steps left (uses ``state.max_steps``)."""
        return state.current_step < state.max_steps

# ===== 规划 Agent =====

class PlannerAgent(BaseAgent):
    """Planning agent: breaks the user's request into executable steps."""

    def __init__(self, **kwargs):
        """Create the planner.

        ``name`` and ``role`` are fixed; ``model`` defaults to ``"gpt-4o"``
        but may be overridden. Remaining keyword arguments are forwarded to
        :class:`BaseAgent`.
        """
        # setdefault avoids a "multiple values for keyword 'model'" TypeError
        # when the caller supplies its own model.
        kwargs.setdefault("model", "gpt-4o")
        super().__init__(
            name="Planner",
            role=AgentRole.PLANNER,
            **kwargs
        )

    async def run(self, state: AgentState) -> AgentState:
        """Generate a plan and store it in ``state.variables["plan"]``.

        Fires ``before_run`` / ``after_run`` hooks around the work. If
        building the prompt or calling the LLM raises, the state is marked
        ``"failed"``, ``on_error`` hooks receive ``state`` and ``error``,
        and the exception is re-raised.
        """
        await self._call_hooks("before_run", state=state)

        state.status = "running"
        state.current_step += 1

        try:
            prompt = self._build_planning_prompt(state)
            response = await self._call_llm(prompt)
        except Exception as exc:
            # Do not leave the state stuck in "running" after a failure.
            state.status = "failed"
            await self._call_hooks("on_error", state=state, error=exc)
            raise

        plan = self._parse_plan(response)
        state.variables["plan"] = plan

        state.messages.append(AgentMessage(
            role="assistant",
            content=response,
            metadata={"type": "plan", "steps": len(plan)},
        ))

        state.status = "completed"
        await self._call_hooks("after_run", state=state)
        return state

    def _build_planning_prompt(self, state: AgentState) -> str:
        """Build the planning prompt from the latest user message and tool names."""
        last_user_msg = next(
            (m for m in reversed(state.messages) if m.role == "user"),
            None
        )

        return f"""你是一个任务规划专家。请将用户的请求分解为可执行的步骤。

用户请求:{last_user_msg.content if last_user_msg else '无'}

可用工具:{json.dumps([s["function"]["name"] for s in self.tool_schemas])}

请输出 JSON 格式的计划:
{{
    "goal": "总体目标",
    "steps": [
        {{
            "id": 1,
            "action": "步骤描述",
            "tool": "使用的工具名(可选)",
            "depends_on": [],
            "expected_output": "预期输出"
        }}
    ]
}}

只输出 JSON,不要其他内容。"""

    async def _call_llm(self, prompt: str) -> str:
        """Call the LLM. This is a stub that returns a fixed JSON plan."""
        # Simulated implementation.
        return '{"goal":"完成用户请求","steps":[{"id":1,"action":"分析需求","tool":null}]}'

    def _parse_plan(self, response: str) -> List[Dict]:
        """Extract the ``steps`` list from the model's JSON response.

        Returns an empty list when the response is not valid JSON, is not a
        JSON object, or its ``steps`` entry is not a list.
        """
        try:
            data = json.loads(response)
        except (json.JSONDecodeError, TypeError):
            return []
        if not isinstance(data, dict):
            return []
        steps = data.get("steps", [])
        return steps if isinstance(steps, list) else []

1.2 工具链编排

# ===== 工具链编排引擎 =====

from typing import Any, Dict, List, Optional
import asyncio

@dataclass
class ToolResult:
    """Outcome of a single tool execution."""

    tool_name: str
    success: bool
    # Value returned by the tool.
    result: Any
    # Failure description, when there is one.
    error: Optional[str] = None
    # Execution time in milliseconds.
    duration_ms: int = 0
    tokens_used: int = 0

class ToolChainOrchestrator:
    """Registers tools and runs them with timeout, retry and fallback."""

    def __init__(self):
        # name -> settings and counters, see register().
        self.tools: Dict[str, Dict] = {}
        self.fallback_chain: Dict[str, List[str]] = {}

    def register(
        self,
        name: str,
        handler: Callable,
        timeout_ms: int = 30000,
        max_retries: int = 2,
        fallback: Optional[str] = None,
        rate_limit: int = 100,
    ):
        """Register a tool.

        Args:
            name: Tool name used by :meth:`execute`.
            handler: Awaitable callable invoked as ``handler(params, context)``.
            timeout_ms: Per-attempt timeout in milliseconds.
            max_retries: Extra attempts after the first failure.
            fallback: Name of another registered tool to try when this one
                fails or hits its call limit.
            rate_limit: Maximum number of successful calls; the counter is
                never reset by this class.
        """
        self.tools[name] = {
            "handler": handler,
            "timeout_ms": timeout_ms,
            "max_retries": max_retries,
            "fallback": fallback,
            "rate_limit": rate_limit,
            "call_count": 0,
            "error_count": 0,
        }

    async def execute(
        self,
        tool_name: str,
        params: Dict[str, Any],
        context: Dict[str, Any] = None,
    ) -> ToolResult:
        """Run a tool with retries, falling back to its fallback tool on failure.

        Never raises for tool errors: failures are reported through
        ``ToolResult.success`` and ``ToolResult.error``.
        """
        return await self._execute(tool_name, params, context, set())

    async def _execute(
        self,
        tool_name: str,
        params: Dict[str, Any],
        context: Optional[Dict[str, Any]],
        visited: set,
    ) -> ToolResult:
        """Run one tool; ``visited`` holds the tools already tried in this call."""
        visited.add(tool_name)

        if tool_name not in self.tools:
            return ToolResult(
                tool_name=tool_name,
                success=False,
                result=None,
                error=f"Tool '{tool_name}' not found",
            )

        tool = self.tools[tool_name]

        # Call-limit check: go straight to the fallback when exhausted.
        if tool["call_count"] >= tool["rate_limit"]:
            return await self._fallback(
                tool_name, params, "rate_limit_exceeded", context, visited
            )

        handler_context = context if context is not None else {}
        last_error = None

        for _attempt in range(tool["max_retries"] + 1):
            # Monotonic clock: durations must not jump with wall-clock changes.
            start = time.monotonic()
            try:
                result = await asyncio.wait_for(
                    tool["handler"](params, handler_context),
                    timeout=tool["timeout_ms"] / 1000,
                )
            except asyncio.TimeoutError:
                last_error = "timeout"
            except Exception as e:
                # Some exceptions carry no message; fall back to the type name.
                last_error = str(e) or type(e).__name__
            else:
                tool["call_count"] += 1
                return ToolResult(
                    tool_name=tool_name,
                    success=True,
                    result=result,
                    duration_ms=int((time.monotonic() - start) * 1000),
                )

            tool["error_count"] += 1

        # Every attempt failed: try the fallback tool.
        return await self._fallback(tool_name, params, last_error, context, visited)

    async def _fallback(
        self,
        tool_name: str,
        params: Dict,
        error: str,
        context: Optional[Dict[str, Any]] = None,
        _visited: Optional[set] = None,
    ) -> ToolResult:
        """Run the fallback of ``tool_name``, or report ``error`` if none applies.

        A fallback that was already tried during the same ``execute`` call is
        skipped, so cyclic fallback configurations (A -> B -> A) terminate.
        """
        tool = self.tools.get(tool_name)
        fallback_name = tool["fallback"] if tool else None
        visited = _visited if _visited is not None else {tool_name}

        if not fallback_name or fallback_name in visited:
            return ToolResult(
                tool_name=tool_name,
                success=False,
                result=None,
                error=error,
            )

        return await self._execute(fallback_name, params, context, visited)

    async def execute_chain(
        self,
        steps: List[Dict],
        initial_context: Dict = None,
    ) -> List[ToolResult]:
        """Run ``steps`` in order, feeding each successful result into the context.

        Each step may carry ``tool``, ``params``, ``output_key`` (defaults to
        the tool name) and ``required``. The chain stops after a failed step
        marked ``required``; other failures are recorded and skipped.
        """
        context = initial_context or {}
        results = []

        for step in steps:
            tool_name = step.get("tool")
            # Substitute "${var}" placeholders from the accumulated context.
            params = self._resolve_params(step.get("params", {}), context)

            result = await self.execute(tool_name, params, context)
            results.append(result)

            if result.success:
                context[step.get("output_key", tool_name)] = result.result
            elif step.get("required", False):
                break

        return results

    def _resolve_params(self, params: Dict, context: Dict) -> Dict:
        """Return a copy of ``params`` with ``"${name}"`` values substituted.

        Only string values that are exactly one placeholder are replaced,
        nested dicts are resolved recursively, and placeholders naming an
        unknown variable are left untouched.
        """
        resolved = {}

        for key, value in params.items():
            if isinstance(value, dict):
                resolved[key] = self._resolve_params(value, context)
            elif (
                isinstance(value, str)
                and value.startswith("${")
                and value.endswith("}")
            ):
                # Require the closing brace so "${name" is not mis-parsed.
                resolved[key] = context.get(value[2:-1], value)
            else:
                resolved[key] = value

        return resolved

二、会话与状态管理

2.1 多轮会话管理

# ===== 多轮会话管理 =====

import redis.asyncio as aioredis
from typing import Optional, Dict, List
from dataclasses import dataclass, asdict
import json

@dataclass
class SessionConfig:
    """Tunable limits for session storage and context building."""

    # Number of turns of history to load.
    max_history_turns: int = 50
    # Token budget for the assembled context.
    max_context_tokens: int = 100000
    # Redis key TTL in seconds (1 hour).
    session_ttl_seconds: int = 3600
    # Intended turn count beyond which history is auto-summarised.
    summary_threshold: int = 20

class SessionManager:
    """Redis-backed store for multi-turn conversation history.

    Messages live in the list ``session:<id>:messages`` and running totals in
    the hash ``session:<id>:meta``. Both keys are given the configured TTL
    again on every write.
    """

    def __init__(
        self,
        redis_url: str = "redis://localhost:6379",
        config: SessionConfig = None,
    ):
        # Keep the URL: connect() needs it after construction.
        self.redis_url = redis_url
        self.config = config or SessionConfig()
        self.redis: Optional[aioredis.Redis] = None

    async def connect(self):
        """Open the Redis connection using the URL given at construction."""
        self.redis = await aioredis.from_url(self.redis_url, decode_responses=True)

    def _client(self):
        """Return the connected client, failing clearly if connect() was skipped."""
        if self.redis is None:
            raise RuntimeError(
                "SessionManager is not connected; call connect() first"
            )
        return self.redis

    async def save_message(
        self,
        session_id: str,
        message: AgentMessage,
    ):
        """Append ``message`` to the session and update the session totals."""
        client = self._client()
        key = f"session:{session_id}:messages"

        msg_data = {
            "role": message.role,
            "content": message.content,
            "token_count": message.token_count,
            "cost_usd": message.cost_usd,
            "timestamp": message.timestamp,
        }

        await client.rpush(key, json.dumps(msg_data))
        await client.expire(key, self.config.session_ttl_seconds)

        # Update the session metadata counters.
        meta_key = f"session:{session_id}:meta"
        await client.hincrby(meta_key, "message_count", 1)
        await client.hincrbyfloat(meta_key, "total_cost", message.cost_usd)
        await client.hincrby(meta_key, "total_tokens", message.token_count)
        await client.expire(meta_key, self.config.session_ttl_seconds)

    async def get_context(
        self,
        session_id: str,
        max_tokens: int = None,
    ) -> List[Dict]:
        """Return recent messages as ``{"role", "content"}`` dicts.

        At most ``max_history_turns * 2`` of the newest messages are loaded.
        When their recorded token counts exceed ``max_tokens`` (default:
        ``config.max_context_tokens``), the older half is replaced by a
        single system message holding a summary.
        """
        client = self._client()
        max_tokens = max_tokens or self.config.max_context_tokens
        key = f"session:{session_id}:messages"

        # Newest messages only (negative start counts from the list tail).
        raw_messages = await client.lrange(
            key,
            -(self.config.max_history_turns * 2), -1
        )

        messages = [json.loads(m) for m in raw_messages]

        total_tokens = sum(m.get("token_count", 0) for m in messages)

        if total_tokens > max_tokens:
            # Compress the older half into a summary, keep the newer half.
            midpoint = len(messages) // 2
            summary = await self._summarize(messages[:midpoint])
            messages = [{"role": "system", "content": summary}] + messages[midpoint:]

        return [{"role": m["role"], "content": m["content"]} for m in messages]

    async def _summarize(self, messages: List[Dict]) -> str:
        """Summarise old messages (placeholder: returns a short count string)."""
        # A real implementation would call an LLM here.
        return f"[之前的对话摘要:共{len(messages)}轮,涵盖{len(set(m['role'] for m in messages))}个角色]"

    async def get_session_stats(self, session_id: str) -> Dict:
        """Return the session's metadata hash."""
        meta_key = f"session:{session_id}:meta"
        return await self._client().hgetall(meta_key)

    async def close_session(self, session_id: str):
        """Delete the session's message list and metadata hash."""
        await self._client().delete(
            f"session:{session_id}:messages",
            f"session:{session_id}:meta",
        )

2.2 长期记忆

# ===== Agent 长期记忆 =====

import hashlib
from typing import List, Optional

class AgentMemory:
    """Long-term agent memory on top of an embedding model and a vector store.

    The embedder must provide an awaitable ``embed(text)``; the store must
    provide awaitable ``upsert``, ``query`` and ``delete`` methods, called
    with the keyword arguments used below.
    """

    def __init__(self, embedding_model, vector_store):
        self.embedder = embedding_model
        self.store = vector_store

    async def store_memory(
        self,
        session_id: str,
        content: str,
        memory_type: str = "fact",  # fact / preference / experience / skill
        importance: float = 0.5,
    ) -> str:
        """Embed and upsert one memory, returning its id.

        The id is the MD5 hex digest of ``"<session_id>:<content>"``, so
        storing the same content for the same session overwrites the earlier
        entry. The id is returned so callers can later pass it to
        :meth:`forget`.
        """
        embedding = await self.embedder.embed(content)

        # MD5 is used only as a deterministic key here, not for security.
        memory_id = hashlib.md5(
            f"{session_id}:{content}".encode()
        ).hexdigest()

        await self.store.upsert(
            id=memory_id,
            vector=embedding,
            metadata={
                "session_id": session_id,
                "type": memory_type,
                "content": content,
                "importance": importance,
                "created_at": time.time(),
            }
        )
        return memory_id

    async def recall(
        self,
        session_id: str,
        query: str,
        top_k: int = 5,
        memory_type: Optional[str] = None,
    ) -> List[Dict]:
        """Return up to ``top_k`` stored memories of this session matching ``query``.

        Results are filtered by ``session_id`` and, when given, by
        ``memory_type``. Whatever the store's ``query`` returns is passed
        through unchanged.
        """
        query_embedding = await self.embedder.embed(query)

        filters = {"session_id": session_id}
        if memory_type:
            filters["type"] = memory_type

        return await self.store.query(
            vector=query_embedding,
            top_k=top_k,
            filters=filters,
        )

    async def forget(self, session_id: str, memory_id: str):
        """Delete one memory by id (GDPR / right to be forgotten)."""
        # NOTE(review): session_id is not used, so any caller that knows an id
        # can delete it regardless of owner. Confirm whether the store can
        # scope deletes by metadata and enforce ownership here.
        await self.store.delete(id=memory_id)

三、成本控制与模型路由

3.1 Token 预算管理

# ===== Token 预算管理 =====

from dataclasses import dataclass
from typing import Dict, Optional

@dataclass
class ModelPricing:
    """Price of one model, in USD per 1K tokens."""

    name: str
    input_price_per_1k: float
    output_price_per_1k: float


# Price table keyed by model name (labelled "2026 mainstream model pricing"
# by the original author).
MODEL_PRICING = {
    pricing.name: pricing
    for pricing in (
        ModelPricing("gpt-4o", 0.005, 0.015),
        ModelPricing("gpt-4o-mini", 0.00015, 0.0006),
        ModelPricing("claude-3.5-sonnet", 0.003, 0.015),
        ModelPricing("claude-3-haiku", 0.00025, 0.00125),
        ModelPricing("deepseek-chat", 0.00014, 0.00028),
        ModelPricing("qwen-plus", 0.0008, 0.002),
    )
}

class BudgetManager:
    """In-memory budget tracker at three levels: global daily, per session, per user per day.

    Spend is held in plain dicts on the instance. Models missing from
    ``MODEL_PRICING`` are priced as ``"gpt-4o"``.
    """

    def __init__(
        self,
        daily_budget_usd: float = 100.0,
        per_session_budget_usd: float = 0.5,
        per_user_daily_budget_usd: float = 10.0,
    ):
        self.daily_budget = daily_budget_usd
        self.per_session_budget = per_session_budget_usd
        self.per_user_budget = per_user_daily_budget_usd

        # Keys: "YYYY-MM-DD", session id, and "<user_id>:YYYY-MM-DD".
        self.daily_spent: Dict[str, float] = {}
        self.session_spent: Dict[str, float] = {}
        self.user_spent: Dict[str, float] = {}

    @staticmethod
    def _today() -> str:
        """Return today's local date as ``YYYY-MM-DD``."""
        return datetime.now().strftime("%Y-%m-%d")

    @staticmethod
    def _cost_usd(model: str, input_tokens: int, output_tokens: int) -> float:
        """Price a token count for ``model`` (unknown models use gpt-4o rates)."""
        pricing = MODEL_PRICING.get(model, MODEL_PRICING["gpt-4o"])
        return (
            input_tokens / 1000 * pricing.input_price_per_1k +
            output_tokens / 1000 * pricing.output_price_per_1k
        )

    def can_proceed(
        self,
        session_id: str,
        user_id: str,
        estimated_tokens: int,
        model: str = "gpt-4o",
    ) -> Dict:
        """Check whether an estimated request fits every budget.

        Returns a dict with ``can_proceed``, ``estimated_cost``, the
        per-level ``checks`` and the ``remaining`` budget per level.
        """
        today = self._today()

        # Estimate cost assuming input:output is roughly 2:1.
        est_cost = self._cost_usd(
            model,
            int(estimated_tokens * 0.67),
            int(estimated_tokens * 0.33),
        )

        daily_used = self.daily_spent.get(today, 0)
        session_used = self.session_spent.get(session_id, 0)
        user_used = self.user_spent.get(f"{user_id}:{today}", 0)

        checks = {
            "daily": daily_used + est_cost <= self.daily_budget,
            "session": session_used + est_cost <= self.per_session_budget,
            "user": user_used + est_cost <= self.per_user_budget,
        }

        return {
            "can_proceed": all(checks.values()),
            "estimated_cost": est_cost,
            "checks": checks,
            "remaining": {
                "daily": self.daily_budget - daily_used,
                "session": self.per_session_budget - session_used,
                "user": self.per_user_budget - user_used,
            },
        }

    def record_usage(
        self,
        session_id: str,
        user_id: str,
        input_tokens: int,
        output_tokens: int,
        model: str,
    ):
        """Add the actual cost of a call to the daily, session and user totals."""
        today = self._today()
        cost = self._cost_usd(model, input_tokens, output_tokens)

        self.daily_spent[today] = self.daily_spent.get(today, 0) + cost
        self.session_spent[session_id] = self.session_spent.get(session_id, 0) + cost
        user_key = f"{user_id}:{today}"
        self.user_spent[user_key] = self.user_spent.get(user_key, 0) + cost

    def get_report(self) -> Dict:
        """Return today's spend, the remaining daily budget and activity counts."""
        today = self._today()
        spent_today = self.daily_spent.get(today, 0)

        # Keys are "<user_id>:<date>". Split from the right so user ids that
        # themselves contain ":" are still counted as distinct users.
        users_today = set()
        for key in self.user_spent:
            user_id, _, day = key.rpartition(":")
            if day == today:
                users_today.add(user_id)

        return {
            "date": today,
            "daily_budget": self.daily_budget,
            "daily_spent": spent_today,
            "daily_remaining": self.daily_budget - spent_today,
            "active_sessions": len(self.session_spent),
            "active_users": len(users_today),
        }

3.2 智能模型路由

# ===== 智能模型路由 =====

class ModelRouter:
    """Pick a model from a keyword-based estimate of task complexity."""

    def __init__(self):
        # Complexity level -> model. No rule below produces "creative", so
        # that route is only reachable if callers extend the rules.
        self.routes = {
            "simple": "gpt-4o-mini",      # simple Q&A, formatting
            "standard": "claude-3.5-sonnet",  # code generation, reasoning
            "complex": "gpt-4o",             # multi-step reasoning, long context
            "creative": "claude-3.5-sonnet",  # creative writing
        }

        # Rules are checked in insertion order and the first keyword hit wins.
        # Only "keywords" is read by route(); the other entries are unused.
        self.complexity_rules = {
            "simple": {
                "max_input_tokens": 500,
                "keywords": ["是什么", "翻译", "总结", "格式化", "convert"],
            },
            "standard": {
                "max_input_tokens": 8000,
                "keywords": ["写代码", "实现", "修复", "优化", "generate code"],
            },
            "complex": {
                "min_steps": 3,
                "keywords": ["分析", "架构", "系统设计", "multi-step", "复杂"],
            },
        }

    def route(self, task_description: str, input_tokens: int = 0) -> Dict:
        """Choose a model for a task.

        Args:
            task_description: Free-text description, matched case-insensitively
                against the rule keywords.
            input_tokens: Prompt size. ``0`` (the default) means "unknown" and
                disables the token-based adjustments.

        Returns:
            Dict with ``complexity``, ``model`` and ``estimated_speedup``.
        """
        desc_lower = task_description.lower()

        # First rule with a matching keyword wins; default is "standard".
        complexity = next(
            (
                level
                for level, rules in self.complexity_rules.items()
                if any(kw in desc_lower for kw in rules.get("keywords", []))
            ),
            "standard",
        )

        # Token count as a secondary signal. The downgrade applies only when
        # a real count was supplied; otherwise the default 0 would turn every
        # "complex" match into "standard".
        if input_tokens > 50000:
            complexity = "complex"
        elif 0 < input_tokens < 500 and complexity != "simple":
            complexity = "standard"

        return {
            "complexity": complexity,
            "model": self.routes[complexity],
            "estimated_speedup": {
                "simple": "10x vs complex model",
                "standard": "3x vs complex model",
                "complex": "1x (baseline)",
            }.get(complexity, ""),
        }

# ===== 语义缓存 =====

class SemanticCache:
    """In-memory semantic cache: reuse answers for sufficiently similar queries.

    Entries are compared by cosine similarity of their embeddings. The
    embedder must provide an awaitable ``embed(text)``.
    """

    def __init__(self, embedding_model, similarity_threshold: float = 0.95):
        self.embedder = embedding_model
        self.threshold = similarity_threshold
        # md5(query) -> {"embedding", "answer", "cached_at", "ttl"}
        self.cache: Dict[str, Dict] = {}

    async def get(self, query: str) -> Optional[Dict]:
        """Return the most similar live entry at or above the threshold.

        Entries older than their ``ttl`` are evicted during the lookup.

        Returns:
            ``{"answer", "similarity", "cached_at"}`` for the best match, or
            ``None`` when nothing qualifies.
        """
        query_embedding = await self.embedder.embed(query)

        now = time.time()
        expired = []
        best_entry = None
        best_similarity = None

        for cache_key, cached in self.cache.items():
            ttl = cached.get("ttl")
            if ttl is not None and now - cached["cached_at"] > ttl:
                expired.append(cache_key)
                continue

            similarity = self._cosine_similarity(
                query_embedding, cached["embedding"]
            )
            if similarity >= self.threshold and (
                best_similarity is None or similarity > best_similarity
            ):
                best_entry, best_similarity = cached, similarity

        # Evict after the loop: the dict must not change while iterating.
        for cache_key in expired:
            self.cache.pop(cache_key, None)

        if best_entry is None:
            return None

        return {
            "answer": best_entry["answer"],
            "similarity": best_similarity,
            "cached_at": best_entry["cached_at"],
        }

    async def set(self, query: str, answer: str, ttl_seconds: int = 3600):
        """Cache ``answer`` for ``query`` for ``ttl_seconds`` seconds."""
        embedding = await self.embedder.embed(query)
        # MD5 is only a dictionary key here, not a security measure.
        key = hashlib.md5(query.encode()).hexdigest()

        self.cache[key] = {
            "embedding": embedding,
            "answer": answer,
            "cached_at": time.time(),
            "ttl": ttl_seconds,
        }

    def _cosine_similarity(self, a, b) -> float:
        """Cosine similarity of two vectors; 0.0 if either has zero length."""
        import numpy as np

        a, b = np.array(a), np.array(b)
        denominator = np.linalg.norm(a) * np.linalg.norm(b)
        if denominator == 0:
            # Avoid a NaN result (and a runtime warning) for zero vectors.
            return 0.0
        return float(np.dot(a, b) / denominator)

四、可观测性

4.1 分布式追踪

# ===== Agent 分布式追踪 =====

import uuid
import time
from contextlib import asynccontextmanager
from typing import Generator, Optional

@dataclass
class Span:
    """One timed operation inside a trace.

    ``attributes`` and ``events`` default to fresh empty containers, so they
    can be updated without a ``None`` check.
    """

    trace_id: str
    span_id: str
    # span_id of the parent span, or None for a root span.
    parent_id: Optional[str]
    name: str
    # Start and end times in seconds; end_time stays None while the span is open.
    start_time: float
    end_time: Optional[float] = None
    attributes: Dict[str, Any] = field(default_factory=dict)
    status: str = "ok"  # ok / error
    events: List[Dict] = field(default_factory=list)

    @property
    def duration_ms(self) -> int:
        """Elapsed time in whole milliseconds, or 0 while the span is open."""
        # Compare against None explicitly: an end_time of 0.0 is a valid value.
        if self.end_time is not None:
            return int((self.end_time - self.start_time) * 1000)
        return 0

class AgentTracer:
    """In-process tracer that records spans for agent operations.

    Finished spans accumulate in ``exported_spans``; nothing in this class
    trims that list or ships spans elsewhere.
    """

    def __init__(self, service_name: str = "ai-agent"):
        self.service_name = service_name
        # Spans currently open, keyed by span id.
        self.active_spans: Dict[str, Span] = {}
        # Finished spans, in completion order.
        self.exported_spans: List[Span] = []

    # NOTE(review): the precise return annotation would be AsyncIterator[Span];
    # it is left unchanged because only Generator is imported alongside this
    # snippet.
    @asynccontextmanager
    async def trace(
        self,
        name: str,
        attributes: Dict = None,
        parent_span: Span = None,
    ) -> Generator[Span, None, None]:
        """Open a span for the duration of an ``async with`` block.

        Args:
            name: Operation name.
            attributes: Optional attributes stored on the span.
            parent_span: When given, the new span joins its trace as a child.

        Yields:
            The open span. An exception raised inside the block marks the
            span as ``"error"``, appends an ``error`` event, and propagates.
        """
        # Hex ids in OpenTelemetry sizes: 32 hex chars for a trace id, 16 for a
        # span id. Slicing str(uuid4()) would leave hyphens in the id.
        trace_id = parent_span.trace_id if parent_span else uuid.uuid4().hex
        span_id = uuid.uuid4().hex[:16]

        span = Span(
            trace_id=trace_id,
            span_id=span_id,
            parent_id=parent_span.span_id if parent_span else None,
            name=name,
            start_time=time.time(),
            attributes=attributes or {},
        )

        self.active_spans[span_id] = span

        try:
            # The span starts as "ok". It is not reset after the block, so a
            # status set by the caller survives.
            yield span
        except Exception as e:
            span.status = "error"
            # Keep events recorded earlier instead of overwriting them.
            span.events = list(span.events or []) + [
                {"name": "error", "attributes": {"error": str(e)}}
            ]
            raise
        finally:
            span.end_time = time.time()
            self.active_spans.pop(span_id, None)
            self.exported_spans.append(span)

    def get_trace(self, trace_id: str) -> List[Span]:
        """Return all finished spans belonging to ``trace_id``."""
        return [
            s for s in self.exported_spans
            if s.trace_id == trace_id
        ]

    def get_dashboard_data(self) -> Dict:
        """Aggregate finished spans into totals and per-operation statistics.

        Returns:
            Dict with ``total_spans``, ``error_rate`` and ``by_operation``,
            which maps each span name to ``count``, ``total_ms``, ``errors``
            and ``avg_ms`` (integer division).
        """
        total_spans = len(self.exported_spans)
        error_spans = sum(1 for s in self.exported_spans if s.status == "error")

        by_name = {}
        for span in self.exported_spans:
            stats = by_name.setdefault(
                span.name, {"count": 0, "total_ms": 0, "errors": 0}
            )
            stats["count"] += 1
            stats["total_ms"] += span.duration_ms
            if span.status == "error":
                stats["errors"] += 1

        for stats in by_name.values():
            stats["avg_ms"] = stats["total_ms"] // stats["count"] if stats["count"] else 0

        return {
            "total_spans": total_spans,
            "error_rate": error_spans / total_spans if total_spans else 0,
            "by_operation": by_name,
        }

4.2 指标收集

# ===== Agent 指标收集 =====

from collections import defaultdict
import statistics

class AgentMetrics:
    """In-memory counters, histograms and gauges for agent runs."""

    def __init__(self):
        self.counters = defaultdict(int)
        self.histograms = defaultdict(list)
        self.gauges = defaultdict(float)

    def inc(self, name: str, value: int = 1):
        """Increase counter ``name`` by ``value``."""
        self.counters[name] += value

    def observe(self, name: str, value: float):
        """Record one observation for histogram ``name``.

        Once a histogram exceeds 10000 samples it is cut back to the most
        recent 5000.
        """
        samples = self.histograms[name]
        samples.append(value)
        if len(samples) > 10000:
            self.histograms[name] = samples[-5000:]

    def set_gauge(self, name: str, value: float):
        """Set gauge ``name`` to ``value``."""
        self.gauges[name] = value

    def get_summary(self, metric_name: str) -> Dict:
        """Summarise one metric.

        Histograms yield ``count``, ``min``, ``max``, ``avg``, ``p50``, ``p95``
        and ``p99`` (``{"count": 0}`` when empty). Counters and gauges yield
        ``{"value": ...}``, with a counter taking precedence if both exist.
        Unknown names yield ``{"value": 0}``.
        """
        if metric_name in self.histograms:
            # Sort once and reuse it for min, max and the percentiles.
            ordered = sorted(self.histograms[metric_name])
            count = len(ordered)
            if count == 0:
                # Merely reading self.histograms[name] creates an empty list.
                return {"count": 0}
            return {
                "count": count,
                "min": ordered[0],
                "max": ordered[-1],
                "avg": statistics.mean(ordered),
                "p50": statistics.median(ordered),
                "p95": ordered[int(count * 0.95)],
                "p99": ordered[int(count * 0.99)],
            }

        if metric_name in self.counters:
            return {"value": self.counters[metric_name]}
        if metric_name in self.gauges:
            return {"value": self.gauges[metric_name]}
        return {"value": 0}

    def get_all_metrics(self) -> Dict:
        """Return every counter and gauge plus a summary of each histogram."""
        return {
            "counters": dict(self.counters),
            "gauges": dict(self.gauges),
            "summaries": {
                name: self.get_summary(name)
                for name in self.histograms
            },
        }

# ===== 关键指标定义 =====

"""
Agent 核心指标:

# 延迟
agent.request.latency       # 请求延迟(ms)
agent.llm.latency          # LLM 调用延迟
agent.tool.latency         # 工具调用延迟
agent.e2e.latency          # 端到端延迟

# 吞吐量
agent.requests.total       # 总请求数
agent.requests.success     # 成功请求数
agent.requests.failed      # 失败请求数

# 质量
agent.task.success_rate    # 任务完成率
agent.tool.success_rate    # 工具调用成功率
agent.user.satisfaction    # 用户满意度

# 成本
agent.cost.total           # 总成本
agent.cost.per_request     # 单请求成本
agent.cost.tokens.total    # 总 Token 数

# 资源
agent.active.sessions      # 活跃会话数
agent.queue.depth          # 队列深度
agent.memory.usage         # 内存使用
"""

五、灰度发布与弹性伸缩

5.1 A/B 测试与金丝雀发布

# ===== Agent 灰度发布 =====

import random
import hashlib
from typing import Dict, Optional

class CanaryDeployer:
    """Canary / A-B experiment manager with deterministic traffic splitting."""

    def __init__(self):
        # Experiment name -> configs, split settings and per-group statistics.
        self.experiments: Dict[str, Dict] = {}

    def create_experiment(
        self,
        name: str,
        control_config: Dict,  # configuration for the control group
        treatment_config: Dict,  # configuration for the treatment group
        canary_percent: float = 0.1,  # fraction of traffic sent to treatment
        criteria: str = "user_id",  # what the split identifier represents
    ):
        """Create (or overwrite) an experiment with zeroed statistics."""
        self.experiments[name] = {
            "control": control_config,
            "treatment": treatment_config,
            "canary_percent": canary_percent,
            "criteria": criteria,
            "stats": {
                "control": {"requests": 0, "success": 0, "cost": 0},
                "treatment": {"requests": 0, "success": 0, "cost": 0},
            },
        }

    def get_group(
        self,
        experiment_name: str,
        identifier: str,
    ) -> Optional[str]:
        """Return ``"treatment"`` or ``"control"`` for ``identifier``.

        The assignment is a pure function of the identifier's MD5 digest, so
        the same identifier always lands in the same group. Returns ``None``
        for an unknown experiment. Use the result as the ``group`` argument of
        :meth:`record_result`.
        """
        exp = self.experiments.get(experiment_name)
        if not exp:
            return None

        # Hash into 100 buckets: the split has 1% granularity.
        hash_val = int(hashlib.md5(identifier.encode()).hexdigest(), 16)
        bucket = (hash_val % 100) / 100

        return "treatment" if bucket < exp["canary_percent"] else "control"

    def get_config(
        self,
        experiment_name: str,
        identifier: str,
    ) -> Dict:
        """Return the config assigned to ``identifier`` (``{}`` if no such experiment)."""
        group = self.get_group(experiment_name, identifier)
        if group is None:
            return {}
        return self.experiments[experiment_name][group]

    def record_result(
        self,
        experiment_name: str,
        group: str,
        success: bool,
        cost: float,
    ):
        """Record one request outcome for ``group`` (``"control"`` or ``"treatment"``).

        Unknown experiments are ignored; an unknown group raises ``KeyError``.
        """
        exp = self.experiments.get(experiment_name)
        if exp:
            stats = exp["stats"][group]
            stats["requests"] += 1
            if success:
                stats["success"] += 1
            stats["cost"] += cost

    def get_experiment_report(self, name: str) -> Dict:
        """Return per-group statistics and a rollout recommendation.

        Returns ``{}`` for an unknown experiment.
        """
        exp = self.experiments.get(name)
        if not exp:
            return {}

        report = {"experiment": name, "groups": {}}

        for group, stats in exp["stats"].items():
            requests = stats["requests"]
            success_rate = stats["success"] / requests if requests else 0
            avg_cost = stats["cost"] / requests if requests else 0

            report["groups"][group] = {
                "requests": requests,
                "success_rate": f"{success_rate:.2%}",
                "avg_cost": f"${avg_cost:.4f}",
            }

        # Threshold-based recommendation; no statistical significance test
        # is performed.
        report["recommendation"] = self._evaluate(
            exp["stats"]["control"], exp["stats"]["treatment"], exp["canary_percent"]
        )

        return report

    def _evaluate(self, control: Dict, treatment: Dict, canary_pct: float) -> str:
        """Decide whether to expand, roll back or keep monitoring.

        Returns one of ``INSUFFICIENT_DATA``, ``EXPAND_TO_50%``,
        ``EXPAND_TO_30%``, ``ROLLBACK`` or ``CONTINUE_MONITORING``.
        ``canary_pct`` is accepted but does not affect the result.
        """
        # Need at least 100 treatment requests and some control traffic;
        # without control requests the ratios below would divide by zero.
        if treatment["requests"] < 100 or control["requests"] == 0:
            return "INSUFFICIENT_DATA"

        t_success = treatment["success"] / treatment["requests"]
        c_success = control["success"] / control["requests"]

        t_avg_cost = treatment["cost"] / treatment["requests"]
        c_avg_cost = control["cost"] / control["requests"]

        # Success rate not worse and cost within +10% -> expand.
        if t_success >= c_success and t_avg_cost <= c_avg_cost * 1.1:
            if t_success > c_success * 1.05:
                return "EXPAND_TO_50%"
            return "EXPAND_TO_30%"

        # Success rate dropped by more than 5% (relative) -> roll back.
        if t_success < c_success * 0.95:
            return "ROLLBACK"

        return "CONTINUE_MONITORING"

5.2 弹性伸缩

# ===== 弹性伸缩(Kubernetes + KEDA)=====

class AutoScaler:
    """Compute a target replica count for agent workers and gate scaling actions.

    ``calculate_replicas`` combines three signals (queue depth, latency and
    CPU) and takes the largest demand; ``should_scale`` applies separate
    cooldowns for scaling up and scaling down.
    """

    def __init__(self):
        self.min_replicas = 1
        self.max_replicas = 100
        self.target_queue_depth = 10  # queued tasks one replica is sized for
        self.scale_up_cooldown = 60  # seconds between scale-ups
        self.scale_down_cooldown = 300  # seconds before a scale-down is allowed
        self.last_scale_time = 0  # time.time() of the last approved action

    def calculate_replicas(
        self,
        current_replicas: int,
        queue_depth: int,
        avg_latency_ms: float,
        cpu_usage: float,
    ) -> int:
        """Return the desired replica count, clamped to [min, max] replicas.

        Args:
            current_replicas: Number of replicas currently running.
            queue_depth: Number of tasks waiting in the queue.
            avg_latency_ms: Average request latency in milliseconds.
            cpu_usage: CPU utilisation as a fraction (0.7 means 70%).

        Returns:
            The largest replica count demanded by any of the three signals,
            clamped to ``self.min_replicas``..``self.max_replicas``.
        """
        import math

        # Queue signal: round up, so a partially filled "slot" of
        # target_queue_depth tasks still gets a replica (15 tasks -> 2).
        queue_based = math.ceil(queue_depth / self.target_queue_depth)

        # Latency signal: double capacity when over the threshold. When
        # latency is healthy it places no demand; returning current_replicas
        # here would pin the max() below and make scale-down impossible.
        latency_threshold_ms = 5000
        if avg_latency_ms > latency_threshold_ms:
            latency_based = current_replicas * 2
        else:
            latency_based = self.min_replicas

        # CPU signal: proportional target (current * usage / threshold),
        # rounded up so an overloaded single replica still scales out. The
        # round() strips float noise such as 12.000000000000002 before ceil.
        # Scale-down is limited to 20% per evaluation to avoid flapping.
        cpu_threshold = 0.7
        proportional = math.ceil(
            round(current_replicas * cpu_usage / cpu_threshold, 9)
        )
        max_step_down = int(current_replicas * 0.8)
        cpu_based = max(proportional, max_step_down)

        # The most demanding signal wins, within the configured bounds.
        target = max(queue_based, latency_based, cpu_based)
        return max(self.min_replicas, min(target, self.max_replicas))

    def should_scale(
        self,
        current: int,
        target: int,
    ) -> Dict:
        """Decide whether to act on ``target`` given the cooldown windows.

        Args:
            current: Replica count currently running.
            target: Replica count proposed by ``calculate_replicas``.

        Returns:
            ``{"action": "none", "reason": ...}`` when already at target or
            still cooling down; otherwise ``{"action": "scale_up" |
            "scale_down", "current": ..., "target": ...}``. Approving an
            action records the current time as the start of the next cooldown.
        """
        now = time.time()

        if current == target:
            return {"action": "none", "reason": "already at target"}

        scaling_up = target > current
        cooldown = self.scale_up_cooldown if scaling_up else self.scale_down_cooldown

        if now - self.last_scale_time < cooldown:
            return {"action": "none", "reason": "cooldown"}

        self.last_scale_time = now

        return {
            "action": "scale_up" if scaling_up else "scale_down",
            "current": current,
            "target": target,
        }

# ===== KEDA configuration example =====
# Reference manifest only (a bare string, never executed). The Prometheus
# trigger aggregates the histogram buckets with rate() and "by (le)", which
# histogram_quantile requires, and converts seconds to milliseconds so the
# value is comparable with the 5000 threshold.

"""
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: ai-agent-scaler
spec:
  scaleTargetRef:
    name: ai-agent-deployment
  minReplicaCount: 1
  maxReplicaCount: 100
  triggers:
    - type: redis
      metadata:
        host: redis-master
        port: "6379"
        listName: agent:task-queue
        listLength: "10"  # 每个队列深度10个任务对应1个Pod
    - type: prometheus
      metadata:
        serverAddress: http://prometheus:9090
        metricName: agent_request_latency_p95
        threshold: "5000"
        # p95 over a 2m window; buckets are in seconds, threshold is in ms
        query: histogram_quantile(0.95, sum(rate(agent_request_latency_seconds_bucket[2m])) by (le)) * 1000
"""

六、多租户与安全

6.1 多租户隔离

# ===== 多租户管理 =====

@dataclass
class TenantConfig:
    """Per-tenant plan, quota and permission settings.

    Instances are registered with ``MultiTenantManager.register_tenant`` and
    consulted by ``MultiTenantManager.authorize``.
    """
    # Unique key under which the tenant and its usage counters are stored.
    tenant_id: str
    # Display name, shown in tenant reports.
    name: str
    plan: str  # free / pro / enterprise
    # Requests allowed per day; authorize() rejects once the counter reaches it.
    max_daily_requests: int
    # NOTE(review): not enforced by any code visible in this section - confirm.
    max_concurrent_sessions: int
    # Model names the tenant may call; authorize() rejects any other model.
    allowed_models: List[str]
    # Daily spend cap in USD; authorize() rejects once daily cost reaches it.
    daily_budget_usd: float
    # NOTE(review): presumably feature flags; not read in this section - confirm.
    features: List[str]

class MultiTenantManager:
    """In-memory tenant registry with daily request/budget quotas.

    Usage counters live in ``self.usage[tenant_id]`` with the keys
    ``daily_requests``, ``daily_cost`` and ``active_sessions``. The daily
    counters are reset the first time a tenant is touched on a new calendar
    day (local time, as returned by ``datetime.now()``); previously they were
    never reset, so a tenant that hit its quota stayed blocked forever.
    """

    def __init__(self):
        self.tenants: Dict[str, "TenantConfig"] = {}
        self.usage: Dict[str, Dict] = {}
        # Calendar day (YYYY-MM-DD) each tenant's daily counters belong to.
        # Kept separate so the shape of ``self.usage`` entries is unchanged.
        self._usage_date: Dict[str, str] = {}

    @staticmethod
    def _today() -> str:
        """Return today's local date as ``YYYY-MM-DD``."""
        return datetime.now().strftime("%Y-%m-%d")

    def _current_usage(self, tenant_id: str) -> Dict:
        """Return the tenant's usage dict, resetting daily counters on a new day.

        Returns an empty dict (without registering anything) when the tenant
        has no usage entry.
        """
        usage = self.usage.get(tenant_id)
        if usage is None:
            return {}

        today = self._today()
        stamp = self._usage_date.get(tenant_id)
        if stamp != today:
            # Only reset when a previous day was recorded; an entry without a
            # stamp (e.g. seeded directly into self.usage) is adopted as-is.
            if stamp is not None:
                usage["daily_requests"] = 0
                usage["daily_cost"] = 0.0
            self._usage_date[tenant_id] = today
        return usage

    def register_tenant(self, config: "TenantConfig"):
        """Register (or replace) a tenant and start its counters at zero."""
        self.tenants[config.tenant_id] = config
        self.usage[config.tenant_id] = {
            "daily_requests": 0,
            "daily_cost": 0.0,
            "active_sessions": 0,
        }
        self._usage_date[config.tenant_id] = self._today()

    def authorize(
        self,
        tenant_id: str,
        model: str,
    ) -> Dict:
        """Check whether the tenant may issue a request against ``model``.

        Checks run in this order: tenant exists, daily request quota, daily
        budget, model allow-list.

        Returns:
            ``{"authorized": True, "config": <TenantConfig>}`` on success,
            otherwise ``{"authorized": False, "reason": <str>}``.
        """
        tenant = self.tenants.get(tenant_id)
        if not tenant:
            return {"authorized": False, "reason": "tenant_not_found"}

        usage = self._current_usage(tenant_id)

        # Daily request quota.
        if usage.get("daily_requests", 0) >= tenant.max_daily_requests:
            return {"authorized": False, "reason": "daily_quota_exceeded"}

        # Daily budget.
        if usage.get("daily_cost", 0) >= tenant.daily_budget_usd:
            return {"authorized": False, "reason": "budget_exceeded"}

        # Model allow-list.
        if model not in tenant.allowed_models:
            return {"authorized": False, "reason": f"model {model} not allowed"}

        return {"authorized": True, "config": tenant}

    def record_request(
        self,
        tenant_id: str,
        cost: float,
    ):
        """Count one request and add ``cost`` (USD) to today's spend.

        Tenants without a usage entry are ignored.
        """
        if tenant_id not in self.usage:
            return
        usage = self._current_usage(tenant_id)
        usage["daily_requests"] += 1
        usage["daily_cost"] += cost

    def get_tenant_report(self, tenant_id: str) -> Dict:
        """Return a display-oriented usage summary for the tenant.

        Unknown tenants yield ``"unknown"`` name/plan and ``"N/A"`` for the
        remaining budget; money values are formatted as ``$x.xx`` strings.
        """
        tenant = self.tenants.get(tenant_id)
        usage = self._current_usage(tenant_id)
        spent = usage.get("daily_cost", 0)

        if tenant:
            remaining = f"${tenant.daily_budget_usd - spent:.2f}"
        else:
            remaining = "N/A"

        return {
            "tenant": tenant.name if tenant else "unknown",
            "plan": tenant.plan if tenant else "unknown",
            "usage": {
                "daily_requests": usage.get("daily_requests", 0),
                "daily_cost": f"${spent:.2f}",
                "budget_remaining": remaining,
            }
        }

# ===== 租户数据隔离 =====

class TenantDataIsolator:
    """Namespace-based key isolation between tenants.

    The storage backend is not wired up here: ``store`` discards its value
    and ``retrieve`` always returns ``None``.
    """

    def get_namespace(self, tenant_id: str) -> str:
        """Return the key prefix reserved for ``tenant_id``."""
        return "tenant:{}".format(tenant_id)

    def _qualified_key(self, tenant_id: str, key: str) -> str:
        """Prefix ``key`` with the tenant's namespace."""
        return f"{self.get_namespace(tenant_id)}:{key}"

    async def store(
        self,
        tenant_id: str,
        key: str,
        value: Any,
    ):
        """Store ``value`` under the tenant-scoped key (currently a no-op)."""
        storage_key = self._qualified_key(tenant_id, key)
        # Placeholder: a real backend (Redis / partitioned DB) would persist
        # json.dumps(value) under storage_key here.

    async def retrieve(
        self,
        tenant_id: str,
        key: str,
    ) -> Optional[Any]:
        """Look up the tenant-scoped key; always ``None`` until a backend exists."""
        storage_key = self._qualified_key(tenant_id, key)
        # Placeholder: a real backend would return the value at storage_key.
        return None

6.2 生产安全护栏

# ===== 生产安全护栏 =====

class ProductionGuardrails:
    """Input and output safety checks applied around an agent call.

    Composes a content filter, a rate limiter and an input validator
    (``InputValidator`` is defined elsewhere in the file).
    """

    def __init__(self):
        self.content_filter = ContentFilter()
        self.rate_limiter = RateLimiter()
        self.input_validator = InputValidator()

    async def check_input(
        self,
        user_input: str,
        user_id: str,
    ) -> Dict:
        """Run every input check and summarise the outcome.

        All three checks always run, so the rate limiter is consulted even
        when the content check has already failed.

        Args:
            user_input: Raw text supplied by the user.
            user_id: Identifier used for rate limiting.

        Returns:
            A dict with ``passed`` (True only if every check passed),
            ``checks`` (the raw result of each check, keyed by check name) and
            ``blocked_reason`` (``"<check>: <reason>"`` for the first failing
            check, or ``None``).
        """
        checks = {
            # 1. Content safety
            "content_safety": self.content_filter.check(user_input),
            # 2. Rate limiting
            "rate_limit": self.rate_limiter.check(user_id),
            # 3. Input validation
            "input_validation": self.input_validator.validate(user_input),
        }

        # A check result without a "passed" key is treated as passing.
        failures = [
            (check_name, result)
            for check_name, result in checks.items()
            if not result.get("passed", True)
        ]

        blocked_reason = None
        if failures:
            first_name, first_result = failures[0]
            blocked_reason = f"{first_name}: {first_result.get('reason', '')}"

        return {
            "passed": not failures,
            "checks": checks,
            "blocked_reason": blocked_reason,
        }

    async def check_output(self, output: str) -> Dict:
        """Screen model output for PII and harmful content.

        Returns:
            A dict with ``passed``, ``pii_detected``, ``harmful_detected`` and
            ``action``: ``"block"`` when harmful content is found,
            ``"redact"`` when only PII is found, ``"allow"`` otherwise.
        """
        pii_found = self.content_filter.detect_pii(output)
        harmful = self.content_filter.detect_harmful(output)

        # Harmful content takes precedence: redacting PII alone would still
        # let the harmful text through.
        if harmful:
            action = "block"
        elif pii_found:
            action = "redact"
        else:
            action = "allow"

        return {
            "passed": not pii_found and not harmful,
            "pii_detected": pii_found,
            "harmful_detected": harmful,
            "action": action,
        }

class RateLimiter:
    """Sliding-window rate limiter kept in process memory.

    ``self.requests`` maps each identifier to the timestamps of its accepted
    requests that were still inside the window at the last check.
    """

    def __init__(self, max_requests: int = 60, window_seconds: int = 60):
        self.max_requests = max_requests
        self.window = window_seconds
        self.requests: Dict[str, List[float]] = {}

    def check(self, identifier: str) -> Dict:
        """Admit or reject one request for ``identifier``.

        Returns:
            ``{"passed": True, "remaining": n}`` when admitted (the request is
            recorded), or ``{"passed": False, "reason": ...}`` when the window
            is already full (the rejected request is not recorded).
        """
        now = time.time()

        # Keep only timestamps that are still inside the window.
        history = self.requests.get(identifier, [])
        recent = [stamp for stamp in history if now - stamp < self.window]
        self.requests[identifier] = recent

        used = len(recent)
        if used < self.max_requests:
            recent.append(now)
            return {"passed": True, "remaining": self.max_requests - used - 1}

        return {
            "passed": False,
            "reason": f"rate limit: {used}/{self.max_requests}",
        }

class ContentFilter:
    """Simplified keyword/regex content filter for inputs and outputs."""

    # Keywords treated as harmful by both check() and detect_harmful().
    HARMFUL_KEYWORDS = ("暴力", "非法", "毒品")

    # Digit-run patterns: 16 consecutive digits (card-number-like) and the
    # NNN-NN-NNNN layout. Digit lookarounds are used instead of \b because
    # CJK characters and letters are word characters, so \b never matches
    # between them and a digit and such numbers went undetected.
    _PII_PATTERNS = (
        r"(?<!\d)\d{16}(?!\d)",
        r"(?<!\d)\d{3}-\d{2}-\d{4}(?!\d)",
    )

    def check(self, text: str) -> Dict:
        """Check ``text`` against the harmful keyword list.

        Returns:
            ``{"passed": bool, "reason": str | None}``; ``reason`` lists the
            matched keywords when the check fails.
        """
        found = [kw for kw in self.HARMFUL_KEYWORDS if kw in text]

        return {
            "passed": not found,
            "reason": f"harmful keywords: {found}" if found else None,
        }

    def detect_pii(self, text: str) -> bool:
        """Return True if ``text`` contains a digit run matching a PII pattern."""
        import re

        return any(re.search(pattern, text) for pattern in self._PII_PATTERNS)

    def detect_harmful(self, text: str) -> bool:
        """Return True if ``text`` contains harmful content.

        Uses the same keyword list as ``check`` so input and output screening
        agree; the original ``harmful_keyword`` placeholder (matched
        case-insensitively) is still recognised.
        """
        if "harmful_keyword" in text.lower():
            return True
        return any(kw in text for kw in self.HARMFUL_KEYWORDS)

七、完整部署示例

7.1 Docker Compose 部署

# docker-compose.yml - AI Agent production deployment
#
# POSTGRES_PASSWORD must be provided via the environment or an .env file;
# compose aborts with an error if it is missing. The value is interpolated
# into DATABASE_URL as-is, so URL-encode any special characters.

version: "3.9"

services:
  api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - DATABASE_URL=postgresql://agent:${POSTGRES_PASSWORD:?POSTGRES_PASSWORD must be set}@db:5432/agent_db
      - REDIS_URL=redis://redis:6379
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
    depends_on:
      db:
        condition: service_healthy
      redis:
        condition: service_healthy
    deploy:
      resources:
        limits:
          cpus: "2"
          memory: 4G
    restart: unless-stopped
    healthcheck:
      # Requires curl to be installed in the api image.
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 30s
      timeout: 10s
      retries: 3
  
  worker:
    build: .
    command: ["python", "-m", "agent.worker"]
    environment:
      - DATABASE_URL=postgresql://agent:${POSTGRES_PASSWORD:?POSTGRES_PASSWORD must be set}@db:5432/agent_db
      - REDIS_URL=redis://redis:6379
    # The worker is given DATABASE_URL too, so wait for both dependencies
    # to be healthy rather than only for the redis container to start.
    depends_on:
      db:
        condition: service_healthy
      redis:
        condition: service_healthy
    deploy:
      replicas: 2
      resources:
        limits:
          cpus: "1"
          memory: 2G
    restart: unless-stopped
  
  redis:
    image: redis:7-alpine
    # Redis runs without authentication here: publish it on loopback only.
    # Other services reach it over the compose network as redis:6379.
    ports:
      - "127.0.0.1:6379:6379"
    volumes:
      - redis_data:/data
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 5s
      retries: 5
  
  db:
    image: postgres:16-alpine
    environment:
      POSTGRES_DB: agent_db
      POSTGRES_USER: agent
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:?POSTGRES_PASSWORD must be set}
    volumes:
      - pg_data:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U agent -d agent_db"]
      interval: 10s
      timeout: 5s
      retries: 5
  
  prometheus:
    image: prom/prometheus:latest
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
  
  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    volumes:
      - grafana_data:/var/lib/grafana

volumes:
  redis_data:
  pg_data:
  grafana_data:

总结

Agent 生产部署 Checklist

□ 架构设计
  □ Agent 基类(状态/工具/钩子)
  □ 规划/执行/审查 Agent 角色分离
  □ 工具链编排(重试+降级)

□ 状态管理
  □ 多轮会话(Redis + 自动摘要)
  □ 长期记忆(向量数据库)
  □ 会话隔离

□ 成本控制
  □ Token 预算(日/会话/用户)
  □ 智能模型路由(简单/标准/复杂)
  □ 语义缓存

□ 可观测性
  □ 分布式追踪(OpenTelemetry)
  □ 指标收集(延迟/吞吐/成本/质量)
  □ 日志聚合

□ 灰度发布
  □ A/B 测试(一致性哈希分流)
  □ 金丝雀发布(自动扩量/回滚)
  □ 弹性伸缩(KEDA + Redis 队列)

□ 多租户
  □ 租户隔离(命名空间)
  □ 配额管理(请求/预算/模型)
  □ 数据隔离

□ 安全
  □ 输入验证(内容安全+速率限制)
  □ 输出审核(PII检测+有害内容)
  □ 权限控制(零信任)

技术选型

组件 推荐方案
Agent 框架 LangGraph / 自建 FastAPI
状态存储 Redis(会话)+ PostgreSQL(持久化)
记忆 ChromaDB / Milvus / Pinecone
追踪 OpenTelemetry + Jaeger
指标 Prometheus + Grafana
伸缩 Kubernetes + KEDA
缓存 Redis + 语义缓存

本文涵盖 AI Agent 商业落地完整链路:生产级 Agent 框架(BaseAgent + Planner + 工具链编排)+ 会话与状态管理(Redis 会话 + 长期记忆向量检索)+ 成本控制(Token 预算 + 智能模型路由 + 语义缓存)+ 可观测性(分布式追踪 + 指标收集)+ 灰度发布(A/B 测试 + 金丝雀 + KEDA 弹性伸缩)+ 多租户与安全(命名空间隔离 + 配额管理 + 安全护栏)。

Logo

这里是“一人公司”的成长家园。我们提供从产品曝光、技术变现到法律财税的全栈内容,并连接云服务、办公空间等稀缺资源,助你专注创造,无忧运营。

更多推荐