Chapter 5. Authenticating Camel K against Kafka

You can authenticate Camel K against Apache Kafka.

This example demonstrates how to set up a Kafka Topic using Red Hat OpenShift Streams for Apache Kafka and use it in a simple Producer/Consumer pattern Integration.

5.1. Setting up Kafka

Setting up Kafka involves installing the required OpenShift operators, creating a Kafka instance, and creating a Kafka topic.

Use one of these Red Hat products to set up Kafka:

  • Red Hat Advanced Message Queuing (AMQ) streams - A self-managed Apache Kafka offering. AMQ Streams is based on open source Strimzi and is included as part of Red Hat Integration. AMQ Streams is a distributed and scalable streaming platform based on Apache Kafka that includes a publish/subscribe messaging broker. Kafka Connect provides a framework to integrate Kafka-based systems with external systems. Using Kafka Connect, you can configure source and sink connectors to stream data from external systems into and out of a Kafka broker.
  • Red Hat OpenShift Streams for Apache Kafka - A managed cloud service that simplifies the process of running Apache Kafka. It provides a streamlined developer experience for building, deploying, and scaling new cloud-native applications or modernizing existing systems.

5.1.1. Setting up Kafka by using AMQ streams

AMQ Streams simplifies the process of running Apache Kafka in an OpenShift cluster.

5.1.1.1. Preparing your OpenShift cluster for AMQ Streams

To use Camel K or Kamelets and Red Hat AMQ Streams, you must install the following operators and tools:

  • Red Hat Integration - AMQ Streams operator - Manages the communication between your Openshift Cluster and AMQ Streams for Apache Kafka instances.
  • Red Hat Integration - Camel K operator - Installs and manages Camel K - a lightweight integration framework that runs natively in the cloud on OpenShift.
  • Camel K CLI tool - Allows you to access all Camel K features.

Prerequisites

  • You are familiar with Apache Kafka concepts.
  • You can access an OpenShift 4.6 (or later) cluster with the correct access level, the ability to create projects and install operators, and the ability to install the OpenShift and the Camel K CLI on your local system.
  • You installed the OpenShift CLI tool (oc) so that you can interact with the OpenShift cluster at the command line.

Procedure

To set up Kafka by using AMQ Streams:

  1. Log in to your OpenShift cluster’s web console.
  2. Create or open a project in which you plan to create your integration, for example my-camel-k-kafka.
  3. Install the Camel K operator and Camel K CLI as described in Installing Camel K.
  4. Install the AMQ streams operator:

    1. From any project, select Operators > OperatorHub.
    2. In the Filter by Keyword field, type AMQ Streams.
    3. Click the Red Hat Integration - AMQ Streams card and then click Install.

      The Install Operator page opens.

    4. Accept the defaults and then click Install.
  5. Select Operators > Installed Operators to verify that the Camel K and AMQ Streams operators are installed.

5.1.1.2. Setting up a Kafka topic with AMQ Streams

A Kafka topic provides a destination for the storage of data in a Kafka instance. You must set up a Kafka topic before you can send data to it.

Prerequisites

  • You can access an OpenShift cluster.
  • You installed the Red Hat Integration - Camel K and Red Hat Integration - AMQ Streams operators as described in Preparing your OpenShift cluster.
  • You installed the OpenShift CLI (oc) and the Camel K CLI (kamel).

Procedure

To set up a Kafka topic by using AMQ Streams:

  1. Log in to your OpenShift cluster’s web console.
  2. Select Projects and then click the project in which you installed the Red Hat Integration - AMQ Streams operator. For example, click the my-camel-k-kafka project.
  3. Select Operators > Installed Operators and then click Red Hat Integration - AMQ Streams.
  4. Create a Kafka cluster:

    1. Under Kafka, click Create instance.
    2. Type a name for the cluster, for example kafka-test.
    3. Accept the other defaults and then click Create.

      The process to create the Kafka instance might take a few minutes to complete.

      When the status is ready, continue to the next step.

  5. Create a Kafka topic:

    1. Select Operators > Installed Operators and then click Red Hat Integration - AMQ Streams.
    2. Under Kafka Topic, click Create Kafka Topic.
    3. Type a name for the topic, for example test-topic.
    4. Accept the other defaults and then click Create.

5.1.2. Setting up Kafka by using OpenShift streams

Red Hat OpenShift Streams for Apache Kafka is a managed cloud service that simplifies the process of running Apache Kafka.

To use OpenShift Streams for Apache Kafka, you must be logged into your Red Hat account.

5.1.2.1. Preparing your OpenShift cluster for OpenShift Streams

To use the Red Hat OpenShift Streams for Apache Kafka managed cloud service, you must install the following operators and tools:

  • OpenShift Application Services (RHOAS) CLI - Allows you to manage your application services from a terminal.
  • Red Hat Integration - Camel K operator Installs and manages Camel K - a lightweight integration framework that runs natively in the cloud on OpenShift.
  • Camel K CLI tool - Allows you to access all Camel K features.

Prerequisites

  • You are familiar with Apache Kafka concepts.
  • You can access an OpenShift 4.6 (or later) cluster with the correct access level, the ability to create projects and install operators, and the ability to install the OpenShift and Apache Camel K CLI on your local system.
  • You installed the OpenShift CLI tool (oc) so that you can interact with the OpenShift cluster at the command line.

Procedure

  1. Log in to your OpenShift web console with a cluster admin account.
  2. Create the OpenShift project for your Camel K or Kamelets application.

    1. Select Home > Projects.
    2. Click Create Project.
    3. Type the name of the project, for example my-camel-k-kafka, then click Create.
  3. Download and install the RHOAS CLI as described in Getting started with the rhoas CLI.
  4. Install the Camel K operator and Camel K CLI as described in Installing Camel K.
  5. To verify that the Red Hat Integration - Camel K operator is installed, click Operators > Installed Operators.

5.1.2.2. Setting up a Kafka topic with RHOAS

Kafka organizes messages around topics. Each topic has a name. Applications send messages to topics and retrieve messages from topics. A Kafka topic provides a destination for the storage of data in a Kafka instance. You must set up a Kafka topic before you can send data to it.

Prerequisites

  • You can access an OpenShift cluster with the correct access level, the ability to create projects and install operators, and the ability to install the OpenShift and the Camel K CLI on your local system.
  • You installed the OpenShift CLI (oc) , the Camel K CLI (kamel) , and RHOAS CLI (rhoas) tools as described in Preparing your OpenShift cluster.
  • You installed the Red Hat Integration - Camel K operator as described in Preparing your OpenShift cluster.
  • You are logged in to the Red Hat Cloud site.

Procedure

To set up a Kafka topic by using Red Hat OpenShift Streams for Apache Kafka:

  1. From the command line, log in to your OpenShift cluster.
  2. Open your project, for example:

    oc project my-camel-k-kafka

  3. Verify that the Camel K operator is installed in your project:

    oc get csv

    The result lists the Red Hat Camel K operator and indicates that it is in the Succeeded phase.

  4. Prepare and connect a Kafka instance to RHOAS:

    1. Login to the RHOAS CLI by using this command:

      rhoas login

    2. Create a kafka instance, for example kafka-test:

      rhoas kafka create kafka-test

      The process to create the Kafka instance might take a few minutes to complete.

  5. To check the status of your Kafka instance:

    rhoas status

    You can also view the status in the web console:

    https://cloud.redhat.com/application-services/streams/kafkas/

    When the status is ready, continue to the next step.

  6. Create a new Kafka topic:

    rhoas kafka topic create --name test-topic

  7. Connect your Kafka instance (cluster) with the Openshift Application Services instance:

    rhoas cluster connect

  8. Follow the script instructions for obtaining a credential token.

    You should see output similar to the following:

    Token Secret "rh-cloud-services-accesstoken-cli" created successfully
    Service Account Secret "rh-cloud-services-service-account" created successfully
    KafkaConnection resource "kafka-test" has been created
    KafkaConnection successfully installed on your cluster.

5.1.2.3. Obtaining Kafka credentials

To connect your applications or services to a Kafka instance, you must first obtain the following Kafka credentials:

  • Obtain the bootstrap URL.
  • Create a service account with credentials (username and password).

For OpenShift Streams, the authentication protocol is SASL_SSL.

Prerequisite

  • You have created a Kafka instance, and it has a ready status.
  • You have created a Kafka topic.

Procedure

  1. Obtain the Kafka Broker URL (Bootstrap URL):

    rhoas status

    This command returns output similar to the following:

      Kafka
      ---------------------------------------------------------------
      ID:                     1ptdfZRHmLKwqW6A3YKM2MawgDh
      Name:                   my-kafka
      Status:                 ready
      Bootstrap URL:        my-kafka--ptdfzrhmlkwqw-a-ykm-mawgdh.kafka.devshift.org:443
  2. To obtain a username and password, create a service account by using the following syntax:

    rhoas service-account create --name "<account-name>" --file-format json

    Note

    When creating a service account, you can choose the file format and location to save the credentials. For more information, type rhoas service-account create --help

    For example:

    rhoas service-account create --name "my-service-acct" --file-format json

    The service account is created and saved to a JSON file.

  3. To verify your service account credentials, view the credentials.json file:

    cat credentials.json

    This command returns output similar to the following:

    {"clientID":"srvc-acct-eb575691-b94a-41f1-ab97-50ade0cd1094", "password":"facf3df1-3c8d-4253-aa87-8c95ca5e1225"}
  4. Grant permission for sending and receiving messages to or from the Kakfa topic. Use the following command, where clientID is the value provided in the credentials.json file (from Step 3).

    rhoas kafka acl grant-access --producer --consumer --service-account $CLIENT_ID --topic test-topic --group all

    For example:

    rhoas kafka acl grant-access --producer --consumer --service-account srvc-acct-eb575691-b94a-41f1-ab97-50ade0cd1094 --topic test-topic --group all

5.1.2.4. Creating a secret by using the SASL/Plain authentication method

You can create a secret with the credentials that you obtained (Kafka bootstrap URL, service account ID, and service account secret).

Procedure

  1. Edit the application.properties file and add the Kafka credentials.

    application.properties file

    camel.component.kafka.brokers = <YOUR-KAFKA-BOOTSTRAP-URL-HERE>
    camel.component.kafka.security-protocol = SASL_SSL
    camel.component.kafka.sasl-mechanism = PLAIN
    camel.component.kafka.sasl-jaas-config=org.apache.kafka.common.security.plain.PlainLoginModule required username='<YOUR-SERVICE-ACCOUNT-ID-HERE>' password='<YOUR-SERVICE-ACCOUNT-SECRET-HERE>';
    consumer.topic=<TOPIC-NAME>
    producer.topic=<TOPIC-NAME>

  2. Run the following command to create a secret that contains the sensitive properties in the application.properties file:

    oc create secret generic kafka-props --from-file application.properties

    You use this secret when you run a Camel K integration.

5.1.2.5. Creating a secret by using the SASL/OAUTHBearer authentication method

You can create a secret with the credentials that you obtained (Kafka bootstrap URL, service account ID, and service account secret).

Procedure

  1. Edit the application-oauth.properties file and add the Kafka credentials.

    application-oauth.properties file

    camel.component.kafka.brokers = <YOUR-KAFKA-BOOTSTRAP-URL-HERE>
    camel.component.kafka.security-protocol = SASL_SSL
    camel.component.kafka.sasl-mechanism = OAUTHBEARER
    camel.component.kafka.sasl-jaas-config = org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
    oauth.client.id='<YOUR-SERVICE-ACCOUNT-ID-HERE>' \
    oauth.client.secret='<YOUR-SERVICE-ACCOUNT-SECRET-HERE>' \
    oauth.token.endpoint.uri="https://identity.api.openshift.com/auth/realms/rhoas/protocol/openid-connect/token" ;
    camel.component.kafka.additional-properties[sasl.login.callback.handler.class]=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
    
    consumer.topic=<TOPIC-NAME>
    producer.topic=<TOPIC-NAME>

  2. Run the following command to create a secret that contains the sensitive properties in the application.properties file:

    oc create secret generic kafka-props --from-file application-oauth.properties

    You use this secret when you run a Camel K integration.

5.2. Running a Kafka integration

Running a producer integration

  1. Create a sample producer integration. This fills the topic with a message, every 10 seconds.

    Sample SaslSSLKafkaProducer.java

    // kamel run --secret kafka-props SaslSSLKafkaProducer.java --dev
    // camel-k: language=java dependency=mvn:org.apache.camel.quarkus:camel-quarkus-kafka dependency=mvn:io.strimzi:kafka-oauth-client:0.7.1.redhat-00003
    
    import org.apache.camel.builder.RouteBuilder;
    import org.apache.camel.component.kafka.KafkaConstants;
    
    public class SaslSSLKafkaProducer extends RouteBuilder {
      @Override
      public void configure() throws Exception {
      log.info("About to start route: Timer -> Kafka ");
      from("timer:foo")
        .routeId("FromTimer2Kafka")
        .setBody()
          .simple("Message #${exchangeProperty.CamelTimerCounter}")
        .to("kafka:{{producer.topic}}")
        .log("Message correctly sent to the topic!");
      }
    }

  2. Then run the procedure integration.

    kamel run --secret kafka-props SaslSSLKafkaProducer.java --dev

    The producer will create a new message and push into the topic and log some information.

    [2] 2021-05-06 08:48:11,854 INFO  [FromTimer2Kafka] (Camel (camel-1) thread #1 - KafkaProducer[test]) Message correctly sent to the topic!
    [2] 2021-05-06 08:48:11,854 INFO  [FromTimer2Kafka] (Camel (camel-1) thread #3 - KafkaProducer[test]) Message correctly sent to the topic!
    [2] 2021-05-06 08:48:11,973 INFO  [FromTimer2Kafka] (Camel (camel-1) thread #5 - KafkaProducer[test]) Message correctly sent to the topic!
    [2] 2021-05-06 08:48:12,970 INFO  [FromTimer2Kafka] (Camel (camel-1) thread #7 - KafkaProducer[test]) Message correctly sent to the topic!
    [2] 2021-05-06 08:48:13,970 INFO  [FromTimer2Kafka] (Camel (camel-1) thread #9 - KafkaProducer[test]) Message correctly sent to the topic!

Running a consumer integration

  1. Create a consumer integration.

    Sample SaslSSLKafkaProducer.java

    // kamel run --secret kafka-props SaslSSLKafkaConsumer.java --dev
    // camel-k: language=java dependency=mvn:org.apache.camel.quarkus:camel-quarkus-kafka dependency=mvn:io.strimzi:kafka-oauth-client:0.7.1.redhat-00003
    
    import org.apache.camel.builder.RouteBuilder;
    
    public class SaslSSLKafkaConsumer extends RouteBuilder {
      @Override
      public void configure() throws Exception {
    	log.info("About to start route: Kafka -> Log ");
    	from("kafka:{{consumer.topic}}")
        .routeId("FromKafka2Log")
        .log("${body}");
      }
    }

  2. Open another shell and run the consumer integration using the command:

    kamel run --secret kafka-props SaslSSLKafkaConsumer.java --dev

    A consumer will start logging the events found in the Topic:

    [1] 2021-05-06 08:51:08,991 INFO  [FromKafka2Log] (Camel (camel-1) thread #0 - KafkaConsumer[test]) Message #8
    [1] 2021-05-06 08:51:10,065 INFO  [FromKafka2Log] (Camel (camel-1) thread #0 - KafkaConsumer[test]) Message #9
    [1] 2021-05-06 08:51:10,991 INFO  [FromKafka2Log] (Camel (camel-1) thread #0 - KafkaConsumer[test]) Message #10
    [1] 2021-05-06 08:51:11,991 INFO  [FromKafka2Log] (Camel (camel-1) thread #0 - KafkaConsumer[test]) Message #11