OpenShift 概述中的 AMQ Streams

Red Hat AMQ 2021.Q3

用于 OpenShift Container Platform 上的 AMQ Streams 1.8

摘要

本指南概述了 AMQ 流的特性和功能。

使开源包含更多

红帽承诺替换我们的代码、文档和网页属性中存在问题的语言。我们从这四个术语开始: master、slave、blacklist 和 whitelist。这些更改将在即将发行的几个发行本中逐渐实施。如需了解更多详细信息,请参阅 CTO Chris Wright 信息

第 1 章 主要特性

AMQ Streams 简化了在 OpenShift 集群中运行 Apache Kafka 的过程。

本指南旨在作为构建对 AMQ 流理解的起点。本指南介绍了 Kafka 背后的一些关键概念(AMQ Streams 核心),简要解释了 Kafka 组件的用途。概述了配置点,包括保护和监控 Kafka 的选项。AMQ Streams 的发行版提供了用于部署和管理 Kafka 集群的文件,以及用于配置和监控部署的示例文件。

描述了典型的 Kafka 部署,以及用于部署和管理 Kafka 的工具。

1.1. Kafka 功能

Kafka 的底层数据流处理功能和组件架构可以提供:

  • 微服务和其他应用以极高的吞吐量和低延迟共享数据
  • 消息排序保证
  • 从数据存储中重获/重播消息以重建应用程序状态
  • 使用键值日志时删除旧记录的消息紧凑
  • 集群配置中的水平可扩展性
  • 数据复制来控制容错
  • 保留大量数据以便立即访问

1.2. Kafka 用例

Kafka 的功能使其适合:

  • 事件驱动的架构
  • 事件源,以捕获对应用状态的更改作为事件日志
  • 消息代理
  • 网站活动跟踪
  • 通过指标进行操作监控
  • 日志收集和聚合
  • 为分布式系统提交日志
  • 流处理,以便应用程序能够实时响应数据

1.3. AMQ 流如何支持 Kafka

AMQ Streams 提供容器镜像和 Operator,以便在 OpenShift 上运行 Kafka。AMQ Streams Operator 是运行 AMQ Streams 的基础。AMQ Streams 提供的 Operator 是专门构建的,具有可有效管理 Kafka 的专业操作知识。

Operator 简化了以下流程:

  • 部署和运行 Kafka 集群
  • 部署和运行 Kafka 组件
  • 配置对 Kafka 的访问
  • 保护对 Kafka 的访问
  • 升级 Kafka
  • 管理代理
  • 创建和管理主题
  • 创建和管理用户

第 2 章 关于 Kafka

Apache Kafka 是一个开源分布式发布-订阅消息传递系统,用于容错实时数据源。

其它资源

2.1. Kafka 概念

了解 Kafka 的关键概念对于了解 AMQ Streams 的工作方式非常重要。

Kafka 集群包含多个代理。主题用于在 Kafka 集群中接收和存储数据。主题按分区分割,其中写入数据。分区在容错主题之间复制。

Kafka 代理和主题

Kafka brokers and topics inside a Kafka cluster showing the partition leader of each topic

broker
代理有时也称为服务器或节点,负责编排存储和传递消息。
主题
主题提供数据存储的目的地。每个主题都被分成一个或多个分区。
Cluster
组代理实例。
分区
主题分区的数量由主题 分区数 来定义。
分区领导
分区领导处理某个主题的所有制作者请求。
分区跟随器

分区后续程序复制分区领导的分区数据,可选择性地处理消费者请求。

主题使用 复制因素 来配置集群中的每个分区的副本数。主题至少包含一个分区。

同步副本 具有与领导相同数量的消息。配置定义要生成消息必须 同步 的副本数,以确保消息只有在成功复制到副本分区后才会提交。这样,如果领导机失败,消息就不会丢失。

Kafka 代理和主题 图中,我们可以看到每个编号分区在复制主题中都有一个领导者和两个跟随者。

2.2. 生产者和消费者

生产者和消费者通过代理发送和接收消息(发布和订阅)。消息包含一个可选 和一个包含消息数据 的值,以及标头和相关元数据。键用于标识消息的主题或消息的属性。消息批量交付,批处理和记录包含标头和元数据,提供有助于客户端过滤和路由的详细信息,如记录的时间戳和偏移位置。

生产者和消费者

A producer sends messages through a broker to a topic containing three partitions. Three consumers in a consumer group read the messages from the partitions

producer
制作者将消息发送到代理主题,以写入到分区的末尾偏移。消息由制作者以轮循方式写入分区,或者写入基于消息密钥的特定分区。
consumer
消费者订阅一个主题,并根据主题、分区和偏移读取信息。
消费者组
消费者组用于共享通常由多个生产者从给定主题生成的大型数据流。用户使用 group.id 进行分组,允许信息分散到成员中。组内的使用者不会从同一分区读取数据,而是可以从一个或多个分区接收数据。
offsets

偏移描述了消息在分区中的位置。给定分区中的每个消息都有一个唯一偏移,这有助于识别使用者在分区中的位置,以跟踪已使用的记录数。

提交的偏移写入偏移提交日志。根据消费者组,__consumer_offsets 主题存储了提交的偏移信息,以及最后一个和下一个偏移的位置。

生成和使用数据

A producer sends a message to a broker topic; the message is written to the end offset (7). A consumer reads messages from offset 5

第 3 章 Kafka 的 AMQ Streams 部署

通过 AMQ Streams 发行版为 OpenShift 部署提供了 Apache Kafka 组件。Kafka 组件通常作为集群运行,以实现可用性。

集成 Kafka 组件的典型部署可能包括:

  • 代理节点的 Kafka 集群
  • 复制 ZooKeeper 实例的 zookeeper 集群
  • Kafka Connect 集群用于外部数据连接
  • Kafka MirrorMaker 集群,在第二个集群中镜像 Kafka 集群
  • kafka Exporter 用于提取额外的 Kafka 指标数据以进行监控
  • Kafka Bridge 向 Kafka 集群发出基于 HTTP 的请求

并非所有组件都是必须的,但您至少需要 Kafka 和 ZooKeeper。有些组件可以在没有 Kafka 的情况下部署,如 MirrorMaker 或 Kafka Connect。

3.1. Kafka 组件架构

Kafka 代理集群处理消息的发送。

代理使用 Apache ZooKeeper 来存储配置数据和集群协调。在运行 Apache Kafka 之前,Apache ZooKeeper 集群必须就绪。

其他 Kafka 组件与 Kafka 集群交互,以执行特定角色。

Kafka 组件交互

Data flows between several Kafka components and the Kafka cluster. See the component descriptions after this image.

Apache ZooKeeper
Apache ZooKeeper 是 Kafka 的核心依赖关系,因为它提供集群协调服务,用于存储和跟踪代理和消费者的状态。zookeeper 也用于分区的领导选举。
Kafka Connect

Kafka Connect 是一个集成工具包,用于使用 Connector 插件在 Kafka 代理和其他系统间流传输数据。Kafka Connect 提供了一个框架,用于将 Kafka 与外部数据源或目标(如数据库)集成,以使用连接器导入或导出数据。连接器是提供所需连接配置的插件。

  • 连接器将外部数据推送(push)到 Kafka 中。
  • sink 连接器从 Kafka 中提取数据

    外部数据被转换并转换为适当的格式。

    您可以使用 build 配置部署 Kafka Connect,这些配置会自动使用数据连接所需的连接器插件构建容器镜像。

Kafka MirrorMaker

Kafka MirrorMaker 在数据中心内或跨数据中心内两个 Kafka 集群之间复制数据。

MirrorMaker 从源 Kafka 集群获取信息,并将其写入目标 Kafka 集群。

Kafka Bridge
Kafka Bridge 为将基于 HTTP 的客户端与 Kafka 集群集成提供了一个 API。
Kafka Exporter
Kafka Exporter 提取数据作为 Prometheus 指标,主要与偏移、消费者组、消费者滞后和主题相关的数据。消费者的滞后是最后写入分区的消息与消费者从该分区中提取的消息之间的延迟

3.2. Kafka Bridge 接口

Kafka Bridge 提供了一个 RESTful 接口,允许基于 HTTP 的客户端与 Kafka 集群交互。  它提供了与 AMQ Streams 的 Web API 连接的优势,不需要客户端应用程序来解释 Kafka 协议。

API 有两个主要资源: consumerstopics,它们通过端点公开并可访问,以便与 Kafka 集群中的用户和生产者交互。资源仅与 Kafka 网桥相关,而不是与 Kafka 直接连接的消费者和生产者。

3.2.1. HTTP 请求

Kafka Bridge 支持对 Kafka 集群的 HTTP 请求,其方法如下:

  • 发送消息到一个主题。
  • 从主题检索消息.
  • 检索主题的分区列表。
  • 创建和删除消费者.
  • 订阅消费者了解主题,以便他们开始接收来自这些主题的信息。
  • 检索消费者订阅的主题列表。
  • 取消订阅消费者的主题.
  • 将分区分配给消费者.
  • 提交消费者偏移列表。
  • 寻找分区,以便使用者开始接受来自第一个或最后一个偏移位置的信息,或者给定的偏移位置。

这些方法提供 JSON 响应和 HTTP 响应代码错误处理。消息可以 JSON 或二进制格式发送。

客户端可以生成和使用消息,而无需使用原生 Kafka 协议。

其它资源

3.2.2. Kafka Bridge 支持的客户端

您可以使用 Kafka Bridge 将 内部和外部 HTTP 客户端应用程序与 Kafka 集群集成。

内部客户端
内部客户端是基于容器的 HTTP 客户端,与 Kafka Bridge 本身 在同一个 OpenShift 集群中运行。内部客户端可以访问 KafkaBridge 自定义资源中定义的主机上的 Kafka Bridge 和端口。
外部客户端
外部客户端是在 OpenShift 集群 外部 运行的 HTTP 客户端,其中部署并运行 Kafka Bridge。外部客户端可以通过 OpenShift Route、负载均衡器服务或使用 Ingress 访问 Kafka 网桥。

HTTP 内部和外部客户端集成

Internal and external HTTP producers and consumers exchange data with the Kafka brokers through the Kafka Bridge

第 4 章 AMQ Streams Operator

AMQ Streams 支持使用 Operator 的 Kafka 来部署和管理 Kafka 到 OpenShift 的组件和依赖项。

Operator 是一种打包、部署和管理 OpenShift 应用的方法。AMQ Streams Operator 扩展 OpenShift 功能,自动执行与 Kafka 部署相关的常见复杂任务。通过在代码中了解 Kafka 操作,Kafka 管理任务可以简化,无需人工干预。

Operator

AMQ Streams 提供 Operator 来管理在 OpenShift 集群中运行的 Kafka 集群。

Cluster Operator
部署和管理 Apache Kafka 集群、Kafka Connect、Kafka MirrorMaker、Kafka Bridge、Kafka Exporter 和 Entity Operator
实体 Operator
由主题 Operator 和 User Operator 组成
主题 Operator
管理 Kafka 主题
User Operator
管理 Kafka 用户

Cluster Operator 可以与 Kafka 集群同时部署 Topic Operator 和 User Operator 作为 Entity Operator 配置的一部分。

AMQ Streams 架构中的 Operator

Operators within the AMQ Streams architecture

4.1. Cluster Operator

AMQ Streams 使用 Cluster Operator 来部署和管理集群:

  • Kafka(包括 ZooKeeper、实体 Operator、Kafka Exporter 和 Cruise Control)
  • Kafka Connect
  • Kafka MirrorMaker
  • Kafka Bridge

自定义资源用于部署集群。

例如,部署 Kafka 集群:

  • 在 OpenShift 集群中创建了带有集群配置的 Kafka 资源。
  • Cluster Operator 根据 Kafka 资源中声明的内容部署对应的 Kafka 集群。

Cluster Operator 也可以部署(通过配置 Kafka 资源):

  • 一个主题 Operator,通过 KafkaTopic 自定义资源提供 operator 风格的主题管理
  • 一个 User Operator,用来通过 KafkaUser 自定义资源提供 operator 风格的用户管理

部署的 Entity Operator 中的 Topic Operator 和 User Operator 功能。

Cluster Operator 的架构示例

The Cluster Operator creates and deploys Kafka and ZooKeeper clusters

4.2. 主题 Operator

Topic Operator 提供了通过 OpenShift 资源管理 Kafka 集群中主题的方法。

Topic Operator 的架构示例

The Topic Operator manages topics for a Kafka cluster via KafkaTopic resources

Topic Operator 的角色是保留一组 KafkaTopic OpenShift 资源,描述 Kafka 主题和对应的 Kafka 主题。

特别是,如果 KafkaTopic 是:

  • 创建,主题 Operator 会创建该主题
  • 删除的,主题 Operator 会删除该主题
  • 更改,主题 Operator 更新该主题

在另一个方向上工作,如果一个主题是:

  • 在 Kafka 集群中创建,Operator 会创建一个 KafkaTopic
  • 从 Kafka 集群中删除,Operator 会删除 KafkaTopic
  • 在 Kafka 集群中更改,Operator 会更新 KafkaTopic

这可让您将 KafkaTopic 声明为应用程序部署的一部分,主题 Operator 将为您创建该主题。您的应用程序只需要处理从所需主题中产生或使用的内容。

Topic Operator 在主题 存储中维护每个主题 的信息,它与来自 Kafka 主题或 OpenShift KafkaTopic 自定义资源的更新持续同步。应用到本地内存中主题存储的操作的更新会保留在磁盘上的备份主题存储中。如果某个主题被重新配置或重新分配给其他代理,KafkaTopic 将始终保持最新状态。

4.3. User Operator

User Operator 通过监视描述 Kafka 用户的 KafkaUser 资源并确保在 Kafka 集群中正确配置了这些用户,来管理 Kafka 集群的 Kafka 用户。

例如,如果 KafkaUser 是:

  • 创建,User Operator 创建它描述的用户
  • 删除时,User Operator 会删除它所描述的用户
  • 更改后,User Operator 会更新它所描述的用户

与主题 Operator 不同,User Operator 不会将 Kafka 集群的任何更改与 OpenShift 资源同步。Kafka 主题可以直接由 Kafka 中的应用程序创建,但用户不必与 User Operator 并行直接在 Kafka 集群中管理。

User Operator 允许您将 KafkaUser 资源声明为应用程序部署的一部分。您可以为用户指定身份验证和授权机制。您还可以配置 用户配额 来控制对 Kafka 资源的使用,例如,确保用户不会专利对代理的访问。

创建用户时,会在 Secret 中创建用户凭证。您的应用需要使用用户 及其凭据进行身份验证,并生成或使用消息。

除了管理用于身份验证的凭证外,User Operator 还通过在 KafkaUser 声明中包含用户访问权限描述来管理授权规则。

4.4. AMQ Streams Operator 中的功能门

您可以使用功能 门来启用和禁用 Operator 的一些功能

功能门在操作器配置中设置,具有三个成熟度阶段:alpha、beta 或 General Availability(GA)。

如需更多信息,请参阅 功能门

第 5 章 Kafka 配置

使用 AMQ Streams 将 Kafka 组件部署到 OpenShift 集群可通过应用自定义资源进行高度配置。自定义资源作为自定义资源定义(CRD)添加的 API 实例创建,以扩展 OpenShift 资源。

CRD 用作描述 OpenShift 集群中自定义资源的配置说明,并为部署中使用的每个 Kafka 组件以及用户和主题提供 AMQ Streams。CRD 和自定义资源定义为 YAML 文件。AMQ Streams 发行版提供了示例 YAML 文件。

CRD 还允许 AMQ Streams 资源受益于原生 OpenShift 功能,如 CLI 访问和配置验证。

在本章中,我们将了解如何通过自定义资源配置 Kafka 组件,从常见配置点开始,然后了解与组件相关的重要配置注意事项。

5.1. 自定义资源

通过安装 CRD 在集群中添加新的自定义资源类型后,您可以根据具体规格创建资源实例。

AMQ Streams 组件的自定义资源具有常见的配置属性,这些属性在 spec 下定义。

在这个 Kafka 主题自定义资源的片段中,apiVersionkind 属性标识关联的 CRD。spec 属性显示用于定义该主题的分区和副本数的配置。

Kafka 主题自定义资源

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: my-topic
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 1
  replicas: 1
  # ...

可以将许多附加配置选项合并到 YAML 定义中,一些特定于特定组件的常见配置选项。

5.2. 通用配置

此处介绍了一些资源常用的配置选项。 适用的情况下,也可能采用安全性和 指标收集

Bootstrap 服务器

bootstrap 服务器用于到 Kafka 集群的主机/端口连接,用于:

  • Kafka Connect
  • Kafka Bridge
  • Kafka MirrorMaker 生产者和使用者
CPU 和内存资源

您可以为组件请求 CPU 和内存资源。limits 指定给定容器可以消耗的最大资源。

Topic Operator 和 User Operator 的资源请求和限值在 Kafka 资源中设置。

日志
您可以为组件定义日志级别。可以使用配置映射直接(内线)或外部定义日志记录。
healthchecks
健康检查配置引入了 存活度和 就绪度探测,以了解何时重新启动容器(存活度)以及容器何时可以接受流量(就绪度)。
JVM 选项
JVM 选项提供最大和最小内存分配,以根据组件运行的平台优化其性能。
Pod 调度
pod 调度使用 关联性/反关联性 规则来确定 pod 在什么情况下调度到节点上。

显示常见配置的 YAML 示例

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-cluster
spec:
  # ...
  bootstrapServers: my-cluster-kafka-bootstrap:9092
  resources:
    requests:
      cpu: 12
      memory: 64Gi
    limits:
      cpu: 12
      memory: 64Gi
  logging:
    type: inline
    loggers:
      connect.root.logger.level: "INFO"
  readinessProbe:
    initialDelaySeconds: 15
    timeoutSeconds: 5
  livenessProbe:
    initialDelaySeconds: 15
    timeoutSeconds: 5
  jvmOptions:
    "-Xmx": "2g"
    "-Xms": "2g"
  template:
    pod:
      affinity:
        nodeAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            nodeSelectorTerms:
              - matchExpressions:
                  - key: node-type
                    operator: In
                    values:
                      - fast-network
  # ...

5.3. Kafka 集群配置

kafka 集群由一个或多个代理组成。要使生产者和消费者能够访问代理内的主题,Kafka 配置必须定义数据如何存储在集群中,以及如何访问数据。您可以将 Kafka 集群配置为在 机架 中使用多个代理节点运行。

存储

Kafka 和 ZooKeeper 将数据存储在磁盘上。

AMQ 流需要通过 StorageClass 调配块存储。存储的文件系统格式必须是 XFS 或 EXT4。支持三种类型的数据存储:

临时(仅适用于开发的建议)
临时存储存储实例生命周期的数据。实例重启后数据会丢失。
persistent
持久存储与独立于实例生命周期的长期数据存储相关。
JBOD(只是一个磁盘绑定,仅适用于 Kafka)
JBOD 允许您使用多个磁盘将提交日志存储在每个代理中。

如果基础架构支持,可以增加现有 Kafka 集群使用的磁盘容量。

监听程序

侦听器配置客户端如何连接到 Kafka 集群。

通过为 Kafka 集群中的每个监听程序指定唯一的名称和端口,您可以配置多个监听程序。

支持以下类型的监听程序:

  • 用于 OpenShift 内部访问的内部监听程序
  • 用于在 OpenShift 外部访问的外部监听程序

您可以为监听程序启用 TLS 加密,并配置 身份验证

使用 internal 类型指定内部监听程序。

外部监听程序通过指定外部 type 来公开 Kafka:

如果您使用 OAuth 2.0 进行基于令牌的身份验证,您可以将监听程序配置为使用授权服务器。

机架感知
机架意识是一种配置功能,它可以在 机架 间分发 Kafka 代理 Pod 和主题副本,它代表数据中心中的数据中心或机架或可用性区域。

显示 Kafka 配置的 YAML 示例

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    # ...
    listeners:
      - name: tls
        port: 9093
        type: internal
        tls: true
        authentication:
          type: tls
      - name: external1
        port: 9094
        type: route
        tls: true
        authentication:
          type: tls
    # ...
    storage:
      type: persistent-claim
      size: 10000Gi
    # ...
    rack:
      topologyKey: topology.kubernetes.io/zone
    # ...

5.4. Kafka MirrorMaker 配置

要设置 MirrorMaker,必须运行一个源和目标(目标) Kafka 集群。

您可以使用带有 MirrorMaker 2.0 的 AMQ Streams,虽然仍支持 MirrorMaker 的早期版本。

MirrorMaker 2.0

MirrorMaker 2.0 基于 Kafka Connect 框架,即管理集群间数据传输的 连接器

MirrorMaker 2.0 使用:

  • 源集群配置以使用源集群的数据
  • 将数据输出到目标集群的目标集群配置
集群配置

您可以在 主动/ 被动或主动/主动 集群配置中使用 MirrorMaker 2.0。

  • 主动/主动 配置中,两个集群都处于活动状态并同时提供相同的数据,如果您想在不同的地理位置在本地提供相同的数据,这很有用。
  • 主动/被动 配置中,来自主动/被动群集的数据复制到被动群集中,该群集仍处于备用状态,例如,在发生系统故障时进行数据恢复。

您可以配置 KafkaMirrorMaker2 自定义资源来定义 Kafka Connect 部署,包括源和目标集群的连接详情,然后运行一组 MirrorMaker 2.0 连接器来进行连接。

主题配置会根据 KafkaMirrorMaker2 自定义资源中定义的主题在源和目标集群间自动同步。将配置更改传播到远程主题,以便检测和创建新主题和分区。主题复制使用正则表达式模式来定义,以包含或排除主题。

以下 MirrorMaker 2.0 连接器和相关内部主题有助于管理集群间数据的传输和同步。

MirrorSourceConnector
MirrorSourceConnector 从源集群中创建远程主题。
MirrorCheckpointConnector
使用偏移 同步主题和 检查点 主题,为指定消费者组提供一个 MirrorCheckpointConnector 跟踪和映射偏移。偏移同步主题映射从记录元数据中复制主题分区的来源和目标偏移。检查点从每个源集群中发出,并通过检查点主题复制到目标集群中。checkpoint 主题映射源和目标集群中最后一个提交的偏移,用于每个消费者组中复制主题分区。
MirrorHeartbeatConnector
MirrorHeartbeatConnector 定期检查集群间的连通性。MirrorHeartbeatConnector 将每秒制作 心跳成一个在本地集群上创建的心跳 主题。如果您在远程和本地位置都有 MirrorMaker 2.0,MirrorHeartbeatConnector 在远程位置发送的心跳会被视为任何远程主题,被本地集群中的 MirrorSourceConnector 镜像。heartbeat 主题可让您轻松检查远程集群是否可用并连接群集。如果出现问题,心跳话题偏移位置和时间戳有助于恢复和诊断。

图 5.1. 在两个集群间复制

MirrorMaker 2.0 replication between a Kafka cluster in Region 1 and a Kafka cluster in Region 2
两个集群的双向复制

MirrorMaker 2.0 架构在 主动/主动 集群配置中支持双向复制,因此两个集群都处于活跃状态,同时提供相同的数据。每个目标目的地都需要一个 MirrorMaker 2.0 集群。

通过自动将集群名称加上主题名称的重命名来区分远程主题。如果要在不同的地理位置本地提供相同的数据,这将非常有用。

但是,如果要在主动/被动集群配置中备份或迁移数据,您可能需要保留主题的原始名称。如果是这样,您可以配置 MirrorMaker 2.0 以关闭自动重命名。

图 5.2. 双向复制

MirrorMaker 2.0 bidirectional architecture
显示 MirrorMaker 2.0 配置的 YAML 示例
  apiVersion: kafka.strimzi.io/v1beta2
  kind: KafkaMirrorMaker2
  metadata:
    name: my-mirror-maker2
    spec:
      version: 2.8.0
      connectCluster: "my-cluster-target"
      clusters:
      - alias: "my-cluster-source"
        bootstrapServers: my-cluster-source-kafka-bootstrap:9092
      - alias: "my-cluster-target"
        bootstrapServers: my-cluster-target-kafka-bootstrap:9092
      mirrors:
      - sourceCluster: "my-cluster-source"
        targetCluster: "my-cluster-target"
        sourceConnector: {}
      topicsPattern: ".*"
      groupsPattern: "group1|group2|group3"

MirrorMaker

较早版本的 MirorMaker 使用生产者和消费者在集群间复制数据。

MirrorMaker 使用:

  • 用于消耗源集群数据的消费者配置
  • 将数据输出到目标集群的制作者配置

使用者和制作者配置包括任何身份验证和加密设置。

include 字段定义从源到目标集群要镜像的主题。

Key Consumer 配置
消费者组标识符
MirrorMaker 消费者的使用者组 ID,以便使用的消息分配到一个消费者组。
消费者流数量
个值,用于确定一个消费者组中并行使用消息的消费者数量。
偏移提交间隔
设置耗用时间与提交消息之间的偏移提交间隔。
主要 Producer 配置
取消发送失败的选项
您可以定义是否忽略了消息发送失败,或者 MirrorMaker 被终止并重新创建。
显示 MirrorMaker 配置的 YAML 示例
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker
metadata:
  name: my-mirror-maker
spec:
  # ...
  consumer:
    bootstrapServers: my-source-cluster-kafka-bootstrap:9092
    groupId: "my-group"
    numStreams: 2
    offsetCommitInterval: 120000
    # ...
  producer:
    # ...
    abortOnSendFailure: false
    # ...
  include: "my-topic|other-topic"
  # ...

5.5. Kafka Connect 配置

基本的 Kafka Connect 配置需要一个 bootstrap 地址来连接到 Kafka 集群,以及加密和身份验证详情。

默认情况下会使用相同的配置 Kafka Connect 实例:

  • Kafka Connect 集群的组 ID
  • 保存连接器偏移的 Kafka 主题
  • 存储连接器和任务状态配置的 Kafka 主题
  • 存储连接器和任务状态更新的 Kafka 主题

如果使用多个不同的 Kafka Connect 实例,这些设置必须反映每个实例。

显示 Kafka Connect 配置的 YAML 示例

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect
spec:
  # ...
  config:
    group.id: my-connect-cluster
    offset.storage.topic: my-connect-cluster-offsets
    config.storage.topic: my-connect-cluster-configs
    status.storage.topic: my-connect-cluster-status
  # ...

连接器

连接器与 Kafka Connect 独立配置。该配置描述了要馈入和离开 Kafka Connect 的源输入数据和目标输出数据。外部源数据必须引用存储消息的特定主题。

Kafka 提供两个内置连接器:

  • FileStreamSourceConnector 将外部系统的数据流传输到 Kafka,从输入源读取行并将每行发送到 Kafka 主题。
  • FileStreamSinkConnector 将 Kafka 数据流传输到外部系统,读取 Kafka 主题中的信息,并在输出文件中为每个文件创建一个行。

您可以使用连接器插件添加其他连接器,它们是一组 JAR 文件或 TGZ 存档,定义了连接到特定类型外部系统所需的实施。

您可以创建一个使用新连接器插件的自定义 Kafka Connect 镜像。

要创建镜像,您可以使用:

对于 AMQ Streams 自动创建新镜像,build 配置需要 output 属性引用存储容器镜像的容器 registry,以及 plugins 属性来列出要添加到镜像中的连接器插件及其工件。

output 属性描述了镜像的类型和名称,以及包含访问容器 registry 所需凭证的 Secret 名称(可选)。plugins 属性描述了工件的类型以及从中下载工件的 URL。另外,您可以在解压缩工件前指定一个 SHA-512 校验和来验证工件。

用于自动创建新镜像的 Kafka Connect 配置示例

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
spec:
  # ...
  build:
    output:
      type: docker
      image: my-registry.io/my-org/my-connect-cluster:latest
      pushSecret: my-registry-credentials
    plugins:
      - name: debezium-postgres-connector
        artifacts:
          - type: tgz
            url: https://ARTIFACT-ADDRESS.tgz
            sha512sum: HASH-NUMBER-TO-VERIFY-ARTIFACT
      # ...
  #...

管理连接器

您可以使用 KafkaConnector 资源或 Kafka Connect REST API 在 Kafka Connect 集群中创建和管理连接器实例。KafkaConnector 资源提供 OpenShift 原生方法,并由 Cluster Operator 管理。

KafkaConnector 资源的 spec 指定连接器类和配置设置,以及处理数据的连接器 任务 的最大数量。

显示 KafkaConnector 配置的 YAML 示例

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: my-source-connector
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: org.apache.kafka.connect.file.FileStreamSourceConnector
  tasksMax: 2
  config:
    file: "/opt/kafka/LICENSE"
    topic: my-topic
    # ...

您可以通过在 KafkaConnect 资源中添加注解来启用 KafkaConnectors。KafkaConnector 资源必须部署到与其链接的 Kafka Connect 集群相同的命名空间中。

启用 KafkaConnector 的 YAML 显示注解示例

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect
  annotations:
    strimzi.io/use-connector-resources: "true"
  # ...

5.6. Kafka Bridge 配置

Kafka Bridge 配置需要它连接的 Kafka 集群的 bootstrap 服务器规格,以及所需的任何加密和身份验证选项。

Kafka Bridge consumer 和 producer 配置是标准的配置,如使用者的 Apache Kafka 配置文档和针对 生产者的 Apache Kafka 配置文档 中所述。

与 HTTP 相关的配置选项设置服务器侦听的端口连接。

CORS

Kafka Bridge 支持使用跨 Origin 资源共享(CORS)。CORS 是一种 HTTP 机制,它允许浏览器从多个来源访问选定的资源,例如,在不同的域中的资源。如果选择使用 CORS,您可以定义允许的资源来源列表和 HTTP 方法,以便通过 Kafka Bridge 与 Kafka 集群交互。列表在 Kafka Bridge 配置的 http 规格中定义。

CORS 允许在不同域中的原始源之间 简单预先理解的请求

  • 简单的请求是 HTTP 请求,必须在其标头中定义允许的原始项。
  • 抢占的请求在实际请求前发送一个初始 OPTIONS HTTP 请求,以检查来源和方法是否允许。

显示 Kafka Bridge 配置的 YAML 示例

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaBridge
metadata:
  name: my-bridge
spec:
  # ...
  bootstrapServers: my-cluster-kafka:9092
  http:
    port: 8080
    cors:
      allowedOrigins: "https://strimzi.io"
      allowedMethods: "GET,POST,PUT,DELETE,OPTIONS,PATCH"
  consumer:
    config:
      auto.offset.reset: earliest
  producer:
    config:
      delivery.timeout.ms: 300000
  # ...

其它资源

第 6 章 保护 Kafka

安全部署 AMQ Streams 可以包含:

  • 为数据交换加密
  • 验证以证明身份
  • 允许或拒绝用户执行的操作的授权

6.1. 加密

AMQ 流支持传输层安全性(TLS),这是一种用于加密通信的协议。

始终对通信进行加密以实现:

  • Kafka 代理
  • zookeeper 节点
  • Operator 和 Kafka 代理
  • Operators 和 ZooKeeper 节点
  • Kafka Exporter

您还可以通过将 TLS 加密应用到 Kafka 代理的监听程序来配置 Kafka 代理和客户端之间的 TLS。在配置外部侦听器时,为外部客户端指定 TLS。

AMQ Streams 组件和 Kafka 客户端使用数字证书加密。Cluster Operator 设置证书以便在 Kafka 集群中启用加密。您可以提供自己的服务器证书,称为 Kafka 侦听程序证书,用于 Kafka 客户端和 Kafka 代理之间的通信,以及集群间通信。

AMQ Streams 使用 Secret 以 PEM 和 PKCS #12 格式存储 TLS 所需的证书和私钥。

TLS 证书颁发机构(CA)发布证书来验证组件的身份。AMQ Streams 根据 CA 证书验证组件的证书。

  • 根据 集群 CA 证书颁发机构(CA)验证 AMQ Streams 组件
  • 针对 客户端 CA 证书颁发机构(CA)验证 Kafka 客户端。

6.2. Authentication

Kafka 侦听器使用身份验证来确保与 Kafka 集群的安全客户端连接。

支持的验证机制:

  • 双向 TLS 客户端身份验证(在启用了 TLS 加密的监听器上)
  • SASL SCRAM-SHA-512
  • 基于 OAuth 2.0 令牌的身份验证

User Operator 管理 TLS 和 SCRAM 身份验证的用户凭证,但不管理 OAuth 2.0。例如,您可以通过 User Operator 创建代表需要访问 Kafka 集群的客户端的用户,并将 TLS 指定为身份验证类型。

通过使用基于 OAuth 2.0 令牌的身份验证,应用程序客户端可以在不公开帐户凭证的情况下访问 Kafka 代理。授权服务器处理关于访问权限的访问和咨询。

6.3. 授权

Kafka 集群使用授权来控制特定客户端或用户在 Kafka 代理上允许的操作。如果应用到 Kafka 集群,则会为用于客户端连接的所有监听程序启用授权。

如果在 Kafka 代理配置中的 超级用户列表中添加用户,则允许用户无限访问集群,无论通过授权机制实施的任何授权限制。

支持的授权机制:

  • 简单授权
  • OAuth 2.0 授权(如果您使用基于 OAuth 2.0 令牌的身份验证)
  • 打开策略代理(OPA)授权
  • 自定义授权

简单授权使用 AclAuthorizer,默认的 Kafka 授权插件。AclAuthorizer 使用访问控制列表(ACL)定义哪些用户有权访问哪些资源。对于自定义授权,您可以将自己的 Authorizer 插件配置为强制执行 ACL 规则。

OAuth 2.0 和 OPA 提供来自授权服务器的基于策略的控制。用于授予对 Kafka 代理上资源访问权限的安全策略和权限在授权服务器上定义。

URL 用于连接授权服务器,并验证是否允许或拒绝客户端或用户请求的操作。用户和客户端与授权服务器中创建的策略匹配,允许对 Kafka 代理执行特定操作。

第 7 章 监控

通过监控数据,您可以监控 AMQ Streams 的性能和健康状况。您可以配置部署,以捕获用于分析和通知的指标数据。

在调查连接和数据交付问题时,指标数据非常有用。例如,指标数据可以识别复制不足的分区或消息的使用量速率。警报规则可以通过指定的通信频道提供此类指标的时间关键通知。监控视觉化呈现实时指标数据,以帮助确定更新部署配置的时间和方式。AMQ Streams 提供了示例指标配置文件。

分布式追踪为通过 AMQ Streams 提供端到端跟踪消息的工具补充指标数据的收集。

cruise Control 支持根据工作负载数据重新平衡 Kafka 集群。

指标和监控工具

AMQ Streams 可使用以下工具进行指标和监控:

  • Prometheus 从 Kafka、ZooKeeper 和 Kafka Connect 集群中提取指标。Prometheus Alertmanager 插件处理警报并将其路由到通知服务。
  • Kafka Exporter 添加额外的 Prometheus 指标
  • Grafana 提供 Prometheus 指标的仪表板视觉化
  • Jaeger 提供分布式追踪支持来跟踪应用程序之间的事务
  • 在 Kafka 集群间实现控制 平衡数据

7.1. Prometheus

Prometheus 可以从 Kafka 组件和 AMQ Streams Operator 中提取指标数据。

要使用 Prometheus 获取指标数据并提供警报,必须部署 Prometheus 和 Prometheus Alertmanager 插件。还必须部署 Kafka 资源或重新部署指标配置,以公开指标数据。

Prometheus 提取公开的指标数据以进行监控。在条件指示潜在问题时,Alertmanager 会根据预定义的警报规则发出警报。

AMQ Streams 提供了示例指标和警报规则配置文件。AMQ Streams 提供的示例警报机制配置为发送通知到 Slack 频道。

7.2. Grafana

Grafana 使用 Prometheus 公开的指标数据来显示仪表板视觉化以进行监控。

需要部署 Grafana,并将 Prometheus 添加为数据源。由 AMQ Streams 提供的 AMQ Streams 作为 JSON 文件提供的示例仪表板通过 Grafana 界面导入,以呈现监控数据。

7.3. Kafka Exporter

Kafka Exporter 是一个开源项目,用于增强对 Apache Kafka 代理和客户端的监控。Kafka Exporter 使用 Kafka 集群部署,以便从 Kafka 代理中提取与偏移、消费者组、消费者滞后和主题相关的其他 Prometheus 指标数据。您可以使用提供的 Grafana 仪表板来视觉化 Prometheus 从 Kafka Exporter 收集的数据。

AMQ Streams 提供了 Kafka Exporter 的示例配置文件、警报规则和 Grafana 仪表板。

7.4. 分布式追踪

在 Kafka 部署中,支持使用 Jaeger 的分布式追踪:

  • MirrorMaker 将源集群的信息追踪到目标集群
  • kafka Connect 以跟踪由 Kafka Connect 使用并生成的信息
  • Kafka Bridge 用于跟踪 Kafka Bridge 使用和生成的消息,以及来自客户端应用程序的 HTTP 请求

模板配置属性是为 Kafka 资源设置的,用于描述追踪环境变量。

追踪 Kafka 客户端

也可以设置 Kafka 制作者和使用者等客户端应用程序,以便监控交易。客户端配置有跟踪配置文件,并且初始化追踪器供客户端应用使用。

7.5. Sything Control

光线控制是一个开源项目,用于简化对 Kafka 集群间数据的监控和平衡。Soundation Control 与 Kafka 集群一起部署,以监控其流量,提出更均衡的分区分配,并根据这些情况触发分区重新分配。

cruise Control 会收集资源利用率信息,以对 Kafka 集群的工作负载进行建模和分析。根据定义的 优化目标,Cruise 控制可以生成 优化,概述 如何有效地重新平衡集群。当一个 优化建议 被批准时,Cruise Control 应用提议中的重新平衡。

Prometheus 可以提取 Cruise Control 指标数据,包括与优化调整和重新平衡操作相关的数据。AMQ Streams 提供了用于 Cruise Control 的示例配置文件和 Grafana 仪表板。

附录 A. 使用您的订阅

AMQ Streams 通过软件订阅提供。要管理您的订阅,请访问红帽客户门户中的帐户。

访问您的帐户

  1. 转至 access.redhat.com
  2. 如果您还没有帐户,请创建一个帐户。
  3. 登录到您的帐户。

激活订阅

  1. 转至 access.redhat.com
  2. 导航到 My Subscriptions
  3. 导航到 激活订阅 并输入您的 16 位激活号。

下载 Zip 和 Tar 文件

要访问 zip 或 tar 文件,请使用客户门户查找要下载的相关文件。如果您使用 RPM 软件包,则不需要这一步。

  1. 打开浏览器并登录红帽客户门户网站 产品下载页面,网址为 access.redhat.com/downloads
  2. 查找 INTEGRATION 类别 中的 Red Hat AMQ Streams 条目。
  3. 选择所需的 AMQ Streams 产品。此时会打开 Software Downloads 页面。
  4. 单击组件的 Download 链接。

2021-08-27 00:37:31 +1000 修订