001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.camel;
018
019import javax.jms.IllegalStateException;
020import javax.jms.JMSException;
021import javax.jms.Message;
022import javax.jms.MessageConsumer;
023import javax.jms.MessageListener;
024
025import org.apache.activemq.ActiveMQSession;
026import org.apache.activemq.util.JMSExceptionSupport;
027import org.apache.camel.Consumer;
028import org.apache.camel.Endpoint;
029import org.apache.camel.Exchange;
030import org.apache.camel.PollingConsumer;
031import org.apache.camel.Processor;
032
033/**
034 * A JMS {@link javax.jms.MessageConsumer} which consumes message exchanges from
035 * a Camel {@link Endpoint}
036 * 
037 * 
038 */
039public class CamelMessageConsumer implements MessageConsumer {
040    private final CamelDestination destination;
041    private final Endpoint endpoint;
042    private final ActiveMQSession session;
043    private final String messageSelector;
044    private final boolean noLocal;
045    private MessageListener messageListener;
046    private Consumer consumer;
047    private PollingConsumer pollingConsumer;
048    private boolean closed;
049
050    public CamelMessageConsumer(CamelDestination destination, Endpoint endpoint, ActiveMQSession session, String messageSelector, boolean noLocal) {
051        this.destination = destination;
052        this.endpoint = endpoint;
053        this.session = session;
054        this.messageSelector = messageSelector;
055        this.noLocal = noLocal;
056    }
057
058    public void close() throws JMSException {
059        if (!closed) {
060            closed = true;
061            try {
062                if (consumer != null) {
063                    consumer.stop();
064                }
065                if (pollingConsumer != null) {
066                    pollingConsumer.stop();
067                }
068            } catch (JMSException e) {
069                throw e;
070            } catch (Exception e) {
071                throw JMSExceptionSupport.create(e);
072            }
073        }
074    }
075
076    public MessageListener getMessageListener() throws JMSException {
077        return messageListener;
078    }
079
080    public void setMessageListener(MessageListener messageListener) throws JMSException {
081        this.messageListener = messageListener;
082        if (messageListener != null && consumer == null) {
083            consumer = createConsumer();
084        }
085    }
086
087    public Message receive() throws JMSException {
088        Exchange exchange = getPollingConsumer().receive();
089        return createMessage(exchange);
090    }
091
092    public Message receive(long timeoutMillis) throws JMSException {
093        Exchange exchange = getPollingConsumer().receive(timeoutMillis);
094        return createMessage(exchange);
095    }
096
097    public Message receiveNoWait() throws JMSException {
098        Exchange exchange = getPollingConsumer().receiveNoWait();
099        return createMessage(exchange);
100    }
101
102    // Properties
103    // -----------------------------------------------------------------------
104
105    public CamelDestination getDestination() {
106        return destination;
107    }
108
109    public Endpoint getEndpoint() {
110        return endpoint;
111    }
112
113    public String getMessageSelector() {
114        return messageSelector;
115    }
116
117    public boolean isNoLocal() {
118        return noLocal;
119    }
120
121    public ActiveMQSession getSession() {
122        return session;
123    }
124
125    // Implementation methods
126    // -----------------------------------------------------------------------
127
128    protected PollingConsumer getPollingConsumer() throws JMSException {
129        try {
130            if (pollingConsumer == null) {
131                pollingConsumer = endpoint.createPollingConsumer();
132                pollingConsumer.start();
133            }
134            return pollingConsumer;
135        } catch (JMSException e) {
136            throw e;
137        } catch (Exception e) {
138            throw JMSExceptionSupport.create(e);
139        }
140    }
141
142    protected Message createMessage(Exchange exchange) throws JMSException {
143        if (exchange != null) {
144            Message message = destination.getBinding().makeJmsMessage(exchange, session);
145            return message;
146        } else {
147            return null;
148        }
149    }
150
151    protected Consumer createConsumer() throws JMSException {
152        try {
153            Consumer answer = endpoint.createConsumer(new Processor() {
154                public void process(Exchange exchange) throws Exception {
155                    Message message = createMessage(exchange);
156                    getMessageListener().onMessage(message);
157                }
158            });
159            answer.start();
160            return answer;
161        } catch (JMSException e) {
162            throw e;
163        } catch (Exception e) {
164            throw JMSExceptionSupport.create(e);
165        }
166    }
167
168    protected void checkClosed() throws javax.jms.IllegalStateException {
169        if (closed) {
170            throw new IllegalStateException("The producer is closed");
171        }
172    }
173}