2.3. Kamelet Bindingでのデータソースの Kafka トピックへの接続

データソースを Kafka トピックに接続するには、図 2.2で説明されているように Kamelet Bindingを作成します。

Connecting a data source to a Kafka topic 図 2.2 データソースの Kafka トピックへの接続

前提条件

  • イベントの送信先となる Kafka トピックの名前を知っておく必要があります。

    この手順の例では、イベントを受信するために test-topic を使用します。

  • Kafka インスタンスの以下のパラメーターの値を知っている必要があります。

    • bootstrapServers: Kafka Broker URL のコンマ区切りリスト
    • password: Kafka に対して認証を行うためのパスワード。OpenShift Streams では、これは credentials.json ファイル passwordです。AMQ Streams の認証されていない kafka インスタンスでは、任意の空でない文字列を指定できます。
    • user: Kafka に対して認証するユーザー名。OpenShift Streams では、これは credentials.json ファイル clientIDです。AMQ Streams の認証されていない kafka インスタンスでは、任意の空でない文字列を指定できます。

      OpenShift Streams の使用時にこれらの値を取得する方法は、「 Kafka クレデンシャルの取得 」を参照してください。

    • securityProtocol: Kafka ブローカーと通信するためのセキュリティープロトコルを知っている必要があります。OpenShift Streams の Kafka クラスターでは、SASL_SSL (デフォルト) です。AMQ Streams上の Kafka クラスターでは、PLAINTEXT になります。
  • Camel K インテグレーションに追加する Kamelets と必要なインスタンスパラメーターを知っている必要があります。

    この手順の Kamelets の例は次のとおりです。

    • coffee-source Kamelet:各イベントを送信する頻度を指定するオプションのパラメーター period があります。Example source Kamelet から coffee-source.kamelet.yaml という名前のファイルにコードをコピーしてから、以下のコマンドを実行してこれをリソースとして namespace に追加できます。

      oc apply -f coffee-source.kamelet.yaml

    • Kamelet Catalog で提供される kafka-sink Kamelet。Kafka トピックがこのバインディングでデータ(データコンシューマー)を受信しているため、kafka-sink Kamelet を使用します。

手順

データソースを Kafka トピックに接続するには、Kamelet Bindingを作成します。

  1. 任意のエディターで、以下の基本構造で YAML ファイルを作成します。

    apiVersion: camel.apache.org/v1alpha1
    kind: KameletBinding
    metadata:
      name:
    spec:
      source:
      sink:
  2. Kamelet Bindingの名前を追加します。この例では、バインディングが coffee-source Kamelet を kafka-sink Kamelet に接続するため、名前は coffees-to-kafka になります。

    apiVersion: camel.apache.org/v1alpha1
    kind: KameletBinding
    metadata:
      name: coffees-to-kafka
    spec:
      source:
      sink:
  3. Kamelet Bindingのソースの場合は、データソース Kamelet を指定し (たとえば、coffee-source Kamelet はコーヒーに関するデータが含まれるイベントを生成します)、Kamelet のパラメーターを設定します。

    apiVersion: camel.apache.org/v1alpha1
    kind: KameletBinding
    metadata:
      name: coffees-to-kafka
    spec:
      source:
        ref:
          kind: Kamelet
          apiVersion: camel.apache.org/v1alpha1
          name: coffee-source
        properties:
          period: 5000
      sink:
  4. Kamelet Binding のシンクの場合は、kafka-sink Kamelet およびその必要なプロパティを指定します。

    たとえば、Kafka クラスターが OpenShift Streams にある場合、以下を実行します。

    • user プロパティーに clientID を指定します(例: srvc-acct-eb575691-b94a-41f1-ab97-50ade0cd1094)。
    • password プロパティーでは、パスワード を指定します(例: facf3df1-3c8d-4253-aa87-8c95ca5e1225)。
    • securityProtocol プロパティーを設定する必要はありません。

      apiVersion: camel.apache.org/v1alpha1
      kind: KameletBinding
      metadata:
        name: coffees-to-kafka
      spec:
        source:
          ref:
            kind: Kamelet
            apiVersion: camel.apache.org/v1alpha1
            name: coffee-source
          properties:
            period: 5000
        sink:
          ref:
            kind: Kamelet
            apiVersion: camel.apache.org/v1alpha1
            name: kafka-sink
          properties:
            bootstrapServers: "my-kafka--ptdfzrhmlkwqw-a-ykm-mawgdh.kafka.devshift.org:443"
            password: "facf3df1-3c8d-4253-aa87-8c95ca5e1225"
            topic: "test-topic"
            user: "srvc-acct-eb575691-b94a-41f1-ab97-50ade0cd1094"

      別の例として、Kafka クラスターが AMQ Streams にある場合は、securityProtocol プロパティーを "PLAINTEXT" に設定します。

      apiVersion: camel.apache.org/v1alpha1
      kind: KameletBinding
      metadata:
        name: coffees-to-kafka
      spec:
        source:
          ref:
            kind: Kamelet
            apiVersion: camel.apache.org/v1alpha1
            name: coffee-source
          properties:
            period: 5000
        sink:
          ref:
            kind: Kamelet
            apiVersion: camel.apache.org/v1alpha1
            name: kafka-sink
          properties:
            bootstrapServers: "broker.url:9092"
            password: "testpassword"
            topic: "test-topic"
            user: "testuser"
            securityProtocol: "PLAINTEXT"
  5. YAML ファイルを保存します(例:coffees-to-kafka.yaml)。
  6. OpenShift プロジェクトにログインします。
  7. Kamelet Binding をリソースとして OpenShift namespace に追加します。

    oc apply -f <kamelet binding filename>

    以下に例を示します。

    oc apply -f coffees-to-kafka.yaml

    Camel K Operator は、KameletBinding リソースを使用して Camel K インテグレーションを生成し、実行します。ビルドに数分かかる場合があります。

  8. KameletBinding リソースのステータスを表示するには、次のコマンドを実行します。

    oc get kameletbindings

  9. インテグレーションの状態を表示するには、以下を実行します。

    oc get integrations

  10. インテグレーションのログを表示するには、以下を実行します。

    kamel logs <integration> -n <project>

    以下に例を示します。

    kamel logs coffees-to-kafka -n my-camel-k-kafka