Chapter 14. Using Generic JMS

Abstract

Apache CXF provides a generic implementation of a JMS transport. The generic JMS transport is not restricted to using SOAP messages and allows for connecting to any application that uses JMS.

NOTE: Support for the JMS 1.0.2 APIs has been removed in CXF 3.0. If you are using Red Hat JBoss Fuse 6.2 or higher (which includes CXF 3.0), your JMS provider must support the JMS 1.1 APIs.

14.1. Approaches to Configuring JMS

The Apache CXF generic JMS transport can connect to any JMS provider and work with applications that exchange JMS messages whose bodies are of type TextMessage or BytesMessage.

There are two ways to enable and configure the JMS transport:

  • Section 14.2, “Using the JMS configuration bean”
  • Section 14.5, “Using WSDL to configure JMS”

14.2. Using the JMS configuration bean

Overview

To simplify JMS configuration and make it more powerful, Apache CXF uses a single JMS configuration bean to configure JMS endpoints. The bean is implemented by the org.apache.cxf.transport.jms.JMSConfiguration class. It can be used either to configure endpoints directly or to configure the JMS conduits and destinations.

Configuration namespace

The JMS configuration bean uses the Spring p-namespace to make the configuration as simple as possible. To use this namespace you need to declare it in the configuration’s root element as shown in Example 14.1, “Declaring the Spring p-namespace”.

Example 14.1. Declaring the Spring p-namespace

<beans ...
  xmlns:p="http://www.springframework.org/schema/p"
  ... >
  ...
</beans>

Specifying the configuration

You specify the JMS configuration by defining a bean of class org.apache.cxf.transport.jms.JMSConfiguration. The properties of the bean provide the configuration settings for the transport.

Important

In CXF 3.0, the JMS transport no longer has a dependency on Spring JMS, so some Spring JMS-related options have been removed.

Table 14.1, “General JMS Configuration Properties” lists properties that are common to both providers and consumers.

Table 14.1. General JMS Configuration Properties

Property | Default | Description

connectionFactory

 

[Required] Specifies a reference to a bean that defines a JMS ConnectionFactory.

wrapInSingleConnectionFactory

true [pre v3.0]

Removed in CXF 3.0

pre CXF 3.0 Specifies whether to wrap the ConnectionFactory with a Spring SingleConnectionFactory.

Enable this property when using a ConnectionFactory that does not pool connections, because it improves the performance of the JMS transport. The JMS transport creates a new connection for each message, so the SingleConnectionFactory is needed to cache the connection for reuse.

reconnectOnException

false

Deprecated in CXF 3.0. CXF always reconnects when an exception occurs.

pre CXF 3.0 Specifies whether to create a new connection when an exception occurs.

When wrapping the ConnectionFactory with a Spring SingleConnectionFactory:

  • true — on an exception, create a new connection

    Do not enable this option when using a PooledConnectionFactory, as this option only returns the pooled connection, but does not reconnect.

  • false — on an exception, do not try to reconnect

targetDestination

 

Specifies the JNDI name or provider-specific name of a destination.

replyDestination

 

Specifies the JMS name of the JMS destination where replies are sent. This property allows the use of a user-defined destination for replies. For more details see Section 14.6, “Using a Named Reply Destination”.

destinationResolver

DynamicDestinationResolver

Specifies a reference to a Spring DestinationResolver.

This property allows you to define how destination names are resolved to JMS destinations. Valid values are:

  • DynamicDestinationResolver — resolve destination names using the features of the JMS provider.
  • JndiDestinationResolver — resolve destination names using JNDI.

transactionManager

 

Specifies a reference to a Spring transaction manager. This enables the service to participate in JTA transactions.

taskExecutor

SimpleAsyncTaskExecutor

Removed in CXF 3.0

pre CXF 3.0 Specifies a reference to a Spring TaskExecutor. This is used in listeners to decide how to handle incoming messages.

useJms11

false

Removed in CXF 3.0. CXF 3.0 supports JMS 1.1 features only.

pre CXF 3.0 Specifies whether JMS 1.1 features are used. Valid values are:

  • true — JMS 1.1 features
  • false — JMS 1.0.2 features

messageIdEnabled

true

Removed in CXF 3.0

pre CXF 3.0 Specifies whether the JMS transport wants the JMS broker to provide message IDs. Valid values are:

  • true — broker needs to provide message IDs
  • false — broker need not provide message IDs

    In this case, the endpoint calls its message producer’s setDisableMessageID() method with a value of true. The broker is then given a hint that it need not generate message IDs or add them to the endpoint’s messages. The broker either accepts the hint or ignores it.

messageTimestampEnabled

true

Removed in CXF 3.0

pre CXF 3.0 Specifies whether the JMS transport wants the JMS broker to provide message time stamps. Valid values are:

  • true — broker needs to provide message timestamps
  • false — broker need not provide message timestamps

    In this case, the endpoint calls its message producer’s setDisableMessageTimestamp() method with a value of true. The broker is then given a hint that it need not generate time stamps or add them to the endpoint’s messages. The broker either accepts the hint or ignores it.

cacheLevel

-1 (feature disabled)

Removed in CXF 3.0

pre CXF 3.0 Specifies the level of caching that the JMS listener container may apply. Valid values are:

  • 0 — CACHE_NONE
  • 1 — CACHE_CONNECTION
  • 2 — CACHE_SESSION
  • 3 — CACHE_CONSUMER
  • 4 — CACHE_AUTO

For details, see the Spring API documentation for the DefaultMessageListenerContainer class.

pubSubNoLocal

false

Specifies whether to receive your own messages when using topics.

  • true — do not receive your own messages
  • false — receive your own messages

receiveTimeout

60000

Specifies the time, in milliseconds, to wait for response messages.

explicitQosEnabled

false

Specifies whether the QoS settings (such as priority, persistence, time to live) are explicitly set for each message (true) or use the default values (false).

deliveryMode

2

Specifies whether a message is persistent. Valid values are:

  • 1 (NON_PERSISTENT)—messages are kept in memory only
  • 2 (PERSISTENT)—messages are persisted to disk

priority

4

Specifies message priority. JMS priority values range from 0 (lowest) to 9 (highest). See your JMS provider’s documentation for details.

timeToLive

0 (indefinitely)

Specifies the time, in milliseconds, before a message that has been sent is discarded.

sessionTransacted

false

Specifies whether JMS transactions are used.

concurrentConsumers

1

Removed in CXF 3.0

pre CXF 3.0 Specifies the minimum number of concurrent consumers for the listener.

maxConcurrentConsumers

1

Removed in CXF 3.0

pre CXF 3.0 Specifies the maximum number of concurrent consumers for the listener.

messageSelector

 

Specifies the string value of the selector used to filter incoming messages. This property enables multiple connections to share a queue. For more information on the syntax used to specify message selectors, see the JMS 1.1 specification.

subscriptionDurable

false

Specifies whether the server uses durable subscriptions.

durableSubscriptionName

 

Specifies the name (string) used to register the durable subscription.

messageType

text

Specifies how the message data will be packaged as a JMS message. Valid values are:

  • text — specifies that the data will be packaged as a TextMessage
  • byte — specifies that the data will be packaged as an array of bytes (byte[])
  • binary — specifies that the data will be packaged as a BytesMessage

pubSubDomain

false

Specifies whether the target destination is a topic or a queue. Valid values are:

  • true — topic
  • false — queue

jmsProviderTibcoEms

false

Specifies whether the JMS provider is Tibco EMS.

When set to true, the principal in the security context is populated from the JMS_TIBCO_SENDER header.

useMessageIDAsCorrelationID

false

Removed in CXF 3.0

Specifies whether JMS will use the message ID to correlate messages.

When set to true, the client sets a generated correlation ID.

maxSuspendedContinuations

-1 (feature disabled)

CXF 3.0 Specifies the maximum number of suspended continuations the JMS destination may have. When the current number exceeds the specified maximum, the JMSListenerContainer is stopped.

reconnectPercentOfMax

70

CXF 3.0 Specifies when to restart the JMSListenerContainer stopped for exceeding maxSuspendedContinuations.

The listener container is restarted when its current number of suspended continuations falls below the value of (maxSuspendedContinuations * reconnectPercentOfMax/100).
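The restart threshold described for reconnectPercentOfMax is simple arithmetic. The following Java sketch works through it. The class and method names (RestartThreshold, restartBelow) are hypothetical and are not part of the CXF API, and the use of integer division is an assumption of the sketch.

```java
/** Illustrative only: not a CXF class. */
final class RestartThreshold {

    /**
     * Number of suspended continuations below which a stopped listener
     * container would be restarted, using the formula from the table:
     * maxSuspendedContinuations * reconnectPercentOfMax / 100.
     * Integer division is an assumption of this sketch.
     */
    static int restartBelow(int maxSuspendedContinuations, int reconnectPercentOfMax) {
        return maxSuspendedContinuations * reconnectPercentOfMax / 100;
    }

    public static void main(String[] args) {
        // With a maximum of 200 and the default percentage of 70,
        // the threshold works out to 140.
        System.out.println(restartBelow(200, 70));
    }
}
```

Under these assumptions, a destination configured with a maximum of 200 suspended continuations would have its listener restarted once the count drops below 140.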

As shown in Example 14.2, “JMS configuration bean”, the bean’s properties are specified as attributes of the bean element. They are all declared in the Spring p-namespace.

Example 14.2. JMS configuration bean

<bean id="jmsConfig"
      class="org.apache.cxf.transport.jms.JMSConfiguration"
      p:connectionFactory="jmsConnectionFactory"
      p:targetDestination="dynamicQueues/greeter.request.queue"
      p:pubSubDomain="false" />

Applying the configuration to an endpoint

The JMSConfiguration bean can be applied directly to both server and client endpoints using the Apache CXF features mechanism. To do so:

  1. Set the endpoint’s address attribute to jms://.
  2. Add a jaxws:feature element to the endpoint’s configuration.
  3. Add a bean of type org.apache.cxf.transport.jms.JMSConfigFeature to the feature.
  4. Set the bean element’s p:jmsConfig-ref attribute to the ID of the JMSConfiguration bean.

Example 14.3, “Adding JMS configuration to a JAX-WS client” shows a JAX-WS client that uses the JMS configuration from Example 14.2, “JMS configuration bean”.

Example 14.3. Adding JMS configuration to a JAX-WS client

<jaxws:client id="CustomerService"
              xmlns:customer="http://customerservice.example.com/"
              serviceName="customer:CustomerServiceService"
              endpointName="customer:CustomerServiceEndpoint"
              address="jms://"
              serviceClass="com.example.customerservice.CustomerService">
  <jaxws:features>
    <bean xmlns="http://www.springframework.org/schema/beans"
          class="org.apache.cxf.transport.jms.JMSConfigFeature"
          p:jmsConfig-ref="jmsConfig"/>
  </jaxws:features>
</jaxws:client>

Applying the configuration to the transport

The JMSConfiguration bean can be applied to JMS conduits and JMS destinations using the jms:jmsConfig-ref element. The jms:jmsConfig-ref element’s value is the ID of the JMSConfiguration bean.

Example 14.4, “Adding JMS configuration to a JMS conduit” shows a JMS conduit that uses the JMS configuration from Example 14.2, “JMS configuration bean”.

Example 14.4. Adding JMS configuration to a JMS conduit

<jms:conduit name="{http://cxf.apache.org/jms_conf_test}HelloWorldQueueBinMsgPort.jms-conduit">
  ...
  <jms:jmsConfig-ref>jmsConfig</jms:jmsConfig-ref>
</jms:conduit>

14.3. Optimizing Client-Side JMS Performance

Overview

Two major settings affect the JMS performance of clients: pooling and synchronous receives.

Pooling

On the client side, CXF creates a new JMS session and JMS producer for each message. This is necessary because neither session nor producer objects are thread-safe. Creating a producer is especially time-consuming because it requires communicating with the server.

Pooling connection factories improves performance by caching the connection, session, and producer.

For ActiveMQ, configuring pooling is simple; for example:

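import javax.jms.ConnectionFactory;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.pool.PooledConnectionFactory;
import org.apache.cxf.transport.jms.JMSConfiguration;

// Plain (non-pooling) ActiveMQ connection factory
ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");

// Wrap it in a pooling factory that caches connections, sessions, and producers
PooledConnectionFactory pcf = new PooledConnectionFactory();

// Set an expiry timeout because the default (0) prevents reconnection on failure
pcf.setExpiryTimeout(5000);
pcf.setConnectionFactory(cf);

// Hand the pooled factory (pcf) to the CXF JMS configuration
JMSConfiguration jmsConfig = new JMSConfiguration();
jmsConfig.setConnectionFactory(pcf);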

For more information on pooling, see "Appendix A Optimizing Performance of JMS Single- and Multiple-Resource Transactions" in the Red Hat JBoss Fuse Transaction Guide.

Avoiding synchronous receives

For request/reply exchanges, the JMS transport sends a request and then waits for a reply. Whenever possible, request/reply messaging is implemented asynchronously using a JMS MessageListener.

However, CXF must use a synchronous Consumer.receive() method when it needs to share queues between endpoints. This scenario requires the MessageListener to use a message selector to filter the messages. The message selector must be known in advance, so the MessageListener is opened only once.

Two cases in which the message selector cannot be known in advance should be avoided:

  • When JMSMessageID is used as the JMSCorrelationID

    If the JMS properties useConduitIdSelector and conduitSelectorPrefix are not set on the JMS transport, the client does not set a JMSCorrelationID. This causes the server to use the JMSMessageID of the request message as the JMSCorrelationID. As the JMSMessageID cannot be known in advance, the client has to use a synchronous Consumer.receive() method.

    Note that you must use the Consumer.receive() method with IBM JMS endpoints (their default).

  • The user sets the JMSType in the request message and then sets a custom JMSCorrelationID.

    Again, as the custom JMSCorrelationID cannot be known in advance, the client has to use a synchronous Consumer.receive() method.

So the general rule is to avoid using settings that require using a synchronous receive.
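To see why a predictable correlation ID lets the listener be opened only once, consider the following Java sketch. It assumes that every correlation ID generated by a client starts with a fixed prefix, so that a single selector of the form JMSCorrelationID LIKE '<prefix>%' covers all replies. The class and method names are hypothetical, the sketch uses only the JDK, and it is not CXF's implementation.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Illustrative only: models prefix-based reply correlation with plain strings. */
final class PrefixCorrelation {
    private final String prefix;
    private final AtomicLong counter = new AtomicLong();

    PrefixCorrelation(String prefix) {
        this.prefix = prefix;
    }

    /** Selector that is known before any request is sent. */
    String selector() {
        // Assumes the prefix contains no quote or LIKE wildcard characters.
        return "JMSCorrelationID LIKE '" + prefix + "%'";
    }

    /** Each request gets a unique ID that still matches the selector. */
    String nextCorrelationId() {
        return prefix + counter.incrementAndGet();
    }

    /** Emulates the LIKE 'prefix%' match for this sketch. */
    boolean matches(String correlationId) {
        return correlationId != null && correlationId.startsWith(prefix);
    }
}
```

By contrast, a broker-assigned JMSMessageID is unknown until the request has been sent, so no such selector can be fixed ahead of time.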

14.4. Configuring JMS Transactions

Overview

CXF 3.0 supports both local JMS transactions and JTA transactions on CXF endpoints, when using one-way messaging.

Local transactions

Transactions using local resources roll back the JMS message only when an exception occurs. They do not directly coordinate other resources, such as database transactions.

To set up a local transaction, configure the endpoint as you normally would, and set the property sessionTransacted to true.

Note

For more information on transactions and pooling, see the Red Hat JBoss Fuse Transaction Guide.

JTA transactions

Using JTA transactions, you can coordinate any number of XA resources. If a CXF endpoint is configured for JTA transactions, it starts a transaction before calling the service implementation. The transaction will be committed if no exception occurs. Otherwise, it will be rolled back.

In JTA transactions, a JMS message is consumed and the data written to a database. When an exception occurs, both resources are rolled back, so either the message is consumed and the data is written to the database, or the message is rolled back and the data is not written to the database.

Configuring JTA transactions requires two steps:

  1. Defining a transaction manager

    • bean method

      • Define a transaction manager

        <bean id="transactionManager"
           class="org.apache.geronimo.transaction.manager.GeronimoTransactionManager"/>
      • Set the name of the transaction manager in the JMS URI

        jms:queue:myqueue?jndiTransactionManager=transactionManager

        This example finds the bean with the ID transactionManager defined above.

    • OSGi reference method

      • Look up the transaction manager as an OSGi service using Blueprint

        <reference id="TransactionManager" interface="javax.transaction.TransactionManager"/>
      • Set the name of the transaction manager in the JMS URI

        jms:jndi:myqueue?jndiTransactionManager=java:comp/env/TransactionManager

        This example looks up the transaction manager in JNDI.

  2. Configuring a JCA pooled connection factory

    Using Spring to define the JCA pooled connection factory:

    <bean id="xacf" class="org.apache.activemq.ActiveMQXAConnectionFactory">
      <property name="brokerURL" value="tcp://localhost:61616" />
    </bean>
    
    <bean id="ConnectionFactory" class="org.apache.activemq.jms.pool.JcaPooledConnectionFactory">
      <property name="transactionManager" ref="transactionManager" />
      <property name="connectionFactory" ref="xacf" />
    </bean>

    In this example, the first bean defines an ActiveMQ XA connection factory, which is given to a JcaPooledConnectionFactory. The JcaPooledConnectionFactory is then provided as the default bean with id ConnectionFactory.

    Note that the JcaPooledConnectionFactory looks like a normal ConnectionFactory. But when a new connection and session are opened, it checks for an XA transaction and, if one is found, automatically registers the JMS session as an XA resource. This allows the JMS session to participate in the JTA transaction.

    Important

    Directly setting an XA ConnectionFactory on the JMS transport will not work!
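The JMS URIs shown in step 1 follow a simple pattern: a variant (queue or jndi), a destination name, and optional query parameters such as jndiTransactionManager. The Java sketch below assembles such a string. JmsUriBuilder is a hypothetical helper, not a CXF class, and it does no escaping of names or values.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Illustrative only: builds strings such as jms:queue:myqueue?name=value. */
final class JmsUriBuilder {
    private final String variant;
    private final String destination;
    // LinkedHashMap keeps parameters in the order they were added.
    private final Map<String, String> params = new LinkedHashMap<>();

    JmsUriBuilder(String variant, String destination) {
        this.variant = variant;
        this.destination = destination;
    }

    JmsUriBuilder param(String name, String value) {
        params.put(name, value);
        return this;
    }

    String build() {
        String base = "jms:" + variant + ":" + destination;
        if (params.isEmpty()) {
            return base;
        }
        // Join name=value pairs with '&' and prefix the query with '?'.
        StringJoiner query = new StringJoiner("&", "?", "");
        params.forEach((k, v) -> query.add(k + "=" + v));
        return base + query;
    }
}
```

For example, new JmsUriBuilder("jndi", "myqueue").param("jndiTransactionManager", "java:comp/env/TransactionManager").build() yields the JNDI form of the URI used in the OSGi reference method.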

14.5. Using WSDL to configure JMS

14.5.1. JMS WSDL Extension Namespace

The WSDL extensions for defining a JMS endpoint are defined in the namespace http://cxf.apache.org/transports/jms. To use the JMS extensions, add the line shown in Example 14.5, “JMS WSDL extension namespace” to the definitions element of your contract.

Example 14.5. JMS WSDL extension namespace

xmlns:jms="http://cxf.apache.org/transports/jms"

14.5.2. Basic JMS configuration

Overview

The JMS address information is provided using the jms:address element and its child, the jms:JMSNamingProperty element. The jms:address element’s attributes specify the information needed to identify the JMS broker and the destination. The jms:JMSNamingProperty element specifies the Java properties used to connect to the JNDI service.

Important

Information specified using the JMS feature will override the information in the endpoint’s WSDL file.

Specifying the JMS address

The basic configuration for a JMS endpoint is done by using a jms:address element as the child of your service’s port element. The jms:address element used in WSDL is identical to the one used in the configuration file. Its attributes are listed in Table 14.2, “JMS endpoint attributes”.

Table 14.2. JMS endpoint attributes

Attribute | Description

destinationStyle

Specifies if the JMS destination is a JMS queue or a JMS topic.

jndiConnectionFactoryName

Specifies the JNDI name bound to the JMS connection factory to use when connecting to the JMS destination.

jmsDestinationName

Specifies the JMS name of the JMS destination to which requests are sent.

jmsReplyDestinationName

Specifies the JMS name of the JMS destination where replies are sent. This attribute allows you to use a user-defined destination for replies. For more details see Section 14.6, “Using a Named Reply Destination”.

jndiDestinationName

Specifies the JNDI name bound to the JMS destination to which requests are sent.

jndiReplyDestinationName

Specifies the JNDI name bound to the JMS destination where replies are sent. This attribute allows you to use a user-defined destination for replies. For more details see Section 14.6, “Using a Named Reply Destination”.

connectionUserName

Specifies the user name to use when connecting to a JMS broker.

connectionPassword

Specifies the password to use when connecting to a JMS broker.

The jms:address WSDL element uses jms:JMSNamingProperty child elements to specify additional information needed to connect to a JNDI provider.

Specifying JNDI properties

To increase interoperability with JMS and JNDI providers, the jms:address element has a child element, jms:JMSNamingProperty, that allows you to specify the values used to populate the properties used when connecting to the JNDI provider. The jms:JMSNamingProperty element has two attributes: name and value. The name attribute specifies the name of the property to set, and the value attribute specifies the value for that property. The jms:JMSNamingProperty element can also be used to specify provider-specific properties.

The following is a list of common JNDI properties that can be set:

  1. java.naming.factory.initial
  2. java.naming.provider.url
  3. java.naming.factory.object
  4. java.naming.factory.state
  5. java.naming.factory.url.pkgs
  6. java.naming.dns.url
  7. java.naming.authoritative
  8. java.naming.batchsize
  9. java.naming.referral
  10. java.naming.security.protocol
  11. java.naming.security.authentication
  12. java.naming.security.principal
  13. java.naming.security.credentials
  14. java.naming.language
  15. java.naming.applet

For more details on what information to use in these attributes, check your JNDI provider’s documentation and consult the Java API reference material.
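In plain Java, these name/value pairs correspond to the environment passed to a JNDI initial context. The sketch below builds such an environment with the standard javax.naming.Context constants. JndiEnvironment and forBroker are hypothetical names used only for illustration, and the sketch deliberately stops short of creating a context, because that requires the provider's factory class on the classpath.

```java
import java.util.Hashtable;
import javax.naming.Context;

/** Illustrative only: builds the JNDI environment that such properties describe. */
final class JndiEnvironment {

    static Hashtable<String, String> forBroker(String contextFactory, String providerUrl) {
        Hashtable<String, String> env = new Hashtable<>();
        // Context.INITIAL_CONTEXT_FACTORY is "java.naming.factory.initial"
        env.put(Context.INITIAL_CONTEXT_FACTORY, contextFactory);
        // Context.PROVIDER_URL is "java.naming.provider.url"
        env.put(Context.PROVIDER_URL, providerUrl);
        return env;
    }
}
```

An application would typically pass the resulting table to new javax.naming.InitialContext(env) to look up the connection factory and destinations.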

Example

Example 14.6, “JMS WSDL port specification” shows an example of a JMS WSDL port specification.

Example 14.6. JMS WSDL port specification

<service name="JMSService">
  <port binding="tns:Greeter_SOAPBinding" name="SoapPort">
    <jms:address jndiConnectionFactoryName="ConnectionFactory"
                 jndiDestinationName="dynamicQueues/test.Celtix.jmstransport" >
      <jms:JMSNamingProperty name="java.naming.factory.initial"
                             value="org.activemq.jndi.ActiveMQInitialContextFactory" />
      <jms:JMSNamingProperty name="java.naming.provider.url"
                             value="tcp://localhost:61616" />
    </jms:address>
  </port>
</service>

14.5.3. JMS client configuration

Overview

JMS consumer endpoints specify the type of messages they use. A JMS consumer endpoint can use either a JMS BytesMessage or a JMS TextMessage.

When using a BytesMessage, the consumer endpoint uses a byte[] to store data in, and retrieve data from, the JMS message body. When messages are sent, the message data, including any formatting information, is packaged into a byte[] and placed into the message body before it is placed on the wire. When messages are received, the consumer endpoint attempts to unmarshal the data stored in the message body as if it were packed in a byte[].

When using a TextMessage, the consumer endpoint uses a string to store data in, and retrieve data from, the message body. When messages are sent, the message information, including any format-specific information, is converted into a string and placed into the JMS message body. When messages are received, the consumer endpoint attempts to unmarshal the data stored in the JMS message body as if it were packed into a string.

When native JMS applications interact with Apache CXF consumers, the JMS application is responsible for interpreting the message and the formatting information. For example, if the Apache CXF contract specifies that the binding used for a JMS endpoint is SOAP, and the messages are packaged as TextMessage, the receiving JMS application will get a text message containing all of the SOAP envelope information.
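The difference between the two packaging styles can be illustrated with JDK types alone. In the sketch below, the same payload is carried either as a string or as a byte[]. BodyPackaging is a hypothetical class, the UTF-8 encoding is an assumption of the sketch, and no JMS API is involved.

```java
import java.nio.charset.StandardCharsets;

/** Illustrative only: contrasts a string body with a byte[] body using JDK types. */
final class BodyPackaging {

    /** Binary-style packaging: the payload travels as raw bytes (UTF-8 assumed). */
    static byte[] asBytes(String payload) {
        return payload.getBytes(StandardCharsets.UTF_8);
    }

    /** The receiver of a binary body must decode it before interpreting it. */
    static String fromBytes(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }
}
```

With text packaging, the receiving application reads the string directly. With binary packaging, it must first decode the bytes using the same encoding the sender used.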

Specifying the message type

The type of messages accepted by a JMS consumer endpoint is configured using the optional jms:client element. The jms:client element is a child of the WSDL port element and has one attribute:

Table 14.3. JMS Client WSDL Extensions

messageType

Specifies how the message data will be packaged as a JMS message. text specifies that the data will be packaged as a TextMessage. binary specifies that the data will be packaged as a BytesMessage.

Example

Example 14.7, “WSDL for a JMS consumer endpoint” shows the WSDL for configuring a JMS consumer endpoint.

Example 14.7. WSDL for a JMS consumer endpoint

<service name="JMSService">
  <port binding="tns:Greeter_SOAPBinding" name="SoapPort">
    <jms:address jndiConnectionFactoryName="ConnectionFactory"
                 jndiDestinationName="dynamicQueues/test.Celtix.jmstransport" >
      <jms:JMSNamingProperty name="java.naming.factory.initial"
                             value="org.activemq.jndi.ActiveMQInitialContextFactory" />
      <jms:JMSNamingProperty name="java.naming.provider.url"
                             value="tcp://localhost:61616" />
    </jms:address>
    <jms:client messageType="binary" />
  </port>
</service>

14.5.4. JMS provider configuration

Overview

JMS provider endpoints have a number of behaviors that are configurable. These include:

  • how messages are correlated
  • the use of durable subscriptions
  • if the service uses local JMS transactions
  • the message selectors used by the endpoint

Specifying the configuration

Provider endpoint behaviors are configured using the optional jms:server element. The jms:server element is a child of the WSDL wsdl:port element and has the following attributes:

Table 14.4. JMS provider endpoint WSDL extensions

Attribute | Description

useMessageIDAsCorrelationID

Specifies whether JMS will use the message ID to correlate messages. The default is false.

durableSubscriberName

Specifies the name used to register a durable subscription.

messageSelector

Specifies the string value of a message selector to use. For more information on the syntax used to specify message selectors, see the JMS 1.1 specification.

transactional

Specifies whether the local JMS broker will create transactions around message processing. The default is false. [a]

[a] Currently, setting the transactional attribute to true is not supported by the runtime.

Example

Example 14.8, “WSDL for a JMS provider endpoint” shows the WSDL for configuring a JMS provider endpoint.

Example 14.8. WSDL for a JMS provider endpoint

<service name="JMSService">
  <port binding="tns:Greeter_SOAPBinding" name="SoapPort">
    <jms:address jndiConnectionFactoryName="ConnectionFactory"
                 jndiDestinationName="dynamicQueues/test.Celtix.jmstransport" >
      <jms:JMSNamingProperty name="java.naming.factory.initial"
                             value="org.activemq.jndi.ActiveMQInitialContextFactory" />
      <jms:JMSNamingProperty name="java.naming.provider.url"
                             value="tcp://localhost:61616" />
    </jms:address>
    <jms:server messageSelector="cxf_message_selector"
                useMessageIDAsCorrelationID="true"
                transactional="true"
                durableSubscriberName="cxf_subscriber" />
  </port>
</service>

14.6. Using a Named Reply Destination

Overview

By default, Apache CXF endpoints using JMS create a temporary queue for sending replies back and forth. If you prefer to use named queues, you can configure the queue used to send replies as part of an endpoint’s JMS configuration.

Setting the reply destination name

You specify the reply destination using either the jmsReplyDestinationName attribute or the jndiReplyDestinationName attribute in the endpoint’s JMS configuration. A client endpoint will listen for replies on the specified destination and it will specify the value of the attribute in the ReplyTo field of all outgoing requests. A service endpoint will use the value of the jndiReplyDestinationName attribute as the location for placing replies if there is no destination specified in the request’s ReplyTo field.
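The fallback rule for service endpoints can be summarized in a few lines of Java. This is a sketch of the rule as described above, not CXF code: ReplyDestinationChooser is a hypothetical name, destinations are modeled as plain strings, and treating an empty ReplyTo value as absent is an assumption.

```java
/** Illustrative only: not a CXF class. */
final class ReplyDestinationChooser {

    /**
     * Returns the destination a service endpoint would reply to:
     * the request's ReplyTo value when present, otherwise the
     * configured reply destination.
     */
    static String choose(String requestReplyTo, String configuredReplyDestination) {
        if (requestReplyTo != null && !requestReplyTo.isEmpty()) {
            return requestReplyTo;
        }
        return configuredReplyDestination;
    }
}
```

A request that carries its own ReplyTo value therefore always takes precedence over the configured destination.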

Example

Example 14.9, “JMS Consumer Specification Using a Named Reply Queue” shows the configuration for a JMS client endpoint.

Example 14.9. JMS Consumer Specification Using a Named Reply Queue

<jms:conduit name="{http://cxf.apache.org/jms_endpt}HelloWorldJMSPort.jms-conduit">
  <jms:address destinationStyle="queue"
               jndiConnectionFactoryName="myConnectionFactory"
               jndiDestinationName="myDestination"
               jndiReplyDestinationName="myReplyDestination" >
    <jms:JMSNamingProperty name="java.naming.factory.initial"
                           value="org.apache.cxf.transport.jms.MyInitialContextFactory" />
    <jms:JMSNamingProperty name="java.naming.provider.url"
                           value="tcp://localhost:61616" />
  </jms:address>
</jms:conduit>