001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker;
018
019import java.io.EOFException;
020import java.io.IOException;
021import java.net.SocketException;
022import java.net.URI;
023import java.util.Collection;
024import java.util.HashMap;
025import java.util.Iterator;
026import java.util.LinkedList;
027import java.util.List;
028import java.util.Map;
029import java.util.Properties;
030import java.util.concurrent.ConcurrentHashMap;
031import java.util.concurrent.CopyOnWriteArrayList;
032import java.util.concurrent.CountDownLatch;
033import java.util.concurrent.TimeUnit;
034import java.util.concurrent.atomic.AtomicBoolean;
035import java.util.concurrent.atomic.AtomicInteger;
036import java.util.concurrent.atomic.AtomicReference;
037import java.util.concurrent.locks.ReentrantReadWriteLock;
038
039import javax.transaction.xa.XAResource;
040
041import org.apache.activemq.advisory.AdvisorySupport;
042import org.apache.activemq.broker.region.ConnectionStatistics;
043import org.apache.activemq.broker.region.RegionBroker;
044import org.apache.activemq.command.ActiveMQDestination;
045import org.apache.activemq.command.BrokerInfo;
046import org.apache.activemq.command.Command;
047import org.apache.activemq.command.CommandTypes;
048import org.apache.activemq.command.ConnectionControl;
049import org.apache.activemq.command.ConnectionError;
050import org.apache.activemq.command.ConnectionId;
051import org.apache.activemq.command.ConnectionInfo;
052import org.apache.activemq.command.ConsumerControl;
053import org.apache.activemq.command.ConsumerId;
054import org.apache.activemq.command.ConsumerInfo;
055import org.apache.activemq.command.ControlCommand;
056import org.apache.activemq.command.DataArrayResponse;
057import org.apache.activemq.command.DestinationInfo;
058import org.apache.activemq.command.ExceptionResponse;
059import org.apache.activemq.command.FlushCommand;
060import org.apache.activemq.command.IntegerResponse;
061import org.apache.activemq.command.KeepAliveInfo;
062import org.apache.activemq.command.Message;
063import org.apache.activemq.command.MessageAck;
064import org.apache.activemq.command.MessageDispatch;
065import org.apache.activemq.command.MessageDispatchNotification;
066import org.apache.activemq.command.MessagePull;
067import org.apache.activemq.command.ProducerAck;
068import org.apache.activemq.command.ProducerId;
069import org.apache.activemq.command.ProducerInfo;
070import org.apache.activemq.command.RemoveInfo;
071import org.apache.activemq.command.RemoveSubscriptionInfo;
072import org.apache.activemq.command.Response;
073import org.apache.activemq.command.SessionId;
074import org.apache.activemq.command.SessionInfo;
075import org.apache.activemq.command.ShutdownInfo;
076import org.apache.activemq.command.TransactionId;
077import org.apache.activemq.command.TransactionInfo;
078import org.apache.activemq.command.WireFormatInfo;
079import org.apache.activemq.network.DemandForwardingBridge;
080import org.apache.activemq.network.MBeanNetworkListener;
081import org.apache.activemq.network.NetworkBridgeConfiguration;
082import org.apache.activemq.network.NetworkBridgeFactory;
083import org.apache.activemq.security.MessageAuthorizationPolicy;
084import org.apache.activemq.state.CommandVisitor;
085import org.apache.activemq.state.ConnectionState;
086import org.apache.activemq.state.ConsumerState;
087import org.apache.activemq.state.ProducerState;
088import org.apache.activemq.state.SessionState;
089import org.apache.activemq.state.TransactionState;
090import org.apache.activemq.thread.Task;
091import org.apache.activemq.thread.TaskRunner;
092import org.apache.activemq.thread.TaskRunnerFactory;
093import org.apache.activemq.transaction.Transaction;
094import org.apache.activemq.transport.DefaultTransportListener;
095import org.apache.activemq.transport.ResponseCorrelator;
096import org.apache.activemq.transport.TransmitCallback;
097import org.apache.activemq.transport.Transport;
098import org.apache.activemq.transport.TransportDisposedIOException;
099import org.apache.activemq.util.IntrospectionSupport;
100import org.apache.activemq.util.MarshallingSupport;
101import org.slf4j.Logger;
102import org.slf4j.LoggerFactory;
103import org.slf4j.MDC;
104
105public class TransportConnection implements Connection, Task, CommandVisitor {
106    private static final Logger LOG = LoggerFactory.getLogger(TransportConnection.class);
107    private static final Logger TRANSPORTLOG = LoggerFactory.getLogger(TransportConnection.class.getName() + ".Transport");
108    private static final Logger SERVICELOG = LoggerFactory.getLogger(TransportConnection.class.getName() + ".Service");
109    // Keeps track of the broker and connector that created this connection.
110    protected final Broker broker;
111    protected final TransportConnector connector;
112    // Keeps track of the state of the connections.
113    // protected final ConcurrentHashMap localConnectionStates=new
114    // ConcurrentHashMap();
115    protected final Map<ConnectionId, ConnectionState> brokerConnectionStates;
116    // The broker and wireformat info that was exchanged.
117    protected BrokerInfo brokerInfo;
118    protected final List<Command> dispatchQueue = new LinkedList<Command>();
119    protected TaskRunner taskRunner;
120    protected final AtomicReference<Throwable> transportException = new AtomicReference<Throwable>();
121    protected AtomicBoolean dispatchStopped = new AtomicBoolean(false);
122    private final Transport transport;
123    private MessageAuthorizationPolicy messageAuthorizationPolicy;
124    private WireFormatInfo wireFormatInfo;
125    // Used to do async dispatch.. this should perhaps be pushed down into the
126    // transport layer..
127    private boolean inServiceException;
128    private final ConnectionStatistics statistics = new ConnectionStatistics();
129    private boolean manageable;
130    private boolean slow;
131    private boolean markedCandidate;
132    private boolean blockedCandidate;
133    private boolean blocked;
134    private boolean connected;
135    private boolean active;
136    private boolean starting;
137    private boolean pendingStop;
138    private long timeStamp;
139    private final AtomicBoolean stopping = new AtomicBoolean(false);
140    private final CountDownLatch stopped = new CountDownLatch(1);
141    private final AtomicBoolean asyncException = new AtomicBoolean(false);
142    private final Map<ProducerId, ProducerBrokerExchange> producerExchanges = new HashMap<ProducerId, ProducerBrokerExchange>();
143    private final Map<ConsumerId, ConsumerBrokerExchange> consumerExchanges = new HashMap<ConsumerId, ConsumerBrokerExchange>();
144    private final CountDownLatch dispatchStoppedLatch = new CountDownLatch(1);
145    private ConnectionContext context;
146    private boolean networkConnection;
147    private boolean faultTolerantConnection;
148    private final AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
149    private DemandForwardingBridge duplexBridge;
150    private final TaskRunnerFactory taskRunnerFactory;
151    private final TaskRunnerFactory stopTaskRunnerFactory;
152    private TransportConnectionStateRegister connectionStateRegister = new SingleTransportConnectionStateRegister();
153    private final ReentrantReadWriteLock serviceLock = new ReentrantReadWriteLock();
154    private String duplexNetworkConnectorId;
155
156    /**
157     * @param taskRunnerFactory - can be null if you want direct dispatch to the transport
158     *                          else commands are sent async.
159     * @param stopTaskRunnerFactory - can <b>not</b> be null, used for stopping this connection.
160     */
161    public TransportConnection(TransportConnector connector, final Transport transport, Broker broker,
162                               TaskRunnerFactory taskRunnerFactory, TaskRunnerFactory stopTaskRunnerFactory) {
163        this.connector = connector;
164        this.broker = broker;
165        RegionBroker rb = (RegionBroker) broker.getAdaptor(RegionBroker.class);
166        brokerConnectionStates = rb.getConnectionStates();
167        if (connector != null) {
168            this.statistics.setParent(connector.getStatistics());
169            this.messageAuthorizationPolicy = connector.getMessageAuthorizationPolicy();
170        }
171        this.taskRunnerFactory = taskRunnerFactory;
172        this.stopTaskRunnerFactory = stopTaskRunnerFactory;
173        this.transport = transport;
174        final BrokerService brokerService = this.broker.getBrokerService();
175        if( this.transport instanceof BrokerServiceAware ) {
176            ((BrokerServiceAware)this.transport).setBrokerService(brokerService);
177        }
178        this.transport.setTransportListener(new DefaultTransportListener() {
179            @Override
180            public void onCommand(Object o) {
181                serviceLock.readLock().lock();
182                try {
183                    if (!(o instanceof Command)) {
184                        throw new RuntimeException("Protocol violation - Command corrupted: " + o.toString());
185                    }
186                    Command command = (Command) o;
187                    if (!brokerService.isStopping()) {
188                        Response response = service(command);
189                        if (response != null && !brokerService.isStopping()) {
190                            dispatchSync(response);
191                        }
192                    } else {
193                        throw new BrokerStoppedException("Broker " + brokerService + " is being stopped");
194                    }
195                } finally {
196                    serviceLock.readLock().unlock();
197                }
198            }
199
200            @Override
201            public void onException(IOException exception) {
202                serviceLock.readLock().lock();
203                try {
204                    serviceTransportException(exception);
205                } finally {
206                    serviceLock.readLock().unlock();
207                }
208            }
209        });
210        connected = true;
211    }
212
213    /**
214     * Returns the number of messages to be dispatched to this connection
215     *
216     * @return size of dispatch queue
217     */
218    @Override
219    public int getDispatchQueueSize() {
220        synchronized (dispatchQueue) {
221            return dispatchQueue.size();
222        }
223    }
224
225    public void serviceTransportException(IOException e) {
226        BrokerService bService = connector.getBrokerService();
227        if (bService.isShutdownOnSlaveFailure()) {
228            if (brokerInfo != null) {
229                if (brokerInfo.isSlaveBroker()) {
230                    LOG.error("Slave has exception: {} shutting down master now.", e.getMessage(), e);
231                    try {
232                        doStop();
233                        bService.stop();
234                    } catch (Exception ex) {
235                        LOG.warn("Failed to stop the master", ex);
236                    }
237                }
238            }
239        }
240        if (!stopping.get() && !pendingStop) {
241            transportException.set(e);
242            if (TRANSPORTLOG.isDebugEnabled()) {
243                TRANSPORTLOG.debug(this + " failed: " + e, e);
244            } else if (TRANSPORTLOG.isWarnEnabled() && !expected(e)) {
245                TRANSPORTLOG.warn(this + " failed: " + e);
246            }
247            stopAsync(e);
248        }
249    }
250
251    private boolean expected(IOException e) {
252        return isStomp() && ((e instanceof SocketException && e.getMessage().indexOf("reset") != -1) || e instanceof EOFException);
253    }
254
255    private boolean isStomp() {
256        URI uri = connector.getUri();
257        return uri != null && uri.getScheme() != null && uri.getScheme().indexOf("stomp") != -1;
258    }
259
260    /**
261     * Calls the serviceException method in an async thread. Since handling a
262     * service exception closes a socket, we should not tie up broker threads
263     * since client sockets may hang or cause deadlocks.
264     */
265    @Override
266    public void serviceExceptionAsync(final IOException e) {
267        if (asyncException.compareAndSet(false, true)) {
268            new Thread("Async Exception Handler") {
269                @Override
270                public void run() {
271                    serviceException(e);
272                }
273            }.start();
274        }
275    }
276
277    /**
278     * Closes a clients connection due to a detected error. Errors are ignored
279     * if: the client is closing or broker is closing. Otherwise, the connection
280     * error transmitted to the client before stopping it's transport.
281     */
282    @Override
283    public void serviceException(Throwable e) {
284        // are we a transport exception such as not being able to dispatch
285        // synchronously to a transport
286        if (e instanceof IOException) {
287            serviceTransportException((IOException) e);
288        } else if (e.getClass() == BrokerStoppedException.class) {
289            // Handle the case where the broker is stopped
290            // But the client is still connected.
291            if (!stopping.get()) {
292                SERVICELOG.debug("Broker has been stopped.  Notifying client and closing his connection.");
293                ConnectionError ce = new ConnectionError();
294                ce.setException(e);
295                dispatchSync(ce);
296                // Record the error that caused the transport to stop
297                transportException.set(e);
298                // Wait a little bit to try to get the output buffer to flush
299                // the exception notification to the client.
300                try {
301                    Thread.sleep(500);
302                } catch (InterruptedException ie) {
303                    Thread.currentThread().interrupt();
304                }
305                // Worst case is we just kill the connection before the
306                // notification gets to him.
307                stopAsync();
308            }
309        } else if (!stopping.get() && !inServiceException) {
310            inServiceException = true;
311            try {
312                SERVICELOG.warn("Async error occurred: ", e);
313                ConnectionError ce = new ConnectionError();
314                ce.setException(e);
315                if (pendingStop) {
316                    dispatchSync(ce);
317                } else {
318                    dispatchAsync(ce);
319                }
320            } finally {
321                inServiceException = false;
322            }
323        }
324    }
325
326    @Override
327    public Response service(Command command) {
328        MDC.put("activemq.connector", connector.getUri().toString());
329        Response response = null;
330        boolean responseRequired = command.isResponseRequired();
331        int commandId = command.getCommandId();
332        try {
333            if (!pendingStop) {
334                response = command.visit(this);
335            } else {
336                response = new ExceptionResponse(transportException.get());
337            }
338        } catch (Throwable e) {
339            if (SERVICELOG.isDebugEnabled() && e.getClass() != BrokerStoppedException.class) {
340                SERVICELOG.debug("Error occured while processing " + (responseRequired ? "sync" : "async")
341                        + " command: " + command + ", exception: " + e, e);
342            }
343
344            if (e instanceof SuppressReplyException || (e.getCause() instanceof SuppressReplyException)) {
345                LOG.info("Suppressing reply to: " + command + " on: " + e + ", cause: " + e.getCause());
346                responseRequired = false;
347            }
348
349            if (responseRequired) {
350                if (e instanceof SecurityException || e.getCause() instanceof SecurityException) {
351                    SERVICELOG.warn("Security Error occurred on connection to: {}, {}",
352                            transport.getRemoteAddress(), e.getMessage());
353                }
354                response = new ExceptionResponse(e);
355            } else {
356                serviceException(e);
357            }
358        }
359        if (responseRequired) {
360            if (response == null) {
361                response = new Response();
362            }
363            response.setCorrelationId(commandId);
364        }
365        // The context may have been flagged so that the response is not
366        // sent.
367        if (context != null) {
368            if (context.isDontSendReponse()) {
369                context.setDontSendReponse(false);
370                response = null;
371            }
372            context = null;
373        }
374        MDC.remove("activemq.connector");
375        return response;
376    }
377
378    @Override
379    public Response processKeepAlive(KeepAliveInfo info) throws Exception {
380        return null;
381    }
382
383    @Override
384    public Response processRemoveSubscription(RemoveSubscriptionInfo info) throws Exception {
385        broker.removeSubscription(lookupConnectionState(info.getConnectionId()).getContext(), info);
386        return null;
387    }
388
389    @Override
390    public Response processWireFormat(WireFormatInfo info) throws Exception {
391        wireFormatInfo = info;
392        protocolVersion.set(info.getVersion());
393        return null;
394    }
395
396    @Override
397    public Response processShutdown(ShutdownInfo info) throws Exception {
398        stopAsync();
399        return null;
400    }
401
402    @Override
403    public Response processFlush(FlushCommand command) throws Exception {
404        return null;
405    }
406
407    @Override
408    public Response processBeginTransaction(TransactionInfo info) throws Exception {
409        TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
410        context = null;
411        if (cs != null) {
412            context = cs.getContext();
413        }
414        if (cs == null) {
415            throw new NullPointerException("Context is null");
416        }
417        // Avoid replaying dup commands
418        if (cs.getTransactionState(info.getTransactionId()) == null) {
419            cs.addTransactionState(info.getTransactionId());
420            broker.beginTransaction(context, info.getTransactionId());
421        }
422        return null;
423    }
424
425    @Override
426    public int getActiveTransactionCount() {
427        int rc = 0;
428        for (TransportConnectionState cs : connectionStateRegister.listConnectionStates()) {
429            Collection<TransactionState> transactions = cs.getTransactionStates();
430            for (TransactionState transaction : transactions) {
431                rc++;
432            }
433        }
434        return rc;
435    }
436
437    @Override
438    public Long getOldestActiveTransactionDuration() {
439        TransactionState oldestTX = null;
440        for (TransportConnectionState cs : connectionStateRegister.listConnectionStates()) {
441            Collection<TransactionState> transactions = cs.getTransactionStates();
442            for (TransactionState transaction : transactions) {
443                if( oldestTX ==null || oldestTX.getCreatedAt() < transaction.getCreatedAt() ) {
444                    oldestTX = transaction;
445                }
446            }
447        }
448        if( oldestTX == null ) {
449            return null;
450        }
451        return System.currentTimeMillis() - oldestTX.getCreatedAt();
452    }
453
454    @Override
455    public Response processEndTransaction(TransactionInfo info) throws Exception {
456        // No need to do anything. This packet is just sent by the client
457        // make sure he is synced with the server as commit command could
458        // come from a different connection.
459        return null;
460    }
461
462    @Override
463    public Response processPrepareTransaction(TransactionInfo info) throws Exception {
464        TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
465        context = null;
466        if (cs != null) {
467            context = cs.getContext();
468        }
469        if (cs == null) {
470            throw new NullPointerException("Context is null");
471        }
472        TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
473        if (transactionState == null) {
474            throw new IllegalStateException("Cannot prepare a transaction that had not been started or previously returned XA_RDONLY: "
475                    + info.getTransactionId());
476        }
477        // Avoid dups.
478        if (!transactionState.isPrepared()) {
479            transactionState.setPrepared(true);
480            int result = broker.prepareTransaction(context, info.getTransactionId());
481            transactionState.setPreparedResult(result);
482            if (result == XAResource.XA_RDONLY) {
483                // we are done, no further rollback or commit from TM
484                cs.removeTransactionState(info.getTransactionId());
485            }
486            IntegerResponse response = new IntegerResponse(result);
487            return response;
488        } else {
489            IntegerResponse response = new IntegerResponse(transactionState.getPreparedResult());
490            return response;
491        }
492    }
493
494    @Override
495    public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
496        TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
497        context = cs.getContext();
498        cs.removeTransactionState(info.getTransactionId());
499        broker.commitTransaction(context, info.getTransactionId(), true);
500        return null;
501    }
502
503    @Override
504    public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
505        TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
506        context = cs.getContext();
507        cs.removeTransactionState(info.getTransactionId());
508        broker.commitTransaction(context, info.getTransactionId(), false);
509        return null;
510    }
511
512    @Override
513    public Response processRollbackTransaction(TransactionInfo info) throws Exception {
514        TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
515        context = cs.getContext();
516        cs.removeTransactionState(info.getTransactionId());
517        broker.rollbackTransaction(context, info.getTransactionId());
518        return null;
519    }
520
521    @Override
522    public Response processForgetTransaction(TransactionInfo info) throws Exception {
523        TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
524        context = cs.getContext();
525        broker.forgetTransaction(context, info.getTransactionId());
526        return null;
527    }
528
529    @Override
530    public Response processRecoverTransactions(TransactionInfo info) throws Exception {
531        TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
532        context = cs.getContext();
533        TransactionId[] preparedTransactions = broker.getPreparedTransactions(context);
534        return new DataArrayResponse(preparedTransactions);
535    }
536
537    @Override
538    public Response processMessage(Message messageSend) throws Exception {
539        ProducerId producerId = messageSend.getProducerId();
540        ProducerBrokerExchange producerExchange = getProducerBrokerExchange(producerId);
541        if (producerExchange.canDispatch(messageSend)) {
542            broker.send(producerExchange, messageSend);
543        }
544        return null;
545    }
546
547    @Override
548    public Response processMessageAck(MessageAck ack) throws Exception {
549        ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(ack.getConsumerId());
550        if (consumerExchange != null) {
551            broker.acknowledge(consumerExchange, ack);
552        } else if (ack.isInTransaction()) {
553            LOG.warn("no matching consumer, ignoring ack {}", consumerExchange, ack);
554        }
555        return null;
556    }
557
558    @Override
559    public Response processMessagePull(MessagePull pull) throws Exception {
560        return broker.messagePull(lookupConnectionState(pull.getConsumerId()).getContext(), pull);
561    }
562
563    @Override
564    public Response processMessageDispatchNotification(MessageDispatchNotification notification) throws Exception {
565        broker.processDispatchNotification(notification);
566        return null;
567    }
568
569    @Override
570    public Response processAddDestination(DestinationInfo info) throws Exception {
571        TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
572        broker.addDestinationInfo(cs.getContext(), info);
573        if (info.getDestination().isTemporary()) {
574            cs.addTempDestination(info);
575        }
576        return null;
577    }
578
579    @Override
580    public Response processRemoveDestination(DestinationInfo info) throws Exception {
581        TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
582        broker.removeDestinationInfo(cs.getContext(), info);
583        if (info.getDestination().isTemporary()) {
584            cs.removeTempDestination(info.getDestination());
585        }
586        return null;
587    }
588
589    @Override
590    public Response processAddProducer(ProducerInfo info) throws Exception {
591        SessionId sessionId = info.getProducerId().getParentId();
592        ConnectionId connectionId = sessionId.getParentId();
593        TransportConnectionState cs = lookupConnectionState(connectionId);
594        if (cs == null) {
595            throw new IllegalStateException("Cannot add a producer to a connection that had not been registered: "
596                    + connectionId);
597        }
598        SessionState ss = cs.getSessionState(sessionId);
599        if (ss == null) {
600            throw new IllegalStateException("Cannot add a producer to a session that had not been registered: "
601                    + sessionId);
602        }
603        // Avoid replaying dup commands
604        if (!ss.getProducerIds().contains(info.getProducerId())) {
605            ActiveMQDestination destination = info.getDestination();
606            // Do not check for null here as it would cause the count of max producers to exclude
607            // anonymous producers.  The isAdvisoryTopic method checks for null so it is safe to
608            // call it from here with a null Destination value.
609            if (!AdvisorySupport.isAdvisoryTopic(destination)) {
610                if (getProducerCount(connectionId) >= connector.getMaximumProducersAllowedPerConnection()){
611                    throw new IllegalStateException("Can't add producer on connection " + connectionId + ": at maximum limit: " + connector.getMaximumProducersAllowedPerConnection());
612                }
613            }
614            broker.addProducer(cs.getContext(), info);
615            try {
616                ss.addProducer(info);
617            } catch (IllegalStateException e) {
618                broker.removeProducer(cs.getContext(), info);
619            }
620
621        }
622        return null;
623    }
624
625    @Override
626    public Response processRemoveProducer(ProducerId id) throws Exception {
627        SessionId sessionId = id.getParentId();
628        ConnectionId connectionId = sessionId.getParentId();
629        TransportConnectionState cs = lookupConnectionState(connectionId);
630        SessionState ss = cs.getSessionState(sessionId);
631        if (ss == null) {
632            throw new IllegalStateException("Cannot remove a producer from a session that had not been registered: "
633                    + sessionId);
634        }
635        ProducerState ps = ss.removeProducer(id);
636        if (ps == null) {
637            throw new IllegalStateException("Cannot remove a producer that had not been registered: " + id);
638        }
639        removeProducerBrokerExchange(id);
640        broker.removeProducer(cs.getContext(), ps.getInfo());
641        return null;
642    }
643
644    @Override
645    public Response processAddConsumer(ConsumerInfo info) throws Exception {
646        SessionId sessionId = info.getConsumerId().getParentId();
647        ConnectionId connectionId = sessionId.getParentId();
648        TransportConnectionState cs = lookupConnectionState(connectionId);
649        if (cs == null) {
650            throw new IllegalStateException("Cannot add a consumer to a connection that had not been registered: "
651                    + connectionId);
652        }
653        SessionState ss = cs.getSessionState(sessionId);
654        if (ss == null) {
655            throw new IllegalStateException(broker.getBrokerName()
656                    + " Cannot add a consumer to a session that had not been registered: " + sessionId);
657        }
658        // Avoid replaying dup commands
659        if (!ss.getConsumerIds().contains(info.getConsumerId())) {
660            ActiveMQDestination destination = info.getDestination();
661            if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination)) {
662                if (getConsumerCount(connectionId) >= connector.getMaximumConsumersAllowedPerConnection()){
663                    throw new IllegalStateException("Can't add consumer on connection " + connectionId + ": at maximum limit: " + connector.getMaximumConsumersAllowedPerConnection());
664                }
665            }
666
667            broker.addConsumer(cs.getContext(), info);
668            try {
669                ss.addConsumer(info);
670                addConsumerBrokerExchange(info.getConsumerId());
671            } catch (IllegalStateException e) {
672                broker.removeConsumer(cs.getContext(), info);
673            }
674
675        }
676        return null;
677    }
678
679    @Override
680    public Response processRemoveConsumer(ConsumerId id, long lastDeliveredSequenceId) throws Exception {
681        SessionId sessionId = id.getParentId();
682        ConnectionId connectionId = sessionId.getParentId();
683        TransportConnectionState cs = lookupConnectionState(connectionId);
684        if (cs == null) {
685            throw new IllegalStateException("Cannot remove a consumer from a connection that had not been registered: "
686                    + connectionId);
687        }
688        SessionState ss = cs.getSessionState(sessionId);
689        if (ss == null) {
690            throw new IllegalStateException("Cannot remove a consumer from a session that had not been registered: "
691                    + sessionId);
692        }
693        ConsumerState consumerState = ss.removeConsumer(id);
694        if (consumerState == null) {
695            throw new IllegalStateException("Cannot remove a consumer that had not been registered: " + id);
696        }
697        ConsumerInfo info = consumerState.getInfo();
698        info.setLastDeliveredSequenceId(lastDeliveredSequenceId);
699        broker.removeConsumer(cs.getContext(), consumerState.getInfo());
700        removeConsumerBrokerExchange(id);
701        return null;
702    }
703
704    @Override
705    public Response processAddSession(SessionInfo info) throws Exception {
706        ConnectionId connectionId = info.getSessionId().getParentId();
707        TransportConnectionState cs = lookupConnectionState(connectionId);
708        // Avoid replaying dup commands
709        if (cs != null && !cs.getSessionIds().contains(info.getSessionId())) {
710            broker.addSession(cs.getContext(), info);
711            try {
712                cs.addSession(info);
713            } catch (IllegalStateException e) {
714                e.printStackTrace();
715                broker.removeSession(cs.getContext(), info);
716            }
717        }
718        return null;
719    }
720
721    @Override
722    public Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) throws Exception {
723        ConnectionId connectionId = id.getParentId();
724        TransportConnectionState cs = lookupConnectionState(connectionId);
725        if (cs == null) {
726            throw new IllegalStateException("Cannot remove session from connection that had not been registered: " + connectionId);
727        }
728        SessionState session = cs.getSessionState(id);
729        if (session == null) {
730            throw new IllegalStateException("Cannot remove session that had not been registered: " + id);
731        }
732        // Don't let new consumers or producers get added while we are closing
733        // this down.
734        session.shutdown();
735        // Cascade the connection stop to the consumers and producers.
736        for (ConsumerId consumerId : session.getConsumerIds()) {
737            try {
738                processRemoveConsumer(consumerId, lastDeliveredSequenceId);
739            } catch (Throwable e) {
740                LOG.warn("Failed to remove consumer: {}", consumerId, e);
741            }
742        }
743        for (ProducerId producerId : session.getProducerIds()) {
744            try {
745                processRemoveProducer(producerId);
746            } catch (Throwable e) {
747                LOG.warn("Failed to remove producer: {}", producerId, e);
748            }
749        }
750        cs.removeSession(id);
751        broker.removeSession(cs.getContext(), session.getInfo());
752        return null;
753    }
754
    /**
     * Registers a logical connection on this transport connection and adds it
     * to the broker.
     * <p>
     * The method obtains (or creates) the shared state object for the
     * connection id, takes the state over from any other connection currently
     * holding it, builds a fresh {@link ConnectionContext} and calls
     * {@code broker.addConnection}. If the broker rejects the connection the
     * registrations made here are undone and the exception is rethrown. For a
     * manageable connection a {@link ConnectionControl} command is dispatched
     * asynchronously afterwards.
     *
     * @param info the connection to add; its client-master flag and client IP
     *             may be filled in by this method
     * @return always {@code null}
     * @throws Exception the exception thrown by {@code broker.addConnection},
     *             or anything thrown while stopping a previous connection
     */
    @Override
    public Response processAddConnection(ConnectionInfo info) throws Exception {
        // Older clients should have been defaulting this field to true.. but
        // they were not.
        if (wireFormatInfo != null && wireFormatInfo.getVersion() <= 2) {
            info.setClientMaster(true);
        }
        TransportConnectionState state;
        // Make sure 2 concurrent connections by the same ID only generate 1
        // TransportConnectionState object.
        synchronized (brokerConnectionStates) {
            state = (TransportConnectionState) brokerConnectionStates.get(info.getConnectionId());
            if (state == null) {
                state = new TransportConnectionState(info, this);
                brokerConnectionStates.put(info.getConnectionId(), state);
            }
            // Every add takes a reference; processRemoveConnection releases it.
            state.incrementReference();
        }
        // If there are 2 concurrent connections for the same connection id,
        // then last one in wins, we need to sync here
        // to figure out the winner.
        synchronized (state.getConnectionMutex()) {
            if (state.getConnection() != this) {
                LOG.debug("Killing previous stale connection: {}", state.getConnection().getRemoteAddress());
                // stop() blocks until the previous connection has finished stopping.
                state.getConnection().stop();
                LOG.debug("Connection {} taking over previous connection: {}", getRemoteAddress(), state.getConnection().getRemoteAddress());
                state.setConnection(this);
                state.reset(info);
            }
        }
        registerConnectionState(info.getConnectionId(), state);
        LOG.debug("Setting up new connection id: {}, address: {}, info: {}", new Object[]{ info.getConnectionId(), getRemoteAddress(), info });
        this.faultTolerantConnection = info.isFaultTolerant();
        // Setup the context.
        String clientId = info.getClientId();
        context = new ConnectionContext();
        context.setBroker(broker);
        context.setClientId(clientId);
        context.setClientMaster(info.isClientMaster());
        context.setConnection(this);
        context.setConnectionId(info.getConnectionId());
        context.setConnector(connector);
        context.setMessageAuthorizationPolicy(getMessageAuthorizationPolicy());
        context.setNetworkConnection(networkConnection);
        context.setFaultTolerant(faultTolerantConnection);
        context.setTransactions(new ConcurrentHashMap<TransactionId, Transaction>());
        context.setUserName(info.getUserName());
        context.setWireFormatInfo(wireFormatInfo);
        context.setReconnect(info.isFailoverReconnect());
        this.manageable = info.isManageable();
        // Link the state and the context to each other. Note that the state was
        // already registered above, so it is briefly visible without a context.
        context.setConnectionState(state);
        state.setContext(context);
        state.setConnection(this);
        // Fall back to the transport's remote address when the client sent no IP.
        if (info.getClientIp() == null) {
            info.setClientIp(getRemoteAddress());
        }

        try {
            broker.addConnection(context, info);
        } catch (Exception e) {
            // Undo both registrations made above before reporting the failure.
            synchronized (brokerConnectionStates) {
                brokerConnectionStates.remove(info.getConnectionId());
            }
            unregisterConnectionState(info.getConnectionId());
            LOG.warn("Failed to add Connection {} due to {}", info.getConnectionId(), e);
            if (e instanceof SecurityException) {
                // close this down - in case the peer of this transport doesn't play nice
                delayedStop(2000, "Failed with SecurityException: " + e.getLocalizedMessage(), e);
            }
            throw e;
        }
        if (info.isManageable()) {
            // send ConnectionCommand
            ConnectionControl command = this.connector.getConnectionControl();
            command.setFaultTolerant(broker.isFaultTolerantConfiguration());
            if (info.isFailoverReconnect()) {
                // A failover reconnect is explicitly told not to rebalance.
                command.setRebalanceConnection(false);
            }
            dispatchAsync(command);
        }
        return null;
    }
837
838    @Override
839    public synchronized Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId)
840            throws InterruptedException {
841        LOG.debug("remove connection id: {}", id);
842        TransportConnectionState cs = lookupConnectionState(id);
843        if (cs != null) {
844            // Don't allow things to be added to the connection state while we
845            // are shutting down.
846            cs.shutdown();
847            // Cascade the connection stop to the sessions.
848            for (SessionId sessionId : cs.getSessionIds()) {
849                try {
850                    processRemoveSession(sessionId, lastDeliveredSequenceId);
851                } catch (Throwable e) {
852                    SERVICELOG.warn("Failed to remove session {}", sessionId, e);
853                }
854            }
855            // Cascade the connection stop to temp destinations.
856            for (Iterator<DestinationInfo> iter = cs.getTempDestinations().iterator(); iter.hasNext(); ) {
857                DestinationInfo di = iter.next();
858                try {
859                    broker.removeDestination(cs.getContext(), di.getDestination(), 0);
860                } catch (Throwable e) {
861                    SERVICELOG.warn("Failed to remove tmp destination {}", di.getDestination(), e);
862                }
863                iter.remove();
864            }
865            try {
866                broker.removeConnection(cs.getContext(), cs.getInfo(), transportException.get());
867            } catch (Throwable e) {
868                SERVICELOG.warn("Failed to remove connection {}", cs.getInfo(), e);
869            }
870            TransportConnectionState state = unregisterConnectionState(id);
871            if (state != null) {
872                synchronized (brokerConnectionStates) {
873                    // If we are the last reference, we should remove the state
874                    // from the broker.
875                    if (state.decrementReference() == 0) {
876                        brokerConnectionStates.remove(id);
877                    }
878                }
879            }
880        }
881        return null;
882    }
883
    /**
     * Ignores the given producer ack.
     *
     * @param ack the received ack; not inspected
     * @return always {@code null}
     */
    @Override
    public Response processProducerAck(ProducerAck ack) throws Exception {
        // A broker should not get ProducerAck messages.
        return null;
    }
889
890    @Override
891    public Connector getConnector() {
892        return connector;
893    }
894
895    @Override
896    public void dispatchSync(Command message) {
897        try {
898            processDispatch(message);
899        } catch (IOException e) {
900            serviceExceptionAsync(e);
901        }
902    }
903
904    @Override
905    public void dispatchAsync(Command message) {
906        if (!stopping.get()) {
907            if (taskRunner == null) {
908                dispatchSync(message);
909            } else {
910                synchronized (dispatchQueue) {
911                    dispatchQueue.add(message);
912                }
913                try {
914                    taskRunner.wakeup();
915                } catch (InterruptedException e) {
916                    Thread.currentThread().interrupt();
917                }
918            }
919        } else {
920            if (message.isMessageDispatch()) {
921                MessageDispatch md = (MessageDispatch) message;
922                TransmitCallback sub = md.getTransmitCallback();
923                broker.postProcessDispatch(md);
924                if (sub != null) {
925                    sub.onFailure();
926                }
927            }
928        }
929    }
930
931    protected void processDispatch(Command command) throws IOException {
932        MessageDispatch messageDispatch = (MessageDispatch) (command.isMessageDispatch() ? command : null);
933        try {
934            if (!stopping.get()) {
935                if (messageDispatch != null) {
936                    try {
937                        broker.preProcessDispatch(messageDispatch);
938                    } catch (RuntimeException convertToIO) {
939                        throw new IOException(convertToIO);
940                    }
941                }
942                dispatch(command);
943            }
944        } catch (IOException e) {
945            if (messageDispatch != null) {
946                TransmitCallback sub = messageDispatch.getTransmitCallback();
947                broker.postProcessDispatch(messageDispatch);
948                if (sub != null) {
949                    sub.onFailure();
950                }
951                messageDispatch = null;
952                throw e;
953            }
954        } finally {
955            if (messageDispatch != null) {
956                TransmitCallback sub = messageDispatch.getTransmitCallback();
957                broker.postProcessDispatch(messageDispatch);
958                if (sub != null) {
959                    sub.onSuccess();
960                }
961            }
962        }
963    }
964
965    @Override
966    public boolean iterate() {
967        try {
968            if (pendingStop || stopping.get()) {
969                if (dispatchStopped.compareAndSet(false, true)) {
970                    if (transportException.get() == null) {
971                        try {
972                            dispatch(new ShutdownInfo());
973                        } catch (Throwable ignore) {
974                        }
975                    }
976                    dispatchStoppedLatch.countDown();
977                }
978                return false;
979            }
980            if (!dispatchStopped.get()) {
981                Command command = null;
982                synchronized (dispatchQueue) {
983                    if (dispatchQueue.isEmpty()) {
984                        return false;
985                    }
986                    command = dispatchQueue.remove(0);
987                }
988                processDispatch(command);
989                return true;
990            }
991            return false;
992        } catch (IOException e) {
993            if (dispatchStopped.compareAndSet(false, true)) {
994                dispatchStoppedLatch.countDown();
995            }
996            serviceExceptionAsync(e);
997            return false;
998        }
999    }
1000
1001    /**
1002     * Returns the statistics for this connection
1003     */
1004    @Override
1005    public ConnectionStatistics getStatistics() {
1006        return statistics;
1007    }
1008
1009    public MessageAuthorizationPolicy getMessageAuthorizationPolicy() {
1010        return messageAuthorizationPolicy;
1011    }
1012
    /**
     * @param messageAuthorizationPolicy the policy to store on this connection
     */
    public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) {
        this.messageAuthorizationPolicy = messageAuthorizationPolicy;
    }
1016
1017    @Override
1018    public boolean isManageable() {
1019        return manageable;
1020    }
1021
    /**
     * Starts this connection: creates the dispatch task runner (when a factory
     * is configured), starts the transport, queues the connector's broker info
     * for the peer and notifies the connector.
     * <p>
     * A stop requested while starting is deferred; it is executed at the end
     * of this method, as is the clean up after a failed start.
     *
     * @throws Exception if starting failed, or if the deferred stop failed
     */
    @Override
    public void start() throws Exception {
        try {
            synchronized (this) {
                // While this flag is set, stopAsync() only records the request.
                starting = true;
                if (taskRunnerFactory != null) {
                    taskRunner = taskRunnerFactory.createTaskRunner(this, "ActiveMQ Connection Dispatcher: "
                            + getRemoteAddress());
                } else {
                    // Without a task runner dispatchAsync() sends on the caller's thread.
                    taskRunner = null;
                }
                transport.start();
                active = true;
                // Work on a copy so the connector's own broker info is not modified.
                BrokerInfo info = connector.getBrokerInfo().copy();
                if (connector.isUpdateClusterClients()) {
                    info.setPeerBrokerInfos(this.broker.getPeerBrokerInfos());
                } else {
                    info.setPeerBrokerInfos(null);
                }
                dispatchAsync(info);

                connector.onStarted(this);
            }
        } catch (Exception e) {
            // Force clean up on an error starting up.
            pendingStop = true;
            throw e;
        } finally {
            // stop() can be called from within the above block,
            // but we want to be sure start() completes before
            // stop() runs, so queue the stop until right now:
            setStarting(false);
            if (isPendingStop()) {
                LOG.debug("Calling the delayed stop() after start() {}", this);
                stop();
            }
        }
    }
1060
1061    @Override
1062    public void stop() throws Exception {
1063        // do not stop task the task runner factories (taskRunnerFactory, stopTaskRunnerFactory)
1064        // as their lifecycle is handled elsewhere
1065
1066        stopAsync();
1067        while (!stopped.await(5, TimeUnit.SECONDS)) {
1068            LOG.info("The connection to '{}' is taking a long time to shutdown.", transport.getRemoteAddress());
1069        }
1070    }
1071
1072    public void delayedStop(final int waitTime, final String reason, Throwable cause) {
1073        if (waitTime > 0) {
1074            synchronized (this) {
1075                pendingStop = true;
1076                transportException.set(cause);
1077            }
1078            try {
1079                stopTaskRunnerFactory.execute(new Runnable() {
1080                    @Override
1081                    public void run() {
1082                        try {
1083                            Thread.sleep(waitTime);
1084                            stopAsync();
1085                            LOG.info("Stopping {} because {}", transport.getRemoteAddress(), reason);
1086                        } catch (InterruptedException e) {
1087                        }
1088                    }
1089                });
1090            } catch (Throwable t) {
1091                LOG.warn("Cannot create stopAsync. This exception will be ignored.", t);
1092            }
1093        }
1094    }
1095
1096    public void stopAsync(Throwable cause) {
1097        transportException.set(cause);
1098        stopAsync();
1099    }
1100
1101    public void stopAsync() {
1102        // If we're in the middle of starting then go no further... for now.
1103        synchronized (this) {
1104            pendingStop = true;
1105            if (starting) {
1106                LOG.debug("stopAsync() called in the middle of start(). Delaying till start completes..");
1107                return;
1108            }
1109        }
1110        if (stopping.compareAndSet(false, true)) {
1111            // Let all the connection contexts know we are shutting down
1112            // so that in progress operations can notice and unblock.
1113            List<TransportConnectionState> connectionStates = listConnectionStates();
1114            for (TransportConnectionState cs : connectionStates) {
1115                ConnectionContext connectionContext = cs.getContext();
1116                if (connectionContext != null) {
1117                    connectionContext.getStopping().set(true);
1118                }
1119            }
1120            try {
1121                stopTaskRunnerFactory.execute(new Runnable() {
1122                    @Override
1123                    public void run() {
1124                        serviceLock.writeLock().lock();
1125                        try {
1126                            doStop();
1127                        } catch (Throwable e) {
1128                            LOG.debug("Error occurred while shutting down a connection {}", this, e);
1129                        } finally {
1130                            stopped.countDown();
1131                            serviceLock.writeLock().unlock();
1132                        }
1133                    }
1134                });
1135            } catch (Throwable t) {
1136                LOG.warn("Cannot create async transport stopper thread. This exception is ignored. Not waiting for stop to complete", t);
1137                stopped.countDown();
1138            }
1139        }
1140    }
1141
1142    @Override
1143    public String toString() {
1144        return "Transport Connection to: " + transport.getRemoteAddress();
1145    }
1146
1147    protected void doStop() throws Exception {
1148        LOG.debug("Stopping connection: {}", transport.getRemoteAddress());
1149        connector.onStopped(this);
1150        try {
1151            synchronized (this) {
1152                if (duplexBridge != null) {
1153                    duplexBridge.stop();
1154                }
1155            }
1156        } catch (Exception ignore) {
1157            LOG.trace("Exception caught stopping. This exception is ignored.", ignore);
1158        }
1159        try {
1160            transport.stop();
1161            LOG.debug("Stopped transport: {}", transport.getRemoteAddress());
1162        } catch (Exception e) {
1163            LOG.debug("Could not stop transport to {}. This exception is ignored.", transport.getRemoteAddress(), e);
1164        }
1165        if (taskRunner != null) {
1166            taskRunner.shutdown(1);
1167            taskRunner = null;
1168        }
1169        active = false;
1170        // Run the MessageDispatch callbacks so that message references get
1171        // cleaned up.
1172        synchronized (dispatchQueue) {
1173            for (Iterator<Command> iter = dispatchQueue.iterator(); iter.hasNext(); ) {
1174                Command command = iter.next();
1175                if (command.isMessageDispatch()) {
1176                    MessageDispatch md = (MessageDispatch) command;
1177                    TransmitCallback sub = md.getTransmitCallback();
1178                    broker.postProcessDispatch(md);
1179                    if (sub != null) {
1180                        sub.onFailure();
1181                    }
1182                }
1183            }
1184            dispatchQueue.clear();
1185        }
1186        //
1187        // Remove all logical connection associated with this connection
1188        // from the broker.
1189        if (!broker.isStopped()) {
1190            List<TransportConnectionState> connectionStates = listConnectionStates();
1191            connectionStates = listConnectionStates();
1192            for (TransportConnectionState cs : connectionStates) {
1193                cs.getContext().getStopping().set(true);
1194                try {
1195                    LOG.debug("Cleaning up connection resources: {}", getRemoteAddress());
1196                    processRemoveConnection(cs.getInfo().getConnectionId(), RemoveInfo.LAST_DELIVERED_UNKNOWN);
1197                } catch (Throwable ignore) {
1198                    ignore.printStackTrace();
1199                }
1200            }
1201        }
1202        LOG.debug("Connection Stopped: {}", getRemoteAddress());
1203    }
1204
1205    /**
1206     * @return Returns the blockedCandidate.
1207     */
1208    public boolean isBlockedCandidate() {
1209        return blockedCandidate;
1210    }
1211
1212    /**
1213     * @param blockedCandidate The blockedCandidate to set.
1214     */
1215    public void setBlockedCandidate(boolean blockedCandidate) {
1216        this.blockedCandidate = blockedCandidate;
1217    }
1218
1219    /**
1220     * @return Returns the markedCandidate.
1221     */
1222    public boolean isMarkedCandidate() {
1223        return markedCandidate;
1224    }
1225
1226    /**
1227     * @param markedCandidate The markedCandidate to set.
1228     */
1229    public void setMarkedCandidate(boolean markedCandidate) {
1230        this.markedCandidate = markedCandidate;
1231        if (!markedCandidate) {
1232            timeStamp = 0;
1233            blockedCandidate = false;
1234        }
1235    }
1236
1237    /**
1238     * @param slow The slow to set.
1239     */
1240    public void setSlow(boolean slow) {
1241        this.slow = slow;
1242    }
1243
1244    /**
1245     * @return true if the Connection is slow
1246     */
1247    @Override
1248    public boolean isSlow() {
1249        return slow;
1250    }
1251
1252    /**
1253     * @return true if the Connection is potentially blocked
1254     */
1255    public boolean isMarkedBlockedCandidate() {
1256        return markedCandidate;
1257    }
1258
1259    /**
1260     * Mark the Connection, so we can deem if it's collectable on the next sweep
1261     */
1262    public void doMark() {
1263        if (timeStamp == 0) {
1264            timeStamp = System.currentTimeMillis();
1265        }
1266    }
1267
1268    /**
1269     * @return if after being marked, the Connection is still writing
1270     */
1271    @Override
1272    public boolean isBlocked() {
1273        return blocked;
1274    }
1275
1276    /**
1277     * @return true if the Connection is connected
1278     */
1279    @Override
1280    public boolean isConnected() {
1281        return connected;
1282    }
1283
1284    /**
1285     * @param blocked The blocked to set.
1286     */
1287    public void setBlocked(boolean blocked) {
1288        this.blocked = blocked;
1289    }
1290
1291    /**
1292     * @param connected The connected to set.
1293     */
1294    public void setConnected(boolean connected) {
1295        this.connected = connected;
1296    }
1297
1298    /**
1299     * @return true if the Connection is active
1300     */
1301    @Override
1302    public boolean isActive() {
1303        return active;
1304    }
1305
1306    /**
1307     * @param active The active to set.
1308     */
1309    public void setActive(boolean active) {
1310        this.active = active;
1311    }
1312
1313    /**
1314     * @return true if the Connection is starting
1315     */
1316    public synchronized boolean isStarting() {
1317        return starting;
1318    }
1319
1320    @Override
1321    public synchronized boolean isNetworkConnection() {
1322        return networkConnection;
1323    }
1324
1325    @Override
1326    public boolean isFaultTolerantConnection() {
1327        return this.faultTolerantConnection;
1328    }
1329
    /**
     * Sets the starting flag while holding this connection's monitor.
     *
     * @param starting the new value of the flag
     */
    protected synchronized void setStarting(boolean starting) {
        this.starting = starting;
    }
1333
1334    /**
1335     * @return true if the Connection needs to stop
1336     */
1337    public synchronized boolean isPendingStop() {
1338        return pendingStop;
1339    }
1340
    /**
     * Sets the pendingStop flag while holding this connection's monitor.
     *
     * @param pendingStop the new value of the flag
     */
    protected synchronized void setPendingStop(boolean pendingStop) {
        this.pendingStop = pendingStop;
    }
1344
    /**
     * Handles a {@link BrokerInfo} command received from the peer.
     * <p>
     * Three cases are distinguished:
     * <ul>
     * <li>slave broker: an error is logged, then the info is recorded as in
     * the last case;</li>
     * <li>duplex network connection: the responder end of a duplex bridge is
     * created and started on top of this connection's transport, and the
     * method returns without recording the info;</li>
     * <li>anything else: the info is stored and this connection, together with
     * all its registered connection contexts, is flagged as a network
     * connection.</li>
     * </ul>
     *
     * @param info the broker info sent by the peer; its duplex flag is cleared
     *             when a duplex bridge is created
     * @return always {@code null}
     */
    @Override
    public Response processBrokerInfo(BrokerInfo info) {
        if (info.isSlaveBroker()) {
            LOG.error(" Slave Brokers are no longer supported - slave trying to attach is: {}", info.getBrokerName());
        } else if (info.isNetworkConnection() && info.isDuplexConnection()) {
            // so this TransportConnection is the rear end of a network bridge
            // We have been requested to create a two way pipe ...
            try {
                // Build the bridge configuration from the properties the peer sent along.
                Properties properties = MarshallingSupport.stringToProperties(info.getNetworkProperties());
                Map<String, String> props = createMap(properties);
                NetworkBridgeConfiguration config = new NetworkBridgeConfiguration();
                IntrospectionSupport.setProperties(config, props, "");
                config.setBrokerName(broker.getBrokerName());

                // check for existing duplex connection hanging about

                // We first look if existing network connection already exists for the same broker Id and network connector name
                // It's possible in case of brief network fault to have this transport connector side of the connection always active
                // and the duplex network connector side wanting to open a new one
                // In this case, the old connection must be broken
                String duplexNetworkConnectorId = config.getName() + "@" + info.getBrokerId();
                CopyOnWriteArrayList<TransportConnection> connections = this.connector.getConnections();
                synchronized (connections) {
                    for (Iterator<TransportConnection> iter = connections.iterator(); iter.hasNext(); ) {
                        TransportConnection c = iter.next();
                        if ((c != this) && (duplexNetworkConnectorId.equals(c.getDuplexNetworkConnectorId()))) {
                            LOG.warn("Stopping an existing active duplex connection [{}] for network connector ({}).", c, duplexNetworkConnectorId);
                            c.stopAsync();
                            // better to wait for a bit rather than get connection id already in use and failure to start new bridge
                            c.getStopped().await(1, TimeUnit.SECONDS);
                        }
                    }
                    // Claim the id for this connection while still holding the lock.
                    setDuplexNetworkConnectorId(duplexNetworkConnectorId);
                }
                Transport localTransport = NetworkBridgeFactory.createLocalTransport(broker);
                Transport remoteBridgeTransport = transport;
                if (! (remoteBridgeTransport instanceof ResponseCorrelator)) {
                    // the vm transport case is already wrapped
                    remoteBridgeTransport = new ResponseCorrelator(remoteBridgeTransport);
                }
                // Use the part of the local transport's description starting at
                // its last '#' (if any) as the name of the duplex connector.
                String duplexName = localTransport.toString();
                if (duplexName.contains("#")) {
                    duplexName = duplexName.substring(duplexName.lastIndexOf("#"));
                }
                MBeanNetworkListener listener = new MBeanNetworkListener(broker.getBrokerService(), config, broker.getBrokerService().createDuplexNetworkConnectorObjectName(duplexName));
                listener.setCreatedByDuplex(true);
                duplexBridge = NetworkBridgeFactory.createBridge(config, localTransport, remoteBridgeTransport, listener);
                duplexBridge.setBrokerService(broker.getBrokerService());
                // now turn duplex off this side
                info.setDuplexConnection(false);
                duplexBridge.setCreatedByDuplex(true);
                duplexBridge.duplexStart(this, brokerInfo, info);
                LOG.info("Started responder end of duplex bridge {}", duplexNetworkConnectorId);
                return null;
            } catch (TransportDisposedIOException e) {
                // NOTE(review): the local duplexNetworkConnectorId declared in the
                // try block is out of scope here, so this name presumably resolves
                // to a field of the same name, which may not have been set yet if
                // the failure happened early - confirm.
                LOG.warn("Duplex bridge {} was stopped before it was correctly started.", duplexNetworkConnectorId);
                return null;
            } catch (Exception e) {
                // Any other failure (including an interrupted wait above) is only logged.
                LOG.error("Failed to create responder end of duplex network bridge {}", duplexNetworkConnectorId, e);
                return null;
            }
        }
        // We only expect to get one broker info command per connection
        if (this.brokerInfo != null) {
            LOG.warn("Unexpected extra broker info command received: {}", info);
        }
        this.brokerInfo = info;
        networkConnection = true;
        // Propagate the network-connection flag to every registered connection context.
        List<TransportConnectionState> connectionStates = listConnectionStates();
        for (TransportConnectionState cs : connectionStates) {
            cs.getContext().setNetworkConnection(true);
        }
        return null;
    }
1419
1420    @SuppressWarnings({"unchecked", "rawtypes"})
1421    private HashMap<String, String> createMap(Properties properties) {
1422        return new HashMap(properties);
1423    }
1424
1425    protected void dispatch(Command command) throws IOException {
1426        try {
1427            setMarkedCandidate(true);
1428            transport.oneway(command);
1429        } finally {
1430            setMarkedCandidate(false);
1431        }
1432    }
1433
1434    @Override
1435    public String getRemoteAddress() {
1436        return transport.getRemoteAddress();
1437    }
1438
1439    public Transport getTransport() {
1440        return transport;
1441    }
1442
1443    @Override
1444    public String getConnectionId() {
1445        List<TransportConnectionState> connectionStates = listConnectionStates();
1446        for (TransportConnectionState cs : connectionStates) {
1447            if (cs.getInfo().getClientId() != null) {
1448                return cs.getInfo().getClientId();
1449            }
1450            return cs.getInfo().getConnectionId().toString();
1451        }
1452        return null;
1453    }
1454
1455    @Override
1456    public void updateClient(ConnectionControl control) {
1457        if (isActive() && isBlocked() == false && isFaultTolerantConnection() && this.wireFormatInfo != null
1458                && this.wireFormatInfo.getVersion() >= 6) {
1459            dispatchAsync(control);
1460        }
1461    }
1462
1463    public ProducerBrokerExchange getProducerBrokerExchangeIfExists(ProducerInfo producerInfo){
1464        ProducerBrokerExchange result = null;
1465        if (producerInfo != null && producerInfo.getProducerId() != null){
1466            synchronized (producerExchanges){
1467                result = producerExchanges.get(producerInfo.getProducerId());
1468            }
1469        }
1470        return result;
1471    }
1472
1473    private ProducerBrokerExchange getProducerBrokerExchange(ProducerId id) throws IOException {
1474        ProducerBrokerExchange result = producerExchanges.get(id);
1475        if (result == null) {
1476            synchronized (producerExchanges) {
1477                result = new ProducerBrokerExchange();
1478                TransportConnectionState state = lookupConnectionState(id);
1479                context = state.getContext();
1480                result.setConnectionContext(context);
1481                if (context.isReconnect() || (context.isNetworkConnection() && connector.isAuditNetworkProducers())) {
1482                    result.setLastStoredSequenceId(broker.getBrokerService().getPersistenceAdapter().getLastProducerSequenceId(id));
1483                }
1484                SessionState ss = state.getSessionState(id.getParentId());
1485                if (ss != null) {
1486                    result.setProducerState(ss.getProducerState(id));
1487                    ProducerState producerState = ss.getProducerState(id);
1488                    if (producerState != null && producerState.getInfo() != null) {
1489                        ProducerInfo info = producerState.getInfo();
1490                        result.setMutable(info.getDestination() == null || info.getDestination().isComposite());
1491                    }
1492                }
1493                producerExchanges.put(id, result);
1494            }
1495        } else {
1496            context = result.getConnectionContext();
1497        }
1498        return result;
1499    }
1500
1501    private void removeProducerBrokerExchange(ProducerId id) {
1502        synchronized (producerExchanges) {
1503            producerExchanges.remove(id);
1504        }
1505    }
1506
1507    private ConsumerBrokerExchange getConsumerBrokerExchange(ConsumerId id) {
1508        ConsumerBrokerExchange result = consumerExchanges.get(id);
1509        return result;
1510    }
1511
1512    private ConsumerBrokerExchange addConsumerBrokerExchange(ConsumerId id) {
1513        ConsumerBrokerExchange result = consumerExchanges.get(id);
1514        if (result == null) {
1515            synchronized (consumerExchanges) {
1516                result = new ConsumerBrokerExchange();
1517                TransportConnectionState state = lookupConnectionState(id);
1518                context = state.getContext();
1519                result.setConnectionContext(context);
1520                SessionState ss = state.getSessionState(id.getParentId());
1521                if (ss != null) {
1522                    ConsumerState cs = ss.getConsumerState(id);
1523                    if (cs != null) {
1524                        ConsumerInfo info = cs.getInfo();
1525                        if (info != null) {
1526                            if (info.getDestination() != null && info.getDestination().isPattern()) {
1527                                result.setWildcard(true);
1528                            }
1529                        }
1530                    }
1531                }
1532                consumerExchanges.put(id, result);
1533            }
1534        }
1535        return result;
1536    }
1537
1538    private void removeConsumerBrokerExchange(ConsumerId id) {
1539        synchronized (consumerExchanges) {
1540            consumerExchanges.remove(id);
1541        }
1542    }
1543
1544    public int getProtocolVersion() {
1545        return protocolVersion.get();
1546    }
1547
1548    @Override
1549    public Response processControlCommand(ControlCommand command) throws Exception {
1550        return null;
1551    }
1552
1553    @Override
1554    public Response processMessageDispatch(MessageDispatch dispatch) throws Exception {
1555        return null;
1556    }
1557
1558    @Override
1559    public Response processConnectionControl(ConnectionControl control) throws Exception {
1560        if (control != null) {
1561            faultTolerantConnection = control.isFaultTolerant();
1562        }
1563        return null;
1564    }
1565
1566    @Override
1567    public Response processConnectionError(ConnectionError error) throws Exception {
1568        return null;
1569    }
1570
1571    @Override
1572    public Response processConsumerControl(ConsumerControl control) throws Exception {
1573        ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(control.getConsumerId());
1574        broker.processConsumerControl(consumerExchange, control);
1575        return null;
1576    }
1577
1578    protected synchronized TransportConnectionState registerConnectionState(ConnectionId connectionId,
1579                                                                            TransportConnectionState state) {
1580        TransportConnectionState cs = null;
1581        if (!connectionStateRegister.isEmpty() && !connectionStateRegister.doesHandleMultipleConnectionStates()) {
1582            // swap implementations
1583            TransportConnectionStateRegister newRegister = new MapTransportConnectionStateRegister();
1584            newRegister.intialize(connectionStateRegister);
1585            connectionStateRegister = newRegister;
1586        }
1587        cs = connectionStateRegister.registerConnectionState(connectionId, state);
1588        return cs;
1589    }
1590
1591    protected synchronized TransportConnectionState unregisterConnectionState(ConnectionId connectionId) {
1592        return connectionStateRegister.unregisterConnectionState(connectionId);
1593    }
1594
1595    protected synchronized List<TransportConnectionState> listConnectionStates() {
1596        return connectionStateRegister.listConnectionStates();
1597    }
1598
1599    protected synchronized TransportConnectionState lookupConnectionState(String connectionId) {
1600        return connectionStateRegister.lookupConnectionState(connectionId);
1601    }
1602
1603    protected synchronized TransportConnectionState lookupConnectionState(ConsumerId id) {
1604        return connectionStateRegister.lookupConnectionState(id);
1605    }
1606
1607    protected synchronized TransportConnectionState lookupConnectionState(ProducerId id) {
1608        return connectionStateRegister.lookupConnectionState(id);
1609    }
1610
1611    protected synchronized TransportConnectionState lookupConnectionState(SessionId id) {
1612        return connectionStateRegister.lookupConnectionState(id);
1613    }
1614
1615    // public only for testing
1616    public synchronized TransportConnectionState lookupConnectionState(ConnectionId connectionId) {
1617        return connectionStateRegister.lookupConnectionState(connectionId);
1618    }
1619
1620    protected synchronized void setDuplexNetworkConnectorId(String duplexNetworkConnectorId) {
1621        this.duplexNetworkConnectorId = duplexNetworkConnectorId;
1622    }
1623
1624    protected synchronized String getDuplexNetworkConnectorId() {
1625        return this.duplexNetworkConnectorId;
1626    }
1627
1628    public boolean isStopping() {
1629        return stopping.get();
1630    }
1631
1632    protected CountDownLatch getStopped() {
1633        return stopped;
1634    }
1635
1636    private int getProducerCount(ConnectionId connectionId) {
1637        int result = 0;
1638        TransportConnectionState cs = lookupConnectionState(connectionId);
1639        if (cs != null) {
1640            for (SessionId sessionId : cs.getSessionIds()) {
1641                SessionState sessionState = cs.getSessionState(sessionId);
1642                if (sessionState != null) {
1643                    result += sessionState.getProducerIds().size();
1644                }
1645            }
1646        }
1647        return result;
1648    }
1649
1650    private int getConsumerCount(ConnectionId connectionId) {
1651        int result = 0;
1652        TransportConnectionState cs = lookupConnectionState(connectionId);
1653        if (cs != null) {
1654            for (SessionId sessionId : cs.getSessionIds()) {
1655                SessionState sessionState = cs.getSessionState(sessionId);
1656                if (sessionState != null) {
1657                    result += sessionState.getConsumerIds().size();
1658                }
1659            }
1660        }
1661        return result;
1662    }
1663
1664    public WireFormatInfo getRemoteWireFormatInfo() {
1665        return wireFormatInfo;
1666    }
1667}