001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.amqp.protocol;
018
019import static org.apache.activemq.transport.amqp.AmqpSupport.COPY;
020import static org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_FILTER_IDS;
021import static org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_NAME;
022import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_FILTER_IDS;
023import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_NAME;
024import static org.apache.activemq.transport.amqp.AmqpSupport.createDestination;
025import static org.apache.activemq.transport.amqp.AmqpSupport.findFilter;
026
027import java.io.IOException;
028import java.util.HashMap;
029import java.util.Map;
030
031import javax.jms.InvalidSelectorException;
032
033import org.apache.activemq.command.ActiveMQDestination;
034import org.apache.activemq.command.ActiveMQTempDestination;
035import org.apache.activemq.command.ConsumerId;
036import org.apache.activemq.command.ConsumerInfo;
037import org.apache.activemq.command.ExceptionResponse;
038import org.apache.activemq.command.ProducerId;
039import org.apache.activemq.command.ProducerInfo;
040import org.apache.activemq.command.RemoveInfo;
041import org.apache.activemq.command.Response;
042import org.apache.activemq.command.SessionId;
043import org.apache.activemq.command.SessionInfo;
044import org.apache.activemq.selector.SelectorParser;
045import org.apache.activemq.transport.amqp.AmqpProtocolConverter;
046import org.apache.activemq.transport.amqp.AmqpProtocolException;
047import org.apache.activemq.transport.amqp.ResponseHandler;
048import org.apache.qpid.proton.amqp.DescribedType;
049import org.apache.qpid.proton.amqp.Symbol;
050import org.apache.qpid.proton.amqp.messaging.Target;
051import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
052import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
053import org.apache.qpid.proton.amqp.transport.AmqpError;
054import org.apache.qpid.proton.amqp.transport.ErrorCondition;
055import org.apache.qpid.proton.engine.Receiver;
056import org.apache.qpid.proton.engine.Sender;
057import org.apache.qpid.proton.engine.Session;
058import org.slf4j.Logger;
059import org.slf4j.LoggerFactory;
060
061/**
062 * Wraps the AMQP Session and provides the services needed to manage the remote
063 * peer requests for link establishment.
064 */
065public class AmqpSession implements AmqpResource {
066
067    private static final Logger LOG = LoggerFactory.getLogger(AmqpSession.class);
068
069    private final Map<ConsumerId, AmqpSender> consumers = new HashMap<ConsumerId, AmqpSender>();
070
071    private final AmqpConnection connection;
072    private final Session protonSession;
073    private final SessionId sessionId;
074
075    private long nextProducerId = 0;
076    private long nextConsumerId = 0;
077
078    /**
079     * Create new AmqpSession instance whose parent is the given AmqpConnection.
080     *
081     * @param connection
082     *        the parent connection for this session.
083     * @param sessionId
084     *        the ActiveMQ SessionId that is used to identify this session.
085     * @param session
086     *        the AMQP Session that this class manages.
087     */
088    public AmqpSession(AmqpConnection connection, SessionId sessionId, Session session) {
089        this.connection = connection;
090        this.sessionId = sessionId;
091        this.protonSession = session;
092    }
093
094    @Override
095    public void open() {
096        LOG.debug("Session {} opened", getSessionId());
097
098        getEndpoint().setContext(this);
099        getEndpoint().setIncomingCapacity(Integer.MAX_VALUE);
100        getEndpoint().open();
101
102        connection.sendToActiveMQ(new SessionInfo(getSessionId()));
103    }
104
105    @Override
106    public void close() {
107        LOG.debug("Session {} closed", getSessionId());
108
109        getEndpoint().setContext(null);
110        getEndpoint().close();
111        getEndpoint().free();
112
113        connection.sendToActiveMQ(new RemoveInfo(getSessionId()));
114    }
115
116    /**
117     * Commits all pending work for all resources managed under this session.
118     *
119     * @throws Exception if an error occurs while attempting to commit work.
120     */
121    public void commit() throws Exception {
122        for (AmqpSender consumer : consumers.values()) {
123            consumer.commit();
124        }
125    }
126
127    /**
128     * Rolls back any pending work being down under this session.
129     *
130     * @throws Exception if an error occurs while attempting to roll back work.
131     */
132    public void rollback() throws Exception {
133        for (AmqpSender consumer : consumers.values()) {
134            consumer.rollback();
135        }
136    }
137
138    /**
139     * Used to direct all Session managed Senders to push any queued Messages
140     * out to the remote peer.
141     *
142     * @throws Exception if an error occurs while flushing the messages.
143     */
144    public void flushPendingMessages() throws Exception {
145        for (AmqpSender consumer : consumers.values()) {
146            consumer.pumpOutbound();
147        }
148    }
149
150    public void createCoordinator(final Receiver protonReceiver) throws Exception {
151        AmqpTransactionCoordinator txCoordinator = new AmqpTransactionCoordinator(this, protonReceiver);
152        txCoordinator.flow(connection.getConfiguredReceiverCredit());
153        txCoordinator.open();
154    }
155
156    public void createReceiver(final Receiver protonReceiver) throws Exception {
157        org.apache.qpid.proton.amqp.transport.Target remoteTarget = protonReceiver.getRemoteTarget();
158
159        ProducerInfo producerInfo = new ProducerInfo(getNextProducerId());
160        final AmqpReceiver receiver = new AmqpReceiver(this, protonReceiver, producerInfo);
161
162        LOG.debug("opening new receiver {} on link: {}", producerInfo.getProducerId(), protonReceiver.getName());
163
164        try {
165            Target target = (Target) remoteTarget;
166            ActiveMQDestination destination = null;
167            String targetNodeName = target.getAddress();
168
169            if (target.getDynamic()) {
170                destination = connection.createTemporaryDestination(protonReceiver, target.getCapabilities());
171                Target actualTarget = new Target();
172                actualTarget.setAddress(destination.getQualifiedName());
173                actualTarget.setDynamic(true);
174                protonReceiver.setTarget(actualTarget);
175                receiver.addCloseAction(new Runnable() {
176
177                    @Override
178                    public void run() {
179                        connection.deleteTemporaryDestination((ActiveMQTempDestination) receiver.getDestination());
180                    }
181                });
182            } else if (targetNodeName != null && !targetNodeName.isEmpty()) {
183                destination = createDestination(remoteTarget);
184                if (destination.isTemporary()) {
185                    String connectionId = ((ActiveMQTempDestination) destination).getConnectionId();
186                    if (connectionId == null) {
187                        throw new AmqpProtocolException(AmqpError.PRECONDITION_FAILED.toString(), "Not a broker created temp destination");
188                    }
189                }
190            }
191
192            receiver.setDestination(destination);
193            connection.sendToActiveMQ(producerInfo, new ResponseHandler() {
194                @Override
195                public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
196                    if (response.isException()) {
197                        ErrorCondition error = null;
198                        Throwable exception = ((ExceptionResponse) response).getException();
199                        if (exception instanceof SecurityException) {
200                            error = new ErrorCondition(AmqpError.UNAUTHORIZED_ACCESS, exception.getMessage());
201                        } else {
202                            error = new ErrorCondition(AmqpError.INTERNAL_ERROR, exception.getMessage());
203                        }
204
205                        receiver.close(error);
206                    } else {
207                        receiver.flow(connection.getConfiguredReceiverCredit());
208                        receiver.open();
209                    }
210                    pumpProtonToSocket();
211                }
212            });
213
214        } catch (AmqpProtocolException exception) {
215            receiver.close(new ErrorCondition(Symbol.getSymbol(exception.getSymbolicName()), exception.getMessage()));
216        }
217    }
218
219    @SuppressWarnings("unchecked")
220    public void createSender(final Sender protonSender) throws Exception {
221        org.apache.qpid.proton.amqp.messaging.Source source = (org.apache.qpid.proton.amqp.messaging.Source) protonSender.getRemoteSource();
222
223        ConsumerInfo consumerInfo = new ConsumerInfo(getNextConsumerId());
224        final AmqpSender sender = new AmqpSender(this, protonSender, consumerInfo);
225
226        LOG.debug("opening new sender {} on link: {}", consumerInfo.getConsumerId(), protonSender.getName());
227
228        try {
229            final Map<Symbol, Object> supportedFilters = new HashMap<Symbol, Object>();
230            protonSender.setContext(sender);
231
232            boolean noLocal = false;
233            String selector = null;
234
235            if (source != null) {
236                Map.Entry<Symbol, DescribedType> filter = findFilter(source.getFilter(), JMS_SELECTOR_FILTER_IDS);
237                if (filter != null) {
238                    selector = filter.getValue().getDescribed().toString();
239                    // Validate the Selector.
240                    try {
241                        SelectorParser.parse(selector);
242                    } catch (InvalidSelectorException e) {
243                        sender.close(new ErrorCondition(AmqpError.INVALID_FIELD, e.getMessage()));
244                        return;
245                    }
246
247                    supportedFilters.put(filter.getKey(), filter.getValue());
248                }
249
250                filter = findFilter(source.getFilter(), NO_LOCAL_FILTER_IDS);
251                if (filter != null) {
252                    noLocal = true;
253                    supportedFilters.put(filter.getKey(), filter.getValue());
254                }
255            }
256
257            ActiveMQDestination destination;
258            if (source == null) {
259                // Attempt to recover previous subscription
260                ConsumerInfo storedInfo = connection.lookupSubscription(protonSender.getName());
261
262                if (storedInfo != null) {
263                    destination = storedInfo.getDestination();
264
265                    source = new org.apache.qpid.proton.amqp.messaging.Source();
266                    source.setAddress(destination.getQualifiedName());
267                    source.setDurable(TerminusDurability.UNSETTLED_STATE);
268                    source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
269                    source.setDistributionMode(COPY);
270
271                    if (storedInfo.isNoLocal()) {
272                        supportedFilters.put(NO_LOCAL_NAME, AmqpNoLocalFilter.NO_LOCAL);
273                    }
274
275                    if (storedInfo.getSelector() != null && !storedInfo.getSelector().trim().equals("")) {
276                        supportedFilters.put(JMS_SELECTOR_NAME, new AmqpJmsSelectorFilter(storedInfo.getSelector()));
277                    }
278                } else {
279                    sender.close(new ErrorCondition(AmqpError.NOT_FOUND, "Unknown subscription link: " + protonSender.getName()));
280                    return;
281                }
282            } else if (source.getDynamic()) {
283                // lets create a temp dest.
284                destination = connection.createTemporaryDestination(protonSender, source.getCapabilities());
285                source = new org.apache.qpid.proton.amqp.messaging.Source();
286                source.setAddress(destination.getQualifiedName());
287                source.setDynamic(true);
288                sender.addCloseAction(new Runnable() {
289
290                    @Override
291                    public void run() {
292                        connection.deleteTemporaryDestination((ActiveMQTempDestination) sender.getDestination());
293                    }
294                });
295            } else {
296                destination = createDestination(source);
297                if (destination.isTemporary()) {
298                    String connectionId = ((ActiveMQTempDestination) destination).getConnectionId();
299                    if (connectionId == null) {
300                        throw new AmqpProtocolException(AmqpError.INVALID_FIELD.toString(), "Not a broker created temp destination");
301                    }
302                }
303            }
304
305            source.setFilter(supportedFilters.isEmpty() ? null : supportedFilters);
306            protonSender.setSource(source);
307
308            int senderCredit = protonSender.getRemoteCredit();
309
310            consumerInfo.setSelector(selector);
311            consumerInfo.setNoRangeAcks(true);
312            consumerInfo.setDestination(destination);
313            consumerInfo.setPrefetchSize(senderCredit >= 0 ? senderCredit : 0);
314            consumerInfo.setDispatchAsync(true);
315            consumerInfo.setNoLocal(noLocal);
316
317            if (source.getDistributionMode() == COPY && destination.isQueue()) {
318                consumerInfo.setBrowser(true);
319            }
320
321            if ((TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()) ||
322                 TerminusDurability.CONFIGURATION.equals(source.getDurable())) && destination.isTopic()) {
323                consumerInfo.setSubscriptionName(protonSender.getName());
324            }
325
326            connection.sendToActiveMQ(consumerInfo, new ResponseHandler() {
327                @Override
328                public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
329                    if (response.isException()) {
330                        ErrorCondition error = null;
331                        Throwable exception = ((ExceptionResponse) response).getException();
332                        if (exception instanceof SecurityException) {
333                            error = new ErrorCondition(AmqpError.UNAUTHORIZED_ACCESS, exception.getMessage());
334                        } else if (exception instanceof InvalidSelectorException) {
335                            error = new ErrorCondition(AmqpError.INVALID_FIELD, exception.getMessage());
336                        } else {
337                            error = new ErrorCondition(AmqpError.INTERNAL_ERROR, exception.getMessage());
338                        }
339
340                        sender.close(error);
341                    } else {
342                        sender.open();
343                    }
344                    pumpProtonToSocket();
345                }
346            });
347
348        } catch (AmqpProtocolException e) {
349            sender.close(new ErrorCondition(Symbol.getSymbol(e.getSymbolicName()), e.getMessage()));
350        }
351    }
352
353    /**
354     * Send all pending work out to the remote peer.
355     */
356    public void pumpProtonToSocket() {
357        connection.pumpProtonToSocket();
358    }
359
360    public void registerSender(ConsumerId consumerId, AmqpSender sender) {
361        consumers.put(consumerId, sender);
362        connection.registerSender(consumerId, sender);
363    }
364
365    public void unregisterSender(ConsumerId consumerId) {
366        consumers.remove(consumerId);
367        connection.unregisterSender(consumerId);
368    }
369
370    //----- Configuration accessors ------------------------------------------//
371
372    public AmqpConnection getConnection() {
373        return connection;
374    }
375
376    public SessionId getSessionId() {
377        return sessionId;
378    }
379
380    public Session getEndpoint() {
381        return protonSession;
382    }
383
384    public long getMaxFrameSize() {
385        return connection.getMaxFrameSize();
386    }
387
388    //----- Internal Implementation ------------------------------------------//
389
390    private ConsumerId getNextConsumerId() {
391        return new ConsumerId(sessionId, nextConsumerId++);
392    }
393
394    private ProducerId getNextProducerId() {
395        return new ProducerId(sessionId, nextProducerId++);
396    }
397}