41.2. 소비자 인터페이스 구현

41.2.1. 소비자를 구현하는 대체 방법

다음 방법 중 하나로 소비자를 구현할 수 있습니다.

41.2.2. 이벤트 중심 소비자 구현

이벤트 중심 소비자에서 처리는 외부 이벤트에 의해 명시적으로 구동됩니다. 이벤트는 event-listener 인터페이스를 통해 수신되며, 여기서 리스너 인터페이스는 특정 이벤트 소스에 따라 다릅니다.

예 41.4. “CryostatConsumer 구현” Apache Camel Cryo stat 구성 요소 구현에서 가져온 CryostatConsumer 클래스의 구현을 보여줍니다. CryostatConsumer 클래스는 org.apache.camel.impl.DefaultConsumer 클래스에서 상속하여 구현된 이벤트 중심 소비자의 예입니다. Cryostat Consumer 예제 의 경우 이벤트는 Cryostat 이벤트를 수신하는 표준 방법인 NotificationListener.handleNotification() 메서드를 호출하여 표시됩니다. 이러한 Cryostat 이벤트를 수신하려면 예 41.4. “CryostatConsumer 구현” 에 표시된 대로 NotificationListener 인터페이스를 구현하고 handleNotification() 메서드를 재정의해야 합니다.

예 41.4. CryostatConsumer 구현

package org.apache.camel.component.jmx;

import javax.management.Notification;
import javax.management.NotificationListener;
import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultConsumer;

public class JMXConsumer extends DefaultConsumer implements NotificationListener { 1

    JMXEndpoint jmxEndpoint;

    public JMXConsumer(JMXEndpoint endpoint, Processor processor) { 2
        super(endpoint, processor);
        this.jmxEndpoint = endpoint;
    }

    public void handleNotification(Notification notification, Object handback) { 3
        try {
            getProcessor().process(jmxEndpoint.createExchange(notification)); 4
        } catch (Throwable e) {
            handleException(e); 5
        }
    }
}
1
Default Consumer 패턴은 DefaultConsumer 클래스를 확장하여 이벤트 중심 소비자의 일반적인 패턴을 따릅니다. 또한 이 소비자는 Cryostat 알림에서 이벤트를 수신하도록 설계되었으므로 NotificationListener 인터페이스를 구현해야 합니다.
2
상위 끝점, 엔드포인트 및 체인의 다음 프로세서에 대한 참조를 사용하는 하나 이상의 생성자를 인수로 구현해야 합니다.
3
NotificationListener에 정의된 handleNotification() 메서드는 Cryostat 알림이 도달할 때마다 Cryostat에서 자동으로 호출됩니다. 이 메서드의 본문에는 소비자의 이벤트 처리를 수행하는 코드가 포함되어야 합니다. handleNotification() 호출은 Cryostat 계층에서 시작되므로 소비자의 스레드링 모델은 Cryostat 계층이 아닌 Cryostat 계층에서 암시적으로 제어됩니다.
4
이 코드 줄은 두 단계를 결합합니다. 먼저 Cryostat 알림 오브젝트는 Apache Camel에서 이벤트의 일반 표현인 교환 오브젝트로 변환됩니다. 그런 다음 새로 생성된 교환 오브젝트가 경로의 다음 프로세서로 전달됩니다(동시적으로 계산됨).
5
handleException() 메서드는 DefaultConsumer 기본 클래스에서 구현됩니다. 기본적으로 org.apache.camel.impl.LoggingExceptionHandler 클래스를 사용하여 예외를 처리합니다.
참고

handleNotification() 메서드는 Cryostat 예제에 따라 다릅니다. 자체 이벤트 중심 소비자를 구현할 때는 사용자 정의 소비자에 구현할 유사한 이벤트 리스너 메서드를 식별해야 합니다.

41.2.3. 예약된 폴링 소비자 구현

예약된 폴링 소비자에서 폴링 이벤트는 타이머 클래스인 java.util.concurrent.ScheduledExecutorService 에 의해 자동으로 생성됩니다. 생성된 폴링 이벤트를 수신하려면 ScheduledPollConsumer.poll() 메서드를 구현해야 합니다( 38.1.3절. “소비자 패턴 및 스레드”참조).

예 41.5. “ScheduledPollConsumer Implementation” ScheduledPollConsumer 클래스를 확장하여 구현된 예약된 폴링 패턴을 따르는 소비자를 구현하는 방법을 보여줍니다.

예 41.5. ScheduledPollConsumer Implementation

import java.util.concurrent.ScheduledExecutorService;

import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.PollingConsumer;
import org.apache.camel.Processor;

import org.apache.camel.impl.ScheduledPollConsumer;

public class pass:quotes[CustomConsumer] extends ScheduledPollConsumer { 1
    private final pass:quotes[CustomEndpoint] endpoint;

    public pass:quotes[CustomConsumer](pass:quotes[CustomEndpoint] endpoint, Processor processor) { 2
        super(endpoint, processor);
        this.endpoint = endpoint;
    }

    protected void poll() throws Exception { 3
        Exchange exchange = /* Receive exchange object ... */;

        // Example of a synchronous processor.
        getProcessor().process(exchange); 4
    }

    @Override
    protected void doStart() throws Exception { 5
        // Pre-Start:
        // Place code here to execute just before start of processing.
        super.doStart();
        // Post-Start:
        // Place code here to execute just after start of processing.
    }

    @Override
    protected void doStop() throws Exception { 6
        // Pre-Stop:
        // Place code here to execute just before processing stops.
        super.doStop();
        // Post-Stop:
        // Place code here to execute just after processing stops.
    }
}
1
org.apache.camel.impl.ScheduledPollConsumer 클래스를 확장하여 예약된 폴링 소비자 클래스인 CustomConsumer 를 구현합니다.
2
상위 끝점, 엔드포인트 및 체인의 다음 프로세서에 대한 참조를 사용하는 하나 이상의 생성자를 인수로 구현해야 합니다.
3
poll() 메서드를 재정의하여 예약된 폴링 이벤트를 수신합니다. 여기에서 들어오는 이벤트를 검색하고 처리하는 코드를 배치해야 합니다(배체 개체에 의해 표시됨).
4
이 예제에서는 이벤트가 동기적으로 처리됩니다. 이벤트를 비동기적으로 처리하려면 getAsyncProcessor() 를 호출하여 비동기 프로세서에 대한 참조를 사용해야 합니다. 이벤트를 비동기적으로 처리하는 방법에 대한 자세한 내용은 38.1.4절. “비동기 처리” 을 참조하십시오.
5
(선택 사항) 소비자가 시작될 때 일부 코드 행을 실행하려면 표시된 대로 doStart() 메서드를 재정의합니다.
6
(선택 사항) 소비자가 중지됨에 따라 일부 코드 행을 실행하려면 표시된 대로 doStop() 메서드를 재정의합니다.

41.2.4. 폴링 소비자 구현

예 41.6. “PollingConsumerSupport 구현” PollingConsumerSupport 클래스를 확장하여 구현되는 폴링 패턴을 따르는 소비자를 구현하는 방법을 간략하게 설명합니다.

예 41.6. PollingConsumerSupport 구현

import org.apache.camel.Exchange;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.impl.PollingConsumerSupport;

public class pass:quotes[CustomConsumer] extends PollingConsumerSupport { 1
    private final pass:quotes[CustomEndpoint] endpoint;

    public pass:quotes[CustomConsumer](pass:quotes[CustomEndpoint] endpoint) { 2
        super(endpoint);
        this.endpoint = endpoint;
    }

    public Exchange receiveNoWait() { 3
        Exchange exchange = /* Obtain an exchange object. */;
        // Further processing ...
        return exchange;
    }

    public Exchange receive() { 4
        // Blocking poll ...
    }

    public Exchange receive(long timeout) { 5
        // Poll with timeout ...
    }

    protected void doStart() throws Exception { 6
        // Code to execute whilst starting up.
    }

    protected void doStop() throws Exception {
        // Code to execute whilst shutting down.
    }
}
1
org.apache.camel.impl.PollingConsumerSupport 클래스를 확장하여 폴링 소비자 클래스인 CustomConsumer 를 구현합니다.
2
부모 끝점, 엔드포인트에 대한 참조를 인수로 사용하는 하나 이상의 생성자를 구현해야 합니다. 폴링 소비자는 프로세서 인스턴스에 대한 참조가 필요하지 않습니다.
3
receiveNoWait() 메서드는 이벤트를 검색하기 위한 비차단 알고리즘을 구현해야 합니다(exchange 오브젝트). 이벤트를 사용할 수 없는 경우 null 을 반환해야 합니다.
4
receive() 메서드는 이벤트를 검색하기 위한 차단 알고리즘을 구현해야 합니다. 이 방법은 이벤트를 사용할 수 없는 경우 무기한 차단될 수 있습니다.
5
receive(long timeout) 메서드는 지정된 시간 초과(일반적으로 밀리초 단위로 지정됨)로 차단할 수 있는 알고리즘을 구현합니다.
6
소비자가 시작 또는 종료되는 동안 실행되는 코드를 삽입하려면 doStart() 메서드와 doStop() 메서드를 각각 구현합니다.

41.2.5. 사용자 정의 스레드 구현

표준 소비자 패턴이 소비자 구현에 적합하지 않은 경우 소비자 인터페이스를 직접 구현하고 스레드 코드를 직접 작성할 수 있습니다. 그러나 스레드 코드를 작성할 때는 2.8절. “스레드 모델” 에 설명된 대로 표준 Apache Camel 스레딩 모델을 준수하는 것이 중요합니다.

예를 들어 camel-core 의 SEDA 구성 요소는 Apache Camel 스레딩 모델과 일치하는 자체 소비자 스레딩을 구현합니다. 예 41.7. “사용자 정의 스레드 구현” SedaConsumer 클래스에서 스레드를 구현하는 방법에 대한 개요를 보여줍니다.

예 41.7. 사용자 정의 스레드 구현

package org.apache.camel.component.seda;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.ShutdownRunningTask;
import org.apache.camel.impl.LoggingExceptionHandler;
import org.apache.camel.impl.ServiceSupport;
import org.apache.camel.util.ServiceHelper;
...
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * A Consumer for the SEDA component.
 *
 * @version $Revision: 922485 $
 */
public class SedaConsumer extends ServiceSupport implements Consumer, Runnable, ShutdownAware { 1
    private static final transient Log LOG = LogFactory.getLog(SedaConsumer.class);

    private SedaEndpoint endpoint;
    private Processor processor;
    private ExecutorService executor;
    ...
    public SedaConsumer(SedaEndpoint endpoint, Processor processor) {
        this.endpoint = endpoint;
        this.processor = processor;
    }
    ...

    public void run() { 2
        BlockingQueue<Exchange> queue = endpoint.getQueue();
        // Poll the queue and process exchanges
        ...
    }

    ...
    protected void doStart() throws Exception { 3
        int poolSize = endpoint.getConcurrentConsumers();
        executor = endpoint.getCamelContext().getExecutorServiceStrategy()
            .newFixedThreadPool(this, endpoint.getEndpointUri(), poolSize); 4
        for (int i = 0; i < poolSize; i++) { 5
            executor.execute(this);
        }
        endpoint.onStarted(this);
    }

    protected void doStop() throws Exception { 6
        endpoint.onStopped(this);
        // must shutdown executor on stop to avoid overhead of having them running
        endpoint.getCamelContext().getExecutorServiceStrategy().shutdownNow(executor); 7

        if (multicast != null) {
            ServiceHelper.stopServices(multicast);
        }
    }
    ...
    //----------
    // Implementation of ShutdownAware interface

    public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {
        // deny stopping on shutdown as we want seda consumers to run in case some other queues
        // depend on this consumer to run, so it can complete its exchanges
        return true;
    }

    public int getPendingExchangesSize() {
        // number of pending messages on the queue
        return endpoint.getQueue().size();
    }

}
1
SedaConsumer 클래스는 org.apache.camel.impl.ServiceSupport 클래스를 확장하고 Consumer,Runnable 및 Cryostat Aware 인터페이스를 구현하여 구현됩니다.
2
Runnable.run() 메서드를 구현하여 스레드에서 실행되는 동안 소비자가 수행하는 작업을 정의합니다. 이 경우 소비자는 루프에서 실행되며, 새로운 교환을 위해 큐를 폴링한 다음 대기열의 마지막 부분에서 교환을 처리합니다.
3
doStart() 메서드는 ServiceSupport 에서 상속됩니다. 소비자가 시작할 때 수행하는 작업을 정의하기 위해 이 메서드를 재정의합니다.
4
스레드를 직접 생성하는 대신 CamelContext 에 등록된 ExecutorServiceStrategy 오브젝트를 사용하여 스레드 풀을 생성해야 합니다. 이는 Apache Camel에서 스레드의 중앙 집중식 관리 및 정상 종료와 같은 지원을 구현할 수 있기 때문에 중요합니다. 자세한 내용은 2.8절. “스레드 모델”의 내용을 참조하십시오.
5
ExecutorService.execute() 메서드를 poolSize 번 호출하여 스레드를 시작합니다.
6
doStop() 메서드는 ServiceSupport 에서 상속됩니다. 소비자가 종료할 때 수행하는 작업을 정의하기 위해 이 방법을 재정의합니다.
7
executor 인스턴스에 의해 표시되는 스레드 풀을 종료합니다.