在 OpenShift 上部署和升级 AMQ 流

Red Hat AMQ 2020.Q4

用于 OpenShift Container Platform 上的 AMQ Streams 1.6

摘要

本指南提供有关部署和升级 AMQ 流的说明

第 1 章 部署概述

AMQ Streams 简化了在 OpenShift 集群中运行 Apache Kafka 的过程。

本指南提供有关用于部署和升级 AMQ Streams 的所有选项的说明,描述了部署的内容,以及在 OpenShift 集群中运行 Apache Kafka 所需的部署顺序。

除了说明部署步骤外,本指南还提供了部署前和部署后的说明,以准备和验证部署。介绍的其他部署选项包括引入指标的步骤。为 AMQ Streams 和 Kafka 升级提供了升级说明。

AMQ Streams 旨在处理所有类型的 OpenShift 集群,不论分发如何,从公共云和私有云到用于开发的本地部署。

1.1. AMQ 流如何支持 Kafka

AMQ Streams 提供容器镜像和 Operator,以便在 OpenShift 上运行 Kafka。AMQ Streams Operator 是运行 AMQ Streams 的基础。AMQ Streams 提供的 Operator 是专门构建的,具有可有效管理 Kafka 的专业操作知识。

Operator 简化了以下流程:

  • 部署和运行 Kafka 集群
  • 部署和运行 Kafka 组件
  • 配置对 Kafka 的访问
  • 保护对 Kafka 的访问
  • 升级 Kafka
  • 管理代理
  • 创建和管理主题
  • 创建和管理用户

1.2. AMQ Streams Operator

AMQ Streams 支持使用 Operator 的 Kafka 来部署和管理 Kafka 到 OpenShift 的组件和依赖项。

Operator 是一种打包、部署和管理 OpenShift 应用的方法。AMQ Streams Operator 扩展 OpenShift 功能,自动执行与 Kafka 部署相关的常见复杂任务。通过在代码中了解 Kafka 操作,Kafka 管理任务可以简化,无需人工干预。

Operator

AMQ Streams 提供 Operator 来管理在 OpenShift 集群中运行的 Kafka 集群。

Cluster Operator
部署和管理 Apache Kafka 集群、Kafka Connect、Kafka MirrorMaker、Kafka Bridge、Kafka Exporter 和 Entity Operator
实体 Operator
由主题 Operator 和 User Operator 组成
主题 Operator
管理 Kafka 主题
User Operator
管理 Kafka 用户

Cluster Operator 可以与 Kafka 集群同时部署 Topic Operator 和 User Operator 作为 Entity Operator 配置的一部分。

AMQ Streams 架构中的 Operator

Operators within the AMQ Streams architecture

1.2.1. Cluster Operator

AMQ Streams 使用 Cluster Operator 来部署和管理集群:

  • Kafka(包括 ZooKeeper、实体 Operator、Kafka Exporter 和 Cruise Control)
  • Kafka Connect
  • Kafka MirrorMaker
  • Kafka Bridge

自定义资源用于部署集群。

例如,部署 Kafka 集群:

  • OpenShift 集群中创建了带有集群配置的 Kafka 资源。
  • Cluster Operator 根据 Kafka 资源中声明的内容部署对应的 Kafka 集群。

Cluster Operator 也可以部署(通过配置 Kafka 资源):

  • 通过 KafkaTopic 自定义资源提供 operator 风格主题管理的主题 Operator
  • 一个 User Operator,通过 KafkaUser 自定义资源提供 operator 风格的用户管理

部署的 Entity Operator 中的 Topic Operator 和 User Operator 功能。

Cluster Operator 的架构示例

The Cluster Operator creates and deploys Kafka and ZooKeeper clusters

1.2.2. 主题 Operator

Topic Operator 提供了通过 OpenShift 资源管理 Kafka 集群中主题的方法。

Topic Operator 的架构示例

The Topic Operator manages topics for a Kafka cluster via KafkaTopic resources

Topic Operator 的角色是保留一组 KafkaTopic OpenShift 资源,描述与对应的 Kafka 主题同步中的 Kafka 主题。

特别是,如果 KafkaTopic 是:

  • 创建,主题 Operator 会创建该主题
  • 删除的,主题 Operator 会删除该主题
  • 更改,主题 Operator 更新该主题

在另一个方向上工作,如果一个主题是:

  • 在 Kafka 集群中创建,Operator 会创建一个 KafkaTopic
  • 从 Kafka 集群中删除,Operator 会删除 KafkaTopic
  • 在 Kafka 集群中更改,Operator 会更新 KafkaTopic

这可让您声明 KafkaTopic 作为应用程序部署的一部分,Topic Operator 将为您创建该主题。您的应用程序只需要处理从所需主题中产生或使用的内容。

如果该主题被重新配置或重新分配给不同的 Kafka 节点,KafkaTopic 将始终保持最新状态。

1.2.3. User Operator

User Operator 通过监视 Kafka 用户描述 Kafka 用户并确保在 Kafka 集群中正确配置了这些用户来管理 Kafka 集群的 Kafka 用户。

例如,如果 KafkaUser 是:

  • 创建,User Operator 创建它描述的用户
  • 删除时,User Operator 会删除它所描述的用户
  • 更改后,User Operator 会更新它所描述的用户

与主题 Operator 不同,User Operator 不会将 Kafka 集群的任何更改与 OpenShift 资源同步。Kafka 主题可以直接由 Kafka 中的应用程序创建,但用户不必与 User Operator 并行直接在 Kafka 集群中管理。

User Operator 允许您将 KafkaUser 资源声明为应用程序部署的一部分。您可以为用户指定身份验证和授权机制。您还可以配置 用户配额 来控制对 Kafka 资源的使用,例如,确保用户不会专利对代理的访问。

创建用户时,会在 Secret 中创建用户凭据。您的应用需要使用用户 及其凭据进行身份验证,并生成或使用消息。

除了管理用于身份验证的凭证外,User Operator 还通过在 KafkaUser 声明中包含用户访问权限描述来管理授权规则。

1.3. AMQ Streams 自定义资源

使用 AMQ Streams 将 Kafka 组件部署到 OpenShift 集群可通过应用自定义资源进行高度配置。自定义资源作为自定义资源定义(CRD)添加的 API 实例创建,以扩展 OpenShift 资源。

CRD 用作描述 OpenShift 集群中自定义资源的配置说明,并为部署中使用的每个 Kafka 组件以及用户和主题提供 AMQ Streams。CRD 和自定义资源定义为 YAML 文件。AMQ Streams 发行版提供了示例 YAML 文件。

CRD 还允许 AMQ Streams 资源受益于原生 OpenShift 功能,如 CLI 访问和配置验证。

1.3.1. AMQ Streams 自定义资源示例

CRD 需要在集群中进行一次性安装,以定义用于实例化和管理 AMQ Streams 特定资源的 schema。

通过安装 CRD 在集群中添加新的自定义资源类型后,您可以根据具体规格创建资源实例。

根据集群设置,安装通常需要集群管理员特权。

注意

管理自定义资源的访问权限仅限于 AMQ Streams 管理员。如需更多信息,请参阅 OpenShift 上部署和升级 AMQ 流指南中的指定 AMQ 流 管理员

CRD 在 OpenShift 集群中定义 一种 新型资源,如 kind:Kafka

Kubernetes API 服务器允许基于 kind 创建自定义资源,并且从 CRD 中了解如何在添加到 OpenShift 集群时验证和存储自定义资源。

警告

当 CRD 被删除时,该类型的自定义资源也会被删除。另外,自定义资源创建的资源也会被删除,如 pod 和 statefulsets。

每个 AMQ Streams 特定自定义资源都符合 CRD 为资源类型定义的 schema AMQ Streams 组件的自定义资源具有常见的配置属性,这些属性在 spec 下定义。

要了解 CRD 和自定义资源之间的关系,让我们看一下 Kafka 主题的 CRD 示例。

Kafka 主题 CRD

apiVersion: kafka.strimzi.io/v1beta1
kind: CustomResourceDefinition
metadata: 1
  name: kafkatopics.kafka.strimzi.io
  labels:
    app: strimzi
spec: 2
  group: kafka.strimzi.io
  versions:
    v1beta1
  scope: Namespaced
  names:
    # ...
    singular: kafkatopic
    plural: kafkatopics
    shortNames:
    - kt 3
  additionalPrinterColumns: 4
      # ...
  subresources:
    status: {} 5
  validation: 6
    openAPIV3Schema:
      properties:
        spec:
          type: object
          properties:
            partitions:
              type: integer
              minimum: 1
            replicas:
              type: integer
              minimum: 1
              maximum: 32767
      # ...

1
主题 CRD 的元数据、名称和标识 CRD 的标签。
2
此 CRD 的规格,包括组(域)名称、复数名称和支持的 schema 版本,用于 URL 以访问该主题的 API。其他名称用于标识 CLI 中的实例资源。例如,oc get kafkatopic my-topicoc get kafkatopics
3
可以在 CLI 命令中使用短名称。例如,oc get kt 可用作缩写而不是 oc get kafkatopic
4
在自定义资源上使用 get 命令时显示的信息。
5
CRD 的当前状态,如资源的 schema 引用 中所述。
6
OpenAPIV3Schema 验证提供了用于创建主题自定义资源的验证。例如,一个主题至少需要一个分区和一个副本。
注意

您可以识别 AMQ Streams 安装文件提供的 CRD YAML 文件,因为文件名包含索引号后跟 'Crd'。

以下是 KafkaTopic 自定义资源的对应示例。

Kafka 主题自定义资源

apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaTopic 1
metadata:
  name: my-topic
  labels:
    strimzi.io/cluster: my-cluster 2
spec: 3
  partitions: 1
  replicas: 1
  config:
    retention.ms: 7200000
    segment.bytes: 1073741824
status:
  conditions: 4
    lastTransitionTime: "2019-08-20T11:37:00.706Z"
    status: "True"
    type: Ready
  observedGeneration: 1
  / ...

1
kindapiVersion 标识自定义资源是实例的 CRD。
2
个标签,仅适用于 KafkaTopicKafkaUser 资源,定义 Kafka 集群的名称(与 Kafka 资源的名称相同),该标签定义某个主题或用户所属的名称。
3
spec 显示主题的分区和副本数量,以及主题本身的配置参数。在本例中,消息保留周期保留在主题中,并指定日志的片段文件大小。
4
KafkaTopic 资源的状态条件。在 lastTransitionTime 中,类型 条件更改为 Ready

自定义资源可以通过平台 CLI 应用到集群。创建自定义资源时,它将使用与 Kubernetes API 内置资源相同的验证。

创建 KafkaTopic 自定义资源后,Tpic Operator 将获得通知,并在 AMQ Streams 中创建对应的 Kafka 主题。

1.4. AMQ 流安装方法

在 OpenShift 上安装 AMQ Streams 有两种方法:

安装方法描述支持的版本

安装工件(YAML 文件)

AMQ Streams 下载网站下载 amq-streams-x.z-ocp-install-examples.zip 文件。接下来,使用 oc 将 YAML 安装构件部署到 OpenShift 集群。首先,将 Cluster Operator 从 install/cluster-operator 部署到单个命名空间、多个命名空间或所有命名空间。

OpenShift 3.11 及更新的版本

OperatorHub

使用 OperatorHub 中的 AMQ Streams Operator 将 Cluster Operator 部署到单个命名空间或所有命名空间中。

OpenShift 4.x 仅

为获得最大的灵活性,请选择安装工件方法。如果要使用 OpenShift 4 Web 控制台在标准配置中将 AMQ Streams 安装到 OpenShift 4,请选择 OperatorHub 方法。OperatorHub 还允许您利用自动更新。

对于这两种方法,Cluster Operator 都部署到 OpenShift 集群,您可以使用提供的 YAML 示例文件部署 AMQ Streams Streams 其他组件,从 Kafka 集群开始。

AMQ 流安装工件

AMQ Streams 安装工件包含各种 YAML 文件,可使用 oc 部署到 OpenShift 中以创建自定义资源,包括:

  • Deployments
  • 自定义资源定义(CRD)
  • 角色和角色绑定
  • 服务帐户

为 Cluster Operator、Tpic Operator、User Operator 和 Strimzi Admin 角色提供了 YAML 安装文件。

OperatorHub

在 OpenShift 4 中,Operator Lifecycle Manager(OLM) 可帮助集群管理员安装、更新和管理所有 Operator 以及在集群中运行的关联服务的生命周期。OLM 是 Operator Framework 的一部分,后者是一个开源工具包,旨在以有效、自动化且可扩展的方式管理 Kubernetes 原生应用程序(Operator)。

OperatorHub 是 OpenShift 4 Web 控制台的一部分。集群管理员可以使用它来发现、安装和升级 Operator。Operator 可以从 OperatorHub 拉取(安装在 OpenShift 集群上)到单个(项目)命名空间或所有(项目)命名空间,并由 OLM 管理。然后,工程团队可以使用 OLM 在开发、测试和生产环境中独立管理软件。

注意

OperatorHub 不适用于版本 4 更早的 OpenShift 版本。

AMQ Streams Operator

AMQ Streams Operator 可从 OperatorHub 安装。安装后,AMQ Streams Operator 将 Cluster Operator 部署到 OpenShift 集群,以及必要的 CRD 和基于角色的访问控制(RBAC)资源。

其他资源

使用安装工件安装 AMQ Streams:

从 OperatorHub 安装 AMQ Streams:

第 2 章 使用 AMQ Streams 部署哪些内容

通过 AMQ Streams 发行版为 OpenShift 部署提供了 Apache Kafka 组件。Kafka 组件通常作为集群运行,以实现可用性。

集成 Kafka 组件的典型部署可能包括:

  • 代理节点的 Kafka 集群
  • 复制 ZooKeeper 实例的 zookeeper 集群
  • Kafka Connect 集群用于外部数据连接
  • Kafka MirrorMaker 集群,在第二个集群中镜像 Kafka 集群
  • kafka Exporter 用于提取额外的 Kafka 指标数据以进行监控
  • Kafka Bridge 向 Kafka 集群发出基于 HTTP 的请求

并非所有组件都是必须的,但您至少需要 Kafka 和 ZooKeeper。有些组件可以在没有 Kafka 的情况下部署,如 MirrorMaker 或 Kafka Connect。

2.1. 部署顺序

部署到 OpenShift 集群所需的顺序如下:

  1. 部署 Cluster operator 以管理 Kafka 集群
  2. 使用 ZooKeeper 集群部署 Kafka 集群,并在部署中包含 Topic Operator 和 User Operator
  3. (可选)部署:

    • 如果没有在 Kafka 集群中部署它们,则 Topic Operator 和 User Operator 独立
    • Kafka Connect
    • Kafka MirrorMaker
    • Kafka Bridge
    • 用于监控指标的组件

2.2. 其他部署配置选项

本指南中的部署步骤描述了使用 AMQ Streams 提供的示例安装 YAML 文件进行的部署。程序突出显示了任何重要的配置注意事项,但不描述所有可用的配置选项。

您可以使用自定义资源来优化部署。

部署 AMQ Streams 之前,您可能希望查看可用于 Kafka 组件的配置选项。如需有关通过自定义资源进行配置的更多信息,请参阅 OpenShift 上使用 AMQ Streams 中的 部署配置

2.2.1. 保护 Kafka

在部署时,Cluster Operator 会自动为集群中的数据加密和身份验证设置 TLS 证书。

AMQ Streams 提供用于 加密身份验证和 授权 的额外配置选项,如 OpenShift 指南中的使用 AMQ Streams 所述:

2.2.2. 监控您的部署

AMQ Streams 支持额外的部署选项来监控您的部署。

第 3 章 准备 AMQ Streams 部署

本节介绍如何准备 AMQ Streams 部署,描述:

注意

要运行本指南中的命令,您的集群用户必须有权管理基于角色的访问控制(RBAC)和 CRD。

3.1. 部署先决条件

要部署 AMQ Streams,请确保:

  • OpenShift 3.11 和更新的集群可用。

    AMQ Streams 基于 Strimzi 0.20.x。

  • 已安装并配置了 oc 命令行工具以连接到正在运行的集群。
注意

AMQ Streams 支持 OpenShift 专用的一些功能,这些集成对 OpenShift 用户有益,并且没有使用标准 Kubernetes 的等效实施。

3.2. 下载 AMQ Streams 发行版工件

要安装 AMQ Streams,请从 AMQ Streams 下载网站从 amq-streams-<version>-ocp-install-examples.zip 文件中下载 并提取发行工件。

AMQ Streams 发行版工件包括示例 YAML 文件,可帮助您将 AMQ Streams 组件部署到 OpenShift、执行常用操作和配置 Kafka 集群。

使用 oc 从下载的 ZIP 文件的 install/cluster-operator 文件夹部署 Cluster Operator。有关部署和配置 Cluster Operator 的更多信息,请参阅 第 4.1.1 节 “部署 Cluster Operator”

另外,如果要使用 Topic 和 User Operator 的独立安装,带有不由 AMQ Streams Cluster Operator 管理的 Kafka 集群,您可以从 install/topic-operator 和 install/user-operator 文件夹中部署它们。

注意

另外,AMQ Streams 容器镜像也可通过 红帽生态系统目录 提供。但是,我们建议您使用提供的 YAML 文件来部署 AMQ Streams。

3.3. 将容器镜像推送到您自己的 registry

红帽生态系统目录中提供了 AMQ Streams 的容器镜像。AMQ Streams 提供的安装 YAML 文件将直接从 红帽生态系统目录中拉取镜像。

如果您无法访问 红帽生态系统目录,或者想要使用自己的容器存储库:

  1. 拉取此处 列出的所有 容器镜像
  2. 将它们推送到您自己的 registry
  3. 更新安装 YAML 文件中的镜像名称
注意

该发行版本支持的每个 Kafka 版本都有一个单独的镜像。

容器镜像命名空间/存储库描述

kafka

  • registry.redhat.io/amq7/amq-streams-kafka-26-rhel7:1.6.7
  • registry.redhat.io/amq7/amq-streams-kafka-25-rhel7:1.6.7

用于运行 Kafka 的 AMQ Streams 镜像,包括:

  • Kafka Broker
  • Kafka Connect/S2I
  • Kafka Mirror Maker
  • ZooKeeper
  • TLS Sidecars

Operator

  • registry.redhat.io/amq7/amq-streams-rhel7-operator:1.6.7

用于运行操作器的 AMQ Streams 镜像:

  • Cluster Operator
  • 主题 Operator
  • User Operator
  • Kafka Initializer

Kafka Bridge

  • registry.redhat.io/amq7/amq-streams-bridge-rhel7:1.6.7

用于运行 AMQ Streams Kafka Bridge 的 AMQ Streams 镜像

3.4. 指定 AMQ Streams 管理员

AMQ Streams 提供用于配置部署的自定义资源。默认情况下,查看、创建、编辑和删除这些资源的权限仅限于 OpenShift 集群管理员。AMQ Streams 提供了两个集群角色,可用于将这些权限分配给其他用户:

  • strimzi-view 允许用户查看和列出 AMQ Streams 资源。
  • strimzi-admin 允许用户创建、编辑或删除 AMQ Streams 资源。

安装这些角色时,它们将自动聚合(添加)默认 OpenShift 集群 roles. strimzi-view 聚合到 view 角色,strimzi-admin 聚合到 editadmin 角色。由于聚合,您可能不需要将这些角色分配给已拥有类似权限的用户。

以下流程演示了如何分配 astrim zi-admin 角色,允许非集群管理员管理 AMQ Streams 资源。

系统管理员可以在部署 Cluster Operator 后指定 AMQ Streams 管理员。

先决条件

  • AMQ Streams 自定义资源定义(CRD)和基于角色的访问控制(RBAC)资源已使用 Cluster Operator 部署, 以管理 CRD。

步骤

  1. 在 OpenShift 中创建 strimzi-viewstrimzi-admin 集群角色。

    oc apply -f install/strimzi-admin
  2. 如果需要,为需要它们的用户分配访问权限的角色。

    oc create clusterrolebinding strimzi-admin --clusterrole=strimzi-admin --user=user1 --user=user2

第 4 章 部署 AMQ 流

部署 AMQ Streams 准备了环境,本节显示:

流程假定 OpenShift 集群可用并在运行。

AMQ Streams 基于 AMQ Streams Strimzi 0.20.x。本节介绍在 OpenShift 3.11 及之后的版本中部署 AMQ Streams 的步骤。

注意

要运行本指南中的命令,您的集群用户必须有权管理基于角色的访问控制(RBAC)和 CRD。

4.1. 创建 Kafka 集群

要创建 Kafka 集群,您需要部署 Cluster Operator 以管理 Kafka 集群,然后部署 Kafka 集群。

当使用 Kafka 资源部署 Kafka 集群时,您可以同时部署 Topic Operator 和 User Operator。另外,如果您使用非 AMQ Streams Kafka 集群,您可以将 Topic Operator 和 User Operator 部署为独立组件。

使用 Topic Operator 和 User Operator 部署 Kafka 集群

如果要将 Topic Operator 和 User Operator 与由 AMQ Streams 管理的 Kafka 集群搭配使用,请执行这些部署步骤。

部署独立主题 Operator 和 User Operator

如果要将 Topic Operator 和 User Operator 与 不由 AMQ Streams 管理的 Kafka 集群搭配使用,请执行这些部署步骤。

4.1.1. 部署 Cluster Operator

Cluster Operator 负责在 OpenShift 集群中部署和管理 Apache Kafka 集群。

本节中的步骤显示:

4.1.1.1. 监视 Cluster Operator 部署的选项

当 Cluster Operator 运行时,它开始 监视 Kafka 资源的更新。

您可以选择部署 Cluster Operator 来监视 Kafka 资源:

  • 单个命名空间(包含 Cluster Operator 的同一命名空间)
  • 多个命名空间
  • 所有命名空间
注意

AMQ Streams 提供示例 YAML 文件,以简化部署过程。

Cluster Operator 会监视以下资源的变化:

  • Kafka 集群的 Kafka。
  • Kafka Connect 集群的 Kafka Connect。
  • 用于 Kafka Connect 集群的 KafkaConnectS2I,支持 Source2Image。
  • 用于在 Kafka Connect 集群中创建和管理连接器的 KafkaConnector
  • Kafka MirrorMaker 实例的 KafkaMirror Maker。
  • Kafka Bridge 实例的 KafkaBridge

在 OpenShift 集群中创建其中一个资源时,操作器从该资源获取集群描述,并通过创建必要的 OpenShift 资源(如 StatefulSets、Service 和 ConfigMap)来开始为资源创建新集群。

每次更新 Kafka 资源时,操作器都会对构成该资源的 OpenShift 资源执行相应的更新。

资源可以修补或删除,然后重新创建以便集群反映集群所需的状态。此操作可能会导致滚动更新,可能会导致服务中断。

删除资源时,操作器将取消部署集群并删除所有相关 OpenShift 资源。

4.1.1.2. 部署 Cluster Operator 以监控单个命名空间

此流程演示了如何部署 Cluster Operator 以监控 OpenShift 集群中单一命名空间中的 AMQ Streams 资源。

先决条件

  • 此流程需要使用 OpenShift 用户帐户,该帐户可以创建自定义 ResourceDefinition、 ClusterRole 和 ClusterRoleBinding。在 OpenShift 集群中使用 Role Base Access Control(RBAC)通常意味着只有 system:admin 等 OpenShift 集群管理员才有创建、编辑和删除这些资源的权限。

步骤

  1. 编辑 AMQ Streams 安装文件以使用 Cluster Operator 将安装到的命名空间。

    例如,在此流程中,Cluster Operator 被安装到命名空间 my-cluster-operator-namespace 中。

    在 Linux 中,使用:

    sed -i 's/namespace: .*/namespace: my-cluster-operator-namespace/' install/cluster-operator/*RoleBinding*.yaml

    在 MacOS 中,使用:

    sed -i '' 's/namespace: .*/namespace: my-cluster-operator-namespace/' install/cluster-operator/*RoleBinding*.yaml
  2. 部署 Cluster Operator:

    oc apply -f install/cluster-operator -n my-cluster-operator-namespace
  3. 验证 Cluster Operator 是否已成功部署:

    oc get deployments

4.1.1.3. 部署 Cluster Operator 以监控多个命名空间

此流程演示了如何部署 Cluster Operator 以在 OpenShift 集群的多个命名空间中监控 AMQ Streams 资源。

先决条件

  • 此流程需要使用 OpenShift 用户帐户,该帐户可以创建自定义 ResourceDefinition、 ClusterRole 和 ClusterRoleBinding。在 OpenShift 集群中使用 Role Base Access Control(RBAC)通常意味着只有 system:admin 等 OpenShift 集群管理员才有创建、编辑和删除这些资源的权限。

步骤

  1. 编辑 AMQ Streams 安装文件以使用 Cluster Operator 将安装到的命名空间。

    例如,在此流程中,Cluster Operator 被安装到命名空间 my-cluster-operator-namespace 中。

    在 Linux 中,使用:

    sed -i 's/namespace: .*/namespace: my-cluster-operator-namespace/' install/cluster-operator/*RoleBinding*.yaml

    在 MacOS 中,使用:

    sed -i '' 's/namespace: .*/namespace: my-cluster-operator-namespace/' install/cluster-operator/*RoleBinding*.yaml
  2. 编辑 install/cluster-operator/060-Deployment-strimzi-cluster-operator.yaml 文件,将 Cluster Operator 将监视的所有命名空间的列表添加到 STRIMZI_NAMESPACE 环境变量。

    例如,在此流程中,Cluster Operator 将监视命名空间 watched-namespace-1watched-namespace-2watched-namespace-3

    apiVersion: apps/v1
    kind: Deployment
    spec:
      # ...
      template:
        spec:
          serviceAccountName: strimzi-cluster-operator
          containers:
          - name: strimzi-cluster-operator
            image: registry.redhat.io/amq7/amq-streams-rhel7-operator:1.6.7
            imagePullPolicy: IfNotPresent
            env:
            - name: STRIMZI_NAMESPACE
              value: watched-namespace-1,watched-namespace-2,watched-namespace-3
  3. 对于列出的每个命名空间,安装 RoleBindings

    在本例中,我们将这些命令中的 watched-namespace 替换为上一步中列出的命名空间,为 watched-namespace-1、watched-namespace -2、watched-namespace -3 重复它们:

    oc apply -f install/cluster-operator/020-RoleBinding-strimzi-cluster-operator.yaml -n watched-namespace
    oc apply -f install/cluster-operator/031-RoleBinding-strimzi-cluster-operator-entity-operator-delegation.yaml -n watched-namespace
    oc apply -f install/cluster-operator/032-RoleBinding-strimzi-cluster-operator-topic-operator-delegation.yaml -n watched-namespace
  4. 部署 Cluster Operator:

    oc apply -f install/cluster-operator -n my-cluster-operator-namespace
  5. 验证 Cluster Operator 是否已成功部署:

    oc get deployments

4.1.1.4. 部署 Cluster Operator 以监视所有命名空间

此流程演示了如何部署 Cluster Operator 以监控 OpenShift 集群中所有命名空间中的 AMQ Streams 资源。

在这个模式下运行时,Cluster Operator 会在创建的任何新命名空间中自动管理集群。

先决条件

  • 此流程需要使用 OpenShift 用户帐户,该帐户可以创建自定义 ResourceDefinition、 ClusterRole 和 ClusterRoleBinding。在 OpenShift 集群中使用 Role Base Access Control(RBAC)通常意味着只有 system:admin 等 OpenShift 集群管理员才有创建、编辑和删除这些资源的权限。

步骤

  1. 编辑 AMQ Streams 安装文件以使用 Cluster Operator 将安装到的命名空间。

    例如,在此流程中,Cluster Operator 被安装到命名空间 my-cluster-operator-namespace 中。

    在 Linux 中,使用:

    sed -i 's/namespace: .*/namespace: my-cluster-operator-namespace/' install/cluster-operator/*RoleBinding*.yaml

    在 MacOS 中,使用:

    sed -i '' 's/namespace: .*/namespace: my-cluster-operator-namespace/' install/cluster-operator/*RoleBinding*.yaml
  2. 编辑 install/cluster-operator/060-Deployment-strimzi-cluster-operator.yaml 文件,将 STRIMZI_NAMESPACE 环境变量的值设置为 *

    apiVersion: apps/v1
    kind: Deployment
    spec:
      # ...
      template:
        spec:
          # ...
          serviceAccountName: strimzi-cluster-operator
          containers:
          - name: strimzi-cluster-operator
            image: registry.redhat.io/amq7/amq-streams-rhel7-operator:1.6.7
            imagePullPolicy: IfNotPresent
            env:
            - name: STRIMZI_NAMESPACE
              value: "*"
            # ...
  3. 创建 ClusterRoleBinding, 将所有命名空间的集群范围访问权限授予 Cluster Operator。

    oc create clusterrolebinding strimzi-cluster-operator-namespaced --clusterrole=strimzi-cluster-operator-namespaced --serviceaccount my-cluster-operator-namespace:strimzi-cluster-operator
    oc create clusterrolebinding strimzi-cluster-operator-entity-operator-delegation --clusterrole=strimzi-entity-operator --serviceaccount my-cluster-operator-namespace:strimzi-cluster-operator
    oc create clusterrolebinding strimzi-cluster-operator-topic-operator-delegation --clusterrole=strimzi-topic-operator --serviceaccount my-cluster-operator-namespace:strimzi-cluster-operator

    my-cluster-operator-namespace 替换为您要将 Cluster Operator 安装到的命名空间。

  4. 将 Cluster Operator 部署到 OpenShift 集群。

    oc apply -f install/cluster-operator -n my-cluster-operator-namespace
  5. 验证 Cluster Operator 是否已成功部署:

    oc get deployments

4.1.1.5. 从 OperatorHub 部署 Cluster Operator

您可以通过从 OperatorHub 安装 AMQ Streams Operator,将 Cluster Operator 部署到 OpenShift 集群。OperatorHub 仅适用于 OpenShift 4。

先决条件

  • OpenShift 集群中启用了 Red Hat Operator OperatorSource。如果您在 OperatorHub 中看到 Red Hat Operator,则会启用正确的 OperatorSource。如需更多信息,请参阅 Operators 指南。
  • 安装需要具有足够特权的用户来安装来自 OperatorHub 的 Operator。

步骤

  1. 在 OpenShift 4 Web 控制台中,点 Operators > OperatorHub
  2. 在流和 消息传递 类别中搜索或浏览 AMQ Streams Operator。

    image:OpenShift 4 中 OperatorHub 中的 AMQ Streams Operator
  3. 单击 AMQ Streams 标题,然后在右侧的侧边栏中点 Install
  4. 在 Create Operator Subscription 屏幕上,从以下安装和更新选项中进行选择:

    • 安装模式 :选择将 AMQ Streams Operator 安装到集群中的所有(项目)命名空间(默认选项)或特定(项目)命名空间。最好使用命名空间来分隔函数。建议您将特定命名空间专用于 Kafka 集群和其他 AMQ Streams 组件。
    • 批准策略 :默认情况下,Operator Lifecycle Manager(OLM)自动将 AMQ Streams Operator 升级到最新的 AMQ Streams 版本。(可选)如果您想要手动批准以后的升级,请选择 Manual。如需更多信息,请参阅 OpenShift 文档中的 Operator 指南。
  5. Subscribe; AMQ Streams Operator 已安装到 OpenShift 集群。

    AMQ Streams Operator 将 Cluster Operator、CRD 和基于角色的访问控制(RBAC)资源部署到所选命名空间或所有命名空间中。

  6. 在 Installed Operators 屏幕上,检查安装的进度。当 AMQ Streams Operator 的状态变为 InstallSucceeded 时,就可以使用它。

    在 OpenShift 4 中已安装 Operator

接下来,您可以使用 YAML 示例文件部署 AMQ Streams 其他组件(从 Kafka 集群开始)。

4.1.2. 部署 Kafka

Apache Kafka 是一个开源分布式发布-订阅消息传递系统,用于容错实时数据源。

本节中的步骤显示:

安装 Kafka 时,AMQ Streams 还会安装 ZooKeeper 集群,并添加必要的配置以将 Kafka 与 ZooKeeper 连接。

4.1.2.1. 部署 Kafka 集群

此流程演示了如何使用 Cluster Operator 将 Kafka 集群部署到 OpenShift 中。

部署使用 YAML 文件提供规格来创建 Kafka 资源。

AMQ Streams 提供了在 example /kafka/ 中部署所需的 YAML 文件示例

kafka-persistent.yaml
使用三个 ZooKeeper 和三个 Kafka 节点部署一个持久集群。
kafka-jbod.yaml
使用三个 ZooKeeper 和三个 Kafka 节点部署持久集群(每个集群使用多个持久性卷)。
kafka-persistent-single.yaml
使用单个 ZooKeeper 节点和单个 Kafka 节点部署持久集群。
kafka-ephemeral.yaml
使用三个 ZooKeeper 和三个 Kafka 节点部署一个临时集群。
kafka-ephemeral-single.yaml
部署具有三个 ZooKeeper 节点和单个 Kafka 节点的临时集群。

在这一流程中,我们使用 临时 和永久 Kafka 集群部署的示例:

临时集群
通常,临时(或临时) Kafka 集群适合用于开发和测试目的,不适用于生产环境。此部署使用 emptyDir 卷来存储代理信息(用于 ZooKeeper)和主题或分区(对于 Kafka)。使用 emptyDir 卷意味着其内容严格与 pod 生命周期相关,并在 pod 停机时被删除。
持久性集群
持久的 Kafka 集群使用 PersistentVolume 来存储 ZooKeeper 和 Kafka 数据。该 PersistentVolume 使用 PersistentVolumeClaim 来获取,使它独立于实际的 PersistentVolume 类型。例如,它可以在 Amazon AWS 部署中使用 Amazon EBS 卷,而不更改 YAML 文件。PersistentVolumeClaim 可以使用 StorageClass 来触发自动卷置备。

示例集群默认命名为 my-cluster。集群名称通过资源名称定义,在部署集群后无法更改。要在部署集群前更改集群名称,请在相关 YAML 文件中编辑 Kafka 资源的 Kafka.metadata.name 属性。

apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: my-cluster
# ...

有关配置 Kafka 资源的更多信息,请参阅 OpenShift 中使用 AMQ Streams 中的 Kafka 集群配置

步骤

  1. 创建和部署 临时或 持久 集群。

    对于开发或测试,您可能更喜欢使用临时集群。您可以随时使用持久性存储。

    • 创建和部署 临时 集群:

      oc apply -f examples/kafka/kafka-ephemeral.yaml
    • 创建和部署 持久 集群:

      oc apply -f examples/kafka/kafka-persistent.yaml
  2. 验证 Kafka 集群是否已成功部署:

    oc get deployments

4.1.2.2. 使用 Cluster Operator 部署主题 Operator

此流程描述了如何使用 Cluster Operator 部署 Topic Operator。

您可以将 Kafka 资源的 principal Operator 属性配置为包含 topicOperator

如果要将 Topic Operator 与不由 AMQ Streams 管理的 Kafka 集群搭配使用,则必须将 Topic Operator 部署为独立组件

有关配置 实体Operator 和 topicOperator 属性的更多信息,请参阅 OpenShift 指南的使用 AMQ Streams 中的 Entity Operator

步骤

  1. 编辑 Kafka 资源的 principal Operator 属性,使其包含 topicOperator:

    apiVersion: kafka.strimzi.io/v1beta1
    kind: Kafka
    metadata:
      name: my-cluster
    spec:
      #...
      entityOperator:
        topicOperator: {}
        userOperator: {}
  2. 使用 EntityTopicOperatorSpec 模式参考 中所述的属性配置 Topic Operator spec

    如果您希望所有属性都使用其默认值,请使用空对象({})。

  3. 创建或更新资源:

    使用 oc apply:

    oc apply -f <your-file>

4.1.2.3. 使用 Cluster Operator 部署 User Operator

此流程描述了如何使用 Cluster Operator 部署 User Operator。

您可以将 Kafka 资源的 principal Operator 属性配置为包含 userOperator

如果要将 User Operator 与不由 AMQ Streams 管理的 Kafka 集群搭配使用,您必须将 User Operator 部署为独立组件

有关配置 实体Operator 和 userOperator 属性的更多信息,请参阅 OpenShift 指南的使用 AMQ Streams 中的 Entity Operator

步骤

  1. 编辑 Kafka 资源的 principal Operator 属性,使其包含 userOperator:

    apiVersion: kafka.strimzi.io/v1beta1
    kind: Kafka
    metadata:
      name: my-cluster
    spec:
      #...
      entityOperator:
        topicOperator: {}
        userOperator: {}
  2. 使用 OpenShift 指南的使用 AMQ Streams 中的 EntityUserOperatorSpec schema 参考 中描述的属性配置 User Operator spec

    如果您希望所有属性都使用其默认值,请使用空对象({})。

  3. 创建或更新资源:

    oc apply -f <your-file>

4.1.3. AMQ Streams Operator 的替代独立部署选项

当使用 Cluster Operator 部署 Kafka 集群时,您还可以部署 Topic Operator 和 User Operator。或者,您也可以执行独立部署。

独立部署意味着 Topic Operator 和 User Operator 可以在不由 AMQ Streams 管理的 Kafka 集群中运行。

4.1.3.1. 部署独立主题 Operator

此流程演示了如何将 Topic Operator 部署为独立组件。

独立部署需要配置环境变量,比 使用 Cluster Operator 部署 Topic Operator 复杂。但是,独立部署更为灵活,因为 Topic Operator 可在任何 Kafka 集群下运行,而不必由 Cluster Operator 部署。

先决条件

  • 您需要一个现有的 Kafka 集群供 Topic Operator 连接到。

步骤

  1. 通过设置,编辑 install/topic-operator/05-Deployment-strimzi-topic-operator .yaml 文件中的 Deployment.spec.template.spec.containers[0]. env 属性:

    1. STRIMZI_KAFKA_BOOTSTRAP_SERVERS,用于列出 Kafka 集群中的 bootstrap 代理,以逗号分隔 的主机名 :\{0\}端口 对列表形式指定。
    2. 用于列出 ZooKeeper 节点的 STRIMZI_ZOOKEEPER_CONNECT 节点,以逗号分隔 的主机名 :\{0\}端口 对列表形式提供。这应该与 Kafka 集群使用的 ZooKeeper 集群相同。
    3. STRIMZI_NAMESPACE 到您希望操作员监视 KafkaTopic 资源的 OpenShift 命名空间。
    4. STRIMZI_RESOURCE_LABELS 到标签选择器,用于标识由操作器管理的 KafkaTopic 资源。
    5. STRIMZI_FULL_RECONCILIATION_INTERVAL_MS,以毫秒为单位指定定期协调之间的间隔。
    6. STRIMZI_TOPIC_METADATA_MAX_ATTEMPTS,以指定从 Kafka 获取主题元数据的尝试次数。每次尝试之间的时间都定义为指数回退。考虑在因为分区或副本数量而创建主题需要更多时间时增加这个值。默认 6.
    7. STRIMZI_ZOOKEEPER_SESSION_TIMEOUT_MS 到 ZooKeeper 会话超时,以毫秒为单位。例如,10000。Default 20000( 20 秒)。
    8. STRIMZI_TOPICS_PATH 到 Topic Operator 存储其元数据的 Zookeeper 节点路径。默认 /strimzi/topics.
    9. STRIMZI_TLS_ENABLED 启用 TLS 支持以加密与 Kafka 代理的通信。默认为 true
    10. STRIMZI_TRUSTSTORE_LOCATION,指向包含用于启用基于 TLS 的通信的证书的信任存储的路径。只有在通过 STRIMZI_TLS_ENABLED 启用 TLS 时才必需
    11. STRIMZI_TRUSTSTORE_PASSWORD 到用于访问 STRIMZI_TRUSTSTORE_LOCATION 定义的信任存储的密码。只有在通过 STRIMZI_TLS_ENABLED 启用 TLS 时才必需
    12. STRIMZI_KEYSTORE_LOCATION 到含有私钥的密钥存储的路径,以启用基于 TLS 的通信。只有在通过 STRIMZI_TLS_ENABLED 启用 TLS 时才必需
    13. STRIMZI_KEYSTORE_PASSWORD 到用于访问 STRIMZI_KEYSTORE_LOCATION 定义的密钥存储的密码.只有在通过 STRIMZI_TLS_ENABLED 启用 TLS 时才必需
    14. STRIMZI_LOG_LEVEL 到打印日志记录消息的级别。该值可设置为:ERRORWARNINGINFODEBUGTRACE.默认 INFO.
    15. STRIMZI_JAVA_OPTS (可选),用于运行主题 Operator 的 JVM。例如 -Xmx=512M -Xms=256M
    16. STRIMZI_JAVA_SYSTEM_PROPERTIES (可选) 列出设置为 Topic Operator 的 -D 选项。An example is -Djavax.net.debug=verbose -DpropertyName=value.
  2. 部署主题 Operator:

    oc apply -f install/topic-operator
  3. 验证 Topic Operator 是否已成功部署:

    oc describe deployment strimzi-topic-operator

    Replicas: 条目显示 1 可用 时,Tpic Operator 会被部署。

    注意

    如果您与 OpenShift 集群的连接较慢并且之前没有下载镜像,则部署可能会遇到延迟。

4.1.3.2. 部署 standalone User Operator

此流程演示了如何将 User Operator 部署为独立组件。

独立部署需要配置环境变量,比 使用 Cluster Operator 部署 User Operator 复杂。但是,独立的部署更灵活,因为 User Operator 可以操作 任何 Kafka 集群,而不必由 Cluster Operator 部署。

先决条件

  • 您需要一个现有的 Kafka 集群供 User Operator 连接到。

步骤

  1. 通过设置,编辑 install/user-operator/05-Deployment-strimzi-user-operator .yaml 文件中的以下 Deployment.spec.template.spec.containers[0]. env 属性:

    1. STRIMZI_KAFKA_BOOTSTRAP_SERVERS 来列出 Kafka 代理,以逗号分隔 的主机名 :\{0\}端口 对列表形式指定。
    2. 用于列出 ZooKeeper 节点的 STRIMZI_ZOOKEEPER_CONNECT 节点,以逗号分隔 的主机名 :\{0\}端口 对列表形式提供。这必须与您的 Kafka 集群使用的 ZooKeeper 集群相同。不支持使用 TLS 加密连接到 ZooKeeper 节点。
    3. STRIMZI_NAMESPACE 到 OpenShift 命名空间,您希望操作员在该命名空间中监视 KafkaUser 资源。
    4. STRIMZI_LABELS 到用于标识由操作器管理的 KafkaUser 资源标签选择器。
    5. STRIMZI_FULL_RECONCILIATION_INTERVAL_MS,以毫秒为单位指定定期协调之间的间隔。
    6. STRIMZI_ZOOKEEPER_SESSION_TIMEOUT_MS 到 ZooKeeper 会话超时,以毫秒为单位。例如,10000。Default 20000( 20 秒)。
    7. STRIMZI_CA_CERT_NAME 以指向包含证书颁发机构公钥的 OpenShift Secret,用于为 TLS 客户端身份验证签名新用户证书。Secret 必须包含密钥 ca.crt 下的证书颁发机构的公钥。
    8. STRIMZI_CA_KEY_NAME 以指向包含证书颁发机构私钥的 OpenShift Secret,用于为 TLS 客户端身份验证签名新用户证书。Secret 必须包含密钥 ca.key 下的证书颁发机构的私钥。
    9. STRIMZI_CLUSTER_CA_CERT_SECRET_NAME 以指向包含用于签署 Kafka 代理证书以启用基于 TLS 的通信的 OpenShift SecretSecret 必须包含密钥 ca.crt 下的证书颁发机构的公钥。这个环境变量是可选的,只有在与 Kafka 集群的通信基于 TLS 时才应设置。
    10. STRIMZI_EO_KEY_SECRET_NAME 以指向包含对 Kafka 集群进行 TLS 客户端身份验证的私钥和相关证书的 OpenShift SecretSecret 必须包含密钥存储,其私钥和证书位于 key entity -operator.p12 下,以及 key entity -operator.password 下的相关密码。这个环境变量是可选的,只有在与 Kafka 集群的通信基于 TLS 时需要 TLS 客户端身份验证时才应设置。
    11. STRIMZI_CA_VALIDITY 证书颁发机构的有效期限.默认值为 365 天。
    12. STRIMZI_CA_RENEWAL 认证机构的续订期限.
    13. STRIMZI_LOG_LEVEL 到打印日志记录消息的级别。该值可设置为:ERRORWARNINGINFODEBUGTRACE.默认 INFO.
    14. STRIMZI_GC_LOG_ENABLED,以启用垃圾收集(GC)日志。默认为 true。默认为在旧证书过期前启动证书续订的 30 天。
    15. STRIMZI_JAVA_OPTS (可选) 用于运行用户的 JVM 的 Java 选项。例如 -Xmx=512M -Xms=256M
    16. STRIMZI_JAVA_SYSTEM_PROPERTIES (可选) 列出设置为 User Operator 的 -D 选项。An example is -Djavax.net.debug=verbose -DpropertyName=value.
  2. 部署 User Operator:

    oc apply -f install/user-operator
  3. 验证 User Operator 是否已成功部署:

    oc describe deployment strimzi-user-operator

    Replicas: 条目显示 1 available 时,会部署 User Operator。

    注意

    如果您与 OpenShift 集群的连接较慢并且之前没有下载镜像,则部署可能会遇到延迟。

4.2. 部署 Kafka Connect

Kafka Connect 是一个在 Apache Kafka 和外部系统间流传输数据的工具。

在 AMQ Streams 中,Kafka Connect 被部署为分布式模式。Kafka Connect 也可以以独立模式工作,但 AMQ Streams 不支持此项。

Kafka Connect 使用 连接器 的概念提供了一个框架,用于将大量数据移入 Kafka 集群或移出 Kafka 集群,同时保持可扩展性和可靠性。

Kafka Connect 通常用于将 Kafka 与外部数据库和存储及消息传递系统集成。

本节中的步骤显示如何:

注意

术语 连接器 可以互换使用,以表示在 Kafka Connect 集群中运行的连接器实例或连接器类。在本指南中,当从上下文中明确含义时,将使用 连接器

4.2.1. 将 Kafka Connect 部署到 OpenShift 集群

此流程演示了如何使用 Cluster Operator 将 Kafka Connect 集群部署到 OpenShift 集群。

Kafka Connect 集群作为 Deployment 实施,具有可配置的节点(也称为 worker ,将连接器工作负载作为 任务 来分布,以便消息流高度可扩展且可靠。

部署使用 YAML 文件提供创建 KafkaConnect 资源规格。

在这一流程中,我们使用 AMQ Streams 提供的示例文件:

  • examples/connect/kafka-connect.yaml

有关配置 KafkaConnect 资源(或使用 Source-to-Image( S2I)支持的 KafkaConnect S2I 资源)的详情,请参考 使用 AMQ Streams 中的 OpenShift 指南中的 Kafka Connect 集群配置

步骤

  1. 将 Kafka Connect 部署到您的 OpenShift 集群。对于带有 3 或更多代理的 Kafka 集群,请使用 example/connect/kafka-connect.yaml 文件。对于小于 3 个代理的 Kafka 集群,请使用 example /connect/kafka-connect-single-node-kafka.yaml 文件。

    oc apply -f examples/connect/kafka-connect.yaml
  2. 验证 Kafka Connect 是否已成功部署:

    oc get deployments

4.2.2. 使用连接器插件扩展 Kafka Connect

Kafka Connect 的 AMQ Streams 容器镜像包括两个内置文件连接器,用于将基于文件的数据移入或移出 Kafka 集群。

表 4.1. 文件连接器

文件连接器描述

FileStreamSourceConnector

从文件(源)传输数据到 Kafka 集群。

FileStreamSinkConnector

将数据从 Kafka 集群传输到文件(sink)。

Cluster Operator 也可以使用您创建的镜像将 Kafka Connect 集群部署到 OpenShift 集群。

本节中的步骤显示如何通过以下方法为连接器镜像添加您自己的连接器类:

4.2.2.1. 从 Kafka Connect 基础镜像创建 Docker 镜像

此流程演示了如何创建自定义镜像并将其添加到 /opt/kafka/plugins 目录中。

您可以使用 Red Hat Ecosystem Catalog 上的 Kafka 容器镜像作为基础镜像,以通过额外的连接器插件创建您自己的自定义镜像。

在启动时,Kafka Connect 的 AMQ Streams 版本加载 /opt/kafka/plugins 目录中包含的任何第三方连接器插件。

步骤

  1. 使用 registry.redhat.io/amq7/amq-streams-kafka-26-rhel7:1.6.7 作为基础镜像创建新的 Dockerfile

    FROM registry.redhat.io/amq7/amq-streams-kafka-26-rhel7:1.6.7
    USER root:root
    COPY ./my-plugins/ /opt/kafka/plugins/
    USER 1001

    插件文件示例

    $ tree ./my-plugins/
    ./my-plugins/
    ├── debezium-connector-mongodb
    │   ├── bson-3.4.2.jar
    │   ├── CHANGELOG.md
    │   ├── CONTRIBUTE.md
    │   ├── COPYRIGHT.txt
    │   ├── debezium-connector-mongodb-0.7.1.jar
    │   ├── debezium-core-0.7.1.jar
    │   ├── LICENSE.txt
    │   ├── mongodb-driver-3.4.2.jar
    │   ├── mongodb-driver-core-3.4.2.jar
    │   └── README.md
    ├── debezium-connector-mysql
    │   ├── CHANGELOG.md
    │   ├── CONTRIBUTE.md
    │   ├── COPYRIGHT.txt
    │   ├── debezium-connector-mysql-0.7.1.jar
    │   ├── debezium-core-0.7.1.jar
    │   ├── LICENSE.txt
    │   ├── mysql-binlog-connector-java-0.13.0.jar
    │   ├── mysql-connector-java-5.1.40.jar
    │   ├── README.md
    │   └── wkb-1.0.2.jar
    └── debezium-connector-postgres
        ├── CHANGELOG.md
        ├── CONTRIBUTE.md
        ├── COPYRIGHT.txt
        ├── debezium-connector-postgres-0.7.1.jar
        ├── debezium-core-0.7.1.jar
        ├── LICENSE.txt
        ├── postgresql-42.0.0.jar
        ├── protobuf-java-2.6.1.jar
        └── README.md

  2. 构建容器镜像。
  3. 将自定义镜像推送到容器 registry。
  4. 指向新容器映像。

    您可以:

    • 编辑 KafkaConnect.spec.image 属性,即 KafkaConnect 自定义资源。

      如果设置,此属性会覆盖 Cluster Operator 中的 STRIMZI_KAFKA_CONNECT_IMAGES 变量。

      apiVersion: kafka.strimzi.io/v1beta1
      kind: KafkaConnect
      metadata:
        name: my-connect-cluster
      spec: 1
        #...
        image: my-new-container-image 2
        config: 3
          #...
      1
      2
      容器集的 docker 镜像。
      3
      配置 Kafka Connect worker (而非连接器)。

    • install/cluster-operator/060-Deployment-strimzi-cluster-operator.yaml 文件中编辑 STRIMZI_KAFKA_CONNECT_IMAGES 变量以指向新容器镜像,然后重新安装 Cluster Operator。

其他资源

如需了解更多相关信息,请参阅 OpenShift 指南中的使用 AMQ Streams

4.2.2.2. 使用 OpenShift 构建和源至镜像创建容器镜像

此流程演示了如何使用 OpenShift 构建源至镜像(S2I) 框架来创建新的容器镜像。

OpenShift 构建获取支持 S2I 的构建器镜像,以及用户提供的源代码和二进制文件,并使用它们来构建新的容器镜像。构建之后,容器镜像将存储在 OpenShift 的本地容器镜像存储库中,并可在部署中使用。

红帽生态系统目录 中提供了带有 S2I 支持的 Kafka Connect 构建器镜像,作为 registry.redhat.io/amq7/amq-streams-kafka-26-rhel7:1.6.7 镜像的一部分。此 S2I 镜像提取您的二进制文件(带有插件和连接器),并将它们存储在 /tmp/kafka-plugins/s2i 目录中。它从这个目录中创建一个新的 Kafka Connect 镜像,然后可与 Kafka Connect 部署一起使用。当使用增强的镜像启动时,Kafka Connect 会从 /tmp/kafka-plugins/s2i 目录中加载任何第三方插件。

步骤

  1. 在命令行中,使用 oc apply 命令创建和部署 Kafka Connect S2I 集群:

    oc apply -f examples/connect/kafka-connect-s2i.yaml
  2. 使用 Kafka Connect 插件创建目录:

    $ tree ./my-plugins/
    ./my-plugins/
    ├── debezium-connector-mongodb
    │   ├── bson-3.4.2.jar
    │   ├── CHANGELOG.md
    │   ├── CONTRIBUTE.md
    │   ├── COPYRIGHT.txt
    │   ├── debezium-connector-mongodb-0.7.1.jar
    │   ├── debezium-core-0.7.1.jar
    │   ├── LICENSE.txt
    │   ├── mongodb-driver-3.4.2.jar
    │   ├── mongodb-driver-core-3.4.2.jar
    │   └── README.md
    ├── debezium-connector-mysql
    │   ├── CHANGELOG.md
    │   ├── CONTRIBUTE.md
    │   ├── COPYRIGHT.txt
    │   ├── debezium-connector-mysql-0.7.1.jar
    │   ├── debezium-core-0.7.1.jar
    │   ├── LICENSE.txt
    │   ├── mysql-binlog-connector-java-0.13.0.jar
    │   ├── mysql-connector-java-5.1.40.jar
    │   ├── README.md
    │   └── wkb-1.0.2.jar
    └── debezium-connector-postgres
        ├── CHANGELOG.md
        ├── CONTRIBUTE.md
        ├── COPYRIGHT.txt
        ├── debezium-connector-postgres-0.7.1.jar
        ├── debezium-core-0.7.1.jar
        ├── LICENSE.txt
        ├── postgresql-42.0.0.jar
        ├── protobuf-java-2.6.1.jar
        └── README.md
  3. 使用 oc start-build 命令,使用准备的目录启动镜像的新构建:

    oc start-build my-connect-cluster-connect --from-dir ./my-plugins/
    注意

    构建的名称与部署的 Kafka Connect 集群的名称相同。

  4. 构建完成后,Kafka Connect 部署会自动使用新镜像。

4.2.3. 创建和管理连接器

当您为您的 connector 插件创建容器镜像时,需要在 Kafka Connect 集群中创建连接器实例。然后,您可以配置、监控和管理正在运行的连接器实例。

连接器是特定 连接器类 的实例,它知道如何就消息与相关的外部系统通信。连接器可用于许多外部系统,或者您可以自行创建。

您可以创建连接器的 接收器 类型。

源连接器
源连接器是从外部系统获取数据并将其反馈到 Kafka 作为消息的运行时实体。
sink 连接器
sink 连接器是一个运行时实体,它从 Kafka 主题获取信息并将其反馈到外部系统。

AMQ Streams 为创建和管理连接器提供了两个 API:

  • KafkaConnector 资源(称为 KafkaConnectors
  • Kafka Connect REST API

使用 API,您可以:

  • 检查连接器实例的状态
  • 重新配置正在运行的连接器
  • 增加或减少连接器实例的任务数量
  • 重启失败的任务( KafkaConnector 资源不支持)
  • 暂停连接器实例
  • 恢复之前暂停的连接器实例
  • 删除连接器实例

4.2.3.1. KafkaConnector 资源

KafkaConnectors 允许您以 OpenShift 原生的方式为 Kafka Connect 创建和管理连接器实例,因此不需要 cURL 等 HTTP 客户端。与其他 Kafka 资源一样,您可以在部署至 OpenShift 集群的 KafkaConnector YAML 文件中声明连接器所需的状态,以创建连接器实例。

您可以通过更新对应的 KafkaConnector 并应用更新来管理正在运行的连接器实例。您可以通过删除对应的 KafkaConnector 来删除连接器。

为确保与早期版本的 AMQ Streams 兼容,KafkaConnectors 默认被禁用。要为 Kafka Connect 集群启用它们,您必须在 KafkaConnect 资源上使用注解。具体步骤,请参阅 OpenShift 中使用 AMQ Streams 中的 配置 Kafka Connect

启用 KafkaConnectors 后,Cluster Operator 开始监视它们。它更新正在运行的连接器实例的配置,以匹配其 KafkaConnectors 中定义的配置。

AMQ Streams 包括 一个名为 example /connect/source-connector.yaml 的 KafkaConnect or 示例。您可以使用本例来创建和管理 FileStreamSourceConnector

4.2.3.2. Kafka Connect REST API 的可用性

Kafka Connect REST API 在端口 8083 上作为 <connect-cluster-name>-connect-api 服务可用。

如果启用了 KafkaConnectors,Cluster Operator 将恢复直接使用 Kafka Connect REST API 进行手动更改。

Apache Kafka 文档中的 描述了 REST API 支持的操作。

4.2.4. 将 KafkaConnector 资源部署到 Kafka Connect

这个步骤描述了如何将示例 KafkaConnector 部署到 Kafka Connect 集群。

示例 YAML 将创建一个 FileStreamSourceConnector,将许可证文件的每一行发送到 Kafka,作为名为 my-topic 的主题的消息。

先决条件

步骤

  1. 编辑 example /connect/source-connector.yaml 文件:

    apiVersion: kafka.strimzi.io/v1alpha1
    kind: KafkaConnector
    metadata:
      name: my-source-connector 1
      labels:
        strimzi.io/cluster: my-connect-cluster 2
    spec:
      class: org.apache.kafka.connect.file.FileStreamSourceConnector 3
      tasksMax: 2 4
      config: 5
        file: "/opt/kafka/LICENSE"
        topic: my-topic
        # ...
    1
    输入 KafkaConnector 资源的名称。这将在 Kafka Connect 中用作连接器的名称。您可以选择对 OpenShift 资源有效的任何名称。
    2
    输入要在其中创建连接器的 Kafka Connect 集群的名称。
    3
    连接器类的名称或别名。这应该存在于由 Kafka Connect 集群使用的镜像中。
    4
    连接器可以创建的任务数量上限。
    5
    连接器的配置设置。可用的配置选项取决于连接器类。
  2. 在 OpenShift 集群中创建 KafkaConnector

    oc apply -f examples/connect/source-connector.yaml
  3. 检查是否已创建资源:

    oc get kctr --selector strimzi.io/cluster=my-connect-cluster -o name

4.3. 部署 Kafka MirrorMaker

Cluster Operator 部署一个或多个 Kafka 镜像(MirrorMaker)副本,以在 Kafka 集群间复制数据。此过程称为镜像,以避免与 Kafka 分区复制概念混淆。MirrorMaker 使用源集群的信息,并将这些消息重新发布到目标集群。

4.3.1. 将 Kafka MirrorMaker 部署到 OpenShift 集群

此流程演示了如何使用 Cluster Operator 将 Kafka MirrorMaker 集群部署到 OpenShift 集群。

部署使用 YAML 文件提供创建 KafkaMirrorMaker 或 KafkaMir rorMaker2 资源规格,具体取决于部署的 MirrorMaker 版本。

在这一流程中,我们使用 AMQ Streams 提供的示例文件:

  • examples/mirror-maker/kafka-mirror-maker.yaml
  • examples/mirror-maker/kafka-mirror-maker-2.yaml

有关配置 KafkaMirrorMaker 或 KafkaMirrorMaker2 资源的详情,请参考 使用 AMQ Streams on OpenShift 指南 中的 Kafka MirrorMaker 集群配置

步骤

  1. 将 Kafka MirrorMaker 部署到 OpenShift 集群:

    对于 MirrorMaker:

    oc apply -f examples/mirror-maker/kafka-mirror-maker.yaml

    对于 MirrorMaker 2.0:

    oc apply -f examples/mirror-maker/kafka-mirror-maker-2.yaml
  2. 验证 MirrorMaker 是否已成功部署:

    oc get deployments

4.4. 部署 Kafka Bridge

Cluster Operator 部署一个或多个 Kafka 网桥副本,以通过 HTTP API 在 Kafka 集群和客户端之间发送数据。

4.4.1. 将 Kafka Bridge 部署到 OpenShift 集群

此流程演示了如何使用 Cluster Operator 将 Kafka Bridge 集群部署到 OpenShift 集群。

部署使用 YAML 文件提供规格来创建 KafkaBridge 资源。

在这一流程中,我们使用 AMQ Streams 提供的示例文件:

  • examples/bridge/kafka-bridge.yaml

有关配置 KafkaBridge 资源的详情,请参考 使用 AMQ Streams 中的 OpenShift 指南中的 Kafka Bridge 集群配置

步骤

  1. 将 Kafka Bridge 部署到 OpenShift 集群:

    oc apply -f examples/bridge/kafka-bridge.yaml
  2. 验证 Kafka 网桥是否已成功部署:

    oc get deployments

第 5 章 设置对 Kafka 集群的客户端访问权限

部署 AMQ Streams 后,本节中的步骤解释了如何:

  • 部署示例制作者和消费者客户端,您可以使用它们来验证您的部署
  • 设置对 Kafka 集群的外部客户端访问

    为 OpenShift 外部的客户端设置 对 Kafka 集群的访问权限的步骤更为复杂,需要熟悉 OpenShift 中使用 AMQ Streams 指南 中所述的 Kafka 组件配置 流程

5.1. 部署示例客户端

此流程演示了如何使用您创建的 Kafka 集群发送和接收信息部署示例制作者和消费者客户端。

先决条件

  • Kafka 集群可供客户端使用。

步骤

  1. 部署 Kafka 制作者.

    oc run kafka-producer -ti --image=registry.redhat.io/amq7/amq-streams-kafka-26-rhel7:1.6.7 --rm=true --restart=Never -- bin/kafka-console-producer.sh --broker-list cluster-name-kafka-bootstrap:9092 --topic my-topic
  2. 在运行制作者的控制台中键入消息。
  3. Enter 发送邮件。
  4. 部署 Kafka 用户。

    oc run kafka-consumer -ti --image=registry.redhat.io/amq7/amq-streams-kafka-26-rhel7:1.6.7 --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server cluster-name-kafka-bootstrap:9092 --topic my-topic --from-beginning
  5. 确认您在使用者控制台中看到传入的消息。

5.2. 为 OpenShift 外部的客户端设置访问权限

此流程演示了如何从 OpenShift 外部配置对 Kafka 集群的客户端访问。

使用 Kafka 集群的地址,您可以提供到不同 OpenShift 命名空间或整个 OpenShift 外部客户端的外部访问。

您可以配置外部 Kafka 侦听程序以提供访问权限。

支持以下外部监听程序类型:

选择的类型取决于您的要求,以及您的环境和基础架构。例如,负载均衡器可能不适用于某些基础架构,如裸机,其中节点端口提供了一个更好的选项。

在这个过程中:

  1. 为 Kafka 集群配置了一个外部监听程序,它带有 TLS 加密和身份验证,并启用了 Kafka 简单授权
  2. 为客户端创建一个 KafkaUser,并为 简单授权 定义了 TLS 身份验证和访问控制列表(ACL)。

您可以将侦听器配置为使用 TLS 或 SCRAM-SHA-512 身份验证,这两种身份验证都可用于 TLS 加密。如果使用授权服务器,您可以使用基于令牌的 OAuth 2.0 身份验证和 OAuth 2.0 授权。Open Policy Agent(OPA)授权还支持 Kafka 授权 选项。

当您配置 KafkaUser 身份验证和授权机制时,请确保它们与等同的 Kafka 配置匹配:

  • KafkaUser.spec.authentication matches Kafka.spec.kafka.listeners[*].authentication
  • KafkaUser.spec.authorizationKafka.spec.kafka.authorization匹配

您应至少有一个监听器支持您要用于 KafkaUser 的身份验证。

注意

Kafka 用户和 Kafka 代理之间的身份验证取决于每个用户的身份验证设置。例如,如果 Kafka 配置中没有启用 TLS 的用户,则无法验证它。

AMQ Streams Operator 自动配置过程:

  • Cluster Operator 创建监听程序并设置集群和客户端证书颁发机构(CA)证书,以便在 Kafka 集群中启用身份验证。
  • User Operator 根据所选的身份验证类型,创建代表客户端以及用于客户端身份验证的安全凭证的用户。

在这一流程中,使用了 Cluster Operator 生成的证书,但您可以通过 安装您自己的证书 来替换它们。您还可以将监听程序配置为使用 由外部证书颁发机构管理的 Kafka 侦听器证书。

证书以 PKCS #12 格式(.p12)和 PEM(.crt)格式提供。

先决条件

  • Kafka 集群可用于客户端
  • Cluster Operator 和 User Operator 在集群中运行
  • OpenShift 集群外的客户端连接到 Kafka 集群

步骤

  1. 使用外部 Kafka 侦听程序配置 Kafka 集群。

    • 定义通过监听程序访问 Kafka 代理所需的身份验证
    • 在 Kafka 代理上启用授权

      例如:

      apiVersion: kafka.strimzi.io/v1beta1
      kind: Kafka
      metadata:
        name: my-cluster
        namespace: myproject
      spec:
        kafka:
          # ...
          listeners: 1
          - name: external 2
            port: 9094 3
            type: LISTENER-TYPE 4
            tls: true 5
            authentication:
              type: tls 6
            configuration:
              preferredNodePortAddressType: InternalDNS 7
              bootstrap and broker service overrides 8
              #...
          authorization: 9
            type: simple
            superUsers:
              - super-user-name 10
        # ...
      1
      2
      用于标识侦听器的名称。在 Kafka 集群中必须是唯一的。
      3
      Kafka 内侦听器使用的端口号。在给定 Kafka 集群中,端口号必须是唯一的。允许的端口号是 9092 及以上,但端口 9404 和 9999 除外,它们已用于 Prometheus 和 JMX。根据监听程序类型,端口号可能与连接 Kafka 客户端的端口号不同。
      4
      外部监听程序类型,指定为 routeloadbalancernodeportingress。内部监听程序指定为 内部 监听程序。
      5
      在监听器上启用 TLS 加密。默认为 false路由 监听程序不需要 TLS 加密。
      6
      指定为 tls 的身份验证。
      7
      (可选,仅适用于 节点端口 侦听器)配置 ,以指定 AMQ Streams 作为节点地址使用的第一个地址类型的首选
      8
      (可选)AMQ Streams 自动决定要公告给客户端的地址。这些地址由 OpenShift 自动分配。如果您运行 AMQ Streams 的基础架构不提供正确的 地址,您可以覆盖 bootstrap 和代理服务 地址。验证不会在覆盖上执行。覆盖配置根据监听器类型而有所不同。例如,您可以覆盖用于 路由、DNS 名称或 负载均衡器 IP 地址的主机, 以及 nodeport 的节点端口。
      9
      Authoization 指定为 simple,它使用 AclAuthorizer Kafka 插件。
      10
      (可选)超级用户可以访问所有代理,无论 ACL 中定义的任何访问权限限制。
      警告

      OpenShift Route 地址包含 Kafka 集群的名称、侦听器的名称以及在其中创建的命名空间的名称。例如,my -cluster-kafka-listener1-bootstrap-myproject (CLUSTER-NAME-kafka-LISTENER-NAME-bootstrap-NAMESPACE)。如果您使用的是 路由 监听程序类型,请注意地址的长度不超过 63 个字符的最大限制。

  2. 创建或更新 Kafka 资源。

    oc apply -f KAFKA-CONFIG-FILE

    使用 TLS 身份验证配置 Kafka 代理监听程序。

    为每个 Kafka 代理 pod 创建一个服务。

    创建一个服务作为连接到 Kafka 集群的 bootstrap 地址

    服务也作为使用 nodeport 侦听器到 Kafka 集群 的外部 bootstrap 地址 创建。

    也使用与 Kafka 资源相同的名称来创建用于验证 kafka 代理身份的集群 CA 证书。

  3. Kafka 资源的状态查找 bootstrap 地址和端口。

    oc get kafka KAFKA-CLUSTER-NAME -o jsonpath='{.status.listeners[?(@.type=="external")].bootstrapServers}'

    使用 Kafka 客户端中的 bootstrap 地址连接到 Kafka 集群。

  4. 从生成的 KAFKA-CLUSTER-NAME -cluster-ca-cert Secret 中提取公共集群 CA 证书和密码。

    oc get secret KAFKA-CLUSTER-NAME-cluster-ca-cert -o jsonpath='{.data.ca\.p12}' | base64 -d > ca.p12
    oc get secret KAFKA-CLUSTER-NAME-cluster-ca-cert -o jsonpath='{.data.ca\.password}' | base64 -d > ca.password

    使用 Kafka 客户端中的证书和密码,使用 TLS 加密连接到 Kafka 集群。

    注意

    默认情况下,集群 CA 证书自动续订。如果您使用您自己的 Kafka 侦听程序证书,则需要 手动续订证书

  5. 创建或修改代表需要访问 Kafka 集群的客户端的用户。

    • 指定与 Kafka 侦听器相同的验证类型。
    • 指定简单授权的授权 ACL。

      例如:

      apiVersion: kafka.strimzi.io/v1beta1
      kind: KafkaUser
      metadata:
        name: my-user
        labels:
          strimzi.io/cluster: my-cluster 1
      spec:
        authentication:
          type: tls 2
        authorization:
          type: simple
          acls: 3
            - resource:
                type: topic
                name: my-topic
                patternType: literal
              operation: Read
            - resource:
                type: topic
                name: my-topic
                patternType: literal
              operation: Describe
            - resource:
                type: group
                name: my-group
                patternType: literal
              operation: Read
      1
      该标签必须与 Kafka 集群的标签匹配才能创建用户。
      2
      指定为 tls 的身份验证。
      3
      简单授权需要附带的 ACL 规则列表才能应用到用户。规则根据 用户名 (my-user)定义 Kafka 资源允许的操作。
  6. 创建或修改 KafkaUser 资源。

    oc apply -f USER-CONFIG-FILE

    已创建用户,以及名称与 KafkaUser 资源相同的 Secret。Secret 包含用于 TLS 客户端身份验证的私钥和公钥。

    例如:

    apiVersion: v1
    kind: Secret
    metadata:
      name: my-user
      labels:
        strimzi.io/kind: KafkaUser
        strimzi.io/cluster: my-cluster
    type: Opaque
    data:
      ca.crt: PUBLIC-KEY-OF-THE-CLIENT-CA
      user.crt: USER-CERTIFICATE-CONTAINING-PUBLIC-KEY-OF-USER
      user.key: PRIVATE-KEY-OF-USER
      user.p12: P12-ARCHIVE-FILE-STORING-CERTIFICATES-AND-KEYS
      user.password: PASSWORD-PROTECTING-P12-ARCHIVE
  7. 将客户端配置为使用所需的属性连接到 Kafka 集群,以便与 Kafka 集群建立安全连接。

    1. 添加公共集群证书的身份验证详情:

      security.protocol: SSL 1
      ssl.truststore.location: PATH-TO/ssl/keys/truststore 2
      ssl.truststore.password: CLUSTER-CA-CERT-PASSWORD 3
      ssl.truststore.type=PKCS12 4
      1
      启用 TLS 加密(带有或不进行 TLS 客户端身份验证)。
      2
      指定导入证书的信任存储位置。
      3
      指定用于访问信任存储的密码。如果信任存储不需要此属性,则可以省略它。
      4
      标识 truststore 类型。
      注意

      使用 security.protocol:在通过 TLS 使用 SCRAM-SHA 身份验证时,SASL_SSL

    2. 添加用于连接到 Kafka 集群的 bootstrap 地址和端口:

      bootstrap.servers: BOOTSTRAP-ADDRESS:PORT
    3. 添加公共用户证书的身份验证详情:

      ssl.keystore.location: PATH-TO/ssl/keys/user1.keystore 1
      ssl.keystore.password: USER-CERT-PASSWORD 2
      1
      指定导入证书的密钥存储位置。
      2
      指定用于访问密钥存储的密码。如果密钥存储不需要此属性,则可以省略它。

      公共用户证书在创建时由客户端 CA 签名。

第 6 章 为 AMQ Streams 设置指标和仪表板

您可以通过在仪表板上查看关键指标并设置在某些情况下触发的警报来监控 AMQ Streams 部署。指标数据可用于 Kafka、ZooKeeper 以及 AMQ Streams 的其他组件。

为提供指标信息,AMQ Streams 使用 Prometheus 规则和 Grafana 仪表板。

当为 AMQ Streams 的每个组件配置一组规则时,Prometheus 会使用来自集群中运行的 pod 的键指标。然后,Grafana 在仪表板上视觉化这些指标。AMQ Streams 包括 Grafana 仪表板示例,您可以自定义这些仪表板适合您的部署。

在 OpenShift Container Platform 4.x 中,AMQ Streams 采用 对用户定义的项目的监控 (OpenShift 功能)来简化 Prometheus 设置过程。

在 OpenShift Container Platform 3.11 中,您需要将 Prometheus 和 Alertmanager 组件单独部署到集群中。

无论 OpenShift Container Platform 版本是什么,您必须首先 部署 AMQ Streams 的 Prometheus metrics 配置

接下来,按照 OpenShift Container Platform 版本的说明进行操作:

通过 Prometheus 和 Grafana 设置,您可以使用示例 Grafana 仪表板和警报规则来监控 Kafka 集群。

其他监控选项

Kafka Exporter 是一个可选组件,提供与消费者滞后相关的额外监控。如果要将 Kafka Exporter 用于 AMQ Streams,请参阅 配置 Kafka 资源以使用 Kafka 集群部署 Kafka 导出器

您还可以通过设置分布式追踪将部署配置为跟踪端到端的消息。如需更多信息,请参阅 OpenShift 中使用 AMQ Streams 中的 分布式追踪

其他资源

6.1. 指标文件示例

您可以在 example /metrics 目录中找到 Grafana 仪表板和其他指标配置文件的示例 。如下表所示,一些文件仅用于 OpenShift Container Platform 3.11,而不适用于 OpenShift Container Platform 4.x。

AMQ Streams 提供的指标文件示例

metrics
├── grafana-dashboards 1
│   ├── strimzi-cruise-control.json
│   ├── strimzi-kafka-bridge.json
│   ├── strimzi-kafka-connect.json
│   ├── strimzi-kafka-exporter.json
│   ├── strimzi-kafka-mirror-maker-2.json
│   ├── strimzi-kafka.json
│   ├── strimzi-operators.json
│   └── strimzi-zookeeper.json
├── grafana-install
│   └── grafana.yaml 2
├── prometheus-additional-properties
│   └── prometheus-additional.yaml - OPENSHIFT 3.11 ONLY 3
├── prometheus-alertmanager-config
│   └── alert-manager-config.yaml 4
├── prometheus-install
│    ├── alert-manager.yaml - OPENSHIFT 3.11 ONLY 5
│    ├── prometheus-rules.yaml 6
│    ├── prometheus.yaml - OPENSHIFT 3.11 ONLY 7
│    ├── strimzi-pod-monitor.yaml 8
├── kafka-bridge-metrics.yaml 9
├── kafka-connect-metrics.yaml 10
├── kafka-cruise-control-metrics.yaml 11
├── kafka-metrics.yaml 12
└── kafka-mirror-maker-2-metrics.yaml 13

1
Grafana 仪表板示例.
2
Grafana 镜像的安装文件。
3
OPENSHIFT 3.11 只 :其他 Prometheus 配置用于提取 CPU、内存和磁盘卷使用情况的指标,它们直接来自节点上的 OpenShift cAdvisor 代理和 kubelet。
4
用于通过 Alertmanager 发送通知的 hook 定义。
5
OPENSHIFT 3.11 只 :用于部署和配置 Alertmanager 的资源.
6
用于 Prometheus Alertmanager 的警报规则示例。
7
OPENSHIFT 3.11 只 :Prometheus 镜像的安装资源文件。
8
Prometheus Operator 将 PodMonitor 定义转换为作业,以便 Prometheus 服务器能够直接从 pod 中提取指标数据。
9
启用指标数据的 Kafka Bridge 资源。
10
为 Kafka Connect 定义 Prometheus JMX Exporter 重新标记规则的指标配置。
11
定义 Prometheus JMX Exporter 重新标记 Cruise Control 规则的指标配置。
12
为 Kafka 和 ZooKeeper 定义 Prometheus JMX Exporter 重新标记规则的指标配置。
13
为 Kafka Mirror Maker 2.0 定义 Prometheus JMX Exporter 重新标记规则的指标配置。

6.1.1. Grafana 仪表板示例

为监控以下资源提供了 Grafana 仪表板示例:

AMQ Streams Kafka

显示以下指标:

  • 在线代理数
  • 集群计数中的活跃控制器
  • 未清理领导选举率
  • 在线副本
  • 复制不足的分区数
  • 至少在同步副本数中的分区
  • 在同步副本数中最小分区
  • 没有活跃领导且因此不可写入或可读的分区
  • Kafka 代理 pod 内存用量
  • 聚合 Kafka 代理 pod CPU 用量
  • Kafka 代理 pod 磁盘用量
  • 使用的 JVM 内存
  • JVM 垃圾收集时间
  • JVM 垃圾回收计数
  • 传入字节速率总数
  • 传出字节速率总数
  • 传入的消息率
  • 生成请求率总数
  • 字节率
  • 生成请求率
  • 获取请求率
  • 网络处理器平均空闲百分比
  • 请求处理程序平均时间空闲百分比
  • 日志大小
AMQ Streams ZooKeeper

显示以下指标:

  • Zookeeper ensemble 的仲裁大小
  • 活动 连接
  • 服务器数中的排队请求
  • watchers 数
  • zookeeper pod 内存用量
  • ZooKeeper pod CPU 使用量聚合
  • zookeeper pod 磁盘用量
  • 使用的 JVM 内存
  • JVM 垃圾收集时间
  • JVM 垃圾回收计数
  • 服务器响应客户端请求所需的时间(最大值、最小值和平均值)
AMQ Streams Kafka Connect

显示以下指标:

  • 传入字节速率总数
  • 传出字节速率总数
  • 磁盘用量
  • 使用的 JVM 内存
  • JVM 垃圾收集时间
AMQ Streams Kafka MirrorMaker 2

显示以下指标:

  • 连接器数量
  • 任务数量
  • 传入字节速率总数
  • 传出字节速率总数
  • 磁盘用量
  • 使用的 JVM 内存
  • JVM 垃圾收集时间
AMQ Streams Operator

显示以下指标:

  • 自定义资源
  • 每小时成功对自定义资源进行协调
  • 每小时自定义资源协调失败
  • 每小时无锁定的调节
  • 协调开始时间
  • 每小时定期协调
  • 最大协调时间
  • 平均协调时间
  • 使用的 JVM 内存
  • JVM 垃圾收集时间
  • JVM 垃圾回收计数

还为 AMQ Streams 的 Kafka BridgeCruise Control 组件提供仪表板。

所有控制面板都提供 JVM 指标,以及特定于各个组件的指标。例如,Operator 仪表板提供有关正在处理的协调或自定义资源数量的信息。

6.1.2. Prometheus 指标配置示例

AMQ Streams 使用 Prometheus JMX Exporter 来通过 HTTP 端点公开 JMX 指标,然后由 Prometheus 提取。

Grafana 仪表板依赖于 Prometheus JMX Exporter 重新标记规则,这些规则是为 AMQ Streams 组件定义的,作为自定义资源配置。

标签是 name-value 对。重新标记是动态编写标签的过程。例如,标签的值可以从 Kafka 服务器名称和客户端 ID 中获得。

AMQ Streams 提供示例自定义资源配置 YAML 文件,其中已定义了重新标记规则。部署 Prometheus 指标配置时,您可以部署示例自定义资源,或将指标配置复制到您自己的自定义资源定义中。

表 6.1. 带有指标配置的自定义资源示例

组件自定义资源YAML 文件示例

Kafka 和 ZooKeeper

kafka

kafka-metrics.yaml

Kafka Connect

KafkaConnectKafkaConnectS2I

kafka-connect-metrics.yaml

Kafka MirrorMaker 2.0

KafkaMirrorMaker2

kafka-mirror-maker-2-metrics.yaml

Kafka Bridge

KafkaBridge

kafka-bridge-metrics.yaml

Sything Control

kafka

kafka-cruise-control-metrics.yaml

其他资源

6.2. 部署 Prometheus 指标配置

AMQ Streams 提供了带有重新标记规则 的示例自定义资源配置 YAML 文件

要应用重新标记规则的指标配置,请执行以下操作之一:

6.2.1. 将 Prometheus metrics 配置复制到自定义资源

要使用 Grafana 仪表板来监控,请将 指标配置示例复制到自定义资源 中。

在这个流程中,会更新 Kafka 资源,但所有支持监控的组件的步骤都相同。

步骤

对部署中的每个 Kafka 资源执行以下步骤。

  1. 更新编辑器中的 Kafka 资源。

    oc edit kafka KAFKA-CONFIG-FILE
  2. kafka-metrics.yaml中的示例配置 复制到您自己的 Kafka 资源定义中。
  3. 保存文件,并等待更新的资源被协调。

6.2.2. 使用 Prometheus 指标配置部署 Kafka 集群

要使用 Grafana 仪表板进行监控,您可以使用 指标配置部署一个示例 Kafka 集群

在这一流程中,kafka-metrics.yaml 文件用于 Kafka 资源。

步骤

6.3. 在 OpenShift 4 中查看 Kafka 指标和仪表板

当 AMQ Streams 部署到 OpenShift Container Platform 4.x 时,指标 将通过监控用户定义的项目 来提供。此 OpenShift 功能可让开发人员访问单独的 Prometheus 实例来监控其自己的项目(如 Kafka 项目)。

如果启用了对用户定义的项目的监控,openshift -user-workload-monitoring 项目包含以下组件:

  • Prometheus Operator
  • Prometheus 实例(由 Prometheus Operator 自动部署)
  • Thanos Ruler 实例

AMQ Streams 使用这些组件来消耗指标。

集群管理员必须启用对用户定义的项目的监控,然后授予开发人员和其他用户权限来监控其项目中的应用。

Grafana 部署

您可以将 Grafana 实例部署到包含 Kafka 集群的项目中。然后,可以将 Grafana 仪表板示例用于在 Grafana 用户界面中视觉化 AMQ Streams 的 Prometheus 指标。

重要

openshift-monitoring 项目为核心平台组件提供监控。不要在此 项目中使用 Prometheus 和 Grafana 组件来配置 OpenShift Container Platform 4.x 上的 AMQ Streams 监控。

Grafana 版本 6.3 是最低支持的版本。

先决条件

流程提纲

要在 OpenShift Container Platform 4.x 中设置 AMQ Streams 监控,请按照以下步骤操作:

6.3.1. 部署 Prometheus 资源

注意

在 OpenShift Container Platform 4.x 上运行 AMQ Streams 时使用此流程。

要启用 Prometheus 以使用 Kafka 指标,您可以在示例指标文件中配置和部署 PodMonitor 资源PodMonnitors 直接从 Apache Kafka、ZooKeeper、Operator、Kafka Bridge 和 Cruise Control 的 pod 中提取数据。

然后,您为 Alertmanager 部署示例警报规则。

先决条件

步骤

  1. 检查是否启用了对用户定义的项目的监控:

    oc get pods -n openshift-user-workload-monitoring

    如果启用,则返回监控组件的 Pod。例如:

    NAME                                   READY   STATUS    RESTARTS   AGE
    prometheus-operator-5cc59f9bc6-kgcq8   1/1     Running   0          25s
    prometheus-user-workload-0             5/5     Running   1          14s
    prometheus-user-workload-1             5/5     Running   1          14s
    thanos-ruler-user-workload-0           3/3     Running   0          14s
    thanos-ruler-user-workload-1           3/3     Running   0          14s

    如果没有返回 pod,则禁用对用户定义的项目的监控。请参阅 第 6.3 节 “在 OpenShift 4 中查看 Kafka 指标和仪表板” 中的前提条件。

  2. example /metrics/prometheus-install/strimzi-pod-monitor.yaml 中定义了多个 PodMonitor 资源

    对于每个 PodMonitor 资源,编辑 spec.namespaceSelector.matchNames 属性:

    apiVersion: monitoring.coreos.com/v1
    kind: PodMonitor
    metadata:
      name: cluster-operator-metrics
      labels:
        app: strimzi
    spec:
      selector:
        matchLabels:
          strimzi.io/kind: cluster-operator
      namespaceSelector:
        matchNames:
          - PROJECT-NAME 1
      podMetricsEndpoints:
      - path: /metrics
        port: http
    # ...
    1
    Pod 从中提取指标的项目,如 Kafka
  3. strimzi-pod-monitor.yaml 文件部署到运行 Kafka 集群的项目中:

    oc apply -f strimzi-pod-monitor.yaml -n MY-PROJECT
  4. 将 Prometheus 规则示例部署到同一项目中:

    oc apply -f prometheus-rules.yaml -n MY-PROJECT

其他资源

6.3.2. 为 Grafana 创建服务帐户

注意

在 OpenShift Container Platform 4.x 上运行 AMQ Streams 时使用此流程。

AMQ Streams 的 Grafana 实例需要使用分配 cluster-monitoring-view 角色的服务帐户运行。

步骤

  1. 为 Grafana 创建 ServiceAccount。此处资源命名为 grafana-serviceaccount

    apiVersion: v1
    kind: ServiceAccount
    metadata:
      name: grafana-serviceaccount
      labels:
        app: strimzi
  2. ServiceAccount 部署到包含 Kafka 集群的项目中:

    oc apply -f GRAFANA-SERVICEACCOUNT -n MY-PROJECT
  3. 创建一个 ClusterRoleBinding 资源,它将 cluster-monitoring-view 角色分配给 Grafana ServiceAccount。此处资源命名为 grafana-cluster-monitoring-binding

    apiVersion: rbac.authorization.k8s.io/v1
    kind: ClusterRoleBinding
    metadata:
      name: grafana-cluster-monitoring-binding
      labels:
        app: strimzi
    subjects:
      - kind: ServiceAccount
        name: grafana-serviceaccount
        namespace: MY-PROJECT 1
    roleRef:
      kind: ClusterRole
      name: cluster-monitoring-view
      apiGroup: rbac.authorization.k8s.io
    1
    您的项目名称。
  4. ClusterRoleBinding 部署到包含 Kafka 集群的项目:

    oc apply -f GRAFANA-CLUSTER-MONITORING-BINDING -n MY-PROJECT

6.3.3. 使用 Prometheus 数据源部署 Grafana

注意

在 OpenShift Container Platform 4.x 上运行 AMQ Streams 时使用此流程。

此流程描述了如何部署为 OpenShift Container Platform 4.x 监控堆栈配置的 Grafana 应用程序。

OpenShift Container Platform 4.x 在 openshift-monitoring 项目中包含一个 Thanos Querier 实例。Thanos Querier 用于聚合平台指标。

要使用所需的平台指标,您的 Grafana 实例需要一个可以连接到 Thanos Querier 的 Prometheus 数据源。要配置此连接,您需要创建一个 Config Map,它将使用令牌验证与 Thanos Querier 一起运行的 oauth-proxy sidecar。datasource.yaml 文件用作 Config Map 的来源。

最后,您要使用作为卷挂载的 Config Map 将 Grafana 应用程序部署到包含 Kafka 集群的项目中。

步骤

  1. 获取 Grafana ServiceAccount 的访问令牌:

    oc serviceaccounts get-token grafana-serviceaccount -n MY-PROJECT

    复制要在下一步中使用的访问令牌。

  2. 创建一个包含 Grafana Thanos Querier 配置的 数据源.yaml 文件。

    将访问令牌粘贴到 httpHeaderValue1 属性中,如下所示:

    apiVersion: 1
    
    datasources:
    - name: Prometheus
      type: prometheus
      url: https://thanos-querier.openshift-monitoring.svc.cluster.local:9091
      access: proxy
      basicAuth: false
      withCredentials: false
      isDefault: true
      jsonData:
        timeInterval: 5s
        tlsSkipVerify: true
        httpHeaderName1: "Authorization"
      secureJsonData:
        httpHeaderValue1: "Bearer ${GRAFANA-ACCESS-TOKEN}" 1
      editable: true
    1
    GRAFANA-ACCESS-TOKEN :Grafana ServiceAccount 的访问令牌值。
  3. datasource.yaml 文件创建一个名 为grafana-config 的 Config Map:

    oc create configmap grafana-config --from-file=datasource.yaml -n MY-PROJECT
  4. 创建包含 Deployment 和 Service 的 Grafana 应用程序。

    The grafana-config Config Map 被挂载为数据源配置的卷。

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: grafana
      labels:
        app: strimzi
    spec:
      replicas: 1
      selector:
        matchLabels:
          name: grafana
      template:
        metadata:
          labels:
            name: grafana
        spec:
          serviceAccountName: grafana-serviceaccount
          containers:
          - name: grafana
            image: grafana/grafana:6.3.0
            ports:
            - name: grafana
              containerPort: 3000
              protocol: TCP
            volumeMounts:
            - name: grafana-data
              mountPath: /var/lib/grafana
            - name: grafana-logs
              mountPath: /var/log/grafana
            - name: grafana-config
              mountPath: /etc/grafana/provisioning/datasources/datasource.yaml
              readOnly: true
              subPath: datasource.yaml
            readinessProbe:
              httpGet:
                path: /api/health
                port: 3000
              initialDelaySeconds: 5
              periodSeconds: 10
            livenessProbe:
              httpGet:
                path: /api/health
                port: 3000
              initialDelaySeconds: 15
              periodSeconds: 20
          volumes:
          - name: grafana-data
            emptyDir: {}
          - name: grafana-logs
            emptyDir: {}
          - name: grafana-config
            configMap:
              name: grafana-config
    ---
    apiVersion: v1
    kind: Service
    metadata:
      name: grafana
      labels:
        app: strimzi
    spec:
      ports:
      - name: grafana
        port: 3000
        targetPort: 3000
        protocol: TCP
      selector:
        name: grafana
      type: ClusterIP
  5. 将 Grafana 应用程序部署到包含 Kafka 集群的项目中:

    oc apply -f GRAFANA-APPLICATION -n MY-PROJECT

其他资源

6.3.4. 创建到 Grafana 服务的路由

注意

在 OpenShift Container Platform 4.x 上运行 AMQ Streams 时使用此流程。

您可以通过公开 Grafana 服务的路由访问 Grafana 用户界面。

步骤

  • 创建到 grafana 服务的边缘路由:

    oc create route edge MY-GRAFANA-ROUTE --service=grafana --namespace=KAFKA-NAMESPACE

6.3.5. 导入 Grafana 仪表板示例

注意

在 OpenShift Container Platform 4.x 上运行 AMQ Streams 时使用此流程。

使用 Grafana 用户界面导入示例 Grafana 仪表板。

步骤

  1. 获取到 Grafana 服务的路由详情。例如:

    oc get routes
    
    NAME               HOST/PORT                         PATH  SERVICES
    MY-GRAFANA-ROUTE   MY-GRAFANA-ROUTE-amq-streams.net        grafana
  2. 在 Web 浏览器中,使用 Route 主机和端口的 URL 访问 Grafana 登录屏幕。
  3. 输入您的用户名和密码,然后单击 Log In

    默认的 Grafana 用户名和密码都是 admin。第一次登录后,您可以更改密码。

  4. Configuration > Data Sources 中,检查是否创建了 Prometheus 数据源。该数据源是在 第 6.3.3 节 “使用 Prometheus 数据源部署 Grafana” 中创建的。
  5. 点击 Dashboards > Manage,然后点 Import
  6. 示例/metrics/grafana-dashboards 中,复制仪表板的 JSON 以导入。
  7. 将 JSON 粘贴到文本框中,然后单击 Load
  8. 为另一个示例 Grafana 仪表板重复步骤 1 -7。

导入的 Grafana 仪表板可以从 Dashboards 主页查看。

6.4. 在 OpenShift 3.11 中查看 Kafka 指标和仪表板

当将 AMQ Streams 部署到 OpenShift Container Platform 3.11 时,您可以使用 Prometheus 为 AMQ Streams 提供的示例 Grafana 仪表板提供监控数据。您需要手动将 Prometheus 组件部署到集群中。

要运行 Grafana 仪表板示例,您必须:

注意

本节中引用的资源旨在作为设置监控的起点,但仅作为示例提供。如果需要进一步支持在生产环境中配置和运行 Prometheus 或 Grafana,请尝试联系其各自的社区。

6.4.1. Prometheus 支持

当 AMQ Streams 部署到 OpenShift Container Platform 3.11 时,不支持 Prometheus 服务器。但是,支持 Prometheus 端点和用于公开指标的 Prometheus JMX Exporter。

为方便起见,我们提供详细说明和示例指标配置文件,以便您希望使用 Prometheus 进行监控。

6.4.2. 设置 Prometheus

注意

在 OpenShift Container Platform 3.11 上运行 AMQ Streams 时,请使用以下步骤。

Prometheus 为系统监控和警报通知提供一套开源的组件。

此处我们介绍了如何在将 AMQ Streams 部署到 OpenShift Container Platform 3.11 时,如何使用提供的 Prometheus 镜像和配置文件来运行和管理 Prometheus 服务器。

先决条件

  • 您已将 Prometheus 和 Grafana 的兼容版本部署到 OpenShift Container Platform 3.11 集群中。
  • 用于运行 Prometheus 服务器 pod 的服务帐户有权访问 OpenShift API 服务器。这允许服务帐户检索集群中获取指标的 pod 列表。

    如需更多信息,请参阅 发现服务

6.4.2.1. Prometheus 配置

AMQ Streams 为 Prometheus 服务器提供了示例配置文件

为部署提供了一个 Prometheus 镜像:

  • prometheus.yaml

以下文件中还提供了与 Prometheus 相关的其他配置:

  • prometheus-additional.yaml
  • prometheus-rules.yaml
  • strimzi-pod-monitor.yaml

若要让 Prometheus 获取监控数据,您必须已将 Prometheus 的兼容版本部署到 OpenShift Container Platform 3.11 集群中。

然后,使用配置文件来部署 Prometheus

6.4.2.2. Prometheus 资源

应用 Prometheus 配置时,会在 OpenShift 集群中创建并由 Prometheus Operator 管理以下资源:

  • 为 Prometheus 授予权限来读取 Kafka 和 ZooKeeper pod、cAdvisor 和 kubelet 针对容器指标公开的健康端点的 ClusterRole
  • 要运行的 Prometheus Pod 的 ServiceAccount
  • 一个 ClusterRoleBinding,它将 ClusterRole 绑定到 ServiceAccount
  • 用于管理 Prometheus Operator pod 的 Deployment
  • 用于管理 Prometheus pod 配置的 PodMonitor
  • 用于管理 Prometheus pod 配置的 Prometheus。
  • 用于管理 Prometheus pod 的警报规则的 PrometheusRule
  • 用于管理其他 Prometheus 设置的 Secret
  • 一个 服务,允许在集群中运行的应用程序连接到 Prometheus(例如,使用 Prometheus 作为数据源的 Grafana)。

6.4.2.3. 部署 Prometheus

要在 Kafka 集群中获取监控数据,您可以通过应用 Prometheus docker 镜像的示例安装资源文件以及 Prometheus 相关资源的 YAML 文件来使用您自己的 Prometheus 部署或部署 Prometheus 部署

部署过程创建一个 ClusterRoleBinding,并在为部署指定的命名空间中发现 Alertmanager 实例。

先决条件

步骤

  1. 根据要安装到的命名空间 Prometheus 修改 Prometheus 安装文件(prometheus.yaml):

    在 Linux 中,使用:

    sed -i 's/namespace: .*/namespace: my-namespace/' prometheus.yaml

    在 MacOS 中,使用:

    sed -i '' 's/namespace: .*/namespace: my-namespace/' prometheus.yaml
  2. 编辑 PodMonitor 资源 in strimzi-pod-monitor.yaml,以定义 Prometheus 作业,它将从 pod 中提取指标数据。

    namespaceSelector.matchNames 属性更新为 pod 要从中提取指标的命名空间。

    PodMonitor 用于直接从 pod 中提取数据,用于 Apache Kafka、ZooKeeper、Operator、Kafka Bridge 和 Cruise Control。

  3. 编辑 prometheus.yaml 安装文件,使其包含用于直接从节点提取指标的额外配置。

    提供的 Grafana 仪表板显示 CPU、内存和磁盘卷使用情况的指标,它们直接来自节点上的 OpenShift cAdvisor 代理和 kubelet。

    1. 从配置文件(如 example /metrics/prometheus-additional-properties 目录中的 prometheus-additional.yaml )创建 Secret 资源:

      oc apply -f prometheus-additional.yaml
    2. 编辑 prometheus.yaml 文件中的 additionalScrapeConfigs 属性,使其包含 Secret 的名称和 prometheus-additional.yaml 文件。
  4. 部署 Prometheus 资源:

    oc apply -f strimzi-pod-monitor.yaml
    oc apply -f prometheus-rules.yaml
    oc apply -f prometheus.yaml

6.4.3. 设置 Prometheus Alertmanager

Prometheus Alertmanager 是一个插件,用于处理警报并将其路由到通知服务。Alertmanager 支持监控的一个重要方面,它将收到根据警报规则指示潜在问题的条件通知。

6.4.3.1. Alertmanager 配置

AMQ Streams 为 Prometheus Alertmanager 提供示例配置文件

配置文件定义用于部署 Alertmanager 的资源:

  • alert-manager.yaml

一个额外的配置文件提供用于从 Kafka 集群发送通知的 hook 定义。

  • alert-manager-config.yaml

对于 Alertmanger 处理 Prometheus 警报,请使用配置文件:

6.4.3.2. 警报规则

警报规则提供有关指标中观察到的特定条件的通知。规则在 Prometheus 服务器上声明,但 Prometheus Alertmanager 负责警报通知。

Prometheus 警报规则描述使用不断评估的 PromQL 表达式的条件。

当警报表达式变为 true 时,满足该条件,Prometheus 服务器会向 Alertmanager 发送警报数据。然后,Alertmanager 使用为其部署配置的通信方法发出通知。

Alertmanager 可以配置为使用电子邮件、聊天消息或其他通知方法。

其他资源

有关设置警报规则的更多信息,请参阅 Prometheus 文档中的 配置

6.4.3.3. 警报规则示例

Kafka 和 ZooKeeper 指标的警报规则示例随 AMQ Streams 提供,供 Prometheus 部署 使用。

关于警报规则定义的常规要点:

  • for 属性与规则配合使用,以确定条件在触发警报前必须保留的时间。
  • tick 是一个基本的 ZooKeeper 时间单元,以毫秒为单位计算并使用 Kafka.spec.zookeeper.configtickTime 参数进行配置。例如,如果 ZooKeeper tickTime=3000,3 ticks(3 x 3000)等于 9000 毫秒。
  • ZookeeperRunningOutOfSpace 指标和警报的可用性取决于所使用的 OpenShift 配置和存储实施。某些平台的存储实施可能无法提供有关指标提供警报所需的可用空间的信息。

Kafka 警报规则

UnderReplicatedPartitions
提供当前代理是潜在副本但副本数比为 min.insync.replicas 配置的 副本数更少的分区数量。这个指标提供了有关托管后续副本的代理的信息。这些追随者跟不上领导者的步调。其原因可能包括(或已经)脱机,以及过限内联复制。当此值大于零时引发警报,以提供关于每个代理的副本分区的信息。
AbnormalControllerState
指示当前代理是否为集群的控制器。指标可以是 0 或 1。在集群生命周期内,应该只有一个代理是控制器,集群始终需要有一个活跃的控制器。有两个或多个代理表示它们是控制器,这表示问题。如果条件仍然存在,则当所有代理上此指标的所有值总和不等于 1 时,就会引发警报,即没有活跃的控制器(总和为 0)或多个控制器(总和大于 1)。
UnderMinIsrPartitionCount
表示领导 Kafka 代理的最小内同步副本(ISR)数量最少(使用 min.insync.replicas 指定),必须确认没有达到写入操作。指标定义代理所负责的分区数量,其中同步副本数小于最小内同步副本数。当这个值大于零时引发警报,为没有达到最小确认数的每个代理提供分区数的信息。
OfflineLogDirectoryCount
指明离线的日志目录数量(例如,由于硬件故障),以便代理无法再存储传入的消息。如果此值大于零,则会引发警报,以提供关于每个代理的离线日志目录数量的信息。
KafkaRunningOutOfSpace
表示可用于写入数据的剩余磁盘空间量。当此值低于 5GiB 时,将引发警报,提供关于每个持久卷声明的空间不足的磁盘信息。阈值可以在 prometheus-rules.yaml 中更改。

zookeeper 警报规则

AvgRequestLatency
表示服务器响应客户端请求所需的时间。当此值大于 10(勾号)时,会引发警报,从而提供每台服务器平均请求延迟的实际值。
OutstandingRequests
表示服务器中的排队请求数。当服务器接收的请求数超过其处理次数时,这个值会增加。如果此值大于 10,则会引发警报,从而为每个服务器提供实际的待处理请求数。
ZookeeperRunningOutOfSpace
表示可用于向 ZooKeeper 写入数据的剩余磁盘空间量。当此值低于 5GiB 时,会引发警报。它提供有关每个持久卷声明的空间不足的磁盘信息。

6.4.3.4. 部署 Alertmanager

要部署 Alertmanager,请应用 示例配置文件

AMQ Streams 提供的示例配置将 Alertmanager 配置为将通知发送到 Slack 频道。

在部署中定义了以下资源:

  • 用于管理 Alertmanager pod 的 Alertmanager。
  • 用于管理 Alertmanager 配置的机密。
  • 服务,用于为其他服务(如 Prometheus)连接 Alertmanager 提供简单参考主机名的服务。

步骤

  1. 从 Alertmanager 配置文件(alert-manager-config.yaml)创建 Secret 资源:

    oc create secret generic alertmanager-alertmanager --from-file=alertmanager.yaml=alert-manager-config.yaml
  2. 更新 alert-manager-config.yaml 文件以替换:

    • slack _api_url 属性,以及与 Slack 工作区应用相关的 Slack API URL 的实际值
    • 具有 要在其上发送通知的实际 Slack 频道的频道属性
  3. 部署 Alertmanager:

    oc apply -f alert-manager.yaml

6.4.4. 设置 Grafana

Grafana 提供 Prometheus 指标的视觉化。

您可以部署和启用 AMQ Streams 提供的 Grafana 仪表板示例。

6.4.4.1. 部署 Grafana

要提供 Prometheus 指标的视觉化,您可以使用您自己的 Grafana 安装,或通过应用 example /metrics 目录中提供的 grafana.yaml 文件来部署 Grafana。

步骤

  1. 部署 Grafana:

    oc apply -f grafana.yaml
  2. 启用 Grafana 仪表板

6.4.4.2. 启用 Grafana 仪表板示例

AMQ Streams 为 Grafana 提供了仪表板配置文件示例。示例仪表板作为 JSON 文件在 example/metrics 目录中提供:

  • strimzi-kafka.json
  • strimzi-zookeeper.json
  • strimzi-kafka-connect.json
  • strimzi-kafka-mirror-maker-2.json
  • strimzi-operators.json
  • strimzi-kafka-bridge.json
  • strimzi-cruise-control.json

示例仪表板是监控关键指标的良好起点,但它们并不代表所有可用的指标。您可以根据您的基础架构修改示例仪表板或添加其他指标。

设置 Prometheus 和 Grafana 后,您可以在 Grafana 仪表板上视觉化 AMQ Streams 数据。

注意

没有定义警报通知规则。

访问仪表板时,您可以使用 port-forward 命令将 Grafana pod 的流量转发到主机。

注意

Grafana pod 的名称为每个用户不同。

步骤

  1. 获取 Grafana 服务的详情:

    oc get service grafana

    例如:

    NAMETYPECLUSTER-IP端口.

    grafana

    ClusterIP

    172.30.123.40

    3000/TCP

    请注意端口转发的端口号。

  2. 使用 port-forward 将 Grafana 用户界面重定向到 localhost:3000:

    oc port-forward svc/grafana 3000:3000
  3. 将 Web 浏览器指向 http://localhost:3000

    这时会出现 Grafana Log In 页面。

  4. 输入您的用户名和密码,然后单击 Log In

    默认的 Grafana 用户名和密码都是 admin。第一次登录后,您可以更改密码。

  5. 添加 Prometheus 作为 数据源

  6. DashboardsImport,上传示例仪表板或直接粘贴 JSON。
  7. 在顶部标头中,单击仪表板下拉菜单,然后选择您要查看的仪表板。

    当 Prometheus 服务器收集 AMQ Streams 集群的指标一段时间时,会填充仪表板。

图 6.1. 仪表板选择选项

AMQ Streams 仪表板选择
AMQ Streams Kafka

显示以下指标:

  • 在线代理数
  • 集群计数中的活跃控制器
  • 未清理领导选举率
  • 在线副本
  • 复制不足的分区数
  • 至少在同步副本数中的分区
  • 在同步副本数中最小分区
  • 没有活跃领导且因此不可写入或可读的分区
  • Kafka 代理 pod 内存用量
  • 聚合 Kafka 代理 pod CPU 用量
  • Kafka 代理 pod 磁盘用量
  • 使用的 JVM 内存
  • JVM 垃圾收集时间
  • JVM 垃圾回收计数
  • 传入字节速率总数
  • 传出字节速率总数
  • 传入的消息率
  • 生成请求率总数
  • 字节率
  • 生成请求率
  • 获取请求率
  • 网络处理器平均空闲百分比
  • 请求处理程序平均时间空闲百分比
  • 日志大小

    图 6.2. AMQ Streams Kafka 仪表板

    Kafka 仪表板
AMQ Streams ZooKeeper

显示以下指标:

  • Zookeeper ensemble 的仲裁大小
  • 活动 连接
  • 服务器数中的排队请求
  • watchers 数
  • zookeeper pod 内存用量
  • ZooKeeper pod CPU 使用量聚合
  • zookeeper pod 磁盘用量
  • 使用的 JVM 内存
  • JVM 垃圾收集时间
  • JVM 垃圾回收计数
  • 服务器响应客户端请求所需的时间(最大值、最小值和平均值)
AMQ Streams Kafka Connect

显示以下指标:

  • 传入字节速率总数
  • 传出字节速率总数
  • 磁盘用量
  • 使用的 JVM 内存
  • JVM 垃圾收集时间
AMQ Streams Kafka MirrorMaker 2

显示以下指标:

  • 连接器数量
  • 任务数量
  • 传入字节速率总数
  • 传出字节速率总数
  • 磁盘用量
  • 使用的 JVM 内存
  • JVM 垃圾收集时间
AMQ Streams Operator

显示以下指标:

  • 自定义资源
  • 每小时成功对自定义资源进行协调
  • 每小时自定义资源协调失败
  • 每小时无锁定的调节
  • 协调开始时间
  • 每小时定期协调
  • 最大协调时间
  • 平均协调时间
  • 使用的 JVM 内存
  • JVM 垃圾收集时间
  • JVM 垃圾回收计数

6.5. 添加 Kafka Exporter

Kafka Exporter 是一个开源项目,用于增强对 Apache Kafka 代理和客户端的监控。Kafka Exporter 附带 AMQ Streams 以用于部署 Kafka 集群,以便从 Kafka 代理中提取与偏移、消费者组、消费者滞后和主题相关的额外指标数据。

例如,使用指标数据来帮助识别慢速消费者。

滞后数据作为 Prometheus 指标公开,随后可在 Grafana 中提供以进行分析。

如果您已在使用 Prometheus 和 Grafana 监控内置 Kafka 指标,您可以将 Prometheus 配置为同时提取 Kafka Exporter Prometheus 端点。

6.5.1. 监控消费者滞后

消费滞后表明消息的生产和消耗速度不同。具体来说,消费者对给定消费者组的滞后指示分区中最后一条消息与该消费者当前采用的消息之间的延迟。

与分区日志末尾相比,滞后反映了消费者偏移的位置。

生产者和消费者偏移的消费者滞后情况

Consumer lag

这种差别有时被称为制作者偏移和消费者偏移之间的 delta : Kafka 代理主题分区中的读取和写入位置。

假设主题流 100 条消息秒。制作者偏移(主题分区头)和消费者最后的偏移之间有 1000 条消息的滞后意味着 10 秒的延迟。

监控消费者滞后的重要性

对于依赖(near)实时数据处理的应用程序,监控消费者滞后情况对于检查它是否不再太大至关重要。滞后越高,进程距离实时处理目标越高。

例如,消费者的滞后可能是由于消耗过多的陈旧数据而导致的,这些数据尚未清除,或者是通过计划外关机造成的。

减少消费者滞后

降低滞后率的典型操作包括:

  • 通过添加新用户来扩展消费者组
  • 增加消息的保留时间以保留在某个主题中
  • 添加更多磁盘容量来增加消息缓冲

降低消费者滞后问题的操作取决于底层基础架构,而 AMQ Streams 支持用例。例如,适应消费者不太可能从其磁盘缓存服务获取请求的代理中受益。在某些情况下,可以同意在消费者发现信息之前自动丢弃消息。

6.5.2. Kafka Exporter 警报规则示例

如果您执行了在部署中引入指标的步骤,您的 Kafka 集群将配置为使用支持 Kafka Exporter 的警报通知规则。

Kafka Exporter 的规则在 prometheus-rules.yaml 中定义,并使用 Prometheus 部署。如需更多信息,请参阅 Prometheus

特定于 Kafka Exporter 的警报通知规则示例如下:

UnderReplicatedPartition
警告主题被复制不足且代理没有复制到足够分区的警报。如果某个主题有一个或多个复制分区,则默认配置用于警报。该警报可能表示 Kafka 实例停机或 Kafka 集群已过载。可能需要计划重启 Kafka 代理,才能重新启动复制过程。
TooLargeConsumerGroupLag
警告使用者组的滞后对于特定主题分区来说太过大的警报。默认配置是 1000 记录。很大的滞后可能意味着消费者太慢,正在落于生产者后面。
NoMessageForTooLong
提醒主题在一段时间内未收到消息的警报。时间段的默认配置为 10 分钟。延迟可能是因为配置问题导致制作者无法将消息发布到该主题。

根据您的特定需求调整这些规则的默认配置。

6.5.3. 公开 Kafka Exporter 指标

Kafka Exporter 会将较慢的信息作为 Prometheus 指标在 Grafana 中公开。

Kafka Exporter 为代理、主题和消费者组公开指标数据。

此处描述了所提取的数据。

表 6.2. 代理指标输出

Name信息

kafka_brokers

Kafka 集群中代理数量

表 6.3. 主题指标输出

Name信息

kafka_topic_partitions

主题的分区数

kafka_topic_partition_current_offset

代理的当前主题分区偏移

kafka_topic_partition_oldest_offset

代理的最旧主题分区偏移

kafka_topic_partition_in_sync_replica

主题分区的同步副本数

kafka_topic_partition_leader

主题分区的领导代理 ID

kafka_topic_partition_leader_is_preferred

显示 1 如果主题分区正在使用首选代理

kafka_topic_partition_replicas

此主题分区的副本数

kafka_topic_partition_under_replicated_partition

如果主题分区复制不足,显示 1

表 6.4. 消费者组指标输出

Name信息

kafka_consumergroup_current_offset

消费者组的当前主题分区偏移

kafka_consumergroup_lag

某个消费者小组在某个主题分区的近似滞后情况

6.5.4. 配置 Kafka Exporter

此流程演示了如何通过 Kafka Exporter 属性在 Kafka 资源中配置 Kafka Exporter

有关配置 Kafka 资源的更多信息,请参阅 OpenShift 中使用 AMQ Streams 中的 示例 Kafka YAML 配置

此流程中显示了与 Kafka Exporter 配置相关的属性。

您可以将这些属性配置为 Kafka 集群的部署或重新部署的一部分。

先决条件

  • OpenShift 集群
  • 一个正在运行的 Cluster Operator

步骤

  1. 编辑 Kafka 资源的 Kafka Exporter 属性。

    您可以配置的属性显示在此示例配置中:

    apiVersion: kafka.strimzi.io/v1beta1
    kind: Kafka
    metadata:
      name: my-cluster
    spec:
      # ...
      kafkaExporter:
        image: my-org/my-image:latest 1
        groupRegex: ".*" 2
        topicRegex: ".*" 3
        resources: 4
          requests:
            cpu: 200m
            memory: 64Mi
          limits:
            cpu: 500m
            memory: 128Mi
        logging: debug 5
        enableSaramaLogging: true 6
        template: 7
          pod:
            metadata:
              labels:
                label1: value1
            imagePullSecrets:
              - name: my-docker-credentials
            securityContext:
              runAsUser: 1000001
              fsGroup: 0
            terminationGracePeriodSeconds: 120
        readinessProbe: 8
          initialDelaySeconds: 15
          timeoutSeconds: 5
        livenessProbe: 9
          initialDelaySeconds: 15
          timeoutSeconds: 5
    # ...
    1
    高级选项:容器镜像配置,建议仅在特殊情况下 使用。
    2
    用于指定要包含在指标中的使用者组的正则表达式。
    3
    用于指定要包含在指标中的主题的正则表达式。
    4
    5
    记录配置,记录具有给定严重性(debug、info、warn、error、fatal)的消息。
    6
    用于启用 Sarama 日志记录的布尔值,Kafka Exporter 使用的 Go 客户端库。
    7
    8
    9
  2. 创建或更新资源:

    oc apply -f kafka.yaml

接下来要做什么

配置和部署 Kafka Exporter 后,您可以 启用 Grafana 来显示 Kafka Exporter 仪表板

6.5.5. 启用 Kafka Exporter Grafana 仪表板

AMQ Streams 为 Grafana 提供了仪表板配置文件示例。Kafka Exporter 仪表板作为 JSON 文件在 example/metrics 目录中提供:

  • strimzi-kafka-exporter.json

如果使用 Kafka 集群部署了 Kafka Exporter,您可以视觉化它在 Grafana 仪表板中公开的指标数据。

此流程假设您已有权访问 Grafana 用户界面,Prometheus 已添加为数据源。如果您是第一次访问用户界面,请参阅 Grafana

步骤

  1. 访问 Grafana 用户界面
  2. 选择 Strimzi Kafka Exporter 仪表板。

    收集指标数据一段时间后,会填充 Kafka Exporter chart。

    AMQ Streams Kafka Exporter

    显示以下指标:

    • 主题数量
    • 分区数
    • 副本数
    • 同步副本数
    • 复制不足的分区数
    • 至少在同步副本数中的分区
    • 在同步副本数中最小分区
    • 不在首选节点上的分区
    • 每秒来自主题的消息
    • 每秒从主题中消耗的消息
    • 消费者组每分钟使用的消息
    • 消费组的滞后
    • 分区数
    • 最新偏移
    • 最旧的偏移

使用 Grafana 图表分析滞后情况,检查减少滞后问题的操作是否对受影响的消费者组产生影响。例如,如果对 Kafka 代理进行调整以减少滞后,仪表板将显示 Lag by consumer 组 图表停机,并且 每分钟消耗的 Messages 都会增加。

6.6. 监控 Kafka Bridge

如果您已在使用 Prometheus 和 Grafana 监控内置 Kafka 指标,您可以将 Prometheus 配置为也提取 Kafka Bridge Prometheus 端点。

Kafka Bridge 的 Grafana 仪表板示例提供:

  • 有关 HTTP 连接和不同端点相关请求的信息
  • 有关网桥使用的 Kafka 消费者和生产者的信息
  • 网桥本身的 JVM 指标

6.6.1. 配置 Kafka 网桥

您可以使用 enableMetrics 属性在 KafkaBridge 资源中启用 Kafka Bridge 指标。

您可以将此属性配置为 Kafka Bridge 部署或重新部署的一部分。

例如:

apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaBridge
metadata:
  name: my-bridge
spec:
  # ...
  bootstrapServers: my-cluster-kafka:9092
  http:
    # ...
  enableMetrics: true
  # ...

6.6.2. 启用 Kafka Bridge Grafana 仪表板

如果使用 Kafka 集群部署 Kafka Bridge,您可以启用 Grafana 以显示它公开的指标数据。

example /metrics 目录中以 JSON 文件形式提供了 Kafka Bridge 仪表板:

  • strimzi-kafka-bridge.json

当收集指标数据一段时间后,会填充 Kafka Bridge chart。

Kafka Bridge

显示以下指标:

  • 到 Kafka 网桥计数的 HTTP 连接
  • 处理中的 HTTP 请求数
  • 每秒处理的请求按 HTTP 方法分组
  • 按响应代码分组的总请求率(2XX、4XX、5XX)
  • 每秒接收和发送的字节数
  • 对每个 Kafka Bridge 端点的请求
  • Kafka Bridge 使用的 Kafka 用户、生产者和相关打开连接的数量
  • Kafka 制作者:

    • 每秒发送的记录平均数量(按主题分组)
    • 每秒发送到所有代理的传出字节数(按主题分组)
    • 导致错误的每秒记录平均数量(按主题分组)
  • Kafka 用户:

    • 每秒消耗的平均记录数(按 clientId-topic 进行分组)
    • 每秒消耗的平均字节数(按 clientId-topic 进行分组)
    • 分配分区(由 clientId 进行分组)
  • 使用的 JVM 内存
  • JVM 垃圾收集时间
  • JVM 垃圾回收计数

6.7. monitor 清理控制

如果您已在使用 Prometheus 和 Grafana 监控内置 Kafka 指标,您可以将 Prometheus 配置为还提取 Cruise Control Prometheus 端点。

Cruise Control 的 Grafana 仪表板示例提供:

  • 有关优化计算、目标违反、集群均衡等的信息
  • 有关重新平衡调整和实际重新平衡操作的 REST API 调用的信息
  • 来自清理控制本身的 JVM 指标

6.7.1. 配置清理控制

您可以使用包含要公开指标的 JMX 导出器配置的 cruiseControl.metrics 属性在 Kafka 资源中启用 Cruise Control 指标。

例如:

apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: my-cluster
spec:
  # ...
  kafka:
    # ...
  zookeeper:
    # ...
  cruiseControl:
    metrics:
      lowercaseOutputName: true
      rules:
      - pattern: kafka.cruisecontrol<name=(.+)><>(\w+)
        name: kafka_cruisecontrol_$1_$2
        type: GAUGE

6.7.2. 启用 Cruise Control Grafana 仪表板

如果您使用启用了指标的 Kafka 集群部署了 Cruise Control,您可以启用 Grafana 呈现它公开的指标数据。

example /metrics 目录中以 JSON 文件的形式提供了一个 Cruise Control 仪表板:

  • strimzi-cruise-control.json

收集指标数据一段时间后,会填充 Cruise Control chart。

Sything Control

显示以下指标:

  • Cruise Control 监控的快照窗口数量
  • 有效时间窗数量,因为它们包含足够的样本来计算优化建议
  • 运行的连续执行数量,用于调整或重新平衡
  • 由 Cruise Control 的异常检测器组件计算的 Kafka 集群的当前平衡度分数(默认为 5 分钟)
  • 受监控分区的百分比
  • 异常检测器报告的目标违反数量(默认为 5 分钟)
  • 代理中磁盘读取失败的频率
  • 指标示例获取失败的速度
  • 计算优化建议所需的时间
  • 创建集群模型所需的时间
  • 通过 Cruise Control REST API 进行请求或实际重新平衡请求的频率
  • 通过 Cruise Control REST API 请求整个集群状态和用户任务状态的频率
  • 使用的 JVM 内存
  • JVM 垃圾收集时间
  • JVM 垃圾回收计数

第 7 章 升级 AMQ Streams

AMQ Streams 可以升级,无需集群停机。AMQ Streams 的每个版本都支持一个或多个 Apache Kafka 版本。只要您的 AMQ Streams 版本支持,就可以升级到更高的 Kafka 版本。

AMQ Streams 的新版本可能支持较新版本的 Kafka,但需要先升级 AMQ Streams,然后才能 升级到更高版本的 Kafka。

要升级 AMQ Streams Operator,您可以使用 OpenShift Container Platform 集群上的 Operator Lifecycle Manager(OLM)。

重要

如果适用,必须在 升级 AMQ Streams 和 Kafka 后进行资源 升级。

7.1. AMQ Streams 和 Kafka 升级

升级 AMQ Streams 是一个两阶段的过程。要在没有停机的情况下升级代理和客户端,您必须 按照以下顺序完成升级过程:

  1. 将 Cluster Operator 更新至最新的 AMQ Streams 版本。您采取的方法取决于如何 部署 Cluster Operator

    • 如果使用安装 YAML 文件部署 Cluster Operator,请通过 修改 Operator 安装文件 来执行升级。
    • 如果您从 OperatorHub 部署 Cluster Operator,请使用 Operator Lifecycle Manager(OLM)将 AMQ Streams Operator 的更新频道改为新的 AMQ Streams 版本。

      根据您选择的升级策略,按照频道更新进行操作:

      • 启动 自动 升级
      • 然后,在开始安装前需要批准 手动 升级

      有关使用 OperatorHub 升级 Operator 的更多信息,请参阅 升级已安装的 Operator

  2. 将所有 Kafka 代理和客户端应用程序升级到最新的 Kafka 版本。

7.1.1. Kafka 版本

Kafka 的日志消息格式版本和broker 协议版本指定附加到消息的日志格式版本以及集群中使用的协议版本。因此,升级过程包括对现有的 Kafka 代理进行配置更改,并对客户端应用程序(使用者和生产者)进行代码更改以确保使用正确的版本。

下表显示了 Kafka 版本之间的区别:

Kafka 版本Interbroker 协议版本日志消息格式版本zookeeper 版本

2.5.0

2.5

2.5

3.5.8

2.6.0

2.6

2.6

3.5.8

消息格式版本

当生产者向 Kafka 代理发送消息时,该消息将使用特定格式进行编码。Kafka 版本的格式可能会改变,因此消息包括一个版本,用于标识它们编码的格式版本。您可以配置 Kafka 代理,在代理将消息附加到日志前将信息从较新的格式转换为给定的旧格式版本。

在 Kafka 中,设置消息格式版本的方法有两种:

  • message.format.version 属性在主题上设置。
  • log.message.format.version 属性在 Kafka 代理上设置。

主题的 message.format.version 默认值由 Kafka 代理上设置的 log.message.format.version 定义。您可以通过修改主题配置来手动设置主题的 message.format.version

本节中的升级任务假定消息格式版本由 log.message.format.version 定义。

7.1.2. 升级 Cluster Operator

本节概述了将 Cluster Operator 部署升级到使用 AMQ Streams 1.6 的步骤。

由 Cluster Operator 管理的 Kafka 集群的可用性不受升级操作的影响。

注意

有关如何升级到该版本的信息,请参阅支持 AMQ Streams 特定版本的文档。

7.1.2.1. 将 Cluster Operator 升级到更新的版本

此流程描述了如何将 Cluster Operator 部署升级到更新的版本。

先决条件

步骤

  1. 记录对现有 Cluster Operator 资源所做的任何配置更改(在 /install/cluster-operator 目录中)。任何更改都将被 Cluster Operator 的新版本 覆盖
  2. 更新 Cluster Operator。

    1. 根据 Cluster Operator 运行的命名空间修改新版本的安装文件。

      在 Linux 中,使用:

      sed -i 's/namespace: .*/namespace: my-cluster-operator-namespace/' install/cluster-operator/*RoleBinding*.yaml

      在 MacOS 中,使用:

      sed -i '' 's/namespace: .*/namespace: my-cluster-operator-namespace/' install/cluster-operator/*RoleBinding*.yaml
    2. 如果在现有 Cluster Operator Deployment 中修改了一个或多个环境变量,请编辑 install/cluster-operator/060-Deployment-cluster-operator.yaml 文件来使用这些环境变量。
  3. 当您有更新的配置时,请将其与其它安装资源一起部署:

    oc apply -f install/cluster-operator

    等待滚动更新完成。

  4. 获取 Kafka pod 的镜像以确保升级成功:

    oc get po my-cluster-kafka-0 -o jsonpath='{.spec.containers[0].image}'

    镜像标签显示新的 AMQ Streams 版本,后跟 Kafka 版本。例如,<New AMQ Streams version>-kafka-<Current Kafka version>

  5. 更新现有资源以处理已弃用的自定义资源属性。

现在,您有一个更新的 Cluster Operator,但在它管理的集群中运行的 Kafka 版本没有改变。

接下来要做什么

在 Cluster Operator 升级后,您可以执行 Kafka 升级

7.1.3. 升级 Kafka

将 Cluster Operator 升级到 1.6 后,下一步是将所有 Kafka 代理升级到最新支持的 Kafka 版本。

Kafka 升级由 Cluster Operator 通过对 Kafka 代理的滚动更新来执行。

Cluster Operator 根据 Kafka 集群配置启动滚动更新。

如果 Kafka.spec.kafka.config 包含…​Cluster Operator 启动s…​

inter.broker.protocol.versionlog.message.format.version

单个滚动更新.更新后,inter.broker.protocol.version 必须手动更新,后接 log.message.format.version。更改每项将触发进一步滚动更新。

inter.broker.protocol.versionlog.message.format.version

两次滚动更新.

没有 inter.broker.protocol.version 或 log.message.format.version 的配置。

两次滚动更新.

作为 Kafka 升级的一部分,Cluster Operator 会启动 ZooKeeper 的滚动更新。

  • 即使 ZooKeeper 版本未更改,也会出现一次滚动更新。
  • 如果 Kafka 的新版本需要新的 ZooKeeper 版本,则会出现额外的滚动更新。

7.1.3.1. Kafka 版本和镜像映射

在升级 Kafka 时,请考虑对 STRIMZI_KAFKA_IMAGESKafka.spec.kafka.version 属性的设置。

  • 每个 Kafka 资源都可以使用 Kafka.spec.kafka.version 配置。
  • Cluster Operator 的 STRIMZI_KAFKA_IMAGES 环境变量提供 Kafka 版本和给定 Kafka 资源中请求该版本时使用的镜像之间的映射。

    • 如果没有配置 Kafka.spec.kafka.image,则会使用给定版本的默认镜像。
    • 如果配置了 Kafka.spec.kafka.image,则默认镜像会被覆盖。
警告

Cluster Operator 无法验证镜像是否实际包含预期版本的 Kafka 代理。请特别注意确保给定镜像与给定的 Kafka 版本对应。

7.1.3.2. 升级客户端的策略

升级客户端应用程序(包括 Kafka Connect 连接器)的最佳方法取决于您的具体情形。

使用应用程序时,需要以它们所理解的消息格式接收消息。您可以通过以下两种方式之一来确保这是这种情况:

  • 在升级任何制作者 之前,先 升级所有消费者的主题。
  • 通过将代理关闭消息置于较旧格式。

使用代理 down-conversion 会给代理带来额外的负载,因此在较长时间内依赖所有主题并不理想。要使代理以最佳方式运行,绝对不应是向下转换消息。

代理 down-conversion 可通过两种方式进行配置:

  • 主题级别 message.format.version 为单个主题配置它。
  • 代理级 log.message.format.version 是未配置主题级别 message.format.version 的主题的默认设置。

以新版本格式发布到主题的消息对消费者可见,因为代理在从生产者接收消息时执行 down-conversion,而不是当它们发送到消费者时。

您可以使用多个策略来升级客户端:

消费者第一
  1. 升级所有使用的应用程序。
  2. 将代理级别的 log.message.format.version 更改为新版本。
  3. 升级所有生成的应用程序。

    此策略非常简单,可避免任何代理停机转换。但是,它假定贵组织中的所有消费者可以协调升级,而且不适用于消费者和生产者的应用程序。如果升级的客户端出现问题,新格式消息可能会添加到消息日志中,因此您无法恢复到以前的使用者版本。

每个主题消费者首先

对于每个主题:

  1. 升级所有使用的应用程序。
  2. 将主题级 message.format.version 更改为新版本。
  3. 升级所有生成的应用程序。

    此策略可避免任何代理 down-conversion,并且意味着您可以逐个主题地进行。它不适用于同时作为同一主题的消费者和制作者的应用。再强调一下,如果升级的客户端出现问题,新格式消息可能会添加到消息日志中。

首先每个主题消费者,进行向下转换

对于每个主题:

  1. 将主题级 message.format.version 更改为旧版本(或依赖主题默认到代理级别的 log.message.format.version)。
  2. 升级所有使用和生产的应用程序。
  3. 验证升级的应用是否正常工作。
  4. 将主题级 message.format.version 更改为新版本。

    这个策略需要代理 down-conversion,但代理的负载会最小化,因为一次只需要单个主题(或小组主题)需要它。它还适用于同时是同一主题的消费者和制作者的应用。这种方法可确保升级的生产者和消费者在提交使用新的消息格式版本之前正常工作。

    这种方法的主要缺点是,在含有许多主题和应用的集群中进行管理可能比较复杂。

还可以通过其他策略来升级客户端应用。

注意

也可以应用多个策略。例如,在前几个应用程序和主题中,"每个主题消费者首先使用",可以使用向下转换"策略"。如果这已被证明是成功的,那么更高效的策略可被视为可以接受采用。

7.1.3.3. 升级 Kafka 代理和客户端应用程序

这个步骤描述了如何将 AMQ Streams Kafka 集群升级到最新支持的 Kafka 版本。

与您当前的 Kafka 版本相比,新版本可能支持更高的 日志消息格式版本 或代理协议版本,或两者。如果需要,请按照步骤升级这些版本。如需更多信息,请参阅 第 7.1.1 节 “Kafka 版本”

您还应该选择 升级客户端的策略。Kafka 客户端在此流程的第 6 步中升级。

先决条件

要升级 Kafka 资源,请检查:

  • 支持两个 Kafka 版本的 Cluster Operator 已启动且正在运行。
  • Kafka.spec.kafka.config 不包含 新 Kafka 版本中不支持的选项。

步骤

  1. 更新 Kafka 集群配置:

    oc edit kafka my-cluster
  2. 如果配置了,请确保 Kafka.spec.kafka.config 的 log.message.format .version 和 inter.broker.protocol.version 设置为 当前 Kafka 版本的默认值。

    例如,如果从 Kafka 版本 2.5.0 升级到 2.6.0:

    kind: Kafka
    spec:
      # ...
      kafka:
        version: 2.5.0
        config:
          log.message.format.version: "2.5"
          inter.broker.protocol.version: "2.5"
          # ...

    如果没有配置 log.message.format .version 和 inter.broker.protocol.version,AMQ Streams 会在下一步中将这些版本更新至 Kafka 版本后自动将这些版本更新至当前的默认值。

    注意

    log.message.format.versioninter.broker.protocol.version 的值必须是字符串,以防止它们被解释为浮动点数。

  3. 更改 Kafka.spec.kafka.version 以指定新的 Kafka 版本;保留 log.message.format.versioninter.broker.protocol.version,默认为 当前 Kafka 版本。

    注意

    更改 kafka.version 可确保集群中的所有代理都将升级为使用新的代理二进制文件。在此过程中,一些代理使用旧二进制文件,另一些则已升级至新二进制文件。不要更改 inter.broker.protocol.version,确保代理可以在升级过程中继续相互通信。

    例如,如果从 Kafka 2.5.0 升级到 2.6.0:

    apiVersion: kafka.strimzi.io/v1beta1
    kind: Kafka
    spec:
      # ...
      kafka:
        version: 2.6.0 1
        config:
          log.message.format.version: "2.5" 2
          inter.broker.protocol.version: "2.5" 3
          # ...
    1
    Kafka 版本已改为新版本。
    2
    消息格式版本保持不变。
    3
    代理协议版本没有改变。
    警告

    如果新 Kafka 版本的 inter.broker.protocol.version 发生变化,则无法降级 Kafka。broker 协议版本决定了代理存储的持久元数据的模式,包括写入到 __consumer_offset 的消息。降级集群将不会理解消息。

  4. 如果在 Kafka 自定义资源(在 Kafka .spec.kafka.image 中)中定义了 Kafka 集群 的镜像,请将镜像 更新为指向具有新 Kafka 版本的容器镜像。

    请参阅 Kafka 版本和镜像映射

  5. 保存并退出编辑器,然后等待滚动更新完成。

    通过观察 pod 状态转换来检查滚动更新的进度:

    oc get pods my-cluster-kafka-0 -o jsonpath='{.spec.containers[0].image}'

    滚动更新可确保每个 pod 在新版本的 Kafka 中使用代理二进制文件。

  6. 根据您选择的 升级客户端策略,升级所有客户端应用以使用新版本的客户端二进制文件。

    如果需要,将 Kafka Connect 和 MirrorMaker 的 version 属性设置为 Kafka 的新版本:

    1. 对于 Kafka Connect,更新 KafkaConnect.spec.version
    2. 对于 MirrorMaker,更新 KafkaMirrorMaker.spec.version
    3. 对于 MirrorMaker 2.0,更新 KafkaMirrorMaker2.spec.version
  7. 如果配置,更新 Kafka 资源,以使用新的 inter.broker.protocol.version 版本。否则,请转至第 9 步。

    例如,如果升级到 Kafka 2.6.0:

    apiVersion: kafka.strimzi.io/v1beta1
    kind: Kafka
    spec:
      # ...
      kafka:
        version: 2.6.0
        config:
          log.message.format.version: "2.5"
          inter.broker.protocol.version: "2.6"
          # ...
  8. 等待 Cluster Operator 更新集群。
  9. 如果配置,更新 Kafka 资源,以使用新的 log.message.format.version 版本。否则,请转至第 10 步。

    例如,如果升级到 Kafka 2.6.0:

    apiVersion: kafka.strimzi.io/v1beta1
    kind: Kafka
    spec:
      # ...
      kafka:
        version: 2.6.0
        config:
          log.message.format.version: "2.6"
          inter.broker.protocol.version: "2.6"
          # ...
  10. 等待 Cluster Operator 更新集群。

    • Kafka 集群和客户端现在使用新的 Kafka 版本。
    • 代理配置为使用 Kafka 的新版本的 Inter-broker 协议版本和消息格式发送消息。

在 Kafka 升级后,如果需要,您可以:

7.1.3.4. 更新监听程序配置

AMQ Streams 提供 GenericKafkaListener 模式,用于在 Kafka 资源 中配置 Kafka 侦听器。

GenericKafkaListener 取代 KafkaListeners 模式,后者已被弃用。

使用 GenericKafkaListener 模式时,您可以根据需要配置多个监听器,只要它们的名称和端口是唯一的。监听器 配置被定义为数组,但已弃用的格式也受到支持。

对于 OpenShift 集群内的客户端,您可以创建 普通 (无需加密)或 tls 内部 侦听器。

对于 OpenShift 集群外的客户端,您可以创建 外部 侦听器并指定连接机制,可以是 节点端口负载均衡器、 入口路由

KafkaListeners 模式将子属性用于 纯文本tls 和外部 监听程序,以及每个节点的固定端口。升级 Kafka 后,您可以将使用 KafkaListeners 模式配置的监听程序转换为 GenericKafkaListener 模式的格式。

例如,如果您当前在 Kafka 配置中使用以下配置:

旧监听程序配置

listeners:
  plain:
    # ...
  tls:
    # ...
  external:
    type: loadbalancer
    # ...

使用以下方法将监听程序转换为新格式:

新的监听程序配置

listeners:
  #...
  - name: plain
    port: 9092
    type: internal
    tls: false 1
  - name: tls
    port: 9093
    type: internal
    tls: true
  - name: external
    port: 9094
    type: EXTERNAL-LISTENER-TYPE 2
    tls: true

1
现在,所有监听器都需要 TLS 属性。
2
选项: 入口负载均衡器、 节点端口路由

确保使用显示 的确切 名称和端口号。

对于任何其他 配置覆盖 使用旧格式的属性,您需要将它们更新为新格式。

对监听程序 配置 进行了更改:

  • 覆盖configuration 部分合并
  • dnsAnnotations 已重命名为 注解
  • preferredAddressType 被重命名为 preferredNodePortAddressType
  • 地址 已被重命名为 alternativesNames
  • LoadBalancerSourceRanges 和 external TrafficPolicy 移至现在已弃用 的模板中的监听程序配置

例如,这个配置:

旧的其他监听程序配置

listeners:
  external:
    type: loadbalancer
    authentication:
      type: tls
    overrides:
      bootstrap:
        dnsAnnotations:
          #...

更改:

新的附加监听程序配置

listeners:
    #...
  - name: external
    port: 9094
    type:loadbalancer
    tls: true
    authentication:
      type: tls
    configuration:
      bootstrap:
        annotations:
          #...

重要

新监听器配置中显示的名称和端口号 必须 用于向后兼容。使用任何其他值将导致 Kafka 侦听器和 OpenShift 服务重命名。

有关各种侦听器可用的配置选项的更多信息,请参阅 GenericKafkaListener 模式参考

7.1.3.5. 升级消费者和 Kafka Streams 应用程序以合作重新平衡

您可以升级 Kafka 用户和 Kafka Streams 应用程序,以使用 增量合作重新平衡 协议进行分区重新平衡,而不是默认的预先 重新平衡 协议。在 Kafka 2.4.0 中添加了新协议。

消费者将分区分配保持在合作重新平衡状态,并仅在流程结束时予以撤销(如果需要)来实现平衡集群。这可减少使用者组或 Kafka Streams 应用程序的不可用。

注意

升级到增量合作重新平衡协议是可选的。预先重新平衡协议仍然受到支持。

先决条件

步骤

要升级 Kafka 消费者以使用增量合作重新平衡协议:

  1. 将 Kafka client .jar 文件替换为新版本。
  2. 在使用者配置中,将 合作粘滞 附加到 partition.assignment.strategy。例如,如果设定了 范围 策略,请将配置更改为 范围,协作粘性
  3. 依次重新启动组中的每个使用者,等待使用者在每次重启后重新加入组。
  4. 通过从使用者配置中删除先前的 partition.assignment.strategy 来 重新配置组中的每个消费者,仅保留 协作式 策略。
  5. 依次重新启动组中的每个使用者,等待使用者在每次重启后重新加入组。

升级 Kafka Streams 应用程序以使用增量合作重新平衡协议:

  1. 将 Kafka Streams .jar 文件替换为新版本。
  2. 在 Kafka Streams 配置中,将 upgrade.from 配置参数设置为您要从中升级的 Kafka 版本(例如 2.3)。
  3. 依次重新启动每个流处理器(节点)。
  4. 从 Kafka Streams 配置中删除 upgrade.from 配置参数。
  5. 依次重新启动组中的每个消费者。

其他资源

7.2. AMQ Streams 资源升级

kafka.strimzi.io/v1alpha1 API 版本已弃用,用于以下 AMQ Streams 资源:

  • kafka
  • KafkaConnect
  • KafkaConnectS2I
  • KafkaMirrorMaker
  • KafkaTopic
  • KafkaUser

更新这些资源,以使用 kafka.strimzi.io/v1beta1 API 版本。

本节论述了资源的升级步骤。

重要

资源升级 必须在 升级 Cluster Operator 后执行,以便 Cluster Operator 可以了解资源。

如果资源升级不起作用,该怎么办?

如果升级不生效,协调日志中会给出一个警告信息,表示在 apiVersion 更新前无法更新资源。

要触发更新,请对自定义资源进行任意更改,如添加注解。

注解示例:

metadata:
  # ...
  annotations:
    upgrade: "Upgraded to kafka.strimzi.io/v1beta1"

以下流程描述了更新特定资源以使用 kafka.strimzi.io/v1beta1 API 版本的步骤:

7.2.1. 升级 Kafka 资源

先决条件

  • 支持 v1beta1 API 版本的一个 Cluster Operator 已启动且正在运行。

步骤

对部署中的每个 Kafka 资源执行以下步骤。

  1. 更新编辑器中的 Kafka 资源。

    oc edit kafka my-cluster
  2. 替换:

    apiVersion: kafka.strimzi.io/v1alpha1

    使用:

    apiVersion: kafka.strimzi.io/v1beta1
  3. 如果 Kafka 资源有:

    Kafka.spec.topicOperator

    替换为:

    Kafka.spec.entityOperator.topicOperator

    例如,替换:

    spec:
      # ...
      topicOperator: {}

    使用:

    spec:
      # ...
      entityOperator:
        topicOperator: {}
  4. 如果存在,移动:

    Kafka.spec.entityOperator.affinity
    Kafka.spec.entityOperator.tolerations

    改为:

    Kafka.spec.entityOperator.template.pod.affinity
    Kafka.spec.entityOperator.template.pod.tolerations

    例如,移动:

    spec:
      # ...
      entityOperator:
        affinity {}
        tolerations {}

    改为:

    spec:
      # ...
      entityOperator:
        template:
          pod:
            affinity {}
            tolerations {}
  5. 如果存在,移动:

    Kafka.spec.kafka.affinity
    Kafka.spec.kafka.tolerations

    改为:

    Kafka.spec.kafka.template.pod.affinity
    Kafka.spec.kafka.template.pod.tolerations

    例如,移动:

    spec:
      # ...
      kafka:
        affinity {}
        tolerations {}

    改为:

    spec:
      # ...
      kafka:
        template:
          pod:
            affinity {}
            tolerations {}
  6. 如果存在,移动:

    Kafka.spec.zookeeper.affinity
    Kafka.spec.zookeeper.tolerations

    改为:

    Kafka.spec.zookeeper.template.pod.affinity
    Kafka.spec.zookeeper.template.pod.tolerations

    例如,移动:

    spec:
      # ...
      zookeeper:
        affinity {}
        tolerations {}

    改为:

    spec:
      # ...
      zookeeper:
        template:
          pod:
            affinity {}
            tolerations {}
  7. 保存文件,退出编辑器并等待更新的资源得到协调。

7.2.2. 升级 Kafka Connect 资源

先决条件

  • 支持 v1beta1 API 版本的一个 Cluster Operator 已启动且正在运行。

步骤

对部署中的每个 KafkaConnect 资源执行以下步骤。

  1. 更新编辑器中的 KafkaConnect 资源。

    oc edit kafkaconnect my-connect
  2. 替换:

    apiVersion: kafka.strimzi.io/v1alpha1

    使用:

    apiVersion: kafka.strimzi.io/v1beta1
  3. 如果存在,移动:

    KafkaConnect.spec.affinity
    KafkaConnect.spec.tolerations

    改为:

    KafkaConnect.spec.template.pod.affinity
    KafkaConnect.spec.template.pod.tolerations

    例如,移动:

    spec:
      # ...
      affinity {}
      tolerations {}

    改为:

    spec:
      # ...
      template:
        pod:
          affinity {}
          tolerations {}
  4. 保存文件,退出编辑器并等待更新的资源得到协调。

7.2.3. 升级 Kafka Connect S2I 资源

先决条件

  • 支持 v1beta1 API 版本的一个 Cluster Operator 已启动且正在运行。

步骤

对部署中的每个 KafkaConnectS2I 资源执行以下步骤。

  1. 更新编辑器中的 KafkaConnectS2I 资源。

    oc edit kafkaconnects2i my-connect
  2. 替换:

    apiVersion: kafka.strimzi.io/v1alpha1

    使用:

    apiVersion: kafka.strimzi.io/v1beta1
  3. 如果存在,移动:

    KafkaConnectS2I.spec.affinity
    KafkaConnectS2I.spec.tolerations

    改为:

    KafkaConnectS2I.spec.template.pod.affinity
    KafkaConnectS2I.spec.template.pod.tolerations

    例如,移动:

    spec:
      # ...
      affinity {}
      tolerations {}

    改为:

    spec:
      # ...
      template:
        pod:
          affinity {}
          tolerations {}
  4. 保存文件,退出编辑器并等待更新的资源得到协调。

7.2.4. 升级 Kafka MirrorMaker 资源

先决条件

  • 支持 v1beta1 API 版本的一个 Cluster Operator 已启动且正在运行。

步骤

对部署中的每个 KafkaMirrorMaker 资源执行以下步骤。

  1. 更新编辑器 中的 KafkaMirrorMaker 资源。

    oc edit kafkamirrormaker my-connect
  2. 替换:

    apiVersion: kafka.strimzi.io/v1alpha1

    使用:

    apiVersion: kafka.strimzi.io/v1beta1
  3. 如果存在,移动:

    KafkaConnectMirrorMaker.spec.affinity
    KafkaConnectMirrorMaker.spec.tolerations

    改为:

    KafkaConnectMirrorMaker.spec.template.pod.affinity
    KafkaConnectMirrorMaker.spec.template.pod.tolerations

    例如,移动:

    spec:
      # ...
      affinity {}
      tolerations {}

    改为:

    spec:
      # ...
      template:
        pod:
          affinity {}
          tolerations {}
  4. 保存文件,退出编辑器并等待更新的资源得到协调。

7.2.5. 升级 Kafka 主题资源

先决条件

  • 支持 v1beta1 API 版本的一个主题 Operator 已启动且正在运行。

步骤

对部署中的每个 KafkaTopic 资源执行以下步骤。

  1. 更新编辑器中的 KafkaTopic 资源。

    oc edit kafkatopic my-topic
  2. 替换:

    apiVersion: kafka.strimzi.io/v1alpha1

    使用:

    apiVersion: kafka.strimzi.io/v1beta1
  3. 保存文件,退出编辑器并等待更新的资源得到协调。

7.2.6. 升级 Kafka 用户资源

先决条件

  • 支持 v1beta1 API 版本的用户 Operator 已启动且正在运行。

步骤

对部署中的每个 KafkaUser 资源执行以下步骤。

  1. 更新编辑器中的 KafkaUser 资源。

    oc edit kafkauser my-user
  2. 替换:

    apiVersion: kafka.strimzi.io/v1alpha1

    使用:

    apiVersion: kafka.strimzi.io/v1beta1
  3. 保存文件,退出编辑器并等待更新的资源得到协调。

第 8 章 降级 AMQ Streams

如果您在升级到的 AMQ Streams 版本上遇到问题,您可以将安装恢复到之前的版本。

您可以执行降级:

  1. 将 Cluster Operator 恢复到以前的 AMQ Streams 版本。

  2. 将所有 Kafka 代理和客户端应用程序降级到以前的 Kafka 版本。

如果之前版本的 AMQ Streams 不支持您使用的 Kafka 版本,只要附加至消息的日志消息格式版本匹配,您还可以降级 Kafka。

8.1. 将 Cluster Operator 降级为以前的版本

如果您遇到 AMQ Streams 的问题,可以恢复您的安装。

此流程描述了如何将 Cluster Operator 部署降级到以前的版本。

先决条件

步骤

  1. 记录对现有 Cluster Operator 资源所做的任何配置更改(在 /install/cluster-operator 目录中)。任何更改都将被之前版本的 Cluster Operator 覆盖
  2. 恢复自定义资源,以反映您要降级到的 AMQ Streams 版本支持的配置选项。
  3. 更新 Cluster Operator。

    1. 根据 Cluster Operator 运行的命名空间修改之前版本的安装文件。

      在 Linux 中,使用:

      sed -i 's/namespace: .*/namespace: my-cluster-operator-namespace/' install/cluster-operator/*RoleBinding*.yaml

      在 MacOS 中,使用:

      sed -i '' 's/namespace: .*/namespace: my-cluster-operator-namespace/' install/cluster-operator/*RoleBinding*.yaml
    2. 如果在现有 Cluster Operator Deployment 中修改了一个或多个环境变量,请编辑 install/cluster-operator/060-Deployment-strimzi-cluster-operator.yaml 文件来使用这些环境变量。
  4. 当您有更新的配置时,请将其与其它安装资源一起部署:

    oc apply -f install/cluster-operator

    等待滚动更新完成。

  5. 获取 Kafka pod 的镜像以确保降级成功:

    oc get pod my-cluster-kafka-0 -o jsonpath='{.spec.containers[0].image}'

    镜像标签显示新的 AMQ Streams 版本,后跟 Kafka 版本。例如,NEW -STREAMS-VERSION-kafka-CURRENT-KAFKA-VERSION

您的 Cluster Operator 已降级到以前的版本。

8.2. 降级 Kafka

Kafka 版本降级由 Cluster Operator 执行。

8.2.1. 降级的 Kafka 版本兼容性

Kafka 降级依赖于兼容的当前和目标 Kafka 版本,以及记录消息的状态。

如果该版本不支持集群中 曾使用的任何 inter.broker.protocol.version 设置,或者已将消息添加到使用 较新的 log.message.format.version 的消息日志中,则无法恢复到以前的 Kafka 版本。

inter.broker.protocol.version 决定代理用于存储持久元数据的模式,例如写入到 __consumer_offsets 的消息的 schema。如果您降级到不理解之前在集群中使用的 inter.broker.protocol.version 的 Kafka 版本,代理会遇到它无法理解的数据。

如果 Kafka 的目标降级版本有:

  • 当前版本相同的 log.message.format.version,通过对代理执行一次滚动重启来降级 Cluster Operator。
  • 只有正在运行的集群 始终将 log.message.format.version 设置为由降级版本使用的版本,才可 使用不同的 log.message.format.version。这通常是在 log.message.format.version 改变前中止升级过程的情况。在这种情况下,降级需要:

    • 如果两个版本的检查程序协议不同,则代理的两个滚动重启
    • 单个滚动重启(如果相同)

如果新版本曾使用过上一版本不支持log.message.format.version,则无法进行降级,包括当使用了 log.message.format.version 的默认值时。例如,这个资源可以降级为 Kafka 版本 2.5.0,因为 log.message.format.version 尚未更改:

apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
spec:
  # ...
  kafka:
    version: 2.6.0
    config:
      log.message.format.version: "2.5"
      # ...

如果 log.message.format.version 设置为 "2.6" 或没有值(因此 参数使用 2.6.0 代理 2.6 的默认值),则无法进行降级。

8.2.2. 降级 Kafka 代理和客户端应用程序

此流程描述了如何将 AMQ Streams Kafka 集群降级为 Kafka 的较低(以前的)版本,如从 2.6.0 降级到 2.5.0。

先决条件

对于要降级的 Kafka 资源,检查:

  • 重要:Kafka 版本兼容性
  • 支持两个 Kafka 版本的 Cluster Operator 已启动且正在运行。
  • Kafka.spec.kafka.config 不包含 Kafka 版本不支持的选项。
  • Kafka.spec.kafka.config 有一个 log.message.format.versioninter.broker.protocol.version,它被降级到的 Kafka 版本支持。

步骤

  1. 根据需要更新编辑器中的 Kafka 集群配置。

    oc edit kafka my-cluster
  2. 更改 Kafka.spec.kafka.version 以指定上一版本。

    例如,如果从 Kafka 2.6.0 降级到 2.5.0:

    apiVersion: kafka.strimzi.io/v1beta1
    kind: Kafka
    spec:
      # ...
      kafka:
        version: 2.5.0 1
        config:
          log.message.format.version: "2.5" 2
          inter.broker.protocol.version: "2.5" 3
          # ...
    1
    Kafka 版本已改为以前的版本。
    2
    消息格式版本保持不变。
    3
    代理协议版本没有改变。
    注意

    您必须将 log.message.format.versioninter.broker.protocol.version 的值格式化为字符串,以防止将其解释为浮动点数。

  3. 如果 Kafka 版本的镜像与 Cluster Operator 的 STRIMZI_KAFKA_IMAGES 中定义的镜像不同,请更新 Kafka.spec.kafka.image

    请查看 第 7.1.3.1 节 “Kafka 版本和镜像映射”

  4. 保存并退出编辑器,然后等待滚动更新完成。

    检查日志中的更新,或者查看 pod 状态转换:

    oc logs -f CLUSTER-OPERATOR-POD-NAME | grep -E "Kafka version downgrade from [0-9.]+ to [0-9.]+, phase ([0-9]+) of \1 completed"
    oc get pod -w

    检查 Cluster Operator 日志中的 INFO 级别信息:

    Reconciliation #NUM(watch) Kafka(NAMESPACE/NAME): Kafka version downgrade from FROM-VERSION to TO-VERSION, phase 1 of 1 completed
  5. 降级所有客户端应用程序(使用者)以使用先前版本的客户端二进制文件。

    Kafka 集群和客户端现在使用以前的 Kafka 版本。

附录 A. 使用您的订阅

AMQ Streams 通过软件订阅提供。要管理您的订阅,请访问红帽客户门户中的帐户。

访问您的帐户

  1. 转至 access.redhat.com
  2. 如果您还没有帐户,请创建一个帐户。
  3. 登录到您的帐户。

激活订阅

  1. 转至 access.redhat.com
  2. 导航到 My Subscriptions
  3. 导航到 激活订阅 并输入您的 16 位激活号。

下载 Zip 和 Tar 文件

要访问 zip 或 tar 文件,请使用客户门户查找要下载的相关文件。如果您使用 RPM 软件包,则不需要这一步。

  1. 打开浏览器并登录红帽客户门户网站 产品下载页面,网址为 access.redhat.com/downloads
  2. 查找 JBOSS INTEGRATION 类别中 的红帽 AMQ Streams 条目。
  3. 选择所需的 AMQ Streams 产品。此时会打开 Software Downloads 页面。
  4. 单击组件的 Download 链接。

修订了 2022 年 2 月 13 日 15:26:40 +1000