5.5. 优化制作者以实现吞吐量和延迟

通常,系统的要求是满足特定吞吐量目标,以实现在给定延迟内对消息的比例进行比例。例如,在 2 秒内被确认为每秒 500,000条消息,其中 95% 的消息被确认。

您的生产者的消息传递语义(消息顺序和持久性)可能由应用程序的要求定义。例如,您可能没有使用 acks=0acks=1 的选项,而不破坏某些重要属性或保证由您的应用程序提供。

代理重启对高百分比的统计信息有显著影响。例如,在一个长时间内,99th percentile 的延迟是由代理重启的行为所代表的。这在设计基准测试或将性能数量与生产中性能数进行比较时需要考虑。

根据您的目标,Kafka 提供了多个配置参数,以及用于调优吞吐量和延迟的制作者性能的技术。

邮件批处理(linger.msbatch.size)
消息批量发送延迟,希望发送更多用于相同代理的消息,允许将它们批处理到单个生成请求中。批处理是实现更高吞吐量的返回延迟之间的折衷。使用 linger.ms 配置基于时间的批处理,并且基于大小的批处理则使用 batch.size 进行配置。
压缩(压缩.type)
消息压缩会增加制作者的延迟(CPU 时间用于压缩消息),但会发出请求(以及可能磁盘写入)更小,这可能会提高吞吐量。无论压缩是否值得使用,以及要使用的最佳压缩将取决于所发送的消息。压缩发生于调用 KafkaProducer.send () 的线程上,因此,如果此方法的延迟对于应用程序而言,您应该考虑使用更多线程。
pipelining (max.in.flight.requests.per.connection)
pipelining 意味着在收到对之前请求的响应前发送更多请求。通常,更多的 pipelining 意味着更好的吞吐量,达到了其他影响的阈值(如更糟糕的批处理)开始对吞吐量的影响反击。

降低延迟

当应用程序调用 KafkaProducer.send () 时,信息是:

  • 由任何拦截器处理
  • serialized
  • 分配给分区
  • 压缩
  • 添加到每个分区队列中的批处理消息

在哪一个点上,发送() 方法返回。因此,时间 send () 被阻止由以下决定:

  • 拦截器、序列化器和分区器中花费的时间
  • 使用的压缩算法
  • 等待缓冲区用于压缩所需的时间

批处理将保留在队列中,直到出现以下情况之一:

  • 批处理已满(根据 batch.size
  • linger.ms 引入的延迟已经通过
  • 发件人是将其他分区的消息批处理发送到同一代理,也可以添加此批处理
  • 制作者正在刷新或关闭

查看批处理和缓冲的配置,以减轻 send () 阻塞延迟的影响。

# ...
linger.ms=100 1
batch.size=16384 2
buffer.memory=33554432 3
# ...
1
linger 属性以毫秒为单位添加一个延迟,以便在请求中累计并发送更大的消息批处理。默认值为 0'。
2
如果使用最大 batch.size (以字节为单位),则在达到最大值时发送一个请求,或者消息排队后的时间超过 linger.ms (以更早的时间为 linger.ms)。添加延迟可让批处理将消息累计到批处理大小。
3
缓冲区大小必须至少与批处理大小相同,并能够容纳缓冲、压缩和动态请求。

增加吞吐量

通过调整消息发送前等待的最长时间来改进消息请求的吞吐量,并完成发送请求。

您还可以通过编写自定义分区来将消息定向到指定的分区,以替换默认值。

# ...
delivery.timeout.ms=120000 1
partitioner.class=my-custom-partitioner 2

# ...
1
等待完整发送请求的最长时间(毫秒)。您可以将值设为 MAX_LONG,以委派到 Kafka 的重试次数。默认值为 120000 或 2 分钟。
2
指定自定义分区器的类名称。