2.5. Apache Kafka のソース

Apache Kafka クラスターからイベントを読み取り、これらのイベントをシンクに渡す Apache Kafka ソースを作成できます。Kafka ソースを作成するには、OpenShift Container Platform Web コンソールの Knative (kn) CLI を使用するか、KafkaSource オブジェクトを YAML ファイルとして直接作成し、OpenShift CLI (oc) を使用して適用します。

2.5.1. Web コンソールを使用した Apache Kafka イベントソースの作成

Apache Kafka の Knative ブローカー実装がクラスターにインストールされたら、Web コンソールを使用して Apache Kafka ソースを作成できます。OpenShift Container Platform Web コンソールを使用すると、Kafka ソースを作成するための合理的で直感的なユーザーインターフェイスが提供されます。

前提条件

  • OpenShift Serverless Operator、Knative Serving、および KnativeKafka カスタムリソースがクラスターにインストールされている。
  • Web コンソールにログインしている。
  • インポートする Kafka メッセージを生成する Red Hat AMQ Streams (Kafka) クラスターにアクセスできる。
  • OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。

手順

  1. Developer パースペクティブで、+Add ページに移動し、Event Source を選択します。
  2. Event Sources ページで、Type セクションの Kafka Source を選択します。
  3. Kafka Source 設定を設定します。

    1. ブートストラップサーバー のコンマ区切りのリストを追加します。
    2. トピック のコンマ区切りのリストを追加します。
    3. コンシューマーグループ を追加します。
    4. 作成したサービスアカウントの Service Account Name を選択します。
    5. Target セクションで、イベントシンクを選択します。これは Resource または URI のいずれかです。

      1. Resource を選択して、チャネル、ブローカー、またはサービスをイベントソースのシンクとして使用します。
      2. URI を選択して、イベントのルーティング先となる URI (Uniform Resource Identifier) を指定します。
    6. Kafka イベントソースの Name を入力します。
  4. Create をクリックします。

検証

Topology ページを表示して、Kafka イベントソースが作成され、シンクに接続されていることを確認できます。

  1. Developer パースペクティブで、Topology に移動します。
  2. Kafka イベントソースおよびシンクを表示します。

    Topology ビューでの Kafka ソースおよびサービスの表示

2.5.2. Knative CLI を使用した Apache Kafka イベントソースの作成

kn source kafka create コマンドを使用し、Knative (kn) CLI を使用して Kafka ソースを作成できます。Knative CLI を使用してイベントソースを作成すると、YAML ファイルを直接変更するよりも合理化された直感的なユーザーインターフェイスが提供されます。

前提条件

  • OpenShift Serverless Operator、Knative Eventing、Knative Serving、および KnativeKafka カスタムリソース (CR) がクラスターにインストールされている。
  • OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
  • インポートする Kafka メッセージを生成する Red Hat AMQ Streams (Kafka) クラスターにアクセスできる。
  • Knative (kn) CLI がインストールされている。
  • オプション: この手順で検証ステップを使用する場合は、OpenShift CLI (oc) をインストールしている。

手順

  1. Kafka イベントソースが機能していることを確認するには、受信メッセージをサービスのログにダンプする Knative サービスを作成します。

    $ kn service create event-display \
        --image quay.io/openshift-knative/showcase
  2. KafkaSource CR を作成します。

    $ kn source kafka create <kafka_source_name> \
        --servers <cluster_kafka_bootstrap>.kafka.svc:9092 \
        --topics <topic_name> --consumergroup my-consumer-group \
        --sink event-display
    注記

    このコマンドのプレースホルダー値は、ソース名、ブートストラップサーバー、およびトピックの値に置き換えます。

    --servers--topics、および --consumergroup オプションは、Kafka クラスターへの接続パラメーターを指定します。--consumergroup オプションは任意です。

  3. オプション: 作成した KafkaSource CR の詳細を表示します。

    $ kn source kafka describe <kafka_source_name>

    出力例

    Name:              example-kafka-source
    Namespace:         kafka
    Age:               1h
    BootstrapServers:  example-cluster-kafka-bootstrap.kafka.svc:9092
    Topics:            example-topic
    ConsumerGroup:     example-consumer-group
    
    Sink:
      Name:       event-display
      Namespace:  default
      Resource:   Service (serving.knative.dev/v1)
    
    Conditions:
      OK TYPE            AGE REASON
      ++ Ready            1h
      ++ Deployed         1h
      ++ SinkProvided     1h

検証手順

  1. Kafka インスタンスをトリガーし、メッセージをトピックに送信します。

    $ oc -n kafka run kafka-producer \
        -ti --image=quay.io/strimzi/kafka:latest-kafka-2.7.0 --rm=true \
        --restart=Never -- bin/kafka-console-producer.sh \
        --broker-list <cluster_kafka_bootstrap>:9092 --topic my-topic

    プロンプトにメッセージを入力します。このコマンドは、以下を前提とします。

    • Kafka クラスターが kafka namespace にインストールされている。
    • KafkaSource オブジェクトが my-topic トピックを使用するように設定されている。
  2. ログを表示して、メッセージが到達していることを確認します。

    $ oc logs $(oc get pod -o name | grep event-display) -c user-container

    出力例

    ☁️  cloudevents.Event
    Validation: valid
    Context Attributes,
      specversion: 1.0
      type: dev.knative.kafka.event
      source: /apis/v1/namespaces/default/kafkasources/example-kafka-source#example-topic
      subject: partition:46#0
      id: partition:46/offset:0
      time: 2021-03-10T11:21:49.4Z
    Extensions,
      traceparent: 00-161ff3815727d8755848ec01c866d1cd-7ff3916c44334678-00
    Data,
      Hello!

2.5.2.1. Knative CLI シンクフラグ

Knative (kn) CLI を使用してイベントソースを作成する場合は、--sink フラグを使用して、そのリソースからイベントが送信されるシンクを指定できます。シンクは、他のリソースから受信イベントを受信できる、アドレス指定可能または呼び出し可能な任意のリソースです。

以下の例では、サービスの http://event-display.svc.cluster.local をシンクとして使用するシンクバインディングを作成します。

シンクフラグを使用したコマンドの例

$ kn source binding create bind-heartbeat \
  --namespace sinkbinding-example \
  --subject "Job:batch/v1:app=heartbeat-cron" \
  --sink http://event-display.svc.cluster.local \ 1
  --ce-override "sink=bound"

1
http://event-display.svc.cluster.localsvc は、シンクが Knative サービスであることを判別します。他のデフォルトのシンクの接頭辞には、channel および broker が含まれます。

2.5.3. YAML を使用した Apache Kafka イベントソースの作成

YAML ファイルを使用して Knative リソースを作成する場合は、宣言的 API を使用するため、再現性の高い方法でアプリケーションを宣言的に記述できます。YAML を使用して Kafka ソースを作成するには、KafkaSource オブジェクトを定義する YAML ファイルを作成し、oc apply コマンドを使用してそれを適用する必要があります。

前提条件

  • OpenShift Serverless Operator、Knative Serving、および KnativeKafka カスタムリソースがクラスターにインストールされている。
  • OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
  • インポートする Kafka メッセージを生成する Red Hat AMQ Streams (Kafka) クラスターにアクセスできる。
  • OpenShift CLI (oc) がインストールされている。

手順

  1. KafkaSource オブジェクトを YAML ファイルとして作成します。

    apiVersion: sources.knative.dev/v1beta1
    kind: KafkaSource
    metadata:
      name: <source_name>
    spec:
      consumerGroup: <group_name> 1
      bootstrapServers:
      - <list_of_bootstrap_servers>
      topics:
      - <list_of_topics> 2
      sink:
      - <list_of_sinks> 3
    1
    コンシューマーグループは、同じグループ ID を使用し、トピックからデータを消費するコンシューマーのグループです。
    2
    トピックは、データの保存先を提供します。各トピックは、1 つまたは複数のパーティションに分割されます。
    3
    シンクは、イベントがソースから送信される場所を指定します。
    重要

    OpenShift Serverless 上の KafkaSource オブジェクトの API の v1beta1 バージョンのみがサポートされます。非推奨となった v1alpha1 バージョンの API は使用しないでください。

    KafkaSource オブジェクトの例

    apiVersion: sources.knative.dev/v1beta1
    kind: KafkaSource
    metadata:
      name: kafka-source
    spec:
      consumerGroup: knative-group
      bootstrapServers:
      - my-cluster-kafka-bootstrap.kafka:9092
      topics:
      - knative-demo-topic
      sink:
        ref:
          apiVersion: serving.knative.dev/v1
          kind: Service
          name: event-display

  2. KafkaSource YAML ファイルを適用します。

    $ oc apply -f <filename>

検証

  • 以下のコマンドを入力して、Kafka イベントソースが作成されたことを確認します。

    $ oc get pods

    出力例

    NAME                                    READY     STATUS    RESTARTS   AGE
    kafkasource-kafka-source-5ca0248f-...   1/1       Running   0          13m

2.5.4. Apache Kafka ソースの SASL 認証の設定

Simple Authentication and Security Layer (SASL) は、Apache Kafka が認証に使用します。クラスターで SASL 認証を使用する場合、ユーザーは Kafka クラスターと通信するために Knative に認証情報を提供する必要があります。そうしないと、イベントを生成または消費できません。

前提条件

  • OpenShift Container Platform でクラスターまたは専用の管理者パーミッションを持っている。
  • OpenShift Serverless Operator、Knative Eventing、および KnativeKafka CR は、OpenShift Container Platform クラスターにインストールされている。
  • OpenShift Container Platform でアプリケーションおよび他のワークロードを作成するために、プロジェクトを作成しているか、適切なロールおよびパーミッションを持つプロジェクトにアクセスできる。
  • Kafka クラスターのユーザー名およびパスワードがある。
  • 使用する SASL メカニズムを選択している (例: PLAINSCRAM-SHA-256、または SCRAM-SHA-512)。
  • TLS が有効になっている場合は、Kafka クラスターの ca.crt 証明書ファイルがある。
  • OpenShift (oc) CLI がインストールされている。

手順

  1. 選択された namespace にシークレットとして証明書ファイルを作成します。

    $ oc create secret -n <namespace> generic <kafka_auth_secret> \
      --from-file=ca.crt=caroot.pem \
      --from-literal=password="SecretPassword" \
      --from-literal=saslType="SCRAM-SHA-512" \ 1
      --from-literal=user="my-sasl-user"
    1
    SASL タイプは PLAINSCRAM-SHA-256、または SCRAM-SHA-512 です。
  2. Kafka ソースを作成または変更して、次の spec 設定が含まれるようにします。

    apiVersion: sources.knative.dev/v1beta1
    kind: KafkaSource
    metadata:
      name: example-source
    spec:
    ...
      net:
        sasl:
          enable: true
          user:
            secretKeyRef:
              name: <kafka_auth_secret>
              key: user
          password:
            secretKeyRef:
              name: <kafka_auth_secret>
              key: password
          type:
            secretKeyRef:
              name: <kafka_auth_secret>
              key: saslType
        tls:
          enable: true
          caCert: 1
            secretKeyRef:
              name: <kafka_auth_secret>
              key: ca.crt
    ...
    1
    パブリッククラウドの Kafka サービスを使用している場合は、caCert 仕様は必要ありません。