Chapter 2. Getting started with AMQ Streams
AMQ Streams works on all types of clusters, from public and private clouds down to local deployments intended for development. This guide assumes that an OpenShift cluster is available and that the oc command-line tool is installed and configured to connect to the running cluster.
AMQ Streams is based on Strimzi 0.8.0. This chapter describes the procedures to deploy AMQ Streams on OpenShift 3.9 and later.
To run the commands in this guide, your OpenShift user must have the rights to manage role-based access control (RBAC).
For more information about OpenShift and setting up an OpenShift cluster, see the OpenShift documentation.
2.1. Installing AMQ Streams and deploying components
To install AMQ Streams, download and extract the install_and_examples.zip file from the AMQ Streams download site.
The folder contains several YAML files to help you deploy the components of AMQ Streams to OpenShift, perform common operations, and configure your Kafka cluster. The YAML files are referenced throughout this documentation.
The remainder of this chapter provides an overview of each component and instructions for deploying the components to OpenShift using the YAML files provided.
Although container images for AMQ Streams are available in the Red Hat Container Catalog, we recommend that you use the YAML files provided instead.
2.2. Cluster Operator
AMQ Streams uses the Cluster Operator to deploy and manage Kafka (including Zookeeper) and Kafka Connect clusters. The Cluster Operator is deployed inside of the OpenShift cluster. To deploy a Kafka cluster, a Kafka resource with the cluster configuration must be created within the OpenShift cluster. Based on what is declared inside the Kafka resource, the Cluster Operator deploys a corresponding Kafka cluster. For more information about the different configuration options supported by the Kafka resource, see Section 3.1, “Kafka cluster configuration”.
AMQ Streams contains example YAML files, which make deploying a Cluster Operator easier.
2.2.1. Overview of the Cluster Operator component
The Cluster Operator is in charge of deploying a Kafka cluster alongside a Zookeeper ensemble. As part of the Kafka cluster, it can also deploy the Topic Operator, which provides operator-style topic management via KafkaTopic custom resources. The Cluster Operator is also able to deploy a Kafka Connect cluster, which connects to an existing Kafka cluster. On OpenShift such a cluster can be deployed using the Source2Image feature, providing an easy way of including more connectors.
Figure 2.1. Example Architecture diagram of the Cluster Operator.

When the Cluster Operator is up, it starts to watch for certain OpenShift resources containing the desired Kafka or Kafka Connect cluster configuration. By default, it watches only the namespace or project where it is installed. The Cluster Operator can be configured to watch additional OpenShift projects or Kubernetes namespaces. The Cluster Operator watches the following resources:
- A Kafka resource for the Kafka cluster.
- A KafkaConnect resource for the Kafka Connect cluster.
- A KafkaConnectS2I resource for the Kafka Connect cluster with Source2Image support.
When a new Kafka, KafkaConnect, or KafkaConnectS2I resource is created in the OpenShift cluster, the operator gets the cluster description from the desired resource and starts creating a new Kafka or Kafka Connect cluster by creating the necessary other OpenShift resources, such as StatefulSets, Services, ConfigMaps, and so on.
Every time the desired resource is updated by the user, the operator performs corresponding updates on the OpenShift resources which make up the Kafka or Kafka Connect cluster. Resources are either patched or deleted and then re-created in order to make the Kafka or Kafka Connect cluster reflect the state of the desired cluster resource. This might cause a rolling update which might lead to service disruption.
Finally, when the desired resource is deleted, the operator starts to undeploy the cluster and delete all the related OpenShift resources.
2.2.2. Deploying the Cluster Operator to OpenShift
Prerequisites
- A user with the cluster-admin role needs to be used, for example, system:admin.
- Modify the installation files according to the namespace the Cluster Operator is going to be installed in.
On Linux, use:
sed -i 's/namespace: .*/namespace: my-project/' install/cluster-operator/*RoleBinding*.yaml
On MacOS, use:
sed -i '' 's/namespace: .*/namespace: my-project/' install/cluster-operator/*RoleBinding*.yaml
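To confirm that the namespace was updated in every file, you can check the RoleBinding files before deploying; this grep is only an illustrative sanity check and not part of the official procedure:
grep 'namespace:' install/cluster-operator/*RoleBinding*.yaml
Each file should now reference my-project instead of the default namespace.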
Procedure
Deploy the Cluster Operator
oc apply -f install/cluster-operator -n my-project
oc apply -f examples/templates/cluster-operator -n my-project
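To verify that the Cluster Operator started, you can list the pods in the project and check the operator logs; the pod name below is a placeholder for the actual strimzi-cluster-operator pod name:
oc get pods -n my-project
oc logs <strimzi-cluster-operator-pod-name> -n my-project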
2.2.3. Deploying the Cluster Operator to watch multiple namespaces
Prerequisites
Edit the installation files according to the OpenShift project or Kubernetes namespace the Cluster Operator is going to be installed in.
On Linux, use:
sed -i 's/namespace: .*/namespace: my-namespace/' install/cluster-operator/*RoleBinding*.yaml
On MacOS, use:
sed -i '' 's/namespace: .*/namespace: my-namespace/' install/cluster-operator/*RoleBinding*.yaml
Procedure
Edit the file install/cluster-operator/050-Deployment-strimzi-cluster-operator.yaml and, in the STRIMZI_NAMESPACE environment variable, list all the OpenShift projects or Kubernetes namespaces where the Cluster Operator should watch for resources. For example:
apiVersion: extensions/v1beta1
kind: Deployment
spec:
  template:
    spec:
      serviceAccountName: strimzi-cluster-operator
      containers:
      - name: strimzi-cluster-operator
        image: strimzi/cluster-operator:latest
        imagePullPolicy: IfNotPresent
        env:
        - name: STRIMZI_NAMESPACE
          value: myproject,myproject2,myproject3
For all namespaces or projects which should be watched by the Cluster Operator, install the RoleBindings. Replace my-namespace or my-project with the OpenShift project or Kubernetes namespace used in the previous step.
On OpenShift this can be done using oc apply:
oc apply -f install/cluster-operator/020-RoleBinding-strimzi-cluster-operator.yaml -n my-project
oc apply -f install/cluster-operator/031-RoleBinding-strimzi-cluster-operator-entity-operator-delegation.yaml -n my-project
oc apply -f install/cluster-operator/032-RoleBinding-strimzi-cluster-operator-topic-operator-delegation.yaml -n my-project
Deploy the Cluster Operator
On OpenShift this can be done using oc apply:
oc apply -f install/cluster-operator -n my-project
2.3. Kafka cluster
You can use AMQ Streams to deploy an ephemeral or persistent Kafka cluster to OpenShift. When installing Kafka, AMQ Streams also installs a Zookeeper cluster and adds the necessary configuration to connect Kafka with Zookeeper.
- Ephemeral cluster
In general, an ephemeral (that is, temporary) Kafka cluster is suitable for development and testing purposes, not for production. This deployment uses emptyDir volumes for storing broker information (for Zookeeper) and topics or partitions (for Kafka). Using an emptyDir volume means that its content is strictly related to the pod life cycle and is deleted when the pod goes down.
- Persistent cluster
A persistent Kafka cluster uses PersistentVolumes to store Zookeeper and Kafka data. The PersistentVolume is acquired using a PersistentVolumeClaim to make it independent of the actual type of the PersistentVolume. For example, it can use Amazon EBS volumes in Amazon AWS deployments without any changes in the YAML files. The PersistentVolumeClaim can use a StorageClass to trigger automatic volume provisioning.
AMQ Streams includes two templates for deploying a Kafka cluster:
- kafka-ephemeral.yaml deploys an ephemeral cluster, named my-cluster by default.
- kafka-persistent.yaml deploys a persistent cluster, named my-cluster by default.
The cluster name is defined by the name of the resource and cannot be changed after the cluster has been deployed. To change the cluster name before you deploy the cluster, edit the Kafka.metadata.name property of the resource in the relevant YAML file.
apiVersion: kafka.strimzi.io/v1alpha1
kind: Kafka
metadata:
  name: my-cluster
# ...
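The main difference between the two templates is the storage definition under the kafka and zookeeper sections of the resource. The following sketch shows what a persistent configuration might look like; the size and deleteClaim values are illustrative assumptions rather than values taken from the example files:
apiVersion: kafka.strimzi.io/v1alpha1
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    # kafka-ephemeral.yaml uses "type: ephemeral" here instead
    storage:
      type: persistent-claim
      size: 100Gi
      deleteClaim: false
  zookeeper:
    storage:
      type: persistent-claim
      size: 100Gi
      deleteClaim: false
  # ...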
2.3.1. Deploying the Kafka cluster to OpenShift
The following procedure describes how to deploy an ephemeral or persistent Kafka cluster to OpenShift on the command line. You can also deploy clusters in the OpenShift console.
Prerequisites
- The Cluster Operator is deployed.
Procedure
If you plan to use the cluster for development or testing purposes, create and deploy an ephemeral cluster using oc apply:
oc apply -f examples/kafka/kafka-ephemeral.yaml
If you plan to use the cluster in production, create and deploy a persistent cluster using oc apply:
oc apply -f examples/kafka/kafka-persistent.yaml
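After applying the resource, you can watch the Kafka and Zookeeper pods come up; the strimzi.io/cluster label in this selector is an assumption about the labels the Cluster Operator applies to the pods it creates:
oc get pods -l strimzi.io/cluster=my-cluster -w
The broker and Zookeeper pods should eventually reach the Running state.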
Additional resources
- For more information on deploying the Cluster Operator, see Section 2.2, “Cluster Operator”.
- For more information on the different configuration options supported by the Kafka resource, see Section 3.1, “Kafka cluster configuration”.
2.4. Kafka Connect
The Cluster Operator deploys a Kafka Connect cluster, which can be used with your Kafka broker deployment. It is implemented as a Deployment with a configurable number of workers. The default image currently contains only the FileStreamSinkConnector and FileStreamSourceConnector connectors. The REST interface for managing the Kafka Connect cluster is exposed internally within the OpenShift cluster as a kafka-connect service on port 8083.
Example KafkaConnect resources and the details about the KafkaConnect format for deploying Kafka Connect can be found in Kafka Connect cluster configuration and Kafka Connect cluster with Source2Image support.
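As a quick orientation, a minimal KafkaConnect resource might look like the following sketch; the values are illustrative rather than copied from the example files, and the bootstrap address assumes the default Kafka cluster name my-cluster used elsewhere in this guide:
apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnect
metadata:
  name: my-connect-cluster
spec:
  replicas: 1
  # Bootstrap address of the Kafka cluster the Connect workers connect to
  bootstrapServers: my-cluster-kafka-bootstrap:9092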
2.4.1. Deploying Kafka Connect to OpenShift
On OpenShift, Kafka Connect is provided in the form of a template. It can be deployed from the template using the command-line or through the OpenShift console.
Prerequisites
- Before deploying Kafka Connect, the Cluster Operator must be deployed.
Procedure
Create a Kafka Connect cluster from the command-line:
oc apply -f examples/kafka-connect/kafka-connect.yaml
Additional resources
- For more information about deploying the Cluster Operator, see Section 2.2, “Cluster Operator”.
2.4.2. Using Kafka Connect with plugins
AMQ Streams container images for Kafka Connect contain, by default, only the FileStreamSinkConnector and FileStreamSourceConnector connectors which are part of Apache Kafka.
To facilitate deployment with third-party connectors, Kafka Connect is configured to automatically load all plugins or connectors present in the /opt/kafka/plugins directory during startup.
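If you later want to confirm which plugins a running Kafka Connect pod has picked up, you can list the plugins directory inside the pod; the pod name here is a placeholder for one of your Kafka Connect pods:
oc exec -it <kafka-connect-pod-name> -- ls /opt/kafka/plugins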
There are two ways of adding custom plugins into this directory:
- Using a custom Docker image
- Using the OpenShift build system with the AMQ Streams S2I
2.4.2.1. Create a new image based on our base image
AMQ Streams provides its own Docker image for running Kafka Connect, which can be found on Red Hat Container Catalog as registry.access.redhat.com/amqstreams-1/amqstreams10-kafkaconnect-openshift:1.0.0. This image can be used as a base image for building a new custom image with additional plugins.
The following procedure describes the process for creating such a custom image.
Procedure
Create a new Dockerfile using registry.access.redhat.com/amqstreams-1/amqstreams10-kafkaconnect-openshift:1.0.0 as the base image:
FROM registry.access.redhat.com/amqstreams-1/amqstreams10-kafkaconnect-openshift:1.0.0
USER root:root
COPY ./my-plugins/ /opt/kafka/plugins/
USER kafka:kafka
- Build the container image and upload it to the appropriate container image repository (see the example build and push commands after this procedure).
- Set the KafkaConnect.spec.image property of the KafkaConnect custom resource or the STRIMZI_DEFAULT_KAFKA_CONNECT_IMAGE variable to point to the new container image (see the example snippet after this procedure).
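The build and upload step is not tied to a particular tool. With Docker it might look like the following; the registry and image names are placeholders you would replace with your own:
docker build -t my-registry.example.com/my-org/my-connect:latest .
docker push my-registry.example.com/my-org/my-connect:latest
Pointing the deployment at the new image through KafkaConnect.spec.image could then look like this sketch, using the same placeholder image reference:
apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnect
metadata:
  name: my-connect-cluster
spec:
  image: my-registry.example.com/my-org/my-connect:latest
  # ...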
Additional resources
- For more information about the STRIMZI_DEFAULT_KAFKA_CONNECT_IMAGE variable, see Section 4.1.5, “Cluster Operator Configuration”.
- For more information about the KafkaConnect.spec.image property, see Section 3.2.11, “Container images”.
2.4.2.2. Using OpenShift builds and S2I to create new images
OpenShift supports builds, which can be used together with the Source-to-Image (S2I) framework to create new container images. An OpenShift build takes a builder image with S2I support together with source code and binaries provided by the user and uses them to build a new container image. The newly created container image is stored in OpenShift’s local container image repository and can be used in deployments. AMQ Streams provides a Kafka Connect builder image with S2I support, which can be found in the Red Hat Container Catalog as registry.access.redhat.com/amqstreams-1/amqstreams10-kafkaconnects2i-openshift:1.0.0. It takes user-provided binaries (with plugins and connectors) and creates a new Kafka Connect image. This enhanced Kafka Connect image can be used with the Kafka Connect deployment.
The S2I deployment is provided as an OpenShift template. It can be deployed from the template using the command-line or through the OpenShift console.
Procedure
Create a Kafka Connect S2I cluster from the command-line:
oc apply -f examples/kafka-connect/kafka-connect-s2i.yaml
Once the cluster is deployed, a new build can be triggered from the command-line by creating a directory with Kafka Connect plugins:
$ tree ./my-plugins/
./my-plugins/
├── debezium-connector-mongodb
│   ├── bson-3.4.2.jar
│   ├── CHANGELOG.md
│   ├── CONTRIBUTE.md
│   ├── COPYRIGHT.txt
│   ├── debezium-connector-mongodb-0.7.1.jar
│   ├── debezium-core-0.7.1.jar
│   ├── LICENSE.txt
│   ├── mongodb-driver-3.4.2.jar
│   ├── mongodb-driver-core-3.4.2.jar
│   └── README.md
├── debezium-connector-mysql
│   ├── CHANGELOG.md
│   ├── CONTRIBUTE.md
│   ├── COPYRIGHT.txt
│   ├── debezium-connector-mysql-0.7.1.jar
│   ├── debezium-core-0.7.1.jar
│   ├── LICENSE.txt
│   ├── mysql-binlog-connector-java-0.13.0.jar
│   ├── mysql-connector-java-5.1.40.jar
│   ├── README.md
│   └── wkb-1.0.2.jar
└── debezium-connector-postgres
    ├── CHANGELOG.md
    ├── CONTRIBUTE.md
    ├── COPYRIGHT.txt
    ├── debezium-connector-postgres-0.7.1.jar
    ├── debezium-core-0.7.1.jar
    ├── LICENSE.txt
    ├── postgresql-42.0.0.jar
    ├── protobuf-java-2.6.1.jar
    └── README.md
Start a new image build using the prepared directory:
oc start-build my-connect-cluster-connect --from-dir ./my-plugins/
Note: The name of the build is derived from the name of the deployed Kafka Connect cluster.
- Once the build is finished, the new image will be used automatically by the Kafka Connect deployment.
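While the build runs, you can follow its progress; the build config name follows the Kafka Connect cluster name, so for the my-connect-cluster deployment used above it would typically be my-connect-cluster-connect:
oc get builds
oc logs -f bc/my-connect-cluster-connect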
2.5. Kafka Mirror Maker
The Cluster Operator deploys one or more Kafka Mirror Maker replicas to replicate data between Kafka clusters. This process is called mirroring to avoid confusion with the Kafka partition replication concept. The Mirror Maker consumes messages from the source cluster and republishes those messages to the target cluster.
For information about example resources and the format for deploying Kafka Mirror Maker, see Kafka Mirror Maker configuration.
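For orientation, a minimal KafkaMirrorMaker resource might look like the following sketch; the bootstrap addresses, consumer group, and whitelist are illustrative assumptions rather than values taken from the example file:
apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaMirrorMaker
metadata:
  name: my-mirror-maker
spec:
  replicas: 1
  consumer:
    # Source cluster to consume from
    bootstrapServers: my-source-cluster-kafka-bootstrap:9092
    groupId: my-mirror-maker-group
  producer:
    # Target cluster to produce to
    bootstrapServers: my-target-cluster-kafka-bootstrap:9092
  # Regular expression matching the topics to mirror
  whitelist: ".*"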
2.5.1. Deploying Kafka Mirror Maker to OpenShift
On OpenShift, Kafka Mirror Maker is provided in the form of a template. It can be deployed from the template using the command-line or through the OpenShift console.
Prerequisites
- Before deploying Kafka Mirror Maker, the Cluster Operator must be deployed.
Procedure
Create a Kafka Mirror Maker cluster from the command-line:
oc apply -f examples/kafka-mirror-maker/kafka-mirror-maker.yaml
Additional resources
- For more information about deploying the Cluster Operator, see Section 2.2, “Cluster Operator”.
2.6. Deploying example clients
Prerequisites
- An existing Kafka cluster for the client to connect to.
Procedure
Deploy the producer.
On OpenShift, use oc run:
oc run kafka-producer -ti --image=registry.access.redhat.com/amqstreams-1/amqstreams10-kafka-openshift:1.0.0 --rm=true --restart=Never -- bin/kafka-console-producer.sh --broker-list cluster-name-kafka-bootstrap:9092 --topic my-topic
- Type your message into the console where the producer is running.
- Press Enter to send the message.
Deploy the consumer.
On OpenShift, use oc run:
oc run kafka-consumer -ti --image=registry.access.redhat.com/amqstreams-1/amqstreams10-kafka-openshift:1.0.0 --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server cluster-name-kafka-bootstrap:9092 --topic my-topic --from-beginning
- Confirm that you see the incoming messages in the consumer console.
2.7. Topic Operator
2.7.1. Overview of the Topic Operator component
The Topic Operator provides a way of managing topics in a Kafka cluster via OpenShift resources.

The role of the Topic Operator is to keep a set of KafkaTopic OpenShift resources describing Kafka topics in-sync with corresponding Kafka topics.
Specifically:
- If a KafkaTopic is created, the operator will create the topic it describes.
- If a KafkaTopic is deleted, the operator will delete the topic it describes.
- If a KafkaTopic is changed, the operator will update the topic it describes.
The same applies in the other direction:
- If a topic is created within the Kafka cluster, the operator will create a KafkaTopic describing it.
- If a topic is deleted from the Kafka cluster, the operator will delete the KafkaTopic describing it.
- If a topic in the Kafka cluster is changed, the operator will update the KafkaTopic describing it.
This allows you to declare a KafkaTopic as part of your application’s deployment and the Topic Operator will take care of creating the topic for you. Your application just needs to deal with producing or consuming from the necessary topics.
Should the topic be reconfigured or reassigned to different Kafka nodes, the KafkaTopic will always be up to date.
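A KafkaTopic declared alongside your application might look like the following sketch; the strimzi.io/cluster label ties the topic to the my-cluster Kafka cluster, and the partition, replica, and retention values are illustrative assumptions:
apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaTopic
metadata:
  name: my-topic
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 3
  replicas: 3
  config:
    retention.ms: 604800000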
For more details about creating, modifying and deleting topics, see Chapter 5, Using the Topic Operator.
2.7.2. Deploying the Topic Operator using the Cluster Operator
This procedure describes how to deploy the Topic Operator using the Cluster Operator. If you want to use the Topic Operator with a Kafka cluster that is not managed by AMQ Streams, you must deploy the Topic Operator as a standalone component. For more information, see Section 4.2.5, “Deploying the standalone Topic Operator”.
Prerequisites
- A running Cluster Operator
- A Kafka resource to be created or updated
Procedure
Ensure that the Kafka.spec.entityOperator object exists in the Kafka resource. This configures the Entity Operator.
apiVersion: kafka.strimzi.io/v1alpha1
kind: Kafka
metadata:
  name: my-cluster
spec:
  #...
  entityOperator:
    topicOperator: {}
    userOperator: {}
- Configure the Topic Operator using the fields described in Section B.26, “EntityTopicOperatorSpec schema reference”.
- Create or update the Kafka resource in OpenShift.
On OpenShift, use oc apply:
oc apply -f your-file
Additional resources
- For more information about deploying the Cluster Operator, see Section 2.2, “Cluster Operator”.
- For more information about deploying the Entity Operator, see Section 3.1.8, “Entity Operator”.
- For more information about the Kafka.spec.entityOperator object used to configure the Topic Operator when deployed by the Cluster Operator, see Section B.25, “EntityOperatorSpec schema reference”.
2.8. User Operator
The User Operator provides a way of managing Kafka users via OpenShift resources.
2.8.1. Overview of the User Operator component
The User Operator manages Kafka users for a Kafka cluster by watching for KafkaUser OpenShift resources that describe Kafka users and ensuring that they are configured properly in the Kafka cluster. For example:
- If a KafkaUser is created, the User Operator will create the user it describes.
- If a KafkaUser is deleted, the User Operator will delete the user it describes.
- If a KafkaUser is changed, the User Operator will update the user it describes.
Unlike the Topic Operator, the User Operator does not sync any changes from the Kafka cluster with the OpenShift resources. Unlike Kafka topics, which applications might create directly in Kafka, users are not expected to be managed directly in the Kafka cluster in parallel with the User Operator, so this synchronization is not needed.
The User Operator allows you to declare a KafkaUser as part of your application’s deployment. When the user is created, the credentials will be created in a Secret. Your application needs to use the user and its credentials for authentication and to produce or consume messages.
In addition to managing credentials for authentication, the User Operator also manages authorization rules by including a description of the user’s rights in the KafkaUser declaration.
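Putting these pieces together, a KafkaUser declaration might look like the following sketch; the authentication type, ACL rules, and names are illustrative assumptions and would need to match your cluster's listener and authorization configuration:
apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaUser
metadata:
  name: my-user
  labels:
    strimzi.io/cluster: my-cluster
spec:
  authentication:
    type: tls
  authorization:
    type: simple
    acls:
      # Allow the user to read and write the my-topic topic
      - resource:
          type: topic
          name: my-topic
        operation: Read
      - resource:
          type: topic
          name: my-topic
        operation: Write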
2.8.2. Deploying the User Operator using the Cluster Operator
Prerequisites
- A running Cluster Operator
- A Kafka resource to be created or updated.
Procedure
- Edit the Kafka resource ensuring it has a Kafka.spec.entityOperator.userOperator object that configures the User Operator how you want.
- Create or update the Kafka resource in OpenShift.
On OpenShift this can be done using oc apply:
oc apply -f your-file
Additional resources
- For more information about deploying the Cluster Operator, see Section 2.2, “Cluster Operator”.
- For more information about the Kafka.spec.entityOperator object used to configure the User Operator when deployed by the Cluster Operator, see EntityOperatorSpec schema reference.

Where did the comment section go?
Red Hat's documentation publication system recently went through an upgrade to enable speedier, more mobile-friendly content. We decided to re-evaluate our commenting platform to ensure that it meets your expectations and serves as an optimal feedback mechanism. During this redesign, we invite your input on providing feedback on Red Hat documentation via the discussion platform.