Chapter 5. Authenticating Camel K against Kafka
You can authenticate Camel K against Apache Kafka.
This example demonstrates how to set up a Kafka topic by using Red Hat OpenShift Streams for Apache Kafka and how to use it in a simple producer/consumer integration.
5.1. Setting up Kafka
Setting up Kafka involves installing the required OpenShift operators, creating a Kafka instance, and creating a Kafka topic.
Use one of these Red Hat products to set up Kafka:
- Red Hat Advanced Message Queuing (AMQ) Streams - A self-managed Apache Kafka offering. AMQ Streams is based on open source Strimzi and is included as part of Red Hat Integration. AMQ Streams is a distributed and scalable streaming platform based on Apache Kafka that includes a publish/subscribe messaging broker. Kafka Connect provides a framework to integrate Kafka-based systems with external systems. Using Kafka Connect, you can configure source and sink connectors to stream data from external systems into and out of a Kafka broker.
- Red Hat OpenShift Streams for Apache Kafka - A managed cloud service that simplifies the process of running Apache Kafka. It provides a streamlined developer experience for building, deploying, and scaling new cloud-native applications or modernizing existing systems.
5.1.1. Setting up Kafka by using AMQ Streams
AMQ Streams simplifies the process of running Apache Kafka in an OpenShift cluster.
5.1.1.1. Preparing your OpenShift cluster for AMQ Streams
To use Camel K or Kamelets and Red Hat AMQ Streams, you must install the following operators and tools:
- Red Hat Integration - AMQ Streams operator - Manages the communication between your OpenShift cluster and AMQ Streams for Apache Kafka instances.
- Red Hat Integration - Camel K operator - Installs and manages Camel K - a lightweight integration framework that runs natively in the cloud on OpenShift.
- Camel K CLI tool - Allows you to access all Camel K features.
Prerequisites
- You are familiar with Apache Kafka concepts.
- You can access an OpenShift 4.6 (or later) cluster with the correct access level, the ability to create projects and install operators, and the ability to install the OpenShift and the Camel K CLI on your local system.
- You installed the OpenShift CLI tool (oc) so that you can interact with the OpenShift cluster at the command line.
Procedure
To set up Kafka by using AMQ Streams:
- Log in to your OpenShift cluster’s web console.
- Create or open a project in which you plan to create your integration, for example my-camel-k-kafka.
- Install the Camel K operator and Camel K CLI as described in Installing Camel K.
Install the AMQ Streams operator:
- From any project, select Operators > OperatorHub.
- In the Filter by Keyword field, type AMQ Streams.
Click the Red Hat Integration - AMQ Streams card and then click Install.
The Install Operator page opens.
- Accept the defaults and then click Install.
- Select Operators > Installed Operators to verify that the Camel K and AMQ Streams operators are installed.
5.1.1.2. Setting up a Kafka topic with AMQ Streams
A Kafka topic provides a destination for the storage of data in a Kafka instance. You must set up a Kafka topic before you can send data to it.
Prerequisites
- You can access an OpenShift cluster.
- You installed the Red Hat Integration - Camel K and Red Hat Integration - AMQ Streams operators as described in Preparing your OpenShift cluster.
- You installed the OpenShift CLI (oc) and the Camel K CLI (kamel).
Procedure
To set up a Kafka topic by using AMQ Streams:
- Log in to your OpenShift cluster’s web console.
- Select Projects and then click the project in which you installed the Red Hat Integration - AMQ Streams operator. For example, click the my-camel-k-kafka project.
- Select Operators > Installed Operators and then click Red Hat Integration - AMQ Streams.
Create a Kafka cluster:
- Under Kafka, click Create instance.
- Type a name for the cluster, for example kafka-test.
Accept the other defaults and then click Create.
The process to create the Kafka instance might take a few minutes to complete.
When the status is ready, continue to the next step.
Create a Kafka topic:
- Select Operators > Installed Operators and then click Red Hat Integration - AMQ Streams.
- Under Kafka Topic, click Create Kafka Topic.
- Type a name for the topic, for example test-topic.
- Accept the other defaults and then click Create.
5.1.2. Setting up Kafka by using OpenShift Streams
Red Hat OpenShift Streams for Apache Kafka is a managed cloud service that simplifies the process of running Apache Kafka.
To use OpenShift Streams for Apache Kafka, you must be logged into your Red Hat account.
5.1.2.1. Preparing your OpenShift cluster for OpenShift Streams
To use the Red Hat OpenShift Streams for Apache Kafka managed cloud service, you must install the following operators and tools:
- OpenShift Application Services (RHOAS) CLI - Allows you to manage your application services from a terminal.
- Red Hat Integration - Camel K operator - Installs and manages Camel K - a lightweight integration framework that runs natively in the cloud on OpenShift.
- Camel K CLI tool - Allows you to access all Camel K features.
Prerequisites
- You are familiar with Apache Kafka concepts.
- You can access an OpenShift 4.6 (or later) cluster with the correct access level, the ability to create projects and install operators, and the ability to install the OpenShift and Apache Camel K CLI on your local system.
- You installed the OpenShift CLI tool (oc) so that you can interact with the OpenShift cluster at the command line.
Procedure
- Log in to your OpenShift web console with a cluster admin account.
Create the OpenShift project for your Camel K or Kamelets application.
- Select Home > Projects.
- Click Create Project.
- Type the name of the project, for example my-camel-k-kafka, then click Create.
- Download and install the RHOAS CLI as described in Getting started with the rhoas CLI.
- Install the Camel K operator and Camel K CLI as described in Installing Camel K.
- To verify that the Red Hat Integration - Camel K operator is installed, click Operators > Installed Operators.
5.1.2.2. Setting up a Kafka topic with RHOAS
Kafka organizes messages around topics. Each topic has a name. Applications send messages to topics and retrieve messages from topics. A Kafka topic provides a destination for the storage of data in a Kafka instance. You must set up a Kafka topic before you can send data to it.
Prerequisites
- You can access an OpenShift cluster with the correct access level, the ability to create projects and install operators, and the ability to install the OpenShift and the Camel K CLI on your local system.
- You installed the OpenShift CLI (oc), the Camel K CLI (kamel), and RHOAS CLI (rhoas) tools as described in Preparing your OpenShift cluster.
- You installed the Red Hat Integration - Camel K operator as described in Preparing your OpenShift cluster.
- You are logged in to the Red Hat Cloud site.
Procedure
To set up a Kafka topic by using Red Hat OpenShift Streams for Apache Kafka:
- From the command line, log in to your OpenShift cluster.
Open your project, for example:
oc project my-camel-k-kafka
Verify that the Camel K operator is installed in your project:
oc get csv
The result lists the Red Hat Camel K operator and indicates that it is in the Succeeded phase.
Prepare and connect a Kafka instance to RHOAS:
Log in to the RHOAS CLI by using this command:
rhoas login
Create a Kafka instance, for example kafka-test:
rhoas kafka create kafka-test
The process to create the Kafka instance might take a few minutes to complete.
To check the status of your Kafka instance:
rhoas status
You can also view the status in the web console:
https://cloud.redhat.com/application-services/streams/kafkas/
When the status is ready, continue to the next step.
Create a new Kafka topic:
rhoas kafka topic create --name test-topic
Connect your Kafka instance (cluster) with the OpenShift Application Services instance:
rhoas cluster connect
Follow the script instructions for obtaining a credential token.
You should see output similar to the following:
Token Secret "rh-cloud-services-accesstoken-cli" created successfully
Service Account Secret "rh-cloud-services-service-account" created successfully
KafkaConnection resource "kafka-test" has been created
KafkaConnection successfully installed on your cluster.
5.1.2.3. Obtaining Kafka credentials
To connect your applications or services to a Kafka instance, you must first obtain the following Kafka credentials:
- The bootstrap URL.
- A service account with credentials (username and password).
For OpenShift Streams, the authentication protocol is SASL_SSL.
Prerequisites
- You have created a Kafka instance, and it has a ready status.
- You have created a Kafka topic.
Procedure
Obtain the Kafka Broker URL (Bootstrap URL):
rhoas status
This command returns output similar to the following:
Kafka
---------------------------------------------------------------
ID: 1ptdfZRHmLKwqW6A3YKM2MawgDh
Name: my-kafka
Status: ready
Bootstrap URL: my-kafka--ptdfzrhmlkwqw-a-ykm-mawgdh.kafka.devshift.org:443
To obtain a username and password, create a service account by using the following syntax:
rhoas service-account create --name "<account-name>" --file-format json
Note: When creating a service account, you can choose the file format and location to save the credentials. For more information, type:
rhoas service-account create --help
For example:
rhoas service-account create --name "my-service-acct" --file-format json
The service account is created and saved to a JSON file.
To verify your service account credentials, view the credentials.json file:
cat credentials.json
This command returns output similar to the following:
{"clientID":"srvc-acct-eb575691-b94a-41f1-ab97-50ade0cd1094", "password":"facf3df1-3c8d-4253-aa87-8c95ca5e1225"}
Grant permission for sending and receiving messages to or from the Kafka topic. Use the following command, where $CLIENT_ID is the clientID value provided in the credentials.json file (from Step 3):
rhoas kafka acl grant-access --producer --consumer --service-account $CLIENT_ID --topic test-topic --group all
For example:
rhoas kafka acl grant-access --producer --consumer --service-account srvc-acct-eb575691-b94a-41f1-ab97-50ade0cd1094 --topic test-topic --group all
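The two steps above (reading the clientID from credentials.json and passing it to the grant-access command) can be sketched in Java as follows. This is only an illustration: the GrantAccessCommand class and its methods are hypothetical helpers, not part of the rhoas CLI or Camel K, and the regular expression assumes the small, flat JSON layout shown in the sample output rather than arbitrary JSON.

```java
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for illustration; not part of the rhoas CLI or Camel K.
final class GrantAccessCommand {

    // Matches "clientID":"<value>" in the flat credentials.json shown above.
    private static final Pattern CLIENT_ID =
            Pattern.compile("\"clientID\"\\s*:\\s*\"([^\"]+)\"");

    /** Extracts the clientID value, or throws if the field is missing. */
    static String clientId(String credentialsJson) {
        Matcher m = CLIENT_ID.matcher(credentialsJson);
        if (!m.find()) {
            throw new IllegalArgumentException("no clientID field found");
        }
        return m.group(1);
    }

    /** Builds the argument list for the grant-access command used in this procedure. */
    static List<String> grantAccess(String clientId, String topic) {
        return List.of("rhoas", "kafka", "acl", "grant-access",
                "--producer", "--consumer",
                "--service-account", clientId,
                "--topic", topic,
                "--group", "all");
    }

    public static void main(String[] args) {
        String json = "{\"clientID\":\"srvc-acct-eb575691-b94a-41f1-ab97-50ade0cd1094\", "
                + "\"password\":\"facf3df1-3c8d-4253-aa87-8c95ca5e1225\"}";
        // Prints the command line that you would run in a shell.
        System.out.println(String.join(" ", grantAccess(clientId(json), "test-topic")));
    }
}
```

Returning the arguments as a list rather than one string keeps the service account ID as a single argument if you later pass the list to a process launcher.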
5.1.2.4. Creating a secret by using the SASL/Plain authentication method
You can create a secret with the credentials that you obtained (Kafka bootstrap URL, service account ID, and service account secret).
Procedure
Edit the application.properties file and add the Kafka credentials.
application.properties file
camel.component.kafka.brokers = <YOUR-KAFKA-BOOTSTRAP-URL-HERE>
camel.component.kafka.security-protocol = SASL_SSL
camel.component.kafka.sasl-mechanism = PLAIN
camel.component.kafka.sasl-jaas-config=org.apache.kafka.common.security.plain.PlainLoginModule required username='<YOUR-SERVICE-ACCOUNT-ID-HERE>' password='<YOUR-SERVICE-ACCOUNT-SECRET-HERE>';
consumer.topic=<TOPIC-NAME>
producer.topic=<TOPIC-NAME>
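If you prefer to generate the file rather than edit it by hand, the following Java sketch renders the same six properties from the values that you obtained earlier. The KafkaPlainProperties class is a hypothetical helper, and the bootstrap URL and credentials in main are placeholder values. The sketch does no escaping, so it assumes that the service account ID and secret contain no single quotes or backslashes.

```java
// Hypothetical helper for illustration; not part of Camel K.
final class KafkaPlainProperties {

    /** Renders the SASL/PLAIN properties for the given credentials and topic. */
    static String render(String bootstrapUrl, String clientId,
                         String clientSecret, String topic) {
        // The JAAS configuration must stay on a single logical line.
        String jaas = "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username='" + clientId + "' password='" + clientSecret + "';";
        return String.join("\n",
                "camel.component.kafka.brokers = " + bootstrapUrl,
                "camel.component.kafka.security-protocol = SASL_SSL",
                "camel.component.kafka.sasl-mechanism = PLAIN",
                "camel.component.kafka.sasl-jaas-config = " + jaas,
                "consumer.topic = " + topic,
                "producer.topic = " + topic) + "\n";
    }

    public static void main(String[] args) {
        // Placeholder values; substitute your own bootstrap URL and credentials.
        System.out.print(render("my-kafka.example.com:443",
                "my-service-account-id", "my-service-account-secret", "test-topic"));
    }
}
```

Redirect the output to application.properties, and keep the file out of version control because it contains the service account secret.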
Run the following command to create a secret that contains the sensitive properties in the application.properties file:
oc create secret generic kafka-props --from-file application.properties
You use this secret when you run a Camel K integration.
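A Kubernetes secret created from a file stores the file content under a key named after the file, and the value is Base64-encoded when you view the secret as YAML or JSON. The following Java sketch shows that encoding and how to reverse it, which can help when you check what the kafka-props secret contains. The SecretDataEncoding class is a hypothetical helper and the property line is a sample value.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper for illustration; mirrors how secret data values are encoded.
final class SecretDataEncoding {

    /** Encodes file content the way it appears in the data field of a secret. */
    static String encode(String fileContent) {
        return Base64.getEncoder()
                .encodeToString(fileContent.getBytes(StandardCharsets.UTF_8));
    }

    /** Decodes a data value from a secret back into the original file content. */
    static String decode(String dataValue) {
        return new String(Base64.getDecoder().decode(dataValue), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String encoded = encode("producer.topic=test-topic\n");
        System.out.println(encoded);
        System.out.print(decode(encoded));
    }
}
```

Base64 is an encoding, not encryption, so anyone who can read the secret can recover the credentials.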
5.1.2.5. Creating a secret by using the SASL/OAUTHBearer authentication method
You can create a secret with the credentials that you obtained (Kafka bootstrap URL, service account ID, and service account secret).
Procedure
Edit the application-oauth.properties file and add the Kafka credentials.
application-oauth.properties file
camel.component.kafka.brokers = <YOUR-KAFKA-BOOTSTRAP-URL-HERE>
camel.component.kafka.security-protocol = SASL_SSL
camel.component.kafka.sasl-mechanism = OAUTHBEARER
camel.component.kafka.sasl-jaas-config = org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  oauth.client.id='<YOUR-SERVICE-ACCOUNT-ID-HERE>' \
  oauth.client.secret='<YOUR-SERVICE-ACCOUNT-SECRET-HERE>' \
  oauth.token.endpoint.uri="https://identity.api.openshift.com/auth/realms/rhoas/protocol/openid-connect/token" ;
camel.component.kafka.additional-properties[sasl.login.callback.handler.class]=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
consumer.topic=<TOPIC-NAME>
producer.topic=<TOPIC-NAME>
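The JAAS configuration in this file spans several lines joined by trailing backslashes. The following Java sketch loads a copy of the file with the standard java.util.Properties parser to show that the continuation lines collapse into one value, which is a quick way to catch a missing backslash before you create the secret. The OAuthPropertiesCheck class and the sample credentials are hypothetical, and the sketch assumes that the integration runtime applies the same properties-file rules as java.util.Properties.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper for illustration; not part of Camel K.
final class OAuthPropertiesCheck {

    /** Parses properties-file text by using the standard java.util.Properties rules. */
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    /** Sample file body with placeholder credentials (my-id and my-secret are made up). */
    static String sample() {
        return "camel.component.kafka.brokers = my-kafka.example.com:443\n"
                + "camel.component.kafka.security-protocol = SASL_SSL\n"
                + "camel.component.kafka.sasl-mechanism = OAUTHBEARER\n"
                + "camel.component.kafka.sasl-jaas-config = "
                + "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \\\n"
                + "  oauth.client.id='my-id' \\\n"
                + "  oauth.client.secret='my-secret' \\\n"
                + "  oauth.token.endpoint.uri=\"https://identity.api.openshift.com/auth/realms/rhoas/protocol/openid-connect/token\" ;\n"
                + "camel.component.kafka.additional-properties[sasl.login.callback.handler.class]"
                + "=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler\n"
                + "consumer.topic=test-topic\n"
                + "producer.topic=test-topic\n";
    }

    public static void main(String[] args) {
        Properties props = parse(sample());
        // The value is printed as one line: the backslashes and line breaks are gone.
        System.out.println(props.getProperty("camel.component.kafka.sasl-jaas-config"));
    }
}
```

If a backslash is missing, the following line is parsed as a separate property (for example, a key named oauth.client.secret), and the JAAS value ends early.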
Run the following command to create a secret that contains the sensitive properties in the application-oauth.properties file:
oc create secret generic kafka-props --from-file application-oauth.properties
You use this secret when you run a Camel K integration.
5.2. Running a Kafka integration
Running a producer integration
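Create a sample producer integration that sends a message to the topic at a fixed interval. Because the timer endpoint in this sample sets no period option, it uses the timer component's default period of one second.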
Sample SaslSSLKafkaProducer.java
// kamel run --secret kafka-props SaslSSLKafkaProducer.java --dev
// camel-k: language=java dependency=mvn:org.apache.camel.quarkus:camel-quarkus-kafka dependency=mvn:io.strimzi:kafka-oauth-client:0.7.1.redhat-00003
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.kafka.KafkaConstants;

public class SaslSSLKafkaProducer extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        log.info("About to start route: Timer -> Kafka ");
        from("timer:foo")
            .routeId("FromTimer2Kafka")
            .setBody()
                .simple("Message #${exchangeProperty.CamelTimerCounter}")
            .to("kafka:{{producer.topic}}")
            .log("Message correctly sent to the topic!");
    }
}
Then run the producer integration.
kamel run --secret kafka-props SaslSSLKafkaProducer.java --dev
The producer creates a new message, pushes it to the topic, and logs some information.
[2] 2021-05-06 08:48:11,854 INFO [FromTimer2Kafka] (Camel (camel-1) thread #1 - KafkaProducer[test]) Message correctly sent to the topic!
[2] 2021-05-06 08:48:11,854 INFO [FromTimer2Kafka] (Camel (camel-1) thread #3 - KafkaProducer[test]) Message correctly sent to the topic!
[2] 2021-05-06 08:48:11,973 INFO [FromTimer2Kafka] (Camel (camel-1) thread #5 - KafkaProducer[test]) Message correctly sent to the topic!
[2] 2021-05-06 08:48:12,970 INFO [FromTimer2Kafka] (Camel (camel-1) thread #7 - KafkaProducer[test]) Message correctly sent to the topic!
[2] 2021-05-06 08:48:13,970 INFO [FromTimer2Kafka] (Camel (camel-1) thread #9 - KafkaProducer[test]) Message correctly sent to the topic!
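The producer route refers to the topic as {{producer.topic}}, a property placeholder that is resolved from the properties supplied through the kafka-props secret. The following Java sketch is a simplified illustration of that substitution and is not Camel's actual implementation. The PlaceholderSketch class is hypothetical, and real Camel placeholders support features (such as default values) that this sketch omits.

```java
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Simplified illustration of {{key}} substitution; not Camel's implementation.
final class PlaceholderSketch {

    // Matches {{some.key}} and captures the key between the braces.
    private static final Pattern PLACEHOLDER =
            Pattern.compile("\\{\\{([^}]+)\\}\\}");

    /** Replaces every {{key}} in the URI with the matching property value. */
    static String resolve(String uri, Properties props) {
        Matcher m = PLACEHOLDER.matcher(uri);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String key = m.group(1).trim();
            String value = props.getProperty(key);
            if (value == null) {
                throw new IllegalArgumentException("No value for property: " + key);
            }
            // quoteReplacement keeps '$' and '\' in the value from being interpreted.
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("producer.topic", "test-topic");
        System.out.println(resolve("kafka:{{producer.topic}}", props));
    }
}
```

With producer.topic set to test-topic, the endpoint URI in the sample route becomes kafka:test-topic, so changing the topic only requires updating the properties file and recreating the secret.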
Running a consumer integration
Create a consumer integration.
Sample SaslSSLKafkaConsumer.java
// kamel run --secret kafka-props SaslSSLKafkaConsumer.java --dev
// camel-k: language=java dependency=mvn:org.apache.camel.quarkus:camel-quarkus-kafka dependency=mvn:io.strimzi:kafka-oauth-client:0.7.1.redhat-00003
import org.apache.camel.builder.RouteBuilder;

public class SaslSSLKafkaConsumer extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        log.info("About to start route: Kafka -> Log ");
        from("kafka:{{consumer.topic}}")
            .routeId("FromKafka2Log")
            .log("${body}");
    }
}
Open another shell and run the consumer integration using the command:
kamel run --secret kafka-props SaslSSLKafkaConsumer.java --dev
The consumer starts logging the events found in the topic:
[1] 2021-05-06 08:51:08,991 INFO [FromKafka2Log] (Camel (camel-1) thread #0 - KafkaConsumer[test]) Message #8
[1] 2021-05-06 08:51:10,065 INFO [FromKafka2Log] (Camel (camel-1) thread #0 - KafkaConsumer[test]) Message #9
[1] 2021-05-06 08:51:10,991 INFO [FromKafka2Log] (Camel (camel-1) thread #0 - KafkaConsumer[test]) Message #10
[1] 2021-05-06 08:51:11,991 INFO [FromKafka2Log] (Camel (camel-1) thread #0 - KafkaConsumer[test]) Message #11