MCP实战|从0到1构建异步 DeepResearch 工具,支持进度推送与超时控制
原创 曾经的毛毛 2025-06-16 18:34 江苏
如何构建一个异步的MCP DeepResearch Server?
点击上方
蓝字
关注我们
DeepResearch(深度研究)是一种颇为实用的Agent形式,如LangChain/Google,国内的字节/天工都有不错的开源项目。本文将尝试借助开源项目,构建一个可在MCP Server中异步运行的DeepResearch,并且支持任务进度推送与超时控制(源代码见文末)。
-
项目概述
-
构建DeepResearch Agent
-
构建异步MCP DeepResearch Server
-
效果测试与总结
01
项目概述
之前我们介绍过利用A2A协议构建“服务化”的Agent系统。那么作为另一种解决互操作问题的规范 – MCP,是否也可以用来实现“工具化”的Agent?比如将一个DeepResearch Agent运行在MCP Server之中。即:
把DeepResearch Agent作为一个Tool运行在MCP Server,实现共享与互操作。
这在技术上是完全可行的。不过相对更轻量级的普通Tool,一个由Agent支持的Tool有两个明显的挑战:
-
很多Agent运行时间较长,更适合异步任务,实现更复杂。
-
Agent通常利用流式输出来提供中间状态感知、优化客户体验。
这两个挑战在DeepResearch上有明显体现:时间长、步骤多、速度慢。
我们使用如下的架构来构建一个基于MCP的异步DeepResearch系统:
整个架构由三个部分组成:
DeepResearch Agent:LangGraph开发的智能体。为了简化,我们采用对Google的一个开源项目( gemini-fullstack-langgraph-quickstart)做“魔改”。
MCP Server:借助MCP SDK开发,提供并开放以下主要Tool:
-
start_research:启动深度研究任务,该工具会返回任务ID(task_id)
-
cancel_research:取消某个研究任务,输入任务ID
-
wait_research:等待并接收某个任务的“流”输出,用来观察任务过程,该工具会持续到任务结束,中途中断不影响任务进行
-
query_research:查询某个任务的当前状态与结果,该工具会立刻返回
Client应用:调用MCP Server完成深度研究任务的客户端,这里用来测试。
该架构下,一个典型的DeepResearch过程的MCP客户端与服务端的交互如下:
下面依次构建这几个部分。
02
构建DeepResearch Agent
Google的这个开源项目是一个完整的端到端的简单版的DeepResearch应用,包含前端与后端。这里我们“借用”它的后端应用,这是一个LangGraph框架的Agent,其工作流设计如下:
对其做如下主要修改:
-
将Gemini模型修改成了使用本地Ollama的qwen3模型
-
将使用的Google搜索工具改成使用tavily API搜索
-
对涉及到的提示词(查询生成、结果反思以及结果生成)适当调整
在进行下一步之间,可以对这个DeepResearch Agent做独立测试,确保其能够正常调用与输出。注意使用流式调用这个Agent,以方便跟踪与显示中间过程:
..... # 流式执行Graph async for chunk in graph.astream(initial_state, config=config): step_count += 1 #stream_mode=updates时,会在每个节点后输出节点名称与节点返回信息 for node_name, node_output in chunk.items(): print(f"📍 步骤 {step_count}: {node_name}") if node_name == "generate_query": if "query_list" in node_output: print(f" 🧠 智能查询生成:")......
通过这种方式可以持续输出Agent执行的步骤与输出信息。比如:
03
构建异步的MCP DeepResearch Server
准备好Agent后,现在我们将把其“包装”成一个可以共享使用的MCP Tool。由于我们期望该任务能够异步运行,并可以随时查询任务状态与结果,因此这里构建一个MCP Server中的任务管理器(TaskManager)。其职责是:
对服务端异步研究任务进行管理,包括任务创建、启动、取消与状态更新。
任务变化的状态图与相关接口的关系如下:
【TaskManager】
任务管理器的基本定义如下:
class ResearchTaskManager: def__init__(self): self.tasks: Dict[str, Dict[str, Any]] = {} self.task_futures: Dict[str, asyncio.Task] = {} def create_task(self, research_query: str) -> str: """创建新的研究任务并返回任务ID"""... async def start_task(self, task_id: str) -> None: """启动研究任务,在这里调用上面的DeepResearch Agent"""... def get_task(self, task_id: str) -> Optional[Dict[str, Any]]: """获取任务信息"""... def update_task(self, task_id: str, updates: Dict[str, Any]) -> None: """更新任务信息"""... async def cancel_task(self, task_id: str) -> str: """取消指定的研究任务...
简单说明其核心的实现要点(详细请参考源代码):
1. 任务生命周期管理
-
使用UUID生成唯一task_id,确保每个任务独立标识
-
通过状态机模式控制任务从创建到完成的完整流程
2. 异步任务执行
-
采用asyncio.create_task()创建独立的异步任务
-
使用task_futures字典管理所有运行中的异步任务
-
支持任务并发执行,不阻塞主线程和其他任务
-
在异步任务中调用DeepResearch Agent运行研究任务
3. 进度更新与推送机制
-
通过流式调用LangGraph,获取步骤与输出信息
-
任务信息中通过current_step字段保存步骤信息
-
后续用于推送MCP客户端实时进度信息
4. 任务取消和超时控制
-
任务取消:检查状态 → 更新标志 → 取消Future
-
设置超时机制,防止任务无限执行,借助asyncio.wait_for:
# 添加10分钟超时控制
returnawait asyncio.wait_for(
self._execute_task(task_id),
timeout=600
)
【Tools】
MCP Tool使用FastMCP实现,有了TaskManager,几个MCP Tool的实现就相对简单,可以在MCP Server启动后创建TaskManager的实例,然后在Tool中调用它。这里重点说明start_research与wait_research两个Tool的实现。
start_research(启动研究任务)
使用客户端输入创建任务,然后启动,并把task_id返回客户端即可。
@app.tool()async def start_research(ctx: Context, research_query: str) -> str: global task_manager task_id = task_manager.create_task(research_query) # 启动任务 await task_manager.start_task(task_id) message = f"研究任务已启动!n研究内容: {research_query}n任务ID: {task_id}nn该任务将使用LangGraph Agent进行深度研究" return message
wait_research(等待研究任务)
该工具给客户端等待与查询当前任务的状态与步骤,实现类似于“流式”的效果(这里的“流式”主要用于推送工作流步骤与中间输出,而非最终内容)。我们借助于MCP协议中定义的服务端progress(进度)功能来实现:调用 ctx.report_progress向客户端不断发送进度。
@app.tool()async def wait_research(ctx: Context, task_id: str) -> str: global task_manager task = task_manager.get_task(task_id) ifnot task: raise ValueError(f"找不到任务: {task_id}") # 记录上一次的状态和步骤,用于比较变化 last_status = None last_current_step = None try: whileTrue: task = task_manager.get_task(task_id) current_status = task["status"] current_step = task.get("current_step", "") # 只有在状态或当前步骤发生变化时才报告进度 if (current_status != last_status or current_step != last_current_step) and current_step: await ctx.report_progress(-1.0, -1.0, f"研究任务 {task_id[:8]}: {current_step}") last_status = current_status last_current_step = current_step if task["status"] in [TaskStatus.COMPLETED, TaskStatus.FAILED, TaskStatus.CANCELED, TaskStatus.TIMEOUT]: break await asyncio.sleep(2) # 每2秒检查一次...
简单起见,这里采用轮询方式监测任务状态与步骤并推送;如果有必要,你也可以采用更复杂但更节约资源的事件通知方式(借助asnycio.Event)。
采用类似的方式创建其他必要的Tool。最后使用SSE或StreamableHTTP模式启动MCP Server:
04
效果测试与总结
真实应用场景下,MCP的客户端可能是一个ChatBot或智能体应用,通过调用MCP Server中的DeepResearch工具来完成一个研究任务。但这里我们只需要验证这个工具是可用的,因此编写一个简单的交互式命令行工具来完成。
借助MCP Client SDK快速实现这个工具,以调用wait_research工具为例(等待任务完成,并展示进度):
...省略MCP连接过程并获得session...async def handle_wait_task(session: ClientSession) -> None: """处理等待任务完成""" try: task_id = await get_user_input("请输入任务ID: ") result = await session.call_tool( "wait_research", {"task_id": task_id}, progress_callback=handle_progress_message ) print("n任务完成结果:") try: data = result.content pretty_print_task_result(data[0].text) except Exception: print(result.content) except Exception as e: print(f"等待任务出错: {str(e)}")
为了让客户端能够接收MCP Server的进度通知,这里需要用到call_tool的progress_callback参数来设置回调(该功能需要MCP SDK 1.9.0以后版本),在该回调中对Server推送的进度做简单展示。
完成这个客户端工具后,就可以启动并观察最后的测试效果。
1. 首先启动一个DeepResearch任务,MCP Server会立刻返回一个任务ID:
2. 使用这个任务ID,你可以观察并等待该任务的完成过程与步骤。注意,任何中断不会影响到MCP Server中任务的运行:
3. 你也可以调用query_research随时查询某个任务的状态与成果(立即返回):
OK,现在你就拥有了一个带有DeepResearch工具的MCP Server。它以异步的方式运行任务,并带有超时控制与任务进度推送功能!
尽管从实用角度,长时间运行的Agent任务可能更适合A2A而非MCP,但以上也确实演示了在MCP Server中实现的另一种可能性,这也赋予了我们在设计类似系统时更多的选择与更大的灵活性。
本案例还有较多的优化空间。比如:
-
增加同步执行任务的选项,把选择权交给使用者
-
Server端配置的参数化,比如大模型、迭代次数等
-
对异步任务做并发控制,防止过度消耗资源
-
远程多用户环境的隔离(目前是基于taskid的访问控制)
本文源码:
https://github.com/pingcy/mcp-deepresearch
END
福利时间
为了帮助LLM开发人员更系统性与更深入的学习RAG应用,特别是企业级的RAG应用场景下,当前主流的优化方法与技术实现,我们编写了《基于大模型的RAG应用开发与优化 — 构建企业级LLM应用》这本长达500页的开发与优化指南,与大家一起来深入到LLM应用开发的全新世界。
更多细节,点击链接了解
交流请识别以下名片