001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.ra; 018 019import java.net.URI; 020import java.util.HashMap; 021 022import javax.jms.JMSException; 023import javax.resource.NotSupportedException; 024import javax.resource.ResourceException; 025import javax.resource.spi.ActivationSpec; 026import javax.resource.spi.BootstrapContext; 027import javax.resource.spi.ResourceAdapterInternalException; 028import javax.resource.spi.endpoint.MessageEndpointFactory; 029import javax.transaction.xa.XAException; 030import javax.transaction.xa.XAResource; 031 032import javax.transaction.xa.Xid; 033import org.apache.activemq.ActiveMQConnection; 034import org.apache.activemq.ActiveMQConnectionFactory; 035import org.apache.activemq.RedeliveryPolicy; 036import org.apache.activemq.TransactionContext; 037import org.apache.activemq.broker.BrokerFactory; 038import org.apache.activemq.broker.BrokerService; 039import org.apache.activemq.util.ServiceSupport; 040import org.slf4j.Logger; 041import org.slf4j.LoggerFactory; 042 043/** 044 * Knows how to connect to one ActiveMQ server. It can then activate endpoints 045 * and deliver messages to those end points using the connection configure in 046 * the resource adapter. <p/>Must override equals and hashCode (JCA spec 16.4) 047 * 048 * @org.apache.xbean.XBean element="resourceAdapter" rootElement="true" 049 * description="The JCA Resource Adaptor for ActiveMQ" 050 * 051 */ 052public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implements MessageResourceAdapter { 053 private static final Logger LOG = LoggerFactory.getLogger(ActiveMQResourceAdapter.class); 054 private final HashMap<ActiveMQEndpointActivationKey, ActiveMQEndpointWorker> endpointWorkers = new HashMap<ActiveMQEndpointActivationKey, ActiveMQEndpointWorker>(); 055 056 private BootstrapContext bootstrapContext; 057 private String brokerXmlConfig; 058 private BrokerService broker; 059 private Thread brokerStartThread; 060 private ActiveMQConnectionFactory connectionFactory; 061 062 /** 063 * 064 */ 065 public ActiveMQResourceAdapter() { 066 super(); 067 } 068 069 /** 070 * @see javax.resource.spi.ResourceAdapter#start(javax.resource.spi.BootstrapContext) 071 */ 072 public void start(BootstrapContext bootstrapContext) throws ResourceAdapterInternalException { 073 this.bootstrapContext = bootstrapContext; 074 if (brokerXmlConfig != null && brokerXmlConfig.trim().length() > 0) { 075 brokerStartThread = new Thread("Starting ActiveMQ Broker") { 076 @Override 077 public void run () { 078 try { 079 // ensure RAR resources are available to xbean (needed for weblogic) 080 log.debug("original thread context classLoader: " + Thread.currentThread().getContextClassLoader()); 081 Thread.currentThread().setContextClassLoader(getClass().getClassLoader()); 082 log.debug("current (from getClass()) thread context classLoader: " + Thread.currentThread().getContextClassLoader()); 083 084 synchronized( ActiveMQResourceAdapter.this ) { 085 broker = BrokerFactory.createBroker(new URI(brokerXmlConfig)); 086 } 087 broker.start(); 088 // Default the ServerUrl to the local broker if not specified in the ra.xml 089 if (getServerUrl() == null) { 090 setServerUrl("vm://" + broker.getBrokerName() + "?create=false"); 091 } 092 } catch (Throwable e) { 093 log.warn("Could not start up embeded ActiveMQ Broker '"+brokerXmlConfig+"': "+e.getMessage()); 094 log.debug("Reason for: "+e.getMessage(), e); 095 } 096 } 097 }; 098 brokerStartThread.setDaemon(true); 099 brokerStartThread.start(); 100 101 // Wait up to 5 seconds for the broker to start up in the async thread.. otherwise keep going without it.. 102 try { 103 brokerStartThread.join(1000*5); 104 } catch (InterruptedException e) { 105 Thread.currentThread().interrupt(); 106 } 107 } 108 } 109 110 public ActiveMQConnection makeConnection() throws JMSException { 111 if( connectionFactory == null ) { 112 return makeConnection(getInfo()); 113 } else { 114 return makeConnection(getInfo(), connectionFactory); 115 } 116 } 117 118 /** 119 * @param activationSpec 120 */ 121 public ActiveMQConnection makeConnection(MessageActivationSpec activationSpec) throws JMSException { 122 ActiveMQConnectionFactory cf = getConnectionFactory(); 123 if (cf == null) { 124 cf = createConnectionFactory(getInfo(), activationSpec); 125 } 126 String userName = defaultValue(activationSpec.getUserName(), getInfo().getUserName()); 127 String password = defaultValue(activationSpec.getPassword(), getInfo().getPassword()); 128 String clientId = activationSpec.getClientId(); 129 if (clientId != null) { 130 cf.setClientID(clientId); 131 } else { 132 if (activationSpec.isDurableSubscription()) { 133 log.warn("No clientID specified for durable subscription: " + activationSpec); 134 } 135 } 136 ActiveMQConnection physicalConnection = (ActiveMQConnection) cf.createConnection(userName, password); 137 138 // have we configured a redelivery policy 139 RedeliveryPolicy redeliveryPolicy = activationSpec.redeliveryPolicy(); 140 if (redeliveryPolicy != null) { 141 physicalConnection.setRedeliveryPolicy(redeliveryPolicy); 142 } 143 return physicalConnection; 144 } 145 146 /** 147 * @see javax.resource.spi.ResourceAdapter#stop() 148 */ 149 public void stop() { 150 while (endpointWorkers.size() > 0) { 151 ActiveMQEndpointActivationKey key = endpointWorkers.keySet().iterator().next(); 152 endpointDeactivation(key.getMessageEndpointFactory(), key.getActivationSpec()); 153 } 154 155 synchronized( this ) { 156 if (broker != null) { 157 if( brokerStartThread.isAlive() ) { 158 brokerStartThread.interrupt(); 159 } 160 ServiceSupport.dispose(broker); 161 broker = null; 162 } 163 } 164 165 this.bootstrapContext = null; 166 } 167 168 /** 169 * @see org.apache.activemq.ra.MessageResourceAdapter#getBootstrapContext() 170 */ 171 public BootstrapContext getBootstrapContext() { 172 return bootstrapContext; 173 } 174 175 /** 176 * @see javax.resource.spi.ResourceAdapter#endpointActivation(javax.resource.spi.endpoint.MessageEndpointFactory, 177 * javax.resource.spi.ActivationSpec) 178 */ 179 public void endpointActivation(MessageEndpointFactory endpointFactory, ActivationSpec activationSpec) throws ResourceException { 180 181 // spec section 5.3.3 182 if (!equals(activationSpec.getResourceAdapter())) { 183 throw new ResourceException("Activation spec not initialized with this ResourceAdapter instance (" + activationSpec.getResourceAdapter() + " != " + this + ")"); 184 } 185 186 if (!(activationSpec instanceof MessageActivationSpec)) { 187 throw new NotSupportedException("That type of ActivationSpec not supported: " + activationSpec.getClass()); 188 } 189 190 ActiveMQEndpointActivationKey key = new ActiveMQEndpointActivationKey(endpointFactory, (MessageActivationSpec)activationSpec); 191 // This is weird.. the same endpoint activated twice.. must be a 192 // container error. 193 if (endpointWorkers.containsKey(key)) { 194 throw new IllegalStateException("Endpoint previously activated"); 195 } 196 197 ActiveMQEndpointWorker worker = new ActiveMQEndpointWorker(this, key); 198 199 endpointWorkers.put(key, worker); 200 worker.start(); 201 } 202 203 /** 204 * @see javax.resource.spi.ResourceAdapter#endpointDeactivation(javax.resource.spi.endpoint.MessageEndpointFactory, 205 * javax.resource.spi.ActivationSpec) 206 */ 207 public void endpointDeactivation(MessageEndpointFactory endpointFactory, ActivationSpec activationSpec) { 208 209 if (activationSpec instanceof MessageActivationSpec) { 210 ActiveMQEndpointActivationKey key = new ActiveMQEndpointActivationKey(endpointFactory, (MessageActivationSpec)activationSpec); 211 ActiveMQEndpointWorker worker = endpointWorkers.remove(key); 212 if (worker == null) { 213 // This is weird.. that endpoint was not activated.. oh well.. 214 // this method 215 // does not throw exceptions so just return. 216 return; 217 } 218 try { 219 worker.stop(); 220 } catch (InterruptedException e) { 221 // We interrupted.. we won't throw an exception but will stop 222 // waiting for the worker 223 // to stop.. we tried our best. Keep trying to interrupt the 224 // thread. 225 Thread.currentThread().interrupt(); 226 } 227 228 } 229 230 } 231 232 /** 233 * We only connect to one resource manager per ResourceAdapter instance, so 234 * any ActivationSpec will return the same XAResource. 235 * 236 * @see javax.resource.spi.ResourceAdapter#getXAResources(javax.resource.spi.ActivationSpec[]) 237 */ 238 public XAResource[] getXAResources(ActivationSpec[] activationSpecs) throws ResourceException { 239 try { 240 return new XAResource[]{ 241 new TransactionContext() { 242 243 @Override 244 public boolean isSameRM(XAResource xaresource) throws XAException { 245 ActiveMQConnection original = null; 246 try { 247 original = setConnection(newConnection()); 248 boolean result = super.isSameRM(xaresource); 249 LOG.trace("{}.recover({})={}", getConnection(), xaresource, result); 250 return result; 251 252 } catch (JMSException e) { 253 LOG.trace("isSameRM({}) failed", xaresource, e); 254 XAException xaException = new XAException(e.getMessage()); 255 throw xaException; 256 } finally { 257 closeConnection(original); 258 } 259 } 260 261 @Override 262 protected String getResourceManagerId() throws JMSException { 263 ActiveMQConnection original = null; 264 try { 265 original = setConnection(newConnection()); 266 return super.getResourceManagerId(); 267 } finally { 268 closeConnection(original); 269 } 270 } 271 272 @Override 273 public void commit(Xid xid, boolean onePhase) throws XAException { 274 ActiveMQConnection original = null; 275 try { 276 setConnection(newConnection()); 277 super.commit(xid, onePhase); 278 LOG.trace("{}.commit({},{})", getConnection(), xid); 279 280 } catch (JMSException e) { 281 LOG.trace("{}.commit({},{}) failed", getConnection(), xid, onePhase, e); 282 throwXAException(e); 283 } finally { 284 closeConnection(original); 285 } 286 } 287 288 @Override 289 public void rollback(Xid xid) throws XAException { 290 ActiveMQConnection original = null; 291 try { 292 original = setConnection(newConnection()); 293 super.rollback(xid); 294 LOG.trace("{}.rollback({})", getConnection(), xid); 295 296 } catch (JMSException e) { 297 LOG.trace("{}.rollback({}) failed", getConnection(), xid, e); 298 throwXAException(e); 299 } finally { 300 closeConnection(original); 301 } 302 } 303 304 @Override 305 public Xid[] recover(int flags) throws XAException { 306 Xid[] result = new Xid[]{}; 307 ActiveMQConnection original = null; 308 try { 309 original = setConnection(newConnection()); 310 result = super.recover(flags); 311 LOG.trace("{}.recover({})={}", getConnection(), flags, result); 312 313 } catch (JMSException e) { 314 LOG.trace("{}.recover({}) failed", getConnection(), flags, e); 315 throwXAException(e); 316 } finally { 317 closeConnection(original); 318 } 319 return result; 320 } 321 322 @Override 323 public void forget(Xid xid) throws XAException { 324 ActiveMQConnection original = null; 325 try { 326 original = setConnection(newConnection()); 327 super.forget(xid); 328 LOG.trace("{}.forget({})", getConnection(), xid); 329 330 } catch (JMSException e) { 331 LOG.trace("{}.forget({}) failed", getConnection(), xid, e); 332 throwXAException(e); 333 } finally { 334 closeConnection(original); 335 } 336 } 337 338 private void throwXAException(JMSException e) throws XAException { 339 XAException xaException = new XAException(e.getMessage()); 340 xaException.errorCode = XAException.XAER_RMFAIL; 341 throw xaException; 342 } 343 344 private ActiveMQConnection newConnection() throws JMSException { 345 ActiveMQConnection connection = makeConnection(); 346 connection.start(); 347 return connection; 348 } 349 350 private void closeConnection(ActiveMQConnection original) { 351 ActiveMQConnection connection = getConnection(); 352 if (connection != null) { 353 try { 354 connection.close(); 355 } catch (JMSException ignored) {} 356 } 357 setConnection(original); 358 } 359 }}; 360 361 } catch (Exception e) { 362 throw new ResourceException(e); 363 } 364 } 365 366 // /////////////////////////////////////////////////////////////////////// 367 // 368 // Java Bean getters and setters for this ResourceAdapter class. 369 // 370 // /////////////////////////////////////////////////////////////////////// 371 372 /** 373 * @see org.apache.activemq.ra.MessageResourceAdapter#getBrokerXmlConfig() 374 */ 375 public String getBrokerXmlConfig() { 376 return brokerXmlConfig; 377 } 378 379 /** 380 * Sets the <a href="http://activemq.org/Xml+Configuration">XML 381 * configuration file </a> used to configure the ActiveMQ broker via Spring 382 * if using embedded mode. 383 * 384 * @param brokerXmlConfig is the filename which is assumed to be on the 385 * classpath unless a URL is specified. So a value of 386 * <code>foo/bar.xml</code> would be assumed to be on the 387 * classpath whereas <code>file:dir/file.xml</code> would 388 * use the file system. Any valid URL string is supported. 389 */ 390 public void setBrokerXmlConfig(String brokerXmlConfig) { 391 this.brokerXmlConfig = brokerXmlConfig; 392 } 393 394 /** 395 * @see java.lang.Object#equals(java.lang.Object) 396 */ 397 @Override 398 public boolean equals(Object o) { 399 if (this == o) { 400 return true; 401 } 402 if (!(o instanceof MessageResourceAdapter)) { 403 return false; 404 } 405 406 final MessageResourceAdapter activeMQResourceAdapter = (MessageResourceAdapter)o; 407 408 if (!getInfo().equals(activeMQResourceAdapter.getInfo())) { 409 return false; 410 } 411 if (notEqual(brokerXmlConfig, activeMQResourceAdapter.getBrokerXmlConfig())) { 412 return false; 413 } 414 415 return true; 416 } 417 418 /** 419 * @see java.lang.Object#hashCode() 420 */ 421 @Override 422 public int hashCode() { 423 int result; 424 result = getInfo().hashCode(); 425 if (brokerXmlConfig != null) { 426 result ^= brokerXmlConfig.hashCode(); 427 } 428 return result; 429 } 430 431 public ActiveMQConnectionFactory getConnectionFactory() { 432 return connectionFactory; 433 } 434 435 public void setConnectionFactory(ActiveMQConnectionFactory aConnectionFactory) { 436 this.connectionFactory = aConnectionFactory; 437 } 438 439 440 }