001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.camel;
018
019import javax.jms.Destination;
020import javax.jms.IllegalStateException;
021import javax.jms.JMSException;
022import javax.jms.Message;
023
024import org.apache.activemq.ActiveMQMessageProducerSupport;
025import org.apache.activemq.ActiveMQSession;
026import org.apache.activemq.util.JMSExceptionSupport;
027import org.apache.camel.Endpoint;
028import org.apache.camel.Exchange;
029import org.apache.camel.ExchangePattern;
030import org.apache.camel.Producer;
031import org.apache.camel.component.jms.JmsMessage;
032import org.apache.camel.util.ObjectHelper;
033
034/**
035 * A JMS {@link javax.jms.MessageProducer} which sends message exchanges to a
036 * Camel {@link Endpoint}
037 * 
038 * 
039 */
040public class CamelMessageProducer extends ActiveMQMessageProducerSupport {
041    
042    protected Producer producer;
043
044    private final CamelDestination destination;
045    private final Endpoint endpoint;
046    private boolean closed;
047
048    public CamelMessageProducer(CamelDestination destination, Endpoint endpoint, ActiveMQSession session) throws JMSException {
049        super(session);
050        this.destination = destination;
051        this.endpoint = endpoint;
052        try {
053            this.producer = endpoint.createProducer();
054        } catch (JMSException e) {
055            throw e;
056        } catch (Exception e) {
057            throw JMSExceptionSupport.create(e);
058        }
059    }
060
061    public CamelDestination getDestination() throws JMSException {
062        return destination;
063    }
064
065    public Endpoint getEndpoint() {
066        return endpoint;
067    }
068
069    public void close() throws JMSException {
070        if (!closed) {
071            closed = true;
072            try {
073                producer.stop();
074            } catch (JMSException e) {
075                throw e;
076            } catch (Exception e) {
077                throw JMSExceptionSupport.create(e);
078            }
079        }
080    }
081
082    public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
083        CamelDestination camelDestination = null;
084        if (ObjectHelper.equal(destination, this.destination)) {
085            camelDestination = this.destination;
086        } else {
087            // TODO support any CamelDestination?
088            throw new IllegalArgumentException("Invalid destination setting: " + destination + " when expected: " + this.destination);
089        }
090        try {
091                        Exchange exchange = endpoint.createExchange(ExchangePattern.InOnly);
092                        exchange.setIn(new JmsMessage(message, camelDestination.getBinding()));
093            producer.process(exchange);
094        } catch (JMSException e) {
095            throw e;
096        } catch (Exception e) {
097            throw JMSExceptionSupport.create(e);
098        }
099    }
100
101    protected void checkClosed() throws IllegalStateException {
102        if (closed) {
103            throw new IllegalStateException("The producer is closed");
104        }
105    }
106}