6.6. Generic XA-Aware Connection Pool Library

Overview

The generic XA-aware connection pool library is provided via the org.apache.activemq.jms.pool component. The library enables third-party JMS providers to participate in XA transactions managed by any JTA transaction manager—Apache Geronimo in particular—and to recover properly from system and application failures.
When an application uses the JMS bridge or the Camel JMS component with a third-party JMS provider, this library lets it use the shared pooled implementation at both ends of the bridge or the Camel route.
Note
org.apache.activemq.pool extends org.apache.activemq.jms.pool.
The library contains three different pooled connection factories:
  • PooledConnectionFactory
    PooledConnectionFactory is a simple generic connection factory that pools connection, session, and message producer instances so that they can be used with tools such as Apache Camel and Spring's JmsTemplate and MessageListenerContainer. Connections, sessions, and producers are returned to the pool for later reuse by the application.
  • XaPooledConnectionFactory
    XaPooledConnectionFactory is a generic XA pooled connection factory that automatically enlists the XA resource for each transaction and pools JMS connections, sessions and message producers.
  • JcaPooledConnectionFactory
    JcaPooledConnectionFactory is a generic XA pooled connection factory that works with the Geronimo transaction manager (Aries) and provides for proper recovery after a system or application failure using the GenericResourceManager. It also automatically enlists the XA resource for each transaction and pools JMS connections, sessions and message producers.

Dependencies

Maven users need to add a dependency on activemq-jms-pool to their pom.xml file as shown here:
<dependency>
   <groupId>org.apache.activemq</groupId>
   <artifactId>activemq-jms-pool</artifactId>
   <version>x.x.x</version>  <!-- Use same version as your activemq core  -->
</dependency>

PooledConnectionFactory

The PooledConnectionFactory has one required and nine optional parameters:

Table 6.1. PooledConnectionFactory parameters

Parameter Required Description
connectionFactory Yes
Specifies the underlying JMS connectionFactory.
blockIfSessionPoolIsFull No
Controls the behavior of the internal session pool when it is full.
  • true—(default) Blocks until a session object becomes available
  • false—Throws an exception
Note: maximumActiveSessionPerConnection controls the size of the session pool.
createConnectionOnStartup No
Specifies whether a connection will be created immediately on a call to start(). This is useful for warming up the pool on startup.
Note: Any exception that occurs during startup is logged at WARN level and ignored.
Defaults to true.
expiryTimeout No
Specifies the time, in milliseconds, at which connections are allowed to expire, regardless of load or idleTimeout.
Provides a simple way to achieve load balancing when used with the failover protocol (see Failover in the Transport Reference). Since failover works only when a connection is initially made, a pooled connection doesn't get another chance to load balance, unless expiry is forced on the pool.
If your application has no need to load balance frequently—for example, when a destination's producers and consumers are colocated on the same broker—set this parameter to a large number.
Defaults to 0 (disabled).
idleTimeout No
Specifies the time, in milliseconds, newly created connections can remain idle before they are allowed to expire. On expiring, a connection is closed and removed from the pool.
Note: Connections are normally tested on each attempt to borrow one from the pool. So if connections are infrequently requested, a connection instance could remain in the pool much longer than its configured setting.
Defaults to 30*1000 (30 seconds).
maxConnections No
Specifies the maximum number of connections to use. Each call to createConnection creates a new connection, up to maxConnections.
Defaults to 1.
maximumActiveSessionPerConnection No
Specifies the maximum number of active sessions allowed per connection. Once this maximum has been reached and a new session is requested, the connection either throws an exception or blocks until a session becomes available, according to blockIfSessionPoolIsFull.
Defaults to 500.
numConnections No
Specifies the number of connections currently in the pool.
timeBetweenExpirationCheckMillis No
Specifies the number of milliseconds to sleep between each run of the idle connection eviction thread.
  • non positive value—no eviction thread is run. Pooled connections are checked for excessive idle time or other failures only when one is borrowed from the pool.
  • positive value—Specifies the number of milliseconds to wait between each run of the idle connection eviction thread.
Defaults to -1, so no eviction thread will ever run.
useAnonymousProducers No
Specifies whether sessions will use one anonymous MessageProducer for all producer requests or create a new MessageProducer for each producer request.
  • true—(default) Only one anonymous MessageProducer is allocated for all requests
    You can use the MessageProducer#send(destination, message) method with anonymous message producers.
    Note: Using this approach prevents the broker from showing producers per destination.
  • false—A new MessageProducer is allocated per request
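As an illustration of how several of these parameters combine, the following Blueprint fragment is a minimal sketch, not a recommended configuration. The broker host names (broker1, broker2), the bean ids, and all numeric values are assumptions chosen for the example; only the property names come from Table 6.1.

```xml
<!-- Underlying factory; the failover URL and host names are hypothetical -->
<bean id="internalConnectionFactory"
      class="org.apache.activemq.ActiveMQConnectionFactory">
  <argument value="failover:(tcp://broker1:61616,tcp://broker2:61616)" />
</bean>

<bean id="connectionFactory"
      class="org.apache.activemq.jms.pool.PooledConnectionFactory"
      init-method="start" destroy-method="stop">
  <property name="connectionFactory" ref="internalConnectionFactory" />
  <!-- Pool up to 4 connections, each with at most 100 active sessions -->
  <property name="maxConnections" value="4" />
  <property name="maximumActiveSessionPerConnection" value="100" />
  <!-- Throw an exception instead of blocking when the session pool is full -->
  <property name="blockIfSessionPoolIsFull" value="false" />
  <!-- Let connections expire after 5 minutes so that failover can rebalance -->
  <property name="expiryTimeout" value="300000" />
  <!-- Let connections that have been idle for 60 seconds expire -->
  <property name="idleTimeout" value="60000" />
  <!-- Run the eviction thread every 30 seconds (it is disabled by default) -->
  <property name="timeBetweenExpirationCheckMillis" value="30000" />
</bean>
```

With the eviction thread enabled in this way, idle connections are checked periodically instead of only when one is borrowed from the pool.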

XaPooledConnectionFactory

The XaPooledConnectionFactory extends the PooledConnectionFactory, implementing three additional optional parameters:

Table 6.2. XaPooledConnectionFactory parameters

Parameter Required Description
transactionManager No
Specifies the JTA transaction manager to use.
tmFromJndi No
Resolves a transaction manager from JNDI using the specified tmJndiName.
tmJndiName No
Specifies the name of the transaction manager to use when resolving one from JNDI.
Defaults to java:/TransactionManager.
Note
When no transaction manager is specified, the XaConnectionPool class reverts to behaving as a non-XA connection.
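The JNDI-based parameters in Table 6.2 could be set as in the following sketch. It assumes that tmFromJndi takes a boolean value, that a transaction manager is actually bound under the JNDI name shown (here, the documented default), and that the underlying factory is an ActiveMQXAConnectionFactory. The bean ids are hypothetical.

```xml
<!-- Underlying XA-capable factory; the broker URL is an example value -->
<bean id="xaConnectionFactory"
      class="org.apache.activemq.ActiveMQXAConnectionFactory">
  <argument value="tcp://localhost:61616" />
</bean>

<bean id="pooledXaConnectionFactory"
      class="org.apache.activemq.jms.pool.XaPooledConnectionFactory"
      init-method="start" destroy-method="stop">
  <property name="connectionFactory" ref="xaConnectionFactory" />
  <!-- Assumption: a boolean flag that enables the JNDI lookup -->
  <property name="tmFromJndi" value="true" />
  <!-- JNDI name of the transaction manager (the documented default) -->
  <property name="tmJndiName" value="java:/TransactionManager" />
</bean>
```

If the transaction manager is available as a bean or service reference instead, setting the transactionManager property directly avoids the JNDI lookup.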

JcaPooledConnectionFactory

The JcaPooledConnectionFactory pool extends the XaPooledConnectionFactory, implementing one additional required parameter:

Table 6.3. JcaPooledConnectionFactory parameters

Parameter Required Description
name Yes
Specifies the name of the resource manager that the JTA transaction manager will use to detect whether two-phase commits must be employed.
The resource manager name must uniquely identify the broker.
Note: To start the recovery process, the GenericResourceManager must also be configured.
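To illustrate the uniqueness requirement, the following sketch defines one pool per broker, each with its own name. The host names, bean ids, and name values (brokerA, brokerB) are hypothetical, and a transactionManager reference is assumed to be defined elsewhere in the same Blueprint file.

```xml
<bean id="internalConnectionFactoryA"
      class="org.apache.activemq.ActiveMQXAConnectionFactory">
  <argument value="tcp://hostA:61616" />
</bean>

<bean id="internalConnectionFactoryB"
      class="org.apache.activemq.ActiveMQXAConnectionFactory">
  <argument value="tcp://hostB:61616" />
</bean>

<!-- Each pool carries a distinct resource manager name -->
<bean id="connectionFactoryA"
      class="org.apache.activemq.jms.pool.JcaPooledConnectionFactory"
      init-method="start" destroy-method="stop">
  <property name="connectionFactory" ref="internalConnectionFactoryA" />
  <property name="transactionManager" ref="transactionManager" />
  <property name="name" value="brokerA" />
</bean>

<bean id="connectionFactoryB"
      class="org.apache.activemq.jms.pool.JcaPooledConnectionFactory"
      init-method="start" destroy-method="stop">
  <property name="connectionFactory" ref="internalConnectionFactoryB" />
  <property name="transactionManager" ref="transactionManager" />
  <property name="name" value="brokerB" />
</bean>
```

For recovery, each pool would presumably also need its own GenericResourceManager, configured as in Example 6.3.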

Examples

  • PooledConnectionFactory
    This example (Example 6.1) shows a simple pool that configures some connection parameters for a standalone ActiveMQ broker. In this scenario, you can use either the ActiveMQ-specific pool org.apache.activemq.pool or the generic pool org.apache.activemq.jms.pool.

    Example 6.1. Simple pooled connection factory configured using Blueprint

    <bean id="internalConnectionFactory"  
          class="org.apache.activemq.ActiveMQConnectionFactory">
      <argument value="tcp://localhost:61616" />
    </bean>
    
    <bean id="connectionFactory"
          class="org.apache.activemq.jms.pool.PooledConnectionFactory"
          init-method="start" destroy-method="stop">
           <property name="connectionFactory"  ref="internalConnectionFactory"/>
           <property name="maxConnections" value="2" />
           <property name="blockIfSessionPoolIsFull" value="true" />
    </bean>
  • XaPooledConnectionFactory
    This example (Example 6.2) uses two data sources, one standalone ActiveMQ broker and one standalone HornetQ broker, to perform XA transactions, in which data is consumed from the HornetQ broker and produced to the ActiveMQ broker. The HornetQ broker is configured to use the generic pool org.apache.activemq.jms.pool, while the ActiveMQ broker is configured to use the ActiveMQ-specific pool org.apache.activemq.pool. This example uses the default settings for the optional parameters.

    Example 6.2. XA pooled connection factory configured programmatically

    // Transaction manager, obtained elsewhere (for example, from JNDI or the OSGi registry)
    javax.transaction.TransactionManager transactionManager;

    // Generic pool used for HornetQ
    org.apache.activemq.jms.pool.XaPooledConnectionFactory hqPooledConnectionFactory =
        new org.apache.activemq.jms.pool.XaPooledConnectionFactory();

    // ActiveMQ-specific pool (org.apache.activemq.pool.XaPooledConnectionFactory)
    XaPooledConnectionFactory amqPooledConnectionFactory = new XaPooledConnectionFactory();

    // Set the transaction manager
    hqPooledConnectionFactory.setTransactionManager(transactionManager);
    amqPooledConnectionFactory.setTransactionManager(transactionManager);

    // Set the underlying connection factories
    amqPooledConnectionFactory.setConnectionFactory(
        new ActiveMQXAConnectionFactory("admin", "admin", "tcp://localhost:61616"));
    hqPooledConnectionFactory.setConnectionFactory(getHornetQConnectionFactory());

    // Create the pooled connections
    Connection hornetPooledConn = hqPooledConnectionFactory.createConnection();
    Connection amqPooledConnection = amqPooledConnectionFactory.createConnection();

    // Unwrap the underlying XA connections and start them
    XAConnection hornetQXaConnection = (XAConnection)
        ((org.apache.activemq.jms.pool.PooledConnection) hornetPooledConn).getConnection();
    XAConnection amqXAConnection = (XAConnection)
        ((org.apache.activemq.jms.pool.PooledConnection) amqPooledConnection).getConnection();
    hornetQXaConnection.start();
    amqXAConnection.start();

    // First transaction
    transactionManager.begin();

    // XA resources are enlisted automatically when a session is created
    XASession hornetQXaSession = (XASession)
        hornetPooledConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
    XASession amqXASession = (XASession)
        amqPooledConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);

    // ... some transactional work ...
    transactionManager.rollback();

    // Second transaction: begin again so that the new sessions are enlisted
    transactionManager.begin();
    hornetQXaSession = (XASession)
        hornetPooledConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
    amqXASession = (XASession)
        amqPooledConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);

    // ... other transactional work ...
    transactionManager.commit();
  • JcaPooledConnectionFactory
    This example (Example 6.3) shows the configuration of an XA application that uses the JcaPooledConnectionFactory, allowing a remote third-party JMS broker to participate in XA transactions with an ActiveMQ broker deployed in JBoss Fuse.
    A class specific to the Apache Geronimo transaction manager is used to register the pool with the transaction as required to enable recovery via the GenericResourceManager. Both the transactionManager and XAConnectionFactory (ActiveMQXAConnectionFactory) are defined and passed as properties to JcaPooledConnectionFactory, and the GenericResourceManager is configured to recover transactions upon a system or application failure.

    Example 6.3. JCA pooled connection factory configured using Blueprint

    <reference id="transactionManager"
               interface="org.apache.geronimo.transaction.manager.RecoverableTransactionManager"
               availability="mandatory" />
    
    <bean id="platformTransactionManager" 
            class="org.springframework.transaction.jta.JtaTransactionManager"
                     init-method="afterPropertiesSet">
               <property name="transactionManager" ref="transactionManager"/>
               <property name="autodetectUserTransaction" value="false"/>
    </bean>
    
    <bean id="internalConnectionFactory" 
            class="org.apache.activemq.ActiveMQXAConnectionFactory">
               <argument value="tcp://localhost:61616" />
               <property name="userName" value="admin" />
               <property name="password" value="admin" />
    </bean>
    
    <bean id="connectionFactory" 
            class="org.apache.activemq.jms.pool.JcaPooledConnectionFactory"
                     init-method="start" destroy-method="stop">
               <property name="connectionFactory" ref="internalConnectionFactory"/>
               <property name="transactionManager" ref="transactionManager"/>
               <property name="name" value="activemq" />
    </bean>
    
    <bean id="resourceManager" class="org.apache.activemq.jms.pool.GenericResourceManager" 
                     init-method="recoverResource">
               <property name="connectionFactory" ref="internalConnectionFactory"/>
               <property name="transactionManager" ref="transactionManager"/>
               <property name="resourceName" value="activemq" />
               <property name="userName" value="admin" />
               <property name="password" value="admin" />
    </bean>