A2A 代理

这使 A2AAgent 应用程序能够连接到通过 代理到代理 (A2A) 协议公开的远程代理。 它将任何符合 A2A 的终结点包装为标准 AIAgent,因此可以使用熟悉的方法(例如 RunAsync ,并与 RunStreamingAsync 远程代理交互),而不考虑它们使用哪种框架或技术。

入门

将所需的 NuGet 包添加到项目:

dotnet add package Microsoft.Agents.AI.A2A --prerelease

代理发现

在与远程 A2A 代理通信之前,需要发现它并创建实例 AIAgent 。 A2A 协议定义了三种 发现策略,每个策略都受代理框架支持。

Well-Known URI

A2A 代理可以在标准化路径上发现其代理卡https://{domain}/.well-known/agent-card.json 使用 A2ACardResolver 提取卡片并在单个调用中创建代理:

using A2A;
using Microsoft.Agents.AI;

// Initialize a resolver pointing at the remote agent's host.
A2ACardResolver resolver = new(new Uri("https://a2a-agent.example.com"));

// Resolve the agent card and create an AIAgent in one step.
AIAgent agent = await resolver.GetAIAgentAsync();

// Use the agent.
Console.WriteLine(await agent.RunAsync("Hello!"));

Tip

GetAIAgentAsync还接受用于A2AClientOptions的可选参数。

Catalog-Based 发现

在企业环境或公共市场中,代理卡通常由中央注册表管理。 如果已 AgentCard 从此类注册表获取,请将其直接转换为 AIAgent

using A2A;
using Microsoft.Agents.AI;

// Assume agentCard was retrieved from a registry or catalog.
AgentCard agentCard = await GetAgentCardFromRegistryAsync("travel-planner");

AIAgent agent = agentCard.AsAIAgent();

Console.WriteLine(await agent.RunAsync("Plan a trip to Paris."));

直接配置

对于提前知道代理终结点的紧密耦合系统或开发方案,请直接创建一个 A2AClient 并将其转换为 AIAgent

using A2A;
using Microsoft.Agents.AI;

// Create a client pointing at the known agent endpoint.
A2AClient a2aClient = new(new Uri("https://a2a-agent.example.com"));

AIAgent agent = a2aClient.AsAIAgent(name: "my-agent", description: "A helpful assistant.");

Console.WriteLine(await agent.RunAsync("What can you help me with?"));

协议选择

A2A 代理可以公开多个协议绑定,例如 HTTP+JSON 和 JSON-RPC。 默认情况下,HTTP+JSON 优先于 JSON-RPC。 用于 A2AClientOptions.PreferredBindings 显式控制使用哪种协议绑定:

注释

远程 A2A 代理必须在支持所选协议绑定的终结点上可用。

using A2A;
using Microsoft.Agents.AI;

A2ACardResolver agentCardResolver = new(new Uri("https://a2a-agent.example.com"));

AgentCard agentCard = await agentCardResolver.GetAgentCardAsync();

// Prefer HTTP+JSON protocol binding. For JSON-RPC, set PreferredBindings = [ProtocolBindingNames.JsonRpc]
A2AClientOptions options = new()
{
    PreferredBindings = [ProtocolBindingNames.HttpJson]
};

AIAgent agent = agentCard.AsAIAgent(options: options);

Console.WriteLine(await agent.RunAsync("Tell me a joke about a pirate."));

流媒体

A2A 支持通过 Server-Sent 事件进行流式处理响应。 用于 RunStreamingAsync 在远程代理处理请求时实时接收更新:

using A2A;
using Microsoft.Agents.AI;

A2ACardResolver resolver = new(new Uri("https://a2a-agent.example.com"));
AIAgent agent = await resolver.GetAIAgentAsync();

await foreach (var update in agent.RunStreamingAsync("Write a short story about a robot."))
{
    if (!string.IsNullOrEmpty(update.Text))
    {
        Console.Write(update.Text);
    }
}

后台响应

A2A 代理支持 后台响应 来处理长时间运行的操作。 当远程 A2A 代理返回任务而不是即时消息时,代理框架提供了一个延续令牌,可用于轮询结果或重新连接到中断的流。

轮询任务完成

对于非流式处理方案,用于 AllowBackgroundResponses 接收延续令牌并轮询,直到任务完成:

using A2A;
using Microsoft.Agents.AI;

A2ACardResolver resolver = new(new Uri("https://a2a-agent.example.com"));
AIAgent agent = await resolver.GetAIAgentAsync();

AgentSession session = await agent.CreateSessionAsync();

// AllowBackgroundResponses must be true so the server returns immediately with a continuation token
// instead of blocking until the task is complete.
AgentRunOptions options = new() { AllowBackgroundResponses = true };

// Start the initial run with a long-running task.
AgentResponse response = await agent.RunAsync(
    "Conduct a comprehensive analysis of quantum computing applications in cryptography.",
    session,
    options: options);

// Poll until the response is complete.
while (response.ContinuationToken is { } token)
{
    // Wait before polling again.
    await Task.Delay(TimeSpan.FromSeconds(2));

    // Continue with the token.
    response = await agent.RunAsync(session, options: new AgentRunOptions { ContinuationToken = token });
}

Console.WriteLine(response);

流重新连接

在流式处理方案中,每个更新可能包含延续令牌。 如果流中断,请使用令牌重新连接并从头获取响应流:

using A2A;
using Microsoft.Agents.AI;
using Microsoft.Extensions.AI;

A2ACardResolver resolver = new(new Uri("https://a2a-agent.example.com"));
AIAgent agent = await resolver.GetAIAgentAsync();

AgentSession session = await agent.CreateSessionAsync();

ResponseContinuationToken? continuationToken = null;

await foreach (var update in agent.RunStreamingAsync(
    "Conduct a comprehensive analysis of quantum computing applications in cryptography.",
    session))
{
    // Save the continuation token to reconnect later if the stream is interrupted.
    // Continuation tokens are only returned for long-running tasks. If the A2A agent
    // returns a message instead of a task, the continuation token will not be initialized.
    if (update.ContinuationToken is { } token)
    {
        continuationToken = token;
    }
}

// If the stream was interrupted and a continuation token was captured,
// reconnect to the response stream using the saved continuation token.
if (continuationToken is not null)
{
    await foreach (var update in agent.RunStreamingAsync(
        session,
        options: new() { ContinuationToken = continuationToken }))
    {
        if (!string.IsNullOrEmpty(update.Text))
        {
            Console.WriteLine(update.Text);
        }
    }
}

注释

A2A 代理支持流重新连接(从头获取相同的响应流),而不是从流中的特定点进行流恢复。

工具

A2AAgent 是远程 A2A 代理周围的传输级包装器。 远程代理使用的任何工具都位于远程端,并且对代码不可见。 代理框架工具类型(函数工具、代码解释器、文件搜索、托管/本地 MCP 等)本身未配置 A2AAgent - 若要扩展远程代理的功能,请更改远程代理的配置。

入门

安装 A2A 包:

pip install agent-framework-a2a --pre

初始 化

A2AAgent 可以通过三种方式初始化,具体取决于你事先对远程代理的了解程度。

直接 URL

对于已知终结点的开发或紧密耦合系统:

from agent_framework.a2a import A2AAgent

async with A2AAgent(name="remote", url="https://a2a-agent.example.com") as agent:
    response = await agent.run("Hello!")
    print(response.messages[0].text)

仅提供 URL 时,在 A2AAgent 内部创建最小代理卡并使用 JSON-RPC 进行连接。

代理卡

如果有 AgentCard 注册表或目录,请直接传递它:

from agent_framework.a2a import A2AAgent

async with A2AAgent(agent_card=agent_card) as agent:
    response = await agent.run("Plan a trip to Paris.")
    print(response.messages[0].text)

提供 AgentCardA2AAgent默认值namedescription卡片时。 它使用卡的 supported_interfaces协商传输。

Well-Known URI (A2ACardResolver)

A2ACardResolvera2a-sdk标准已知路径/.well-known/agent.json():

import httpx
from a2a.client import A2ACardResolver
from agent_framework.a2a import A2AAgent

async with httpx.AsyncClient(timeout=60.0) as http_client:
    resolver = A2ACardResolver(httpx_client=http_client, base_url="https://a2a-agent.example.com")
    agent_card = await resolver.get_agent_card()

async with A2AAgent(agent_card=agent_card) as agent:
    response = await agent.run("What can you help me with?")
    print(response.messages[0].text)

流媒体

用于 stream=True 在远程代理处理请求时实时接收更新:

from agent_framework.a2a import A2AAgent

async with A2AAgent(name="remote", url="https://a2a-agent.example.com") as agent:
    async with agent.run("Write a short story about a robot.", stream=True) as stream:
        async for update in stream:
            for content in update.contents:
                if content.text:
                    print(content.text, end="", flush=True)

        final = await stream.get_final_response()
        print(f"\n({len(final.messages)} message(s))")

长时间运行任务

默认情况下, A2AAgent 等待远程代理在返回之前完成。 对于长时间运行的任务,设置为 background=True 显示一个延续令牌,可用于稍后轮询或订阅:

from agent_framework.a2a import A2AAgent

async with A2AAgent(name="worker", url="https://a2a-agent.example.com") as agent:
    # Start a long-running task
    response = await agent.run("Process this large dataset", background=True)

    if response.continuation_token:
        # Poll for completion later
        result = await agent.poll_task(response.continuation_token)
        print(result)

还可以重新订阅 SSE 流,而不是轮询:

# Resubscribe to the task's event stream
response = await agent.run(continuation_token=response.continuation_token)

对话标识(context_id)

当你使用A2AAgent.run()调用AgentSession时,如果传出消息尚未携带 A2Acontext_id,代理会从session.service_session_id中自动派生出一个 A2A。 这样,便可以跨多个 A2A 调用保持聊天连续性:

from agent_framework import AgentSession
from agent_framework.a2a import A2AAgent

async with A2AAgent(name="remote", url="https://a2a-agent.example.com") as agent:
    session = AgentSession(service_session_id="my-conversation-1")

    # context_id is automatically set to "my-conversation-1"
    response = await agent.run("Hello!", session=session)

    # Subsequent calls with the same session continue the conversation
    response = await agent.run("Follow-up question", session=session)

如果消息在其context_id中具有明确的additional_properties值,则该值优先于基于会话的回退。

身份验证

请使用AuthInterceptor来保护 A2A 终结点:

from a2a.client.auth.interceptor import AuthInterceptor
from agent_framework.a2a import A2AAgent

class BearerAuth(AuthInterceptor):
    def __init__(self, token: str):
        self.token = token

    async def intercept(self, request):
        request.headers["Authorization"] = f"Bearer {self.token}"
        return request

async with A2AAgent(
    name="secure-agent",
    url="https://secure-a2a-agent.example.com",
    auth_interceptor=BearerAuth("your-token"),
) as agent:
    response = await agent.run("Hello!")

超时配置

A2AAgent timeout接受用于控制请求超时的参数:

import httpx
from agent_framework.a2a import A2AAgent

# Simple timeout (applies to all components)
async with A2AAgent(name="remote", url="https://example.com", timeout=120.0) as agent:
    ...

# Fine-grained timeout
async with A2AAgent(
    name="remote",
    url="https://example.com",
    timeout=httpx.Timeout(connect=10.0, read=120.0, write=10.0, pool=5.0),
) as agent:
    ...

如果未指定超时,则默认值为:10 秒连接、60 秒读取、10 秒写入、5s 池。

工具

A2AAgent 是远程 A2A 代理周围的传输级包装器。 远程代理使用的任何工具都位于远程端,并且对代码不可见。 代理框架工具类型(函数工具、代码解释器、文件搜索、托管/本地 MCP 等)本身未配置 A2AAgent - 若要扩展远程代理的功能,请更改远程代理的配置。

如果希望 Foundry 代理以工具的形式调用 A2A 代理,请参阅工厂get_a2a_toolFoundryChatClient

后续步骤