2.4. Kamelet Binding での Kafka トピックのデータシンクへの接続

Kafka トピックをデータシンクに接続するには、図 2.3 にあるように Kamelet Binding を作成します。

Connecting a Kafka topic to a data sink 図 2.3 Kafka トピックのデータシンクへの接続

前提条件

  • イベントの送信元となる Kafka トピックの名前を知っておく必要があります。この手順の例では、イベントを送信するために test-topic を使用します。これは、Kamelet Binding でのデータソースの Kafka トピックへの接続 でコーヒーソースからイベントを受信するのに使用したトピックと同じです。
  • Kafka インスタンスの以下のパラメーターの値を知っている必要があります。

    • bootstrapServers: Kafka Broker URL のコンマ区切りリスト
    • password: Kafka に対して認証を行うためのパスワード。
    • user: Kafka に対して認証するユーザー名。

      OpenShift Streams の使用時にこれらの値を取得する方法は、Kafka クレデンシャルの取得 を参照してください。

  • Kafka ブローカーとの通信のセキュリティープロトコルを把握している。OpenShift Streams の Kafka クラスターでは、SASL_SSL (デフォルト) です。AMQ Streams 上の Kafka クラスターでは、PLAINTEXT になります。
  • Camel K インテグレーションに追加する Kamelets と必要なインスタンスパラメーターを知っている必要があります。この手順の Kamelets の例は、Katmelet Catalog に記載されています。

    • kafka-source Kamelet: このバインディングでは Kafka トピックがデータを送信するため (データプロデューサー)、kafka-source Kamelet を使用します。必須パラメーターの値の例は次のとおりです。

      • bootstrapServers - "broker.url:9092"
      • password - "testpassword"
      • user - "testuser"
      • topic - "test-topic"
      • securityProtocol: OpenShift Streams の Kafka クラスターの場合には、SASL_SSL がデフォルト値であるため、このパラメーターを設定する必要はありません。AMQ ストリームの Kafka クラスターの場合は、このパラメーターの値は "PLAINTEXT" です。
    • log-sink Kamelet: log-sink を使用して、kafka-source Kamelet から受信するデータをログに記録します。必要に応じて、showStreams パラメーターを指定して、データのメッセージボディーを表示します。log-sink Kamelet は、デバッグに役立ちます。

手順

Kafka トピックをデータシンクに接続するには、Kamelet Binding を作成します。

  1. 任意のエディターで、以下の基本構造で YAML ファイルを作成します。

    apiVersion: camel.apache.org/v1alpha1
    kind: KameletBinding
    metadata:
      name:
    spec:
      source:
      sink:
  2. Kamelet Binding の名前を追加します。この例では、バインディングが kafka-source Kamelet を log-sink Kamelet に接続するため、名前は kafka-to-log になります。

    apiVersion: camel.apache.org/v1alpha1
    kind: KameletBinding
    metadata:
      name: kafka-to-log
    spec:
      source:
      sink:
  3. Kamelet Binding のソースの場合は、kafka-source Kamelet を指定し、そのパラメーターを設定します。

    たとえば、Kafka クラスターが OpenShift Streams にある場合 (securityProtocol パラメーターを設定する必要はありません)。

    apiVersion: camel.apache.org/v1alpha1
    kind: KameletBinding
    metadata:
      name: kafka-to-log
    spec:
      source:
        ref:
          kind: Kamelet
          apiVersion: camel.apache.org/v1alpha1
          name: kafka-source
        properties:
          bootstrapServers: "broker.url:9092"
          password: "testpassword"
          topic: "test-topic"
          user: "testuser"
      sink:

    たとえば、Kafka クラスターが AMQ Streams にある場合は、securityProtocol パラメーターを "PLAINTEXT" に設定する必要があります。

    apiVersion: camel.apache.org/v1alpha1
    kind: KameletBinding
    metadata:
      name: kafka-to-log
    spec:
      source:
        ref:
          kind: Kamelet
          apiVersion: camel.apache.org/v1alpha1
          name: kafka-source
        properties:
          bootstrapServers: "broker.url:9092"
          password: "testpassword"
          topic: "test-topic"
          user: "testuser"
          securityProtocol: "PLAINTEXT"
      sink:
  4. Kamelet Binding のシンクでは、データコンシューマー Kamelet (例: log-sink Kamelet) を指定し、Kamelet のパラメーターを設定します。以下に例を示します。

    apiVersion: camel.apache.org/v1alpha1
    kind: KameletBinding
    metadata:
      name: kafka-to-log
    spec:
      source:
        ref:
          kind: Kamelet
          apiVersion: camel.apache.org/v1alpha1
          name: kafka-source
        properties:
          bootstrapServers: "broker.url:9092"
          password: "testpassword"
          topic: "test-topic"
          user: "testuser"
          securityProtocol: "PLAINTEXT" // only for AMQ streams
      sink:
        ref:
          kind: Kamelet
          apiVersion: camel.apache.org/v1alpha1
          name: log-sink
        properties:
          showStreams: true
  5. YAML ファイルを保存します (例: kafka-to-log.yaml)。
  6. OpenShift プロジェクトにログインします。
  7. Kamelet Binding をリソースとして OpenShift namespace に追加します。

    oc apply -f <kamelet binding filename>

    以下に例を示します。

    oc apply -f kafka-to-log.yaml

    Camel K Operator は、KameletBinding リソースを使用して Camel K インテグレーションを生成し、実行します。ビルドに数分かかる場合があります。

  8. KameletBinding リソースのステータスを表示するには、次のコマンドを実行します。

    oc get kameletbindings

  9. インテグレーションの状態を表示するには、以下を実行します。

    oc get integrations

  10. インテグレーションのログを表示するには、以下を実行します。

    kamel logs <integration> -n <project>

    以下に例を示します。

    kamel logs kafka-to-log -n my-camel-k-kafka

    この出力では、以下の例のようにコーヒーイベントが表示されるはずです。

    INFO  [log-sink-E80C5C904418150-0000000000000001] (Camel (camel-1) thread #0 - timer://tick) {"id":7259,"uid":"a4ecb7c2-05b8-4a49-b0d2-d1e8db5bc5e2","blend_name":"Postmodern Symphony","origin":"Huila, Colombia","variety":"Kona","notes":"delicate, chewy, black currant, red apple, star fruit","intensifier":"balanced"}
  11. 実行中のインテグレーションを停止するには、関連付けられた Kamelet Binding リソースを削除します。

    oc delete kameletbindings/<kameletbinding-name>

    以下に例を示します。

    oc delete kameletbindings/kafka-to-log