9.4. 例

たとえば、新規注文を行うとします。システムには異なる 2 つのサービスである、注文を管理するサービスとクレジット (残高) を管理するサービスがあるとします。ロジックとしては、十分なクレジット (残高) がある場合には注文を行うことができます。Saga EIP を使用すると、direct:buy ルートを 2 つの異なるアクションで設定される Saga としてモデル化できます。1 つ目は注文の作成、もう 1 つ目はクレジット (残高) の確保です。どちらのアクションも実行されるか、またはどちらも実行されないかのいずれかである必要があります。クレジット (残高) がないのに注文が行われるのは、不整合な結果と見なされるからです (注文がないのに支払いが行われるのも同様です)。

from("direct:buy")
  .saga()
    .to("direct:newOrder")
    .to("direct:reserveCredit");

この Buy アクションについては、これ以降の例の中で変更はありません。New Order および Reserve Credit アクションをモデリングする上で使用されるオプションは以下のとおりです。

from("direct:newOrder")
  .saga()
  .propagation(SagaPropagation.MANDATORY)
  .compensation("direct:cancelOrder")
    .transform().header(Exchange.SAGA_LONG_RUNNING_ACTION)
    .bean(orderManagerService, "newOrder")
    .log("Order ${body} created");

ここでは、伝搬モードは MANDATORY に設定されています。これは、このルートを流れるエクスチェンジはすでに Saga の一部でなければならないことを意味します (この例では、direct:buy ルートで Saga が作成されているため条件を満たします)。direct:newOrder ルートは direct:cancelOrder と呼ばれる補正アクションを宣言しており、Saga がキャンセルされた場合に注文の取り消しを行います。

各エクスチェンジには常に Exchange.SAGA_LONG_RUNNING_ACTION ヘッダーが含まれ、ここでは注文の ID として使用されます。この ID は対応する補正アクション内で削除すべき注文を特定するのに使われますが、必須ではありません (オプションで代替ソリューションを使用できます) 。direct:newOrder の補正アクションは direct:cancelOrder で、以下のようになります。

from("direct:cancelOrder")
  .transform().header(Exchange.SAGA_LONG_RUNNING_ACTION)
  .bean(orderManagerService, "cancelOrder")
  .log("Order ${body} cancelled");

このアクションは、注文を取り消す必要があるときに Saga EIP 実装によって自動的に呼び出されます。このアクションはエラーで終了することはありません。direct:cancelOrder ルートでエラーが出力された場合、EIP 実装は一定の上限回数まで補正アクションの実行を定期的に再試行します。つまり、補正アクションはべき等でなければならないことを意味します。アクションが複数回トリガーされる可能性を考慮に入れる必要があり、またどのような場合でも失敗しないようにする必要があります。再試行回数の上限に達しても補正アクションが終わらなかった場合には、手動による介入プロセスを Saga 実装からトリガーする必要があります。

注記

direct:newOrder ルートの実行に遅延が生じたために、その間に他の参加者によって Saga がキャンセルされることがあります (並列実行中のルートでのエラーや、Saga レベルでのタイムアウトなど)。そのため、補正アクション direct:cancelOrder が呼び出されたときには、キャンセルされた Order レコードが見つからないことがあります。完全にグローバルな一貫性を保証するためには、主となるアクションとそれに対応した補正アクションが可換であることが重要です。たとえば、もし補正が主となるアクションより前に実行されたとしても、同じ結果となる必要があります。

もう 1 つの取りうるアプローチは、振る舞いを可換にするのが不可能な場合に、主となるアクションで生成されるデータが見つかるまで (または最大再試行回数に到達するまで) 補正アクションの中で失敗し続けるようにすることです。このアプローチは多くの状況で機能するかもしれませんが、ヒューリスティックになります。

クレジット (残高) サービスは、注文サービスとほぼ同様に実装されます。

from("direct:reserveCredit")
  .saga()
  .propagation(SagaPropagation.MANDATORY)
  .compensation("direct:refundCredit")
    .transform().header(Exchange.SAGA_LONG_RUNNING_ACTION)
    .bean(creditService, "reserveCredit")
    .log("Credit ${header.amount} reserved in action ${body}");

補正アクションは以下のとおりです。

from("direct:refundCredit")
  .transform().header(Exchange.SAGA_LONG_RUNNING_ACTION)
  .bean(creditService, "refundCredit")
  .log("Credit for action ${body} refunded");

ここでは、クレジットの予約に対する補正アクションは予約解除 (refund) です。

9.4.1. 完了イベントの取り扱い

Saga の完了時には、何らかの処理が必要となります。何らかの問題が発生して Saga がキャンセルされた場合には、補正エンドポイントが呼び出されます。Saga が正常に完了した場合は、完了エンドポイント を呼び出して追加の処理を行うことができます。たとえば、上記の注文サービスでは、実際に注文の準備を開始するために、注文がいつ完了したか (そしてクレジット (残高) が予約されたか) を知る必要があることがあります。支払いが完了していないのに、注文の準備を開始したくはありません (最新の CPU のように、読み込み権限を確認する前に予約メモリーへのアクセス権を与えてしまうのとは異なります) 。これは、direct:newOrder エンドポイントを変更すると簡単に実現できます。

  1. 完了エンドポイントを呼び出します。
from("direct:newOrder")
  .saga()
  .propagation(SagaPropagation.MANDATORY)
  .compensation("direct:cancelOrder")
  .completion("direct:completeOrder")
    .transform().header(Exchange.SAGA_LONG_RUNNING_ACTION)
    .bean(orderManagerService, "newOrder")
    .log("Order ${body} created");
  1. direct:cancelOrder は直前の例と同じです。正常に完了した場合は以下のとおりです。
from("direct:completeOrder")
  .transform().header(Exchange.SAGA_LONG_RUNNING_ACTION)
  .bean(orderManagerService, "findExternalId")
  .to("jms:prepareOrder")
  .log("Order ${body} sent for preparation");

Saga が完了すると、注文は準備のために JMS キューに送信されます。補正アクションと同様、完了アクションも Saga のコーディネーターによって複数回呼び出される可能性があります (特にネットワークエラーなどのエラーが発生した場合) 。この例では、 prepareOrder JMS キューをリッスンするサービスが重複を受け取る可能性について備えています (重複をどのように扱うかについての例は、Idempotent Consumer EIP を参照してください)。