11.8. idempotent 소비자

11.8.1. 개요

idempotent 소비자 패턴은 중복 메시지를 필터링하는 데 사용됩니다. 예를 들어 메시징 시스템과 소비자 끝점 간의 연결이 시스템의 일부 오류로 인해 갑자기 손실되는 시나리오를 고려해 보십시오. 메시지 전송 중 메시징 시스템이 메시지 전송 중이면 소비자가 마지막 메시지를 수신했는지의 여부를 명확하지 않을 수 있습니다. 전송 안정성을 개선하기 위해 메시징 시스템은 연결이 다시 설정되는 즉시 이러한 메시지를 다시 전송하도록 결정할 수 있습니다. 안타깝게도, 이는 소비자가 중복 메시지를 수신할 수 있는 위험을 수반하며, 경우에 따라 메시지를 복제하는 영향은 바람직하지 않은 결과를 초래할 수 있습니다(예: 계정의 합계를 초월하는 등). 이 시나리오에서 idempotent 소비자는 메시지 스트림에서 원하지 않는 중복을 제거하는 데 사용할 수 있습니다.

Camel은 다음과 같은 Idempotent Consumer 구현을 제공합니다.

11.8.2. 메모리 내 캐시를 사용하는 idempotent 소비자

Apache Camel에서 idempotent 소비자 패턴은 두 개의 인수를 사용하는 idempotentConsumer() 프로세서에 의해 구현됩니다.

  • 현재 메시지에 대한 메시지 ID 문자열을 반환하는 식 입니다.An expression that returns a message ID string for the current message.
  • messageIdRepository Cryostat- Cryostat는 메시지 ID 리포지토리에 대한 참조로, 수신된 모든 메시지의 ID를 저장합니다.

각 메시지가 들어오는 대로 idempotent 소비자 프로세서는 리포지토리에서 현재 메시지 ID를 조회하여 이 메시지가 이전에 표시되는지 확인합니다. yes인 경우 메시지가 삭제됩니다. 메시지가 전달되지 않으면 메시지가 허용되고 해당 ID가 리포지토리에 추가됩니다.

예 11.1. “메모리 내 캐시를 사용하여 메시지 복제 필터링” 에 표시된 코드는 TransactionID 헤더를 사용하여 중복을 필터링합니다.

예 11.1. 메모리 내 캐시를 사용하여 메시지 복제 필터링

import static org.apache.camel.processor.idempotent.MemoryMessageIdRepository.memoryMessageIdRepository;
...
RouteBuilder builder = new RouteBuilder() {
    public void configure() {
        from("seda:a")
          .idempotentConsumer(
             header("TransactionID"),
             memoryMessageIdRepository(200)
          ).to("seda:b");
    }
};

memoryMessageIdRepository(200) 를 호출하면 최대 200개의 메시지 ID를 보유할 수 있는 메모리 내 캐시가 생성됩니다.

XML 구성을 사용하여 idempotent 소비자를 정의할 수도 있습니다. 예를 들어 다음과 같이 XML로 이전 경로를 정의할 수 있습니다.

<camelContext id="buildIdempotentConsumer" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="seda:a"/>
    <idempotentConsumer messageIdRepositoryRef="MsgIDRepos">
      <simple>header.TransactionID</simple>
      <to uri="seda:b"/>
    </idempotentConsumer>
  </route>
</camelContext>

<bean id="MsgIDRepos" class="org.apache.camel.processor.idempotent.MemoryMessageIdRepository">
    <!-- Specify the in-memory cache size. -->
    <constructor-arg type="int" value="200"/>
</bean>
참고

Camel 2.17에서 Idempotent Repository는 선택적 직렬화된 헤더를 지원합니다.

11.8.3. JPA 리포지토리가 있는 idempotent 소비자

메모리 내 캐시는 메모리가 부족하여 클러스터형 환경에서 작동하지 않는 단점이 있습니다. 이러한 단점을 극복하기 위해 대신 Java Persistent API(JPA) 기반 리포지토리를 사용할 수 있습니다. JPA 메시지 ID 리포지토리는 오브젝트 지향 데이터베이스를 사용하여 메시지 ID를 저장합니다. 예를 들어 다음과 같이 멱등 소비자에 JPA 리포지토리를 사용하는 경로를 정의할 수 있습니다.

import org.springframework.orm.jpa.JpaTemplate;

import org.apache.camel.spring.SpringRouteBuilder;
import static org.apache.camel.processor.idempotent.jpa.JpaMessageIdRepository.jpaMessageIdRepository;
...
RouteBuilder builder = new SpringRouteBuilder() {
    public void configure() {
        from("seda:a").idempotentConsumer(
          header("TransactionID"),
          jpaMessageIdRepository(bean(JpaTemplate.class), "myProcessorName")
        ).to("seda:b");
    }
};

JPA 메시지 ID 리포지토리는 다음 두 개의 인수로 초기화됩니다.

  • jpaTemplate instance Cryostat- Cryostat - JPA 데이터베이스의 처리 기능을 제공합니다.
  • 현재 멱등 소비자 프로세서를 나타냅니다.Represents the current idempotent consumer processor.

SpringRouteBuilder.bean() 메서드는 Spring XML 파일에 정의된 8080을 참조하는 바로 가기입니다. JpaTemplate 8080은 기본 JPA 데이터베이스에 대한 처리를 제공합니다. 이 빈을 구성하는 방법에 대한 자세한 내용은 JPA 설명서를 참조하십시오.

JPA 리포지토리 설정에 대한 자세한 내용은 JPA 구성 요소 문서, Spring JPA 문서 및 Camel JPA 단위 테스트 의 샘플 코드를 참조하십시오.

11.8.4. Spring XML 예

다음 예제에서는 myMessageId 헤더를 사용하여 중복을 필터링합니다.

<!-- repository for the idempotent consumer -->
<bean id="myRepo" class="org.apache.camel.processor.idempotent.MemoryIdempotentRepository"/>

<camelContext xmlns="http://camel.apache.org/schema/spring">
    <route>
        <from uri="direct:start"/>
        <idempotentConsumer messageIdRepositoryRef="myRepo">
            <!-- use the messageId header as key for identifying duplicate messages -->
            <header>messageId</header>
            <!-- if not a duplicate send it to this mock endpoint -->
            <to uri="mock:result"/>
        </idempotentConsumer>
    </route>
</camelContext>

11.8.5. JDBC 리포지토리가 있는 idempotent 소비자

idempotent 소비자 패턴에 메시지 ID를 저장하는 데 JDBC 리포지토리도 지원됩니다. JDBC 리포지토리의 구현은 SQL 구성 요소에서 제공하므로 Maven 빌드 시스템을 사용하는 경우 camel-sql 아티팩트에 대한 종속성을 추가합니다.

SQL 데이터베이스에 대한 연결을 인스턴스화하기 위해 Spring persistence API에서 SingleConnectionDataSource JDBC 래퍼 클래스를 사용할 수 있습니다. 예를 들어 HyperSQL 데이터베이스 인스턴스에 대한 JDBC 연결을 인스턴스화하려면 다음 JDBC 데이터 소스를 정의할 수 있습니다.

<bean id="dataSource" class="org.springframework.jdbc.datasource.SingleConnectionDataSource">
    <property name="driverClassName" value="org.hsqldb.jdbcDriver"/>
    <property name="url" value="jdbc:hsqldb:mem:camel_jdbc"/>
    <property name="username" value="sa"/>
    <property name="password" value=""/>
</bean>
참고

위의 JDBC 데이터 소스는 메모리 전용 데이터베이스 인스턴스를 생성하는 HyperSQL mem 프로토콜을 사용합니다. 이는 실제로 지속되지 않은 HyperSQL 데이터베이스의 강력한 구현입니다.

이전 데이터 소스를 사용하여 다음과 같이 JDBC 메시지 ID 리포지토리를 사용하는 idempotent 소비자 패턴을 정의할 수 있습니다.

<bean id="messageIdRepository" class="org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository">
	<constructor-arg ref="dataSource" />
	<constructor-arg value="myProcessorName" />
</bean>

<camel:camelContext>
	<camel:errorHandler id="deadLetterChannel" type="DeadLetterChannel" deadLetterUri="mock:error">
		<camel:redeliveryPolicy maximumRedeliveries="0" maximumRedeliveryDelay="0" logStackTrace="false" />
	</camel:errorHandler>

	<camel:route id="JdbcMessageIdRepositoryTest" errorHandlerRef="deadLetterChannel">
		<camel:from uri="direct:start" />
		<camel:idempotentConsumer messageIdRepositoryRef="messageIdRepository">
			<camel:header>messageId</camel:header>
			<camel:to uri="mock:result" />
		</camel:idempotentConsumer>
	</camel:route>
	</camel:camelContext>

11.8.6. 경로에서 중복 메시지를 처리하는 방법

Camel 2.8 사용 가능

이제 skipDuplicate 옵션을 false 로 설정하여 idempotent 소비자가 중복 메시지를 라우팅하도록 지시할 수 있습니다. 그러나 “교환” 의 속성을 true로 설정하여 중복 메시지가 중복으로 표시되었습니다. 8.1절. “콘텐츠 기반 라우터” 또는 8.2절. “메시지 필터” 를 사용하여 이를 감지하고 중복 메시지를 처리하여 이 사실을 활용할 수 있습니다.

예를 들어 다음 예에서 8.2절. “메시지 필터” 를 사용하여 메시지를 중복 끝점으로 보낸 다음 해당 메시지의 계속 라우팅을 중지합니다.

from("direct:start")
     // instruct idempotent consumer to not skip duplicates as we will filter then our self
     .idempotentConsumer(header("messageId")).messageIdRepository(repo).skipDuplicate(false)
     .filter(property(Exchange.DUPLICATE_MESSAGE).isEqualTo(true))
         // filter out duplicate messages by sending them to someplace else and then stop
         .to("mock:duplicate")
         .stop()
     .end()
     // and here we process only new messages (no duplicates)
     .to("mock:result");

XML DSL의 샘플 예제는 다음과 같습니다.

 <!-- idempotent repository, just use a memory based for testing -->
 <bean id="myRepo" class="org.apache.camel.processor.idempotent.MemoryIdempotentRepository"/>

 <camelContext xmlns="http://camel.apache.org/schema/spring">
     <route>
         <from uri="direct:start"/>
         <!-- we do not want to skip any duplicate messages -->
         <idempotentConsumer messageIdRepositoryRef="myRepo" skipDuplicate="false">
             <!-- use the messageId header as key for identifying duplicate messages -->
             <header>messageId</header>
             <!-- we will to handle duplicate messages using a filter -->
             <filter>
                 <!-- the filter will only react on duplicate messages, if this property is set on the Exchange -->
                 <property>CamelDuplicateMessage</property>
                 <!-- and send the message to this mock, due its part of an unit test -->
                 <!-- but you can of course do anything as its part of the route -->
                 <to uri="mock:duplicate"/>
                 <!-- and then stop -->
                 <stop/>
             </filter>
             <!-- here we route only new messages -->
             <to uri="mock:result"/>
         </idempotentConsumer>
     </route>
 </camelContext>

11.8.7. 데이터 그리드를 사용하여 클러스터형 환경에서 중복 메시지를 처리하는 방법

클러스터형 환경에서 Camel을 실행하는 경우 메모리의 멱등 리포지토리가 작동하지 않습니다(위 참조). 중앙 데이터베이스를 설정하거나 Hazelcast 데이터 그리드를 기반으로 멱등 소비자 구현을 사용할 수 있습니다. Hazelcast는 멀티 캐스트를 통해 노드를 찾습니다(기본값 - tcp-ip의 Hazelcast 구성)는 맵 기반 리포지토리를 자동으로 생성합니다.

HazelcastIdempotentRepository idempotentRepo = new HazelcastIdempotentRepository("myrepo");

from("direct:in").idempotentConsumer(header("messageId"), idempotentRepo).to("mock:out");

리포지토리가 각 메시지 ID를 유지해야 하는 기간을 정의해야 합니다(기본값은 절대 삭제하지 않음). 메모리 부족을 방지하려면 Hazelcast 구성 을 기반으로 제거 전략을 생성해야 합니다. 자세한 내용은 Hazelcast 를 참조하십시오.

이 링크 참조:http://camel.apache.org/hazelcast-idempotent-repository-tutorial.html[Idempotent 리포지토리

Apache Karaf를 사용하여 두 클러스터 노드에 멱등 리포지토리를 설정하는 방법에 대한 자세한 내용은 튜토리얼]을 참조하십시오.

11.8.8. 옵션

Idempotent Consumer에는 다음과 같은 옵션이 있습니다.

옵션

Default

설명

eager

true

Camel 2.0: Eager는 Camel이 교환이 처리되기 전이나 후에 메시지를 리포지토리에 추가하는지 여부를 제어합니다. 이전에 활성화된 경우 Camel은 현재 메시지가 진행 중인 경우에도 중복 메시지를 감지할 수 있습니다. Camel을 비활성화하면 메시지가 성공적으로 처리된 경우에만 중복을 감지합니다.

messageIdRepositoryRef

null

레지스트리에서 조회할 IdempotentRepository 에 대한 참조입니다. 이 옵션은 XML DSL을 사용할 때 필요합니다.

skipDuplicate

true

Camel 2.8: 중복 메시지를 건너뛸지 여부를 설정합니다. false 로 설정하면 메시지가 계속됩니다. 그러나 “교환”Exchange.DUPLICATE_MESSAG 교환 속성을 부울.TRUE 값으로 설정하여 중복으로 표시되었습니다.

completionEager

false

Camel 2.16: 교환이 완료되면 Idempotent consumer eager를 완료할지 여부를 설정합니다.

completeEager 옵션을 true로 설정하면 교환이 멱등 소비자 패턴 블록의 끝까지 도달할 때 Idempotent Consumer가 완료를 트리거합니다. 그러나 엔드 블록 후에도 교환이 계속 라우팅되는 경우 멱등 소비자의 상태에는 영향을 미치지 않습니다.

completeEager 옵션을 false로 설정하면 교환이 완료되면 Idempotent Consumer가 완료를 트리거하고 라우팅됩니다. 그러나 블록이 종료된 후에도 교환이 계속 라우팅되는 경우 멱등 소비자의 상태에도 영향을 미칩니다. 예를 들어 교환이 실패하는 경우 예외로 인해 idempotent 소비자의 상태는 롤백이 됩니다.