001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker; 018 019import java.io.EOFException; 020import java.io.IOException; 021import java.net.SocketException; 022import java.net.URI; 023import java.util.Collection; 024import java.util.HashMap; 025import java.util.Iterator; 026import java.util.LinkedList; 027import java.util.List; 028import java.util.Map; 029import java.util.Properties; 030import java.util.concurrent.ConcurrentHashMap; 031import java.util.concurrent.CopyOnWriteArrayList; 032import java.util.concurrent.CountDownLatch; 033import java.util.concurrent.TimeUnit; 034import java.util.concurrent.atomic.AtomicBoolean; 035import java.util.concurrent.atomic.AtomicInteger; 036import java.util.concurrent.atomic.AtomicReference; 037import java.util.concurrent.locks.ReentrantReadWriteLock; 038 039import javax.transaction.xa.XAResource; 040 041import org.apache.activemq.advisory.AdvisorySupport; 042import org.apache.activemq.broker.region.ConnectionStatistics; 043import org.apache.activemq.broker.region.RegionBroker; 044import org.apache.activemq.command.ActiveMQDestination; 045import org.apache.activemq.command.BrokerInfo; 046import 
org.apache.activemq.command.Command; 047import org.apache.activemq.command.CommandTypes; 048import org.apache.activemq.command.ConnectionControl; 049import org.apache.activemq.command.ConnectionError; 050import org.apache.activemq.command.ConnectionId; 051import org.apache.activemq.command.ConnectionInfo; 052import org.apache.activemq.command.ConsumerControl; 053import org.apache.activemq.command.ConsumerId; 054import org.apache.activemq.command.ConsumerInfo; 055import org.apache.activemq.command.ControlCommand; 056import org.apache.activemq.command.DataArrayResponse; 057import org.apache.activemq.command.DestinationInfo; 058import org.apache.activemq.command.ExceptionResponse; 059import org.apache.activemq.command.FlushCommand; 060import org.apache.activemq.command.IntegerResponse; 061import org.apache.activemq.command.KeepAliveInfo; 062import org.apache.activemq.command.Message; 063import org.apache.activemq.command.MessageAck; 064import org.apache.activemq.command.MessageDispatch; 065import org.apache.activemq.command.MessageDispatchNotification; 066import org.apache.activemq.command.MessagePull; 067import org.apache.activemq.command.ProducerAck; 068import org.apache.activemq.command.ProducerId; 069import org.apache.activemq.command.ProducerInfo; 070import org.apache.activemq.command.RemoveInfo; 071import org.apache.activemq.command.RemoveSubscriptionInfo; 072import org.apache.activemq.command.Response; 073import org.apache.activemq.command.SessionId; 074import org.apache.activemq.command.SessionInfo; 075import org.apache.activemq.command.ShutdownInfo; 076import org.apache.activemq.command.TransactionId; 077import org.apache.activemq.command.TransactionInfo; 078import org.apache.activemq.command.WireFormatInfo; 079import org.apache.activemq.network.DemandForwardingBridge; 080import org.apache.activemq.network.MBeanNetworkListener; 081import org.apache.activemq.network.NetworkBridgeConfiguration; 082import org.apache.activemq.network.NetworkBridgeFactory; 
083import org.apache.activemq.security.MessageAuthorizationPolicy; 084import org.apache.activemq.state.CommandVisitor; 085import org.apache.activemq.state.ConnectionState; 086import org.apache.activemq.state.ConsumerState; 087import org.apache.activemq.state.ProducerState; 088import org.apache.activemq.state.SessionState; 089import org.apache.activemq.state.TransactionState; 090import org.apache.activemq.thread.Task; 091import org.apache.activemq.thread.TaskRunner; 092import org.apache.activemq.thread.TaskRunnerFactory; 093import org.apache.activemq.transaction.Transaction; 094import org.apache.activemq.transport.DefaultTransportListener; 095import org.apache.activemq.transport.ResponseCorrelator; 096import org.apache.activemq.transport.TransmitCallback; 097import org.apache.activemq.transport.Transport; 098import org.apache.activemq.transport.TransportDisposedIOException; 099import org.apache.activemq.util.IntrospectionSupport; 100import org.apache.activemq.util.MarshallingSupport; 101import org.slf4j.Logger; 102import org.slf4j.LoggerFactory; 103import org.slf4j.MDC; 104 105public class TransportConnection implements Connection, Task, CommandVisitor { 106 private static final Logger LOG = LoggerFactory.getLogger(TransportConnection.class); 107 private static final Logger TRANSPORTLOG = LoggerFactory.getLogger(TransportConnection.class.getName() + ".Transport"); 108 private static final Logger SERVICELOG = LoggerFactory.getLogger(TransportConnection.class.getName() + ".Service"); 109 // Keeps track of the broker and connector that created this connection. 110 protected final Broker broker; 111 protected final TransportConnector connector; 112 // Keeps track of the state of the connections. 113 // protected final ConcurrentHashMap localConnectionStates=new 114 // ConcurrentHashMap(); 115 protected final Map<ConnectionId, ConnectionState> brokerConnectionStates; 116 // The broker and wireformat info that was exchanged. 
protected BrokerInfo brokerInfo;
// Commands queued for asynchronous dispatch; accessed while synchronized on the list itself.
protected final List<Command> dispatchQueue = new LinkedList<Command>();
protected TaskRunner taskRunner;
// Failure recorded by serviceTransportException()/serviceException() for this connection.
protected final AtomicReference<Throwable> transportException = new AtomicReference<Throwable>();
protected AtomicBoolean dispatchStopped = new AtomicBoolean(false);
private final Transport transport;
private MessageAuthorizationPolicy messageAuthorizationPolicy;
private WireFormatInfo wireFormatInfo;
// Used to do async dispatch.. this should perhaps be pushed down into the
// transport layer..
private boolean inServiceException;
private final ConnectionStatistics statistics = new ConnectionStatistics();
private boolean manageable;
private boolean slow;
private boolean markedCandidate;
private boolean blockedCandidate;
private boolean blocked;
private boolean connected;
private boolean active;
private boolean starting;
private boolean pendingStop;
private long timeStamp;
private final AtomicBoolean stopping = new AtomicBoolean(false);
private final CountDownLatch stopped = new CountDownLatch(1);
// Ensures serviceExceptionAsync() spawns at most one handler thread.
private final AtomicBoolean asyncException = new AtomicBoolean(false);
private final Map<ProducerId, ProducerBrokerExchange> producerExchanges = new HashMap<ProducerId, ProducerBrokerExchange>();
private final Map<ConsumerId, ConsumerBrokerExchange> consumerExchanges = new HashMap<ConsumerId, ConsumerBrokerExchange>();
private final CountDownLatch dispatchStoppedLatch = new CountDownLatch(1);
// Context of the command currently being serviced; cleared again at the end of service().
private ConnectionContext context;
private boolean networkConnection;
private boolean faultTolerantConnection;
private final AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
private DemandForwardingBridge duplexBridge;
private final TaskRunnerFactory taskRunnerFactory;
private final TaskRunnerFactory stopTaskRunnerFactory;
private TransportConnectionStateRegister connectionStateRegister = new SingleTransportConnectionStateRegister();
private final ReentrantReadWriteLock serviceLock = new ReentrantReadWriteLock();
private String duplexNetworkConnectorId;

/**
 * Creates the connection and installs a transport listener that routes inbound commands to
 * {@link #service(Command)} and transport failures to
 * {@link #serviceTransportException(IOException)}, both under the service read lock.
 *
 * @param connector the connector this connection belongs to; when non-null its statistics
 *            become the parent of this connection's statistics and its message
 *            authorization policy is adopted
 * @param transport the transport used to talk to the peer
 * @param broker the broker that services the commands received on this connection
 * @param taskRunnerFactory - can be null if you want direct dispatch to the transport
 *            else commands are sent async.
 * @param stopTaskRunnerFactory - can <b>not</b> be null, used for stopping this connection.
 */
public TransportConnection(TransportConnector connector, final Transport transport, Broker broker,
                           TaskRunnerFactory taskRunnerFactory, TaskRunnerFactory stopTaskRunnerFactory) {
    this.connector = connector;
    this.broker = broker;
    RegionBroker rb = (RegionBroker) broker.getAdaptor(RegionBroker.class);
    brokerConnectionStates = rb.getConnectionStates();
    if (connector != null) {
        this.statistics.setParent(connector.getStatistics());
        this.messageAuthorizationPolicy = connector.getMessageAuthorizationPolicy();
    }
    this.taskRunnerFactory = taskRunnerFactory;
    this.stopTaskRunnerFactory = stopTaskRunnerFactory;
    this.transport = transport;
    final BrokerService brokerService = this.broker.getBrokerService();
    if (this.transport instanceof BrokerServiceAware) {
        ((BrokerServiceAware) this.transport).setBrokerService(brokerService);
    }
    this.transport.setTransportListener(new DefaultTransportListener() {
        @Override
        public void onCommand(Object o) {
            serviceLock.readLock().lock();
            try {
                if (!(o instanceof Command)) {
                    // String concatenation (rather than o.toString()) keeps this null-safe.
                    throw new RuntimeException("Protocol violation - Command corrupted: " + o);
                }
                Command command = (Command) o;
                if (!brokerService.isStopping()) {
                    Response response = service(command);
                    // Re-check: the broker may have begun stopping while the command was serviced.
                    if (response != null && !brokerService.isStopping()) {
                        dispatchSync(response);
                    }
                } else {
                    throw new BrokerStoppedException("Broker " + brokerService + " is being stopped");
                }
            } finally {
                serviceLock.readLock().unlock();
            }
        }

        @Override
        public void onException(IOException exception) {
            serviceLock.readLock().lock();
            try {
                serviceTransportException(exception);
            } finally {
                serviceLock.readLock().unlock();
            }
        }
    });
    connected = true;
}

/**
 * Returns the number of messages to be dispatched to this connection
 *
 * @return size of dispatch queue
 */
@Override
public int getDispatchQueueSize() {
    synchronized (dispatchQueue) {
        return dispatchQueue.size();
    }
}

/**
 * Handles a failure reported by the transport. If the broker is configured to shut down on
 * slave failure and the peer is a slave broker, this connection and the broker service are
 * stopped. Unless a stop is already in progress or pending, the exception is then recorded,
 * logged and an asynchronous stop of this connection is requested.
 *
 * @param e the transport failure
 */
public void serviceTransportException(IOException e) {
    BrokerService bService = connector.getBrokerService();
    if (bService.isShutdownOnSlaveFailure() && brokerInfo != null && brokerInfo.isSlaveBroker()) {
        LOG.error("Slave has exception: {} shutting down master now.", e.getMessage(), e);
        try {
            doStop();
            bService.stop();
        } catch (Exception ex) {
            LOG.warn("Failed to stop the master", ex);
        }
    }
    if (!stopping.get() && !pendingStop) {
        transportException.set(e);
        if (TRANSPORTLOG.isDebugEnabled()) {
            TRANSPORTLOG.debug(this + " failed: " + e, e);
        } else if (TRANSPORTLOG.isWarnEnabled() && !expected(e)) {
            TRANSPORTLOG.warn(this + " failed: " + e);
        }
        stopAsync(e);
    }
}

/**
 * Tells whether a transport failure is routine for a stomp connector (a socket "reset" or an
 * end-of-stream) and therefore not worth a warning.
 */
private boolean expected(IOException e) {
    if (!isStomp()) {
        return false;
    }
    if (e instanceof EOFException) {
        return true;
    }
    // Throwable.getMessage() may be null; guard so the failure handler itself cannot NPE.
    String message = e.getMessage();
    return e instanceof SocketException && message != null && message.contains("reset");
}

/** Tells whether the connector URI has a scheme containing "stomp". */
private boolean isStomp() {
    URI uri = connector.getUri();
    return uri != null && uri.getScheme() != null && uri.getScheme().indexOf("stomp") != -1;
}

/**
 * Calls the serviceException method in an async thread. Since handling a
 * service exception closes a socket, we should not tie up broker threads
 * since client sockets may hang or cause deadlocks.
 * Only the first call starts a handler thread; later calls are ignored.
 */
@Override
public void serviceExceptionAsync(final IOException e) {
    if (asyncException.compareAndSet(false, true)) {
        new Thread("Async Exception Handler") {
            @Override
            public void run() {
                serviceException(e);
            }
        }.start();
    }
}

/**
 * Closes a clients connection due to a detected error. Errors are ignored
 * if: the client is closing or broker is closing. Otherwise, the connection
 * error is transmitted to the client before stopping its transport.
 */
@Override
public void serviceException(Throwable e) {
    // are we a transport exception such as not being able to dispatch
    // synchronously to a transport
    if (e instanceof IOException) {
        serviceTransportException((IOException) e);
    } else if (e.getClass() == BrokerStoppedException.class) {
        // Handle the case where the broker is stopped
        // But the client is still connected.
        if (!stopping.get()) {
            SERVICELOG.debug("Broker has been stopped. Notifying client and closing his connection.");
            ConnectionError ce = new ConnectionError();
            ce.setException(e);
            dispatchSync(ce);
            // Record the error that caused the transport to stop
            transportException.set(e);
            // Wait a little bit to try to get the output buffer to flush
            // the exception notification to the client.
            try {
                Thread.sleep(500);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
            // Worst case is we just kill the connection before the
            // notification gets to him.
            stopAsync();
        }
    } else if (!stopping.get() && !inServiceException) {
        // inServiceException guards against re-entering while the error is being dispatched.
        inServiceException = true;
        try {
            SERVICELOG.warn("Async error occurred: ", e);
            ConnectionError ce = new ConnectionError();
            ce.setException(e);
            if (pendingStop) {
                dispatchSync(ce);
            } else {
                dispatchAsync(ce);
            }
        } finally {
            inServiceException = false;
        }
    }
}

/**
 * Services one inbound command by visiting it with this connection and builds the response.
 * Failures are turned into an {@link ExceptionResponse} when the command requires a response
 * (and the reply is not suppressed); otherwise they are passed to
 * {@link #serviceException(Throwable)}.
 *
 * @param command the command to service
 * @return the response correlated to the command, or null when none is to be sent
 */
@Override
public Response service(Command command) {
    MDC.put("activemq.connector", connector.getUri().toString());
    try {
        Response response = null;
        boolean responseRequired = command.isResponseRequired();
        int commandId = command.getCommandId();
        try {
            if (!pendingStop) {
                response = command.visit(this);
            } else {
                response = new ExceptionResponse(transportException.get());
            }
        } catch (Throwable e) {
            if (SERVICELOG.isDebugEnabled() && e.getClass() != BrokerStoppedException.class) {
                SERVICELOG.debug("Error occured while processing " + (responseRequired ? "sync" : "async")
                        + " command: " + command + ", exception: " + e, e);
            }

            if (e instanceof SuppressReplyException || (e.getCause() instanceof SuppressReplyException)) {
                LOG.info("Suppressing reply to: " + command + " on: " + e + ", cause: " + e.getCause());
                responseRequired = false;
            }

            if (responseRequired) {
                if (e instanceof SecurityException || e.getCause() instanceof SecurityException) {
                    SERVICELOG.warn("Security Error occurred on connection to: {}, {}",
                            transport.getRemoteAddress(), e.getMessage());
                }
                response = new ExceptionResponse(e);
            } else {
                serviceException(e);
            }
        }
        if (responseRequired) {
            if (response == null) {
                response = new Response();
            }
            response.setCorrelationId(commandId);
        }
        // The context may have been flagged so that the response is not
        // sent.
        if (context != null) {
            if (context.isDontSendReponse()) {
                context.setDontSendReponse(false);
                response = null;
            }
            context = null;
        }
        return response;
    } finally {
        // Always clear the MDC key, even if error handling above throws.
        MDC.remove("activemq.connector");
    }
}

/** Keep-alives need no processing here; no response is produced. */
@Override
public Response processKeepAlive(KeepAliveInfo info) throws Exception {
    return null;
}

/** Asks the broker to remove the subscription, using the context of the info's connection. */
@Override
public Response processRemoveSubscription(RemoveSubscriptionInfo info) throws Exception {
    broker.removeSubscription(lookupConnectionState(info.getConnectionId()).getContext(), info);
    return null;
}

/** Records the peer's wire format info and adopts its protocol version. */
@Override
public Response processWireFormat(WireFormatInfo info) throws Exception {
    wireFormatInfo = info;
    protocolVersion.set(info.getVersion());
    return null;
}

/** Requests an asynchronous stop of this connection. */
@Override
public Response processShutdown(ShutdownInfo info) throws Exception {
    stopAsync();
    return null;
}

/** Flush commands need no processing here; no response is produced. */
@Override
public Response processFlush(FlushCommand command) throws Exception {
    return null;
}

/**
 * Begins a transaction on the broker unless the connection state already tracks it
 * (which avoids replaying duplicate commands).
 *
 * @throws NullPointerException if no connection state is found for the info's connection id
 */
@Override
public Response processBeginTransaction(TransactionInfo info) throws Exception {
    TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
    context = null;
    if (cs == null) {
        throw new NullPointerException("Context is null");
    }
    context = cs.getContext();
    // Avoid replaying dup commands
    if (cs.getTransactionState(info.getTransactionId()) == null) {
        cs.addTransactionState(info.getTransactionId());
        broker.beginTransaction(context, info.getTransactionId());
    }
    return null;
}

/**
 * Counts the transaction states tracked across all registered connection states.
 *
 * @return the number of tracked transactions
 */
@Override
public int getActiveTransactionCount() {
    int rc = 0;
    for (TransportConnectionState cs : connectionStateRegister.listConnectionStates()) {
        rc += cs.getTransactionStates().size();
    }
    return rc;
}

/**
 * Computes how long the oldest tracked transaction (the one with the smallest creation
 * timestamp) has existed.
 *
 * @return elapsed milliseconds since the oldest transaction was created, or null when no
 *         transaction is tracked
 */
@Override
public Long getOldestActiveTransactionDuration() {
    TransactionState oldestTX = null;
    for (TransportConnectionState cs : connectionStateRegister.listConnectionStates()) {
        Collection<TransactionState> transactions = cs.getTransactionStates();
        for (TransactionState transaction : transactions) {
            // The oldest transaction is the one created earliest.
            if (oldestTX == null || transaction.getCreatedAt() < oldestTX.getCreatedAt()) {
                oldestTX = transaction;
            }
        }
    }
    if (oldestTX == null) {
        return null;
    }
    return System.currentTimeMillis() - oldestTX.getCreatedAt();
}

@Override
public Response processEndTransaction(TransactionInfo info) throws Exception {
    // No need to do anything. This packet is just sent by the client
    // make sure he is synced with the server as commit command could
    // come from a different connection.
    return null;
}

/**
 * Prepares a transaction on the broker and returns the prepare result. A repeated prepare
 * for an already prepared transaction returns the stored result without calling the broker.
 *
 * @return an {@link IntegerResponse} carrying the prepare result
 * @throws NullPointerException if no connection state is found for the info's connection id
 * @throws IllegalStateException if the connection state does not track the transaction
 */
@Override
public Response processPrepareTransaction(TransactionInfo info) throws Exception {
    TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
    context = null;
    if (cs == null) {
        throw new NullPointerException("Context is null");
    }
    context = cs.getContext();
    TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
    if (transactionState == null) {
        throw new IllegalStateException("Cannot prepare a transaction that had not been started or previously returned XA_RDONLY: "
                + info.getTransactionId());
    }
    // Avoid dups.
    if (transactionState.isPrepared()) {
        return new IntegerResponse(transactionState.getPreparedResult());
    }
    transactionState.setPrepared(true);
    int result = broker.prepareTransaction(context, info.getTransactionId());
    transactionState.setPreparedResult(result);
    if (result == XAResource.XA_RDONLY) {
        // we are done, no further rollback or commit from TM
        cs.removeTransactionState(info.getTransactionId());
    }
    return new IntegerResponse(result);
}

/** Drops the tracked transaction state and commits the transaction on the broker in one phase. */
@Override
public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
    TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
    context = cs.getContext();
    cs.removeTransactionState(info.getTransactionId());
    broker.commitTransaction(context, info.getTransactionId(), true);
    return null;
}

/** Drops the tracked transaction state and commits the transaction on the broker (two-phase). */
@Override
public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
    TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
    context = cs.getContext();
    cs.removeTransactionState(info.getTransactionId());
    broker.commitTransaction(context, info.getTransactionId(), false);
    return null;
}

/** Drops the tracked transaction state and rolls the transaction back on the broker. */
@Override
public Response processRollbackTransaction(TransactionInfo info) throws Exception {
    TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
    context = cs.getContext();
    cs.removeTransactionState(info.getTransactionId());
    broker.rollbackTransaction(context, info.getTransactionId());
    return null;
}

/** Asks the broker to forget the transaction. */
@Override
public Response processForgetTransaction(TransactionInfo info) throws Exception {
    TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
    context = cs.getContext();
    broker.forgetTransaction(context, info.getTransactionId());
    return null;
}
528 529 @Override 530 public Response processRecoverTransactions(TransactionInfo info) throws Exception { 531 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 532 context = cs.getContext(); 533 TransactionId[] preparedTransactions = broker.getPreparedTransactions(context); 534 return new DataArrayResponse(preparedTransactions); 535 } 536 537 @Override 538 public Response processMessage(Message messageSend) throws Exception { 539 ProducerId producerId = messageSend.getProducerId(); 540 ProducerBrokerExchange producerExchange = getProducerBrokerExchange(producerId); 541 if (producerExchange.canDispatch(messageSend)) { 542 broker.send(producerExchange, messageSend); 543 } 544 return null; 545 } 546 547 @Override 548 public Response processMessageAck(MessageAck ack) throws Exception { 549 ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(ack.getConsumerId()); 550 if (consumerExchange != null) { 551 broker.acknowledge(consumerExchange, ack); 552 } else if (ack.isInTransaction()) { 553 LOG.warn("no matching consumer, ignoring ack {}", consumerExchange, ack); 554 } 555 return null; 556 } 557 558 @Override 559 public Response processMessagePull(MessagePull pull) throws Exception { 560 return broker.messagePull(lookupConnectionState(pull.getConsumerId()).getContext(), pull); 561 } 562 563 @Override 564 public Response processMessageDispatchNotification(MessageDispatchNotification notification) throws Exception { 565 broker.processDispatchNotification(notification); 566 return null; 567 } 568 569 @Override 570 public Response processAddDestination(DestinationInfo info) throws Exception { 571 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 572 broker.addDestinationInfo(cs.getContext(), info); 573 if (info.getDestination().isTemporary()) { 574 cs.addTempDestination(info); 575 } 576 return null; 577 } 578 579 @Override 580 public Response processRemoveDestination(DestinationInfo info) throws Exception { 
581 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 582 broker.removeDestinationInfo(cs.getContext(), info); 583 if (info.getDestination().isTemporary()) { 584 cs.removeTempDestination(info.getDestination()); 585 } 586 return null; 587 } 588 589 @Override 590 public Response processAddProducer(ProducerInfo info) throws Exception { 591 SessionId sessionId = info.getProducerId().getParentId(); 592 ConnectionId connectionId = sessionId.getParentId(); 593 TransportConnectionState cs = lookupConnectionState(connectionId); 594 if (cs == null) { 595 throw new IllegalStateException("Cannot add a producer to a connection that had not been registered: " 596 + connectionId); 597 } 598 SessionState ss = cs.getSessionState(sessionId); 599 if (ss == null) { 600 throw new IllegalStateException("Cannot add a producer to a session that had not been registered: " 601 + sessionId); 602 } 603 // Avoid replaying dup commands 604 if (!ss.getProducerIds().contains(info.getProducerId())) { 605 ActiveMQDestination destination = info.getDestination(); 606 // Do not check for null here as it would cause the count of max producers to exclude 607 // anonymous producers. The isAdvisoryTopic method checks for null so it is safe to 608 // call it from here with a null Destination value. 
609 if (!AdvisorySupport.isAdvisoryTopic(destination)) { 610 if (getProducerCount(connectionId) >= connector.getMaximumProducersAllowedPerConnection()){ 611 throw new IllegalStateException("Can't add producer on connection " + connectionId + ": at maximum limit: " + connector.getMaximumProducersAllowedPerConnection()); 612 } 613 } 614 broker.addProducer(cs.getContext(), info); 615 try { 616 ss.addProducer(info); 617 } catch (IllegalStateException e) { 618 broker.removeProducer(cs.getContext(), info); 619 } 620 621 } 622 return null; 623 } 624 625 @Override 626 public Response processRemoveProducer(ProducerId id) throws Exception { 627 SessionId sessionId = id.getParentId(); 628 ConnectionId connectionId = sessionId.getParentId(); 629 TransportConnectionState cs = lookupConnectionState(connectionId); 630 SessionState ss = cs.getSessionState(sessionId); 631 if (ss == null) { 632 throw new IllegalStateException("Cannot remove a producer from a session that had not been registered: " 633 + sessionId); 634 } 635 ProducerState ps = ss.removeProducer(id); 636 if (ps == null) { 637 throw new IllegalStateException("Cannot remove a producer that had not been registered: " + id); 638 } 639 removeProducerBrokerExchange(id); 640 broker.removeProducer(cs.getContext(), ps.getInfo()); 641 return null; 642 } 643 644 @Override 645 public Response processAddConsumer(ConsumerInfo info) throws Exception { 646 SessionId sessionId = info.getConsumerId().getParentId(); 647 ConnectionId connectionId = sessionId.getParentId(); 648 TransportConnectionState cs = lookupConnectionState(connectionId); 649 if (cs == null) { 650 throw new IllegalStateException("Cannot add a consumer to a connection that had not been registered: " 651 + connectionId); 652 } 653 SessionState ss = cs.getSessionState(sessionId); 654 if (ss == null) { 655 throw new IllegalStateException(broker.getBrokerName() 656 + " Cannot add a consumer to a session that had not been registered: " + sessionId); 657 } 658 // Avoid 
replaying dup commands 659 if (!ss.getConsumerIds().contains(info.getConsumerId())) { 660 ActiveMQDestination destination = info.getDestination(); 661 if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination)) { 662 if (getConsumerCount(connectionId) >= connector.getMaximumConsumersAllowedPerConnection()){ 663 throw new IllegalStateException("Can't add consumer on connection " + connectionId + ": at maximum limit: " + connector.getMaximumConsumersAllowedPerConnection()); 664 } 665 } 666 667 broker.addConsumer(cs.getContext(), info); 668 try { 669 ss.addConsumer(info); 670 addConsumerBrokerExchange(info.getConsumerId()); 671 } catch (IllegalStateException e) { 672 broker.removeConsumer(cs.getContext(), info); 673 } 674 675 } 676 return null; 677 } 678 679 @Override 680 public Response processRemoveConsumer(ConsumerId id, long lastDeliveredSequenceId) throws Exception { 681 SessionId sessionId = id.getParentId(); 682 ConnectionId connectionId = sessionId.getParentId(); 683 TransportConnectionState cs = lookupConnectionState(connectionId); 684 if (cs == null) { 685 throw new IllegalStateException("Cannot remove a consumer from a connection that had not been registered: " 686 + connectionId); 687 } 688 SessionState ss = cs.getSessionState(sessionId); 689 if (ss == null) { 690 throw new IllegalStateException("Cannot remove a consumer from a session that had not been registered: " 691 + sessionId); 692 } 693 ConsumerState consumerState = ss.removeConsumer(id); 694 if (consumerState == null) { 695 throw new IllegalStateException("Cannot remove a consumer that had not been registered: " + id); 696 } 697 ConsumerInfo info = consumerState.getInfo(); 698 info.setLastDeliveredSequenceId(lastDeliveredSequenceId); 699 broker.removeConsumer(cs.getContext(), consumerState.getInfo()); 700 removeConsumerBrokerExchange(id); 701 return null; 702 } 703 704 @Override 705 public Response processAddSession(SessionInfo info) throws Exception { 706 ConnectionId connectionId = 
info.getSessionId().getParentId(); 707 TransportConnectionState cs = lookupConnectionState(connectionId); 708 // Avoid replaying dup commands 709 if (cs != null && !cs.getSessionIds().contains(info.getSessionId())) { 710 broker.addSession(cs.getContext(), info); 711 try { 712 cs.addSession(info); 713 } catch (IllegalStateException e) { 714 e.printStackTrace(); 715 broker.removeSession(cs.getContext(), info); 716 } 717 } 718 return null; 719 } 720 721 @Override 722 public Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) throws Exception { 723 ConnectionId connectionId = id.getParentId(); 724 TransportConnectionState cs = lookupConnectionState(connectionId); 725 if (cs == null) { 726 throw new IllegalStateException("Cannot remove session from connection that had not been registered: " + connectionId); 727 } 728 SessionState session = cs.getSessionState(id); 729 if (session == null) { 730 throw new IllegalStateException("Cannot remove session that had not been registered: " + id); 731 } 732 // Don't let new consumers or producers get added while we are closing 733 // this down. 734 session.shutdown(); 735 // Cascade the connection stop to the consumers and producers. 736 for (ConsumerId consumerId : session.getConsumerIds()) { 737 try { 738 processRemoveConsumer(consumerId, lastDeliveredSequenceId); 739 } catch (Throwable e) { 740 LOG.warn("Failed to remove consumer: {}", consumerId, e); 741 } 742 } 743 for (ProducerId producerId : session.getProducerIds()) { 744 try { 745 processRemoveProducer(producerId); 746 } catch (Throwable e) { 747 LOG.warn("Failed to remove producer: {}", producerId, e); 748 } 749 } 750 cs.removeSession(id); 751 broker.removeSession(cs.getContext(), session.getInfo()); 752 return null; 753 } 754 755 @Override 756 public Response processAddConnection(ConnectionInfo info) throws Exception { 757 // Older clients should have been defaulting this field to true.. but 758 // they were not. 
759 if (wireFormatInfo != null && wireFormatInfo.getVersion() <= 2) { 760 info.setClientMaster(true); 761 } 762 TransportConnectionState state; 763 // Make sure 2 concurrent connections by the same ID only generate 1 764 // TransportConnectionState object. 765 synchronized (brokerConnectionStates) { 766 state = (TransportConnectionState) brokerConnectionStates.get(info.getConnectionId()); 767 if (state == null) { 768 state = new TransportConnectionState(info, this); 769 brokerConnectionStates.put(info.getConnectionId(), state); 770 } 771 state.incrementReference(); 772 } 773 // If there are 2 concurrent connections for the same connection id, 774 // then last one in wins, we need to sync here 775 // to figure out the winner. 776 synchronized (state.getConnectionMutex()) { 777 if (state.getConnection() != this) { 778 LOG.debug("Killing previous stale connection: {}", state.getConnection().getRemoteAddress()); 779 state.getConnection().stop(); 780 LOG.debug("Connection {} taking over previous connection: {}", getRemoteAddress(), state.getConnection().getRemoteAddress()); 781 state.setConnection(this); 782 state.reset(info); 783 } 784 } 785 registerConnectionState(info.getConnectionId(), state); 786 LOG.debug("Setting up new connection id: {}, address: {}, info: {}", new Object[]{ info.getConnectionId(), getRemoteAddress(), info }); 787 this.faultTolerantConnection = info.isFaultTolerant(); 788 // Setup the context. 
789 String clientId = info.getClientId(); 790 context = new ConnectionContext(); 791 context.setBroker(broker); 792 context.setClientId(clientId); 793 context.setClientMaster(info.isClientMaster()); 794 context.setConnection(this); 795 context.setConnectionId(info.getConnectionId()); 796 context.setConnector(connector); 797 context.setMessageAuthorizationPolicy(getMessageAuthorizationPolicy()); 798 context.setNetworkConnection(networkConnection); 799 context.setFaultTolerant(faultTolerantConnection); 800 context.setTransactions(new ConcurrentHashMap<TransactionId, Transaction>()); 801 context.setUserName(info.getUserName()); 802 context.setWireFormatInfo(wireFormatInfo); 803 context.setReconnect(info.isFailoverReconnect()); 804 this.manageable = info.isManageable(); 805 context.setConnectionState(state); 806 state.setContext(context); 807 state.setConnection(this); 808 if (info.getClientIp() == null) { 809 info.setClientIp(getRemoteAddress()); 810 } 811 812 try { 813 broker.addConnection(context, info); 814 } catch (Exception e) { 815 synchronized (brokerConnectionStates) { 816 brokerConnectionStates.remove(info.getConnectionId()); 817 } 818 unregisterConnectionState(info.getConnectionId()); 819 LOG.warn("Failed to add Connection {} due to {}", info.getConnectionId(), e); 820 if (e instanceof SecurityException) { 821 // close this down - in case the peer of this transport doesn't play nice 822 delayedStop(2000, "Failed with SecurityException: " + e.getLocalizedMessage(), e); 823 } 824 throw e; 825 } 826 if (info.isManageable()) { 827 // send ConnectionCommand 828 ConnectionControl command = this.connector.getConnectionControl(); 829 command.setFaultTolerant(broker.isFaultTolerantConfiguration()); 830 if (info.isFailoverReconnect()) { 831 command.setRebalanceConnection(false); 832 } 833 dispatchAsync(command); 834 } 835 return null; 836 } 837 838 @Override 839 public synchronized Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId) 840 
throws InterruptedException { 841 LOG.debug("remove connection id: {}", id); 842 TransportConnectionState cs = lookupConnectionState(id); 843 if (cs != null) { 844 // Don't allow things to be added to the connection state while we 845 // are shutting down. 846 cs.shutdown(); 847 // Cascade the connection stop to the sessions. 848 for (SessionId sessionId : cs.getSessionIds()) { 849 try { 850 processRemoveSession(sessionId, lastDeliveredSequenceId); 851 } catch (Throwable e) { 852 SERVICELOG.warn("Failed to remove session {}", sessionId, e); 853 } 854 } 855 // Cascade the connection stop to temp destinations. 856 for (Iterator<DestinationInfo> iter = cs.getTempDestinations().iterator(); iter.hasNext(); ) { 857 DestinationInfo di = iter.next(); 858 try { 859 broker.removeDestination(cs.getContext(), di.getDestination(), 0); 860 } catch (Throwable e) { 861 SERVICELOG.warn("Failed to remove tmp destination {}", di.getDestination(), e); 862 } 863 iter.remove(); 864 } 865 try { 866 broker.removeConnection(cs.getContext(), cs.getInfo(), transportException.get()); 867 } catch (Throwable e) { 868 SERVICELOG.warn("Failed to remove connection {}", cs.getInfo(), e); 869 } 870 TransportConnectionState state = unregisterConnectionState(id); 871 if (state != null) { 872 synchronized (brokerConnectionStates) { 873 // If we are the last reference, we should remove the state 874 // from the broker. 875 if (state.decrementReference() == 0) { 876 brokerConnectionStates.remove(id); 877 } 878 } 879 } 880 } 881 return null; 882 } 883 884 @Override 885 public Response processProducerAck(ProducerAck ack) throws Exception { 886 // A broker should not get ProducerAck messages. 
887 return null; 888 } 889 890 @Override 891 public Connector getConnector() { 892 return connector; 893 } 894 895 @Override 896 public void dispatchSync(Command message) { 897 try { 898 processDispatch(message); 899 } catch (IOException e) { 900 serviceExceptionAsync(e); 901 } 902 } 903 904 @Override 905 public void dispatchAsync(Command message) { 906 if (!stopping.get()) { 907 if (taskRunner == null) { 908 dispatchSync(message); 909 } else { 910 synchronized (dispatchQueue) { 911 dispatchQueue.add(message); 912 } 913 try { 914 taskRunner.wakeup(); 915 } catch (InterruptedException e) { 916 Thread.currentThread().interrupt(); 917 } 918 } 919 } else { 920 if (message.isMessageDispatch()) { 921 MessageDispatch md = (MessageDispatch) message; 922 TransmitCallback sub = md.getTransmitCallback(); 923 broker.postProcessDispatch(md); 924 if (sub != null) { 925 sub.onFailure(); 926 } 927 } 928 } 929 } 930 931 protected void processDispatch(Command command) throws IOException { 932 MessageDispatch messageDispatch = (MessageDispatch) (command.isMessageDispatch() ? 
command : null); 933 try { 934 if (!stopping.get()) { 935 if (messageDispatch != null) { 936 try { 937 broker.preProcessDispatch(messageDispatch); 938 } catch (RuntimeException convertToIO) { 939 throw new IOException(convertToIO); 940 } 941 } 942 dispatch(command); 943 } 944 } catch (IOException e) { 945 if (messageDispatch != null) { 946 TransmitCallback sub = messageDispatch.getTransmitCallback(); 947 broker.postProcessDispatch(messageDispatch); 948 if (sub != null) { 949 sub.onFailure(); 950 } 951 messageDispatch = null; 952 throw e; 953 } 954 } finally { 955 if (messageDispatch != null) { 956 TransmitCallback sub = messageDispatch.getTransmitCallback(); 957 broker.postProcessDispatch(messageDispatch); 958 if (sub != null) { 959 sub.onSuccess(); 960 } 961 } 962 } 963 } 964 965 @Override 966 public boolean iterate() { 967 try { 968 if (pendingStop || stopping.get()) { 969 if (dispatchStopped.compareAndSet(false, true)) { 970 if (transportException.get() == null) { 971 try { 972 dispatch(new ShutdownInfo()); 973 } catch (Throwable ignore) { 974 } 975 } 976 dispatchStoppedLatch.countDown(); 977 } 978 return false; 979 } 980 if (!dispatchStopped.get()) { 981 Command command = null; 982 synchronized (dispatchQueue) { 983 if (dispatchQueue.isEmpty()) { 984 return false; 985 } 986 command = dispatchQueue.remove(0); 987 } 988 processDispatch(command); 989 return true; 990 } 991 return false; 992 } catch (IOException e) { 993 if (dispatchStopped.compareAndSet(false, true)) { 994 dispatchStoppedLatch.countDown(); 995 } 996 serviceExceptionAsync(e); 997 return false; 998 } 999 } 1000 1001 /** 1002 * Returns the statistics for this connection 1003 */ 1004 @Override 1005 public ConnectionStatistics getStatistics() { 1006 return statistics; 1007 } 1008 1009 public MessageAuthorizationPolicy getMessageAuthorizationPolicy() { 1010 return messageAuthorizationPolicy; 1011 } 1012 1013 public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) 
{ 1014 this.messageAuthorizationPolicy = messageAuthorizationPolicy; 1015 } 1016 1017 @Override 1018 public boolean isManageable() { 1019 return manageable; 1020 } 1021 1022 @Override 1023 public void start() throws Exception { 1024 try { 1025 synchronized (this) { 1026 starting = true; 1027 if (taskRunnerFactory != null) { 1028 taskRunner = taskRunnerFactory.createTaskRunner(this, "ActiveMQ Connection Dispatcher: " 1029 + getRemoteAddress()); 1030 } else { 1031 taskRunner = null; 1032 } 1033 transport.start(); 1034 active = true; 1035 BrokerInfo info = connector.getBrokerInfo().copy(); 1036 if (connector.isUpdateClusterClients()) { 1037 info.setPeerBrokerInfos(this.broker.getPeerBrokerInfos()); 1038 } else { 1039 info.setPeerBrokerInfos(null); 1040 } 1041 dispatchAsync(info); 1042 1043 connector.onStarted(this); 1044 } 1045 } catch (Exception e) { 1046 // Force clean up on an error starting up. 1047 pendingStop = true; 1048 throw e; 1049 } finally { 1050 // stop() can be called from within the above block, 1051 // but we want to be sure start() completes before 1052 // stop() runs, so queue the stop until right now: 1053 setStarting(false); 1054 if (isPendingStop()) { 1055 LOG.debug("Calling the delayed stop() after start() {}", this); 1056 stop(); 1057 } 1058 } 1059 } 1060 1061 @Override 1062 public void stop() throws Exception { 1063 // do not stop task the task runner factories (taskRunnerFactory, stopTaskRunnerFactory) 1064 // as their lifecycle is handled elsewhere 1065 1066 stopAsync(); 1067 while (!stopped.await(5, TimeUnit.SECONDS)) { 1068 LOG.info("The connection to '{}' is taking a long time to shutdown.", transport.getRemoteAddress()); 1069 } 1070 } 1071 1072 public void delayedStop(final int waitTime, final String reason, Throwable cause) { 1073 if (waitTime > 0) { 1074 synchronized (this) { 1075 pendingStop = true; 1076 transportException.set(cause); 1077 } 1078 try { 1079 stopTaskRunnerFactory.execute(new Runnable() { 1080 @Override 1081 public void 
run() { 1082 try { 1083 Thread.sleep(waitTime); 1084 stopAsync(); 1085 LOG.info("Stopping {} because {}", transport.getRemoteAddress(), reason); 1086 } catch (InterruptedException e) { 1087 } 1088 } 1089 }); 1090 } catch (Throwable t) { 1091 LOG.warn("Cannot create stopAsync. This exception will be ignored.", t); 1092 } 1093 } 1094 } 1095 1096 public void stopAsync(Throwable cause) { 1097 transportException.set(cause); 1098 stopAsync(); 1099 } 1100 1101 public void stopAsync() { 1102 // If we're in the middle of starting then go no further... for now. 1103 synchronized (this) { 1104 pendingStop = true; 1105 if (starting) { 1106 LOG.debug("stopAsync() called in the middle of start(). Delaying till start completes.."); 1107 return; 1108 } 1109 } 1110 if (stopping.compareAndSet(false, true)) { 1111 // Let all the connection contexts know we are shutting down 1112 // so that in progress operations can notice and unblock. 1113 List<TransportConnectionState> connectionStates = listConnectionStates(); 1114 for (TransportConnectionState cs : connectionStates) { 1115 ConnectionContext connectionContext = cs.getContext(); 1116 if (connectionContext != null) { 1117 connectionContext.getStopping().set(true); 1118 } 1119 } 1120 try { 1121 stopTaskRunnerFactory.execute(new Runnable() { 1122 @Override 1123 public void run() { 1124 serviceLock.writeLock().lock(); 1125 try { 1126 doStop(); 1127 } catch (Throwable e) { 1128 LOG.debug("Error occurred while shutting down a connection {}", this, e); 1129 } finally { 1130 stopped.countDown(); 1131 serviceLock.writeLock().unlock(); 1132 } 1133 } 1134 }); 1135 } catch (Throwable t) { 1136 LOG.warn("Cannot create async transport stopper thread. This exception is ignored. 
Not waiting for stop to complete", t); 1137 stopped.countDown(); 1138 } 1139 } 1140 } 1141 1142 @Override 1143 public String toString() { 1144 return "Transport Connection to: " + transport.getRemoteAddress(); 1145 } 1146 1147 protected void doStop() throws Exception { 1148 LOG.debug("Stopping connection: {}", transport.getRemoteAddress()); 1149 connector.onStopped(this); 1150 try { 1151 synchronized (this) { 1152 if (duplexBridge != null) { 1153 duplexBridge.stop(); 1154 } 1155 } 1156 } catch (Exception ignore) { 1157 LOG.trace("Exception caught stopping. This exception is ignored.", ignore); 1158 } 1159 try { 1160 transport.stop(); 1161 LOG.debug("Stopped transport: {}", transport.getRemoteAddress()); 1162 } catch (Exception e) { 1163 LOG.debug("Could not stop transport to {}. This exception is ignored.", transport.getRemoteAddress(), e); 1164 } 1165 if (taskRunner != null) { 1166 taskRunner.shutdown(1); 1167 taskRunner = null; 1168 } 1169 active = false; 1170 // Run the MessageDispatch callbacks so that message references get 1171 // cleaned up. 1172 synchronized (dispatchQueue) { 1173 for (Iterator<Command> iter = dispatchQueue.iterator(); iter.hasNext(); ) { 1174 Command command = iter.next(); 1175 if (command.isMessageDispatch()) { 1176 MessageDispatch md = (MessageDispatch) command; 1177 TransmitCallback sub = md.getTransmitCallback(); 1178 broker.postProcessDispatch(md); 1179 if (sub != null) { 1180 sub.onFailure(); 1181 } 1182 } 1183 } 1184 dispatchQueue.clear(); 1185 } 1186 // 1187 // Remove all logical connection associated with this connection 1188 // from the broker. 
1189 if (!broker.isStopped()) { 1190 List<TransportConnectionState> connectionStates = listConnectionStates(); 1191 connectionStates = listConnectionStates(); 1192 for (TransportConnectionState cs : connectionStates) { 1193 cs.getContext().getStopping().set(true); 1194 try { 1195 LOG.debug("Cleaning up connection resources: {}", getRemoteAddress()); 1196 processRemoveConnection(cs.getInfo().getConnectionId(), RemoveInfo.LAST_DELIVERED_UNKNOWN); 1197 } catch (Throwable ignore) { 1198 ignore.printStackTrace(); 1199 } 1200 } 1201 } 1202 LOG.debug("Connection Stopped: {}", getRemoteAddress()); 1203 } 1204 1205 /** 1206 * @return Returns the blockedCandidate. 1207 */ 1208 public boolean isBlockedCandidate() { 1209 return blockedCandidate; 1210 } 1211 1212 /** 1213 * @param blockedCandidate The blockedCandidate to set. 1214 */ 1215 public void setBlockedCandidate(boolean blockedCandidate) { 1216 this.blockedCandidate = blockedCandidate; 1217 } 1218 1219 /** 1220 * @return Returns the markedCandidate. 1221 */ 1222 public boolean isMarkedCandidate() { 1223 return markedCandidate; 1224 } 1225 1226 /** 1227 * @param markedCandidate The markedCandidate to set. 1228 */ 1229 public void setMarkedCandidate(boolean markedCandidate) { 1230 this.markedCandidate = markedCandidate; 1231 if (!markedCandidate) { 1232 timeStamp = 0; 1233 blockedCandidate = false; 1234 } 1235 } 1236 1237 /** 1238 * @param slow The slow to set. 
1239 */ 1240 public void setSlow(boolean slow) { 1241 this.slow = slow; 1242 } 1243 1244 /** 1245 * @return true if the Connection is slow 1246 */ 1247 @Override 1248 public boolean isSlow() { 1249 return slow; 1250 } 1251 1252 /** 1253 * @return true if the Connection is potentially blocked 1254 */ 1255 public boolean isMarkedBlockedCandidate() { 1256 return markedCandidate; 1257 } 1258 1259 /** 1260 * Mark the Connection, so we can deem if it's collectable on the next sweep 1261 */ 1262 public void doMark() { 1263 if (timeStamp == 0) { 1264 timeStamp = System.currentTimeMillis(); 1265 } 1266 } 1267 1268 /** 1269 * @return if after being marked, the Connection is still writing 1270 */ 1271 @Override 1272 public boolean isBlocked() { 1273 return blocked; 1274 } 1275 1276 /** 1277 * @return true if the Connection is connected 1278 */ 1279 @Override 1280 public boolean isConnected() { 1281 return connected; 1282 } 1283 1284 /** 1285 * @param blocked The blocked to set. 1286 */ 1287 public void setBlocked(boolean blocked) { 1288 this.blocked = blocked; 1289 } 1290 1291 /** 1292 * @param connected The connected to set. 1293 */ 1294 public void setConnected(boolean connected) { 1295 this.connected = connected; 1296 } 1297 1298 /** 1299 * @return true if the Connection is active 1300 */ 1301 @Override 1302 public boolean isActive() { 1303 return active; 1304 } 1305 1306 /** 1307 * @param active The active to set. 
1308 */ 1309 public void setActive(boolean active) { 1310 this.active = active; 1311 } 1312 1313 /** 1314 * @return true if the Connection is starting 1315 */ 1316 public synchronized boolean isStarting() { 1317 return starting; 1318 } 1319 1320 @Override 1321 public synchronized boolean isNetworkConnection() { 1322 return networkConnection; 1323 } 1324 1325 @Override 1326 public boolean isFaultTolerantConnection() { 1327 return this.faultTolerantConnection; 1328 } 1329 1330 protected synchronized void setStarting(boolean starting) { 1331 this.starting = starting; 1332 } 1333 1334 /** 1335 * @return true if the Connection needs to stop 1336 */ 1337 public synchronized boolean isPendingStop() { 1338 return pendingStop; 1339 } 1340 1341 protected synchronized void setPendingStop(boolean pendingStop) { 1342 this.pendingStop = pendingStop; 1343 } 1344 1345 @Override 1346 public Response processBrokerInfo(BrokerInfo info) { 1347 if (info.isSlaveBroker()) { 1348 LOG.error(" Slave Brokers are no longer supported - slave trying to attach is: {}", info.getBrokerName()); 1349 } else if (info.isNetworkConnection() && info.isDuplexConnection()) { 1350 // so this TransportConnection is the rear end of a network bridge 1351 // We have been requested to create a two way pipe ... 
1352 try { 1353 Properties properties = MarshallingSupport.stringToProperties(info.getNetworkProperties()); 1354 Map<String, String> props = createMap(properties); 1355 NetworkBridgeConfiguration config = new NetworkBridgeConfiguration(); 1356 IntrospectionSupport.setProperties(config, props, ""); 1357 config.setBrokerName(broker.getBrokerName()); 1358 1359 // check for existing duplex connection hanging about 1360 1361 // We first look if existing network connection already exists for the same broker Id and network connector name 1362 // It's possible in case of brief network fault to have this transport connector side of the connection always active 1363 // and the duplex network connector side wanting to open a new one 1364 // In this case, the old connection must be broken 1365 String duplexNetworkConnectorId = config.getName() + "@" + info.getBrokerId(); 1366 CopyOnWriteArrayList<TransportConnection> connections = this.connector.getConnections(); 1367 synchronized (connections) { 1368 for (Iterator<TransportConnection> iter = connections.iterator(); iter.hasNext(); ) { 1369 TransportConnection c = iter.next(); 1370 if ((c != this) && (duplexNetworkConnectorId.equals(c.getDuplexNetworkConnectorId()))) { 1371 LOG.warn("Stopping an existing active duplex connection [{}] for network connector ({}).", c, duplexNetworkConnectorId); 1372 c.stopAsync(); 1373 // better to wait for a bit rather than get connection id already in use and failure to start new bridge 1374 c.getStopped().await(1, TimeUnit.SECONDS); 1375 } 1376 } 1377 setDuplexNetworkConnectorId(duplexNetworkConnectorId); 1378 } 1379 Transport localTransport = NetworkBridgeFactory.createLocalTransport(broker); 1380 Transport remoteBridgeTransport = transport; 1381 if (! 
(remoteBridgeTransport instanceof ResponseCorrelator)) { 1382 // the vm transport case is already wrapped 1383 remoteBridgeTransport = new ResponseCorrelator(remoteBridgeTransport); 1384 } 1385 String duplexName = localTransport.toString(); 1386 if (duplexName.contains("#")) { 1387 duplexName = duplexName.substring(duplexName.lastIndexOf("#")); 1388 } 1389 MBeanNetworkListener listener = new MBeanNetworkListener(broker.getBrokerService(), config, broker.getBrokerService().createDuplexNetworkConnectorObjectName(duplexName)); 1390 listener.setCreatedByDuplex(true); 1391 duplexBridge = NetworkBridgeFactory.createBridge(config, localTransport, remoteBridgeTransport, listener); 1392 duplexBridge.setBrokerService(broker.getBrokerService()); 1393 // now turn duplex off this side 1394 info.setDuplexConnection(false); 1395 duplexBridge.setCreatedByDuplex(true); 1396 duplexBridge.duplexStart(this, brokerInfo, info); 1397 LOG.info("Started responder end of duplex bridge {}", duplexNetworkConnectorId); 1398 return null; 1399 } catch (TransportDisposedIOException e) { 1400 LOG.warn("Duplex bridge {} was stopped before it was correctly started.", duplexNetworkConnectorId); 1401 return null; 1402 } catch (Exception e) { 1403 LOG.error("Failed to create responder end of duplex network bridge {}", duplexNetworkConnectorId, e); 1404 return null; 1405 } 1406 } 1407 // We only expect to get one broker info command per connection 1408 if (this.brokerInfo != null) { 1409 LOG.warn("Unexpected extra broker info command received: {}", info); 1410 } 1411 this.brokerInfo = info; 1412 networkConnection = true; 1413 List<TransportConnectionState> connectionStates = listConnectionStates(); 1414 for (TransportConnectionState cs : connectionStates) { 1415 cs.getContext().setNetworkConnection(true); 1416 } 1417 return null; 1418 } 1419 1420 @SuppressWarnings({"unchecked", "rawtypes"}) 1421 private HashMap<String, String> createMap(Properties properties) { 1422 return new HashMap(properties); 1423 
} 1424 1425 protected void dispatch(Command command) throws IOException { 1426 try { 1427 setMarkedCandidate(true); 1428 transport.oneway(command); 1429 } finally { 1430 setMarkedCandidate(false); 1431 } 1432 } 1433 1434 @Override 1435 public String getRemoteAddress() { 1436 return transport.getRemoteAddress(); 1437 } 1438 1439 public Transport getTransport() { 1440 return transport; 1441 } 1442 1443 @Override 1444 public String getConnectionId() { 1445 List<TransportConnectionState> connectionStates = listConnectionStates(); 1446 for (TransportConnectionState cs : connectionStates) { 1447 if (cs.getInfo().getClientId() != null) { 1448 return cs.getInfo().getClientId(); 1449 } 1450 return cs.getInfo().getConnectionId().toString(); 1451 } 1452 return null; 1453 } 1454 1455 @Override 1456 public void updateClient(ConnectionControl control) { 1457 if (isActive() && isBlocked() == false && isFaultTolerantConnection() && this.wireFormatInfo != null 1458 && this.wireFormatInfo.getVersion() >= 6) { 1459 dispatchAsync(control); 1460 } 1461 } 1462 1463 public ProducerBrokerExchange getProducerBrokerExchangeIfExists(ProducerInfo producerInfo){ 1464 ProducerBrokerExchange result = null; 1465 if (producerInfo != null && producerInfo.getProducerId() != null){ 1466 synchronized (producerExchanges){ 1467 result = producerExchanges.get(producerInfo.getProducerId()); 1468 } 1469 } 1470 return result; 1471 } 1472 1473 private ProducerBrokerExchange getProducerBrokerExchange(ProducerId id) throws IOException { 1474 ProducerBrokerExchange result = producerExchanges.get(id); 1475 if (result == null) { 1476 synchronized (producerExchanges) { 1477 result = new ProducerBrokerExchange(); 1478 TransportConnectionState state = lookupConnectionState(id); 1479 context = state.getContext(); 1480 result.setConnectionContext(context); 1481 if (context.isReconnect() || (context.isNetworkConnection() && connector.isAuditNetworkProducers())) { 1482 
result.setLastStoredSequenceId(broker.getBrokerService().getPersistenceAdapter().getLastProducerSequenceId(id)); 1483 } 1484 SessionState ss = state.getSessionState(id.getParentId()); 1485 if (ss != null) { 1486 result.setProducerState(ss.getProducerState(id)); 1487 ProducerState producerState = ss.getProducerState(id); 1488 if (producerState != null && producerState.getInfo() != null) { 1489 ProducerInfo info = producerState.getInfo(); 1490 result.setMutable(info.getDestination() == null || info.getDestination().isComposite()); 1491 } 1492 } 1493 producerExchanges.put(id, result); 1494 } 1495 } else { 1496 context = result.getConnectionContext(); 1497 } 1498 return result; 1499 } 1500 1501 private void removeProducerBrokerExchange(ProducerId id) { 1502 synchronized (producerExchanges) { 1503 producerExchanges.remove(id); 1504 } 1505 } 1506 1507 private ConsumerBrokerExchange getConsumerBrokerExchange(ConsumerId id) { 1508 ConsumerBrokerExchange result = consumerExchanges.get(id); 1509 return result; 1510 } 1511 1512 private ConsumerBrokerExchange addConsumerBrokerExchange(ConsumerId id) { 1513 ConsumerBrokerExchange result = consumerExchanges.get(id); 1514 if (result == null) { 1515 synchronized (consumerExchanges) { 1516 result = new ConsumerBrokerExchange(); 1517 TransportConnectionState state = lookupConnectionState(id); 1518 context = state.getContext(); 1519 result.setConnectionContext(context); 1520 SessionState ss = state.getSessionState(id.getParentId()); 1521 if (ss != null) { 1522 ConsumerState cs = ss.getConsumerState(id); 1523 if (cs != null) { 1524 ConsumerInfo info = cs.getInfo(); 1525 if (info != null) { 1526 if (info.getDestination() != null && info.getDestination().isPattern()) { 1527 result.setWildcard(true); 1528 } 1529 } 1530 } 1531 } 1532 consumerExchanges.put(id, result); 1533 } 1534 } 1535 return result; 1536 } 1537 1538 private void removeConsumerBrokerExchange(ConsumerId id) { 1539 synchronized (consumerExchanges) { 1540 
consumerExchanges.remove(id); 1541 } 1542 } 1543 1544 public int getProtocolVersion() { 1545 return protocolVersion.get(); 1546 } 1547 1548 @Override 1549 public Response processControlCommand(ControlCommand command) throws Exception { 1550 return null; 1551 } 1552 1553 @Override 1554 public Response processMessageDispatch(MessageDispatch dispatch) throws Exception { 1555 return null; 1556 } 1557 1558 @Override 1559 public Response processConnectionControl(ConnectionControl control) throws Exception { 1560 if (control != null) { 1561 faultTolerantConnection = control.isFaultTolerant(); 1562 } 1563 return null; 1564 } 1565 1566 @Override 1567 public Response processConnectionError(ConnectionError error) throws Exception { 1568 return null; 1569 } 1570 1571 @Override 1572 public Response processConsumerControl(ConsumerControl control) throws Exception { 1573 ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(control.getConsumerId()); 1574 broker.processConsumerControl(consumerExchange, control); 1575 return null; 1576 } 1577 1578 protected synchronized TransportConnectionState registerConnectionState(ConnectionId connectionId, 1579 TransportConnectionState state) { 1580 TransportConnectionState cs = null; 1581 if (!connectionStateRegister.isEmpty() && !connectionStateRegister.doesHandleMultipleConnectionStates()) { 1582 // swap implementations 1583 TransportConnectionStateRegister newRegister = new MapTransportConnectionStateRegister(); 1584 newRegister.intialize(connectionStateRegister); 1585 connectionStateRegister = newRegister; 1586 } 1587 cs = connectionStateRegister.registerConnectionState(connectionId, state); 1588 return cs; 1589 } 1590 1591 protected synchronized TransportConnectionState unregisterConnectionState(ConnectionId connectionId) { 1592 return connectionStateRegister.unregisterConnectionState(connectionId); 1593 } 1594 1595 protected synchronized List<TransportConnectionState> listConnectionStates() { 1596 return 
connectionStateRegister.listConnectionStates(); 1597 } 1598 1599 protected synchronized TransportConnectionState lookupConnectionState(String connectionId) { 1600 return connectionStateRegister.lookupConnectionState(connectionId); 1601 } 1602 1603 protected synchronized TransportConnectionState lookupConnectionState(ConsumerId id) { 1604 return connectionStateRegister.lookupConnectionState(id); 1605 } 1606 1607 protected synchronized TransportConnectionState lookupConnectionState(ProducerId id) { 1608 return connectionStateRegister.lookupConnectionState(id); 1609 } 1610 1611 protected synchronized TransportConnectionState lookupConnectionState(SessionId id) { 1612 return connectionStateRegister.lookupConnectionState(id); 1613 } 1614 1615 // public only for testing 1616 public synchronized TransportConnectionState lookupConnectionState(ConnectionId connectionId) { 1617 return connectionStateRegister.lookupConnectionState(connectionId); 1618 } 1619 1620 protected synchronized void setDuplexNetworkConnectorId(String duplexNetworkConnectorId) { 1621 this.duplexNetworkConnectorId = duplexNetworkConnectorId; 1622 } 1623 1624 protected synchronized String getDuplexNetworkConnectorId() { 1625 return this.duplexNetworkConnectorId; 1626 } 1627 1628 public boolean isStopping() { 1629 return stopping.get(); 1630 } 1631 1632 protected CountDownLatch getStopped() { 1633 return stopped; 1634 } 1635 1636 private int getProducerCount(ConnectionId connectionId) { 1637 int result = 0; 1638 TransportConnectionState cs = lookupConnectionState(connectionId); 1639 if (cs != null) { 1640 for (SessionId sessionId : cs.getSessionIds()) { 1641 SessionState sessionState = cs.getSessionState(sessionId); 1642 if (sessionState != null) { 1643 result += sessionState.getProducerIds().size(); 1644 } 1645 } 1646 } 1647 return result; 1648 } 1649 1650 private int getConsumerCount(ConnectionId connectionId) { 1651 int result = 0; 1652 TransportConnectionState cs = lookupConnectionState(connectionId); 
1653 if (cs != null) { 1654 for (SessionId sessionId : cs.getSessionIds()) { 1655 SessionState sessionState = cs.getSessionState(sessionId); 1656 if (sessionState != null) { 1657 result += sessionState.getConsumerIds().size(); 1658 } 1659 } 1660 } 1661 return result; 1662 } 1663 1664 public WireFormatInfo getRemoteWireFormatInfo() { 1665 return wireFormatInfo; 1666 } 1667}