深入体验 A2A SDK:一步步教你构建“服务化”的 Agent 系统
原创 秋山墨客 2025-06-10 08:30 江苏
服务化的Agent?带你完整体验A2A SDK的力量。
点击上方蓝字加入我们
-
核心架构、概念与交互流程 -
使用SDK实现A2A Server端 -
使用SDK实现A2A Client端 -
测试与总结
01
核心架构、概念与交互流程
-
Agent:智能体实现,可以借助任何框架实现,比如使用LangGraph框架实现 -
AgentExecutor:负责接收任务请求,并通过调用Agent来生成响应的执行器。需要从AgentExecutor抽象类派生,并实现其execute与cancel两个标准接口 -
A2A Server:uvicorn服务器,提供标准化的HTTP服务。包括AgentCard的暴露以及将HTTP请求映射到对应的Web应用,进而交给AgentExecute执行。 -
A2A Client:调用服务端Agent能力的“客户端”应用。需要注意的是,在A2A架构中,客户端是相对的,它也可能是另外一个Agent系统。
-
Message(消息):代表Agent与客户端之间的一次沟通消息,包含角色、内容等(类似很多LLM API中的Message)。角色可以是“user”或者“agent”。
-
Task(任务):代表Agent为客户端完成的一次工作单元。其封装了整个任务过程中的多次交互消息(Message)、多个任务产出(Artifacts)等。
-
TaskStatusUpdateEvent(任务状态更新事件):代表Agent在任务过程中的状态变化。用在流(SSE)模式下多次输出,让客户端了解当前任务进展。状态包括:
-
TaskArtifactUpdateEvent(任务工件更新):代表Agent任务执行过程中交付的多个产出。一个Artifact在内容上与Message类似,都是由多个Part(文本、文件或JSON数据)组成。
-
一个简单的客户端消息(比如“你好”),服务端可能无需创建任务,只需要快速响应一个Message即可。 -
客户端发送任务请求,比如“完成xxxx主题的研究,并输出文件”。此时:
-
服务端需要首先生成一个任务让Agent完成。此时可以在调用Agent之前推送一个Task给客户端,状态为”submitted“,通知“我已经提交了一个任务”
-
任务执行的过程中(流式),Agent持续输出中间状态(比如“正在生成提纲”,“正在搜索”);服务端可以推送TaskStatusUpdateEvent给客户端
-
任务执行完成,Agent生成了任务的最终产出(比如一个报告);此时服务端可以推送TaskArtifactUpdateEvent给客户端,通知其任务结果
-
最后,服务端可以再次推送状态为“completed”的TaskStatusUpdateEvent给客户端,表示任务已经结束,将关闭SSE连接。
-
任何时候(比如客户端异常中断)也可以直接调用task/get方法(输入taskId)查询当前任务详情;此时服务端只会返回完整的Task信息。
-
任务用唯一的taskId来标识;多个task可以在一个用contextId标识的上下文中。
-
初始化:通过HTTP GET请求获取AgentCard,了解Agent的基本信息,创建客户端实例。 -
同步交互:客户端发送请求后等待服务端的完整响应。同步请求可能获得的响应为Task(包含了完整的任务处理过程和结果)或者Message类型。 -
中断处理:A2A的异步特性决定即使发生中断,仍然可以获取任务结果。通过发起get_task调用,可以轮询任务结果。
-
初始化:与非流式模式下基本一致,但该模式下建议检查Agent是否声明支持streaming。 -
交互过程:客户端通过send_streaming_message方法发起请求;然后通过异步迭代器接收多个SSE事件,直到服务端发送结束。流模式获得的响应内容可能是Task(任务)、Message(消息)、TaskStatusUpdateEvent(任务状态更新)、TaskArtifactUpdateEvent(任务工件更新)这四种类型。 -
中断处理:如果在交互过程中发生中断,客户端可以通过resubscribe调用重新连接到SSE流,以继续任务并接收更新。
02
实现A2A Server端
class SearchAgent:
SYSTEM_INSTRUCTION = '''
你是一个智能助手,会智能的使用工具来完成输入任务。
如果调用工具过程中需要用户提供参数信息,你必须严格按照以下格式返回JSON字符串,不要有任何解释:
{"status": "input-required", "message": "{需要用户提供的信息}"}
'''
......
async def stream(self, query: str, context_id) -> AsyncIterable[Dict[str, Any]]:
agent = await self._get_agent() #调用create_react_agent创建的graph
config = {'configurable': {'thread_id': context_id}}
try:
async for chunk in agent.astream({"messages": [("user", query)]}, config=config, stream_mode='values'):
message = chunk["messages"][-1]
if (isinstance(message, AIMessage) and hasattr(message, "tool_calls") and message.tool_calls):
yield {"status":"working","content":f"正在调用工具【{message.tool_calls[0]["name"]}】..."}
elif isinstance(message, ToolMessage):
yield {"status":"working","content":"正在处理工具调用结果..."}
elif isinstance(message, AIMessage):
if isinstance(message.content, str) and"input-required"in message.content:
try:
input_info = json.loads(message.content)
yield {"status": "input-required", "content": input_info.get("message", "需要更多信息")}
except Exception as e:
yield {"status":"failed","content":str(e)}
else:
yield {"status": "completed", "content": message.content}
except Exception as e:
yield {"status":"failed","content":str(e)}
-
通过astream方法流式调用Agent实例。注意这里使用了stream_mode=values;这种模式下Agent会在每个步骤(节点)后返回出最新的State;这样方便获得Agent执行过程的中间状态; -
对每个步骤返回的State进行处理。根据其最后一条消息判断Agent处理的状态,比如“准备调用工具”,“处理工具调用结果”这些中间状态并输出(yield); -
对Agent最终输出结果的处理。如果State表明这是最终输出而非中间状态,则又分成了两种情况:成功或者需要补充更多信息;并将这些结果输出(yield); -
针对A2A的特殊“优化”:将Agent的输出做结构化,增加一个“status”属性,是为了方便后面的AgentExecutor做处理;但要注意这种处理并不是必须的。
Agent准备完成,现在借助A2A SDK来实现AgentExecutor。它是Web Server与后端Agent之间的“桥梁”,需要实现的核心接口是execute()。其规格如下:
@abstractmethod
async def execute(self, context: RequestContext,
event_queue: EventQueue):
SDK会输入两个参数:
context:请求的上下文信息。其中包含了客户端的本次请求消息(Message)、任务ID(taskId)、上下文ID(contextId),以及可能的任务(Task)信息等。
什么时候context中会有task信息呢?
在接收到客户端新的任务请求时,context中没有task信息,此时服务端需要自行创建新的task;但在同一task的后续多次交互时(比如补充信息),新的客户端消息需携带相同的taskId,此时这里的context中就会有task信息。
event_queue:用来发布服务端响应的队列。在流模式下,可以通过该队列推送上面介绍的四种类型的SSE事件给客户端;在非流模式下,也可通过它返回结果。
实现代码如下:
async def execute(
self,
context: RequestContext,
event_queue: EventQueue,
) -> None:
query = context.get_user_input()
task = context.current_task
#如果没有task,就创建它;并通知客户端
if not task:
task = new_task(context.message)
event_queue.enqueue_event(task)
#用来简化发布事件的更新器
updater = TaskUpdater(event_queue, task.id, task.contextId)
try:
response = self.agent.stream(query, task.contextId)
#对后端agent的迭代输出做处理
asyncfor item in response:
status = item["status"]
content = item["content"]
#“任务状态更新”:working、input-required等
if status != "completed":
updater.update_status(
status,
new_agent_text_message(
content,
task.contextId,
task.id,
),
)
else:
#“任务工件更新”:Agent处理成果
updater.add_artifact(
[Part(root=TextPart(text=content))],
name="search_result",
)
#“任务状态更新”:completed,SSE关闭
updater.complete(
new_agent_text_message(
'任务完成',
task.contextId,
task.id,)
)
break
except Exception as e:
......
借助代码注释可以理解这里的逻辑。简单说明几点:
-
A2A是一个面向“有状态”任务的协议,因此不能简单的每次从上下文中取出消息->创建任务->调用Agent。你需要判断是否已经存在任务信息:
if not task:task = new_task(context.message)event_queue.enqueue_event(task)
如果存在任务,说明当前已经处于一个任务过程中,则无需创建。
-
用taskId或contextId来隔离Agent调用的上下文(对应着LangGraph Agent调用时thread_id)。这里使用了contextId来调用:
response = self.agent.stream(query, task.contextId)
由于一个context可能对应着多个task,因此这表明让Agent在多个任务间共享上下文与状态(State);你也可以使用task.id来调用,用来让Agent对每个任务设置不同的上下文与状态。
这里根据Agent响应中的status属性来决定向客户端推送的事件(参考前面Agent的实现)。借助updater这个帮助对象来简化操作,其update_status/add_artifact/complete本质上都是向event_queue放入响应事件。
【A2A Server】
最后来准备一个A2A Server。由于其底层基于uvicorn服务器,因此基本逻辑就是发布AgentCard,并把HTTP请求路由到AgentExecutor处理:
....#省略agent_card的准备过程......
httpx_client = httpx.AsyncClient()
request_handler = DefaultRequestHandler(
agent_executor=SearchAgentExecutor(model_name=model),
......
)
server = A2AStarletteApplication(
agent_card=agent_card, http_handler=request_handler
)
logger.info(f"服务地址: http://{host}:{port}/")
logger.info(f"Agent Card: http://{host}:{port}/.well-known/agent.json")
uvicorn.run(server.build(), host=host, port=port)
现在可以启动A2A Server:
03
实现A2A Client端
客户端的处理流程并无特别之处,唯一需要注意的是上下文的管理,还是依赖于对taskId与contextId的应用。为了更好的控制,我选择了在客户端生成taskId与contextId(否则由服务端在创建任务时自动生成):
-
一次任务中可能的多次交互消息,注意携带相同的taskId与contextId
-
为不同的任务生成不同的taskId,但contextId可以根据实际情况而确定
具体请参考Demo的源代码进行理解。
04
测试与总结
现在,可以通过客户端来测试A2A Server中的Agent。仍然以上面本地测试Agent时的输入任务为例,但现在是基于A2A协议的通信模式:
由于采用了流模式,根据上面的设计,客户端将会分多次收到服务端推送(参考AgentExecutor的实现)的不同事件:
首先会收到一个Task类型的Event,表明这个任务已经创建(submitted):
接着Agent运行,但是这个任务需要补充信息,此时会发送一个状态为input-required的TaskSatusUpdateEvent事件:
现在我们提供补充信息,然后继续。注意此时的任务ID保持不变:
接着会看到Agent处理的多个中间状态,这是TaskSatusUpdateEvent类型的事件:
然后,当agent调用完成后,接收到一个TaskArtifactUpdateEvent类型的事件:
最后,在服务端关闭之前,还会发送一个Task类型,状态为completed的事件,表明这是最终事件(会触发SSE关闭):
除了正常的流程外,我们的demo还演示了当同步调用发生异常被中断时,如何通过task_id来轮询任务结果;以及如何设置通知机制来接收服务端的事件。这对于运行长时间的任务具有重大意义,方便接收到任务的异步状态更新与结果。
最后简单总结:A2A给Agent带来了一种基于标准的互操作能力,把Agent用一种标准的方法开放。即使不用于与其他Agent相互通信,也可以把它作为一种Agent“服务化”的解决方案,帮你简化常规方法(比如FastAPI)可能面临的棘手问题。比如:
-
灵活性。特别是能够同时支持同步响应、流式推送与异步通知等多种交互。
-
异步性。更友好的支持人机交互时间较长的任务,这对于Agent场景很常见。即使发生中断,也可以通过任务轮询、重新连接等方式继续获取任务结果。
-
多轮交互。基于有状态的任务单元设计,可以实现任务过程中的多轮交互。
A2A-SDK仍然在不断完善中,后续我们将持续关注并分享使用经验!
本文源代码:
https://github.com/pingcy/a2a-demo
END
福利时间
为了帮助LLM开发人员更系统性与更深入的学习RAG应用,特别是企业级的RAG应用场景下,当前主流的优化方法与技术实现,我们编写了《基于大模型的RAG应用开发与优化 — 构建企业级LLM应用》这本长达500页的开发与优化指南,与大家一起来深入到LLM应用开发的全新世界。
更多细节,点击链接了解
交流请识别以下名片