4.2.2. 使用连接器插件扩展 Kafka Connect
Kafka Connect 的 AMQ Streams 容器镜像包括两个内置文件连接器,用于将基于文件的数据移入或移出 Kafka 集群。
表 4.1. 文件连接器
| 文件连接器 | 描述 |
|---|---|
|
| 从文件(源)传输数据到 Kafka 集群。 |
|
| 将数据从 Kafka 集群传输到文件(sink)。 |
Cluster Operator 也可以使用您创建的镜像将 Kafka Connect 集群部署到 OpenShift 集群。
本节中的步骤显示如何通过以下方法为连接器镜像添加您自己的连接器类:
4.2.2.1. 从 Kafka Connect 基础镜像创建 Docker 镜像
此流程演示了如何创建自定义镜像并将其添加到 /opt/kafka/plugins 目录中。
您可以使用 Red Hat Ecosystem Catalog 上的 Kafka 容器镜像作为基础镜像,以通过额外的连接器插件创建您自己的自定义镜像。
在启动时,Kafka Connect 的 AMQ Streams 版本加载 /opt/kafka/plugins 目录中包含的任何第三方连接器插件。
步骤
使用
registry.redhat.io/amq7/amq-streams-kafka-26-rhel7:1.6.7作为基础镜像创建新的Dockerfile:FROM registry.redhat.io/amq7/amq-streams-kafka-26-rhel7:1.6.7 USER root:root COPY ./my-plugins/ /opt/kafka/plugins/ USER 1001插件文件示例
$ tree ./my-plugins/ ./my-plugins/ ├── debezium-connector-mongodb │ ├── bson-3.4.2.jar │ ├── CHANGELOG.md │ ├── CONTRIBUTE.md │ ├── COPYRIGHT.txt │ ├── debezium-connector-mongodb-0.7.1.jar │ ├── debezium-core-0.7.1.jar │ ├── LICENSE.txt │ ├── mongodb-driver-3.4.2.jar │ ├── mongodb-driver-core-3.4.2.jar │ └── README.md ├── debezium-connector-mysql │ ├── CHANGELOG.md │ ├── CONTRIBUTE.md │ ├── COPYRIGHT.txt │ ├── debezium-connector-mysql-0.7.1.jar │ ├── debezium-core-0.7.1.jar │ ├── LICENSE.txt │ ├── mysql-binlog-connector-java-0.13.0.jar │ ├── mysql-connector-java-5.1.40.jar │ ├── README.md │ └── wkb-1.0.2.jar └── debezium-connector-postgres ├── CHANGELOG.md ├── CONTRIBUTE.md ├── COPYRIGHT.txt ├── debezium-connector-postgres-0.7.1.jar ├── debezium-core-0.7.1.jar ├── LICENSE.txt ├── postgresql-42.0.0.jar ├── protobuf-java-2.6.1.jar └── README.md
- 构建容器镜像。
- 将自定义镜像推送到容器 registry。
指向新容器映像。
您可以:
编辑
KafkaConnect.spec.image属性,即KafkaConnect自定义资源。如果设置,此属性会覆盖 Cluster Operator 中的
STRIMZI_KAFKA_CONNECT_IMAGES变量。apiVersion: kafka.strimzi.io/v1beta1 kind: KafkaConnect metadata: name: my-connect-cluster spec: 1 #... image: my-new-container-image 2 config: 3 #...
或
-
在
install/cluster-operator/060-Deployment-strimzi-cluster-operator.yaml文件中编辑STRIMZI_KAFKA_CONNECT_IMAGES变量以指向新容器镜像,然后重新安装 Cluster Operator。
其他资源
如需了解更多相关信息,请参阅 OpenShift 指南中的使用 AMQ Streams :