Chapter 132. RabbitMQ

RabbitMQ Component

Available as of Camel 2.12
The rabbitmq: component allows you to produce and consume messages from RabbitMQ instances. Because it uses the RabbitMQ AMQP client, this component offers a pure RabbitMQ approach rather than going through the generic AMQP component.
Maven users will need to add the following dependency to their pom.xml for this component:
<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-rabbitmq</artifactId>
    <version>2.17.0.redhat-630187</version>
    <!-- use the same version as your Camel core version -->
</dependency>

URI format

rabbitmq://hostname[:port]/exchangeName?[options]
Where hostname is the hostname of the running RabbitMQ instance or cluster. The port is optional; if it is not specified, it defaults to the RabbitMQ client default (5672). For producers, the exchange name determines which exchange messages will be sent to. For consumers, the exchange name determines which exchange the queue will be bound to.
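The URI format above can be sketched as a small string-building helper. This is only an illustration of how the parts fit together: the class RabbitMqUriSketch and its build method are hypothetical and not part of Camel, the queue name orders and routing key order.created are made up, and the sketch does not URL-encode option values.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical helper (not part of Camel) that assembles a rabbitmq: endpoint URI. */
final class RabbitMqUriSketch {

    /**
     * Builds rabbitmq://hostname[:port]/exchangeName?[options].
     * Pass a null port to leave it out, so the client default (5672) applies.
     */
    static String build(String hostname, Integer port, String exchangeName,
                        Map<String, String> options) {
        StringBuilder uri = new StringBuilder("rabbitmq://").append(hostname);
        if (port != null) {
            uri.append(':').append(port);
        }
        uri.append('/').append(exchangeName);
        if (!options.isEmpty()) {
            // Options are appended as key=value pairs; values are not URL-encoded here.
            StringJoiner query = new StringJoiner("&", "?", "");
            options.forEach((key, value) -> query.add(key + "=" + value));
            uri.append(query.toString());
        }
        return uri.toString();
    }

    public static void main(String[] args) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("queue", "orders");              // made-up queue name
        options.put("routingKey", "order.created");  // made-up routing key
        System.out.println(build("localhost", null, "A", options));
        System.out.println(build("localhost", 5672, "A", new LinkedHashMap<>()));
    }
}
```

A LinkedHashMap is used so that the options appear in the URI in the order they were added, which keeps the resulting string predictable.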

Options

Property Default Description
autoAck true If messages should be auto acknowledged.
autoDelete true If true, the exchange will be deleted when it is no longer in use.
durable true If we are declaring a durable exchange (the exchange will survive a server restart).
queue random uuid The queue to receive messages from.
routingKey null The routing key to use when binding a consumer queue to the exchange. For producer routing keys, you set the header (see header section).
threadPoolSize 10 The consumer uses a Thread Pool Executor with a fixed number of threads. This setting allows you to set that number of threads.
username null Username for authenticated access.
password null Password for authenticated access.
vhost / The vhost for the channel.
exchangeType direct Camel 2.12.2: The exchange type, such as direct or topic.
bridgeEndpoint false Camel 2.12.3: If bridgeEndpoint is true, the producer ignores the rabbitmq.EXCHANGE_NAME and rabbitmq.ROUTING_KEY message headers.
addresses null Camel 2.12.3: If this option is set, camel-rabbitmq tries to create the connection from the addresses given here. The value is a string such as "server1:12345, server2:12345".
connectionTimeout 0 Camel 2.14: Connection timeout.
requestedChannelMax 0 Camel 2.14: Connection requested channel max (maximum number of channels offered).
requestedFrameMax 0 Camel 2.14: Connection requested frame max (maximum size of frame offered).
requestedHeartbeat 0 Camel 2.14: Connection requested heartbeat (heartbeat in seconds offered).
sslProtocol null Camel 2.14: Enables SSL on the connection. Accepted values are true, TLS and SSLv3.
trustManager null Camel 2.14: Configures the SSL trust manager. SSL must be enabled for this option to take effect.
clientProperties null Camel 2.14: Connection client properties (client info used in negotiating with the server).
connectionFactory null Camel 2.14: Custom RabbitMQ connection factory. When this option is set, all connection options set on the URI (connectionTimeout, requestedChannelMax, and so on) are ignored.
automaticRecoveryEnabled false Camel 2.14: Enables automatic connection recovery (uses a connection implementation that performs automatic recovery when the connection shutdown was not initiated by the application).
networkRecoveryInterval 5000 Camel 2.14: Network recovery interval in milliseconds (the interval used when recovering from a network failure).
topologyRecoveryEnabled true Camel 2.14: Enables connection topology recovery (whether topology recovery should be performed).
prefetchEnabled false Camel 2.14: Enables quality of service on the RabbitMQConsumer side. You must specify the prefetchSize, prefetchCount and prefetchGlobal options at the same time.
prefetchSize 0 Camel 2.14: The maximum amount of content (measured in octets) that the server will deliver, 0 if unlimited.
prefetchCount 0 Camel 2.14: The maximum number of messages that the server will deliver, 0 if unlimited.
prefetchGlobal false Camel 2.14: Whether the settings should be applied to the entire channel rather than to each consumer.
declare true Camel 2.14: If true, Camel declares the exchange and the queue and binds them together. If false, Camel does not declare the exchange or the queue on the server.
concurrentConsumers 1 Camel 2.14: Number of concurrent consumers when consuming from the broker (similar to the same option in the JMS component).
deadLetterRoutingKey Camel 2.14: The routing key for the dead letter exchange.
deadLetterExchange Camel 2.14: The name of the dead letter exchange.
deadLetterExchangeType direct Camel 2.14: The type of the dead letter exchange.
channelPoolMaxSize 10 Camel 2.14.1 (Producer only): Maximum number of channels used to send messages.
channelPoolMaxWait 1000 Camel 2.14.1 (Producer only): Maximum time (in milliseconds) waiting for a channel.
queueArgsConfigurer null Camel 2.15.1: The custom ArgsConfigurer instance which could be used to configure the Args map when declaring the queue.
exchangeArgsConfigurer null Camel 2.15.1: The custom ArgsConfigurer instance which could be used to configure the Args map when declaring the exchange.
requestTimeout 20000 Camel 2.16: Producer Only. The timeout for waiting for a reply when using the InOut Exchange Pattern (in milliseconds). The default is 20 seconds. See also the requestTimeoutCheckerInterval option.
requestTimeoutCheckerInterval 1000 Camel 2.16: Configures how often Camel should check for timed out Exchanges when doing request/reply over RabbitMQ. By default Camel checks once per second. But if you must react faster when a timeout occurs, then you can lower this interval, to check more frequently. The timeout is determined by the option requestTimeout.
transferException false Camel 2.16: If enabled and you are using Request Reply messaging (InOut) and an Exchange failed on the consumer side, then the caused Exception will be sent back in the response as a byte[]. If the client is Camel, the returned Exception is rethrown. This allows you to use Camel RabbitMQ as a bridge in your routing - for example, using persistent queues to enable robust routing. The caught exception is required to be serializable. The original Exception on the consumer side can be wrapped in an outer exception such as org.apache.camel.RuntimeCamelException when returned to the producer.
skipQueueDeclare false Camel 2.16.1: If true, the producer will not declare and bind a queue. This can be used for directing messages via an existing routing key.
publisherAcknowledgements false Camel 2.17: If true, the message will be published with publisher acknowledgements turned on.
publisherAcknowledgementsTimeout 0 Camel 2.17: The amount of time in milliseconds to wait for a basic.ack response from RabbitMQ server.
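The value format of the addresses option ("server1:12345, server2:12345") can be illustrated with a small parser. This is a hypothetical sketch of the format only, not how camel-rabbitmq processes the option internally: the AddressesSketch and HostPort types are made up for the example, and falling back to port 5672 when an entry has no port is an assumption based on the client default mentioned in the URI format section.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of the "host:port, host:port" addresses format. */
final class AddressesSketch {

    /** Simple host/port pair; this type is made up for the example. */
    static final class HostPort {
        final String host;
        final int port;

        HostPort(String host, int port) {
            this.host = host;
            this.port = port;
        }

        @Override
        public String toString() {
            return host + ":" + port;
        }
    }

    /** Splits a comma-separated list of host[:port] entries, ignoring surrounding spaces. */
    static List<HostPort> parse(String addresses) {
        List<HostPort> result = new ArrayList<>();
        for (String entry : addresses.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue; // skip empty entries such as a trailing comma
            }
            int colon = trimmed.lastIndexOf(':');
            if (colon < 0) {
                // Assumption: no port given, so use the RabbitMQ client default.
                result.add(new HostPort(trimmed, 5672));
            } else {
                String host = trimmed.substring(0, colon);
                int port = Integer.parseInt(trimmed.substring(colon + 1));
                result.add(new HostPort(host, port));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(parse("server1:12345, server2:12345"));
    }
}
```

Note that the space after the comma in the documented example is tolerated here by trimming each entry before it is split into host and port.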

Custom connection factory

<bean id="customConnectionFactory" class="com.rabbitmq.client.ConnectionFactory">
  <property name="host" value="localhost"/>
  <property name="port" value="5672"/>
  <property name="username" value="camel"/>
  <property name="password" value="bugsbunny"/>
</bean>
<camelContext>
  <route>
    <from uri="direct:rabbitMQEx2"/>
    <to uri="rabbitmq://localhost:5672/ex2?connectionFactory=#customConnectionFactory"/>
  </route>
</camelContext>

Headers

The following headers are set on exchanges when consuming messages.
Property Value
rabbitmq.ROUTING_KEY The routing key that was used to receive the message, or the routing key that will be used when producing a message
rabbitmq.EXCHANGE_NAME The exchange the message was received from
rabbitmq.DELIVERY_TAG The rabbitmq delivery tag of the received message
rabbitmq.REQUEUE Camel 2.14.2: Used by the consumer to control rejection of the message. When the consumer has finished processing the exchange and the exchange failed, the consumer rejects the message from the RabbitMQ broker. The value of this header controls this behaviour: if false, the message is discarded or dead-lettered; if true, the message is re-queued. The default is false.
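The rabbitmq.REQUEUE decision described above can be sketched as a lookup with a default of false. This is a simplified illustration rather than the component's actual code: the RequeueSketch class is hypothetical, the headers are modelled as a plain map instead of a Camel message, and accepting the string "true" as well as a Boolean is an assumption made for the example.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of how the rabbitmq.REQUEUE header decides the fate of a failed message. */
final class RequeueSketch {

    static final String REQUEUE = "rabbitmq.REQUEUE";

    /**
     * Returns true if a failed message should be re-queued.
     * A false result means the message is discarded or dead-lettered.
     */
    static boolean shouldRequeue(Map<String, Object> headers) {
        Object value = headers.get(REQUEUE);
        if (value instanceof Boolean) {
            return (Boolean) value;
        }
        if (value instanceof String) {
            // Assumption: a string value is interpreted like a boolean literal.
            return Boolean.parseBoolean((String) value);
        }
        return false; // header absent or of another type: use the documented default
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        System.out.println(shouldRequeue(headers)); // header not set
        headers.put(REQUEUE, true);
        System.out.println(shouldRequeue(headers)); // header set to true
    }
}
```

In a route, the header would typically be set by a processor before the exchange fails, so that the consumer sees it when it rejects the message.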
The following headers are used by the producer. If these are set on the camel exchange then they will be set on the RabbitMQ message.
Property Value
rabbitmq.ROUTING_KEY The routing key that will be used when sending the message
rabbitmq.EXCHANGE_NAME The exchange the message was received from, or sent to
rabbitmq.CONTENT_TYPE The contentType to set on the RabbitMQ message
rabbitmq.PRIORITY The priority header to set on the RabbitMQ message
rabbitmq.CORRELATIONID The correlationId to set on the RabbitMQ message
rabbitmq.MESSAGE_ID The message id to set on the RabbitMQ message
rabbitmq.DELIVERY_MODE If the message should be persistent or not
rabbitmq.USERID The userId to set on the RabbitMQ message
rabbitmq.CLUSTERID The clusterId to set on the RabbitMQ message
rabbitmq.REPLY_TO The replyTo to set on the RabbitMQ message
rabbitmq.CONTENT_ENCODING The contentEncoding to set on the RabbitMQ message
rabbitmq.TYPE The type to set on the RabbitMQ message
rabbitmq.EXPIRATION The expiration to set on the RabbitMQ message
rabbitmq.TIMESTAMP The timestamp to set on the RabbitMQ message
rabbitmq.APP_ID The appId to set on the RabbitMQ message
Headers are set by the consumer once the message is received. The producer also sets the headers for downstream processors once the exchange has taken place. Any headers set before production that the producer also sets will be overridden.

Message Body

The component uses the In body of the Camel exchange as the RabbitMQ message body. The In body must be convertible to a byte array; otherwise, the producer throws an exception reporting an unsupported body type.
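The byte-array requirement can be illustrated with a deliberately narrow conversion routine. This is a hypothetical sketch, not the component's implementation: Camel performs the conversion through its own type converter mechanism, which handles more types than shown here, and the BodyToBytesSketch class, the UTF-8 encoding for strings, and the IllegalArgumentException are all assumptions made for the example.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical sketch of turning a message body into the byte array sent to RabbitMQ. */
final class BodyToBytesSketch {

    static byte[] toBytes(Object body) {
        if (body instanceof byte[]) {
            return (byte[]) body; // already in the required form
        }
        if (body instanceof String) {
            // Assumption: strings are encoded as UTF-8.
            return ((String) body).getBytes(StandardCharsets.UTF_8);
        }
        // Anything else is rejected in this sketch, mirroring the unsupported body type failure.
        String type = (body == null) ? "null" : body.getClass().getName();
        throw new IllegalArgumentException("Unsupported body type: " + type);
    }

    public static void main(String[] args) {
        System.out.println(toBytes("hello").length);
    }
}
```

If a route carries a body of some other type, one option is to convert it explicitly (for example to a String or byte[]) before the rabbitmq: endpoint, so the producer receives a body it can send.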

Samples

To receive messages from a queue that is bound to an exchange A with the routing key B:
from("rabbitmq://localhost/A?routingKey=B")
To receive messages from a queue with a single thread and with auto acknowledge disabled:
from("rabbitmq://localhost/A?routingKey=B&threadPoolSize=1&autoAck=false")
To send messages to an exchange called C:
...to("rabbitmq://localhost/C")