001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.amqp.protocol; 018 019import static org.apache.activemq.transport.amqp.AmqpSupport.COPY; 020import static org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_FILTER_IDS; 021import static org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_NAME; 022import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_FILTER_IDS; 023import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_NAME; 024import static org.apache.activemq.transport.amqp.AmqpSupport.createDestination; 025import static org.apache.activemq.transport.amqp.AmqpSupport.findFilter; 026 027import java.io.IOException; 028import java.util.HashMap; 029import java.util.Map; 030 031import javax.jms.InvalidSelectorException; 032 033import org.apache.activemq.command.ActiveMQDestination; 034import org.apache.activemq.command.ActiveMQTempDestination; 035import org.apache.activemq.command.ConsumerId; 036import org.apache.activemq.command.ConsumerInfo; 037import org.apache.activemq.command.ExceptionResponse; 038import org.apache.activemq.command.ProducerId; 039import org.apache.activemq.command.ProducerInfo; 040import org.apache.activemq.command.RemoveInfo; 041import org.apache.activemq.command.Response; 042import org.apache.activemq.command.SessionId; 043import org.apache.activemq.command.SessionInfo; 044import org.apache.activemq.selector.SelectorParser; 045import org.apache.activemq.transport.amqp.AmqpProtocolConverter; 046import org.apache.activemq.transport.amqp.AmqpProtocolException; 047import org.apache.activemq.transport.amqp.ResponseHandler; 048import org.apache.qpid.proton.amqp.DescribedType; 049import org.apache.qpid.proton.amqp.Symbol; 050import org.apache.qpid.proton.amqp.messaging.Target; 051import org.apache.qpid.proton.amqp.messaging.TerminusDurability; 052import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy; 053import org.apache.qpid.proton.amqp.transport.AmqpError; 054import org.apache.qpid.proton.amqp.transport.ErrorCondition; 055import org.apache.qpid.proton.engine.Receiver; 056import org.apache.qpid.proton.engine.Sender; 057import org.apache.qpid.proton.engine.Session; 058import org.slf4j.Logger; 059import org.slf4j.LoggerFactory; 060 061/** 062 * Wraps the AMQP Session and provides the services needed to manage the remote 063 * peer requests for link establishment. 064 */ 065public class AmqpSession implements AmqpResource { 066 067 private static final Logger LOG = LoggerFactory.getLogger(AmqpSession.class); 068 069 private final Map<ConsumerId, AmqpSender> consumers = new HashMap<ConsumerId, AmqpSender>(); 070 071 private final AmqpConnection connection; 072 private final Session protonSession; 073 private final SessionId sessionId; 074 075 private long nextProducerId = 0; 076 private long nextConsumerId = 0; 077 078 /** 079 * Create new AmqpSession instance whose parent is the given AmqpConnection. 080 * 081 * @param connection 082 * the parent connection for this session. 083 * @param sessionId 084 * the ActiveMQ SessionId that is used to identify this session. 085 * @param session 086 * the AMQP Session that this class manages. 087 */ 088 public AmqpSession(AmqpConnection connection, SessionId sessionId, Session session) { 089 this.connection = connection; 090 this.sessionId = sessionId; 091 this.protonSession = session; 092 } 093 094 @Override 095 public void open() { 096 LOG.debug("Session {} opened", getSessionId()); 097 098 getEndpoint().setContext(this); 099 getEndpoint().setIncomingCapacity(Integer.MAX_VALUE); 100 getEndpoint().open(); 101 102 connection.sendToActiveMQ(new SessionInfo(getSessionId())); 103 } 104 105 @Override 106 public void close() { 107 LOG.debug("Session {} closed", getSessionId()); 108 109 getEndpoint().setContext(null); 110 getEndpoint().close(); 111 getEndpoint().free(); 112 113 connection.sendToActiveMQ(new RemoveInfo(getSessionId())); 114 } 115 116 /** 117 * Commits all pending work for all resources managed under this session. 118 * 119 * @throws Exception if an error occurs while attempting to commit work. 120 */ 121 public void commit() throws Exception { 122 for (AmqpSender consumer : consumers.values()) { 123 consumer.commit(); 124 } 125 } 126 127 /** 128 * Rolls back any pending work being down under this session. 129 * 130 * @throws Exception if an error occurs while attempting to roll back work. 131 */ 132 public void rollback() throws Exception { 133 for (AmqpSender consumer : consumers.values()) { 134 consumer.rollback(); 135 } 136 } 137 138 /** 139 * Used to direct all Session managed Senders to push any queued Messages 140 * out to the remote peer. 141 * 142 * @throws Exception if an error occurs while flushing the messages. 143 */ 144 public void flushPendingMessages() throws Exception { 145 for (AmqpSender consumer : consumers.values()) { 146 consumer.pumpOutbound(); 147 } 148 } 149 150 public void createCoordinator(final Receiver protonReceiver) throws Exception { 151 AmqpTransactionCoordinator txCoordinator = new AmqpTransactionCoordinator(this, protonReceiver); 152 txCoordinator.flow(connection.getConfiguredReceiverCredit()); 153 txCoordinator.open(); 154 } 155 156 public void createReceiver(final Receiver protonReceiver) throws Exception { 157 org.apache.qpid.proton.amqp.transport.Target remoteTarget = protonReceiver.getRemoteTarget(); 158 159 ProducerInfo producerInfo = new ProducerInfo(getNextProducerId()); 160 final AmqpReceiver receiver = new AmqpReceiver(this, protonReceiver, producerInfo); 161 162 LOG.debug("opening new receiver {} on link: {}", producerInfo.getProducerId(), protonReceiver.getName()); 163 164 try { 165 Target target = (Target) remoteTarget; 166 ActiveMQDestination destination = null; 167 String targetNodeName = target.getAddress(); 168 169 if (target.getDynamic()) { 170 destination = connection.createTemporaryDestination(protonReceiver, target.getCapabilities()); 171 Target actualTarget = new Target(); 172 actualTarget.setAddress(destination.getQualifiedName()); 173 actualTarget.setDynamic(true); 174 protonReceiver.setTarget(actualTarget); 175 receiver.addCloseAction(new Runnable() { 176 177 @Override 178 public void run() { 179 connection.deleteTemporaryDestination((ActiveMQTempDestination) receiver.getDestination()); 180 } 181 }); 182 } else if (targetNodeName != null && !targetNodeName.isEmpty()) { 183 destination = createDestination(remoteTarget); 184 if (destination.isTemporary()) { 185 String connectionId = ((ActiveMQTempDestination) destination).getConnectionId(); 186 if (connectionId == null) { 187 throw new AmqpProtocolException(AmqpError.PRECONDITION_FAILED.toString(), "Not a broker created temp destination"); 188 } 189 } 190 } 191 192 receiver.setDestination(destination); 193 connection.sendToActiveMQ(producerInfo, new ResponseHandler() { 194 @Override 195 public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException { 196 if (response.isException()) { 197 ErrorCondition error = null; 198 Throwable exception = ((ExceptionResponse) response).getException(); 199 if (exception instanceof SecurityException) { 200 error = new ErrorCondition(AmqpError.UNAUTHORIZED_ACCESS, exception.getMessage()); 201 } else { 202 error = new ErrorCondition(AmqpError.INTERNAL_ERROR, exception.getMessage()); 203 } 204 205 receiver.close(error); 206 } else { 207 receiver.flow(connection.getConfiguredReceiverCredit()); 208 receiver.open(); 209 } 210 pumpProtonToSocket(); 211 } 212 }); 213 214 } catch (AmqpProtocolException exception) { 215 receiver.close(new ErrorCondition(Symbol.getSymbol(exception.getSymbolicName()), exception.getMessage())); 216 } 217 } 218 219 @SuppressWarnings("unchecked") 220 public void createSender(final Sender protonSender) throws Exception { 221 org.apache.qpid.proton.amqp.messaging.Source source = (org.apache.qpid.proton.amqp.messaging.Source) protonSender.getRemoteSource(); 222 223 ConsumerInfo consumerInfo = new ConsumerInfo(getNextConsumerId()); 224 final AmqpSender sender = new AmqpSender(this, protonSender, consumerInfo); 225 226 LOG.debug("opening new sender {} on link: {}", consumerInfo.getConsumerId(), protonSender.getName()); 227 228 try { 229 final Map<Symbol, Object> supportedFilters = new HashMap<Symbol, Object>(); 230 protonSender.setContext(sender); 231 232 boolean noLocal = false; 233 String selector = null; 234 235 if (source != null) { 236 Map.Entry<Symbol, DescribedType> filter = findFilter(source.getFilter(), JMS_SELECTOR_FILTER_IDS); 237 if (filter != null) { 238 selector = filter.getValue().getDescribed().toString(); 239 // Validate the Selector. 240 try { 241 SelectorParser.parse(selector); 242 } catch (InvalidSelectorException e) { 243 sender.close(new ErrorCondition(AmqpError.INVALID_FIELD, e.getMessage())); 244 return; 245 } 246 247 supportedFilters.put(filter.getKey(), filter.getValue()); 248 } 249 250 filter = findFilter(source.getFilter(), NO_LOCAL_FILTER_IDS); 251 if (filter != null) { 252 noLocal = true; 253 supportedFilters.put(filter.getKey(), filter.getValue()); 254 } 255 } 256 257 ActiveMQDestination destination; 258 if (source == null) { 259 // Attempt to recover previous subscription 260 ConsumerInfo storedInfo = connection.lookupSubscription(protonSender.getName()); 261 262 if (storedInfo != null) { 263 destination = storedInfo.getDestination(); 264 265 source = new org.apache.qpid.proton.amqp.messaging.Source(); 266 source.setAddress(destination.getQualifiedName()); 267 source.setDurable(TerminusDurability.UNSETTLED_STATE); 268 source.setExpiryPolicy(TerminusExpiryPolicy.NEVER); 269 source.setDistributionMode(COPY); 270 271 if (storedInfo.isNoLocal()) { 272 supportedFilters.put(NO_LOCAL_NAME, AmqpNoLocalFilter.NO_LOCAL); 273 } 274 275 if (storedInfo.getSelector() != null && !storedInfo.getSelector().trim().equals("")) { 276 supportedFilters.put(JMS_SELECTOR_NAME, new AmqpJmsSelectorFilter(storedInfo.getSelector())); 277 } 278 } else { 279 sender.close(new ErrorCondition(AmqpError.NOT_FOUND, "Unknown subscription link: " + protonSender.getName())); 280 return; 281 } 282 } else if (source.getDynamic()) { 283 // lets create a temp dest. 284 destination = connection.createTemporaryDestination(protonSender, source.getCapabilities()); 285 source = new org.apache.qpid.proton.amqp.messaging.Source(); 286 source.setAddress(destination.getQualifiedName()); 287 source.setDynamic(true); 288 sender.addCloseAction(new Runnable() { 289 290 @Override 291 public void run() { 292 connection.deleteTemporaryDestination((ActiveMQTempDestination) sender.getDestination()); 293 } 294 }); 295 } else { 296 destination = createDestination(source); 297 if (destination.isTemporary()) { 298 String connectionId = ((ActiveMQTempDestination) destination).getConnectionId(); 299 if (connectionId == null) { 300 throw new AmqpProtocolException(AmqpError.INVALID_FIELD.toString(), "Not a broker created temp destination"); 301 } 302 } 303 } 304 305 source.setFilter(supportedFilters.isEmpty() ? null : supportedFilters); 306 protonSender.setSource(source); 307 308 int senderCredit = protonSender.getRemoteCredit(); 309 310 consumerInfo.setSelector(selector); 311 consumerInfo.setNoRangeAcks(true); 312 consumerInfo.setDestination(destination); 313 consumerInfo.setPrefetchSize(senderCredit >= 0 ? senderCredit : 0); 314 consumerInfo.setDispatchAsync(true); 315 consumerInfo.setNoLocal(noLocal); 316 317 if (source.getDistributionMode() == COPY && destination.isQueue()) { 318 consumerInfo.setBrowser(true); 319 } 320 321 if ((TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()) || 322 TerminusDurability.CONFIGURATION.equals(source.getDurable())) && destination.isTopic()) { 323 consumerInfo.setSubscriptionName(protonSender.getName()); 324 } 325 326 connection.sendToActiveMQ(consumerInfo, new ResponseHandler() { 327 @Override 328 public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException { 329 if (response.isException()) { 330 ErrorCondition error = null; 331 Throwable exception = ((ExceptionResponse) response).getException(); 332 if (exception instanceof SecurityException) { 333 error = new ErrorCondition(AmqpError.UNAUTHORIZED_ACCESS, exception.getMessage()); 334 } else if (exception instanceof InvalidSelectorException) { 335 error = new ErrorCondition(AmqpError.INVALID_FIELD, exception.getMessage()); 336 } else { 337 error = new ErrorCondition(AmqpError.INTERNAL_ERROR, exception.getMessage()); 338 } 339 340 sender.close(error); 341 } else { 342 sender.open(); 343 } 344 pumpProtonToSocket(); 345 } 346 }); 347 348 } catch (AmqpProtocolException e) { 349 sender.close(new ErrorCondition(Symbol.getSymbol(e.getSymbolicName()), e.getMessage())); 350 } 351 } 352 353 /** 354 * Send all pending work out to the remote peer. 355 */ 356 public void pumpProtonToSocket() { 357 connection.pumpProtonToSocket(); 358 } 359 360 public void registerSender(ConsumerId consumerId, AmqpSender sender) { 361 consumers.put(consumerId, sender); 362 connection.registerSender(consumerId, sender); 363 } 364 365 public void unregisterSender(ConsumerId consumerId) { 366 consumers.remove(consumerId); 367 connection.unregisterSender(consumerId); 368 } 369 370 //----- Configuration accessors ------------------------------------------// 371 372 public AmqpConnection getConnection() { 373 return connection; 374 } 375 376 public SessionId getSessionId() { 377 return sessionId; 378 } 379 380 public Session getEndpoint() { 381 return protonSession; 382 } 383 384 public long getMaxFrameSize() { 385 return connection.getMaxFrameSize(); 386 } 387 388 //----- Internal Implementation ------------------------------------------// 389 390 private ConsumerId getNextConsumerId() { 391 return new ConsumerId(sessionId, nextConsumerId++); 392 } 393 394 private ProducerId getNextProducerId() { 395 return new ProducerId(sessionId, nextProducerId++); 396 } 397}