4.7. 管理偏移策略

使用 auto.offset.reset 属性来控制消费者在没有提交偏移时的行为方式,或者提交的偏移不再有效或删除。

假设您第一次部署消费者应用,并且从现有主题读取消息。由于第一次使用 group.id,因此 __consumer_offsets 主题不包含此应用的任何偏移信息。新应用可以开始处理日志开始时的所有现有消息,或者仅处理新消息。默认的重置值是 latest,它从分区的末尾启动,因此会遗漏了一些消息。为避免数据丢失,但要增加处理量,将 auto.offset.reset 设置为 earliest 以在分区的头部开始。

另外,还要考虑使用 earliest 选项,以避免在为代理配置的偏移保留周期(offsets.retention.minutes)时丢失消息。如果消费者组或独立使用者不活跃,且在保留期间提交没有偏移,则之前提交的偏移将从 __consumer_offsets 中删除。

# ...
heartbeat.interval.ms=3000 1
session.timeout.ms=45000 2
auto.offset.reset=earliest 3
# ...
1
根据预期的重新平衡调整心跳间隔。
2
如果在超时持续时间到期前 Kafka 代理收到心跳,则会从消费者组中删除使用者,并会启动重新平衡。如果代理配置有一个 group.min.session.timeout.msgroup.max.session.timeout.ms,则会话超时值必须在那个范围内。
3
设置为 earliest 以返回到分区的起始位置,并在未提交偏移时避免数据丢失。

如果单个获取请求中返回的数据量较大,则使用者处理请求前可能会发生超时。在这种情况下,您可以降低 max.partition.fetch.bytes 或增加 session.timeout.ms