AgentApplication 是使用 Agents SDK 生成的代理的中心构建基块。
AgentApplication 是所有传入活动的入口点,包括来自用户的消息、对话生命周期事件、自适应卡片交互、OAuth 回调。
智能体的核心本质是 AgentApplication。 使用描述代理行为的处理程序对其进行配置。 SDK 负责路由、状态管理和运行所需的基础结构。
AgentApplication 的工作原理
每个代理都有一个生命周期,该生命周期在通道(Microsoft Teams、机器人服务或自定义客户端)将活动传送到代理的终结点时启动。
AgentApplication 位于该生命周期的中心:
Channel → Hosting layer → AgentApplication → Your handlers
使用 Agents SDK 生成的代理中的处理层如下所示:
- 托管层接收 HTTP 请求并对其进行身份验证。
-
AgentApplication 通过其管道处理传入活动。
- 您的处理程序会根据匹配的路由被调用。
在处理程序运行之前,智能体会加载回合状态。 之后,智能体会保存回合状态。
核心概念
活动
代理 SDK 中的所有内容都作为 活动流动。 活动是表示所发生事件的结构化消息。 活动具有一种类型,例如消息、事件、调用、conversationUpdate 等。 它携带与该类型相关的承载物。
AgentApplication 接收活动并将其路由到正确的处理程序。
Routes
路由将 选择器 与 处理程序配对。 选择器确定路由是否与当前活动匹配。 当路由匹配时,处理程序将运行您的逻辑代码。
配置代理时注册路由。 它们能够匹配:
- 包含特定文本或匹配正则表达式的消息
- 给定类型的任何活动
- 对话生命周期事件(已添加成员、已删除成员)
- 自适应卡片操作
- 自定义条件
当活动到达时,系统会按顺序评估路由,直到找到匹配项。 默认情况下,仅运行一个路由。
回合状态
AgentApplication manages _turn 状态——一种按作用域划分的结构化存储:
| 作用域类型 |
Description |
| 对话 |
在对话中所有用户之间共享,并在回合之间持久化 |
| User |
针对单个用户,适用于所有对话 |
| 临时 |
仅限当前回合——从不持久化 |
系统会在处理程序运行之前自动加载状态,并随后自动保存状态。
转变上下文
当处理程序运行时,它会收到 轮次上下文。 回合上下文是当前活动、适配器连接以及用于发送响应的实用程序的快照。 轮次上下文是当前交互的接口。
Middleware
AgentApplication 支持 中间件管道。 中间件是一系列组件,在处理程序运行前后处理每个回合。 中间件可以检查、转换或使活动流短路。 常见用途包括日志记录、身份验证检查和请求规范化。
创建代理
继承 AgentApplication 并将其处理程序注册在构造函数中。 宿主框架会自动注入 AgentApplicationOptions。
public class MyAgent : AgentApplication
{
public MyAgent(AgentApplicationOptions options) : base(options)
{
OnConversationUpdate(ConversationUpdateEvents.MembersAdded, WelcomeAsync);
OnActivity(ActivityTypes.Message, OnMessageAsync, rank: RouteRank.Last);
}
private async Task WelcomeAsync(ITurnContext context, ITurnState state, CancellationToken ct)
{
foreach (var member in context.Activity.MembersAdded)
{
if (member.Id != context.Activity.Recipient.Id)
{
await context.SendActivityAsync("Hello! How can I help you?", cancellationToken: ct);
}
}
}
private async Task OnMessageAsync(ITurnContext context, ITurnState state, CancellationToken ct)
{
await context.SendActivityAsync($"You said: {context.Activity.Text}", cancellationToken: ct);
}
}
在Program.cs中注册您的代理商
WebApplicationBuilder builder = WebApplication.CreateBuilder(args);
builder.Services.AddHttpClient();
builder.Services.AddSingleton<IStorage, MemoryStorage>();
builder.Services.AddAgent<MyAgent>();
builder.Services.AddAgentAspNetAuthentication(builder.Configuration);
WebApplication app = builder.Build();
app.UseAuthentication();
app.UseAuthorization();
app.MapAgentApplicationEndpoints(requireAuth: !app.Environment.IsDevelopment());
app.Run();
使用 Fluent API 对 AgentApplication 进行实例化,并在实例上注册处理程序。 若要启动服务器,请从 Express 托管包调用 startServer 。
import { AgentApplication, MemoryStorage, TurnContext, TurnState } from '@microsoft/agents-hosting'
import { startServer } from '@microsoft/agents-hosting-express'
const app = new AgentApplication<TurnState>({ storage: new MemoryStorage() })
app.onConversationUpdate('membersAdded', async (context: TurnContext) => {
await context.sendActivity('Hello! How can I help you?')
})
app.onActivity('message', async (context: TurnContext, state: TurnState) => {
await context.sendActivity(`You said: ${context.activity.text}`)
})
startServer(app)
若需在不使用 startServer 的情况下手动配置 Express:
import express from 'express'
import { CloudAdapter, authorizeJWT, AuthConfiguration } from '@microsoft/agents-hosting'
const authConfig: AuthConfiguration = loadAuthConfigFromEnv()
const adapter = new CloudAdapter(authConfig)
const expressApp = express()
expressApp.use(express.json())
expressApp.use(authorizeJWT(authConfig))
expressApp.post('/api/messages', async (req, res) => {
await adapter.process(req, res, async (context) => {
await app.run(context)
})
})
使用类型参数实例化 AgentApplication,并通过修饰器注册处理程序。
from microsoft_agents.hosting.core import AgentApplication, TurnState, TurnContext, MemoryStorage
from microsoft_agents.hosting.aiohttp import CloudAdapter, start_agent_process
from aiohttp.web import Request, Response, Application, run_app
AGENT_APP = AgentApplication[TurnState](
storage=MemoryStorage(),
adapter=CloudAdapter()
)
@AGENT_APP.conversation_update("membersAdded")
async def on_members_added(context: TurnContext, state: TurnState):
await context.send_activity("Hello! How can I help you?")
@AGENT_APP.activity("message")
async def on_message(context: TurnContext, state: TurnState):
await context.send_activity(f"You said: {context.activity.text}")
if __name__ == "__main__":
async def messages(req: Request) -> Response:
return await start_agent_process(req, AGENT_APP, AGENT_APP.adapter)
app = Application()
app.router.add_post("/api/messages", messages)
app["agent_app"] = AGENT_APP
app["adapter"] = AGENT_APP.adapter
run_app(app, host="localhost", port=3978)
注册活动处理程序
处理消息
按精确文本匹配消息(不区分大小写):
OnMessage("help", async (context, state, ct) =>
{
await context.SendActivityAsync("Here's what I can do...", cancellationToken: ct);
});
使用正则表达式匹配消息:
OnMessage(new Regex(@"^order\s+\d+$", RegexOptions.IgnoreCase), async (context, state, ct) =>
{
await context.SendActivityAsync("Looking up your order...", cancellationToken: ct);
});
app.onMessage('help', async (context: TurnContext, state: TurnState) => {
await context.sendActivity("Here's what I can do...")
})
使用正则表达式匹配消息:
app.onMessage(/^order\s+\d+$/i, async (context: TurnContext, state: TurnState) => {
await context.sendActivity('Looking up your order...')
})
@AGENT_APP.message("/help")
async def on_help(context: TurnContext, state: TurnState):
await context.send_activity("Here's what I can do...")
使用正则表达式匹配消息:
import re
@AGENT_APP.message(re.compile(r"^order\s+\d+$", re.IGNORECASE))
async def on_order(context: TurnContext, state: TurnState):
await context.send_activity("Looking up your order...")
处理对话更新
为对话生命周期事件(如成员加入或离开)注册处理程序。
OnConversationUpdate(ConversationUpdateEvents.MembersAdded, async (context, state, ct) =>
{
foreach (var member in context.Activity.MembersAdded)
{
if (member.Id != context.Activity.Recipient.Id)
{
await context.SendActivityAsync("Welcome!", cancellationToken: ct);
}
}
});
OnConversationUpdate(ConversationUpdateEvents.MembersRemoved, async (context, state, ct) =>
{
// Called when participants leave the conversation
});
app.onConversationUpdate('membersAdded', async (context: TurnContext) => {
for (const member of context.activity.membersAdded ?? []) {
if (member.id !== context.activity.recipient.id) {
await context.sendActivity('Welcome!')
}
}
})
app.onConversationUpdate('membersRemoved', async (context: TurnContext) => {
// Called when participants leave the conversation
})
@AGENT_APP.conversation_update("membersAdded")
async def on_members_added(context: TurnContext, state: TurnState):
for member in context.activity.members_added or []:
if member.id != context.activity.recipient.id:
await context.send_activity("Welcome!")
@AGENT_APP.conversation_update("membersRemoved")
async def on_members_removed(context: TurnContext, state: TurnState):
pass # Called when participants leave the conversation
处理任何活动类型
按其类型字符串匹配任何活动,以完全控制路由。
OnActivity(ActivityTypes.Message, async (context, state, ct) =>
{
// Handles all message activities
});
OnActivity(ActivityTypes.Event, async (context, state, ct) =>
{
// Handles event activities
});
使用 ActivityTypes 常量而不是硬编码字符串。
app.onActivity('message', async (context: TurnContext, state: TurnState) => {
// Handles all message activities
})
app.onActivity('event', async (context: TurnContext, state: TurnState) => {
// Handles event activities
})
@AGENT_APP.activity("message")
async def on_message(context: TurnContext, state: TurnState):
pass # Handles all message activities
@AGENT_APP.activity("event")
async def on_event(context: TurnContext, state: TurnState):
pass # Handles event activities
控制路由评估顺序
系统在您注册路由时(而非运行时)将其排序为固定的评估顺序。 排序使用两个级别:
路由类型:系统按类型对路由进行分组,无论排名如何,它始终在低优先级类型之前评估高优先级类型:
| 优先级 |
路由类型 |
| 1 (最高) |
智能体式调用路由 |
| 2 |
调用路由(自适应卡片操作、OAuth 回调及其他对时间敏感的调用) |
| 3 |
智能体式路由 |
| 4 (最低) |
所有其他路由 |
排名:在每个路由类型组中,系统按其排名值对路由进行排序。 首先评估较低的数值。
注册处理程序时,使用 RouteRank 常量设置排名:
| 恒定 |
Value |
Meaning |
RouteRank.First |
0 |
在其所属组中优先于所有其他路由进行评估 |
RouteRank.Unspecified |
32767 |
未指定排名时默认值 |
RouteRank.Last |
65535 |
在其所属组中晚于所有其他路由进行评估 |
默认情况下,计算在第一个匹配路由处停止。 使用 RouteRank.Last 作为通配回退,用于处理未被更具体路由匹配的任何情况。
// Specific handlers use the default rank
OnMessage("status", HandleStatusAsync);
OnMessage("help", HandleHelpAsync);
// Catch-all — handles anything not matched above
OnActivity(ActivityTypes.Message, HandleUnknownMessageAsync, rank: RouteRank.Last);
// Specific handlers use the default rank
app.onMessage('status', handleStatusAsync)
app.onMessage('help', handleHelpAsync)
// Catch-all — handles anything not matched above
app.onActivity('message', handleUnknownMessageAsync, RouteRank.Last)
@AGENT_APP.message("/status")
async def on_status(context: TurnContext, state: TurnState):
await context.send_activity("Status: OK")
@AGENT_APP.message("/help")
async def on_help(context: TurnContext, state: TurnState):
await context.send_activity("Here's what I can do...")
# Catch-all — handles anything not matched above (registered last)
@AGENT_APP.activity("message")
async def on_unknown(context: TurnContext, state: TurnState):
await context.send_activity("I didn't understand that. Type /help for options.")
Turn 生命周期钩子
注册在每个 Turn 执行时(路由匹配之前或之后)运行的逻辑。 这些钩子适用于日志记录、横切关注点和错误处理。
OnBeforeTurn(async (context, state, ct) =>
{
logger.LogInformation("Turn started: {Type}", context.Activity.Type);
return true; // Return false to abort the turn
});
OnAfterTurn(async (context, state, ct) =>
{
logger.LogInformation("Turn completed");
return true; // Return false to skip state saving
});
OnTurnError(async (context, state, exception, ct) =>
{
logger.LogError(exception, "Turn error");
await context.SendActivityAsync("Something went wrong. Please try again.", cancellationToken: ct);
});
当 OnBeforeTurn 返回 false 时,该轮次将被中止,且不会执行任何路由。 当 OnAfterTurn 返回 false 时,轮次状态不会被保存。
app.beforeTurn(async (context: TurnContext, state: TurnState) => {
console.log(`Turn started: ${context.activity.type}`)
return true // Return false to abort the turn
})
app.afterTurn(async (context: TurnContext, state: TurnState) => {
console.log('Turn completed')
return true // Return false to skip state saving
})
app.onError(async (context: TurnContext, error: Error) => {
console.error('Turn error:', error)
await context.sendActivity('Something went wrong. Please try again.')
})
@AGENT_APP.before_turn
async def on_before_turn(context: TurnContext, state: TurnState) -> bool:
print(f"Turn started: {context.activity.type}")
return True # Return False to abort the turn
@AGENT_APP.after_turn
async def on_after_turn(context: TurnContext, state: TurnState) -> bool:
print("Turn completed")
return True # Return False to skip state saving
@AGENT_APP.turn_error
async def on_turn_error(context: TurnContext, state: TurnState, error: Exception):
print(f"Turn error: {error}")
await context.send_activity("Something went wrong. Please try again.")
使用轮次状态
代理会在处理程序运行之前自动加载轮次状态,然后保存它。 传递给处理程序的轮次状态对象允许您访问不同的作用域,从而读取和写入跨轮次持久的数据或仅在当前轮次中存在的临时数据:
-
会话范围:适用于在会话中所有轮次共享的数据
-
用户范围:对于单个用户数据
-
临时作用域:用于仅需在当前回合中存在的数据
OnActivity(ActivityTypes.Message, async (context, state, ct) =>
{
// Conversation scope — persisted per conversation
var count = state.Conversation.GetValue<int>("messageCount", () => 0);
state.Conversation.SetValue("messageCount", count + 1);
// User scope — persisted per user
var name = state.User.GetValue<string>("displayName");
// Temp scope — current turn only
state.Temp.SetValue("parsedInput", context.Activity.Text?.Trim());
await context.SendActivityAsync($"Message #{count + 1}: {context.Activity.Text}", cancellationToken: ct);
});
app.onActivity('message', async (context: TurnContext, state: TurnState) => {
// Conversation scope — persisted per conversation
let count: number = state.getValue('conversation.messageCount') ?? 0
state.setValue('conversation.messageCount', count + 1)
// User scope — persisted per user
const name: string = state.getValue('user.displayName')
await context.sendActivity(`Message #${count + 1}: ${context.activity.text}`)
})
@AGENT_APP.activity("message")
async def on_message(context: TurnContext, state: TurnState):
# Conversation scope — persisted per conversation
count = state.get_value("conversation.message_count") or 0
state.set_value("conversation.message_count", count + 1)
# User scope — persisted per user
name = state.get_value("user.display_name")
await context.send_activity(f"Message #{count + 1}: {context.activity.text}")
注释
请使用 MemoryStorage 来进行本地开发和测试。 对于生产部署(尤其是多个实例上运行的部署),请使用持久性存储提供程序,例如Azure Cosmos DB或Azure Blob 存储。 请参阅 在代理中使用存储提供程序。
后续步骤