Chapter 18. AMQ Streams and Kafka upgrades

AMQ Streams can be upgraded with no cluster downtime. Each version of AMQ Streams supports one or more versions of Apache Kafka: you can upgrade to a higher Kafka version as long as it is supported by your version of AMQ Streams. Newer versions of AMQ Streams may support newer versions of Kafka, but you need to upgrade AMQ Streams before you can upgrade to a higher supported Kafka version.

18.1. Upgrade prerequisites

Before you begin the upgrade process, make sure that:

18.2. Upgrade process

Upgrading AMQ Streams is a two-stage process. To upgrade brokers and clients without downtime, you must complete the upgrade procedures in the following order:

  1. Upgrade to the latest AMQ Streams version.

  2. Upgrade all Kafka brokers and client applications to the latest Kafka version.

18.3. Kafka versions

Kafka’s log message format version and inter-broker protocol version specify, respectively, the log format version appended to messages and the version of the Kafka protocol used in a cluster. As a result, the upgrade process involves making configuration changes to existing Kafka brokers and code changes to client applications (consumers and producers) to ensure the correct versions are used.

The following table shows the differences between Kafka versions:

Kafka version   Inter-broker protocol version   Log message format version   ZooKeeper version
2.5.0           2.5                             2.5                          3.5.8
2.6.0           2.6                             2.6                          3.5.8

Message format version

When a producer sends a message to a Kafka broker, the message is encoded using a specific format. The format can change between Kafka releases, so messages include a version identifying which version of the format they were encoded with. You can configure a Kafka broker to convert messages from newer format versions to a given older format version before the broker appends the message to the log.

In Kafka, there are two different methods for setting the message format version:

  • The message.format.version property is set on topics.
  • The log.message.format.version property is set on Kafka brokers.

The default value of message.format.version for a topic is defined by the log.message.format.version that is set on the Kafka broker. You can manually set the message.format.version of a topic by modifying its topic configuration.

The upgrade tasks in this section assume that the message format version is defined by the log.message.format.version.
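
The relationship between the two properties can be illustrated with a small shell sketch. The function name effective_format_version is hypothetical and is not part of Kafka or AMQ Streams; it only mirrors the rule described above, in which a topic-level message.format.version takes precedence and the broker-level log.message.format.version acts as the default.

```shell
# Sketch only: effective_format_version is a hypothetical helper, not a Kafka tool.
# $1 = topic-level message.format.version (empty string if not set on the topic)
# $2 = broker-level log.message.format.version (the default for topics)
effective_format_version() {
  local topic_value="$1" broker_default="$2"
  if [ -n "$topic_value" ]; then
    printf '%s\n' "$topic_value"      # a topic-level setting takes precedence
  else
    printf '%s\n' "$broker_default"   # otherwise the broker default applies
  fi
}
```

For example, calling effective_format_version "" 2.5 prints 2.5, because the topic has no setting of its own.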

18.4. Upgrading to AMQ Streams 1.6

This section outlines the steps to upgrade your deployment to AMQ Streams 1.6.

The availability of Kafka clusters managed by AMQ Streams is not affected by the upgrade operation.

Note

Refer to the documentation supporting a specific version of AMQ Streams for information on how to upgrade to that version.

18.4.1. Upgrading Kafka brokers and ZooKeeper

This procedure describes how to upgrade Kafka brokers and ZooKeeper on a host machine to use the latest version of AMQ Streams.

Prerequisites

  • You are logged in to Red Hat Enterprise Linux as the kafka user.

Procedure

For each Kafka broker in your AMQ Streams cluster, one at a time:

  1. Download the AMQ Streams archive from the Customer Portal.

    Note

    If prompted, log in to your Red Hat account.

  2. On the command line, create a temporary directory and extract the contents of the amq-streams-x.y.z-bin.zip file.

    mkdir /tmp/kafka
    unzip amq-streams-x.y.z-bin.zip -d /tmp/kafka
  3. If running, stop ZooKeeper and the Kafka broker running on the host.

    /opt/kafka/bin/zookeeper-server-stop.sh
    /opt/kafka/bin/kafka-server-stop.sh
    jcmd | grep zookeeper
    jcmd | grep kafka
  4. Delete the libs, bin, and docs directories from your existing installation:

    rm -rf /opt/kafka/libs /opt/kafka/bin /opt/kafka/docs
  5. Copy the libs, bin, and docs directories from the temporary directory:

    cp -r /tmp/kafka/kafka_y.y-x.x.x/libs /opt/kafka/
    cp -r /tmp/kafka/kafka_y.y-x.x.x/bin /opt/kafka/
    cp -r /tmp/kafka/kafka_y.y-x.x.x/docs /opt/kafka/
  6. Delete the temporary directory.

    rm -r /tmp/kafka
  7. In a text editor, open the broker properties file, commonly stored in the /opt/kafka/config/ directory.
  8. Check that the inter.broker.protocol.version and log.message.format.version properties are set to the current version:

    inter.broker.protocol.version=2.5
    log.message.format.version=2.5

    Leaving the inter.broker.protocol.version unchanged ensures that the brokers can continue to communicate with each other throughout the upgrade.

    If the properties are not configured, add them with the current version.

  9. Restart the updated ZooKeeper and Kafka broker:

    /opt/kafka/bin/zookeeper-server-start.sh -daemon /opt/kafka/config/zookeeper.properties
    /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties

    The Kafka broker and ZooKeeper will start using the binaries for the latest Kafka version.

  10. Verify that the restarted Kafka broker has caught up with the partition replicas it is following. Use the kafka-topics.sh tool to ensure that all replicas contained in the broker are back in sync. For instructions, see Listing and describing topics.
  11. Perform the procedures to upgrade Kafka, as described in Section 18.5, “Upgrading Kafka”.
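
The check in step 8 can be scripted rather than done by eye. The following is a minimal sketch: get_prop and check_versions are hypothetical helper names, and the sketch assumes a simple key=value properties file without line continuations.

```shell
# Sketch only: get_prop and check_versions are hypothetical helpers.

# Print the value of a key in a properties file (the last definition wins).
# Prints nothing if the key is absent or commented out.
get_prop() {
  local file="$1" key="$2" pattern
  # Escape dots so that they match literally in the regular expression.
  pattern="$(printf '%s' "$key" | sed 's/\./\\./g')"
  sed -n "s/^[[:space:]]*${pattern}[[:space:]]*=[[:space:]]*//p" "$file" | tail -n 1
}

# Return 0 only if both version properties are set to the expected value.
check_versions() {
  local file="$1" expected="$2" key value status=0
  for key in inter.broker.protocol.version log.message.format.version; do
    value="$(get_prop "$file" "$key")"
    if [ "$value" != "$expected" ]; then
      echo "$key is '${value:-unset}', expected '$expected'" >&2
      status=1
    fi
  done
  return "$status"
}
```

With the file names used in this procedure, check_versions /opt/kafka/config/server.properties 2.5 reports any property that is missing or set to a different version.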

18.4.2. Upgrading Kafka Connect

This procedure describes how to upgrade a Kafka Connect cluster on a host machine.

Kafka Connect is a client application and should be included in your chosen strategy for upgrading clients. For more information, see Strategies for upgrading clients.

Prerequisites

  • You are logged in to Red Hat Enterprise Linux as the kafka user.
  • Kafka Connect is not started.

Procedure

For each host running Kafka Connect, one at a time:

  1. Download the AMQ Streams archive from the Customer Portal.

    Note

    If prompted, log in to your Red Hat account.

  2. On the command line, create a temporary directory and extract the contents of the amq-streams-x.y.z-bin.zip file.

    mkdir /tmp/kafka
    unzip amq-streams-x.y.z-bin.zip -d /tmp/kafka
  3. If running, stop the Kafka broker and ZooKeeper running on the host.

    /opt/kafka/bin/kafka-server-stop.sh
    /opt/kafka/bin/zookeeper-server-stop.sh
  4. Delete the libs, bin, and docs directories from your existing installation:

    rm -rf /opt/kafka/libs /opt/kafka/bin /opt/kafka/docs
  5. Copy the libs, bin, and docs directories from the temporary directory:

    cp -r /tmp/kafka/kafka_y.y-x.x.x/libs /opt/kafka/
    cp -r /tmp/kafka/kafka_y.y-x.x.x/bin /opt/kafka/
    cp -r /tmp/kafka/kafka_y.y-x.x.x/docs /opt/kafka/
  6. Delete the temporary directory.

    rm -r /tmp/kafka
  7. Start Kafka Connect in either standalone or distributed mode.

    • To start in standalone mode, run the connect-standalone.sh script. Specify the Kafka Connect standalone configuration file and the configuration files of your Kafka Connect connectors.

      su - kafka
      /opt/kafka/bin/connect-standalone.sh /opt/kafka/config/connect-standalone.properties connector1.properties [connector2.properties ...]
    • To start in distributed mode, start the Kafka Connect workers with the /opt/kafka/config/connect-distributed.properties configuration file on all Kafka Connect nodes:

      su - kafka
      /opt/kafka/bin/connect-distributed.sh /opt/kafka/config/connect-distributed.properties
  8. Verify that Kafka Connect is running:

    • In standalone mode:

      jcmd | grep ConnectStandalone
    • In distributed mode:

      jcmd | grep ConnectDistributed
  9. Verify that Kafka Connect is producing and consuming data as expected.
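
The two jcmd checks in step 8 can be folded into one helper that reports the mode. This is a sketch under assumptions: connect_mode is a hypothetical function name, and it relies on the jcmd listing containing the main class name (ConnectStandalone or ConnectDistributed), which is what the grep commands above also rely on.

```shell
# Sketch only: connect_mode is a hypothetical helper.
# Reads a jcmd process listing on stdin and prints the Kafka Connect mode found.
connect_mode() {
  local listing
  listing="$(cat)"
  case "$listing" in
    *ConnectDistributed*) echo distributed ;;
    *ConnectStandalone*)  echo standalone ;;
    *)                    echo none; return 1 ;;   # no Kafka Connect process listed
  esac
}
```

To use it, pipe the process listing into the function: jcmd | connect_mode. A non-zero exit status indicates that no Kafka Connect process was found.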

18.5. Upgrading Kafka

After you have upgraded your binaries to use the latest version of AMQ Streams, you can upgrade your brokers and clients to use a higher supported version of Kafka.

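Take care to follow the steps in the correct order:

  1. Section 18.5.1, “Upgrading Kafka brokers to use the new inter-broker protocol version”
  2. Section 18.5.2, “Strategies for upgrading clients”
  3. Section 18.5.3, “Upgrading client applications to the new Kafka version”
  4. Section 18.5.4, “Upgrading Kafka brokers to use the new message format version”

Following the Kafka upgrade, if required, you can upgrade Kafka consumers to use the incremental cooperative rebalance protocol:

  • Section 18.5.5, “Upgrading consumers and Kafka Streams applications to cooperative rebalancing”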

18.5.1. Upgrading Kafka brokers to use the new inter-broker protocol version

Manually configure and restart all Kafka brokers to use the new inter-broker protocol version. After performing these steps, data is transmitted between the Kafka brokers using the new inter-broker protocol version.

Messages received are still appended to the message logs in the earlier message format version.

Warning

Downgrading AMQ Streams is not possible after completing this procedure.

Prerequisites

Procedure

For each Kafka broker in your AMQ Streams cluster, one at a time:

  1. In a text editor, open the broker properties file for the Kafka broker you want to update. Broker properties files are commonly stored in the /opt/kafka/config/ directory.
  2. Set the inter.broker.protocol.version to 2.6.

    inter.broker.protocol.version=2.6
  3. On the command line, stop the Kafka broker that you modified:

    /opt/kafka/bin/kafka-server-stop.sh
    jcmd | grep kafka
  4. Restart the Kafka broker that you modified:

    /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
  5. Verify that the restarted Kafka broker has caught up with the partition replicas it is following. Use the kafka-topics.sh tool to ensure that all replicas contained in the broker are back in sync. For instructions, see Listing and describing topics.
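
If you prefer to script the edit in step 2 instead of using a text editor, a helper along the following lines could be used. This is a sketch, not a supported tool: set_prop is a hypothetical function name, and it assumes a simple key=value properties file. It rewrites an existing line for the key, or appends the key if it is missing.

```shell
# Sketch only: set_prop is a hypothetical helper.
# Set key=value in a properties file: rewrite the existing line or append a new one.
set_prop() {
  local file="$1" key="$2" value="$3" tmp status
  tmp="$(mktemp)" || return 1
  awk -v key="$key" -v value="$value" '
    BEGIN { FS = "=" }
    {
      # Compare the trimmed text before the first "=" with the key.
      name = $1
      gsub(/^[ \t]+|[ \t]+$/, "", name)
      if (name == key) { print key "=" value; found = 1 }
      else             { print }
    }
    END { if (!found) print key "=" value }
  ' "$file" > "$tmp" && cat "$tmp" > "$file"   # cat keeps the file owner and mode
  status=$?
  rm -f "$tmp"
  return "$status"
}
```

For this procedure, the call would be set_prop /opt/kafka/config/server.properties inter.broker.protocol.version 2.6, followed by the stop and restart commands in steps 3 and 4. Back up the properties file before editing it in place.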

18.5.2. Strategies for upgrading clients

The best approach to upgrading your client applications (including Kafka Connect connectors) depends on your particular circumstances.

Consuming applications need to receive messages in a message format that they understand. You can ensure that this is the case in one of two ways:

  • By upgrading all the consumers for a topic before upgrading any of the producers.
  • By having the brokers down-convert messages to an older format.

Using broker down-conversion puts extra load on the brokers, so it is not ideal to rely on down-conversion for all topics for a prolonged period of time. For brokers to perform optimally, they should not be down-converting messages at all.

Broker down-conversion is configured in two ways:

  • The topic-level message.format.version configures it for a single topic.
  • The broker-level log.message.format.version is the default for topics that do not have the topic-level message.format.version configured.

Messages published to a topic in a new-version format will be visible to consumers, because brokers perform down-conversion when they receive messages from producers, not when they are sent to consumers.

There are a number of strategies you can use to upgrade your clients:

Consumers first
  1. Upgrade all the consuming applications.
  2. Change the broker-level log.message.format.version to the new version.
  3. Upgrade all the producing applications.

    This strategy is straightforward, and avoids any broker down-conversion. However, it assumes that all consumers in your organization can be upgraded in a coordinated way, and it does not work for applications that are both consumers and producers. There is also a risk that, if there is a problem with the upgraded clients, new-format messages might get added to the message log so that you cannot revert to the previous consumer version.

Per-topic consumers first

For each topic:

  1. Upgrade all the consuming applications.
  2. Change the topic-level message.format.version to the new version.
  3. Upgrade all the producing applications.

    This strategy avoids any broker down-conversion, and means you can proceed on a topic-by-topic basis. It does not work for applications that are both consumers and producers of the same topic. Again, it has the risk that, if there is a problem with the upgraded clients, new-format messages might get added to the message log.

Per-topic consumers first, with down conversion

For each topic:

  1. Change the topic-level message.format.version to the old version (or rely on the topic defaulting to the broker-level log.message.format.version).
  2. Upgrade all the consuming and producing applications.
  3. Verify that the upgraded applications function correctly.
  4. Change the topic-level message.format.version to the new version.

    This strategy requires broker down-conversion, but the load on the brokers is minimized because it is only required for a single topic (or small group of topics) at a time. It also works for applications that are both consumers and producers of the same topic. This approach ensures that the upgraded producers and consumers are working correctly before you commit to using the new message format version.

    The main drawback of this approach is that it can be complicated to manage in a cluster with many topics and applications.

Other strategies for upgrading client applications are also possible.

Note

You can also combine strategies. For example, use the "per-topic consumers first, with down conversion" strategy for the first few applications and topics. When this has proved successful, you can switch to another, more efficient strategy.

18.5.3. Upgrading client applications to the new Kafka version

This procedure describes one possible approach to upgrading your client applications to the Kafka version used for AMQ Streams 1.6.

The procedure is based on the "per-topic consumers first, with down conversion" approach outlined in Strategies for upgrading clients.

Client applications include producers, consumers, Kafka Connect, Kafka Streams applications, and MirrorMaker.

Procedure

For each topic:

  1. On the command line, set the message.format.version configuration option to 2.5.

    bin/kafka-configs.sh --bootstrap-server <BrokerAddress> --entity-type topics --entity-name <TopicName> --alter --add-config message.format.version=2.5
  2. Upgrade all the consumers and producers for the topic.
  3. Optionally, to upgrade consumers and Kafka Streams applications to use the incremental cooperative rebalance protocol, which was added in Kafka 2.4.0, see Section 18.5.5, “Upgrading consumers and Kafka Streams applications to cooperative rebalancing”.
  4. Verify that the upgraded applications function correctly.
  5. Change the topic’s message.format.version configuration option to 2.6.

    bin/kafka-configs.sh --bootstrap-server <BrokerAddress> --entity-type topics --entity-name <TopicName> --alter --add-config message.format.version=2.6
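
When many topics are involved, it can help to generate the kafka-configs.sh commands for review before running them. The following sketch prints the commands rather than executing them. The function names are hypothetical, and the broker address localhost:9092 and the topic name orders used in the usage note are placeholders.

```shell
# Sketch only: both functions are hypothetical helpers that print commands without running them.

# Print the kafka-configs.sh command that sets message.format.version for one topic.
format_version_cmd() {
  local broker="$1" topic="$2" version="$3"
  printf '%s\n' "bin/kafka-configs.sh --bootstrap-server $broker --entity-type topics --entity-name $topic --alter --add-config message.format.version=$version"
}

# Print one command for each topic name read from stdin, skipping empty lines.
format_version_cmds() {
  local broker="$1" version="$2" topic
  while IFS= read -r topic; do
    if [ -n "$topic" ]; then
      format_version_cmd "$broker" "$topic" "$version"
    fi
  done
  return 0
}
```

For example, format_version_cmd localhost:9092 orders 2.5 prints the command for a single topic. To cover every topic, the output of bin/kafka-topics.sh --bootstrap-server localhost:9092 --list can be piped into format_version_cmds localhost:9092 2.5. Review the generated list first, because it can include internal topics that you might not want to change.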


18.5.4. Upgrading Kafka brokers to use the new message format version

When client applications have been upgraded, you can update the Kafka brokers to use the new message format version.

If you did not modify topic configurations when you upgraded your client applications to use the Kafka version required for AMQ Streams 1.6, the Kafka brokers are now converting messages down to the previous message format version, which can cause a reduction in performance. Therefore, it is important that you update all Kafka brokers to use the new message format version as soon as possible.

Note

Update and restart the Kafka brokers one-by-one. Before you restart a modified broker, stop the broker you configured and restarted previously.

Procedure

For each Kafka broker in your AMQ Streams cluster, one at a time:

  1. In a text editor, open the broker properties file for the Kafka broker you want to update. Broker properties files are commonly stored in the /opt/kafka/config/ directory.
  2. Set the log.message.format.version to 2.6.

    log.message.format.version=2.6
  3. On the command line, stop the Kafka broker that you most recently modified and restarted as part of this procedure. If you are modifying the first Kafka broker in this procedure, go to step four.

    /opt/kafka/bin/kafka-server-stop.sh
    jcmd | grep kafka
  4. Restart the Kafka broker whose configuration you modified in step two:

    /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
  5. Verify that the restarted Kafka broker has caught up with the partition replicas it is following. Use the kafka-topics.sh tool to ensure that all replicas contained in the broker are back in sync. For instructions, see Listing and describing topics.
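
The verification in step 5 can be automated by polling for under-replicated partitions before moving on to the next broker. The following is a sketch under assumptions: the function names are hypothetical, the check uses the --under-replicated-partitions option of kafka-topics.sh, and it assumes that each reported partition appears on a line containing "Partition:". Note that this check covers the whole cluster, not only the broker that was restarted.

```shell
# Sketch only: both functions are hypothetical helpers.

# Count partition lines in kafka-topics.sh --describe output read from stdin.
count_under_replicated() {
  grep -c 'Partition:' || true   # grep exits non-zero on zero matches; still print 0
}

# Poll until no partitions are under-replicated, or give up after a number of attempts.
# $1 = bootstrap server address, $2 = maximum attempts (default 30)
wait_until_in_sync() {
  local broker="$1" attempts="${2:-30}" output remaining
  while [ "$attempts" -gt 0 ]; do
    if output="$(/opt/kafka/bin/kafka-topics.sh --bootstrap-server "$broker" \
        --describe --under-replicated-partitions)"; then
      remaining="$(printf '%s\n' "$output" | count_under_replicated)"
      [ "$remaining" = "0" ] && return 0
      echo "$remaining partition(s) still under-replicated" >&2
    else
      echo "kafka-topics.sh failed; retrying" >&2
    fi
    attempts=$((attempts - 1))
    sleep 10
  done
  return 1
}
```

A non-zero exit status from wait_until_in_sync means that the replicas had not caught up within the allowed attempts, in which case the next broker should not be restarted yet.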

18.5.5. Upgrading consumers and Kafka Streams applications to cooperative rebalancing

You can upgrade Kafka consumers and Kafka Streams applications to use the incremental cooperative rebalance protocol for partition rebalances instead of the default eager rebalance protocol. The new protocol was added in Kafka 2.4.0.

Consumers keep their partition assignments in a cooperative rebalance and only revoke them at the end of the process, if needed to achieve a balanced cluster. This reduces the unavailability of the consumer group or Kafka Streams application.

Note

Upgrading to the incremental cooperative rebalance protocol is optional. The eager rebalance protocol is still supported.

Procedure

To upgrade a Kafka consumer to use the incremental cooperative rebalance protocol:

  1. Replace the Kafka clients .jar file with the new version.
  2. In the consumer configuration, append cooperative-sticky to the partition.assignment.strategy. For example, if the range strategy is set, change the configuration to range, cooperative-sticky.
  3. Restart each consumer in the group in turn, waiting for the consumer to rejoin the group after each restart.
  4. Reconfigure each consumer in the group by removing the earlier partition.assignment.strategy from the consumer configuration, leaving only the cooperative-sticky strategy.
  5. Restart each consumer in the group in turn, waiting for the consumer to rejoin the group after each restart.
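
The two configuration phases for consumers can be sketched as follows. The function names are hypothetical. The sketch assumes that partition.assignment.strategy is given as a comma-separated list of assignor class names, so the range and cooperative-sticky strategies named above appear as RangeAssignor and CooperativeStickyAssignor.

```shell
# Sketch only: the function names are hypothetical helpers.
COOPERATIVE=org.apache.kafka.clients.consumer.CooperativeStickyAssignor

# First rolling restart: keep the existing assignors and append the cooperative one.
transitional_strategy() {
  local current="$1"
  case ",$current," in
    *",$COOPERATIVE,"*) printf '%s\n' "$current" ;;               # already listed
    *)                  printf '%s\n' "$current,$COOPERATIVE" ;;  # append it
  esac
}

# Second rolling restart: only the cooperative assignor remains.
final_strategy() {
  printf '%s\n' "$COOPERATIVE"
}
```

For a consumer that currently uses org.apache.kafka.clients.consumer.RangeAssignor, transitional_strategy gives the value to set in step 2, and final_strategy gives the value to set in step 4.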

To upgrade a Kafka Streams application to use the incremental cooperative rebalance protocol:

  1. Replace the Kafka Streams .jar file with the new version.
  2. In the Kafka Streams configuration, set the upgrade.from configuration parameter to the Kafka version you are upgrading from (for example, 2.3).
  3. Restart each of the stream processors (nodes) in turn.
  4. Remove the upgrade.from configuration parameter from the Kafka Streams configuration.
  5. Restart each of the stream processors (nodes) in turn.
