在 OpenShift 上部署和升级 AMQ 流
用于 OpenShift Container Platform 上的 AMQ Streams 1.6
摘要
第 1 章 部署概述
AMQ Streams 简化了在 OpenShift 集群中运行 Apache Kafka 的过程。
本指南提供有关用于部署和升级 AMQ Streams 的所有选项的说明,描述了部署的内容,以及在 OpenShift 集群中运行 Apache Kafka 所需的部署顺序。
除了说明部署步骤外,本指南还提供了部署前和部署后的说明,以准备和验证部署。介绍的其他部署选项包括引入指标的步骤。为 AMQ Streams 和 Kafka 升级提供了升级说明。
AMQ Streams 旨在处理所有类型的 OpenShift 集群,不论分发如何,从公共云和私有云到用于开发的本地部署。
1.1. AMQ 流如何支持 Kafka
AMQ Streams 提供容器镜像和 Operator,以便在 OpenShift 上运行 Kafka。AMQ Streams Operator 是运行 AMQ Streams 的基础。AMQ Streams 提供的 Operator 是专门构建的,具有可有效管理 Kafka 的专业操作知识。
Operator 简化了以下流程:
- 部署和运行 Kafka 集群
- 部署和运行 Kafka 组件
- 配置对 Kafka 的访问
- 保护对 Kafka 的访问
- 升级 Kafka
- 管理代理
- 创建和管理主题
- 创建和管理用户
1.2. AMQ Streams Operator
AMQ Streams 支持使用 Operator 的 Kafka 来部署和管理 Kafka 到 OpenShift 的组件和依赖项。
Operator 是一种打包、部署和管理 OpenShift 应用的方法。AMQ Streams Operator 扩展 OpenShift 功能,自动执行与 Kafka 部署相关的常见复杂任务。通过在代码中了解 Kafka 操作,Kafka 管理任务可以简化,无需人工干预。
Operator
AMQ Streams 提供 Operator 来管理在 OpenShift 集群中运行的 Kafka 集群。
- Cluster Operator
- 部署和管理 Apache Kafka 集群、Kafka Connect、Kafka MirrorMaker、Kafka Bridge、Kafka Exporter 和 Entity Operator
- 实体 Operator
- 由主题 Operator 和 User Operator 组成
- 主题 Operator
- 管理 Kafka 主题
- User Operator
- 管理 Kafka 用户
Cluster Operator 可以与 Kafka 集群同时部署 Topic Operator 和 User Operator 作为 Entity Operator 配置的一部分。
AMQ Streams 架构中的 Operator
1.2.1. Cluster Operator
AMQ Streams 使用 Cluster Operator 来部署和管理集群:
- Kafka(包括 ZooKeeper、实体 Operator、Kafka Exporter 和 Cruise Control)
- Kafka Connect
- Kafka MirrorMaker
- Kafka Bridge
自定义资源用于部署集群。
例如,部署 Kafka 集群:
-
在
OpenShift 集群中创建了带有集群配置的 Kafka
资源。 -
Cluster Operator 根据 Kafka 资源中声明的内容部署对应的
Kafka
集群。
Cluster Operator 也可以部署(通过配置 Kafka
资源):
-
通过
KafkaTopic
自定义资源提供 operator 风格主题管理的主题 Operator -
一个 User Operator,通过
KafkaUser
自定义资源提供 operator 风格的用户管理
部署的 Entity Operator 中的 Topic Operator 和 User Operator 功能。
Cluster Operator 的架构示例
1.2.2. 主题 Operator
Topic Operator 提供了通过 OpenShift 资源管理 Kafka 集群中主题的方法。
Topic Operator 的架构示例
Topic Operator 的角色是保留一组 KafkaTopic
OpenShift 资源,描述与对应的 Kafka 主题同步中的 Kafka 主题。
特别是,如果 KafkaTopic
是:
- 创建,主题 Operator 会创建该主题
- 删除的,主题 Operator 会删除该主题
- 更改,主题 Operator 更新该主题
在另一个方向上工作,如果一个主题是:
-
在 Kafka 集群中创建,Operator 会创建一个
KafkaTopic
-
从 Kafka 集群中删除,Operator 会删除
KafkaTopic
-
在 Kafka 集群中更改,Operator 会更新
KafkaTopic
这可让您声明 KafkaTopic
作为应用程序部署的一部分,Topic Operator 将为您创建该主题。您的应用程序只需要处理从所需主题中产生或使用的内容。
如果该主题被重新配置或重新分配给不同的 Kafka 节点,KafkaTopic
将始终保持最新状态。
1.2.3. User Operator
User Operator 通过监视 Kafka 用户描述 Kafka 用户并确保在
Kafka 集群中正确配置了这些用户来管理 Kafka 集群的 Kafka 用户。
例如,如果 KafkaUser
是:
- 创建,User Operator 创建它描述的用户
- 删除时,User Operator 会删除它所描述的用户
- 更改后,User Operator 会更新它所描述的用户
与主题 Operator 不同,User Operator 不会将 Kafka 集群的任何更改与 OpenShift 资源同步。Kafka 主题可以直接由 Kafka 中的应用程序创建,但用户不必与 User Operator 并行直接在 Kafka 集群中管理。
User Operator 允许您将 KafkaUser
资源声明为应用程序部署的一部分。您可以为用户指定身份验证和授权机制。您还可以配置 用户配额 来控制对 Kafka 资源的使用,例如,确保用户不会专利对代理的访问。
创建用户时,会在 Secret
中创建用户凭据。您的应用需要使用用户 及其凭据进行身份验证,并生成或使用消息。
除了管理用于身份验证的凭证外,User Operator 还通过在 KafkaUser
声明中包含用户访问权限描述来管理授权规则。
1.3. AMQ Streams 自定义资源
使用 AMQ Streams 将 Kafka 组件部署到 OpenShift 集群可通过应用自定义资源进行高度配置。自定义资源作为自定义资源定义(CRD)添加的 API 实例创建,以扩展 OpenShift 资源。
CRD 用作描述 OpenShift 集群中自定义资源的配置说明,并为部署中使用的每个 Kafka 组件以及用户和主题提供 AMQ Streams。CRD 和自定义资源定义为 YAML 文件。AMQ Streams 发行版提供了示例 YAML 文件。
CRD 还允许 AMQ Streams 资源受益于原生 OpenShift 功能,如 CLI 访问和配置验证。
1.3.1. AMQ Streams 自定义资源示例
CRD 需要在集群中进行一次性安装,以定义用于实例化和管理 AMQ Streams 特定资源的 schema。
通过安装 CRD 在集群中添加新的自定义资源类型后,您可以根据具体规格创建资源实例。
根据集群设置,安装通常需要集群管理员特权。
管理自定义资源的访问权限仅限于 AMQ Streams 管理员。如需更多信息,请参阅 OpenShift 上部署和升级 AMQ 流指南中的指定 AMQ 流 管理员。
CRD 在 OpenShift 集群中定义 一种
新型资源,如 kind:Kafka
。
Kubernetes API 服务器允许基于 kind
创建自定义资源,并且从 CRD 中了解如何在添加到 OpenShift 集群时验证和存储自定义资源。
当 CRD 被删除时,该类型的自定义资源也会被删除。另外,自定义资源创建的资源也会被删除,如 pod 和 statefulsets。
每个 AMQ Streams 特定自定义资源都符合 CRD 为资源类型定义的 schema 。
AMQ Streams 组件的自定义资源具有常见的配置属性,这些属性在 spec
下定义。
要了解 CRD 和自定义资源之间的关系,让我们看一下 Kafka 主题的 CRD 示例。
Kafka 主题 CRD
apiVersion: kafka.strimzi.io/v1beta1 kind: CustomResourceDefinition metadata: 1 name: kafkatopics.kafka.strimzi.io labels: app: strimzi spec: 2 group: kafka.strimzi.io versions: v1beta1 scope: Namespaced names: # ... singular: kafkatopic plural: kafkatopics shortNames: - kt 3 additionalPrinterColumns: 4 # ... subresources: status: {} 5 validation: 6 openAPIV3Schema: properties: spec: type: object properties: partitions: type: integer minimum: 1 replicas: type: integer minimum: 1 maximum: 32767 # ...
- 1
- 主题 CRD 的元数据、名称和标识 CRD 的标签。
- 2
- 此 CRD 的规格,包括组(域)名称、复数名称和支持的 schema 版本,用于 URL 以访问该主题的 API。其他名称用于标识 CLI 中的实例资源。例如,oc
get kafkatopic my-topic
或oc get kafkatopics
。 - 3
- 可以在 CLI 命令中使用短名称。例如,
oc get kt
可用作缩写而不是oc get kafkatopic
。 - 4
- 在自定义资源上使用
get
命令时显示的信息。 - 5
- CRD 的当前状态,如资源的 schema 引用 中所述。
- 6
- OpenAPIV3Schema 验证提供了用于创建主题自定义资源的验证。例如,一个主题至少需要一个分区和一个副本。
您可以识别 AMQ Streams 安装文件提供的 CRD YAML 文件,因为文件名包含索引号后跟 'Crd'。
以下是 KafkaTopic
自定义资源的对应示例。
Kafka 主题自定义资源
apiVersion: kafka.strimzi.io/v1beta1 kind: KafkaTopic 1 metadata: name: my-topic labels: strimzi.io/cluster: my-cluster 2 spec: 3 partitions: 1 replicas: 1 config: retention.ms: 7200000 segment.bytes: 1073741824 status: conditions: 4 lastTransitionTime: "2019-08-20T11:37:00.706Z" status: "True" type: Ready observedGeneration: 1 / ...
自定义资源可以通过平台 CLI 应用到集群。创建自定义资源时,它将使用与 Kubernetes API 内置资源相同的验证。
创建 KafkaTopic
自定义资源后,Tpic Operator 将获得通知,并在 AMQ Streams 中创建对应的 Kafka 主题。
1.4. AMQ 流安装方法
在 OpenShift 上安装 AMQ Streams 有两种方法:
安装方法 | 描述 | 支持的版本 |
---|---|---|
安装工件(YAML 文件) |
从 AMQ Streams 下载网站下载 | OpenShift 3.11 及更新的版本 |
OperatorHub | 使用 OperatorHub 中的 AMQ Streams Operator 将 Cluster Operator 部署到单个命名空间或所有命名空间中。 | OpenShift 4.x 仅 |
为获得最大的灵活性,请选择安装工件方法。如果要使用 OpenShift 4 Web 控制台在标准配置中将 AMQ Streams 安装到 OpenShift 4,请选择 OperatorHub 方法。OperatorHub 还允许您利用自动更新。
对于这两种方法,Cluster Operator 都部署到 OpenShift 集群,您可以使用提供的 YAML 示例文件部署 AMQ Streams Streams 其他组件,从 Kafka 集群开始。
AMQ 流安装工件
AMQ Streams 安装工件包含各种 YAML 文件,可使用 oc
部署到 OpenShift 中以创建自定义资源,包括:
- Deployments
- 自定义资源定义(CRD)
- 角色和角色绑定
- 服务帐户
为 Cluster Operator、Tpic Operator、User Operator 和 Strimzi Admin 角色提供了 YAML 安装文件。
OperatorHub
在 OpenShift 4 中,Operator Lifecycle Manager(OLM) 可帮助集群管理员安装、更新和管理所有 Operator 以及在集群中运行的关联服务的生命周期。OLM 是 Operator Framework 的一部分,后者是一个开源工具包,旨在以有效、自动化且可扩展的方式管理 Kubernetes 原生应用程序(Operator)。
OperatorHub 是 OpenShift 4 Web 控制台的一部分。集群管理员可以使用它来发现、安装和升级 Operator。Operator 可以从 OperatorHub 拉取(安装在 OpenShift 集群上)到单个(项目)命名空间或所有(项目)命名空间,并由 OLM 管理。然后,工程团队可以使用 OLM 在开发、测试和生产环境中独立管理软件。
OperatorHub 不适用于版本 4 更早的 OpenShift 版本。
AMQ Streams Operator
AMQ Streams Operator 可从 OperatorHub 安装。安装后,AMQ Streams Operator 将 Cluster Operator 部署到 OpenShift 集群,以及必要的 CRD 和基于角色的访问控制(RBAC)资源。
其他资源
使用安装工件安装 AMQ Streams:
从 OperatorHub 安装 AMQ Streams:
- 第 4.1.1.5 节 “从 OperatorHub 部署 Cluster Operator”
- OpenShift 文档中的 操作器 指南.
第 2 章 使用 AMQ Streams 部署哪些内容
通过 AMQ Streams 发行版为 OpenShift 部署提供了 Apache Kafka 组件。Kafka 组件通常作为集群运行,以实现可用性。
集成 Kafka 组件的典型部署可能包括:
- 代理节点的 Kafka 集群
- 复制 ZooKeeper 实例的 zookeeper 集群
- Kafka Connect 集群用于外部数据连接
- Kafka MirrorMaker 集群,在第二个集群中镜像 Kafka 集群
- kafka Exporter 用于提取额外的 Kafka 指标数据以进行监控
- Kafka Bridge 向 Kafka 集群发出基于 HTTP 的请求
并非所有组件都是必须的,但您至少需要 Kafka 和 ZooKeeper。有些组件可以在没有 Kafka 的情况下部署,如 MirrorMaker 或 Kafka Connect。
2.1. 部署顺序
部署到 OpenShift 集群所需的顺序如下:
- 部署 Cluster operator 以管理 Kafka 集群
- 使用 ZooKeeper 集群部署 Kafka 集群,并在部署中包含 Topic Operator 和 User Operator
(可选)部署:
- 如果没有在 Kafka 集群中部署它们,则 Topic Operator 和 User Operator 独立
- Kafka Connect
- Kafka MirrorMaker
- Kafka Bridge
- 用于监控指标的组件
2.2. 其他部署配置选项
本指南中的部署步骤描述了使用 AMQ Streams 提供的示例安装 YAML 文件进行的部署。程序突出显示了任何重要的配置注意事项,但不描述所有可用的配置选项。
您可以使用自定义资源来优化部署。
部署 AMQ Streams 之前,您可能希望查看可用于 Kafka 组件的配置选项。如需有关通过自定义资源进行配置的更多信息,请参阅 OpenShift 上使用 AMQ Streams 中的 部署配置。
2.2.1. 保护 Kafka
在部署时,Cluster Operator 会自动为集群中的数据加密和身份验证设置 TLS 证书。
AMQ Streams 提供用于 加密、身份验证和 授权 的额外配置选项,如 OpenShift 指南中的使用 AMQ Streams 所述:
- 通过 配置 Kafka 资源,在 Kafka 集群和客户端之间进行安全数据交换。
- 将部署配置为使用授权服务器来提供 OAuth 2.0 身份验证和 OAuth 2.0 授权。
- 使用您自己的证书保护 Kafka。
2.2.2. 监控您的部署
AMQ Streams 支持额外的部署选项来监控您的部署。
- 通过使用 Kafka 集群部署 Prometheus 和 Grafana 来提取指标并监控 Kafka 组件。
- 通过使用 Kafka 集群部署 Kafka Exporter,提取与 监控消费者滞后相关的其他指标。
- 通过 设置分布式追踪来跟踪 消息,如 OpenShift 中使用 AMQ Streams 指南中所述。
第 3 章 准备 AMQ Streams 部署
本节介绍如何准备 AMQ Streams 部署,描述:
要运行本指南中的命令,您的集群用户必须有权管理基于角色的访问控制(RBAC)和 CRD。
3.1. 部署先决条件
要部署 AMQ Streams,请确保:
OpenShift 3.11 和更新的集群可用。
AMQ Streams 基于 Strimzi 0.20.x。
-
已安装并配置了
oc
命令行工具以连接到正在运行的集群。
AMQ Streams 支持 OpenShift 专用的一些功能,这些集成对 OpenShift 用户有益,并且没有使用标准 Kubernetes 的等效实施。
3.2. 下载 AMQ Streams 发行版工件
要安装 AMQ Streams,请从 AMQ Streams 下载网站从 amq-streams-<version>-ocp-install-examples.zip
文件中下载 并提取发行工件。
AMQ Streams 发行版工件包括示例 YAML 文件,可帮助您将 AMQ Streams 组件部署到 OpenShift、执行常用操作和配置 Kafka 集群。
使用 oc
从下载的 ZIP 文件的 install/cluster-operator
文件夹部署 Cluster Operator。有关部署和配置 Cluster Operator 的更多信息,请参阅 第 4.1.1 节 “部署 Cluster Operator”。
另外,如果要使用 Topic 和 User Operator 的独立安装,带有不由 AMQ Streams Cluster Operator 管理的 Kafka 集群,您可以从 install/topic-operator 和
文件夹中部署它们。
install/user-operator
另外,AMQ Streams 容器镜像也可通过 红帽生态系统目录 提供。但是,我们建议您使用提供的 YAML 文件来部署 AMQ Streams。
3.3. 将容器镜像推送到您自己的 registry
红帽生态系统目录中提供了 AMQ Streams 的容器镜像。AMQ Streams 提供的安装 YAML 文件将直接从 红帽生态系统目录中拉取镜像。
如果您无法访问 红帽生态系统目录,或者想要使用自己的容器存储库:
- 拉取此处 列出的所有 容器镜像
- 将它们推送到您自己的 registry
- 更新安装 YAML 文件中的镜像名称
该发行版本支持的每个 Kafka 版本都有一个单独的镜像。
容器镜像 | 命名空间/存储库 | 描述 |
---|---|---|
kafka |
| 用于运行 Kafka 的 AMQ Streams 镜像,包括:
|
Operator |
| 用于运行操作器的 AMQ Streams 镜像:
|
Kafka Bridge |
| 用于运行 AMQ Streams Kafka Bridge 的 AMQ Streams 镜像 |
3.4. 指定 AMQ Streams 管理员
AMQ Streams 提供用于配置部署的自定义资源。默认情况下,查看、创建、编辑和删除这些资源的权限仅限于 OpenShift 集群管理员。AMQ Streams 提供了两个集群角色,可用于将这些权限分配给其他用户:
-
strimzi-view
允许用户查看和列出 AMQ Streams 资源。 -
strimzi-admin
允许用户创建、编辑或删除 AMQ Streams 资源。
安装这些角色时,它们将自动聚合(添加)默认 OpenShift 集群 roles. strimzi-view
聚合到 view
角色,strimzi-admin
聚合到 edit
和 admin
角色。由于聚合,您可能不需要将这些角色分配给已拥有类似权限的用户。
以下流程演示了如何分配 astrim zi-admin
角色,允许非集群管理员管理 AMQ Streams 资源。
系统管理员可以在部署 Cluster Operator 后指定 AMQ Streams 管理员。
先决条件
- AMQ Streams 自定义资源定义(CRD)和基于角色的访问控制(RBAC)资源已使用 Cluster Operator 部署, 以管理 CRD。
步骤
在 OpenShift 中创建
strimzi-view
和strimzi-admin
集群角色。oc apply -f install/strimzi-admin
如果需要,为需要它们的用户分配访问权限的角色。
oc create clusterrolebinding strimzi-admin --clusterrole=strimzi-admin --user=user1 --user=user2
第 4 章 部署 AMQ 流
为 部署 AMQ Streams 准备了环境,本节显示:
- 如何创建 Kafka 集群
根据您的要求部署其他 Kafka 组件的可选步骤:
流程假定 OpenShift 集群可用并在运行。
AMQ Streams 基于 AMQ Streams Strimzi 0.20.x。本节介绍在 OpenShift 3.11 及之后的版本中部署 AMQ Streams 的步骤。
要运行本指南中的命令,您的集群用户必须有权管理基于角色的访问控制(RBAC)和 CRD。
4.1. 创建 Kafka 集群
要创建 Kafka 集群,您需要部署 Cluster Operator 以管理 Kafka 集群,然后部署 Kafka 集群。
当使用 Kafka 资源部署 Kafka
集群时,您可以同时部署 Topic Operator 和 User Operator。另外,如果您使用非 AMQ Streams Kafka 集群,您可以将 Topic Operator 和 User Operator 部署为独立组件。
使用 Topic Operator 和 User Operator 部署 Kafka 集群
如果要将 Topic Operator 和 User Operator 与由 AMQ Streams 管理的 Kafka 集群搭配使用,请执行这些部署步骤。
- 部署 Cluster Operator
使用 Cluster Operator 部署:
部署独立主题 Operator 和 User Operator
如果要将 Topic Operator 和 User Operator 与 不由 AMQ Streams 管理的 Kafka 集群搭配使用,请执行这些部署步骤。
4.1.1. 部署 Cluster Operator
Cluster Operator 负责在 OpenShift 集群中部署和管理 Apache Kafka 集群。
本节中的步骤显示:
如何部署 Cluster Operator 以 监视 :
备选部署选项:
4.1.1.1. 监视 Cluster Operator 部署的选项
当 Cluster Operator 运行时,它开始 监视 Kafka 资源的更新。
您可以选择部署 Cluster Operator 来监视 Kafka 资源:
- 单个命名空间(包含 Cluster Operator 的同一命名空间)
- 多个命名空间
- 所有命名空间
AMQ Streams 提供示例 YAML 文件,以简化部署过程。
Cluster Operator 会监视以下资源的变化:
-
Kafka
集群的 Kafka。 -
Kafka Connect 集群的 Kafka
Connect。 -
用于
Kafka Connect 集群的 KafkaConnectS2I
,支持 Source2Image。 -
用于在 Kafka Connect 集群中创建和管理连接器的 KafkaConnector
。 -
Kafka MirrorMaker 实例的 KafkaMirror
Maker。 -
Kafka Bridge 实例的
KafkaBridge
在 OpenShift 集群中创建其中一个资源时,操作器从该资源获取集群描述,并通过创建必要的 OpenShift 资源(如 StatefulSets、Service 和 ConfigMap)来开始为资源创建新集群。
每次更新 Kafka 资源时,操作器都会对构成该资源的 OpenShift 资源执行相应的更新。
资源可以修补或删除,然后重新创建以便集群反映集群所需的状态。此操作可能会导致滚动更新,可能会导致服务中断。
删除资源时,操作器将取消部署集群并删除所有相关 OpenShift 资源。
4.1.1.2. 部署 Cluster Operator 以监控单个命名空间
此流程演示了如何部署 Cluster Operator 以监控 OpenShift 集群中单一命名空间中的 AMQ Streams 资源。
先决条件
-
此流程需要使用 OpenShift 用户帐户,该帐户可以创建自定义
ResourceDefinition、
ClusterRole 和
ClusterRoleBinding。
在 OpenShift 集群中使用 Role Base Access Control(RBAC)通常意味着只有system:admin
等 OpenShift 集群管理员才有创建、编辑和删除这些资源的权限。
步骤
编辑 AMQ Streams 安装文件以使用 Cluster Operator 将安装到的命名空间。
例如,在此流程中,Cluster Operator 被安装到命名空间
my-cluster-operator-namespace
中。在 Linux 中,使用:
sed -i 's/namespace: .*/namespace: my-cluster-operator-namespace/' install/cluster-operator/*RoleBinding*.yaml
在 MacOS 中,使用:
sed -i '' 's/namespace: .*/namespace: my-cluster-operator-namespace/' install/cluster-operator/*RoleBinding*.yaml
部署 Cluster Operator:
oc apply -f install/cluster-operator -n my-cluster-operator-namespace
验证 Cluster Operator 是否已成功部署:
oc get deployments
4.1.1.3. 部署 Cluster Operator 以监控多个命名空间
此流程演示了如何部署 Cluster Operator 以在 OpenShift 集群的多个命名空间中监控 AMQ Streams 资源。
先决条件
-
此流程需要使用 OpenShift 用户帐户,该帐户可以创建自定义
ResourceDefinition、
ClusterRole 和
ClusterRoleBinding。
在 OpenShift 集群中使用 Role Base Access Control(RBAC)通常意味着只有system:admin
等 OpenShift 集群管理员才有创建、编辑和删除这些资源的权限。
步骤
编辑 AMQ Streams 安装文件以使用 Cluster Operator 将安装到的命名空间。
例如,在此流程中,Cluster Operator 被安装到命名空间
my-cluster-operator-namespace
中。在 Linux 中,使用:
sed -i 's/namespace: .*/namespace: my-cluster-operator-namespace/' install/cluster-operator/*RoleBinding*.yaml
在 MacOS 中,使用:
sed -i '' 's/namespace: .*/namespace: my-cluster-operator-namespace/' install/cluster-operator/*RoleBinding*.yaml
编辑
install/cluster-operator/060-Deployment-strimzi-cluster-operator.yaml
文件,将 Cluster Operator 将监视的所有命名空间的列表添加到STRIMZI_NAMESPACE
环境变量。例如,在此流程中,Cluster Operator 将监视命名空间
watched-namespace-1
、watched-namespace-2
、watched-namespace-3
。apiVersion: apps/v1 kind: Deployment spec: # ... template: spec: serviceAccountName: strimzi-cluster-operator containers: - name: strimzi-cluster-operator image: registry.redhat.io/amq7/amq-streams-rhel7-operator:1.6.7 imagePullPolicy: IfNotPresent env: - name: STRIMZI_NAMESPACE value: watched-namespace-1,watched-namespace-2,watched-namespace-3
对于列出的每个命名空间,安装
RoleBindings
。在本例中,我们将这些命令中的
watched-namespace
替换为上一步中列出的命名空间,为watched-namespace-1、watched-namespace
-2、
重复它们:watched-namespace
-3oc apply -f install/cluster-operator/020-RoleBinding-strimzi-cluster-operator.yaml -n watched-namespace oc apply -f install/cluster-operator/031-RoleBinding-strimzi-cluster-operator-entity-operator-delegation.yaml -n watched-namespace oc apply -f install/cluster-operator/032-RoleBinding-strimzi-cluster-operator-topic-operator-delegation.yaml -n watched-namespace
部署 Cluster Operator:
oc apply -f install/cluster-operator -n my-cluster-operator-namespace
验证 Cluster Operator 是否已成功部署:
oc get deployments
4.1.1.4. 部署 Cluster Operator 以监视所有命名空间
此流程演示了如何部署 Cluster Operator 以监控 OpenShift 集群中所有命名空间中的 AMQ Streams 资源。
在这个模式下运行时,Cluster Operator 会在创建的任何新命名空间中自动管理集群。
先决条件
-
此流程需要使用 OpenShift 用户帐户,该帐户可以创建自定义
ResourceDefinition、
ClusterRole 和
ClusterRoleBinding。
在 OpenShift 集群中使用 Role Base Access Control(RBAC)通常意味着只有system:admin
等 OpenShift 集群管理员才有创建、编辑和删除这些资源的权限。
步骤
编辑 AMQ Streams 安装文件以使用 Cluster Operator 将安装到的命名空间。
例如,在此流程中,Cluster Operator 被安装到命名空间
my-cluster-operator-namespace
中。在 Linux 中,使用:
sed -i 's/namespace: .*/namespace: my-cluster-operator-namespace/' install/cluster-operator/*RoleBinding*.yaml
在 MacOS 中,使用:
sed -i '' 's/namespace: .*/namespace: my-cluster-operator-namespace/' install/cluster-operator/*RoleBinding*.yaml
编辑
install/cluster-operator/060-Deployment-strimzi-cluster-operator.yaml
文件,将STRIMZI_NAMESPACE
环境变量的值设置为*
。apiVersion: apps/v1 kind: Deployment spec: # ... template: spec: # ... serviceAccountName: strimzi-cluster-operator containers: - name: strimzi-cluster-operator image: registry.redhat.io/amq7/amq-streams-rhel7-operator:1.6.7 imagePullPolicy: IfNotPresent env: - name: STRIMZI_NAMESPACE value: "*" # ...
创建
ClusterRoleBinding,
将所有命名空间的集群范围访问权限授予 Cluster Operator。oc create clusterrolebinding strimzi-cluster-operator-namespaced --clusterrole=strimzi-cluster-operator-namespaced --serviceaccount my-cluster-operator-namespace:strimzi-cluster-operator oc create clusterrolebinding strimzi-cluster-operator-entity-operator-delegation --clusterrole=strimzi-entity-operator --serviceaccount my-cluster-operator-namespace:strimzi-cluster-operator oc create clusterrolebinding strimzi-cluster-operator-topic-operator-delegation --clusterrole=strimzi-topic-operator --serviceaccount my-cluster-operator-namespace:strimzi-cluster-operator
将
my-cluster-operator-namespace
替换为您要将 Cluster Operator 安装到的命名空间。将 Cluster Operator 部署到 OpenShift 集群。
oc apply -f install/cluster-operator -n my-cluster-operator-namespace
验证 Cluster Operator 是否已成功部署:
oc get deployments
4.1.1.5. 从 OperatorHub 部署 Cluster Operator
您可以通过从 OperatorHub 安装 AMQ Streams Operator,将 Cluster Operator 部署到 OpenShift 集群。OperatorHub 仅适用于 OpenShift 4。
先决条件
-
OpenShift 集群中启用了 Red Hat Operator
OperatorSource
。如果您在 OperatorHub 中看到 Red Hat Operator,则会启用正确的OperatorSource
。如需更多信息,请参阅 Operators 指南。 - 安装需要具有足够特权的用户来安装来自 OperatorHub 的 Operator。
步骤
- 在 OpenShift 4 Web 控制台中,点 Operators > OperatorHub。
在流和 消息传递 类别中搜索或浏览 AMQ Streams Operator。
- 单击 AMQ Streams 标题,然后在右侧的侧边栏中点 Install。
在 Create Operator Subscription 屏幕上,从以下安装和更新选项中进行选择:
- 安装模式 :选择将 AMQ Streams Operator 安装到集群中的所有(项目)命名空间(默认选项)或特定(项目)命名空间。最好使用命名空间来分隔函数。建议您将特定命名空间专用于 Kafka 集群和其他 AMQ Streams 组件。
- 批准策略 :默认情况下,Operator Lifecycle Manager(OLM)自动将 AMQ Streams Operator 升级到最新的 AMQ Streams 版本。(可选)如果您想要手动批准以后的升级,请选择 Manual。如需更多信息,请参阅 OpenShift 文档中的 Operator 指南。
点 Subscribe; AMQ Streams Operator 已安装到 OpenShift 集群。
AMQ Streams Operator 将 Cluster Operator、CRD 和基于角色的访问控制(RBAC)资源部署到所选命名空间或所有命名空间中。
在 Installed Operators 屏幕上,检查安装的进度。当 AMQ Streams Operator 的状态变为 InstallSucceeded 时,就可以使用它。
接下来,您可以使用 YAML 示例文件部署 AMQ Streams 其他组件(从 Kafka 集群开始)。
4.1.2. 部署 Kafka
Apache Kafka 是一个开源分布式发布-订阅消息传递系统,用于容错实时数据源。
本节中的步骤显示:
如何使用 Cluster Operator 部署:
- 临时或 持久的 Kafka 集群
配置
Kafka
自定义资源的 Topic Operator 和 User Operator:
Topic Operator 和 User Operator 的替代独立部署步骤:
安装 Kafka 时,AMQ Streams 还会安装 ZooKeeper 集群,并添加必要的配置以将 Kafka 与 ZooKeeper 连接。
4.1.2.1. 部署 Kafka 集群
此流程演示了如何使用 Cluster Operator 将 Kafka 集群部署到 OpenShift 中。
部署使用 YAML 文件提供规格来创建 Kafka
资源。
AMQ Streams 提供了在 example /kafka/ 中部署所需的 YAML 文件示例
:
kafka-persistent.yaml
- 使用三个 ZooKeeper 和三个 Kafka 节点部署一个持久集群。
kafka-jbod.yaml
- 使用三个 ZooKeeper 和三个 Kafka 节点部署持久集群(每个集群使用多个持久性卷)。
kafka-persistent-single.yaml
- 使用单个 ZooKeeper 节点和单个 Kafka 节点部署持久集群。
kafka-ephemeral.yaml
- 使用三个 ZooKeeper 和三个 Kafka 节点部署一个临时集群。
kafka-ephemeral-single.yaml
- 部署具有三个 ZooKeeper 节点和单个 Kafka 节点的临时集群。
在这一流程中,我们使用 临时 和永久 Kafka 集群部署的示例:
- 临时集群
-
通常,临时(或临时) Kafka 集群适合用于开发和测试目的,不适用于生产环境。此部署使用
emptyDir
卷来存储代理信息(用于 ZooKeeper)和主题或分区(对于 Kafka)。使用emptyDir
卷意味着其内容严格与 pod 生命周期相关,并在 pod 停机时被删除。 - 持久性集群
-
持久的 Kafka 集群使用
PersistentVolume 来存储
ZooKeeper 和 Kafka 数据。该PersistentVolume
使用PersistentVolumeClaim
来获取,使它独立于实际的PersistentVolume
类型。例如,它可以在 Amazon AWS 部署中使用 Amazon EBS 卷,而不更改 YAML 文件。PersistentVolumeClaim
可以使用StorageClass
来触发自动卷置备。
示例集群默认命名为 my-cluster
。集群名称通过资源名称定义,在部署集群后无法更改。要在部署集群前更改集群名称,请在相关 YAML 文件中编辑 Kafka 资源的
属性。
Kafka
.metadata.name
apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster # ...
有关配置 Kafka
资源的更多信息,请参阅 OpenShift 中使用 AMQ Streams 中的 Kafka 集群配置。
步骤
创建和部署 临时或 持久 集群。
对于开发或测试,您可能更喜欢使用临时集群。您可以随时使用持久性存储。
创建和部署 临时 集群:
oc apply -f examples/kafka/kafka-ephemeral.yaml
创建和部署 持久 集群:
oc apply -f examples/kafka/kafka-persistent.yaml
验证 Kafka 集群是否已成功部署:
oc get deployments
4.1.2.2. 使用 Cluster Operator 部署主题 Operator
此流程描述了如何使用 Cluster Operator 部署 Topic Operator。
您可以将 Kafka
资源的 principal Operator
属性配置为包含 topicOperator
。
如果要将 Topic Operator 与不由 AMQ Streams 管理的 Kafka 集群搭配使用,则必须将 Topic Operator 部署为独立组件。
有关配置 实体Operator 和
属性的更多信息,请参阅 OpenShift 指南的使用 AMQ Streams 中的 Entity Operator。
topicOperator
步骤
编辑
Kafka
资源的 principalOperator
属性,使其包含topicOperator
:apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: #... entityOperator: topicOperator: {} userOperator: {}
使用
EntityTopicOperatorSpec
模式参考 中所述的属性配置 Topic Operatorspec
。如果您希望所有属性都使用其默认值,请使用空对象(
{}
)。创建或更新资源:
使用
oc apply
:oc apply -f <your-file>
4.1.2.3. 使用 Cluster Operator 部署 User Operator
此流程描述了如何使用 Cluster Operator 部署 User Operator。
您可以将 Kafka
资源的 principal Operator
属性配置为包含 userOperator
。
如果要将 User Operator 与不由 AMQ Streams 管理的 Kafka 集群搭配使用,您必须将 User Operator 部署为独立组件。
有关配置 实体Operator 和
属性的更多信息,请参阅 OpenShift 指南的使用 AMQ Streams 中的 Entity Operator。
userOperator
步骤
编辑
Kafka
资源的 principalOperator
属性,使其包含userOperator
:apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: #... entityOperator: topicOperator: {} userOperator: {}
使用 OpenShift 指南的使用 AMQ Streams 中的
EntityUserOperatorSpec
schema 参考 中描述的属性配置 User Operatorspec
。如果您希望所有属性都使用其默认值,请使用空对象(
{}
)。创建或更新资源:
oc apply -f <your-file>
4.1.3. AMQ Streams Operator 的替代独立部署选项
当使用 Cluster Operator 部署 Kafka 集群时,您还可以部署 Topic Operator 和 User Operator。或者,您也可以执行独立部署。
独立部署意味着 Topic Operator 和 User Operator 可以在不由 AMQ Streams 管理的 Kafka 集群中运行。
4.1.3.1. 部署独立主题 Operator
此流程演示了如何将 Topic Operator 部署为独立组件。
独立部署需要配置环境变量,比 使用 Cluster Operator 部署 Topic Operator 复杂。但是,独立部署更为灵活,因为 Topic Operator 可在任何 Kafka 集群下运行,而不必由 Cluster Operator 部署。
先决条件
- 您需要一个现有的 Kafka 集群供 Topic Operator 连接到。
步骤
通过设置,编辑
install/topic-operator/05-Deployment-strimzi-topic-operator
属性:.yaml 文件中的 Deployment.spec.template.spec.containers[0].
env-
STRIMZI_KAFKA_BOOTSTRAP_SERVERS
,用于列出 Kafka 集群中的 bootstrap 代理,以逗号分隔的主机名 :\{0\}端口
对列表形式指定。 -
用于列出 ZooKeeper 节点的
STRIMZI_ZOOKEEPER_CONNECT
节点,以逗号分隔的主机名 :\{0\}端口
对列表形式提供。这应该与 Kafka 集群使用的 ZooKeeper 集群相同。 -
STRIMZI_NAMESPACE
到您希望操作员监视KafkaTopic
资源的 OpenShift 命名空间。 -
STRIMZI_RESOURCE_LABELS
到标签选择器,用于标识由操作器管理的KafkaTopic
资源。 -
STRIMZI_FULL_RECONCILIATION_INTERVAL_MS
,以毫秒为单位指定定期协调之间的间隔。 -
STRIMZI_TOPIC_METADATA_MAX_ATTEMPTS
,以指定从 Kafka 获取主题元数据的尝试次数。每次尝试之间的时间都定义为指数回退。考虑在因为分区或副本数量而创建主题需要更多时间时增加这个值。默认
6. -
STRIMZI_ZOOKEEPER_SESSION_TIMEOUT_MS
到 ZooKeeper 会话超时,以毫秒为单位。例如,10000
。Default20000(
20 秒)。 -
STRIMZI_TOPICS_PATH
到 Topic Operator 存储其元数据的 Zookeeper 节点路径。默认/strimzi/topics
. -
STRIMZI_TLS_ENABLED
启用 TLS 支持以加密与 Kafka 代理的通信。默认为true
。 -
STRIMZI_TRUSTSTORE_LOCATION
,指向包含用于启用基于 TLS 的通信的证书的信任存储的路径。只有在通过STRIMZI_TLS_ENABLED 启用 TLS 时才必需
。 -
STRIMZI_TRUSTSTORE_PASSWORD
到用于访问STRIMZI_TRUSTSTORE_LOCATION
定义的信任存储的密码。只有在通过STRIMZI_TLS_ENABLED 启用 TLS 时才必需
。 -
STRIMZI_KEYSTORE_LOCATION
到含有私钥的密钥存储的路径,以启用基于 TLS 的通信。只有在通过STRIMZI_TLS_ENABLED 启用 TLS 时才必需
。 -
STRIMZI_KEYSTORE_PASSWORD
到用于访问STRIMZI_KEYSTORE_LOCATION
定义的密钥存储的密码.只有在通过STRIMZI_TLS_ENABLED 启用 TLS 时才必需
。 -
STRIMZI_LOG_LEVEL
到打印日志记录消息的级别。该值可设置为:ERROR
、WARNING
、INFO
、DEBUG
和TRACE
.默认INFO
. -
STRIMZI_JAVA_OPTS
(可选),用于运行主题 Operator 的 JVM。例如-Xmx=512M -Xms=256M
。 -
STRIMZI_JAVA_SYSTEM_PROPERTIES
(可选) 列出设置为 Topic Operator 的-D
选项。An example is-Djavax.net.debug=verbose -DpropertyName=value
.
-
部署主题 Operator:
oc apply -f install/topic-operator
验证 Topic Operator 是否已成功部署:
oc describe deployment strimzi-topic-operator
当
Replicas:
条目显示1 可用
时,Tpic Operator 会被部署。注意如果您与 OpenShift 集群的连接较慢并且之前没有下载镜像,则部署可能会遇到延迟。
4.1.3.2. 部署 standalone User Operator
此流程演示了如何将 User Operator 部署为独立组件。
独立部署需要配置环境变量,比 使用 Cluster Operator 部署 User Operator 复杂。但是,独立的部署更灵活,因为 User Operator 可以操作 任何 Kafka 集群,而不必由 Cluster Operator 部署。
先决条件
- 您需要一个现有的 Kafka 集群供 User Operator 连接到。
步骤
通过设置,编辑
install/user-operator/05-Deployment-strimzi-user-operator
属性:.yaml 文件中的以下 Deployment.spec.template.spec.containers[0].
env-
STRIMZI_KAFKA_BOOTSTRAP_SERVERS
来列出 Kafka 代理,以逗号分隔的主机名 :\{0\}端口
对列表形式指定。 -
用于列出 ZooKeeper 节点的
STRIMZI_ZOOKEEPER_CONNECT
节点,以逗号分隔的主机名 :\{0\}端口
对列表形式提供。这必须与您的 Kafka 集群使用的 ZooKeeper 集群相同。不支持使用 TLS 加密连接到 ZooKeeper 节点。 -
STRIMZI_NAMESPACE
到 OpenShift 命名空间,您希望操作员在该命名空间中监视KafkaUser
资源。 -
STRIMZI_LABELS
到用于标识由操作器管理的KafkaUser
资源标签选择器。 -
STRIMZI_FULL_RECONCILIATION_INTERVAL_MS
,以毫秒为单位指定定期协调之间的间隔。 -
STRIMZI_ZOOKEEPER_SESSION_TIMEOUT_MS
到 ZooKeeper 会话超时,以毫秒为单位。例如,10000
。Default20000(
20 秒)。 -
STRIMZI_CA_CERT_NAME
以指向包含证书颁发机构公钥的 OpenShiftSecret
,用于为 TLS 客户端身份验证签名新用户证书。Secret
必须包含密钥ca.crt
下的证书颁发机构的公钥。 -
STRIMZI_CA_KEY_NAME
以指向包含证书颁发机构私钥的 OpenShiftSecret
,用于为 TLS 客户端身份验证签名新用户证书。Secret
必须包含密钥ca.key
下的证书颁发机构的私钥。 -
STRIMZI_CLUSTER_CA_CERT_SECRET_NAME
以指向包含用于签署 Kafka 代理证书以启用基于 TLS 的通信的 OpenShiftSecret
。Secret
必须包含密钥ca.crt
下的证书颁发机构的公钥。这个环境变量是可选的,只有在与 Kafka 集群的通信基于 TLS 时才应设置。 -
STRIMZI_EO_KEY_SECRET_NAME
以指向包含对 Kafka 集群进行 TLS 客户端身份验证的私钥和相关证书的 OpenShiftSecret
。Secret
必须包含密钥存储,其私钥和证书位于 key entity-operator.p12
下,以及 key entity-operator.password
下的相关密码。这个环境变量是可选的,只有在与 Kafka 集群的通信基于 TLS 时需要 TLS 客户端身份验证时才应设置。 -
STRIMZI_CA_VALIDITY
证书颁发机构的有效期限.默认值为365
天。 -
STRIMZI_CA_RENEWAL
认证机构的续订期限. -
STRIMZI_LOG_LEVEL
到打印日志记录消息的级别。该值可设置为:ERROR
、WARNING
、INFO
、DEBUG
和TRACE
.默认INFO
. -
STRIMZI_GC_LOG_ENABLED
,以启用垃圾收集(GC)日志。默认为true
。默认为在旧证书过期前启动证书续订的30
天。 -
STRIMZI_JAVA_OPTS
(可选) 用于运行用户的 JVM 的 Java 选项。例如-Xmx=512M -Xms=256M
。 -
STRIMZI_JAVA_SYSTEM_PROPERTIES
(可选) 列出设置为 User Operator 的-D
选项。An example is-Djavax.net.debug=verbose -DpropertyName=value
.
-
部署 User Operator:
oc apply -f install/user-operator
验证 User Operator 是否已成功部署:
oc describe deployment strimzi-user-operator
当
Replicas:
条目显示1 available
时,会部署 User Operator。注意如果您与 OpenShift 集群的连接较慢并且之前没有下载镜像,则部署可能会遇到延迟。
4.2. 部署 Kafka Connect
Kafka Connect 是一个在 Apache Kafka 和外部系统间流传输数据的工具。
在 AMQ Streams 中,Kafka Connect 被部署为分布式模式。Kafka Connect 也可以以独立模式工作,但 AMQ Streams 不支持此项。
Kafka Connect 使用 连接器 的概念提供了一个框架,用于将大量数据移入 Kafka 集群或移出 Kafka 集群,同时保持可扩展性和可靠性。
Kafka Connect 通常用于将 Kafka 与外部数据库和存储及消息传递系统集成。
本节中的步骤显示如何:
术语 连接器 可以互换使用,以表示在 Kafka Connect 集群中运行的连接器实例或连接器类。在本指南中,当从上下文中明确含义时,将使用 连接器。
4.2.1. 将 Kafka Connect 部署到 OpenShift 集群
此流程演示了如何使用 Cluster Operator 将 Kafka Connect 集群部署到 OpenShift 集群。
Kafka Connect 集群作为 Deployment
实施,具有可配置的节点(也称为 worker ),将连接器工作负载作为 任务 来分布,以便消息流高度可扩展且可靠。
部署使用 YAML 文件提供创建 KafkaConnect
资源规格。
在这一流程中,我们使用 AMQ Streams 提供的示例文件:
-
examples/connect/kafka-connect.yaml
有关配置 KafkaConnect
资源(或使用 Source-to-Image( S2I)支持的 KafkaConnect
S2I 资源)的详情,请参考 使用 AMQ Streams 中的 OpenShift 指南中的 Kafka Connect 集群配置。
步骤
将 Kafka Connect 部署到您的 OpenShift 集群。对于带有 3 或更多代理的 Kafka 集群,请使用
example/connect/kafka-connect.yaml
文件。对于小于 3 个代理的 Kafka 集群,请使用 example/connect/kafka-connect-single-node-kafka.yaml
文件。oc apply -f examples/connect/kafka-connect.yaml
验证 Kafka Connect 是否已成功部署:
oc get deployments
4.2.2. 使用连接器插件扩展 Kafka Connect
Kafka Connect 的 AMQ Streams 容器镜像包括两个内置文件连接器,用于将基于文件的数据移入或移出 Kafka 集群。
表 4.1. 文件连接器
文件连接器 | 描述 |
---|---|
| 从文件(源)传输数据到 Kafka 集群。 |
| 将数据从 Kafka 集群传输到文件(sink)。 |
Cluster Operator 也可以使用您创建的镜像将 Kafka Connect 集群部署到 OpenShift 集群。
本节中的步骤显示如何通过以下方法为连接器镜像添加您自己的连接器类:
4.2.2.1. 从 Kafka Connect 基础镜像创建 Docker 镜像
此流程演示了如何创建自定义镜像并将其添加到 /opt/kafka/plugins
目录中。
您可以使用 Red Hat Ecosystem Catalog 上的 Kafka 容器镜像作为基础镜像,以通过额外的连接器插件创建您自己的自定义镜像。
在启动时,Kafka Connect 的 AMQ Streams 版本加载 /opt/kafka/plugins
目录中包含的任何第三方连接器插件。
步骤
使用
registry.redhat.io/amq7/amq-streams-kafka-26-rhel7:1.6.7
作为基础镜像创建新的Dockerfile
:FROM registry.redhat.io/amq7/amq-streams-kafka-26-rhel7:1.6.7 USER root:root COPY ./my-plugins/ /opt/kafka/plugins/ USER 1001
插件文件示例
$ tree ./my-plugins/ ./my-plugins/ ├── debezium-connector-mongodb │ ├── bson-3.4.2.jar │ ├── CHANGELOG.md │ ├── CONTRIBUTE.md │ ├── COPYRIGHT.txt │ ├── debezium-connector-mongodb-0.7.1.jar │ ├── debezium-core-0.7.1.jar │ ├── LICENSE.txt │ ├── mongodb-driver-3.4.2.jar │ ├── mongodb-driver-core-3.4.2.jar │ └── README.md ├── debezium-connector-mysql │ ├── CHANGELOG.md │ ├── CONTRIBUTE.md │ ├── COPYRIGHT.txt │ ├── debezium-connector-mysql-0.7.1.jar │ ├── debezium-core-0.7.1.jar │ ├── LICENSE.txt │ ├── mysql-binlog-connector-java-0.13.0.jar │ ├── mysql-connector-java-5.1.40.jar │ ├── README.md │ └── wkb-1.0.2.jar └── debezium-connector-postgres ├── CHANGELOG.md ├── CONTRIBUTE.md ├── COPYRIGHT.txt ├── debezium-connector-postgres-0.7.1.jar ├── debezium-core-0.7.1.jar ├── LICENSE.txt ├── postgresql-42.0.0.jar ├── protobuf-java-2.6.1.jar └── README.md
- 构建容器镜像。
- 将自定义镜像推送到容器 registry。
指向新容器映像。
您可以:
编辑
KafkaConnect.spec.image
属性,即KafkaConnect
自定义资源。如果设置,此属性会覆盖 Cluster Operator 中的
STRIMZI_KAFKA_CONNECT_IMAGES
变量。apiVersion: kafka.strimzi.io/v1beta1 kind: KafkaConnect metadata: name: my-connect-cluster spec: 1 #... image: my-new-container-image 2 config: 3 #...
或
-
在
install/cluster-operator/060-Deployment-strimzi-cluster-operator.yaml
文件中编辑STRIMZI_KAFKA_CONNECT_IMAGES
变量以指向新容器镜像,然后重新安装 Cluster Operator。
其他资源
如需了解更多相关信息,请参阅 OpenShift 指南中的使用 AMQ Streams :
4.2.2.2. 使用 OpenShift 构建和源至镜像创建容器镜像
此流程演示了如何使用 OpenShift 构建 和 源至镜像(S2I) 框架来创建新的容器镜像。
OpenShift 构建获取支持 S2I 的构建器镜像,以及用户提供的源代码和二进制文件,并使用它们来构建新的容器镜像。构建之后,容器镜像将存储在 OpenShift 的本地容器镜像存储库中,并可在部署中使用。
红帽生态系统目录 中提供了带有 S2I 支持的 Kafka Connect 构建器镜像,作为 registry.redhat.io/amq7/amq-streams-kafka-26-rhel7:1.6.7
镜像的一部分。此 S2I 镜像提取您的二进制文件(带有插件和连接器),并将它们存储在 /tmp/kafka-plugins/s2i
目录中。它从这个目录中创建一个新的 Kafka Connect 镜像,然后可与 Kafka Connect 部署一起使用。当使用增强的镜像启动时,Kafka Connect 会从 /tmp/kafka-plugins/s2i
目录中加载任何第三方插件。
步骤
在命令行中,使用
oc apply
命令创建和部署 Kafka Connect S2I 集群:oc apply -f examples/connect/kafka-connect-s2i.yaml
使用 Kafka Connect 插件创建目录:
$ tree ./my-plugins/ ./my-plugins/ ├── debezium-connector-mongodb │ ├── bson-3.4.2.jar │ ├── CHANGELOG.md │ ├── CONTRIBUTE.md │ ├── COPYRIGHT.txt │ ├── debezium-connector-mongodb-0.7.1.jar │ ├── debezium-core-0.7.1.jar │ ├── LICENSE.txt │ ├── mongodb-driver-3.4.2.jar │ ├── mongodb-driver-core-3.4.2.jar │ └── README.md ├── debezium-connector-mysql │ ├── CHANGELOG.md │ ├── CONTRIBUTE.md │ ├── COPYRIGHT.txt │ ├── debezium-connector-mysql-0.7.1.jar │ ├── debezium-core-0.7.1.jar │ ├── LICENSE.txt │ ├── mysql-binlog-connector-java-0.13.0.jar │ ├── mysql-connector-java-5.1.40.jar │ ├── README.md │ └── wkb-1.0.2.jar └── debezium-connector-postgres ├── CHANGELOG.md ├── CONTRIBUTE.md ├── COPYRIGHT.txt ├── debezium-connector-postgres-0.7.1.jar ├── debezium-core-0.7.1.jar ├── LICENSE.txt ├── postgresql-42.0.0.jar ├── protobuf-java-2.6.1.jar └── README.md
使用
oc start-build
命令,使用准备的目录启动镜像的新构建:oc start-build my-connect-cluster-connect --from-dir ./my-plugins/
注意构建的名称与部署的 Kafka Connect 集群的名称相同。
- 构建完成后,Kafka Connect 部署会自动使用新镜像。
4.2.3. 创建和管理连接器
当您为您的 connector 插件创建容器镜像时,需要在 Kafka Connect 集群中创建连接器实例。然后,您可以配置、监控和管理正在运行的连接器实例。
连接器是特定 连接器类 的实例,它知道如何就消息与相关的外部系统通信。连接器可用于许多外部系统,或者您可以自行创建。
您可以创建连接器的 源 和 接收器 类型。
- 源连接器
- 源连接器是从外部系统获取数据并将其反馈到 Kafka 作为消息的运行时实体。
- sink 连接器
- sink 连接器是一个运行时实体,它从 Kafka 主题获取信息并将其反馈到外部系统。
AMQ Streams 为创建和管理连接器提供了两个 API:
-
KafkaConnector
资源(称为KafkaConnectors
) - Kafka Connect REST API
使用 API,您可以:
- 检查连接器实例的状态
- 重新配置正在运行的连接器
- 增加或减少连接器实例的任务数量
-
重启失败的任务(
KafkaConnector
资源不支持) - 暂停连接器实例
- 恢复之前暂停的连接器实例
- 删除连接器实例
4.2.3.1. KafkaConnector
资源
KafkaConnectors
允许您以 OpenShift 原生的方式为 Kafka Connect 创建和管理连接器实例,因此不需要 cURL 等 HTTP 客户端。与其他 Kafka 资源一样,您可以在部署至 OpenShift 集群的 KafkaConnector
YAML 文件中声明连接器所需的状态,以创建连接器实例。
您可以通过更新对应的 KafkaConnector
并应用更新来管理正在运行的连接器实例。您可以通过删除对应的 KafkaConnector
来删除连接器。
为确保与早期版本的 AMQ Streams 兼容,KafkaConnectors
默认被禁用。要为 Kafka Connect 集群启用它们,您必须在 KafkaConnect
资源上使用注解。具体步骤,请参阅 OpenShift 中使用 AMQ Streams 中的 配置 Kafka Connect。
启用 KafkaConnectors
后,Cluster Operator 开始监视它们。它更新正在运行的连接器实例的配置,以匹配其 KafkaConnectors
中定义的配置。
AMQ Streams 包括 一个名为 example
。您可以使用本例来创建和管理 /connect/source-connector.yaml 的 KafkaConnect
or 示例FileStreamSourceConnector
。
4.2.3.2. Kafka Connect REST API 的可用性
Kafka Connect REST API 在端口 8083 上作为 <connect-cluster-name>-connect-api
服务可用。
如果启用了 KafkaConnectors
,Cluster Operator 将恢复直接使用 Kafka Connect REST API 进行手动更改。
Apache Kafka 文档中的 描述了 REST API 支持的操作。
4.2.4. 将 KafkaConnector
资源部署到 Kafka Connect
这个步骤描述了如何将示例 KafkaConnector
部署到 Kafka Connect 集群。
示例 YAML 将创建一个 FileStreamSourceConnector
,将许可证文件的每一行发送到 Kafka,作为名为 my-topic
的主题的消息。
先决条件
-
启用 Kafka
Connectors 的 Kafka
Connect 部署 - 一个正在运行的 Cluster Operator
步骤
编辑 example
/connect/source-connector.yaml
文件:apiVersion: kafka.strimzi.io/v1alpha1 kind: KafkaConnector metadata: name: my-source-connector 1 labels: strimzi.io/cluster: my-connect-cluster 2 spec: class: org.apache.kafka.connect.file.FileStreamSourceConnector 3 tasksMax: 2 4 config: 5 file: "/opt/kafka/LICENSE" topic: my-topic # ...
在 OpenShift 集群中创建
KafkaConnector
:oc apply -f examples/connect/source-connector.yaml
检查是否已创建资源:
oc get kctr --selector strimzi.io/cluster=my-connect-cluster -o name
4.3. 部署 Kafka MirrorMaker
Cluster Operator 部署一个或多个 Kafka 镜像(MirrorMaker)副本,以在 Kafka 集群间复制数据。此过程称为镜像,以避免与 Kafka 分区复制概念混淆。MirrorMaker 使用源集群的信息,并将这些消息重新发布到目标集群。
4.3.1. 将 Kafka MirrorMaker 部署到 OpenShift 集群
此流程演示了如何使用 Cluster Operator 将 Kafka MirrorMaker 集群部署到 OpenShift 集群。
部署使用 YAML 文件提供创建 KafkaMirrorMaker 或
资源规格,具体取决于部署的 MirrorMaker 版本。
KafkaMir
rorMaker2
在这一流程中,我们使用 AMQ Streams 提供的示例文件:
-
examples/mirror-maker/kafka-mirror-maker.yaml
-
examples/mirror-maker/kafka-mirror-maker-2.yaml
有关配置 KafkaMirrorMaker 或
KafkaMirrorMaker2
资源的详情,请参考 使用 AMQ Streams on OpenShift 指南 中的 Kafka MirrorMaker 集群配置。
步骤
将 Kafka MirrorMaker 部署到 OpenShift 集群:
对于 MirrorMaker:
oc apply -f examples/mirror-maker/kafka-mirror-maker.yaml
对于 MirrorMaker 2.0:
oc apply -f examples/mirror-maker/kafka-mirror-maker-2.yaml
验证 MirrorMaker 是否已成功部署:
oc get deployments
4.4. 部署 Kafka Bridge
Cluster Operator 部署一个或多个 Kafka 网桥副本,以通过 HTTP API 在 Kafka 集群和客户端之间发送数据。
4.4.1. 将 Kafka Bridge 部署到 OpenShift 集群
此流程演示了如何使用 Cluster Operator 将 Kafka Bridge 集群部署到 OpenShift 集群。
部署使用 YAML 文件提供规格来创建 KafkaBridge
资源。
在这一流程中,我们使用 AMQ Streams 提供的示例文件:
-
examples/bridge/kafka-bridge.yaml
有关配置 KafkaBridge
资源的详情,请参考 使用 AMQ Streams 中的 OpenShift 指南中的 Kafka Bridge 集群配置。
步骤
将 Kafka Bridge 部署到 OpenShift 集群:
oc apply -f examples/bridge/kafka-bridge.yaml
验证 Kafka 网桥是否已成功部署:
oc get deployments
第 5 章 设置对 Kafka 集群的客户端访问权限
部署 AMQ Streams 后,本节中的步骤解释了如何:
- 部署示例制作者和消费者客户端,您可以使用它们来验证您的部署
设置对 Kafka 集群的外部客户端访问
为 OpenShift 外部的客户端设置 对 Kafka 集群的访问权限的步骤更为复杂,需要熟悉 OpenShift 中使用 AMQ Streams 指南 中所述的 Kafka 组件配置 流程。
5.1. 部署示例客户端
此流程演示了如何使用您创建的 Kafka 集群发送和接收信息部署示例制作者和消费者客户端。
先决条件
- Kafka 集群可供客户端使用。
步骤
部署 Kafka 制作者.
oc run kafka-producer -ti --image=registry.redhat.io/amq7/amq-streams-kafka-26-rhel7:1.6.7 --rm=true --restart=Never -- bin/kafka-console-producer.sh --broker-list cluster-name-kafka-bootstrap:9092 --topic my-topic
- 在运行制作者的控制台中键入消息。
- 按 Enter 发送邮件。
部署 Kafka 用户。
oc run kafka-consumer -ti --image=registry.redhat.io/amq7/amq-streams-kafka-26-rhel7:1.6.7 --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server cluster-name-kafka-bootstrap:9092 --topic my-topic --from-beginning
- 确认您在使用者控制台中看到传入的消息。
5.2. 为 OpenShift 外部的客户端设置访问权限
此流程演示了如何从 OpenShift 外部配置对 Kafka 集群的客户端访问。
使用 Kafka 集群的地址,您可以提供到不同 OpenShift 命名空间或整个 OpenShift 外部客户端的外部访问。
您可以配置外部 Kafka 侦听程序以提供访问权限。
支持以下外部监听程序类型:
-
使用 OpenShift
Route
和默认 HAProxy 路由器的路由
-
使用负载均衡器
服务的 LoadBalancer -
NodePort
用于在 OpenShift 节点上使用端口 -
Ingress
用于 OpenShift Ingress 和 Kubernetes 的 NGINX Ingress Controller
选择的类型取决于您的要求,以及您的环境和基础架构。例如,负载均衡器可能不适用于某些基础架构,如裸机,其中节点端口提供了一个更好的选项。
在这个过程中:
- 为 Kafka 集群配置了一个外部监听程序,它带有 TLS 加密和身份验证,并启用了 Kafka 简单授权。
-
为客户端创建一个
KafkaUser
,并为 简单授权 定义了 TLS 身份验证和访问控制列表(ACL)。
您可以将侦听器配置为使用 TLS 或 SCRAM-SHA-512 身份验证,这两种身份验证都可用于 TLS 加密。如果使用授权服务器,您可以使用基于令牌的 OAuth 2.0 身份验证和 OAuth 2.0 授权。Open Policy Agent(OPA)授权还支持 Kafka 授权 选项。
当您配置 KafkaUser
身份验证和授权机制时,请确保它们与等同的 Kafka 配置匹配:
-
KafkaUser.spec.authentication
matchesKafka.spec.kafka.listeners[*].authentication
-
KafkaUser.spec.authorization
与Kafka.spec.kafka.authorization
匹配
您应至少有一个监听器支持您要用于 KafkaUser
的身份验证。
Kafka 用户和 Kafka 代理之间的身份验证取决于每个用户的身份验证设置。例如,如果 Kafka 配置中没有启用 TLS 的用户,则无法验证它。
AMQ Streams Operator 自动配置过程:
- Cluster Operator 创建监听程序并设置集群和客户端证书颁发机构(CA)证书,以便在 Kafka 集群中启用身份验证。
- User Operator 根据所选的身份验证类型,创建代表客户端以及用于客户端身份验证的安全凭证的用户。
在这一流程中,使用了 Cluster Operator 生成的证书,但您可以通过 安装您自己的证书 来替换它们。您还可以将监听程序配置为使用 由外部证书颁发机构管理的 Kafka 侦听器证书。
证书以 PKCS #12 格式(.p12)和 PEM(.crt)格式提供。
先决条件
- Kafka 集群可用于客户端
- Cluster Operator 和 User Operator 在集群中运行
- OpenShift 集群外的客户端连接到 Kafka 集群
步骤
使用外部 Kafka 侦听程序配置
Kafka
集群。- 定义通过监听程序访问 Kafka 代理所需的身份验证
在 Kafka 代理上启用授权
例如:
apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster namespace: myproject spec: kafka: # ... listeners: 1 - name: external 2 port: 9094 3 type: LISTENER-TYPE 4 tls: true 5 authentication: type: tls 6 configuration: preferredNodePortAddressType: InternalDNS 7 bootstrap and broker service overrides 8 #... authorization: 9 type: simple superUsers: - super-user-name 10 # ...
- 1
- 2
- 用于标识侦听器的名称。在 Kafka 集群中必须是唯一的。
- 3
- Kafka 内侦听器使用的端口号。在给定 Kafka 集群中,端口号必须是唯一的。允许的端口号是 9092 及以上,但端口 9404 和 9999 除外,它们已用于 Prometheus 和 JMX。根据监听程序类型,端口号可能与连接 Kafka 客户端的端口号不同。
- 4
- 外部监听程序类型,指定为
route
、loadbalancer
、nodeport
或ingress
。内部监听程序指定为内部
监听程序。 - 5
- 在监听器上启用 TLS 加密。默认为
false
。路由
监听程序不需要 TLS 加密。 - 6
- 指定为
tls
的身份验证。 - 7
- (可选,仅适用于
节点端口
侦听器)配置 ,以指定 AMQ Streams 作为节点地址使用的第一个地址类型的首选。 - 8
- (可选)AMQ Streams 自动决定要公告给客户端的地址。这些地址由 OpenShift 自动分配。如果您运行 AMQ Streams 的基础架构不提供正确的 地址,您可以覆盖 bootstrap 和代理服务 地址。验证不会在覆盖上执行。覆盖配置根据监听器类型而有所不同。例如,您可以覆盖用于
路由
、DNS 名称或负载均衡器 IP 地址的主机,
以及nodeport
的节点端口。 - 9
- Authoization 指定为
simple
,它使用AclAuthorizer
Kafka 插件。 - 10
- (可选)超级用户可以访问所有代理,无论 ACL 中定义的任何访问权限限制。
警告OpenShift Route 地址包含 Kafka 集群的名称、侦听器的名称以及在其中创建的命名空间的名称。例如,my
-cluster-kafka-listener1-bootstrap-myproject
(CLUSTER-NAME-kafka-LISTENER-NAME-bootstrap-NAMESPACE)。如果您使用的是路由
监听程序类型,请注意地址的长度不超过 63 个字符的最大限制。
创建或更新
Kafka
资源。oc apply -f KAFKA-CONFIG-FILE
使用 TLS 身份验证配置 Kafka 代理监听程序。
为每个 Kafka 代理 pod 创建一个服务。
创建一个服务作为连接到 Kafka 集群的 bootstrap 地址。
服务也作为使用
nodeport
侦听器到 Kafka 集群 的外部 bootstrap 地址 创建。也使用与
Kafka
资源相同的名称来创建用于验证 kafka 代理身份的集群 CA 证书。从
Kafka
资源的状态查找 bootstrap 地址和端口。oc get kafka KAFKA-CLUSTER-NAME -o jsonpath='{.status.listeners[?(@.type=="external")].bootstrapServers}'
使用 Kafka 客户端中的 bootstrap 地址连接到 Kafka 集群。
从生成的
KAFKA-CLUSTER-NAME -cluster-ca-cert Secret
中提取公共集群 CA 证书和密码。oc get secret KAFKA-CLUSTER-NAME-cluster-ca-cert -o jsonpath='{.data.ca\.p12}' | base64 -d > ca.p12
oc get secret KAFKA-CLUSTER-NAME-cluster-ca-cert -o jsonpath='{.data.ca\.password}' | base64 -d > ca.password
使用 Kafka 客户端中的证书和密码,使用 TLS 加密连接到 Kafka 集群。
注意默认情况下,集群 CA 证书自动续订。如果您使用您自己的 Kafka 侦听程序证书,则需要 手动续订证书。
创建或修改代表需要访问 Kafka 集群的客户端的用户。
-
指定与
Kafka
侦听器相同的验证类型。 指定简单授权的授权 ACL。
例如:
apiVersion: kafka.strimzi.io/v1beta1 kind: KafkaUser metadata: name: my-user labels: strimzi.io/cluster: my-cluster 1 spec: authentication: type: tls 2 authorization: type: simple acls: 3 - resource: type: topic name: my-topic patternType: literal operation: Read - resource: type: topic name: my-topic patternType: literal operation: Describe - resource: type: group name: my-group patternType: literal operation: Read
-
指定与
创建或修改
KafkaUser
资源。oc apply -f USER-CONFIG-FILE
已创建用户,以及名称与
KafkaUser
资源相同的 Secret。Secret 包含用于 TLS 客户端身份验证的私钥和公钥。例如:
apiVersion: v1 kind: Secret metadata: name: my-user labels: strimzi.io/kind: KafkaUser strimzi.io/cluster: my-cluster type: Opaque data: ca.crt: PUBLIC-KEY-OF-THE-CLIENT-CA user.crt: USER-CERTIFICATE-CONTAINING-PUBLIC-KEY-OF-USER user.key: PRIVATE-KEY-OF-USER user.p12: P12-ARCHIVE-FILE-STORING-CERTIFICATES-AND-KEYS user.password: PASSWORD-PROTECTING-P12-ARCHIVE
将客户端配置为使用所需的属性连接到 Kafka 集群,以便与 Kafka 集群建立安全连接。
添加公共集群证书的身份验证详情:
security.protocol: SSL 1 ssl.truststore.location: PATH-TO/ssl/keys/truststore 2 ssl.truststore.password: CLUSTER-CA-CERT-PASSWORD 3 ssl.truststore.type=PKCS12 4
注意使用
security.protocol:在通过 TLS 使用 SCRAM-SHA 身份验证时,SASL_SSL
添加用于连接到 Kafka 集群的 bootstrap 地址和端口:
bootstrap.servers: BOOTSTRAP-ADDRESS:PORT
添加公共用户证书的身份验证详情:
ssl.keystore.location: PATH-TO/ssl/keys/user1.keystore 1 ssl.keystore.password: USER-CERT-PASSWORD 2
公共用户证书在创建时由客户端 CA 签名。
第 6 章 为 AMQ Streams 设置指标和仪表板
您可以通过在仪表板上查看关键指标并设置在某些情况下触发的警报来监控 AMQ Streams 部署。指标数据可用于 Kafka、ZooKeeper 以及 AMQ Streams 的其他组件。
为提供指标信息,AMQ Streams 使用 Prometheus 规则和 Grafana 仪表板。
当为 AMQ Streams 的每个组件配置一组规则时,Prometheus 会使用来自集群中运行的 pod 的键指标。然后,Grafana 在仪表板上视觉化这些指标。AMQ Streams 包括 Grafana 仪表板示例,您可以自定义这些仪表板适合您的部署。
在 OpenShift Container Platform 4.x 中,AMQ Streams 采用 对用户定义的项目的监控 (OpenShift 功能)来简化 Prometheus 设置过程。
在 OpenShift Container Platform 3.11 中,您需要将 Prometheus 和 Alertmanager 组件单独部署到集群中。
无论 OpenShift Container Platform 版本是什么,您必须首先 部署 AMQ Streams 的 Prometheus metrics 配置。
接下来,按照 OpenShift Container Platform 版本的说明进行操作:
通过 Prometheus 和 Grafana 设置,您可以使用示例 Grafana 仪表板和警报规则来监控 Kafka 集群。
其他监控选项
Kafka Exporter 是一个可选组件,提供与消费者滞后相关的额外监控。如果要将 Kafka Exporter 用于 AMQ Streams,请参阅 配置 Kafka
资源以使用 Kafka 集群部署 Kafka 导出器。
您还可以通过设置分布式追踪将部署配置为跟踪端到端的消息。如需更多信息,请参阅 OpenShift 中使用 AMQ Streams 中的 分布式追踪。
其他资源
- Prometheus 文档
- Grafana 文档
- Kafka 文档中的 Apache Kafka Monitoring 描述了 Apache Kafka 公开的 JMX 指标
- Zookeeper JMEEper 文档中的 zoo keeper JMX 描述了 Apache ZooKeeper 公开的 JMX 指标
6.1. 指标文件示例
您可以在 example /metrics 目录中找到 Grafana 仪表板和其他指标配置文件的示例
。如下表所示,一些文件仅用于 OpenShift Container Platform 3.11,而不适用于 OpenShift Container Platform 4.x。
AMQ Streams 提供的指标文件示例
metrics ├── grafana-dashboards 1 │ ├── strimzi-cruise-control.json │ ├── strimzi-kafka-bridge.json │ ├── strimzi-kafka-connect.json │ ├── strimzi-kafka-exporter.json │ ├── strimzi-kafka-mirror-maker-2.json │ ├── strimzi-kafka.json │ ├── strimzi-operators.json │ └── strimzi-zookeeper.json ├── grafana-install │ └── grafana.yaml 2 ├── prometheus-additional-properties │ └── prometheus-additional.yaml - OPENSHIFT 3.11 ONLY 3 ├── prometheus-alertmanager-config │ └── alert-manager-config.yaml 4 ├── prometheus-install │ ├── alert-manager.yaml - OPENSHIFT 3.11 ONLY 5 │ ├── prometheus-rules.yaml 6 │ ├── prometheus.yaml - OPENSHIFT 3.11 ONLY 7 │ ├── strimzi-pod-monitor.yaml 8 ├── kafka-bridge-metrics.yaml 9 ├── kafka-connect-metrics.yaml 10 ├── kafka-cruise-control-metrics.yaml 11 ├── kafka-metrics.yaml 12 └── kafka-mirror-maker-2-metrics.yaml 13
- 1
- Grafana 仪表板示例.
- 2
- Grafana 镜像的安装文件。
- 3
- OPENSHIFT 3.11 只 :其他 Prometheus 配置用于提取 CPU、内存和磁盘卷使用情况的指标,它们直接来自节点上的 OpenShift cAdvisor 代理和 kubelet。
- 4
- 用于通过 Alertmanager 发送通知的 hook 定义。
- 5
- OPENSHIFT 3.11 只 :用于部署和配置 Alertmanager 的资源.
- 6
- 用于 Prometheus Alertmanager 的警报规则示例。
- 7
- OPENSHIFT 3.11 只 :Prometheus 镜像的安装资源文件。
- 8
- Prometheus Operator 将 PodMonitor 定义转换为作业,以便 Prometheus 服务器能够直接从 pod 中提取指标数据。
- 9
- 启用指标数据的 Kafka Bridge 资源。
- 10
- 为 Kafka Connect 定义 Prometheus JMX Exporter 重新标记规则的指标配置。
- 11
- 定义 Prometheus JMX Exporter 重新标记 Cruise Control 规则的指标配置。
- 12
- 为 Kafka 和 ZooKeeper 定义 Prometheus JMX Exporter 重新标记规则的指标配置。
- 13
- 为 Kafka Mirror Maker 2.0 定义 Prometheus JMX Exporter 重新标记规则的指标配置。
6.1.1. Grafana 仪表板示例
为监控以下资源提供了 Grafana 仪表板示例:
- AMQ Streams Kafka
显示以下指标:
- 在线代理数
- 集群计数中的活跃控制器
- 未清理领导选举率
- 在线副本
- 复制不足的分区数
- 至少在同步副本数中的分区
- 在同步副本数中最小分区
- 没有活跃领导且因此不可写入或可读的分区
- Kafka 代理 pod 内存用量
- 聚合 Kafka 代理 pod CPU 用量
- Kafka 代理 pod 磁盘用量
- 使用的 JVM 内存
- JVM 垃圾收集时间
- JVM 垃圾回收计数
- 传入字节速率总数
- 传出字节速率总数
- 传入的消息率
- 生成请求率总数
- 字节率
- 生成请求率
- 获取请求率
- 网络处理器平均空闲百分比
- 请求处理程序平均时间空闲百分比
- 日志大小
- AMQ Streams ZooKeeper
显示以下指标:
- Zookeeper ensemble 的仲裁大小
- 活动 连接 数
- 服务器数中的排队请求
- watchers 数
- zookeeper pod 内存用量
- ZooKeeper pod CPU 使用量聚合
- zookeeper pod 磁盘用量
- 使用的 JVM 内存
- JVM 垃圾收集时间
- JVM 垃圾回收计数
- 服务器响应客户端请求所需的时间(最大值、最小值和平均值)
- AMQ Streams Kafka Connect
显示以下指标:
- 传入字节速率总数
- 传出字节速率总数
- 磁盘用量
- 使用的 JVM 内存
- JVM 垃圾收集时间
- AMQ Streams Kafka MirrorMaker 2
显示以下指标:
- 连接器数量
- 任务数量
- 传入字节速率总数
- 传出字节速率总数
- 磁盘用量
- 使用的 JVM 内存
- JVM 垃圾收集时间
- AMQ Streams Operator
显示以下指标:
- 自定义资源
- 每小时成功对自定义资源进行协调
- 每小时自定义资源协调失败
- 每小时无锁定的调节
- 协调开始时间
- 每小时定期协调
- 最大协调时间
- 平均协调时间
- 使用的 JVM 内存
- JVM 垃圾收集时间
- JVM 垃圾回收计数
还为 AMQ Streams 的 Kafka Bridge 和 Cruise Control 组件提供仪表板。
所有控制面板都提供 JVM 指标,以及特定于各个组件的指标。例如,Operator 仪表板提供有关正在处理的协调或自定义资源数量的信息。
6.1.2. Prometheus 指标配置示例
AMQ Streams 使用 Prometheus JMX Exporter 来通过 HTTP 端点公开 JMX 指标,然后由 Prometheus 提取。
Grafana 仪表板依赖于 Prometheus JMX Exporter 重新标记规则,这些规则是为 AMQ Streams 组件定义的,作为自定义资源配置。
标签是 name-value 对。重新标记是动态编写标签的过程。例如,标签的值可以从 Kafka 服务器名称和客户端 ID 中获得。
AMQ Streams 提供示例自定义资源配置 YAML 文件,其中已定义了重新标记规则。部署 Prometheus 指标配置时,您可以部署示例自定义资源,或将指标配置复制到您自己的自定义资源定义中。
表 6.1. 带有指标配置的自定义资源示例
组件 | 自定义资源 | YAML 文件示例 |
---|---|---|
Kafka 和 ZooKeeper |
|
|
Kafka Connect |
|
|
Kafka MirrorMaker 2.0 |
|
|
Kafka Bridge |
|
|
Sything Control |
|
|
其他资源
- 第 6.2 节 “部署 Prometheus 指标配置”
- 有关使用重新标记的更多信息,请参阅 Prometheus 文档中的 配置。
6.2. 部署 Prometheus 指标配置
AMQ Streams 提供了带有重新标记规则 的示例自定义资源配置 YAML 文件。
要应用重新标记规则的指标配置,请执行以下操作之一:
6.2.1. 将 Prometheus metrics 配置复制到自定义资源
要使用 Grafana 仪表板来监控,请将 指标配置示例复制到自定义资源 中。
在这个流程中,会更新 Kafka
资源,但所有支持监控的组件的步骤都相同。
步骤
对部署中的每个 Kafka
资源执行以下步骤。
更新编辑器中的
Kafka
资源。oc edit kafka KAFKA-CONFIG-FILE
-
将
kafka-metrics.yaml
中的示例配置 复制到您自己的Kafka
资源定义中。 - 保存文件,并等待更新的资源被协调。
6.2.2. 使用 Prometheus 指标配置部署 Kafka 集群
要使用 Grafana 仪表板进行监控,您可以使用 指标配置部署一个示例 Kafka 集群。
在这一流程中,kafka-metrics.yaml
文件用于 Kafka
资源。
步骤
使用 示例指标配置 部署 Kafka 集群。
oc apply -f kafka-metrics.yaml
6.3. 在 OpenShift 4 中查看 Kafka 指标和仪表板
当 AMQ Streams 部署到 OpenShift Container Platform 4.x 时,指标 将通过监控用户定义的项目 来提供。此 OpenShift 功能可让开发人员访问单独的 Prometheus 实例来监控其自己的项目(如 Kafka
项目)。
如果启用了对用户定义的项目的监控,openshift -user-workload-monitoring
项目包含以下组件:
- Prometheus Operator
- Prometheus 实例(由 Prometheus Operator 自动部署)
- Thanos Ruler 实例
AMQ Streams 使用这些组件来消耗指标。
集群管理员必须启用对用户定义的项目的监控,然后授予开发人员和其他用户权限来监控其项目中的应用。
Grafana 部署
您可以将 Grafana 实例部署到包含 Kafka 集群的项目中。然后,可以将 Grafana 仪表板示例用于在 Grafana 用户界面中视觉化 AMQ Streams 的 Prometheus 指标。
openshift-monitoring
项目为核心平台组件提供监控。不要在此 项目中使用 Prometheus 和 Grafana 组件来配置 OpenShift Container Platform 4.x 上的 AMQ Streams 监控。
Grafana 版本 6.3 是最低支持的版本。
先决条件
- 已使用示例 YAML 文件 部署了 Prometheus 指标配置。
启用了 对用户定义的项目的监控。集群管理员必须已在 OpenShift Container Platform 集群中创建了
cluster-monitoring-config
ConfigMap。如需更多信息,请参阅以下资源:- 在 OpenShift Container Platform 4.6 中为用户定义的项目启用监控。
- 在 OpenShift Container Platform 4.5 中启用对您自己的服务 的监控。
要监控用户定义的项目,集群管理员必须已被分配了
monitoring-rules-edit
或monitoring-edit 角色
。请参阅:- 授予用户权限来监控 OpenShift Container Platform 4.6 中的用户定义的项目。
- 在 OpenShift Container Platform 4.5 中使用 Web 控制台授予用户权限。
流程提纲
要在 OpenShift Container Platform 4.x 中设置 AMQ Streams 监控,请按照以下步骤操作:
6.3.1. 部署 Prometheus 资源
在 OpenShift Container Platform 4.x 上运行 AMQ Streams 时使用此流程。
要启用 Prometheus 以使用 Kafka 指标,您可以在示例指标文件中配置和部署 PodMonitor 资源
。PodMonnitors
直接从 Apache Kafka、ZooKeeper、Operator、Kafka Bridge 和 Cruise Control 的 pod 中提取数据。
然后,您为 Alertmanager 部署示例警报规则。
先决条件
- 正在运行的 Kafka 集群。
- 检查 AMQ Streams 提供的示例警报规则。
步骤
检查是否启用了对用户定义的项目的监控:
oc get pods -n openshift-user-workload-monitoring
如果启用,则返回监控组件的 Pod。例如:
NAME READY STATUS RESTARTS AGE prometheus-operator-5cc59f9bc6-kgcq8 1/1 Running 0 25s prometheus-user-workload-0 5/5 Running 1 14s prometheus-user-workload-1 5/5 Running 1 14s thanos-ruler-user-workload-0 3/3 Running 0 14s thanos-ruler-user-workload-1 3/3 Running 0 14s
如果没有返回 pod,则禁用对用户定义的项目的监控。请参阅 第 6.3 节 “在 OpenShift 4 中查看 Kafka 指标和仪表板” 中的前提条件。
example
/metrics/prometheus-install/strimzi-pod-monitor.yaml
中定义了多个PodMonitor 资源
。对于每个
PodMonitor
资源,编辑spec.namespaceSelector.matchNames
属性:apiVersion: monitoring.coreos.com/v1 kind: PodMonitor metadata: name: cluster-operator-metrics labels: app: strimzi spec: selector: matchLabels: strimzi.io/kind: cluster-operator namespaceSelector: matchNames: - PROJECT-NAME 1 podMetricsEndpoints: - path: /metrics port: http # ...
- 1
- Pod 从中提取指标的项目,如
Kafka
。
将
strimzi-pod-monitor.yaml
文件部署到运行 Kafka 集群的项目中:oc apply -f strimzi-pod-monitor.yaml -n MY-PROJECT
将 Prometheus 规则示例部署到同一项目中:
oc apply -f prometheus-rules.yaml -n MY-PROJECT
其他资源
- OpenShift Container Platform 4.6 的监控 指南
- 第 6.4.3.3 节 “警报规则示例”
6.3.2. 为 Grafana 创建服务帐户
在 OpenShift Container Platform 4.x 上运行 AMQ Streams 时使用此流程。
AMQ Streams 的 Grafana 实例需要使用分配 cluster-monitoring-view
角色的服务帐户运行。
先决条件
步骤
为 Grafana 创建
ServiceAccount
。此处资源命名为grafana-serviceaccount
。apiVersion: v1 kind: ServiceAccount metadata: name: grafana-serviceaccount labels: app: strimzi
将
ServiceAccount
部署到包含 Kafka 集群的项目中:oc apply -f GRAFANA-SERVICEACCOUNT -n MY-PROJECT
创建一个
ClusterRoleBinding
资源,它将cluster-monitoring-view
角色分配给 GrafanaServiceAccount
。此处资源命名为grafana-cluster-monitoring-binding
。apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRoleBinding metadata: name: grafana-cluster-monitoring-binding labels: app: strimzi subjects: - kind: ServiceAccount name: grafana-serviceaccount namespace: MY-PROJECT 1 roleRef: kind: ClusterRole name: cluster-monitoring-view apiGroup: rbac.authorization.k8s.io
- 1
- 您的项目名称。
将
ClusterRoleBinding
部署到包含 Kafka 集群的项目:oc apply -f GRAFANA-CLUSTER-MONITORING-BINDING -n MY-PROJECT
6.3.3. 使用 Prometheus 数据源部署 Grafana
在 OpenShift Container Platform 4.x 上运行 AMQ Streams 时使用此流程。
此流程描述了如何部署为 OpenShift Container Platform 4.x 监控堆栈配置的 Grafana 应用程序。
OpenShift Container Platform 4.x 在 openshift-monitoring
项目中包含一个 Thanos Querier 实例。Thanos Querier 用于聚合平台指标。
要使用所需的平台指标,您的 Grafana 实例需要一个可以连接到 Thanos Querier 的 Prometheus 数据源。要配置此连接,您需要创建一个 Config Map,它将使用令牌验证与 Thanos Querier 一起运行的 oauth-proxy
sidecar。datasource.yaml
文件用作 Config Map 的来源。
最后,您要使用作为卷挂载的 Config Map 将 Grafana 应用程序部署到包含 Kafka 集群的项目中。
步骤
获取 Grafana
ServiceAccount
的访问令牌:oc serviceaccounts get-token grafana-serviceaccount -n MY-PROJECT
复制要在下一步中使用的访问令牌。
创建一个包含 Grafana Thanos Querier 配置的
数据源.yaml
文件。将访问令牌粘贴到
httpHeaderValue1
属性中,如下所示:apiVersion: 1 datasources: - name: Prometheus type: prometheus url: https://thanos-querier.openshift-monitoring.svc.cluster.local:9091 access: proxy basicAuth: false withCredentials: false isDefault: true jsonData: timeInterval: 5s tlsSkipVerify: true httpHeaderName1: "Authorization" secureJsonData: httpHeaderValue1: "Bearer ${GRAFANA-ACCESS-TOKEN}" 1 editable: true
- 1
GRAFANA-ACCESS-TOKEN
:GrafanaServiceAccount
的访问令牌值。
从
datasource.yaml
文件创建一个名为grafana-config
的 Config Map:oc create configmap grafana-config --from-file=datasource.yaml -n MY-PROJECT
创建包含
Deployment 和
Service
的 Grafana 应用程序。The
grafana-config
Config Map 被挂载为数据源配置的卷。apiVersion: apps/v1 kind: Deployment metadata: name: grafana labels: app: strimzi spec: replicas: 1 selector: matchLabels: name: grafana template: metadata: labels: name: grafana spec: serviceAccountName: grafana-serviceaccount containers: - name: grafana image: grafana/grafana:6.3.0 ports: - name: grafana containerPort: 3000 protocol: TCP volumeMounts: - name: grafana-data mountPath: /var/lib/grafana - name: grafana-logs mountPath: /var/log/grafana - name: grafana-config mountPath: /etc/grafana/provisioning/datasources/datasource.yaml readOnly: true subPath: datasource.yaml readinessProbe: httpGet: path: /api/health port: 3000 initialDelaySeconds: 5 periodSeconds: 10 livenessProbe: httpGet: path: /api/health port: 3000 initialDelaySeconds: 15 periodSeconds: 20 volumes: - name: grafana-data emptyDir: {} - name: grafana-logs emptyDir: {} - name: grafana-config configMap: name: grafana-config --- apiVersion: v1 kind: Service metadata: name: grafana labels: app: strimzi spec: ports: - name: grafana port: 3000 targetPort: 3000 protocol: TCP selector: name: grafana type: ClusterIP
将 Grafana 应用程序部署到包含 Kafka 集群的项目中:
oc apply -f GRAFANA-APPLICATION -n MY-PROJECT
其他资源
- 第 6.3 节 “在 OpenShift 4 中查看 Kafka 指标和仪表板”
- OpenShift Container Platform 4.6 的监控 指南
6.3.4. 创建到 Grafana 服务的路由
在 OpenShift Container Platform 4.x 上运行 AMQ Streams 时使用此流程。
您可以通过公开 Grafana 服务的路由访问 Grafana 用户界面。
步骤
创建到
grafana
服务的边缘路由:oc create route edge MY-GRAFANA-ROUTE --service=grafana --namespace=KAFKA-NAMESPACE
6.3.5. 导入 Grafana 仪表板示例
在 OpenShift Container Platform 4.x 上运行 AMQ Streams 时使用此流程。
使用 Grafana 用户界面导入示例 Grafana 仪表板。
步骤
获取到 Grafana 服务的路由详情。例如:
oc get routes NAME HOST/PORT PATH SERVICES MY-GRAFANA-ROUTE MY-GRAFANA-ROUTE-amq-streams.net grafana
- 在 Web 浏览器中,使用 Route 主机和端口的 URL 访问 Grafana 登录屏幕。
输入您的用户名和密码,然后单击 Log In。
默认的 Grafana 用户名和密码都是
admin
。第一次登录后,您可以更改密码。- 在 Configuration > Data Sources 中,检查是否创建了 Prometheus 数据源。该数据源是在 第 6.3.3 节 “使用 Prometheus 数据源部署 Grafana” 中创建的。
- 点击 Dashboards > Manage,然后点 Import。
-
在
示例/metrics/grafana-dashboards 中
,复制仪表板的 JSON 以导入。 - 将 JSON 粘贴到文本框中,然后单击 Load。
- 为另一个示例 Grafana 仪表板重复步骤 1 -7。
导入的 Grafana 仪表板可以从 Dashboards 主页查看。
6.4. 在 OpenShift 3.11 中查看 Kafka 指标和仪表板
当将 AMQ Streams 部署到 OpenShift Container Platform 3.11 时,您可以使用 Prometheus 为 AMQ Streams 提供的示例 Grafana 仪表板提供监控数据。您需要手动将 Prometheus 组件部署到集群中。
要运行 Grafana 仪表板示例,您必须:
本节中引用的资源旨在作为设置监控的起点,但仅作为示例提供。如果需要进一步支持在生产环境中配置和运行 Prometheus 或 Grafana,请尝试联系其各自的社区。
6.4.1. Prometheus 支持
当 AMQ Streams 部署到 OpenShift Container Platform 3.11 时,不支持 Prometheus 服务器。但是,支持 Prometheus 端点和用于公开指标的 Prometheus JMX Exporter。
为方便起见,我们提供详细说明和示例指标配置文件,以便您希望使用 Prometheus 进行监控。
6.4.2. 设置 Prometheus
在 OpenShift Container Platform 3.11 上运行 AMQ Streams 时,请使用以下步骤。
Prometheus 为系统监控和警报通知提供一套开源的组件。
此处我们介绍了如何在将 AMQ Streams 部署到 OpenShift Container Platform 3.11 时,如何使用提供的 Prometheus 镜像和配置文件来运行和管理 Prometheus 服务器。
先决条件
- 您已将 Prometheus 和 Grafana 的兼容版本部署到 OpenShift Container Platform 3.11 集群中。
用于运行 Prometheus 服务器 pod 的服务帐户有权访问 OpenShift API 服务器。这允许服务帐户检索集群中获取指标的 pod 列表。
如需更多信息,请参阅 发现服务。
6.4.2.1. Prometheus 配置
AMQ Streams 为 Prometheus 服务器提供了示例配置文件。
为部署提供了一个 Prometheus 镜像:
-
prometheus.yaml
以下文件中还提供了与 Prometheus 相关的其他配置:
-
prometheus-additional.yaml
-
prometheus-rules.yaml
-
strimzi-pod-monitor.yaml
若要让 Prometheus 获取监控数据,您必须已将 Prometheus 的兼容版本部署到 OpenShift Container Platform 3.11 集群中。
然后,使用配置文件来部署 Prometheus。
6.4.2.2. Prometheus 资源
应用 Prometheus 配置时,会在 OpenShift 集群中创建并由 Prometheus Operator 管理以下资源:
-
为 Prometheus 授予权限来读取 Kafka 和 ZooKeeper pod、cAdvisor 和 kubelet 针对容器指标公开的健康端点的
ClusterRole
。 -
要运行的 Prometheus Pod 的
ServiceAccount
。 -
一个
ClusterRoleBinding
,它将ClusterRole
绑定到ServiceAccount
。 -
用于管理 Prometheus Operator pod 的
Deployment
。 -
用于管理 Prometheus pod 配置的
PodMonitor
。 -
用于管理
Prometheus
pod 配置的 Prometheus。 -
用于管理
Prometheus pod 的警报规则的 PrometheusRule
。 -
用于管理其他 Prometheus 设置的
Secret
。 -
一个
服务
,允许在集群中运行的应用程序连接到 Prometheus(例如,使用 Prometheus 作为数据源的 Grafana)。
6.4.2.3. 部署 Prometheus
要在 Kafka 集群中获取监控数据,您可以通过应用 Prometheus docker 镜像的示例安装资源文件以及 Prometheus 相关资源的 YAML 文件来使用您自己的 Prometheus 部署或部署 Prometheus 部署。
部署过程创建一个 ClusterRoleBinding
,并在为部署指定的命名空间中发现 Alertmanager 实例。
先决条件
- 检查 提供的示例警报规则
步骤
根据要安装到的命名空间 Prometheus 修改 Prometheus 安装文件(
prometheus.yaml
):在 Linux 中,使用:
sed -i 's/namespace: .*/namespace: my-namespace/' prometheus.yaml
在 MacOS 中,使用:
sed -i '' 's/namespace: .*/namespace: my-namespace/' prometheus.yaml
编辑
PodMonitor
资源 instrimzi-pod-monitor.yaml
,以定义 Prometheus 作业,它将从 pod 中提取指标数据。将
namespaceSelector.matchNames
属性更新为 pod 要从中提取指标的命名空间。PodMonitor
用于直接从 pod 中提取数据,用于 Apache Kafka、ZooKeeper、Operator、Kafka Bridge 和 Cruise Control。编辑
prometheus.yaml
安装文件,使其包含用于直接从节点提取指标的额外配置。提供的 Grafana 仪表板显示 CPU、内存和磁盘卷使用情况的指标,它们直接来自节点上的 OpenShift cAdvisor 代理和 kubelet。
从配置文件(如 example
/metrics/prometheus
)创建-additional-properties 目录中的 prometheus-additional
.yamlSecret
资源:oc apply -f prometheus-additional.yaml
-
编辑
prometheus.yaml
文件中的additionalScrapeConfigs
属性,使其包含Secret
的名称和prometheus-additional.yaml
文件。
部署 Prometheus 资源:
oc apply -f strimzi-pod-monitor.yaml oc apply -f prometheus-rules.yaml oc apply -f prometheus.yaml
6.4.3. 设置 Prometheus Alertmanager
Prometheus Alertmanager 是一个插件,用于处理警报并将其路由到通知服务。Alertmanager 支持监控的一个重要方面,它将收到根据警报规则指示潜在问题的条件通知。
6.4.3.1. Alertmanager 配置
AMQ Streams 为 Prometheus Alertmanager 提供示例配置文件。
配置文件定义用于部署 Alertmanager 的资源:
-
alert-manager.yaml
一个额外的配置文件提供用于从 Kafka 集群发送通知的 hook 定义。
-
alert-manager-config.yaml
对于 Alertmanger 处理 Prometheus 警报,请使用配置文件:
6.4.3.2. 警报规则
警报规则提供有关指标中观察到的特定条件的通知。规则在 Prometheus 服务器上声明,但 Prometheus Alertmanager 负责警报通知。
Prometheus 警报规则描述使用不断评估的 PromQL 表达式的条件。
当警报表达式变为 true 时,满足该条件,Prometheus 服务器会向 Alertmanager 发送警报数据。然后,Alertmanager 使用为其部署配置的通信方法发出通知。
Alertmanager 可以配置为使用电子邮件、聊天消息或其他通知方法。
其他资源
有关设置警报规则的更多信息,请参阅 Prometheus 文档中的 配置。
6.4.3.3. 警报规则示例
Kafka 和 ZooKeeper 指标的警报规则示例随 AMQ Streams 提供,供 Prometheus 部署 使用。
关于警报规则定义的常规要点:
-
for
属性与规则配合使用,以确定条件在触发警报前必须保留的时间。 -
tick 是一个基本的 ZooKeeper 时间单元,以毫秒为单位计算并使用
Kafka.spec.zookeeper.config
的tickTime
参数进行配置。例如,如果 ZooKeepertickTime=3000
,3 ticks(3 x 3000)等于 9000 毫秒。 -
ZookeeperRunningOutOfSpace
指标和警报的可用性取决于所使用的 OpenShift 配置和存储实施。某些平台的存储实施可能无法提供有关指标提供警报所需的可用空间的信息。
Kafka 警报规则
UnderReplicatedPartitions
-
提供当前代理是潜在副本但副本数比为
min.insync.replicas 配置的
副本数更少的分区数量。这个指标提供了有关托管后续副本的代理的信息。这些追随者跟不上领导者的步调。其原因可能包括(或已经)脱机,以及过限内联复制。当此值大于零时引发警报,以提供关于每个代理的副本分区的信息。 AbnormalControllerState
- 指示当前代理是否为集群的控制器。指标可以是 0 或 1。在集群生命周期内,应该只有一个代理是控制器,集群始终需要有一个活跃的控制器。有两个或多个代理表示它们是控制器,这表示问题。如果条件仍然存在,则当所有代理上此指标的所有值总和不等于 1 时,就会引发警报,即没有活跃的控制器(总和为 0)或多个控制器(总和大于 1)。
UnderMinIsrPartitionCount
-
表示领导 Kafka 代理的最小内同步副本(ISR)数量最少(使用
min.insync.replicas
指定),必须确认没有达到写入操作。指标定义代理所负责的分区数量,其中同步副本数小于最小内同步副本数。当这个值大于零时引发警报,为没有达到最小确认数的每个代理提供分区数的信息。 OfflineLogDirectoryCount
- 指明离线的日志目录数量(例如,由于硬件故障),以便代理无法再存储传入的消息。如果此值大于零,则会引发警报,以提供关于每个代理的离线日志目录数量的信息。
KafkaRunningOutOfSpace
-
表示可用于写入数据的剩余磁盘空间量。当此值低于 5GiB 时,将引发警报,提供关于每个持久卷声明的空间不足的磁盘信息。阈值可以在
prometheus-rules.yaml
中更改。
zookeeper 警报规则
AvgRequestLatency
- 表示服务器响应客户端请求所需的时间。当此值大于 10(勾号)时,会引发警报,从而提供每台服务器平均请求延迟的实际值。
OutstandingRequests
- 表示服务器中的排队请求数。当服务器接收的请求数超过其处理次数时,这个值会增加。如果此值大于 10,则会引发警报,从而为每个服务器提供实际的待处理请求数。
ZookeeperRunningOutOfSpace
- 表示可用于向 ZooKeeper 写入数据的剩余磁盘空间量。当此值低于 5GiB 时,会引发警报。它提供有关每个持久卷声明的空间不足的磁盘信息。
6.4.3.4. 部署 Alertmanager
要部署 Alertmanager,请应用 示例配置文件。
AMQ Streams 提供的示例配置将 Alertmanager 配置为将通知发送到 Slack 频道。
在部署中定义了以下资源:
-
用于管理
Alertmanager
pod 的 Alertmanager。 -
用于管理
Alertmanager 配置的机密。 -
种
服务
,用于为其他服务(如 Prometheus)连接 Alertmanager 提供简单参考主机名的服务。
步骤
从 Alertmanager 配置文件(
alert-manager-config.yaml
)创建Secret
资源:oc create secret generic alertmanager-alertmanager --from-file=alertmanager.yaml=alert-manager-config.yaml
更新
alert-manager-config.yaml
文件以替换:-
slack
_api_url
属性,以及与 Slack 工作区应用相关的 Slack API URL 的实际值 -
具有
要在其上发送通知的实际 Slack 频道的频道属性
-
slack
部署 Alertmanager:
oc apply -f alert-manager.yaml
6.4.4. 设置 Grafana
Grafana 提供 Prometheus 指标的视觉化。
您可以部署和启用 AMQ Streams 提供的 Grafana 仪表板示例。
6.4.4.1. 部署 Grafana
要提供 Prometheus 指标的视觉化,您可以使用您自己的 Grafana 安装,或通过应用 example /metrics
目录中提供的 grafana.yaml
文件来部署 Grafana。
步骤
部署 Grafana:
oc apply -f grafana.yaml
- 启用 Grafana 仪表板。
6.4.4.2. 启用 Grafana 仪表板示例
AMQ Streams 为 Grafana 提供了仪表板配置文件示例。示例仪表板作为 JSON 文件在 example/metrics
目录中提供:
-
strimzi-kafka.json
-
strimzi-zookeeper.json
-
strimzi-kafka-connect.json
-
strimzi-kafka-mirror-maker-2.json
-
strimzi-operators.json
-
strimzi-kafka-bridge.json
-
strimzi-cruise-control.json
示例仪表板是监控关键指标的良好起点,但它们并不代表所有可用的指标。您可以根据您的基础架构修改示例仪表板或添加其他指标。
设置 Prometheus 和 Grafana 后,您可以在 Grafana 仪表板上视觉化 AMQ Streams 数据。
没有定义警报通知规则。
访问仪表板时,您可以使用 port-forward
命令将 Grafana pod 的流量转发到主机。
Grafana pod 的名称为每个用户不同。
步骤
获取 Grafana 服务的详情:
oc get service grafana
例如:
NAME TYPE CLUSTER-IP 端口. grafana
ClusterIP
172.30.123.40
3000/TCP
请注意端口转发的端口号。
使用
port-forward
将 Grafana 用户界面重定向到localhost:3000
:oc port-forward svc/grafana 3000:3000
将 Web 浏览器指向
http://localhost:3000
这时会出现 Grafana Log In 页面。
输入您的用户名和密码,然后单击 Log In。
默认的 Grafana 用户名和密码都是
admin
。第一次登录后,您可以更改密码。添加 Prometheus 作为 数据源。
- 指定名称
- 添加 Prometheus 作为类型
指定 Prometheus 服务器 URL(http://prometheus-operated:9090)
添加详情后,保存并测试连接。
- 从 Dashboards → Import,上传示例仪表板或直接粘贴 JSON。
在顶部标头中,单击仪表板下拉菜单,然后选择您要查看的仪表板。
当 Prometheus 服务器收集 AMQ Streams 集群的指标一段时间时,会填充仪表板。
图 6.1. 仪表板选择选项
- AMQ Streams Kafka
显示以下指标:
- 在线代理数
- 集群计数中的活跃控制器
- 未清理领导选举率
- 在线副本
- 复制不足的分区数
- 至少在同步副本数中的分区
- 在同步副本数中最小分区
- 没有活跃领导且因此不可写入或可读的分区
- Kafka 代理 pod 内存用量
- 聚合 Kafka 代理 pod CPU 用量
- Kafka 代理 pod 磁盘用量
- 使用的 JVM 内存
- JVM 垃圾收集时间
- JVM 垃圾回收计数
- 传入字节速率总数
- 传出字节速率总数
- 传入的消息率
- 生成请求率总数
- 字节率
- 生成请求率
- 获取请求率
- 网络处理器平均空闲百分比
- 请求处理程序平均时间空闲百分比
日志大小
图 6.2. AMQ Streams Kafka 仪表板
- AMQ Streams ZooKeeper
显示以下指标:
- Zookeeper ensemble 的仲裁大小
- 活动 连接 数
- 服务器数中的排队请求
- watchers 数
- zookeeper pod 内存用量
- ZooKeeper pod CPU 使用量聚合
- zookeeper pod 磁盘用量
- 使用的 JVM 内存
- JVM 垃圾收集时间
- JVM 垃圾回收计数
- 服务器响应客户端请求所需的时间(最大值、最小值和平均值)
- AMQ Streams Kafka Connect
显示以下指标:
- 传入字节速率总数
- 传出字节速率总数
- 磁盘用量
- 使用的 JVM 内存
- JVM 垃圾收集时间
- AMQ Streams Kafka MirrorMaker 2
显示以下指标:
- 连接器数量
- 任务数量
- 传入字节速率总数
- 传出字节速率总数
- 磁盘用量
- 使用的 JVM 内存
- JVM 垃圾收集时间
- AMQ Streams Operator
显示以下指标:
- 自定义资源
- 每小时成功对自定义资源进行协调
- 每小时自定义资源协调失败
- 每小时无锁定的调节
- 协调开始时间
- 每小时定期协调
- 最大协调时间
- 平均协调时间
- 使用的 JVM 内存
- JVM 垃圾收集时间
- JVM 垃圾回收计数
6.5. 添加 Kafka Exporter
Kafka Exporter 是一个开源项目,用于增强对 Apache Kafka 代理和客户端的监控。Kafka Exporter 附带 AMQ Streams 以用于部署 Kafka 集群,以便从 Kafka 代理中提取与偏移、消费者组、消费者滞后和主题相关的额外指标数据。
例如,使用指标数据来帮助识别慢速消费者。
滞后数据作为 Prometheus 指标公开,随后可在 Grafana 中提供以进行分析。
如果您已在使用 Prometheus 和 Grafana 监控内置 Kafka 指标,您可以将 Prometheus 配置为同时提取 Kafka Exporter Prometheus 端点。
6.5.1. 监控消费者滞后
消费滞后表明消息的生产和消耗速度不同。具体来说,消费者对给定消费者组的滞后指示分区中最后一条消息与该消费者当前采用的消息之间的延迟。
与分区日志末尾相比,滞后反映了消费者偏移的位置。
生产者和消费者偏移的消费者滞后情况
这种差别有时被称为制作者偏移和消费者偏移之间的 delta : Kafka 代理主题分区中的读取和写入位置。
假设主题流 100 条消息秒。制作者偏移(主题分区头)和消费者最后的偏移之间有 1000 条消息的滞后意味着 10 秒的延迟。
监控消费者滞后的重要性
对于依赖(near)实时数据处理的应用程序,监控消费者滞后情况对于检查它是否不再太大至关重要。滞后越高,进程距离实时处理目标越高。
例如,消费者的滞后可能是由于消耗过多的陈旧数据而导致的,这些数据尚未清除,或者是通过计划外关机造成的。
减少消费者滞后
降低滞后率的典型操作包括:
- 通过添加新用户来扩展消费者组
- 增加消息的保留时间以保留在某个主题中
- 添加更多磁盘容量来增加消息缓冲
降低消费者滞后问题的操作取决于底层基础架构,而 AMQ Streams 支持用例。例如,适应消费者不太可能从其磁盘缓存服务获取请求的代理中受益。在某些情况下,可以同意在消费者发现信息之前自动丢弃消息。
6.5.2. Kafka Exporter 警报规则示例
如果您执行了在部署中引入指标的步骤,您的 Kafka 集群将配置为使用支持 Kafka Exporter 的警报通知规则。
Kafka Exporter 的规则在 prometheus-rules.yaml
中定义,并使用 Prometheus 部署。如需更多信息,请参阅 Prometheus。
特定于 Kafka Exporter 的警报通知规则示例如下:
UnderReplicatedPartition
- 警告主题被复制不足且代理没有复制到足够分区的警报。如果某个主题有一个或多个复制分区,则默认配置用于警报。该警报可能表示 Kafka 实例停机或 Kafka 集群已过载。可能需要计划重启 Kafka 代理,才能重新启动复制过程。
TooLargeConsumerGroupLag
- 警告使用者组的滞后对于特定主题分区来说太过大的警报。默认配置是 1000 记录。很大的滞后可能意味着消费者太慢,正在落于生产者后面。
NoMessageForTooLong
- 提醒主题在一段时间内未收到消息的警报。时间段的默认配置为 10 分钟。延迟可能是因为配置问题导致制作者无法将消息发布到该主题。
根据您的特定需求调整这些规则的默认配置。
6.5.3. 公开 Kafka Exporter 指标
Kafka Exporter 会将较慢的信息作为 Prometheus 指标在 Grafana 中公开。
Kafka Exporter 为代理、主题和消费者组公开指标数据。
此处描述了所提取的数据。
表 6.2. 代理指标输出
Name | 信息 |
---|---|
| Kafka 集群中代理数量 |
表 6.3. 主题指标输出
Name | 信息 |
---|---|
| 主题的分区数 |
| 代理的当前主题分区偏移 |
| 代理的最旧主题分区偏移 |
| 主题分区的同步副本数 |
| 主题分区的领导代理 ID |
|
显示 |
| 此主题分区的副本数 |
|
如果主题分区复制不足,显示 |
表 6.4. 消费者组指标输出
Name | 信息 |
---|---|
| 消费者组的当前主题分区偏移 |
| 某个消费者小组在某个主题分区的近似滞后情况 |
6.5.4. 配置 Kafka Exporter
此流程演示了如何通过 Kafka Exporter 属性在
。
Kafka
资源中配置 Kafka Exporter
有关配置 Kafka
资源的更多信息,请参阅 OpenShift 中使用 AMQ Streams 中的 示例 Kafka YAML 配置。
此流程中显示了与 Kafka Exporter 配置相关的属性。
您可以将这些属性配置为 Kafka 集群的部署或重新部署的一部分。
先决条件
- OpenShift 集群
- 一个正在运行的 Cluster Operator
步骤
编辑
Kafka 资源的
属性。Kafka
Exporter您可以配置的属性显示在此示例配置中:
apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: # ... kafkaExporter: image: my-org/my-image:latest 1 groupRegex: ".*" 2 topicRegex: ".*" 3 resources: 4 requests: cpu: 200m memory: 64Mi limits: cpu: 500m memory: 128Mi logging: debug 5 enableSaramaLogging: true 6 template: 7 pod: metadata: labels: label1: value1 imagePullSecrets: - name: my-docker-credentials securityContext: runAsUser: 1000001 fsGroup: 0 terminationGracePeriodSeconds: 120 readinessProbe: 8 initialDelaySeconds: 15 timeoutSeconds: 5 livenessProbe: 9 initialDelaySeconds: 15 timeoutSeconds: 5 # ...
创建或更新资源:
oc apply -f kafka.yaml
接下来要做什么
配置和部署 Kafka Exporter 后,您可以 启用 Grafana 来显示 Kafka Exporter 仪表板。
6.5.5. 启用 Kafka Exporter Grafana 仪表板
AMQ Streams 为 Grafana 提供了仪表板配置文件示例。Kafka Exporter 仪表板作为 JSON 文件在 example/metrics
目录中提供:
-
strimzi-kafka-exporter.json
如果使用 Kafka 集群部署了 Kafka Exporter,您可以视觉化它在 Grafana 仪表板中公开的指标数据。
先决条件
此流程假设您已有权访问 Grafana 用户界面,Prometheus 已添加为数据源。如果您是第一次访问用户界面,请参阅 Grafana。
步骤
- 访问 Grafana 用户界面。
选择 Strimzi Kafka Exporter 仪表板。
收集指标数据一段时间后,会填充 Kafka Exporter chart。
- AMQ Streams Kafka Exporter
显示以下指标:
- 主题数量
- 分区数
- 副本数
- 同步副本数
- 复制不足的分区数
- 至少在同步副本数中的分区
- 在同步副本数中最小分区
- 不在首选节点上的分区
- 每秒来自主题的消息
- 每秒从主题中消耗的消息
- 消费者组每分钟使用的消息
- 消费组的滞后
- 分区数
- 最新偏移
- 最旧的偏移
使用 Grafana 图表分析滞后情况,检查减少滞后问题的操作是否对受影响的消费者组产生影响。例如,如果对 Kafka 代理进行调整以减少滞后,仪表板将显示 Lag by consumer 组 图表停机,并且 每分钟消耗的 Messages 都会增加。
6.6. 监控 Kafka Bridge
如果您已在使用 Prometheus 和 Grafana 监控内置 Kafka 指标,您可以将 Prometheus 配置为也提取 Kafka Bridge Prometheus 端点。
Kafka Bridge 的 Grafana 仪表板示例提供:
- 有关 HTTP 连接和不同端点相关请求的信息
- 有关网桥使用的 Kafka 消费者和生产者的信息
- 网桥本身的 JVM 指标
6.6.1. 配置 Kafka 网桥
您可以使用 enableMetrics
属性在 KafkaBridge
资源中启用 Kafka Bridge 指标。
您可以将此属性配置为 Kafka Bridge 部署或重新部署的一部分。
例如:
apiVersion: kafka.strimzi.io/v1beta1 kind: KafkaBridge metadata: name: my-bridge spec: # ... bootstrapServers: my-cluster-kafka:9092 http: # ... enableMetrics: true # ...
6.6.2. 启用 Kafka Bridge Grafana 仪表板
如果使用 Kafka 集群部署 Kafka Bridge,您可以启用 Grafana 以显示它公开的指标数据。
example /metrics
目录中以 JSON 文件形式提供了 Kafka Bridge 仪表板:
-
strimzi-kafka-bridge.json
当收集指标数据一段时间后,会填充 Kafka Bridge chart。
- Kafka Bridge
显示以下指标:
- 到 Kafka 网桥计数的 HTTP 连接
- 处理中的 HTTP 请求数
- 每秒处理的请求按 HTTP 方法分组
- 按响应代码分组的总请求率(2XX、4XX、5XX)
- 每秒接收和发送的字节数
- 对每个 Kafka Bridge 端点的请求
- Kafka Bridge 使用的 Kafka 用户、生产者和相关打开连接的数量
Kafka 制作者:
- 每秒发送的记录平均数量(按主题分组)
- 每秒发送到所有代理的传出字节数(按主题分组)
- 导致错误的每秒记录平均数量(按主题分组)
Kafka 用户:
- 每秒消耗的平均记录数(按 clientId-topic 进行分组)
- 每秒消耗的平均字节数(按 clientId-topic 进行分组)
- 分配分区(由 clientId 进行分组)
- 使用的 JVM 内存
- JVM 垃圾收集时间
- JVM 垃圾回收计数
6.7. monitor 清理控制
如果您已在使用 Prometheus 和 Grafana 监控内置 Kafka 指标,您可以将 Prometheus 配置为还提取 Cruise Control Prometheus 端点。
Cruise Control 的 Grafana 仪表板示例提供:
- 有关优化计算、目标违反、集群均衡等的信息
- 有关重新平衡调整和实际重新平衡操作的 REST API 调用的信息
- 来自清理控制本身的 JVM 指标
6.7.1. 配置清理控制
您可以使用包含要公开指标的 JMX 导出器配置的 cruiseControl.metrics
属性在 Kafka
资源中启用 Cruise Control 指标。
例如:
apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka metadata: name: my-cluster spec: # ... kafka: # ... zookeeper: # ... cruiseControl: metrics: lowercaseOutputName: true rules: - pattern: kafka.cruisecontrol<name=(.+)><>(\w+) name: kafka_cruisecontrol_$1_$2 type: GAUGE
6.7.2. 启用 Cruise Control Grafana 仪表板
如果您使用启用了指标的 Kafka 集群部署了 Cruise Control,您可以启用 Grafana 呈现它公开的指标数据。
example /metrics
目录中以 JSON 文件的形式提供了一个 Cruise Control 仪表板:
-
strimzi-cruise-control.json
收集指标数据一段时间后,会填充 Cruise Control chart。
- Sything Control
显示以下指标:
- Cruise Control 监控的快照窗口数量
- 有效时间窗数量,因为它们包含足够的样本来计算优化建议
- 运行的连续执行数量,用于调整或重新平衡
- 由 Cruise Control 的异常检测器组件计算的 Kafka 集群的当前平衡度分数(默认为 5 分钟)
- 受监控分区的百分比
- 异常检测器报告的目标违反数量(默认为 5 分钟)
- 代理中磁盘读取失败的频率
- 指标示例获取失败的速度
- 计算优化建议所需的时间
- 创建集群模型所需的时间
- 通过 Cruise Control REST API 进行请求或实际重新平衡请求的频率
- 通过 Cruise Control REST API 请求整个集群状态和用户任务状态的频率
- 使用的 JVM 内存
- JVM 垃圾收集时间
- JVM 垃圾回收计数
第 7 章 升级 AMQ Streams
AMQ Streams 可以升级,无需集群停机。AMQ Streams 的每个版本都支持一个或多个 Apache Kafka 版本。只要您的 AMQ Streams 版本支持,就可以升级到更高的 Kafka 版本。
AMQ Streams 的新版本可能支持较新版本的 Kafka,但需要先升级 AMQ Streams,然后才能 升级到更高版本的 Kafka。
要升级 AMQ Streams Operator,您可以使用 OpenShift Container Platform 集群上的 Operator Lifecycle Manager(OLM)。
如果适用,必须在 升级 AMQ Streams 和 Kafka 后进行资源 升级。
7.1. AMQ Streams 和 Kafka 升级
升级 AMQ Streams 是一个两阶段的过程。要在没有停机的情况下升级代理和客户端,您必须 按照以下顺序完成升级过程:
将 Cluster Operator 更新至最新的 AMQ Streams 版本。您采取的方法取决于如何 部署 Cluster Operator。
- 如果使用安装 YAML 文件部署 Cluster Operator,请通过 修改 Operator 安装文件 来执行升级。
如果您从 OperatorHub 部署 Cluster Operator,请使用 Operator Lifecycle Manager(OLM)将 AMQ Streams Operator 的更新频道改为新的 AMQ Streams 版本。
根据您选择的升级策略,按照频道更新进行操作:
- 启动 自动 升级
- 然后,在开始安装前需要批准 手动 升级
有关使用 OperatorHub 升级 Operator 的更多信息,请参阅 升级已安装的 Operator。
- 将所有 Kafka 代理和客户端应用程序升级到最新的 Kafka 版本。
7.1.1. Kafka 版本
Kafka 的日志消息格式版本和broker 协议版本指定附加到消息的日志格式版本以及集群中使用的协议版本。因此,升级过程包括对现有的 Kafka 代理进行配置更改,并对客户端应用程序(使用者和生产者)进行代码更改以确保使用正确的版本。
下表显示了 Kafka 版本之间的区别:
Kafka 版本 | Interbroker 协议版本 | 日志消息格式版本 | zookeeper 版本 |
---|---|---|---|
2.5.0 | 2.5 | 2.5 | 3.5.8 |
2.6.0 | 2.6 | 2.6 | 3.5.8 |
消息格式版本
当生产者向 Kafka 代理发送消息时,该消息将使用特定格式进行编码。Kafka 版本的格式可能会改变,因此消息包括一个版本,用于标识它们编码的格式版本。您可以配置 Kafka 代理,在代理将消息附加到日志前将信息从较新的格式转换为给定的旧格式版本。
在 Kafka 中,设置消息格式版本的方法有两种:
-
message.format.version
属性在主题上设置。 -
log.message.format.version
属性在 Kafka 代理上设置。
主题的 message.format.version
默认值由 Kafka 代理上设置的 log.message.format.version
定义。您可以通过修改主题配置来手动设置主题的 message.format.version
。
本节中的升级任务假定消息格式版本由 log.message.format.version
定义。
7.1.2. 升级 Cluster Operator
本节概述了将 Cluster Operator 部署升级到使用 AMQ Streams 1.6 的步骤。
由 Cluster Operator 管理的 Kafka 集群的可用性不受升级操作的影响。
有关如何升级到该版本的信息,请参阅支持 AMQ Streams 特定版本的文档。
7.1.2.1. 将 Cluster Operator 升级到更新的版本
此流程描述了如何将 Cluster Operator 部署升级到更新的版本。
先决条件
- 现有 Cluster Operator 部署可用。
- 已 下载新版本的安装文件。
步骤
-
记录对现有 Cluster Operator 资源所做的任何配置更改(在
/install/cluster-operator
目录中)。任何更改都将被 Cluster Operator 的新版本 覆盖。 更新 Cluster Operator。
根据 Cluster Operator 运行的命名空间修改新版本的安装文件。
在 Linux 中,使用:
sed -i 's/namespace: .*/namespace: my-cluster-operator-namespace/' install/cluster-operator/*RoleBinding*.yaml
在 MacOS 中,使用:
sed -i '' 's/namespace: .*/namespace: my-cluster-operator-namespace/' install/cluster-operator/*RoleBinding*.yaml
-
如果在现有 Cluster Operator
Deployment
中修改了一个或多个环境变量,请编辑install/cluster-operator/060-Deployment-cluster-operator.yaml
文件来使用这些环境变量。
当您有更新的配置时,请将其与其它安装资源一起部署:
oc apply -f install/cluster-operator
等待滚动更新完成。
获取 Kafka pod 的镜像以确保升级成功:
oc get po my-cluster-kafka-0 -o jsonpath='{.spec.containers[0].image}'
镜像标签显示新的 AMQ Streams 版本,后跟 Kafka 版本。例如,
<New AMQ Streams version>-kafka-<Current Kafka version>
。更新现有资源以处理已弃用的自定义资源属性。
现在,您有一个更新的 Cluster Operator,但在它管理的集群中运行的 Kafka 版本没有改变。
接下来要做什么
在 Cluster Operator 升级后,您可以执行 Kafka 升级。
7.1.3. 升级 Kafka
将 Cluster Operator 升级到 1.6 后,下一步是将所有 Kafka 代理升级到最新支持的 Kafka 版本。
Kafka 升级由 Cluster Operator 通过对 Kafka 代理的滚动更新来执行。
Cluster Operator 根据 Kafka 集群配置启动滚动更新。
如果 Kafka.spec.kafka.config 包含… | Cluster Operator 启动s… |
---|---|
|
单个滚动更新.更新后, |
| 两次滚动更新. |
没有 | 两次滚动更新. |
作为 Kafka 升级的一部分,Cluster Operator 会启动 ZooKeeper 的滚动更新。
- 即使 ZooKeeper 版本未更改,也会出现一次滚动更新。
- 如果 Kafka 的新版本需要新的 ZooKeeper 版本,则会出现额外的滚动更新。
7.1.3.1. Kafka 版本和镜像映射
在升级 Kafka 时,请考虑对 STRIMZI_KAFKA_IMAGES
和 Kafka.spec.kafka.version
属性的设置。
-
每个
Kafka
资源都可以使用Kafka.spec.kafka.version
配置。 Cluster Operator 的
STRIMZI_KAFKA_IMAGES
环境变量提供 Kafka 版本和给定Kafka
资源中请求该版本时使用的镜像之间的映射。-
如果没有配置
Kafka.spec.kafka.image
,则会使用给定版本的默认镜像。 -
如果配置了
Kafka.spec.kafka.image
,则默认镜像会被覆盖。
-
如果没有配置
Cluster Operator 无法验证镜像是否实际包含预期版本的 Kafka 代理。请特别注意确保给定镜像与给定的 Kafka 版本对应。
7.1.3.2. 升级客户端的策略
升级客户端应用程序(包括 Kafka Connect 连接器)的最佳方法取决于您的具体情形。
使用应用程序时,需要以它们所理解的消息格式接收消息。您可以通过以下两种方式之一来确保这是这种情况:
- 在升级任何制作者 之前,先 升级所有消费者的主题。
- 通过将代理关闭消息置于较旧格式。
使用代理 down-conversion 会给代理带来额外的负载,因此在较长时间内依赖所有主题并不理想。要使代理以最佳方式运行,绝对不应是向下转换消息。
代理 down-conversion 可通过两种方式进行配置:
-
主题级别
message.format.version
为单个主题配置它。 -
代理级
log.message.format.version
是未配置主题级别message.format.version
的主题的默认设置。
以新版本格式发布到主题的消息对消费者可见,因为代理在从生产者接收消息时执行 down-conversion,而不是当它们发送到消费者时。
您可以使用多个策略来升级客户端:
- 消费者第一
- 升级所有使用的应用程序。
-
将代理级别的
log.message.format.version
更改为新版本。 升级所有生成的应用程序。
此策略非常简单,可避免任何代理停机转换。但是,它假定贵组织中的所有消费者可以协调升级,而且不适用于消费者和生产者的应用程序。如果升级的客户端出现问题,新格式消息可能会添加到消息日志中,因此您无法恢复到以前的使用者版本。
- 每个主题消费者首先
对于每个主题:
- 升级所有使用的应用程序。
-
将主题级
message.format.version
更改为新版本。 升级所有生成的应用程序。
此策略可避免任何代理 down-conversion,并且意味着您可以逐个主题地进行。它不适用于同时作为同一主题的消费者和制作者的应用。再强调一下,如果升级的客户端出现问题,新格式消息可能会添加到消息日志中。
- 首先每个主题消费者,进行向下转换
对于每个主题:
-
将主题级
message.format.version
更改为旧版本(或依赖主题默认到代理级别的log.message.format.version
)。 - 升级所有使用和生产的应用程序。
- 验证升级的应用是否正常工作。
将主题级
message.format.version
更改为新版本。这个策略需要代理 down-conversion,但代理的负载会最小化,因为一次只需要单个主题(或小组主题)需要它。它还适用于同时是同一主题的消费者和制作者的应用。这种方法可确保升级的生产者和消费者在提交使用新的消息格式版本之前正常工作。
这种方法的主要缺点是,在含有许多主题和应用的集群中进行管理可能比较复杂。
-
将主题级
还可以通过其他策略来升级客户端应用。
也可以应用多个策略。例如,在前几个应用程序和主题中,"每个主题消费者首先使用",可以使用向下转换"策略"。如果这已被证明是成功的,那么更高效的策略可被视为可以接受采用。
7.1.3.3. 升级 Kafka 代理和客户端应用程序
这个步骤描述了如何将 AMQ Streams Kafka 集群升级到最新支持的 Kafka 版本。
与您当前的 Kafka 版本相比,新版本可能支持更高的 日志消息格式版本 或代理协议版本,或两者。如果需要,请按照步骤升级这些版本。如需更多信息,请参阅 第 7.1.1 节 “Kafka 版本”。
您还应该选择 升级客户端的策略。Kafka 客户端在此流程的第 6 步中升级。
先决条件
要升级 Kafka
资源,请检查:
- 支持两个 Kafka 版本的 Cluster Operator 已启动且正在运行。
-
Kafka.spec.kafka.config
不包含 新 Kafka 版本中不支持的选项。
步骤
更新 Kafka 集群配置:
oc edit kafka my-cluster
如果配置了,请确保
Kafka.spec.kafka.config
的 log.message.format
设置为 当前 Kafka 版本的默认值。.version
和 inter.broker.protocol.version例如,如果从 Kafka 版本 2.5.0 升级到 2.6.0:
kind: Kafka spec: # ... kafka: version: 2.5.0 config: log.message.format.version: "2.5" inter.broker.protocol.version: "2.5" # ...
如果没有配置
log.message.format
,AMQ Streams 会在下一步中将这些版本更新至 Kafka 版本后自动将这些版本更新至当前的默认值。.version
和 inter.broker.protocol.version注意log.message.format.version
和inter.broker.protocol.version
的值必须是字符串,以防止它们被解释为浮动点数。更改
Kafka.spec.kafka.version
以指定新的 Kafka 版本;保留log.message.format.version
和inter.broker.protocol.version
,默认为 当前 Kafka 版本。注意更改
kafka.version
可确保集群中的所有代理都将升级为使用新的代理二进制文件。在此过程中,一些代理使用旧二进制文件,另一些则已升级至新二进制文件。不要更改inter.broker.protocol.version
,确保代理可以在升级过程中继续相互通信。例如,如果从 Kafka 2.5.0 升级到 2.6.0:
apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka spec: # ... kafka: version: 2.6.0 1 config: log.message.format.version: "2.5" 2 inter.broker.protocol.version: "2.5" 3 # ...
警告如果新 Kafka 版本的
inter.broker.protocol.version
发生变化,则无法降级 Kafka。broker 协议版本决定了代理存储的持久元数据的模式,包括写入到__consumer_offset
的消息。降级集群将不会理解消息。如果在 Kafka 自定义资源(在 Kafka
.spec.kafka.image 中)中定义了 Kafka
集群的镜像,请将镜像
更新为指向具有新 Kafka 版本的容器镜像。请参阅 Kafka 版本和镜像映射
保存并退出编辑器,然后等待滚动更新完成。
通过观察 pod 状态转换来检查滚动更新的进度:
oc get pods my-cluster-kafka-0 -o jsonpath='{.spec.containers[0].image}'
滚动更新可确保每个 pod 在新版本的 Kafka 中使用代理二进制文件。
根据您选择的 升级客户端策略,升级所有客户端应用以使用新版本的客户端二进制文件。
如果需要,将 Kafka Connect 和 MirrorMaker 的
version
属性设置为 Kafka 的新版本:-
对于 Kafka Connect,更新
KafkaConnect.spec.version
。 -
对于 MirrorMaker,更新
KafkaMirrorMaker.spec.version
。 -
对于 MirrorMaker 2.0,更新
KafkaMirrorMaker2.spec.version
。
-
对于 Kafka Connect,更新
如果配置,更新 Kafka 资源,以使用新的
inter.broker.protocol.version
版本。否则,请转至第 9 步。例如,如果升级到 Kafka 2.6.0:
apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka spec: # ... kafka: version: 2.6.0 config: log.message.format.version: "2.5" inter.broker.protocol.version: "2.6" # ...
- 等待 Cluster Operator 更新集群。
如果配置,更新 Kafka 资源,以使用新的
log.message.format.version
版本。否则,请转至第 10 步。例如,如果升级到 Kafka 2.6.0:
apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka spec: # ... kafka: version: 2.6.0 config: log.message.format.version: "2.6" inter.broker.protocol.version: "2.6" # ...
等待 Cluster Operator 更新集群。
- Kafka 集群和客户端现在使用新的 Kafka 版本。
- 代理配置为使用 Kafka 的新版本的 Inter-broker 协议版本和消息格式发送消息。
在 Kafka 升级后,如果需要,您可以:
7.1.3.4. 更新监听程序配置
AMQ Streams 提供 GenericKafkaListener
模式,用于在 Kafka 资源
中配置 Kafka 侦听器。
GenericKafkaListener
取代 KafkaListeners
模式,后者已被弃用。
使用 GenericKafkaListener
模式时,您可以根据需要配置多个监听器,只要它们的名称和端口是唯一的。监听器
配置被定义为数组,但已弃用的格式也受到支持。
对于 OpenShift 集群内的客户端,您可以创建 普通
(无需加密)或 tls
内部 侦听器。
对于 OpenShift 集群外的客户端,您可以创建 外部 侦听器并指定连接机制,可以是 节点端口
、负载均衡器、
入口
或 路由
。
KafkaListeners
模式将子属性用于 纯文本
、tls
和外部
监听程序,以及每个节点的固定端口。升级 Kafka 后,您可以将使用 KafkaListeners
模式配置的监听程序转换为 GenericKafkaListener
模式的格式。
例如,如果您当前在 Kafka
配置中使用以下配置:
旧监听程序配置
listeners: plain: # ... tls: # ... external: type: loadbalancer # ...
使用以下方法将监听程序转换为新格式:
新的监听程序配置
listeners: #... - name: plain port: 9092 type: internal tls: false 1 - name: tls port: 9093 type: internal tls: true - name: external port: 9094 type: EXTERNAL-LISTENER-TYPE 2 tls: true
确保使用显示 的确切 名称和端口号。
对于任何其他 配置
或 覆盖
使用旧格式的属性,您需要将它们更新为新格式。
对监听程序 配置
进行了更改:
-
覆盖
与configuration
部分合并 -
dnsAnnotations
已重命名为注解
-
preferredAddressType
被重命名为preferredNodePortAddressType
-
地址
已被重命名为alternativesNames
-
LoadBalancerSourceRanges 和 external
TrafficPolicy
移至现在已弃用的模板
中的监听程序配置
例如,这个配置:
旧的其他监听程序配置
listeners: external: type: loadbalancer authentication: type: tls overrides: bootstrap: dnsAnnotations: #...
更改:
新的附加监听程序配置
listeners: #... - name: external port: 9094 type:loadbalancer tls: true authentication: type: tls configuration: bootstrap: annotations: #...
新监听器配置中显示的名称和端口号 必须 用于向后兼容。使用任何其他值将导致 Kafka 侦听器和 OpenShift 服务重命名。
有关各种侦听器可用的配置选项的更多信息,请参阅 GenericKafkaListener
模式参考。
7.1.3.5. 升级消费者和 Kafka Streams 应用程序以合作重新平衡
您可以升级 Kafka 用户和 Kafka Streams 应用程序,以使用 增量合作重新平衡 协议进行分区重新平衡,而不是默认的预先 重新平衡 协议。在 Kafka 2.4.0 中添加了新协议。
消费者将分区分配保持在合作重新平衡状态,并仅在流程结束时予以撤销(如果需要)来实现平衡集群。这可减少使用者组或 Kafka Streams 应用程序的不可用。
升级到增量合作重新平衡协议是可选的。预先重新平衡协议仍然受到支持。
先决条件
- 您已将 Kafka 代理和客户端应用程序升级到 Kafka 2.6.0。
步骤
要升级 Kafka 消费者以使用增量合作重新平衡协议:
-
将 Kafka client
.jar
文件替换为新版本。 -
在使用者配置中,将
合作粘滞
附加到partition.assignment.strategy
。例如,如果设定了范围
策略,请将配置更改为范围,协作粘性
。 - 依次重新启动组中的每个使用者,等待使用者在每次重启后重新加入组。
-
通过从使用者配置中删除先前的
partition.assignment.strategy 来
重新配置组中的每个消费者,仅保留协作式
策略。 - 依次重新启动组中的每个使用者,等待使用者在每次重启后重新加入组。
升级 Kafka Streams 应用程序以使用增量合作重新平衡协议:
-
将 Kafka Streams
.jar
文件替换为新版本。 -
在 Kafka Streams 配置中,将
upgrade.from
配置参数设置为您要从中升级的 Kafka 版本(例如 2.3)。 - 依次重新启动每个流处理器(节点)。
-
从 Kafka Streams 配置中删除
upgrade.from
配置参数。 - 依次重新启动组中的每个消费者。
其他资源
- Apache Kafka 文档中的 2.4.0 中的显著变化。
7.2. AMQ Streams 资源升级
kafka.strimzi.io/v1alpha1
API 版本已弃用,用于以下 AMQ Streams 资源:
-
kafka
-
KafkaConnect
-
KafkaConnectS2I
-
KafkaMirrorMaker
-
KafkaTopic
-
KafkaUser
更新这些资源,以使用 kafka.strimzi.io/v1beta1
API 版本。
本节论述了资源的升级步骤。
资源升级 必须在 升级 Cluster Operator 后执行,以便 Cluster Operator 可以了解资源。
如果资源升级不起作用,该怎么办?
如果升级不生效,协调日志中会给出一个警告信息,表示在 apiVersion
更新前无法更新资源。
要触发更新,请对自定义资源进行任意更改,如添加注解。
注解示例:
metadata: # ... annotations: upgrade: "Upgraded to kafka.strimzi.io/v1beta1"
以下流程描述了更新特定资源以使用 kafka.strimzi.io/v1beta1
API 版本的步骤:
7.2.1. 升级 Kafka 资源
先决条件
-
支持
v1beta1
API 版本的一个 Cluster Operator 已启动且正在运行。
步骤
对部署中的每个 Kafka
资源执行以下步骤。
更新编辑器中的
Kafka
资源。oc edit kafka my-cluster
替换:
apiVersion: kafka.strimzi.io/v1alpha1
使用:
apiVersion: kafka.strimzi.io/v1beta1
如果
Kafka
资源有:Kafka.spec.topicOperator
替换为:
Kafka.spec.entityOperator.topicOperator
例如,替换:
spec: # ... topicOperator: {}
使用:
spec: # ... entityOperator: topicOperator: {}
如果存在,移动:
Kafka.spec.entityOperator.affinity
Kafka.spec.entityOperator.tolerations
改为:
Kafka.spec.entityOperator.template.pod.affinity
Kafka.spec.entityOperator.template.pod.tolerations
例如,移动:
spec: # ... entityOperator: affinity {} tolerations {}
改为:
spec: # ... entityOperator: template: pod: affinity {} tolerations {}
如果存在,移动:
Kafka.spec.kafka.affinity
Kafka.spec.kafka.tolerations
改为:
Kafka.spec.kafka.template.pod.affinity
Kafka.spec.kafka.template.pod.tolerations
例如,移动:
spec: # ... kafka: affinity {} tolerations {}
改为:
spec: # ... kafka: template: pod: affinity {} tolerations {}
如果存在,移动:
Kafka.spec.zookeeper.affinity
Kafka.spec.zookeeper.tolerations
改为:
Kafka.spec.zookeeper.template.pod.affinity
Kafka.spec.zookeeper.template.pod.tolerations
例如,移动:
spec: # ... zookeeper: affinity {} tolerations {}
改为:
spec: # ... zookeeper: template: pod: affinity {} tolerations {}
- 保存文件,退出编辑器并等待更新的资源得到协调。
7.2.2. 升级 Kafka Connect 资源
先决条件
-
支持
v1beta1
API 版本的一个 Cluster Operator 已启动且正在运行。
步骤
对部署中的每个 KafkaConnect
资源执行以下步骤。
更新编辑器中的
KafkaConnect
资源。oc edit kafkaconnect my-connect
替换:
apiVersion: kafka.strimzi.io/v1alpha1
使用:
apiVersion: kafka.strimzi.io/v1beta1
如果存在,移动:
KafkaConnect.spec.affinity
KafkaConnect.spec.tolerations
改为:
KafkaConnect.spec.template.pod.affinity
KafkaConnect.spec.template.pod.tolerations
例如,移动:
spec: # ... affinity {} tolerations {}
改为:
spec: # ... template: pod: affinity {} tolerations {}
- 保存文件,退出编辑器并等待更新的资源得到协调。
7.2.3. 升级 Kafka Connect S2I 资源
先决条件
-
支持
v1beta1
API 版本的一个 Cluster Operator 已启动且正在运行。
步骤
对部署中的每个 KafkaConnectS2I
资源执行以下步骤。
更新编辑器中的
KafkaConnectS2I
资源。oc edit kafkaconnects2i my-connect
替换:
apiVersion: kafka.strimzi.io/v1alpha1
使用:
apiVersion: kafka.strimzi.io/v1beta1
如果存在,移动:
KafkaConnectS2I.spec.affinity
KafkaConnectS2I.spec.tolerations
改为:
KafkaConnectS2I.spec.template.pod.affinity
KafkaConnectS2I.spec.template.pod.tolerations
例如,移动:
spec: # ... affinity {} tolerations {}
改为:
spec: # ... template: pod: affinity {} tolerations {}
- 保存文件,退出编辑器并等待更新的资源得到协调。
7.2.4. 升级 Kafka MirrorMaker 资源
先决条件
-
支持
v1beta1
API 版本的一个 Cluster Operator 已启动且正在运行。
步骤
对部署中的每个 KafkaMirrorMaker
资源执行以下步骤。
更新编辑器
中的 KafkaMirrorMaker
资源。oc edit kafkamirrormaker my-connect
替换:
apiVersion: kafka.strimzi.io/v1alpha1
使用:
apiVersion: kafka.strimzi.io/v1beta1
如果存在,移动:
KafkaConnectMirrorMaker.spec.affinity
KafkaConnectMirrorMaker.spec.tolerations
改为:
KafkaConnectMirrorMaker.spec.template.pod.affinity
KafkaConnectMirrorMaker.spec.template.pod.tolerations
例如,移动:
spec: # ... affinity {} tolerations {}
改为:
spec: # ... template: pod: affinity {} tolerations {}
- 保存文件,退出编辑器并等待更新的资源得到协调。
7.2.5. 升级 Kafka 主题资源
先决条件
-
支持
v1beta1
API 版本的一个主题 Operator 已启动且正在运行。
步骤
对部署中的每个 KafkaTopic
资源执行以下步骤。
更新编辑器中的
KafkaTopic
资源。oc edit kafkatopic my-topic
替换:
apiVersion: kafka.strimzi.io/v1alpha1
使用:
apiVersion: kafka.strimzi.io/v1beta1
- 保存文件,退出编辑器并等待更新的资源得到协调。
7.2.6. 升级 Kafka 用户资源
先决条件
-
支持
v1beta1
API 版本的用户 Operator 已启动且正在运行。
步骤
对部署中的每个 KafkaUser
资源执行以下步骤。
更新编辑器中的
KafkaUser
资源。oc edit kafkauser my-user
替换:
apiVersion: kafka.strimzi.io/v1alpha1
使用:
apiVersion: kafka.strimzi.io/v1beta1
- 保存文件,退出编辑器并等待更新的资源得到协调。
第 8 章 降级 AMQ Streams
如果您在升级到的 AMQ Streams 版本上遇到问题,您可以将安装恢复到之前的版本。
您可以执行降级:
将 Cluster Operator 恢复到以前的 AMQ Streams 版本。
将所有 Kafka 代理和客户端应用程序降级到以前的 Kafka 版本。
如果之前版本的 AMQ Streams 不支持您使用的 Kafka 版本,只要附加至消息的日志消息格式版本匹配,您还可以降级 Kafka。
8.1. 将 Cluster Operator 降级为以前的版本
如果您遇到 AMQ Streams 的问题,可以恢复您的安装。
此流程描述了如何将 Cluster Operator 部署降级到以前的版本。
先决条件
- 现有 Cluster Operator 部署可用。
- 已 下载上一个版本的安装文件。
步骤
-
记录对现有 Cluster Operator 资源所做的任何配置更改(在
/install/cluster-operator
目录中)。任何更改都将被之前版本的 Cluster Operator 覆盖。 - 恢复自定义资源,以反映您要降级到的 AMQ Streams 版本支持的配置选项。
更新 Cluster Operator。
根据 Cluster Operator 运行的命名空间修改之前版本的安装文件。
在 Linux 中,使用:
sed -i 's/namespace: .*/namespace: my-cluster-operator-namespace/' install/cluster-operator/*RoleBinding*.yaml
在 MacOS 中,使用:
sed -i '' 's/namespace: .*/namespace: my-cluster-operator-namespace/' install/cluster-operator/*RoleBinding*.yaml
-
如果在现有 Cluster Operator
Deployment
中修改了一个或多个环境变量,请编辑install/cluster-operator/060-Deployment-strimzi-cluster-operator.yaml
文件来使用这些环境变量。
当您有更新的配置时,请将其与其它安装资源一起部署:
oc apply -f install/cluster-operator
等待滚动更新完成。
获取 Kafka pod 的镜像以确保降级成功:
oc get pod my-cluster-kafka-0 -o jsonpath='{.spec.containers[0].image}'
镜像标签显示新的 AMQ Streams 版本,后跟 Kafka 版本。例如,NEW
-STREAMS-VERSION-kafka-CURRENT-KAFKA-VERSION
。
您的 Cluster Operator 已降级到以前的版本。
8.2. 降级 Kafka
Kafka 版本降级由 Cluster Operator 执行。
8.2.1. 降级的 Kafka 版本兼容性
Kafka 降级依赖于兼容的当前和目标 Kafka 版本,以及记录消息的状态。
如果该版本不支持集群中 曾使用的任何 inter.broker.protocol.version
设置,或者已将消息添加到使用 较新的 log.message.format.version
的消息日志中,则无法恢复到以前的 Kafka 版本。
inter.broker.protocol.version
决定代理用于存储持久元数据的模式,例如写入到 __consumer_offsets
的消息的 schema。如果您降级到不理解之前在集群中使用的 inter.broker.protocol.version
的 Kafka 版本,代理会遇到它无法理解的数据。
如果 Kafka 的目标降级版本有:
-
与
当前版本相同的
log.message.format.version,通过对代理执行一次滚动重启来降级 Cluster Operator。 只有正在运行的集群 始终将
log.message.format.version
设置为由降级版本使用的版本,才可 使用不同的log.message.format.version
。这通常是在log.message.format.version
改变前中止升级过程的情况。在这种情况下,降级需要:- 如果两个版本的检查程序协议不同,则代理的两个滚动重启
- 单个滚动重启(如果相同)
如果新版本曾使用过上一版本不支持 的 log.message.format.version
,则无法进行降级,包括当使用了 log.message.format.version
的默认值时。例如,这个资源可以降级为 Kafka 版本 2.5.0,因为 log.message.format.version
尚未更改:
apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka spec: # ... kafka: version: 2.6.0 config: log.message.format.version: "2.5" # ...
如果 log.message.format.version
设置为 "2.6"
或没有值(因此 参数使用 2.6.0 代理 2.6 的默认值),则无法进行降级。
8.2.2. 降级 Kafka 代理和客户端应用程序
此流程描述了如何将 AMQ Streams Kafka 集群降级为 Kafka 的较低(以前的)版本,如从 2.6.0 降级到 2.5.0。
先决条件
对于要降级的 Kafka
资源,检查:
- 重要:Kafka 版本兼容性。
- 支持两个 Kafka 版本的 Cluster Operator 已启动且正在运行。
-
Kafka.spec.kafka.config
不包含 Kafka 版本不支持的选项。 -
Kafka.spec.kafka.config
有一个log.message.format.version
和inter.broker.protocol.version
,它被降级到的 Kafka 版本支持。
步骤
根据需要更新编辑器中的 Kafka 集群配置。
oc edit kafka my-cluster
更改
Kafka.spec.kafka.version
以指定上一版本。例如,如果从 Kafka 2.6.0 降级到 2.5.0:
apiVersion: kafka.strimzi.io/v1beta1 kind: Kafka spec: # ... kafka: version: 2.5.0 1 config: log.message.format.version: "2.5" 2 inter.broker.protocol.version: "2.5" 3 # ...
注意您必须将
log.message.format.version
和inter.broker.protocol.version
的值格式化为字符串,以防止将其解释为浮动点数。如果 Kafka 版本的镜像与 Cluster Operator 的
STRIMZI_KAFKA_IMAGES
中定义的镜像不同,请更新Kafka.spec.kafka.image
。保存并退出编辑器,然后等待滚动更新完成。
检查日志中的更新,或者查看 pod 状态转换:
oc logs -f CLUSTER-OPERATOR-POD-NAME | grep -E "Kafka version downgrade from [0-9.]+ to [0-9.]+, phase ([0-9]+) of \1 completed"
oc get pod -w
检查 Cluster Operator 日志中的
INFO
级别信息:Reconciliation #NUM(watch) Kafka(NAMESPACE/NAME): Kafka version downgrade from FROM-VERSION to TO-VERSION, phase 1 of 1 completed
降级所有客户端应用程序(使用者)以使用先前版本的客户端二进制文件。
Kafka 集群和客户端现在使用以前的 Kafka 版本。
附录 A. 使用您的订阅
AMQ Streams 通过软件订阅提供。要管理您的订阅,请访问红帽客户门户中的帐户。
访问您的帐户
- 转至 access.redhat.com。
- 如果您还没有帐户,请创建一个帐户。
- 登录到您的帐户。
激活订阅
- 转至 access.redhat.com。
- 导航到 My Subscriptions。
- 导航到 激活订阅 并输入您的 16 位激活号。
下载 Zip 和 Tar 文件
要访问 zip 或 tar 文件,请使用客户门户查找要下载的相关文件。如果您使用 RPM 软件包,则不需要这一步。
- 打开浏览器并登录红帽客户门户网站 产品下载页面,网址为 access.redhat.com/downloads。
- 查找 JBOSS INTEGRATION 类别中 的红帽 AMQ Streams 条目。
- 选择所需的 AMQ Streams 产品。此时会打开 Software Downloads 页面。
- 单击组件的 Download 链接。
修订了 2022 年 2 月 13 日 15:26:40 +1000