001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.camel.component;
018
019import java.net.URISyntaxException;
020import java.util.*;
021import java.util.concurrent.CopyOnWriteArrayList;
022
023import org.apache.activemq.EnhancedConnection;
024import org.apache.activemq.Service;
025import org.apache.activemq.advisory.DestinationSource;
026import org.apache.activemq.command.ActiveMQDestination;
027import org.apache.camel.CamelContext;
028import org.apache.camel.ComponentConfiguration;
029import org.apache.camel.component.jms.JmsComponent;
030import org.apache.camel.component.jms.JmsConfiguration;
031import org.apache.camel.spi.EndpointCompleter;
032import org.apache.camel.util.IntrospectionSupport;
033import org.apache.camel.util.ObjectHelper;
034import org.apache.camel.util.URISupport;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037import org.springframework.jms.connection.SingleConnectionFactory;
038
039import javax.jms.Connection;
040
041/**
042 * The <a href="http://activemq.apache.org/camel/activemq.html">ActiveMQ Component</a>
043 */
044public class ActiveMQComponent extends JmsComponent implements EndpointCompleter {
045    private final CopyOnWriteArrayList<SingleConnectionFactory> singleConnectionFactoryList =
046        new CopyOnWriteArrayList<SingleConnectionFactory>();
047    private final CopyOnWriteArrayList<Service> pooledConnectionFactoryServiceList =
048        new CopyOnWriteArrayList<Service>();
049    private static final transient Logger LOG = LoggerFactory.getLogger(ActiveMQComponent.class);
050    private boolean exposeAllQueues;
051    private CamelEndpointLoader endpointLoader;
052
053    private EnhancedConnection connection;
054    DestinationSource source;
055    boolean sourceInitialized = false;
056
057    /**
058     * Creates an <a href="http://camel.apache.org/activemq.html">ActiveMQ Component</a>
059     *
060     * @return the created component
061     */
062    public static ActiveMQComponent activeMQComponent() {
063        return new ActiveMQComponent();
064    }
065
066    /**
067     * Creates an <a href="http://camel.apache.org/activemq.html">ActiveMQ Component</a>
068     * connecting to the given <a href="http://activemq.apache.org/configuring-transports.html">broker URL</a>
069     *
070     * @param brokerURL the URL to connect to
071     * @return the created component
072     */
073    public static ActiveMQComponent activeMQComponent(String brokerURL) {
074        ActiveMQComponent answer = new ActiveMQComponent();
075        if (answer.getConfiguration() instanceof ActiveMQConfiguration) {
076            ((ActiveMQConfiguration) answer.getConfiguration())
077                    .setBrokerURL(brokerURL);
078        }
079
080        return answer;
081    }
082
083    public ActiveMQComponent() {
084    }
085
086    public ActiveMQComponent(CamelContext context) {
087        super(context);
088    }
089
090    public ActiveMQComponent(ActiveMQConfiguration configuration) {
091        super();
092        setConfiguration(configuration);
093    }
094
095    public void setBrokerURL(String brokerURL) {
096        if (getConfiguration() instanceof ActiveMQConfiguration) {
097            ((ActiveMQConfiguration)getConfiguration()).setBrokerURL(brokerURL);
098        }
099    }
100
101    public void setUserName(String userName) {
102        if (getConfiguration() instanceof ActiveMQConfiguration) {
103            ((ActiveMQConfiguration)getConfiguration()).setUserName(userName);
104        }
105    }
106
107    public void setPassword(String password) {
108        if (getConfiguration() instanceof ActiveMQConfiguration) {
109            ((ActiveMQConfiguration)getConfiguration()).setPassword(password);
110        }
111    }
112
113    public boolean isExposeAllQueues() {
114        return exposeAllQueues;
115    }
116
117    /**
118     * If enabled this will cause all Queues in the ActiveMQ broker to be eagerly populated into the CamelContext
119     * so that they can be easily browsed by any Camel tooling. This option is disabled by default.
120     *
121     * @param exposeAllQueues
122     */
123    public void setExposeAllQueues(boolean exposeAllQueues) {
124        this.exposeAllQueues = exposeAllQueues;
125    }
126
127    public void setUsePooledConnection(boolean usePooledConnection) {
128        if (getConfiguration() instanceof ActiveMQConfiguration) {
129            ((ActiveMQConfiguration)getConfiguration()).setUsePooledConnection(usePooledConnection);
130        }
131    }
132
133    public void setUseSingleConnection(boolean useSingleConnection) {
134        if (getConfiguration() instanceof ActiveMQConfiguration) {
135            ((ActiveMQConfiguration)getConfiguration()).setUseSingleConnection(useSingleConnection);
136        }
137    }
138
139    protected void addPooledConnectionFactoryService(Service pooledConnectionFactoryService) {
140        pooledConnectionFactoryServiceList.add(pooledConnectionFactoryService);
141    }
142
143    protected void addSingleConnectionFactory(SingleConnectionFactory singleConnectionFactory) {
144        singleConnectionFactoryList.add(singleConnectionFactory);
145    }
146
147    @Override
148    @SuppressWarnings("unchecked")
149    protected String convertPathToActualDestination(String path, Map<String, Object> parameters) {
150        // support ActiveMQ destination options using the destination. prefix
151        // http://activemq.apache.org/destination-options.html
152        Map options = IntrospectionSupport.extractProperties(parameters, "destination.");
153
154        String query;
155        try {
156            query = URISupport.createQueryString(options);
157        } catch (URISyntaxException e) {
158            throw ObjectHelper.wrapRuntimeCamelException(e);
159        }
160
161        // if we have destination options then append them to the destination name
162        if (ObjectHelper.isNotEmpty(query)) {
163            return path + "?" + query;
164        } else {
165            return path;
166        }
167    }
168
169    @Override
170    protected void doStart() throws Exception {
171        super.doStart();
172
173        if (isExposeAllQueues()) {
174            createDestinationSource();
175            endpointLoader = new CamelEndpointLoader(getCamelContext(), source);
176            endpointLoader.afterPropertiesSet();
177        }
178    }
179
180    protected void createDestinationSource() {
181        try {
182            if (source == null) {
183                if (connection == null) {
184                    Connection value = getConfiguration().getConnectionFactory().createConnection();
185                    if (value instanceof EnhancedConnection) {
186                        connection = (EnhancedConnection) value;
187                    } else {
188                        throw new IllegalArgumentException("Created JMS Connection is not an EnhancedConnection: " + value);
189                    }
190                    connection.start();
191                }
192                source = connection.getDestinationSource();
193            }
194        } catch (Throwable t) {
195            LOG.info("Can't get destination source, endpoint completer will not work", t);
196        }
197    }
198
199    @Override
200    protected void doStop() throws Exception {
201        if (source != null) {
202            source.stop();
203            source = null;
204        }
205        if (connection != null) {
206            connection.close();
207            connection = null;
208        }
209        for (Service s : pooledConnectionFactoryServiceList) {
210            s.stop();
211        }
212        pooledConnectionFactoryServiceList.clear();
213        for (SingleConnectionFactory s : singleConnectionFactoryList) {
214            s.destroy();
215        }
216        singleConnectionFactoryList.clear();
217        super.doStop();
218    }
219
220    @Override
221    public void setConfiguration(JmsConfiguration configuration) {
222        if (configuration instanceof ActiveMQConfiguration) {
223            ((ActiveMQConfiguration) configuration).setActiveMQComponent(this);
224        }
225        super.setConfiguration(configuration);
226    }
227
228    @Override
229    protected JmsConfiguration createConfiguration() {
230        ActiveMQConfiguration answer = new ActiveMQConfiguration();
231        answer.setActiveMQComponent(this);
232        return answer;
233    }
234
235    @Override
236    public List<String> completeEndpointPath(ComponentConfiguration componentConfiguration, String completionText) {
237        // try to initialize destination source only the first time
238        if (!sourceInitialized) {
239            createDestinationSource();
240            sourceInitialized = true;
241        }
242        ArrayList<String> answer = new ArrayList<String>();
243        if (source != null) {
244            Set candidates = source.getQueues();
245            String destinationName = completionText;
246            if (completionText.startsWith("topic:")) {
247                candidates = source.getTopics();
248                destinationName = completionText.substring(6);
249            } else if (completionText.startsWith("queue:")) {
250                destinationName = completionText.substring(6);
251            }
252
253            Iterator it = candidates.iterator();
254
255            while (it.hasNext()) {
256                ActiveMQDestination destination = (ActiveMQDestination) it.next();
257                if (destination.getPhysicalName().startsWith(destinationName)) {
258                    answer.add(destination.getPhysicalName());
259                }
260            }
261        }
262        return answer;
263    }
264}