在SQL Server 2005中实现异步触发器架构 (1)

ZDNet软件频道 时间:2009-12-03 作者: | 赛迪网 我要评论()
本文关键词:Server 服务器 SQL
在SQL Server 2005数据库中,通过新增的Service Broker可以实现异步触发器的处理功能。删除异步触发器处理的相关对象DROP PROC dbo.p_async_trigger_processDROP PROC dbo.p_async_trigger_sendDROP TABLE dbo.tb_async_trigger_subscribtionDROP TABLE dbo.tb_async_trigger_subscriberDROP TABLE dbo.tb_async_trigg

  在SQL Server 2005数据库中,通过新增的Service Broker可以实现异步触发器的处理功能。本文提供一种使用Service Broker实现的通用异步触发器方法。

  在本这个方法中,通过Service Broker构造异步触发器处理架构,对于要使用这种架构的表,只需要创建相应的触发器及处理触发器中数据的存储过程,并且在异步触发器架构中登记触发器和处理的存储过程即可。如果一个触发器中的数据要被多个表使用,只需要在dbo.tb_async_trigger_subscribtion中登记相应处理数据的存储过程即可,即一个表的数据变更可以被多个表订阅(使用)。

  架构的步骤如下:

  1. 数据库配置

  需要配置数据库以允许使用Service Broker。本文以tempdb库为例,故配置均在tempdb上下文中进行。

  USE tempdb

  GO

  -- 允许Service Broker

  ALTER DATABASE tempdb SET

  ENABLE_BROKER

  GO

  2. 构建异步触发器相关的对象

  下面的T-SQL创建异步触发器处理架构相关的对象。

  -- =======================================

  -- 异步触发器对象

  -- 1. service broker 对象

  -- =======================================

  -- a. message type, 要求使用xml 传递数据

  CREATE MESSAGE TYPE MSGT_async_trigger

  VALIDATION = WELL_FORMED_XML

  GO

  -- b. 只需要发送消息

  CREATE CONTRACT CNT_async_trigger(

  MSGT_async_trigger SENT BY INITIATOR)

  GO

  -- c. 存储消息的队列

  CREATE QUEUE dbo.Q_async_trigger

  GO

  -- d. 用于消息处理的服务

  CREATE SERVICE SRV_async_trigger

  ON QUEUE dbo.Q_async_trigger(

  CNT_async_trigger)

  GO

  -- =======================================

  -- 异步触发器对象

  -- 2. 异步触发器处理的对象

  -- =======================================

  -- a. 登记异步触发器的表

  CREATE TABLE dbo.tb_async_trigger(

  ID int IDENTITY

  PRIMARY KEY,

  table_name sysname,

  trigger_name sysname

  )

  -- b. 登记订阅异步触发器的存储过程

  CREATE TABLE dbo.tb_async_trigger_subscriber(

  ID int IDENTITY

  PRIMARY KEY,

  procedure_name sysname

  )

  -- c. 异步触发器和存储过程之间的订阅关系

  CREATE TABLE dbo.tb_async_trigger_subscribtion(

  trigger_id int

  REFERENCES dbo.tb_async_trigger(

  ID),

  procedure_id int

  REFERENCES dbo.tb_async_trigger_subscriber(

  ID),

  PRIMARY KEY(

  trigger_id, procedure_id)

  )

  GO

  -- d. 发送消息的存储过程

  CREATE PROC dbo.p_async_trigger_send

  @message xml

  AS

  SET NOCOUNT ON

  DECLARE

  @handle uniqueidentifier

  BEGIN DIALOG CONVERSATION @handle

  FROM SERVICE [SRV_async_trigger]

  TO SERVICE N'SRV_async_trigger'

  ON CONTRACT CNT_async_trigger

  WITH

  ENCRYPTION = OFF;

  SEND

  ON CONVERSATION @handle

  MESSAGE TYPE MSGT_async_trigger(

  @message);

  -- 消息发出即可, 不需要回复, 因此发出后即可结束会话

  END CONVERSATION @handle

  GO

  -- e. 处理异步触发器发送的消息

  CREATE PROC dbo.p_async_trigger_process

  AS

  SET NOCOUNT ON

  DECLARE

  @handle uniqueidentifier,

  @message xml,

  @rows int

  SET @rows = 1

  WHILE @rows >0

  BEGIN

  -- 处理已经收到的消息

  WAITFOR(

  RECEIVE TOP(1)

  @handle = conversation_handle,

  @message = CASE

  WHEN message_type_name = N'MSGT_async_trigger'

  THEN CONVERT(xml, message_body)

  ELSE NULL

  END

  FROM dbo.Q_async_trigger

  ), TIMEOUT 10

  SET @rows = @@ROWCOUNT

  IF @rows >0

  BEGIN

  -- 结束会话

  END CONVERSATION @handle;

  -- 处理消息

  -- a. 取发送者信息

  DECLARE

  @table_name sysname,

  @trigger_name sysname,

  @SQL nvarchar(max)

  SELECT

  @table_name = @message.value('(/root/table_name)[1]', 'sysname'),

  @trigger_name = @message.value('(/root/trigger_name)[1]', 'sysname')

  -- b. 调用异步触发器订阅的存储过程

  ;WITH

  SUB AS(

  SELECT

  TR.table_name,

  TR.trigger_name,

  SUB.procedure_name

  FROM dbo.tb_async_trigger TR,

  dbo.tb_async_trigger_subscriber SUB,

  dbo.tb_async_trigger_subscribtion TRSUB

  WHERE TRSUB.trigger_id = TR.ID

  AND TRSUB.procedure_id = SUB.ID

  )

  SELECT

  @SQL = (

  SELECT

  N'

  EXEC ' + procedure_name + N'

  @message

  '

  FROM SUB

  WHERE table_name = @table_name

  AND trigger_name = @trigger_name

  FOR XML PATH(''), ROOT('r'), TYPE

  ).value('(/r)[1]', 'nvarchar(max)')

  EXEC sp_executeSQL @SQL, N'@message xml', @message

  END

  END

  GO

  -- f. 绑定处理的存储过程到队列

  ALTER QUEUE dbo.Q_async_trigger

  WITH ACTIVATION(

  STATUS = ON,

  PROCEDURE_NAME = dbo.p_async_trigger_process,

  MAX_QUEUE_READERS = 10,

  EXECUTE AS OWNER)

  GO

  3. 使用示例

  下面的T-SQL演示使用异步触发器构架。示例中创建了三个表:

  Dbo.t1 这个是源表,此表的数据变化将用于其他表

  Dbo.t2 这个表要求保持与dbo.t1同步

  Dbo.tb_log 这个表记录dbo.t1中的数据变化情况

  触发器 TR_async_trigger 用于将表Dbo.t1中的数据变化发送到异步触发器构架中。dbo.p_Sync_t1_t2和dbo.p_Record_log用于处理dbo.t1于中变化的数据。

  在处理时,需要把相关的信息登记到异步触发器架构的表中。

  -- =======================================

  -- 3. 使用示例

  -- =======================================

  -- ===============================

  -- 测试对象

  -- a. 源表

  CREATE TABLE dbo.t1(

  id int IDENTITY

  PRIMARY KEY,

  col int

  )

  -- b. 同步的目的表

  CREATE TABLE dbo.t2(

  id int IDENTITY

  PRIMARY KEY,

  col int

  )

  -- c. 记录操作的日志表

  CREATE TABLE dbo.tb_log(

  id int IDENTITY

  PRIMARY KEY,

  user_name sysname,

  operate_type varchar(10),

  inserted xml,

  deleted xml

  )

  GO

  -- a. 异步发送处理消息的触发器

  CREATE TRIGGER TR_async_trigger

  ON dbo.t1

  FOR INSERT, UPDATE, DELETE

  AS

  IF @@ROWCOUNT = 0

  RETURN

  SET NOCOUNT ON

  -- 将要发送的数据生成xml 数据

  DECLARE

  @message xml

  SELECT

  @message = (

  SELECT

  table_name = (

  SELECT TOP 1

  OBJECT_NAME(parent_object_id)

  FROM sys.objects

  WHERE object_id = @@PROCID),

  trigger_name = OBJECT_NAME(@@PROCID),

  user_name = SUSER_SNAME(),

  inserted = (

  SELECT * FROM inserted FOR XML AUTO, TYPE),

  deleted = (

  SELECT * FROM deleted FOR XML AUTO, TYPE)

  FOR XML PATH(''), ROOT('root'), TYPE

  )

  -- 发送消息

  EXEC dbo.p_async_trigger_send

  @message = @message

  GO

  -- b. 处理异步触发器的存储过程

  -- b.1 同步到t2 的存储过程

  CREATE PROC dbo.p_Sync_t1_t2

  @message xml

  AS

  SET NOCOUNT ON

  DECLARE

  @inserted bit,

  @deleted bit

  SELECT

  @inserted = @message.exist('/root/inserted'),

  @deleted = @message.exist('/root/deleted')

  IF @inserted = 1

  IF @deleted = 1 -- 更新

  BEGIN

  ;WITH

  I AS(

  SELECT

  id = T.c.value('@id[1]', 'int'),

  col = T.c.value('@col[1]', 'int')

  FROM @message.nodes('/root/inserted/inserted') T(c)

  ),

  D AS(

  SELECT

  id = T.c.value('@id[1]', 'int'),

  col = T.c.value('@col[1]', 'int')

  FROM @message.nodes('/root/deleted/deleted') T(c)

  )

  UPDATE A SET

  col = I.col

  FROM dbo.t2 A, I, D

  WHERE A.ID = I.ID

  AND I.ID = D.ID

  END

  ELSE -- 插入

  BEGIN

  SET IDENTITY_INSERT dbo.t2 ON

  ;WITH

  I AS(

  SELECT

  id = T.c.value('@id[1]', 'int'),

  col = T.c.value('@col[1]', 'int')

  FROM @message.nodes('/root/inserted/inserted') T(c)

  )

  INSERT dbo.t2(

  id, col)

  SELECT

  id, col

  FROM I

  SET IDENTITY_INSERT dbo.t2 OFF

  END

  ELSE -- 删除

  BEGIN

  ;WITH

  D AS(

  SELECT

  id = T.c.value('@id[1]', 'int'),

  col = T.c.value('@col[1]', 'int')

  FROM @message.nodes('/root/deleted/deleted') T(c)

  )

  DELETE A

  FROM dbo.t2 A, D

  WHERE A.ID = D.ID

  END

  GO

  -- b.2 记录操作记录到dbo.tb_log 的存储过程

  CREATE PROC dbo.p_Record_log

  @message xml

  AS

  SET NOCOUNT ON

  DECLARE

  @inserted bit,

  @deleted bit

  SELECT

  @inserted = @message.exist('/root/inserted'),

  @deleted = @message.exist('/root/deleted')

  INSERT dbo.tb_log(

  user_name,

  operate_type,

  inserted,

  deleted)

  SELECT

  @message.value('(/root/user_name)[1]', 'sysname'),

  operate_type = CASE

  WHEN @inserted = 1 AND @deleted = 1 THEN 'update'

  WHEN @inserted = 1 THEN 'insert'

  WHEN @deleted = 1 THEN 'delete'

  END,

  @message.query('/root/inserted'),

  @message.query('/root/deleted')

  GO

  -- ===============================

  -- 在异步触发器处理系统中登记对象

  INSERT dbo.tb_async_trigger(

  table_name, trigger_name)

  VALUES(

  N't1', N'TR_async_trigger')

  INSERT dbo.tb_async_trigger_subscriber(

  procedure_name)

  SELECT N'dbo.p_Sync_t1_t2' UNION ALL

  SELECT N'dbo.p_Record_log'

  INSERT dbo.tb_async_trigger_subscribtion(

  trigger_id, procedure_id)

  SELECT 1, 1 UNION ALL

  SELECT 1, 2

  GO

  4.使用测试

  下面的T-SQL修改表dbo.t1中的数据,并检查dbo.t2、dbo.tb_log中的数据,以确定异步触发器架构的工作是否成功。

  执行完成后可以看到dbo.t2、dbo.tb_log中有相关的记录。

  -- ===============================

  -- 测试

  INSERT dbo.t1

  SELECT 1 UNION ALL

  SELECT 2

  UPDATE dbo.t1 SET

  col = 2

  WHERE id = 1

  DELETE dbo.t1

  WHERE id = 2

  -- 显示结果

  WAITFOR DELAY '00:00:05'

  -- 延迟5 分钟, 以便有时间处理消息(因为是异步的)

  SELECT * FROM dbo.t2

  SELECT * FROM dbo.tb_log

  GO

  5.使用测试

  下面的T-SQL删除本文中建立的所有对象。

  -- =======================================

  -- 5. 删除相关的对象

  -- =======================================

  -- a. 删除service broker 对象

  DROP SERVICE SRV_async_trigger

  DROP QUEUE dbo.Q_async_trigger

  DROP CONTRACT CNT_async_trigger

  DROP MESSAGE TYPE MSGT_async_trigger

  GO

  -- b. 删除异步触发器处理的相关对象

  DROP PROC dbo.p_async_trigger_process

  DROP PROC dbo.p_async_trigger_send

  DROP TABLE dbo.tb_async_trigger_subscribtion

  DROP TABLE dbo.tb_async_trigger_subscriber

  DROP TABLE dbo.tb_async_trigger

  GO

  -- c. 删除测试的对象

  DROP TABLE dbo.tb_log, dbo.t1, dbo.t2

  DROP PROC dbo.p_Sync_t1_t2, dbo.p_Record_log

Server

服务器

SQL


百度大联盟认证黄金会员Copyright© 1997- CNET Networks 版权所有。 ZDNet 是CNET Networks公司注册服务商标。
中华人民共和国电信与信息服务业务经营许可证编号:京ICP证010391号 京ICP备09041801号-159
京公网安备:1101082134