5.2.5. 部署示例 KafkaConnector 资源

AMQ Streams 在 example /connect/source-connector.yaml 中包含一个 KafkaConnect or 示例。这会创建一个基本的 FileStreamSourceConnector 实例,它将 Kafka 许可证文件的每一行(一个文件源示例)发送到单个 Kafka 主题。

这个步骤描述了如何创建:

  • 从 Kafka 许可证文件(源)读取数据的 FileStreamSourceConnector,并将数据作为消息写入 Kafka 主题。
  • 从 Kafka 主题读取信息并将消息写入临时文件( sink)的 FileStreamSinkConnector
注意

在生产环境中,您要准备包含所需 Kafka Connect 连接器的容器镜像,如 第 5.2.3 节 “使用连接器插件扩展 Kafka Connect” 所述。

提供 FileStreamSourceConnectorFileStreamSinkConnector 作为示例。如此处所述,在容器中运行这些连接器不太可能适合生产用例。

先决条件

流程

  1. 编辑 example /connect/source-connector.yaml 文件:

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnector
    metadata:
      name: my-source-connector 1
      labels:
        strimzi.io/cluster: my-connect-cluster 2
    spec:
      class: org.apache.kafka.connect.file.FileStreamSourceConnector 3
      tasksMax: 2 4
      config: 5
        file: "/opt/kafka/LICENSE" 6
        topic: my-topic 7
        # ...
    1
    KafkaConnector 资源的名称,用作连接器的名称。使用对 OpenShift 资源有效的任何名称。
    2
    用于在其中创建连接器实例的 Kafka Connect 集群的名称。连接器必须部署到与其链接的 Kafka Connect 集群相同的命名空间。
    3
    连接器类的全名或别名.这应该存在于由 Kafka Connect 集群使用的镜像中。
    4
    连接器可以创建的 Kafka Connect 任务 的最大数量。
    5
    连接器配置,作为键值对.
    6
    这个示例源连接器配置从 /opt/kafka/LICENSE 文件中读取数据。
    7
    将源数据发布到的 Kafka 主题。
  2. 在 OpenShift 集群中创建源 KafkaConnector

    oc apply -f examples/connect/source-connector.yaml
  3. 创建一个 示例/connect/sink-connector.yaml 文件:

    touch examples/connect/sink-connector.yaml
  4. 将以下 YAML 粘贴到 sink-connector.yaml 文件中:

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnector
    metadata:
      name: my-sink-connector
      labels:
        strimzi.io/cluster: my-connect
    spec:
      class: org.apache.kafka.connect.file.FileStreamSinkConnector 1
      tasksMax: 2
      config: 2
        file: "/tmp/my-file" 3
        topics: my-topic 4
    1
    连接器类的全名或别名.这应该存在于由 Kafka Connect 集群使用的镜像中。
    2
    连接器配置,作为键值对.
    3
    发布源数据的临时文件。
    4
    从中读取源数据的 Kafka 主题。
  5. 在 OpenShift 集群中创建 sink KafkaConnector

    oc apply -f examples/connect/sink-connector.yaml
  6. 检查是否创建了连接器资源:

    oc get kctr --selector strimzi.io/cluster=MY-CONNECT-CLUSTER -o name
    
    my-source-connector
    my-sink-connector

    MY-CONNECT-CLUSTER 替换为您的 Kafka 连接集群。

  7. 在容器中,执行 kafka-console-consumer.sh 读取由源连接器写入主题的消息:

    oc exec MY-CLUSTER-kafka-0 -i -t -- bin/kafka-console-consumer.sh --bootstrap-server MY-CLUSTER-kafka-bootstrap.NAMESPACE.svc:9092 --topic my-topic --from-beginning
源和接收器连接器配置选项

连接器配置在 KafkaConnector 资源的 spec.config 属性中定义。

FileStreamSourceConnectorFileStreamSinkConnector 类支持与 Kafka Connect REST API 相同的配置选项。其他连接器支持不同的配置选项。

表 5.2. FileStreamSource 连接器类的配置选项

名称类型默认值描述

file

字符串

null

可将消息写入到的源文件。如果没有指定,则使用标准输入。

主题

list

null

发布数据的 Kafka 主题。

表 5.3. FileStreamSinkConnector 类的配置选项

名称类型默认值描述

file

字符串

null

要写入消息的目标文件。如果没有指定,则使用标准输出。

主题

list

null

一个或多个 Kafka 主题,从中读取数据。

topics.regex

字符串

null

与一个或多个 Kafka 主题匹配以从中读取数据的正则表达式。