最近在帮学弟学妹们看毕业设计,发现一个挺普遍的现象:很多AI Agent相关的项目,想法很酷,Demo也能跑起来,但一到“工程化”和“可部署”就卡壳了。要么是API调用一失败就崩,要么是任务一多就乱,答辩时被老师一问“怎么保证稳定运行”就答不上来。这让我想起自己当年踩过的坑,所以决定把这次帮他们梳理的一个实战项目——一个可部署的智能任务调度系统——的完整构建过程记录下来。它麻雀虽小,五脏俱全,用到的技术栈(FastAPI + LangChain + Celery)也都是业界常见组合,非常适合作为毕业设计的“骨架”。

1. 毕业设计中的常见工程痛点:别让Demo成为终点

很多同学把AI Agent毕业设计做成了一个“一次性玩具”,主要问题集中在:

  • 冷启动延迟与资源浪费:每次请求都重新加载大语言模型(LLM)或工具链,导致首次响应极慢,且并发能力差。
  • 脆弱的API调用:直接调用外部API(如天气、搜索),没有重试、超时和熔断机制,一个服务抖动整个Agent就失效。
  • 混乱的状态管理:任务状态、中间结果、用户上下文用全局变量或写死到内存里,服务一重启全丢,也无法支持异步或长时间任务。
  • 缺乏任务调度与队列:所有请求同步处理,一旦遇到耗时任务(如文档总结),就会阻塞整个服务,无法应对并发。
  • “黑盒”运行,无法调试:没有日志,没有监控,出错了只能靠猜,答辩时被问到“系统运行状态如何”无法给出数据支撑。

我们的目标,就是构建一个系统,能系统性地解决这些问题,让毕业设计从“演示版”升级为“可运行版”。

2. 技术选型:为什么是它们?

在开始敲代码前,我们先快速对比一下几个核心技术的选型考量。

LangChain vs LlamaIndex 两者都是构建AI应用的热门框架。LlamaIndex在文档检索和RAG(检索增强生成)方面非常专注和高效。而LangChain的优势在于其丰富的工具集成灵活的链(Chain)编排能力。对于毕业设计而言,我们通常需要集成多个工具(如计算器、网络搜索、自定义函数),并控制它们的调用流程,LangChain的AgentExecutorTool抽象更加直观和强大。因此,我们选择LangChain作为Agent的核心大脑。

Celery vs RQ (Redis Queue) 两者都是Python优秀的分布式任务队列。RQ更轻量,配置简单,适合小型应用。Celery则功能更全面,支持多种消息代理(RabbitMQ, Redis)、任务路由、定时任务、工作流(Canvas),并且有更成熟的监控工具(如Flower)。考虑到毕业设计需要展示一定的工程复杂度,以及未来可能的扩展(如多队列优先级),我们选择Celery,它能更好地模拟生产环境。

最终技术栈:FastAPI (Web框架 & API接口) + LangChain (Agent核心) + Celery (异步任务队列) + Redis (消息代理 & 结果后端) + SQLite/PostgreSQL (任务状态持久化,可选)。

3. 核心模块实现:从提交到回调的完整流程

整个系统的架构可以分为三层:API接口层、任务队列层和Agent执行层。

系统架构示意图

(示意图:用户请求 -> FastAPI -> Celery Task Queue -> Worker执行LangChain Agent -> 结果存储 -> 回调或查询)

下面我们分模块拆解:

3.1 项目结构与依赖 首先初始化一个清晰的项目结构。

smart_agent_system/
├── app/
│   ├── __init__.py
│   ├── main.py          # FastAPI 应用入口
│   ├── api/
│   │   └── endpoints.py # API路由
│   ├── core/
│   │   ├── config.py    # 配置管理
│   │   └── security.py  # API密钥管理
│   ├── agents/
│   │   ├── __init__.py
│   │   ├── base_agent.py # Agent基类
│   │   └── task_agent.py # 具体的任务Agent
│   ├── models/
│   │   └── schemas.py   # Pydantic数据模型
│   ├── tasks/
│   │   ├── __init__.py
│   │   └── celery_app.py # Celery应用定义
│   └── utils/
│       ├── logger.py    # 日志配置
│       └── idempotency.py # 幂等性工具
├── requirements.txt
└── .env.example

3.2 配置与安全(core/config.py & security.py) 安全是毕业设计答辩的加分项。务必避免在代码中硬编码API Key。

# core/config.py
from pydantic_settings import BaseSettings
import os
from dotenv import load_dotenv
load_dotenv()

class Settings(BaseSettings):
    # 从环境变量读取,.env文件管理
    OPENAI_API_KEY: str = os.getenv("OPENAI_API_KEY", "")
    REDIS_URL: str = os.getenv("REDIS_URL", "redis://localhost:6379/0")
    # 可以配置不同环境的数据库连接等
    class Config:
        env_file = ".env"

settings = Settings()

# core/security.py
import hashlib
from app.core.config import settings
import os

def get_api_key(service_name: str) -> str:
    """安全地获取API Key,可在生产环境集成Vault"""
    # 这里简单返回,实际可以从加密存储或密钥管理服务获取
    return getattr(settings, f"{service_name.upper()}_API_KEY", "")

def mask_key(key: str) -> str:
    """用于日志脱敏"""
    if len(key) > 8:
        return key[:4] + "*" * (len(key)-8) + key[-4:]
    return "****"

3.3 定义Celery任务(tasks/celery_app.py) 这是异步调度的核心。我们创建一个Celery应用,并定义执行Agent的任务。

# tasks/celery_app.py
from celery import Celery
from app.core.config import settings
import logging
from app.utils.logger import get_logger

logger = get_logger(__name__)

# 创建Celery实例,使用Redis作为消息代理和结果后端
celery_app = Celery(
    "agent_worker",
    broker=settings.REDIS_URL,
    backend=settings.REDIS_URL,
)

# 配置
celery_app.conf.update(
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='Asia/Shanghai',
    enable_utc=True,
    task_track_started=True, # 跟踪任务开始
    task_time_limit=300,     # 任务超时时间5分钟
)

@celery_app.task(bind=True, name="execute_agent_task")
def execute_agent_task(self, task_id: str, user_input: str, user_id: str):
    """
    执行Agent任务的Celery Task。
    :param self: Celery任务实例,用于更新状态
    :param task_id: 唯一任务ID,用于幂等性校验和结果查询
    :param user_input: 用户输入
    :param user_id: 用户标识
    :return: Agent执行结果
    """
    logger.info(f"开始执行任务: {task_id}, 输入: {user_input[:50]}...")
    
    # --- 关键点1:幂等性检查(防止重复执行)---
    # 在实际项目中,这里可以查询数据库,如果该task_id已存在成功结果,则直接返回,不重复执行。
    # 示例伪代码:
    # if task_exists_and_success(task_id):
    #     logger.warning(f"任务 {task_id} 已成功执行过,直接返回缓存结果")
    #     return get_cached_result(task_id)
    
    try:
        # 更新任务状态为运行中(可存入数据库)
        # update_task_status(task_id, "RUNNING")
        
        # 动态导入,避免启动时加载所有重型依赖
        from app.agents.task_agent import TaskAgent
        agent = TaskAgent()
        
        # 执行核心的Agent逻辑
        result = agent.run(user_input=user_input, task_id=task_id)
        
        logger.info(f"任务 {task_id} 执行成功")
        # 更新任务状态为成功,并存储结果
        # update_task_status(task_id, "SUCCESS", result=result)
        return {"status": "success", "task_id": task_id, "result": result}
        
    except Exception as e:
        logger.error(f"任务 {task_id} 执行失败: {str(e)}", exc_info=True)
        # --- 关键点2:异常回退与重试机制---
        # Celery自带重试,我们可以根据异常类型决定是否重试
        retry_count = self.request.retries
        if isinstance(e, ConnectionError) and retry_count < 3:
            # 对于网络错误,指数退避重试
            raise self.retry(exc=e, countdown=2 ** retry_count, max_retries=3)
        
        # 对于逻辑错误或最终失败,更新状态为失败
        # update_task_status(task_id, "FAILED", error=str(e))
        return {"status": "failed", "task_id": task_id, "error": str(e)}

3.4 实现智能Agent(agents/task_agent.py) 这里我们实现一个具备简单工具调用能力的LangChain Agent。

# agents/task_agent.py
import os
from langchain.agents import AgentExecutor, create_openai_tools_agent
from langchain_openai import ChatOpenAI
from langchain.tools import Tool
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from app.core.security import get_api_key, mask_key
import logging
from app.utils.logger import get_logger

logger = get_logger(__name__)

class TaskAgent:
    def __init__(self, model_name="gpt-3.5-turbo"):
        # 安全获取API Key
        api_key = get_api_key("openai")
        if not api_key:
            raise ValueError("OpenAI API Key未配置")
        logger.info(f"初始化Agent,使用模型: {model_name}, API Key: {mask_key(api_key)}")
        
        # 初始化LLM
        self.llm = ChatOpenAI(
            model=model_name,
            api_key=api_key,
            temperature=0, # 降低随机性,使任务执行更稳定
            request_timeout=60 # 设置超时
        )
        
        # 定义工具 - 这里可以扩展你自己的工具
        self.tools = self._load_tools()
        
        # 构建Agent提示词
        prompt = ChatPromptTemplate.from_messages([
            ("system", "你是一个有帮助的智能任务助手。请根据用户需求,谨慎、准确地使用工具来完成任务。如果无法确定,请说不知道。"),
            MessagesPlaceholder(variable_name="chat_history", optional=True),
            ("human", "{input}"),
            MessagesPlaceholder(variable_name="agent_scratchpad"),
        ])
        
        # 创建Agent
        agent = create_openai_tools_agent(self.llm, self.tools, prompt)
        self.agent_executor = AgentExecutor(
            agent=agent,
            tools=self.tools,
            verbose=True, # 开发时开启,生产环境关闭或记录到日志
            handle_parsing_errors=True, # 处理解析错误
            max_iterations=5, # 限制最大迭代次数,防止死循环
            early_stopping_method="generate" # 提前停止方法
        )
    
    def _load_tools(self):
        """加载工具集。毕业设计中可以集成计算、搜索、查询等工具。"""
        
        def calculator(query: str) -> str:
            """一个简单的计算器工具。注意:直接eval有安全风险,此处仅用于演示,生产环境应用安全库如`asteval`。"""
            logger.info(f"计算器工具被调用,输入: {query}")
            try:
                # 警告:简化演示,实际应对输入做严格检查和沙箱执行
                result = eval(query)
                return f"计算结果为: {result}"
            except Exception as e:
                return f"计算错误: {str(e)}"
        
        # 可以继续添加其他工具,如网络搜索(需安装库并配置API Key)
        # from langchain_community.tools import DuckDuckGoSearchRun
        # search_tool = DuckDuckGoSearchRun()
        
        tools = [
            Tool(
                name="Calculator",
                func=calculator,
                description="""用于执行数学计算。输入应该是一个可被Python eval函数执行的数学表达式字符串。
                例如:'3 + 5 * 2' 或 'sqrt(16)'。注意:不要执行任何与数学计算无关的代码。"""
            ),
            # 其他工具...
        ]
        return tools
    
    def run(self, user_input: str, task_id: str = None):
        """执行Agent的主要方法。"""
        logger.info(f"Agent开始执行,Task ID: {task_id}, 输入: {user_input}")
        try:
            # 调用Agent执行链
            result = self.agent_executor.invoke({"input": user_input})
            logger.info(f"Agent执行完成,Task ID: {task_id}, 结果: {result['output'][:100]}...")
            return result['output']
        except Exception as e:
            logger.error(f"Agent执行异常,Task ID: {task_id}, 错误: {str(e)}", exc_info=True)
            # 这里可以定义更精细的异常处理,比如工具调用失败、网络超时等
            return f"任务执行过程中出现错误: {str(e)}。请检查输入或稍后重试。"

3.5 提供API接口(api/endpoints.py & main.py) FastAPI负责接收用户请求,并异步提交任务到Celery。

# api/endpoints.py
from fastapi import APIRouter, HTTPException, BackgroundTasks
from app.models.schemas import TaskRequest, TaskResponse
from app.tasks.celery_app import execute_agent_task
import uuid
from app.utils.idempotency import generate_task_id # 一个生成唯一任务ID的工具

router = APIRouter()

@router.post("/task", response_model=TaskResponse)
async def create_task(request: TaskRequest, background_tasks: BackgroundTasks):
    """
    提交一个新的智能任务。
    1. 生成唯一任务ID(支持客户端提供以实现幂等提交)。
    2. 异步触发Celery任务。
    3. 立即返回任务ID,客户端可凭此查询结果。
    """
    task_id = request.task_id or generate_task_id(request.user_input, request.user_id)
    
    # 异步执行,避免阻塞HTTP请求
    # 使用`delay`方法将任务发送到Celery队列
    celery_async_result = execute_agent_task.delay(task_id, request.user_input, request.user_id)
    
    # 可以将Celery的AsyncResult ID也存下来,方便后续管理(可选)
    return TaskResponse(
        task_id=task_id,
        status="submitted",
        message="任务已提交到队列,请使用task_id查询结果。",
        # celery_task_id=celery_async_result.id
    )

@router.get("/task/{task_id}")
async def get_task_result(task_id: str):
    """
    根据任务ID查询执行结果。
    实际项目中,结果应持久化到数据库,这里简化从Celery后端查询。
    """
    from app.tasks.celery_app import celery_app
    # 注意:直接查询Celery后端适用于短时间结果存储,生产环境建议用数据库
    async_result = celery_app.AsyncResult(task_id)
    
    if async_result.state == 'PENDING':
        return {"task_id": task_id, "status": "pending", "result": None}
    elif async_result.state == 'SUCCESS':
        return {"task_id": task_id, "status": "success", "result": async_result.result}
    elif async_result.state == 'FAILURE':
        return {"task_id": task_id, "status": "failed", "error": str(async_result.info)}
    else: # STARTED, RETRY等
        return {"task_id": task_id, "status": async_result.state, "result": None}
# app/main.py
from fastapi import FastAPI
from app.api.endpoints import router as api_router
from app.utils.logger import setup_logging

setup_logging() # 初始化日志

app = FastAPI(title="智能任务调度Agent系统", description="毕业设计实战项目")

app.include_router(api_router, prefix="/api/v1")

@app.get("/")
async def root():
    return {"message": "智能任务调度Agent系统已启动"}

4. 运行、测试与性能考量

4.1 如何运行?

  1. 启动Redis:docker run -d -p 6379:6379 redis
  2. 安装依赖:pip install -r requirements.txt (需包含fastapi, langchain-openai, celery, redis, python-dotenv等)
  3. 启动Celery Worker:celery -A app.tasks.celery_app.celery_app worker --loglevel=info
  4. 启动FastAPI服务:uvicorn app.main:app --reload

4.2 简单性能测试 使用locustwrk进行简单压测。在我的开发机(8核16G)上,使用本地Redis和GPT-3.5模拟,初步测试结果如下:

  • 单Worker,处理简单计算任务:QPS约 15-20。
  • P95延迟:约 1.2 秒(主要开销在LangChain Agent初始化及与LLM的通信)。
  • 瓶颈分析:LLM API调用延迟是主要瓶颈。优化方向包括:
    • 使用流式响应(Streaming)改善用户体验感知延迟。
    • 对相似任务进行结果缓存(如使用langchain.cache)。
    • 增加Celery Worker数量,并行处理任务。

5. 生产环境避坑指南(毕业设计答辩必看)

这部分是血泪经验的总结,能帮你避开很多坑。

  1. 日志缺失,问题无从查起

    • 问题:程序出错只打印“Something went wrong”,毫无头绪。
    • 解决:在项目入口(main.py)和每个关键模块(Agent、任务函数)配置结构化日志。记录task_iduser_id、关键输入输出和异常堆栈。使用logging模块,并考虑输出到文件,方便答辩时展示。
  2. 依赖版本冲突,环境无法复现

    • 问题:在你电脑上跑得好好的,在答辩教室或导师电脑上各种报错。
    • 解决:使用requirements.txtpyproject.toml严格锁定所有依赖的版本号。特别是langchain和相关langchain-community包版本更新快,API易变。建议使用虚拟环境(venv或conda),并提交requirements.txt
  3. 模型输出截断或格式错误

    • 问题:Agent返回的JSON解析失败,或者长文本被截断。
    • 解决:在调用LLM时,明确在提示词中指定输出格式(如“请以JSON格式回答”)。对于长文本,使用LangChain的StructuredOutputParser等工具。处理响应时,增加try-catch进行格式校验和重试。
  4. API Key泄露

    • 问题:不小心把API Key提交到了GitHub,导致被盗用扣费。
    • 解决永远不要将密钥硬编码在代码中。使用.env文件管理,并在.gitignore中忽略它。在代码中通过环境变量读取。可以考虑在答辩演示时使用免费的或额度很低测试Key。
  5. Celery Worker内存泄漏或僵尸任务

    • 问题:运行一段时间后Worker内存暴涨,或者任务卡死。
    • 解决:为Celery任务设置合理的task_time_limittask_soft_time_limit。使用--max-tasks-per-child参数让Worker在执行一定数量任务后重启,释放内存。使用Flower监控工具查看任务队列和Worker状态。
  6. 冷启动延迟

    • 问题:第一个请求特别慢,因为要加载模型和工具。
    • 解决:在Celery Worker启动时(celery_app.py中),进行预热,提前初始化好Agent实例(注意线程安全)。或者使用singleton模式确保Agent只初始化一次。

6. 总结与扩展思考

通过以上步骤,我们完成了一个具备基本工程化能力的AI Agent系统。它不再是脆弱的脚本,而是一个有异步队列、错误处理、状态管理和基础监控的“微服务”。

这个项目作为毕业设计,你可以从以下几个方向进行扩展,以体现你的思考和能力:

  1. 多Agent协作:定义不同的专业Agent(如分析Agent、执行Agent、校验Agent),通过一个协调器(Orchestrator)让它们协同完成复杂任务。这能很好地体现你对Agent生态的理解。
  2. 集成向量数据库:引入ChromaQdrant,为Agent增加长期记忆或知识库检索(RAG)能力。例如,让它能基于你的毕业论文资料回答问题。
  3. 实现WebSocket推送:将任务的执行进度和最终结果通过WebSocket实时推送给前端,打造交互性更强的演示界面。
  4. 增加更完善的监控:集成PrometheusGrafana,监控任务成功率、延迟、LLM API调用次数等指标,让答辩展示更加专业。

希望这篇笔记能为你提供一个扎实的起点。毕业设计不仅是功能的堆砌,更是工程思维的体现。理解每一行代码背后的“为什么”,比复制粘贴整个项目更重要。我已经将这个项目的精简版代码放到了GitHub上,你可以直接Fork,然后在此基础上添加你自己的创意和功能。

祝你毕业设计顺利,答辩成功!

项目扩展思维导图

Logo

这里是“一人公司”的成长家园。我们提供从产品曝光、技术变现到法律财税的全栈内容,并连接云服务、办公空间等稀缺资源,助你专注创造,无忧运营。

更多推荐