Chapter 1. Features

The features added in this release, which were not available in previous releases of AMQ Streams, are outlined below.

1.1. Kafka 2.4.0 support

AMQ Streams now supports and uses Apache Kafka version 2.4.0. Only Kafka distributions built by Red Hat are supported.

For upgrade instructions, see AMQ Streams and Kafka upgrades.

Refer to the Kafka 2.3.0 and Kafka 2.4.0 Release Notes for additional information.

Note

Kafka 2.3.x is supported in AMQ Streams 1.4 only for upgrade purposes.

For more information on supported versions, see the Red Hat AMQ 7 Component Details Page on the Customer Portal.

Changes to the partition rebalance protocol in Kafka 2.4.0

Kafka 2.4.0 adds incremental cooperative rebalancing for consumers and Kafka Streams applications. This is an improved rebalance protocol for implementing partition rebalances according to a defined rebalance strategy. Using the new protocol, consumers keep their assigned partitions during a rebalance and only revoke them at the end of the process if required to achieve cluster balance. This reduces the unavailability of the consumer group or Kafka Streams application during a rebalance.

To take advantage of incremental cooperative rebalancing, you must upgrade consumers and Kafka Streams applications to use the new protocol instead of the old eager rebalance protocol.
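For example, a consumer can opt in to the new protocol by setting the CooperativeStickyAssignor shipped with Kafka 2.4.0 as its partition assignment strategy. The following is a minimal sketch of the relevant consumer configuration; the bootstrap.servers and group.id values are illustrative:

bootstrap.servers=my-cluster-kafka:9092
group.id=my-consumer-group
partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor

Note that a running consumer group is typically migrated in two rolling restarts: first add the cooperative assignor alongside the existing assignor, then remove the old assignor, as described in the upgrade procedure linked below.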

See Upgrading consumers and Kafka Streams applications to cooperative rebalancing and Notable changes in 2.4.0 in the Apache Kafka documentation.

1.2. ZooKeeper 3.5.7

Kafka version 2.4.0 requires a new version of ZooKeeper, version 3.5.7. Therefore, the first step in upgrading to AMQ Streams 1.4 is to upgrade ZooKeeper to version 3.5.7, as described in AMQ Streams and Kafka upgrades.

Refer to the ZooKeeper 3.5.7 Release Notes for additional information. Note the changes to the configuration format.

Before you start the upgrade, be aware of the following information relating to the new version of ZooKeeper.

1.2.1. New configuration options and address format

ZooKeeper 3.5.7 adds the following configuration options:

  • reconfigEnabled - Enables or disables dynamic reconfiguration. Must be enabled to add servers to or remove servers from a ZooKeeper 3.5.7 cluster.
  • standaloneEnabled - Enables or disables standalone mode, where ZooKeeper runs with only one server.
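For example, both options can be set in the static ZooKeeper configuration file. The following sketch enables dynamic reconfiguration for a multi-server cluster:

reconfigEnabled=true
standaloneEnabled=false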

The format of ZooKeeper server addresses has changed. The client port is now specified as part of each server address, rather than in a separate clientPort entry in the static ZooKeeper configuration file. For example:

server.1=172.17.0.1:2888:3888:participant;172.17.0.1:2181
server.2=172.17.0.2:2888:3888:participant;172.17.0.2:2181
server.3=172.17.0.3:2888:3888:participant;172.17.0.3:2181

You must apply these changes to the ZooKeeper configuration file as part of the ZooKeeper 3.5.7 upgrade.

1.2.2. Four letter word commands

ZooKeeper four letter word commands must now be added to the allow list before they can be used.
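For example, to allow the stat and ruok commands, add them to the allow list in the ZooKeeper configuration file. In ZooKeeper 3.5, the allow list is set through the 4lw.commands.whitelist property:

4lw.commands.whitelist=stat, ruok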

1.2.3. Missing snapshots error when upgrading to ZooKeeper 3.5.7

Snapshots check

To enable faster recovery from crashed nodes, ZooKeeper 3.5.7 checks at startup that at least one snapshot file exists in the data directory for the ZooKeeper cluster, /var/lib/zookeeper/. If the data directory does not contain any snapshot files, ZooKeeper 3.5.7 fails to start with the following error:

java.io.IOException: No snapshot found, but there are log entries. Something is broken!

If you see this error after upgrading ZooKeeper to version 3.5.7, you must perform some additional steps, described below. You must successfully complete the ZooKeeper 3.5.7 upgrade before proceeding with upgrading the Kafka brokers.

Note

Snapshot files are likely to exist for a long-running ZooKeeper cluster, so in most cases the missing snapshots error does not occur.

Additional steps to recover from missing snapshots error in ZooKeeper 3.5.7 upgrades

Perform the following steps if you see the missing snapshots error when performing the Upgrading ZooKeeper procedure.

  1. Enable the snapshot.trust.empty configuration option in the ZooKeeper configuration file, /opt/kafka/config/zookeeper.properties. For example:

    tickTime=2000
    dataDir=/var/lib/zookeeper/
    clientPort=2181
    snapshot.trust.empty=true
  2. Restart ZooKeeper:

    /opt/kafka/bin/zookeeper-server-start.sh -daemon /opt/kafka/config/zookeeper.properties
  3. Wait until at least one snapshot file is created in the /var/lib/zookeeper/ directory (an example check is shown after this procedure).
  4. Disable the snapshot.trust.empty configuration option. For example:

    tickTime=2000
    dataDir=/var/lib/zookeeper/
    clientPort=2181
    snapshot.trust.empty=false
  5. Restart ZooKeeper again:

    /opt/kafka/bin/zookeeper-server-start.sh -daemon /opt/kafka/config/zookeeper.properties
  6. If the upgrade to ZooKeeper 3.5.7 completes without any errors, you can continue with the AMQ Streams 1.4 upgrade.
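To confirm that a snapshot file has been created (step 3), you can list the contents of the data directory. By default, ZooKeeper writes snapshots to the version-2 subdirectory of the data directory, so a check might look like this (a sketch, assuming the default layout):

ls /var/lib/zookeeper/version-2/snapshot.*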

1.2.4. Scaling ZooKeeper 3.5.7 up or down

The configuration procedure for ZooKeeper 3.5.7 servers is significantly different from the procedure for ZooKeeper 3.4.x servers. Referred to as dynamic reconfiguration, the new procedure requires that servers are added or removed using the ZooKeeper CLI or the Admin API. This ensures that a stable ZooKeeper cluster is maintained during the scale-up or scale-down process.

To scale a ZooKeeper 3.5.7 cluster up or down, you must perform the procedures described here. Scaling ZooKeeper up means adding servers to a ZooKeeper cluster. Scaling ZooKeeper down means removing servers from a ZooKeeper cluster.

Scaling up ZooKeeper in an AMQ Streams 1.4 cluster

This procedure assumes that authentication is enabled for the ZooKeeper cluster and that you can access the servers using the configured authentication mechanism.

Perform the following steps for each ZooKeeper server, one at a time:

  1. Add a server to the ZooKeeper cluster as described in Running a multi-node ZooKeeper cluster and start ZooKeeper.
  2. Note the IP address and configured access ports of the new server.
  3. Start a zookeeper-shell session for the server. Run the following command from a machine that has access to the cluster (this might be one of the ZooKeeper nodes or your local machine, if it has access).

    su - kafka
    /opt/kafka/bin/zookeeper-shell.sh <ip-address>:<zk-port>
  4. In the shell session, with the ZooKeeper node running, enter the following line to add the new server to the quorum as a voting member:

    reconfig -add server.<positive-id>=<address1>:<port1>:<port2>[:role];[<client-port-address>:]<client-port>

    For example:

    reconfig -add server.4=172.17.0.4:2888:3888:participant;172.17.0.4:2181

    Where <positive-id> is the new server ID, in this case 4.

    For the two ports, <port1> 2888 is for communication between ZooKeeper servers, and <port2> 3888 is for leader election.

    The new configuration propagates to the other servers in the ZooKeeper cluster; the new server is now a full member of the quorum. You can verify this with the example shown after this procedure.

  5. Repeat steps 1-4 for the other servers that you want to add.
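To verify that the new server has joined the quorum, run the config command in the same zookeeper-shell session. This command, available in ZooKeeper 3.5, prints the current quorum configuration, which should now include the server you added (illustrative output for the example above; the version value will differ):

config

server.1=172.17.0.1:2888:3888:participant;172.17.0.1:2181
server.2=172.17.0.2:2888:3888:participant;172.17.0.2:2181
server.3=172.17.0.3:2888:3888:participant;172.17.0.3:2181
server.4=172.17.0.4:2888:3888:participant;172.17.0.4:2181
version=100000054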

Scaling down ZooKeeper in an AMQ Streams 1.4 cluster

This procedure assumes that authentication is enabled for the ZooKeeper cluster. Before proceeding, read the notes on "Removing servers" in the ZooKeeper documentation.

Perform the following steps for each ZooKeeper server, one at a time:

  1. Log in to the zookeeper-shell on one of the servers that will be retained after the scale down (for example, server 1).

    Note

    Access the server using the authentication mechanism configured for the ZooKeeper cluster.

  2. Remove a server, for example server 5.

    reconfig -remove 5
  3. Deactivate the server that you removed by shutting down its ZooKeeper process. You can verify the removal with the example shown after this procedure.
  4. Repeat steps 1-3 to reduce the cluster size.
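After each removal, you can run the config command from the same zookeeper-shell session to confirm that the removed server ID no longer appears in the quorum configuration:

config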

1.3. OAuth 2.0 authentication

Support for OAuth 2.0 authentication moves from a Technology Preview to a generally available component of AMQ Streams.

AMQ Streams supports the use of OAuth 2.0 authentication using the SASL OAUTHBEARER mechanism. Using OAuth 2.0 token-based authentication, application clients can access resources on application servers (called ‘resource servers’) without exposing account credentials. The client presents an access token as a means of authenticating, which application servers can also use to find more information about the level of access granted. The authorization server handles the granting of access and inquiries about access.

In the context of AMQ Streams:

  • Kafka brokers act as resource servers
  • Kafka clients act as resource clients

The brokers and clients communicate with the OAuth 2.0 authorization server, as necessary, to obtain or validate access tokens.

For a deployment of AMQ Streams, OAuth 2.0 integration provides:

  • Server-side OAuth 2.0 support for Kafka brokers
  • Client-side OAuth 2.0 support for Kafka MirrorMaker, Kafka Connect and the Kafka Bridge
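Client-side OAuth 2.0 support is configured through SASL properties on the client. The following is a minimal sketch of a Kafka client configuration that uses the OAUTHBEARER mechanism; it assumes the OAuth callback handler from the Strimzi OAuth library that AMQ Streams builds on, and the client ID, secret, and token endpoint URI are placeholders for values from your authorization server:

security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  oauth.client.id="<client-id>" \
  oauth.client.secret="<client-secret>" \
  oauth.token.endpoint.uri="https://<auth-server-address>/token" ;
sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler

With this configuration, the client obtains an access token from the token endpoint at login and presents it to the broker, which validates it against the authorization server.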

Red Hat Single Sign-On integration

You can deploy Red Hat Single Sign-On as an authorization server and configure it for integration with AMQ Streams.

You can use Red Hat Single Sign-On to:

  • Configure authentication for Kafka brokers
  • Configure and authorize clients
  • Configure users and roles
  • Obtain access and refresh tokens

See Using OAuth 2.0 token-based authentication.