彻底说清 Human-in-the-Loop【下】:分布式环境下的挑战与应对
原创 秋山墨客 2025-06-03 08:30 江苏
如何优雅与可靠的实现企业级的HITL Agent系统?
点击上方
蓝字
关注我们
-
分布式环境下的挑战与应对 -
实现分布式的HITL Agent -
实现客户端的故障恢复 -
服务端的故障恢复
01
分布式环境下的挑战与应对
# 单机模式:直接调用agent = create_react_agent(model, tools, checkpointer)result = agent.invoke({"messages": messages})# 中断处理是同步的if "__interrupt__" in result: user_response = input("请选择:yes/no/edit") # 直接恢复执行 result = agent.invoke(Command(resume=response_data))
-
从单一上下文到多个API接口 中断机制要求一次HITL Agent的交互过程必然要拆分到多个HTTP请求完成。 -
从单一用户会话到多用户会话 HITL Agent的中断工作模式要求系统保持会话与状态,因此需要隔离的会话管理机制来应对潜在的多用户访问。
-
不考虑流式(Streaming)过程输出 -
不考虑Agent的异步执行,采用同步API接口 -
一个用户当前只能有一个活动的Agent会话
-
服务端提供两个基本的API,分别用于启动Agent与恢复Agent执行;它们返回的数据可能是Agent执行的最终结果,也可能是中断请求数据 -
客户端发起请求时通过模拟的user_id来标识自己(实际应用中可能是安全令牌) -
服务端保存每个用户的当前Agent实例与会话ID(也作为运行Agent的thread_id) -
一次Agent会话的生命周期如下:
02
实现分布式的HITL Agent
sessions: Dict[str, Dict[str, Union[CompiledGraph, str]]] = {
# "user_id": {
# "agent": CompiledGraph, # create_react_agent 返回的智能体实例
# "session_id": str # UUID 字符串
# }
}
# 客户端发起的智能体请求
class AgentRequest(BaseModel):
user_id: str
query: str
system_message: Optional[str] = "你会使用工具来帮助用户。如果工具使用被拒绝,请提示用户。"
注意,这里仅是为了简单模拟。在真实应用中通常不会直接传输user_id,一般会基于携带的安全token在服务端获取(首先做用户认证)。
# 智能体给予的响应
class AgentResponse(BaseModel):
# 一次会话的唯一标识符
session_id: str
# 三个状态:interrupted, completed, error
status: str
#error时的提示消息
message: Optional[str] = None
#completed时的结果消息
result: Optional[Dict[str, Any]] = None
#interrupted时的中断消息
interrupt_data: Optional[Dict[str, Any]] = None
-
如果Agent正常返回,且结果中包含了__interrupt__,则取出结果中的interrupt_data返回,状态为interrupted -
如果Agent正常返回,且结果中未包含__interrupt__,则将结果直接返回,设置状态为completed -
其他异常情况,则将错误信息放入message返回,并设置状态为error
# 客户端给予的反馈响应
class InterruptResponse(BaseModel):
user_id: str
session_id: str
# 响应类型:accept, reject, edit
response_type: str
# 如果是edit类型,可能需要额外的参数
args: Optional[Dict[str, Any]] = None
-
针对新的请求创建session和agent实例,并在后续交互过程中重用它们。 -
session_id同时可以用作agent运行的thread_id,用来实现agent内部检查点。
@app.post("/agent/invoke", response_model=AgentResponse)
def invoke_agent(request: AgentRequest):
"""启动智能体处理用户请求 - 同步版本,等待执行完成或中断"""
user_id = request.user_id
if user_id notin sessions:
# 只在创建新会话时生成新的会话ID
session_id = str(uuid.uuid4())
agent = create_tavily_search_agent(user_id)
sessions[user_id] = {
"agent": agent,
"session_id": session_id
}
else:
# 使用现有会话的ID和agent
agent = sessions[user_id]["agent"]
session_id = sessions[user_id]["session_id"]
# 初始化智能体输入
messages = ...
config = {"configurable": {"thread_id": session_id}}
try:
result = agent.invoke(messages,config)
return process_agent_result(session_id, result)
...
@app.post("/agent/resume", response_model=AgentResponse)
def resume_agent(response: InterruptResponse):
"""恢复被中断的智能体执行 - 同步版本,等待执行完成或再次中断"""
user_id = response.user_id
client_session_id = response.session_id
# 检查用户会话是否存在
if user_id notin sessions:
raise HTTPException(status_code=404, detail=f"用户会话 {user_id} 不存在")
# 检查会话ID是否匹配,由于未实现过期机制,也可以省略
server_session_id = sessions[user_id]["session_id"]
if server_session_id != client_session_id:
raise HTTPException(status_code=400, detail="会话ID不匹配,可能是过期的请求")
# 获取智能体和配置
agent = sessions[user_id]["agent"]
# 构造响应数据
command_data = {
"type": response.response_type
}
# 如果提供了参数,添加到响应数据中
if response.args:
command_data["args"] = response.args
try:
# 先恢复智能体执行
result = agent.invoke(Command(resume=command_data), config={"configurable": {"thread_id": server_session_id}})
# 再处理结果
return process_agent_result(server_session_id, result)
......
@app.delete("/agent/session/{user_id}")
def delete_agent_session(user_id: str):
...
del sessions[user_id]
...
def main():
...获取用户ID,模拟不同的客户端
user_id = ...
while True:
try:
# 获取用户查询问题
query = ...
# 调用智能体
response = invoke_agent(user_id, query)
# 处理智能体响应
process_agent_response(response, user_id)
...
-
进入人类交互,请求确认或者拒绝等。 -
交互结束后调用resume_agent接口恢复运行。 -
用resum的返回结果递归调用process_agent_response,以支持可能再次发生的中断,直到返回completed或者error状态。
03
实现客户端的故障恢复
但是服务端的agent不是有checkpointer吗?
尽管checkpointer持久了Agent运行的步骤与State,但其更多用于agent恢复运行时的“现场重建”,但在上面的情况下,你甚至无法知道要不要“恢复运行”。比如,agent在运行到某个较慢的节点时,客户端忽然断开;此时再次连接后,由于服务端很可能还在运行中(running),并不能简单的调用resume来恢复。
【优化方案】
考虑两种优化方案:
-
同步+增强的会话状态管理
延续当前的实现模式。但是需要在此基础上增强服务端的会话状态管理,可以用来后续查询与恢复。
-
异步+任务管理
改造现有的同步Agent调用为异步模式。即:不管是invoke_agent还是resum_agent的运行都启动异步任务来运行,接口本身则立即返回task_id。后续客户端随时可以通过task_id来查询服务端任务的状态与响应,因此也就无惧会话断开。
这种模式适用于大规模并发模式,或者agent运行任务时间过长(比如做深度研究)。
【方案实现】
这里我们延续之前的demo,介绍第一种方式。
# 增强的会话存储结构
sessions = {
"user_id": {
"agent": agent_instance,
"session_id": "uuid",
"status": "idle|running|interrupted|completed|error", # 新增状态跟踪
"last_response": AgentResponse, # 保存最后响应
"last_query": str, # 保存最后查询
"last_updated": timestamp # 时间戳
}
}
这里的关键改进用来帮助客户端在会话断开时做恢复:
-
状态保存:每次agent状态变化都会更新并保存,划分成5种状态 -
响应缓存:保存最后一次的响应,包括中断数据,用于客户端恢复
...
sessions[user_id]["status"] = "running"
sessions[user_id]["last_query"] = request.query
sessions[user_id]["last_response"] = None
sessions[user_id]["last_updated"] = time.time()
......
sessions[user_id]["status"] = response.status
sessions[user_id]["last_response"] = response
sessions[user_id]["last_updated"] = time.time()
04
服务端的故障恢复建议
当然在实际应用中,还有一种更复杂的情况是服务端的故障恢复。特别是在这样“有状态”的服务端下,在关键应用中会面临几个普遍的问题:
-
单点故障:服务重启后所有会话丢失 -
无法简单的水平扩展:负载均衡会导致会话找不到 -
资源浪费:每个服务实例都要维护完整会话与状态
虽然LangGraph的checkpointer将agent运行轨迹保存在PostgreSQL中,但我们的会话元数据(session映射、用户状态、中断信息)仍然存储在内存中,这使得服务端重启后用户完全无法恢复之前的会话。
一个可能的解决方案是:
对服务端内存会话数据做持久化,比如借助Redis/rdbms;并在每次服务启动时,根据Redis中信息进行会话恢复。当然,实际应用中可以采用“懒”加载机制延后恢复。而服务端agent实例则借助checkpointer做状态恢复和运行。
这种方式不仅解决了服务端故障恢复的问题,还可以为系统的水平扩展和高可用部署奠定了基础。当然也需要更加精细化的控制与处理。
以上我们介绍了分布式环境下HITL Agent的关键设计,特别是网络环境下的故障恢复。毕竟在真实的生产环境中,服务重启、网络抖动、硬件故障都有可能发生,一个能够优雅地从各种故障中恢复的系统,才是真正可用的企业级系统。尽管增加了架构复杂度,但这种复杂度换来的是系统可靠性的飞跃。
本文演示代码参考:
https://github.com/pingcy/agent-hitl
福利时间
为了帮助LLM开发人员更系统性与更深入的学习RAG应用,特别是企业级的RAG应用场景下,当前主流的优化方法与技术实现,我们编写了《基于大模型的RAG应用开发与优化 — 构建企业级LLM应用》这本长达500页的开发与优化指南,与大家一起来深入到LLM应用开发的全新世界。
更多细节,点击链接了解