Red Hat Training

A Red Hat training course is available for Red Hat Fuse

8.12. Composed Message Processor

Composed Message Processor

The composed message processor pattern, as shown in Figure 8.11, “Composed Message Processor Pattern”, allows you to process a composite message by splitting it up, routing the sub-messages to appropriate destinations, and then re-aggregating the responses back into a single message.

Figure 8.11. Composed Message Processor Pattern

Composed Message Processor Pattern

Java DSL example

The following example checks that a multipart order can be filled, where each part of the order requires a check to be made at a different inventory:
// split up the order so individual OrderItems can be validated by the appropriate bean
from("direct:start")
    .split().body()
    .choice() 
        .when().method("orderItemHelper", "isWidget")
            .to("bean:widgetInventory")
        .otherwise()
            .to("bean:gadgetInventory")
    .end()
    .to("seda:aggregate");

// collect and re-assemble the validated OrderItems into an order again
from("seda:aggregate")
    .aggregate(new MyOrderAggregationStrategy())
    .header("orderId")
    .completionTimeout(1000L)
    .to("mock:result");

XML DSL example

The preceding route can also be written in XML DSL, as follows:
 <route>
   <from uri="direct:start"/>
   <split>
     <simple>body</simple>
     <choice>
       <when>
         <method bean="orderItemHelper" method="isWidget"/>
 	<to uri="bean:widgetInventory"/>
       </when>
       <otherwise>
 	<to uri="bean:gadgetInventory"/>
       </otherwise>
     </choice>
     <to uri="seda:aggregate"/>
   </split>
 </route>
 
 <route>
   <from uri="seda:aggregate"/>
   <aggregate strategyRef="myOrderAggregatorStrategy" completionTimeout="1000">
     <correlationExpression>
       <simple>header.orderId</simple>
     </correlationExpression>
     <to uri="mock:result"/>
   </aggregate>
 </route>

Processing steps

Processing starts by splitting the order, using a Splitter. The Splitter then sends individual OrderItems to a Content Based Router, which routes messages based on the item type. Widget items get sent for checking in the widgetInventory bean and gadget items get sent to the gadgetInventory bean. Once these OrderItems have been validated by the appropriate bean, they are sent on to the Aggregator which collects and re-assembles the validated OrderItems into an order again.
Each received order has a header containing an order ID. We make use of the order ID during the aggregation step: the .header("orderId") qualifier on the aggregate() DSL command instructs the aggregator to use the header with the key, orderId, as the correlation expression.
For full details, check the ComposedMessageProcessorTest.java example source at camel-core/src/test/java/org/apache/camel/processor.