Optimize Red Hat JBoss A-MQ for your environment
Copyright © 2011-2015 Red Hat, Inc. and/or its affiliates.
Chapter 1. Introduction to Performance Tuning
- The speed at which messages are written to and read from disk (persistent brokers only).
- The speed at which messages can be marshalled and sent over the network.
- Context switching, due to multi-threading.
Non-persistent and persistent brokers
Chapter 2. General Tuning Techniques
2.1. System Environment
Memory available to the JVM
Use the -Xmx option to set the maximum amount of memory available to the JVM. For example, to increase JVM memory to 2048 MB, add -Xmx2048m (or, equivalently, -Xmx2G) as a JVM option.
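To confirm which heap limit a JVM actually applied, you can query Runtime.maxMemory() from inside that JVM. The following is a minimal sketch; the class name HeapReport is hypothetical. Depending on the garbage collector, the reported value may be slightly lower than the -Xmx setting.

```java
// Minimal sketch: report the maximum heap size that the running JVM will use.
// HeapReport is a hypothetical helper, not part of JBoss A-MQ.
final class HeapReport {
    private HeapReport() {}

    /** Returns the maximum heap size in whole megabytes, or -1 if the JVM reports no limit. */
    static long maxHeapMegabytes() {
        long maxBytes = Runtime.getRuntime().maxMemory();
        // maxMemory() returns Long.MAX_VALUE when there is no inherent limit.
        if (maxBytes == Long.MAX_VALUE) {
            return -1L;
        }
        return maxBytes / (1024L * 1024L);
    }
}
```

Running a program that prints HeapReport.maxHeapMegabytes() with -Xmx2G should report a value close to 2048.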
2.2. Co-locating the Broker
Figure 2.1. Broker Co-located with Producer
The vm:// transport
You can connect to a vm:// endpoint from a producer or a consumer in just the same way as you connect to a tcp:// endpoint (or any other protocol supported by Red Hat JBoss A-MQ). But the effect of connecting to a vm:// endpoint is quite different from connecting to a tcp:// endpoint: whereas a tcp:// endpoint initiates a connection to a remote broker instance, the vm:// endpoint actually creates a local, embedded broker instance. The embedded broker runs inside the same JVM as the client and messages are sent to the broker through an internal channel, bypassing the network.
To configure the embedded broker, set the brokerConfig option. For example, to create a myBroker instance that takes its configuration from the activemq.xml configuration file, define the following VM endpoint:
vm://myBroker?brokerConfig=xbean:activemq.xml
A simple optimization
When a client connects through the vm:// transport, and the broker option, optimizedDispatch, and the consumer option, dispatchAsync, are also configured to disable asynchronous behaviour, the calling thread can actually dispatch messages directly to consumers.
2.3. Optimizing the Protocols
- Socket buffer size: the default TCP socket buffer size is 64 KB. While this was adequate for the network speeds in use when TCP was originally designed, this buffer size is sub-optimal for modern high-speed networks. The following rule of thumb can be used to estimate the optimal TCP socket buffer size:
  Buffer Size = Bandwidth x Round-Trip-Time
  Where the Round-Trip-Time is the time between initially sending a TCP packet and receiving an acknowledgement of that packet (ping time). Typically, it is a good idea to try doubling the socket buffer size to 128 KB. For example:
  tcp://hostA:61617?socketBufferSize=131072
  For more details, see the Wikipedia article on Network Improvement.
- I/O buffer size: the I/O buffer is used to buffer the data flowing between the TCP layer and the protocol that is layered above it (such as OpenWire). The default I/O buffer size is 8 KB and you could try doubling this size to achieve better performance. For example:
  tcp://hostA:61617?ioBufferSize=16384
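The bandwidth-delay rule of thumb for the socket buffer size can be worked through in a short Java sketch. The helper names are hypothetical, the 1 Gbit/s bandwidth and 1 ms ping time in the usage are illustrative assumptions, and rounding up to a power of two is a convenience choice rather than a requirement.

```java
// Sketch of "Buffer Size = Bandwidth x Round-Trip-Time" for sizing socketBufferSize.
final class SocketBufferSizing {
    private SocketBufferSizing() {}

    /**
     * Bandwidth-delay product in bytes.
     * @param bitsPerSecond link bandwidth in bits per second
     * @param rttMicros     round-trip (ping) time in microseconds
     */
    static long bandwidthDelayBytes(long bitsPerSecond, long rttMicros) {
        // bits/s -> bytes/s (divide by 8), then multiply by the RTT expressed in seconds.
        return bitsPerSecond / 8 * rttMicros / 1_000_000L;
    }

    /** Rounds n up to the next power of two (e.g. 125000 -> 131072). */
    static long roundUpToPowerOfTwo(long n) {
        if (n <= 1) {
            return 1L;
        }
        return Long.highestOneBit(n - 1) << 1;
    }

    /** Builds a TCP transport URI that sets the socketBufferSize option. */
    static String tcpUri(String host, int port, long socketBufferSize) {
        return "tcp://" + host + ":" + port + "?socketBufferSize=" + socketBufferSize;
    }
}
```

For 1 Gbit/s and a 1 ms round trip, the product is 125,000 bytes, which rounds up to 131072, the same value used in the socket buffer example above.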
Table 2.1. OpenWire Parameters Affecting Performance
|Parameter|Default|Description|
|cacheEnabled|true|Specifies whether to cache commonly repeated values, in order to optimize marshaling.|
|cacheSize|1024|The number of values to cache. Increase this value to improve performance of marshaling.|
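To set these parameters on a transport URI, add the prefix, wireFormat., to the parameter name. For example, to double the size of the OpenWire cache, you can specify the cache size on a URI as follows:
tcp://hostA:61617?wireFormat.cacheSize=2048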
You can compress message bodies by setting the useCompression option on the ActiveMQConnectionFactory class. For example, to initialize a JMS connection with compression enabled in a Java client, insert the following code:
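// Java
import javax.jms.Connection;
import org.apache.activemq.ActiveMQConnectionFactory;
...
// Create the connection.
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
// Compress message bodies before they are sent over the network.
connectionFactory.setUseCompression(true);
Connection connection = connectionFactory.createConnection();
connection.start();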
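Alternatively, you can enable compression by setting the jms.useCompression option on a producer URI, for example:
tcp://hostA:61617?jms.useCompression=true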
2.4. Message Encoding
Message body type
The choice of JMS message body type affects encoding speed: BytesMessage (a stream of uninterpreted bytes) is the fastest, while ObjectMessage (serialization of a Java object) is the slowest.
Hence, use BytesMessage whenever possible. To encode structured data into the message body, we suggest that you use Google's Protobuf, which has excellent performance characteristics.
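One reason ObjectMessage is slower is that Java serialization writes class metadata along with the field values. The following Java sketch encodes the same three fields both ways, using the DataOutput primitives (writeLong, writeDouble, writeUTF) that BytesMessage also offers. The PriceTick payload and helper names are hypothetical, and the sketch compares encoded sizes only; it is not a performance benchmark.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical payload used only for this size comparison.
final class PriceTick implements Serializable {
    private static final long serialVersionUID = 1L;
    final long timestamp;
    final double price;
    final String symbol;

    PriceTick(long timestamp, double price, String symbol) {
        this.timestamp = timestamp;
        this.price = price;
        this.symbol = symbol;
    }
}

final class EncodingComparison {
    private EncodingComparison() {}

    /** Field-by-field encoding, as you would write it into a BytesMessage. */
    static byte[] encodeCompact(PriceTick tick) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(tick.timestamp); // 8 bytes
            out.writeDouble(tick.price);   // 8 bytes
            out.writeUTF(tick.symbol);     // 2-byte length prefix + modified UTF-8 bytes
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Java serialization, which is what an ObjectMessage body relies on. */
    static byte[] encodeSerialized(PriceTick tick) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(tick); // writes a class descriptor as well as the field values
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }
}
```

For new PriceTick(1L, 2.5, "ABC"), the compact form is 21 bytes (8 + 8 + 2 + 3), while the serialized form is considerably larger. A schema-based encoder such as Protobuf replaces the hand-written field layout while keeping a compact binary body.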
2.5. Threading Optimizations
To reduce thread usage in the broker, set the optimizedDispatch option to true on all queue destinations. When this option is enabled, the broker no longer uses a dedicated thread to dispatch messages to each destination.
For example, to enable the optimizedDispatch option on all queue destinations, insert the following policy entry into the broker configuration:
<broker ... >
  <destinationPolicy>
    <policyMap>
      <policyEntries>
        <policyEntry queue=">" optimizedDispatch="true" />
      </policyEntries>
    </policyMap>
  </destinationPolicy>
  ...
</broker>
Where the value of the queue attribute, >, is a wildcard that matches all queue names.
2.6. Vertical Scaling
Tricks to optimize vertical scaling
- NIO transport on the broker: to reduce the number of threads required, use the NIO transport (instead of the TCP transport) when defining transport connectors in the broker. Do not use the NIO transport in clients; it is meant to be used only in the broker.
- Allocate more memory to the broker: to increase the amount of memory available to the broker, pass the -Xmx option to the JVM.
- Reduce initial thread stack size: to allocate a smaller initial stack size for threads, pass the -Xss option to the JVM.
2.7. Horizontal Scaling
Figure 2.2. Scaling with Multiple Brokers
Static scales better than dynamic
Asynchronous network connection establishment
To start network connectors asynchronously, set the networkConnectorStartAsync attribute on the broker element to true, as follows:
<beans ...>
  <broker ... networkConnectorStartAsync="true">...</broker>
</beans>
Client-side traffic partitioning
- You can use all the tuning techniques for vertical scaling.
- You can achieve better horizontal scalability than a network of brokers (because there is less broker crosstalk).
2.8. Integration with Spring and Camel
Spring provides the JmsTemplate class, which allows you to hide some of the lower-level JMS details when sending messages and so on. One thing to bear in mind about JmsTemplate, however, is that it creates a new connection, session, and producer for every message it sends, which is very inefficient. It is implemented like this in order to work inside an EJB container, which typically provides a special JMS connection factory that supports connection pooling.
Outside an EJB container, use the org.apache.activemq.pool.PooledConnectionFactory class, from the activemq-pool artifact, which pools JMS resources to work efficiently with Spring's JmsTemplate or with EJBs.
Creating a pooled connection factory
The PooledConnectionFactory is implemented as a wrapper class that is meant to be chained with another connection factory instance. For example, you could use a PooledConnectionFactory instance to wrap a plain Red Hat JBoss A-MQ connection factory, or to wrap an ActiveMQSslConnectionFactory, and so on.
For example, to define a pooled connection factory, jmsFactory, that works efficiently with the Spring JmsTemplate instance, myJmsTemplate, define the following bean instances in your Spring configuration file:
<!-- A pooling-based JMS provider -->
<bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
  <property name="connectionFactory">
    <bean class="org.apache.activemq.ActiveMQConnectionFactory">
      <property name="brokerURL">
        <value>tcp://localhost:61616</value>
      </property>
    </bean>
  </property>
</bean>

<!-- Spring JMS Template -->
<bean id="myJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
  <property name="connectionFactory">
    <!-- ref bean works in all Spring versions; ref local is not supported from Spring 4 -->
    <ref bean="jmsFactory"/>
  </property>
</bean>
The pooled connection factory is chained with a plain ActiveMQConnectionFactory instance that opens connections to the tcp://localhost:61616 broker location.
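The wrapper-and-reuse idea behind a pooled connection factory can be illustrated without any JMS classes. The following Java sketch is a hypothetical SimplePool, not the PooledConnectionFactory API: it wraps a delegate factory (a Supplier) and reuses released instances instead of creating a new one for every use.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Supplier;

// Hypothetical pool that wraps ("chains") an underlying factory. Not the real API.
final class SimplePool<T> {
    private final Supplier<T> delegate;        // the wrapped, non-pooling factory
    private final Deque<T> idle = new ArrayDeque<>();
    private int created;                       // number of calls made to the delegate

    SimplePool(Supplier<T> delegate) {
        this.delegate = delegate;
    }

    /** Hands out an idle instance if one exists; otherwise creates one via the delegate. */
    synchronized T borrow() {
        T pooled = idle.pollFirst();
        if (pooled != null) {
            return pooled;
        }
        created++;
        return delegate.get();
    }

    /** Returns an instance to the pool instead of closing it. */
    synchronized void release(T resource) {
        idle.addFirst(resource);
    }

    synchronized int createdCount() {
        return created;
    }
}
```

In this model, 100 borrow/release cycles call the delegate only once; without the pool, each cycle would create a new instance, which mirrors the per-message connection creation described for JmsTemplate.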
2.9. Optimizing Memory Usage in the Broker
Optimize message paging
Using the attributes of the policyEntry element, you can tune message paging to match the amount of memory available in the broker. For example, if there is a very large queue and lots of destination memory, increasing the maxBrowsePageSize attribute would allow more of those messages to be visible when browsing a queue.
Destination policies to control paging
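- maxPageSize: the maximum number of messages paged into memory for sending to a destination.
- maxBrowsePageSize: the maximum number of messages paged into memory for browsing a queue. Note: the number of messages paged in for browsing cannot exceed the destination's memoryLimit setting.
- maxExpirePageSize: the maximum number of messages paged into memory to check for expired messages.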
2.10. Optimizing JMX Management
Selective MBean registration
<managementContext>
  <managementContext suppressMBean="endpoint=dynamicProducer,endpoint=Consumer,connectionName=*,destinationName=ActiveMQ.Advisory.*" />
</managementContext>
Chapter 3. Consumer Performance
3.1. Acknowledgement Modes
Supported acknowledgement modes
- Session.AUTO_ACKNOWLEDGE (default): in this mode, the JMS session automatically acknowledges messages as soon as they are received. In particular, the JMS session acknowledges messages before dispatching them to the application layer. For example, if the consumer application calls MessageConsumer.receive(), the message has already been acknowledged before the call returns.
- Session.CLIENT_ACKNOWLEDGE: in this mode, the client application code explicitly calls the Message.acknowledge() method to acknowledge the message. In JBoss A-MQ, this acknowledges not just the message on which it is invoked, but also any other messages in the consumer that have already been completely processed.
- Session.DUPS_OK_ACKNOWLEDGE: in this mode, the JMS session automatically acknowledges messages, but does so in a lazy manner. If the JMS connection fails while this mode is used, some messages that were completely processed could remain unacknowledged. When the connection is restored, these messages are re-sent (duplicate messages). This is one of the fastest acknowledgement modes, but the consumer must be able to cope with possible duplicate messages (for example, by detecting and discarding duplicates).
- Transacted sessions: when using transactions, the session implicitly works in SESSION_TRANSACTED mode. The response to the transaction commit is then equivalent to message acknowledgement. When JMS transactions are used to group multiple messages, transaction mode is very efficient. But avoid using a transaction to send a single message, because this incurs the extra overhead of committing or rolling back the transaction.
- ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE: this non-standard mode is similar to CLIENT_ACKNOWLEDGE, except that it acknowledges only the message on which it is invoked. It does not flush acknowledgements for any other completed messages.
The optimizeAcknowledge option is exposed on the ActiveMQConnectionFactory class and must be used in conjunction with the Session.AUTO_ACKNOWLEDGE mode. When set to true, the consumer acknowledges receipt of messages in batches, where the batch size is set to 65% of the prefetch limit. Alternatively, if message consumption is slow, the batch acknowledgement is sent after 300 ms. Default is false.
The optimizeAcknowledge option is only supported by the JMS client API.
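The batching rule for optimizeAcknowledge can be expressed as a small decision function. The following Java sketch is a simplified model (AckBatchModel is a hypothetical name), not the ActiveMQ client's internal code: it acknowledges once the unacknowledged count reaches 65% of the prefetch limit, or once 300 ms have passed with messages still unacknowledged.

```java
// Simplified model of the optimizeAcknowledge batching rule; illustrative only.
final class AckBatchModel {
    static final int BATCH_PERCENT = 65;          // batch size: 65% of the prefetch limit
    static final long ACK_TIMEOUT_MILLIS = 300L;  // slow consumption: acknowledge after 300 ms

    private AckBatchModel() {}

    /** Smallest unacknowledged count that reaches 65% of the prefetch limit. */
    static int batchThreshold(int prefetchLimit) {
        // Integer ceiling of prefetchLimit * 65 / 100, avoiding floating-point rounding.
        return (prefetchLimit * BATCH_PERCENT + 99) / 100;
    }

    /** True when the consumer should send one batched acknowledgement now. */
    static boolean shouldAcknowledge(int unacked, int prefetchLimit, long millisSinceLastAck) {
        boolean batchFull = unacked > 0 && unacked >= batchThreshold(prefetchLimit);
        boolean timedOut = unacked > 0 && millisSinceLastAck >= ACK_TIMEOUT_MILLIS;
        return batchFull || timedOut;
    }
}
```

With the default queue prefetch limit of 1000, a consumer in this model acknowledges every 650 messages, or sooner if 300 ms pass first.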
Choosing the acknowledgement mode
If your consumers can tolerate duplicates, consider the DUPS_OK_ACKNOWLEDGE mode, one of the fastest acknowledgement modes, which requires you to implement duplicate detection code in your consumer.
3.2. Reducing Context Switching
Optimize message dispatching on the broker side
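To disable asynchronous dispatch, set the consumer.dispatchAsync option to false on the URI used by the consumer. For example, to disable asynchronous dispatch to the TEST.QUEUE queue, use the following URI on the consumer side:
TEST.QUEUE?consumer.dispatchAsync=false
Alternatively, set the dispatchAsync property to false on the ActiveMQ connection factory, for example: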
// Java
((ActiveMQConnectionFactory)connectionFactory).setDispatchAsync(false);
Optimize message reception on the consumer side
On the consumer side, there are two layers of threads responsible for receiving incoming messages: the Session threads and the MessageConsumer threads. In the special case where only one session is associated with a connection, the two layers are redundant and it is possible to optimize the threading model by eliminating the thread associated with the session layer. This section explains how to enable this consumer threading optimization.
Default consumer threading model
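In the default model, the first thread layer pulls messages from the transport layer and inserts them into a queue inside the javax.jms.Session instance. The second thread layer consists of a pool of threads, where each thread is associated with a javax.jms.MessageConsumer instance. Each thread in this layer picks the relevant messages out of the session queue, inserting each message into a queue inside the javax.jms.MessageConsumer instance.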
Figure 3.1. Default Consumer Threading Model
Optimized consumer threading model
In the optimized model, the session thread layer is eliminated, and the MessageConsumer threads can then pull messages directly from the transport layer.
Figure 3.2. Optimized Consumer Threading Model
- There must be only one JMS session on the connection. If there is more than one session, a separate thread is always used for each session, irrespective of the value of the alwaysSessionAsync option.
- One of the following acknowledgement modes must be selected:
The optimizeAcknowledge option is only supported by the JMS client API.
// Java
import javax.jms.Connection;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
...
// Create the connection.
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
// Let MessageConsumer threads pull messages directly from the transport layer.
connectionFactory.setAlwaysSessionAsync(false);
Connection connection = connectionFactory.createConnection();
connection.start();
// Create the one-and-only session on this connection.
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
3.3. Prefetch Limit
- Queue consumer
- Default prefetch limit is 1000. If you are using a collection of consumers to distribute the workload (many consumers processing messages from the same queue), you typically want this limit to be small. If one consumer is allowed to accumulate a large number of unacknowledged messages, it could starve the other consumers of messages. Also, if the consumer fails, there would be a large number of messages unavailable for processing until the failed consumer is restored.
- Queue browser
- Default prefetch limit is 500.
- Topic consumer
- Default prefetch limit is 32766. This default is also the maximum possible value of the prefetch limit, one less than the largest value of a Java short (32767).
- Durable topic subscriber
- Default prefetch limit is 100. You can typically improve the efficiency of a consumer by increasing this prefetch limit.
Optimizing prefetch limits
- Queue consumers—if you have just a single consumer attached to a queue, you can leave the prefetch limit at a fairly large value. But if you are using a group of consumers to distribute the workload, it is usually better to restrict the prefetch limit to a very small number—for example, 0 or 1.
- Durable topic subscribers—the efficiency of topic subscribers is generally improved by increasing the prefetch limit. Try increasing the limit to 1000.
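The default limits listed above can be paired with the connection URI properties that override them. The following Java sketch assumes the ActiveMQ client's jms.prefetchPolicy.* URI properties (queuePrefetch, queueBrowserPrefetch, topicPrefetch, durableTopicPrefetch); the PrefetchSetting enum itself is hypothetical.

```java
// Default prefetch limits from the list above, paired with the (assumed)
// jms.prefetchPolicy.* URI property that overrides each one.
enum PrefetchSetting {
    QUEUE("queuePrefetch", 1000),
    QUEUE_BROWSER("queueBrowserPrefetch", 500),
    TOPIC("topicPrefetch", 32766),
    DURABLE_TOPIC("durableTopicPrefetch", 100);

    /** Maximum prefetch limit stated in the text. */
    static final int MAX_PREFETCH = 32766;

    final String property;
    final int defaultLimit;

    PrefetchSetting(String property, int defaultLimit) {
        this.property = property;
        this.defaultLimit = defaultLimit;
    }

    /** Builds a URI query fragment such as "jms.prefetchPolicy.queuePrefetch=1". */
    String uriOption(int limit) {
        if (limit < 0 || limit > MAX_PREFETCH) {
            throw new IllegalArgumentException("prefetch limit out of range: " + limit);
        }
        return "jms.prefetchPolicy." + property + "=" + limit;
    }
}
```

For example, a group of competing queue consumers could connect with "tcp://localhost:61616?" + PrefetchSetting.QUEUE.uriOption(1), while a durable topic subscriber could use PrefetchSetting.DURABLE_TOPIC.uriOption(1000).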
Chapter 4. Producer Performance
4.1. Async Sends
Configuring on a transport URI
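To enable async sends, set the jms.useAsyncSend option to true on the transport URI that you use to connect to the broker. For example:
tcp://localhost:61616?jms.useAsyncSend=true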
Configuring on a connection factory
To enable async sends at the level of a connection factory, set the useAsyncSend property to true directly on the ActiveMQConnectionFactory instance. For example:
// Java
((ActiveMQConnectionFactory)connectionFactory).setUseAsyncSend(true);
Configuring on a connection
To enable async sends at the level of an individual connection, set the useAsyncSend property to true directly on the ActiveMQConnection instance. For example:
// Java
((ActiveMQConnection)connection).setUseAsyncSend(true);
4.2. Flow Control
Flow control enabled
Figure 4.1. Broker with Flow Control Enabled
Flow control disabled
Figure 4.2. Broker with Flow Control Disabled
For example, to discard old messages for slow consumers on topics that match the PRICES.> pattern (that is, topic names prefixed by PRICES.), configure the broker as follows:
<beans ... >
  <broker ...>
    <!-- define the dispatch policy -->
    <destinationPolicy>
      <policyMap>
        <policyEntries>
          <policyEntry topic="PRICES.>">
            <!-- force old messages to be discarded for slow consumers -->
            <pendingMessageLimitStrategy>
              <constantPendingMessageLimitStrategy limit="10"/>
            </pendingMessageLimitStrategy>
          </policyEntry>
          ...
        </policyEntries>
      </policyMap>
    </destinationPolicy>
  </broker>
</beans>
How to turn off flow control
For example, to disable flow control for all topics whose names start with FOO., insert a policy entry like the following into the broker's configuration:
<broker ... >
  ...
  <destinationPolicy>
    <policyMap>
      <policyEntries>
        <policyEntry topic="FOO.>" producerFlowControl="false"/>
        ...
      </policyEntries>
    </policyMap>
  </destinationPolicy>
  ...
</broker>
Defining the memory limits
- Per-broker: to set global memory limits on a broker, define a systemUsage element as a child of the broker element, as follows:
  <broker>
    ...
    <systemUsage>
      <systemUsage>
        <memoryUsage>
          <memoryUsage limit="64 mb" />
        </memoryUsage>
        <storeUsage>
          <storeUsage limit="100 gb" />
        </storeUsage>
        <tempUsage>
          <tempUsage limit="10 gb" />
        </tempUsage>
      </systemUsage>
    </systemUsage>
    ...
  </broker>
  Where the preceding sample specifies three distinct limits, as follows:
  - memoryUsage: specifies the maximum amount of memory allocated to the broker.
  - storeUsage: for persistent messages, specifies the maximum disk storage for the messages.
    Note: In certain scenarios, the actual disk storage used by JBoss A-MQ can exceed the specified limit. For this reason, it is recommended that you set storeUsage to about 70% of the intended maximum disk storage.
  - tempUsage: for non-persistent (temporary) messages, specifies the maximum amount of temporary storage.
  The values shown in the preceding example are the defaults.
- Per-destination: to set a memory limit on a destination, set the memoryLimit attribute on the policyEntry element. The value of memoryLimit can be a string, such as 512 KB. For example, to limit the amount of memory on the FOO.BAR queue to 10 MB, define a policy entry like the following:
<policyEntry queue="FOO.BAR" memoryLimit="10 MB"/>
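The guideline of setting storeUsage to about 70% of the intended maximum disk storage is simple arithmetic, but it is easy to get wrong when limits are written as strings. The following Java sketch uses a hypothetical parseSize() helper that accepts strings such as "512 KB", "64 mb" or "100 gb" and assumes binary units (1 KB = 1024 bytes); it is not the broker's own parser.

```java
import java.util.Locale;

// Sketch of the "storeUsage at about 70% of intended disk storage" guideline.
// parseSize() is a hypothetical helper; binary units are an assumption.
final class StoreUsagePlanner {
    private StoreUsagePlanner() {}

    static long parseSize(String text) {
        String s = text.trim().toLowerCase(Locale.ROOT).replace(" ", "");
        long multiplier = 1L;
        if (s.endsWith("gb")) {
            multiplier = 1L << 30;
            s = s.substring(0, s.length() - 2);
        } else if (s.endsWith("mb")) {
            multiplier = 1L << 20;
            s = s.substring(0, s.length() - 2);
        } else if (s.endsWith("kb")) {
            multiplier = 1L << 10;
            s = s.substring(0, s.length() - 2);
        } else if (s.endsWith("b")) {
            s = s.substring(0, s.length() - 1);
        }
        return Long.parseLong(s) * multiplier;
    }

    /** storeUsage limit in bytes, leaving about 30% headroom below the intended maximum. */
    static long recommendedStoreUsage(String intendedMaxDisk) {
        return parseSize(intendedMaxDisk) * 70 / 100;
    }
}
```

For an intended maximum of 100 gb, this gives 75,161,927,680 bytes, which you would express in the broker configuration as about 70 gb.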
Making a producer aware of flow control
By default, flow control causes the producer's send() operation to block until enough memory is freed up in the broker for the producer to resume sending messages. If you want the producer to be made aware of the fact that the send() operation is blocked due to flow control, you can enable either of the following attributes on the systemUsage element:
- sendFailIfNoSpace: if true, the broker immediately returns an error when flow control is preventing producer send() operations; otherwise, the broker reverts to the default (blocking) behaviour.
- sendFailIfNoSpaceAfterTimeout: specifies a timeout in units of milliseconds. When flow control is preventing producer send() operations, the broker returns an error after the specified timeout has elapsed.
<broker>
  ...
  <systemUsage>
    <systemUsage sendFailIfNoSpace="true">
      <memoryUsage>
        <memoryUsage limit="64 mb" />
      </memoryUsage>
      ...
    </systemUsage>
  </systemUsage>
  ...
</broker>
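The three producer behaviours (block, fail at once, fail after a timeout) map closely onto the three ways of inserting into a bounded Java queue. The following sketch is an analogy only, not broker code: a hypothetical FlowControlModel uses an ArrayBlockingQueue to stand in for the broker's memory limit, and the timeout variant is assumed to correspond to a send timeout such as sendFailIfNoSpaceAfterTimeout.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Analogy only: a bounded queue stands in for the broker's memory limit.
final class FlowControlModel {
    private final BlockingQueue<String> memory;

    FlowControlModel(int capacity) {
        this.memory = new ArrayBlockingQueue<>(capacity);
    }

    /** Default behaviour: block until a consumer frees space. */
    void sendBlocking(String message) {
        try {
            memory.put(message);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
    }

    /** Like sendFailIfNoSpace="true": fail immediately when there is no space. */
    boolean sendFailIfNoSpace(String message) {
        return memory.offer(message);
    }

    /** Like a send timeout: wait up to timeoutMillis for space, then fail. */
    boolean sendFailAfterTimeout(String message, long timeoutMillis) {
        try {
            return memory.offer(message, timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** A consumer taking one message frees space for producers. */
    String consume() {
        return memory.poll();
    }
}
```

In a real client, the failure surfaces as an exception from send() rather than a boolean, so the producer can decide whether to retry, slow down, or alert.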
Chapter 5. Managing Slow Consumers
- Limiting the number of messages retained for a consumer: when using non-durable topics, you can specify the number of messages that a destination will hold for a consumer. Once the limit is reached, older messages are discarded when new messages arrive.
- Aborting slow consumers: JBoss A-MQ determines slowness by monitoring how often a consumer's dispatch buffer is full. You can specify that consistently slow consumers be aborted by closing their connections to the broker.
Limiting message retention
You can configure a pending message limit strategy (pendingMessageLimitStrategy) on a topic to control the number of messages that are held for slow consumers. When set, the topic will retain the specified number of messages in addition to the consumer's prefetch limit.
The default limit is -1, which means that the topic will retain all of the unconsumed messages for a consumer. You can specify the limit in either of the following ways:
- Specifying a constant number of messages over the prefetch limit: the constantPendingMessageLimitStrategy implementation allows you to specify a constant number of messages to retain, as shown in Example 5.1, “Constant Pending Message Limiter”.
Example 5.1. Constant Pending Message Limiter
<broker ... >
  <destinationPolicy>
    <policyMap>
      <policyEntries>
        <policyEntry topic=">" >
          <pendingMessageLimitStrategy>
            <constantPendingMessageLimitStrategy limit="50"/>
          </pendingMessageLimitStrategy>
        </policyEntry>
      </policyEntries>
    </policyMap>
  </destinationPolicy>
  ...
</broker>
- Specifying a multiplier that is applied to the prefetch limit: the prefetchRatePendingMessageLimitStrategy implementation allows you to specify a multiplier that is applied to the prefetch limit. Example 5.2, “Prefetch Limit Based Pending Message Limiter” shows a configuration that retains twice the prefetch limit. So if the prefetch limit is 3, the destination will retain 6 pending messages for each consumer.
Example 5.2. Prefetch Limit Based Pending Message Limiter
<broker ... >
  <destinationPolicy>
    <policyMap>
      <policyEntries>
        <policyEntry topic=">" >
          <pendingMessageLimitStrategy>
            <prefetchRatePendingMessageLimitStrategy multiplier="2"/>
          </pendingMessageLimitStrategy>
        </policyEntry>
      </policyEntries>
    </policyMap>
  </destinationPolicy>
  ...
</broker>
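The effect of a pending message limit on one slow consumer can be modelled as a buffer that discards its oldest entry when it is full. The following Java sketch is a simplified, hypothetical model, not broker internals: the topic keeps at most prefetchLimit + pendingLimit messages for the consumer, a negative pendingLimit stands for the default of -1 (retain everything), and prefetchRateLimit() computes the pending limit the way the multiplier strategy is described above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Simplified model of a pending message limit for one slow, non-durable topic consumer.
final class PendingMessageLimitModel {
    private final int capacity;
    private final Deque<String> retained = new ArrayDeque<>();
    private int discarded;

    PendingMessageLimitModel(int prefetchLimit, int pendingLimit) {
        // A negative pending limit models the default of -1: retain all unconsumed messages.
        this.capacity = pendingLimit < 0 ? Integer.MAX_VALUE : prefetchLimit + pendingLimit;
    }

    /** Pending limit derived from a multiplier, as with prefetchRatePendingMessageLimitStrategy. */
    static int prefetchRateLimit(int prefetchLimit, double multiplier) {
        return (int) (prefetchLimit * multiplier);
    }

    void onNewMessage(String message) {
        if (capacity <= 0) {
            discarded++;            // nothing can be retained at all
            return;
        }
        if (retained.size() == capacity) {
            retained.pollFirst();   // drop the oldest retained message
            discarded++;
        }
        retained.addLast(message);
    }

    List<String> retainedMessages() {
        return new ArrayList<>(retained);
    }

    int discardedCount() {
        return discarded;
    }
}
```

With a prefetch limit of 1 and a constant limit of 2, five arriving messages leave the newest three retained and the two oldest discarded.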
Aborting slow consumers
- a consumer is considered slow for a specified amount of time
- a consumer is considered slow a specified number of times
Example 5.3. Aborting Slow Consumers
<broker ... >
  ...
  <destinationPolicy>
    <policyMap>
      <policyEntries>
        <policyEntry topic=">" >
          <slowConsumerStrategy>
            <abortSlowConsumerStrategy />
          </slowConsumerStrategy>
        </policyEntry>
      </policyEntries>
    </policyMap>
  </destinationPolicy>
  ...
</broker>
The abortSlowConsumerStrategy element activates the abort slow consumer strategy with default settings. Consumers that are considered slow for more than 30 seconds are aborted. You can modify when slow consumers are aborted using the attributes described in Table 5.1, “Settings for Abort Slow Consumer Strategy”.
Table 5.1. Settings for Abort Slow Consumer Strategy
|Attribute|Default|Description|
|maxSlowCount|-1|Specifies the number of times a consumer can be considered slow before it is aborted. The default, -1, means that no limit is applied.|
|maxSlowDuration|30000|Specifies the maximum amount of time, in milliseconds, that a consumer can be continuously slow before it is aborted.|
|checkPeriod|30000|Specifies, in milliseconds, the time between checks for slow consumers.|
|abortConnection|false|Specifies whether the broker forces the consumer connection to close. The default value specifies that the broker will send a message to the consumer requesting it to close its connection.|
Example 5.4. Aborting Repeatedly Slow Consumers
<abortSlowConsumerStrategy maxSlowCount="30" />
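The two abort conditions can be combined into one decision function. The following Java sketch is a simplified, hypothetical model: maxSlowCount matches the attribute used in Example 5.4, while the maxSlowDuration name and the convention that a value of zero or less disables a check are assumptions made for this sketch, not documented broker behaviour.

```java
// Simplified model of the abort decision for a slow consumer; illustrative only.
final class AbortSlowConsumerModel {
    private final long maxSlowCount;
    private final long maxSlowDurationMillis;

    AbortSlowConsumerModel(long maxSlowCount, long maxSlowDurationMillis) {
        this.maxSlowCount = maxSlowCount;
        this.maxSlowDurationMillis = maxSlowDurationMillis;
    }

    /** Default behaviour described in the text: abort after more than 30 seconds of slowness. */
    static AbortSlowConsumerModel defaults() {
        return new AbortSlowConsumerModel(-1L, 30_000L);
    }

    /**
     * @param timesMarkedSlow        how many checks have found the consumer slow
     * @param continuouslySlowMillis how long the consumer has been slow without recovering
     */
    boolean shouldAbort(long timesMarkedSlow, long continuouslySlowMillis) {
        boolean tooOften = maxSlowCount > 0 && timesMarkedSlow >= maxSlowCount;
        boolean tooLong = maxSlowDurationMillis > 0 && continuouslySlowMillis > maxSlowDurationMillis;
        return tooOften || tooLong;
    }
}
```

A model configured like Example 5.4, with only the count check enabled, aborts a consumer on the 30th time it is found slow, however short each slow period was.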
Chapter 6. Persistent Messaging
6.1. Serializing to Disk
KahaDB message store
Synchronous dispatch through a persistent broker
Figure 6.1. Synchronous Dispatch through a Persistent Broker
- The broker pushes the message into the message store. Assuming that the enableJournalDiskSyncs option is true, the message store also writes the message to disk before the broker proceeds.
- The broker now sends the message to all of the interested consumers (but does not wait for consumer acknowledgements). For topics, the broker dispatches the message immediately, while for queues, the broker adds the message to a destination cursor.
- The broker then sends a receipt back to the producer. The receipt can thus be sent back before the consumers have finished acknowledging messages (in the case of topic messages, consumer acknowledgements are usually not required anyway).
Concurrent store and dispatch
Figure 6.2. Concurrent Store and Dispatch
- The broker pushes the message onto the message store and, concurrently, sends the message to all of the interested consumers. After sending the message to the consumers, the broker then sends a receipt back to the producer, without waiting for consumer acknowledgements or for the message store to synchronize to disk.
- As soon as the broker receives acknowledgements from all the consumers, the broker removes the message from the message store. Because consumers typically acknowledge messages faster than a message store can write them to disk, this often means that write to disk is optimized away entirely. That is, the message is removed from the message store before it is ever physically written to disk.
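The reason concurrent store and dispatch can skip disk writes is that acknowledgements may arrive before the store flushes. The following Java sketch is a toy model with hypothetical names, not KahaDB code: stored messages wait in a pending-write set, a message acknowledged by all consumers before the next flush is simply removed, and only the remaining messages are written.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Toy model of concurrent store and dispatch; names are hypothetical.
final class ConcurrentStoreModel {
    private final Set<String> pendingWrites = new LinkedHashSet<>();
    private final Set<String> onDisk = new LinkedHashSet<>();
    private int diskWrites;

    /** The broker stores the message and, concurrently, dispatches it to consumers. */
    void storeAndDispatch(String messageId) {
        pendingWrites.add(messageId);
    }

    /** All consumers acknowledged: remove the message wherever it currently lives. */
    void allConsumersAcked(String messageId) {
        if (!pendingWrites.remove(messageId)) {
            onDisk.remove(messageId); // already written, so it must be deleted from disk
        }
    }

    /** Periodic flush: only messages that are still pending are physically written. */
    void flush() {
        for (String id : pendingWrites) {
            onDisk.add(id);
            diskWrites++;
        }
        pendingWrites.clear();
    }

    int diskWrites() {
        return diskWrites;
    }

    Set<String> onDisk() {
        return new LinkedHashSet<>(onDisk);
    }
}
```

If two messages are stored and one is acknowledged before the flush, only one disk write takes place in this model.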
Configuring concurrent store and dispatch
KahaDB controls concurrent store and dispatch through the concurrentStoreAndDispatchQueues flag and the concurrentStoreAndDispatchTopics flag. By default, it is enabled for queues, but disabled for topics. To enable concurrent store and dispatch for both queues and topics, configure the kahaDB element in the broker configuration as follows:
<broker brokerName="broker" persistent="true" useShutdownHook="false">
  ...
  <persistenceAdapter>
    <kahaDB directory="activemq-data"
            journalMaxFileLength="32mb"
            concurrentStoreAndDispatchQueues="true"
            concurrentStoreAndDispatchTopics="true" />
  </persistenceAdapter>
</broker>
Reducing memory footprint of pending messages
To reduce the memory footprint of pending messages, enable the reduceMemoryFootprint option, as follows:
<broker ... >
  ...
  <destinationPolicy>
    <policyMap>
      <policyEntries>
        <policyEntry queue=">" reduceMemoryFootprint="true" />
      </policyEntries>
    </policyMap>
  </destinationPolicy>
  ...
</broker>
When the reduceMemoryFootprint option is enabled, a message's marshalled content is cleared immediately after the message is written to persistent storage. This results in approximately a 50% reduction in the amount of memory occupied by the pending messages.
6.2. KahaDB Optimization
Figure 6.3. KahaDB Architecture
The KahaDB message store is configured by a persistenceAdapter element containing a kahaDB element, as follows:
<broker brokerName="broker" persistent="true" useShutdownHook="false">
  ...
  <persistenceAdapter>
    <kahaDB directory="activemq-data" journalMaxFileLength="32mb"/>
  </persistenceAdapter>
</broker>
The directory property specifies the directory where the KahaDB files are stored and the journalMaxFileLength property specifies the maximum size of a data log file.
You can tune the following KahaDB properties to optimize performance:
- indexCacheSize (default 10000): specifies the size of the cache in units of pages (where one page is 4 KB by default). Generally, the cache should be as large as possible, to avoid swapping pages in and out of memory. Check the size of your metadata store file, db.data, to get some idea of how big the cache needs to be.
- indexWriteBatchSize (default 1000): defines the threshold for the number of dirty indexes that are allowed to accumulate before KahaDB writes the cache to the store. If you want to maximize the speed of the broker, you could set this property to a large value, so that the store is updated only during checkpoints. But this carries the risk of losing a large amount of metadata in the event of a system failure (causing the broker to restart very slowly).
- journalMaxFileLength (default 32mb): when the throughput of a broker is very large, you can fill up a journal file quite quickly. Because there is a cost associated with closing a full journal file and opening a new journal file, you can get a slight performance improvement by increasing the journal file size, so that this cost is incurred less frequently.
- enableJournalDiskSyncs (default true): normally, the broker performs a disk sync (ensuring that a message has been physically written to disk) before sending the acknowledgement back to a producer. You can obtain a substantial improvement in broker performance by disabling disk syncs (setting this property to false), but this reduces the reliability of the broker somewhat.
  Warning: If you need to satisfy the JMS durability requirement and be certain that you do not lose any messages, do not disable journal disk syncs.
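The advice to size the index cache from db.data can be turned into a quick estimate. The following Java sketch (IndexCacheEstimator is a hypothetical name) computes how many 4 KB pages would hold the whole metadata file, and how much memory a given number of pages occupies; the directory you pass in is an assumption about where your KahaDB files live.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Sketch: estimate an index cache size (in pages) from the size of db.data.
final class IndexCacheEstimator {
    static final long DEFAULT_PAGE_SIZE = 4L * 1024L; // 4 KB pages, the default mentioned above

    private IndexCacheEstimator() {}

    /** Pages needed to hold a metadata file of the given size, rounded up. */
    static long pagesFor(long dbDataBytes, long pageSize) {
        return (dbDataBytes + pageSize - 1) / pageSize;
    }

    /** Sizes the cache from the db.data file in the given KahaDB directory. */
    static long pagesForDirectory(Path kahaDbDirectory) {
        try {
            return pagesFor(Files.size(kahaDbDirectory.resolve("db.data")), DEFAULT_PAGE_SIZE);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Approximate memory held by a cache of the given number of pages. */
    static long cacheBytes(long pages, long pageSize) {
        return pages * pageSize;
    }
}
```

By this estimate, the default of 10000 pages corresponds to 40,960,000 bytes (about 39 MB); a db.data file larger than that suggests raising the cache size, provided the broker has the memory to spare.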
Optimizing disk syncs
On some platforms, you can optimize disk syncs by configuring JBoss A-MQ to use the fdatasync() system call instead of the fsync() system call (the default) when writing to a file. The difference between these system calls is that fdatasync() updates only the file data, whereas fsync() updates both the file data and the file metadata (for example, the access time).
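To enable this optimization, add the following setting to the etc/system.properties file in your JBoss A-MQ installation:
org.apache.activemq.kahaDB.files.skipMetadataUpdate=true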
This setting does not guarantee that the fdatasync() system call is used. When this option is enabled, the JBoss A-MQ runtime actually makes a call to java.nio.channels.FileChannel#force(false). For some JVMs, this can result in a call to fdatasync() (so that the optimization is effective), but with other JVMs it might be implemented using fsync() (so that the optimization has no effect).
Note that fsync() on RHEL 6 is noticeably slower than on RHEL 4 (this is due to a bug fix in RHEL 6). So, this optimization works particularly well on the RHEL 6 platform, where fdatasync() is significantly faster.
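The difference between a data-only sync and a full sync is visible in the standard FileChannel.force(boolean metaData) API. The following Java sketch (JournalAppend is a hypothetical name) appends a record to a journal-style file and then forces it to disk; passing false requests that only file content be flushed, which a JVM may implement with fdatasync() or with fsync(), as described above.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Sketch of a journal-style append followed by an explicit disk sync.
final class JournalAppend {
    private JournalAppend() {}

    /** Appends a record, syncs it to disk, and returns the resulting file size in bytes. */
    static long appendAndSync(Path file, String record, boolean syncMetadata) {
        ByteBuffer data = ByteBuffer.wrap(record.getBytes(StandardCharsets.UTF_8));
        try (FileChannel channel = FileChannel.open(file,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.APPEND)) {
            while (data.hasRemaining()) {
                channel.write(data);
            }
            // false: flush file content only (fdatasync-like where the JVM supports it);
            // true: flush content and metadata (fsync-like).
            channel.force(syncMetadata);
            return channel.size();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Timing repeated calls with syncMetadata set to false and then to true on your own hardware is one way to check whether the optimization has any effect with your JVM and platform.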
6.3. vmCursor on Destination
Configuring destinations to use the vmCursor
For example, to enable the VM cursor for all topics and queues (vmCursor for topic subscribers and vmQueueCursor for queues), add the following lines to your broker configuration:
<broker ... >
  ...
  <destinationPolicy>
    <policyMap>
      <policyEntries>
        <policyEntry topic=">">
          <pendingSubscriberPolicy>
            <vmCursor />
          </pendingSubscriberPolicy>
        </policyEntry>
        <policyEntry queue=">">
          <deadLetterStrategy>
            <individualDeadLetterStrategy queuePrefix="Test.DLQ."/>
          </deadLetterStrategy>
          <pendingQueuePolicy>
            <vmQueueCursor />
          </pendingQueuePolicy>
        </policyEntry>
      </policyEntries>
    </policyMap>
  </destinationPolicy>
  ...
</broker>
Where the policy entries use the wildcard, >, that matches all destination names. You could also specify a more selective destination pattern, so that the VM cursor would be enabled only for those destinations where you are sure that consumers can keep up with the message flow.
6.4. JMS Transactions
Improving efficiency using JMS transactions