189.6. 使用 Kafka 幂等存储库

从 Camel 2.19 可用

camel-kafka 库提供了一个基于 Kafka 的幂等存储库。此存储库将广播在 Kafka 主题中存储对幂等状态(add/remove)的所有更改,并通过事件采购填充每个存储库的进程实例的本地内存缓存。

所使用的主题必须为每个幂等存储库实例唯一。机制对主题分区数量没有任何要求;因为仓库同时消耗所有分区。它还对主题复制因素没有任何要求。

每个使用主题的仓库实例(例如,通常在并行运行的不同机器上)控制自己的消费者组,因此在使用相同主题的 10 个 Camel 进程集群中,每个标题都会控制自己的偏移。

在启动时,实例会订阅该主题,并向开始便重新显示偏移量,将缓存重新重建到最新状态。在较长的 pollDurationM 后,缓存不会被视为温量,它会返回 0 记录。在缓存启动前,启动才会完成,或由 30 秒启动或 30 秒;如果后者在使用者到达主题结束之前,幂等存储库可能会处于不一致的状态。

KafkaIdempotentRepository 具有以下属性:

属性描述

topic

用于广播更改的 Kafka 主题名称(必需)

bootstrapServers

内部 Kafka producer 和使用者上的 bootstrap.servers 属性。如果没有设置 consumerConfigproducerConfig,则使用它作为简写。如果使用,此组件将为制作者和消费者应用可靠的默认配置。

producerConfig

设置用于广播更改的 Kafka 制作者使用的属性。覆盖 bootstrapServers,因此必须定义 Kafka bootstrap.servers 属性本身

consumerConfig

设置 Kafka consumer 使用的属性,该属性从主题填充缓存。覆盖 bootstrapServers,因此必须定义 Kafka bootstrap.servers 属性本身

maxCacheSize

最近使用的密钥应存储在内存中(默认值 1000)。

pollDurationMs

Kafka consumer 的 poll 持续时间。本地缓存会立即更新。这个值将影响到其他同伴的后面,它们从主题更新其缓存的缓存与发送缓存操作消息的幂等使用者实例相对应。默认值为 100 ms。
如果明确设置这个值,请注意远程缓存存活度和此存储库使用者和 Kafka 代理之间网络流量的卷之间存在利弊。Cache warmup 进程还取决于获取任何内容的一个轮询 - 这表明流已被消耗到当前点上。如果轮询持续时间过长,在主题上发送消息的速率过长,则可能存在缓存无法破解,并且其同级同级状态会处于不一致的状态,直到它捕获为止。

可以通过定义 主题bootstrapServersproducer Config 属性集来实例化存储库,以显式定义启用 SSL/SASL 等功能。

要使用,此仓库必须放在 Camel 注册表中,也可以手动注册为 Spring/Blueprint 中的 bean,因为它是 CamelContext aware。

示例用法如下:

KafkaIdempotentRepository kafkaIdempotentRepository = new KafkaIdempotentRepository("idempotent-db-inserts", "localhost:9091");

SimpleRegistry registry = new SimpleRegistry();
registry.put("insertDbIdemRepo", kafkaIdempotentRepository); // must be registered in the registry, to enable access to the CamelContext
CamelContext context = new CamelContext(registry);

// later in RouteBuilder...
from("direct:performInsert")
    .idempotentConsumer(header("id")).messageIdRepositoryRef("insertDbIdemRepo")
        // once-only insert into database
    .end()

在 XML 中:

<!-- simple -->
<bean id="insertDbIdemRepo" class="org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository">
  <property name="topic" value="idempotent-db-inserts"/>
  <property name="bootstrapServers" value="localhost:9091"/>
</bean>

<!-- complex -->
<bean id="insertDbIdemRepo" class="org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository">
  <property name="topic" value="idempotent-db-inserts"/>
  <property name="maxCacheSize" value="10000"/>
  <property name="consumerConfig">
    <props>
      <prop key="bootstrap.servers">localhost:9091</prop>
    </props>
  </property>
  <property name="producerConfig">
    <props>
      <prop key="bootstrap.servers">localhost:9091</prop>
    </props>
  </property>
</bean>