001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.amqp.protocol;
018
019import static org.apache.activemq.transport.amqp.AmqpSupport.ANONYMOUS_RELAY;
020import static org.apache.activemq.transport.amqp.AmqpSupport.CONNECTION_OPEN_FAILED;
021import static org.apache.activemq.transport.amqp.AmqpSupport.CONTAINER_ID;
022import static org.apache.activemq.transport.amqp.AmqpSupport.INVALID_FIELD;
023import static org.apache.activemq.transport.amqp.AmqpSupport.QUEUE_PREFIX;
024import static org.apache.activemq.transport.amqp.AmqpSupport.TEMP_QUEUE_CAPABILITY;
025import static org.apache.activemq.transport.amqp.AmqpSupport.TEMP_TOPIC_CAPABILITY;
026import static org.apache.activemq.transport.amqp.AmqpSupport.TOPIC_PREFIX;
027import static org.apache.activemq.transport.amqp.AmqpSupport.contains;
028
029import java.io.IOException;
030import java.nio.ByteBuffer;
031import java.util.Date;
032import java.util.HashMap;
033import java.util.Map;
034import java.util.concurrent.ConcurrentHashMap;
035import java.util.concurrent.ConcurrentMap;
036import java.util.concurrent.atomic.AtomicInteger;
037
038import javax.jms.InvalidClientIDException;
039
040import org.apache.activemq.broker.BrokerService;
041import org.apache.activemq.broker.region.DurableTopicSubscription;
042import org.apache.activemq.broker.region.RegionBroker;
043import org.apache.activemq.broker.region.TopicRegion;
044import org.apache.activemq.command.ActiveMQDestination;
045import org.apache.activemq.command.ActiveMQTempDestination;
046import org.apache.activemq.command.ActiveMQTempQueue;
047import org.apache.activemq.command.ActiveMQTempTopic;
048import org.apache.activemq.command.Command;
049import org.apache.activemq.command.ConnectionError;
050import org.apache.activemq.command.ConnectionId;
051import org.apache.activemq.command.ConnectionInfo;
052import org.apache.activemq.command.ConsumerControl;
053import org.apache.activemq.command.ConsumerId;
054import org.apache.activemq.command.ConsumerInfo;
055import org.apache.activemq.command.DestinationInfo;
056import org.apache.activemq.command.ExceptionResponse;
057import org.apache.activemq.command.MessageDispatch;
058import org.apache.activemq.command.RemoveInfo;
059import org.apache.activemq.command.Response;
060import org.apache.activemq.command.SessionId;
061import org.apache.activemq.command.ShutdownInfo;
062import org.apache.activemq.transport.InactivityIOException;
063import org.apache.activemq.transport.amqp.AmqpHeader;
064import org.apache.activemq.transport.amqp.AmqpInactivityMonitor;
065import org.apache.activemq.transport.amqp.AmqpProtocolConverter;
066import org.apache.activemq.transport.amqp.AmqpProtocolException;
067import org.apache.activemq.transport.amqp.AmqpTransport;
068import org.apache.activemq.transport.amqp.AmqpTransportFilter;
069import org.apache.activemq.transport.amqp.AmqpWireFormat;
070import org.apache.activemq.transport.amqp.ResponseHandler;
071import org.apache.activemq.transport.amqp.sasl.AmqpAuthenticator;
072import org.apache.activemq.util.IOExceptionSupport;
073import org.apache.activemq.util.IdGenerator;
074import org.apache.qpid.proton.Proton;
075import org.apache.qpid.proton.amqp.Symbol;
076import org.apache.qpid.proton.amqp.transaction.Coordinator;
077import org.apache.qpid.proton.amqp.transport.AmqpError;
078import org.apache.qpid.proton.amqp.transport.ErrorCondition;
079import org.apache.qpid.proton.engine.Collector;
080import org.apache.qpid.proton.engine.Connection;
081import org.apache.qpid.proton.engine.Delivery;
082import org.apache.qpid.proton.engine.EndpointState;
083import org.apache.qpid.proton.engine.Event;
084import org.apache.qpid.proton.engine.Link;
085import org.apache.qpid.proton.engine.Receiver;
086import org.apache.qpid.proton.engine.Sender;
087import org.apache.qpid.proton.engine.Session;
088import org.apache.qpid.proton.engine.Transport;
089import org.apache.qpid.proton.engine.impl.CollectorImpl;
090import org.apache.qpid.proton.engine.impl.ProtocolTracer;
091import org.apache.qpid.proton.engine.impl.TransportImpl;
092import org.apache.qpid.proton.framing.TransportFrame;
093import org.fusesource.hawtbuf.Buffer;
094import org.slf4j.Logger;
095import org.slf4j.LoggerFactory;
096
097/**
098 * Implements the mechanics of managing a single remote peer connection.
099 */
100public class AmqpConnection implements AmqpProtocolConverter {
101
102    private static final Logger TRACE_FRAMES = AmqpTransportFilter.TRACE_FRAMES;
103    private static final Logger LOG = LoggerFactory.getLogger(AmqpConnection.class);
104    private static final int CHANNEL_MAX = 32767;
105
106    private final Transport protonTransport = Proton.transport();
107    private final Connection protonConnection = Proton.connection();
108    private final Collector eventCollector = new CollectorImpl();
109
110    private final AmqpTransport amqpTransport;
111    private final AmqpWireFormat amqpWireFormat;
112    private final BrokerService brokerService;
113
114    private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
115    private final AtomicInteger lastCommandId = new AtomicInteger();
116    private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId());
117    private final ConnectionInfo connectionInfo = new ConnectionInfo();
118    private long nextSessionId;
119    private long nextTempDestinationId;
120    private boolean closing;
121    private boolean closedSocket;
122    private AmqpAuthenticator authenticator;
123
124    private final ConcurrentMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>();
125    private final ConcurrentMap<ConsumerId, AmqpSender> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, AmqpSender>();
126
127    public AmqpConnection(AmqpTransport transport, BrokerService brokerService) {
128        this.amqpTransport = transport;
129
130        AmqpInactivityMonitor monitor = transport.getInactivityMonitor();
131        if (monitor != null) {
132            monitor.setAmqpTransport(amqpTransport);
133        }
134
135        this.amqpWireFormat = transport.getWireFormat();
136        this.brokerService = brokerService;
137
138        // the configured maxFrameSize on the URI.
139        int maxFrameSize = amqpWireFormat.getMaxAmqpFrameSize();
140        if (maxFrameSize > AmqpWireFormat.NO_AMQP_MAX_FRAME_SIZE) {
141            this.protonTransport.setMaxFrameSize(maxFrameSize);
142        }
143
144        this.protonTransport.bind(this.protonConnection);
145        this.protonTransport.setChannelMax(CHANNEL_MAX);
146
147        this.protonConnection.collect(eventCollector);
148
149        updateTracer();
150    }
151
152    /**
153     * Load and return a <code>[]Symbol</code> that contains the connection capabilities
154     * offered to new connections
155     *
156     * @return the capabilities that are offered to new clients on connect.
157     */
158    protected Symbol[] getConnectionCapabilitiesOffered() {
159        return new Symbol[]{ ANONYMOUS_RELAY };
160    }
161
162    /**
163     * Load and return a <code>Map<Symbol, Object></code> that contains the properties
164     * that this connection supplies to incoming connections.
165     *
166     * @return the properties that are offered to the incoming connection.
167     */
168    protected Map<Symbol, Object> getConnetionProperties() {
169        Map<Symbol, Object> properties = new HashMap<Symbol, Object>();
170
171        properties.put(QUEUE_PREFIX, "queue://");
172        properties.put(TOPIC_PREFIX, "topic://");
173
174        return properties;
175    }
176
177    /**
178     * Load and return a <code>Map<Symbol, Object></code> that contains the properties
179     * that this connection supplies to incoming connections when the open has failed
180     * and the remote should expect a close to follow.
181     *
182     * @return the properties that are offered to the incoming connection.
183     */
184    protected Map<Symbol, Object> getFailedConnetionProperties() {
185        Map<Symbol, Object> properties = new HashMap<Symbol, Object>();
186
187        properties.put(CONNECTION_OPEN_FAILED, true);
188
189        return properties;
190    }
191
192    @Override
193    public void updateTracer() {
194        if (amqpTransport.isTrace()) {
195            ((TransportImpl) protonTransport).setProtocolTracer(new ProtocolTracer() {
196                @Override
197                public void receivedFrame(TransportFrame transportFrame) {
198                    TRACE_FRAMES.trace("{} | RECV: {}", AmqpConnection.this.amqpTransport.getRemoteAddress(), transportFrame.getBody());
199                }
200
201                @Override
202                public void sentFrame(TransportFrame transportFrame) {
203                    TRACE_FRAMES.trace("{} | SENT: {}", AmqpConnection.this.amqpTransport.getRemoteAddress(), transportFrame.getBody());
204                }
205            });
206        }
207    }
208
209    @Override
210    public long keepAlive() throws IOException {
211        long rescheduleAt = 0l;
212
213        LOG.trace("Performing connection:{} keep-alive processing", amqpTransport.getRemoteAddress());
214
215        if (protonConnection.getLocalState() != EndpointState.CLOSED) {
216            rescheduleAt = protonTransport.tick(System.currentTimeMillis()) - System.currentTimeMillis();
217            pumpProtonToSocket();
218            if (protonTransport.isClosed()) {
219                rescheduleAt = 0;
220                LOG.debug("Transport closed after inactivity check.");
221                throw new InactivityIOException("Channel was inactive for to long");
222            }
223        }
224
225        LOG.trace("Connection:{} keep alive processing done, next update in {} milliseconds.",
226                  amqpTransport.getRemoteAddress(), rescheduleAt);
227
228        return rescheduleAt;
229    }
230
231    //----- Connection Properties Accessors ----------------------------------//
232
233    /**
234     * @return the amount of credit assigned to AMQP receiver links created from
235     *         sender links on the remote peer.
236     */
237    public int getConfiguredReceiverCredit() {
238        return amqpWireFormat.getProducerCredit();
239    }
240
241    /**
242     * @return the transformer type that was configured for this AMQP transport.
243     */
244    public String getConfiguredTransformer() {
245        return amqpWireFormat.getTransformer();
246    }
247
248    /**
249     * @return the ActiveMQ ConnectionId that identifies this AMQP Connection.
250     */
251    public ConnectionId getConnectionId() {
252        return connectionId;
253    }
254
255    /**
256     * @return the Client ID used to create the connection with ActiveMQ
257     */
258    public String getClientId() {
259        return connectionInfo.getClientId();
260    }
261
262    /**
263     * @return the configured max frame size allowed for incoming messages.
264     */
265    public long getMaxFrameSize() {
266        return amqpWireFormat.getMaxFrameSize();
267    }
268
269    //----- Proton Event handling and IO support -----------------------------//
270
271    void pumpProtonToSocket() {
272        try {
273            boolean done = false;
274            while (!done) {
275                ByteBuffer toWrite = protonTransport.getOutputBuffer();
276                if (toWrite != null && toWrite.hasRemaining()) {
277                    LOG.trace("Sending {} bytes out", toWrite.limit());
278                    amqpTransport.sendToAmqp(toWrite);
279                    protonTransport.outputConsumed();
280                } else {
281                    done = true;
282                }
283            }
284        } catch (IOException e) {
285            amqpTransport.onException(e);
286        }
287    }
288
289    @Override
290    public void onAMQPData(Object command) throws Exception {
291        Buffer frame;
292        if (command.getClass() == AmqpHeader.class) {
293            AmqpHeader header = (AmqpHeader) command;
294
295            if (amqpWireFormat.isHeaderValid(header)) {
296                LOG.trace("Connection from an AMQP v1.0 client initiated. {}", header);
297            } else {
298                LOG.warn("Connection attempt from non AMQP v1.0 client. {}", header);
299                AmqpHeader reply = amqpWireFormat.getMinimallySupportedHeader();
300                amqpTransport.sendToAmqp(reply.getBuffer());
301                handleException(new AmqpProtocolException(
302                    "Connection from client using unsupported AMQP attempted", true));
303            }
304
305            switch (header.getProtocolId()) {
306                case 0:
307                    authenticator = null;
308                    break; // nothing to do..
309                case 3: // Client will be using SASL for auth..
310                    authenticator = new AmqpAuthenticator(amqpTransport, protonTransport.sasl(), brokerService);
311                    break;
312                default:
313            }
314            frame = header.getBuffer();
315        } else {
316            frame = (Buffer) command;
317        }
318
319        if (protonTransport.isClosed()) {
320            LOG.debug("Ignoring incoming AMQP data, transport is closed.");
321            return;
322        }
323
324        while (frame.length > 0) {
325            try {
326                int count = protonTransport.input(frame.data, frame.offset, frame.length);
327                frame.moveHead(count);
328            } catch (Throwable e) {
329                handleException(new AmqpProtocolException("Could not decode AMQP frame: " + frame, true, e));
330                return;
331            }
332
333            if (authenticator != null) {
334                processSaslExchange();
335            } else {
336                processProtonEvents();
337            }
338        }
339    }
340
341    private void processSaslExchange() throws Exception {
342        authenticator.processSaslExchange(connectionInfo);
343        if (authenticator.isDone()) {
344            amqpTransport.getWireFormat().resetMagicRead();
345        }
346        pumpProtonToSocket();
347    }
348
349    private void processProtonEvents() throws Exception {
350        try {
351            Event event = null;
352            while ((event = eventCollector.peek()) != null) {
353                if (amqpTransport.isTrace()) {
354                    LOG.trace("Processing event: {}", event.getType());
355                }
356                switch (event.getType()) {
357                    case CONNECTION_REMOTE_OPEN:
358                        processConnectionOpen(event.getConnection());
359                        break;
360                    case CONNECTION_REMOTE_CLOSE:
361                        processConnectionClose(event.getConnection());
362                        break;
363                    case SESSION_REMOTE_OPEN:
364                        processSessionOpen(event.getSession());
365                        break;
366                    case SESSION_REMOTE_CLOSE:
367                        processSessionClose(event.getSession());
368                        break;
369                    case LINK_REMOTE_OPEN:
370                        processLinkOpen(event.getLink());
371                        break;
372                    case LINK_REMOTE_DETACH:
373                        processLinkDetach(event.getLink());
374                        break;
375                    case LINK_REMOTE_CLOSE:
376                        processLinkClose(event.getLink());
377                        break;
378                    case LINK_FLOW:
379                        processLinkFlow(event.getLink());
380                        break;
381                    case DELIVERY:
382                        processDelivery(event.getDelivery());
383                        break;
384                    default:
385                        break;
386                }
387
388                eventCollector.pop();
389            }
390
391        } catch (Throwable e) {
392            handleException(new AmqpProtocolException("Could not process AMQP commands", true, e));
393        }
394
395        pumpProtonToSocket();
396    }
397
398    protected void processConnectionOpen(Connection connection) throws Exception {
399
400        stopConnectionTimeoutChecker();
401
402        connectionInfo.setResponseRequired(true);
403        connectionInfo.setConnectionId(connectionId);
404
405        String clientId = protonConnection.getRemoteContainer();
406        if (clientId != null && !clientId.isEmpty()) {
407            connectionInfo.setClientId(clientId);
408        }
409
410        connectionInfo.setTransportContext(amqpTransport.getPeerCertificates());
411
412        if (connection.getTransport().getRemoteIdleTimeout() > 0 && !amqpTransport.isUseInactivityMonitor()) {
413            // We cannot meet the requested Idle processing because the inactivity monitor is
414            // disabled so we won't send idle frames to match the request.
415            protonConnection.setProperties(getFailedConnetionProperties());
416            protonConnection.open();
417            protonConnection.setCondition(new ErrorCondition(AmqpError.PRECONDITION_FAILED, "Cannot send idle frames"));
418            protonConnection.close();
419            pumpProtonToSocket();
420
421            amqpTransport.onException(new IOException(
422                "Connection failed, remote requested idle processing but inactivity monitoring is disbaled."));
423            return;
424        }
425
426        sendToActiveMQ(connectionInfo, new ResponseHandler() {
427            @Override
428            public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
429                Throwable exception = null;
430                try {
431                    if (response.isException()) {
432                        protonConnection.setProperties(getFailedConnetionProperties());
433                        protonConnection.open();
434
435                        exception = ((ExceptionResponse) response).getException();
436                        if (exception instanceof SecurityException) {
437                            protonConnection.setCondition(new ErrorCondition(AmqpError.UNAUTHORIZED_ACCESS, exception.getMessage()));
438                        } else if (exception instanceof InvalidClientIDException) {
439                            ErrorCondition condition = new ErrorCondition(AmqpError.INVALID_FIELD, exception.getMessage());
440
441                            Map<Symbol, Object> infoMap = new HashMap<Symbol, Object> ();
442                            infoMap.put(INVALID_FIELD, CONTAINER_ID);
443                            condition.setInfo(infoMap);
444
445                            protonConnection.setCondition(condition);
446                        } else {
447                            protonConnection.setCondition(new ErrorCondition(AmqpError.ILLEGAL_STATE, exception.getMessage()));
448                        }
449
450                        protonConnection.close();
451                    } else {
452
453                        if (amqpTransport.isUseInactivityMonitor() && amqpWireFormat.getIdleTimeout() > 0) {
454                            LOG.trace("Connection requesting Idle timeout of: {} mills", amqpWireFormat.getIdleTimeout());
455                            protonTransport.setIdleTimeout(amqpWireFormat.getIdleTimeout());
456                        }
457
458                        protonConnection.setOfferedCapabilities(getConnectionCapabilitiesOffered());
459                        protonConnection.setProperties(getConnetionProperties());
460                        protonConnection.open();
461
462                        configureInactivityMonitor();
463                    }
464                } finally {
465                    pumpProtonToSocket();
466
467                    if (response.isException()) {
468                        amqpTransport.onException(IOExceptionSupport.create(exception));
469                    }
470                }
471            }
472        });
473    }
474
475    protected void processConnectionClose(Connection connection) throws Exception {
476        if (!closing) {
477            closing = true;
478            sendToActiveMQ(new RemoveInfo(connectionId), new ResponseHandler() {
479                @Override
480                public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
481                    protonConnection.close();
482                    protonConnection.free();
483
484                    if (!closedSocket) {
485                        pumpProtonToSocket();
486                    }
487                }
488            });
489
490            sendToActiveMQ(new ShutdownInfo());
491        }
492    }
493
494    protected void processSessionOpen(Session protonSession) throws Exception {
495        new AmqpSession(this, getNextSessionId(), protonSession).open();
496    }
497
498    protected void processSessionClose(Session protonSession) throws Exception {
499        if (protonSession.getContext() != null) {
500            ((AmqpResource) protonSession.getContext()).close();
501        } else {
502            protonSession.close();
503            protonSession.free();
504        }
505    }
506
507    protected void processLinkOpen(Link link) throws Exception {
508        link.setSource(link.getRemoteSource());
509        link.setTarget(link.getRemoteTarget());
510
511        AmqpSession session = (AmqpSession) link.getSession().getContext();
512        if (link instanceof Receiver) {
513            if (link.getRemoteTarget() instanceof Coordinator) {
514                session.createCoordinator((Receiver) link);
515            } else {
516                session.createReceiver((Receiver) link);
517            }
518        } else {
519            session.createSender((Sender) link);
520        }
521    }
522
523    protected void processLinkDetach(Link link) throws Exception {
524        Object context = link.getContext();
525
526        if (context instanceof AmqpLink) {
527            ((AmqpLink) context).detach();
528        } else {
529            link.detach();
530            link.free();
531        }
532    }
533
534    protected void processLinkClose(Link link) throws Exception {
535        Object context = link.getContext();
536
537        if (context instanceof AmqpLink) {
538            ((AmqpLink) context).close();;
539        } else {
540            link.close();
541            link.free();
542        }
543    }
544
545    protected void processLinkFlow(Link link) throws Exception {
546        Object context = link.getContext();
547        if (context instanceof AmqpLink) {
548            ((AmqpLink) context).flow();
549        }
550    }
551
552    protected void processDelivery(Delivery delivery) throws Exception {
553        if (!delivery.isPartial()) {
554            Object context = delivery.getLink().getContext();
555            if (context instanceof AmqpLink) {
556                AmqpLink amqpLink = (AmqpLink) context;
557                amqpLink.delivery(delivery);
558            }
559        }
560    }
561
562    //----- Event entry points for ActiveMQ commands and errors --------------//
563
564    @Override
565    public void onAMQPException(IOException error) {
566        closedSocket = true;
567        if (!closing) {
568            try {
569                closing = true;
570                // Attempt to inform the other end that we are going to close
571                // so that the client doesn't wait around forever.
572                protonConnection.setCondition(new ErrorCondition(AmqpError.DECODE_ERROR, error.getMessage()));
573                protonConnection.close();
574                pumpProtonToSocket();
575            } catch (Exception ignore) {
576            }
577            amqpTransport.sendToActiveMQ(error);
578        } else {
579            try {
580                amqpTransport.stop();
581            } catch (Exception ignore) {
582            }
583        }
584    }
585
586    @Override
587    public void onActiveMQCommand(Command command) throws Exception {
588        if (command.isResponse()) {
589            Response response = (Response) command;
590            ResponseHandler rh = resposeHandlers.remove(Integer.valueOf(response.getCorrelationId()));
591            if (rh != null) {
592                rh.onResponse(this, response);
593            } else {
594                // Pass down any unexpected errors. Should this close the connection?
595                if (response.isException()) {
596                    Throwable exception = ((ExceptionResponse) response).getException();
597                    handleException(exception);
598                }
599            }
600        } else if (command.isMessageDispatch()) {
601            MessageDispatch dispatch = (MessageDispatch) command;
602            AmqpSender sender = subscriptionsByConsumerId.get(dispatch.getConsumerId());
603            if (sender != null) {
604                // End of Queue Browse will have no Message object.
605                if (dispatch.getMessage() != null) {
606                    LOG.trace("Dispatching MessageId: {} to consumer", dispatch.getMessage().getMessageId());
607                } else {
608                    LOG.trace("Dispatching End of Browse Command to consumer {}", dispatch.getConsumerId());
609                }
610                sender.onMessageDispatch(dispatch);
611                if (dispatch.getMessage() != null) {
612                    LOG.trace("Finished Dispatch of MessageId: {} to consumer", dispatch.getMessage().getMessageId());
613                }
614            }
615        } else if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE) {
616            // Pass down any unexpected async errors. Should this close the connection?
617            Throwable exception = ((ConnectionError) command).getException();
618            handleException(exception);
619        } else if (command.isConsumerControl()) {
620            ConsumerControl control = (ConsumerControl) command;
621            AmqpSender sender = subscriptionsByConsumerId.get(control.getConsumerId());
622            if (sender != null) {
623                sender.onConsumerControl(control);
624            }
625        } else if (command.isBrokerInfo()) {
626            // ignore
627        } else {
628            LOG.debug("Do not know how to process ActiveMQ Command {}", command);
629        }
630    }
631
632    //----- Utility methods for connection resources to use ------------------//
633
634    void registerSender(ConsumerId consumerId, AmqpSender sender) {
635        subscriptionsByConsumerId.put(consumerId, sender);
636    }
637
638    void unregisterSender(ConsumerId consumerId) {
639        subscriptionsByConsumerId.remove(consumerId);
640    }
641
642    ConsumerInfo lookupSubscription(String subscriptionName) throws AmqpProtocolException {
643        ConsumerInfo result = null;
644        RegionBroker regionBroker;
645
646        try {
647            regionBroker = (RegionBroker) brokerService.getBroker().getAdaptor(RegionBroker.class);
648        } catch (Exception e) {
649            throw new AmqpProtocolException("Error finding subscription: " + subscriptionName + ": " + e.getMessage(), false, e);
650        }
651
652        final TopicRegion topicRegion = (TopicRegion) regionBroker.getTopicRegion();
653        DurableTopicSubscription subscription = topicRegion.lookupSubscription(subscriptionName, connectionInfo.getClientId());
654        if (subscription != null) {
655            result = subscription.getConsumerInfo();
656        }
657
658        return result;
659    }
660
661    ActiveMQDestination createTemporaryDestination(final Link link, Symbol[] capabilities) {
662        ActiveMQDestination rc = null;
663        if (contains(capabilities, TEMP_TOPIC_CAPABILITY)) {
664            rc = new ActiveMQTempTopic(connectionId, nextTempDestinationId++);
665        } else if (contains(capabilities, TEMP_QUEUE_CAPABILITY)) {
666            rc = new ActiveMQTempQueue(connectionId, nextTempDestinationId++);
667        } else {
668            LOG.debug("Dynamic link request with no type capability, defaults to Temporary Queue");
669            rc = new ActiveMQTempQueue(connectionId, nextTempDestinationId++);
670        }
671
672        DestinationInfo info = new DestinationInfo();
673        info.setConnectionId(connectionId);
674        info.setOperationType(DestinationInfo.ADD_OPERATION_TYPE);
675        info.setDestination(rc);
676
677        sendToActiveMQ(info, new ResponseHandler() {
678
679            @Override
680            public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
681                if (response.isException()) {
682                    link.setSource(null);
683
684                    Throwable exception = ((ExceptionResponse) response).getException();
685                    if (exception instanceof SecurityException) {
686                        link.setCondition(new ErrorCondition(AmqpError.UNAUTHORIZED_ACCESS, exception.getMessage()));
687                    } else {
688                        link.setCondition(new ErrorCondition(AmqpError.INTERNAL_ERROR, exception.getMessage()));
689                    }
690
691                    link.close();
692                    link.free();
693                }
694            }
695        });
696
697        return rc;
698    }
699
700    void deleteTemporaryDestination(ActiveMQTempDestination destination) {
701        DestinationInfo info = new DestinationInfo();
702        info.setConnectionId(connectionId);
703        info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
704        info.setDestination(destination);
705
706        sendToActiveMQ(info, new ResponseHandler() {
707
708            @Override
709            public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
710                if (response.isException()) {
711                    Throwable exception = ((ExceptionResponse) response).getException();
712                    LOG.debug("Error during temp destination removeal: {}", exception.getMessage());
713                }
714            }
715        });
716    }
717
718    void sendToActiveMQ(Command command) {
719        sendToActiveMQ(command, null);
720    }
721
722    void sendToActiveMQ(Command command, ResponseHandler handler) {
723        command.setCommandId(lastCommandId.incrementAndGet());
724        if (handler != null) {
725            command.setResponseRequired(true);
726            resposeHandlers.put(Integer.valueOf(command.getCommandId()), handler);
727        }
728        amqpTransport.sendToActiveMQ(command);
729    }
730
731    void handleException(Throwable exception) {
732        LOG.debug("Exception detail", exception);
733        if (exception instanceof AmqpProtocolException) {
734            onAMQPException((IOException) exception);
735        } else {
736            try {
737                // Must ensure that the broker removes Connection resources.
738                sendToActiveMQ(new ShutdownInfo());
739                amqpTransport.stop();
740            } catch (Throwable e) {
741                LOG.error("Failed to stop AMQP Transport ", e);
742            }
743        }
744    }
745
746    //----- Internal implementation ------------------------------------------//
747
748    private SessionId getNextSessionId() {
749        return new SessionId(connectionId, nextSessionId++);
750    }
751
752    private void stopConnectionTimeoutChecker() {
753        AmqpInactivityMonitor monitor = amqpTransport.getInactivityMonitor();
754        if (monitor != null) {
755            monitor.stopConnectionTimeoutChecker();
756        }
757    }
758
759    private void configureInactivityMonitor() {
760        AmqpInactivityMonitor monitor = amqpTransport.getInactivityMonitor();
761        if (monitor == null) {
762            return;
763        }
764
765        // If either end has idle timeout requirements then the tick method
766        // will give us a deadline on the next time we need to tick() in order
767        // to meet those obligations.
768        long nextIdleCheck = protonTransport.tick(System.currentTimeMillis());
769        if (nextIdleCheck > 0) {
770            LOG.trace("Connection keep-alive processing starts at: {}", new Date(nextIdleCheck));
771            monitor.startKeepAliveTask(nextIdleCheck - System.currentTimeMillis());
772        } else {
773            LOG.trace("Connection does not require keep-alive processing");
774        }
775    }
776}