001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.network; 018 019import java.io.IOException; 020import java.security.GeneralSecurityException; 021import java.security.cert.X509Certificate; 022import java.util.Arrays; 023import java.util.Collection; 024import java.util.Iterator; 025import java.util.List; 026import java.util.Properties; 027import java.util.concurrent.ConcurrentHashMap; 028import java.util.concurrent.ConcurrentMap; 029import java.util.concurrent.CountDownLatch; 030import java.util.concurrent.ExecutionException; 031import java.util.concurrent.ExecutorService; 032import java.util.concurrent.Executors; 033import java.util.concurrent.Future; 034import java.util.concurrent.TimeUnit; 035import java.util.concurrent.TimeoutException; 036import java.util.concurrent.atomic.AtomicBoolean; 037import java.util.concurrent.atomic.AtomicLong; 038 039import javax.management.ObjectName; 040 041import org.apache.activemq.DestinationDoesNotExistException; 042import org.apache.activemq.Service; 043import org.apache.activemq.advisory.AdvisoryBroker; 044import org.apache.activemq.advisory.AdvisorySupport; 045import org.apache.activemq.broker.BrokerService; 
046import org.apache.activemq.broker.BrokerServiceAware; 047import org.apache.activemq.broker.ConnectionContext; 048import org.apache.activemq.broker.TransportConnection; 049import org.apache.activemq.broker.region.AbstractRegion; 050import org.apache.activemq.broker.region.DurableTopicSubscription; 051import org.apache.activemq.broker.region.Region; 052import org.apache.activemq.broker.region.RegionBroker; 053import org.apache.activemq.broker.region.Subscription; 054import org.apache.activemq.broker.region.policy.PolicyEntry; 055import org.apache.activemq.command.ActiveMQDestination; 056import org.apache.activemq.command.ActiveMQMessage; 057import org.apache.activemq.command.ActiveMQTempDestination; 058import org.apache.activemq.command.ActiveMQTopic; 059import org.apache.activemq.command.BrokerId; 060import org.apache.activemq.command.BrokerInfo; 061import org.apache.activemq.command.Command; 062import org.apache.activemq.command.ConnectionError; 063import org.apache.activemq.command.ConnectionId; 064import org.apache.activemq.command.ConnectionInfo; 065import org.apache.activemq.command.ConsumerId; 066import org.apache.activemq.command.ConsumerInfo; 067import org.apache.activemq.command.DataStructure; 068import org.apache.activemq.command.DestinationInfo; 069import org.apache.activemq.command.ExceptionResponse; 070import org.apache.activemq.command.KeepAliveInfo; 071import org.apache.activemq.command.Message; 072import org.apache.activemq.command.MessageAck; 073import org.apache.activemq.command.MessageDispatch; 074import org.apache.activemq.command.MessageId; 075import org.apache.activemq.command.NetworkBridgeFilter; 076import org.apache.activemq.command.ProducerInfo; 077import org.apache.activemq.command.RemoveInfo; 078import org.apache.activemq.command.RemoveSubscriptionInfo; 079import org.apache.activemq.command.Response; 080import org.apache.activemq.command.SessionInfo; 081import org.apache.activemq.command.ShutdownInfo; 082import 
org.apache.activemq.command.SubscriptionInfo; 083import org.apache.activemq.command.WireFormatInfo; 084import org.apache.activemq.filter.DestinationFilter; 085import org.apache.activemq.filter.MessageEvaluationContext; 086import org.apache.activemq.security.SecurityContext; 087import org.apache.activemq.transport.DefaultTransportListener; 088import org.apache.activemq.transport.FutureResponse; 089import org.apache.activemq.transport.ResponseCallback; 090import org.apache.activemq.transport.Transport; 091import org.apache.activemq.transport.TransportDisposedIOException; 092import org.apache.activemq.transport.TransportFilter; 093import org.apache.activemq.transport.tcp.SslTransport; 094import org.apache.activemq.util.IdGenerator; 095import org.apache.activemq.util.IntrospectionSupport; 096import org.apache.activemq.util.LongSequenceGenerator; 097import org.apache.activemq.util.MarshallingSupport; 098import org.apache.activemq.util.ServiceStopper; 099import org.apache.activemq.util.ServiceSupport; 100import org.slf4j.Logger; 101import org.slf4j.LoggerFactory; 102 103/** 104 * A useful base class for implementing demand forwarding bridges. 
105 */ 106public abstract class DemandForwardingBridgeSupport implements NetworkBridge, BrokerServiceAware { 107 private static final Logger LOG = LoggerFactory.getLogger(DemandForwardingBridgeSupport.class); 108 protected static final String DURABLE_SUB_PREFIX = "NC-DS_"; 109 protected final Transport localBroker; 110 protected final Transport remoteBroker; 111 protected IdGenerator idGenerator = new IdGenerator(); 112 protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator(); 113 protected ConnectionInfo localConnectionInfo; 114 protected ConnectionInfo remoteConnectionInfo; 115 protected SessionInfo localSessionInfo; 116 protected ProducerInfo producerInfo; 117 protected String remoteBrokerName = "Unknown"; 118 protected String localClientId; 119 protected ConsumerInfo demandConsumerInfo; 120 protected int demandConsumerDispatched; 121 protected final AtomicBoolean localBridgeStarted = new AtomicBoolean(false); 122 protected final AtomicBoolean remoteBridgeStarted = new AtomicBoolean(false); 123 protected final AtomicBoolean bridgeFailed = new AtomicBoolean(); 124 protected final AtomicBoolean disposed = new AtomicBoolean(); 125 protected BrokerId localBrokerId; 126 protected ActiveMQDestination[] excludedDestinations; 127 protected ActiveMQDestination[] dynamicallyIncludedDestinations; 128 protected ActiveMQDestination[] staticallyIncludedDestinations; 129 protected ActiveMQDestination[] durableDestinations; 130 protected final ConcurrentMap<ConsumerId, DemandSubscription> subscriptionMapByLocalId = new ConcurrentHashMap<ConsumerId, DemandSubscription>(); 131 protected final ConcurrentMap<ConsumerId, DemandSubscription> subscriptionMapByRemoteId = new ConcurrentHashMap<ConsumerId, DemandSubscription>(); 132 protected final BrokerId localBrokerPath[] = new BrokerId[]{null}; 133 protected final CountDownLatch startedLatch = new CountDownLatch(2); 134 protected final CountDownLatch localStartedLatch = new CountDownLatch(1); 135 
protected final AtomicBoolean lastConnectSucceeded = new AtomicBoolean(false);
protected NetworkBridgeConfiguration configuration;
protected final NetworkBridgeFilterFactory defaultFilterFactory = new DefaultNetworkBridgeFilterFactory();

// Single-element path holding the remote broker id; slot 0 is filled in by collectBrokerInfos().
protected final BrokerId remoteBrokerPath[] = new BrokerId[]{null};
protected BrokerId remoteBrokerId;

final AtomicLong enqueueCounter = new AtomicLong();
final AtomicLong dequeueCounter = new AtomicLong();

private NetworkBridgeListener networkBridgeListener;
private boolean createdByDuplex;
private BrokerInfo localBrokerInfo;
private BrokerInfo remoteBrokerInfo;

// NOTE(review): both BrokerInfo fields are still null when these initializers run, so each
// future is constructed with a null seed; the value is supplied later (serviceRemoteCommand()
// calls futureRemoteBrokerInfo.set(...)) and read back in collectBrokerInfos().
private final FutureBrokerInfo futureRemoteBrokerInfo = new FutureBrokerInfo(remoteBrokerInfo, disposed);
private final FutureBrokerInfo futureLocalBrokerInfo = new FutureBrokerInfo(localBrokerInfo, disposed);

// Guards start()/stop() so each runs at most once per life-cycle transition.
private final AtomicBoolean started = new AtomicBoolean();
// Connection passed to duplexStart(); the first ConnectionInfo received over a duplex link is
// handed to it (see serviceRemoteCommand()).
private TransportConnection duplexInitiatingConnection;
private final AtomicBoolean duplexInitiatingConnectionInfoReceived = new AtomicBoolean();
protected BrokerService brokerService = null;
private ObjectName mbeanObjectName;
// Single-threaded executor used to serialize destination-remove commands; shut down in stop().
private final ExecutorService serialExecutor = Executors.newSingleThreadExecutor();
// Separate local transport for in-bound duplex forwards; created in start() when isDuplex().
private Transport duplexInboundLocalBroker = null;
private ProducerInfo duplexInboundLocalProducerInfo;

/**
 * Creates a bridge between the two given transports. Nothing is started here; call
 * {@link #start()} (or {@link #duplexStart}) once a BrokerService has been supplied.
 *
 * @param configuration bridge settings
 * @param localBroker   transport to the local broker
 * @param remoteBroker  transport to the remote broker
 */
public DemandForwardingBridgeSupport(NetworkBridgeConfiguration configuration, Transport localBroker, Transport remoteBroker) {
    this.configuration = configuration;
    this.localBroker = localBroker;
    this.remoteBroker = remoteBroker;
}

/**
 * Starts this bridge on behalf of an existing connection, with both BrokerInfo objects already known.
 * After {@link #start()} returns, the remote BrokerInfo is fed through
 * {@code serviceRemoteCommand} exactly as if it had arrived over the remote transport.
 *
 * @param connection       the connection that initiated the duplex bridge
 * @param localBrokerInfo  BrokerInfo describing the local broker
 * @param remoteBrokerInfo BrokerInfo describing the remote broker
 * @throws Exception if {@link #start()} fails
 */
public void duplexStart(TransportConnection connection, BrokerInfo localBrokerInfo, BrokerInfo remoteBrokerInfo) throws Exception {
    this.localBrokerInfo = localBrokerInfo;
    this.remoteBrokerInfo = remoteBrokerInfo;
    this.duplexInitiatingConnection = connection;
    start();
    serviceRemoteCommand(remoteBrokerInfo);
}
@Override 177 public void start() throws Exception { 178 if (started.compareAndSet(false, true)) { 179 180 if (brokerService == null) { 181 throw new IllegalArgumentException("BrokerService is null on " + this); 182 } 183 184 if (isDuplex()) { 185 duplexInboundLocalBroker = NetworkBridgeFactory.createLocalTransport(brokerService.getBroker()); 186 duplexInboundLocalBroker.setTransportListener(new DefaultTransportListener() { 187 188 @Override 189 public void onCommand(Object o) { 190 Command command = (Command) o; 191 serviceLocalCommand(command); 192 } 193 194 @Override 195 public void onException(IOException error) { 196 serviceLocalException(error); 197 } 198 }); 199 duplexInboundLocalBroker.start(); 200 } 201 202 localBroker.setTransportListener(new DefaultTransportListener() { 203 204 @Override 205 public void onCommand(Object o) { 206 Command command = (Command) o; 207 serviceLocalCommand(command); 208 } 209 210 @Override 211 public void onException(IOException error) { 212 if (!futureLocalBrokerInfo.isDone()) { 213 futureLocalBrokerInfo.cancel(true); 214 return; 215 } 216 serviceLocalException(error); 217 } 218 }); 219 220 remoteBroker.setTransportListener(new DefaultTransportListener() { 221 222 @Override 223 public void onCommand(Object o) { 224 Command command = (Command) o; 225 serviceRemoteCommand(command); 226 } 227 228 @Override 229 public void onException(IOException error) { 230 if (!futureRemoteBrokerInfo.isDone()) { 231 futureRemoteBrokerInfo.cancel(true); 232 return; 233 } 234 serviceRemoteException(error); 235 } 236 }); 237 238 remoteBroker.start(); 239 localBroker.start(); 240 241 if (!disposed.get()) { 242 try { 243 triggerStartAsyncNetworkBridgeCreation(); 244 } catch (IOException e) { 245 LOG.warn("Caught exception from remote start", e); 246 } 247 } else { 248 LOG.warn("Bridge was disposed before the start() method was fully executed."); 249 throw new TransportDisposedIOException(); 250 } 251 } 252 } 253 254 @Override 255 public void stop() 
throws Exception { 256 if (started.compareAndSet(true, false)) { 257 if (disposed.compareAndSet(false, true)) { 258 LOG.debug(" stopping {} bridge to {}", configuration.getBrokerName(), remoteBrokerName); 259 260 futureRemoteBrokerInfo.cancel(true); 261 futureLocalBrokerInfo.cancel(true); 262 263 NetworkBridgeListener l = this.networkBridgeListener; 264 if (l != null) { 265 l.onStop(this); 266 } 267 try { 268 // local start complete 269 if (startedLatch.getCount() < 2) { 270 LOG.trace("{} unregister bridge ({}) to {}", new Object[]{ 271 configuration.getBrokerName(), this, remoteBrokerName 272 }); 273 brokerService.getBroker().removeBroker(null, remoteBrokerInfo); 274 brokerService.getBroker().networkBridgeStopped(remoteBrokerInfo); 275 } 276 277 remoteBridgeStarted.set(false); 278 final CountDownLatch sendShutdown = new CountDownLatch(1); 279 280 brokerService.getTaskRunnerFactory().execute(new Runnable() { 281 @Override 282 public void run() { 283 try { 284 serialExecutor.shutdown(); 285 if (!serialExecutor.awaitTermination(5, TimeUnit.SECONDS)) { 286 List<Runnable> pendingTasks = serialExecutor.shutdownNow(); 287 LOG.info("pending tasks on stop {}", pendingTasks); 288 } 289 localBroker.oneway(new ShutdownInfo()); 290 remoteBroker.oneway(new ShutdownInfo()); 291 } catch (Throwable e) { 292 LOG.debug("Caught exception sending shutdown", e); 293 } finally { 294 sendShutdown.countDown(); 295 } 296 297 } 298 }, "ActiveMQ ForwardingBridge StopTask"); 299 300 if (!sendShutdown.await(10, TimeUnit.SECONDS)) { 301 LOG.info("Network Could not shutdown in a timely manner"); 302 } 303 } finally { 304 ServiceStopper ss = new ServiceStopper(); 305 ss.stop(remoteBroker); 306 ss.stop(localBroker); 307 ss.stop(duplexInboundLocalBroker); 308 // Release the started Latch since another thread could be 309 // stuck waiting for it to start up. 
310 startedLatch.countDown(); 311 startedLatch.countDown(); 312 localStartedLatch.countDown(); 313 314 ss.throwFirstException(); 315 } 316 } 317 318 LOG.info("{} bridge to {} stopped", configuration.getBrokerName(), remoteBrokerName); 319 } 320 } 321 322 protected void triggerStartAsyncNetworkBridgeCreation() throws IOException { 323 brokerService.getTaskRunnerFactory().execute(new Runnable() { 324 @Override 325 public void run() { 326 final String originalName = Thread.currentThread().getName(); 327 Thread.currentThread().setName("triggerStartAsyncNetworkBridgeCreation: " + 328 "remoteBroker=" + remoteBroker + ", localBroker= " + localBroker); 329 330 try { 331 // First we collect the info data from both the local and remote ends 332 collectBrokerInfos(); 333 334 // Once we have all required broker info we can attempt to start 335 // the local and then remote sides of the bridge. 336 doStartLocalAndRemoteBridges(); 337 } finally { 338 Thread.currentThread().setName(originalName); 339 } 340 } 341 }); 342 } 343 344 private void collectBrokerInfos() { 345 346 // First wait for the remote to feed us its BrokerInfo, then we can check on 347 // the LocalBrokerInfo and decide is this is a loop. 348 try { 349 remoteBrokerInfo = futureRemoteBrokerInfo.get(); 350 if (remoteBrokerInfo == null) { 351 fireBridgeFailed(new Throwable("remoteBrokerInfo is null")); 352 return; 353 } 354 } catch (Exception e) { 355 serviceRemoteException(e); 356 return; 357 } 358 359 try { 360 localBrokerInfo = futureLocalBrokerInfo.get(); 361 if (localBrokerInfo == null) { 362 fireBridgeFailed(new Throwable("localBrokerInfo is null")); 363 return; 364 } 365 366 // Before we try and build the bridge lets check if we are in a loop 367 // and if so just stop now before registering anything. 
368 remoteBrokerId = remoteBrokerInfo.getBrokerId(); 369 if (localBrokerId.equals(remoteBrokerId)) { 370 LOG.trace("{} disconnecting remote loop back connector for: {}, with id: {}", new Object[]{ 371 configuration.getBrokerName(), remoteBrokerName, remoteBrokerId 372 }); 373 ServiceSupport.dispose(localBroker); 374 ServiceSupport.dispose(remoteBroker); 375 // the bridge is left in a bit of limbo, but it won't get retried 376 // in this state. 377 return; 378 } 379 380 // Fill in the remote broker's information now. 381 remoteBrokerPath[0] = remoteBrokerId; 382 remoteBrokerName = remoteBrokerInfo.getBrokerName(); 383 if (configuration.isUseBrokerNamesAsIdSeed()) { 384 idGenerator = new IdGenerator(brokerService.getBrokerName() + "->" + remoteBrokerName); 385 } 386 } catch (Throwable e) { 387 serviceLocalException(e); 388 } 389 } 390 391 private void doStartLocalAndRemoteBridges() { 392 393 if (disposed.get()) { 394 return; 395 } 396 397 if (isCreatedByDuplex()) { 398 // apply remote (propagated) configuration to local duplex bridge before start 399 Properties props = null; 400 try { 401 props = MarshallingSupport.stringToProperties(remoteBrokerInfo.getNetworkProperties()); 402 IntrospectionSupport.getProperties(configuration, props, null); 403 if (configuration.getExcludedDestinations() != null) { 404 excludedDestinations = configuration.getExcludedDestinations().toArray( 405 new ActiveMQDestination[configuration.getExcludedDestinations().size()]); 406 } 407 if (configuration.getStaticallyIncludedDestinations() != null) { 408 staticallyIncludedDestinations = configuration.getStaticallyIncludedDestinations().toArray( 409 new ActiveMQDestination[configuration.getStaticallyIncludedDestinations().size()]); 410 } 411 if (configuration.getDynamicallyIncludedDestinations() != null) { 412 dynamicallyIncludedDestinations = configuration.getDynamicallyIncludedDestinations().toArray( 413 new ActiveMQDestination[configuration.getDynamicallyIncludedDestinations().size()]); 414 
} 415 } catch (Throwable t) { 416 LOG.error("Error mapping remote configuration: {}", props, t); 417 } 418 } 419 420 try { 421 startLocalBridge(); 422 } catch (Throwable e) { 423 serviceLocalException(e); 424 return; 425 } 426 427 try { 428 startRemoteBridge(); 429 } catch (Throwable e) { 430 serviceRemoteException(e); 431 return; 432 } 433 434 try { 435 if (safeWaitUntilStarted()) { 436 setupStaticDestinations(); 437 } 438 } catch (Throwable e) { 439 serviceLocalException(e); 440 } 441 } 442 443 private void startLocalBridge() throws Throwable { 444 if (!bridgeFailed.get() && localBridgeStarted.compareAndSet(false, true)) { 445 synchronized (this) { 446 LOG.trace("{} starting local Bridge, localBroker={}", configuration.getBrokerName(), localBroker); 447 if (!disposed.get()) { 448 449 if (idGenerator == null) { 450 throw new IllegalStateException("Id Generator cannot be null"); 451 } 452 453 localConnectionInfo = new ConnectionInfo(); 454 localConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId())); 455 localClientId = configuration.getName() + "_" + remoteBrokerName + "_inbound_" + configuration.getBrokerName(); 456 localConnectionInfo.setClientId(localClientId); 457 localConnectionInfo.setUserName(configuration.getUserName()); 458 localConnectionInfo.setPassword(configuration.getPassword()); 459 Transport originalTransport = remoteBroker; 460 while (originalTransport instanceof TransportFilter) { 461 originalTransport = ((TransportFilter) originalTransport).getNext(); 462 } 463 if (originalTransport instanceof SslTransport) { 464 X509Certificate[] peerCerts = ((SslTransport) originalTransport).getPeerCertificates(); 465 localConnectionInfo.setTransportContext(peerCerts); 466 } 467 // sync requests that may fail 468 Object resp = localBroker.request(localConnectionInfo); 469 if (resp instanceof ExceptionResponse) { 470 throw ((ExceptionResponse) resp).getException(); 471 } 472 localSessionInfo = new SessionInfo(localConnectionInfo, 1); 473 
localBroker.oneway(localSessionInfo); 474 475 if (configuration.isDuplex()) { 476 // separate in-bound channel for forwards so we don't 477 // contend with out-bound dispatch on same connection 478 ConnectionInfo duplexLocalConnectionInfo = new ConnectionInfo(); 479 duplexLocalConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId())); 480 duplexLocalConnectionInfo.setClientId(configuration.getName() + "_" + remoteBrokerName + "_inbound_duplex_" 481 + configuration.getBrokerName()); 482 duplexLocalConnectionInfo.setUserName(configuration.getUserName()); 483 duplexLocalConnectionInfo.setPassword(configuration.getPassword()); 484 485 if (originalTransport instanceof SslTransport) { 486 X509Certificate[] peerCerts = ((SslTransport) originalTransport).getPeerCertificates(); 487 duplexLocalConnectionInfo.setTransportContext(peerCerts); 488 } 489 // sync requests that may fail 490 resp = duplexInboundLocalBroker.request(duplexLocalConnectionInfo); 491 if (resp instanceof ExceptionResponse) { 492 throw ((ExceptionResponse) resp).getException(); 493 } 494 SessionInfo duplexInboundSession = new SessionInfo(duplexLocalConnectionInfo, 1); 495 duplexInboundLocalProducerInfo = new ProducerInfo(duplexInboundSession, 1); 496 duplexInboundLocalBroker.oneway(duplexInboundSession); 497 duplexInboundLocalBroker.oneway(duplexInboundLocalProducerInfo); 498 } 499 brokerService.getBroker().networkBridgeStarted(remoteBrokerInfo, this.createdByDuplex, remoteBroker.toString()); 500 NetworkBridgeListener l = this.networkBridgeListener; 501 if (l != null) { 502 l.onStart(this); 503 } 504 505 // Let the local broker know the remote broker's ID. 
506 localBroker.oneway(remoteBrokerInfo); 507 // new peer broker (a consumer can work with remote broker also) 508 brokerService.getBroker().addBroker(null, remoteBrokerInfo); 509 510 LOG.info("Network connection between {} and {} ({}) has been established.", new Object[]{ 511 localBroker, remoteBroker, remoteBrokerName 512 }); 513 LOG.trace("{} register bridge ({}) to {}", new Object[]{ 514 configuration.getBrokerName(), this, remoteBrokerName 515 }); 516 } else { 517 LOG.warn("Bridge was disposed before the startLocalBridge() method was fully executed."); 518 } 519 startedLatch.countDown(); 520 localStartedLatch.countDown(); 521 } 522 } 523 } 524 525 protected void startRemoteBridge() throws Exception { 526 if (!bridgeFailed.get() && remoteBridgeStarted.compareAndSet(false, true)) { 527 LOG.trace("{} starting remote Bridge, remoteBroker={}", configuration.getBrokerName(), remoteBroker); 528 synchronized (this) { 529 if (!isCreatedByDuplex()) { 530 BrokerInfo brokerInfo = new BrokerInfo(); 531 brokerInfo.setBrokerName(configuration.getBrokerName()); 532 brokerInfo.setBrokerURL(configuration.getBrokerURL()); 533 brokerInfo.setNetworkConnection(true); 534 brokerInfo.setDuplexConnection(configuration.isDuplex()); 535 // set our properties 536 Properties props = new Properties(); 537 IntrospectionSupport.getProperties(configuration, props, null); 538 props.remove("networkTTL"); 539 String str = MarshallingSupport.propertiesToString(props); 540 brokerInfo.setNetworkProperties(str); 541 brokerInfo.setBrokerId(this.localBrokerId); 542 remoteBroker.oneway(brokerInfo); 543 } 544 if (remoteConnectionInfo != null) { 545 remoteBroker.oneway(remoteConnectionInfo.createRemoveCommand()); 546 } 547 remoteConnectionInfo = new ConnectionInfo(); 548 remoteConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId())); 549 remoteConnectionInfo.setClientId(configuration.getName() + "_" + configuration.getBrokerName() + "_outbound"); 550 
remoteConnectionInfo.setUserName(configuration.getUserName()); 551 remoteConnectionInfo.setPassword(configuration.getPassword()); 552 remoteBroker.oneway(remoteConnectionInfo); 553 554 SessionInfo remoteSessionInfo = new SessionInfo(remoteConnectionInfo, 1); 555 remoteBroker.oneway(remoteSessionInfo); 556 producerInfo = new ProducerInfo(remoteSessionInfo, 1); 557 producerInfo.setResponseRequired(false); 558 remoteBroker.oneway(producerInfo); 559 // Listen to consumer advisory messages on the remote broker to determine demand. 560 if (!configuration.isStaticBridge()) { 561 demandConsumerInfo = new ConsumerInfo(remoteSessionInfo, 1); 562 // always dispatch advisory message asynchronously so that 563 // we never block the producer broker if we are slow 564 demandConsumerInfo.setDispatchAsync(true); 565 String advisoryTopic = configuration.getDestinationFilter(); 566 if (configuration.isBridgeTempDestinations()) { 567 advisoryTopic += "," + AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC; 568 } 569 demandConsumerInfo.setDestination(new ActiveMQTopic(advisoryTopic)); 570 demandConsumerInfo.setPrefetchSize(configuration.getPrefetchSize()); 571 remoteBroker.oneway(demandConsumerInfo); 572 } 573 startedLatch.countDown(); 574 } 575 } 576 } 577 578 @Override 579 public void serviceRemoteException(Throwable error) { 580 if (!disposed.get()) { 581 if (error instanceof SecurityException || error instanceof GeneralSecurityException) { 582 LOG.error("Network connection between {} and {} shutdown due to a remote error: {}", new Object[]{ 583 localBroker, remoteBroker, error 584 }); 585 } else { 586 LOG.warn("Network connection between {} and {} shutdown due to a remote error: {}", new Object[]{ 587 localBroker, remoteBroker, error 588 }); 589 } 590 LOG.debug("The remote Exception was: {}", error, error); 591 brokerService.getTaskRunnerFactory().execute(new Runnable() { 592 @Override 593 public void run() { 594 ServiceSupport.dispose(getControllingService()); 595 } 596 
}); 597 fireBridgeFailed(error); 598 } 599 } 600 601 protected void serviceRemoteCommand(Command command) { 602 if (!disposed.get()) { 603 try { 604 if (command.isMessageDispatch()) { 605 safeWaitUntilStarted(); 606 MessageDispatch md = (MessageDispatch) command; 607 serviceRemoteConsumerAdvisory(md.getMessage().getDataStructure()); 608 ackAdvisory(md.getMessage()); 609 } else if (command.isBrokerInfo()) { 610 futureRemoteBrokerInfo.set((BrokerInfo) command); 611 } else if (command.getClass() == ConnectionError.class) { 612 ConnectionError ce = (ConnectionError) command; 613 serviceRemoteException(ce.getException()); 614 } else { 615 if (isDuplex()) { 616 LOG.trace("{} duplex command type: {}", configuration.getBrokerName(), command.getDataStructureType()); 617 if (command.isMessage()) { 618 final ActiveMQMessage message = (ActiveMQMessage) command; 619 if (AdvisorySupport.isConsumerAdvisoryTopic(message.getDestination()) 620 || AdvisorySupport.isDestinationAdvisoryTopic(message.getDestination())) { 621 serviceRemoteConsumerAdvisory(message.getDataStructure()); 622 ackAdvisory(message); 623 } else { 624 if (!isPermissableDestination(message.getDestination(), true)) { 625 return; 626 } 627 // message being forwarded - we need to 628 // propagate the response to our local send 629 if (canDuplexDispatch(message)) { 630 message.setProducerId(duplexInboundLocalProducerInfo.getProducerId()); 631 if (message.isResponseRequired() || configuration.isAlwaysSyncSend()) { 632 duplexInboundLocalBroker.asyncRequest(message, new ResponseCallback() { 633 final int correlationId = message.getCommandId(); 634 635 @Override 636 public void onCompletion(FutureResponse resp) { 637 try { 638 Response reply = resp.getResult(); 639 reply.setCorrelationId(correlationId); 640 remoteBroker.oneway(reply); 641 } catch (IOException error) { 642 LOG.error("Exception: {} on duplex forward of: {}", error, message); 643 serviceRemoteException(error); 644 } 645 } 646 }); 647 } else { 648 
duplexInboundLocalBroker.oneway(message); 649 } 650 serviceInboundMessage(message); 651 } else { 652 if (message.isResponseRequired() || configuration.isAlwaysSyncSend()) { 653 Response reply = new Response(); 654 reply.setCorrelationId(message.getCommandId()); 655 remoteBroker.oneway(reply); 656 } 657 } 658 } 659 } else { 660 switch (command.getDataStructureType()) { 661 case ConnectionInfo.DATA_STRUCTURE_TYPE: 662 if (duplexInitiatingConnection != null && duplexInitiatingConnectionInfoReceived.compareAndSet(false, true)) { 663 // end of initiating connection setup - propogate to initial connection to get mbean by clientid 664 duplexInitiatingConnection.processAddConnection((ConnectionInfo) command); 665 } else { 666 localBroker.oneway(command); 667 } 668 break; 669 case SessionInfo.DATA_STRUCTURE_TYPE: 670 localBroker.oneway(command); 671 break; 672 case ProducerInfo.DATA_STRUCTURE_TYPE: 673 // using duplexInboundLocalProducerInfo 674 break; 675 case MessageAck.DATA_STRUCTURE_TYPE: 676 MessageAck ack = (MessageAck) command; 677 DemandSubscription localSub = subscriptionMapByRemoteId.get(ack.getConsumerId()); 678 if (localSub != null) { 679 ack.setConsumerId(localSub.getLocalInfo().getConsumerId()); 680 localBroker.oneway(ack); 681 } else { 682 LOG.warn("Matching local subscription not found for ack: {}", ack); 683 } 684 break; 685 case ConsumerInfo.DATA_STRUCTURE_TYPE: 686 localStartedLatch.await(); 687 if (started.get()) { 688 addConsumerInfo((ConsumerInfo) command); 689 } else { 690 // received a subscription whilst stopping 691 LOG.warn("Stopping - ignoring ConsumerInfo: {}", command); 692 } 693 break; 694 case ShutdownInfo.DATA_STRUCTURE_TYPE: 695 // initiator is shutting down, controlled case 696 // abortive close dealt with by inactivity monitor 697 LOG.info("Stopping network bridge on shutdown of remote broker"); 698 serviceRemoteException(new IOException(command.toString())); 699 break; 700 default: 701 LOG.debug("Ignoring remote command: {}", command); 
702 } 703 } 704 } else { 705 switch (command.getDataStructureType()) { 706 case KeepAliveInfo.DATA_STRUCTURE_TYPE: 707 case WireFormatInfo.DATA_STRUCTURE_TYPE: 708 case ShutdownInfo.DATA_STRUCTURE_TYPE: 709 break; 710 default: 711 LOG.warn("Unexpected remote command: {}", command); 712 } 713 } 714 } 715 } catch (Throwable e) { 716 LOG.debug("Exception processing remote command: {}", command, e); 717 serviceRemoteException(e); 718 } 719 } 720 } 721 722 private void ackAdvisory(Message message) throws IOException { 723 demandConsumerDispatched++; 724 if (demandConsumerDispatched > (demandConsumerInfo.getPrefetchSize() * .75)) { 725 MessageAck ack = new MessageAck(message, MessageAck.STANDARD_ACK_TYPE, demandConsumerDispatched); 726 ack.setConsumerId(demandConsumerInfo.getConsumerId()); 727 remoteBroker.oneway(ack); 728 demandConsumerDispatched = 0; 729 } 730 } 731 732 private void serviceRemoteConsumerAdvisory(DataStructure data) throws IOException { 733 final int networkTTL = configuration.getConsumerTTL(); 734 if (data.getClass() == ConsumerInfo.class) { 735 // Create a new local subscription 736 ConsumerInfo info = (ConsumerInfo) data; 737 BrokerId[] path = info.getBrokerPath(); 738 739 if (info.isBrowser()) { 740 LOG.debug("{} Ignoring sub from {}, browsers explicitly suppressed", configuration.getBrokerName(), remoteBrokerName); 741 return; 742 } 743 744 if (path != null && networkTTL > -1 && path.length >= networkTTL) { 745 LOG.debug("{} Ignoring sub from {}, restricted to {} network hops only: {}", new Object[]{ 746 configuration.getBrokerName(), remoteBrokerName, networkTTL, info 747 }); 748 return; 749 } 750 751 if (contains(path, localBrokerPath[0])) { 752 // Ignore this consumer as it's a consumer we locally sent to the broker. 
753 LOG.debug("{} Ignoring sub from {}, already routed through this broker once: {}", new Object[]{ 754 configuration.getBrokerName(), remoteBrokerName, info 755 }); 756 return; 757 } 758 759 if (!isPermissableDestination(info.getDestination())) { 760 // ignore if not in the permitted or in the excluded list 761 LOG.debug("{} Ignoring sub from {}, destination {} is not permitted: {}", new Object[]{ 762 configuration.getBrokerName(), remoteBrokerName, info.getDestination(), info 763 }); 764 return; 765 } 766 767 // in a cyclic network there can be multiple bridges per broker that can propagate 768 // a network subscription so there is a need to synchronize on a shared entity 769 synchronized (brokerService.getVmConnectorURI()) { 770 addConsumerInfo(info); 771 } 772 } else if (data.getClass() == DestinationInfo.class) { 773 // It's a destination info - we want to pass up information about temporary destinations 774 final DestinationInfo destInfo = (DestinationInfo) data; 775 BrokerId[] path = destInfo.getBrokerPath(); 776 if (path != null && networkTTL > -1 && path.length >= networkTTL) { 777 LOG.debug("{} Ignoring destination {} restricted to {} network hops only", new Object[]{ 778 configuration.getBrokerName(), destInfo, networkTTL 779 }); 780 return; 781 } 782 if (contains(destInfo.getBrokerPath(), localBrokerPath[0])) { 783 LOG.debug("{} Ignoring destination {} already routed through this broker once", configuration.getBrokerName(), destInfo); 784 return; 785 } 786 destInfo.setConnectionId(localConnectionInfo.getConnectionId()); 787 if (destInfo.getDestination() instanceof ActiveMQTempDestination) { 788 // re-set connection id so comes from here 789 ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destInfo.getDestination(); 790 tempDest.setConnectionId(localSessionInfo.getSessionId().getConnectionId()); 791 } 792 destInfo.setBrokerPath(appendToBrokerPath(destInfo.getBrokerPath(), getRemoteBrokerPath())); 793 LOG.trace("{} bridging {} destination on 
{} from {}, destination: {}", new Object[]{ 794 configuration.getBrokerName(), (destInfo.isAddOperation() ? "add" : "remove"), localBroker, remoteBrokerName, destInfo 795 }); 796 if (destInfo.isRemoveOperation()) { 797 // Serialize with removeSub operations such that all removeSub advisories 798 // are generated 799 serialExecutor.execute(new Runnable() { 800 @Override 801 public void run() { 802 try { 803 localBroker.oneway(destInfo); 804 } catch (IOException e) { 805 LOG.warn("failed to deliver remove command for destination: {}", destInfo.getDestination(), e); 806 } 807 } 808 }); 809 } else { 810 localBroker.oneway(destInfo); 811 } 812 } else if (data.getClass() == RemoveInfo.class) { 813 ConsumerId id = (ConsumerId) ((RemoveInfo) data).getObjectId(); 814 removeDemandSubscription(id); 815 } else if (data.getClass() == RemoveSubscriptionInfo.class) { 816 RemoveSubscriptionInfo info = ((RemoveSubscriptionInfo) data); 817 SubscriptionInfo subscriptionInfo = new SubscriptionInfo(info.getClientId(), info.getSubscriptionName()); 818 for (Iterator<DemandSubscription> i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) { 819 DemandSubscription ds = i.next(); 820 boolean removed = ds.getDurableRemoteSubs().remove(subscriptionInfo); 821 if (removed) { 822 if (ds.getDurableRemoteSubs().isEmpty()) { 823 824 // deactivate subscriber 825 RemoveInfo removeInfo = new RemoveInfo(ds.getLocalInfo().getConsumerId()); 826 localBroker.oneway(removeInfo); 827 828 // remove subscriber 829 RemoveSubscriptionInfo sending = new RemoveSubscriptionInfo(); 830 sending.setClientId(localClientId); 831 sending.setSubscriptionName(ds.getLocalDurableSubscriber().getSubscriptionName()); 832 sending.setConnectionId(this.localConnectionInfo.getConnectionId()); 833 localBroker.oneway(sending); 834 } 835 } 836 } 837 } 838 } 839 840 @Override 841 public void serviceLocalException(Throwable error) { 842 serviceLocalException(null, error); 843 } 844 845 public void 
serviceLocalException(MessageDispatch messageDispatch, Throwable error) { 846 LOG.trace("serviceLocalException: disposed {} ex", disposed.get(), error); 847 if (!disposed.get()) { 848 if (error instanceof DestinationDoesNotExistException && ((DestinationDoesNotExistException) error).isTemporary()) { 849 // not a reason to terminate the bridge - temps can disappear with 850 // pending sends as the demand sub may outlive the remote dest 851 if (messageDispatch != null) { 852 LOG.warn("PoisonAck of {} on forwarding error: {}", messageDispatch.getMessage().getMessageId(), error); 853 try { 854 MessageAck poisonAck = new MessageAck(messageDispatch, MessageAck.POSION_ACK_TYPE, 1); 855 poisonAck.setPoisonCause(error); 856 localBroker.oneway(poisonAck); 857 } catch (IOException ioe) { 858 LOG.error("Failed to posion ack message following forward failure: ", ioe); 859 } 860 fireFailedForwardAdvisory(messageDispatch, error); 861 } else { 862 LOG.warn("Ignoring exception on forwarding to non existent temp dest: ", error); 863 } 864 return; 865 } 866 867 LOG.info("Network connection between {} and {} shutdown due to a local error: {}", new Object[]{localBroker, remoteBroker, error}); 868 LOG.debug("The local Exception was: {}", error, error); 869 870 brokerService.getTaskRunnerFactory().execute(new Runnable() { 871 @Override 872 public void run() { 873 ServiceSupport.dispose(getControllingService()); 874 } 875 }); 876 fireBridgeFailed(error); 877 } 878 } 879 880 private void fireFailedForwardAdvisory(MessageDispatch messageDispatch, Throwable error) { 881 if (configuration.isAdvisoryForFailedForward()) { 882 AdvisoryBroker advisoryBroker = null; 883 try { 884 advisoryBroker = (AdvisoryBroker) brokerService.getBroker().getAdaptor(AdvisoryBroker.class); 885 886 if (advisoryBroker != null) { 887 ConnectionContext context = new ConnectionContext(); 888 context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT); 889 context.setBroker(brokerService.getBroker()); 890 891 
ActiveMQMessage advisoryMessage = new ActiveMQMessage(); 892 advisoryMessage.setStringProperty("cause", error.getLocalizedMessage()); 893 advisoryBroker.fireAdvisory(context, AdvisorySupport.getNetworkBridgeForwardFailureAdvisoryTopic(), messageDispatch.getMessage(), null, 894 advisoryMessage); 895 896 } 897 } catch (Exception e) { 898 LOG.warn("failed to fire forward failure advisory, cause: {}", e); 899 LOG.debug("detail", e); 900 } 901 } 902 } 903 904 protected Service getControllingService() { 905 return duplexInitiatingConnection != null ? duplexInitiatingConnection : DemandForwardingBridgeSupport.this; 906 } 907 908 protected void addSubscription(DemandSubscription sub) throws IOException { 909 if (sub != null) { 910 if (isDuplex()) { 911 // async vm transport, need to wait for completion 912 localBroker.request(sub.getLocalInfo()); 913 } else { 914 localBroker.oneway(sub.getLocalInfo()); 915 } 916 } 917 } 918 919 protected void removeSubscription(final DemandSubscription sub) throws IOException { 920 if (sub != null) { 921 LOG.trace("{} remove local subscription: {} for remote {}", new Object[]{configuration.getBrokerName(), sub.getLocalInfo().getConsumerId(), sub.getRemoteInfo().getConsumerId()}); 922 923 // ensure not available for conduit subs pending removal 924 subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId()); 925 subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId()); 926 927 // continue removal in separate thread to free up this thread for outstanding responses 928 // Serialize with removeDestination operations so that removeSubs are serialized with 929 // removeDestinations such that all removeSub advisories are generated 930 serialExecutor.execute(new Runnable() { 931 @Override 932 public void run() { 933 sub.waitForCompletion(); 934 try { 935 localBroker.oneway(sub.getLocalInfo().createRemoveCommand()); 936 } catch (IOException e) { 937 LOG.warn("failed to deliver remove command for local subscription, for remote 
{}", sub.getRemoteInfo().getConsumerId(), e); 938 } 939 } 940 }); 941 } 942 } 943 944 protected Message configureMessage(MessageDispatch md) throws IOException { 945 Message message = md.getMessage().copy(); 946 // Update the packet to show where it came from. 947 message.setBrokerPath(appendToBrokerPath(message.getBrokerPath(), localBrokerPath)); 948 message.setProducerId(producerInfo.getProducerId()); 949 message.setDestination(md.getDestination()); 950 message.setMemoryUsage(null); 951 if (message.getOriginalTransactionId() == null) { 952 message.setOriginalTransactionId(message.getTransactionId()); 953 } 954 message.setTransactionId(null); 955 if (configuration.isUseCompression()) { 956 message.compress(); 957 } 958 return message; 959 } 960 961 protected void serviceLocalCommand(Command command) { 962 if (!disposed.get()) { 963 try { 964 if (command.isMessageDispatch()) { 965 safeWaitUntilStarted(); 966 enqueueCounter.incrementAndGet(); 967 final MessageDispatch md = (MessageDispatch) command; 968 final DemandSubscription sub = subscriptionMapByLocalId.get(md.getConsumerId()); 969 if (sub != null && md.getMessage() != null && sub.incrementOutstandingResponses()) { 970 971 if (suppressMessageDispatch(md, sub)) { 972 LOG.debug("{} message not forwarded to {} because message came from there or fails TTL, brokerPath: {}, message: {}", new Object[]{ 973 configuration.getBrokerName(), remoteBrokerName, Arrays.toString(md.getMessage().getBrokerPath()), md.getMessage() 974 }); 975 // still ack as it may be durable 976 try { 977 localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1)); 978 } finally { 979 sub.decrementOutstandingResponses(); 980 } 981 return; 982 } 983 984 Message message = configureMessage(md); 985 LOG.debug("bridging ({} -> {}), consumer: {}, destination: {}, brokerPath: {}, message: {}", new Object[]{ 986 configuration.getBrokerName(), remoteBrokerName, (LOG.isTraceEnabled() ? 
message : message.getMessageId()), md.getConsumerId(), message.getDestination(), Arrays.toString(message.getBrokerPath()), message 987 }); 988 989 if (isDuplex() && AdvisorySupport.ADIVSORY_MESSAGE_TYPE.equals(message.getType())) { 990 try { 991 // never request b/c they are eventually acked async 992 remoteBroker.oneway(message); 993 } finally { 994 sub.decrementOutstandingResponses(); 995 } 996 return; 997 } 998 999 if (message.isPersistent() || configuration.isAlwaysSyncSend()) { 1000 1001 // The message was not sent using async send, so we should only 1002 // ack the local broker when we get confirmation that the remote 1003 // broker has received the message. 1004 remoteBroker.asyncRequest(message, new ResponseCallback() { 1005 @Override 1006 public void onCompletion(FutureResponse future) { 1007 try { 1008 Response response = future.getResult(); 1009 if (response.isException()) { 1010 ExceptionResponse er = (ExceptionResponse) response; 1011 serviceLocalException(md, er.getException()); 1012 } else { 1013 localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1)); 1014 dequeueCounter.incrementAndGet(); 1015 } 1016 } catch (IOException e) { 1017 serviceLocalException(md, e); 1018 } finally { 1019 sub.decrementOutstandingResponses(); 1020 } 1021 } 1022 }); 1023 1024 } else { 1025 // If the message was originally sent using async send, we will 1026 // preserve that QOS by bridging it using an async send (small chance 1027 // of message loss). 
1028 try { 1029 remoteBroker.oneway(message); 1030 localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1)); 1031 dequeueCounter.incrementAndGet(); 1032 } finally { 1033 sub.decrementOutstandingResponses(); 1034 } 1035 } 1036 serviceOutbound(message); 1037 } else { 1038 LOG.debug("No subscription registered with this network bridge for consumerId: {} for message: {}", md.getConsumerId(), md.getMessage()); 1039 } 1040 } else if (command.isBrokerInfo()) { 1041 futureLocalBrokerInfo.set((BrokerInfo) command); 1042 } else if (command.isShutdownInfo()) { 1043 LOG.info("{} Shutting down {}", configuration.getBrokerName(), configuration.getName()); 1044 stop(); 1045 } else if (command.getClass() == ConnectionError.class) { 1046 ConnectionError ce = (ConnectionError) command; 1047 serviceLocalException(ce.getException()); 1048 } else { 1049 switch (command.getDataStructureType()) { 1050 case WireFormatInfo.DATA_STRUCTURE_TYPE: 1051 break; 1052 default: 1053 LOG.warn("Unexpected local command: {}", command); 1054 } 1055 } 1056 } catch (Throwable e) { 1057 LOG.warn("Caught an exception processing local command", e); 1058 serviceLocalException(e); 1059 } 1060 } 1061 } 1062 1063 private boolean suppressMessageDispatch(MessageDispatch md, DemandSubscription sub) throws Exception { 1064 boolean suppress = false; 1065 // for durable subs, suppression via filter leaves dangling acks so we 1066 // need to check here and allow the ack irrespective 1067 if (sub.getLocalInfo().isDurable()) { 1068 MessageEvaluationContext messageEvalContext = new MessageEvaluationContext(); 1069 messageEvalContext.setMessageReference(md.getMessage()); 1070 messageEvalContext.setDestination(md.getDestination()); 1071 suppress = !sub.getNetworkBridgeFilter().matches(messageEvalContext); 1072 } 1073 return suppress; 1074 } 1075 1076 public static boolean contains(BrokerId[] brokerPath, BrokerId brokerId) { 1077 if (brokerPath != null) { 1078 for (BrokerId id : brokerPath) { 1079 if 
(brokerId.equals(id)) { 1080 return true; 1081 } 1082 } 1083 } 1084 return false; 1085 } 1086 1087 protected BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId[] pathsToAppend) { 1088 if (brokerPath == null || brokerPath.length == 0) { 1089 return pathsToAppend; 1090 } 1091 BrokerId rc[] = new BrokerId[brokerPath.length + pathsToAppend.length]; 1092 System.arraycopy(brokerPath, 0, rc, 0, brokerPath.length); 1093 System.arraycopy(pathsToAppend, 0, rc, brokerPath.length, pathsToAppend.length); 1094 return rc; 1095 } 1096 1097 protected BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId idToAppend) { 1098 if (brokerPath == null || brokerPath.length == 0) { 1099 return new BrokerId[]{idToAppend}; 1100 } 1101 BrokerId rc[] = new BrokerId[brokerPath.length + 1]; 1102 System.arraycopy(brokerPath, 0, rc, 0, brokerPath.length); 1103 rc[brokerPath.length] = idToAppend; 1104 return rc; 1105 } 1106 1107 protected boolean isPermissableDestination(ActiveMQDestination destination) { 1108 return isPermissableDestination(destination, false); 1109 } 1110 1111 protected boolean isPermissableDestination(ActiveMQDestination destination, boolean allowTemporary) { 1112 // Are we not bridging temporary destinations? 
1113 if (destination.isTemporary()) { 1114 if (allowTemporary) { 1115 return true; 1116 } else { 1117 return configuration.isBridgeTempDestinations(); 1118 } 1119 } 1120 1121 ActiveMQDestination[] dests = staticallyIncludedDestinations; 1122 if (dests != null && dests.length > 0) { 1123 for (ActiveMQDestination dest : dests) { 1124 DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest); 1125 if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) { 1126 return true; 1127 } 1128 } 1129 } 1130 1131 dests = excludedDestinations; 1132 if (dests != null && dests.length > 0) { 1133 for (ActiveMQDestination dest : dests) { 1134 DestinationFilter exclusionFilter = DestinationFilter.parseFilter(dest); 1135 if (dest != null && exclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) { 1136 return false; 1137 } 1138 } 1139 } 1140 1141 dests = dynamicallyIncludedDestinations; 1142 if (dests != null && dests.length > 0) { 1143 for (ActiveMQDestination dest : dests) { 1144 DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest); 1145 if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) { 1146 return true; 1147 } 1148 } 1149 1150 return false; 1151 } 1152 return true; 1153 } 1154 1155 /** 1156 * Subscriptions for these destinations are always created 1157 */ 1158 protected void setupStaticDestinations() { 1159 ActiveMQDestination[] dests = staticallyIncludedDestinations; 1160 if (dests != null) { 1161 for (ActiveMQDestination dest : dests) { 1162 DemandSubscription sub = createDemandSubscription(dest); 1163 sub.setStaticallyIncluded(true); 1164 try { 1165 addSubscription(sub); 1166 } catch (IOException e) { 1167 LOG.error("Failed to add static destination {}", dest, e); 1168 } 1169 LOG.trace("{}, bridging messages for static destination: {}", 
configuration.getBrokerName(), dest); 1170 } 1171 } 1172 } 1173 1174 protected void addConsumerInfo(final ConsumerInfo consumerInfo) throws IOException { 1175 ConsumerInfo info = consumerInfo.copy(); 1176 addRemoteBrokerToBrokerPath(info); 1177 DemandSubscription sub = createDemandSubscription(info); 1178 if (sub != null) { 1179 if (duplicateSuppressionIsRequired(sub)) { 1180 undoMapRegistration(sub); 1181 } else { 1182 if (consumerInfo.isDurable()) { 1183 sub.getDurableRemoteSubs().add(new SubscriptionInfo(sub.getRemoteInfo().getClientId(), consumerInfo.getSubscriptionName())); 1184 } 1185 addSubscription(sub); 1186 LOG.debug("{} new demand subscription: {}", configuration.getBrokerName(), sub); 1187 } 1188 } 1189 } 1190 1191 private void undoMapRegistration(DemandSubscription sub) { 1192 subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId()); 1193 subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId()); 1194 } 1195 1196 /* 1197 * check our existing subs networkConsumerIds against the list of network 1198 * ids in this subscription A match means a duplicate which we suppress for 1199 * topics and maybe for queues 1200 */ 1201 private boolean duplicateSuppressionIsRequired(DemandSubscription candidate) { 1202 final ConsumerInfo consumerInfo = candidate.getRemoteInfo(); 1203 boolean suppress = false; 1204 1205 if (consumerInfo.getDestination().isQueue() && !configuration.isSuppressDuplicateQueueSubscriptions() || consumerInfo.getDestination().isTopic() 1206 && !configuration.isSuppressDuplicateTopicSubscriptions()) { 1207 return suppress; 1208 } 1209 1210 List<ConsumerId> candidateConsumers = consumerInfo.getNetworkConsumerIds(); 1211 Collection<Subscription> currentSubs = getRegionSubscriptions(consumerInfo.getDestination()); 1212 for (Subscription sub : currentSubs) { 1213 List<ConsumerId> networkConsumers = sub.getConsumerInfo().getNetworkConsumerIds(); 1214 if (!networkConsumers.isEmpty()) { 1215 if (matchFound(candidateConsumers, 
networkConsumers)) { 1216 if (isInActiveDurableSub(sub)) { 1217 suppress = false; 1218 } else { 1219 suppress = hasLowerPriority(sub, candidate.getLocalInfo()); 1220 } 1221 break; 1222 } 1223 } 1224 } 1225 return suppress; 1226 } 1227 1228 private boolean isInActiveDurableSub(Subscription sub) { 1229 return (sub.getConsumerInfo().isDurable() && sub instanceof DurableTopicSubscription && !((DurableTopicSubscription) sub).isActive()); 1230 } 1231 1232 private boolean hasLowerPriority(Subscription existingSub, ConsumerInfo candidateInfo) { 1233 boolean suppress = false; 1234 1235 if (existingSub.getConsumerInfo().getPriority() >= candidateInfo.getPriority()) { 1236 LOG.debug("{} Ignoring duplicate subscription from {}, sub: {} is duplicate by network subscription with equal or higher network priority: {}, networkConsumerIds: {}", new Object[]{ 1237 configuration.getBrokerName(), remoteBrokerName, candidateInfo, existingSub, existingSub.getConsumerInfo().getNetworkConsumerIds() 1238 }); 1239 suppress = true; 1240 } else { 1241 // remove the existing lower priority duplicate and allow this candidate 1242 try { 1243 removeDuplicateSubscription(existingSub); 1244 1245 LOG.debug("{} Replacing duplicate subscription {} with sub from {}, which has a higher priority, new sub: {}, networkConsumerIds: {}", new Object[]{ 1246 configuration.getBrokerName(), existingSub.getConsumerInfo(), remoteBrokerName, candidateInfo, candidateInfo.getNetworkConsumerIds() 1247 }); 1248 } catch (IOException e) { 1249 LOG.error("Failed to remove duplicated sub as a result of sub with higher priority, sub: {}", existingSub, e); 1250 } 1251 } 1252 return suppress; 1253 } 1254 1255 private void removeDuplicateSubscription(Subscription existingSub) throws IOException { 1256 for (NetworkConnector connector : brokerService.getNetworkConnectors()) { 1257 if (connector.removeDemandSubscription(existingSub.getConsumerInfo().getConsumerId())) { 1258 break; 1259 } 1260 } 1261 } 1262 1263 private boolean 
matchFound(List<ConsumerId> candidateConsumers, List<ConsumerId> networkConsumers) { 1264 boolean found = false; 1265 for (ConsumerId aliasConsumer : networkConsumers) { 1266 if (candidateConsumers.contains(aliasConsumer)) { 1267 found = true; 1268 break; 1269 } 1270 } 1271 return found; 1272 } 1273 1274 protected final Collection<Subscription> getRegionSubscriptions(ActiveMQDestination dest) { 1275 RegionBroker region_broker = (RegionBroker) brokerService.getRegionBroker(); 1276 Region region; 1277 Collection<Subscription> subs; 1278 1279 region = null; 1280 switch (dest.getDestinationType()) { 1281 case ActiveMQDestination.QUEUE_TYPE: 1282 region = region_broker.getQueueRegion(); 1283 break; 1284 case ActiveMQDestination.TOPIC_TYPE: 1285 region = region_broker.getTopicRegion(); 1286 break; 1287 case ActiveMQDestination.TEMP_QUEUE_TYPE: 1288 region = region_broker.getTempQueueRegion(); 1289 break; 1290 case ActiveMQDestination.TEMP_TOPIC_TYPE: 1291 region = region_broker.getTempTopicRegion(); 1292 break; 1293 } 1294 1295 if (region instanceof AbstractRegion) { 1296 subs = ((AbstractRegion) region).getSubscriptions().values(); 1297 } else { 1298 subs = null; 1299 } 1300 1301 return subs; 1302 } 1303 1304 protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException { 1305 // add our original id to ourselves 1306 info.addNetworkConsumerId(info.getConsumerId()); 1307 return doCreateDemandSubscription(info); 1308 } 1309 1310 protected DemandSubscription doCreateDemandSubscription(ConsumerInfo info) throws IOException { 1311 DemandSubscription result = new DemandSubscription(info); 1312 result.getLocalInfo().setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId())); 1313 if (info.getDestination().isTemporary()) { 1314 // reset the local connection Id 1315 ActiveMQTempDestination dest = (ActiveMQTempDestination) result.getLocalInfo().getDestination(); 1316 
dest.setConnectionId(localConnectionInfo.getConnectionId().toString()); 1317 } 1318 1319 if (configuration.isDecreaseNetworkConsumerPriority()) { 1320 byte priority = (byte) configuration.getConsumerPriorityBase(); 1321 if (info.getBrokerPath() != null && info.getBrokerPath().length > 1) { 1322 // The longer the path to the consumer, the less it's consumer priority. 1323 priority -= info.getBrokerPath().length + 1; 1324 } 1325 result.getLocalInfo().setPriority(priority); 1326 LOG.debug("{} using priority: {} for subscription: {}", new Object[]{configuration.getBrokerName(), priority, info}); 1327 } 1328 configureDemandSubscription(info, result); 1329 return result; 1330 } 1331 1332 final protected DemandSubscription createDemandSubscription(ActiveMQDestination destination) { 1333 ConsumerInfo info = new ConsumerInfo(); 1334 info.setNetworkSubscription(true); 1335 info.setDestination(destination); 1336 1337 // Indicate that this subscription is being made on behalf of the remote broker. 1338 info.setBrokerPath(new BrokerId[]{remoteBrokerId}); 1339 1340 // the remote info held by the DemandSubscription holds the original 1341 // consumerId, the local info get's overwritten 1342 info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId())); 1343 DemandSubscription result = null; 1344 try { 1345 result = createDemandSubscription(info); 1346 } catch (IOException e) { 1347 LOG.error("Failed to create DemandSubscription ", e); 1348 } 1349 return result; 1350 } 1351 1352 protected void configureDemandSubscription(ConsumerInfo info, DemandSubscription sub) throws IOException { 1353 if (AdvisorySupport.isConsumerAdvisoryTopic(info.getDestination())) { 1354 sub.getLocalInfo().setDispatchAsync(true); 1355 } else { 1356 sub.getLocalInfo().setDispatchAsync(configuration.isDispatchAsync()); 1357 } 1358 sub.getLocalInfo().setPrefetchSize(configuration.getPrefetchSize()); 1359 
subscriptionMapByLocalId.put(sub.getLocalInfo().getConsumerId(), sub); 1360 subscriptionMapByRemoteId.put(sub.getRemoteInfo().getConsumerId(), sub); 1361 1362 sub.setNetworkBridgeFilter(createNetworkBridgeFilter(info)); 1363 if (!info.isDurable()) { 1364 // This works for now since we use a VM connection to the local broker. 1365 // may need to change if we ever subscribe to a remote broker. 1366 sub.getLocalInfo().setAdditionalPredicate(sub.getNetworkBridgeFilter()); 1367 } else { 1368 sub.setLocalDurableSubscriber(new SubscriptionInfo(info.getClientId(), info.getSubscriptionName())); 1369 } 1370 } 1371 1372 protected void removeDemandSubscription(ConsumerId id) throws IOException { 1373 DemandSubscription sub = subscriptionMapByRemoteId.remove(id); 1374 LOG.debug("{} remove request on {} from {}, consumer id: {}, matching sub: {}", new Object[]{ 1375 configuration.getBrokerName(), localBroker, remoteBrokerName, id, sub 1376 }); 1377 if (sub != null) { 1378 removeSubscription(sub); 1379 LOG.debug("{} removed sub on {} from {}: {}", new Object[]{ 1380 configuration.getBrokerName(), localBroker, remoteBrokerName, sub.getRemoteInfo() 1381 }); 1382 } 1383 } 1384 1385 protected boolean removeDemandSubscriptionByLocalId(ConsumerId consumerId) { 1386 boolean removeDone = false; 1387 DemandSubscription sub = subscriptionMapByLocalId.get(consumerId); 1388 if (sub != null) { 1389 try { 1390 removeDemandSubscription(sub.getRemoteInfo().getConsumerId()); 1391 removeDone = true; 1392 } catch (IOException e) { 1393 LOG.debug("removeDemandSubscriptionByLocalId failed for localId: {}", consumerId, e); 1394 } 1395 } 1396 return removeDone; 1397 } 1398 1399 /** 1400 * Performs a timed wait on the started latch and then checks for disposed 1401 * before performing another wait each time the the started wait times out. 
1402 */ 1403 protected boolean safeWaitUntilStarted() throws InterruptedException { 1404 while (!disposed.get()) { 1405 if (startedLatch.await(1, TimeUnit.SECONDS)) { 1406 break; 1407 } 1408 } 1409 return !disposed.get(); 1410 } 1411 1412 protected NetworkBridgeFilter createNetworkBridgeFilter(ConsumerInfo info) throws IOException { 1413 NetworkBridgeFilterFactory filterFactory = defaultFilterFactory; 1414 if (brokerService != null && brokerService.getDestinationPolicy() != null) { 1415 PolicyEntry entry = brokerService.getDestinationPolicy().getEntryFor(info.getDestination()); 1416 if (entry != null && entry.getNetworkBridgeFilterFactory() != null) { 1417 filterFactory = entry.getNetworkBridgeFilterFactory(); 1418 } 1419 } 1420 return filterFactory.create(info, getRemoteBrokerPath(), configuration.getMessageTTL(), configuration.getConsumerTTL()); 1421 } 1422 1423 protected void addRemoteBrokerToBrokerPath(ConsumerInfo info) throws IOException { 1424 info.setBrokerPath(appendToBrokerPath(info.getBrokerPath(), getRemoteBrokerPath())); 1425 } 1426 1427 protected BrokerId[] getRemoteBrokerPath() { 1428 return remoteBrokerPath; 1429 } 1430 1431 @Override 1432 public void setNetworkBridgeListener(NetworkBridgeListener listener) { 1433 this.networkBridgeListener = listener; 1434 } 1435 1436 private void fireBridgeFailed(Throwable reason) { 1437 LOG.trace("fire bridge failed, listener: {}", this.networkBridgeListener, reason); 1438 NetworkBridgeListener l = this.networkBridgeListener; 1439 if (l != null && this.bridgeFailed.compareAndSet(false, true)) { 1440 l.bridgeFailed(); 1441 } 1442 } 1443 1444 /** 1445 * @return Returns the dynamicallyIncludedDestinations. 1446 */ 1447 public ActiveMQDestination[] getDynamicallyIncludedDestinations() { 1448 return dynamicallyIncludedDestinations; 1449 } 1450 1451 /** 1452 * @param dynamicallyIncludedDestinations 1453 * The dynamicallyIncludedDestinations to set. 
     */
    public void setDynamicallyIncludedDestinations(ActiveMQDestination[] dynamicallyIncludedDestinations) {
        this.dynamicallyIncludedDestinations = dynamicallyIncludedDestinations;
    }

    /**
     * Returns the excluded-destinations array exactly as stored; no defensive
     * copy is made.
     *
     * @return Returns the excludedDestinations.
     */
    public ActiveMQDestination[] getExcludedDestinations() {
        return excludedDestinations;
    }

    /**
     * Stores the given array reference as the excluded destinations; the array
     * is not copied.
     *
     * @param excludedDestinations The excludedDestinations to set.
     */
    public void setExcludedDestinations(ActiveMQDestination[] excludedDestinations) {
        this.excludedDestinations = excludedDestinations;
    }

    /**
     * Returns the statically-included-destinations array exactly as stored; no
     * defensive copy is made.
     *
     * @return Returns the staticallyIncludedDestinations.
     */
    public ActiveMQDestination[] getStaticallyIncludedDestinations() {
        return staticallyIncludedDestinations;
    }

    /**
     * Stores the given array reference as the statically included destinations;
     * the array is not copied.
     *
     * @param staticallyIncludedDestinations The staticallyIncludedDestinations to set.
     */
    public void setStaticallyIncludedDestinations(ActiveMQDestination[] staticallyIncludedDestinations) {
        this.staticallyIncludedDestinations = staticallyIncludedDestinations;
    }

    /**
     * Returns the durable-destinations array exactly as stored; no defensive
     * copy is made.
     *
     * @return Returns the durableDestinations.
     */
    public ActiveMQDestination[] getDurableDestinations() {
        return durableDestinations;
    }

    /**
     * Stores the given array reference as the durable destinations; the array is
     * not copied.
     *
     * @param durableDestinations The durableDestinations to set.
     */
    public void setDurableDestinations(ActiveMQDestination[] durableDestinations) {
        this.durableDestinations = durableDestinations;
    }

    /**
     * Returns the transport this bridge uses to talk to the local broker.
     *
     * @return Returns the localBroker.
     */
    public Transport getLocalBroker() {
        return localBroker;
    }

    /**
     * @return Returns the remoteBroker.
1510 */ 1511 public Transport getRemoteBroker() { 1512 return remoteBroker; 1513 } 1514 1515 /** 1516 * @return the createdByDuplex 1517 */ 1518 public boolean isCreatedByDuplex() { 1519 return this.createdByDuplex; 1520 } 1521 1522 /** 1523 * @param createdByDuplex the createdByDuplex to set 1524 */ 1525 public void setCreatedByDuplex(boolean createdByDuplex) { 1526 this.createdByDuplex = createdByDuplex; 1527 } 1528 1529 @Override 1530 public String getRemoteAddress() { 1531 return remoteBroker.getRemoteAddress(); 1532 } 1533 1534 @Override 1535 public String getLocalAddress() { 1536 return localBroker.getRemoteAddress(); 1537 } 1538 1539 @Override 1540 public String getRemoteBrokerName() { 1541 return remoteBrokerInfo == null ? null : remoteBrokerInfo.getBrokerName(); 1542 } 1543 1544 @Override 1545 public String getRemoteBrokerId() { 1546 return (remoteBrokerInfo == null || remoteBrokerInfo.getBrokerId() == null) ? null : remoteBrokerInfo.getBrokerId().toString(); 1547 } 1548 1549 @Override 1550 public String getLocalBrokerName() { 1551 return localBrokerInfo == null ? 
null : localBrokerInfo.getBrokerName(); 1552 } 1553 1554 @Override 1555 public long getDequeueCounter() { 1556 return dequeueCounter.get(); 1557 } 1558 1559 @Override 1560 public long getEnqueueCounter() { 1561 return enqueueCounter.get(); 1562 } 1563 1564 protected boolean isDuplex() { 1565 return configuration.isDuplex() || createdByDuplex; 1566 } 1567 1568 public ConcurrentMap<ConsumerId, DemandSubscription> getLocalSubscriptionMap() { 1569 return subscriptionMapByRemoteId; 1570 } 1571 1572 @Override 1573 public void setBrokerService(BrokerService brokerService) { 1574 this.brokerService = brokerService; 1575 this.localBrokerId = brokerService.getRegionBroker().getBrokerId(); 1576 localBrokerPath[0] = localBrokerId; 1577 } 1578 1579 @Override 1580 public void setMbeanObjectName(ObjectName objectName) { 1581 this.mbeanObjectName = objectName; 1582 } 1583 1584 @Override 1585 public ObjectName getMbeanObjectName() { 1586 return mbeanObjectName; 1587 } 1588 1589 @Override 1590 public void resetStats() { 1591 enqueueCounter.set(0); 1592 dequeueCounter.set(0); 1593 } 1594 1595 /* 1596 * Used to allow for async tasks to await receipt of the BrokerInfo from the local and 1597 * remote sides of the network bridge. 
1598 */ 1599 private static class FutureBrokerInfo implements Future<BrokerInfo> { 1600 1601 private final CountDownLatch slot = new CountDownLatch(1); 1602 private final AtomicBoolean disposed; 1603 private volatile BrokerInfo info = null; 1604 1605 public FutureBrokerInfo(BrokerInfo info, AtomicBoolean disposed) { 1606 this.info = info; 1607 this.disposed = disposed; 1608 } 1609 1610 @Override 1611 public boolean cancel(boolean mayInterruptIfRunning) { 1612 slot.countDown(); 1613 return true; 1614 } 1615 1616 @Override 1617 public boolean isCancelled() { 1618 return slot.getCount() == 0 && info == null; 1619 } 1620 1621 @Override 1622 public boolean isDone() { 1623 return info != null; 1624 } 1625 1626 @Override 1627 public BrokerInfo get() throws InterruptedException, ExecutionException { 1628 try { 1629 if (info == null) { 1630 while (!disposed.get()) { 1631 if (slot.await(1, TimeUnit.SECONDS)) { 1632 break; 1633 } 1634 } 1635 } 1636 return info; 1637 } catch (InterruptedException e) { 1638 Thread.currentThread().interrupt(); 1639 LOG.debug("Operation interrupted: {}", e, e); 1640 throw new InterruptedException("Interrupted."); 1641 } 1642 } 1643 1644 @Override 1645 public BrokerInfo get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { 1646 try { 1647 if (info == null) { 1648 long deadline = System.currentTimeMillis() + unit.toMillis(timeout); 1649 1650 while (!disposed.get() || System.currentTimeMillis() < deadline) { 1651 if (slot.await(1, TimeUnit.MILLISECONDS)) { 1652 break; 1653 } 1654 } 1655 if (info == null) { 1656 throw new TimeoutException(); 1657 } 1658 } 1659 return info; 1660 } catch (InterruptedException e) { 1661 throw new InterruptedException("Interrupted."); 1662 } 1663 } 1664 1665 public void set(BrokerInfo info) { 1666 this.info = info; 1667 this.slot.countDown(); 1668 } 1669 } 1670 1671 protected void serviceOutbound(Message message) { 1672 NetworkBridgeListener l = this.networkBridgeListener; 
1673 if (l != null) { 1674 l.onOutboundMessage(this, message); 1675 } 1676 } 1677 1678 protected void serviceInboundMessage(Message message) { 1679 NetworkBridgeListener l = this.networkBridgeListener; 1680 if (l != null) { 1681 l.onInboundMessage(this, message); 1682 } 1683 } 1684 1685 protected boolean canDuplexDispatch(Message message) { 1686 boolean result = true; 1687 if (configuration.isCheckDuplicateMessagesOnDuplex()){ 1688 final long producerSequenceId = message.getMessageId().getProducerSequenceId(); 1689 // messages are multiplexed on this producer so we need to query the persistenceAdapter 1690 long lastStoredForMessageProducer = getStoredSequenceIdForMessage(message.getMessageId()); 1691 if (producerSequenceId <= lastStoredForMessageProducer) { 1692 result = false; 1693 LOG.debug("suppressing duplicate message send [{}] from network producer with producerSequence [{}] less than last stored: {}", new Object[]{ 1694 (LOG.isTraceEnabled() ? message : message.getMessageId()), producerSequenceId, lastStoredForMessageProducer 1695 }); 1696 } 1697 } 1698 return result; 1699 } 1700 1701 protected long getStoredSequenceIdForMessage(MessageId messageId) { 1702 try { 1703 return brokerService.getPersistenceAdapter().getLastProducerSequenceId(messageId.getProducerId()); 1704 } catch (IOException ignored) { 1705 LOG.debug("Failed to determine last producer sequence id for: {}", messageId, ignored); 1706 } 1707 return -1; 1708 } 1709 1710}