Chapter 13. Using AMQ Streams Operators

Use the AMQ Streams operators to manage your Kafka cluster, and Kafka topics and users.

13.1. Watching namespaces with AMQ Streams operators

Operators watch and manage AMQ Streams resources in namespaces. The Cluster Operator can watch a single namespace, multiple namespaces, or all namespaces in an OpenShift cluster. The Topic Operator and User Operator can watch a single namespace.

  • The Cluster Operator watches for Kafka resources
  • The Topic Operator watches for KafkaTopic resources
  • The User Operator watches for KafkaUser resources

The Topic Operator and the User Operator can each watch only a single Kafka cluster in a namespace, and each can connect to only one Kafka cluster.

If multiple Topic Operators watch the same namespace, name collisions and topic deletion can occur. This is because each Kafka cluster uses Kafka topics that have the same name (such as __consumer_offsets). Make sure that only one Topic Operator watches a given namespace.

When using multiple User Operators with a single namespace, a user with a given username can exist in more than one Kafka cluster.

If you deploy the Topic Operator and User Operator using the Cluster Operator, they watch the Kafka cluster deployed by the Cluster Operator by default. You can also specify a namespace using watchedNamespace in the operator configuration.
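As a rough illustration of the watchedNamespace setting, the following sketch configures both operators through the entityOperator section of a Kafka resource. The cluster name my-cluster and the namespace my-app-namespace are placeholders rather than values from this guide, and the apiVersion assumes the v1beta2 schema.

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster                      # placeholder cluster name
spec:
  # ... kafka and zookeeper configuration omitted
  entityOperator:
    topicOperator:
      # Namespace in which the Topic Operator watches KafkaTopic resources (placeholder)
      watchedNamespace: my-app-namespace
    userOperator:
      # Namespace in which the User Operator watches KafkaUser resources (placeholder)
      watchedNamespace: my-app-namespace
```

If watchedNamespace is omitted, the operators watch the namespace of the Kafka cluster deployed by the Cluster Operator.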

For a standalone deployment of each operator, you specify a namespace and connection to the Kafka cluster to watch in the configuration.

13.2. Using the Cluster Operator

Use the Cluster Operator to deploy a Kafka cluster and other Kafka components.

13.2.1. Role-Based Access Control (RBAC) resources

The Cluster Operator creates and manages RBAC resources for AMQ Streams components that need access to OpenShift resources.

For the Cluster Operator to function, it needs permission within the OpenShift cluster to interact with Kafka resources, such as Kafka and KafkaConnect, as well as managed resources like ConfigMap, Pod, Deployment, StatefulSet, and Service.

Permission is specified through OpenShift role-based access control (RBAC) resources:

  • ServiceAccount
  • Role and ClusterRole
  • RoleBinding and ClusterRoleBinding

13.2.1.1. Delegating privileges to AMQ Streams components

The Cluster Operator runs under a service account called strimzi-cluster-operator. It is assigned cluster roles that give it permission to create the RBAC resources for AMQ Streams components. Role bindings associate the cluster roles with the service account.

OpenShift prevents components operating under one ServiceAccount from granting another ServiceAccount privileges that the granting ServiceAccount does not have. Because the Cluster Operator creates the RoleBinding and ClusterRoleBinding RBAC resources needed by the resources it manages, it requires a role that gives it the same privileges.

The following tables describe the RBAC resources created by the Cluster Operator.

Table 13.1. ServiceAccount resources

Name                           | Used by
<cluster_name>-kafka           | Kafka broker pods
<cluster_name>-zookeeper       | ZooKeeper pods
<cluster_name>-cluster-connect | Kafka Connect pods
<cluster_name>-mirror-maker    | MirrorMaker pods
<cluster_name>-mirrormaker2    | MirrorMaker 2 pods
<cluster_name>-bridge          | Kafka Bridge pods
<cluster_name>-entity-operator | Entity Operator

Table 13.2. ClusterRole resources

Name                                     | Used by
strimzi-cluster-operator-namespaced      | Cluster Operator
strimzi-cluster-operator-global          | Cluster Operator
strimzi-cluster-operator-leader-election | Cluster Operator
strimzi-kafka-broker                     | Cluster Operator, rack feature (when used)
strimzi-entity-operator                  | Cluster Operator, Topic Operator, User Operator
strimzi-kafka-client                     | Cluster Operator, Kafka clients for rack awareness

Table 13.3. ClusterRoleBinding resources

Name                                             | Used by
strimzi-cluster-operator                         | Cluster Operator
strimzi-cluster-operator-kafka-broker-delegation | Cluster Operator, Kafka brokers for rack awareness
strimzi-cluster-operator-kafka-client-delegation | Cluster Operator, Kafka clients for rack awareness

Table 13.4. RoleBinding resources

Name                                             | Used by
strimzi-cluster-operator                         | Cluster Operator
strimzi-cluster-operator-kafka-broker-delegation | Cluster Operator, Kafka brokers for rack awareness

13.2.1.2. Running the Cluster Operator using a ServiceAccount

The Cluster Operator is best run using a ServiceAccount:

Example ServiceAccount for the Cluster Operator

apiVersion: v1
kind: ServiceAccount
metadata:
  name: strimzi-cluster-operator
  labels:
    app: strimzi

The Deployment of the operator then needs to specify this in its spec.template.spec.serviceAccountName:

Partial example of Deployment for the Cluster Operator

apiVersion: apps/v1
kind: Deployment
metadata:
  name: strimzi-cluster-operator
  labels:
    app: strimzi
spec:
  replicas: 1
  selector:
    matchLabels:
      name: strimzi-cluster-operator
      strimzi.io/kind: cluster-operator
  template:
    # ...
    spec:
      serviceAccountName: strimzi-cluster-operator
      # ...

The serviceAccountName property in the pod template specifies strimzi-cluster-operator as the service account.

13.2.1.3. ClusterRole resources

The Cluster Operator uses ClusterRole resources to provide the necessary access to resources. Depending on the OpenShift cluster setup, a cluster administrator might be needed to create the cluster roles.

Note

Cluster administrator rights are only needed for the creation of ClusterRole resources. The Cluster Operator will not run under a cluster admin account.

ClusterRole resources follow the principle of least privilege and contain only the privileges that the Cluster Operator needs to operate the clusters of Kafka components. The first set of assigned privileges allows the Cluster Operator to manage OpenShift resources such as StatefulSet, Deployment, Pod, and ConfigMap.

All cluster roles are required by the Cluster Operator in order to delegate privileges.

The Cluster Operator uses the strimzi-cluster-operator-namespaced and strimzi-cluster-operator-global cluster roles to grant permissions on namespace-scoped resources and cluster-scoped resources, respectively.

ClusterRole with namespaced resources for the Cluster Operator

apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: strimzi-cluster-operator-namespaced
  labels:
    app: strimzi
rules:
  # Resources in this role are used by the operator based on an operand being deployed in some namespace. When needed, you
  # can deploy the operator as a cluster-wide operator. But grant the rights listed in this role only on the namespaces
  # where the operands will be deployed. That way, you can limit the access the operator has to other namespaces where it
  # does not manage any clusters.
  - apiGroups:
      - "rbac.authorization.k8s.io"
    resources:
      # The cluster operator needs to access and manage rolebindings to grant Strimzi components cluster permissions
      - rolebindings
    verbs:
      - get
      - list
      - watch
      - create
      - delete
      - patch
      - update
  - apiGroups:
      - "rbac.authorization.k8s.io"
    resources:
      # The cluster operator needs to access and manage roles to grant the entity operator permissions
      - roles
    verbs:
      - get
      - list
      - watch
      - create
      - delete
      - patch
      - update
  - apiGroups:
      - ""
    resources:
      # The cluster operator needs to access and delete pods to monitor pod health and coordinate rolling updates
      - pods
      # The cluster operator needs to access and manage service accounts to grant Strimzi components cluster permissions
      - serviceaccounts
      # The cluster operator needs to access and manage config maps for Strimzi components configuration
      - configmaps
      # The cluster operator needs to access and manage services and endpoints to expose Strimzi components to network traffic
      - services
      - endpoints
      # The cluster operator needs to access and manage secrets to handle credentials
      - secrets
      # The cluster operator needs to access and manage persistent volume claims to bind them to Strimzi components for persistent data
      - persistentvolumeclaims
    verbs:
      - get
      - list
      - watch
      - create
      - delete
      - patch
      - update
  - apiGroups:
      - "apps"
    resources:
      # The cluster operator needs to access and manage deployments to run deployment based Strimzi components
      - deployments
      - deployments/scale
      - deployments/status
      # The cluster operator needs to access and manage stateful sets to run stateful sets based Strimzi components
      - statefulsets
      # The cluster operator needs to access replica-sets to manage Strimzi components and to determine error states
      - replicasets
    verbs:
      - get
      - list
      - watch
      - create
      - delete
      - patch
      - update
  - apiGroups:
      - "" # legacy core events api, used by topic operator
      - "events.k8s.io" # new events api, used by cluster operator
    resources:
      # The cluster operator needs to be able to create events and delegate permissions to do so
      - events
    verbs:
      - create
  - apiGroups:
      # Kafka Connect Build on OpenShift requirement
      - build.openshift.io
    resources:
      - buildconfigs
      - buildconfigs/instantiate
      - builds
    verbs:
      - get
      - list
      - watch
      - create
      - delete
      - patch
      - update
  - apiGroups:
      - networking.k8s.io
    resources:
      # The cluster operator needs to access and manage network policies to lock down communication between Strimzi components
      - networkpolicies
      # The cluster operator needs to access and manage ingresses which allow external access to the services in a cluster
      - ingresses
    verbs:
      - get
      - list
      - watch
      - create
      - delete
      - patch
      - update
  - apiGroups:
      - route.openshift.io
    resources:
      # The cluster operator needs to access and manage routes to expose Strimzi components for external access
      - routes
      - routes/custom-host
    verbs:
      - get
      - list
      - watch
      - create
      - delete
      - patch
      - update
  - apiGroups:
      - image.openshift.io
    resources:
      # The cluster operator needs to verify the image stream when used for Kafka Connect image build
      - imagestreams
    verbs:
      - get
  - apiGroups:
      - policy
    resources:
      # The cluster operator needs to access and manage pod disruption budgets, which limit the number of concurrent
      # disruptions that a Strimzi component experiences, allowing for higher availability
      - poddisruptionbudgets
    verbs:
      - get
      - list
      - watch
      - create
      - delete
      - patch
      - update

ClusterRole with cluster-scoped resources for the Cluster Operator

apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: strimzi-cluster-operator-global
  labels:
    app: strimzi
rules:
  - apiGroups:
      - "rbac.authorization.k8s.io"
    resources:
      # The cluster operator needs to create and manage cluster role bindings in the case of an install where a user
      # has specified they want their cluster role bindings generated
      - clusterrolebindings
    verbs:
      - get
      - list
      - watch
      - create
      - delete
      - patch
      - update
  - apiGroups:
      - storage.k8s.io
    resources:
      # The cluster operator requires "get" permissions to view storage class details
      # This is because only a persistent volume of a supported storage class type can be resized
      - storageclasses
    verbs:
      - get
  - apiGroups:
      - ""
    resources:
      # The cluster operator requires "list" permissions to view all nodes in a cluster
      # The listing is used to determine the node addresses when NodePort access is configured
      # These addresses are then exposed in the custom resource states
      - nodes
    verbs:
      - list

The strimzi-cluster-operator-leader-election cluster role represents the permissions needed for the leader election.

ClusterRole with leader election permissions

apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: strimzi-cluster-operator-leader-election
  labels:
    app: strimzi
rules:
  - apiGroups:
      - coordination.k8s.io
    resources:
      # The cluster operator needs to access and manage leases for leader election
      # The "create" verb cannot be used with "resourceNames"
      - leases
    verbs:
      - create
  - apiGroups:
      - coordination.k8s.io
    resources:
      # The cluster operator needs to access and manage leases for leader election
      - leases
    resourceNames:
      # The default RBAC files give the operator only access to the Lease resource names strimzi-cluster-operator
      # If you want to use another resource name or resource namespace, you have to configure the RBAC resources accordingly
      - strimzi-cluster-operator
    verbs:
      - get
      - list
      - watch
      - delete
      - patch
      - update

The strimzi-kafka-broker cluster role represents the access needed by the init container in Kafka pods that use rack awareness.

A role binding named strimzi-<cluster_name>-kafka-init grants the <cluster_name>-kafka service account access to nodes within a cluster using the strimzi-kafka-broker role. If the rack feature is not used and the cluster is not exposed through NodePort, no binding is created.

ClusterRole for the Cluster Operator allowing it to delegate access to OpenShift nodes to the Kafka broker pods

apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: strimzi-kafka-broker
  labels:
    app: strimzi
rules:
  - apiGroups:
      - ""
    resources:
      # The Kafka Brokers require "get" permissions to view the node they are on
      # This information is used to generate a Rack ID that is used for High Availability configurations
      - nodes
    verbs:
      - get
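For orientation, the binding described above might look roughly like the following sketch. The Cluster Operator creates it automatically, so it is not something you apply yourself. The cluster name my-cluster is a placeholder, the namespace myproject follows the other examples in this section, and the sketch assumes a ClusterRoleBinding because nodes are cluster-scoped resources. The exact name and labels of the generated resource may differ between versions.

```yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  # Follows the strimzi-<cluster_name>-kafka-init pattern; my-cluster is a placeholder
  name: strimzi-my-cluster-kafka-init
subjects:
  - kind: ServiceAccount
    # Service account used by the Kafka broker pods (<cluster_name>-kafka)
    name: my-cluster-kafka
    namespace: myproject
roleRef:
  kind: ClusterRole
  # Grants "get" on nodes, as defined in the ClusterRole above
  name: strimzi-kafka-broker
  apiGroup: rbac.authorization.k8s.io
```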

The strimzi-entity-operator cluster role represents the access needed by the Topic Operator and User Operator.

The Topic Operator produces OpenShift events with status information, so the <cluster_name>-entity-operator service account is bound to the strimzi-entity-operator role, which grants this access via the strimzi-entity-operator role binding.

ClusterRole for the Cluster Operator allowing it to delegate access to events to the Topic and User Operators

apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: strimzi-entity-operator
  labels:
    app: strimzi
rules:
  - apiGroups:
      - "kafka.strimzi.io"
    resources:
      # The entity operator runs the KafkaTopic assembly operator, which needs to access and manage KafkaTopic resources
      - kafkatopics
      - kafkatopics/status
      # The entity operator runs the KafkaUser assembly operator, which needs to access and manage KafkaUser resources
      - kafkausers
      - kafkausers/status
    verbs:
      - get
      - list
      - watch
      - create
      - patch
      - update
      - delete
  - apiGroups:
      - ""
    resources:
      - events
    verbs:
      # The entity operator needs to be able to create events
      - create
  - apiGroups:
      - ""
    resources:
      # The entity operator user-operator needs to access and manage secrets to store generated credentials
      - secrets
    verbs:
      - get
      - list
      - watch
      - create
      - delete
      - patch
      - update

The strimzi-kafka-client cluster role represents the access needed by Kafka clients that use rack awareness.

ClusterRole for the Cluster Operator allowing it to delegate access to OpenShift nodes to the Kafka client-based pods

apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: strimzi-kafka-client
  labels:
    app: strimzi
rules:
  - apiGroups:
      - ""
    resources:
      # The Kafka clients (Connect, Mirror Maker, etc.) require "get" permissions to view the node they are on
      # This information is used to generate a Rack ID (client.rack option) that is used for consuming from the closest
      # replicas when enabled
      - nodes
    verbs:
      - get

13.2.1.4. ClusterRoleBinding resources

The Cluster Operator uses ClusterRoleBinding and RoleBinding resources to associate its ClusterRole with its ServiceAccount. Cluster role bindings are required by cluster roles containing cluster-scoped resources.

Example ClusterRoleBinding for the Cluster Operator

apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: strimzi-cluster-operator
  labels:
    app: strimzi
subjects:
  - kind: ServiceAccount
    name: strimzi-cluster-operator
    namespace: myproject
roleRef:
  kind: ClusterRole
  name: strimzi-cluster-operator-global
  apiGroup: rbac.authorization.k8s.io

Cluster role bindings are also needed for the cluster roles used in delegating privileges:

Example ClusterRoleBinding for the Cluster Operator and Kafka broker rack awareness

apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: strimzi-cluster-operator-kafka-broker-delegation
  labels:
    app: strimzi
# The Kafka broker cluster role must be bound to the cluster operator service account so that it can delegate the cluster role to the Kafka brokers.
# This must be done to avoid escalating privileges which would be blocked by Kubernetes.
subjects:
  - kind: ServiceAccount
    name: strimzi-cluster-operator
    namespace: myproject
roleRef:
  kind: ClusterRole
  name: strimzi-kafka-broker
  apiGroup: rbac.authorization.k8s.io

Example ClusterRoleBinding for the Cluster Operator and Kafka client rack awareness

apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: strimzi-cluster-operator-kafka-client-delegation
  labels:
    app: strimzi
# The Kafka clients cluster role must be bound to the cluster operator service account so that it can delegate the
# cluster role to the Kafka clients using it for consuming from closest replica.
# This must be done to avoid escalating privileges which would be blocked by Kubernetes.
subjects:
  - kind: ServiceAccount
    name: strimzi-cluster-operator
    namespace: myproject
roleRef:
  kind: ClusterRole
  name: strimzi-kafka-client
  apiGroup: rbac.authorization.k8s.io

Cluster roles containing only namespaced resources are bound using role bindings only.

Example RoleBinding for the Cluster Operator

apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: strimzi-cluster-operator
  labels:
    app: strimzi
subjects:
  - kind: ServiceAccount
    name: strimzi-cluster-operator
    namespace: myproject
roleRef:
  kind: ClusterRole
  name: strimzi-cluster-operator-namespaced
  apiGroup: rbac.authorization.k8s.io

Example RoleBinding for the Cluster Operator and Entity Operator delegation

apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: strimzi-cluster-operator-entity-operator-delegation
  labels:
    app: strimzi
# The Entity Operator cluster role must be bound to the cluster operator service account so that it can delegate the cluster role to the Entity Operator.
# This must be done to avoid escalating privileges which would be blocked by Kubernetes.
subjects:
  - kind: ServiceAccount
    name: strimzi-cluster-operator
    namespace: myproject
roleRef:
  kind: ClusterRole
  name: strimzi-entity-operator
  apiGroup: rbac.authorization.k8s.io

13.2.2. ConfigMap for Cluster Operator logging

Cluster Operator logging is configured through a ConfigMap named strimzi-cluster-operator.

A ConfigMap containing logging configuration is created when installing the Cluster Operator. This ConfigMap is described in the file install/cluster-operator/050-ConfigMap-strimzi-cluster-operator.yaml. You configure Cluster Operator logging by changing the data field log4j2.properties in this ConfigMap.

To update the logging configuration, you can edit the 050-ConfigMap-strimzi-cluster-operator.yaml file and then run the following command:

oc apply -f install/cluster-operator/050-ConfigMap-strimzi-cluster-operator.yaml

Alternatively, edit the ConfigMap directly:

oc edit configmap strimzi-cluster-operator

To change how often the logging configuration is reloaded, set a time in seconds in the monitorInterval option in the created ConfigMap.

If the ConfigMap is missing when the Cluster Operator is deployed, the default logging values are used.

If the ConfigMap is accidentally deleted after the Cluster Operator is deployed, the most recently loaded logging configuration is used. Create a new ConfigMap to load a new logging configuration.

Note

Do not remove the monitorInterval option from the ConfigMap.
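The following is a minimal sketch of what such a ConfigMap could contain, assuming a single console appender. The logger names, pattern, and level are illustrative. The file shipped in install/cluster-operator may define more loggers and different values.

```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: strimzi-cluster-operator
  labels:
    app: strimzi
data:
  log4j2.properties: |
    name = COConfig
    # Reload interval in seconds; keep this option in the file
    monitorInterval = 30

    # Illustrative console appender
    appender.console.type = Console
    appender.console.name = STDOUT
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n

    # Illustrative root logger level
    rootLogger.level = INFO
    rootLogger.appenderRefs = stdout
    rootLogger.appenderRef.console.ref = STDOUT
```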

13.2.3. Configuring the Cluster Operator with environment variables

You can configure the Cluster Operator using environment variables. The supported environment variables are listed here.

Note

The environment variables are specified for the container image of the Cluster Operator in its Deployment configuration file (install/cluster-operator/060-Deployment-strimzi-cluster-operator.yaml).

STRIMZI_NAMESPACE

A comma-separated list of namespaces that the operator operates in. When not set, set to empty string, or set to *, the Cluster Operator operates in all namespaces.

The Cluster Operator deployment might use the downward API to set this automatically to the namespace the Cluster Operator is deployed in.

Example configuration for Cluster Operator namespaces

env:
  - name: STRIMZI_NAMESPACE
    valueFrom:
      fieldRef:
        fieldPath: metadata.namespace
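To watch an explicit set of namespaces instead, the variable can be given a comma-separated value. In this sketch the namespace names are placeholders.

```yaml
env:
  - name: STRIMZI_NAMESPACE
    # Placeholder namespace names; replace with the namespaces to watch
    value: watched-namespace-1,watched-namespace-2,watched-namespace-3
```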

STRIMZI_FULL_RECONCILIATION_INTERVAL_MS
Optional, default 120000 ms. The interval between periodic reconciliations, in milliseconds.
STRIMZI_OPERATION_TIMEOUT_MS
Optional, default 300000 ms. The timeout for internal operations, in milliseconds. Increase this value when using AMQ Streams on clusters where regular OpenShift operations take longer than usual (because of slow downloading of Docker images, for example).
STRIMZI_ZOOKEEPER_ADMIN_SESSION_TIMEOUT_MS
Optional, default 10000 ms. The session timeout for the Cluster Operator’s ZooKeeper admin client, in milliseconds. Increase the value if ZooKeeper requests from the Cluster Operator regularly fail because of timeouts. The ZooKeeper server enforces a maximum session timeout through its maxSessionTimeout configuration. By default, this maximum is 20 times the tickTime value (default 2000 ms), which gives 40000 ms. If you require a higher timeout, change the maxSessionTimeout ZooKeeper server configuration value.
STRIMZI_OPERATIONS_THREAD_POOL_SIZE
Optional, default 10. The worker thread pool size, which is used for various asynchronous and blocking operations that are run by the Cluster Operator.
STRIMZI_OPERATOR_NAME
Optional, defaults to the pod’s hostname. The operator name identifies the AMQ Streams instance when emitting OpenShift events.
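As a sketch of how the timing variables above might be set together, the following fragment keeps the default reconciliation interval and raises the two timeouts. The raised values are arbitrary examples, not recommendations. Environment variable values in a Deployment must be strings, so the numbers are quoted.

```yaml
env:
  - name: STRIMZI_FULL_RECONCILIATION_INTERVAL_MS
    value: "120000"   # the default, shown explicitly
  - name: STRIMZI_OPERATION_TIMEOUT_MS
    value: "600000"   # example: twice the default of 300000
  - name: STRIMZI_ZOOKEEPER_ADMIN_SESSION_TIMEOUT_MS
    value: "20000"    # example: stays below the default ZooKeeper maximum of 40000
```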
STRIMZI_OPERATOR_NAMESPACE

The name of the namespace where the Cluster Operator is running. Do not configure this variable manually. Use the downward API.

env:
  - name: STRIMZI_OPERATOR_NAMESPACE
    valueFrom:
      fieldRef:
        fieldPath: metadata.namespace

STRIMZI_OPERATOR_NAMESPACE_LABELS

Optional. The labels of the namespace where the AMQ Streams Cluster Operator is running. Use namespace labels to configure the namespace selector in network policies. Network policies allow the AMQ Streams Cluster Operator access only to the operands from the namespace with these labels. When not set, the namespace selector in network policies is configured to allow access to the Cluster Operator from any namespace in the OpenShift cluster.

env:
  - name: STRIMZI_OPERATOR_NAMESPACE_LABELS
    value: label1=value1,label2=value2

STRIMZI_LABELS_EXCLUSION_PATTERN

Optional, default regex pattern is ^app.kubernetes.io/(?!part-of).*. The regex exclusion pattern used to filter labels propagation from the main custom resource to its subresources. The labels exclusion filter is not applied to labels in template sections such as spec.kafka.template.pod.metadata.labels.

env:
  - name: STRIMZI_LABELS_EXCLUSION_PATTERN
    value: "^key1.*"

STRIMZI_CUSTOM_{COMPONENT_NAME}_LABELS

Optional. One or more custom labels to apply to all the pods created by the {COMPONENT_NAME} custom resource. The Cluster Operator labels the pods when the custom resource is created or is next reconciled.

Labels can be applied to the following components:

  • KAFKA
  • KAFKA_CONNECT
  • KAFKA_CONNECT_BUILD
  • ZOOKEEPER
  • ENTITY_OPERATOR
  • KAFKA_MIRROR_MAKER2
  • KAFKA_MIRROR_MAKER
  • CRUISE_CONTROL
  • KAFKA_BRIDGE
  • KAFKA_EXPORTER
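For example, substituting KAFKA for {COMPONENT_NAME} gives the variable STRIMZI_CUSTOM_KAFKA_LABELS. The sketch below assumes that the value uses the same comma-separated key=value format as the other label variables in this section. The label names and values are placeholders.

```yaml
env:
  - name: STRIMZI_CUSTOM_KAFKA_LABELS
    # Assumed format: comma-separated key=value pairs (placeholder labels)
    value: label1=value1,label2=value2
```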
STRIMZI_CUSTOM_RESOURCE_SELECTOR

Optional. The label selector to filter the custom resources handled by the Cluster Operator. The operator will operate only on those custom resources that have the specified labels set. Resources without these labels will not be seen by the operator. The label selector applies to Kafka, KafkaConnect, KafkaBridge, KafkaMirrorMaker, and KafkaMirrorMaker2 resources. KafkaRebalance and KafkaConnector resources are operated only when their corresponding Kafka and Kafka Connect clusters have the matching labels.

env:
  - name: STRIMZI_CUSTOM_RESOURCE_SELECTOR
    value: label1=value1,label2=value2
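With the selector shown above, a custom resource is handled only if it carries both labels. A matching Kafka resource might look like the following sketch, where the cluster name is a placeholder and the apiVersion assumes the v1beta2 schema.

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster        # placeholder cluster name
  labels:
    # Both labels must be present to match label1=value1,label2=value2
    label1: value1
    label2: value2
spec:
  # ... cluster configuration omitted
```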
STRIMZI_KAFKA_IMAGES
Required. The mapping from the Kafka version to the corresponding Docker image containing a Kafka broker for that version. The required syntax is whitespace or comma-separated <version>=<image> pairs. For example 3.3.1=registry.redhat.io/amq-streams/kafka-33-rhel8:2.4.0, 3.4.0=registry.redhat.io/amq-streams/kafka-34-rhel8:2.4.0. This is used when a Kafka.spec.kafka.version property is specified but not the Kafka.spec.kafka.image in the Kafka resource.
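Written as an environment variable, the mapping from the example above could be laid out as follows. This sketch uses a block scalar so that each pair sits on its own line, relying on the whitespace-separated form of the syntax.

```yaml
env:
  - name: STRIMZI_KAFKA_IMAGES
    # One <version>=<image> pair per line (whitespace-separated)
    value: |
      3.3.1=registry.redhat.io/amq-streams/kafka-33-rhel8:2.4.0
      3.4.0=registry.redhat.io/amq-streams/kafka-34-rhel8:2.4.0
```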
STRIMZI_DEFAULT_KAFKA_INIT_IMAGE
Optional, default registry.redhat.io/amq-streams/strimzi-rhel8-operator:2.4.0. The image name to use as default for the init container if no image is specified as the kafka-init-image in the Kafka resource. The init container is started before the broker for initial configuration work, such as rack support.
STRIMZI_KAFKA_CONNECT_IMAGES
Required. The mapping from the Kafka version to the corresponding Docker image of Kafka Connect for that version. The required syntax is whitespace or comma-separated <version>=<image> pairs. For example 3.3.1=registry.redhat.io/amq-streams/kafka-33-rhel8:2.4.0, 3.4.0=registry.redhat.io/amq-streams/kafka-34-rhel8:2.4.0. This is used when a KafkaConnect.spec.version property is specified but not the KafkaConnect.spec.image.
STRIMZI_KAFKA_MIRROR_MAKER_IMAGES
Required. The mapping from the Kafka version to the corresponding Docker image of MirrorMaker for that version. The required syntax is whitespace or comma-separated <version>=<image> pairs. For example 3.3.1=registry.redhat.io/amq-streams/kafka-33-rhel8:2.4.0, 3.4.0=registry.redhat.io/amq-streams/kafka-34-rhel8:2.4.0. This is used when a KafkaMirrorMaker.spec.version property is specified but not the KafkaMirrorMaker.spec.image.
STRIMZI_DEFAULT_TOPIC_OPERATOR_IMAGE
Optional, default registry.redhat.io/amq-streams/strimzi-rhel8-operator:2.4.0. The image name to use as the default when deploying the Topic Operator if no image is specified as the Kafka.spec.entityOperator.topicOperator.image in the Kafka resource.
STRIMZI_DEFAULT_USER_OPERATOR_IMAGE
Optional, default registry.redhat.io/amq-streams/strimzi-rhel8-operator:2.4.0. The image name to use as the default when deploying the User Operator if no image is specified as the Kafka.spec.entityOperator.userOperator.image in the Kafka resource.
STRIMZI_DEFAULT_TLS_SIDECAR_ENTITY_OPERATOR_IMAGE
Optional, default registry.redhat.io/amq-streams/kafka-34-rhel8:2.4.0. The image name to use as the default when deploying the sidecar container for the Entity Operator if no image is specified as the Kafka.spec.entityOperator.tlsSidecar.image in the Kafka resource. The sidecar provides TLS support.
STRIMZI_IMAGE_PULL_POLICY
Optional. The ImagePullPolicy that is applied to containers in all pods managed by the Cluster Operator. The valid values are Always, IfNotPresent, and Never. If not specified, the OpenShift defaults are used. Changing the policy will result in a rolling update of all your Kafka, Kafka Connect, and Kafka MirrorMaker clusters.
STRIMZI_IMAGE_PULL_SECRETS
Optional. A comma-separated list of Secret names. The secrets referenced here contain the credentials to the container registries where the container images are pulled from. The secrets are specified in the imagePullSecrets property for all pods created by the Cluster Operator. Changing this list results in a rolling update of all your Kafka, Kafka Connect, and Kafka MirrorMaker clusters.
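A sketch of the two image pull variables set together is shown below. The secret names are hypothetical and must refer to Secret resources that already exist in the relevant namespaces.

```yaml
env:
  - name: STRIMZI_IMAGE_PULL_POLICY
    value: IfNotPresent            # one of Always, IfNotPresent, Never
  - name: STRIMZI_IMAGE_PULL_SECRETS
    value: secret1,secret2         # hypothetical Secret names
```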
STRIMZI_KUBERNETES_VERSION

Optional. Overrides the OpenShift version information detected from the API server.

Example configuration for OpenShift version override

env:
  - name: STRIMZI_KUBERNETES_VERSION
    value: |
           major=1
           minor=16
           gitVersion=v1.16.2
           gitCommit=c97fe5036ef3df2967d086711e6c0c405941e14b
           gitTreeState=clean
           buildDate=2019-10-15T19:09:08Z
           goVersion=go1.12.10
           compiler=gc
           platform=linux/amd64

KUBERNETES_SERVICE_DNS_DOMAIN

Optional. Overrides the default OpenShift DNS domain name suffix.

By default, services assigned in the OpenShift cluster have a DNS domain name that uses the default suffix cluster.local.

For example, for broker kafka-0:

<cluster-name>-kafka-0.<cluster-name>-kafka-brokers.<namespace>.svc.cluster.local

The DNS domain name is added to the Kafka broker certificates used for hostname verification.

If you are using a different DNS domain name suffix in your cluster, change the KUBERNETES_SERVICE_DNS_DOMAIN environment variable from the default to the one you are using in order to establish a connection with the Kafka brokers.
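For instance, assuming a cluster whose service DNS suffix is example.internal (a made-up value for illustration), the override might be written as:

```yaml
env:
  - name: KUBERNETES_SERVICE_DNS_DOMAIN
    # Hypothetical suffix; use the DNS domain configured for your cluster
    value: example.internal
```

With this setting, the broker address from the earlier example would end in .svc.example.internal instead of .svc.cluster.local.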

STRIMZI_CONNECT_BUILD_TIMEOUT_MS
Optional, default 300000 ms. The timeout for building new Kafka Connect images with additional connectors, in milliseconds. Consider increasing this value when using AMQ Streams to build container images containing many connectors or using a slow container registry.
STRIMZI_NETWORK_POLICY_GENERATION

Optional, default true. Controls whether network policies are generated for resources. Network policies allow connections between Kafka components.

Set this environment variable to false to disable network policy generation. You might do this, for example, if you want to use custom network policies. Custom network policies allow more control over maintaining the connections between components.
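A minimal sketch of disabling generation follows. The value is quoted because environment variable values must be strings.

```yaml
env:
  - name: STRIMZI_NETWORK_POLICY_GENERATION
    value: "false"   # the operator no longer creates NetworkPolicy resources
```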

STRIMZI_DNS_CACHE_TTL
Optional, default 30. Number of seconds to cache successful name lookups in local DNS resolver. Any negative value means cache forever. Zero means do not cache, which can be useful for avoiding connection errors due to long caching policies being applied.
STRIMZI_POD_SET_RECONCILIATION_ONLY
Optional, default false. When set to true, the Cluster Operator reconciles only the StrimziPodSet resources and any changes to the other custom resources (Kafka, KafkaConnect, and so on) are ignored. This mode is useful for ensuring that your pods are recreated if needed, but no other changes happen to the clusters.
STRIMZI_FEATURE_GATES
Optional. Enables or disables the features and functionality controlled by feature gates.
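As a sketch, feature gates are given as a comma-separated list in which a + prefix enables a gate and a - prefix disables it. FeatureGate1 and FeatureGate2 are placeholder names, not real gates. Substitute the gates available in your version.

```yaml
env:
  - name: STRIMZI_FEATURE_GATES
    # Placeholder gate names: enable FeatureGate1, disable FeatureGate2
    value: "+FeatureGate1,-FeatureGate2"
```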
STRIMZI_POD_SECURITY_PROVIDER_CLASS
Optional. Configuration for the pluggable PodSecurityProvider class, which can be used to provide the security context configuration for Pods and containers.

13.2.3.1. Leader election environment variables

Use leader election environment variables when running additional Cluster Operator replicas. You might run additional replicas to safeguard against disruption caused by major failure.

STRIMZI_LEADER_ELECTION_ENABLED
Optional, disabled (false) by default. Enables or disables leader election, which allows additional Cluster Operator replicas to run on standby.

Note

Leader election is disabled by default. It is only enabled when applying this environment variable on installation.

STRIMZI_LEADER_ELECTION_LEASE_NAME
Required when leader election is enabled. The name of the OpenShift Lease resource that is used for the leader election.
STRIMZI_LEADER_ELECTION_LEASE_NAMESPACE

Required when leader election is enabled. The namespace where the OpenShift Lease resource used for leader election is created. You can use the downward API to configure it to the namespace where the Cluster Operator is deployed.

env:
  - name: STRIMZI_LEADER_ELECTION_LEASE_NAMESPACE
    valueFrom:
      fieldRef:
        fieldPath: metadata.namespace
STRIMZI_LEADER_ELECTION_IDENTITY

Required when leader election is enabled. Configures the identity of a given Cluster Operator instance used during the leader election. The identity must be unique for each operator instance. You can use the downward API to configure it to the name of the pod where the Cluster Operator is deployed.

env:
  - name: STRIMZI_LEADER_ELECTION_IDENTITY
    valueFrom:
      fieldRef:
        fieldPath: metadata.name
STRIMZI_LEADER_ELECTION_LEASE_DURATION_MS
Optional, default 15000 ms. Specifies the duration the acquired lease is valid.
STRIMZI_LEADER_ELECTION_RENEW_DEADLINE_MS
Optional, default 10000 ms. Specifies the period the leader should try to maintain leadership.
STRIMZI_LEADER_ELECTION_RETRY_PERIOD_MS
Optional, default 2000 ms. Specifies the frequency of updates to the lease lock by the leader.
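
The three timing variables are typically tuned together. The fragment below is a sketch that sets each one explicitly to its documented default. Keeping the renew deadline shorter than the lease duration, and the retry period shorter than the renew deadline, mirrors the ordering of the defaults; it is an assumption here rather than a stated requirement.

```yaml
# Fragment of the Cluster Operator Deployment (values are the documented defaults)
env:
  # How long an acquired lease remains valid
  - name: STRIMZI_LEADER_ELECTION_LEASE_DURATION_MS
    value: "15000"
  # How long the leader keeps trying to maintain leadership
  - name: STRIMZI_LEADER_ELECTION_RENEW_DEADLINE_MS
    value: "10000"
  # How often the leader updates the lease lock
  - name: STRIMZI_LEADER_ELECTION_RETRY_PERIOD_MS
    value: "2000"
```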

13.2.3.2. Restricting Cluster Operator access with network policy

Use the STRIMZI_OPERATOR_NAMESPACE_LABELS environment variable to establish network policy for the Cluster Operator using namespace labels.

The Cluster Operator can run in the same namespace as the resources it manages, or in a separate namespace. By default, the STRIMZI_OPERATOR_NAMESPACE environment variable is configured to use the downward API to find the namespace the Cluster Operator is running in. If the Cluster Operator is running in the same namespace as the resources, only local access is required and allowed by AMQ Streams.

If the Cluster Operator is running in a separate namespace to the resources it manages, any namespace in the OpenShift cluster is allowed access to the Cluster Operator unless network policy is configured. By adding namespace labels, access to the Cluster Operator is restricted to the namespaces specified.

Network policy configured for the Cluster Operator deployment

#...
env:
  # ...
  - name: STRIMZI_OPERATOR_NAMESPACE_LABELS
    value: label1=value1,label2=value2
  #...
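
The labels listed in STRIMZI_OPERATOR_NAMESPACE_LABELS are matched against namespace labels, so they need to exist on the relevant namespace. As a sketch, assuming the Cluster Operator runs in a namespace named my-operator-namespace (a hypothetical name), the labels could be declared as follows.

```yaml
# Hypothetical namespace in which the Cluster Operator runs
apiVersion: v1
kind: Namespace
metadata:
  name: my-operator-namespace
  labels:
    # These pairs correspond to label1=value1,label2=value2 in the variable
    label1: value1
    label2: value2
```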

13.2.3.3. Setting the time interval for periodic reconciliation

Use the STRIMZI_FULL_RECONCILIATION_INTERVAL_MS variable to set the time interval for periodic reconciliations.

The Cluster Operator reacts to all notifications about applicable cluster resources received from the OpenShift cluster. If the operator is not running, or if a notification is not received for any reason, resources can get out of sync with the state of the running OpenShift cluster. To handle failovers properly, the Cluster Operator runs a periodic reconciliation that compares the state of the resources with the current cluster deployments and brings them back to a consistent state.
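
As a sketch, the interval is set like any other environment variable in the Cluster Operator Deployment. The value of 120000 ms (two minutes) is used here purely for illustration.

```yaml
# Fragment of the Cluster Operator Deployment
env:
  # Interval between periodic reconciliations, in milliseconds (illustrative value)
  - name: STRIMZI_FULL_RECONCILIATION_INTERVAL_MS
    value: "120000"
```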

13.2.4. Configuring the Cluster Operator with default proxy settings

If you are running a Kafka cluster behind an HTTP proxy, you can still pass data in and out of the cluster. For example, you can run Kafka Connect with connectors that push and pull data from outside the proxy. Or you can use a proxy to connect with an authorization server.

Configure the Cluster Operator deployment to specify the proxy environment variables. The Cluster Operator accepts standard proxy configuration (HTTP_PROXY, HTTPS_PROXY and NO_PROXY) as environment variables. The proxy settings are applied to all AMQ Streams containers.

The format for a proxy address is http://IP-ADDRESS:PORT-NUMBER. To set up a proxy with a name and password, the format is http://USERNAME:PASSWORD@IP-ADDRESS:PORT-NUMBER.

Prerequisites

  • You need an account with permission to create and manage CustomResourceDefinition and RBAC (ClusterRole and RoleBinding) resources.

Procedure

  1. To add proxy environment variables to the Cluster Operator, update its Deployment configuration (install/cluster-operator/060-Deployment-strimzi-cluster-operator.yaml).

    Example proxy configuration for the Cluster Operator

    apiVersion: apps/v1
    kind: Deployment
    spec:
      # ...
      template:
        spec:
          serviceAccountName: strimzi-cluster-operator
          containers:
            # ...
            env:
            # ...
            - name: "HTTP_PROXY"
              value: "http://proxy.com" 1
            - name: "HTTPS_PROXY"
              value: "https://proxy.com" 2
            - name: "NO_PROXY"
              value: "internal.com, other.domain.com" 3
      # ...

    1
    Address of the proxy server.
    2
    Secure address of the proxy server.
    3
    Addresses for servers that are accessed directly as exceptions to the proxy server. The URLs are comma-separated.

    Alternatively, edit the Deployment directly:

    oc edit deployment strimzi-cluster-operator
  2. If you updated the YAML file instead of editing the Deployment directly, apply the changes:

    oc apply -f install/cluster-operator/060-Deployment-strimzi-cluster-operator.yaml

13.2.5. Running multiple Cluster Operator replicas with leader election

The default Cluster Operator configuration enables leader election. Use leader election to run multiple parallel replicas of the Cluster Operator. One replica is elected as the active leader and operates the deployed resources. The other replicas run in standby mode. When the leader stops or fails, one of the standby replicas is elected as the new leader and starts operating the deployed resources.

By default, AMQ Streams runs with a single Cluster Operator replica that is always the leader replica. When a single Cluster Operator replica stops or fails, OpenShift starts a new replica.

Running the Cluster Operator with multiple replicas is not essential. But it’s useful to have replicas on standby in case of large-scale disruptions. For example, suppose multiple worker nodes or an entire availability zone fails. This failure might cause the Cluster Operator pod and many Kafka pods to go down at the same time. If subsequent pod scheduling causes congestion through lack of resources, this can delay operations when running a single Cluster Operator.

13.2.5.1. Configuring Cluster Operator replicas

To run additional Cluster Operator replicas in standby mode, you will need to increase the number of replicas and enable leader election. To configure leader election, use the leader election environment variables.

To make the required changes, configure the following Cluster Operator installation files located in install/cluster-operator/:

  • 060-Deployment-strimzi-cluster-operator.yaml
  • 022-ClusterRole-strimzi-cluster-operator-role.yaml
  • 022-RoleBinding-strimzi-cluster-operator.yaml

Leader election has its own ClusterRole and RoleBinding RBAC resources that target the namespace where the Cluster Operator is running, rather than the namespace it is watching.

The default deployment configuration creates a Lease resource called strimzi-cluster-operator in the same namespace as the Cluster Operator. The Cluster Operator uses leases to manage leader election. The RBAC resources provide the permissions to use the Lease resource. If you use a different Lease name or namespace, update the ClusterRole and RoleBinding files accordingly.

Prerequisites

  • You need an account with permission to create and manage CustomResourceDefinition and RBAC (ClusterRole and RoleBinding) resources.

Procedure

Edit the Deployment resource that is used to deploy the Cluster Operator, which is defined in the 060-Deployment-strimzi-cluster-operator.yaml file.

  1. Change the replicas property from the default (1) to a value that matches the required number of replicas.

    Increasing the number of Cluster Operator replicas

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: strimzi-cluster-operator
      labels:
        app: strimzi
    spec:
      replicas: 3

  2. Check that the leader election env properties are set.

    If they are not set, configure them.

    To enable leader election, STRIMZI_LEADER_ELECTION_ENABLED must be set to true (default).

    In this example, the name of the lease is changed to my-strimzi-cluster-operator.

    Configuring leader election environment variables for the Cluster Operator

    # ...
    spec:
      containers:
        - name: strimzi-cluster-operator
          # ...
          env:
            - name: STRIMZI_LEADER_ELECTION_ENABLED
              value: "true"
            - name: STRIMZI_LEADER_ELECTION_LEASE_NAME
              value: "my-strimzi-cluster-operator"
            - name: STRIMZI_LEADER_ELECTION_LEASE_NAMESPACE
              valueFrom:
                fieldRef:
                  fieldPath: metadata.namespace
            - name: STRIMZI_LEADER_ELECTION_IDENTITY
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name

    For a description of the available environment variables, see Section 13.2.3.1, “Leader election environment variables”.

    If you specified a different name or namespace for the Lease resource used in leader election, update the RBAC resources.

  3. (optional) Edit the ClusterRole resource in the 022-ClusterRole-strimzi-cluster-operator-role.yaml file.

    Update resourceNames with the name of the Lease resource.

    Updating the ClusterRole references to the lease

    apiVersion: rbac.authorization.k8s.io/v1
    kind: ClusterRole
    metadata:
      name: strimzi-cluster-operator-leader-election
      labels:
        app: strimzi
    rules:
      - apiGroups:
          - coordination.k8s.io
        resourceNames:
          - my-strimzi-cluster-operator
    # ...

  4. (optional) Edit the RoleBinding resource in the 022-RoleBinding-strimzi-cluster-operator.yaml file.

    Update subjects.name and subjects.namespace with the name of the Lease resource and the namespace where it was created.

    Updating the RoleBinding references to the lease

    apiVersion: rbac.authorization.k8s.io/v1
    kind: RoleBinding
    metadata:
      name: strimzi-cluster-operator-leader-election
      labels:
        app: strimzi
    subjects:
      - kind: ServiceAccount
        name: my-strimzi-cluster-operator
        namespace: myproject
    # ...

  5. Deploy the Cluster Operator:

    oc create -f install/cluster-operator -n myproject
  6. Check the status of the deployment:

    oc get deployments -n myproject

    Output shows the deployment name and readiness

    NAME                      READY  UP-TO-DATE  AVAILABLE
    strimzi-cluster-operator  3/3    3           3

    READY shows the number of replicas that are ready/expected. The deployment is successful when the AVAILABLE output shows the correct number of replicas.

13.2.6. FIPS support

Federal Information Processing Standards (FIPS) are standards for computer security and interoperability. When running AMQ Streams on a FIPS-enabled OpenShift cluster, the OpenJDK used in AMQ Streams container images automatically switches to FIPS mode. From version 2.4, AMQ Streams can run on FIPS-enabled OpenShift clusters without any changes or special configuration. It uses only the FIPS-compliant security libraries from the OpenJDK.

Minimum password length

When running in FIPS mode, SCRAM-SHA-512 passwords must be at least 32 characters long. From AMQ Streams 2.4, the default password length in the AMQ Streams User Operator is also set to 32 characters. If you have a Kafka cluster with custom configuration that uses a password length of less than 32 characters, you need to update your configuration. If you have any users with passwords shorter than 32 characters, you need to regenerate their passwords with the required length. You can do that, for example, by deleting the user secret and waiting for the User Operator to create a new password with the appropriate length.
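
As a sketch of a user whose password is generated by the User Operator, the following KafkaUser requests SCRAM-SHA-512 authentication without supplying a password. The user and cluster names are placeholders.

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: my-user
  labels:
    strimzi.io/cluster: my-cluster
spec:
  authentication:
    # No password is supplied, so the User Operator generates one
    type: scram-sha-512
```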

Important

If you are using FIPS-enabled OpenShift clusters, you may experience higher memory consumption compared to regular OpenShift clusters. To avoid any issues, we suggest increasing the memory request to at least 512Mi.

13.2.6.1. Disabling FIPS mode

AMQ Streams automatically switches to FIPS mode when running on a FIPS-enabled OpenShift cluster. Disable FIPS mode by setting the FIPS_MODE environment variable to disabled in the deployment configuration for the Cluster Operator. With FIPS mode disabled, AMQ Streams automatically disables FIPS in the OpenJDK for all components and is no longer FIPS compliant. The AMQ Streams operators, as well as all operands, run in the same way as if they were running on an OpenShift cluster without FIPS enabled.

Procedure

  1. To disable the FIPS mode in the Cluster Operator, update its Deployment configuration (install/cluster-operator/060-Deployment-strimzi-cluster-operator.yaml) and add the FIPS_MODE environment variable.

    Example FIPS configuration for the Cluster Operator

    apiVersion: apps/v1
    kind: Deployment
    spec:
      # ...
      template:
        spec:
          serviceAccountName: strimzi-cluster-operator
          containers:
            # ...
            env:
            # ...
            - name: "FIPS_MODE"
              value: "disabled" 1
      # ...

    1
    Disables the FIPS mode.

    Alternatively, edit the Deployment directly:

    oc edit deployment strimzi-cluster-operator
  2. If you updated the YAML file instead of editing the Deployment directly, apply the changes:

    oc apply -f install/cluster-operator/060-Deployment-strimzi-cluster-operator.yaml

13.3. Using the Topic Operator

When you create, modify or delete a topic using the KafkaTopic resource, the Topic Operator ensures those changes are reflected in the Kafka cluster.

For more information on the KafkaTopic resource, see the KafkaTopic schema reference.

Deploying the Topic Operator

You can deploy the Topic Operator using the Cluster Operator or as a standalone operator. You would use a standalone Topic Operator with a Kafka cluster that is not managed by the Cluster Operator.

Important

To deploy the standalone Topic Operator, you need to set environment variables to connect to a Kafka cluster. These environment variables do not need to be set if you are deploying the Topic Operator using the Cluster Operator as they will be set by the Cluster Operator.

13.3.1. Kafka topic resource

The KafkaTopic resource is used to configure topics, including the number of partitions and replicas.

The full schema for KafkaTopic is described in KafkaTopic schema reference.

13.3.1.1. Identifying a Kafka cluster for topic handling

A KafkaTopic resource includes a label that specifies the name of the Kafka cluster (derived from the name of the Kafka resource) to which it belongs.

For example:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: topic-name-1
  labels:
    strimzi.io/cluster: my-cluster

The label is used by the Topic Operator to identify the KafkaTopic resource and create a new topic, and also in subsequent handling of the topic.

If the label does not match the Kafka cluster, the Topic Operator cannot identify the KafkaTopic and the topic is not created.

13.3.1.2. Kafka topic usage recommendations

When working with topics, be consistent. Always operate on either KafkaTopic resources in OpenShift or topics directly in Kafka. Avoid routinely switching between both methods for a given topic.

Use topic names that reflect the nature of the topic, and remember that names cannot be changed later.

If creating a topic in Kafka, use a name that is a valid OpenShift resource name, otherwise the Topic Operator will need to create the corresponding KafkaTopic with a name that conforms to the OpenShift rules.

Note

For information on the requirements for identifiers and names in OpenShift, refer to Object Names and IDs.

13.3.1.3. Kafka topic naming conventions

Kafka and OpenShift impose their own validation rules for the naming of topics in Kafka and KafkaTopic.metadata.name respectively. There are valid names for each which are invalid in the other.

Using the spec.topicName property, it is possible to create a valid topic in Kafka with a name that would be invalid for the KafkaTopic resource in OpenShift.

The spec.topicName property inherits Kafka naming validation rules:

  • The name must not be longer than 249 characters.
  • Valid characters for Kafka topics are ASCII alphanumerics, ., _, and -.
  • The name cannot be . or .., though . can be used in a name, such as exampleTopic. or .exampleTopic.

spec.topicName must not be changed.

For example:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: topic-name-1
spec:
  topicName: topicName-1 1
  # ...
1
Upper case is invalid in OpenShift.

cannot be changed to:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: topic-name-1
spec:
  topicName: name-2
  # ...
Note

Some Kafka client applications, such as Kafka Streams, can create topics in Kafka programmatically. If those topics have names that are invalid OpenShift resource names, the Topic Operator gives them a valid metadata.name based on the Kafka name. Invalid characters are replaced and a hash is appended to the name. For example:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: mytopic---c55e57fe2546a33f9e603caf57165db4072e827e
spec:
  topicName: myTopic
  # ...

13.3.2. Topic Operator topic store

The Topic Operator uses Kafka to store topic metadata describing topic configuration as key-value pairs. The topic store is based on the Kafka Streams key-value mechanism, which uses Kafka topics to persist the state.

Topic metadata is cached in-memory and accessed locally within the Topic Operator. Updates from operations applied to the local in-memory cache are persisted to a backup topic store on disk. The topic store is continually synchronized with updates from Kafka topics or OpenShift KafkaTopic custom resources. This setup allows operations to be handled rapidly, and if the in-memory cache is lost, it is automatically repopulated from the persistent storage.

13.3.2.1. Internal topic store topics

Internal topics support the handling of topic metadata in the topic store.

__strimzi_store_topic
Input topic for storing the topic metadata
__strimzi-topic-operator-kstreams-topic-store-changelog
Retains a log of compacted topic store values
Warning

Do not delete these topics, as they are essential to the running of the Topic Operator.

13.3.2.2. Migrating topic metadata from ZooKeeper

In previous releases of AMQ Streams, topic metadata was stored in ZooKeeper. The new process removes this requirement, bringing the metadata into the Kafka cluster, and under the control of the Topic Operator.

When upgrading to AMQ Streams 2.4, the transition to Topic Operator control of the topic store is seamless. Metadata is found and migrated from ZooKeeper, and the old store is deleted.

13.3.2.3. Downgrading to an AMQ Streams version that uses ZooKeeper to store topic metadata

If you are reverting to a version of AMQ Streams earlier than 1.7, which uses ZooKeeper for the storage of topic metadata, you still downgrade your Cluster Operator to the previous version, then downgrade Kafka brokers and client applications to the previous Kafka version as standard.

However, you must also delete the topics that were created for the topic store using a kafka-admin command, specifying the bootstrap address of the Kafka cluster. For example:

oc run kafka-admin -ti --image=registry.redhat.io/amq-streams/kafka-34-rhel8:2.4.0 --rm=true --restart=Never -- /bin/bash -c "./bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic __strimzi-topic-operator-kstreams-topic-store-changelog --delete && ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic __strimzi_store_topic --delete"

The command must correspond to the type of listener and authentication used to access the Kafka cluster.

The Topic Operator will reconstruct the ZooKeeper topic metadata from the state of the topics in Kafka.

13.3.2.4. Topic Operator topic replication and scaling

The recommended configuration for topics managed by the Topic Operator is a topic replication factor of 3, and a minimum of 2 in-sync replicas.

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: my-topic
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 10 1
  replicas: 3 2
  config:
    min.insync.replicas: 2 3
  #...
1
The number of partitions for the topic.
2
The number of replica topic partitions. Currently, this cannot be changed in the KafkaTopic resource, but it can be changed using the kafka-reassign-partitions.sh tool.
3
The minimum number of replica partitions that a message must be successfully written to, or an exception is raised.
Note

In-sync replicas are used in conjunction with the acks configuration for producer applications. The acks configuration determines the number of follower partitions a message must be replicated to before the message is acknowledged as successfully received. The Topic Operator runs with acks=all, whereby messages must be acknowledged by all in-sync replicas.

When scaling Kafka clusters by adding or removing brokers, replication factor configuration is not changed and replicas are not reassigned automatically. However, you can use the kafka-reassign-partitions.sh tool to change the replication factor, and manually reassign replicas to brokers.

Alternatively, though the integration of Cruise Control for AMQ Streams cannot change the replication factor for topics, the optimization proposals it generates for rebalancing Kafka include commands that transfer partition replicas and change partition leadership.

13.3.2.5. Handling changes to topics

A fundamental problem that the Topic Operator needs to solve is that there is no single source of truth: both the KafkaTopic resource and the Kafka topic can be modified independently of the Topic Operator. Complicating this, the Topic Operator might not always be able to observe changes at each end in real time, for example when the Topic Operator is down.

To resolve this, the Topic Operator maintains information about each topic in the topic store. When a change happens in the Kafka cluster or OpenShift, it looks at both the state of the other system and the topic store in order to determine what needs to change to keep everything in sync. The same thing happens whenever the Topic Operator starts, and periodically while it is running.

For example, suppose the Topic Operator is not running, and a KafkaTopic called my-topic is created. When the Topic Operator starts, the topic store does not contain information on my-topic, so it can infer that the KafkaTopic was created after it was last running. The Topic Operator creates the topic corresponding to my-topic, and also stores metadata for my-topic in the topic store.

If you update Kafka topic configuration or apply a change through the KafkaTopic custom resource, the topic store is updated after the Kafka cluster is reconciled.

The topic store also allows the Topic Operator to manage scenarios where the topic configuration is changed in Kafka topics and updated through OpenShift KafkaTopic custom resources, as long as the changes are not incompatible. For example, it is possible to make changes to the same topic config key, but to different values. For incompatible changes, the Kafka configuration takes priority, and the KafkaTopic is updated accordingly.

Note

You can also use the KafkaTopic resource to delete topics using an oc delete -f KAFKA-TOPIC-CONFIG-FILE command. To be able to do this, delete.topic.enable must be set to true (default) in the spec.kafka.config of the Kafka resource.
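
As a sketch, the setting sits under spec.kafka.config in the Kafka resource. It is shown explicitly here for illustration only, because true is already the default. The cluster name is a placeholder.

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    # ...
    config:
      # Allows topics to be deleted through the KafkaTopic resource
      delete.topic.enable: true
```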

13.3.3. Configuring Kafka topics

Use the properties of the KafkaTopic resource to configure Kafka topics.

You can use oc apply to create or modify topics, and oc delete to delete existing topics.

For example:

  • oc apply -f <topic_config_file>
  • oc delete KafkaTopic <topic_name>

This procedure shows how to create a topic with 10 partitions and 2 replicas.

Before you start

It is important that you consider the following before making your changes:

  • Kafka does not support decreasing the number of partitions.
  • Increasing spec.partitions for topics with keys will change how records are partitioned, which can be particularly problematic when the topic uses semantic partitioning.
  • AMQ Streams does not support making the following changes through the KafkaTopic resource:

    • Using spec.replicas to change the number of replicas that were initially specified
    • Changing topic names using spec.topicName
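
To illustrate a change that is supported, the sketch below increases the partition count of the orders topic from this procedure from 10 to 12 while leaving spec.replicas untouched. The target of 12 is an arbitrary value chosen for the example.

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: orders
  labels:
    strimzi.io/cluster: my-cluster
spec:
  # Increased from 10; decreasing the value is not supported by Kafka
  partitions: 12
  # Unchanged from the value specified when the topic was created
  replicas: 2
```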

Prerequisites

  • A running Kafka cluster configured with a Kafka broker listener using mTLS authentication and TLS encryption.
  • A running Topic Operator (typically deployed with the Entity Operator).
  • For deleting a topic, delete.topic.enable=true (default) in the spec.kafka.config of the Kafka resource.

Procedure

  1. Configure the KafkaTopic resource.

    Example Kafka topic configuration

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaTopic
    metadata:
      name: orders
      labels:
        strimzi.io/cluster: my-cluster
    spec:
      partitions: 10
      replicas: 2

    Tip

    When modifying a topic, you can get the current version of the resource using oc get kafkatopic orders -o yaml.

  2. Create the KafkaTopic resource in OpenShift.

    oc apply -f <topic_config_file>
  3. Wait for the ready status of the topic to change to True:

    oc get kafkatopics -o wide -w -n <namespace>

    Kafka topic status

    NAME         CLUSTER     PARTITIONS  REPLICATION FACTOR READY
    my-topic-1   my-cluster  10          3                  True
    my-topic-2   my-cluster  10          3
    my-topic-3   my-cluster  10          3                  True

    Topic creation is successful when the READY output shows True.

  4. If the READY column stays blank, get more details on the status from the resource YAML or from the Topic Operator logs.

    Messages provide details on the reason for the current status.

    oc get kafkatopics my-topic-2 -o yaml

    Details on a topic with a NotReady status

    # ...
    status:
      conditions:
      - lastTransitionTime: "2022-06-13T10:14:43.351550Z"
        message: Number of partitions cannot be decreased
        reason: PartitionDecreaseException
        status: "True"
        type: NotReady

    In this example, the topic is not ready because the original number of partitions was reduced in the KafkaTopic configuration. Kafka does not support this.

    After resetting the topic configuration, the status shows the topic is ready.

    oc get kafkatopics my-topic-2 -o wide -w -n <namespace>

    Status update of the topic

    NAME         CLUSTER     PARTITIONS  REPLICATION FACTOR READY
    my-topic-2   my-cluster  10          3                  True

    Fetching the details shows no messages.

    oc get kafkatopics my-topic-2 -o yaml

    Details on a topic with a READY status

    # ...
    status:
      conditions:
      - lastTransitionTime: '2022-06-13T10:15:03.761084Z'
        status: 'True'
        type: Ready

13.3.4. Configuring the Topic Operator with resource requests and limits

You can allocate resources, such as CPU and memory, to the Topic Operator and set a limit on the amount of resources it can consume.

Prerequisites

  • The Cluster Operator is running.

Procedure

  1. Update the Kafka cluster configuration in an editor, as required:

    oc edit kafka MY-CLUSTER
  2. In the spec.entityOperator.topicOperator.resources property in the Kafka resource, set the resource requests and limits for the Topic Operator.

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    spec:
      # Kafka and ZooKeeper sections...
      entityOperator:
        topicOperator:
          resources:
            requests:
              cpu: "1"
              memory: 500Mi
            limits:
              cpu: "1"
              memory: 500Mi
  3. Apply the new configuration to create or update the resource.

    oc apply -f <kafka_configuration_file>

13.4. Using the User Operator

When you create, modify or delete a user using the KafkaUser resource, the User Operator ensures those changes are reflected in the Kafka cluster.

For more information on the KafkaUser resource, see the KafkaUser schema reference.

Deploying the User Operator

You can deploy the User Operator using the Cluster Operator or as a standalone operator. You would use a standalone User Operator with a Kafka cluster that is not managed by the Cluster Operator.

Important

To deploy the standalone User Operator, you need to set environment variables to connect to a Kafka cluster. These environment variables do not need to be set if you are deploying the User Operator using the Cluster Operator as they will be set by the Cluster Operator.

13.4.1. Configuring Kafka users

Use the properties of the KafkaUser resource to configure Kafka users.

You can use oc apply to create or modify users, and oc delete to delete existing users.

For example:

  • oc apply -f <user_config_file>
  • oc delete KafkaUser <user_name>

Users represent Kafka clients. When you configure Kafka users, you enable the user authentication and authorization mechanisms required by clients to access Kafka. The mechanism used must match the equivalent Kafka configuration. For more information on using Kafka and KafkaUser resources to secure access to Kafka brokers, see Securing access to Kafka brokers.
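
As a sketch of what matching configuration can look like, the listener below enables TLS encryption and mTLS authentication in the Kafka resource, which corresponds to a KafkaUser that specifies authentication of type tls. The listener name and port are illustrative choices.

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    # ...
    listeners:
      - name: tls
        port: 9093
        type: internal
        tls: true
        authentication:
          # Matches spec.authentication.type in the KafkaUser resource
          type: tls
```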

Prerequisites

  • A running Kafka cluster configured with a Kafka broker listener using mTLS authentication and TLS encryption.
  • A running User Operator (typically deployed with the Entity Operator).

Procedure

  1. Configure the KafkaUser resource.

    This example specifies mTLS authentication and simple authorization using ACLs.

    Example Kafka user configuration

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaUser
    metadata:
      name: my-user
      labels:
        strimzi.io/cluster: my-cluster
    spec:
      authentication:
        type: tls
      authorization:
        type: simple
        acls:
          # Example consumer ACLs for topic my-topic using consumer group my-group
          - resource:
              type: topic
              name: my-topic
              patternType: literal
            operations:
              - Describe
              - Read
            host: "*"
          - resource:
              type: group
              name: my-group
              patternType: literal
            operations:
              - Read
            host: "*"
          # Example producer ACLs for topic my-topic
          - resource:
              type: topic
              name: my-topic
              patternType: literal
            operations:
              - Create
              - Describe
              - Write
            host: "*"

  2. Create the KafkaUser resource in OpenShift.

    oc apply -f <user_config_file>
  3. Wait for the ready status of the user to change to True:

    oc get kafkausers -o wide -w -n <namespace>

    Kafka user status

    NAME       CLUSTER     AUTHENTICATION  AUTHORIZATION READY
    my-user-1  my-cluster  tls             simple        True
    my-user-2  my-cluster  tls             simple
    my-user-3  my-cluster  tls             simple        True

    User creation is successful when the READY output shows True.

  4. If the READY column stays blank, get more details on the status from the resource YAML or User Operator logs.

    Messages provide details on the reason for the current status.

    oc get kafkausers my-user-2 -o yaml

    Details on a user with a NotReady status

    # ...
    status:
      conditions:
      - lastTransitionTime: "2022-06-10T10:07:37.238065Z"
        message: Simple authorization ACL rules are configured but not supported in the
          Kafka cluster configuration.
        reason: InvalidResourceException
        status: "True"
        type: NotReady

    In this example, the user is not ready because simple authorization is not enabled in the Kafka configuration.

    Kafka configuration for simple authorization

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    metadata:
      name: my-cluster
    spec:
      kafka:
        # ...
        authorization:
          type: simple

    After updating the Kafka configuration, the status shows the user is ready.

    oc get kafkausers my-user-2 -o wide -w -n <namespace>

    Status update of the user

    NAME       CLUSTER     AUTHENTICATION  AUTHORIZATION READY
    my-user-2  my-cluster  tls             simple        True

    Fetching the details shows no messages.

    oc get kafkausers my-user-2 -o yaml

    Details on a user with a Ready status

    # ...
    status:
      conditions:
      - lastTransitionTime: "2022-06-10T10:33:40.166846Z"
        status: "True"
        type: Ready

13.4.2. Configuring the User Operator with resource requests and limits

You can allocate resources, such as CPU and memory, to the User Operator and set a limit on the amount of resources it can consume.

Prerequisites

  • The Cluster Operator is running.

Procedure

  1. Update the Kafka cluster configuration in an editor, as required:

    oc edit kafka MY-CLUSTER

  2. In the spec.entityOperator.userOperator.resources property in the Kafka resource, set the resource requests and limits for the User Operator.

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    spec:
      # Kafka and ZooKeeper sections...
      entityOperator:
        userOperator:
          resources:
            requests:
              cpu: "1"
              memory: 500Mi
            limits:
              cpu: "1"
              memory: 500Mi

    Save the file and exit the editor. The Cluster Operator applies the changes automatically.

13.5. Configuring feature gates

AMQ Streams operators support feature gates to enable or disable certain features and functionality. Enabling a feature gate changes the behavior of the relevant operator and introduces the feature to your AMQ Streams deployment.

Feature gates have a default state of either enabled or disabled.

To modify a feature gate’s default state, use the STRIMZI_FEATURE_GATES environment variable in the operator’s configuration. You can modify multiple feature gates using this single environment variable. Specify a comma-separated list of feature gate names and prefixes. A + prefix enables the feature gate and a - prefix disables it.

Example feature gate configuration that enables FeatureGate1 and disables FeatureGate2

env:
  - name: STRIMZI_FEATURE_GATES
    value: +FeatureGate1,-FeatureGate2
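
To show where the `env` entry sits, the following sketch places it in the Cluster Operator Deployment. The Deployment and container name `strimzi-cluster-operator` is an assumption based on the default installation files; an installation through OperatorHub might use a different name and configure environment variables elsewhere.

```yaml
# Sketch only: the Deployment and container names are assumptions.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: strimzi-cluster-operator        # assumed default name
spec:
  template:
    spec:
      containers:
        - name: strimzi-cluster-operator  # assumed default name
          # ...
          env:
            # ...
            - name: STRIMZI_FEATURE_GATES
              value: "+FeatureGate1,-FeatureGate2"
```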

13.5.1. ControlPlaneListener feature gate

The ControlPlaneListener feature gate has moved to GA, which means it is now permanently enabled and cannot be disabled. With ControlPlaneListener enabled, connections between the Kafka controller and brokers use an internal control plane listener on port 9090. Data replication between brokers, and internal connections from AMQ Streams operators, Cruise Control, and the Kafka Exporter, use the replication listener on port 9091.

Important

With the ControlPlaneListener feature gate permanently enabled, it is no longer possible to upgrade or downgrade directly between AMQ Streams 1.7 and earlier and AMQ Streams 2.3 and newer. Instead, first upgrade or downgrade to one of the AMQ Streams versions in between, disable the ControlPlaneListener feature gate in that version, and then upgrade or downgrade (with the feature gate enabled) to the target version.

13.5.2. ServiceAccountPatching feature gate

The ServiceAccountPatching feature gate has moved to GA, which means it is now permanently enabled and cannot be disabled. With ServiceAccountPatching enabled, the Cluster Operator always reconciles service accounts and updates them when needed. For example, when you change service account labels or annotations using the template property of a custom resource, the operator automatically updates them on the existing service account resources.
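
As an illustration of the template case, the following sketch sets a label and an annotation on the service account of the Kafka brokers. The label and annotation keys and values are hypothetical; only the `template.serviceAccount.metadata` structure is the point of the example.

```yaml
# Sketch only: the label and annotation shown are hypothetical.
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    # ...
    template:
      serviceAccount:
        metadata:
          labels:
            team: platform                   # hypothetical label
          annotations:
            example.com/owner: kafka-admins  # hypothetical annotation
```

When a change like this is applied, the Cluster Operator updates the existing service account instead of leaving it untouched.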

13.5.3. UseStrimziPodSets feature gate

The UseStrimziPodSets feature gate has a default state of enabled.

The UseStrimziPodSets feature gate introduces a resource for managing pods called StrimziPodSet. When the feature gate is enabled, this resource is used instead of StatefulSets, and AMQ Streams, rather than OpenShift, handles the creation and management of pods. Using StrimziPodSets instead of StatefulSets gives AMQ Streams more control over how pods are managed.

When this feature gate is disabled, AMQ Streams relies on StatefulSets to create and manage pods for the ZooKeeper and Kafka clusters. AMQ Streams creates the StatefulSet and OpenShift creates the pods according to the StatefulSet definition. When a pod is deleted, OpenShift is responsible for recreating it. The use of StatefulSets has the following limitations:

  • Pods are always created or removed based on their index numbers
  • All pods in the StatefulSet need to have a similar configuration
  • Changing storage configuration for the Pods in the StatefulSet is complicated

Disabling the UseStrimziPodSets feature gate

To disable the UseStrimziPodSets feature gate, specify -UseStrimziPodSets in the STRIMZI_FEATURE_GATES environment variable in the Cluster Operator configuration.
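
A minimal sketch of the corresponding `env` entry in the Cluster Operator configuration follows. The value is quoted so that the leading `-` is always read as part of the string.

```yaml
# Sketch only: add this entry to the existing env list of the Cluster Operator.
env:
  - name: STRIMZI_FEATURE_GATES
    value: "-UseStrimziPodSets"
```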

Important

The UseStrimziPodSets feature gate must be disabled when downgrading to AMQ Streams 2.0 and earlier versions.

13.5.4. (Preview) UseKRaft feature gate

The UseKRaft feature gate has a default state of disabled.

The UseKRaft feature gate deploys the Kafka cluster in the KRaft (Kafka Raft metadata) mode without ZooKeeper. This feature gate is currently intended only for development and testing.

Important

The KRaft mode is not ready for production in Apache Kafka or in AMQ Streams.

When the UseKRaft feature gate is enabled, the Kafka cluster is deployed without ZooKeeper. The .spec.zookeeper properties in the Kafka custom resource are ignored, but must still be present. The UseKRaft feature gate provides an API that configures Kafka cluster nodes and their roles. The API is still in development and is expected to change before the KRaft mode is production-ready.

Currently, the KRaft mode in AMQ Streams has the following major limitations:

  • Moving from Kafka clusters with ZooKeeper to KRaft clusters or the other way around is not supported.
  • Upgrades and downgrades of Apache Kafka versions or the AMQ Streams operator are not supported. Users might need to delete the cluster, upgrade the operator and deploy a new Kafka cluster.
  • The Topic Operator is not supported. The spec.entityOperator.topicOperator property must be removed from the Kafka custom resource.
  • SCRAM-SHA-512 authentication is not supported.
  • JBOD storage is not supported. The type: jbod storage can be used, but the JBOD array can contain only one disk.
  • All Kafka nodes have both the controller and broker KRaft roles. Kafka clusters with separate controller and broker nodes are not supported.
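
The limitations above can be read against a Kafka custom resource. The following is a sketch for development and testing only, under stated assumptions: the replica counts, storage sizes, listener name, and port are placeholders, and the resource is not a recommended configuration.

```yaml
# Sketch only: replica counts, sizes, listener name, and port are placeholders.
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    replicas: 3              # every node has both controller and broker roles
    listeners:
      - name: tls
        port: 9093
        type: internal
        tls: true
        authentication:
          type: tls          # SCRAM-SHA-512 is not supported in KRaft mode
    storage:
      type: jbod
      volumes:               # the JBOD array can contain only one disk
        - id: 0
          type: persistent-claim
          size: 100Gi
          deleteClaim: false
  zookeeper:                 # ignored in KRaft mode, but must be present
    replicas: 3
    storage:
      type: persistent-claim
      size: 100Gi
      deleteClaim: false
  entityOperator:
    userOperator: {}         # no topicOperator property
```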

Enabling the UseKRaft feature gate

To enable the UseKRaft feature gate, specify +UseKRaft in the STRIMZI_FEATURE_GATES environment variable in the Cluster Operator configuration.

Important

The UseKRaft feature gate depends on the UseStrimziPodSets feature gate. When enabling the UseKRaft feature gate, make sure that the UseStrimziPodSets feature gate is enabled as well.
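
As a sketch, both feature gates can be listed explicitly in the Cluster Operator configuration. Listing `+UseStrimziPodSets` is redundant when it is already enabled by default, but it makes the dependency visible.

```yaml
# Sketch only: add this entry to the existing env list of the Cluster Operator.
env:
  - name: STRIMZI_FEATURE_GATES
    value: "+UseStrimziPodSets,+UseKRaft"
```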

13.5.5. (Preview) StableConnectIdentities feature gate

The StableConnectIdentities feature gate has a default state of disabled.

The StableConnectIdentities feature gate uses StrimziPodSet resources to manage Kafka Connect and Kafka MirrorMaker 2 pods instead of using OpenShift Deployment resources. StrimziPodSets give the pods stable names and stable addresses, which do not change during rolling upgrades. This helps to minimize the number of rebalances of connector tasks.

Enabling the StableConnectIdentities feature gate

To enable the StableConnectIdentities feature gate, specify +StableConnectIdentities in the STRIMZI_FEATURE_GATES environment variable in the Cluster Operator configuration.
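
A minimal sketch of the corresponding `env` entry follows. If other feature gates are already set, append this one to the same comma-separated value.

```yaml
# Sketch only: add this entry to the existing env list of the Cluster Operator.
env:
  - name: STRIMZI_FEATURE_GATES
    value: "+StableConnectIdentities"
```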

Important

The StableConnectIdentities feature gate must be disabled when downgrading to AMQ Streams 2.3 and earlier versions.

13.5.6. Feature gate releases

Feature gates have three stages of maturity:

  • Alpha — typically disabled by default
  • Beta — typically enabled by default
  • General Availability (GA) — typically always enabled

Alpha stage features might be experimental or unstable, subject to change, or not sufficiently tested for production use. Beta stage features are well tested and their functionality is not likely to change. GA stage features are stable and should not change in the future. Alpha and beta stage features are removed if they do not prove to be useful.

  • The ControlPlaneListener feature gate moved to GA stage in AMQ Streams 2.3. It is now permanently enabled and cannot be disabled.
  • The ServiceAccountPatching feature gate moved to GA stage in AMQ Streams 2.3. It is now permanently enabled and cannot be disabled.
  • The UseStrimziPodSets feature gate moved to beta stage in AMQ Streams 2.3. It moves to GA in a future release of AMQ Streams when the support for StatefulSets is completely removed.
  • The UseKRaft feature gate is available for development only and does not currently have a planned release for moving to the beta stage.
  • The StableConnectIdentities feature gate is in alpha stage and is disabled by default.

Note

Feature gates might be removed when they reach GA. This means that the feature was incorporated into the AMQ Streams core features and can no longer be disabled.

Table 13.5. Feature gates and the AMQ Streams versions when they moved to alpha, beta, or GA

Feature gate             Alpha  Beta                      GA
ControlPlaneListener     1.8    2.0                       2.3
ServiceAccountPatching   1.8    2.0                       2.3
UseStrimziPodSets        2.1    2.3                       future release (planned)
UseKRaft                 2.2    -                         -
StableConnectIdentities  2.4    future release (planned)  -

If a feature gate is enabled, you may need to disable it before upgrading or downgrading from a specific AMQ Streams version. The following table shows which feature gates you need to disable when upgrading or downgrading AMQ Streams versions.

Table 13.6. Feature gates to disable when upgrading or downgrading AMQ Streams

Feature gate to disable  Upgrading from AMQ Streams version  Downgrading to AMQ Streams version
ControlPlaneListener     1.7 and earlier                     1.7 and earlier
UseStrimziPodSets        -                                   2.0 and earlier
StableConnectIdentities  -                                   2.3 and earlier

13.6. Monitoring operators using Prometheus metrics

AMQ Streams operators expose Prometheus metrics. The metrics are automatically enabled and contain information about the following:

  • Number of reconciliations
  • Number of Custom Resources the operator is processing
  • Duration of reconciliations
  • JVM metrics from the operators

Additionally, AMQ Streams provides an example Grafana dashboard for the operator.
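
If the Prometheus Operator is used to scrape the metrics, a PodMonitor resource is one way to select the operator pods. The following is a sketch under several assumptions that are not stated in this section: the Cluster Operator pods carry the label `strimzi.io/kind: cluster-operator`, metrics are served at `/metrics` on a port named `http`, and the operator runs in a namespace called `myproject`. Check these values against your deployment before using them.

```yaml
# Sketch only: label, port name, path, and namespace are assumptions.
apiVersion: monitoring.coreos.com/v1
kind: PodMonitor
metadata:
  name: cluster-operator-metrics
spec:
  selector:
    matchLabels:
      strimzi.io/kind: cluster-operator   # assumed pod label
  namespaceSelector:
    matchNames:
      - myproject                         # assumed operator namespace
  podMetricsEndpoints:
    - path: /metrics                      # assumed metrics path
      port: http                          # assumed port name
```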