2.14. OnCompletion

概述

OnCompletion DSL 名称用于定义在完成 工作单元时要执行的操作工作单元是包含 整个交换的 Camel 概念。请参阅 第 34.1 节 “交换”onCompletion 命令有以下功能:

  • OnCompletion 命令的范围可以是全局或每个路由。路由范围会覆盖全局范围。
  • OnCompletion 可以配置为在成功时触发失败。
  • onWhen predicate 只能在某些情况下触发该 onCompletion
  • 您可以定义是否要使用线程池,但默认值为线程池。

在编译时只有路由范围

当在交换中指定Completion DSL 时,Camel 启动一个新的线程。这允许原始线程在不 与Completion 任务进行干预的情况下继续。路由只支持一个 在Completion 中。在以下示例中,无论交换成功完成是成功还是失败,会触发 Completion。这是默认的操作。

from("direct:start")
     .onCompletion()
         // This route is invoked when the original route is complete.
         // This is similar to a completion callback.
         .to("log:sync")
         .to("mock:sync")
     // Must use end to denote the end of the onCompletion route.
     .end()
     // here the original route contiues
     .process(new MyProcessor())
     .to("mock:result");

对于 XML,格式如下:

<route>
    <from uri="direct:start"/>
    <!-- This onCompletion block is executed when the exchange is done being routed. -->
    <!-- This callback is always triggered even if the exchange fails. -->
    <onCompletion>
        <!-- This is similar to an after completion callback. -->
        <to uri="log:sync"/>
        <to uri="mock:sync"/>
    </onCompletion>
    <process ref="myProcessor"/>
    <to uri="mock:result"/>
</route>

要在失败时触发 Completion,可以使用 onFailureOnly 参数。同样,若要成功触发 Completion,请使用 onCompleteOnly 参数。

from("direct:start")
     // Here onCompletion is qualified to invoke only when the exchange fails (exception or FAULT body).
     .onCompletion().onFailureOnly()
         .to("log:sync")
         .to("mock:sync")
     // Must use end to denote the end of the onCompletion route.
     .end()
     // here the original route continues
     .process(new MyProcessor())
     .to("mock:result");

对于 XML,在 Completion tag 中将 onFailureOnlyonCompleteOnly 来表示:

<route>
    <from uri="direct:start"/>
    <!-- this onCompletion block will only be executed when the exchange is done being routed -->
    <!-- this callback is only triggered when the exchange failed, as we have onFailure=true -->
    <onCompletion onFailureOnly="true">
        <to uri="log:sync"/>
        <to uri="mock:sync"/>
    </onCompletion>
    <process ref="myProcessor"/>
    <to uri="mock:result"/>
</route>

Completion 全局范围

为多个路由定义 Completion

// define a global on completion that is invoked when the exchange is complete
 onCompletion().to("log:global").to("mock:sync");

 from("direct:start")
     .process(new MyProcessor())
     .to("mock:result");

使用 OnWhen

要在某些情况下触发 Completion,请使用 onWhen predicate。当消息正文包含单词 Hello 时,以下示例会触发 onCompletion

/from("direct:start")
     .onCompletion().onWhen(body().contains("Hello"))
         // this route is only invoked when the original route is complete as a kind
         // of completion callback. And also only if the onWhen predicate is true
         .to("log:sync")
         .to("mock:sync")
     // must use end to denote the end of the onCompletion route
     .end()
     // here the original route contiues
     .to("log:original")
     .to("mock:result");

使用带有或没有线程池的处理

自 Camel 2.14 起,Completion 默认不使用线程池。要强制使用线程池,请将 executorService 设置为 true,或者将 parallelProcessing 设为 true。例如,在 Java DSL 中,使用以下格式:

onCompletion().parallelProcessing()
     .to("mock:before")
     .delay(1000)
     .setBody(simple("OnComplete:${body}"));

对于 XML,格式为:

<onCompletion parallelProcessing="true">
   <to uri="before"/>
   <delay><constant>1000</constant></delay>
   <setBody><simple>OnComplete:${body}<simple></setBody>
 </onCompletion>

使用 executorServiceRef 选项引用特定的线程池:

<onCompletion executorServiceRef="myThreadPool"
   <to uri="before"/>
   <delay><constant>1000</constant></delay>
   <setBody><simple>OnComplete:${body}</simple></setBody>
 </onCompletion>>

在 Consumer Sends Response 之前在Completion 上运行

在Completion 中有两种模式运行:

  • AfterConsumer - 在消费者完成后运行的默认模式
  • BeforeConsumer - 在消费者写出请求之前运行 给调用者。这可以在 编译时 修改 Exchange,如添加特殊标头,或将 Exchange 记录为响应日志记录器。

例如,要将标头 创建的 添加到响应中,请使用 modeBeforeConsumer(),如下所示:

.onCompletion().modeBeforeConsumer()
     .setHeader("createdBy", constant("Someone"))
 .end()

对于 XML,将 mode 属性设置为 BeforeConsumer

<onCompletion mode="BeforeConsumer">
   <setHeader headerName="createdBy">
     <constant>Someone</constant>
   </setHeader>
 </onCompletion>