2.5.2. 为特定 Kafka 主题过滤事件数据

如果您使用一个源 Kamelet,它会将记录扩展到多个不同的 Kafka 主题,并想将记录过滤到一个 Kafka 主题,请添加 topic-name-matches-filter-action Kamelet 作为 Kamelet 中的 intermediary 步骤。

先决条件

  • 您已在 YAML 文件中创建了一个 Kamelet Binding。
  • 您知道要从中过滤事件数据的 Kafka 主题的名称。

流程

  1. 编辑 Kamelet Binding,将 topic-name-matches-filter-action Kamelet 作为源与 sink Kamelets 之间的中间步骤包括在内。

    通常,您可以使用 kafka-source Kamelet,作为源 Kamelet,您提供一个主题作为所需 主题 参数的值。

    在以下 Kamelet Binding 示例中,kafka-source Kamelet 指定 test-topic、test-topic-2 和 test-topic-3 Kafka 主题和 topic-name-matches-filter-action Kamelet 指定来过滤来自 topic-test 主题的事件数据:

    apiVersion: camel.apache.org/v1alpha1
    kind: KameletBinding
    metadata:
      name: kafka-to-log-by-topic
    spec:
      source:
        ref:
          kind: Kamelet
          apiVersion: camel.apache.org/v1alpha1
          name: kafka-source
        properties:
          bootstrapServers: "broker.url:9092"
          password: "testpassword"
          topic: "test-topic, test-topic-2, test-topic-3"
          user: "testuser"
          securityProtocol: "PLAINTEXT" // only for AMQ streams
    steps:
      - ref:
          kind: Kamelet
          apiVersion: camel.apache.org/v1alpha1
          name: topic-name-matches-filter-action
        properties:
          regex:  "test-topic"
    sink:
        ref:
          kind: Kamelet
          apiVersion: camel.apache.org/v1alpha1
          name: log-sink
        properties:
            showStreams: true

    如果要过滤来自 kafka-source Kamelet 以外的源 Kamelet 的主题,您必须提供 Kafka 主题信息。您可以使用 insert-header-action Kamelet 添加 Kafka topic 字段作为中间步骤,然后再在 Kamelet Binding 中的 topic-name-matches-filter-action 步骤之前,如下例所示:

    apiVersion: camel.apache.org/v1alpha1
    kind: KameletBinding
    metadata:
      name: coffee-to-log-by-topic
    spec:
      source:
        ref:
          kind: Kamelet
          apiVersion: camel.apache.org/v1alpha1
          name: coffee-source
        properties:
          period: 5000
    steps:
      - ref:
          kind: Kamelet
          apiVersion: camel.apache.org/v1alpha1
          name: insert-header-action
        properties:
          name:  "KAFKA.topic"
          value:  "test-topic"
      - ref:
          kind: Kamelet
          apiVersion: camel.apache.org/v1alpha1
          name: topic-name-matches-filter-action
        properties:
          regex:  "test-topic"
    sink:
        ref:
          kind: Kamelet
          apiVersion: camel.apache.org/v1alpha1
          name: log-sink
        properties:
          showStreams: true
  2. 保存 Kamelet Binding YAML 文件。