OpenShift での AMQ Streams のデプロイおよびアップグレード
OpenShift Container Platform 上で AMQ Streams 1.5 を使用
概要
第1章 デプロイメントの概要
AMQ Streams は、OpenShift クラスターで Apache Kafka を実行するプロセスを簡素化します。
本ガイドでは、AMQ Streams をデプロイおよびアップグレードするすべての方法の手順を取り上げ、デプロイメントの対象や、OpenShift クラスターで Apache Kafka を実行するために必要なデプロイメントの順序について説明します。
デプロイメントの手順を説明する他に、デプロイメントを準備および検証するためのデプロイメントの前および後の手順についても説明します。追加のデプロイメントオプションには、メトリクスの導入手順が含まれます。アップグレードの手順は、AMQ Streams および Kafka のアップグレードを参照してください。
AMQ Streams は、パブリックおよびプライベートクラウドからデプロイメントを目的とするローカルデプロイメントまで、ディストリビューションに関係なくすべてのタイプの OpenShift クラスターで動作するよう設計されています。
1.1. AMQ Streams による Kafka のサポート
AMQ Streams は、Kafka を OpenShift で実行するためのコンテナーイメージおよび Operator を提供します。AMQ Streams Operator は、AMQ Streams の実行に必要です。AMQ Streams で提供される Operator は、Kafka を効果的に管理するために、専門的なオペレーション情報で目的に合うよう構築されています。
Operator は以下のプロセスを単純化します。
- Kafka クラスターのデプロイおよび実行
- Kafka コンポーネントのデプロイおよび実行
- Kafka へアクセスするための設定
- Kafka へのアクセスのセキュア化
- Kafka のアップグレード
- ブローカーの管理
- トピックの作成および管理
- ユーザーの作成および管理。
1.2. AMQ Streams の Operator
AMQ Streams では Operator を使用して Kafka をサポートし、Kafka のコンポーネントおよび依存関係を OpenShift にデプロイして管理します。
Operator は、OpenShift アプリケーションのパッケージ化、デプロイメント、および管理を行う方法です。AMQ Streams Operator は OpenShift の機能を拡張し、Kafka デプロイメントに関連する共通タスクや複雑なタスクを自動化します。Kafka 操作の情報をコードに実装することで、Kafka の管理タスクは簡素化され、必要な手動の作業が少なくなります。
Operator
AMQ Streams は、OpenShift クラスター内で実行中の Kafka クラスターを管理するための Operator を提供します。
- Cluster Operator
- Apache Kafka クラスター、Kafka Connect、Kafka MirrorMaker、Kafka Bridge、Kafka Exporter、および Entity Operator をデプロイおよび管理します。
- Entitiy Operator
- Topic Operator および User Operator を設定します。
- Topic Operator
- Kafka トピックを管理します。
- User Operator
- Kafka ユーザーを管理します。
Cluster Operator は、Kafka クラスターと同時に、Topic Operator および User Operator を Entity Operator 設定の一部としてデプロイできます。
AMQ Streams アーキテクチャー内の Operator
1.2.1. Cluster Operator
AMQ Streams では、Cluster Operator を使用して以下のクラスターをデプロイおよび管理します。
- Kafka (ZooKeeper、Entity Operator、Kafka Exporter、Cruise Control を含む)
- Kafka Connect
- Kafka MirrorMaker
- Kafka Bridge
クラスターのデプロイメントにはカスタムリソースが使用されます。
たとえば、以下のように Kafka クラスターをデプロイします。
-
クラスター設定のある
Kafka
リソースが OpenShift クラスター内で作成されます。 -
Kafka
リソースに宣言された内容を基にして、該当する Kafka クラスターが Cluster Operator によってデプロイされます。
Cluster Operator で以下もデプロイできます (Kafka
リソースの設定より)。
-
KafkaTopic
カスタムリソースより Operator スタイルのトピック管理を提供する Topic Operator -
KafkaUser
カスタムリソースより Operator スタイルのユーザー管理を提供する User Operator
デプロイメントの Entity Operator 内の Topic Operator および User Operator 関数。
Cluster Operator のアーキテクチャー例
1.2.2. Topic Operator
Topic Operator は、OpenShift リソースより Kafka クラスターのトピックを管理する方法を提供します。
Topic Operator のアーキテクチャー例
Topic Operator のロールは、対応する Kafka トピックと同期して Kafka トピックを記述する KafkaTopic
OpenShift リソースのセットを保持することです。
特に、KafkaTopic
が
- 作成されると、Topic Operator によってトピックが作成されます。
- 削除されると、Topic Operator によってトピックが削除されます。
- 変更されると、Topick Operator によってトピックが更新されます。
上記と逆の方向で、トピックが
-
Kafka クラスター内で作成されると、Operator によって
KafkaTopic
が作成されます。 -
Kafka クラスターから削除されると、Operator によって
KafkaTopic
が削除されます。 -
Kafka クラスターで変更されると、Operator によって
KafkaTopic
が更新されます。
このため、KafkaTopic
をアプリケーションのデプロイメントの一部として宣言でき、トピックの作成は Topic Operator によって行われます。アプリケーションは、必要なトピックからの作成または消費のみに対処する必要があります。
トピックが再設定された場合や、別の Kafka ノードに再割り当てされた場合、KafkaTopic
は常に最新の状態になります。
1.2.3. User Operator
User Operator は、Kafka ユーザーが記述される KafkaUser
リソースを監視して Kafka クラスターの Kafka ユーザーを管理し、Kafka ユーザーが Kafka クラスターで適切に設定されるようにします。
たとえば、KafkaUser
が
- 作成されると、User Operator によって記述されるユーザーが作成されます。
- 削除されると、User Operator によって記述されるユーザーが削除されます。
- 変更されると、User Operator によって記述されるユーザーが更新されます。
User Operator は Topic Operator とは異なり、Kafka クラスターからの変更は OpenShift リソースと同期されません。アプリケーションで直接 Kafka トピックを Kafka で作成することは可能ですが、ユーザーが User Operator と同時に直接 Kafka クラスターで管理されることは想定されません。
User Operator では、アプリケーションのデプロイメントの一部として KafkaUser
リソースを宣言できます。ユーザーの認証および承認メカニズムを指定できます。たとえば、ユーザーがブローカーへのアクセスを独占しないようにするため、Kafka リソースの使用を制御する ユーザークォータ を設定することもできます。
ユーザーが作成されると、ユーザークレデンシャルが Secret
に作成されます。アプリケーションはユーザーとそのクレデンシャルを使用して、認証やメッセージの生成または消費を行う必要があります。
User Operator は 認証のクレデンシャルを管理する他に、KafkaUser
宣言にユーザーのアクセス権限の記述を含めることで承認も管理します。
1.3. AMQ Streams のカスタムリソース
AMQ Streams を使用した Kafka コンポーネントの OpenShift クラスターへのデプロイメントは、カスタムリソースの適用により高度な設定が可能です。カスタムリソースは、OpenShift リソースを拡張するために CRD (カスタムリソース定義、Custom Resource Definition) によって追加される API のインスタンスとして作成されます。
CRD は、OpenShift クラスターでカスタムリソースを記述するための設定手順として機能し、デプロイメントで使用する Kafka コンポーネントごとに AMQ Streams で提供されます。CRD およびカスタムリソースは YAML ファイルとして定義されます。YAML ファイルのサンプルは AMQ Streams ディストリビューションに同梱されています。
また、CRD を使用すると、CLI へのアクセスや設定検証などのネイティブ OpenShift 機能を AMQ Streams リソースで活用することもできます。
1.3.1. AMQ Streams カスタムリソースの例
AMQ Streams 固有リソースのインスタンス化および管理に使用されるスキーマを定義するため、CRD をクラスターに 1 度インストールする必要があります。
CRD をインストールして新規カスタムリソースタイプをクラスターに追加した後に、その仕様に基づいてリソースのインスタンスを作成できます。
クラスターの設定によりますが、インストールには通常、クラスター管理者権限が必要です。
カスタムリソースの管理は、 AMQ Streams 管理者 のみが行えます。
kind:Kafka
などの新しい kind
リソースは、OpenShift クラスター内で CRD によって定義されます。
Kubernetes API サーバーを使用すると、kind
を基にしたカスタムリソースの作成が可能になり、カスタムリソースが OpenShift クラスターに追加されたときにカスタムリソースの検証および格納方法を CRD から判断します。
CRD が削除されると、そのタイプのカスタムタイプも削除されます。さらに、Pod や Statefulset などのカスタムリソースによって作成されたリソースも削除されます。
AMQ Streams 固有の各カスタムリソースは、リソースの kind
の CRD によって定義されるスキーマに準拠します。AMQ Streams コンポーネントのカスタムリソースには、spec
で定義される共通の設定プロパティーがあります。
CRD とカスタムリソースの関係を理解するため、Kafka トピックの CRD の例を見てみましょう。
Kafka トピックの CRD
apiVersion: kafka.strimzi.io/v1beta1 kind: CustomResourceDefinition metadata: 1 name: kafkatopics.kafka.strimzi.io labels: app: strimzi spec: 2 group: kafka.strimzi.io versions: v1beta1 scope: Namespaced names: # ... singular: kafkatopic plural: kafkatopics shortNames: - kt 3 additionalPrinterColumns: 4 # ... subresources: status: {} 5 validation: 6 openAPIV3Schema: properties: spec: type: object properties: partitions: type: integer minimum: 1 replicas: type: integer minimum: 1 maximum: 32767 # ...
- 1
- CRD を識別するためのトピック CRD、その名前および名前のメタデータ。
- 2
- グループ (ドメイン) 名、複数名、サポート対象のスキーマバージョンなど、この CRD の仕様。トピックの API にアクセスするために URL で使用されます。他の名前は、CLI のインスタンスリソースを識別するために使用されます。たとえば、
oc get kafkaShortNametopic my-topic
やoc get kafkatopics
などです。 - 3
- ShortName は CLI コマンドで使用できます。たとえば、
oc get kafkatopic
の代わりにoc get kt
を略名として使用できます。 - 4
- カスタムリソースで
get
コマンドを使用する場合に示される情報。 - 5
- リソースの スキーマ参照 に記載されている CRD の現在の状態。
- 6
- openAPIV3Schema 検証によって、トピックカスタムリソースの作成が検証されます。たとえば、トピックには 1 つ以上のパーティションと 1 つのレプリカが必要です。
ファイル名に、インデックス番号とそれに続く Crd が含まれるため、AMQ Streams インストールファイルと提供される CRD YAML ファイルを識別できます。
KafkaTopic
カスタムリソースに該当する例は次のとおりです。
Kafka トピックカスタムリソース
apiVersion: kafka.strimzi.io/v1beta1 kind: KafkaTopic 1 metadata: name: my-topic labels: strimzi.io/cluster: my-cluster 2 spec: 3 partitions: 1 replicas: 1 config: retention.ms: 7200000 segment.bytes: 1073741824 status: conditions: 4 lastTransitionTime: "2019-08-20T11:37:00.706Z" status: "True" type: Ready observedGeneration: 1 / ...
- 1
kind
およびapiVersion
によって、インスタンスであるカスタムリソースの CRD が特定されます。- 2
- トピックまたはユーザーが属する Kafka クラスターの名前 (
Kafka
リソースの名前と同じ) を定義する、KafkaTopic
およびKafkaUser
リソースのみに適用可能なラベル。 - 3
- 指定内容には、トピックのパーティション数およびレプリカ数や、トピック自体の設定パラメーターが示されています。この例では、メッセージがトピックに保持される期間や、ログのセグメントファイルサイズが指定されています。
- 4
KafkaTopic
リソースのステータス条件。lastTransitionTime
でtype
条件がReady
に変更されています。
プラットフォーム CLI からカスタムリソースをクラスターに適用できます。カスタムリソースが作成されると、Kubernetes API の組み込みリソースと同じ検証が使用されます。
KafkaTopic
の作成後、Topic Operator は通知を受け取り、該当する Kafka トピックが AMQ Streams で作成されます。
1.4. AMQ Streams での Prometheus サポート
Prometheus サーバーおよび CoreOS Prometheus Operator は、AMQ Streams ディストリビューションの一部としてサポートされ ません。しかし、メトリクスを公開するために使用される Prometheus エンドポイントと Prometheus JMX Exporter はサポートされます。サポートされるコンポーネントの詳細は、「Prometheus メトリクスの公開」 を参照してください。
監視のために Prometheus を AMQ Streams で使用する場合、詳細な手順とメトリクス設定ファイルのサンプルが提供されます。
6章Kafka へのメトリクスの導入を参照してください。
1.5. AMQ Streams のインストール方法
AMQ Streams を OpenShift にインストールする方法は 2 つあります。
インストール方法 | 説明 | サポート対象バージョン |
---|---|---|
インストールアーティファクト (YAML ファイル) |
AMQ Streams のダウンロードサイト から | OpenShift 3.11 以上 |
OperatorHub | OperatorHub で AMQ Streams Operator を使用し、Cluster Operator を単一またはすべての namespace にデプロイします。 | OpenShift 4.x のみ |
柔軟性が重要な場合は、インストールアーティファクトによる方法を選択します。OpenShift 4 Web コンソールを使用して標準設定で AMQ Streams を OpenShift 4 にインストールする場合は、OperatorHub による方法を選択します。OperatorHub を使用すると、自動更新も利用できます。
どちらの方法でも、Cluster Operator は OpenShift クラスターにデプロイされ、提供される YAML サンプルファイルを使用して、Kafka クラスターから順に AMQ Streams の他のコンポーネントをデプロイする準備が整います。
AMQ Streams インストールアーティファクト
AMQ Streams インストールアーティファクトには、OpenShift にデプロイできるさまざまな YAML ファイルが含まれ、oc
を使用して以下を含むカスタムリソースが作成されます。
- デプロイメント
- Custom Resource Definition (CRD)
- ロールおよびロールバインディング
- サービスアカウント
YAML インストールファイルは、Cluster Operator、Topic Operator、User Operator、および Strimzi Admin ロールに提供されます。
OperatorHub
OpenShift 4 では、Operator Lifecycle Manager (OLM) を使用することにより、クラスター管理者はクラスター全体で実行されるすべての Operator やそれらの関連サービスをインストール、更新、および管理できます。OLM は、Kubernetes のネイティブアプリケーション (Operator) を効率的に自動化された拡張可能な方法で管理するために設計されたオープンソースツールキットの Operator Framework の一部です。
OperatorHub は OpenShift 4 Web コンソールの一部です。クラスター管理者はこれを使用して Operator を検出、インストール、およびアップグレードできます。Operator は OperatorHub からプルでき、単一の (プロジェクト) namespace またはすべての (プロジェクト) namespace への OpenShift クラスターにインストールできます。Operator は OLM で管理できます。エンジニアリングチームは OLM を使用して、独自に開発、テスト、および本番環境でソフトウェアを管理できます。
OperatorHub は、バージョン 4 未満の OpenShift では使用できません。
AMQ Streams Operator
AMQ Streams Operator は OperatorHub からインストールできます。AMQ Streams Operator のインストール後、必要な CRD およびロールベースアクセス制御 (RBAC) リソースと共に Cluster Operator が OpenShift クラスターにデプロイされます。
その他のリソース
インストールアーティファクトを使用した AMQ Streams のインストール:
OperatorHub からの AMQ Streams のインストール:
- 「OperatorHub からの Cluster Operator のデプロイ」
- OpenShift ドキュメントOperator
第2章 AMQ Streams でデプロイされるもの
Apache Kafka コンポーネントは、AMQ Streams ディストリビューションを使用して OpenShift にデプロイする場合に提供されます。Kafka コンポーネントは通常、クラスターとして実行され、可用性を確保します。
Kafka コンポーネントが組み込まれた通常のデプロイメントには以下が含まれます。
- ブローカーノードの Kafka クラスター
- レプリケートされた ZooKeeper インスタンスの zookeeper クラスター
- 外部データ接続用の Kafka Connect クラスター
- セカンダリークラスターで Kafka クラスターをミラーリングする Kafka MirrorMaker クラスター
- 監視用に追加の Kafka メトリクスデータを抽出する Kafka Exporter
- Kafka クラスターに対して HTTP ベースの要求を行う Kafka Bridge
少なくとも Kafka および ZooKeeper は必要ですが、上記のコンポーネントがすべて必須なわけではありません。MirrorMaker や Kafka Connect など、一部のコンポーネントでは Kafka なしでデプロイできます。
2.1. デプロイメントの順序
OpenShift クラスターへのデプロイメントで必要な順序は次のとおりです。
- Cluster Operator をデプロイし、Kafka クラスターを管理します。
- ZooKeeper クラスターとともに Kafka クラスターをデプロイし、Topic Operator および User Operator がデプロイメントに含まれるようにします。
任意で以下をデプロイします。
- Topic Operator および User Operator (Kafka クラスターとともにデプロイしなかった場合)
- Kafka Connect
- Kafka MirrorMaker
- Kafka Bridge
- メトリクスを監視するためのコンポーネント
2.2. その他のデプロイメント設定オプション
本書のデプロイメント手順では、AMQ Streams で提供されるインストール YAML ファイルのサンプルを使用するデプロイメントを説明します。手順では、検討する必要がある重要な設定事項について説明しますが、使用できる設定オプションをすべて取り上げるわけではありません。
カスタムリソースを使用するとデプロイメントを改良できます。
AMQ Streams をデプロイする前に、Kafka コンポーネントに使用できる設定オプションを確認できます。カスタムリソースによる設定の詳細は、デプロイメント設定 を参照してください。
2.2.1. Kafka のセキュリティー
デプロイメントでは、Cluster Operator はクラスター内でのデータの暗号化および認証に対して自動で TLS 証明書を設定します。
AMQ Streams では、暗号化、認証、および 承認 の追加の設定オプションが提供されます。
- Kafka リソースを設定 して、Kafka クラスターとクライアント間のデータ交換をセキュアにします。
- 承認サーバーが OAuth 2.0 認証 および OAuth 2.0 承認 使用するように、デプロイメントを設定します。
- 独自の証明書を使用して Kafka をセキュア にします。
2.2.2. デプロイメントの監視
AMQ Streams は、デプロイメントを監視する追加のデプロイメントオプションをサポートします。
- Prometheus および Grafana を Kafka クラスターでデプロイ し、メトリクスを抽出して、Kafka コンポーネントを監視します。
- Kafka Exporter を Kafka クラスターでデプロイ し、特にコンシューマーラグの監視に関する追加のメトリクスを抽出します。
- 分散トレーシングを設定 し、エンドツーエンドのメッセージ追跡を行います。
第3章 AMQ Streams デプロイメントの準備
ここでは、AMQ Streams デプロイメントを準備する方法を説明します。
本ガイドのコマンドを実行するには、クラスターユーザーに RBAC (ロールベースアクセス制御) および CRD を管理する権限を付与する必要があります。
3.1. デプロイメントの前提条件
AMQ Streams のデプロイする場合、以下を確認してください。
OpenShift 3.11 以降のクラスターが利用できること。
AMQ Streams は AMQ Streams Strimzi 0.18.x をベースとしています。
-
oc
コマンドラインツールがインストールされ、稼働中のクラスターに接続するように設定されていること。
AMQ Streams は、OpenShift 固有の一部機能をサポートします。そのようなインテグレーションは OpenShift ユーザーに有用で、標準の OpenShift を使用した同等の実装はありません。
3.2. AMQ Streams リリースアーティファクト
AMQ Streams をインストールするには、AMQ Streams のダウンロードページ から amq-streams-<version>-ocp-install-examples.zip
ファイルをダウンロードし、リリースアーティファクトを展開します。
AMQ Streams のリリースアーティファクトには、YAML ファイルが含まれています。これらのファイルは、AMQ Streams コンポーネントの OpenShift へのデプロイ、共通の操作の実行、および Kafka クラスターの設定に便利です。
oc
コマンドラインツールを使用して、AMQ Streams と OpenShift クラスターにデプロイします。
AMQ Streams コンテナーイメージは、Red Hat Ecosystem Catalog から使用することもできます。しかし、提供される YAML ファイルを使用して AMQ Streams をデプロイすることが推奨されます。
3.3. コンテナーイメージを独自のレジストリーにプッシュ
AMQ Streams のコンテナーイメージは Red Hat Ecosystem Catalog にあります。AMQ Streams によって提供されるインストール YAML ファイルは、直接 Red Hat Ecosystem Catalog からイメージをプルします。
Red Hat Ecosystem Catalog にアクセスできない場合や独自のコンテナーリポジトリーを使用する場合は以下を行います。
- リストにある すべての コンテナーイメージをプルします。
- 独自のレジストリーにプッシュします。
- インストール YAML ファイルのイメージ名を更新します。
リリースに対してサポートされる各 Kafka バージョンには別のイメージがあります。
コンテナーイメージ | namespace/リポジトリー | 説明 |
---|---|---|
Kafka |
| 次を含む、Kafka を実行するための AMQ Streams イメージ。
|
Operator |
| Operator を実行するための AMQ Streams イメージ。
|
Kafka Bridge |
| AMQ Streams Kafka Bridge を稼働するための AMQ Streams イメージ |
3.4. AMQ Streams の管理者の指名
AMQ Streams では、デプロイメントの設定にカスタムリソースが提供されます。デフォルトでは、これらのリソースの表示、作成、編集、および削除権限は OpenShift クラスター管理者に限定されます。AMQ Streams には、このような権限を他のユーザーに割り当てするために使用する 2 つのクラスターロールがあります。
-
strimzi-view
ロールを指定すると、ユーザーは AMQ Streams リソースを表示できます。 -
strimzi-admin
ロールを指定すると、ユーザーは AMQ Streams リソースを作成、編集、または削除することもできます。
これらのロールをインストールすると、これらの権限が自動的にデフォルトの OpenShift クラスターロールに集約 (追加) されます。strimzi-view
は view
ロールに集約され、strimzi-admin
は edit
および admin
ロールに集約されます。このように集約することで、同様の権限がすでに持つユーザーに、上記のロールを割り当てる必要がなくなります。
以下の手順では、クラスター管理者でないユーザーが AMQ Streams リソースを管理できるようにする strimzi-admin
ロールの割り当て方法を説明します。
システム管理者は、Cluster Operator のデプロイ後に AMQ Streams の管理者を指名できます。
前提条件
- Cluster Operator でデプロイとデプロイされた CRD (カスタムリソース定義) を管理する AMQ Streams の CRD リソースおよび RBAC (ロールベースアクセス制御) リソースが必要です。
手順
OpenShift で
strimzi-view
およびstrimzi-admin
クラスターロールを作成します。oc apply -f install/strimzi-admin
必要な場合は、ユーザーに必要なアクセス権限を付与するロールを割り当てます。
oc create clusterrolebinding strimzi-admin --clusterrole=strimzi-admin --user=user1 --user=user2
第4章 AMQ Streams のデプロイ
AMQ Streams のデプロイメント環境の準備 が整ったら、以下を実行できます。
- Kafka クラスターの作成方法
要件に応じてその他の Kafka コンポーネントをデプロイする任意の手順。
これらの手順は、OpenShift クラスターが利用可能で稼働していることを想定しています。
4.1. Kafka クラスターの作成
Kafka クラスターを作成するには、Cluster Operator をデプロイして Kafka クラスターを管理し、Kafka クラスターをデプロイします。
Kafka
リソースを使用して Kafka クラスターをデプロイするときに、Topic Operator および User Operator を同時にデプロイできます。この代わりに、AMQ Streams ではない Kafka クラスターを使用している場合は、Topic Operator および User Operator をスタンドアロンコンポーネントとしてデプロイすることもできます。
Kafka クラスターを Topic Operator および User Operator とデプロイ
AMQ Streams によって管理される Kafka クラスターを Topic Operator および User Operator と使用する場合は、このデプロイメント手順を実行します。
- Cluster Operator をデプロイします。
Cluster Operator を使用して以下をデプロイします。
スタンドアロン Topic Operator および User Operator のデプロイ
AMQ Streams によって管理されない Kafka クラスターを Topic Operator および User Operator と使用する場合は、このデプロイメント手順を実行します。
4.1.1. Cluster Operator のデプロイ
Cluster Operator は、OpenShift クラスター内で Apache Kafka クラスターのデプロイおよび管理を行います。
本セクションの手順は以下を説明します。
以下を監視するよう Cluster Operator をデプロイする方法。
代替のデプロイメント
4.1.1.1. Cluster Operator デプロイメントの監視オプション
Cluster Operator の稼働中に、Kafka リソースの更新に対する監視が開始されます。
Cluster Operator をデプロイして、以下からの Kafka リソースの監視を選択できます。
- 単一の namespace (Cluster Operator が含まれる同じ namespace)
- 複数の namespace
- すべての namespace
AMQ Streams では、デプロイメントの処理を簡単にするため、YAML ファイルのサンプルが提供されます。
Cluster Operator では、以下のリソースの変更が監視されます。
-
Kafka クラスターの
Kafka
。 -
Kafka Connect クラスターの
KafkaConnect
。 -
Source2Image がサポートされる Kafka Connect クラスターの
KafkaConnectS2I
。 -
Kafka Connect クラスターでコネクターを作成および管理するための
KafkaConnector
。 -
Kafka MirrorMaker インスタンスの
KafkaMirrorMaker
。 -
Kafka Bridge インスタンスの
KafkaBridge
。
OpenShift クラスターでこれらのリソースの 1 つが作成されると、Operator によってクラスターの詳細がリソースより取得されます。さらに、StatefulSet、Service、および ConfigMap などの必要な OpenShift リソースが作成され、リソースの新しいクラスターの作成が開始されます。
Kafka リソースが更新されるたびに、リソースのクラスターを設定する OpenShift リソースで該当する更新が Operator によって実行されます。
クラスターの望ましい状態がリソースのクラスターに反映されるようにするため、リソースへのパッチ適用後またはリソースの削除後にリソースが再作成されます。この操作は、サービスの中断を引き起こすローリング更新の原因となる可能性があります。
リソースが削除されると、Operator によってクラスターがアンデプロイされ、関連する OpenShift リソースがすべて削除されます。
4.1.1.2. 単一の namespace を監視対象とする Cluster Operator のデプロイメント
この手順では、OpenShift クラスターの単一の namespace で AMQ Streams リソースを監視するように Cluster Operator をデプロイする方法を説明します。
前提条件
-
この手順では、
CustomResourceDefinitions
、ClusterRoles
、およびClusterRoleBindings
を作成できる OpenShift ユーザーアカウントを使用する必要があります。通常、OpenShift クラスターでロールベースアクセス制御 (RBAC) を使用する場合、これらのリソースを作成、編集、および削除する権限を持つユーザーはsystem:admin
などの OpenShift クラスター管理者に限定されます。
手順
Cluster Operator がインストールされる namespace を使用するように、AMQ Streams のインストールファイルを編集します。
たとえば、この手順では Cluster Operator は
my-cluster-operator-namespace
という namespace にインストールされます。Linux の場合は、以下を使用します。
sed -i 's/namespace: .*/namespace: my-cluster-operator-namespace/' install/cluster-operator/*RoleBinding*.yaml
MacOS の場合は、以下を使用します。
sed -i '' 's/namespace: .*/namespace: my-cluster-operator-namespace/' install/cluster-operator/*RoleBinding*.yaml
Cluster Operator をデプロイします。
oc apply -f install/cluster-operator -n my-cluster-operator-namespace
Cluster Operator が正常にデプロイされたことを確認します。
oc get deployments
4.1.1.3. 複数の namespace を監視対象とする Cluster Operator のデプロイメント
この手順では、OpenShift クラスターの複数の namespace 全体で AMQ Streams リソースを監視するように Cluster Operator をデプロイする方法を説明します。
前提条件
-
この手順では、
CustomResourceDefinitions
、ClusterRoles
、およびClusterRoleBindings
を作成できる OpenShift ユーザーアカウントを使用する必要があります。通常、OpenShift クラスターでロールベースアクセス制御 (RBAC) を使用する場合、これらのリソースを作成、編集、および削除する権限を持つユーザーはsystem:admin
などの OpenShift クラスター管理者に限定されます。
手順
Cluster Operator がインストールされる namespace を使用するように、AMQ Streams のインストールファイルを編集します。
たとえば、この手順では Cluster Operator は
my-cluster-operator-namespace
という namespace にインストールされます。Linux の場合は、以下を使用します。
sed -i 's/namespace: .*/namespace: my-cluster-operator-namespace/' install/cluster-operator/*RoleBinding*.yaml
MacOS の場合は、以下を使用します。
sed -i '' 's/namespace: .*/namespace: my-cluster-operator-namespace/' install/cluster-operator/*RoleBinding*.yaml
install/cluster-operator/050-Deployment-strimzi-cluster-operator.yaml
ファイルを編集し、Cluster Operator によって監視されるすべての namespace のリストをSTRIMZI_NAMESPACE
環境変数に追加します。たとえば、この手順では Cluster Operator は
watched-namespace-1
、watched-namespace-2
、およびwatched-namespace-3
という namespace を監視します。apiVersion: apps/v1 kind: Deployment spec: # ... template: spec: serviceAccountName: strimzi-cluster-operator containers: - name: strimzi-cluster-operator image: registry.redhat.io/amq7/amq-streams-rhel7-operator:1.5.0 imagePullPolicy: IfNotPresent env: - name: STRIMZI_NAMESPACE value: watched-namespace-1,watched-namespace-2,watched-namespace-3
リストした各 namespace に
RoleBindings
をインストールします。この例では、コマンドの
watched-namespace
を前述のステップでリストした namespace に置き換えます。watched-namespace-1
、watched-namespace-2
、およびwatched-namespace-3
に対してこれを行います。oc apply -f install/cluster-operator/020-RoleBinding-strimzi-cluster-operator.yaml -n watched-namespace oc apply -f install/cluster-operator/031-RoleBinding-strimzi-cluster-operator-entity-operator-delegation.yaml -n watched-namespace oc apply -f install/cluster-operator/032-RoleBinding-strimzi-cluster-operator-topic-operator-delegation.yaml -n watched-namespace
Cluster Operator をデプロイします。
oc apply -f install/cluster-operator -n my-cluster-operator-namespace
Cluster Operator が正常にデプロイされたことを確認します。
oc get deployments
4.1.1.4. すべての namespace を対象とする Cluster Operator のデプロイメント
この手順では、OpenShift クラスターのすべての namespace 全体で AMQ Streams リソースを監視するように Cluster Operator をデプロイする方法を説明します。
このモードで実行している場合、Cluster Operator によって、新規作成された namespace でクラスターが自動的に管理されます。
前提条件
-
この手順では、
CustomResourceDefinitions
、ClusterRoles
、およびClusterRoleBindings
を作成できる OpenShift ユーザーアカウントを使用する必要があります。通常、OpenShift クラスターでロールベースアクセス制御 (RBAC) を使用する場合、これらのリソースを作成、編集、および削除する権限を持つユーザーはsystem:admin
などの OpenShift クラスター管理者に限定されます。
手順
Cluster Operator がインストールされる namespace を使用するように、AMQ Streams のインストールファイルを編集します。
たとえば、この手順では Cluster Operator は
my-cluster-operator-namespace
という namespace にインストールされます。Linux の場合は、以下を使用します。
sed -i 's/namespace: .*/namespace: my-cluster-operator-namespace/' install/cluster-operator/*RoleBinding*.yaml
MacOS の場合は、以下を使用します。
sed -i '' 's/namespace: .*/namespace: my-cluster-operator-namespace/' install/cluster-operator/*RoleBinding*.yaml
install/cluster-operator/050-Deployment-strimzi-cluster-operator.yaml
ファイルを編集し、STRIMZI_NAMESPACE
環境変数の値を*
に設定します。apiVersion: apps/v1 kind: Deployment spec: # ... template: spec: # ... serviceAccountName: strimzi-cluster-operator containers: - name: strimzi-cluster-operator image: registry.redhat.io/amq7/amq-streams-rhel7-operator:1.5.0 imagePullPolicy: IfNotPresent env: - name: STRIMZI_NAMESPACE value: "*" # ...
クラスター全体ですべての namespace にアクセスできる権限を Cluster Operator に付与する
ClusterRoleBindings
を作成します。oc create clusterrolebinding strimzi-cluster-operator-namespaced --clusterrole=strimzi-cluster-operator-namespaced --serviceaccount my-cluster-operator-namespace:strimzi-cluster-operator oc create clusterrolebinding strimzi-cluster-operator-entity-operator-delegation --clusterrole=strimzi-entity-operator --serviceaccount my-cluster-operator-namespace:strimzi-cluster-operator oc create clusterrolebinding strimzi-cluster-operator-topic-operator-delegation --clusterrole=strimzi-topic-operator --serviceaccount my-cluster-operator-namespace:strimzi-cluster-operator
my-cluster-operator-namespace
は、Cluster Operator をインストールする namespace に置き換えます。Cluster Operator を OpenShift クラスターにデプロイします。
oc apply -f install/cluster-operator -n my-cluster-operator-namespace
Cluster Operator が正常にデプロイされたことを確認します。
oc get deployments
4.1.1.5. OperatorHub からの Cluster Operator のデプロイ
OperatorHub から AMQ Streams Operator をインストールして、Cluster Operator を OpenShift クラスターにデプロイできます。OperatorHub は OpenShift 4 のみで使用できます
前提条件
-
Red Hat Operator の
OperatorSource
が OpenShift クラスターで有効になっている必要があります。適切なOperatorSource
が有効になっていれば OperatorHub に Red Hat Operator が表示されます。詳細は、Operatorを参照してください。 - インストールには、Operator を OperatorHub からインストールするための権限を持つユーザーが必要です
手順
- OpenShift 4 Web コンソールで、Operators > OperatorHub をクリックします。
Streaming & Messaging カテゴリーの AMQ Streams Operator を検索または閲覧します。
- AMQ Streams タイルをクリックし、右側のサイドバーで Install をクリックします。
Create Operator Subscription 画面で、以下のインストールおよび更新オプションから選択します。
- Installation Mode: AMQ Streams Operator をクラスターのすべての (プロジェクト) namespace にインストール (デフォルト) するか、特定の (プロジェクト) namespace インストールするかを選択します。namespace を使用して関数を分離することが推奨されます。特定の namespace を Kafka クラスターおよびその他の AMQ Streams コンポーネントの専用とすることが推奨されます。
- Approval Strategy: デフォルトでは、OLM (Operator Lifecycle Manager) によって、AMQ Streams Operator が自動的に最新の AMQ Streams バージョンにアップグレードされます。今後のアップグレードを手動で承認する場合は、Manual を選択します。詳細は、OpenShift ドキュメントのOperatorを参照してください。
Subscribe をクリックすると、AMQ Streams Operator が OpenShift クラスターにインストールされます。
AMQ Streams Operator によって、Cluster Operator、CRD、およびロールベースアクセス制御 (RBAC) リソースは選択された namespace またはすべての namespace にデプロイされます。
Installed Operators 画面で、インストールの進捗を確認します。AMQ Streams Operator は、ステータスが InstallSucceeded に変更されると使用できます。
次に、YAML サンプルファイルを使用して、Kafka クラスターから順に AMQ Streams の他のコンポーネントをデプロイできます。
4.1.2. Kafka のデプロイ
Apache Kafka は、耐障害性のリアルタイムデータフィードを実現する、オープンソースの分散型 publish/subscribe メッセージングシステムです。
本セクションの手順は以下を説明します。
Cluster Operator を使用して以下をデプロイする方法
- 一時 または 永続 Kafka クラスター
Topic Operator および User Operator (
Kafka
カスタムリソースを設定してデプロイする)
Topic Operator および User Operator の代替のスタンドアロンデプロイメント手順
Kafka をインストールする場合、AMQ Streams によって ZooKeeper クラスターもインストールされ、Kafka と ZooKeeper との接続に必要な設定が追加されます。
4.1.2.1. Kafka クラスターのデプロイメント
この手順では、Cluster Operator を使用して Kafka クラスターを OpenShift にデプロイする方法を説明します。
デプロイメントでは、YAML ファイルの仕様を使って Kafka
リソースが作成されます。
AMQ Streams では、デプロイメントの YAML ファイルのサンプルは examples/kafka/
にあります。
kafka-persistent.yaml
- 3 つの Zookeeper ノードと 3 つの Kafka ノードを使用して永続クラスターをデプロイします。
kafka-jbod.yaml
- それぞれが複数の永続ボリューを使用する、3 つの ZooKeeper ノードと 3 つの Kafka ノードを使用して、永続クラスターをデプロイします。
kafka-persistent-single.yaml
- 1 つの ZooKeeper ノードと 1 つの Kafka ノードを使用して、永続クラスターをデプロイします。
kafka-ephemeral.yaml
- 3 つの ZooKeeper ノードと 3 つの Kafka ノードを使用して、一時クラスターをデプロイします。
kafka-ephemeral-single.yaml
- 3 つの ZooKeeper ノードと 1 つの Kafka ノードを使用して、一時クラスターをデプロイします。
この手順では、一時 および 永続 Kafka クラスターデプロイメントの例を使用します。
- 一時クラスター
-
通常、Kafka の一時クラスターは開発およびテスト環境での使用に適していますが、本番環境での使用には適していません。このデプロイメントでは、ブローカー情報 (ZooKeeper) と、トピックまたはパーティション (Kafka) を格納するための
emptyDir
ボリュームが使用されます。emptyDir
ボリュームを使用すると、その内容は厳密に Pod のライフサイクルと関連し、Pod がダウンすると削除されます。 - 永続クラスター
-
Kafka の永続クラスターでは、
PersistentVolumes
を使用して ZooKeeper および Kafka データを格納します。PersistentVolumeClaim
を使用してPersistentVolume
が取得され、PersistentVolume
の実際のタイプには依存しません。たとえば、YAML ファイルを変更しなくても Amazon AWS デプロイメントで Amazon EBS ボリュームを使用できます。PersistentVolumeClaim
でStorageClass
を使用し、自動ボリュームプロビジョニングをトリガーすることができます。
サンプルクラスターの名前はデフォルトで my-cluster
になります。クラスター名はリソースの名前によって定義され、クラスターがデプロイされた後に変更できません。クラスターをデプロイする前にクラスター名を変更するには、関連する YAML ファイルにある Kafka
リソースの Kafka.metadata.name
プロパティーを編集します。
apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster # ...
Kafka
リソースの設定に関する詳細は Kafka クラスターの設定 を参照してください。
手順
一時 または 永続 クラスターを作成およびデプロイします。
開発またはテストでは、一時クラスターの使用が適している可能性があります。永続クラスターはどのような状況でも使用することができます。
一時 クラスターを作成およびデプロイするには、以下を実行します。
oc apply -f examples/kafka/kafka-ephemeral.yaml
永続 クラスターを作成およびデプロイするには、以下を実行します。
oc apply -f examples/kafka/kafka-persistent.yaml
Kafka クラスターが正常にデプロイされたことを確認します。
oc get deployments
4.1.2.2. Cluster Operator を使用した Topic Operator のデプロイ
この手順では、Cluster Operator を使用して Topic Operator をデプロイする方法を説明します。
Kafka
リソースの entityOperator
プロパティーを設定し、topicOperator
が含まれるようにします。
AMQ Streams によって管理されない Kafka クラスターを Topic Operator と使用する場合は、Topic Operator をスタンドアロンコンポーネントとしてデプロイ する必要があります。
entityOperator
および topicOperator
プロパティーの設定に関する詳細は Entity Operator を参照してください。
手順
Kafka
リソースのentityOperator
プロパティーを編集し、topicOperator
が含まれるようにします。apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: #... entityOperator: topicOperator: {} userOperator: {}
EntityTopicOperatorSpec
スキーマ参照に記載されているプロパティーを使用して、Topic Operator のspec
を設定します。すべてのプロパティーにデフォルト値を使用する場合は、空のオブジェクト (
{}
) を使用します。リソースを作成または更新します。
次のように
oc apply
を使用します。oc apply -f <your-file>
4.1.2.3. Cluster Operator を使用した User Operator のデプロイ
この手順では、Cluster Operator を使用して User Operator をデプロイする方法を説明します。
Kafka
リソースの entityOperator
プロパティーを設定し、userOperator
が含まれるようにします。
AMQ Streams によって管理されない Kafka クラスターを User Operator と使用する場合は、User Operator をスタンドアロンコンポーネントとしてデプロイ する必要があります。
entityOperator
および userOperator
プロパティーの設定に関する詳細は Entity Operator を参照してください。
手順
Kafka
リソースのentityOperator
プロパティーを編集し、userOperator
が含まれるようにします。apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: #... entityOperator: topicOperator: {} userOperator: {}
EntityUserOperatorSpec
スキーマ参照 に記載されているプロパティーを使用して、User Operator のspec
を設定します。すべてのプロパティーにデフォルト値を使用する場合は、空のオブジェクト (
{}
) を使用します。リソースを作成または更新します。
oc apply -f <your-file>
4.1.3. AMQ Streams Operator の代替のスタンドアロンデプロイメントオプション
Cluster Operator を使用して Kafka クラスターをデプロイするときに、Topic Operator および User Operator をデプロイすることもできます。この代わりに、スタンドアロンデプロイメントを行うことができます。
スタンドアロンデプロイメントとは、Topic Operator および User Operator が AMQ Streams によって管理されない Kafka クラスターと操作できることを意味します。
4.1.3.1. スタンドアロン Topic Operator のデプロイ
この手順では、Topic Operator をスタンドアロンコンポーネントとしてデプロイする方法を説明します。
スタンドアロンデプロイメントには、環境変数の設定が必要で、Cluster Operator を使用した Topic Operator のデプロイ よりも複雑です。しかし、Topic Operator は Cluster Operator によってデプロイされた Kafka クラスターに限らず、あらゆる Kafka クラスターと操作できるため、スタンドアロンデプロイメントの柔軟性は高くなります。
前提条件
- Topic Operator が接続する既存の Kafka クラスターが必要です。
手順
以下を設定し、
install/topic-operator/05-Deployment-strimzi-topic-operator.yaml
ファイルのDeployment.spec.template.spec.containers[0].env
プロパティーを編集します。-
STRIMZI_KAFKA_BOOTSTRAP_SERVERS
。hostname:port
ペアのコンマ区切りリストで Kafka クラスターのブートストラップブローカーを指定します。 -
STRIMZI_ZOOKEEPER_CONNECT
。hostname:port
ペアのコンマ区切りリストで ZooKeeper ノードを指定します。これは、Kafka クラスターが使用する ZooKeeper クラスターと同じである必要があります。 -
STRIMZI_NAMESPACE
。Operator がKafkaTopic
リソースを監視する OpenShift namespace。 -
STRIMZI_RESOURCE_LABELS
。Operator によって管理されるKafkaTopic
リソースを識別するために使用されるラベルセレクター。 -
STRIMZI_FULL_RECONCILIATION_INTERVAL_MS
。定期的な調整の間隔 (秒単位) を指定します。 -
STRIMZI_TOPIC_METADATA_MAX_ATTEMPTS
。Kafka からトピックメタデータを取得するための試行回数を指定します。各試行の間隔は、指数バックオフとして定義されます。パーティションまたはレプリカの数によって、トピックの作成に時間がかかる可能性がある場合は、この値を増やすことを検討してください。デフォルトは6
です。 -
STRIMZI_ZOOKEEPER_SESSION_TIMEOUT_MS
。ZooKeeper セッションのタイムアウト (ミリ秒単位)。例:10000
。デフォルトは20000
(20 秒) です。 -
STRIMZI_TOPICS_PATH
。Topic Operator がそのメタデータを保存する Zookeeper ノードパス。デフォルトは/strimzi/topics
です。 -
STRIMZI_TLS_ENABLED
。Kafka ブローカーとの通信を暗号化するために、TLS サポートを有効にします。デフォルトはtrue
です。 -
STRIMZI_TRUSTSTORE_LOCATION
。TLS ベースの通信を有効にするための証明書が含まれるトラストストアへのパス。TLS がSTRIMZI_TLS_ENABLED
によって有効化された場合のみ必須です。 -
STRIMZI_TRUSTSTORE_PASSWORD
。STRIMZI_TRUSTSTORE_LOCATION
で定義されるトラストストアにアクセスするためのパスワード。TLS がSTRIMZI_TLS_ENABLED
によって有効化された場合のみ必須です。 -
STRIMZI_KEYSTORE_LOCATION
。TLS ベースの通信を有効にするための秘密鍵が含まれるキーストアへのパス。TLS がSTRIMZI_TLS_ENABLED
によって有効化された場合のみ必須です。 -
STRIMZI_KEYSTORE_PASSWORD
。STRIMZI_KEYSTORE_LOCATION
で定義されるキーストアにアクセスするためのパスワード。TLS がSTRIMZI_TLS_ENABLED
によって有効化された場合のみ必須です。 -
STRIMZI_LOG_LEVEL
。ロギングメッセージの出力レベル。値は、ERROR
、WARNING
、INFO
、DEBUG
、およびTRACE
に設定できます。デフォルトはINFO
です。 -
STRIMZI_JAVA_OPTS
(任意)。Topic Operator を実行する JVM に使用される Java オプション。例:-Xmx=512M -Xms=256M
-
STRIMZI_JAVA_SYSTEM_PROPERTIES
(任意)。Topic Operator に設定される-D
オプションをリストします。例:-Djavax.net.debug=verbose -DpropertyName=value
.
-
Topic Operator をデプロイします。
oc apply -f install/topic-operator
Topic Operator が正常にデプロイされていることを確認します。
oc describe deployment strimzi-topic-operator
Replicas:
エントリーに1 available
が表示されれば、Topic Operator はデプロイされています。注記OpenShift への接続が低速な場合やイメージがこれまでダウンロードされたことがない場合は、デプロイメントに遅延が発生することがあります。
4.1.3.2. スタンドアロン User Operator のデプロイ
この手順では、User Operator をスタンドアロンコンポーネントとしてデプロイする方法を説明します。
スタンドアロンデプロイメントには、環境変数の設定が必要で、Cluster Operator を使用した User Operator のデプロイ よりも複雑です。しかし、User Operator は Cluster Operator によってデプロイされた Kafka クラスターに限らず、あらゆる Kafka クラスターと操作できるため、スタンドアロンデプロイメントの柔軟性は高くなります。
前提条件
- User Operator が接続する既存の Kafka クラスターが必要です。
手順
以下を設定し、
install/user-operator/05-Deployment-strimzi-user-operator.yaml
ファイルのDeployment.spec.template.spec.containers[0].env
プロパティーを編集します。-
STRIMZI_KAFKA_BOOTSTRAP_SERVERS
。hostname:port
ペアのコンマ区切りリストで Kafka ブローカーを指定します。 -
STRIMZI_ZOOKEEPER_CONNECT
。hostname:port
ペアのコンマ区切りリストで ZooKeeper ノードを指定します。これは、Kafka クラスターが使用する ZooKeeper クラスターと同じである必要があります。TLS 暗号化で ZooKeeper ノードに接続することはサポートされません。 -
STRIMZI_NAMESPACE
。Operator がKafkaUser
リソースを監視する OpenShift namespace。 -
STRIMZI_LABELS
。Operator によって管理されるKafkaUser
リソースを識別するために使用されるラベルセレクター。 -
STRIMZI_FULL_RECONCILIATION_INTERVAL_MS
。定期的な調整の間隔 (秒単位) を指定します。 -
STRIMZI_ZOOKEEPER_SESSION_TIMEOUT_MS
。ZooKeeper セッションのタイムアウト (ミリ秒単位)。例:10000
。デフォルトは20000
(20 秒) です。 -
STRIMZI_CA_CERT_NAME
。TLS クライアント認証に対して新しいユーザー証明書を署名するための認証局の公開鍵が含まれる OpenShiftSecret
を示します。Secret
のca.crt
キーに、認証局の公開鍵が含まれている必要があります。 -
STRIMZI_CA_KEY_NAME
。TLS クライアント認証に対して新しいユーザー証明書を署名するための認証局の秘密鍵が含まれる OpenShiftSecret
を示します。Secret
のca.crt
キーに、認証局の秘密鍵が含まれている必要があります。 -
STRIMZI_CLUSTER_CA_CERT_SECRET_NAME
。TLS ベースの通信を有効にするために Kafka ブローカーの証明書の署名に使用される認証局の秘密鍵が含まれる OpenShiftSecret
を示します。Secret
のca.crt
キーに、認証局の公開鍵が含まれている必要があります。この環境変数の設定は任意で、Kafka クラスターとの通信が TLS ベースである場合のみ設定する必要があります。 -
STRIMZI_EO_KEY_SECRET_NAME
。Kafka クラスターに対する TLS クライアント認証の秘密鍵と関連する証明書が含まれる OpenShiftSecret
を示します。Secret
のentity-operator.p12
キーに、秘密鍵と証明書が含まれるキーストアが含まれ、entity-operator.password
キーに関連するパスワードが含まれる必要があります。この環境変数の設定は任意で、Kafka クラスターとの通信が TLS ベースで、TLS のクライアント認証が必要な場合のみ設定する必要があります。 -
STRIMZI_CA_VALIDITY
。認証局の有効期限。デフォルトは365
日です。 -
STRIMZI_CA_RENEWAL
。認証局の更新期限。 -
STRIMZI_LOG_LEVEL
。ロギングメッセージの出力レベル。値は、ERROR
、WARNING
、INFO
、DEBUG
、およびTRACE
に設定できます。デフォルトはINFO
です。 -
STRIMZI_GC_LOG_ENABLED
。ガベージコレクション (GC) ロギングを有効にします。デフォルトはtrue
です。デフォルトでは、古い証明書が期限切れになる前の証明書の更新期間は30
日です。 -
STRIMZI_JAVA_OPTS
(任意)。User Operator を実行する JVM に使用される Java オプション。例:-Xmx=512M -Xms=256M
-
STRIMZI_JAVA_SYSTEM_PROPERTIES
(任意)。User Operator に設定される-D
オプションをリストします。例:-Djavax.net.debug=verbose -DpropertyName=value
.
-
User Operator をデプロイします。
oc apply -f install/user-operator
User Operator が正常にデプロイされていることを確認します。
oc describe deployment strimzi-user-operator
Replicas:
エントリーに1 available
が表示されれば、User Operator はデプロイされています。注記OpenShift への接続が低速な場合やイメージがこれまでダウンロードされたことがない場合は、デプロイメントに遅延が発生することがあります。
4.2. Kafka Connect のデプロイ
Kafka Connect は、Apache Kafka と外部システムとの間でデータをストリーミングするためのツールです。
AMQ Streams では、Kafka Connect は分散 (distributed) モードでデプロイされます。Kafka Connect はスタンドアロンモードでも動作しますが、AMQ Streams ではサポートされません。
Kafka Connect では、コネクター の概念を使用し、スケーラビリティーと信頼性を維持しながら Kafka クラスターで大量のデータを出し入れするためのフレームワークが提供されます。
Kafka Connect は通常、Kafka を外部データベース、ストレージシステム、およびメッセージングシステムと統合するために使用されます。
本セクションの手順では以下の方法を説明します。
コネクター という用語は、Kafka Connect クラスター内で実行されているコネクターインスタンスや、コネクタークラスと同じ意味で使用されます。本ガイドでは、本文の内容で意味が明確である場合に コネクター という用語を使用します。
4.2.1. Kafka Connect の OpenShift クラスターへのデプロイ
この手順では、Cluster Operator を使用して Kafka Connect クラスターを OpenShift クラスターにデプロイする方法を説明します。
Kafka Connect クラスターは、設定可能なノード数 (このノードの別称: ワーカー) を指定して Deployment
として実装されます。このノードは、メッセージフローのスケーラビリティーと信頼性が高くなるように、コネクターのワークロードを タスク として分散します。
デプロイメントでは、YAML ファイルの仕様を使って KafkaConnect
リソースが作成されます。
この手順では、AMQ Streams にある以下のサンプルファイルを使用します。
-
examples/kafka-connect/kafka-connect.yaml
KafkaConnect
リソースの設定に関する詳細は以下を参照してください。
手順
Kafka Connect を OpenShift クラスターにデプロイします。
oc apply -f examples/kafka-connect/kafka-connect.yaml
Kafka Connect が正常にデプロイされたことを確認します。
oc get deployments
4.2.2. コネクタープラグインでの Kafka Connect の拡張
Kafka Connect の AMQ Streams コンテナーイメージには、ファイルベースのデータを Kafka クラスターで出し入れするために 2 つの組み込みコネクターが含まれています。
ファイルコネクター | 説明 |
---|---|
| ファイル (ソース) から Kafka クラスターにデータを転送します。 |
| Kafka クラスターからファイル (シンク) にデータを転送します。 |
Cluster Operator では、Kafka Connect クラスターを OpenShift クラスターにデプロイするために作成したイメージを使用することもできます。
ここの手順では、以下を行って、独自のコネクタークラスをコネクターイメージに追加する方法を説明します。
Kafka Connect REST API または KafkaConnector カスタムリソースを使用 して直接コネクターの設定を作成します。
4.2.2.1. Kafka Connect ベースイメージからの Docker イメージの作成
この手順では、カスタムイメージを作成し、/opt/kafka/plugins
ディレクトリーに追加する方法を説明します。
Red Hat Ecosystem Catalog の Kafka コンテナーイメージを、追加のコネクタープラグインで独自のカスタムイメージを作成するためのベースイメージとして使用できます。
AMQ Stream バージョンの Kafka Connect は起動時に、/opt/kafka/plugins
ディレクトリーに含まれるサードパーティーのコネクタープラグインをロードします。
手順
registry.redhat.io/amq7/amq-streams-kafka-25-rhel7:1.5.0
をベースイメージとして使用して、新規のDockerfile
を作成します。FROM registry.redhat.io/amq7/amq-streams-kafka-25-rhel7:1.5.0 USER root:root COPY ./my-plugins/ /opt/kafka/plugins/ USER 1001
プラグインファイルの例
$ tree ./my-plugins/ ./my-plugins/ ├── debezium-connector-mongodb │ ├── bson-3.4.2.jar │ ├── CHANGELOG.md │ ├── CONTRIBUTE.md │ ├── COPYRIGHT.txt │ ├── debezium-connector-mongodb-0.7.1.jar │ ├── debezium-core-0.7.1.jar │ ├── LICENSE.txt │ ├── mongodb-driver-3.4.2.jar │ ├── mongodb-driver-core-3.4.2.jar │ └── README.md ├── debezium-connector-mysql │ ├── CHANGELOG.md │ ├── CONTRIBUTE.md │ ├── COPYRIGHT.txt │ ├── debezium-connector-mysql-0.7.1.jar │ ├── debezium-core-0.7.1.jar │ ├── LICENSE.txt │ ├── mysql-binlog-connector-java-0.13.0.jar │ ├── mysql-connector-java-5.1.40.jar │ ├── README.md │ └── wkb-1.0.2.jar └── debezium-connector-postgres ├── CHANGELOG.md ├── CONTRIBUTE.md ├── COPYRIGHT.txt ├── debezium-connector-postgres-0.7.1.jar ├── debezium-core-0.7.1.jar ├── LICENSE.txt ├── postgresql-42.0.0.jar ├── protobuf-java-2.6.1.jar └── README.md
- コンテナーイメージをビルドします。
- カスタムイメージをコンテナーレジストリーにプッシュします。
新しいコンテナーイメージを示します。
以下のいずれかを行います。
KafkaConnect
カスタムリソースのKafkaConnect.spec.image
プロパティーを編集します。設定された場合、このプロパティーによって Cluster Operator の
STRIMZI_KAFKA_CONNECT_IMAGES
変数がオーバーライドされます。apiVersion: kafka.strimzi.io/v1beta1 kind: KafkaConnect metadata: name: my-connect-cluster spec: 1 #... image: my-new-container-image 2 config: 3 #...
または
-
install/cluster-operator/050-Deployment-strimzi-cluster-operator.yaml
ファイルのSTRIMZI_KAFKA_CONNECT_IMAGES
変数を編集して新しいコンテナーイメージを示すようにした後、Cluster Operator を再インストールします。
その他のリソース
-
KafkaConnect.spec.image property
の詳細は、コンテナーイメージ を参照してください。 -
STRIMZI_KAFKA_CONNECT_IMAGES
変数の詳細は、Cluster Operator の設定 を参照してください。
4.2.2.2. OpenShift ビルドおよび S2I (Source-to-Image) を使用したコンテナーイメージの作成
この手順では、OpenShift ビルド と S2I (Source-to-Image) フレームワークを使用して、新しいコンテナーイメージを作成する方法を説明します。
OpenShift ビルドは、S2I がサポートされるビルダーイメージとともに、ユーザー提供のソースコードおよびバイナリーを取得し、これらを使用して新しいコンテナーイメージを構築します。構築後、コンテナーイメージは OpenShfit のローカルコンテナーイメージリポジトリーに格納され、デプロイメントで使用可能になります。
S2I がサポートされる Kafka Connect ビルダーイメージは、registry.redhat.io/amq7/amq-streams-kafka-25-rhel7:1.5.0
イメージの一部として、Red Hat Ecosystem Catalog で提供されます。この S2I イメージは、バイナリー (プラグインおよびコネクターとともに) を取得し、/tmp/kafka-plugins/s2i
ディレクトリーに格納されます。このディレクトリーから、Kafka Connect デプロイメントとともに使用できる新しい Kafka Connect イメージを作成します。改良されたイメージの使用を開始すると、Kafka Connect は /tmp/kafka-plugins/s2i
ディレクトリーからサードパーティープラグインをロードします。
手順
コマンドラインで
oc apply
コマンドを使用し、Kafka Connect の S2I クラスターを作成およびデプロイします。oc apply -f examples/connect/kafka-connect-s2i.yaml
Kafka Connect プラグインでディレクトリーを作成します。
$ tree ./my-plugins/ ./my-plugins/ ├── debezium-connector-mongodb │ ├── bson-3.4.2.jar │ ├── CHANGELOG.md │ ├── CONTRIBUTE.md │ ├── COPYRIGHT.txt │ ├── debezium-connector-mongodb-0.7.1.jar │ ├── debezium-core-0.7.1.jar │ ├── LICENSE.txt │ ├── mongodb-driver-3.4.2.jar │ ├── mongodb-driver-core-3.4.2.jar │ └── README.md ├── debezium-connector-mysql │ ├── CHANGELOG.md │ ├── CONTRIBUTE.md │ ├── COPYRIGHT.txt │ ├── debezium-connector-mysql-0.7.1.jar │ ├── debezium-core-0.7.1.jar │ ├── LICENSE.txt │ ├── mysql-binlog-connector-java-0.13.0.jar │ ├── mysql-connector-java-5.1.40.jar │ ├── README.md │ └── wkb-1.0.2.jar └── debezium-connector-postgres ├── CHANGELOG.md ├── CONTRIBUTE.md ├── COPYRIGHT.txt ├── debezium-connector-postgres-0.7.1.jar ├── debezium-core-0.7.1.jar ├── LICENSE.txt ├── postgresql-42.0.0.jar ├── protobuf-java-2.6.1.jar └── README.md
oc start-build
コマンドで、準備したディレクトリーを使用してイメージの新しいビルドを開始します。oc start-build my-connect-cluster-connect --from-dir ./my-plugins/
注記ビルドの名前は、デプロイされた Kafka Connect クラスターと同じになります。
- ビルドが完了したら、Kafka Connect のデプロイメントによって新しいイメージが自動的に使用されます。
4.2.3. コネクターの作成および管理
コネクタープラグインのコンテナーイメージを作成したら、Kafka Connect クラスターにコネクターインスタンスを作成する必要があります。その後、稼働中のコネクターインスタンスを設定、監視、および管理できます。
コネクターは特定の コネクタークラス のインスタンスで、関連のある外部システムとメッセージについて通信する方法を認識しています。コネクターは多くの外部システムで使用でき、独自のコネクターを作成することもできます。
ソース および シンク タイプのコネクターを作成できます。
- ソースコネクター
- ソースコネクターは、外部システムからデータを取得し、それをメッセージとして Kafka に提供するランタイムエンティティーです。
- シンクコネクター
- シンクコネクターは、Kafka トピックからメッセージを取得し、外部システムに提供するランタイムエンティティーです。
AMQ Streams では、コネクターの作成および管理に 2 つの API が提供されます。
-
KafkaConnector
リソース (KafkaConnectors
と呼ばれます) - Kafka Connect REST API
API を使用すると、以下を行うことができます。
- コネクターインスタンスのステータスの確認。
- 稼働中のコネクターの再設定。
- コネクターインスタンスのタスク数の増減。
-
失敗したタスクの再起動 (
KafkaConnector
リソースによってサポートされません)。 - コネクターインスタンスの一時停止。
- 一時停止したコネクターインスタンスの再開。
- コネクターインスタンスの削除。
4.2.3.1. KafkaConnector
リソース
KafkaConnectors
を使用すると、Kafka Connect のコネクターインスタンスを OpenShift ネイティブに作成および管理できるため、cURL などの HTTP クライアントが必要ありません。その他の Kafka リソースと同様に、コネクターの望ましい状態を OpenShift クラスターにデプロイされた KafkaConnector
YAML ファイルに宣言し、コネクターインスタンスを作成します。
該当する KafkaConnector
を更新して稼働中のコネクターインスタンスを管理した後、更新を適用します。該当する KafkaConnector
を削除して、コネクターを削除します。
これまでのバージョンの AMQ Streams との互換性を維持するため、KafkaConnectors
はデフォルトで無効になっています。Kafka Connect クラスターのために有効にするには、KafkaConnect
リソースでアノテーションを使用する必要があります。手順は KafkaConnector
リソースの有効化 を参照してください。
KafkaConnectors
が有効になると、Cluster Operator によって監視が開始されます。KafkaConnectors
に定義された設定と一致するよう、稼働中のコネクターインスタンスの設定を更新します。
AMQ Streams には、examples/connect/source-connector.yaml
という名前のサンプル KafkaConnector
が含まれます。このサンプルを使用して、FileStreamSourceConnector
を作成および管理できます。
4.2.3.2. Kafka Connect REST API の可用性
Kafka Connect REST API は、<connect-cluster-name>-connect-api
サービスとして 8083 番ポートで使用できます。
KafkaConnectors
が有効になっている場合、Kafka Connect REST API に直接手作業で追加された変更は Cluster Operator によって元に戻されます。
REST API でサポートされる操作は、Apache Kafka のドキュメント を参照してください。
4.2.4. KafkaConnector
リソースの Kafka Connect へのデプロイ
この手順では、KafkaConnector
の例を Kafka Connect クラスターにデプロイする方法を説明します。
YAML の例によって FileStreamSourceConnector
が作成され、ライセンスファイルの各行が my-topic
という名前のトピックでメッセージとして Kafka に送信されます。
前提条件
-
KafkaConnectors
が有効になっている Kafka Connect デプロイメントが必要です。 - 稼働中の Cluster Operator が必要です。
手順
examples/connect/source-connector.yaml
ファイルを編集します。apiVersion: kafka.strimzi.io/v1alpha1 kind: KafkaConnector metadata: name: my-source-connector 1 labels: strimzi.io/cluster: my-connect-cluster 2 spec: class: org.apache.kafka.connect.file.FileStreamSourceConnector 3 tasksMax: 2 4 config: 5 file: "/opt/kafka/LICENSE" topic: my-topic # ...
OpenShift クラスターで
KafkaConnector
を作成します。oc apply -f examples/connect/source-connector.yaml
リソースが作成されたことを確認します。
oc get kctr --selector strimzi.io/cluster=my-connect-cluster -o name
4.3. Kafka MirrorMaker のデプロイ
Cluster Operator によって、1 つ以上の Kafka MirrorMaker のレプリカがデプロイされ、Kafka クラスターの間でデータが複製されます。このプロセスはミラーリングと言われ、Kafka パーティションのレプリケーションの概念と混同しないようにします。MirrorMaker は、ソースクラスターからメッセージを消費し、これらのメッセージをターゲットクラスターにパブリッシュします。
4.3.1. Kafka MirrorMaker の OpenShift クラスターへのデプロイ
この手順では、Cluster Operator を使用して Kafka MirrorMaker クラスターを OpenShift クラスターにデプロイする方法を説明します。
デプロイメントでは、YAML ファイルの仕様を使って、デプロイされた MirrorMaker のバージョンに応じて KafkaMirrorMaker
または KafkaMirrorMaker2
リソースが作成されます。
この手順では、AMQ Streams にある以下のサンプルファイルを使用します。
-
examples/mirror-maker/kafka-mirror-maker.yaml
-
examples/mirror-maker/kafka-mirror-maker-2.yaml
KafkaMirrorMaker
または KafkaMirrorMaker2
リソースの設定に関する詳細は Kafka MirrorMaker の設定 を参照してください。
手順
Kafka MirrorMaker を OpenShift クラスターにデプロイします。
MirrorMaker の場合
oc apply -f examples/mirror-maker/kafka-mirror-maker.yaml
MirrorMaker 2.0 の場合
oc apply -f examples/mirror-maker/kafka-mirror-maker-2.yaml
MirrorMaker が正常にデプロイされたことを確認します。
oc get deployments
4.4. Kafka Bridge のデプロイ
Cluster Operator によって、1 つ以上の Kafka Bridge のレプリカがデプロイされ、HTTP API 経由で Kafka クラスターとクライアントの間でデータが送信されます。
4.4.1. Kafka Bridge を OpenShift クラスターへデプロイ
この手順では、Cluster Operator を使用して Kafka Bridge クラスターを OpenShift クラスターにデプロイする方法を説明します。
デプロイメントでは、YAML ファイルの仕様を使って KafkaBridge
リソースが作成されます。
この手順では、AMQ Streams にある以下のサンプルファイルを使用します。
-
examples/bridge/kafka-bridge.yaml
KafkaBridge
リソースの設定に関する詳細は Kafka Bridge の設定 を参照してください。
手順
Kafka Bridge を OpenShift クラスターにデプロイします。
oc apply -f examples/bridge/kafka-bridge.yaml
Kafka Bridge が正常にデプロイされたことを確認します。
oc get deployments
第5章 AMQ Streams のデプロイメントの検証
本セクションの手順では、AMQ Streams のデプロイ 後に、プロデューサーおよびコンシューマークライアントの例をデプロイする方法を説明します。
プロデューサーは、使用可能な AMQ Streams が OpenShift クラスターで稼働していることを想定します。
5.1. サンプルクライアントのデプロイ
この手順では、ユーザーが作成した Kafka クラスターを使用してメッセージを送受信するプロデューサーおよびコンシューマークライアントの例をデプロイする方法を説明します。
前提条件
- クライアントが Kafka クラスターを使用できる。
手順
Kafka プロデューサーをデプロイします。
oc run kafka-producer -ti --image=registry.redhat.io/amq7/amq-streams-kafka-25-rhel7:1.5.0 --rm=true --restart=Never -- bin/kafka-console-producer.sh --broker-list cluster-name-kafka-bootstrap:9092 --topic my-topic
- プロデューサーが稼働しているコンソールにメッセージを入力します。
- Enter を押してメッセージを送信します。
Kafka コンシューマーをデプロイします。
oc run kafka-consumer -ti --image=registry.redhat.io/amq7/amq-streams-kafka-25-rhel7:1.5.0 --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server cluster-name-kafka-bootstrap:9092 --topic my-topic --from-beginning
- コンシューマーコンソールに受信メッセージが表示されることを確認します。
第6章 Kafka へのメトリクスの導入
ここでは、AMQ Streams デプロイメントを監視するための設定オプションについて説明します。
要件に応じて以下を行うことができます。
Prometheus および Grafana が有効になっている場合、Kafka Exporter はコンシューマーラグに関して追加の監視を提供します。
さらに、分散トレーシングを設定 して、エンドツーエンドのメッセージ追跡を行うようにデプロイメントを設定することもできます。
その他のリソース
- Prometheus の詳細は、Prometheus のドキュメント を参照してください。
- Grafana の詳細は、Grafana のドキュメント を参照してください。
- Apache Kafka Monitoring では、Apache Kafka により公開される JMX メトリクスについて解説しています。
- ZooKeeper JMX では、Apache Zookeeper により公開される JMX メトリックについて解説しています。
6.1. Prometheus および Grafana の追加
このセクションでは、Prometheus を使用して AMQ Streams Kafka、Zookeeper、Kafka Connect、Kafka MirrorMaker、および MirrorMaker 2.0 クラスターを監視し、サンプル Grafana ダッシュボードの監視データを提供する方法について説明します。
Prometheus と Grafana は、operator の監視にも使用できます。operator の Grafana ダッシュボードのサンプルは以下を提供します。
- 調整の数や処理するカスタムリソースの数などの、operator に関する情報。
- operator からの JVM メトリクス。
Grafana ダッシュボードのサンプルを実行するには、以下を行う必要があります。
このセクションで参照されるリソースは、まず監視を設定することを目的としており、これらはサンプルとしてのみ提供されます。実稼働環境で Prometheus または Grafana を設定、実行するためにサポートがさらに必要な場合は、それぞれのコミュニティーに連絡してください。
6.1.1. メトリクスファイルの例
メトリクス設定のサンプルファイルは、examples/metrics
ディレクトリーにあります。
metrics ├── grafana-install │ ├── grafana.yaml 1 ├── grafana-dashboards 2 │ ├── strimzi-kafka-connect.json │ ├── strimzi-kafka.json │ ├── strimzi-zookeeper.json │ ├── strimzi-kafka-mirror-maker-2.json │ ├── strimzi-operators.json │ └── strimzi-kafka-exporter.json 3 ├── kafka-connect-metrics.yaml 4 ├── kafka-metrics.yaml 5 ├── prometheus-additional-properties │ └── prometheus-additional.yaml 6 ├── prometheus-alertmanager-config │ └── alert-manager-config.yaml 7 └── prometheus-install ├── alert-manager.yaml 8 ├── prometheus-rules.yaml 9 ├── prometheus.yaml 10 ├── strimzi-pod-monitor.yaml 11 └── strimzi-service-monitor.yaml 12
- 1
- Grafana イメージのインストールファイル。
- 2
- Grafana ダッシュボード。
- 3
- Kafka Exporter に固有の Grafana ダッシュボード。
- 4
- Kafka Connect に対する Prometheus JMX Exporter の再ラベル付けルールを定義するメトリクス設定。
- 5
- Kafka および ZooKeeper に対する Prometheus JMX Exporter の再ラベル付けルールを定義するメトリクス設定。
- 6
- サービス監視のロールを追加する設定。
- 7
- Alertmanager による通知送信のためのフック定義。
- 8
- Alertmanager をデプロイおよび設定するためのリソース。
- 9
- Prometheus Alertmanager と使用するアラートルールの例 (Prometheus とデプロイ)。
- 10
- Prometheus イメージのインストールファイル。
- 11
- Pod からメトリクスデータをスクレープする Prometheus ジョブ定義。
- 12
- サービスからメトリクスデータをスクレープする Prometheus ジョブ定義。
6.1.2. Prometheus メトリクスの公開
AMQ Streams は、Prometheus JMX Exporter を使用して、HTTP エンドポイントを使用する Kafka および ZooKeeper から JMX メトリクスを公開し、さらにメトリクスは Prometheus サーバーによってスクレープされます。
6.1.2.1. Prometheus メトリクスの設定
AMQ Streams には、Grafana の設定ファイルのサンプル が含まれています。
Grafana ダッシュボードは、以下に対して定義される Prometheus JMX Exporter の再ラベル付けルールに依存します。
-
kafka-metrics.yaml
サンプルファイルで、Kafka および ZooKeeper をKafka
リソース設定とする。 -
kafka-connect-metrics.yaml
サンプルファイルで、Kafka Connect をKafkaConnect
およびKafkaConnectS2I
リソースとする。
ラベルは名前と値のペアです。再ラベル付けは、ラベルを動的に書き込むプロセスです。たとえば、ラベルの値は Kafka サーバーおよびクライアント ID の名前から派生されます。
このセクションでは kafka-metrics.yaml
を使用してメトリクス設定を説明しますが、このプロセスは kafka-connect-metrics.yaml
ファイルを使用して Kafka Connect を設定する場合と同じです。
その他のリソース
再ラベル付けの使用方法の詳細は、Prometheus ドキュメントの Configuration を参照してください。
6.1.2.2. Prometheus メトリクスのデプロイメントオプション
再ラベル付けルールのメトリクス設定例を Kafka クラスターに適用するには、以下のいずれかを行います。
6.1.2.3. Prometheus メトリクス設定の Kafka リソースへのコピー
Grafana ダッシュボードを監視に使用するには、メトリクス設定サンプルを Kafka
リソースに コピーします。
手順
デプロイメントの Kafka
リソースごとに以下の手順を実行します。
エディターで
Kafka
リソースを更新します。oc edit kafka my-cluster
-
kafka-metrics.yaml
の設定例 を、ユーザーのKafka
リソース定義にコピーします。 - ファイルを保存し、エディターを終了して更新したリソースが調整されるのを待ちます。
6.1.2.4. Prometheus メトリクス設定での Kafka クラスターのデプロイメント
Grafana ダッシュボードを監視に使用するには、メトリクス設定でサンプル Kafka クラスター をデプロイできます。
手順
メトリクス設定で Kafka クラスターをデプロイします。
oc apply -f kafka-metrics.yaml
6.1.3. Prometheus の設定
Prometheus では、システム監視とアラート通知のオープンソースのコンポーネントセットが提供されます。
ここでは、CoreOS Prometheus Operator を使用して、実稼働環境での使用に適している Prometheus サーバーを実行および管理する方法を説明します。正しい設定を使用すれば、すべての Prometheus サーバーを実行できます。
Prometheus サーバーの設定では、サービス検出を使用して、メトリクス取得元のクラスター内にある Pod を検出します。この機能が正しく機能するには、Prometheus サービス Pod の稼働に使用されるサービスアカウントで API サーバーにアクセスし、Pod リストを取得できる必要があります。
詳細は、Discovering services を参照してください。
6.1.3.1. Prometheus の設定
AMQ Streams では、Prometheus サーバーの設定ファイルのサンプル が提供されます。
デプロイメント用に Prometheus イメージが提供されます。
-
prometheus.yaml
Prometheus 関連の追加設定も、以下のファイルに含まれています。
-
prometheus-additional.yaml
-
prometheus-rules.yaml
-
strimzi-pod-monitor.yaml
-
strimzi-service-monitor.yaml
Prometheus で監視データを取得するには以下を行います。
次に、設定ファイルを使用して以下を行います。
アラートルール
prometheus-rules.yaml
ファイルには、Alertmanager で使用するアラートルールのサンプル が含まれています。
6.1.3.2. Prometheus リソース
Prometheus 設定を適用すると、以下のリソースが OpenShift クラスターに作成され、Prometheus Operator によって管理されます。
-
ClusterRole
。コンテナーメトリクスのために Kafka と ZooKeeper の Pod、cAdvisor および kubelet によって公開される health エンドポイントを読み取る権限を Prometheus に付与します。 -
ServiceAccount
。これで Prometheus Pod が実行されます。 -
ClusterRoleBinding
。ClusterRole
をServiceAccount
にバインドします。 -
Deployment
。Prometheus Operator Pod を管理します。 -
ServiceMonitor
。Prometheus Pod の設定を管理します。 -
Prometheus
。Prometheus Pod の設定を管理します。 -
PrometheusRule
。Prometheus Pod のアラートルールを管理します。 -
Secret
。Prometheus の追加設定を管理します。 -
Service
。クラスターで稼働するアプリケーションが Prometheus に接続できるようにします (例: Prometheus をデータソースとして使用する Grafana)。
6.1.3.3. CoreOS Prometheus Operator のデプロイ
Prometheus Operator を Kafka クラスターにデプロイするには、Prometheus CoreOS リポジトリー から YAML バンドルリソースファイルを適用します。
手順
リポジトリーから
bundle.yaml
リソースファイルをダウンロードします。Linux の場合は、以下を使用します。
curl -s https://raw.githubusercontent.com/coreos/prometheus-operator/master/bundle.yaml | sed -e 's/namespace: .*/namespace: my-namespace/' > prometheus-operator-deployment.yaml
MacOS の場合は、以下を使用します。
curl -s https://raw.githubusercontent.com/coreos/prometheus-operator/master/bundle.yaml | sed -e '' 's/namespace: .*/namespace: my-namespace/' > prometheus-operator-deployment.yaml
サンプル
namespace
は、独自の namespace に置き換えます。注記OpenShift を使用している場合は、Prometheus Operator リポジトリーの OpenShift フォーク のリリースを指定します。
-
(任意) これが必要ない場合は、
spec.template.spec.securityContext
プロパティーをprometheus-operator-deployment.yaml
ファイルから手動で削除できます。 Prometheus Operator をデプロイします。
oc apply -f prometheus-operator-deployment.yaml
6.1.3.4. Prometheus のデプロイメント
Prometheus を Kafka クラスターにデプロイして監視データを取得するには、Prometheus Docker イメージのリソースサンプルファイルと Prometheus 関連リソースの YAML ファイルを適用します。
デプロイメントプロセスでは、ClusterRoleBinding
が作成され、デプロイメントのために指定された namespace で Alertmanager インスタンスが検出されます。
Prometheus Operator はデフォルトでは、endpoints
ロールが含まれるジョブのみをサービス検出でサポートします。ターゲットは、エンドポイントのポートアドレスごとに検出およびスクレープされます。エンドポイントの検出では、ポートアドレスはサービス (role: service
) または Pod (role: pod
) の検出から派生する可能性があります。
前提条件
- 提供されるアラートルールのサンプル を確認します。
手順
Prometheus のインストール先となる namespace に従い、Prometheus インストールファイル (
prometheus.yaml
) を変更します。Linux の場合は、以下を使用します。
sed -i 's/namespace: .*/namespace: my-namespace/' prometheus.yaml
MacOS の場合は、以下を使用します。
sed -i '' 's/namespace: .*/namespace: my-namespace/' prometheus.yaml
-
ServiceMonitor
リソースをstrimzi-service-monitor.yaml
で編集し、サービスからメトリクスデータをスクレープする Prometheus ジョブを定義します。ServiceMonitor
はサービスからメトリクスをスクレープするために使用され、Apache Kafka や ZooKeeper に使用されます。 -
PodMonitor
リソースをstrimzi-service-monitor.yaml
で編集し、Pod からメトリクスデータをスクレープする Prometheus ジョブを定義します。PodMonitor
は、Pod から直接データをスクレープするために使用され、Operator に使用されます。 別のロールを使用するには、以下を実行します。
Secret
リソースを作成します。oc create secret generic additional-scrape-configs --from-file=prometheus-additional.yaml
-
prometheus.yaml
ファイルでadditionalScrapeConfigs
プロパティーを編集し、Secret
の名前と、追加の設定が含まれる YAML ファイル (prometheus-additional.yaml
) を追加します。
Prometheus リソースをデプロイします。
oc apply -f strimzi-service-monitor.yaml oc apply -f strimzi-pod-monitor.yaml oc apply -f prometheus-rules.yaml oc apply -f prometheus.yaml
6.1.4. Prometheus Alertmanager の設定
Prometheus Alertmanager は、アラートを処理して通知サービスにルーティングするためのプラグインです。Alertmanager は、アラートルールを基にして潜在的な問題と見られる状態を通知し、監視で必要な条件に対応します。
6.1.4.1. Alertmanager の設定
AMQ Streams には、Prometheus Alertmanager の設定ファイルのサンプル が含まれます。
設定ファイルは、Alertmanager をデプロイするためのリソースを定義します。
-
alert-manager.yaml
追加の設定ファイルには、Kafka クラスターから通知を送信するためのフック定義が含まれます。
-
alert-manager-config.yaml
Alertmanger で Prometheus アラートの処理を可能にするには、設定ファイルを使用して以下を行います。
6.1.4.2. アラートルール
アラートルールによって、メトリクスで監視される特定条件についての通知が提供されます。ルールは Prometheus サーバーで宣言されますが、アラート通知は Prometheus Alertmanager で対応します。
Prometheus アラートルールでは、継続的に評価される PromQL 表現を使用して条件が記述されます。
アラート表現が true になると、条件が満たされ、Prometheus サーバーからアラートデータが Alertmanager に送信されます。次に Alertmanager は、そのデプロイメントに設定された通信方法を使用して通知を送信します。
Alertmanager は、電子メール、チャットメッセージなどの通知方法を使用するように設定できます。
その他のリソース
アラートルールの設定についての詳細は、Prometheus ドキュメントの Configuration を参照してください。
6.1.4.3. アラートルールの例
Kafka および ZooKeeper メトリクスのアラートルールのサンプルは AMQ Streams に含まれており、Prometheus デプロイメント で使用できます。
アラートルールの定義に関する一般的な留意点:
-
for
プロパティーはルールと併用され、アラートがトリガーされる前に条件が維持されなければならない期間を決定します。 -
ティック (tick) は ZooKeeper の基本的な時間単位です。ミリ秒単位で測定され、
Kafka.spec.zookeeper.config
のtickTime
パラメーターを使用して設定されます。たとえば、ZooKeeper でtickTime=3000
の場合、3 ティック (3 x 3000) は 9000 ミリ秒と等しくなります。 -
ZookeeperRunningOutOfSpace
メトリクスおよびアラートを利用できるかどうかは、使用される OpenShift 設定およびストレージ実装によります。特定のプラットフォームのストレージ実装では、メトリクスによるアラートの提供に必要な利用可能な領域について情報が提供されない場合があります。
Kafka アラートルール
UnderReplicatedPartitions
-
現在のブローカーがリードレプリカでありながら、パーティションのトピックに設定された
min.insync.replicas
よりも複製数が少ないパーティションの数が示されます。このメトリクスにより、フォロワーレプリカをホストするブローカーの詳細が提供されます。リーダーからこれらのフォロワーへの複製が追い付いていません。その理由として、現在または過去にオフライン状態になっていたり、過剰なスロットリングが適用されたブローカー間の複製であることが考えられます。この値がゼロより大きい場合にアラートが発生し、複製の数が最低数未満であるパーティションの情報がブローカー別に通知されます。 AbnormalControllerState
- 現在のブローカーがクラスターのコントローラーであるかどうかを示します。メトリクスは 0 または 1 です。クラスターのライフサイクルでは、1 つのブローカーのみかコントローラーとなるはずで、クラスターには常にアクティブなコントローラーが存在する必要があります。複数のブローカーがコントローラーであることが示される場合は問題になります。そのような状態が続くと、すべてのブローカーのこのメトリクスの合計値が 1 でない場合にアラートが発生します。合計値が 0 であればアクティブなコントローラーがなく、合計値が 1 を超えればコントローラーが複数あることを意味します。
UnderMinIsrPartitionCount
-
書き込み操作の完了を通知しなければならないリード Kafka ブローカーの ISR (In-Sync レプリカ) が最小数 (
min.insync.replicas
を使用して指定) に達していないことを示します。このメトリクスでは、ブローカーがリードし、In-Sync レプリカの数が最小数に達していない、パーティションの数が定義されます。この値がゼロより大きい場合にアラートが発生し、完了通知 (ack) が最少数未満であった各ブローカーのパーティション数に関する情報が提供されます。 OfflineLogDirectoryCount
- ハードウェア障害などの理由によりオフライン状態であるログディレクトリーの数を示します。そのため、ブローカーは受信メッセージを保存できません。この値がゼロより大きい場合にアラートが発生し、各ブローカーのオフライン状態であるログディレクトリーの数に関する情報が提供されます。
KafkaRunningOutOfSpace
-
データの書き込みに使用できる残りのディスク容量を示します。この値が 5GiB 未満になるとアラートが発生し、永続ボリューム要求 (Persistent Volume Claim、PVC) ごとに容量不足のディスクに関する情報が提供されます。しきい値は
prometheus-rules.yaml
で変更できます。
ZooKeeper アラートルール
AvgRequestLatency
- サーバーがクライアントリクエストに応答するまでの時間を示します。この値が 10 (tick) を超えるとアラートが発生し、各サーバーの平均リクエストレイテンシーの実際の値が通知されます。
OutstandingRequests
- サーバーでキューに置かれたリクエストの数を示します。この値は、サーバーが処理能力を超えるリクエストを受信すると上昇します。この値が 10 よりも大きい場合にアラートが発生し、各サーバーの未処理のリクエスト数が通知されます。
ZookeeperRunningOutOfSpace
- このメトリクスは、ZooKeeper へのデータ書き込みに使用できる残りのディスク容量を示します。この値が 5GiB 未満になるとアラートが発生し、永続ボリューム要求 (Persistent Volume Claim、PVC) ごとに容量不足のディスクに関する情報が提供されます。
6.1.4.4. Alertmanager のデプロイメント
Alertmanager をデプロイするには、設定ファイルのサンプル を適用します。
AMQ Streams に含まれる設定サンプルでは、Slack チャネルに通知を送信するように Alertmanager を設定します。
デプロイメントで以下のリソースが定義されます。
-
Alertmanager
。Alertmanager Pod を管理します。 -
Secret
。Alertmanager の設定を管理します。 -
Service
。参照しやすいホスト名を提供し、他のサービスが Alertmanager に接続できるようにします (Prometheus など)。
手順
Alertmanager 設定ファイル (
alert-manager-config.yaml
) からSecret
リソースを作成します。oc create secret generic alertmanager-alertmanager --from-file=alertmanager.yaml=alert-manager-config.yaml
以下を行って、
alert-manager-config.yaml
ファイルを更新します。-
slack_api_url
プロパティーを、Slack ワークスペースのアプリケーションに関連する Slack API URL の実際の値に置き換えます。 -
channel
プロパティーを、通知が送信される実際の Slack チャネルに置き換えます。
-
Alertmanager をデプロイします。
oc apply -f alert-manager.yaml
6.1.5. Grafana の設定
Grafana では、Prometheus メトリクスを視覚化できます。
AMQ Streams で提供される Grafana ダッシュボードサンプルをデプロイして有効化できます。
6.1.5.1. Grafana の設定
AMQ Streams には、Grafana のダッシュボード設定ファイルのサンプル が含まれています。
Grafana Docker イメージがデプロイメント用に提供されます。
-
grafana.yaml
ダッシュボードのサンプルも JSON ファイルで提供されます。
-
strimzi-kafka.json
-
strimzi-kafka-connect.json
-
strimzi-zookeeper.json
-
strimzi-kafka-mirror-maker-2.json
-
strimzi-kafka-exporter.json
-
strimzi-operators.json
ダッシュボードのサンプルは、主なメトリクスの監視を開始するための雛形として使用できますが、使用できるすべてのメトリックスを対象としていません。使用するインフラストラクチャーに応じて、ダッシュボードのサンプルの編集や、他のメトリクスの追加が必要な場合もあります。
Grafana でダッシュボードを表示するには、設定ファイルを使用して以下を行います。
6.1.5.2. Grafana のデプロイメント
Grafana をデプロイして Prometheus メトリクスを視覚化するには、設定ファイルのサンプル を適用します。
前提条件
手順
Grafana をデプロイします。
oc apply -f grafana.yaml
- Grafana ダッシュボードを有効にします。
6.1.5.3. Grafana ダッシュボードサンプルの有効化
Prometheus データソースおよびダッシュボードのサンプルを設定し、監視用に Grafana を有効にします。
アラート通知ルールは定義されていません。
ダッシュボードにアクセスする場合、port-forward
コマンドを使用して Grafana Pod からホストにトラフィックを転送できます。
たとえば、以下のように Grafana ユーザーインターフェイスにアクセスできます。
-
oc port-forward svc/grafana 3000:3000
を実行します。 -
ブラウザーで
http://localhost:3000
を指定します。
Grafana Pod の名前はユーザーごとに異なります。
手順
admin/admin
クレデンシャルを使用して、Grafana ユーザーインターフェイスにアクセスします。最初のビューで、パスワードのリセットを選択します。
Add data source ボタンをクリックします。
Prometheus をデータソースとして追加します。
- 名前を指定します。
- Prometheus をタイプとして追加します。
- URL フィールドに、Prometheus サーバーへ接続する URL (http://prometheus-operated:9090) を指定します。
Add をクリックしてデータソースへの接続をテストします。
Dashboards、Import の順にクリックして、Import Dashboard ウィンドウを開き、ダッシュボードのサンプルをインポートします (または JSON を貼り付けます)。
ダッシュボードをインポートしたら、Grafana ダッシュボードのホームページに Kafka および ZooKeeper ダッシュボードが表示されます。
Prometheus サーバーが AMQ Streams クラスターのメトリクスを収集すると、それがダッシュボードに反映されます。
図6.1 Kafka ダッシュボード
図6.2 ZooKeeper ダッシュボード
6.2. Kafka Exporter の追加
Kafka Exporter は、Apache Kafka ブローカーおよびクライアントの監視を強化するオープンソースプロジェクトです。Kafka Exporter は、Kafka クラスターとのデプロイメントを実現するために AMQ Streams で提供され、オフセット、コンシューマーグループ、コンシューマーラグ、およびトピックに関連する Kafka ブローカーから追加のメトリクスデータを抽出します。
一例として、メトリクスデータを使用すると、低速なコンシューマーの識別に役立ちます。
ラグデータは Prometheus メトリクスとして公開され、解析のために Grafana で使用できます。
ビルトイン Kafka メトリクスの監視のために Prometheus および Grafana をすでに使用している場合、Kafka Exporter Prometheus エンドポイントをスクレープするように Prometheus を設定することもできます。
6.2.1. コンシューマーラグの監視
コンシューマーラグは、メッセージの生成と消費の差を示しています。具体的には、指定のコンシューマーグループのコンシューマーラグは、パーティションの最後のメッセージと、そのコンシューマーが現在ピックアップしているメッセージとの時間差を示しています。
ラグには、パーティションログの最後を基準とする、コンシューマーオフセットの相対的な位置が反映されます。
プロデューサーおよびコンシューマーオフセット間のコンシューマーラグ
この差は、Kafka ブローカートピックパーティションの読み取りと書き込みの場所である、プロデューサーオフセットとコンシューマーオフセットの間の デルタ とも呼ばれます。
あるトピックで毎秒 100 個のメッセージがストリーミングされる場合を考えてみましょう。プロデューサーオフセット (トピックパーティションの先頭) と、コンシューマーが読み取った最後のオフセットとの間のラグが 1000 個のメッセージであれば、10 秒の遅延があることを意味します。
コンシューマーラグ監視の重要性
可能な限りリアルタイムのデータの処理に依存するアプリケーションでは、コンシューマーラグを監視して、ラグが過度に大きくならないようにチェックする必要があります。ラグが大きくなるほど、リアルタイム処理の達成から遠ざかります。
たとえば、パージされていない古いデータの大量消費や、予定外のシャットダウンが、コンシューマーラグの原因となることがあります。
コンシューマーラグの削減
通常、ラグを削減するには以下を行います。
- 新規コンシューマーを追加してコンシューマーグループをスケールアップします。
- メッセージがトピックに留まる保持時間を延長します。
- ディスク容量を追加してメッセージバッファーを増強します。
コンシューマーラグを減らす方法は、基礎となるインフラストラクチャーや、AMQ Streams によりサポートされるユースケースによって異なります。たとえば、ラグが生じているコンシューマーの場合、ディスクキャッシュからフェッチリクエストに対応できるブローカーを活用できる可能性は低いでしょう。場合によっては、コンシューマーの状態が改善されるまで、自動的にメッセージをドロップすることが許容されることがあります。
6.2.2. Kafka Exporter アラートルールの例
メトリクスをデプロイメントに導入するステップが実行済みである場合、Kafka Exporter をサポートするアラート通知ルールを使用するよう Kafka クラスターがすでに設定された状態になっています。
Kafka Exporter のルールは prometheus-rules.yaml
に定義されており、Prometheus でデプロイされます。詳細は、Prometheus を参照してください。
Kafka Exporter に固有のサンプルのアラート通知ルールには以下があります。
UnderReplicatedPartition
- トピックで複製の数が最低数未満であり、ブローカーがパーティションで十分な複製を作成していないことを警告するアラートです。デフォルトの設定では、トピックに複製の数が最低数未満のパーティションが 1 つ以上ある場合のアラートになります。このアラートは、Kafka インスタンスがダウンしているか Kafka クラスターがオーバーロードの状態であることを示す場合があります。レプリケーションプロセスを再起動するには、Kafka ブローカーの計画的な再起動が必要な場合があります。
TooLargeConsumerGroupLag
- 特定のトピックパーティションでコンシューマーグループのラグが大きすぎることを警告するアラートです。デフォルト設定は 1000 レコードです。ラグが大きい場合、コンシューマーが遅すぎてプロデューサーの処理に追い付いてない可能性があります。
NoMessageForTooLong
- トピックが一定期間にわたりメッセージを受信していないことを警告するアラートです。この期間のデフォルト設定は 10 分です。この遅れは、設定の問題により、プロデューサーがトピックにメッセージを公開できないことが原因である可能性があります。
これらのルールのデフォルト設定は、特定のニーズに合わせて調整してください。
6.2.3. Kafka Exporter メトリクスの公開
ラグ情報は、Grafana で示す Prometheus メトリクスとして Kafka Exporter によって公開されます。
Kafka Exporter は、ブローカー、トピック、およびコンシューマーグループのメトリクスデータを公開します。
抽出されるデータを以下に示します。
表6.1 ブローカーメトリクスの出力
名前 | 詳細 |
---|---|
| Kafka クラスターに含まれるブローカーの数 |
表6.2 トピックメトリクスの出力
名前 | 詳細 |
---|---|
| トピックのパーティション数 |
| ブローカーの現在のトピックパーティションオフセット |
| ブローカーの最も古いトピックパーティションオフセット |
| トピックパーティションの In-Sync レプリカ数 |
| トピックパーティションのリーダーブローカー ID |
|
トピックパーティションが優先ブローカーを使用している場合は、 |
| このトピックパーティションのレプリカ数 |
|
トピックパーティションの複製の数が最低数未満である場合に |
表6.3 コンシューマーグループメトリクスの出力
名前 | 詳細 |
---|---|
| コンシューマーグループの現在のトピックパーティションオフセット |
| トピックパーティションのコンシューマーグループの現在のラグ (概算値) |
6.2.4. Kafka Exporter の設定
この手順では、KafkaExporter
プロパティーから Kafka
リソースの Kafka Exporter を設定する方法を説明します。
Kafka
リソースの設定に関する詳細は Kafka YAML の設定例 を参照してください。
この手順では、Kafka Exporter 設定に関連するプロパティーを取り上げます。
これらのプロパティーは、Kafka クラスターのデプロイメントまたは再デプロイメントの一部として設定できます。
前提条件
- OpenShift クラスター
- 稼働中の Cluster Operator
手順
Kafka
リソースのKafkaExporter
プロパティーを編集します。設定可能なプロパティーは以下の例のとおりです。
apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: # ... kafkaExporter: image: my-org/my-image:latest 1 groupRegex: ".*" 2 topicRegex: ".*" 3 resources: 4 requests: cpu: 200m memory: 64Mi limits: cpu: 500m memory: 128Mi logging: debug 5 enableSaramaLogging: true 6 template: 7 pod: metadata: labels: label1: value1 imagePullSecrets: - name: my-docker-credentials securityContext: runAsUser: 1000001 fsGroup: 0 terminationGracePeriodSeconds: 120 readinessProbe: 8 initialDelaySeconds: 15 timeoutSeconds: 5 livenessProbe: 9 initialDelaySeconds: 15 timeoutSeconds: 5 # ...
リソースを作成または更新します。
oc apply -f kafka.yaml
次のステップ
Kafka Exporter の設定およびデプロイ後に、Grafana を有効にして Kafka Exporter ダッシュボードを表示 できます。
6.2.5. Kafka Exporter Grafana ダッシュボードの有効化
Kafka Exporter を Kafka クラスターでデプロイした場合、Grafana により公開されるメトリクスデータを表示するように Grafana を有効化できます。
Kafka Exporter ダッシュボードは、JSON ファイルとして提供され、examples/metrics
ディレクトリーに含まれています。
-
strimzi-kafka-exporter.json
前提条件
- Kafka クラスターが Kafka Exporter メトリクスの設定 によってデプロイされている必要があります。
- Prometheus および Prometheus Alertmanager が Kafka クラスターにデプロイされている必要があります。
- Grafana が Kafka クラスターにデプロイされている必要があります。
この手順では、Grafana ユーザーインターフェイスにアクセスでき、Prometheus がデータソースとして追加されていることを前提とします。ユーザーインターフェイスに初めてアクセスする場合は、Grafana を参照してください。
手順
- Grafana ユーザーインターフェイスにアクセスします。
Dashboards、Import の順にクリックして Import Dashboard ウィンドウを開き、Kafka Exporter ダッシュボードのサンプルをインポートします (または JSON を貼り付けます)。
メトリクスデータが収集されると、Kafka Exporter のチャートにデータが反映されます。
Kafka Exporter Grafana チャート
メトリクスから、チャートを作成して以下を表示できます。
- 毎秒のメッセージ (トピックから)
- 毎分のメッセージ (トピックから)
- コンシューマーグループごとのラグ
- 毎分のメッセージ消費 (コンシューマーグループごと)
Grafana のチャートを使用して、ラグを分析し、ラグ削減の方法が対象のコンシューマーグループに影響しているかどうかを確認します。たとえば、ラグを減らすように Kafka ブローカーを調整すると、ダッシュボードには コンシューマーグループごとのラグ のチャートが下降し 毎分のメッセージ消費 のチャートが上昇する状況が示されます。
第7章 AMQ Streams のアップグレード
AMQ Streams は、クラスターのダウンタイムを発生せずにアップグレードできます。各バージョンの AMQ Streams は、単一または複数バージョンの Apache Kafka をサポートします。ご使用のバージョンの AMQ Streams によってサポートされれば、上位バージョンの Kafka にアップグレードできます。サポートされる下位バージョンの Kafka にダウングレードできる場合もあります。
より新しいバージョンの AMQ Streams はより新しいバージョンの Kafka をサポートしますが、AMQ Streams をアップグレードしてから、サポートされる上位バージョンの Kafka にアップグレードする必要があります。
該当する場合、AMQ Streams と Kafka をアップグレードしてから リソースのアップグレード を実行する必要があります。
7.1. AMQ Streams および Kafka のアップグレード
AMQ Streams のアップグレードは 2 段階のプロセスで行います。ダウンタイムなしでブローカーとクライアントをアップグレードするには、以下の順序でアップグレード手順を 必ず 完了してください。
Cluster Operator を最新の AMQ Streams バージョンに更新します。
すべての Kafka ブローカーとクライアントアプリケーションを、最新の Kafka バージョンにアップグレードします。
7.1.1. Kafka バージョン
Kafka のログメッセージ形式バージョンおよびブローカー間のプロトコルバージョンは、メッセージに追加されるログ形式バージョンとクラスターで使用されるプロトコルのバージョンを指定します。そのためアップグレードプロセスでは、既存の Kafka ブローカーの設定変更およびクライアントアプリケーション (コンシューマーおよびプロデューサー) のコード変更により、必ず正しいバージョンを使用されるようにする必要になります。
以下の表は、Kafka バージョンの違いを示しています。
Kafka バージョン | Interbroker プロトコルバージョン | ログメッセージ形式バージョン | ZooKeeper バージョン |
---|---|---|---|
2.4.0 | 2.4 | 2.4 | 3.5.7 |
2.5.0 | 2.5 | 2.5 | 3.5.8 |
メッセージ形式バージョン
プロデューサーが Kafka ブローカーにメッセージを送信すると、特定の形式を使用してメッセージがエンコードされます。この形式は Kafka のリリースによって変わるため、メッセージにはエンコードに使用された形式のバージョンが含まれます。ブローカーがメッセージをログに追加する前に、メッセージを新しい形式バージョンから特定の旧形式バージョンに変換するように、Kafka ブローカーを設定できます。
Kafka には、メッセージ形式のバージョンを設定する 2 通りの方法があります。
-
message.format.version
プロパティーをトピックに設定します。 -
log.message.format.version
プロパティーを Kafka ブローカーに設定します。
トピックの message.format.version
のデフォルト値は、Kafka ブローカーに設定される log.message.format.version
によって定義されます。トピックの message.format.version
は、トピック設定を編集すると手動で設定できます。
本セクションのアップグレード作業では、メッセージ形式のバージョンが log.message.format.version
によって定義されることを前提としています。
7.1.2. Cluster Operator のアップグレード
このセクションでは、AMQ Streams 1.5 を使用するように Cluster Operator デプロイメントをアップグレードする手順について説明します。
Cluster Operator によって管理される Kafka クラスターの可用性は、アップグレード操作による影響を受けません。
特定バージョンの AMQ Streams へのアップグレード方法については、そのバージョンをサポートするドキュメントを参照してください。
7.1.2.1. Cluster Operator の後続バージョンへのアップグレード
この手順では、Cluster Operator デプロイメントを後続バージョンにアップグレードする方法を説明します。
前提条件
- 既存の Cluster Operator デプロイメントを利用できる必要があります。
- すでに 新規バージョンのインストールファイルをダウンロード してある必要があります。
手順
-
既存の Cluster Operator リソース (
/install/cluster-operator
ディレクトリー内) に追加した設定変更を書留めておきます。すべての変更は、新しいバージョンの Cluster Operator によって上書きされます。 Cluster Operator を更新します。
Cluster Operator を実行している namespace に従い、新しいバージョンのインストールファイルを編集します。
Linux の場合は、以下を使用します。
sed -i 's/namespace: .*/namespace: my-cluster-operator-namespace/' install/cluster-operator/*RoleBinding*.yaml
MacOS の場合は、以下を使用します。
sed -i '' 's/namespace: .*/namespace: my-cluster-operator-namespace/' install/cluster-operator/*RoleBinding*.yaml
-
既存の Cluster Operator
Deployment
で 1 つ以上の環境変数を編集した場合、install/cluster-operator/050-Deployment-cluster-operator.yaml
ファイルを編集し、これらの環境変数を使用します。
設定を更新したら、残りのインストールリソースとともにデプロイします。
oc apply -f install/cluster-operator
ローリング更新が完了するのを待ちます。
Kafka Pod のイメージを取得して、アップグレードが正常に完了したことを確認します。
oc get po my-cluster-kafka-0 -o jsonpath='{.spec.containers[0].image}'
イメージタグには、新しい AMQ Streams バージョンと Kafka バージョンが順に示されます。例:
<New AMQ Streams version>-kafka-<Current Kafka version>
既存のリソースを更新して、非推奨になったカスタムリソースプロパティーを処理します。
これで Cluster Operator が更新されましたが、その管理下にあるクラスターで実行している Kafka のバージョンは変わりません。
次のステップ
Cluster Operator のアップグレードの次に、Kafka アップグレード を実行できます。
7.1.3. Kafka のアップグレード
Cluster Operator をアップグレードしたら、サポートされる上位の Kafka バージョンにブローカーをアップグレードできます。
Kafka のアップグレードは、Cluster Operator を使用して行います。Cluster Operator によるアップグレードの実行方法は、以下のバージョン間の違いによって異なります。
- Interbroker プロトコル
- ログメッセージの形式
- ZooKeeper
現在の Kafka バージョンとアップグレードする Kafka バージョンが同じ場合 (パッチレベルでのアップグレードではよくあります)、Kafka ブローカーのローリング更新を 1 回実行して Cluster Operator によるアップグレードを行います。
これらのバージョンが 1 つ以上異なる場合、Kafka ブローカーのローリング更新を 2、3 回実行して Cluster Operator でアップグレードを実行する必要があります。
7.1.3.1. Kafka バージョンおよびイメージマッピング
Kafka のアップグレード時に、STRIMZI_KAFKA_IMAGES
および Kafka.spec.kafka.version
プロパティーの設定について考慮してください。
-
それぞれの
Kafka
リソースはKafka.spec.kafka.version
で設定できます。 Cluster Operator の
STRIMZI_KAFKA_IMAGES
環境変数により、Kafka のバージョンと、指定のKafka
リソースでそのバージョンが要求されるときに使用されるイメージをマッピングできます。-
Kafka.spec.kafka.image
を設定しないと、そのバージョンのデフォルトのイメージが使用されます。 -
Kafka.spec.kafka.image
を設定すると、デフォルトのイメージがオーバーライドされます。
-
Cluster Operator は、Kafka ブローカーの想定されるバージョンが実際にイメージに含まれているかどうかを検証できません。所定のイメージが所定の Kafka バージョンに対応することを必ず確認してください。
7.1.3.2. クライアントをアップグレードするストラテジー
クライアントアプリケーション (Kafka Connect コネクターを含む) をアップグレードする最善の方法は、特定の状況によって異なります。
消費するアプリケーションは、そのアプリケーションが理解するメッセージ形式のメッセージを受信する必要があります。その状態であることを、以下のいずれかの方法で確認できます。
- プロデューサーをアップグレードする 前に、トピックのすべてのコンシューマーをアップグレードする。
- ブローカーでメッセージをダウンコンバートする。
ブローカーのダウンコンバートを使用すると、ブローカーに余分な負荷が加わるので、すべてのトピックで長期にわたりダウンコンバートに頼るのは最適な方法ではありません。ブローカーの実行を最適化するには、ブローカーがメッセージを一切ダウンコンバートしないようにしてください。
ブローカーのダウンコンバートは 2 通りの方法で設定できます。
-
トピックレベルの
message.format.version
。単一のトピックが設定されます。 -
ブローカーレベルの
log.message.format.version
。トップレベルのmessage.format.version
が設定されていないトピックのデフォルトです。
新バージョンの形式でトピックにパブリッシュされるメッセージは、コンシューマーによって認識されます。これは、メッセージがコンシューマーに送信されるときでなく、ブローカーがプロデューサーからメッセージを受信するときに、ブローカーがダウンコンバートを実行するからです。
クライアントのアップグレードに使用できるストラテジーは複数あります。
- コンシューマーを最初にアップグレード
- コンシューマーとして機能するアプリケーションをすべてアップグレードします。
-
ブローカーレベル
log.message.format.version
を新バージョンに変更します。 プロデューサーとして機能するアプリケーションをアップグレードします。
このストラテジーは分かりやすく、ブローカーのダウンコンバートの発生をすべて防ぎます。ただし、所属組織内のすべてのコンシューマーを整然とアップグレードできることが前提になります。また、コンシューマーとプロデューサーの両方に該当するアプリケーションには通用しません。さらにリスクとして、アップグレード済みのクライアントに問題がある場合は、新しい形式のメッセージがメッセージログに追加され、以前のコンシューマーバージョンに戻せなくなる場合があります。
- トピック単位でコンシューマーを最初にアップグレード
トピックごとに以下を実行します。
- コンシューマーとして機能するアプリケーションをすべてアップグレードします。
-
トピックレベルの
message.format.version
を新バージョンに変更します。 プロデューサーとして機能するアプリケーションをアップグレードします。
このストラテジーではブローカーのダウンコンバートがすべて回避され、トピックごとにアップグレードできます。この方法は、同じトピックのコンシューマーとプロデューサーの両方に該当するアプリケーションには通用しません。ここでもリスクとして、アップグレード済みのクライアントに問題がある場合は、新しい形式のメッセージがメッセージログに追加される可能性があります。
- トピック単位でコンシューマーを最初にアップグレード、ダウンコンバートあり
トピックごとに以下を実行します。
-
トピックレベルの
message.format.version
を、旧バージョンに変更します (または、デフォルトがブローカーレベルのlog.message.format.version
のトピックを利用します)。 - コンシューマーおよびプロデューサーとして機能するアプリケーションをすべてアップグレードします。
- アップグレードしたアプリケーションが正しく機能することを確認します。
トピックレベルの
message.format.version
を新バージョンに変更します。このストラテジーにはブローカーのダウンコンバートが必要ですが、ダウンコンバートは一度に 1 つのトピック (またはトピックの小さなグループ) のみに必要になるので、ブローカーへの負荷は最小限に抑えられます。この方法は、同じトピックのコンシューマーとプロデューサーの両方に該当するアプリケーションにも通用します。この方法により、新しいメッセージ形式バージョンを使用する前に、アップグレードされたプロデューサーとコンシューマーが正しく機能することが保証されます。
この方法の主な欠点は、多くのトピックやアプリケーションが含まれるクラスターでの管理が複雑になる場合があることです。
-
トピックレベルの
クライアントアプリケーションをアップグレードするストラテジーは他にもあります。
複数のストラテジーを適用することもできます。たとえば、最初のいくつかのアプリケーションとトピックに、トピック単位でコンシューマーを最初にアップグレード、ダウンコンバートありのストラテジーを適用します。これが問題なく適用されたら、より効率的な別のストラテジーの使用を検討できます。
7.1.3.3. Kafka ブローカーおよびクライアントアプリケーションのアップグレード
この手順では、AMQ Streams Kafka クラスターを Kafka の上位バージョンにアップグレードする方法を説明します。
前提条件
Kafka
リソースをアップグレードするには、以下を確認します。
- 両バージョンの Kafka をサポートする Cluster Operator が稼働している。
-
Kafka.spec.kafka.config
に、アップグレード先となる Kafka バージョンでサポートされないオプションが含まれていない。 現行の Kafka バージョンの
log.message.format.version
を新しいバージョンに更新する必要があるかどうか。Kafka バージョン表 を参照してください。
手順
必要に応じてエディターで Kafka クラスター設定を更新します。
oc edit kafka my-cluster
現行のバージョンの Kafka の
log.message.format.version
が新しい Kafka バージョンでも同じ場合は、次の手順に進みます。それ以外の場合、
Kafka.spec.kafka.config
のlog.message.format.version
が 現行 バージョンのデフォルトに設定されていることを確認してください。たとえば、Kafka 2.4.1 からのアップグレードでは以下のようになります。
kind: Kafka spec: # ... kafka: version: 2.4.1 config: log.message.format.version: "2.4" # ...
log.message.format.version
が設定されていない場合は、現行バージョンに設定します。注記log.message.format.version
の値は、浮動小数点数として解釈されないように文字列である必要があります。Kafka.spec.kafka.version
を変更し、新バージョンを指定します (log.message.format.version
は現行バージョンのままにします)。たとえば、Kafka 2.4.1 から 2.5.0 へのアップグレードは以下のようになります。
apiVersion: v1alpha1 kind: Kafka spec: # ... kafka: version: 2.5.0 1 config: log.message.format.version: "2.4" 2 # ...
Kafka バージョンのイメージが Cluster Operator の
STRIMZI_KAFKA_IMAGES
に定義されているイメージとは異なる場合は、Kafka.spec.kafka.image
を更新します。
エディターを保存して終了し、ローリング更新の完了を待ちます。
注記新バージョンの Kafka に新しい ZooKeeper バージョンがある場合、追加のローリング更新が発生します。
更新をログで確認するか、または Pod 状態の遷移を監視して確認します。
oc logs -f <cluster-operator-pod-name> | grep -E "Kafka version upgrade from [0-9.]+ to [0-9.]+, phase ([0-9]+) of \1 completed"
oc get po -w
現行バージョンと新バージョンの Kafka で、Interbroker プロトコルのバージョンが異なる場合は、Cluster Operator ログで
INFO
レベルのメッセージを確認します。Reconciliation #<num>(watch) Kafka(<namespace>/<name>): Kafka version upgrade from <from-version> to <to-version>, phase 2 of 2 completed
または、現行バージョンと新バージョンの Kafka で、Interbroker プロトコルのバージョンが同じ場合は、以下を確認します。
Reconciliation #<num>(watch) Kafka(<namespace>/<name>): Kafka version upgrade from <from-version> to <to-version>, phase 1 of 1 completed
ローリング更新では以下を行います。
- 各 Pod が新バージョンの Kafka のブローカーバイナリーを使用していることを確認します。
新バージョンの Kafka の Interbroker プロトコルを使用してメッセージを送信するように、ブローカーを設定します。
注記クライアントは引き続き旧バージョンを使用するため、ブローカーはメッセージを旧バージョンに変換してからクライアントに送信します。この余分な負荷を最小化するには、できるだけ速やかにクライアントを更新します。
クライアントのアップグレードに選択したストラテジーに応じて、新バージョンのクライアントバイナリーを使用するようにすべてのクライアントアプリケーションをアップグレードします。
警告この手順を完了すると、ダウングレードできません。この時点で更新を元に戻す必要がある場合は、「Kafka ブローカーおよびクライアントアプリケーションのダウングレード」 の手順に従います。
必要に応じて、Kafka Connect および MirrorMaker のバージョンプロパティーを新バージョンの Kafka として設定します。
-
Kafka Connect では、
KafkaConnect.spec.version
を更新します。 -
MirrorMaker では、
KafkaMirrorMaker.spec.version
を更新します。
-
Kafka Connect では、
1. で特定された
log.message.format.version
が、新しいバージョンと同じ場合は、次の手順に進みます。それ以外の場合は、
Kafka.spec.kafka.config
のlog.message.format.version
を、現在使用している新バージョンの Kafka のデフォルトバージョンに変更します。たとえば、2.5.0 へのアップグレードでは以下のようになります。
apiVersion: v1alpha1 kind: Kafka spec: # ... kafka: version: 2.5.0 config: log.message.format.version: "2.5" # ...
Cluster Operator によってクラスターが更新されるまで待ちます。
これで、Kafka クラスターおよびクライアントが新バージョンの Kafka を使用するようになります。
関連情報
- AMQ Streams Kafka クラスターをあるバージョンから下位バージョンにダウングレードする手順は、「Kafka ブローカーおよびクライアントアプリケーションのダウングレード」 を参照してください。
7.1.3.4. コンシューマーおよび Kafka Streams アプリケーションの Cooperative Rebalancing へのアップグレード
Kafka コンシューマーおよび Kafka Streams アプリケーションをアップグレードすることで、パーティションの再分散にデフォルトの Eager Rebalance プロトコルではなく Incremental Cooperative Rebalance プロトコルを使用できます。この新しいプロトコルが Kafka 2.4.0 に追加されました。
コンシューマーは、パーティションの割り当てを Cooperative Rebalance で保持し、クラスターの分散が必要な場合にプロセスの最後でのみ割り当てを取り消します。これにより、コンシューマーグループまたは Kafka Streams アプリケーションが使用不可能になる状態が削減されます。
Incremental Cooperative Rebalance プロトコルへのアップグレードは任意です。Eager Rebalance プロトコルは引き続きサポートされます。
前提条件
- Kafka 2.5.0 に Kafka ブローカーおよびクライアントアプリケーションをアップグレード済み であることが必要です。
手順
Incremental Cooperative Rebalance プロトコルを使用するように Kafka コンシューマーをアップグレードする方法:
-
Kafka クライアント
.jar
ファイルを新バージョンに置き換える。 -
コンシューマー設定で、
partition.assignment.strategy
にcooperative-sticky
を追加する。たとえば、range
ストラテジーが設定されている場合は、設定をrange, cooperative-sticky
に変更する。 - グループ内の各コンシューマーを順次再起動し、再起動後に各コンシューマーがグループに再度参加するまで待つ。
-
コンシューマー設定から前述の
partition.assignment.strategy
を削除して、グループの各コンシューマーを再設定し、cooperative-sticky
ストラテジーのみを残す。 - グループ内の各コンシューマーを順次再起動し、再起動後に各コンシューマーがグループに再度参加するまで待つ。
Incremental Cooperative Rebalance プロトコルを使用するように Kafka Streams アプリケーションをアップグレードするには以下を行います。
-
Kafka Streams の
.jar
ファイルを新バージョンに置き換えます。 -
Kafka Streams の設定で、
upgrade.from
設定パラメーターをアップグレード前の Kafka バージョンに設定します (例: 2.3)。 - 各ストリームプロセッサー (ノード) を順次再起動します。
-
upgrade.from
設定パラメーターを Kafka Streams 設定から削除します。 - グループ内の各コンシューマーを順次再起動します。
関連情報
- Apache Kafka ドキュメントの Notable changes in 2.4.0。
7.1.4. Kafka のダウングレード
Kafka バージョンのダウングレードは、Cluster Operator を使用して行います。
Cluster Operator によるダウングレードの実行方法は、以下のバージョン間の違いによって異なります。
- Interbroker プロトコル
- ログメッセージの形式
- ZooKeeper
7.1.4.1. ダウングレード先のバージョン
Cluster Operator によるダウングレードの操作方法は、log.message.format.version
に応じて異なります。
-
ダウングレード先の Kafka バージョンの
log.message.format.version
が現行バージョンと同じ場合、Cluster Operator はブローカーのローリング再起動を 1 回実行してダウングレードを行います。 ダウングレード先の Kafka バージョンの
log.message.format.version
が異なる場合、ダウングレード後の Kafka バージョンが使用するバージョンに設定されたlog.message.format.version
が 常に 実行中のクラスターに存在する場合に限り、ダウングレードが可能です。通常は、アップグレードの手順が
log.message.format.version
の変更前に中止された場合にのみ該当します。その場合、ダウングレードには以下が必要です。- 2 つのバージョンで Interbroker プロトコルが異なる場合、ブローカーのローリング再起動が 2 回必要です。
- 両バージョンで同じ場合は、ローリング再起動が 1 回必要です。
7.1.4.2. Kafka ブローカーおよびクライアントアプリケーションのダウングレード
この手順では、AMQ Streams Kafka クラスターを Kafka の下位 (以前の) バージョンにダウングレードする方法 (2.5.0 から 2.4.1 へのダウングレードなど) を説明します。
以前のバージョンでサポートされない log.message.format.version
が新バージョンで使われていた場合 (log.message.format.version
のデフォルト値が使われていた場合など)、ダウングレードは実行 できません。たとえば以下のリソースの場合、log.message.format.version
が変更されていないので、Kafka バージョン 2.4.1 にダウングレードできます。
apiVersion: v1alpha1 kind: Kafka spec: # ... kafka: version: 2.5.0 config: log.message.format.version: "2.4" # ...
log.message.format.version
が "2.5"
に設定されているかまたは値がない (このためパラメーターに 2.5.0 ブローカーのデフォルト値 2.5 が採用される) 場合は、ダウングレードは実施できません。
前提条件
Kafka
リソースをダウングレードするには、以下を確認します。
- 両バージョンの Kafka をサポートする Cluster Operator が稼働している。
-
Kafka.spec.kafka.config
に、ダウングレード先となる Kafka バージョンでサポートされないオプションが含まれていない。 -
Kafka.spec.kafka.config
に、ダウングレード先のバージョンでサポートされるlog.message.format.version
がある。
手順
必要に応じてエディターで Kafka クラスター設定を更新します。
oc edit
を使用します。oc edit kafka my-cluster
Kafka.spec.kafka.version
を変更して、以前のバージョンを指定します。たとえば、Kafka 2.5.0 から 2.4.1 へのダウングレードは以下のようになります。
apiVersion: v1alpha1 kind: Kafka spec: # ... kafka: version: 2.4.1 1 config: log.message.format.version: "2.4" 2 # ...
注記log.message.format.version
の値は、浮動小数点数として解釈されないように、文字列とする必要があります。Kafka バージョンのイメージが Cluster Operator の
STRIMZI_KAFKA_IMAGES
に定義されているイメージとは異なる場合は、Kafka.spec.kafka.image
を更新します。
エディターを保存して終了し、ローリング更新の完了を待ちます。
更新をログで確認するか、または Pod 状態の遷移を監視して確認します。
oc logs -f <cluster-operator-pod-name> | grep -E "Kafka version downgrade from [0-9.]+ to [0-9.]+, phase ([0-9]+) of \1 completed"
oc get po -w
Kafka の前バージョンと現行バージョンで Interbroker プロトコルのバージョンが異なる場合、Cluster Operator ログで
INFO
レベルのメッセージを確認します。Reconciliation #<num>(watch) Kafka(<namespace>/<name>): Kafka version downgrade from <from-version> to <to-version>, phase 2 of 2 completed
または、Kafka の前バージョンと現行バージョンで Interbroker プロトコルのバージョンが同じ場合は、以下を確認します。
Reconciliation #<num>(watch) Kafka(<namespace>/<name>): Kafka version downgrade from <from-version> to <to-version>, phase 1 of 1 completed
すべてのクライアントアプリケーション (コンシューマー) をダウングレードして、以前のバージョンのクライアントバイナリーを使用します。
これで、Kafka クラスターおよびクライアントは以前の Kafka バージョンを使用するようになります。
7.2. AMQ Streams リソースのアップグレード
API バージョン kafka.strimzi.io/v1alpha1
は非推奨になりました。API バージョン kafka.strimzi.io/v1alpha1
を使用するリソースを、kafka.strimzi.io/v1beta1
を使用するように更新する必要があります。
本セクションでは、リソースのアップグレード手順を説明します。
リソースのアップグレードは、Cluster Operator をアップグレード してから実施する 必要 があります。これにより、Cluster Operator がリソースを認識できるようになります。
リソースのアップグレードが実施されない場合
アップグレードが実施されない場合、apiVersion
を更新するまでリソースを更新できないことを示す警告が、調整に関するログに記録されます。
更新をトリガーするには、カスタムリソースにアノテーション追加などの表面的な変更を加えます。
アノテーションの例:
metadata: # ... annotations: upgrade: "Upgraded to kafka.strimzi.io/v1beta1"
7.2.1. Kafka リソースのアップグレード
前提条件
-
v1beta1
API バージョンをサポートする Cluster Operator が稼働している必要があります。
手順
デプロイメントの Kafka
リソースごとに以下の手順を実行します。
エディターで
Kafka
リソースを更新します。oc edit kafka my-cluster
以下を置き換えます。
apiVersion: kafka.strimzi.io/v1alpha1
上記の行を、以下のように置き換えます。
apiVersion:kafka.strimzi.io/v1beta1
Kafka
リソースに以下があるか確認します。Kafka.spec.topicOperator
ある場合は以下に変更します。
Kafka.spec.entityOperator.topicOperator
たとえば、以下がある場合を考えてみましょう。
spec: # ... topicOperator: {}
この行を以下の行に変更します。
spec: # ... entityOperator: topicOperator: {}
以下があるか確認します。
Kafka.spec.entityOperator.affinity
Kafka.spec.entityOperator.tolerations
これを以下に変更します。
Kafka.spec.entityOperator.template.pod.affinity
Kafka.spec.entityOperator.template.pod.tolerations
たとえば、以下の場合を考えてみましょう。
spec: # ... entityOperator: affinity {} tolerations {}
これを以下に変更します。
spec: # ... entityOperator: template: pod: affinity {} tolerations {}
以下があるか確認します。
Kafka.spec.kafka.affinity
Kafka.spec.kafka.tolerations
これを以下に変更します。
Kafka.spec.kafka.template.pod.affinity
Kafka.spec.kafka.template.pod.tolerations
たとえば、以下の場合を考えてみましょう。
spec: # ... kafka: affinity {} tolerations {}
これを以下に変更します。
spec: # ... kafka: template: pod: affinity {} tolerations {}
以下があるか確認します。
Kafka.spec.zookeeper.affinity
Kafka.spec.zookeeper.tolerations
これを以下に変更します。
Kafka.spec.zookeeper.template.pod.affinity
Kafka.spec.zookeeper.template.pod.tolerations
たとえば、以下の場合を考えてみましょう。
spec: # ... zookeeper: affinity {} tolerations {}
これを以下に変更します。
spec: # ... zookeeper: template: pod: affinity {} tolerations {}
- ファイルを保存し、エディターを終了して更新したリソースが調整されるのを待ちます。
7.2.2. Kafka Connect リソースのアップグレード
前提条件
-
v1beta1
API バージョンをサポートする Cluster Operator が稼働している必要があります。
手順
デプロイメントの KafkaConnect
リソースごとに以下の手順を実行します。
エディターで
KafkaConnect
リソースを更新します。oc edit kafkaconnect my-connect
以下を置き換えます。
apiVersion: kafka.strimzi.io/v1alpha1
この行を以下の行に変更します。
apiVersion:kafka.strimzi.io/v1beta1
以下があるか確認します。
KafkaConnect.spec.affinity
KafkaConnect.spec.tolerations
これを以下に変更します。
KafkaConnect.spec.template.pod.affinity
KafkaConnect.spec.template.pod.tolerations
たとえば、以下の場合を考えてみましょう。
spec: # ... affinity {} tolerations {}
これを以下に変更します。
spec: # ... template: pod: affinity {} tolerations {}
- ファイルを保存し、エディターを終了して更新したリソースが調整されるのを待ちます。
7.2.3. Kafka Connect S2I リソースのアップグレード
前提条件
-
v1beta1
API バージョンをサポートする Cluster Operator が稼働している必要があります。
手順
デプロイメントの KafkaConnectS2I
リソースごとに以下の手順を実行します。
エディターで
KafkaConnectS2I
リソースを更新します。oc edit kafkaconnects2i my-connect
以下を置き換えます。
apiVersion: kafka.strimzi.io/v1alpha1
この行を以下の行に変更します。
apiVersion:kafka.strimzi.io/v1beta1
以下があるか確認します。
KafkaConnectS2I.spec.affinity
KafkaConnectS2I.spec.tolerations
これを以下に変更します。
KafkaConnectS2I.spec.template.pod.affinity
KafkaConnectS2I.spec.template.pod.tolerations
たとえば、以下の場合を考えてみましょう。
spec: # ... affinity {} tolerations {}
これを以下に変更します。
spec: # ... template: pod: affinity {} tolerations {}
- ファイルを保存し、エディターを終了して更新したリソースが調整されるのを待ちます。
7.2.4. Kafka MirrorMaker リソースのアップグレード
前提条件
-
v1beta1
API バージョンをサポートする Cluster Operator が稼働している必要があります。
手順
デプロイメントの KafkaMirrorMaker
リソースごとに以下の手順を実行します。
エディターで
KafkaMirrorMaker
リソースを更新します。oc edit kafkamirrormaker my-connect
以下を置き換えます。
apiVersion: kafka.strimzi.io/v1alpha1
この行を以下の行に変更します。
apiVersion:kafka.strimzi.io/v1beta1
以下があるか確認します。
KafkaConnectMirrorMaker.spec.affinity
KafkaConnectMirrorMaker.spec.tolerations
これを以下に変更します。
KafkaConnectMirrorMaker.spec.template.pod.affinity
KafkaConnectMirrorMaker.spec.template.pod.tolerations
たとえば、以下の場合を考えてみましょう。
spec: # ... affinity {} tolerations {}
これを以下に変更します。
spec: # ... template: pod: affinity {} tolerations {}
- ファイルを保存し、エディターを終了して更新したリソースが調整されるのを待ちます。
7.2.5. Kafka Topic リソースのアップグレード
前提条件
-
v1beta1
API バージョンをサポートする Topic Operator が稼働している必要があります。
手順
デプロイメントの KafkaTopic
リソースごとに以下の手順を実行します。
エディターで
KafkaTopic
リソースを更新します。oc edit kafkatopic my-topic
以下を置き換えます。
apiVersion: kafka.strimzi.io/v1alpha1
この行を以下の行に変更します。
apiVersion:kafka.strimzi.io/v1beta1
- ファイルを保存し、エディターを終了して更新したリソースが調整されるのを待ちます。
7.2.6. Kafka User リソースのアップグレード
前提条件
-
v1beta1
API バージョンをサポートする User Operator が稼働している必要があります。
手順
デプロイメントの KafkaUser
リソースごとに以下の手順を実行します。
エディターで
KafkaUser
リソースを更新します。oc edit kafkauser my-user
以下を置き換えます。
apiVersion: kafka.strimzi.io/v1alpha1
この行を以下の行に変更します。
apiVersion:kafka.strimzi.io/v1beta1
- ファイルを保存し、エディターを終了して更新したリソースが調整されるのを待ちます。
付録A サブスクリプションの使用
AMQ Streams は、ソフトウェアサブスクリプションから提供されます。サブスクリプションを管理するには、Red Hat カスタマーポータルでアカウントにアクセスします。
アカウントへのアクセス
- access.redhat.com に移動します。
- アカウントがない場合は、作成します。
- アカウントにログインします。
サブスクリプションのアクティベート
- access.redhat.com に移動します。
- サブスクリプション に移動します。
- Activate a subscription に移動し、16 桁のアクティベーション番号を入力します。
Zip および Tar ファイルのダウンロード
zip または tar ファイルにアクセスするには、カスタマーポータルを使用して、ダウンロードする関連ファイルを検索します。RPM パッケージを使用している場合は、この手順は必要ありません。
- ブラウザーを開き、access.redhat.com/downloads で Red Hat カスタマーポータルの Product Downloads ページにログインします。
- JBOSS INTEGRATION AND AUTOMATION カテゴリーの Red Hat AMQ Streams エントリーを見つけます。
- 必要な AMQ Streams 製品を選択します。Software Downloads ページが開きます。
- コンポーネントの ダウンロード リンクをクリックします。
改訂日時:2023-01-28 11:53:55 +1000