Chapter 20. Managing AMQ Streams

Managing AMQ Streams requires performing various tasks to keep the Kafka clusters and associated resources running smoothly. Use oc commands to check the status of resources, configure maintenance windows for rolling updates, and leverage tools such as the AMQ Streams Drain Cleaner and Kafka Static Quota plugin to manage your deployment effectively.

20.1. Working with custom resources

You can use oc commands to retrieve information and perform other operations on AMQ Streams custom resources.

Using oc with the status subresource of a custom resource allows you to get the information about the resource.

20.1.1. Performing oc operations on custom resources

Use oc commands, such as get, describe, edit, or delete, to perform operations on resource types. For example, oc get kafkatopics retrieves a list of all Kafka topics and oc get kafkas retrieves all deployed Kafka clusters.

When referencing resource types, you can use both singular and plural names: oc get kafkas gets the same results as oc get kafka.

You can also use the short name of the resource. Learning short names can save you time when managing AMQ Streams. The short name for Kafka is k, so you can also run oc get k to list all Kafka clusters.

oc get k

NAME         DESIRED KAFKA REPLICAS   DESIRED ZK REPLICAS
my-cluster   3                        3

Table 20.1. Long and short names for each AMQ Streams resource

AMQ Streams resourceLong nameShort name

Kafka

kafka

k

Kafka Topic

kafkatopic

kt

Kafka User

kafkauser

ku

Kafka Connect

kafkaconnect

kc

Kafka Connector

kafkaconnector

kctr

Kafka Mirror Maker

kafkamirrormaker

kmm

Kafka Mirror Maker 2

kafkamirrormaker2

kmm2

Kafka Bridge

kafkabridge

kb

Kafka Rebalance

kafkarebalance

kr

20.1.1.1. Resource categories

Categories of custom resources can also be used in oc commands.

All AMQ Streams custom resources belong to the category strimzi, so you can use strimzi to get all the AMQ Streams resources with one command.

For example, running oc get strimzi lists all AMQ Streams custom resources in a given namespace.

oc get strimzi

NAME                                   DESIRED KAFKA REPLICAS DESIRED ZK REPLICAS
kafka.kafka.strimzi.io/my-cluster      3                      3

NAME                                   PARTITIONS REPLICATION FACTOR
kafkatopic.kafka.strimzi.io/kafka-apps 3          3

NAME                                   AUTHENTICATION AUTHORIZATION
kafkauser.kafka.strimzi.io/my-user     tls            simple

The oc get strimzi -o name command returns all resource types and resource names. The -o name option fetches the output in the type/name format

oc get strimzi -o name

kafka.kafka.strimzi.io/my-cluster
kafkatopic.kafka.strimzi.io/kafka-apps
kafkauser.kafka.strimzi.io/my-user

You can combine this strimzi command with other commands. For example, you can pass it into a oc delete command to delete all resources in a single command.

oc delete $(oc get strimzi -o name)

kafka.kafka.strimzi.io "my-cluster" deleted
kafkatopic.kafka.strimzi.io "kafka-apps" deleted
kafkauser.kafka.strimzi.io "my-user" deleted

Deleting all resources in a single operation might be useful, for example, when you are testing new AMQ Streams features.

20.1.1.2. Querying the status of sub-resources

There are other values you can pass to the -o option. For example, by using -o yaml you get the output in YAML format. Using -o json will return it as JSON.

You can see all the options in oc get --help.

One of the most useful options is the JSONPath support, which allows you to pass JSONPath expressions to query the Kubernetes API. A JSONPath expression can extract or navigate specific parts of any resource.

For example, you can use the JSONPath expression {.status.listeners[?(@.name=="tls")].bootstrapServers} to get the bootstrap address from the status of the Kafka custom resource and use it in your Kafka clients.

Here, the command finds the bootstrapServers value of the listener named tls:

oc get kafka my-cluster -o=jsonpath='{.status.listeners[?(@.name=="tls")].bootstrapServers}{"\n"}'

my-cluster-kafka-bootstrap.myproject.svc:9093

By changing the name condition you can also get the address of the other Kafka listeners.

You can use jsonpath to extract any other property or group of properties from any custom resource.

20.1.2. AMQ Streams custom resource status information

Status properties provide status information for certain custom resources.

The following table lists the custom resources that provide status information (when deployed) and the schemas that define the status properties.

For more information on the schemas, see the Custom resource API reference.

Table 20.2. Custom resources that provide status information

AMQ Streams resourceSchema referencePublishes status information on…​

Kafka

KafkaStatus schema reference

The Kafka cluster

KafkaTopic

KafkaTopicStatus schema reference

Kafka topics in the Kafka cluster

KafkaUser

KafkaUserStatus schema reference

Kafka users in the Kafka cluster

KafkaConnect

KafkaConnectStatus schema reference

The Kafka Connect cluster

KafkaConnector

KafkaConnectorStatus schema reference

KafkaConnector resources

KafkaMirrorMaker2

KafkaMirrorMaker2Status schema reference

The Kafka MirrorMaker 2 cluster

KafkaMirrorMaker

KafkaMirrorMakerStatus schema reference

The Kafka MirrorMaker cluster

KafkaBridge

KafkaBridgeStatus schema reference

The AMQ Streams Kafka Bridge

KafkaRebalance

KafkaRebalance schema reference

The status and results of a rebalance

The status property of a resource provides information on the state of the resource. The status.conditions and status.observedGeneration properties are common to all resources.

status.conditions
Status conditions describe the current state of a resource. Status condition properties are useful for tracking progress related to the resource achieving its desired state, as defined by the configuration specified in its spec. Status condition properties provide the time and reason the state of the resource changed, and details of events preventing or delaying the operator from realizing the desired state.
status.observedGeneration
Last observed generation denotes the latest reconciliation of the resource by the Cluster Operator. If the value of observedGeneration is different from the value of metadata.generation ((the current version of the deployment), the operator has not yet processed the latest update to the resource. If these values are the same, the status information reflects the most recent changes to the resource.

The status properties also provide resource-specific information. For example, KafkaStatus provides information on listener addresses, and the ID of the Kafka cluster.

AMQ Streams creates and maintains the status of custom resources, periodically evaluating the current state of the custom resource and updating its status accordingly. When performing an update on a custom resource using oc edit, for example, its status is not editable. Moreover, changing the status would not affect the configuration of the Kafka cluster.

Here we see the status properties for a Kafka custom resource.

Kafka custom resource status

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
spec:
  # ...
status:
  clusterId: XP9FP2P-RByvEy0W4cOEUA 1
  conditions: 2
    - lastTransitionTime: '2023-01-20T17:56:29.396588Z'
      status: 'True'
      type: Ready 3
  listeners: 4
    - addresses:
        - host: my-cluster-kafka-bootstrap.prm-project.svc
          port: 9092
      bootstrapServers: 'my-cluster-kafka-bootstrap.prm-project.svc:9092'
      name: plain
      type: plain
    - addresses:
        - host: my-cluster-kafka-bootstrap.prm-project.svc
          port: 9093
      bootstrapServers: 'my-cluster-kafka-bootstrap.prm-project.svc:9093'
      certificates:
        - |
          -----BEGIN CERTIFICATE-----

          -----END CERTIFICATE-----
      name: tls
      type: tls
    - addresses:
        - host: >-
            2054284155.us-east-2.elb.amazonaws.com
          port: 9095
      bootstrapServers: >-
        2054284155.us-east-2.elb.amazonaws.com:9095
      certificates:
        - |
          -----BEGIN CERTIFICATE-----

          -----END CERTIFICATE-----
      name: external2
      type: external2
    - addresses:
        - host: ip-10-0-172-202.us-east-2.compute.internal
          port: 31644
      bootstrapServers: 'ip-10-0-172-202.us-east-2.compute.internal:31644'
      certificates:
        - |
          -----BEGIN CERTIFICATE-----

          -----END CERTIFICATE-----
      name: external1
      type: external1
  observedGeneration: 3 5

1
The Kafka cluster ID.
2
Status conditions describe the current state of the Kafka cluster.
3
The Ready condition indicates that the Cluster Operator considers the Kafka cluster able to handle traffic.
4
The listeners describe Kafka bootstrap addresses by type.
5
The observedGeneration value indicates the last reconciliation of the Kafka custom resource by the Cluster Operator.
Note

The Kafka bootstrap addresses listed in the status do not signify that those endpoints or the Kafka cluster is in a Ready state.

Accessing status information

You can access status information for a resource from the command line. For more information, see Section 20.1.3, “Finding the status of a custom resource”.

20.1.3. Finding the status of a custom resource

This procedure describes how to find the status of a custom resource.

Prerequisites

  • An OpenShift cluster.
  • The Cluster Operator is running.

Procedure

  • Specify the custom resource and use the -o jsonpath option to apply a standard JSONPath expression to select the status property:

    oc get kafka <kafka_resource_name> -o jsonpath='{.status}'

    This expression returns all the status information for the specified custom resource. You can use dot notation, such as status.listeners or status.observedGeneration, to fine-tune the status information you wish to see.

Additional resources

20.2. Pausing reconciliation of custom resources

Sometimes it is useful to pause the reconciliation of custom resources managed by AMQ Streams Operators, so that you can perform fixes or make updates. If reconciliations are paused, any changes made to custom resources are ignored by the Operators until the pause ends.

If you want to pause reconciliation of a custom resource, set the strimzi.io/pause-reconciliation annotation to true in its configuration. This instructs the appropriate Operator to pause reconciliation of the custom resource. For example, you can apply the annotation to the KafkaConnect resource so that reconciliation by the Cluster Operator is paused.

You can also create a custom resource with the pause annotation enabled. The custom resource is created, but it is ignored.

Prerequisites

  • The AMQ Streams Operator that manages the custom resource is running.

Procedure

  1. Annotate the custom resource in OpenShift, setting pause-reconciliation to true:

    oc annotate <kind_of_custom_resource> <name_of_custom_resource> strimzi.io/pause-reconciliation="true"

    For example, for the KafkaConnect custom resource:

    oc annotate KafkaConnect my-connect strimzi.io/pause-reconciliation="true"
  2. Check that the status conditions of the custom resource show a change to ReconciliationPaused:

    oc describe <kind_of_custom_resource> <name_of_custom_resource>

    The type condition changes to ReconciliationPaused at the lastTransitionTime.

    Example custom resource with a paused reconciliation condition type

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnect
    metadata:
      annotations:
        strimzi.io/pause-reconciliation: "true"
        strimzi.io/use-connector-resources: "true"
      creationTimestamp: 2021-03-12T10:47:11Z
      #...
    spec:
      # ...
    status:
      conditions:
      - lastTransitionTime: 2021-03-12T10:47:41.689249Z
        status: "True"
        type: ReconciliationPaused

Resuming from pause

  • To resume reconciliation, you can set the annotation to false, or remove the annotation.

20.3. Maintenance time windows for rolling updates

Maintenance time windows allow you to schedule certain rolling updates of your Kafka and ZooKeeper clusters to start at a convenient time.

20.3.1. Maintenance time windows overview

In most cases, the Cluster Operator only updates your Kafka or ZooKeeper clusters in response to changes to the corresponding Kafka resource. This enables you to plan when to apply changes to a Kafka resource to minimize the impact on Kafka client applications.

However, some updates to your Kafka and ZooKeeper clusters can happen without any corresponding change to the Kafka resource. For example, the Cluster Operator will need to perform a rolling restart if a CA (certificate authority) certificate that it manages is close to expiry.

While a rolling restart of the pods should not affect availability of the service (assuming correct broker and topic configurations), it could affect performance of the Kafka client applications. Maintenance time windows allow you to schedule such spontaneous rolling updates of your Kafka and ZooKeeper clusters to start at a convenient time. If maintenance time windows are not configured for a cluster then it is possible that such spontaneous rolling updates will happen at an inconvenient time, such as during a predictable period of high load.

20.3.2. Maintenance time window definition

You configure maintenance time windows by entering an array of strings in the Kafka.spec.maintenanceTimeWindows property. Each string is a cron expression interpreted as being in UTC (Coordinated Universal Time, which for practical purposes is the same as Greenwich Mean Time).

The following example configures a single maintenance time window that starts at midnight and ends at 01:59am (UTC), on Sundays, Mondays, Tuesdays, Wednesdays, and Thursdays:

# ...
maintenanceTimeWindows:
  - "* * 0-1 ? * SUN,MON,TUE,WED,THU *"
# ...

In practice, maintenance windows should be set in conjunction with the Kafka.spec.clusterCa.renewalDays and Kafka.spec.clientsCa.renewalDays properties of the Kafka resource, to ensure that the necessary CA certificate renewal can be completed in the configured maintenance time windows.

Note

AMQ Streams does not schedule maintenance operations exactly according to the given windows. Instead, for each reconciliation, it checks whether a maintenance window is currently "open". This means that the start of maintenance operations within a given time window can be delayed by up to the Cluster Operator reconciliation interval. Maintenance time windows must therefore be at least this long.

20.3.3. Configuring a maintenance time window

You can configure a maintenance time window for rolling updates triggered by supported processes.

Prerequisites

  • An OpenShift cluster.
  • The Cluster Operator is running.

Procedure

  1. Add or edit the maintenanceTimeWindows property in the Kafka resource. For example to allow maintenance between 0800 and 1059 and between 1400 and 1559 you would set the maintenanceTimeWindows as shown below:

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    metadata:
      name: my-cluster
    spec:
      kafka:
        # ...
      zookeeper:
        # ...
      maintenanceTimeWindows:
        - "* * 8-10 * * ?"
        - "* * 14-15 * * ?"
  2. Create or update the resource:

    oc apply -f <kafka_configuration_file>

20.4. Manually starting rolling updates of Kafka and ZooKeeper clusters

AMQ Streams supports the use of annotations on resources to manually trigger a rolling update of Kafka and ZooKeeper clusters through the Cluster Operator. Rolling updates restart the pods of the resource with new ones.

Manually performing a rolling update on a specific pod or set of pods is usually only required in exceptional circumstances. However, rather than deleting the pods directly, if you perform the rolling update through the Cluster Operator you ensure the following:

  • The manual deletion of the pod does not conflict with simultaneous Cluster Operator operations, such as deleting other pods in parallel.
  • The Cluster Operator logic handles the Kafka configuration specifications, such as the number of in-sync replicas.

20.4.1. Performing a rolling update using a pod management annotation

This procedure describes how to trigger a rolling update of a Kafka cluster or ZooKeeper cluster.

To trigger the update, you add an annotation to the resource you are using to manage the pods running on the cluster. You annotate the StatefulSet or StrimziPodSet resource (if you enabled the UseStrimziPodSets feature gate).

Prerequisites

To perform a manual rolling update, you need a running Cluster Operator and Kafka cluster.

Procedure

  1. Find the name of the resource that controls the Kafka or ZooKeeper pods you want to manually update.

    For example, if your Kafka cluster is named my-cluster, the corresponding names are my-cluster-kafka and my-cluster-zookeeper.

  2. Use oc annotate to annotate the appropriate resource in OpenShift.

    Annotating a StatefulSet

    oc annotate statefulset <cluster_name>-kafka strimzi.io/manual-rolling-update=true
    
    oc annotate statefulset <cluster_name>-zookeeper strimzi.io/manual-rolling-update=true

    Annotating a StrimziPodSet

    oc annotate strimzipodset <cluster_name>-kafka strimzi.io/manual-rolling-update=true
    
    oc annotate strimzipodset <cluster_name>-zookeeper strimzi.io/manual-rolling-update=true

  3. Wait for the next reconciliation to occur (every two minutes by default). A rolling update of all pods within the annotated resource is triggered, as long as the annotation was detected by the reconciliation process. When the rolling update of all the pods is complete, the annotation is removed from the resource.

20.4.2. Performing a rolling update using a pod annotation

This procedure describes how to manually trigger a rolling update of an existing Kafka cluster or ZooKeeper cluster using an OpenShift Pod annotation. When multiple pods are annotated, consecutive rolling updates are performed within the same reconciliation run.

Prerequisites

To perform a manual rolling update, you need a running Cluster Operator and Kafka cluster.

You can perform a rolling update on a Kafka cluster regardless of the topic replication factor used. But for Kafka to stay operational during the update, you’ll need the following:

  • A highly available Kafka cluster deployment running with nodes that you wish to update.
  • Topics replicated for high availability.

    Topic configuration specifies a replication factor of at least 3 and a minimum number of in-sync replicas to 1 less than the replication factor.

    Kafka topic replicated for high availability

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaTopic
    metadata:
      name: my-topic
      labels:
        strimzi.io/cluster: my-cluster
    spec:
      partitions: 1
      replicas: 3
      config:
        # ...
        min.insync.replicas: 2
        # ...

Procedure

  1. Find the name of the Kafka or ZooKeeper Pod you want to manually update.

    For example, if your Kafka cluster is named my-cluster, the corresponding Pod names are my-cluster-kafka-index and my-cluster-zookeeper-index. The index starts at zero and ends at the total number of replicas minus one.

  2. Annotate the Pod resource in OpenShift.

    Use oc annotate:

    oc annotate pod cluster-name-kafka-index strimzi.io/manual-rolling-update=true
    
    oc annotate pod cluster-name-zookeeper-index strimzi.io/manual-rolling-update=true
  3. Wait for the next reconciliation to occur (every two minutes by default). A rolling update of the annotated Pod is triggered, as long as the annotation was detected by the reconciliation process. When the rolling update of a pod is complete, the annotation is removed from the Pod.

20.5. Evicting pods with the AMQ Streams Drain Cleaner

Kafka and ZooKeeper pods might be evicted during OpenShift upgrades, maintenance, or pod rescheduling. If your Kafka broker and ZooKeeper pods were deployed by AMQ Streams, you can use the AMQ Streams Drain Cleaner tool to handle the pod evictions. The AMQ Streams Drain Cleaner handles the eviction instead of OpenShift. You must set the podDisruptionBudget for your Kafka deployment to 0 (zero). OpenShift will then no longer be allowed to evict the pod automatically.

By deploying the AMQ Streams Drain Cleaner, you can use the Cluster Operator to move Kafka pods instead of OpenShift. The Cluster Operator ensures that topics are never under-replicated. Kafka can remain operational during the eviction process. The Cluster Operator waits for topics to synchronize, as the OpenShift worker nodes drain consecutively.

An admission webhook notifies the AMQ Streams Drain Cleaner of pod eviction requests to the Kubernetes API. The AMQ Streams Drain Cleaner then adds a rolling update annotation to the pods to be drained. This informs the Cluster Operator to perform a rolling update of an evicted pod.

Note

If you are not using the AMQ Streams Drain Cleaner, you can add pod annotations to perform rolling updates manually.

Webhook configuration

The AMQ Streams Drain Cleaner deployment files include a ValidatingWebhookConfiguration resource file. The resource provides the configuration for registering the webhook with the Kubernetes API.

The configuration defines the rules for the Kubernetes API to follow in the event of a pod eviction request. The rules specify that only CREATE operations related to pods/eviction sub-resources are intercepted. If these rules are met, the API forwards the notification.

The clientConfig points to the AMQ Streams Drain Cleaner service and /drainer endpoint that exposes the webhook. The webhook uses a secure TLS connection, which requires authentication. The caBundle property specifies the certificate chain to validate HTTPS communication. Certificates are encoded in Base64.

Webhook configuration for pod eviction notifications

apiVersion: admissionregistration.k8s.io/v1
kind: ValidatingWebhookConfiguration
# ...
webhooks:
  - name: strimzi-drain-cleaner.strimzi.io
    rules:
      - apiGroups:   [""]
        apiVersions: ["v1"]
        operations:  ["CREATE"]
        resources:   ["pods/eviction"]
        scope:       "Namespaced"
    clientConfig:
      service:
        namespace: "strimzi-drain-cleaner"
        name: "strimzi-drain-cleaner"
        path: /drainer
        port: 443
        caBundle: Cg==
    # ...

20.5.1. Downloading the AMQ Streams Drain Cleaner deployment files

To deploy and use the AMQ Streams Drain Cleaner, you need to download the deployment files.

The AMQ Streams Drain Cleaner deployment files are available from the AMQ Streams software downloads page.

20.5.2. Deploying the AMQ Streams Drain Cleaner using installation files

Deploy the AMQ Streams Drain Cleaner to the OpenShift cluster where the Cluster Operator and Kafka cluster are running.

Prerequisites

  • You have downloaded the AMQ Streams Drain Cleaner deployment files.
  • You have a highly available Kafka cluster deployment running with OpenShift worker nodes that you would like to update.
  • Topics are replicated for high availability.

    Topic configuration specifies a replication factor of at least 3 and a minimum number of in-sync replicas to 1 less than the replication factor.

    Kafka topic replicated for high availability

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaTopic
    metadata:
      name: my-topic
      labels:
        strimzi.io/cluster: my-cluster
    spec:
      partitions: 1
      replicas: 3
      config:
        # ...
        min.insync.replicas: 2
        # ...

Excluding Kafka or ZooKeeper

If you don’t want to include Kafka or ZooKeeper pods in Drain Cleaner operations, change the default environment variables in the Drain Cleaner Deployment configuration file.

  • Set STRIMZI_DRAIN_KAFKA to false to exclude Kafka pods
  • Set STRIMZI_DRAIN_ZOOKEEPER to false to exclude ZooKeeper pods

Example configuration to exclude ZooKeeper pods

apiVersion: apps/v1
kind: Deployment
spec:
  # ...
  template:
    spec:
      serviceAccountName: strimzi-drain-cleaner
      containers:
        - name: strimzi-drain-cleaner
          # ...
          env:
            - name: STRIMZI_DRAIN_KAFKA
              value: "true"
            - name: STRIMZI_DRAIN_ZOOKEEPER
              value: "false"
          # ...

Procedure

  1. Configure a pod disruption budget of 0 (zero) for your Kafka deployment using template settings in the Kafka resource.

    Specifying a pod disruption budget

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    metadata:
      name: my-cluster
      namespace: myproject
    spec:
      kafka:
        template:
          podDisruptionBudget:
            maxUnavailable: 0
    
      # ...
      zookeeper:
        template:
          podDisruptionBudget:
            maxUnavailable: 0
      # ...

    Reducing the maximum pod disruption budget to zero prevents OpenShift from automatically evicting the pods in case of voluntary disruptions, leaving the AMQ Streams Drain Cleaner and AMQ Streams Cluster Operator to roll the pod which will be scheduled by OpenShift on a different worker node.

    Add the same configuration for ZooKeeper if you want to use AMQ Streams Drain Cleaner to drain ZooKeeper nodes.

  2. Update the Kafka resource:

    oc apply -f <kafka_configuration_file>
  3. Deploy the AMQ Streams Drain Cleaner.

    • To run the Drain Cleaner on OpenShift, apply the resources in the /install/drain-cleaner/openshift directory.

      oc apply -f ./install/drain-cleaner/openshift

20.5.3. Using the AMQ Streams Drain Cleaner

Use the AMQ Streams Drain Cleaner in combination with the Cluster Operator to move Kafka broker or ZooKeeper pods from nodes that are being drained. When you run the AMQ Streams Drain Cleaner, it annotates pods with a rolling update pod annotation. The Cluster Operator performs rolling updates based on the annotation.

Procedure

  1. Drain a specified OpenShift node hosting the Kafka broker or ZooKeeper pods.

    oc get nodes
    oc drain <name-of-node> --delete-emptydir-data --ignore-daemonsets --timeout=6000s --force
  2. Check the eviction events in the AMQ Streams Drain Cleaner log to verify that the pods have been annotated for restart.

    AMQ Streams Drain Cleaner log show annotations of pods

    INFO ... Received eviction webhook for Pod my-cluster-zookeeper-2 in namespace my-project
    INFO ... Pod my-cluster-zookeeper-2 in namespace my-project will be annotated for restart
    INFO ... Pod my-cluster-zookeeper-2 in namespace my-project found and annotated for restart
    
    INFO ... Received eviction webhook for Pod my-cluster-kafka-0 in namespace my-project
    INFO ... Pod my-cluster-kafka-0 in namespace my-project will be annotated for restart
    INFO ... Pod my-cluster-kafka-0 in namespace my-project found and annotated for restart

  3. Check the reconciliation events in the Cluster Operator log to verify the rolling updates.

    Cluster Operator log shows rolling updates

    INFO  PodOperator:68 - Reconciliation #13(timer) Kafka(my-project/my-cluster): Rolling Pod my-cluster-zookeeper-2
    INFO  PodOperator:68 - Reconciliation #13(timer) Kafka(my-project/my-cluster): Rolling Pod my-cluster-kafka-0
    INFO  AbstractOperator:500 - Reconciliation #13(timer) Kafka(my-project/my-cluster): reconciled

20.5.4. Watching the TLS certificates used by the AMQ Streams Drain Cleaner

By default, the Drain Cleaner deployment watches the secret containing the TLS certificates its uses for authentication. The Drain Cleaner watches for changes, such as certificate renewals. If it detects a change, it restarts to reload the TLS certificates. The Drain Cleaner installation files enable this behavior by default. But you can disable the watching of certificates by setting the STRIMZI_CERTIFICATE_WATCH_ENABLED environment variable to false in the Deployment configuration (060-Deployment.yaml) of the Drain Cleaner installation files.

With STRIMZI_CERTIFICATE_WATCH_ENABLED enabled, you can also use the following environment variables for watching TLS certificates.

Table 20.3. Drain Cleaner environment variables for watching TLS certificates

Environment VariableDescriptionDefault

STRIMZI_CERTIFICATE_WATCH_ENABLED

Enables or disables the certificate watch

false

STRIMZI_CERTIFICATE_WATCH_NAMESPACE

The namespace where the Drain Cleaner is deployed and where the certificate secret exists

strimzi-drain-cleaner

STRIMZI_CERTIFICATE_WATCH_POD_NAME

The Drain Cleaner pod name

-

STRIMZI_CERTIFICATE_WATCH_SECRET_NAME

The name of the secret containing TLS certificates

strimzi-drain-cleaner

STRIMZI_CERTIFICATE_WATCH_SECRET_KEYS

The list of fields inside the secret that contain the TLS certificates

tls.crt, tls.key

Example environment variable configuration to control watch operations

apiVersion: apps/v1
kind: Deployment
metadata:
  name: strimzi-drain-cleaner
  labels:
    app: strimzi-drain-cleaner
  namespace: strimzi-drain-cleaner
spec:
  # ...
    spec:
      serviceAccountName: strimzi-drain-cleaner
      containers:
        - name: strimzi-drain-cleaner
          # ...
          env:
            - name: STRIMZI_DRAIN_KAFKA
              value: "true"
            - name: STRIMZI_DRAIN_ZOOKEEPER
              value: "true"
            - name: STRIMZI_CERTIFICATE_WATCH_ENABLED
              value: "true"
            - name: STRIMZI_CERTIFICATE_WATCH_NAMESPACE
              valueFrom:
                fieldRef:
                  fieldPath: metadata.namespace
            - name: STRIMZI_CERTIFICATE_WATCH_POD_NAME
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name
              # ...

Tip

Use the Downward API mechanism to configure STRIMZI_CERTIFICATE_WATCH_NAMESPACE and STRIMZI_CERTIFICATE_WATCH_POD_NAME.

20.6. Discovering services using labels and annotations

Service discovery makes it easier for client applications running in the same OpenShift cluster as AMQ Streams to interact with a Kafka cluster.

A service discovery label and annotation is generated for services used to access the Kafka cluster:

  • Internal Kafka bootstrap service
  • HTTP Bridge service

The label helps to make the service discoverable, and the annotation provides connection details that a client application can use to make the connection.

The service discovery label, strimzi.io/discovery, is set as true for the Service resources. The service discovery annotation has the same key, providing connection details in JSON format for each service.

Example internal Kafka bootstrap service

apiVersion: v1
kind: Service
metadata:
  annotations:
    strimzi.io/discovery: |-
      [ {
        "port" : 9092,
        "tls" : false,
        "protocol" : "kafka",
        "auth" : "scram-sha-512"
      }, {
        "port" : 9093,
        "tls" : true,
        "protocol" : "kafka",
        "auth" : "tls"
      } ]
  labels:
    strimzi.io/cluster: my-cluster
    strimzi.io/discovery: "true"
    strimzi.io/kind: Kafka
    strimzi.io/name: my-cluster-kafka-bootstrap
  name: my-cluster-kafka-bootstrap
spec:
  #...

Example HTTP Bridge service

apiVersion: v1
kind: Service
metadata:
  annotations:
    strimzi.io/discovery: |-
      [ {
        "port" : 8080,
        "tls" : false,
        "auth" : "none",
        "protocol" : "http"
      } ]
  labels:
    strimzi.io/cluster: my-bridge
    strimzi.io/discovery: "true"
    strimzi.io/kind: KafkaBridge
    strimzi.io/name: my-bridge-bridge-service

20.6.1. Returning connection details on services

You can find the services by specifying the discovery label when fetching services from the command line or a corresponding API call.

oc get service -l strimzi.io/discovery=true

The connection details are returned when retrieving the service discovery label.

20.7. Recovering a cluster from persistent volumes

You can recover a Kafka cluster from persistent volumes (PVs) if they are still present.

You might want to do this, for example, after:

  • A namespace was deleted unintentionally
  • A whole OpenShift cluster is lost, but the PVs remain in the infrastructure

20.7.1. Recovery from namespace deletion

Recovery from namespace deletion is possible because of the relationship between persistent volumes and namespaces. A PersistentVolume (PV) is a storage resource that lives outside of a namespace. A PV is mounted into a Kafka pod using a PersistentVolumeClaim (PVC), which lives inside a namespace.

The reclaim policy for a PV tells a cluster how to act when a namespace is deleted. If the reclaim policy is set as:

  • Delete (default), PVs are deleted when PVCs are deleted within a namespace
  • Retain, PVs are not deleted when a namespace is deleted

To ensure that you can recover from a PV if a namespace is deleted unintentionally, the policy must be reset from Delete to Retain in the PV specification using the persistentVolumeReclaimPolicy property:

apiVersion: v1
kind: PersistentVolume
# ...
spec:
  # ...
  persistentVolumeReclaimPolicy: Retain

Alternatively, PVs can inherit the reclaim policy of an associated storage class. Storage classes are used for dynamic volume allocation.

By configuring the reclaimPolicy property for the storage class, PVs that use the storage class are created with the appropriate reclaim policy. The storage class is configured for the PV using the storageClassName property.

apiVersion: v1
kind: StorageClass
metadata:
  name: gp2-retain
parameters:
  # ...
# ...
reclaimPolicy: Retain
apiVersion: v1
kind: PersistentVolume
# ...
spec:
  # ...
  storageClassName: gp2-retain
Note

If you are using Retain as the reclaim policy, but you want to delete an entire cluster, you need to delete the PVs manually. Otherwise they will not be deleted, and may cause unnecessary expenditure on resources.

20.7.2. Recovery from loss of an OpenShift cluster

When a cluster is lost, you can use the data from disks/volumes to recover the cluster if they were preserved within the infrastructure. The recovery procedure is the same as with namespace deletion, assuming PVs can be recovered and they were created manually.

20.7.3. Recovering a deleted cluster from persistent volumes

This procedure describes how to recover a deleted cluster from persistent volumes (PVs).

In this situation, the Topic Operator identifies that topics exist in Kafka, but the KafkaTopic resources do not exist.

When you get to the step to recreate your cluster, you have two options:

  1. Use Option 1 when you can recover all KafkaTopic resources.

    The KafkaTopic resources must therefore be recovered before the cluster is started so that the corresponding topics are not deleted by the Topic Operator.

  2. Use Option 2 when you are unable to recover all KafkaTopic resources.

    In this case, you deploy your cluster without the Topic Operator, delete the Topic Operator topic store metadata, and then redeploy the Kafka cluster with the Topic Operator so it can recreate the KafkaTopic resources from the corresponding topics.

Note

If the Topic Operator is not deployed, you only need to recover the PersistentVolumeClaim (PVC) resources.

Before you begin

In this procedure, it is essential that PVs are mounted into the correct PVC to avoid data corruption. A volumeName is specified for the PVC and this must match the name of the PV.

For more information, see Persistent storage.

Note

The procedure does not include recovery of KafkaUser resources, which must be recreated manually. If passwords and certificates need to be retained, secrets must be recreated before creating the KafkaUser resources.

Procedure

  1. Check information on the PVs in the cluster:

    oc get pv

    Information is presented for PVs with data.

    Example output showing columns important to this procedure:

    NAME                                         RECLAIMPOLICY CLAIM
    pvc-5e9c5c7f-3317-11ea-a650-06e1eadd9a4c ... Retain ...    myproject/data-my-cluster-zookeeper-1
    pvc-5e9cc72d-3317-11ea-97b0-0aef8816c7ea ... Retain ...    myproject/data-my-cluster-zookeeper-0
    pvc-5ead43d1-3317-11ea-97b0-0aef8816c7ea ... Retain ...    myproject/data-my-cluster-zookeeper-2
    pvc-7e1f67f9-3317-11ea-a650-06e1eadd9a4c ... Retain ...    myproject/data-0-my-cluster-kafka-0
    pvc-7e21042e-3317-11ea-9786-02deaf9aa87e ... Retain ...    myproject/data-0-my-cluster-kafka-1
    pvc-7e226978-3317-11ea-97b0-0aef8816c7ea ... Retain ...    myproject/data-0-my-cluster-kafka-2
    • NAME shows the name of each PV.
    • RECLAIM POLICY shows that PVs are retained.
    • CLAIM shows the link to the original PVCs.
  2. Recreate the original namespace:

    oc create namespace myproject
  3. Recreate the original PVC resource specifications, linking the PVCs to the appropriate PV:

    For example:

    apiVersion: v1
    kind: PersistentVolumeClaim
    metadata:
      name: data-0-my-cluster-kafka-0
    spec:
      accessModes:
      - ReadWriteOnce
      resources:
        requests:
          storage: 100Gi
      storageClassName: gp2-retain
      volumeMode: Filesystem
      volumeName: pvc-7e1f67f9-3317-11ea-a650-06e1eadd9a4c
  4. Edit the PV specifications to delete the claimRef properties that bound the original PVC.

    For example:

    apiVersion: v1
    kind: PersistentVolume
    metadata:
      annotations:
        kubernetes.io/createdby: aws-ebs-dynamic-provisioner
        pv.kubernetes.io/bound-by-controller: "yes"
        pv.kubernetes.io/provisioned-by: kubernetes.io/aws-ebs
      creationTimestamp: "<date>"
      finalizers:
      - kubernetes.io/pv-protection
      labels:
        failure-domain.beta.kubernetes.io/region: eu-west-1
        failure-domain.beta.kubernetes.io/zone: eu-west-1c
      name: pvc-7e226978-3317-11ea-97b0-0aef8816c7ea
      resourceVersion: "39431"
      selfLink: /api/v1/persistentvolumes/pvc-7e226978-3317-11ea-97b0-0aef8816c7ea
      uid: 7efe6b0d-3317-11ea-a650-06e1eadd9a4c
    spec:
      accessModes:
      - ReadWriteOnce
      awsElasticBlockStore:
        fsType: xfs
        volumeID: aws://eu-west-1c/vol-09db3141656d1c258
      capacity:
        storage: 100Gi
      claimRef:
        apiVersion: v1
        kind: PersistentVolumeClaim
        name: data-0-my-cluster-kafka-2
        namespace: myproject
        resourceVersion: "39113"
        uid: 54be1c60-3319-11ea-97b0-0aef8816c7ea
      nodeAffinity:
        required:
          nodeSelectorTerms:
          - matchExpressions:
            - key: failure-domain.beta.kubernetes.io/zone
              operator: In
              values:
              - eu-west-1c
            - key: failure-domain.beta.kubernetes.io/region
              operator: In
              values:
              - eu-west-1
      persistentVolumeReclaimPolicy: Retain
      storageClassName: gp2-retain
      volumeMode: Filesystem

    In the example, the following properties are deleted:

    claimRef:
      apiVersion: v1
      kind: PersistentVolumeClaim
      name: data-0-my-cluster-kafka-2
      namespace: myproject
      resourceVersion: "39113"
      uid: 54be1c60-3319-11ea-97b0-0aef8816c7ea
  5. Deploy the Cluster Operator.

    oc create -f install/cluster-operator -n my-project
  6. Recreate your cluster.

    Follow the steps depending on whether or not you have all the KafkaTopic resources needed to recreate your cluster.

    Option 1: If you have all the KafkaTopic resources that existed before you lost your cluster, including internal topics such as committed offsets from __consumer_offsets:

    1. Recreate all KafkaTopic resources.

      It is essential that you recreate the resources before deploying the cluster, or the Topic Operator will delete the topics.

    2. Deploy the Kafka cluster.

      For example:

      oc apply -f kafka.yaml

    Option 2: If you do not have all the KafkaTopic resources that existed before you lost your cluster:

    1. Deploy the Kafka cluster, as with the first option, but without the Topic Operator by removing the topicOperator property from the Kafka resource before deploying.

      If you include the Topic Operator in the deployment, the Topic Operator will delete all the topics.

    2. Delete the internal topic store topics from the Kafka cluster:

      oc run kafka-admin -ti --image=registry.redhat.io/amq-streams/kafka-34-rhel8:2.4.0 --rm=true --restart=Never -- ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic __strimzi-topic-operator-kstreams-topic-store-changelog --delete && ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic __strimzi_store_topic --delete

      The command must correspond to the type of listener and authentication used to access the Kafka cluster.

    3. Enable the Topic Operator by redeploying the Kafka cluster with the topicOperator property to recreate the KafkaTopic resources.

      For example:

      apiVersion: kafka.strimzi.io/v1beta2
      kind: Kafka
      metadata:
        name: my-cluster
      spec:
        #...
        entityOperator:
          topicOperator: {} 1
          #...
    1
    Here we show the default configuration, which has no additional properties. You specify the required configuration using the properties described in the EntityTopicOperatorSpec schema reference.
  7. Verify the recovery by listing the KafkaTopic resources:

    oc get KafkaTopic

20.8. Setting limits on brokers using the Kafka Static Quota plugin

Use the Kafka Static Quota plugin to set throughput and storage limits on brokers in your Kafka cluster. You enable the plugin and set limits by configuring the Kafka resource. You can set a byte-rate threshold and storage quotas to put limits on the clients interacting with your brokers.

You can set byte-rate thresholds for producer and consumer bandwidth. The total limit is distributed across all clients accessing the broker. For example, you can set a byte-rate threshold of 40 MBps for producers. If two producers are running, they are each limited to a throughput of 20 MBps.

Storage quotas throttle Kafka disk storage limits between a soft limit and hard limit. The limits apply to all available disk space. Producers are slowed gradually between the soft and hard limit. The limits prevent disks filling up too quickly and exceeding their capacity. Full disks can lead to issues that are hard to rectify. The hard limit is the maximum storage limit.

Note

For JBOD storage, the limit applies across all disks. If a broker is using two 1 TB disks and the quota is 1.1 TB, one disk might fill and the other disk will be almost empty.

Prerequisites

  • The Cluster Operator that manages the Kafka cluster is running.

Procedure

  1. Add the plugin properties to the config of the Kafka resource.

    The plugin properties are shown in this example configuration.

    Example Kafka Static Quota plugin configuration

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    metadata:
      name: my-cluster
    spec:
      kafka:
        # ...
        config:
          client.quota.callback.class: io.strimzi.kafka.quotas.StaticQuotaCallback 1
          client.quota.callback.static.produce: 1000000 2
          client.quota.callback.static.fetch: 1000000 3
          client.quota.callback.static.storage.soft: 400000000000 4
          client.quota.callback.static.storage.hard: 500000000000 5
          client.quota.callback.static.storage.check-interval: 5 6

    1
    Loads the Kafka Static Quota plugin.
    2
    Sets the producer byte-rate threshold. 1 MBps in this example.
    3
    Sets the consumer byte-rate threshold. 1 MBps in this example.
    4
    Sets the lower soft limit for storage. 400 GB in this example.
    5
    Sets the higher hard limit for storage. 500 GB in this example.
    6
    Sets the interval in seconds between checks on storage. 5 seconds in this example. You can set this to 0 to disable the check.
  2. Update the resource.

    oc apply -f <kafka_configuration_file>

Additional resources

20.9. Uninstalling AMQ Streams

You can uninstall AMQ Streams on OpenShift 4.10 to 4.13 from the OperatorHub using the OpenShift Container Platform web console or CLI.

Use the same approach you used to install AMQ Streams.

When you uninstall AMQ Streams, you will need to identify resources created specifically for a deployment and referenced from the AMQ Streams resource.

Such resources include:

  • Secrets (Custom CAs and certificates, Kafka Connect secrets, and other Kafka secrets)
  • Logging ConfigMaps (of type external)

These are resources referenced by Kafka, KafkaConnect, KafkaMirrorMaker, or KafkaBridge configuration.

Warning

Deleting CustomResourceDefinitions results in the garbage collection of the corresponding custom resources (Kafka, KafkaConnect, KafkaMirrorMaker, or KafkaBridge) and the resources dependent on them (Deployments, StatefulSets, and other dependent resources).

20.9.1. Uninstalling AMQ Streams from the OperatorHub using the web console

This procedure describes how to uninstall AMQ Streams from the OperatorHub and remove resources related to the deployment.

You can perform the steps from the console or use alternative CLI commands.

Prerequisites

  • Access to an OpenShift Container Platform web console using an account with cluster-admin or strimzi-admin permissions.
  • You have identified the resources to be deleted.

    You can use the following oc CLI command to find resources and also verify that they have been removed when you have uninstalled AMQ Streams.

    Command to find resources related to an AMQ Streams deployment

    oc get <resource_type> --all-namespaces | grep <kafka_cluster_name>

    Replace <resource_type> with the type of the resource you are checking, such as secret or configmap.

Procedure

  1. Navigate in the OpenShift web console to Operators > Installed Operators.
  2. For the installed AMQ Streams operator, select the options icon (three vertical dots) and click Uninstall Operator.

    The operator is removed from Installed Operators.

  3. Navigate to Home > Projects and select the project where you installed AMQ Streams and the Kafka components.
  4. Click the options under Inventory to delete related resources.

    Resources include the following:

    • Deployments
    • StatefulSets
    • Pods
    • Services
    • ConfigMaps
    • Secrets
    Tip

    Use the search to find related resources that begin with the name of the Kafka cluster. You can also find the resources under Workloads.

Alternative CLI commands

You can use CLI commands to uninstall AMQ Streams from the OperatorHub.

  1. Delete the AMQ Streams subscription.

    oc delete subscription amq-streams -n openshift-operators
  2. Delete the cluster service version (CSV).

    oc delete csv amqstreams.<version>  -n openshift-operators
  3. Remove related CRDs.

    oc get crd -l app=strimzi -o name | xargs oc delete

20.9.2. Uninstalling AMQ Streams using the CLI

This procedure describes how to use the oc command-line tool to uninstall AMQ Streams and remove resources related to the deployment.

Prerequisites

  • Access to an OpenShift cluster using an account with cluster-admin or strimzi-admin permissions.
  • You have identified the resources to be deleted.

    You can use the following oc CLI command to find resources and also verify that they have been removed when you have uninstalled AMQ Streams.

    Command to find resources related to an AMQ Streams deployment

    oc get <resource_type> --all-namespaces | grep <kafka_cluster_name>

    Replace <resource_type> with the type of the resource you are checking, such as secret or configmap.

Procedure

  1. Delete the Cluster Operator Deployment, related CustomResourceDefinitions, and RBAC resources.

    Specify the installation files used to deploy the Cluster Operator.

    oc delete -f install/cluster-operator
  2. Delete the resources you identified in the prerequisites.

    oc delete <resource_type> <resource_name> -n <namespace>

    Replace <resource_type> with the type of resource you are deleting and <resource_name> with the name of the resource.

    Example to delete a secret

    oc delete secret my-cluster-clients-ca -n my-project

20.10. Frequently asked questions