第 7 章 Kafka Streams 配置属性
application.idtype: string
Importance: high
流处理应用的标识符。在 Kafka 集群中必须是唯一的。它被用作 1 的默认 client-id 前缀,2)用于成员资格管理的 group-id,3) changelog topic 前缀。
bootstrap.serverstype: list
Importance: high
用于建立到 Kafka 集群的初始连接的主机/端口对列表。客户端将使用此处为 bootstrap 指定了哪些服务器的所有服务器 - 此列表仅影响用于发现全部服务器集的初始主机。此列表应当采用
host1:port1,host2:port2,…的形式。由于这些服务器仅用于初始连接,以发现完整的群集成员身份(可能会动态更改),因此该列表不包含完整的服务器集合(尽管服务器停机时,您可能需要多个服务器)。num.standby.replicastype: int
Default: 0
Importance: high
每个任务的待机副本数。
state.dirtype: string
Default: /tmp/kafka-streams
Importance: high
状态存储的目录位置。对于共享同一基础文件系统的每个流实例,此路径必须是唯一的。
acceptable.recovery.lagtype: long
Default: 10000
Valid Values: [0,…]
Importance: medium
可以接受的最大可接受(捕获的偏移数)以便使客户端能够被发现,以接收活跃任务分配。分配后,它仍然会在处理前恢复更改日志的其余部分。为了避免在重新平衡期间暂停,这个配置应该与给定工作负载在一分钟后有非常好的恢复时间对应。必须至少为 0。
cache.max.bytes.bufferingtype: long
Default: 10485760
Valid Values: [0,…]
Importance: medium
用于在所有线程之间缓冲的最大内存字节数。
client.idtype: string
Default: ""
Importance: medium
用于内部消费者、制作者和 restore-consumer 的客户端 ID 的 ID 前缀字符串,其模式为 '<client.id>-StreamThread-<threadSequenceNumber>-<consumer|producer|restore-consumer>'。
default.deserialization.exception.handlertype: class
Default: org.apache.kafka.streams.errors.LogAndFailExceptionHandler
Importance: medium
实施
org.apache.kafka.streams.errors.DeserializationExceptionHandler接口的异常处理类。default.key.serdetype: class
Default: null
Importance: medium
默认的 serializer / deserializer 类用于实现
org.apache.kafka.common.serialization.Serde接口的密钥。在使用窗口的 serde 类时,需要设置用来实施org.apache.kafka.common.serialization.Serde.inner' 或 'default.windowed.inner' 或 'default.windowed.value.inner' 的内级类。default.list.key.serde.innertype: class
Default: null
Importance: medium
默认内部类是实施
org.apache.kafka.common.serialization.Serde接口的关键。如果default.key.serde配置被设置为org.apache.kafka.common.serialization.Serdes.ListSerde,则此配置会被读取。default.list.key.serde.typetype: class
Default: null
Importance: medium
用于实施
java.util.List接口的密钥的 default 类。如果default.key.serde配置被设置为org.apache.kafka.common.serialization.Serdes.ListSerdes.ListSerdes.ListSerde 类,此时此时需要设置 inner serde 类来实现org.apache.kafka.common.serialization.Serde接口。default.list.value.serde.innertype: class
Default: null
Importance: medium
默认内部类是实施
org.apache.kafka.common.serialization.Serde接口的值。如果default.value.serde配置被设置为org.apache.kafka.common.serialization.Serdes.ListSerdes.ListSerde,则此配置会被读取。default.list.value.serde.typetype: class
Default: null
Importance: medium
用于实现
java.util.List接口的值的默认类。如果default.value.serde配置被设置为org.apache.kafka.common.serialization.Serdes.ListSerdes.ListSerdes.ListSerde 类,则当使用列表 serdener serdener serdener serdener serdener serdener serdener serdener serdener serdener serdener。default.production.exception.handlertype: class
Default: org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
Importance: medium
实施
org.apache.kafka.streams.errors.ProductionExceptionHandler接口的异常处理类。default.timestamp.extractortype: class
Default: org.apache.kafka.streams.processor.FailOnInvalidTimestamp
Importance: medium
实施
org.apache.kafka.streams.processor.TimestampExtractor接口的默认时间戳提取器类。default.value.serdetype: class
Default: null
Importance: medium
默认 serializer / deserializer 类用于实现
org.apache.kafka.common.serialization.Serde接口的值。在使用窗口的 serde 类时,需要设置用来实施org.apache.kafka.common.serialization.Serde.inner' 或 'default.windowed.inner' 或 'default.windowed.value.inner' 的内级类。max.task.idle.mstype: long
Default: 0
Importance: medium
此配置控制加入和合并是否可能产生任何顺序结果。配置值是流任务完全停留在某些(但并非所有)输入分区,等待生产者发送额外记录并避免在多个输入流中潜在的转出记录时,会保持闲置时间(以毫秒为单位)。默认(零)不会等待生产者发送更多记录,但它会等待获取代理中已存在的数据。这个默认值意味着,对于代理中已存在的记录,Streams 会以时间戳顺序处理它们。设置为 -1 可完全禁用闲置并处理任何本地可用数据,即使这样做可产生超出顺序处理。
max.warmup.replicastype: int
Default: 2
Valid Values: [1,…]
Importance: medium
最多的备份副本(除配置的 num.standbys 之外),可以一次分配这些副本,以便让任务在一个实例上可用,同时将其重新分配到另一个实例上。用于限制额外的代理流量和集群状态可用于高可用性。必须至少为 1。
num.stream.threadstype: int
Default: 1
Importance: medium
执行流处理的线程数量。
processing.guaranteetype: string
Default: at_least_once
Valid Values: [at_least_once, exactly_once, exactly_once_beta, exactly_once_v2]
Importance: medium
处理保证应使用。可能的值有
at_least_once(默认)和exactly_once_v2(需要代理版本 2.5 或更高版本)。弃用的选项是exactly_once(需要 0.11.0 或更高版本)和exactly_once_beta(需要代理版本 2.5 或更高版本)。请注意,完全处理需要至少三个代理的集群,这是生产的建议设置;通过调整 broker 设置事务.state.log.factor 和transaction.state.log.factor 和可以更改此设置。transaction.state.log.min.isrrack.aware.assignment.tagstype: list
Default: ""
Valid Values: List contain maximum of 5 elements
Importance: medium
用于在 Kafka Streams 实例间分配待机副本的客户端标签密钥列表。配置后,Kafka Streams 将在每客户端标签维度上发布备用任务。
replication.factortype: int
Default: -1
Importance: medium
更改日志主题的复制因素和由流处理应用程序创建的重新分区主题。默认值
-1(meaning:使用代理默认复制因素)需要代理版本 2.4 或更新版本。security.protocoltype: string
Default: PLAINTEXT
Importance: medium
用于与代理通信的协议。有效值为:PLAINTEXT、SSL、SASL_PLAINTEXT、SASL_SSL.
task.timeout.mstype: long
Default: 300000 (5 分钟)
Valid Values: [0,…]
Importance: medium
任务因为内部错误并重试的最长时间(以毫秒为单位),直到引发错误为止。对于超时为 0ms,任务将为第一个内部错误引发错误。对于大于 0ms 的任何超时,任务将在出错之前至少重试一次。
topology.optimizationtype: string
Default: none
Valid Values: [none, all]
Importance: medium
一个配置,如果应该优化拓扑,则告知 Kafka Streams。
application.servertype: string
Default: ""
Importance: low
host:port 对指向用户定义的端点,该端点可用于在此 KafkaStreams 实例中进行状态存储发现和交互式查询。
buffered.records.per.partitiontype: int
Default: 1000
Importance: low
每个分区最多缓冲的记录数。
built.in.metrics.versiontype: string
Default: latest
Valid Values: [latest]
Importance: low
要使用的内置指标版本。
commit.interval.mstype: long
Default: 30000 (30 秒)
Valid Values: [0,…]
Importance: low
保存处理器位置的频率(以毫秒为单位)。(请注意,如果
处理.guarantee被设置为,则默认值为exactly_once_v2100,否则默认值为30000。connections.max.idle.mstype: long
Default: 540000 (9 分钟)
Importance: low
在此配置指定的毫秒数后关闭闲置连接。
default.dsl.storetype: string
Default: rocksDB
Valid Values: [rocksDB, in_memory]
Importance: low
DSL 操作器使用的默认状态存储类型。
metadata.max.age.mstype: long
Default: 300000 (5 分钟)
Valid Values: [0,…]
Importance: low
毫秒内的时间(以毫秒为单位),即使在我们未看到任何分区领导更改的情况下,也会强制更新元数据,以便主动发现任何新的代理或分区。
metric.reporterstype: list
Default: ""
Importance: low
用作指标报告器的类列表。实施
org.apache.kafka.common.metrics.MetricsReporter接口允许在新指标创建通知的类中插入。JmxReporter 始终包含在内以注册 JMX 统计。metrics.num.samplestype: int
Default: 2
Valid Values: [1,…]
Importance: low
为计算指标维护的示例数量。
metrics.recording.leveltype: string
Default: INFO
Valid Values: [INFO, DEBUG, SSP]
Importance: low
指标的最高记录级别。
metrics.sample.window.mstype: long
Default: 30000 (30 秒)
Valid Values: [0,…]
Importance: low
计算指标示例的时间窗口。
poll.mstype: long
Default: 100
Importance: low
阻塞等待输入的时间(毫秒)。
probing.rebalance.interval.mstype: long
Default: 600000 (10 分钟)
Valid Values: [60000,…]
Importance: low
在触发重新平衡以探测完成备份并准备好成为活跃前,需要等待的时间(毫秒)到探测。在分配平衡前,将继续触发重新平衡。必须至少为 1 分钟。
receive.buffer.bytestype: int
Default: 32768(32 kibibytes)
Valid Values: [-1,…]
Importance: low
在读取数据时要使用的 TCP 接收缓冲区(SO_RCVBUF)的大小。如果值为 -1,则使用 OS 默认。
reconnect.backoff.max.mstype: long
Default: 1000(1秒)
Valid Values: [0,…]
Importance: low
重新连接到重复连接失败的代理时要等待的最大时间(毫秒)。如果提供,每个主机的 backoff 将为每个连续的连接失败指数增加,最高最高。在计算了 backoff 增长后,会添加 20% 的随机 jitter 来避免连接停滞。
reconnect.backoff.mstype: long
Default: 50
Valid Values: [0,…]
Importance: low
在尝试重新连接给定主机前要等待的基本时间。这可避免在紧张循环中重复连接到主机。这个 backoff 适用于客户端到代理的所有连接尝试。
repartition.purge.interval.mstype: long
Default: 30000 (30 秒)
Valid Values: [0,…]
Importance: low
从重新分区主题中删除完全消耗的记录的频率(以毫秒为单位)。从上一次清除以来至少会在此值后进行清除,但稍后可能会延迟到延迟。(注意,与
commit.interval.ms不同,当processing.guarantee设置为exactly_once_v2时,这个值的默认值保持不变。request.timeout.mstype: int
Default: 40000 (40 秒)
Valid Values: [0,…]
Importance: low
配置控制客户端等待请求响应的最大时间。如果在超时前没有收到响应,客户端会在需要时重新发送请求,或者在重试耗尽时失败请求。
retriestype: int
Default: 0
Valid Values: [0,…,2147483647]
Importance: low
设置大于零的值将导致客户端重新发送任何失败的请求,并出现可能存在瞬时错误的请求。建议将值设置为零或
MAX_VALUE,并使用对应的超时参数来控制客户端应重试请求的时长。retry.backoff.mstype: long
Default: 100
Valid Values: [0,…]
Importance: low
尝试重试失败的请求到给定主题分区前等待的时间。这可避免在某些故障情况下在紧密循环中重复发送请求。
rocksdb.config.settertype: class
Default: null
Importance: low
实施
org.apache.kafka.streams.state.RocksDBConfigSetter接口的 Rocks DB config setter 类或类名称。send.buffer.bytestype: int
Default: 131072(128 kibibytes)
Valid Values: [-1,…]
Importance: low
在发送数据时要使用的 TCP 发送缓冲区(SO_SNDBUF)的大小。如果值为 -1,则使用 OS 默认。
state.cleanup.delay.mstype: long
Default: 600000 (10 分钟)
Importance: low
在删除分区时状态前需要等待的时间(毫秒)。只有未修改至少为
state.cleanup.delay.ms的状态目录才会被删除。upgrade.fromtype: string
Default: null
Valid Values: [null, 0.10.0, 0.10.1, 0.10.2, 0.11.0, 1.0, 1.1, 2.0, 2.1, 2.2, 2.3]
Importance: low
允许以向后兼容的方式进行升级。从 [0.10.0.0, 1.1] 升级到 2.0+,或者在从 [2.0, 2.3] 升级到 2.4+ 时需要。当从 2.4 升级到更新的版本时,不需要指定此配置。默认为
null。可接受的值为 "0.10.0", "0.10.1", "0.10.2", "0.11.0", "1.0", "2.0", "2.0", "2.1", "2.2", "2.3"(用于从对应的旧版本升级)。window.size.mstype: long
Default: null
Importance: low
为 deserializer 设置窗口大小,以便计算窗口结束时间。
windowed.inner.class.serdetype: string
Default: null
Importance: low
对于窗口化记录的 inner 类,默认序列化器/反序列化器。必须实施 "
"'org.apache.kafka.common.serialization.Serde' 接口。请注意,在 KafkaStreams 应用程序中设置此配置会导致错误,因为它仅用于 Plain consumer 客户端。windowstore.changelog.additional.retention.mstype: long
Default: 86400000(1 天)
Importance: low
添加到窗口维护M,以确保数据不会被预先从日志中删除。允许时钟偏移。默认为 1 天。