Chapter 4. Operators

4.1. Cluster Operator

Use the Cluster Operator to deploy a Kafka cluster and other Kafka components.

For information on the deployment options available for Kafka, see Section 3.1, “Kafka cluster configuration”.

Note

On OpenShift, a Kafka Connect deployment can incorporate a Source2Image feature to provide a convenient way to add connectors.

4.1.1. Cluster Operator

AMQ Streams uses the Cluster Operator to deploy and manage clusters for:

  • Kafka (including ZooKeeper, Entity Operator and Kafka Exporter)
  • Kafka Connect
  • Kafka MirrorMaker
  • Kafka Bridge

Custom resources are used to deploy the clusters.

For example, to deploy a Kafka cluster:

  • A Kafka resource with the cluster configuration is created within the OpenShift cluster.
  • The Cluster Operator deploys a corresponding Kafka cluster, based on what is declared in the Kafka resource.

The Cluster Operator can also deploy (through configuration of the Kafka resource):

  • A Topic Operator to provide operator-style topic management through KafkaTopic custom resources
  • A User Operator to provide operator-style user management through KafkaUser custom resources

The Topic Operator and User Operator function within the Entity Operator on deployment.

[Figure: Example architecture for the Cluster Operator]

4.1.2. Watch options for a Cluster Operator deployment

When the Cluster Operator is running, it starts to watch for updates of Kafka resources.

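Depending on the deployment, the Cluster Operator can watch Kafka resources from:

  • A single namespace (Section 4.1.3)
  • Multiple namespaces (Section 4.1.4)
  • All namespaces (Section 4.1.5)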

Note

AMQ Streams provides example YAML files to make the deployment process easier.

The Cluster Operator watches for changes to the following resources:

  • Kafka for the Kafka cluster.
  • KafkaConnect for the Kafka Connect cluster.
  • KafkaConnectS2I for the Kafka Connect cluster with Source2Image support.
  • KafkaConnector for creating and managing connectors in a Kafka Connect cluster.
  • KafkaMirrorMaker for the Kafka MirrorMaker instance.
  • KafkaBridge for the Kafka Bridge instance.

When one of these resources is created in the OpenShift cluster, the operator gets the cluster description from the resource and starts creating a new cluster for the resource by creating the necessary OpenShift resources, such as StatefulSets, Services and ConfigMaps.

Each time a Kafka resource is updated, the operator performs corresponding updates on the OpenShift resources that make up the cluster for the resource.

Resources are either patched, or deleted and then recreated, so that the cluster reflects the desired state declared in the resource. This operation might cause a rolling update, which might lead to service disruption.

When a resource is deleted, the operator undeploys the cluster and deletes all related OpenShift resources.

4.1.3. Deploying the Cluster Operator to watch a single namespace

Prerequisites

  • This procedure requires use of an OpenShift user account that can create CustomResourceDefinitions, ClusterRoles and ClusterRoleBindings. Use of Role-Based Access Control (RBAC) in the OpenShift cluster usually means that permission to create, edit, and delete these resources is limited to OpenShift cluster administrators, such as system:admin.
  • Modify the installation files according to the namespace the Cluster Operator is going to be installed in.

    On Linux, use:

    sed -i 's/namespace: .*/namespace: my-namespace/' install/cluster-operator/*RoleBinding*.yaml

    On macOS, use:

    sed -i '' 's/namespace: .*/namespace: my-namespace/' install/cluster-operator/*RoleBinding*.yaml

Procedure

  • Deploy the Cluster Operator:

    oc apply -f install/cluster-operator -n my-namespace
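
The namespace substitution from the prerequisites can be previewed before any installation file is edited. The following is a minimal sketch, assuming a POSIX shell. The rewrite_namespace helper and the sample RoleBinding fragment are hypothetical and are not part of the AMQ Streams installation files. The helper applies the same sed expression to standard input, so nothing is modified in place:

```shell
#!/bin/sh
# Preview the namespace substitution without touching the real
# install/cluster-operator/*RoleBinding*.yaml files.

rewrite_namespace() {
  # $1 = target namespace. Reads YAML on stdin, writes the result to stdout.
  # Same expression as in the procedure, but without -i (no in-place edit).
  sed "s/namespace: .*/namespace: $1/"
}

# Hypothetical, trimmed-down RoleBinding subject used only for illustration.
sample='subjects:
- kind: ServiceAccount
  name: strimzi-cluster-operator
  namespace: myproject'

printf '%s\n' "$sample" | rewrite_namespace my-namespace
```

Running the sketch prints the fragment with namespace: my-namespace in place of namespace: myproject.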

4.1.4. Deploying the Cluster Operator to watch multiple namespaces

Prerequisites

  • This procedure requires use of an OpenShift user account that can create CustomResourceDefinitions, ClusterRoles and ClusterRoleBindings. Use of Role-Based Access Control (RBAC) in the OpenShift cluster usually means that permission to create, edit, and delete these resources is limited to OpenShift cluster administrators, such as system:admin.
  • Edit the installation files according to the namespace the Cluster Operator is going to be installed in.

    On Linux, use:

    sed -i 's/namespace: .*/namespace: my-namespace/' install/cluster-operator/*RoleBinding*.yaml

    On macOS, use:

    sed -i '' 's/namespace: .*/namespace: my-namespace/' install/cluster-operator/*RoleBinding*.yaml

Procedure

  1. Edit the file install/cluster-operator/050-Deployment-strimzi-cluster-operator.yaml and, in the environment variable STRIMZI_NAMESPACE, list all the namespaces where the Cluster Operator should watch for resources. For example:

    apiVersion: apps/v1
    kind: Deployment
    spec:
      # ...
      template:
        spec:
          serviceAccountName: strimzi-cluster-operator
          containers:
          - name: strimzi-cluster-operator
            image: registry.redhat.io/amq7/amq-streams-rhel7-operator:1.4.0
            imagePullPolicy: IfNotPresent
            env:
            - name: STRIMZI_NAMESPACE
              value: watched-namespace-1,watched-namespace-2,watched-namespace-3
  2. For all namespaces which should be watched by the Cluster Operator (watched-namespace-1, watched-namespace-2, watched-namespace-3 in the above example), install the RoleBindings. Replace the watched-namespace with the namespace used in the previous step.

    This can be done using oc apply:

    oc apply -f install/cluster-operator/020-RoleBinding-strimzi-cluster-operator.yaml -n watched-namespace
    oc apply -f install/cluster-operator/031-RoleBinding-strimzi-cluster-operator-entity-operator-delegation.yaml -n watched-namespace
    oc apply -f install/cluster-operator/032-RoleBinding-strimzi-cluster-operator-topic-operator-delegation.yaml -n watched-namespace
  3. Deploy the Cluster Operator.

    This can be done using oc apply:

    oc apply -f install/cluster-operator -n my-namespace
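
Step 2 has to be repeated for every namespace listed in STRIMZI_NAMESPACE. The following dry-run sketch, assuming a POSIX shell, splits the same comma-separated list and prints the three oc apply commands for each namespace rather than running them. The print_rolebinding_commands helper is a hypothetical name used only for illustration, and the list must not contain spaces around the commas:

```shell
#!/bin/sh
# Print (do not run) the RoleBinding commands for every watched namespace.

print_rolebinding_commands() (
  # $1 = comma-separated namespace list, as used for STRIMZI_NAMESPACE.
  # The function body runs in a subshell so the IFS change does not leak out.
  IFS=','
  set -f   # disable globbing while the list is split
  for ns in $1; do
    for file in \
      020-RoleBinding-strimzi-cluster-operator.yaml \
      031-RoleBinding-strimzi-cluster-operator-entity-operator-delegation.yaml \
      032-RoleBinding-strimzi-cluster-operator-topic-operator-delegation.yaml
    do
      printf 'oc apply -f install/cluster-operator/%s -n %s\n' "$file" "$ns"
    done
  done
)

print_rolebinding_commands "watched-namespace-1,watched-namespace-2,watched-namespace-3"
```

After reviewing the printed commands, they can be run by piping the output to sh.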

4.1.5. Deploying the Cluster Operator to watch all namespaces

You can configure the Cluster Operator to watch AMQ Streams resources across all namespaces in your OpenShift cluster. When running in this mode, the Cluster Operator automatically manages clusters in any new namespaces that are created.

Prerequisites

  • This procedure requires use of an OpenShift user account that can create CustomResourceDefinitions, ClusterRoles and ClusterRoleBindings. Use of Role-Based Access Control (RBAC) in the OpenShift cluster usually means that permission to create, edit, and delete these resources is limited to OpenShift cluster administrators, such as system:admin.
  • Your OpenShift cluster is running.

Procedure

  1. Configure the Cluster Operator to watch all namespaces:

    1. Edit the 050-Deployment-strimzi-cluster-operator.yaml file.
    2. Set the value of the STRIMZI_NAMESPACE environment variable to *.

      apiVersion: apps/v1
      kind: Deployment
      spec:
        # ...
        template:
          spec:
            # ...
            serviceAccountName: strimzi-cluster-operator
            containers:
            - name: strimzi-cluster-operator
              image: registry.redhat.io/amq7/amq-streams-rhel7-operator:1.4.0
              imagePullPolicy: IfNotPresent
              env:
              - name: STRIMZI_NAMESPACE
                value: "*"
              # ...
  2. Create ClusterRoleBindings that grant the Cluster Operator cluster-wide access to all namespaces.

    Use the oc create clusterrolebinding command:

    oc create clusterrolebinding strimzi-cluster-operator-namespaced --clusterrole=strimzi-cluster-operator-namespaced --serviceaccount my-namespace:strimzi-cluster-operator
    oc create clusterrolebinding strimzi-cluster-operator-entity-operator-delegation --clusterrole=strimzi-entity-operator --serviceaccount my-namespace:strimzi-cluster-operator
    oc create clusterrolebinding strimzi-cluster-operator-topic-operator-delegation --clusterrole=strimzi-topic-operator --serviceaccount my-namespace:strimzi-cluster-operator

    Replace my-namespace with the namespace in which you want to install the Cluster Operator.

  3. Deploy the Cluster Operator to your OpenShift cluster.

    Use the oc apply command:

    oc apply -f install/cluster-operator -n my-namespace
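
The three deployment procedures differ mainly in the value given to STRIMZI_NAMESPACE. The following sketch, assuming a POSIX shell, classifies a value according to the rules stated for that variable: unset, empty, or * means all namespaces, and a comma-separated list means multiple namespaces. The watch_mode helper is hypothetical and only illustrates the rules; it is not how the Cluster Operator itself is implemented:

```shell
#!/bin/sh
# Classify a STRIMZI_NAMESPACE value into the watch mode it selects.

watch_mode() {
  # $1 = value of STRIMZI_NAMESPACE (may be empty)
  case "$1" in
    ''|'*') echo "all namespaces" ;;       # unset, empty, or *
    *,*)    echo "multiple namespaces" ;;  # comma-separated list
    *)      echo "single namespace" ;;     # exactly one name
  esac
}

watch_mode "watched-namespace-1,watched-namespace-2"
```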

4.1.6. Reconciliation

Although the operator reacts to all notifications about the desired cluster resources received from the OpenShift cluster, if the operator is not running, or if a notification is not received for any reason, the desired resources will get out of sync with the state of the running OpenShift cluster.

To handle failovers properly, the Cluster Operator runs a periodic reconciliation process that compares the state of the desired resources with the current cluster deployments and brings them back to a consistent state. You can set the time interval for the periodic reconciliations using the STRIMZI_FULL_RECONCILIATION_INTERVAL_MS variable.

4.1.7. Cluster Operator Configuration

The Cluster Operator can be configured through the following supported environment variables:

STRIMZI_NAMESPACE

A comma-separated list of namespaces that the operator should operate in. When not set, set to an empty string, or set to *, the Cluster Operator operates in all namespaces. The Cluster Operator deployment might use the OpenShift Downward API to set this automatically to the namespace the Cluster Operator is deployed in. See the example below:

env:
  - name: STRIMZI_NAMESPACE
    valueFrom:
      fieldRef:
        fieldPath: metadata.namespace

STRIMZI_FULL_RECONCILIATION_INTERVAL_MS

Optional, default is 120000 ms. The interval between periodic reconciliations, in milliseconds.

STRIMZI_LOG_LEVEL

Optional, default INFO. The level for printing logging messages. The value can be set to ERROR, WARNING, INFO, DEBUG, or TRACE.

STRIMZI_OPERATION_TIMEOUT_MS

Optional, default 300000 ms. The timeout for internal operations, in milliseconds. This value should be increased when using AMQ Streams on clusters where regular OpenShift operations take longer than usual (because of slow downloading of Docker images, for example).

STRIMZI_KAFKA_IMAGES

Required. This provides a mapping from Kafka version to the corresponding Docker image containing a Kafka broker of that version. The required syntax is whitespace or comma separated <version>=<image> pairs. For example, 2.3.0=registry.redhat.io/amq7/amq-streams-kafka-23-rhel7:1.4.0, 2.4.0=registry.redhat.io/amq7/amq-streams-kafka-24-rhel7:1.4.0. This is used when a Kafka.spec.kafka.version property is specified but not the Kafka.spec.kafka.image, as described in Section 3.1.19, “Container images”.

STRIMZI_DEFAULT_KAFKA_INIT_IMAGE

Optional, default registry.redhat.io/amq7/amq-streams-rhel7-operator:1.4.0. The image name to use as default for the init container started before the broker for initial configuration work (that is, rack support), if no image is specified as the kafka-init-image, as described in Section 3.1.19, “Container images”.

STRIMZI_DEFAULT_TLS_SIDECAR_KAFKA_IMAGE

Optional, default registry.redhat.io/amq7/amq-streams-kafka-24-rhel7:1.4.0. The image name to use as the default when deploying the sidecar container which provides TLS support for Kafka, if no image is specified as the Kafka.spec.kafka.tlsSidecar.image, as described in Section 3.1.19, “Container images”.

STRIMZI_DEFAULT_TLS_SIDECAR_ZOOKEEPER_IMAGE

Optional, default registry.redhat.io/amq7/amq-streams-kafka-24-rhel7:1.4.0. The image name to use as the default when deploying the sidecar container which provides TLS support for ZooKeeper, if no image is specified as the Kafka.spec.zookeeper.tlsSidecar.image, as described in Section 3.1.19, “Container images”.

STRIMZI_KAFKA_CONNECT_IMAGES

Required. This provides a mapping from the Kafka version to the corresponding Docker image containing Kafka Connect of that version. The required syntax is whitespace or comma separated <version>=<image> pairs. For example, 2.3.0=registry.redhat.io/amq7/amq-streams-kafka-23-rhel7:1.4.0, 2.4.0=registry.redhat.io/amq7/amq-streams-kafka-24-rhel7:1.4.0. This is used when a KafkaConnect.spec.version property is specified but not the KafkaConnect.spec.image, as described in Section 3.2.11, “Container images”.

STRIMZI_KAFKA_CONNECT_S2I_IMAGES

Required. This provides a mapping from the Kafka version to the corresponding Docker image containing Kafka Connect of that version. The required syntax is whitespace or comma separated <version>=<image> pairs. For example, 2.3.0=registry.redhat.io/amq7/amq-streams-kafka-23-rhel7:1.4.0, 2.4.0=registry.redhat.io/amq7/amq-streams-kafka-24-rhel7:1.4.0. This is used when a KafkaConnectS2I.spec.version property is specified but not the KafkaConnectS2I.spec.image, as described in Section 3.3.11, “Container images”.

STRIMZI_KAFKA_MIRROR_MAKER_IMAGES

Required. This provides a mapping from the Kafka version to the corresponding Docker image containing Kafka MirrorMaker of that version. The required syntax is whitespace or comma separated <version>=<image> pairs. For example, 2.3.0=registry.redhat.io/amq7/amq-streams-kafka-23-rhel7:1.4.0, 2.4.0=registry.redhat.io/amq7/amq-streams-kafka-24-rhel7:1.4.0. This is used when a KafkaMirrorMaker.spec.version property is specified but not the KafkaMirrorMaker.spec.image, as described in Section 3.4.2.14, “Container images”.

STRIMZI_DEFAULT_TOPIC_OPERATOR_IMAGE

Optional, default registry.redhat.io/amq7/amq-streams-rhel7-operator:1.4.0. The image name to use as the default when deploying the Topic Operator, if no image is specified as the Kafka.spec.entityOperator.topicOperator.image of the Kafka resource, as described in Section 3.1.19, “Container images”.

STRIMZI_DEFAULT_USER_OPERATOR_IMAGE

Optional, default registry.redhat.io/amq7/amq-streams-rhel7-operator:1.4.0. The image name to use as the default when deploying the User Operator, if no image is specified as the Kafka.spec.entityOperator.userOperator.image of the Kafka resource, as described in Section 3.1.19, “Container images”.

STRIMZI_DEFAULT_TLS_SIDECAR_ENTITY_OPERATOR_IMAGE

Optional, default registry.redhat.io/amq7/amq-streams-kafka-24-rhel7:1.4.0. The image name to use as the default when deploying the sidecar container which provides TLS support for the Entity Operator, if no image is specified as the Kafka.spec.entityOperator.tlsSidecar.image, as described in Section 3.1.19, “Container images”.

STRIMZI_IMAGE_PULL_POLICY

Optional. The ImagePullPolicy which will be applied to containers in all pods managed by the AMQ Streams Cluster Operator. The valid values are Always, IfNotPresent, and Never. If not specified, the OpenShift defaults will be used. Changing the policy will result in a rolling update of all your Kafka, Kafka Connect, and Kafka MirrorMaker clusters.

STRIMZI_IMAGE_PULL_SECRETS

Optional. A comma-separated list of Secret names. The secrets referenced here contain the credentials to the container registries where the container images are pulled from. The secrets are used in the imagePullSecrets field for all Pods created by the Cluster Operator. Changing this list results in a rolling update of all your Kafka, Kafka Connect, and Kafka MirrorMaker clusters.

STRIMZI_KUBERNETES_VERSION

Optional. Overrides the OpenShift version information detected from the API server. See the example below:

env:
  - name: STRIMZI_KUBERNETES_VERSION
    value: |
           major=1
           minor=16
           gitVersion=v1.16.2
           gitCommit=c97fe5036ef3df2967d086711e6c0c405941e14b
           gitTreeState=clean
           buildDate=2019-10-15T19:09:08Z
           goVersion=go1.12.10
           compiler=gc
           platform=linux/amd64
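
The <version>=<image> mapping syntax used by STRIMZI_KAFKA_IMAGES and the other *_IMAGES variables can be checked with a short script. The following is a sketch, assuming a POSIX shell. The lookup_image helper is hypothetical and only illustrates the whitespace or comma separated syntax described above; it does not reproduce how the Cluster Operator parses the value:

```shell
#!/bin/sh
# Look up the image for a Kafka version in a <version>=<image> mapping.

lookup_image() {
  # $1 = mapping string (pairs separated by commas and/or whitespace)
  # $2 = Kafka version to look up
  # Put each pair on its own line, then split each line at the first "=".
  printf '%s\n' "$1" | tr ', \t' '\n\n\n' | while IFS='=' read -r version image; do
    if [ "$version" = "$2" ]; then
      printf '%s\n' "$image"
    fi
  done
}

images='2.3.0=registry.redhat.io/amq7/amq-streams-kafka-23-rhel7:1.4.0, 2.4.0=registry.redhat.io/amq7/amq-streams-kafka-24-rhel7:1.4.0'
lookup_image "$images" 2.4.0
```

A version that is missing from the mapping produces no output, which makes it easy to spot a Kafka version that has no image configured.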

4.1.8. Role-Based Access Control (RBAC)

4.1.8.1. Provisioning Role-Based Access Control (RBAC) for the Cluster Operator

For the Cluster Operator to function, it needs permission within the OpenShift cluster to interact with resources such as Kafka, KafkaConnect, and so on, as well as the managed resources, such as ConfigMaps, Pods, Deployments, StatefulSets, Services, and so on. Such permission is described in terms of OpenShift role-based access control (RBAC) resources:

  • ServiceAccount,
  • Role and ClusterRole,
  • RoleBinding and ClusterRoleBinding.

In addition to running under its own ServiceAccount with a ClusterRoleBinding, the Cluster Operator manages some RBAC resources for the components that need access to OpenShift resources.

OpenShift also includes privilege escalation protections that prevent components operating under one ServiceAccount from granting other ServiceAccounts privileges that the granting ServiceAccount does not have. Because the Cluster Operator must be able to create the ClusterRoleBindings and RoleBindings needed by resources it manages, the Cluster Operator must also have those same privileges.

4.1.8.2. Delegated privileges

When the Cluster Operator deploys resources for a desired Kafka resource, it also creates ServiceAccounts, RoleBindings, and ClusterRoleBindings, as follows:

  • The Kafka broker pods use a ServiceAccount called cluster-name-kafka

    • When the rack feature is used, the strimzi-cluster-name-kafka-init ClusterRoleBinding is used to grant this ServiceAccount access to the nodes within the cluster via a ClusterRole called strimzi-kafka-broker
    • When the rack feature is not used no binding is created
  • The ZooKeeper pods use a ServiceAccount called cluster-name-zookeeper
  • The Entity Operator pod uses a ServiceAccount called cluster-name-entity-operator

    • The Topic Operator produces OpenShift events with status information, so the ServiceAccount is bound to a ClusterRole called strimzi-entity-operator which grants this access via the strimzi-entity-operator RoleBinding
  • The pods for KafkaConnect and KafkaConnectS2I resources use a ServiceAccount called cluster-name-cluster-connect
  • The pods for KafkaMirrorMaker use a ServiceAccount called cluster-name-mirror-maker
  • The pods for KafkaBridge use a ServiceAccount called cluster-name-bridge

4.1.8.3. ServiceAccount

The Cluster Operator is best run using a ServiceAccount:

Example ServiceAccount for the Cluster Operator

apiVersion: v1
kind: ServiceAccount
metadata:
  name: strimzi-cluster-operator
  labels:
    app: strimzi

The Deployment of the operator then needs to specify this in its spec.template.spec.serviceAccountName:

Partial example of Deployment for the Cluster Operator

apiVersion: apps/v1
kind: Deployment
metadata:
  name: strimzi-cluster-operator
  labels:
    app: strimzi
spec:
  replicas: 1
  selector:
    matchLabels:
      name: strimzi-cluster-operator
      strimzi.io/kind: cluster-operator
  template:
    # ...
    spec:
      serviceAccountName: strimzi-cluster-operator
      # ...

Note the spec.template.spec.serviceAccountName property, where the strimzi-cluster-operator ServiceAccount is specified.

4.1.8.4. ClusterRoles

The Cluster Operator needs to operate using ClusterRoles that give access to the necessary resources. Depending on the OpenShift cluster setup, a cluster administrator might be needed to create the ClusterRoles.

Note

Cluster administrator rights are only needed for the creation of the ClusterRoles. The Cluster Operator will not run under the cluster admin account.

The ClusterRoles follow the principle of least privilege and contain only those privileges needed by the Cluster Operator to operate Kafka, Kafka Connect, and ZooKeeper clusters. The first set of assigned privileges allows the Cluster Operator to manage OpenShift resources such as StatefulSets, Deployments, Pods, and ConfigMaps.

The Cluster Operator uses ClusterRoles to grant permission at the namespace-scoped resources level and cluster-scoped resources level:

ClusterRole with namespaced resources for the Cluster Operator

apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: strimzi-cluster-operator-namespaced
  labels:
    app: strimzi
rules:
- apiGroups:
  - ""
  resources:
  - serviceaccounts
  verbs:
  - get
  - create
  - delete
  - patch
  - update
- apiGroups:
  - rbac.authorization.k8s.io
  resources:
  - rolebindings
  verbs:
  - get
  - create
  - delete
  - patch
  - update
- apiGroups:
  - ""
  resources:
  - configmaps
  verbs:
  - get
  - list
  - watch
  - create
  - delete
  - patch
  - update
- apiGroups:
  - kafka.strimzi.io
  resources:
  - kafkas
  - kafkas/status
  - kafkaconnects
  - kafkaconnects/status
  - kafkaconnects2is
  - kafkaconnects2is/status
  - kafkaconnectors
  - kafkaconnectors/status
  - kafkamirrormakers
  - kafkamirrormakers/status
  - kafkabridges
  - kafkabridges/status
  - kafkamirrormaker2s
  - kafkamirrormaker2s/status
  verbs:
  - get
  - list
  - watch
  - create
  - delete
  - patch
  - update
- apiGroups:
  - ""
  resources:
  - pods
  verbs:
  - get
  - list
  - watch
  - delete
- apiGroups:
  - ""
  resources:
  - services
  verbs:
  - get
  - list
  - watch
  - create
  - delete
  - patch
  - update
- apiGroups:
  - ""
  resources:
  - endpoints
  verbs:
  - get
  - list
  - watch
- apiGroups:
  - extensions
  resources:
  - deployments
  - deployments/scale
  - replicasets
  verbs:
  - get
  - list
  - watch
  - create
  - delete
  - patch
  - update
- apiGroups:
  - apps
  resources:
  - deployments
  - deployments/scale
  - deployments/status
  - statefulsets
  - replicasets
  verbs:
  - get
  - list
  - watch
  - create
  - delete
  - patch
  - update
- apiGroups:
  - ""
  resources:
  - events
  verbs:
  - create
- apiGroups:
  - extensions
  resources:
  - replicationcontrollers
  verbs:
  - get
  - list
  - watch
  - create
  - delete
  - patch
  - update
- apiGroups:
  - apps.openshift.io
  resources:
  - deploymentconfigs
  - deploymentconfigs/scale
  - deploymentconfigs/status
  - deploymentconfigs/finalizers
  verbs:
  - get
  - list
  - watch
  - create
  - delete
  - patch
  - update
- apiGroups:
  - build.openshift.io
  resources:
  - buildconfigs
  - builds
  verbs:
  - create
  - delete
  - get
  - list
  - patch
  - watch
  - update
- apiGroups:
  - image.openshift.io
  resources:
  - imagestreams
  - imagestreams/status
  verbs:
  - create
  - delete
  - get
  - list
  - watch
  - patch
  - update
- apiGroups:
  - ""
  resources:
  - replicationcontrollers
  verbs:
  - get
  - list
  - watch
  - create
  - delete
  - patch
  - update
- apiGroups:
  - ""
  resources:
  - secrets
  verbs:
  - get
  - list
  - create
  - delete
  - patch
  - update
- apiGroups:
  - extensions
  resources:
  - networkpolicies
  verbs:
  - get
  - list
  - watch
  - create
  - delete
  - patch
  - update
- apiGroups:
  - networking.k8s.io
  resources:
  - networkpolicies
  verbs:
  - get
  - list
  - watch
  - create
  - delete
  - patch
  - update
- apiGroups:
  - route.openshift.io
  resources:
  - routes
  - routes/custom-host
  verbs:
  - get
  - list
  - create
  - delete
  - patch
  - update
- apiGroups:
  - ""
  resources:
  - persistentvolumeclaims
  verbs:
  - get
  - list
  - create
  - delete
  - patch
  - update
- apiGroups:
  - policy
  resources:
  - poddisruptionbudgets
  verbs:
  - get
  - list
  - watch
  - create
  - delete
  - patch
  - update
- apiGroups:
  - extensions
  resources:
  - ingresses
  verbs:
  - get
  - list
  - watch
  - create
  - delete
  - patch
  - update

The second set includes the permissions needed for cluster-scoped resources.

ClusterRole with cluster-scoped resources for the Cluster Operator

apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: strimzi-cluster-operator-global
  labels:
    app: strimzi
rules:
- apiGroups:
  - rbac.authorization.k8s.io
  resources:
  - clusterrolebindings
  verbs:
  - get
  - create
  - delete
  - patch
  - update
  - watch
- apiGroups:
  - storage.k8s.io
  resources:
  - storageclasses
  verbs:
  - get
- apiGroups:
  - ""
  resources:
  - nodes
  verbs:
  - list

The strimzi-kafka-broker ClusterRole represents the access needed by the init container in Kafka pods that is used for the rack feature. As described in the Delegated privileges section, this role is also needed by the Cluster Operator in order to be able to delegate this access.

ClusterRole for the Cluster Operator allowing it to delegate access to OpenShift nodes to the Kafka broker pods

apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: strimzi-kafka-broker
  labels:
    app: strimzi
rules:
- apiGroups:
  - ""
  resources:
  - nodes
  verbs:
  - get

The strimzi-entity-operator ClusterRole represents the access needed by the Topic Operator. As described in the Delegated privileges section, this role is also needed by the Cluster Operator in order to be able to delegate this access.

ClusterRole for the Cluster Operator allowing it to delegate access to events to the Topic Operator

apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: strimzi-entity-operator
  labels:
    app: strimzi
rules:
- apiGroups:
  - kafka.strimzi.io
  resources:
  - kafkatopics
  - kafkatopics/status
  verbs:
  - get
  - list
  - watch
  - create
  - patch
  - update
  - delete
- apiGroups:
  - ""
  resources:
  - events
  verbs:
  - create
- apiGroups:
  - kafka.strimzi.io
  resources:
  - kafkausers
  - kafkausers/status
  verbs:
  - get
  - list
  - watch
  - create
  - patch
  - update
  - delete
- apiGroups:
  - ""
  resources:
  - secrets
  verbs:
  - get
  - list
  - create
  - patch
  - update
  - delete

4.1.8.5. ClusterRoleBindings

The operator needs ClusterRoleBindings and RoleBindings that associate its ClusterRoles with its ServiceAccount. ClusterRoleBindings are needed for ClusterRoles containing cluster-scoped resources.

Example ClusterRoleBinding for the Cluster Operator

apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: strimzi-cluster-operator
  labels:
    app: strimzi
subjects:
- kind: ServiceAccount
  name: strimzi-cluster-operator
  namespace: myproject
roleRef:
  kind: ClusterRole
  name: strimzi-cluster-operator-global
  apiGroup: rbac.authorization.k8s.io

ClusterRoleBindings are also needed for the ClusterRoles needed for delegation:

Example ClusterRoleBinding for the Cluster Operator

apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: strimzi-cluster-operator-kafka-broker-delegation
  labels:
    app: strimzi
subjects:
- kind: ServiceAccount
  name: strimzi-cluster-operator
  namespace: myproject
roleRef:
  kind: ClusterRole
  name: strimzi-kafka-broker
  apiGroup: rbac.authorization.k8s.io

ClusterRoles containing only namespaced resources are bound using RoleBindings only.

Example RoleBindings for the Cluster Operator

apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: strimzi-cluster-operator
  labels:
    app: strimzi
subjects:
- kind: ServiceAccount
  name: strimzi-cluster-operator
  namespace: myproject
roleRef:
  kind: ClusterRole
  name: strimzi-cluster-operator-namespaced
  apiGroup: rbac.authorization.k8s.io

apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: strimzi-cluster-operator-entity-operator-delegation
  labels:
    app: strimzi
subjects:
- kind: ServiceAccount
  name: strimzi-cluster-operator
  namespace: myproject
roleRef:
  kind: ClusterRole
  name: strimzi-entity-operator
  apiGroup: rbac.authorization.k8s.io

4.2. Topic Operator

4.2.1. Topic Operator

The Topic Operator provides a way of managing topics in a Kafka cluster through OpenShift resources.

[Figure: Example architecture for the Topic Operator]

The role of the Topic Operator is to keep a set of KafkaTopic OpenShift resources describing Kafka topics in-sync with corresponding Kafka topics.

Specifically, if a KafkaTopic is:

  • Created, the Topic Operator creates the topic
  • Deleted, the Topic Operator deletes the topic
  • Changed, the Topic Operator updates the topic

Working in the other direction, if a topic is:

  • Created within the Kafka cluster, the Topic Operator creates a KafkaTopic
  • Deleted from the Kafka cluster, the Topic Operator deletes the KafkaTopic
  • Changed in the Kafka cluster, the Topic Operator updates the KafkaTopic

This allows you to declare a KafkaTopic as part of your application’s deployment and the Topic Operator will take care of creating the topic for you. Your application just needs to deal with producing or consuming from the necessary topics.

If the topic is reconfigured or reassigned to different Kafka nodes, the KafkaTopic will always be up to date.

4.2.2. Identifying a Kafka cluster for topic handling

A KafkaTopic resource includes a label that defines the appropriate name of the Kafka cluster (derived from the name of the Kafka resource) to which it belongs.

apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaTopic
metadata:
  name: my-topic
  labels:
    strimzi.io/cluster: my-cluster

The label is used by the Topic Operator to identify the KafkaTopic resource and create a new topic, and also in subsequent handling of the topic.

If the label does not match the Kafka cluster, the Topic Operator cannot identify the KafkaTopic and the topic is not created.
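
Because a mismatched label silently prevents topic creation, it can help to check the label in a manifest before applying it. The following is a minimal sketch, assuming a POSIX shell and an unquoted label value. The cluster_label helper and the inline manifest are hypothetical and only illustrate the check:

```shell
#!/bin/sh
# Extract the strimzi.io/cluster label from a KafkaTopic manifest.

cluster_label() {
  # Reads a manifest on stdin and prints the value of its
  # strimzi.io/cluster label (prints nothing if the label is absent).
  sed -n 's/^[[:space:]]*strimzi\.io\/cluster:[[:space:]]*//p'
}

# Hypothetical manifest, matching the example above.
manifest='apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaTopic
metadata:
  name: my-topic
  labels:
    strimzi.io/cluster: my-cluster'

expected=my-cluster   # name of the Kafka resource the topic should belong to
if [ "$(printf '%s\n' "$manifest" | cluster_label)" = "$expected" ]; then
  echo "label matches $expected"
else
  echo "label does not match $expected"
fi
```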

4.2.3. Understanding the Topic Operator

A fundamental problem that the operator has to solve is that there is no single source of truth: Both the KafkaTopic resource and the topic within Kafka can be modified independently of the operator. Complicating this, the Topic Operator might not always be able to observe changes at each end in real time (for example, the operator might be down).

To resolve this, the operator maintains its own private copy of the information about each topic. When a change happens either in the Kafka cluster, or in OpenShift, it looks at both the state of the other system and at its private copy in order to determine what needs to change to keep everything in sync. The same thing happens whenever the operator starts, and periodically while it is running.

For example, suppose the Topic Operator is not running, and a KafkaTopic my-topic gets created. When the operator starts it will lack a private copy of "my-topic", so it can infer that the KafkaTopic has been created since it was last running. The operator will create the topic corresponding to "my-topic" and also store a private copy of the metadata for "my-topic".

The private copy allows the operator to cope with scenarios where the topic configuration gets changed both in Kafka and in OpenShift, so long as the changes are not incompatible (for example, both changing the same topic config key, but to different values). In the case of incompatible changes, the Kafka configuration wins, and the KafkaTopic will be updated to reflect that.

The private copy is held in the same ZooKeeper ensemble used by Kafka itself. This mitigates availability concerns, because if ZooKeeper is not running then Kafka itself cannot run, so the operator is no less available than it would be if it were stateless.
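
The role of the private copy can be illustrated as a three-way comparison for a single topic configuration value. The following is a simplified sketch, assuming a POSIX shell. The merge_value helper is hypothetical and is not the Topic Operator's implementation; it only encodes the rules stated above, namely that a change on one side is carried over to the other, and that Kafka wins when both sides changed:

```shell
#!/bin/sh
# Three-way comparison of one topic config value.

merge_value() {
  # $1 = private copy (last state the operator recorded)
  # $2 = value in the KafkaTopic resource
  # $3 = value in Kafka
  if [ "$3" = "$1" ]; then
    # Kafka is unchanged, so any change came from the KafkaTopic.
    printf '%s\n' "$2"
  else
    # Kafka changed. If the KafkaTopic also changed, Kafka still wins.
    printf '%s\n' "$3"
  fi
}

# Hypothetical values for a numeric config key: the private copy says 7,
# the KafkaTopic was changed to 3, and Kafka still holds 7.
merge_value 7 3 7
```

In this example only the KafkaTopic changed, so the sketch selects the KafkaTopic value, which would then be applied to Kafka and stored as the new private copy.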

4.2.4. Deploying the Topic Operator using the Cluster Operator

This procedure describes how to deploy the Topic Operator using the Cluster Operator. If you want to use the Topic Operator with a Kafka cluster that is not managed by AMQ Streams, you must deploy the Topic Operator as a standalone component. For more information, see Section 4.2.6, “Deploying the standalone Topic Operator”.

Prerequisites

  • A running Cluster Operator
  • A Kafka resource to be created or updated

Procedure

  1. Ensure that the Kafka.spec.entityOperator object exists in the Kafka resource. This configures the Entity Operator.

    apiVersion: kafka.strimzi.io/v1beta1
    kind: Kafka
    metadata:
      name: my-cluster
    spec:
      #...
      entityOperator:
        topicOperator: {}
        userOperator: {}
  2. Configure the Topic Operator using the properties described in Section B.62, “EntityTopicOperatorSpec schema reference”.
  3. Create or update the Kafka resource in OpenShift.

    Use oc apply:

    oc apply -f your-file

4.2.5. Configuring the Topic Operator with resource requests and limits

You can allocate resources, such as CPU and memory, to the Topic Operator and set a limit on the amount of resources it can consume.

Prerequisites

  • The Cluster Operator is running.

Procedure

  1. Update the Kafka cluster configuration in an editor, as required:

    Use oc edit:

    oc edit kafka my-cluster
  2. In the spec.entityOperator.topicOperator.resources property in the Kafka resource, set the resource requests and limits for the Topic Operator.

    apiVersion: kafka.strimzi.io/v1beta1
    kind: Kafka
    spec:
      # kafka and zookeeper sections...
      entityOperator:
        topicOperator:
          resources:
            requests:
              cpu: "1"
              memory: 500Mi
            limits:
              cpu: "1"
              memory: 500Mi
  3. Apply the new configuration to create or update the resource.

    Use oc apply:

    oc apply -f kafka.yaml

Additional resources

4.2.6. Deploying the standalone Topic Operator

Deploying the Topic Operator as a standalone component is more complicated than installing it using the Cluster Operator, but it is more flexible. For instance, it can operate with any Kafka cluster, not necessarily one deployed by the Cluster Operator.

Prerequisites

  • An existing Kafka cluster for the Topic Operator to connect to.

Procedure

  1. Edit the install/topic-operator/05-Deployment-strimzi-topic-operator.yaml resource. Change the following environment variables in Deployment.spec.template.spec.containers[0].env:

    1. Set STRIMZI_KAFKA_BOOTSTRAP_SERVERS to a list of bootstrap brokers in your Kafka cluster, given as a comma-separated list of hostname:port pairs.
    2. Set STRIMZI_ZOOKEEPER_CONNECT to a list of the ZooKeeper nodes, given as a comma-separated list of hostname:port pairs. This must be the same ZooKeeper cluster that your Kafka cluster uses.
    3. Set STRIMZI_NAMESPACE to the OpenShift namespace in which you want the operator to watch for KafkaTopic resources.
  2. Deploy the Topic Operator.

    This can be done using oc apply:

    oc apply -f install/topic-operator
  3. Verify that the Topic Operator has been deployed successfully. This can be done using oc describe:

    oc describe deployment strimzi-topic-operator

    The Topic Operator is deployed once the Replicas: entry shows 1 available.

    Note

    This could take some time if you have a slow connection to the OpenShift cluster and the images have not been downloaded before.
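
The edits in step 1 might look like the following fragment of the Deployment. This is a sketch: the host names, ports, and namespace are hypothetical placeholders, and the rest of the file is left as shipped.

```yaml
# Fragment of 05-Deployment-strimzi-topic-operator.yaml.
# All values below are hypothetical placeholders.
spec:
  template:
    spec:
      containers:
        - #...
          env:
            # Comma-separated list of hostname:port pairs for the bootstrap brokers
            - name: STRIMZI_KAFKA_BOOTSTRAP_SERVERS
              value: kafka-0.example.com:9092,kafka-1.example.com:9092
            # Same ZooKeeper cluster that the Kafka cluster uses
            - name: STRIMZI_ZOOKEEPER_CONNECT
              value: zookeeper-0.example.com:2181,zookeeper-1.example.com:2181
            # Namespace to watch for KafkaTopic resources
            - name: STRIMZI_NAMESPACE
              value: my-namespace
```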

Additional resources

4.2.7. Topic Operator environment

When deployed standalone, the Topic Operator can be configured using environment variables.

Note

The Topic Operator should be configured using the Kafka.spec.entityOperator.topicOperator property when deployed by the Cluster Operator.

STRIMZI_RESOURCE_LABELS
The label selector used to identify KafkaTopics to be managed by the operator.
STRIMZI_ZOOKEEPER_SESSION_TIMEOUT_MS
The ZooKeeper session timeout, in milliseconds. For example, 10000. Default 20000 (20 seconds).
STRIMZI_KAFKA_BOOTSTRAP_SERVERS
The list of Kafka bootstrap servers. This variable is mandatory.
STRIMZI_ZOOKEEPER_CONNECT
The ZooKeeper connection information. This variable is mandatory.
STRIMZI_FULL_RECONCILIATION_INTERVAL_MS
The interval between periodic reconciliations, in milliseconds.
STRIMZI_TOPIC_METADATA_MAX_ATTEMPTS
The number of attempts at getting topic metadata from Kafka. The time between each attempt is defined as an exponential back-off. Consider increasing this value when topic creation could take more time due to the number of partitions or replicas. Default 6.
STRIMZI_TOPICS_PATH
The ZooKeeper node path where the Topic Operator stores its metadata. Default /strimzi/topics.
STRIMZI_LOG_LEVEL
The level for printing logging messages. The value can be set to ERROR, WARNING, INFO, DEBUG, or TRACE. Default INFO.
STRIMZI_TLS_ENABLED
Enables TLS support for encrypting communication with Kafka brokers. Default true.
STRIMZI_TRUSTSTORE_LOCATION
The path to the truststore containing certificates for enabling TLS-based communication. This variable is mandatory only if TLS is enabled through STRIMZI_TLS_ENABLED.
STRIMZI_TRUSTSTORE_PASSWORD
The password for accessing the truststore defined by STRIMZI_TRUSTSTORE_LOCATION. This variable is mandatory only if TLS is enabled through STRIMZI_TLS_ENABLED.
STRIMZI_KEYSTORE_LOCATION
The path to the keystore containing private keys for enabling TLS-based communication. This variable is mandatory only if TLS is enabled through STRIMZI_TLS_ENABLED.
STRIMZI_KEYSTORE_PASSWORD
The password for accessing the keystore defined by STRIMZI_KEYSTORE_LOCATION. This variable is mandatory only if TLS is enabled through STRIMZI_TLS_ENABLED.
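
The following fragment sketches how the TLS-related variables could be set together in the env list of a standalone Topic Operator Deployment. The file paths, the Secret name my-tls-secret, and its keys are hypothetical. Reading the passwords from a Secret with secretKeyRef is one possible choice, not a requirement stated in this guide.

```yaml
# Sketch only: paths, Secret name, and Secret keys are hypothetical.
env:
  # Label selector that identifies the KafkaTopics to manage
  - name: STRIMZI_RESOURCE_LABELS
    value: "strimzi.io/cluster=my-cluster"
  - name: STRIMZI_LOG_LEVEL
    value: INFO
  - name: STRIMZI_TLS_ENABLED
    value: "true"
  # The four variables below are mandatory because TLS is enabled
  - name: STRIMZI_TRUSTSTORE_LOCATION
    value: /opt/topic-operator/certs/truststore
  - name: STRIMZI_TRUSTSTORE_PASSWORD
    valueFrom:
      secretKeyRef:
        name: my-tls-secret
        key: truststore.password
  - name: STRIMZI_KEYSTORE_LOCATION
    value: /opt/topic-operator/certs/keystore
  - name: STRIMZI_KEYSTORE_PASSWORD
    valueFrom:
      secretKeyRef:
        name: my-tls-secret
        key: keystore.password
```

Environment variable values are strings, so Boolean and numeric values such as "true" are quoted.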

4.3. User Operator

The User Operator manages Kafka users through custom resources.

4.3.1. User Operator

The User Operator manages Kafka users for a Kafka cluster by watching for KafkaUser resources that describe Kafka users, and ensuring that they are configured properly in the Kafka cluster.

For example, if a KafkaUser is:

  • Created, the User Operator creates the user it describes
  • Deleted, the User Operator deletes the user it describes
  • Changed, the User Operator updates the user it describes

Unlike the Topic Operator, the User Operator does not sync any changes from the Kafka cluster with the OpenShift resources. Kafka topics can be created by applications directly in Kafka, but it is not expected that the users will be managed directly in the Kafka cluster in parallel with the User Operator.

The User Operator allows you to declare a KafkaUser resource as part of your application’s deployment. You can specify the authentication and authorization mechanism for the user. You can also configure user quotas that control usage of Kafka resources to ensure, for example, that a user does not monopolize access to a broker.

When the user is created, the user credentials are created in a Secret. Your application needs to use the user and its credentials for authentication and to produce or consume messages.

In addition to managing credentials for authentication, the User Operator also manages authorization rules by including a description of the user’s access rights in the KafkaUser declaration.
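
A KafkaUser that combines authentication, authorization, and quotas might look like the following sketch. The topic name, the ACL operation, and the quota values are assumptions chosen for illustration.

```yaml
# Sketch only: topic name, operation, and quota values are example assumptions.
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaUser
metadata:
  name: my-user
  labels:
    strimzi.io/cluster: my-cluster
spec:
  # Authentication mechanism; credentials are created in a Secret
  authentication:
    type: tls
  # Authorization rules describing the user's access rights
  authorization:
    type: simple
    acls:
      - resource:
          type: topic
          name: my-topic
          patternType: literal
        operation: Read
        host: "*"
  # Quotas that limit the user's use of broker resources
  quotas:
    producerByteRate: 1048576
    consumerByteRate: 2097152
    requestPercentage: 55
```

In this sketch the user authenticates with a TLS client certificate and can only read from my-topic, while the quotas cap its throughput and its share of broker request-handling time.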

4.3.2. Identifying a Kafka cluster for user handling

A KafkaUser resource includes a label that defines the appropriate name of the Kafka cluster (derived from the name of the Kafka resource) to which it belongs.

apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaUser
metadata:
  name: my-user
  labels:
    strimzi.io/cluster: my-cluster

The label is used by the User Operator to identify the KafkaUser resource and create a new user, and also in subsequent handling of the user.

If the label does not match the Kafka cluster, the User Operator cannot identify the KafkaUser and the user is not created.

4.3.3. Deploying the User Operator using the Cluster Operator

Prerequisites

  • A running Cluster Operator
  • A Kafka resource to be created or updated

Procedure

  1. Edit the Kafka resource, ensuring that it has a Kafka.spec.entityOperator.userOperator object that configures the User Operator as required.
  2. Create or update the Kafka resource in OpenShift.

    This can be done using oc apply:

    oc apply -f your-file
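
As a sketch of step 1, the following Kafka resource enables the User Operator and sets two of its properties. The namespace name and the interval value are assumptions for illustration. An empty userOperator object ({}) deploys the User Operator with default settings.

```yaml
# Sketch only: my-user-namespace and the interval value are example assumptions.
apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: my-cluster
spec:
  #...
  entityOperator:
    topicOperator: {}
    userOperator:
      # Namespace in which the User Operator watches for KafkaUser resources
      watchedNamespace: my-user-namespace
      # Interval between periodic reconciliations, in seconds
      reconciliationIntervalSeconds: 60
```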

Additional resources

4.3.4. Configuring the User Operator with resource requests and limits

You can allocate resources, such as CPU and memory, to the User Operator and set a limit on the amount of resources it can consume.

Prerequisites

  • The Cluster Operator is running.

Procedure

  1. Update the Kafka cluster configuration in an editor, as required:

    oc edit kafka my-cluster
  2. In the spec.entityOperator.userOperator.resources property in the Kafka resource, set the resource requests and limits for the User Operator.

    apiVersion: kafka.strimzi.io/v1beta1
    kind: Kafka
    spec:
      # kafka and zookeeper sections...
      entityOperator:
        userOperator:
          resources:
            requests:
              cpu: "1"
              memory: 500Mi
            limits:
              cpu: "1"
              memory: 500Mi

    Save the file and exit the editor. The Cluster Operator will apply the changes automatically.

Additional resources

4.3.5. Deploying the standalone User Operator

Deploying the User Operator as a standalone component is more complicated than installing it using the Cluster Operator, but it is more flexible. For instance, it can operate with any Kafka cluster, not only the one deployed by the Cluster Operator.

Prerequisites

  • An existing Kafka cluster for the User Operator to connect to.

Procedure

  1. Edit the install/user-operator/05-Deployment-strimzi-user-operator.yaml resource. Change the following environment variables in Deployment.spec.template.spec.containers[0].env:

    1. Set STRIMZI_CA_CERT_NAME to point to an OpenShift Secret that contains the public key of the Certificate Authority used to sign new user certificates for TLS client authentication. The Secret must contain the public key under the key ca.crt.
    2. Set STRIMZI_CA_KEY_NAME to point to an OpenShift Secret that contains the private key of the Certificate Authority used to sign new user certificates for TLS client authentication. The Secret must contain the private key under the key ca.key.
    3. Set STRIMZI_ZOOKEEPER_CONNECT to a list of the ZooKeeper nodes, given as a comma-separated list of hostname:port pairs. This must be the same ZooKeeper cluster that your Kafka cluster uses.
    4. Set STRIMZI_NAMESPACE to the OpenShift namespace in which you want the operator to watch for KafkaUser resources.
  2. Deploy the User Operator.

    This can be done using oc apply:

    oc apply -f install/user-operator
  3. Verify that the User Operator has been deployed successfully. This can be done using oc describe:

    oc describe deployment strimzi-user-operator

    The User Operator is deployed once the Replicas: entry shows 1 available.

    Note

    This could take some time if you have a slow connection to the OpenShift cluster and the images have not been downloaded before.
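
The edits in step 1 might look like the following fragment of the Deployment. This is a sketch: the Secret names, host names, and namespace are hypothetical placeholders, and the rest of the file is left as shipped.

```yaml
# Fragment of 05-Deployment-strimzi-user-operator.yaml.
# All values below are hypothetical placeholders.
spec:
  template:
    spec:
      containers:
        - #...
          env:
            # Secret holding the CA public key under the key ca.crt
            - name: STRIMZI_CA_CERT_NAME
              value: my-clients-ca-cert
            # Secret holding the CA private key under the key ca.key
            - name: STRIMZI_CA_KEY_NAME
              value: my-clients-ca-key
            # Same ZooKeeper cluster that the Kafka cluster uses
            - name: STRIMZI_ZOOKEEPER_CONNECT
              value: zookeeper-0.example.com:2181,zookeeper-1.example.com:2181
            # Namespace to watch for KafkaUser resources
            - name: STRIMZI_NAMESPACE
              value: my-namespace
```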

Additional resources