-- =======================================
-- Asynchronous trigger objects
-- 1. Service Broker objects
-- =======================================
-- a. Message type; the payload must be well-formed XML
CREATE MESSAGE TYPE [MSGT_async_trigger]
    VALIDATION = WELL_FORMED_XML;
GO
-- b. Contract: the message type is only ever sent by the initiator
CREATE CONTRACT [CNT_async_trigger]
(
    [MSGT_async_trigger] SENT BY INITIATOR
);
GO
-- c. Queue that stores the messages
CREATE QUEUE [dbo].[Q_async_trigger];
GO
-- d. Service used for message processing, bound to the queue and contract above
CREATE SERVICE [SRV_async_trigger]
    ON QUEUE [dbo].[Q_async_trigger]
    (
        [CNT_async_trigger]
    );
GO
-- =======================================
-- Asynchronous trigger objects
-- 2. Objects used by asynchronous trigger processing
-- =======================================
-- a. Registry of asynchronous triggers (one row per table/trigger pair)
CREATE TABLE dbo.tb_async_trigger
(
    ID           int IDENTITY(1, 1) NOT NULL,
    table_name   sysname NOT NULL,
    trigger_name sysname NOT NULL,
    PRIMARY KEY CLUSTERED (ID)
);

-- b. Registry of stored procedures that subscribe to asynchronous triggers
CREATE TABLE dbo.tb_async_trigger_subscriber
(
    ID             int IDENTITY(1, 1) NOT NULL,
    procedure_name sysname NOT NULL,
    PRIMARY KEY CLUSTERED (ID)
);

-- c. Subscription relationship (many-to-many) between triggers and procedures
CREATE TABLE dbo.tb_async_trigger_subscribtion
(
    trigger_id   int NOT NULL,
    procedure_id int NOT NULL,
    PRIMARY KEY CLUSTERED (trigger_id, procedure_id),
    FOREIGN KEY (trigger_id)
        REFERENCES dbo.tb_async_trigger (ID),
    FOREIGN KEY (procedure_id)
        REFERENCES dbo.tb_async_trigger_subscriber (ID)
);
GO
-- d. Stored procedure that sends a message
--
-- Purpose:
--   Opens a Service Broker dialog from SRV_async_trigger to SRV_async_trigger
--   (same service, hence same queue) on contract CNT_async_trigger and sends
--   @message on it as a MSGT_async_trigger message.
--
-- Parameters:
--   @message xml - payload to enqueue.
--
-- Notes:
--   The dialog is deliberately NOT ended here. Ending the initiator endpoint
--   immediately after SEND ("fire and forget") discards any Error message
--   returned for the dialog, so a delivery failure would go unnoticed.
--   Instead, the reader of dbo.Q_async_trigger is relied on to issue
--   END CONVERSATION for every message it receives: first for the target
--   endpoint when it gets this message, then for this initiator endpoint when
--   the resulting EndDialog message arrives on the same queue.
CREATE PROC dbo.p_async_trigger_send
    @message xml
AS
SET NOCOUNT ON;

DECLARE @handle uniqueidentifier;

BEGIN DIALOG CONVERSATION @handle
    FROM SERVICE [SRV_async_trigger]
    TO SERVICE N'SRV_async_trigger'
    ON CONTRACT [CNT_async_trigger]
    WITH ENCRYPTION = OFF;

SEND ON CONVERSATION @handle
    MESSAGE TYPE [MSGT_async_trigger] (@message);
GO
-- e. Processes the messages sent by asynchronous triggers
--
-- Purpose:
--   Drains dbo.Q_async_trigger one message at a time. For every message the
--   conversation it belongs to is ended. For MSGT_async_trigger messages the
--   sender's table_name / trigger_name are read from the XML payload
--   (/root/table_name, /root/trigger_name) and every stored procedure
--   subscribed to that trigger is executed with the payload as its single
--   xml argument.
--
-- Parameters: none.
--
-- Notes:
--   * The loop exits as soon as a RECEIVE returns no row within the TIMEOUT
--     (10 milliseconds).
--   * Messages of any other type (e.g. Service Broker EndDialog / Error
--     control messages) only cause the conversation to be ended; they are not
--     dispatched to subscribers.
--   * RECEIVE is not wrapped in a transaction, so a message is consumed even
--     if a subscriber fails.
--   * NOTE(review): procedure_name is concatenated verbatim into dynamic SQL;
--     write access to dbo.tb_async_trigger_subscriber must be restricted to
--     trusted principals.
CREATE PROC dbo.p_async_trigger_process
AS
SET NOCOUNT ON;

DECLARE
    @handle       uniqueidentifier,
    @message      xml,
    @rows         int,
    @table_name   sysname,
    @trigger_name sysname,
    @sql          nvarchar(max);

SET @rows = 1;

WHILE @rows > 0
BEGIN
    -- Fetch the next message already in the queue; payload is kept only for
    -- the user-defined message type, otherwise @message is NULL
    WAITFOR(
        RECEIVE TOP(1)
            @handle = conversation_handle,
            @message = CASE
                    WHEN message_type_name = N'MSGT_async_trigger'
                        THEN CONVERT(xml, message_body)
                    ELSE NULL
                END
        FROM dbo.Q_async_trigger
    ), TIMEOUT 10;

    -- Must directly follow the RECEIVE so that @@ROWCOUNT reflects it
    SET @rows = @@ROWCOUNT;

    IF @rows > 0
    BEGIN
        -- End the conversation (applies to user and control messages alike)
        END CONVERSATION @handle;

        -- Only user messages carry a payload to dispatch
        IF @message IS NOT NULL
        BEGIN
            -- a. Read the sender information from the payload
            SELECT
                @table_name = @message.value('(/root/table_name)[1]', 'sysname'),
                @trigger_name = @message.value('(/root/trigger_name)[1]', 'sysname');

            -- b. Build one EXEC statement per subscribed procedure and
            --    concatenate them (FOR XML PATH('') + .value() decodes any
            --    XML-escaped characters back to plain text)
            SELECT
                @sql = (
                    SELECT
                        N'
EXEC ' + SUB.procedure_name + N'
@message
'
                    FROM dbo.tb_async_trigger AS TR
                    INNER JOIN dbo.tb_async_trigger_subscribtion AS TRSUB
                        ON TRSUB.trigger_id = TR.ID
                    INNER JOIN dbo.tb_async_trigger_subscriber AS SUB
                        ON SUB.ID = TRSUB.procedure_id
                    WHERE TR.table_name = @table_name
                        AND TR.trigger_name = @trigger_name
                    FOR XML PATH(''), ROOT('r'), TYPE
                ).value('(/r)[1]', 'nvarchar(max)');

            -- @sql is NULL when the trigger has no subscribers
            IF @sql IS NOT NULL
                EXEC sp_executesql @sql, N'@message xml', @message;
        END
    END
END
GO
-- f. Bind the processing stored procedure to the queue (internal activation):
--    dbo.p_async_trigger_process is started for arriving messages, with at
--    most 10 concurrent readers, running as the owner of the procedure.
ALTER QUEUE dbo.Q_async_trigger
    WITH ACTIVATION (
        STATUS = ON,
        PROCEDURE_NAME = dbo.p_async_trigger_process,
        MAX_QUEUE_READERS = 10,
        EXECUTE AS OWNER
    );
GO