一套适合落地的架构:FastAPI + Swagger/OpenAPI + MCP + A2A + Queue + Memory + Observability

FastAPI 适合做 Agent 服务层,因为它基于 Python 类型提示构建 API,并自动生成 OpenAPI schema、Swagger UI /docs 和 ReDoc /redoc。([FastAPI][1])
A2A 负责 Agent 与 Agent 协作,核心包括 Agent Card、Task、Artifact、Streaming、Push Notifications。([GitHub][2])


1. 总体架构

User / App / Web
      ↓
API Gateway / Auth
      ↓
FastAPI Agent Service
      ↓
Supervisor Agent
      ├── Planner
      ├── Router
      ├── Memory Manager
      ├── Tool Executor
      ├── Safety Guard
      └── Observability
      ↓
 ┌───────────────┬───────────────┬───────────────┐
 │ A2A Agent      │ MCP Tools      │ Internal APIs  │
 │ Research Agent │ Search Tool    │ CRM / ERP      │
 │ Code Agent     │ DB Tool        │ GIS / IoT      │
 │ Report Agent   │ File Tool      │ Payment        │
 └───────────────┴───────────────┴───────────────┘
      ↓
Task Store / Vector DB / Redis / Queue
      ↓
Artifact Output

一句话:

FastAPI 暴露服务,Swagger 描述接口,A2A 调 Agent,MCP 调工具,Queue 跑长任务,Memory 存上下文,Observability 做生产监控。


2. 分层设计

L1:接口层 FastAPI

负责对外暴露:

POST /agent/chat
POST /agent/task
GET  /agent/task/{task_id}
GET  /agent/stream/{task_id}
GET  /.well-known/agent-card.json
GET  /openapi.json
GET  /docs

FastAPI 会自动生成 OpenAPI schema,Swagger UI 会读取该 schema 生成可交互 API 文档。([FastAPI][3])


L2:Agent 编排层

核心组件:

Supervisor Agent
 ├── Intent Parser:理解用户目标
 ├── Planner:拆解任务
 ├── Router:选择 Agent 或 Tool
 ├── Executor:执行任务
 ├── Critic:检查结果
 ├── Memory:读写上下文
 └── Artifact Builder:生成最终结果

典型流程:

用户请求
 ↓
意图识别
 ↓
任务拆解
 ↓
选择 A2A Agent / MCP Tool / 内部 API
 ↓
执行
 ↓
校验
 ↓
生成 Artifact
 ↓
返回用户

3. A2A 与 MCP 的工程位置

Supervisor Agent
   ↓ A2A
Specialized Agent
   ↓ MCP
Tool / DB / API / File / Browser

区别:

能力 A2A MCP
目标 Agent 调 Agent Agent 调工具
对象 Remote Agent Tool / Resource
输出 Task / Artifact Tool result
适合 多 Agent 协作 工具标准化

A2A 的 Agent Card 是能力发现入口,通常是一个公开 JSON 元数据文档,用来描述 Agent 身份、能力、认证、接口和技能。([Google Codelabs][4])


4. 推荐目录结构

agent-platform/
├── app/
│   ├── main.py
│   ├── api/
│   │   ├── chat.py
│   │   ├── tasks.py
│   │   ├── stream.py
│   │   └── agent_card.py
│   ├── agents/
│   │   ├── supervisor.py
│   │   ├── planner.py
│   │   ├── router.py
│   │   ├── critic.py
│   │   └── memory_agent.py
│   ├── a2a/
│   │   ├── client.py
│   │   ├── server.py
│   │   └── schemas.py
│   ├── mcp/
│   │   ├── client.py
│   │   └── tools.py
│   ├── memory/
│   │   ├── vector_store.py
│   │   └── session_store.py
│   ├── queue/
│   │   ├── worker.py
│   │   └── tasks.py
│   ├── security/
│   │   ├── auth.py
│   │   ├── policy.py
│   │   └── guardrails.py
│   ├── observability/
│   │   ├── logging.py
│   │   ├── tracing.py
│   │   └── metrics.py
│   └── artifacts/
│       └── builder.py
├── tests/
├── docker-compose.yml
├── pyproject.toml
└── README.md

5. 核心数据模型

from pydantic import BaseModel
from typing import Literal, Any

class AgentMessage(BaseModel):
    role: Literal["user", "agent", "tool"]
    content: str
    metadata: dict[str, Any] = {}

class AgentTask(BaseModel):
    task_id: str
    user_id: str
    status: Literal[
        "submitted",
        "working",
        "input_required",
        "completed",
        "failed"
    ]
    input: AgentMessage
    output: dict[str, Any] | None = None

class Artifact(BaseModel):
    artifact_id: str
    name: str
    type: Literal["markdown", "json", "image", "html", "code"]
    content: str

6. FastAPI 服务骨架

from fastapi import FastAPI
from pydantic import BaseModel
from uuid import uuid4

app = FastAPI(
    title="AI Agent Platform",
    description="FastAPI + Swagger + A2A + MCP Agent 工程平台",
    version="1.0.0",
)

class ChatRequest(BaseModel):
    user_id: str
    message: str

@app.get("/.well-known/agent-card.json")
def agent_card():
    return {
        "name": "City Emergency Supervisor Agent",
        "description": "智慧城市应急响应主控 Agent",
        "skills": [
            {
                "id": "emergency_orchestration",
                "name": "Emergency Orchestration",
                "description": "协调消防、交通、医疗、气象 Agent"
            }
        ],
        "capabilities": {
            "streaming": True,
            "pushNotifications": True
        }
    }

@app.post("/agent/task")
def create_task(req: ChatRequest):
    task_id = str(uuid4())
    return {
        "task_id": task_id,
        "status": "submitted",
        "message": "任务已创建"
    }

@app.get("/agent/task/{task_id}")
def get_task(task_id: str):
    return {
        "task_id": task_id,
        "status": "working"
    }

启动后:

uvicorn app.main:app --reload

访问:

http://127.0.0.1:8000/docs

7. A2A 调度逻辑

class A2ARouter:
    def __init__(self, agents: list[dict]):
        self.agents = agents

    def select_agent(self, task: str) -> dict:
        for agent in self.agents:
            skills = " ".join(skill["id"] for skill in agent["skills"])
            if "traffic" in task and "traffic_control" in skills:
                return agent
            if "fire" in task and "fire_response" in skills:
                return agent
        return self.agents[0]

class SupervisorAgent:
    def __init__(self, router: A2ARouter):
        self.router = router

    def plan(self, user_input: str):
        return [
            {"type": "a2a", "task": "fire_detection"},
            {"type": "a2a", "task": "traffic_control"},
            {"type": "mcp", "task": "query_gis"},
            {"type": "artifact", "task": "build_report"},
        ]

    def run(self, user_input: str):
        plan = self.plan(user_input)
        results = []

        for step in plan:
            if step["type"] == "a2a":
                agent = self.router.select_agent(step["task"])
                results.append({
                    "agent": agent["name"],
                    "result": f"{step['task']} completed"
                })

            elif step["type"] == "mcp":
                results.append({
                    "tool": "GIS Tool",
                    "result": "地图数据已返回"
                })

        return results

8. 长任务架构

对于 AI Agent,很多任务不是同步返回的,比如:

深度研究
代码生成
城市应急调度
文件分析
多 Agent 协作
报表生成

推荐使用:

FastAPI
 ↓
Redis / RabbitMQ / Kafka
 ↓
Worker
 ↓
Task Store
 ↓
SSE / WebSocket / Push Notification

流程:

POST /agent/task
 ↓
返回 task_id
 ↓
后台 Worker 执行
 ↓
GET /agent/task/{task_id}
 ↓
SSE 推送中间状态
 ↓
返回 Artifact

9. 流式输出 SSE 示例

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio
import json

app = FastAPI()

async def event_stream(task_id: str):
    states = [
        {"status": "submitted"},
        {"status": "working", "message": "正在分析任务"},
        {"status": "working", "message": "正在调用交通 Agent"},
        {"status": "working", "message": "正在生成 Artifact"},
        {"status": "completed", "artifact": "report.md"},
    ]

    for state in states:
        yield f"data: {json.dumps(state, ensure_ascii=False)}\n\n"
        await asyncio.sleep(1)

@app.get("/agent/stream/{task_id}")
def stream_task(task_id: str):
    return StreamingResponse(
        event_stream(task_id),
        media_type="text/event-stream"
    )

10. 智慧城市应急响应联盟:进阶架构

Citizen / IoT / CCTV / 12345 Hotline
        ↓
Event Ingestion Gateway
        ↓
City Emergency Supervisor Agent
        ↓
┌──────────────┬──────────────┬──────────────┬──────────────┐
│ Fire Agent   │ Traffic Agent│ Medical Agent│ Weather Agent│
└──────┬───────┴──────┬───────┴──────┬───────┴──────┬───────┘
       ↓              ↓              ↓              ↓
   Fire IoT        GIS / CCTV      Hospital API    Weather API
       ↓              ↓              ↓              ↓
       └──────────────┬──────────────┬──────────────┘
                      ↓
              Decision Optimizer
                      ↓
              Emergency Artifact
                      ↓
       Dispatch / Alert / Dashboard / Report

这里:

A2A:Supervisor 调 Fire / Traffic / Medical / Weather Agent
MCP:各 Agent 调 IoT、GIS、医院、气象、数据库
FastAPI:每个 Agent 的服务接口
Swagger:每个 Agent 的 API 文档
Queue:异步任务与事件流
Vector DB:历史事件与预案记忆
Redis:短期状态与任务缓存
PostgreSQL:任务、审计、Artifact 元数据

11. 生产级必备能力

模块 必须做什么
Auth API Key、OAuth2、JWT、mTLS
Permission 不同 Agent 只能访问授权工具
Rate Limit 限制 Agent 调用频率
Cost Control 限制 token、工具调用次数、外部 API 成本
Audit Log 谁调用了谁、输入是什么、结果是什么
Trace 一次用户请求贯穿多个 Agent 的 trace_id
Replay 失败任务可重放
Human-in-the-loop 高风险动作必须人工确认
Guardrails 防注入、防越权、防幻觉输出
Sandbox 代码执行、文件处理必须隔离
Artifact Store 文件、报告、图片、JSON 统一存储

12. 最推荐的技术栈

推荐
API 服务 FastAPI
API 文档 Swagger UI / OpenAPI
Agent 协议 A2A
工具协议 MCP
编排 LangGraph / 自研状态机
异步任务 Celery / Dramatiq / RQ
消息队列 Redis / RabbitMQ / Kafka
结构化存储 PostgreSQL
向量数据库 pgvector / Qdrant / Milvus
缓存 Redis
文件存储 S3 / MinIO
监控 OpenTelemetry + Prometheus + Grafana
日志 Loki / ELK
部署 Docker / Kubernetes

13. 路线落地

第一阶段:单 Agent 服务化

FastAPI + Swagger + 单个 Supervisor Agent

目标:能通过 HTTP 调 Agent。


第二阶段:工具调用标准化

Supervisor Agent + MCP Tools

目标:Agent 可以调用搜索、数据库、文件、GIS 等工具。


第三阶段:多 Agent 协作

Supervisor Agent + A2A Remote Agents

目标:主控 Agent 可以调专业 Agent。


第四阶段:异步与流式任务

Task Store + Queue + SSE

目标:支持长任务、进度条、中间结果。


第五阶段:生产级治理

Auth + Audit + Trace + Guardrails + Cost Control

目标:能上线企业系统。


14. 总结

进阶 AI Agent 工程架构不是一个 Prompt,也不是一个聊天机器人,而是一套完整后端系统:

FastAPI = Agent 服务入口
Swagger/OpenAPI = API 说明书
A2A = Agent 协作协议
MCP = 工具调用协议
Queue = 长任务执行
Memory = 上下文与知识
Artifact = 可交付结果
Observability = 生产监控
Guardrails = 安全边界

最小可上线版本建议:

FastAPI
+ Swagger/OpenAPI
+ Supervisor Agent
+ MCP Tool Layer
+ A2A Agent Card
+ Redis Queue
+ PostgreSQL Task Store
+ SSE Streaming
+ OpenTelemetry Trace

参考链接:
[1]: https://fastapi.tiangolo.com/?utm_source=chatgpt.com “FastAPI - FastAPI”
[2]: https://github.com/a2aproject/A2A/blob/main/docs/specification.md?utm_source=chatgpt.com “A2A/docs/specification.md at main · a2aproject/A2A”
[3]: https://fastapi.tiangolo.com/reference/openapi/docs/?utm_source=chatgpt.com “OpenAPI docs - FastAPI”
[4]: https://codelabs.developers.google.com/intro-a2a-purchasing-concierge?utm_source=chatgpt.com “Getting Started with Agent2Agent (A2A) Protocol - Codelabs”

Logo

这里是“一人公司”的成长家园。我们提供从产品曝光、技术变现到法律财税的全栈内容,并连接云服务、办公空间等稀缺资源,助你专注创造,无忧运营。

更多推荐