AI Agent 毕业设计实战:从零构建一个可部署的智能任务调度系统
最近在帮学弟学妹们看毕业设计,发现一个挺普遍的现象:很多AI Agent相关的项目,想法很酷,Demo也能跑起来,但一到“工程化”和“可部署”就卡壳了。要么是API调用一失败就崩,要么是任务一多就乱,答辩时被老师一问“怎么保证稳定运行”就答不上来。这让我想起自己当年踩过的坑,所以决定把这次帮他们梳理的一个实战项目——一个可部署的智能任务调度系统——的完整构建过程记录下来。它麻雀虽小,五脏俱全,用到的技术栈(FastAPI + LangChain + Celery)也都是业界常见组合,非常适合作为毕业设计的“骨架”。
1. 毕业设计中的常见工程痛点:别让Demo成为终点
很多同学把AI Agent毕业设计做成了一个“一次性玩具”,主要问题集中在:
- 冷启动延迟与资源浪费:每次请求都重新加载大语言模型(LLM)或工具链,导致首次响应极慢,且并发能力差。
- 脆弱的API调用:直接调用外部API(如天气、搜索),没有重试、超时和熔断机制,一个服务抖动整个Agent就失效。
- 混乱的状态管理:任务状态、中间结果、用户上下文用全局变量或写死到内存里,服务一重启全丢,也无法支持异步或长时间任务。
- 缺乏任务调度与队列:所有请求同步处理,一旦遇到耗时任务(如文档总结),就会阻塞整个服务,无法应对并发。
- “黑盒”运行,无法调试:没有日志,没有监控,出错了只能靠猜,答辩时被问到“系统运行状态如何”无法给出数据支撑。
我们的目标,就是构建一个系统,能系统性地解决这些问题,让毕业设计从“演示版”升级为“可运行版”。
2. 技术选型:为什么是它们?
在开始敲代码前,我们先快速对比一下几个核心技术的选型考量。
LangChain vs LlamaIndex 两者都是构建AI应用的热门框架。LlamaIndex在文档检索和RAG(检索增强生成)方面非常专注和高效。而LangChain的优势在于其丰富的工具集成和灵活的链(Chain)编排能力。对于毕业设计而言,我们通常需要集成多个工具(如计算器、网络搜索、自定义函数),并控制它们的调用流程,LangChain的AgentExecutor和Tool抽象更加直观和强大。因此,我们选择LangChain作为Agent的核心大脑。
Celery vs RQ (Redis Queue) 两者都是Python优秀的分布式任务队列。RQ更轻量,配置简单,适合小型应用。Celery则功能更全面,支持多种消息代理(RabbitMQ, Redis)、任务路由、定时任务、工作流(Canvas),并且有更成熟的监控工具(如Flower)。考虑到毕业设计需要展示一定的工程复杂度,以及未来可能的扩展(如多队列优先级),我们选择Celery,它能更好地模拟生产环境。
最终技术栈:FastAPI (Web框架 & API接口) + LangChain (Agent核心) + Celery (异步任务队列) + Redis (消息代理 & 结果后端) + SQLite/PostgreSQL (任务状态持久化,可选)。
3. 核心模块实现:从提交到回调的完整流程
整个系统的架构可以分为三层:API接口层、任务队列层和Agent执行层。

(示意图:用户请求 -> FastAPI -> Celery Task Queue -> Worker执行LangChain Agent -> 结果存储 -> 回调或查询)
下面我们分模块拆解:
3.1 项目结构与依赖 首先初始化一个清晰的项目结构。
smart_agent_system/
├── app/
│ ├── __init__.py
│ ├── main.py # FastAPI 应用入口
│ ├── api/
│ │ └── endpoints.py # API路由
│ ├── core/
│ │ ├── config.py # 配置管理
│ │ └── security.py # API密钥管理
│ ├── agents/
│ │ ├── __init__.py
│ │ ├── base_agent.py # Agent基类
│ │ └── task_agent.py # 具体的任务Agent
│ ├── models/
│ │ └── schemas.py # Pydantic数据模型
│ ├── tasks/
│ │ ├── __init__.py
│ │ └── celery_app.py # Celery应用定义
│ └── utils/
│ ├── logger.py # 日志配置
│ └── idempotency.py # 幂等性工具
├── requirements.txt
└── .env.example
3.2 配置与安全(core/config.py & security.py) 安全是毕业设计答辩的加分项。务必避免在代码中硬编码API Key。
# core/config.py
from pydantic_settings import BaseSettings
import os
from dotenv import load_dotenv
load_dotenv()
class Settings(BaseSettings):
# 从环境变量读取,.env文件管理
OPENAI_API_KEY: str = os.getenv("OPENAI_API_KEY", "")
REDIS_URL: str = os.getenv("REDIS_URL", "redis://localhost:6379/0")
# 可以配置不同环境的数据库连接等
class Config:
env_file = ".env"
settings = Settings()
# core/security.py
import hashlib
from app.core.config import settings
import os
def get_api_key(service_name: str) -> str:
"""安全地获取API Key,可在生产环境集成Vault"""
# 这里简单返回,实际可以从加密存储或密钥管理服务获取
return getattr(settings, f"{service_name.upper()}_API_KEY", "")
def mask_key(key: str) -> str:
"""用于日志脱敏"""
if len(key) > 8:
return key[:4] + "*" * (len(key)-8) + key[-4:]
return "****"
3.3 定义Celery任务(tasks/celery_app.py) 这是异步调度的核心。我们创建一个Celery应用,并定义执行Agent的任务。
# tasks/celery_app.py
from celery import Celery
from app.core.config import settings
import logging
from app.utils.logger import get_logger
logger = get_logger(__name__)
# 创建Celery实例,使用Redis作为消息代理和结果后端
celery_app = Celery(
"agent_worker",
broker=settings.REDIS_URL,
backend=settings.REDIS_URL,
)
# 配置
celery_app.conf.update(
task_serializer='json',
accept_content=['json'],
result_serializer='json',
timezone='Asia/Shanghai',
enable_utc=True,
task_track_started=True, # 跟踪任务开始
task_time_limit=300, # 任务超时时间5分钟
)
@celery_app.task(bind=True, name="execute_agent_task")
def execute_agent_task(self, task_id: str, user_input: str, user_id: str):
"""
执行Agent任务的Celery Task。
:param self: Celery任务实例,用于更新状态
:param task_id: 唯一任务ID,用于幂等性校验和结果查询
:param user_input: 用户输入
:param user_id: 用户标识
:return: Agent执行结果
"""
logger.info(f"开始执行任务: {task_id}, 输入: {user_input[:50]}...")
# --- 关键点1:幂等性检查(防止重复执行)---
# 在实际项目中,这里可以查询数据库,如果该task_id已存在成功结果,则直接返回,不重复执行。
# 示例伪代码:
# if task_exists_and_success(task_id):
# logger.warning(f"任务 {task_id} 已成功执行过,直接返回缓存结果")
# return get_cached_result(task_id)
try:
# 更新任务状态为运行中(可存入数据库)
# update_task_status(task_id, "RUNNING")
# 动态导入,避免启动时加载所有重型依赖
from app.agents.task_agent import TaskAgent
agent = TaskAgent()
# 执行核心的Agent逻辑
result = agent.run(user_input=user_input, task_id=task_id)
logger.info(f"任务 {task_id} 执行成功")
# 更新任务状态为成功,并存储结果
# update_task_status(task_id, "SUCCESS", result=result)
return {"status": "success", "task_id": task_id, "result": result}
except Exception as e:
logger.error(f"任务 {task_id} 执行失败: {str(e)}", exc_info=True)
# --- 关键点2:异常回退与重试机制---
# Celery自带重试,我们可以根据异常类型决定是否重试
retry_count = self.request.retries
if isinstance(e, ConnectionError) and retry_count < 3:
# 对于网络错误,指数退避重试
raise self.retry(exc=e, countdown=2 ** retry_count, max_retries=3)
# 对于逻辑错误或最终失败,更新状态为失败
# update_task_status(task_id, "FAILED", error=str(e))
return {"status": "failed", "task_id": task_id, "error": str(e)}
3.4 实现智能Agent(agents/task_agent.py) 这里我们实现一个具备简单工具调用能力的LangChain Agent。
# agents/task_agent.py
import os
from langchain.agents import AgentExecutor, create_openai_tools_agent
from langchain_openai import ChatOpenAI
from langchain.tools import Tool
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from app.core.security import get_api_key, mask_key
import logging
from app.utils.logger import get_logger
logger = get_logger(__name__)
class TaskAgent:
def __init__(self, model_name="gpt-3.5-turbo"):
# 安全获取API Key
api_key = get_api_key("openai")
if not api_key:
raise ValueError("OpenAI API Key未配置")
logger.info(f"初始化Agent,使用模型: {model_name}, API Key: {mask_key(api_key)}")
# 初始化LLM
self.llm = ChatOpenAI(
model=model_name,
api_key=api_key,
temperature=0, # 降低随机性,使任务执行更稳定
request_timeout=60 # 设置超时
)
# 定义工具 - 这里可以扩展你自己的工具
self.tools = self._load_tools()
# 构建Agent提示词
prompt = ChatPromptTemplate.from_messages([
("system", "你是一个有帮助的智能任务助手。请根据用户需求,谨慎、准确地使用工具来完成任务。如果无法确定,请说不知道。"),
MessagesPlaceholder(variable_name="chat_history", optional=True),
("human", "{input}"),
MessagesPlaceholder(variable_name="agent_scratchpad"),
])
# 创建Agent
agent = create_openai_tools_agent(self.llm, self.tools, prompt)
self.agent_executor = AgentExecutor(
agent=agent,
tools=self.tools,
verbose=True, # 开发时开启,生产环境关闭或记录到日志
handle_parsing_errors=True, # 处理解析错误
max_iterations=5, # 限制最大迭代次数,防止死循环
early_stopping_method="generate" # 提前停止方法
)
def _load_tools(self):
"""加载工具集。毕业设计中可以集成计算、搜索、查询等工具。"""
def calculator(query: str) -> str:
"""一个简单的计算器工具。注意:直接eval有安全风险,此处仅用于演示,生产环境应用安全库如`asteval`。"""
logger.info(f"计算器工具被调用,输入: {query}")
try:
# 警告:简化演示,实际应对输入做严格检查和沙箱执行
result = eval(query)
return f"计算结果为: {result}"
except Exception as e:
return f"计算错误: {str(e)}"
# 可以继续添加其他工具,如网络搜索(需安装库并配置API Key)
# from langchain_community.tools import DuckDuckGoSearchRun
# search_tool = DuckDuckGoSearchRun()
tools = [
Tool(
name="Calculator",
func=calculator,
description="""用于执行数学计算。输入应该是一个可被Python eval函数执行的数学表达式字符串。
例如:'3 + 5 * 2' 或 'sqrt(16)'。注意:不要执行任何与数学计算无关的代码。"""
),
# 其他工具...
]
return tools
def run(self, user_input: str, task_id: str = None):
"""执行Agent的主要方法。"""
logger.info(f"Agent开始执行,Task ID: {task_id}, 输入: {user_input}")
try:
# 调用Agent执行链
result = self.agent_executor.invoke({"input": user_input})
logger.info(f"Agent执行完成,Task ID: {task_id}, 结果: {result['output'][:100]}...")
return result['output']
except Exception as e:
logger.error(f"Agent执行异常,Task ID: {task_id}, 错误: {str(e)}", exc_info=True)
# 这里可以定义更精细的异常处理,比如工具调用失败、网络超时等
return f"任务执行过程中出现错误: {str(e)}。请检查输入或稍后重试。"
3.5 提供API接口(api/endpoints.py & main.py) FastAPI负责接收用户请求,并异步提交任务到Celery。
# api/endpoints.py
from fastapi import APIRouter, HTTPException, BackgroundTasks
from app.models.schemas import TaskRequest, TaskResponse
from app.tasks.celery_app import execute_agent_task
import uuid
from app.utils.idempotency import generate_task_id # 一个生成唯一任务ID的工具
router = APIRouter()
@router.post("/task", response_model=TaskResponse)
async def create_task(request: TaskRequest, background_tasks: BackgroundTasks):
"""
提交一个新的智能任务。
1. 生成唯一任务ID(支持客户端提供以实现幂等提交)。
2. 异步触发Celery任务。
3. 立即返回任务ID,客户端可凭此查询结果。
"""
task_id = request.task_id or generate_task_id(request.user_input, request.user_id)
# 异步执行,避免阻塞HTTP请求
# 使用`delay`方法将任务发送到Celery队列
celery_async_result = execute_agent_task.delay(task_id, request.user_input, request.user_id)
# 可以将Celery的AsyncResult ID也存下来,方便后续管理(可选)
return TaskResponse(
task_id=task_id,
status="submitted",
message="任务已提交到队列,请使用task_id查询结果。",
# celery_task_id=celery_async_result.id
)
@router.get("/task/{task_id}")
async def get_task_result(task_id: str):
"""
根据任务ID查询执行结果。
实际项目中,结果应持久化到数据库,这里简化从Celery后端查询。
"""
from app.tasks.celery_app import celery_app
# 注意:直接查询Celery后端适用于短时间结果存储,生产环境建议用数据库
async_result = celery_app.AsyncResult(task_id)
if async_result.state == 'PENDING':
return {"task_id": task_id, "status": "pending", "result": None}
elif async_result.state == 'SUCCESS':
return {"task_id": task_id, "status": "success", "result": async_result.result}
elif async_result.state == 'FAILURE':
return {"task_id": task_id, "status": "failed", "error": str(async_result.info)}
else: # STARTED, RETRY等
return {"task_id": task_id, "status": async_result.state, "result": None}
# app/main.py
from fastapi import FastAPI
from app.api.endpoints import router as api_router
from app.utils.logger import setup_logging
setup_logging() # 初始化日志
app = FastAPI(title="智能任务调度Agent系统", description="毕业设计实战项目")
app.include_router(api_router, prefix="/api/v1")
@app.get("/")
async def root():
return {"message": "智能任务调度Agent系统已启动"}
4. 运行、测试与性能考量
4.1 如何运行?
- 启动Redis:
docker run -d -p 6379:6379 redis - 安装依赖:
pip install -r requirements.txt(需包含fastapi, langchain-openai, celery, redis, python-dotenv等) - 启动Celery Worker:
celery -A app.tasks.celery_app.celery_app worker --loglevel=info - 启动FastAPI服务:
uvicorn app.main:app --reload
4.2 简单性能测试 使用locust或wrk进行简单压测。在我的开发机(8核16G)上,使用本地Redis和GPT-3.5模拟,初步测试结果如下:
- 单Worker,处理简单计算任务:QPS约 15-20。
- P95延迟:约 1.2 秒(主要开销在LangChain Agent初始化及与LLM的通信)。
- 瓶颈分析:LLM API调用延迟是主要瓶颈。优化方向包括:
- 使用流式响应(Streaming)改善用户体验感知延迟。
- 对相似任务进行结果缓存(如使用
langchain.cache)。 - 增加Celery Worker数量,并行处理任务。
5. 生产环境避坑指南(毕业设计答辩必看)
这部分是血泪经验的总结,能帮你避开很多坑。
-
日志缺失,问题无从查起
- 问题:程序出错只打印“Something went wrong”,毫无头绪。
- 解决:在项目入口(
main.py)和每个关键模块(Agent、任务函数)配置结构化日志。记录task_id、user_id、关键输入输出和异常堆栈。使用logging模块,并考虑输出到文件,方便答辩时展示。
-
依赖版本冲突,环境无法复现
- 问题:在你电脑上跑得好好的,在答辩教室或导师电脑上各种报错。
- 解决:使用
requirements.txt或pyproject.toml严格锁定所有依赖的版本号。特别是langchain和相关langchain-community包版本更新快,API易变。建议使用虚拟环境(venv或conda),并提交requirements.txt。
-
模型输出截断或格式错误
- 问题:Agent返回的JSON解析失败,或者长文本被截断。
- 解决:在调用LLM时,明确在提示词中指定输出格式(如“请以JSON格式回答”)。对于长文本,使用LangChain的
StructuredOutputParser等工具。处理响应时,增加try-catch进行格式校验和重试。
-
API Key泄露
- 问题:不小心把API Key提交到了GitHub,导致被盗用扣费。
- 解决:永远不要将密钥硬编码在代码中。使用
.env文件管理,并在.gitignore中忽略它。在代码中通过环境变量读取。可以考虑在答辩演示时使用免费的或额度很低测试Key。
-
Celery Worker内存泄漏或僵尸任务
- 问题:运行一段时间后Worker内存暴涨,或者任务卡死。
- 解决:为Celery任务设置合理的
task_time_limit和task_soft_time_limit。使用--max-tasks-per-child参数让Worker在执行一定数量任务后重启,释放内存。使用Flower监控工具查看任务队列和Worker状态。
-
冷启动延迟
- 问题:第一个请求特别慢,因为要加载模型和工具。
- 解决:在Celery Worker启动时(
celery_app.py中),进行预热,提前初始化好Agent实例(注意线程安全)。或者使用singleton模式确保Agent只初始化一次。
6. 总结与扩展思考
通过以上步骤,我们完成了一个具备基本工程化能力的AI Agent系统。它不再是脆弱的脚本,而是一个有异步队列、错误处理、状态管理和基础监控的“微服务”。
这个项目作为毕业设计,你可以从以下几个方向进行扩展,以体现你的思考和能力:
- 多Agent协作:定义不同的专业Agent(如分析Agent、执行Agent、校验Agent),通过一个协调器(Orchestrator)让它们协同完成复杂任务。这能很好地体现你对Agent生态的理解。
- 集成向量数据库:引入
Chroma或Qdrant,为Agent增加长期记忆或知识库检索(RAG)能力。例如,让它能基于你的毕业论文资料回答问题。 - 实现WebSocket推送:将任务的执行进度和最终结果通过WebSocket实时推送给前端,打造交互性更强的演示界面。
- 增加更完善的监控:集成
Prometheus和Grafana,监控任务成功率、延迟、LLM API调用次数等指标,让答辩展示更加专业。
希望这篇笔记能为你提供一个扎实的起点。毕业设计不仅是功能的堆砌,更是工程思维的体现。理解每一行代码背后的“为什么”,比复制粘贴整个项目更重要。我已经将这个项目的精简版代码放到了GitHub上,你可以直接Fork,然后在此基础上添加你自己的创意和功能。
祝你毕业设计顺利,答辩成功!

更多推荐


所有评论(0)