Chapter 135. Hazelcast Queue Component

Available as of Camel version 2.7

The Hazelcast Queue component is one of Camel Hazelcast Components which allows you to access Hazelcast distributed queue.

135.1. Options

The Hazelcast Queue component supports 3 options which are listed below.

NameDescriptionDefaultType

hazelcastInstance (advanced)

The hazelcast instance reference which can be used for hazelcast endpoint. If you don’t specify the instance reference, camel use the default hazelcast instance from the camel-hazelcast instance.

 

HazelcastInstance

hazelcastMode (advanced)

The hazelcast mode reference which kind of instance should be used. If you don’t specify the mode, then the node mode will be the default.

node

String

resolveProperty Placeholders (advanced)

Whether the component should resolve property placeholders on itself when starting. Only properties which are of String type can use property placeholders.

true

boolean

The Hazelcast Queue endpoint is configured using URI syntax:

hazelcast-queue:cacheName

with the following path and query parameters:

135.1.1. Path Parameters (1 parameters):

NameDescriptionDefaultType

cacheName

Required The name of the cache

 

String

135.1.2. Query Parameters (16 parameters):

NameDescriptionDefaultType

defaultOperation (common)

To specify a default operation to use, if no operation header has been provided.

 

HazelcastOperation

hazelcastInstance (common)

The hazelcast instance reference which can be used for hazelcast endpoint.

 

HazelcastInstance

hazelcastInstanceName (common)

The hazelcast instance reference name which can be used for hazelcast endpoint. If you don’t specify the instance reference, camel use the default hazelcast instance from the camel-hazelcast instance.

 

String

reliable (common)

Define if the endpoint will use a reliable Topic struct or not.

false

boolean

bridgeErrorHandler (consumer)

Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored.

false

boolean

pollingTimeout (consumer)

Define the polling timeout of the Queue consumer in Poll mode

10000

long

poolSize (consumer)

Define the Pool size for Queue Consumer Executor

1

int

queueConsumerMode (consumer)

Define the Queue Consumer mode: Listen or Poll

Listen

HazelcastQueueConsumer Mode

exceptionHandler (consumer)

To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this options is not in use. By default the consumer will deal with exceptions, that will be logged at WARN or ERROR level and ignored.

 

ExceptionHandler

exchangePattern (consumer)

Sets the exchange pattern when the consumer creates an exchange.

 

ExchangePattern

synchronous (advanced)

Sets whether synchronous processing should be strictly used, or Camel is allowed to use asynchronous processing (if supported).

false

boolean

concurrentConsumers (seda)

To use concurrent consumers polling from the SEDA queue.

1

int

onErrorDelay (seda)

Milliseconds before consumer continues polling after an error has occurred.

1000

int

pollTimeout (seda)

The timeout used when consuming from the SEDA queue. When a timeout occurs, the consumer can check whether it is allowed to continue running. Setting a lower value allows the consumer to react more quickly upon shutdown.

1000

int

transacted (seda)

If set to true then the consumer runs in transaction mode, where the messages in the seda queue will only be removed if the transaction commits, which happens when the processing is complete.

false

boolean

transferExchange (seda)

If set to true the whole Exchange will be transfered. If header or body contains not serializable objects, they will be skipped.

false

boolean

135.2. Queue producer – to(“hazelcast-queue:foo”)

The queue producer provides 10 operations: * add * put * poll * peek * offer * remove value * remaining capacity * remove all * remove if * drain to * take * retain all

135.2.1. Sample for add:

from("direct:add")
.setHeader(HazelcastConstants.OPERATION, constant(HazelcastOperation.ADD))
.toF("hazelcast-%sbar", HazelcastConstants.QUEUE_PREFIX);

135.2.2. Sample for put:

from("direct:put")
.setHeader(HazelcastConstants.OPERATION, constant(HazelcastOperation.PUT))
.toF("hazelcast-%sbar", HazelcastConstants.QUEUE_PREFIX);

135.2.3. Sample for poll:

from("direct:poll")
.setHeader(HazelcastConstants.OPERATION, constant(HazelcastOperation.POLL))
.toF("hazelcast:%sbar", HazelcastConstants.QUEUE_PREFIX);

135.2.4. Sample for peek:

from("direct:peek")
.setHeader(HazelcastConstants.OPERATION, constant(HazelcastOperation.PEEK))
.toF("hazelcast:%sbar", HazelcastConstants.QUEUE_PREFIX);

135.2.5. Sample for offer:

from("direct:offer")
.setHeader(HazelcastConstants.OPERATION, constant(HazelcastOperation.OFFER))
.toF("hazelcast:%sbar", HazelcastConstants.QUEUE_PREFIX);

135.2.6. Sample for removevalue:

from("direct:removevalue")
.setHeader(HazelcastConstants.OPERATION, constant(HazelcastOperation.REMOVE_VALUE))
.toF("hazelcast-%sbar", HazelcastConstants.QUEUE_PREFIX);

135.2.7. Sample for remaining capacity:

from("direct:remaining-capacity").setHeader(HazelcastConstants.OPERATION, constant(HazelcastOperation.REMAINING_CAPACITY)).to(
String.format("hazelcast-%sbar", HazelcastConstants.QUEUE_PREFIX));

135.2.8. Sample for remove all:

from("direct:removeAll").setHeader(HazelcastConstants.OPERATION, constant(HazelcastOperation.REMOVE_ALL)).to(
String.format("hazelcast-%sbar", HazelcastConstants.QUEUE_PREFIX));

135.2.9. Sample for remove if:

from("direct:removeIf").setHeader(HazelcastConstants.OPERATION, constant(HazelcastOperation.REMOVE_IF)).to(
String.format("hazelcast-%sbar", HazelcastConstants.QUEUE_PREFIX));

135.2.10. Sample for drain to:

from("direct:drainTo").setHeader(HazelcastConstants.OPERATION, constant(HazelcastOperation.DRAIN_TO)).to(
String.format("hazelcast-%sbar", HazelcastConstants.QUEUE_PREFIX));

135.2.11. Sample for take:

from("direct:take").setHeader(HazelcastConstants.OPERATION, constant(HazelcastOperation.TAKE)).to(
String.format("hazelcast-%sbar", HazelcastConstants.QUEUE_PREFIX));

135.2.12. Sample for retain all:

from("direct:retainAll").setHeader(HazelcastConstants.OPERATION, constant(HazelcastOperation.RETAIN_ALL)).to(
String.format("hazelcast-%sbar", HazelcastConstants.QUEUE_PREFIX));

135.3. Queue consumer – from(“hazelcast-queue:foo”)

The queue consumer provides two different modes:

  • Poll
  • Listen

Sample for Poll mode

fromF("hazelcast-%sfoo?queueConsumerMode=Poll", HazelcastConstants.QUEUE_PREFIX)).to("mock:result");

In this way the consumer will poll the queue and return the head of the queue or null after a timeout.

In Listen mode instead the consumer will listen for events on queue.

The queue consumer in Listen mode provides 2 operations: * add * remove

Sample for Listen mode

fromF("hazelcast-%smm", HazelcastConstants.QUEUE_PREFIX)
   .log("object...")
   .choice()
    .when(header(HazelcastConstants.LISTENER_ACTION).isEqualTo(HazelcastConstants.ADDED))
            .log("...added")
        .to("mock:added")
    .when(header(HazelcastConstants.LISTENER_ACTION).isEqualTo(HazelcastConstants.REMOVED))
        .log("...removed")
        .to("mock:removed")
    .otherwise()
        .log("fail!");