001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.camel.component; 018 019import java.net.URISyntaxException; 020import java.util.*; 021import java.util.concurrent.CopyOnWriteArrayList; 022 023import org.apache.activemq.EnhancedConnection; 024import org.apache.activemq.Service; 025import org.apache.activemq.advisory.DestinationSource; 026import org.apache.activemq.command.ActiveMQDestination; 027import org.apache.camel.CamelContext; 028import org.apache.camel.ComponentConfiguration; 029import org.apache.camel.component.jms.JmsComponent; 030import org.apache.camel.component.jms.JmsConfiguration; 031import org.apache.camel.spi.EndpointCompleter; 032import org.apache.camel.util.IntrospectionSupport; 033import org.apache.camel.util.ObjectHelper; 034import org.apache.camel.util.URISupport; 035import org.slf4j.Logger; 036import org.slf4j.LoggerFactory; 037import org.springframework.jms.connection.SingleConnectionFactory; 038 039import javax.jms.Connection; 040 041/** 042 * The <a href="http://activemq.apache.org/camel/activemq.html">ActiveMQ Component</a> 043 */ 044public class ActiveMQComponent extends JmsComponent implements EndpointCompleter { 045 private final CopyOnWriteArrayList<SingleConnectionFactory> singleConnectionFactoryList = 046 new CopyOnWriteArrayList<SingleConnectionFactory>(); 047 private final CopyOnWriteArrayList<Service> pooledConnectionFactoryServiceList = 048 new CopyOnWriteArrayList<Service>(); 049 private static final transient Logger LOG = LoggerFactory.getLogger(ActiveMQComponent.class); 050 private boolean exposeAllQueues; 051 private CamelEndpointLoader endpointLoader; 052 053 private EnhancedConnection connection; 054 DestinationSource source; 055 boolean sourceInitialized = false; 056 057 /** 058 * Creates an <a href="http://camel.apache.org/activemq.html">ActiveMQ Component</a> 059 * 060 * @return the created component 061 */ 062 public static ActiveMQComponent activeMQComponent() { 063 return new ActiveMQComponent(); 064 } 065 066 /** 067 * Creates an <a href="http://camel.apache.org/activemq.html">ActiveMQ Component</a> 068 * connecting to the given <a href="http://activemq.apache.org/configuring-transports.html">broker URL</a> 069 * 070 * @param brokerURL the URL to connect to 071 * @return the created component 072 */ 073 public static ActiveMQComponent activeMQComponent(String brokerURL) { 074 ActiveMQComponent answer = new ActiveMQComponent(); 075 if (answer.getConfiguration() instanceof ActiveMQConfiguration) { 076 ((ActiveMQConfiguration) answer.getConfiguration()) 077 .setBrokerURL(brokerURL); 078 } 079 080 return answer; 081 } 082 083 public ActiveMQComponent() { 084 } 085 086 public ActiveMQComponent(CamelContext context) { 087 super(context); 088 } 089 090 public ActiveMQComponent(ActiveMQConfiguration configuration) { 091 super(); 092 setConfiguration(configuration); 093 } 094 095 public void setBrokerURL(String brokerURL) { 096 if (getConfiguration() instanceof ActiveMQConfiguration) { 097 ((ActiveMQConfiguration)getConfiguration()).setBrokerURL(brokerURL); 098 } 099 } 100 101 public void setUserName(String userName) { 102 if (getConfiguration() instanceof ActiveMQConfiguration) { 103 ((ActiveMQConfiguration)getConfiguration()).setUserName(userName); 104 } 105 } 106 107 public void setPassword(String password) { 108 if (getConfiguration() instanceof ActiveMQConfiguration) { 109 ((ActiveMQConfiguration)getConfiguration()).setPassword(password); 110 } 111 } 112 113 public boolean isExposeAllQueues() { 114 return exposeAllQueues; 115 } 116 117 /** 118 * If enabled this will cause all Queues in the ActiveMQ broker to be eagerly populated into the CamelContext 119 * so that they can be easily browsed by any Camel tooling. This option is disabled by default. 120 * 121 * @param exposeAllQueues 122 */ 123 public void setExposeAllQueues(boolean exposeAllQueues) { 124 this.exposeAllQueues = exposeAllQueues; 125 } 126 127 public void setUsePooledConnection(boolean usePooledConnection) { 128 if (getConfiguration() instanceof ActiveMQConfiguration) { 129 ((ActiveMQConfiguration)getConfiguration()).setUsePooledConnection(usePooledConnection); 130 } 131 } 132 133 public void setUseSingleConnection(boolean useSingleConnection) { 134 if (getConfiguration() instanceof ActiveMQConfiguration) { 135 ((ActiveMQConfiguration)getConfiguration()).setUseSingleConnection(useSingleConnection); 136 } 137 } 138 139 protected void addPooledConnectionFactoryService(Service pooledConnectionFactoryService) { 140 pooledConnectionFactoryServiceList.add(pooledConnectionFactoryService); 141 } 142 143 protected void addSingleConnectionFactory(SingleConnectionFactory singleConnectionFactory) { 144 singleConnectionFactoryList.add(singleConnectionFactory); 145 } 146 147 @Override 148 @SuppressWarnings("unchecked") 149 protected String convertPathToActualDestination(String path, Map<String, Object> parameters) { 150 // support ActiveMQ destination options using the destination. prefix 151 // http://activemq.apache.org/destination-options.html 152 Map options = IntrospectionSupport.extractProperties(parameters, "destination."); 153 154 String query; 155 try { 156 query = URISupport.createQueryString(options); 157 } catch (URISyntaxException e) { 158 throw ObjectHelper.wrapRuntimeCamelException(e); 159 } 160 161 // if we have destination options then append them to the destination name 162 if (ObjectHelper.isNotEmpty(query)) { 163 return path + "?" + query; 164 } else { 165 return path; 166 } 167 } 168 169 @Override 170 protected void doStart() throws Exception { 171 super.doStart(); 172 173 if (isExposeAllQueues()) { 174 createDestinationSource(); 175 endpointLoader = new CamelEndpointLoader(getCamelContext(), source); 176 endpointLoader.afterPropertiesSet(); 177 } 178 } 179 180 protected void createDestinationSource() { 181 try { 182 if (source == null) { 183 if (connection == null) { 184 Connection value = getConfiguration().getConnectionFactory().createConnection(); 185 if (value instanceof EnhancedConnection) { 186 connection = (EnhancedConnection) value; 187 } else { 188 throw new IllegalArgumentException("Created JMS Connection is not an EnhancedConnection: " + value); 189 } 190 connection.start(); 191 } 192 source = connection.getDestinationSource(); 193 } 194 } catch (Throwable t) { 195 LOG.info("Can't get destination source, endpoint completer will not work", t); 196 } 197 } 198 199 @Override 200 protected void doStop() throws Exception { 201 if (source != null) { 202 source.stop(); 203 source = null; 204 } 205 if (connection != null) { 206 connection.close(); 207 connection = null; 208 } 209 for (Service s : pooledConnectionFactoryServiceList) { 210 s.stop(); 211 } 212 pooledConnectionFactoryServiceList.clear(); 213 for (SingleConnectionFactory s : singleConnectionFactoryList) { 214 s.destroy(); 215 } 216 singleConnectionFactoryList.clear(); 217 super.doStop(); 218 } 219 220 @Override 221 public void setConfiguration(JmsConfiguration configuration) { 222 if (configuration instanceof ActiveMQConfiguration) { 223 ((ActiveMQConfiguration) configuration).setActiveMQComponent(this); 224 } 225 super.setConfiguration(configuration); 226 } 227 228 @Override 229 protected JmsConfiguration createConfiguration() { 230 ActiveMQConfiguration answer = new ActiveMQConfiguration(); 231 answer.setActiveMQComponent(this); 232 return answer; 233 } 234 235 @Override 236 public List<String> completeEndpointPath(ComponentConfiguration componentConfiguration, String completionText) { 237 // try to initialize destination source only the first time 238 if (!sourceInitialized) { 239 createDestinationSource(); 240 sourceInitialized = true; 241 } 242 ArrayList<String> answer = new ArrayList<String>(); 243 if (source != null) { 244 Set candidates = source.getQueues(); 245 String destinationName = completionText; 246 if (completionText.startsWith("topic:")) { 247 candidates = source.getTopics(); 248 destinationName = completionText.substring(6); 249 } else if (completionText.startsWith("queue:")) { 250 destinationName = completionText.substring(6); 251 } 252 253 Iterator it = candidates.iterator(); 254 255 while (it.hasNext()) { 256 ActiveMQDestination destination = (ActiveMQDestination) it.next(); 257 if (destination.getPhysicalName().startsWith(destinationName)) { 258 answer.add(destination.getPhysicalName()); 259 } 260 } 261 } 262 return answer; 263 } 264}