第5章 Kafka に対する Camel K の認証

Apache Kafka に対して Camel K を認証できます。

以下の例では、Red Hat OpenShift Streams for Apache Kafka を使用して Kafka トピックを設定し、それを単純なプロデューサー/コンシューマーパターンのインテグレーションで使用する方法を実証しています。

5.1. Kafka の設定

Kafka を設定するには、必要な OpenShift Operator のインストール、Kafka インスタンスの作成、Kafka トピックの作成が必要になります。

以下の Red Hat 製品のいずれかを使用して Kafka を設定します。

  • Red Hat Advanced Message Queuing (AMQ) ストリーム: 自己管理の Apache Kafka オファリング。AMQ Streams はオープンソースの Strimzi をベースとしており、Red Hat Integration の一部として組み込まれています。AMQ Streams は、パブリッシュ/サブスクライブメッセージングブローカーが含まれる Apache Kafka をベースとした分散型でスケーラブルなストリーミングプラットフォームです。Kafka Connect は、Kafka ベースのシステムを外部システムと統合するフレームワークを提供します。Kafka Connect を使用すると、外部システムと Kafka ブローカーとの間で双方向にデータをストリーミングするように ソースおよびシンクコネクターを設定できます。
  • Red Hat OpenShift Streams for Apache Kafka - Apache Kafka の実行プロセスを簡素化するマネージドクラウドサービスです。これにより、新しいクラウドネイティブアプリケーションを構築、デプロイ、およびスケーリングする際、または既存システムを現代化する際に、効率的な開発者エクスペリエンスが提供されます。

5.1.1. AMQ Streams を使用した Kafka の設定

AMQ Streams は、OpenShift クラスターで Apache Kafka を実行するプロセスを簡素化します。

5.1.1.1. AMQ Streams の OpenShift クラスターの準備

Camel K または Kamelets および Red Hat AMQ Streams を使用するには、以下の Operator およびツールをインストールする必要があります。

  • Red Hat Integration - AMQ Streams Operator: OpenShift Cluster と AMQ Streams for Apache Kafka インスタンスの間の通信を管理します。
  • Red Hat Integration - Camel K Operator: Camel K (OpenShift のクラウドでネイティブに実行される軽量なインテグレーションフレームワーク) をインストールし、管理します。
  • Camel K CLI ツール: すべての Camel K 機能にアクセスできます。

前提条件

  • Apache Kafka の概念を理解している。
  • 適切なアクセスレベルで OpenShift 4.6 (またはそれ以降の) クラスターにアクセスできること。この場合、プロジェクトの作成および Operator のインストールができること。また、OpenShift および Camel K CLI をローカルシステムにインストールできること。
  • コマンドラインで OpenShift クラスターと対話できるように OpenShift CLI ツール (oc) をインストールしていること。

手順

AMQ Streams を使用して Kafka を設定するには、以下を行います。

  1. OpenShift クラスターの Web コンソールにログインします。
  2. インテグレーションを作成する予定のプロジェクト (例: my-camel-k-kafka) を作成または開きます。
  3. Installing Camel K の説明に従って、Camel K Operator および Camel K CLI をインストールします。
  4. AMQ Streams Operator をインストールします。

    1. 任意のプロジェクトから Operators > OperatorHub を選択します。
    2. Filter by Keyword フィールドに AMQ Streams を入力します。
    3. Red Hat Integration - AMQ Streams カードをクリックしてから Install をクリックします。

      Install Operator ページが開きます。

    4. デフォルトを受け入れ、Install をクリックします。
  5. Operators > Installed Operators を選択し、Camel K および AMQ Streams Operator がインストールされていることを確認します。

5.1.1.2. AMQ Streams を使用した Kafka トピックの設定

Kafka トピックは、Kafka インスタンスのデータの保存先を提供します。データを送信する前に、Kafka トピックを設定する必要があります。

前提条件

  • OpenShift クラスターにアクセスできる。
  • OpenShift クラスターの準備 の説明どおりに、Red Hat Integration - Camel K および Red Hat Integration - AMQ Streams Operator がインストールされている。
  • OpenShift CLI (oc) および Camel K CLI (kamel) をインストールしている。

手順

AMQ Streams を使用して Kafka トピックを設定するには、以下を行います。

  1. OpenShift クラスターの Web コンソールにログインします。
  2. Projects を選択してから、Red Hat Integration - AMQ Streams Operator をインストールしたプロジェクトをクリックします。たとえば、my-camel-k-kafka プロジェクトをクリックします。
  3. Operators > Installed Operators の順に選択し、Red Hat Integration - AMQ Streams をクリックします。
  4. Kafka クラスターを作成します。

    1. Kafka で、Create instance をクリックします。
    2. kafka-test などクラスターの名前を入力します。
    3. その他のデフォルトを受け入れ、Create をクリックします。

      Kafka インスタンスを作成するプロセスの完了に数分かかる場合があります。

      ステータスが ready になったら、次のステップに進みます。

  5. Kafka トピックを作成します。

    1. Operators > Installed Operators の順に選択し、Red Hat Integration - AMQ Streams をクリックします。
    2. Kafka TopicCreate Kafka Topic をクリックします。
    3. トピックの名前を入力します (例: test-topic)。
    4. その他のデフォルトを受け入れ、Create をクリックします。

5.1.2. OpenShift Streams を使用した Kafka の設定

Red Hat OpenShift Streams for Apache Kafka は、Apache Kafka の実行プロセスを簡素化する管理クラウドサービスです。

OpenShift Streams for Apache Kafka を使用するには、Red Hat アカウントにログインする必要があります。

5.1.2.1. OpenShift Streams の OpenShift クラスターの準備

Red Hat OpenShift Streams for Apache Kafka 管理クラウドサービスを使用するには、以下の Operator およびツールをインストールする必要があります。

  • OpenShift Application Services (RHOAS) CLI: ターミナルからアプリケーションサービスを管理できます。
  • Red Hat Integration - Camel K Operator は、Camel K (OpenShift のクラウドでネイティブに実行される軽量なインテグレーションフレームワーク) をインストールし、管理します。
  • Camel K CLI ツール: すべての Camel K 機能にアクセスできます。

前提条件

  • Apache Kafka の概念を理解している。
  • 適切なアクセスレベルで OpenShift 4.6 (またはそれ以降の) クラスターにアクセスできること。この場合、プロジェクトの作成および Operator のインストールができること。また、OpenShift および Apache Camel K CLI をローカルシステムにインストールできること。
  • コマンドラインで OpenShift クラスターと対話できるように OpenShift CLI ツール (oc) をインストールしていること。

手順

  1. クラスター管理者アカウントで OpenShift Web コンソールにログインします。
  2. Camel K または Kamelets アプリケーションの OpenShift プロジェクトを作成します。

    1. Home > Projects を選択します。
    2. Create Project をクリックします。
    3. プロジェクトの名前 (例: my-camel-k-kafka) を入力し、続いて Create をクリックします。
  3. Getting started with the rhoas CLI の説明に従って、RHOAS CLI をダウンロードし、インストールします。
  4. Installing Camel K の説明に従って、Camel K Operator および Camel K CLI をインストールします。
  5. Red Hat Integration - Camel K Operator がインストールされていることを確認するには、Operators > Installed Operators の順にクリックします。

5.1.2.2. RHOAS を使用した Kafka トピックの設定

Kafka は トピック に関するメッセージを整理します。各トピックには名前があります。アプリケーションは、トピックにメッセージを送信し、トピックからメッセージを取得します。Kafka トピックは、Kafka インスタンスのデータの保存先を提供します。データを送信する前に、Kafka トピックを設定する必要があります。

前提条件

  • 適切なアクセスレベルで OpenShift クラスターにアクセスできること。この場合、プロジェクトの作成および Operator のインストールができること。また、OpenShift および Camel K CLI をローカルシステムにインストールできること。
  • OpenShift クラスターの準備 の手順に従って、OpenShift CLI (oc)、Camel K CLI (kamel)、および RHOAS CLI (rhoas) ツールをインストールしている。
  • OpenShift クラスターの準備 の説明どおりに、Red Hat Integration - Camel K Operator がインストールされている。
  • Red Hat Cloud サイト にログインしている。

手順

Red Hat OpenShift Streams for Apache Kafka を使用して Kafka トピックを設定するには、以下を行います。

  1. コマンドラインから OpenShift クラスターにログインします。
  2. プロジェクトを開きます。以下に例を示します。

    oc project my-camel-k-kafka

  3. Camel K Operator がプロジェクトにインストールされていることを確認します。

    oc get csv

    結果には、Red Hat Camel K Operator が表示され、それが Succeeded フェーズにあることを示します。

  4. Kafka インスタンスを準備し、RHOAS に接続します。

    1. 以下のコマンドを使用して RHOAS CLI にログインします。

      rhoas login

    2. kafka-test などの kafka インスタンスを作成します。

      rhoas kafka create kafka-test

      Kafka インスタンスを作成するプロセスの完了に数分かかる場合があります。

  5. Kafka インスタンスのステータスを確認するには、以下を実行します。

    rhoas status

    Web コンソールでステータスを表示することもできます。

    https://cloud.redhat.com/application-services/streams/kafkas/

    ステータスが ready になったら、次のステップに進みます。

  6. 新しい Kafka トピックを作成します。

    rhoas kafka topic create --name test-topic

  7. Kafka インスタンス (クラスター) を Openshift Application Services インスタンスに接続します。

    rhoas cluster connect

  8. クレデンシャルトークンを取得するスクリプトの手順に従います。

    以下のような出力が表示されるはずです。

    Token Secret "rh-cloud-services-accesstoken-cli" created successfully
    Service Account Secret "rh-cloud-services-service-account" created successfully
    KafkaConnection resource "kafka-test" has been created
    KafkaConnection successfully installed on your cluster.

5.1.2.3. Kafka クレデンシャルの取得

アプリケーションまたはサービスを Kafka インスタンスに接続するには、まず以下の Kafka クレデンシャルを取得する必要があります。

  • ブートストラップ URL を取得します。
  • クレデンシャル (ユーザー名とパスワード) を使用してサービスアカウントを作成します。

OpenShift Streams では、認証プロトコルは SASL_SSL です。

前提条件

  • Kafka インスタンスを作成し、ステータスが ready である。
  • Kafka トピックを作成している。

手順

  1. Kafka ブローカーの URL (ブートストラップ URL) を取得します。

    rhoas status

    コマンドは、以下のような出力を返します。

      Kafka
      ---------------------------------------------------------------
      ID:                     1ptdfZRHmLKwqW6A3YKM2MawgDh
      Name:                   my-kafka
      Status:                 ready
      Bootstrap URL:        my-kafka--ptdfzrhmlkwqw-a-ykm-mawgdh.kafka.devshift.org:443
  2. ユーザー名とパスワードを取得するには、以下の構文を使用してサービスアカウントを作成します。

    rhoas service-account create --name "<account-name>" --file-format json

    注記

    サービスアカウントの作成時に、ファイル形式および場所を選択してクレデンシャルを保存できます。詳細は、rhoas service-account create --help コマンドを参照してください。

    以下に例を示します。

    rhoas service-account create --name "my-service-acct" --file-format json

    サービスアカウントが作成され、JSON ファイルに保存されます。

  3. サービスアカウントの認証情報を確認するには、credentials.json ファイルを表示します。

    cat credentials.json

    コマンドは、以下のような出力を返します。

    {"clientID":"srvc-acct-eb575691-b94a-41f1-ab97-50ade0cd1094", "password":"facf3df1-3c8d-4253-aa87-8c95ca5e1225"}
  4. Kakfa トピックとの間でメッセージを送受信する権限を付与します。以下のコマンドを使用します。ここで、clientID は (ステップ 3 からの)credentials.json ファイルで指定される値に置き換えます。

    rhoas kafka acl grant-access --producer --consumer --service-account $CLIENT_ID --topic test-topic --group all

    以下に例を示します。

    rhoas kafka acl grant-access --producer --consumer --service-account srvc-acct-eb575691-b94a-41f1-ab97-50ade0cd1094 --topic test-topic --group all

5.1.2.4. SASL/Plain 認証を使用したシークレットの作成

取得したクレデンシャル (Kafka ブートストラップ URL、サービスアカウント ID およびサービスアカウントのシークレット) を使用してシークレットを作成できます。

手順

  1. application.properties ファイルを編集し、Kafka 認証情報を追加します。

    application.properties ファイル

    camel.component.kafka.brokers = <YOUR-KAFKA-BOOTSTRAP-URL-HERE>
    camel.component.kafka.security-protocol = SASL_SSL
    camel.component.kafka.sasl-mechanism = PLAIN
    camel.component.kafka.sasl-jaas-config=org.apache.kafka.common.security.plain.PlainLoginModule required username='<YOUR-SERVICE-ACCOUNT-ID-HERE>' password='<YOUR-SERVICE-ACCOUNT-SECRET-HERE>';
    consumer.topic=<TOPIC-NAME>
    producer.topic=<TOPIC-NAME>

  2. 以下のコマンドを実行して、application.properties ファイルに機密プロパティーが含まれるシークレットを作成します。

    oc create secret generic kafka-props --from-file application.properties

    Camel K インテグレーションの実行時に、このシークレットを使用します。

5.1.2.5. SASL/OAUTHBearer 認証を使用したシークレットの作成

取得したクレデンシャル (Kafka ブートストラップ URL、サービスアカウント ID およびサービスアカウントのシークレット) を使用してシークレットを作成できます。

手順

  1. application-oauth.properties ファイルを編集し、Kafka 認証情報を追加します。

    application-oauth.properties ファイル

    camel.component.kafka.brokers = <YOUR-KAFKA-BOOTSTRAP-URL-HERE>
    camel.component.kafka.security-protocol = SASL_SSL
    camel.component.kafka.sasl-mechanism = OAUTHBEARER
    camel.component.kafka.sasl-jaas-config = org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
    oauth.client.id='<YOUR-SERVICE-ACCOUNT-ID-HERE>' \
    oauth.client.secret='<YOUR-SERVICE-ACCOUNT-SECRET-HERE>' \
    oauth.token.endpoint.uri="https://identity.api.openshift.com/auth/realms/rhoas/protocol/openid-connect/token" ;
    camel.component.kafka.additional-properties[sasl.login.callback.handler.class]=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
    
    consumer.topic=<TOPIC-NAME>
    producer.topic=<TOPIC-NAME>

  2. 以下のコマンドを実行して、application.properties ファイルに機密プロパティーが含まれるシークレットを作成します。

    oc create secret generic kafka-props --from-file application-oauth.properties

    Camel K インテグレーションの実行時に、このシークレットを使用します。