191.3. Spring Boot Auto-Configuration

组件支持 99 个选项,它们如下所列。

名称描述默认类型

camel.component.kafka.allow-manual-commit

是否允许通过 KafkaManualCommit 进行手动提交。如果启用了这个选项,则 KafkaManualCommit 实例存储在 Exchange message 标头上,这允许最终用户访问这个 API,并通过 Kafka consumer 执行手动偏移提交。

false

布尔值

camel.component.kafka.break-on-first-error

该选项控制在消费者处理交换时所发生的情况,它会失败。如果 选项为 false,则使用者将继续进行下一个消息并进行处理。如果 选项为 true,则使用者将中断,并且会重新查找导致失败的消息偏移,然后重新尝试处理此消息。但是,如果其绑定每次都失败,则可能导致完全无法处理同一消息,例如投毒消息。因此,建议通过使用 Camel 的错误处理程序来处理该示例。

false

布尔值

camel.component.kafka.brokers

要使用的 Kafka 代理的 URL。格式为 host1:port1,host2:port2,列表可以是代理子集或指向代理子集的 VIP。这个选项在 Kafka 文档中称为 bootstrap.servers。

 

字符串

camel.component.kafka.configuration.allow-manual-commit

是否允许通过 KafkaManualCommit 进行手动提交。如果启用了这个选项,则 KafkaManualCommit 实例存储在 Exchange message 标头上,这允许最终用户访问这个 API,并通过 Kafka consumer 执行手动偏移提交。

false

布尔值

camel.component.kafka.configuration.auto-commit-enable

如果为 true,请定期提交 ZooKeeper 被消费者获取的消息偏移。当进程失败时,将使用提交的偏移量,作为新消费者开始的位置。

true

布尔值

camel.component.kafka.configuration.auto-commit-interval-ms

消费者偏移量的频率被提交到 zookeeper。

5000

整数

camel.component.kafka.configuration.auto-commit-on-stop

当消费者停止时,是否执行显式自动提交,以确保代理有来自上一次消耗的消息的提交。这需要打开 autoCommitEnable 选项。可能的值有:sync、sync 或 none。sync 是默认值。

sync

字符串

camel.component.kafka.configuration.auto-offset-reset

ZooKeeper 中没有初始偏移时做什么:如果偏移偏移范围不足: 最早的 : 自动将偏移重置为最新的偏移:自动将偏移重置为最新偏移失败:将异常设置为使用者

latest

字符串

camel.component.kafka.configuration.break-on-first-error

该选项控制在消费者处理交换时所发生的情况,它会失败。如果 选项为 false,则使用者将继续进行下一个消息并进行处理。如果 选项为 true,则使用者将中断,并且会重新查找导致失败的消息偏移,然后重新尝试处理此消息。但是,如果其绑定每次都失败,则可能导致完全无法处理同一消息,例如投毒消息。因此,建议通过使用 Camel 的错误处理程序来处理该示例。

false

布尔值

camel.component.kafka.configuration.bridge-endpoint

如果选项为 true,则 KafkaProducer 将忽略入站消息的 KafkaConstants.TOPIC 标头设置。

false

布尔值

camel.component.kafka.configuration.brokers

要使用的 Kafka 代理的 URL。格式为 host1:port1,host2:port2,列表可以是代理子集或指向代理子集的 VIP。这个选项在 Kafka 文档中称为 bootstrap.servers。

 

字符串

camel.component.kafka.configuration.buffer-memory-size

制作者可用于缓冲记录的内存总量,等待发送到服务器。如果记录的发送速度快于服务器,则生产者不会根据 block.on.buffer.full 指定的首选项进行阻止或抛出异常。此设置应该大致对应于制作者将使用的总内存,而不是硬绑定,因为生产者不使用所有内存。一些额外的内存用于压缩(如果启用了压缩)以及维护航班请求。

33554432

整数

camel.component.kafka.configuration.check-crcs

自动检查消耗的记录的 CRC32。这可确保没有发生消息的在线或磁盘损坏。这个检查会增加一些开销,因此在寻求极端性能的情况下可能会禁用它。

true

布尔值

camel.component.kafka.configuration.circular-topic-detection

如果 选项为 true,则 KafkaProducer 将检测到消息是否试图将消息发送到它所来自的同一主题(如果从 kafka consumer 原始)。如果 KafkaConstants.TOPIC 标头与原始 kafka consumer 主题相同,则忽略标头设置,并使用制作者端点的主题。换句话说,这可避免将相同的消息发送回来自哪里。如果选项 bridgeEndpoint 设为 true,则使用这个选项。

true

布尔值

camel.component.kafka.configuration.client-id

客户端 ID 是在每次请求中发送的用户指定的字符串,以帮助跟踪调用。它应该以逻辑方式识别发出请求的应用程序。

 

字符串

camel.component.kafka.configuration.compression-codec

此参数允许您指定此制作者生成的所有数据的压缩代码。有效值为 none、gzip 和 snappy。

none

字符串

camel.component.kafka.configuration.connection-max-idle-ms

在此配置指定的毫秒数后关闭闲置连接。

540000

整数

camel.component.kafka.configuration.consumer-request-timeout-ms

配置控制客户端等待请求响应的最长时间。如果在超时前未收到响应,如果请求用尽,则客户端将重新发送请求,或者请求失败。

40000

整数

camel.component.kafka.configuration.consumer-streams

消费者上的并发用户数量

10

整数

camel.component.kafka.configuration.consumers-count

连接到 kafka 服务器的消费者数量

1

整数

camel.component.kafka.configuration.enable-idempotence

如果设置为 'true',则制作者将确保流中写入每条消息的一个副本。如果 'false',则生产者重试可能会在流中写入重试消息的副本。如果设置为 true,这个选项需要 max.in.flight.requests.per.connection 设置为 1,且重试无法为零,另外的攻击必须设置为 'all'。

false

布尔值

camel.component.kafka.configuration.fetch-max-bytes

如果 get 的第一个非空分区中的第一条大于这个值大于这个值,则服务器应返回最多的数据量应返回这个值,以确保使用者可以进行进展。代理接受的最大消息大小通过 message.max.bytes (broker config)或 max.message.bytes (主题配置)来定义。请注意,使用者执行多个并行获取。

52428800

整数

camel.component.kafka.configuration.fetch-min-bytes

服务器应返回获取请求的最小数据量。如果没有足够的数据可用,请求会等待该数据在回答请求前累积累积。

1

整数

camel.component.kafka.configuration.fetch-wait-max-ms

如果没有足够的数据立即满足 fetch.min.bytes,则服务器将在应答获取请求前阻止的最大时间。

500

整数

camel.component.kafka.configuration.group-id

唯一标识此消费者所属消费者的消费者进程组的字符串。通过设置同一个组 id 多个进程,表示它们都是同一消费者组的所有部分。消费者需要此选项。

 

字符串

camel.component.kafka.configuration.header-filter-strategy

使用自定义 HeaderFilterStrategy 过滤来自 Camel 消息的标头。

 

HeaderFilterStrategy

camel.component.kafka.configuration.heartbeat-interval-ms

在使用 Kafka 的组管理设施时,心跳到消费者协调器之间预期的时间。心跳用于确保消费者的会话保持活跃,并在新消费者加入或离开组时促进重新平衡。该值必须设置小于 session.timeout.ms,但通常不应设置超过该值的 1/3。甚至可以调整它,以控制正常重新平衡的预期时间。

3000

整数

camel.component.kafka.configuration.interceptor-classes

为制作者或消费者设置拦截器。制作者拦截器必须是实施 org.apache.kafka.clients.producer.ProducerInterceptor 拦截器的类,必须实施 org.apache.kafka.clients.consumer.ConsumerInterceptors.consumer.ConsumerInterceptoror,它将在运行时引发一个类广播异常。

 

字符串

camel.component.kafka.configuration.kafka-header-deserializer

为 deserialization kafka 标头值设置自定义 KafkaHeaderDeserializer 以 camel headers 值。

 

KafkaHeaderDeserializer

camel.component.kafka.configuration.kafka-header-serializer

为序列化 camel 标头值设置自定义 KafkaHeaderDeserializer 到 kafka 标头值。

 

KafkaHeaderSerializer

camel.component.kafka.configuration.kerberos-before-relogin-min-time

刷新尝试之间的登录线程睡眠时间。

60000

整数

camel.component.kafka.configuration.kerberos-init-cmd

Kerberos kinit 命令路径.默认为 /usr/bin/kinit

/usr/bin/kinit

字符串

camel.component.kafka.configuration.kerberos-principal-to-local-rules

用于从主体名称映射到短名称的规则列表(通常是操作系统用户名)。规则按顺序进行评估,第一个与主体名称匹配的规则用于将它映射到短名称。之后的任何规则都将被忽略。默认情况下,格式为 username/hostnameREALM 的主体名称映射到 username。有关格式的详情,请查看安全授权和 acl。可以使用逗号分隔多个值

DEFAULT

字符串

camel.component.kafka.configuration.kerberos-renew-jitter

添加至续订时间的随机危害百分比。

 

camel.component.kafka.configuration.kerberos-renew-window-factor

登录线程会休眠,直到指定的窗口因时间从上一次刷新到 ticket 的到期时间才会处于睡眠状态,此时它将尝试更新票据。

 

camel.component.kafka.configuration.key

记录密钥(如果没有指定密钥,则为 null)。如果配置了这个选项,则优先于标头 KafkaConstants#KEY

 

字符串

camel.component.kafka.configuration.key-deserializer

用于实现 Deserializer 接口的密钥的 Deserializer 类。

org.apache.kafka.common.serialization.StringDeserializer

字符串

camel.component.kafka.configuration.key-serializer-class

密钥的 serializer 类(如果未指定任何信息,默认为与消息相同)。

org.apache.kafka.common.serialization.StringSerializer

字符串

camel.component.kafka.configuration.linger-ms

制作者将请求传输到单一批处理请求之间的任何记录分组在一起。通常,仅当记录到达的速度比发送的速度快时,这才会发生。但是在某些情形中,客户端可能也会减少请求的数量,即使在中等负载下也是如此。此设置通过添加少量延迟来达到此目的,而不是立即发送记录,以便生产者最多等待给定延迟发送其他记录,从而使发送其他记录可以合并在一起。这可能被视为与在 TCP 中 Nagle 的算法类似。此设置提供批量延迟的上限:一旦我们获得批次,每个分区的相应记录会立即发送,而无论此设置如何,我们将立即发送超过此分区的字节数,那么我们将会"linger' 以便等待更多记录显示。此设置默认为 0 (即没有延迟)。例如,设置 linger.ms=5 将对发送的请求数量产生影响,但会最多为加载加载时发送的记录添加 5ms 的延迟。

0

整数

camel.component.kafka.configuration.max-block-ms

配置控制 kafka 发送到 kafka 的时长。由于多个原因,这些方法可以被阻断。例如: buffer full,metadata 不可用。这种配置会对获取元数据的总时间、键和值序列化、在执行 send ()时对缓冲区内存进行分区和分配进行最大的限制。对于 partitionsFor (),此配置会在等待元数据时实施最长时间阈值

60000

整数

camel.component.kafka.configuration.max-in-flight-request

在阻断前,客户端将在单一连接上发送的最大未确认请求数。请注意,如果此设置设定为大于 1 且存在失败发送,则因为重试导致消息重新排序的风险(例如,如果启用了重试)。

5

整数

camel.component.kafka.configuration.max-partition-fetch-bytes

服务器将返回每个分区的最大数据量。用于请求的最大内存总量为 #partitions max.partition.fetch.bytes。这种大小必须至少与服务器允许的最大消息大小相同,或者生产者能够发送大于消费者可获取的消息。如果出现这种情况,消费者会卡住在特定分区上获取大量信息。

1048576

整数

camel.component.kafka.configuration.max-poll-interval-ms

在使用消费者组管理时,poll ()的调用之间的最大延迟。这会在获取更多记录前处于闲置时间的上限。如果在这个超时过期前不调用 poll (),则消费者被视为失败,并且组将重新平衡,从而将分区重新分配给其他成员。

 

Long

camel.component.kafka.configuration.max-poll-records

单个调用中返回的最大记录数,用于 poll ()

500

整数

camel.component.kafka.configuration.max-request-size

请求的最大大小。这也是最高记录大小的上限。请注意,该服务器具有自己的记录大小,这可能与此大小不同。此设置将限制制作者将在单一请求中发送的记录数量,以避免发送大量请求。

1048576

整数

camel.component.kafka.configuration.metadata-max-age-ms

在毫秒内,我们强制刷新元数据,即使我们没有看到任何分区领导更改来主动发现任何新的代理或分区。

300000

整数

camel.component.kafka.configuration.metric-reporters

用作指标报告者的类列表。实施MetricReporter 接口允许在创建新指标的类中插入内容。JmxReporter 始终包含在内来注册 JMX 统计数据。

 

字符串

camel.component.kafka.configuration.metrics-sample-window-ms

为计算指标保留的样本数量。

30000

整数

camel.component.kafka.configuration.no-of-metrics-sample

为计算指标保留的样本数量。

2

整数

camel.component.kafka.configuration.offset-repository

要使用的偏移存储库,在本地存储每个主题分区的误差。定义一个将禁用 autocommit。

 

StateRepository

camel.component.kafka.configuration.partition-assignor

使用组管理时,客户端将使用的分区分配策略在原始消费者实例之间分发分区所有权

org.apache.kafka.clients.consumer.RangeAssignor

字符串

camel.component.kafka.configuration.partition-key

记录将被发送的分区(如果没有指定分区,则为 null)。如果配置了这个选项,则优先于标头 KafkaConstants#PARTITION_KEY

 

整数

camel.component.kafka.configuration.partitioner

partitioner 类,用于对st 子主题之间的消息进行分区。默认分区器基于密钥的哈希。

org.apache.kafka.clients.producer.internals.DefaultPartitioner

字符串

camel.component.kafka.configuration.poll-timeout-ms

轮询 KafkaConsumer 时使用的超时。

5000

Long

camel.component.kafka.configuration.producer-batch-size

每当多个记录被发送到同一分区时,生产者将尝试将记录集中到更少的请求。这有助于在客户端和服务器中性能。此配置以字节为单位控制默认批处理大小。将不尝试批量记录,而不是发送到代理的批处理。请求将包含多个批处理,每个分区都有多个批处理,每个分区都可用于发送数据。一个小批处理大小将降低批处理,并可以降低吞吐量(零批处理大小完全将完全禁用批处理)。非常大的批量大小可能会更加严重地使用内存,因为我们将始终在指定批处理大小中分配指定批处理大小的缓冲区。

16384

整数

camel.component.kafka.configuration.queue-buffering-max-messages

在使用 async 模式时可排队制作者的最大未处理消息数量,然后才能阻止制作者或必须丢弃数据。

10000

整数

camel.component.kafka.configuration.receive-buffer-bytes

TCP 接收缓冲区的大小(SO_RCVBUF),用于读取数据。

65536

整数

camel.component.kafka.configuration.reconnect-backoff-max-ms

重新连接到重复连接失败的代理时要等待的最大时间,以毫秒为单位。如果提供,每个主机的 backoff 将根据连续连接失败增加每个连续连接失败,最多增加这个最大值。在计算后,添加 20% 的随机 jitter 以避免连接停滞。

1000

整数

camel.component.kafka.configuration.reconnect-backoff-ms

尝试重新连接给定主机前等待的时间。这可避免在紧张循环中重复连接到主机。这个 backoff 适用于由消费者发送到代理的所有请求。

50

整数

camel.component.kafka.configuration.record-metadata

制作者应该存储 RecordMetadata 结果是否应该发送到 Kafka。结果存储在包含 RecordMetadata 元数据的列表中。列表存储在带有键 KafkaConstants#KAFKA_RECORDMETA 的标头中

true

布尔值

camel.component.kafka.configuration.request-required-acks

生产者数量要求领导人在考虑请求完成前收到。这会控制发送的记录的持久性。以下设置是 common: acks=0 如果设为零,则制作者不会等待全部来自服务器的任何致谢。记录将立即添加到套接字缓冲区并被视为发送。在这种情况下,无法保证服务器收到记录,并且重试配置不会生效(因为客户端通常不会知道任何故障)。每个记录给出的误差将始终设置为 -1. acks=1 意味着领导机会将记录写入其本地日志,但是不会对所有后续者等待完全确认。在这种情况下,领导机应该立即在记录后立即失败,但后续人已复制记录将会丢失。acks=all 意味着领导机将等待完全同步副本组确认记录。这样可保证,只要至少有一个 in-sync 副本仍保持上线,则记录将不会丢失。这是可用最强的保证。

1

字符串

camel.component.kafka.configuration.request-timeout-ms

在向客户端发送错误前,代理会等待满足 request.required.acks 要求的时间。

305000

整数

camel.component.kafka.configuration.retries

设置大于零的值将导致客户端重新发送发送失败并显示可能瞬态错误的任何记录。请注意,这个重试与客户端在收到错误时重新输入记录不同。允许重试可能会更改记录顺序,因为如果将两个记录发送到单个分区,则第一个记录失败并重试,但第二个记录可能会首先出现。

0

整数

camel.component.kafka.configuration.retry-backoff-ms

在重试前,生产者都会刷新相关主题的元数据,以查看是否已选定新的领导人。由于领导选举需要一定的时间,因此此属性指定制作者在刷新元数据前等待的时间。

100

整数

camel.component.kafka.configuration.sasl-jaas-config

公开 kafka sasl.jaas.config 参数示例: org.apache.kafka.common.security.plain.PlainLoginModule required username=USERNAME password=PASSWORD;

 

字符串

camel.component.kafka.configuration.sasl-kerberos-service-name

Kafka 运行的 Kerberos 主体名称。这可以在 Kafka 的 JAAS 配置或 Kafka 的配置中定义。

 

字符串

camel.component.kafka.configuration.sasl-mechanism

使用简单验证和安全层(SASL)机制。如需有效值,请参阅 http://www.iana.org/assignments/sasl-mechanisms/sasl-mechanisms.xhtml

GSSAPI

字符串

camel.component.kafka.configuration.security-protocol

用于与代理通信的协议。目前只支持 PLAINTEXT 和 SSL。

PLAINTEXT

字符串

camel.component.kafka.configuration.seek-to

设定如果 KafkaConsumer 将在启动时读取或结束:从开头读取:从结尾读取,这将替换掉前面的属性 seekToBeginning

 

字符串

camel.component.kafka.configuration.send-buffer-bytes

套接字写入缓冲区大小

131072

整数

camel.component.kafka.configuration.serializer-class

消息的 serializer 类。

org.apache.kafka.common.serialization.StringSerializer

字符串

camel.component.kafka.configuration.session-timeout-ms

使用 Kafka 的组管理功能时检测故障的超时时间。

10000

整数

camel.component.kafka.configuration.ssl-cipher-suites

密码套件列表。这是身份验证、加密、MAC 和密钥交换算法的命名组合,用于使用 TLS 或 SSL 网络协议来协商网络连接的安全设置。By 默认为所有可用的加密套件。

 

字符串

camel.component.kafka.configuration.ssl-context-parameters

使用 Camel SSLContextParameters 对象的 SSL 配置。如果在其他 SSL 端点参数之前应用了它。

 

SSLContextParameters

camel.component.kafka.configuration.ssl-enabled-protocols

为 SSL 连接启用的协议列表。TLSv1.2、TLSv1.1 和 TLSv1 默认启用。

TLSv1.2,TLSv1.1,TLSv1

字符串

camel.component.kafka.configuration.ssl-endpoint-algorithm

使用服务器证书验证服务器主机名的端点标识算法。

 

字符串

camel.component.kafka.configuration.ssl-key-password

密钥存储文件中私钥的密码。对于客户端,这是可选的。

 

字符串

camel.component.kafka.configuration.ssl-keymanager-algorithm

密钥管理器工厂用于 SSL 连接的算法。默认值是 Java 虚拟机配置的主要管理器工厂算法。

SunX509

字符串

camel.component.kafka.configuration.ssl-keystore-location

密钥存储文件的位置。这对于客户端来说是可选的,可用于客户端的双向身份验证。

 

字符串

camel.component.kafka.configuration.ssl-keystore-password

密钥存储文件的存储密码。对于客户端是可选的,只有配置了 ssl.keystore.location 时才需要。

 

字符串

camel.component.kafka.configuration.ssl-keystore-type

密钥存储文件的文件格式。对于客户端,这是可选的。默认值为 JKS

JKS

字符串

camel.component.kafka.configuration.ssl-protocol

用于生成 SSLContext 的 SSL 协议。默认设置为 TLS,这适用于大多数情况。最近的 JVM 中允许的值是 TLS、TLSv1.1 和 TLSv1.2。较旧 JVM 中可能会支持 SSLv2 和 SSLv3,但由于已知安全漏洞,不建议使用它们。

TLS

字符串

camel.component.kafka.configuration.ssl-provider

用于 SSL 连接的安全提供商的名称。默认值是 JVM 的默认安全提供程序。

 

字符串

camel.component.kafka.configuration.ssl-trustmanager-algorithm

信任管理器工厂用于 SSL 连接的算法。默认值是 Java 虚拟机配置的信任管理器工厂算法。

PKIX

字符串

camel.component.kafka.configuration.ssl-truststore-location

信任存储文件的位置。

 

字符串

camel.component.kafka.configuration.ssl-truststore-password

信任存储文件的密码。

 

字符串

camel.component.kafka.configuration.ssl-truststore-type

信任存储文件的文件格式。默认值为 JKS。

JKS

字符串

camel.component.kafka.configuration.topic

要使用的主题名称。在使用者上,您可以使用逗号分隔多个主题。制作者只能发送消息到单个主题。

 

字符串

camel.component.kafka.configuration.topic-is-pattern

主题是模式(正则表达式)。这可用于订阅与模式匹配的动态主题。

false

布尔值

camel.component.kafka.configuration.value-deserializer

对实现 Deserializer 接口的值进行反序列化器类。

org.apache.kafka.common.serialization.StringDeserializer

字符串

camel.component.kafka.configuration.worker-pool

使用自定义 worker 池在 kafka 服务器后继续路由 Exchange,已确认通过异步非阻塞处理从 KafkaProducer 发送到它的消息。

 

ExecutorService

camel.component.kafka.configuration.worker-pool-core-size

kafka 服务器后用于继续路由 Exchange 的 worker 池的核心线程数量,确认已使用异步非阻塞处理从 KafkaProducer 发送到它的消息。

10

整数

camel.component.kafka.configuration.worker-pool-max-size

kafka 服务器后用于继续路由 Exchange 的 worker 池的最大线程数量已被确认使用异步非阻塞处理从 KafkaProducer 发送到它的消息。

20

整数

camel.component.kafka.enabled

启用 kafka 组件

true

布尔值

camel.component.kafka.kafka-manual-commit-factory

创建 KafkaManualCommit 实例使用的工厂。这样,在进行手动提交时,可以插入一个自定义 KafkaManualCommit 实例来创建自定义 KafkaManualCommit 实例。选项是一个 org.apache.camel.component.kafka.KafkaManualCommitFactory 类型。

 

字符串

camel.component.kafka.resolve-property-placeholders

启动时,组件是否应自行解析属性占位符。只有 String 类型的属性才能使用属性占位符。

true

布尔值

camel.component.kafka.shutdown-timeout

以毫秒为单位的超时,以正常等待使用者或制作者关闭和终止其 worker 线程。

30000

整数

camel.component.kafka.use-global-ssl-context-parameters

启用使用全局 SSL 上下文参数。

false

布尔值

camel.component.kafka.worker-pool

要使用共享自定义 worker 池在 kafka 服务器后继续路由 Exchange,请确认使用异步非阻塞处理从 KafkaProducer 发送到它的消息。如果使用这个选项,则必须处理线程池的生命周期,以便在需要时关闭池。选项为 java.util.concurrent.ExecutorService 类型。

 

字符串

有关 Producer/Consumer 配置的更多信息:

http://kafka.apache.org/documentation.html#newconsumerconfigs http://kafka.apache.org/documentation.html#producerconfigs