001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.amqp.protocol; 018 019import static org.apache.activemq.transport.amqp.AmqpSupport.ANONYMOUS_RELAY; 020import static org.apache.activemq.transport.amqp.AmqpSupport.CONNECTION_OPEN_FAILED; 021import static org.apache.activemq.transport.amqp.AmqpSupport.CONTAINER_ID; 022import static org.apache.activemq.transport.amqp.AmqpSupport.INVALID_FIELD; 023import static org.apache.activemq.transport.amqp.AmqpSupport.QUEUE_PREFIX; 024import static org.apache.activemq.transport.amqp.AmqpSupport.TEMP_QUEUE_CAPABILITY; 025import static org.apache.activemq.transport.amqp.AmqpSupport.TEMP_TOPIC_CAPABILITY; 026import static org.apache.activemq.transport.amqp.AmqpSupport.TOPIC_PREFIX; 027import static org.apache.activemq.transport.amqp.AmqpSupport.contains; 028 029import java.io.IOException; 030import java.nio.ByteBuffer; 031import java.util.Date; 032import java.util.HashMap; 033import java.util.Map; 034import java.util.concurrent.ConcurrentHashMap; 035import java.util.concurrent.ConcurrentMap; 036import java.util.concurrent.atomic.AtomicInteger; 037 038import javax.jms.InvalidClientIDException; 039 040import org.apache.activemq.broker.BrokerService; 041import org.apache.activemq.broker.region.DurableTopicSubscription; 042import org.apache.activemq.broker.region.RegionBroker; 043import org.apache.activemq.broker.region.TopicRegion; 044import org.apache.activemq.command.ActiveMQDestination; 045import org.apache.activemq.command.ActiveMQTempDestination; 046import org.apache.activemq.command.ActiveMQTempQueue; 047import org.apache.activemq.command.ActiveMQTempTopic; 048import org.apache.activemq.command.Command; 049import org.apache.activemq.command.ConnectionError; 050import org.apache.activemq.command.ConnectionId; 051import org.apache.activemq.command.ConnectionInfo; 052import org.apache.activemq.command.ConsumerControl; 053import org.apache.activemq.command.ConsumerId; 054import org.apache.activemq.command.ConsumerInfo; 055import org.apache.activemq.command.DestinationInfo; 056import org.apache.activemq.command.ExceptionResponse; 057import org.apache.activemq.command.MessageDispatch; 058import org.apache.activemq.command.RemoveInfo; 059import org.apache.activemq.command.Response; 060import org.apache.activemq.command.SessionId; 061import org.apache.activemq.command.ShutdownInfo; 062import org.apache.activemq.transport.InactivityIOException; 063import org.apache.activemq.transport.amqp.AmqpHeader; 064import org.apache.activemq.transport.amqp.AmqpInactivityMonitor; 065import org.apache.activemq.transport.amqp.AmqpProtocolConverter; 066import org.apache.activemq.transport.amqp.AmqpProtocolException; 067import org.apache.activemq.transport.amqp.AmqpTransport; 068import org.apache.activemq.transport.amqp.AmqpTransportFilter; 069import org.apache.activemq.transport.amqp.AmqpWireFormat; 070import org.apache.activemq.transport.amqp.ResponseHandler; 071import org.apache.activemq.transport.amqp.sasl.AmqpAuthenticator; 072import org.apache.activemq.util.IOExceptionSupport; 073import org.apache.activemq.util.IdGenerator; 074import org.apache.qpid.proton.Proton; 075import org.apache.qpid.proton.amqp.Symbol; 076import org.apache.qpid.proton.amqp.transaction.Coordinator; 077import org.apache.qpid.proton.amqp.transport.AmqpError; 078import org.apache.qpid.proton.amqp.transport.ErrorCondition; 079import org.apache.qpid.proton.engine.Collector; 080import org.apache.qpid.proton.engine.Connection; 081import org.apache.qpid.proton.engine.Delivery; 082import org.apache.qpid.proton.engine.EndpointState; 083import org.apache.qpid.proton.engine.Event; 084import org.apache.qpid.proton.engine.Link; 085import org.apache.qpid.proton.engine.Receiver; 086import org.apache.qpid.proton.engine.Sender; 087import org.apache.qpid.proton.engine.Session; 088import org.apache.qpid.proton.engine.Transport; 089import org.apache.qpid.proton.engine.impl.CollectorImpl; 090import org.apache.qpid.proton.engine.impl.ProtocolTracer; 091import org.apache.qpid.proton.engine.impl.TransportImpl; 092import org.apache.qpid.proton.framing.TransportFrame; 093import org.fusesource.hawtbuf.Buffer; 094import org.slf4j.Logger; 095import org.slf4j.LoggerFactory; 096 097/** 098 * Implements the mechanics of managing a single remote peer connection. 099 */ 100public class AmqpConnection implements AmqpProtocolConverter { 101 102 private static final Logger TRACE_FRAMES = AmqpTransportFilter.TRACE_FRAMES; 103 private static final Logger LOG = LoggerFactory.getLogger(AmqpConnection.class); 104 private static final int CHANNEL_MAX = 32767; 105 106 private final Transport protonTransport = Proton.transport(); 107 private final Connection protonConnection = Proton.connection(); 108 private final Collector eventCollector = new CollectorImpl(); 109 110 private final AmqpTransport amqpTransport; 111 private final AmqpWireFormat amqpWireFormat; 112 private final BrokerService brokerService; 113 114 private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator(); 115 private final AtomicInteger lastCommandId = new AtomicInteger(); 116 private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId()); 117 private final ConnectionInfo connectionInfo = new ConnectionInfo(); 118 private long nextSessionId; 119 private long nextTempDestinationId; 120 private boolean closing; 121 private boolean closedSocket; 122 private AmqpAuthenticator authenticator; 123 124 private final ConcurrentMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>(); 125 private final ConcurrentMap<ConsumerId, AmqpSender> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, AmqpSender>(); 126 127 public AmqpConnection(AmqpTransport transport, BrokerService brokerService) { 128 this.amqpTransport = transport; 129 130 AmqpInactivityMonitor monitor = transport.getInactivityMonitor(); 131 if (monitor != null) { 132 monitor.setAmqpTransport(amqpTransport); 133 } 134 135 this.amqpWireFormat = transport.getWireFormat(); 136 this.brokerService = brokerService; 137 138 // the configured maxFrameSize on the URI. 139 int maxFrameSize = amqpWireFormat.getMaxAmqpFrameSize(); 140 if (maxFrameSize > AmqpWireFormat.NO_AMQP_MAX_FRAME_SIZE) { 141 this.protonTransport.setMaxFrameSize(maxFrameSize); 142 } 143 144 this.protonTransport.bind(this.protonConnection); 145 this.protonTransport.setChannelMax(CHANNEL_MAX); 146 147 this.protonConnection.collect(eventCollector); 148 149 updateTracer(); 150 } 151 152 /** 153 * Load and return a <code>[]Symbol</code> that contains the connection capabilities 154 * offered to new connections 155 * 156 * @return the capabilities that are offered to new clients on connect. 157 */ 158 protected Symbol[] getConnectionCapabilitiesOffered() { 159 return new Symbol[]{ ANONYMOUS_RELAY }; 160 } 161 162 /** 163 * Load and return a <code>Map<Symbol, Object></code> that contains the properties 164 * that this connection supplies to incoming connections. 165 * 166 * @return the properties that are offered to the incoming connection. 167 */ 168 protected Map<Symbol, Object> getConnetionProperties() { 169 Map<Symbol, Object> properties = new HashMap<Symbol, Object>(); 170 171 properties.put(QUEUE_PREFIX, "queue://"); 172 properties.put(TOPIC_PREFIX, "topic://"); 173 174 return properties; 175 } 176 177 /** 178 * Load and return a <code>Map<Symbol, Object></code> that contains the properties 179 * that this connection supplies to incoming connections when the open has failed 180 * and the remote should expect a close to follow. 181 * 182 * @return the properties that are offered to the incoming connection. 183 */ 184 protected Map<Symbol, Object> getFailedConnetionProperties() { 185 Map<Symbol, Object> properties = new HashMap<Symbol, Object>(); 186 187 properties.put(CONNECTION_OPEN_FAILED, true); 188 189 return properties; 190 } 191 192 @Override 193 public void updateTracer() { 194 if (amqpTransport.isTrace()) { 195 ((TransportImpl) protonTransport).setProtocolTracer(new ProtocolTracer() { 196 @Override 197 public void receivedFrame(TransportFrame transportFrame) { 198 TRACE_FRAMES.trace("{} | RECV: {}", AmqpConnection.this.amqpTransport.getRemoteAddress(), transportFrame.getBody()); 199 } 200 201 @Override 202 public void sentFrame(TransportFrame transportFrame) { 203 TRACE_FRAMES.trace("{} | SENT: {}", AmqpConnection.this.amqpTransport.getRemoteAddress(), transportFrame.getBody()); 204 } 205 }); 206 } 207 } 208 209 @Override 210 public long keepAlive() throws IOException { 211 long rescheduleAt = 0l; 212 213 LOG.trace("Performing connection:{} keep-alive processing", amqpTransport.getRemoteAddress()); 214 215 if (protonConnection.getLocalState() != EndpointState.CLOSED) { 216 rescheduleAt = protonTransport.tick(System.currentTimeMillis()) - System.currentTimeMillis(); 217 pumpProtonToSocket(); 218 if (protonTransport.isClosed()) { 219 rescheduleAt = 0; 220 LOG.debug("Transport closed after inactivity check."); 221 throw new InactivityIOException("Channel was inactive for to long"); 222 } 223 } 224 225 LOG.trace("Connection:{} keep alive processing done, next update in {} milliseconds.", 226 amqpTransport.getRemoteAddress(), rescheduleAt); 227 228 return rescheduleAt; 229 } 230 231 //----- Connection Properties Accessors ----------------------------------// 232 233 /** 234 * @return the amount of credit assigned to AMQP receiver links created from 235 * sender links on the remote peer. 236 */ 237 public int getConfiguredReceiverCredit() { 238 return amqpWireFormat.getProducerCredit(); 239 } 240 241 /** 242 * @return the transformer type that was configured for this AMQP transport. 243 */ 244 public String getConfiguredTransformer() { 245 return amqpWireFormat.getTransformer(); 246 } 247 248 /** 249 * @return the ActiveMQ ConnectionId that identifies this AMQP Connection. 250 */ 251 public ConnectionId getConnectionId() { 252 return connectionId; 253 } 254 255 /** 256 * @return the Client ID used to create the connection with ActiveMQ 257 */ 258 public String getClientId() { 259 return connectionInfo.getClientId(); 260 } 261 262 /** 263 * @return the configured max frame size allowed for incoming messages. 264 */ 265 public long getMaxFrameSize() { 266 return amqpWireFormat.getMaxFrameSize(); 267 } 268 269 //----- Proton Event handling and IO support -----------------------------// 270 271 void pumpProtonToSocket() { 272 try { 273 boolean done = false; 274 while (!done) { 275 ByteBuffer toWrite = protonTransport.getOutputBuffer(); 276 if (toWrite != null && toWrite.hasRemaining()) { 277 LOG.trace("Sending {} bytes out", toWrite.limit()); 278 amqpTransport.sendToAmqp(toWrite); 279 protonTransport.outputConsumed(); 280 } else { 281 done = true; 282 } 283 } 284 } catch (IOException e) { 285 amqpTransport.onException(e); 286 } 287 } 288 289 @Override 290 public void onAMQPData(Object command) throws Exception { 291 Buffer frame; 292 if (command.getClass() == AmqpHeader.class) { 293 AmqpHeader header = (AmqpHeader) command; 294 295 if (amqpWireFormat.isHeaderValid(header)) { 296 LOG.trace("Connection from an AMQP v1.0 client initiated. {}", header); 297 } else { 298 LOG.warn("Connection attempt from non AMQP v1.0 client. {}", header); 299 AmqpHeader reply = amqpWireFormat.getMinimallySupportedHeader(); 300 amqpTransport.sendToAmqp(reply.getBuffer()); 301 handleException(new AmqpProtocolException( 302 "Connection from client using unsupported AMQP attempted", true)); 303 } 304 305 switch (header.getProtocolId()) { 306 case 0: 307 authenticator = null; 308 break; // nothing to do.. 309 case 3: // Client will be using SASL for auth.. 310 authenticator = new AmqpAuthenticator(amqpTransport, protonTransport.sasl(), brokerService); 311 break; 312 default: 313 } 314 frame = header.getBuffer(); 315 } else { 316 frame = (Buffer) command; 317 } 318 319 if (protonTransport.isClosed()) { 320 LOG.debug("Ignoring incoming AMQP data, transport is closed."); 321 return; 322 } 323 324 while (frame.length > 0) { 325 try { 326 int count = protonTransport.input(frame.data, frame.offset, frame.length); 327 frame.moveHead(count); 328 } catch (Throwable e) { 329 handleException(new AmqpProtocolException("Could not decode AMQP frame: " + frame, true, e)); 330 return; 331 } 332 333 if (authenticator != null) { 334 processSaslExchange(); 335 } else { 336 processProtonEvents(); 337 } 338 } 339 } 340 341 private void processSaslExchange() throws Exception { 342 authenticator.processSaslExchange(connectionInfo); 343 if (authenticator.isDone()) { 344 amqpTransport.getWireFormat().resetMagicRead(); 345 } 346 pumpProtonToSocket(); 347 } 348 349 private void processProtonEvents() throws Exception { 350 try { 351 Event event = null; 352 while ((event = eventCollector.peek()) != null) { 353 if (amqpTransport.isTrace()) { 354 LOG.trace("Processing event: {}", event.getType()); 355 } 356 switch (event.getType()) { 357 case CONNECTION_REMOTE_OPEN: 358 processConnectionOpen(event.getConnection()); 359 break; 360 case CONNECTION_REMOTE_CLOSE: 361 processConnectionClose(event.getConnection()); 362 break; 363 case SESSION_REMOTE_OPEN: 364 processSessionOpen(event.getSession()); 365 break; 366 case SESSION_REMOTE_CLOSE: 367 processSessionClose(event.getSession()); 368 break; 369 case LINK_REMOTE_OPEN: 370 processLinkOpen(event.getLink()); 371 break; 372 case LINK_REMOTE_DETACH: 373 processLinkDetach(event.getLink()); 374 break; 375 case LINK_REMOTE_CLOSE: 376 processLinkClose(event.getLink()); 377 break; 378 case LINK_FLOW: 379 processLinkFlow(event.getLink()); 380 break; 381 case DELIVERY: 382 processDelivery(event.getDelivery()); 383 break; 384 default: 385 break; 386 } 387 388 eventCollector.pop(); 389 } 390 391 } catch (Throwable e) { 392 handleException(new AmqpProtocolException("Could not process AMQP commands", true, e)); 393 } 394 395 pumpProtonToSocket(); 396 } 397 398 protected void processConnectionOpen(Connection connection) throws Exception { 399 400 stopConnectionTimeoutChecker(); 401 402 connectionInfo.setResponseRequired(true); 403 connectionInfo.setConnectionId(connectionId); 404 405 String clientId = protonConnection.getRemoteContainer(); 406 if (clientId != null && !clientId.isEmpty()) { 407 connectionInfo.setClientId(clientId); 408 } 409 410 connectionInfo.setTransportContext(amqpTransport.getPeerCertificates()); 411 412 if (connection.getTransport().getRemoteIdleTimeout() > 0 && !amqpTransport.isUseInactivityMonitor()) { 413 // We cannot meet the requested Idle processing because the inactivity monitor is 414 // disabled so we won't send idle frames to match the request. 415 protonConnection.setProperties(getFailedConnetionProperties()); 416 protonConnection.open(); 417 protonConnection.setCondition(new ErrorCondition(AmqpError.PRECONDITION_FAILED, "Cannot send idle frames")); 418 protonConnection.close(); 419 pumpProtonToSocket(); 420 421 amqpTransport.onException(new IOException( 422 "Connection failed, remote requested idle processing but inactivity monitoring is disbaled.")); 423 return; 424 } 425 426 sendToActiveMQ(connectionInfo, new ResponseHandler() { 427 @Override 428 public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException { 429 Throwable exception = null; 430 try { 431 if (response.isException()) { 432 protonConnection.setProperties(getFailedConnetionProperties()); 433 protonConnection.open(); 434 435 exception = ((ExceptionResponse) response).getException(); 436 if (exception instanceof SecurityException) { 437 protonConnection.setCondition(new ErrorCondition(AmqpError.UNAUTHORIZED_ACCESS, exception.getMessage())); 438 } else if (exception instanceof InvalidClientIDException) { 439 ErrorCondition condition = new ErrorCondition(AmqpError.INVALID_FIELD, exception.getMessage()); 440 441 Map<Symbol, Object> infoMap = new HashMap<Symbol, Object> (); 442 infoMap.put(INVALID_FIELD, CONTAINER_ID); 443 condition.setInfo(infoMap); 444 445 protonConnection.setCondition(condition); 446 } else { 447 protonConnection.setCondition(new ErrorCondition(AmqpError.ILLEGAL_STATE, exception.getMessage())); 448 } 449 450 protonConnection.close(); 451 } else { 452 453 if (amqpTransport.isUseInactivityMonitor() && amqpWireFormat.getIdleTimeout() > 0) { 454 LOG.trace("Connection requesting Idle timeout of: {} mills", amqpWireFormat.getIdleTimeout()); 455 protonTransport.setIdleTimeout(amqpWireFormat.getIdleTimeout()); 456 } 457 458 protonConnection.setOfferedCapabilities(getConnectionCapabilitiesOffered()); 459 protonConnection.setProperties(getConnetionProperties()); 460 protonConnection.open(); 461 462 configureInactivityMonitor(); 463 } 464 } finally { 465 pumpProtonToSocket(); 466 467 if (response.isException()) { 468 amqpTransport.onException(IOExceptionSupport.create(exception)); 469 } 470 } 471 } 472 }); 473 } 474 475 protected void processConnectionClose(Connection connection) throws Exception { 476 if (!closing) { 477 closing = true; 478 sendToActiveMQ(new RemoveInfo(connectionId), new ResponseHandler() { 479 @Override 480 public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException { 481 protonConnection.close(); 482 protonConnection.free(); 483 484 if (!closedSocket) { 485 pumpProtonToSocket(); 486 } 487 } 488 }); 489 490 sendToActiveMQ(new ShutdownInfo()); 491 } 492 } 493 494 protected void processSessionOpen(Session protonSession) throws Exception { 495 new AmqpSession(this, getNextSessionId(), protonSession).open(); 496 } 497 498 protected void processSessionClose(Session protonSession) throws Exception { 499 if (protonSession.getContext() != null) { 500 ((AmqpResource) protonSession.getContext()).close(); 501 } else { 502 protonSession.close(); 503 protonSession.free(); 504 } 505 } 506 507 protected void processLinkOpen(Link link) throws Exception { 508 link.setSource(link.getRemoteSource()); 509 link.setTarget(link.getRemoteTarget()); 510 511 AmqpSession session = (AmqpSession) link.getSession().getContext(); 512 if (link instanceof Receiver) { 513 if (link.getRemoteTarget() instanceof Coordinator) { 514 session.createCoordinator((Receiver) link); 515 } else { 516 session.createReceiver((Receiver) link); 517 } 518 } else { 519 session.createSender((Sender) link); 520 } 521 } 522 523 protected void processLinkDetach(Link link) throws Exception { 524 Object context = link.getContext(); 525 526 if (context instanceof AmqpLink) { 527 ((AmqpLink) context).detach(); 528 } else { 529 link.detach(); 530 link.free(); 531 } 532 } 533 534 protected void processLinkClose(Link link) throws Exception { 535 Object context = link.getContext(); 536 537 if (context instanceof AmqpLink) { 538 ((AmqpLink) context).close();; 539 } else { 540 link.close(); 541 link.free(); 542 } 543 } 544 545 protected void processLinkFlow(Link link) throws Exception { 546 Object context = link.getContext(); 547 if (context instanceof AmqpLink) { 548 ((AmqpLink) context).flow(); 549 } 550 } 551 552 protected void processDelivery(Delivery delivery) throws Exception { 553 if (!delivery.isPartial()) { 554 Object context = delivery.getLink().getContext(); 555 if (context instanceof AmqpLink) { 556 AmqpLink amqpLink = (AmqpLink) context; 557 amqpLink.delivery(delivery); 558 } 559 } 560 } 561 562 //----- Event entry points for ActiveMQ commands and errors --------------// 563 564 @Override 565 public void onAMQPException(IOException error) { 566 closedSocket = true; 567 if (!closing) { 568 try { 569 closing = true; 570 // Attempt to inform the other end that we are going to close 571 // so that the client doesn't wait around forever. 572 protonConnection.setCondition(new ErrorCondition(AmqpError.DECODE_ERROR, error.getMessage())); 573 protonConnection.close(); 574 pumpProtonToSocket(); 575 } catch (Exception ignore) { 576 } 577 amqpTransport.sendToActiveMQ(error); 578 } else { 579 try { 580 amqpTransport.stop(); 581 } catch (Exception ignore) { 582 } 583 } 584 } 585 586 @Override 587 public void onActiveMQCommand(Command command) throws Exception { 588 if (command.isResponse()) { 589 Response response = (Response) command; 590 ResponseHandler rh = resposeHandlers.remove(Integer.valueOf(response.getCorrelationId())); 591 if (rh != null) { 592 rh.onResponse(this, response); 593 } else { 594 // Pass down any unexpected errors. Should this close the connection? 595 if (response.isException()) { 596 Throwable exception = ((ExceptionResponse) response).getException(); 597 handleException(exception); 598 } 599 } 600 } else if (command.isMessageDispatch()) { 601 MessageDispatch dispatch = (MessageDispatch) command; 602 AmqpSender sender = subscriptionsByConsumerId.get(dispatch.getConsumerId()); 603 if (sender != null) { 604 // End of Queue Browse will have no Message object. 605 if (dispatch.getMessage() != null) { 606 LOG.trace("Dispatching MessageId: {} to consumer", dispatch.getMessage().getMessageId()); 607 } else { 608 LOG.trace("Dispatching End of Browse Command to consumer {}", dispatch.getConsumerId()); 609 } 610 sender.onMessageDispatch(dispatch); 611 if (dispatch.getMessage() != null) { 612 LOG.trace("Finished Dispatch of MessageId: {} to consumer", dispatch.getMessage().getMessageId()); 613 } 614 } 615 } else if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE) { 616 // Pass down any unexpected async errors. Should this close the connection? 617 Throwable exception = ((ConnectionError) command).getException(); 618 handleException(exception); 619 } else if (command.isConsumerControl()) { 620 ConsumerControl control = (ConsumerControl) command; 621 AmqpSender sender = subscriptionsByConsumerId.get(control.getConsumerId()); 622 if (sender != null) { 623 sender.onConsumerControl(control); 624 } 625 } else if (command.isBrokerInfo()) { 626 // ignore 627 } else { 628 LOG.debug("Do not know how to process ActiveMQ Command {}", command); 629 } 630 } 631 632 //----- Utility methods for connection resources to use ------------------// 633 634 void registerSender(ConsumerId consumerId, AmqpSender sender) { 635 subscriptionsByConsumerId.put(consumerId, sender); 636 } 637 638 void unregisterSender(ConsumerId consumerId) { 639 subscriptionsByConsumerId.remove(consumerId); 640 } 641 642 ConsumerInfo lookupSubscription(String subscriptionName) throws AmqpProtocolException { 643 ConsumerInfo result = null; 644 RegionBroker regionBroker; 645 646 try { 647 regionBroker = (RegionBroker) brokerService.getBroker().getAdaptor(RegionBroker.class); 648 } catch (Exception e) { 649 throw new AmqpProtocolException("Error finding subscription: " + subscriptionName + ": " + e.getMessage(), false, e); 650 } 651 652 final TopicRegion topicRegion = (TopicRegion) regionBroker.getTopicRegion(); 653 DurableTopicSubscription subscription = topicRegion.lookupSubscription(subscriptionName, connectionInfo.getClientId()); 654 if (subscription != null) { 655 result = subscription.getConsumerInfo(); 656 } 657 658 return result; 659 } 660 661 ActiveMQDestination createTemporaryDestination(final Link link, Symbol[] capabilities) { 662 ActiveMQDestination rc = null; 663 if (contains(capabilities, TEMP_TOPIC_CAPABILITY)) { 664 rc = new ActiveMQTempTopic(connectionId, nextTempDestinationId++); 665 } else if (contains(capabilities, TEMP_QUEUE_CAPABILITY)) { 666 rc = new ActiveMQTempQueue(connectionId, nextTempDestinationId++); 667 } else { 668 LOG.debug("Dynamic link request with no type capability, defaults to Temporary Queue"); 669 rc = new ActiveMQTempQueue(connectionId, nextTempDestinationId++); 670 } 671 672 DestinationInfo info = new DestinationInfo(); 673 info.setConnectionId(connectionId); 674 info.setOperationType(DestinationInfo.ADD_OPERATION_TYPE); 675 info.setDestination(rc); 676 677 sendToActiveMQ(info, new ResponseHandler() { 678 679 @Override 680 public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException { 681 if (response.isException()) { 682 link.setSource(null); 683 684 Throwable exception = ((ExceptionResponse) response).getException(); 685 if (exception instanceof SecurityException) { 686 link.setCondition(new ErrorCondition(AmqpError.UNAUTHORIZED_ACCESS, exception.getMessage())); 687 } else { 688 link.setCondition(new ErrorCondition(AmqpError.INTERNAL_ERROR, exception.getMessage())); 689 } 690 691 link.close(); 692 link.free(); 693 } 694 } 695 }); 696 697 return rc; 698 } 699 700 void deleteTemporaryDestination(ActiveMQTempDestination destination) { 701 DestinationInfo info = new DestinationInfo(); 702 info.setConnectionId(connectionId); 703 info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE); 704 info.setDestination(destination); 705 706 sendToActiveMQ(info, new ResponseHandler() { 707 708 @Override 709 public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException { 710 if (response.isException()) { 711 Throwable exception = ((ExceptionResponse) response).getException(); 712 LOG.debug("Error during temp destination removeal: {}", exception.getMessage()); 713 } 714 } 715 }); 716 } 717 718 void sendToActiveMQ(Command command) { 719 sendToActiveMQ(command, null); 720 } 721 722 void sendToActiveMQ(Command command, ResponseHandler handler) { 723 command.setCommandId(lastCommandId.incrementAndGet()); 724 if (handler != null) { 725 command.setResponseRequired(true); 726 resposeHandlers.put(Integer.valueOf(command.getCommandId()), handler); 727 } 728 amqpTransport.sendToActiveMQ(command); 729 } 730 731 void handleException(Throwable exception) { 732 LOG.debug("Exception detail", exception); 733 if (exception instanceof AmqpProtocolException) { 734 onAMQPException((IOException) exception); 735 } else { 736 try { 737 // Must ensure that the broker removes Connection resources. 738 sendToActiveMQ(new ShutdownInfo()); 739 amqpTransport.stop(); 740 } catch (Throwable e) { 741 LOG.error("Failed to stop AMQP Transport ", e); 742 } 743 } 744 } 745 746 //----- Internal implementation ------------------------------------------// 747 748 private SessionId getNextSessionId() { 749 return new SessionId(connectionId, nextSessionId++); 750 } 751 752 private void stopConnectionTimeoutChecker() { 753 AmqpInactivityMonitor monitor = amqpTransport.getInactivityMonitor(); 754 if (monitor != null) { 755 monitor.stopConnectionTimeoutChecker(); 756 } 757 } 758 759 private void configureInactivityMonitor() { 760 AmqpInactivityMonitor monitor = amqpTransport.getInactivityMonitor(); 761 if (monitor == null) { 762 return; 763 } 764 765 // If either end has idle timeout requirements then the tick method 766 // will give us a deadline on the next time we need to tick() in order 767 // to meet those obligations. 768 long nextIdleCheck = protonTransport.tick(System.currentTimeMillis()); 769 if (nextIdleCheck > 0) { 770 LOG.trace("Connection keep-alive processing starts at: {}", new Date(nextIdleCheck)); 771 monitor.startKeepAliveTask(nextIdleCheck - System.currentTimeMillis()); 772 } else { 773 LOG.trace("Connection does not require keep-alive processing"); 774 } 775 } 776}