Chapter 11. Upgrading an AMQ Streams cluster from 1.0.0 to 1.1.0

This chapter describes how to upgrade AMQ Streams on Red Hat Enterprise Linux from version 1.0.0 to version 1.1.0. No cluster downtime is required.

AMQ Streams 1.0.0 and 1.1.0 are based on different versions of Apache Kafka:

AMQ Streams versionKafka version

1.0.0

2.0.0

1.1.0

2.1.1

Although AMQ Streams 1.1.0 and Kafka 2.1.1 are both minor releases, the Kafka protocol has changed since Kafka 2.0.0 was released. In particular, the message format version and inter-broker protocol version are both now at version 2.1. As a result, the upgrade process involves making both configuration changes to existing Kafka brokers and code changes to client applications (consumers and producers). The following table shows the differences between the two Kafka versions:

Kafka versionInter-broker protocol versionLog message format versionZookeeper version

2.0.0

2.0

2.0

3.4.13

2.1.1

2.1

2.1

3.4.13

Although Kafka 2.0.0 and 2.1.1 use the same version of Zookeeper, Red Hat recommends that you update your Zookeeper cluster to use the newest Zookeeper binaries before proceeding with the main AMQ Streams 1.1.0 upgrade.

Message format version
When a producer sends a message to a Kafka broker, the message is encoded using a specific format. This is referred to as the message format version. You can configure a Kafka broker to convert messages from newer format versions to a given older format version, before the broker appends the message to the log.

In Kafka, there are two different methods for setting the message format version:

  • The log.message.format.version property is set on Kafka brokers.
  • The message.format.version property is set on topics.

The default value of message.format.version for a topic is defined by the log.message.format.version that is set on the Kafka broker. You can manually set the message.format.version of a topic by modifying its topic configuration. This chapter refers to the message format version throughout when discussing both Kafka brokers and topics.

Procedure outline

In summary, upgrading to AMQ Streams 1.1.0 is a three-stage process. To upgrade brokers and clients without downtime, you must complete the upgrade procedures in the order shown here.

  1. First, update your Zookeeper cluster to use the newest Zookeeper binaries.

  2. Second, upgrade all Kafka brokers to AMQ Streams 1.1.0 and configure them to use the previous protocol versions.

  3. Third, upgrade all Kafka brokers and client applications to Kafka 2.1.1. To avoid cluster downtime, this stage involves:

11.1. Upgrade prerequisites

Before you begin the upgrade process, make sure that:

11.2. Updating the Zookeeper binaries

For each Kafka broker in your AMQ Streams cluster and one at a time:

  1. Download the Red Hat AMQ Streams 1.1.0 archive from the Customer Portal.

    Note

    If prompted, log in to your Red Hat account.

  2. On the command line, create a temporary directory and extract the contents of the amq-streams-1.1.0-bin.zip file:

    mkdir /tmp/kafka-1-1-0
    unzip amq-streams-1.1.0-bin.zip -d /tmp/kafka-1-1-0
  3. Delete the libs bin and docs directories from your existing installation:

    rm -rf /opt/kafka/libs /opt/kafka/bin /opt/kafka/docs
  4. Copy the libs bin and docs directories from the temporary directory:

    cp -r /tmp/kafka-1-1-0/kafka_y.y-x.x.x/libs /opt/kafka/
    cp -r /tmp/kafka-1-1-0/kafka_y.y-x.x.x/bin /opt/kafka/
    cp -r /tmp/kafka-1-1-0/kafka_y.y-x.x.x/docs /opt/kafka/
  5. Delete the temporary directory:

    rm -rf /tmp/kafka-1-1-0
  6. Restart Zookeeper:

    /opt/kafka/bin/zookeeper-server-start.sh -daemon /opt/kafka/config/zookeeper.properties

11.3. Upgrading Kafka brokers to AMQ Streams 1.1.0

To upgrade to AMQ Streams 1.1.0 on the host machine, download and extract the AMQ Streams 1.1.0 archive from the Red Hat Customer Portal.

Note

This procedure is part of the Upgrading an AMQ Streams cluster from 1.0.0 to 1.1.0 chapter. For initial installation instructions, see Installing AMQ Streams.

Prerequisites

  • You are logged in to Red Hat Enterprise Linux as the kafka user.

Procedure

For each Kafka broker in your AMQ Streams cluster and one at a time:

  1. Download the Red Hat AMQ Streams 1.1.0 archive from the Customer Portal.

    Note

    If prompted, log in to your Red Hat account.

  2. On the command line, create a temporary directory and extract the contents of the amq-streams-1.1.0-bin.zip file.

    mkdir /tmp/kafka-1-1-0
    unzip amq-streams-1.1.0-bin.zip -d /tmp/kafka-1-1-0
  3. Delete the libs bin and docs directories from your existing installation:

    rm -rf /opt/kafka/libs /opt/kafka/bin /opt/kafka/docs
  4. Copy the libs bin and docs directories from the temporary directory:

    cp -r /tmp/kafka-1-1-0/kafka_y.y-x.x.x/libs /opt/kafka/
    cp -r /tmp/kafka-1-1-0/kafka_y.y-x.x.x/bin /opt/kafka/
    cp -r /tmp/kafka-1-1-0/kafka_y.y-x.x.x/docs /opt/kafka/
  5. Delete the temporary directory.

    rm -r /tmp/kafka-1-1-0
  6. In a text editor, open the broker properties file, commonly stored in the /opt/kafka/config/ directory.
  7. Temporarily override the default inter-broker protocol and message format versions for Kafka 2.1.1 by adding or updating the following properties in the file:

    inter.broker.protocol.version=2.0
    log.message.format.version=2.0

    This configures the Kafka broker to process data using the previous inter-broker protocol (2.0) and message format versions.

  8. On the command line, stop the Kafka broker that you modified:

    /opt/kafka/bin/kafka-server-stop.sh
    jcmd | grep kafka
  9. Restart the Kafka broker that you modified:

    /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
    Note

    The Kafka broker will start using the version 1.1.0 binaries.

  10. Verify that the restarted Kafka broker has caught up with the partition replicas it is following. Use the kafka-topics.sh tool to ensure that all replicas contained in the broker are back in sync. For instructions, see Listing and describing topics.

11.4. Configuring Kafka brokers to use the new inter-broker protocol version

Manually configure and restart all Kafka brokers to use the new inter-broker protocol version (2.1). After performing these steps, data is transmitted between the Kafka brokers using inter-broker protocol version 2.1. Messages received are still appended to the message logs in message format version 2.0.

Warning

Downgrading to AMQ Streams 1.0.0 is not possible after completing this procedure.

Prerequisites

Procedure

For each Kafka broker in your AMQ Streams cluster and one at a time:

  1. In a text editor, open the broker properties file for the Kafka broker you want to update. Broker properties files are commonly stored in the /opt/kafka/config/ directory.
  2. Set the inter.broker.protocol.version to 2.1.

    inter.broker.protocol.version=2.1
  3. On the command line, stop the Kafka broker that you most recently modified and restarted as part of this procedure. If you are modifying the first Kafka broker in this procedure, go to step four.

    /opt/kafka/bin/kafka-server-stop.sh
    jcmd | grep kafka
  4. Restart the Kafka broker whose configuration you modified in step two:

    /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
  5. Verify that the restarted Kafka broker has caught up with the partition replicas it is following. Use the kafka-topics.sh tool to ensure that all replicas contained in the broker are back in sync. For instructions, see Listing and describing topics.

11.5. Strategies for upgrading clients

The best approach to upgrading your client applications (including Kafka Connect connectors) depends on your particular circumstances.

Consuming applications need to receive messages in a message format that they understand. You can ensure that this is the case in one of two ways:

  • By upgrading all the consumers for a topic before upgrading any of the producers.
  • By having the brokers down-convert messages to an older format.

Using broker down-conversion puts extra load on the brokers, so it is not ideal to rely on down-conversion for all topics for a prolonged period of time. For brokers to perform optimally they should not be down converting messages at all.

Broker down-conversion is configured in two ways:

  • The topic-level message.format.version configures it for a single topic.
  • The broker-level log.message.format.version is the default for topics that do not have the topic-level message.format.version configured.

Another aspect to consider is that once new-version messages have been published to a topic then they will be visible to consumers. This is because brokers perform down-conversion when they receive messages from producers, not when they are sent to consumers.

There are a number of strategies you can use to upgrade your clients:

Consumers first
  1. Upgrade all the consuming applications.
  2. Change the broker-level log.message.format.version to the new version.
  3. Upgrade all the producing applications.

    This strategy is straightforward, and avoids any broker down-conversion. However, it assumes that all consumers in your organization can be upgraded in a coordinated way, and it does not work for applications that are both consumers and producers. There is also a risk that, if there is a problem with the upgraded clients, new-format messages might get added to the message log so that you cannot revert to the previous consumer version.

Per-topic consumers first

For each topic:

  1. Upgrade all the consuming applications.
  2. Change the topic-level message.format.version to the new version.
  3. Upgrade all the producing applications.

    This strategy avoids any broker down-conversion, and means you can proceed on a topic-by-topic basis. It does not work for applications that are both consumers and producers of the same topic. Again, it has the risk that, if there is a problem with the upgraded clients, new-format messages might get added to the message log.

Per-topic consumers first, with down conversion

For each topic:

  1. Change the topic-level message.format.version to the old version (or rely on the topic defaulting to the broker-level log.message.format.version).
  2. Upgrade all the consuming and producing applications.
  3. Verify that the upgraded applications function correctly.
  4. Change the topic-level message.format.version to the new version.

    This strategy requires broker down-conversion, but the load on the brokers is minimized because it is only required for a single topic (or small group of topics) at a time. It also works for applications that are both consumers and producers of the same topic. This approach ensures that the upgraded producers and consumers are working correctly before you commit to using the new message format version.

    The main drawback of this approach is that it can be complicated to manage in a cluster with many topics and applications.

Other strategies for upgrading client applications are also possible.

Note

It is also possible to apply multiple strategies. For example, for the first few applications and topics the "per-topic consumers first, with down conversion" strategy can be used. When this has proved successful another, more efficient strategy can be considered acceptable to use instead.

11.6. Upgrading client applications to the new Kafka version

This procedure describes one possible approach to upgrading your client applications to Kafka 2.1.1, the version used in AMQ Streams 1.1.0. It is based on the "Per-topic consumers first, with down conversion" approach outlined in Strategies for upgrading clients.

Procedure

For each topic:

  1. On the command line, set the message.format.version configuration option to 2.0.

    bin/kafka-configs.sh --zookeeper <ZookeeperAddress> --entity-type topics --entity-name <TopicName> --alter --add-config message.format.version=2.0
  2. Upgrade all the consuming and producing applications for the topic.
  3. Verify that the upgraded applications function correctly.
  4. Change the topic’s message.format.version configuration option to 2.1.

    bin/kafka-configs.sh --zookeeper <ZookeeperAddress> --entity-type topics --entity-name <TopicName> --alter --add-config message.format.version=2.1

Additional resources

11.7. Configuring Kafka brokers to use the new message format version

When client applications have been upgraded, you can update the Kafka brokers to use the new message format version (2.1).

If you did not modify topic configurations when you upgraded your client applications to Kafka 2.1.1, the Kafka brokers are now converting messages down to message format version 2.0, which can cause a reduction in performance. Therefore, it is important that you update all Kafka brokers to use the new message format version as soon as possible.

Note

Update and restart the Kafka brokers one-by-one. Before you restart a modified broker, stop the broker you configured and restarted previously.

Procedure

For each Kafka broker in your AMQ Streams cluster and one at a time:

  1. In a text editor, open the broker properties file for the Kafka broker you want to update. Broker properties files are commonly stored in the /opt/kafka/config/ directory.
  2. Set the log.message.format.version to 2.1.

    log.message.format.version=2.1
  3. On the command line, stop the Kafka broker that you most recently modified and restarted as part of this procedure. If you are modifying the first Kafka broker in this procedure, go to step four.

    /opt/kafka/bin/kafka-server-stop.sh
    jcmd | grep kafka
  4. Restart the Kafka broker whose configuration you modified in step two:

    /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
  5. Verify that the restarted Kafka broker has caught up with the partition replicas it is following. Use the kafka-topics.sh tool to ensure that all replicas contained in the broker are back in sync. For instructions, see Listing and describing topics.