Red Hat Training

A Red Hat training course is available for Red Hat Fuse

Chapter 257. RabbitMQ Component

Available as of Camel version 2.12

The rabbitmq: component allows you produce and consume messages from RabbitMQ instances. Using the RabbitMQ AMQP client, this component offers a pure RabbitMQ approach over the generic AMQP component.

Maven users will need to add the following dependency to their pom.xml for this component:

<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-rabbitmq</artifactId>
    <version>x.x.x</version>
    <!-- use the same version as your Camel core version -->
</dependency>

257.1. URI format

The old syntax is deprecated:

rabbitmq://hostname[:port]/exchangeName?[options]

Instead the hostname and port is configured on the component level, or can be provided as uri query parameters instead.

The new syntax is:

rabbitmq:exchangeName?[options]

Where hostname is the hostname of the running rabbitmq instance or cluster. Port is optional and if not specified then defaults to the RabbitMQ client default (5672). The exchange name determines which exchange produced messages will sent to. In the case of consumers, the exchange name determines which exchange the queue will bind to.

257.2. Options

The RabbitMQ component supports 49 options which are listed below.

NameDescriptionDefaultType

hostname (common)

The hostname of the running RabbitMQ instance or cluster.

 

String

portNumber (common)

Port number for the host with the running rabbitmq instance or cluster.

5672

int

username (security)

Username in case of authenticated access

guest

String

password (security)

Password for authenticated access

guest

String

vhost (common)

The vhost for the channel

/

String

addresses (common)

If this option is set, camel-rabbitmq will try to create connection based on the setting of option addresses. The addresses value is a string which looks like server1:12345, server2:12345

 

String

connectionFactory (common)

To use a custom RabbitMQ connection factory. When this option is set, all connection options (connectionTimeout, requestedChannelMax…​) set on URI are not used

 

ConnectionFactory

threadPoolSize (consumer)

The consumer uses a Thread Pool Executor with a fixed number of threads. This setting allows you to set that number of threads.

10

int

autoDetectConnection Factory (advanced)

Whether to auto-detect looking up RabbitMQ connection factory from the registry. When enabled and a single instance of the connection factory is found then it will be used. An explicit connection factory can be configured on the component or endpoint level which takes precedence.

true

boolean

connectionTimeout (advanced)

Connection timeout

60000

int

requestedChannelMax (advanced)

Connection requested channel max (max number of channels offered)

0

int

requestedFrameMax (advanced)

Connection requested frame max (max size of frame offered)

0

int

requestedHeartbeat (advanced)

Connection requested heartbeat (heart-beat in seconds offered)

60

int

automaticRecovery Enabled (advanced)

Enables connection automatic recovery (uses connection implementation that performs automatic recovery when connection shutdown is not initiated by the application)

 

Boolean

networkRecoveryInterval (advanced)

Network recovery interval in milliseconds (interval used when recovering from network failure)

5000

Integer

topologyRecoveryEnabled (advanced)

Enables connection topology recovery (should topology recovery be performed)

 

Boolean

prefetchEnabled (consumer)

Enables the quality of service on the RabbitMQConsumer side. You need to specify the option of prefetchSize, prefetchCount, prefetchGlobal at the same time

false

boolean

prefetchSize (consumer)

The maximum amount of content (measured in octets) that the server will deliver, 0 if unlimited. You need to specify the option of prefetchSize, prefetchCount, prefetchGlobal at the same time

 

int

prefetchCount (consumer)

The maximum number of messages that the server will deliver, 0 if unlimited. You need to specify the option of prefetchSize, prefetchCount, prefetchGlobal at the same time

 

int

prefetchGlobal (consumer)

If the settings should be applied to the entire channel rather than each consumer You need to specify the option of prefetchSize, prefetchCount, prefetchGlobal at the same time

false

boolean

channelPoolMaxSize (producer)

Get maximum number of opened channel in pool

10

int

channelPoolMaxWait (producer)

Set the maximum number of milliseconds to wait for a channel from the pool

1000

long

requestTimeout (advanced)

Set timeout for waiting for a reply when using the InOut Exchange Pattern (in milliseconds)

20000

long

requestTimeoutChecker Interval (advanced)

Set requestTimeoutCheckerInterval for inOut exchange

1000

long

transferException (advanced)

When true and an inOut Exchange failed on the consumer side send the caused Exception back in the response

false

boolean

publisher Acknowledgements (producer)

When true, the message will be published with publisher acknowledgements turned on

false

boolean

publisher AcknowledgementsTimeout (producer)

The amount of time in milliseconds to wait for a basic.ack response from RabbitMQ server

 

long

guaranteedDeliveries (producer)

When true, an exception will be thrown when the message cannot be delivered (basic.return) and the message is marked as mandatory. PublisherAcknowledgement will also be activated in this case. See also publisher acknowledgements - When will messages be confirmed.

false

boolean

mandatory (producer)

This flag tells the server how to react if the message cannot be routed to a queue. If this flag is set, the server will return an unroutable message with a Return method. If this flag is zero, the server silently drops the message. If the header is present rabbitmq.MANDATORY it will override this option.

false

boolean

immediate (producer)

This flag tells the server how to react if the message cannot be routed to a queue consumer immediately. If this flag is set, the server will return an undeliverable message with a Return method. If this flag is zero, the server will queue the message, but with no guarantee that it will ever be consumed. If the header is present rabbitmq.IMMEDIATE it will override this option.

false

boolean

args (advanced)

Specify arguments for configuring the different RabbitMQ concepts, a different prefix is required for each: Exchange: arg.exchange. Queue: arg.queue. Binding: arg.binding. For example to declare a queue with message ttl argument: http://localhost:5672/exchange/queueargs=arg.queue.x-message-ttl=60000

 

Map

clientProperties (advanced)

Connection client properties (client info used in negotiating with the server)

 

Map

sslProtocol (security)

Enables SSL on connection, accepted value are true, TLS and 'SSLv3

 

String

trustManager (security)

Configure SSL trust manager, SSL should be enabled for this option to be effective

 

TrustManager

autoAck (consumer)

If messages should be auto acknowledged

true

boolean

autoDelete (common)

If it is true, the exchange will be deleted when it is no longer in use

true

boolean

durable (common)

If we are declaring a durable exchange (the exchange will survive a server restart)

true

boolean

exclusive (common)

Exclusive queues may only be accessed by the current connection, and are deleted when that connection closes.

false

boolean

passive (common)

Passive queues depend on the queue already to be available at RabbitMQ.

false

boolean

skipQueueDeclare (common)

If true the producer will not declare and bind a queue. This can be used for directing messages via an existing routing key.

false

boolean

skipQueueBind (common)

If true the queue will not be bound to the exchange after declaring it

false

boolean

skipExchangeDeclare (common)

This can be used if we need to declare the queue but not the exchange

false

boolean

declare (common)

If the option is true, camel declare the exchange and queue name and bind them together. If the option is false, camel won’t declare the exchange and queue name on the server.

true

boolean

deadLetterExchange (common)

The name of the dead letter exchange

 

String

deadLetterQueue (common)

The name of the dead letter queue

 

String

deadLetterRoutingKey (common)

The routing key for the dead letter exchange

 

String

deadLetterExchangeType (common)

The type of the dead letter exchange

direct

String

allowNullHeaders (producer)

Allow pass null values to header

false

boolean

resolveProperty Placeholders (advanced)

Whether the component should resolve property placeholders on itself when starting. Only properties which are of String type can use property placeholders.

true

boolean

The RabbitMQ endpoint is configured using URI syntax:

rabbitmq:exchangeName

with the following path and query parameters:

257.2.1. Path Parameters (1 parameters):

NameDescriptionDefaultType

exchangeName

Required The exchange name determines which exchange produced messages will sent to. In the case of consumers, the exchange name determines which exchange the queue will bind to.

 

String

257.2.2. Query Parameters (61 parameters):

NameDescriptionDefaultType

addresses (common)

If this option is set, camel-rabbitmq will try to create connection based on the setting of option addresses. The addresses value is a string which looks like server1:12345, server2:12345

 

Address[]

autoDelete (common)

If it is true, the exchange will be deleted when it is no longer in use

true

boolean

connectionFactory (common)

To use a custom RabbitMQ connection factory. When this option is set, all connection options (connectionTimeout, requestedChannelMax…​) set on URI are not used

 

ConnectionFactory

deadLetterExchange (common)

The name of the dead letter exchange

 

String

deadLetterExchangeType (common)

The type of the dead letter exchange

direct

String

deadLetterQueue (common)

The name of the dead letter queue

 

String

deadLetterRoutingKey (common)

The routing key for the dead letter exchange

 

String

declare (common)

If the option is true, camel declare the exchange and queue name and bind them together. If the option is false, camel won’t declare the exchange and queue name on the server.

true

boolean

durable (common)

If we are declaring a durable exchange (the exchange will survive a server restart)

true

boolean

exchangeType (common)

The exchange type such as direct or topic.

direct

String

exclusive (common)

Exclusive queues may only be accessed by the current connection, and are deleted when that connection closes.

false

boolean

hostname (common)

The hostname of the running rabbitmq instance or cluster.

 

String

passive (common)

Passive queues depend on the queue already to be available at RabbitMQ.

false

boolean

portNumber (common)

Port number for the host with the running rabbitmq instance or cluster. Default value is 5672.

 

int

queue (common)

The queue to receive messages from

 

String

routingKey (common)

The routing key to use when binding a consumer queue to the exchange. For producer routing keys, you set the header rabbitmq.ROUTING_KEY.

 

String

skipExchangeDeclare (common)

This can be used if we need to declare the queue but not the exchange

false

boolean

skipQueueBind (common)

If true the queue will not be bound to the exchange after declaring it

false

boolean

skipQueueDeclare (common)

If true the producer will not declare and bind a queue. This can be used for directing messages via an existing routing key.

false

boolean

vhost (common)

The vhost for the channel

/

String

autoAck (consumer)

If messages should be auto acknowledged

true

boolean

bridgeErrorHandler (consumer)

Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored.

false

boolean

concurrentConsumers (consumer)

Number of concurrent consumers when consuming from broker. (eg similar as to the same option for the JMS component).

1

int

prefetchCount (consumer)

The maximum number of messages that the server will deliver, 0 if unlimited. You need to specify the option of prefetchSize, prefetchCount, prefetchGlobal at the same time

 

int

prefetchEnabled (consumer)

Enables the quality of service on the RabbitMQConsumer side. You need to specify the option of prefetchSize, prefetchCount, prefetchGlobal at the same time

false

boolean

prefetchGlobal (consumer)

If the settings should be applied to the entire channel rather than each consumer You need to specify the option of prefetchSize, prefetchCount, prefetchGlobal at the same time

false

boolean

prefetchSize (consumer)

The maximum amount of content (measured in octets) that the server will deliver, 0 if unlimited. You need to specify the option of prefetchSize, prefetchCount, prefetchGlobal at the same time

 

int

exceptionHandler (consumer)

To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this options is not in use. By default the consumer will deal with exceptions, that will be logged at WARN or ERROR level and ignored.

 

ExceptionHandler

exchangePattern (consumer)

Sets the exchange pattern when the consumer creates an exchange.

 

ExchangePattern

threadPoolSize (consumer)

The consumer uses a Thread Pool Executor with a fixed number of threads. This setting allows you to set that number of threads.

10

int

allowNullHeaders (producer)

Allow pass null values to header

false

boolean

bridgeEndpoint (producer)

If the bridgeEndpoint is true, the producer will ignore the message header of rabbitmq.EXCHANGE_NAME and rabbitmq.ROUTING_KEY

false

boolean

channelPoolMaxSize (producer)

Get maximum number of opened channel in pool

10

int

channelPoolMaxWait (producer)

Set the maximum number of milliseconds to wait for a channel from the pool

1000

long

guaranteedDeliveries (producer)

When true, an exception will be thrown when the message cannot be delivered (basic.return) and the message is marked as mandatory. PublisherAcknowledgement will also be activated in this case. See also publisher acknowledgements - When will messages be confirmed.

false

boolean

immediate (producer)

This flag tells the server how to react if the message cannot be routed to a queue consumer immediately. If this flag is set, the server will return an undeliverable message with a Return method. If this flag is zero, the server will queue the message, but with no guarantee that it will ever be consumed. If the header is present rabbitmq.IMMEDIATE it will override this option.

false

boolean

mandatory (producer)

This flag tells the server how to react if the message cannot be routed to a queue. If this flag is set, the server will return an unroutable message with a Return method. If this flag is zero, the server silently drops the message. If the header is present rabbitmq.MANDATORY it will override this option.

false

boolean

publisherAcknowledgements (producer)

When true, the message will be published with publisher acknowledgements turned on

false

boolean

publisherAcknowledgements Timeout (producer)

The amount of time in milliseconds to wait for a basic.ack response from RabbitMQ server

 

long

args (advanced)

Specify arguments for configuring the different RabbitMQ concepts, a different prefix is required for each: Exchange: arg.exchange. Queue: arg.queue. Binding: arg.binding. For example to declare a queue with message ttl argument: http://localhost:5672/exchange/queueargs=arg.queue.x-message-ttl=60000

 

Map

automaticRecoveryEnabled (advanced)

Enables connection automatic recovery (uses connection implementation that performs automatic recovery when connection shutdown is not initiated by the application)

 

Boolean

bindingArgs (advanced)

Deprecated Key/value args for configuring the queue binding parameters when declare=true

 

Map

clientProperties (advanced)

Connection client properties (client info used in negotiating with the server)

 

Map

connectionTimeout (advanced)

Connection timeout

60000

int

exchangeArgs (advanced)

Deprecated Key/value args for configuring the exchange parameters when declare=true

 

Map

exchangeArgsConfigurer (advanced)

Deprecated Set the configurer for setting the exchange args in Channel.exchangeDeclare

 

ArgsConfigurer

networkRecoveryInterval (advanced)

Network recovery interval in milliseconds (interval used when recovering from network failure)

5000

Integer

queueArgs (advanced)

Deprecated Key/value args for configuring the queue parameters when declare=true

 

Map

queueArgsConfigurer (advanced)

Deprecated Set the configurer for setting the queue args in Channel.queueDeclare

 

ArgsConfigurer

requestedChannelMax (advanced)

Connection requested channel max (max number of channels offered)

0

int

requestedFrameMax (advanced)

Connection requested frame max (max size of frame offered)

0

int

requestedHeartbeat (advanced)

Connection requested heartbeat (heart-beat in seconds offered)

60

int

requestTimeout (advanced)

Set timeout for waiting for a reply when using the InOut Exchange Pattern (in milliseconds)

20000

long

requestTimeoutChecker Interval (advanced)

Set requestTimeoutCheckerInterval for inOut exchange

1000

long

synchronous (advanced)

Sets whether synchronous processing should be strictly used, or Camel is allowed to use asynchronous processing (if supported).

false

boolean

topologyRecoveryEnabled (advanced)

Enables connection topology recovery (should topology recovery be performed)

 

Boolean

transferException (advanced)

When true and an inOut Exchange failed on the consumer side send the caused Exception back in the response

false

boolean

password (security)

Password for authenticated access

guest

String

sslProtocol (security)

Enables SSL on connection, accepted value are true, TLS and 'SSLv3

 

String

trustManager (security)

Configure SSL trust manager, SSL should be enabled for this option to be effective

 

TrustManager

username (security)

Username in case of authenticated access

guest

String

See http://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html and the AMQP specification for more information on connection options.

257.3. Using connection factory

To connect to RabbitMQ you can setup a ConnectionFactory (same as with JMS) with the login details such as:

<bean id="rabbitConnectionFactory" class="com.rabbitmq.client.ConnectionFactory">
  <property name="host" value="localhost"/>
  <property name="port" value="5672"/>
  <property name="username" value="camel"/>
  <property name="password" value="bugsbunny"/>
</bean>

And then refer to the connection factory in the endpoint uri as shown below:

<camelContext>
  <route>
    <from uri="direct:rabbitMQEx2"/>
    <to uri="rabbitmq:ex2?connectionFactory=#rabbitConnectionFactory"/>
  </route>
</camelContext>

From Camel 2.21 onwards the ConnectionFactory is auto-detected by default, so you can just do

<camelContext>
  <route>
    <from uri="direct:rabbitMQEx2"/>
    <to uri="rabbitmq:ex2"/>
  </route>
</camelContext>

257.4. Message Headers

The following headers are set on exchanges when consuming messages.

PropertyValue

rabbitmq.ROUTING_KEY

The routing key that was used to receive the message, or the routing key that will be used when producing a message

rabbitmq.EXCHANGE_NAME

The exchange the message was received from

rabbitmq.DELIVERY_TAG

The rabbitmq delivery tag of the received message

rabbitmq.REDELIVERY_TAG

Whether the message is a redelivered

rabbitmq.REQUEUE

Camel 2.14.2: This is used by the consumer to control rejection of the message. When the consumer is complete processing the exchange, and if the exchange failed, then the consumer is going to reject the message from the RabbitMQ broker. The value of this header controls this behavior. If the value is false (by default) then the message is discarded/dead-lettered. If the value is true, then the message is re-queued. 

The following headers are used by the producer. If these are set on the camel exchange then they will be set on the RabbitMQ message.

PropertyValue

rabbitmq.ROUTING_KEY

The routing key that will be used when sending the message

rabbitmq.EXCHANGE_NAME

The exchange the message was received from

rabbitmq.EXCHANGE_OVERRIDE_NAME

Camel 2.21: Used for force sending the message to this exchange instead of the endpoint configured name on the producer

rabbitmq.CONTENT_TYPE

The contentType to set on the RabbitMQ message

rabbitmq.PRIORITY

The priority header to set on the RabbitMQ message

rabbitmq.CORRELATIONID

The correlationId to set on the RabbitMQ message

rabbitmq.MESSAGE_ID

The message id to set on the RabbitMQ message

rabbitmq.DELIVERY_MODE

If the message should be persistent or not

rabbitmq.USERID

The userId to set on the RabbitMQ message

rabbitmq.CLUSTERID

The clusterId to set on the RabbitMQ message

rabbitmq.REPLY_TO

The replyTo to set on the RabbitMQ message

rabbitmq.CONTENT_ENCODING

The contentEncoding to set on the RabbitMQ message

rabbitmq.TYPE

The type to set on the RabbitMQ message

rabbitmq.EXPIRATION

The expiration to set on the RabbitMQ message

rabbitmq.TIMESTAMP

The timestamp to set on the RabbitMQ message

rabbitmq.APP_ID

The appId to set on the RabbitMQ message

Headers are set by the consumer once the message is received. The producer will also set the headers for downstream processors once the exchange has taken place. Any headers set prior to production that the producer sets will be overriden.

257.5. Message Body

The component will use the camel exchange in body as the rabbit mq message body. The camel exchange in object must be convertible to a byte array. Otherwise the producer will throw an exception of unsupported body type.

257.6. Samples

To receive messages from a queue that is bound to an exchange A with the routing key B,

from("rabbitmq:A?routingKey=B")

To receive messages from a queue with a single thread with auto acknowledge disabled.

from("rabbitmq:A?routingKey=B&threadPoolSize=1&autoAck=false")

To send messages to an exchange called C

to("rabbitmq:C")

Declaring a headers exchange and queue

from("rabbitmq:ex?exchangeType=headers&queue=q&bindingArgs=#bindArgs")

and place corresponding Map<String, Object> with the id of "bindArgs" in the Registry.

For example declaring a method in spring

@Bean(name="bindArgs")
public Map<String, Object> bindArgsBuilder() {
    return Collections.singletonMap("foo", "bar");
}

257.6.1. Issue when routing between exchanges (in Camel 2.20.x or older)

If you for example want to route messages from one Rabbit exchange to another as shown in the example below with foo → bar:

from("rabbitmq:foo")
  .to("rabbitmq:bar")

Then beware that Camel will route the message to itself, eg foo → foo. So why is that? This is because the consumer that receives the message (eg from) provides the message header rabbitmq.EXCHANGE_NAME with the name of the exchange, eg foo. And when the Camel producer is sending the message to bar then the header rabbitmq.EXCHANGE_NAME will override this and instead send the message to foo.

To avoid this you need to either:

  • Remove the header:
from("rabbitmq:foo")
  .removeHeader("rabbitmq.EXCHANGE_NAME")
  .to("rabbitmq:bar")
  • Or turn on bridgeEndpoint mode on the producer:
from("rabbitmq:foo")
  .to("rabbitmq:bar?bridgeEndpoint=true")

From Camel 2.21 onwards this has been improved so you can easily route between exchanges. The header rabbitmq.EXCHANGE_NAME is not longer used by the producer to override the destination exchange. Instead a new header rabbitmq.EXCHANGE_OVERRIDE_NAME can be used to send to a different exchange. For example to send to cheese exchange you can do

from("rabbitmq:foo")
  .setHeader("rabbitmq.EXCHANGE_OVERRIDE_NAME", constant("cheese"))
  .to("rabbitmq:bar")