第7章 AMQ Streams のアップグレード
AMQ Streams は、クラスターのダウンタイムを発生せずにアップグレードできます。各バージョンの AMQ Streams は、単一または複数バージョンの Apache Kafka をサポートします。ご使用のバージョンの AMQ Streams によってサポートされれば、上位バージョンの Kafka にアップグレードできます。
より新しいバージョンの AMQ Streams はより新しいバージョンの Kafka をサポートしますが、AMQ Streams をアップグレードしてから、サポートされる上位バージョンの Kafka にアップグレードする必要があります。
AMQ Streams Operator をアップグレードするには、OpenShift Container Platform クラスターで Operator Lifecycle Manager (OLM) を使用できます。
該当する場合、AMQ Streams と Kafka をアップグレードしてから リソースのアップグレード を実行する 必要 があります。
7.1. AMQ Streams および Kafka のアップグレード
AMQ Streams のアップグレードは 2 段階のプロセスで行います。ダウンタイムなしでブローカーとクライアントをアップグレードするには、以下の順序でアップグレード手順を 必ず 完了してください。
Cluster Operator を最新の AMQ Streams バージョンに更新します。実施する手法は、Cluster Operator のデプロイ 方法によって異なります。
- インストール用 YAML ファイルを使用して Cluster Operator をデプロイした場合は、Operator のインストールファイルを変更して アップグレードを実行します。
OperatorHub から Cluster Operator をデプロイした場合は、Operator Lifecycle Manager (OLM) を使用して AMQ Streams Operator の更新チャネルを新しい AMQ Streams バージョンに変更します。
選択したアップグレードストラテジーに応じて、以下のチャネル更新のいずれかに従います。
- 自動 アップグレードが開始される。
- 手動 アップグレードでは、インストールを開始する前に承認が必要です。
OperatorHub を使用した Operator のアップグレードについての詳細は、「Upgrading installed Operators」を参照してください。
- すべての Kafka ブローカーとクライアントアプリケーションを、最新の Kafka バージョンにアップグレードします。
7.1.1. Kafka バージョン
Kafka のログメッセージ形式バージョンおよびブローカー間のプロトコルバージョンは、メッセージに追加されるログ形式バージョンとクラスターで使用されるプロトコルのバージョンを指定します。そのためアップグレードプロセスでは、既存の Kafka ブローカーの設定変更およびクライアントアプリケーション (コンシューマーおよびプロデューサー) のコード変更により、必ず正しいバージョンを使用されるようにする必要になります。
以下の表は、Kafka バージョンの違いを示しています。
| Kafka のバージョン | Interbroker プロトコルのバージョン | ログメッセージ形式のバージョン | ZooKeeper のバージョン |
|---|---|---|---|
| 2.5.0 | 2.5 | 2.5 | 3.5.8 |
| 2.6.0 | 2.6 | 2.6 | 3.5.8 |
メッセージ形式バージョン
プロデューサーが Kafka ブローカーにメッセージを送信すると、特定の形式を使用してメッセージがエンコードされます。この形式は Kafka のリリースによって変わるため、メッセージにはエンコードに使用された形式のバージョンが含まれます。ブローカーがメッセージをログに追加する前に、メッセージを新しい形式バージョンから特定の旧形式バージョンに変換するように、Kafka ブローカーを設定できます。
Kafka には、メッセージ形式のバージョンを設定する 2 通りの方法があります。
-
message.format.versionプロパティーをトピックに設定します。 -
log.message.format.versionプロパティーを Kafka ブローカーに設定します。
トピックの message.format.version のデフォルト値は、Kafka ブローカーに設定される log.message.format.version によって定義されます。トピックの message.format.version は、トピック設定を編集すると手動で設定できます。
本セクションのアップグレード作業では、メッセージ形式のバージョンが log.message.format.version によって定義されることを前提としています。
7.1.2. Cluster Operator のアップグレード
このセクションでは、AMQ Streams 1.6 を使用するように Cluster Operator デプロイメントをアップグレードする手順について説明します。
Cluster Operator によって管理される Kafka クラスターの可用性は、アップグレード操作による影響を受けません。
特定バージョンの AMQ Streams へのアップグレード方法については、そのバージョンをサポートするドキュメントを参照してください。
7.1.2.1. Cluster Operator の後続バージョンへのアップグレード
この手順では、Cluster Operator デプロイメントを後続バージョンにアップグレードする方法を説明します。
前提条件
- 既存の Cluster Operator デプロイメントを利用できる必要があります。
- すでに新規バージョンのインストールファイルをダウンロードしてある必要があります。
手順
-
既存の Cluster Operator リソース (
/install/cluster-operatorディレクトリー内) に追加した設定変更を覚えておきます。すべての変更は、新しいバージョンの Cluster Operator によって上書きされます。 Cluster Operator を更新します。
Cluster Operator を実行している namespace に従い、新しいバージョンのインストールファイルを編集します。
Linux の場合は、以下を使用します。
sed -i 's/namespace: .*/namespace: my-cluster-operator-namespace/' install/cluster-operator/*RoleBinding*.yamlMacOS の場合は、以下を使用します。
sed -i '' 's/namespace: .*/namespace: my-cluster-operator-namespace/' install/cluster-operator/*RoleBinding*.yaml-
既存の Cluster Operator
Deploymentで 1 つ以上の環境変数を編集した場合、install/cluster-operator/060-Deployment-cluster-operator.yamlファイルを編集し、これらの環境変数を使用します。
設定を更新したら、残りのインストールリソースとともにデプロイします。
oc apply -f install/cluster-operator
ローリングアップデートが完了するのを待ちます。
Kafka Pod のイメージを取得して、アップグレードが正常に完了したことを確認します。
oc get po my-cluster-kafka-0 -o jsonpath='{.spec.containers[0].image}'イメージタグには、新しい AMQ Streams バージョンと Kafka バージョンが順に示されます。例:
<New AMQ Streams version>-kafka-<Current Kafka version>既存のリソースを更新して、非推奨になったカスタムリソースプロパティーを処理します。
これで Cluster Operator が更新されましたが、その管理下にあるクラスターで実行している Kafka のバージョンは変わりません。
次のステップ
Cluster Operator のアップグレードの次に、Kafka アップグレードを実行できます。
7.1.3. Kafka のアップグレード。
Cluster Operator を 1.6 にアップグレードした後、次の手順では、すべての Kafka ブローカーをサポートされる最新バージョンの Kafka にアップグレードします。
Kafka のアップグレードは、Kafka ブローカーのローリングアップデートによって Cluster Operator によって実行されます。
Cluster Operator は、Kafka クラスターの設定に基づいてローリングアップデートを開始します。
Kafka.spec.kafka.config に以下が含まれている場合 | Cluster Operator によって開始されるもの |
|---|---|
|
|
単一のローリングアップデート更新後、 |
|
| 2 つのローリングアップデート |
|
| 2 つのローリングアップデート |
Cluster Operator は、Kafka のアップグレードの一環として、ZooKeeper のローリングアップデートを開始します。
- ZooKeeper バージョンが変更されなくても、単一のローリングアップデートが発生します。
- 新しいバージョンの Kafka に新しいバージョンの ZooKeeper が必要な場合、追加のローリングアップデートが発生します。
その他のリソース
7.1.3.1. Kafka バージョンおよびイメージマッピング
Kafka のアップグレード時に、STRIMZI_KAFKA_IMAGES および Kafka.spec.kafka.version プロパティーの設定について考慮してください。
-
それぞれの
KafkaリソースはKafka.spec.kafka.versionで設定できます。 Cluster Operator の
STRIMZI_KAFKA_IMAGES環境変数により、Kafka のバージョンと、指定のKafkaリソースでそのバージョンが要求されるときに使用されるイメージをマッピングできます。-
Kafka.spec.kafka.imageを設定しないと、そのバージョンのデフォルトのイメージが使用されます。 -
Kafka.spec.kafka.imageを設定すると、デフォルトのイメージがオーバーライドされます。
-
Cluster Operator は、Kafka ブローカーの想定されるバージョンが実際にイメージに含まれているかどうかを検証できません。所定のイメージが所定の Kafka バージョンに対応することを必ず確認してください。
7.1.3.2. クライアントをアップグレードするストラテジー
クライアントアプリケーション (Kafka Connect コネクターを含む) をアップグレードする最善の方法は、特定の状況によって異なります。
消費するアプリケーションは、そのアプリケーションが理解するメッセージ形式のメッセージを受信する必要があります。その状態であることを、以下のいずれかの方法で確認できます。
- プロデューサーをアップグレードする 前に、トピックのすべてのコンシューマーをアップグレードする。
- ブローカーでメッセージをダウンコンバートする。
ブローカーのダウンコンバートを使用すると、ブローカーに余分な負荷が加わるので、すべてのトピックで長期にわたりダウンコンバートに頼るのは最適な方法ではありません。ブローカーの実行を最適化するには、ブローカーがメッセージを一切ダウンコンバートしないようにしてください。
ブローカーのダウンコンバートは 2 通りの方法で設定できます。
-
トピックレベルの
message.format.versionでは単一のとピックが設定されます。 -
ブローカーレベルの
log.message.format.versionは、トピックレベルのmessage.format.versionが設定されてないトピックのデフォルトです。
新バージョンの形式でトピックにパブリッシュされるメッセージは、コンシューマーによって認識されます。これは、メッセージがコンシューマーに送信されるときでなく、ブローカーがプロデューサーからメッセージを受信するときに、ブローカーがダウンコンバートを実行するからです。
クライアントのアップグレードに使用できるストラテジーは複数あります。
- コンシューマーを最初にアップグレード
- コンシューマーとして機能するアプリケーションをすべてアップグレードします。
-
ブローカーレベル
log.message.format.versionを新バージョンに変更します。 プロデューサーとして機能するアプリケーションをアップグレードします。
このストラテジーは分かりやすく、ブローカーのダウンコンバートの発生をすべて防ぎます。ただし、所属組織内のすべてのコンシューマーを整然とアップグレードできることが前提になります。また、コンシューマーとプロデューサーの両方に該当するアプリケーションには通用しません。さらにリスクとして、アップグレード済みのクライアントに問題がある場合は、新しい形式のメッセージがメッセージログに追加され、以前のコンシューマーバージョンに戻せなくなる場合があります。
- トピック単位でコンシューマーを最初にアップグレード
トピックごとに以下を実行します。
- コンシューマーとして機能するアプリケーションをすべてアップグレードします。
-
トピックレベルの
message.format.versionを新バージョンに変更します。 プロデューサーとして機能するアプリケーションをアップグレードします。
このストラテジーではブローカーのダウンコンバートがすべて回避され、トピックごとにアップグレードできます。この方法は、同じトピックのコンシューマーとプロデューサーの両方に該当するアプリケーションには通用しません。ここでもリスクとして、アップグレード済みのクライアントに問題がある場合は、新しい形式のメッセージがメッセージログに追加される可能性があります。
- トピック単位でコンシューマーを最初にアップグレード、ダウンコンバートあり
トピックごとに以下を実行します。
-
トピックレベルの
message.format.versionを、旧バージョンに変更します (または、デフォルトがブローカーレベルのlog.message.format.versionのトピックを利用します)。 - コンシューマーおよびプロデューサーとして機能するアプリケーションをすべてアップグレードします。
- アップグレードしたアプリケーションが正しく機能することを確認します。
トピックレベルの
message.format.versionを新バージョンに変更します。このストラテジーにはブローカーのダウンコンバートが必要ですが、ダウンコンバートは一度に 1 つのトピック (またはトピックの小さなグループ) のみに必要になるので、ブローカーへの負荷は最小限に抑えられます。この方法は、同じトピックのコンシューマーとプロデューサーの両方に該当するアプリケーションにも通用します。この方法により、新しいメッセージ形式バージョンを使用する前に、アップグレードされたプロデューサーとコンシューマーが正しく機能することが保証されます。
この方法の主な欠点は、多くのトピックやアプリケーションが含まれるクラスターでの管理が複雑になる場合があることです。
-
トピックレベルの
クライアントアプリケーションをアップグレードするストラテジーは他にもあります。
複数のストラテジーを適用することもできます。たとえば、最初のいくつかのアプリケーションとトピックに、「トピック単位でコンシューマーを最初にアップグレード、ダウンコンバートあり」のストラテジーを適用します。これが問題なく適用されたら、より効率的な別のストラテジーの使用を検討できます。
7.1.3.3. Kafka ブローカーおよびクライアントアプリケーションのアップグレード
この手順では、AMQ Streams Kafka クラスターを最新のサポートされる Kafka バージョンにアップグレードする方法を説明します。
新しい Kafka バージョンを現在のバージョンと比較すると、新しいバージョンは ログメッセージ形式の上位バージョン、ブローカー間プロトコルの上位バージョン、またはその両方をサポートする可能性があります。必要に応じて、これらのバージョンをアップグレードする手順を実行します。詳細は、「Kafka バージョン」 を参照してください。
クライアントをアップグレードするストラテジーを選択する必要もあります。Kafka クライアントは、この手順の 6 でアップグレードされます。
前提条件
Kafka リソースをアップグレードするには、以下を確認します。
- 両バージョンの Kafka をサポートする Cluster Operator が稼働している。
-
Kafka.spec.kafka.configには、新しい Kafka バージョンでサポートされないオプションが含まれていない。
手順
Kafka クラスター設定を更新します。
oc edit kafka my-cluster設定されている場合、
Kafka.spec.kafka.configにlog.message.format.versionがあり、inter.broker.protocol.versionが現在の Kafka バージョンのデフォルトに設定されていることを確認します。たとえば、Kafka バージョン 2.5.0 から 2.6.0 へのアップグレードは以下のようになります。
kind: Kafka spec: # ... kafka: version: 2.5.0 config: log.message.format.version: "2.5" inter.broker.protocol.version: "2.5" # ...log.message.format.versionおよびinter.broker.protocol.versionが設定されていない場合、AMQ Streams では、次のステップの Kafka バージョンの更新後、これらのバージョンを現在のデフォルトに自動的に更新します。注記log.message.format.versionおよびinter.broker.protocol.versionの値は、浮動小数点数として解釈されないように文字列である必要があります。Kafka.spec.kafka.versionを変更して、新しい Kafka バージョンを指定します。現在の Kafka バージョンのデフォルトでlog.message.format.versionおよびinter.broker.protocol.versionのままにします。注記kafka.versionを変更すると、クラスターのすべてのブローカーがアップグレードされ、新しいブローカーバイナリの使用が開始されます。このプロセスでは、一部のブローカーは古いバイナリーを使用し、他のブローカーはすでに新しいバイナリーにアップグレードされています。Inter.broker.protocol.versionを変更しないと、ブローカーはアップグレード中も相互に通信を継続できます。たとえば、Kafka 2.5.0 から 2.6.0 へのアップグレードは以下のようになります。
apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka spec: # ... kafka: version: 2.6.0 1 config: log.message.format.version: "2.5" 2 inter.broker.protocol.version: "2.5" 3 # ...警告新しい Kafka バージョンの
inter.broker.protocol.versionが変更された場合は、Kafka をダウングレードできません。ブローカー間プロトコルのバージョンは、__consumer_offsetsに書き込まれたメッセージなど、ブローカーによって保存される永続メタデータに使用されるスキーマを判断します。ダウングレードされたクラスターはメッセージを理解しません。Kafka クラスターのイメージが
Kafka.spec.kafka.imageの Kafka カスタムリソースで定義されている場合、imageを更新して、新しい Kafka バージョンでコンテナーイメージを示すようにします。「Kafka バージョンおよびイメージマッピング」を参照してください。
エディターを保存して終了し、ローリングアップデートの完了を待ちます。
Pod の状態の遷移を監視して、ローリングアップデートの進捗を確認します。
oc get pods my-cluster-kafka-0 -o jsonpath='{.spec.containers[0].image}'ローリングアップデートにより、各 Pod が新バージョンの Kafka のブローカーバイナリーを使用するようになります。
クライアントのアップグレードに選択したストラテジーに応じて、新バージョンのクライアントバイナリーを使用するようにすべてのクライアントアプリケーションをアップグレードします。
必要に応じて、Kafka Connect および MirrorMaker の
versionプロパティーを新バージョンの Kafka として設定します。-
Kafka Connect では、
KafkaConnect.spec.versionを更新します。 -
MirrorMaker では、
KafkaMirrorMaker.spec.versionを更新します。 -
MirrorMaker 2.0 の場合は、
KafkaMirrorMaker2.spec.versionを更新します。
-
Kafka Connect では、
設定されている場合、新しい
inter.broker.protocol.versionバージョンを使用するように Kafka リソースを更新します。それ以外の場合は、ステップ 9 に進みます。たとえば、Kafka 2.6.0 へのアップグレードでは以下のようになります。
apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka spec: # ... kafka: version: 2.6.0 config: log.message.format.version: "2.5" inter.broker.protocol.version: "2.6" # ...- Cluster Operator によってクラスターが更新されるまで待ちます。
設定されている場合、新しい
log.message.format.versionバージョンを使用するように Kafka リソースを更新します。それ以外の場合は、ステップ 10 に移動します。たとえば、Kafka 2.6.0 へのアップグレードでは以下のようになります。
apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka spec: # ... kafka: version: 2.6.0 config: log.message.format.version: "2.6" inter.broker.protocol.version: "2.6" # ...Cluster Operator によってクラスターが更新されるまで待ちます。
- これで、Kafka クラスターおよびクライアントが新バージョンの Kafka を使用するようになります。
- ブローカーは、新バージョンの Kafka のブローカー間プロトコルバージョンとメッセージ形式バージョンを使用してメッセージを送信するように設定されます。
Kafka のアップグレードに従い、必要な場合は以下を行うことができます。
7.1.3.4. リスナー設定の更新
AMQ Streams では、Kafka リソースの KafkaListener スキーマが提供されます。
Kafka リスナーを設定するための Generic
GenericKafkaListener は、非推奨となった KafkaListeners スキーマに置き換わりました。
GenericKafkaListener スキーマ を使用すると、名前とポートが一意であれば、必要なリスナーをいくつでも設定できます。リスナー設定 は配列として定義されますが、非推奨の形式もサポートされます。
OpenShift クラスター内のクライアントの場合は、plain (暗号化なし)または tls internal リスナーを作成できます。
OpenShift クラスター外のクライアントの場合は、外部リスナー を作成し、nodeport、loadbalancer、ingress、または route などの接続メカニズムを指定します。
KafkaListeners スキーマは、plain、tls、および external リスナーのサブプロパティーを使用し、それぞれ固定ポートを使用します。Kafka のアップグレード後、KafkaListeners スキーマを使用して設定されたリスナーを
GenericKafkaListener スキーマの形式に変換できます。
たとえば、現在 Kafka 設定で以下の設定を使用しているとします。
これまでのリスナー設定
listeners:
plain:
# ...
tls:
# ...
external:
type: loadbalancer
# ...
以下を使用して、リスナーを新しい形式に変換します。
新しいリスナー設定
listeners:
#...
- name: plain
port: 9092
type: internal
tls: false 1
- name: tls
port: 9093
type: internal
tls: true
- name: external
port: 9094
type: EXTERNAL-LISTENER-TYPE 2
tls: true
必ず 正確な 名前とポート番号を使用してください。
古い形式で 使用される追加の プロパティーについては、新しい形式に更新する必要があります。
設定または オーバーライド
リスナー設定 に導入された変更:
-
overridesはconfigurationセクションとマージされます。 -
dnsAnnotationsの名前がアノテーションになりました。 -
preferredAddressTypeの名前がpreferredNodePortAddressType -
addressの名前がalternativeNames -
loadBalancerSourceRangesおよびexternalTrafficPolicyが、現在の非推奨のテンプレートからリスナー設定に移動します。
例として、以下の設定を見てみましょう。
従来の追加リスナー設定
listeners:
external:
type: loadbalancer
authentication:
type: tls
overrides:
bootstrap:
dnsAnnotations:
#...
これを以下に変更します。
新しい追加リスナー設定
listeners:
#...
- name: external
port: 9094
type:loadbalancer
tls: true
authentication:
type: tls
configuration:
bootstrap:
annotations:
#...
後方互換性を維持するため、新しいリスナー設定にある名前およびポート番号を使用する 必要 があります。他の値を使用すると、Kafka リスナーおよび OpenShift サービスの名前が変更されます。
各タイプのリスナーで利用可能な設定オプションの詳細は、「 GenericKafkaListener スキーマ参照」 を参照してください。
7.1.3.5. コンシューマーおよび Kafka Streams アプリケーションの Cooperative Rebalancing へのアップグレード
Kafka コンシューマーおよび Kafka Streams アプリケーションをアップグレードすることで、パーティションの再分散にデフォルトの Eager Rebalance プロトコルではなく Incremental Cooperative Rebalance プロトコルを使用できます。この新しいプロトコルが Kafka 2.4.0 に追加されました。
コンシューマーは、パーティションの割り当てを Cooperative Rebalance で保持し、クラスターの分散が必要な場合にプロセスの最後でのみ割り当てを取り消します。これにより、コンシューマーグループまたは Kafka Streams アプリケーションが使用不可能になる状態が削減されます。
Incremental Cooperative Rebalance プロトコルへのアップグレードは任意です。Eager Rebalance プロトコルは引き続きサポートされます。
前提条件
- Kafka 2.6.0 に Kafka ブローカーおよびクライアントアプリケーションをアップグレード済みであることが必要です。
手順
Incremental Cooperative Rebalance プロトコルを使用するように Kafka コンシューマーをアップグレードするには以下を行います。
-
Kafka クライアント
.jarファイルを新バージョンに置き換えます。 -
コンシューマー設定で、
partition.assignment.strategyにcooperative-stickyを追加します。たとえば、rangeストラテジーが設定されている場合は、設定をrange, cooperative-stickyに変更します。 - グループ内の各コンシューマーを順次再起動し、再起動後に各コンシューマーがグループに再度参加するまで待ちます。
-
コンシューマー設定から前述の
partition.assignment.strategyを削除して、グループの各コンシューマーを再設定し、cooperative-stickyストラテジーのみを残します。 - グループ内の各コンシューマーを順次再起動し、再起動後に各コンシューマーがグループに再度参加するまで待ちます。
Incremental Cooperative Rebalance プロトコルを使用するように Kafka Streams アプリケーションをアップグレードするには以下を行います。
-
Kafka Streams の
.jarファイルを新バージョンに置き換えます。 -
Kafka Streams の設定で、
upgrade.from設定パラメーターをアップグレード前の Kafka バージョンに設定します (例: 2.3)。 - 各ストリームプロセッサー (ノード) を順次再起動します。
-
upgrade.from設定パラメーターを Kafka Streams 設定から削除します。 - グループ内の各コンシューマーを順次再起動します。
関連情報
- Apache Kafka ドキュメントの「Notable changes in 2.4.0」。