10장. 메시지 Cryostat

초록

메시지 변환 패턴은 다양한 용도로 메시지 내용을 수정하는 방법을 설명합니다.

10.1. 콘텐츠 Enricher

10.1.1. 개요

콘텐츠 강화 패턴은 메시지 대상에 원본 메시지에 있는 것보다 더 많은 데이터가 필요한 시나리오를 설명합니다. 이 경우 메시지 번역자, 라우팅 논리의 임의의 프로세서 또는 컨텐츠 강화 방법을 사용하여 외부 리소스에서 추가 데이터를 가져올 수 있습니다.

그림 10.1. Content Enricher Pattern

콘텐츠 강화 패턴

10.1.2. 콘텐츠 강화를 위한 대안

Apache Camel은 콘텐츠를 보강하는 여러 가지 방법을 지원합니다.

  • 라우팅 논리에서 임의의 프로세서가 있는 메시지 번역
  • enrich() 메서드는 현재 교환의 사본을 생산자 엔드 포인트로 보낸 다음 결과 응답의 데이터를 사용하여 리소스에서 추가 데이터를 가져옵니다. 부자에 의해 생성된 교환은 항상 InOut 교환입니다.
  • pollEnrich() 메서드는 데이터를 위해 소비자 엔드포인트를 폴링하여 추가 데이터를 가져옵니다. 효과적으로, 메인 경로의 소비자 끝점과 pollEnrich() 작업의 소비자 끝점이 결합됩니다. 즉, 경로의 초기 소비자에 대한 수신 메시지는 소비자의 pollEnrich() 메서드를 폴링합니다.
참고

enrich()pollEnrich() 메서드는 동적 엔드포인트 URI를 지원합니다. 현재 교환에서 값을 가져올 수 있는 식을 지정하여 URI를 계산할 수 있습니다. 예를 들어 데이터 교환에서 계산된 이름으로 파일을 폴링할 수 있습니다. 이 동작은 Camel 2.16에서 도입되었습니다. 이 변경으로 인해 XML DSL이 중단되고 쉽게 마이그레이션할 수 있습니다. Java DSL은 이전 버전과 호환됩니다.

10.1.3. 메시지 번역가 및 프로세서를 사용하여 컨텐츠 강화

Camel은 스마트 완료를 제공하고 리팩토링 보안을 유지하는 유형 안전 IDE 친화적인 방법을 사용하여 라우팅 및 중재 규칙을 생성할 수 있는 유창한 빌더 를 제공합니다. 분산 시스템을 테스트할 때 특정 시스템을 사용하거나 작성할 때까지 시스템의 다른 부분을 테스트할 수 있도록 특정 외부 시스템을 스텁해야 합니다. 이를 수행하는 한 가지 방법은 대부분 정적인 본문이 있는 동적 메시지를 생성하여 요청에 대한 응답을 생성하는 일종의 템플릿 시스템을 사용하는 것입니다. 템플릿을 사용하는 또 다른 방법은 한 대상의 메시지를 사용하여 Velocity 또는 XQuery 와 같은 것으로 변환한 다음 다른 대상으로 전송하는 것입니다. 다음 예제에서는 InOnly (한 가지 방법) 메시지에 대해 이를 보여줍니다.

from("activemq:My.Queue").
  to("velocity:com/acme/MyResponse.vm").
  to("activemq:Another.Queue");

InOut (request-reply) 메시징을 사용하여 ActiveMQ의 My.Queue 큐에서 요청을 처리한다고 가정합니다. JMSReplyTo 대상으로 이동하는 템플릿 생성 응답이 필요합니다. 다음 예제에서는 이 작업을 수행하는 방법을 보여줍니다.

from("activemq:My.Queue").
  to("velocity:com/acme/MyResponse.vm");

다음 간단한 예제에서는 DSL을 사용하여 메시지 본문을 변환하는 방법을 보여줍니다.

from("direct:start").setBody(body().append(" World!")).to("mock:result");

다음 예제에서는 명시적 Java 코드를 사용하여 프로세서를 추가합니다.

from("direct:start").process(new Processor() {
    public void process(Exchange exchange) {
        Message in = exchange.getIn();
        in.setBody(in.getBody(String.class) + " World!");
    }
}).to("mock:result");

다음 예제에서는 useful integration을 사용하여 빈을 사용하여 transformer 역할을 할 수 있습니다.

from("activemq:My.Queue").
  beanRef("myBeanName", "myMethodName").
  to("activemq:Another.Queue");

다음 예제에서는 Spring XML 구현을 보여줍니다.

<route>
  <from uri="activemq:Input"/>
  <bean ref="myBeanName" method="doTransform"/>
  <to uri="activemq:Output"/>
</route>/>

10.1.4. enrich() 메서드를 사용하여 콘텐츠를 강화

AggregationStrategy aggregationStrategy = ...

from("direct:start")
  .enrich("direct:resource", aggregationStrategy)
  .to("direct:result");

from("direct:resource")
...

컨텐츠 강화(신분)는 들어오는 메시지(복원 교환포함)를 강화하기 위해 리소스 끝점 에서 추가 데이터를 검색합니다. 집계 전략은 원래 교환과 리소스 교환을 결합합니다. 집계Strategy.aggregate(Exchange, Exchange) 메서드의 첫 번째 매개 변수는 원래 교환에 해당하고 두 번째 매개 변수는 리소스 교환에 해당합니다. 리소스 끝점의 결과는 리소스 교환의 Out 메시지에 저장됩니다. 다음은 자체 집계 전략 클래스를 구현하기 위한 샘플 템플릿입니다.

public class ExampleAggregationStrategy implements AggregationStrategy {

    public Exchange aggregate(Exchange original, Exchange resource) {
        Object originalBody = original.getIn().getBody();
        Object resourceResponse = resource.getOut().getBody();
        Object mergeResult = ... // combine original body and resource response
        if (original.getPattern().isOutCapable()) {
            original.getOut().setBody(mergeResult);
        } else {
            original.getIn().setBody(mergeResult);
        }
        return original;
    }

}

이 템플릿을 사용하면 원래 교환에 모든 교환 패턴이 있을 수 있습니다. 번영자에 의해 생성되는 리소스 교환은 항상 InOut 교환입니다.

10.1.5. Spring XML 강화 예

위 예제는 Spring XML에서도 구현할 수 있습니다.

<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="direct:start"/>
    <enrich strategyRef="aggregationStrategy">
      <constant>direct:resource</constant>
    <to uri="direct:result"/>
  </route>
  <route>
    <from uri="direct:resource"/>
    ...
  </route>
</camelContext>
 <bean id="aggregationStrategy" class="..." />

10.1.6. 콘텐츠를 보강할 때 기본 집계 전략

집계 전략은 선택 사항입니다. 이를 제공하지 않으면 Apache Camel에서 기본적으로 리소스에서 얻은 본문을 사용합니다. 예를 들면 다음과 같습니다.

from("direct:start")
  .enrich("direct:resource")
  .to("direct:result");

이전 경로에서 direct:result 엔드포인트로 전송된 메시지에는 direct:resource 의 출력이 포함되어 있습니다. 이 예제에서는 사용자 지정 집계를 사용하지 않기 때문입니다.

XML DSL에서 다음과 같이 strategyRef 특성을 생략합니다.

<route>
    <from uri="direct:start"/>
    <enrich uri="direct:resource"/>
    <to uri="direct:result"/>
</route>

10.1.7. enrich() 메서드에서 지원하는 옵션

강력한 DSL 명령은 다음 옵션을 지원합니다.

이름

기본값

설명

expression

없음

Camel 2.16부터 이 옵션이 필요합니다. 에서 보강할 외부 서비스의 URI를 구성하기 위한 표현식을 지정합니다. 단순 표현식 언어, Constant 표현식 언어 또는 현재 교환의 값에서 동적으로 URI를 계산할 수 있는 기타 언어를 사용할 수 있습니다.

uri

 

이러한 옵션이 제거되었습니다. 대신 expression 옵션을 지정합니다. Camel 2.15 및 이전 버전에서는 uri 옵션 또는 ref 옵션 사양이 필요했습니다. 각 옵션은 외부 서비스에서 보강할 끝점 URI를 지정했습니다.

ref

 

에서 보강할 외부 서비스의 끝점을 나타냅니다. uri 또는 ref 중 하나를 사용해야 합니다.

strategyRef

 

외부 서비스의 응답을 단일 발신 메시지로 병합하는 데 사용되는 집계Strategy 를 나타냅니다. 기본적으로 Camel은 외부 서비스의 응답을 발신 메시지로 사용합니다. Cryostat를 AggregationStrategy 로 사용할 수 있습니다. 자세한 내용은 집계 패턴 설명서를 참조하십시오.

strategyMethodName

 

Cryostat를 AggregationStrategy 로 사용하는 경우 이 옵션을 지정하여 집계 메서드의 이름을 명시적으로 선언합니다. 자세한 내용은 집계 패턴을 참조하십시오.

strategyMethodAllowNull

false

기본 동작은 집계 메서드가 보강할 데이터가 없는 경우 사용되지 않는다는 것입니다. 이 옵션이 true이면 보강할 데이터가 없고 AggregationStrategy 로 Cryostat를 사용하는 경우 null 값이 oldExchange 로 사용됩니다. 자세한 내용은 집계 패턴을 참조하십시오.

aggregateOnException

false

리소스에서 보강하기 위해 데이터를 검색하는 동안 예외가 발생한 경우 기본 동작은 집계 메서드가 사용되지 않는다는 것입니다. 이 옵션을 true 로 설정하면 최종 사용자가 집계 메서드에 예외가 있는 경우 수행할 작업을 제어할 수 있습니다. 예를 들어 예외를 억제하거나 사용자 정의 메시지 본문을 설정할 수 있습니다.

shareUntOfWork

false

Camel 2.16부터는 기본 동작은 보강 작업이 부모 교환과 리소스 교환 간에 작업 단위를 공유하지 않는다는 것입니다. 즉, 리소스 교환에는 자체 개별 작업 단위가 있습니다. 자세한 내용은 Splitter 패턴 설명서를 참조하십시오.

cacheSize

1000

Camel 2.16부터는 ProducerCache 의 캐시 크기를 구성하려면 이 옵션을 지정하여 보강 작업에서 재사용할 수 있도록 생산자를 캐시합니다. 이 캐시를 끄려면 cacheSize 옵션을 -1 로 설정합니다.

ignoreInvalidEndpoint

false

Camel 2.16부터 이 옵션은 해결할 수 없는 엔드포인트 URI를 무시할지 여부를 나타냅니다. 기본 동작은 Camel이 잘못된 엔드포인트 URI를 식별하는 예외를 throw한다는 것입니다.

10.1.8. enrich() 메서드를 사용할 때 집계 전략 지정

enrich() 메서드는 리소스 끝점에서 추가 데이터를 검색하여 원래 교환에 포함된 들어오는 메시지를 보강합니다. 집계 전략을 사용하여 원래 교환과 리소스 교환을 결합할 수 있습니다. 집계Strategy.aggregate(Exchange, 교환) 방법의 첫 번째 매개변수는 원래 교환에 해당합니다. 두 번째 매개 변수는 리소스 교환에 해당합니다. 리소스 끝점의 결과는 리소스 교환의 Out 메시지에 저장됩니다. 예를 들면 다음과 같습니다.

AggregationStrategy aggregationStrategy = ...

   from("direct:start")
   .enrich("direct:resource", aggregationStrategy)
   .to("direct:result");

   from("direct:resource")
...

다음 코드는 집계 전략을 구현하기 위한 템플릿입니다. 이 템플릿을 사용하는 구현에서는 원래 교환이 모든 메시지 교환 패턴일 수 있습니다. 번영자에 의해 생성되는 리소스 교환은 항상 InOut 메시지 교환 패턴입니다.

public class ExampleAggregationStrategy implements AggregationStrategy {

    public Exchange aggregate(Exchange original, Exchange resource) {
        Object originalBody = original.getIn().getBody();
        Object resourceResponse = resource.getIn().getBody();
        Object mergeResult = ... // combine original body and resource response
        if (original.getPattern().isOutCapable()) {
            original.getOut().setBody(mergeResult);
        } else {
            original.getIn().setBody(mergeResult);
        }
        return original;
    }

}

다음 예제에서는 Spring XML DSL을 사용하여 집계 전략을 구현하는 방법을 보여줍니다.

<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="direct:start"/>
    <enrich strategyRef="aggregationStrategy">
      <constant>direct:resource</constant>
    </enrich>
    <to uri="direct:result"/>
  </route>
  <route>
    <from uri="direct:resource"/>
    ...
  </route>
</camelContext>

<bean id="aggregationStrategy" class="..." />

10.1.9. enrich()를 사용하여 동적 URI 사용

Camel 2.16부터 enrich()pollEnrich() 메서드는 현재 교환의 정보를 기반으로 계산되는 동적 URI 사용을 지원합니다. 예를 들어 orderId 키가 있는 헤더가 HTTP URL의 콘텐츠 경로의 일부로 사용되는 HTTP 끝점에서 강화하려면 다음과 같은 작업을 수행할 수 있습니다.

from("direct:start")
  .enrich().simple("http:myserver/${header.orderId}/order")
  .to("direct:result");

다음은 XML DSL의 예는 다음과 같습니다.

<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
<route>
   <from uri="direct:start"/>
   <enrich>
      <simple>http:myserver/${header.orderId}/order</simple>
   </enrich>
   <to uri="direct:result"/>
</route>

10.1.10. pollEnrich() 메서드를 사용하여 콘텐츠를 강화

pollEnrich 명령은 리소스 끝점을 소비자 로 처리합니다. 리소스 엔드포인트로 교환을 보내는 대신 엔드포인트를 폴링 합니다. 리소스 끝점에서 사용할 수 있는 교환이 없는 경우 기본적으로 폴링은 즉시 반환됩니다. 예를 들어 다음 경로는 들어오는 JMS 메시지의 헤더에서 이름이 추출된 파일을 읽습니다.

from("activemq:queue:order")
   .pollEnrich("file://order/data/additional?fileName=orderId")
   .to("bean:processOrder");

파일이 준비될 때까지 대기할 시간을 제한할 수 있습니다. 다음 예제에서는 최대 20초 대기를 보여줍니다.

from("activemq:queue:order")
   .pollEnrich("file://order/data/additional?fileName=orderId", 20000) // timeout is in milliseconds
   .to("bean:processOrder");

pollEnrich() 에 대한 집계 전략을 지정할 수도 있습니다. 예를 들면 다음과 같습니다.

   .pollEnrich("file://order/data/additional?fileName=orderId", 20000, aggregationStrategy)

pollEnrich() 메서드는 consumer.bridgeErrorHandler=true 로 구성된 소비자를 지원합니다. 이를 통해 경로 오류 처리기에 전파되는 폴링의 예외가 있어 폴링을 다시 시도할 수 있습니다.

참고

consumer.bridgeErrorHandler=true 에 대한 지원은 Camel 2.18의 새로운 기능입니다. 이 동작은 Camel 2.17에서 지원되지 않습니다.

집계 전략의 aggregate() 메서드에 전달된 리소스 교환은 교환이 수신되기 전에 폴링이 시간 초과되면 null 일 수 있습니다.

10.1.11. pollEnrich()에서 사용하는 폴링 방법

pollEnrich() 메서드는 다음 폴링 방법 중 하나를 호출하여 소비자 끝점을 폴링합니다.

  • receiveNoWait()(기본값입니다.)
  • receive()
  • receive(긴 시간)

pollEnrich() 명령의 시간 초과 인수(밀리초 단위로 지정)는 다음과 같이 호출할 메서드를 결정합니다.

  • 시간 초과가 0 이거나 지정되지 않은 경우 pollEnrich() 호출은 11 Wait을 수신합니다.
  • 시간 제한이 음수이면 pollEnrich() 호출이 수신됩니다.
  • 그렇지 않으면 pollEnrich() 호출이 수신(timeout) 됩니다.

데이터가 없는 경우 집계 전략의 newExchange 는 null입니다.

10.1.12. pollEnrich() 메서드를 사용하는 예

다음 예제에서는 inbox/data.txt 파일에서 콘텐츠를 로드하여 메시지를 보강하는 방법을 보여줍니다.

 from("direct:start")
   .pollEnrich("file:inbox?fileName=data.txt")
   .to("direct:result");

다음은 XML DSL의 예는 다음과 같습니다.

<route>
   <from uri="direct:start"/>
   <pollEnrich>
      <constant>file:inbox?fileName=data.txt"</constant>
   </pollEnrich>
   <to uri="direct:result"/>
</route>

지정된 파일이 없으면 메시지가 비어 있습니다. 파일이 존재하거나 특정 시간까지 대기할 시간 초과를 지정할 수 있습니다. 다음 예에서 명령은 5초 이상 대기하지 않습니다.

<route>
   <from uri="direct:start"/>
   <pollEnrich timeout="5000">
      <constant>file:inbox?fileName=data.txt"</constant>
   </pollEnrich>
   <to uri="direct:result"/>
</route>

10.1.13. pollEnrich()와 함께 동적 URI 사용

Camel 2.16부터 enrich()pollEnrich() 메서드는 현재 교환의 정보를 기반으로 계산되는 동적 URI 사용을 지원합니다. 예를 들어, 헤더를 사용하여 SEDA 큐 이름을 나타내는 끝점에서 강력한 폴링을 수행하려면 다음과 같은 작업을 수행할 수 있습니다.

from("direct:start")
  .pollEnrich().simple("seda:${header.name}")
  .to("direct:result");

다음은 XML DSL의 예는 다음과 같습니다.

<route>
   <from uri="direct:start"/>
   <pollEnrich>
      <simple>seda${header.name}</simple>
   </pollEnrich>
   <to uri="direct:result"/>
</route>

10.1.14. pollEnrich() 메서드에서 지원하는 옵션

pollEnrich DSL 명령은 다음 옵션을 지원합니다.

이름

기본값

설명

expression

없음

Camel 2.16부터 이 옵션이 필요합니다. 에서 보강할 외부 서비스의 URI를 구성하기 위한 표현식을 지정합니다. 단순 표현식 언어, Constant 표현식 언어 또는 현재 교환의 값에서 동적으로 URI를 계산할 수 있는 기타 언어를 사용할 수 있습니다.

uri

 

이러한 옵션이 제거되었습니다. 대신 expression 옵션을 지정합니다. Camel 2.15 및 이전 버전에서는 uri 옵션 또는 ref 옵션 사양이 필요했습니다. 각 옵션은 외부 서비스에서 보강할 끝점 URI를 지정했습니다.

ref

 

에서 보강할 외부 서비스의 끝점을 나타냅니다. uri 또는 ref 중 하나를 사용해야 합니다.

strategyRef

 

외부 서비스의 응답을 단일 발신 메시지로 병합하는 데 사용되는 집계Strategy 를 나타냅니다. 기본적으로 Camel은 외부 서비스의 응답을 발신 메시지로 사용합니다. Cryostat를 AggregationStrategy 로 사용할 수 있습니다. 자세한 내용은 집계 패턴 설명서를 참조하십시오.

strategyMethodName

 

Cryostat를 AggregationStrategy 로 사용하는 경우 이 옵션을 지정하여 집계 메서드의 이름을 명시적으로 선언합니다. 자세한 내용은 집계 패턴을 참조하십시오.

strategyMethodAllowNull

false

기본 동작은 집계 메서드가 보강할 데이터가 없는 경우 사용되지 않는다는 것입니다. 이 옵션이 true이면 보강할 데이터가 없고 AggregationStrategy 로 Cryostat를 사용하는 경우 null 값이 oldExchange 로 사용됩니다. 자세한 내용은 집계 패턴을 참조하십시오.

timeout

-1

외부 서비스에서 폴링할 때 응답을 대기하는 최대 시간(밀리초)입니다. 기본 동작은 pollEnrich() 메서드가 receive() 메서드를 호출한다는 것입니다. receive() 는 메시지를 사용할 수 있을 때까지 차단할 수 있으므로 권장 사항은 항상 시간 초과를 지정하는 것입니다.

aggregateOnException

false

리소스에서 보강하기 위해 데이터를 검색하는 동안 예외가 발생한 경우 기본 동작은 집계 메서드가 사용되지 않는다는 것입니다. 이 옵션을 true 로 설정하면 최종 사용자가 집계 메서드에 예외가 있는 경우 수행할 작업을 제어할 수 있습니다. 예를 들어 예외를 억제하거나 사용자 정의 메시지 본문을 설정할 수 있습니다.

cacheSize

1000

pollEnrich() 작업에서 재사용할 소비자를 캐시하는 ConsumerCache 의 캐시 크기를 구성하려면 이 옵션을 지정합니다. 이 캐시를 끄려면 cacheSize 옵션을 -1 로 설정합니다.

ignoreInvalidEndpoint

false

확인할 수 없는 엔드포인트 URI를 무시할지 여부를 나타냅니다. 기본 동작은 Camel이 잘못된 엔드포인트 URI를 식별하는 예외를 throw한다는 것입니다.