Jms集群的意义在于提升系统在处理消息时的并发能力,建立这样的集群,有三个步骤:
1、 配置jms消息持久化所使用的数据库
2、 配置分布式的jndi环境
3、 配置分布式jms
在jboss 4.0.2中,系统采用hibernate的方式来保存消息,所以能够兼容hibernate支持的所有数据库。Jboss默认采用hsql,在我们的例子中,将使用oracle 9.2。首先需要配置连接到数据库的jndi数据源。方法是把doc\examples\jca下的oracle-ds.xml文件拷贝到server\all\farm下,并且修改其中的参数,保证数据库能够正确连接。Cluster启动后,该文件能够通过jboss的farm服务,自动拷贝到其他集群节点,并且自动部署。假设jndi数据源的名称为:GlobalDS
将doc\examples\jms下的oracle-jdbc3-service.xml文件拷贝到server\all\deploy-hasingleton\jms目录下,并且删除该目录下的hsqldb-jdbc2-service.xml。修改oracle-jdbc3-service.xml,在56行左右指定name的值为数据源的名字:GlobalDS。这样系统会使用该数据源来保存jms消息。使用如下命令启动boss: run ?c all
启动完成后,正常情况下会发现oracle数据库中多出了三张表:
1、Jms_message_log 该表用于保存所有未处理的点对点消息,表结构是:
Messageid 消息id
Destination 目的地
Txid 事务id
Txop 消息操作类型(a为新增,d为删除)
Messageblob 消息内容
2、JMS_REFERENCE_LOG 用于保存所有未处理的topic消息,表结构是:
Messageid
Destination
Txid
Txop
Messageblob
Redelivered 消息是否被重发
3、JMS_TRANSACTION_LOG 用于保存处理消息过程中的一些重要的事务
需要注意的是,jboss 3.2之后就不在支持以文件形式保存消息,虽然这样最会比数据库操作快一倍以上。Jboss官方的解释是,使用文件会让系统不可靠。
客户端在发送jms消息的时候,首先需要向app server查询jndi,在jboss cluster中,jndi是作为一个分布式的singleton出现的。每个节点除了有自己的jndi环境以外,整个cluster还具有一些全局的jndi,客户端在进行jndi查询的时候,只需要向这个全局的jndi进行查询,cluster如果在全局jndi中找不到对应的jndi对象,就会按次序向每个节点询问,看他们的本地jndi中是否有匹配的对象,如果有则返回给客户,如果所有的节点都没有,则抛出异常。所有以all方式启动的jboss,都会打开1100端口,这个端口是全局jndi的入口,所有节点都是如此。
分布式的jndi有的节点有主次的区别,第一个启动的jboss是主服务器,它会保存所有的全局jndi,其他的节点如果收到客户查询jndi的请求后,都会向主服务器请求数据。如果主服务器不幸down掉,那么次节点会发现这个变化,然后启动自己的jndi环境,取代主服务器提供服务。
下面是配置jms的jndi,打开server\all\deploy-hasingleton\jms下的jbossmq-destinations-service.xml文件,增加一个名为test的destination,如下:
<mbean code="org.jboss.mq.server.jmx.Queue"
name="jboss.mq.destination:service=Queue,name=test">
<depends optional-attribute-name="DestinationManager">jboss.mq:service=DestinationManager</depends>
</mbean>
为了预防主服务器down了之后丢失该jndi,所以最好在每个节点都进行这个配置。
在jboss 4.0.2的默认配置下,是不支持消息bean的集群的,要达到这个目的,必须下载一个jar包才能实现,可以从这里获得: http://blog.yam.com/bromon/archives/489460.html
得到这个jar文件后,将它命名为cdot-jbossx.jar
文件放到server\all\deploy\jms下。下面编写消息bean,它的功能很简单,接收到来自test队列的消息后,打印消息id。
public class TestJmsBean
implements MessageDrivenBean, MessageListener {
MessageDrivenContext messageDrivenContext;
public void ejbCreate() {
System.out.println("消息bean创建");
}
public void ejbRemove() {
}
public void onMessage(Message msg) {
try
{
System.out.println(msg.getJMSMessageID());
}catch(Exception e)
{
e.printStackTrace();
}
}
public void setMessageDrivenContext(MessageDrivenContext messageDrivenContext) {
this.messageDrivenContext = messageDrivenContext;
}
}
把这个消息bean部署到server\all\farm目录下,它会被自动拷贝到cluster的其它节点,并且被自动部署,你会看到如下部署信息:
上面显示通过farm的方式,部署了一个名为GlobalDS的连接池,以及一个名为TestJms的消息bean。
下面写个客户端来测试一下:
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Properties p = new Properties();
p.put(Context.INITIAL_CONTEXT_FACTORY,
"org.jnp.interfaces.NamingContextFactory");
p.put(Context.URL_PKG_PREFIXES, "jboss.naming:org.jnp.interfaces");
p.put(Context.PROVIDER_URL, "172.16.0.116:1100"); // 全局jndi入口
InitialContext ctx = new InitialContext(p);
QueueConnectionFactory qcf = (QueueConnectionFactory) ctx.lookup(
"ConnectionFactory");
QueueConnection conn = qcf.createQueueConnection();
Queue q = (Queue) ctx.lookup("queue/test");//查询名为test的destination
QueueSession session = conn.createQueueSession(false,
QueueSession.AUTO_ACKNOWLEDGE);
conn.start();
QueueSender sender = session.createSender(q);
for (int i = 0; i < 10000; i++) {
TextMessage tm = session.createTextMessage(sdf.format(new Date()));
sender.send(tm, DeliveryMode.PERSISTENT, 4, 0);//发送持久化消息
System.out.print("第" + i);
}
conn.stop();
session.close();
conn.close();
执行一下,可以看到每个节点都创建了若干个消息bean,同时在处理消息,任意关闭一个次服务器,系统会自动fail over。查看Jms_message_log数据表,里面没有任何数据,表示所有的消息都已经被处理。
Jboss的jms cluster功能与websphere mq比较起来,是非常简陋的,可以配置的地方也很少,毕竟是免费的东西。Jboss的论坛上透露,在jboss 6.0中将会有全新的jboss messaging服务,不知要等到何年何月。针对这个cluster,我做过简单的测试,800万左右的消息数量,无一丢失,应该说还算比较可靠。响应时间也还过的去,在简单的网络环境下,能够应付比较高的并发。
查看本文来源