001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq;
018
019import java.io.IOException;
020import java.net.URI;
021import java.net.URISyntaxException;
022import java.util.HashMap;
023import java.util.Iterator;
024import java.util.Map;
025import java.util.concurrent.ConcurrentHashMap;
026import java.util.concurrent.ConcurrentMap;
027import java.util.concurrent.CopyOnWriteArrayList;
028import java.util.concurrent.CountDownLatch;
029import java.util.concurrent.LinkedBlockingQueue;
030import java.util.concurrent.RejectedExecutionHandler;
031import java.util.concurrent.ThreadFactory;
032import java.util.concurrent.ThreadPoolExecutor;
033import java.util.concurrent.TimeUnit;
034import java.util.concurrent.atomic.AtomicBoolean;
035import java.util.concurrent.atomic.AtomicInteger;
036
037import javax.jms.Connection;
038import javax.jms.ConnectionConsumer;
039import javax.jms.ConnectionMetaData;
040import javax.jms.Destination;
041import javax.jms.ExceptionListener;
042import javax.jms.IllegalStateException;
043import javax.jms.InvalidDestinationException;
044import javax.jms.JMSException;
045import javax.jms.Queue;
046import javax.jms.QueueConnection;
047import javax.jms.QueueSession;
048import javax.jms.ServerSessionPool;
049import javax.jms.Session;
050import javax.jms.Topic;
051import javax.jms.TopicConnection;
052import javax.jms.TopicSession;
053import javax.jms.XAConnection;
054
055import org.apache.activemq.advisory.DestinationSource;
056import org.apache.activemq.blob.BlobTransferPolicy;
057import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap;
058import org.apache.activemq.command.ActiveMQDestination;
059import org.apache.activemq.command.ActiveMQMessage;
060import org.apache.activemq.command.ActiveMQTempDestination;
061import org.apache.activemq.command.ActiveMQTempQueue;
062import org.apache.activemq.command.ActiveMQTempTopic;
063import org.apache.activemq.command.BrokerInfo;
064import org.apache.activemq.command.Command;
065import org.apache.activemq.command.CommandTypes;
066import org.apache.activemq.command.ConnectionControl;
067import org.apache.activemq.command.ConnectionError;
068import org.apache.activemq.command.ConnectionId;
069import org.apache.activemq.command.ConnectionInfo;
070import org.apache.activemq.command.ConsumerControl;
071import org.apache.activemq.command.ConsumerId;
072import org.apache.activemq.command.ConsumerInfo;
073import org.apache.activemq.command.ControlCommand;
074import org.apache.activemq.command.DestinationInfo;
075import org.apache.activemq.command.ExceptionResponse;
076import org.apache.activemq.command.Message;
077import org.apache.activemq.command.MessageDispatch;
078import org.apache.activemq.command.MessageId;
079import org.apache.activemq.command.ProducerAck;
080import org.apache.activemq.command.ProducerId;
081import org.apache.activemq.command.RemoveInfo;
082import org.apache.activemq.command.RemoveSubscriptionInfo;
083import org.apache.activemq.command.Response;
084import org.apache.activemq.command.SessionId;
085import org.apache.activemq.command.ShutdownInfo;
086import org.apache.activemq.command.WireFormatInfo;
087import org.apache.activemq.management.JMSConnectionStatsImpl;
088import org.apache.activemq.management.JMSStatsImpl;
089import org.apache.activemq.management.StatsCapable;
090import org.apache.activemq.management.StatsImpl;
091import org.apache.activemq.state.CommandVisitorAdapter;
092import org.apache.activemq.thread.Scheduler;
093import org.apache.activemq.thread.TaskRunnerFactory;
094import org.apache.activemq.transport.FutureResponse;
095import org.apache.activemq.transport.RequestTimedOutIOException;
096import org.apache.activemq.transport.ResponseCallback;
097import org.apache.activemq.transport.Transport;
098import org.apache.activemq.transport.TransportListener;
099import org.apache.activemq.transport.failover.FailoverTransport;
100import org.apache.activemq.util.IdGenerator;
101import org.apache.activemq.util.IntrospectionSupport;
102import org.apache.activemq.util.JMSExceptionSupport;
103import org.apache.activemq.util.LongSequenceGenerator;
104import org.apache.activemq.util.ServiceSupport;
105import org.apache.activemq.util.ThreadPoolUtils;
106import org.slf4j.Logger;
107import org.slf4j.LoggerFactory;
108
109public class ActiveMQConnection implements Connection, TopicConnection, QueueConnection, StatsCapable, Closeable, TransportListener, EnhancedConnection {
110
111    public static final String DEFAULT_USER = ActiveMQConnectionFactory.DEFAULT_USER;
112    public static final String DEFAULT_PASSWORD = ActiveMQConnectionFactory.DEFAULT_PASSWORD;
113    public static final String DEFAULT_BROKER_URL = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
114    public static int DEFAULT_THREAD_POOL_SIZE = 1000;
115
116    private static final Logger LOG = LoggerFactory.getLogger(ActiveMQConnection.class);
117
118    public final ConcurrentMap<ActiveMQTempDestination, ActiveMQTempDestination> activeTempDestinations = new ConcurrentHashMap<ActiveMQTempDestination, ActiveMQTempDestination>();
119
120    protected boolean dispatchAsync=true;
121    protected boolean alwaysSessionAsync = true;
122
123    private TaskRunnerFactory sessionTaskRunner;
124    private final ThreadPoolExecutor executor;
125
126    // Connection state variables
127    private final ConnectionInfo info;
128    private ExceptionListener exceptionListener;
129    private ClientInternalExceptionListener clientInternalExceptionListener;
130    private boolean clientIDSet;
131    private boolean isConnectionInfoSentToBroker;
132    private boolean userSpecifiedClientID;
133
134    // Configuration options variables
135    private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
136    private BlobTransferPolicy blobTransferPolicy;
137    private RedeliveryPolicyMap redeliveryPolicyMap;
138    private MessageTransformer transformer;
139
140    private boolean disableTimeStampsByDefault;
141    private boolean optimizedMessageDispatch = true;
142    private boolean copyMessageOnSend = true;
143    private boolean useCompression;
144    private boolean objectMessageSerializationDefered;
145    private boolean useAsyncSend;
146    private boolean optimizeAcknowledge;
147    private long optimizeAcknowledgeTimeOut = 0;
148    private long optimizedAckScheduledAckInterval = 0;
149    private boolean nestedMapAndListEnabled = true;
150    private boolean useRetroactiveConsumer;
151    private boolean exclusiveConsumer;
152    private boolean alwaysSyncSend;
153    private int closeTimeout = 15000;
154    private boolean watchTopicAdvisories = true;
155    private long warnAboutUnstartedConnectionTimeout = 500L;
156    private int sendTimeout =0;
157    private boolean sendAcksAsync=true;
158    private boolean checkForDuplicates = true;
159    private boolean queueOnlyConnection = false;
160    private boolean consumerExpiryCheckEnabled = true;
161
162    private final Transport transport;
163    private final IdGenerator clientIdGenerator;
164    private final JMSStatsImpl factoryStats;
165    private final JMSConnectionStatsImpl stats;
166
167    private final AtomicBoolean started = new AtomicBoolean(false);
168    private final AtomicBoolean closing = new AtomicBoolean(false);
169    private final AtomicBoolean closed = new AtomicBoolean(false);
170    private final AtomicBoolean transportFailed = new AtomicBoolean(false);
171    private final CopyOnWriteArrayList<ActiveMQSession> sessions = new CopyOnWriteArrayList<ActiveMQSession>();
172    private final CopyOnWriteArrayList<ActiveMQConnectionConsumer> connectionConsumers = new CopyOnWriteArrayList<ActiveMQConnectionConsumer>();
173    private final CopyOnWriteArrayList<TransportListener> transportListeners = new CopyOnWriteArrayList<TransportListener>();
174
    // Maps ConsumerIds to ActiveMQDispatcher objects
176    private final ConcurrentMap<ConsumerId, ActiveMQDispatcher> dispatchers = new ConcurrentHashMap<ConsumerId, ActiveMQDispatcher>();
177    private final ConcurrentMap<ProducerId, ActiveMQMessageProducer> producers = new ConcurrentHashMap<ProducerId, ActiveMQMessageProducer>();
178    private final LongSequenceGenerator sessionIdGenerator = new LongSequenceGenerator();
179    private final SessionId connectionSessionId;
180    private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
181    private final LongSequenceGenerator tempDestinationIdGenerator = new LongSequenceGenerator();
182    private final LongSequenceGenerator localTransactionIdGenerator = new LongSequenceGenerator();
183
184    private AdvisoryConsumer advisoryConsumer;
185    private final CountDownLatch brokerInfoReceived = new CountDownLatch(1);
186    private BrokerInfo brokerInfo;
187    private IOException firstFailureError;
188    private int producerWindowSize = ActiveMQConnectionFactory.DEFAULT_PRODUCER_WINDOW_SIZE;
189
190    // Assume that protocol is the latest. Change to the actual protocol
191    // version when a WireFormatInfo is received.
192    private final AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
193    private final long timeCreated;
194    private final ConnectionAudit connectionAudit = new ConnectionAudit();
195    private DestinationSource destinationSource;
196    private final Object ensureConnectionInfoSentMutex = new Object();
197    private boolean useDedicatedTaskRunner;
198    protected AtomicInteger transportInterruptionProcessingComplete = new AtomicInteger(0);
199    private long consumerFailoverRedeliveryWaitPeriod;
200    private Scheduler scheduler;
201    private boolean messagePrioritySupported = false;
202    private boolean transactedIndividualAck = false;
203    private boolean nonBlockingRedelivery = false;
204    private boolean rmIdFromConnectionId = false;
205
206    private int maxThreadPoolSize = DEFAULT_THREAD_POOL_SIZE;
207    private RejectedExecutionHandler rejectedTaskHandler = null;
208
209    /**
210     * Construct an <code>ActiveMQConnection</code>
211     *
212     * @param transport
213     * @param factoryStats
214     * @throws Exception
215     */
216    protected ActiveMQConnection(final Transport transport, IdGenerator clientIdGenerator, IdGenerator connectionIdGenerator, JMSStatsImpl factoryStats) throws Exception {
217
218        this.transport = transport;
219        this.clientIdGenerator = clientIdGenerator;
220        this.factoryStats = factoryStats;
221
222        // Configure a single threaded executor who's core thread can timeout if
223        // idle
224        executor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
225            @Override
226            public Thread newThread(Runnable r) {
227                Thread thread = new Thread(r, "ActiveMQ Connection Executor: " + transport);
228                //Don't make these daemon threads - see https://issues.apache.org/jira/browse/AMQ-796
229                //thread.setDaemon(true);
230                return thread;
231            }
232        });
233        // asyncConnectionThread.allowCoreThreadTimeOut(true);
234        String uniqueId = connectionIdGenerator.generateId();
235        this.info = new ConnectionInfo(new ConnectionId(uniqueId));
236        this.info.setManageable(true);
237        this.info.setFaultTolerant(transport.isFaultTolerant());
238        this.connectionSessionId = new SessionId(info.getConnectionId(), -1);
239
240        this.transport.setTransportListener(this);
241
242        this.stats = new JMSConnectionStatsImpl(sessions, this instanceof XAConnection);
243        this.factoryStats.addConnection(this);
244        this.timeCreated = System.currentTimeMillis();
245        this.connectionAudit.setCheckForDuplicates(transport.isFaultTolerant());
246    }
247
248    protected void setUserName(String userName) {
249        this.info.setUserName(userName);
250    }
251
252    protected void setPassword(String password) {
253        this.info.setPassword(password);
254    }
255
256    /**
257     * A static helper method to create a new connection
258     *
259     * @return an ActiveMQConnection
260     * @throws JMSException
261     */
262    public static ActiveMQConnection makeConnection() throws JMSException {
263        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
264        return (ActiveMQConnection)factory.createConnection();
265    }
266
267    /**
268     * A static helper method to create a new connection
269     *
270     * @param uri
271     * @return and ActiveMQConnection
272     * @throws JMSException
273     */
274    public static ActiveMQConnection makeConnection(String uri) throws JMSException, URISyntaxException {
275        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri);
276        return (ActiveMQConnection)factory.createConnection();
277    }
278
279    /**
280     * A static helper method to create a new connection
281     *
282     * @param user
283     * @param password
284     * @param uri
285     * @return an ActiveMQConnection
286     * @throws JMSException
287     */
288    public static ActiveMQConnection makeConnection(String user, String password, String uri) throws JMSException, URISyntaxException {
289        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(user, password, new URI(uri));
290        return (ActiveMQConnection)factory.createConnection();
291    }
292
293    /**
294     * @return a number unique for this connection
295     */
296    public JMSConnectionStatsImpl getConnectionStats() {
297        return stats;
298    }
299
300    /**
301     * Creates a <CODE>Session</CODE> object.
302     *
303     * @param transacted indicates whether the session is transacted
304     * @param acknowledgeMode indicates whether the consumer or the client will
305     *                acknowledge any messages it receives; ignored if the
306     *                session is transacted. Legal values are
307     *                <code>Session.AUTO_ACKNOWLEDGE</code>,
308     *                <code>Session.CLIENT_ACKNOWLEDGE</code>, and
309     *                <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
310     * @return a newly created session
311     * @throws JMSException if the <CODE>Connection</CODE> object fails to
312     *                 create a session due to some internal error or lack of
313     *                 support for the specific transaction and acknowledgement
314     *                 mode.
315     * @see Session#AUTO_ACKNOWLEDGE
316     * @see Session#CLIENT_ACKNOWLEDGE
317     * @see Session#DUPS_OK_ACKNOWLEDGE
318     * @since 1.1
319     */
320    @Override
321    public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
322        checkClosedOrFailed();
323        ensureConnectionInfoSent();
324        if(!transacted) {
325            if (acknowledgeMode==Session.SESSION_TRANSACTED) {
326                throw new JMSException("acknowledgeMode SESSION_TRANSACTED cannot be used for an non-transacted Session");
327            } else if (acknowledgeMode < Session.SESSION_TRANSACTED || acknowledgeMode > ActiveMQSession.MAX_ACK_CONSTANT) {
328                throw new JMSException("invalid acknowledgeMode: " + acknowledgeMode + ". Valid values are Session.AUTO_ACKNOWLEDGE (1), " +
329                        "Session.CLIENT_ACKNOWLEDGE (2), Session.DUPS_OK_ACKNOWLEDGE (3), ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE (4) or for transacted sessions Session.SESSION_TRANSACTED (0)");
330            }
331        }
332        return new ActiveMQSession(this, getNextSessionId(), transacted ? Session.SESSION_TRANSACTED : (acknowledgeMode == Session.SESSION_TRANSACTED
333            ? Session.AUTO_ACKNOWLEDGE : acknowledgeMode), isDispatchAsync(), isAlwaysSessionAsync());
334    }
335
336    /**
337     * @return sessionId
338     */
339    protected SessionId getNextSessionId() {
340        return new SessionId(info.getConnectionId(), sessionIdGenerator.getNextSequenceId());
341    }
342
343    /**
344     * Gets the client identifier for this connection.
345     * <P>
346     * This value is specific to the JMS provider. It is either preconfigured by
347     * an administrator in a <CODE> ConnectionFactory</CODE> object or assigned
348     * dynamically by the application by calling the <code>setClientID</code>
349     * method.
350     *
351     * @return the unique client identifier
352     * @throws JMSException if the JMS provider fails to return the client ID
353     *                 for this connection due to some internal error.
354     */
355    @Override
356    public String getClientID() throws JMSException {
357        checkClosedOrFailed();
358        return this.info.getClientId();
359    }
360
361    /**
362     * Sets the client identifier for this connection.
363     * <P>
364     * The preferred way to assign a JMS client's client identifier is for it to
365     * be configured in a client-specific <CODE>ConnectionFactory</CODE>
366     * object and transparently assigned to the <CODE>Connection</CODE> object
367     * it creates.
368     * <P>
369     * Alternatively, a client can set a connection's client identifier using a
370     * provider-specific value. The facility to set a connection's client
371     * identifier explicitly is not a mechanism for overriding the identifier
372     * that has been administratively configured. It is provided for the case
373     * where no administratively specified identifier exists. If one does exist,
374     * an attempt to change it by setting it must throw an
375     * <CODE>IllegalStateException</CODE>. If a client sets the client
376     * identifier explicitly, it must do so immediately after it creates the
377     * connection and before any other action on the connection is taken. After
378     * this point, setting the client identifier is a programming error that
379     * should throw an <CODE>IllegalStateException</CODE>.
380     * <P>
381     * The purpose of the client identifier is to associate a connection and its
382     * objects with a state maintained on behalf of the client by a provider.
383     * The only such state identified by the JMS API is that required to support
384     * durable subscriptions.
385     * <P>
386     * If another connection with the same <code>clientID</code> is already
387     * running when this method is called, the JMS provider should detect the
388     * duplicate ID and throw an <CODE>InvalidClientIDException</CODE>.
389     *
390     * @param newClientID the unique client identifier
391     * @throws JMSException if the JMS provider fails to set the client ID for
392     *                 this connection due to some internal error.
393     * @throws javax.jms.InvalidClientIDException if the JMS client specifies an
394     *                 invalid or duplicate client ID.
395     * @throws javax.jms.IllegalStateException if the JMS client attempts to set
396     *                 a connection's client ID at the wrong time or when it has
397     *                 been administratively configured.
398     */
399    @Override
400    public void setClientID(String newClientID) throws JMSException {
401        checkClosedOrFailed();
402
403        if (this.clientIDSet) {
404            throw new IllegalStateException("The clientID has already been set");
405        }
406
407        if (this.isConnectionInfoSentToBroker) {
408            throw new IllegalStateException("Setting clientID on a used Connection is not allowed");
409        }
410
411        this.info.setClientId(newClientID);
412        this.userSpecifiedClientID = true;
413        ensureConnectionInfoSent();
414    }
415
416    /**
417     * Sets the default client id that the connection will use if explicitly not
418     * set with the setClientId() call.
419     */
420    public void setDefaultClientID(String clientID) throws JMSException {
421        this.info.setClientId(clientID);
422        this.userSpecifiedClientID = true;
423    }
424
425    /**
426     * Gets the metadata for this connection.
427     *
428     * @return the connection metadata
429     * @throws JMSException if the JMS provider fails to get the connection
430     *                 metadata for this connection.
431     * @see javax.jms.ConnectionMetaData
432     */
433    @Override
434    public ConnectionMetaData getMetaData() throws JMSException {
435        checkClosedOrFailed();
436        return ActiveMQConnectionMetaData.INSTANCE;
437    }
438
439    /**
440     * Gets the <CODE>ExceptionListener</CODE> object for this connection. Not
441     * every <CODE>Connection</CODE> has an <CODE>ExceptionListener</CODE>
442     * associated with it.
443     *
444     * @return the <CODE>ExceptionListener</CODE> for this connection, or
445     *         null, if no <CODE>ExceptionListener</CODE> is associated with
446     *         this connection.
447     * @throws JMSException if the JMS provider fails to get the
448     *                 <CODE>ExceptionListener</CODE> for this connection.
449     * @see javax.jms.Connection#setExceptionListener(ExceptionListener)
450     */
451    @Override
452    public ExceptionListener getExceptionListener() throws JMSException {
453        checkClosedOrFailed();
454        return this.exceptionListener;
455    }
456
457    /**
458     * Sets an exception listener for this connection.
459     * <P>
460     * If a JMS provider detects a serious problem with a connection, it informs
461     * the connection's <CODE> ExceptionListener</CODE>, if one has been
462     * registered. It does this by calling the listener's <CODE>onException
463     * </CODE>
464     * method, passing it a <CODE>JMSException</CODE> object describing the
465     * problem.
466     * <P>
467     * An exception listener allows a client to be notified of a problem
468     * asynchronously. Some connections only consume messages, so they would
469     * have no other way to learn their connection has failed.
470     * <P>
471     * A connection serializes execution of its <CODE>ExceptionListener</CODE>.
472     * <P>
473     * A JMS provider should attempt to resolve connection problems itself
474     * before it notifies the client of them.
475     *
476     * @param listener the exception listener
477     * @throws JMSException if the JMS provider fails to set the exception
478     *                 listener for this connection.
479     */
480    @Override
481    public void setExceptionListener(ExceptionListener listener) throws JMSException {
482        checkClosedOrFailed();
483        this.exceptionListener = listener;
484    }
485
486    /**
487     * Gets the <code>ClientInternalExceptionListener</code> object for this connection.
488     * Not every <CODE>ActiveMQConnectionn</CODE> has a <CODE>ClientInternalExceptionListener</CODE>
489     * associated with it.
490     *
491     * @return the listener or <code>null</code> if no listener is registered with the connection.
492     */
493    public ClientInternalExceptionListener getClientInternalExceptionListener() {
494        return clientInternalExceptionListener;
495    }
496
497    /**
498     * Sets a client internal exception listener for this connection.
499     * The connection will notify the listener, if one has been registered, of exceptions thrown by container components
500     * (e.g. an EJB container in case of Message Driven Beans) during asynchronous processing of a message.
501     * It does this by calling the listener's <code>onException()</code> method passing it a <code>Throwable</code>
502     * describing the problem.
503     *
504     * @param listener the exception listener
505     */
506    public void setClientInternalExceptionListener(ClientInternalExceptionListener listener) {
507        this.clientInternalExceptionListener = listener;
508    }
509
510    /**
511     * Starts (or restarts) a connection's delivery of incoming messages. A call
512     * to <CODE>start</CODE> on a connection that has already been started is
513     * ignored.
514     *
515     * @throws JMSException if the JMS provider fails to start message delivery
516     *                 due to some internal error.
517     * @see javax.jms.Connection#stop()
518     */
519    @Override
520    public void start() throws JMSException {
521        checkClosedOrFailed();
522        ensureConnectionInfoSent();
523        if (started.compareAndSet(false, true)) {
524            for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) {
525                ActiveMQSession session = i.next();
526                session.start();
527            }
528        }
529    }
530
531    /**
532     * Temporarily stops a connection's delivery of incoming messages. Delivery
533     * can be restarted using the connection's <CODE>start</CODE> method. When
534     * the connection is stopped, delivery to all the connection's message
535     * consumers is inhibited: synchronous receives block, and messages are not
536     * delivered to message listeners.
537     * <P>
538     * This call blocks until receives and/or message listeners in progress have
539     * completed.
540     * <P>
541     * Stopping a connection has no effect on its ability to send messages. A
542     * call to <CODE>stop</CODE> on a connection that has already been stopped
543     * is ignored.
544     * <P>
545     * A call to <CODE>stop</CODE> must not return until delivery of messages
546     * has paused. This means that a client can rely on the fact that none of
547     * its message listeners will be called and that all threads of control
548     * waiting for <CODE>receive</CODE> calls to return will not return with a
549     * message until the connection is restarted. The receive timers for a
550     * stopped connection continue to advance, so receives may time out while
551     * the connection is stopped.
552     * <P>
553     * If message listeners are running when <CODE>stop</CODE> is invoked, the
554     * <CODE>stop</CODE> call must wait until all of them have returned before
555     * it may return. While these message listeners are completing, they must
556     * have the full services of the connection available to them.
557     *
558     * @throws JMSException if the JMS provider fails to stop message delivery
559     *                 due to some internal error.
560     * @see javax.jms.Connection#start()
561     */
562    @Override
563    public void stop() throws JMSException {
564        doStop(true);
565    }
566
567    /**
568     * @see #stop()
569     * @param checkClosed <tt>true</tt> to check for already closed and throw {@link java.lang.IllegalStateException} if already closed,
570     *                    <tt>false</tt> to skip this check
571     * @throws JMSException if the JMS provider fails to stop message delivery due to some internal error.
572     */
573    void doStop(boolean checkClosed) throws JMSException {
574        if (checkClosed) {
575            checkClosedOrFailed();
576        }
577        if (started.compareAndSet(true, false)) {
578            synchronized(sessions) {
579                for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) {
580                    ActiveMQSession s = i.next();
581                    s.stop();
582                }
583            }
584        }
585    }
586
587    /**
588     * Closes the connection.
589     * <P>
590     * Since a provider typically allocates significant resources outside the
591     * JVM on behalf of a connection, clients should close these resources when
592     * they are not needed. Relying on garbage collection to eventually reclaim
593     * these resources may not be timely enough.
594     * <P>
595     * There is no need to close the sessions, producers, and consumers of a
596     * closed connection.
597     * <P>
598     * Closing a connection causes all temporary destinations to be deleted.
599     * <P>
600     * When this method is invoked, it should not return until message
601     * processing has been shut down in an orderly fashion. This means that all
602     * message listeners that may have been running have returned, and that all
603     * pending receives have returned. A close terminates all pending message
604     * receives on the connection's sessions' consumers. The receives may return
605     * with a message or with null, depending on whether there was a message
606     * available at the time of the close. If one or more of the connection's
607     * sessions' message listeners is processing a message at the time when
608     * connection <CODE>close</CODE> is invoked, all the facilities of the
609     * connection and its sessions must remain available to those listeners
610     * until they return control to the JMS provider.
611     * <P>
612     * Closing a connection causes any of its sessions' transactions in progress
613     * to be rolled back. In the case where a session's work is coordinated by
614     * an external transaction manager, a session's <CODE>commit</CODE> and
615     * <CODE> rollback</CODE> methods are not used and the result of a closed
616     * session's work is determined later by the transaction manager. Closing a
617     * connection does NOT force an acknowledgment of client-acknowledged
618     * sessions.
619     * <P>
620     * Invoking the <CODE>acknowledge</CODE> method of a received message from
621     * a closed connection's session must throw an
622     * <CODE>IllegalStateException</CODE>. Closing a closed connection must
623     * NOT throw an exception.
624     *
625     * @throws JMSException if the JMS provider fails to close the connection
626     *                 due to some internal error. For example, a failure to
627     *                 release resources or to close a socket connection can
628     *                 cause this exception to be thrown.
629     */
    @Override
    public void close() throws JMSException {
        // Store the interrupted state and clear so that cleanup happens without
        // leaking connection resources.  Reset in finally to preserve state.
        boolean interrupted = Thread.interrupted();

        try {

            // If we were running, lets stop first.
            if (!closed.get() && !transportFailed.get()) {
                // do not fail if already closed as according to JMS spec we must not
                // throw exception if already closed
                doStop(false);
            }

            // The whole teardown runs while holding the connection's monitor.
            synchronized (this) {
                if (!closed.get()) {
                    // Mark the close as in progress; cleared again at the end of this block.
                    // NOTE(review): if any step below throws, closing stays true and
                    // closed stays false; only the finally block at the bottom still runs.
                    closing.set(true);

                    // Stop and drop the destination source and the advisory consumer, if present.
                    if (destinationSource != null) {
                        destinationSource.stop();
                        destinationSource = null;
                    }
                    if (advisoryConsumer != null) {
                        advisoryConsumer.dispose();
                        advisoryConsumer = null;
                    }

                    // Work on a local copy of the scheduler reference; a failure to stop
                    // it is converted to a JMSException and aborts the close.
                    Scheduler scheduler = this.scheduler;
                    if (scheduler != null) {
                        try {
                            scheduler.stop();
                        } catch (Exception e) {
                            JMSException ex =  JMSExceptionSupport.create(e);
                            throw ex;
                        }
                    }

                    // Dispose every session and remember the highest last-delivered
                    // sequence id seen; it is reported to the broker in the RemoveInfo below.
                    long lastDeliveredSequenceId = -1;
                    for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
                        ActiveMQSession s = i.next();
                        s.dispose();
                        lastDeliveredSequenceId = Math.max(lastDeliveredSequenceId, s.getLastDeliveredSequenceId());
                    }
                    // Dispose every connection consumer.
                    for (Iterator<ActiveMQConnectionConsumer> i = this.connectionConsumers.iterator(); i.hasNext();) {
                        ActiveMQConnectionConsumer c = i.next();
                        c.dispose();
                    }

                    this.activeTempDestinations.clear();

                    if (isConnectionInfoSentToBroker) {
                        // If we announced ourselves to the broker.. Try to let the broker
                        // know that the connection is being shutdown.
                        RemoveInfo removeCommand = info.createRemoveCommand();
                        removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
                        try {
                            // Wait at most closeTimeout for the reply to the remove command.
                            doSyncSendPacket(removeCommand, closeTimeout);
                        } catch (JMSException e) {
                            // A request timeout is tolerated here; any other failure is rethrown.
                            if (e.getCause() instanceof RequestTimedOutIOException) {
                                // expected
                            } else {
                                throw e;
                            }
                        }
                        // ShutdownInfo is sent one-way; no reply is awaited.
                        doAsyncSendPacket(new ShutdownInfo());
                    }

                    started.set(false);

                    // TODO if we move the TaskRunnerFactory to the connection
                    // factory
                    // then we may need to call
                    // factory.onConnectionClose(this);
                    if (sessionTaskRunner != null) {
                        sessionTaskRunner.shutdown();
                    }
                    // Flip the state flags only after every step above completed.
                    closed.set(true);
                    closing.set(false);
                }
            }
        } finally {
            // This cleanup runs on every call, whether or not the block above
            // completed and even if the connection was already closed.
            try {
                if (executor != null) {
                    ThreadPoolUtils.shutdown(executor);
                }
            } catch (Throwable e) {
                // Best effort: a failing pool shutdown is logged and otherwise ignored.
                LOG.warn("Error shutting down thread pool: " + executor + ". This exception will be ignored.", e);
            }

            ServiceSupport.dispose(this.transport);

            factoryStats.removeConnection(this);
            // Restore the interrupt flag that was cleared on entry.
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
728
729    /**
730     * Tells the broker to terminate its VM. This can be used to cleanly
731     * terminate a broker running in a standalone java process. Server must have
732     * property enable.vm.shutdown=true defined to allow this to work.
733     */
734    // TODO : org.apache.activemq.message.BrokerAdminCommand not yet
735    // implemented.
736    /*
737     * public void terminateBrokerVM() throws JMSException { BrokerAdminCommand
738     * command = new BrokerAdminCommand();
739     * command.setCommand(BrokerAdminCommand.SHUTDOWN_SERVER_VM);
740     * asyncSendPacket(command); }
741     */
742
743    /**
744     * Create a durable connection consumer for this connection (optional
745     * operation). This is an expert facility not used by regular JMS clients.
746     *
747     * @param topic topic to access
748     * @param subscriptionName durable subscription name
749     * @param messageSelector only messages with properties matching the message
750     *                selector expression are delivered. A value of null or an
751     *                empty string indicates that there is no message selector
752     *                for the message consumer.
753     * @param sessionPool the server session pool to associate with this durable
754     *                connection consumer
755     * @param maxMessages the maximum number of messages that can be assigned to
756     *                a server session at one time
757     * @return the durable connection consumer
758     * @throws JMSException if the <CODE>Connection</CODE> object fails to
759     *                 create a connection consumer due to some internal error
760     *                 or invalid arguments for <CODE>sessionPool</CODE> and
761     *                 <CODE>messageSelector</CODE>.
762     * @throws javax.jms.InvalidDestinationException if an invalid destination
763     *                 is specified.
764     * @throws javax.jms.InvalidSelectorException if the message selector is
765     *                 invalid.
766     * @see javax.jms.ConnectionConsumer
767     * @since 1.1
768     */
769    @Override
770    public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages)
771        throws JMSException {
772        return this.createDurableConnectionConsumer(topic, subscriptionName, messageSelector, sessionPool, maxMessages, false);
773    }
774
775    /**
776     * Create a durable connection consumer for this connection (optional
777     * operation). This is an expert facility not used by regular JMS clients.
778     *
779     * @param topic topic to access
780     * @param subscriptionName durable subscription name
781     * @param messageSelector only messages with properties matching the message
782     *                selector expression are delivered. A value of null or an
783     *                empty string indicates that there is no message selector
784     *                for the message consumer.
785     * @param sessionPool the server session pool to associate with this durable
786     *                connection consumer
787     * @param maxMessages the maximum number of messages that can be assigned to
788     *                a server session at one time
789     * @param noLocal set true if you want to filter out messages published
790     *                locally
791     * @return the durable connection consumer
792     * @throws JMSException if the <CODE>Connection</CODE> object fails to
793     *                 create a connection consumer due to some internal error
794     *                 or invalid arguments for <CODE>sessionPool</CODE> and
795     *                 <CODE>messageSelector</CODE>.
796     * @throws javax.jms.InvalidDestinationException if an invalid destination
797     *                 is specified.
798     * @throws javax.jms.InvalidSelectorException if the message selector is
799     *                 invalid.
800     * @see javax.jms.ConnectionConsumer
801     * @since 1.1
802     */
803    public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages,
804                                                              boolean noLocal) throws JMSException {
805        checkClosedOrFailed();
806
807        if (queueOnlyConnection) {
808            throw new IllegalStateException("QueueConnection cannot be used to create Pub/Sub based resources.");
809        }
810
811        ensureConnectionInfoSent();
812        SessionId sessionId = new SessionId(info.getConnectionId(), -1);
813        ConsumerInfo info = new ConsumerInfo(new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId()));
814        info.setDestination(ActiveMQMessageTransformation.transformDestination(topic));
815        info.setSubscriptionName(subscriptionName);
816        info.setSelector(messageSelector);
817        info.setPrefetchSize(maxMessages);
818        info.setDispatchAsync(isDispatchAsync());
819
820        // Allows the options on the destination to configure the consumerInfo
821        if (info.getDestination().getOptions() != null) {
822            Map<String, String> options = new HashMap<String, String>(info.getDestination().getOptions());
823            IntrospectionSupport.setProperties(this.info, options, "consumer.");
824        }
825
826        return new ActiveMQConnectionConsumer(this, sessionPool, info);
827    }
828
829    // Properties
830    // -------------------------------------------------------------------------
831
832    /**
833     * Returns true if this connection has been started
834     *
835     * @return true if this Connection is started
836     */
837    public boolean isStarted() {
838        return started.get();
839    }
840
841    /**
842     * Returns true if the connection is closed
843     */
844    public boolean isClosed() {
845        return closed.get();
846    }
847
848    /**
849     * Returns true if the connection is in the process of being closed
850     */
851    public boolean isClosing() {
852        return closing.get();
853    }
854
855    /**
856     * Returns true if the underlying transport has failed
857     */
858    public boolean isTransportFailed() {
859        return transportFailed.get();
860    }
861
862    /**
863     * @return Returns the prefetchPolicy.
864     */
865    public ActiveMQPrefetchPolicy getPrefetchPolicy() {
866        return prefetchPolicy;
867    }
868
869    /**
870     * Sets the <a
871     * href="http://activemq.apache.org/what-is-the-prefetch-limit-for.html">prefetch
872     * policy</a> for consumers created by this connection.
873     */
    public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) {
        // Stored by reference as given; this setter performs no validation or copy.
        this.prefetchPolicy = prefetchPolicy;
    }
877
    /**
     * @return the transport this connection uses.
     */
880    public Transport getTransportChannel() {
881        return transport;
882    }
883
884    /**
885     * @return Returns the clientID of the connection, forcing one to be
886     *         generated if one has not yet been configured.
887     */
888    public String getInitializedClientID() throws JMSException {
889        ensureConnectionInfoSent();
890        return info.getClientId();
891    }
892
893    /**
894     * @return Returns the timeStampsDisableByDefault.
895     */
896    public boolean isDisableTimeStampsByDefault() {
897        return disableTimeStampsByDefault;
898    }
899
900    /**
901     * Sets whether or not timestamps on messages should be disabled or not. If
902     * you disable them it adds a small performance boost.
903     */
904    public void setDisableTimeStampsByDefault(boolean timeStampsDisableByDefault) {
905        this.disableTimeStampsByDefault = timeStampsDisableByDefault;
906    }
907
908    /**
909     * @return Returns the dispatchOptimizedMessage.
910     */
911    public boolean isOptimizedMessageDispatch() {
912        return optimizedMessageDispatch;
913    }
914
    /**
     * If this flag is set then a larger prefetch limit is used - only
     * applicable for durable topic subscribers.
     */
919    public void setOptimizedMessageDispatch(boolean dispatchOptimizedMessage) {
920        this.optimizedMessageDispatch = dispatchOptimizedMessage;
921    }
922
923    /**
924     * @return Returns the closeTimeout.
925     */
926    public int getCloseTimeout() {
927        return closeTimeout;
928    }
929
930    /**
931     * Sets the timeout before a close is considered complete. Normally a
932     * close() on a connection waits for confirmation from the broker; this
933     * allows that operation to timeout to save the client hanging if there is
934     * no broker
935     */
    public void setCloseTimeout(int closeTimeout) {
        // Stored as given (no range check); close() passes this value as the
        // timeout when it sends the remove command to the broker.
        this.closeTimeout = closeTimeout;
    }
939
940    /**
941     * @return ConnectionInfo
942     */
943    public ConnectionInfo getConnectionInfo() {
944        return this.info;
945    }
946
947    public boolean isUseRetroactiveConsumer() {
948        return useRetroactiveConsumer;
949    }
950
951    /**
952     * Sets whether or not retroactive consumers are enabled. Retroactive
953     * consumers allow non-durable topic subscribers to receive old messages
954     * that were published before the non-durable subscriber started.
955     */
    public void setUseRetroactiveConsumer(boolean useRetroactiveConsumer) {
        // Only records the flag on this connection; nothing else happens in this setter.
        this.useRetroactiveConsumer = useRetroactiveConsumer;
    }
959
960    public boolean isNestedMapAndListEnabled() {
961        return nestedMapAndListEnabled;
962    }
963
964    /**
965     * Enables/disables whether or not Message properties and MapMessage entries
966     * support <a
967     * href="http://activemq.apache.org/structured-message-properties-and-mapmessages.html">Nested
968     * Structures</a> of Map and List objects
969     */
970    public void setNestedMapAndListEnabled(boolean structuredMapsEnabled) {
971        this.nestedMapAndListEnabled = structuredMapsEnabled;
972    }
973
974    public boolean isExclusiveConsumer() {
975        return exclusiveConsumer;
976    }
977
978    /**
979     * Enables or disables whether or not queue consumers should be exclusive or
980     * not for example to preserve ordering when not using <a
981     * href="http://activemq.apache.org/message-groups.html">Message Groups</a>
982     *
983     * @param exclusiveConsumer
984     */
    public void setExclusiveConsumer(boolean exclusiveConsumer) {
        // Only records the flag on this connection; nothing else happens in this setter.
        this.exclusiveConsumer = exclusiveConsumer;
    }
988
989    /**
990     * Adds a transport listener so that a client can be notified of events in
991     * the underlying transport
992     */
993    public void addTransportListener(TransportListener transportListener) {
994        transportListeners.add(transportListener);
995    }
996
997    public void removeTransportListener(TransportListener transportListener) {
998        transportListeners.remove(transportListener);
999    }
1000
1001    public boolean isUseDedicatedTaskRunner() {
1002        return useDedicatedTaskRunner;
1003    }
1004
    /**
     * Sets the dedicated-task-runner flag. The flag is read by
     * {@link #getSessionTaskRunner()} when it creates the session task runner
     * factory, so changing it after that factory exists does not affect it.
     */
    public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) {
        this.useDedicatedTaskRunner = useDedicatedTaskRunner;
    }
1008
1009    public TaskRunnerFactory getSessionTaskRunner() {
1010        synchronized (this) {
1011            if (sessionTaskRunner == null) {
1012                sessionTaskRunner = new TaskRunnerFactory("ActiveMQ Session Task", ThreadPriorities.INBOUND_CLIENT_SESSION, false, 1000, isUseDedicatedTaskRunner(), maxThreadPoolSize);
1013                sessionTaskRunner.setRejectedTaskHandler(rejectedTaskHandler);
1014            }
1015        }
1016        return sessionTaskRunner;
1017    }
1018
    /**
     * Replaces the session task runner factory. The field is assigned
     * directly; unlike {@link #getSessionTaskRunner()} this setter does not
     * synchronize on the connection, and it does not shut down any factory
     * that was set before.
     */
    public void setSessionTaskRunner(TaskRunnerFactory sessionTaskRunner) {
        this.sessionTaskRunner = sessionTaskRunner;
    }
1022
1023    public MessageTransformer getTransformer() {
1024        return transformer;
1025    }
1026
1027    /**
1028     * Sets the transformer used to transform messages before they are sent on
1029     * to the JMS bus or when they are received from the bus but before they are
1030     * delivered to the JMS client
1031     */
    public void setTransformer(MessageTransformer transformer) {
        // Stored by reference as given; this setter performs no validation.
        this.transformer = transformer;
    }
1035
1036    /**
1037     * @return the statsEnabled
1038     */
1039    public boolean isStatsEnabled() {
1040        return this.stats.isEnabled();
1041    }
1042
1043    /**
1044     * @param statsEnabled the statsEnabled to set
1045     */
1046    public void setStatsEnabled(boolean statsEnabled) {
1047        this.stats.setEnabled(statsEnabled);
1048    }
1049
1050    /**
1051     * Returns the {@link DestinationSource} object which can be used to listen to destinations
1052     * being created or destroyed or to enquire about the current destinations available on the broker
1053     *
1054     * @return a lazily created destination source
1055     * @throws JMSException
1056     */
1057    @Override
1058    public DestinationSource getDestinationSource() throws JMSException {
1059        if (destinationSource == null) {
1060            destinationSource = new DestinationSource(this);
1061            destinationSource.start();
1062        }
1063        return destinationSource;
1064    }
1065
1066    // Implementation methods
1067    // -------------------------------------------------------------------------
1068
    /**
     * Used internally for adding Sessions to the Connection
     *
     * @param session the session to register with this connection
     * @throws JMSException
     */
1076    protected void addSession(ActiveMQSession session) throws JMSException {
1077        this.sessions.add(session);
1078        if (sessions.size() > 1 || session.isTransacted()) {
1079            optimizedMessageDispatch = false;
1080        }
1081    }
1082
    /**
     * Used internally for removing Sessions from a Connection
     *
     * @param session the session to remove from this connection
     */
1088    protected void removeSession(ActiveMQSession session) {
1089        this.sessions.remove(session);
1090        this.removeDispatcher(session);
1091    }
1092
1093    /**
1094     * Add a ConnectionConsumer
1095     *
1096     * @param connectionConsumer
1097     * @throws JMSException
1098     */
1099    protected void addConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) throws JMSException {
1100        this.connectionConsumers.add(connectionConsumer);
1101    }
1102
1103    /**
1104     * Remove a ConnectionConsumer
1105     *
1106     * @param connectionConsumer
1107     */
1108    protected void removeConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) {
1109        this.connectionConsumers.remove(connectionConsumer);
1110        this.removeDispatcher(connectionConsumer);
1111    }
1112
1113    /**
1114     * Creates a <CODE>TopicSession</CODE> object.
1115     *
1116     * @param transacted indicates whether the session is transacted
1117     * @param acknowledgeMode indicates whether the consumer or the client will
1118     *                acknowledge any messages it receives; ignored if the
1119     *                session is transacted. Legal values are
1120     *                <code>Session.AUTO_ACKNOWLEDGE</code>,
1121     *                <code>Session.CLIENT_ACKNOWLEDGE</code>, and
1122     *                <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
1123     * @return a newly created topic session
1124     * @throws JMSException if the <CODE>TopicConnection</CODE> object fails
1125     *                 to create a session due to some internal error or lack of
1126     *                 support for the specific transaction and acknowledgement
1127     *                 mode.
1128     * @see Session#AUTO_ACKNOWLEDGE
1129     * @see Session#CLIENT_ACKNOWLEDGE
1130     * @see Session#DUPS_OK_ACKNOWLEDGE
1131     */
1132    @Override
1133    public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException {
1134        return new ActiveMQTopicSession((ActiveMQSession)createSession(transacted, acknowledgeMode));
1135    }
1136
1137    /**
1138     * Creates a connection consumer for this connection (optional operation).
1139     * This is an expert facility not used by regular JMS clients.
1140     *
1141     * @param topic the topic to access
1142     * @param messageSelector only messages with properties matching the message
1143     *                selector expression are delivered. A value of null or an
1144     *                empty string indicates that there is no message selector
1145     *                for the message consumer.
1146     * @param sessionPool the server session pool to associate with this
1147     *                connection consumer
1148     * @param maxMessages the maximum number of messages that can be assigned to
1149     *                a server session at one time
1150     * @return the connection consumer
1151     * @throws JMSException if the <CODE>TopicConnection</CODE> object fails
1152     *                 to create a connection consumer due to some internal
1153     *                 error or invalid arguments for <CODE>sessionPool</CODE>
1154     *                 and <CODE>messageSelector</CODE>.
1155     * @throws javax.jms.InvalidDestinationException if an invalid topic is
1156     *                 specified.
1157     * @throws javax.jms.InvalidSelectorException if the message selector is
1158     *                 invalid.
1159     * @see javax.jms.ConnectionConsumer
1160     */
1161    @Override
1162    public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
1163        return createConnectionConsumer(topic, messageSelector, sessionPool, maxMessages, false);
1164    }
1165
1166    /**
1167     * Creates a connection consumer for this connection (optional operation).
1168     * This is an expert facility not used by regular JMS clients.
1169     *
1170     * @param queue the queue to access
1171     * @param messageSelector only messages with properties matching the message
1172     *                selector expression are delivered. A value of null or an
1173     *                empty string indicates that there is no message selector
1174     *                for the message consumer.
1175     * @param sessionPool the server session pool to associate with this
1176     *                connection consumer
1177     * @param maxMessages the maximum number of messages that can be assigned to
1178     *                a server session at one time
1179     * @return the connection consumer
1180     * @throws JMSException if the <CODE>QueueConnection</CODE> object fails
1181     *                 to create a connection consumer due to some internal
1182     *                 error or invalid arguments for <CODE>sessionPool</CODE>
1183     *                 and <CODE>messageSelector</CODE>.
1184     * @throws javax.jms.InvalidDestinationException if an invalid queue is
1185     *                 specified.
1186     * @throws javax.jms.InvalidSelectorException if the message selector is
1187     *                 invalid.
1188     * @see javax.jms.ConnectionConsumer
1189     */
1190    @Override
1191    public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
1192        return createConnectionConsumer(queue, messageSelector, sessionPool, maxMessages, false);
1193    }
1194
1195    /**
1196     * Creates a connection consumer for this connection (optional operation).
1197     * This is an expert facility not used by regular JMS clients.
1198     *
1199     * @param destination the destination to access
1200     * @param messageSelector only messages with properties matching the message
1201     *                selector expression are delivered. A value of null or an
1202     *                empty string indicates that there is no message selector
1203     *                for the message consumer.
1204     * @param sessionPool the server session pool to associate with this
1205     *                connection consumer
1206     * @param maxMessages the maximum number of messages that can be assigned to
1207     *                a server session at one time
1208     * @return the connection consumer
1209     * @throws JMSException if the <CODE>Connection</CODE> object fails to
1210     *                 create a connection consumer due to some internal error
1211     *                 or invalid arguments for <CODE>sessionPool</CODE> and
1212     *                 <CODE>messageSelector</CODE>.
1213     * @throws javax.jms.InvalidDestinationException if an invalid destination
1214     *                 is specified.
1215     * @throws javax.jms.InvalidSelectorException if the message selector is
1216     *                 invalid.
1217     * @see javax.jms.ConnectionConsumer
1218     * @since 1.1
1219     */
1220    @Override
1221    public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
1222        return createConnectionConsumer(destination, messageSelector, sessionPool, maxMessages, false);
1223    }
1224
1225    public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages, boolean noLocal)
1226        throws JMSException {
1227
1228        checkClosedOrFailed();
1229        ensureConnectionInfoSent();
1230
1231        ConsumerId consumerId = createConsumerId();
1232        ConsumerInfo consumerInfo = new ConsumerInfo(consumerId);
1233        consumerInfo.setDestination(ActiveMQMessageTransformation.transformDestination(destination));
1234        consumerInfo.setSelector(messageSelector);
1235        consumerInfo.setPrefetchSize(maxMessages);
1236        consumerInfo.setNoLocal(noLocal);
1237        consumerInfo.setDispatchAsync(isDispatchAsync());
1238
1239        // Allows the options on the destination to configure the consumerInfo
1240        if (consumerInfo.getDestination().getOptions() != null) {
1241            Map<String, String> options = new HashMap<String, String>(consumerInfo.getDestination().getOptions());
1242            IntrospectionSupport.setProperties(consumerInfo, options, "consumer.");
1243        }
1244
1245        return new ActiveMQConnectionConsumer(this, sessionPool, consumerInfo);
1246    }
1247
    /**
     * @return a new ConsumerId built from the connection-level session id and
     *         the next value of the consumer id generator
     */
1251    private ConsumerId createConsumerId() {
1252        return new ConsumerId(connectionSessionId, consumerIdGenerator.getNextSequenceId());
1253    }
1254
1255    /**
1256     * Creates a <CODE>QueueSession</CODE> object.
1257     *
1258     * @param transacted indicates whether the session is transacted
1259     * @param acknowledgeMode indicates whether the consumer or the client will
1260     *                acknowledge any messages it receives; ignored if the
1261     *                session is transacted. Legal values are
1262     *                <code>Session.AUTO_ACKNOWLEDGE</code>,
1263     *                <code>Session.CLIENT_ACKNOWLEDGE</code>, and
1264     *                <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
1265     * @return a newly created queue session
1266     * @throws JMSException if the <CODE>QueueConnection</CODE> object fails
1267     *                 to create a session due to some internal error or lack of
1268     *                 support for the specific transaction and acknowledgement
1269     *                 mode.
1270     * @see Session#AUTO_ACKNOWLEDGE
1271     * @see Session#CLIENT_ACKNOWLEDGE
1272     * @see Session#DUPS_OK_ACKNOWLEDGE
1273     */
1274    @Override
1275    public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException {
1276        return new ActiveMQQueueSession((ActiveMQSession)createSession(transacted, acknowledgeMode));
1277    }
1278
1279    /**
1280     * Ensures that the clientID was manually specified and not auto-generated.
1281     * If the clientID was not specified this method will throw an exception.
1282     * This method is used to ensure that the clientID + durableSubscriber name
1283     * are used correctly.
1284     *
1285     * @throws JMSException
1286     */
1287    public void checkClientIDWasManuallySpecified() throws JMSException {
1288        if (!userSpecifiedClientID) {
1289            throw new JMSException("You cannot create a durable subscriber without specifying a unique clientID on a Connection");
1290        }
1291    }
1292
1293    /**
1294     * send a Packet through the Connection - for internal use only
1295     *
1296     * @param command
1297     * @throws JMSException
1298     */
1299    public void asyncSendPacket(Command command) throws JMSException {
1300        if (isClosed()) {
1301            throw new ConnectionClosedException();
1302        } else {
1303            doAsyncSendPacket(command);
1304        }
1305    }
1306
1307    private void doAsyncSendPacket(Command command) throws JMSException {
1308        try {
1309            this.transport.oneway(command);
1310        } catch (IOException e) {
1311            throw JMSExceptionSupport.create(e);
1312        }
1313    }
1314
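    /**
     * Sends a packet through the Connection and reports the outcome to the
     * given callback - for internal use only. When no callback is supplied
     * the call falls back to the blocking {@link #syncSendPacket(Command)}.
     *
     * @param command the command to send
     * @param onComplete callback notified of success or failure; may be null
     * @throws JMSException if the connection is closed or the request cannot
     *                 be handed to the transport
     */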
    public void syncSendPacket(final Command command, final AsyncCallback onComplete) throws JMSException {
        if(onComplete==null) {
            // No callback supplied: use the blocking variant (its Response is discarded).
            syncSendPacket(command);
        } else {
            if (isClosed()) {
                throw new ConnectionClosedException();
            }
            try {
                // The outcome is delivered to onComplete from the transport's response callback.
                this.transport.asyncRequest(command, new ResponseCallback() {
                    @Override
                    public void onCompletion(FutureResponse resp) {
                        Response response;
                        Throwable exception = null;
                        try {
                            response = resp.getResult();
                            if (response.isException()) {
                                ExceptionResponse er = (ExceptionResponse)response;
                                exception = er.getException();
                            }
                        } catch (Exception e) {
                            // Failing to obtain the result is reported like a broker-side exception.
                            exception = e;
                        }
                        if(exception!=null) {
                            if ( exception instanceof JMSException) {
                                // Already a JMSException: pass it on unchanged.
                                onComplete.onException((JMSException) exception);
                            } else {
                                // Only logs; the exception is still converted and reported below.
                                if (isClosed()||closing.get()) {
                                    LOG.debug("Received an exception but connection is closing");
                                }
                                JMSException jmsEx = null;
                                try {
                                    jmsEx = JMSExceptionSupport.create(exception);
                                } catch(Throwable e) {
                                    LOG.error("Caught an exception trying to create a JMSException for " +exception,e);
                                }
                                // dispose of transport for security exceptions on connection initiation
                                if (exception instanceof SecurityException && command instanceof ConnectionInfo){
                                    forceCloseOnSecurityException(exception);
                                }
                                // NOTE(review): if the conversion above failed, jmsEx is null and
                                // the callback is not notified at all - confirm this is intended.
                                if (jmsEx !=null) {
                                    onComplete.onException(jmsEx);
                                }
                            }
                        } else {
                            onComplete.onSuccess();
                        }
                    }
                });
            } catch (IOException e) {
                // Failure to hand the request to the transport is reported to the caller directly.
                throw JMSExceptionSupport.create(e);
            }
        }
    }
1375
1376    private void forceCloseOnSecurityException(Throwable exception) {
1377        LOG.trace("force close on security exception:" + this + ", transport=" + transport, exception);
1378        onException(new IOException("Force close due to SecurityException on connect", exception));
1379    }
1380
1381    public Response syncSendPacket(Command command) throws JMSException {
1382        if (isClosed()) {
1383            throw new ConnectionClosedException();
1384        } else {
1385
1386            try {
1387                Response response = (Response)this.transport.request(command);
1388                if (response.isException()) {
1389                    ExceptionResponse er = (ExceptionResponse)response;
1390                    if (er.getException() instanceof JMSException) {
1391                        throw (JMSException)er.getException();
1392                    } else {
1393                        if (isClosed()||closing.get()) {
1394                            LOG.debug("Received an exception but connection is closing");
1395                        }
1396                        JMSException jmsEx = null;
1397                        try {
1398                            jmsEx = JMSExceptionSupport.create(er.getException());
1399                        } catch(Throwable e) {
1400                            LOG.error("Caught an exception trying to create a JMSException for " +er.getException(),e);
1401                        }
1402                        if (er.getException() instanceof SecurityException && command instanceof ConnectionInfo){
1403                            forceCloseOnSecurityException(er.getException());
1404                        }
1405                        if (jmsEx !=null) {
1406                            throw jmsEx;
1407                        }
1408                    }
1409                }
1410                return response;
1411            } catch (IOException e) {
1412                throw JMSExceptionSupport.create(e);
1413            }
1414        }
1415    }
1416
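    /**
     * Send a packet through a Connection and wait for the response - for
     * internal use only
     *
     * @param command the command to send
     * @param timeout if greater than zero, the timeout passed to the transport
     *                request; otherwise the request is made without a timeout
     * @return the response received from the transport
     * @throws JMSException if the connection is closed or closing, or the
     *                 request fails
     */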
1424    public Response syncSendPacket(Command command, int timeout) throws JMSException {
1425        if (isClosed() || closing.get()) {
1426            throw new ConnectionClosedException();
1427        } else {
1428            return doSyncSendPacket(command, timeout);
1429        }
1430    }
1431
1432    private Response doSyncSendPacket(Command command, int timeout)
1433            throws JMSException {
1434        try {
1435            Response response = (Response) (timeout > 0
1436                    ? this.transport.request(command, timeout)
1437                    : this.transport.request(command));
1438            if (response != null && response.isException()) {
1439                ExceptionResponse er = (ExceptionResponse)response;
1440                if (er.getException() instanceof JMSException) {
1441                    throw (JMSException)er.getException();
1442                } else {
1443                    throw JMSExceptionSupport.create(er.getException());
1444                }
1445            }
1446            return response;
1447        } catch (IOException e) {
1448            throw JMSExceptionSupport.create(e);
1449        }
1450    }
1451
1452    /**
1453     * @return statistics for this Connection
1454     */
1455    @Override
1456    public StatsImpl getStats() {
1457        return stats;
1458    }
1459
1460    /**
1461     * simply throws an exception if the Connection is already closed or the
1462     * Transport has failed
1463     *
1464     * @throws JMSException
1465     */
1466    protected synchronized void checkClosedOrFailed() throws JMSException {
1467        checkClosed();
1468        if (transportFailed.get()) {
1469            throw new ConnectionFailedException(firstFailureError);
1470        }
1471    }
1472
1473    /**
1474     * simply throws an exception if the Connection is already closed
1475     *
1476     * @throws JMSException
1477     */
1478    protected synchronized void checkClosed() throws JMSException {
1479        if (closed.get()) {
1480            throw new ConnectionClosedException();
1481        }
1482    }
1483
1484    /**
1485     * Send the ConnectionInfo to the Broker
1486     *
1487     * @throws JMSException
1488     */
1489    protected void ensureConnectionInfoSent() throws JMSException {
1490        synchronized(this.ensureConnectionInfoSentMutex) {
1491            // Can we skip sending the ConnectionInfo packet??
1492            if (isConnectionInfoSentToBroker || closed.get()) {
1493                return;
1494            }
1495            //TODO shouldn't this check be on userSpecifiedClientID rather than the value of clientID?
1496            if (info.getClientId() == null || info.getClientId().trim().length() == 0) {
1497                info.setClientId(clientIdGenerator.generateId());
1498            }
1499            syncSendPacket(info.copy());
1500
1501            this.isConnectionInfoSentToBroker = true;
1502            // Add a temp destination advisory consumer so that
1503            // We know what the valid temporary destinations are on the
1504            // broker without having to do an RPC to the broker.
1505
1506            ConsumerId consumerId = new ConsumerId(new SessionId(info.getConnectionId(), -1), consumerIdGenerator.getNextSequenceId());
1507            if (watchTopicAdvisories) {
1508                advisoryConsumer = new AdvisoryConsumer(this, consumerId);
1509            }
1510        }
1511    }
1512
1513    public synchronized boolean isWatchTopicAdvisories() {
1514        return watchTopicAdvisories;
1515    }
1516
1517    public synchronized void setWatchTopicAdvisories(boolean watchTopicAdvisories) {
1518        this.watchTopicAdvisories = watchTopicAdvisories;
1519    }
1520
1521    /**
1522     * @return Returns the useAsyncSend.
1523     */
1524    public boolean isUseAsyncSend() {
1525        return useAsyncSend;
1526    }
1527
1528    /**
1529     * Forces the use of <a
1530     * href="http://activemq.apache.org/async-sends.html">Async Sends</a> which
1531     * adds a massive performance boost; but means that the send() method will
1532     * return immediately whether the message has been sent or not which could
1533     * lead to message loss.
1534     */
1535    public void setUseAsyncSend(boolean useAsyncSend) {
1536        this.useAsyncSend = useAsyncSend;
1537    }
1538
1539    /**
1540     * @return true if always sync send messages
1541     */
1542    public boolean isAlwaysSyncSend() {
1543        return this.alwaysSyncSend;
1544    }
1545
1546    /**
1547     * Set true if always require messages to be sync sent
1548     *
1549     * @param alwaysSyncSend
1550     */
1551    public void setAlwaysSyncSend(boolean alwaysSyncSend) {
1552        this.alwaysSyncSend = alwaysSyncSend;
1553    }
1554
1555    /**
1556     * @return the messagePrioritySupported
1557     */
1558    public boolean isMessagePrioritySupported() {
1559        return this.messagePrioritySupported;
1560    }
1561
1562    /**
1563     * @param messagePrioritySupported the messagePrioritySupported to set
1564     */
1565    public void setMessagePrioritySupported(boolean messagePrioritySupported) {
1566        this.messagePrioritySupported = messagePrioritySupported;
1567    }
1568
1569    /**
     * Cleans up this connection so that its state is as if the connection was
1571     * just created. This allows the Resource Adapter to clean up a connection
1572     * so that it can be reused without having to close and recreate the
1573     * connection.
1574     */
1575    public void cleanup() throws JMSException {
1576        doCleanup(false);
1577    }
1578
1579    public void doCleanup(boolean removeConnection) throws JMSException {
1580        if (advisoryConsumer != null && !isTransportFailed()) {
1581            advisoryConsumer.dispose();
1582            advisoryConsumer = null;
1583        }
1584
1585        for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
1586            ActiveMQSession s = i.next();
1587            s.dispose();
1588        }
1589        for (Iterator<ActiveMQConnectionConsumer> i = this.connectionConsumers.iterator(); i.hasNext();) {
1590            ActiveMQConnectionConsumer c = i.next();
1591            c.dispose();
1592        }
1593
1594        if (removeConnection) {
1595            if (isConnectionInfoSentToBroker) {
1596                if (!transportFailed.get() && !closing.get()) {
1597                    syncSendPacket(info.createRemoveCommand());
1598                }
1599                isConnectionInfoSentToBroker = false;
1600            }
1601            if (userSpecifiedClientID) {
1602                info.setClientId(null);
1603                userSpecifiedClientID = false;
1604            }
1605            clientIDSet = false;
1606        }
1607
1608        started.set(false);
1609    }
1610
1611    /**
     * Changes the username/password associated with this connection. If the
     * connection has been used, you must call cleanup() before calling this
     * method.
     *
     * @throws IllegalStateException if the connection is in use.
1617     */
1618    public void changeUserInfo(String userName, String password) throws JMSException {
1619        if (isConnectionInfoSentToBroker) {
1620            throw new IllegalStateException("changeUserInfo used Connection is not allowed");
1621        }
1622        this.info.setUserName(userName);
1623        this.info.setPassword(password);
1624    }
1625
1626    /**
1627     * @return Returns the resourceManagerId.
1628     * @throws JMSException
1629     */
1630    public String getResourceManagerId() throws JMSException {
1631        if (isRmIdFromConnectionId()) {
1632            return info.getConnectionId().getValue();
1633        }
1634        waitForBrokerInfo();
1635        if (brokerInfo == null) {
1636            throw new JMSException("Connection failed before Broker info was received.");
1637        }
1638        return brokerInfo.getBrokerId().getValue();
1639    }
1640
1641    /**
1642     * Returns the broker name if one is available or null if one is not
1643     * available yet.
1644     */
1645    public String getBrokerName() {
1646        try {
1647            brokerInfoReceived.await(5, TimeUnit.SECONDS);
1648            if (brokerInfo == null) {
1649                return null;
1650            }
1651            return brokerInfo.getBrokerName();
1652        } catch (InterruptedException e) {
1653            Thread.currentThread().interrupt();
1654            return null;
1655        }
1656    }
1657
1658    /**
1659     * Returns the broker information if it is available or null if it is not
1660     * available yet.
1661     */
1662    public BrokerInfo getBrokerInfo() {
1663        return brokerInfo;
1664    }
1665
1666    /**
1667     * @return Returns the RedeliveryPolicy.
1668     * @throws JMSException
1669     */
1670    public RedeliveryPolicy getRedeliveryPolicy() throws JMSException {
1671        return redeliveryPolicyMap.getDefaultEntry();
1672    }
1673
1674    /**
1675     * Sets the redelivery policy to be used when messages are rolled back
1676     */
1677    public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
1678        this.redeliveryPolicyMap.setDefaultEntry(redeliveryPolicy);
1679    }
1680
1681    public BlobTransferPolicy getBlobTransferPolicy() {
1682        if (blobTransferPolicy == null) {
1683            blobTransferPolicy = createBlobTransferPolicy();
1684        }
1685        return blobTransferPolicy;
1686    }
1687
1688    /**
1689     * Sets the policy used to describe how out-of-band BLOBs (Binary Large
1690     * OBjects) are transferred from producers to brokers to consumers
1691     */
1692    public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) {
1693        this.blobTransferPolicy = blobTransferPolicy;
1694    }
1695
1696    /**
1697     * @return Returns the alwaysSessionAsync.
1698     */
1699    public boolean isAlwaysSessionAsync() {
1700        return alwaysSessionAsync;
1701    }
1702
1703    /**
1704     * If this flag is not set then a separate thread is not used for dispatching messages for each Session in
1705     * the Connection. However, a separate thread is always used if there is more than one session, or the session
1706     * isn't in auto acknowledge or duplicates ok mode.  By default this value is set to true and session dispatch
1707     * happens asynchronously.
1708     */
1709    public void setAlwaysSessionAsync(boolean alwaysSessionAsync) {
1710        this.alwaysSessionAsync = alwaysSessionAsync;
1711    }
1712
1713    /**
1714     * @return Returns the optimizeAcknowledge.
1715     */
1716    public boolean isOptimizeAcknowledge() {
1717        return optimizeAcknowledge;
1718    }
1719
1720    /**
1721     * Enables an optimised acknowledgement mode where messages are acknowledged
1722     * in batches rather than individually
1723     *
1724     * @param optimizeAcknowledge The optimizeAcknowledge to set.
1725     */
1726    public void setOptimizeAcknowledge(boolean optimizeAcknowledge) {
1727        this.optimizeAcknowledge = optimizeAcknowledge;
1728    }
1729
1730    /**
1731     * The max time in milliseconds between optimized ack batches
1732     * @param optimizeAcknowledgeTimeOut
1733     */
1734    public void setOptimizeAcknowledgeTimeOut(long optimizeAcknowledgeTimeOut) {
1735        this.optimizeAcknowledgeTimeOut =  optimizeAcknowledgeTimeOut;
1736    }
1737
1738    public long getOptimizeAcknowledgeTimeOut() {
1739        return optimizeAcknowledgeTimeOut;
1740    }
1741
1742    public long getWarnAboutUnstartedConnectionTimeout() {
1743        return warnAboutUnstartedConnectionTimeout;
1744    }
1745
1746    /**
1747     * Enables the timeout from a connection creation to when a warning is
1748     * generated if the connection is not properly started via {@link #start()}
1749     * and a message is received by a consumer. It is a very common gotcha to
1750     * forget to <a
1751     * href="http://activemq.apache.org/i-am-not-receiving-any-messages-what-is-wrong.html">start
1752     * the connection</a> so this option makes the default case to create a
     * warning if the user forgets. To disable the warning just set the value to
     * a negative number (say -1).
1755     */
1756    public void setWarnAboutUnstartedConnectionTimeout(long warnAboutUnstartedConnectionTimeout) {
1757        this.warnAboutUnstartedConnectionTimeout = warnAboutUnstartedConnectionTimeout;
1758    }
1759
1760    /**
1761     * @return the sendTimeout (in milliseconds)
1762     */
1763    public int getSendTimeout() {
1764        return sendTimeout;
1765    }
1766
1767    /**
1768     * @param sendTimeout the sendTimeout to set (in milliseconds)
1769     */
1770    public void setSendTimeout(int sendTimeout) {
1771        this.sendTimeout = sendTimeout;
1772    }
1773
1774    /**
1775     * @return the sendAcksAsync
1776     */
1777    public boolean isSendAcksAsync() {
1778        return sendAcksAsync;
1779    }
1780
1781    /**
1782     * @param sendAcksAsync the sendAcksAsync to set
1783     */
1784    public void setSendAcksAsync(boolean sendAcksAsync) {
1785        this.sendAcksAsync = sendAcksAsync;
1786    }
1787
1788    /**
1789     * Returns the time this connection was created
1790     */
1791    public long getTimeCreated() {
1792        return timeCreated;
1793    }
1794
1795    private void waitForBrokerInfo() throws JMSException {
1796        try {
1797            brokerInfoReceived.await();
1798        } catch (InterruptedException e) {
1799            Thread.currentThread().interrupt();
1800            throw JMSExceptionSupport.create(e);
1801        }
1802    }
1803
    // Exposed so that it can be used in unit tests
1805    public Transport getTransport() {
1806        return transport;
1807    }
1808
1809    public void addProducer(ProducerId producerId, ActiveMQMessageProducer producer) {
1810        producers.put(producerId, producer);
1811    }
1812
1813    public void removeProducer(ProducerId producerId) {
1814        producers.remove(producerId);
1815    }
1816
1817    public void addDispatcher(ConsumerId consumerId, ActiveMQDispatcher dispatcher) {
1818        dispatchers.put(consumerId, dispatcher);
1819    }
1820
1821    public void removeDispatcher(ConsumerId consumerId) {
1822        dispatchers.remove(consumerId);
1823    }
1824
1825    public boolean hasDispatcher(ConsumerId consumerId) {
1826        return dispatchers.containsKey(consumerId);
1827    }
1828
1829    /**
1830     * @param o - the command to consume
1831     */
1832    @Override
1833    public void onCommand(final Object o) {
1834        final Command command = (Command)o;
1835        if (!closed.get() && command != null) {
1836            try {
1837                command.visit(new CommandVisitorAdapter() {
1838                    @Override
1839                    public Response processMessageDispatch(MessageDispatch md) throws Exception {
1840                        waitForTransportInterruptionProcessingToComplete();
1841                        ActiveMQDispatcher dispatcher = dispatchers.get(md.getConsumerId());
1842                        if (dispatcher != null) {
1843                            // Copy in case a embedded broker is dispatching via
1844                            // vm://
1845                            // md.getMessage() == null to signal end of queue
1846                            // browse.
1847                            Message msg = md.getMessage();
1848                            if (msg != null) {
1849                                msg = msg.copy();
1850                                msg.setReadOnlyBody(true);
1851                                msg.setReadOnlyProperties(true);
1852                                msg.setRedeliveryCounter(md.getRedeliveryCounter());
1853                                msg.setConnection(ActiveMQConnection.this);
1854                                msg.setMemoryUsage(null);
1855                                md.setMessage(msg);
1856                            }
1857                            dispatcher.dispatch(md);
1858                        } else {
1859                            LOG.debug("{} no dispatcher for {} in {}", this, md, dispatchers);
1860                        }
1861                        return null;
1862                    }
1863
1864                    @Override
1865                    public Response processProducerAck(ProducerAck pa) throws Exception {
1866                        if (pa != null && pa.getProducerId() != null) {
1867                            ActiveMQMessageProducer producer = producers.get(pa.getProducerId());
1868                            if (producer != null) {
1869                                producer.onProducerAck(pa);
1870                            }
1871                        }
1872                        return null;
1873                    }
1874
1875                    @Override
1876                    public Response processBrokerInfo(BrokerInfo info) throws Exception {
1877                        brokerInfo = info;
1878                        brokerInfoReceived.countDown();
1879                        optimizeAcknowledge &= !brokerInfo.isFaultTolerantConfiguration();
1880                        getBlobTransferPolicy().setBrokerUploadUrl(info.getBrokerUploadUrl());
1881                        return null;
1882                    }
1883
1884                    @Override
1885                    public Response processConnectionError(final ConnectionError error) throws Exception {
1886                        executor.execute(new Runnable() {
1887                            @Override
1888                            public void run() {
1889                                onAsyncException(error.getException());
1890                            }
1891                        });
1892                        return null;
1893                    }
1894
1895                    @Override
1896                    public Response processControlCommand(ControlCommand command) throws Exception {
1897                        onControlCommand(command);
1898                        return null;
1899                    }
1900
1901                    @Override
1902                    public Response processConnectionControl(ConnectionControl control) throws Exception {
1903                        onConnectionControl((ConnectionControl)command);
1904                        return null;
1905                    }
1906
1907                    @Override
1908                    public Response processConsumerControl(ConsumerControl control) throws Exception {
1909                        onConsumerControl((ConsumerControl)command);
1910                        return null;
1911                    }
1912
1913                    @Override
1914                    public Response processWireFormat(WireFormatInfo info) throws Exception {
1915                        onWireFormatInfo((WireFormatInfo)command);
1916                        return null;
1917                    }
1918                });
1919            } catch (Exception e) {
1920                onClientInternalException(e);
1921            }
1922        }
1923
1924        for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
1925            TransportListener listener = iter.next();
1926            listener.onCommand(command);
1927        }
1928    }
1929
1930    protected void onWireFormatInfo(WireFormatInfo info) {
1931        protocolVersion.set(info.getVersion());
1932    }
1933
1934    /**
1935     * Handles async client internal exceptions.
1936     * A client internal exception is usually one that has been thrown
1937     * by a container runtime component during asynchronous processing of a
1938     * message that does not affect the connection itself.
1939     * This method notifies the <code>ClientInternalExceptionListener</code> by invoking
1940     * its <code>onException</code> method, if one has been registered with this connection.
1941     *
     * @param error the exception that caused the problem
1943     */
1944    public void onClientInternalException(final Throwable error) {
1945        if ( !closed.get() && !closing.get() ) {
1946            if ( this.clientInternalExceptionListener != null ) {
1947                executor.execute(new Runnable() {
1948                    @Override
1949                    public void run() {
1950                        ActiveMQConnection.this.clientInternalExceptionListener.onException(error);
1951                    }
1952                });
1953            } else {
1954                LOG.debug("Async client internal exception occurred with no exception listener registered: "
1955                        + error, error);
1956            }
1957        }
1958    }
1959
1960    /**
1961     * Used for handling async exceptions
1962     *
1963     * @param error
1964     */
1965    public void onAsyncException(Throwable error) {
1966        if (!closed.get() && !closing.get()) {
1967            if (this.exceptionListener != null) {
1968
1969                if (!(error instanceof JMSException)) {
1970                    error = JMSExceptionSupport.create(error);
1971                }
1972                final JMSException e = (JMSException)error;
1973
1974                executor.execute(new Runnable() {
1975                    @Override
1976                    public void run() {
1977                        ActiveMQConnection.this.exceptionListener.onException(e);
1978                    }
1979                });
1980
1981            } else {
1982                LOG.debug("Async exception with no exception listener: " + error, error);
1983            }
1984        }
1985    }
1986
1987    @Override
1988    public void onException(final IOException error) {
1989        onAsyncException(error);
1990        if (!closing.get() && !closed.get()) {
1991            executor.execute(new Runnable() {
1992                @Override
1993                public void run() {
1994                    transportFailed(error);
1995                    ServiceSupport.dispose(ActiveMQConnection.this.transport);
1996                    brokerInfoReceived.countDown();
1997                    try {
1998                        doCleanup(true);
1999                    } catch (JMSException e) {
2000                        LOG.warn("Exception during connection cleanup, " + e, e);
2001                    }
2002                    for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
2003                        TransportListener listener = iter.next();
2004                        listener.onException(error);
2005                    }
2006                }
2007            });
2008        }
2009    }
2010
2011    @Override
2012    public void transportInterupted() {
2013        transportInterruptionProcessingComplete.set(1);
2014        for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
2015            ActiveMQSession s = i.next();
2016            s.clearMessagesInProgress(transportInterruptionProcessingComplete);
2017        }
2018
2019        for (ActiveMQConnectionConsumer connectionConsumer : this.connectionConsumers) {
2020            connectionConsumer.clearMessagesInProgress(transportInterruptionProcessingComplete);
2021        }
2022
2023        if (transportInterruptionProcessingComplete.decrementAndGet() > 0) {
2024            if (LOG.isDebugEnabled()) {
2025                LOG.debug("transport interrupted - processing required, dispatchers: " + transportInterruptionProcessingComplete.get());
2026            }
2027            signalInterruptionProcessingNeeded();
2028        }
2029
2030        for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
2031            TransportListener listener = iter.next();
2032            listener.transportInterupted();
2033        }
2034    }
2035
2036    @Override
2037    public void transportResumed() {
2038        for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
2039            TransportListener listener = iter.next();
2040            listener.transportResumed();
2041        }
2042    }
2043
2044    /**
     * Creates a temporary destination and announces it to the broker with an
     * add-operation DestinationInfo.
     *
     * @param topic - if true a temporary topic is created, otherwise a temporary queue.
     * @return the newly created temporary destination
     * @throws JMSException
2050     */
2051    protected ActiveMQTempDestination createTempDestination(boolean topic) throws JMSException {
2052
2053        // Check if Destination info is of temporary type.
2054        ActiveMQTempDestination dest;
2055        if (topic) {
2056            dest = new ActiveMQTempTopic(info.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId());
2057        } else {
2058            dest = new ActiveMQTempQueue(info.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId());
2059        }
2060
2061        DestinationInfo info = new DestinationInfo();
2062        info.setConnectionId(this.info.getConnectionId());
2063        info.setOperationType(DestinationInfo.ADD_OPERATION_TYPE);
2064        info.setDestination(dest);
2065        syncSendPacket(info);
2066
2067        dest.setConnection(this);
2068        activeTempDestinations.put(dest, dest);
2069        return dest;
2070    }
2071
2072    /**
2073     * @param destination
2074     * @throws JMSException
2075     */
2076    public void deleteTempDestination(ActiveMQTempDestination destination) throws JMSException {
2077
2078        checkClosedOrFailed();
2079
2080        for (ActiveMQSession session : this.sessions) {
2081            if (session.isInUse(destination)) {
2082                throw new JMSException("A consumer is consuming from the temporary destination");
2083            }
2084        }
2085
2086        activeTempDestinations.remove(destination);
2087
2088        DestinationInfo destInfo = new DestinationInfo();
2089        destInfo.setConnectionId(this.info.getConnectionId());
2090        destInfo.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
2091        destInfo.setDestination(destination);
2092        destInfo.setTimeout(0);
2093        syncSendPacket(destInfo);
2094    }
2095
2096    public boolean isDeleted(ActiveMQDestination dest) {
2097
2098        // If we are not watching the advisories.. then
2099        // we will assume that the temp destination does exist.
2100        if (advisoryConsumer == null) {
2101            return false;
2102        }
2103
2104        return !activeTempDestinations.containsValue(dest);
2105    }
2106
2107    public boolean isCopyMessageOnSend() {
2108        return copyMessageOnSend;
2109    }
2110
2111    public LongSequenceGenerator getLocalTransactionIdGenerator() {
2112        return localTransactionIdGenerator;
2113    }
2114
2115    public boolean isUseCompression() {
2116        return useCompression;
2117    }
2118
2119    /**
2120     * Enables the use of compression of the message bodies
2121     */
2122    public void setUseCompression(boolean useCompression) {
2123        this.useCompression = useCompression;
2124    }
2125
2126    public void destroyDestination(ActiveMQDestination destination) throws JMSException {
2127
2128        checkClosedOrFailed();
2129        ensureConnectionInfoSent();
2130
2131        DestinationInfo info = new DestinationInfo();
2132        info.setConnectionId(this.info.getConnectionId());
2133        info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
2134        info.setDestination(destination);
2135        info.setTimeout(0);
2136        syncSendPacket(info);
2137    }
2138
2139    public boolean isDispatchAsync() {
2140        return dispatchAsync;
2141    }
2142
2143    /**
2144     * Enables or disables the default setting of whether or not consumers have
2145     * their messages <a
2146     * href="http://activemq.apache.org/consumer-dispatch-async.html">dispatched
2147     * synchronously or asynchronously by the broker</a>. For non-durable
2148     * topics for example we typically dispatch synchronously by default to
     * minimize context switches which boost performance. However sometimes it's
2150     * better to go slower to ensure that a single blocked consumer socket does
2151     * not block delivery to other consumers.
2152     *
2153     * @param asyncDispatch If true then consumers created on this connection
2154     *                will default to having their messages dispatched
2155     *                asynchronously. The default value is true.
2156     */
2157    public void setDispatchAsync(boolean asyncDispatch) {
2158        this.dispatchAsync = asyncDispatch;
2159    }
2160
2161    public boolean isObjectMessageSerializationDefered() {
2162        return objectMessageSerializationDefered;
2163    }
2164
2165    /**
2166     * When an object is set on an ObjectMessage, the JMS spec requires the
2167     * object to be serialized by that set method. Enabling this flag causes the
2168     * object to not get serialized. The object may subsequently get serialized
2169     * if the message needs to be sent over a socket or stored to disk.
2170     */
2171    public void setObjectMessageSerializationDefered(boolean objectMessageSerializationDefered) {
2172        this.objectMessageSerializationDefered = objectMessageSerializationDefered;
2173    }
2174
2175    /**
2176     * Unsubscribes a durable subscription that has been created by a client.
2177     * <P>
2178     * This method deletes the state being maintained on behalf of the
2179     * subscriber by its provider.
2180     * <P>
2181     * It is erroneous for a client to delete a durable subscription while there
2182     * is an active <CODE>MessageConsumer </CODE> or
2183     * <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed
2184     * message is part of a pending transaction or has not been acknowledged in
2185     * the session.
2186     *
2187     * @param name the name used to identify this subscription
2188     * @throws JMSException if the session fails to unsubscribe to the durable
2189     *                 subscription due to some internal error.
2190     * @throws InvalidDestinationException if an invalid subscription name is
2191     *                 specified.
2192     * @since 1.1
2193     */
2194    public void unsubscribe(String name) throws InvalidDestinationException, JMSException {
2195        checkClosedOrFailed();
2196        RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
2197        rsi.setConnectionId(getConnectionInfo().getConnectionId());
2198        rsi.setSubscriptionName(name);
2199        rsi.setClientId(getConnectionInfo().getClientId());
2200        syncSendPacket(rsi);
2201    }
2202
2203    /**
2204     * Internal send method optimized: - It does not copy the message - It can
2205     * only handle ActiveMQ messages. - You can specify if the send is async or
2206     * sync - Does not allow you to send /w a transaction.
2207     */
2208    void send(ActiveMQDestination destination, ActiveMQMessage msg, MessageId messageId, int deliveryMode, int priority, long timeToLive, boolean async) throws JMSException {
2209        checkClosedOrFailed();
2210
2211        if (destination.isTemporary() && isDeleted(destination)) {
2212            throw new JMSException("Cannot publish to a deleted Destination: " + destination);
2213        }
2214
2215        msg.setJMSDestination(destination);
2216        msg.setJMSDeliveryMode(deliveryMode);
2217        long expiration = 0L;
2218
2219        if (!isDisableTimeStampsByDefault()) {
2220            long timeStamp = System.currentTimeMillis();
2221            msg.setJMSTimestamp(timeStamp);
2222            if (timeToLive > 0) {
2223                expiration = timeToLive + timeStamp;
2224            }
2225        }
2226
2227        msg.setJMSExpiration(expiration);
2228        msg.setJMSPriority(priority);
2229        msg.setJMSRedelivered(false);
2230        msg.setMessageId(messageId);
2231        msg.onSend();
2232        msg.setProducerId(msg.getMessageId().getProducerId());
2233
2234        if (LOG.isDebugEnabled()) {
2235            LOG.debug("Sending message: " + msg);
2236        }
2237
2238        if (async) {
2239            asyncSendPacket(msg);
2240        } else {
2241            syncSendPacket(msg);
2242        }
2243    }
2244
2245    protected void onControlCommand(ControlCommand command) {
2246        String text = command.getCommand();
2247        if (text != null) {
2248            if ("shutdown".equals(text)) {
2249                LOG.info("JVM told to shutdown");
2250                System.exit(0);
2251            }
2252
2253            // TODO Should we handle the "close" case?
2254            // if (false && "close".equals(text)){
2255            //     LOG.error("Broker " + getBrokerInfo() + "shutdown connection");
2256            //     try {
2257            //         close();
2258            //     } catch (JMSException e) {
2259            //     }
2260            // }
2261        }
2262    }
2263
2264    protected void onConnectionControl(ConnectionControl command) {
2265        if (command.isFaultTolerant()) {
2266            this.optimizeAcknowledge = false;
2267            for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
2268                ActiveMQSession s = i.next();
2269                s.setOptimizeAcknowledge(false);
2270            }
2271        }
2272    }
2273
2274    protected void onConsumerControl(ConsumerControl command) {
2275        if (command.isClose()) {
2276            for (ActiveMQSession session : this.sessions) {
2277                session.close(command.getConsumerId());
2278            }
2279        } else {
2280            for (ActiveMQSession session : this.sessions) {
2281                session.setPrefetchSize(command.getConsumerId(), command.getPrefetch());
2282            }
2283            for (ActiveMQConnectionConsumer connectionConsumer: connectionConsumers) {
2284                ConsumerInfo consumerInfo = connectionConsumer.getConsumerInfo();
2285                if (consumerInfo.getConsumerId().equals(command.getConsumerId())) {
2286                    consumerInfo.setPrefetchSize(command.getPrefetch());
2287                }
2288            }
2289        }
2290    }
2291
2292    protected void transportFailed(IOException error) {
2293        transportFailed.set(true);
2294        if (firstFailureError == null) {
2295            firstFailureError = error;
2296        }
2297    }
2298
2299    /**
2300     * Should a JMS message be copied to a new JMS Message object as part of the
2301     * send() method in JMS. This is enabled by default to be compliant with the
2302     * JMS specification. You can disable it if you do not mutate JMS messages
2303     * after they are sent for a performance boost
2304     */
2305    public void setCopyMessageOnSend(boolean copyMessageOnSend) {
2306        this.copyMessageOnSend = copyMessageOnSend;
2307    }
2308
2309    @Override
2310    public String toString() {
2311        return "ActiveMQConnection {id=" + info.getConnectionId() + ",clientId=" + info.getClientId() + ",started=" + started.get() + "}";
2312    }
2313
2314    protected BlobTransferPolicy createBlobTransferPolicy() {
2315        return new BlobTransferPolicy();
2316    }
2317
2318    public int getProtocolVersion() {
2319        return protocolVersion.get();
2320    }
2321
2322    public int getProducerWindowSize() {
2323        return producerWindowSize;
2324    }
2325
2326    public void setProducerWindowSize(int producerWindowSize) {
2327        this.producerWindowSize = producerWindowSize;
2328    }
2329
2330    public void setAuditDepth(int auditDepth) {
2331        connectionAudit.setAuditDepth(auditDepth);
2332    }
2333
2334    public void setAuditMaximumProducerNumber(int auditMaximumProducerNumber) {
2335        connectionAudit.setAuditMaximumProducerNumber(auditMaximumProducerNumber);
2336    }
2337
2338    protected void removeDispatcher(ActiveMQDispatcher dispatcher) {
2339        connectionAudit.removeDispatcher(dispatcher);
2340    }
2341
2342    protected boolean isDuplicate(ActiveMQDispatcher dispatcher, Message message) {
2343        return checkForDuplicates && connectionAudit.isDuplicate(dispatcher, message);
2344    }
2345
2346    protected void rollbackDuplicate(ActiveMQDispatcher dispatcher, Message message) {
2347        connectionAudit.rollbackDuplicate(dispatcher, message);
2348    }
2349
2350    public IOException getFirstFailureError() {
2351        return firstFailureError;
2352    }
2353
2354    protected void waitForTransportInterruptionProcessingToComplete() throws InterruptedException {
2355        if (!closed.get() && !transportFailed.get() && transportInterruptionProcessingComplete.get()>0) {
2356            LOG.warn("dispatch with outstanding dispatch interruption processing count " + transportInterruptionProcessingComplete.get());
2357            signalInterruptionProcessingComplete();
2358        }
2359    }
2360
2361    protected void transportInterruptionProcessingComplete() {
2362        if (transportInterruptionProcessingComplete.decrementAndGet() == 0) {
2363            signalInterruptionProcessingComplete();
2364        }
2365    }
2366
2367    private void signalInterruptionProcessingComplete() {
2368            if (LOG.isDebugEnabled()) {
2369                LOG.debug("transportInterruptionProcessingComplete: " + transportInterruptionProcessingComplete.get()
2370                        + " for:" + this.getConnectionInfo().getConnectionId());
2371            }
2372
2373            FailoverTransport failoverTransport = transport.narrow(FailoverTransport.class);
2374            if (failoverTransport != null) {
2375                failoverTransport.connectionInterruptProcessingComplete(this.getConnectionInfo().getConnectionId());
2376                if (LOG.isDebugEnabled()) {
2377                    LOG.debug("notified failover transport (" + failoverTransport
2378                            + ") of interruption completion for: " + this.getConnectionInfo().getConnectionId());
2379                }
2380            }
2381            transportInterruptionProcessingComplete.set(0);
2382    }
2383
2384    private void signalInterruptionProcessingNeeded() {
2385        FailoverTransport failoverTransport = transport.narrow(FailoverTransport.class);
2386        if (failoverTransport != null) {
2387            failoverTransport.getStateTracker().transportInterrupted(this.getConnectionInfo().getConnectionId());
2388            if (LOG.isDebugEnabled()) {
2389                LOG.debug("notified failover transport (" + failoverTransport
2390                        + ") of pending interruption processing for: " + this.getConnectionInfo().getConnectionId());
2391            }
2392        }
2393    }
2394
2395    /*
2396     * specify the amount of time in milliseconds that a consumer with a transaction pending recovery
2397     * will wait to receive re dispatched messages.
2398     * default value is 0 so there is no wait by default.
2399     */
2400    public void setConsumerFailoverRedeliveryWaitPeriod(long consumerFailoverRedeliveryWaitPeriod) {
2401        this.consumerFailoverRedeliveryWaitPeriod = consumerFailoverRedeliveryWaitPeriod;
2402    }
2403
2404    public long getConsumerFailoverRedeliveryWaitPeriod() {
2405        return consumerFailoverRedeliveryWaitPeriod;
2406    }
2407
2408    protected Scheduler getScheduler() throws JMSException {
2409        Scheduler result = scheduler;
2410        if (result == null) {
2411            if (isClosing() || isClosed()) {
2412                // without lock contention report the closing state
2413                throw new ConnectionClosedException();
2414            }
2415            synchronized (this) {
2416                result = scheduler;
2417                if (result == null) {
2418                    checkClosed();
2419                    try {
2420                        result = new Scheduler("ActiveMQConnection["+info.getConnectionId().getValue()+"] Scheduler");
2421                        result.start();
2422                        scheduler = result;
2423                    } catch(Exception e) {
2424                        throw JMSExceptionSupport.create(e);
2425                    }
2426                }
2427            }
2428        }
2429        return result;
2430    }
2431
2432    protected ThreadPoolExecutor getExecutor() {
2433        return this.executor;
2434    }
2435
2436    protected CopyOnWriteArrayList<ActiveMQSession> getSessions() {
2437        return sessions;
2438    }
2439
2440    /**
2441     * @return the checkForDuplicates
2442     */
2443    public boolean isCheckForDuplicates() {
2444        return this.checkForDuplicates;
2445    }
2446
2447    /**
2448     * @param checkForDuplicates the checkForDuplicates to set
2449     */
2450    public void setCheckForDuplicates(boolean checkForDuplicates) {
2451        this.checkForDuplicates = checkForDuplicates;
2452    }
2453
2454    public boolean isTransactedIndividualAck() {
2455        return transactedIndividualAck;
2456    }
2457
2458    public void setTransactedIndividualAck(boolean transactedIndividualAck) {
2459        this.transactedIndividualAck = transactedIndividualAck;
2460    }
2461
2462    public boolean isNonBlockingRedelivery() {
2463        return nonBlockingRedelivery;
2464    }
2465
2466    public void setNonBlockingRedelivery(boolean nonBlockingRedelivery) {
2467        this.nonBlockingRedelivery = nonBlockingRedelivery;
2468    }
2469
2470    public boolean isRmIdFromConnectionId() {
2471        return rmIdFromConnectionId;
2472    }
2473
2474    public void setRmIdFromConnectionId(boolean rmIdFromConnectionId) {
2475        this.rmIdFromConnectionId = rmIdFromConnectionId;
2476    }
2477
2478    /**
2479     * Removes any TempDestinations that this connection has cached, ignoring
2480     * any exceptions generated because the destination is in use as they should
2481     * not be removed.
2482     * Used from a pooled connection, b/c it will not be explicitly closed.
2483     */
2484    public void cleanUpTempDestinations() {
2485
2486        if (this.activeTempDestinations == null || this.activeTempDestinations.isEmpty()) {
2487            return;
2488        }
2489
2490        Iterator<ConcurrentMap.Entry<ActiveMQTempDestination, ActiveMQTempDestination>> entries
2491            = this.activeTempDestinations.entrySet().iterator();
2492        while(entries.hasNext()) {
2493            ConcurrentMap.Entry<ActiveMQTempDestination, ActiveMQTempDestination> entry = entries.next();
2494            try {
2495                // Only delete this temp destination if it was created from this connection. The connection used
2496                // for the advisory consumer may also have a reference to this temp destination.
2497                ActiveMQTempDestination dest = entry.getValue();
2498                String thisConnectionId = (info.getConnectionId() == null) ? "" : info.getConnectionId().toString();
2499                if (dest.getConnectionId() != null && dest.getConnectionId().equals(thisConnectionId)) {
2500                    this.deleteTempDestination(entry.getValue());
2501                }
2502            } catch (Exception ex) {
2503                // the temp dest is in use so it can not be deleted.
2504                // it is ok to leave it to connection tear down phase
2505            }
2506        }
2507    }
2508
2509    /**
2510     * Sets the Connection wide RedeliveryPolicyMap for handling messages that are being rolled back.
2511     * @param redeliveryPolicyMap the redeliveryPolicyMap to set
2512     */
2513    public void setRedeliveryPolicyMap(RedeliveryPolicyMap redeliveryPolicyMap) {
2514        this.redeliveryPolicyMap = redeliveryPolicyMap;
2515    }
2516
2517    /**
2518     * Gets the Connection's configured RedeliveryPolicyMap which will be used by all the
2519     * Consumers when dealing with transaction messages that have been rolled back.
2520     *
2521     * @return the redeliveryPolicyMap
2522     */
2523    public RedeliveryPolicyMap getRedeliveryPolicyMap() {
2524        return redeliveryPolicyMap;
2525    }
2526
2527    public int getMaxThreadPoolSize() {
2528        return maxThreadPoolSize;
2529    }
2530
2531    public void setMaxThreadPoolSize(int maxThreadPoolSize) {
2532        this.maxThreadPoolSize = maxThreadPoolSize;
2533    }
2534
2535    /**
2536     * Enable enforcement of QueueConnection semantics.
2537     *
2538     * @return this object, useful for chaining
2539     */
2540    ActiveMQConnection enforceQueueOnlyConnection() {
2541        this.queueOnlyConnection = true;
2542        return this;
2543    }
2544
2545    public RejectedExecutionHandler getRejectedTaskHandler() {
2546        return rejectedTaskHandler;
2547    }
2548
2549    public void setRejectedTaskHandler(RejectedExecutionHandler rejectedTaskHandler) {
2550        this.rejectedTaskHandler = rejectedTaskHandler;
2551    }
2552
2553    /**
2554     * Gets the configured time interval that is used to force all MessageConsumers that have optimizedAcknowledge enabled
2555     * to send an ack for any outstanding Message Acks.  By default this value is set to zero meaning that the consumers
2556     * will not do any background Message acknowledgment.
2557     *
2558     * @return the scheduledOptimizedAckInterval
2559     */
2560    public long getOptimizedAckScheduledAckInterval() {
2561        return optimizedAckScheduledAckInterval;
2562    }
2563
2564    /**
2565     * Sets the amount of time between scheduled sends of any outstanding Message Acks for consumers that
2566     * have been configured with optimizeAcknowledge enabled.
2567     *
2568     * @param optimizedAckScheduledAckInterval the scheduledOptimizedAckInterval to set
2569     */
2570    public void setOptimizedAckScheduledAckInterval(long optimizedAckScheduledAckInterval) {
2571        this.optimizedAckScheduledAckInterval = optimizedAckScheduledAckInterval;
2572    }
2573
2574    /**
2575     * @return true if MessageConsumer instance will check for expired messages before dispatch.
2576     */
2577    public boolean isConsumerExpiryCheckEnabled() {
2578        return consumerExpiryCheckEnabled;
2579    }
2580
2581    /**
2582     * Controls whether message expiration checking is done in each MessageConsumer
2583     * prior to dispatching a message.  Disabling this check can lead to consumption
2584     * of expired messages.
2585     *
2586     * @param consumerExpiryCheckEnabled
2587     *        controls whether expiration checking is done prior to dispatch.
2588     */
2589    public void setConsumerExpiryCheckEnabled(boolean consumerExpiryCheckEnabled) {
2590        this.consumerExpiryCheckEnabled = consumerExpiryCheckEnabled;
2591    }
2592}