2.8. スレッドモデル

Java スレッドプール API

Apache Camel のスレッドモデルは、強力な Java 並行処理 API パッケージ java.util.concurrent に基づいています。この API は、Sun の JDK 1.5 で初めて利用可能になったものです。この API の主要なインターフェイスは、スレッドプールを表す ExecutorService インターフェイスです。並行処理 API を使用すると、幅広いシナリオに対応する、さまざまな種類のスレッドプールを作成できます。

Apache Camel スレッドプール API

Apache Camel スレッドプール API は Java 並行処理 API 上で構築されており、Apache Camel アプリケーションのすべてのスレッドプールに対して中心的なファクトリー (org.apache.camel.spi.ExecutorServiceManager 型) が提供されます。このような方法でスレッドプールの作成を一元化することには、以下のようにいくつかの利点があります。

  • ユーティリティークラスを使用することで、スレッドプールの作成を簡素化できる。
  • スレッドプールを正常なシャットダウンに統合できる。
  • スレッドに有益な名前を自動的に付与できる。これは、ロギングと管理に役立ちます。

コンポーネントのスレッドモデル

SEDA、JMS、Jetty など、Apache Camel コンポーネントには本質的にマルチスレッドなものがあります。こうしたコンポーネントはすべて Apache Camel のスレッドモデルとスレッドプール API を使用して実装されています。

独自の Apache Camel コンポーネントを実装しようとしている場合、マルチスレッドなコードは Apache Camel のスレッドモデルと統合することが推奨されます。たとえば、コンポーネントにスレッドプールが必要な場合は、CamelContext の ExecutorServiceManager オブジェクトを使用して作成することが推奨されます。

プロセッサーのスレッドモデル

Apache Camel の標準プロセッサーの中には、デフォルトで独自のスレッドプールを作成するものがあります。これらのスレッド対応プロセッサーも Apache Camel のスレッドモデルと統合されており、使用するスレッドプールのカスタマイズを可能にするさまざまなオプションを提供しています。

表2.8「プロセッサーのマルチスレッドオプション」 では、Apache Camel に組み込まれているスレッド対応のプロセッサーでスレッドプールを制御および設定するためのさまざまなオプションをまとめています。

表2.8 プロセッサーのマルチスレッドオプション

プロセッサーJava DSLXML DSL

aggregate

parallelProcessing()
executorService()
executorServiceRef()
@parallelProcessing
@executorServiceRef

multicast

parallelProcessing()
executorService()
executorServiceRef()
@parallelProcessing
@executorServiceRef

recipientList

parallelProcessing()
executorService()
executorServiceRef()
@parallelProcessing
@executorServiceRef

split

parallelProcessing()
executorService()
executorServiceRef()
@parallelProcessing
@executorServiceRef

threads

executorService()
executorServiceRef()
poolSize()
maxPoolSize()
keepAliveTime()
timeUnit()
maxQueueSize()
rejectedPolicy()
@executorServiceRef
@poolSize
@maxPoolSize
@keepAliveTime
@timeUnit
@maxQueueSize
@rejectedPolicy

wireTap

wireTap(String uri, ExecutorService executorService)
wireTap(String uri, String executorServiceRef)
@executorServiceRef

threads DSL オプション

threads プロセッサーは汎用の DSL コマンドであり、ルートにスレッドプールを導入するために使用できます。スレッドプールをカスタマイズするために、以下のオプションをサポートしています。

poolSize()
プールの最小スレッド数 (および初期プールサイズ)。
maxPoolSize()
プールの最大スレッド数。
keepAliveTime()
スレッドがこの期間 (秒単位で指定) よりも長い間アイドル状態になっている場合、スレッドを終了させる。
timeUnit()
keep alive の時間単位。java.util.concurrent.TimeUnit タイプを使用して指定します。
maxQueueSize()
このスレッドプールが受信タスクキューに保持できる保留中の最大タスク数。
rejectedPolicy()
受信タスクキューが満杯の場合に実行すべきアクションを指定する。表2.10「スレッドプールビルダーのオプション」 を参照してください。
注記

前述のスレッドプールのオプションは、executorServiceRef オプションと互換性が ありません (たとえば、これらのオプションを使用して、executorServiceRef オプションで参照されるスレッドプールの設定を上書きすることはできません)。この DSL の制約は Apache Camel が検証することで強制されます。

デフォルトのスレッドプールの作成

スレッド対応プロセッサーに対してデフォルトのスレッドプールを作成するには、parallelProcessing オプションを有効にします。Java DSL では parallelProcessing() サブ句を、XML DSL では parallelProcessing 属性を使用します。

たとえば、Java DSL では、以下のようにデフォルトのスレッドプールを使用して multicast プロセッサーを呼び出すことができます (スレッドプールはマルチキャストの宛先を同時に処理するために使用されます)。

from("direct:start")
  .multicast().parallelProcessing()
    .to("mock:first")
    .to("mock:second")
    .to("mock:third");

以下のように XML DSL で同じルートを定義できます。

<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
    <route>
        <from uri="direct:start"/>
        <multicast parallelProcessing="true">
            <to uri="mock:first"/>
            <to uri="mock:second"/>
            <to uri="mock:third"/>
        </multicast>
    </route>
</camelContext>

デフォルトスレッドプールプロファイルの設定

デフォルトのスレッドプールは、スレッドファクトリーがデフォルトスレッドプールプロファイル から設定を取得することによって自動的に作成されます。デフォルトスレッドプールプロファイルが持つ設定は、 表2.9「デフォルトスレッドプールプロファイルの設定」 に示す通りです (これらの設定はアプリケーションコードによって変更されていないことを前提とします)。

表2.9 デフォルトスレッドプールプロファイルの設定

スレッドオプションデフォルト値

maxQueueSize

1000

poolSize

10

maxPoolSize

20

keepAliveTime

60 (秒)

rejectedPolicy

CallerRuns

デフォルトスレッドプールプロファイルの変更

デフォルトスレッドプールプロファイルの設定を変更することで、後続のすべてのデフォルトスレッドプールをカスタムの設定で作成することができます。プロファイルは Java または Spring XML のどちらでも変更できます。

たとえば、Java DSL では、以下のようにデフォルトスレッドプールプロファイルの poolSize オプションと maxQueueSize オプションをカスタマイズできます。

// Java
import org.apache.camel.spi.ExecutorServiceManager;
import org.apache.camel.spi.ThreadPoolProfile;
...
ExecutorServiceManager manager = context.getExecutorServiceManager();
ThreadPoolProfile defaultProfile = manager.getDefaultThreadPoolProfile();

// Now, customize the profile settings.
defaultProfile.setPoolSize(3);
defaultProfile.setMaxQueueSize(100);
...

XML DSL では、以下のようにデフォルトスレッドプールプロファイルをカスタマイズできます。

<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
    <threadPoolProfile
        id="changedProfile"
        defaultProfile="true"
        poolSize="3"
        maxQueueSize="100"/>
    ...
</camelContext>

前述の XML DSL の例では defaultProfile 属性を true に設定することが不可欠です。そうしないと、そのスレッドプールプロファイルはデフォルトスレッドプールプロファイルを置き換えるのではなく、カスタムのスレッドプールプロファイルとして扱われてしまいます (「カスタムスレッドプールプロファイルの作成」 を参照)。

プロセッサーのスレッドプールのカスタマイズ

executorService または executorServiceRef オプションのいずれかを使用 (parallelProcessing オプションの代わりにこれらのオプションを使用) することで、スレッド対応のプロセッサーにスレッドプールを直接指定することもできます。以下のように、プロセッサーのスレッドプールをカスタマイズするには 2 つの方法があります。

  • カスタムスレッドプールの値の指定 - ExecutorService (スレッドプール) インスタンスを明示的に作成し、これを executorService オプションに渡します。
  • カスタムスレッドプールプロファイルの指定 - カスタムスレッドプールファクトリーを作成して登録します。executorServiceRef オプションを使用してこのファクトリーを参照すると、プロセッサーは自動的にそのファクトリーを使用して、カスタムスレッドプールインスタンスを作成します。

Bean ID を executorServiceRef オプションに渡すと、スレッド対応のプロセッサーはまずその ID を持つカスタムスレッドプールをレジストリーの中から検索します。その ID でスレッドプールが登録されていない場合、プロセッサーはレジストリーの中からカスタムスレッドプールプロファイルを検索し、そのカスタムスレッドプールプロファイルを使用してカスタムスレッドプールをインスタンス化します。

カスタムスレッドプールの作成

カスタムスレッドプールは、java.util.concurrent.ExecutorService 型の任意のスレッドプールです。Apache Camel では、スレッドプールインスタンスを作成する上で以下の方法が推奨されています。

  • org.apache.camel.builder.ThreadPoolBuilder ユーティリティーを使用して、スレッドプールクラスをビルドします。
  • 現在の CamelContext から org.apache.camel.spi.ExecutorServiceManager インスタンスを使用して、スレッドプールクラスを作成します。

ThreadPoolBuilder は実際には ExecutorServiceManager インスタンスを使用して定義されているため、究極的には 2 つのアプローチには大きな違いはありません。通常、ThreadPoolBuilder が推奨されます。これは、より単純なアプローチを提供するためです。ただし、少なくとも 1 種類のスレッド (ScheduledExecutorService) は、ExecutorServiceManager インスタンスに直接アクセスする方法でしか作成できません。

表2.10「スレッドプールビルダーのオプション」 は、ThreadPoolBuilder クラスがサポートするオプションを示します。これらのオプションは、新しいカスタムスレッドプールを定義する際に設定できます。

表2.10 スレッドプールビルダーのオプション

Builder オプション説明

maxQueueSize()

このスレッドプールが受信タスクキューに保持できる保留中の最大タスク数を設定します。-1 の値は上限なしキューを指定します。デフォルト値はデフォルトスレッドプールプロファイルから取得されます。

poolSize()

プールの最小スレッド数を設定します (これは初期プールサイズにもなります)。デフォルト値はデフォルトスレッドプールプロファイルから取得されます。

maxPoolSize()

プールで使用できる最大スレッド数を設定します。デフォルト値はデフォルトスレッドプールプロファイルから取得されます。

keepAliveTime()

スレッドがこの期間 (秒単位で指定) よりも長い間アイドル状態になっている場合、スレッドを終了させる。これにより、負荷が軽くなるとスレッドプールが縮小されます。デフォルト値はデフォルトスレッドプールプロファイルから取得されます。

rejectedPolicy()

受信タスクキューが満杯の場合に実行すべきアクションを指定する。以下の 4 つの値から指定できます。

CallerRuns
(デフォルト値) 呼び出し元のスレッドを使用して、最後に受信したタスクを実行します。しかし、このオプションでは最後に受信したタスクの処理が完了するまで、呼び出し元のスレッドがそれ以上のタスク受信をブロックします。
Abort
例外を発生させて、最後に受信したタスクを中断します。
Discard
例外を発生させずに、最後に受信したタスクを破棄します。
DiscardOldest
最も古い未処理のタスクを破棄して、最後に受信したタスクをタスクキューに入れようと試みます。

build()

カスタムスレッドプールの構築を終了し、build() に引数として指定された ID の下に新しいスレッドプールを登録します。

Java DSL では、以下のように ThreadPoolBuilder を使用してカスタムスレッドプールを定義できます。

// Java
import org.apache.camel.builder.ThreadPoolBuilder;
import java.util.concurrent.ExecutorService;
...
ThreadPoolBuilder poolBuilder = new ThreadPoolBuilder(context);
ExecutorService customPool = poolBuilder.poolSize(5).maxPoolSize(5).maxQueueSize(100).build("customPool");
...

from("direct:start")
  .multicast().executorService(customPool)
    .to("mock:first")
    .to("mock:second")
    .to("mock:third");

オブジェクト参照 customPool を直接 executorService() オプションに渡す代わりに、以下のように Bean ID を executorServiceRef() オプションに渡すことで、レジストリーの中からスレッドプールを検索できます。

// Java
from("direct:start")
  .multicast().executorServiceRef("customPool")
    .to("mock:first")
    .to("mock:second")
    .to("mock:third");

XML DSL では、threadPool 要素を使用して ThreadPoolBuilder にアクセスします。その後、以下のように executorServiceRef 属性を使用して Spring レジストリーでスレッドプールを ID で検索することで、カスタムスレッドプールを参照できます。

<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
    <threadPool id="customPool"
                poolSize="5"
                maxPoolSize="5"
                maxQueueSize="100" />

    <route>
        <from uri="direct:start"/>
        <multicast executorServiceRef="customPool">
            <to uri="mock:first"/>
            <to uri="mock:second"/>
            <to uri="mock:third"/>
        </multicast>
    </route>
</camelContext>

カスタムスレッドプールプロファイルの作成

多くのカスタムスレッドプールインスタンスを作成する場合は、スレッドプールのファクトリーとして機能するカスタムスレッドプールプロファイルを定義しておくと便利です。スレッド対応プロセッサーからスレッドプールプロファイルを参照するだけで、プロセッサーは自動的にそのプロファイルを使用して新しいスレッドプールインスタンスを作成します。カスタムスレッドプールプロファイルは、Java DSL または XML DSL のどちらでも定義できます。

たとえば、Java DSL では、以下のようにして Bean ID customProfile を持つカスタムスレッドプールプロファイルを作成し、ルート内でそのプロファイルを参照できます。

// Java
import org.apache.camel.spi.ThreadPoolProfile;
import org.apache.camel.impl.ThreadPoolProfileSupport;
...
// Create the custom thread pool profile
ThreadPoolProfile customProfile = new ThreadPoolProfileSupport("customProfile");
customProfile.setPoolSize(5);
customProfile.setMaxPoolSize(5);
customProfile.setMaxQueueSize(100);
context.getExecutorServiceManager().registerThreadPoolProfile(customProfile);
...
// Reference the custom thread pool profile in a route
from("direct:start")
  .multicast().executorServiceRef("customProfile")
    .to("mock:first")
    .to("mock:second")
    .to("mock:third");

XML DSL では、threadPoolProfile 要素を使用してカスタムプールプロファイルを作成します (これはデフォルトスレッドプールプロファイル ではない ため、ここでは defaultProfile オプションをデフォルトで false に設定します)。以下のようにして、Bean ID customProfile を持つカスタムスレッドプールプロファイルを作成し、ルート内でそのプロファイルを参照できます。

<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
    <threadPoolProfile
                id="customProfile"
                poolSize="5"
                maxPoolSize="5"
                maxQueueSize="100" />

    <route>
        <from uri="direct:start"/>
        <multicast executorServiceRef="customProfile">
            <to uri="mock:first"/>
            <to uri="mock:second"/>
            <to uri="mock:third"/>
        </multicast>
    </route>
</camelContext>

コンポーネント間でのスレッドプールの共有

File や FTP など、標準のポーリングベースのコンポーネントの中には、使用するスレッドプールを指定できるものがあります。これにより、異なるコンポーネントが同じスレッドプールを共有することが可能となり、JVM 内のスレッド総数を削減することができます。

たとえば、Apache Camel Component Reference GuideFile2Apache Camel Component Reference GuideFtp2 は、どちらも scheduledExecutorService プロパティーを公開しており、コンポーネントの ExecutorService オブジェクトを指定するために使用できます。

スレッド名のカスタマイズ

アプリケーションのログをより読みやすくするために、スレッド名 (ログ内でスレッドを識別するために使用されるもの) をカスタマイズすることが推奨されます。スレッド名をカスタマイズするには、ExecutorServiceStrategy クラスまたは ExecutorServiceManager クラスの setThreadNamePattern メソッドを呼び出すことで、スレッド名パターン を設定します。または、スレッド名パターンを設定するより簡単な方法として、CamelContext オブジェクトに threadNamePattern プロパティーを設定する方法もあります。

スレッド名パターンでは、以下のプレースホルダーが使用できます。

#camelId#
現在の CamelContext の名前。
#counter#
インクリメントカウンターとして実装された一意のスレッド ID。
#name#
通常の Camel スレッド名。
#longName#
長いスレッド名。エンドポイントパラメーターなどを含めることができる。

以下は、スレッド名パターンの典型的な例です。

Camel (#camelId#) thread #counter# - #name#

以下の例は、XML DSL を使用して Camel コンテキストに threadNamePattern 属性を設定する方法を示しています。

<camelContext xmlns="http://camel.apache.org/schema/spring"
              threadNamePattern="Riding the thread #counter#" >
  <route>
    <from uri="seda:start"/>
    <to uri="log:result"/>
    <to uri="mock:result"/>
  </route>
</camelContext>