Chapter 7. Upgrading AMQ Streams

AMQ Streams can be upgraded with no cluster downtime. Each version of AMQ Streams supports one or more versions of Apache Kafka. You can upgrade to a higher Kafka version as long as it is supported by your version of AMQ Streams.

Newer versions of AMQ Streams may support newer versions of Kafka, but you need to upgrade AMQ Streams before you can upgrade to a higher supported Kafka version.

To upgrade AMQ Streams Operators, you can use the Operator Lifecycle Manager (OLM) on your OpenShift Container Platform cluster.

Important

If applicable, Resource upgrades must be performed after upgrading AMQ Streams and Kafka.

7.1. AMQ Streams and Kafka upgrades

Upgrading AMQ Streams is a two-stage process. To upgrade brokers and clients without downtime, you must complete the upgrade procedures in the following order:

  1. Update your Cluster Operator to the latest AMQ Streams version. The approach you take depends on how you deployed the Cluster Operator.

    • If you deployed the Cluster Operator using the installation YAML files, perform your upgrade by modifying the Operator installation files.
    • If you deployed the Cluster Operator from the OperatorHub, use the Operator Lifecycle Manager (OLM) to change the update channel for the AMQ Streams Operators to the new AMQ Streams version.

      Depending on your chosen upgrade strategy, following the channel update either:

      • An automatic upgrade is initiated
      • A manual upgrade then requires approval before the installation begins

      For more information on using the OperatorHub to upgrade Operators, see Upgrading installed Operators.

  2. Upgrade all Kafka brokers and client applications to the latest Kafka version.

7.1.1. Kafka versions

Kafka’s log message format version and inter-broker protocol version specify the log format version appended to messages and the version of protocol used in a cluster. As a result, the upgrade process involves making configuration changes to existing Kafka brokers and code changes to client applications (consumers and producers) to ensure the correct versions are used.

The following table shows the differences between Kafka versions:

Kafka versionInterbroker protocol versionLog message format versionZooKeeper version

2.5.0

2.5

2.5

3.5.8

2.6.0

2.6

2.6

3.5.8

Message format version

When a producer sends a message to a Kafka broker, the message is encoded using a specific format. The format can change between Kafka releases, so messages include a version identifying which version of the format they were encoded with. You can configure a Kafka broker to convert messages from newer format versions to a given older format version before the broker appends the message to the log.

In Kafka, there are two different methods for setting the message format version:

  • The message.format.version property is set on topics.
  • The log.message.format.version property is set on Kafka brokers.

The default value of message.format.version for a topic is defined by the log.message.format.version that is set on the Kafka broker. You can manually set the message.format.version of a topic by modifying its topic configuration.

The upgrade tasks in this section assume that the message format version is defined by the log.message.format.version.

7.1.2. Upgrading the Cluster Operator

The steps to upgrade your Cluster Operator deployment to use AMQ Streams 1.6 are outlined in this section.

The availability of Kafka clusters managed by the Cluster Operator is not affected by the upgrade operation.

Note

Refer to the documentation supporting a specific version of AMQ Streams for information on how to upgrade to that version.

7.1.2.1. Upgrading the Cluster Operator to a later version

This procedure describes how to upgrade a Cluster Operator deployment to a later version.

Prerequisites

Procedure

  1. Take note of any configuration changes made to the existing Cluster Operator resources (in the /install/cluster-operator directory). Any changes will be overwritten by the new version of the Cluster Operator.
  2. Update the Cluster Operator.

    1. Modify the installation files for the new version according to the namespace the Cluster Operator is running in.

      On Linux, use:

      sed -i 's/namespace: .*/namespace: my-cluster-operator-namespace/' install/cluster-operator/*RoleBinding*.yaml

      On MacOS, use:

      sed -i '' 's/namespace: .*/namespace: my-cluster-operator-namespace/' install/cluster-operator/*RoleBinding*.yaml
    2. If you modified one or more environment variables in your existing Cluster Operator Deployment, edit the install/cluster-operator/060-Deployment-cluster-operator.yaml file to use those environment variables.
  3. When you have an updated configuration, deploy it along with the rest of the installation resources:

    oc apply -f install/cluster-operator

    Wait for the rolling updates to complete.

  4. Get the image for the Kafka pod to ensure the upgrade was successful:

    oc get po my-cluster-kafka-0 -o jsonpath='{.spec.containers[0].image}'

    The image tag shows the new AMQ Streams version followed by the Kafka version. For example, <New AMQ Streams version>-kafka-<Current Kafka version>.

  5. Update existing resources to handle deprecated custom resource properties.

You now have an updated Cluster Operator, but the version of Kafka running in the cluster it manages is unchanged.

What to do next

Following the Cluster Operator upgrade, you can perform a Kafka upgrade.

7.1.3. Upgrading Kafka

After you have upgraded your Cluster Operator to 1.6, the next step is to upgrade all Kafka brokers to the latest supported version of Kafka.

Kafka upgrades are performed by the Cluster Operator through rolling updates of the Kafka brokers.

The Cluster Operator initiates rolling updates based on the Kafka cluster configuration.

If Kafka.spec.kafka.config contains…​The Cluster Operator initiates…​

Both the inter.broker.protocol.version and the log.message.format.version.

A single rolling update. After the update, the inter.broker.protocol.version must be updated manually, followed by log.message.format.version. Changing each will trigger a further rolling update.

Either the inter.broker.protocol.version or the log.message.format.version.

Two rolling updates.

No configuration for the inter.broker.protocol.version or the log.message.format.version.

Two rolling updates.

As part of the Kafka upgrade, the Cluster Operator initiates rolling updates for ZooKeeper.

  • A single rolling update occurs even if the ZooKeeper version is unchanged.
  • Additional rolling updates occur if the new version of Kafka requires a new ZooKeeper version.

7.1.3.1. Kafka version and image mappings

When upgrading Kafka, consider your settings for the STRIMZI_KAFKA_IMAGES and Kafka.spec.kafka.version properties.

  • Each Kafka resource can be configured with a Kafka.spec.kafka.version.
  • The Cluster Operator’s STRIMZI_KAFKA_IMAGES environment variable provides a mapping between the Kafka version and the image to be used when that version is requested in a given Kafka resource.

    • If Kafka.spec.kafka.image is not configured, the default image for the given version is used.
    • If Kafka.spec.kafka.image is configured, the default image is overridden.
Warning

The Cluster Operator cannot validate that an image actually contains a Kafka broker of the expected version. Take care to ensure that the given image corresponds to the given Kafka version.

7.1.3.2. Strategies for upgrading clients

The best approach to upgrading your client applications (including Kafka Connect connectors) depends on your particular circumstances.

Consuming applications need to receive messages in a message format that they understand. You can ensure that this is the case in one of two ways:

  • By upgrading all the consumers for a topic before upgrading any of the producers.
  • By having the brokers down-convert messages to an older format.

Using broker down-conversion puts extra load on the brokers, so it is not ideal to rely on down-conversion for all topics for a prolonged period of time. For brokers to perform optimally they should not be down converting messages at all.

Broker down-conversion is configured in two ways:

  • The topic-level message.format.version configures it for a single topic.
  • The broker-level log.message.format.version is the default for topics that do not have the topic-level message.format.version configured.

Messages published to a topic in a new-version format will be visible to consumers, because brokers perform down-conversion when they receive messages from producers, not when they are sent to consumers.

There are a number of strategies you can use to upgrade your clients:

Consumers first
  1. Upgrade all the consuming applications.
  2. Change the broker-level log.message.format.version to the new version.
  3. Upgrade all the producing applications.

    This strategy is straightforward, and avoids any broker down-conversion. However, it assumes that all consumers in your organization can be upgraded in a coordinated way, and it does not work for applications that are both consumers and producers. There is also a risk that, if there is a problem with the upgraded clients, new-format messages might get added to the message log so that you cannot revert to the previous consumer version.

Per-topic consumers first

For each topic:

  1. Upgrade all the consuming applications.
  2. Change the topic-level message.format.version to the new version.
  3. Upgrade all the producing applications.

    This strategy avoids any broker down-conversion, and means you can proceed on a topic-by-topic basis. It does not work for applications that are both consumers and producers of the same topic. Again, it has the risk that, if there is a problem with the upgraded clients, new-format messages might get added to the message log.

Per-topic consumers first, with down conversion

For each topic:

  1. Change the topic-level message.format.version to the old version (or rely on the topic defaulting to the broker-level log.message.format.version).
  2. Upgrade all the consuming and producing applications.
  3. Verify that the upgraded applications function correctly.
  4. Change the topic-level message.format.version to the new version.

    This strategy requires broker down-conversion, but the load on the brokers is minimized because it is only required for a single topic (or small group of topics) at a time. It also works for applications that are both consumers and producers of the same topic. This approach ensures that the upgraded producers and consumers are working correctly before you commit to using the new message format version.

    The main drawback of this approach is that it can be complicated to manage in a cluster with many topics and applications.

Other strategies for upgrading client applications are also possible.

Note

It is also possible to apply multiple strategies. For example, for the first few applications and topics the "per-topic consumers first, with down conversion" strategy can be used. When this has proved successful another, more efficient strategy can be considered acceptable to use instead.

7.1.3.3. Upgrading Kafka brokers and client applications

This procedure describes how to upgrade a AMQ Streams Kafka cluster to the latest supported Kafka version.

Compared to your current Kafka version, the new version might support a higher log message format version or inter-broker protocol version, or both. Follow the steps to upgrade these versions, if required. For more information, see Section 7.1.1, “Kafka versions”.

You should also choose a strategy for upgrading clients. Kafka clients are upgraded in step 6 of this procedure.

Prerequisites

For the Kafka resource to be upgraded, check that:

  • The Cluster Operator, which supports both versions of Kafka, is up and running.
  • The Kafka.spec.kafka.config does not contain options that are not supported in the new Kafka version.

Procedure

  1. Update the Kafka cluster configuration:

    oc edit kafka my-cluster
  2. If configured, ensure that Kafka.spec.kafka.config has the log.message.format.version and inter.broker.protocol.version set to the defaults for the current Kafka version.

    For example, if upgrading from Kafka version 2.5.0 to 2.6.0:

    kind: Kafka
    spec:
      # ...
      kafka:
        version: 2.5.0
        config:
          log.message.format.version: "2.5"
          inter.broker.protocol.version: "2.5"
          # ...

    If log.message.format.version and inter.broker.protocol.version are not configured, AMQ Streams automatically updates these versions to the current defaults after the update to the Kafka version in the next step.

    Note

    The value of log.message.format.version and inter.broker.protocol.version must be strings to prevent them from being interpreted as floating point numbers.

  3. Change the Kafka.spec.kafka.version to specify the new Kafka version; leave the log.message.format.version and inter.broker.protocol.version at the defaults for the current Kafka version.

    Note

    Changing the kafka.version ensures that all brokers in the cluster will be upgraded to start using the new broker binaries. During this process, some brokers are using the old binaries while others have already upgraded to the new ones. Leaving the inter.broker.protocol.version unchanged ensures that the brokers can continue to communicate with each other throughout the upgrade.

    For example, if upgrading from Kafka 2.5.0 to 2.6.0:

    apiVersion: kafka.strimzi.io/v1beta1
    kind: Kafka
    spec:
      # ...
      kafka:
        version: 2.6.0 1
        config:
          log.message.format.version: "2.5" 2
          inter.broker.protocol.version: "2.5" 3
          # ...
    1
    Kafka version is changed to the new version.
    2
    Message format version is unchanged.
    3
    Inter-broker protocol version is unchanged.
    Warning

    You cannot downgrade Kafka if the inter.broker.protocol.version for the new Kafka version changes. The inter-broker protocol version determines the schemas used for persistent metadata stored by the broker, including messages written to __consumer_offsets. The downgraded cluster will not understand the messages.

  4. If the image for the Kafka cluster is defined in the Kafka custom resource, in Kafka.spec.kafka.image, update the image to point to a container image with the new Kafka version.

    See Kafka version and image mappings

  5. Save and exit the editor, then wait for rolling updates to complete.

    Check the progress of the rolling updates by watching the pod state transitions:

    oc get pods my-cluster-kafka-0 -o jsonpath='{.spec.containers[0].image}'

    The rolling updates ensure that each pod is using the broker binaries for the new version of Kafka.

  6. Depending on your chosen strategy for upgrading clients, upgrade all client applications to use the new version of the client binaries.

    If required, set the version property for Kafka Connect and MirrorMaker as the new version of Kafka:

    1. For Kafka Connect, update KafkaConnect.spec.version.
    2. For MirrorMaker, update KafkaMirrorMaker.spec.version.
    3. For MirrorMaker 2.0, update KafkaMirrorMaker2.spec.version.
  7. If configured, update the Kafka resource to use the new inter.broker.protocol.version version. Otherwise, go to step 9.

    For example, if upgrading to Kafka 2.6.0:

    apiVersion: kafka.strimzi.io/v1beta1
    kind: Kafka
    spec:
      # ...
      kafka:
        version: 2.6.0
        config:
          log.message.format.version: "2.5"
          inter.broker.protocol.version: "2.6"
          # ...
  8. Wait for the Cluster Operator to update the cluster.
  9. If configured, update the Kafka resource to use the new log.message.format.version version. Otherwise, go to step 10.

    For example, if upgrading to Kafka 2.6.0:

    apiVersion: kafka.strimzi.io/v1beta1
    kind: Kafka
    spec:
      # ...
      kafka:
        version: 2.6.0
        config:
          log.message.format.version: "2.6"
          inter.broker.protocol.version: "2.6"
          # ...
  10. Wait for the Cluster Operator to update the cluster.

    • The Kafka cluster and clients are now using the new Kafka version.
    • The brokers are configured to send messages using the inter-broker protocol version and message format version of the new version of Kafka.

Following the Kafka upgrade, if required, you can:

7.1.3.4. Updating listener configuration

AMQ Streams provides a GenericKafkaListener schema for the configuration of Kafka listeners in a Kafka resource.

GenericKafkaListener replaces the KafkaListeners schema, which is deprecated.

With the GenericKafkaListener schema, you can configure as many listeners as required, as long as their names and ports are unique. The listeners configuration is defined as an array, but the deprecated format is also supported.

For clients inside the OpenShift cluster, you can create plain (without encryption) or tls internal listeners.

For clients outside the OpenShift cluster, you create external listeners and specify a connection mechanism, which can be nodeport, loadbalancer, ingress or route.

The KafkaListeners schema uses sub-properties for plain, tls and external listeners, with fixed ports for each. After the Kafka upgrade, you can convert listeners configured using the KafkaListeners schema into the format of the GenericKafkaListener schema.

For example, if you are currently using the following configuration in your Kafka configuration:

Old listener configuration

listeners:
  plain:
    # ...
  tls:
    # ...
  external:
    type: loadbalancer
    # ...

Convert the listeners into the new format using:

New listener configuration

listeners:
  #...
  - name: plain
    port: 9092
    type: internal
    tls: false 1
  - name: tls
    port: 9093
    type: internal
    tls: true
  - name: external
    port: 9094
    type: EXTERNAL-LISTENER-TYPE 2
    tls: true

1
The TLS property is now required for all listeners.
2
Options: ingress, loadbalancer, nodeport, route.

Make sure to use the exact names and port numbers shown.

For any additional configuration or overrides properties used with the old format, you need to update them to the new format.

Changes introduced to the listener configuration:

  • overrides is merged with the configuration section
  • dnsAnnotations has been renamed annotations
  • preferredAddressType has been renamed preferredNodePortAddressType
  • address has been renamed alternativeNames
  • loadBalancerSourceRanges and externalTrafficPolicy move to the listener configuration from the now deprecated template

For example, this configuration:

Old additional listener configuration

listeners:
  external:
    type: loadbalancer
    authentication:
      type: tls
    overrides:
      bootstrap:
        dnsAnnotations:
          #...

Changes to:

New additional listener configuration

listeners:
    #...
  - name: external
    port: 9094
    type:loadbalancer
    tls: true
    authentication:
      type: tls
    configuration:
      bootstrap:
        annotations:
          #...

Important

The name and port numbers shown in the new listener configuration must be used for backwards compatibility. Using any other values will cause renaming of the Kafka listeners and OpenShift services.

For more information on the configuration options available for each type of listener, see the GenericKafkaListener schema reference.

7.1.3.5. Upgrading consumers and Kafka Streams applications to cooperative rebalancing

You can upgrade Kafka consumers and Kafka Streams applications to use the incremental cooperative rebalance protocol for partition rebalances instead of the default eager rebalance protocol. The new protocol was added in Kafka 2.4.0.

Consumers keep their partition assignments in a cooperative rebalance and only revoke them at the end of the process, if needed to achieve a balanced cluster. This reduces the unavailability of the consumer group or Kafka Streams application.

Note

Upgrading to the incremental cooperative rebalance protocol is optional. The eager rebalance protocol is still supported.

Prerequisites

Procedure

To upgrade a Kafka consumer to use the incremental cooperative rebalance protocol:

  1. Replace the Kafka clients .jar file with the new version.
  2. In the consumer configuration, append cooperative-sticky to the partition.assignment.strategy. For example, if the range strategy is set, change the configuration to range, cooperative-sticky.
  3. Restart each consumer in the group in turn, waiting for the consumer to rejoin the group after each restart.
  4. Reconfigure each consumer in the group by removing the earlier partition.assignment.strategy from the consumer configuration, leaving only the cooperative-sticky strategy.
  5. Restart each consumer in the group in turn, waiting for the consumer to rejoin the group after each restart.

To upgrade a Kafka Streams application to use the incremental cooperative rebalance protocol:

  1. Replace the Kafka Streams .jar file with the new version.
  2. In the Kafka Streams configuration, set the upgrade.from configuration parameter to the Kafka version you are upgrading from (for example, 2.3).
  3. Restart each of the stream processors (nodes) in turn.
  4. Remove the upgrade.from configuration parameter from the Kafka Streams configuration.
  5. Restart each consumer in the group in turn.

Additional resources

7.2. AMQ Streams resource upgrades

The kafka.strimzi.io/v1alpha1 API version is deprecated for the following AMQ Streams resources:

  • Kafka
  • KafkaConnect
  • KafkaConnectS2I
  • KafkaMirrorMaker
  • KafkaTopic
  • KafkaUser

Update these resources to use the kafka.strimzi.io/v1beta1 API version.

This section describes the upgrade steps for the resources.

Important

The upgrade of resources must be performed after upgrading the Cluster Operator, so the Cluster Operator can understand the resources.

What if the resource upgrade does not take effect?

If the upgrade does not take effect, a warning is given in the logs on reconciliation to indicate that the resource cannot be updated until the apiVersion is updated.

To trigger the update, make a cosmetic change to the custom resource, such as adding an annotation.

Example annotation:

metadata:
  # ...
  annotations:
    upgrade: "Upgraded to kafka.strimzi.io/v1beta1"

The following procedures describe the steps to update specific resources to use the kafka.strimzi.io/v1beta1 API version:

7.2.1. Upgrading Kafka resources

Prerequisites

  • A Cluster Operator supporting the v1beta1 API version is up and running.

Procedure

Execute the following steps for each Kafka resource in your deployment.

  1. Update the Kafka resource in an editor.

    oc edit kafka my-cluster
  2. Replace:

    apiVersion: kafka.strimzi.io/v1alpha1

    with:

    apiVersion: kafka.strimzi.io/v1beta1
  3. If the Kafka resource has:

    Kafka.spec.topicOperator

    Replace it with:

    Kafka.spec.entityOperator.topicOperator

    For example, replace:

    spec:
      # ...
      topicOperator: {}

    with:

    spec:
      # ...
      entityOperator:
        topicOperator: {}
  4. If present, move:

    Kafka.spec.entityOperator.affinity
    Kafka.spec.entityOperator.tolerations

    to:

    Kafka.spec.entityOperator.template.pod.affinity
    Kafka.spec.entityOperator.template.pod.tolerations

    For example, move:

    spec:
      # ...
      entityOperator:
        affinity {}
        tolerations {}

    to:

    spec:
      # ...
      entityOperator:
        template:
          pod:
            affinity {}
            tolerations {}
  5. If present, move:

    Kafka.spec.kafka.affinity
    Kafka.spec.kafka.tolerations

    to:

    Kafka.spec.kafka.template.pod.affinity
    Kafka.spec.kafka.template.pod.tolerations

    For example, move:

    spec:
      # ...
      kafka:
        affinity {}
        tolerations {}

    to:

    spec:
      # ...
      kafka:
        template:
          pod:
            affinity {}
            tolerations {}
  6. If present, move:

    Kafka.spec.zookeeper.affinity
    Kafka.spec.zookeeper.tolerations

    to:

    Kafka.spec.zookeeper.template.pod.affinity
    Kafka.spec.zookeeper.template.pod.tolerations

    For example, move:

    spec:
      # ...
      zookeeper:
        affinity {}
        tolerations {}

    to:

    spec:
      # ...
      zookeeper:
        template:
          pod:
            affinity {}
            tolerations {}
  7. Save the file, exit the editor and wait for the updated resource to be reconciled.

7.2.2. Upgrading Kafka Connect resources

Prerequisites

  • A Cluster Operator supporting the v1beta1 API version is up and running.

Procedure

Execute the following steps for each KafkaConnect resource in your deployment.

  1. Update the KafkaConnect resource in an editor.

    oc edit kafkaconnect my-connect
  2. Replace:

    apiVersion: kafka.strimzi.io/v1alpha1

    with:

    apiVersion: kafka.strimzi.io/v1beta1
  3. If present, move:

    KafkaConnect.spec.affinity
    KafkaConnect.spec.tolerations

    to:

    KafkaConnect.spec.template.pod.affinity
    KafkaConnect.spec.template.pod.tolerations

    For example, move:

    spec:
      # ...
      affinity {}
      tolerations {}

    to:

    spec:
      # ...
      template:
        pod:
          affinity {}
          tolerations {}
  4. Save the file, exit the editor and wait for the updated resource to be reconciled.

7.2.3. Upgrading Kafka Connect S2I resources

Prerequisites

  • A Cluster Operator supporting the v1beta1 API version is up and running.

Procedure

Execute the following steps for each KafkaConnectS2I resource in your deployment.

  1. Update the KafkaConnectS2I resource in an editor.

    oc edit kafkaconnects2i my-connect
  2. Replace:

    apiVersion: kafka.strimzi.io/v1alpha1

    with:

    apiVersion: kafka.strimzi.io/v1beta1
  3. If present, move:

    KafkaConnectS2I.spec.affinity
    KafkaConnectS2I.spec.tolerations

    to:

    KafkaConnectS2I.spec.template.pod.affinity
    KafkaConnectS2I.spec.template.pod.tolerations

    For example, move:

    spec:
      # ...
      affinity {}
      tolerations {}

    to:

    spec:
      # ...
      template:
        pod:
          affinity {}
          tolerations {}
  4. Save the file, exit the editor and wait for the updated resource to be reconciled.

7.2.4. Upgrading Kafka MirrorMaker resources

Prerequisites

  • A Cluster Operator supporting the v1beta1 API version is up and running.

Procedure

Execute the following steps for each KafkaMirrorMaker resource in your deployment.

  1. Update the KafkaMirrorMaker resource in an editor.

    oc edit kafkamirrormaker my-connect
  2. Replace:

    apiVersion: kafka.strimzi.io/v1alpha1

    with:

    apiVersion: kafka.strimzi.io/v1beta1
  3. If present, move:

    KafkaConnectMirrorMaker.spec.affinity
    KafkaConnectMirrorMaker.spec.tolerations

    to:

    KafkaConnectMirrorMaker.spec.template.pod.affinity
    KafkaConnectMirrorMaker.spec.template.pod.tolerations

    For example, move:

    spec:
      # ...
      affinity {}
      tolerations {}

    to:

    spec:
      # ...
      template:
        pod:
          affinity {}
          tolerations {}
  4. Save the file, exit the editor and wait for the updated resource to be reconciled.

7.2.5. Upgrading Kafka Topic resources

Prerequisites

  • A Topic Operator supporting the v1beta1 API version is up and running.

Procedure

Execute the following steps for each KafkaTopic resource in your deployment.

  1. Update the KafkaTopic resource in an editor.

    oc edit kafkatopic my-topic
  2. Replace:

    apiVersion: kafka.strimzi.io/v1alpha1

    with:

    apiVersion: kafka.strimzi.io/v1beta1
  3. Save the file, exit the editor and wait for the updated resource to be reconciled.

7.2.6. Upgrading Kafka User resources

Prerequisites

  • A User Operator supporting the v1beta1 API version is up and running.

Procedure

Execute the following steps for each KafkaUser resource in your deployment.

  1. Update the KafkaUser resource in an editor.

    oc edit kafkauser my-user
  2. Replace:

    apiVersion: kafka.strimzi.io/v1alpha1

    with:

    apiVersion: kafka.strimzi.io/v1beta1
  3. Save the file, exit the editor and wait for the updated resource to be reconciled.