Chapter 5. Kafka configuration

A deployment of Kafka components to an OpenShift cluster using AMQ Streams is highly configurable through the application of custom resources. Custom resources are created as instances of the APIs added by CRDs to extend OpenShift resources.

CRDs act as configuration instructions to describe the custom resources in an OpenShift cluster, and are provided with AMQ Streams for each Kafka component used in a deployment. CRDs and custom resources are defined as YAML files. Example YAML files are provided with the AMQ Streams distribution.

In this chapter we look at how Kafka components are configured through custom resources, starting with common configuration points and then moving on to important configuration considerations specific to each component.

5.1. Custom resources

After a new custom resource type is added to your cluster by installing a CRD, you can create instances of the resource based on its specification.

The custom resources for AMQ Streams components have common configuration properties, which are defined under spec.

In this fragment from a Kafka topic custom resource, the apiVersion and kind properties identify the associated CRD. The spec property shows configuration that defines the number of partitions and replicas for the topic.

Kafka topic custom resource

apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaTopic
metadata:
  name: my-topic
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 1
  replicas: 1
  # ...

There are many additional configuration options that can be incorporated into a YAML definition, some common and some specific to a particular component.

5.2. Common configuration

Some of the configuration options common to resources are described here. Security and metrics collection might also be adopted where applicable.

Bootstrap servers

Bootstrap servers are used for host/port connection to a Kafka cluster for:

  • Kafka Connect
  • Kafka Bridge
  • Kafka MirrorMaker producers and consumers

CPU and memory resources

You request CPU and memory resources for components. Limits specify the maximum resources that can be consumed by a given container.

Resource requests and limits for the Topic Operator and User Operator are set in the Kafka resource, as shown in the sketch that follows the common configuration example below.

Logging
You define the logging level for the component. Logging can be defined directly (inline) or externally using a config map.
Healthchecks
Healthcheck configuration introduces liveness and readiness probes to know when to restart a container (liveness) and when a container can accept traffic (readiness).
JVM options
JVM options provide maximum and minimum memory allocation to optimize the performance of the component according to the platform it is running on.
Pod scheduling
Pod scheduling uses affinity/anti-affinity rules to determine under what circumstances a pod is scheduled onto a node.

Example YAML showing common configuration

apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaConnect
metadata:
  name: my-cluster
spec:
  # ...
  bootstrapServers: my-cluster-kafka-bootstrap:9092
  resources:
    requests:
      cpu: 12
      memory: 64Gi
    limits:
      cpu: 12
      memory: 64Gi
  logging:
    type: inline
    loggers:
      connect.root.logger.level: "INFO"
  readinessProbe:
    initialDelaySeconds: 15
    timeoutSeconds: 5
  livenessProbe:
    initialDelaySeconds: 15
    timeoutSeconds: 5
  jvmOptions:
    "-Xmx": "2g"
    "-Xms": "2g"
  template:
    pod:
      affinity:
        nodeAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            nodeSelectorTerms:
              - matchExpressions:
                  - key: node-type
                    operator: In
                    values:
                      - fast-network
  # ...
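
The example above sets resources directly on a KafkaConnect resource. For the Topic Operator and User Operator, requests and limits are set under entityOperator in the Kafka resource. The following is a minimal sketch; the values are illustrative, not recommendations.

apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: my-cluster
spec:
  # ...
  entityOperator:
    topicOperator:
      resources:
        requests:
          cpu: "500m"
          memory: 256Mi
        limits:
          cpu: "1"
          memory: 512Mi
    userOperator:
      resources:
        requests:
          cpu: "500m"
          memory: 256Mi
        limits:
          cpu: "1"
          memory: 512Mi
  # ...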

5.3. Kafka cluster configuration

A Kafka cluster comprises one or more brokers. For producers and consumers to access topics within the brokers, Kafka configuration must define how data is stored in the cluster and how it is accessed. You can configure a Kafka cluster to run with multiple broker nodes across racks.

Storage

Kafka and ZooKeeper store data on disks.

AMQ Streams requires block storage provisioned through StorageClass. The file system format for storage must be XFS or EXT4. Three types of data storage are supported:

Ephemeral (Recommended for development only)
Ephemeral storage stores data for the lifetime of an instance. Data is lost when the instance is restarted.
Persistent
Persistent storage relates to long-term data storage independent of the lifecycle of the instance.
JBOD (Just a Bunch of Disks, suitable for Kafka only)
JBOD allows you to use multiple disks to store commit logs in each broker.

The disk capacity used by an existing Kafka cluster can be increased if supported by the infrastructure.
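
For example, a minimal sketch of JBOD storage for the brokers, assuming two persistent volumes per broker; the volume sizes are illustrative only:

apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    # ...
    storage:
      type: jbod
      volumes:
        - id: 0
          type: persistent-claim
          size: 100Gi
          deleteClaim: false
        - id: 1
          type: persistent-claim
          size: 100Gi
          deleteClaim: false
    # ...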

Listeners

Listeners configure how clients connect to a Kafka cluster.

The following types of listener are supported:

  • Plain listener that does not use encryption
  • TLS listener that uses encryption
  • External listener for access outside of OpenShift

External listeners expose Kafka by specifying a type:

  • route to use OpenShift Routes and HAProxy router
  • loadbalancer to use loadbalancer services
  • nodeport to use ports on the OpenShift nodes
  • ingress to use OpenShift Ingress and the NGINX Ingress Controller for Kubernetes

If you are using OAuth 2.0 for token-based authentication, you can configure listeners to use the authorization server.
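
As a rough sketch, a TLS listener might delegate token validation to an authorization server as follows; the issuer and JWKS endpoint URIs are placeholders for your own authorization server:

kafka:
  # ...
  listeners:
    tls:
      authentication:
        type: oauth
        validIssuerUri: https://<authorization-server>/auth/realms/my-realm
        jwksEndpointUri: https://<authorization-server>/auth/realms/my-realm/protocol/openid-connect/certs
        userNameClaim: preferred_username
  # ...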

Rack awareness
Rack awareness is a configuration feature that distributes Kafka broker pods and topic replicas across racks, where a rack can represent a data center, a rack within a data center, or an availability zone.

Example YAML showing Kafka configuration

apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    # ...
    listeners:
      tls:
        authentication:
          type: tls
      external:
        type: route
        authentication:
          type: tls
    # ...
    storage:
      type: persistent-claim
      size: 10000Gi
    # ...
    rack:
      topologyKey: failure-domain.beta.kubernetes.io/zone
    # ...

5.4. Kafka MirrorMaker configuration

To set up MirrorMaker, a source and target (destination) Kafka cluster must be running.

You can use AMQ Streams with MirrorMaker or MirrorMaker 2.0.

MirrorMaker

MirrorMaker uses producers and consumers to replicate data across clusters.

MirrorMaker uses:

  • Consumer configuration to consume data from the source cluster
  • Producer configuration to output data to the target cluster

Consumer and producer configuration includes any authentication and encryption settings.

A whitelist defines the topics to mirror from a source to a target cluster.

MirrorMaker 2.0

MirrorMaker 2.0 is based on the Kafka Connect framework, with connectors managing the transfer of data between clusters.

MirrorMaker 2.0 uses:

  • Source cluster configuration to consume data from the source cluster
  • Target cluster configuration to output data to the target cluster

MirrorMaker 2.0 uses a MirrorSourceConnector to replicate topics from a source cluster to a target cluster.

Figure 5.1. Replication across two clusters

The MirrorMaker 2.0 architecture supports bidirectional replication in an active/active cluster configuration, so both clusters are active and provide the same data simultaneously. A MirrorMaker 2.0 cluster is required at each target destination. This is useful if you want to make the same data available locally in different geographical locations.
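
As a minimal sketch, a MirrorMaker 2.0 deployment is defined in a KafkaMirrorMaker2 resource; the cluster aliases, bootstrap addresses, and API version shown here are illustrative assumptions:

apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaMirrorMaker2
metadata:
  name: my-mirror-maker-2
spec:
  connectCluster: "my-target-cluster"
  clusters:
    - alias: "my-source-cluster"
      bootstrapServers: my-source-cluster-kafka-bootstrap:9092
    - alias: "my-target-cluster"
      bootstrapServers: my-target-cluster-kafka-bootstrap:9092
  mirrors:
    - sourceCluster: "my-source-cluster"
      targetCluster: "my-target-cluster"
      sourceConnector: {}
      topicsPattern: ".*"
      groupsPattern: ".*"
  # ...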

Key Consumer configuration

Consumer group identifier
The consumer group ID for a MirrorMaker consumer so that messages consumed are assigned to a consumer group.
Number of consumer streams
A value to determine the number of consumers in a consumer group that consume messages in parallel.
Offset commit interval
An offset commit interval to set the time between consuming and committing a message.

Key Producer configuration

Cancel option for send failure
You can define whether a message send failure is ignored or MirrorMaker is terminated and recreated.

Example YAML showing MirrorMaker configuration

apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaMirrorMaker
metadata:
  name: my-mirror-maker
spec:
  # ...
  consumer:
    bootstrapServers: my-source-cluster-kafka-bootstrap:9092
    groupId: "my-group"
    numStreams: 2
    offsetCommitInterval: 120000
    # ...
  producer:
    # ...
    abortOnSendFailure: false
    # ...
  whitelist: "my-topic|other-topic"
  # ...

5.5. Kafka Connect configuration

A basic Kafka Connect configuration requires a bootstrap address to connect to a Kafka cluster, and encryption and authentication details.

Kafka Connect instances are configured by default with the same:

  • Group ID for the Kafka Connect cluster
  • Kafka topic to store the connector offsets
  • Kafka topic to store connector and task configurations
  • Kafka topic to store connector and task status updates

If multiple different Kafka Connect instances are used, these settings must be unique for each instance.

Example YAML showing Kafka Connect configuration

apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaConnect
metadata:
  name: my-connect
spec:
  # ...
  config:
    group.id: my-connect-cluster
    offset.storage.topic: my-connect-cluster-offsets
    config.storage.topic: my-connect-cluster-configs
    status.storage.topic: my-connect-cluster-status
  # ...

Connectors

Connectors are configured separately from Kafka Connect. The configuration describes the source input data and target output data to feed into and out of Kafka Connect. The external source data must reference specific topics that will store the messages.

Kafka provides two built-in connectors:

  • FileStreamSourceConnector streams data from an external system to Kafka, reading lines from an input source and sending each line to a Kafka topic.
  • FileStreamSinkConnector streams data from Kafka to an external system, reading messages from a Kafka topic and creating a line for each in an output file.

You can add other connectors using connector plugins, which are a set of JAR files that define the implementation required to connect to certain types of external system.

You create a custom Kafka Connect image that uses new Kafka Connect plugins.

To create the image, you can use:

  • A Kafka container image from the Red Hat Ecosystem Catalog as a base image
  • OpenShift builds and the Source-to-Image (S2I) framework to create new container images

Managing connectors

You can use the KafkaConnector resource or the Kafka Connect REST API to create and manage connector instances in a Kafka Connect cluster. The KafkaConnector resource offers an OpenShift-native approach, and is managed by the Cluster Operator.

The spec for the KafkaConnector resource specifies the connector class and configuration settings, as well as the maximum number of connector tasks to handle the data.

Example YAML showing KafkaConnector configuration

apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaConnector
metadata:
  name: my-source-connector
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: org.apache.kafka.connect.file.FileStreamSourceConnector
  tasksMax: 2
  config:
    file: "/opt/kafka/LICENSE"
    topic: my-topic
    # ...

You enable KafkaConnectors by adding an annotation to the KafkaConnect resource.

Example YAML showing annotation to enable KafkaConnector

apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaConnect
metadata:
  name: my-connect
  annotations:
    strimzi.io/use-connector-resources: "true"
  # ...

5.6. Kafka Bridge configuration

A Kafka Bridge configuration requires a bootstrap server specification for the Kafka cluster it connects to, as well as any encryption and authentication options required.

Kafka Bridge consumer and producer configuration is standard, as described in the Apache Kafka configuration documentation for consumers and producers.

HTTP-related configuration options set the port on which the Kafka Bridge server listens.

Example YAML showing Kafka Bridge configuration

apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaBridge
metadata:
  name: my-bridge
spec:
  # ...
  bootstrapServers: my-cluster-kafka-bootstrap:9092
  http:
    port: 8080
  consumer:
    config:
      auto.offset.reset: earliest
  producer:
    config:
      delivery.timeout.ms: 300000
  # ...