8.4.4. Kafka ブローカーおよびクライアントアプリケーションのアップグレード
この手順では、AMQ Streams Kafka クラスターを最新のサポートされる Kafka バージョンにアップグレードする方法を説明します。
新しい Kafka バージョンを現在のバージョンと比較すると、新しいバージョンは ログメッセージ形式の上位バージョン、ブローカー間プロトコルの上位バージョン、またはその両方をサポートする可能性があります。必要に応じて、これらのバージョンをアップグレードする手順を実行します。詳細は、「Kafka バージョン」 を参照してください。
クライアントをアップグレードするストラテジーを選択する必要もあります。Kafka クライアントは、この手順の 6 でアップグレードされます。
前提条件
Kafka
リソースをアップグレードするには、以下を確認します。
- 両バージョンの Kafka をサポートする Cluster Operator が稼働している。
-
Kafka.spec.kafka.config
には、新しい Kafka バージョンでサポートされないオプションが含まれていない。
手順
Kafka クラスター設定を更新します。
oc edit kafka my-cluster
設定されている場合、
Kafka.spec.kafka.config
にlog.message.format.version
があり、inter.broker.protocol.version
が現在の Kafka バージョンのデフォルトに設定されていることを確認します。たとえば、Kafka 2.7.0 から 2.8.0 へのアップグレードは以下のようになります。
kind: Kafka spec: # ... kafka: version: 2.7.0 config: log.message.format.version: "2.7" inter.broker.protocol.version: "2.7" # ...
log.message.format.version
およびinter.broker.protocol.version
が設定されていない場合、AMQ Streams では、次のステップの Kafka バージョンの更新後、これらのバージョンを現在のデフォルトに自動的に更新します。注記log.message.format.version
およびinter.broker.protocol.version
の値は、浮動小数点数として解釈されないように文字列である必要があります。Kafka.spec.kafka.version
を変更して、新しい Kafka バージョンを指定します。現在の Kafka バージョンのデフォルトでlog.message.format.version
およびinter.broker.protocol.version
のままにします。注記kafka.version
を変更すると、クラスターのすべてのブローカーがアップグレードされ、新しいブローカーバイナリの使用が開始されます。このプロセスでは、一部のブローカーは古いバイナリーを使用し、他のブローカーはすでに新しいバイナリーにアップグレードされています。Inter.broker.protocol.version
を変更しないと、ブローカーはアップグレード中も相互に通信を継続できます。たとえば、Kafka 2.7.0 から 2.8.0 へのアップグレードは以下のようになります。
apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka spec: # ... kafka: version: 2.8.0 1 config: log.message.format.version: "2.7" 2 inter.broker.protocol.version: "2.7" 3 # ...
警告新しい Kafka バージョンの
inter.broker.protocol.version
が変更された場合は、Kafka をダウングレードできません。ブローカー間プロトコルのバージョンは、__consumer_offsets
に書き込まれたメッセージなど、ブローカーによって保存される永続メタデータに使用されるスキーマを判断します。ダウングレードされたクラスターはメッセージを理解しません。Kafka クラスターのイメージが
Kafka.spec.kafka.image
の Kafka カスタムリソースで定義されている場合、image
を更新して、新しい Kafka バージョンでコンテナーイメージを示すようにします。「Kafka バージョンおよびイメージマッピング」を参照してください。
エディターを保存して終了し、ローリングアップデートの完了を待ちます。
Pod の状態の遷移を監視して、ローリングアップデートの進捗を確認します。
oc get pods my-cluster-kafka-0 -o jsonpath='{.spec.containers[0].image}'
ローリングアップデートにより、各 Pod が新バージョンの Kafka のブローカーバイナリーを使用することが保証されます。
クライアントのアップグレードに選択したストラテジーに応じて、新バージョンのクライアントバイナリーを使用するようにすべてのクライアントアプリケーションをアップグレードします。
必要に応じて、Kafka Connect および MirrorMaker の
version
プロパティーを新バージョンの Kafka として設定します。-
Kafka Connect では、
KafkaConnect.spec.version
を更新します。 -
MirrorMaker では、
KafkaMirrorMaker.spec.version
を更新します。 -
MirrorMaker 2.0 の場合は、
KafkaMirrorMaker2.spec.version
を更新します。
-
Kafka Connect では、
設定されている場合、新しい
inter.broker.protocol.version
バージョンを使用するように Kafka リソースを更新します。それ以外の場合は、ステップ 9 に進みます。たとえば、Kafka 2.8.0 へのアップグレードでは以下のようになります。
apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka spec: # ... kafka: version: 2.8.0 config: log.message.format.version: "2.7" inter.broker.protocol.version: "2.8" # ...
- Cluster Operator によってクラスターが更新されるまで待ちます。
設定されている場合、新しい
log.message.format.version
バージョンを使用するように Kafka リソースを更新します。それ以外の場合は、ステップ 10 に進みます。たとえば、Kafka 2.8.0 へのアップグレードでは以下のようになります。
apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka spec: # ... kafka: version: 2.8.0 config: log.message.format.version: "2.8" inter.broker.protocol.version: "2.8" # ...
Cluster Operator によってクラスターが更新されるまで待ちます。
- これで、Kafka クラスターおよびクライアントが新バージョンの Kafka を使用するようになります。
- ブローカーは、ブローカー間プロトコルのバージョンと、新しいバージョンの Kafka のメッセージ形式のバージョンを使用して、メッセージを送信するように設定されます。
Kafka のアップグレードに従い、必要な場合は以下を行うことができます。