Red Hat Training
A Red Hat training course is available for Red Hat Fuse
8.12. Composed Message Processor
Composed Message Processor
The composed message processor pattern, as shown in Figure 8.11, “Composed Message Processor Pattern”, allows you to process a composite message by splitting it up, routing the sub-messages to appropriate destinations, and then re-aggregating the responses back into a single message.
Figure 8.11. Composed Message Processor Pattern
Java DSL example
The following example checks that a multipart order can be filled, where each part of the order requires a check to be made at a different inventory:
// split up the order so individual OrderItems can be validated by the appropriate bean from("direct:start") .split().body() .choice() .when().method("orderItemHelper", "isWidget") .to("bean:widgetInventory") .otherwise() .to("bean:gadgetInventory") .end() .to("seda:aggregate"); // collect and re-assemble the validated OrderItems into an order again from("seda:aggregate") .aggregate(new MyOrderAggregationStrategy()) .header("orderId") .completionTimeout(1000L) .to("mock:result");
XML DSL example
The preceding route can also be written in XML DSL, as follows:
<route> <from uri="direct:start"/> <split> <simple>body</simple> <choice> <when> <method bean="orderItemHelper" method="isWidget"/> <to uri="bean:widgetInventory"/> </when> <otherwise> <to uri="bean:gadgetInventory"/> </otherwise> </choice> <to uri="seda:aggregate"/> </split> </route> <route> <from uri="seda:aggregate"/> <aggregate strategyRef="myOrderAggregatorStrategy" completionTimeout="1000"> <correlationExpression> <simple>header.orderId</simple> </correlationExpression> <to uri="mock:result"/> </aggregate> </route>
Processing starts by splitting the order, using a Splitter. The Splitter then sends individual
OrderItemsto a Content Based Router, which routes messages based on the item type. Widget items get sent for checking in the
widgetInventorybean and gadget items get sent to the
gadgetInventorybean. Once these
OrderItemshave been validated by the appropriate bean, they are sent on to the Aggregator which collects and re-assembles the validated
OrderItemsinto an order again.
Each received order has a header containing an order ID. We make use of the order ID during the aggregation step: the
.header("orderId")qualifier on the
aggregate()DSL command instructs the aggregator to use the header with the key,
orderId, as the correlation expression.
For full details, check the example source here: