9.4. 例

たとえば、新規注文を行うとします。システムには異なる 2 つのサービスである、注文を管理するサービスとクレジット (残高) を管理するサービスがあるとします。ロジックとしては、十分なクレジット (残高) がある場合には注文を行うことができます。Saga EIP を使用すると、direct:buy ルートを 2 つの異なるアクションで設定される Saga としてモデル化できます。1 つ目は注文の作成、もう 1 つ目はクレジット (残高) の確保です。どちらのアクションも実行されるか、またはどちらも実行されないかのいずれかである必要があります。クレジット (残高) がないのに注文が行われるのは、不整合な結果と見なされるからです (注文がないのに支払いが行われるのも同様です)。

from("direct:buy")
  .saga()
    .to("direct:newOrder")
    .to("direct:reserveCredit");

この Buy アクションについては、これ以降の例の中で変更はありません。New Order および Reserve Credit アクションをモデリングする上で使用されるオプションは以下のとおりです。

from("direct:newOrder")
  .saga()
  .propagation(SagaPropagation.MANDATORY)
  .compensation("direct:cancelOrder")
    .transform().header(Exchange.SAGA_LONG_RUNNING_ACTION)
    .bean(orderManagerService, "newOrder")
    .log("Order ${body} created");

ここでは、伝搬モードは MANDATORY に設定されています。これは、このルートを流れるエクスチェンジはすでに Saga の一部でなければならないことを意味します (この例では、direct:buy ルートで Saga が作成されているため条件を満たします)。direct:newOrder ルートは direct:cancelOrder と呼ばれる補正アクションを宣言しており、Saga がキャンセルされた場合に注文の取り消しを行います。

各エクスチェンジには常に Exchange.SAGA_LONG_RUNNING_ACTION ヘッダーが含まれ、ここでは注文の ID として使用されます。この ID は対応する補正アクション内で削除すべき注文を特定するのに使われますが、必須ではありません (オプションで代替ソリューションを使用できます) 。direct:newOrder の補正アクションは direct:cancelOrder で、以下のようになります。

from("direct:cancelOrder")
  .transform().header(Exchange.SAGA_LONG_RUNNING_ACTION)
  .bean(orderManagerService, "cancelOrder")
  .log("Order ${body} cancelled");

このアクションは、注文を取り消す必要があるときに Saga EIP 実装によって自動的に呼び出されます。このアクションはエラーで終了することはありません。direct:cancelOrder ルートでエラーが出力された場合、EIP 実装は一定の上限回数まで補正アクションの実行を定期的に再試行します。つまり、補正アクションはべき等でなければならないことを意味します。アクションが複数回トリガーされる可能性を考慮に入れる必要があり、またどのような場合でも失敗しないようにする必要があります。再試行回数の上限に達しても補正アクションが終わらなかった場合には、手動による介入プロセスを Saga 実装からトリガーする必要があります。

注記

direct:newOrder ルートの実行に遅延が生じたために、その間に他の参加者によって Saga がキャンセルされることがあります (並列実行中のルートでのエラーや、Saga レベルでのタイムアウトなど)。そのため、補正アクション direct:cancelOrder が呼び出されたときには、キャンセルされた Order レコードが見つからないことがあります。完全にグローバルな一貫性を保証するためには、主となるアクションとそれに対応した補正アクションが可換であることが重要です。たとえば、もし補正が主となるアクションより前に実行されたとしても、同じ結果となる必要があります。

もう 1 つの取りうるアプローチは、振る舞いを可換にするのが不可能な場合に、主となるアクションで生成されるデータが見つかるまで (または最大再試行回数に到達するまで) 補正アクションの中で失敗し続けるようにすることです。このアプローチは多くの状況で機能するかもしれませんが、ヒューリスティックになります。

クレジット (残高) サービスは、注文サービスとほぼ同様に実装されます。

from("direct:reserveCredit")
  .saga()
  .propagation(SagaPropagation.MANDATORY)
  .compensation("direct:refundCredit")
    .transform().header(Exchange.SAGA_LONG_RUNNING_ACTION)
    .bean(creditService, "reserveCredit")
    .log("Credit ${header.amount} reserved in action ${body}");

補正アクションは以下のとおりです。

from("direct:refundCredit")
  .transform().header(Exchange.SAGA_LONG_RUNNING_ACTION)
  .bean(creditService, "refundCredit")
  .log("Credit for action ${body} refunded");

ここでは、クレジットの予約に対する補正アクションは予約解除 (refund) です。

9.4.1. 完了イベントの取り扱い

Saga の完了時には、何らかの処理が必要となります。何らかの問題が発生して Saga がキャンセルされた場合には、補正エンドポイントが呼び出されます。Saga が正常に完了した場合は、完了エンドポイント を呼び出して追加の処理を行うことができます。たとえば、上記の注文サービスでは、実際に注文の準備を開始するために、注文がいつ完了したか (そしてクレジット (残高) が予約されたか) を知る必要があることがあります。支払いが完了していないのに、注文の準備を開始したくはありません (最新の CPU のように、読み込み権限を確認する前に予約メモリーへのアクセス権を与えてしまうのとは異なります) 。これは、direct:newOrder エンドポイントを変更すると簡単に実現できます。

  1. 完了エンドポイントを呼び出します。
from("direct:newOrder")
  .saga()
  .propagation(SagaPropagation.MANDATORY)
  .compensation("direct:cancelOrder")
  .completion("direct:completeOrder")
    .transform().header(Exchange.SAGA_LONG_RUNNING_ACTION)
    .bean(orderManagerService, "newOrder")
    .log("Order ${body} created");
  1. direct:cancelOrder は直前の例と同じです。正常に完了した場合は以下のとおりです。
from("direct:completeOrder")
  .transform().header(Exchange.SAGA_LONG_RUNNING_ACTION)
  .bean(orderManagerService, "findExternalId")
  .to("jms:prepareOrder")
  .log("Order ${body} sent for preparation");

Saga が完了すると、注文は準備のために JMS キューに送信されます。補正アクションと同様、完了アクションも Saga のコーディネーターによって複数回呼び出される可能性があります (特にネットワークエラーなどのエラーが発生した場合) 。この例では、 prepareOrder JMS キューをリッスンするサービスが重複を受け取る可能性について備えています (重複をどのように扱うかについての例は、Idempotent Consumer EIP を参照してください)。

9.4.2. カスタム識別子とオプションの使用

Saga のいくつかのオプションを使用してカスタム識別子を登録することができます。たとえば、クレジット (残高) サービスは以下のようにリファクタリングされます。

  1. 以下のように、カスタム ID を生成してボディーに設定します。
from("direct:reserveCredit")
  .bean(idService, "generateCustomId")
  .to("direct:creditReservation")
  1. アクションを委譲し、現在のボディーを補正アクションに必要なものとしてマークします。
from("direct:creditReservation")
  .saga()
  .propagation(SagaPropagation.SUPPORTS)
  .option("CreditId", body())
  .compensation("direct:creditRefund")
    .bean(creditService, "reserveCredit")
    .log("Credit ${header.amount} reserved. Custom Id used is ${body}");
  1. Saga がキャンセルされた場合のみ、ヘッダーから CreditId オプションを取得します。
from("direct:creditRefund")
  .transform(header("CreditId")) // retrieve the CreditId option from headers
  .bean(creditService, "refundCredit")
  .log("Credit for Custom Id ${body} refunded");

direct:creditReservation エンドポイントは、伝搬モードを SUPPORTS に設定することで Saga の外から呼び出すことができます。このようにして、複数のオプションを 1 つの Saga ルートの中で宣言できます。

9.4.3. タイムアウトの設定

Saga EIP でタイムアウトを設定することで、マシン障害の発生時に Saga が永久に停止したままにならないことが保証されます。Saga EIP の実装では、明示的にタイムアウトを指定していないすべての Saga EIP に対してデフォルトのタイムアウトが設定されます。タイムアウトの期限が切れると、Saga EIP はそれ以前に異なる決定がなされていない限り、Saga のキャンセル (およびすべての参加者への補正) を決定します。

タイムアウトは、Saga の参加者に対して以下のように設定できます。

from("direct:newOrder")
  .saga()
  .timeout(1, TimeUnit.MINUTES) // newOrder requires that the saga is completed within 1 minute
  .propagation(SagaPropagation.MANDATORY)
  .compensation("direct:cancelOrder")
  .completion("direct:completeOrder")
    // ...
    .log("Order ${body} created");

すべての参加者 (クレジット (残高) サービス、注文サービスなど) は、それぞれ独自のタイムアウトを設定できます。これらの参加者が 1 つに設定されたときに、それらのタイムアウトの中の最小値が Saga のタイムアウトとなります。タイムアウトは、以下のように Saga レベルで指定することもできます。

from("direct:buy")
  .saga()
  .timeout(5, TimeUnit.MINUTES) // timeout at saga level
    .to("direct:newOrder")
    .to("direct:reserveCredit");

9.4.4. 伝播方法の選択

上記の例では、MANDATORY および SUPPORTS の伝搬モードを使用していますが、他に何も指定されなかった場合に使用されるデフォルトの伝播モードである REQUIRED もあります。これらの伝搬モードは、トランザクションの文脈で使用される同等のモードと 1 対 1 に対応します。

伝搬方法説明

REQUIRED

既存の Saga に参加するか、または存在しない場合は新しい Saga を作成します。

REQUIRES_NEW

常に新しい Saga を作成します。古い Saga は一時停止し、新しい Saga が終了したときに再開します。

MANDATORY

すでに Saga が存在している必要があります。既存の Saga に参加します。

SUPPORTS

Saga がすでに存在している場合は参加します。

NOT_SUPPORTED

Saga がすでに存在している場合は一時停止し、現在のブロックが完了したときに再開します。

NEVER

現在のブロックを Saga 内で呼び出すことはできません。

9.4.5. 手動完了の使用 (高度な設定)

Saga をすべて同期的に実行できず、たとえば非同期通信チャネルを使用した外部サービスとの通信などが必要となる場合には、完了モードを AUTO (デフォルト) には設定できません。これは、Saga を作成したエクスチェンジが完了した時点ではその Saga は完了していないためです。実行期間が長い (数時間、数日) Saga EIP では、このようなことがよくあります。このような場合には、MANUAL 完了モードを使用する必要があります。

from("direct:mysaga")
  .saga()
  .completionMode(SagaCompletionMode.MANUAL)
  .completion("direct:finalize")
  .timeout(2, TimeUnit.HOURS)
    .to("seda:newOrder")
    .to("seda:reserveCredit");

seda:newOrder および seda:reserveCredit に非同期の処理を追加します。これらは seda:operationCompleted に非同期のコールバックを送信します。

from("seda:operationCompleted") // an asynchronous callback
  .saga()
  .propagation(SagaPropagation.MANDATORY)
    .bean(controlService, "actionExecuted")
    .choice()
      .when(body().isEqualTo("ok"))
        .to("saga:complete") // complete the current saga manually (saga component)
    .end()

direct:finalize エンドポイントを追加することで、最終のアクションを実行できます。

完了モードを MANUAL に設定すると、ルート direct:mysaga でエクスチェンジが処理されても Saga が完了せずに持続します (最大持続時間は 2 時間に設定されます)。非同期アクションが両方とも完了することで、Saga が完了します。完了の呼び出しは、Camel Saga コンポーネントの saga :complete エンドポイントを使用して行われます。手動で Saga を補正するための同様のエンドポイントがあります (saga:compensate) 。