第13章 AMQ Streams の管理

本章では、AMQ Streams のデプロイメントを維持するタスクについて説明します。

13.1. ラベルおよびアノテーションを使用したサービスの検出

サービスディスカバリーは、AMQ Streams と同じ OpenShift クラスターで稼働しているクライアントアプリケーションの Kafka クラスターとの対話を容易にします。

サービスディスカバリー ラベルおよびアノテーションは、Kafka クラスターにアクセスするために使用されるサービスに対して生成されます。

  • 内部 Kafka ブートストラップサービス
  • HTTP Bridge サービス

ラベルは、サービスの検出を可能にします。アノテーションは、クライアントアプリケーションが接続を確立するために使用できる接続詳細を提供します。

サービスディスカバリーラベル strimzi.io/discovery は、Service リソースに対して true に設定されています。サービスディスカバリーアノテーションには同じキーがあり、各サービスの接続詳細を JSON 形式で提供します。

内部 Kafka ブートストラップサービスの例

apiVersion: v1
kind: Service
metadata:
  annotations:
    strimzi.io/discovery: |-
      [ {
        "port" : 9092,
        "tls" : false,
        "protocol" : "kafka",
        "auth" : "scram-sha-512"
      }, {
        "port" : 9093,
        "tls" : true,
        "protocol" : "kafka",
        "auth" : "tls"
      } ]
  labels:
    strimzi.io/cluster: my-cluster
    strimzi.io/discovery: "true"
    strimzi.io/kind: Kafka
    strimzi.io/name: my-cluster-kafka-bootstrap
  name: my-cluster-kafka-bootstrap
spec:
  #...

HTTP Bridge サービスの例

apiVersion: v1
kind: Service
metadata:
  annotations:
    strimzi.io/discovery: |-
      [ {
        "port" : 8080,
        "tls" : false,
        "auth" : "none",
        "protocol" : "http"
      } ]
  labels:
    strimzi.io/cluster: my-bridge
    strimzi.io/discovery: "true"
    strimzi.io/kind: KafkaBridge
    strimzi.io/name: my-bridge-bridge-service

13.1.1. サービスの接続詳細の返信

サービスを検出するには、コマンドラインまたは対応する API 呼び出しでサービスを取得するときに、ディスカバリーラベルを指定します。

oc get service -l strimzi.io/discovery=true

サービスディスカバリーラベルの取得時に接続詳細が返されます。

13.2. カスタムリソースのステータスの確認

AMQ Streams カスタムリソースの status プロパティーは、リソースに関する情報を必要とするユーザーおよびツールにその情報をパブリッシュします。

13.2.1. AMQ Streams カスタムリソースのステータス情報

下記の表のとおり、複数のリソースに status プロパティーがあります。

AMQ Streams リソーススキーマ参照ステータス情報がパブリッシュされる場所

Kafka

KafkaStatus スキーマ参照」

Kafka クラスター

KafkaConnect

KafkaConnectStatus スキーマ参照」

デプロイされている場合は Kafka Connect クラスター。

KafkaConnectS2I

KafkaConnectS2IStatus スキーマ参照」

デプロイされている場合は Source-to-Image (S2I) サポートのある Kafka Connect クラスター。

KafkaConnector

KafkaConnectorStatus スキーマ参照」

デプロイされている場合は KafkaConnector リソース。

KafkaMirrorMaker

KafkaMirrorMakerStatus スキーマ参照」

デプロイされている場合は Kafka MirrorMakerツール。

KafkaTopic

KafkaTopicStatus スキーマ参照」

Kafka クラスターの Kafka トピック

KafkaUser

KafkaUserStatus スキーマ参照」

Kafka クラスターの Kafka ユーザー。

KafkaBridge

KafkaBridgeStatus スキーマ参照」

デプロイされている場合は AMQ Streams の Kafka Bridge。

リソースの status プロパティーによって、リソースの下記項目の情報が提供されます。

  • status.conditions プロパティーの Current state (現在の状態)。
  • status.observedGeneration プロパティーの Last observed generation (最後に確認された生成)。

status プロパティーによって、リソース固有の情報も提供されます。以下に例を示します。

  • KafkaConnectStatus によって、Kafka Connect コネクターの REST API エンドポイントが提供されます。
  • KafkaUserStatus によって、Kafka ユーザーの名前と、ユーザーのクレデンシャルが保存される Secret が提供されます。
  • KafkaBridgeStatus によって、外部クライアントアプリケーションが Bridge サービスにアクセスできる HTTP アドレスが提供されます。

リソースの Current state (現在の状態) は、spec プロパティーによって定義される Desired state (望ましい状態) を実現するリソースに関する進捗を追跡するのに便利です。ステータス条件によって、リソースの状態が変更された時間および理由が提供され、Operator によるリソースの望ましい状態の実現を妨げたり遅らせたりしたイベントの詳細が提供されます。

Last observed generation (最後に確認された生成) は、Cluster Operator によって最後に照合されたリソースの生成です。observedGeneration の値が metadata.generation の値と異なる場合、リソースの最新の更新が Operator によって処理されていません。これらの値が同じである場合、リソースの最新の変更がステータス情報に反映されます。

AMQ Streams によってカスタムリソースのステータスが作成および維持されます。定期的にカスタムリソースの現在の状態が評価され、その結果に応じてステータスが更新されます。くださいーたとえば、oc edit を使用してカスタムリソースで更新を行う場合、その status は編集不可能です。さらに、status の変更は Kafka クラスターステータスの設定に影響しません。

以下では、Kafka カスタムリソースに status プロパティーが指定されています。

Kafka カスタムリソースとステータス

apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
spec:
  # ...
status:
  conditions: 1
  - lastTransitionTime: 2019-07-23T23:46:57+0000
    status: "True"
    type: Ready 2
  observedGeneration: 4 3
  listeners: 4
  - addresses:
    - host: my-cluster-kafka-bootstrap.myproject.svc
      port: 9092
    type: plain
  - addresses:
    - host: my-cluster-kafka-bootstrap.myproject.svc
      port: 9093
    certificates:
    - |
      -----BEGIN CERTIFICATE-----
      ...
      -----END CERTIFICATE-----
    type: tls
  - addresses:
    - host: 172.29.49.180
      port: 9094
    certificates:
    - |
      -----BEGIN CERTIFICATE-----
      ...
      -----END CERTIFICATE-----
    type: external
    # ...

1
status の conditions は、既存のリソース情報から推測できないステータスに関連する基準や、リソースのインスタンスに固有する基準を記述します。
2
Ready 条件は、Cluster Operator が現在 Kafka クラスターでトラフィックの処理が可能であると判断するかどうかを示しています。
3
observedGeneration は、最後に Cluster Operator によって照合された Kafka カスタムリソースの生成を示しています。
4
listeners は、現在の Kafka ブートストラップアドレスをタイプ別に示しています。
重要

タイプが nodeport の外部リスナーのカスタムリソースステータスにおけるアドレスは、現在サポートされていません。

注記

Kafka ブートストラップアドレスがステータスに一覧表示されても、それらのエンドポイントまたは Kafka クラスターが準備状態であるとは限りません。

ステータス情報のアクセス

リソースのステータス情報はコマンドラインから取得できます。詳細は 「カスタムリソースのステータスの検出」 を参照してください。

13.2.2. カスタムリソースのステータスの検出

この手順では、カスタムリソースのステータスを検出する方法を説明します。

前提条件

  • OpenShift クラスターが必要です。
  • Cluster Operator が稼働している必要があります。

手順

  • カスタムリソースを指定し、-o jsonpath オプションを使用して標準の JSONPath 式を適用して status プロパティーを選択します。

    oc get kafka <kafka_resource_name> -o jsonpath='{.status}'

    この式は、指定されたカスタムリソースのすべてのステータス情報を返します。status.listeners または status.observedGeneration などのドット表記を使用すると、表示するステータス情報を微調整できます。

その他のリソース

13.3. 永続ボリュームからのクラスターの復元

Kafka クラスターは、永続ボリューム (PV) が存在していれば、そこから復元できます。

たとえば、以下の場合に行います。

  • namespace が意図せずに削除された後。
  • OpenShift クラスター全体が失われた後でも PV がインフラストラクチャーに残っている場合。

13.3.1. namespace が削除された場合の復元

永続ボリュームと namespace の関係により、namespace の削除から復元することが可能です。PersistentVolume (PV) は、namespace の外部に存在するストレージリソースです。PV は、namespace 内部に存在する PersistentVolumeClaim (PVC) を使用して Kafka Pod にマウントされます。

PV の回収 (reclaim) ポリシーは、namespace が削除されるときにクラスターに動作方法を指示します。以下に、回収 (reclaim) ポリシーの設定とその結果を示します。

  • Delete (デフォルト) に設定すると、PVC が namespace 内で削除されるときに PV が削除されます。
  • Retain に設定すると、namespace の削除時に PV は削除されません。

namespace が意図せず削除された場合に PV から復旧できるようにするには、PV 仕様で persistentVolumeReclaimPolicy プロパティーを使用してポリシーを Delete から Retain にリセットする必要があります。

apiVersion: v1
kind: PersistentVolume
# ...
spec:
  # ...
  persistentVolumeReclaimPolicy: Retain

または、PV は、関連付けられたストレージクラスの回収 (reclaim) ポリシーを継承できます。ストレージクラスは、動的ボリュームの割り当てに使用されます。

ストレージクラスの reclaimPolicy プロパティーを設定することで、ストレージクラスを使用する PV が適切な回収 (reclaim) ポリシー で作成されます。ストレージクラスは、storageClassName プロパティーを使用して PV に対して設定されます。

apiVersion: v1
kind: StorageClass
metadata:
  name: gp2-retain
parameters:
  # ...
# ...
reclaimPolicy: Retain
apiVersion: v1
kind: PersistentVolume
# ...
spec:
  # ...
  storageClassName: gp2-retain
注記

Retain を回収 (reclaim) ポリシーとして使用しながら、クラスター全体を削除する場合は、PV を手動で削除する必要があります。そうしないと、PV は削除されず、リソースに不要な経費がかかる原因になります。

13.3.2. OpenShift クラスター喪失からの復旧

クラスターが失われた場合、ディスク/ボリュームのデータがインフラストラクチャー内に保持されていれば、それらのデータを使用してクラスターを復旧できます。PV が復旧可能でそれらが手動で作成されていれば、復旧の手順は namespace の削除と同じです。

13.3.3. 削除したクラスターの永続ボリュームからの復元

この手順では、削除されたクラスターを永続ボリューム (PV) から復元する方法を説明します。

この状況では、Topic Operator はトピックが Kafka に存在することを認識しますが、KafkaTopic リソースは存在しません。

クラスター再作成の手順を行うには、2 つの方法があります。

  1. すべての KafkaTopic リソースを復旧できる場合は、オプション 1 を使用します。

    これにより、クラスターが起動する前に KafkaTopic リソースを復旧することで、該当するトピックが Topic Operator によって削除されないようにする必要があります。

  2. すべての KafkaTopic リソースを復旧できない場合は、オプション 2 を使用します。

    この場合、Topic Operator なしでクラスターをデプロイし、ZooKeeper で Topic Operator データを削除してからそのデータを再デプロイすることで、Topic Operator が該当するトピックから KafkaTopic リソースを再作成できるようにします。

注記

Topic Operator がデプロイされていない場合は、PersistentVolumeClaim (PVC) リソースのみを復旧する必要があります。

作業を始める前に

この手順では、データの破損を防ぐために PV を正しい PVC にマウントする必要があります。volumeName が PVC に指定されており、それが PV の名前に一致する必要があります。

詳細は以下を参照してください。

注記

この手順には、手動での再作成が必要な KafkaUser リソースの復旧は含まれません。パスワードと証明書を保持する必要がある場合は、KafkaUser リソースの作成前にシークレットを再作成する必要があります。

手順

  1. クラスターの PV についての情報を確認します。

    oc get pv

    PV の情報がデータとともに表示されます。

    この手順で重要な列を示す出力例:

    NAME                                         RECLAIMPOLICY CLAIM
    pvc-5e9c5c7f-3317-11ea-a650-06e1eadd9a4c ... Retain ...    myproject/data-my-cluster-zookeeper-1
    pvc-5e9cc72d-3317-11ea-97b0-0aef8816c7ea ... Retain ...    myproject/data-my-cluster-zookeeper-0
    pvc-5ead43d1-3317-11ea-97b0-0aef8816c7ea ... Retain ...    myproject/data-my-cluster-zookeeper-2
    pvc-7e1f67f9-3317-11ea-a650-06e1eadd9a4c ... Retain ...    myproject/data-0-my-cluster-kafka-0
    pvc-7e21042e-3317-11ea-9786-02deaf9aa87e ... Retain ...    myproject/data-0-my-cluster-kafka-1
    pvc-7e226978-3317-11ea-97b0-0aef8816c7ea ... Retain ...    myproject/data-0-my-cluster-kafka-2
    • NAME は各 PV の名前を示します。
    • RECLAIM POLICY は PV が 保持される ことを示します。
    • CLAIM は元の PVC へのリンクを示します。
  2. 元の namespace を再作成します。

    oc create namespace myproject
  3. 元の PVC リソース仕様を再作成し、PVC を該当する PV にリンクします。

    以下に例を示します。

    apiVersion: v1
    kind: PersistentVolumeClaim
    metadata:
      name: data-0-my-cluster-kafka-0
    spec:
      accessModes:
      - ReadWriteOnce
      resources:
        requests:
          storage: 100Gi
      storageClassName: gp2-retain
      volumeMode: Filesystem
      volumeName: pvc-7e1f67f9-3317-11ea-a650-06e1eadd9a4c
  4. PV 仕様を編集して、元の PVC にバインドされた claimRef プロパティーを削除します。

    以下に例を示します。

    apiVersion: v1
    kind: PersistentVolume
    metadata:
      annotations:
        kubernetes.io/createdby: aws-ebs-dynamic-provisioner
        pv.kubernetes.io/bound-by-controller: "yes"
        pv.kubernetes.io/provisioned-by: kubernetes.io/aws-ebs
      creationTimestamp: "<date>"
      finalizers:
      - kubernetes.io/pv-protection
      labels:
        failure-domain.beta.kubernetes.io/region: eu-west-1
        failure-domain.beta.kubernetes.io/zone: eu-west-1c
      name: pvc-7e226978-3317-11ea-97b0-0aef8816c7ea
      resourceVersion: "39431"
      selfLink: /api/v1/persistentvolumes/pvc-7e226978-3317-11ea-97b0-0aef8816c7ea
      uid: 7efe6b0d-3317-11ea-a650-06e1eadd9a4c
    spec:
      accessModes:
      - ReadWriteOnce
      awsElasticBlockStore:
        fsType: xfs
        volumeID: aws://eu-west-1c/vol-09db3141656d1c258
      capacity:
        storage: 100Gi
      claimRef:
        apiVersion: v1
        kind: PersistentVolumeClaim
        name: data-0-my-cluster-kafka-2
        namespace: myproject
        resourceVersion: "39113"
        uid: 54be1c60-3319-11ea-97b0-0aef8816c7ea
      nodeAffinity:
        required:
          nodeSelectorTerms:
          - matchExpressions:
            - key: failure-domain.beta.kubernetes.io/zone
              operator: In
              values:
              - eu-west-1c
            - key: failure-domain.beta.kubernetes.io/region
              operator: In
              values:
              - eu-west-1
      persistentVolumeReclaimPolicy: Retain
      storageClassName: gp2-retain
      volumeMode: Filesystem

    この例では、以下のプロパティーが削除されます。

    claimRef:
      apiVersion: v1
      kind: PersistentVolumeClaim
      name: data-0-my-cluster-kafka-2
      namespace: myproject
      resourceVersion: "39113"
      uid: 54be1c60-3319-11ea-97b0-0aef8816c7ea
  5. Cluster Operator をデプロイします。

    oc apply -f install/cluster-operator -n my-project
  6. クラスターを再作成します。

    クラスターの再作成に必要なすべての KafkaTopic リソースがあるかどうかに応じて、以下の手順を実行します。

    オプション 1: クラスターを失う前に存在した KafkaTopic リソースが すべて ある場合 (__consumer_offsets からコミットされたオフセットなどの内部トピックを含む)。

    1. すべての KafkaTopic リソースを再作成します。

      クラスターをデプロイする前にリソースを再作成する必要があります。そうでないと、Topic Operator によってトピックが削除されます。

    2. Kafka クラスターをデプロイします。

      以下に例を示します。

      oc apply -f kafka.yaml

    オプション 2: クラスターを失う前に存在したすべての KafkaTopic リソースがない場合。

    1. オプション 1 と同様に Kafka クラスターをデプロイしますが、デプロイ前に Kafka リソースから topicOperator プロパティーを削除して、Topic Operator がない状態でデプロイします。

      デプロイメントに Topic Operator が含まれると、Topic Operator によってすべてのトピックが削除されます。

    2. exec コマンドを Kafka ブローカー Pod の 1 つに実行し、ZooKeeper シェルスクリプトを開きます。

      たとえば、my-cluster-kafka-0 はブローカー Pod の名前になります。

      oc exec my-cluster-kafka-0 bin/zookeeper-shell.sh localhost:2181
    3. /strimzi パス全体を削除して、Topic Operator ストレージを削除します。

      deleteall /strimzi
    4. Kafka クラスターを topicOperator プロパティーで再デプロイして TopicOperator を有効にし、KafkaTopic リソースを再作成します。

      以下に例を示します。

      apiVersion: kafka.strimzi.io/v1beta1
      kind: Kafka
      metadata:
        name: my-cluster
      spec:
        #...
        entityOperator:
          topicOperator: {} 1
          #...
    1
    ここで示すデフォルト設定には、追加のプロパティーはありません。EntityTopicOperatorSpec スキーマ参照」に説明されているプロパティーを使用して、必要な設定を指定します。
  7. KafkaTopic リソースのリストを表示して、復旧を確認します。

    oc get KafkaTopic

13.4. AMQ Streams のアンインストール

この手順では、AMQ Streams をアンインストールし、デプロイメントに関連するリソースを削除する方法を説明します。

前提条件

この手順を実行するには、デプロイメント用に特別に作成され、AMQ Streams リソースから参照されるリソースを特定します。

このようなリソースには以下があります。

  • シークレット (カスタム CA および証明書、Kafka Connect Secrets、その他の Kafka シークレット)
  • ロギング ConfigMaps (タイプは external)

これらのリソースは、KafkaKafkaConnectKafkaConnectS2IKafkaMirrorMaker、または KafkaBridge 設定によって参照されます。

手順

  1. Cluster Operator の Deployment、関連するCustomResourceDefinitions および RBAC リソースを削除します。

    oc delete -f install/cluster-operator
    警告

    CustomResourceDefinitions を削除すると、対応するカスタムリソース (KafkaKafkaConnectKafkaConnectS2IKafkaMirrorMaker、または KafkaBridge) 、およびそれらに依存するリソース (Deployments、StatefulSets、その他の依存リソース) のガベージコレクションが実行されます。

  2. 前提条件で特定したリソースを削除します。