第7章 AMQ Streams のアップグレード

AMQ Streams は、クラスターのダウンタイムを発生せずにアップグレードできます。各バージョンの AMQ Streams は、単一または複数バージョンの Apache Kafka をサポートします。ご使用のバージョンの AMQ Streams によってサポートされれば、上位バージョンの Kafka にアップグレードできます。

より新しいバージョンの AMQ Streams はより新しいバージョンの Kafka をサポートしますが、AMQ Streams をアップグレードしてから、サポートされる上位バージョンの Kafka にアップグレードする必要があります。

AMQ Streams Operator をアップグレードするには、OpenShift Container Platform クラスターで Operator Lifecycle Manager (OLM) を使用できます。

重要

該当する場合、AMQ Streams と Kafka をアップグレードしてから リソースのアップグレード を実行する 必要 があります。

7.1. AMQ Streams および Kafka のアップグレード

AMQ Streams のアップグレードは 2 段階のプロセスで行います。ダウンタイムなしでブローカーとクライアントをアップグレードするには、以下の順序でアップグレード手順を 必ず 完了してください。

  1. Cluster Operator を最新の AMQ Streams バージョンに更新します。実施する手法は、Cluster Operator のデプロイ 方法によって異なります。

    • インストール用 YAML ファイルを使用して Cluster Operator をデプロイした場合は、Operator のインストールファイルを変更して アップグレードを実行します。
    • OperatorHub から Cluster Operator をデプロイした場合は、Operator Lifecycle Manager (OLM) を使用して AMQ Streams Operator の更新チャネルを新しい AMQ Streams バージョンに変更します。

      選択したアップグレードストラテジーに応じて、以下のチャネル更新のいずれかに従います。

      • 自動 アップグレードが開始される。
      • 手動 アップグレードでは、インストールを開始する前に承認が必要です。

      OperatorHub を使用した Operator のアップグレードについての詳細は、「Upgrading installed Operators」を参照してください。

  2. すべての Kafka ブローカーとクライアントアプリケーションを、最新の Kafka バージョンにアップグレードします。

7.1.1. Kafka バージョン

Kafka のログメッセージ形式バージョンおよびブローカー間のプロトコルバージョンは、メッセージに追加されるログ形式バージョンとクラスターで使用されるプロトコルのバージョンを指定します。そのためアップグレードプロセスでは、既存の Kafka ブローカーの設定変更およびクライアントアプリケーション (コンシューマーおよびプロデューサー) のコード変更により、必ず正しいバージョンを使用されるようにする必要になります。

以下の表は、Kafka バージョンの違いを示しています。

Kafka のバージョンInterbroker プロトコルのバージョンログメッセージ形式のバージョンZooKeeper のバージョン

2.5.0

2.5

2.5

3.5.8

2.6.0

2.6

2.6

3.5.8

メッセージ形式バージョン

プロデューサーが Kafka ブローカーにメッセージを送信すると、特定の形式を使用してメッセージがエンコードされます。この形式は Kafka のリリースによって変わるため、メッセージにはエンコードに使用された形式のバージョンが含まれます。ブローカーがメッセージをログに追加する前に、メッセージを新しい形式バージョンから特定の旧形式バージョンに変換するように、Kafka ブローカーを設定できます。

Kafka には、メッセージ形式のバージョンを設定する 2 通りの方法があります。

  • message.format.version プロパティーをトピックに設定します。
  • log.message.format.version プロパティーを Kafka ブローカーに設定します。

トピックの message.format.version のデフォルト値は、Kafka ブローカーに設定される log.message.format.version によって定義されます。トピックの message.format.version は、トピック設定を編集すると手動で設定できます。

本セクションのアップグレード作業では、メッセージ形式のバージョンが log.message.format.version によって定義されることを前提としています。

7.1.2. Cluster Operator のアップグレード

このセクションでは、AMQ Streams 1.6 を使用するように Cluster Operator デプロイメントをアップグレードする手順について説明します。

Cluster Operator によって管理される Kafka クラスターの可用性は、アップグレード操作による影響を受けません。

注記

特定バージョンの AMQ Streams へのアップグレード方法については、そのバージョンをサポートするドキュメントを参照してください。

7.1.2.1. Cluster Operator の後続バージョンへのアップグレード

この手順では、Cluster Operator デプロイメントを後続バージョンにアップグレードする方法を説明します。

前提条件

手順

  1. 既存の Cluster Operator リソース (/install/cluster-operator ディレクトリー内) に追加した設定変更を覚えておきます。すべての変更は、新しいバージョンの Cluster Operator によって上書きされます。
  2. Cluster Operator を更新します。

    1. Cluster Operator を実行している namespace に従い、新しいバージョンのインストールファイルを編集します。

      Linux の場合は、以下を使用します。

      sed -i 's/namespace: .*/namespace: my-cluster-operator-namespace/' install/cluster-operator/*RoleBinding*.yaml

      MacOS の場合は、以下を使用します。

      sed -i '' 's/namespace: .*/namespace: my-cluster-operator-namespace/' install/cluster-operator/*RoleBinding*.yaml
    2. 既存の Cluster Operator Deployment で 1 つ以上の環境変数を編集した場合、install/cluster-operator/060-Deployment-cluster-operator.yaml ファイルを編集し、これらの環境変数を使用します。
  3. 設定を更新したら、残りのインストールリソースとともにデプロイします。

    oc apply -f install/cluster-operator

    ローリングアップデートが完了するのを待ちます。

  4. Kafka Pod のイメージを取得して、アップグレードが正常に完了したことを確認します。

    oc get po my-cluster-kafka-0 -o jsonpath='{.spec.containers[0].image}'

    イメージタグには、新しい AMQ Streams バージョンと Kafka バージョンが順に示されます。例: <New AMQ Streams version>-kafka-<Current Kafka version>

  5. 既存のリソースを更新して、非推奨になったカスタムリソースプロパティーを処理します。

これで Cluster Operator が更新されましたが、その管理下にあるクラスターで実行している Kafka のバージョンは変わりません。

次のステップ

Cluster Operator のアップグレードの次に、Kafka アップグレードを実行できます。

7.1.3. Kafka のアップグレード。

Cluster Operator を 1.6 にアップグレードした後、次の手順では、すべての Kafka ブローカーをサポートされる最新バージョンの Kafka にアップグレードします。

Kafka のアップグレードは、Kafka ブローカーのローリングアップデートによって Cluster Operator によって実行されます。

Cluster Operator は、Kafka クラスターの設定に基づいてローリングアップデートを開始します。

Kafka.spec.kafka.config に以下が含まれている場合Cluster Operator によって開始されるもの

inter.broker.protocol.versionlog.message.format.version の両方。

単一のローリングアップデート更新後、inter.broker.protocol.version を手動で更新し、続いて log.message.format.version を更新する必要があります。それぞれを変更すると、さらにローリングアップデートがトリガーされます。

inter.broker.protocol.version または log.message.format.version のいずれか。

2 つのローリングアップデート

inter.broker.protocol.version または log.message.format.version の設定なし。

2 つのローリングアップデート

Cluster Operator は、Kafka のアップグレードの一環として、ZooKeeper のローリングアップデートを開始します。

  • ZooKeeper バージョンが変更されなくても、単一のローリングアップデートが発生します。
  • 新しいバージョンの Kafka に新しいバージョンの ZooKeeper が必要な場合、追加のローリングアップデートが発生します。

7.1.3.1. Kafka バージョンおよびイメージマッピング

Kafka のアップグレード時に、STRIMZI_KAFKA_IMAGES および Kafka.spec.kafka.version プロパティーの設定について考慮してください。

  • それぞれの Kafka リソースは Kafka.spec.kafka.version で設定できます。
  • Cluster Operator の STRIMZI_KAFKA_IMAGES 環境変数により、Kafka のバージョンと、指定の Kafka リソースでそのバージョンが要求されるときに使用されるイメージをマッピングできます。

    • Kafka.spec.kafka.image を設定しないと、そのバージョンのデフォルトのイメージが使用されます。
    • Kafka.spec.kafka.image を設定すると、デフォルトのイメージがオーバーライドされます。
警告

Cluster Operator は、Kafka ブローカーの想定されるバージョンが実際にイメージに含まれているかどうかを検証できません。所定のイメージが所定の Kafka バージョンに対応することを必ず確認してください。

7.1.3.2. クライアントをアップグレードするストラテジー

クライアントアプリケーション (Kafka Connect コネクターを含む) をアップグレードする最善の方法は、特定の状況によって異なります。

消費するアプリケーションは、そのアプリケーションが理解するメッセージ形式のメッセージを受信する必要があります。その状態であることを、以下のいずれかの方法で確認できます。

  • プロデューサーをアップグレードする 前に、トピックのすべてのコンシューマーをアップグレードする。
  • ブローカーでメッセージをダウンコンバートする。

ブローカーのダウンコンバートを使用すると、ブローカーに余分な負荷が加わるので、すべてのトピックで長期にわたりダウンコンバートに頼るのは最適な方法ではありません。ブローカーの実行を最適化するには、ブローカーがメッセージを一切ダウンコンバートしないようにしてください。

ブローカーのダウンコンバートは 2 通りの方法で設定できます。

  • トピックレベルの message.format.version では単一のとピックが設定されます。
  • ブローカーレベルの log.message.format.version は、トピックレベルの message.format.version が設定されてないトピックのデフォルトです。

新バージョンの形式でトピックにパブリッシュされるメッセージは、コンシューマーによって認識されます。これは、メッセージがコンシューマーに送信されるときでなく、ブローカーがプロデューサーからメッセージを受信するときに、ブローカーがダウンコンバートを実行するからです。

クライアントのアップグレードに使用できるストラテジーは複数あります。

コンシューマーを最初にアップグレード
  1. コンシューマーとして機能するアプリケーションをすべてアップグレードします。
  2. ブローカーレベル log.message.format.version を新バージョンに変更します。
  3. プロデューサーとして機能するアプリケーションをアップグレードします。

    このストラテジーは分かりやすく、ブローカーのダウンコンバートの発生をすべて防ぎます。ただし、所属組織内のすべてのコンシューマーを整然とアップグレードできることが前提になります。また、コンシューマーとプロデューサーの両方に該当するアプリケーションには通用しません。さらにリスクとして、アップグレード済みのクライアントに問題がある場合は、新しい形式のメッセージがメッセージログに追加され、以前のコンシューマーバージョンに戻せなくなる場合があります。

トピック単位でコンシューマーを最初にアップグレード

トピックごとに以下を実行します。

  1. コンシューマーとして機能するアプリケーションをすべてアップグレードします。
  2. トピックレベルの message.format.version を新バージョンに変更します。
  3. プロデューサーとして機能するアプリケーションをアップグレードします。

    このストラテジーではブローカーのダウンコンバートがすべて回避され、トピックごとにアップグレードできます。この方法は、同じトピックのコンシューマーとプロデューサーの両方に該当するアプリケーションには通用しません。ここでもリスクとして、アップグレード済みのクライアントに問題がある場合は、新しい形式のメッセージがメッセージログに追加される可能性があります。

トピック単位でコンシューマーを最初にアップグレード、ダウンコンバートあり

トピックごとに以下を実行します。

  1. トピックレベルの message.format.version を、旧バージョンに変更します (または、デフォルトがブローカーレベルの log.message.format.version のトピックを利用します)。
  2. コンシューマーおよびプロデューサーとして機能するアプリケーションをすべてアップグレードします。
  3. アップグレードしたアプリケーションが正しく機能することを確認します。
  4. トピックレベルの message.format.version を新バージョンに変更します。

    このストラテジーにはブローカーのダウンコンバートが必要ですが、ダウンコンバートは一度に 1 つのトピック (またはトピックの小さなグループ) のみに必要になるので、ブローカーへの負荷は最小限に抑えられます。この方法は、同じトピックのコンシューマーとプロデューサーの両方に該当するアプリケーションにも通用します。この方法により、新しいメッセージ形式バージョンを使用する前に、アップグレードされたプロデューサーとコンシューマーが正しく機能することが保証されます。

    この方法の主な欠点は、多くのトピックやアプリケーションが含まれるクラスターでの管理が複雑になる場合があることです。

クライアントアプリケーションをアップグレードするストラテジーは他にもあります。

注記

複数のストラテジーを適用することもできます。たとえば、最初のいくつかのアプリケーションとトピックに、「トピック単位でコンシューマーを最初にアップグレード、ダウンコンバートあり」のストラテジーを適用します。これが問題なく適用されたら、より効率的な別のストラテジーの使用を検討できます。

7.1.3.3. Kafka ブローカーおよびクライアントアプリケーションのアップグレード

この手順では、AMQ Streams Kafka クラスターを最新のサポートされる Kafka バージョンにアップグレードする方法を説明します。

新しい Kafka バージョンを現在のバージョンと比較すると、新しいバージョンは ログメッセージ形式の上位バージョンブローカー間プロトコルの上位バージョン、またはその両方をサポートする可能性があります。必要に応じて、これらのバージョンをアップグレードする手順を実行します。詳細は、「Kafka バージョン」 を参照してください。

クライアントをアップグレードするストラテジーを選択する必要もあります。Kafka クライアントは、この手順の 6 でアップグレードされます。

前提条件

Kafka リソースをアップグレードするには、以下を確認します。

  • 両バージョンの Kafka をサポートする Cluster Operator が稼働している。
  • Kafka.spec.kafka.config には、新しい Kafka バージョンでサポートされないオプションが含まれていない

手順

  1. Kafka クラスター設定を更新します。

    oc edit kafka my-cluster
  2. 設定されている場合、Kafka.spec.kafka.configlog.message.format.version があり、inter.broker.protocol.version現在の Kafka バージョンのデフォルトに設定されていることを確認します。

    たとえば、Kafka バージョン 2.5.0 から 2.6.0 へのアップグレードは以下のようになります。

    kind: Kafka
    spec:
      # ...
      kafka:
        version: 2.5.0
        config:
          log.message.format.version: "2.5"
          inter.broker.protocol.version: "2.5"
          # ...

    log.message.format.version および inter.broker.protocol.version が設定されていない場合、AMQ Streams では、次のステップの Kafka バージョンの更新後、これらのバージョンを現在のデフォルトに自動的に更新します。

    注記

    log.message.format.version および inter.broker.protocol.version の値は、浮動小数点数として解釈されないように文字列である必要があります。

  3. Kafka.spec.kafka.version を変更して、新しい Kafka バージョンを指定します。現在の Kafka バージョンのデフォルトで log.message.format.version および inter.broker.protocol.version のままにします。

    注記

    kafka.version を変更すると、クラスターのすべてのブローカーがアップグレードされ、新しいブローカーバイナリの使用が開始されます。このプロセスでは、一部のブローカーは古いバイナリーを使用し、他のブローカーはすでに新しいバイナリーにアップグレードされています。Inter.broker.protocol.version を変更しないと、ブローカーはアップグレード中も相互に通信を継続できます。

    たとえば、Kafka 2.5.0 から 2.6.0 へのアップグレードは以下のようになります。

    apiVersion: kafka.strimzi.io/v1beta1
    kind: Kafka
    spec:
      # ...
      kafka:
        version: 2.6.0 1
        config:
          log.message.format.version: "2.5" 2
          inter.broker.protocol.version: "2.5" 3
          # ...
    1
    Kafka のバージョンが新しいバージョンに変更されます。
    2
    メッセージ形式のバージョンは変更されません。
    3
    ブローカー間のプロトコルバージョンは変更されません。
    警告

    新しい Kafka バージョンの inter.broker.protocol.version が変更された場合は、Kafka をダウングレードできません。ブローカー間プロトコルのバージョンは、__consumer_offsets に書き込まれたメッセージなど、ブローカーによって保存される永続メタデータに使用されるスキーマを判断します。ダウングレードされたクラスターはメッセージを理解しません。

  4. Kafka クラスターのイメージが Kafka.spec.kafka.image の Kafka カスタムリソースで定義されている場合、image を更新して、新しい Kafka バージョンでコンテナーイメージを示すようにします。

    Kafka バージョンおよびイメージマッピング」を参照してください。

  5. エディターを保存して終了し、ローリングアップデートの完了を待ちます。

    Pod の状態の遷移を監視して、ローリングアップデートの進捗を確認します。

    oc get pods my-cluster-kafka-0 -o jsonpath='{.spec.containers[0].image}'

    ローリングアップデートにより、各 Pod が新バージョンの Kafka のブローカーバイナリーを使用するようになります。

  6. クライアントのアップグレードに選択したストラテジーに応じて、新バージョンのクライアントバイナリーを使用するようにすべてのクライアントアプリケーションをアップグレードします。

    必要に応じて、Kafka Connect および MirrorMaker の version プロパティーを新バージョンの Kafka として設定します。

    1. Kafka Connect では、KafkaConnect.spec.version を更新します。
    2. MirrorMaker では、KafkaMirrorMaker.spec.version を更新します。
    3. MirrorMaker 2.0 の場合は、KafkaMirrorMaker2.spec.version を更新します。
  7. 設定されている場合、新しい inter.broker.protocol.version バージョンを使用するように Kafka リソースを更新します。それ以外の場合は、ステップ 9 に進みます。

    たとえば、Kafka 2.6.0 へのアップグレードでは以下のようになります。

    apiVersion: kafka.strimzi.io/v1beta1
    kind: Kafka
    spec:
      # ...
      kafka:
        version: 2.6.0
        config:
          log.message.format.version: "2.5"
          inter.broker.protocol.version: "2.6"
          # ...
  8. Cluster Operator によってクラスターが更新されるまで待ちます。
  9. 設定されている場合、新しい log.message.format.version バージョンを使用するように Kafka リソースを更新します。それ以外の場合は、ステップ 10 に移動します。

    たとえば、Kafka 2.6.0 へのアップグレードでは以下のようになります。

    apiVersion: kafka.strimzi.io/v1beta1
    kind: Kafka
    spec:
      # ...
      kafka:
        version: 2.6.0
        config:
          log.message.format.version: "2.6"
          inter.broker.protocol.version: "2.6"
          # ...
  10. Cluster Operator によってクラスターが更新されるまで待ちます。

    • これで、Kafka クラスターおよびクライアントが新バージョンの Kafka を使用するようになります。
    • ブローカーは、新バージョンの Kafka のブローカー間プロトコルバージョンとメッセージ形式バージョンを使用してメッセージを送信するように設定されます。

Kafka のアップグレードに従い、必要な場合は以下を行うことができます。

7.1.3.4. リスナー設定の更新

AMQ Streams では、Kafka リソースの Kafka リスナーを設定するための Generic KafkaListener スキーマが提供されます。

GenericKafkaListener は、非推奨となった KafkaListeners スキーマに置き換わりました。

GenericKafkaListener スキーマ を使用すると、名前とポートが一意であれば、必要なリスナーをいくつでも設定できます。リスナー設定 は配列として定義されますが、非推奨の形式もサポートされます。

OpenShift クラスター内のクライアントの場合は、plain (暗号化なし)または tls internal リスナーを作成できます。

OpenShift クラスター外のクライアントの場合は、外部リスナー を作成し、nodeportloadbalanceringress、または route などの接続メカニズムを指定します。

KafkaListeners スキーマは、plain、tls、および external リスナーのサブプロパティーを使用し、それぞれ固定ポートを使用します。Kafka のアップグレード後、KafkaListeners スキーマを使用して設定されたリスナーを GenericKafkaListener スキーマの形式に変換できます。

たとえば、現在 Kafka 設定で以下の設定を使用しているとします。

これまでのリスナー設定

listeners:
  plain:
    # ...
  tls:
    # ...
  external:
    type: loadbalancer
    # ...

以下を使用して、リスナーを新しい形式に変換します。

新しいリスナー設定

listeners:
  #...
  - name: plain
    port: 9092
    type: internal
    tls: false 1
  - name: tls
    port: 9093
    type: internal
    tls: true
  - name: external
    port: 9094
    type: EXTERNAL-LISTENER-TYPE 2
    tls: true

1
すべてのリスナーに TLS プロパティーが必要になります。
2
オプション: ingressloadbalancernodeportroute

必ず 正確な 名前とポート番号を使用してください。

古い形式で 使用される追加の 設定または オーバーライド プロパティーについては、新しい形式に更新する必要があります。

リスナー設定 に導入された変更:

  • overridesconfiguration セクションとマージされます。
  • dnsAnnotations の名前が アノテーションになりました。
  • preferredAddressType の名前が preferredNodePortAddressType
  • address の名前が alternativeNames
  • loadBalancerSourceRanges および externalTrafficPolicy が、現在の非推奨の テンプレートからリスナー設定に移動します。

例として、以下の設定を見てみましょう。

従来の追加リスナー設定

listeners:
  external:
    type: loadbalancer
    authentication:
      type: tls
    overrides:
      bootstrap:
        dnsAnnotations:
          #...

これを以下に変更します。

新しい追加リスナー設定

listeners:
    #...
  - name: external
    port: 9094
    type:loadbalancer
    tls: true
    authentication:
      type: tls
    configuration:
      bootstrap:
        annotations:
          #...

重要

後方互換性を維持するため、新しいリスナー設定にある名前およびポート番号を使用する 必要 があります。他の値を使用すると、Kafka リスナーおよび OpenShift サービスの名前が変更されます。

各タイプのリスナーで利用可能な設定オプションの詳細は、「 GenericKafkaListener スキーマ参照」 を参照してください。

7.1.3.5. コンシューマーおよび Kafka Streams アプリケーションの Cooperative Rebalancing へのアップグレード

Kafka コンシューマーおよび Kafka Streams アプリケーションをアップグレードすることで、パーティションの再分散にデフォルトの Eager Rebalance プロトコルではなく Incremental Cooperative Rebalance プロトコルを使用できます。この新しいプロトコルが Kafka 2.4.0 に追加されました。

コンシューマーは、パーティションの割り当てを Cooperative Rebalance で保持し、クラスターの分散が必要な場合にプロセスの最後でのみ割り当てを取り消します。これにより、コンシューマーグループまたは Kafka Streams アプリケーションが使用不可能になる状態が削減されます。

注記

Incremental Cooperative Rebalance プロトコルへのアップグレードは任意です。Eager Rebalance プロトコルは引き続きサポートされます。

手順

Incremental Cooperative Rebalance プロトコルを使用するように Kafka コンシューマーをアップグレードするには以下を行います。

  1. Kafka クライアント .jar ファイルを新バージョンに置き換えます。
  2. コンシューマー設定で、partition.assignment.strategycooperative-sticky を追加します。たとえば、range ストラテジーが設定されている場合は、設定を range, cooperative-sticky に変更します。
  3. グループ内の各コンシューマーを順次再起動し、再起動後に各コンシューマーがグループに再度参加するまで待ちます。
  4. コンシューマー設定から前述の partition.assignment.strategy を削除して、グループの各コンシューマーを再設定し、cooperative-sticky ストラテジーのみを残します。
  5. グループ内の各コンシューマーを順次再起動し、再起動後に各コンシューマーがグループに再度参加するまで待ちます。

Incremental Cooperative Rebalance プロトコルを使用するように Kafka Streams アプリケーションをアップグレードするには以下を行います。

  1. Kafka Streams の .jar ファイルを新バージョンに置き換えます。
  2. Kafka Streams の設定で、upgrade.from 設定パラメーターをアップグレード前の Kafka バージョンに設定します (例: 2.3)。
  3. 各ストリームプロセッサー (ノード) を順次再起動します。
  4. upgrade.from 設定パラメーターを Kafka Streams 設定から削除します。
  5. グループ内の各コンシューマーを順次再起動します。

関連情報