2.5.2. 特定の Kafka トピックのイベントデータの絞り込み

多くの異なる Kafka トピックにレコードを生成するソース Kamelet を使用し、レコードを 1 つの Kafka トピックに絞り込む場合は、Kamelet Binding の中間ステップとして topic-name-matches-filter-action Kamelet を追加します。

前提条件

  • YAML ファイルに Kamelet Binding を作成している。
  • イベンデータを絞り込む Kafka トピックの名前を知っておく必要があります。

手順

  1. Kamelet Binding を編集して、ソースとシンク Kamelets の間の中間ステップとして topic-name-matches-filter-action Kamelet を追加します。

    通常、ソース Kamelet として kafka-source Kamelet を使用し、トピックを必要な topic パラメーターの値として指定します。

    以下の Kamelet Binding の例では、kafka-source Kamelet は test-topic、test-topic-2、および test-topic-3 Kafka トピックを指定し、topic-name-matches-filter-action Kamelet は、topic-test トピックからイベントデータをフィルターするように指定します。

    apiVersion: camel.apache.org/v1alpha1
    kind: KameletBinding
    metadata:
      name: kafka-to-log-by-topic
    spec:
      source:
        ref:
          kind: Kamelet
          apiVersion: camel.apache.org/v1alpha1
          name: kafka-source
        properties:
          bootstrapServers: "broker.url:9092"
          password: "testpassword"
          topic: "test-topic, test-topic-2, test-topic-3"
          user: "testuser"
          securityProtocol: "PLAINTEXT" // only for AMQ streams
    steps:
      - ref:
          kind: Kamelet
          apiVersion: camel.apache.org/v1alpha1
          name: topic-name-matches-filter-action
        properties:
          regex:  "test-topic"
    sink:
        ref:
          kind: Kamelet
          apiVersion: camel.apache.org/v1alpha1
          name: log-sink
        properties:
            showStreams: true

    kafka-source Kamelet 以外のソース Kamelet からのトピックをフィルターリングする場合は、Kafka トピック情報を指定する必要があります。以下の例のように、insert-header-action Kamelet を使用して、Kamelet Binding の topic-name-matches-filter-action ステップの前に Kafka トピックフィールドを中間ステップとして追加できます。

    apiVersion: camel.apache.org/v1alpha1
    kind: KameletBinding
    metadata:
      name: coffee-to-log-by-topic
    spec:
      source:
        ref:
          kind: Kamelet
          apiVersion: camel.apache.org/v1alpha1
          name: coffee-source
        properties:
          period: 5000
    steps:
      - ref:
          kind: Kamelet
          apiVersion: camel.apache.org/v1alpha1
          name: insert-header-action
        properties:
          name:  "KAFKA.topic"
          value:  "test-topic"
      - ref:
          kind: Kamelet
          apiVersion: camel.apache.org/v1alpha1
          name: topic-name-matches-filter-action
        properties:
          regex:  "test-topic"
    sink:
        ref:
          kind: Kamelet
          apiVersion: camel.apache.org/v1alpha1
          name: log-sink
        properties:
          showStreams: true
  2. Kamelet Binding YAML ファイルを保存します。