科技行者

行者学院 转型私董会 科技行者专题报道 网红大战科技行者

知识库

知识库 安全导航

至顶网软件频道通过 DB2 Type 2 JDBC 和 WebSphere Message Broker 使用协作事务

通过 DB2 Type 2 JDBC 和 WebSphere Message Broker 使用协作事务

  • 扫一扫
    分享文章到微信

  • 扫一扫
    关注官方公众号
    至顶头条

本文针对需要在访问 DB2 数据库的 WebSphere Message Broker V6 Java Compute 节点中实现 XA 协调事务的 Java 开发人员。

作者:ibm 来源:ibm 2007年10月6日

关键字: 技术 WEBSPHERE DB2 中间件

  • 评论
  • 分享微博
  • 分享邮件

引言

IBM® WebSphere® Message Broker V6 向流开发人员提供了对 Java 语言的全面访问,可将其作为 ESQL 的替代,用于实现路由、转换与充实以及对动态数据进行记录和仓库操作等消息处理逻辑。由于在 Broker 中实现的此处理通常包括同时对来自相关消息传递传输的动态消息数据和存储在数据库管理器中的数据进行更新,因此使用 Java™ Compute 节点的开发人员将尝试采用 JDBC 作为使用数据库的标准 Java 方法,从而为实现可部署在 Message Broker 和其他 Java 环境中的公用 Java 数据层的可能性提供了支持。

不过,当工作的处理单元需要由 XA 资源管理器以事务的形式与消息传递和数据库数据协作完成时,Message Broker 信息中心中的 Interacting with databases using the JavaCompute node 中所述的技术要求您使用提供的数据库处理节点,或使用传递到 Broker 的 ODBC 层的 SQL 指令(为了实现 XA 资源协调,需通过 MbSQLStatement 方法进行)。

通过使用本文描述的技术和配置,可以在使用 DB2® 数据库(采用 Type 2 JDBC 驱动程序)时,让 Broker 采用标准 JDBC 实现与 WebSphere Message Broker V6 的协作事务。通过此方法,可在运行于 Broker 和其他标准 Java 环境(如 WebSphere Application Server)中的应用程序间共享公共数据访问层代码。此方法还说明了如何在线程本地存储区中使用全局连接工厂和缓存连接,从而支持在配置为使用多处理线程的 Broker 流中获得良好的性能。

受支持的平台

本文中描述的配置已经过以下产品测试:

  • WebSphere Message Broker V6.0.0.1
  • WebSphere MQ V6.0.2
  • DB2® Universal Database V8.2

在以下平台进行了测试:

  • Microsoft® Windows®(仅限 32 位版本):
    • Windows XP Professional SP1 或更高版本
    • Windows Server 2003 Standard Edition 或 Enterprise Edition
  • IBM AIX® V5.3

开发消息流来使用 JDBC XA

通过使用本文中描述的技术和配置,Broker 可以实现与使用 Type 2 JDBC 驱动程序的 DB2 数据库协调工作的 XA 事务。通过使用 WebSphere MQ 的工具作为分布式平台上的全局事务协调器来协调 Message Broker MQ 操作和 JDBC 操作,从而实现 JDBC XA 支持。这之所以可行,是因为对于使用协调事务配置的消息流,消息流处理线程将在开始工作流的单元前对 DB2 执行事务协调缺省登记操作。正如下面约束与限制部分中所述,消息流必须仅访问 JDBC XA 数据源和 MQ 队列,而不是其他 ODBC 数据源或资源管理器。

在 Java Compute 节点中使用 Java JDBC T2 XA

为了获得由 WebSphere MQ Transaction Processor 进行协调的 JDBC 操作,必须通过 DB2 JDBC Type 2 驱动程序的 DriverManager Java 接口创建 JDBC 数据源,如 DB2 信息中心中的 How DB2 applications connect to a data source using the DriverManager interface 中所述。必须在执行消息流的每个唯一 Broker 线程的 Java Compute Node 类的 evaluate() 方法开始时获得数据库连接。获得连接后,应该缓存连接,并在每次相同的线程执行 evaluate() 方法来执行工作单元时进行重用。有关更多信息,请参见下面的使用全局连接工厂和线程级别连接缓存

正如下面约束与限制中所述,这些 JDBC 连接一定只能在不包括拥有 ODBC 的节点的消息流中使用,如 Compute 节点和 Database 节点。以下是典型的数据库连接技术:

	/**
	 * createConnection - Connect a thread to the JDBC data source
	 *
	 * @param DB2DB_Name
	 * @param DB2User_ID
	 * @param DB2User_PASSWORD
	 * @return Connection
	 * @throws DB2JdbcXaTestException
	 */
	private Connection createConnection(String DB2DB_Name,
	                                    String DB2User_ID,
	                                    String DB2User_PASSWORD)
	throws DB2JdbcXaTestException {
		Connection jdbcXaConn = null;
		try {
			// NB Must only use DB2 App type 2 classes
			Class.forName("COM.ibm.db2.jdbc.app.DB2Driver").newInstance();
			// URL form is jdbc:db2:dbname
			String url = "jdbc:db2:" + DB2DB_Name;

			Properties props = new Properties();
			props.put(new String("user"), DB2User_ID);
			props.put(new String("password"), DB2User_PASSWORD);

			jdbcXaConn = DriverManager.getConnection(url, props);

		} catch (SQLException e) {
                  // TODO Handle failure
			System.err.println(e.toString());
			e.printStackTrace();
		} catch (Exception e) {
                  // TODO Handle failure
			System.err.println(e.toString());
			e.printStackTrace();
		}
		return jdbcXaConn;
		}

Java Compute Node 类中的 Java 代码然后可以像任意正常 JDBC 代码一样使用返回的 Connection 对象。现在没有必要对此连接调用 commitrollback,因为在 Broker 消息流作为消息流处理的一部分内部调用 MQBEGINMQCOMMIT/MQROLLBACK 时,这些工作将由 MQ Transaction Processor 作为全局协调工作单元进行执行。对于启用了“coordinated transaction”的消息流,MQ Transaction Processor 将登记/提交/回滚对工作单元内的 Connection 对象进行的任何工作。事实上,对返回的 Connection 对象调用本地提交或回滚将导致 DB2 JDBC 驱动程序引发异常:

COM.ibm.db2.jdbc.DB2Exception: [...] SQL0926N SQL ROLLBACK invalid for application execution environment. SQLSTATE=2D521

修改消息流上所需的属性

将使用 JDBC 通过 Java Compute 节点访问 DB2 且要由流中的 MQ 操作进行协调的消息流必须在 Message Broker 中设置为使用事务:在 flow MQInput node properties 面板的 Advanced 选项卡上,确保 Transaction Mode 设置为 YesAutomatic

修改 BAR 文件上所需的属性

通过使用 Message Broker Archive (BAR) 编辑器,可编辑消息流的属性。添加包含使用 DB2 JDBC XA 的 Java Compute 节点的流时,必须将流设置为使用协调事务:

  1. 单击 Configure 选项卡。
  2. 为要使用 XA 协调机制的每个消息流选中对应的 Coordinated Transactions
  3. 保存并部署 BAR 文件。

如果在 BAR File Editor 中删除了 BAR 文件的内容,将必须重新选中流的 Coordinated Transactions。Java Compute Node 类可使用以下代码以编程的方式验证流已设置了此必需属性,确保业务逻辑将在保证协调事务完整性的前提下执行:

MbMessageFlow ourFlow = this.getMessageFlow();
flowName = ourFlow.getName();

// Enforce XA processing - validate flow has been
// deployed with Coordinated Transaction property set
if (!ourFlow.isCoordinatedTransaction()) {
	// TODO Flag deployment properties error via throw MbUserException
}

使用和配置 AIX

本部分详细说明在 WebSphere Message Broker 中为受支持的 AIX 平台设置和使用 DB2 JDBC "App" Type 2 XA 支持的步骤。有关此过程的更多信息,请参见 Message Broker 信息中心中的 Configuring global coordination with DB2 using a 64-bit queue manager

步骤 1. 创建 64 位 DB2 实例

WebSphere MQ V6 所使用的 XA 事务协调需要数据源位于 64 位 DB2 上。

步骤 2. 将 DB2 的连接方法设置为 TCP

在 AIX 上,DB2 仅限于十个共享内存分段,而这可能会限制能够建立的 DB2 JDBC 连接,从而导致在 Java Compute 节点通过 JDBC 连接到 DB2 时出现 DB2 错误消息 SQL1224N。为了克服此错误,系统需要配置为通过 TCP 连接到 DB2。有关此设置过程的更多信息,请参见 Message Broker 信息中心的 DB2 error message SQL1224N is issued when you connect to DB2。遵循这些步骤,并确保在下面的步骤中正确使用配置好的数据库名称及其别名。另外,确保 DB2 TP_MON_NAME 属性设置为 MQ。仅在 Windows 上需要该设置(有关更多信息,请参见下面“使用和配置 Windows 平台”中的步骤 1)。

步骤 3. 安装 DB2 XA 交换加载库(32 位和 64 位)

在此配置中,MQ Transaction Processor 需要安装一组库,以对 DB2 进行 XA 协调调用。可通过在 MQ 安装树中提供“交换加载库”文件来达到此目的。为 MQ 中这些文件提供了源代码和构建版本,但 Message Broker 提供了可在系统将 DB2 安装在缺省位置时使用的预构建文件。要安装这些预构建 DB2 交换加载库,请执行以下步骤:

  • 32 位交换加载库;将 /opt/IBM/mqsi/6.0/sample/xatm/db2swit 复制到 /var/mqm/exits/db2swit
  • 64 位交换加载库;将 /opt/IBM/mqsi/6.0/sample/xatm/db2swit64 复制到 /var/mqm/exits64/db2swit(此命令还将重命名文件,以在安装到 exits64 时删除 64)。

如果 DB2 未安装在缺省位置,必须从 MQ 中提供的源代码构建交换加载文件:

  1. 作为 root 登录。
  2. 如果 DB2 未安装在缺省位置,则导出 DB2_HOME=/<alternate location>/sqllib
  3. 转到目录 /usr/mqm/samp/xatm
  4. 发出以下命令,以构建库并将它们安装在 db2switdb2swit64 目录中: make -f xaswit.mak db2swit

步骤 4. 为数据库配置 ODBC .ini 文件(32 位和 64 位)

由于 Message Broker 是 32 位进程,而 MQ Queue Manager 是 64 位进程,因此必须同时在 ODBC .ini 文件中提供 32 位 和 64 位数据源驱动程序信息。在选择的位置(如代理服务用户主目录)创建 ODBC .ini 文件。通过这样,可让系统上的每个代理服务用户 ID 使用自己的数据源定义。

第一步是使用 /var/mqsi/odbc/.odbc.ini 处的模板创建并编辑 32 位 ODBC .ini 文件。Message Broker 数据库别名必须包括在此文件中。在下面的示例中,数据库 WBRKBDB 采用了用于 TCP 连接的别名 WBRKBDBT。该示例还给出了采用 ODBC 进行访问的数据库,该数据库的 ODBC 数据源名称(Data Source Name,DSN)为 ODBCDB,对于 TCP 连接的别名为 ODBCDBT

[ODBC Data Sources]

WBRKBDBT=IBM DB2 ODBC Driver
ODBCDBT=IBM DB2 ODBC Driver

[WBRKBDBT]
Driver=/usr/opt/db2_08_01/lib/libdb2.a
Description=WBRKBDB Database alias
Database=WBRKBDBT

[ODBCDBT]
Driver=/usr/opt/db2_08_01/lib/libdb2.a
Description=ODBCDB Database alias
Database=ODBCDBT

[ODBC]
Trace=0
TraceFile=/var/mqsi/odbc/odbctrace.out
TraceDll=/opt/IBM/mqsi/6.0/merant/lib/odbctrac.so
InstallDir=/opt/IBM/mqsi/6.0/merant
UseCursorLib=0
IANAAppCodePage=4

接下来,创建 64 位 ODBC .ini 文件:.odbc64.ini(如果尚不存在),其创建方法如 Message Broker 信息中心中 odbc64.ini 示例文件中所示。Message Broker 数据库别名必须包括在此文件中。在下面的示例中,数据库 WBRKBDB 采用了用于 TCP 连接的别名 WBRKBDBT。该示例还给出了采用 ODBC 进行访问的数据库,该数据库的 ODBC DSN 为 ODBCDB,对于 TCP 连接的别名为 ODBCDBT

  1. 如果有必要,复制 /var/mqsi/odbc/.odbc.ini 并对其重命名,从而创建名为 .odbc64.ini 的文件。
  2. 编辑 .odbc64.ini 以更改 64 位库 (libdb2Wrapper64.so) 的驱动程序,并按照下面所示替换 [ODBC] 内容:
    [ODBC Data Sources]
    
    WBRKBDBT=IBM DB2 ODBC Driver
    ODBCDBT=IBM DB2 ODBC Driver
    
    [WBRKBDBT]
    Driver=libdb2Wrapper64.so
    Description=WBRKBDB Database alias
    Database=WBRKBDBT
    
    [ODBCDBT]
    Driver=libdb2Wrapper64.so
    Description=ODBCDB Database alias
    Database=ODBCDBT
    
    [ODBC]
    Trace=0
    TraceFile=/var/mqsi/odbc/odbctrace.out
    TraceDll=/opt/IBM/mqsi/6.0/DD64/lib/odbctrac.so
    InstallDir=/opt/IBM/mqsi/6.0/DD64
    UseCursorLib=0
    IANAAppCodePage=4
    UNICODE=UTF-8
    

  3. 最后,确保 ODBC .ini 文件具有正确的所有关系和权限。作为 root 对文件执行以下命令:
    chown mqm:mqbrkrs .odbc.ini .odbc64.ini
    chmod 664 .odbc.ini .odbc64.ini
    

步骤 5. 设置所需的环境变量

对于所使用的任何控制台会话,请确保设置了以下环境变量:

  1. $ODBCINI 指向在前一步骤中创建的 .odbc.ini 文件: export ODBCINI64=/<path>/.odbc64.ini
  2. $ODBCINI64 指向在前一步骤中创建的 .odbc64.ini 文件: export ODBCINI64=/<path>/.odbc64.ini
  3. MQSI_LIBPATH64 包括常规 64 位 DB2 数据库的库。假定实例名/所有者为 db2inst1,则导出命令将与以下所示类似: export MQSI_LIBPATH64=/db2data/db2inst1/sqllib/lib64:$LD_LIBRARY_PATH.

不要直接更改产品 mqsiprofile 环境设置脚本,因为如果应用维护,可能会将其重写。不过,如果将名为 *.sh 的任意文件放入该位置中,mqsiprofile 可以为您调用任何其他用户编写的脚本。
<Workpath>/common/profiles
其中 <Workpath> 通过环境变量 MQSI_WORKPATH 定义。有关更多细节,请参见 Message Broker 信息中心的 Command environment: ® ® 。此步骤仅在使用 64 位执行组时是必需的,但最好是在稍后添加 64 位执行组的情况下完成配置。

步骤 6. 配置 Message Broker 队列管理器,从而为 DB2 JDBC 使用 XA

需要告知 WebSphere MQ 其将通过 Transaction Processor 进行协作的数据库的情况。可以通过为定义资源类型的数据库配置 XA 资源配置记录来完成此工作,例如,通过使用之前为支持通信而准备的交换加载文件。

这些指令用于 DB2,且假定 WebSphere MQ 和 WebSphere Message Broker 安装在缺省位置。数据库名称显示为 <jdbc database TCP alias name>,需要替换为用户 ID 和密码,以与您环境中的设置匹配。每个定义的 XA 资源都必须具有唯一的数据库名称,此名称需与相关代码建立到数据源的连接时使用的名称匹配:

  1. 停止 Broker 和任何访问 Broker 的队列管理器的应用程序。
  2. 停止 Broker 的对象管理器。
  3. 编辑以下文件: /var/mqm/qmgrs/<queue manager name>/qm.ini
  4. 将以下代码片段添加到 DB2 JDBC App Type 2 XA 对应的文件中:
    XAResourceManager:
    Name=<jdbc database TCP alias name>
    SwitchFile=db2swit
    XAOpenString=db=<database name>,uid=<userid>,pwd=<password>,toc=t
    ThreadOfControl=THREAD
    

  5. 重新启动 Broker 的队列管理器。
  6. 检查以下位置的错误:/var/mqm/qmgrs/<queue manager name>/errors
  7. 检查以下位置的错误:/var/mqm/errors
  8. 检查系统日志中的错误:/var/adm/syslog/user.log。(如果日志不位于此位置,则请查看 /etc/syslog.conf 底部,以找到其写入位置。)
  9. 由于 XA 所进行的实际测试在队列管理器上正常工作,因此可以在上面交换文件之一的 XAOpenString 中使用错误密码,并进行检查,会发现在尝试进行重新启动时在队列管理器日志中出现了错误。请记得稍后对此进行更正。

步骤 7. 修改 32 位 Broker 库路径

在启动 Message Broker 前及运行 db2profile 后,必须完成这些步骤,以便 Broker 能找到 DB2 必需的 32 位库文件。这些步骤假定实例名称/所有者为 db2inst1

	export LD_LIBRARY_PATH=/db2data/db2inst1/sqllib/lib32:$LD_LIBRARY_PATH
export LIBPATH=/db2data/db2inst1/sqllib/lib32:$LIBPATH

使用和配置 Windows

本部分详细说明在 WebSphere Message Broker 中为受支持的 Windows 平台设置和使用 DB2 JDBC "App" Type 2 XA 支持的步骤。

步骤 1. 更新 DB2 的 TP 监视名称,以使用 WebSphere MQ

需要告知 DB2 实例,WebSphere MQ 将提供 Transaction Processor。进行以下配置更改,以更改到将提供可通过 DB2 JDBC XA 连接从 Message Broker Java Compute 节点访问的数据库的任意 DB2 实例。

在 DB2 命令窗口中,输入以下命令,以将 TP 监视名称更新为 WebSphere MQ。仅在 Windows 上需要进行以下一次性设置。

db2 list applications
db2 update dbm cfg using TP_MON_NAME MQ
db2 get dbm cfg | more
db2stop
db2start

步骤 2. 配置队列管理器,以提供所需的数据库的 XA 资源管理

需要告知 WebSphere MQ 将通过其 Transaction Processor 进行协作的数据库的情况。可以通过为定义资源类型的数据库配置 XA 资源配置记录来实现此目的。DB2 提供了支持通信所需的必要交换加载文件。这些指令假定 WebSphere MQ 和 WebSphere Message Broker 安装在 C 驱动器的缺省位置。数据库名称显示为 <dbname<,您需要对其进行修改,以与环境中的设置匹配。每个定义的 XA 资源都必须具有唯一的数据库名称,此名称需与相关代码建立到数据源的连接时使用的名称匹配。

  1. 停止 Broker 和任何访问 Broker 队列管理器的应用程序。
  2. 停止 Broker 的对象管理器。
  3. 对于 DB2 JDBC Type 2 "App" 接口,请将 db2swit.dllC:\Program Files\IBM\MQSI\6.0\sample\xatm\db2swit.dll 复制到 C:\Program Files\IBM\WebSphere MQ\exits
  4. 在 MQ Explorer 中,右键单击 Broker 队列管理器,并选择 Properties
  5. 单击 'A resource managers => Add 并按照以下所示填写表格:

    AddXAresource 对话框

  6. 重新启动队列管理器。如果未能通过 GUI 启动队列管理器,请转而使用命令行。
  7. 检查以下位置的错误:C:\Program Files\IBM\WebSphere MQ\Qmgrs\<queue manager name>\errors
  8. 检查以下位置的错误:C:\Program Files\IBM\WebSphere MQ\errors
  9. 检查 Windows 事件日志中的错误。
  10. 由于 XA 所进行的实际测试在队列管理器上正常工作,因此可以在上面交换文件之一的 XAOpenString 中使用错误密码,并进行检查,会发现在尝试进行重新启动时在队列管理器日志中出现了错误。请记得稍后对此进行更正。

使用全局连接工厂和线程级别连接缓存

如上面所述,当包含使用此 JDBC 访问方法的 Java Compute 节点的 Message Broker 流为了实现伸缩性而需要并发处理多个事务时,将使用其他实例对其进行部署,以提供线程池。即,将在 BAR File Editor 中设置 additionaInstances 属性来定义在池中可用的线程数量。在这种情况下,必须确保从 JDBC Type 2 驱动程序获得的 JDBC 连接句柄仅获取一次,并在池中每个可用线程处理的后续事务上进行重用。有关更多信息,请参见 Message Broker 信息中心的 Configurable message flow properties。此处给出的解决方案通过两个阶段处理该需求:

  1. 提供了独立连接工厂来确保池中的每个线程仅连接一次。工厂类保留已连接的线程的静态映射,可以在线程进行调用时返回现有连接句柄,或创建并存储连接句柄。
  2. 我们将说明如何利用 Java ThreadLocal 来从连接工厂提供连接句柄的每个线程缓存,以避免每次各个线程处理新事务时锁定带来的开销。

我们之所以需要这个两个级别的实现,目的是为了方便关闭连接。当删除 Broker 消息流时,或所属执行组结束时,Broker 并不会对池中处于活动状态的每个线程调用 termination 方法。因此,如果我们只有数据库连接句柄的 ThreadLocal 缓存,我们将不能发起对所有开放句柄的关闭操作。通过使用基础独立连接工厂,可使用其 onDelete() 方法来提供可关闭所有开放数据库句柄的点,并确保能够干净地关闭数据库管理器。

独立连接工厂可以使用 Runnable 接口进行扩展,以提供基于非活动超时的连接关闭。但我们在代码中尚未实现此功能。

首先,我们将讨论连接工厂独立类 DB2JdbcXaThreadConnectionFactory。提供了单个公共方法来允许线程获得在该线程上使用的连接句柄:

/**
 * Retrieve an existing connection for this thread to this DB2 instance or
 * create and store a new connection
 *
 * @param DB2DB_Name
 * @param DB2User_ID
 * @param DB2User_PASSWORD
 * @return
 * @throws DB2JdbcXaTestException
 */
public static Connection getConnection(String DB2DB_Name,
		String DB2User_ID, String DB2User_PASSWORD)
		throws DB2JdbcXaTestException {
	DB2JdbcXaThreadConnectionFactory instance = getInstance();
	return instance.getCreateConnection(DB2DB_Name, DB2User_ID,
			DB2User_PASSWORD);
}

getInstance() 方法使用标准独立模式来检索或创建 DB2JdbcXaThreadConnectionFactory 类实例。 getCreateConnection() 方法使用静态哈希表来确定是否可以返回现有连接,或需要进行创建。

/**
 * Retrieve an existing connection for this thread to this DB2 instance or
 * create and store a new connection
 *
 * @param DB2DB_Name
 * @param user_ID
 * @param user_PASSWORD
 * @return Connection
 * @throws DB2JdbcXaTestException
 */
private Connection getCreateConnection(String DB2DB_Name, String user_ID,
		String user_PASSWORD) throws DB2JdbcXaTestException {
	Connection connObj = null;
	StringBuffer connectionKeyStrBuff = new StringBuffer();
	connectionKeyStrBuff.append(Thread.currentThread().getName());
	connectionKeyStrBuff.append('#');
	connectionKeyStrBuff.append(DB2DB_Name);
	connectionKeyStrBuff.append('#');
	connectionKeyStrBuff.append(user_ID);
	String connectionKeyString = connectionKeyStrBuff.toString();
	synchronized (connections) {
		connObj = (Connection) connections.get(connectionKeyString);
		if (connObj == null) {
			if (isDebugEnabled) {
				System.out.println("Create Connection: "
						+ connectionKeyString);
			}
			connObj = createConnection(DB2DB_Name, user_ID, user_PASSWORD);
			connections.put(connectionKeyString, connObj);
		}
	}
	return connObj;
}

如果尚不存在此数据和线程的连接,则将调用 createConnection() 方法来进行连接。

接下来,考虑如何从 Java Compute 节点使用此连接工厂独立类 DB2JdbcXaThreadConnectionFactory 并缓存到 Java ThreadLocal 中。有关 ThreadLocal 的用法的讨论,请参见 Exploiting ThreadLocal to enhance scalability。在 DB2JDBCXaJavaComputeBase 类中,通过扩展 MbJavaComputeNode 来实现 Broker Java Compute 节点。在此类中,将定义类 ThreadLocalConnection 来扩展 ThreadLocal 并实现 initialValue() 方法来从我们的连接工厂独立类 DB2JdbcXaThreadConnectionFactory 获取连接句柄。

// JCN Class object instance thread cache of the global DB2 instance
// connection
// obtained for each thread that executes the JCN in this flow
private class ThreadLocalConnection extends ThreadLocal {
	public Object initialValue() {
		Connection jdbcXaConn = null;
		try {
			if (isDebugEnabled) {
				System.out
						.println("Obtain thread local connection for Flow "
								+ flowName);
			}
			jdbcXaConn = DB2JdbcXaThreadConnectionFactory.getConnection(
					DB2DB_Name, DB2User_ID, DB2User_PASSWORD);

		} catch (DB2JdbcXaTestException e) {
			System.err.println("Failed getConnection(): " + e.toString());
			e.printStackTrace();
			noXaConnCause = e;
		}
		return jdbcXaConn;
	}
}

因此我们可以在每个消息处理调用 Java Compute 节点 DB2JDBCXaJavaComputeBase 类的 evaluate() 方法期间通过 ThreadLocal 对象的 get() 方法直接获得数据库连接句柄供使用,而这将在池中的每个线程的第一次调用时调用 initialValue() 方法:

/**
 * Obtain the DB2 JDBC Connection either from the local class threadlocal or
 * from the global DB2JdbcXaThreadConnectionFactory
 *
 * @return Connection
 * @throws DB2JdbcXaTestException
 */
public Connection getConnection() throws DB2JdbcXaTestException {
	Connection xaConn = (Connection) conn.get();
	if (xaConn == null) {
		throw noXaConnCause;
	}
	return xaConn;
}

实际运用

随本文提供的下载文件是 Message Broker Toolkit 项目交换文件,提供了以下所示的简单消息流:

所提供的示例消息流

此消息流包括 Java Compute 节点,该节点使用所介绍的技术来支持数据库表的简单插入操作。要运行此示例,请执行以下操作:

  1. 下载 ZIP 文件,然后使用 File => Import => Project Interchange,将其作为 Porject Interchange 导入您的 Message Broker Toolkit。
  2. 打开 MB_JCN_JDBCXA 项目中的消息文件 MB_JCN_JDBCXA.msgflow 和 Java 类 JavaComputeJdbcXa(该类实现 Java Compute 节点)。对其进行复查,注意 JavaComputeJdbcXa.evaluate() 方法所进行的工作:
    • 确保消息流经过验证(请参见 doValidation())。
    • 直接将输入消息复制到输出中。
    • 获取 JDBC 连接(请参见 getConnection())。
    • 在 DB2 示例数据库 SAMPLEEMPLOYEE 表中插入行(请参见 insertEmployee())。
    • 执行测试,验证连接的确是采用 XA。
  3. 按照上面所讨论的为协调 JDBC 事务准备数据库系统。如果在未针对 XA 协调进行设置的情况下部署和运行流,Message Broker 将发出错误:
    "BIP2633" Warning when starting transaction coordinated by WebSphere MQ;
    MQBEGIN failed: MQCC=1; MQRC=2122; node 'MB_JCN_JDBCXA.MQInput.

    ,其中 MQRC 为:
    C:\Program Files\IBM\MQSI\6.0.0.3>mqrc 2122
    	2122  0x0000084a  MQRC_PARTICIPANT_NOT_AVAILABLE
    C:\Program Files\IBM\MQSI\6.0.0.3>
    

  4. 在数据库中创建带 EMPLOYEE 表的 DB2 SAMPLE 数据库。MB_JCN_JDBCXA 项目中还提供了 Database\employee.ddl 文件。
    db2 => CREATE TABLE "EMPLOYEE"  ("EMPNO" CHAR(6) NOT NULL , "FIRSTNME" VARCHAR(12) NOT
    NULL ,"MIDINIT" CHAR(1) NOT NULL ,  "LASTNAME" VARCHAR(15) NOT NULL ,  "EDLEVEL" SMALLINT
    NOT NULL  )
    DB20000I  The SQL command completed successfully.
    db2 => commit work
    DB20000I  The SQL command completed successfully.
    db2 => connect reset
    DB20000I  The SQL command completed successfully.
    

  5. 在 Broker Queue Manager 上为流创建输入和输出 MQ 队列。MB_JCN_JDBCXA 项目中提供了 Queues\queues.mqc 文件。
    def ql(MB_JCN_JDBCXA_IN) bothresh(3) replace
    def ql(MB_JCN_JDBCXA_OUT) replace
    

  6. 创建包含 MB_JCN_JDBCXA 消息流的 BAR 文件。然后设置 Coordinated Transaction 和 Additional Instances 属性:

    设置 Coordinated Transaction 和 Additional Instances 属性

    如果在未设置 Coordinated Transaction 的情况下部署流,Java 代码验证将引发用户异常,并导致拒绝所有消息输入:

    Java exception: 'com.ibm.broker.plugin.MbUserException'; thrown from class name:
    'com.ibm.mbJcnJdbcXa.JavaComputeJdbcXa', method name: 'doValidation', file:
    'JavaComputeJdbcXa.java', line: '128'; trace text: 'Incorrect setup Flow is not
    setup for coordinated Transaction';  resource bundle:
    'com.ibm.mbJcnJdbcXa.JcnJdbcXaMessages'; key: 'DB2JDBCXATEST_FAILURE';
    

    然后设置以下用户定义属性,从而向 Java 提供数据库名称及部署环境的访问凭证细节:

    恰当地设置数据库名称和访问凭据

  7. 将创建的 BAR 文件部署到代理,并检查是否存在任何错误消息。
  8. 将包含任意有效 XML 的测试消息放在流输入队列 MB_JCN_JDBCXA_IN 上。
  9. 确认消息得到了处理,并被复制到输出队列 MB_JCN_JDBCXA_OUT,而且在 SAMPLE 数据库的 EMPLOYEE 表中创建了新行。

    添加到 EMPLOYEE 表的行


    db2 => SELECT * FROM EMPLOYEE
    
    EMPNO  FIRSTNME     MIDINIT LASTNAME        EDLEVEL
    ------ ------------ ------- --------------- -------
    4131  MSGBRK       M       JDBCXA               22
    
    1 record(s) selected.
    
    db2 =>>
    

    如果未创建新行,则请检查系统事件日志确定原因。如果创建了新行,将使用 XA 协调执行事务,因为 Java 代码将通过确保拒绝本地回滚来执行设置的验证。

约束与限制

  • 必须使用 DB2 "App" JDBC Type 2 驱动程序的 DriverManager 接口获得协调连接——实例化 COM.ibm.db2.jdbc.app.DB2Driver 类的实例。所有通过这种方式建立的连接将基于在流上进行的 Coordinated Transaction 设置来进行协调。
  • 只有分布式平台可用于此配置中——此方法在 z/OS® 上不可行。我们只对 Windows 和 AIX 进行测试。
  • 混合 JDBC / ODBC 消息流不能用于此配置。
  • 在 Message Broker 线程上注册了 XA 资源后,就不能再次对其进行注册。如果尝试重新部署流,可能会得到 MQRC 2195 错误。解决方法是,直接将部署的子项从执行组删除,然后重新部署 BAR 文件。
  • 必须在此配置中使用的 DB2 type 2 "App" JDBC 类并不实现任何连接池操作,因此将为使用 Java Compute 节点(建立到 JDBC XA 源的连接)运行流的每个 Message Broker 线程保留 JDBC 连接。

结束语

本文介绍了如何在 WebSphere Message Broker V6 Java Compute 节点中实现访问 DB2 数据库的协调事务,从而支持采用标准 JDBC 编写代码,使得构建能在 Message Broker 和其他 Java 环境(如 WebSphere Application Server)中部署的公用 Java 数据访问层成为可能。本文还提供了将解决方案付诸实践的示例项目,能够部署此项目并验证其成功操作。

    • 评论
    • 分享微博
    • 分享邮件
    邮件订阅

    如果您非常迫切的想了解IT领域最新产品与技术信息,那么订阅至顶网技术邮件将是您的最佳途径之一。

    重磅专题
    往期文章
    最新文章