2.10. Message Prefetch Behavior

Overview

Figure 2.3, “Consumer Prefetch Limit” illustrates the behavior of a broker as it waits to receive acknowledgments for the messages it has already sent to a consumer.

Figure 2.3. Consumer Prefetch Limit

If a consumer is slow to acknowledge messages, the broker may send it another message before the previous one is acknowledged. If the consumer remains slow, the number of unacknowledged messages keeps growing. The broker does not, however, continue to send messages indefinitely. When the number of unacknowledged messages reaches a set limit (the prefetch limit), the broker stops sending new messages to the consumer. No more messages are sent until the consumer starts sending back acknowledgments.
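The stall-and-resume behavior described above can be sketched as a small counter. This is a toy model for illustration only, not the broker's implementation; the class name PrefetchWindow and its methods are hypothetical.

```java
// Toy model only: it mimics the prefetch limit rule, not the real broker code.
class PrefetchWindow {
    private final int prefetchLimit; // maximum number of unacknowledged messages
    private int unacked = 0;         // messages sent but not yet acknowledged

    PrefetchWindow(int prefetchLimit) {
        if (prefetchLimit < 0) {
            throw new IllegalArgumentException("prefetchLimit must not be negative");
        }
        this.prefetchLimit = prefetchLimit;
    }

    /** Broker side: send one message if the prefetch limit allows it. */
    boolean tryDispatch() {
        if (unacked >= prefetchLimit) {
            return false; // limit reached: stop sending until an acknowledgment arrives
        }
        unacked++;
        return true;
    }

    /** Consumer side: acknowledge one previously delivered message. */
    void acknowledge() {
        if (unacked == 0) {
            throw new IllegalStateException("nothing to acknowledge");
        }
        unacked--;
    }

    int unacknowledged() {
        return unacked;
    }

    public static void main(String[] args) {
        PrefetchWindow window = new PrefetchWindow(3);
        int sent = 0;
        while (window.tryDispatch()) {
            sent++;
        }
        System.out.println("sent before stalling: " + sent);          // 3
        window.acknowledge();
        System.out.println("can send again: " + window.tryDispatch()); // true
    }
}
```

With a limit of 3, the model sends three messages, refuses the fourth, and resumes only after one acknowledgment comes back.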
Note
The broker relies on acknowledgement of delivery to determine if it can dispatch additional messages to a consumer's prefetch buffer. So, if a consumer's prefetch buffer is set to 1 and it is slow to acknowledge the processing of the message, it is possible that the broker will dispatch an additional message to the consumer and the pending message count will be 2.
Red Hat JBoss A-MQ provides various options for fine-tuning prefetch limits for specific circumstances. The prefetch limits can be specified for different types of consumers. You can also set the prefetch limit on a per-broker, per-connection, or per-destination basis.

Consumer specific prefetch limits

Different prefetch limits can be set for each consumer type. Table 2.2, “Prefetch Limit Defaults” lists the property name and default value for each consumer type's prefetch limit.

Table 2.2. Prefetch Limit Defaults

Consumer Type             Property               Default
Queue consumer            queuePrefetch          1000
Queue browser             queueBrowserPrefetch   500
Topic consumer            topicPrefetch          32766
Durable topic subscriber  durableTopicPrefetch   100

Setting prefetch limits per broker

You can define the prefetch limits for all consumers that attach to a particular broker by setting a destination policy on the broker. To set the destination policy, add a destinationPolicy element as a child of the broker element in the broker's configuration, as shown in Example 2.13, “Configuring a Destination Policy”.

Example 2.13. Configuring a Destination Policy

<broker ... >
  ...
  <destinationPolicy>
    <policyMap>
      <policyEntries>
        <policyEntry queue="queue.>" queuePrefetch="1"/>
        <policyEntry topic="topic.>" topicPrefetch="1000"/>
      </policyEntries>
    </policyMap>
  </destinationPolicy>
  ...
</broker>
In Example 2.13, “Configuring a Destination Policy”, the > character is a wildcard symbol that matches one or more name segments. The queue prefetch limit is therefore set to 1 for all queues whose names start with queue., and the topic prefetch limit is set to 1000 for all topics whose names start with topic.
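To make the effect of the > wildcard concrete, the following is a simplified sketch that follows only the description given here. It is not the broker's matching code: the class and method names are hypothetical, other wildcards are not handled, and the real broker may treat edge cases (such as a name equal to the bare prefix) differently.

```java
// Simplified sketch of the trailing '>' wildcard as described in the text.
// Hypothetical helper, not part of any broker or client API.
class PolicyWildcard {
    /**
     * Returns true if the destination name is covered by the pattern.
     * Supported patterns: ">" (everything), "prefix.>" (names under prefix),
     * or a literal name (exact match).
     */
    static boolean matches(String pattern, String name) {
        if (pattern.equals(">")) {
            return !name.isEmpty(); // a bare '>' covers every destination
        }
        if (!pattern.endsWith(".>")) {
            return pattern.equals(name); // no wildcard: exact match only
        }
        // Drop the '>' but keep the trailing '.', e.g. "queue.>" becomes "queue."
        String prefix = pattern.substring(0, pattern.length() - 1);
        // At least one more name segment must follow the prefix.
        return name.startsWith(prefix) && name.length() > prefix.length();
    }

    public static void main(String[] args) {
        System.out.println(matches("queue.>", "queue.orders"));    // true
        System.out.println(matches("queue.>", "queue.orders.eu")); // true
        System.out.println(matches("queue.>", "topic.prices"));    // false
    }
}
```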

Setting prefetch limits per connection

In a consumer, you can specify the prefetch limits on a connection by setting properties on the ActiveMQConnectionFactory instance. Example 2.14, “Setting Prefetch Limit Properties Per Connection” shows how to specify the prefetch limits for all consumer types on a connection factory.

Example 2.14. Setting Prefetch Limit Properties Per Connection

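import java.util.Properties;
import org.apache.activemq.ActiveMQConnectionFactory;

ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();

// One property per consumer type; the values shown here are the defaults.
Properties props = new Properties();
props.setProperty("prefetchPolicy.queuePrefetch", "1000");
props.setProperty("prefetchPolicy.queueBrowserPrefetch", "500");
props.setProperty("prefetchPolicy.durableTopicPrefetch", "100");
props.setProperty("prefetchPolicy.topicPrefetch", "32766");

// Apply the prefetch settings to every connection created by this factory.
factory.setProperties(props);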
Note
You can also set the prefetch limits using the consumer properties as part of the broker URI used when creating the connection factory.
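As a rough illustration of the URI form, the sketch below assembles a broker URI that carries prefetch options. It assumes the jms. option prefix that ActiveMQ clients use for connection options in the broker URI; the host, port, and the helper class PrefetchUri are hypothetical, and the helper expects a base URI without an existing query string.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper that builds a broker URI with prefetch options.
class PrefetchUri {
    /** Appends one jms.prefetchPolicy.* option per entry to a base URI with no query part. */
    static String withPrefetch(String baseUri, Map<String, Integer> limits) {
        if (limits.isEmpty()) {
            return baseUri; // nothing to append
        }
        StringJoiner query = new StringJoiner("&");
        for (Map.Entry<String, Integer> limit : limits.entrySet()) {
            query.add("jms.prefetchPolicy." + limit.getKey() + "=" + limit.getValue());
        }
        return baseUri + "?" + query;
    }

    public static void main(String[] args) {
        Map<String, Integer> limits = new LinkedHashMap<>();
        limits.put("queuePrefetch", 1);
        limits.put("topicPrefetch", 1000);

        // Assumed broker address, for illustration only.
        String uri = withPrefetch("tcp://localhost:61616", limits);
        System.out.println(uri);
        // The resulting string would then be passed to the factory, for example:
        // new ActiveMQConnectionFactory(uri);
    }
}
```

The property names after the prefix are the same ones listed in Table 2.2.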

Setting prefetch limits per destination

At the finest level of granularity, you can specify the prefetch limit on each destination instance that you create in a consumer. Example 2.15, “Setting the Prefetch Limit on a Destination” shows code that creates the queue TEST.QUEUE with a prefetch limit of 10. The limit is set as a destination option in the URI used to create the queue.

Example 2.15. Setting the Prefetch Limit on a Destination

Queue queue = new ActiveMQQueue("TEST.QUEUE?consumer.prefetchSize=10");

MessageConsumer consumer = session.createConsumer(queue);

Disabling the prefetch extension logic

The default behavior of a broker is to use delivery acknowledgements to determine the state of a consumer's prefetch buffer. For example, if a consumer's prefetch limit is configured as 1, the broker dispatches one message to the consumer and, when the consumer acknowledges receiving that message, dispatches a second message. If the first message takes a long time to process, the second message sits in the prefetch buffer and cannot be processed by a faster consumer.
This behavior can also cause issues when using the JCA resource adapter and transacted clients.
If the behavior is causing issues, it can be changed such that the broker will wait for the consumer to acknowledge that the message is processed before refilling the prefetch buffer. This is accomplished by setting a destination policy on the broker to disable the prefetch extension for specific destinations.
Example 2.16, “Disabling the Prefetch Extension” shows configuration for disabling the prefetch extension on all of a broker's queues.

Example 2.16. Disabling the Prefetch Extension

<broker ... >
  ...
  <destinationPolicy>
    <policyMap>
      <policyEntries>
        <policyEntry queue=">" usePrefetchExtension="false"/>
      </policyEntries>
    </policyMap>
  </destinationPolicy>
  ...
</broker>
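The difference between the two behaviors described in this section can be sketched as a toy model. It follows only the description given here and is not the broker's actual bookkeeping; the class PrefetchExtensionModel and its method names are hypothetical.

```java
// Toy model of the prefetch extension, based only on the behavior described in the text.
class PrefetchExtensionModel {
    private final int prefetchLimit;
    private final boolean usePrefetchExtension;
    private int pending = 0;   // dispatched but not yet acknowledged as processed
    private int extension = 0; // extra slots granted by delivery acknowledgements

    PrefetchExtensionModel(int prefetchLimit, boolean usePrefetchExtension) {
        this.prefetchLimit = prefetchLimit;
        this.usePrefetchExtension = usePrefetchExtension;
    }

    /** Broker side: dispatch one message if the (possibly extended) limit allows it. */
    boolean tryDispatch() {
        if (pending >= prefetchLimit + extension) {
            return false;
        }
        pending++;
        return true;
    }

    /** Consumer reports that it received a message but has not finished processing it. */
    void acknowledgeDelivery() {
        if (usePrefetchExtension) {
            extension++; // default behavior: delivery alone frees a slot
        }
    }

    /** Consumer reports that it has finished processing a message. */
    void acknowledgeProcessed() {
        if (pending == 0) {
            throw new IllegalStateException("no pending message");
        }
        pending--;
        if (extension > 0) {
            extension--; // the extra slot is no longer needed
        }
    }

    int pending() {
        return pending;
    }

    public static void main(String[] args) {
        PrefetchExtensionModel enabled = new PrefetchExtensionModel(1, true);
        enabled.tryDispatch();
        enabled.acknowledgeDelivery();
        enabled.tryDispatch();
        System.out.println("extension enabled, pending: " + enabled.pending());   // 2

        PrefetchExtensionModel disabled = new PrefetchExtensionModel(1, false);
        disabled.tryDispatch();
        disabled.acknowledgeDelivery();
        disabled.tryDispatch(); // refused: the first message is still being processed
        System.out.println("extension disabled, pending: " + disabled.pending()); // 1
    }
}
```

With the extension enabled and a limit of 1, the model reaches a pending count of 2. With it disabled, the second message is held back until the first is acknowledged as processed, so it stays available to other consumers.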