系列「企业级 AI Agent 实现拆解」E20 篇。上一篇 E19 讲了 Agent Transfer——怎么把任务从一个 AI 交给另一个 AI。这篇解决另一个核心问题:AI 和人怎么协同。Eino 在 adk/human-in-the-loop/ 下提供了八种具体模式,每种对应一个真实场景,底层共享同一套中断/恢复机制。

读完这篇你会知道

  • HITL 的底层机制:StatefulInterrupt、GetInterruptState、GetResumeContext 三件套
  • 八种模式各自的适用场景和实现方式
  • 如何用 tool.StatefulInterrupt 在任意工具里嵌入中断点
  • 为什么必须调用 schema.Register 才能让 Checkpoint 正确序列化

一、为什么需要人在环中

全自动 Agent 有一个无法回避的问题:高风险操作不能事后撤销。银行转账确认错了、删除文件确认错了、发出去的邮件撤不回来——这些操作需要在执行前让人看一眼。

HITL(Human-in-the-Loop,人在环中)的核心思路:AI 完成准备工作,在关键决策点暂停,等人确认后继续。它不是"AI 干不了才问人",而是"AI 能干,但这一步必须人决定"。


二、底层机制:三个函数

Eino 的所有 HITL 模式都建在同一套机制上,只需要三个函数:

tool.StatefulInterrupt:触发中断

// 在工具执行期间调用,暂停整个 Agent,把 info 展示给用户,把 state 存入 Checkpoint
return "", tool.StatefulInterrupt(ctx, &ApprovalInfo{...}, argumentsInJSON)
//                                      ↑ 展示给用户的信息    ↑ 恢复时需要用的状态

区别info 是给用户看的(可以是任何实现了 String() 的对象),state 是工具恢复时需要的内部数据(存入 Checkpoint,反序列化后拿回来)。

tool.GetInterruptState:检查是否从中断恢复

wasInterrupted, _, storedState := tool.GetInterruptState[string](ctx)
if !wasInterrupted {
    // 第一次执行:触发中断
    return "", tool.StatefulInterrupt(ctx, info, argumentsInJSON)
}
// 从中断恢复:继续执行

tool.GetResumeContext:拿到用户传回的数据

isResumeTarget, hasData, data := tool.GetResumeContext[*ApprovalResult](ctx)
if isResumeTarget && hasData {
    if data.Approved {
        // 用户批准了,继续执行真实工具
    }
}

runner.ResumeWithParams:从应用侧恢复

iter, err = runner.ResumeWithParams(ctx, checkpointID, &adk.ResumeParams{
    Targets: map[string]any{
        interruptID: &ApprovalResult{Approved: true},
    },
})

Targets 是一个 Map:interruptID(从 lastEvent.Action.Interrupted.InterruptContexts[0].ID 拿到)→ 用户输入的结果。框架负责找到对应的工具调用并注入这个结果。


三、模式一:Approval(二选一审批)

场景:执行敏感工具前,展示参数,用户只需回答 Y/N。

核心实现是一个工具包装器 InvokableApprovableTooladk/common/tool/approval_wrapper.go):

func (i InvokableApprovableTool) InvokableRun(ctx context.Context, argumentsInJSON string, opts ...tool.Option) (string, error) {
    wasInterrupted, _, storedArguments := tool.GetInterruptState[string](ctx)
    if !wasInterrupted {
        // 第一次:中断,把参数展示给用户
        return "", tool.StatefulInterrupt(ctx, &ApprovalInfo{
            ToolName:        toolInfo.Name,
            ArgumentsInJSON: argumentsInJSON,
        }, argumentsInJSON)
    }

    isResumeTarget, hasData, data := tool.GetResumeContext[*ApprovalResult](ctx)
    if isResumeTarget && hasData {
        if data.Approved {
            return i.InvokableTool.InvokableRun(ctx, storedArguments, opts...)  // 批准:执行
        }
        return fmt.Sprintf("tool '%s' disapproved, reason: %s", ...), nil       // 拒绝:返回拒绝信息
    }
}

把任何工具变成"需要审批的工具",只需要包一层:

bookingTool := tool.InvokableApprovableTool{InvokableTool: originalBookingTool}

用户侧循环:

// 收到中断事件,拿到 interruptID
interruptID := lastEvent.Action.Interrupted.InterruptContexts[0].ID

// 等用户输入 Y/N,生成 ApprovalResult
if userInput == "Y" {
    apResult = &tool.ApprovalResult{Approved: true}
} else {
    apResult = &tool.ApprovalResult{Approved: false, DisapproveReason: &reason}
}

// 带结果恢复
iter, _ = runner.ResumeWithParams(ctx, "1", &adk.ResumeParams{
    Targets: map[string]any{interruptID: apResult},
})

四、模式二:Review-Edit(审阅 + 修改参数)

场景:工具调用前,用户不只是 Y/N,还可以修改参数再执行。

InvokableReviewEditTooladk/common/tool/review_edit_wrapper.go)在 Approval 基础上多了一个 ReviewEditResult

type ReviewEditResult struct {
    EditedArgumentsInJSON *string  // 用户修改后的 JSON 参数
    NoNeedToEdit          bool     // 直接批准,无需修改
    Disapproved           bool     // 拒绝
    DisapproveReason      *string
}

用户侧有三种选择:

switch strings.ToLower(nInput) {
case "no need to edit":
    result.NoNeedToEdit = true          // 直接用 AI 生成的参数执行
case "n":
    result.Disapproved = true           // 拒绝这次工具调用
default:
    result.EditedArgumentsInJSON = &nInput  // 把用户输入的 JSON 作为新参数
}

工具恢复时用修改后的参数重新执行:

if result.EditedArgumentsInJSON != nil {
    res, _ := i.InvokableTool.InvokableRun(ctx, *result.EditedArgumentsInJSON, opts...)
    return fmt.Sprintf("user changed args to %s. Result: %s", ..., res), nil
}

五、模式三:Feedback Loop(循环反馈)

场景:AI 生成内容 → 人审阅 → 提供修改意见 → AI 修改 → 循环,直到满意为止。

实现用 adk.NewLoopAgent,把 Writer 和 Reviewer 放在一个 Loop 里:

// writer_agent.go
la, _ := adk.NewLoopAgent(ctx, &adk.LoopAgentConfig{
    Name: "Writer MultiAgent",
    SubAgents: []adk.Agent{
        writerAgent,    // 负责写/修改
        &ReviewAgent{}, // 负责中断等人反馈
    },
})

ReviewAgent 是一个 Custom Agent,每次 Writer 写完就触发中断,拿到人工反馈:

type FeedbackInfo struct {
    NoNeedToEdit bool    // 满意,结束循环
    Feedback     *string // 不满意,填修改建议
}

用户侧:

  • 输入 “NO NEED TO EDIT” → reInfo.NoNeedToEdit = true → LoopAgent 收到后退出循环
  • 输入其他文字 → reInfo.Feedback = &nInput → Writer 看到反馈重新写

注意:使用自定义中断信息类型时,必须注册:

func init() {
    schema.RegisterName[*FeedbackInfo]("human_in_the_loop.FeedbackInfo")
}

不注册会导致 Checkpoint 序列化失败——框架无法知道反序列化时该用哪个具体类型。


六、模式四:Follow-up(追加澄清)

场景:用户请求模糊,AI 先问清楚再执行,而不是直接猜测。

FollowUpTool 工具(adk/common/tool/follow_up_tool.go)可以携带一组问题触发中断:

func FollowUp(ctx context.Context, input *FollowUpToolInput) (string, error) {
    wasInterrupted, _, storedState := tool.GetInterruptState[*FollowUpState](ctx)
    if !wasInterrupted {
        // 第一次:中断,展示所有问题
        return "", tool.StatefulInterrupt(ctx, 
            &FollowUpInfo{Questions: input.Questions},
            &FollowUpState{Questions: input.Questions})
    }
    
    isResumeTarget, hasData, resumeData := tool.GetResumeContext[*FollowUpInfo](ctx)
    if !isResumeTarget {
        // 不是本次恢复的目标工具,继续保持中断
        return "", tool.StatefulInterrupt(ctx, &FollowUpInfo{Questions: storedState.Questions}, storedState)
    }
    return resumeData.UserAnswer, nil  // 把用户答案返回给 Agent 作为工具结果
}

用户侧:

fuInfo.UserAnswer = userInput
iter, _ = runner.ResumeWithParams(ctx, "1", &adk.ResumeParams{
    Targets: map[string]any{interruptCtx.ID: fuInfo},
})

和 AskForClarification 的区别:E18 里的 ask_for_clarification 工具使用老 API compose.NewInterruptAndRerunErr,每次只支持单一文本输入。FollowUpTool 使用新的 tool.StatefulInterrupt API,支持携带任意结构化状态,更推荐用新 API。


七、模式五&八:Supervisor 内嵌审批(Multi-Agent)

场景:Multi-Agent 系统里,子 Agent 执行高风险操作时需要主控层面的审批。

模式五(5_supervisor)是金融转账场景,模式八(8_supervisor-plan-execute)是项目预算分配场景——两者的关键设计相同:把审批工具包装进子 Agent,中断信号会透传到顶层 Runner。

外层处理代码完全相同:

for {
    lastEvent, interrupted := processEvents(iter)
    if !interrupted { break }
    
    // 不管中断在哪个子 Agent 发生,都在这里统一处理
    interruptID := lastEvent.Action.Interrupted.InterruptContexts[0].ID
    
    // 等用户输入,ResumeWithParams 注入结果
    iter, err = runner.ResumeWithParams(ctx, checkpointID, &adk.ResumeParams{
        Targets: map[string]any{interruptID: apResult},
    })
}

框架保证:子 Agent 里的 Interrupted 动作会一路传到最外层的 Iterator,应用侧不需要关心它发生在哪一层。


八、模式六:Plan-Execute-Replan(计划阶段审阅)

场景:AI 先制定旅行计划,人审阅每一步的工具调用后再执行,可以逐步修改参数。

这是 Review-Edit 的扩展版——Agent 对每个工具调用都使用 InvokableReviewEditTool 包装,用户逐步审阅整个旅行计划的每个预订操作:

订机票 → 用户审阅 → 改了出发时间 → 执行
订酒店 → 用户审阅 → 批准 → 执行
推荐景点 → 用户审阅 → 批准 → 执行

每次中断和恢复的代码模式与模式二完全一样,只是 Agent 完成全程后会经历多次中断循环。


九、模式七:Deep Agents(深层追问)

场景:嵌套多层 Agent 执行昂贵任务前,先用 FollowUpTool 追问,避免方向跑偏后浪费大量 Token。

使用 deep.New 预制(adk/prebuilt/deep):

return deep.New(ctx, &deep.Config{
    Name:        "DataAnalysisAgent",
    Instruction: `...IMPORTANT: Before starting any analysis, you MUST first use the FollowUpTool to ask the user clarifying questions about:
1. What specific market sectors they are interested in
2. What time period they want to analyze
3. What type of analysis they need
4. Their risk tolerance`,
    ChatModel: m,
    SubAgents: []adk.Agent{researchAgent, analysisAgent},
    ToolsConfig: adk.ToolsConfig{
        ToolsNodeConfig: compose.ToolsNodeConfig{
            Tools: []tool.BaseTool{followUpTool},
        },
    },
})

系统 Prompt 里的 “you MUST first use the FollowUpTool” 是 Prompt 工程,不是技术强制——模型可能跳过,但这是可接受的折中。deep.New 本质上还是带 SubAgent 工具的 ChatModelAgent,即 E19 讲的 AgentTool 模式。


十、八种模式一张表

模式 核心机制 人的决策 适用场景
1. Approval InvokableApprovableTool Y / N 敏感操作一键审批
2. Review-Edit InvokableReviewEditTool 批准 / 拒绝 / 改参数 参数需要人工核对调整
3. Feedback Loop LoopAgent + Custom ReviewAgent 满意退出 / 给反馈 创作类内容迭代打磨
4. Follow-up FollowUpTool 回答问题 需求模糊时先澄清
5. Supervisor 审批 Supervisor + ApprovalTool Y / N(财务/权限类) Multi-Agent 高风险子操作
6. Plan-Execute-Replan ReviewEditTool 多步骤 逐步审阅 + 修改 复杂多步骤任务全程把控
7. Deep Agents 追问 FollowUpTool + SubAgent 回答多个问题 嵌套多步骤前的信息收集
8. Supervisor-Plan-Execute Supervisor + 计划 + 审批 Y / N(资源分配) 项目设置类复杂工作流

十一、实现要点:别忘了 Register

任何自定义的中断信息类型,都必须在 init() 里注册:

func init() {
    schema.Register[*ApprovalInfo]()
    schema.Register[*ReviewEditInfo]()
    schema.RegisterName[*FeedbackInfo]("human_in_the_loop.FeedbackInfo")
}

原因:Checkpoint 序列化时,框架需要把 infostate 存入持久化存储(比如 Redis),反序列化时要知道用哪个具体类型重建对象。不注册会在恢复时报类型找不到的错误,且这个错误只在真正尝试恢复时才出现——如果只在本进程内测试,不会踩到这个坑。


小结

Eino 的八种 HITL 模式都建在同一套机制上:StatefulInterrupt 触发中断,GetInterruptState/GetResumeContext 在工具侧检查状态,runner.ResumeWithParams 从应用侧注入用户输入。区别在于中断时展示什么、用户输入什么、恢复后做什么。三个官方工具包装器(ApprovalWrapper、ReviewEditWrapper、FollowUpTool)覆盖了大多数场景,复杂场景自己组合即可。

下篇继续。


代码来源:cloudwego/eino · cloudwego/eino-examples

Logo

这里是“一人公司”的成长家园。我们提供从产品曝光、技术变现到法律财税的全栈内容,并连接云服务、办公空间等稀缺资源,助你专注创造,无忧运营。

更多推荐