4.6. Apache Kafka の Knative ブローカー実装

実稼働環境対応の Knative Eventing デプロイメントの場合、Red Hat は Apache Kafka に Knative ブローカー実装を使用することを推奨します。ブローカーは、Knative ブローカーの Apache Kafka ネイティブ実装であり、CloudEvents を Kafka インスタンスに直接送信します。

Kafka ブローカーは、イベントを保存してルーティングできるように Kafka とネイティブに統合されています。これにより、他のブローカータイプよりもブローカーとトリガーモデルの Kafka との統合性が向上し、ネットワークホップを削減することができます。Knative ブローカー実装のその他の利点は次のとおりです。

  • 少なくとも 1 回の配信保証
  • CloudEvents パーティショニング拡張機能に基づくイベントの順序付き配信
  • コントロールプレーンの高可用性
  • 水平方向にスケーラブルなデータプレーン

Apache Kafka の Knative ブローカー実装は、バイナリーコンテンツモードを使用して、受信した CloudEvent を Kafka レコードとして保存します。これは、CloudEvent のすべての属性と拡張機能が Kafka レコードのヘッダーとしてマップされ、CloudEvent の data 仕様が Kafka レコードの値に対応することを意味します。

4.6.1. デフォルトのブローカータイプとして設定されていない場合の Apache Kafka ブローカーの作成

OpenShift Serverless デプロイメントがデフォルトのブローカータイプとして Kafka ブローカーを使用するように設定されていない場合は、以下の手順のいずれかを使用して、Kafka ベースのブローカーを作成できます。

4.6.1.1. YAML を使用した Apache Kafka ブローカーの作成

YAML ファイルを使用して Knative リソースを作成する場合は、宣言的 API を使用するため、再現性の高い方法でアプリケーションを宣言的に記述できます。YAML を使用して Kafka ブローカーを作成するには、Broker オブジェクトを定義する YAML ファイルを作成し、oc apply コマンドを使用してそれを適用する必要があります。

前提条件

  • OpenShift Serverless Operator、Knative Eventing、および KnativeKafka カスタムリソースが OpenShift Container Platform クラスターにインストールされている。
  • OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
  • OpenShift CLI (oc) がインストールされている。

手順

  1. Kafka ベースのブローカーを YAML ファイルとして作成します。

    apiVersion: eventing.knative.dev/v1
    kind: Broker
    metadata:
      annotations:
        eventing.knative.dev/broker.class: Kafka 1
      name: example-kafka-broker
    spec:
      config:
        apiVersion: v1
        kind: ConfigMap
        name: kafka-broker-config 2
        namespace: knative-eventing
    1
    ブローカークラス。指定されていないと、ブローカーはクラスター管理者の設定に従ってデフォルトクラスを使用します。Kafka ブローカーを使用するには、この値を Kafka にする必要があります。
    2
    Apache Kafka の Knative ブローカーのデフォルトの Config Map 。この Config Map は、クラスター管理者がクラスター上で Kafka ブローカー機能を有効にした場合に作成されます。
  2. Kafka ベースのブローカー YAML ファイルを適用します。

    $ oc apply -f <filename>

4.6.1.2. 外部で管理される Kafka トピックを使用する Apache Kafka ブローカーの作成

独自の内部トピックの作成を許可せずに Kafka ブローカーを使用する場合は、代わりに外部で管理される Kafka トピックを使用できます。これを実行するには、kafka.eventing.knative.dev/external.topic アノテーションを使用する Kafka Broker オブジェクトを作成する必要があります。

前提条件

  • OpenShift Serverless Operator、Knative Eventing、および KnativeKafka カスタムリソースが OpenShift Container Platform クラスターにインストールされている。
  • Red Hat AMQ Streams などの Kafka インスタンスにアクセスでき、Kafka トピックを作成している。
  • OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
  • OpenShift CLI (oc) がインストールされている。

手順

  1. Kafka ベースのブローカーを YAML ファイルとして作成します。

    apiVersion: eventing.knative.dev/v1
    kind: Broker
    metadata:
      annotations:
        eventing.knative.dev/broker.class: Kafka 1
        kafka.eventing.knative.dev/external.topic: <topic_name> 2
    ...
    1
    ブローカークラス。指定されていないと、ブローカーはクラスター管理者の設定に従ってデフォルトクラスを使用します。Kafka ブローカーを使用するには、この値を Kafka にする必要があります。
    2
    使用する Kafka トピックの名前。
  2. Kafka ベースのブローカー YAML ファイルを適用します。

    $ oc apply -f <filename>

4.6.1.3. 分離されたデータプレーンのある Apache Kafka の Knative Broker 実装

重要

分離されたデータプレーンを使用した Apache Kafka の Knative Broker 実装は、テクノロジープレビュー機能としてのみ提供されます。テクノロジープレビュー機能は、Red Hat 製品のサービスレベルアグリーメント (SLA) の対象外であり、機能的に完全ではないことがあります。Red Hat は、実稼働環境でこれらを使用することを推奨していません。テクノロジープレビュー機能は、最新の製品機能をいち早く提供して、開発段階で機能のテストを行いフィードバックを提供していただくことを目的としています。

Red Hat のテクノロジープレビュー機能のサポート範囲に関する詳細は、テクノロジープレビュー機能のサポート範囲 を参照してください。

Apache Kafka の Knative Broker 実装には 2 つのプレーンがあります。

コントロールプレーン
Kubernetes API と通信し、カスタムオブジェクトを監視し、データプレーンを管理するコントローラーで設定されます。
データプレーン
受信イベントをリッスンし、Apache Kafka と通信し、イベントをイベントシンクに送信するコンポーネントのコレクション。Apache Kafka データプレーンの Knative Broker 実装は、イベントが送信される場所です。この実装は、kafka-broker-receiver および kafka-broker-dispatcher デプロイメントで設定されます。

Kafka の Broker クラスを設定する場合、Apache Kafka の Knative Broker 実装は共有データプレーンを使用します。つまり、knative-eventing namespace の kafka-broker-receiver および kafka-broker-dispatcher デプロイメントがクラスター内のすべての Apache Kafka Broker に使用されます。

ただし、KafkaNamespaced の Broker クラスを設定すると、Apache Kafka ブローカーコントローラーは、ブローカーが存在する namespace ごとに新しいデータプレーンを作成します。このデータプレーンは、その namespace のすべての KafkaNamespaced ブローカーによって使用されます。これにより、データプレーンが分離されるため、ユーザーの namespace の kafka-broker-receiver および kafka-broker-dispatcher デプロイメントは、その namespace のブローカーに対してのみ使用されます。

重要

データプレーンを分離した結果、このセキュリティー機能はより多くのデプロイメントを作成し、より多くのリソースを使用します。このような分離要件がない限り、Kafka のクラスで 通常 の Broker を使用します。

4.6.1.4. 分離されたデータプレーンを使用する Apache Kafka の Knative ブローカーの作成

重要

分離されたデータプレーンを使用した Apache Kafka の Knative Broker 実装は、テクノロジープレビュー機能としてのみ提供されます。テクノロジープレビュー機能は、Red Hat 製品のサービスレベルアグリーメント (SLA) の対象外であり、機能的に完全ではないことがあります。Red Hat は、実稼働環境でこれらを使用することを推奨していません。テクノロジープレビュー機能は、最新の製品機能をいち早く提供して、開発段階で機能のテストを行いフィードバックを提供していただくことを目的としています。

Red Hat のテクノロジープレビュー機能のサポート範囲に関する詳細は、テクノロジープレビュー機能のサポート範囲 を参照してください。

KafkaNamespaced ブローカーを作成するには、eventing.knative.dev/broker.class アノテーションを KafkaNamespaced に設定する必要があります。

前提条件

  • OpenShift Serverless Operator、Knative Eventing、および KnativeKafka カスタムリソースが OpenShift Container Platform クラスターにインストールされている。
  • Red Hat AMQ Streams などの Apache Kafka インスタンスにアクセスでき、Kafka トピックを作成している。
  • OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
  • OpenShift CLI (oc) がインストールされている。

手順

  1. YAML ファイルを使用して Apache Kafka ベースのブローカーを作成します。

    apiVersion: eventing.knative.dev/v1
    kind: Broker
    metadata:
      annotations:
        eventing.knative.dev/broker.class: KafkaNamespaced 1
      name: default
      namespace: my-namespace 2
    spec:
      config:
        apiVersion: v1
        kind: ConfigMap
        name: my-config 3
    ...
    1
    分離されたデータプレーンで Apache Kafka ブローカーを使用するには、ブローカークラスの値は KafkaNamespaced である必要があります。
    2 3
    参照される ConfigMap オブジェクトの my-config は、Broker オブジェクトと同じ namespace (この場合は my-namespace) に存在する必要があります。
  2. Apache Kafka ベースのブローカー YAML ファイルを適用します。

    $ oc apply -f <filename>
重要

spec.configConfigMap オブジェクトは Broker オブジェクトと同じ namespace にある必要があります。

apiVersion: v1
kind: ConfigMap
metadata:
  name: my-config
  namespace: my-namespace
data:
  ...

KafkaNamespaced クラスで最初の Broker オブジェクトを作成すると、kafka-broker-receiver および kafka-broker-dispatcher デプロイメントが namespace に作成されます。その後、同じ namespace 内で KafkaNamespaced クラスが含まれる全ブローカーにより、同じデータプレーンが使用されます。KafkaNamespaced クラスを持つブローカーが namespace に存在しない場合は、namespace のデータプレーンが削除されます。

4.6.2. Apache Kafka ブローカー設定

Config Map を作成し、Kafka Broker オブジェクトでこの ConfigMap を参照することで、レプリケーション係数、ブートストラップサーバー、および Kafka ブローカーのトピックパーティションの数を設定できます。

前提条件

  • OpenShift Container Platform でクラスターまたは専用の管理者パーミッションを持っている。
  • OpenShift Serverless Operator、Knative Eventing、および KnativeKafka カスタムリソース (CR) が OpenShift Container Platform クラスターにインストールされている。
  • OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
  • OpenShift CLI (oc) がインストールされている。

手順

  1. kafka-broker-config ConfigMap を変更するか、以下の設定が含まれる独自の ConfigMap を作成します。

    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: <config_map_name> 1
      namespace: <namespace> 2
    data:
      default.topic.partitions: <integer> 3
      default.topic.replication.factor: <integer> 4
      bootstrap.servers: <list_of_servers> 5
    1
    ConfigMap 名。
    2
    ConfigMap が存在する namespace。
    3
    Kafka ブローカーのトピックパーティションの数。これは、イベントをブローカーに送信する速度を制御します。パーティションが多い場合には、コンピュートリソースが多く必要です。
    4
    トピックメッセージのレプリケーション係数。これにより、データ損失を防ぐことができます。レプリケーション係数を増やすには、より多くのコンピュートリソースとストレージが必要になります。
    5
    ブートストラップサーバーのコンマ区切りリスト。これは、OpenShift Container Platform クラスターの内部または外部にある可能性があり、ブローカーがイベントを受信してイベントを送信する Kafka クラスターのリストです。
    重要

    default.topic.replication.factor の値は、クラスター内の Kafka ブローカーインスタンスの数以下である必要があります。たとえば、Kafka ブローカーが 1 つしかない場合、default.topic.replication.factor の値は "1" より大きな値にすることはできません。

    Kafka ブローカーの ConfigMap の例

    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: kafka-broker-config
      namespace: knative-eventing
    data:
      default.topic.partitions: "10"
      default.topic.replication.factor: "3"
      bootstrap.servers: "my-cluster-kafka-bootstrap.kafka:9092"

  2. ConfigMap を適用します。

    $ oc apply -f <config_map_filename>
  3. Kafka Broker オブジェクトの ConfigMap を指定します。

    Broker オブジェクトの例

    apiVersion: eventing.knative.dev/v1
    kind: Broker
    metadata:
      name: <broker_name> 1
      namespace: <namespace> 2
      annotations:
        eventing.knative.dev/broker.class: Kafka 3
    spec:
      config:
        apiVersion: v1
        kind: ConfigMap
        name: <config_map_name> 4
        namespace: <namespace> 5
    ...

    1
    ブローカー名。
    2
    ブローカーが存在する namespace。
    3
    ブローカークラスアノテーション。この例では、ブローカーはクラス値 Kafka を使用する Kafka ブローカーです。
    4
    ConfigMap 名。
    5
    ConfigMap が存在する namespace。
  4. ブローカーを適用します。

    $ oc apply -f <broker_filename>

4.6.3. Apache Kafka の Knative ブローカー実装のセキュリティー設定

Kafka クラスターは、通常、TLS または SASL 認証方法を使用して保護されます。TLS または SASL を使用して、保護された Red Hat AMQ Streams クラスターに対して動作するように Kafka ブローカーまたはチャネルを設定できます。

注記

Red Hat は、SASL と TLS の両方を一緒に有効にすることを推奨します。

4.6.3.1. Apache Kafka ブローカーの TLS 認証の設定

Transport Layer Security (TLS) は、Apache Kafka クライアントおよびサーバーによって、Knative と Kafka 間のトラフィックを暗号化するため、および認証のために使用されます。TLS は、Apache Kafka の Knative ブローカー実装でサポートされている唯一のトラフィック暗号化方式です。

前提条件

  • OpenShift Container Platform でクラスターまたは専用の管理者パーミッションを持っている。
  • OpenShift Serverless Operator、Knative Eventing、および KnativeKafka CR は、OpenShift Container Platform クラスターにインストールされている。
  • OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
  • .pem ファイルとして Kafka クラスター CA 証明書が保存されている。
  • Kafka クラスタークライアント証明書とキーが .pem ファイルとして保存されている。
  • OpenShift CLI (oc) がインストールされている。

手順

  1. 証明書ファイルを knative-eventing namespace にシークレットファイルとして作成します。

    $ oc create secret -n knative-eventing generic <secret_name> \
      --from-literal=protocol=SSL \
      --from-file=ca.crt=caroot.pem \
      --from-file=user.crt=certificate.pem \
      --from-file=user.key=key.pem
    重要

    キー名に ca.crtuser.crt、および user.key を使用します。これらの値は変更しないでください。

  2. KnativeKafka CR を編集し、broker 仕様にシークレットへの参照を追加します。

    apiVersion: operator.serverless.openshift.io/v1alpha1
    kind: KnativeKafka
    metadata:
      namespace: knative-eventing
      name: knative-kafka
    spec:
      broker:
        enabled: true
        defaultConfig:
          authSecretName: <secret_name>
    ...

4.6.3.2. Apache Kafka ブローカーの SASL 認証の設定

Simple Authentication and Security Layer (SASL) は、Apache Kafka が認証に使用します。クラスターで SASL 認証を使用する場合、ユーザーは Kafka クラスターと通信するために Knative に認証情報を提供する必要があります。そうしないと、イベントを生成または消費できません。

前提条件

  • OpenShift Container Platform でクラスターまたは専用の管理者パーミッションを持っている。
  • OpenShift Serverless Operator、Knative Eventing、および KnativeKafka CR は、OpenShift Container Platform クラスターにインストールされている。
  • OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
  • Kafka クラスターのユーザー名およびパスワードがある。
  • 使用する SASL メカニズムを選択している (例: PLAINSCRAM-SHA-256、または SCRAM-SHA-512)。
  • TLS が有効になっている場合は、Kafka クラスターの ca.crt 証明書ファイルがある。
  • OpenShift CLI (oc) がインストールされている。

手順

  1. 証明書ファイルを knative-eventing namespace にシークレットファイルとして作成します。

    $ oc create secret -n knative-eventing generic <secret_name> \
      --from-literal=protocol=SASL_SSL \
      --from-literal=sasl.mechanism=<sasl_mechanism> \
      --from-file=ca.crt=caroot.pem \
      --from-literal=password="SecretPassword" \
      --from-literal=user="my-sasl-user"
    • キー名に ca.crtpassword、および sasl.mechanism を使用します。これらの値は変更しないでください。
    • パブリック CA 証明書で SASL を使用する場合は、シークレットの作成時に ca.crt 引数ではなく tls.enabled=true フラグを使用する必要があります。以下に例を示します。

      $ oc create secret -n <namespace> generic <kafka_auth_secret> \
        --from-literal=tls.enabled=true \
        --from-literal=password="SecretPassword" \
        --from-literal=saslType="SCRAM-SHA-512" \
        --from-literal=user="my-sasl-user"
  2. KnativeKafka CR を編集し、broker 仕様にシークレットへの参照を追加します。

    apiVersion: operator.serverless.openshift.io/v1alpha1
    kind: KnativeKafka
    metadata:
      namespace: knative-eventing
      name: knative-kafka
    spec:
      broker:
        enabled: true
        defaultConfig:
          authSecretName: <secret_name>
    ...

4.6.4. 関連情報