Chapter 9. Saga EIP

9.1. Overview

The Saga EIP provides a way to define a series of related actions in a Camel route that are either all completed successfully or else not executed or compensated. Saga implementations coordinate distributed services, which can communicate over any transport, towards a globally consistent outcome. Saga EIPs are different from classical ACID distributed (XA) transactions because the status of the different participating services is guaranteed to be consistent only at the end of the Saga and not in any intermediate step.

Saga EIPs are suitable for use cases where distributed transactions are discouraged. For example, services participating in a Saga can use any kind of datastore, such as classical databases or even non-transactional NoSQL datastores. They are also suitable for stateless cloud services, because they do not require a transaction log to be stored alongside the service. Unlike transactions, Saga EIPs do not need to complete in a short amount of time, because they do not use database-level locks. Hence they can live for a longer time span, from a few seconds to several days.

Saga EIPs do not use locks on data. Instead they define the concept of Compensating Action, which is an action that should be executed when the standard flow encounters an error, with the purpose of restoring the status that was present before the flow execution. Compensating actions can be declared in Camel routes using the Java or XML DSL and are invoked by Camel only when needed (if the saga is canceled due to an error).
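
To make the idea of a compensating action more concrete, the following is a minimal plain-Java sketch that does not use Camel at all. The class CompensationSketch and its Step type are hypothetical names introduced only for illustration. Each step carries an action and the action that undoes it; if a step fails, the undo actions of the steps that already succeeded are run. Running them most recent first is a choice made for this sketch, not a statement about how a Camel Saga service orders compensations.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical illustration only; this is not the Camel Saga API.
final class CompensationSketch {

    /** A step with a main action and the action that undoes it. */
    static final class Step {
        final Runnable action;
        final Runnable compensation;

        Step(Runnable action, Runnable compensation) {
            this.action = action;
            this.compensation = compensation;
        }
    }

    /**
     * Runs the steps in order. If one fails, runs the compensations of the
     * steps that already succeeded (most recent first) and returns false.
     */
    static boolean run(List<Step> steps) {
        Deque<Runnable> done = new ArrayDeque<>();
        for (Step step : steps) {
            try {
                step.action.run();
                done.push(step.compensation);
            } catch (RuntimeException e) {
                while (!done.isEmpty()) {
                    done.pop().run();
                }
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        boolean ok = run(List.of(
            new Step(() -> log.add("order created"), () -> log.add("order cancelled")),
            new Step(() -> { throw new IllegalStateException("not enough credit"); },
                     () -> log.add("credit refunded"))));
        System.out.println(ok + " " + log);
    }
}
```

In this sketch the second step fails, so only the compensation of the first step runs; the compensation of the failed step is never registered.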

9.2. Saga EIP Options

The Saga EIP supports 6 options which are listed below:

| Name | Description | Default | Type |
|---|---|---|---|
| propagation | Set the Saga propagation mode (REQUIRED, REQUIRES_NEW, MANDATORY, SUPPORTS, NOT_SUPPORTED, NEVER). | REQUIRED | SagaPropagation |
| completionMode | Determine how the Saga should be considered complete. When set to AUTO, the Saga is completed when the exchange that initiates the Saga is processed successfully, or compensated when it completes exceptionally. When set to MANUAL, the user must complete or compensate the Saga using the saga:complete or saga:compensate endpoints. | AUTO | SagaCompletionMode |
| timeoutInMilliseconds | Set the maximum amount of time for the Saga. After the timeout expires, the Saga is compensated automatically (unless a different decision has been taken in the meantime). | | Long |
| compensation | The compensation endpoint URI that must be called to compensate all changes done in the route. The route corresponding to the compensation URI must perform compensation and complete without error. If an error occurs during compensation, the Saga service calls the compensation URI again to retry. | | SagaActionUriDefinition |
| completion | The completion endpoint URI that is called when the Saga is completed successfully. The route corresponding to the completion URI must perform completion tasks and terminate without error. If an error occurs during completion, the Saga service calls the completion URI again to retry. | | SagaActionUriDefinition |
| option | Allows saving properties of the current exchange in order to reuse them in a compensation or completion callback route. Options are usually helpful, for example, to store and retrieve identifiers of objects that are deleted in compensating actions. Option values are transformed into input headers of the compensation/completion exchange. | | List |

9.3. Saga Service Configuration

The Saga EIP requires that a service implementing the interface org.apache.camel.saga.CamelSagaService is added to the Camel context. Camel currently supports the following Saga Service:

  • InMemorySagaService: This is a basic implementation of the Saga EIP that does not support advanced features (no remote context propagation, no consistency guarantee in case of application failure).

9.3.1. Using the In-Memory Saga Service

The in-memory Saga service is not recommended for production environments because it does not persist the Saga status (it is kept only in memory), so it cannot guarantee consistency of the Saga EIPs in case of application failure (for example, a JVM crash). Also, when using an in-memory Saga service, Saga contexts cannot be propagated to remote services using transport-level headers (this can be done with other implementations). To use the in-memory Saga service, add the following code to customize the Camel context. The service belongs to the camel-core module.

context.addService(new org.apache.camel.impl.saga.InMemorySagaService());

9.4. Examples

For example, suppose you want to place a new order and you have two distinct services in your system: one managing the orders and one managing the credit. Logically, you can place an order only if you have enough credit for it. With the Saga EIP you can model the direct:buy route as a Saga composed of two distinct actions, one to create the order and one to take the credit. Either both actions are executed or neither is, because an order placed without credit is an inconsistent outcome (as is a payment without an order).

from("direct:buy")
  .saga()
    .to("direct:newOrder")
    .to("direct:reserveCredit");

The buy action does not change for the rest of the examples. The New Order and Reserve Credit actions are modeled using different options, as follows:

from("direct:newOrder")
  .saga()
  .propagation(SagaPropagation.MANDATORY)
  .compensation("direct:cancelOrder")
    .transform().header(Exchange.SAGA_LONG_RUNNING_ACTION)
    .bean(orderManagerService, "newOrder")
    .log("Order ${body} created");

Here the propagation mode is set to MANDATORY meaning that any exchange flowing in this route must be already part of a Saga (and it is the case in this example, since the Saga is created in the direct:buy route). The direct:newOrder route declares a compensating action that is called direct:cancelOrder, responsible for undoing the order in case the Saga is canceled.

Each exchange always contains an Exchange.SAGA_LONG_RUNNING_ACTION header, which is used here as the ID of the order. This identifies the order to delete in the corresponding compensating action, but it is not a requirement (options can be used as an alternative solution). The compensating action of direct:newOrder is direct:cancelOrder and it is shown below:

from("direct:cancelOrder")
  .transform().header(Exchange.SAGA_LONG_RUNNING_ACTION)
  .bean(orderManagerService, "cancelOrder")
  .log("Order ${body} cancelled");

It is called automatically by the Saga EIP implementation when the order should be cancelled. It must complete without an error. If an error is thrown in the direct:cancelOrder route, the EIP implementation should periodically retry the compensating action, up to a certain limit. This means that any compensating action must be idempotent: it should take into account that it may be triggered multiple times and should not fail in any case. If compensation cannot be done after all retries, a manual intervention process should be triggered by the Saga implementation.
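
As an illustration of what idempotent means here, the following plain-Java sketch shows a cancel operation that can safely be called any number of times. The class IdempotentCancelSketch and its in-memory map are hypothetical stand-ins for whatever the orderManagerService bean does in a real application; the source does not show that bean.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical order store, used only to illustrate an idempotent compensation.
final class IdempotentCancelSketch {

    // Orders keyed by the saga identifier, as in the routes above.
    private final Map<String, String> orders = new ConcurrentHashMap<>();

    void newOrder(String sagaId, String item) {
        orders.put(sagaId, item);
    }

    /**
     * Cancels the order for the given saga id. Removing a missing key is a
     * no-op, so repeated calls leave the store unchanged and never throw.
     */
    void cancelOrder(String sagaId) {
        orders.remove(sagaId);
    }

    boolean exists(String sagaId) {
        return orders.containsKey(sagaId);
    }
}
```

Because cancelOrder never fails when the order is already gone, a retry after a partial failure or a duplicate delivery has no further effect.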

Note

It may happen that, due to a delay in the execution of the direct:newOrder route, the Saga is cancelled by another party in the meantime (due to an error in a parallel route or a timeout at Saga level). So, when the compensating action direct:cancelOrder is called, it may not find the Order record that should be cancelled. To guarantee full global consistency, it is important that any main action and its corresponding compensating action are commutative: that is, if compensation occurs before the main action, the final outcome should be the same.

Another possible approach, when using a commutative behavior is not possible, is to consistently fail in the compensating action until data produced by the main action is found (or the maximum number of retries is exhausted). This approach may work in many contexts, but it’s heuristic.
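
One way to obtain commutative behavior is to remember cancellations, so that a main action arriving late becomes a no-op. The following plain-Java sketch illustrates this under stated assumptions: the class CommutativeOrderSketch, its in-memory collections, and the idea of recording cancelled saga identifiers are all hypothetical and are not part of Camel or of the example services above.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration of a commutative main/compensating action pair.
final class CommutativeOrderSketch {

    private final Map<String, String> orders = new HashMap<>();
    private final Set<String> cancelled = new HashSet<>();

    /** Main action: creates the order unless the saga was already cancelled. */
    synchronized boolean newOrder(String sagaId, String item) {
        if (cancelled.contains(sagaId)) {
            return false; // compensation arrived first; do not create the order
        }
        orders.put(sagaId, item);
        return true;
    }

    /** Compensating action: removes the order and remembers the cancellation. */
    synchronized void cancelOrder(String sagaId) {
        cancelled.add(sagaId);
        orders.remove(sagaId);
    }

    synchronized boolean hasOrder(String sagaId) {
        return orders.containsKey(sagaId);
    }
}
```

Whether newOrder runs before or after cancelOrder for the same saga identifier, no order remains at the end, which is the property described in the note above.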

The credit service is implemented almost in the same way as the order service.

from("direct:reserveCredit")
  .saga()
  .propagation(SagaPropagation.MANDATORY)
  .compensation("direct:refundCredit")
    .transform().header(Exchange.SAGA_LONG_RUNNING_ACTION)
    .bean(creditService, "reserveCredit")
    .log("Credit ${header.amount} reserved in action ${body}");

The compensating action is defined as follows:

from("direct:refundCredit")
  .transform().header(Exchange.SAGA_LONG_RUNNING_ACTION)
  .bean(creditService, "refundCredit")
  .log("Credit for action ${body} refunded");

Here the compensating action for a credit reservation is a refund.

9.4.1. Handling Completion Events

Some type of processing may be required when the Saga is completed. Compensation endpoints are invoked when something goes wrong and the Saga is cancelled. Completion endpoints can be invoked to do further processing when the Saga is completed successfully. For example, in the order service above, we may need to know when the order is completed (and the credit reserved) to actually start preparing the order. We do not want to start to prepare the order if the payment is not done (unlike most modern CPUs that give you access to reserved memory before ensuring that you have rights to read it). This can be done easily with a modified version of the direct:newOrder endpoint:

  1. Invoke the completion endpoint:

from("direct:newOrder")
  .saga()
  .propagation(SagaPropagation.MANDATORY)
  .compensation("direct:cancelOrder")
  .completion("direct:completeOrder")
    .transform().header(Exchange.SAGA_LONG_RUNNING_ACTION)
    .bean(orderManagerService, "newOrder")
    .log("Order ${body} created");

  2. The direct:cancelOrder route is the same as in the previous example. The completion route, called when the Saga completes successfully, is as follows:

from("direct:completeOrder")
  .transform().header(Exchange.SAGA_LONG_RUNNING_ACTION)
  .bean(orderManagerService, "findExternalId")
  .to("jms:prepareOrder")
  .log("Order ${body} sent for preparation");

When the Saga is completed, the order is sent to a JMS queue for preparation. Like compensating actions, completion actions may also be called multiple times by the Saga coordinator (especially in case of errors, such as network errors). In this example, the service listening to the prepareOrder JMS queue is expected to handle possible duplicates (see the Idempotent Consumer EIP for examples on how to handle duplicates).
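
The following plain-Java sketch shows one simple way a consumer could tolerate duplicate deliveries, by remembering the identifiers it has already processed. It is only an illustration: the class PrepareOrderConsumerSketch is hypothetical, it keeps its state in memory, and it is not the Camel Idempotent Consumer EIP, which provides this behavior with pluggable repositories.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical consumer that ignores orders it has already prepared.
final class PrepareOrderConsumerSketch {

    private final Set<String> seen = new HashSet<>();
    private final List<String> prepared = new ArrayList<>();

    /** Returns true if the order was prepared, false if it was a duplicate. */
    synchronized boolean onMessage(String orderId) {
        if (!seen.add(orderId)) {
            return false; // already processed; skip the duplicate
        }
        prepared.add(orderId);
        return true;
    }

    /** Returns a copy of the orders prepared so far, in arrival order. */
    synchronized List<String> prepared() {
        return new ArrayList<>(prepared);
    }
}
```

With this approach, a completion action that is retried by the coordinator results in the order being prepared only once.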

9.4.2. Using Custom Identifiers and Options

You can use Saga options to register custom identifiers. For example, the credit service is refactored as follows:

  1. Generate a custom ID and set it in the body as follows:

from("direct:reserveCredit")
  .bean(idService, "generateCustomId")
  .to("direct:creditReservation");

  2. Delegate the action and mark the current body as needed in the compensating action:

from("direct:creditReservation")
  .saga()
  .propagation(SagaPropagation.SUPPORTS)
  .option("CreditId", body())
  .compensation("direct:creditRefund")
    .bean(creditService, "reserveCredit")
    .log("Credit ${header.amount} reserved. Custom Id used is ${body}");

  3. Retrieve the CreditId option from the headers only if the Saga is cancelled:

from("direct:creditRefund")
  .transform(header("CreditId")) // retrieve the CreditId option from headers
  .bean(creditService, "refundCredit")
  .log("Credit for Custom Id ${body} refunded");

Because the propagation mode is set to SUPPORTS, the direct:creditReservation endpoint can also be called outside of a Saga. Multiple options can be declared in a Saga route.
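
The way options carry values from the main action to a later callback can be pictured with the following plain-Java sketch. It is a simplified model under stated assumptions, not Camel code: the class SagaOptionSketch and its methods are hypothetical. The point it illustrates is that the option expression is evaluated against the body seen by the main action, and the resulting value is later made available as a header of the compensation or completion exchange.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical model of how option values reach a callback; not the Camel API.
final class SagaOptionSketch {

    private final Map<String, Function<Object, Object>> expressions = new LinkedHashMap<>();
    private final Map<String, Object> captured = new HashMap<>();

    /** Declares an option whose value is computed from the message body. */
    SagaOptionSketch option(String name, Function<Object, Object> expression) {
        expressions.put(name, expression);
        return this;
    }

    /** Evaluates every declared option against the body seen by the main action. */
    void enlist(Object body) {
        expressions.forEach((name, expression) -> captured.put(name, expression.apply(body)));
    }

    /** Builds the headers a compensation or completion callback would receive. */
    Map<String, Object> callbackHeaders() {
        return new HashMap<>(captured);
    }
}
```

In terms of the routes above, declaring option("CreditId", body -> body) and enlisting with the generated custom ID as the body yields a CreditId header holding that ID when the callback runs.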

9.4.3. Setting Timeouts

Setting timeouts on Saga EIPs guarantees that a Saga does not remain stuck forever in the case of machine failure. The Saga EIP implementation has a default timeout set on all Saga EIPs that do not specify it explicitly. When the timeout expires, the Saga EIP will decide to cancel the Saga (and compensate all participants), unless a different decision has been taken before.

Timeouts can be set on Saga participants as follows:

from("direct:newOrder")
  .saga()
  .timeout(1, TimeUnit.MINUTES) // newOrder requires that the saga is completed within 1 minute
  .propagation(SagaPropagation.MANDATORY)
  .compensation("direct:cancelOrder")
  .completion("direct:completeOrder")
    // ...
    .log("Order ${body} created");

All participants (for example, credit service, order service) can set their own timeout. When they are composed together, the minimum of those timeouts is taken as the timeout for the Saga. A timeout can also be specified at the Saga level as follows:

from("direct:buy")
  .saga()
  .timeout(5, TimeUnit.MINUTES) // timeout at saga level
    .to("direct:newOrder")
    .to("direct:reserveCredit");
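
The rule that the smallest declared timeout applies can be expressed in a few lines of plain Java. The class SagaTimeoutSketch below is a hypothetical helper written for illustration; it only models the rule described in this section and does not call any Camel API.

```java
import java.time.Duration;
import java.util.Collection;
import java.util.Optional;

// Hypothetical helper that models the "minimum timeout wins" rule.
final class SagaTimeoutSketch {

    /** Returns the smallest declared timeout, or empty if none was declared. */
    static Optional<Duration> effectiveTimeout(Collection<Duration> declared) {
        return declared.stream().min(Duration::compareTo);
    }
}
```

For the routes in this section, a 5-minute timeout at Saga level combined with the 1-minute timeout declared by direct:newOrder gives an effective timeout of 1 minute under this rule.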

9.4.4. Choosing Propagation

In the examples above, we have used the MANDATORY and SUPPORTS propagation modes, as well as the REQUIRED propagation mode, which is the default used when nothing else is specified. These propagation modes map 1:1 to the equivalent modes used in transactional contexts.

| Propagation | Description |
|---|---|
| REQUIRED | Join the existing Saga or create a new one if it does not exist. |
| REQUIRES_NEW | Always create a new Saga. Suspend the old Saga and resume it when the new one terminates. |
| MANDATORY | A Saga must be already present. The existing Saga is joined. |
| SUPPORTS | If a Saga already exists, then join it. |
| NOT_SUPPORTED | If a Saga already exists, it is suspended and resumed when the current block completes. |
| NEVER | The current block must never be invoked within a Saga. |
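
The propagation modes can be summarized as a decision that depends on the mode and on whether a Saga is already active. The following plain-Java sketch encodes that decision. The class PropagationSketch and its Mode and Outcome enums are hypothetical and are not the Camel SagaPropagation type. The outcomes for the cases where no Saga exists under SUPPORTS, NOT_SUPPORTED, and NEVER are not stated in this section; they are assumed here by analogy with the equivalent transactional modes.

```java
// Hypothetical model of the propagation decision; not the Camel API.
final class PropagationSketch {

    enum Mode { REQUIRED, REQUIRES_NEW, MANDATORY, SUPPORTS, NOT_SUPPORTED, NEVER }

    enum Outcome {
        JOIN_EXISTING,
        CREATE_NEW,
        SUSPEND_AND_CREATE_NEW,
        RUN_WITHOUT_SAGA,
        SUSPEND_AND_RUN_WITHOUT_SAGA,
        FAIL
    }

    /** Decides what happens when a block with the given mode is entered. */
    static Outcome decide(Mode mode, boolean sagaExists) {
        switch (mode) {
            case REQUIRED:
                return sagaExists ? Outcome.JOIN_EXISTING : Outcome.CREATE_NEW;
            case REQUIRES_NEW:
                return sagaExists ? Outcome.SUSPEND_AND_CREATE_NEW : Outcome.CREATE_NEW;
            case MANDATORY:
                return sagaExists ? Outcome.JOIN_EXISTING : Outcome.FAIL;
            case SUPPORTS:
                // Assumption: without a Saga, the block simply runs outside any Saga.
                return sagaExists ? Outcome.JOIN_EXISTING : Outcome.RUN_WITHOUT_SAGA;
            case NOT_SUPPORTED:
                return sagaExists ? Outcome.SUSPEND_AND_RUN_WITHOUT_SAGA : Outcome.RUN_WITHOUT_SAGA;
            case NEVER:
                return sagaExists ? Outcome.FAIL : Outcome.RUN_WITHOUT_SAGA;
            default:
                throw new IllegalArgumentException("Unknown mode: " + mode);
        }
    }
}
```

For example, the direct:newOrder route above uses MANDATORY, so under this model it joins the Saga created by direct:buy and fails if it is invoked without one.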

9.4.5. Using Manual Completion (Advanced)

When a Saga cannot be executed entirely in a synchronous way, for example because it requires communication with external services over asynchronous channels, the completion mode cannot be set to AUTO (the default), because the Saga is not complete when the exchange that creates it is done. This is often the case for Sagas that have long execution times (hours or days). In these cases, the MANUAL completion mode should be used.

from("direct:mysaga")
  .saga()
  .completionMode(SagaCompletionMode.MANUAL)
  .completion("direct:finalize")
  .timeout(2, TimeUnit.HOURS)
    .to("seda:newOrder")
    .to("seda:reserveCredit");

Add the asynchronous processing for seda:newOrder and seda:reserveCredit. These send the asynchronous callbacks to seda:operationCompleted.

from("seda:operationCompleted") // an asynchronous callback
  .saga()
  .propagation(SagaPropagation.MANDATORY)
    .bean(controlService, "actionExecuted")
    .choice()
      .when(body().isEqualTo("ok"))
        .to("saga:complete") // complete the current saga manually (saga component)
    .end();

You can add the direct:finalize endpoint to execute final actions.

Setting the completion mode to MANUAL means that the Saga is not completed when the exchange is processed in the route direct:mysaga but it will last longer (max duration is set to 2 hours). When both asynchronous actions are completed the Saga is completed. The call to complete is done using the Camel Saga Component’s saga:complete endpoint. There is a similar endpoint for manually compensating the Saga (saga:compensate).
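
The source does not show the controlService bean used in the seda:operationCompleted route. As a purely hypothetical sketch of what its actionExecuted method could do, the following plain-Java class counts the callbacks it receives and returns "ok" once the expected number has arrived, which is the value the route compares against before calling saga:complete. It assumes that each asynchronous operation reports back exactly once; duplicate callbacks would need additional handling.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical control service; the real bean is not shown in this chapter.
final class ControlServiceSketch {

    private final AtomicInteger pending;

    ControlServiceSketch(int expectedCallbacks) {
        if (expectedCallbacks <= 0) {
            throw new IllegalArgumentException("expectedCallbacks must be positive");
        }
        this.pending = new AtomicInteger(expectedCallbacks);
    }

    /** Records one finished operation; returns "ok" once all have finished. */
    String actionExecuted() {
        return pending.decrementAndGet() == 0 ? "ok" : "pending";
    }
}
```

With two expected callbacks (one for seda:newOrder and one for seda:reserveCredit), the first call returns "pending" and the second returns "ok".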

9.5. XML Configuration

Saga features are available for users that want to use the XML configuration. The following snippet shows an example:

<route>
  <from uri="direct:start"/>
  <saga>
    <compensation uri="direct:compensation" />
    <completion uri="direct:completion" />
    <option optionName="myOptionKey">
      <constant>myOptionValue</constant>
    </option>
    <option optionName="myOptionKey2">
      <constant>myOptionValue2</constant>
    </option>
  </saga>
  <to uri="direct:action1" />
  <to uri="direct:action2" />
</route>