在最近两年里,Sun引入了很多针对企业应用程序开发的API。其中最令人兴奋的是Java消息服务或者JMS。JMS API是为了在企业中进行消息传递而设计的,其中,JNDI的任务是为目录服务命名和提供目录服务,而JNBC的任务是提供数据库访问。JMS是为了给企业消息传递提供一种通用工具而设计的API,它没有考虑将消息发送到任何一台应用服务器的底层实现,也没有考虑您希望使用的其他企业消息传递软件技术。对于那些创建或者使用面向消息的中间件(MOM)的开发人员,尤其是那些需要在自己的产品中使用这些工具的Java开发人员,这是一个很大的进步。通过JMS,您应该能够编写一组针对JMS API进行消息传递的代码,然后在任何提供JMS支持的消息传递系统中使用这些代码。
在本文中,我将向您展示如何创建一系列的消息生产者和消费者,他们利用了JMS API的大部分函数来使用持久存储的主题和队列、创建临时目的地、通过消息选择器过滤消息,等等。我将在JMS的第一个供应商的实现中实现这些例子,该实现是随BEA的WebLogic应用服务器一起提供的(关于该服务器以及配置JMS的详细信息,可以从www.JavaDevelopersJournal.com的“install.txt”在线文件中获得)。在这一过程中,我将带您一起经历实现这些应用程序的必要步骤,其中包括修改应用服务器来支持您将创建的示例代码。
在继续创建下面的示例代码之前,建议您先获得并安装最新版本的BEA/WebLogic应用服务器。可以从BEA的网站http://ecommerce.bea.com/index.jsp下载免费使用试用版。
代码
包括在线的源文件在内,总共有4个源文件展示了如何在JMS中使用两种不同的消息传递类型:发布/订阅和点对点。Sender.java和Receiver.java展示了如何在JMS中使用Queue进行点对点消息传递,而Publisher.java和Subscriber.java展示了如何使用Topic进行发布/订阅消息传递。“readme.txt”文件(也在网上)包含运行各种应用程序的命令以及有关代码的最新信息。在尝试运行这些应用程序之前,一定要先阅读“install.txt”文件,并根据指示对WebLogic服务器进行必要的修改。因为示例代码中还包含一些注释和JMS的更多细节,本文无法完全介绍这些细节,因此,我们鼓励读者检查源代码并运行这些应用程序。
Java消息服务
Java消息服务被设计为一组接口,这些接口将由MOM供应商和其他希望支持消息传递的供应商(应用服务器供应商、数据库服务器供应商,等等)实现。这些接口为希望实现代码的客户应用提供了一个公共API,针对可能的、给定数量的底层消息传递系统进行消息传递。
同时,JMS的设计非常灵活,它可以在保持可移植性的同时提供对现有消息传递系统的广泛支持。因此,它不支持每个消息传递系统中每种可能的消息传递选择。尽管您可能熟悉任何给定的MOM产品,但JMS不会自动支持该产品的每一个方面。
JMS中的主要概念是Destination。Destination仅仅是消息生产者与消息消费者之间的一种关联。Destination分为两种类型:Topic和Queue。为每种Destination类型定义的单独接口允许供应商支持其中一种类型或者两种类型都支持,这取决于供应商的消息传递工具。Topic是为发布/订阅消息传递而设计的,而Queue是为客户对客户(点对点)的消息传递设计的。这两种Destination类型都支持持久性。JMS还提供了对事务的支持。事务使您能够支持消息传递操作的提交和回滚。因此,如果事务中的操作失败了,那么您可以对发生在该事务中的所用消息传递操作执行回滚。同样地,如果每件事都很顺利,那么您可以提交这些消息传递操作,使它们持久可用。因为JMS事务超出了本文的讨论范围,所以我将让您自己去研究JMS的这些方面。
初始化JMS
连接工厂
要初始化JMS,则需要使用ConnectionFactory,它是一个标记接口,其中没有任何方法,并且可以通过TopicConnectionFactory或者QueueConnectionFactory得以扩展。供应商实现这些工厂接口中的一个或者两个接口均实现,以提供对其特定消息传递服务实现的访问。WebLogic提供了每种Factory类型的一般实现。Factory,以及Topic和Queue,都被认为是“托管”对象,因此它们都是WebLogic服务器在开机时创建的(在我们的例子中,我们将使用WebLogic的JMS实现)。然后,可以通过JNDI检索这些托管对象。还可以使用JNDI命名上下文来检索这些托管对象。
如果这看起来有点让您感到迷惑,不要担心,示例代码会使您很容易理解它们。您还可以在WebLogic中创建自己的Factory对象,而不是使用默认的Factory,这项操作是在weblogic.properties文件中完成的。用户定义的Factory将在后面进行讨论,这一节主要关注Topic,定义自己的Factory的详细信息包含在“install.txt”文件中,该文件以及本文的源代码都可以在网上找到。
以下代码来自Sender.java示例,它将向您展示如何初始化JMS,以及如何从JNDI中获得Queue的ConnectionFactory:
public final String JMS_FACTORY = "javax.jms.QueueConnectionFactory";
...
queueFx = (QueueConnectionFactory) initCtx.lookup(JMS_FACTORY);
Queue的默认ConnectionFactory的名称被传递到初始JNDI命名上下文的lookup()方法中。然后会把对QueueConnectionFactory实现的引用从WebLogic应用服务器返回给我们(Sender.java 示例代码中的getInitialContext()方法展示了初始化JNDI和从WebLogic中获得初始命名上下文的细节。您可以从Sun的网站获得关于API的详细信息。网址是:http://java.sun.com/products/jndi/1.2/javadoc/index.html)。
因为ConnectionFactories 是托管对象,所以由WebLogic应用服务器负责为Queue、Topic以及所有用户定义的Factory创建默认ConnectionFactories,并将它们与WebLogic JNDI实现中的名称绑定在一起,以便通过客户端代码进行查找。因此,在JNDI中,Queue的默认ConnectionFactory的名称是“javax.jms.QueueConnectionFactory”,在通过JNDI查找该名称的时候,将返回一个对这个接口的默认实现的引用,该引用是由WebLogic提供的。Topic也有一个默认的ConnectionFactory,它遵循相同的格式,即“javax.jms.TopicConnectionFactory”。通过JNDI进行查找将返回一个对TopicConnectionFactory默认实现的引用,它也是WebLogic提供的。
连接
在成功创建正确的ConnectionFactory后,下一步将是创建一个“连接”,它是JMS定义的一个接口,表示连接到底层消息传递供应商的客户机连接。ConnectionFactory负责返回可以与底层消息传递系统进行通信的Connection实现。通常客户机只使用单一连接。根据JMS文档,Connection的目的是“利用JMS提供者封装开放的连接”,以及表示“客户机与提供者服务例程之间的开放TCP/IP套接字。该文档还指出Connection应该是进行客户机身份验证的地方,除了其他一些事项外,客户机还可以指定惟一标志符。像ConnectionFactories一样,Connections也有两种类型,这取决于您将使用的Destination类型。QueueConnection和TopicConnection 都扩展了基本的Connection接口,通常,您只能使用其中的一个,使用哪一个则取决于客户将使用的消息传递方式。Connection的创建是通过ConnectionFactory完成的。Connection包含两种重要的方法:start和stop,开始和停止在连接过程中发送和接收消息。请参见清单1中的完整代码块。
会话
一旦从ConnectionFactory中获得一个Connection,就必须从Connection中创建一个或者多个会话。Session也是基本接口,同时,有两种扩展基本接口的特定于Destination的接口:QueueSession和TopicSession,它们可以提供特定于Queue或者Topic的Session方法。Session是为Destination类型生产消息的消费者或生产者的工厂的一种类型(如果Destination类型是QueueSession,那么它将创建面向Queue的生产者和消费者;如果Destination类型是TopicSession,那么它将创建面向Topic的生产者和消费者)。
Session可以被事务化,也可以不被事务化,通常,您可以通过向Connection上的适当创建方法传递一个布尔参数对此进行设置。同样,您也可以向Connection的Session创建方法传递一个参数,该方法可以设置将创建的Session的消息确认模式,而该模式将指定是由客户机还是由消费者来确认它所检索的任何信息(如果使用事务机制,则忽略该参数)。Session提供的其他方法是各种消息创建方法,这些方法允许您创建包含文本、字节、属性甚至串行化Java对象的特定类型的JMS消息(有关的更多信息在下面的Message接口上)。图1是实际JMS接口的继承和创建关系的图表,请参见清单1中的代码。
(暂缺图)
目的地(Destination)
在创建任何消息的生产者或消费者之前,必须要有特定的Destination,通过它您可以将任何生产者与消费者联系起来。记住,Destination是托管对象,类似ConnectionFactories。这意味着Destination是由WebLogic维护的,必须通过JNDI查找来检索它。这还意味着,在这种情况下,Destination必须是事先定义好的。但这并不是说您总是要提前创建Destination。JMS API提供了创建临时Destination的能力,但这个Destination只能在创建它的Session的生存周期中使用,JMS API也能在运行时创建永久性Destination。然而,在JMS当前的WebLogic实现中,应该注意的是,如果通过Session创建Destination,那么只要WebLogic服务器在运行,这些Destination就会存在。如果服务器出故障或者死机了,那么Destination也会不存在。创建真正的永久性Destination的方法是在weblogic.properties文件中创建它(请参阅“install.txt”文件,以获得关于如何做到这一点的详细信息)。
鉴于本文的目的,我们的Destination是通过weblogic.properties文件提前创建的。该文件中定义的Destination是WebLogic应用服务器在开机时创建的,并且可以通过JNDI用于客户代码。清单1显示了Send.java文件中的代码,展示了如何创建QueueConnection和QueueSession,以及如何像检索Sender应用程序的包层次结构那样检索Destination,我们在weblogic.properties文件中定义的队列的名称为“jdj.article.queue.sender”。
消息的消费者和消息的生产者
JMS初始化过程的最后一个阶段是创建MessageConsumers和MessageProducers。像 ConnectionFactory一样,它们也有Connection 和Session两个基本接口,为了使用Topic或者Queue,还有一些扩展这两个基本接口的特定于Destination的基本接口。(我在使用MessageProducer时使用Producer这个术语,在使用MessageConsumer时使用Consumer这个术语)。MessageConsumers用于检索发送到Destination的消息,MessageProducers用于将消息发送到Destination。二者均由Session实例创建。MessageProducer由特定于Destination的接口QueueSender和TopicPublisher扩展。MessageConsumer则由QueueReceiver和TopicSubscriber接口扩展。
一旦已经创建了自己的MessageConsumer和/或MessageProducer,也就做好了接收和/或发送消息的所有准备。因为生产者和消费者的创建是特定于Queue或Topic的,所以我将在下面的相关小节中讨论特定于Destination类型的这两种类型的处理。
消息
在进入下一步并开始深入探讨Topic和Queue中的消息发送和接收之前,我们需要讨论另一个接口,即Message,它表示JMS消息本身。这个对象包含将发送到Destination的信息,以及正在Destination上进行监听的消费者接收的信息。Message是由会话实例创建的,它由三个部分组成:消息头、属性和消息体。消息头是用来识别和路由消息的,客户端开发人员通常不会看到或者处理消息头信息。属性支持通过消息传递的特定于应用程序的值。这些属性字段是预先定义的,对它们的完整描述可以在JMS文档中找到。我将在示例代码中使用一些属性。消息的主体部分是消息的实际“有效负载”,它有5种类型,可以通过5个特定的接口来表示它们:StreamMessage、MapMessage、ObjectMessage、TextMessage 和BytesMessage(参见图1)。
持久性
JMS还有一个重要方面值得讨论:它支持消息的持久传递。很简单,这意味着当消息发送到Destination时,如果Consumer没有运行或不可用,那么这个消息将被保存起来,直至下次Consumer连接到Destination为止。例如,如果您有5个应用程序在某一个Topic上进行监听,其中一个崩溃了,那么下一次该应用程序启动并连接到这个特定的Topic时,尽管该应用程序崩溃了,但发送到这个Topic的所有消息都将在应用程序开始再次进行监听时发送到这个应用程序。如果这看起来难以理解,那么在您运行示例代码时就会更有意义。
队列
Queue是为“点对点”或“一对一”的消息传递而设计的。这意味着应该只有一台客户机发送消息给Queue,并且只有一个应用程序可以处理这个Queue中的消息。
事实上,您可以有多台向某一个Queue发送消息的客户机,但只能有一个应用程序处理该Queue中的消息。如果在Queue上有多个Consumer,那么哪个Consumer接收消息将无法保证,但是只能有一个Consumer接收消息。如果Destination上需要多个Consumer,并且想让它们都收到相同的消息,那么应该使用Topic(参见本文后面内容)。
Sender.java和Receiver.java文件中包含展示如何使用Queue的代码。这些代码展示了JMS的初始化过程,并展示了如何检索预定义的Queue,以及为了在Queue上发送和接收消息,应该如何创建MessageProducer和MessageConsumer。
为了在Queue中消费和产生消息,系统中有两个特定接口。其中一个接口是QueueSender,它是通过调用QueueSession的一个createQueueSender()方法,从QueueSession返回的,用于向Queue发送消息。另一个接口是QueueReceiver,它是通过调用QueueSession的一个createQueueReceiver()方法,从QueueSession返回的,用于接收来自Queue的消息。
清单2是来自Sender.java的sendMsg方法的代码,它展示了如何从Session中如何创建MessageProducer,然后构造一个TextMessage并发送它。在这段代码中,我们将创建一个QueueSender,然后创建一个TextMessage,它包含从应用程序用户界面的TextField中检索到的文本。随后,我们使用QueueSender方法“发送”消息。
在MessageConsumer上,有两种处理传入消息接收的方法:同步的和异步的。第一步是创建MessageConsumer;下一步是判断您想在Consumer上同步发送信息还是异步发送消息。
在从Session中创建了MessageConsumer之后,如果想同步接收为Destination生成的下一条消息,那么可以调用MessageConsumer上的检索方法。该方法没有采用任何参数,在接收下一个Message之前,它将一直受阻,并且会将这个Message返回给调用者。为了接收异步消息,可以向MessageConsumer注册一个MessageListener接口的实现。这种方法对Topic 和 Queue都适用。MessageListener只有一个您必须实现的方法 —— onMessage,它只接收一个参数,即Message。在为每个发送到Destination的消息实现onMessage时,将调用该方法。Sender.java和Receiver.java中的onMessage实现中示范了这个过程。在Receiver.java中,我们将下面的代码放入initializeJMS方法中,这段代码将创建MessageConsumer(一个QueueReceiver)并设置MessageListener的实现:
// Create a Receiver for the Queue...
receiver = session.createReceiver(queue);
// Set the listener (this class)
receiver.setMessageListener(this);
一旦调用了Connection的start方法,消息就会在到达Destination之时开始进入Consumer。
ReplyTo——使用临时队列
您还会注意到,Sender.java和Receiver.java都实现了MessageConsumers和MessageProducers。二者都实现了MessageListener。这说明了JMS的一个有趣特征,也就是说,它使用了临时Destination。希望接收它所产生信息的响应的应用程序可以创建一个临时的Queue或者Topic,并在它发送的Message中传递这个Destination。
Message的属性之一是JMSReplyTo属性。这个属性就是用于这个目的的。您可以创建一个临时的Queue或者Topic,并把它放入Message的JMSReplyTo属性中。收到该消息的消费者有一个私有的临时Destination,可以用它来响应发送者。可以通过两种方法对这一点进行说明,这两种方法分别在两个文件中。Sender.java中包含如下所示代码段,它将创建临时的Queue,并将它放置在TextMessage的JMSReplyTo属性中:
// Create a temporary queue for replies...
tempQueue = (Queue) session.createTemporaryQueue();
以上这行代码可以在Sender.java的initializeJMS方法中找到。这段代码将在启动应用程序时创建一个临时Queue,而这个Queue将存在于应用程序的整个生命周期中。下面这行代码可以在Sender.java的sendMsg方法中找到,它展示了如何设置JMSReplyTo属性,以包含临时的Queue。
// Set ReplyTo to temporary queue...
msg.setJMSReplyTo(tempQueue);
在Receiver.java的QueueReceiver接收这条消息时,会从JMSReplyTo字段中提取临时Queue,并且会通过应用程序构造一个QueueSender,以便将响应消息发送回Sender.java。这展示了如何使用JMS Message的属性,并显示了私有的临时Destination的有用之处。它还展示了客户机可以如何既是消息的Producer,有是消息的Consumer。下面的代码来自Receiver.java,它展示了如何从JMS Message中提取临时Queue;可以在onMessage方法中找到这些代码:
// Get the temporary queue from the JMSReplyTo
// property of the message...
tempQueue = (Queue) msg.getJMSReplyTo();
以下代码块来自sendReplyToMsg方法,它展示了如何创建QueueSender和如何发送应答:
// create a Sender for the temporary queue
if (sender == null)
sender = session.createSender(tempQueue);
TextMessage msg = session.createTextMessage();
msg.setText(REPLYTO_TEXT);
...
// Send the message to the temporary queue...
sender.send(msg);
主题
Topic是为了实现“发布/订阅”消息传递机制而设计的。而Queue是为了拥有一个Producer和一个Consumer而设计的,Topic的设计目标是允许拥有多个能发送消息给它的Producer,同时拥有多个能接收来自Topic的同一消息的Consumer。
Topic的MessageProducers和MessageConsumers的创建过程与Queue的类似。您可以使用Session来创建TopicPublishers和TopicSubscribers。之所以存在QueueSender和QueueReceiver镜像,是因为它们都提供了特定于Topic的功能,并且都实现了基本的MessageProducer和MessageConsumer接口。
TopicPublisher的创建过程几乎与QueueSender的创建过程完全相同。以下代码来自Publisher.java的sendMsg方法,它展示了TopicPublisher的创建过程,以及如何向Topic发布消息:
// create a Publisher if there isn't one...
if (publisher == null)
publisher = session.createPublisher(topic);
TextMessage msg = session.createTextMessage();
msg.setText(text);
...
// Publish it to the topic...
publisher.publish(msg);
持久订阅者
TopicSubscribers一个有趣的方面是它的持久订阅者,即用户提供的惟一名称,可以在Session中识别这个名称。Session拥有一个与之相关的Client ID,这个ID要么是通过Connection上的一个方法调用在运行时定义的,要么是作为托管ConnectionFactory的一部分(在我们的例子中,它是在weblogic.properties文件中用这种方法定义的)。因此,一个Connection可以提供一个带有Client ID的Session,并且持久订阅者的名称是Session中的惟一标识符,它与特定的Client ID相关联。持久订阅者的目的是为给定的Topic创建一个惟一的、持久的Consumer。
通过调用TopicSession的createDurableSubscriber方法创建TopicSubscriber的应用程序必须传递一个持久订阅者的名称(一个字符串)作为其参数之一;例如,您可以将持久订阅者的名称设置为目前已登录的用户的名称,等等。这个名称惟一地标识出了某一Topic的特殊订阅者(连同Connection/Session的惟一Client ID)。一旦已经向Topic注册这个持久订阅者,该Topic就会持久性地确保消息被发送到这个订阅者那里。这意味着如果某一特殊的持久订阅者是不可用的,那么将会一直保存消息,直到下次这个持久订阅者(有相同的、惟一的持久订阅者名称和Client ID)注册为一个Consumer为止。Subscriber.java 文件展示了持久订阅者的创建过程,并允许您使用默认的订阅者名称,或者从命令行设置这个标识符(要得到关于运行这个应用程序和展示通过拥有持久订阅者获得消息永久性的更多细节,请参阅“readme.txt”)。下列代码片段来自Subscriber.java的initializeJMS方法,它展示了如何从TopicSession中创建一个持久订阅者:
subscriber = session.createDurableSubscriber(
topic,
subscriberID,
SELECTOR,
false);
// Set the listener (this class)
subscriber.setMessageListener(this);
TopicSubscribers并没有被创建为持久订阅者,因此它不会持久性地接收消息,而是只在运行期间接收消息。
要想获得关于持久订阅者和Topic的更进一步的信息,请参阅Sun的JMS文档。关于上述代码的另外一点是:注意传递给该方法的第三个参数,这个参数是SELECTOR。这是消息选择器与Consumer有关联的地方(请参阅下面内容,以获得关于消息选择器的更多信息)。
过滤——使用消息选择器
我们将讨论的有关JMS的最后一个方面是消息选择器,消息选择器是用于MessageConsumers的过滤器,可以用来过滤传入消息的属性和头文件部分(但不过滤消息体),并确定是否将实际消费该消息。按照JMS文档的说法,消息选择器是一些字符串,它们基于某种语法,而这种语法是SQL-92的子集。您可以将消息选择器作为MessageConsumer创建的一部分。根据MessageConsumer是QueueReceiver还是TopicSubscriber,这种将消息选择器应用于传入信息的方法也会稍有不同。我建议您通过检查Subscriber.java文件以及运行Publisher和Subscriber应用程序,检查消息收集器的语法,并查看如何应用它们。以下代码段在Subscriber.java中定义了消息选择器,而应用程序本身允许您从文本域中改变这个选择器:
public final String SELECTOR = "JMSType = 'TOPIC_PUBLISHER'";
该选择器检查了来自Topic的传入消息的JMSType属性,并确定了这个属性的值是否等于TOPIC_PUBLISHER。如果相等,则将消息传递到MessageListener实现;如果不相等,那么消息会被忽略。请参阅“readme.txt”文件,以获得关于运行这些应用程序和展示其行为的更多细节。此外还建议您参阅Sun的JMS文档。
结束语
JMS是一项在应用程序中创建可移植消息传递代码的很有吸引力的、功能强大的技术。它允许进行“点对点”和“发布/订阅”消息传递,而且还支持事务和持久性。BEA的WebLogic应用服务器提供了一种健壮并完整的JMS实现,它能与应用服务器提供的其他技术一同工作,比如EJB和servlet。这为通过事务支持在不同企业对象和服务间进行永久的,异步的消息传递创造了极大的可能。我希望本文可以鼓励您开发在线示例代码,并且可以鼓励您检查WebLogic(尤其是JMS)提供的可能性。