5.2.5. 部署示例 KafkaConnector 资源
AMQ Streams 在 example
。这会创建一个基本的 /connect/source-connector.yaml 中包含一个 KafkaConnect
or 示例FileStreamSourceConnector
实例,它将 Kafka 许可证文件的每一行(一个文件源示例)发送到单个 Kafka 主题。
这个步骤描述了如何创建:
-
从 Kafka 许可证文件(源)读取数据的
FileStreamSourceConnector
,并将数据作为消息写入 Kafka 主题。 -
从 Kafka 主题读取信息并将消息写入临时文件( sink)的
FileStreamSinkConnector
。
在生产环境中,您要准备包含所需 Kafka Connect 连接器的容器镜像,如 第 5.2.3 节 “使用连接器插件扩展 Kafka Connect” 所述。
提供 FileStreamSourceConnector
和 FileStreamSinkConnector
作为示例。如此处所述,在容器中运行这些连接器不太可能适合生产用例。
先决条件
- Kafka Connect 部署
- KafkaConnectors 在 Kafka Connect 部署中启用
- Cluster Operator 正在运行
流程
编辑 example
/connect/source-connector.yaml
文件:apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnector metadata: name: my-source-connector 1 labels: strimzi.io/cluster: my-connect-cluster 2 spec: class: org.apache.kafka.connect.file.FileStreamSourceConnector 3 tasksMax: 2 4 config: 5 file: "/opt/kafka/LICENSE" 6 topic: my-topic 7 # ...
在 OpenShift 集群中创建源
KafkaConnector
:oc apply -f examples/connect/source-connector.yaml
创建一个
示例/connect/sink-connector.yaml
文件:touch examples/connect/sink-connector.yaml
将以下 YAML 粘贴到
sink-connector.yaml
文件中:apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnector metadata: name: my-sink-connector labels: strimzi.io/cluster: my-connect spec: class: org.apache.kafka.connect.file.FileStreamSinkConnector 1 tasksMax: 2 config: 2 file: "/tmp/my-file" 3 topics: my-topic 4
在 OpenShift 集群中创建 sink
KafkaConnector
:oc apply -f examples/connect/sink-connector.yaml
检查是否创建了连接器资源:
oc get kctr --selector strimzi.io/cluster=MY-CONNECT-CLUSTER -o name my-source-connector my-sink-connector
将 MY-CONNECT-CLUSTER 替换为您的 Kafka 连接集群。
在容器中,执行
kafka-console-consumer.sh
读取由源连接器写入主题的消息:oc exec MY-CLUSTER-kafka-0 -i -t -- bin/kafka-console-consumer.sh --bootstrap-server MY-CLUSTER-kafka-bootstrap.NAMESPACE.svc:9092 --topic my-topic --from-beginning
源和接收器连接器配置选项
连接器配置在 KafkaConnector 资源的 spec.config
属性中定义。
FileStreamSourceConnector
和 FileStreamSinkConnector
类支持与 Kafka Connect REST API 相同的配置选项。其他连接器支持不同的配置选项。
表 5.2. FileStreamSource
连接器类的配置选项
名称 | 类型 | 默认值 | 描述 |
---|---|---|---|
| 字符串 | null | 可将消息写入到的源文件。如果没有指定,则使用标准输入。 |
| list | null | 发布数据的 Kafka 主题。 |
表 5.3. FileStreamSinkConnector
类的配置选项
名称 | 类型 | 默认值 | 描述 |
---|---|---|---|
| 字符串 | null | 要写入消息的目标文件。如果没有指定,则使用标准输出。 |
| list | null | 一个或多个 Kafka 主题,从中读取数据。 |
| 字符串 | null | 与一个或多个 Kafka 主题匹配以从中读取数据的正则表达式。 |
其它资源