AI Agent 商业落地:从开发到生产部署全链路
·
前言
💡 痛点: AI Agent Demo 很炫但上线就崩?多轮对话上下文丢失?Agent 调用工具链成功率低?生产环境成本失控?如何从 PoC 走到百万用户级部署?
🎯 解决方案: 从 Agent 架构设计→工具链编排→状态管理→成本优化→可观测性→灰度发布→多租户→生产级安全,覆盖 Agent 商业落地的每个环节。
2026 AI Agent 生产部署格局:
| 方案 | 适用场景 | 规模 | 成本 |
|---|---|---|---|
| LangGraph Cloud | 复杂多步 Agent | 中大型 | $0.05-0.5/次 |
| Dify Cloud | 低代码 Agent 平台 | 中小型 | 按用量 |
| 自建(FastAPI+vLLM) | 完全定制 | 大型 | 自控 |
| Amazon Bedrock Agents | 云原生 | 大型 | 按用量 |
| 阿里云百炼 | 国内企业 | 大型 | 按用量 |
一、Agent 生产架构设计
1.1 生产级 Agent 框架
# ===== 生产级 Agent 框架 =====
from __future__ import annotations
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Callable
from enum import Enum
import time
import asyncio
import json
from datetime import datetime
import hashlib
class AgentRole(Enum):
    """Roles an agent can play in a multi-agent pipeline (string-valued enum)."""

    PLANNER = "planner"  # planning: decompose a task into steps
    EXECUTOR = "executor"  # execution
    REVIEWER = "reviewer"  # review
    ROUTER = "router"  # routing
@dataclass
class AgentMessage:
    """One message in an agent conversation, with tool traffic and cost accounting."""

    role: str  # e.g. "user" / "assistant" / "system" as used elsewhere in this file
    content: str
    tool_calls: List[Dict] = field(default_factory=list)
    tool_results: List[Dict] = field(default_factory=list)
    metadata: Dict[str, Any] = field(default_factory=dict)
    timestamp: float = field(default_factory=time.time)  # creation time, epoch seconds
    token_count: int = 0
    cost_usd: float = 0.0
@dataclass
class AgentState:
    """Mutable run state of one agent session, passed into and returned from ``run``."""

    session_id: str
    messages: List[AgentMessage] = field(default_factory=list)
    variables: Dict[str, Any] = field(default_factory=dict)  # scratch space, e.g. "plan"
    current_step: int = 0
    max_steps: int = 10
    total_tokens: int = 0
    total_cost: float = 0.0
    status: str = "idle"  # idle / running / completed / failed / cancelled
class BaseAgent(ABC):
    """Base class for production agents: tool registry, lifecycle hooks, guard checks.

    Subclasses implement :meth:`run`.

    Args:
        name: Human-readable agent name.
        role: The :class:`AgentRole` this agent plays.
        model: Model identifier for LLM calls.
        max_tokens_per_turn: Per-turn token cap, stored as ``self.max_tokens``.
        max_steps: Per-agent step cap. :meth:`_check_steps` enforces the stricter
            of this and ``state.max_steps`` (this argument used to be ignored).
        budget_usd: Cost cap compared against ``state.total_cost`` by
            :meth:`_check_budget`.
    """

    def __init__(
        self,
        name: str,
        role: AgentRole,
        model: str = "gpt-4o",
        max_tokens_per_turn: int = 4096,
        max_steps: int = 10,
        budget_usd: float = 1.0,
    ):
        self.name = name
        self.role = role
        self.model = model
        self.max_tokens = max_tokens_per_turn
        self.max_steps = max_steps
        self.budget = budget_usd
        # Tool registry: name -> callable, plus OpenAI-style function schemas.
        self.tools: Dict[str, Callable] = {}
        self.tool_schemas: List[Dict] = []
        # Lifecycle hooks, keyed by event name.
        self._hooks: Dict[str, List[Callable]] = {
            "before_run": [],
            "after_run": [],
            "before_tool_call": [],
            "after_tool_call": [],
            "on_error": [],
        }

    def register_tool(
        self,
        name: str,
        func: Callable,
        description: str,
        parameters: Dict,
    ):
        """Register (or replace) a tool and its OpenAI-style function schema.

        Re-registering an existing name replaces its schema instead of
        appending a duplicate entry to ``tool_schemas``.
        """
        self.tools[name] = func
        schema = {
            "type": "function",
            "function": {
                "name": name,
                "description": description,
                "parameters": parameters,
            }
        }
        for index, existing in enumerate(self.tool_schemas):
            if existing["function"]["name"] == name:
                self.tool_schemas[index] = schema
                break
        else:
            self.tool_schemas.append(schema)

    def add_hook(self, event: str, callback: Callable):
        """Add a lifecycle hook; events not in ``self._hooks`` are ignored."""
        if event in self._hooks:
            self._hooks[event].append(callback)

    async def _call_hooks(self, event: str, **kwargs):
        """Invoke every hook for ``event`` as ``hook(self, **kwargs)``.

        Async hooks are awaited; plain functions are just called (awaiting
        their ``None`` result used to raise TypeError).
        """
        import inspect

        for hook in self._hooks.get(event, []):
            outcome = hook(self, **kwargs)
            if inspect.isawaitable(outcome):
                await outcome

    @abstractmethod
    async def run(self, state: AgentState) -> AgentState:
        """Run the agent on ``state`` (implemented by subclasses)."""
        pass

    def _check_budget(self, state: AgentState) -> bool:
        """True while the session's accumulated cost is below this agent's budget."""
        return state.total_cost < self.budget

    def _check_steps(self, state: AgentState) -> bool:
        """True while the step count is below both the state's and this agent's cap."""
        return state.current_step < min(state.max_steps, self.max_steps)
# ===== Planner agent =====
class PlannerAgent(BaseAgent):
    """Planner agent: decomposes the latest user request into executable steps.

    ``run`` stores the parsed steps in ``state.variables["plan"]`` and appends
    the raw LLM response to ``state.messages``.
    """

    def __init__(self, **kwargs):
        """Create a planner; ``kwargs`` are forwarded to ``BaseAgent``.

        ``model`` defaults to "gpt-4o" but may now be overridden; passing it
        used to raise ``TypeError: got multiple values for keyword argument``.
        ``name`` and ``role`` are fixed and must not be passed.
        """
        kwargs.setdefault("model", "gpt-4o")
        super().__init__(
            name="Planner",
            role=AgentRole.PLANNER,
            **kwargs
        )

    async def run(self, state: AgentState) -> AgentState:
        """Generate an execution plan for ``state``.

        On failure the state is marked "failed", the ``on_error`` hooks run
        with ``error=<exception>``, and the exception is re-raised.
        """
        await self._call_hooks("before_run", state=state)
        state.status = "running"
        state.current_step += 1
        try:
            prompt = self._build_planning_prompt(state)
            response = await self._call_llm(prompt)
            plan = self._parse_plan(response)
        except Exception as exc:
            # Previously the status stayed "running" and on_error hooks never fired.
            state.status = "failed"
            await self._call_hooks("on_error", state=state, error=exc)
            raise
        state.variables["plan"] = plan
        state.messages.append(AgentMessage(
            role="assistant",
            content=response,
            metadata={"type": "plan", "steps": len(plan)},
        ))
        state.status = "completed"
        await self._call_hooks("after_run", state=state)
        return state

    def _build_planning_prompt(self, state: AgentState) -> str:
        """Build the planning prompt from the most recent user message and tool names."""
        last_user_msg = next(
            (m for m in reversed(state.messages) if m.role == "user"),
            None
        )
        return f"""你是一个任务规划专家。请将用户的请求分解为可执行的步骤。
用户请求:{last_user_msg.content if last_user_msg else '无'}
可用工具:{json.dumps([s["function"]["name"] for s in self.tool_schemas])}
请输出 JSON 格式的计划:
{{
"goal": "总体目标",
"steps": [
{{
"id": 1,
"action": "步骤描述",
"tool": "使用的工具名(可选)",
"depends_on": [],
"expected_output": "预期输出"
}}
]
}}
只输出 JSON,不要其他内容。"""

    async def _call_llm(self, prompt: str) -> str:
        """Call the LLM (stub returning a fixed JSON plan; wire to OpenAI / Anthropic)."""
        return '{"goal":"完成用户请求","steps":[{"id":1,"action":"分析需求","tool":null}]}'

    def _parse_plan(self, response: str) -> List[Dict]:
        """Extract the "steps" list from the LLM's JSON reply.

        Tolerates a Markdown code fence around the JSON (a common LLM habit).
        Returns ``[]`` for invalid JSON, a non-object payload, or non-list steps.
        """
        text = response.strip()
        if text.startswith("```"):
            # Drop the opening fence line (e.g. ```json) and a trailing fence.
            text = text.split("\n", 1)[1] if "\n" in text else ""
            text = text.rstrip()
            if text.endswith("```"):
                text = text[:-3]
        try:
            data = json.loads(text)
        except json.JSONDecodeError:
            return []
        if not isinstance(data, dict):
            return []
        steps = data.get("steps", [])
        return steps if isinstance(steps, list) else []
1.2 工具链编排
# ===== 工具链编排引擎 =====
from typing import Any, Dict, List, Optional
import asyncio
@dataclass
class ToolResult:
    """Outcome of a single tool execution."""

    tool_name: str
    success: bool
    result: Any  # handler return value; the failure paths in this file set it to None
    error: Optional[str] = None  # error text on failure (e.g. "timeout")
    duration_ms: int = 0
    tokens_used: int = 0
class ToolChainOrchestrator:
    """Tool-chain orchestration engine: per-tool timeout, retries, fallback and chaining.

    Handlers are async callables invoked as ``handler(params, context)``.
    """

    def __init__(self):
        self.tools: Dict[str, Dict] = {}
        # NOTE(review): not read by this class; fallbacks use each tool's "fallback" entry.
        self.fallback_chain: Dict[str, List[str]] = {}

    def register(
        self,
        name: str,
        handler: Callable,
        timeout_ms: int = 30000,
        max_retries: int = 2,
        fallback: Optional[str] = None,
        rate_limit: int = 100,
    ):
        """Register a tool.

        Args:
            name: Tool name used by :meth:`execute` and chain steps.
            handler: Async callable ``handler(params, context)``.
            timeout_ms: Per-attempt timeout.
            max_retries: Extra attempts after the first failure.
            fallback: Name of a tool to try once this one is exhausted.
            rate_limit: Cap on successful calls. NOTE: ``call_count`` is never
                reset here, so this is a lifetime cap, not a per-window rate.
        """
        self.tools[name] = {
            "handler": handler,
            "timeout_ms": timeout_ms,
            "max_retries": max_retries,
            "fallback": fallback,
            "rate_limit": rate_limit,
            "call_count": 0,
            "error_count": 0,
        }

    async def execute(
        self,
        tool_name: str,
        params: Dict[str, Any],
        context: Dict[str, Any] = None,
        _visited: Optional[set] = None,
    ) -> ToolResult:
        """Run a tool with timeout and retries, falling back when it is exhausted.

        Args:
            tool_name: Registered tool to run.
            params: Parameters passed to the handler.
            context: Shared context passed to the handler (and to any fallback).
            _visited: Internal; tools already tried in this fallback chain.

        Returns:
            The ToolResult of this tool, or of the fallback that handled the call.
        """
        if tool_name not in self.tools:
            return ToolResult(
                tool_name=tool_name,
                success=False,
                result=None,
                error=f"Tool '{tool_name}' not found",
            )
        tool = self.tools[tool_name]

        if tool["call_count"] >= tool["rate_limit"]:
            return await self._fallback(
                tool_name, params, "rate_limit_exceeded", context, _visited
            )

        last_error = None
        for _attempt in range(tool["max_retries"] + 1):
            # perf_counter is monotonic, so durations are immune to clock changes.
            start = time.perf_counter()
            try:
                result = await asyncio.wait_for(
                    tool["handler"](params, context or {}),
                    timeout=tool["timeout_ms"] / 1000,
                )
            except asyncio.TimeoutError:
                last_error = "timeout"
            except Exception as e:
                last_error = str(e)
            else:
                tool["call_count"] += 1
                return ToolResult(
                    tool_name=tool_name,
                    success=True,
                    result=result,
                    duration_ms=int((time.perf_counter() - start) * 1000),
                )
            # Timeouts are failures too (previously only exceptions were counted).
            tool["error_count"] += 1

        # Every attempt failed: try the fallback tool.
        return await self._fallback(tool_name, params, last_error, context, _visited)

    async def _fallback(
        self,
        tool_name: str,
        params: Dict,
        error: str,
        context: Dict[str, Any] = None,
        _visited: Optional[set] = None,
    ) -> ToolResult:
        """Hand off to ``tool_name``'s fallback tool, or return a failed result.

        The caller's ``context`` is forwarded (it used to be dropped). Tools
        already tried in this chain are skipped, so cyclic fallbacks such as
        A -> B -> A end in a failure instead of unbounded recursion.
        """
        tool = self.tools.get(tool_name)
        visited = set(_visited or ()) | {tool_name}
        fallback_name = tool["fallback"] if tool else None
        if not fallback_name or fallback_name in visited:
            return ToolResult(
                tool_name=tool_name,
                success=False,
                result=None,
                error=error,
            )
        return await self.execute(fallback_name, params, context, _visited=visited)

    async def execute_chain(
        self,
        steps: List[Dict],
        initial_context: Dict = None,
    ) -> List[ToolResult]:
        """Run ``steps`` in order, threading outputs through a shared context.

        Each step is a dict with "tool", optional "params" (values may be
        "${name}" references into the context), optional "output_key"
        (defaults to the tool name) and optional "required". A failed
        required step stops the chain. A caller-supplied ``initial_context``
        is updated in place.
        """
        # `is not None` so an empty dict from the caller is filled in as well
        # (with `or`, only non-empty dicts were mutated in place).
        context = initial_context if initial_context is not None else {}
        results = []
        for step in steps:
            tool_name = step.get("tool")
            params = self._resolve_params(step.get("params", {}), context)
            result = await self.execute(tool_name, params, context)
            results.append(result)
            if result.success:
                # Inject this step's output into the context for later steps.
                context[step.get("output_key", tool_name)] = result.result
            elif step.get("required", False):
                break
        return results

    def _resolve_params(self, params: Dict, context: Dict) -> Dict:
        """Replace "${name}" values (recursively in dicts and lists) with context values.

        Unknown names are left as the literal "${name}" string.
        """
        return {key: self._resolve_value(value, context) for key, value in params.items()}

    def _resolve_value(self, value: Any, context: Dict) -> Any:
        """Resolve one parameter value; only full "${...}" strings are references."""
        if isinstance(value, str) and value.startswith("${") and value.endswith("}"):
            return context.get(value[2:-1], value)
        if isinstance(value, dict):
            return self._resolve_params(value, context)
        if isinstance(value, list):
            return [self._resolve_value(item, context) for item in value]
        return value
二、会话与状态管理
2.1 多轮会话管理
# ===== 多轮会话管理 =====
import redis.asyncio as aioredis
from typing import Optional, Dict, List
from dataclasses import dataclass, asdict
import json
@dataclass
class SessionConfig:
    """Tunables for SessionManager."""

    max_history_turns: int = 50  # get_context reads at most 2x this many list entries
    max_context_tokens: int = 100000  # token budget above which get_context compresses history
    session_ttl_seconds: int = 3600  # 1 hour; re-applied on every saved message
    summary_threshold: int = 20  # intended: auto-summarize beyond 20 turns (NOTE(review): not read by SessionManager)
class SessionManager:
    """Production multi-turn session store backed by Redis (``redis.asyncio``).

    Per session it keeps two keys, both expiring ``session_ttl_seconds`` after
    the last saved message:

    * ``session:{id}:messages``: Redis list of JSON-encoded messages.
    * ``session:{id}:meta``: hash with message_count / total_cost / total_tokens.

    Await :meth:`connect` before using any other coroutine.
    """

    def __init__(
        self,
        redis_url: str = "redis://localhost:6379",
        config: SessionConfig = None,
    ):
        # Stored so connect() can use it; connect() used to reference an
        # undefined local `redis_url` and fail with NameError.
        self.redis_url = redis_url
        self.config = config or SessionConfig()
        self.redis: Optional[aioredis.Redis] = None

    async def connect(self):
        """Connect to Redis at ``self.redis_url`` (responses decoded to str)."""
        self.redis = await aioredis.from_url(self.redis_url, decode_responses=True)

    def _client(self):
        """Return the Redis client, or raise a clear error if connect() was not awaited."""
        if self.redis is None:
            raise RuntimeError("SessionManager is not connected; await connect() first")
        return self.redis

    async def save_message(
        self,
        session_id: str,
        message: AgentMessage,
    ):
        """Append ``message`` to the session and update its metadata counters.

        All writes go through one MULTI/EXEC pipeline, so the message list and
        the metadata hash cannot diverge if the connection drops mid-way.
        """
        redis = self._client()
        key = f"session:{session_id}:messages"
        meta_key = f"session:{session_id}:meta"
        ttl = self.config.session_ttl_seconds
        msg_data = {
            "role": message.role,
            "content": message.content,
            "token_count": message.token_count,
            "cost_usd": message.cost_usd,
            "timestamp": message.timestamp,
        }
        async with redis.pipeline(transaction=True) as pipe:
            # ensure_ascii=False stores CJK text as UTF-8 rather than \uXXXX escapes.
            pipe.rpush(key, json.dumps(msg_data, ensure_ascii=False))
            pipe.expire(key, ttl)
            pipe.hincrby(meta_key, "message_count", 1)
            pipe.hincrbyfloat(meta_key, "total_cost", message.cost_usd)
            pipe.hincrby(meta_key, "total_tokens", message.token_count)
            pipe.expire(meta_key, ttl)
            await pipe.execute()

    async def get_context(
        self,
        session_id: str,
        max_tokens: int = None,
    ) -> List[Dict]:
        """Return recent messages as ``{"role", "content"}`` dicts for an LLM call.

        Reads the last ``2 * max_history_turns`` entries. If their recorded
        token counts exceed ``max_tokens`` (default
        ``config.max_context_tokens``), the older half is replaced by one
        "system" summary message. A lone oversized message is returned as-is,
        since there is nothing older to summarize.
        """
        redis = self._client()
        max_tokens = max_tokens or self.config.max_context_tokens
        key = f"session:{session_id}:messages"
        raw_messages = await redis.lrange(
            key,
            -(self.config.max_history_turns * 2), -1
        )
        messages = [json.loads(m) for m in raw_messages]

        total_tokens = sum(m.get("token_count", 0) for m in messages)
        if total_tokens > max_tokens and len(messages) > 1:
            half = len(messages) // 2
            old_messages, new_messages = messages[:half], messages[half:]
            summary = await self._summarize(old_messages)
            messages = [{"role": "system", "content": summary}] + new_messages

        return [{"role": m["role"], "content": m["content"]} for m in messages]

    async def _summarize(self, messages: List[Dict]) -> str:
        """Summarize older messages (placeholder; production code would call an LLM)."""
        return f"[之前的对话摘要:共{len(messages)}轮,涵盖{len(set(m['role'] for m in messages))}个角色]"

    async def get_session_stats(self, session_id: str) -> Dict:
        """Return the session's metadata hash (message_count / total_cost / total_tokens)."""
        meta_key = f"session:{session_id}:meta"
        return await self._client().hgetall(meta_key)

    async def close_session(self, session_id: str):
        """Delete the session's message list and metadata."""
        await self._client().delete(
            f"session:{session_id}:messages",
            f"session:{session_id}:meta",
        )
2.2 长期记忆
# ===== Agent 长期记忆 =====
import hashlib
from typing import List, Optional
class AgentMemory:
    """Agent long-term memory on top of an embedding model and a vector store.

    Duck-typed dependencies (exactly the calls made below):

    * ``embedding_model.embed(text)``: awaited, returns a vector.
    * ``vector_store.upsert(id=, vector=, metadata=)``,
      ``vector_store.query(vector=, top_k=, filters=)`` and
      ``vector_store.delete(id=)``: all awaited.
    """

    def __init__(self, embedding_model, vector_store):
        self.embedder = embedding_model
        self.store = vector_store

    @staticmethod
    def _memory_id(session_id: str, content: str) -> str:
        """Deterministic record id, so storing identical content twice in a session upserts one record.

        MD5 is used only as a content fingerprint here, not for security.
        """
        return hashlib.md5(f"{session_id}:{content}".encode()).hexdigest()

    async def store_memory(
        self,
        session_id: str,
        content: str,
        memory_type: str = "fact",  # fact / preference / experience / skill
        importance: float = 0.5,
    ) -> str:
        """Embed ``content`` and upsert it into the vector store.

        Returns:
            The record id. Pass it to :meth:`forget`; previously nothing was
            returned, so callers had to re-derive the id themselves.
        """
        embedding = await self.embedder.embed(content)
        memory_id = self._memory_id(session_id, content)
        await self.store.upsert(
            id=memory_id,
            vector=embedding,
            metadata={
                "session_id": session_id,
                "type": memory_type,
                "content": content,
                "importance": importance,
                "created_at": time.time(),
            }
        )
        return memory_id

    async def recall(
        self,
        session_id: str,
        query: str,
        top_k: int = 5,
        memory_type: Optional[str] = None,
    ) -> List[Dict]:
        """Return the store's ``top_k`` nearest memories for ``query``, filtered to this session (and type)."""
        query_embedding = await self.embedder.embed(query)
        filters = {"session_id": session_id}
        if memory_type:
            filters["type"] = memory_type
        return await self.store.query(
            vector=query_embedding,
            top_k=top_k,
            filters=filters,
        )

    async def forget(self, session_id: str, memory_id: str):
        """Delete one memory by id (GDPR / right to be forgotten).

        NOTE(review): ``session_id`` is not used; deletion is by id only and
        ownership is not verified here. Confirm callers authorize the id.
        """
        await self.store.delete(id=memory_id)
三、成本控制与模型路由
3.1 Token 预算管理
# ===== Token 预算管理 =====
from dataclasses import dataclass
from typing import Dict, Optional
@dataclass
class ModelPricing:
    """Model price per 1K tokens; BudgetManager compares the resulting cost with USD budgets."""

    name: str
    input_price_per_1k: float  # price per 1K prompt (input) tokens
    output_price_per_1k: float  # price per 1K completion (output) tokens
# Mainstream model list prices, USD per 1K tokens (input, output).
# NOTE(review): vendor prices change often; re-check the official pricing
# pages before relying on these numbers for budget enforcement.
MODEL_PRICING = {
    # gpt-4o: $2.50 / $10.00 per 1M tokens; $5 / $15 applied to the original 2024-05-13 snapshot.
    "gpt-4o": ModelPricing("gpt-4o", 0.0025, 0.01),
    "gpt-4o-mini": ModelPricing("gpt-4o-mini", 0.00015, 0.0006),
    "claude-3.5-sonnet": ModelPricing("claude-3.5-sonnet", 0.003, 0.015),
    "claude-3-haiku": ModelPricing("claude-3-haiku", 0.00025, 0.00125),
    # NOTE(review): DeepSeek has repriced several times; verify against current pricing.
    "deepseek-chat": ModelPricing("deepseek-chat", 0.00014, 0.00028),
    # The former 0.0008 / 0.002 were Alibaba Cloud's CNY prices (yuan per 1K tokens),
    # silently mixed into USD budgets. These are the international USD list prices
    # ($0.40 / $1.20 per 1M tokens).
    "qwen-plus": ModelPricing("qwen-plus", 0.0004, 0.0012),
}
class BudgetManager:
    """Token budget manager: daily, per-session and per-user-per-day USD limits.

    Spending is tracked in process memory. Daily buckets are keyed by local
    date ``YYYY-MM-DD``; per-user buckets by ``f"{user_id}:{date}"``.
    NOTE(review): old days and finished sessions are never evicted.
    """

    def __init__(
        self,
        daily_budget_usd: float = 100.0,
        per_session_budget_usd: float = 0.5,
        per_user_daily_budget_usd: float = 10.0,
    ):
        self.daily_budget = daily_budget_usd
        self.per_session_budget = per_session_budget_usd
        self.per_user_budget = per_user_daily_budget_usd
        self.daily_spent: Dict[str, float] = {}
        self.session_spent: Dict[str, float] = {}
        self.user_spent: Dict[str, float] = {}

    @staticmethod
    def _today() -> str:
        """Current local date as ``YYYY-MM-DD`` (the daily bucket key)."""
        return datetime.now().strftime("%Y-%m-%d")

    @staticmethod
    def _cost(input_tokens: int, output_tokens: int, model: str) -> float:
        """USD cost of a call from MODEL_PRICING; unknown models are priced as gpt-4o."""
        pricing = MODEL_PRICING.get(model, MODEL_PRICING["gpt-4o"])
        return (
            input_tokens / 1000 * pricing.input_price_per_1k +
            output_tokens / 1000 * pricing.output_price_per_1k
        )

    def can_proceed(
        self,
        session_id: str,
        user_id: str,
        estimated_tokens: int,
        model: str = "gpt-4o",
    ) -> Dict:
        """Check whether a call of ``estimated_tokens`` fits every budget.

        Returns a dict with "can_proceed", "estimated_cost", per-limit
        "checks", and "remaining" (which can be negative once overspent).
        """
        today = self._today()
        # Assume roughly a 2:1 input:output token split for the estimate.
        est_cost = self._cost(
            int(estimated_tokens * 0.67), int(estimated_tokens * 0.33), model
        )
        daily_used = self.daily_spent.get(today, 0)
        session_used = self.session_spent.get(session_id, 0)
        user_used = self.user_spent.get(f"{user_id}:{today}", 0)
        checks = {
            "daily": daily_used + est_cost <= self.daily_budget,
            "session": session_used + est_cost <= self.per_session_budget,
            "user": user_used + est_cost <= self.per_user_budget,
        }
        return {
            "can_proceed": all(checks.values()),
            "estimated_cost": est_cost,
            "checks": checks,
            "remaining": {
                "daily": self.daily_budget - daily_used,
                "session": self.per_session_budget - session_used,
                "user": self.per_user_budget - user_used,
            },
        }

    def record_usage(
        self,
        session_id: str,
        user_id: str,
        input_tokens: int,
        output_tokens: int,
        model: str,
    ):
        """Add the actual cost of a finished call to the daily, session and user buckets."""
        today = self._today()
        cost = self._cost(input_tokens, output_tokens, model)
        self.daily_spent[today] = self.daily_spent.get(today, 0) + cost
        self.session_spent[session_id] = self.session_spent.get(session_id, 0) + cost
        user_key = f"{user_id}:{today}"
        self.user_spent[user_key] = self.user_spent.get(user_key, 0) + cost

    def get_report(self) -> Dict:
        """Summarize today's spending and activity."""
        today = self._today()
        spent_today = self.daily_spent.get(today, 0)
        suffix = f":{today}"
        # Strip the date suffix instead of splitting on the first ":", so user
        # ids that themselves contain ":" are not merged together.
        users_today = {key[: -len(suffix)] for key in self.user_spent if key.endswith(suffix)}
        return {
            "date": today,
            "daily_budget": self.daily_budget,
            "daily_spent": spent_today,
            "daily_remaining": self.daily_budget - spent_today,
            "active_sessions": len(self.session_spent),
            "active_users": len(users_today),
        }
3.2 智能模型路由
# ===== Smart model routing =====
class ModelRouter:
    """Route a task to a model tier by keyword complexity and input size."""

    def __init__(self):
        self.routes = {
            "simple": "gpt-4o-mini",  # simple Q&A, formatting
            "standard": "claude-3.5-sonnet",  # code generation, reasoning
            "complex": "gpt-4o",  # multi-step reasoning, long context
            "creative": "claude-3.5-sonnet",  # creative writing (no rule selects it yet)
        }
        # Complexity rules; the first level whose keyword appears in the task wins.
        # NOTE(review): max_input_tokens / min_steps are not read by route().
        self.complexity_rules = {
            "simple": {
                "max_input_tokens": 500,
                "keywords": ["是什么", "翻译", "总结", "格式化", "convert"],
            },
            "standard": {
                "max_input_tokens": 8000,
                "keywords": ["写代码", "实现", "修复", "优化", "generate code"],
            },
            "complex": {
                "min_steps": 3,
                "keywords": ["分析", "架构", "系统设计", "multi-step", "复杂"],
            },
        }

    def route(self, task_description: str, input_tokens: int = 0) -> Dict:
        """Pick a complexity tier and model for a task.

        Args:
            task_description: Task text, matched case-insensitively against keywords.
            input_tokens: Prompt size; above 50,000 forces the "complex" tier.

        Returns:
            Dict with "complexity", "model" and an informal "estimated_speedup".
        """
        desc_lower = task_description.lower()
        complexity = next(
            (
                level
                for level, rules in self.complexity_rules.items()
                if any(kw in desc_lower for kw in rules.get("keywords", []))
            ),
            "standard",
        )
        # Very long inputs always need the long-context tier. Short inputs no
        # longer downgrade a keyword-detected "complex" task to "standard": with
        # the default input_tokens=0 that rule made "complex" unreachable.
        if input_tokens > 50000:
            complexity = "complex"

        return {
            "complexity": complexity,
            "model": self.routes[complexity],
            "estimated_speedup": {
                "simple": "10x vs complex model",
                "standard": "3x vs complex model",
                "complex": "1x (baseline)",
            }.get(complexity, ""),
        }
# ===== Semantic cache =====
class SemanticCache:
    """Semantic cache: reuse the answer of a sufficiently similar earlier query.

    Entries live in process memory and are keyed by the MD5 of the exact query
    text. Each lookup scans every entry (O(n) per ``get``).
    """

    def __init__(self, embedding_model, similarity_threshold: float = 0.95):
        self.embedder = embedding_model
        self.threshold = similarity_threshold
        self.cache: Dict[str, Dict] = {}

    async def get(self, query: str) -> Optional[Dict]:
        """Return the most similar unexpired entry at or above the threshold, else None.

        Entries older than their ``ttl`` are skipped and evicted (the TTL used
        to be stored but never checked). The best match is returned rather than
        the first one found in insertion order.
        """
        query_embedding = await self.embedder.embed(query)
        now = time.time()
        best = None
        best_similarity = float("-inf")
        expired = []
        for cache_key, cached in self.cache.items():
            if now - cached["cached_at"] > cached["ttl"]:
                expired.append(cache_key)
                continue
            similarity = self._cosine_similarity(query_embedding, cached["embedding"])
            if similarity >= self.threshold and similarity > best_similarity:
                best, best_similarity = cached, similarity
        for cache_key in expired:
            del self.cache[cache_key]
        if best is None:
            return None
        return {
            "answer": best["answer"],
            "similarity": best_similarity,
            "cached_at": best["cached_at"],
        }

    async def set(self, query: str, answer: str, ttl_seconds: int = 3600):
        """Cache ``answer`` for ``query``; it stays valid for ``ttl_seconds``."""
        embedding = await self.embedder.embed(query)
        key = hashlib.md5(query.encode()).hexdigest()
        self.cache[key] = {
            "embedding": embedding,
            "answer": answer,
            "cached_at": time.time(),
            "ttl": ttl_seconds,
        }

    def _cosine_similarity(self, a, b) -> float:
        """Cosine similarity; 0.0 when either vector has zero norm (avoids NaN)."""
        import numpy as np

        a, b = np.asarray(a, dtype=float), np.asarray(b, dtype=float)
        denominator = np.linalg.norm(a) * np.linalg.norm(b)
        if denominator == 0:
            return 0.0
        return float(np.dot(a, b) / denominator)
四、可观测性
4.1 分布式追踪
# ===== Agent 分布式追踪 =====
import uuid
import time
from contextlib import asynccontextmanager
from typing import Generator, Optional
@dataclass
class Span:
    """A single traced operation: identifiers, timing, attributes, status and events."""

    trace_id: str
    span_id: str
    parent_id: Optional[str]
    name: str
    start_time: float
    end_time: Optional[float] = None  # filled in when the span finishes
    attributes: Dict[str, Any] = None
    status: str = "ok"  # ok / error
    events: List[Dict] = None

    @property
    def duration_ms(self) -> int:
        """Elapsed whole milliseconds, or 0 while the span has no (truthy) end time."""
        if not self.end_time:
            return 0
        elapsed_seconds = self.end_time - self.start_time
        return int(elapsed_seconds * 1000)
class AgentTracer:
    """In-memory agent tracer producing OpenTelemetry-shaped span IDs.

    Open spans live in ``active_spans``; finished spans are appended to
    ``exported_spans``. NOTE(review): ``exported_spans`` is never trimmed
    here, so export or drain it periodically in production.
    """

    def __init__(self, service_name: str = "ai-agent"):
        self.service_name = service_name
        self.active_spans: Dict[str, Span] = {}
        self.exported_spans: List[Span] = []

    @asynccontextmanager
    async def trace(
        self,
        name: str,
        attributes: Dict = None,
        parent_span: Span = None,
    ) -> Generator[Span, None, None]:
        """Open a span for the duration of an ``async with`` block.

        Args:
            name: Operation name.
            attributes: Optional span attributes.
            parent_span: When given, the new span joins its trace as a child.

        Yields:
            The open Span; it is finished and exported when the block exits.
        """
        # OpenTelemetry IDs are 32 (trace) / 16 (span) lowercase hex chars;
        # slicing str(uuid4()) produced dashed, truncated IDs.
        trace_id = parent_span.trace_id if parent_span else uuid.uuid4().hex
        span_id = uuid.uuid4().hex[:16]
        span = Span(
            trace_id=trace_id,
            span_id=span_id,
            parent_id=parent_span.span_id if parent_span else None,
            name=name,
            start_time=time.time(),
            attributes=attributes or {},
        )
        self.active_spans[span_id] = span
        try:
            yield span
        except BaseException as e:
            # BaseException so task cancellation (asyncio.CancelledError) is not
            # recorded as "ok"; the exception is always re-raised.
            span.status = "error"
            span.events = [{"name": "error", "attributes": {"error": str(e) or type(e).__name__}}]
            raise
        else:
            span.status = "ok"
        finally:
            span.end_time = time.time()
            self.active_spans.pop(span_id, None)
            self.exported_spans.append(span)

    def get_trace(self, trace_id: str) -> List[Span]:
        """Return all exported spans belonging to ``trace_id``."""
        return [s for s in self.exported_spans if s.trace_id == trace_id]

    def get_dashboard_data(self) -> Dict:
        """Aggregate exported spans: totals, error rate, per-operation count/latency/errors."""
        total_spans = len(self.exported_spans)
        error_spans = sum(1 for s in self.exported_spans if s.status == "error")

        by_name: Dict[str, Dict] = {}
        for span in self.exported_spans:
            stats = by_name.setdefault(span.name, {"count": 0, "total_ms": 0, "errors": 0})
            stats["count"] += 1
            stats["total_ms"] += span.duration_ms
            if span.status == "error":
                stats["errors"] += 1

        # Integer average latency per operation.
        for stats in by_name.values():
            stats["avg_ms"] = stats["total_ms"] // stats["count"] if stats["count"] else 0

        return {
            "total_spans": total_spans,
            "error_rate": error_spans / total_spans if total_spans else 0,
            "by_operation": by_name,
        }
4.2 指标收集
# ===== Agent 指标收集 =====
from collections import defaultdict
import statistics
class AgentMetrics:
    """In-process agent metrics: counters, bounded histograms and gauges."""

    def __init__(self):
        self.counters = defaultdict(int)
        self.histograms = defaultdict(list)
        self.gauges = defaultdict(float)

    def inc(self, name: str, value: int = 1):
        """Increment a counter."""
        self.counters[name] += value

    def observe(self, name: str, value: float):
        """Record a histogram sample.

        Once more than 10,000 samples accumulate, only the newest 5,000 are
        kept; trimming in batches amortizes the list copy.
        """
        samples = self.histograms[name]
        samples.append(value)
        if len(samples) > 10000:
            self.histograms[name] = samples[-5000:]

    def set_gauge(self, name: str, value: float):
        """Set a gauge to its latest value."""
        self.gauges[name] = value

    def get_summary(self, metric_name: str) -> Dict:
        """Summarize a metric.

        Histograms return count/min/max/avg/p50/p95/p99 (``{"count": 0}`` when
        empty instead of raising). Otherwise returns ``{"value": ...}`` from
        the counter, then the gauge, defaulting to 0.
        """
        if metric_name in self.histograms:
            values = sorted(self.histograms[metric_name])  # sort once for all percentiles
            n = len(values)
            if n == 0:
                return {"count": 0}
            return {
                "count": n,
                "min": values[0],
                "max": values[-1],
                "avg": statistics.mean(values),
                "p50": statistics.median(values),
                "p95": values[int(n * 0.95)],
                "p99": values[int(n * 0.99)],
            }
        if metric_name in self.counters:
            return {"value": self.counters[metric_name]}
        if metric_name in self.gauges:
            return {"value": self.gauges[metric_name]}
        return {"value": 0}

    def get_all_metrics(self) -> Dict:
        """Snapshot of all counters, gauges and histogram summaries."""
        return {
            "counters": dict(self.counters),
            "gauges": dict(self.gauges),
            "summaries": {name: self.get_summary(name) for name in self.histograms},
        }
# ===== Key metric definitions =====
# Reference list of recommended agent metric names (latency, throughput,
# quality, cost, resources). It is a bare string expression, never assigned,
# so it has no runtime effect; none of these names are referenced by code here.
"""
Agent 核心指标:
# 延迟
agent.request.latency # 请求延迟(ms)
agent.llm.latency # LLM 调用延迟
agent.tool.latency # 工具调用延迟
agent.e2e.latency # 端到端延迟
# 吞吐量
agent.requests.total # 总请求数
agent.requests.success # 成功请求数
agent.requests.failed # 失败请求数
# 质量
agent.task.success_rate # 任务完成率
agent.tool.success_rate # 工具调用成功率
agent.user.satisfaction # 用户满意度
# 成本
agent.cost.total # 总成本
agent.cost.per_request # 单请求成本
agent.cost.tokens.total # 总 Token 数
# 资源
agent.active.sessions # 活跃会话数
agent.queue.depth # 队列深度
agent.memory.usage # 内存使用
"""
五、灰度发布与弹性伸缩
5.1 A/B 测试与金丝雀发布
# ===== Agent 灰度发布 =====
import random
import hashlib
from typing import Dict, Optional
class CanaryDeployer:
    """Canary / A-B experiment manager with deterministic hash-based bucketing.

    NOTE(review): ``criteria`` is stored but not interpreted here; callers
    choose which identifier (user id, session id, ...) to pass in.
    """

    def __init__(self):
        self.experiments: Dict[str, Dict] = {}

    def create_experiment(
        self,
        name: str,
        control_config: Dict,  # control-group config
        treatment_config: Dict,  # treatment (canary) config
        canary_percent: float = 0.1,  # share of traffic sent to treatment
        criteria: str = "user_id",  # bucketing dimension (informational)
    ):
        """Create (or reset) an experiment with empty per-group stats."""
        self.experiments[name] = {
            "control": control_config,
            "treatment": treatment_config,
            "canary_percent": canary_percent,
            "criteria": criteria,
            "stats": {
                "control": {"requests": 0, "success": 0, "cost": 0},
                "treatment": {"requests": 0, "success": 0, "cost": 0},
            },
        }

    def get_group(self, experiment_name: str, identifier: str) -> Optional[str]:
        """Return "treatment" or "control" for ``identifier``, or None for an unknown experiment.

        md5(identifier) mod 100 is compared with ``canary_percent`` (1%
        granularity), so an identifier always lands in the same group; use the
        result when calling :meth:`record_result`.
        NOTE(review): the hash is not salted with the experiment name, so
        assignments are correlated across experiments.
        """
        exp = self.experiments.get(experiment_name)
        if not exp:
            return None
        hash_val = int(hashlib.md5(identifier.encode()).hexdigest(), 16)
        bucket = (hash_val % 100) / 100
        return "treatment" if bucket < exp["canary_percent"] else "control"

    def get_config(
        self,
        experiment_name: str,
        identifier: str,
    ) -> Dict:
        """Return the config assigned to ``identifier`` (``{}`` for an unknown experiment)."""
        group = self.get_group(experiment_name, identifier)
        if group is None:
            return {}
        return self.experiments[experiment_name][group]

    def record_result(
        self,
        experiment_name: str,
        group: str,
        success: bool,
        cost: float,
    ):
        """Record one request outcome for ``group`` ("control" / "treatment")."""
        exp = self.experiments.get(experiment_name)
        if exp:
            stats = exp["stats"][group]
            stats["requests"] += 1
            if success:
                stats["success"] += 1
            stats["cost"] += cost

    def get_experiment_report(self, name: str) -> Dict:
        """Per-group success rate / average cost plus a rollout recommendation."""
        exp = self.experiments.get(name)
        if not exp:
            return {}

        report = {"experiment": name, "groups": {}}
        for group, stats in exp["stats"].items():
            requests = stats["requests"]
            success_rate = stats["success"] / requests if requests else 0
            avg_cost = stats["cost"] / requests if requests else 0
            report["groups"][group] = {
                "requests": requests,
                "success_rate": f"{success_rate:.2%}",
                "avg_cost": f"${avg_cost:.4f}",
            }

        report["recommendation"] = self._evaluate(
            exp["stats"]["control"], exp["stats"]["treatment"], exp["canary_percent"]
        )
        return report

    def _evaluate(self, control: Dict, treatment: Dict, canary_pct: float) -> str:
        """Recommend the next rollout step from the two groups' stats.

        Both groups need at least 100 requests before rates are compared;
        an empty control group used to raise ZeroDivisionError.
        """
        min_requests = 100
        if treatment["requests"] < min_requests or control["requests"] < min_requests:
            return "INSUFFICIENT_DATA"

        t_success = treatment["success"] / treatment["requests"]
        c_success = control["success"] / control["requests"]
        t_avg_cost = treatment["cost"] / treatment["requests"]
        c_avg_cost = control["cost"] / control["requests"]

        # Success rate not worse and cost at most +10%: expand.
        if t_success >= c_success and t_avg_cost <= c_avg_cost * 1.1:
            if t_success > c_success * 1.05:
                return "EXPAND_TO_50%"
            return "EXPAND_TO_30%"

        # Success rate dropped by more than 5% (relative): roll back.
        if t_success < c_success * 0.95:
            return "ROLLBACK"

        return "CONTINUE_MONITORING"
5.2 弹性伸缩
# ===== Autoscaling (Kubernetes + KEDA) =====
class AutoScaler:
    """Agent replica autoscaler combining queue-depth, latency and CPU signals.

    ``calculate_replicas`` proposes a target; ``should_scale`` applies cooldowns.
    """

    def __init__(self):
        self.min_replicas = 1
        self.max_replicas = 100
        self.target_queue_depth = 10  # target queued tasks per replica
        self.scale_up_cooldown = 60  # seconds between scale-ups
        self.scale_down_cooldown = 300  # seconds between scale-downs
        self.last_scale_time = 0

    def calculate_replicas(
        self,
        current_replicas: int,
        queue_depth: int,
        avg_latency_ms: float,
        cpu_usage: float,
    ) -> int:
        """Compute the target replica count, clamped to [min_replicas, max_replicas].

        Args:
            current_replicas: Replicas running now.
            queue_depth: Tasks waiting in the queue.
            avg_latency_ms: Recent average request latency.
            cpu_usage: CPU utilization as a fraction (0.7 == 70%).
        """
        import math

        # Queue signal: one replica per target_queue_depth tasks, rounded up
        # (the Kubernetes HPA also rounds up; flooring under-provisioned).
        queue_based = max(
            self.min_replicas,
            math.ceil(queue_depth / self.target_queue_depth),
        )

        # Latency signal votes only when latency is too high. It used to vote
        # for current_replicas otherwise, which made the max() below unable to
        # ever go below the current count, so scale-down never happened.
        latency_threshold_ms = 5000
        latency_based = current_replicas * 2 if avg_latency_ms > latency_threshold_ms else 0

        # CPU signal: scale proportionally above the threshold, shrink 20% below it.
        cpu_threshold = 0.7
        if cpu_usage > cpu_threshold:
            # round() strips float noise (e.g. 7 * 0.8 / 0.7 == 8.000000000000002) before ceil.
            cpu_based = math.ceil(round(current_replicas * cpu_usage / cpu_threshold, 6))
        else:
            cpu_based = int(current_replicas * 0.8)

        target = max(queue_based, latency_based, cpu_based)
        return max(self.min_replicas, min(target, self.max_replicas))

    def should_scale(
        self,
        current: int,
        target: int,
    ) -> Dict:
        """Decide whether to act now, honoring direction-specific cooldowns.

        Records the time of any scale action it approves.
        """
        now = time.time()
        if current == target:
            return {"action": "none", "reason": "already at target"}

        cooldown = self.scale_up_cooldown if target > current else self.scale_down_cooldown
        if now - self.last_scale_time < cooldown:
            return {"action": "none", "reason": "cooldown"}

        self.last_scale_time = now
        return {
            "action": "scale_up" if target > current else "scale_down",
            "current": current,
            "target": target,
        }
# ===== KEDA ScaledObject example =====
# Reference manifest only; nothing executes it. It is bound to a name (it used
# to be a bare string expression) so it can be rendered or linted.
# Changes from the original sketch:
# - The YAML nesting is restored.
# - The Prometheus trigger computes p95 from rate() over a 5m window, summed
#   by `le`, as the Prometheus histogram_quantile documentation recommends.
# - The threshold is "5": the histogram is in seconds, so the old "5000"
#   meant 5000 s instead of the intended 5000 ms.
KEDA_SCALEDOBJECT_EXAMPLE = """
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: ai-agent-scaler
spec:
  scaleTargetRef:
    name: ai-agent-deployment
  minReplicaCount: 1
  maxReplicaCount: 100
  triggers:
    - type: redis
      metadata:
        host: redis-master
        port: "6379"
        listName: agent:task-queue
        listLength: "10" # 每个队列深度10个任务对应1个Pod
    - type: prometheus
      metadata:
        serverAddress: http://prometheus:9090
        metricName: agent_request_latency_p95
        threshold: "5"
        query: histogram_quantile(0.95, sum(rate(agent_request_latency_seconds_bucket[5m])) by (le))
"""
六、多租户与安全
6.1 多租户隔离
# ===== Multi-tenant management =====
@dataclass
class TenantConfig:
    """Per-tenant plan and quota configuration."""

    tenant_id: str
    name: str
    plan: str  # free / pro / enterprise
    max_daily_requests: int
    max_concurrent_sessions: int
    allowed_models: List[str]  # models this tenant may call
    daily_budget_usd: float
    features: List[str]
class MultiTenantManager:
    """Multi-tenant quota manager: daily request count, daily cost and model allow-list.

    Usage lives in process memory. The daily counters are reset lazily the
    first time a tenant is touched on a new local calendar day; previously
    they were never reset, so "daily" quotas were lifetime quotas.
    NOTE(review): ``max_concurrent_sessions`` and ``features`` are not enforced here.
    """

    def __init__(self):
        self.tenants: Dict[str, TenantConfig] = {}
        self.usage: Dict[str, Dict] = {}

    @staticmethod
    def _today() -> str:
        """Current local date as ``YYYY-MM-DD``."""
        return datetime.now().strftime("%Y-%m-%d")

    def _current_usage(self, tenant_id: str) -> Optional[Dict]:
        """Return the tenant's usage dict with daily counters rolled over to today, or None."""
        usage = self.usage.get(tenant_id)
        if usage is None:
            return None
        today = self._today()
        if usage.get("date") != today:
            usage["daily_requests"] = 0
            usage["daily_cost"] = 0.0
            usage["date"] = today
        return usage

    def register_tenant(self, config: TenantConfig):
        """Register (or re-register, resetting usage for) a tenant."""
        self.tenants[config.tenant_id] = config
        self.usage[config.tenant_id] = {
            "daily_requests": 0,
            "daily_cost": 0.0,
            "active_sessions": 0,
            "date": self._today(),
        }

    def authorize(
        self,
        tenant_id: str,
        model: str,
    ) -> Dict:
        """Check request quota, budget and model permission for one request.

        Returns ``{"authorized": True, "config": TenantConfig}`` or
        ``{"authorized": False, "reason": ...}``.
        """
        tenant = self.tenants.get(tenant_id)
        if not tenant:
            return {"authorized": False, "reason": "tenant_not_found"}

        usage = self._current_usage(tenant_id) or {}

        if usage.get("daily_requests", 0) >= tenant.max_daily_requests:
            return {"authorized": False, "reason": "daily_quota_exceeded"}

        if usage.get("daily_cost", 0) >= tenant.daily_budget_usd:
            return {"authorized": False, "reason": "budget_exceeded"}

        if model not in tenant.allowed_models:
            return {"authorized": False, "reason": f"model {model} not allowed"}

        return {"authorized": True, "config": tenant}

    def record_request(
        self,
        tenant_id: str,
        cost: float,
    ):
        """Count one request and its cost against today's usage (unknown tenants ignored)."""
        usage = self._current_usage(tenant_id)
        if usage is not None:
            usage["daily_requests"] += 1
            usage["daily_cost"] += cost

    def get_tenant_report(self, tenant_id: str) -> Dict:
        """Return today's usage summary for a tenant ("unknown" / "N/A" if unregistered)."""
        tenant = self.tenants.get(tenant_id)
        usage = self._current_usage(tenant_id) or {}
        return {
            "tenant": tenant.name if tenant else "unknown",
            "plan": tenant.plan if tenant else "unknown",
            "usage": {
                "daily_requests": usage.get("daily_requests", 0),
                "daily_cost": f"${usage.get('daily_cost', 0):.2f}",
                "budget_remaining": f"${tenant.daily_budget_usd - usage.get('daily_cost', 0):.2f}" if tenant else "N/A",
            }
        }
# ===== Tenant data isolation =====
class TenantDataIsolator:
    """Tenant data isolation through per-tenant key namespaces ("tenant:{id}:{key}")."""

    def get_namespace(self, tenant_id: str) -> str:
        """Return the key prefix for ``tenant_id``.

        The id is percent-encoded so it can never contain ":". Otherwise tenant
        "a:b" with key "c" and tenant "a" with key "b:c" would both map to
        "tenant:a:b:c". Ids made of letters, digits and ``_.-~`` are unchanged.
        """
        from urllib.parse import quote

        return f"tenant:{quote(str(tenant_id), safe='')}"

    async def store(
        self,
        tenant_id: str,
        key: str,
        value: Any,
    ):
        """Store ``value`` under the tenant's namespace (stub: no backend wired yet)."""
        namespace = self.get_namespace(tenant_id)
        full_key = f"{namespace}:{key}"
        # In production use a Redis / DB partition, e.g.:
        # await self.redis.set(full_key, json.dumps(value))

    async def retrieve(
        self,
        tenant_id: str,
        key: str,
    ) -> Optional[Any]:
        """Fetch a value from the tenant's namespace (stub: always returns None)."""
        namespace = self.get_namespace(tenant_id)
        full_key = f"{namespace}:{key}"
        # return await self.redis.get(full_key)
        return None
6.2 生产安全护栏
# ===== Production guardrails =====
class InputValidator:
    """Minimal structural input validator used by ProductionGuardrails.

    ProductionGuardrails instantiates ``InputValidator()`` but no such class
    was defined, so constructing the guardrails raised NameError. This
    placeholder rejects non-string or blank input and, when ``max_chars`` is
    set, over-long input. Replace it with real validation.
    """

    def __init__(self, max_chars: Optional[int] = None):
        self.max_chars = max_chars

    def validate(self, text: str) -> Dict:
        """Return ``{"passed": True}`` or ``{"passed": False, "reason": ...}``."""
        if not isinstance(text, str):
            return {"passed": False, "reason": "input must be a string"}
        if not text.strip():
            return {"passed": False, "reason": "empty input"}
        if self.max_chars is not None and len(text) > self.max_chars:
            return {"passed": False, "reason": f"input too long: {len(text)}/{self.max_chars} chars"}
        return {"passed": True}


class ProductionGuardrails:
    """Production guardrails: content safety, per-user rate limiting, input validation."""

    def __init__(self):
        self.content_filter = ContentFilter()
        self.rate_limiter = RateLimiter()
        self.input_validator = InputValidator()

    async def check_input(
        self,
        user_input: str,
        user_id: str,
    ) -> Dict:
        """Run all input checks; "blocked_reason" names the first failing check.

        All checks always run, in order, so the rate limiter records the request
        even when the content check fails.
        """
        checks = {
            "content_safety": self.content_filter.check(user_input),
            "rate_limit": self.rate_limiter.check(user_id),
            "input_validation": self.input_validator.validate(user_input),
        }
        all_passed = all(c.get("passed", True) for c in checks.values())
        return {
            "passed": all_passed,
            "checks": checks,
            "blocked_reason": next(
                (f"{k}: {v.get('reason', '')}" for k, v in checks.items() if not v.get("passed", True)),
                None
            ),
        }

    async def check_output(self, output: str) -> Dict:
        """Check model output for PII and harmful content.

        The action is "block" for harmful content (even if it also contains
        PII, which previously downgraded it to "redact"), "redact" for PII
        only, and "allow" otherwise.
        """
        pii_found = self.content_filter.detect_pii(output)
        harmful = self.content_filter.detect_harmful(output)
        if harmful:
            action = "block"
        elif pii_found:
            action = "redact"
        else:
            action = "allow"
        return {
            "passed": not pii_found and not harmful,
            "pii_detected": pii_found,
            "harmful_detected": harmful,
            "action": action,
        }
class RateLimiter:
    """Sliding-window rate limiter: at most ``max_requests`` per ``window_seconds`` per identifier.

    Timestamps come from ``time.monotonic()``, so wall-clock adjustments (NTP
    steps, manual changes) cannot shrink or stretch the window.
    NOTE(review): identifiers that stop sending are never evicted from ``self.requests``.
    """

    def __init__(self, max_requests: int = 60, window_seconds: int = 60):
        self.max_requests = max_requests
        self.window = window_seconds
        self.requests: Dict[str, List[float]] = {}

    def check(self, identifier: str) -> Dict:
        """Record one request for ``identifier`` if allowed.

        Returns ``{"passed": True, "remaining": n}`` or
        ``{"passed": False, "reason": ...}``. Rejected requests are not recorded.
        """
        now = time.monotonic()
        window_start = now - self.window
        # Keep only the timestamps still inside the window.
        recent = [t for t in self.requests.get(identifier, []) if t > window_start]
        self.requests[identifier] = recent

        count = len(recent)
        if count >= self.max_requests:
            return {
                "passed": False,
                "reason": f"rate limit: {count}/{self.max_requests}",
            }
        recent.append(now)
        return {"passed": True, "remaining": self.max_requests - count - 1}
class ContentFilter:
    """Keyword-based content safety filter plus regex PII detection (simplified demo)."""

    # Shared by check() (inputs) and detect_harmful() (outputs) so both sides
    # agree; detect_harmful used to test only a "harmful_keyword" placeholder.
    HARMFUL_KEYWORDS = ("暴力", "非法", "毒品")

    # Digit lookarounds instead of \b: in Python 3 str patterns, CJK characters
    # are word characters, so "\b\d{16}\b" missed numbers written directly
    # after Chinese text (e.g. "卡号1234567812345678").
    PII_PATTERNS = (
        r"(?<!\d)\d{4}(?:[ -]?\d{4}){3}(?!\d)",  # 16-digit card number, optional space/dash groups
        r"(?<!\d)\d{3}-\d{2}-\d{4}(?!\d)",  # US SSN
        r"(?<!\d)1[3-9]\d{9}(?!\d)",  # mainland China mobile number
        r"(?<!\d)\d{17}[\dXx](?!\d)",  # mainland China resident ID number
    )

    def check(self, text: str) -> Dict:
        """Input check: fail if any harmful keyword occurs in ``text``."""
        found = [kw for kw in self.HARMFUL_KEYWORDS if kw in text]
        return {
            "passed": len(found) == 0,
            "reason": f"harmful keywords: {found}" if found else None,
        }

    def detect_pii(self, text: str) -> bool:
        """True if ``text`` matches any PII pattern."""
        import re

        return any(re.search(pattern, text) for pattern in self.PII_PATTERNS)

    def detect_harmful(self, text: str) -> bool:
        """True if ``text`` contains any harmful keyword."""
        return any(kw in text for kw in self.HARMFUL_KEYWORDS)
七、完整部署示例
7.1 Docker Compose 部署
# docker-compose.yml - AI Agent production deployment
# Required environment (e.g. an .env file next to this file):
#   POSTGRES_PASSWORD, OPENAI_API_KEY, ANTHROPIC_API_KEY
# The top-level `version:` key was removed: the Compose Specification treats
# it as obsolete and Docker Compose v2 only warns about it.
services:
  api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - DATABASE_URL=postgresql://agent:${POSTGRES_PASSWORD:?POSTGRES_PASSWORD is required}@db:5432/agent_db
      - REDIS_URL=redis://redis:6379
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
    depends_on:
      db:
        condition: service_healthy
      redis:
        condition: service_healthy
    deploy:
      resources:
        limits:
          cpus: "2"
          memory: 4G
    restart: unless-stopped
    healthcheck:
      # NOTE(review): requires curl inside the api image.
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 30s
      timeout: 10s
      retries: 3

  worker:
    build: .
    command: ["python", "-m", "agent.worker"]
    environment:
      - DATABASE_URL=postgresql://agent:${POSTGRES_PASSWORD:?POSTGRES_PASSWORD is required}@db:5432/agent_db
      - REDIS_URL=redis://redis:6379
      # Presumably the worker executes agent tasks and calls the LLM APIs; confirm.
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
    # The worker also uses DATABASE_URL, so wait for a healthy db, not just redis.
    depends_on:
      db:
        condition: service_healthy
      redis:
        condition: service_healthy
    deploy:
      replicas: 2
      resources:
        limits:
          cpus: "1"
          memory: 2G
    restart: unless-stopped

  redis:
    image: redis:7-alpine
    ports:
      # No AUTH is configured, so publish Redis only on the host's loopback interface.
      - "127.0.0.1:6379:6379"
    volumes:
      - redis_data:/data
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s

  db:
    image: postgres:16-alpine
    environment:
      POSTGRES_DB: agent_db
      POSTGRES_USER: agent
      # Never commit real credentials; supply it via the environment instead.
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:?POSTGRES_PASSWORD is required}
    volumes:
      - pg_data:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U agent -d agent_db"]
      interval: 10s

  prometheus:
    image: prom/prometheus:latest  # NOTE(review): pin a version tag for reproducible deploys
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml

  grafana:
    image: grafana/grafana:latest  # NOTE(review): pin a version tag for reproducible deploys
    ports:
      - "3000:3000"
    volumes:
      - grafana_data:/var/lib/grafana

volumes:
  redis_data:
  pg_data:
  grafana_data:
总结
Agent 生产部署 Checklist
□ 架构设计
□ Agent 基类(状态/工具/钩子)
□ 规划/执行/审查 Agent 角色分离
□ 工具链编排(重试+降级)
□ 状态管理
□ 多轮会话(Redis + 自动摘要)
□ 长期记忆(向量数据库)
□ 会话隔离
□ 成本控制
□ Token 预算(日/会话/用户)
□ 智能模型路由(简单/标准/复杂)
□ 语义缓存
□ 可观测性
□ 分布式追踪(OpenTelemetry)
□ 指标收集(延迟/吞吐/成本/质量)
□ 日志聚合
□ 灰度发布
□ A/B 测试(基于哈希的确定性分流,同一标识始终落入同一组)
□ 金丝雀发布(根据实验数据自动给出扩量/回滚建议)
□ 弹性伸缩(KEDA + Redis 队列)
□ 多租户
□ 租户隔离(命名空间)
□ 配额管理(请求/预算/模型)
□ 数据隔离
□ 安全
□ 输入验证(内容安全+速率限制)
□ 输出审核(PII检测+有害内容)
□ 权限控制(零信任)
技术选型
| 组件 | 推荐方案 |
|---|---|
| Agent 框架 | LangGraph / 自建 FastAPI |
| 状态存储 | Redis(会话)+ PostgreSQL(持久化) |
| 记忆 | ChromaDB / Milvus / Pinecone |
| 追踪 | OpenTelemetry + Jaeger |
| 指标 | Prometheus + Grafana |
| 伸缩 | Kubernetes + KEDA |
| 缓存 | Redis + 语义缓存 |
本文涵盖 AI Agent 商业落地完整链路:生产级 Agent 框架(BaseAgent + Planner + 工具链编排)+ 会话与状态管理(Redis 会话 + 长期记忆向量检索)+ 成本控制(Token 预算 + 智能模型路由 + 语义缓存)+ 可观测性(分布式追踪 + 指标收集)+ 灰度发布(A/B 测试 + 金丝雀 + KEDA 弹性伸缩)+ 多租户与安全(命名空间隔离 + 配额管理 + 安全护栏)。
更多推荐

所有评论(0)