9.4. 例子

例如,您要放置一个新顺序,并在系统中有两个不同的服务:一个管理订单和一个管理学点。如果您有足够的学分,以逻辑方式排列一个订单。通过 Saga EIP,您可以将直接建模为 直接:buy route as a Saga aga aga aga aga EIP,一个用于创建顺序,另一个用于推行学。必须执行这两个操作,或记录 它们没有顺序,而没有字母顺序就可能被视为不一致的结果(以及没有订单的支付)。

from("direct:buy")
  .saga()
    .to("direct:newOrder")
    .to("direct:reserveCredit");

对于剩余的示例,购买操作不会改变。用于为 New Order 和 Reserve Point 操作建模不同的选项如下:

from("direct:newOrder")
  .saga()
  .propagation(SagaPropagation.MANDATORY)
  .compensation("direct:cancelOrder")
    .transform().header(Exchange.SAGA_LONG_RUNNING_ACTION)
    .bean(orderManagerService, "newOrder")
    .log("Order ${body} created");

这里的 propagation 模式被设置为 MANDATORY。此路由中的任何交换流都必须已经属于 Saga(在此示例中是该例中,因为 Saga 在 direct:buy 路由中创建)。direct:newOrder 路由声明了一个名为 direct:cancelOrder 的编译操作,负责撤销 Saga 取消的顺序。

每个交换总是包含一个 Exchange.SAGA_LONG_RUNNING_ACTION 标头,这里用作订单的 id。这标识在相应的编译操作中要删除的顺序,但它不是要求(选项可用作替代解决方案)。跳过 direct:newOrder 的过滤操作是 direct:cancelOrder,它如下所示:

from("direct:cancelOrder")
  .transform().header(Exchange.SAGA_LONG_RUNNING_ACTION)
  .bean(orderManagerService, "cancelOrder")
  .log("Order ${body} cancelled");

当订购应该被取消时,Saga EIP 实施会自动调用。它不会以错误结尾。如果在 direct:cancelOrder 路由中引发错误,EIP 实施应定期重试,以将操作编译到特定限制。这意味着,任何处理操作都必须是幂等的,因此应该考虑它可以被多次触发,且不应在任何情况下失败。如果在所有重试后无法进行编译,则 Saga 实施应触发手动干预过程。

注意

这可能是因为执行 直接:newOrder 路由中的延迟而发生,因此 Saga 在平均时间里被另一方取消(因为并行路由出现错误或 Saga 一级的超时)。因此,当调用编译操作 direct:cancelOrder 时,它可能无法找到被取消的 Order 记录。为了保证完全全局一致性,任何主要操作及其对应的过滤操作都是合情,例如,在主操作 应有相同的影响前,如果对操作进行编译,这很重要。

另一种可能的解决方法是不可能在编译操作中持续失败,直到找到主操作生成的数据(或者重试次数上限)。这种方法可以在很多上下文中工作,但这是一个极端

支持服务的实施方式几乎与订购服务相同。

from("direct:reserveCredit")
  .saga()
  .propagation(SagaPropagation.MANDATORY)
  .compensation("direct:refundCredit")
    .transform().header(Exchange.SAGA_LONG_RUNNING_ACTION)
    .bean(creditService, "reserveCredit")
    .log("Credit ${header.amount} reserved in action ${body}");

调用 compensation 操作:

from("direct:refundCredit")
  .transform().header(Exchange.SAGA_LONG_RUNNING_ACTION)
  .bean(creditService, "refundCredit")
  .log("Credit for action ${body} refunded");

这里,对信用卡保留的换算行动是一个退还。

9.4.1. 处理编译事件

当 Saga 完成后,需要进行一些处理。当发生错误并取消 Saga 时,会调用相应的端点。可以调用 完成端点,以便在成功完成 Saga 时进一步处理。例如,根据以上顺序服务,可能需要知道订单已完成(保留的信度)以实际开始准备订单。如果没有完成支付,我们不想开始准备订单(与大多数现代 CPU 一样,可让您访问保留内存后再确保您有权读它)。这可以通过直接修改的 version :newOrder 端点轻松完成

  1. 调用完整的端点:
from("direct:newOrder")
  .saga()
  .propagation(SagaPropagation.MANDATORY)
  .compensation("direct:cancelOrder")
  .completion("direct:completeOrder")
    .transform().header(Exchange.SAGA_LONG_RUNNING_ACTION)
    .bean(orderManagerService, "newOrder")
    .log("Order ${body} created");
  1. direct:cancelOrder 与上例中的相同。在成功完成时调用,如下所示:
from("direct:completeOrder")
  .transform().header(Exchange.SAGA_LONG_RUNNING_ACTION)
  .bean(orderManagerService, "findExternalId")
  .to("jms:prepareOrder")
  .log("Order ${body} sent for preparation");

Saga 完成后,订购将发送到 JMS 队列以进行准备。与处理操作一样,Saga 协调器(特别是针对错误,比如网络错误)可以多次调用完成操作。在本例中,侦听 prepareOrder JMS 队列的服务已准备好容纳可能的重复状态(请参阅 Idempent Consumer EIP EIP 如何处理重复)。

9.4.2. 使用自定义标识符和选项

您可以使用 Saga 选项来注册自定义标识符。例如,贡献度服务被重构,如下所示:

  1. 生成自定义 ID 并在正文中设置它,如下所示:
from("direct:reserveCredit")
  .bean(idService, "generateCustomId")
  .to("direct:creditReservation")
  1. 在编写操作中需要委派操作,并根据需要标记当前的正文。
from("direct:creditReservation")
  .saga()
  .propagation(SagaPropagation.SUPPORTS)
  .option("CreditId", body())
  .compensation("direct:creditRefund")
    .bean(creditService, "reserveCredit")
    .log("Credit ${header.amount} reserved. Custom Id used is ${body}");
  1. 仅在 saga 被取消时从标头检索 creditId 选项。
from("direct:creditRefund")
  .transform(header("CreditId")) // retrieve the CreditId option from headers
  .bean(creditService, "refundCredit")
  .log("Credit for Custom Id ${body} refunded");

可以在 Saga 外部调用 direct:creditReservation 端点,方法是将 propagation 模式设置为 SUPPORTS。这样,可以在 Saga 路由中声明多个选项。

9.4.3. 设置超时

在 Saga EIP 中设置超时可确保 Saga 在机器发生故障时不会一直处于卡住状态。Saga EIP 实施在未明确指定它的所有 Saga EIP 上设置了默认超时。当超时过期时,Saga EIP 将决定 取消 Saga (并合约所有参与者),除非之前已进行了不同的决定。

在 Saga 参与者中可以按照如下方式设置超时:

from("direct:newOrder")
  .saga()
  .timeout(1, TimeUnit.MINUTES) // newOrder requires that the saga is completed within 1 minute
  .propagation(SagaPropagation.MANDATORY)
  .compensation("direct:cancelOrder")
  .completion("direct:completeOrder")
    // ...
    .log("Order ${body} created");

所有报名者(如服务、订单服务)都可以设置自己的超时。当由 saga 组成时,这些超时的最小值会作为超时。也可以在 Saga 级别指定超时,如下所示:

from("direct:buy")
  .saga()
  .timeout(5, TimeUnit.MINUTES) // timeout at saga level
    .to("direct:newOrder")
    .to("direct:reserveCredit");

9.4.4. 选择传播

在上面的示例中,我们使用了 MANDATORYSUPPORTS 传播模式,也使用了 REQUIRED 传播模式,这是没有指定其他时使用的默认传播模式。这些传播模式映射 1:1 在事务上下文中使用的等效模式。

Propagation描述

必需

加入现有 Saga 或创建新 Saga(如果不存在)。

REQUIRES_NEW

始终创建一个新的 Saga。挂起旧的 Saga,并在新卷终止时恢复它。

必需

Saga 必须已经存在。现有 Saga 已加入。

支持

如果 Saga 已存在,请加入它。

NOT_SUPPORTED

如果 Saga 已存在,它会在当前块完成后暂停并恢复。

NEVER

在 Saga 中不能调用当前块。

9.4.5. 使用手动完成(高级)

当 Saga 无法以同步的方式执行,但需要它,例如:使用异步通信频道与外部服务通信时,无法将完成模式设置为 AUTO (默认),因为 Saga 在创建交换时没有完成。这通常是具有长执行时间(小时、天)的 Saga EIP。在这些情况下,应使用 MANUAL 完成模式。

from("direct:mysaga")
  .saga()
  .completionMode(SagaCompletionMode.MANUAL)
  .completion("direct:finalize")
  .timeout(2, TimeUnit.HOURS)
    .to("seda:newOrder")
    .to("seda:reserveCredit");

为 seda:newOrder 和 seda:reserveCredit 添加异步处理。这些将异步回调发送到 seda:operationCompleted。

from("seda:operationCompleted") // an asynchronous callback
  .saga()
  .propagation(SagaPropagation.MANDATORY)
    .bean(controlService, "actionExecuted")
    .choice()
      .when(body().isEqualTo("ok"))
        .to("saga:complete") // complete the current saga manually (saga component)
    .end()

您可以添加 direct:finalize 端点来执行最终操作。

将完成模式设置为 MANUAL 表示,当路由 direct:mysaga 中处理交换时,Saga 不会被完成,但最后会更长时间(最大持续时间设置为 2 小时)。当两个异步操作都完成后,Saga 已完成。要完成的调用是使用 Camel Saga Component saga:complete 端点完成的。有一个类似的端点用于手动处理 Saga(saga:compensate)。