189.6. 使用 Kafka 幂等存储库
从 Camel 2.19 可用
camel-kafka
库提供了一个基于 Kafka 的幂等存储库。此存储库将广播在 Kafka 主题中存储对幂等状态(add/remove)的所有更改,并通过事件采购填充每个存储库的进程实例的本地内存缓存。
所使用的主题必须为每个幂等存储库实例唯一。机制对主题分区数量没有任何要求;因为仓库同时消耗所有分区。它还对主题复制因素没有任何要求。
每个使用主题的仓库实例(例如,通常在并行运行的不同机器上)控制自己的消费者组,因此在使用相同主题的 10 个 Camel 进程集群中,每个标题都会控制自己的偏移。
在启动时,实例会订阅该主题,并向开始便重新显示偏移量,将缓存重新重建到最新状态。在较长的 pollDurationM 后,缓存不会被视为温量,它会返回
0 记录。在缓存启动前,启动才会完成,或由 30 秒启动或 30 秒;如果后者在使用者到达主题结束之前,幂等存储库可能会处于不一致的状态。
KafkaIdempotentRepository
具有以下属性:
属性 | 描述 |
---|---|
| 用于广播更改的 Kafka 主题名称(必需) |
|
内部 Kafka producer 和使用者上的 |
|
设置用于广播更改的 Kafka 制作者使用的属性。覆盖 |
|
设置 Kafka consumer 使用的属性,该属性从主题填充缓存。覆盖 |
| 最近使用的密钥应存储在内存中(默认值 1000)。 |
|
Kafka consumer 的 poll 持续时间。本地缓存会立即更新。这个值将影响到其他同伴的后面,它们从主题更新其缓存的缓存与发送缓存操作消息的幂等使用者实例相对应。默认值为 100 ms。 |
可以通过定义 主题
和 bootstrapServers
或 producer
属性集来实例化存储库,以显式定义启用 SSL/SASL 等功能。
Config
要使用,此仓库必须放在 Camel 注册表中,也可以手动注册为 Spring/Blueprint 中的 bean,因为它是 CamelContext
aware。
示例用法如下:
KafkaIdempotentRepository kafkaIdempotentRepository = new KafkaIdempotentRepository("idempotent-db-inserts", "localhost:9091"); SimpleRegistry registry = new SimpleRegistry(); registry.put("insertDbIdemRepo", kafkaIdempotentRepository); // must be registered in the registry, to enable access to the CamelContext CamelContext context = new CamelContext(registry); // later in RouteBuilder... from("direct:performInsert") .idempotentConsumer(header("id")).messageIdRepositoryRef("insertDbIdemRepo") // once-only insert into database .end()
在 XML 中:
<!-- simple --> <bean id="insertDbIdemRepo" class="org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository"> <property name="topic" value="idempotent-db-inserts"/> <property name="bootstrapServers" value="localhost:9091"/> </bean> <!-- complex --> <bean id="insertDbIdemRepo" class="org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository"> <property name="topic" value="idempotent-db-inserts"/> <property name="maxCacheSize" value="10000"/> <property name="consumerConfig"> <props> <prop key="bootstrap.servers">localhost:9091</prop> </props> </property> <property name="producerConfig"> <props> <prop key="bootstrap.servers">localhost:9091</prop> </props> </property> </bean>