9.4. 例子

例如,您想要在系统中放置新顺序,并且在您的系统上有两个不同的服务:一个管理订单和一个管理该信用。逻辑上,如果您有足够的信用,可以放置一个订购。通过 Saga EIP,您可以对 直接:buy 路由建模为由两个不同的操作的 Saga 组成,一个用于创建顺序,另一个用于取信。这两个操作都必须执行,或者任何 一个没有信用的订单都被视为不一致的结果(以及没有订单的付款)。

from("direct:buy")
  .saga()
    .to("direct:newOrder")
    .to("direct:reserveCredit");

对于其余示例,购买操作不会改变。用于对 New Order 和 Reserve Credit 操作建模的不同选项如下:

from("direct:newOrder")
  .saga()
  .propagation(SagaPropagation.MANDATORY)
  .compensation("direct:cancelOrder")
    .transform().header(Exchange.SAGA_LONG_RUNNING_ACTION)
    .bean(orderManagerService, "newOrder")
    .log("Order ${body} created");

此处的传播模式被设置为 MANDATORY 意味着此路由中的任何交换流都必须是 Saga 的一部分(在本例中是如此,因为 Saga 在 direct:buy 路由中创建)。direct:newOrder 路由声明了一个名为 direct:cancelOrder 的操作,在 Saga 被取消时负责撤销顺序。

每个交换始终都包含一个 Exchange.SAGA_LONG_RUNNING_ACTION 标头,该标头在此处用作顺序的 id。这标识了在相应的补偿操作中删除的顺序,但这不是要求(选项可用作替代解决方案)。direct:newOrder 的 compensating 操作为 direct:cancelOrder,它如下所示:

from("direct:cancelOrder")
  .transform().header(Exchange.SAGA_LONG_RUNNING_ACTION)
  .bean(orderManagerService, "cancelOrder")
  .log("Order ${body} cancelled");

在应取消订单时,Saga EIP 实现会自动调用它。它不会终止并显示错误。如果在 direct:cancelOrder 路由中抛出错误,则 EIP 实现应定期重试,以执行对特定限制的补偿操作。这意味着,任何补偿操作都必须是幂等的,因此应该考虑它可能会多次触发,任何情况都不应该失败。如果在所有重试后无法执行补偿,则 Saga 实现应触发手动干预过程。

注意

这可能是因为 direct:newOrder 路由执行延迟导致 Saga 在平均时间被另一方取消(由于并行路由中的错误或 Saga 级别超时)。因此,当调用 compensating action direct:cancelOrder 时,它可能无法找到取消的 Order 记录。务必要保证完全的全局一致性,例如,任何主要操作及其相应的补偿操作在主操作之前发生,例如,在应有同样效果的主要操作前进行。

另一种可能的方法(无法使用建议行为时),在找到主操作生成的数据之前,持续失败,直到主操作生成的数据被耗尽(或最大重试次数被耗尽)。这种方法可能在很多环境中工作,但其实质上是 个体

信用服务几乎与订购服务相同。

from("direct:reserveCredit")
  .saga()
  .propagation(SagaPropagation.MANDATORY)
  .compensation("direct:refundCredit")
    .transform().header(Exchange.SAGA_LONG_RUNNING_ACTION)
    .bean(creditService, "reserveCredit")
    .log("Credit ${header.amount} reserved in action ${body}");

对 compensation 操作调用:

from("direct:refundCredit")
  .transform().header(Exchange.SAGA_LONG_RUNNING_ACTION)
  .bean(creditService, "refundCredit")
  .log("Credit for action ${body} refunded");

此处为信用订票的补偿措施是退款的。

9.4.1. 处理完成事件

Saga 完成后,需要执行某些类型处理。当发生错误并且 Saga 被取消时,会调用补偿端点。当 Saga 成功完成时,可以调用完成端点来进一步处理。例如,在以上顺序服务中,我们可能需要知道订单何时完成(以及保留信用)来实际开始准备订单。如果没有付款,我们不希望开始准备订单(与大多数现代 CPU 不同,在确保您有权阅读前为您提供保留内存的访问权限)。这可以通过 direct:newOrder 端点的修改版本轻松完成:

  1. 调用完整端点:
from("direct:newOrder")
  .saga()
  .propagation(SagaPropagation.MANDATORY)
  .compensation("direct:cancelOrder")
  .completion("direct:completeOrder")
    .transform().header(Exchange.SAGA_LONG_RUNNING_ACTION)
    .bean(orderManagerService, "newOrder")
    .log("Order ${body} created");
  1. direct:cancelOrder 与上例中的相同。按如下方式调用成功完成:
from("direct:completeOrder")
  .transform().header(Exchange.SAGA_LONG_RUNNING_ACTION)
  .bean(orderManagerService, "findExternalId")
  .to("jms:prepareOrder")
  .log("Order ${body} sent for preparation");

完成 Saga 后,顺序将发送到 JMS 队列以进行准备。与补偿操作一样,Saga coordinator 可以多次调用完成操作(特别是出现错误时,如网络错误)。在本例中,侦听 prepareOrder JMS 队列的服务已准备好保存可能的重复数据(请参阅 Idempotent Consumer EIP 以了解如何处理重复的示例)。

9.4.2. 使用自定义标识符和选项

您可以使用 Saga 选项来注册自定义标识符。例如,信用服务被重构,如下所示:

  1. 生成自定义 ID 并在正文中设置它,如下所示:
from("direct:reserveCredit")
  .bean(idService, "generateCustomId")
  .to("direct:creditReservation")
  1. 在补偿操作中,根据需要委派操作并标记当前的正文。
from("direct:creditReservation")
  .saga()
  .propagation(SagaPropagation.SUPPORTS)
  .option("CreditId", body())
  .compensation("direct:creditRefund")
    .bean(creditService, "reserveCredit")
    .log("Credit ${header.amount} reserved. Custom Id used is ${body}");
  1. 仅在取消 saga 时从标头中检索 creditId 选项。
from("direct:creditRefund")
  .transform(header("CreditId")) // retrieve the CreditId option from headers
  .bean(creditService, "refundCredit")
  .log("Credit for Custom Id ${body} refunded");

direct:creditReservation 端点可以在 Saga 之外调用,方法是将传播模式设置为 SUPPORTS。这样,可在 Saga 路由中声明多个选项。

9.4.3. 设置超时

在 Saga EIPs 上设置超时可确保 Saga 在出现机器故障时不会永久卡住。Saga EIP 实现在所有未明确指定的 Saga EIP 上设置了一个默认超时。当超时到期时,Saga EIP 将决定 取消 Saga (并补偿所有参与者),除非之前已进行了不同的决定。

Saga 参与中可以设置超时,如下所示:

from("direct:newOrder")
  .saga()
  .timeout(1, TimeUnit.MINUTES) // newOrder requires that the saga is completed within 1 minute
  .propagation(SagaPropagation.MANDATORY)
  .compensation("direct:cancelOrder")
  .completion("direct:completeOrder")
    // ...
    .log("Order ${body} created");

所有参与者(例如,信用服务、订单服务)都可以设置自己的超时。这些超时的最小值是 saga 的超时时间,当它们被组成时。也可以在 Saga 级别中指定超时,如下所示:

from("direct:buy")
  .saga()
  .timeout(5, TimeUnit.MINUTES) // timeout at saga level
    .to("direct:newOrder")
    .to("direct:reserveCredit");

9.4.4. 选择传播

在上例中,我们使用了 MANDATORYSUPPORTS 传播模式,但也使用 REQUIRED 传播模式,这是未指定其他时所用的默认传播模式。这些传播模式映射了 1:1,这是事务上下文中使用的等效模式。

propagation描述

必需

加入现有的 Saga 或创建新 Saga (如果不存在)。

REQUIRES_NEW

始终创建新 Saga。挂起旧的 Saga,并在新终止时恢复它。

必需

必须已存在 Saga。现有 Saga 已加入。

支持

如果 Saga 已存在,则加入它。

NOT_SUPPORTED

如果 Saga 已存在,它会在当前块完成后暂停并恢复。

NEVER

当前块不能在 Saga 中调用。

9.4.5. 使用手动完成(高级)

当 Saga 无法以同步方式执行时,但需要与使用异步通信通道的外部服务通信时,无法将完成模式设置为 AUTO (默认),因为在创建的交换时 Saga 不会完成。对于 Saga EIP,通常会有较长的执行时间(小时、天)。在这些情况下,应使用 MANUAL 完成模式。

from("direct:mysaga")
  .saga()
  .completionMode(SagaCompletionMode.MANUAL)
  .completion("direct:finalize")
  .timeout(2, TimeUnit.HOURS)
    .to("seda:newOrder")
    .to("seda:reserveCredit");

为 seda:newOrder 和 seda:reserveCredit 添加异步处理。它们将异步回调发送到 seda:operationCompleted。

from("seda:operationCompleted") // an asynchronous callback
  .saga()
  .propagation(SagaPropagation.MANDATORY)
    .bean(controlService, "actionExecuted")
    .choice()
      .when(body().isEqualTo("ok"))
        .to("saga:complete") // complete the current saga manually (saga component)
    .end()

您可以添加 direct:finalize 端点来执行最终操作。

将完成模式设置为 MANUAL 表示当交换在路由 direct:mysaga 中处理时,不会完成 Saga,但它将持续更长(最大持续时间设置为 2 小时)。当两个异步操作都完成时,Saga 已完成。调用完成方法是使用 Camel Saga 组件的 saga:complete 端点完成。还有类似的端点用于手动补偿 Saga (saga:compensate)。