这使 A2AAgent 应用程序能够连接到通过 代理到代理 (A2A) 协议公开的远程代理。 它将任何符合 A2A 的终结点包装为标准 AIAgent,因此可以使用熟悉的方法(例如 RunAsync ,并与 RunStreamingAsync 远程代理交互),而不考虑它们使用哪种框架或技术。
入门
将所需的 NuGet 包添加到项目:
dotnet add package Microsoft.Agents.AI.A2A --prerelease
代理发现
在与远程 A2A 代理通信之前,需要发现它并创建实例 AIAgent 。 A2A 协议定义了三种 发现策略,每个策略都受代理框架支持。
Well-Known URI
A2A 代理可以在标准化路径上发现其代理卡: https://{domain}/.well-known/agent-card.json 使用 A2ACardResolver 提取卡片并在单个调用中创建代理:
using A2A;
using Microsoft.Agents.AI;
// Initialize a resolver pointing at the remote agent's host.
A2ACardResolver resolver = new(new Uri("https://a2a-agent.example.com"));
// Resolve the agent card and create an AIAgent in one step.
AIAgent agent = await resolver.GetAIAgentAsync();
// Use the agent.
Console.WriteLine(await agent.RunAsync("Hello!"));
Catalog-Based 发现
在企业环境或公共市场中,代理卡通常由中央注册表管理。 如果已 AgentCard 从此类注册表获取,请将其直接转换为 AIAgent:
using A2A;
using Microsoft.Agents.AI;
// Assume agentCard was retrieved from a registry or catalog.
AgentCard agentCard = await GetAgentCardFromRegistryAsync("travel-planner");
AIAgent agent = agentCard.AsAIAgent();
Console.WriteLine(await agent.RunAsync("Plan a trip to Paris."));
直接配置
对于提前知道代理终结点的紧密耦合系统或开发方案,请直接创建一个 A2AClient 并将其转换为 AIAgent:
using A2A;
using Microsoft.Agents.AI;
// Create a client pointing at the known agent endpoint.
A2AClient a2aClient = new(new Uri("https://a2a-agent.example.com"));
AIAgent agent = a2aClient.AsAIAgent(name: "my-agent", description: "A helpful assistant.");
Console.WriteLine(await agent.RunAsync("What can you help me with?"));
协议选择
A2A 代理可以公开多个协议绑定,例如 HTTP+JSON 和 JSON-RPC。 默认情况下,HTTP+JSON 优先于 JSON-RPC。 用于 A2AClientOptions.PreferredBindings 显式控制使用哪种协议绑定:
注释
远程 A2A 代理必须在支持所选协议绑定的终结点上可用。
using A2A;
using Microsoft.Agents.AI;
A2ACardResolver agentCardResolver = new(new Uri("https://a2a-agent.example.com"));
AgentCard agentCard = await agentCardResolver.GetAgentCardAsync();
// Prefer HTTP+JSON protocol binding. For JSON-RPC, set PreferredBindings = [ProtocolBindingNames.JsonRpc]
A2AClientOptions options = new()
{
PreferredBindings = [ProtocolBindingNames.HttpJson]
};
AIAgent agent = agentCard.AsAIAgent(options: options);
Console.WriteLine(await agent.RunAsync("Tell me a joke about a pirate."));
流媒体
A2A 支持通过 Server-Sent 事件进行流式处理响应。 用于 RunStreamingAsync 在远程代理处理请求时实时接收更新:
using A2A;
using Microsoft.Agents.AI;
A2ACardResolver resolver = new(new Uri("https://a2a-agent.example.com"));
AIAgent agent = await resolver.GetAIAgentAsync();
await foreach (var update in agent.RunStreamingAsync("Write a short story about a robot."))
{
if (!string.IsNullOrEmpty(update.Text))
{
Console.Write(update.Text);
}
}
后台响应
A2A 代理支持 后台响应 来处理长时间运行的操作。 当远程 A2A 代理返回任务而不是即时消息时,代理框架提供了一个延续令牌,可用于轮询结果或重新连接到中断的流。
轮询任务完成
对于非流式处理方案,用于 AllowBackgroundResponses 接收延续令牌并轮询,直到任务完成:
using A2A;
using Microsoft.Agents.AI;
A2ACardResolver resolver = new(new Uri("https://a2a-agent.example.com"));
AIAgent agent = await resolver.GetAIAgentAsync();
AgentSession session = await agent.CreateSessionAsync();
// AllowBackgroundResponses must be true so the server returns immediately with a continuation token
// instead of blocking until the task is complete.
AgentRunOptions options = new() { AllowBackgroundResponses = true };
// Start the initial run with a long-running task.
AgentResponse response = await agent.RunAsync(
"Conduct a comprehensive analysis of quantum computing applications in cryptography.",
session,
options: options);
// Poll until the response is complete.
while (response.ContinuationToken is { } token)
{
// Wait before polling again.
await Task.Delay(TimeSpan.FromSeconds(2));
// Continue with the token.
response = await agent.RunAsync(session, options: new AgentRunOptions { ContinuationToken = token });
}
Console.WriteLine(response);
流重新连接
在流式处理方案中,每个更新可能包含延续令牌。 如果流中断,请使用令牌重新连接并从头获取响应流:
using A2A;
using Microsoft.Agents.AI;
using Microsoft.Extensions.AI;
A2ACardResolver resolver = new(new Uri("https://a2a-agent.example.com"));
AIAgent agent = await resolver.GetAIAgentAsync();
AgentSession session = await agent.CreateSessionAsync();
ResponseContinuationToken? continuationToken = null;
await foreach (var update in agent.RunStreamingAsync(
"Conduct a comprehensive analysis of quantum computing applications in cryptography.",
session))
{
// Save the continuation token to reconnect later if the stream is interrupted.
// Continuation tokens are only returned for long-running tasks. If the A2A agent
// returns a message instead of a task, the continuation token will not be initialized.
if (update.ContinuationToken is { } token)
{
continuationToken = token;
}
}
// If the stream was interrupted and a continuation token was captured,
// reconnect to the response stream using the saved continuation token.
if (continuationToken is not null)
{
await foreach (var update in agent.RunStreamingAsync(
session,
options: new() { ContinuationToken = continuationToken }))
{
if (!string.IsNullOrEmpty(update.Text))
{
Console.WriteLine(update.Text);
}
}
}
注释
A2A 代理支持流重新连接(从头获取相同的响应流),而不是从流中的特定点进行流恢复。
工具
A2AAgent 是远程 A2A 代理周围的传输级包装器。 远程代理使用的任何工具都位于远程端,并且对代码不可见。 代理框架工具类型(函数工具、代码解释器、文件搜索、托管/本地 MCP 等)本身未配置 A2AAgent - 若要扩展远程代理的功能,请更改远程代理的配置。
入门
安装 A2A 包:
pip install agent-framework-a2a --pre
初始 化
A2AAgent 可以通过三种方式初始化,具体取决于你事先对远程代理的了解程度。
直接 URL
对于已知终结点的开发或紧密耦合系统:
from agent_framework.a2a import A2AAgent
async with A2AAgent(name="remote", url="https://a2a-agent.example.com") as agent:
response = await agent.run("Hello!")
print(response.messages[0].text)
仅提供 URL 时,在 A2AAgent 内部创建最小代理卡并使用 JSON-RPC 进行连接。
代理卡
如果有 AgentCard 注册表或目录,请直接传递它:
from agent_framework.a2a import A2AAgent
async with A2AAgent(agent_card=agent_card) as agent:
response = await agent.run("Plan a trip to Paris.")
print(response.messages[0].text)
提供 AgentCardA2AAgent默认值name和description卡片时。 它使用卡的 supported_interfaces协商传输。
Well-Known URI (A2ACardResolver)
在A2ACardResolvera2a-sdk标准已知路径/.well-known/agent.json():
import httpx
from a2a.client import A2ACardResolver
from agent_framework.a2a import A2AAgent
async with httpx.AsyncClient(timeout=60.0) as http_client:
resolver = A2ACardResolver(httpx_client=http_client, base_url="https://a2a-agent.example.com")
agent_card = await resolver.get_agent_card()
async with A2AAgent(agent_card=agent_card) as agent:
response = await agent.run("What can you help me with?")
print(response.messages[0].text)
流媒体
用于 stream=True 在远程代理处理请求时实时接收更新:
from agent_framework.a2a import A2AAgent
async with A2AAgent(name="remote", url="https://a2a-agent.example.com") as agent:
async with agent.run("Write a short story about a robot.", stream=True) as stream:
async for update in stream:
for content in update.contents:
if content.text:
print(content.text, end="", flush=True)
final = await stream.get_final_response()
print(f"\n({len(final.messages)} message(s))")
长时间运行任务
默认情况下, A2AAgent 等待远程代理在返回之前完成。 对于长时间运行的任务,设置为 background=True 显示一个延续令牌,可用于稍后轮询或订阅:
from agent_framework.a2a import A2AAgent
async with A2AAgent(name="worker", url="https://a2a-agent.example.com") as agent:
# Start a long-running task
response = await agent.run("Process this large dataset", background=True)
if response.continuation_token:
# Poll for completion later
result = await agent.poll_task(response.continuation_token)
print(result)
还可以重新订阅 SSE 流,而不是轮询:
# Resubscribe to the task's event stream
response = await agent.run(continuation_token=response.continuation_token)
对话标识(context_id)
当你使用A2AAgent.run()调用AgentSession时,如果传出消息尚未携带 A2Acontext_id,代理会从session.service_session_id中自动派生出一个 A2A。 这样,便可以跨多个 A2A 调用保持聊天连续性:
from agent_framework import AgentSession
from agent_framework.a2a import A2AAgent
async with A2AAgent(name="remote", url="https://a2a-agent.example.com") as agent:
session = AgentSession(service_session_id="my-conversation-1")
# context_id is automatically set to "my-conversation-1"
response = await agent.run("Hello!", session=session)
# Subsequent calls with the same session continue the conversation
response = await agent.run("Follow-up question", session=session)
如果消息在其context_id中具有明确的additional_properties值,则该值优先于基于会话的回退。
身份验证
请使用AuthInterceptor来保护 A2A 终结点:
from a2a.client.auth.interceptor import AuthInterceptor
from agent_framework.a2a import A2AAgent
class BearerAuth(AuthInterceptor):
def __init__(self, token: str):
self.token = token
async def intercept(self, request):
request.headers["Authorization"] = f"Bearer {self.token}"
return request
async with A2AAgent(
name="secure-agent",
url="https://secure-a2a-agent.example.com",
auth_interceptor=BearerAuth("your-token"),
) as agent:
response = await agent.run("Hello!")
超时配置
A2AAgent
timeout接受用于控制请求超时的参数:
import httpx
from agent_framework.a2a import A2AAgent
# Simple timeout (applies to all components)
async with A2AAgent(name="remote", url="https://example.com", timeout=120.0) as agent:
...
# Fine-grained timeout
async with A2AAgent(
name="remote",
url="https://example.com",
timeout=httpx.Timeout(connect=10.0, read=120.0, write=10.0, pool=5.0),
) as agent:
...
如果未指定超时,则默认值为:10 秒连接、60 秒读取、10 秒写入、5s 池。
工具
A2AAgent 是远程 A2A 代理周围的传输级包装器。 远程代理使用的任何工具都位于远程端,并且对代码不可见。 代理框架工具类型(函数工具、代码解释器、文件搜索、托管/本地 MCP 等)本身未配置 A2AAgent - 若要扩展远程代理的功能,请更改远程代理的配置。
如果希望 Foundry 代理以工具的形式调用 A2A 代理,请参阅工厂get_a2a_tool。FoundryChatClient