Chapter 11. Managing AMQ Streams

This chapter covers tasks to maintain a deployment of AMQ Streams.

11.1. Working with custom resources

You can use oc commands to retrieve information and perform other operations on AMQ Streams custom resources.

Using oc with the status subresource of a custom resource allows you to get the information about the resource.

11.1.1. Performing oc operations on custom resources

Use oc commands, such as get, describe, edit, or delete, to perform operations on resource types. For example, oc get kafkatopics retrieves a list of all Kafka topics and oc get kafkas retrieves all deployed Kafka clusters.

When referencing resource types, you can use both singular and plural names: oc get kafkas gets the same results as oc get kafka.

You can also use the short name of the resource. Learning short names can save you time when managing AMQ Streams. The short name for Kafka is k, so you can also run oc get k to list all Kafka clusters.

oc get k

NAME         DESIRED KAFKA REPLICAS   DESIRED ZK REPLICAS
my-cluster   3                        3

Table 11.1. Long and short names for each AMQ Streams resource

AMQ Streams resourceLong nameShort name

Kafka

kafka

k

Kafka Topic

kafkatopic

kt

Kafka User

kafkauser

ku

Kafka Connect

kafkaconnect

kc

Kafka Connector

kafkaconnector

kctr

Kafka Mirror Maker

kafkamirrormaker

kmm

Kafka Mirror Maker 2

kafkamirrormaker2

kmm2

Kafka Bridge

kafkabridge

kb

Kafka Rebalance

kafkarebalance

kr

11.1.1.1. Resource categories

Categories of custom resources can also be used in oc commands.

All AMQ Streams custom resources belong to the category strimzi, so you can use strimzi to get all the AMQ Streams resources with one command.

For example, running oc get strimzi lists all AMQ Streams custom resources in a given namespace.

oc get strimzi

NAME                                   DESIRED KAFKA REPLICAS DESIRED ZK REPLICAS
kafka.kafka.strimzi.io/my-cluster      3                      3

NAME                                   PARTITIONS REPLICATION FACTOR
kafkatopic.kafka.strimzi.io/kafka-apps 3          3

NAME                                   AUTHENTICATION AUTHORIZATION
kafkauser.kafka.strimzi.io/my-user     tls            simple

The oc get strimzi -o name command returns all resource types and resource names. The -o name option fetches the output in the type/name format

oc get strimzi -o name

kafka.kafka.strimzi.io/my-cluster
kafkatopic.kafka.strimzi.io/kafka-apps
kafkauser.kafka.strimzi.io/my-user

You can combine this strimzi command with other commands. For example, you can pass it into a oc delete command to delete all resources in a single command.

oc delete $(oc get strimzi -o name)

kafka.kafka.strimzi.io "my-cluster" deleted
kafkatopic.kafka.strimzi.io "kafka-apps" deleted
kafkauser.kafka.strimzi.io "my-user" deleted

Deleting all resources in a single operation might be useful, for example, when you are testing new AMQ Streams features.

11.1.1.2. Querying the status of sub-resources

There are other values you can pass to the -o option. For example, by using -o yaml you get the output in YAML format. Using -o json will return it as JSON.

You can see all the options in oc get --help.

One of the most useful options is the JSONPath support, which allows you to pass JSONPath expressions to query the Kubernetes API. A JSONPath expression can extract or navigate specific parts of any resource.

For example, you can use the JSONPath expression {.status.listeners[?(@.name=="tls")].bootstrapServers} to get the bootstrap address from the status of the Kafka custom resource and use it in your Kafka clients.

Here, the command finds the bootstrapServers value of the listener named tls:

oc get kafka my-cluster -o=jsonpath='{.status.listeners[?(@.name=="tls")].bootstrapServers}{"\n"}'

my-cluster-kafka-bootstrap.myproject.svc:9093

By changing the name condition you can also get the address of the other Kafka listeners.

You can use jsonpath to extract any other property or group of properties from any custom resource.

11.1.2. AMQ Streams custom resource status information

Several resources have a status property, as described in the following table.

Table 11.2. Custom resource status properties

AMQ Streams resourceSchema referencePublishes status information on…​

Kafka

Section 12.2.56, “KafkaStatus schema reference”

The Kafka cluster.

KafkaConnect

Section 12.2.87, “KafkaConnectStatus schema reference”

The Kafka Connect cluster, if deployed.

KafkaConnector

Section 12.2.125, “KafkaConnectorStatus schema reference”

KafkaConnector resources, if deployed.

KafkaMirrorMaker

Section 12.2.113, “KafkaMirrorMakerStatus schema reference”

The Kafka MirrorMaker tool, if deployed.

KafkaTopic

Section 12.2.91, “KafkaTopicStatus schema reference”

Kafka topics in your Kafka cluster.

KafkaUser

Section 12.2.107, “KafkaUserStatus schema reference”

Kafka users in your Kafka cluster.

KafkaBridge

Section 12.2.122, “KafkaBridgeStatus schema reference”

The AMQ Streams Kafka Bridge, if deployed.

The status property of a resource provides information on the resource’s:

  • Current state, in the status.conditions property
  • Last observed generation, in the status.observedGeneration property

The status property also provides resource-specific information. For example:

  • KafkaStatus provides information on listener addresses, and the id of the Kafka cluster.
  • KafkaConnectStatus provides the REST API endpoint for Kafka Connect connectors.
  • KafkaUserStatus provides the user name of the Kafka user and the Secret in which their credentials are stored.
  • KafkaBridgeStatus provides the HTTP address at which external client applications can access the Bridge service.

A resource’s current state is useful for tracking progress related to the resource achieving its desired state, as defined by the spec property. The status conditions provide the time and reason the state of the resource changed and details of events preventing or delaying the operator from realizing the resource’s desired state.

The last observed generation is the generation of the resource that was last reconciled by the Cluster Operator. If the value of observedGeneration is different from the value of metadata.generation, the operator has not yet processed the latest update to the resource. If these values are the same, the status information reflects the most recent changes to the resource.

AMQ Streams creates and maintains the status of custom resources, periodically evaluating the current state of the custom resource and updating its status accordingly. When performing an update on a custom resource using oc edit, for example, its status is not editable. Moreover, changing the status would not affect the configuration of the Kafka cluster.

Here we see the status property specified for a Kafka custom resource.

Kafka custom resource with status

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
spec:
  # ...
status:
  conditions: 1
  - lastTransitionTime: 2021-07-23T23:46:57+0000
    status: "True"
    type: Ready 2
  observedGeneration: 4 3
  listeners: 4
  - addresses:
    - host: my-cluster-kafka-bootstrap.myproject.svc
      port: 9092
    type: plain
  - addresses:
    - host: my-cluster-kafka-bootstrap.myproject.svc
      port: 9093
    certificates:
    - |
      -----BEGIN CERTIFICATE-----
      ...
      -----END CERTIFICATE-----
    type: tls
  - addresses:
    - host: 172.29.49.180
      port: 9094
    certificates:
    - |
      -----BEGIN CERTIFICATE-----
      ...
      -----END CERTIFICATE-----
    type: external
  clusterId: CLUSTER-ID 5
# ...

1
Status conditions describe criteria related to the status that cannot be deduced from the existing resource information, or are specific to the instance of a resource.
2
The Ready condition indicates whether the Cluster Operator currently considers the Kafka cluster able to handle traffic.
3
The observedGeneration indicates the generation of the Kafka custom resource that was last reconciled by the Cluster Operator.
4
The listeners describe the current Kafka bootstrap addresses by type.
5
The Kafka cluster id.
Important

The address in the custom resource status for external listeners with type nodeport is currently not supported.

Note

The Kafka bootstrap addresses listed in the status do not signify that those endpoints or the Kafka cluster is in a ready state.

Accessing status information

You can access status information for a resource from the command line. For more information, see Section 11.1.3, “Finding the status of a custom resource”.

11.1.3. Finding the status of a custom resource

This procedure describes how to find the status of a custom resource.

Prerequisites

  • An OpenShift cluster.
  • The Cluster Operator is running.

Procedure

  • Specify the custom resource and use the -o jsonpath option to apply a standard JSONPath expression to select the status property:

    oc get kafka <kafka_resource_name> -o jsonpath='{.status}'

    This expression returns all the status information for the specified custom resource. You can use dot notation, such as status.listeners or status.observedGeneration, to fine-tune the status information you wish to see.

Additional resources

11.2. Pausing reconciliation of custom resources

Sometimes it is useful to pause the reconciliation of custom resources managed by AMQ Streams Operators, so that you can perform fixes or make updates. If reconciliations are paused, any changes made to custom resources are ignored by the Operators until the pause ends.

If you want to pause reconciliation of a custom resource, set the strimzi.io/pause-reconciliation annotation to true in its configuration. This instructs the appropriate Operator to pause reconciliation of the custom resource. For example, you can apply the annotation to the KafkaConnect resource so that reconciliation by the Cluster Operator is paused.

You can also create a custom resource with the pause annotation enabled. The custom resource is created, but it is ignored.

Prerequisites

  • The AMQ Streams Operator that manages the custom resource is running.

Procedure

  1. Annotate the custom resource in OpenShift, setting pause-reconciliation to true:

    oc annotate <kind_of_custom_resource> <name_of_custom_resource> strimzi.io/pause-reconciliation="true"

    For example, for the KafkaConnect custom resource:

    oc annotate KafkaConnect my-connect strimzi.io/pause-reconciliation="true"
  2. Check that the status conditions of the custom resource show a change to ReconciliationPaused:

    oc describe <kind_of_custom_resource> <name_of_custom_resource>

    The type condition changes to ReconciliationPaused at the lastTransitionTime.

    Example custom resource with a paused reconciliation condition type

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnect
    metadata:
      annotations:
        strimzi.io/pause-reconciliation: "true"
        strimzi.io/use-connector-resources: "true"
      creationTimestamp: 2021-03-12T10:47:11Z
      #...
    spec:
      # ...
    status:
      conditions:
      - lastTransitionTime: 2021-03-12T10:47:41.689249Z
        status: "True"
        type: ReconciliationPaused

Resuming from pause

  • To resume reconciliation, you can set the annotation to false, or remove the annotation.

11.3. Evicting pods with AMQ Streams Drain Cleaner

Kafka and ZooKeeper pods might be evicted during OpenShift upgrades, maintenance or pod rescheduling. If your Kafka broker and ZooKeeper pods were deployed by AMQ Streams, you can use the AMQ Streams Drain Cleaner tool to handle the pod evictions. Since the AMQ Streams Drain Cleaner will handle the eviction instead of OpenShift, you need to set the podDisruptionBudget for your Kafka deployment to 0 (zero). OpenShift will then no longer be allowed to evict the pod automatically.

By deploying the AMQ Streams Drain Cleaner, you can use the Cluster Operator to move Kafka pods instead of OpenShift. The Cluster Operator ensures that topics are never under-replicated. Kafka can remain operational during the eviction process. The Cluster Operator waits for topics to synchronize, as the OpenShift worker nodes drain consecutively.

An admission webhook notifies the AMQ Streams Drain Cleaner of pod eviction requests to the Kubernetes API. The AMQ Streams Drain Cleaner then adds a rolling update annotation to the pods to be drained. This informs the Cluster Operator to perform a rolling update of an evicted pod.

Note

If you are not using the AMQ Streams Drain Cleaner, you can add pod annotations to perform rolling updates manually.

Webhook configuration

The AMQ Streams Drain Cleaner deployment files include a ValidatingWebhookConfiguration resource file. The resource provides the configuration for registering the webhook with the Kubernetes API.

The configuration defines the rules for the Kubernetes API to follow in the event of a pod eviction request. The rules specify that only CREATE operations related to pods/eviction sub-resources are intercepted. If these rules are met, the API forwards the notification.

The clientConfig points to the AMQ Streams Drain Cleaner service and /drainer endpoint that exposes the webhook. The webhook uses a secure TLS connection, which requires authentication. The caBundle property specifies the certificate chain to validate HTTPS communication. Certificates are encoded in Base64.

Webhook configuration for pod eviction notifications

apiVersion: admissionregistration.k8s.io/v1
kind: ValidatingWebhookConfiguration
# ...
webhooks:
  - name: strimzi-drain-cleaner.strimzi.io
    rules:
      - apiGroups:   [""]
        apiVersions: ["v1"]
        operations:  ["CREATE"]
        resources:   ["pods/eviction"]
        scope:       "Namespaced"
    clientConfig:
      service:
        namespace: "strimzi-drain-cleaner"
        name: "strimzi-drain-cleaner"
        path: /drainer
        port: 443
        caBundle: Cg==
    # ...

11.3.1. Prerequisites

To deploy and use the AMQ Streams Drain Cleaner, you need to download the deployment files.

The AMQ Streams Drain Cleaner deployment files are available from the AMQ Streams software downloads page.

11.3.2. Deploying the AMQ Streams Drain Cleaner

Deploy the AMQ Streams Drain Cleaner to the OpenShift cluster where the Cluster Operator and Kafka cluster are running.

Prerequisites

  • You have downloaded the AMQ Streams Drain Cleaner deployment files.
  • You have a highly available Kafka cluster deployment running with OpenShift worker nodes that you would like to update.
  • Topics are replicated for high availability.

    Topic configuration specifies a replication factor of at least 3 and a minimum number of in-sync replicas to 1 less than the replication factor.

    Kafka topic replicated for high availability

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaTopic
    metadata:
      name: my-topic
      labels:
        strimzi.io/cluster: my-cluster
    spec:
      partitions: 1
      replicas: 3
      config:
        # ...
        min.insync.replicas: 2
        # ...

Excluding ZooKeeper

If you don’t want to include ZooKeeper, you can remove the --zookeeper command option from the AMQ Streams Drain Cleaner Deployment configuration file.

apiVersion: apps/v1
kind: Deployment
spec:
  # ...
  template:
    spec:
      serviceAccountName: strimzi-drain-cleaner
      containers:
        - name: strimzi-drain-cleaner
          # ...
          command:
            - "/application"
            - "-Dquarkus.http.host=0.0.0.0"
            - "--kafka"
            - "--zookeeper" 1
          # ...
1
Remove this option to exclude ZooKeeper from AMQ Streams Drain Cleaner operations.

Procedure

  1. Configure a pod disruption budget of 0 (zero) for your Kafka deployment using template settings in the Kafka resource.

    Specifying a pod disruption budget

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    metadata:
      name: my-cluster
      namespace: myproject
    spec:
      kafka:
        template:
          podDisruptionBudget:
            maxUnavailable: 0
    
      # ...
      zookeeper:
        template:
          podDisruptionBudget:
            maxUnavailable: 0
      # ...

    Reducing the maximum pod disruption budget to zero prevents OpenShift from automatically evicting the pods in case of voluntary disruptions, leaving the AMQ Streams Drain Cleaner and AMQ Streams Cluster Operator to roll the pod which will be scheduled by OpenShift on a different worker node.

    Add the same configuration for ZooKeeper if you want to use AMQ Streams Drain Cleaner to drain ZooKeeper nodes.

  2. Update the Kafka resource:

    oc apply -f <kafka-configuration-file>
  3. Deploy the AMQ Streams Drain Cleaner.

    • To run the Drain Cleaner on OpenShift, apply the resources in the /install/drain-cleaner/openshift directory.

      oc apply -f ./install/drain-cleaner/openshift

11.3.3. Using the AMQ Streams Drain Cleaner

Use the AMQ Streams Drain Cleaner in combination with the Cluster Operator to move Kafka broker or ZooKeeper pods from nodes that are being drained. When you run the AMQ Streams Drain Cleaner, it annotates pods with a rolling update pod annotation. The Cluster Operator performs rolling updates based on the annotation.

Procedure

  1. Drain a specified OpenShift node hosting the Kafka broker or ZooKeeper pods.

    oc get nodes
    oc drain <name-of-node> --delete-emptydir-data --ignore-daemonsets --timeout=6000s --force
  2. Check the eviction events in the AMQ Streams Drain Cleaner log to verify that the pods have been annotated for restart.

    AMQ Streams Drain Cleaner log show annotations of pods

    INFO ... Received eviction webhook for Pod my-cluster-zookeeper-2 in namespace my-project
    INFO ... Pod my-cluster-zookeeper-2 in namespace my-project will be annotated for restart
    INFO ... Pod my-cluster-zookeeper-2 in namespace my-project found and annotated for restart
    
    INFO ... Received eviction webhook for Pod my-cluster-kafka-0 in namespace my-project
    INFO ... Pod my-cluster-kafka-0 in namespace my-project will be annotated for restart
    INFO ... Pod my-cluster-kafka-0 in namespace my-project found and annotated for restart

  3. Check the reconciliation events in the Cluster Operator log to verify the rolling updates.

    Cluster Operator log shows rolling updates

    INFO  PodOperator:68 - Reconciliation #13(timer) Kafka(my-project/my-cluster): Rolling Pod my-cluster-zookeeper-2
    INFO  PodOperator:68 - Reconciliation #13(timer) Kafka(my-project/my-cluster): Rolling Pod my-cluster-kafka-0
    INFO  AbstractOperator:500 - Reconciliation #13(timer) Kafka(my-project/my-cluster): reconciled

11.4. Manually starting rolling updates of Kafka and ZooKeeper clusters

AMQ Streams supports the use of annotations on resources to manually trigger a rolling update of Kafka and ZooKeeper clusters through the Cluster Operator. Rolling updates restart the pods of the resource with new ones.

Manually performing a rolling update on a specific pod or set of pods is usually only required in exceptional circumstances. However, rather than deleting the pods directly, if you perform the rolling update through the Cluster Operator you ensure the following:

  • The manual deletion of the pod does not conflict with simultaneous Cluster Operator operations, such as deleting other pods in parallel.
  • The Cluster Operator logic handles the Kafka configuration specifications, such as the number of in-sync replicas.

11.4.1. Prerequisites

To perform a manual rolling update, you need a running Cluster Operator and Kafka cluster.

See the Deploying and Upgrading AMQ Streams on OpenShift guide for instructions on running a:

11.4.2. Performing a rolling update using a pod management annotation

This procedure describes how to trigger a rolling update of a Kafka cluster or ZooKeeper cluster.

To trigger the update, you add an annotation to the resource you are using to manage the pods running on the cluster. You annotate the StatefulSet or StrimziPodSet resource (if you enabled the UseStrimziPodSets feature gate).

Procedure

  1. Find the name of the resource that controls the Kafka or ZooKeeper pods you want to manually update.

    For example, if your Kafka cluster is named my-cluster, the corresponding names are my-cluster-kafka and my-cluster-zookeeper.

  2. Use oc annotate to annotate the appropriate resource in OpenShift.

    Annotating a StatefulSet

    oc annotate statefulset <cluster_name>-kafka strimzi.io/manual-rolling-update=true
    
    oc annotate statefulset <cluster_name>-zookeeper strimzi.io/manual-rolling-update=true

    Annotating a StrimziPodSet

    oc annotate strimzipodset <cluster_name>-kafka strimzi.io/manual-rolling-update=true
    
    oc annotate strimzipodset <cluster_name>-zookeeper strimzi.io/manual-rolling-update=true

  3. Wait for the next reconciliation to occur (every two minutes by default). A rolling update of all pods within the annotated resource is triggered, as long as the annotation was detected by the reconciliation process. When the rolling update of all the pods is complete, the annotation is removed from the resource.

11.4.3. Performing a rolling update using a Pod annotation

This procedure describes how to manually trigger a rolling update of an existing Kafka cluster or ZooKeeper cluster using an OpenShift Pod annotation. When multiple pods are annotated, consecutive rolling updates are performed within the same reconciliation run.

Prerequisites

You can perform a rolling update on a Kafka cluster regardless of the topic replication factor used. But for Kafka to stay operational during the update, you’ll need the following:

  • A highly available Kafka cluster deployment running with nodes that you wish to update.
  • Topics replicated for high availability.

    Topic configuration specifies a replication factor of at least 3 and a minimum number of in-sync replicas to 1 less than the replication factor.

    Kafka topic replicated for high availability

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaTopic
    metadata:
      name: my-topic
      labels:
        strimzi.io/cluster: my-cluster
    spec:
      partitions: 1
      replicas: 3
      config:
        # ...
        min.insync.replicas: 2
        # ...

Procedure

  1. Find the name of the Kafka or ZooKeeper Pod you want to manually update.

    For example, if your Kafka cluster is named my-cluster, the corresponding Pod names are my-cluster-kafka-index and my-cluster-zookeeper-index. The index starts at zero and ends at the total number of replicas minus one.

  2. Annotate the Pod resource in OpenShift.

    Use oc annotate:

    oc annotate pod cluster-name-kafka-index strimzi.io/manual-rolling-update=true
    
    oc annotate pod cluster-name-zookeeper-index strimzi.io/manual-rolling-update=true
  3. Wait for the next reconciliation to occur (every two minutes by default). A rolling update of the annotated Pod is triggered, as long as the annotation was detected by the reconciliation process. When the rolling update of a pod is complete, the annotation is removed from the Pod.

11.5. Discovering services using labels and annotations

Service discovery makes it easier for client applications running in the same OpenShift cluster as AMQ Streams to interact with a Kafka cluster.

A service discovery label and annotation is generated for services used to access the Kafka cluster:

  • Internal Kafka bootstrap service
  • HTTP Bridge service

The label helps to make the service discoverable, and the annotation provides connection details that a client application can use to make the connection.

The service discovery label, strimzi.io/discovery, is set as true for the Service resources. The service discovery annotation has the same key, providing connection details in JSON format for each service.

Example internal Kafka bootstrap service

apiVersion: v1
kind: Service
metadata:
  annotations:
    strimzi.io/discovery: |-
      [ {
        "port" : 9092,
        "tls" : false,
        "protocol" : "kafka",
        "auth" : "scram-sha-512"
      }, {
        "port" : 9093,
        "tls" : true,
        "protocol" : "kafka",
        "auth" : "tls"
      } ]
  labels:
    strimzi.io/cluster: my-cluster
    strimzi.io/discovery: "true"
    strimzi.io/kind: Kafka
    strimzi.io/name: my-cluster-kafka-bootstrap
  name: my-cluster-kafka-bootstrap
spec:
  #...

Example HTTP Bridge service

apiVersion: v1
kind: Service
metadata:
  annotations:
    strimzi.io/discovery: |-
      [ {
        "port" : 8080,
        "tls" : false,
        "auth" : "none",
        "protocol" : "http"
      } ]
  labels:
    strimzi.io/cluster: my-bridge
    strimzi.io/discovery: "true"
    strimzi.io/kind: KafkaBridge
    strimzi.io/name: my-bridge-bridge-service

11.5.1. Returning connection details on services

You can find the services by specifying the discovery label when fetching services from the command line or a corresponding API call.

oc get service -l strimzi.io/discovery=true

The connection details are returned when retrieving the service discovery label.

11.6. Recovering a cluster from persistent volumes

You can recover a Kafka cluster from persistent volumes (PVs) if they are still present.

You might want to do this, for example, after:

  • A namespace was deleted unintentionally
  • A whole OpenShift cluster is lost, but the PVs remain in the infrastructure

11.6.1. Recovery from namespace deletion

Recovery from namespace deletion is possible because of the relationship between persistent volumes and namespaces. A PersistentVolume (PV) is a storage resource that lives outside of a namespace. A PV is mounted into a Kafka pod using a PersistentVolumeClaim (PVC), which lives inside a namespace.

The reclaim policy for a PV tells a cluster how to act when a namespace is deleted. If the reclaim policy is set as:

  • Delete (default), PVs are deleted when PVCs are deleted within a namespace
  • Retain, PVs are not deleted when a namespace is deleted

To ensure that you can recover from a PV if a namespace is deleted unintentionally, the policy must be reset from Delete to Retain in the PV specification using the persistentVolumeReclaimPolicy property:

apiVersion: v1
kind: PersistentVolume
# ...
spec:
  # ...
  persistentVolumeReclaimPolicy: Retain

Alternatively, PVs can inherit the reclaim policy of an associated storage class. Storage classes are used for dynamic volume allocation.

By configuring the reclaimPolicy property for the storage class, PVs that use the storage class are created with the appropriate reclaim policy. The storage class is configured for the PV using the storageClassName property.

apiVersion: v1
kind: StorageClass
metadata:
  name: gp2-retain
parameters:
  # ...
# ...
reclaimPolicy: Retain
apiVersion: v1
kind: PersistentVolume
# ...
spec:
  # ...
  storageClassName: gp2-retain
Note

If you are using Retain as the reclaim policy, but you want to delete an entire cluster, you need to delete the PVs manually. Otherwise they will not be deleted, and may cause unnecessary expenditure on resources.

11.6.2. Recovery from loss of an OpenShift cluster

When a cluster is lost, you can use the data from disks/volumes to recover the cluster if they were preserved within the infrastructure. The recovery procedure is the same as with namespace deletion, assuming PVs can be recovered and they were created manually.

11.6.3. Recovering a deleted cluster from persistent volumes

This procedure describes how to recover a deleted cluster from persistent volumes (PVs).

In this situation, the Topic Operator identifies that topics exist in Kafka, but the KafkaTopic resources do not exist.

When you get to the step to recreate your cluster, you have two options:

  1. Use Option 1 when you can recover all KafkaTopic resources.

    The KafkaTopic resources must therefore be recovered before the cluster is started so that the corresponding topics are not deleted by the Topic Operator.

  2. Use Option 2 when you are unable to recover all KafkaTopic resources.

    In this case, you deploy your cluster without the Topic Operator, delete the Topic Operator topic store metadata, and then redeploy the Kafka cluster with the Topic Operator so it can recreate the KafkaTopic resources from the corresponding topics.

Note

If the Topic Operator is not deployed, you only need to recover the PersistentVolumeClaim (PVC) resources.

Before you begin

In this procedure, it is essential that PVs are mounted into the correct PVC to avoid data corruption. A volumeName is specified for the PVC and this must match the name of the PV.

For more information, see:

Note

The procedure does not include recovery of KafkaUser resources, which must be recreated manually. If passwords and certificates need to be retained, secrets must be recreated before creating the KafkaUser resources.

Procedure

  1. Check information on the PVs in the cluster:

    oc get pv

    Information is presented for PVs with data.

    Example output showing columns important to this procedure:

    NAME                                         RECLAIMPOLICY CLAIM
    pvc-5e9c5c7f-3317-11ea-a650-06e1eadd9a4c ... Retain ...    myproject/data-my-cluster-zookeeper-1
    pvc-5e9cc72d-3317-11ea-97b0-0aef8816c7ea ... Retain ...    myproject/data-my-cluster-zookeeper-0
    pvc-5ead43d1-3317-11ea-97b0-0aef8816c7ea ... Retain ...    myproject/data-my-cluster-zookeeper-2
    pvc-7e1f67f9-3317-11ea-a650-06e1eadd9a4c ... Retain ...    myproject/data-0-my-cluster-kafka-0
    pvc-7e21042e-3317-11ea-9786-02deaf9aa87e ... Retain ...    myproject/data-0-my-cluster-kafka-1
    pvc-7e226978-3317-11ea-97b0-0aef8816c7ea ... Retain ...    myproject/data-0-my-cluster-kafka-2
    • NAME shows the name of each PV.
    • RECLAIM POLICY shows that PVs are retained.
    • CLAIM shows the link to the original PVCs.
  2. Recreate the original namespace:

    oc create namespace myproject
  3. Recreate the original PVC resource specifications, linking the PVCs to the appropriate PV:

    For example:

    apiVersion: v1
    kind: PersistentVolumeClaim
    metadata:
      name: data-0-my-cluster-kafka-0
    spec:
      accessModes:
      - ReadWriteOnce
      resources:
        requests:
          storage: 100Gi
      storageClassName: gp2-retain
      volumeMode: Filesystem
      volumeName: pvc-7e1f67f9-3317-11ea-a650-06e1eadd9a4c
  4. Edit the PV specifications to delete the claimRef properties that bound the original PVC.

    For example:

    apiVersion: v1
    kind: PersistentVolume
    metadata:
      annotations:
        kubernetes.io/createdby: aws-ebs-dynamic-provisioner
        pv.kubernetes.io/bound-by-controller: "yes"
        pv.kubernetes.io/provisioned-by: kubernetes.io/aws-ebs
      creationTimestamp: "<date>"
      finalizers:
      - kubernetes.io/pv-protection
      labels:
        failure-domain.beta.kubernetes.io/region: eu-west-1
        failure-domain.beta.kubernetes.io/zone: eu-west-1c
      name: pvc-7e226978-3317-11ea-97b0-0aef8816c7ea
      resourceVersion: "39431"
      selfLink: /api/v1/persistentvolumes/pvc-7e226978-3317-11ea-97b0-0aef8816c7ea
      uid: 7efe6b0d-3317-11ea-a650-06e1eadd9a4c
    spec:
      accessModes:
      - ReadWriteOnce
      awsElasticBlockStore:
        fsType: xfs
        volumeID: aws://eu-west-1c/vol-09db3141656d1c258
      capacity:
        storage: 100Gi
      claimRef:
        apiVersion: v1
        kind: PersistentVolumeClaim
        name: data-0-my-cluster-kafka-2
        namespace: myproject
        resourceVersion: "39113"
        uid: 54be1c60-3319-11ea-97b0-0aef8816c7ea
      nodeAffinity:
        required:
          nodeSelectorTerms:
          - matchExpressions:
            - key: failure-domain.beta.kubernetes.io/zone
              operator: In
              values:
              - eu-west-1c
            - key: failure-domain.beta.kubernetes.io/region
              operator: In
              values:
              - eu-west-1
      persistentVolumeReclaimPolicy: Retain
      storageClassName: gp2-retain
      volumeMode: Filesystem

    In the example, the following properties are deleted:

    claimRef:
      apiVersion: v1
      kind: PersistentVolumeClaim
      name: data-0-my-cluster-kafka-2
      namespace: myproject
      resourceVersion: "39113"
      uid: 54be1c60-3319-11ea-97b0-0aef8816c7ea
  5. Deploy the Cluster Operator.

    oc create -f install/cluster-operator -n my-project
  6. Recreate your cluster.

    Follow the steps depending on whether or not you have all the KafkaTopic resources needed to recreate your cluster.

    Option 1: If you have all the KafkaTopic resources that existed before you lost your cluster, including internal topics such as committed offsets from __consumer_offsets:

    1. Recreate all KafkaTopic resources.

      It is essential that you recreate the resources before deploying the cluster, or the Topic Operator will delete the topics.

    2. Deploy the Kafka cluster.

      For example:

      oc apply -f kafka.yaml

    Option 2: If you do not have all the KafkaTopic resources that existed before you lost your cluster:

    1. Deploy the Kafka cluster, as with the first option, but without the Topic Operator by removing the topicOperator property from the Kafka resource before deploying.

      If you include the Topic Operator in the deployment, the Topic Operator will delete all the topics.

    2. Delete the internal topic store topics from the Kafka cluster:

      oc run kafka-admin -ti --image=registry.redhat.io/amq7/amq-streams-kafka-33-rhel8:2.3.0 --rm=true --restart=Never -- ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic __strimzi-topic-operator-kstreams-topic-store-changelog --delete && ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic __strimzi_store_topic --delete

      The command must correspond to the type of listener and authentication used to access the Kafka cluster.

    3. Enable the Topic Operator by redeploying the Kafka cluster with the topicOperator property to recreate the KafkaTopic resources.

      For example:

      apiVersion: kafka.strimzi.io/v1beta2
      kind: Kafka
      metadata:
        name: my-cluster
      spec:
        #...
        entityOperator:
          topicOperator: {} 1
          #...
    1
    Here we show the default configuration, which has no additional properties. You specify the required configuration using the properties described in Section 12.2.45, “EntityTopicOperatorSpec schema reference”.
  7. Verify the recovery by listing the KafkaTopic resources:

    oc get KafkaTopic

11.7. Setting limits on brokers using the Kafka Static Quota plugin

Use the Kafka Static Quota plugin to set throughput and storage limits on brokers in your Kafka cluster. You enable the plugin and set limits by configuring the Kafka resource. You can set a byte-rate threshold and storage quotas to put limits on the clients interacting with your brokers.

You can set byte-rate thresholds for producer and consumer bandwidth. The total limit is distributed across all clients accessing the broker. For example, you can set a byte-rate threshold of 40 MBps for producers. If two producers are running, they are each limited to a throughput of 20 MBps.

Storage quotas throttle Kafka disk storage limits between a soft limit and hard limit. The limits apply to all available disk space. Producers are slowed gradually between the soft and hard limit. The limits prevent disks filling up too quickly and exceeding their capacity. Full disks can lead to issues that are hard to rectify. The hard limit is the maximum storage limit.

Note

For JBOD storage, the limit applies across all disks. If a broker is using two 1 TB disks and the quota is 1.1 TB, one disk might fill and the other disk will be almost empty.

Prerequisites

  • The Cluster Operator that manages the Kafka cluster is running.

Procedure

  1. Add the plugin properties to the config of the Kafka resource.

    The plugin properties are shown in this example configuration.

    Example Kafka Static Quota plugin configuration

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    metadata:
      name: my-cluster
    spec:
      kafka:
        # ...
        config:
          client.quota.callback.class: io.strimzi.kafka.quotas.StaticQuotaCallback 1
          client.quota.callback.static.produce: 1000000 2
          client.quota.callback.static.fetch: 1000000 3
          client.quota.callback.static.storage.soft: 400000000000 4
          client.quota.callback.static.storage.hard: 500000000000 5
          client.quota.callback.static.storage.check-interval: 5 6

    1
    Loads the Kafka Static Quota plugin.
    2
    Sets the producer byte-rate threshold. 1 MBps in this example.
    3
    Sets the consumer byte-rate threshold. 1 MBps in this example.
    4
    Sets the lower soft limit for storage. 400 GB in this example.
    5
    Sets the higher hard limit for storage. 500 GB in this example.
    6
    Sets the interval in seconds between checks on storage. 5 seconds in this example. You can set this to 0 to disable the check.
  2. Update the resource.

    oc apply -f <kafka_configuration_file>

Additional resources

11.8. Frequently asked questions