原创 曾经的毛毛 2025-06-16 18:34 江苏

如何构建一个异步的MCP DeepResearch Server?

?k=3f5e427d&u=https%3A%2F%2Fmmbiz.qpic

点击上方

蓝字

关注我们

?k=c4576a99&u=https%3A%2F%2Fmmbiz.qpic

DeepResearch(深度研究)是一种颇为实用的Agent形式,如LangChain/Google,国内的字节/天工都有不错的开源项目。本文将尝试借助开源项目,构建一个可在MCP Server中异步运行的DeepResearch,并且支持任务进度推送与超时控制(源代码见文末)

  • 项目概述

  • 构建DeepResearch Agent

  • 构建异步MCP DeepResearch Server

  • 效果测试与总结

01

项目概述

之前我们介绍过利用A2A协议构建“服务化”的Agent系统。那么作为另一种解决互操作问题的规范 – MCP,是否也可以用来实现“工具化”的Agent?比如将一个DeepResearch Agent运行在MCP Server之中。即:

把DeepResearch Agent作为一个Tool运行在MCP Server,实现共享与互操作。

这在技术上是完全可行的。不过相对更轻量级的普通Tool,一个由Agent支持的Tool有两个明显的挑战:

  • 很多Agent运行时间较长,更适合异步任务,实现更复杂。

  • Agent通常利用流式输出来提供中间状态感知、优化客户体验。

这两个挑战在DeepResearch上有明显体现:时间长、步骤多、速度慢

我们使用如下的架构来构建一个基于MCP的异步DeepResearch系统:

?k=3cfd39ac&u=https%3A%2F%2Fmmbiz.qpic

整个架构由三个部分组成:

DeepResearch Agent:LangGraph开发的智能体。为了简化,我们采用对Google的一个开源项目( gemini-fullstack-langgraph-quickstart)做“魔改”。

MCP Server:借助MCP SDK开发,提供并开放以下主要Tool:

  • start_research:启动深度研究任务,该工具会返回任务ID(task_id)

  • cancel_research:取消某个研究任务,输入任务ID

  • wait_research:等待并接收某个任务的“流”输出,用来观察任务过程,该工具会持续到任务结束,中途中断不影响任务进行

  • query_research:查询某个任务的当前状态与结果,该工具会立刻返回

Client应用:调用MCP Server完成深度研究任务的客户端,这里用来测试。

该架构下,一个典型的DeepResearch过程的MCP客户端与服务端的交互如下:

?k=1a2a5179&u=https%3A%2F%2Fmmbiz.qpic

下面依次构建这几个部分。

02

构建DeepResearch Agent

Google的这个开源项目是一个完整的端到端的简单版的DeepResearch应用,包含前端与后端。这里我们“借用”它的后端应用,这是一个LangGraph框架的Agent,其工作流设计如下:

?k=cdf58403&u=https%3A%2F%2Fmmbiz.qpic

对其做如下主要修改:

  • 将Gemini模型修改成了使用本地Ollama的qwen3模型

  • 将使用的Google搜索工具改成使用tavily API搜索

  • 对涉及到的提示词(查询生成、结果反思以及结果生成)适当调整

在进行下一步之间,可以对这个DeepResearch Agent做独立测试,确保其能够正常调用与输出。注意使用流式调用这个Agent,以方便跟踪与显示中间过程:

.....    # 流式执行Graph    async for chunk in graph.astream(initial_state, config=config):        step_count += 1                #stream_mode=updates时,会在每个节点后输出节点名称与节点返回信息        for node_name, node_output in chunk.items():            print(f"📍 步骤 {step_count}{node_name}")                        if node_name == "generate_query":                if "query_list" in node_output:                    print(f" 🧠 智能查询生成:")......

通过这种方式可以持续输出Agent执行的步骤与输出信息。比如:

?k=15cab1f9&u=https%3A%2F%2Fmmbiz.qpic

03

构建异步的MCP DeepResearch Server

准备好Agent后,现在我们将把其“包装”成一个可以共享使用的MCP Tool。由于我们期望该任务能够异步运行,并可以随时查询任务状态与结果,因此这里构建一个MCP Server中的任务管理器(TaskManager)。其职责是:

对服务端异步研究任务进行管理,包括任务创建、启动、取消与状态更新。

任务变化的状态图与相关接口的关系如下:

?k=d8e529e3&u=https%3A%2F%2Fmmbiz.qpic

【TaskManager】

任务管理器的基本定义如下:

class ResearchTaskManager:    def__init__(self):        self.tasks: Dict[str, Dict[str, Any]] = {}        self.task_futures: Dict[str, asyncio.Task] = {}    def create_task(self, research_query: str) -> str:        """创建新的研究任务并返回任务ID"""...    async def start_task(self, task_id: str) -> None:        """启动研究任务,在这里调用上面的DeepResearch Agent"""...    def get_task(self, task_id: str) -> Optional[Dict[str, Any]]:        """获取任务信息"""...    def update_task(self, task_id: str, updates: Dict[str, Any]) -> None:        """更新任务信息"""...        async def cancel_task(self, task_id: str) -> str:        """取消指定的研究任务...

简单说明其核心的实现要点(详细请参考源代码):

1. 任务生命周期管理

  • 使用UUID生成唯一task_id,确保每个任务独立标识

  • 通过状态机模式控制任务从创建到完成的完整流程

2. 异步任务执行

  • 采用asyncio.create_task()创建独立的异步任务

  • 使用task_futures字典管理所有运行中的异步任务

  • 支持任务并发执行,不阻塞主线程和其他任务

  • 在异步任务中调用DeepResearch Agent运行研究任务

3. 进度更新与推送机制

  • 通过流式调用LangGraph,获取步骤与输出信息

  • 任务信息中通过current_step字段保存步骤信息

  • 后续用于推送MCP客户端实时进度信息

4. 任务取消和超时控制

  • 任务取消:检查状态 → 更新标志 → 取消Future

  • 设置超时机制,防止任务无限执行,借助asyncio.wait_for:

    # 添加10分钟超时控制

    returnawait asyncio.wait_for(

       self._execute_task(task_id),

       timeout=600

    )

    【Tools】

    MCP Tool使用FastMCP实现,有了TaskManager,几个MCP Tool的实现就相对简单,可以在MCP Server启动后创建TaskManager的实例,然后在Tool中调用它。这里重点说明start_research与wait_research两个Tool的实现。

    start_research(启动研究任务)

    使用客户端输入创建任务,然后启动,并把task_id返回客户端即可。

    @app.tool()async def start_research(ctx: Context, research_query: str) -> str:    global task_manager    task_id = task_manager.create_task(research_query)        # 启动任务    await task_manager.start_task(task_id)        message = f"研究任务已启动!n研究内容: {research_query}n任务ID: {task_id}nn该任务将使用LangGraph Agent进行深度研究"        return message

    wait_research(等待研究任务)

    该工具给客户端等待与查询当前任务的状态与步骤,实现类似于“流式”的效果(这里的“流式”主要用于推送工作流步骤与中间输出,而非最终内容)。我们借助于MCP协议中定义的服务端progress(进度)功能来实现:调用 ctx.report_progress向客户端不断发送进度。

    @app.tool()async def wait_research(ctx: Context, task_id: str) -> str:        global task_manager    task = task_manager.get_task(task_id)    ifnot task:        raise ValueError(f"找不到任务: {task_id}")    # 记录上一次的状态和步骤,用于比较变化    last_status = None    last_current_step = None        try:        whileTrue:            task = task_manager.get_task(task_id)                        current_status = task["status"]            current_step = task.get("current_step""")                        # 只有在状态或当前步骤发生变化时才报告进度            if (current_status != last_status or current_step != last_current_step) and current_step:                await ctx.report_progress(-1.0-1.0f"研究任务 {task_id[:8]}{current_step}")                last_status = current_status                last_current_step = current_step                        if task["status"in [TaskStatus.COMPLETED, TaskStatus.FAILED, TaskStatus.CANCELED, TaskStatus.TIMEOUT]:                break                            await asyncio.sleep(2# 每2秒检查一次...

    简单起见,这里采用轮询方式监测任务状态与步骤并推送;如果有必要,你也可以采用更复杂但更节约资源的事件通知方式(借助asnycio.Event)。

    采用类似的方式创建其他必要的Tool。最后使用SSE或StreamableHTTP模式启动MCP Server:

    ?k=b13469c4&u=https%3A%2F%2Fmmbiz.qpic

    04

    效果测试与总结

    真实应用场景下,MCP的客户端可能是一个ChatBot或智能体应用,通过调用MCP Server中的DeepResearch工具来完成一个研究任务。但这里我们只需要验证这个工具是可用的,因此编写一个简单的交互式命令行工具来完成。

    借助MCP Client SDK快速实现这个工具,以调用wait_research工具为例(等待任务完成,并展示进度):

    ...省略MCP连接过程并获得session...async def handle_wait_task(session: ClientSession) -> None:    """处理等待任务完成"""    try        task_id = await get_user_input("请输入任务ID: "        result = await session.call_tool(            "wait_research",            {"task_id": task_id},            progress_callback=handle_progress_message        )        print("n任务完成结果:")        try:            data = result.content            pretty_print_task_result(data[0].text)        except Exception:            print(result.content)    except Exception as e:        print(f"等待任务出错: {str(e)}")

    为了让客户端能够接收MCP Server的进度通知,这里需要用到call_tool的progress_callback参数来设置回调(该功能需要MCP SDK 1.9.0以后版本),在该回调中对Server推送的进度做简单展示。

    完成这个客户端工具后,就可以启动并观察最后的测试效果。

    1. 首先启动一个DeepResearch任务,MCP Server会立刻返回一个任务ID:

    ?k=d7cfac65&u=https%3A%2F%2Fmmbiz.qpic

    2. 使用这个任务ID,你可以观察并等待该任务的完成过程与步骤。注意,任何中断不会影响到MCP Server中任务的运行:

    ?k=16029975&u=https%3A%2F%2Fmmbiz.qpic

    3. 你也可以调用query_research随时查询某个任务的状态与成果(立即返回):

    ?k=4397cecc&u=https%3A%2F%2Fmmbiz.qpic

    OK,现在你就拥有了一个带有DeepResearch工具的MCP Server。它以异步的方式运行任务,并带有超时控制与任务进度推送功能!

    ?k=e525571c&u=https%3A%2F%2Fmmbiz.qpic

    尽管从实用角度,长时间运行的Agent任务可能更适合A2A而非MCP,但以上也确实演示了在MCP Server中实现的另一种可能性,这也赋予了我们在设计类似系统时更多的选择与更大的灵活性。

    本案例还有较多的优化空间。比如:

    • 增加同步执行任务的选项,把选择权交给使用者

    • Server端配置的参数化,比如大模型、迭代次数等

    • 对异步任务做并发控制,防止过度消耗资源

    • 远程多用户环境的隔离(目前是基于taskid的访问控制)

    本文源码:

    https://github.com/pingcy/mcp-deepresearch

    图片

    END

    福利时间

    为了帮助LLM开发人员更系统性与更深入的学习RAG应用,特别是企业级的RAG应用场景下,当前主流的优化方法与技术实现,我们编写了《基于大模型的RAG应用开发与优化 — 构建企业级LLM应用》这本长达500页的开发与优化指南,与大家一起来深入到LLM应用开发的全新世界。

    更多细节,点击链接了解图片?k=09742918&u=?k=09742918&u=

    交流请识别以下名片

    图片

    ?k=6effd26d&u=https%3A%2F%2Fmmbiz.qpic

    ?k=55545820&u=https%3A%2F%2Fmmbiz.qpic

    ?k=037b217b&u=https%3A%2F%2Fmmbiz.qpic

    ?k=52cbd92e&u=https%3A%2F%2Fmmbiz.qpic

    ?k=c915a310&u=https%3A%2F%2Fmmbiz.qpic

    ?k=15cab1f9&u=https%3A%2F%2Fmmbiz.qpic

    ?k=190e1a02&u=https%3A%2F%2Fmmbiz.qpic

    ?k=c47bca8a&u=https%3A%2F%2Fmmbiz.qpic

    ?k=a6b07641&u=https%3A%2F%2Fmmbiz.qpic

    ?k=002ddffa&u=https%3A%2F%2Fmmbiz.qpic

    ?k=1375a72a&u=https%3A%2F%2Fmmbiz.qpic

    ?k=e525571c&u=https%3A%2F%2Fmmbiz.qpic

    ?k=e3607751&u=https%3A%2F%2Fmmbiz.qpic

    ?k=3803820e&u=https%3A%2F%2Fmmbiz.qpic

    ?k=b81c9053&u=https%3A%2F%2Fmmbiz.qpic

    阅读原文

    跳转微信打开