OnCompletion
Camel has the concept of a Unit of Work that encompasses the Exchange. Among other things, the unit of work supports synchronization callbacks that are invoked when the Exchange is complete. The callback API is defined in org.apache.camel.spi.Synchronization, and the extended org.apache.camel.spi.SynchronizationRouteAware adds callbacks for route events.
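To make the callback contract concrete, here is a plain-Java sketch of how a unit of work might notify its synchronizations once an exchange is done. It is a simplified model, not Camel's implementation: SketchSynchronization and SketchUnitOfWork are hypothetical names, and the exchange is reduced to a String body.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for org.apache.camel.spi.Synchronization:
// one callback for success, one for failure.
interface SketchSynchronization {
    void onComplete(String body);
    void onFailure(String body);
}

// Hypothetical stand-in for a unit of work that wraps a single exchange.
final class SketchUnitOfWork {
    private final List<SketchSynchronization> synchronizations = new ArrayList<>();

    // Register a callback to be notified when the exchange is done.
    void addSynchronization(SketchSynchronization synchronization) {
        synchronizations.add(synchronization);
    }

    // Called once routing has finished; notifies every registered callback
    // in registration order.
    void done(String body, boolean failed) {
        for (SketchSynchronization s : synchronizations) {
            if (failed) {
                s.onFailure(body);
            } else {
                s.onComplete(body);
            }
        }
    }
}
```

In Camel itself the equivalent step is to implement org.apache.camel.spi.Synchronization and register it on the UnitOfWork of the Exchange with addSynchronization.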
Getting the UnitOfWork
You can get hold of the org.apache.camel.spi.UnitOfWork from an org.apache.camel.Exchange with the method getUnitOfWork().
OnCompletion DSL
The onCompletion EIP supports the following features:
- scope: global and/or per route (route scope overrides all global scope)
- multiple global scopes
- triggered either always, only if completed with success, or only if failed
- onWhen: predicate to only trigger if matched
- mode: whether to run before or after the route consumer writes the response back to the callee (if it is InOut) (default AfterConsumer)
- parallelProcessing: whether to run async or sync (use a thread pool or not) (default false)
The onCompletion supports running the completion task in either synchronous or asynchronous mode (using a thread pool), and either before or after the route consumer is done. This gives more flexibility. For example, running synchronously and before the route consumer is done allows you to modify the exchange before the consumer writes any response back to the callee. You can use this to add custom headers, or to send the response message to a log.
onCompletion with route scope
The onCompletion DSL allows you to add custom routes/processors that run when the original Exchange is complete. Camel spins off a copy of the Exchange and routes it in a separate thread, similar to a Wire Tap. This allows the original thread to continue while the onCompletion route runs concurrently. We chose this model because we did not want the onCompletion route to interfere with the original route.
Only one onCompletion is supported per route
You can only have one onCompletion in a route. Only at the context scope can you have multiple. Note that when you use a route-scoped onCompletion, any context-scoped onCompletions are disabled for that route.
from("direct:start")
    .onCompletion()
        // this route is only invoked when the original route is complete,
        // as a kind of completion callback
        .to("log:sync")
        .to("mock:sync")
    // must use end to denote the end of the onCompletion route
    .end()
    // here the original route continues
    .process(new MyProcessor())
    .to("mock:result");
By default the onCompletion is triggered when the Exchange is complete, regardless of whether the Exchange completed with success or with a failure (such as an Exception being thrown). You can limit the trigger to occur only on success with onCompleteOnly, or only on failure with onFailureOnly, as shown below:
from("direct:start")
    // here we qualify onCompletion to only invoke when the exchange failed (exception or FAULT body)
    .onCompletion().onFailureOnly()
        .to("log:sync")
        .to("mock:sync")
    // must use end to denote the end of the onCompletion route
    .end()
    // here the original route continues
    .process(new MyProcessor())
    .to("mock:result");
You can identify whether an Exchange is an onCompletion Exchange, because Camel adds the property Exchange.ON_COMPLETION with a boolean value of true when it spins off the onCompletion Exchange.
Using onCompletion from XML DSL
The onCompletion is defined like this with XML DSL:
<route>
    <from uri="direct:start"/>
    <!-- this onCompletion block will only be executed when the exchange is done being routed -->
    <!-- this callback is always triggered even if the exchange failed -->
    <onCompletion>
        <!-- so this is a kind of after-completion callback -->
        <to uri="log:sync"/>
        <to uri="mock:sync"/>
    </onCompletion>
    <process ref="myProcessor"/>
    <to uri="mock:result"/>
</route>
The onCompleteOnly and onFailureOnly options are defined as boolean attributes on the <onCompletion> tag, so the failure example would be:
<route>
    <from uri="direct:start"/>
    <!-- this onCompletion block will only be executed when the exchange is done being routed -->
    <!-- this callback is only triggered when the exchange failed, as we have onFailureOnly=true -->
    <onCompletion onFailureOnly="true">
        <to uri="log:sync"/>
        <to uri="mock:sync"/>
    </onCompletion>
    <process ref="myProcessor"/>
    <to uri="mock:result"/>
</route>
onCompletion with global scope
This works just like the route scope, except that the onCompletion is defined globally. An example is shown below:
// define a global on completion that is invoked when the exchange is complete
onCompletion().to("log:global").to("mock:sync");

from("direct:start")
    .process(new MyProcessor())
    .to("mock:result");
Using onCompletion from Spring DSL
The same global onCompletion is defined like this in XML:
<!-- this is a global onCompletion route that is invoked when any exchange is complete
     as a kind of after callback -->
<onCompletion>
    <to uri="log:global"/>
    <to uri="mock:sync"/>
</onCompletion>
<route>
    <from uri="direct:start"/>
    <process ref="myProcessor"/>
    <to uri="mock:result"/>
</route>
Route scope overrides global scope
If an onCompletion is defined in a route, it overrides all globally scoped onCompletions, so only the route-scoped one is used. The globally scoped ones are not in use for that route.
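The override rule can be summarized as a small selection function. This is only a plain-Java sketch of the rule as stated above, not Camel code: SketchScopeResolver is a hypothetical name, and each onCompletion is represented by a plain String label.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical helper that models which onCompletion definitions apply to a route.
final class SketchScopeResolver {
    private SketchScopeResolver() { }

    // 'routeScoped' is null when the route defines no onCompletion of its own.
    static List<String> effective(List<String> globalScoped, String routeScoped) {
        if (routeScoped != null) {
            // A route-scoped onCompletion disables every global one for this route.
            return Collections.singletonList(routeScoped);
        }
        // Without a route-scoped definition, all global ones apply.
        return globalScoped;
    }
}
```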
Using onCompletion with onWhen predicate
As with other DSLs in Camel, you can attach a predicate to the onCompletion so that it only triggers under certain conditions, when the predicate matches. For example, to only trigger if the message body contains the word Hello:
from("direct:start")
    .onCompletion().onWhen(body().contains("Hello"))
        // this route is only invoked when the original route is complete, as a kind
        // of completion callback, and only if the onWhen predicate is true
        .to("log:sync")
        .to("mock:sync")
    // must use end to denote the end of the onCompletion route
    .end()
    // here the original route continues
    .to("log:original")
    .to("mock:result");
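The trigger settings and the onWhen predicate together decide whether the onCompletion route runs. The plain-Java sketch below writes that decision out as one function. It is a model under stated assumptions, not Camel's source: SketchTrigger and SketchOnCompletionRule are hypothetical names, the exchange is reduced to a String body, and the sketch assumes the predicate is applied in addition to the success/failure filter.

```java
import java.util.function.Predicate;

// Hypothetical names; models the three trigger settings described in this section.
enum SketchTrigger { ALWAYS, ON_COMPLETE_ONLY, ON_FAILURE_ONLY }

final class SketchOnCompletionRule {
    private SketchOnCompletionRule() { }

    // Returns true when the onCompletion route should run for this exchange.
    // 'failed' says whether the original exchange failed; 'onWhen' may be null.
    static boolean shouldRun(SketchTrigger trigger, boolean failed,
                             Predicate<String> onWhen, String body) {
        if (trigger == SketchTrigger.ON_COMPLETE_ONLY && failed) {
            return false; // success-only, but the exchange failed
        }
        if (trigger == SketchTrigger.ON_FAILURE_ONLY && !failed) {
            return false; // failure-only, but the exchange succeeded
        }
        // Assumption: the predicate, if present, must also match.
        return onWhen == null || onWhen.test(body);
    }
}
```

With a predicate such as `body -> body.contains("Hello")`, a body of "Bye World" is filtered out even when the trigger is ALWAYS.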
Using onCompletion with or without thread pool
To use a thread pool, either set an executorService or set parallelProcessing to true.
For example, in Java DSL:
onCompletion().parallelProcessing()
    .to("mock:before")
    .delay(1000)
    .setBody(simple("OnComplete:${body}"));
And in XML DSL:
<onCompletion parallelProcessing="true">
    <to uri="mock:before"/>
    <delay><constant>1000</constant></delay>
    <setBody><simple>OnComplete:${body}</simple></setBody>
</onCompletion>
You can also refer to a specific thread pool to be used, using the executorServiceRef option:
<onCompletion executorServiceRef="myThreadPool">
    <to uri="mock:before"/>
    <delay><constant>1000</constant></delay>
    <setBody><simple>OnComplete:${body}</simple></setBody>
</onCompletion>
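The difference between running with and without a thread pool can be illustrated without Camel. The sketch below is a plain-Java model using only the JDK's CompletableFuture: SketchCompletionRunner is a hypothetical name, and the completion task is any Supplier. Without a pool the task runs on the calling thread before the method returns. With a pool it is handed off and the caller continues immediately.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

// Hypothetical helper that models sync versus thread-pool execution of a completion task.
final class SketchCompletionRunner {
    private SketchCompletionRunner() { }

    // Runs the task on the caller thread when no pool is given (synchronous),
    // otherwise submits it to the pool so the caller is not blocked.
    static <T> CompletableFuture<T> run(Supplier<T> task, Executor poolOrNull) {
        if (poolOrNull == null) {
            // Synchronous: the task has finished by the time this returns.
            return CompletableFuture.completedFuture(task.get());
        }
        // Asynchronous: the returned future completes later on a pool thread.
        return CompletableFuture.supplyAsync(task, poolOrNull);
    }
}
```

Passing a task that returns `Thread.currentThread().getName()` shows which thread did the work: the caller's own thread in the first case, a pool thread in the second.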
Using onCompletion to run before route consumer sends back response to callee
OnCompletion supports two modes:
- AfterConsumer: the default mode, which runs after the consumer is done
- BeforeConsumer: runs before the consumer is done, and before the consumer writes the response back to the callee
The AfterConsumer mode is the default mode which is the same behavior as in older Camel releases.
The new BeforeConsumer mode is used to run onCompletion before the consumer writes its response back to the callee (if in InOut mode). This allows the onCompletion to modify the Exchange, such as adding special headers, or to log the Exchange as a response logger etc.
For example, to always add a "created by" header, use modeBeforeConsumer() as shown below:
.onCompletion().modeBeforeConsumer()
    .setHeader("createdBy", constant("Someone"))
.end()
And in XML DSL you set the mode attribute to BeforeConsumer:
<onCompletion mode="BeforeConsumer">
    <setHeader name="createdBy">
        <constant>Someone</constant>
    </setHeader>
</onCompletion>
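Why the mode matters for the callee can be shown with a small plain-Java model. This is a sketch, not Camel's implementation: SketchMode and SketchConsumer are hypothetical names, headers are a plain Map, and "writing the response" is modeled as taking a snapshot of the headers. A header added in BeforeConsumer mode ends up in the reply. The same header added in AfterConsumer mode arrives too late.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical names mirroring the two mode values described above.
enum SketchMode { BeforeConsumer, AfterConsumer }

final class SketchConsumer {
    private SketchConsumer() { }

    // Returns the headers the callee receives in the reply.
    static Map<String, String> reply(SketchMode mode,
                                     Consumer<Map<String, String>> onCompletion) {
        Map<String, String> exchangeHeaders = new LinkedHashMap<>();
        if (mode == SketchMode.BeforeConsumer) {
            onCompletion.accept(exchangeHeaders); // can still change the reply
        }
        // The consumer writes the response: the callee gets a snapshot of the headers.
        Map<String, String> response = new LinkedHashMap<>(exchangeHeaders);
        if (mode == SketchMode.AfterConsumer) {
            onCompletion.accept(exchangeHeaders); // too late to change the reply
        }
        return response;
    }
}
```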