001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.network;
018
019import java.io.IOException;
020import java.security.GeneralSecurityException;
021import java.security.cert.X509Certificate;
022import java.util.Arrays;
023import java.util.Collection;
024import java.util.Iterator;
025import java.util.List;
026import java.util.Properties;
027import java.util.concurrent.ConcurrentHashMap;
028import java.util.concurrent.ConcurrentMap;
029import java.util.concurrent.CountDownLatch;
030import java.util.concurrent.ExecutionException;
031import java.util.concurrent.ExecutorService;
032import java.util.concurrent.Executors;
033import java.util.concurrent.Future;
034import java.util.concurrent.TimeUnit;
035import java.util.concurrent.TimeoutException;
036import java.util.concurrent.atomic.AtomicBoolean;
037import java.util.concurrent.atomic.AtomicLong;
038
039import javax.management.ObjectName;
040
041import org.apache.activemq.DestinationDoesNotExistException;
042import org.apache.activemq.Service;
043import org.apache.activemq.advisory.AdvisoryBroker;
044import org.apache.activemq.advisory.AdvisorySupport;
045import org.apache.activemq.broker.BrokerService;
046import org.apache.activemq.broker.BrokerServiceAware;
047import org.apache.activemq.broker.ConnectionContext;
048import org.apache.activemq.broker.TransportConnection;
049import org.apache.activemq.broker.region.AbstractRegion;
050import org.apache.activemq.broker.region.DurableTopicSubscription;
051import org.apache.activemq.broker.region.Region;
052import org.apache.activemq.broker.region.RegionBroker;
053import org.apache.activemq.broker.region.Subscription;
054import org.apache.activemq.broker.region.policy.PolicyEntry;
055import org.apache.activemq.command.ActiveMQDestination;
056import org.apache.activemq.command.ActiveMQMessage;
057import org.apache.activemq.command.ActiveMQTempDestination;
058import org.apache.activemq.command.ActiveMQTopic;
059import org.apache.activemq.command.BrokerId;
060import org.apache.activemq.command.BrokerInfo;
061import org.apache.activemq.command.Command;
062import org.apache.activemq.command.ConnectionError;
063import org.apache.activemq.command.ConnectionId;
064import org.apache.activemq.command.ConnectionInfo;
065import org.apache.activemq.command.ConsumerId;
066import org.apache.activemq.command.ConsumerInfo;
067import org.apache.activemq.command.DataStructure;
068import org.apache.activemq.command.DestinationInfo;
069import org.apache.activemq.command.ExceptionResponse;
070import org.apache.activemq.command.KeepAliveInfo;
071import org.apache.activemq.command.Message;
072import org.apache.activemq.command.MessageAck;
073import org.apache.activemq.command.MessageDispatch;
074import org.apache.activemq.command.MessageId;
075import org.apache.activemq.command.NetworkBridgeFilter;
076import org.apache.activemq.command.ProducerInfo;
077import org.apache.activemq.command.RemoveInfo;
078import org.apache.activemq.command.RemoveSubscriptionInfo;
079import org.apache.activemq.command.Response;
080import org.apache.activemq.command.SessionInfo;
081import org.apache.activemq.command.ShutdownInfo;
082import org.apache.activemq.command.SubscriptionInfo;
083import org.apache.activemq.command.WireFormatInfo;
084import org.apache.activemq.filter.DestinationFilter;
085import org.apache.activemq.filter.MessageEvaluationContext;
086import org.apache.activemq.security.SecurityContext;
087import org.apache.activemq.transport.DefaultTransportListener;
088import org.apache.activemq.transport.FutureResponse;
089import org.apache.activemq.transport.ResponseCallback;
090import org.apache.activemq.transport.Transport;
091import org.apache.activemq.transport.TransportDisposedIOException;
092import org.apache.activemq.transport.TransportFilter;
093import org.apache.activemq.transport.tcp.SslTransport;
094import org.apache.activemq.util.IdGenerator;
095import org.apache.activemq.util.IntrospectionSupport;
096import org.apache.activemq.util.LongSequenceGenerator;
097import org.apache.activemq.util.MarshallingSupport;
098import org.apache.activemq.util.ServiceStopper;
099import org.apache.activemq.util.ServiceSupport;
100import org.slf4j.Logger;
101import org.slf4j.LoggerFactory;
102
103/**
104 * A useful base class for implementing demand forwarding bridges.
105 */
106public abstract class DemandForwardingBridgeSupport implements NetworkBridge, BrokerServiceAware {
107    private static final Logger LOG = LoggerFactory.getLogger(DemandForwardingBridgeSupport.class);
108    protected static final String DURABLE_SUB_PREFIX = "NC-DS_";
109    protected final Transport localBroker;
110    protected final Transport remoteBroker;
111    protected IdGenerator idGenerator = new IdGenerator();
112    protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
113    protected ConnectionInfo localConnectionInfo;
114    protected ConnectionInfo remoteConnectionInfo;
115    protected SessionInfo localSessionInfo;
116    protected ProducerInfo producerInfo;
117    protected String remoteBrokerName = "Unknown";
118    protected String localClientId;
119    protected ConsumerInfo demandConsumerInfo;
120    protected int demandConsumerDispatched;
121    protected final AtomicBoolean localBridgeStarted = new AtomicBoolean(false);
122    protected final AtomicBoolean remoteBridgeStarted = new AtomicBoolean(false);
123    protected final AtomicBoolean bridgeFailed = new AtomicBoolean();
124    protected final AtomicBoolean disposed = new AtomicBoolean();
125    protected BrokerId localBrokerId;
126    protected ActiveMQDestination[] excludedDestinations;
127    protected ActiveMQDestination[] dynamicallyIncludedDestinations;
128    protected ActiveMQDestination[] staticallyIncludedDestinations;
129    protected ActiveMQDestination[] durableDestinations;
130    protected final ConcurrentMap<ConsumerId, DemandSubscription> subscriptionMapByLocalId = new ConcurrentHashMap<ConsumerId, DemandSubscription>();
131    protected final ConcurrentMap<ConsumerId, DemandSubscription> subscriptionMapByRemoteId = new ConcurrentHashMap<ConsumerId, DemandSubscription>();
132    protected final BrokerId localBrokerPath[] = new BrokerId[]{null};
133    protected final CountDownLatch startedLatch = new CountDownLatch(2);
134    protected final CountDownLatch localStartedLatch = new CountDownLatch(1);
135    protected final AtomicBoolean lastConnectSucceeded = new AtomicBoolean(false);
136    protected NetworkBridgeConfiguration configuration;
137    protected final NetworkBridgeFilterFactory defaultFilterFactory = new DefaultNetworkBridgeFilterFactory();
138
139    protected final BrokerId remoteBrokerPath[] = new BrokerId[]{null};
140    protected BrokerId remoteBrokerId;
141
142    final AtomicLong enqueueCounter = new AtomicLong();
143    final AtomicLong dequeueCounter = new AtomicLong();
144
145    private NetworkBridgeListener networkBridgeListener;
146    private boolean createdByDuplex;
147    private BrokerInfo localBrokerInfo;
148    private BrokerInfo remoteBrokerInfo;
149
150    private final FutureBrokerInfo futureRemoteBrokerInfo = new FutureBrokerInfo(remoteBrokerInfo, disposed);
151    private final FutureBrokerInfo futureLocalBrokerInfo = new FutureBrokerInfo(localBrokerInfo, disposed);
152
153    private final AtomicBoolean started = new AtomicBoolean();
154    private TransportConnection duplexInitiatingConnection;
155    private final AtomicBoolean duplexInitiatingConnectionInfoReceived = new AtomicBoolean();
156    protected BrokerService brokerService = null;
157    private ObjectName mbeanObjectName;
158    private final ExecutorService serialExecutor = Executors.newSingleThreadExecutor();
159    private Transport duplexInboundLocalBroker = null;
160    private ProducerInfo duplexInboundLocalProducerInfo;
161
162    public DemandForwardingBridgeSupport(NetworkBridgeConfiguration configuration, Transport localBroker, Transport remoteBroker) {
163        this.configuration = configuration;
164        this.localBroker = localBroker;
165        this.remoteBroker = remoteBroker;
166    }
167
168    public void duplexStart(TransportConnection connection, BrokerInfo localBrokerInfo, BrokerInfo remoteBrokerInfo) throws Exception {
169        this.localBrokerInfo = localBrokerInfo;
170        this.remoteBrokerInfo = remoteBrokerInfo;
171        this.duplexInitiatingConnection = connection;
172        start();
173        serviceRemoteCommand(remoteBrokerInfo);
174    }
175
176    @Override
177    public void start() throws Exception {
178        if (started.compareAndSet(false, true)) {
179
180            if (brokerService == null) {
181                throw new IllegalArgumentException("BrokerService is null on " + this);
182            }
183
184            if (isDuplex()) {
185                duplexInboundLocalBroker = NetworkBridgeFactory.createLocalTransport(brokerService.getBroker());
186                duplexInboundLocalBroker.setTransportListener(new DefaultTransportListener() {
187
188                    @Override
189                    public void onCommand(Object o) {
190                        Command command = (Command) o;
191                        serviceLocalCommand(command);
192                    }
193
194                    @Override
195                    public void onException(IOException error) {
196                        serviceLocalException(error);
197                    }
198                });
199                duplexInboundLocalBroker.start();
200            }
201
202            localBroker.setTransportListener(new DefaultTransportListener() {
203
204                @Override
205                public void onCommand(Object o) {
206                    Command command = (Command) o;
207                    serviceLocalCommand(command);
208                }
209
210                @Override
211                public void onException(IOException error) {
212                    if (!futureLocalBrokerInfo.isDone()) {
213                        futureLocalBrokerInfo.cancel(true);
214                        return;
215                    }
216                    serviceLocalException(error);
217                }
218            });
219
220            remoteBroker.setTransportListener(new DefaultTransportListener() {
221
222                @Override
223                public void onCommand(Object o) {
224                    Command command = (Command) o;
225                    serviceRemoteCommand(command);
226                }
227
228                @Override
229                public void onException(IOException error) {
230                    if (!futureRemoteBrokerInfo.isDone()) {
231                        futureRemoteBrokerInfo.cancel(true);
232                        return;
233                    }
234                    serviceRemoteException(error);
235                }
236            });
237
238            remoteBroker.start();
239            localBroker.start();
240
241            if (!disposed.get()) {
242                try {
243                    triggerStartAsyncNetworkBridgeCreation();
244                } catch (IOException e) {
245                    LOG.warn("Caught exception from remote start", e);
246                }
247            } else {
248                LOG.warn("Bridge was disposed before the start() method was fully executed.");
249                throw new TransportDisposedIOException();
250            }
251        }
252    }
253
254    @Override
255    public void stop() throws Exception {
256        if (started.compareAndSet(true, false)) {
257            if (disposed.compareAndSet(false, true)) {
258                LOG.debug(" stopping {} bridge to {}", configuration.getBrokerName(), remoteBrokerName);
259
260                futureRemoteBrokerInfo.cancel(true);
261                futureLocalBrokerInfo.cancel(true);
262
263                NetworkBridgeListener l = this.networkBridgeListener;
264                if (l != null) {
265                    l.onStop(this);
266                }
267                try {
268                    // local start complete
269                    if (startedLatch.getCount() < 2) {
270                        LOG.trace("{} unregister bridge ({}) to {}", new Object[]{
271                                configuration.getBrokerName(), this, remoteBrokerName
272                        });
273                        brokerService.getBroker().removeBroker(null, remoteBrokerInfo);
274                        brokerService.getBroker().networkBridgeStopped(remoteBrokerInfo);
275                    }
276
277                    remoteBridgeStarted.set(false);
278                    final CountDownLatch sendShutdown = new CountDownLatch(1);
279
280                    brokerService.getTaskRunnerFactory().execute(new Runnable() {
281                        @Override
282                        public void run() {
283                            try {
284                                serialExecutor.shutdown();
285                                if (!serialExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
286                                    List<Runnable> pendingTasks = serialExecutor.shutdownNow();
287                                    LOG.info("pending tasks on stop {}", pendingTasks);
288                                }
289                                localBroker.oneway(new ShutdownInfo());
290                                remoteBroker.oneway(new ShutdownInfo());
291                            } catch (Throwable e) {
292                                LOG.debug("Caught exception sending shutdown", e);
293                            } finally {
294                                sendShutdown.countDown();
295                            }
296
297                        }
298                    }, "ActiveMQ ForwardingBridge StopTask");
299
300                    if (!sendShutdown.await(10, TimeUnit.SECONDS)) {
301                        LOG.info("Network Could not shutdown in a timely manner");
302                    }
303                } finally {
304                    ServiceStopper ss = new ServiceStopper();
305                    ss.stop(remoteBroker);
306                    ss.stop(localBroker);
307                    ss.stop(duplexInboundLocalBroker);
308                    // Release the started Latch since another thread could be
309                    // stuck waiting for it to start up.
310                    startedLatch.countDown();
311                    startedLatch.countDown();
312                    localStartedLatch.countDown();
313
314                    ss.throwFirstException();
315                }
316            }
317
318            LOG.info("{} bridge to {} stopped", configuration.getBrokerName(), remoteBrokerName);
319        }
320    }
321
322    protected void triggerStartAsyncNetworkBridgeCreation() throws IOException {
323        brokerService.getTaskRunnerFactory().execute(new Runnable() {
324            @Override
325            public void run() {
326                final String originalName = Thread.currentThread().getName();
327                Thread.currentThread().setName("triggerStartAsyncNetworkBridgeCreation: " +
328                        "remoteBroker=" + remoteBroker + ", localBroker= " + localBroker);
329
330                try {
331                    // First we collect the info data from both the local and remote ends
332                    collectBrokerInfos();
333
334                    // Once we have all required broker info we can attempt to start
335                    // the local and then remote sides of the bridge.
336                    doStartLocalAndRemoteBridges();
337                } finally {
338                    Thread.currentThread().setName(originalName);
339                }
340            }
341        });
342    }
343
344    private void collectBrokerInfos() {
345
346        // First wait for the remote to feed us its BrokerInfo, then we can check on
347        // the LocalBrokerInfo and decide is this is a loop.
348        try {
349            remoteBrokerInfo = futureRemoteBrokerInfo.get();
350            if (remoteBrokerInfo == null) {
351                fireBridgeFailed(new Throwable("remoteBrokerInfo is null"));
352                return;
353            }
354        } catch (Exception e) {
355            serviceRemoteException(e);
356            return;
357        }
358
359        try {
360            localBrokerInfo = futureLocalBrokerInfo.get();
361            if (localBrokerInfo == null) {
362                fireBridgeFailed(new Throwable("localBrokerInfo is null"));
363                return;
364            }
365
366            // Before we try and build the bridge lets check if we are in a loop
367            // and if so just stop now before registering anything.
368            remoteBrokerId = remoteBrokerInfo.getBrokerId();
369            if (localBrokerId.equals(remoteBrokerId)) {
370                LOG.trace("{} disconnecting remote loop back connector for: {}, with id: {}", new Object[]{
371                        configuration.getBrokerName(), remoteBrokerName, remoteBrokerId
372                });
373                ServiceSupport.dispose(localBroker);
374                ServiceSupport.dispose(remoteBroker);
375                // the bridge is left in a bit of limbo, but it won't get retried
376                // in this state.
377                return;
378            }
379
380            // Fill in the remote broker's information now.
381            remoteBrokerPath[0] = remoteBrokerId;
382            remoteBrokerName = remoteBrokerInfo.getBrokerName();
383            if (configuration.isUseBrokerNamesAsIdSeed()) {
384                idGenerator = new IdGenerator(brokerService.getBrokerName() + "->" + remoteBrokerName);
385            }
386        } catch (Throwable e) {
387            serviceLocalException(e);
388        }
389    }
390
391    private void doStartLocalAndRemoteBridges() {
392
393        if (disposed.get()) {
394            return;
395        }
396
397        if (isCreatedByDuplex()) {
398            // apply remote (propagated) configuration to local duplex bridge before start
399            Properties props = null;
400            try {
401                props = MarshallingSupport.stringToProperties(remoteBrokerInfo.getNetworkProperties());
402                IntrospectionSupport.getProperties(configuration, props, null);
403                if (configuration.getExcludedDestinations() != null) {
404                    excludedDestinations = configuration.getExcludedDestinations().toArray(
405                            new ActiveMQDestination[configuration.getExcludedDestinations().size()]);
406                }
407                if (configuration.getStaticallyIncludedDestinations() != null) {
408                    staticallyIncludedDestinations = configuration.getStaticallyIncludedDestinations().toArray(
409                            new ActiveMQDestination[configuration.getStaticallyIncludedDestinations().size()]);
410                }
411                if (configuration.getDynamicallyIncludedDestinations() != null) {
412                    dynamicallyIncludedDestinations = configuration.getDynamicallyIncludedDestinations().toArray(
413                            new ActiveMQDestination[configuration.getDynamicallyIncludedDestinations().size()]);
414                }
415            } catch (Throwable t) {
416                LOG.error("Error mapping remote configuration: {}", props, t);
417            }
418        }
419
420        try {
421            startLocalBridge();
422        } catch (Throwable e) {
423            serviceLocalException(e);
424            return;
425        }
426
427        try {
428            startRemoteBridge();
429        } catch (Throwable e) {
430            serviceRemoteException(e);
431            return;
432        }
433
434        try {
435            if (safeWaitUntilStarted()) {
436                setupStaticDestinations();
437            }
438        } catch (Throwable e) {
439            serviceLocalException(e);
440        }
441    }
442
443    private void startLocalBridge() throws Throwable {
444        if (!bridgeFailed.get() && localBridgeStarted.compareAndSet(false, true)) {
445            synchronized (this) {
446                LOG.trace("{} starting local Bridge, localBroker={}", configuration.getBrokerName(), localBroker);
447                if (!disposed.get()) {
448
449                    if (idGenerator == null) {
450                        throw new IllegalStateException("Id Generator cannot be null");
451                    }
452
453                    localConnectionInfo = new ConnectionInfo();
454                    localConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
455                    localClientId = configuration.getName() + "_" + remoteBrokerName + "_inbound_" + configuration.getBrokerName();
456                    localConnectionInfo.setClientId(localClientId);
457                    localConnectionInfo.setUserName(configuration.getUserName());
458                    localConnectionInfo.setPassword(configuration.getPassword());
459                    Transport originalTransport = remoteBroker;
460                    while (originalTransport instanceof TransportFilter) {
461                        originalTransport = ((TransportFilter) originalTransport).getNext();
462                    }
463                    if (originalTransport instanceof SslTransport) {
464                        X509Certificate[] peerCerts = ((SslTransport) originalTransport).getPeerCertificates();
465                        localConnectionInfo.setTransportContext(peerCerts);
466                    }
467                    // sync requests that may fail
468                    Object resp = localBroker.request(localConnectionInfo);
469                    if (resp instanceof ExceptionResponse) {
470                        throw ((ExceptionResponse) resp).getException();
471                    }
472                    localSessionInfo = new SessionInfo(localConnectionInfo, 1);
473                    localBroker.oneway(localSessionInfo);
474
475                    if (configuration.isDuplex()) {
476                        // separate in-bound channel for forwards so we don't
477                        // contend with out-bound dispatch on same connection
478                        ConnectionInfo duplexLocalConnectionInfo = new ConnectionInfo();
479                        duplexLocalConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
480                        duplexLocalConnectionInfo.setClientId(configuration.getName() + "_" + remoteBrokerName + "_inbound_duplex_"
481                                + configuration.getBrokerName());
482                        duplexLocalConnectionInfo.setUserName(configuration.getUserName());
483                        duplexLocalConnectionInfo.setPassword(configuration.getPassword());
484
485                        if (originalTransport instanceof SslTransport) {
486                            X509Certificate[] peerCerts = ((SslTransport) originalTransport).getPeerCertificates();
487                            duplexLocalConnectionInfo.setTransportContext(peerCerts);
488                        }
489                        // sync requests that may fail
490                        resp = duplexInboundLocalBroker.request(duplexLocalConnectionInfo);
491                        if (resp instanceof ExceptionResponse) {
492                            throw ((ExceptionResponse) resp).getException();
493                        }
494                        SessionInfo duplexInboundSession = new SessionInfo(duplexLocalConnectionInfo, 1);
495                        duplexInboundLocalProducerInfo = new ProducerInfo(duplexInboundSession, 1);
496                        duplexInboundLocalBroker.oneway(duplexInboundSession);
497                        duplexInboundLocalBroker.oneway(duplexInboundLocalProducerInfo);
498                    }
499                    brokerService.getBroker().networkBridgeStarted(remoteBrokerInfo, this.createdByDuplex, remoteBroker.toString());
500                    NetworkBridgeListener l = this.networkBridgeListener;
501                    if (l != null) {
502                        l.onStart(this);
503                    }
504
505                    // Let the local broker know the remote broker's ID.
506                    localBroker.oneway(remoteBrokerInfo);
507                    // new peer broker (a consumer can work with remote broker also)
508                    brokerService.getBroker().addBroker(null, remoteBrokerInfo);
509
510                    LOG.info("Network connection between {} and {} ({}) has been established.", new Object[]{
511                            localBroker, remoteBroker, remoteBrokerName
512                    });
513                    LOG.trace("{} register bridge ({}) to {}", new Object[]{
514                            configuration.getBrokerName(), this, remoteBrokerName
515                    });
516                } else {
517                    LOG.warn("Bridge was disposed before the startLocalBridge() method was fully executed.");
518                }
519                startedLatch.countDown();
520                localStartedLatch.countDown();
521            }
522        }
523    }
524
525    protected void startRemoteBridge() throws Exception {
526        if (!bridgeFailed.get() && remoteBridgeStarted.compareAndSet(false, true)) {
527            LOG.trace("{} starting remote Bridge, remoteBroker={}", configuration.getBrokerName(), remoteBroker);
528            synchronized (this) {
529                if (!isCreatedByDuplex()) {
530                    BrokerInfo brokerInfo = new BrokerInfo();
531                    brokerInfo.setBrokerName(configuration.getBrokerName());
532                    brokerInfo.setBrokerURL(configuration.getBrokerURL());
533                    brokerInfo.setNetworkConnection(true);
534                    brokerInfo.setDuplexConnection(configuration.isDuplex());
535                    // set our properties
536                    Properties props = new Properties();
537                    IntrospectionSupport.getProperties(configuration, props, null);
538                    props.remove("networkTTL");
539                    String str = MarshallingSupport.propertiesToString(props);
540                    brokerInfo.setNetworkProperties(str);
541                    brokerInfo.setBrokerId(this.localBrokerId);
542                    remoteBroker.oneway(brokerInfo);
543                }
544                if (remoteConnectionInfo != null) {
545                    remoteBroker.oneway(remoteConnectionInfo.createRemoveCommand());
546                }
547                remoteConnectionInfo = new ConnectionInfo();
548                remoteConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
549                remoteConnectionInfo.setClientId(configuration.getName() + "_" + configuration.getBrokerName() + "_outbound");
550                remoteConnectionInfo.setUserName(configuration.getUserName());
551                remoteConnectionInfo.setPassword(configuration.getPassword());
552                remoteBroker.oneway(remoteConnectionInfo);
553
554                SessionInfo remoteSessionInfo = new SessionInfo(remoteConnectionInfo, 1);
555                remoteBroker.oneway(remoteSessionInfo);
556                producerInfo = new ProducerInfo(remoteSessionInfo, 1);
557                producerInfo.setResponseRequired(false);
558                remoteBroker.oneway(producerInfo);
559                // Listen to consumer advisory messages on the remote broker to determine demand.
560                if (!configuration.isStaticBridge()) {
561                    demandConsumerInfo = new ConsumerInfo(remoteSessionInfo, 1);
562                    // always dispatch advisory message asynchronously so that
563                    // we never block the producer broker if we are slow
564                    demandConsumerInfo.setDispatchAsync(true);
565                    String advisoryTopic = configuration.getDestinationFilter();
566                    if (configuration.isBridgeTempDestinations()) {
567                        advisoryTopic += "," + AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC;
568                    }
569                    demandConsumerInfo.setDestination(new ActiveMQTopic(advisoryTopic));
570                    demandConsumerInfo.setPrefetchSize(configuration.getPrefetchSize());
571                    remoteBroker.oneway(demandConsumerInfo);
572                }
573                startedLatch.countDown();
574            }
575        }
576    }
577
578    @Override
579    public void serviceRemoteException(Throwable error) {
580        if (!disposed.get()) {
581            if (error instanceof SecurityException || error instanceof GeneralSecurityException) {
582                LOG.error("Network connection between {} and {} shutdown due to a remote error: {}", new Object[]{
583                        localBroker, remoteBroker, error
584                });
585            } else {
586                LOG.warn("Network connection between {} and {} shutdown due to a remote error: {}", new Object[]{
587                        localBroker, remoteBroker, error
588                });
589            }
590            LOG.debug("The remote Exception was: {}", error, error);
591            brokerService.getTaskRunnerFactory().execute(new Runnable() {
592                @Override
593                public void run() {
594                    ServiceSupport.dispose(getControllingService());
595                }
596            });
597            fireBridgeFailed(error);
598        }
599    }
600
601    protected void serviceRemoteCommand(Command command) {
602        if (!disposed.get()) {
603            try {
604                if (command.isMessageDispatch()) {
605                    safeWaitUntilStarted();
606                    MessageDispatch md = (MessageDispatch) command;
607                    serviceRemoteConsumerAdvisory(md.getMessage().getDataStructure());
608                    ackAdvisory(md.getMessage());
609                } else if (command.isBrokerInfo()) {
610                    futureRemoteBrokerInfo.set((BrokerInfo) command);
611                } else if (command.getClass() == ConnectionError.class) {
612                    ConnectionError ce = (ConnectionError) command;
613                    serviceRemoteException(ce.getException());
614                } else {
615                    if (isDuplex()) {
616                        LOG.trace("{} duplex command type: {}", configuration.getBrokerName(), command.getDataStructureType());
617                        if (command.isMessage()) {
618                            final ActiveMQMessage message = (ActiveMQMessage) command;
619                            if (AdvisorySupport.isConsumerAdvisoryTopic(message.getDestination())
620                                    || AdvisorySupport.isDestinationAdvisoryTopic(message.getDestination())) {
621                                serviceRemoteConsumerAdvisory(message.getDataStructure());
622                                ackAdvisory(message);
623                            } else {
624                                if (!isPermissableDestination(message.getDestination(), true)) {
625                                    return;
626                                }
627                                // message being forwarded - we need to
628                                // propagate the response to our local send
629                                if (canDuplexDispatch(message)) {
630                                    message.setProducerId(duplexInboundLocalProducerInfo.getProducerId());
631                                    if (message.isResponseRequired() || configuration.isAlwaysSyncSend()) {
632                                        duplexInboundLocalBroker.asyncRequest(message, new ResponseCallback() {
633                                            final int correlationId = message.getCommandId();
634
635                                            @Override
636                                            public void onCompletion(FutureResponse resp) {
637                                                try {
638                                                    Response reply = resp.getResult();
639                                                    reply.setCorrelationId(correlationId);
640                                                    remoteBroker.oneway(reply);
641                                                } catch (IOException error) {
642                                                    LOG.error("Exception: {} on duplex forward of: {}", error, message);
643                                                    serviceRemoteException(error);
644                                                }
645                                            }
646                                        });
647                                    } else {
648                                        duplexInboundLocalBroker.oneway(message);
649                                    }
650                                    serviceInboundMessage(message);
651                                } else {
652                                    if (message.isResponseRequired() || configuration.isAlwaysSyncSend()) {
653                                        Response reply = new Response();
654                                        reply.setCorrelationId(message.getCommandId());
655                                        remoteBroker.oneway(reply);
656                                    }
657                                }
658                            }
659                        } else {
660                            switch (command.getDataStructureType()) {
661                                case ConnectionInfo.DATA_STRUCTURE_TYPE:
662                                    if (duplexInitiatingConnection != null && duplexInitiatingConnectionInfoReceived.compareAndSet(false, true)) {
663                                        // end of initiating connection setup - propogate to initial connection to get mbean by clientid
664                                        duplexInitiatingConnection.processAddConnection((ConnectionInfo) command);
665                                    } else {
666                                        localBroker.oneway(command);
667                                    }
668                                    break;
669                                case SessionInfo.DATA_STRUCTURE_TYPE:
670                                    localBroker.oneway(command);
671                                    break;
672                                case ProducerInfo.DATA_STRUCTURE_TYPE:
673                                    // using duplexInboundLocalProducerInfo
674                                    break;
675                                case MessageAck.DATA_STRUCTURE_TYPE:
676                                    MessageAck ack = (MessageAck) command;
677                                    DemandSubscription localSub = subscriptionMapByRemoteId.get(ack.getConsumerId());
678                                    if (localSub != null) {
679                                        ack.setConsumerId(localSub.getLocalInfo().getConsumerId());
680                                        localBroker.oneway(ack);
681                                    } else {
682                                        LOG.warn("Matching local subscription not found for ack: {}", ack);
683                                    }
684                                    break;
685                                case ConsumerInfo.DATA_STRUCTURE_TYPE:
686                                    localStartedLatch.await();
687                                    if (started.get()) {
688                                        addConsumerInfo((ConsumerInfo) command);
689                                    } else {
690                                        // received a subscription whilst stopping
691                                        LOG.warn("Stopping - ignoring ConsumerInfo: {}", command);
692                                    }
693                                    break;
694                                case ShutdownInfo.DATA_STRUCTURE_TYPE:
695                                    // initiator is shutting down, controlled case
696                                    // abortive close dealt with by inactivity monitor
697                                    LOG.info("Stopping network bridge on shutdown of remote broker");
698                                    serviceRemoteException(new IOException(command.toString()));
699                                    break;
700                                default:
701                                    LOG.debug("Ignoring remote command: {}", command);
702                            }
703                        }
704                    } else {
705                        switch (command.getDataStructureType()) {
706                            case KeepAliveInfo.DATA_STRUCTURE_TYPE:
707                            case WireFormatInfo.DATA_STRUCTURE_TYPE:
708                            case ShutdownInfo.DATA_STRUCTURE_TYPE:
709                                break;
710                            default:
711                                LOG.warn("Unexpected remote command: {}", command);
712                        }
713                    }
714                }
715            } catch (Throwable e) {
716                LOG.debug("Exception processing remote command: {}", command, e);
717                serviceRemoteException(e);
718            }
719        }
720    }
721
722    private void ackAdvisory(Message message) throws IOException {
723        demandConsumerDispatched++;
724        if (demandConsumerDispatched > (demandConsumerInfo.getPrefetchSize() * .75)) {
725            MessageAck ack = new MessageAck(message, MessageAck.STANDARD_ACK_TYPE, demandConsumerDispatched);
726            ack.setConsumerId(demandConsumerInfo.getConsumerId());
727            remoteBroker.oneway(ack);
728            demandConsumerDispatched = 0;
729        }
730    }
731
732    private void serviceRemoteConsumerAdvisory(DataStructure data) throws IOException {
733        final int networkTTL = configuration.getConsumerTTL();
734        if (data.getClass() == ConsumerInfo.class) {
735            // Create a new local subscription
736            ConsumerInfo info = (ConsumerInfo) data;
737            BrokerId[] path = info.getBrokerPath();
738
739            if (info.isBrowser()) {
740                LOG.debug("{} Ignoring sub from {}, browsers explicitly suppressed", configuration.getBrokerName(), remoteBrokerName);
741                return;
742            }
743
744            if (path != null && networkTTL > -1 && path.length >= networkTTL) {
745                LOG.debug("{} Ignoring sub from {}, restricted to {} network hops only: {}", new Object[]{
746                        configuration.getBrokerName(), remoteBrokerName, networkTTL, info
747                });
748                return;
749            }
750
751            if (contains(path, localBrokerPath[0])) {
752                // Ignore this consumer as it's a consumer we locally sent to the broker.
753                LOG.debug("{} Ignoring sub from {}, already routed through this broker once: {}", new Object[]{
754                        configuration.getBrokerName(), remoteBrokerName, info
755                });
756                return;
757            }
758
759            if (!isPermissableDestination(info.getDestination())) {
760                // ignore if not in the permitted or in the excluded list
761                LOG.debug("{} Ignoring sub from {}, destination {} is not permitted: {}", new Object[]{
762                        configuration.getBrokerName(), remoteBrokerName, info.getDestination(), info
763                });
764                return;
765            }
766
767            // in a cyclic network there can be multiple bridges per broker that can propagate
768            // a network subscription so there is a need to synchronize on a shared entity
769            synchronized (brokerService.getVmConnectorURI()) {
770                addConsumerInfo(info);
771            }
772        } else if (data.getClass() == DestinationInfo.class) {
773            // It's a destination info - we want to pass up information about temporary destinations
774            final DestinationInfo destInfo = (DestinationInfo) data;
775            BrokerId[] path = destInfo.getBrokerPath();
776            if (path != null && networkTTL > -1 && path.length >= networkTTL) {
777                LOG.debug("{} Ignoring destination {} restricted to {} network hops only", new Object[]{
778                        configuration.getBrokerName(), destInfo, networkTTL
779                });
780                return;
781            }
782            if (contains(destInfo.getBrokerPath(), localBrokerPath[0])) {
783                LOG.debug("{} Ignoring destination {} already routed through this broker once", configuration.getBrokerName(), destInfo);
784                return;
785            }
786            destInfo.setConnectionId(localConnectionInfo.getConnectionId());
787            if (destInfo.getDestination() instanceof ActiveMQTempDestination) {
788                // re-set connection id so comes from here
789                ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destInfo.getDestination();
790                tempDest.setConnectionId(localSessionInfo.getSessionId().getConnectionId());
791            }
792            destInfo.setBrokerPath(appendToBrokerPath(destInfo.getBrokerPath(), getRemoteBrokerPath()));
793            LOG.trace("{} bridging {} destination on {} from {}, destination: {}", new Object[]{
794                    configuration.getBrokerName(), (destInfo.isAddOperation() ? "add" : "remove"), localBroker, remoteBrokerName, destInfo
795            });
796            if (destInfo.isRemoveOperation()) {
797                // Serialize with removeSub operations such that all removeSub advisories
798                // are generated
799                serialExecutor.execute(new Runnable() {
800                    @Override
801                    public void run() {
802                        try {
803                            localBroker.oneway(destInfo);
804                        } catch (IOException e) {
805                            LOG.warn("failed to deliver remove command for destination: {}", destInfo.getDestination(), e);
806                        }
807                    }
808                });
809            } else {
810                localBroker.oneway(destInfo);
811            }
812        } else if (data.getClass() == RemoveInfo.class) {
813            ConsumerId id = (ConsumerId) ((RemoveInfo) data).getObjectId();
814            removeDemandSubscription(id);
815        } else if (data.getClass() == RemoveSubscriptionInfo.class) {
816            RemoveSubscriptionInfo info = ((RemoveSubscriptionInfo) data);
817            SubscriptionInfo subscriptionInfo = new SubscriptionInfo(info.getClientId(), info.getSubscriptionName());
818            for (Iterator<DemandSubscription> i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) {
819                DemandSubscription ds = i.next();
820                boolean removed = ds.getDurableRemoteSubs().remove(subscriptionInfo);
821                if (removed) {
822                    if (ds.getDurableRemoteSubs().isEmpty()) {
823
824                        // deactivate subscriber
825                        RemoveInfo removeInfo = new RemoveInfo(ds.getLocalInfo().getConsumerId());
826                        localBroker.oneway(removeInfo);
827
828                        // remove subscriber
829                        RemoveSubscriptionInfo sending = new RemoveSubscriptionInfo();
830                        sending.setClientId(localClientId);
831                        sending.setSubscriptionName(ds.getLocalDurableSubscriber().getSubscriptionName());
832                        sending.setConnectionId(this.localConnectionInfo.getConnectionId());
833                        localBroker.oneway(sending);
834                    }
835                }
836            }
837        }
838    }
839
840    @Override
841    public void serviceLocalException(Throwable error) {
842        serviceLocalException(null, error);
843    }
844
845    public void serviceLocalException(MessageDispatch messageDispatch, Throwable error) {
846        LOG.trace("serviceLocalException: disposed {} ex", disposed.get(), error);
847        if (!disposed.get()) {
848            if (error instanceof DestinationDoesNotExistException && ((DestinationDoesNotExistException) error).isTemporary()) {
849                // not a reason to terminate the bridge - temps can disappear with
850                // pending sends as the demand sub may outlive the remote dest
851                if (messageDispatch != null) {
852                    LOG.warn("PoisonAck of {} on forwarding error: {}", messageDispatch.getMessage().getMessageId(), error);
853                    try {
854                        MessageAck poisonAck = new MessageAck(messageDispatch, MessageAck.POSION_ACK_TYPE, 1);
855                        poisonAck.setPoisonCause(error);
856                        localBroker.oneway(poisonAck);
857                    } catch (IOException ioe) {
858                        LOG.error("Failed to posion ack message following forward failure: ", ioe);
859                    }
860                    fireFailedForwardAdvisory(messageDispatch, error);
861                } else {
862                    LOG.warn("Ignoring exception on forwarding to non existent temp dest: ", error);
863                }
864                return;
865            }
866
867            LOG.info("Network connection between {} and {} shutdown due to a local error: {}", new Object[]{localBroker, remoteBroker, error});
868            LOG.debug("The local Exception was: {}", error, error);
869
870            brokerService.getTaskRunnerFactory().execute(new Runnable() {
871                @Override
872                public void run() {
873                    ServiceSupport.dispose(getControllingService());
874                }
875            });
876            fireBridgeFailed(error);
877        }
878    }
879
880    private void fireFailedForwardAdvisory(MessageDispatch messageDispatch, Throwable error) {
881        if (configuration.isAdvisoryForFailedForward()) {
882            AdvisoryBroker advisoryBroker = null;
883            try {
884                advisoryBroker = (AdvisoryBroker) brokerService.getBroker().getAdaptor(AdvisoryBroker.class);
885
886                if (advisoryBroker != null) {
887                    ConnectionContext context = new ConnectionContext();
888                    context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
889                    context.setBroker(brokerService.getBroker());
890
891                    ActiveMQMessage advisoryMessage = new ActiveMQMessage();
892                    advisoryMessage.setStringProperty("cause", error.getLocalizedMessage());
893                    advisoryBroker.fireAdvisory(context, AdvisorySupport.getNetworkBridgeForwardFailureAdvisoryTopic(), messageDispatch.getMessage(), null,
894                            advisoryMessage);
895
896                }
897            } catch (Exception e) {
898                LOG.warn("failed to fire forward failure advisory, cause: {}", e);
899                LOG.debug("detail", e);
900            }
901        }
902    }
903
904    protected Service getControllingService() {
905        return duplexInitiatingConnection != null ? duplexInitiatingConnection : DemandForwardingBridgeSupport.this;
906    }
907
908    protected void addSubscription(DemandSubscription sub) throws IOException {
909        if (sub != null) {
910            if (isDuplex()) {
911                // async vm transport, need to wait for completion
912                localBroker.request(sub.getLocalInfo());
913            } else {
914                localBroker.oneway(sub.getLocalInfo());
915            }
916        }
917    }
918
919    protected void removeSubscription(final DemandSubscription sub) throws IOException {
920        if (sub != null) {
921            LOG.trace("{} remove local subscription: {} for remote {}", new Object[]{configuration.getBrokerName(), sub.getLocalInfo().getConsumerId(), sub.getRemoteInfo().getConsumerId()});
922
923            // ensure not available for conduit subs pending removal
924            subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
925            subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId());
926
927            // continue removal in separate thread to free up this thread for outstanding responses
928            // Serialize with removeDestination operations so that removeSubs are serialized with
929            // removeDestinations such that all removeSub advisories are generated
930            serialExecutor.execute(new Runnable() {
931                @Override
932                public void run() {
933                    sub.waitForCompletion();
934                    try {
935                        localBroker.oneway(sub.getLocalInfo().createRemoveCommand());
936                    } catch (IOException e) {
937                        LOG.warn("failed to deliver remove command for local subscription, for remote {}", sub.getRemoteInfo().getConsumerId(), e);
938                    }
939                }
940            });
941        }
942    }
943
944    protected Message configureMessage(MessageDispatch md) throws IOException {
945        Message message = md.getMessage().copy();
946        // Update the packet to show where it came from.
947        message.setBrokerPath(appendToBrokerPath(message.getBrokerPath(), localBrokerPath));
948        message.setProducerId(producerInfo.getProducerId());
949        message.setDestination(md.getDestination());
950        message.setMemoryUsage(null);
951        if (message.getOriginalTransactionId() == null) {
952            message.setOriginalTransactionId(message.getTransactionId());
953        }
954        message.setTransactionId(null);
955        if (configuration.isUseCompression()) {
956            message.compress();
957        }
958        return message;
959    }
960
961    protected void serviceLocalCommand(Command command) {
962        if (!disposed.get()) {
963            try {
964                if (command.isMessageDispatch()) {
965                    safeWaitUntilStarted();
966                    enqueueCounter.incrementAndGet();
967                    final MessageDispatch md = (MessageDispatch) command;
968                    final DemandSubscription sub = subscriptionMapByLocalId.get(md.getConsumerId());
969                    if (sub != null && md.getMessage() != null && sub.incrementOutstandingResponses()) {
970
971                        if (suppressMessageDispatch(md, sub)) {
972                            LOG.debug("{} message not forwarded to {} because message came from there or fails TTL, brokerPath: {}, message: {}", new Object[]{
973                                    configuration.getBrokerName(), remoteBrokerName, Arrays.toString(md.getMessage().getBrokerPath()), md.getMessage()
974                            });
975                            // still ack as it may be durable
976                            try {
977                                localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
978                            } finally {
979                                sub.decrementOutstandingResponses();
980                            }
981                            return;
982                        }
983
984                        Message message = configureMessage(md);
985                        LOG.debug("bridging ({} -> {}), consumer: {}, destination: {}, brokerPath: {}, message: {}", new Object[]{
986                                configuration.getBrokerName(), remoteBrokerName, (LOG.isTraceEnabled() ? message : message.getMessageId()), md.getConsumerId(), message.getDestination(), Arrays.toString(message.getBrokerPath()), message
987                        });
988
989                        if (isDuplex() && AdvisorySupport.ADIVSORY_MESSAGE_TYPE.equals(message.getType())) {
990                            try {
991                                // never request b/c they are eventually acked async
992                                remoteBroker.oneway(message);
993                            } finally {
994                                sub.decrementOutstandingResponses();
995                            }
996                            return;
997                        }
998
999                        if (message.isPersistent() || configuration.isAlwaysSyncSend()) {
1000
1001                            // The message was not sent using async send, so we should only
1002                            // ack the local broker when we get confirmation that the remote
1003                            // broker has received the message.
1004                            remoteBroker.asyncRequest(message, new ResponseCallback() {
1005                                @Override
1006                                public void onCompletion(FutureResponse future) {
1007                                    try {
1008                                        Response response = future.getResult();
1009                                        if (response.isException()) {
1010                                            ExceptionResponse er = (ExceptionResponse) response;
1011                                            serviceLocalException(md, er.getException());
1012                                        } else {
1013                                            localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
1014                                            dequeueCounter.incrementAndGet();
1015                                        }
1016                                    } catch (IOException e) {
1017                                        serviceLocalException(md, e);
1018                                    } finally {
1019                                        sub.decrementOutstandingResponses();
1020                                    }
1021                                }
1022                            });
1023
1024                        } else {
1025                            // If the message was originally sent using async send, we will
1026                            // preserve that QOS by bridging it using an async send (small chance
1027                            // of message loss).
1028                            try {
1029                                remoteBroker.oneway(message);
1030                                localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
1031                                dequeueCounter.incrementAndGet();
1032                            } finally {
1033                                sub.decrementOutstandingResponses();
1034                            }
1035                        }
1036                        serviceOutbound(message);
1037                    } else {
1038                        LOG.debug("No subscription registered with this network bridge for consumerId: {} for message: {}", md.getConsumerId(), md.getMessage());
1039                    }
1040                } else if (command.isBrokerInfo()) {
1041                    futureLocalBrokerInfo.set((BrokerInfo) command);
1042                } else if (command.isShutdownInfo()) {
1043                    LOG.info("{} Shutting down {}", configuration.getBrokerName(), configuration.getName());
1044                    stop();
1045                } else if (command.getClass() == ConnectionError.class) {
1046                    ConnectionError ce = (ConnectionError) command;
1047                    serviceLocalException(ce.getException());
1048                } else {
1049                    switch (command.getDataStructureType()) {
1050                        case WireFormatInfo.DATA_STRUCTURE_TYPE:
1051                            break;
1052                        default:
1053                            LOG.warn("Unexpected local command: {}", command);
1054                    }
1055                }
1056            } catch (Throwable e) {
1057                LOG.warn("Caught an exception processing local command", e);
1058                serviceLocalException(e);
1059            }
1060        }
1061    }
1062
1063    private boolean suppressMessageDispatch(MessageDispatch md, DemandSubscription sub) throws Exception {
1064        boolean suppress = false;
1065        // for durable subs, suppression via filter leaves dangling acks so we
1066        // need to check here and allow the ack irrespective
1067        if (sub.getLocalInfo().isDurable()) {
1068            MessageEvaluationContext messageEvalContext = new MessageEvaluationContext();
1069            messageEvalContext.setMessageReference(md.getMessage());
1070            messageEvalContext.setDestination(md.getDestination());
1071            suppress = !sub.getNetworkBridgeFilter().matches(messageEvalContext);
1072        }
1073        return suppress;
1074    }
1075
1076    public static boolean contains(BrokerId[] brokerPath, BrokerId brokerId) {
1077        if (brokerPath != null) {
1078            for (BrokerId id : brokerPath) {
1079                if (brokerId.equals(id)) {
1080                    return true;
1081                }
1082            }
1083        }
1084        return false;
1085    }
1086
1087    protected BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId[] pathsToAppend) {
1088        if (brokerPath == null || brokerPath.length == 0) {
1089            return pathsToAppend;
1090        }
1091        BrokerId rc[] = new BrokerId[brokerPath.length + pathsToAppend.length];
1092        System.arraycopy(brokerPath, 0, rc, 0, brokerPath.length);
1093        System.arraycopy(pathsToAppend, 0, rc, brokerPath.length, pathsToAppend.length);
1094        return rc;
1095    }
1096
1097    protected BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId idToAppend) {
1098        if (brokerPath == null || brokerPath.length == 0) {
1099            return new BrokerId[]{idToAppend};
1100        }
1101        BrokerId rc[] = new BrokerId[brokerPath.length + 1];
1102        System.arraycopy(brokerPath, 0, rc, 0, brokerPath.length);
1103        rc[brokerPath.length] = idToAppend;
1104        return rc;
1105    }
1106
1107    protected boolean isPermissableDestination(ActiveMQDestination destination) {
1108        return isPermissableDestination(destination, false);
1109    }
1110
1111    protected boolean isPermissableDestination(ActiveMQDestination destination, boolean allowTemporary) {
1112        // Are we not bridging temporary destinations?
1113        if (destination.isTemporary()) {
1114            if (allowTemporary) {
1115                return true;
1116            } else {
1117                return configuration.isBridgeTempDestinations();
1118            }
1119        }
1120
1121        ActiveMQDestination[] dests = staticallyIncludedDestinations;
1122        if (dests != null && dests.length > 0) {
1123            for (ActiveMQDestination dest : dests) {
1124                DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest);
1125                if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
1126                    return true;
1127                }
1128            }
1129        }
1130
1131        dests = excludedDestinations;
1132        if (dests != null && dests.length > 0) {
1133            for (ActiveMQDestination dest : dests) {
1134                DestinationFilter exclusionFilter = DestinationFilter.parseFilter(dest);
1135                if (dest != null && exclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
1136                    return false;
1137                }
1138            }
1139        }
1140
1141        dests = dynamicallyIncludedDestinations;
1142        if (dests != null && dests.length > 0) {
1143            for (ActiveMQDestination dest : dests) {
1144                DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest);
1145                if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
1146                    return true;
1147                }
1148            }
1149
1150            return false;
1151        }
1152        return true;
1153    }
1154
1155    /**
1156     * Subscriptions for these destinations are always created
1157     */
1158    protected void setupStaticDestinations() {
1159        ActiveMQDestination[] dests = staticallyIncludedDestinations;
1160        if (dests != null) {
1161            for (ActiveMQDestination dest : dests) {
1162                DemandSubscription sub = createDemandSubscription(dest);
1163                sub.setStaticallyIncluded(true);
1164                try {
1165                    addSubscription(sub);
1166                } catch (IOException e) {
1167                    LOG.error("Failed to add static destination {}", dest, e);
1168                }
1169                LOG.trace("{}, bridging messages for static destination: {}", configuration.getBrokerName(), dest);
1170            }
1171        }
1172    }
1173
1174    protected void addConsumerInfo(final ConsumerInfo consumerInfo) throws IOException {
1175        ConsumerInfo info = consumerInfo.copy();
1176        addRemoteBrokerToBrokerPath(info);
1177        DemandSubscription sub = createDemandSubscription(info);
1178        if (sub != null) {
1179            if (duplicateSuppressionIsRequired(sub)) {
1180                undoMapRegistration(sub);
1181            } else {
1182                if (consumerInfo.isDurable()) {
1183                    sub.getDurableRemoteSubs().add(new SubscriptionInfo(sub.getRemoteInfo().getClientId(), consumerInfo.getSubscriptionName()));
1184                }
1185                addSubscription(sub);
1186                LOG.debug("{} new demand subscription: {}", configuration.getBrokerName(), sub);
1187            }
1188        }
1189    }
1190
1191    private void undoMapRegistration(DemandSubscription sub) {
1192        subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
1193        subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId());
1194    }
1195
1196    /*
1197     * check our existing subs networkConsumerIds against the list of network
1198     * ids in this subscription A match means a duplicate which we suppress for
1199     * topics and maybe for queues
1200     */
1201    private boolean duplicateSuppressionIsRequired(DemandSubscription candidate) {
1202        final ConsumerInfo consumerInfo = candidate.getRemoteInfo();
1203        boolean suppress = false;
1204
1205        if (consumerInfo.getDestination().isQueue() && !configuration.isSuppressDuplicateQueueSubscriptions() || consumerInfo.getDestination().isTopic()
1206                && !configuration.isSuppressDuplicateTopicSubscriptions()) {
1207            return suppress;
1208        }
1209
1210        List<ConsumerId> candidateConsumers = consumerInfo.getNetworkConsumerIds();
1211        Collection<Subscription> currentSubs = getRegionSubscriptions(consumerInfo.getDestination());
1212        for (Subscription sub : currentSubs) {
1213            List<ConsumerId> networkConsumers = sub.getConsumerInfo().getNetworkConsumerIds();
1214            if (!networkConsumers.isEmpty()) {
1215                if (matchFound(candidateConsumers, networkConsumers)) {
1216                    if (isInActiveDurableSub(sub)) {
1217                        suppress = false;
1218                    } else {
1219                        suppress = hasLowerPriority(sub, candidate.getLocalInfo());
1220                    }
1221                    break;
1222                }
1223            }
1224        }
1225        return suppress;
1226    }
1227
1228    private boolean isInActiveDurableSub(Subscription sub) {
1229        return (sub.getConsumerInfo().isDurable() && sub instanceof DurableTopicSubscription && !((DurableTopicSubscription) sub).isActive());
1230    }
1231
1232    private boolean hasLowerPriority(Subscription existingSub, ConsumerInfo candidateInfo) {
1233        boolean suppress = false;
1234
1235        if (existingSub.getConsumerInfo().getPriority() >= candidateInfo.getPriority()) {
1236            LOG.debug("{} Ignoring duplicate subscription from {}, sub: {} is duplicate by network subscription with equal or higher network priority: {}, networkConsumerIds: {}", new Object[]{
1237                    configuration.getBrokerName(), remoteBrokerName, candidateInfo, existingSub, existingSub.getConsumerInfo().getNetworkConsumerIds()
1238            });
1239            suppress = true;
1240        } else {
1241            // remove the existing lower priority duplicate and allow this candidate
1242            try {
1243                removeDuplicateSubscription(existingSub);
1244
1245                LOG.debug("{} Replacing duplicate subscription {} with sub from {}, which has a higher priority, new sub: {}, networkConsumerIds: {}", new Object[]{
1246                        configuration.getBrokerName(), existingSub.getConsumerInfo(), remoteBrokerName, candidateInfo, candidateInfo.getNetworkConsumerIds()
1247                });
1248            } catch (IOException e) {
1249                LOG.error("Failed to remove duplicated sub as a result of sub with higher priority, sub: {}", existingSub, e);
1250            }
1251        }
1252        return suppress;
1253    }
1254
1255    private void removeDuplicateSubscription(Subscription existingSub) throws IOException {
1256        for (NetworkConnector connector : brokerService.getNetworkConnectors()) {
1257            if (connector.removeDemandSubscription(existingSub.getConsumerInfo().getConsumerId())) {
1258                break;
1259            }
1260        }
1261    }
1262
1263    private boolean matchFound(List<ConsumerId> candidateConsumers, List<ConsumerId> networkConsumers) {
1264        boolean found = false;
1265        for (ConsumerId aliasConsumer : networkConsumers) {
1266            if (candidateConsumers.contains(aliasConsumer)) {
1267                found = true;
1268                break;
1269            }
1270        }
1271        return found;
1272    }
1273
1274    protected final Collection<Subscription> getRegionSubscriptions(ActiveMQDestination dest) {
1275        RegionBroker region_broker = (RegionBroker) brokerService.getRegionBroker();
1276        Region region;
1277        Collection<Subscription> subs;
1278
1279        region = null;
1280        switch (dest.getDestinationType()) {
1281            case ActiveMQDestination.QUEUE_TYPE:
1282                region = region_broker.getQueueRegion();
1283                break;
1284            case ActiveMQDestination.TOPIC_TYPE:
1285                region = region_broker.getTopicRegion();
1286                break;
1287            case ActiveMQDestination.TEMP_QUEUE_TYPE:
1288                region = region_broker.getTempQueueRegion();
1289                break;
1290            case ActiveMQDestination.TEMP_TOPIC_TYPE:
1291                region = region_broker.getTempTopicRegion();
1292                break;
1293        }
1294
1295        if (region instanceof AbstractRegion) {
1296            subs = ((AbstractRegion) region).getSubscriptions().values();
1297        } else {
1298            subs = null;
1299        }
1300
1301        return subs;
1302    }
1303
1304    protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException {
1305        // add our original id to ourselves
1306        info.addNetworkConsumerId(info.getConsumerId());
1307        return doCreateDemandSubscription(info);
1308    }
1309
1310    protected DemandSubscription doCreateDemandSubscription(ConsumerInfo info) throws IOException {
1311        DemandSubscription result = new DemandSubscription(info);
1312        result.getLocalInfo().setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId()));
1313        if (info.getDestination().isTemporary()) {
1314            // reset the local connection Id
1315            ActiveMQTempDestination dest = (ActiveMQTempDestination) result.getLocalInfo().getDestination();
1316            dest.setConnectionId(localConnectionInfo.getConnectionId().toString());
1317        }
1318
1319        if (configuration.isDecreaseNetworkConsumerPriority()) {
1320            byte priority = (byte) configuration.getConsumerPriorityBase();
1321            if (info.getBrokerPath() != null && info.getBrokerPath().length > 1) {
1322                // The longer the path to the consumer, the less it's consumer priority.
1323                priority -= info.getBrokerPath().length + 1;
1324            }
1325            result.getLocalInfo().setPriority(priority);
1326            LOG.debug("{} using priority: {} for subscription: {}", new Object[]{configuration.getBrokerName(), priority, info});
1327        }
1328        configureDemandSubscription(info, result);
1329        return result;
1330    }
1331
1332    final protected DemandSubscription createDemandSubscription(ActiveMQDestination destination) {
1333        ConsumerInfo info = new ConsumerInfo();
1334        info.setNetworkSubscription(true);
1335        info.setDestination(destination);
1336
1337        // Indicate that this subscription is being made on behalf of the remote broker.
1338        info.setBrokerPath(new BrokerId[]{remoteBrokerId});
1339
1340        // the remote info held by the DemandSubscription holds the original
1341        // consumerId, the local info get's overwritten
1342        info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId()));
1343        DemandSubscription result = null;
1344        try {
1345            result = createDemandSubscription(info);
1346        } catch (IOException e) {
1347            LOG.error("Failed to create DemandSubscription ", e);
1348        }
1349        return result;
1350    }
1351
1352    protected void configureDemandSubscription(ConsumerInfo info, DemandSubscription sub) throws IOException {
1353        if (AdvisorySupport.isConsumerAdvisoryTopic(info.getDestination())) {
1354            sub.getLocalInfo().setDispatchAsync(true);
1355        } else {
1356            sub.getLocalInfo().setDispatchAsync(configuration.isDispatchAsync());
1357        }
1358        sub.getLocalInfo().setPrefetchSize(configuration.getPrefetchSize());
1359        subscriptionMapByLocalId.put(sub.getLocalInfo().getConsumerId(), sub);
1360        subscriptionMapByRemoteId.put(sub.getRemoteInfo().getConsumerId(), sub);
1361
1362        sub.setNetworkBridgeFilter(createNetworkBridgeFilter(info));
1363        if (!info.isDurable()) {
1364            // This works for now since we use a VM connection to the local broker.
1365            // may need to change if we ever subscribe to a remote broker.
1366            sub.getLocalInfo().setAdditionalPredicate(sub.getNetworkBridgeFilter());
1367        } else {
1368            sub.setLocalDurableSubscriber(new SubscriptionInfo(info.getClientId(), info.getSubscriptionName()));
1369        }
1370    }
1371
1372    protected void removeDemandSubscription(ConsumerId id) throws IOException {
1373        DemandSubscription sub = subscriptionMapByRemoteId.remove(id);
1374        LOG.debug("{} remove request on {} from {}, consumer id: {}, matching sub: {}", new Object[]{
1375                configuration.getBrokerName(), localBroker, remoteBrokerName, id, sub
1376        });
1377        if (sub != null) {
1378            removeSubscription(sub);
1379            LOG.debug("{} removed sub on {} from {}: {}", new Object[]{
1380                    configuration.getBrokerName(), localBroker, remoteBrokerName, sub.getRemoteInfo()
1381            });
1382        }
1383    }
1384
1385    protected boolean removeDemandSubscriptionByLocalId(ConsumerId consumerId) {
1386        boolean removeDone = false;
1387        DemandSubscription sub = subscriptionMapByLocalId.get(consumerId);
1388        if (sub != null) {
1389            try {
1390                removeDemandSubscription(sub.getRemoteInfo().getConsumerId());
1391                removeDone = true;
1392            } catch (IOException e) {
1393                LOG.debug("removeDemandSubscriptionByLocalId failed for localId: {}", consumerId, e);
1394            }
1395        }
1396        return removeDone;
1397    }
1398
1399    /**
1400     * Performs a timed wait on the started latch and then checks for disposed
1401     * before performing another wait each time the the started wait times out.
1402     */
1403    protected boolean safeWaitUntilStarted() throws InterruptedException {
1404        while (!disposed.get()) {
1405            if (startedLatch.await(1, TimeUnit.SECONDS)) {
1406                break;
1407            }
1408        }
1409        return !disposed.get();
1410    }
1411
1412    protected NetworkBridgeFilter createNetworkBridgeFilter(ConsumerInfo info) throws IOException {
1413        NetworkBridgeFilterFactory filterFactory = defaultFilterFactory;
1414        if (brokerService != null && brokerService.getDestinationPolicy() != null) {
1415            PolicyEntry entry = brokerService.getDestinationPolicy().getEntryFor(info.getDestination());
1416            if (entry != null && entry.getNetworkBridgeFilterFactory() != null) {
1417                filterFactory = entry.getNetworkBridgeFilterFactory();
1418            }
1419        }
1420        return filterFactory.create(info, getRemoteBrokerPath(), configuration.getMessageTTL(), configuration.getConsumerTTL());
1421    }
1422
1423    protected void addRemoteBrokerToBrokerPath(ConsumerInfo info) throws IOException {
1424        info.setBrokerPath(appendToBrokerPath(info.getBrokerPath(), getRemoteBrokerPath()));
1425    }
1426
1427    protected BrokerId[] getRemoteBrokerPath() {
1428        return remoteBrokerPath;
1429    }
1430
1431    @Override
1432    public void setNetworkBridgeListener(NetworkBridgeListener listener) {
1433        this.networkBridgeListener = listener;
1434    }
1435
1436    private void fireBridgeFailed(Throwable reason) {
1437        LOG.trace("fire bridge failed, listener: {}", this.networkBridgeListener, reason);
1438        NetworkBridgeListener l = this.networkBridgeListener;
1439        if (l != null && this.bridgeFailed.compareAndSet(false, true)) {
1440            l.bridgeFailed();
1441        }
1442    }
1443
1444    /**
1445     * @return Returns the dynamicallyIncludedDestinations.
1446     */
1447    public ActiveMQDestination[] getDynamicallyIncludedDestinations() {
1448        return dynamicallyIncludedDestinations;
1449    }
1450
1451    /**
1452     * @param dynamicallyIncludedDestinations
1453     *         The dynamicallyIncludedDestinations to set.
1454     */
1455    public void setDynamicallyIncludedDestinations(ActiveMQDestination[] dynamicallyIncludedDestinations) {
1456        this.dynamicallyIncludedDestinations = dynamicallyIncludedDestinations;
1457    }
1458
1459    /**
1460     * @return Returns the excludedDestinations.
1461     */
1462    public ActiveMQDestination[] getExcludedDestinations() {
1463        return excludedDestinations;
1464    }
1465
1466    /**
1467     * @param excludedDestinations The excludedDestinations to set.
1468     */
1469    public void setExcludedDestinations(ActiveMQDestination[] excludedDestinations) {
1470        this.excludedDestinations = excludedDestinations;
1471    }
1472
1473    /**
1474     * @return Returns the staticallyIncludedDestinations.
1475     */
1476    public ActiveMQDestination[] getStaticallyIncludedDestinations() {
1477        return staticallyIncludedDestinations;
1478    }
1479
1480    /**
1481     * @param staticallyIncludedDestinations The staticallyIncludedDestinations to set.
1482     */
1483    public void setStaticallyIncludedDestinations(ActiveMQDestination[] staticallyIncludedDestinations) {
1484        this.staticallyIncludedDestinations = staticallyIncludedDestinations;
1485    }
1486
1487    /**
1488     * @return Returns the durableDestinations.
1489     */
1490    public ActiveMQDestination[] getDurableDestinations() {
1491        return durableDestinations;
1492    }
1493
1494    /**
1495     * @param durableDestinations The durableDestinations to set.
1496     */
1497    public void setDurableDestinations(ActiveMQDestination[] durableDestinations) {
1498        this.durableDestinations = durableDestinations;
1499    }
1500
1501    /**
1502     * @return Returns the localBroker.
1503     */
1504    public Transport getLocalBroker() {
1505        return localBroker;
1506    }
1507
1508    /**
1509     * @return Returns the remoteBroker.
1510     */
1511    public Transport getRemoteBroker() {
1512        return remoteBroker;
1513    }
1514
1515    /**
1516     * @return the createdByDuplex
1517     */
1518    public boolean isCreatedByDuplex() {
1519        return this.createdByDuplex;
1520    }
1521
1522    /**
1523     * @param createdByDuplex the createdByDuplex to set
1524     */
1525    public void setCreatedByDuplex(boolean createdByDuplex) {
1526        this.createdByDuplex = createdByDuplex;
1527    }
1528
1529    @Override
1530    public String getRemoteAddress() {
1531        return remoteBroker.getRemoteAddress();
1532    }
1533
1534    @Override
1535    public String getLocalAddress() {
1536        return localBroker.getRemoteAddress();
1537    }
1538
1539    @Override
1540    public String getRemoteBrokerName() {
1541        return remoteBrokerInfo == null ? null : remoteBrokerInfo.getBrokerName();
1542    }
1543
1544    @Override
1545    public String getRemoteBrokerId() {
1546        return (remoteBrokerInfo == null || remoteBrokerInfo.getBrokerId() == null) ? null : remoteBrokerInfo.getBrokerId().toString();
1547    }
1548
1549    @Override
1550    public String getLocalBrokerName() {
1551        return localBrokerInfo == null ? null : localBrokerInfo.getBrokerName();
1552    }
1553
1554    @Override
1555    public long getDequeueCounter() {
1556        return dequeueCounter.get();
1557    }
1558
1559    @Override
1560    public long getEnqueueCounter() {
1561        return enqueueCounter.get();
1562    }
1563
1564    protected boolean isDuplex() {
1565        return configuration.isDuplex() || createdByDuplex;
1566    }
1567
1568    public ConcurrentMap<ConsumerId, DemandSubscription> getLocalSubscriptionMap() {
1569        return subscriptionMapByRemoteId;
1570    }
1571
1572    @Override
1573    public void setBrokerService(BrokerService brokerService) {
1574        this.brokerService = brokerService;
1575        this.localBrokerId = brokerService.getRegionBroker().getBrokerId();
1576        localBrokerPath[0] = localBrokerId;
1577    }
1578
1579    @Override
1580    public void setMbeanObjectName(ObjectName objectName) {
1581        this.mbeanObjectName = objectName;
1582    }
1583
1584    @Override
1585    public ObjectName getMbeanObjectName() {
1586        return mbeanObjectName;
1587    }
1588
1589    @Override
1590    public void resetStats() {
1591        enqueueCounter.set(0);
1592        dequeueCounter.set(0);
1593    }
1594
1595    /*
1596     * Used to allow for async tasks to await receipt of the BrokerInfo from the local and
1597     * remote sides of the network bridge.
1598     */
1599    private static class FutureBrokerInfo implements Future<BrokerInfo> {
1600
1601        private final CountDownLatch slot = new CountDownLatch(1);
1602        private final AtomicBoolean disposed;
1603        private volatile BrokerInfo info = null;
1604
1605        public FutureBrokerInfo(BrokerInfo info, AtomicBoolean disposed) {
1606            this.info = info;
1607            this.disposed = disposed;
1608        }
1609
1610        @Override
1611        public boolean cancel(boolean mayInterruptIfRunning) {
1612            slot.countDown();
1613            return true;
1614        }
1615
1616        @Override
1617        public boolean isCancelled() {
1618            return slot.getCount() == 0 && info == null;
1619        }
1620
1621        @Override
1622        public boolean isDone() {
1623            return info != null;
1624        }
1625
1626        @Override
1627        public BrokerInfo get() throws InterruptedException, ExecutionException {
1628            try {
1629                if (info == null) {
1630                    while (!disposed.get()) {
1631                        if (slot.await(1, TimeUnit.SECONDS)) {
1632                            break;
1633                        }
1634                    }
1635                }
1636                return info;
1637            } catch (InterruptedException e) {
1638                Thread.currentThread().interrupt();
1639                LOG.debug("Operation interrupted: {}", e, e);
1640                throw new InterruptedException("Interrupted.");
1641            }
1642        }
1643
1644        @Override
1645        public BrokerInfo get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
1646            try {
1647                if (info == null) {
1648                    long deadline = System.currentTimeMillis() + unit.toMillis(timeout);
1649
1650                    while (!disposed.get() || System.currentTimeMillis() < deadline) {
1651                        if (slot.await(1, TimeUnit.MILLISECONDS)) {
1652                            break;
1653                        }
1654                    }
1655                    if (info == null) {
1656                        throw new TimeoutException();
1657                    }
1658                }
1659                return info;
1660            } catch (InterruptedException e) {
1661                throw new InterruptedException("Interrupted.");
1662            }
1663        }
1664
1665        public void set(BrokerInfo info) {
1666            this.info = info;
1667            this.slot.countDown();
1668        }
1669    }
1670
1671    protected void serviceOutbound(Message message) {
1672        NetworkBridgeListener l = this.networkBridgeListener;
1673        if (l != null) {
1674            l.onOutboundMessage(this, message);
1675        }
1676    }
1677
1678    protected void serviceInboundMessage(Message message) {
1679        NetworkBridgeListener l = this.networkBridgeListener;
1680        if (l != null) {
1681            l.onInboundMessage(this, message);
1682        }
1683    }
1684
1685    protected boolean canDuplexDispatch(Message message) {
1686        boolean result = true;
1687        if (configuration.isCheckDuplicateMessagesOnDuplex()){
1688            final long producerSequenceId = message.getMessageId().getProducerSequenceId();
1689            //  messages are multiplexed on this producer so we need to query the persistenceAdapter
1690            long lastStoredForMessageProducer = getStoredSequenceIdForMessage(message.getMessageId());
1691            if (producerSequenceId <= lastStoredForMessageProducer) {
1692                result = false;
1693                LOG.debug("suppressing duplicate message send [{}] from network producer with producerSequence [{}] less than last stored: {}", new Object[]{
1694                        (LOG.isTraceEnabled() ? message : message.getMessageId()), producerSequenceId, lastStoredForMessageProducer
1695                });
1696            }
1697        }
1698        return result;
1699    }
1700
1701    protected long getStoredSequenceIdForMessage(MessageId messageId) {
1702        try {
1703            return brokerService.getPersistenceAdapter().getLastProducerSequenceId(messageId.getProducerId());
1704        } catch (IOException ignored) {
1705            LOG.debug("Failed to determine last producer sequence id for: {}", messageId, ignored);
1706        }
1707        return -1;
1708    }
1709
1710}