11.3. Wire Tap

The wire tap pattern, as shown in Figure 11.1, “Wire Tap Pattern”, enables you to route a copy of the message to a separate tap location, while the original message is forwarded to the ultimate destination.

Figure 11.1. Wire Tap Pattern

Streams

If you wire tap a stream message body, consider enabling Stream Caching so that the message body can be re-read. For more details, see Stream Caching.
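
The re-read problem can be illustrated without Camel. The following is a minimal plain-Java sketch, not Camel's stream caching implementation: the class and method names (StreamRereadSketch, toBytes, readTwice) are hypothetical, and the "cache" is simply a byte array held in memory. It shows that a stream body consumed by one reader is empty for the next, whereas a cached body can be handed to each reader afresh.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Plain-Java illustration only; names are hypothetical and this is not Camel code.
class StreamRereadSketch {

    // Reads every remaining byte from the stream.
    static byte[] toBytes(InputStream in) {
        try {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buf = new byte[1024];
            int n;
            while ((n = in.read(buf)) != -1) {
                out.write(buf, 0, n);
            }
            return out.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Simulates two consumers (main route and tap) reading the same body.
    // Returns what the first and the second consumer saw.
    static String[] readTwice(boolean cache) {
        InputStream body = new ByteArrayInputStream(
                "payload".getBytes(StandardCharsets.UTF_8));
        if (!cache) {
            // Both consumers share one stream; the first one drains it.
            String first = new String(toBytes(body), StandardCharsets.UTF_8);
            String second = new String(toBytes(body), StandardCharsets.UTF_8);
            return new String[] {first, second};
        }
        // With caching, the bytes are kept and each consumer gets a fresh stream.
        byte[] cached = toBytes(body);
        String first = new String(
                toBytes(new ByteArrayInputStream(cached)), StandardCharsets.UTF_8);
        String second = new String(
                toBytes(new ByteArrayInputStream(cached)), StandardCharsets.UTF_8);
        return new String[] {first, second};
    }

    public static void main(String[] args) {
        String[] plain = readTwice(false);
        String[] cached = readTwice(true);
        System.out.println("without cache: [" + plain[0] + "] [" + plain[1] + "]");
        System.out.println("with cache:    [" + cached[0] + "] [" + cached[1] + "]");
    }
}
```

In a real route, you would rely on Camel's own stream caching instead of buffering the body by hand.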

WireTap node

Apache Camel 2.0 introduces the wireTap node for doing wire taps. The wireTap node copies the original exchange to a tapped exchange, whose exchange pattern is set to InOnly, because the tapped exchange should be propagated in a one-way style. The tapped exchange is processed in a separate thread, so that it can run concurrently with the main route.
The wireTap supports two different approaches to tapping an exchange:
  • Tap a copy of the original exchange.
  • Tap a new exchange instance, enabling you to customize the tapped exchange.
Note
From Camel 2.16, the Wire Tap EIP emits event notifications when you send the exchange to the wire tap destination.
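
The copy-then-process-concurrently behavior described in this section can be pictured with a small plain-Java sketch. This is only an illustration under simplifying assumptions, not how Camel implements the wireTap node: an exchange is modeled as a Map, and the names WireTapSketch, wireTap, and tapIsIndependentOfOriginal are hypothetical. The copy is taken before the hand-off to the other thread, so later changes made by the main route do not show up in the tapped copy.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Plain-Java illustration only; names are hypothetical and this is not Camel code.
class WireTapSketch {

    // Copies the "exchange" (modeled as a map), then processes the copy on
    // another thread. The future is returned only so that the sketch can
    // inspect the tapped copy; a one-way tap would not report back.
    static CompletableFuture<Map<String, Object>> wireTap(
            Map<String, Object> original, ExecutorService pool) {
        Map<String, Object> tapped = new HashMap<>(original); // copy before hand-off
        return CompletableFuture.supplyAsync(() -> {
            tapped.put("tappedBy", Thread.currentThread().getName());
            return tapped;
        }, pool);
    }

    // Sends one message through the sketch and checks that the main route
    // and the tap do not see each other's changes.
    static boolean tapIsIndependentOfOriginal() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Map<String, Object> original = new HashMap<>();
            original.put("body", "Hello World");

            CompletableFuture<Map<String, Object>> tap = wireTap(original, pool);

            // The main route keeps going while the tap runs.
            original.put("body", "changed by main route");

            Map<String, Object> tapped = tap.join();
            return "Hello World".equals(tapped.get("body"))
                    && !original.containsKey("tappedBy");
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(tapIsIndependentOfOriginal());
    }
}
```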

Tap a copy of the original exchange

Using the Java DSL:
from("direct:start")
    .to("log:foo")
    .wireTap("direct:tap")
    .to("mock:result");
Using Spring XML extensions:
<route>
    <from uri="direct:start"/>
    <to uri="log:foo"/>
    <wireTap uri="direct:tap"/>
    <to uri="mock:result"/>
</route>

Tap and modify a copy of the original exchange

Using the Java DSL, Apache Camel supports using either a processor or an expression to modify a copy of the original exchange. Using a processor gives you full power over how the exchange is populated, because you can set properties, headers and so on. The expression approach can only be used to modify the In message body.
For example, to modify a copy of the original exchange using the processor approach:
from("direct:start")
    .wireTap("direct:foo", new Processor() {
        public void process(Exchange exchange) throws Exception {
            exchange.getIn().setHeader("foo", "bar");
        }
    }).to("mock:result");


from("direct:foo").to("mock:foo");
And to modify a copy of the original exchange using the expression approach:
from("direct:start")
    .wireTap("direct:foo", constant("Bye World"))
    .to("mock:result");

from("direct:foo").to("mock:foo");
Using the Spring XML extensions, you can modify a copy of the original exchange using the processor approach, where the processorRef attribute references a Spring bean with the myProcessor ID:
<route>
    <from uri="direct:start2"/>
    <wireTap uri="direct:foo" processorRef="myProcessor"/>
    <to uri="mock:result"/>
</route>
And to modify a copy of the original exchange using the expression approach:
<route>
    <from uri="direct:start"/>
    <wireTap uri="direct:foo">
        <body><constant>Bye World</constant></body>
    </wireTap>
    <to uri="mock:result"/>
</route>

Tap a new exchange instance

You can define a wire tap with a new exchange instance by setting the copy flag to false (the default is true). In this case, an initially empty exchange is created for the wire tap.
For example, to create a new exchange instance using the processor approach:
from("direct:start")
    .wireTap("direct:foo", false, new Processor() {
        public void process(Exchange exchange) throws Exception {
            exchange.getIn().setBody("Bye World");
            exchange.getIn().setHeader("foo", "bar");
        }
    }).to("mock:result");


from("direct:foo").to("mock:foo");
Here, the second wireTap argument sets the copy flag to false, indicating that the original exchange is not copied and an empty exchange is created instead.
To create a new exchange instance using the expression approach:
from("direct:start")
    .wireTap("direct:foo", false, constant("Bye World"))
    .to("mock:result");

from("direct:foo").to("mock:foo");
Using the Spring XML extensions, you can indicate that a new exchange is to be created by setting the wireTap element's copy attribute to false.
To create a new exchange instance using the processor approach, where the processorRef attribute references a Spring bean with the myProcessor ID, define the route as follows:
<route>
    <from uri="direct:start2"/>
    <wireTap uri="direct:foo" processorRef="myProcessor" copy="false"/>
    <to uri="mock:result"/>
</route>
And to create a new exchange instance using the expression approach:
<route>
    <from uri="direct:start"/>
    <wireTap uri="direct:foo" copy="false">
        <body><constant>Bye World</constant></body>
    </wireTap>
    <to uri="mock:result"/>
</route>

Sending a new exchange and setting headers in the DSL

Available as of Camel 2.8
Before Camel 2.8, if you sent a new message using the wire tap, you could set only the message body using an expression from the DSL. To set new headers as well, you had to use a processor. From Camel 2.8 onwards, you can also set headers in the DSL.
The following example sends a new message that has:
  • "Bye World" as message body
  • a header with key "id" with the value 123
  • a header with key "date" which has current date as value

Java DSL

from("direct:start")
    // tap a new message and send it to direct:tap
    // the new message should be Bye World with 2 headers
    .wireTap("direct:tap")
        // create the new tap message body and headers
        .newExchangeBody(constant("Bye World"))
        .newExchangeHeader("id", constant(123))
        .newExchangeHeader("date", simple("${date:now:yyyyMMdd}"))
    .end()
    // here we continue routing the original message
    .to("mock:result");

// this is the tapped route
from("direct:tap")
    .to("mock:tap");

XML DSL

The XML DSL differs slightly from the Java DSL in how you configure the message body and headers. In XML, you use <body> and <setHeader>, as shown:
<route>
    <from uri="direct:start"/>
    <!-- tap a new message and send it to direct:tap -->
    <!-- the new message should be Bye World with 2 headers -->
    <wireTap uri="direct:tap">
        <!-- create the new tap message body and headers -->
        <body><constant>Bye World</constant></body>
        <setHeader headerName="id"><constant>123</constant></setHeader>
        <setHeader headerName="date"><simple>${date:now:yyyyMMdd}</simple></setHeader>
    </wireTap>
    <!-- here we continue routing the original message -->
    <to uri="mock:result"/>
</route>

Using Dynamic URIs

Wire Tap supports dynamic endpoint URIs. The following example shows how to wire tap to a JMS queue where the value of the id header is part of the queue name.
from("direct:start")
   .wireTap("jms:queue:backup-${header.id}")
   .to("bean:doSomething");
For more information about dynamic endpoint URIs, see the section called “Dynamic To”.
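
To make the substitution concrete, the following plain-Java sketch resolves ${header.name} placeholders in a URI template against a map of headers. It is a simplified stand-in for illustration, not Camel's expression engine: it handles only the ${header.name} form, and the names DynamicUriSketch, resolve, and demo are hypothetical. With an id header of 123, the URI from the example above becomes jms:queue:backup-123.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Plain-Java illustration only; names are hypothetical and this is not Camel code.
class DynamicUriSketch {

    // Matches placeholders of the form ${header.someName}.
    private static final Pattern HEADER_REF =
            Pattern.compile("\\$\\{header\\.([A-Za-z0-9_]+)\\}");

    // Replaces each ${header.name} with the value of that header.
    // A missing header is rendered as the text "null" in this sketch.
    static String resolve(String uriTemplate, Map<String, Object> headers) {
        Matcher m = HEADER_REF.matcher(uriTemplate);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            Object value = headers.get(m.group(1));
            m.appendReplacement(out, Matcher.quoteReplacement(String.valueOf(value)));
        }
        m.appendTail(out);
        return out.toString();
    }

    // Resolves the URI from the route above for a message whose id header is 123.
    static String demo() {
        Map<String, Object> headers = new HashMap<>();
        headers.put("id", 123);
        return resolve("jms:queue:backup-${header.id}", headers);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```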

Using onPrepare to execute custom logic when preparing messages

Available as of Camel 2.8
For details, see Multicast.
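
One reason to run custom logic at this point is deep-cloning a mutable payload. The following plain-Java sketch illustrates the idea under the assumption that the tapped copy would otherwise reference the same body object as the original. It is not Camel code: an exchange is modeled as a Map, and the names OnPrepareSketch, tapCopy, and deepCloneBody are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Plain-Java illustration only; names are hypothetical and this is not Camel code.
class OnPrepareSketch {

    // Makes a shallow copy of the "exchange" and lets an optional hook
    // prepare the copy before it is handed to the tap.
    static Map<String, Object> tapCopy(
            Map<String, Object> original, Consumer<Map<String, Object>> onPrepare) {
        Map<String, Object> copy = new HashMap<>(original); // body object is shared
        if (onPrepare != null) {
            onPrepare.accept(copy);
        }
        return copy;
    }

    // Example hook: replaces a List body with its own copy of the list.
    // The elements are immutable strings, so copying the list is sufficient here.
    static void deepCloneBody(Map<String, Object> exchange) {
        Object body = exchange.get("body");
        if (body instanceof List) {
            exchange.put("body", new ArrayList<>((List<?>) body));
        }
    }

    // Returns true if a change made to the original body after the tap
    // is visible in the tapped copy.
    static boolean tappedBodySeesLaterChange(boolean useOnPrepare) {
        List<String> items = new ArrayList<>();
        items.add("a");
        Map<String, Object> original = new HashMap<>();
        original.put("body", items);

        Consumer<Map<String, Object>> hook =
                useOnPrepare ? OnPrepareSketch::deepCloneBody : null;
        Map<String, Object> tapped = tapCopy(original, hook);

        items.add("b"); // the main route mutates the payload afterwards
        return ((List<?>) tapped.get("body")).contains("b");
    }

    public static void main(String[] args) {
        System.out.println("without hook: " + tappedBodySeesLaterChange(false));
        System.out.println("with hook:    " + tappedBodySeesLaterChange(true));
    }
}
```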

Options

The wireTap DSL command supports the following options:

| Name | Default Value | Description |
| --- | --- | --- |
| uri | | The endpoint URI to which the wire-tapped message is sent. Use either uri or ref. |
| ref | | References the endpoint to which the wire-tapped message is sent. Use either uri or ref. |
| executorServiceRef | | References a custom thread pool to use when processing the wire-tapped messages. If not set, Camel uses a default thread pool. |
| processorRef | | References a custom processor to use for creating a new message (that is, when sending a new message). See the section called “Tap a new exchange instance”. |
| copy | true | Camel 2.3: Whether to use a copy of the exchange when wire tapping the message. |
| onPrepareRef | | Camel 2.8: References a custom processor that prepares the copy of the exchange to be wire tapped. This allows you to run custom logic, such as deep-cloning the message payload if needed. |