在 OpenShift 上部署和升级 AMQ 流

Red Hat AMQ 2021.Q3

用于 OpenShift Container Platform 上的 AMQ Streams 1.8

摘要

本指南提供有关部署和升级 AMQ 流的说明

使开源包含更多

红帽承诺替换我们的代码、文档和网页属性中存在问题的语言。我们从这四个术语开始: master、slave、blacklist 和 whitelist。这些更改将在即将发行的几个发行本中逐渐实施。如需了解更多详细信息,请参阅 CTO Chris Wright 信息

第 1 章 部署概述

AMQ Streams 简化了在 OpenShift 集群中运行 Apache Kafka 的过程。

本指南提供有关用于部署和升级 AMQ Streams 的所有选项的说明,描述了部署的内容,以及在 OpenShift 集群中运行 Apache Kafka 所需的部署顺序。

除了说明部署步骤外,本指南还提供了部署前和部署后的说明,以准备和验证部署。介绍的其他部署选项包括引入指标的步骤。为 AMQ Streams 和 Kafka 升级提供了升级说明。

AMQ Streams 旨在处理所有类型的 OpenShift 集群,不论分发如何,从公共云和私有云到用于开发的本地部署。

1.1. AMQ 流如何支持 Kafka

AMQ Streams 提供容器镜像和 Operator,以便在 OpenShift 上运行 Kafka。AMQ Streams Operator 是运行 AMQ Streams 的基础。AMQ Streams 提供的 Operator 是专门构建的,具有可有效管理 Kafka 的专业操作知识。

Operator 简化了以下流程:

  • 部署和运行 Kafka 集群
  • 部署和运行 Kafka 组件
  • 配置对 Kafka 的访问
  • 保护对 Kafka 的访问
  • 升级 Kafka
  • 管理代理
  • 创建和管理主题
  • 创建和管理用户

1.2. AMQ Streams Operator

AMQ Streams 支持使用 Operator 的 Kafka 来部署和管理 Kafka 到 OpenShift 的组件和依赖项。

Operator 是一种打包、部署和管理 OpenShift 应用的方法。AMQ Streams Operator 扩展 OpenShift 功能,自动执行与 Kafka 部署相关的常见复杂任务。通过在代码中了解 Kafka 操作,Kafka 管理任务可以简化,无需人工干预。

Operator

AMQ Streams 提供 Operator 来管理在 OpenShift 集群中运行的 Kafka 集群。

Cluster Operator
部署和管理 Apache Kafka 集群、Kafka Connect、Kafka MirrorMaker、Kafka Bridge、Kafka Exporter 和 Entity Operator
实体 Operator
由主题 Operator 和 User Operator 组成
主题 Operator
管理 Kafka 主题
User Operator
管理 Kafka 用户

Cluster Operator 可以与 Kafka 集群同时部署 Topic Operator 和 User Operator 作为 Entity Operator 配置的一部分。

AMQ Streams 架构中的 Operator

Operators within the AMQ Streams architecture

1.2.1. Cluster Operator

AMQ Streams 使用 Cluster Operator 来部署和管理集群:

  • Kafka(包括 ZooKeeper、实体 Operator、Kafka Exporter 和 Cruise Control)
  • Kafka Connect
  • Kafka MirrorMaker
  • Kafka Bridge

自定义资源用于部署集群。

例如,部署 Kafka 集群:

  • OpenShift 集群中创建了带有集群配置的 Kafka 资源。
  • Cluster Operator 根据 Kafka 资源中声明的内容部署对应的 Kafka 集群。

Cluster Operator 也可以部署(通过配置 Kafka 资源):

  • 通过 KafkaTopic 自定义资源提供 operator 风格主题管理的主题 Operator
  • 一个 User Operator,通过 KafkaUser 自定义资源提供 operator 风格的用户管理

部署的 Entity Operator 中的 Topic Operator 和 User Operator 功能。

Cluster Operator 的架构示例

The Cluster Operator creates and deploys Kafka and ZooKeeper clusters

1.2.2. 主题 Operator

Topic Operator 提供了通过 OpenShift 资源管理 Kafka 集群中主题的方法。

Topic Operator 的架构示例

The Topic Operator manages topics for a Kafka cluster via KafkaTopic resources

Topic Operator 的角色是保留一组 KafkaTopic OpenShift 资源,描述与对应的 Kafka 主题同步中的 Kafka 主题。

特别是,如果 KafkaTopic 是:

  • 创建,主题 Operator 会创建该主题
  • 删除的,主题 Operator 会删除该主题
  • 更改,主题 Operator 更新该主题

在另一个方向上工作,如果一个主题是:

  • 在 Kafka 集群中创建,Operator 会创建一个 KafkaTopic
  • 从 Kafka 集群中删除,Operator 会删除 KafkaTopic
  • 在 Kafka 集群中更改,Operator 会更新 KafkaTopic

这可让您声明 KafkaTopic 作为应用程序部署的一部分,Topic Operator 将为您创建该主题。您的应用程序只需要处理从所需主题中产生或使用的内容。

Topic Operator 在主题 存储中维护每个主题 的信息,它与来自 Kafka 主题或 OpenShift KafkaTopic 自定义资源的更新持续同步。应用到本地内存中主题存储的操作的更新会保留在磁盘上的备份主题存储中。如果某个主题被重新配置或重新分配给其他代理,KafkaTopic 将始终保持最新状态。

1.2.3. User Operator

User Operator 通过监视 Kafka 用户描述 Kafka 用户并确保在 Kafka 集群中正确配置了这些用户来管理 Kafka 集群的 Kafka 用户。

例如,如果 KafkaUser 是:

  • 创建,User Operator 创建它描述的用户
  • 删除时,User Operator 会删除它所描述的用户
  • 更改后,User Operator 会更新它所描述的用户

与主题 Operator 不同,User Operator 不会将 Kafka 集群的任何更改与 OpenShift 资源同步。Kafka 主题可以直接由 Kafka 中的应用程序创建,但用户不必与 User Operator 并行直接在 Kafka 集群中管理。

User Operator 允许您将 KafkaUser 资源声明为应用程序部署的一部分。您可以为用户指定身份验证和授权机制。您还可以配置 用户配额 来控制对 Kafka 资源的使用,例如,确保用户不会专利对代理的访问。

创建用户时,会在 Secret 中创建用户凭据。您的应用需要使用用户 及其凭据进行身份验证,并生成或使用消息。

除了管理用于身份验证的凭证外,User Operator 还通过在 KafkaUser 声明中包含用户访问权限描述来管理授权规则。

1.2.4. AMQ Streams Operator 中的功能门

您可以使用功能 门来启用和禁用 Operator 的一些功能

功能门在操作器配置中设置,具有三个成熟度阶段:alpha、beta 或 General Availability(GA)。

如需更多信息,请参阅 功能门

1.3. AMQ Streams 自定义资源

使用 AMQ Streams 将 Kafka 组件部署到 OpenShift 集群可通过应用自定义资源进行高度配置。自定义资源作为自定义资源定义(CRD)添加的 API 实例创建,以扩展 OpenShift 资源。

CRD 用作描述 OpenShift 集群中自定义资源的配置说明,并为部署中使用的每个 Kafka 组件以及用户和主题提供 AMQ Streams。CRD 和自定义资源定义为 YAML 文件。AMQ Streams 发行版提供了示例 YAML 文件。

CRD 还允许 AMQ Streams 资源受益于原生 OpenShift 功能,如 CLI 访问和配置验证。

1.3.1. AMQ Streams 自定义资源示例

CRD 需要在集群中进行一次性安装,以定义用于实例化和管理 AMQ Streams 特定资源的 schema。

通过安装 CRD 在集群中添加新的自定义资源类型后,您可以根据具体规格创建资源实例。

根据集群设置,安装通常需要集群管理员特权。

注意

管理自定义资源的访问权限仅限于 AMQ Streams 管理员。如需更多信息,请参阅 OpenShift 上部署和升级 AMQ 流指南中的指定 AMQ 流 管理员

CRD 在 OpenShift 集群中定义 一种 新型资源,如 kind:Kafka

Kubernetes API 服务器允许基于 kind 创建自定义资源,并且从 CRD 中了解如何在添加到 OpenShift 集群时验证和存储自定义资源。

警告

当 CRD 被删除时,该类型的自定义资源也会被删除。另外,自定义资源创建的资源也会被删除,如 pod 和 statefulsets。

每个 AMQ Streams 特定自定义资源都符合 CRD 为资源类型定义的 schema AMQ Streams 组件的自定义资源具有常见的配置属性,这些属性在 spec 下定义。

要了解 CRD 和自定义资源之间的关系,让我们看一下 Kafka 主题的 CRD 示例。

Kafka 主题 CRD

apiVersion: kafka.strimzi.io/v1beta2
kind: CustomResourceDefinition
metadata: 1
  name: kafkatopics.kafka.strimzi.io
  labels:
    app: strimzi
spec: 2
  group: kafka.strimzi.io
  versions:
    v1beta2
  scope: Namespaced
  names:
    # ...
    singular: kafkatopic
    plural: kafkatopics
    shortNames:
    - kt 3
  additionalPrinterColumns: 4
      # ...
  subresources:
    status: {} 5
  validation: 6
    openAPIV3Schema:
      properties:
        spec:
          type: object
          properties:
            partitions:
              type: integer
              minimum: 1
            replicas:
              type: integer
              minimum: 1
              maximum: 32767
      # ...

1
主题 CRD 的元数据、名称和标识 CRD 的标签。
2
此 CRD 的规格,包括组(域)名称、复数名称和支持的 schema 版本,用于 URL 以访问该主题的 API。其他名称用于标识 CLI 中的实例资源。例如,oc get kafkatopic my-topicoc get kafkatopics
3
可以在 CLI 命令中使用短名称。例如,oc get kt 可用作缩写而不是 oc get kafkatopic
4
在自定义资源上使用 get 命令时显示的信息。
5
CRD 的当前状态,如资源的 schema 引用 中所述。
6
OpenAPIV3Schema 验证提供了用于创建主题自定义资源的验证。例如,一个主题至少需要一个分区和一个副本。
注意

您可以识别 AMQ Streams 安装文件提供的 CRD YAML 文件,因为文件名包含索引号后跟 'Crd'。

以下是 KafkaTopic 自定义资源的对应示例。

Kafka 主题自定义资源

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic 1
metadata:
  name: my-topic
  labels:
    strimzi.io/cluster: my-cluster 2
spec: 3
  partitions: 1
  replicas: 1
  config:
    retention.ms: 7200000
    segment.bytes: 1073741824
status:
  conditions: 4
    lastTransitionTime: "2019-08-20T11:37:00.706Z"
    status: "True"
    type: Ready
  observedGeneration: 1
  / ...

1
kindapiVersion 标识自定义资源是实例的 CRD。
2
个标签,仅适用于 KafkaTopicKafkaUser 资源,定义 Kafka 集群的名称(与 Kafka 资源的名称相同),该标签定义某个主题或用户所属的名称。
3
spec 显示主题的分区和副本数量,以及主题本身的配置参数。在本例中,消息保留周期保留在主题中,并指定日志的片段文件大小。
4
KafkaTopic 资源的状态条件。在 lastTransitionTime 中,类型 条件更改为 Ready

自定义资源可以通过平台 CLI 应用到集群。创建自定义资源时,它将使用与 Kubernetes API 内置资源相同的验证。

创建 KafkaTopic 自定义资源后,Tpic Operator 将获得通知,并在 AMQ Streams 中创建对应的 Kafka 主题。

1.4. AMQ 流安装方法

在 OpenShift 上安装 AMQ Streams 有两种方法:

安装方法描述支持的平台

安装工件(YAML 文件)

AMQ Streams 下载网站下载 amq-streams-x.z-ocp-install-examples.zip 文件。接下来,使用 oc 将 YAML 安装构件部署到 OpenShift 集群。首先,将 Cluster Operator 从 install/cluster-operator 部署到单个命名空间、多个命名空间或所有命名空间。

OpenShift 4.6 和 4.8

OperatorHub

使用 Red Hat Integration - OperatorHub 中的 AMQ Streams Operator 将 AMQ Streams 部署到单个命名空间或所有命名空间中。

OpenShift 4.6 和 4.8

为获得最大的灵活性,请选择安装工件方法。如果要使用 Web 控制台在标准配置中将 AMQ Streams 安装到 OpenShift 4.6 和 4.8,请选择 OperatorHub 方法。OperatorHub 还允许您利用自动更新。

这两种方法都会将 Cluster Operator 安装到 OpenShift 集群。从 Kafka 集群开始,使用相同的 方法来部署其他组件。如果使用安装工件方法,则会提供 YAML 文件示例。如果使用 OperatorHub,AMQ Streams Operator 会提供 Kafka 组件可从 OpenShift Web 控制台安装。

AMQ 流安装工件

AMQ Streams 安装工件包含各种 YAML 文件,可使用 oc 部署到 OpenShift 中以创建自定义资源,包括:

  • 部署
  • 自定义资源定义(CRD)
  • 角色和角色绑定
  • 服务帐户

为 Cluster Operator、Tpic Operator、User Operator 和 Strimzi Admin 角色提供了 YAML 安装文件。

OperatorHub

从 OpenShift 4 开始,Operator Lifecycle Manager(OLM) 可帮助集群管理员安装、更新和管理所有 Operator 以及在集群中运行的关联服务的生命周期。OLM 是 Operator Framework 的一部分,后者是一个开源工具包,旨在以有效、自动化且可扩展的方式管理 Kubernetes 原生应用程序(Operator)。

OperatorHub 是 OpenShift Web 控制台的一部分。集群管理员可以使用它来发现、安装和升级 Operator。Operator 可以从 OperatorHub 拉取(安装在 OpenShift 集群上)到单个命名空间或所有命名空间,并由 OLM 管理。然后,工程团队可以使用 OLM 在开发、测试和生产环境中独立管理软件。

Red Hat Integration - AMQ Streams Operator

Red Hat Integration - AMQ Streams Operator 可从 OperatorHub 安装。安装后,AMQ Streams Operator 将 Cluster Operator 部署到 OpenShift 集群,以及必要的 CRD 和基于角色的访问控制(RBAC)资源。您仍然需要从 OpenShift Web 控制台安装 Kafka 组件。

其它资源

使用安装工件安装 AMQ Streams:

从 OperatorHub 安装 AMQ Streams:

第 2 章 使用 AMQ Streams 部署哪些内容

通过 AMQ Streams 发行版为 OpenShift 部署提供了 Apache Kafka 组件。Kafka 组件通常作为集群运行,以实现可用性。

集成 Kafka 组件的典型部署可能包括:

  • 代理节点的 Kafka 集群
  • 复制 ZooKeeper 实例的 zookeeper 集群
  • Kafka Connect 集群用于外部数据连接
  • Kafka MirrorMaker 集群,在第二个集群中镜像 Kafka 集群
  • kafka Exporter 用于提取额外的 Kafka 指标数据以进行监控
  • Kafka Bridge 向 Kafka 集群发出基于 HTTP 的请求

并非所有组件都是必须的,但您至少需要 Kafka 和 ZooKeeper。有些组件可以在没有 Kafka 的情况下部署,如 MirrorMaker 或 Kafka Connect。

2.1. 部署顺序

部署到 OpenShift 集群所需的顺序如下:

  1. 部署 Cluster Operator 以管理 Kafka 集群
  2. 使用 ZooKeeper 集群部署 Kafka 集群,并在部署中包含 Topic Operator 和 User Operator
  3. (可选)部署:

    • 如果没有在 Kafka 集群中部署它们,则 Topic Operator 和 User Operator 独立
    • Kafka Connect
    • Kafka MirrorMaker
    • Kafka Bridge
    • 用于监控指标的组件

2.2. 其他部署配置选项

本指南中的部署步骤描述了使用 AMQ Streams 提供的示例安装 YAML 文件进行的部署。程序突出显示了任何重要的配置注意事项,但不描述所有可用的配置选项。

您可以使用自定义资源来优化部署。

部署 AMQ Streams 之前,您可能希望查看可用于 Kafka 组件的配置选项。如需有关通过自定义资源进行配置的更多信息,请参阅 OpenShift 上使用 AMQ Streams 中的 部署配置

2.2.1. 保护 Kafka

在部署时,Cluster Operator 会自动为集群中的数据加密和身份验证设置 TLS 证书。

AMQ Streams 提供用于 加密身份验证和 授权 的额外配置选项,如 OpenShift 指南中的使用 AMQ Streams 所述:

2.2.2. 监控您的部署

AMQ Streams 支持额外的部署选项来监控您的部署。

第 3 章 准备 AMQ Streams 部署

本节介绍如何准备 AMQ Streams 部署,描述:

注意

要运行本指南中的命令,您的集群用户必须有权管理基于角色的访问控制(RBAC)和 CRD。

3.1. 部署先决条件

要部署 AMQ Streams,请确保:

  • 提供了一个 OpenShift 4.6 和 4.8 集群。

    AMQ Streams 基于 AMQ Streams Strimzi 0.24.x。

  • 已安装并配置了 oc 命令行工具以连接到正在运行的集群。

3.2. 下载 AMQ Streams 发行版工件

要安装 AMQ Streams,请从 AMQ Streams 下载网站从 amq-streams-<version>-ocp-install-examples.zip 文件中下载 并提取发行工件。

AMQ Streams 发行版工件包括示例 YAML 文件,可帮助您将 AMQ Streams 组件部署到 OpenShift、执行常用操作和配置 Kafka 集群。

使用 oc 从下载的 ZIP 文件的 install/cluster-operator 文件夹部署 Cluster Operator。有关部署和配置 Cluster Operator 的更多信息,请参阅 第 5.1.1 节 “部署 Cluster Operator”

另外,如果要使用 Topic 和 User Operator 的独立安装,带有不由 AMQ Streams Cluster Operator 管理的 Kafka 集群,您可以从 install/topic-operator 和 install/user-operator 文件夹中部署它们。

注意

另外,AMQ Streams 容器镜像也可通过 红帽生态系统目录 提供。但是,我们建议您使用提供的 YAML 文件来部署 AMQ Streams。

3.3. 使用 Kafka Connect S2I 的容器 registry 进行身份验证

您需要使用红帽容器注册表(registry.redhat.io)配置身份验证,然后才能 使用 OpenShift 构建和源至镜像(S2I)创建容器镜像

容器 registry 用于将 AMQ Streams 容器镜像存储在 红帽生态系统目录 上。Catalog 包含支持 S2I 的 Kafka Connect 构建器镜像。OpenShift 构建提取此构建器镜像以及源代码和二进制文件,并使用它来构建新的容器镜像。

注意

只有在使用 Kafka Connect S2I 时,才需要使用红帽容器 registry 进行身份验证。其他 AMQ Streams 组件不需要它。

先决条件

  • 集群管理员对 OpenShift Container Platform 集群的访问权限。
  • 红帽客户门户帐户的登录详情。请参阅 附录 A, 使用您的订阅

流程

  1. 如果需要,以管理员身份登录 OpenShift 集群:

    oc login --user system:admin --token=my-token --server=https://my-cluster.example.com:6443
  2. 打开包含 Kafka Connect S2I 集群的项目:

    oc project CLUSTER-NAME
    注意
  3. 使用您的红帽客户门户网站帐户创建一个 docker-registry secret,将 PULL-SECRET-NAME 替换为要创建的 secret 名称:

    oc create secret docker-registry PULL-SECRET-NAME \
     --docker-server=registry.redhat.io \
     --docker-username=CUSTOMER-PORTAL-USERNAME \
     --docker-password=CUSTOMER-PORTAL-PASSWORD \
     --docker-email=EMAIL-ADDRESS

    您应该看到以下输出:

    secret/PULL-SECRET-NAME created
    重要

    您必须在每个将对 registry.redhat.io 进行身份验证的 OpenShift 项目中创建此 docker-registry 机密。

  4. 将机密链接到您的服务帐户,以使用机密拉取镜像。服务帐户名称必须与 OpenShift 容器集使用的名称匹配。

    oc secrets link SERVICE-ACCOUNT-NAME PULL-SECRET-NAME --for=pull

    例如,使用 default 服务帐户和名为 my-secret 的 secret:

    oc secrets link default my-secret --for=pull
  5. 将 secret 链接到 builder 服务帐户,以使用 secret 推送和拉取构建镜像:

    oc secrets link builder PULL-SECRET-NAME
    注意

    如果您不想使用您的红帽用户名和密码来创建 pull secret,您可以使用 registry 服务帐户创建身份验证令牌。

3.4. 将容器镜像推送到您自己的 registry

红帽生态系统目录中提供了 AMQ Streams 的容器镜像。AMQ Streams 提供的安装 YAML 文件将直接从 红帽生态系统目录中拉取镜像。

如果您无法访问 红帽生态系统目录,或者想要使用自己的容器存储库:

  1. 拉取此处 列出的所有 容器镜像
  2. 将它们推送到您自己的 registry
  3. 更新安装 YAML 文件中的镜像名称
注意

该发行版本支持的每个 Kafka 版本都有一个单独的镜像。

容器镜像命名空间/存储库描述

kafka

  • registry.redhat.io/amq7/amq-streams-kafka-28-rhel7:1.8.0
  • registry.redhat.io/amq7/amq-streams-kafka-27-rhel7:1.8.0

用于运行 Kafka 的 AMQ Streams 镜像,包括:

  • Kafka Broker
  • Kafka Connect / S2I
  • Kafka Mirror Maker
  • ZooKeeper
  • TLS Sidecars

Operator

  • registry.redhat.io/amq7/amq-streams-rhel7-operator:1.8.0

用于运行操作器的 AMQ Streams 镜像:

  • Cluster Operator
  • 主题 Operator
  • User Operator
  • Kafka Initializer

Kafka Bridge

  • registry.redhat.io/amq7/amq-streams-bridge-rhel7:1.8.0

用于运行 AMQ Streams Kafka Bridge 的 AMQ Streams 镜像

3.5. 指定 AMQ Streams 管理员

AMQ Streams 提供用于配置部署的自定义资源。默认情况下,查看、创建、编辑和删除这些资源的权限仅限于 OpenShift 集群管理员。AMQ Streams 提供了两个集群角色,可用于将这些权限分配给其他用户:

  • strimzi-view 允许用户查看和列出 AMQ Streams 资源。
  • strimzi-admin 允许用户创建、编辑或删除 AMQ Streams 资源。

安装这些角色时,它们将这些权限自动聚合(添加)到默认的 OpenShift 集群角色。strimzi-view 聚合到 view 角色,strimzi-admin 聚合到 editadmin 角色。由于聚合,您可能不需要将这些角色分配给已拥有类似权限的用户。

以下流程演示了如何分配 astrim zi-admin 角色,允许非集群管理员管理 AMQ Streams 资源。

系统管理员可以在部署 Cluster Operator 后指定 AMQ Streams 管理员。

先决条件

  • AMQ Streams 自定义资源定义(CRD)和基于角色的访问控制(RBAC)资源已使用 Cluster Operator 部署, 以管理 CRD。

流程

  1. 在 OpenShift 中创建 strimzi-viewstrimzi-admin 集群角色。

    oc create -f install/strimzi-admin
  2. 如果需要,为需要它们的用户分配访问权限的角色。

    oc create clusterrolebinding strimzi-admin --clusterrole=strimzi-admin --user=user1 --user=user2

第 4 章 从 OperatorHub 部署 AMQ Streams

使用 Red Hat Integration - AMQ Streams Operator 部署来自 OperatorHub 的 AMQ Streams。

本节中的步骤显示如何:

4.1. 使用 Red Hat Integration Operator 安装 AMQ Streams Operator

Red Hat Integration Operator 允许您选择并安装管理 Red Hat Integration 组件的 Operator。如果您有多个红帽集成订阅,您可以使用 Red Hat Integration Operator 安装和更新 AMQ Streams Operator 以及所有订阅的红帽集成组件的 Operator。

与 AMQ Streams Operator 一样,您可以使用 Operator Lifecycle Manager(OLM)从 OCP 控制台中的 OperatorHub 在 OpenShift Container Platform(OCP)集群上安装 Red Hat Integration Operator。

其它资源

有关安装和使用 Red Hat Integration Operator 的更多信息,请参阅 在 OpenShift 上安装 Red Hat Integration Operator

4.2. 从 OperatorHub 部署 AMQ Streams Operator

您可以通过从 OperatorHub 安装 AMQ Streams Operator,将 Cluster Operator 部署到 OpenShift 集群。

警告

请确定您使用适当的更新频道。如果您位于 OpenShift 支持的版本中,从默认 稳定 频道安装 AMQ Streams 是安全的。但是,如果您使用不支持的 OpenShift 版本,从 stable 频道安装 AMQ Streams 不安全,特别是在启用自动更新时,因为集群将会获得 OpenShift 发行版本不支持的新组件的自动更新。

先决条件

  • OpenShift 集群中启用了 Red Hat Operator OperatorSource。如果您在 OperatorHub 中看到 Red Hat Operator,则会启用正确的 OperatorSource。如需更多信息,请参阅 Operators 指南。
  • 安装需要具有足够特权的用户来安装来自 OperatorHub 的 Operator。

流程

  1. 在 OpenShift Web 控制台中,点 Operators > OperatorHub
  2. 在流和 消息传递 类别中搜索或浏览 AMQ Streams Operator。
  3. 点击 Red Hat Integration - AMQ Streams Operator 标题,然后在右侧的侧边栏中点 Install
  4. Create Operator Subscription 屏幕上,从以下安装和更新选项中进行选择:

    • 更新频道 :为 AMQ Streams Operator 选择更新频道。

      • (默认) s table 频道包含所有最新的更新和发行版本,包括主版本、次版本和微版本,这些版本被假设经过了良好的测试且稳定。
      • amq-streams-X. x 频道包含主要版本的次版本和微发行版本更新,其中 X 是主要发行版本号。
      • amq-streams-X.Y.x 频道包含次发行版本的微发行版本更新,其中 X 是主要发行版本号,Y 是副版本号。
    • Installation Mode :选择将 AMQ Streams Operator 安装到集群中的所有命名空间(默认选项)或特定命名空间。最好使用命名空间来分隔函数。建议您将特定命名空间专用于 Kafka 集群和其他 AMQ Streams 组件。
    • 批准策略 :默认情况下,Operator Lifecycle Manager(OLM)会自动将 AMQ Streams Operator 升级到最新的 AMQ Streams 版本。(可选)如果您想要手动批准以后的升级,请选择 Manual。如需更多信息,请参阅 OpenShift 文档中的 Operator 指南。
  5. Subscribe; AMQ Streams Operator 已安装到 OpenShift 集群。

    AMQ Streams Operator 将 Cluster Operator、CRD 和基于角色的访问控制(RBAC)资源部署到所选命名空间或所有命名空间中。

  6. Installed Operators 屏幕上,检查安装的进度。当 AMQ Streams Operator 的状态变为 InstallSucceeded 时,就可以使用它。

接下来,您可以使用 AMQ Streams Operator 部署 Kafka 组件,从 Kafka 集群开始。

4.3. 使用 AMQ Streams Operator 部署 Kafka 组件

在 Openshift Container Platform 上安装时,AMQ Streams Operator 使 Kafka 组件可从用户界面安装。

可用于安装的 Kafka 组件:

  • kafka
  • Kafka Connect
  • Kafka Connect Source to Image(S2I)
  • Kafka MirrorMaker
  • Kafka MirrorMaker 2
  • kafka 主题
  • Kafka 用户
  • Kafka Bridge
  • Kafka Connector
  • kafka 重新平衡

先决条件

流程

  1. 导航到 Installed Operators 并点击 Red Hat Integration - AMQ Streams Operator 以显示 Operator 详情 页面。
  2. Provided APIs,点您要安装的 Kafka 组件的 Create Instance

    每个组件的默认配置都封装在 CRD spec 属性中。

  3. (可选)在执行安装前,从 表单YAML 视图配置安装规格。
  4. 点击 Create 以开始安装所选组件。

    等待状态变为 Succeeded

第 5 章 使用安装工件部署 AMQ 流

作为使用 AMQ Streams Operator 部署 AMQ Streams Operator 的替代方案,您可以使用安装工件。为 部署 AMQ Streams 准备了环境,本节显示:

流程假定 OpenShift 集群可用并在运行。

AMQ Streams 基于 AMQ Streams Strimzi 0.24.x。本节论述了在 OpenShift 4.6 和 4.8 中部署 AMQ Streams 的步骤。

注意

要运行本指南中的命令,您的集群用户必须有权管理基于角色的访问控制(RBAC)和 CRD。

5.1. 创建 Kafka 集群

要使用 Cluster Operator 管理 Kafka 集群,您必须将其部署为 Kafka 资源。AMQ Streams 提供了用于执行此操作的示例部署文件。您可以使用这些文件来同时部署 Topic Operator 和 User Operator。

如果您还没有将 Kafka 集群部署为 Kafka 资源,则无法使用 Cluster Operator 管理它。例如,这适用于在 OpenShift 外部运行的 Kafka 集群。但是,您可以部署和使用 Topic Operator 和 User Operator 作为独立组件。

注意

Cluster Operator 可以监视 OpenShift 集群中的一个、多个或所有命名空间。Topic Operator 和 User Operator 在 Kafka 集群部署的单一命名空间中监视 KafkaTopics 和 KafkaUsers

使用 Topic Operator 和 User Operator 部署 Kafka 集群

如果要将 Topic Operator 和 User Operator 与由 AMQ Streams 管理的 Kafka 集群搭配使用,请执行这些部署步骤。

部署独立主题 Operator 和 User Operator

如果要将 Topic Operator 和 User Operator 与 不由 AMQ Streams 管理的 Kafka 集群搭配使用,请执行这些部署步骤。

5.1.1. 部署 Cluster Operator

Cluster Operator 负责在 OpenShift 集群中部署和管理 Apache Kafka 集群。

本节中的步骤显示:

5.1.1.1. 监视 Cluster Operator 部署的选项

当 Cluster Operator 运行时,它开始 监视 Kafka 资源的更新。

您可以选择部署 Cluster Operator 来监视 Kafka 资源:

  • 单个命名空间(包含 Cluster Operator 的同一命名空间)
  • 多个命名空间
  • 所有命名空间
注意

AMQ Streams 提供示例 YAML 文件,以简化部署过程。

Cluster Operator 会监视以下资源的变化:

  • Kafka 集群的 Kafka。
  • Kafka Connect 集群的 Kafka Connect。
  • 用于 Kafka Connect 集群的 KafkaConnectS2I,支持 Source2Image。
  • 用于在 Kafka Connect 集群中创建和管理连接器的 KafkaConnector
  • Kafka MirrorMaker 实例的 KafkaMirror Maker。
  • Kafka Bridge 实例的 KafkaBridge

在 OpenShift 集群中创建其中一个资源时,操作器从该资源获取集群描述,并通过创建必要的 OpenShift 资源(如 StatefulSets、Service 和 ConfigMap)来开始为资源创建新集群。

每次更新 Kafka 资源时,操作器都会对构成该资源的 OpenShift 资源执行相应的更新。

资源可以修补或删除,然后重新创建以便集群反映集群所需的状态。此操作可能会导致滚动更新,可能会导致服务中断。

删除资源时,操作器将取消部署集群并删除所有相关 OpenShift 资源。

5.1.1.2. 部署 Cluster Operator 以监控单个命名空间

此流程演示了如何部署 Cluster Operator 以监控 OpenShift 集群中单一命名空间中的 AMQ Streams 资源。

先决条件

  • 此流程需要使用 OpenShift 用户帐户,该帐户可以创建自定义 ResourceDefinition、 ClusterRole 和 ClusterRoleBinding。在 OpenShift 集群中使用 Role Base Access Control(RBAC)通常意味着只有 system:admin 等 OpenShift 集群管理员才有创建、编辑和删除这些资源的权限。

流程

  1. 编辑 AMQ Streams 安装文件以使用 Cluster Operator 将安装到的命名空间。

    例如,在此流程中,Cluster Operator 被安装到命名空间 my-cluster-operator-namespace 中。

    在 Linux 中,使用:

    sed -i 's/namespace: .*/namespace: my-cluster-operator-namespace/' install/cluster-operator/*RoleBinding*.yaml

    在 MacOS 中,使用:

    sed -i '' 's/namespace: .*/namespace: my-cluster-operator-namespace/' install/cluster-operator/*RoleBinding*.yaml
  2. 部署 Cluster Operator:

    oc create -f install/cluster-operator -n my-cluster-operator-namespace
  3. 验证 Cluster Operator 是否已成功部署:

    oc get deployments

5.1.1.3. 部署 Cluster Operator 以监控多个命名空间

此流程演示了如何部署 Cluster Operator 以在 OpenShift 集群的多个命名空间中监控 AMQ Streams 资源。

先决条件

  • 此流程需要使用 OpenShift 用户帐户,该帐户可以创建自定义 ResourceDefinition、 ClusterRole 和 ClusterRoleBinding。在 OpenShift 集群中使用 Role Base Access Control(RBAC)通常意味着只有 system:admin 等 OpenShift 集群管理员才有创建、编辑和删除这些资源的权限。

流程

  1. 编辑 AMQ Streams 安装文件以使用 Cluster Operator 将安装到的命名空间。

    例如,在此流程中,Cluster Operator 被安装到命名空间 my-cluster-operator-namespace 中。

    在 Linux 中,使用:

    sed -i 's/namespace: .*/namespace: my-cluster-operator-namespace/' install/cluster-operator/*RoleBinding*.yaml

    在 MacOS 中,使用:

    sed -i '' 's/namespace: .*/namespace: my-cluster-operator-namespace/' install/cluster-operator/*RoleBinding*.yaml
  2. 编辑 install/cluster-operator/060-Deployment-strimzi-cluster-operator.yaml 文件,将 Cluster Operator 将监视的所有命名空间的列表添加到 STRIMZI_NAMESPACE 环境变量。

    例如,在此流程中,Cluster Operator 将监视命名空间 watched-namespace-1watched-namespace-2watched-namespace-3

    apiVersion: apps/v1
    kind: Deployment
    spec:
      # ...
      template:
        spec:
          serviceAccountName: strimzi-cluster-operator
          containers:
          - name: strimzi-cluster-operator
            image: registry.redhat.io/amq7/amq-streams-rhel7-operator:1.8.0
            imagePullPolicy: IfNotPresent
            env:
            - name: STRIMZI_NAMESPACE
              value: watched-namespace-1,watched-namespace-2,watched-namespace-3
  3. 对于列出的每个命名空间,安装 RoleBindings

    在本例中,我们将这些命令中的 watched-namespace 替换为上一步中列出的命名空间,为 watched-namespace-1、watched-namespace -2、watched-namespace -3 重复它们:

    oc create -f install/cluster-operator/020-RoleBinding-strimzi-cluster-operator.yaml -n watched-namespace
    oc create -f install/cluster-operator/031-RoleBinding-strimzi-cluster-operator-entity-operator-delegation.yaml -n watched-namespace
  4. 部署 Cluster Operator:

    oc create -f install/cluster-operator -n my-cluster-operator-namespace
  5. 验证 Cluster Operator 是否已成功部署:

    oc get deployments

5.1.1.4. 部署 Cluster Operator 以监视所有命名空间

此流程演示了如何部署 Cluster Operator 以监控 OpenShift 集群中所有命名空间中的 AMQ Streams 资源。

在这个模式下运行时,Cluster Operator 会在创建的任何新命名空间中自动管理集群。

先决条件

  • 此流程需要使用 OpenShift 用户帐户,该帐户可以创建自定义 ResourceDefinition、 ClusterRole 和 ClusterRoleBinding。在 OpenShift 集群中使用 Role Base Access Control(RBAC)通常意味着只有 system:admin 等 OpenShift 集群管理员才有创建、编辑和删除这些资源的权限。

流程

  1. 编辑 AMQ Streams 安装文件以使用 Cluster Operator 将安装到的命名空间。

    例如,在此流程中,Cluster Operator 被安装到命名空间 my-cluster-operator-namespace 中。

    在 Linux 中,使用:

    sed -i 's/namespace: .*/namespace: my-cluster-operator-namespace/' install/cluster-operator/*RoleBinding*.yaml

    在 MacOS 中,使用:

    sed -i '' 's/namespace: .*/namespace: my-cluster-operator-namespace/' install/cluster-operator/*RoleBinding*.yaml
  2. 编辑 install/cluster-operator/060-Deployment-strimzi-cluster-operator.yaml 文件,将 STRIMZI_NAMESPACE 环境变量的值设置为 *

    apiVersion: apps/v1
    kind: Deployment
    spec:
      # ...
      template:
        spec:
          # ...
          serviceAccountName: strimzi-cluster-operator
          containers:
          - name: strimzi-cluster-operator
            image: registry.redhat.io/amq7/amq-streams-rhel7-operator:1.8.0
            imagePullPolicy: IfNotPresent
            env:
            - name: STRIMZI_NAMESPACE
              value: "*"
            # ...
  3. 创建 ClusterRoleBinding, 将所有命名空间的集群范围访问权限授予 Cluster Operator。

    oc create clusterrolebinding strimzi-cluster-operator-namespaced --clusterrole=strimzi-cluster-operator-namespaced --serviceaccount my-cluster-operator-namespace:strimzi-cluster-operator
    oc create clusterrolebinding strimzi-cluster-operator-entity-operator-delegation --clusterrole=strimzi-entity-operator --serviceaccount my-cluster-operator-namespace:strimzi-cluster-operator

    my-cluster-operator-namespace 替换为您要将 Cluster Operator 安装到的命名空间。

  4. 将 Cluster Operator 部署到 OpenShift 集群。

    oc create -f install/cluster-operator -n my-cluster-operator-namespace
  5. 验证 Cluster Operator 是否已成功部署:

    oc get deployments

5.1.2. 部署 Kafka

Apache Kafka 是一个开源分布式发布-订阅消息传递系统,用于容错实时数据源。

本节中的步骤显示:

安装 Kafka 时,AMQ Streams 还会安装 ZooKeeper 集群,并添加必要的配置以将 Kafka 与 ZooKeeper 连接。

5.1.2.1. 部署 Kafka 集群

此流程演示了如何使用 Cluster Operator 将 Kafka 集群部署到 OpenShift 中。

部署使用 YAML 文件提供规格来创建 Kafka 资源。

AMQ Streams 提供了在 example /kafka/ 中部署所需的 YAML 文件示例

kafka-persistent.yaml
使用三个 ZooKeeper 和三个 Kafka 节点部署一个持久集群。
kafka-jbod.yaml
使用三个 ZooKeeper 和三个 Kafka 节点部署持久集群(每个集群使用多个持久性卷)。
kafka-persistent-single.yaml
使用单个 ZooKeeper 节点和单个 Kafka 节点部署持久集群。
kafka-ephemeral.yaml
使用三个 ZooKeeper 和三个 Kafka 节点部署一个临时集群。
kafka-ephemeral-single.yaml
部署具有三个 ZooKeeper 节点和单个 Kafka 节点的临时集群。

在这一流程中,我们使用 临时 和永久 Kafka 集群部署的示例:

临时集群
通常,临时(或临时) Kafka 集群适合用于开发和测试目的,不适用于生产环境。此部署使用 emptyDir 卷来存储代理信息(用于 ZooKeeper)和主题或分区(对于 Kafka)。使用 emptyDir 卷意味着其内容严格与 pod 生命周期相关,并在 pod 停机时被删除。
持久性集群
持久的 Kafka 集群使用 PersistentVolume 来存储 ZooKeeper 和 Kafka 数据。该 PersistentVolume 使用 PersistentVolumeClaim 来获取,使它独立于实际的 PersistentVolume 类型。例如,它可以在 Amazon AWS 部署中使用 Amazon EBS 卷,而不更改 YAML 文件。PersistentVolumeClaim 可以使用 StorageClass 来触发自动卷置备。

YAML 文件示例指定最新支持的 Kafka 版本,以及其支持的日志消息格式版本和broker 协议版本的配置。在 升级 Kafka 时,需要更新这些属性。

示例集群默认命名为 my-cluster。集群名称通过资源名称定义,在部署集群后无法更改。要在部署集群前更改集群名称,请在相关 YAML 文件中编辑 Kafka 资源的 Kafka.metadata.name 属性。

默认集群名称和指定的 Kafka 版本

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    version: 2.8.0
    #...
    config:
      #...
      log.message.format.version: 2.8
      inter.broker.protocol.version: 2.8
  # ...

有关配置 Kafka 资源的更多信息,请参阅 OpenShift 中使用 AMQ Streams 中的 Kafka 集群配置

流程

  1. 创建和部署 临时或 持久 集群。

    对于开发或测试,您可能更喜欢使用临时集群。您可以随时使用持久性存储。

    • 创建和部署 临时 集群:

      oc apply -f examples/kafka/kafka-ephemeral.yaml
    • 创建和部署 持久 集群:

      oc apply -f examples/kafka/kafka-persistent.yaml
  2. 验证 Kafka 集群是否已成功部署:

    oc get deployments

5.1.2.2. 使用 Cluster Operator 部署主题 Operator

此流程描述了如何使用 Cluster Operator 部署 Topic Operator。

您可以将 Kafka 资源的 principal Operator 属性配置为包含 topicOperator。默认情况下,Tpic Operator 监视 Kafka 集群部署命名空间中的 KafkaTopics

如果要将 Topic Operator 与不由 AMQ Streams 管理的 Kafka 集群搭配使用,则必须将 Topic Operator 部署为独立组件

有关配置 实体Operator 和 topicOperator 属性的更多信息,请参阅 使用 AMQ Streams 中的 OpenShift 指南中配置 Entity Operator

流程

  1. 编辑 Kafka 资源的 principal Operator 属性,使其包含 topicOperator:

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    metadata:
      name: my-cluster
    spec:
      #...
      entityOperator:
        topicOperator: {}
        userOperator: {}
  2. 使用 EntityTopicOperatorSpec 模式参考 中所述的属性配置 Topic Operator spec

    如果您希望所有属性都使用其默认值,请使用空对象({})。

  3. 创建或更新资源:

    使用 oc apply:

    oc apply -f <your-file>

5.1.2.3. 使用 Cluster Operator 部署 User Operator

此流程描述了如何使用 Cluster Operator 部署 User Operator。

您可以将 Kafka 资源的 principal Operator 属性配置为包含 userOperator。默认情况下,User Operator 在 Kafka 集群部署的命名空间中监视 KafkaUsers

如果要将 User Operator 与不由 AMQ Streams 管理的 Kafka 集群搭配使用,您必须将 User Operator 部署为独立组件

有关配置 实体Operator 和 userOperator 属性的更多信息,请参阅 使用 AMQ Streams 中的 OpenShift 指南中配置 Entity Operator

流程

  1. 编辑 Kafka 资源的 principal Operator 属性,使其包含 userOperator:

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    metadata:
      name: my-cluster
    spec:
      #...
      entityOperator:
        topicOperator: {}
        userOperator: {}
  2. 使用 OpenShift 指南的使用 AMQ Streams 中的 EntityUserOperatorSpec schema 参考 中描述的属性配置 User Operator spec

    如果您希望所有属性都使用其默认值,请使用空对象({})。

  3. 创建或更新资源:

    oc apply -f <your-file>

5.1.3. AMQ Streams Operator 的替代独立部署选项

您可以执行 Topic Operator 和 User Operator 的独立部署,而不是使用 Cluster Operator 部署这些操作器。如果要将 Topic Operator 和 User Operator 与不由 Cluster Operator 管理的 Kafka 集群搭配使用,请考虑独立部署。

在独立部署中,Kafka 可以在 OpenShift 外部运行。例如,您可能在使用 Kafka 作为受管服务。您可以调整独立操作器的部署配置,使其与 Kafka 集群的地址匹配。

注意

在独立部署中,Tpic Operator 和 User Operator 不会 连接到 AMQ Streams Kafka 集群(即,使用 Cluster Operator 部署的Kafka 集群)。即使建立与集群的连接,Operator 也不会正常工作。

5.1.3.1. 部署独立主题 Operator

此流程演示了如何将 Topic Operator 部署为主题管理的独立组件。您可以将独立 Topic Operator 与不由 Cluster Operator 管理的 Kafka 集群搭配使用。

独立部署可以与任何 Kafka 集群一起操作。

提供了独立的部署文件。配置 05-Deployment-strimzi-topic-operator.yaml 部署文件,以添加使 Topic Operator 连接到 Kafka 集群的环境变量。

先决条件

  • 您正在为要连接的 Topic Operator 运行 Kafka 集群。

流程

  1. 编辑 install/topic-operator/05-Deployment-strimzi-topic-operator.yaml 独立部署文件中的 env 属性。

    standalone Topic Operator 部署配置示例

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: strimzi-topic-operator
      labels:
        app: strimzi
    spec:
      # ...
      template:
        # ...
        spec:
          # ...
          containers:
            - name: strimzi-topic-operator
              # ...
              env:
                - name: STRIMZI_NAMESPACE 1
                  valueFrom:
                    fieldRef:
                      fieldPath: metadata.namespace
                - name: STRIMZI_KAFKA_BOOTSTRAP_SERVERS 2
                  value: my-kafka-bootstrap-address:9092
                - name: STRIMZI_RESOURCE_LABELS 3
                  value: "strimzi.io/cluster=my-cluster"
                - name: STRIMZI_ZOOKEEPER_CONNECT 4
                  value: my-cluster-zookeeper-client:2181
                - name: STRIMZI_ZOOKEEPER_SESSION_TIMEOUT_MS 5
                  value: "18000"
                - name: STRIMZI_FULL_RECONCILIATION_INTERVAL_MS 6
                  value: "120000"
                - name: STRIMZI_TOPIC_METADATA_MAX_ATTEMPTS 7
                  value: "6"
                - name: STRIMZI_LOG_LEVEL 8
                  value: INFO
                - name: STRIMZI_TLS_ENABLED 9
                  value: "false"
                - name: STRIMZI_JAVA_OPTS 10
                  value: "-Xmx=512M -Xms=256M"
                - name: STRIMZI_JAVA_SYSTEM_PROPERTIES 11
                  value: "-Djavax.net.debug=verbose -DpropertyName=value"

    1
    用于监视 KafkaTopic 资源的 Topic Operator 的 OpenShift 命名空间。指定 Kafka 集群的命名空间。
    2
    bootstrap 代理地址的主机和端口对,用于发现并连接到 Kafka 集群中的所有代理。使用逗号分隔列表来指定两个或三个代理地址(如果服务器停机)。
    3
    用于标识由 Topic Operator 管理的 KafkaTopic 资源的标签选择器。
    4
    用于连接 ZooKeeper 集群的主机和端口对。这必须与您的 Kafka 集群使用的 ZooKeeper 集群相同。
    5
    ZooKeeper 会话超时,以毫秒为单位。默认值为 18000( 18 秒)。
    6
    定期协调之间的间隔,以毫秒为单位。默认值为 120000( 2 分钟)。
    7
    从 Kafka 获取主题元数据的尝试数量。每次尝试之间的时间都定义为指数回退。如果因为分区或副本数量而需要更多时间,请考虑增加这个值。默认值为 6 个 尝试。
    8
    用于打印日志消息的级别。您可以将级别设置为 ERROR、WARN INGINFODEBUGTRACE
    9
    对与 Kafka 代理的加密通信启用 TLS 支持。
    10
    (可选)运行 Topic Operator 的 JVM 使用的 Java 选项。
    11
    (可选)为 Topic Operator 设置的调试(-D)选项。
  2. 如果您使用 STRIMZI_TLS_ENABLED 环境变量启用了 TLS,请指定用于验证与 Kafka 集群连接的密钥存储和信任存储。

    TLS 配置示例

    # ....
    env:
      - name: STRIMZI_TRUSTSTORE_LOCATION 1
        value: "/path/to/truststore.p12"
      - name: STRIMZI_TRUSTSTORE_PASSWORD 2
        value: "TRUSTSTORE-PASSWORD"
      - name: STRIMZI_KEYSTORE_LOCATION 3
        value: "/path/to/keystore.p12"
      - name: STRIMZI_KEYSTORE_PASSWORD 4
        value: "KEYSTORE-PASSWORD"
    # ..."

    1
    truststore 包含证书颁发机构的公钥(ca.crt)值,用于为 TLS 客户端身份验证签署新用户证书。
    2
    访问信任存储的密码。
    3
    密钥存储包含证书颁发机构的私钥(ca.key),用于为 TLS 客户端身份验证签名新用户证书。
    4
    用于访问密钥存储的密码。
  3. 部署独立主题 Operator。

    oc create -f install/topic-operator
  4. 验证独立 Topic Operator 是否已成功部署。

    oc describe deployment strimzi-topic-operator

    Replicas 条目显示 1 可用 时,会部署独立 Topic Operator。

    注意

    如果您与 OpenShift 集群的连接较慢,并且之前没有下载 Topic Operator 镜像,则部署可能会出现延迟。

5.1.3.2. 部署 standalone User Operator

此流程演示了如何将 User Operator 部署为用户管理的独立组件。您可以将独立 User Operator 与不由 Cluster Operator 管理的 Kafka 集群搭配使用。

独立部署可以与任何 Kafka 集群一起操作。

提供了独立的部署文件。编辑 05-Deployment-strimzi-user-operator.yaml 部署文件,以添加使 User Operator 连接到 Kafka 集群的环境变量。

先决条件

  • 您为 User Operator 运行 Kafka 集群以连接。

流程

  1. 编辑 install/user-operator/05-Deployment-strimzi-user-operator.yaml 独立部署文件中的以下 env 属性。

    standalone User Operator 部署配置示例

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: strimzi-user-operator
      labels:
        app: strimzi
    spec:
      # ...
      template:
        # ...
        spec:
          # ...
          containers:
            - name: strimzi-user-operator
              # ...
              env:
                - name: STRIMZI_NAMESPACE 1
                  valueFrom:
                    fieldRef:
                      fieldPath: metadata.namespace
                - name: STRIMZI_KAFKA_BOOTSTRAP_SERVERS 2
                  value: my-kafka-bootstrap-address:9092
                - name: STRIMZI_CA_CERT_NAME 3
                  value: my-cluster-clients-ca-cert
                - name: STRIMZI_CA_KEY_NAME 4
                  value: my-cluster-clients-ca
                - name: STRIMZI_ZOOKEEPER_CONNECT 5
                  value: my-cluster-zookeeper-client:2181
                - name: STRIMZI_LABELS 6
                  value: "strimzi.io/cluster=my-cluster"
                - name: STRIMZI_FULL_RECONCILIATION_INTERVAL_MS 7
                  value: "120000"
                - name: STRIMZI_ZOOKEEPER_CONNECT 8
                  value: my-cluster-zookeeper-client:2181
                - name: STRIMZI_ZOOKEEPER_SESSION_TIMEOUT_MS 9
                  value: "18000"
                - name: STRIMZI_LOG_LEVEL 10
                  value: INFO
                - name: STRIMZI_GC_LOG_ENABLED 11
                  value: "true"
                - name: STRIMZI_CA_VALIDITY 12
                  value: "365"
                - name: STRIMZI_CA_RENEWAL 13
                  value: "30"
                - name: STRIMZI_JAVA_OPTS 14
                  value: "-Xmx=512M -Xms=256M"
                - name: STRIMZI_JAVA_SYSTEM_PROPERTIES 15
                  value: "-Djavax.net.debug=verbose -DpropertyName=value"

    1
    用于监视 KafkaUser 资源的 User Operator 的 OpenShift 命名空间。只能指定一个命名空间。
    2
    bootstrap 代理地址的主机和端口对,用于发现并连接到 Kafka 集群中的所有代理。使用逗号分隔列表来指定两个或三个代理地址(如果服务器停机)。
    3
    包含证书颁发机构的公钥(ca.crt)值的 OpenShift Secret,为 TLS 客户端身份验证签署新用户证书。
    4
    包含证书颁发机构的私钥(ca.key)值的 OpenShift Secret,为 TLS 客户端身份验证签署新用户证书。
    5
    用于连接 ZooKeeper 集群的主机和端口对。这必须与您的 Kafka 集群使用的 ZooKeeper 集群相同。
    6
    用于标识由 User Operator 管理的 KafkaUser 资源的标签选择器。
    7
    定期协调之间的间隔,以毫秒为单位。默认值为 120000( 2 分钟)。
    8
    用于连接 ZooKeeper 集群的主机和端口对。这必须与您的 Kafka 集群使用的 ZooKeeper 集群相同。
    9
    ZooKeeper 会话超时,以毫秒为单位。默认值为 18000( 18 秒)。
    10
    用于打印日志消息的级别。您可以将级别设置为 ERROR、WARN INGINFODEBUGTRACE
    11
    启用垃圾回收(GC)日志。默认值为 true
    12
    证书颁发机构的有效期限。默认值为 365 天。
    13
    证书授权机构的续订期限。续订期的测量从当前证书到期之日后方进行。默认是在旧证书过期前启动证书续订的 30 天。
    14
    (可选)运行 User Operator 的 JVM 使用的 Java 选项
    15
    (可选)为 User Operator 设置的调试(-D)选项
  2. 如果您使用 TLS 连接到 Kafka 集群,请指定用于验证连接的 secret。否则,请转到下一步。

    TLS 配置示例

    # ....
    env:
      - name: STRIMZI_CLUSTER_CA_CERT_SECRET_NAME 1
        value: my-cluster-cluster-cert
      - name: STRIMZI_EO_KEY_SECRET_NAME 2
        value: my-cluster-cluster-ca
    # ..."

    1
    包含为 TLS 客户端身份验证签署 Kafka 代理证书的证书颁发机构的公钥(ca.crt)值的 OpenShift Secret
    2
    包含密钥存储(实体-operator_.p12)的 OpenShift Secret,以及用于对 Kafka 集群进行 TLS 身份验证的证书。Secret 还必须包含用于访问密钥存储的密码(hostname-operator_.password)。
  3. 部署 standalone User Operator。

    oc create -f install/user-operator
  4. 验证 standalone User Operator 是否已成功部署。

    oc describe deployment strimzi-user-operator

    Replicas 条目显示 1 可用 时,部署 standalone User Operator。

    注意

    如果您与 OpenShift 集群的连接较慢并且之前没有下载用户 Operator 镜像,则部署时可能会出现延迟。

5.2. 部署 Kafka Connect

Kafka Connect 是一个在 Apache Kafka 和外部系统间流传输数据的工具。

在 AMQ Streams 中,Kafka Connect 被部署为分布式模式。Kafka Connect 也可以以独立模式工作,但 AMQ Streams 不支持此项。

Kafka Connect 使用 连接器 的概念提供了一个框架,用于将大量数据移入 Kafka 集群或移出 Kafka 集群,同时保持可扩展性和可靠性。

Kafka Connect 通常用于将 Kafka 与外部数据库和存储及消息传递系统集成。

本节中的步骤显示如何:

注意

术语 连接器 可以互换使用,以表示在 Kafka Connect 集群中运行的连接器实例或连接器类。在本指南中,当从上下文中明确含义时,将使用 连接器

5.2.1. 将 Kafka Connect 部署到 OpenShift 集群

此流程演示了如何使用 Cluster Operator 将 Kafka Connect 集群部署到 OpenShift 集群。

Kafka Connect 集群作为 Deployment 实施,具有可配置的节点(也称为 worker ,将连接器工作负载作为 任务 来分布,以便消息流高度可扩展且可靠。

部署使用 YAML 文件提供创建 KafkaConnect 资源规格。

在这一流程中,我们使用 AMQ Streams 提供的示例文件:

  • examples/connect/kafka-connect.yaml

有关配置 KafkaConnect 资源(或使用 Source-to-Image( S2I)支持的 KafkaConnect S2I 资源)的详情,请参考 使用 AMQ Streams 中的 OpenShift 指南中的 Kafka Connect 集群配置

流程

  1. 将 Kafka Connect 部署到您的 OpenShift 集群。使用 example /connect/kafka-connect.yaml 文件来部署 Kafka Connect。

    oc apply -f examples/connect/kafka-connect.yaml
  2. 验证 Kafka Connect 是否已成功部署:

    oc get deployments

5.2.2. 多个实例的 Kafka Connect 配置

如果您正在运行多个 Kafka Connect 实例,您必须更改以下配置 属性 的默认配置:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect
spec:
  # ...
  config:
    group.id: connect-cluster 1
    offset.storage.topic: connect-cluster-offsets 2
    config.storage.topic: connect-cluster-configs 3
    status.storage.topic: connect-cluster-status  4
    # ...
# ...
1
属于该实例的 Kafka Connect 集群组。
2
存储连接器偏移的 Kafka 主题。
3
存储连接器和任务状态配置的 Kafka 主题。
4
存储连接器和任务状态更新的 Kafka 主题。
注意

对于具有同一 group.id 的所有 Kafka Connect 实例,三个主题的值必须相同。

除非更改默认设置,否则每个连接到同一 Kafka 集群的 Kafka Connect 实例都使用相同的值部署。实际上,所有实例都合并在一起,在集群中运行并使用相同的主题。

如果多个 Kafka Connect 集群尝试使用相同的主题,Kafka Connect 将无法按预期工作并生成错误。

如果要运行多个 Kafka Connect 实例,请为每个实例更改这些属性的值。

5.2.3. 使用连接器插件扩展 Kafka Connect

Kafka Connect 的 AMQ Streams 容器镜像包括两个内置文件连接器,用于将基于文件的数据移入或移出 Kafka 集群。

表 5.1. 文件连接器

文件连接器描述

FileStreamSourceConnector

从文件(源)传输数据到 Kafka 集群。

FileStreamSinkConnector

将数据从 Kafka 集群传输到文件(sink)。

本节中的步骤显示如何通过以下方法为连接器镜像添加您自己的连接器类:

5.2.3.1. 使用 AMQ Streams 自动创建新容器镜像

此流程演示了如何配置 Kafka Connect,以便 AMQ Streams 自动使用额外的连接器构建新容器镜像。您可以使用 KafkaConnect 自定义资源的 .spec.build.plugins 属性定义连接器插件。AMQ 流将自动下载并将连接器插件添加到新容器镜像中。容器被推送到在 .spec.build.output 中指定的容器存储库中,并在 Kafka Connect 部署中自动使用。

先决条件

您需要提供自己的容器 registry,从中推送、存储和从中拉取镜像。AMQ Streams 支持私有容器 registry,以及 QuayDocker Hub 等公共 registry。

流程

  1. 通过指定容器 registry in .spec.build.output 和额外的连接器 in. spec.build.plugins 来配置 KafkaConnect 自定义资源:

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnect
    metadata:
      name: my-connect-cluster
    spec: 1
      #...
      build:
        output: 2
          type: docker
          image: my-registry.io/my-org/my-connect-cluster:latest
          pushSecret: my-registry-credentials
        plugins: 3
          - name: debezium-postgres-connector
            artifacts:
              - type: tgz
                url: https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/1.3.1.Final/debezium-connector-postgres-1.3.1.Final-plugin.tar.gz
                sha512sum: 962a12151bdf9a5a30627eebac739955a4fd95a08d373b86bdcea2b4d0c27dd6e1edd5cb548045e115e33a9e69b1b2a352bee24df035a0447cb820077af00c03
          - name: camel-telegram
            artifacts:
              - type: tgz
                url: https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-telegram-kafka-connector/0.7.0/camel-telegram-kafka-connector-0.7.0-package.tar.gz
                sha512sum: a9b1ac63e3284bea7836d7d24d84208c49cdf5600070e6bd1535de654f6920b74ad950d51733e8020bf4187870699819f54ef5859c7846ee4081507f48873479
      #...
    1
    2
    (必需)配置推送新镜像的容器注册表。
    3
    (必需)要添加到新容器镜像的连接器插件及其工件列表。每个插件必须至少配置一个 工件
  2. 创建或更新资源:

    $ oc apply -f KAFKA-CONNECT-CONFIG-FILE
  3. 等待新容器镜像构建,并且部署 Kafka Connect 集群。
  4. 使用 Kafka Connect REST API 或 KafkaConnector 自定义资源使用您添加的连接器插件。

其它资源

如需了解更多相关信息,请参阅 OpenShift 指南中的使用 AMQ Streams

5.2.3.2. 从 Kafka Connect 基础镜像创建 Docker 镜像

此流程演示了如何创建自定义镜像并将其添加到 /opt/kafka/plugins 目录中。

您可以使用 Red Hat Ecosystem Catalog 上的 Kafka 容器镜像作为基础镜像,以通过额外的连接器插件创建您自己的自定义镜像。

在启动时,Kafka Connect 的 AMQ Streams 版本加载 /opt/kafka/plugins 目录中包含的任何第三方连接器插件。

流程

  1. 使用 registry.redhat.io/amq7/amq-streams-kafka-28-rhel7:1.8.0 作为基础镜像创建新的 Dockerfile

    FROM registry.redhat.io/amq7/amq-streams-kafka-28-rhel7:1.8.0
    USER root:root
    COPY ./my-plugins/ /opt/kafka/plugins/
    USER 1001

    插件文件示例

    $ tree ./my-plugins/
    ./my-plugins/
    ├── debezium-connector-mongodb
    │   ├── bson-3.4.2.jar
    │   ├── CHANGELOG.md
    │   ├── CONTRIBUTE.md
    │   ├── COPYRIGHT.txt
    │   ├── debezium-connector-mongodb-0.7.1.jar
    │   ├── debezium-core-0.7.1.jar
    │   ├── LICENSE.txt
    │   ├── mongodb-driver-3.4.2.jar
    │   ├── mongodb-driver-core-3.4.2.jar
    │   └── README.md
    ├── debezium-connector-mysql
    │   ├── CHANGELOG.md
    │   ├── CONTRIBUTE.md
    │   ├── COPYRIGHT.txt
    │   ├── debezium-connector-mysql-0.7.1.jar
    │   ├── debezium-core-0.7.1.jar
    │   ├── LICENSE.txt
    │   ├── mysql-binlog-connector-java-0.13.0.jar
    │   ├── mysql-connector-java-5.1.40.jar
    │   ├── README.md
    │   └── wkb-1.0.2.jar
    └── debezium-connector-postgres
        ├── CHANGELOG.md
        ├── CONTRIBUTE.md
        ├── COPYRIGHT.txt
        ├── debezium-connector-postgres-0.7.1.jar
        ├── debezium-core-0.7.1.jar
        ├── LICENSE.txt
        ├── postgresql-42.0.0.jar
        ├── protobuf-java-2.6.1.jar
        └── README.md

  2. 构建容器镜像。
  3. 将自定义镜像推送到容器 registry。
  4. 指向新容器映像。

    您可以:

    • 编辑 KafkaConnect.spec.image 属性,即 KafkaConnect 自定义资源。

      如果设置,此属性会覆盖 Cluster Operator 中的 STRIMZI_KAFKA_CONNECT_IMAGES 变量。

      apiVersion: kafka.strimzi.io/v1beta2
      kind: KafkaConnect
      metadata:
        name: my-connect-cluster
      spec: 1
        #...
        image: my-new-container-image 2
        config: 3
          #...
      1
      2
      容器集的 docker 镜像。
      3
      配置 Kafka Connect worker (而非连接器)。

      或者

    • install/cluster-operator/060-Deployment-strimzi-cluster-operator.yaml 文件中编辑 STRIMZI_KAFKA_CONNECT_IMAGES 变量以指向新容器镜像,然后重新安装 Cluster Operator。

其它资源

如需了解更多相关信息,请参阅 OpenShift 指南中的使用 AMQ Streams

5.2.3.3. 使用 OpenShift 构建和源至镜像创建容器镜像

此流程演示了如何使用 OpenShift 构建源至镜像(S2I) 框架来创建新的容器镜像。

OpenShift 构建获取支持 S2I 的构建器镜像,以及用户提供的源代码和二进制文件,并使用它们来构建新的容器镜像。构建之后,容器镜像将存储在 OpenShift 的本地容器镜像存储库中,并可在部署中使用。

红帽生态系统目录 上提供了支持 S2I 的 Kafka Connect 构建器镜像,作为 registry.redhat.io/amq7/amq-streams-kafka-28-rhel7:1.8.0 镜像的一部分。此 S2I 镜像提取您的二进制文件(带有插件和连接器),并将它们存储在 /tmp/kafka-plugins/s2i 目录中。它从这个目录中创建一个新的 Kafka Connect 镜像,然后可与 Kafka Connect 部署一起使用。当使用增强的镜像启动时,Kafka Connect 会从 /tmp/kafka-plugins/s2i 目录中加载任何第三方插件。

重要

随着 KafkaConnect 资源引入 构建配置,AMQ Streams 现在可以使用数据连接所需的连接器插件自动构建容器镜像。因此,使用 Source-to-Image(S2I)进行 Kafka Connect 的支持已弃用,并将在 AMQ Streams 1.8 后被删除。要准备此更改,您可以将 Kafka Connect S2I 实例迁移到 Kafka Connect 实例

流程

  1. 在命令行中,使用 oc apply 命令创建和部署 Kafka Connect S2I 集群:

    oc apply -f examples/connect/kafka-connect-s2i.yaml
  2. 使用 Kafka Connect 插件创建目录:

    $ tree ./my-plugins/
    ./my-plugins/
    ├── debezium-connector-mongodb
    │   ├── bson-3.4.2.jar
    │   ├── CHANGELOG.md
    │   ├── CONTRIBUTE.md
    │   ├── COPYRIGHT.txt
    │   ├── debezium-connector-mongodb-0.7.1.jar
    │   ├── debezium-core-0.7.1.jar
    │   ├── LICENSE.txt
    │   ├── mongodb-driver-3.4.2.jar
    │   ├── mongodb-driver-core-3.4.2.jar
    │   └── README.md
    ├── debezium-connector-mysql
    │   ├── CHANGELOG.md
    │   ├── CONTRIBUTE.md
    │   ├── COPYRIGHT.txt
    │   ├── debezium-connector-mysql-0.7.1.jar
    │   ├── debezium-core-0.7.1.jar
    │   ├── LICENSE.txt
    │   ├── mysql-binlog-connector-java-0.13.0.jar
    │   ├── mysql-connector-java-5.1.40.jar
    │   ├── README.md
    │   └── wkb-1.0.2.jar
    └── debezium-connector-postgres
        ├── CHANGELOG.md
        ├── CONTRIBUTE.md
        ├── COPYRIGHT.txt
        ├── debezium-connector-postgres-0.7.1.jar
        ├── debezium-core-0.7.1.jar
        ├── LICENSE.txt
        ├── postgresql-42.0.0.jar
        ├── protobuf-java-2.6.1.jar
        └── README.md
  3. 使用 oc start-build 命令,使用准备的目录启动镜像的新构建:

    oc start-build my-connect-cluster-connect --from-dir ./my-plugins/
    注意

    构建的名称与部署的 Kafka Connect 集群的名称相同。

  4. 构建完成后,Kafka Connect 部署会自动使用新镜像。

5.2.4. 创建和管理连接器

当您为您的 connector 插件创建容器镜像时,需要在 Kafka Connect 集群中创建连接器实例。然后,您可以配置、监控和管理正在运行的连接器实例。

连接器是特定 连接器类 的实例,它知道如何就消息与相关的外部系统通信。连接器可用于许多外部系统,或者您可以自行创建。

您可以创建连接器的 接收器 类型。

源连接器
源连接器是从外部系统获取数据并将其反馈到 Kafka 作为消息的运行时实体。
sink 连接器
sink 连接器是一个运行时实体,它从 Kafka 主题获取信息并将其反馈到外部系统。

AMQ Streams 为创建和管理连接器提供了两个 API:

  • KafkaConnector 资源(称为 KafkaConnectors)
  • Kafka Connect REST API

使用 API,您可以:

  • 检查连接器实例的状态
  • 重新配置正在运行的连接器
  • 增加或减少连接器实例的连接器任务数量
  • 重启连接器
  • 重启连接器任务,包括失败的任务
  • 暂停连接器实例
  • 恢复之前暂停的连接器实例
  • 删除连接器实例

5.2.4.1. KafkaConnector 资源

KafkaConnectors 允许您以 OpenShift 原生的方式为 Kafka Connect 创建和管理连接器实例,因此不需要 cURL 等 HTTP 客户端。与其他 Kafka 资源一样,您可以在部署至 OpenShift 集群的 KafkaConnector YAML 文件中声明连接器所需的状态,以创建连接器实例。KafkaConnector 资源必须部署到与其链接的 Kafka Connect 集群相同的命名空间中。

您可以通过更新对应的 KafkaConnector 资源并应用更新来管理正在运行的连接器实例。注释用于手动重启连接器实例和连接器任务。您可以通过删除对应的 KafkaConnector 来删除连接器。

为确保与早期版本的 AMQ Streams 兼容,KafkaConnectors 默认被禁用。要为 Kafka Connect 集群启用它们,您必须在 KafkaConnect 资源上使用注解。具体步骤,请参阅 OpenShift 中使用 AMQ Streams 中的 配置 Kafka Connect

启用 KafkaConnectors 后,Cluster Operator 开始监视它们。它更新正在运行的连接器实例的配置,以匹配其 KafkaConnectors 中定义的配置。

AMQ Streams 包括 一个名为 example /connect/source-connector.yaml 的 KafkaConnect or 示例。您可以使用此示例创建和管理 FileStreamSourceConnectorFileStreamSinkConnector,如 第 5.2.5 节 “部署示例 KafkaConnector 资源” 所述。

5.2.4.2. Kafka Connect REST API 的可用性

Kafka Connect REST API 在端口 8083 上作为 <connect-cluster-name>-connect-api 服务可用。

如果启用了 KafkaConnectors,Cluster Operator 将恢复直接使用 Kafka Connect REST API 进行手动更改。

Apache Kafka 文档中的 描述了 REST API 支持的操作。

5.2.5. 部署示例 KafkaConnector 资源

AMQ Streams 在 example /connect/source-connector.yaml 中包含一个 KafkaConnect or 示例。这会创建一个基本的 FileStreamSourceConnector 实例,它将 Kafka 许可证文件的每一行(一个文件源示例)发送到单个 Kafka 主题。

这个步骤描述了如何创建:

  • 从 Kafka 许可证文件(源)读取数据的 FileStreamSourceConnector,并将数据作为消息写入 Kafka 主题。
  • 从 Kafka 主题读取信息并将消息写入临时文件( sink)的 FileStreamSinkConnector
注意

在生产环境中,您要准备包含所需 Kafka Connect 连接器的容器镜像,如 第 5.2.3 节 “使用连接器插件扩展 Kafka Connect” 所述。

提供 FileStreamSourceConnectorFileStreamSinkConnector 作为示例。如此处所述,在容器中运行这些连接器不太可能适合生产用例。

先决条件

流程

  1. 编辑 example /connect/source-connector.yaml 文件:

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnector
    metadata:
      name: my-source-connector 1
      labels:
        strimzi.io/cluster: my-connect-cluster 2
    spec:
      class: org.apache.kafka.connect.file.FileStreamSourceConnector 3
      tasksMax: 2 4
      config: 5
        file: "/opt/kafka/LICENSE" 6
        topic: my-topic 7
        # ...
    1
    KafkaConnector 资源的名称,用作连接器的名称。使用对 OpenShift 资源有效的任何名称。
    2
    用于在其中创建连接器实例的 Kafka Connect 集群的名称。连接器必须部署到与其链接的 Kafka Connect 集群相同的命名空间。
    3
    连接器类的全名或别名.这应该存在于由 Kafka Connect 集群使用的镜像中。
    4
    连接器可以创建的 Kafka Connect 任务 的最大数量。
    5
    连接器配置,作为键值对.
    6
    这个示例源连接器配置从 /opt/kafka/LICENSE 文件中读取数据。
    7
    将源数据发布到的 Kafka 主题。
  2. 在 OpenShift 集群中创建源 KafkaConnector

    oc apply -f examples/connect/source-connector.yaml
  3. 创建一个 示例/connect/sink-connector.yaml 文件:

    touch examples/connect/sink-connector.yaml
  4. 将以下 YAML 粘贴到 sink-connector.yaml 文件中:

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnector
    metadata:
      name: my-sink-connector
      labels:
        strimzi.io/cluster: my-connect
    spec:
      class: org.apache.kafka.connect.file.FileStreamSinkConnector 1
      tasksMax: 2
      config: 2
        file: "/tmp/my-file" 3
        topics: my-topic 4
    1
    连接器类的全名或别名.这应该存在于由 Kafka Connect 集群使用的镜像中。
    2
    连接器配置,作为键值对.
    3
    发布源数据的临时文件。
    4
    从中读取源数据的 Kafka 主题。
  5. 在 OpenShift 集群中创建 sink KafkaConnector

    oc apply -f examples/connect/sink-connector.yaml
  6. 检查是否创建了连接器资源:

    oc get kctr --selector strimzi.io/cluster=MY-CONNECT-CLUSTER -o name
    
    my-source-connector
    my-sink-connector

    MY-CONNECT-CLUSTER 替换为您的 Kafka 连接集群。

  7. 在容器中,执行 kafka-console-consumer.sh 读取由源连接器写入主题的消息:

    oc exec MY-CLUSTER-kafka-0 -i -t -- bin/kafka-console-consumer.sh --bootstrap-server MY-CLUSTER-kafka-bootstrap.NAMESPACE.svc:9092 --topic my-topic --from-beginning
源和接收器连接器配置选项

连接器配置在 KafkaConnector 资源的 spec.config 属性中定义。

FileStreamSourceConnectorFileStreamSinkConnector 类支持与 Kafka Connect REST API 相同的配置选项。其他连接器支持不同的配置选项。

表 5.2. FileStreamSource 连接器类的配置选项

名称类型默认值描述

file

字符串

null

可将消息写入到的源文件。如果没有指定,则使用标准输入。

主题

list

null

发布数据的 Kafka 主题。

表 5.3. FileStreamSinkConnector 类的配置选项

名称类型默认值描述

file

字符串

null

要写入消息的目标文件。如果没有指定,则使用标准输出。

主题

list

null

一个或多个 Kafka 主题,从中读取数据。

topics.regex

字符串

null

与一个或多个 Kafka 主题匹配以从中读取数据的正则表达式。

5.2.6. 执行 Kafka 连接器重启

此流程描述了如何使用 OpenShift 注解手动触发 Kafka 连接器重启。

先决条件

  • Cluster Operator 正在运行。

流程

  1. 查找控制您要重启的 Kafka 连接器的 KafkaConnector 自定义资源的名称:

    oc get KafkaConnector
  2. 要重启连接器,请注解 OpenShift 中的 KafkaConnector 资源。例如,使用 oc annotate:

    oc annotate KafkaConnector KAFKACONNECTOR-NAME strimzi.io/restart=true
  3. 等待下一次协调发生(默认为每隔两分钟)。

    Kafka 连接器重启,只要协调过程检测到注解。当 Kafka Connect 接受重启请求时,该注解会从 KafkaConnector 自定义资源中删除。

其它资源

5.2.7. 执行 Kafka 连接器任务重启

此流程描述了如何使用 OpenShift 注解手动触发 Kafka 连接器任务重启。

先决条件

  • Cluster Operator 正在运行。

流程

  1. 查找控制您要重启的 Kafka 连接器任务的 KafkaConnector 自定义资源的名称:

    oc get KafkaConnector
  2. KafkaConnector 自定义资源中找到要重启的任务 ID。任务 ID 是非负整数,从 0 开始。

    oc describe KafkaConnector KAFKACONNECTOR-NAME
  3. 要重启连接器任务,请注解 OpenShift 中的 KafkaConnector 资源。例如,使用 oc annotate 重启任务 0:

    oc annotate KafkaConnector KAFKACONNECTOR-NAME strimzi.io/restart-task=0
  4. 等待下一次协调发生(默认为每隔两分钟)。

    Kafka 连接器任务重启,只要协调过程检测到注解。当 Kafka Connect 接受重启请求时,该注解会从 KafkaConnector 自定义资源中删除。

其它资源

5.3. Deploy Kafka MirrorMaker

Cluster Operator 部署一个或多个 Kafka 镜像(MirrorMaker)副本,以在 Kafka 集群间复制数据。此过程称为镜像,以避免与 Kafka 分区复制概念混淆。MirrorMaker 使用源集群的信息,并将这些消息重新发布到目标集群。

5.3.1. 将 Kafka MirrorMaker 部署到 OpenShift 集群

此流程演示了如何使用 Cluster Operator 将 Kafka MirrorMaker 集群部署到 OpenShift 集群。

部署使用 YAML 文件提供创建 KafkaMirrorMaker 或 KafkaMir rorMaker2 资源规格,具体取决于部署的 MirrorMaker 版本。

在这一流程中,我们使用 AMQ Streams 提供的示例文件:

  • examples/mirror-maker/kafka-mirror-maker.yaml
  • examples/mirror-maker/kafka-mirror-maker-2.yaml

有关配置 KafkaMirrorMaker 或 KafkaMirrorMaker2 资源的详情,请参考 使用 AMQ Streams on OpenShift 指南 中的 Kafka MirrorMaker 集群配置

流程

  1. 将 Kafka MirrorMaker 部署到 OpenShift 集群:

    对于 MirrorMaker:

    oc apply -f examples/mirror-maker/kafka-mirror-maker.yaml

    对于 MirrorMaker 2.0:

    oc apply -f examples/mirror-maker/kafka-mirror-maker-2.yaml
  2. 验证 MirrorMaker 是否已成功部署:

    oc get deployments

5.4. 部署 Kafka Bridge

Cluster Operator 部署一个或多个 Kafka 网桥副本,以通过 HTTP API 在 Kafka 集群和客户端之间发送数据。

5.4.1. 将 Kafka Bridge 部署到 OpenShift 集群

此流程演示了如何使用 Cluster Operator 将 Kafka Bridge 集群部署到 OpenShift 集群。

部署使用 YAML 文件提供规格来创建 KafkaBridge 资源。

在这一流程中,我们使用 AMQ Streams 提供的示例文件:

  • examples/bridge/kafka-bridge.yaml

有关配置 KafkaBridge 资源的详情,请参考 使用 AMQ Streams 中的 OpenShift 指南中的 Kafka Bridge 集群配置

流程

  1. 将 Kafka Bridge 部署到 OpenShift 集群:

    oc apply -f examples/bridge/kafka-bridge.yaml
  2. 验证 Kafka 网桥是否已成功部署:

    oc get deployments

第 6 章 设置对 Kafka 集群的客户端访问权限

部署 AMQ Streams 后,本节中的步骤解释了如何:

  • 部署示例制作者和消费者客户端,您可以使用它们来验证您的部署
  • 设置对 Kafka 集群的外部客户端访问

    为 OpenShift 外部的客户端设置 对 Kafka 集群的访问权限的步骤更为复杂,需要熟悉 OpenShift 中使用 AMQ Streams 指南 中所述的 Kafka 组件配置 流程

6.1. 部署示例客户端

此流程演示了如何使用您创建的 Kafka 集群发送和接收信息部署示例制作者和消费者客户端。

先决条件

  • Kafka 集群可供客户端使用。

流程

  1. 部署 Kafka 制作者.

    oc run kafka-producer -ti --image=registry.redhat.io/amq7/amq-streams-kafka-28-rhel7:1.8.0 --rm=true --restart=Never -- bin/kafka-console-producer.sh --broker-list cluster-name-kafka-bootstrap:9092 --topic my-topic
  2. 在运行制作者的控制台中键入消息。
  3. Enter 发送邮件。
  4. 部署 Kafka 用户。

    oc run kafka-consumer -ti --image=registry.redhat.io/amq7/amq-streams-kafka-28-rhel7:1.8.0 --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server cluster-name-kafka-bootstrap:9092 --topic my-topic --from-beginning
  5. 确认您在使用者控制台中看到传入的消息。

6.2. 为 OpenShift 外部的客户端设置访问权限

此流程演示了如何从 OpenShift 外部配置对 Kafka 集群的客户端访问。

使用 Kafka 集群的地址,您可以提供到不同 OpenShift 命名空间或整个 OpenShift 外部客户端的外部访问。

您可以配置外部 Kafka 侦听程序以提供访问权限。

支持以下外部监听程序类型:

选择的类型取决于您的要求,以及您的环境和基础架构。例如,负载均衡器可能不适用于某些基础架构,如裸机,其中节点端口提供了一个更好的选项。

在这个过程中:

  1. 为 Kafka 集群配置了一个外部监听程序,它带有 TLS 加密和身份验证,并启用了 Kafka 简单授权
  2. 为客户端创建一个 KafkaUser,并为 简单授权 定义了 TLS 身份验证和访问控制列表(ACL)。

您可以将侦听器配置为使用 TLS、SCRAM-SHA-512 或 OAuth 2.0 身份验证。TLS 始终使用加密,但建议您还将加密与 SCRAM-SHA-512 和 OAuth 2.0 身份验证一起使用。

您可以为 Kafka 代理配置简单、OAuth 2.0、OPA 或自定义授权。启用后,授权将应用到所有启用的监听程序。

当您配置 KafkaUser 身份验证和授权机制时,请确保它们与等同的 Kafka 配置匹配:

  • KafkaUser.spec.authentication matches Kafka.spec.kafka.listeners[*].authentication
  • KafkaUser.spec.authorizationKafka.spec.kafka.authorization匹配

您应至少有一个监听器支持您要用于 KafkaUser 的身份验证。

注意

Kafka 用户和 Kafka 代理之间的身份验证取决于每个用户的身份验证设置。例如,如果 Kafka 配置中没有启用 TLS 的用户,则无法验证它。

AMQ Streams Operator 自动配置过程:

  • Cluster Operator 创建监听程序并设置集群和客户端证书颁发机构(CA)证书,以便在 Kafka 集群中启用身份验证。
  • User Operator 根据所选的身份验证类型,创建代表客户端以及用于客户端身份验证的安全凭证的用户。

在这一流程中,使用了 Cluster Operator 生成的证书,但您可以通过 安装您自己的证书 来替换它们。您还可以将监听程序配置为使用 由外部证书颁发机构管理的 Kafka 侦听器证书。

证书以 PKCS #12(.p12)和 PEM(.crt)格式提供。此流程显示 PKCS #12 证书。

先决条件

  • Kafka 集群可用于客户端
  • Cluster Operator 和 User Operator 在集群中运行
  • OpenShift 集群外的客户端连接到 Kafka 集群

流程

  1. 使用外部 Kafka 侦听程序配置 Kafka 集群。

    • 定义通过监听程序访问 Kafka 代理所需的身份验证
    • 在 Kafka 代理上启用授权

      例如:

      apiVersion: kafka.strimzi.io/v1beta2
      kind: Kafka
      metadata:
        name: my-cluster
        namespace: myproject
      spec:
        kafka:
          # ...
          listeners: 1
          - name: external 2
            port: 9094 3
            type: LISTENER-TYPE 4
            tls: true 5
            authentication:
              type: tls 6
            configuration:
              preferredNodePortAddressType: InternalDNS 7
              bootstrap and broker service overrides 8
              #...
          authorization: 9
            type: simple
            superUsers:
              - super-user-name 10
        # ...
      1
      2
      用于标识侦听器的名称。在 Kafka 集群中必须是唯一的。
      3
      Kafka 内侦听器使用的端口号。在给定 Kafka 集群中,端口号必须是唯一的。允许的端口号是 9092 及以上,但端口 9404 和 9999 除外,它们已用于 Prometheus 和 JMX。根据监听程序类型,端口号可能与连接 Kafka 客户端的端口号不同。
      4
      外部监听程序类型,指定为 routeloadbalancernodeportingress。内部监听程序指定为 内部 监听程序。
      5
      在监听器上启用 TLS 加密。默认为 false路由 监听程序不需要 TLS 加密。
      6
      指定为 tls 的身份验证。
      7
      (可选,仅适用于 节点端口 侦听器)配置 ,以指定 AMQ Streams 作为节点地址使用的第一个地址类型的首选
      8
      (可选)AMQ Streams 自动决定要公告给客户端的地址。这些地址由 OpenShift 自动分配。如果您运行 AMQ Streams 的基础架构不提供正确的 地址,您可以覆盖 bootstrap 和代理服务 地址。验证不会在覆盖上执行。覆盖配置根据监听器类型而有所不同。例如,您可以覆盖用于 路由、DNS 名称或 负载均衡器 IP 地址的主机, 以及 nodeport 的节点端口。
      9
      指定为 simple 的授权,它使用 AclAuthorizer Kafka 插件。
      10
      (可选)超级用户可以访问所有代理,无论 ACL 中定义的任何访问权限限制。
      警告

      OpenShift Route 地址包含 Kafka 集群的名称、侦听器的名称以及在其中创建的命名空间的名称。例如,my -cluster-kafka-listener1-bootstrap-myproject (CLUSTER-NAME-kafka-LISTENER-NAME-bootstrap-NAMESPACE)。如果您使用的是 路由 监听程序类型,请注意地址的长度不超过 63 个字符的最大限制。

  2. 创建或更新 Kafka 资源。

    oc apply -f KAFKA-CONFIG-FILE

    使用 TLS 身份验证配置 Kafka 代理监听程序。

    为每个 Kafka 代理 pod 创建一个服务。

    创建一个服务作为连接到 Kafka 集群的 bootstrap 地址

    服务也作为使用 nodeport 侦听器到 Kafka 集群 的外部 bootstrap 地址 创建。

    也使用与 Kafka 资源相同的名称来创建用于验证 kafka 代理身份的集群 CA 证书。

  3. Kafka 资源的状态查找 bootstrap 地址和端口。

    oc get kafka KAFKA-CLUSTER-NAME -o jsonpath='{.status.listeners[?(@.type=="external")].bootstrapServers}'

    使用 Kafka 客户端中的 bootstrap 地址连接到 Kafka 集群。

  4. 创建或修改代表需要访问 Kafka 集群的客户端的用户。

    • 指定与 Kafka 侦听器相同的验证类型。
    • 指定简单授权的授权 ACL。

      例如:

      apiVersion: kafka.strimzi.io/v1beta2
      kind: KafkaUser
      metadata:
        name: my-user
        labels:
          strimzi.io/cluster: my-cluster 1
      spec:
        authentication:
          type: tls 2
        authorization:
          type: simple
          acls: 3
            - resource:
                type: topic
                name: my-topic
                patternType: literal
              operation: Read
            - resource:
                type: topic
                name: my-topic
                patternType: literal
              operation: Describe
            - resource:
                type: group
                name: my-group
                patternType: literal
              operation: Read
      1
      该标签必须与 Kafka 集群的标签匹配才能创建用户。
      2
      指定为 tls 的身份验证。
      3
      简单授权需要附带的 ACL 规则列表才能应用到用户。规则根据 用户名 (my-user)定义 Kafka 资源允许的操作。
  5. 创建或修改 KafkaUser 资源。

    oc apply -f USER-CONFIG-FILE

    已创建用户,以及名称与 KafkaUser 资源相同的 Secret。Secret 包含用于 TLS 客户端身份验证的私钥和公钥。

    例如:

    apiVersion: v1
    kind: Secret
    metadata:
      name: my-user
      labels:
        strimzi.io/kind: KafkaUser
        strimzi.io/cluster: my-cluster
    type: Opaque
    data:
      ca.crt: PUBLIC-KEY-OF-THE-CLIENT-CA
      user.crt: USER-CERTIFICATE-CONTAINING-PUBLIC-KEY-OF-USER
      user.key: PRIVATE-KEY-OF-USER
      user.p12: P12-ARCHIVE-FILE-STORING-CERTIFICATES-AND-KEYS
      user.password: PASSWORD-PROTECTING-P12-ARCHIVE
  6. 将公共集群 CA 证书提取到所需的证书格式:

    oc get secret KAFKA-CLUSTER-NAME-cluster-ca-cert -o jsonpath='{.data.ca\.p12}' | base64 -d > ca.p12
  7. 从密码文件中提取密码:

    oc get secret KAFKA-CLUSTER-NAME-cluster-ca-cert -o jsonpath='{.data.ca\.password}' | base64 -d > ca.password
  8. 使用公共集群证书的身份验证详情配置客户端:

    客户端代码示例

    properties.put("security.protocol","SSL"); 1
    properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,"/path/to/ca.p12"); 2
    properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,CA-PASSWORD); 3
    properties.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG,"PKCS12"); 4

    1
    启用 TLS 加密(带有或不进行 TLS 客户端身份验证)。
    2
    指定导入证书的信任存储位置。
    3
    指定用于访问信任存储的密码。如果信任存储不需要此属性,则可以省略它。
    4
    标识 truststore 类型。
    注意

    在通过 TLS 使用 SCRAM-SHA 身份验证时,请使用 security.protocol: SASL_SSL

  9. 将用户 CA 证书从用户 Secret 提取到所需的证书格式:

    oc get secret USER-NAME -o jsonpath='{.data.user\.p12}' | base64 -d > user.p12
  10. 从密码文件中提取密码:

    oc get secret USER-NAME -o jsonpath='{.data.user\.password}' | base64 -d > user.password
  11. 使用用户 CA 证书的身份验证详情配置您的客户端:

    客户端代码示例

    properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,"/path/to/user.p12"); 1
    properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,"<user.password>"); 2
    properties.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG,"PKCS12"); 3

    1
    指定导入证书的密钥存储位置。
    2
    指定用于访问密钥存储的密码。如果密钥存储不需要此属性,则可以省略它。公共用户证书在创建时由客户端 CA 签名。
    3
    标识密钥存储类型。
  12. 添加用于连接到 Kafka 集群的 bootstrap 地址和端口:

    bootstrap.servers: BOOTSTRAP-ADDRESS:PORT

其它资源

第 7 章 为 AMQ Streams 设置指标和仪表板

您可以通过在仪表板上查看关键指标并设置在某些情况下触发的警报来监控 AMQ Streams 部署。指标数据可用于 Kafka、ZooKeeper 以及 AMQ Streams 的其他组件。

为提供指标信息,AMQ Streams 使用 Prometheus 规则和 Grafana 仪表板。

当为 AMQ Streams 的每个组件配置一组规则时,Prometheus 会使用来自集群中运行的 pod 的键指标。然后,Grafana 在仪表板上视觉化这些指标。AMQ Streams 包括 Grafana 仪表板示例,您可以自定义这些仪表板适合您的部署。

在 OpenShift Container Platform 4.x 中,AMQ Streams 采用 对用户定义的项目的监控 (OpenShift 功能)来简化 Prometheus 设置过程。

在 OpenShift Container Platform 3.11 中,您需要将 Prometheus 和 Alertmanager 组件单独部署到集群中。

无论 OpenShift Container Platform 版本是什么,您必须首先 部署 AMQ Streams 的 Prometheus metrics 配置

接下来,按照 OpenShift Container Platform 版本的说明进行操作:

通过 Prometheus 和 Grafana 设置,您可以使用示例 Grafana 仪表板和警报规则来监控 Kafka 集群。

其他监控选项

Kafka Exporter 是一个可选组件,提供与消费者滞后相关的额外监控。如果要将 Kafka Exporter 用于 AMQ Streams,请参阅 配置 Kafka 资源以使用 Kafka 集群部署 Kafka 导出器

您还可以通过设置分布式追踪将部署配置为跟踪端到端的消息。如需更多信息,请参阅 OpenShift 中使用 AMQ Streams 中的 分布式追踪

其它资源

7.1. 指标文件示例

您可以在 example /metrics 目录中找到 Grafana 仪表板和其他指标配置文件的示例 。如下表所示,一些文件仅用于 OpenShift Container Platform 3.11,而不适用于 OpenShift Container Platform 4.x。

AMQ Streams 提供的指标文件示例

metrics
├── grafana-dashboards 1
│   ├── strimzi-cruise-control.json
│   ├── strimzi-kafka-bridge.json
│   ├── strimzi-kafka-connect.json
│   ├── strimzi-kafka-exporter.json
│   ├── strimzi-kafka-mirror-maker-2.json
│   ├── strimzi-kafka.json
│   ├── strimzi-operators.json
│   └── strimzi-zookeeper.json
├── grafana-install
│   └── grafana.yaml 2
├── prometheus-additional-properties
│   └── prometheus-additional.yaml - OPENSHIFT 3.11 ONLY 3
├── prometheus-alertmanager-config
│   └── alert-manager-config.yaml 4
├── prometheus-install
│    ├── alert-manager.yaml - OPENSHIFT 3.11 ONLY 5
│    ├── prometheus-rules.yaml 6
│    ├── prometheus.yaml - OPENSHIFT 3.11 ONLY 7
│    ├── strimzi-pod-monitor.yaml 8
├── kafka-bridge-metrics.yaml 9
├── kafka-connect-metrics.yaml 10
├── kafka-cruise-control-metrics.yaml 11
├── kafka-metrics.yaml 12
└── kafka-mirror-maker-2-metrics.yaml 13

1
Grafana 仪表板示例.
2
Grafana 镜像的安装文件。
3
OPENSHIFT 3.11 ONLY: Additional Prometheus 配置提取 CPU、内存和磁盘卷用量的指标,它们直接来自节点上的 OpenShift cAdvisor 代理和 kubelet。
4
用于通过 Alertmanager 发送通知的 hook 定义。
5
OPENSHIFT 3.11 ONLY :用于部署和配置 Alertmanager 的资源。
6
用于 Prometheus Alertmanager 的警报规则示例。
7
OPENSHIFT 3.11 ONLY:Prometheus 镜像的安装资源文件。
8
Prometheus Operator 将 PodMonitor 定义转换为作业,以便 Prometheus 服务器能够直接从 pod 中提取指标数据。
9
启用指标数据的 Kafka Bridge 资源。
10
为 Kafka Connect 定义 Prometheus JMX Exporter 重新标记规则的指标配置。
11
定义 Prometheus JMX Exporter 重新标记 Cruise Control 规则的指标配置。
12
为 Kafka 和 ZooKeeper 定义 Prometheus JMX Exporter 重新标记规则的指标配置。
13
为 Kafka Mirror Maker 2.0 定义 Prometheus JMX Exporter 重新标记规则的指标配置。

7.1.1. Grafana 仪表板示例

为监控以下资源提供了 Grafana 仪表板示例:

AMQ Streams Kafka

显示以下指标:

  • 在线代理数
  • 集群计数中的活跃控制器
  • 未清理领导选举率
  • 在线副本
  • 复制不足的分区数
  • 至少在同步副本数中的分区
  • 在同步副本数中最小分区
  • 没有活跃领导且因此不可写入或可读的分区
  • Kafka 代理 pod 内存用量
  • 聚合 Kafka 代理 pod CPU 用量
  • Kafka 代理 pod 磁盘用量
  • 使用的 JVM 内存
  • JVM 垃圾收集时间
  • JVM 垃圾回收计数
  • 传入字节速率总数
  • 传出字节速率总数
  • 传入的消息率
  • 生成请求率总数
  • 字节率
  • 生成请求率
  • 获取请求率
  • 网络处理器平均空闲百分比
  • 请求处理程序平均时间空闲百分比
  • 日志大小
AMQ Streams ZooKeeper

显示以下指标:

  • Zookeeper ensemble 的仲裁大小
  • 活动 连接
  • 服务器数中的排队请求
  • watchers 数
  • zookeeper pod 内存用量
  • ZooKeeper pod CPU 使用量聚合
  • zookeeper pod 磁盘用量
  • 使用的 JVM 内存
  • JVM 垃圾收集时间
  • JVM 垃圾回收计数
  • 服务器响应客户端请求所需的时间(最大值、最小值和平均值)
AMQ Streams Kafka Connect

显示以下指标:

  • 传入字节速率总数
  • 传出字节速率总数
  • 磁盘用量
  • 使用的 JVM 内存
  • JVM 垃圾收集时间
AMQ Streams Kafka MirrorMaker 2

显示以下指标:

  • 连接器数量
  • 任务数量
  • 传入字节速率总数
  • 传出字节速率总数
  • 磁盘用量
  • 使用的 JVM 内存
  • JVM 垃圾收集时间
AMQ Streams Operator

显示以下指标:

  • 自定义资源
  • 每小时成功对自定义资源进行协调
  • 每小时自定义资源协调失败
  • 每小时无锁定的调节
  • 协调开始时间
  • 每小时定期协调
  • 最大协调时间
  • 平均协调时间
  • 使用的 JVM 内存
  • JVM 垃圾收集时间
  • JVM 垃圾回收计数

还为 AMQ Streams 的 Kafka BridgeCruise Control 组件提供仪表板。

所有控制面板都提供 JVM 指标,以及特定于各个组件的指标。例如,Operator 仪表板提供有关正在处理的协调或自定义资源数量的信息。

7.1.2. Prometheus 指标配置示例

AMQ Streams 使用 Prometheus JMX Exporter 来通过 HTTP 端点公开 JMX 指标,然后由 Prometheus 提取。

Grafana 仪表板依赖于 Prometheus JMX Exporter 重新标记规则,这些规则是为 AMQ Streams 组件定义的,作为自定义资源配置。

标签是 name-value 对。重新标记是动态编写标签的过程。例如,标签的值可以从 Kafka 服务器名称和客户端 ID 中获得。

AMQ Streams 提供示例自定义资源配置 YAML 文件,其中已定义了重新标记规则。部署 Prometheus 指标配置时,您可以部署示例自定义资源,或将指标配置复制到您自己的自定义资源定义中。

表 7.1. 带有指标配置的自定义资源示例

组件自定义资源YAML 文件示例

Kafka 和 ZooKeeper

kafka

kafka-metrics.yaml

Kafka Connect

KafkaConnectKafkaConnectS2I

kafka-connect-metrics.yaml

Kafka MirrorMaker 2.0

KafkaMirrorMaker2

kafka-mirror-maker-2-metrics.yaml

Kafka Bridge

KafkaBridge

kafka-bridge-metrics.yaml

Sything Control

kafka

kafka-cruise-control-metrics.yaml

其它资源

7.2. 部署 Prometheus 指标配置

AMQ Streams 提供了带有重新标记规则 的示例自定义资源配置 YAML 文件

要应用重新标记规则的指标配置,请执行以下操作之一:

7.2.1. 将 Prometheus metrics 配置复制到自定义资源

要使用 Grafana 仪表板来监控,请将 指标配置示例复制到自定义资源 中。

在这个流程中,会更新 Kafka 资源,但所有支持监控的组件的步骤都相同。

流程

对部署中的每个 Kafka 资源执行以下步骤。

  1. 更新编辑器中的 Kafka 资源。

    oc edit kafka KAFKA-CONFIG-FILE
  2. kafka-metrics.yaml中的示例配置 复制到您自己的 Kafka 资源定义中。
  3. 保存文件,并等待更新的资源被协调。

7.2.2. 使用 Prometheus 指标配置部署 Kafka 集群

要使用 Grafana 仪表板进行监控,您可以使用 指标配置部署一个示例 Kafka 集群

在这一流程中,kafka-metrics.yaml 文件用于 Kafka 资源。

流程

7.3. 在 OpenShift 4 中查看 Kafka 指标和仪表板

当 AMQ Streams 部署到 OpenShift Container Platform 4.x 时,指标 将通过监控用户定义的项目 来提供。此 OpenShift 功能可让开发人员访问单独的 Prometheus 实例来监控其自己的项目(如 Kafka 项目)。

如果启用了对用户定义的项目的监控,openshift -user-workload-monitoring 项目包含以下组件:

  • Prometheus Operator
  • Prometheus 实例(由 Prometheus Operator 自动部署)
  • Thanos Ruler 实例

AMQ Streams 使用这些组件来消耗指标。

集群管理员必须启用对用户定义的项目的监控,然后授予开发人员和其他用户权限来监控其项目中的应用。

Grafana 部署

您可以将 Grafana 实例部署到包含 Kafka 集群的项目中。然后,可以将 Grafana 仪表板示例用于在 Grafana 用户界面中视觉化 AMQ Streams 的 Prometheus 指标。

重要

openshift-monitoring 项目为核心平台组件提供监控。不要在此 项目中使用 Prometheus 和 Grafana 组件来配置 OpenShift Container Platform 4.x 上的 AMQ Streams 监控。

Grafana 版本 6.3 是最低支持的版本。

先决条件

流程提纲

要在 OpenShift Container Platform 4.x 中设置 AMQ Streams 监控,请按照以下步骤操作:

7.3.1. 部署 Prometheus 资源

注意

在 OpenShift Container Platform 4.x 上运行 AMQ Streams 时使用此流程。

要启用 Prometheus 以使用 Kafka 指标,您可以在示例指标文件中配置和部署 PodMonitor 资源PodMonnitors 直接从 Apache Kafka、ZooKeeper、Operator、Kafka Bridge 和 Cruise Control 的 pod 中提取数据。

然后,您为 Alertmanager 部署示例警报规则。

先决条件

流程

  1. 检查是否启用了对用户定义的项目的监控:

    oc get pods -n openshift-user-workload-monitoring

    如果启用,则返回监控组件的 Pod。例如:

    NAME                                   READY   STATUS    RESTARTS   AGE
    prometheus-operator-5cc59f9bc6-kgcq8   1/1     Running   0          25s
    prometheus-user-workload-0             5/5     Running   1          14s
    prometheus-user-workload-1             5/5     Running   1          14s
    thanos-ruler-user-workload-0           3/3     Running   0          14s
    thanos-ruler-user-workload-1           3/3     Running   0          14s

    如果没有返回 pod,则禁用对用户定义的项目的监控。请参阅 第 7.3 节 “在 OpenShift 4 中查看 Kafka 指标和仪表板” 中的前提条件。

  2. example /metrics/prometheus-install/strimzi-pod-monitor.yaml 中定义了多个 PodMonitor 资源

    对于每个 PodMonitor 资源,编辑 spec.namespaceSelector.matchNames 属性:

    apiVersion: monitoring.coreos.com/v1
    kind: PodMonitor
    metadata:
      name: cluster-operator-metrics
      labels:
        app: strimzi
    spec:
      selector:
        matchLabels:
          strimzi.io/kind: cluster-operator
      namespaceSelector:
        matchNames:
          - PROJECT-NAME 1
      podMetricsEndpoints:
      - path: /metrics
        port: http
    # ...
    1
    Pod 从中提取指标的项目,如 Kafka
  3. strimzi-pod-monitor.yaml 文件部署到运行 Kafka 集群的项目中:

    oc apply -f strimzi-pod-monitor.yaml -n MY-PROJECT
  4. 将 Prometheus 规则示例部署到同一项目中:

    oc apply -f prometheus-rules.yaml -n MY-PROJECT

其它资源

7.3.2. 为 Grafana 创建服务帐户

注意

在 OpenShift Container Platform 4.x 上运行 AMQ Streams 时使用此流程。

AMQ Streams 的 Grafana 实例需要使用分配 cluster-monitoring-view 角色的服务帐户运行。

流程

  1. 为 Grafana 创建 ServiceAccount。此处资源命名为 grafana-serviceaccount

    apiVersion: v1
    kind: ServiceAccount
    metadata:
      name: grafana-serviceaccount
      labels:
        app: strimzi
  2. ServiceAccount 部署到包含 Kafka 集群的项目中:

    oc apply -f GRAFANA-SERVICEACCOUNT -n MY-PROJECT
  3. 创建一个 ClusterRoleBinding 资源,它将 cluster-monitoring-view 角色分配给 Grafana ServiceAccount。此处资源命名为 grafana-cluster-monitoring-binding

    apiVersion: rbac.authorization.k8s.io/v1
    kind: ClusterRoleBinding
    metadata:
      name: grafana-cluster-monitoring-binding
      labels:
        app: strimzi
    subjects:
      - kind: ServiceAccount
        name: grafana-serviceaccount
        namespace: MY-PROJECT 1
    roleRef:
      kind: ClusterRole
      name: cluster-monitoring-view
      apiGroup: rbac.authorization.k8s.io
    1
    您的项目名称。
  4. ClusterRoleBinding 部署到包含 Kafka 集群的项目:

    oc apply -f GRAFANA-CLUSTER-MONITORING-BINDING -n MY-PROJECT

7.3.3. 使用 Prometheus 数据源部署 Grafana

注意

在 OpenShift Container Platform 4.x 上运行 AMQ Streams 时使用此流程。

此流程描述了如何部署为 OpenShift Container Platform 4.x 监控堆栈配置的 Grafana 应用程序。

OpenShift Container Platform 4.x 在 openshift-monitoring 项目中包含一个 Thanos Querier 实例。Thanos Querier 用于聚合平台指标。

要使用所需的平台指标,您的 Grafana 实例需要一个可以连接到 Thanos Querier 的 Prometheus 数据源。要配置此连接,您需要创建一个 Config Map,它将使用令牌验证与 Thanos Querier 一起运行的 oauth-proxy sidecar。datasource.yaml 文件用作 Config Map 的来源。

最后,您要使用作为卷挂载的 Config Map 将 Grafana 应用程序部署到包含 Kafka 集群的项目中。

流程

  1. 获取 Grafana ServiceAccount 的访问令牌:

    oc serviceaccounts get-token grafana-serviceaccount -n MY-PROJECT

    复制要在下一步中使用的访问令牌。

  2. 创建一个包含 Grafana Thanos Querier 配置的 数据源.yaml 文件。

    将访问令牌粘贴到 httpHeaderValue1 属性中,如下所示:

    apiVersion: 1
    
    datasources:
    - name: Prometheus
      type: prometheus
      url: https://thanos-querier.openshift-monitoring.svc.cluster.local:9091
      access: proxy
      basicAuth: false
      withCredentials: false
      isDefault: true
      jsonData:
        timeInterval: 5s
        tlsSkipVerify: true
        httpHeaderName1: "Authorization"
      secureJsonData:
        httpHeaderValue1: "Bearer ${GRAFANA-ACCESS-TOKEN}" 1
      editable: true
    1
    GRAFANA-ACCESS-TOKEN :Grafana ServiceAccount 的访问令牌值。
  3. datasource.yaml 文件创建一个名 为grafana-config 的 Config Map:

    oc create configmap grafana-config --from-file=datasource.yaml -n MY-PROJECT
  4. 创建包含 Deployment 和 Service 的 Grafana 应用程序。

    The grafana-config Config Map 被挂载为数据源配置的卷。

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: grafana
      labels:
        app: strimzi
    spec:
      replicas: 1
      selector:
        matchLabels:
          name: grafana
      template:
        metadata:
          labels:
            name: grafana
        spec:
          serviceAccountName: grafana-serviceaccount
          containers:
          - name: grafana
            image: grafana/grafana:6.3.0
            ports:
            - name: grafana
              containerPort: 3000
              protocol: TCP
            volumeMounts:
            - name: grafana-data
              mountPath: /var/lib/grafana
            - name: grafana-logs
              mountPath: /var/log/grafana
            - name: grafana-config
              mountPath: /etc/grafana/provisioning/datasources/datasource.yaml
              readOnly: true
              subPath: datasource.yaml
            readinessProbe:
              httpGet:
                path: /api/health
                port: 3000
              initialDelaySeconds: 5
              periodSeconds: 10
            livenessProbe:
              httpGet:
                path: /api/health
                port: 3000
              initialDelaySeconds: 15
              periodSeconds: 20
          volumes:
          - name: grafana-data
            emptyDir: {}
          - name: grafana-logs
            emptyDir: {}
          - name: grafana-config
            configMap:
              name: grafana-config
    ---
    apiVersion: v1
    kind: Service
    metadata:
      name: grafana
      labels:
        app: strimzi
    spec:
      ports:
      - name: grafana
        port: 3000
        targetPort: 3000
        protocol: TCP
      selector:
        name: grafana
      type: ClusterIP
  5. 将 Grafana 应用程序部署到包含 Kafka 集群的项目中:

    oc apply -f GRAFANA-APPLICATION -n MY-PROJECT

其它资源

7.3.4. 创建到 Grafana 服务的路由

注意

在 OpenShift Container Platform 4.x 上运行 AMQ Streams 时使用此流程。

您可以通过公开 Grafana 服务的路由访问 Grafana 用户界面。

流程

  • 创建到 grafana 服务的边缘路由:

    oc create route edge MY-GRAFANA-ROUTE --service=grafana --namespace=KAFKA-NAMESPACE

7.3.5. 导入 Grafana 仪表板示例

注意

在 OpenShift Container Platform 4.x 上运行 AMQ Streams 时使用此流程。

使用 Grafana 用户界面导入示例 Grafana 仪表板。

流程

  1. 获取到 Grafana 服务的路由详情。例如:

    oc get routes
    
    NAME               HOST/PORT                         PATH  SERVICES
    MY-GRAFANA-ROUTE   MY-GRAFANA-ROUTE-amq-streams.net        grafana
  2. 在 Web 浏览器中,使用 Route 主机和端口的 URL 访问 Grafana 登录屏幕。
  3. 输入您的用户名和密码,然后单击 Log In

    默认的 Grafana 用户名和密码都是 admin。第一次登录后,您可以更改密码。

  4. Configuration > Data Sources 中,检查是否创建了 Prometheus 数据源。该数据源是在 第 7.3.3 节 “使用 Prometheus 数据源部署 Grafana” 中创建的。
  5. 点击 Dashboards > Manage,然后点 Import
  6. 示例/metrics/grafana-dashboards 中,复制仪表板的 JSON 以导入。
  7. 将 JSON 粘贴到文本框中,然后单击 Load
  8. 为另一个示例 Grafana 仪表板重复步骤 1 -7。

导入的 Grafana 仪表板可以从 Dashboards 主页查看。

7.4. 在 OpenShift 3.11 中查看 Kafka 指标和仪表板

当将 AMQ Streams 部署到 OpenShift Container Platform 3.11 时,您可以使用 Prometheus 为 AMQ Streams 提供的示例 Grafana 仪表板提供监控数据。您需要手动将 Prometheus 组件部署到集群中。

要运行 Grafana 仪表板示例,您必须:

注意

本节中引用的资源旨在作为设置监控的起点,但仅作为示例提供。如果需要进一步支持在生产环境中配置和运行 Prometheus 或 Grafana,请尝试联系其各自的社区。

7.4.1. Prometheus 支持

当 AMQ Streams 部署到 OpenShift Container Platform 3.11 时,不支持 Prometheus 服务器。但是,支持 Prometheus 端点和用于公开指标的 Prometheus JMX Exporter。

为方便起见,我们提供详细说明和示例指标配置文件,以便您希望使用 Prometheus 进行监控。

7.4.2. 设置 Prometheus

注意

在 OpenShift Container Platform 3.11 上运行 AMQ Streams 时,请使用以下步骤。

Prometheus 为系统监控和警报通知提供一套开源的组件。

此处我们介绍了如何在将 AMQ Streams 部署到 OpenShift Container Platform 3.11 时,如何使用提供的 Prometheus 镜像和配置文件来运行和管理 Prometheus 服务器。

先决条件

  • 您已将 Prometheus 和 Grafana 的兼容版本部署到 OpenShift Container Platform 3.11 集群中。
  • 用于运行 Prometheus 服务器 pod 的服务帐户有权访问 OpenShift API 服务器。这允许服务帐户检索集群中获取指标的 pod 列表。

    如需更多信息,请参阅 发现服务

7.4.2.1. Prometheus 配置

AMQ Streams 为 Prometheus 服务器提供了示例配置文件

为部署提供了一个 Prometheus 镜像:

  • prometheus.yaml

以下文件中还提供了与 Prometheus 相关的其他配置:

  • prometheus-additional.yaml
  • prometheus-rules.yaml
  • strimzi-pod-monitor.yaml

若要让 Prometheus 获取监控数据,您必须已将 Prometheus 的兼容版本部署到 OpenShift Container Platform 3.11 集群中。

然后,使用配置文件来部署 Prometheus

7.4.2.2. Prometheus 资源

应用 Prometheus 配置时,会在 OpenShift 集群中创建并由 Prometheus Operator 管理以下资源:

  • 为 Prometheus 授予权限来读取 Kafka 和 ZooKeeper pod、cAdvisor 和 kubelet 针对容器指标公开的健康端点的 ClusterRole
  • 要运行的 Prometheus Pod 的 ServiceAccount
  • 一个 ClusterRoleBinding,它将 ClusterRole 绑定到 ServiceAccount
  • 用于管理 Prometheus Operator pod 的 Deployment
  • 用于管理 Prometheus pod 配置的 PodMonitor
  • 用于管理 Prometheus pod 配置的 Prometheus。
  • 用于管理 Prometheus pod 的警报规则的 PrometheusRule
  • 用于管理其他 Prometheus 设置的 Secret
  • 一个 服务,允许在集群中运行的应用程序连接到 Prometheus(例如,使用 Prometheus 作为数据源的 Grafana)。

7.4.2.3. 部署 Prometheus

要在 Kafka 集群中获取监控数据,您可以通过应用 Prometheus docker 镜像的示例安装资源文件以及 Prometheus 相关资源的 YAML 文件来使用您自己的 Prometheus 部署或部署 Prometheus 部署

部署过程创建一个 ClusterRoleBinding,并在为部署指定的命名空间中发现 Alertmanager 实例。

先决条件

流程

  1. 根据要安装到的命名空间 Prometheus 修改 Prometheus 安装文件(prometheus.yaml):

    在 Linux 中,使用:

    sed -i 's/namespace: .*/namespace: my-namespace/' prometheus.yaml

    在 MacOS 中,使用:

    sed -i '' 's/namespace: .*/namespace: my-namespace/' prometheus.yaml
  2. 编辑 PodMonitor 资源 in strimzi-pod-monitor.yaml,以定义 Prometheus 作业,它将从 pod 中提取指标数据。

    namespaceSelector.matchNames 属性更新为 pod 要从中提取指标的命名空间。

    PodMonitor 用于直接从 pod 中提取数据,用于 Apache Kafka、ZooKeeper、Operator、Kafka Bridge 和 Cruise Control。

  3. 编辑 prometheus.yaml 安装文件,使其包含用于直接从节点提取指标的额外配置。

    提供的 Grafana 仪表板显示 CPU、内存和磁盘卷使用情况的指标,它们直接来自节点上的 OpenShift cAdvisor 代理和 kubelet。

    1. 从配置文件(如 example /metrics/prometheus-additional-properties 目录中的 prometheus-additional.yaml )创建 Secret 资源:

      oc apply -f prometheus-additional.yaml
    2. 编辑 prometheus.yaml 文件中的 additionalScrapeConfigs 属性,使其包含 Secret 的名称和 prometheus-additional.yaml 文件。
  4. 部署 Prometheus 资源:

    oc apply -f strimzi-pod-monitor.yaml
    oc apply -f prometheus-rules.yaml
    oc apply -f prometheus.yaml

7.4.3. 设置 Prometheus Alertmanager

Prometheus Alertmanager 是一个插件,用于处理警报并将其路由到通知服务。Alertmanager 支持监控的一个重要方面,它将收到根据警报规则指示潜在问题的条件通知。

7.4.3.1. Alertmanager 配置

AMQ Streams 为 Prometheus Alertmanager 提供示例配置文件

配置文件定义用于部署 Alertmanager 的资源:

  • alert-manager.yaml

一个额外的配置文件提供用于从 Kafka 集群发送通知的 hook 定义。

  • alert-manager-config.yaml

对于 Alertmanger 处理 Prometheus 警报,请使用配置文件:

7.4.3.2. 警报规则

警报规则提供有关指标中观察到的特定条件的通知。规则在 Prometheus 服务器上声明,但 Prometheus Alertmanager 负责警报通知。

Prometheus 警报规则描述使用不断评估的 PromQL 表达式的条件。

当警报表达式变为 true 时,满足该条件,Prometheus 服务器会向 Alertmanager 发送警报数据。然后,Alertmanager 使用为其部署配置的通信方法发出通知。

Alertmanager 可以配置为使用电子邮件、聊天消息或其他通知方法。

其它资源

有关设置警报规则的更多信息,请参阅 Prometheus 文档中的 配置

7.4.3.3. 警报规则示例

Kafka 和 ZooKeeper 指标的警报规则示例随 AMQ Streams 提供,供 Prometheus 部署 使用。

关于警报规则定义的常规要点:

  • for 属性与规则配合使用,以确定条件在触发警报前必须保留的时间。
  • tick 是一个基本的 ZooKeeper 时间单元,以毫秒为单位计算并使用 Kafka.spec.zookeeper.configtickTime 参数进行配置。例如,如果 ZooKeeper tickTime=3000,3 ticks(3 x 3000)等于 9000 毫秒。
  • ZookeeperRunningOutOfSpace 指标和警报的可用性取决于所使用的 OpenShift 配置和存储实施。某些平台的存储实施可能无法提供有关指标提供警报所需的可用空间的信息。

Kafka 警报规则

UnderReplicatedPartitions
提供当前代理是潜在副本但副本数比为 min.insync.replicas 配置的 副本数更少的分区数量。这个指标提供了有关托管后续副本的代理的信息。这些追随者跟不上领导者的步调。其原因可能包括(或已经)脱机,以及过限内联复制。当此值大于零时引发警报,以提供关于每个代理的副本分区的信息。
AbnormalControllerState
指示当前代理是否为集群的控制器。指标可以是 0 或 1。在集群生命周期内,应该只有一个代理是控制器,集群始终需要有一个活跃的控制器。有两个或多个代理表示它们是控制器,这表示问题。如果条件仍然存在,则当所有代理上此指标的所有值总和不等于 1 时,就会引发警报,即没有活跃的控制器(总和为 0)或多个控制器(总和大于 1)。
UnderMinIsrPartitionCount
表示领导 Kafka 代理的最小内同步副本(ISR)数量最少(使用 min.insync.replicas 指定),必须确认没有达到写入操作。指标定义代理所负责的分区数量,其中同步副本数小于最小内同步副本数。当这个值大于零时引发警报,为没有达到最小确认数的每个代理提供分区数的信息。
OfflineLogDirectoryCount
指明离线的日志目录数量(例如,由于硬件故障),以便代理无法再存储传入的消息。如果此值大于零,则会引发警报,以提供关于每个代理的离线日志目录数量的信息。
KafkaRunningOutOfSpace
表示可用于写入数据的剩余磁盘空间量。当此值低于 5GiB 时,将引发警报,提供关于每个持久卷声明的空间不足的磁盘信息。阈值可以在 prometheus-rules.yaml 中更改。

zookeeper 警报规则

AvgRequestLatency
表示服务器响应客户端请求所需的时间。当此值大于 10(勾号)时,会引发警报,从而提供每台服务器平均请求延迟的实际值。
OutstandingRequests
表示服务器中的排队请求数。当服务器接收的请求数超过其处理次数时,这个值会增加。如果此值大于 10,则会引发警报,从而为每个服务器提供实际的待处理请求数。
ZookeeperRunningOutOfSpace
表示可用于向 ZooKeeper 写入数据的剩余磁盘空间量。当此值低于 5GiB 时,会引发警报。它提供有关每个持久卷声明的空间不足的磁盘信息。

7.4.3.4. 部署 Alertmanager

要部署 Alertmanager,请应用 示例配置文件

AMQ Streams 提供的示例配置将 Alertmanager 配置为将通知发送到 Slack 频道。

在部署中定义了以下资源:

  • 用于管理 Alertmanager pod 的 Alertmanager。
  • 用于管理 Alertmanager 配置的机密。
  • 服务,用于为其他服务(如 Prometheus)连接 Alertmanager 提供简单参考主机名的服务。

流程

  1. 从 Alertmanager 配置文件(example /metrics/prometheus-alertmanager-config 目录中的 alert-manager-config.yaml )创建 Secret 资源:

    oc apply -f alert-manager-config.yaml
  2. 更新 alert-manager-config.yaml 文件以替换:

    • slack _api_url 属性,以及与 Slack 工作区应用相关的 Slack API URL 的实际值
    • 具有 要在其上发送通知的实际 Slack 频道的频道属性
  3. 部署 Alertmanager:

    oc apply -f alert-manager.yaml

7.4.4. 设置 Grafana

Grafana 提供 Prometheus 指标的视觉化。

您可以部署和启用 AMQ Streams 提供的 Grafana 仪表板示例。

7.4.4.1. 部署 Grafana

要提供 Prometheus 指标的视觉化,您可以使用您自己的 Grafana 安装,或通过应用 example /metrics 目录中提供的 grafana.yaml 文件来部署 Grafana。

流程

  1. 部署 Grafana:

    oc apply -f grafana.yaml
  2. 启用 Grafana 仪表板

7.4.4.2. 启用 Grafana 仪表板示例

AMQ Streams 为 Grafana 提供了仪表板配置文件示例。示例仪表板作为 JSON 文件在 example/metrics/grafana-dashboards 目录中提供:

  • strimzi-kafka.json
  • strimzi-zookeeper.json
  • strimzi-operators.json
  • strimzi-kafka-connect.json
  • strimzi-kafka-mirror-maker-2.json
  • strimzi-kafka-bridge.json
  • strimzi-cruise-control.json
  • strimzi-kafka-exporter.json

示例仪表板是监控关键指标的良好起点,但它们并不代表所有可用的指标。您可以根据您的基础架构修改示例仪表板或添加其他指标。

设置 Prometheus 和 Grafana 后,您可以在 Grafana 仪表板上视觉化 AMQ Streams 数据。

注意

没有定义警报通知规则。

访问仪表板时,您可以使用 port-forward 命令将 Grafana pod 的流量转发到主机。

注意

Grafana pod 的名称为每个用户不同。

流程

  1. 获取 Grafana 服务的详情:

    oc get service grafana

    例如:

    名称类型CLUSTER-IP端口.

    grafana

    ClusterIP

    172.30.123.40

    3000/TCP

    请注意端口转发的端口号。

  2. 使用 port-forward 将 Grafana 用户界面重定向到 localhost:3000:

    oc port-forward svc/grafana 3000:3000
  3. 将 Web 浏览器指向 http://localhost:3000

    这时会出现 Grafana Log In 页面。

  4. 输入您的用户名和密码,然后单击 Log In

    默认的 Grafana 用户名和密码都是 admin。第一次登录后,您可以更改密码。

  5. 添加 Prometheus 作为 数据源

  6. DashboardsImport,上传示例仪表板或直接粘贴 JSON。
  7. 在顶部标头中,单击仪表板下拉菜单,然后选择您要查看的仪表板。

    当 Prometheus 服务器收集 AMQ Streams 集群的指标一段时间时,会填充仪表板。

图 7.1. 仪表板选择选项

AMQ Streams 仪表板选择
AMQ Streams Kafka

显示以下指标:

  • 在线代理数
  • 集群计数中的活跃控制器
  • 未清理领导选举率
  • 在线副本
  • 复制不足的分区数
  • 至少在同步副本数中的分区
  • 在同步副本数中最小分区
  • 没有活跃领导且因此不可写入或可读的分区
  • Kafka 代理 pod 内存用量
  • 聚合 Kafka 代理 pod CPU 用量
  • Kafka 代理 pod 磁盘用量
  • 使用的 JVM 内存
  • JVM 垃圾收集时间
  • JVM 垃圾回收计数
  • 传入字节速率总数
  • 传出字节速率总数
  • 传入的消息率
  • 生成请求率总数
  • 字节率
  • 生成请求率
  • 获取请求率
  • 网络处理器平均空闲百分比
  • 请求处理程序平均时间空闲百分比
  • 日志大小

    图 7.2. AMQ Streams Kafka 仪表板

    Kafka 仪表板
AMQ Streams ZooKeeper

显示以下指标:

  • Zookeeper ensemble 的仲裁大小
  • 活动 连接
  • 服务器数中的排队请求
  • watchers 数
  • zookeeper pod 内存用量
  • ZooKeeper pod CPU 使用量聚合
  • zookeeper pod 磁盘用量
  • 使用的 JVM 内存
  • JVM 垃圾收集时间
  • JVM 垃圾回收计数
  • 服务器响应客户端请求所需的时间(最大值、最小值和平均值)
AMQ Streams Operator

显示以下指标:

  • 自定义资源
  • 每小时成功对自定义资源进行协调
  • 每小时自定义资源协调失败
  • 每小时无锁定的调节
  • 协调开始时间
  • 每小时定期协调
  • 最大协调时间
  • 平均协调时间
  • 使用的 JVM 内存
  • JVM 垃圾收集时间
  • JVM 垃圾回收计数
AMQ Streams Kafka Connect

显示以下指标:

  • 传入字节速率总数
  • 传出字节速率总数
  • 磁盘用量
  • 使用的 JVM 内存
  • JVM 垃圾收集时间
AMQ Streams Kafka MirrorMaker 2

显示以下指标:

  • 连接器数量
  • 任务数量
  • 传入字节速率总数
  • 传出字节速率总数
  • 磁盘用量
  • 使用的 JVM 内存
  • JVM 垃圾收集时间
AMQ Streams Kafka Bridge
请参阅 第 7.6 节 “监控 Kafka Bridge”
AMQ Streams Cruise Control
请参阅 第 7.7 节 “monitor 清理控制”
AMQ Streams Kafka Exporter
请参阅 第 7.5.5 节 “启用 Kafka Exporter Grafana 仪表板”

7.5. 添加 Kafka Exporter

Kafka Exporter 是一个开源项目,用于增强对 Apache Kafka 代理和客户端的监控。Kafka Exporter 附带 AMQ Streams 以用于部署 Kafka 集群,以便从 Kafka 代理中提取与偏移、消费者组、消费者滞后和主题相关的额外指标数据。

例如,使用指标数据来帮助识别慢速消费者。

滞后数据作为 Prometheus 指标公开,随后可在 Grafana 中提供以进行分析。

如果您已在使用 Prometheus 和 Grafana 监控内置 Kafka 指标,您可以将 Prometheus 配置为同时提取 Kafka Exporter Prometheus 端点。

AMQ Streams 在 示例/metrics/grafana-dashboards/strimzi-kafka-exporter.json 中包含 Kafka Exporter 仪表板示例。

7.5.1. 监控消费者滞后

消费滞后表明消息的生产和消耗速度不同。具体来说,消费者对给定消费者组的滞后指示分区中最后一条消息与该消费者当前采用的消息之间的延迟。

与分区日志末尾相比,滞后反映了消费者偏移的位置。

生产者和消费者偏移的消费者滞后情况

Consumer lag

这种差别有时被称为制作者偏移和消费者偏移之间的 delta : Kafka 代理主题分区中的读取和写入位置。

假设主题流 100 条消息秒。制作者偏移(主题分区头)和消费者最后的偏移之间有 1000 条消息的滞后意味着 10 秒的延迟。

监控消费者滞后的重要性

对于依赖(near)实时数据处理的应用程序,监控消费者滞后情况对于检查它是否不再太大至关重要。滞后越高,进程距离实时处理目标越高。

例如,消费者的滞后可能是由于消耗过多的陈旧数据而导致的,这些数据尚未清除,或者是通过计划外关机造成的。

减少消费者滞后

降低滞后率的典型操作包括:

  • 通过添加新用户来扩展消费者组
  • 增加消息的保留时间以保留在某个主题中
  • 添加更多磁盘容量来增加消息缓冲

降低消费者滞后问题的操作取决于底层基础架构,而 AMQ Streams 支持用例。例如,适应消费者不太可能从其磁盘缓存服务获取请求的代理中受益。在某些情况下,可以同意在消费者发现信息之前自动丢弃消息。

7.5.2. Kafka Exporter 警报规则示例

如果您执行了在部署中引入指标的步骤,您的 Kafka 集群将配置为使用支持 Kafka Exporter 的警报通知规则。

Kafka Exporter 的规则在 prometheus-rules.yaml 中定义,并使用 Prometheus 部署。如需更多信息,请参阅 Prometheus

特定于 Kafka Exporter 的警报通知规则示例如下:

UnderReplicatedPartition
警告主题被复制不足且代理没有复制到足够分区的警报。如果某个主题有一个或多个复制分区,则默认配置用于警报。该警报可能表示 Kafka 实例停机或 Kafka 集群已过载。可能需要计划重启 Kafka 代理,才能重新启动复制过程。
TooLargeConsumerGroupLag
警告使用者组的滞后对于特定主题分区来说太过大的警报。默认配置是 1000 记录。很大的滞后可能意味着消费者太慢,正在落于生产者后面。
NoMessageForTooLong
提醒主题在一段时间内未收到消息的警报。时间段的默认配置为 10 分钟。延迟可能是因为配置问题导致制作者无法将消息发布到该主题。

根据您的特定需求调整这些规则的默认配置。

7.5.3. 公开 Kafka Exporter 指标

Kafka Exporter 会将较慢的信息作为 Prometheus 指标在 Grafana 中公开。

Kafka Exporter 为代理、主题和消费者组公开指标数据。这些指标显示在 example strimzi-kafka-exporter 仪表板上。

此处描述了所提取的数据。

表 7.2. 代理指标输出

名称信息

kafka_brokers

Kafka 集群中代理数量

表 7.3. 主题指标输出

名称信息

kafka_topic_partitions

主题的分区数

kafka_topic_partition_current_offset

代理的当前主题分区偏移

kafka_topic_partition_oldest_offset

代理的最旧主题分区偏移

kafka_topic_partition_in_sync_replica

主题分区的同步副本数

kafka_topic_partition_leader

主题分区的领导代理 ID

kafka_topic_partition_leader_is_preferred

显示 1 如果主题分区正在使用首选代理

kafka_topic_partition_replicas

此主题分区的副本数

kafka_topic_partition_under_replicated_partition

如果主题分区复制不足,显示 1

表 7.4. 消费者组指标输出

名称信息

kafka_consumergroup_current_offset

消费者组的当前主题分区偏移

kafka_consumergroup_lag

某个消费者小组在某个主题分区的近似滞后情况

只有至少有一个消费者组的滞后大于零时,才会在 Kafka Exporter 仪表板中显示消费者组指标。

7.5.4. 配置 Kafka Exporter

此流程演示了如何通过 Kafka Exporter 属性在 Kafka 资源中配置 Kafka Exporter

有关配置 Kafka 资源的更多信息,请参阅 OpenShift 中使用 AMQ Streams 中的 Kafka 集群配置

此流程中显示了与 Kafka Exporter 配置相关的属性。

您可以将这些属性配置为 Kafka 集群的部署或重新部署的一部分。

先决条件

  • OpenShift 集群
  • 一个正在运行的 Cluster Operator

流程

  1. 编辑 Kafka 资源的 Kafka Exporter 属性。

    您可以配置的属性显示在此示例配置中:

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    metadata:
      name: my-cluster
    spec:
      # ...
      kafkaExporter:
        image: my-registry.io/my-org/my-exporter-cluster:latest 1
        groupRegex: ".*" 2
        topicRegex: ".*" 3
        resources: 4
          requests:
            cpu: 200m
            memory: 64Mi
          limits:
            cpu: 500m
            memory: 128Mi
        logging: debug 5
        enableSaramaLogging: true 6
        template: 7
          pod:
            metadata:
              labels:
                label1: value1
            imagePullSecrets:
              - name: my-docker-credentials
            securityContext:
              runAsUser: 1000001
              fsGroup: 0
            terminationGracePeriodSeconds: 120
        readinessProbe: 8
          initialDelaySeconds: 15
          timeoutSeconds: 5
        livenessProbe: 9
          initialDelaySeconds: 15
          timeoutSeconds: 5
    # ...
    1
    ADVANCED OPTION:容器镜像配置,只在特殊情况下推荐这样做
    2
    用于指定要包含在指标中的使用者组的正则表达式。
    3
    用于指定要包含在指标中的主题的正则表达式。
    4
    5
    记录配置,记录具有给定严重性(debug、info、warn、error、fatal)的消息。
    6
    用于启用 Sarama 日志记录的布尔值,Kafka Exporter 使用的 Go 客户端库。
    7
    8
    9
  2. 创建或更新资源:

    oc apply -f kafka.yaml

接下来要做什么

配置和部署 Kafka Exporter 后,您可以 启用 Grafana 来显示 Kafka Exporter 仪表板

7.5.5. 启用 Kafka Exporter Grafana 仪表板

AMQ Streams 为 Grafana 提供了仪表板配置文件示例。Kafka Exporter 仪表板作为 JSON 文件在 example/metrics 目录中提供:

  • strimzi-kafka-exporter.json

如果使用 Kafka 集群部署了 Kafka Exporter,您可以视觉化它在 Grafana 仪表板中公开的指标数据。

此流程假设您已有权访问 Grafana 用户界面,Prometheus 已添加为数据源。如果您是第一次访问用户界面,请参阅 Grafana

流程

  1. 访问 Grafana 用户界面
  2. 选择 Strimzi Kafka Exporter 仪表板。

    收集指标数据一段时间后,会填充 Kafka Exporter chart。

    AMQ Streams Kafka Exporter

    显示以下指标:

    • 主题数量
    • 分区数
    • 副本数
    • 同步副本数
    • 复制不足的分区数
    • 至少在同步副本数中的分区
    • 在同步副本数中最小分区
    • 不在首选节点上的分区
    • 每秒来自主题的消息
    • 每秒从主题中消耗的消息
    • 消费者组每分钟使用的消息
    • 消费组的滞后
    • 分区数
    • 最新偏移
    • 最旧的偏移

使用 Grafana 图表分析滞后情况,检查减少滞后问题的操作是否对受影响的消费者组产生影响。例如,如果对 Kafka 代理进行调整以减少滞后,仪表板将显示 Lag by consumer 组 图表停机,并且 每分钟消耗的 Messages 都会增加。

7.6. 监控 Kafka Bridge

如果您已在使用 Prometheus 和 Grafana 监控内置 Kafka 指标,您可以将 Prometheus 配置为也提取 Kafka Bridge Prometheus 端点。

Kafka Bridge 的 Grafana 仪表板示例提供:

  • 有关 HTTP 连接和不同端点相关请求的信息
  • 有关网桥使用的 Kafka 消费者和生产者的信息
  • 网桥本身的 JVM 指标

7.6.1. 配置 Kafka 网桥

您可以使用 enableMetrics 属性在 KafkaBridge 资源中启用 Kafka Bridge 指标。

您可以将此属性配置为 Kafka Bridge 部署或重新部署的一部分。

例如:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaBridge
metadata:
  name: my-bridge
spec:
  # ...
  bootstrapServers: my-cluster-kafka:9092
  http:
    # ...
  enableMetrics: true
  # ...

7.6.2. 启用 Kafka Bridge Grafana 仪表板

如果使用 Kafka 集群部署 Kafka Bridge,您可以启用 Grafana 以显示它公开的指标数据。

example /metrics 目录中以 JSON 文件形式提供了 Kafka Bridge 仪表板:

  • strimzi-kafka-bridge.json

当收集指标数据一段时间后,会填充 Kafka Bridge chart。

Kafka Bridge

显示以下指标:

  • 到 Kafka 网桥计数的 HTTP 连接
  • 处理中的 HTTP 请求数
  • 每秒处理的请求按 HTTP 方法分组
  • 按响应代码分组的总请求率(2XX、4XX、5XX)
  • 每秒接收和发送的字节数
  • 对每个 Kafka Bridge 端点的请求
  • Kafka Bridge 使用的 Kafka 用户、生产者和相关打开连接的数量
  • Kafka 制作者:

    • 每秒发送的记录平均数量(按主题分组)
    • 每秒发送到所有代理的传出字节数(按主题分组)
    • 导致错误的每秒记录平均数量(按主题分组)
  • Kafka 用户:

    • 每秒消耗的平均记录数(按 clientId-topic 进行分组)
    • 每秒消耗的平均字节数(按 clientId-topic 进行分组)
    • 分配分区(由 clientId 进行分组)
  • 使用的 JVM 内存
  • JVM 垃圾收集时间
  • JVM 垃圾回收计数

7.7. monitor 清理控制

如果您已在使用 Prometheus 和 Grafana 监控内置 Kafka 指标,您可以将 Prometheus 配置为还提取 Cruise Control Prometheus 端点。

Cruise Control 的 Grafana 仪表板示例提供:

  • 有关优化计算、目标违反、集群均衡等的信息
  • 有关重新平衡调整和实际重新平衡操作的 REST API 调用的信息
  • 来自清理控制本身的 JVM 指标

7.7.1. 配置清理控制

使用 Kafka 资源中的 cruise Control.metricsConfig 属性启用 Cruise Control 指标,以提供对包含要公开的指标的 JMX 导出器配置的 ConfigMap 的引用。

例如:

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  # ...
  kafka:
    # ...
  zookeeper:
    # ...
  cruiseControl:
    metricsConfig:
       type: jmxPrometheusExporter
       valueFrom:
         configMapKeyRef:
           name: my-config-map
           key: my-key

7.7.2. 启用 Cruise Control Grafana 仪表板

如果您使用启用了指标的 Kafka 集群部署了 Cruise Control,您可以启用 Grafana 呈现它公开的指标数据。

example /metrics 目录中以 JSON 文件的形式提供了一个 Cruise Control 仪表板:

  • strimzi-cruise-control.json

收集指标数据一段时间后,会填充 Cruise Control chart。

Sything Control

显示以下指标:

  • Cruise Control 监控的快照窗口数量
  • 有效时间窗数量,因为它们包含足够的样本来计算优化建议
  • 运行的连续执行数量,用于调整或重新平衡
  • 由 Cruise Control 的异常检测器组件计算的 Kafka 集群的当前平衡度分数(默认为 5 分钟)
  • 受监控分区的百分比
  • 异常检测器报告的目标违反数量(默认为 5 分钟)
  • 代理中磁盘读取失败的频率
  • 指标示例获取失败的速度
  • 计算优化建议所需的时间
  • 创建集群模型所需的时间
  • 通过 Cruise Control REST API 进行请求或实际重新平衡请求的频率
  • 通过 Cruise Control REST API 请求整个集群状态和用户任务状态的频率
  • 使用的 JVM 内存
  • JVM 垃圾收集时间
  • JVM 垃圾回收计数

第 8 章 升级 AMQ Streams

AMQ Streams 可以升级至 1.8 版本,以利用新功能、增强功能、性能改进和安全选项。

作为升级的一部分,您要将 Kafka 升级到最新支持的版本。每个 Kafka 版本都会为您的 AMQ Streams 部署带来新功能、改进和程序错误修正。

如果您遇到较新版本的问题,AMQ Streams 可以 降级 到以前的版本。

发布的 AMQ Streams 版本列在红帽客户门户网站 的产品下载部分

升级路径

可以使用两种升级路径:

incremental
将 AMQ Streams 从以前的次版本升级到 1.8 版本。
multi-version

在单个升级(跳过一个或多个中间版本)中将 AMQ Streams 从旧版本升级到 1.8。

例如,从 AMQ Streams 1.5 直接升级到 AMQ Streams 1.7。

从早于 1.7 的版本升级

AMQ Streams 1.7 中引入了所有自定义资源的 v1beta2 API 版本。对于 AMQ Streams 1.8,v1alpha1v1beta1 API 版本已从除 KafkaTopic 和 Kafka User 以外的所有 AMQ Streams 自定义资源中删除。

如果您要从 1.7 版本之前的 AMQ Streams 版本升级:

  1. 将 AMQ Streams 升级到 1.7
  2. 将自定义资源转换为 v1beta2
  3. 将 AMQ Streams 升级到 1.8 或更新版本
注意

作为替代方案,您可以从 1.7 版本安装自定义资源,转换资源,然后升级到 1.8 或更高版本。

kafka 版本支持

Kafka 版本 表列出了 AMQ Streams 1.8 支持的 Kafka 版本。在表中:

  • 生产环境支持 最新的 Kafka 版本。
  • 以前的 Kafka 版本只支持升级到 AMQ Streams 1.8。

在开始 AMQ Streams 升级过程前,决定要升级到的 Kafka 版本。

注意

只要您的 AMQ Streams 版本支持,就可以升级到更高的 Kafka 版本。在某些情况下,您还可以降级到以前支持的 Kafka 版本。

停机时间和可用性

如果针对高可用性配置了主题,则升级 AMQ Streams 应该不会给消费者和生产者造成从这些主题中发布和读取数据的停机时间。高可用性主题的复制因子至少为 3 个,分区在代理之间均匀分布。

升级 AMQ Streams 会触发滚动更新,所有代理都会在流程的不同阶段重新启动。在滚动更新过程中,并非所有代理都在线,因此 集群的整体可用性 会临时降低。减少集群可用性会增加代理失败会导致信息丢失的几率。

8.1. 所需的升级顺序

要在没有停机的情况下升级代理和客户端,您必须 按照以下顺序完成 Strimzi 升级过程:

  1. 更新现有的自定义资源,以支持 v1beta2 API 版本。

    在升级到 AMQ Streams 1.7 后进行此操作,但在升级到 AMQ Streams 1.8 或更高版本之前。对于从版本 1.7 之前的多版本升级:

    1. 跳过此步骤,再继续下列步骤以升级到 1.7 版本。
    2. 返回到此步骤并执行升级部分中的所有步骤,以升级到 1.8 或更新版本。
  2. 将 Cluster Operator 更新至新的 AMQ Streams 版本。

    您采取的方法取决于如何 部署 Cluster Operator

    • 如果使用安装 YAML 文件部署 Cluster Operator,请通过修改 Operator 安装文件来执行升级,如 升级 Cluster Operator 中所述。
    • 如果您从 OperatorHub 部署 Cluster Operator,请使用 Operator Lifecycle Manager(OLM)将 AMQ Streams Operator 的更新频道改为新的 AMQ Streams 版本。

      根据您选择的升级策略,在更新频道后,可以:

      • 启动自动升级
      • 安装开始前需要批准手动升级

        如需有关使用 OperatorHub 升级 Operator 的更多信息,请参阅 OpenShift 文档中的 升级已安装的 Operator

  3. 将所有 Kafka 代理和客户端应用程序升级到最新支持的 Kafka 版本。

可选:增量合作重新平衡升级

考虑升级消费者和 Kafka Streams 应用程序,以使用 增量合作重新平衡 协议进行分区重新平衡。

8.2. AMQ Streams 自定义资源升级

在将 AMQ Streams 升级到 1.8 之前,您必须确保自定义资源使用 API 版本 v1beta2。您可以在升级到 AMQ Streams 1.7 后随时执行此操作,但升级必须在升级到 AMQ Streams 1.8 或更新版本前完成。

重要

在升级 Cluster Operator 前,必须将 自定义资源升级到 v1beta2,以便 Cluster Operator 可以了解资源。

注意

将自定义资源升级到 v1beta2 准备 AMQ Streams 以迁移到 OpenShift CRD v1,OpenShift v 1.22 需要该 v1。

CLI 升级到自定义资源

AMQ Streams 提供了 API 转换工具 及其发行工件。

您可以从 AMQ Streams 下载网站下载 其 ZIP 或 TAR.GZ。要使用 工具,请将其解压缩并使用 bin 目录中的脚本。

在 CLI 中,您可以使用该工具以以下两种方式之一将自定义资源格式转换为 v1beta2

转换自定义资源后,您必须将 v1beta2 设置为 CRD 中的存储 API 版本:

手动升级到自定义资源

您可以手动更新每个自定义资源以使用 v1beta2,而不是使用 API 转换工具将自定义资源更新为 v1beta2

更新 Kafka 自定义资源,包括其他组件的配置:

更新应用到您的部署的其他资源:

手动步骤显示对每个自定义资源所做的更改。在进行这些更改后,您必须使用 API 转换工具来升级 CRD。

8.2.1. API 版本控制

自定义资源通过 CRD 添加到 OpenShift 中的 API 进行编辑和控制。换言之,CRD 扩展 Kubernetes API 以允许创建自定义资源。CRD 本身是 OpenShift 中的资源。它们安装在 OpenShift 集群中,用于定义自定义资源的 API 版本。每个自定义资源 API 版本都可以为该版本定义自己的 schema。OpenShift 客户端(包括 AMQ Streams Operators)使用 URL 路径(API 路径)访问 KubernetesAPI 服务器提供的自定义资源,其中包括 API 版本。

v1beta2 的引入更新自定义资源的 schema。v1alpha1v1beta1 版本已被删除。

v1alpha1 API 版本不再用于以下 AMQ Streams 自定义资源:

  • kafka
  • KafkaConnect
  • KafkaConnectS2I
  • KafkaConnector
  • KafkaMirrorMaker
  • KafkaMirrorMaker2
  • KafkaTopic
  • KafkaUser
  • KafkaBridge
  • KafkaRebalance

v1beta1 API 版本不再用于以下 AMQ Streams 自定义资源:

  • kafka
  • KafkaConnect
  • KafkaConnectS2I
  • KafkaMirrorMaker
  • KafkaTopic
  • KafkaUser

8.2.2. 使用 API 转换工具转换自定义资源配置文件

此流程描述了如何使用 API 转换工具将描述 AMQ Streams 自定义资源配置的 YAML 文件转换为适用于 v1beta2 的格式。为此,请使用 convert-file (cf)命令。

convert-file 命令可以转换包含多个文档的 YAML 文件。对于多文档 YAML 文件,其包含的所有 AMQ Streams 自定义资源都会转换。在转换的输出文件中不修改任何非 AMQ Streams OpenShift 资源。

转换 YAML 文件后,您必须应用配置来更新集群中的自定义资源。或者,如果 GitOps 同步机制用于集群上的更新,您可以使用 GitOps 同步机制应用更改。只有在 OpenShift 集群中更新自定义资源时,转换才会完成。

另外,您可以使用 convert-resource 流程直接转换自定义资源

先决条件

  • 支持 v1beta2 API 版本的一个 Cluster Operator 已启动且正在运行。
  • API 转换工具,随发行工件一起提供。
  • 工具需要 Java 11。

使用 CLI 帮助来了解更多有关 API 转换工具的信息,以及 convert-file 命令可用的标记:

bin/api-conversion.sh help
bin/api-conversion.sh help convert-file

如果您使用 Windows,请对此流程使用 bin/api-conversion.cmd

表 8.1. YAML 文件转换标志

标志描述

-f,--file=NAME-OF-YAML-FILE

指定正在转换的 AMQ Streams 自定义资源的 YAML 文件

-o, --output=NAME-OF-CONVERTED-YAML-FILE

为转换的自定义资源创建输出 YAML 文件

--in-place

使用转换的 YAML 更新原始源文件

流程

  1. 使用 convert-file 命令和相应的标志运行 API 转换工具。

    示例 1,转换 YAML 文件并显示输出,但文件没有改变:

    bin/api-conversion.sh convert-file --file input.yaml

    示例 2 转换 YAML 文件,并将更改写入原始源文件中:

    bin/api-conversion.sh convert-file --file input.yaml --in-place

    示例 3,转换 YAML 文件,并将更改写入新的输出文件中:

    bin/api-conversion.sh convert-file --file input.yaml --output output.yaml
  2. 使用转换的配置文件更新自定义资源。

    oc apply -f CONVERTED-CONFIG-FILE
  3. 验证自定义资源是否已转换。

    oc get KIND CUSTOM-RESOURCE-NAME -o yaml

8.2.3. 使用 API 转换工具直接转换自定义资源

此流程描述了如何使用 API 转换工具将 OpenShift 集群中的 AMQ Streams 自定义资源直接转换为适用于 v1beta2 的格式。为此,请使用 convert-resource (cr)命令。命令使用 Kubernetes API 进行转换。

您可以根据 kind 属性指定一个或多个 AMQ Streams 自定义资源类型,也可以转换所有类型。您还可以将特定命名空间或所有命名空间用于转换。以命名空间为目标时,您可以转换该命名空间中的所有自定义资源,或通过指定名称和类型来转换单个自定义资源。

另外,您可以使用 convert-file 流程转换和应用描述自定义资源的 YAML 文件

先决条件

  • 支持 v1beta2 API 版本的一个 Cluster Operator 已启动且正在运行。
  • API 转换工具,随发行工件一起提供。
  • 该工具需要 Java 11(OpenJDK)。
  • 步骤需要一个具有 RBAC 权限的用户 admin 帐户,以便:

    • 使用 --name 选项获取转换的 AMQ Streams 自定义资源
    • 在不使用 --name 选项的情况下列出正在转换的 AMQ Streams 自定义资源
    • 替换正在转换的 AMQ Streams 自定义资源

使用 CLI 帮助来了解更多有关 API 转换工具的信息,以及 convert-resource 命令可用的标记:

bin/api-conversion.sh help
bin/api-conversion.sh help convert-resource

如果您使用 Windows,请对此流程使用 bin/api-conversion.cmd

表 8.2. 用于转换自定义资源的标记

标志描述

-k,--kind

指定要转换的自定义资源种类,或者在没有指定的情况下转换所有资源

-a,--all-namespaces

转换所有命名空间中的自定义资源

-n,--namespace

指定 OpenShift 命名空间或 OpenShift 项目,否则使用当前命名空间(如果未指定)

--name

如果使用 --namespace 和单个自定义资源 --kind,请指定正在转换的自定义资源的名称

流程

  1. 使用 convert-resource 命令和相应的标志运行 API 转换工具。

    示例 1,转换当前命名空间中的所有 AMQ Streams 资源:

    bin/api-conversion.sh convert-resource

    示例 2 转换所有命名空间中的所有 AMQ Streams 资源:

    bin/api-conversion.sh convert-resource --all-namespaces

    示例 3,转换 my-kafka 命名空间中的所有 AMQ Streams 资源:

    bin/api-conversion.sh convert-resource --namespace my-kafka

    示例 4,仅转换所有命名空间中的 Kafka 资源:

    bin/api-conversion.sh convert-resource --all-namespaces --kind Kafka

    例如,在所有命名空间中转换 Kafka 和 Kafka Connect 资源:

    bin/api-conversion.sh convert-resource --all-namespaces --kind Kafka --kind KafkaConnect

    示例 6,在 my- kafka 命名空间中转换名为 my- cluster 的 Kafka 自定义资源:

    bin/api-conversion.sh convert-resource --kind Kafka --namespace my-kafka --name my-cluster
  2. 验证自定义资源是否已转换。

    oc get KIND CUSTOM-RESOURCE-NAME -o yaml

8.2.4. 使用 API 转换工具将 CRD 升级到 v1beta2

此流程描述了如何使用 API 转换工具转换 CRD,以适用于 v1beta2 的格式转换定义用于实例化和管理 AMQ Streams 特定资源的 schema。为此,请使用 crd-upgrade 命令。

在将整个 OpenShift 集群中的所有 AMQ Streams 自定义资源转换为 v1beta2 后执行此步骤。如果您首先升级 CRD,然后转换自定义资源,则需要再次运行这个命令。

该命令更新了 CRD 中的 spec.versions,以声明 v1beta2 作为 存储 API 版本。该命令还会更新自定义资源,使其存储在 v1beta2 下。新的自定义资源实例是从存储 API 版本规格创建的,因此只有一个 API 版本被标记为存储版本。

当您升级 CRD 以使用 v1beta2 作为存储版本时,您应该仅在自定义资源中使用 v1beta2 属性。

先决条件

  • 支持 v1beta2 API 版本的一个 Cluster Operator 已启动且正在运行。
  • API 转换工具,随发行工件一起提供。
  • 该工具需要 Java 11(OpenJDK)。
  • 自定义资源已转换为 v1beta2
  • 步骤需要一个具有 RBAC 权限的用户 admin 帐户,以便:

    • 列出所有命名空间中的 AMQ Streams 自定义资源
    • 替换正在转换的 AMQ Streams 自定义资源
    • 更新 CRD
    • 替换 CRD 的状态

使用 CLI 帮助来了解更多有关 API 转换工具的信息:

bin/api-conversion.sh help

如果您使用 Windows,请对此流程使用 bin/api-conversion.cmd

流程

  1. 如果您还没有这样做,请将自定义资源转换为使用 v1beta2

    您可以使用 API 转换工具以以下两种方式之一完成此操作:

  2. 使用 crd-upgrade 命令运行 API 转换工具。

    bin/api-conversion.sh crd-upgrade
  3. 验证 CRD 是否已升级,以便 v1beta2 是存储版本。

    例如,对于 Kafka 主题 CRD:

    apiVersion: kafka.strimzi.io/v1beta2
    kind: CustomResourceDefinition
    metadata:
      name: kafkatopics.kafka.strimzi.io
      #...
    spec:
      group: kafka.strimzi.io
      #...
      versions:
      - name: v1beta2
        served: true
        storage: true
        #...
    status:
      #...
      storedVersions:
      - v1beta2

8.2.5. 升级 Kafka 资源以支持 v1beta2

先决条件

  • 支持 v1beta2 API 版本的一个 Cluster Operator 已启动且正在运行。

流程

对部署中的每个 Kafka 自定义资源执行以下步骤。

  1. 更新编辑器中的 Kafka 自定义资源。

    oc edit kafka KAFKA-CLUSTER
  2. 如果您还没有这样做,请将 update .spec.kafka.listener 更新为新的通用监听程序格式,如 第 8.2.6 节 “将监听程序更新到通用监听器配置” 所述。

    警告

    API 版本 v1beta2 不支持旧的监听程序格式。

  3. 如果存在,将 关联性.spec.kafka.affinity 移到 .spec.kafka.template.pod.affinity
  4. 如果存在,将 容限.spec.kafka.tolerations 移到 .spec.kafka.template.pod.tolerations
  5. 如果存在,则删除 .spec.kafka.template.tlsSidecarContainer
  6. 如果存在,则删除 .spec.kafka.tlsSidecarContainer
  7. 如果存在以下策略配置:

    • .spec.kafka.template.externalBootstrapService.externalTrafficPolicy
    • .spec.kafka.template.perPodService.externalTrafficPolicy

      1. 将配置移动到 .spec.ka.listeners[].configuration.externalTrafficPolicy,两者都用于 loadbalancertype: nodeport 侦听器。
      2. remove .spec.kafka.template.externalBootstrapService.externalTrafficPolicy.spec.kafka.template.perPodService.externalTrafficPolicy.
  8. 如果存在以下 负载均衡器监听程序 配置之一:

    • .spec.kafka.template.externalBootstrapService.loadBalancerSourceRanges
    • .spec.kafka.template.perPodService.loadBalancerSourceRanges

      1. 将配置移到 .spec.ka.kafka.listeners[].configuration.loadBalancerSourceRanges,用于 type: loadbalancer 侦听器。
      2. remove .spec.kafka.template.externalBootstrapService.loadBalancerSourceRanges.spec.kafka.template.perPodService.loadBalancerSourceRanges.
  9. 如果 type: 外部 日志记录配置了 in .spec.kafka.logging

    替换包含日志配置的 ConfigMap 名称

    logging:
      type: external
      name: my-config-map

    使用 valueFrom.configMapKeyRef 字段,并指定将日志记录存储在的 ConfigMap 名称和

    logging:
      type: external
      valueFrom:
        configMapKeyRef:
          name: my-config-map
          key: log4j.properties
  10. 如果使用 .spec.kafka.metrics 字段来启用指标数据:

    1. 创建一个将 JMX Prometheus 导出器的 YAML 配置存储到一个键下的新 ConfigMap。YAML 必须与 .spec.kafka.metrics 字段中当前的内容匹配。

      kind: ConfigMap
      apiVersion: v1
      metadata:
        name: kafka-metrics
        labels:
          app: strimzi
      data:
        kafka-metrics-config.yaml: |
            <YAML>
    2. 添加指向 ConfigMap 和键的 a .spec.kafka.metricsConfig 属性:

      metricsConfig:
        type: jmxPrometheusExporter
        valueFrom:
          configMapKeyRef:
            name: kafka-metrics
            key: kafka-metrics-config.yaml
    3. 删除 old .spec.kafka.metrics 字段。
  11. 保存文件,退出编辑器并等待更新的自定义资源得到协调。

接下来要做什么

对于每个 Kafka 自定义资源,升级 ZooKeeper、Topic Operator、实体 Operator 和 Cruise Control(如果已部署)的配置来支持 v1beta2 版本。这在以下步骤中进行了说明。

当所有 Kafka 配置都更新为支持 v1beta2 时,您可以将 Kafka 自定义资源升级到 v1beta2

8.2.6. 将监听程序更新到通用监听器配置

AMQ Streams 提供 GenericKafkaListener 模式,用于在 Kafka 资源 中配置 Kafka 侦听器。

GenericKafkaListener 替换 KafkaListeners 模式,它已从 AMQ Streams 中删除。

使用 GenericKafkaListener 模式时,您可以根据需要配置多个监听器,只要它们的名称和端口是唯一的。监听器 配置被定义为数组,但已弃用的格式也受到支持。

对于 OpenShift 集群内的客户端,您可以创建 普通 (无需加密)或 tls 内部 侦听器。

对于 OpenShift 集群外的客户端,您可以创建 外部 侦听器并指定连接机制,可以是 节点端口负载均衡器、 入口路由

KafkaListeners 模式将子属性用于 纯文本tls 和外部 监听程序,以及每个节点的固定端口。在升级过程的任何阶段,您必须将使用 KafkaListeners 模式配置的监听程序转换为 GenericKafkaListener 模式的格式。

例如,如果您当前在 Kafka 配置中使用以下配置:

旧监听程序配置

listeners:
  plain:
    # ...
  tls:
    # ...
  external:
    type: loadbalancer
    # ...

使用以下方法将监听程序转换为新格式:

新的监听程序配置

listeners:
  #...
  - name: plain
    port: 9092
    type: internal
    tls: false 1
  - name: tls
    port: 9093
    type: internal
    tls: true
  - name: external
    port: 9094
    type: EXTERNAL-LISTENER-TYPE 2
    tls: true

1
现在,所有监听器都需要 TLS 属性。
2
选项: 入口负载均衡器、 节点端口路由

确保使用显示 的确切 名称和端口号。

对于任何其他 配置覆盖 使用旧格式的属性,您需要将它们更新为新格式。

对监听程序 配置 进行了更改:

  • 覆盖configuration 部分合并
  • dnsAnnotations 已重命名为 注解
  • preferredAddressType has been renamed preferredNodePortAddressType
  • 地址 已被重命名为 alternativesNames
  • LoadBalancerSourceRanges 和 external TrafficPolicy 移至现在已弃用 的模板中的监听程序配置

例如,这个配置:

旧的其他监听程序配置

listeners:
  external:
    type: loadbalancer
    authentication:
      type: tls
    overrides:
      bootstrap:
        dnsAnnotations:
          #...

更改:

新的附加监听程序配置

listeners:
    #...
  - name: external
    port: 9094
    type:loadbalancer
    tls: true
    authentication:
      type: tls
    configuration:
      bootstrap:
        annotations:
          #...

重要

新监听器配置中显示的名称和端口号 必须 用于向后兼容。使用任何其他值将导致 Kafka 侦听器和 OpenShift 服务重命名。

有关各种侦听器可用的配置选项的更多信息,请参阅 GenericKafkaListener 模式参考

8.2.7. 升级 ZooKeeper 以支持 v1beta2

先决条件

  • 支持 v1beta2 API 版本的一个 Cluster Operator 已启动且正在运行。

流程

对部署中的每个 Kafka 自定义资源执行以下步骤。

  1. 更新编辑器中的 Kafka 自定义资源。

    oc edit kafka KAFKA-CLUSTER
  2. 如果存在,将 关联性.spec.zookeeper.affinity 移入 .spec.zookeeper.template.pod.affinity
  3. 如果存在,请将 容限.spec.zookeeper.tolerations 移到 .spec.zookeeper.template.pod.tolerations
  4. 如果存在,则删除 .spec.zookeeper.template.tlsSidecarContainer
  5. 如果存在,则 remove .spec.zookeeper.tlsSidecarContainer
  6. 如果 type: 外部 日志记录配置了 in .spec.kafka.logging

    替换包含日志配置的 ConfigMap 名称

    logging:
      type: external
      name: my-config-map

    使用 valueFrom.configMapKeyRef 字段,并指定将日志记录存储在的 ConfigMap 名称和

    logging:
      type: external
      valueFrom:
        configMapKeyRef:
          name: my-config-map
          key: log4j.properties
  7. 如果使用 .spec.zookeeper.metrics 字段来启用指标数据:

    1. 创建一个将 JMX Prometheus 导出器的 YAML 配置存储到一个键下的新 ConfigMap。YAML 必须与 .spec.zookeeper.metrics 字段中当前的内容匹配。

      kind: ConfigMap
      apiVersion: v1
      metadata:
        name: kafka-metrics
        labels:
          app: strimzi
      data:
        zookeeper-metrics-config.yaml: |
            <YAML>
    2. 添加指向 ConfigMap 和键的 a .spec.zookeeper.metricsConfig 属性:

      metricsConfig:
        type: jmxPrometheusExporter
        valueFrom:
          configMapKeyRef:
            name: kafka-metrics
            key: zookeeper-metrics-config.yaml
    3. 删除 old .spec.zookeeper.metrics 字段。
  8. 保存文件,退出编辑器并等待更新的自定义资源得到协调。

8.2.8. 升级主题 Operator 以支持 v1beta2

先决条件

  • 支持 v1beta2 API 版本的一个 Cluster Operator 已启动且正在运行。

流程

对部署中的每个 Kafka 自定义资源执行以下步骤。

  1. 更新编辑器中的 Kafka 自定义资源。

    oc edit kafka KAFKA-CLUSTER
  2. 如果使用 Kafka.spec.topicOperator:

    1. 关联性.spec.topicOperator.affinity 移到 .spec.entityOperator.template.pod.affinity
    2. 容限.spec.topicOperator.tolerations 移到 .spec.entityOperator.template.pod.tolerations
    3. move .spec.topicOperator.tlsSidecar to .spec.entityOperator.tlsSidecar.
    4. 移动 关联性容限tlsSidecar 后,将剩余的配置 in .spec.topicOperator 移到 .spec.entityOperator.topicOperator
  3. 如果 type: 外部 日志记录配置了 in .spec.topicOperator.logging:

    替换包含日志配置的 ConfigMap 名称

    logging:
      type: external
      name: my-config-map

    使用 valueFrom.configMapKeyRef 字段,并指定将日志记录存储在的 ConfigMap 名称和

    logging:
      type: external
      valueFrom:
        configMapKeyRef:
          name: my-config-map
          key: log4j2.properties
    注意

    您还可以在 Entity Operator 升级 过程中完成这个步骤。

  4. 保存文件,退出编辑器并等待更新的自定义资源得到协调。

8.2.9. 升级实体 Operator 以支持 v1beta2

先决条件

流程

对部署中的每个 Kafka 自定义资源执行以下步骤。

  1. 更新编辑器中的 Kafka 自定义资源。

    oc edit kafka KAFKA-CLUSTER
  2. 关联性.spec.entityOperator.affinity 移到 .spec.entityOperator.template.pod.affinity
  3. 容限.spec.entityOperator.tolerations 移到 .spec.entityOperator.template.pod.tolerations
  4. 如果 类型: 外部 日志记录被配置为 in .spec.entityOperator.userOperator.logging.spec.entityOperator.topicOperator.logging:

    替换包含日志配置的 ConfigMap 名称

    logging:
      type: external
      name: my-config-map

    使用 valueFrom.configMapKeyRef 字段,并指定将日志记录存储在的 ConfigMap 名称和

    logging:
      type: external
      valueFrom:
        configMapKeyRef:
          name: my-config-map
          key: log4j2.properties
  5. 保存文件,退出编辑器并等待更新的自定义资源得到协调。

8.2.10. 升级 Cruise Control 以支持 v1beta2

先决条件

流程

对 Kafka 集群中的每个 Kafka.spec.cruiseControl 配置执行以下步骤。

  1. 更新编辑器中的 Kafka 自定义资源。

    oc edit kafka KAFKA-CLUSTER
  2. 如果 type: 外部 日志记录被配置为 in .spec.cruiseControl.logging

    替换包含日志配置的 ConfigMap 名称

    logging:
      type: external
      name: my-config-map

    使用 valueFrom.configMapKeyRef 字段,并指定将日志记录存储在的 ConfigMap 名称和

    logging:
      type: external
      valueFrom:
        configMapKeyRef:
          name: my-config-map
          key: log4j2.properties
  3. 如果使用 .spec.cruiseControl.metrics 字段来启用指标数据:

    1. 创建一个将 JMX Prometheus 导出器的 YAML 配置存储到一个键下的新 ConfigMap。YAML 必须与 .spec.cruiseControl.metrics 字段中当前的内容匹配。

      kind: ConfigMap
      apiVersion: v1
      metadata:
        name: kafka-metrics
        labels:
          app: strimzi
      data:
        cruise-control-metrics-config.yaml: |
            <YAML>
    2. 添加指向 ConfigMap 和键的 a .spec.cruiseControl.metricsConfig 属性:

      metricsConfig:
        type: jmxPrometheusExporter
        valueFrom:
          configMapKeyRef:
            name: kafka-metrics
            key: cruise-control-metrics-config.yaml
    3. 删除 old .spec.cruiseControl.metrics 字段。
  4. 保存文件,退出编辑器并等待更新的自定义资源得到协调。

8.2.11. 将 Kafka 资源的 API 版本升级到 v1beta2

先决条件

流程

对部署中的每个 Kafka 自定义资源执行以下步骤。

  1. 更新编辑器中的 Kafka 自定义资源。

    oc edit kafka KAFKA-CLUSTER
  2. Kafka 自定义资源的 apiVersion 更新为 v1beta 2:

    替换:

    apiVersion: kafka.strimzi.io/v1beta1

    使用:

    apiVersion: kafka.strimzi.io/v1beta2
  3. 保存文件,退出编辑器并等待更新的自定义资源得到协调。

8.2.12. 将 Kafka Connect 资源升级到 v1beta2

先决条件

  • 支持 v1beta2 API 版本的一个 Cluster Operator 已启动且正在运行。

流程

对部署中的每个 KafkaConnect 自定义资源执行以下步骤。

  1. 更新编辑器中的 KafkaConnect 自定义资源。

    oc edit kafkaconnect KAFKA-CONNECT-CLUSTER
  2. 如果存在,移动:

    KafkaConnect.spec.affinity
    KafkaConnect.spec.tolerations

    改为:

    KafkaConnect.spec.template.pod.affinity
    KafkaConnect.spec.template.pod.tolerations

    例如,移动:

    spec:
      # ...
      affinity:
        # ...
      tolerations:
        # ...

    改为:

    spec:
      # ...
      template:
        pod:
          affinity:
            # ...
          tolerations:
            # ...
  3. 如果 type: 外部 日志记录配置了 in .spec.logging:

    替换包含日志配置的 ConfigMap 名称

    logging:
      type: external
      name: my-config-map

    使用 valueFrom.configMapKeyRef 字段,并指定将日志记录存储在的 ConfigMap 名称和

    logging:
      type: external
      valueFrom:
        configMapKeyRef:
          name: my-config-map
          key: log4j.properties
  4. 如果使用 .spec.metrics 字段来启用指标:

    1. 创建一个将 JMX Prometheus 导出器的 YAML 配置存储到一个键下的新 ConfigMap。YAML 必须与 .spec.metrics 字段中当前的内容匹配。

      kind: ConfigMap
      apiVersion: v1
      metadata:
        name: kafka-connect-metrics
        labels:
          app: strimzi
      data:
        connect-metrics-config.yaml: |
            <YAML>
    2. 添加指向 ConfigMap 和键的 a .spec.metricsConfig 属性:

      metricsConfig:
        type: jmxPrometheusExporter
        valueFrom:
          configMapKeyRef:
            name: kafka-connect-metrics
            key: connect-metrics-config.yaml
    3. 删除 old .spec.metrics 字段。
  5. KafkaConnect 自定义资源的 apiVersion 更新为 v1beta2:

    替换:

    apiVersion: kafka.strimzi.io/v1beta1

    使用:

    apiVersion: kafka.strimzi.io/v1beta2
  6. 保存文件,退出编辑器并等待更新的自定义资源得到协调。

8.2.13. 将 Kafka Connect S2I 资源升级到 v1beta2

先决条件

  • 支持 v1beta2 API 版本的一个 Cluster Operator 已启动且正在运行。

流程

对部署中的每个 KafkaConnectS2I 自定义资源执行以下步骤。

  1. 更新编辑器中的 KafkaConnectS2I 自定义资源。

    oc edit kafkaconnects2i S2I-CLUSTER
  2. 如果存在,移动:

    KafkaConnectS2I.spec.affinity
    KafkaConnectS2I.spec.tolerations

    改为:

    KafkaConnectS2I.spec.template.pod.affinity
    KafkaConnectS2I.spec.template.pod.tolerations

    例如,移动:

    spec:
      # ...
      affinity:
        # ...
      tolerations:
        # ...

    改为:

    spec:
      # ...
      template:
        pod:
          affinity:
            # ...
          tolerations:
            # ...
  3. 如果 type: 外部 日志记录配置了 in .spec.logging:

    替换包含日志配置的 ConfigMap 名称

    logging:
      type: external
      name: my-config-map

    使用 valueFrom.configMapKeyRef 字段,并指定将日志记录存储在的 ConfigMap 名称和

    logging:
      type: external
      valueFrom:
        configMapKeyRef:
          name: my-config-map
          key: log4j.properties
  4. 如果使用 .spec.metrics 字段来启用指标:

    1. 创建一个将 JMX Prometheus 导出器的 YAML 配置存储到一个键下的新 ConfigMap。YAML 必须与 .spec.metrics 字段中当前的内容匹配。

      kind: ConfigMap
      apiVersion: v1
      metadata:
        name: kafka-connect-s2i-metrics
        labels:
          app: strimzi
      data:
        connect-s2i-metrics-config.yaml: |
            <YAML>
    2. 添加指向 ConfigMap 和键的 a .spec.metricsConfig 属性:

      metricsConfig:
        type: jmxPrometheusExporter
        valueFrom:
          configMapKeyRef:
            name: kafka-connect-s2i-metrics
            key: connect-s2i-metrics-config.yaml
    3. 删除 old .spec.metrics 字段
  5. KafkaConnectS2I 自定义资源的 apiVersion 更新为 v1beta2

    替换:

    apiVersion: kafka.strimzi.io/v1beta1

    使用:

    apiVersion: kafka.strimzi.io/v1beta2
  6. 保存文件,退出编辑器并等待更新的自定义资源得到协调。

8.2.14. 将 Kafka MirrorMaker 资源升级到 v1beta2

先决条件

流程

对部署中的每个 KafkaMirrorMaker 自定义资源执行以下步骤。

  1. 更新编辑器 中的 KafkaMirrorMaker 自定义资源。

    oc edit kafkamirrormaker MIRROR-MAKER
  2. 如果存在,移动:

    KafkaMirrorMaker.spec.affinity
    KafkaMirrorMaker.spec.tolerations

    改为:

    KafkaMirrorMaker.spec.template.pod.affinity
    KafkaMirrorMaker.spec.template.pod.tolerations

    例如,移动:

    spec:
      # ...
      affinity:
        # ...
      tolerations:
        # ...

    改为:

    spec:
      # ...
      template:
        pod:
          affinity:
            # ...
          tolerations:
            # ...
  3. 如果 type: 外部 日志记录配置了 in .spec.logging:

    替换包含日志配置的 ConfigMap 名称

    logging:
      type: external
      name: my-config-map

    使用 valueFrom.configMapKeyRef 字段,并指定将日志记录存储在的 ConfigMap 名称和

    logging:
      type: external
      valueFrom:
        configMapKeyRef:
          name: my-config-map
          key: log4j.properties
  4. 如果使用 .spec.metrics 字段来启用指标:

    1. 创建一个将 JMX Prometheus 导出器的 YAML 配置存储到一个键下的新 ConfigMap。YAML 必须与 .spec.metrics 字段中当前的内容匹配。

      kind: ConfigMap
      apiVersion: v1
      metadata:
        name: kafka-mm-metrics
        labels:
          app: strimzi
      data:
        mm-metrics-config.yaml: |
            <YAML>
    2. 添加指向 ConfigMap 和键的 a .spec.metricsConfig 属性:

      metricsConfig:
        type: jmxPrometheusExporter
        valueFrom:
          configMapKeyRef:
            name: kafka-mm-metrics
            key: mm-metrics-config.yaml
    3. 删除 old .spec.metrics 字段。
  5. KafkaMirrorMaker 自定义资源的 apiVersion 更新为 v1beta2

    替换:

    apiVersion: kafka.strimzi.io/v1beta1

    使用:

    apiVersion: kafka.strimzi.io/v1beta2
  6. 保存文件,退出编辑器并等待更新的自定义资源得到协调。

8.2.15. 将 Kafka MirrorMaker 2.0 资源升级到 v1beta2

先决条件

流程

对部署中的每个 KafkaMirrorMaker2 自定义资源执行以下步骤。

  1. 更新编辑器中的 KafkaMirrorMaker2 自定义资源。

    oc edit kafkamirrormaker2 MIRROR-MAKER-2
  2. 如果存在,将 关联性.spec.affinity 移到 .spec.template.pod.affinity
  3. 如果存在,请将 容限.spec.tolerations 移到 .spec.template.pod.tolerations
  4. 如果 type: 外部 日志记录配置了 in .spec.logging:

    替换包含日志配置的 ConfigMap 名称

    logging:
      type: external
      name: my-config-map

    使用 valueFrom.configMapKeyRef 字段,并指定将日志记录存储在的 ConfigMap 名称和

    logging:
      type: external
      valueFrom:
        configMapKeyRef:
          name: my-config-map
          key: log4j.properties
  5. 如果使用 .spec.metrics 字段来启用指标:

    1. 创建一个将 JMX Prometheus 导出器的 YAML 配置存储到一个键下的新 ConfigMap。YAML 必须与 .spec.metrics 字段中当前的内容匹配。

      kind: ConfigMap
      apiVersion: v1
      metadata:
        name: kafka-mm2-metrics
        labels:
          app: strimzi
      data:
        mm2-metrics-config.yaml: |
            <YAML>
    2. 添加指向 ConfigMap 和键的 a .spec.metricsConfig 属性:

      metricsConfig:
        type: jmxPrometheusExporter
        valueFrom:
          configMapKeyRef:
            name: kafka-mm2-metrics
            key: mm2-metrics-config.yaml
    3. 删除 old .spec.metrics 字段。
  6. KafkaMirrorMaker2 自定义资源的 apiVersion 更新为 v1beta2:

    替换:

    apiVersion: kafka.strimzi.io/v1alpha1

    使用:

    apiVersion: kafka.strimzi.io/v1beta2
  7. 保存文件,退出编辑器并等待更新的自定义资源得到协调。

8.2.16. 将 Kafka Bridge 资源升级到 v1beta2

先决条件

流程

对部署中的每个 KafkaBridge 资源执行以下步骤。

  1. 更新编辑器中的 KafkaBridge 自定义资源。

    oc edit kafkabridge KAFKA-BRIDGE
  2. 如果 type: 外部 日志记录在 KafkaBridge.spec.logging 中配置:

    替换包含日志配置的 ConfigMap 名称

    logging:
      type: external
      name: my-config-map

    使用 valueFrom.configMapKeyRef 字段,并指定将日志记录存储在的 ConfigMap 名称和

    logging:
      type: external
      valueFrom:
        configMapKeyRef:
          name: my-config-map
          key: log4j2.properties
  3. KafkaBridge 自定义资源的 apiVersion 更新为 v1beta2

    替换:

    apiVersion: kafka.strimzi.io/v1alpha1

    使用:

    apiVersion: kafka.strimzi.io/v1beta2
  4. 保存文件,退出编辑器并等待更新的自定义资源得到协调。

8.2.17. 将 Kafka 用户资源升级到 v1beta2

先决条件

  • 支持 v1beta2 API 版本的用户 Operator 已启动且正在运行。

流程

对部署中的每个 KafkaUser 自定义资源执行以下步骤。

  1. 更新编辑器中的 KafkaUser 自定义资源。

    oc edit kafkauser KAFKA-USER
  2. KafkaUser 自定义资源的 apiVersion 更新为 v1beta2:

    替换:

    apiVersion: kafka.strimzi.io/v1beta1

    使用:

    apiVersion: kafka.strimzi.io/v1beta2
  3. 保存文件,退出编辑器并等待更新的自定义资源得到协调。

8.2.18. 将 Kafka 主题资源升级到 v1beta2

先决条件

  • 支持 v1beta2 API 版本的一个主题 Operator 已启动且正在运行。

流程

对部署中的每个 KafkaTopic 自定义资源执行以下步骤。

  1. 更新编辑器中的 KafkaTopic 自定义资源。

    oc edit kafkatopic KAFKA-TOPIC
  2. KafkaTopic 自定义资源的 apiVersion 更新为 v1beta2:

    替换:

    apiVersion: kafka.strimzi.io/v1beta1

    使用:

    apiVersion: kafka.strimzi.io/v1beta2
  3. 保存文件,退出编辑器并等待更新的自定义资源得到协调。

8.2.19. 将 Kafka Connector 资源升级到 v1beta2

先决条件

流程

对部署中的每个 KafkaConnector 自定义资源执行以下步骤。

  1. 更新编辑器中的 KafkaConnector 自定义资源。

    oc edit kafkaconnector KAFKA-CONNECTOR
  2. KafkaConnector 自定义资源的 apiVersion 更新为 v1beta2

    替换:

    apiVersion: kafka.strimzi.io/v1alpha1

    使用:

    apiVersion: kafka.strimzi.io/v1beta2
  3. 保存文件,退出编辑器并等待更新的自定义资源得到协调。

8.2.20. 将 Kafka 重新平衡资源升级到 v1beta2

先决条件

流程

对部署中的每个 KafkaRebalance 自定义资源执行以下步骤。

  1. 更新编辑器中的 KafkaRebalance 自定义资源。

    oc edit kafkarebalance KAFKA-REBALANCE
  2. KafkaRebalance 自定义资源的 apiVersion 更新为 v1beta2:

    替换:

    apiVersion: kafka.strimzi.io/v1alpha1

    使用:

    apiVersion: kafka.strimzi.io/v1beta2
  3. 保存文件,退出编辑器并等待更新的自定义资源得到协调。

8.3. 升级 Cluster Operator

此流程描述了如何升级 Cluster Operator 部署以使用 AMQ Streams 1.8。

如果您使用安装 YAML 文件而不是 OperatorHub 部署 Cluster Operator,请按照以下步骤操作。

由 Cluster Operator 管理的 Kafka 集群的可用性不受升级操作的影响。

注意

有关如何升级到该版本的信息,请参阅支持 AMQ Streams 特定版本的文档。

先决条件

流程

  1. 记录对现有 Cluster Operator 资源所做的任何配置更改(在 /install/cluster-operator 目录中)。任何更改都将被 Cluster Operator 的新版本 覆盖
  2. 更新您的自定义资源,以反映 AMQ Streams 版本 1.8 支持的配置选项。
  3. 更新 Cluster Operator。

    1. 根据 Cluster Operator 运行的命名空间,修改新 Cluster Operator 版本的安装文件。

      在 Linux 中,使用:

      sed -i 's/namespace: .*/namespace: my-cluster-operator-namespace/' install/cluster-operator/*RoleBinding*.yaml

      在 MacOS 中,使用:

      sed -i '' 's/namespace: .*/namespace: my-cluster-operator-namespace/' install/cluster-operator/*RoleBinding*.yaml
    2. 如果在现有 Cluster Operator Deployment 中修改了一个或多个环境变量,请编辑 install/cluster-operator/060-Deployment-strimzi-cluster-operator.yaml 文件来使用这些环境变量。
  4. 当您有更新的配置时,请将其与其它安装资源一起部署:

    oc replace -f install/cluster-operator

    等待滚动更新完成。

  5. 如果新 Operator 版本不再支持您要从中升级的 Kafka 版本,Cluster Operator 会返回 "Version not found" 错误消息。否则,不会返回任何错误消息。

    例如:

    "Version 2.4.0 is not supported. Supported versions are: 2.6.0, 2.6.1, 2.7.0."
    • 如果返回出错信息,升级到新的 Cluster Operator 版本支持的 Kafka 版本:

      1. 编辑 Kafka 自定义资源。
      2. spec.kafka.version 属性改为受支持的 Kafka 版本。
    • 如果未 返回错误消息,请转到下一步。稍后您将升级 Kafka 版本。
  6. 获取 Kafka pod 的镜像以确保升级成功:

    oc get pods my-cluster-kafka-0 -o jsonpath='{.spec.containers[0].image}'

    image tag 显示新的 Operator 版本。例如:

    registry.redhat.io/amq7/amq-streams-kafka-28-rhel7:{ContainerVersion}

您的 Cluster Operator 已升级到 1.8 版本,但它管理的集群中运行的 Kafka 版本没有改变。

在 Cluster Operator 升级后,您必须执行 Kafka 升级

8.4. 升级 Kafka

将 Cluster Operator 升级到 1.8 后,下一步是将所有 Kafka 代理升级到最新支持的 Kafka 版本。

Kafka 升级由 Cluster Operator 通过对 Kafka 代理的滚动更新来执行。

Cluster Operator 根据 Kafka 集群配置启动滚动更新。

如果 Kafka.spec.kafka.config 包含…​Cluster Operator 启动s…​

inter.broker.protocol.versionlog.message.format.version

单个滚动更新.更新后,inter.broker.protocol.version 必须手动更新,后接 log.message.format.version。更改每项将触发进一步滚动更新。

inter.broker.protocol.versionlog.message.format.version

两次滚动更新.

没有 inter.broker.protocol.version 或 log.message.format.version 的配置。

两次滚动更新.

作为 Kafka 升级的一部分,Cluster Operator 会启动 ZooKeeper 的滚动更新。

  • 即使 ZooKeeper 版本未更改,也会出现一次滚动更新。
  • 如果 Kafka 的新版本需要新的 ZooKeeper 版本,则会出现额外的滚动更新。

8.4.1. Kafka 版本

Kafka 的日志消息格式版本和 Inter-broker 协议版本分别指定附加到消息的日志格式版本以及集群中使用的 Kafka 协议版本。为确保使用正确的版本,升级过程包括对现有 Kafka 代理进行配置更改,以及对客户端应用程序(使用者和生产者)的代码更改。

下表显示了 Kafka 版本之间的区别:

Kafka 版本Interbroker 协议版本日志消息格式版本zookeeper 版本

2.7.0

2.7

2.7

3.5.8

2.7.1

2.7

2.7

3.5.9

2.8.0

2.8

2.8

3.5.9

Broker 协议版本

在 Kafka 中,用于代理间通信的网络协议称为 代理间协议。每个版本的 Kafka 都有兼容版本的 Inter-broker 协议。协议的次要版本通常会增加以匹配 Kafka 的次要版本,如上表所示。

kroker 协议版本在 Kafka 资源中广泛设置集群。要更改它,请编辑 Kafka .spec.kafka.config 中的 inter.broker.protocol. version 属性。

日志消息格式版本

当生产者向 Kafka 代理发送消息时,该消息将使用特定格式进行编码。Kafka 版本的格式可能会在 Kafka 发行版本间有所变化,因此信息会指定它们编码的格式版本。您可以配置 Kafka 代理,在代理将消息附加到日志前将信息从较新的格式转换为给定的旧格式版本。

在 Kafka 中,设置消息格式版本的方法有两种:

  • message.format.version 属性在主题上设置。
  • log.message.format.version 属性在 Kafka 代理上设置。

主题的 message.format.version 默认值由 Kafka 代理上设置的 log.message.format.version 定义。您可以通过修改主题配置来手动设置主题的 message.format.version

本节中的升级任务假定消息格式版本由 log.message.format.version 定义。

8.4.2. 升级客户端的策略

升级客户端应用程序(包括 Kafka Connect 连接器)的正确方法取决于您的具体情形。

使用应用程序时,需要以它们所理解的消息格式接收消息。您可以通过以下两种方式之一来确保这是这种情况:

  • 在升级任何制作者 之前,先 升级所有消费者的主题。
  • 通过将代理关闭消息置于较旧格式。

使用代理 down-conversion 会给代理带来额外的负载,因此在较长时间内依赖所有主题并不理想。要使代理以最佳方式运行,绝对不应是向下转换消息。

代理 down-conversion 可通过两种方式进行配置:

  • 主题级别 message.format.version 为单个主题配置它。
  • 代理级 log.message.format.version 是未配置主题级别 message.format.version 的主题的默认设置。

以新版本格式发布到主题的消息对消费者可见,因为代理在从生产者接收消息时执行 down-conversion,而不是当它们发送到消费者时。

您可以使用多个策略来升级客户端:

消费者第一
  1. 升级所有使用的应用程序。
  2. 将代理级别的 log.message.format.version 更改为新版本。
  3. 升级所有生成的应用程序。

    此策略非常简单,可避免任何代理停机转换。但是,它假定贵组织中的所有消费者可以协调升级,而且不适用于消费者和生产者的应用程序。如果升级的客户端出现问题,新格式消息可能会添加到消息日志中,因此您无法恢复到以前的使用者版本。

每个主题消费者首先

对于每个主题:

  1. 升级所有使用的应用程序。
  2. 将主题级 message.format.version 更改为新版本。
  3. 升级所有生成的应用程序。

    此策略可避免任何代理 down-conversion,并且意味着您可以逐个主题地进行。它不适用于同时作为同一主题的消费者和制作者的应用。再强调一下,如果升级的客户端出现问题,新格式消息可能会添加到消息日志中。

首先每个主题消费者,进行向下转换

对于每个主题:

  1. 将主题级 message.format.version 更改为旧版本(或依赖主题默认到代理级别的 log.message.format.version)。
  2. 升级所有使用和生产的应用程序。
  3. 验证升级的应用是否正常工作。
  4. 将主题级 message.format.version 更改为新版本。

    这个策略需要代理 down-conversion,但代理的负载会最小化,因为一次只需要单个主题(或小组主题)需要它。它还适用于同时是同一主题的消费者和制作者的应用。这种方法可确保升级的生产者和消费者在提交使用新的消息格式版本之前正常工作。

    这种方法的主要缺点是,在含有许多主题和应用的集群中进行管理可能比较复杂。

还可以通过其他策略来升级客户端应用。

注意

也可以应用多个策略。例如,在前几个应用程序和主题中,"每个主题消费者首先使用",可以使用向下转换"策略"。如果这已被证明是成功的,那么更高效的策略可被视为可以接受采用。

8.4.3. Kafka 版本和镜像映射

在升级 Kafka 时,请考虑 STRIMZI_KAFKA_IMAGES 环境变量和 Kafka.spec.kafka.version 属性的设置。

  • 每个 Kafka 资源都可以使用 Kafka.spec.kafka.version 配置。
  • Cluster Operator 的 STRIMZI_KAFKA_IMAGES 环境变量提供 Kafka 版本和给定 Kafka 资源中请求该版本时使用的镜像之间的映射。

    • 如果没有配置 Kafka.spec.kafka.image,则会使用给定版本的默认镜像。
    • 如果配置了 Kafka.spec.kafka.image,则默认镜像会被覆盖。
警告

Cluster Operator 无法验证镜像是否实际包含预期版本的 Kafka 代理。请特别注意确保给定镜像与给定的 Kafka 版本对应。

8.4.4. 升级 Kafka 代理和客户端应用程序

这个步骤描述了如何将 AMQ Streams Kafka 集群升级到最新支持的 Kafka 版本。

与您当前的 Kafka 版本相比,新版本可能支持更高的 日志消息格式版本 或代理协议版本,或两者。如果需要,请按照步骤升级这些版本。如需更多信息,请参阅 第 8.4.1 节 “Kafka 版本”

您还应该选择 升级客户端的策略。Kafka 客户端在此流程的第 6 步中升级。

先决条件

要升级 Kafka 资源,请检查:

  • 支持两个 Kafka 版本的 Cluster Operator 已启动且正在运行。
  • Kafka.spec.kafka.config 不包含 新 Kafka 版本中不支持的选项。

流程

  1. 更新 Kafka 集群配置:

    oc edit kafka my-cluster
  2. 如果配置了,请确保 Kafka.spec.kafka.config 的 log.message.format .version 和 inter.broker.protocol.version 设置为 当前 Kafka 版本的默认值。

    例如,如果从 Kafka 版本 2.7.0 升级到 2.8.0:

    kind: Kafka
    spec:
      # ...
      kafka:
        version: 2.7.0
        config:
          log.message.format.version: "2.7"
          inter.broker.protocol.version: "2.7"
          # ...

    如果没有配置 log.message.format .version 和 inter.broker.protocol.version,AMQ Streams 会在下一步中将这些版本更新至 Kafka 版本后自动将这些版本更新至当前的默认值。

    注意

    log.message.format.versioninter.broker.protocol.version 的值必须是字符串,以防止它们被解释为浮动点数。

  3. 更改 Kafka.spec.kafka.version 以指定新的 Kafka 版本;保留 log.message.format.versioninter.broker.protocol.version,默认为 当前 Kafka 版本。

    注意

    更改 kafka.version 可确保集群中的所有代理都将升级为使用新的代理二进制文件。在此过程中,一些代理使用旧二进制文件,另一些则已升级至新二进制文件。不要更改 inter.broker.protocol.version,确保代理可以在升级过程中继续相互通信。

    例如,如果从 Kafka 2.7.0 升级到 2.8.0:

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    spec:
      # ...
      kafka:
        version: 2.8.0 1
        config:
          log.message.format.version: "2.7" 2
          inter.broker.protocol.version: "2.7" 3
          # ...
    1
    Kafka 版本已改为新版本。
    2
    消息格式版本保持不变。
    3
    代理协议版本没有改变。
    警告

    如果新 Kafka 版本的 inter.broker.protocol.version 发生变化,则无法降级 Kafka。broker 协议版本决定了代理存储的持久元数据的模式,包括写入到 __consumer_offset 的消息。降级集群将不会理解消息。

  4. 如果在 Kafka 自定义资源(在 Kafka .spec.kafka.image 中)中定义了 Kafka 集群 的镜像,请将镜像 更新为指向具有新 Kafka 版本的容器镜像。

    请参阅 Kafka 版本和镜像映射

  5. 保存并退出编辑器,然后等待滚动更新完成。

    通过观察 pod 状态转换来检查滚动更新的进度:

    oc get pods my-cluster-kafka-0 -o jsonpath='{.spec.containers[0].image}'

    滚动更新可确保每个 pod 在新版本的 Kafka 中使用代理二进制文件。

  6. 根据您选择的 升级客户端策略,升级所有客户端应用以使用新版本的客户端二进制文件。

    如果需要,将 Kafka Connect 和 MirrorMaker 的 version 属性设置为 Kafka 的新版本:

    1. 对于 Kafka Connect,更新 KafkaConnect.spec.version
    2. 对于 MirrorMaker,更新 KafkaMirrorMaker.spec.version
    3. 对于 MirrorMaker 2.0,更新 KafkaMirrorMaker2.spec.version
  7. 如果配置,更新 Kafka 资源,以使用新的 inter.broker.protocol.version 版本。否则,请转至第 9 步。

    例如,如果升级到 Kafka 2.8.0:

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    spec:
      # ...
      kafka:
        version: 2.8.0
        config:
          log.message.format.version: "2.7"
          inter.broker.protocol.version: "2.8"
          # ...
  8. 等待 Cluster Operator 更新集群。
  9. 如果配置,更新 Kafka 资源,以使用新的 log.message.format.version 版本。否则,请转至第 10 步。

    例如,如果升级到 Kafka 2.8.0:

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    spec:
      # ...
      kafka:
        version: 2.8.0
        config:
          log.message.format.version: "2.8"
          inter.broker.protocol.version: "2.8"
          # ...
  10. 等待 Cluster Operator 更新集群。

    • Kafka 集群和客户端现在使用新的 Kafka 版本。
    • 代理配置为使用 Kafka 的新版本的 Inter-broker 协议版本和消息格式发送消息。

在 Kafka 升级后,如果需要,您可以:

8.5. 升级消费者以合作重新平衡

您可以升级 Kafka 用户和 Kafka Streams 应用程序,以使用 增量合作重新平衡 协议进行分区重新平衡,而不是默认的预先 重新平衡 协议。在 Kafka 2.4.0 中添加了新协议。

消费者将分区分配保持在合作重新平衡状态,并仅在流程结束时予以撤销(如果需要)来实现平衡集群。这可减少使用者组或 Kafka Streams 应用程序的不可用。

注意

升级到增量合作重新平衡协议是可选的。预先重新平衡协议仍然受到支持。

先决条件

流程

要升级 Kafka 消费者以使用增量合作重新平衡协议:

  1. 将 Kafka client .jar 文件替换为新版本。
  2. 在使用者配置中,将 合作粘滞 附加到 partition.assignment.strategy。例如,如果设定了 范围 策略,请将配置更改为 范围,协作粘性
  3. 依次重新启动组中的每个使用者,等待使用者在每次重启后重新加入组。
  4. 通过从使用者配置中删除先前的 partition.assignment.strategy 来 重新配置组中的每个消费者,仅保留 协作式 策略。
  5. 依次重新启动组中的每个使用者,等待使用者在每次重启后重新加入组。

升级 Kafka Streams 应用程序以使用增量合作重新平衡协议:

  1. 将 Kafka Streams .jar 文件替换为新版本。
  2. 在 Kafka Streams 配置中,将 upgrade.from 配置参数设置为您要从中升级的 Kafka 版本(例如 2.3)。
  3. 依次重新启动每个流处理器(节点)。
  4. 从 Kafka Streams 配置中删除 upgrade.from 配置参数。
  5. 依次重新启动组中的每个消费者。

其它资源

第 9 章 降级 AMQ Streams

如果您在升级到的 AMQ Streams 版本上遇到问题,您可以将安装恢复到之前的版本。

您可以执行降级:

  1. 将 Cluster Operator 恢复到以前的 AMQ Streams 版本。

  2. 将所有 Kafka 代理和客户端应用程序降级到以前的 Kafka 版本。

如果之前版本的 AMQ Streams 不支持您使用的 Kafka 版本,只要附加至消息的日志消息格式版本匹配,您还可以降级 Kafka。

9.1. 将 Cluster Operator 降级为以前的版本

如果您遇到 AMQ Streams 的问题,可以恢复您的安装。

此流程描述了如何将 Cluster Operator 部署降级到以前的版本。

先决条件

流程

  1. 记录对现有 Cluster Operator 资源所做的任何配置更改(在 /install/cluster-operator 目录中)。任何更改都将被之前版本的 Cluster Operator 覆盖
  2. 恢复自定义资源,以反映您要降级到的 AMQ Streams 版本支持的配置选项。
  3. 更新 Cluster Operator。

    1. 根据 Cluster Operator 运行的命名空间修改之前版本的安装文件。

      在 Linux 中,使用:

      sed -i 's/namespace: .*/namespace: my-cluster-operator-namespace/' install/cluster-operator/*RoleBinding*.yaml

      在 MacOS 中,使用:

      sed -i '' 's/namespace: .*/namespace: my-cluster-operator-namespace/' install/cluster-operator/*RoleBinding*.yaml
    2. 如果在现有 Cluster Operator Deployment 中修改了一个或多个环境变量,请编辑 install/cluster-operator/060-Deployment-strimzi-cluster-operator.yaml 文件来使用这些环境变量。
  4. 当您有更新的配置时,请将其与其它安装资源一起部署:

    oc replace -f install/cluster-operator

    等待滚动更新完成。

  5. 获取 Kafka pod 的镜像以确保降级成功:

    oc get pod my-cluster-kafka-0 -o jsonpath='{.spec.containers[0].image}'

    镜像标签显示新的 AMQ Streams 版本,后跟 Kafka 版本。例如,NEW -STRIMZI-VERSION-kafka-CURRENT-KAFKA-VERSION

您的 Cluster Operator 已降级到以前的版本。

9.2. 降级 Kafka

Kafka 版本降级由 Cluster Operator 执行。

9.2.1. 降级的 Kafka 版本兼容性

Kafka 降级依赖于兼容的当前和目标 Kafka 版本,以及记录消息的状态。

如果该版本不支持集群中 曾使用的任何 inter.broker.protocol.version 设置,或者已将消息添加到使用 较新的 log.message.format.version 的消息日志中,则无法恢复到以前的 Kafka 版本。

inter.broker.protocol.version 决定代理用于存储持久元数据的模式,例如写入到 __consumer_offsets 的消息的 schema。如果您降级到不理解之前在集群中使用的 inter.broker.protocol.version 的 Kafka 版本,代理会遇到它无法理解的数据。

如果 Kafka 的目标降级版本有:

  • 当前版本相同的 log.message.format.version,通过对代理执行一次滚动重启来降级 Cluster Operator。
  • 只有正在运行的集群 始终将 log.message.format.version 设置为由降级版本使用的版本,才可 使用不同的 log.message.format.version。这通常是在 log.message.format.version 改变前中止升级过程的情况。在这种情况下,降级需要:

    • 如果两个版本的检查程序协议不同,则代理的两个滚动重启
    • 单个滚动重启(如果相同)

如果新版本曾使用过上一版本不支持log.message.format.version,则无法进行降级,包括当使用了 log.message.format.version 的默认值时。例如,这个资源可以降级为 Kafka 版本 2.7.0,因为 log.message.format.version 尚未更改:

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
spec:
  # ...
  kafka:
    version: 2.8.0
    config:
      log.message.format.version: "2.7"
      # ...

如果 log.message.format.version 设置为 "2.8" 或没有值(因此该参数使用 2.8.0 代理 2.8 的默认值),则无法进行降级。

9.2.2. 降级 Kafka 代理和客户端应用程序

此流程描述了如何将 AMQ Streams Kafka 集群降级为 Kafka 的较低(以前的)版本,如从 2.8.0 降级到 2.7.0。

先决条件

对于要降级的 Kafka 资源,检查:

  • 重要信息: Kafka 版本兼容性.
  • 支持两个 Kafka 版本的 Cluster Operator 已启动且正在运行。
  • Kafka.spec.kafka.config 不包含 Kafka 版本不支持的选项。
  • Kafka.spec.kafka.config 有一个 log.message.format.versioninter.broker.protocol.version,它被降级到的 Kafka 版本支持。

流程

  1. 更新 Kafka 集群配置。

    oc edit kafka KAFKA-CONFIGURATION-FILE
  2. 更改 Kafka.spec.kafka.version 以指定上一版本。

    例如,如果从 Kafka 2.8.0 降级到 2.7.0:

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    spec:
      # ...
      kafka:
        version: 2.7.0 1
        config:
          log.message.format.version: "2.7" 2
          inter.broker.protocol.version: "2.7" 3
          # ...
    1
    Kafka 版本已改为以前的版本。
    2
    消息格式版本保持不变。
    3
    代理协议版本没有改变。
    注意

    您必须将 log.message.format.versioninter.broker.protocol.version 的值格式化为字符串,以防止将其解释为浮动点数。

  3. 如果 Kafka 版本的镜像与 Cluster Operator 的 STRIMZI_KAFKA_IMAGES 中定义的镜像不同,请更新 Kafka.spec.kafka.image

    请查看 第 8.4.3 节 “Kafka 版本和镜像映射”

  4. 保存并退出编辑器,然后等待滚动更新完成。

    检查日志中的更新,或者查看 pod 状态转换:

    oc logs -f CLUSTER-OPERATOR-POD-NAME | grep -E "Kafka version downgrade from [0-9.]+ to [0-9.]+, phase ([0-9]+) of \1 completed"
    oc get pod -w

    检查 Cluster Operator 日志中的 INFO 级别信息:

    Reconciliation #NUM(watch) Kafka(NAMESPACE/NAME): Kafka version downgrade from FROM-VERSION to TO-VERSION, phase 1 of 1 completed
  5. 降级所有客户端应用程序(使用者)以使用先前版本的客户端二进制文件。

    Kafka 集群和客户端现在使用以前的 Kafka 版本。

  6. 如果您要恢复到早于 0.22 的 AMQ Streams 版本(使用 ZooKeeper 存储主题),请从 Kafka 集群中删除内部主题存储主题。

    oc run kafka-admin -ti --image=registry.redhat.io/amq7/amq-streams-kafka-28-rhel7:1.8.0 --rm=true --restart=Never -- ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic __strimzi-topic-operator-kstreams-topic-store-changelog --delete && ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic __strimzi_store_topic --delete

附录 A. 使用您的订阅

AMQ Streams 通过软件订阅提供。要管理您的订阅,请访问红帽客户门户中的帐户。

访问您的帐户

  1. 转至 access.redhat.com
  2. 如果您还没有帐户,请创建一个帐户。
  3. 登录到您的帐户。

激活订阅

  1. 转至 access.redhat.com
  2. 导航到 My Subscriptions
  3. 导航到 激活订阅 并输入您的 16 位激活号。

下载 Zip 和 Tar 文件

要访问 zip 或 tar 文件,请使用客户门户查找要下载的相关文件。如果您使用 RPM 软件包,则不需要这一步。

  1. 打开浏览器并登录红帽客户门户网站 产品下载页面,网址为 access.redhat.com/downloads
  2. 查找 INTEGRATION 类别 中的 Red Hat AMQ Streams 条目。
  3. 选择所需的 AMQ Streams 产品。此时会打开 Software Downloads 页面。
  4. 单击组件的 Download 链接。

2021-12-18 13:38:49 +1000 修订