7.5. Kafka Connect 配置
使用 AMQ Streams 的 KafkaConnect 资源快速轻松地创建新的 Kafka Connect 集群。
当使用 KafkaConnect 资源部署 Kafka Connect 时,您可以指定 bootstrap 服务器地址(在 spec.bootstrapServers中)以连接到 Kafka 集群。当服务器停机时,您可以指定多个地址。您还指定身份验证凭据和 TLS 客户端证书,以建立安全连接。
Kafka 集群不需要由 AMQ Streams 管理,或部署到 OpenShift 集群。
您还可以使用 KafkaConnect 资源来指定以下内容:
- 构建包含插件的容器镜像的插件配置,以建立连接
- 属于 Kafka Connect 集群的 worker pod 配置
-
启用使用
KafkaConnector资源管理插件的注解
Cluster Operator 管理使用 KafkaConnector 资源部署的 Kafka Connect 集群,以及利用 KafkaConnector 资源创建的连接器。
插件配置
插件提供创建连接器实例的实施。当插件实例化时,会为连接特定类型的外部数据系统提供配置。插件提供一组或者多个 JAR 文件,该文件定义了连接器和任务实施,以连接到指定类型数据源。许多外部系统的插件可用于 Kafka 连接。您还可以创建自己的插件。
配置描述了要发送到 Kafka Connect 的源输入数据和目标输出数据。对于源连接器,外部源数据必须引用要存储消息的特定主题。插件也可以包含转换数据所需的库和文件。
Kafka Connect 部署可以有一个或多个插件,但每个插件只能有一个版本。
您可以创建自定义 Kafka Connect 镜像,其中包括您选择的插件。您可以通过两种方式创建镜像:
要自动创建容器镜像,您可以使用 KafkaConnect 资源的 build 属性指定要添加到 Kafka Connect 集群中的插件。AMQ Streams 会自动下载插件工件并将其添加到新容器镜像中。
插件配置示例
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
name: my-connect-cluster
annotations:
strimzi.io/use-connector-resources: "true"
spec:
# ...
build: 1
output: 2
type: docker
image: my-registry.io/my-org/my-connect-cluster:latest
pushSecret: my-registry-credentials
plugins: 3
- name: debezium-postgres-connector
artifacts:
- type: tgz
url: https://ARTIFACT-ADDRESS.tgz
sha512sum: HASH-NUMBER-TO-VERIFY-ARTIFACT
# ...
# ...
如果使用 Dockerfile 构建镜像,您可以使用 AMQ Streams 的最新容器镜像作为基础镜像来添加插件配置文件。
显示手动添加插件配置示例
FROM registry.redhat.io/amq7/amq-streams-kafka-33-rhel8:2.3.0
USER root:root
COPY ./my-plugins/ /opt/kafka/plugins/
USER 1001
worker 的 Kafka Connect 集群配置
您可以在 KafkaConnect 资源的 config 属性中指定 worker 的配置。
分布式 Kafka Connect 集群有一个组 ID 和一组内部配置主题。
-
group.id -
offset.storage.topic -
config.storage.topic -
status.storage.topic
Kafka Connect 集群默认使用这些属性的值配置。Kafka Connect 集群无法共享组 ID 或主题名称,因为它将创建错误。如果使用多个不同的 Kafka Connect 集群,则每个创建的 Kafka Connect 集群的 worker 必须是唯一的。
每个 Kafka Connect 集群使用的连接器名称也必须是唯一的。
在以下示例中,指定了 JSON 转换器。Kafka Connect 使用的内部 Kafka 主题设置了复制因素。对于生产环境,至少应有 3 个。在创建主题后更改复制因素将无效。
worker 配置示例
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
# ...
spec:
config:
# ...
group.id: my-connect-cluster 1
offset.storage.topic: my-connect-cluster-offsets 2
config.storage.topic: my-connect-cluster-configs 3
status.storage.topic: my-connect-cluster-status 4
key.converter: org.apache.kafka.connect.json.JsonConverter 5
value.converter: org.apache.kafka.connect.json.JsonConverter 6
key.converter.schemas.enable: true 7
value.converter.schemas.enable: true 8
config.storage.replication.factor: 3 9
offset.storage.replication.factor: 3 10
status.storage.replication.factor: 3 11
# ...
- 1
- Kafka 中的 Kafka Connect 集群 ID。每个 Kafka Connect 集群都必须是唯一的。
- 2
- 存储连接器偏移的 Kafka 主题。每个 Kafka Connect 集群都必须是唯一的。
- 3
- 存储连接器和任务状态配置的 Kafka 主题。每个 Kafka Connect 集群都必须是唯一的。
- 4
- 存储连接器和任务状态更新的 Kafka 主题。每个 Kafka Connect 集群都必须是唯一的。
- 5
- 转换程序,将消息密钥转换为 Kafka 中存储的 JSON 格式。
- 6
- 转换程序,将消息值转换为 Kafka 中存储的 JSON 格式。
- 7
- 为将消息键转换为结构化 JSON 格式的 schema。
- 8
- 为将消息值转换为结构化 JSON 格式的 schema。
- 9
- 存储连接器偏移的 Kafka 主题的复制因素。
- 10
- 存储连接器和任务状态配置的 Kafka 主题的复制因素。
- 11
- 存储连接器和任务状态更新的 Kafka 主题的复制因素。
连接器的 KafkaConnector 管理
在将插件添加到用于部署中的 worker pod 的容器镜像后,您可以使用 AMQ Streams 的 KafkaConnector 自定义资源或 Kafka Connect API 来管理连接器实例。您还可以使用这些选项创建新的连接器实例。
KafkaConnector 资源提供了一种 OpenShift 原生的方法来管理 Cluster Operator 连接器。要使用 KafkaConnector 资源管理连接器,您必须在 KafkaConnect 自定义资源中指定注解。
启用 KafkaConnectors 的注解
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
name: my-connect-cluster
annotations:
strimzi.io/use-connector-resources: "true"
# ...
将 use-connector-resources 设置为 true 可启用 KafkaConnectors 创建、删除和重新配置连接器。
如果在 KafkaConnect 配置中启用了 use-connector-resources,则必须使用 KafkaConnector 资源来定义和管理连接器。KafkaConnector 资源被配置为连接到外部系统。它们部署到与 Kafka Connect 集群和与外部数据系统交互的 Kafka 集群相同的 OpenShift 集群。
Kafka 组件在同一个 OpenShift 集群中包含
配置指定连接器实例如何连接到外部数据系统,包括任何身份验证。您还需要陈述要监视的数据。对于源连接器,您可能会在配置中提供数据库名称。您还可以通过指定目标主题名称指定数据在 Kafka 中的位置。
使用 tasksMax 指定最大任务数。例如,带有 tasksMax: 2 的源连接器可能会将源数据导入两个任务。
KafkaConnector 源连接器配置示例
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnector metadata: name: my-source-connector 1 labels: strimzi.io/cluster: my-connect-cluster 2 spec: class: org.apache.kafka.connect.file.FileStreamSourceConnector 3 tasksMax: 2 4 config: 5 file: "/opt/kafka/LICENSE" 6 topic: my-topic 7 # ...
- 1
KafkaConnector资源的名称,用作连接器的名称。使用对 OpenShift 资源有效的任何名称。- 2
- 在其中创建连接器实例的 Kafka Connect 集群的名称。连接器必须部署到它们所链接的 Kafka Connect 集群相同的命名空间中。
- 3
- 连接器类的全名。这应该存在于 Kafka Connect 集群使用的镜像中。
- 4
- 连接器可创建的最大 Kafka Connect 任务数量。
- 5
- 连接器配置 作为键值对。
- 6
- 外部数据文件的位置。在本例中,我们将
FileStreamSourceConnector配置为从/opt/kafka/LICENSE文件中读取。 - 7
- 将源数据发布到的 Kafka 主题。
您可以从 OpenShift Secret 或 ConfigMap 为连接器加载机密配置值。
Kafka Connect API
使用 Kafka Connect REST API 作为使用 KafkaConnector 资源管理连接器的替代选择。Kafka Connect REST API 作为一个运行在 <connect_cluster_name>-connect-api:8083 的服务其中 <connect_cluster_name> 是 Kafka Connect 集群的名称。
您可以将连接器配置添加为 JSON 对象。
添加连接器配置的 curl 请求示例
curl -X POST \
http://my-connect-cluster-connect-api:8083/connectors \
-H 'Content-Type: application/json' \
-d '{ "name": "my-source-connector",
"config":
{
"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector",
"file": "/opt/kafka/LICENSE",
"topic":"my-topic",
"tasksMax": "4",
"type": "source"
}
}'
如果启用了 KafkaConnectors,Cluster Operator 将恢复使用 Kafka Connect REST API 进行的手动更改。
REST API 支持的操作在 Apache Kafka 文档中 描述。
您可以在 OpenShift 外部公开 Kafka Connect API 服务。为此,您可以创建一个使用连接机制来提供访问的服务,如入口或路由。我们建议使用,因为连接是不安全的。