3.9. 处理大型消息大小

消息的默认批处理大小为 1MB,这是大多数用例中的最大吞吐量的最佳选择。如果有足够的磁盘容量,Kafka 可以在较低的吞吐量上容纳更大的批处理。

以四种方式处理大型消息大小:

  1. 生产端消息压缩 将压缩消息写入日志中。
  2. 基于参考的消息传递仅向存储在消息值中的其他系统中的数据发送引用。
  3. 内联消息传递将信息拆分为使用相同键的块,然后使用 Kafka Streams 等流处理器将输出合并。
  4. 为处理更大消息大小而构建的代理和生产者/使用者应用程序配置。

建议使用基于参考的消息传递和消息压缩选项,并涵盖大多数情况。对于其中任何一个选项,必须小心以避免出现性能问题。

制作者压缩

对于制作者配置,您可以指定 compression.type,如 Gzip,然后应用到制作者生成的批处理数据。使用代理配置 compression.type=producer,代理会保留使用制作者的任何压缩。每当生成者和主题压缩不匹配时,代理必须在将批处理附加到日志中之前再次压缩,这会影响代理性能。

压缩还会增加消费者上的生产者的额外处理开销,但包括批处理中的更多数据,因此在消息数据压缩时通常很有用处。

将生成者压缩与批处理大小进行微调相结合,以促进最佳吞吐量。使用指标有助于衡量所需的平均批处理大小。

基于参考的消息传递

当您不知道消息的大容量时,基于参考的消息传递对数据复制非常有用。外部数据存储必须快速、持久且高度可用,才能使此配置正常工作。数据被写入数据存储,并返回对数据的引用。制作者发送一条消息,其中包含对 Kafka 的引用。使用者从消息获取引用,并使用它来从数据存储中获取数据。

图 3.4. 基于参考的消息传递流

基于参考的消息传递流的镜像

当消息传递需要更多时,端到端延迟将会增加。这个方法的另一个显著缺陷是,在清理 Kafka 消息时,外部系统中没有自动清理数据。混合方法是仅向数据存储和进程标准消息发送大型消息。

内联消息传递

内联消息传递比较复杂,但它没有额外的开销,具体取决于基于参考的消息传递等外部系统。

生成客户端应用程序必须序列化,并在消息太大时对数据进行块。然后,制作者使用 Kafka ByteArraySerializer 或与发送每个块前再次序列化。消费者会跟踪消息和缓冲块,直到它有完整的消息。消耗的客户端应用程序接收块,这些块在反序列化前编译。完整的消息会传送到其余消耗应用程序,以根据每组块消息集合的第一个或最后一个块的偏移来。针对偏移元数据检查完整消息,以避免重新平衡期间重复。

图 3.5. 内联消息传递流

内联消息传递流的镜像

由于需要缓冲区,内联消息传递对消费者而言具有性能开销,特别是在并行处理一系列大型消息时。大消息的块可能会变得交错,因此如果缓冲区中另一个大消息的块不完整,并不总是能够提交消息的所有块。因此,缓冲通常是通过持久消息块或实施提交逻辑来支持的。

配置以处理更大消息

如果无法避免较大的消息,并避免在消息流的任何点上阻止,您可以提高消息限值。为此,请在主题级别上配置 message.max.bytes,以设置各个主题的最大记录批处理大小。如果您在代理级别设置了 message.max.bytes,则所有主题都允许较大的消息。

代理将拒绝任何大于使用 message.max.bytes 设置的限制的消息。生产者(max.request.size)和使用者(message.max.bytes)的缓冲区大小必须能够容纳较大的消息。