001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq; 018 019import java.io.File; 020import java.io.InputStream; 021import java.io.Serializable; 022import java.net.URL; 023import java.util.Collections; 024import java.util.Iterator; 025import java.util.List; 026import java.util.concurrent.CopyOnWriteArrayList; 027import java.util.concurrent.ThreadPoolExecutor; 028import java.util.concurrent.atomic.AtomicBoolean; 029import java.util.concurrent.atomic.AtomicInteger; 030 031import javax.jms.BytesMessage; 032import javax.jms.Destination; 033import javax.jms.IllegalStateException; 034import javax.jms.InvalidDestinationException; 035import javax.jms.InvalidSelectorException; 036import javax.jms.JMSException; 037import javax.jms.MapMessage; 038import javax.jms.Message; 039import javax.jms.MessageConsumer; 040import javax.jms.MessageListener; 041import javax.jms.MessageProducer; 042import javax.jms.ObjectMessage; 043import javax.jms.Queue; 044import javax.jms.QueueBrowser; 045import javax.jms.QueueReceiver; 046import javax.jms.QueueSender; 047import javax.jms.QueueSession; 048import javax.jms.Session; 049import javax.jms.StreamMessage; 050import 
javax.jms.TemporaryQueue; 051import javax.jms.TemporaryTopic; 052import javax.jms.TextMessage; 053import javax.jms.Topic; 054import javax.jms.TopicPublisher; 055import javax.jms.TopicSession; 056import javax.jms.TopicSubscriber; 057import javax.jms.TransactionRolledBackException; 058 059import org.apache.activemq.blob.BlobDownloader; 060import org.apache.activemq.blob.BlobTransferPolicy; 061import org.apache.activemq.blob.BlobUploader; 062import org.apache.activemq.command.ActiveMQBlobMessage; 063import org.apache.activemq.command.ActiveMQBytesMessage; 064import org.apache.activemq.command.ActiveMQDestination; 065import org.apache.activemq.command.ActiveMQMapMessage; 066import org.apache.activemq.command.ActiveMQMessage; 067import org.apache.activemq.command.ActiveMQObjectMessage; 068import org.apache.activemq.command.ActiveMQQueue; 069import org.apache.activemq.command.ActiveMQStreamMessage; 070import org.apache.activemq.command.ActiveMQTempDestination; 071import org.apache.activemq.command.ActiveMQTempQueue; 072import org.apache.activemq.command.ActiveMQTempTopic; 073import org.apache.activemq.command.ActiveMQTextMessage; 074import org.apache.activemq.command.ActiveMQTopic; 075import org.apache.activemq.command.Command; 076import org.apache.activemq.command.ConsumerId; 077import org.apache.activemq.command.MessageAck; 078import org.apache.activemq.command.MessageDispatch; 079import org.apache.activemq.command.MessageId; 080import org.apache.activemq.command.ProducerId; 081import org.apache.activemq.command.RemoveInfo; 082import org.apache.activemq.command.Response; 083import org.apache.activemq.command.SessionId; 084import org.apache.activemq.command.SessionInfo; 085import org.apache.activemq.command.TransactionId; 086import org.apache.activemq.management.JMSSessionStatsImpl; 087import org.apache.activemq.management.StatsCapable; 088import org.apache.activemq.management.StatsImpl; 089import org.apache.activemq.thread.Scheduler; 090import 
org.apache.activemq.transaction.Synchronization; 091import org.apache.activemq.usage.MemoryUsage; 092import org.apache.activemq.util.Callback; 093import org.apache.activemq.util.LongSequenceGenerator; 094import org.slf4j.Logger; 095import org.slf4j.LoggerFactory; 096 097/** 098 * <P> 099 * A <CODE>Session</CODE> object is a single-threaded context for producing 100 * and consuming messages. Although it may allocate provider resources outside 101 * the Java virtual machine (JVM), it is considered a lightweight JMS object. 102 * <P> 103 * A session serves several purposes: 104 * <UL> 105 * <LI>It is a factory for its message producers and consumers. 106 * <LI>It supplies provider-optimized message factories. 107 * <LI>It is a factory for <CODE>TemporaryTopics</CODE> and 108 * <CODE>TemporaryQueues</CODE>. 109 * <LI>It provides a way to create <CODE>Queue</CODE> or <CODE>Topic</CODE> 110 * objects for those clients that need to dynamically manipulate 111 * provider-specific destination names. 112 * <LI>It supports a single series of transactions that combine work spanning 113 * its producers and consumers into atomic units. 114 * <LI>It defines a serial order for the messages it consumes and the messages 115 * it produces. 116 * <LI>It retains messages it consumes until they have been acknowledged. 117 * <LI>It serializes execution of message listeners registered with its message 118 * consumers. 119 * <LI>It is a factory for <CODE>QueueBrowsers</CODE>. 120 * </UL> 121 * <P> 122 * A session can create and service multiple message producers and consumers. 123 * <P> 124 * One typical use is to have a thread block on a synchronous 125 * <CODE>MessageConsumer</CODE> until a message arrives. The thread may then 126 * use one or more of the <CODE>Session</CODE>'s<CODE>MessageProducer</CODE>s. 127 * <P> 128 * If a client desires to have one thread produce messages while others consume 129 * them, the client should use a separate session for its producing thread. 
130 * <P> 131 * Once a connection has been started, any session with one or more registered 132 * message listeners is dedicated to the thread of control that delivers 133 * messages to it. It is erroneous for client code to use this session or any of 134 * its constituent objects from another thread of control. The only exception to 135 * this rule is the use of the session or connection <CODE>close</CODE> 136 * method. 137 * <P> 138 * It should be easy for most clients to partition their work naturally into 139 * sessions. This model allows clients to start simply and incrementally add 140 * message processing complexity as their need for concurrency grows. 141 * <P> 142 * The <CODE>close</CODE> method is the only session method that can be called 143 * while some other session method is being executed in another thread. 144 * <P> 145 * A session may be specified as transacted. Each transacted session supports a 146 * single series of transactions. Each transaction groups a set of message sends 147 * and a set of message receives into an atomic unit of work. In effect, 148 * transactions organize a session's input message stream and output message 149 * stream into series of atomic units. When a transaction commits, its atomic 150 * unit of input is acknowledged and its associated atomic unit of output is 151 * sent. If a transaction rollback is done, the transaction's sent messages are 152 * destroyed and the session's input is automatically recovered. 153 * <P> 154 * The content of a transaction's input and output units is simply those 155 * messages that have been produced and consumed within the session's current 156 * transaction. 157 * <P> 158 * A transaction is completed using either its session's <CODE>commit</CODE> 159 * method or its session's <CODE>rollback </CODE> method. The completion of a 160 * session's current transaction automatically begins the next. 
The result is 161 * that a transacted session always has a current transaction within which its 162 * work is done. 163 * <P> 164 * The Java Transaction Service (JTS) or some other transaction monitor may be 165 * used to combine a session's transaction with transactions on other resources 166 * (databases, other JMS sessions, etc.). Since Java distributed transactions 167 * are controlled via the Java Transaction API (JTA), use of the session's 168 * <CODE>commit</CODE> and <CODE>rollback</CODE> methods in this context is 169 * prohibited. 170 * <P> 171 * The JMS API does not require support for JTA; however, it does define how a 172 * provider supplies this support. 173 * <P> 174 * Although it is also possible for a JMS client to handle distributed 175 * transactions directly, it is unlikely that many JMS clients will do this. 176 * Support for JTA in the JMS API is targeted at systems vendors who will be 177 * integrating the JMS API into their application server products. 178 * 179 * 180 * @see javax.jms.Session 181 * @see javax.jms.QueueSession 182 * @see javax.jms.TopicSession 183 * @see javax.jms.XASession 184 */ 185public class ActiveMQSession implements Session, QueueSession, TopicSession, StatsCapable, ActiveMQDispatcher { 186 187 /** 188 * Only acknowledge an individual message - using message.acknowledge() 189 * as opposed to CLIENT_ACKNOWLEDGE which 190 * acknowledges all messages consumed by a session at when acknowledge() 191 * is called 192 */ 193 public static final int INDIVIDUAL_ACKNOWLEDGE = 4; 194 public static final int MAX_ACK_CONSTANT = INDIVIDUAL_ACKNOWLEDGE; 195 196 public static interface DeliveryListener { 197 void beforeDelivery(ActiveMQSession session, Message msg); 198 199 void afterDelivery(ActiveMQSession session, Message msg); 200 } 201 202 private static final Logger LOG = LoggerFactory.getLogger(ActiveMQSession.class); 203 private final ThreadPoolExecutor connectionExecutor; 204 205 protected int acknowledgementMode; 206 
protected final ActiveMQConnection connection; 207 protected final SessionInfo info; 208 protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator(); 209 protected final LongSequenceGenerator producerIdGenerator = new LongSequenceGenerator(); 210 protected final LongSequenceGenerator deliveryIdGenerator = new LongSequenceGenerator(); 211 protected final ActiveMQSessionExecutor executor; 212 protected final AtomicBoolean started = new AtomicBoolean(false); 213 214 protected final CopyOnWriteArrayList<ActiveMQMessageConsumer> consumers = new CopyOnWriteArrayList<ActiveMQMessageConsumer>(); 215 protected final CopyOnWriteArrayList<ActiveMQMessageProducer> producers = new CopyOnWriteArrayList<ActiveMQMessageProducer>(); 216 217 protected boolean closed; 218 private volatile boolean synchronizationRegistered; 219 protected boolean asyncDispatch; 220 protected boolean sessionAsyncDispatch; 221 protected final boolean debug; 222 protected Object sendMutex = new Object(); 223 private final AtomicBoolean clearInProgress = new AtomicBoolean(); 224 225 private MessageListener messageListener; 226 private final JMSSessionStatsImpl stats; 227 private TransactionContext transactionContext; 228 private DeliveryListener deliveryListener; 229 private MessageTransformer transformer; 230 private BlobTransferPolicy blobTransferPolicy; 231 private long lastDeliveredSequenceId = -2; 232 233 /** 234 * Construct the Session 235 * 236 * @param connection 237 * @param sessionId 238 * @param acknowledgeMode n.b if transacted - the acknowledgeMode == 239 * Session.SESSION_TRANSACTED 240 * @param asyncDispatch 241 * @param sessionAsyncDispatch 242 * @throws JMSException on internal error 243 */ 244 protected ActiveMQSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode, boolean asyncDispatch, boolean sessionAsyncDispatch) throws JMSException { 245 this.debug = LOG.isDebugEnabled(); 246 this.connection = connection; 247 
this.acknowledgementMode = acknowledgeMode; 248 this.asyncDispatch = asyncDispatch; 249 this.sessionAsyncDispatch = sessionAsyncDispatch; 250 this.info = new SessionInfo(connection.getConnectionInfo(), sessionId.getValue()); 251 setTransactionContext(new TransactionContext(connection)); 252 stats = new JMSSessionStatsImpl(producers, consumers); 253 this.connection.asyncSendPacket(info); 254 setTransformer(connection.getTransformer()); 255 setBlobTransferPolicy(connection.getBlobTransferPolicy()); 256 this.connectionExecutor=connection.getExecutor(); 257 this.executor = new ActiveMQSessionExecutor(this); 258 connection.addSession(this); 259 if (connection.isStarted()) { 260 start(); 261 } 262 263 } 264 265 protected ActiveMQSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode, boolean asyncDispatch) throws JMSException { 266 this(connection, sessionId, acknowledgeMode, asyncDispatch, true); 267 } 268 269 /** 270 * Sets the transaction context of the session. 271 * 272 * @param transactionContext - provides the means to control a JMS 273 * transaction. 274 */ 275 public void setTransactionContext(TransactionContext transactionContext) { 276 this.transactionContext = transactionContext; 277 } 278 279 /** 280 * Returns the transaction context of the session. 281 * 282 * @return transactionContext - session's transaction context. 283 */ 284 public TransactionContext getTransactionContext() { 285 return transactionContext; 286 } 287 288 /* 289 * (non-Javadoc) 290 * 291 * @see org.apache.activemq.management.StatsCapable#getStats() 292 */ 293 @Override 294 public StatsImpl getStats() { 295 return stats; 296 } 297 298 /** 299 * Returns the session's statistics. 300 * 301 * @return stats - session's statistics. 302 */ 303 public JMSSessionStatsImpl getSessionStats() { 304 return stats; 305 } 306 307 /** 308 * Creates a <CODE>BytesMessage</CODE> object. 
A <CODE>BytesMessage</CODE> 309 * object is used to send a message containing a stream of uninterpreted 310 * bytes. 311 * 312 * @return the an ActiveMQBytesMessage 313 * @throws JMSException if the JMS provider fails to create this message due 314 * to some internal error. 315 */ 316 @Override 317 public BytesMessage createBytesMessage() throws JMSException { 318 ActiveMQBytesMessage message = new ActiveMQBytesMessage(); 319 configureMessage(message); 320 return message; 321 } 322 323 /** 324 * Creates a <CODE>MapMessage</CODE> object. A <CODE>MapMessage</CODE> 325 * object is used to send a self-defining set of name-value pairs, where 326 * names are <CODE>String</CODE> objects and values are primitive values 327 * in the Java programming language. 328 * 329 * @return an ActiveMQMapMessage 330 * @throws JMSException if the JMS provider fails to create this message due 331 * to some internal error. 332 */ 333 @Override 334 public MapMessage createMapMessage() throws JMSException { 335 ActiveMQMapMessage message = new ActiveMQMapMessage(); 336 configureMessage(message); 337 return message; 338 } 339 340 /** 341 * Creates a <CODE>Message</CODE> object. The <CODE>Message</CODE> 342 * interface is the root interface of all JMS messages. A 343 * <CODE>Message</CODE> object holds all the standard message header 344 * information. It can be sent when a message containing only header 345 * information is sufficient. 346 * 347 * @return an ActiveMQMessage 348 * @throws JMSException if the JMS provider fails to create this message due 349 * to some internal error. 350 */ 351 @Override 352 public Message createMessage() throws JMSException { 353 ActiveMQMessage message = new ActiveMQMessage(); 354 configureMessage(message); 355 return message; 356 } 357 358 /** 359 * Creates an <CODE>ObjectMessage</CODE> object. An 360 * <CODE>ObjectMessage</CODE> object is used to send a message that 361 * contains a serializable Java object. 
362 * 363 * @return an ActiveMQObjectMessage 364 * @throws JMSException if the JMS provider fails to create this message due 365 * to some internal error. 366 */ 367 @Override 368 public ObjectMessage createObjectMessage() throws JMSException { 369 ActiveMQObjectMessage message = new ActiveMQObjectMessage(); 370 configureMessage(message); 371 return message; 372 } 373 374 /** 375 * Creates an initialized <CODE>ObjectMessage</CODE> object. An 376 * <CODE>ObjectMessage</CODE> object is used to send a message that 377 * contains a serializable Java object. 378 * 379 * @param object the object to use to initialize this message 380 * @return an ActiveMQObjectMessage 381 * @throws JMSException if the JMS provider fails to create this message due 382 * to some internal error. 383 */ 384 @Override 385 public ObjectMessage createObjectMessage(Serializable object) throws JMSException { 386 ActiveMQObjectMessage message = new ActiveMQObjectMessage(); 387 configureMessage(message); 388 message.setObject(object); 389 return message; 390 } 391 392 /** 393 * Creates a <CODE>StreamMessage</CODE> object. A 394 * <CODE>StreamMessage</CODE> object is used to send a self-defining 395 * stream of primitive values in the Java programming language. 396 * 397 * @return an ActiveMQStreamMessage 398 * @throws JMSException if the JMS provider fails to create this message due 399 * to some internal error. 400 */ 401 @Override 402 public StreamMessage createStreamMessage() throws JMSException { 403 ActiveMQStreamMessage message = new ActiveMQStreamMessage(); 404 configureMessage(message); 405 return message; 406 } 407 408 /** 409 * Creates a <CODE>TextMessage</CODE> object. A <CODE>TextMessage</CODE> 410 * object is used to send a message containing a <CODE>String</CODE> 411 * object. 412 * 413 * @return an ActiveMQTextMessage 414 * @throws JMSException if the JMS provider fails to create this message due 415 * to some internal error. 
416 */ 417 @Override 418 public TextMessage createTextMessage() throws JMSException { 419 ActiveMQTextMessage message = new ActiveMQTextMessage(); 420 configureMessage(message); 421 return message; 422 } 423 424 /** 425 * Creates an initialized <CODE>TextMessage</CODE> object. A 426 * <CODE>TextMessage</CODE> object is used to send a message containing a 427 * <CODE>String</CODE>. 428 * 429 * @param text the string used to initialize this message 430 * @return an ActiveMQTextMessage 431 * @throws JMSException if the JMS provider fails to create this message due 432 * to some internal error. 433 */ 434 @Override 435 public TextMessage createTextMessage(String text) throws JMSException { 436 ActiveMQTextMessage message = new ActiveMQTextMessage(); 437 message.setText(text); 438 configureMessage(message); 439 return message; 440 } 441 442 /** 443 * Creates an initialized <CODE>BlobMessage</CODE> object. A 444 * <CODE>BlobMessage</CODE> object is used to send a message containing a 445 * <CODE>URL</CODE> which points to some network addressible BLOB. 446 * 447 * @param url the network addressable URL used to pass directly to the 448 * consumer 449 * @return a BlobMessage 450 * @throws JMSException if the JMS provider fails to create this message due 451 * to some internal error. 452 */ 453 public BlobMessage createBlobMessage(URL url) throws JMSException { 454 return createBlobMessage(url, false); 455 } 456 457 /** 458 * Creates an initialized <CODE>BlobMessage</CODE> object. A 459 * <CODE>BlobMessage</CODE> object is used to send a message containing a 460 * <CODE>URL</CODE> which points to some network addressible BLOB. 
461 * 462 * @param url the network addressable URL used to pass directly to the 463 * consumer 464 * @param deletedByBroker indicates whether or not the resource is deleted 465 * by the broker when the message is acknowledged 466 * @return a BlobMessage 467 * @throws JMSException if the JMS provider fails to create this message due 468 * to some internal error. 469 */ 470 public BlobMessage createBlobMessage(URL url, boolean deletedByBroker) throws JMSException { 471 ActiveMQBlobMessage message = new ActiveMQBlobMessage(); 472 configureMessage(message); 473 message.setURL(url); 474 message.setDeletedByBroker(deletedByBroker); 475 message.setBlobDownloader(new BlobDownloader(getBlobTransferPolicy())); 476 return message; 477 } 478 479 /** 480 * Creates an initialized <CODE>BlobMessage</CODE> object. A 481 * <CODE>BlobMessage</CODE> object is used to send a message containing 482 * the <CODE>File</CODE> content. Before the message is sent the file 483 * conent will be uploaded to the broker or some other remote repository 484 * depending on the {@link #getBlobTransferPolicy()}. 485 * 486 * @param file the file to be uploaded to some remote repo (or the broker) 487 * depending on the strategy 488 * @return a BlobMessage 489 * @throws JMSException if the JMS provider fails to create this message due 490 * to some internal error. 491 */ 492 public BlobMessage createBlobMessage(File file) throws JMSException { 493 ActiveMQBlobMessage message = new ActiveMQBlobMessage(); 494 configureMessage(message); 495 message.setBlobUploader(new BlobUploader(getBlobTransferPolicy(), file)); 496 message.setBlobDownloader(new BlobDownloader((getBlobTransferPolicy()))); 497 message.setDeletedByBroker(true); 498 message.setName(file.getName()); 499 return message; 500 } 501 502 /** 503 * Creates an initialized <CODE>BlobMessage</CODE> object. A 504 * <CODE>BlobMessage</CODE> object is used to send a message containing 505 * the <CODE>File</CODE> content. 
Before the message is sent the file 506 * conent will be uploaded to the broker or some other remote repository 507 * depending on the {@link #getBlobTransferPolicy()}. 508 * 509 * @param in the stream to be uploaded to some remote repo (or the broker) 510 * depending on the strategy 511 * @return a BlobMessage 512 * @throws JMSException if the JMS provider fails to create this message due 513 * to some internal error. 514 */ 515 public BlobMessage createBlobMessage(InputStream in) throws JMSException { 516 ActiveMQBlobMessage message = new ActiveMQBlobMessage(); 517 configureMessage(message); 518 message.setBlobUploader(new BlobUploader(getBlobTransferPolicy(), in)); 519 message.setBlobDownloader(new BlobDownloader(getBlobTransferPolicy())); 520 message.setDeletedByBroker(true); 521 return message; 522 } 523 524 /** 525 * Indicates whether the session is in transacted mode. 526 * 527 * @return true if the session is in transacted mode 528 * @throws JMSException if there is some internal error. 529 */ 530 @Override 531 public boolean getTransacted() throws JMSException { 532 checkClosed(); 533 return isTransacted(); 534 } 535 536 /** 537 * Returns the acknowledgement mode of the session. The acknowledgement mode 538 * is set at the time that the session is created. If the session is 539 * transacted, the acknowledgement mode is ignored. 540 * 541 * @return If the session is not transacted, returns the current 542 * acknowledgement mode for the session. If the session is 543 * transacted, returns SESSION_TRANSACTED. 544 * @throws JMSException 545 * @see javax.jms.Connection#createSession(boolean,int) 546 * @since 1.1 exception JMSException if there is some internal error. 547 */ 548 @Override 549 public int getAcknowledgeMode() throws JMSException { 550 checkClosed(); 551 return this.acknowledgementMode; 552 } 553 554 /** 555 * Commits all messages done in this transaction and releases any locks 556 * currently held. 
557 * 558 * @throws JMSException if the JMS provider fails to commit the transaction 559 * due to some internal error. 560 * @throws TransactionRolledBackException if the transaction is rolled back 561 * due to some internal error during commit. 562 * @throws javax.jms.IllegalStateException if the method is not called by a 563 * transacted session. 564 */ 565 @Override 566 public void commit() throws JMSException { 567 checkClosed(); 568 if (!getTransacted()) { 569 throw new javax.jms.IllegalStateException("Not a transacted session"); 570 } 571 if (LOG.isDebugEnabled()) { 572 LOG.debug(getSessionId() + " Transaction Commit :" + transactionContext.getTransactionId()); 573 } 574 transactionContext.commit(); 575 } 576 577 /** 578 * Rolls back any messages done in this transaction and releases any locks 579 * currently held. 580 * 581 * @throws JMSException if the JMS provider fails to roll back the 582 * transaction due to some internal error. 583 * @throws javax.jms.IllegalStateException if the method is not called by a 584 * transacted session. 585 */ 586 @Override 587 public void rollback() throws JMSException { 588 checkClosed(); 589 if (!getTransacted()) { 590 throw new javax.jms.IllegalStateException("Not a transacted session"); 591 } 592 if (LOG.isDebugEnabled()) { 593 LOG.debug(getSessionId() + " Transaction Rollback, txid:" + transactionContext.getTransactionId()); 594 } 595 transactionContext.rollback(); 596 } 597 598 /** 599 * Closes the session. 600 * <P> 601 * Since a provider may allocate some resources on behalf of a session 602 * outside the JVM, clients should close the resources when they are not 603 * needed. Relying on garbage collection to eventually reclaim these 604 * resources may not be timely enough. 605 * <P> 606 * There is no need to close the producers and consumers of a closed 607 * session. 608 * <P> 609 * This call will block until a <CODE>receive</CODE> call or message 610 * listener in progress has completed. 
A blocked message consumer 611 * <CODE>receive</CODE> call returns <CODE>null</CODE> when this session 612 * is closed. 613 * <P> 614 * Closing a transacted session must roll back the transaction in progress. 615 * <P> 616 * This method is the only <CODE>Session</CODE> method that can be called 617 * concurrently. 618 * <P> 619 * Invoking any other <CODE>Session</CODE> method on a closed session must 620 * throw a <CODE> JMSException.IllegalStateException</CODE>. Closing a 621 * closed session must <I>not </I> throw an exception. 622 * 623 * @throws JMSException if the JMS provider fails to close the session due 624 * to some internal error. 625 */ 626 @Override 627 public void close() throws JMSException { 628 if (!closed) { 629 if (getTransactionContext().isInXATransaction()) { 630 if (!synchronizationRegistered) { 631 synchronizationRegistered = true; 632 getTransactionContext().addSynchronization(new Synchronization() { 633 634 @Override 635 public void afterCommit() throws Exception { 636 doClose(); 637 synchronizationRegistered = false; 638 } 639 640 @Override 641 public void afterRollback() throws Exception { 642 doClose(); 643 synchronizationRegistered = false; 644 } 645 }); 646 } 647 648 } else { 649 doClose(); 650 } 651 } 652 } 653 654 private void doClose() throws JMSException { 655 boolean interrupted = Thread.interrupted(); 656 dispose(); 657 RemoveInfo removeCommand = info.createRemoveCommand(); 658 removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId); 659 connection.asyncSendPacket(removeCommand); 660 if (interrupted) { 661 Thread.currentThread().interrupt(); 662 } 663 } 664 665 final AtomicInteger clearRequestsCounter = new AtomicInteger(0); 666 void clearMessagesInProgress(AtomicInteger transportInterruptionProcessingComplete) { 667 clearRequestsCounter.incrementAndGet(); 668 executor.clearMessagesInProgress(); 669 // we are called from inside the transport reconnection logic which involves us 670 // clearing all the connections' 
consumers dispatch and delivered lists. So rather 671 // than trying to grab a mutex (which could be already owned by the message listener 672 // calling the send or an ack) we allow it to complete in a separate thread via the 673 // scheduler and notify us via connection.transportInterruptionProcessingComplete() 674 // 675 // We must be careful though not to allow multiple calls to this method from a 676 // connection that is having issue becoming fully established from causing a large 677 // build up of scheduled tasks to clear the same consumers over and over. 678 if (consumers.isEmpty()) { 679 return; 680 } 681 682 if (clearInProgress.compareAndSet(false, true)) { 683 for (final ActiveMQMessageConsumer consumer : consumers) { 684 consumer.inProgressClearRequired(); 685 transportInterruptionProcessingComplete.incrementAndGet(); 686 try { 687 connection.getScheduler().executeAfterDelay(new Runnable() { 688 @Override 689 public void run() { 690 consumer.clearMessagesInProgress(); 691 }}, 0l); 692 } catch (JMSException e) { 693 connection.onClientInternalException(e); 694 } 695 } 696 697 try { 698 connection.getScheduler().executeAfterDelay(new Runnable() { 699 @Override 700 public void run() { 701 clearInProgress.set(false); 702 }}, 0l); 703 } catch (JMSException e) { 704 connection.onClientInternalException(e); 705 } 706 } 707 } 708 709 void deliverAcks() { 710 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 711 ActiveMQMessageConsumer consumer = iter.next(); 712 consumer.deliverAcks(); 713 } 714 } 715 716 public synchronized void dispose() throws JMSException { 717 if (!closed) { 718 719 try { 720 executor.stop(); 721 722 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 723 ActiveMQMessageConsumer consumer = iter.next(); 724 consumer.setFailureError(connection.getFirstFailureError()); 725 consumer.dispose(); 726 lastDeliveredSequenceId = Math.max(lastDeliveredSequenceId, 
consumer.getLastDeliveredSequenceId()); 727 } 728 consumers.clear(); 729 730 for (Iterator<ActiveMQMessageProducer> iter = producers.iterator(); iter.hasNext();) { 731 ActiveMQMessageProducer producer = iter.next(); 732 producer.dispose(); 733 } 734 producers.clear(); 735 736 try { 737 if (getTransactionContext().isInLocalTransaction()) { 738 rollback(); 739 } 740 } catch (JMSException e) { 741 } 742 743 } finally { 744 connection.removeSession(this); 745 this.transactionContext = null; 746 closed = true; 747 } 748 } 749 } 750 751 /** 752 * Checks that the session is not closed then configures the message 753 */ 754 protected void configureMessage(ActiveMQMessage message) throws IllegalStateException { 755 checkClosed(); 756 message.setConnection(connection); 757 } 758 759 /** 760 * Check if the session is closed. It is used for ensuring that the session 761 * is open before performing various operations. 762 * 763 * @throws IllegalStateException if the Session is closed 764 */ 765 protected void checkClosed() throws IllegalStateException { 766 if (closed) { 767 throw new IllegalStateException("The Session is closed"); 768 } 769 } 770 771 /** 772 * Checks if the session is closed. 773 * 774 * @return true if the session is closed, false otherwise. 775 */ 776 public boolean isClosed() { 777 return closed; 778 } 779 780 /** 781 * Stops message delivery in this session, and restarts message delivery 782 * with the oldest unacknowledged message. 783 * <P> 784 * All consumers deliver messages in a serial order. Acknowledging a 785 * received message automatically acknowledges all messages that have been 786 * delivered to the client. 
787 * <P> 788 * Restarting a session causes it to take the following actions: 789 * <UL> 790 * <LI>Stop message delivery 791 * <LI>Mark all messages that might have been delivered but not 792 * acknowledged as "redelivered" 793 * <LI>Restart the delivery sequence including all unacknowledged messages 794 * that had been previously delivered. Redelivered messages do not have to 795 * be delivered in exactly their original delivery order. 796 * </UL> 797 * 798 * @throws JMSException if the JMS provider fails to stop and restart 799 * message delivery due to some internal error. 800 * @throws IllegalStateException if the method is called by a transacted 801 * session. 802 */ 803 @Override 804 public void recover() throws JMSException { 805 806 checkClosed(); 807 if (getTransacted()) { 808 throw new IllegalStateException("This session is transacted"); 809 } 810 811 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 812 ActiveMQMessageConsumer c = iter.next(); 813 c.rollback(); 814 } 815 816 } 817 818 /** 819 * Returns the session's distinguished message listener (optional). 820 * 821 * @return the message listener associated with this session 822 * @throws JMSException if the JMS provider fails to get the message 823 * listener due to an internal error. 824 * @see javax.jms.Session#setMessageListener(javax.jms.MessageListener) 825 * @see javax.jms.ServerSessionPool 826 * @see javax.jms.ServerSession 827 */ 828 @Override 829 public MessageListener getMessageListener() throws JMSException { 830 checkClosed(); 831 return this.messageListener; 832 } 833 834 /** 835 * Sets the session's distinguished message listener (optional). 836 * <P> 837 * When the distinguished message listener is set, no other form of message 838 * receipt in the session can be used; however, all forms of sending 839 * messages are still supported. 
840 * <P> 841 * If this session has been closed, then an {@link IllegalStateException} is 842 * thrown, if trying to set a new listener. However setting the listener 843 * to <tt>null</tt> is allowed, to clear the listener, even if this session 844 * has been closed prior. 845 * <P> 846 * This is an expert facility not used by regular JMS clients. 847 * 848 * @param listener the message listener to associate with this session 849 * @throws JMSException if the JMS provider fails to set the message 850 * listener due to an internal error. 851 * @see javax.jms.Session#getMessageListener() 852 * @see javax.jms.ServerSessionPool 853 * @see javax.jms.ServerSession 854 */ 855 @Override 856 public void setMessageListener(MessageListener listener) throws JMSException { 857 // only check for closed if we set a new listener, as we allow to clear 858 // the listener, such as when an application is shutting down, and is 859 // no longer using a message listener on this session 860 if (listener != null) { 861 checkClosed(); 862 } 863 this.messageListener = listener; 864 865 if (listener != null) { 866 executor.setDispatchedBySessionPool(true); 867 } 868 } 869 870 /** 871 * Optional operation, intended to be used only by Application Servers, not 872 * by ordinary JMS clients. 
873 * 874 * @see javax.jms.ServerSession 875 */ 876 @Override 877 public void run() { 878 MessageDispatch messageDispatch; 879 while ((messageDispatch = executor.dequeueNoWait()) != null) { 880 final MessageDispatch md = messageDispatch; 881 final ActiveMQMessage message = (ActiveMQMessage)md.getMessage(); 882 883 MessageAck earlyAck = null; 884 if (message.isExpired()) { 885 earlyAck = new MessageAck(md, MessageAck.EXPIRED_ACK_TYPE, 1); 886 } else if (connection.isDuplicate(ActiveMQSession.this, message)) { 887 LOG.debug("{} got duplicate: {}", this, message.getMessageId()); 888 earlyAck = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1); 889 earlyAck.setFirstMessageId(md.getMessage().getMessageId()); 890 earlyAck.setPoisonCause(new Throwable("Duplicate delivery to " + this)); 891 } 892 if (earlyAck != null) { 893 try { 894 asyncSendPacket(earlyAck); 895 } catch (Throwable t) { 896 LOG.error("error dispatching ack: {} ", earlyAck, t); 897 connection.onClientInternalException(t); 898 } finally { 899 continue; 900 } 901 } 902 903 if (isClientAcknowledge()||isIndividualAcknowledge()) { 904 message.setAcknowledgeCallback(new Callback() { 905 @Override 906 public void execute() throws Exception { 907 } 908 }); 909 } 910 911 if (deliveryListener != null) { 912 deliveryListener.beforeDelivery(this, message); 913 } 914 915 md.setDeliverySequenceId(getNextDeliveryId()); 916 lastDeliveredSequenceId = message.getMessageId().getBrokerSequenceId(); 917 918 final MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1); 919 try { 920 ack.setFirstMessageId(md.getMessage().getMessageId()); 921 doStartTransaction(); 922 ack.setTransactionId(getTransactionContext().getTransactionId()); 923 if (ack.getTransactionId() != null) { 924 getTransactionContext().addSynchronization(new Synchronization() { 925 926 final int clearRequestCount = (clearRequestsCounter.get() == Integer.MAX_VALUE ? 
clearRequestsCounter.incrementAndGet() : clearRequestsCounter.get()); 927 @Override 928 public void beforeEnd() throws Exception { 929 // validate our consumer so we don't push stale acks that get ignored 930 if (ack.getTransactionId().isXATransaction() && !connection.hasDispatcher(ack.getConsumerId())) { 931 LOG.debug("forcing rollback - {} consumer no longer active on {}", ack, connection); 932 throw new TransactionRolledBackException("consumer " + ack.getConsumerId() + " no longer active on " + connection); 933 } 934 LOG.trace("beforeEnd ack {}", ack); 935 sendAck(ack); 936 } 937 938 @Override 939 public void afterRollback() throws Exception { 940 LOG.trace("rollback {}", ack, new Throwable("here")); 941 // ensure we don't filter this as a duplicate 942 connection.rollbackDuplicate(ActiveMQSession.this, md.getMessage()); 943 944 // don't redeliver if we have been interrupted b/c the broker will redeliver on reconnect 945 if (clearRequestsCounter.get() > clearRequestCount) { 946 LOG.debug("No redelivery of {} on rollback of {} due to failover of {}", md, ack.getTransactionId(), connection.getTransport()); 947 return; 948 } 949 950 // validate our consumer so we don't push stale acks that get ignored or redeliver what will be redispatched 951 if (ack.getTransactionId().isXATransaction() && !connection.hasDispatcher(ack.getConsumerId())) { 952 LOG.debug("No local redelivery of {} on rollback of {} because consumer is no longer active on {}", md, ack.getTransactionId(), connection.getTransport()); 953 return; 954 } 955 956 RedeliveryPolicy redeliveryPolicy = connection.getRedeliveryPolicy(); 957 int redeliveryCounter = md.getMessage().getRedeliveryCounter(); 958 if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES 959 && redeliveryCounter >= redeliveryPolicy.getMaximumRedeliveries()) { 960 // We need to NACK the messages so that they get 961 // sent to the 962 // DLQ. 963 // Acknowledge the last message. 
964 MessageAck ack = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1); 965 ack.setFirstMessageId(md.getMessage().getMessageId()); 966 ack.setPoisonCause(new Throwable("Exceeded ra redelivery policy limit:" + redeliveryPolicy)); 967 asyncSendPacket(ack); 968 969 } else { 970 971 MessageAck ack = new MessageAck(md, MessageAck.REDELIVERED_ACK_TYPE, 1); 972 ack.setFirstMessageId(md.getMessage().getMessageId()); 973 asyncSendPacket(ack); 974 975 // Figure out how long we should wait to resend 976 // this message. 977 long redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay(); 978 for (int i = 0; i < redeliveryCounter; i++) { 979 redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay); 980 } 981 connection.getScheduler().executeAfterDelay(new Runnable() { 982 983 @Override 984 public void run() { 985 ((ActiveMQDispatcher)md.getConsumer()).dispatch(md); 986 } 987 }, redeliveryDelay); 988 } 989 md.getMessage().onMessageRolledBack(); 990 } 991 }); 992 } 993 994 LOG.trace("{} onMessage({})", this, message.getMessageId()); 995 messageListener.onMessage(message); 996 997 } catch (Throwable e) { 998 LOG.error("error dispatching message: ", e); 999 // A problem while invoking the MessageListener does not 1000 // in general indicate a problem with the connection to the broker, i.e. 1001 // it will usually be sufficient to let the afterDelivery() method either 1002 // commit or roll back in order to deal with the exception. 1003 // However, we notify any registered client internal exception listener 1004 // of the problem. 
1005 connection.onClientInternalException(e); 1006 } finally { 1007 if (ack.getTransactionId() == null) { 1008 try { 1009 asyncSendPacket(ack); 1010 } catch (Throwable e) { 1011 connection.onClientInternalException(e); 1012 } 1013 } 1014 } 1015 1016 if (deliveryListener != null) { 1017 deliveryListener.afterDelivery(this, message); 1018 } 1019 } 1020 } 1021 1022 /** 1023 * Creates a <CODE>MessageProducer</CODE> to send messages to the 1024 * specified destination. 1025 * <P> 1026 * A client uses a <CODE>MessageProducer</CODE> object to send messages to 1027 * a destination. Since <CODE>Queue </CODE> and <CODE>Topic</CODE> both 1028 * inherit from <CODE>Destination</CODE>, they can be used in the 1029 * destination parameter to create a <CODE>MessageProducer</CODE> object. 1030 * 1031 * @param destination the <CODE>Destination</CODE> to send to, or null if 1032 * this is a producer which does not have a specified 1033 * destination. 1034 * @return the MessageProducer 1035 * @throws JMSException if the session fails to create a MessageProducer due 1036 * to some internal error. 1037 * @throws InvalidDestinationException if an invalid destination is 1038 * specified. 1039 * @since 1.1 1040 */ 1041 @Override 1042 public MessageProducer createProducer(Destination destination) throws JMSException { 1043 checkClosed(); 1044 if (destination instanceof CustomDestination) { 1045 CustomDestination customDestination = (CustomDestination)destination; 1046 return customDestination.createProducer(this); 1047 } 1048 int timeSendOut = connection.getSendTimeout(); 1049 return new ActiveMQMessageProducer(this, getNextProducerId(), ActiveMQMessageTransformation.transformDestination(destination),timeSendOut); 1050 } 1051 1052 /** 1053 * Creates a <CODE>MessageConsumer</CODE> for the specified destination. 
1054 * Since <CODE>Queue</CODE> and <CODE> Topic</CODE> both inherit from 1055 * <CODE>Destination</CODE>, they can be used in the destination 1056 * parameter to create a <CODE>MessageConsumer</CODE>. 1057 * 1058 * @param destination the <CODE>Destination</CODE> to access. 1059 * @return the MessageConsumer 1060 * @throws JMSException if the session fails to create a consumer due to 1061 * some internal error. 1062 * @throws InvalidDestinationException if an invalid destination is 1063 * specified. 1064 * @since 1.1 1065 */ 1066 @Override 1067 public MessageConsumer createConsumer(Destination destination) throws JMSException { 1068 return createConsumer(destination, (String) null); 1069 } 1070 1071 /** 1072 * Creates a <CODE>MessageConsumer</CODE> for the specified destination, 1073 * using a message selector. Since <CODE> Queue</CODE> and 1074 * <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they 1075 * can be used in the destination parameter to create a 1076 * <CODE>MessageConsumer</CODE>. 1077 * <P> 1078 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages 1079 * that have been sent to a destination. 1080 * 1081 * @param destination the <CODE>Destination</CODE> to access 1082 * @param messageSelector only messages with properties matching the message 1083 * selector expression are delivered. A value of null or an 1084 * empty string indicates that there is no message selector 1085 * for the message consumer. 1086 * @return the MessageConsumer 1087 * @throws JMSException if the session fails to create a MessageConsumer due 1088 * to some internal error. 1089 * @throws InvalidDestinationException if an invalid destination is 1090 * specified. 1091 * @throws InvalidSelectorException if the message selector is invalid. 
1092 * @since 1.1 1093 */ 1094 @Override 1095 public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException { 1096 return createConsumer(destination, messageSelector, false); 1097 } 1098 1099 /** 1100 * Creates a <CODE>MessageConsumer</CODE> for the specified destination. 1101 * Since <CODE>Queue</CODE> and <CODE> Topic</CODE> both inherit from 1102 * <CODE>Destination</CODE>, they can be used in the destination 1103 * parameter to create a <CODE>MessageConsumer</CODE>. 1104 * 1105 * @param destination the <CODE>Destination</CODE> to access. 1106 * @param messageListener the listener to use for async consumption of messages 1107 * @return the MessageConsumer 1108 * @throws JMSException if the session fails to create a consumer due to 1109 * some internal error. 1110 * @throws InvalidDestinationException if an invalid destination is 1111 * specified. 1112 * @since 1.1 1113 */ 1114 public MessageConsumer createConsumer(Destination destination, MessageListener messageListener) throws JMSException { 1115 return createConsumer(destination, null, messageListener); 1116 } 1117 1118 /** 1119 * Creates a <CODE>MessageConsumer</CODE> for the specified destination, 1120 * using a message selector. Since <CODE> Queue</CODE> and 1121 * <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they 1122 * can be used in the destination parameter to create a 1123 * <CODE>MessageConsumer</CODE>. 1124 * <P> 1125 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages 1126 * that have been sent to a destination. 1127 * 1128 * @param destination the <CODE>Destination</CODE> to access 1129 * @param messageSelector only messages with properties matching the message 1130 * selector expression are delivered. A value of null or an 1131 * empty string indicates that there is no message selector 1132 * for the message consumer. 
1133 * @param messageListener the listener to use for async consumption of messages 1134 * @return the MessageConsumer 1135 * @throws JMSException if the session fails to create a MessageConsumer due 1136 * to some internal error. 1137 * @throws InvalidDestinationException if an invalid destination is 1138 * specified. 1139 * @throws InvalidSelectorException if the message selector is invalid. 1140 * @since 1.1 1141 */ 1142 public MessageConsumer createConsumer(Destination destination, String messageSelector, MessageListener messageListener) throws JMSException { 1143 return createConsumer(destination, messageSelector, false, messageListener); 1144 } 1145 1146 /** 1147 * Creates <CODE>MessageConsumer</CODE> for the specified destination, 1148 * using a message selector. This method can specify whether messages 1149 * published by its own connection should be delivered to it, if the 1150 * destination is a topic. 1151 * <P> 1152 * Since <CODE>Queue</CODE> and <CODE>Topic</CODE> both inherit from 1153 * <CODE>Destination</CODE>, they can be used in the destination 1154 * parameter to create a <CODE>MessageConsumer</CODE>. 1155 * <P> 1156 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages 1157 * that have been published to a destination. 1158 * <P> 1159 * In some cases, a connection may both publish and subscribe to a topic. 1160 * The consumer <CODE>NoLocal</CODE> attribute allows a consumer to 1161 * inhibit the delivery of messages published by its own connection. The 1162 * default value for this attribute is False. The <CODE>noLocal</CODE> 1163 * value must be supported by destinations that are topics. 1164 * 1165 * @param destination the <CODE>Destination</CODE> to access 1166 * @param messageSelector only messages with properties matching the message 1167 * selector expression are delivered. A value of null or an 1168 * empty string indicates that there is no message selector 1169 * for the message consumer. 
1170 * @param noLocal - if true, and the destination is a topic, inhibits the 1171 * delivery of messages published by its own connection. The 1172 * behavior for <CODE>NoLocal</CODE> is not specified if 1173 * the destination is a queue. 1174 * @return the MessageConsumer 1175 * @throws JMSException if the session fails to create a MessageConsumer due 1176 * to some internal error. 1177 * @throws InvalidDestinationException if an invalid destination is 1178 * specified. 1179 * @throws InvalidSelectorException if the message selector is invalid. 1180 * @since 1.1 1181 */ 1182 @Override 1183 public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException { 1184 return createConsumer(destination, messageSelector, noLocal, null); 1185 } 1186 1187 /** 1188 * Creates <CODE>MessageConsumer</CODE> for the specified destination, 1189 * using a message selector. This method can specify whether messages 1190 * published by its own connection should be delivered to it, if the 1191 * destination is a topic. 1192 * <P> 1193 * Since <CODE>Queue</CODE> and <CODE>Topic</CODE> both inherit from 1194 * <CODE>Destination</CODE>, they can be used in the destination 1195 * parameter to create a <CODE>MessageConsumer</CODE>. 1196 * <P> 1197 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages 1198 * that have been published to a destination. 1199 * <P> 1200 * In some cases, a connection may both publish and subscribe to a topic. 1201 * The consumer <CODE>NoLocal</CODE> attribute allows a consumer to 1202 * inhibit the delivery of messages published by its own connection. The 1203 * default value for this attribute is False. The <CODE>noLocal</CODE> 1204 * value must be supported by destinations that are topics. 1205 * 1206 * @param destination the <CODE>Destination</CODE> to access 1207 * @param messageSelector only messages with properties matching the message 1208 * selector expression are delivered. 
A value of null or an 1209 * empty string indicates that there is no message selector 1210 * for the message consumer. 1211 * @param noLocal - if true, and the destination is a topic, inhibits the 1212 * delivery of messages published by its own connection. The 1213 * behavior for <CODE>NoLocal</CODE> is not specified if 1214 * the destination is a queue. 1215 * @param messageListener the listener to use for async consumption of messages 1216 * @return the MessageConsumer 1217 * @throws JMSException if the session fails to create a MessageConsumer due 1218 * to some internal error. 1219 * @throws InvalidDestinationException if an invalid destination is 1220 * specified. 1221 * @throws InvalidSelectorException if the message selector is invalid. 1222 * @since 1.1 1223 */ 1224 public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal, MessageListener messageListener) throws JMSException { 1225 checkClosed(); 1226 1227 if (destination instanceof CustomDestination) { 1228 CustomDestination customDestination = (CustomDestination)destination; 1229 return customDestination.createConsumer(this, messageSelector, noLocal); 1230 } 1231 1232 ActiveMQPrefetchPolicy prefetchPolicy = connection.getPrefetchPolicy(); 1233 int prefetch = 0; 1234 if (destination instanceof Topic) { 1235 prefetch = prefetchPolicy.getTopicPrefetch(); 1236 } else { 1237 prefetch = prefetchPolicy.getQueuePrefetch(); 1238 } 1239 ActiveMQDestination activemqDestination = ActiveMQMessageTransformation.transformDestination(destination); 1240 return new ActiveMQMessageConsumer(this, getNextConsumerId(), activemqDestination, null, messageSelector, 1241 prefetch, prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, isAsyncDispatch(), messageListener); 1242 } 1243 1244 /** 1245 * Creates a queue identity given a <CODE>Queue</CODE> name. 
1246 * <P> 1247 * This facility is provided for the rare cases where clients need to 1248 * dynamically manipulate queue identity. It allows the creation of a queue 1249 * identity with a provider-specific name. Clients that depend on this 1250 * ability are not portable. 1251 * <P> 1252 * Note that this method is not for creating the physical queue. The 1253 * physical creation of queues is an administrative task and is not to be 1254 * initiated by the JMS API. The one exception is the creation of temporary 1255 * queues, which is accomplished with the <CODE>createTemporaryQueue</CODE> 1256 * method. 1257 * 1258 * @param queueName the name of this <CODE>Queue</CODE> 1259 * @return a <CODE>Queue</CODE> with the given name 1260 * @throws JMSException if the session fails to create a queue due to some 1261 * internal error. 1262 * @since 1.1 1263 */ 1264 @Override 1265 public Queue createQueue(String queueName) throws JMSException { 1266 checkClosed(); 1267 if (queueName.startsWith(ActiveMQDestination.TEMP_DESTINATION_NAME_PREFIX)) { 1268 return new ActiveMQTempQueue(queueName); 1269 } 1270 return new ActiveMQQueue(queueName); 1271 } 1272 1273 /** 1274 * Creates a topic identity given a <CODE>Topic</CODE> name. 1275 * <P> 1276 * This facility is provided for the rare cases where clients need to 1277 * dynamically manipulate topic identity. This allows the creation of a 1278 * topic identity with a provider-specific name. Clients that depend on this 1279 * ability are not portable. 1280 * <P> 1281 * Note that this method is not for creating the physical topic. The 1282 * physical creation of topics is an administrative task and is not to be 1283 * initiated by the JMS API. The one exception is the creation of temporary 1284 * topics, which is accomplished with the <CODE>createTemporaryTopic</CODE> 1285 * method. 
1286 * 1287 * @param topicName the name of this <CODE>Topic</CODE> 1288 * @return a <CODE>Topic</CODE> with the given name 1289 * @throws JMSException if the session fails to create a topic due to some 1290 * internal error. 1291 * @since 1.1 1292 */ 1293 @Override 1294 public Topic createTopic(String topicName) throws JMSException { 1295 checkClosed(); 1296 if (topicName.startsWith(ActiveMQDestination.TEMP_DESTINATION_NAME_PREFIX)) { 1297 return new ActiveMQTempTopic(topicName); 1298 } 1299 return new ActiveMQTopic(topicName); 1300 } 1301 1302 /** 1303 * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on 1304 * the specified queue. 1305 * 1306 * @param queue the <CODE>queue</CODE> to access 1307 * @exception InvalidDestinationException if an invalid destination is 1308 * specified 1309 * @since 1.1 1310 */ 1311 /** 1312 * Creates a durable subscriber to the specified topic. 1313 * <P> 1314 * If a client needs to receive all the messages published on a topic, 1315 * including the ones published while the subscriber is inactive, it uses a 1316 * durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a 1317 * record of this durable subscription and insures that all messages from 1318 * the topic's publishers are retained until they are acknowledged by this 1319 * durable subscriber or they have expired. 1320 * <P> 1321 * Sessions with durable subscribers must always provide the same client 1322 * identifier. In addition, each client must specify a name that uniquely 1323 * identifies (within client identifier) each durable subscription it 1324 * creates. Only one session at a time can have a 1325 * <CODE>TopicSubscriber</CODE> for a particular durable subscription. 1326 * <P> 1327 * A client can change an existing durable subscription by creating a 1328 * durable <CODE>TopicSubscriber</CODE> with the same name and a new topic 1329 * and/or message selector. 
Changing a durable subscriber is equivalent to 1330 * unsubscribing (deleting) the old one and creating a new one. 1331 * <P> 1332 * In some cases, a connection may both publish and subscribe to a topic. 1333 * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to 1334 * inhibit the delivery of messages published by its own connection. The 1335 * default value for this attribute is false. 1336 * 1337 * @param topic the non-temporary <CODE>Topic</CODE> to subscribe to 1338 * @param name the name used to identify this subscription 1339 * @return the TopicSubscriber 1340 * @throws JMSException if the session fails to create a subscriber due to 1341 * some internal error. 1342 * @throws InvalidDestinationException if an invalid topic is specified. 1343 * @since 1.1 1344 */ 1345 @Override 1346 public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException { 1347 checkClosed(); 1348 return createDurableSubscriber(topic, name, null, false); 1349 } 1350 1351 /** 1352 * Creates a durable subscriber to the specified topic, using a message 1353 * selector and specifying whether messages published by its own connection 1354 * should be delivered to it. 1355 * <P> 1356 * If a client needs to receive all the messages published on a topic, 1357 * including the ones published while the subscriber is inactive, it uses a 1358 * durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a 1359 * record of this durable subscription and insures that all messages from 1360 * the topic's publishers are retained until they are acknowledged by this 1361 * durable subscriber or they have expired. 1362 * <P> 1363 * Sessions with durable subscribers must always provide the same client 1364 * identifier. In addition, each client must specify a name which uniquely 1365 * identifies (within client identifier) each durable subscription it 1366 * creates. 
Only one session at a time can have a
 * <CODE>TopicSubscriber</CODE> for a particular durable subscription. An
 * inactive durable subscriber is one that exists but does not currently
 * have a message consumer associated with it.
 * <P>
 * A client can change an existing durable subscription by creating a
 * durable <CODE>TopicSubscriber</CODE> with the same name and a new topic
 * and/or message selector. Changing a durable subscriber is equivalent to
 * unsubscribing (deleting) the old one and creating a new one.
 *
 * @param topic the non-temporary <CODE>Topic</CODE> to subscribe to
 * @param name the name used to identify this subscription
 * @param messageSelector only messages with properties matching the message
 *                selector expression are delivered. A value of null or an
 *                empty string indicates that there is no message selector
 *                for the message consumer.
 * @param noLocal if set, inhibits the delivery of messages published by its
 *                own connection
 * @return the TopicSubscriber
 * @throws JMSException if the session fails to create a subscriber due to
 *                 some internal error.
 * @throws InvalidDestinationException if an invalid topic is specified.
 * @throws InvalidSelectorException if the message selector is invalid.
1389 * @since 1.1 1390 */ 1391 @Override 1392 public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException { 1393 checkClosed(); 1394 1395 if (topic == null) { 1396 throw new InvalidDestinationException("Topic cannot be null"); 1397 } 1398 1399 if (topic instanceof CustomDestination) { 1400 CustomDestination customDestination = (CustomDestination)topic; 1401 return customDestination.createDurableSubscriber(this, name, messageSelector, noLocal); 1402 } 1403 1404 connection.checkClientIDWasManuallySpecified(); 1405 ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy(); 1406 int prefetch = isAutoAcknowledge() && connection.isOptimizedMessageDispatch() ? prefetchPolicy.getOptimizeDurableTopicPrefetch() : prefetchPolicy.getDurableTopicPrefetch(); 1407 int maxPrendingLimit = prefetchPolicy.getMaximumPendingMessageLimit(); 1408 return new ActiveMQTopicSubscriber(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(topic), name, messageSelector, prefetch, maxPrendingLimit, 1409 noLocal, false, asyncDispatch); 1410 } 1411 1412 /** 1413 * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on 1414 * the specified queue. 1415 * 1416 * @param queue the <CODE>queue</CODE> to access 1417 * @return the Queue Browser 1418 * @throws JMSException if the session fails to create a browser due to some 1419 * internal error. 1420 * @throws InvalidDestinationException if an invalid destination is 1421 * specified 1422 * @since 1.1 1423 */ 1424 @Override 1425 public QueueBrowser createBrowser(Queue queue) throws JMSException { 1426 checkClosed(); 1427 return createBrowser(queue, null); 1428 } 1429 1430 /** 1431 * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on 1432 * the specified queue using a message selector. 
1433 * 1434 * @param queue the <CODE>queue</CODE> to access 1435 * @param messageSelector only messages with properties matching the message 1436 * selector expression are delivered. A value of null or an 1437 * empty string indicates that there is no message selector 1438 * for the message consumer. 1439 * @return the Queue Browser 1440 * @throws JMSException if the session fails to create a browser due to some 1441 * internal error. 1442 * @throws InvalidDestinationException if an invalid destination is 1443 * specified 1444 * @throws InvalidSelectorException if the message selector is invalid. 1445 * @since 1.1 1446 */ 1447 @Override 1448 public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException { 1449 checkClosed(); 1450 return new ActiveMQQueueBrowser(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(queue), messageSelector, asyncDispatch); 1451 } 1452 1453 /** 1454 * Creates a <CODE>TemporaryQueue</CODE> object. Its lifetime will be that 1455 * of the <CODE>Connection</CODE> unless it is deleted earlier. 1456 * 1457 * @return a temporary queue identity 1458 * @throws JMSException if the session fails to create a temporary queue due 1459 * to some internal error. 1460 * @since 1.1 1461 */ 1462 @Override 1463 public TemporaryQueue createTemporaryQueue() throws JMSException { 1464 checkClosed(); 1465 return (TemporaryQueue)connection.createTempDestination(false); 1466 } 1467 1468 /** 1469 * Creates a <CODE>TemporaryTopic</CODE> object. Its lifetime will be that 1470 * of the <CODE>Connection</CODE> unless it is deleted earlier. 1471 * 1472 * @return a temporary topic identity 1473 * @throws JMSException if the session fails to create a temporary topic due 1474 * to some internal error. 
1475 * @since 1.1 1476 */ 1477 @Override 1478 public TemporaryTopic createTemporaryTopic() throws JMSException { 1479 checkClosed(); 1480 return (TemporaryTopic)connection.createTempDestination(true); 1481 } 1482 1483 /** 1484 * Creates a <CODE>QueueReceiver</CODE> object to receive messages from 1485 * the specified queue. 1486 * 1487 * @param queue the <CODE>Queue</CODE> to access 1488 * @return 1489 * @throws JMSException if the session fails to create a receiver due to 1490 * some internal error. 1491 * @throws JMSException 1492 * @throws InvalidDestinationException if an invalid queue is specified. 1493 */ 1494 @Override 1495 public QueueReceiver createReceiver(Queue queue) throws JMSException { 1496 checkClosed(); 1497 return createReceiver(queue, null); 1498 } 1499 1500 /** 1501 * Creates a <CODE>QueueReceiver</CODE> object to receive messages from 1502 * the specified queue using a message selector. 1503 * 1504 * @param queue the <CODE>Queue</CODE> to access 1505 * @param messageSelector only messages with properties matching the message 1506 * selector expression are delivered. A value of null or an 1507 * empty string indicates that there is no message selector 1508 * for the message consumer. 1509 * @return QueueReceiver 1510 * @throws JMSException if the session fails to create a receiver due to 1511 * some internal error. 1512 * @throws InvalidDestinationException if an invalid queue is specified. 1513 * @throws InvalidSelectorException if the message selector is invalid. 
1514 */ 1515 @Override 1516 public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException { 1517 checkClosed(); 1518 1519 if (queue instanceof CustomDestination) { 1520 CustomDestination customDestination = (CustomDestination)queue; 1521 return customDestination.createReceiver(this, messageSelector); 1522 } 1523 1524 ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy(); 1525 return new ActiveMQQueueReceiver(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(queue), messageSelector, prefetchPolicy.getQueuePrefetch(), 1526 prefetchPolicy.getMaximumPendingMessageLimit(), asyncDispatch); 1527 } 1528 1529 /** 1530 * Creates a <CODE>QueueSender</CODE> object to send messages to the 1531 * specified queue. 1532 * 1533 * @param queue the <CODE>Queue</CODE> to access, or null if this is an 1534 * unidentified producer 1535 * @return QueueSender 1536 * @throws JMSException if the session fails to create a sender due to some 1537 * internal error. 1538 * @throws InvalidDestinationException if an invalid queue is specified. 1539 */ 1540 @Override 1541 public QueueSender createSender(Queue queue) throws JMSException { 1542 checkClosed(); 1543 if (queue instanceof CustomDestination) { 1544 CustomDestination customDestination = (CustomDestination)queue; 1545 return customDestination.createSender(this); 1546 } 1547 int timeSendOut = connection.getSendTimeout(); 1548 return new ActiveMQQueueSender(this, ActiveMQMessageTransformation.transformDestination(queue),timeSendOut); 1549 } 1550 1551 /** 1552 * Creates a nondurable subscriber to the specified topic. <p/> 1553 * <P> 1554 * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages 1555 * that have been published to a topic. <p/> 1556 * <P> 1557 * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They 1558 * receive only messages that are published while they are active. 
<p/> 1559 * <P> 1560 * In some cases, a connection may both publish and subscribe to a topic. 1561 * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to 1562 * inhibit the delivery of messages published by its own connection. The 1563 * default value for this attribute is false. 1564 * 1565 * @param topic the <CODE>Topic</CODE> to subscribe to 1566 * @return TopicSubscriber 1567 * @throws JMSException if the session fails to create a subscriber due to 1568 * some internal error. 1569 * @throws InvalidDestinationException if an invalid topic is specified. 1570 */ 1571 @Override 1572 public TopicSubscriber createSubscriber(Topic topic) throws JMSException { 1573 checkClosed(); 1574 return createSubscriber(topic, null, false); 1575 } 1576 1577 /** 1578 * Creates a nondurable subscriber to the specified topic, using a message 1579 * selector or specifying whether messages published by its own connection 1580 * should be delivered to it. <p/> 1581 * <P> 1582 * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages 1583 * that have been published to a topic. <p/> 1584 * <P> 1585 * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They 1586 * receive only messages that are published while they are active. <p/> 1587 * <P> 1588 * Messages filtered out by a subscriber's message selector will never be 1589 * delivered to the subscriber. From the subscriber's perspective, they do 1590 * not exist. <p/> 1591 * <P> 1592 * In some cases, a connection may both publish and subscribe to a topic. 1593 * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to 1594 * inhibit the delivery of messages published by its own connection. The 1595 * default value for this attribute is false. 1596 * 1597 * @param topic the <CODE>Topic</CODE> to subscribe to 1598 * @param messageSelector only messages with properties matching the message 1599 * selector expression are delivered. 
A value of null or an 1600 * empty string indicates that there is no message selector 1601 * for the message consumer. 1602 * @param noLocal if set, inhibits the delivery of messages published by its 1603 * own connection 1604 * @return TopicSubscriber 1605 * @throws JMSException if the session fails to create a subscriber due to 1606 * some internal error. 1607 * @throws InvalidDestinationException if an invalid topic is specified. 1608 * @throws InvalidSelectorException if the message selector is invalid. 1609 */ 1610 @Override 1611 public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException { 1612 checkClosed(); 1613 1614 if (topic instanceof CustomDestination) { 1615 CustomDestination customDestination = (CustomDestination)topic; 1616 return customDestination.createSubscriber(this, messageSelector, noLocal); 1617 } 1618 1619 ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy(); 1620 return new ActiveMQTopicSubscriber(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(topic), null, messageSelector, prefetchPolicy 1621 .getTopicPrefetch(), prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, asyncDispatch); 1622 } 1623 1624 /** 1625 * Creates a publisher for the specified topic. <p/> 1626 * <P> 1627 * A client uses a <CODE>TopicPublisher</CODE> object to publish messages 1628 * on a topic. Each time a client creates a <CODE>TopicPublisher</CODE> on 1629 * a topic, it defines a new sequence of messages that have no ordering 1630 * relationship with the messages it has previously sent. 1631 * 1632 * @param topic the <CODE>Topic</CODE> to publish to, or null if this is 1633 * an unidentified producer 1634 * @return TopicPublisher 1635 * @throws JMSException if the session fails to create a publisher due to 1636 * some internal error. 1637 * @throws InvalidDestinationException if an invalid topic is specified. 
1638 */ 1639 @Override 1640 public TopicPublisher createPublisher(Topic topic) throws JMSException { 1641 checkClosed(); 1642 1643 if (topic instanceof CustomDestination) { 1644 CustomDestination customDestination = (CustomDestination)topic; 1645 return customDestination.createPublisher(this); 1646 } 1647 int timeSendOut = connection.getSendTimeout(); 1648 return new ActiveMQTopicPublisher(this, ActiveMQMessageTransformation.transformDestination(topic),timeSendOut); 1649 } 1650 1651 /** 1652 * Unsubscribes a durable subscription that has been created by a client. 1653 * <P> 1654 * This method deletes the state being maintained on behalf of the 1655 * subscriber by its provider. 1656 * <P> 1657 * It is erroneous for a client to delete a durable subscription while there 1658 * is an active <CODE>MessageConsumer </CODE> or 1659 * <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed 1660 * message is part of a pending transaction or has not been acknowledged in 1661 * the session. 1662 * 1663 * @param name the name used to identify this subscription 1664 * @throws JMSException if the session fails to unsubscribe to the durable 1665 * subscription due to some internal error. 1666 * @throws InvalidDestinationException if an invalid subscription name is 1667 * specified. 1668 * @since 1.1 1669 */ 1670 @Override 1671 public void unsubscribe(String name) throws JMSException { 1672 checkClosed(); 1673 connection.unsubscribe(name); 1674 } 1675 1676 @Override 1677 public void dispatch(MessageDispatch messageDispatch) { 1678 try { 1679 executor.execute(messageDispatch); 1680 } catch (InterruptedException e) { 1681 Thread.currentThread().interrupt(); 1682 connection.onClientInternalException(e); 1683 } 1684 } 1685 1686 /** 1687 * Acknowledges all consumed messages of the session of this consumed 1688 * message. 
1689 * <P> 1690 * All consumed JMS messages support the <CODE>acknowledge</CODE> method 1691 * for use when a client has specified that its JMS session's consumed 1692 * messages are to be explicitly acknowledged. By invoking 1693 * <CODE>acknowledge</CODE> on a consumed message, a client acknowledges 1694 * all messages consumed by the session that the message was delivered to. 1695 * <P> 1696 * Calls to <CODE>acknowledge</CODE> are ignored for both transacted 1697 * sessions and sessions specified to use implicit acknowledgement modes. 1698 * <P> 1699 * A client may individually acknowledge each message as it is consumed, or 1700 * it may choose to acknowledge messages as an application-defined group 1701 * (which is done by calling acknowledge on the last received message of the 1702 * group, thereby acknowledging all messages consumed by the session.) 1703 * <P> 1704 * Messages that have been received but not acknowledged may be redelivered. 1705 * 1706 * @throws JMSException if the JMS provider fails to acknowledge the 1707 * messages due to some internal error. 1708 * @throws javax.jms.IllegalStateException if this method is called on a 1709 * closed session. 1710 * @see javax.jms.Session#CLIENT_ACKNOWLEDGE 1711 */ 1712 public void acknowledge() throws JMSException { 1713 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 1714 ActiveMQMessageConsumer c = iter.next(); 1715 c.acknowledge(); 1716 } 1717 } 1718 1719 /** 1720 * Add a message consumer. 1721 * 1722 * @param consumer - message consumer. 1723 * @throws JMSException 1724 */ 1725 protected void addConsumer(ActiveMQMessageConsumer consumer) throws JMSException { 1726 this.consumers.add(consumer); 1727 if (consumer.isDurableSubscriber()) { 1728 stats.onCreateDurableSubscriber(); 1729 } 1730 this.connection.addDispatcher(consumer.getConsumerId(), this); 1731 } 1732 1733 /** 1734 * Remove the message consumer. 1735 * 1736 * @param consumer - consumer to be removed. 
1737 * @throws JMSException 1738 */ 1739 protected void removeConsumer(ActiveMQMessageConsumer consumer) { 1740 this.connection.removeDispatcher(consumer.getConsumerId()); 1741 if (consumer.isDurableSubscriber()) { 1742 stats.onRemoveDurableSubscriber(); 1743 } 1744 this.consumers.remove(consumer); 1745 this.connection.removeDispatcher(consumer); 1746 } 1747 1748 /** 1749 * Adds a message producer. 1750 * 1751 * @param producer - message producer to be added. 1752 * @throws JMSException 1753 */ 1754 protected void addProducer(ActiveMQMessageProducer producer) throws JMSException { 1755 this.producers.add(producer); 1756 this.connection.addProducer(producer.getProducerInfo().getProducerId(), producer); 1757 } 1758 1759 /** 1760 * Removes a message producer. 1761 * 1762 * @param producer - message producer to be removed. 1763 * @throws JMSException 1764 */ 1765 protected void removeProducer(ActiveMQMessageProducer producer) { 1766 this.connection.removeProducer(producer.getProducerInfo().getProducerId()); 1767 this.producers.remove(producer); 1768 } 1769 1770 /** 1771 * Start this Session. 1772 * 1773 * @throws JMSException 1774 */ 1775 protected void start() throws JMSException { 1776 started.set(true); 1777 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 1778 ActiveMQMessageConsumer c = iter.next(); 1779 c.start(); 1780 } 1781 executor.start(); 1782 } 1783 1784 /** 1785 * Stops this session. 1786 * 1787 * @throws JMSException 1788 */ 1789 protected void stop() throws JMSException { 1790 1791 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 1792 ActiveMQMessageConsumer c = iter.next(); 1793 c.stop(); 1794 } 1795 1796 started.set(false); 1797 executor.stop(); 1798 } 1799 1800 /** 1801 * Returns the session id. 1802 * 1803 * @return value - session id. 
1804 */ 1805 protected SessionId getSessionId() { 1806 return info.getSessionId(); 1807 } 1808 1809 /** 1810 * @return 1811 */ 1812 protected ConsumerId getNextConsumerId() { 1813 return new ConsumerId(info.getSessionId(), consumerIdGenerator.getNextSequenceId()); 1814 } 1815 1816 /** 1817 * @return 1818 */ 1819 protected ProducerId getNextProducerId() { 1820 return new ProducerId(info.getSessionId(), producerIdGenerator.getNextSequenceId()); 1821 } 1822 1823 /** 1824 * Sends the message for dispatch by the broker. 1825 * 1826 * 1827 * @param producer - message producer. 1828 * @param destination - message destination. 1829 * @param message - message to be sent. 1830 * @param deliveryMode - JMS messsage delivery mode. 1831 * @param priority - message priority. 1832 * @param timeToLive - message expiration. 1833 * @param producerWindow 1834 * @param onComplete 1835 * @throws JMSException 1836 */ 1837 protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive, 1838 MemoryUsage producerWindow, int sendTimeout, AsyncCallback onComplete) throws JMSException { 1839 1840 checkClosed(); 1841 if (destination.isTemporary() && connection.isDeleted(destination)) { 1842 throw new InvalidDestinationException("Cannot publish to a deleted Destination: " + destination); 1843 } 1844 synchronized (sendMutex) { 1845 // tell the Broker we are about to start a new transaction 1846 doStartTransaction(); 1847 TransactionId txid = transactionContext.getTransactionId(); 1848 long sequenceNumber = producer.getMessageSequence(); 1849 1850 //Set the "JMS" header fields on the original message, see 1.1 spec section 3.4.11 1851 message.setJMSDeliveryMode(deliveryMode); 1852 long expiration = 0L; 1853 if (!producer.getDisableMessageTimestamp()) { 1854 long timeStamp = System.currentTimeMillis(); 1855 message.setJMSTimestamp(timeStamp); 1856 if (timeToLive > 0) { 1857 expiration = timeToLive + timeStamp; 
1858 } 1859 } 1860 message.setJMSExpiration(expiration); 1861 message.setJMSPriority(priority); 1862 message.setJMSRedelivered(false); 1863 1864 // transform to our own message format here 1865 ActiveMQMessage msg = ActiveMQMessageTransformation.transformMessage(message, connection); 1866 msg.setDestination(destination); 1867 msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber)); 1868 1869 // Set the message id. 1870 if (msg != message) { 1871 message.setJMSMessageID(msg.getMessageId().toString()); 1872 // Make sure the JMS destination is set on the foreign messages too. 1873 message.setJMSDestination(destination); 1874 } 1875 //clear the brokerPath in case we are re-sending this message 1876 msg.setBrokerPath(null); 1877 1878 msg.setTransactionId(txid); 1879 if (connection.isCopyMessageOnSend()) { 1880 msg = (ActiveMQMessage)msg.copy(); 1881 } 1882 msg.setConnection(connection); 1883 msg.onSend(); 1884 msg.setProducerId(msg.getMessageId().getProducerId()); 1885 if (LOG.isTraceEnabled()) { 1886 LOG.trace(getSessionId() + " sending message: " + msg); 1887 } 1888 if (onComplete==null && sendTimeout <= 0 && !msg.isResponseRequired() && !connection.isAlwaysSyncSend() && (!msg.isPersistent() || connection.isUseAsyncSend() || txid != null)) { 1889 this.connection.asyncSendPacket(msg); 1890 if (producerWindow != null) { 1891 // Since we defer lots of the marshaling till we hit the 1892 // wire, this might not 1893 // provide and accurate size. We may change over to doing 1894 // more aggressive marshaling, 1895 // to get more accurate sizes.. this is more important once 1896 // users start using producer window 1897 // flow control. 
1898 int size = msg.getSize(); 1899 producerWindow.increaseUsage(size); 1900 } 1901 } else { 1902 if (sendTimeout > 0 && onComplete==null) { 1903 this.connection.syncSendPacket(msg,sendTimeout); 1904 }else { 1905 this.connection.syncSendPacket(msg, onComplete); 1906 } 1907 } 1908 1909 } 1910 } 1911 1912 /** 1913 * Send TransactionInfo to indicate transaction has started 1914 * 1915 * @throws JMSException if some internal error occurs 1916 */ 1917 protected void doStartTransaction() throws JMSException { 1918 if (getTransacted() && !transactionContext.isInXATransaction()) { 1919 transactionContext.begin(); 1920 } 1921 } 1922 1923 /** 1924 * Checks whether the session has unconsumed messages. 1925 * 1926 * @return true - if there are unconsumed messages. 1927 */ 1928 public boolean hasUncomsumedMessages() { 1929 return executor.hasUncomsumedMessages(); 1930 } 1931 1932 /** 1933 * Checks whether the session uses transactions. 1934 * 1935 * @return true - if the session uses transactions. 1936 */ 1937 public boolean isTransacted() { 1938 return this.acknowledgementMode == Session.SESSION_TRANSACTED || (transactionContext.isInXATransaction()); 1939 } 1940 1941 /** 1942 * Checks whether the session used client acknowledgment. 1943 * 1944 * @return true - if the session uses client acknowledgment. 1945 */ 1946 protected boolean isClientAcknowledge() { 1947 return this.acknowledgementMode == Session.CLIENT_ACKNOWLEDGE; 1948 } 1949 1950 /** 1951 * Checks whether the session used auto acknowledgment. 1952 * 1953 * @return true - if the session uses client acknowledgment. 1954 */ 1955 public boolean isAutoAcknowledge() { 1956 return acknowledgementMode == Session.AUTO_ACKNOWLEDGE; 1957 } 1958 1959 /** 1960 * Checks whether the session used dup ok acknowledgment. 1961 * 1962 * @return true - if the session uses client acknowledgment. 
1963 */ 1964 public boolean isDupsOkAcknowledge() { 1965 return acknowledgementMode == Session.DUPS_OK_ACKNOWLEDGE; 1966 } 1967 1968 public boolean isIndividualAcknowledge(){ 1969 return acknowledgementMode == ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE; 1970 } 1971 1972 /** 1973 * Returns the message delivery listener. 1974 * 1975 * @return deliveryListener - message delivery listener. 1976 */ 1977 public DeliveryListener getDeliveryListener() { 1978 return deliveryListener; 1979 } 1980 1981 /** 1982 * Sets the message delivery listener. 1983 * 1984 * @param deliveryListener - message delivery listener. 1985 */ 1986 public void setDeliveryListener(DeliveryListener deliveryListener) { 1987 this.deliveryListener = deliveryListener; 1988 } 1989 1990 /** 1991 * Returns the SessionInfo bean. 1992 * 1993 * @return info - SessionInfo bean. 1994 * @throws JMSException 1995 */ 1996 protected SessionInfo getSessionInfo() throws JMSException { 1997 SessionInfo info = new SessionInfo(connection.getConnectionInfo(), getSessionId().getValue()); 1998 return info; 1999 } 2000 2001 /** 2002 * Send the asynchronus command. 2003 * 2004 * @param command - command to be executed. 2005 * @throws JMSException 2006 */ 2007 public void asyncSendPacket(Command command) throws JMSException { 2008 connection.asyncSendPacket(command); 2009 } 2010 2011 /** 2012 * Send the synchronus command. 2013 * 2014 * @param command - command to be executed. 
2015 * @return Response 2016 * @throws JMSException 2017 */ 2018 public Response syncSendPacket(Command command) throws JMSException { 2019 return connection.syncSendPacket(command); 2020 } 2021 2022 public long getNextDeliveryId() { 2023 return deliveryIdGenerator.getNextSequenceId(); 2024 } 2025 2026 public void redispatch(ActiveMQDispatcher dispatcher, MessageDispatchChannel unconsumedMessages) throws JMSException { 2027 2028 List<MessageDispatch> c = unconsumedMessages.removeAll(); 2029 for (MessageDispatch md : c) { 2030 this.connection.rollbackDuplicate(dispatcher, md.getMessage()); 2031 } 2032 Collections.reverse(c); 2033 2034 for (Iterator<MessageDispatch> iter = c.iterator(); iter.hasNext();) { 2035 MessageDispatch md = iter.next(); 2036 executor.executeFirst(md); 2037 } 2038 2039 } 2040 2041 public boolean isRunning() { 2042 return started.get(); 2043 } 2044 2045 public boolean isAsyncDispatch() { 2046 return asyncDispatch; 2047 } 2048 2049 public void setAsyncDispatch(boolean asyncDispatch) { 2050 this.asyncDispatch = asyncDispatch; 2051 } 2052 2053 /** 2054 * @return Returns the sessionAsyncDispatch. 2055 */ 2056 public boolean isSessionAsyncDispatch() { 2057 return sessionAsyncDispatch; 2058 } 2059 2060 /** 2061 * @param sessionAsyncDispatch The sessionAsyncDispatch to set. 
2062 */ 2063 public void setSessionAsyncDispatch(boolean sessionAsyncDispatch) { 2064 this.sessionAsyncDispatch = sessionAsyncDispatch; 2065 } 2066 2067 public MessageTransformer getTransformer() { 2068 return transformer; 2069 } 2070 2071 public ActiveMQConnection getConnection() { 2072 return connection; 2073 } 2074 2075 /** 2076 * Sets the transformer used to transform messages before they are sent on 2077 * to the JMS bus or when they are received from the bus but before they are 2078 * delivered to the JMS client 2079 */ 2080 public void setTransformer(MessageTransformer transformer) { 2081 this.transformer = transformer; 2082 } 2083 2084 public BlobTransferPolicy getBlobTransferPolicy() { 2085 return blobTransferPolicy; 2086 } 2087 2088 /** 2089 * Sets the policy used to describe how out-of-band BLOBs (Binary Large 2090 * OBjects) are transferred from producers to brokers to consumers 2091 */ 2092 public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) { 2093 this.blobTransferPolicy = blobTransferPolicy; 2094 } 2095 2096 public List<MessageDispatch> getUnconsumedMessages() { 2097 return executor.getUnconsumedMessages(); 2098 } 2099 2100 @Override 2101 public String toString() { 2102 return "ActiveMQSession {id=" + info.getSessionId() + ",started=" + started.get() + "}"; 2103 } 2104 2105 public void checkMessageListener() throws JMSException { 2106 if (messageListener != null) { 2107 throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set"); 2108 } 2109 for (Iterator<ActiveMQMessageConsumer> i = consumers.iterator(); i.hasNext();) { 2110 ActiveMQMessageConsumer consumer = i.next(); 2111 if (consumer.hasMessageListener()) { 2112 throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set"); 2113 } 2114 } 2115 } 2116 2117 protected void setOptimizeAcknowledge(boolean value) { 2118 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); 
iter.hasNext();) { 2119 ActiveMQMessageConsumer c = iter.next(); 2120 c.setOptimizeAcknowledge(value); 2121 } 2122 } 2123 2124 protected void setPrefetchSize(ConsumerId id, int prefetch) { 2125 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 2126 ActiveMQMessageConsumer c = iter.next(); 2127 if (c.getConsumerId().equals(id)) { 2128 c.setPrefetchSize(prefetch); 2129 break; 2130 } 2131 } 2132 } 2133 2134 protected void close(ConsumerId id) { 2135 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 2136 ActiveMQMessageConsumer c = iter.next(); 2137 if (c.getConsumerId().equals(id)) { 2138 try { 2139 c.close(); 2140 } catch (JMSException e) { 2141 LOG.warn("Exception closing consumer", e); 2142 } 2143 LOG.warn("Closed consumer on Command, " + id); 2144 break; 2145 } 2146 } 2147 } 2148 2149 public boolean isInUse(ActiveMQTempDestination destination) { 2150 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 2151 ActiveMQMessageConsumer c = iter.next(); 2152 if (c.isInUse(destination)) { 2153 return true; 2154 } 2155 } 2156 return false; 2157 } 2158 2159 /** 2160 * highest sequence id of the last message delivered by this session. 
* Passed to the broker in the close command, maintained by dispose()
     *
     * @return lastDeliveredSequenceId
     */
    public long getLastDeliveredSequenceId() {
        return this.lastDeliveredSequenceId;
    }

    /**
     * Sends the acknowledgement without requesting a lazy send. Equivalent to
     * <CODE>sendAck(ack, false)</CODE>.
     *
     * @param ack the acknowledgement to send
     * @throws JMSException if sending the acknowledgement fails
     */
    protected void sendAck(MessageAck ack) throws JMSException {
        final boolean lazy = false;
        sendAck(ack, lazy);
    }

    /**
     * Sends the acknowledgement, asynchronously when the caller asks for a
     * lazy send, when the connection sends acks asynchronously, or when
     * <CODE>getTransacted()</CODE> is true; synchronously otherwise.
     *
     * @param ack the acknowledgement to send
     * @param lazy true to force the asynchronous path
     * @throws JMSException if sending the acknowledgement fails
     */
    protected void sendAck(MessageAck ack, boolean lazy) throws JMSException {
        final boolean sendAsync = lazy || connection.isSendAcksAsync() || getTransacted();
        if (!sendAsync) {
            syncSendPacket(ack);
            return;
        }
        asyncSendPacket(ack);
    }

    /**
     * @return the scheduler provided by the connection
     * @throws JMSException if the connection fails to provide its scheduler
     */
    protected Scheduler getScheduler() throws JMSException {
        return connection.getScheduler();
    }

    /**
     * @return the connection executor held by this session
     */
    protected ThreadPoolExecutor getConnectionExecutor() {
        return connectionExecutor;
    }
}