001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker; 018 019import java.io.EOFException; 020import java.io.IOException; 021import java.net.SocketException; 022import java.net.URI; 023import java.util.Collection; 024import java.util.HashMap; 025import java.util.Iterator; 026import java.util.LinkedList; 027import java.util.List; 028import java.util.Map; 029import java.util.Properties; 030import java.util.concurrent.ConcurrentHashMap; 031import java.util.concurrent.CopyOnWriteArrayList; 032import java.util.concurrent.CountDownLatch; 033import java.util.concurrent.TimeUnit; 034import java.util.concurrent.atomic.AtomicBoolean; 035import java.util.concurrent.atomic.AtomicInteger; 036import java.util.concurrent.atomic.AtomicReference; 037import java.util.concurrent.locks.ReentrantReadWriteLock; 038 039import javax.transaction.xa.XAResource; 040 041import org.apache.activemq.advisory.AdvisorySupport; 042import org.apache.activemq.broker.region.ConnectionStatistics; 043import org.apache.activemq.broker.region.RegionBroker; 044import org.apache.activemq.command.ActiveMQDestination; 045import org.apache.activemq.command.BrokerInfo; 046import org.apache.activemq.command.Command; 047import org.apache.activemq.command.CommandTypes; 048import org.apache.activemq.command.ConnectionControl; 049import org.apache.activemq.command.ConnectionError; 050import org.apache.activemq.command.ConnectionId; 051import org.apache.activemq.command.ConnectionInfo; 052import org.apache.activemq.command.ConsumerControl; 053import org.apache.activemq.command.ConsumerId; 054import org.apache.activemq.command.ConsumerInfo; 055import org.apache.activemq.command.ControlCommand; 056import org.apache.activemq.command.DataArrayResponse; 057import org.apache.activemq.command.DestinationInfo; 058import org.apache.activemq.command.ExceptionResponse; 059import org.apache.activemq.command.FlushCommand; 060import org.apache.activemq.command.IntegerResponse; 061import org.apache.activemq.command.KeepAliveInfo; 062import org.apache.activemq.command.Message; 063import org.apache.activemq.command.MessageAck; 064import org.apache.activemq.command.MessageDispatch; 065import org.apache.activemq.command.MessageDispatchNotification; 066import org.apache.activemq.command.MessagePull; 067import org.apache.activemq.command.ProducerAck; 068import org.apache.activemq.command.ProducerId; 069import org.apache.activemq.command.ProducerInfo; 070import org.apache.activemq.command.RemoveInfo; 071import org.apache.activemq.command.RemoveSubscriptionInfo; 072import org.apache.activemq.command.Response; 073import org.apache.activemq.command.SessionId; 074import org.apache.activemq.command.SessionInfo; 075import org.apache.activemq.command.ShutdownInfo; 076import org.apache.activemq.command.TransactionId; 077import org.apache.activemq.command.TransactionInfo; 078import org.apache.activemq.command.WireFormatInfo; 079import org.apache.activemq.network.DemandForwardingBridge; 080import org.apache.activemq.network.MBeanNetworkListener; 081import org.apache.activemq.network.NetworkBridgeConfiguration; 082import org.apache.activemq.network.NetworkBridgeFactory; 083import org.apache.activemq.security.MessageAuthorizationPolicy; 084import org.apache.activemq.state.CommandVisitor; 085import org.apache.activemq.state.ConnectionState; 086import org.apache.activemq.state.ConsumerState; 087import org.apache.activemq.state.ProducerState; 088import org.apache.activemq.state.SessionState; 089import org.apache.activemq.state.TransactionState; 090import org.apache.activemq.thread.Task; 091import org.apache.activemq.thread.TaskRunner; 092import org.apache.activemq.thread.TaskRunnerFactory; 093import org.apache.activemq.transaction.Transaction; 094import org.apache.activemq.transport.DefaultTransportListener; 095import org.apache.activemq.transport.ResponseCorrelator; 096import org.apache.activemq.transport.TransmitCallback; 097import org.apache.activemq.transport.Transport; 098import org.apache.activemq.transport.TransportDisposedIOException; 099import org.apache.activemq.util.IntrospectionSupport; 100import org.apache.activemq.util.MarshallingSupport; 101import org.slf4j.Logger; 102import org.slf4j.LoggerFactory; 103import org.slf4j.MDC; 104 105public class TransportConnection implements Connection, Task, CommandVisitor { 106 private static final Logger LOG = LoggerFactory.getLogger(TransportConnection.class); 107 private static final Logger TRANSPORTLOG = LoggerFactory.getLogger(TransportConnection.class.getName() + ".Transport"); 108 private static final Logger SERVICELOG = LoggerFactory.getLogger(TransportConnection.class.getName() + ".Service"); 109 // Keeps track of the broker and connector that created this connection. 110 protected final Broker broker; 111 protected final BrokerService brokerService; 112 protected final TransportConnector connector; 113 // Keeps track of the state of the connections. 114 // protected final ConcurrentHashMap localConnectionStates=new 115 // ConcurrentHashMap(); 116 protected final Map<ConnectionId, ConnectionState> brokerConnectionStates; 117 // The broker and wireformat info that was exchanged. 118 protected BrokerInfo brokerInfo; 119 protected final List<Command> dispatchQueue = new LinkedList<Command>(); 120 protected TaskRunner taskRunner; 121 protected final AtomicReference<Throwable> transportException = new AtomicReference<Throwable>(); 122 protected AtomicBoolean dispatchStopped = new AtomicBoolean(false); 123 private final Transport transport; 124 private MessageAuthorizationPolicy messageAuthorizationPolicy; 125 private WireFormatInfo wireFormatInfo; 126 // Used to do async dispatch.. this should perhaps be pushed down into the 127 // transport layer.. 128 private boolean inServiceException; 129 private final ConnectionStatistics statistics = new ConnectionStatistics(); 130 private boolean manageable; 131 private boolean slow; 132 private boolean markedCandidate; 133 private boolean blockedCandidate; 134 private boolean blocked; 135 private boolean connected; 136 private boolean active; 137 private boolean starting; 138 private boolean pendingStop; 139 private long timeStamp; 140 private final AtomicBoolean stopping = new AtomicBoolean(false); 141 private final CountDownLatch stopped = new CountDownLatch(1); 142 private final AtomicBoolean asyncException = new AtomicBoolean(false); 143 private final Map<ProducerId, ProducerBrokerExchange> producerExchanges = new HashMap<ProducerId, ProducerBrokerExchange>(); 144 private final Map<ConsumerId, ConsumerBrokerExchange> consumerExchanges = new HashMap<ConsumerId, ConsumerBrokerExchange>(); 145 private final CountDownLatch dispatchStoppedLatch = new CountDownLatch(1); 146 private ConnectionContext context; 147 private boolean networkConnection; 148 private boolean faultTolerantConnection; 149 private final AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION); 150 private DemandForwardingBridge duplexBridge; 151 private final TaskRunnerFactory taskRunnerFactory; 152 private final TaskRunnerFactory stopTaskRunnerFactory; 153 private TransportConnectionStateRegister connectionStateRegister = new SingleTransportConnectionStateRegister(); 154 private final ReentrantReadWriteLock serviceLock = new ReentrantReadWriteLock(); 155 private String duplexNetworkConnectorId; 156 157 /** 158 * @param taskRunnerFactory - can be null if you want direct dispatch to the transport 159 * else commands are sent async. 160 * @param stopTaskRunnerFactory - can <b>not</b> be null, used for stopping this connection. 161 */ 162 public TransportConnection(TransportConnector connector, final Transport transport, Broker broker, 163 TaskRunnerFactory taskRunnerFactory, TaskRunnerFactory stopTaskRunnerFactory) { 164 this.connector = connector; 165 this.broker = broker; 166 this.brokerService = broker.getBrokerService(); 167 168 RegionBroker rb = (RegionBroker) broker.getAdaptor(RegionBroker.class); 169 brokerConnectionStates = rb.getConnectionStates(); 170 if (connector != null) { 171 this.statistics.setParent(connector.getStatistics()); 172 this.messageAuthorizationPolicy = connector.getMessageAuthorizationPolicy(); 173 } 174 this.taskRunnerFactory = taskRunnerFactory; 175 this.stopTaskRunnerFactory = stopTaskRunnerFactory; 176 this.transport = transport; 177 if( this.transport instanceof BrokerServiceAware ) { 178 ((BrokerServiceAware)this.transport).setBrokerService(brokerService); 179 } 180 this.transport.setTransportListener(new DefaultTransportListener() { 181 @Override 182 public void onCommand(Object o) { 183 serviceLock.readLock().lock(); 184 try { 185 if (!(o instanceof Command)) { 186 throw new RuntimeException("Protocol violation - Command corrupted: " + o.toString()); 187 } 188 Command command = (Command) o; 189 if (!brokerService.isStopping()) { 190 Response response = service(command); 191 if (response != null && !brokerService.isStopping()) { 192 dispatchSync(response); 193 } 194 } else { 195 throw new BrokerStoppedException("Broker " + brokerService + " is being stopped"); 196 } 197 } finally { 198 serviceLock.readLock().unlock(); 199 } 200 } 201 202 @Override 203 public void onException(IOException exception) { 204 serviceLock.readLock().lock(); 205 try { 206 serviceTransportException(exception); 207 } finally { 208 serviceLock.readLock().unlock(); 209 } 210 } 211 }); 212 connected = true; 213 } 214 215 /** 216 * Returns the number of messages to be dispatched to this connection 217 * 218 * @return size of dispatch queue 219 */ 220 @Override 221 public int getDispatchQueueSize() { 222 synchronized (dispatchQueue) { 223 return dispatchQueue.size(); 224 } 225 } 226 227 public void serviceTransportException(IOException e) { 228 if (!stopping.get() && !pendingStop) { 229 transportException.set(e); 230 if (TRANSPORTLOG.isDebugEnabled()) { 231 TRANSPORTLOG.debug(this + " failed: " + e, e); 232 } else if (TRANSPORTLOG.isWarnEnabled() && !expected(e)) { 233 TRANSPORTLOG.warn(this + " failed: " + e); 234 } 235 stopAsync(e); 236 } 237 } 238 239 private boolean expected(IOException e) { 240 return isStomp() && ((e instanceof SocketException && e.getMessage().indexOf("reset") != -1) || e instanceof EOFException); 241 } 242 243 private boolean isStomp() { 244 URI uri = connector.getUri(); 245 return uri != null && uri.getScheme() != null && uri.getScheme().indexOf("stomp") != -1; 246 } 247 248 /** 249 * Calls the serviceException method in an async thread. Since handling a 250 * service exception closes a socket, we should not tie up broker threads 251 * since client sockets may hang or cause deadlocks. 252 */ 253 @Override 254 public void serviceExceptionAsync(final IOException e) { 255 if (asyncException.compareAndSet(false, true)) { 256 new Thread("Async Exception Handler") { 257 @Override 258 public void run() { 259 serviceException(e); 260 } 261 }.start(); 262 } 263 } 264 265 /** 266 * Closes a clients connection due to a detected error. Errors are ignored 267 * if: the client is closing or broker is closing. Otherwise, the connection 268 * error transmitted to the client before stopping it's transport. 269 */ 270 @Override 271 public void serviceException(Throwable e) { 272 // are we a transport exception such as not being able to dispatch 273 // synchronously to a transport 274 if (e instanceof IOException) { 275 serviceTransportException((IOException) e); 276 } else if (e.getClass() == BrokerStoppedException.class) { 277 // Handle the case where the broker is stopped 278 // But the client is still connected. 279 if (!stopping.get()) { 280 SERVICELOG.debug("Broker has been stopped. Notifying client and closing his connection."); 281 ConnectionError ce = new ConnectionError(); 282 ce.setException(e); 283 dispatchSync(ce); 284 // Record the error that caused the transport to stop 285 transportException.set(e); 286 // Wait a little bit to try to get the output buffer to flush 287 // the exception notification to the client. 288 try { 289 Thread.sleep(500); 290 } catch (InterruptedException ie) { 291 Thread.currentThread().interrupt(); 292 } 293 // Worst case is we just kill the connection before the 294 // notification gets to him. 295 stopAsync(); 296 } 297 } else if (!stopping.get() && !inServiceException) { 298 inServiceException = true; 299 try { 300 if (SERVICELOG.isDebugEnabled()) { 301 SERVICELOG.debug("Async error occurred: " + e, e); 302 } else { 303 SERVICELOG.warn("Async error occurred: " + e); 304 } 305 ConnectionError ce = new ConnectionError(); 306 ce.setException(e); 307 if (pendingStop) { 308 dispatchSync(ce); 309 } else { 310 dispatchAsync(ce); 311 } 312 } finally { 313 inServiceException = false; 314 } 315 } 316 } 317 318 @Override 319 public Response service(Command command) { 320 MDC.put("activemq.connector", connector.getUri().toString()); 321 Response response = null; 322 boolean responseRequired = command.isResponseRequired(); 323 int commandId = command.getCommandId(); 324 try { 325 if (!pendingStop) { 326 response = command.visit(this); 327 } else { 328 response = new ExceptionResponse(transportException.get()); 329 } 330 } catch (Throwable e) { 331 if (SERVICELOG.isDebugEnabled() && e.getClass() != BrokerStoppedException.class) { 332 SERVICELOG.debug("Error occured while processing " + (responseRequired ? "sync" : "async") 333 + " command: " + command + ", exception: " + e, e); 334 } 335 336 if (e instanceof SuppressReplyException || (e.getCause() instanceof SuppressReplyException)) { 337 LOG.info("Suppressing reply to: " + command + " on: " + e + ", cause: " + e.getCause()); 338 responseRequired = false; 339 } 340 341 if (responseRequired) { 342 if (e instanceof SecurityException || e.getCause() instanceof SecurityException) { 343 SERVICELOG.warn("Security Error occurred on connection to: {}, {}", 344 transport.getRemoteAddress(), e.getMessage()); 345 } 346 response = new ExceptionResponse(e); 347 } else { 348 forceRollbackOnlyOnFailedAsyncTransactionOp(e, command); 349 serviceException(e); 350 } 351 } 352 if (responseRequired) { 353 if (response == null) { 354 response = new Response(); 355 } 356 response.setCorrelationId(commandId); 357 } 358 // The context may have been flagged so that the response is not 359 // sent. 360 if (context != null) { 361 if (context.isDontSendReponse()) { 362 context.setDontSendReponse(false); 363 response = null; 364 } 365 context = null; 366 } 367 MDC.remove("activemq.connector"); 368 return response; 369 } 370 371 private void forceRollbackOnlyOnFailedAsyncTransactionOp(Throwable e, Command command) { 372 if (brokerService.isRollbackOnlyOnAsyncException() && !(e instanceof IOException) && isInTransaction(command)) { 373 Transaction transaction = getActiveTransaction(command); 374 if (transaction != null && !transaction.isRollbackOnly()) { 375 LOG.debug("on async exception, force rollback of transaction for: " + command, e); 376 transaction.setRollbackOnly(e); 377 } 378 } 379 } 380 381 private Transaction getActiveTransaction(Command command) { 382 Transaction transaction = null; 383 try { 384 if (command instanceof Message) { 385 Message messageSend = (Message) command; 386 ProducerId producerId = messageSend.getProducerId(); 387 ProducerBrokerExchange producerExchange = getProducerBrokerExchange(producerId); 388 transaction = producerExchange.getConnectionContext().getTransactions().get(messageSend.getTransactionId()); 389 } else if (command instanceof MessageAck) { 390 MessageAck messageAck = (MessageAck) command; 391 ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(messageAck.getConsumerId()); 392 if (consumerExchange != null) { 393 transaction = consumerExchange.getConnectionContext().getTransactions().get(messageAck.getTransactionId()); 394 } 395 } 396 } catch(Exception ignored){ 397 LOG.trace("failed to find active transaction for command: " + command, ignored); 398 } 399 return transaction; 400 } 401 402 private boolean isInTransaction(Command command) { 403 return command instanceof Message && ((Message)command).isInTransaction() 404 || command instanceof MessageAck && ((MessageAck)command).isInTransaction(); 405 } 406 407 @Override 408 public Response processKeepAlive(KeepAliveInfo info) throws Exception { 409 return null; 410 } 411 412 @Override 413 public Response processRemoveSubscription(RemoveSubscriptionInfo info) throws Exception { 414 broker.removeSubscription(lookupConnectionState(info.getConnectionId()).getContext(), info); 415 return null; 416 } 417 418 @Override 419 public Response processWireFormat(WireFormatInfo info) throws Exception { 420 wireFormatInfo = info; 421 protocolVersion.set(info.getVersion()); 422 return null; 423 } 424 425 @Override 426 public Response processShutdown(ShutdownInfo info) throws Exception { 427 stopAsync(); 428 return null; 429 } 430 431 @Override 432 public Response processFlush(FlushCommand command) throws Exception { 433 return null; 434 } 435 436 @Override 437 public Response processBeginTransaction(TransactionInfo info) throws Exception { 438 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 439 context = null; 440 if (cs != null) { 441 context = cs.getContext(); 442 } 443 if (cs == null) { 444 throw new NullPointerException("Context is null"); 445 } 446 // Avoid replaying dup commands 447 if (cs.getTransactionState(info.getTransactionId()) == null) { 448 cs.addTransactionState(info.getTransactionId()); 449 broker.beginTransaction(context, info.getTransactionId()); 450 } 451 return null; 452 } 453 454 @Override 455 public int getActiveTransactionCount() { 456 int rc = 0; 457 for (TransportConnectionState cs : connectionStateRegister.listConnectionStates()) { 458 Collection<TransactionState> transactions = cs.getTransactionStates(); 459 for (TransactionState transaction : transactions) { 460 rc++; 461 } 462 } 463 return rc; 464 } 465 466 @Override 467 public Long getOldestActiveTransactionDuration() { 468 TransactionState oldestTX = null; 469 for (TransportConnectionState cs : connectionStateRegister.listConnectionStates()) { 470 Collection<TransactionState> transactions = cs.getTransactionStates(); 471 for (TransactionState transaction : transactions) { 472 if( oldestTX ==null || oldestTX.getCreatedAt() < transaction.getCreatedAt() ) { 473 oldestTX = transaction; 474 } 475 } 476 } 477 if( oldestTX == null ) { 478 return null; 479 } 480 return System.currentTimeMillis() - oldestTX.getCreatedAt(); 481 } 482 483 @Override 484 public Response processEndTransaction(TransactionInfo info) throws Exception { 485 // No need to do anything. This packet is just sent by the client 486 // make sure he is synced with the server as commit command could 487 // come from a different connection. 488 return null; 489 } 490 491 @Override 492 public Response processPrepareTransaction(TransactionInfo info) throws Exception { 493 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 494 context = null; 495 if (cs != null) { 496 context = cs.getContext(); 497 } 498 if (cs == null) { 499 throw new NullPointerException("Context is null"); 500 } 501 TransactionState transactionState = cs.getTransactionState(info.getTransactionId()); 502 if (transactionState == null) { 503 throw new IllegalStateException("Cannot prepare a transaction that had not been started or previously returned XA_RDONLY: " 504 + info.getTransactionId()); 505 } 506 // Avoid dups. 507 if (!transactionState.isPrepared()) { 508 transactionState.setPrepared(true); 509 int result = broker.prepareTransaction(context, info.getTransactionId()); 510 transactionState.setPreparedResult(result); 511 if (result == XAResource.XA_RDONLY) { 512 // we are done, no further rollback or commit from TM 513 cs.removeTransactionState(info.getTransactionId()); 514 } 515 IntegerResponse response = new IntegerResponse(result); 516 return response; 517 } else { 518 IntegerResponse response = new IntegerResponse(transactionState.getPreparedResult()); 519 return response; 520 } 521 } 522 523 @Override 524 public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception { 525 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 526 context = cs.getContext(); 527 cs.removeTransactionState(info.getTransactionId()); 528 broker.commitTransaction(context, info.getTransactionId(), true); 529 return null; 530 } 531 532 @Override 533 public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception { 534 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 535 context = cs.getContext(); 536 cs.removeTransactionState(info.getTransactionId()); 537 broker.commitTransaction(context, info.getTransactionId(), false); 538 return null; 539 } 540 541 @Override 542 public Response processRollbackTransaction(TransactionInfo info) throws Exception { 543 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 544 context = cs.getContext(); 545 cs.removeTransactionState(info.getTransactionId()); 546 broker.rollbackTransaction(context, info.getTransactionId()); 547 return null; 548 } 549 550 @Override 551 public Response processForgetTransaction(TransactionInfo info) throws Exception { 552 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 553 context = cs.getContext(); 554 broker.forgetTransaction(context, info.getTransactionId()); 555 return null; 556 } 557 558 @Override 559 public Response processRecoverTransactions(TransactionInfo info) throws Exception { 560 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 561 context = cs.getContext(); 562 TransactionId[] preparedTransactions = broker.getPreparedTransactions(context); 563 return new DataArrayResponse(preparedTransactions); 564 } 565 566 @Override 567 public Response processMessage(Message messageSend) throws Exception { 568 ProducerId producerId = messageSend.getProducerId(); 569 ProducerBrokerExchange producerExchange = getProducerBrokerExchange(producerId); 570 if (producerExchange.canDispatch(messageSend)) { 571 broker.send(producerExchange, messageSend); 572 } 573 return null; 574 } 575 576 @Override 577 public Response processMessageAck(MessageAck ack) throws Exception { 578 ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(ack.getConsumerId()); 579 if (consumerExchange != null) { 580 broker.acknowledge(consumerExchange, ack); 581 } else if (ack.isInTransaction()) { 582 LOG.warn("no matching consumer, ignoring ack {}", consumerExchange, ack); 583 } 584 return null; 585 } 586 587 @Override 588 public Response processMessagePull(MessagePull pull) throws Exception { 589 return broker.messagePull(lookupConnectionState(pull.getConsumerId()).getContext(), pull); 590 } 591 592 @Override 593 public Response processMessageDispatchNotification(MessageDispatchNotification notification) throws Exception { 594 broker.processDispatchNotification(notification); 595 return null; 596 } 597 598 @Override 599 public Response processAddDestination(DestinationInfo info) throws Exception { 600 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 601 broker.addDestinationInfo(cs.getContext(), info); 602 if (info.getDestination().isTemporary()) { 603 cs.addTempDestination(info); 604 } 605 return null; 606 } 607 608 @Override 609 public Response processRemoveDestination(DestinationInfo info) throws Exception { 610 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 611 broker.removeDestinationInfo(cs.getContext(), info); 612 if (info.getDestination().isTemporary()) { 613 cs.removeTempDestination(info.getDestination()); 614 } 615 return null; 616 } 617 618 @Override 619 public Response processAddProducer(ProducerInfo info) throws Exception { 620 SessionId sessionId = info.getProducerId().getParentId(); 621 ConnectionId connectionId = sessionId.getParentId(); 622 TransportConnectionState cs = lookupConnectionState(connectionId); 623 if (cs == null) { 624 throw new IllegalStateException("Cannot add a producer to a connection that had not been registered: " 625 + connectionId); 626 } 627 SessionState ss = cs.getSessionState(sessionId); 628 if (ss == null) { 629 throw new IllegalStateException("Cannot add a producer to a session that had not been registered: " 630 + sessionId); 631 } 632 // Avoid replaying dup commands 633 if (!ss.getProducerIds().contains(info.getProducerId())) { 634 ActiveMQDestination destination = info.getDestination(); 635 // Do not check for null here as it would cause the count of max producers to exclude 636 // anonymous producers. The isAdvisoryTopic method checks for null so it is safe to 637 // call it from here with a null Destination value. 638 if (!AdvisorySupport.isAdvisoryTopic(destination)) { 639 if (getProducerCount(connectionId) >= connector.getMaximumProducersAllowedPerConnection()){ 640 throw new IllegalStateException("Can't add producer on connection " + connectionId + ": at maximum limit: " + connector.getMaximumProducersAllowedPerConnection()); 641 } 642 } 643 broker.addProducer(cs.getContext(), info); 644 try { 645 ss.addProducer(info); 646 } catch (IllegalStateException e) { 647 broker.removeProducer(cs.getContext(), info); 648 } 649 650 } 651 return null; 652 } 653 654 @Override 655 public Response processRemoveProducer(ProducerId id) throws Exception { 656 SessionId sessionId = id.getParentId(); 657 ConnectionId connectionId = sessionId.getParentId(); 658 TransportConnectionState cs = lookupConnectionState(connectionId); 659 SessionState ss = cs.getSessionState(sessionId); 660 if (ss == null) { 661 throw new IllegalStateException("Cannot remove a producer from a session that had not been registered: " 662 + sessionId); 663 } 664 ProducerState ps = ss.removeProducer(id); 665 if (ps == null) { 666 throw new IllegalStateException("Cannot remove a producer that had not been registered: " + id); 667 } 668 removeProducerBrokerExchange(id); 669 broker.removeProducer(cs.getContext(), ps.getInfo()); 670 return null; 671 } 672 673 @Override 674 public Response processAddConsumer(ConsumerInfo info) throws Exception { 675 SessionId sessionId = info.getConsumerId().getParentId(); 676 ConnectionId connectionId = sessionId.getParentId(); 677 TransportConnectionState cs = lookupConnectionState(connectionId); 678 if (cs == null) { 679 throw new IllegalStateException("Cannot add a consumer to a connection that had not been registered: " 680 + connectionId); 681 } 682 SessionState ss = cs.getSessionState(sessionId); 683 if (ss == null) { 684 throw new IllegalStateException(broker.getBrokerName() 685 + " Cannot add a consumer to a session that had not been registered: " + sessionId); 686 } 687 // Avoid replaying dup commands 688 if (!ss.getConsumerIds().contains(info.getConsumerId())) { 689 ActiveMQDestination destination = info.getDestination(); 690 if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination)) { 691 if (getConsumerCount(connectionId) >= connector.getMaximumConsumersAllowedPerConnection()){ 692 throw new IllegalStateException("Can't add consumer on connection " + connectionId + ": at maximum limit: " + connector.getMaximumConsumersAllowedPerConnection()); 693 } 694 } 695 696 broker.addConsumer(cs.getContext(), info); 697 try { 698 ss.addConsumer(info); 699 addConsumerBrokerExchange(cs, info.getConsumerId()); 700 } catch (IllegalStateException e) { 701 broker.removeConsumer(cs.getContext(), info); 702 } 703 704 } 705 return null; 706 } 707 708 @Override 709 public Response processRemoveConsumer(ConsumerId id, long lastDeliveredSequenceId) throws Exception { 710 SessionId sessionId = id.getParentId(); 711 ConnectionId connectionId = sessionId.getParentId(); 712 TransportConnectionState cs = lookupConnectionState(connectionId); 713 if (cs == null) { 714 throw new IllegalStateException("Cannot remove a consumer from a connection that had not been registered: " 715 + connectionId); 716 } 717 SessionState ss = cs.getSessionState(sessionId); 718 if (ss == null) { 719 throw new IllegalStateException("Cannot remove a consumer from a session that had not been registered: " 720 + sessionId); 721 } 722 ConsumerState consumerState = ss.removeConsumer(id); 723 if (consumerState == null) { 724 throw new IllegalStateException("Cannot remove a consumer that had not been registered: " + id); 725 } 726 ConsumerInfo info = consumerState.getInfo(); 727 info.setLastDeliveredSequenceId(lastDeliveredSequenceId); 728 broker.removeConsumer(cs.getContext(), consumerState.getInfo()); 729 removeConsumerBrokerExchange(id); 730 return null; 731 } 732 733 @Override 734 public Response processAddSession(SessionInfo info) throws Exception { 735 ConnectionId connectionId = info.getSessionId().getParentId(); 736 TransportConnectionState cs = lookupConnectionState(connectionId); 737 // Avoid replaying dup commands 738 if (cs != null && !cs.getSessionIds().contains(info.getSessionId())) { 739 broker.addSession(cs.getContext(), info); 740 try { 741 cs.addSession(info); 742 } catch (IllegalStateException e) { 743 e.printStackTrace(); 744 broker.removeSession(cs.getContext(), info); 745 } 746 } 747 return null; 748 } 749 750 @Override 751 public Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) throws Exception { 752 ConnectionId connectionId = id.getParentId(); 753 TransportConnectionState cs = lookupConnectionState(connectionId); 754 if (cs == null) { 755 throw new IllegalStateException("Cannot remove session from connection that had not been registered: " + connectionId); 756 } 757 SessionState session = cs.getSessionState(id); 758 if (session == null) { 759 throw new IllegalStateException("Cannot remove session that had not been registered: " + id); 760 } 761 // Don't let new consumers or producers get added while we are closing 762 // this down. 763 session.shutdown(); 764 // Cascade the connection stop to the consumers and producers. 765 for (ConsumerId consumerId : session.getConsumerIds()) { 766 try { 767 processRemoveConsumer(consumerId, lastDeliveredSequenceId); 768 } catch (Throwable e) { 769 LOG.warn("Failed to remove consumer: {}", consumerId, e); 770 } 771 } 772 for (ProducerId producerId : session.getProducerIds()) { 773 try { 774 processRemoveProducer(producerId); 775 } catch (Throwable e) { 776 LOG.warn("Failed to remove producer: {}", producerId, e); 777 } 778 } 779 cs.removeSession(id); 780 broker.removeSession(cs.getContext(), session.getInfo()); 781 return null; 782 } 783 784 @Override 785 public Response processAddConnection(ConnectionInfo info) throws Exception { 786 // Older clients should have been defaulting this field to true.. but 787 // they were not. 788 if (wireFormatInfo != null && wireFormatInfo.getVersion() <= 2) { 789 info.setClientMaster(true); 790 } 791 TransportConnectionState state; 792 // Make sure 2 concurrent connections by the same ID only generate 1 793 // TransportConnectionState object. 794 synchronized (brokerConnectionStates) { 795 state = (TransportConnectionState) brokerConnectionStates.get(info.getConnectionId()); 796 if (state == null) { 797 state = new TransportConnectionState(info, this); 798 brokerConnectionStates.put(info.getConnectionId(), state); 799 } 800 state.incrementReference(); 801 } 802 // If there are 2 concurrent connections for the same connection id, 803 // then last one in wins, we need to sync here 804 // to figure out the winner. 805 synchronized (state.getConnectionMutex()) { 806 if (state.getConnection() != this) { 807 LOG.debug("Killing previous stale connection: {}", state.getConnection().getRemoteAddress()); 808 state.getConnection().stop(); 809 LOG.debug("Connection {} taking over previous connection: {}", getRemoteAddress(), state.getConnection().getRemoteAddress()); 810 state.setConnection(this); 811 state.reset(info); 812 } 813 } 814 registerConnectionState(info.getConnectionId(), state); 815 LOG.debug("Setting up new connection id: {}, address: {}, info: {}", new Object[]{ info.getConnectionId(), getRemoteAddress(), info }); 816 this.faultTolerantConnection = info.isFaultTolerant(); 817 // Setup the context. 818 String clientId = info.getClientId(); 819 context = new ConnectionContext(); 820 context.setBroker(broker); 821 context.setClientId(clientId); 822 context.setClientMaster(info.isClientMaster()); 823 context.setConnection(this); 824 context.setConnectionId(info.getConnectionId()); 825 context.setConnector(connector); 826 context.setMessageAuthorizationPolicy(getMessageAuthorizationPolicy()); 827 context.setNetworkConnection(networkConnection); 828 context.setFaultTolerant(faultTolerantConnection); 829 context.setTransactions(new ConcurrentHashMap<TransactionId, Transaction>()); 830 context.setUserName(info.getUserName()); 831 context.setWireFormatInfo(wireFormatInfo); 832 context.setReconnect(info.isFailoverReconnect()); 833 this.manageable = info.isManageable(); 834 context.setConnectionState(state); 835 state.setContext(context); 836 state.setConnection(this); 837 if (info.getClientIp() == null) { 838 info.setClientIp(getRemoteAddress()); 839 } 840 841 try { 842 broker.addConnection(context, info); 843 } catch (Exception e) { 844 synchronized (brokerConnectionStates) { 845 brokerConnectionStates.remove(info.getConnectionId()); 846 } 847 unregisterConnectionState(info.getConnectionId()); 848 LOG.warn("Failed to add Connection {} due to {}", info.getConnectionId(), e); 849 if (e instanceof SecurityException) { 850 // close this down - in case the peer of this transport doesn't play nice 851 delayedStop(2000, "Failed with SecurityException: " + e.getLocalizedMessage(), e); 852 } 853 throw e; 854 } 855 if (info.isManageable()) { 856 // send ConnectionCommand 857 ConnectionControl command = this.connector.getConnectionControl(); 858 command.setFaultTolerant(broker.isFaultTolerantConfiguration()); 859 if (info.isFailoverReconnect()) { 860 command.setRebalanceConnection(false); 861 } 862 dispatchAsync(command); 863 } 864 return null; 865 } 866 867 @Override 868 public synchronized Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId) 869 throws InterruptedException { 870 LOG.debug("remove connection id: {}", id); 871 TransportConnectionState cs = lookupConnectionState(id); 872 if (cs != null) { 873 // Don't allow things to be added to the connection state while we 874 // are shutting down. 875 cs.shutdown(); 876 // Cascade the connection stop to the sessions. 877 for (SessionId sessionId : cs.getSessionIds()) { 878 try { 879 processRemoveSession(sessionId, lastDeliveredSequenceId); 880 } catch (Throwable e) { 881 SERVICELOG.warn("Failed to remove session {}", sessionId, e); 882 } 883 } 884 // Cascade the connection stop to temp destinations. 885 for (Iterator<DestinationInfo> iter = cs.getTempDestinations().iterator(); iter.hasNext(); ) { 886 DestinationInfo di = iter.next(); 887 try { 888 broker.removeDestination(cs.getContext(), di.getDestination(), 0); 889 } catch (Throwable e) { 890 SERVICELOG.warn("Failed to remove tmp destination {}", di.getDestination(), e); 891 } 892 iter.remove(); 893 } 894 try { 895 broker.removeConnection(cs.getContext(), cs.getInfo(), transportException.get()); 896 } catch (Throwable e) { 897 SERVICELOG.warn("Failed to remove connection {}", cs.getInfo(), e); 898 } 899 TransportConnectionState state = unregisterConnectionState(id); 900 if (state != null) { 901 synchronized (brokerConnectionStates) { 902 // If we are the last reference, we should remove the state 903 // from the broker. 904 if (state.decrementReference() == 0) { 905 brokerConnectionStates.remove(id); 906 } 907 } 908 } 909 } 910 return null; 911 } 912 913 @Override 914 public Response processProducerAck(ProducerAck ack) throws Exception { 915 // A broker should not get ProducerAck messages. 916 return null; 917 } 918 919 @Override 920 public Connector getConnector() { 921 return connector; 922 } 923 924 @Override 925 public void dispatchSync(Command message) { 926 try { 927 processDispatch(message); 928 } catch (IOException e) { 929 serviceExceptionAsync(e); 930 } 931 } 932 933 @Override 934 public void dispatchAsync(Command message) { 935 if (!stopping.get()) { 936 if (taskRunner == null) { 937 dispatchSync(message); 938 } else { 939 synchronized (dispatchQueue) { 940 dispatchQueue.add(message); 941 } 942 try { 943 taskRunner.wakeup(); 944 } catch (InterruptedException e) { 945 Thread.currentThread().interrupt(); 946 } 947 } 948 } else { 949 if (message.isMessageDispatch()) { 950 MessageDispatch md = (MessageDispatch) message; 951 TransmitCallback sub = md.getTransmitCallback(); 952 broker.postProcessDispatch(md); 953 if (sub != null) { 954 sub.onFailure(); 955 } 956 } 957 } 958 } 959 960 protected void processDispatch(Command command) throws IOException { 961 MessageDispatch messageDispatch = (MessageDispatch) (command.isMessageDispatch() ? command : null); 962 try { 963 if (!stopping.get()) { 964 if (messageDispatch != null) { 965 try { 966 broker.preProcessDispatch(messageDispatch); 967 } catch (RuntimeException convertToIO) { 968 throw new IOException(convertToIO); 969 } 970 } 971 dispatch(command); 972 } 973 } catch (IOException e) { 974 if (messageDispatch != null) { 975 TransmitCallback sub = messageDispatch.getTransmitCallback(); 976 broker.postProcessDispatch(messageDispatch); 977 if (sub != null) { 978 sub.onFailure(); 979 } 980 messageDispatch = null; 981 throw e; 982 } 983 } finally { 984 if (messageDispatch != null) { 985 TransmitCallback sub = messageDispatch.getTransmitCallback(); 986 broker.postProcessDispatch(messageDispatch); 987 if (sub != null) { 988 sub.onSuccess(); 989 } 990 } 991 } 992 } 993 994 @Override 995 public boolean iterate() { 996 try { 997 if (pendingStop || stopping.get()) { 998 if (dispatchStopped.compareAndSet(false, true)) { 999 if (transportException.get() == null) { 1000 try { 1001 dispatch(new ShutdownInfo()); 1002 } catch (Throwable ignore) { 1003 } 1004 } 1005 dispatchStoppedLatch.countDown(); 1006 } 1007 return false; 1008 } 1009 if (!dispatchStopped.get()) { 1010 Command command = null; 1011 synchronized (dispatchQueue) { 1012 if (dispatchQueue.isEmpty()) { 1013 return false; 1014 } 1015 command = dispatchQueue.remove(0); 1016 } 1017 processDispatch(command); 1018 return true; 1019 } 1020 return false; 1021 } catch (IOException e) { 1022 if (dispatchStopped.compareAndSet(false, true)) { 1023 dispatchStoppedLatch.countDown(); 1024 } 1025 serviceExceptionAsync(e); 1026 return false; 1027 } 1028 } 1029 1030 /** 1031 * Returns the statistics for this connection 1032 */ 1033 @Override 1034 public ConnectionStatistics getStatistics() { 1035 return statistics; 1036 } 1037 1038 public MessageAuthorizationPolicy getMessageAuthorizationPolicy() { 1039 return messageAuthorizationPolicy; 1040 } 1041 1042 public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) { 1043 this.messageAuthorizationPolicy = messageAuthorizationPolicy; 1044 } 1045 1046 @Override 1047 public boolean isManageable() { 1048 return manageable; 1049 } 1050 1051 @Override 1052 public void start() throws Exception { 1053 try { 1054 synchronized (this) { 1055 starting = true; 1056 if (taskRunnerFactory != null) { 1057 taskRunner = taskRunnerFactory.createTaskRunner(this, "ActiveMQ Connection Dispatcher: " 1058 + getRemoteAddress()); 1059 } else { 1060 taskRunner = null; 1061 } 1062 transport.start(); 1063 active = true; 1064 BrokerInfo info = connector.getBrokerInfo().copy(); 1065 if (connector.isUpdateClusterClients()) { 1066 info.setPeerBrokerInfos(this.broker.getPeerBrokerInfos()); 1067 } else { 1068 info.setPeerBrokerInfos(null); 1069 } 1070 dispatchAsync(info); 1071 1072 connector.onStarted(this); 1073 } 1074 } catch (Exception e) { 1075 // Force clean up on an error starting up. 1076 pendingStop = true; 1077 throw e; 1078 } finally { 1079 // stop() can be called from within the above block, 1080 // but we want to be sure start() completes before 1081 // stop() runs, so queue the stop until right now: 1082 setStarting(false); 1083 if (isPendingStop()) { 1084 LOG.debug("Calling the delayed stop() after start() {}", this); 1085 stop(); 1086 } 1087 } 1088 } 1089 1090 @Override 1091 public void stop() throws Exception { 1092 // do not stop task the task runner factories (taskRunnerFactory, stopTaskRunnerFactory) 1093 // as their lifecycle is handled elsewhere 1094 1095 stopAsync(); 1096 while (!stopped.await(5, TimeUnit.SECONDS)) { 1097 LOG.info("The connection to '{}' is taking a long time to shutdown.", transport.getRemoteAddress()); 1098 } 1099 } 1100 1101 public void delayedStop(final int waitTime, final String reason, Throwable cause) { 1102 if (waitTime > 0) { 1103 synchronized (this) { 1104 pendingStop = true; 1105 transportException.set(cause); 1106 } 1107 try { 1108 stopTaskRunnerFactory.execute(new Runnable() { 1109 @Override 1110 public void run() { 1111 try { 1112 Thread.sleep(waitTime); 1113 stopAsync(); 1114 LOG.info("Stopping {} because {}", transport.getRemoteAddress(), reason); 1115 } catch (InterruptedException e) { 1116 } 1117 } 1118 }); 1119 } catch (Throwable t) { 1120 LOG.warn("Cannot create stopAsync. This exception will be ignored.", t); 1121 } 1122 } 1123 } 1124 1125 public void stopAsync(Throwable cause) { 1126 transportException.set(cause); 1127 stopAsync(); 1128 } 1129 1130 public void stopAsync() { 1131 // If we're in the middle of starting then go no further... for now. 1132 synchronized (this) { 1133 pendingStop = true; 1134 if (starting) { 1135 LOG.debug("stopAsync() called in the middle of start(). Delaying till start completes.."); 1136 return; 1137 } 1138 } 1139 if (stopping.compareAndSet(false, true)) { 1140 // Let all the connection contexts know we are shutting down 1141 // so that in progress operations can notice and unblock. 1142 List<TransportConnectionState> connectionStates = listConnectionStates(); 1143 for (TransportConnectionState cs : connectionStates) { 1144 ConnectionContext connectionContext = cs.getContext(); 1145 if (connectionContext != null) { 1146 connectionContext.getStopping().set(true); 1147 } 1148 } 1149 try { 1150 stopTaskRunnerFactory.execute(new Runnable() { 1151 @Override 1152 public void run() { 1153 serviceLock.writeLock().lock(); 1154 try { 1155 doStop(); 1156 } catch (Throwable e) { 1157 LOG.debug("Error occurred while shutting down a connection {}", this, e); 1158 } finally { 1159 stopped.countDown(); 1160 serviceLock.writeLock().unlock(); 1161 } 1162 } 1163 }); 1164 } catch (Throwable t) { 1165 LOG.warn("Cannot create async transport stopper thread. This exception is ignored. Not waiting for stop to complete", t); 1166 stopped.countDown(); 1167 } 1168 } 1169 } 1170 1171 @Override 1172 public String toString() { 1173 return "Transport Connection to: " + transport.getRemoteAddress(); 1174 } 1175 1176 protected void doStop() throws Exception { 1177 LOG.debug("Stopping connection: {}", transport.getRemoteAddress()); 1178 connector.onStopped(this); 1179 try { 1180 synchronized (this) { 1181 if (duplexBridge != null) { 1182 duplexBridge.stop(); 1183 } 1184 } 1185 } catch (Exception ignore) { 1186 LOG.trace("Exception caught stopping. This exception is ignored.", ignore); 1187 } 1188 try { 1189 transport.stop(); 1190 LOG.debug("Stopped transport: {}", transport.getRemoteAddress()); 1191 } catch (Exception e) { 1192 LOG.debug("Could not stop transport to {}. This exception is ignored.", transport.getRemoteAddress(), e); 1193 } 1194 if (taskRunner != null) { 1195 taskRunner.shutdown(1); 1196 taskRunner = null; 1197 } 1198 active = false; 1199 // Run the MessageDispatch callbacks so that message references get 1200 // cleaned up. 1201 synchronized (dispatchQueue) { 1202 for (Iterator<Command> iter = dispatchQueue.iterator(); iter.hasNext(); ) { 1203 Command command = iter.next(); 1204 if (command.isMessageDispatch()) { 1205 MessageDispatch md = (MessageDispatch) command; 1206 TransmitCallback sub = md.getTransmitCallback(); 1207 broker.postProcessDispatch(md); 1208 if (sub != null) { 1209 sub.onFailure(); 1210 } 1211 } 1212 } 1213 dispatchQueue.clear(); 1214 } 1215 // 1216 // Remove all logical connection associated with this connection 1217 // from the broker. 1218 if (!broker.isStopped()) { 1219 List<TransportConnectionState> connectionStates = listConnectionStates(); 1220 connectionStates = listConnectionStates(); 1221 for (TransportConnectionState cs : connectionStates) { 1222 cs.getContext().getStopping().set(true); 1223 try { 1224 LOG.debug("Cleaning up connection resources: {}", getRemoteAddress()); 1225 processRemoveConnection(cs.getInfo().getConnectionId(), RemoveInfo.LAST_DELIVERED_UNKNOWN); 1226 } catch (Throwable ignore) { 1227 ignore.printStackTrace(); 1228 } 1229 } 1230 } 1231 LOG.debug("Connection Stopped: {}", getRemoteAddress()); 1232 } 1233 1234 /** 1235 * @return Returns the blockedCandidate. 1236 */ 1237 public boolean isBlockedCandidate() { 1238 return blockedCandidate; 1239 } 1240 1241 /** 1242 * @param blockedCandidate The blockedCandidate to set. 1243 */ 1244 public void setBlockedCandidate(boolean blockedCandidate) { 1245 this.blockedCandidate = blockedCandidate; 1246 } 1247 1248 /** 1249 * @return Returns the markedCandidate. 1250 */ 1251 public boolean isMarkedCandidate() { 1252 return markedCandidate; 1253 } 1254 1255 /** 1256 * @param markedCandidate The markedCandidate to set. 1257 */ 1258 public void setMarkedCandidate(boolean markedCandidate) { 1259 this.markedCandidate = markedCandidate; 1260 if (!markedCandidate) { 1261 timeStamp = 0; 1262 blockedCandidate = false; 1263 } 1264 } 1265 1266 /** 1267 * @param slow The slow to set. 1268 */ 1269 public void setSlow(boolean slow) { 1270 this.slow = slow; 1271 } 1272 1273 /** 1274 * @return true if the Connection is slow 1275 */ 1276 @Override 1277 public boolean isSlow() { 1278 return slow; 1279 } 1280 1281 /** 1282 * @return true if the Connection is potentially blocked 1283 */ 1284 public boolean isMarkedBlockedCandidate() { 1285 return markedCandidate; 1286 } 1287 1288 /** 1289 * Mark the Connection, so we can deem if it's collectable on the next sweep 1290 */ 1291 public void doMark() { 1292 if (timeStamp == 0) { 1293 timeStamp = System.currentTimeMillis(); 1294 } 1295 } 1296 1297 /** 1298 * @return if after being marked, the Connection is still writing 1299 */ 1300 @Override 1301 public boolean isBlocked() { 1302 return blocked; 1303 } 1304 1305 /** 1306 * @return true if the Connection is connected 1307 */ 1308 @Override 1309 public boolean isConnected() { 1310 return connected; 1311 } 1312 1313 /** 1314 * @param blocked The blocked to set. 1315 */ 1316 public void setBlocked(boolean blocked) { 1317 this.blocked = blocked; 1318 } 1319 1320 /** 1321 * @param connected The connected to set. 1322 */ 1323 public void setConnected(boolean connected) { 1324 this.connected = connected; 1325 } 1326 1327 /** 1328 * @return true if the Connection is active 1329 */ 1330 @Override 1331 public boolean isActive() { 1332 return active; 1333 } 1334 1335 /** 1336 * @param active The active to set. 1337 */ 1338 public void setActive(boolean active) { 1339 this.active = active; 1340 } 1341 1342 /** 1343 * @return true if the Connection is starting 1344 */ 1345 public synchronized boolean isStarting() { 1346 return starting; 1347 } 1348 1349 @Override 1350 public synchronized boolean isNetworkConnection() { 1351 return networkConnection; 1352 } 1353 1354 @Override 1355 public boolean isFaultTolerantConnection() { 1356 return this.faultTolerantConnection; 1357 } 1358 1359 protected synchronized void setStarting(boolean starting) { 1360 this.starting = starting; 1361 } 1362 1363 /** 1364 * @return true if the Connection needs to stop 1365 */ 1366 public synchronized boolean isPendingStop() { 1367 return pendingStop; 1368 } 1369 1370 protected synchronized void setPendingStop(boolean pendingStop) { 1371 this.pendingStop = pendingStop; 1372 } 1373 1374 @Override 1375 public Response processBrokerInfo(BrokerInfo info) { 1376 if (info.isSlaveBroker()) { 1377 LOG.error(" Slave Brokers are no longer supported - slave trying to attach is: {}", info.getBrokerName()); 1378 } else if (info.isNetworkConnection() && info.isDuplexConnection()) { 1379 // so this TransportConnection is the rear end of a network bridge 1380 // We have been requested to create a two way pipe ... 1381 try { 1382 Properties properties = MarshallingSupport.stringToProperties(info.getNetworkProperties()); 1383 Map<String, String> props = createMap(properties); 1384 NetworkBridgeConfiguration config = new NetworkBridgeConfiguration(); 1385 IntrospectionSupport.setProperties(config, props, ""); 1386 config.setBrokerName(broker.getBrokerName()); 1387 1388 // check for existing duplex connection hanging about 1389 1390 // We first look if existing network connection already exists for the same broker Id and network connector name 1391 // It's possible in case of brief network fault to have this transport connector side of the connection always active 1392 // and the duplex network connector side wanting to open a new one 1393 // In this case, the old connection must be broken 1394 String duplexNetworkConnectorId = config.getName() + "@" + info.getBrokerId(); 1395 CopyOnWriteArrayList<TransportConnection> connections = this.connector.getConnections(); 1396 synchronized (connections) { 1397 for (Iterator<TransportConnection> iter = connections.iterator(); iter.hasNext(); ) { 1398 TransportConnection c = iter.next(); 1399 if ((c != this) && (duplexNetworkConnectorId.equals(c.getDuplexNetworkConnectorId()))) { 1400 LOG.warn("Stopping an existing active duplex connection [{}] for network connector ({}).", c, duplexNetworkConnectorId); 1401 c.stopAsync(); 1402 // better to wait for a bit rather than get connection id already in use and failure to start new bridge 1403 c.getStopped().await(1, TimeUnit.SECONDS); 1404 } 1405 } 1406 setDuplexNetworkConnectorId(duplexNetworkConnectorId); 1407 } 1408 Transport localTransport = NetworkBridgeFactory.createLocalTransport(broker); 1409 Transport remoteBridgeTransport = transport; 1410 if (! (remoteBridgeTransport instanceof ResponseCorrelator)) { 1411 // the vm transport case is already wrapped 1412 remoteBridgeTransport = new ResponseCorrelator(remoteBridgeTransport); 1413 } 1414 String duplexName = localTransport.toString(); 1415 if (duplexName.contains("#")) { 1416 duplexName = duplexName.substring(duplexName.lastIndexOf("#")); 1417 } 1418 MBeanNetworkListener listener = new MBeanNetworkListener(brokerService, config, brokerService.createDuplexNetworkConnectorObjectName(duplexName)); 1419 listener.setCreatedByDuplex(true); 1420 duplexBridge = NetworkBridgeFactory.createBridge(config, localTransport, remoteBridgeTransport, listener); 1421 duplexBridge.setBrokerService(brokerService); 1422 // now turn duplex off this side 1423 info.setDuplexConnection(false); 1424 duplexBridge.setCreatedByDuplex(true); 1425 duplexBridge.duplexStart(this, brokerInfo, info); 1426 LOG.info("Started responder end of duplex bridge {}", duplexNetworkConnectorId); 1427 return null; 1428 } catch (TransportDisposedIOException e) { 1429 LOG.warn("Duplex bridge {} was stopped before it was correctly started.", duplexNetworkConnectorId); 1430 return null; 1431 } catch (Exception e) { 1432 LOG.error("Failed to create responder end of duplex network bridge {}", duplexNetworkConnectorId, e); 1433 return null; 1434 } 1435 } 1436 // We only expect to get one broker info command per connection 1437 if (this.brokerInfo != null) { 1438 LOG.warn("Unexpected extra broker info command received: {}", info); 1439 } 1440 this.brokerInfo = info; 1441 networkConnection = true; 1442 List<TransportConnectionState> connectionStates = listConnectionStates(); 1443 for (TransportConnectionState cs : connectionStates) { 1444 cs.getContext().setNetworkConnection(true); 1445 } 1446 return null; 1447 } 1448 1449 @SuppressWarnings({"unchecked", "rawtypes"}) 1450 private HashMap<String, String> createMap(Properties properties) { 1451 return new HashMap(properties); 1452 } 1453 1454 protected void dispatch(Command command) throws IOException { 1455 try { 1456 setMarkedCandidate(true); 1457 transport.oneway(command); 1458 } finally { 1459 setMarkedCandidate(false); 1460 } 1461 } 1462 1463 @Override 1464 public String getRemoteAddress() { 1465 return transport.getRemoteAddress(); 1466 } 1467 1468 public Transport getTransport() { 1469 return transport; 1470 } 1471 1472 @Override 1473 public String getConnectionId() { 1474 List<TransportConnectionState> connectionStates = listConnectionStates(); 1475 for (TransportConnectionState cs : connectionStates) { 1476 if (cs.getInfo().getClientId() != null) { 1477 return cs.getInfo().getClientId(); 1478 } 1479 return cs.getInfo().getConnectionId().toString(); 1480 } 1481 return null; 1482 } 1483 1484 @Override 1485 public void updateClient(ConnectionControl control) { 1486 if (isActive() && isBlocked() == false && isFaultTolerantConnection() && this.wireFormatInfo != null 1487 && this.wireFormatInfo.getVersion() >= 6) { 1488 dispatchAsync(control); 1489 } 1490 } 1491 1492 public ProducerBrokerExchange getProducerBrokerExchangeIfExists(ProducerInfo producerInfo){ 1493 ProducerBrokerExchange result = null; 1494 if (producerInfo != null && producerInfo.getProducerId() != null){ 1495 synchronized (producerExchanges){ 1496 result = producerExchanges.get(producerInfo.getProducerId()); 1497 } 1498 } 1499 return result; 1500 } 1501 1502 private ProducerBrokerExchange getProducerBrokerExchange(ProducerId id) throws IOException { 1503 ProducerBrokerExchange result = producerExchanges.get(id); 1504 if (result == null) { 1505 synchronized (producerExchanges) { 1506 result = new ProducerBrokerExchange(); 1507 TransportConnectionState state = lookupConnectionState(id); 1508 context = state.getContext(); 1509 result.setConnectionContext(context); 1510 if (context.isReconnect() || (context.isNetworkConnection() && connector.isAuditNetworkProducers())) { 1511 result.setLastStoredSequenceId(brokerService.getPersistenceAdapter().getLastProducerSequenceId(id)); 1512 } 1513 SessionState ss = state.getSessionState(id.getParentId()); 1514 if (ss != null) { 1515 result.setProducerState(ss.getProducerState(id)); 1516 ProducerState producerState = ss.getProducerState(id); 1517 if (producerState != null && producerState.getInfo() != null) { 1518 ProducerInfo info = producerState.getInfo(); 1519 result.setMutable(info.getDestination() == null || info.getDestination().isComposite()); 1520 } 1521 } 1522 producerExchanges.put(id, result); 1523 } 1524 } else { 1525 context = result.getConnectionContext(); 1526 } 1527 return result; 1528 } 1529 1530 private void removeProducerBrokerExchange(ProducerId id) { 1531 synchronized (producerExchanges) { 1532 producerExchanges.remove(id); 1533 } 1534 } 1535 1536 private ConsumerBrokerExchange getConsumerBrokerExchange(ConsumerId id) { 1537 ConsumerBrokerExchange result = consumerExchanges.get(id); 1538 return result; 1539 } 1540 1541 private ConsumerBrokerExchange addConsumerBrokerExchange(TransportConnectionState connectionState, ConsumerId id) { 1542 ConsumerBrokerExchange result = consumerExchanges.get(id); 1543 if (result == null) { 1544 synchronized (consumerExchanges) { 1545 result = new ConsumerBrokerExchange(); 1546 context = connectionState.getContext(); 1547 result.setConnectionContext(context); 1548 SessionState ss = connectionState.getSessionState(id.getParentId()); 1549 if (ss != null) { 1550 ConsumerState cs = ss.getConsumerState(id); 1551 if (cs != null) { 1552 ConsumerInfo info = cs.getInfo(); 1553 if (info != null) { 1554 if (info.getDestination() != null && info.getDestination().isPattern()) { 1555 result.setWildcard(true); 1556 } 1557 } 1558 } 1559 } 1560 consumerExchanges.put(id, result); 1561 } 1562 } 1563 return result; 1564 } 1565 1566 private void removeConsumerBrokerExchange(ConsumerId id) { 1567 synchronized (consumerExchanges) { 1568 consumerExchanges.remove(id); 1569 } 1570 } 1571 1572 public int getProtocolVersion() { 1573 return protocolVersion.get(); 1574 } 1575 1576 @Override 1577 public Response processControlCommand(ControlCommand command) throws Exception { 1578 return null; 1579 } 1580 1581 @Override 1582 public Response processMessageDispatch(MessageDispatch dispatch) throws Exception { 1583 return null; 1584 } 1585 1586 @Override 1587 public Response processConnectionControl(ConnectionControl control) throws Exception { 1588 if (control != null) { 1589 faultTolerantConnection = control.isFaultTolerant(); 1590 } 1591 return null; 1592 } 1593 1594 @Override 1595 public Response processConnectionError(ConnectionError error) throws Exception { 1596 return null; 1597 } 1598 1599 @Override 1600 public Response processConsumerControl(ConsumerControl control) throws Exception { 1601 ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(control.getConsumerId()); 1602 broker.processConsumerControl(consumerExchange, control); 1603 return null; 1604 } 1605 1606 protected synchronized TransportConnectionState registerConnectionState(ConnectionId connectionId, 1607 TransportConnectionState state) { 1608 TransportConnectionState cs = null; 1609 if (!connectionStateRegister.isEmpty() && !connectionStateRegister.doesHandleMultipleConnectionStates()) { 1610 // swap implementations 1611 TransportConnectionStateRegister newRegister = new MapTransportConnectionStateRegister(); 1612 newRegister.intialize(connectionStateRegister); 1613 connectionStateRegister = newRegister; 1614 } 1615 cs = connectionStateRegister.registerConnectionState(connectionId, state); 1616 return cs; 1617 } 1618 1619 protected synchronized TransportConnectionState unregisterConnectionState(ConnectionId connectionId) { 1620 return connectionStateRegister.unregisterConnectionState(connectionId); 1621 } 1622 1623 protected synchronized List<TransportConnectionState> listConnectionStates() { 1624 return connectionStateRegister.listConnectionStates(); 1625 } 1626 1627 protected synchronized TransportConnectionState lookupConnectionState(String connectionId) { 1628 return connectionStateRegister.lookupConnectionState(connectionId); 1629 } 1630 1631 protected synchronized TransportConnectionState lookupConnectionState(ConsumerId id) { 1632 return connectionStateRegister.lookupConnectionState(id); 1633 } 1634 1635 protected synchronized TransportConnectionState lookupConnectionState(ProducerId id) { 1636 return connectionStateRegister.lookupConnectionState(id); 1637 } 1638 1639 protected synchronized TransportConnectionState lookupConnectionState(SessionId id) { 1640 return connectionStateRegister.lookupConnectionState(id); 1641 } 1642 1643 // public only for testing 1644 public synchronized TransportConnectionState lookupConnectionState(ConnectionId connectionId) { 1645 return connectionStateRegister.lookupConnectionState(connectionId); 1646 } 1647 1648 protected synchronized void setDuplexNetworkConnectorId(String duplexNetworkConnectorId) { 1649 this.duplexNetworkConnectorId = duplexNetworkConnectorId; 1650 } 1651 1652 protected synchronized String getDuplexNetworkConnectorId() { 1653 return this.duplexNetworkConnectorId; 1654 } 1655 1656 public boolean isStopping() { 1657 return stopping.get(); 1658 } 1659 1660 protected CountDownLatch getStopped() { 1661 return stopped; 1662 } 1663 1664 private int getProducerCount(ConnectionId connectionId) { 1665 int result = 0; 1666 TransportConnectionState cs = lookupConnectionState(connectionId); 1667 if (cs != null) { 1668 for (SessionId sessionId : cs.getSessionIds()) { 1669 SessionState sessionState = cs.getSessionState(sessionId); 1670 if (sessionState != null) { 1671 result += sessionState.getProducerIds().size(); 1672 } 1673 } 1674 } 1675 return result; 1676 } 1677 1678 private int getConsumerCount(ConnectionId connectionId) { 1679 int result = 0; 1680 TransportConnectionState cs = lookupConnectionState(connectionId); 1681 if (cs != null) { 1682 for (SessionId sessionId : cs.getSessionIds()) { 1683 SessionState sessionState = cs.getSessionState(sessionId); 1684 if (sessionState != null) { 1685 result += sessionState.getConsumerIds().size(); 1686 } 1687 } 1688 } 1689 return result; 1690 } 1691 1692 public WireFormatInfo getRemoteWireFormatInfo() { 1693 return wireFormatInfo; 1694 } 1695}