7.6. Resequencer

Overview

The resequencer pattern, shown in Figure 7.8, “Resequencer Pattern”, enables you to resequence messages according to a sequencing expression. Messages that generate a low value for the sequencing expression are moved to the front of the batch and messages that generate a high value are moved to the back.

Figure 7.8. Resequencer Pattern

Apache Camel supports two resequencing algorithms:
  • Batch resequencing — Collects messages into a batch, sorts the messages and sends them to their output.
  • Stream resequencing — Re-orders (continuous) message streams based on the detection of gaps between messages.
By default, the resequencer does not support duplicate messages: if several messages evaluate to the same value of the message expression, only the last one is kept. In batch mode, however, you can configure the resequencer to allow duplicates.

Batch resequencing

The batch resequencing algorithm is enabled by default. For example, to resequence a batch of incoming messages based on the value of a timestamp contained in the TimeStamp header, you can define the following route in Java DSL:
from("direct:start").resequence(header("TimeStamp")).to("mock:result");
By default, the batch is obtained by collecting all of the incoming messages that arrive in a time interval of 1000 milliseconds (default batch timeout), up to a maximum of 100 messages (default batch size). You can customize the values of the batch timeout and the batch size by appending a batch() DSL command, which takes a BatchResequencerConfig instance as its sole argument. For example, to modify the preceding route so that the batch consists of messages collected in a 4000 millisecond time window, up to a maximum of 300 messages, you can define the Java DSL route as follows:
import org.apache.camel.model.config.BatchResequencerConfig;

RouteBuilder builder = new RouteBuilder() {
    public void configure() {
        from("direct:start").resequence(header("TimeStamp")).batch(new BatchResequencerConfig(300,4000L)).to("mock:result");
    }
};
You can also specify a batch resequencer pattern using XML configuration. The following example defines a batch resequencer with a batch size of 300 and a batch timeout of 4000 milliseconds:
<camelContext id="resequencerBatch" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="direct:start" />
    <resequence>
      <!-- 
        batch-config can be omitted for default (batch) resequencer settings
      -->
      <batch-config batchSize="300" batchTimeout="4000" />
      <simple>header.TimeStamp</simple>
      <to uri="mock:result" />
    </resequence>
  </route>
</camelContext>

Batch options

Table 7.4, “Batch Resequencer Options” shows the options that are available in batch mode only.

Table 7.4. Batch Resequencer Options

| Java DSL | XML DSL | Default | Description |
|---|---|---|---|
| allowDuplicates() | batch-config/@allowDuplicates | false | If true, do not discard duplicate messages from the batch (where duplicate means that the message expression evaluates to the same value). |
| reverse() | batch-config/@reverse | false | If true, put the messages in reverse order (where the default ordering applied to a message expression is based on Java's string lexical ordering, as defined by String.compareTo()). |

For example, if you want to resequence messages from JMS queues based on JMSPriority, you would need to combine the options, allowDuplicates and reverse, as follows:
from("jms:queue:foo")
        // sort by JMSPriority by allowing duplicates (message can have same JMSPriority)
        // and use reverse ordering so 9 is first output (most important), and 0 is last
        // use batch mode and fire every 3 seconds
        .resequence(header("JMSPriority")).batch().timeout(3000).allowDuplicates().reverse()
        .to("mock:result");
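
The combined effect of allowDuplicates and reverse on a single batch can be modeled in plain Java. The following is a minimal sketch, not Apache Camel's implementation: the class BatchResequenceSketch, its resequence method, and the use of Map.Entry as a stand-in for a message (key = JMSPriority, value = body) are assumptions made for illustration. When duplicates are not allowed, the sketch keeps the last message per key, following the description given earlier in this section.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Simplified model of one batch resequencing pass (illustrative only).
final class BatchResequenceSketch {

    static <T, K extends Comparable<K>> List<T> resequence(
            List<T> batch, Function<T, K> key, boolean allowDuplicates, boolean reverse) {
        List<T> kept;
        if (allowDuplicates) {
            kept = new ArrayList<>(batch);
        } else {
            // Keep only the last message seen for each key value.
            Map<K, T> lastByKey = new LinkedHashMap<>();
            for (T message : batch) {
                lastByKey.put(key.apply(message), message);
            }
            kept = new ArrayList<>(lastByKey.values());
        }
        Comparator<T> order = Comparator.comparing(key);
        // List.sort is stable, so messages with equal keys keep their arrival order.
        kept.sort(reverse ? order.reversed() : order);
        return kept;
    }

    public static void main(String[] args) {
        // Hypothetical batch: key = JMSPriority, value = message body.
        List<Map.Entry<Integer, String>> batch = List.of(
                Map.entry(4, "a"), Map.entry(9, "b"), Map.entry(0, "c"), Map.entry(4, "d"));
        System.out.println(resequence(batch, m -> m.getKey(), true, true));
        System.out.println(resequence(batch, m -> m.getKey(), false, false));
    }
}
```

For the batch (4, a), (9, b), (0, c), (4, d), enabling both options gives the order b, a, d, c, so the highest priority comes first and both priority 4 messages survive. With both options disabled, the result is c, d, b, because message a is replaced by the later message d that has the same key.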

Stream resequencing

To enable the stream resequencing algorithm, you must append stream() to the resequence() DSL command. For example, to resequence incoming messages based on the value of a sequence number in the seqnum header, you define a DSL route as follows:
from("direct:start").resequence(header("seqnum")).stream().to("mock:result");
The stream-processing resequencer algorithm is based on the detection of gaps in a message stream, rather than on a fixed batch size. Gap detection, in combination with timeouts, removes the constraint of needing to know the number of messages of a sequence (that is, the batch size) in advance. Messages must contain a unique sequence number for which a predecessor and a successor are known. For example, a message with the sequence number 3 has a predecessor message with the sequence number 2 and a successor message with the sequence number 4. The message sequence 2,3,5 has a gap because the successor of 3 is missing. The resequencer must therefore retain message 5 until message 4 arrives (or a timeout occurs).
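
The gap-detection idea can be illustrated with a small plain-Java model. This is a sketch under stated assumptions rather than Camel's resequencer engine: the class StreamResequenceSketch and its offer and onTimeout methods are hypothetical names, sequence numbers are plain long values, the first expected number is passed in explicitly, and a timeout is triggered by hand instead of by a timer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Simplified model of gap-based stream resequencing (illustrative only).
final class StreamResequenceSketch {
    private final TreeSet<Long> pending = new TreeSet<>();
    private long nextExpected;

    StreamResequenceSketch(long firstExpected) {
        this.nextExpected = firstExpected;
    }

    /** Accepts a sequence number and returns the numbers that can be delivered now, in order. */
    List<Long> offer(long seq) {
        if (seq < nextExpected) {
            // Late arrival: this sketch simply passes it through out of sequence.
            return List.of(seq);
        }
        pending.add(seq);
        return drain();
    }

    /** Models a timeout: stop waiting for the missing number and resume from the lowest retained one. */
    List<Long> onTimeout() {
        if (!pending.isEmpty()) {
            nextExpected = pending.first();
        }
        return drain();
    }

    // Delivers retained numbers for as long as there is no gap.
    private List<Long> drain() {
        List<Long> out = new ArrayList<>();
        while (!pending.isEmpty() && pending.first() == nextExpected) {
            out.add(pending.pollFirst());
            nextExpected++;
        }
        return out;
    }

    public static void main(String[] args) {
        StreamResequenceSketch s = new StreamResequenceSketch(2);
        for (long seq : new long[] {2, 3, 5, 4}) {
            System.out.println("in " + seq + " -> out " + s.offer(seq));
        }
    }
}
```

Feeding the sequence 2, 3, 5, 4 into this model delivers 2 and 3 immediately, retains 5, and releases 4 and 5 together once 4 arrives. If 4 never arrives, calling onTimeout() releases 5 on its own.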
By default, the stream resequencer is configured with a timeout of 1000 milliseconds, and a maximum message capacity of 100. To customize the stream's timeout and message capacity, you can pass a StreamResequencerConfig object as an argument to stream(). For example, to configure a stream resequencer with a message capacity of 5000 and a timeout of 4000 milliseconds, you define a route as follows:
// Java
import org.apache.camel.model.config.StreamResequencerConfig;

RouteBuilder builder = new RouteBuilder() {
    public void configure() {
        from("direct:start").resequence(header("seqnum")).
            stream(new StreamResequencerConfig(5000, 4000L)).
            to("mock:result");
    }
};
If the maximum time delay between successive messages (that is, messages with adjacent sequence numbers) in a message stream is known, set the resequencer's timeout parameter to this value. In this case, all messages in the stream are guaranteed to be delivered to the next processor in the correct order. The lower the timeout value is relative to the out-of-sequence time difference, the more likely it is that the resequencer delivers messages out of sequence. Large timeout values should be supported by sufficiently high capacity values, where the capacity parameter is used to prevent the resequencer from running out of memory.
If you want to use sequence numbers of some type other than long, you must define a custom comparator, as follows:
// Java
// MyComparator is your own implementation of ExpressionResultComparator
ExpressionResultComparator comparator = new MyComparator();
StreamResequencerConfig config = new StreamResequencerConfig(5000, 4000L, comparator);
from("direct:start").resequence(header("seqnum")).stream(config).to("mock:result");
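
To suggest what such a comparator has to decide, the following sketch defines ordering and adjacency for BigInteger sequence numbers. It is only an illustration: GapAwareComparator is a local stand-in interface declared here, loosely modeled on the predecessor and successor checks that a gap-detecting resequencer needs, and it is not the Camel ExpressionResultComparator type. A real comparator would also have to evaluate the sequencing expression against each exchange, which this sketch leaves out.

```java
import java.math.BigInteger;
import java.util.Comparator;

// Illustrative comparator contract for gap detection (hypothetical, not a Camel API).
final class SequenceComparatorSketch {

    interface GapAwareComparator<E> extends Comparator<E> {
        /** True if a is the immediate predecessor of b. */
        boolean predecessor(E a, E b);

        /** True if a is the immediate successor of b. */
        boolean successor(E a, E b);
    }

    /** Sequence numbers held as BigInteger, where adjacent means a difference of exactly one. */
    static final class BigIntegerSequence implements GapAwareComparator<BigInteger> {
        @Override
        public int compare(BigInteger a, BigInteger b) {
            return a.compareTo(b);
        }

        @Override
        public boolean predecessor(BigInteger a, BigInteger b) {
            return a.add(BigInteger.ONE).equals(b);
        }

        @Override
        public boolean successor(BigInteger a, BigInteger b) {
            return b.add(BigInteger.ONE).equals(a);
        }
    }

    public static void main(String[] args) {
        BigIntegerSequence c = new BigIntegerSequence();
        BigInteger three = BigInteger.valueOf(3);
        System.out.println(c.successor(BigInteger.valueOf(4), three)); // 4 directly follows 3
        System.out.println(c.successor(BigInteger.valueOf(5), three)); // 5 leaves a gap after 3
    }
}
```

The adjacency checks are what distinguish this contract from an ordinary Comparator: ordering alone tells the resequencer that 5 comes after 3, but only the successor check reveals that 4 is still missing.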
You can also specify a stream resequencer pattern using XML configuration. The following example defines a stream resequencer with a message capacity of 5000 and a timeout of 4000 milliseconds:
<camelContext id="resequencerStream" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="direct:start"/>
    <resequence>
      <stream-config capacity="5000" timeout="4000"/>
      <simple>header.seqnum</simple>
      <to uri="mock:result" />
    </resequence>
  </route>
</camelContext>

Ignore invalid exchanges

The resequencer EIP throws a CamelExchangeException if the incoming exchange is not valid, that is, if the sequencing expression cannot be evaluated (for example, because a header is missing). You can use the ignoreInvalidExchanges option to ignore these exceptions, in which case the resequencer skips any invalid exchanges.
from("direct:start")
  .resequence(header("seqno")).batch().timeout(1000)
    // ignore invalid exchanges (they are discarded)
    .ignoreInvalidExchanges()
  .to("mock:result");

Reject old messages

The rejectOld option can be used to prevent messages from being sent out of order, regardless of the mechanism used to resequence messages. When the rejectOld option is enabled, the resequencer rejects an incoming message (by throwing a MessageRejectedException exception) if the incoming message is older (as defined by the current comparator) than the last delivered message.
from("direct:start")
    .onException(MessageRejectedException.class).handled(true).to("mock:error").end()
    .resequence(header("seqno")).stream().timeout(1000).rejectOld()
    .to("mock:result");
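
The rejection rule itself is small enough to model directly. The sketch below is an assumption-laden illustration, not Camel code: RejectOldSketch, markDelivered, accept, and the MessageRejected exception are hypothetical names, and sequence numbers are plain long values compared numerically.

```java
// Simplified model of the rejectOld check (illustrative only).
final class RejectOldSketch {

    /** Hypothetical stand-in for Camel's MessageRejectedException. */
    static final class MessageRejected extends RuntimeException {
        MessageRejected(String message) {
            super(message);
        }
    }

    private Long lastDelivered; // null until something has been delivered

    /** Records that a message with this sequence number was sent downstream. */
    void markDelivered(long seq) {
        lastDelivered = seq;
    }

    /** Admits an incoming message, or throws if it is older than the last delivered one. */
    void accept(long seq) {
        if (lastDelivered != null && seq < lastDelivered) {
            throw new MessageRejected(
                    "seq " + seq + " is older than last delivered " + lastDelivered);
        }
    }

    public static void main(String[] args) {
        RejectOldSketch r = new RejectOldSketch();
        r.markDelivered(5);
        r.accept(6); // newer than 5, admitted
        try {
            r.accept(4); // older than 5, rejected
        } catch (MessageRejected e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

In the route above, the onException clause plays the role of the catch block here: rejected messages are diverted to mock:error instead of reaching mock:result out of order.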