大多数 Service Broker 应用程序都按照相同的基本步骤来接收和处理消息:
应用程序开始一个事务。
如果应用程序需维护状态,则应用程序将获取一个会话组标识符。 应用程序使用此标识符从状态表还原状态。 如果没有包含已准备好接收消息的对话组,应用程序将回滚事务并退出。
应用程序从队列接收一个或多个消息。 如果应用程序具有会话组标识符,则应用程序使用此会话组标识符接收相应会话组的消息。 如果不再有消息可供接收,则应用程序提交事务并返回步骤 1。
应用程序根据消息类型名称验证消息的内容。
应用程序根据消息类型名称和消息的内容处理消息。
应用程序发送处理后的所有消息。
如果应用程序需维护状态,则应用程序会更新状态表,并使用会话组标识符作为此表的主键。
应用程序返回步骤 3 以检查是否有其他消息可用。
一个应用程序的确切结构取决于该应用程序的要求和通信方式、该应用程序是目标服务还是起始服务以及 Service Broker 是否激活该应用程序。
例如,起始应用程序在发送消息之后才会开始上述步骤中概述的处理循环。 启动服务可能会从另一个程序或存储过程发送消息,然后对启动服务队列使用激活存储过程。 例如,订单输入应用程序可以包含一个外部应用程序,该应用程序启动对话以输入订单。 在输入订单之后,外部应用程序不需要保持运行状态。 当从订单服务返回响应时,起始服务的激活存储过程将发送订单确认。 激活存储过程还处理目标服务返回的任何 Service Broker 错误消息,并发送无法确认订单的通知。
或者,启动应用程序可能会发送消息,然后作为同一程序的一部分启动处理循环,而不是从其他程序发送消息。 不管这些差异如何,总体步骤基本都是相同的。
处理同一会话组中大量消息的应用程序可能会在处理一定数量的消息后记录接收到的消息数量,然后提交事务。 利用这一“计数与提交”策略,应用程序可使事务保持相对较短,并可以处理多个不同的会话组。
例子
以下 Transact-SQL 示例处理 MyServiceQueue 队列中的所有消息。 已对消息的处理进行了最大限度的简化。 如果出现 EndDialog 或 Error 消息,则此代码将结束会话。 对于其他任何消息,此代码将创建相应消息的 XML 表示形式,并且生成包含会话句柄、消息类型名称和 XML 的结果集。 如果在 500 毫秒内没有任何可用消息,则此代码将退出。
为简化起见,此脚本为每个消息都生成结果集。 如果在读取队列期间发生错误,则此脚本提交更改,而不生成任何结果。 因此,此脚本将自行删除导致错误的任何消息。
注意
由于此脚本只是显示消息,因而对于此脚本不会存在有害消息。 因此,脚本不包含用于处理病毒消息的代码。 编写生产应用程序时应包含处理有害消息的代码。 有关病毒消息的详细信息,请参阅 “处理有害消息”。
注意
本文中的代码示例是使用 AdventureWorks2025 示例数据库进行测试的,可以从 Microsoft SQL Server 示例和社区项目 主页下载该数据库。
USE [AdventureWorks2022];
GO
-- Process all conversation groups.
WHILE (1 = 1)
BEGIN
DECLARE @conversation_handle AS UNIQUEIDENTIFIER,
@conversation_group_id AS UNIQUEIDENTIFIER,
@message_body AS XML,
@message_type_name AS NVARCHAR (128);
-- Begin a transaction, one per conversation group.
BEGIN TRANSACTION;
-- Get next conversation group.
WAITFOR (GET CONVERSATION GROUP
@conversation_group_id FROM [MyServiceQueue]),
TIMEOUT 500;
-- Restore the state for this conversation group here
-- If there are no more conversation groups, break.
IF @conversation_group_id IS NULL
BEGIN
ROLLBACK;
BREAK;
END
-- Process all messages in the conversation group.
WHILE 1 = 1
BEGIN
-- Get the next message.
RECEIVE TOP (1)
@conversation_handle = [conversation_handle],
@message_type_name = [message_type_name],
@message_body = CASE
WHEN [validation] = 'X' THEN CAST ([message_body] AS XML)
ELSE CAST (N'<none/>' AS XML)
END
FROM [MyServiceQueue]
WHERE CONVERSATION_GROUP_ID = @conversation_group_id;
-- If there is no message, or there is an error
-- reading from the queue, break.
IF @@ROWCOUNT = 0
OR @@ERROR <> 0
BREAK;
-- Process the message. In this case, the program ends the conversation
-- for Error and EndDialog messages. For all other messages, the program
-- produces a result set with information about the message.
SELECT @conversation_handle,
@message_type_name,
@message_body;
-- If the message is an end dialog message or an error,
-- end the conversation. Notice that other conversations
-- in the same conversation group may still have messages
-- to process. Therefore, the program does not break after
-- ending the conversation.
IF @message_type_name = 'https://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'
OR @message_type_name = 'https://schemas.microsoft.com/SQL/ServiceBroker/Error'
BEGIN
END CONVERSATION @conversation_handle;
END
END -- Process all messages in conversation group.
COMMIT TRANSACTION;
END -- Process all conversation groups.