6.2. Knative Kafka の設定

Knative Kafka は、OpenShift Serverless でサポートされているバージョンの Apache Kafka メッセージストリーミングプラットフォームを使用する統合オプションを提供します。Kafka は、イベントソース、チャネル、ブローカー、およびイベントシンク機能のオプションを提供します。

OpenShift Serverless のコアインストールの一部として提供される Knative Eventing コンポーネントの他に、クラスター管理者は KnativeKafka カスタムリソース (CR) をインストールできます。

注記

現時点で、Knative Kafka は IBM Z および IBM Power Systems ではサポートされていません。

KnativeKafka CR は、ユーザーに以下のような追加オプションを提供します。

  • Kafka ソース
  • Kafka チャネル
  • Kafka ブローカー
  • Kafka シンク

6.2.1. Knative Kafka のインストール

Knative Kafka は、OpenShift Serverless でサポートされているバージョンの Apache Kafka メッセージストリーミングプラットフォームを使用する統合オプションを提供します。KnativeKafka カスタムリソースをインストールしている場合、Knative Kafka 機能は OpenShift Serverless インストールで使用できます。

前提条件

  • OpenShift Serverless Operator および Knative Eventing がクラスターにインストールされていること。
  • Red Hat AMQ Streams クラスターにアクセスできる。
  • 検証手順を使用する場合は、OpenShift CLI (oc) をインストールします。
  • OpenShift Container Platform のクラスター管理者パーミッションがある。
  • OpenShift Container Platform Web コンソールにログインしている。

手順

  1. Administrator パースペクティブで、OperatorsInstalled Operators に移動します。
  2. ページ上部の Project ドロップダウンメニューが Project: knative-eventing に設定されていることを確認します。
  3. OpenShift Serverless Operator の Provided APIs の一覧で Knative Kafka ボックスを見つけ、Create Instance をクリックします。
  4. Create Knative Kafka ページで KnativeKafka オブジェクトを設定します。

    重要

    クラスターで Kafka チャネル、ソース、ブローカー、またはシンクを使用するには、使用するオプションの 有効な スイッチを true に切り替える必要があります。これらのスイッチは、デフォルトで false に設定されます。さらに、Kafka チャネル、ブローカー、またはシンクを使用するには、ブートストラップサーバーを指定する必要があります。

    KnativeKafka カスタムリソースの例

    apiVersion: operator.serverless.openshift.io/v1alpha1
    kind: KnativeKafka
    metadata:
        name: knative-kafka
        namespace: knative-eventing
    spec:
        channel:
            enabled: true 1
            bootstrapServers: <bootstrap_servers> 2
        source:
            enabled: true 3
        broker:
            enabled: true 4
            defaultConfig:
                bootstrapServers: <bootstrap_servers> 5
                numPartitions: <num_partitions> 6
                replicationFactor: <replication_factor> 7
        sink:
            enabled: true 8

    1
    開発者はクラスターで KafkaChannel チャネルを使用できます。
    2
    AMQ Streams クラスターからのブートストラップサーバーのコンマ区切りの一覧。
    3
    開発者はクラスターで KafkaSource イベントソースタイプを使用できます。
    4
    開発者はクラスターで Knative Kafka ブローカー実装を使用できます。
    5
    Red Hat AMQ Streams クラスターからのブートストラップサーバーのコンマ区切りリスト。
    6
    Broker オブジェクトでサポートされる Kafka トピックのパーティション数を定義します。デフォルトは 10 です。
    7
    Broker オブジェクトでサポートされる Kafka トピックのレプリケーション係数を定義します。デフォルトは 3 です。
    8
    開発者がクラスター内で Kafka シンクを使用できるようにします。
    注記

    replicationFactor の値は、Red Hat AMQ Streams クラスターのノード数以下である必要があります。

    1. KnativeKafka オブジェクトの作成を完全に制御する必要がない単純な設定に、このフォームの使用が推奨されます。
    2. KnativeKafka オブジェクトの作成を完全に制御する必要のあるより複雑な設定には、YAML の編集が推奨されます。YAML にアクセスするには、Create Knative Kafka ページの右上にある Edit YAML リンクをクリックします。
  5. Kafka のオプションの設定が完了したら、Create をクリックします。Knative Kafka タブに自動的にダイレクトされます。ここで、knative-kafka はリソースの一覧にあります。

検証

  1. Knative Kafka タブで knative-kafka リソースをクリックします。Knative Kafka Overview ページに自動的にダイレクトされます。
  2. リソースの Conditions (状態) の一覧を表示し、それらのステータスが True であることを確認します。

    Kafka Knative Overview page showing Conditions

    状態のステータスが Unknown または False である場合は、ページを更新するためにしばらく待機します。

  3. Knative Eventing リソースが作成されていることを確認します。

    $ oc get pods -n knative-eventing

    出力例

    NAME                                        READY   STATUS    RESTARTS   AGE
    kafka-broker-dispatcher-7769fbbcbb-xgffn    2/2     Running   0          44s
    kafka-broker-receiver-5fb56f7656-fhq8d      2/2     Running   0          44s
    kafka-channel-dispatcher-84fd6cb7f9-k2tjv   2/2     Running   0          44s
    kafka-channel-receiver-9b7f795d5-c76xr      2/2     Running   0          44s
    kafka-controller-6f95659bf6-trd6r           2/2     Running   0          44s
    kafka-source-dispatcher-6bf98bdfff-8bcsn    2/2     Running   0          44s
    kafka-webhook-eventing-68dc95d54b-825xs     2/2     Running   0          44s

6.2.2. Knative Kafka のセキュリティー設定

Kafka クラスターは、通常、TLS または SASL 認証方法を使用して保護されます。TLS または SASL を使用して、保護された Red Hat AMQ Streams クラスターに対して動作するように Kafka ブローカーまたはチャネルを設定できます。

注記

Red Hat は、SASL と TLS の両方を一緒に有効にすることをお勧めします。

6.2.2.1. Kafka ブローカーの TLS 認証の設定

Transport Layer Security (TLS) は、Apache Kafka クライアントおよびサーバーによって、Knative と Kafka 間のトラフィックを暗号化するため、および認証のために使用されます。TLS は、Knative Kafka のトラフィック暗号化でサポートされている唯一の方法です。

前提条件

  • OpenShift Container Platform のクラスター管理者パーミッションがある。
  • OpenShift Serverless Operator、Knative Eventing、および KnativeKafka CR は OpenShift Container Platform クラスターにインストールされます。
  • OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
  • .pem ファイルとして Kafka クラスター CA 証明書が保存されている。
  • Kafka クラスタークライアント証明書とキーが .pem ファイルとして保存されている。
  • OpenShift CLI (oc) をインストールしている。

手順

  1. 証明書ファイルを knative-eventing namespace にシークレットファイルとして作成します。

    $ oc create secret -n knative-eventing generic <secret_name> \
      --from-literal=protocol=SSL \
      --from-file=ca.crt=caroot.pem \
      --from-file=user.crt=certificate.pem \
      --from-file=user.key=key.pem
    重要

    キー名に ca.crtuser.crt、および user.key を使用します。これらの値は変更しないでください。

  2. KnativeKafka CR を編集し、broker 仕様にシークレットへの参照を追加します。

    apiVersion: operator.serverless.openshift.io/v1alpha1
    kind: KnativeKafka
    metadata:
      namespace: knative-eventing
      name: knative-kafka
    spec:
      broker:
        enabled: true
        defaultConfig:
          authSecretName: <secret_name>
    ...

6.2.2.2. Kafka ブローカーの SASL 認証の設定

Simple Authentication and Security Layer (SASL) は、Apache Kafka が認証に使用します。クラスターで SASL 認証を使用する場合、ユーザーは Kafka クラスターと通信するために Knative に認証情報を提供する必要があります。そうしないと、イベントを生成または消費できません。

前提条件

  • OpenShift Container Platform のクラスター管理者パーミッションがある。
  • OpenShift Serverless Operator、Knative Eventing、および KnativeKafka CR は OpenShift Container Platform クラスターにインストールされます。
  • OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
  • Kafka クラスターのユーザー名およびパスワードがある。
  • 使用する SASL メカニズムを選択している (例: PLAINSCRAM-SHA-256、または SCRAM-SHA-512)。
  • TLS が有効にされている場合、Kafka クラスターの ca.crt 証明書ファイルも必要になります。
  • OpenShift CLI (oc) をインストールしている。

手順

  1. 証明書ファイルを knative-eventing namespace にシークレットファイルとして作成します。

    $ oc create secret -n knative-eventing generic <secret_name> \
      --from-literal=protocol=SASL_SSL \
      --from-literal=sasl.mechanism=<sasl_mechanism> \
      --from-file=ca.crt=caroot.pem \
      --from-literal=password="SecretPassword" \
      --from-literal=user="my-sasl-user"
    • キー名に ca.crtpassword、および sasl.mechanism を使用します。これらの値は変更しないでください。
    • パブリック CA 証明書で SASL を使用する場合は、シークレットの作成時に ca.crt 引数ではなく tls.enabled=true フラグを使用する必要があります。以下に例を示します。

      $ oc create secret -n <namespace> generic <kafka_auth_secret> \
        --from-literal=tls.enabled=true \
        --from-literal=password="SecretPassword" \
        --from-literal=saslType="SCRAM-SHA-512" \
        --from-literal=user="my-sasl-user"
  2. KnativeKafka CR を編集し、broker 仕様にシークレットへの参照を追加します。

    apiVersion: operator.serverless.openshift.io/v1alpha1
    kind: KnativeKafka
    metadata:
      namespace: knative-eventing
      name: knative-kafka
    spec:
      broker:
        enabled: true
        defaultConfig:
          authSecretName: <secret_name>
    ...

6.2.2.3. Kafka チャネルの TLS 認証の設定

Transport Layer Security (TLS) は、Apache Kafka クライアントおよびサーバーによって、Knative と Kafka 間のトラフィックを暗号化するため、および認証のために使用されます。TLS は、Knative Kafka のトラフィック暗号化でサポートされている唯一の方法です。

前提条件

  • OpenShift Container Platform のクラスター管理者パーミッションがある。
  • OpenShift Serverless Operator、Knative Eventing、および KnativeKafka CR は OpenShift Container Platform クラスターにインストールされます。
  • OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
  • .pem ファイルとして Kafka クラスター CA 証明書が保存されている。
  • Kafka クラスタークライアント証明書とキーが .pem ファイルとして保存されている。
  • OpenShift CLI (oc) をインストールしている。

手順

  1. 選択された namespace にシークレットとして証明書ファイルを作成します。

    $ oc create secret -n <namespace> generic <kafka_auth_secret> \
      --from-file=ca.crt=caroot.pem \
      --from-file=user.crt=certificate.pem \
      --from-file=user.key=key.pem
    重要

    キー名に ca.crtuser.crt、および user.key を使用します。これらの値は変更しないでください。

  2. KnativeKafka カスタムリソースの編集を開始します。

    $ oc edit knativekafka
  3. シークレットおよびシークレットの namespace を参照します。

    apiVersion: operator.serverless.openshift.io/v1alpha1
    kind: KnativeKafka
    metadata:
      namespace: knative-eventing
      name: knative-kafka
    spec:
      channel:
        authSecretName: <kafka_auth_secret>
        authSecretNamespace: <kafka_auth_secret_namespace>
        bootstrapServers: <bootstrap_servers>
        enabled: true
      source:
        enabled: true
    注記

    ブートストラップサーバーで一致するポートを指定するようにしてください。

    以下に例を示します。

    apiVersion: operator.serverless.openshift.io/v1alpha1
    kind: KnativeKafka
    metadata:
      namespace: knative-eventing
      name: knative-kafka
    spec:
      channel:
        authSecretName: tls-user
        authSecretNamespace: kafka
        bootstrapServers: eventing-kafka-bootstrap.kafka.svc:9094
        enabled: true
      source:
        enabled: true

6.2.2.4. Kafka チャネルの SASL 認証の設定

Simple Authentication and Security Layer (SASL) は、Apache Kafka が認証に使用します。クラスターで SASL 認証を使用する場合、ユーザーは Kafka クラスターと通信するために Knative に認証情報を提供する必要があります。そうしないと、イベントを生成または消費できません。

前提条件

  • OpenShift Container Platform のクラスター管理者パーミッションがある。
  • OpenShift Serverless Operator、Knative Eventing、および KnativeKafka CR は OpenShift Container Platform クラスターにインストールされます。
  • OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
  • Kafka クラスターのユーザー名およびパスワードがある。
  • 使用する SASL メカニズムを選択している (例: PLAINSCRAM-SHA-256、または SCRAM-SHA-512)。
  • TLS が有効にされている場合、Kafka クラスターの ca.crt 証明書ファイルも必要になります。
  • OpenShift CLI (oc) をインストールしている。

手順

  1. 選択された namespace にシークレットとして証明書ファイルを作成します。

    $ oc create secret -n <namespace> generic <kafka_auth_secret> \
      --from-file=ca.crt=caroot.pem \
      --from-literal=password="SecretPassword" \
      --from-literal=saslType="SCRAM-SHA-512" \
      --from-literal=user="my-sasl-user"
    • キー名に ca.crtpassword、および sasl.mechanism を使用します。これらの値は変更しないでください。
    • パブリック CA 証明書で SASL を使用する場合は、シークレットの作成時に ca.crt 引数ではなく tls.enabled=true フラグを使用する必要があります。以下に例を示します。

      $ oc create secret -n <namespace> generic <kafka_auth_secret> \
        --from-literal=tls.enabled=true \
        --from-literal=password="SecretPassword" \
        --from-literal=saslType="SCRAM-SHA-512" \
        --from-literal=user="my-sasl-user"
  2. KnativeKafka カスタムリソースの編集を開始します。

    $ oc edit knativekafka
  3. シークレットおよびシークレットの namespace を参照します。

    apiVersion: operator.serverless.openshift.io/v1alpha1
    kind: KnativeKafka
    metadata:
      namespace: knative-eventing
      name: knative-kafka
    spec:
      channel:
        authSecretName: <kafka_auth_secret>
        authSecretNamespace: <kafka_auth_secret_namespace>
        bootstrapServers: <bootstrap_servers>
        enabled: true
      source:
        enabled: true
    注記

    ブートストラップサーバーで一致するポートを指定するようにしてください。

    以下に例を示します。

    apiVersion: operator.serverless.openshift.io/v1alpha1
    kind: KnativeKafka
    metadata:
      namespace: knative-eventing
      name: knative-kafka
    spec:
      channel:
        authSecretName: scram-user
        authSecretNamespace: kafka
        bootstrapServers: eventing-kafka-bootstrap.kafka.svc:9093
        enabled: true
      source:
        enabled: true

6.2.2.5. Kafka ソースの SASL 認証の設定

Simple Authentication and Security Layer (SASL) は、Apache Kafka が認証に使用します。クラスターで SASL 認証を使用する場合、ユーザーは Kafka クラスターと通信するために Knative に認証情報を提供する必要があります。そうしないと、イベントを生成または消費できません。

前提条件

  • OpenShift Container Platform でクラスターまたは専用の管理者パーミッションを持っている。
  • OpenShift Serverless Operator、Knative Eventing、および KnativeKafka CR は、OpenShift Container Platform クラスターにインストールされている。
  • OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
  • Kafka クラスターのユーザー名およびパスワードがある。
  • 使用する SASL メカニズムを選択している (例: PLAINSCRAM-SHA-256、または SCRAM-SHA-512)。
  • TLS が有効にされている場合、Kafka クラスターの ca.crt 証明書ファイルも必要になります。
  • OpenShift (oc) CLI がインストールされている。

手順

  1. 選択された namespace にシークレットとして証明書ファイルを作成します。

    $ oc create secret -n <namespace> generic <kafka_auth_secret> \
      --from-file=ca.crt=caroot.pem \
      --from-literal=password="SecretPassword" \
      --from-literal=saslType="SCRAM-SHA-512" \ 1
      --from-literal=user="my-sasl-user"
    1
    SASL タイプは PLAINSCRAM-SHA-256、または SCRAM-SHA-512 です。
  2. Kafka ソースを作成または変更して、次の spec 設定が含まれるようにします。

    apiVersion: sources.knative.dev/v1beta1
    kind: KafkaSource
    metadata:
      name: example-source
    spec:
    ...
      net:
        sasl:
          enable: true
          user:
            secretKeyRef:
              name: <kafka_auth_secret>
              key: user
          password:
            secretKeyRef:
              name: <kafka_auth_secret>
              key: password
          type:
            secretKeyRef:
              name: <kafka_auth_secret>
              key: saslType
        tls:
          enable: true
          caCert: 1
            secretKeyRef:
              name: <kafka_auth_secret>
              key: ca.crt
    ...
    1
    Red Hat OpenShift Streams for Apache Kafka などのパブリッククラウド Kafka サービスを使用している場合は、caCert 仕様は必要ありません。

6.2.2.6. Kafka シンクのセキュリティーの設定

Transport Layer Security (TLS) は、Apache Kafka クライアントおよびサーバーによって、Knative と Kafka 間のトラフィックを暗号化するため、および認証のために使用されます。TLS は、Knative Kafka のトラフィック暗号化でサポートされている唯一の方法です。

Simple Authentication and Security Layer (SASL) は、Apache Kafka が認証に使用します。クラスターで SASL 認証を使用する場合、ユーザーは Kafka クラスターと通信するために Knative に認証情報を提供する必要があります。そうしないと、イベントを生成または消費できません。

前提条件

  • OpenShift Serverless Operator、Knative Eventing、および KnativeKafka カスタムリソース (CR) は OpenShift Container Platform クラスターにインストールされます。
  • Kafka シンクは KnativeKafka CR で有効になっています。
  • OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
  • .pem ファイルとして Kafka クラスター CA 証明書が保存されている。
  • Kafka クラスタークライアント証明書とキーが .pem ファイルとして保存されている。
  • OpenShift (oc) CLI がインストールされている。
  • 使用する SASL メカニズムを選択している (例: PLAINSCRAM-SHA-256、または SCRAM-SHA-512)。

手順

  1. KafkaSink オブジェクトと同じ namespace に証明書ファイルをシークレットとして作成します。

    重要

    証明書とキーは PEM 形式である必要があります。

    • 暗号化なしで SASL を使用した認証の場合:

      $ oc create secret -n <namespace> generic <secret_name> \
        --from-literal=protocol=SASL_PLAINTEXT \
        --from-literal=sasl.mechanism=<sasl_mechanism> \
        --from-literal=user=<username> \
        --from-literal=password=<password>
    • SASL を使用した認証と TLS を使用した暗号化の場合:

      $ oc create secret -n <namespace> generic <secret_name> \
        --from-literal=protocol=SASL_SSL \
        --from-literal=sasl.mechanism=<sasl_mechanism> \
        --from-file=ca.crt=<my_caroot.pem_file_path> \ 1
        --from-literal=user=<username> \
        --from-literal=password=<password>
      1
      Red Hat OpenShift Streams for Apache Kafka などのパブリッククラウドで管理される Kafka サービスを使用している場合は、システムのルート CA セットを使用するために ca.crt を省略できます。
    • TLS を使用した認証と暗号化の場合:

      $ oc create secret -n <namespace> generic <secret_name> \
        --from-literal=protocol=SSL \
        --from-file=ca.crt=<my_caroot.pem_file_path> \ 1
        --from-file=user.crt=<my_cert.pem_file_path> \
        --from-file=user.key=<my_key.pem_file_path>
      1
      Red Hat OpenShift Streams for Apache Kafka などのパブリッククラウドで管理される Kafka サービスを使用している場合は、システムのルート CA セットを使用するために ca.crt を省略できます。
  2. KafkaSink オブジェクトを作成または変更し、auth 仕様にシークレットへの参照を追加します。

    apiVersion: eventing.knative.dev/v1alpha1
    kind: KafkaSink
    metadata:
       name: <sink_name>
       namespace: <namespace>
    spec:
    ...
       auth:
         secret:
           ref:
             name: <secret_name>
    ...
  3. KafkaSink オブジェクトを適用します。

    $ oc apply -f <filename>

6.2.3. Kafka ブローカー構成の設定

設定マップを作成し、Kafka Broker オブジェクトでこの ConfigMap を参照することで、レプリケーション係数、ブートストラップサーバー、および Kafka ブローカーのトピックパーティションの数を設定できます。

前提条件

  • OpenShift Container Platform でクラスターまたは専用の管理者パーミッションを持っている。
  • OpenShift Serverless Operator、Knative Eventing、および KnativeKafka カスタムリソース (CR) は OpenShift Container Platform クラスターにインストールされます。
  • OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
  • OpenShift CLI (oc) がインストールされている。

手順

  1. kafka-broker-config ConfigMap を変更するか、以下の設定が含まれる独自の ConfigMap を作成します。

    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: <config_map_name> 1
      namespace: <namespace> 2
    data:
      default.topic.partitions: <integer> 3
      default.topic.replication.factor: <integer> 4
      bootstrap.servers: <list_of_servers> 5
    1
    ConfigMap 名。
    2
    ConfigMap が存在する namespace。
    3
    Kafka ブローカーのトピックパーティションの数。これは、イベントをブローカーに送信する速度を制御します。パーティションが多い場合には、コンピュートリソースが多く必要です。
    4
    トピックメッセージのレプリケーション係数。これにより、データ損失を防ぐことができます。レプリケーション係数を増やすには、より多くのコンピュートリソースとストレージが必要になります。
    5
    ブートストラップサーバーのコンマ区切りリスト。これは、OpenShift Container Platform クラスターの内部または外部にある可能性があり、ブローカーがイベントを受信してイベントを送信する Kafka クラスターのリストです。
    重要

    default.topic.replication.factor の値は、クラスター内の Kafka ブローカーインスタンスの数以下である必要があります。たとえば、Kafka ブローカーが 1 つしかない場合には、default.topic.replication.factor の値は "1" を超える値にすることはできません。

    Kafka ブローカーの ConfigMap の例

    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: kafka-broker-config
      namespace: knative-eventing
    data:
      default.topic.partitions: "10"
      default.topic.replication.factor: "3"
      bootstrap.servers: "my-cluster-kafka-bootstrap.kafka:9092"

  2. ConfigMap を適用します。

    $ oc apply -f <config_map_filename>
  3. Kafka Broker オブジェクトの ConfigMap を指定します。

    Broker オブジェクトの例

    apiVersion: eventing.knative.dev/v1
    kind: Broker
    metadata:
      name: <broker_name> 1
      namespace: <namespace> 2
      annotations:
        eventing.knative.dev/broker.class: Kafka 3
    spec:
      config:
        apiVersion: v1
        kind: ConfigMap
        name: <config_map_name> 4
        namespace: <namespace> 5
    ...

    1
    ブローカー名。
    2
    ブローカーが存在する namespace。
    3
    ブローカークラスアノテーション。この例では、ブローカーはクラス値 Kafka を使用する Kafka ブローカーです。
    4
    ConfigMap 名。
    5
    ConfigMap が存在する namespace。
  4. ブローカーを適用します。

    $ oc apply -f <broker_filename>

6.2.4. 関連情報