原创 秋山墨客 2025-06-03 08:30 江苏

如何优雅与可靠的实现企业级的HITL Agent系统?

?k=94ff03df&u=https%3A%2F%2Fmmbiz.qpic

点击上方

蓝字

关注我们

?k=c4576a99&u=https%3A%2F%2Fmmbiz.qpic
上篇(彻底说清 Human-in-the-Loop:企业级 Agent 系统的关键挑战与LangGraph解法【上】)介绍了单机模式下的Human-in-the-Loop(HITL)Agent原理与实现。本文继续探讨如何将HITL Agent扩展到分布式环境,通过API支持远程调用和并发访问(代码地址见文末):
  • 分布式环境下的挑战与应对
  • 实现分布式的HITL Agent
  • 实现客户端的故障恢复
  • 服务端的故障恢复

01

分布式环境下的挑战与应对

回顾下,在单机模式下,HITL Agent与客户端运行在单一进程空间:
# 单机模式:直接调用agent = create_react_agent(model, tools, checkpointer)result = agent.invoke({"messages": messages})# 中断处理是同步的if "__interrupt__" in result:    user_response = input("请选择:yes/no/edit")    # 直接恢复执行    result = agent.invoke(Command(resume=response_data))
很显然,这种单个用户且无法扩展的实现,一般只能用来做测试与演示。如果我们需要将LangGraph开发的Agent部署到生产环境,需要考虑将Agent部署成独立的服务。即使不考虑MCP/A2A这样的新型集成架构与协议,采用普通的HTTP API模式也会面临诸多挑战。
首先是两个最基本的工作模式转变:
  • 从单一上下文到多个API接口
    中断机制要求一次HITL Agent的交互过程必然要拆分到多个HTTP请求完成。

  • 从单一用户会话到多用户会话
    HITL Agent的中断工作模式要求系统保持会话与状态,因此需要隔离的会话管理机制来应对潜在的多用户访问。

可能的实现架构如下:
?k=810f583b&u=https%3A%2F%2Fmmbiz.qpic
首先,我们简化一些环节以更多关注在HITL本身:
  • 不考虑流式(Streaming)过程输出
  • 不考虑Agent的异步执行,采用同步API接口
  • 一个用户当前只能有一个活动的Agent会话

基于这个前提,主要的设计要点包括:
  • 服务端提供两个基本的API,分别用于启动Agent与恢复Agent执行;它们返回的数据可能是Agent执行的最终结果,也可能是中断请求数据
  • 客户端发起请求时通过模拟的user_id来标识自己(实际应用中可能是安全令牌)
  • 服务端保存每个用户的当前Agent实例与会话ID(也作为运行Agent的thread_id)
  • 一次Agent会话的生命周期如下:
    ?k=7bcb81f3&u=https%3A%2F%2Fmmbiz.qpic

02

实现分布式的HITL Agent

我们以上篇中的工具调用HITL Agent为例,将原来的应用拆分成服务端与客户端分别实现。
【服务端API】
基于FastAPI构建,提供如下接口服务,其中核心的是invoke与resume接口,对应Agent启动与恢复:
?k=193f27f4&u=https%3A%2F%2Fmmbiz.qpic
【关键类型与Schema】
Sessions
保存服务端会话。采用简单的Dict类型,定义如下:
sessions: Dict[str, Dict[str, Union[CompiledGraph, str]]] = {
    "user_id": {
    "agent": CompiledGraph, # create_react_agent 返回的智能体实例
    "session_id": str # UUID 字符串
    # }
}
一个用户(user_id)同时只有一个活动的会话(session_id)和智能体(agent)。
AgentRequest
代表客户端发起的Agent调用请求:
# 客户端发起的智能体请求
class AgentRequest(BaseModel):
    user_id: str
    query: str
    system_message: Optional[str] = "你会使用工具来帮助用户。如果工具使用被拒绝,请提示用户。"
其中设置了user_id代表发起请求的客户端,因为现在是一个多用户环境。
?k=0d9a11ed&u=https%3A%2F%2Fmmbiz.qpic

注意,这里仅是为了简单模拟。在真实应用中通常不会直接传输user_id,一般会基于携带的安全token在服务端获取(首先做用户认证)。

AgentResponse
代表服务端Agent给客户端的响应:
# 智能体给予的响应
class AgentResponse(BaseModel):

    # 一次会话的唯一标识符
    session_id: str

    # 三个状态:interrupted, completed, error
    status: str 

    #error时的提示消息
    message: Optional[str] = None

    #completed时的结果消息
    result: Optional[Dict[str, Any]] = None

    #interrupted时的中断消息
    interrupt_data: Optional[Dict[str, Any]] = None
为了方便处理,这里把Agent返回客户端的响应用统一的AgentResponse表示,并在服务端根据实际Agent调用的情况构造状态(status)和返回内容:
  • 如果Agent正常返回,且结果中包含了__interrupt__,则取出结果中的interrupt_data返回,状态为interrupted
  • 如果Agent正常返回,且结果中未包含__interrupt__,则将结果直接返回,设置状态为completed
  • 其他异常情况,则将错误信息放入message返回,并设置状态为error

InterruptResponse
这代表客户端对中断进行处理后给予的反馈结果:
# 客户端给予的反馈响应
class InterruptResponse(BaseModel):
    user_id: str
    session_id: str

    # 响应类型:accept, reject, edit
    response_type: str 

    # 如果是edit类型,可能需要额外的参数
    args: Optional[Dict[str, Any]] = None
对于工具调用的反馈,通常有三种类型:accpet(允许调用)、reject(不允许调用)、edit(调整参数,此时args中携带修改后的调用参数)。
【两个关键接口】
工具与Agent本身与本地模式并无不同,这里直接看两个关键接口:
invoke_agent
不同的地方主要在于增加session与agent实例管理的动作:
  • 针对新的请求创建session和agent实例,并在后续交互过程中重用它们。
  • session_id同时可以用作agent运行的thread_id,用来实现agent内部检查点。

大致实现如下(部分):
@app.post("/agent/invoke", response_model=AgentResponse)
def invoke_agent(request: AgentRequest):
    """启动智能体处理用户请求 - 同步版本,等待执行完成或中断"""
    user_id = request.user_id
    if user_id notin sessions:

        # 只在创建新会话时生成新的会话ID
        session_id = str(uuid.uuid4())
        agent = create_tavily_search_agent(user_id)
        sessions[user_id] = {
            "agent": agent,
            "session_id": session_id
        }
    else:
        # 使用现有会话的ID和agent
        agent = sessions[user_id]["agent"]
        session_id = sessions[user_id]["session_id"]
        
    # 初始化智能体输入
    messages = ...
    config = {"configurable": {"thread_id": session_id}}

    try:
        result = agent.invoke(messages,config)
        return process_agent_result(session_id, result)
    ...
resume_agent
借助user_id取出服务端保存的agent实例,并恢复运行即可:
@app.post("/agent/resume", response_model=AgentResponse)
def resume_agent(response: InterruptResponse):

    """恢复被中断的智能体执行 - 同步版本,等待执行完成或再次中断"""
    user_id = response.user_id
    client_session_id = response.session_id
    
    # 检查用户会话是否存在
    if user_id notin sessions:
        raise HTTPException(status_code=404, detail=f"用户会话 {user_id} 不存在")
    
    # 检查会话ID是否匹配,由于未实现过期机制,也可以省略
    server_session_id = sessions[user_id]["session_id"]
    if server_session_id != client_session_id:
        raise HTTPException(status_code=400, detail="会话ID不匹配,可能是过期的请求")
    
    # 获取智能体和配置
    agent = sessions[user_id]["agent"]
    
    # 构造响应数据
    command_data = {
        "type": response.response_type
    }
    
    # 如果提供了参数,添加到响应数据中
    if response.args:
        command_data["args"] = response.args
    
    try:
        # 先恢复智能体执行
        result = agent.invoke(Command(resume=command_data), config={"configurable": {"thread_id": server_session_id}})

        # 再处理结果
        return process_agent_result(server_session_id, result)

......
这里的process_agent_result是一个辅助函数,用来对agent返回结果做统一处理,其逻辑参考AgentReponse的返回规则。
最后再实现一个删除会话接口,让客户端可以根据需要来管理会话生命周期
@app.delete("/agent/session/{user_id}")
def delete_agent_session(user_idstr):
    ...
    del sessions[user_id]
    ...
【客户端实现】
完成了服务端,客户端的实现在流程上与本地模式并无太多不同。只是原来的调用转化成对invoke_agent与resume_agent两个API的调用。
核心处理如下所示:
def main():

...获取用户ID,模拟不同的客户端
   user_id = ...

    while True:
        try:
            # 获取用户查询问题
            query = ...
                
            # 调用智能体
            response = invoke_agent(user_id, query)
            
            # 处理智能体响应
            process_agent_response(response, user_id)

        ...
process_agent_response函数中,根据invoke接口调用的响应状态进行分类处理,特别是interrupted状态(其他状态就直接结束)的处理:
  • 进入人类交互,请求确认或者拒绝等。
  • 交互结束后调用resume_agent接口恢复运行。
  • 用resum的返回结果递归调用process_agent_response,以支持可能再次发生的中断,直到返回completed或者error状态。

这里不再展示详细实现。直接看客户端运行效果:
?k=2e411c27&u=https%3A%2F%2Fmmbiz.qpic
我们在agent流程结束后调用了会话清理(delete)接口以开启新的session,实际应用中可根据自身需要灵活控制。

03

实现客户端的故障恢复

我们成功地将HITL Agent从单机模式扩展到了远程API模式,实现了多用户访问。但是在实际使用中也引入了更多的故障点,特别是会话的脆弱性(比如Web客户端不小心刷新了页面)
【主要挑战】
这里主要的问题是:
一旦客户端意外退出,正在进行的会话就会断开,状态处于“未知”。
?k=af42554d&u=https%3A%2F%2Fmmbiz.qpic

但是服务端的agent不是有checkpointer吗?

尽管checkpointer持久了Agent运行的步骤与State,但其更多用于agent恢复运行时的“现场重建”,但在上面的情况下,你甚至无法知道要不要“恢复运行”。比如,agent在运行到某个较慢的节点时,客户端忽然断开;此时再次连接后,由于服务端很可能还在运行中(running),并不能简单的调用resume来恢复。

【优化方案】

考虑两种优化方案:

  • 同步+增强的会话状态管理

延续当前的实现模式。但是需要在此基础上增强服务端的会话状态管理,可以用来后续查询与恢复。

  • 异步+任务管理

改造现有的同步Agent调用为异步模式。即:不管是invoke_agent还是resum_agent的运行都启动异步任务来运行,接口本身则立即返回task_id。后续客户端随时可以通过task_id来查询服务端任务的状态与响应,因此也就无惧会话断开。

这种模式适用于大规模并发模式,或者agent运行任务时间过长(比如做深度研究)。

【方案实现】

这里我们延续之前的demo,介绍第一种方式。

首先在之前的基础上增强会话管理:增加status以及最后的响应内容等信息:
# 增强的会话存储结构
sessions = {
    "user_id": {
        "agent": agent_instance,
        "session_id""uuid",
        "status""idle|running|interrupted|completed|error"# 新增状态跟踪
        "last_response": AgentResponse, # 保存最后响应
        "last_query": str, # 保存最后查询
        "last_updated": timestamp # 时间戳
    }
}

这里的关键改进用来帮助客户端在会话断开时做恢复

  • 状态保存:每次agent状态变化都会更新并保存,划分成5种状态
  • 响应缓存:保存最后一次的响应,包括中断数据,用于客户端恢复

接下来只需要在每次Agent会话状态变化时,对其进行保存即可。
比如,在开始运行agent之前:
...
sessions[user_id]["status"] = "running"
sessions[user_id]["last_query"] = request.query
sessions[user_id]["last_response"] = None
sessions[user_id]["last_updated"] = time.time()
而在返回客户端之前,将返回信息保存起来(注意此时客户端可能已经断开):
......
sessions[user_id]["status"] = response.status
sessions[user_id]["last_response"] = response
sessions[user_id]["last_updated"] = time.time()
此外,在agent恢复、错误发生等环节,都注意更新状态。
在服务端实现更完整的状态保存,并提供查询接口后,客户端也需要做相应的处理:在每次启动后根据user_id查询服务端状态,一旦发现有未完成的agent会话在进行,就可以根据其status与last_response做相应处理。比如:
一个处于running状态的agent,客户端可以继续等待其状态变化;而一个处于interrupted状态的agent,则可以进入人类交互过程。
这里不再详细介绍客户端实现。
【效果演示】
首先启动优化后的服务端。然后运行客户端查询,开始执行后立刻强行退出:
?k=5dcf724f&u=https%3A%2F%2Fmmbiz.qpic
然后再次启动客户端,输入相同的user_id,可以看到服务端agent状态已经运行到“interrupted”状态:
?k=cd7dd543&u=https%3A%2F%2Fmmbiz.qpic
此时客户端会自动显示interrupt信息,并请求用户进行交互与反馈,流程得以继续:
?k=74b3aa71&u=https%3A%2F%2Fmmbiz.qpic
接着,我们在agent恢复运行后,再次强行中断:
?k=ed9fb09a&u=https%3A%2F%2Fmmbiz.qpic
再次启动客户端用同一用户进入。可以看到服务端agent当前处于“running”状态:
?k=29b16ba6&u=https%3A%2F%2Fmmbiz.qpic
此时,客户端会进入等待状态,直到服务端agent状态变成completed:
?k=2d0cba89&u=https%3A%2F%2Fmmbiz.qpic
最后显示completed状态的最终响应结果:
?k=b6b7559e&u=https%3A%2F%2Fmmbiz.qpic
整个过程可以看到,无论是在interrupt中断发生,还是在正常运行过程中,即使发生会话异常,最后都可以借助状态查询与恢复机制,让流程得以继续,使用的可靠性与体验得到了极大的提升!

04

服务端的故障恢复建议

当然在实际应用中,还有一种更复杂的情况是服务端的故障恢复。特别是在这样“有状态”的服务端下,在关键应用中会面临几个普遍的问题:

  • 单点故障:服务重启后所有会话丢失
  • 无法简单的水平扩展:负载均衡会导致会话找不到
  • 资源浪费:每个服务实例都要维护完整会话与状态

虽然LangGraph的checkpointer将agent运行轨迹保存在PostgreSQL中,但我们的会话元数据(session映射、用户状态、中断信息)仍然存储在内存中,这使得服务端重启后用户完全无法恢复之前的会话。

一个可能的解决方案是:

对服务端内存会话数据做持久化,比如借助Redis/rdbms;并在每次服务启动时,根据Redis中信息进行会话恢复。当然,实际应用中可以采用“懒”加载机制延后恢复。而服务端agent实例则借助checkpointer做状态恢复和运行。

这种方式不仅解决了服务端故障恢复的问题,还可以为系统的水平扩展和高可用部署奠定了基础。当然也需要更加精细化的控制与处理。

?k=e71e7478&u=https%3A%2F%2Fmmbiz.qpic

以上我们介绍了分布式环境下HITL Agent的关键设计,特别是网络环境下的故障恢复。毕竟在真实的生产环境中,服务重启、网络抖动、硬件故障都有可能发生,一个能够优雅地从各种故障中恢复的系统,才是真正可用的企业级系统。尽管增加了架构复杂度,但这种复杂度换来的是系统可靠性的飞跃。

本文演示代码参考:

https://github.com/pingcy/agent-hitl

福利时间

为了帮助LLM开发人员更系统性与更深入的学习RAG应用,特别是企业级的RAG应用场景下,当前主流的优化方法与技术实现,我们编写了《基于大模型的RAG应用开发与优化 — 构建企业级LLM应用》这本长达500页的开发与优化指南,与大家一起来深入到LLM应用开发的全新世界。

更多细节,点击链接了解?k=656b68d8&u=https%3A%2F%2Fmmbiz.qpic


交流请识别以下名片
?k=5530cc2c&u=https%3A%2F%2Fmmbiz.qpic

640?

640?from=appmsg

640?from=appmsg

640

640

阅读原文

跳转微信打开