2.8. 线程模型

Java 线程池 API

Apache Camel 线程模型基于强大的 Java 并发 API 软件包 java.util.concurrent,它最初在 Sun 的 JDK 1.5 中提供。此 API 中的密钥接口是 ExecutorService 接口,它代表一个线程池。使用并发 API,您可以创建许多不同类型的线程池,涵盖各种场景。

Apache Camel 线程池 API

Apache Camel 线程池 API 基于 Java 并发 API 构建,方法是为您的 Apache Camel 应用程序中的所有线程池提供中央工厂( org.apache.camel.spi.ExecutorServiceManager 类型)。以这种方式集中创建线程池提供了几个优点,包括:

  • 使用实用程序类简化线程池的创建。
  • 将线程池与安全关闭集成。
  • 线程自动给出信息性名称,这对于日志记录和管理非常有用。

组件线程模型

一些 Apache Camel 组件是型的,如 SEDA、JMS 和 Jetty iwl-osgi 是以多线程方式本质上是多线程的。这些组件都使用 Apache Camel 线程模型和线程池 API 实施。

如果您计划实施自己的 Apache Camel 组件,建议您将线程代码与 Apache Camel 线程模式集成。例如,如果您的组件需要线程池,建议您使用 CamelContext 的 ExecutorServiceManager 对象创建它。

处理器线程模型

默认情况下,Apache Camel 中的一些标准处理器创建自己的线程池。这些线程感知型处理器也与 Apache Camel 线程模型集成,它们提供了各种选项,供您自定义它们使用的线程池。

表 2.8 “处理器线程选项” 显示在 Apache Camel 内置的线程感知处理器上控制和设置线程池的各种选项。

表 2.8. 处理器线程选项

处理器Java DSLXML DSL

聚合

parallelProcessing()
executorService()
executorServiceRef()
@parallelProcessing
@executorServiceRef

multicast

parallelProcessing()
executorService()
executorServiceRef()
@parallelProcessing
@executorServiceRef

recipientList

parallelProcessing()
executorService()
executorServiceRef()
@parallelProcessing
@executorServiceRef

split

parallelProcessing()
executorService()
executorServiceRef()
@parallelProcessing
@executorServiceRef

threads

executorService()
executorServiceRef()
poolSize()
maxPoolSize()
keepAliveTime()
timeUnit()
maxQueueSize()
rejectedPolicy()
@executorServiceRef
@poolSize
@maxPoolSize
@keepAliveTime
@timeUnit
@maxQueueSize
@rejectedPolicy

wireTap

wireTap(String uri, ExecutorService executorService)
wireTap(String uri, String executorServiceRef)
@executorServiceRef

threads DSL 选项

线程 处理器是一个通用的 DSL 命令,可用于将线程池引入路由。它支持以下选项来自定义线程池:

poolSize()
池中的最小线程数量(和初始池大小)。
maxPoolSize()
池中的最大线程数量。
keepAliveTime()
如果任何线程闲置的时间超过这个时间段(以秒为单位指定),则终止它们。
timeUnit()
keep alive 的时间单位,使用 java.util.concurrent.TimeUnit 类型指定。
maxQueueSize()
此线程池可以存储在其传入任务队列中的最大待处理任务数量。
rejectedPolicy()
指定在传入的任务队列满时要采取哪些操作课程。请查看 表 2.10 “线程池构建器选项”
注意

前面的线程池选项与 executorServiceRef 选项 不兼容 (例如,您无法使用这些选项来覆盖 executorServiceRef 选项引用的线程池中的设置)。Apache Camel 验证 DSL 以执行此操作。

创建默认线程池

要为一个线程感知的处理器创建默认线程池,请在 XML DSL 中使用 parallelProcessing () sub-clause, 或 parallelProcessing 属性启用 parallelProcessing 选项。

例如,在 Java DSL 中,您可以使用默认线程池调用多播处理器(线程池用于同时处理多播目的地),如下所示:

from("direct:start")
  .multicast().parallelProcessing()
    .to("mock:first")
    .to("mock:second")
    .to("mock:third");

您可以在 XML DSL 中定义相同的路由,如下所示

<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
    <route>
        <from uri="direct:start"/>
        <multicast parallelProcessing="true">
            <to uri="mock:first"/>
            <to uri="mock:second"/>
            <to uri="mock:third"/>
        </multicast>
    </route>
</camelContext>

默认线程池配置集设置

默认线程池由线程工厂自动创建,该工厂从默认的 线程池配置文件 获取其设置。默认线程池配置集在 表 2.9 “默认线程池配置文件设置” 中显示的设置(假设这些设置还没有由应用程序代码修改)。

表 2.9. 默认线程池配置文件设置

线程选项默认值

maxQueueSize

1000

poolSize

10

maxPoolSize

20

keepAliveTime

60 (秒)

rejectedPolicy

CallerRuns

更改默认线程池配置集

可以更改默认线程池配置集设置,以便使用自定义设置创建后续所有默认线程池。您可以在 Java 中或 Spring XML 中更改配置集。

例如,在 Java DSL 中,您可以自定义 default 线程池配置文件中的 poolSize 选项和 maxQueueSize 选项,如下所示:

// Java
import org.apache.camel.spi.ExecutorServiceManager;
import org.apache.camel.spi.ThreadPoolProfile;
...
ExecutorServiceManager manager = context.getExecutorServiceManager();
ThreadPoolProfile defaultProfile = manager.getDefaultThreadPoolProfile();

// Now, customize the profile settings.
defaultProfile.setPoolSize(3);
defaultProfile.setMaxQueueSize(100);
...

在 XML DSL 中,您可以自定义默认线程池配置集,如下所示:

<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
    <threadPoolProfile
        id="changedProfile"
        defaultProfile="true"
        poolSize="3"
        maxQueueSize="100"/>
    ...
</camelContext>

请注意,在前面的 XML DSL 示例中,务必要将 defaultProfile 属性设置为 true,否则线程池配置集将被视为自定义线程池配置文件(请参阅 “创建自定义线程池配置集”一节),而不是替换默认的线程池配置集。

自定义处理器线程池

也可以使用 executorServiceexecutorServiceRef 选项(其中使用这些选项而不是 parallelProcessing 选项)为线程感知处理器指定线程池。您可以使用两种方法来自定义处理器的线程池,如下所示:

  • 指定自定义线程池 criu -wagonexplicitly 创建一个 ExecutorService (线程池)实例,并将其传递给 executorService 选项。
  • 指定自定义线程池配置集 criu -wagoncreate,并注册自定义线程池工厂。当您使用 executorServiceRef 选项引用此工厂时,处理器会自动使用工厂创建自定义线程池实例。

当您将 bean ID 传递给 executorServiceRef 选项时,线程感知处理器首先会尝试在 registry 中使用该 ID 查找带有该 ID 的自定义线程池。如果没有使用该 ID 注册线程池,处理器会尝试在 registry 中查找自定义线程池配置集,并使用自定义线程池配置集实例化自定义线程池。

创建自定义线程池

自定义线程池可以是 java.util.concurrent.ExecutorService 类型的任何线程池。Apache Camel 中建议使用以下创建线程池实例的方法:

  • 使用 org.apache.camel.builder.ThreadPoolBuilder 实用程序构建线程池类。
  • 使用当前 CamelContext 中的 org.apache.camel.spi.ExecutorServiceManager 实例来创建线程池类。

最终,这两种方法之间没有区别,因为 ThreadPoolBuilder 实际上是使用 ExecutorServiceManager 实例定义的。通常,ThreadPoolBuilder 是首选的,因为它提供了一种更简单的方法。但是,至少有一个线程( ScheduledExecutorService)只能通过访问 ExecutorService 实例目录来创建。

表 2.10 “线程池构建器选项” 显示 ThreadPoolBuilder 类支持的选项,您可以在定义新的自定义线程池时设置这些选项。

表 2.10. 线程池构建器选项

构建器选项描述

maxQueueSize()

设置此线程池可在其传入任务队列中存储的最大待处理任务数量。值 -1 指定未绑定的队列。默认值取自默认的线程池配置文件。

poolSize()

设置池中最少的线程数(这也是初始池大小)。默认值取自默认的线程池配置文件。

maxPoolSize()

设置池中可以的最大线程数。默认值取自默认的线程池配置文件。

keepAliveTime()

如果任何线程闲置的时间超过这个时间段(以秒为单位指定),则终止它们。这允许线程池在负载为 light 时缩小。默认值取自默认的线程池配置文件。

rejectedPolicy()

指定在传入的任务队列满时要采取哪些操作课程。您可以指定四个可能的值:

CallerRuns
(默认值) 获取调用者线程来运行最新的传入任务。作为副作用,此选项可防止调用者线程接收任何更多任务,直到处理最新的传入的任务为止。
Abort
通过抛出异常来中止最新的传入任务。
discard
以静默方式丢弃最新的传入的任务。
DiscardOldest
丢弃最旧的未处理的任务,然后尝试对任务队列中的最新传入的任务进行排队。

build()

完成构建自定义线程池,并在指定为 build () 参数的 ID 下注册新的线程池。

在 Java DSL 中,您可以使用 ThreadPoolBuilder 定义自定义线程池,如下所示:

// Java
import org.apache.camel.builder.ThreadPoolBuilder;
import java.util.concurrent.ExecutorService;
...
ThreadPoolBuilder poolBuilder = new ThreadPoolBuilder(context);
ExecutorService customPool = poolBuilder.poolSize(5).maxPoolSize(5).maxQueueSize(100).build("customPool");
...

from("direct:start")
  .multicast().executorService(customPool)
    .to("mock:first")
    .to("mock:second")
    .to("mock:third");

您可以通过将对象 ID 传递给 executorServiceRef ()选项,而不是将对象引用(Custom Pool )传递给 executorService Ref ()选项,而是在 registry 中查找线程池,方法是将其 bean ID 传递给 executorServiceRef () 选项,如下所示:

// Java
from("direct:start")
  .multicast().executorServiceRef("customPool")
    .to("mock:first")
    .to("mock:second")
    .to("mock:third");

在 XML DSL 中,您可以使用 threadPool 元素访问 ThreadPoolBuilder。然后,您可以使用 executorServiceRef 属性引用自定义线程池,来根据 Spring registry 中的 ID 查找线程池,如下所示:

<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
    <threadPool id="customPool"
                poolSize="5"
                maxPoolSize="5"
                maxQueueSize="100" />

    <route>
        <from uri="direct:start"/>
        <multicast executorServiceRef="customPool">
            <to uri="mock:first"/>
            <to uri="mock:second"/>
            <to uri="mock:third"/>
        </multicast>
    </route>
</camelContext>

创建自定义线程池配置集

如果您有许多自定义线程池实例来创建,您可能会发现定义自定义线程池配置集更方便,它充当线程池的工厂。每当您从线程感知的处理器引用线程池配置集时,处理器会自动使用配置集来创建新的线程池实例。您可以在 Java DSL 或 XML DSL 中定义自定义线程池配置集。

例如,在 Java DSL 中,您可以创建一个带有 bean ID ( customProfile )的自定义线程池配置集,并从路由中引用它,如下所示:

// Java
import org.apache.camel.spi.ThreadPoolProfile;
import org.apache.camel.impl.ThreadPoolProfileSupport;
...
// Create the custom thread pool profile
ThreadPoolProfile customProfile = new ThreadPoolProfileSupport("customProfile");
customProfile.setPoolSize(5);
customProfile.setMaxPoolSize(5);
customProfile.setMaxQueueSize(100);
context.getExecutorServiceManager().registerThreadPoolProfile(customProfile);
...
// Reference the custom thread pool profile in a route
from("direct:start")
  .multicast().executorServiceRef("customProfile")
    .to("mock:first")
    .to("mock:second")
    .to("mock:third");

在 XML DSL 中,使用 threadPoolProfile 元素创建自定义池配置集(其中,默认Profile 选项默认为 false,因为这不是默认的线程池配置集)。您可以使用 bean ID customProfile 创建自定义线程池配置集,并从路由中引用它,如下所示:

<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
    <threadPoolProfile
                id="customProfile"
                poolSize="5"
                maxPoolSize="5"
                maxQueueSize="100" />

    <route>
        <from uri="direct:start"/>
        <multicast executorServiceRef="customProfile">
            <to uri="mock:first"/>
            <to uri="mock:second"/>
            <to uri="mock:third"/>
        </multicast>
    </route>
</camelContext>

在组件间共享线程池

一些基于标准轮询的组件,如 File 和 FTP iwl-setuptoolsallow,以指定要使用的线程池。这使得不同的组件能够共享同一线程池,从而减少 JVM 中的线程总数。

例如,Apache Camel 组件参考指南 中的 see File2Apache Camel 组件参考指南 中的 Ftp2 都公开 scheduledExecutorService 属性,您可以使用它来指定组件的 ExecutorService 对象。

自定义线程名称

为了让应用程序日志更易读,通常最好自定义线程名称(用于识别日志中线程)。要自定义线程名称,您可以通过调用 ExecutorServiceStrategy 类或 ExecutorServiceManager 类上的 setThreadNamePattern 方法来配置线程名称 模式。或者,设置线程名称模式的一种更简单方法是设置 CamelContext 对象上的 threadNamePattern 属性。

以下占位符可用于线程名称模式:

#camelId#
当前 CamelContext 的名称。
#counter#
唯一的线程标识符,作为递增计数器实施。
#name#
常规 Camel 线程名称。
#longName#
较长的线程名称可以包括端点参数,以此类推。

以下是线程名称模式的典型示例:

Camel (#camelId#) thread #counter# - #name#

以下示例演示了如何使用 XML DSL 在 Camel 上下文上设置 threadNamePattern 属性:

<camelContext xmlns="http://camel.apache.org/schema/spring"
              threadNamePattern="Riding the thread #counter#" >
  <route>
    <from uri="seda:start"/>
    <to uri="log:result"/>
    <to uri="mock:result"/>
  </route>
</camelContext>