Chapter 9. Messaging on JBoss

JMS Configuration and Architecture

The JMS API stands for Java Message Service Application Programming Interface, and it is used by applications to send asynchronous business-quality messages to other applications. In the messaging world, messages are not sent directly to other applications. Instead, messages are sent to destinations, known as queues or topics. Applications sending messages do not need to worry if the receiving applications are up and running, and conversely, receiving applications do not need to worry about the sending application's status. Both senders, and receivers only interact with the destinations.
The JMS API is the standardized interface to a JMS provider, sometimes called a Message Oriented Middleware (MOM) system. JBoss comes with a JMS 1.1 compliant JMS provider called JBoss Messaging or JBossMQ. When you use the JMS API with JBoss, you are using the JBoss Messaging engine transparently. JBoss Messaging fully implements the JMS specification; therefore, the best JBoss Messaging user guide is the JMS specification. For more information about the JMS API please visit the JMS Tutorial or JMS Downloads & Specifications.
This chapter focuses on the JBoss specific aspects of using JMS and message driven beans as well as the JBoss Messaging configuration and MBeans.

9.1. JMS Examples

In this section we discuss the basics needed to use the JBoss JMS implementation. JMS leaves the details of accessing JMS connection factories and destinations as provider specific details. What you need to know to use the JBoss Messaging layer is:
  • The location of the queue and topic connect factories: In JBoss both connection factory implementations are located under the JNDI name ConnectionFactory.
  • How to lookup JMS destinations (queues and topics): Destinations are configured via MBeans as we will see when we discuss the messaging MBeans. JBoss comes with a few queues and topics preconfigured. You can find them under the jboss.mq.destination domain in the JMX Console..
  • Which JARS JMS requires: These include concurrent.jar, jbossmq-client.jar, jboss-common-client.jar, jboss-system-client.jar, jnp-client.jar and log4j.jar.
In the following sections we will look at examples of the various JMS messaging models and message driven beans. The chapter example source is located under the src/main/org/jboss/book/jms directory of the book examples.

9.1.1. A Point-To-Point Example

Let's start out with a point-to-point (P2P) example. In the P2P model, a sender delivers messages to a queue and a single receiver pulls the message off of the queue. The receiver does not need to be listening to the queue at the time the message is sent. Example 9.1, “A P2P JMS client example” shows a complete P2P example that sends a javax.jms.TextMessage to the queue queue/testQueue and asynchronously receives the message from the same queue.

Example 9.1. A P2P JMS client example

package org.jboss.book.jms.ex1;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.naming.InitialContext;
import javax.naming.NamingException;

import EDU.oswego.cs.dl.util.concurrent.CountDown;
import org.apache.log4j.Logger;
import org.jboss.util.ChapterExRepository;

/** 
 * A complete JMS client example program that sends a
 * TextMessage to a Queue and asynchronously receives the
 * message from the same Queue.
 * 
 * @author  Scott.Stark@jboss.org
 * @version $Revision: 1.9 $
 */
public class SendRecvClient
{
    static Logger log;
    static CountDown done = new CountDown(1);
    
    QueueConnection conn;
    QueueSession session;
    Queue que;
    
    public static class ExListener 
        implements MessageListener
    {
        public void onMessage(Message msg)
        {
            done.release();
            TextMessage tm = (TextMessage) msg;
            try {
                log.info("onMessage, recv text=" + tm.getText());
            } catch(Throwable t) {
                t.printStackTrace();
            }
        }
    }
    
    public void setupPTP()
        throws JMSException, 
               NamingException
    {
        InitialContext iniCtx = new InitialContext();
        Object tmp = iniCtx.lookup("ConnectionFactory");
        QueueConnectionFactory qcf = (QueueConnectionFactory) tmp;
        conn = qcf.createQueueConnection();
        que = (Queue) iniCtx.lookup("queue/testQueue");
        session = conn.createQueueSession(false,
                                          QueueSession.AUTO_ACKNOWLEDGE);
        conn.start();
    }
    
    public void sendRecvAsync(String text)
        throws JMSException, 
               NamingException
    {
        log.info("Begin sendRecvAsync");
        // Setup the PTP connection, session
        setupPTP();

        // Set the async listener
        QueueReceiver recv = session.createReceiver(que);
        recv.setMessageListener(new ExListener());

        // Send a text msg
        QueueSender send = session.createSender(que);
        TextMessage tm = session.createTextMessage(text);
        send.send(tm);
        log.info("sendRecvAsync, sent text=" + tm.getText());
        send.close();
        log.info("End sendRecvAsync");
    }
    
    public void stop()
        throws JMSException
    {
        conn.stop();
        session.close();
        conn.close();
    }
    
    public static void main(String args[]) 
        throws Exception
    {
        ChapterExRepository.init(SendRecvClient.class);
        log = Logger.getLogger("SendRecvClient");
        
        log.info("Begin SendRecvClient, now=" + System.currentTimeMillis());
        SendRecvClient client = new SendRecvClient();
        client.sendRecvAsync("A text msg");
        client.done.acquire();
        client.stop();
        log.info("End SendRecvClient");
        System.exit(0);
    }
}
The client may be run using the following command line:
[examples]$ ant -Dchap=jms -Dex=1p2p run-example
...
run-example1p2p:
     [java] [INFO,SendRecvClient] Begin SendRecvClient, now=1102808673386
     [java] [INFO,SendRecvClient] Begin sendRecvAsync
     [java] [INFO,SendRecvClient] onMessage, recv text=A text msg
     [java] [INFO,SendRecvClient] sendRecvAsync, sent text=A text msg
     [java] [INFO,SendRecvClient] End sendRecvAsync
     [java] [INFO,SendRecvClient] End SendRecvClient

9.1.2. A Pub-Sub Example

The JMS publish/subscribe (Pub-Sub) message model is a one-to-many model. A publisher sends a message to a topic and all active subscribers of the topic receive the message. Subscribers that are not actively listening to the topic will miss the published message. shows a complete JMS client that sends a javax.jms.TextMessage to a topic and asynchronously receives the message from the same topic.

Example 9.2. A Pub-Sub JMS client example

package org.jboss.book.jms.ex1;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSubscriber;
import javax.jms.TopicSession;
import javax.jms.TextMessage;
import javax.naming.InitialContext;
import javax.naming.NamingException;

import EDU.oswego.cs.dl.util.concurrent.CountDown;

/**
 *  A complete JMS client example program that sends a TextMessage to
 *  a Topic and asynchronously receives the message from the same
 *  Topic.
 * 
 *  @author Scott.Stark@jboss.org
 *  @version $Revision: 1.9 $
 */

public class TopicSendRecvClient
{
    static CountDown done = new CountDown(1);
    TopicConnection conn = null;
    TopicSession session = null;
    Topic topic = null;
    
    public static class ExListener implements MessageListener
    {
        public void onMessage(Message msg)
        {
            done.release();
            TextMessage tm = (TextMessage) msg;
            try {
                System.out.println("onMessage, recv text=" + tm.getText());
            } catch(Throwable t) {
                t.printStackTrace();
            }
        }
    }
    
    public void setupPubSub()
        throws JMSException, NamingException
    {
        InitialContext iniCtx = new InitialContext();
        Object tmp = iniCtx.lookup("ConnectionFactory");
        TopicConnectionFactory tcf = (TopicConnectionFactory) tmp;
        conn = tcf.createTopicConnection();
        topic = (Topic) iniCtx.lookup("topic/testTopic");
        session = conn.createTopicSession(false,
                                          TopicSession.AUTO_ACKNOWLEDGE);
        conn.start();
    }
    
    public void sendRecvAsync(String text)
        throws JMSException, NamingException
    {
        System.out.println("Begin sendRecvAsync");
        // Setup the PubSub connection, session
        setupPubSub();
        // Set the async listener
        
        TopicSubscriber recv = session.createSubscriber(topic);
        recv.setMessageListener(new ExListener());
        // Send a text msg
        TopicPublisher send = session.createPublisher(topic);
        TextMessage tm = session.createTextMessage(text);
        send.publish(tm);
        System.out.println("sendRecvAsync, sent text=" + tm.getText());
        send.close();
        System.out.println("End sendRecvAsync");
    }
    
    public void stop() throws JMSException
    {
        conn.stop();
        session.close();
        conn.close();
    }
    
    public static void main(String args[]) throws Exception
    {
        System.out.println("Begin TopicSendRecvClient, now=" + 
                           System.currentTimeMillis());
        TopicSendRecvClient client = new TopicSendRecvClient();
        client.sendRecvAsync("A text msg, now="+System.currentTimeMillis());
        client.done.acquire();
        client.stop();
        System.out.println("End TopicSendRecvClient");
        System.exit(0);
    }
    
}
The client may be run using the following command line:
[examples]$ ant -Dchap=jms -Dex=1ps run-example
...
run-example1ps:
     [java] Begin TopicSendRecvClient, now=1102809427043
     [java] Begin sendRecvAsync
     [java] onMessage, recv text=A text msg, now=1102809427071
     [java] sendRecvAsync, sent text=A text msg, now=1102809427071
     [java] End sendRecvAsync
     [java] End TopicSendRecvClient
Now let's break the publisher and subscribers into separate programs to demonstrate that subscribers only receive messages while they are listening to a topic. Example 9.3, “A JMS publisher client” shows a variation of the previous pub-sub client that only publishes messages to the topic/testTopic topic. The subscriber only client is shown in Example 9.4, “A JMS subscriber client”.

Example 9.3. A JMS publisher client

package org.jboss.book.jms.ex1;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSlistubscriber;
import javax.jms.TopicSession;
import javax.jms.TextMessage;
import javax.naming.InitialContext;
import javax.naming.NamingException;

/** 
 *  A JMS client example program that sends a TextMessage to a Topic
 *    
 *  @author Scott.Stark@jboss.org
 *  @version $Revision: 1.9 $
 */
public class TopicSendClient
{
    TopicConnection conn = null;
    TopicSession session = null;
    Topic topic = null;
    
    public void setupPubSub()
        throws JMSException, NamingException
    {
        InitialContext iniCtx = new InitialContext();
        Object tmp = iniCtx.lookup("ConnectionFactory");
        TopicConnectionFactory tcf = (TopicConnectionFactory) tmp;
        conn = tcf.createTopicConnection();
        topic = (Topic) iniCtx.lookup("topic/testTopic");
        session = conn.createTopicSession(false,
                                          TopicSession.AUTO_ACKNOWLEDGE);
        conn.start();
    }
    
    public void sendAsync(String text)
        throws JMSException, NamingException
    {
        System.out.println("Begin sendAsync");
        // Setup the pub/sub connection, session
        setupPubSub();
        // Send a text msg
        TopicPublisher send = session.createPublisher(topic);
        TextMessage tm = session.createTextMessage(text);
        send.publish(tm);
        System.out.println("sendAsync, sent text=" +  tm.getText());
        send.close();
        System.out.println("End sendAsync");
    }
    
    public void stop() 
        throws JMSException
    {
        conn.stop();
        session.close();
        conn.close();
    }
    
    public static void main(String args[]) 
        throws Exception
    {
        System.out.println("Begin TopicSendClient, now=" + 
		                   System.currentTimeMillis());
        TopicSendClient client = new TopicSendClient();
	    client.sendAsync("A text msg, now="+System.currentTimeMillis());
        client.stop();
        System.out.println("End TopicSendClient");
        System.exit(0);
    }
    
}

Example 9.4. A JMS subscriber client

package org.jboss.book.jms.ex1;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSubscriber;
import javax.jms.TopicSession;
import javax.jms.TextMessage;
import javax.naming.InitialContext;
import javax.naming.NamingException;

/**
 * A JMS client example program that synchronously receives a message a Topic
 *  
 * @author Scott.Stark@jboss.org
 * @version $Revision: 1.9 $
 */
public class TopicRecvClient
{
    TopicConnection conn = null;
    TopicSession session = null;
    Topic topic = null;
    
    public void setupPubSub()
        throws JMSException, NamingException
    {
        InitialContext iniCtx = new InitialContext();
        Object tmp = iniCtx.lookup("ConnectionFactory");
        TopicConnectionFactory tcf = (TopicConnectionFactory) tmp;
        conn = tcf.createTopicConnection();
        topic = (Topic) iniCtx.lookup("topic/testTopic");
        session = conn.createTopicSession(false,
                                          TopicSession.AUTO_ACKNOWLEDGE);
        conn.start();
    }
    
    public void recvSync()
        throws JMSException, NamingException
    {
        System.out.println("Begin recvSync");
        // Setup the pub/sub connection, session
        setupPubSub();

        // Wait upto 5 seconds for the message
        TopicSubscriber recv = session.createSubscriber(topic);
        Message msg = recv.receive(5000);
        if (msg == null) {
            System.out.println("Timed out waiting for msg");
        } else {
            System.out.println("TopicSubscriber.recv, msgt="+msg);
        }
    }
    
    public void stop()
        throws JMSException
    {
        conn.stop();
        session.close();
        conn.close();
    }
    
    public static void main(String args[]) 
        throws Exception
    {
        System.out.println("Begin TopicRecvClient, now=" +
                           System.currentTimeMillis());
        TopicRecvClient client = new TopicRecvClient();
        client.recvSync();
        client.stop();
        System.out.println("End TopicRecvClient");
        System.exit(0);
    }
    
}
Run the TopicSendClient followed by the TopicRecvClient as follows:
[examples]$ ant -Dchap=jms -Dex=1ps2 run-example
...
run-example1ps2:
     [java] Begin TopicSendClient, now=1102810007899
     [java] Begin sendAsync
     [java] sendAsync, sent text=A text msg, now=1102810007909
     [java] End sendAsync
     [java] End TopicSendClient
     [java] Begin TopicRecvClient, now=1102810011524
     [java] Begin recvSync
     [java] Timed out waiting for msg
     [java] End TopicRecvClient
The output shows that the topic subscriber client (TopicRecvClient) fails to receive the message sent by the publisher due to a timeout.

9.1.3. A Pub-Sub With Durable Topic Example

JMS supports a messaging model that is a cross between the P2P and pub-sub models. When a pub-sub client wants to receive all messages posted to the topic it subscribes to even when it is not actively listening to the topic, the client may achieve this behavior using a durable topic. Let's look at a variation of the preceding subscriber client that uses a durable topic to ensure that it receives all messages, include those published when the client is not listening to the topic. Example 9.5, “A durable topic JMS client example” shows the durable topic client with the key differences between the Example 9.4, “A JMS subscriber client” client highlighted in bold.

Example 9.5. A durable topic JMS client example

package org.jboss.book.jms.ex1;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSubscriber;
import javax.jms.TopicSession;
import javax.jms.TextMessage;
import javax.naming.InitialContext;
import javax.naming.NamingException;

/**
 *  A JMS client example program that synchronously receives a message a Topic
 *     
 *  @author Scott.Stark@jboss.org
 *  @version $Revision: 1.9 $
 */
public class DurableTopicRecvClient
{
    TopicConnection conn = null;
    TopicSession session = null;
    Topic topic = null;
    
    public void setupPubSub()
        throws JMSException, NamingException
    {
        InitialContext iniCtx = new InitialContext();
        Object tmp = iniCtx.lookup("ConnectionFactory");

        TopicConnectionFactory tcf = (TopicConnectionFactory) tmp;
        conn = tcf.createTopicConnection("john", "needle");
        topic = (Topic) iniCtx.lookup("topic/testTopic");

        session = conn.createTopicSession(false,
                                          TopicSession.AUTO_ACKNOWLEDGE);
        conn.start();
    }
    
    public void recvSync()
        throws JMSException, NamingException
    {
        System.out.println("Begin recvSync");
        // Setup the pub/sub connection, session
        setupPubSub();
        // Wait upto 5 seconds for the message
        TopicSubscriber recv = session.createDurableSubscriber(topic, "jms-ex1dtps");
        Message msg = recv.receive(5000);
        if (msg == null) {
            System.out.println("Timed out waiting for msg");
        } else {
            System.out.println("DurableTopicRecvClient.recv, msgt=" + msg);
        } 
    }
    
    public void stop() 
        throws JMSException
    {
        conn.stop();
        session.close();
        conn.close();
    }
    
    public static void main(String args[]) 
        throws Exception
    {
        System.out.println("Begin DurableTopicRecvClient, now=" + 
                           System.currentTimeMillis());
        DurableTopicRecvClient client = new DurableTopicRecvClient();
        client.recvSync();
        client.stop();
        System.out.println("End DurableTopicRecvClient");
        System.exit(0);
    }
    
}
Now run the previous topic publisher with the durable topic subscriber as follows:
[examples]$ ant -Dchap=jms -Dex=1psdt run-example
...                
run-example1psdt:
     [java] Begin DurableTopicSetup
     [java] End DurableTopicSetup
     [java] Begin TopicSendClient, now=1102899834273
     [java] Begin sendAsync
     [java] sendAsync, sent text=A text msg, now=1102899834345
     [java] End sendAsync
     [java] End TopicSendClient
     [java] Begin DurableTopicRecvClient, now=1102899840043
     [java] Begin recvSync
     [java] DurableTopicRecvClient.recv, msgt=SpyTextMessage {
     [java] Header { 
     [java]    jmsDestination  : TOPIC.testTopic.DurableSubscription[
               clientId=DurableSubscriberExample name=jms-ex1dtps selector=null]
     [java]    jmsDeliveryMode : 2
     [java]    jmsExpiration   : 0
     [java]    jmsPriority     : 4
     [java]    jmsMessageID    : ID:3-11028998375501
     [java]    jmsTimeStamp    : 1102899837550
     [java]    jmsCorrelationID: null
     [java]    jmsReplyTo      : null
     [java]    jmsType         : null
     [java]    jmsRedelivered  : false
     [java]    jmsProperties   : {}
     [java]    jmsPropReadWrite: false
     [java]    msgReadOnly     : true
     [java]    producerClientId: ID:3
     [java] }
     [java] Body {
     [java]    text            :A text msg, now=1102899834345
     [java] }
     [java] }
     [java] End DurableTopicRecvClient
Items of note for the durable topic example include:
  • The TopicConnectionFactory creation in the durable topic client used a username and password, and the TopicSubscriber creation was done using the createDurableSubscriber(Topic, String) method. This is a requirement of durable topic subscribers. The messaging server needs to know what client is requesting the durable topic and what the name of the durable topic subscription is. We will discuss the details of durable topic setup in the configuration section.
  • An org.jboss.book.jms.DurableTopicSetup client was run prior to the TopicSendClient. The reason for this is a durable topic subscriber must have registered a subscription at some point in the past in order for the messaging server to save messages. JBoss supports dynamic durable topic subscribers and the DurableTopicSetup client simply creates a durable subscription receiver and the exits. This leaves an active durable topic subscriber on the topic/testTopic and the messaging server knows that any messages posted to this topic must be saved for latter delivery.
  • The TopicSendClient does not change for the durable topic. The notion of a durable topic is a subscriber only notion.
  • The DurableTopicRecvClient sees the message published to the topic/testTopic even though it was not listening to the topic at the time the message was published.

9.1.4. A Point-To-Point With MDB Example

Example 9.6, “A TextMessage processing MDB” shows an message driven bean (MDB) that transforms the TextMessages it receives and sends the transformed messages to the queue found in the incoming message JMSReplyTo header.

Example 9.6. A TextMessage processing MDB

package org.jboss.book.jms.ex2;
                
import javax.ejb.MessageDrivenBean;
import javax.ejb.MessageDrivenContext;
import javax.ejb.EJBException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.naming.InitialContext;
import javax.naming.NamingException;

/** 
 * An MDB that transforms the TextMessages it receives and send the
 * transformed messages to the Queue found in the incoming message
 * JMSReplyTo header.
 * 
 * @author Scott.Stark@jboss.org
 * @version $Revision: 1.9 $
 */
public class TextMDB 
    implements MessageDrivenBean, MessageListener
{
    private MessageDrivenContext ctx = null;
    private QueueConnection conn;
    private QueueSession session;
    
    public TextMDB()
    {
        System.out.println("TextMDB.ctor, this="+hashCode());
    }
    
    public void setMessageDrivenContext(MessageDrivenContext ctx)
    {
        this.ctx = ctx;
        System.out.println("TextMDB.setMessageDrivenContext, this=" + 
                           hashCode());
    }
    
    public void ejbCreate()
    {
        System.out.println("TextMDB.ejbCreate, this="+hashCode());
        try {
            setupPTP();
        } catch (Exception e) {
            throw new EJBException("Failed to init TextMDB", e);
        }
    }

    public void ejbRemove()
    {
        System.out.println("TextMDB.ejbRemove, this="+hashCode());
        ctx = null;
        try {
            if (session != null) {
                session.close();
            }
            if (conn != null) {
                conn.close();
            }
        } catch(JMSException e) {
            e.printStackTrace();
        }
    }
                
    public void onMessage(Message msg)
    {
        System.out.println("TextMDB.onMessage, this="+hashCode());
        try {
            TextMessage tm = (TextMessage) msg;
            String text = tm.getText() + "processed by: "+hashCode();
            Queue dest = (Queue) msg.getJMSReplyTo();
            sendReply(text, dest);
        } catch(Throwable t) {
            t.printStackTrace();
        }
    }
                
    private void setupPTP()
        throws JMSException, NamingException
    {
        InitialContext iniCtx = new InitialContext();
        Object tmp = iniCtx.lookup("java:comp/env/jms/QCF");
        QueueConnectionFactory qcf = (QueueConnectionFactory) tmp;
        conn = qcf.createQueueConnection();
        session = conn.createQueueSession(false,
                                          QueueSession.AUTO_ACKNOWLEDGE);
        conn.start();
    }

    private void sendReply(String text, Queue dest)
        throws JMSException
    {
        System.out.println("TextMDB.sendReply, this=" + 
                           hashCode() + ", dest="+dest);
        QueueSender sender = session.createSender(dest);
        TextMessage tm = session.createTextMessage(text);
        sender.send(tm);
        sender.close();
    }
}
The MDB ejb-jar.xml and jboss.xml deployment descriptors are shown in Example 9.7, “The MDB ejb-jar.xml descriptor” and Example 9.8, “The MDB jboss.xml descriptor”.

Example 9.7. The MDB ejb-jar.xml descriptor

<?xml version="1.0"?>
<!DOCTYPE ejb-jar PUBLIC 
          "-//Sun Microsystems, Inc.//DTD Enterprise JavaBeans 2.0//EN"
          "http://java.sun.com/dtd/ejb-jar_2_0.dtd">
<ejb-jar>
    <enterprise-beans>
        <message-driven>
            <ejb-name>TextMDB</ejb-name>
            <ejb-class>org.jboss.book.jms.ex2.TextMDB</ejb-class>
            <transaction-type>Container</transaction-type>
            <acknowledge-mode>AUTO_ACKNOWLEDGE</acknowledge-mode>
            <message-driven-destination>
                <destination-type>javax.jms.Queue</destination-type>
            </message-driven-destination>
            <res-ref-name>jms/QCF</res-ref-name>
            <resource-ref>
                <res-type>javax.jms.QueueConnectionFactory</res-type>
                <res-auth>Container</res-auth>
            </resource-ref>
        </message-driven>
    </enterprise-beans>
</ejb-jar>

Example 9.8. The MDB jboss.xml descriptor

<?xml version="1.0"?>
<jboss>
    <enterprise-beans>
        <message-driven>
            <ejb-name>TextMDB</ejb-name>
            <destination-jndi-name>queue/B</destination-jndi-name>
            <resource-ref>
                <res-ref-name>jms/QCF</res-ref-name>
                <jndi-name>ConnectionFactory</jndi-name>
            </resource-ref>
        </message-driven>
    </enterprise-beans>
</jboss>
Example 9.9, “A JMS client that interacts with the TextMDB” shows a variation of the P2P client that sends several messages to the queue/B destination and asynchronously receives the messages as modified by TextMDB from queue A.

Example 9.9. A JMS client that interacts with the TextMDB

package org.jboss.book.jms.ex2;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.naming.InitialContext;
import javax.naming.NamingException;

import EDU.oswego.cs.dl.util.concurrent.CountDown;

/**
 *  A complete JMS client example program that sends N TextMessages to
 *  a Queue B and asynchronously receives the messages as modified by
 *  TextMDB from Queue A.
 *
 *  @author Scott.Stark@jboss.org
 *  @version $Revision: 1.9 $
 */
public class SendRecvClient
{
    static final int N = 10;
    static CountDown done = new CountDown(N);

    QueueConnection conn;
    QueueSession session;
    Queue queA;
    Queue queB;
    
    public static class ExListener 
        implements MessageListener
    {
        public void onMessage(Message msg)
        {
            done.release();
            TextMessage tm = (TextMessage) msg;
            try {
                System.out.println("onMessage, recv text="+tm.getText());
            } catch(Throwable t) {
                t.printStackTrace();
            }
        }
    }
    
    public void setupPTP()
        throws JMSException, NamingException
    {
        InitialContext iniCtx = new InitialContext();
        Object tmp = iniCtx.lookup("ConnectionFactory");
        QueueConnectionFactory qcf = (QueueConnectionFactory) tmp;
        conn = qcf.createQueueConnection();
        queA = (Queue) iniCtx.lookup("queue/A");
        queB = (Queue) iniCtx.lookup("queue/B");
        session = conn.createQueueSession(false,
                                          QueueSession.AUTO_ACKNOWLEDGE);
        conn.start();
    }
    
    public void sendRecvAsync(String textBase)
        throws JMSException, NamingException, InterruptedException
    {
        System.out.println("Begin sendRecvAsync");

        // Setup the PTP connection, session
        setupPTP();

        // Set the async listener for queA
        QueueReceiver recv = session.createReceiver(queA);
        recv.setMessageListener(new ExListener());

        // Send a few text msgs to queB
        QueueSender send = session.createSender(queB);

        for(int m = 0; m < 10; m ++) {
            TextMessage tm = session.createTextMessage(textBase+"#"+m);
            tm.setJMSReplyTo(queA);
            send.send(tm);
            System.out.println("sendRecvAsync, sent text=" + tm.getText());
        }
        System.out.println("End sendRecvAsync");
    }
    
    public void stop() 
        throws JMSException
    {
        conn.stop();
        session.close();
        conn.close();
    }
    
    public static void main(String args[]) 
        throws Exception
    {
        System.out.println("Begin SendRecvClient,now=" + 
                           System.currentTimeMillis());
        SendRecvClient client = new SendRecvClient();
        client.sendRecvAsync("A text msg");
        client.done.acquire();
        client.stop();
        System.exit(0);
        System.out.println("End SendRecvClient");
    }
    
}
Run the client as follows:
[examples]$ ant -Dchap=jms -Dex=2 run-example
...
run-example2:
...
     [java] Begin SendRecvClient, now=1102900541558
     [java] Begin sendRecvAsync
     [java] sendRecvAsync, sent text=A text msg#0
     [java] sendRecvAsync, sent text=A text msg#1
     [java] sendRecvAsync, sent text=A text msg#2
     [java] sendRecvAsync, sent text=A text msg#3
     [java] sendRecvAsync, sent text=A text msg#4
     [java] sendRecvAsync, sent text=A text msg#5
     [java] sendRecvAsync, sent text=A text msg#6
     [java] sendRecvAsync, sent text=A text msg#7
     [java] sendRecvAsync, sent text=A text msg#8
     [java] sendRecvAsync, sent text=A text msg#9
     [java] End sendRecvAsync
     [java] onMessage, recv text=A text msg#0processed by: 12855623
     [java] onMessage, recv text=A text msg#5processed by: 9399816
     [java] onMessage, recv text=A text msg#9processed by: 6598158
     [java] onMessage, recv text=A text msg#3processed by: 8153998
     [java] onMessage, recv text=A text msg#4processed by: 10118602
     [java] onMessage, recv text=A text msg#2processed by: 1792333
     [java] onMessage, recv text=A text msg#7processed by: 14251014
     [java] onMessage, recv text=A text msg#1processed by: 10775981
     [java] onMessage, recv text=A text msg#8processed by: 6056676
     [java] onMessage, recv text=A text msg#6processed by: 15679078
The corresponding JBoss server console output is:
19:15:40,232 INFO  [EjbModule] Deploying TextMDB
	19:15:41,498 INFO  [EJBDeployer] Deployed: file:/jboss-eap-4.2/jboss-as/server/production/deploy/
  jms-ex2.jar
19:15:45,606 INFO  [TextMDB] TextMDB.ctor, this=10775981
19:15:45,620 INFO  [TextMDB] TextMDB.ctor, this=1792333
19:15:45,627 INFO  [TextMDB] TextMDB.setMessageDrivenContext, this=10775981
19:15:45,638 INFO  [TextMDB] TextMDB.ejbCreate, this=10775981
19:15:45,640 INFO  [TextMDB] TextMDB.setMessageDrivenContext, this=1792333
19:15:45,640 INFO  [TextMDB] TextMDB.ejbCreate, this=1792333
19:15:45,649 INFO  [TextMDB] TextMDB.ctor, this=12855623
19:15:45,658 INFO  [TextMDB] TextMDB.setMessageDrivenContext, this=12855623
19:15:45,661 INFO  [TextMDB] TextMDB.ejbCreate, this=12855623
19:15:45,742 INFO  [TextMDB] TextMDB.ctor, this=8153998
19:15:45,744 INFO  [TextMDB] TextMDB.setMessageDrivenContext, this=8153998
19:15:45,744 INFO  [TextMDB] TextMDB.ejbCreate, this=8153998
19:15:45,763 INFO  [TextMDB] TextMDB.ctor, this=10118602
19:15:45,764 INFO  [TextMDB] TextMDB.setMessageDrivenContext, this=10118602
19:15:45,764 INFO  [TextMDB] TextMDB.ejbCreate, this=10118602
19:15:45,777 INFO  [TextMDB] TextMDB.ctor, this=9399816
19:15:45,779 INFO  [TextMDB] TextMDB.setMessageDrivenContext, this=9399816
19:15:45,779 INFO  [TextMDB] TextMDB.ejbCreate, this=9399816
19:15:45,792 INFO  [TextMDB] TextMDB.ctor, this=15679078
19:15:45,798 INFO  [TextMDB] TextMDB.setMessageDrivenContext, this=15679078
19:15:45,799 INFO  [TextMDB] TextMDB.ejbCreate, this=15679078
19:15:45,815 INFO  [TextMDB] TextMDB.ctor, this=14251014
19:15:45,816 INFO  [TextMDB] TextMDB.setMessageDrivenContext, this=14251014
19:15:45,817 INFO  [TextMDB] TextMDB.ejbCreate, this=14251014
19:15:45,829 INFO  [TextMDB] TextMDB.ctor, this=6056676
19:15:45,831 INFO  [TextMDB] TextMDB.setMessageDrivenContext, this=6056676
19:15:45,864 INFO  [TextMDB] TextMDB.ctor, this=6598158
19:15:45,903 INFO  [TextMDB] TextMDB.ejbCreate, this=6056676
19:15:45,906 INFO  [TextMDB] TextMDB.setMessageDrivenContext, this=6598158
19:15:45,906 INFO  [TextMDB] TextMDB.ejbCreate, this=6598158
19:15:46,236 INFO  [TextMDB] TextMDB.onMessage, this=12855623
19:15:46,238 INFO  [TextMDB] TextMDB.sendReply, this=12855623, dest=QUEUE.A
19:15:46,734 INFO  [TextMDB] TextMDB.onMessage, this=9399816
19:15:46,736 INFO  [TextMDB] TextMDB.onMessage, this=8153998
19:15:46,737 INFO  [TextMDB] TextMDB.onMessage, this=6598158
19:15:46,768 INFO  [TextMDB] TextMDB.sendReply, this=9399816, dest=QUEUE.A
19:15:46,768 INFO  [TextMDB] TextMDB.sendReply, this=6598158, dest=QUEUE.A
19:15:46,774 INFO  [TextMDB] TextMDB.sendReply, this=8153998, dest=QUEUE.A
19:15:46,903 INFO  [TextMDB] TextMDB.onMessage, this=10118602
19:15:46,904 INFO  [TextMDB] TextMDB.sendReply, this=10118602, dest=QUEUE.A
19:15:46,927 INFO  [TextMDB] TextMDB.onMessage, this=1792333
19:15:46,928 INFO  [TextMDB] TextMDB.sendReply, this=1792333, dest=QUEUE.A
19:15:47,002 INFO  [TextMDB] TextMDB.onMessage, this=14251014
19:15:47,007 INFO  [TextMDB] TextMDB.sendReply, this=14251014, dest=QUEUE.A
19:15:47,051 INFO  [TextMDB] TextMDB.onMessage, this=10775981
19:15:47,051 INFO  [TextMDB] TextMDB.sendReply, this=10775981, dest=QUEUE.A
19:15:47,060 INFO  [TextMDB] TextMDB.onMessage, this=6056676
19:15:47,061 INFO  [TextMDB] TextMDB.sendReply, this=6056676, dest=QUEUE.A
19:15:47,064 INFO  [TextMDB] TextMDB.onMessage, this=15679078
19:15:47,065 INFO  [TextMDB] TextMDB.sendReply, this=15679078, dest=QUEUE.A
Items of note in this example include:
  • The JMS client has no explicit knowledge that it is dealing with an MDB. The client simply uses the standard JMS APIs to send messages to a queue and receive messages from another queue.
  • The MDB declares whether it will listen to a queue or topic in the ejb-jar.xml descriptor. The name of the queue or topic must be specified using a jboss.xml descriptor. In this example the MDB also sends messages to a JMS queue. MDBs may act as queue senders or topic publishers within their onMessage callback.
  • The messages received by the client include a "processed by: NNN" suffix, where NNN is the hashCode value of the MDB instance that processed the message. This shows that many MDBs may actively process messages posted to a destination. Concurrent processing is one of the benefits of MDBs.