原创 秋山墨客 2025-06-10 08:30 江苏

服务化的Agent?带你完整体验A2A SDK的力量。

?k=3147c71c&u=https%3A%2F%2Fmmbiz.qpic

点击上方蓝字加入我们

?k=154cec1f&u=https%3A%2F%2Fmmbiz.qpic
?k=690b1834&u=https%3A%2F%2Fmmbiz.qpic
Google A2A协议提出了一种应用与Agent之间、或异构的Agent之间实现标准化协作、而无需关注对方内部实现的方法。相对MCP Server中更“轻”的Tool服务,A2A旨在将更“重”的Agent进行“服务化”,以方便发现与调用。
?k=3c69392f&u=https%3A%2F%2Fmmbiz.qpic
本文基于A2A最新的Python-SDK,介绍如何将一个LangGraph的Agent通过A2A协议对外暴露并服务,以初步体验A2A的集成方法(代码地址见文末)。
  • 核心架构、概念与交互流程
  • 使用SDK实现A2A Server端
  • 使用SDK实现A2A Client端
  • 测试与总结

A2A的基本思想可以翻阅:4个问题 + 1个Demo:彻底搞懂谷歌的新玩意Agent2Agent(A2A)

01

核心架构、概念与交互流程

我们借助A2A SDK构建一个“服务化”的Agent系统Demo,其大致架构如下:
?k=f4117afc&u=https%3A%2F%2Fmmbiz.qpic
【基本构成】
  • Agent:智能体实现,可以借助任何框架实现,比如使用LangGraph框架实现
  • AgentExecutor:负责接收任务请求,并通过调用Agent来生成响应的执行器。需要从AgentExecutor抽象类派生,并实现其execute与cancel两个标准接口
  • A2A Server:uvicorn服务器,提供标准化的HTTP服务。包括AgentCard的暴露以及将HTTP请求映射到对应的Web应用,进而交给AgentExecute执行。
  • A2A Client:调用服务端Agent能力的“客户端”应用。需要注意的是,在A2A架构中,客户端是相对的,它也可能是另外一个Agent系统。

【核心概念】
要使用A2A SDK,首先需要理解客户端/服务端交互中的四种类型的消息主体
  • Message(消息):代表Agent与客户端之间的一次沟通消息,包含角色、内容等(类似很多LLM API中的Message)。角色可以是“user”或者“agent”。

  • Task(任务):代表Agent为客户端完成的一次工作单元。其封装了整个任务过程中的多次交互消息(Message)、多个任务产出(Artifacts)等。

  • TaskStatusUpdateEvent(任务状态更新事件):代表Agent在任务过程中的状态变化。用在流(SSE)模式下多次输出,让客户端了解当前任务进展。状态包括:


?k=a6a96c40&u=https%3A%2F%2Fmmbiz.qpic
    注意,什么时候输出这些状态变更的Event是由服务端自行控制。


  • TaskArtifactUpdateEvent(任务工件更新):代表Agent任务执行过程中交付的多个产出。一个Artifact在内容上与Message类似,都是由多个Part(文本、文件或JSON数据)组成。

在交互过程中,客户端通常只会给服务端发送Message;但是服务端的响应则可能是上面四种不同类型。
我们用简单的例子来加强认识:
  • 一个简单的客户端消息(比如“你好”),服务端可能无需创建任务,只需要快速响应一个Message即可。

  • 客户端发送任务请求,比如“完成xxxx主题的研究,并输出文件”。此时:

  • 服务端需要首先生成一个任务让Agent完成。此时可以在调用Agent之前推送一个Task给客户端,状态为”submitted“,通知“我已经提交了一个任务”

  • 任务执行的过程中(流式),Agent持续输出中间状态(比如“正在生成提纲”,“正在搜索”);服务端可以推送TaskStatusUpdateEvent给客户端

  • 任务执行完成,Agent生成了任务的最终产出(比如一个报告);此时服务端可以推送TaskArtifactUpdateEvent给客户端,通知其任务结果

  • 最后,服务端可以再次推送状态为“completed”的TaskStatusUpdateEvent给客户端,表示任务已经结束,将关闭SSE连接。

  • 任何时候(比如客户端异常中断)也可以直接调用task/get方法(输入taskId)查询当前任务详情;此时服务端只会返回完整的Task信息


  • 任务用唯一的taskId来标识;多个task可以在一个用contextId标识的上下文中。


【交互流程】
A2A客户端与服务端之间的任务交互过程,有两种主要模式:
非流模式通过HTTP Post同步获得请求的响应结果。交互流程如下:
?k=760b7a5e&u=https%3A%2F%2Fmmbiz.qpic
  • 初始化:通过HTTP GET请求获取AgentCard,了解Agent的基本信息,创建客户端实例。

  • 同步交互:客户端发送请求后等待服务端的完整响应。同步请求可能获得的响应为Task(包含了完整的任务处理过程和结果)或者Message类型。

  • 中断处理:A2A的异步特性决定即使发生中断,仍然可以获取任务结果。通过发起get_task调用,可以轮询任务结果。

流模式通过接收HTTP请求后的多个SSE事件来获取响应。交互流程如下:
?k=5b22758f&u=https%3A%2F%2Fmmbiz.qpic
  • 初始化:与非流式模式下基本一致,但该模式下建议检查Agent是否声明支持streaming。

  • 交互过程:客户端通过send_streaming_message方法发起请求;然后通过异步迭代器接收多个SSE事件,直到服务端发送结束。流模式获得的响应内容可能是Task(任务)、Message(消息)、TaskStatusUpdateEvent(任务状态更新)、TaskArtifactUpdateEvent(任务工件更新)这四种类型。

  • 中断处理:如果在交互过程中发生中断,客户端可以通过resubscribe调用重新连接到SSE流,以继续任务并接收更新。

02

实现A2A Server端

接下来使用A2A最新的SDK来构建一个“服务化”的Agent(支持Streaming)。根据上面架构图,服务端需要实现三个部分:Agent、AgentExecutor、A2A Server
【Agent】
Agent是任务执行的主体,理论上可以用你任何喜欢的开发框架来实现,与普通的Agent并无特别之处。你应该在交给A2A Server前做完整测试。
这里用LangGraph实现一个简单的ReAct Agent,它有两个工具search(搜索)和mail(发邮件),其中search使用了MCP Server。这里展示其核心的调用接口实现:
class SearchAgent:

    SYSTEM_INSTRUCTION = '''
你是一个智能助手,会智能的使用工具来完成输入任务。
如果调用工具过程中需要用户提供参数信息,你必须严格按照以下格式返回JSON字符串,不要有任何解释:
{"status": "input-required", "message": "{需要用户提供的信息}"}
'''

    ......
    async def stream(self, query: str, context_id) -> AsyncIterable[Dict[str, Any]]:
            
        agent = await self._get_agent()   #调用create_react_agent创建的graph
        config = {'configurable': {'thread_id': context_id}}

        try:
            async for chunk in  agent.astream({"messages": [("user", query)]}, config=config, stream_mode='values'):
                message = chunk["messages"][-1]
                if (isinstance(message, AIMessage) and hasattr(message, "tool_calls"and message.tool_calls):
                    yield {"status":"working","content":f"正在调用工具【{message.tool_calls[0]["name"]}】..."}
                elif isinstance(message, ToolMessage):
                    yield {"status":"working","content":"正在处理工具调用结果..."}           
                elif isinstance(message, AIMessage):                    
                    if isinstance(message.content, str) and"input-required"in message.content:
                        try
                            input_info = json.loads(message.content)
                            yield {"status""input-required""content": input_info.get("message""需要更多信息")}
                        except Exception as e:
                            yield {"status":"failed","content":str(e)}                          
                    else:
                        yield {"status""completed""content": message.content}
        except Exception as e:
            yield {"status":"failed","content":str(e)}
大致过程如下:
  • 通过astream方法流式调用Agent实例。注意这里使用了stream_mode=values这种模式下Agent会在每个步骤(节点)后返回出最新的State;这样方便获得Agent执行过程的中间状态;

  • 对每个步骤返回的State进行处理。根据其最后一条消息判断Agent处理的状态,比如“准备调用工具”,“处理工具调用结果”这些中间状态并输出(yield);

  • 对Agent最终输出结果的处理。如果State表明这是最终输出而非中间状态,则又分成了两种情况:成功或者需要补充更多信息;并将这些结果输出(yield);

  • 针对A2A的特殊“优化”:将Agent的输出做结构化,增加一个“status”属性,是为了方便后面的AgentExecutor做处理;但要注意这种处理并不是必须的。

先用一段本地代码测试这个Agent:
?k=cbfa6483&u=https%3A%2F%2Fmmbiz.qpic
可以看到这个Agent支持连续对话以补充信息;并支持流式调用以持续输出中间状态与结果。
【AgentExecutor】

Agent准备完成,现在借助A2A SDK来实现AgentExecutor。它是Web Server与后端Agent之间的“桥梁”,需要实现的核心接口是execute()。其规格如下:

@abstractmethod
async def execute(self, context: RequestContext, 
                        event_queue: EventQueue)
:

SDK会输入两个参数:

context:请求的上下文信息。其中包含了客户端的本次请求消息(Message)、任务ID(taskId)、上下文ID(contextId),以及可能的任务(Task)信息等。

?k=0d9a11ed&u=https%3A%2F%2Fmmbiz.qpic

什么时候context中会有task信息呢?


在接收到客户端新的任务请求时,context中没有task信息,此时服务端需要自行创建新的task;但在同一task的后续多次交互时(比如补充信息),新的客户端消息需携带相同的taskId,此时这里的context中就会有task信息。

event_queue:用来发布服务端响应的队列。在流模式下,可以通过该队列推送上面介绍的四种类型的SSE事件给客户端;在非流模式下,也可通过它返回结果。

实现代码如下:

async def execute(
        self,
        context: RequestContext,
        event_queue: EventQueue,
    )
 -> None:

        
        query = context.get_user_input()
        task = context.current_task

        #如果没有task,就创建它;并通知客户端
        if not task:
            task = new_task(context.message)
            event_queue.enqueue_event(task)
        
        #用来简化发布事件的更新器
        updater = TaskUpdater(event_queue, task.id, task.contextId)
        
        try:
            response = self.agent.stream(query, task.contextId)

            #对后端agent的迭代输出做处理
            asyncfor item in response:
                
                status = item["status"]
                content = item["content"]

                #“任务状态更新”:working、input-required等
                if status != "completed":
                    updater.update_status(
                        status,
                        new_agent_text_message(
                            content,
                            task.contextId,
                            task.id,
                        ),
                    )

                else
                    #“任务工件更新”:Agent处理成果 
                    updater.add_artifact(
[Part(root=TextPart(text=content))],
                        name="search_result",
                    )

                    #“任务状态更新”:completed,SSE关闭
                    updater.complete(
                        new_agent_text_message(
                            '任务完成',
                            task.contextId,
                            task.id,)
                    )
                    break

        except Exception as e:
......

借助代码注释可以理解这里的逻辑。简单说明几点:

  • A2A是一个面向“有状态”任务的协议,因此不能简单的每次从上下文中取出消息->创建任务->调用Agent。你需要判断是否已经存在任务信息:

if not task:    task = new_task(context.message)    event_queue.enqueue_event(task)

如果存在任务,说明当前已经处于一个任务过程中,则无需创建。

  • 用taskId或contextId来隔离Agent调用的上下文(对应着LangGraph Agent调用时thread_id)。这里使用了contextId来调用:

response = self.agent.stream(query, task.contextId)

由于一个context可能对应着多个task,因此这表明让Agent在多个任务间共享上下文与状态(State);你也可以使用task.id来调用,用来让Agent对每个任务设置不同的上下文与状态。

这里根据Agent响应中的status属性来决定向客户端推送的事件(参考前面Agent的实现)。借助updater这个帮助对象来简化操作,其update_status/add_artifact/complete本质上都是向event_queue放入响应事件。

【A2A Server】

最后来准备一个A2A Server。由于其底层基于uvicorn服务器,因此基本逻辑就是发布AgentCard,并把HTTP请求路由到AgentExecutor处理:

....#省略agent_card的准备过程......

        httpx_client = httpx.AsyncClient()
        request_handler = DefaultRequestHandler(
            agent_executor=SearchAgentExecutor(model_name=model),
            ......
        )
        server = A2AStarletteApplication(
            agent_card=agent_card, http_handler=request_handler
        )

        logger.info(f"服务地址: http://{host}:{port}/")
        logger.info(f"Agent Card: http://{host}:{port}/.well-known/agent.json")
        uvicorn.run(server.build(), host=host, port=port)

现在可以启动A2A Server:

?k=5ad9f561&u=https%3A%2F%2Fmmbiz.qpic

03

实现A2A Client端

创建一个A2A客户端来测试A2A Server中的Agent,客户端处理流程如下(流模式):
?k=a426d826&u=https%3A%2F%2Fmmbiz.qpic

客户端的处理流程并无特别之处,唯一需要注意的是上下文的管理,还是依赖于对taskId与contextId的应用。为了更好的控制,我选择了在客户端生成taskId与contextId(否则由服务端在创建任务时自动生成):

  • 一次任务中可能的多次交互消息,注意携带相同的taskId与contextId

  • 为不同的任务生成不同的taskId,但contextId可以根据实际情况而确定


具体请参考Demo的源代码进行理解。

04

测试与总结

现在,可以通过客户端来测试A2A Server中的Agent。仍然以上面本地测试Agent时的输入任务为例,但现在是基于A2A协议的通信模式:

?k=6af595df&u=https%3A%2F%2Fmmbiz.qpic

由于采用了流模式,根据上面的设计,客户端将会分多次收到服务端推送(参考AgentExecutor的实现)的不同事件:

首先会收到一个Task类型的Event,表明这个任务已经创建(submitted):

?k=26edebdf&u=https%3A%2F%2Fmmbiz.qpic

接着Agent运行,但是这个任务需要补充信息,此时会发送一个状态为input-required的TaskSatusUpdateEvent事件:

?k=501d275f&u=https%3A%2F%2Fmmbiz.qpic

现在我们提供补充信息,然后继续。注意此时的任务ID保持不变:

?k=8fda6720&u=https%3A%2F%2Fmmbiz.qpic

接着会看到Agent处理的多个中间状态,这是TaskSatusUpdateEvent类型的事件:

?k=a98a1ce4&u=https%3A%2F%2Fmmbiz.qpic

然后,当agent调用完成后,接收到一个TaskArtifactUpdateEvent类型的事件:

?k=eab71b51&u=https%3A%2F%2Fmmbiz.qpic

最后,在服务端关闭之前,还会发送一个Task类型,状态为completed的事件,表明这是最终事件(会触发SSE关闭):

?k=7c0a50e2&u=https%3A%2F%2Fmmbiz.qpic

除了正常的流程外,我们的demo还演示了当同步调用发生异常被中断时,如何通过task_id来轮询任务结果;以及如何设置通知机制来接收服务端的事件。这对于运行长时间的任务具有重大意义,方便接收到任务的异步状态更新与结果。

?k=e71e7478&u=https%3A%2F%2Fmmbiz.qpic

最后简单总结:A2A给Agent带来了一种基于标准的互操作能力,把Agent用一种标准的方法开放。即使不用于与其他Agent相互通信,也可以把它作为一种Agent“服务化”的解决方案,帮你简化常规方法(比如FastAPI)可能面临的棘手问题。比如:

  • 灵活性。特别是能够同时支持同步响应、流式推送与异步通知等多种交互。

  • 异步性。更友好的支持人机交互时间较长的任务,这对于Agent场景很常见。即使发生中断,也可以通过任务轮询、重新连接等方式继续获取任务结果。

  • 多轮交互。基于有状态的任务单元设计,可以实现任务过程中的多轮交互。


A2A-SDK仍然在不断完善中,后续我们将持续关注并分享使用经验!

本文源代码:

https://github.com/pingcy/a2a-demo

图片

END

福利时间


为了帮助LLM开发人员更系统性与更深入的学习RAG应用,特别是企业级的RAG应用场景下,当前主流的优化方法与技术实现,我们编写了《基于大模型的RAG应用开发与优化 — 构建企业级LLM应用》这本长达500页的开发与优化指南,与大家一起来深入到LLM应用开发的全新世界。

更多细节,点击链接了解图片?k=09742918&u=


交流请识别以下名片

图片


640?from=appmsg

640?from=appmsg

640?from=appmsg

640

640?from=appmsg

640?from=appmsg

640

640?from=appmsg

阅读原文

跳转微信打开