001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.ra;
018
019import java.net.URI;
020import java.util.HashMap;
021
022import javax.jms.JMSException;
023import javax.resource.NotSupportedException;
024import javax.resource.ResourceException;
025import javax.resource.spi.ActivationSpec;
026import javax.resource.spi.BootstrapContext;
027import javax.resource.spi.ResourceAdapterInternalException;
028import javax.resource.spi.endpoint.MessageEndpointFactory;
029import javax.transaction.xa.XAException;
030import javax.transaction.xa.XAResource;
031
032import javax.transaction.xa.Xid;
033import org.apache.activemq.ActiveMQConnection;
034import org.apache.activemq.ActiveMQConnectionFactory;
035import org.apache.activemq.RedeliveryPolicy;
036import org.apache.activemq.TransactionContext;
037import org.apache.activemq.broker.BrokerFactory;
038import org.apache.activemq.broker.BrokerService;
039import org.apache.activemq.util.ServiceSupport;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043/**
044 * Knows how to connect to one ActiveMQ server. It can then activate endpoints
045 * and deliver messages to those end points using the connection configure in
046 * the resource adapter. <p/>Must override equals and hashCode (JCA spec 16.4)
047 * 
048 * @org.apache.xbean.XBean element="resourceAdapter" rootElement="true"
049 *                         description="The JCA Resource Adaptor for ActiveMQ"
050 * 
051 */
052public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implements MessageResourceAdapter {
053    private static final Logger LOG = LoggerFactory.getLogger(ActiveMQResourceAdapter.class);
054    private final HashMap<ActiveMQEndpointActivationKey, ActiveMQEndpointWorker> endpointWorkers = new HashMap<ActiveMQEndpointActivationKey, ActiveMQEndpointWorker>();
055
056    private BootstrapContext bootstrapContext;
057    private String brokerXmlConfig;
058    private BrokerService broker;
059    private Thread brokerStartThread;
060    private ActiveMQConnectionFactory connectionFactory;
061    
062    /**
063     * 
064     */
065    public ActiveMQResourceAdapter() {
066        super();
067    }
068
069    /**
070     * @see javax.resource.spi.ResourceAdapter#start(javax.resource.spi.BootstrapContext)
071     */
072    public void start(BootstrapContext bootstrapContext) throws ResourceAdapterInternalException {
073        this.bootstrapContext = bootstrapContext;
074        if (brokerXmlConfig != null && brokerXmlConfig.trim().length() > 0) {
075            brokerStartThread = new Thread("Starting ActiveMQ Broker") {
076                @Override
077                public void run () {
078                    try {
079                        // ensure RAR resources are available to xbean (needed for weblogic)
080                        log.debug("original thread context classLoader: " + Thread.currentThread().getContextClassLoader());
081                        Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
082                        log.debug("current (from getClass()) thread context classLoader: " + Thread.currentThread().getContextClassLoader());
083                        
084                        synchronized( ActiveMQResourceAdapter.this ) {
085                            broker = BrokerFactory.createBroker(new URI(brokerXmlConfig));
086                        }
087                        broker.start();
088                        // Default the ServerUrl to the local broker if not specified in the ra.xml
089                        if (getServerUrl() == null) {
090                            setServerUrl("vm://" + broker.getBrokerName() + "?create=false");
091                        }
092                    } catch (Throwable e) {
093                        log.warn("Could not start up embeded ActiveMQ Broker '"+brokerXmlConfig+"': "+e.getMessage());
094                        log.debug("Reason for: "+e.getMessage(), e);
095                    }
096                }
097            };
098            brokerStartThread.setDaemon(true);
099            brokerStartThread.start();
100            
101            // Wait up to 5 seconds for the broker to start up in the async thread.. otherwise keep going without it..
102            try {
103                brokerStartThread.join(1000*5);
104            } catch (InterruptedException e) {
105                Thread.currentThread().interrupt();
106            }                
107        }
108    }
109
110    public ActiveMQConnection makeConnection() throws JMSException {
111        if( connectionFactory == null ) {
112            return makeConnection(getInfo());
113        } else {
114            return makeConnection(getInfo(), connectionFactory);
115        }
116    }
117
118    /**
119     * @param activationSpec
120     */
121    public ActiveMQConnection makeConnection(MessageActivationSpec activationSpec) throws JMSException {
122        ActiveMQConnectionFactory cf = getConnectionFactory();
123        if (cf == null) {
124            cf = createConnectionFactory(getInfo(), activationSpec);
125        }
126        String userName = defaultValue(activationSpec.getUserName(), getInfo().getUserName());
127        String password = defaultValue(activationSpec.getPassword(), getInfo().getPassword());
128        String clientId = activationSpec.getClientId();
129        if (clientId != null) {
130            cf.setClientID(clientId);
131        } else {
132            if (activationSpec.isDurableSubscription()) {
133                log.warn("No clientID specified for durable subscription: " + activationSpec);
134            }
135        }
136        ActiveMQConnection physicalConnection = (ActiveMQConnection) cf.createConnection(userName, password);
137
138        // have we configured a redelivery policy
139        RedeliveryPolicy redeliveryPolicy = activationSpec.redeliveryPolicy();
140        if (redeliveryPolicy != null) {
141            physicalConnection.setRedeliveryPolicy(redeliveryPolicy);
142        }
143        return physicalConnection;
144    }
145
146    /**
147     * @see javax.resource.spi.ResourceAdapter#stop()
148     */
149    public void stop() {
150        while (endpointWorkers.size() > 0) {
151            ActiveMQEndpointActivationKey key = endpointWorkers.keySet().iterator().next();
152            endpointDeactivation(key.getMessageEndpointFactory(), key.getActivationSpec());
153        }
154        
155        synchronized( this ) {
156            if (broker != null) {
157                if( brokerStartThread.isAlive() ) {
158                    brokerStartThread.interrupt();
159                }
160                ServiceSupport.dispose(broker);
161                broker = null;
162            }
163        }
164        
165        this.bootstrapContext = null;
166    }
167
168    /**
169     * @see org.apache.activemq.ra.MessageResourceAdapter#getBootstrapContext()
170     */
171    public BootstrapContext getBootstrapContext() {
172        return bootstrapContext;
173    }
174
175    /**
176     * @see javax.resource.spi.ResourceAdapter#endpointActivation(javax.resource.spi.endpoint.MessageEndpointFactory,
177     *      javax.resource.spi.ActivationSpec)
178     */
179    public void endpointActivation(MessageEndpointFactory endpointFactory, ActivationSpec activationSpec) throws ResourceException {
180
181        // spec section 5.3.3
182        if (!equals(activationSpec.getResourceAdapter())) {
183            throw new ResourceException("Activation spec not initialized with this ResourceAdapter instance (" + activationSpec.getResourceAdapter() + " != " + this + ")");
184        }
185
186        if (!(activationSpec instanceof MessageActivationSpec)) {
187            throw new NotSupportedException("That type of ActivationSpec not supported: " + activationSpec.getClass());
188        }
189
190        ActiveMQEndpointActivationKey key = new ActiveMQEndpointActivationKey(endpointFactory, (MessageActivationSpec)activationSpec);
191        // This is weird.. the same endpoint activated twice.. must be a
192        // container error.
193        if (endpointWorkers.containsKey(key)) {
194            throw new IllegalStateException("Endpoint previously activated");
195        }
196
197        ActiveMQEndpointWorker worker = new ActiveMQEndpointWorker(this, key);
198
199        endpointWorkers.put(key, worker);
200        worker.start();
201    }
202
203    /**
204     * @see javax.resource.spi.ResourceAdapter#endpointDeactivation(javax.resource.spi.endpoint.MessageEndpointFactory,
205     *      javax.resource.spi.ActivationSpec)
206     */
207    public void endpointDeactivation(MessageEndpointFactory endpointFactory, ActivationSpec activationSpec) {
208
209        if (activationSpec instanceof MessageActivationSpec) {
210            ActiveMQEndpointActivationKey key = new ActiveMQEndpointActivationKey(endpointFactory, (MessageActivationSpec)activationSpec);
211            ActiveMQEndpointWorker worker = endpointWorkers.remove(key);
212            if (worker == null) {
213                // This is weird.. that endpoint was not activated.. oh well..
214                // this method
215                // does not throw exceptions so just return.
216                return;
217            }
218            try {
219                worker.stop();
220            } catch (InterruptedException e) {
221                // We interrupted.. we won't throw an exception but will stop
222                // waiting for the worker
223                // to stop.. we tried our best. Keep trying to interrupt the
224                // thread.
225                Thread.currentThread().interrupt();
226            }
227
228        }
229
230    }
231
232    /**
233     * We only connect to one resource manager per ResourceAdapter instance, so
234     * any ActivationSpec will return the same XAResource.
235     * 
236     * @see javax.resource.spi.ResourceAdapter#getXAResources(javax.resource.spi.ActivationSpec[])
237     */
238    public XAResource[] getXAResources(ActivationSpec[] activationSpecs) throws ResourceException {
239        try {
240            return new XAResource[]{
241                    new TransactionContext() {
242
243                        @Override
244                        public boolean isSameRM(XAResource xaresource) throws XAException {
245                            ActiveMQConnection original = null;
246                            try {
247                                original = setConnection(newConnection());
248                                boolean result = super.isSameRM(xaresource);
249                                LOG.trace("{}.recover({})={}", getConnection(), xaresource, result);
250                                return result;
251
252                            } catch (JMSException e) {
253                                LOG.trace("isSameRM({}) failed", xaresource, e);
254                                XAException xaException = new XAException(e.getMessage());
255                                throw xaException;
256                            } finally {
257                                closeConnection(original);
258                            }
259                        }
260
261                        @Override
262                        protected String getResourceManagerId() throws JMSException {
263                            ActiveMQConnection original = null;
264                            try {
265                                original = setConnection(newConnection());
266                                return super.getResourceManagerId();
267                            } finally {
268                                closeConnection(original);
269                            }
270                        }
271
272                        @Override
273                        public void commit(Xid xid, boolean onePhase) throws XAException {
274                            ActiveMQConnection original = null;
275                            try {
276                                setConnection(newConnection());
277                                super.commit(xid, onePhase);
278                                LOG.trace("{}.commit({},{})", getConnection(), xid);
279
280                            } catch (JMSException e) {
281                                LOG.trace("{}.commit({},{}) failed", getConnection(), xid, onePhase, e);
282                                throwXAException(e);
283                            } finally {
284                                closeConnection(original);
285                            }
286                        }
287
288                        @Override
289                        public void rollback(Xid xid) throws XAException {
290                            ActiveMQConnection original = null;
291                            try {
292                                original = setConnection(newConnection());
293                                super.rollback(xid);
294                                LOG.trace("{}.rollback({})", getConnection(), xid);
295
296                            } catch (JMSException e) {
297                                LOG.trace("{}.rollback({}) failed", getConnection(), xid, e);
298                                throwXAException(e);
299                            } finally {
300                               closeConnection(original);
301                            }
302                        }
303
304                        @Override
305                        public Xid[] recover(int flags) throws XAException {
306                            Xid[] result = new Xid[]{};
307                            ActiveMQConnection original = null;
308                            try {
309                                original = setConnection(newConnection());
310                                result = super.recover(flags);
311                                LOG.trace("{}.recover({})={}", getConnection(), flags, result);
312
313                            } catch (JMSException e) {
314                                LOG.trace("{}.recover({}) failed", getConnection(), flags, e);
315                                throwXAException(e);
316                            } finally {
317                                closeConnection(original);
318                            }
319                            return result;
320                        }
321
322                        @Override
323                        public void forget(Xid xid) throws XAException {
324                            ActiveMQConnection original = null;
325                            try {
326                                original = setConnection(newConnection());
327                                super.forget(xid);
328                                LOG.trace("{}.forget({})", getConnection(), xid);
329
330                            } catch (JMSException e) {
331                                LOG.trace("{}.forget({}) failed", getConnection(), xid, e);
332                                throwXAException(e);
333                            } finally {
334                                closeConnection(original);
335                            }
336                        }
337
338                        private void throwXAException(JMSException e) throws XAException {
339                            XAException xaException = new XAException(e.getMessage());
340                            xaException.errorCode = XAException.XAER_RMFAIL;
341                            throw xaException;
342                        }
343
344                        private ActiveMQConnection newConnection() throws JMSException {
345                            ActiveMQConnection connection = makeConnection();
346                            connection.start();
347                            return connection;
348                        }
349
350                        private void closeConnection(ActiveMQConnection original) {
351                            ActiveMQConnection connection = getConnection();
352                            if (connection != null) {
353                                try {
354                                    connection.close();
355                                } catch (JMSException ignored) {}
356                            }
357                            setConnection(original);
358                        }
359                    }};
360
361        } catch (Exception e) {
362            throw new ResourceException(e);
363        }
364    }
365
366    // ///////////////////////////////////////////////////////////////////////
367    //
368    // Java Bean getters and setters for this ResourceAdapter class.
369    //
370    // ///////////////////////////////////////////////////////////////////////
371
372    /**
373     * @see org.apache.activemq.ra.MessageResourceAdapter#getBrokerXmlConfig()
374     */
375    public String getBrokerXmlConfig() {
376        return brokerXmlConfig;
377    }
378
379    /**
380     * Sets the <a href="http://activemq.org/Xml+Configuration">XML
381     * configuration file </a> used to configure the ActiveMQ broker via Spring
382     * if using embedded mode.
383     * 
384     * @param brokerXmlConfig is the filename which is assumed to be on the
385     *                classpath unless a URL is specified. So a value of
386     *                <code>foo/bar.xml</code> would be assumed to be on the
387     *                classpath whereas <code>file:dir/file.xml</code> would
388     *                use the file system. Any valid URL string is supported.
389     */
390    public void setBrokerXmlConfig(String brokerXmlConfig) {
391        this.brokerXmlConfig = brokerXmlConfig;
392    }
393
394    /**
395     * @see java.lang.Object#equals(java.lang.Object)
396     */
397    @Override
398    public boolean equals(Object o) {
399        if (this == o) {
400            return true;
401        }
402        if (!(o instanceof MessageResourceAdapter)) {
403            return false;
404        }
405
406        final MessageResourceAdapter activeMQResourceAdapter = (MessageResourceAdapter)o;
407
408        if (!getInfo().equals(activeMQResourceAdapter.getInfo())) {
409            return false;
410        }
411        if (notEqual(brokerXmlConfig, activeMQResourceAdapter.getBrokerXmlConfig())) {
412            return false;
413        }
414
415        return true;
416    }
417
418    /**
419     * @see java.lang.Object#hashCode()
420     */
421    @Override
422    public int hashCode() {
423        int result;
424        result = getInfo().hashCode();
425        if (brokerXmlConfig != null) {
426            result ^= brokerXmlConfig.hashCode();
427        }
428        return result;
429    }
430
431    public ActiveMQConnectionFactory getConnectionFactory() {
432        return connectionFactory;
433    }
434
435    public void setConnectionFactory(ActiveMQConnectionFactory aConnectionFactory) {
436        this.connectionFactory = aConnectionFactory;
437    }
438
439
440    }