190.7. Kafka 멱등 리포지토리 사용
Camel 2.19에서 사용 가능
camel-kafka 라이브러리는 Kafka 주제 기반 멱등 리포지토리를 제공합니다. 이 리포지토리는 Kafka 주제의 멱등 상태(추가/제거)에 대한 모든 변경 사항을 저장하고 이벤트 소싱을 통해 각 리포지토리의 프로세스 인스턴스에 대한 로컬 메모리 내 캐시를 채웁니다.
사용된 항목은 멱등 리포지토리 인스턴스별로 고유해야 합니다. 이 메커니즘에는 주제 파티션 수에 대한 요구 사항이 없습니다. 리포지토리가 모든 파티션에서 동시에 사용되므로. 또한 주제의 복제 요소에 대한 요구 사항이 없습니다.
주제를 사용하는 각 리포지토리 인스턴스(예: 병렬로 실행되는 다른 시스템에서)는 자체 소비자 그룹을 제어하므로 동일한 주제를 사용하는 10 Camel 프로세스의 클러스터에서 각각 자체 오프셋을 제어합니다.
시작 시 인스턴스에서 주제를 구독하고 처음 오프셋을 다시 시도하여 캐시를 최신 상태로 다시 빌드합니다. 이 캐시는 pollDurationM 의 하나의 폴링이 0 레코드를 반환할 때까지 워밍업으로 간주되지 않습니다. 시작은 캐시가 웜화되거나 30초가 될 때까지 완료되지 않습니다. 후자의 경우 소비자가 주제가 끝날 때까지 멱등 리포지토리가 일관성 없는 상태가 될 수 있습니다.
KafkaIdempotentRepository 에는 다음 속성이 있습니다.
| 속성 | 설명 |
|---|---|
|
| 변경 사항을 브로드캐스트하는 데 사용할 Kafka 항목의 이름입니다. (필수) |
|
|
내부 Kafka 생산자 및 소비자의 |
|
|
변경 사항을 브로드캐스트하는 Kafka 생산자에서 사용할 속성을 설정합니다. |
|
|
주제에서 캐시를 채우는 Kafka 소비자가 사용할 속성을 설정합니다. |
|
| 메모리에 저장해야 하는 가장 최근에 사용된 키 수(기본값 1000)입니다. |
|
|
Kafka 소비자의 폴링 기간입니다. 로컬 캐시는 즉시 업데이트됩니다. 이 값은 주제에서 캐시를 업데이트하는 다른 피어 뒤에는 캐시 작업 메시지를 보낸 멱등 소비자 인스턴스와 관련된 정도에 영향을 미칩니다. 기본값은 100ms입니다. |
주제 및 bootstrapServers 를 정의하여 리포지토리를 인스턴스화할 수 있습니다. 또는 producerConfig 및 consumerConfig 속성 세트를 명시적으로 정의하여 SSL/SASL과 같은 기능을 활성화할 수 있습니다.
이 리포지토리를 사용하려면 수동으로 또는 Spring/Blueprint의 빈으로 등록하여 Camel 레지스트리에 배치해야 합니다.
샘플 사용법은 다음과 같습니다.
KafkaIdempotentRepository kafkaIdempotentRepository = new KafkaIdempotentRepository("idempotent-db-inserts", "localhost:9091");
SimpleRegistry registry = new SimpleRegistry();
registry.put("insertDbIdemRepo", kafkaIdempotentRepository); // must be registered in the registry, to enable access to the CamelContext
CamelContext context = new CamelContext(registry);
// later in RouteBuilder...
from("direct:performInsert")
.idempotentConsumer(header("id")).messageIdRepositoryRef("insertDbIdemRepo")
// once-only insert into database
.end()XML에서 다음을 수행합니다.
<!-- simple -->
<bean id="insertDbIdemRepo"
class="org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository">
<property name="topic" value="idempotent-db-inserts"/>
<property name="bootstrapServers" value="localhost:9091"/>
</bean>
<!-- complex -->
<bean id="insertDbIdemRepo"
class="org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository">
<property name="topic" value="idempotent-db-inserts"/>
<property name="maxCacheSize" value="10000"/>
<property name="consumerConfig">
<props>
<prop key="bootstrap.servers">localhost:9091</prop>
</props>
</property>
<property name="producerConfig">
<props>
<prop key="bootstrap.servers">localhost:9091</prop>
</props>
</property>
</bean>