8.4.4. Kafka ブローカーおよびクライアントアプリケーションのアップグレード

この手順では、AMQ Streams Kafka クラスターを最新のサポートされる Kafka バージョンにアップグレードする方法を説明します。

新しい Kafka バージョンを現在のバージョンと比較すると、新しいバージョンは ログメッセージ形式の上位バージョンブローカー間プロトコルの上位バージョン、またはその両方をサポートする可能性があります。必要に応じて、これらのバージョンをアップグレードする手順を実行します。詳細は、「Kafka バージョン」 を参照してください。

クライアントをアップグレードするストラテジーを選択する必要もあります。Kafka クライアントは、この手順の 6 でアップグレードされます。

前提条件

Kafka リソースをアップグレードするには、以下を確認します。

  • 両バージョンの Kafka をサポートする Cluster Operator が稼働している。
  • Kafka.spec.kafka.config には、新しい Kafka バージョンでサポートされないオプションが含まれていない

手順

  1. Kafka クラスター設定を更新します。

    oc edit kafka my-cluster
  2. 設定されている場合、Kafka.spec.kafka.configlog.message.format.version があり、inter.broker.protocol.version現在の Kafka バージョンのデフォルトに設定されていることを確認します。

    たとえば、Kafka 2.7.0 から 2.8.0 へのアップグレードは以下のようになります。

    kind: Kafka
    spec:
      # ...
      kafka:
        version: 2.7.0
        config:
          log.message.format.version: "2.7"
          inter.broker.protocol.version: "2.7"
          # ...

    log.message.format.version および inter.broker.protocol.version が設定されていない場合、AMQ Streams では、次のステップの Kafka バージョンの更新後、これらのバージョンを現在のデフォルトに自動的に更新します。

    注記

    log.message.format.version および inter.broker.protocol.version の値は、浮動小数点数として解釈されないように文字列である必要があります。

  3. Kafka.spec.kafka.version を変更して、新しい Kafka バージョンを指定します。現在の Kafka バージョンのデフォルトで log.message.format.version および inter.broker.protocol.version のままにします。

    注記

    kafka.version を変更すると、クラスターのすべてのブローカーがアップグレードされ、新しいブローカーバイナリの使用が開始されます。このプロセスでは、一部のブローカーは古いバイナリーを使用し、他のブローカーはすでに新しいバイナリーにアップグレードされています。Inter.broker.protocol.version を変更しないと、ブローカーはアップグレード中も相互に通信を継続できます。

    たとえば、Kafka 2.7.0 から 2.8.0 へのアップグレードは以下のようになります。

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    spec:
      # ...
      kafka:
        version: 2.8.0 1
        config:
          log.message.format.version: "2.7" 2
          inter.broker.protocol.version: "2.7" 3
          # ...
    1
    Kafka のバージョンが新しいバージョンに変更されます。
    2
    メッセージ形式のバージョンは変更されません。
    3
    ブローカー間のプロトコルバージョンは変更されません。
    警告

    新しい Kafka バージョンの inter.broker.protocol.version が変更された場合は、Kafka をダウングレードできません。ブローカー間プロトコルのバージョンは、__consumer_offsets に書き込まれたメッセージなど、ブローカーによって保存される永続メタデータに使用されるスキーマを判断します。ダウングレードされたクラスターはメッセージを理解しません。

  4. Kafka クラスターのイメージが Kafka.spec.kafka.image の Kafka カスタムリソースで定義されている場合、image を更新して、新しい Kafka バージョンでコンテナーイメージを示すようにします。

    Kafka バージョンおよびイメージマッピング」を参照してください。

  5. エディターを保存して終了し、ローリングアップデートの完了を待ちます。

    Pod の状態の遷移を監視して、ローリングアップデートの進捗を確認します。

    oc get pods my-cluster-kafka-0 -o jsonpath='{.spec.containers[0].image}'

    ローリングアップデートにより、各 Pod が新バージョンの Kafka のブローカーバイナリーを使用することが保証されます。

  6. クライアントのアップグレードに選択したストラテジーに応じて、新バージョンのクライアントバイナリーを使用するようにすべてのクライアントアプリケーションをアップグレードします。

    必要に応じて、Kafka Connect および MirrorMaker の version プロパティーを新バージョンの Kafka として設定します。

    1. Kafka Connect では、KafkaConnect.spec.version を更新します。
    2. MirrorMaker では、KafkaMirrorMaker.spec.version を更新します。
    3. MirrorMaker 2.0 の場合は、KafkaMirrorMaker2.spec.version を更新します。
  7. 設定されている場合、新しい inter.broker.protocol.version バージョンを使用するように Kafka リソースを更新します。それ以外の場合は、ステップ 9 に進みます。

    たとえば、Kafka 2.8.0 へのアップグレードでは以下のようになります。

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    spec:
      # ...
      kafka:
        version: 2.8.0
        config:
          log.message.format.version: "2.7"
          inter.broker.protocol.version: "2.8"
          # ...
  8. Cluster Operator によってクラスターが更新されるまで待ちます。
  9. 設定されている場合、新しい log.message.format.version バージョンを使用するように Kafka リソースを更新します。それ以外の場合は、ステップ 10 に進みます。

    たとえば、Kafka 2.8.0 へのアップグレードでは以下のようになります。

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    spec:
      # ...
      kafka:
        version: 2.8.0
        config:
          log.message.format.version: "2.8"
          inter.broker.protocol.version: "2.8"
          # ...
  10. Cluster Operator によってクラスターが更新されるまで待ちます。

    • これで、Kafka クラスターおよびクライアントが新バージョンの Kafka を使用するようになります。
    • ブローカーは、ブローカー間プロトコルのバージョンと、新しいバージョンの Kafka のメッセージ形式のバージョンを使用して、メッセージを送信するように設定されます。

Kafka のアップグレードに従い、必要な場合は以下を行うことができます。