001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.stomp; 018 019import java.io.BufferedReader; 020import java.io.IOException; 021import java.io.InputStream; 022import java.io.InputStreamReader; 023import java.io.OutputStreamWriter; 024import java.io.PrintWriter; 025import java.util.HashMap; 026import java.util.Iterator; 027import java.util.Map; 028import java.util.concurrent.ConcurrentHashMap; 029import java.util.concurrent.ConcurrentMap; 030import java.util.concurrent.atomic.AtomicBoolean; 031 032import javax.jms.JMSException; 033 034import org.apache.activemq.ActiveMQPrefetchPolicy; 035import org.apache.activemq.advisory.AdvisorySupport; 036import org.apache.activemq.broker.BrokerContext; 037import org.apache.activemq.broker.BrokerContextAware; 038import org.apache.activemq.command.ActiveMQDestination; 039import org.apache.activemq.command.ActiveMQMessage; 040import org.apache.activemq.command.ActiveMQTempQueue; 041import org.apache.activemq.command.ActiveMQTempTopic; 042import org.apache.activemq.command.Command; 043import org.apache.activemq.command.CommandTypes; 044import org.apache.activemq.command.ConnectionError; 045import org.apache.activemq.command.ConnectionId; 046import org.apache.activemq.command.ConnectionInfo; 047import org.apache.activemq.command.ConsumerControl; 048import org.apache.activemq.command.ConsumerId; 049import org.apache.activemq.command.ConsumerInfo; 050import org.apache.activemq.command.DestinationInfo; 051import org.apache.activemq.command.ExceptionResponse; 052import org.apache.activemq.command.LocalTransactionId; 053import org.apache.activemq.command.MessageAck; 054import org.apache.activemq.command.MessageDispatch; 055import org.apache.activemq.command.MessageId; 056import org.apache.activemq.command.ProducerId; 057import org.apache.activemq.command.ProducerInfo; 058import org.apache.activemq.command.RemoveSubscriptionInfo; 059import org.apache.activemq.command.Response; 060import org.apache.activemq.command.SessionId; 061import org.apache.activemq.command.SessionInfo; 062import org.apache.activemq.command.ShutdownInfo; 063import org.apache.activemq.command.TransactionId; 064import org.apache.activemq.command.TransactionInfo; 065import org.apache.activemq.util.ByteArrayOutputStream; 066import org.apache.activemq.util.FactoryFinder; 067import org.apache.activemq.util.IOExceptionSupport; 068import org.apache.activemq.util.IdGenerator; 069import org.apache.activemq.util.IntrospectionSupport; 070import org.apache.activemq.util.LongSequenceGenerator; 071import org.slf4j.Logger; 072import org.slf4j.LoggerFactory; 073 074/** 075 * @author <a href="http://hiramchirino.com">chirino</a> 076 */ 077public class ProtocolConverter { 078 079 private static final Logger LOG = LoggerFactory.getLogger(ProtocolConverter.class); 080 081 private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator(); 082 083 private static final String BROKER_VERSION; 084 private static final StompFrame ping = new StompFrame(Stomp.Commands.KEEPALIVE); 085 086 static { 087 InputStream in = null; 088 String version = "5.6.0"; 089 if ((in = ProtocolConverter.class.getResourceAsStream("/org/apache/activemq/version.txt")) != null) { 090 BufferedReader reader = new BufferedReader(new InputStreamReader(in)); 091 try { 092 version = reader.readLine(); 093 } catch(Exception e) { 094 } 095 } 096 BROKER_VERSION = version; 097 } 098 099 private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId()); 100 private final SessionId sessionId = new SessionId(connectionId, -1); 101 private final ProducerId producerId = new ProducerId(sessionId, 1); 102 103 private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator(); 104 private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator(); 105 private final LongSequenceGenerator transactionIdGenerator = new LongSequenceGenerator(); 106 private final LongSequenceGenerator tempDestinationGenerator = new LongSequenceGenerator(); 107 108 private final ConcurrentMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>(); 109 private final ConcurrentMap<ConsumerId, StompSubscription> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, StompSubscription>(); 110 private final ConcurrentMap<String, StompSubscription> subscriptions = new ConcurrentHashMap<String, StompSubscription>(); 111 private final ConcurrentMap<String, ActiveMQDestination> tempDestinations = new ConcurrentHashMap<String, ActiveMQDestination>(); 112 private final ConcurrentMap<String, String> tempDestinationAmqToStompMap = new ConcurrentHashMap<String, String>(); 113 private final Map<String, LocalTransactionId> transactions = new ConcurrentHashMap<String, LocalTransactionId>(); 114 private final StompTransport stompTransport; 115 116 private final ConcurrentMap<String, AckEntry> pedingAcks = new ConcurrentHashMap<String, AckEntry>(); 117 private final IdGenerator ACK_ID_GENERATOR = new IdGenerator(); 118 119 private final Object commnadIdMutex = new Object(); 120 private int lastCommandId; 121 private final AtomicBoolean connected = new AtomicBoolean(false); 122 private final FrameTranslator frameTranslator = new LegacyFrameTranslator(); 123 private final FactoryFinder FRAME_TRANSLATOR_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/frametranslator/"); 124 private final BrokerContext brokerContext; 125 private String version = "1.0"; 126 private long hbReadInterval; 127 private long hbWriteInterval; 128 private float hbGracePeriodMultiplier = 1.0f; 129 private String defaultHeartBeat = Stomp.DEFAULT_HEART_BEAT; 130 131 private static class AckEntry { 132 133 private final String messageId; 134 private final StompSubscription subscription; 135 136 public AckEntry(String messageId, StompSubscription subscription) { 137 this.messageId = messageId; 138 this.subscription = subscription; 139 } 140 141 public MessageAck onMessageAck(TransactionId transactionId) { 142 return subscription.onStompMessageAck(messageId, transactionId); 143 } 144 145 public MessageAck onMessageNack(TransactionId transactionId) throws ProtocolException { 146 return subscription.onStompMessageNack(messageId, transactionId); 147 } 148 149 public String getMessageId() { 150 return this.messageId; 151 } 152 153 @SuppressWarnings("unused") 154 public StompSubscription getSubscription() { 155 return this.subscription; 156 } 157 } 158 159 public ProtocolConverter(StompTransport stompTransport, BrokerContext brokerContext) { 160 this.stompTransport = stompTransport; 161 this.brokerContext = brokerContext; 162 } 163 164 protected int generateCommandId() { 165 synchronized (commnadIdMutex) { 166 return lastCommandId++; 167 } 168 } 169 170 protected ResponseHandler createResponseHandler(final StompFrame command) { 171 final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED); 172 if (receiptId != null) { 173 return new ResponseHandler() { 174 @Override 175 public void onResponse(ProtocolConverter converter, Response response) throws IOException { 176 if (response.isException()) { 177 // Generally a command can fail.. but that does not invalidate the connection. 178 // We report back the failure but we don't close the connection. 179 Throwable exception = ((ExceptionResponse)response).getException(); 180 handleException(exception, command); 181 } else { 182 StompFrame sc = new StompFrame(); 183 sc.setAction(Stomp.Responses.RECEIPT); 184 sc.setHeaders(new HashMap<String, String>(1)); 185 sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId); 186 stompTransport.sendToStomp(sc); 187 } 188 } 189 }; 190 } 191 return null; 192 } 193 194 protected void sendToActiveMQ(Command command, ResponseHandler handler) { 195 command.setCommandId(generateCommandId()); 196 if (handler != null) { 197 command.setResponseRequired(true); 198 resposeHandlers.put(Integer.valueOf(command.getCommandId()), handler); 199 } 200 stompTransport.sendToActiveMQ(command); 201 } 202 203 protected void sendToStomp(StompFrame command) throws IOException { 204 stompTransport.sendToStomp(command); 205 } 206 207 protected FrameTranslator findTranslator(String header) { 208 return findTranslator(header, null, false); 209 } 210 211 protected FrameTranslator findTranslator(String header, ActiveMQDestination destination, boolean advisory) { 212 FrameTranslator translator = frameTranslator; 213 try { 214 if (header != null) { 215 translator = (FrameTranslator) FRAME_TRANSLATOR_FINDER.newInstance(header); 216 } else { 217 if (destination != null && (advisory || AdvisorySupport.isAdvisoryTopic(destination))) { 218 translator = new JmsFrameTranslator(); 219 } 220 } 221 } catch (Exception ignore) { 222 // if anything goes wrong use the default translator 223 } 224 225 if (translator instanceof BrokerContextAware) { 226 ((BrokerContextAware)translator).setBrokerContext(brokerContext); 227 } 228 229 return translator; 230 } 231 232 /** 233 * Convert a STOMP command 234 * 235 * @param command 236 */ 237 public void onStompCommand(StompFrame command) throws IOException, JMSException { 238 try { 239 240 if (command.getClass() == StompFrameError.class) { 241 throw ((StompFrameError)command).getException(); 242 } 243 244 String action = command.getAction(); 245 if (action.startsWith(Stomp.Commands.SEND)) { 246 onStompSend(command); 247 } else if (action.startsWith(Stomp.Commands.ACK)) { 248 onStompAck(command); 249 } else if (action.startsWith(Stomp.Commands.NACK)) { 250 onStompNack(command); 251 } else if (action.startsWith(Stomp.Commands.BEGIN)) { 252 onStompBegin(command); 253 } else if (action.startsWith(Stomp.Commands.COMMIT)) { 254 onStompCommit(command); 255 } else if (action.startsWith(Stomp.Commands.ABORT)) { 256 onStompAbort(command); 257 } else if (action.startsWith(Stomp.Commands.SUBSCRIBE)) { 258 onStompSubscribe(command); 259 } else if (action.startsWith(Stomp.Commands.UNSUBSCRIBE)) { 260 onStompUnsubscribe(command); 261 } else if (action.startsWith(Stomp.Commands.CONNECT) || 262 action.startsWith(Stomp.Commands.STOMP)) { 263 onStompConnect(command); 264 } else if (action.startsWith(Stomp.Commands.DISCONNECT)) { 265 onStompDisconnect(command); 266 } else { 267 throw new ProtocolException("Unknown STOMP action: " + action); 268 } 269 270 } catch (ProtocolException e) { 271 handleException(e, command); 272 // Some protocol errors can cause the connection to get closed. 273 if (e.isFatal()) { 274 getStompTransport().onException(e); 275 } 276 } 277 } 278 279 protected void handleException(Throwable exception, StompFrame command) throws IOException { 280 LOG.warn("Exception occurred processing: \n" + command + ": " + exception.toString()); 281 if (LOG.isDebugEnabled()) { 282 LOG.debug("Exception detail", exception); 283 } 284 285 // Let the stomp client know about any protocol errors. 286 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 287 PrintWriter stream = new PrintWriter(new OutputStreamWriter(baos, "UTF-8")); 288 exception.printStackTrace(stream); 289 stream.close(); 290 291 HashMap<String, String> headers = new HashMap<String, String>(); 292 headers.put(Stomp.Headers.Error.MESSAGE, exception.getMessage()); 293 headers.put(Stomp.Headers.CONTENT_TYPE, "text/plain"); 294 295 if (command != null) { 296 final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED); 297 if (receiptId != null) { 298 headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId); 299 } 300 } 301 302 StompFrame errorMessage = new StompFrame(Stomp.Responses.ERROR, headers, baos.toByteArray()); 303 sendToStomp(errorMessage); 304 } 305 306 protected void onStompSend(StompFrame command) throws IOException, JMSException { 307 checkConnected(); 308 309 Map<String, String> headers = command.getHeaders(); 310 String destination = headers.get(Stomp.Headers.Send.DESTINATION); 311 if (destination == null) { 312 throw new ProtocolException("SEND received without a Destination specified!"); 313 } 314 315 String stompTx = headers.get(Stomp.Headers.TRANSACTION); 316 headers.remove("transaction"); 317 318 ActiveMQMessage message = convertMessage(command); 319 320 message.setProducerId(producerId); 321 MessageId id = new MessageId(producerId, messageIdGenerator.getNextSequenceId()); 322 message.setMessageId(id); 323 324 if (stompTx != null) { 325 TransactionId activemqTx = transactions.get(stompTx); 326 if (activemqTx == null) { 327 throw new ProtocolException("Invalid transaction id: " + stompTx); 328 } 329 message.setTransactionId(activemqTx); 330 } 331 332 message.onSend(); 333 sendToActiveMQ(message, createResponseHandler(command)); 334 } 335 336 protected void onStompNack(StompFrame command) throws ProtocolException { 337 338 checkConnected(); 339 340 if (this.version.equals(Stomp.V1_0)) { 341 throw new ProtocolException("NACK received but connection is in v1.0 mode."); 342 } 343 344 Map<String, String> headers = command.getHeaders(); 345 346 String subscriptionId = headers.get(Stomp.Headers.Ack.SUBSCRIPTION); 347 if (subscriptionId == null && !this.version.equals(Stomp.V1_2)) { 348 throw new ProtocolException("NACK received without a subscription id for acknowledge!"); 349 } 350 351 String messageId = headers.get(Stomp.Headers.Ack.MESSAGE_ID); 352 if (messageId == null && !this.version.equals(Stomp.V1_2)) { 353 throw new ProtocolException("NACK received without a message-id to acknowledge!"); 354 } 355 356 String ackId = headers.get(Stomp.Headers.Ack.ACK_ID); 357 if (ackId == null && this.version.equals(Stomp.V1_2)) { 358 throw new ProtocolException("NACK received without an ack header to acknowledge!"); 359 } 360 361 TransactionId activemqTx = null; 362 String stompTx = headers.get(Stomp.Headers.TRANSACTION); 363 if (stompTx != null) { 364 activemqTx = transactions.get(stompTx); 365 if (activemqTx == null) { 366 throw new ProtocolException("Invalid transaction id: " + stompTx); 367 } 368 } 369 370 boolean nacked = false; 371 372 if (ackId != null) { 373 AckEntry pendingAck = this.pedingAcks.remove(ackId); 374 if (pendingAck != null) { 375 messageId = pendingAck.getMessageId(); 376 MessageAck ack = pendingAck.onMessageNack(activemqTx); 377 if (ack != null) { 378 sendToActiveMQ(ack, createResponseHandler(command)); 379 nacked = true; 380 } 381 } 382 } else if (subscriptionId != null) { 383 StompSubscription sub = this.subscriptions.get(subscriptionId); 384 if (sub != null) { 385 MessageAck ack = sub.onStompMessageNack(messageId, activemqTx); 386 if (ack != null) { 387 sendToActiveMQ(ack, createResponseHandler(command)); 388 nacked = true; 389 } 390 } 391 } 392 393 if (!nacked) { 394 throw new ProtocolException("Unexpected NACK received for message-id [" + messageId + "]"); 395 } 396 } 397 398 protected void onStompAck(StompFrame command) throws ProtocolException { 399 checkConnected(); 400 401 Map<String, String> headers = command.getHeaders(); 402 String messageId = headers.get(Stomp.Headers.Ack.MESSAGE_ID); 403 if (messageId == null && !(this.version.equals(Stomp.V1_2))) { 404 throw new ProtocolException("ACK received without a message-id to acknowledge!"); 405 } 406 407 String subscriptionId = headers.get(Stomp.Headers.Ack.SUBSCRIPTION); 408 if (subscriptionId == null && this.version.equals(Stomp.V1_1)) { 409 throw new ProtocolException("ACK received without a subscription id for acknowledge!"); 410 } 411 412 String ackId = headers.get(Stomp.Headers.Ack.ACK_ID); 413 if (ackId == null && this.version.equals(Stomp.V1_2)) { 414 throw new ProtocolException("ACK received without a ack id for acknowledge!"); 415 } 416 417 TransactionId activemqTx = null; 418 String stompTx = headers.get(Stomp.Headers.TRANSACTION); 419 if (stompTx != null) { 420 activemqTx = transactions.get(stompTx); 421 if (activemqTx == null) { 422 throw new ProtocolException("Invalid transaction id: " + stompTx); 423 } 424 } 425 426 boolean acked = false; 427 428 if (ackId != null) { 429 AckEntry pendingAck = this.pedingAcks.remove(ackId); 430 if (pendingAck != null) { 431 messageId = pendingAck.getMessageId(); 432 MessageAck ack = pendingAck.onMessageAck(activemqTx); 433 if (ack != null) { 434 sendToActiveMQ(ack, createResponseHandler(command)); 435 acked = true; 436 } 437 } 438 439 } else if (subscriptionId != null) { 440 StompSubscription sub = this.subscriptions.get(subscriptionId); 441 if (sub != null) { 442 MessageAck ack = sub.onStompMessageAck(messageId, activemqTx); 443 if (ack != null) { 444 sendToActiveMQ(ack, createResponseHandler(command)); 445 acked = true; 446 } 447 } 448 } else { 449 // STOMP v1.0: acking with just a message id is very bogus since the same message id 450 // could have been sent to 2 different subscriptions on the same Stomp connection. 451 // For example, when 2 subs are created on the same topic. 452 for (StompSubscription sub : subscriptionsByConsumerId.values()) { 453 MessageAck ack = sub.onStompMessageAck(messageId, activemqTx); 454 if (ack != null) { 455 sendToActiveMQ(ack, createResponseHandler(command)); 456 acked = true; 457 break; 458 } 459 } 460 } 461 462 if (!acked) { 463 throw new ProtocolException("Unexpected ACK received for message-id [" + messageId + "]"); 464 } 465 } 466 467 protected void onStompBegin(StompFrame command) throws ProtocolException { 468 checkConnected(); 469 470 Map<String, String> headers = command.getHeaders(); 471 472 String stompTx = headers.get(Stomp.Headers.TRANSACTION); 473 474 if (!headers.containsKey(Stomp.Headers.TRANSACTION)) { 475 throw new ProtocolException("Must specify the transaction you are beginning"); 476 } 477 478 if (transactions.get(stompTx) != null) { 479 throw new ProtocolException("The transaction was already started: " + stompTx); 480 } 481 482 LocalTransactionId activemqTx = new LocalTransactionId(connectionId, transactionIdGenerator.getNextSequenceId()); 483 transactions.put(stompTx, activemqTx); 484 485 TransactionInfo tx = new TransactionInfo(); 486 tx.setConnectionId(connectionId); 487 tx.setTransactionId(activemqTx); 488 tx.setType(TransactionInfo.BEGIN); 489 490 sendToActiveMQ(tx, createResponseHandler(command)); 491 } 492 493 protected void onStompCommit(StompFrame command) throws ProtocolException { 494 checkConnected(); 495 496 Map<String, String> headers = command.getHeaders(); 497 498 String stompTx = headers.get(Stomp.Headers.TRANSACTION); 499 if (stompTx == null) { 500 throw new ProtocolException("Must specify the transaction you are committing"); 501 } 502 503 TransactionId activemqTx = transactions.remove(stompTx); 504 if (activemqTx == null) { 505 throw new ProtocolException("Invalid transaction id: " + stompTx); 506 } 507 508 for (StompSubscription sub : subscriptionsByConsumerId.values()) { 509 sub.onStompCommit(activemqTx); 510 } 511 512 pedingAcks.clear(); 513 514 TransactionInfo tx = new TransactionInfo(); 515 tx.setConnectionId(connectionId); 516 tx.setTransactionId(activemqTx); 517 tx.setType(TransactionInfo.COMMIT_ONE_PHASE); 518 519 sendToActiveMQ(tx, createResponseHandler(command)); 520 } 521 522 protected void onStompAbort(StompFrame command) throws ProtocolException { 523 checkConnected(); 524 Map<String, String> headers = command.getHeaders(); 525 526 String stompTx = headers.get(Stomp.Headers.TRANSACTION); 527 if (stompTx == null) { 528 throw new ProtocolException("Must specify the transaction you are committing"); 529 } 530 531 TransactionId activemqTx = transactions.remove(stompTx); 532 if (activemqTx == null) { 533 throw new ProtocolException("Invalid transaction id: " + stompTx); 534 } 535 for (StompSubscription sub : subscriptionsByConsumerId.values()) { 536 try { 537 sub.onStompAbort(activemqTx); 538 } catch (Exception e) { 539 throw new ProtocolException("Transaction abort failed", false, e); 540 } 541 } 542 543 pedingAcks.clear(); 544 545 TransactionInfo tx = new TransactionInfo(); 546 tx.setConnectionId(connectionId); 547 tx.setTransactionId(activemqTx); 548 tx.setType(TransactionInfo.ROLLBACK); 549 550 sendToActiveMQ(tx, createResponseHandler(command)); 551 } 552 553 protected void onStompSubscribe(StompFrame command) throws ProtocolException { 554 checkConnected(); 555 FrameTranslator translator = findTranslator(command.getHeaders().get(Stomp.Headers.TRANSFORMATION)); 556 Map<String, String> headers = command.getHeaders(); 557 558 String subscriptionId = headers.get(Stomp.Headers.Subscribe.ID); 559 String destination = headers.get(Stomp.Headers.Subscribe.DESTINATION); 560 561 if (!this.version.equals(Stomp.V1_0) && subscriptionId == null) { 562 throw new ProtocolException("SUBSCRIBE received without a subscription id!"); 563 } 564 565 final ActiveMQDestination actualDest = translator.convertDestination(this, destination, true); 566 567 if (actualDest == null) { 568 throw new ProtocolException("Invalid 'null' Destination."); 569 } 570 571 final ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId()); 572 ConsumerInfo consumerInfo = new ConsumerInfo(id); 573 consumerInfo.setPrefetchSize(actualDest.isQueue() ? 574 ActiveMQPrefetchPolicy.DEFAULT_QUEUE_PREFETCH : 575 headers.containsKey("activemq.subscriptionName") ? 576 ActiveMQPrefetchPolicy.DEFAULT_DURABLE_TOPIC_PREFETCH : ActiveMQPrefetchPolicy.DEFAULT_TOPIC_PREFETCH); 577 consumerInfo.setDispatchAsync(true); 578 579 String browser = headers.get(Stomp.Headers.Subscribe.BROWSER); 580 if (browser != null && browser.equals(Stomp.TRUE)) { 581 582 if (this.version.equals(Stomp.V1_0)) { 583 throw new ProtocolException("Queue Browser feature only valid for Stomp v1.1+ clients!"); 584 } 585 586 consumerInfo.setBrowser(true); 587 consumerInfo.setPrefetchSize(ActiveMQPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH); 588 } 589 590 String selector = headers.remove(Stomp.Headers.Subscribe.SELECTOR); 591 if (selector != null) { 592 consumerInfo.setSelector("convert_string_expressions:" + selector); 593 } 594 595 IntrospectionSupport.setProperties(consumerInfo, headers, "activemq."); 596 597 if (actualDest.isQueue() && consumerInfo.getSubscriptionName() != null) { 598 throw new ProtocolException("Invalid Subscription: cannot durably subscribe to a Queue destination!"); 599 } 600 601 consumerInfo.setDestination(translator.convertDestination(this, destination, true)); 602 603 StompSubscription stompSubscription; 604 if (!consumerInfo.isBrowser()) { 605 stompSubscription = new StompSubscription(this, subscriptionId, consumerInfo, headers.get(Stomp.Headers.TRANSFORMATION)); 606 } else { 607 stompSubscription = new StompQueueBrowserSubscription(this, subscriptionId, consumerInfo, headers.get(Stomp.Headers.TRANSFORMATION)); 608 } 609 stompSubscription.setDestination(actualDest); 610 611 String ackMode = headers.get(Stomp.Headers.Subscribe.ACK_MODE); 612 if (Stomp.Headers.Subscribe.AckModeValues.CLIENT.equals(ackMode)) { 613 stompSubscription.setAckMode(StompSubscription.CLIENT_ACK); 614 } else if (Stomp.Headers.Subscribe.AckModeValues.INDIVIDUAL.equals(ackMode)) { 615 stompSubscription.setAckMode(StompSubscription.INDIVIDUAL_ACK); 616 } else { 617 stompSubscription.setAckMode(StompSubscription.AUTO_ACK); 618 } 619 620 subscriptionsByConsumerId.put(id, stompSubscription); 621 // Stomp v1.0 doesn't need to set this header so we avoid an NPE if not set. 622 if (subscriptionId != null) { 623 subscriptions.put(subscriptionId, stompSubscription); 624 } 625 626 final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED); 627 if (receiptId != null && consumerInfo.getPrefetchSize() > 0) { 628 629 final StompFrame cmd = command; 630 final int prefetch = consumerInfo.getPrefetchSize(); 631 632 // Since dispatch could beat the receipt we set prefetch to zero to start and then 633 // once we've sent our Receipt we are safe to turn on dispatch if the response isn't 634 // an error message. 635 consumerInfo.setPrefetchSize(0); 636 637 final ResponseHandler handler = new ResponseHandler() { 638 @Override 639 public void onResponse(ProtocolConverter converter, Response response) throws IOException { 640 if (response.isException()) { 641 // Generally a command can fail.. but that does not invalidate the connection. 642 // We report back the failure but we don't close the connection. 643 Throwable exception = ((ExceptionResponse)response).getException(); 644 handleException(exception, cmd); 645 } else { 646 StompFrame sc = new StompFrame(); 647 sc.setAction(Stomp.Responses.RECEIPT); 648 sc.setHeaders(new HashMap<String, String>(1)); 649 sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId); 650 stompTransport.sendToStomp(sc); 651 652 ConsumerControl control = new ConsumerControl(); 653 control.setPrefetch(prefetch); 654 control.setDestination(actualDest); 655 control.setConsumerId(id); 656 657 sendToActiveMQ(control, null); 658 } 659 } 660 }; 661 662 sendToActiveMQ(consumerInfo, handler); 663 } else { 664 sendToActiveMQ(consumerInfo, createResponseHandler(command)); 665 } 666 } 667 668 protected void onStompUnsubscribe(StompFrame command) throws ProtocolException { 669 checkConnected(); 670 Map<String, String> headers = command.getHeaders(); 671 672 ActiveMQDestination destination = null; 673 Object o = headers.get(Stomp.Headers.Unsubscribe.DESTINATION); 674 if (o != null) { 675 destination = findTranslator(command.getHeaders().get(Stomp.Headers.TRANSFORMATION)).convertDestination(this, (String)o, true); 676 } 677 678 String subscriptionId = headers.get(Stomp.Headers.Unsubscribe.ID); 679 if (!this.version.equals(Stomp.V1_0) && subscriptionId == null) { 680 throw new ProtocolException("UNSUBSCRIBE received without a subscription id!"); 681 } 682 683 if (subscriptionId == null && destination == null) { 684 throw new ProtocolException("Must specify the subscriptionId or the destination you are unsubscribing from"); 685 } 686 687 // check if it is a durable subscription 688 String durable = command.getHeaders().get("activemq.subscriptionName"); 689 String clientId = durable; 690 if (!this.version.equals(Stomp.V1_0)) { 691 clientId = connectionInfo.getClientId(); 692 } 693 694 if (durable != null) { 695 RemoveSubscriptionInfo info = new RemoveSubscriptionInfo(); 696 info.setClientId(clientId); 697 info.setSubscriptionName(durable); 698 info.setConnectionId(connectionId); 699 sendToActiveMQ(info, createResponseHandler(command)); 700 return; 701 } 702 703 if (subscriptionId != null) { 704 705 StompSubscription sub = this.subscriptions.remove(subscriptionId); 706 if (sub != null) { 707 sendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(), createResponseHandler(command)); 708 return; 709 } 710 711 } else { 712 713 // Unsubscribing using a destination is a bit weird if multiple subscriptions 714 // are created with the same destination. 715 for (Iterator<StompSubscription> iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) { 716 StompSubscription sub = iter.next(); 717 if (destination != null && destination.equals(sub.getDestination())) { 718 sendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(), createResponseHandler(command)); 719 iter.remove(); 720 return; 721 } 722 } 723 } 724 725 throw new ProtocolException("No subscription matched."); 726 } 727 728 ConnectionInfo connectionInfo = new ConnectionInfo(); 729 730 protected void onStompConnect(final StompFrame command) throws ProtocolException { 731 732 if (connected.get()) { 733 throw new ProtocolException("Already connected."); 734 } 735 736 final Map<String, String> headers = command.getHeaders(); 737 738 // allow anyone to login for now 739 String login = headers.get(Stomp.Headers.Connect.LOGIN); 740 String passcode = headers.get(Stomp.Headers.Connect.PASSCODE); 741 String clientId = headers.get(Stomp.Headers.Connect.CLIENT_ID); 742 String heartBeat = headers.get(Stomp.Headers.Connect.HEART_BEAT); 743 744 if (heartBeat == null) { 745 heartBeat = defaultHeartBeat; 746 } 747 748 this.version = StompCodec.detectVersion(headers); 749 750 configureInactivityMonitor(heartBeat.trim()); 751 752 IntrospectionSupport.setProperties(connectionInfo, headers, "activemq."); 753 connectionInfo.setConnectionId(connectionId); 754 if (clientId != null) { 755 connectionInfo.setClientId(clientId); 756 } else { 757 connectionInfo.setClientId("" + connectionInfo.getConnectionId().toString()); 758 } 759 760 connectionInfo.setResponseRequired(true); 761 connectionInfo.setUserName(login); 762 connectionInfo.setPassword(passcode); 763 connectionInfo.setTransportContext(command.getTransportContext()); 764 765 sendToActiveMQ(connectionInfo, new ResponseHandler() { 766 @Override 767 public void onResponse(ProtocolConverter converter, Response response) throws IOException { 768 769 if (response.isException()) { 770 // If the connection attempt fails we close the socket. 771 Throwable exception = ((ExceptionResponse)response).getException(); 772 handleException(exception, command); 773 getStompTransport().onException(IOExceptionSupport.create(exception)); 774 return; 775 } 776 777 final SessionInfo sessionInfo = new SessionInfo(sessionId); 778 sendToActiveMQ(sessionInfo, null); 779 780 final ProducerInfo producerInfo = new ProducerInfo(producerId); 781 sendToActiveMQ(producerInfo, new ResponseHandler() { 782 @Override 783 public void onResponse(ProtocolConverter converter, Response response) throws IOException { 784 785 if (response.isException()) { 786 // If the connection attempt fails we close the socket. 787 Throwable exception = ((ExceptionResponse)response).getException(); 788 handleException(exception, command); 789 getStompTransport().onException(IOExceptionSupport.create(exception)); 790 } 791 792 connected.set(true); 793 HashMap<String, String> responseHeaders = new HashMap<String, String>(); 794 795 responseHeaders.put(Stomp.Headers.Connected.SESSION, connectionInfo.getClientId()); 796 String requestId = headers.get(Stomp.Headers.Connect.REQUEST_ID); 797 if (requestId == null) { 798 // TODO legacy 799 requestId = headers.get(Stomp.Headers.RECEIPT_REQUESTED); 800 } 801 if (requestId != null) { 802 // TODO legacy 803 responseHeaders.put(Stomp.Headers.Connected.RESPONSE_ID, requestId); 804 responseHeaders.put(Stomp.Headers.Response.RECEIPT_ID, requestId); 805 } 806 807 responseHeaders.put(Stomp.Headers.Connected.VERSION, version); 808 responseHeaders.put(Stomp.Headers.Connected.HEART_BEAT, 809 String.format("%d,%d", hbWriteInterval, hbReadInterval)); 810 responseHeaders.put(Stomp.Headers.Connected.SERVER, "ActiveMQ/"+BROKER_VERSION); 811 812 StompFrame sc = new StompFrame(); 813 sc.setAction(Stomp.Responses.CONNECTED); 814 sc.setHeaders(responseHeaders); 815 sendToStomp(sc); 816 817 StompWireFormat format = stompTransport.getWireFormat(); 818 if (format != null) { 819 format.setStompVersion(version); 820 } 821 } 822 }); 823 } 824 }); 825 } 826 827 protected void onStompDisconnect(StompFrame command) throws ProtocolException { 828 if (connected.get()) { 829 sendToActiveMQ(connectionInfo.createRemoveCommand(), createResponseHandler(command)); 830 sendToActiveMQ(new ShutdownInfo(), createResponseHandler(command)); 831 connected.set(false); 832 } 833 } 834 835 protected void checkConnected() throws ProtocolException { 836 if (!connected.get()) { 837 throw new ProtocolException("Not connected."); 838 } 839 } 840 841 /** 842 * Dispatch a ActiveMQ command 843 * 844 * @param command 845 * @throws IOException 846 */ 847 public void onActiveMQCommand(Command command) throws IOException, JMSException { 848 if (command.isResponse()) { 849 Response response = (Response)command; 850 ResponseHandler rh = resposeHandlers.remove(Integer.valueOf(response.getCorrelationId())); 851 if (rh != null) { 852 rh.onResponse(this, response); 853 } else { 854 // Pass down any unexpected errors. Should this close the connection? 855 if (response.isException()) { 856 Throwable exception = ((ExceptionResponse)response).getException(); 857 handleException(exception, null); 858 } 859 } 860 } else if (command.isMessageDispatch()) { 861 MessageDispatch md = (MessageDispatch)command; 862 StompSubscription sub = subscriptionsByConsumerId.get(md.getConsumerId()); 863 if (sub != null) { 864 String ackId = null; 865 if (version.equals(Stomp.V1_2) && sub.getAckMode() != Stomp.Headers.Subscribe.AckModeValues.AUTO && md.getMessage() != null) { 866 AckEntry pendingAck = new AckEntry(md.getMessage().getMessageId().toString(), sub); 867 ackId = this.ACK_ID_GENERATOR.generateId(); 868 this.pedingAcks.put(ackId, pendingAck); 869 } 870 try { 871 sub.onMessageDispatch(md, ackId); 872 } catch (Exception ex) { 873 if (ackId != null) { 874 this.pedingAcks.remove(ackId); 875 } 876 } 877 } 878 } else if (command.getDataStructureType() == CommandTypes.KEEP_ALIVE_INFO) { 879 stompTransport.sendToStomp(ping); 880 } else if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE) { 881 // Pass down any unexpected async errors. Should this close the connection? 882 Throwable exception = ((ConnectionError)command).getException(); 883 handleException(exception, null); 884 } 885 } 886 887 public ActiveMQMessage convertMessage(StompFrame command) throws IOException, JMSException { 888 ActiveMQMessage msg = findTranslator(command.getHeaders().get(Stomp.Headers.TRANSFORMATION)).convertFrame(this, command); 889 return msg; 890 } 891 892 public StompFrame convertMessage(ActiveMQMessage message, boolean ignoreTransformation) throws IOException, JMSException { 893 if (ignoreTransformation == true) { 894 return frameTranslator.convertMessage(this, message); 895 } else { 896 FrameTranslator translator = findTranslator( 897 message.getStringProperty(Stomp.Headers.TRANSFORMATION), message.getDestination(), message.isAdvisory()); 898 return translator.convertMessage(this, message); 899 } 900 } 901 902 public StompTransport getStompTransport() { 903 return stompTransport; 904 } 905 906 public ActiveMQDestination createTempDestination(String name, boolean topic) { 907 ActiveMQDestination rc = tempDestinations.get(name); 908 if( rc == null ) { 909 if (topic) { 910 rc = new ActiveMQTempTopic(connectionId, tempDestinationGenerator.getNextSequenceId()); 911 } else { 912 rc = new ActiveMQTempQueue(connectionId, tempDestinationGenerator.getNextSequenceId()); 913 } 914 sendToActiveMQ(new DestinationInfo(connectionId, DestinationInfo.ADD_OPERATION_TYPE, rc), null); 915 tempDestinations.put(name, rc); 916 tempDestinationAmqToStompMap.put(rc.getQualifiedName(), name); 917 } 918 return rc; 919 } 920 921 public String getCreatedTempDestinationName(ActiveMQDestination destination) { 922 return tempDestinationAmqToStompMap.get(destination.getQualifiedName()); 923 } 924 925 public String getDefaultHeartBeat() { 926 return defaultHeartBeat; 927 } 928 929 public void setDefaultHeartBeat(String defaultHeartBeat) { 930 this.defaultHeartBeat = defaultHeartBeat; 931 } 932 933 /** 934 * @return the hbGracePeriodMultiplier 935 */ 936 public float getHbGracePeriodMultiplier() { 937 return hbGracePeriodMultiplier; 938 } 939 940 /** 941 * @param hbGracePeriodMultiplier the hbGracePeriodMultiplier to set 942 */ 943 public void setHbGracePeriodMultiplier(float hbGracePeriodMultiplier) { 944 this.hbGracePeriodMultiplier = hbGracePeriodMultiplier; 945 } 946 947 protected void configureInactivityMonitor(String heartBeatConfig) throws ProtocolException { 948 949 String[] keepAliveOpts = heartBeatConfig.split(Stomp.COMMA); 950 951 if (keepAliveOpts == null || keepAliveOpts.length != 2) { 952 throw new ProtocolException("Invalid heart-beat header:" + heartBeatConfig, true); 953 } else { 954 955 try { 956 hbReadInterval = (Long.parseLong(keepAliveOpts[0])); 957 hbWriteInterval = Long.parseLong(keepAliveOpts[1]); 958 } catch(NumberFormatException e) { 959 throw new ProtocolException("Invalid heart-beat header:" + heartBeatConfig, true); 960 } 961 962 try { 963 StompInactivityMonitor monitor = this.stompTransport.getInactivityMonitor(); 964 monitor.setReadCheckTime((long) (hbReadInterval * hbGracePeriodMultiplier)); 965 monitor.setInitialDelayTime(Math.min(hbReadInterval, hbWriteInterval)); 966 monitor.setWriteCheckTime(hbWriteInterval); 967 monitor.startMonitoring(); 968 } catch(Exception ex) { 969 hbReadInterval = 0; 970 hbWriteInterval = 0; 971 } 972 973 if (LOG.isDebugEnabled()) { 974 LOG.debug("Stomp Connect heartbeat conf RW[" + hbReadInterval + "," + hbWriteInterval + "]"); 975 } 976 } 977 } 978 979 protected void sendReceipt(StompFrame command) { 980 final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED); 981 if (receiptId != null) { 982 StompFrame sc = new StompFrame(); 983 sc.setAction(Stomp.Responses.RECEIPT); 984 sc.setHeaders(new HashMap<String, String>(1)); 985 sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId); 986 try { 987 sendToStomp(sc); 988 } catch (IOException e) { 989 LOG.warn("Could not send a receipt for " + command, e); 990 } 991 } 992 } 993}