第5章 AMQ Streams operator の使用

AMQ Streams の operator を使用して Kafka クラスターと Kafka トピックおよびユーザーを管理します。

5.1. Cluster Operator の使用

Cluster Operator は Kafka クラスターや他の Kafka コンポーネントをデプロイするために使用されます。

Cluster Operator は YAML インストールファイルを使用してデプロイされます。

注記

OpenShift では、Kafka Connect デプロイメントに Source2Image 機能を組み込み、追加のコネクターを加えるための便利な方法として利用できます。

その他のリソース

5.1.1. Cluster Operator の設定

Cluster Operator は、サポートされる環境変数を使用してロギング設定から設定できます。

環境変数は、Cluster Operator イメージのデプロイメンのコンテナー設定に関連します。image 設定の詳細は、image を参照してください。

STRIMZI_NAMESPACE

Operator が操作する namespace のカンマ区切りのリスト。設定されていない場合や、空の文字列や * に設定された場合は、Cluster Operator はすべての namespace で操作します。Cluster Operator デプロイメントでは OpenShift Downward API を使用して、これを Cluster Operator がデプロイされる namespace に自動設定することがあります。

Cluster Operator namespace の設定例

env:
  - name: STRIMZI_NAMESPACE
    valueFrom:
      fieldRef:
        fieldPath: metadata.namespace

STRIMZI_FULL_RECONCILIATION_INTERVAL_MS
任意設定、デフォルトは 120000 ミリ秒です。定期的な調整の間隔 (秒単位)。
STRIMZI_OPERATION_TIMEOUT_MS
任意設定、デフォルトは 300000 ミリ秒です。内部操作のタイムアウト (ミリ秒単位)。この値は、標準の OpenShift 操作の時間が通常よりも長いクラスターで (Docker イメージのダウンロードが遅い場合など) AMQ Streams を使用する場合に増やす必要があります。
STRIMZI_OPERATOR_NAMESPACE

AMQ Streams Cluster Operator が稼働している namespace の名前。この変数は手動で設定しないでください。OpenShift Downward API を使用します。

env:
  - name: STRIMZI_OPERATOR_NAMESPACE
    valueFrom:
      fieldRef:
        fieldPath: metadata.namespace
STRIMZI_OPERATOR_NAMESPACE_LABELS

任意設定。AMQ Streams Cluster Operator が稼働している namespace のラベル。namespace ラベルは、ネットワークポリシーで namespace セレクターを設定するために使用されます。これにより、AMQ Streams Cluster Operator はこれらのラベルを持つ namespace からのオペランドのみにアクセスできます。設定されていない場合、ネットワークポリシーの namespace セレクターは、OpenShift クラスターのすべての namespace から AMQ Streams Cluster Operator にアクセスできるように設定されます。

env:
  - name: STRIMZI_OPERATOR_NAMESPACE_LABELS
    value: label1=value1,label2=value2
STRIMZI_CUSTOM_RESOURCE_SELECTOR

任意設定。Operator によって処理されるカスタムリソースのフィルタリングに使用されるラベルセレクターを指定します。Operator は、指定されたラベルが設定されているカスタムリソースでのみ動作します。これらのラベルのないリソースは Operator によって認識されません。ラベルセレクターは、KafkaKafkaConnectKafkaConnectS2IKafkaBridgeKafkaMirrorMaker、および KafkaMirrorMaker2 リソースに適用されます。KafkaRebalance および KafkaConnector リソースは、対応する Kafka および Kafka Connect クラスターに一致するラベルがある場合にのみ操作されます。

env:
  - name: STRIMZI_CUSTOM_RESOURCE_SELECTOR
    value: label1=value1,label2=value2
STRIMZI_KAFKA_IMAGES
必須。Kafka バージョンから、そのバージョンの Kafka ブローカーが含まれる該当の Docker イメージへのマッピングが提供されます。必要な構文は、空白またはカンマ区切りの <version>=<image> ペアです。例: 2.6.0=registry.redhat.io/amq7/amq-streams-kafka-26-rhel7:1.7.0, 2.7.0=registry.redhat.io/amq7/amq-streams-kafka-27-rhel7:1.7.0これは、Kafka.spec.kafka.version プロパティーが指定され、Kafka リソースの Kafka.spec.kafka.image は指定されていない場合に使用されます。
STRIMZI_DEFAULT_KAFKA_INIT_IMAGE
任意設定で、デフォルトは registry.redhat.io/amq7/amq-streams-rhel7-operator:1.7.0 です。Kafka リソースで kafka-init-image として指定されたイメージがない場合に、初期設定作業 (ラックサポート) のブローカーの前に開始される init コンテナーのデフォルトとして使用するイメージ名。
STRIMZI_KAFKA_CONNECT_IMAGES
必須。Kafka バージョンから、そのバージョンの Kafka Connect が含まれる該当の Docker イメージへのマッピングが提供されます。必要な構文は、空白またはカンマ区切りの <version>=<image> ペアです。例: 2.6.0=registry.redhat.io/amq7/amq-streams-kafka-26-rhel7:1.7.0, 2.7.0=registry.redhat.io/amq7/amq-streams-kafka-27-rhel7:1.7.0これは、 KafkaConnect.spec.version プロパティーが指定され、KafkaConnect.spec.image は指定されていない場合に使用されます。
STRIMZI_KAFKA_CONNECT_S2I_IMAGES
必須。Kafka バージョンから、そのバージョンの Kafka Connect が含まれる該当の Docker イメージへのマッピングが提供されます。必要な構文は、空白またはカンマ区切りの <version>=<image> ペアです。例: 2.6.0=registry.redhat.io/amq7/amq-streams-kafka-26-rhel7:1.7.0, 2.7.0=registry.redhat.io/amq7/amq-streams-kafka-27-rhel7:1.7.0これは、KafkaConnectS2I.spec.version プロパティーが指定され、KafkaConnectS2I.spec.image は指定されていない場合に使用されます。
STRIMZI_KAFKA_MIRROR_MAKER_IMAGES
必須。Kafka バージョンから、そのバージョンの Kafka Mirror Maker が含まれる該当の Docker イメージへのマッピングが提供されます。必要な構文は、空白またはカンマ区切りの <version>=<image> ペアです。例: 2.6.0=registry.redhat.io/amq7/amq-streams-kafka-26-rhel7:1.7.0, 2.7.0=registry.redhat.io/amq7/amq-streams-kafka-27-rhel7:1.7.0これは、KafkaMirrorMaker.spec.version プロパティーが指定され、KafkaMirrorMaker.spec.image は指定されていない場合に使用されます。
STRIMZI_DEFAULT_TOPIC_OPERATOR_IMAGE
任意設定で、デフォルトは registry.redhat.io/amq7/amq-streams-rhel7-operator:1.7.0 です。Kafka リソースに Kafka.spec.entityOperator.topicOperator.image として指定されたイメージがない場合に、Topic Operator のデプロイ時にデフォルトとして使用するイメージ名。
STRIMZI_DEFAULT_USER_OPERATOR_IMAGE
任意設定で、デフォルトは registry.redhat.io/amq7/amq-streams-rhel7-operator:1.7.0 です。Kafka リソースに Kafka.spec.entityOperator.userOperator.image として指定されたイメージがない場合に、User Operator のデプロイ時にデフォルトとして使用するイメージ名。
STRIMZI_DEFAULT_TLS_SIDECAR_ENTITY_OPERATOR_IMAGE
任意設定で、デフォルトは registry.redhat.io/amq7/amq-streams-kafka-27-rhel7:1.7.0 です。Kafka リソースで Kafka.spec.entityOperator.tlsSidecar.image として指定されたイメージがない場合に、Entity Operator の TLS サポートを提供するサイドカーコンテナーをデプロイする際にデフォルトとして使用するイメージ名。
STRIMZI_IMAGE_PULL_POLICY
任意設定。AMQ Streams の Cluster Operator によって管理されるすべての Pod のコンテナーに適用される ImagePullPolicy。有効な値は、AlwaysIfNotPresent、および Never です。指定のない場合、OpenShift のデフォルトが使用されます。ポリシーを変更すると、すべての Kafka、Kafka Connect、および Kafka MirrorMaker クラスターのローリングアップデートが実行されます。
STRIMZI_IMAGE_PULL_SECRETS
任意設定。Secret 名のカンマ区切りのリスト。ここで参照されるシークレットには、コンテナーイメージがプルされるコンテナーレジストリーへのクレデンシャルが含まれます。シークレットは、Cluster Operator によって作成されるすべての PodsimagePullSecrets フィールドで使用されます。このリストを変更すると、Kafka、Kafka Connect、および Kafka MirrorMaker のすべてのクラスターのローリングアップデートが実行されます。
STRIMZI_KUBERNETES_VERSION

任意設定。API サーバーから検出された Kubernetes バージョン情報をオーバーライドします。

Kubernetes バージョンオーバーライドの設定例

env:
  - name: STRIMZI_KUBERNETES_VERSION
    value: |
           major=1
           minor=16
           gitVersion=v1.16.2
           gitCommit=c97fe5036ef3df2967d086711e6c0c405941e14b
           gitTreeState=clean
           buildDate=2019-10-15T19:09:08Z
           goVersion=go1.12.10
           compiler=gc
           platform=linux/amd64

KUBERNETES_SERVICE_DNS_DOMAIN

任意設定。デフォルトの OpenShift DNS サフィックスを上書きします。

デフォルトでは、OpenShfit クラスターで割り当てられるサービスに、デフォルトのサフィックス cluster.local を使用する DNS ドメイン名があります。

ブローカーが kafka-0 の場合の例は次のとおりです。

<cluster-name>-kafka-0.<cluster-name>-kafka-brokers.<namespace>.svc.cluster.local

DNS ドメイン名は、ホスト名の検証に使用される Kafka ブローカー証明書に追加されます。

クラスターで異なる DNS サフィックスを使用している場合、Kafka ブローカーとの接続を確立するために、KUBERNETES_SERVICE_DNS_DOMAIN 環境変数をデフォルトから現在使用中の DNS サフィックスに変更します。

STRIMZI_CONNECT_BUILD_TIMEOUT_MS
任意設定、デフォルトは 300000 ミリ秒です。追加のコネクターで新しい Kafka Connect イメージをビルドする場合のタイムアウト (ミリ秒単位)。AMQ Streams を使用して多くのコネクターが含まれるコンテナーイメージをビルドしたり、低速なコンテナーレジストリーを使用する場合は、この値を大きくする必要があります。

5.1.1.1. ConfigMap による設定のロギング

Cluster Operator のロギングは strimzi-cluster-operator ConfigMap によって設定されます。

ロギング設定を含む ConfigMap は、Cluster Operator のインストール時に作成されます。この ConfigMap は、ファイル install/cluster-operator/050-ConfigMap-strimzi-cluster-operator.yaml に記述されています。Cluster Operator のロギングを設定するには、この ConfigMap のデータフィールド log4j2.properties を変更します。

ロギング設定を更新するには、050-ConfigMap-strimzi-cluster-operator.yaml ファイルを編集して以下のコマンドを実行します。

oc create -f install/cluster-operator/050-ConfigMap-strimzi-cluster-operator.yaml

ConfigMap を直接編集することもできます。

oc edit cm strimzi-cluster-operator

リロード間隔の頻度を変更するには、作成された ConfigMapmonitorInterval オプションで、秒単位の時間を設定します。

Cluster Operator のデプロイ時に ConfigMap がない場合、デフォルトのロギング値が使用されます。

Cluster Operator のデプロイ後に ConfigMap が誤って削除された場合、最後にロードされたロギング設定が使用されます。ConfigMap を新たに作成し、新しいロギング設定を読み込みます。

注記

ConfigMap から monitorInterval オプションを削除しないでください。

5.1.1.2. ネットワークポリシーによる Cluster Operator アクセスの制限

Cluster Operator は、管理するリソースと同じ namespace または別の namespace で実行できます。デフォルトでは、STRIMZI_OPERATOR_NAMESPACE 環境変数は、OpenShift Downward API を使用して Cluster Operator が稼働している namespace を検索するように設定されます。Cluster Operator がリソースと同じ namespace で実行されている場合は、ローカルアクセスのみが必要で、AMQ Sreams によって許可されます。

Cluster Operator が管理するリソースとは別の namespace で実行されている場合、ネットワークポリシーが設定されている場合を除き、OpenShift クラスターのすべての namespace は Cluster Operator へのアクセスが許可されます。任意の STRIMZI_OPERATOR_NAMESPACE_LABELS 環境変数を使用し、namespace ラベルを使用して Cluster Operator のネットワークポリシーを確立します。namespace ラベルを追加すると、Cluster Operator へのアクセスは指定された namespace に限定されます。

Cluster Operator デプロイメントに設定されたネットワークポリシー

#...
env:
  # ...
  - name: STRIMZI_OPERATOR_NAMESPACE_LABELS
    value: label1=value1,label2=value2
  #...

5.1.1.3. 定期的な調整

Cluster Operator は OpenShift クラスターから受信する必要なクラスターリソースに関するすべての通知に対応しますが、Operator が実行されていない場合や、何らかの理由で通知が受信されない場合、必要なリソースは実行中の OpenShift クラスターの状態と同期しなくなります。

フェイルオーバーを適切に処理するために、Cluster Operator によって定期的な調整プロセスが実行され、必要なリソースすべてで一貫した状態になるように、必要なリソースの状態を現在のクラスターデプロイメントと比較できます。[STRIMZI_FULL_RECONCILIATION_INTERVAL_MS] 変数を使用して、定期的な調整の時間間隔を設定できます。

5.1.2. ロールベースアクセス制御 (RBAC) のプロビジョニング

Cluster Operator が機能するには、KafkaKafkaConnect などのリソースや ConfigMapsPodsDeploymentsStatefulSets、および Services の管理リソースと対話するために OpenShift クラスター内でパーミッションが必要になり ます。このようなパーミッションは、OpenShift のロールベースアクセス制御 (RBAC) リソースに記述されます。

  • ServiceAccount
  • Role および ClusterRole
  • RoleBinding および ClusterRoleBinding

Cluster Operator は、ClusterRoleBinding を使用して独自の ServiceAccount で実行される他に、OpenShift リソースへのアクセスを必要とするコンポーネントの RBAC リソースを管理します。

また OpenShift には、ServiceAccount で動作するコンポーネントが、その ServiceAccount にはない他の ServiceAccounts の権限を付与しないようにするための特権昇格の保護機能も含まれています。Cluster Operator は、ClusterRoleBindings と、それが管理するリソースで必要な RoleBindings を作成できる必要があるため、Cluster Operator にも同じ権限が必要です。

5.1.2.1. 委譲された権限

Cluster Operator が必要な Kafka リソースのリソースをデプロイする場合、以下のように ServiceAccountsRoleBindings、および ClusterRoleBindings も作成します。

  • Kafka ブローカー Pod は cluster-name-kafka という ServiceAccount を使用します。

    • ラック機能が使用されると、strimzi-cluster-name-kafka-init ClusterRoleBinding は、strimzi-kafka-broker と呼ばれる ClusterRole 経由で、クラスター内のノードへの ServiceAccount アクセスを付与するために使用されます。
    • ラック機能が使用されていない場合は、バインディングは作成されません。
  • ZooKeeper Pod は cluster-name-zookeeper という ServiceAccount を使用します。
  • Entity Operator は、cluster-name-entity-operator という ServiceAccount を使用します。

    • Topic Operator はステータス情報のある OpenShift イベントを生成し、ServiceAccountstrimzi-entity-operator という ClusterRole にバインドされるようにします。strimzi-entity-operator はこのアクセス権限を strimzi-entity-operator RoleBinding 経由で付与します。
  • KafkaConnect および KafkaConnectS2I リソースの Pod は cluster-name-cluster-connect という ServiceAccount を使用します。
  • KafkaMirrorMaker の Pod は cluster-name-mirror-maker というServiceAccount を使用します。
  • KafkaMirrorMaker2 の Pod は cluster-name-mirrormaker2 というServiceAccount を使用します。
  • KafkaBridge の Pod は cluster-name-bridge というServiceAccount を使用します。

5.1.2.2. ServiceAccount

Cluster Operator は ServiceAccount を使用して最適に実行されます。

Cluster Operator の ServiceAccount の例

apiVersion: v1
kind: ServiceAccount
metadata:
  name: strimzi-cluster-operator
  labels:
    app: strimzi

その後、Cluster Operator の Deployment で、これを spec.template.spec.serviceAccountName に指定する必要があります。

Cluster Operator の Deployment の部分的な例

apiVersion: apps/v1
kind: Deployment
metadata:
  name: strimzi-cluster-operator
  labels:
    app: strimzi
spec:
  replicas: 1
  selector:
    matchLabels:
      name: strimzi-cluster-operator
      strimzi.io/kind: cluster-operator
  template:
      # ...

strimzi-cluster-operator ServiceAccountserviceAccountName として指定されている 12 行目に注目してください。

5.1.2.3. ClusterRoles

Cluster Operator は、必要なリソースへのアクセス権限を付与する ClusterRoles を使用して操作する必要があります。OpenShift クラスターの設定によっては、クラスター管理者が ClusterRoles を作成する必要があることがあります。

注記

クラスター管理者の権限は ClusterRoles の作成にのみ必要です。Cluster Operator はクラスター管理者アカウントで実行されません。

ClusterRoles は、 最小権限の原則に従い、Kafka、Kafka Connect、および ZooKeeper クラスターを操作するために Cluster Operator が必要とする権限のみが含まれます。最初に割り当てられた一連の権限により、Cluster Operator で StatefulSetsDeploymentsPods、および ConfigMaps などの OpenShift リソースを管理できます。

Cluster Operator は ClusterRoles を使用して、namespace スコープリソースのレベルおよびクラスタースコープリソースのレベルで権限を付与します。

Cluster Operator の namespaced リソースのある ClusterRole

apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: strimzi-cluster-operator-namespaced
  labels:
    app: strimzi
rules:
  - apiGroups:
      - "rbac.authorization.k8s.io"
    resources:
      # The cluster operator needs to access and manage rolebindings to grant Strimzi components cluster permissions
      - rolebindings
    verbs:
      - get
      - list
      - watch
      - create
      - delete
      - patch
      - update
  - apiGroups:
      - "rbac.authorization.k8s.io"
    resources:
      # The cluster operator needs to access and manage roles to grant the entity operator permissions
      - roles
    verbs:
      - get
      - list
      - watch
      - create
      - delete
      - patch
      - update
  - apiGroups:
      - ""
    resources:
      # The cluster operator needs to access and delete pods, this is to allow it to monitor pod health and coordinate rolling updates
      - pods
      # The cluster operator needs to access and manage service accounts to grant Strimzi components cluster permissions
      - serviceaccounts
      # The cluster operator needs to access and manage config maps for Strimzi components configuration
      - configmaps
      # The cluster operator needs to access and manage services and endpoints to expose Strimzi components to network traffic
      - services
      - endpoints
      # The cluster operator needs to access and manage secrets to handle credentials
      - secrets
      # The cluster operator needs to access and manage persistent volume claims to bind them to Strimzi components for persistent data
      - persistentvolumeclaims
    verbs:
      - get
      - list
      - watch
      - create
      - delete
      - patch
      - update
  - apiGroups:
      - "kafka.strimzi.io"
    resources:
      # The cluster operator runs the KafkaAssemblyOperator, which needs to access and manage Kafka resources
      - kafkas
      - kafkas/status
      # The cluster operator runs the KafkaConnectAssemblyOperator, which needs to access and manage KafkaConnect resources
      - kafkaconnects
      - kafkaconnects/status
      # The cluster operator runs the KafkaConnectS2IAssemblyOperator, which needs to access and manage KafkaConnectS2I resources
      - kafkaconnects2is
      - kafkaconnects2is/status
      # The cluster operator runs the KafkaConnectorAssemblyOperator, which needs to access and manage KafkaConnector resources
      - kafkaconnectors
      - kafkaconnectors/status
      # The cluster operator runs the KafkaMirrorMakerAssemblyOperator, which needs to access and manage KafkaMirrorMaker resources
      - kafkamirrormakers
      - kafkamirrormakers/status
      # The cluster operator runs the KafkaBridgeAssemblyOperator, which needs to access and manage BridgeMaker resources
      - kafkabridges
      - kafkabridges/status
      # The cluster operator runs the KafkaMirrorMaker2AssemblyOperator, which needs to access and manage KafkaMirrorMaker2 resources
      - kafkamirrormaker2s
      - kafkamirrormaker2s/status
      # The cluster operator runs the KafkaRebalanceAssemblyOperator, which needs to access and manage KafkaRebalance resources
      - kafkarebalances
      - kafkarebalances/status
    verbs:
      - get
      - list
      - watch
      - create
      - delete
      - patch
      - update
  - apiGroups:
      # The cluster operator needs the extensions api as the operator supports Kubernetes version 1.11+
      # apps/v1 was introduced in Kubernetes 1.14
      - "extensions"
    resources:
      # The cluster operator needs to access and manage deployments to run deployment based Strimzi components
      - deployments
      - deployments/scale
      # The cluster operator needs to access replica sets to manage Strimzi components and to determine error states
      - replicasets
      # The cluster operator needs to access and manage replication controllers to manage replicasets
      - replicationcontrollers
      # The cluster operator needs to access and manage network policies to lock down communication between Strimzi components
      - networkpolicies
      # The cluster operator needs to access and manage ingresses which allow external access to the services in a cluster
      - ingresses
    verbs:
      - get
      - list
      - watch
      - create
      - delete
      - patch
      - update
  - apiGroups:
      - "apps"
    resources:
      # The cluster operator needs to access and manage deployments to run deployment based Strimzi components
      - deployments
      - deployments/scale
      - deployments/status
      # The cluster operator needs to access and manage stateful sets to run stateful sets based Strimzi components
      - statefulsets
      # The cluster operator needs to access replica-sets to manage Strimzi components and to determine error states
      - replicasets
    verbs:
      - get
      - list
      - watch
      - create
      - delete
      - patch
      - update
  - apiGroups:
      - ""
    resources:
      # The cluster operator needs to be able to create events and delegate permissions to do so
      - events
    verbs:
      - create
  - apiGroups:
      # OpenShift S2I requirements
      - apps.openshift.io
    resources:
      - deploymentconfigs
      - deploymentconfigs/scale
      - deploymentconfigs/status
      - deploymentconfigs/finalizers
    verbs:
      - get
      - list
      - watch
      - create
      - delete
      - patch
      - update
  - apiGroups:
      # OpenShift S2I requirements
      - build.openshift.io
    resources:
      - buildconfigs
      - buildconfigs/instantiate
      - builds
    verbs:
      - get
      - list
      - watch
      - create
      - delete
      - patch
      - update
  - apiGroups:
      # OpenShift S2I requirements
      - image.openshift.io
    resources:
      - imagestreams
      - imagestreams/status
    verbs:
      - get
      - list
      - watch
      - create
      - delete
      - patch
      - update
  - apiGroups:
      - networking.k8s.io
    resources:
      # The cluster operator needs to access and manage network policies to lock down communication between Strimzi components
      - networkpolicies
      # The cluster operator needs to access and manage ingresses which allow external access to the services in a cluster
      - ingresses
    verbs:
      - get
      - list
      - watch
      - create
      - delete
      - patch
      - update
  - apiGroups:
      - route.openshift.io
    resources:
      # The cluster operator needs to access and manage routes to expose Strimzi components for external access
      - routes
      - routes/custom-host
    verbs:
      - get
      - list
      - watch
      - create
      - delete
      - patch
      - update
  - apiGroups:
      - policy
    resources:
      # The cluster operator needs to access and manage pod disruption budgets this limits the number of concurrent disruptions
      # that a Strimzi component experiences, allowing for higher availability
      - poddisruptionbudgets
    verbs:
      - get
      - list
      - watch
      - create
      - delete
      - patch
      - update

2 番目の一連の権限には、クラスタースコープリソースに必要な権限が含まれます。

Cluster Operator のクラスタースコープリソースのある ClusterRole

apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: strimzi-cluster-operator-global
  labels:
    app: strimzi
rules:
  - apiGroups:
      - "rbac.authorization.k8s.io"
    resources:
      # The cluster operator needs to create and manage cluster role bindings in the case of an install where a user
      # has specified they want their cluster role bindings generated
      - clusterrolebindings
    verbs:
      - get
      - list
      - watch
      - create
      - delete
      - patch
      - update
  - apiGroups:
      - storage.k8s.io
    resources:
      # The cluster operator requires "get" permissions to view storage class details
      # This is because only a persistent volume of a supported storage class type can be resized
      - storageclasses
    verbs:
      - get
  - apiGroups:
      - ""
    resources:
      # The cluster operator requires "list" permissions to view all nodes in a cluster
      # The listing is used to determine the node addresses when NodePort access is configured
      # These addresses are then exposed in the custom resource states
      - nodes
    verbs:
      - list

strimzi-kafka-broker ClusterRole は、ラック機能に使用される Kafka Pod の init コンテナーが必要とするアクセス権限を表します。「委譲された権限」 で説明したように、このアクセスを委譲できるようにするには、このロールも Cluster Operator に必要です。

Cluster Operator の ClusterRole により、OpenShift ノードへのアクセスを Kafka ブローカー Pod に委譲できます。

apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: strimzi-kafka-broker
  labels:
    app: strimzi
rules:
  - apiGroups:
      - ""
    resources:
      # The Kafka Brokers require "get" permissions to view the node they are on
      # This information is used to generate a Rack ID that is used for High Availability configurations
      - nodes
    verbs:
      - get

strimzi-topic-operatorClusterRole は、Topic Operator が必要とするアクセスを表します。「委譲された権限」 で説明したように、このアクセスを委譲できるようにするには、このロールも Cluster Operator に必要です。

Cluster Operator の ClusterRole により、イベントへのアクセスを Topic Operator に委譲できます。

apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: strimzi-entity-operator
  labels:
    app: strimzi
rules:
  - apiGroups:
      - "kafka.strimzi.io"
    resources:
      # The entity operator runs the KafkaTopic assembly operator, which needs to access and manage KafkaTopic resources
      - kafkatopics
      - kafkatopics/status
      # The entity operator runs the KafkaUser assembly operator, which needs to access and manage KafkaUser resources
      - kafkausers
      - kafkausers/status
    verbs:
      - get
      - list
      - watch
      - create
      - patch
      - update
      - delete
  - apiGroups:
      - ""
    resources:
      - events
    verbs:
      # The entity operator needs to be able to create events
      - create
  - apiGroups:
      - ""
    resources:
      # The entity operator user-operator needs to access and manage secrets to store generated credentials
      - secrets
    verbs:
      - get
      - list
      - watch
      - create
      - delete
      - patch
      - update

strimzi-kafka-client ClusterRole は、クライアントのラックアウェアネスを使用する Kafka クライアントをベースとするコンポーネントが必要なアクセスを表します。「委譲された権限」 で説明したように、このアクセスを委譲できるようにするには、このロールも Cluster Operator に必要です。

Cluster Operator の ClusterRole により、OpenShift ノードへのアクセスを Kafka クライアントベースの Pod に委譲できます。

apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: strimzi-kafka-client
  labels:
    app: strimzi
rules:
  - apiGroups:
      - ""
    resources:
      # The Kafka clients (Connect, Mirror Maker, etc.) require "get" permissions to view the node they are on
      # This information is used to generate a Rack ID (client.rack option) that is used for consuming from the closest
      # replicas when enabled
      - nodes
    verbs:
      - get

5.1.2.4. ClusterRoleBindings

Operator には ClusterRoleBindings と、ClusterRoleServiceAccount に関連付ける RoleBindings が必要です。ClusterRoleBindings は、クラスタースコープリロースが含まれる ClusterRoles に必要です。

Cluster Operator の ClusterRoleBinding の例

apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: strimzi-cluster-operator
  labels:
    app: strimzi
subjects:
  - kind: ServiceAccount
    name: strimzi-cluster-operator
    namespace: myproject
roleRef:
  kind: ClusterRole
  name: strimzi-cluster-operator-global
  apiGroup: rbac.authorization.k8s.io

ClusterRoleBindings は、委譲に必要な ClusterRoles にも必要です。

Kafka ブローカーラックアウェアネスの Cluster Operator の ClusterRoleBinding

apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: strimzi-cluster-operator-kafka-broker-delegation
  labels:
    app: strimzi
# The Kafka broker cluster role must be bound to the cluster operator service account so that it can delegate the cluster role to the Kafka brokers.
# This must be done to avoid escalating privileges which would be blocked by Kubernetes.
subjects:
  - kind: ServiceAccount
    name: strimzi-cluster-operator
    namespace: myproject
roleRef:
  kind: ClusterRole
  name: strimzi-kafka-broker
  apiGroup: rbac.authorization.k8s.io

以下も必要です。

Kafka クライアントラックアウェアネスの Cluster Operator の ClusterRoleBinding

apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: strimzi-cluster-operator-kafka-client-delegation
  labels:
    app: strimzi
# The Kafka clients cluster role must be bound to the cluster operator service account so that it can delegate the
# cluster role to the Kafka clients using it for consuming from closest replica.
# This must be done to avoid escalating privileges which would be blocked by Kubernetes.
subjects:
  - kind: ServiceAccount
    name: strimzi-cluster-operator
    namespace: myproject
roleRef:
  kind: ClusterRole
  name: strimzi-kafka-client
  apiGroup: rbac.authorization.k8s.io

namespaced リソースのみが含まれる ClusterRoles は、RoleBindings のみを使用してバインドされます。

apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: strimzi-cluster-operator
  labels:
    app: strimzi
subjects:
  - kind: ServiceAccount
    name: strimzi-cluster-operator
    namespace: myproject
roleRef:
  kind: ClusterRole
  name: strimzi-cluster-operator-namespaced
  apiGroup: rbac.authorization.k8s.io
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: strimzi-cluster-operator-entity-operator-delegation
  labels:
    app: strimzi
# The Entity Operator cluster role must be bound to the cluster operator service account so that it can delegate the cluster role to the Entity Operator.
# This must be done to avoid escalating privileges which would be blocked by Kubernetes.
subjects:
  - kind: ServiceAccount
    name: strimzi-cluster-operator
    namespace: myproject
roleRef:
  kind: ClusterRole
  name: strimzi-entity-operator
  apiGroup: rbac.authorization.k8s.io

5.2. Topic Operator の使用

KafkaTopic リソースを使用してトピックを作成、編集、または削除する場合、Topic Operator によって変更が確実に Kafka クラスターで反映されます。

OpenShift での AMQ Streams のデプロイおよびアップグレード』には、Topic Operator をデプロイする手順が記載されています。

5.2.1. Kafka トピックリソース

KafkaTopic リソースは、パーティションやレプリカの数を含む、トピックの設定に使用されます。

KafkaTopic の完全なスキーマは、「KafkaTopic スキーマ参照」で確認できます。

5.2.1.1. トピック処理用の Kafka クラスターの特定

KafkaTopic リソースには、このリソースが属する Kafka クラスターに適した名前 (Kafka リソースの名前から派生) を定義するラベルが含まれています。

以下に例を示します。

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: topic-name-1
  labels:
    strimzi.io/cluster: my-cluster

ラベルは、KafkaTopic リソースを特定し、新しいトピックを作成するために、Topic Operator によって使用されます。また、以降のトピックの処理でも使用されます。

ラベルが Kafka クラスターと一致しない場合、Topic Operator は KafkaTopic を識別できず、トピックは作成されません。

5.2.1.2. Kafka トピックの使用に関する推奨事項

トピックを使用する場合は、整合性を保ちます。常に KafkaTopic リソースで作業を行うか、直接 OpenShift でトピックを扱います。特定のトピックで、両方の方法を頻繁に切り替えないでください。

トピックの性質を反映するトピック名を使用し、後で名前を変更できないことに注意してください。

Kafka でトピックを作成する場合は、有効な OpenShift リソース名である名前を使用します。それ以外の場合は、Topic Operator は対応する KafkaTopic を OpenShift ルールに準じた名前で作成する必要があります。

注記

OpenShift の識別子および名前の推奨事項については、OpenShift コミュニティーの記事「Identifiers and Names」を参照してください。

5.2.1.3. Kafka トピックの命名規則

Kafka と OpenShift では、Kafka と KafkaTopic.metadata.name でのトピックの命名にそれぞれ独自の検証ルールを適用します。トピックごとに有効な名前があり、他のトピックには無効です。

spec.topicName プロパティーを使用すると、OpenShift の Kafka トピックでは無効な名前を使用して、Kafka で有効なトピックを作成できます。

spec.topicName プロパティーは Kafka の命名検証ルールを継承します。

  • 249 文字を超える名前は使用できません。
  • Kafka トピックの有効な文字は ASCII 英数字、._、および - です。
  • 名前を . または .. にすることはできませんが、.exampleTopic..exampleTopic のように名前で使用できます。

spec.topicName は変更しないでください。

以下に例を示します。

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: topic-name-1
spec:
  topicName: topicName-1 1
  # ...
1
OpenShift では大文字は無効です。

上記は下記のように変更できません。

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: topic-name-1
spec:
  topicName: name-2
  # ...
注記

Kafka Streams など一部の Kafka クライアントアプリケーションは、プログラムを使用して Kafka でトピックを作成できます。これらのトピックに、OpenShift リソース名としては無効な名前が付いている場合、Topic Operator はそれらのトピックに Kafka 名に基づく有効な metadata.name を付けます。無効な文字が置き換えられ、ハッシュが名前に追加されます。以下に例を示します。

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: mytopic---c55e57fe2546a33f9e603caf57165db4072e827e
spec:
  topicName: myTopic
  # ...

5.2.2. Topic Operator のトピックストア

Topic Operator は Kafka を使用して、トピック設定をキーと値のペアとして記述するトピックメタデータを保存します。トピックストアは、Kafka トピックを使用して状態を永続化する Kafka Streams のキーバリューメカニズムを基にしています。

トピックメタデータはインメモリーでキャッシュされ、Topic Operator 内にてローカルでアクセスされます。ローカルのインメモリーキャッシュに適用される操作からの更新は、ディスク上のバックアップトピックストアに永続化されます。トピックストアは、Kafka トピックまたは OpenShift KafkaTopic カスタムリソースからの更新と継続的に同期されます。操作は、このような方法で設定されたトピックストアで迅速に処理されますが、インメモリーキャッシュがクラッシュした場合は、永続ストレージから自動的にデータが再入力されます。

5.2.2.1. 内部トピックストアトピック

内部トピックは、トピックストアでのトピックメタデータの処理をサポートします。

__strimzi_store_topic
トピックメタデータを保存するための入力トピック
__strimzi-topic-operator-kstreams-topic-store-changelog
圧縮されたトピックストア値のログの維持
警告

これらのトピックは、Topic Operator の実行に不可欠であるため、削除しないでください。

5.2.2.2. ZooKeeper からのトピックメタデータの移行

これまでのリリースの AMQ Streams では、トピックメタデータは ZooKeeper に保存されていました。新しいプロセスによってこの要件は除外されたため、メタデータは Kafka クラスターに取り込まれ、Topic Operator の制御下となります。

AMQ Streams 1.7 にアップグレードする場合、Topic Operator によってトピックストアが制御されるようにシームレスに移行されます。メタデータは ZooKeeper から検出および移行され、古いストアは削除されます。

5.2.2.3. ZooKeeper を使用してトピックメタデータを保存する AMQ Streams バージョンへのダウングレード

トピックメタデータの保存に ZooKeeper を使用する 0.22 より前のバージョンの AMQ Streams に戻す場合でも、Cluster Operator を前のバージョンにダウングレードしてから、Kafka ブローカーおよびクライアントアプリケーションを前の Kafka バージョンにダウングレードします。

ただし、kafka-admin コマンドを使用して Kafka クラスターのブートストラップアドレスを指定し、トピックストア用に作成されたトピックを削除する必要もあります。以下に例を示します。

oc run kafka-admin -ti --image=registry.redhat.io/amq7/amq-streams-kafka-27-rhel7:1.7.0 --rm=true --restart=Never -- ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic __strimzi-topic-operator-kstreams-topic-store-changelog --delete && ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic __strimzi_store_topic --delete

このコマンドは、Kafka クラスターへのアクセスに使用されるリスナーおよび認証のタイプに対応している必要があります。

Topic Operator は、Kafka のトピックの状態から ZooKeeper トピックメタデータを再構築します。

5.2.2.4. Topic Operator トピックのレプリケーションおよびスケーリング

Topic Operator によって管理されるトピックには、トピックレプリケーション係数を 3 に設定し、最低でも 2 つの In-Sync レプリカを設定することが推奨されます。

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: my-topic
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 1 1
  replicas: 3 2
  config:
    min.insync.replicas=2 3
  #...
1
トピックのパーティション数。通常、1 つのパーティションで十分です。
2
レプリカトピックパーティションの数。現在、これを KafkaTopic リソースで変更できませんが、kafka-reassign-partitions.sh ツールを使用して変更できます。
3
メッセージが正常に書き込まれる必要があるレプリカパーティションの最小数。この条件を満たさない場合は例外が発生します。
注記

In-Sync レプリカは、プロデューサーアプリケーションの acks 設定とともに使用されます。acks 設定は、メッセージの正常受信が確認される前に、メッセージがレプリケートされなければならないフォロワーパーティションの数を決定します。Topic Operator は acks=all で実行されます。すべての In-Sync レプリカによってメッセージが確認される必要があります。

ブローカーを追加または削除して Kafka クラスターをスケーリングする場合、レプリケーション係数設定は変更されず、レプリカは自動的に再割り当てされません。ただし、kafka-reassign-partitions.sh ツールを使用してレプリケーション係数を変更し、レプリカをブローカーに手動で再割り当てすることができます。

また、AMQ Streams の Cruise Control の統合ではトピックのレプリケーション係数を変更することはできませんが、Kafka をリバランスするために生成された最適化プロポーザルには、パーティションレプリカを転送し、パーティションリーダーを変更するコマンドが含まれます。

5.2.2.5. トピック変更の処理

Topic Operator にとって解決しなければならない基本的な問題として、信頼できる唯一の情報源 (SSOT: single source of truth) がないことがあります。KafkaTopic リソースと Kafka トピックの両方とも、Topic Operator に関係なく変更される可能性があります面倒なことに、Topic Operator は KafkaTopic リソースと Kafka トピックで変更を常にリアルタイムで監視できるとは限りません。たとえば、Topic Operator が停止した場合などがこれに該当します。

これを解決するために、Topic Operator はトピックストアの各トピックに関する情報を維持します。Kafka クラスターまたは OpenShift で変更が生じると、他のシステムの状態とトピックストアの両方を確認し、すべての同期が保たれるように何を変更する必要があるかを判断します。同じことが Topic Operator の起動時に必ず実行され、また Topic Operator の稼働中にも定期的に行われます。

たとえば、Topic Operator が実行されていないときに my-topic という KafkaTopic が作成された場合を考えてみましょう。Topic Operator の起動時、トピックストアには my-topic に関する情報が含まれないため、最後に実行された後に KafkaTopic が作成されたと推測できます。Topic Operator によって my-topic に対応するトピックが作成され、さらにトピックストアに my-topic のメタデータも格納されます。

Kafka トピック設定を更新するか、KafkaTopic カスタムリソースで変更を適用する場合、Kafka クラスターの調整後にトピックストアが更新されます。

また、トピックストアは Kafka トピックでトピック設定が変更され、かつ OpenShift KafkaTopic カスタムリソースで更新される場合に、変更が矛盾しない限り、Topic Operator による管理を可能にします。たとえば、同じトピック設定キーに変更を加えることはできますが、別の値への変更のみが可能です。変更に矛盾がある場合、Kafka の設定が優先され、KafkaTopic はそれに応じて更新されます。

注記

KafkaTopic リソースを使用すると、oc delete -f KAFKA-TOPIC-CONFIG-FILE コマンドを使用してトピックを削除することもできます。これを可能にするには、Kafka リソースの spec.kafka.configdelete.topic.enabletrue (デフォルト) に設定する必要があります。

5.2.3. Kafka トピックの設定

KafkaTopic リソースのプロパティーを使用して Kafka トピックを設定します。

oc apply を使用すると、トピックを作成または編集できます。oc delete を使用すると、既存のトピックを削除できます。

以下に例を示します。

  • oc apply -f <topic-config-file>
  • oc delete KafkaTopic <topic-name>

この手順では、10 個のパーティションと 2 つのレプリカがあるトピックを作成する方法を説明します。

作業を開始する前の注意事項

以下を考慮してから変更を行うことが重要になります。

  • Kafka は KafkaTopic リソースによる以下の変更をサポートしません

    • spec.topicName を使用したトピック名の変更
    • spec.partitions を使用したパーティションサイズの減少
  • spec.replicas を使用して最初に指定したレプリカの数を変更することはできません。
  • キーのあるトピックの spec.partitions を増やすと、レコードをパーティション化する方法が変更されます。これは、トピックがセマンティックパーティションを使用するとき、特に問題になる場合があります。

前提条件

手順

  1. 作成する KafkaTopic が含まれるファイルを準備します。

    KafkaTopic の例

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaTopic
    metadata:
      name: orders
      labels:
        strimzi.io/cluster: my-cluster
    spec:
      partitions: 10
      replicas: 2

    ヒント

    トピックを変更する場合、現行バージョンのリソースは、oc get kafkatopic orders -o yaml を使用して取得できます。

  2. OpenShift で KafkaTopic リソースを作成します。

    oc apply -f TOPIC-CONFIG-FILE

5.2.4. リソース要求および制限のある Topic Operator の設定

CPU やメモリーなどのリソースを Topic Operator に割り当て、Topic Operator が消費できるリソースの量に制限を設定できます。

前提条件

  • Cluster Operator が稼働している必要があります。

手順

  1. 必要に応じてエディターで Kafka クラスター設定を更新します。

    oc edit kafka MY-CLUSTER
  2. Kafka リソースの spec.entityOperator.topicOperator.resources プロパティーで、Topic Operator のリソース要求および制限を設定します。

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    spec:
      # Kafka and ZooKeeper sections...
      entityOperator:
        topicOperator:
          resources:
            requests:
              cpu: "1"
              memory: 500Mi
            limits:
              cpu: "1"
              memory: 500Mi
  3. 新しい設定を適用してリソースを作成または更新します。

    oc apply -f KAFKA-CONFIG-FILE

5.3. User Operator の使用

KafkaUser リソースを使用してユーザーを作成、編集、または削除する場合、User Operator によって変更が確実に Kafka クラスターで反映されます。

OpenShift での AMQ Streams のデプロイおよびアップグレード』には、User Operator をデプロイする手順が記載されています。

スキーマの詳細は、「KafkaUser スキーマ参照」を参照してください。

Kafka へのアクセスの認証および承認

KafkaUser を使用して、Kafka にアクセスするために特定のクライアントが使用する認証および承認メカニズムを有効にします。

KafkUser を使用してユーザーを管理し、Kafka ブローカーへのアクセスをセキュアにする方法の詳細は、「Kafka ブローカーへのアクセスのセキュア化」を参照してください。

5.3.1. リソース要求および制限のある User Operator の設定

CPU やメモリーなどのリソースを User Operator に割り当て、User Operator が消費できるリソースの量に制限を設定できます。

前提条件

  • Cluster Operator が稼働している必要があります。

手順

  1. 必要に応じてエディターで Kafka クラスター設定を更新します。

    oc edit kafka MY-CLUSTER
  2. Kafka リソースの spec.entityOperator.userOperator.resources プロパティーで、User Operator のリソース要求および制限を設定します。

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    spec:
      # Kafka and ZooKeeper sections...
      entityOperator:
        userOperator:
          resources:
            requests:
              cpu: "1"
              memory: 500Mi
            limits:
              cpu: "1"
              memory: 500Mi

    ファイルを保存し、エディターを終了します。Cluster Operator によって変更が自動的に適用されます。

5.4. Prometheus メトリクスを使用した Operator の監視

AMQ Streams の operator は Prometheus メトリクスを公開します。メトリクスは自動で有効になり、以下の情報が含まれます。

  • 調整の数
  • operator が処理しているカスタムリソースの数
  • 調整の期間
  • operator からの JVM メトリクス

この他に、Grafana ダッシュボードのサンプルが提供されます。

詳細は、『OpenShift での AMQ Streams のデプロイおよびアップグレード』の「AMQ Streams のメトリクスおよびダッシュボードの設定」を参照してください。