Chapter 9. AMQ Streams and Kafka upgrades

AMQ Streams can be upgraded with no cluster downtime. Each version of AMQ Streams supports one or more versions of Apache Kafka: you can upgrade to a higher Kafka version as long as it is supported by your version of AMQ Streams. In some cases, you can also downgrade to a lower supported Kafka version.

Newer versions of AMQ Streams may support newer versions of Kafka, but you need to upgrade AMQ Streams before you can upgrade to a higher supported Kafka version.

9.1. Upgrade prerequisites

Before you begin the upgrade process, make sure that:

9.2. Upgrade process

Upgrading AMQ Streams is a two-stage process. To upgrade brokers and clients without downtime, you must complete the upgrade procedures in the following order:

  1. Update your Cluster Operator to the latest AMQ Streams version.

  2. Upgrade all Kafka brokers and client applications to the latest Kafka version.

9.3. Kafka versions

AMQ Streams is based on a specific version of Apache Kafka.

AMQ Streams version    Kafka version
1.2                    2.2.1

Kafka’s log message format version and interbroker protocol version specify, respectively, the log format version appended to messages and the version of the protocol used in a cluster. As a result, the upgrade process involves making configuration changes to existing Kafka brokers and code changes to client applications (consumers and producers) to ensure the correct versions are used.

The following table shows the differences between Kafka versions:

Kafka version    Interbroker protocol version    Log message format version    ZooKeeper version
2.1.1            2.1                             2.1                           3.4.13
2.2.1            2.2                             2.2                           3.4.14

Although Kafka versions may use the same version of ZooKeeper, it is recommended that you update your ZooKeeper cluster to use the newest ZooKeeper binaries before proceeding with the main AMQ Streams upgrade.

Message format version

When a producer sends a message to a Kafka broker, the message is encoded using a specific format. The format can change between Kafka releases, so messages include a version identifying which version of the format they were encoded with. You can configure a Kafka broker to convert messages from newer format versions to a given older format version before the broker appends the message to the log.

In Kafka, there are two different methods for setting the message format version:

  • The log.message.format.version property is set on Kafka brokers.
  • The message.format.version property is set on topics.

The default value of message.format.version for a topic is defined by the log.message.format.version that is set on the Kafka broker. You can manually set the message.format.version of a topic by modifying its topic configuration.

The upgrade tasks in this section assume that the message format version is defined by the log.message.format.version.
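
The precedence between the two properties can be illustrated with a minimal Python sketch. The function name effective_message_format and the dictionary-based configuration are hypothetical and are not part of Kafka or AMQ Streams; they only model the rule described above.

```python
from typing import Mapping, Optional


def effective_message_format(topic_config: Mapping[str, str],
                             broker_config: Mapping[str, str]) -> Optional[str]:
    """Return the message format version that applies to a topic.

    Hypothetical helper: models the precedence rule only.
    """
    # A topic-level message.format.version takes precedence when it is set.
    if "message.format.version" in topic_config:
        return topic_config["message.format.version"]
    # Otherwise the topic inherits the broker-level log.message.format.version.
    # None means the broker setting is absent and the broker's own default applies.
    return broker_config.get("log.message.format.version")


broker = {"log.message.format.version": "2.1"}
inherited = effective_message_format({}, broker)
overridden = effective_message_format({"message.format.version": "2.2"}, broker)
print(inherited, overridden)
```

Here the first topic inherits "2.1" from the broker, and the second uses its own topic-level value of "2.2".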

9.4. Upgrading the Cluster Operator

The steps to upgrade your Cluster Operator deployment to use AMQ Streams 1.2 are outlined in this section.

The availability of Kafka clusters managed by the Cluster Operator is not affected by the upgrade operation.

Note

Refer to the documentation supporting a specific version of AMQ Streams for information on how to upgrade to that version.

9.4.1. Upgrading the Cluster Operator to a later version

This procedure describes how to upgrade a Cluster Operator deployment to a later version.

Prerequisites

  • An existing Cluster Operator deployment.

Procedure

  1. Back up the existing Cluster Operator resources:

    oc get all -l app=strimzi -o yaml > strimzi-backup.yaml
  2. Update the Cluster Operator.

    Modify the installation files according to the OpenShift project or Kubernetes namespace the Cluster Operator is running in.

    On Linux, use:

    sed -i 's/namespace: .*/namespace: my-namespace/' install/cluster-operator/*RoleBinding*.yaml

    On macOS, use:

    sed -i '' 's/namespace: .*/namespace: my-namespace/' install/cluster-operator/*RoleBinding*.yaml

    If you modified one or more environment variables in your existing Cluster Operator Deployment, edit the install/cluster-operator/050-Deployment-cluster-operator.yaml file to reflect the changes that you made in the new version of the Cluster Operator.

  3. When you have an updated configuration, deploy it along with the rest of the install resources:

    oc apply -f install/cluster-operator

    Wait for the rolling updates to complete.

  4. Get the image for the Kafka pod to ensure the upgrade was successful:

    oc get po my-cluster-kafka-0 -o jsonpath='{.spec.containers[0].image}'

    The image tag shows the new AMQ Streams version followed by the Kafka version. For example, <New AMQ Streams version>-kafka-<Current Kafka version>.

  5. Update existing resources to handle deprecated custom resource properties.

You now have an updated Cluster Operator, but the version of Kafka running in the cluster it manages is unchanged.
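
If you script the image check from step 4, the tag can be split into its two version parts. The following Python sketch assumes the tag layout described in that step (<AMQ Streams version>-kafka-<Kafka version>). The function name split_image_tag and the image reference used in the example are hypothetical.

```python
from typing import Tuple


def split_image_tag(image: str) -> Tuple[str, str]:
    """Split an image reference into (AMQ Streams version, Kafka version).

    Hypothetical helper: assumes a tag of the form
    <AMQ Streams version>-kafka-<Kafka version>.
    """
    # Use the last path component so that a registry port (host:5000/...)
    # is not mistaken for the tag separator.
    last_component = image.rsplit("/", 1)[-1]
    if ":" not in last_component:
        raise ValueError(f"image has no tag: {image!r}")
    tag = last_component.split(":", 1)[1]
    streams_version, separator, kafka_version = tag.partition("-kafka-")
    if not separator or not streams_version or not kafka_version:
        raise ValueError(f"unexpected tag format: {tag!r}")
    return streams_version, kafka_version


# Hypothetical image reference, for illustration only.
versions = split_image_tag("registry.example.com:5000/amq7/kafka:1.2.0-kafka-2.1.1")
print(versions)
```

After a successful Cluster Operator upgrade, the first element should be the new AMQ Streams version and the second should still be the Kafka version that was running before.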

What to do next

Following the Cluster Operator upgrade, you can perform a Kafka upgrade.

9.5. Upgrading Kafka

After you have upgraded your Cluster Operator, you can upgrade your brokers to a higher supported version of Kafka.

Kafka upgrades are performed using the Cluster Operator. How the Cluster Operator performs an upgrade depends on the differences between versions of:

  • Interbroker protocol
  • Log message format
  • ZooKeeper

When the versions are the same for the current and target Kafka version, as is typically the case for a patch level upgrade, the Cluster Operator can upgrade through a single rolling update of the Kafka brokers.

When one or more of these versions differ, the Cluster Operator requires two or three rolling updates of the Kafka brokers to perform the upgrade.
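
As an illustration, the comparison can be sketched in Python using the values from the Kafka versions table in Section 9.3. The names KafkaVersionInfo, differing_properties, and single_rolling_update_suffices are hypothetical. The sketch only reports whether a single rolling update is enough; it does not model how the Cluster Operator chooses between two and three rolling updates.

```python
from typing import Dict, List, NamedTuple


class KafkaVersionInfo(NamedTuple):
    interbroker_protocol: str
    log_message_format: str
    zookeeper: str


# Values copied from the Kafka versions table in Section 9.3.
KAFKA_VERSIONS: Dict[str, KafkaVersionInfo] = {
    "2.1.1": KafkaVersionInfo("2.1", "2.1", "3.4.13"),
    "2.2.1": KafkaVersionInfo("2.2", "2.2", "3.4.14"),
}


def differing_properties(current: str, target: str) -> List[str]:
    """List the properties whose versions differ between two Kafka versions."""
    cur, tgt = KAFKA_VERSIONS[current], KAFKA_VERSIONS[target]
    return [name for name in KafkaVersionInfo._fields
            if getattr(cur, name) != getattr(tgt, name)]


def single_rolling_update_suffices(current: str, target: str) -> bool:
    """True when all three versions match, so one rolling update is enough."""
    return not differing_properties(current, target)


print(differing_properties("2.1.1", "2.2.1"))
print(single_rolling_update_suffices("2.1.1", "2.2.1"))
```

For an upgrade from 2.1.1 to 2.2.1, all three properties differ, so more than one rolling update is required.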

9.5.1. Kafka version and image mappings

When upgrading Kafka, consider your settings for the STRIMZI_KAFKA_IMAGES and Kafka.spec.kafka.version properties.

  • Each Kafka resource can be configured with a Kafka.spec.kafka.version.
  • The Cluster Operator’s STRIMZI_KAFKA_IMAGES environment variable provides a mapping between the Kafka version and the image to be used when that version is requested in a given Kafka resource.

    • If Kafka.spec.kafka.image is not configured, the default image for the given version is used.
    • If Kafka.spec.kafka.image is configured, the default image is overridden.

Warning

The Cluster Operator cannot validate that an image actually contains a Kafka broker of the expected version. Take care to ensure that the given image corresponds to the given Kafka version.
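
The lookup described above can be modeled with a short Python sketch. It assumes that STRIMZI_KAFKA_IMAGES holds <version>=<image> pairs separated by whitespace or commas; check this against your Cluster Operator deployment. The function names and the image references are hypothetical.

```python
import re
from typing import Dict, Optional


def parse_kafka_images(value: str) -> Dict[str, str]:
    """Parse a version-to-image mapping.

    Assumption: entries are '<version>=<image>' pairs separated by
    whitespace or commas.
    """
    mapping: Dict[str, str] = {}
    for entry in (e for e in re.split(r"[\s,]+", value) if e):
        version, separator, image = entry.partition("=")
        if not separator or not version or not image:
            raise ValueError(f"malformed entry: {entry!r}")
        mapping[version] = image
    return mapping


def resolve_image(version: str, images: Dict[str, str],
                  override: Optional[str] = None) -> str:
    """Return the image used for a Kafka resource."""
    # Kafka.spec.kafka.image, when configured, overrides the default image.
    if override:
        return override
    if version not in images:
        raise KeyError(f"no default image for Kafka version {version}")
    return images[version]


# Hypothetical image references, for illustration only.
images = parse_kafka_images(
    "2.1.1=registry.example.com/kafka:1.2.0-kafka-2.1.1\n"
    "2.2.1=registry.example.com/kafka:1.2.0-kafka-2.2.1"
)
print(resolve_image("2.2.1", images))
print(resolve_image("2.2.1", images, override="registry.example.com/custom-kafka:2.2.1"))
```

As with the Cluster Operator itself, this sketch cannot confirm that an overriding image really contains the requested Kafka version.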

9.5.2. Strategies for upgrading clients

The best approach to upgrading your client applications (including Kafka Connect connectors) depends on your particular circumstances.

Consuming applications need to receive messages in a message format that they understand. You can ensure that this is the case in one of two ways:

  • By upgrading all the consumers for a topic before upgrading any of the producers.
  • By having the brokers down-convert messages to an older format.

Using broker down-conversion puts extra load on the brokers, so it is not ideal to rely on down-conversion for all topics for a prolonged period of time. For brokers to perform optimally, they should not be down-converting messages at all.

Broker down-conversion is configured in two ways:

  • The topic-level message.format.version configures it for a single topic.
  • The broker-level log.message.format.version is the default for topics that do not have the topic-level message.format.version configured.

Messages published to a topic in a new-version format will be visible to consumers, because brokers perform down-conversion when they receive messages from producers, not when they are sent to consumers.

There are a number of strategies you can use to upgrade your clients:

Consumers first
  1. Upgrade all the consuming applications.
  2. Change the broker-level log.message.format.version to the new version.
  3. Upgrade all the producing applications.

    This strategy is straightforward, and avoids any broker down-conversion. However, it assumes that all consumers in your organization can be upgraded in a coordinated way, and it does not work for applications that are both consumers and producers. There is also a risk that, if there is a problem with the upgraded clients, new-format messages might get added to the message log so that you cannot revert to the previous consumer version.

Per-topic consumers first

For each topic:

  1. Upgrade all the consuming applications.
  2. Change the topic-level message.format.version to the new version.
  3. Upgrade all the producing applications.

    This strategy avoids any broker down-conversion, and means you can proceed on a topic-by-topic basis. It does not work for applications that are both consumers and producers of the same topic. Again, it has the risk that, if there is a problem with the upgraded clients, new-format messages might get added to the message log.

Per-topic consumers first, with down conversion

For each topic:

  1. Change the topic-level message.format.version to the old version (or rely on the topic defaulting to the broker-level log.message.format.version).
  2. Upgrade all the consuming and producing applications.
  3. Verify that the upgraded applications function correctly.
  4. Change the topic-level message.format.version to the new version.

    This strategy requires broker down-conversion, but the load on the brokers is minimized because it is only required for a single topic (or small group of topics) at a time. It also works for applications that are both consumers and producers of the same topic. This approach ensures that the upgraded producers and consumers are working correctly before you commit to using the new message format version.

    The main drawback of this approach is that it can be complicated to manage in a cluster with many topics and applications.

Other strategies for upgrading client applications are also possible.

Note

It is also possible to apply multiple strategies. For example, for the first few applications and topics the "per-topic consumers first, with down conversion" strategy can be used. When this has proved successful, you can switch to another, more efficient strategy.

9.5.3. Upgrading Kafka brokers and client applications

This procedure describes how to upgrade an AMQ Streams Kafka cluster to a higher version of Kafka.

Prerequisites

For the Kafka resource to be upgraded, check:

  • The Cluster Operator, which supports both versions of Kafka, is up and running.
  • The Kafka.spec.kafka.config does not contain options that are not supported in the version of Kafka that you are upgrading to.
  • Whether the log.message.format.version for the current Kafka version needs to be updated for the new version.

    Consult the Kafka versions table.

Procedure

  1. Update the Kafka cluster configuration in an editor, as required:

    On OpenShift, use:

    oc edit kafka my-cluster
    1. If the log.message.format.version of the current Kafka version is the same as that of the new Kafka version, proceed to the next step.

      Otherwise, ensure that Kafka.spec.kafka.config has the log.message.format.version configured to the default for the current version.

      For example, if upgrading from Kafka 2.1.1:

      kind: Kafka
      spec:
        # ...
        kafka:
          version: 2.1.1
          config:
            log.message.format.version: "2.1"
            # ...

      If the log.message.format.version is unset, set it to the current version.

      Note

      The value of log.message.format.version must be a string to prevent it from being interpreted as a floating point number.

    2. Change the Kafka.spec.kafka.version to specify the new version (leaving the log.message.format.version as the current version).

      For example, if upgrading from Kafka 2.1.1 to 2.2.1:

      apiVersion: v1alpha1
      kind: Kafka
      spec:
        # ...
        kafka:
          version: 2.2.1  # changed to the new version
          config:
            log.message.format.version: "2.1"  # remains at the current version
            # ...

    3. If the image for the Kafka version is different from the image defined in STRIMZI_KAFKA_IMAGES for the Cluster Operator, update Kafka.spec.kafka.image.

      See Section 9.5.1, “Kafka version and image mappings”

  2. Save and exit the editor, then wait for rolling updates to complete.

    Check the update in the logs or by watching the pod state transitions:

    On OpenShift, use:

    oc logs -f <cluster-operator-pod-name> | grep -E "Kafka version upgrade from [0-9.]+ to [0-9.]+, phase ([0-9]+) of \1 completed"
    oc get po -w

    If the current and new versions of Kafka have different interbroker protocol versions, check the Cluster Operator logs for an INFO level message:

    Reconciliation #<num>(watch) Kafka(<namespace>/<name>): Kafka version upgrade from <from-version> to <to-version>, phase 2 of 2 completed

    Alternatively, if the current and new versions of Kafka have the same interbroker protocol version, check for:

    Reconciliation #<num>(watch) Kafka(<namespace>/<name>): Kafka version upgrade from <from-version> to <to-version>, phase 1 of 1 completed

    The rolling updates:

    • Ensure each pod is using the broker binaries for the new version of Kafka
    • Configure the brokers to send messages using the interbroker protocol of the new version of Kafka

      Note

      Clients are still using the old version, so brokers will convert messages to the old version before sending them to the clients. To minimize this additional load, update the clients as quickly as possible.

  3. Depending on your chosen strategy for upgrading clients, upgrade all client applications to use the new version of the client binaries.

    See Section 9.5.2, “Strategies for upgrading clients”

    Warning

    You cannot downgrade after completing this step. If you need to revert the update at this point, follow the procedure in Section 9.6.2, “Downgrading Kafka brokers and client applications”.

    If required, set the version property for Kafka Connect and Mirror Maker as the new version of Kafka:

    1. For Kafka Connect, update KafkaConnect.spec.version
    2. For Mirror Maker, update KafkaMirrorMaker.spec.version
  4. If the log.message.format.version identified in step 1 is the same as the new version, proceed to the next step.

    Otherwise, change the log.message.format.version in Kafka.spec.kafka.config to the default version for the new version of Kafka now being used.

    For example, if upgrading to 2.2.1:

    apiVersion: v1alpha1
    kind: Kafka
    spec:
      # ...
      kafka:
        version: 2.2.1
        config:
          log.message.format.version: "2.2"
          # ...
  5. Wait for the Cluster Operator to update the cluster.

    The Kafka cluster and clients are now using the new Kafka version.
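
If you collect Cluster Operator logs for automated checks, the completion message from step 2 can be matched with a regular expression equivalent to the grep pattern shown there. The following Python sketch is illustrative only. The function name final_phase_completed is hypothetical, and the sample line is built from the documented message template with made-up values.

```python
import re
from typing import Dict, Optional

# Matches "phase N of N completed", where both numbers must be equal,
# for either an upgrade or a downgrade.
COMPLETED = re.compile(
    r"Kafka version (?P<kind>upgrade|downgrade) "
    r"from (?P<src>[0-9.]+) to (?P<dst>[0-9.]+), "
    r"phase (?P<phase>[0-9]+) of (?P=phase) completed"
)


def final_phase_completed(line: str) -> Optional[Dict[str, str]]:
    """Return the matched fields if the line reports the final phase, else None."""
    match = COMPLETED.search(line)
    return match.groupdict() if match else None


# Sample line with made-up reconciliation number, namespace, and cluster name.
sample = ("Reconciliation #12(watch) Kafka(my-project/my-cluster): "
          "Kafka version upgrade from 2.1.1 to 2.2.1, phase 2 of 2 completed")
print(final_phase_completed(sample))
print(final_phase_completed(sample.replace("phase 2 of 2", "phase 1 of 2")))
```

The backreference ensures that an intermediate message such as "phase 1 of 2 completed" is not treated as the end of the upgrade.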

9.6. Downgrading Kafka

Kafka version downgrades are performed using the Cluster Operator.

Whether and how the Cluster Operator performs a downgrade depends on the differences between versions of:

  • Interbroker protocol
  • Log message format
  • ZooKeeper

9.6.1. Target downgrade version

How the Cluster Operator handles a downgrade operation depends on the log.message.format.version.

  • If the target downgrade version of Kafka has the same log.message.format.version as the current version, the Cluster Operator downgrades by performing a single rolling restart of the brokers.
  • If the target downgrade version of Kafka has a different log.message.format.version, downgrading is only possible if the running cluster has always had log.message.format.version set to the version used by the downgraded version.

    This is typically only the case if the upgrade procedure was aborted before the log.message.format.version was changed. In this case, the downgrade requires:

    • Two rolling restarts of the brokers if the interbroker protocol of the two versions is different
    • A single rolling restart if they are the same
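
The rules above can be summarized as a small decision function. This Python sketch is a model of the description in this section, not of the Cluster Operator implementation. The function name and its parameters are hypothetical.

```python
from typing import Optional


def downgrade_rolling_restarts(same_log_format: bool,
                               always_used_target_format: bool,
                               same_interbroker_protocol: bool) -> Optional[int]:
    """Return the number of rolling restarts, or None if downgrading is not possible.

    same_log_format: the target version has the same log.message.format.version
        as the current version.
    always_used_target_format: the running cluster has always had
        log.message.format.version set to the version used by the target.
    same_interbroker_protocol: both versions use the same interbroker protocol.
    """
    if same_log_format:
        # Same message format: a single rolling restart of the brokers.
        return 1
    if not always_used_target_format:
        # Messages may already be stored in a format the target cannot read.
        return None
    return 1 if same_interbroker_protocol else 2


# Upgrade from 2.1.1 to 2.2.1 aborted before log.message.format.version was changed.
print(downgrade_rolling_restarts(False, True, False))
```

For the aborted 2.1.1 to 2.2.1 upgrade in the example, the interbroker protocol versions differ, so the function returns 2.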

9.6.2. Downgrading Kafka brokers and client applications

This procedure describes how you can downgrade an AMQ Streams Kafka cluster to a lower (previous) version of Kafka, such as downgrading from 2.2.1 to 2.1.1.

Important

Downgrading is not possible if the new version has ever used a log.message.format.version that is not supported by the previous version, including when the default value for log.message.format.version is used. For example, this resource can be downgraded to Kafka version 2.1.1 because the log.message.format.version has not been changed:

apiVersion: v1alpha1
kind: Kafka
spec:
  # ...
  kafka:
    version: 2.2.1
    config:
      log.message.format.version: "2.1"
      # ...

The downgrade would not be possible if the log.message.format.version was set to "2.2" or if no value was set (so that the parameter took the default value for a 2.2.1 broker, which is 2.2).

Prerequisites

For the Kafka resource to be downgraded, check:

  • The Cluster Operator, which supports both versions of Kafka, is up and running.
  • The Kafka.spec.kafka.config does not contain options that are not supported in the version of Kafka you are downgrading to.
  • The Kafka.spec.kafka.config has a log.message.format.version that is supported by the version being downgraded to.

Procedure

  1. Update the Kafka cluster configuration in an editor, as required:

    On OpenShift, use:

    oc edit kafka my-cluster
    1. Change the Kafka.spec.kafka.version to specify the previous version.

      For example, if downgrading from Kafka 2.2.1 to 2.1.1:

      apiVersion: v1alpha1
      kind: Kafka
      spec:
        # ...
        kafka:
          version: 2.1.1  # changed to the previous version
          config:
            log.message.format.version: "2.1"  # unchanged
            # ...

      Note

      You must format the value of log.message.format.version as a string to prevent it from being interpreted as a floating point number.

    2. If the image for the Kafka version is different from the image defined in STRIMZI_KAFKA_IMAGES for the Cluster Operator, update Kafka.spec.kafka.image.

      See Section 9.5.1, “Kafka version and image mappings”

  2. Save and exit the editor, then wait for rolling updates to complete.

    Check the update in the logs or by watching the pod state transitions:

    On OpenShift, use:

    oc logs -f <cluster-operator-pod-name> | grep -E "Kafka version downgrade from [0-9.]+ to [0-9.]+, phase ([0-9]+) of \1 completed"
    oc get po -w

    If the previous and current versions of Kafka have different interbroker protocol versions, check the Cluster Operator logs for an INFO level message:

    Reconciliation #<num>(watch) Kafka(<namespace>/<name>): Kafka version downgrade from <from-version> to <to-version>, phase 2 of 2 completed

    Alternatively, if the previous and current versions of Kafka have the same interbroker protocol version, check for:

    Reconciliation #<num>(watch) Kafka(<namespace>/<name>): Kafka version downgrade from <from-version> to <to-version>, phase 1 of 1 completed
  3. Downgrade all client applications (consumers) to use the previous version of the client binaries.

    The Kafka cluster and clients are now using the previous Kafka version.