6.3. Dead Letter Channel

概要

図6.3「Dead Letter Channel パターン」 で示されている Dead Letter Channel パターンは、メッセージングシステムが目的の受信者にメッセージを配信できない場合に実行するアクションを記述します。これには、配信を再試行する機能などが含まれ、最終的に配信に失敗した場合には、メッセージが Dead Letter Channel に送信され、未達のメッセージをアーカイブします。

図6.3 Dead Letter Channel パターン

Dead Letter Channel パターン

Java DSL でのデッドレターチャネルの作成

以下の例は、Java DSL を使用してデッドレターチャネルを作成する方法を示しています。

errorHandler(deadLetterChannel("seda:errors"));
from("seda:a").to("seda:b");

errorHandler() メソッドは Java DSL インターセプターで、現在のルートビルダーで定義された すべて のルートがこの設定の影響を受けることを意味します。deadLetterChannel() メソッドは、指定の宛先エンドポイント seda:errors で新しいデッドレターチャネルを作成する Java DSL コマンドです。

errorHandler() インターセプターは、すべて のエラータイプを処理するためのキャッチオールメカニズムを提供します。例外処理により粒度の細かい方法を適用する場合は、代わりに onException 句を使用できます (「onException 句」 を参照)。

XML DSL の例

以下のように、XML DSL でデッドレターチャネルを定義できます。

 <route errorHandlerRef="myDeadLetterErrorHandler">
    ...
 </route>

 <bean id="myDeadLetterErrorHandler" class="org.apache.camel.builder.DeadLetterChannelBuilder">
     <property name="deadLetterUri" value="jms:queue:dead"/>
     <property name="redeliveryPolicy" ref="myRedeliveryPolicyConfig"/>
 </bean>

 <bean id="myRedeliveryPolicyConfig" class="org.apache.camel.processor.RedeliveryPolicy">
     <property name="maximumRedeliveries" value="3"/>
     <property name="redeliveryDelay" value="5000"/>
 </bean>

再配信ポリシー

通常、配信に失敗した場合、デッドレターチャネルに直接メッセージを送信することはありません。代わりに、最大限度まで再送信を試み、再配信の試行がすべて失敗した場合は、メッセージをデッドレターチャネルに送信します。メッセージの再配信をカスタマイズするには、デッドレターチャネルを設定して 再配信ポリシー を取得します。たとえば、再配信の最大試行回数を 2 回に指定し、配信試行間の遅延に指数バックオフアルゴリズムを適用するには、以下のようにデッドレターチャネルを設定できます。

errorHandler(deadLetterChannel("seda:errors").maximumRedeliveries(2).useExponentialBackOff());
from("seda:a").to("seda:b");

ここでは、チェーンの関連メソッドを呼び出して、デッドレターチャネルに再配信オプションを設定します (チェーンの各メソッドは現在の RedeliveryPolicy オブジェクトの参照を返します)。表6.1「再配信ポリシーの設定」 には、再配信ポリシーの設定に使用できるメソッドがまとめられています。

表6.1 再配信ポリシーの設定

メソッドの署名デフォルト説明

allowRedeliveryWhileStopping()

true

正常なシャットダウン中またはルートが停止している間、再配信を試行するかどうかを制御します。すでに進行中の配信は停止時に中断され ません

backOffMultiplier(double multiplier)

2

指数バックオフが有効な場合は、m をバックオフ定数とし、d を最初の遅延とします。その後、再配信試行シーケンスは以下のようになります。

d, m*d, m*m*d, m*m*m*d, ...

collisionAvoidancePercent(double collisionAvoidancePercent)

15

競合の回避が有効になっている場合は、p を競合回避の割合 (パーセント) にします。競合回避ポリシーは、現在の値にその p% を足し引きした値を最大値および最小値とするランダムな値で、次の遅延を調整します。

deadLetterHandleNewException

true

Camel 2.15: デッドレターチャネルでメッセージの処理中に発生する例外を処理するかどうかを指定します。true の場合は、例外が処理され、WARN レベルでログに記録されます (そのため、デッドレターチャネルの完了が保証されます)。false の場合は例外が処理されないため、デッドレターチャネルは失敗し、新しい例外が伝播されます。

delayPattern(String delayPattern)

なし

Apache Camel 2.0: 「Redeliver Delay パターン」 を参照してください。

disableRedelivery()

true

Apache Camel 2.0: 再配信機能を無効にします。再配信を有効にするには、maximumRedeliveries() を正の整数値に設定します。

handled(boolean handled)

true

Apache Camel 2.0: true の場合は、メッセージがデッドレターチャネルに移動されたときに現在の例外が消去されます。false の場合は、例外はクライアントに伝播されます。

initialRedeliveryDelay(long initialRedeliveryDelay)

1000

最初の再配信を試みるまでの遅延 (ミリ秒単位) を指定します。

logNewException

true

デッドレターチャネルで例外が発生した場合に WARN レベルでログに記録するかどうかを指定します。

logStackTrace(boolean logStackTrace)

false

Apache Camel 2.0: true の場合は、JVM スタックトレースがエラーログに含まれます。

maximumRedeliveries(int maximumRedeliveries)

0

Apache Camel 2.0: 配信の最大試行回数。

maximumRedeliveryDelay(long maxDelay)

60000

Apache Camel 2.0: 指数バックオフストラテジーを使用する場合 (useExponentialBackOff() を参照)、理論的に再配信の遅延が制限なく増加する可能性があります。このプロパティーは再配信の遅延の上限を指定します (ミリ秒単位)。

onRedelivery(Processor processor)

なし

Apache Camel 2.0: 再配信を試みる前に呼び出されるプロセッサーを設定します。

redeliveryDelay(long int)

0

Apache Camel 2.0: 再配信の試行間の遅延 (ミリ秒単位) を指定します。Apache Camel 2.16.0: デフォルトの再配信遅延は 1 秒です。

retriesExhaustedLogLevel(LoggingLevel logLevel)

LoggingLevel.ERROR

Apache Camel 2.0: 配信の失敗をログに記録するログレベルを指定します (org.apache.camel.LoggingLevel 定数として指定されます)。

retryAttemptedLogLevel(LoggingLevel logLevel)

LoggingLevel.DEBUG

Apache Camel 2.0: 再配信の試行に対するログレベルを指定します (org.apache.camel.LoggingLevel 定数として指定されます)。

useCollisionAvoidance()

false

競合の回避を有効にします。これにより、一定のランダム化をバックオフのタイミングに追加して競合の可能性を低減します。

useOriginalMessage()

false

Apache Camel 2.0: この機能が有効な場合、デッドレターチャネルに送信されたメッセージは、ルートの開始時に存在した (from() ノードで) のメッセージエクスチェンジのコピーになります。

useExponentialBackOff()

false

指数バックオフを有効にします。

再配信ヘッダー

Apache Camel がメッセージの再配信を試みると、表6.2「デッドレター再配信ヘッダー」 に記載されているヘッダーを In メッセージに自動設定します。

表6.2 デッドレター再配信ヘッダー

ヘッダー名説明

CamelRedeliveryCounter

Integer

Apache Camel 2.0: 配信に失敗した回数を返します。この値は、Exchange.REDELIVERY_COUNTER でも設定されます。

CamelRedelivered

ブール値

Apache Camel 2.0: 再配信が 1 回以上試行された場合は true です。この値は Exchange.REDELIVERED でも設定されます。

CamelRedeliveryMaxCounter

Integer

Apache Camel 2.6: 再配信の最大設定を保持します (Exchange.REDELIVERY_MAX_COUNTER エクスチェンジ プロパティーにも設定されます)。retryWhile を使用する場合や、再配信の最大回数が無制限に設定されている場合は、このヘッダーは設定されません。

再配信エクスチェンジプロパティー

Apache Camel がメッセージの再配信を試みると、表6.3「再配信エクスチェンジプロパティー」 に記載されているエクスチェンジプロパティーを自動設定します。

表6.3 再配信エクスチェンジプロパティー

エクスチェンジプロパティー名説明

Exchange.FAILURE_ROUTE_ID

String

失敗したルートのルート ID を提供します。このプロパティーのリテラル名は CamelFailureRouteId です。

元のメッセージの使用

Apache Camel 2.0 で利用可能: エクスチェンジオブジェクトはルートを通過する際に変更される可能性があります。そのため、例外が発生したときに現行であるエクスチェンジがデッドレターチャネルの保存に適したコピーであるとは限りません。多くの場合、ルートによる変換の対象となる前に、ルート開始時に到達したメッセージをログに記録することが推奨されます。たとえば、以下のルートを見てみましょう。

from("jms:queue:order:input")
       .to("bean:validateOrder");
       .to("bean:transformOrder")
       .to("bean:handleOrder");

上記のルートは受信 JMS メッセージをリッスンした後、validateOrdertransformOrder、および handleOrder の Bean のシーケンスを使用してメッセージを処理します。ただし、エラーが発生した場合にメッセージがどの状態であるかは分かりません。transformOrder Bean の前または後にエラーが発生しましたか ?以下のように useOriginalMessage オプションを有効にすると、jms:queue:order:input からの元のメッセージのログを確実に Dead Letter Chanel に記録することができます。

// will use original body
errorHandler(deadLetterChannel("jms:queue:dead")
       .useOriginalMessage().maximumRedeliveries(5).redeliveryDelay(5000);

Redeliver Delay パターン

Apache Camel 2.0 で利用可能: delayPattern オプションは、再配信回数の特定範囲に遅延を指定するために使用されます。遅延パターンの構文: limit1:delay1;limit2:delay2;limit3:delay3;…​。各 delayN は範囲 limitNredeliveryCount < limitN+1 で再配信するように適用されます

たとえば、パターン 5:1000;10:5000;20:20000 について考えてみましょう。このパターンでは、3 つのグループが定義され、以下の再配信の遅延が発生します。

  • 1 から 4 の試行 = 0 ミリ秒 (最初のグループは 5 で始まるため) 。
  • 5 から 9 の試行 = 1000 ミリ秒 (最初のグループ)。
  • 10 から 19 の試行 = 5000 ミリ秒 (2 番目のグループ)。
  • 20 以上の試行 = 20000 ミリ秒 (最後のグループ)。

制限 1 を加えてグループを開始し、開始遅延を定義できます。たとえば、1:1000;5:5000 では以下の再配信の遅延が発生します。

  • 1 から 4 の試行 = 1000 ミリ (最初のグループ)。
  • 5 以上の試行 = 5000 ミリ (最後のグループ)。

次の遅延を前の遅延よりも長くする必要はありません。あらゆる遅延値を使用できます。たとえば、Delay パターン 1:5000;3:1000 は 5 秒の遅延で始まり、遅延を 1 秒に減らします。

失敗したエンドポイント

Apache Camel ルートメッセージ時に、エクスチェンジが送信された 最後 のエンドポイントが含まれるエクスチェンジプロパティーを更新します。したがって、以下のコードを使用して、現在のエクスチェンジが最後に送信された宛先の URI を取得できます。

// Java
String lastEndpointUri = exchange.getProperty(Exchange.TO_ENDPOINT, String.class);

Exchange.TO_ENDPOINTCamelToEndpoint と同等の文字列の定数になります。このプロパティーは、Camel がメッセージを 任意 のエンドポイントに送信するたびに更新されます。

ルーティング中にエラーが発生し、エクスチェンジがデッドレターキューに移動された場合、Apache Camel は CamelFailureEndpoint という名前のプロパティーを追加で設定します。これは、エラーが発生する前にエクスチェンジが最後に送信された宛先を特定します。したがって、以下のコードを使用すると、デッドレターキュー内から失敗したエンドポイントにアクセスできます。

// Java
String failedEndpointUri = exchange.getProperty(Exchange.FAILURE_ENDPOINT, String.class);

Exchange.FAILURE_ENDPOINT は、CamelFailureEndpoint と同等の文字列定数です。

注記

これらのプロパティーは、指定の宛先エンドポイントの処理が完了したに障害が発生した場合でも、現在のエクスチェンジで設定された状態を維持します。たとえば、以下のルートを見てみましょう。

        from("activemq:queue:foo")
        .to("http://someserver/somepath")
        .beanRef("foo");

foo Bean で障害が発生したと仮定します。この場合、Exchange.TO_ENDPOINT プロパティーと Exchange.FAILURE_ENDPOINT プロパティーに値が含まれ続けます。

onRedelivery プロセッサー

Dead Letter Channel が再配信を実行する場合、再配信を試みる 直前 に実行される Processor を設定できます。これは、メッセージを再配信する前に変更する必要がある場合に使用できます。

たとえば、以下の Dead Letter Channel は、エクスチェンジの再配信前に MyRedeliverProcessor を呼び出すように設定されます。

// we configure our Dead Letter Channel to invoke
// MyRedeliveryProcessor before a redelivery is
// attempted. This allows us to alter the message before
errorHandler(deadLetterChannel("mock:error").maximumRedeliveries(5)
        .onRedelivery(new MyRedeliverProcessor())
        // setting delay to zero is just to make unit teting faster
        .redeliveryDelay(0L));

ここで MyRedeliveryProcessor プロセスは以下のように実装されます。

// This is our processor that is executed before every redelivery attempt
// here we can do what we want in the java code, such as altering the message
public class MyRedeliverProcessor implements Processor {

    public void process(Exchange exchange) throws Exception {
        // the message is being redelivered so we can alter it

        // we just append the redelivery counter to the body
        // you can of course do all kind of stuff instead
        String body = exchange.getIn().getBody(String.class);
        int count = exchange.getIn().getHeader(Exchange.REDELIVERY_COUNTER, Integer.class);

        exchange.getIn().setBody(body + count);

        // the maximum redelivery was set to 5
        int max = exchange.getIn().getHeader(Exchange.REDELIVERY_MAX_COUNTER, Integer.class);
        assertEquals(5, max);
    }
}

シャットダウンまたは停止中の再配信の制御

ルートを停止したり、正常なシャットダウンを開始する場合、再配信の試行を継続するのがエラー処理のデフォルトの挙動になります。通常、これは望ましい動作ではないため、以下の例のように、allowRedeliveryWhileStopping オプションを false に設定すると、シャットダウンまたは停止中に再配信を無効にすることができます。

errorHandler(deadLetterChannel("jms:queue:dead")
    .allowRedeliveryWhileStopping(false)
    .maximumRedeliveries(20)
    .redeliveryDelay(1000)
    .retryAttemptedLogLevel(LoggingLevel.INFO));
注記

後方互換性の理由から、allowRedelivery whileStopping オプションはデフォルトで true になります。ただし、強行なシャットダウン中は、このオプションの設定に関係なく、再配信が常に抑制されます (たとえば、正常なシャットダウンがタイムアウトした場合など)。

onExceptionOccurred プロセッサーの使用

Dead Letter Channel は、例外発生後にメッセージのカスタム処理を可能にする onExceptionOccurred プロセッサーをサポートします。これは、カスタムロギングにも使用できます。onExceptionOccurred プロセッサーから出力される新しい例外は WARN としてログに記録され、無視されます。既存の例外を上書きすることはありません。

onRedelivery プロセッサーと onExceptionOccurred プロセッサーの違いは、onRedelivery プロセッサーは再配信の試行直前に処理できることです。ただし、例外の発生直後には処理できません。たとえば、再配信を試行する間隔で 5 秒の遅延が発生するようにエラーハンドラーを設定すると、再配信プロセスは例外発生から 5 秒後に呼び出されます。

以下の例は、例外発生時にカスタムロギングを実行する方法を示しています。onExceptionOccurred がカスタムプロセッサーを使用するように設定する必要があります。

errorHandler(defaultErrorHandler().maximumRedeliveries(3).redeliveryDelay(5000).onExceptionOccurred(myProcessor));

onException 句

ルートビルダーで errorHandler() インターセプターを使用する代わりに、さまざまな例外タイプに異なる再配信ポリシーとデッドレターチャネルを定義する、一連の onException() 句を定義できます。たとえば、NullPointerExceptionIOExceptionException タイプごとに異なる動作を定義するには、Java DSL を使用してルートビルダーに以下のルールを定義できます。

onException(NullPointerException.class)
    .maximumRedeliveries(1)
    .setHeader("messageInfo", "Oh dear! An NPE.")
    .to("mock:npe_error");

onException(IOException.class)
    .initialRedeliveryDelay(5000L)
    .maximumRedeliveries(3)
    .backOffMultiplier(1.0)
    .useExponentialBackOff()
    .setHeader("messageInfo", "Oh dear! Some kind of I/O exception.")
    .to("mock:io_error");

onException(Exception.class)
    .initialRedeliveryDelay(1000L)
    .maximumRedeliveries(2)
    .setHeader("messageInfo", "Oh dear! An exception.")
    .to("mock:error");

from("seda:a").to("seda:b");

再配信オプションは、再配信ポリシーメソッドをチェーンして指定されます (表6.1「再配信ポリシーの設定」 のように)。また、to() DSL コマンドを使用して Dead Letter Channel のエンドポイントを指定します。onException() 句で他の Java DSL コマンドを呼び出すこともできます。たとえば、前述の例は setHeader() を呼び出して、messageInfo という名前のメッセージヘッダーにエラーの情報を記録します。

この例では、NullPointerException および IOException 例外タイプが特別に設定されています。その他のすべての例外タイプは、汎用 Exception 例外インターセプターによって処理されます。デフォルトでは、Apache Camel は出力された例外に最も一致する例外インターセプターを適用します。完全に一致するものが見つからない場合は、最も近いベースタイプなどとの一致を試みます。最後に、他のインターセプターと一致しない場合、その Exception タイプのインターセプターは残りの例外すべてと一致します。

OnPrepareFailure

デッドレターキューにエクスチェンジを渡す前に、onPrepare オプションを使用してカスタムプロセッサーがエクスチェンジを準備できるようにすることができます。これにより、エクスチェンジ失敗の原因など、エクスチェンジに関する情報を追加できます。たとえば、以下のプロセッサーは例外メッセージが含まれるヘッダーを追加します。

public class MyPrepareProcessor implements Processor {
    @Override
    public void process(Exchange exchange) throws Exception {
        Exception cause = exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class);
        exchange.getIn().setHeader("FailedBecause", cause.getMessage());
    }
}

以下のように、プロセッサーを使用するようにエラーハンドラーを設定できます。

errorHandler(deadLetterChannel("jms:dead").onPrepareFailure(new MyPrepareProcessor()));

ただし、onPrepare オプションは、デフォルトのエラーハンドラーを使用して使用することもできます。

<bean id="myPrepare"
class="org.apache.camel.processor.DeadLetterChannelOnPrepareTest.MyPrepareProcessor"/>

<errorHandler id="dlc" type="DeadLetterChannel" deadLetterUri="jms:dead" onPrepareFailureRef="myPrepare"/>