Chapter 2. Getting started with AMQ Streams

AMQ Streams works on all types of clusters, from public and private clouds to local deployments intended for development. This guide assumes that an OpenShift cluster is available and that the oc command-line tool is installed and configured to connect to the running cluster.

AMQ Streams is based on Strimzi 0.8.0. This chapter describes the procedures to deploy AMQ Streams on OpenShift 3.9 and later.

Note

To run the commands in this guide, your OpenShift user must have the rights to manage role-based access control (RBAC).

For more information about OpenShift and setting up an OpenShift cluster, see the OpenShift documentation.

2.1. Installing AMQ Streams and deploying components

To install AMQ Streams, download and extract the install_and_examples.zip file from the AMQ Streams download site.

The folder contains several YAML files to help you deploy the components of AMQ Streams to OpenShift, perform common operations, and configure your Kafka cluster. The YAML files are referenced throughout this documentation.

The remainder of this chapter provides an overview of each component and instructions for deploying the components to OpenShift using the YAML files provided.

Note

Although container images for AMQ Streams are available in the Red Hat Container Catalog, we recommend that you use the YAML files provided instead.

2.2. Cluster Operator

AMQ Streams uses the Cluster Operator to deploy and manage Kafka (including Zookeeper) and Kafka Connect clusters. The Cluster Operator is deployed inside the OpenShift cluster. To deploy a Kafka cluster, a Kafka resource with the cluster configuration must be created within the OpenShift cluster. Based on what is declared inside the Kafka resource, the Cluster Operator deploys a corresponding Kafka cluster. For more information about the different configuration options supported by the Kafka resource, see Section 3.1, “Kafka cluster configuration”.

Note

AMQ Streams contains example YAML files that make deploying the Cluster Operator easier.

2.2.1. Overview of the Cluster Operator component

The Cluster Operator is in charge of deploying a Kafka cluster alongside a Zookeeper ensemble. As part of the Kafka cluster, it can also deploy the Topic Operator, which provides operator-style topic management through KafkaTopic custom resources. The Cluster Operator can also deploy a Kafka Connect cluster that connects to an existing Kafka cluster. On OpenShift, such a cluster can be deployed using the Source2Image feature, providing an easy way of including more connectors.

Figure 2.1. Example architecture diagram of the Cluster Operator

When the Cluster Operator is up, it starts to watch for certain OpenShift resources containing the desired Kafka or Kafka Connect cluster configuration. By default, it watches only the namespace or project where it is installed, but it can be configured to watch additional OpenShift projects or Kubernetes namespaces. The Cluster Operator watches the following resources:

  • A Kafka resource for the Kafka cluster.
  • A KafkaConnect resource for the Kafka Connect cluster.
  • A KafkaConnectS2I resource for the Kafka Connect cluster with Source2Image support.

When a new Kafka, KafkaConnect, or KafkaConnectS2I resource is created in the OpenShift cluster, the operator gets the cluster description from the desired resource and starts creating a new Kafka or Kafka Connect cluster by creating the necessary other OpenShift resources, such as StatefulSets, Services, ConfigMaps, and so on.

Every time the desired resource is updated by the user, the operator performs corresponding updates on the OpenShift resources that make up the Kafka or Kafka Connect cluster. Resources are either patched or deleted and then re-created so that the Kafka or Kafka Connect cluster reflects the state of the desired cluster resource. This might cause a rolling update, which can lead to service disruption.

Finally, when the desired resource is deleted, the operator starts to undeploy the cluster and delete all the related OpenShift resources.

2.2.2. Deploying the Cluster Operator to OpenShift

Prerequisites

  • An OpenShift user with cluster-admin privileges, for example, system:admin.
  • Modify the installation files according to the namespace the Cluster Operator is going to be installed in.

    On Linux, use:

    sed -i 's/namespace: .*/namespace: my-project/' install/cluster-operator/*RoleBinding*.yaml

    On MacOS, use:

    sed -i '' 's/namespace: .*/namespace: my-project/' install/cluster-operator/*RoleBinding*.yaml

Procedure

  1. Deploy the Cluster Operator

    oc apply -f install/cluster-operator -n my-project
    oc apply -f examples/templates/cluster-operator -n my-project

2.2.3. Deploying the Cluster Operator to watch multiple namespaces

Prerequisites

  • Edit the installation files according to the OpenShift project or Kubernetes namespace the Cluster Operator is going to be installed in.

    On Linux, use:

    sed -i 's/namespace: .*/namespace: my-namespace/' install/cluster-operator/*RoleBinding*.yaml

    On MacOS, use:

    sed -i '' 's/namespace: .*/namespace: my-namespace/' install/cluster-operator/*RoleBinding*.yaml

Procedure

  1. Edit the file install/cluster-operator/050-Deployment-strimzi-cluster-operator.yaml and, in the STRIMZI_NAMESPACE environment variable, list all the OpenShift projects or Kubernetes namespaces where the Cluster Operator should watch for resources. For example:

    apiVersion: extensions/v1beta1
    kind: Deployment
    spec:
      template:
        spec:
          serviceAccountName: strimzi-cluster-operator
          containers:
          - name: strimzi-cluster-operator
            image: strimzi/cluster-operator:latest
            imagePullPolicy: IfNotPresent
            env:
            - name: STRIMZI_NAMESPACE
              value: myproject,myproject2,myproject3
  2. Install the RoleBindings in every namespace or project that the Cluster Operator should watch. Replace my-project with each OpenShift project or Kubernetes namespace listed in the previous step.

    On OpenShift this can be done using oc apply:

    oc apply -f install/cluster-operator/020-RoleBinding-strimzi-cluster-operator.yaml -n my-project
    oc apply -f install/cluster-operator/031-RoleBinding-strimzi-cluster-operator-entity-operator-delegation.yaml -n my-project
    oc apply -f install/cluster-operator/032-RoleBinding-strimzi-cluster-operator-topic-operator-delegation.yaml -n my-project
  3. Deploy the Cluster Operator

    On OpenShift this can be done using oc apply:

    oc apply -f install/cluster-operator -n my-project

2.3. Kafka cluster

You can use AMQ Streams to deploy an ephemeral or persistent Kafka cluster to OpenShift. When installing Kafka, AMQ Streams also installs a Zookeeper cluster and adds the necessary configuration to connect Kafka with Zookeeper.

Ephemeral cluster
In general, an ephemeral (that is, temporary) Kafka cluster is suitable for development and testing purposes, not for production. This deployment uses emptyDir volumes for storing data: broker information for Zookeeper and topics or partitions for Kafka. Because an emptyDir volume is tied to the pod life cycle, its content is deleted when the pod goes down.
Persistent cluster
A persistent Kafka cluster uses PersistentVolumes to store Zookeeper and Kafka data. The PersistentVolume is acquired using a PersistentVolumeClaim to make it independent of the actual type of the PersistentVolume. For example, it can use Amazon EBS volumes in Amazon AWS deployments without any changes in the YAML files. The PersistentVolumeClaim can use a StorageClass to trigger automatic volume provisioning.
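For example, persistent storage can be requested in the Kafka resource with the persistent-claim storage type. The following is a minimal sketch; the size and the optional storage class are illustrative values, not defaults:

```yaml
apiVersion: kafka.strimzi.io/v1alpha1
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    # ...
    storage:
      type: persistent-claim
      size: 100Gi
      # class is optional; when omitted, the default StorageClass is used
      class: standard
  zookeeper:
    # ...
    storage:
      type: persistent-claim
      size: 100Gi
```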

AMQ Streams includes two templates for deploying a Kafka cluster:

  • kafka-ephemeral.yaml deploys an ephemeral cluster, named my-cluster by default.
  • kafka-persistent.yaml deploys a persistent cluster, named my-cluster by default.

The cluster name is defined by the name of the resource and cannot be changed after the cluster has been deployed. To change the cluster name before you deploy the cluster, edit the Kafka.metadata.name property of the resource in the relevant YAML file.

apiVersion: kafka.strimzi.io/v1alpha1
kind: Kafka
metadata:
  name: my-cluster
# ...

2.3.1. Deploying the Kafka cluster to OpenShift

The following procedure describes how to deploy an ephemeral or persistent Kafka cluster to OpenShift on the command line. You can also deploy clusters in the OpenShift console.

Prerequisites

  • The Cluster Operator is deployed.

Procedure

  1. If you plan to use the cluster for development or testing purposes, create and deploy an ephemeral cluster using oc apply.

    oc apply -f examples/kafka/kafka-ephemeral.yaml
  2. If you plan to use the cluster in production, create and deploy a persistent cluster using oc apply.

    oc apply -f examples/kafka/kafka-persistent.yaml


2.4. Kafka Connect

The Cluster Operator deploys a Kafka Connect cluster, which can be used with your Kafka broker deployment. It is implemented as a Deployment with a configurable number of workers. The default image currently contains only the FileStreamSinkConnector and FileStreamSourceConnector connectors. The REST interface for managing the Kafka Connect cluster is exposed internally within the OpenShift cluster as a kafka-connect service on port 8083.

Example KafkaConnect resources and the details about the KafkaConnect format for deploying Kafka Connect can be found in Kafka Connect cluster configuration and Kafka Connect cluster with Source2Image support.
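As a sketch, a minimal KafkaConnect resource looks like the following. The cluster name and the bootstrap service address are example values that depend on your Kafka deployment:

```yaml
apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnect
metadata:
  name: my-connect-cluster
spec:
  # number of Kafka Connect workers
  replicas: 1
  # bootstrap address of the Kafka cluster to connect to
  bootstrapServers: my-cluster-kafka-bootstrap:9092
```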

2.4.1. Deploying Kafka Connect to OpenShift

On OpenShift, Kafka Connect is provided in the form of a template. It can be deployed from the template using the command line or through the OpenShift console.

Prerequisites

  • Before deploying Kafka Connect, the Cluster Operator must be deployed.

Procedure

  • Create a Kafka Connect cluster from the command-line:

    oc apply -f examples/kafka-connect/kafka-connect.yaml


2.4.2. Using Kafka Connect with plugins

AMQ Streams container images for Kafka Connect contain, by default, only the FileStreamSinkConnector and FileStreamSourceConnector connectors which are part of Apache Kafka.

To facilitate deployment with third-party connectors, Kafka Connect is configured to automatically load all plugins or connectors that are present in the /opt/kafka/plugins directory during startup.

There are two ways of adding custom plugins into this directory:

  • Using a custom Docker image
  • Using the OpenShift build system with the AMQ Streams S2I

2.4.2.1. Creating a new image based on the base image

AMQ Streams provides its own Docker image for running Kafka Connect, which can be found in the Red Hat Container Catalog as registry.access.redhat.com/amqstreams-1/amqstreams10-kafkaconnect-openshift:1.0.0. This image can be used as a base image for building a new custom image with additional plugins.

The following procedure describes the process for creating such a custom image.

Procedure

  1. Create a new Dockerfile using registry.access.redhat.com/amqstreams-1/amqstreams10-kafkaconnect-openshift:1.0.0 as the base image:

    FROM registry.access.redhat.com/amqstreams-1/amqstreams10-kafkaconnect-openshift:1.0.0
    USER root:root
    COPY ./my-plugins/ /opt/kafka/plugins/
    USER kafka:kafka
  2. Build the container image and upload it to the appropriate container image repository.
  3. Set the KafkaConnect.spec.image property of the KafkaConnect custom resource or the STRIMZI_DEFAULT_KAFKA_CONNECT_IMAGE variable to point to the new container image.
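For example, assuming the new image was pushed as my-registry.example.com/my-connect:latest (an illustrative registry and tag), the KafkaConnect resource could reference it as follows:

```yaml
apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnect
metadata:
  name: my-connect-cluster
spec:
  # ...
  # custom image containing the additional plugins
  image: my-registry.example.com/my-connect:latest
```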


2.4.2.2. Using OpenShift builds and S2I to create new images

OpenShift supports builds, which can be used together with the Source-to-Image (S2I) framework to create new container images. An OpenShift build takes a builder image with S2I support, together with source code and binaries provided by the user, and uses them to build a new container image. The newly created container image is stored in OpenShift’s local container image repository and can be used in deployments. AMQ Streams provides a Kafka Connect builder image with S2I support, which can be found in the Red Hat Container Catalog as registry.access.redhat.com/amqstreams-1/amqstreams10-kafkaconnects2i-openshift:1.0.0. It takes user-provided binaries (with plugins and connectors) and creates a new Kafka Connect image. This enhanced Kafka Connect image can be used with the Kafka Connect deployment.

The S2I deployment is provided as an OpenShift template. It can be deployed from the template using the command line or the OpenShift console.

Procedure

  1. Create a Kafka Connect S2I cluster from the command-line

    oc apply -f examples/kafka-connect/kafka-connect-s2i.yaml
  2. Once the cluster is deployed, create a directory containing the Kafka Connect plugins that you want to include in the image:

    $ tree ./my-plugins/
    ./my-plugins/
    ├── debezium-connector-mongodb
    │   ├── bson-3.4.2.jar
    │   ├── CHANGELOG.md
    │   ├── CONTRIBUTE.md
    │   ├── COPYRIGHT.txt
    │   ├── debezium-connector-mongodb-0.7.1.jar
    │   ├── debezium-core-0.7.1.jar
    │   ├── LICENSE.txt
    │   ├── mongodb-driver-3.4.2.jar
    │   ├── mongodb-driver-core-3.4.2.jar
    │   └── README.md
    ├── debezium-connector-mysql
    │   ├── CHANGELOG.md
    │   ├── CONTRIBUTE.md
    │   ├── COPYRIGHT.txt
    │   ├── debezium-connector-mysql-0.7.1.jar
    │   ├── debezium-core-0.7.1.jar
    │   ├── LICENSE.txt
    │   ├── mysql-binlog-connector-java-0.13.0.jar
    │   ├── mysql-connector-java-5.1.40.jar
    │   ├── README.md
    │   └── wkb-1.0.2.jar
    └── debezium-connector-postgres
        ├── CHANGELOG.md
        ├── CONTRIBUTE.md
        ├── COPYRIGHT.txt
        ├── debezium-connector-postgres-0.7.1.jar
        ├── debezium-core-0.7.1.jar
        ├── LICENSE.txt
        ├── postgresql-42.0.0.jar
        ├── protobuf-java-2.6.1.jar
        └── README.md
  3. Start a new image build using the prepared directory:

    oc start-build my-connect-cluster-connect --from-dir ./my-plugins/
    Note

    The name of the build is based on the name of the deployed Kafka Connect cluster.

  4. Once the build is finished, the new image will be used automatically by the Kafka Connect deployment.

2.5. Kafka Mirror Maker

The Cluster Operator deploys one or more Kafka Mirror Maker replicas to replicate data between Kafka clusters. This process is called mirroring to avoid confusion with the concept of partition replication within a single Kafka cluster. Mirror Maker consumes messages from the source cluster and republishes them to the target cluster.

For information about example resources and the format for deploying Kafka Mirror Maker, see Kafka Mirror Maker configuration.
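As an illustration, a KafkaMirrorMaker resource declares the consumer side (source cluster) and the producer side (target cluster). The bootstrap addresses, group ID, and whitelist below are example values:

```yaml
apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaMirrorMaker
metadata:
  name: my-mirror-maker
spec:
  replicas: 1
  consumer:
    # bootstrap address of the source Kafka cluster
    bootstrapServers: my-source-cluster-kafka-bootstrap:9092
    groupId: my-mirror-maker-group
  producer:
    # bootstrap address of the target Kafka cluster
    bootstrapServers: my-target-cluster-kafka-bootstrap:9092
  # regular expression matching the topics to mirror
  whitelist: ".*"
```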

2.5.1. Deploying Kafka Mirror Maker to OpenShift

On OpenShift, Kafka Mirror Maker is provided in the form of a template. It can be deployed from the template using the command line or through the OpenShift console.

Prerequisites

  • Before deploying Kafka Mirror Maker, the Cluster Operator must be deployed.

Procedure

  • Create a Kafka Mirror Maker cluster from the command-line:

    oc apply -f examples/kafka-mirror-maker/kafka-mirror-maker.yaml


2.6. Deploying example clients

Prerequisites

  • An existing Kafka cluster for the client to connect to.

Procedure

  1. Deploy the producer.

    On OpenShift, use oc run:

    oc run kafka-producer -ti --image=registry.access.redhat.com/amqstreams-1/amqstreams10-kafka-openshift:1.0.0 --rm=true --restart=Never -- bin/kafka-console-producer.sh --broker-list cluster-name-kafka-bootstrap:9092 --topic my-topic
  2. Type your message into the console where the producer is running.
  3. Press Enter to send the message.
  4. Deploy the consumer.

    On OpenShift, use oc run:

    oc run kafka-consumer -ti --image=registry.access.redhat.com/amqstreams-1/amqstreams10-kafka-openshift:1.0.0 --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server cluster-name-kafka-bootstrap:9092 --topic my-topic --from-beginning
  5. Confirm that you see the incoming messages in the consumer console.

2.7. Topic Operator

2.7.1. Overview of the Topic Operator component

The Topic Operator provides a way of managing topics in a Kafka cluster via OpenShift resources.


The role of the Topic Operator is to keep a set of KafkaTopic OpenShift resources describing Kafka topics in-sync with corresponding Kafka topics.

Specifically:

  • if a KafkaTopic is created, the operator will create the topic it describes
  • if a KafkaTopic is deleted, the operator will delete the topic it describes
  • if a KafkaTopic is changed, the operator will update the topic it describes

And also, in the other direction:

  • if a topic is created within the Kafka cluster, the operator will create a KafkaTopic describing it
  • if a topic is deleted from the Kafka cluster, the operator will delete the KafkaTopic describing it
  • if a topic in the Kafka cluster is changed, the operator will update the KafkaTopic describing it

This allows you to declare a KafkaTopic as part of your application’s deployment and the Topic Operator will take care of creating the topic for you. Your application just needs to deal with producing or consuming from the necessary topics.
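For example, a KafkaTopic declaration might look like the following. The strimzi.io/cluster label ties the topic to the Kafka cluster it belongs to; the names and counts here are illustrative:

```yaml
apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaTopic
metadata:
  name: my-topic
  labels:
    # name of the Kafka cluster this topic belongs to
    strimzi.io/cluster: my-cluster
spec:
  partitions: 3
  replicas: 3
```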

If the topic is reconfigured or reassigned to different Kafka nodes, the KafkaTopic will always be up to date.

For more details about creating, modifying and deleting topics, see Chapter 5, Using the Topic Operator.

2.7.2. Deploying the Topic Operator using the Cluster Operator

This procedure describes how to deploy the Topic Operator using the Cluster Operator. If you want to use the Topic Operator with a Kafka cluster that is not managed by AMQ Streams, you must deploy the Topic Operator as a standalone component. For more information, see Section 4.2.5, “Deploying the standalone Topic Operator”.

Prerequisites

  • A running Cluster Operator
  • A Kafka resource to be created or updated

Procedure

  1. Ensure that the Kafka.spec.entityOperator object exists in the Kafka resource. This configures the Entity Operator.

    apiVersion: kafka.strimzi.io/v1alpha1
    kind: Kafka
    metadata:
      name: my-cluster
    spec:
      #...
      entityOperator:
        topicOperator: {}
        userOperator: {}
  2. Configure the Topic Operator using the fields described in Section B.26, “EntityTopicOperatorSpec schema reference”.
  3. Create or update the Kafka resource in OpenShift.

    On OpenShift, use oc apply:

    oc apply -f your-file


2.8. User Operator

The User Operator provides a way of managing Kafka users via OpenShift resources.

2.8.1. Overview of the User Operator component

The User Operator manages Kafka users for a Kafka cluster by watching for KafkaUser OpenShift resources that describe Kafka users and ensuring that they are configured properly in the Kafka cluster. For example:

  • if a KafkaUser is created, the User Operator will create the user it describes
  • if a KafkaUser is deleted, the User Operator will delete the user it describes
  • if a KafkaUser is changed, the User Operator will update the user it describes

Unlike the Topic Operator, the User Operator does not sync any changes from the Kafka cluster with the OpenShift resources. Unlike Kafka topics, which applications might create directly in Kafka, users are not expected to be managed directly in the Kafka cluster in parallel with the User Operator, so this synchronization should not be needed.

The User Operator allows you to declare a KafkaUser as part of your application’s deployment. When the user is created, the credentials will be created in a Secret. Your application needs to use the user and its credentials for authentication and to produce or consume messages.

In addition to managing credentials for authentication, the User Operator also manages authorization rules by including a description of the user’s rights in the KafkaUser declaration.
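As a sketch, a KafkaUser resource can declare both TLS client authentication and simple ACL-based authorization. The topic name and operations below are example values:

```yaml
apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaUser
metadata:
  name: my-user
  labels:
    # name of the Kafka cluster this user belongs to
    strimzi.io/cluster: my-cluster
spec:
  authentication:
    type: tls
  authorization:
    type: simple
    acls:
      # allow this user to read from and write to my-topic
      - resource:
          type: topic
          name: my-topic
        operation: Read
      - resource:
          type: topic
          name: my-topic
        operation: Write
```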

2.8.2. Deploying the User Operator using the Cluster Operator

Prerequisites

  • A running Cluster Operator
  • A Kafka resource to be created or updated.

Procedure

  1. Edit the Kafka resource, ensuring it has a Kafka.spec.entityOperator.userOperator object that configures the User Operator as required.
  2. Create or update the Kafka resource in OpenShift.

    On OpenShift this can be done using oc apply:

    oc apply -f your-file
