Kafka 配置调整

Red Hat AMQ Streams 2.5

使用 Kafka 配置属性来优化数据流

摘要

使用 Kafka 配置属性微调 Kafka 代理、制作者和消费者的操作。

使开源包含更多

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。我们从这四个术语开始:master、slave、黑名单和白名单。由于此项工作十分艰巨,这些更改将在即将推出的几个发行版本中逐步实施。有关更多详情,请参阅我们的首席技术官 Chris Wright 提供的消息

第 1 章 Kafka 调整概述

使用配置属性优化 Kafka 代理、生产者和消费者的性能。您可以在 OCP 和 RHEL 上为 AMQ Streams 指定配置属性。

需要最小配置属性,但您可以添加或调整属性,以更改制作者和消费者如何与 Kafka 代理交互。例如,您可以调整消息的延迟和吞吐量,以便客户端可以实时响应数据。

您可以首先分析指标以量化进行初始配置的位置,然后进行增量更改并进一步比较指标,直到您有需要配置为止。

有关 Apache Kafka 配置属性的更多信息,请参阅 Apache Kafka 文档

1.1. 映射属性和值

如何指定配置属性取决于部署类型。如果在 OCP 上部署 AMQ Streams,您可以使用 Kafka 资源通过 config 属性添加 Kafka 代理的配置。在 RHEL 上使用 AMQ Streams,您可以将配置作为环境变量添加到属性文件中。

当您向自定义资源添加 config 属性时,您可以使用冒号(':')来映射属性和值。

自定义资源中的配置示例

num.partitions:1

当您将属性作为环境变量添加时,您可以使用等号('=')来映射属性和值。

环境变量配置示例

num.partitions=1

1.2. 帮助调优的工具

以下工具有助于 Kafka 调整:

  • Cruise Control 生成优化建议,可用于评估和实施集群重新平衡
  • Kafka 静态配额插件对代理设置限制
  • 机架配置将代理分区分散到机架中,并允许用户从最接近的副本获取数据

有关这些工具的更多信息,请参阅以下指南:

第 2 章 受管代理配置

在 OpenShift 上部署 AMQ Streams 时,您可以通过 Kafka 自定义资源的 config 属性指定代理配置。但是,某些代理配置选项由 AMQ Streams 直接管理。

因此,如果您在 OpenShift 中使用 AMQ Streams,则无法配置以下选项:

  • broker.id 指定 Kafka 代理的 ID
  • log.dirs 目录用于日志数据
  • zookeeper.connect 配置,将 Kafka 与 ZooKeeper 连接
  • 监听程序 将 Kafka 集群公开给客户端
  • 允许或拒绝用户执行操作的 授权机制
  • 证明需要访问 Kafka 的用户身份 的身份验证机制

代理 ID 从 0 (零)开始,对应于代理副本数。日志目录根据 Kafka 自定义资源中的 spec.kafka.storage 配置挂载到 /var/lib/kafka/data/kafka-logIDXIDX 是 Kafka 代理 pod 索引。

有关排除列表,请参阅 KafkaClusterSpec 模式参考

在 RHEL 上使用 AMQ Streams 时,这些排除并不适用。在这种情况下,您需要在基本代理配置中添加这些属性来识别代理并提供安全访问。

RHEL 上 AMQ Streams 的代理配置示例

# ...
broker.id = 1
log.dirs = /var/lib/kafka
zookeeper.connect = zoo1.my-domain.com:2181,zoo2.my-domain.com:2181,zoo3.my-domain.com:2181
listeners = internal-1://:9092
authorizer.class.name = kafka.security.auth.SimpleAclAuthorizer
ssl.truststore.location = /path/to/truststore.jks
ssl.truststore.password = 123456
ssl.client.auth = required
# ...

第 3 章 Kafka 代理配置调整

使用配置属性优化 Kafka 代理的性能。您可以使用标准 Kafka 代理配置选项,但直接由 AMQ Streams 管理的属性除外。

3.1. 基本代理配置

典型的代理配置包括与主题、线程和日志相关的属性设置。

基本代理配置属性

# ...
num.partitions=1
default.replication.factor=3
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
num.network.threads=3
num.io.threads=8
num.recovery.threads.per.data.dir=1
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
group.initial.rebalance.delay.ms=0
zookeeper.connection.timeout.ms=6000
# ...

3.2. 复制用于高可用性的主题

基本主题属性为主题设置默认分区和复制因素数量,该主题适用于在没有显式设置这些属性的情况下创建的主题,包括何时自动创建主题。

# ...
num.partitions=1
auto.create.topics.enable=false
default.replication.factor=3
min.insync.replicas=2
replica.fetch.max.bytes=1048576
# ...

对于高可用性环境,建议将复制因素增加到至少 3 个主题,并将所需的最少同步副本数设置为复制因素减 1。

auto.create.topics.enable 属性默认为启用,以便在生成者和消费者需要时自动创建不存在的主题。如果使用自动主题创建,您可以使用 num.partitions 为主题设置默认分区数量。通常,此属性被禁用,以便在创建显式主题时通过主题提供更多控制。

为了实现数据持久性,需要在 topic 配置中设置 min.insync.replicas,并在 producer 配置中使用 acks=all 来发送提交确认。

使用 replica.fetch.max.bytes 设置复制领导分区的每个后续信息的最大大小(以字节为单位)。根据平均消息大小和吞吐量更改这个值。当考虑读/写缓冲区所需的内存分配总量时,可用内存还必须能够满足所有后续者乘以的最大复制消息大小。

delete.topic.enable 属性默认启用,以允许删除主题。在生产环境中,您应该禁用此属性以避免意外删除主题,从而导致数据丢失。但是,您可以临时启用它并删除主题,然后再次禁用它。

注意

在 OpenShift 上运行 AMQ Streams 时,主题 Operator 可以提供 operator 风格的主题管理。您可以使用 KafkaTopic 资源来创建主题。对于使用 KafkaTopic 资源创建的主题,使用 spec.replicas 设置复制因素。如果启用了 delete.topic.enable,您还可以使用 KafkaTopic 资源删除主题。

# ...
auto.create.topics.enable=false
delete.topic.enable=true
# ...

3.3. 事务和提交的内部主题设置

如果您使用事务 启用来自制作者的分区的原子写入,事务的状态将存储在内部 __transaction_state 主题中。默认情况下,代理配置有复制因素 3,这个主题最少需要 2 个同步副本,这意味着您的 Kafka 集群中至少需要三个代理。

# ...
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
# ...

同样,内部 __consumer_offsets 主题(存储消费者状态)具有分区和复制因素的默认设置。

# ...
offsets.topic.num.partitions=50
offsets.topic.replication.factor=3
# ...

不要在生产环境中减少这些设置。生产环境中,您可以提高设置。作为例外,您可能想要减少单一代理 测试环境中的设置

3.4. 通过增加 I/O 线程来提高请求处理吞吐量

网络绑定处理到 Kafka 集群的请求,如从客户端应用程序生成和获取请求。生成请求放置在请求队列中。响应放置在响应队列中。

每个监听器的网络线程数量应该反映复制因素以及客户端制作者和与 Kafka 集群交互的用户的活动级别。如果您要有大量请求,您可以使用时间线程量来决定何时添加更多线程。

要减少拥塞并规范请求流量,您可以限制请求队列中允许的请求数。当请求队列已满时,所有传入的流量都会被阻断。

I/O 线程从请求队列中获取请求来处理它们。添加更多线程可以提高吞吐量,但 CPU 内核和磁盘带宽的数量会产生实际上限。至少,I/O 线程数量应该等于存储卷的数量。

# ...
num.network.threads=3 1
queued.max.requests=500 2
num.io.threads=8 3
num.recovery.threads.per.data.dir=4 4
# ...
1
Kafka 集群的网络线程数量。
2
请求队列中允许的请求数。
3
Kafka 代理的 I/O 线程数量。
4
在启动时用于日志加载的线程数量,并在关闭时清除。尝试将值设为至少内核数。

对所有代理的线程池的配置更新可能会在集群级别动态发生。这些更新仅限于当前大小的一半和当前大小的两倍。

提示

以下 Kafka 代理指标可帮助处理所需的线程数量:

  • kafka.network:type=SocketServer,name=NetworkProcessorAvgIdlePercent 提供平均时间网络线程的指标作为百分比。
  • kafka.server:type=KafkaRequestHandlerPool,name=RequestHandlerAvgIdlePercent 提供平均 I/O 线程闲置为百分比的指标。

如果有 0% 的空闲时间,所有资源都被使用,这意味着添加更多线程可能很有用。当闲置时间低于 30% 时,性能可能会开始下降。

如果因为磁盘数量导致线程缓慢或有限,您可以尝试增加缓冲区的大小来提高吞吐量:

# ...
replica.socket.receive.buffer.bytes=65536
# ...

另外,增加 Kafka 可以接收的最大字节数:

# ...
socket.request.max.bytes=104857600
# ...

3.5. 增加高延迟连接的带宽

Kafka 批处理数据,以便在从 Kafka 到客户端(如数据中心之间的连接)上实现高延迟连接的合理的吞吐量。但是,如果高延迟问题,您可以增加缓冲区的大小来发送和接收信息。

# ...
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
# ...

您可以使用 带宽延迟产品 计算来估算缓冲区的最佳大小,它乘以链接的最大带宽(以字节为单位/s),带有往返延迟(以秒为单位)来估算缓冲区如何保持最大吞吐量。

3.6. 使用数据保留策略管理日志

Kafka 使用日志来存储消息数据。日志是与各种索引关联的一系列片段。新消息被写入 active 片段,之后永远不会修改。在服务从消费者获取请求时读取片段。有时候,活跃段会滚动到变为只读,一个新的活跃段会被创建来替代它。一次只有一个片段活跃。旧的片段会被保留,直到它们有资格删除。

代理级别的配置会以字节为单位设置日志片段的最大大小,以及滚动活跃片段前的时间(以毫秒为单位):

# ...
log.segment.bytes=1073741824
log.roll.ms=604800000
# ...

您可以使用 segment.bytessegment.ms 在主题级别上覆盖这些设置。无论您需要降低或引发这些值,都取决于删除片段的策略。较大的大小意味着活跃片段包含更多信息,经常滚动。段也变得更频繁地删除。

您可以设置基于时间或基于大小的日志保留和清理策略,以便日志保持可以管理。根据您的要求,您可以使用日志保留配置来删除旧的片段。如果使用日志保留策略,则在达到保留限制时会删除非主动日志片段。删除旧的片段会绑定日志所需的存储空间,因此您不会超过磁盘容量。

对于基于时间的日志保留,您可以根据小时、分钟和毫秒设置一个保留周期。保留周期基于时间信息附加到片段中。

毫秒的配置的优先级超过分钟,其优先级超过小时。分钟和毫秒配置默认为 null,但三个选项提供了对您要保留的数据进行大量控制。首选应分配给毫秒配置,因为它是唯一可动态更新的三个属性之一。

# ...
log.retention.ms=1680000
# ...

如果 log.retention.ms 设为 -1,则不会将时间限制应用到日志保留,因此会保留所有日志。应始终监控磁盘用量,但通常不建议 -1 设置,因为它可能会导致完整磁盘出现问题,这可能会很难重新处理。

对于基于大小的日志保留,您可以以字节为单位设置最大日志大小(日志中的所有片段):

# ...
log.retention.bytes=1073741824
# ...

换句话说,日志通常会在达到稳定状态后有大约 log.retention.bytes/log.segment.bytes 段。当达到最大日志大小时,旧的片段会被删除。

使用最大日志大小的潜在问题是,它不会考虑时间信息被附加到片段中。您可以使用基于时间和基于大小的日志保留来进行清理策略,以获取您需要的平衡。每当首先达到清理阈值时都会触发清理。

如果要在从系统中删除段文件前添加时间延迟,您可以为主题配置中的特定主题使用 log.segment. delete.delay.ms 添加延迟。

# ...
log.segment.delete.delay.ms=60000
# ...

3.7. 使用清理策略删除日志数据

删除旧日志数据的方法由日志 清理 器配置决定。

默认情况下,代理启用了 log cleaner:

# ...
log.cleaner.enable=true
# ...

如果您使用日志压缩清理策略,则需要启用日志清理。您可以在主题或代理级别设置清理策略。对于没有设置策略的主题,代理级配置是默认设置。

您可以设置策略以删除日志、紧凑日志或同时执行两者:

# ...
log.cleanup.policy=compact,delete
# ...

delete 策略与使用数据保留策略管理日志对应。当不需要永久保留数据时,它适合。紧凑 策略保证为每个消息键保留最新的消息。日志压缩适合消息值可改变,您希望保留最新的更新。

如果将清理策略设置为删除日志,则根据日志保留限制删除旧的片段。否则,如果没有启用日志清理程序,且没有日志保留限制,日志将继续增长。

如果为日志压缩设置了清理策略,日志 的头 将作为标准 Kafka 日志运行,并按顺序添加新信息。在紧凑日志的末尾,日志清理程序运行,如果日志稍后发生具有相同键的另一个记录,则会删除记录。带有 null 值的消息也会被删除。如果您不使用密钥,则无法使用压缩,因为需要密钥来识别相关消息。虽然 Kafka 保证每个键的最新信息将被保留,但它不能保证整个压缩日志不包含重复的日志。

图 3.1. 在压缩前,显示键值写入的偏移位置的日志

显示键值写入的压缩镜像

使用键来识别消息,Kafka 压缩会保留特定消息键的最新消息(具有最高偏移),最终丢弃具有相同键的之前信息。换句话说,处于 latest 状态的消息始终可用,在日志清理程序运行时最终会删除特定消息的最新记录。您可以将消息还原回以前的状态。

即使删除记录,记录也会保留其原始偏移量。因此,尾部可以有非连续偏移。当消耗尾部不再可用的偏移量时,会找到具有下一个较高偏移的记录。

图 3.2. 压缩后的日志

日志清理后压缩的镜像

如果您只选择一个紧凑策略,则您的日志仍然可以变得任意大。在这种情况下,您可以将策略设置为紧凑删除日志。如果您选择紧凑和删除,首先压缩日志数据,在日志头使用键删除记录。之后,在日志保留阈值前的数据会被删除。

图 3.3. 日志保留点和压缩点

使用保留点压缩的镜像

您可以设置日志检查的频率,以毫秒为单位进行清理:

# ...
log.retention.check.interval.ms=300000
# ...

调整日志保留检查间隔,以便与日志保留设置相关。较小的保留大小可能需要更频繁地检查。

清理的频率应该足以管理磁盘空间,但通常不会影响主题的性能。

您还可以设置一个时间(毫秒),以便在没有日志清理时将清理器置于待机状态:

# ...
log.cleaner.backoff.ms=15000
# ...

如果您选择删除旧的日志数据,可以设置一个毫秒的句点来保留已删除的数据,然后再清除:

# ...
log.cleaner.delete.retention.ms=86400000
# ...

删除的数据保留周期提供了在不必要的删除数据前了解数据的时间。

要删除与特定键相关的所有消息,生成者可以发送 tombstone 消息。tombstone 有一个 null 值,并充当一个标记来告知消费者会删除该值。压缩后,只会保留 tombstone,这必须很长时间才能让消费者知道该消息已被删除。删除旧的消息时,没有值,tombstone 键也会从分区中删除。

3.8. 管理磁盘使用率

与日志清理相关的其他配置设置有很多,但特别是内存分配是内存分配。

deduplication 属性指定在所有日志清理线程间清理的总内存。您可以在通过缓冲区负载因素使用的内存百分比上设置上限。

# ...
log.cleaner.dedupe.buffer.size=134217728
log.cleaner.io.buffer.load.factor=0.9
# ...

每个日志条目都使用 24 字节,因此您可以处理缓冲区在单个运行中可以处理的日志条目数量,并相应地调整设置。

如果可能,如果要减少日志清理时间,请考虑增加日志清理线程的数量:

# ...
log.cleaner.threads=8
# ...

如果您遇到 100% 磁盘带宽使用情况的问题,您可以节流日志清理 I/O,以便读取/写操作的总和低于执行该操作的磁盘的功能:

# ...
log.cleaner.io.max.bytes.per.second=1.7976931348623157E308
# ...

3.9. 处理大型消息大小

消息的默认批处理大小为 1MB,在大多数用例中最适合最大吞吐量。假设有足够的磁盘容量,Kafka 可以以较低的吞吐量容纳更大的批处理。

大型消息大小以三种方式处理:

  1. 生成者消息压缩 将压缩消息写入日志。
  2. 基于参考的消息仅发送对存储在消息值中的一些其他系统中的数据的引用。
  3. 内联消息传递将信息分成使用相同键的块,然后使用像 Kafka Streams 这样的流处理器组合在输出中。
  4. 为处理更大的消息大小而构建代理和制作者/消费者客户端应用程序配置。

建议使用基于参考的消息和消息压缩选项,并涵盖大多数情况。使用任何这些选项时,必须小心以避免出现性能问题。

制作者侧压缩

对于制作者配置,您可以指定一个 compression.type,如 Gzip,然后应用于制作者生成的数据的批处理。使用代理配置 compression.type=producer,代理会保留制作者使用的任何压缩。每当生成者和主题压缩不匹配时,代理必须在将批处理附加到日志之前再次压缩,这会影响代理性能。

压缩还增加生成者上的额外的处理开销,但在批处理中包含更多数据,因此在消息数据压缩时通常很有用。

将生成者压缩与批处理大小的微调合并,以促进最佳吞吐量。使用指标有助于量化所需的平均批处理大小。

基于参考的消息

当您不知道消息将如何多大时,基于参考的消息对于数据复制非常有用。外部数据存储必须为 fast、durable,且高可用性才能使此配置正常工作。数据被写入数据存储,并返回对数据的引用。producer 发送一个包含对 Kafka 引用的消息。消费者从消息获取引用,并使用它来从数据存储中获取数据。

图 3.4. 基于参考的消息流

基于参考的消息流的镜像

由于消息传递需要更多往返,因此端到端延迟会增加。这个方法的另一个显著缺陷是在清理 Kafka 消息时,外部系统中没有自动清理数据。混合方法是仅向数据存储和进程标准消息发送大型消息。

内联消息传递

内联消息传递比较复杂,但它没有依赖于外部系统的开销,如基于参考的消息。

生成的客户端应用程序必须序列化,然后在消息太大时阻止数据。然后,生成者使用 Kafka ByteArraySerializer 或与发送每个块前再次序列化每个块类似。消费者跟踪消息和缓冲区块,直到它有完整的消息。消耗客户端应用程序接收块,这些块在反序列化前被编译。根据每个一组块消息的第一个或最后一个块的偏移,将完整的消息传送到消耗的应用程序的其余部分。检查通过偏移元数据进行的成功发送完整消息,以避免在重新平衡过程中重复。

图 3.5. 内联消息传递流

内联消息传递流的镜像

因为缓冲区需要,内联消息传递在消费者端性能开销,特别是在并行处理一系列大型消息时。大型消息的块可能会变为交集,因此如果缓冲区中另一个大消息的块不完整,则无法提交消息的所有块。因此,通常通过保留消息区块或实施提交逻辑来支持缓冲区。

处理更大消息的配置

如果无法避免更大的消息,并避免在消息流的任意点上的块,您可以提高消息限值。为此,请在主题级别配置 message.max.bytes,为单个主题设置最大记录批处理大小。如果您在代理级别上设置了 message.max.bytes,则所有主题都允许更大的消息。

代理将拒绝任何大于 message. max.bytes 设置的消息。生产者(max.request.size)和消费者(message.max.bytes)的缓冲区大小必须能够容纳更大的消息。

3.10. 控制消息数据的日志清除

通常,建议不会设置显式清除阈值,并让操作系统使用默认设置执行后台清除。分区复制提供比写入任何单个磁盘更多的数据持久性,因为失败的代理可以从其同步的副本中恢复。

Log flush 属性控制缓存消息数据的定期写入磁盘。调度程序以毫秒为单位指定日志缓存中检查的频率:

# ...
log.flush.scheduler.interval.ms=2000
# ...

您可以根据保存在内存的最大时间和日志中的最大消息数来控制清除的频率:

# ...
log.flush.interval.ms=50000
log.flush.interval.messages=100000
# ...

刷新之间的等待包括时间进行检查,以及在执行清除前指定的时间间隔。增加清除的频率可能会影响吞吐量。

如果您使用应用程序清除管理,如果您使用更快的磁盘,设置较低的 flush 阈值可能适合。

3.11. 对可用性进行分区重新平衡

分区可以在代理之间复制,以进行容错。对于给定分区,选择一个代理被选择,用于处理所有生成的请求(写入日志)。在领导失败时,在其他代理中,分区遵循器复制分区领导分区数据以获得数据可靠性。

遵循者通常不为客户端提供服务,但 机架 配置允许在 Kafka 集群跨越多个数据中心时消耗来自最接近的副本的消息。followers 仅操作从分区领导复制消息,并允许恢复机制失败。恢复需要一个同步后续程序。followers 通过向领导发送获取请求来保持同步,该请求会向后续者返回消息。如果后续者在领导上最近提交的消息已耗尽,则它被视为处于同步状态。领导通过查看后续请求的最后偏移量来检查这一点。除非允许未清理领导选举机制,否则通常不同步后续者作为领导机 线。

您可以在后续者被视为不同步前调整滞后时间:

# ...
replica.lag.time.max.ms=30000
# ...

滞后时间限制将消息复制到所有同步副本的时间限制,以及制作者必须等待确认的时长。如果后续者无法发出获取请求并使用指定滞后内的最新消息捕获,它将从同步的副本中删除。您可以减少更快检测失败副本的时间,但这样做可能会增加不需要同步的跟随者数量。右后时间值取决于网络延迟和代理磁盘带宽。

当领导分区不再可用时,会选择其中一个同步副本作为新领导。分区列表中的第一个代理称为 首选 领导。默认情况下,会根据定期检查领导分发,为自动分区领导重新平衡启用 Kafka。也就是说,Kafka 会检查首选领导 是当前的 领导。重新平衡可确保领导在代理和代理间平均分布不会过载。

您可以使用 Cruise Control for AMQ Streams 来找出副本分配到集群中平均平衡负载的代理。计算考虑领导和后续人员遇到的差异负载。失败的领导会影响 Kafka 集群的平衡,因为剩余的代理会获得前导额外分区的额外工作。

对于 Cruise Control 发现的分配,实际上平衡,分区需要由首选领导领导提供。Kafka 可以自动确保使用首选领导机制(可能),根据需要更改当前的领导。这样可确保集群保持在 Cruise Control 找到的 balanced 状态。

在触发重新平衡前,您可以控制重新平衡检查的频率,以及代理允许的最大不可变百分比。

#...
auto.leader.rebalance.enable=true
leader.imbalance.check.interval.seconds=300
leader.imbalance.per.broker.percentage=10
#...

代理的领导机机是当前分区数量之间的比例,代理是当前领导的领导数,以及它是首选领导分区数量。您可以将百分比设置为零,以确保始终选择首选领导,假设它们同步。

如果对重新平衡的检查需要更多控制,您可以禁用自动重新平衡。然后,您可以选择何时使用 kafka-leader-election.sh 命令行工具触发重新平衡。

注意

由 AMQ Streams 提供的 Grafana 仪表板显示没有活跃领导的分区和分区的指标。

3.12. 未清理领导选举机制

同步副本的领导选举被视为干净的,因为它无法保证数据丢失。这是默认发生什么。但是,如果没有同步的副本来参与领导?ISR (同步副本)可能仅在领导磁盘结束时才会包含领导。如果没有设置最小同步的副本,且在硬盘失败时没有与分区领导同步的遵循者,则数据已经丢失。不仅如此,新的领导产品也无法被选举,因为没有同步的跟随者。

您可以配置 Kafka 如何处理领导失败:

# ...
unclean.leader.election.enable=false
# ...

默认禁用未清理领导选举机制,这意味着没有同步的副本无法成为领导。使用清理领导选举机制时,如果没有其他代理在旧的领导机丢失时位于 ISR 中,Kafka 会在可以写入或读取信息前等待该领导重新上线。Unclean leader 选举意味着没有同步的副本可能会成为领导状态,但您会面临丢失消息的风险。您做出的选择取决于您的要求是否优先可用性。

您可以在主题级别上覆盖特定主题的默认配置。如果您无法牺牲数据丢失的风险,则保留默认配置。

3.13. 避免不必要的消费者组重新平衡

对于加入新消费者组的用户,您可以添加延迟,以避免对代理进行不必要的重新平衡:

# ...
group.initial.rebalance.delay.ms=3000
# ...

延迟是协调者等待成员加入的时间。延迟时间越长,所有成员都将及时加入,并避免重新平衡。但是,延迟也会阻止组消耗,直到周期结束为止。

第 4 章 Kafka 使用者配置调整

使用基本消费者配置,以及专为特定用例量身定制的可选属性。

在调整您的消费者时,您的主要关注将确保它们与最接近的数据量保持一致。与生成者调优一样,准备好进行增量更改,直到消费者按预期运行为止。

4.1. 基本消费者配置

每个消费者都需要 connection 和 deserializer 属性。通常,为跟踪添加客户端 ID 是不错的做法。

在消费者配置中,无论后续配置是什么:

  • 消费者从给定的偏移获取并按顺序消耗消息,除非将偏移更改为跳过或重新读取消息。
  • 代理不知道消费者是否处理响应,即使向 Kafka 提交偏移,因为偏移可能会发送到集群中的不同代理。

基本消费者配置属性

# ...
bootstrap.servers=localhost:9092 1
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer  2
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer  3
client.id=my-client 4
group.id=my-group-id 5
# ...

1
(必需)使用 Kafka 代理的 host:port bootstrap 服务器地址来连接 Kafka 集群的消费者。消费者使用地址来发现并连接到集群中的所有代理。在服务器停机时,使用逗号分隔的列表指定两个或多个地址,但不需要提供集群中的所有代理列表。如果您使用 loadbalancer 服务公开 Kafka 集群,则只需要该服务的地址,因为可用性是由 loadbalancer 处理的。
2
(必需)将从 Kafka 代理获取的字节转换为消息键的反序列化器。
3
(必需)将从 Kafka 代理获取的字节转换为消息值的反序列化器。
4
(可选)客户端的逻辑名称,用于日志和指标来标识请求的来源。id 也可以根据处理时间配额来节流消费者。
5
(条件) 需要 组 id 才能加入消费者组。

4.2. 使用消费者组扩展数据消耗

消费者组共享由来自给定主题的一个或多个制作者生成的大型数据流。用户使用 group.id 属性分组,允许消息分散到成员中。组中的一个消费者是选择的领导者,决定如何将分区分配给组中的消费者。每个分区只能分配给单个消费者。

如果您还没有尽可能多的消费者分区,您可以通过添加更多具有相同 group.id 的消费者实例来扩展数据消耗。向组添加更多使用者无法帮助吞吐量,但这意味着有一个处于待机状态的用户应该有一个停止正常运行。如果可以通过较少的消费者来满足吞吐量目标,您可以节省资源。

同一消费者组中的消费者将偏移提交和心跳发送到同一代理。因此,组中的消费者数量越大,代理的请求负载越大。

# ...
group.id=my-group-id 1
# ...
1
使用组 ID 将消费者添加到消费者组中。

4.3. 消息排序保证

Kafka 代理接收从请求代理从主题、分区和偏移位置列表发送消息的用户获取请求。

消费者会按照提交到代理的顺序观察单个分区的信息,这意味着 Kafka 只为 单个分区中的消息提供排序保证。相反,如果消费者消耗来自多个分区的消息,则消费者观察到的不同分区中的消息顺序不一定反映它们发送的顺序。

如果您希望从一个主题严格排序消息,请为每个消费者使用一个分区。

4.4. 优化消费者吞吐量和延迟

控制客户端应用程序调用 KafkaConsumer.poll () 时返回的消息数量。

使用 fetch.max.wait.msfetch.min.bytes 属性来增加由 Kafka 代理使用者获取的最小数据量。使用 fetch.max.wait.ms 配置基于时间的批处理,使用 fetch.min.bytes 配置基于大小的批处理。

如果消费者或代理中的 CPU 使用率很高,这可能是因为消费者有太多的请求。您可以调整 fetch.max.wait.msfetch.min.bytes 属性,以便以较大的批处理方式发送较少的请求和消息。通过调整更高的吞吐量,吞吐量会降低一些成本延迟。如果生成的数据量较低,您也可以调整更高的数据。

例如,如果您将 fetch.max.wait.ms 设置为 500ms,将 fetch.min.bytes 设置为 16384 字节,当 Kafka 从消费者收到获取请求时,它将在达到任何一个阈值时响应。

相反,您可以调整 fetch.max.wait.msfetch.min.bytes 属性,以提高端到端延迟。

# ...
fetch.max.wait.ms=500 1
fetch.min.bytes=16384 2
# ...
1
在完成获取请求前代理将等待的时间(毫秒)。默认值为 500 毫秒。
2
如果使用最小批处理大小(以字节为单位),则在达到最小值时发送请求,或者消息已排队的时间超过 fetch.max.wait.ms (以更早的时间为准)。添加延迟允许批处理消息累计到批处理大小。

通过增加获取请求大小来降低延迟

使用 fetch.max.bytesmax.partition.fetch.bytes 属性来增加 Kafka 代理使用者获取的最大数据量。

fetch.max.bytes 属性一次性从代理获取的数据量设置最大限制,以字节为单位。

max.partition.fetch.bytes 会以字节为单位设置每个分区返回的最大限制,每个分区必须始终大于代理或主题配置中设置的 max.message.bytes 字节数。

客户端可消耗的最大内存量大约计算为:

NUMBER-OF-BROKERS * fetch.max.bytes and NUMBER-OF-PARTITIONS * max.partition.fetch.bytes

如果内存用量可以容纳它,您可以增加这两个属性的值。通过在每个请求中允许更多数据,延迟会随着获取请求较少而得到改进。

# ...
fetch.max.bytes=52428800 1
max.partition.fetch.bytes=1048576 2
# ...
1
为获取请求返回的最大数据量(以字节为单位)。
2
为每个分区返回的最大数据量(以字节为单位)。

4.5. 在提交偏移时避免数据丢失或重复

Kafka auto-commit 机制 允许使用者自动提交消息的偏移。如果启用,使用者将以 5000ms 间隔提交从轮询代理接收的偏移量。

auto-commit 机制非常方便,但它引入了数据丢失和重复的风险。如果消费者获取并转换了多个信息,但系统会在执行自动提交时,在消费者缓冲区中处理消息崩溃,则该数据将会丢失。如果系统在处理消息后崩溃,但在执行自动提交前,数据会在重新平衡后在另一个消费者实例上重复。

只有在下次轮询代理或消费者关闭前处理所有消息时,才会避免数据丢失。

要最小化数据丢失或重复的可能性,您可以将 enable.auto.commit 设置为 false,并开发客户端应用程序以更好地控制提交偏移。或者,您可以使用 auto.commit.interval.ms 减少提交之间的间隔。

# ...
enable.auto.commit=false 1
# ...
1
自动提交设置为 false,以提供对提交偏移的更多控制。

通过将 enable.auto.commit 设置为 false,您可以在 执行所有 处理并消耗消息后提交偏移。例如,您可以设置应用程序来调用 Kafka commitSynccommitAsync 提交 API。

commitSync API 在从轮询返回的消息批处理中提交偏移量。在处理批处理中的所有消息时,您将调用 API。如果使用 commitSync API,则应用程序不会轮询新消息,直到批处理中的最后一个偏移提交为止。如果这种负面影响吞吐量,您可以更频繁地提交,或者您可以使用 commitAsync API。commitAsync API 不会等待代理响应提交请求,而是在重新平衡时造成更多重复的风险。一个常见的方法是,将应用程序中的两个提交 API 与在关闭消费者或重新平衡之前所使用的 commitSync API 相结合,以确保最终提交成功。

4.5.1. 控制事务消息

考虑在制作者端使用事务ID 和启用 idempotence=true,以确保完全提供。然后,您可以使用 isolation.level 属性来控制消费者读取事务消息的方式。

isolation.level 属性有两个有效值:

  • read_committed
  • read_uncommitted (default)

使用 read_committed 来确保只有提交的事务信息才会被消费者读取。但是,这会导致端到端延迟增加,因为消费者将无法返回消息,直到代理编写记录事务结果的事务标记(提交中止)。

# ...
enable.auto.commit=false
isolation.level=read_committed 1
# ...
1
设置 read_committed,以便只有提交的消息才会被消费者读取。

4.6. 恢复失败以避免数据丢失

使用 session.timeout.msheartbeat.interval.ms 属性配置检查和恢复消费者组中的消费者故障所需的时间。

session.timeout.ms 属性指定消费者组中消费者在被视为不活跃前与代理联系的最长时间(以毫秒为单位),并在组中活跃的消费者之间触发 重新平衡。当组重新平衡时,分区会被重新分配给组的成员。

heartbeat.interval.ms 属性指定 心跳 检查消费者组协调器之间的间隔(毫秒),以指示消费者处于活动状态并连接。心跳间隔必须较低(通常以第三个方式表示),而不是会话超时间隔。

如果您设置 session.timeout.ms 属性,则之前检测到失败的消费者,并且重新平衡可以更快地进行。但是,要小心不要设置超时,因此代理无法及时收到心跳,并触发不必要的重新平衡。

减少心跳间隔可减少意外重新平衡的机会,但更频繁的心跳会增加代理资源的开销。

4.7. 管理偏移策略

使用 auto.offset.reset 属性来控制消费者在未提交偏移时的行为方式,或者提交的偏移不再有效或删除。

假设您首次部署消费者应用,并且它从现有主题读取消息。由于第一次使用 group.id,因此 __consumer_offsets 主题不包含此应用的任何偏移信息。新应用可以开始处理来自日志开头的所有现有消息,或仅处理新消息。默认重置值 是最新的,它从分区的末尾开始,因此意味着丢失了一些消息。为避免数据丢失,但要增加处理量,将 auto.offset.reset 设置为 earliest 以在分区的头部开始。

另外,请考虑使用 earliest 选项以避免在为代理配置的偏移保留周期(offsets.retention.minutes)时丢失消息。如果消费者组或独立消费者不活跃,且在保留期间没有偏移量,则之前提交的偏移将从 __consumer_offsets 中删除。

# ...
heartbeat.interval.ms=3000 1
session.timeout.ms=45000 2
auto.offset.reset=earliest 3
# ...
1
根据预期的重新平衡调整心跳间隔。
2
如果在超时时间过期前 Kafka 代理没有收到心跳,则消费者会从消费者组中删除并启动重新平衡。如果代理配置有一个 group.min.session.timeout.msgroup.max.session.timeout.ms,则会话超时值必须在该范围内。
3
设置为 earliest 以返回到分区的开始,并在未提交偏移时避免数据丢失。

如果单个获取请求返回的数据量较大,则可能会在消费者处理前发生超时。在这种情况下,您可以降低 max.partition.fetch.bytes 或增加 session.timeout.ms

4.8. 最大程度降低重新平衡的影响

在一个组中的活跃消费者之间重新平衡分区是需要的时间:

  • 消费者提交其偏移量
  • 要形成的新消费者组
  • 为组成员分配分区的组领导
  • 组中的消费者接收其分配并开始获取

明确而言,该过程会增加服务的停机时间,特别是在消费者组集群滚动重启过程中重复发生。

在这种情况下,您可以使用 静态成员资格 的概念来减少重新平衡的数量。在消费者组成员间平均重新平衡分配主题分区。静态成员资格使用持久性,以便在会话超时重启后识别消费者实例。

消费者组协调器可以使用 group.instance.id 属性指定的唯一 id 来识别新的消费者实例。在重启过程中,为消费者分配一个新的成员 ID,但作为静态成员,它继续使用同一实例 ID,并且使用相同的主题分区分配。

如果消费者应用程序没有调用至少轮询每个 max.poll.interval.ms 毫秒,则消费者被视为失败,从而导致重新平衡。如果应用程序无法处理轮询时间返回的所有记录,您可以使用 max.poll.interval.ms 属性指定从消费者轮询新消息之间的间隔(毫秒)。或者,您可以使用 max.poll.records 属性对从消费者缓冲区返回的记录数量设置最大限制,允许您的应用程序在 max.poll.interval.ms 限值内处理较少的记录。

# ...
group.instance.id=UNIQUE-ID 1
max.poll.interval.ms=300000 2
max.poll.records=500 3
# ...
1
唯一的实例 id 确保新消费者实例收到相同的主题分区分配。
2
设置检查消费者继续处理消息的间隔。
3
设置从消费者返回的已处理的记录数量。

第 5 章 Kafka producer 配置调整

使用基本制作者配置,以及专为特定用例量身定制的可选属性。

调整您的配置以最大化吞吐量可能会增加延迟,反之亦然。您需要试验和调优您的制作者配置,以获得您需要的平衡。

5.1. 基本制作者配置

每个制作者都需要 connection 和 serializer 属性。通常,为跟踪添加客户端 ID 的好做法是,并在生成者上使用压缩来减少请求中的批处理大小。

在基本制作者配置中:

  • 不保证分区中的消息顺序。
  • 确认到达代理的消息不能保证持久性。

基本制作者配置属性

# ...
bootstrap.servers=localhost:9092 1
key.serializer=org.apache.kafka.common.serialization.StringSerializer 2
value.serializer=org.apache.kafka.common.serialization.StringSerializer 3
client.id=my-client 4
compression.type=gzip 5
# ...

1
(必需)使用 Kafka 代理的 host:port bootstrap 服务器地址来连接 Kafka 集群的制作者。制作者使用地址来发现并连接到集群中的所有代理。在服务器停机时,使用逗号分隔的列表指定两个或多个地址,但不需要提供集群中的所有代理列表。
2
(必需)在发送到代理前,将每个消息的密钥转换为字节的序列化器。
3
(必需)在发送到代理前将每个消息的值转换为字节的序列化器。
4
(可选)客户端的逻辑名称,用于日志和指标来标识请求的来源。
5
(可选)压缩消息的 codec,它们以压缩格式发送并存储,然后在到达消费者时解压缩。压缩可用于提高吞吐量并减少存储负载,但可能不适用于低延迟应用程序,因为压缩成本可能会被压缩或解压缩。

5.2. 数据持久性

消息发送确认可最小化消息丢失的可能性。默认情况下,使用 acks=all 设置的 acks 属性启用确认。

确认消息发送

# ...
acks=all 1
# ...

1
acks=all 强制领导副本在成功收到消息请求前,将信息复制到特定数量的后续者。

acks=all 设置提供最强的交付保证,但它会增加生成者发送消息和接收确认之间的延迟。如果您不要求这种强大的保证,则设置 acks=0acks=1 不提供交付保证,或者仅确认领导副本已将记录写入日志中。

使用 acks=all 时,领导会等待所有同步的副本确认消息发送。主题的 min.insync.replicas 配置设置最低同步副本确认数量。确认的数量包括领导和后续者。

典型的起点是使用以下配置:

  • 生成者配置:

    • acks=all (default)
  • 用于主题复制的代理配置:

    • default.replication.factor=3 (default = 1)
    • min.insync.replicas=2 (default = 1)

创建主题时,您可以覆盖默认复制因素。您还可以在主题配置的主题级别上覆盖 min.insync.replicas

AMQ Streams 在示例配置文件中使用此配置进行 Kafka 的多节点部署。

下表描述了此配置根据复制领导副本的后续者的可用性。

表 5.1. 遵循程序可用性

可用的接收者数和同步致谢生产者可以发送消息?

2

领导等待 2 个后续确认

1

领导等待 1 个跟随者的确认

0

领导者引发异常

3 主题复制因素会创建一个领导副本和两个后续者。在此配置中,如果单个后续程序不可用时,生产者可以继续。有些延迟可能会从同步的副本中删除失败的代理或创建新领导。如果第二个后续者也不可用,消息发送将无法成功。领导向生产者发送一个错误(不足够副本),而不是确认成功消息发送。生产者引发等同的异常。通过 重试 配置,生成者可以重新发送失败的消息请求。

注意

如果系统失败,则缓冲区中不必要的数据可能会丢失。

5.3. 排序交付

幂等生产者避免重复,因为消息会完全传递一次。将 ID 和序列号分配给消息以确保交付顺序,即使在出现故障时也是如此。如果您使用 acks=all 进行数据一致性,使用 idempotency 对排序交付有意义。默认情况下,为制作者启用 idempotency。启用 idempotency 后,您可以将并发动态请求数设置为最多 5 个消息排序。

使用 idempotency 排序交付

# ...
enable.idempotence=true 1
max.in.flight.requests.per.connection=5 2
acks=all 3
retries=2147483647 4
# ...

1
设置为 true 以启用幂等制作者。
2
通过幂等交付动态请求数时,可能会大于 1,同时仍然提供消息排序保证。默认值为 5 个动态请求。
3
acks 设置为 all
4
设置重新发送失败消息请求的尝试次数。

如果您选择不使用 acks=all 并禁用 idempotency,因为性能成本,请将 in-flight (未确认)请求数设置为 1 以保留排序。否则,在 Message-B 写入代理后,Message-A 才会成功。

在没有 idempotency 的情况下排序交付

# ...
enable.idempotence=false 1
max.in.flight.requests.per.connection=1 2
retries=2147483647
# ...

1
设置为 false 以禁用幂等制作者。
2
将动态请求数设置为正好 1

5.4. 可靠性保证

当写入单个分区后,idempotence 非常有用。与 idempotence 一起使用时,事务允许在多个分区间写入一次。

使用相同事务 ID 发送的事务消息只生成一次,因此只会所有都成功写入到相应的日志,或所有都没有写入。

# ...
enable.idempotence=true
max.in.flight.requests.per.connection=5
acks=all
retries=2147483647
transactional.id=UNIQUE-ID 1
transaction.timeout.ms=900000 2
# ...
1
指定唯一的事务 ID。
2
在返回超时错误前,以毫秒为单位设置事务允许的最大时间。默认值为 900000 或 15 分钟。

选择 transactional.id 非常重要,以便保持事务保证。每个事务 ID 都应该用于一组唯一的主题分区。例如,这可以通过主题分区名称的外部映射到事务 ID 来实现,也可以使用避免冲突的功能从主题分区名称计算事务 ID。

5.5. 优化吞吐量和延迟的制作者

通常,系统要求是满足给定延迟内消息的具体吞吐量目标。例如,以每秒 500,000 个消息为目标,在 2 秒内确认了 95% 的消息。

生成者的消息语义(消息排序和持久性)可能由应用程序的要求定义。例如,您可能没有选择在不破坏一些重要属性或保证应用程序提供的 acks=0acks=1 的情况下使用 acks=0 或 acks=1。

代理重启会对高百分比统计产生重大影响。例如,在一个长时间内,99 百分比的延迟由代理重启的行为所降低。在设计基准测试或将性能号与生产环境中看到的性能号进行比较时,这值得考虑。

根据您的目的,Kafka 提供了很多配置参数和技术,用于调整吞吐量和延迟的性能。

消息批处理(linger.msbatch.size)
在希望用于同一代理的消息中会发送消息批处理延迟,从而允许将它们批处理到一个生成请求中。批量(批处理)是更高的延迟,以返回更高的吞吐量。基于时间的批处理使用 linger.ms 配置,基于大小的批处理是使用 batch.size 来配置的。
compression (compression.type)
消息压缩会增加生成者(CPU 时间压缩消息),但使请求(和可能的磁盘写入)更小,这可以提高吞吐量。压缩是否值得注意,以及使用的最佳压缩将取决于正在发送的消息。压缩会在调用 KafkaProducer.send () 的线程上进行压缩,因此如果此方法的延迟对于您的应用程序来说很重要,您应该考虑使用更多线程。
Pipelining (max.in.flight.requests.per.connection)
pipelining 意味着在收到响应前发送更多请求。通常,更多 pipelining 意味着更好的吞吐量,最多会达到其他影响的阈值,如更差的批处理,从而开始对吞吐量的影响。

降低延迟

当应用程序调用 KafkaProducer.send () 时,信息为:

  • 由任何拦截器处理
  • serialized
  • 分配给分区
  • 压缩
  • 添加到每个分区队列中的批处理消息

在哪个时候,send () 方法返回。因此,时间 send () 会被阻断:

  • 拦截器、序列化器和分区器花费的时间
  • 使用的压缩算法
  • 等待缓冲区压缩的时间

批处理将保留在队列中,直到出现以下情况之一:

  • 批处理已满(到 batch.size
  • linger.ms 引入的延迟已经通过
  • 发送方即将将其他分区的消息批处理发送到同一代理,并可添加此批处理
  • 生成者正在刷新或关闭

查看批处理和缓冲的配置,以减少 send () 阻止延迟的影响。

# ...
linger.ms=100 1
batch.size=16384 2
buffer.memory=33554432 3
# ...
1
linger 属性添加一个毫秒的延迟,以便在请求中累积并发送更大的消息。默认值为 0'。
2
如果使用最大 batch.size,则在达到最大值时发送请求,或者消息已排队的时间超过 linger.ms (以更早的时间为准)。添加延迟允许批处理消息累计到批处理大小。
3
缓冲区大小必须至少与批处理大小相同,并能够容纳缓冲区、压缩和动态请求。

增加吞吐量

通过调整发送消息前等待的最长时间并完成发送请求,提高消息请求的吞吐量。

您还可以通过编写自定义分区程序来替换默认值,将消息定向到指定的分区。

# ...
delivery.timeout.ms=120000 1
partitioner.class=my-custom-partitioner 2

# ...
1
等待完整发送请求的最长时间(毫秒)。您可以将值设为 MAX_LONG,以委派给 Kafka 的一个无限期重试次数。默认值为 120000 或 2 分钟。
2
指定自定义分区器的类名称。

附录 A. 使用您的订阅

AMQ Streams 通过软件订阅提供。要管理您的订阅,请访问红帽客户门户中的帐户。

访问您的帐户

  1. 转至 access.redhat.com
  2. 如果您还没有帐户,请创建一个帐户。
  3. 登录到您的帐户。

激活订阅

  1. 转至 access.redhat.com
  2. 导航到 My Subscriptions
  3. 导航到 激活订阅 并输入您的 16 位激活号。

下载 Zip 和 Tar 文件

要访问 zip 或 tar 文件,请使用客户门户网站查找下载的相关文件。如果您使用 RPM 软件包,则不需要这一步。

  1. 打开浏览器并登录红帽客户门户网站 产品下载页面,网址为 access.redhat.com/downloads
  2. INTEGRATION AND AUTOMATION 目录中找到 AMQ Streams for Apache Kafka 项。
  3. 选择所需的 AMQ Streams 产品。此时会打开 Software Downloads 页面。
  4. 单击组件的 Download 链接。

使用 DNF 安装软件包

要安装软件包以及所有软件包的依赖软件包,请使用:

dnf install <package_name>

要从本地目录中安装之前下载的软件包,请使用:

dnf install <path_to_download_package>

更新于 2023-11-22

法律通告

Copyright © 2023 Red Hat, Inc.
The text of and illustrations in this document are licensed by Red Hat under a Creative Commons Attribution–Share Alike 3.0 Unported license ("CC-BY-SA"). An explanation of CC-BY-SA is available at http://creativecommons.org/licenses/by-sa/3.0/. In accordance with CC-BY-SA, if you distribute this document or an adaptation of it, you must provide the URL for the original version.
Red Hat, as the licensor of this document, waives the right to enforce, and agrees not to assert, Section 4d of CC-BY-SA to the fullest extent permitted by applicable law.
Red Hat, Red Hat Enterprise Linux, the Shadowman logo, the Red Hat logo, JBoss, OpenShift, Fedora, the Infinity logo, and RHCE are trademarks of Red Hat, Inc., registered in the United States and other countries.
Linux® is the registered trademark of Linus Torvalds in the United States and other countries.
Java® is a registered trademark of Oracle and/or its affiliates.
XFS® is a trademark of Silicon Graphics International Corp. or its subsidiaries in the United States and/or other countries.
MySQL® is a registered trademark of MySQL AB in the United States, the European Union and other countries.
Node.js® is an official trademark of Joyent. Red Hat is not formally related to or endorsed by the official Joyent Node.js open source or commercial project.
The OpenStack® Word Mark and OpenStack logo are either registered trademarks/service marks or trademarks/service marks of the OpenStack Foundation, in the United States and other countries and are used with the OpenStack Foundation's permission. We are not affiliated with, endorsed or sponsored by the OpenStack Foundation, or the OpenStack community.
All other trademarks are the property of their respective owners.