2.5. Kafka 接続内でのデータへの操作の適用

Kamelet と Kafka トピック間で渡されるデータで操作を実行する場合は、Kamelet Binding 内の中間ステップとして、アクション Kamelets を使用します。

2.5.1. 異なる宛先トピックへのイベントデータのルーティング

Kafka インスタンスへのコネクションを設定する場合、イベントが異なる Kafka トピックにルーティングされるように、オプションでイベントデータからのトピック情報を変換できます。以下の変換アクション Kamelets のいずれかを使用します。

  • Regex Router: 正規表現と代替文字列を使用してメッセージのトピックを変更します。たとえば、トピック接頭辞を削除する場合は、接頭辞を追加するか、トピック名の一部を削除します。Regex Router Action Kamelet (regex-router-action) を設定します。
  • TimeStamp: 元のトピックとメッセージのタイムスタンプに基づいてメッセージのトピックを変更します。たとえば、タイムスタンプに基づいて異なるテーブルまたはインデックスに書き込む必要があるシンクを使用する場合などです。たとえば、Kafka から Elasticsearch にイベントを書きみ、各イベントはイベント自体の情報に基づいて異なるインデックスに移動する必要がある場合。Timestamp Router Action Kamelet(timestamp-router-action) を設定します。
  • Message TimeStamp: 元のトピック値とメッセージ値フィールドからのタイムスタンプフィールドに基づいてメッセージのトピックを変更します。Message Timestamp Router Action Kamelet(message-timestamp-router-action) を設定します。
  • 述語: Predicate Filter Action Kamelet (predicate-filter-action) を設定して、指定の JSON パス式に基づいてイベントをフィルターします。

前提条件

手順

宛先トピックを変更するには、Kamelet Binding 内の中間ステップとして変更アクション Kamelets の 1 つを使用します。

アクション Kamelet を Kamelet Binding に追加する方法は、Kamelet Binding への操作の追加 を参照してください。

2.5.2. 特定の Kafka トピックのイベントデータの絞り込み

多くの異なる Kafka トピックにレコードを生成するソース Kamelet を使用し、レコードを 1 つの Kafka トピックに絞り込む場合は、Kamelet Binding の中間ステップとして topic-name-matches-filter-action Kamelet を追加します。

前提条件

  • YAML ファイルに Kamelet Binding を作成している。
  • イベンデータを絞り込む Kafka トピックの名前を知っておく必要があります。

手順

  1. Kamelet Binding を編集して、ソースとシンク Kamelets の間の中間ステップとして topic-name-matches-filter-action Kamelet を追加します。

    通常、ソース Kamelet として kafka-source Kamelet を使用し、トピックを必要な topic パラメーターの値として指定します。

    以下の Kamelet Binding の例では、kafka-source Kamelet は test-topic、test-topic-2、および test-topic-3 Kafka トピックを指定し、topic-name-matches-filter-action Kamelet は、topic-test トピックからイベントデータをフィルターするように指定します。

    apiVersion: camel.apache.org/v1alpha1
    kind: KameletBinding
    metadata:
      name: kafka-to-log-by-topic
    spec:
      source:
        ref:
          kind: Kamelet
          apiVersion: camel.apache.org/v1alpha1
          name: kafka-source
        properties:
          bootstrapServers: "broker.url:9092"
          password: "testpassword"
          topic: "test-topic, test-topic-2, test-topic-3"
          user: "testuser"
          securityProtocol: "PLAINTEXT" // only for AMQ streams
    steps:
      - ref:
          kind: Kamelet
          apiVersion: camel.apache.org/v1alpha1
          name: topic-name-matches-filter-action
        properties:
          regex:  "test-topic"
    sink:
        ref:
          kind: Kamelet
          apiVersion: camel.apache.org/v1alpha1
          name: log-sink
        properties:
            showStreams: true

    kafka-source Kamelet 以外のソース Kamelet からのトピックをフィルターリングする場合は、Kafka トピック情報を指定する必要があります。以下の例のように、insert-header-action Kamelet を使用して、Kamelet Binding の topic-name-matches-filter-action ステップの前に Kafka トピックフィールドを中間ステップとして追加できます。

    apiVersion: camel.apache.org/v1alpha1
    kind: KameletBinding
    metadata:
      name: coffee-to-log-by-topic
    spec:
      source:
        ref:
          kind: Kamelet
          apiVersion: camel.apache.org/v1alpha1
          name: coffee-source
        properties:
          period: 5000
    steps:
      - ref:
          kind: Kamelet
          apiVersion: camel.apache.org/v1alpha1
          name: insert-header-action
        properties:
          name:  "KAFKA.topic"
          value:  "test-topic"
      - ref:
          kind: Kamelet
          apiVersion: camel.apache.org/v1alpha1
          name: topic-name-matches-filter-action
        properties:
          regex:  "test-topic"
    sink:
        ref:
          kind: Kamelet
          apiVersion: camel.apache.org/v1alpha1
          name: log-sink
        properties:
          showStreams: true
  2. Kamelet Binding YAML ファイルを保存します。