3.2. Reducing Context Switching

Overview

Through the consumer configuration options, there are two different ways in which you can optimize the threading model:

Optimize message dispatching on the broker side

The broker normally dispatches messages to consumers asynchronously, which usually gives the best performance because it enables the broker to cope better with slow consumers. If you are sure that your consumers are always fast, however, you can achieve better performance by disabling asynchronous dispatch on the broker, thereby avoiding the cost of unnecessary context switching.
Broker-side asynchronous dispatching can be enabled or disabled at the granularity of individual consumers. Hence, you can disable asynchronous dispatching for your fast consumers, but leave it enabled for your (possibly) slow consumers.
To disable broker-side asynchronous dispatching, set the consumer.dispatchAsync option to false on the destination URI used by the consumer. For example, to disable asynchronous dispatch for a consumer on the TEST.QUEUE queue, use the following destination URI on the consumer side:
TEST.QUEUE?consumer.dispatchAsync=false
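The URI shown above is simply the queue name followed by a query string of consumer.* options, so it can be assembled in code before it is passed to the JMS session. The following is a minimal sketch under that assumption. DestinationOptions and withConsumerOptions are hypothetical names introduced for illustration and are not part of ActiveMQ; the block uses only the JDK so that it runs on its own.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper, not part of ActiveMQ: appends consumer.* options
// to a queue name as a query string.
class DestinationOptions {

    static String withConsumerOptions(String queueName, Map<String, String> options) {
        if (options.isEmpty()) {
            return queueName;
        }
        // Prefix is "<queue>?"; individual options are joined with '&'.
        StringJoiner query = new StringJoiner("&", queueName + "?", "");
        for (Map.Entry<String, String> option : options.entrySet()) {
            query.add("consumer." + option.getKey() + "=" + option.getValue());
        }
        return query.toString();
    }

    public static void main(String[] args) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("dispatchAsync", "false");
        String destination = withConsumerOptions("TEST.QUEUE", options);
        System.out.println(destination); // prints TEST.QUEUE?consumer.dispatchAsync=false

        // With a JMS session in hand, the string would typically be used like this:
        //   MessageConsumer consumer = session.createConsumer(session.createQueue(destination));
    }
}
```

Because the option travels with the destination name, it affects only the consumers created on that destination string.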
It is also possible to disable asynchronous dispatch by setting the dispatchAsync property to false on the ActiveMQ connection factory, so that it applies to all consumers created from that factory. For example:
// Java
((ActiveMQConnectionFactory)connectionFactory).setDispatchAsync(false);
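To see where the context switch comes from, the following toy model may help. It is not ActiveMQ broker code, and the class and thread names are made up for illustration: it only shows that a synchronous delivery runs on the calling thread, whereas an asynchronous delivery is handed off to a separate dispatcher thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Toy model (not ActiveMQ code): reports which thread performs a delivery.
class DispatchModel {

    // Synchronous dispatch: the delivery runs on the calling thread.
    static String dispatchSync() {
        return Thread.currentThread().getName();
    }

    // Asynchronous dispatch: the delivery is handed off to a dedicated
    // dispatcher thread, which costs a context switch per hand-off.
    static String dispatchAsync() {
        ExecutorService dispatcher = Executors.newSingleThreadExecutor(
                task -> new Thread(task, "dispatcher"));
        try {
            return CompletableFuture
                    .supplyAsync(() -> Thread.currentThread().getName(), dispatcher)
                    .join();
        } finally {
            dispatcher.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("sync delivery on:  " + dispatchSync());
        System.out.println("async delivery on: " + dispatchAsync());
    }
}
```

The hand-off is what lets the caller move on while a slow consumer is being served, and it is also the overhead that disabling asynchronous dispatch removes for fast consumers.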

Optimize message reception on the consumer side

On the consumer side, there are two layers of threads responsible for receiving incoming messages: the Session threads and the MessageConsumer threads. In the special case where only one session is associated with a connection, the two layers are redundant and it is possible to optimize the threading model by eliminating the thread associated with the session layer. This section explains how to enable this consumer threading optimization.

Default consumer threading model

Figure 3.1, “Default Consumer Threading Model” gives an overview of the default threading model on a consumer. The first thread layer is responsible for pulling messages directly from the transport layer, unmarshalling each message, and inserting the message into a queue inside a javax.jms.Session instance. The second thread layer consists of a pool of threads, where each thread is associated with a javax.jms.MessageConsumer instance. Each thread in this layer picks the relevant messages out of the session queue and inserts each message into a queue inside the javax.jms.MessageConsumer instance.

Figure 3.1. Default Consumer Threading Model

Optimized consumer threading model

Figure 3.2, “Optimized Consumer Threading Model” gives an overview of the optimized consumer threading model. This threading model can be enabled only if there is no more than one session associated with the connection. In this case, the session threading layer is optimized away and the MessageConsumer threads pull messages directly from the transport layer.
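The difference between the two models can be sketched with plain JDK queues and threads. This is only an illustrative model, not the client's implementation: the class, the queue names, and the thread names are assumptions. Each message records the threads it passes through on its way from the transport queue to the consumer queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Illustrative model (not ActiveMQ code) of the two consumer threading layouts.
class ConsumerThreadingModel {

    // Starts a named thread that moves one message from 'in' to 'out'
    // and records its own name in the message's trace.
    static void stage(String name, BlockingQueue<List<String>> in,
                      BlockingQueue<List<String>> out) {
        new Thread(() -> {
            try {
                List<String> message = in.take();
                message.add(Thread.currentThread().getName());
                out.put(message);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, name).start();
    }

    // Delivers one message and returns the list of threads that handled it.
    static List<String> deliver(boolean sessionLayer) {
        BlockingQueue<List<String>> transport = new LinkedBlockingQueue<>();
        BlockingQueue<List<String>> consumerQueue = new LinkedBlockingQueue<>();
        if (sessionLayer) {
            // Default model: session thread first, then consumer thread.
            BlockingQueue<List<String>> sessionQueue = new LinkedBlockingQueue<>();
            stage("session-thread", transport, sessionQueue);
            stage("consumer-thread", sessionQueue, consumerQueue);
        } else {
            // Optimized model: the consumer thread reads from the transport directly.
            stage("consumer-thread", transport, consumerQueue);
        }
        try {
            transport.put(new ArrayList<>());
            return consumerQueue.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for delivery", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("default:   " + deliver(true));  // [session-thread, consumer-thread]
        System.out.println("optimized: " + deliver(false)); // [consumer-thread]
    }
}
```

In the model, the optimized layout saves one queue hand-off, and therefore one context switch, per message.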

Figure 3.2. Optimized Consumer Threading Model

Prerequisites

This threading optimization works only if the following prerequisites are satisfied:
  1. There must be only one JMS session on the connection. If there is more than one session, a separate thread is always used for each session, irrespective of the value of the alwaysSessionAsync flag.
  2. One of the following acknowledgment modes must be selected:
    • Session.DUPS_OK_ACKNOWLEDGE
    • Session.AUTO_ACKNOWLEDGE
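The prerequisites above, together with the alwaysSessionAsync flag, can be restated as a single predicate. The sketch below only mirrors the rules as documented here; it is not the client's internal logic, and the class and method names are hypothetical. The integer constants are assumed to match the values of the corresponding javax.jms.Session constants, and are redefined locally so that the block runs without the JMS API on the classpath.

```java
// Restates the documented prerequisites as a predicate (hypothetical helper).
class ThreadingOptimizationCheck {

    // Local copies of the javax.jms.Session acknowledgment-mode values.
    static final int SESSION_TRANSACTED = 0;
    static final int AUTO_ACKNOWLEDGE = 1;
    static final int CLIENT_ACKNOWLEDGE = 2;
    static final int DUPS_OK_ACKNOWLEDGE = 3;

    // Returns true when the session thread layer can be bypassed.
    static boolean sessionThreadBypassed(boolean alwaysSessionAsync,
                                         int sessionCount, int ackMode) {
        boolean singleSession = sessionCount == 1;
        boolean supportedAckMode =
                ackMode == AUTO_ACKNOWLEDGE || ackMode == DUPS_OK_ACKNOWLEDGE;
        return !alwaysSessionAsync && singleSession && supportedAckMode;
    }

    public static void main(String[] args) {
        // One session, AUTO_ACKNOWLEDGE, flag switched off: optimization applies.
        System.out.println(sessionThreadBypassed(false, 1, AUTO_ACKNOWLEDGE));   // true
        // Two sessions: a separate thread is always used for each session.
        System.out.println(sessionThreadBypassed(false, 2, AUTO_ACKNOWLEDGE));   // false
        // CLIENT_ACKNOWLEDGE is not one of the supported modes.
        System.out.println(sessionThreadBypassed(false, 1, CLIENT_ACKNOWLEDGE)); // false
        // Default flag value (true) keeps the session thread.
        System.out.println(sessionThreadBypassed(true, 1, AUTO_ACKNOWLEDGE));    // false
    }
}
```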

alwaysSessionAsync option

To enable the consumer threading optimization, set the alwaysSessionAsync option to false on the ActiveMQConnectionFactory (default is true).
Note
The alwaysSessionAsync option is only supported by the JMS client API.

Example

The following example shows how to initialize a JMS connection and session on a consumer that exploits the threading optimization by switching off the alwaysSessionAsync flag:
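// Java
import javax.jms.Connection;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
...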
// Create the connection.
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
connectionFactory.setAlwaysSessionAsync(false);
Connection connection = connectionFactory.createConnection();
connection.start();

// Create the one-and-only session on this connection.
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);