001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region; 018 019import java.io.IOException; 020import java.util.ArrayList; 021import java.util.Iterator; 022import java.util.List; 023import java.util.concurrent.CountDownLatch; 024import java.util.concurrent.TimeUnit; 025import java.util.concurrent.atomic.AtomicInteger; 026 027import javax.jms.JMSException; 028 029import org.apache.activemq.broker.Broker; 030import org.apache.activemq.broker.ConnectionContext; 031import org.apache.activemq.broker.region.cursors.PendingMessageCursor; 032import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor; 033import org.apache.activemq.command.ConsumerControl; 034import org.apache.activemq.command.ConsumerInfo; 035import org.apache.activemq.command.Message; 036import org.apache.activemq.command.MessageAck; 037import org.apache.activemq.command.MessageDispatch; 038import org.apache.activemq.command.MessageDispatchNotification; 039import org.apache.activemq.command.MessageId; 040import org.apache.activemq.command.MessagePull; 041import org.apache.activemq.command.Response; 042import org.apache.activemq.thread.Scheduler; 043import org.apache.activemq.transaction.Synchronization; 044import org.apache.activemq.transport.TransmitCallback; 045import org.apache.activemq.usage.SystemUsage; 046import org.slf4j.Logger; 047import org.slf4j.LoggerFactory; 048 049/** 050 * A subscription that honors the pre-fetch option of the ConsumerInfo. 051 */ 052public abstract class PrefetchSubscription extends AbstractSubscription { 053 054 private static final Logger LOG = LoggerFactory.getLogger(PrefetchSubscription.class); 055 protected final Scheduler scheduler; 056 057 protected PendingMessageCursor pending; 058 protected final List<MessageReference> dispatched = new ArrayList<MessageReference>(); 059 protected final AtomicInteger prefetchExtension = new AtomicInteger(); 060 protected boolean usePrefetchExtension = true; 061 protected long enqueueCounter; 062 protected long dispatchCounter; 063 protected long dequeueCounter; 064 private int maxProducersToAudit=32; 065 private int maxAuditDepth=2048; 066 protected final SystemUsage usageManager; 067 protected final Object pendingLock = new Object(); 068 protected final Object dispatchLock = new Object(); 069 private final CountDownLatch okForAckAsDispatchDone = new CountDownLatch(1); 070 071 public PrefetchSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, PendingMessageCursor cursor) throws JMSException { 072 super(broker,context, info); 073 this.usageManager=usageManager; 074 pending = cursor; 075 try { 076 pending.start(); 077 } catch (Exception e) { 078 throw new JMSException(e.getMessage()); 079 } 080 this.scheduler = broker.getScheduler(); 081 } 082 083 public PrefetchSubscription(Broker broker,SystemUsage usageManager, ConnectionContext context, ConsumerInfo info) throws JMSException { 084 this(broker,usageManager,context, info, new VMPendingMessageCursor(false)); 085 } 086 087 /** 088 * Allows a message to be pulled on demand by a client 089 */ 090 @Override 091 public Response pullMessage(ConnectionContext context, final MessagePull pull) throws Exception { 092 // The slave should not deliver pull messages. 093 // TODO: when the slave becomes a master, He should send a NULL message to all the 094 // consumers to 'wake them up' in case they were waiting for a message. 095 if (getPrefetchSize() == 0) { 096 prefetchExtension.set(pull.getQuantity()); 097 final long dispatchCounterBeforePull = dispatchCounter; 098 099 // Have the destination push us some messages. 100 for (Destination dest : destinations) { 101 dest.iterate(); 102 } 103 dispatchPending(); 104 105 synchronized(this) { 106 // If there was nothing dispatched.. we may need to setup a timeout. 107 if (dispatchCounterBeforePull == dispatchCounter || pull.isAlwaysSignalDone()) { 108 // immediate timeout used by receiveNoWait() 109 if (pull.getTimeout() == -1) { 110 // Null message indicates the pull is done or did not have pending. 111 prefetchExtension.set(1); 112 add(QueueMessageReference.NULL_MESSAGE); 113 dispatchPending(); 114 } 115 if (pull.getTimeout() > 0) { 116 scheduler.executeAfterDelay(new Runnable() { 117 @Override 118 public void run() { 119 pullTimeout(dispatchCounterBeforePull, pull.isAlwaysSignalDone()); 120 } 121 }, pull.getTimeout()); 122 } 123 } 124 } 125 } 126 return null; 127 } 128 129 /** 130 * Occurs when a pull times out. If nothing has been dispatched since the 131 * timeout was setup, then send the NULL message. 132 */ 133 final void pullTimeout(long dispatchCounterBeforePull, boolean alwaysSignalDone) { 134 synchronized (pendingLock) { 135 if (dispatchCounterBeforePull == dispatchCounter || alwaysSignalDone) { 136 try { 137 prefetchExtension.set(1); 138 add(QueueMessageReference.NULL_MESSAGE); 139 dispatchPending(); 140 } catch (Exception e) { 141 context.getConnection().serviceException(e); 142 } finally { 143 prefetchExtension.set(0); 144 } 145 } 146 } 147 } 148 149 @Override 150 public void add(MessageReference node) throws Exception { 151 synchronized (pendingLock) { 152 // The destination may have just been removed... 153 if (!destinations.contains(node.getRegionDestination()) && node != QueueMessageReference.NULL_MESSAGE) { 154 // perhaps we should inform the caller that we are no longer valid to dispatch to? 155 return; 156 } 157 158 // Don't increment for the pullTimeout control message. 159 if (!node.equals(QueueMessageReference.NULL_MESSAGE)) { 160 enqueueCounter++; 161 } 162 pending.addMessageLast(node); 163 } 164 dispatchPending(); 165 } 166 167 @Override 168 public void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception { 169 synchronized(pendingLock) { 170 try { 171 pending.reset(); 172 while (pending.hasNext()) { 173 MessageReference node = pending.next(); 174 node.decrementReferenceCount(); 175 if (node.getMessageId().equals(mdn.getMessageId())) { 176 // Synchronize between dispatched list and removal of messages from pending list 177 // related to remove subscription action 178 synchronized(dispatchLock) { 179 pending.remove(); 180 createMessageDispatch(node, node.getMessage()); 181 dispatched.add(node); 182 onDispatch(node, node.getMessage()); 183 } 184 return; 185 } 186 } 187 } finally { 188 pending.release(); 189 } 190 } 191 throw new JMSException( 192 "Slave broker out of sync with master: Dispatched message (" 193 + mdn.getMessageId() + ") was not in the pending list for " 194 + mdn.getConsumerId() + " on " + mdn.getDestination().getPhysicalName()); 195 } 196 197 @Override 198 public final void acknowledge(final ConnectionContext context,final MessageAck ack) throws Exception { 199 // Handle the standard acknowledgment case. 200 boolean callDispatchMatched = false; 201 Destination destination = null; 202 203 if (!okForAckAsDispatchDone.await(0l, TimeUnit.MILLISECONDS)) { 204 // suppress unexpected ack exception in this expected case 205 LOG.warn("Ignoring ack received before dispatch; result of failover with an outstanding ack. Acked messages will be replayed if present on this broker. Ignored ack: {}", ack); 206 return; 207 } 208 209 LOG.trace("ack: {}", ack); 210 211 synchronized(dispatchLock) { 212 if (ack.isStandardAck()) { 213 // First check if the ack matches the dispatched. When using failover this might 214 // not be the case. We don't ever want to ack the wrong messages. 215 assertAckMatchesDispatched(ack); 216 217 // Acknowledge all dispatched messages up till the message id of 218 // the acknowledgment. 219 boolean inAckRange = false; 220 List<MessageReference> removeList = new ArrayList<MessageReference>(); 221 for (final MessageReference node : dispatched) { 222 MessageId messageId = node.getMessageId(); 223 if (ack.getFirstMessageId() == null 224 || ack.getFirstMessageId().equals(messageId)) { 225 inAckRange = true; 226 } 227 if (inAckRange) { 228 // Don't remove the nodes until we are committed. 229 if (!context.isInTransaction()) { 230 dequeueCounter++; 231 ((Destination)node.getRegionDestination()).getDestinationStatistics().getInflight().decrement(); 232 removeList.add(node); 233 } else { 234 registerRemoveSync(context, node); 235 } 236 acknowledge(context, ack, node); 237 if (ack.getLastMessageId().equals(messageId)) { 238 destination = (Destination) node.getRegionDestination(); 239 callDispatchMatched = true; 240 break; 241 } 242 } 243 } 244 for (final MessageReference node : removeList) { 245 dispatched.remove(node); 246 } 247 // this only happens after a reconnect - get an ack which is not 248 // valid 249 if (!callDispatchMatched) { 250 LOG.warn("Could not correlate acknowledgment with dispatched message: {}", ack); 251 } 252 } else if (ack.isIndividualAck()) { 253 // Message was delivered and acknowledge - but only delete the 254 // individual message 255 for (final MessageReference node : dispatched) { 256 MessageId messageId = node.getMessageId(); 257 if (ack.getLastMessageId().equals(messageId)) { 258 // Don't remove the nodes until we are committed - immediateAck option 259 if (!context.isInTransaction()) { 260 dequeueCounter++; 261 ((Destination)node.getRegionDestination()).getDestinationStatistics().getInflight().decrement(); 262 dispatched.remove(node); 263 } else { 264 registerRemoveSync(context, node); 265 } 266 267 if (usePrefetchExtension && getPrefetchSize() != 0 && ack.isInTransaction()) { 268 // allow transaction batch to exceed prefetch 269 while (true) { 270 int currentExtension = prefetchExtension.get(); 271 int newExtension = Math.max(currentExtension, currentExtension + 1); 272 if (prefetchExtension.compareAndSet(currentExtension, newExtension)) { 273 break; 274 } 275 } 276 } 277 278 acknowledge(context, ack, node); 279 destination = (Destination) node.getRegionDestination(); 280 callDispatchMatched = true; 281 break; 282 } 283 } 284 }else if (ack.isDeliveredAck() || ack.isExpiredAck()) { 285 // Message was delivered but not acknowledged: update pre-fetch 286 // counters. 287 int index = 0; 288 for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext(); index++) { 289 final MessageReference node = iter.next(); 290 Destination nodeDest = (Destination) node.getRegionDestination(); 291 if (node.isExpired()) { 292 if (broker.isExpired(node)) { 293 Destination regionDestination = nodeDest; 294 regionDestination.messageExpired(context, this, node); 295 } 296 iter.remove(); 297 nodeDest.getDestinationStatistics().getInflight().decrement(); 298 } 299 if (ack.getLastMessageId().equals(node.getMessageId())) { 300 if (usePrefetchExtension && getPrefetchSize() != 0) { 301 // allow batch to exceed prefetch 302 while (true) { 303 int currentExtension = prefetchExtension.get(); 304 int newExtension = Math.max(currentExtension, index + 1); 305 if (prefetchExtension.compareAndSet(currentExtension, newExtension)) { 306 break; 307 } 308 } 309 } 310 destination = nodeDest; 311 callDispatchMatched = true; 312 break; 313 } 314 } 315 if (!callDispatchMatched) { 316 throw new JMSException( 317 "Could not correlate acknowledgment with dispatched message: " 318 + ack); 319 } 320 } else if (ack.isRedeliveredAck()) { 321 // Message was re-delivered but it was not yet considered to be 322 // a DLQ message. 323 boolean inAckRange = false; 324 for (final MessageReference node : dispatched) { 325 MessageId messageId = node.getMessageId(); 326 if (ack.getFirstMessageId() == null 327 || ack.getFirstMessageId().equals(messageId)) { 328 inAckRange = true; 329 } 330 if (inAckRange) { 331 if (ack.getLastMessageId().equals(messageId)) { 332 destination = (Destination) node.getRegionDestination(); 333 callDispatchMatched = true; 334 break; 335 } 336 } 337 } 338 if (!callDispatchMatched) { 339 throw new JMSException( 340 "Could not correlate acknowledgment with dispatched message: " 341 + ack); 342 } 343 } else if (ack.isPoisonAck()) { 344 // TODO: what if the message is already in a DLQ??? 345 // Handle the poison ACK case: we need to send the message to a 346 // DLQ 347 if (ack.isInTransaction()) { 348 throw new JMSException("Poison ack cannot be transacted: " 349 + ack); 350 } 351 int index = 0; 352 boolean inAckRange = false; 353 List<MessageReference> removeList = new ArrayList<MessageReference>(); 354 for (final MessageReference node : dispatched) { 355 MessageId messageId = node.getMessageId(); 356 if (ack.getFirstMessageId() == null 357 || ack.getFirstMessageId().equals(messageId)) { 358 inAckRange = true; 359 } 360 if (inAckRange) { 361 sendToDLQ(context, node, ack.getPoisonCause()); 362 Destination nodeDest = (Destination) node.getRegionDestination(); 363 nodeDest.getDestinationStatistics() 364 .getInflight().decrement(); 365 removeList.add(node); 366 dequeueCounter++; 367 index++; 368 acknowledge(context, ack, node); 369 if (ack.getLastMessageId().equals(messageId)) { 370 while (true) { 371 int currentExtension = prefetchExtension.get(); 372 int newExtension = Math.max(0, currentExtension - (index + 1)); 373 if (prefetchExtension.compareAndSet(currentExtension, newExtension)) { 374 break; 375 } 376 } 377 destination = nodeDest; 378 callDispatchMatched = true; 379 break; 380 } 381 } 382 } 383 for (final MessageReference node : removeList) { 384 dispatched.remove(node); 385 } 386 if (!callDispatchMatched) { 387 throw new JMSException( 388 "Could not correlate acknowledgment with dispatched message: " 389 + ack); 390 } 391 } 392 } 393 if (callDispatchMatched && destination != null) { 394 destination.wakeup(); 395 dispatchPending(); 396 397 if (pending.isEmpty()) { 398 for (Destination dest : destinations) { 399 dest.wakeup(); 400 } 401 } 402 } else { 403 LOG.debug("Acknowledgment out of sync (Normally occurs when failover connection reconnects): {}", ack); 404 } 405 } 406 407 private void registerRemoveSync(ConnectionContext context, final MessageReference node) { 408 // setup a Synchronization to remove nodes from the 409 // dispatched list. 410 context.getTransaction().addSynchronization( 411 new Synchronization() { 412 413 @Override 414 public void beforeEnd() { 415 if (usePrefetchExtension && getPrefetchSize() != 0) { 416 while (true) { 417 int currentExtension = prefetchExtension.get(); 418 int newExtension = Math.max(0, currentExtension - 1); 419 if (prefetchExtension.compareAndSet(currentExtension, newExtension)) { 420 break; 421 } 422 } 423 } 424 } 425 426 @Override 427 public void afterCommit() 428 throws Exception { 429 Destination nodeDest = (Destination) node.getRegionDestination(); 430 synchronized(dispatchLock) { 431 dequeueCounter++; 432 dispatched.remove(node); 433 nodeDest.getDestinationStatistics().getInflight().decrement(); 434 } 435 nodeDest.wakeup(); 436 dispatchPending(); 437 } 438 439 @Override 440 public void afterRollback() throws Exception { 441 synchronized(dispatchLock) { 442 // poisionAck will decrement - otherwise still inflight on client 443 } 444 } 445 }); 446 } 447 448 /** 449 * Checks an ack versus the contents of the dispatched list. 450 * called with dispatchLock held 451 * @param ack 452 * @throws JMSException if it does not match 453 */ 454 protected void assertAckMatchesDispatched(MessageAck ack) throws JMSException { 455 MessageId firstAckedMsg = ack.getFirstMessageId(); 456 MessageId lastAckedMsg = ack.getLastMessageId(); 457 int checkCount = 0; 458 boolean checkFoundStart = false; 459 boolean checkFoundEnd = false; 460 for (MessageReference node : dispatched) { 461 462 if (firstAckedMsg == null) { 463 checkFoundStart = true; 464 } else if (!checkFoundStart && firstAckedMsg.equals(node.getMessageId())) { 465 checkFoundStart = true; 466 } 467 468 if (checkFoundStart) { 469 checkCount++; 470 } 471 472 if (lastAckedMsg != null && lastAckedMsg.equals(node.getMessageId())) { 473 checkFoundEnd = true; 474 break; 475 } 476 } 477 if (!checkFoundStart && firstAckedMsg != null) 478 throw new JMSException("Unmatched acknowledge: " + ack 479 + "; Could not find Message-ID " + firstAckedMsg 480 + " in dispatched-list (start of ack)"); 481 if (!checkFoundEnd && lastAckedMsg != null) 482 throw new JMSException("Unmatched acknowledge: " + ack 483 + "; Could not find Message-ID " + lastAckedMsg 484 + " in dispatched-list (end of ack)"); 485 if (ack.getMessageCount() != checkCount && !ack.isInTransaction()) { 486 throw new JMSException("Unmatched acknowledge: " + ack 487 + "; Expected message count (" + ack.getMessageCount() 488 + ") differs from count in dispatched-list (" + checkCount 489 + ")"); 490 } 491 } 492 493 /** 494 * 495 * @param context 496 * @param node 497 * @param poisonCause 498 * @throws IOException 499 * @throws Exception 500 */ 501 protected void sendToDLQ(final ConnectionContext context, final MessageReference node, Throwable poisonCause) throws IOException, Exception { 502 broker.getRoot().sendToDeadLetterQueue(context, node, this, poisonCause); 503 } 504 505 @Override 506 public int getInFlightSize() { 507 return dispatched.size(); 508 } 509 510 /** 511 * Used to determine if the broker can dispatch to the consumer. 512 * 513 * @return 514 */ 515 @Override 516 public boolean isFull() { 517 return getPrefetchSize() == 0 ? prefetchExtension.get() == 0 : dispatched.size() - prefetchExtension.get() >= info.getPrefetchSize(); 518 } 519 520 /** 521 * @return true when 60% or more room is left for dispatching messages 522 */ 523 @Override 524 public boolean isLowWaterMark() { 525 return (dispatched.size() - prefetchExtension.get()) <= (info.getPrefetchSize() * .4); 526 } 527 528 /** 529 * @return true when 10% or less room is left for dispatching messages 530 */ 531 @Override 532 public boolean isHighWaterMark() { 533 return (dispatched.size() - prefetchExtension.get()) >= (info.getPrefetchSize() * .9); 534 } 535 536 @Override 537 public int countBeforeFull() { 538 return getPrefetchSize() == 0 ? prefetchExtension.get() : info.getPrefetchSize() + prefetchExtension.get() - dispatched.size(); 539 } 540 541 @Override 542 public int getPendingQueueSize() { 543 return pending.size(); 544 } 545 546 @Override 547 public int getDispatchedQueueSize() { 548 return dispatched.size(); 549 } 550 551 @Override 552 public long getDequeueCounter() { 553 return dequeueCounter; 554 } 555 556 @Override 557 public long getDispatchedCounter() { 558 return dispatchCounter; 559 } 560 561 @Override 562 public long getEnqueueCounter() { 563 return enqueueCounter; 564 } 565 566 @Override 567 public boolean isRecoveryRequired() { 568 return pending.isRecoveryRequired(); 569 } 570 571 public PendingMessageCursor getPending() { 572 return this.pending; 573 } 574 575 public void setPending(PendingMessageCursor pending) { 576 this.pending = pending; 577 if (this.pending!=null) { 578 this.pending.setSystemUsage(usageManager); 579 this.pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark()); 580 } 581 } 582 583 @Override 584 public void add(ConnectionContext context, Destination destination) throws Exception { 585 synchronized(pendingLock) { 586 super.add(context, destination); 587 pending.add(context, destination); 588 } 589 } 590 591 @Override 592 public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception { 593 return remove(context, destination, dispatched); 594 } 595 596 public List<MessageReference> remove(ConnectionContext context, Destination destination, List<MessageReference> dispatched) throws Exception { 597 List<MessageReference> rc = new ArrayList<MessageReference>(); 598 synchronized(pendingLock) { 599 super.remove(context, destination); 600 // Here is a potential problem concerning Inflight stat: 601 // Messages not already committed or rolled back may not be removed from dispatched list at the moment 602 // Except if each commit or rollback callback action comes before remove of subscriber. 603 rc.addAll(pending.remove(context, destination)); 604 605 if (dispatched == null) { 606 return rc; 607 } 608 609 // Synchronized to DispatchLock if necessary 610 if (dispatched == this.dispatched) { 611 synchronized(dispatchLock) { 612 updateDestinationStats(rc, destination, dispatched); 613 } 614 } else { 615 updateDestinationStats(rc, destination, dispatched); 616 } 617 } 618 return rc; 619 } 620 621 private void updateDestinationStats(List<MessageReference> rc, Destination destination, List<MessageReference> dispatched) { 622 ArrayList<MessageReference> references = new ArrayList<MessageReference>(); 623 for (MessageReference r : dispatched) { 624 if (r.getRegionDestination() == destination) { 625 references.add(r); 626 } 627 } 628 rc.addAll(references); 629 destination.getDestinationStatistics().getInflight().subtract(references.size()); 630 dispatched.removeAll(references); 631 } 632 633 // made public so it can be used in MQTTProtocolConverter 634 public void dispatchPending() throws IOException { 635 synchronized(pendingLock) { 636 try { 637 int numberToDispatch = countBeforeFull(); 638 if (numberToDispatch > 0) { 639 setSlowConsumer(false); 640 setPendingBatchSize(pending, numberToDispatch); 641 int count = 0; 642 pending.reset(); 643 while (pending.hasNext() && !isFull() && count < numberToDispatch) { 644 MessageReference node = pending.next(); 645 if (node == null) { 646 break; 647 } 648 649 // Synchronize between dispatched list and remove of message from pending list 650 // related to remove subscription action 651 synchronized(dispatchLock) { 652 pending.remove(); 653 node.decrementReferenceCount(); 654 if( !isDropped(node) && canDispatch(node)) { 655 656 // Message may have been sitting in the pending 657 // list a while waiting for the consumer to ak the message. 658 if (node!=QueueMessageReference.NULL_MESSAGE && node.isExpired()) { 659 //increment number to dispatch 660 numberToDispatch++; 661 if (broker.isExpired(node)) { 662 ((Destination)node.getRegionDestination()).messageExpired(context, this, node); 663 } 664 continue; 665 } 666 dispatch(node); 667 count++; 668 } 669 } 670 } 671 } else if (!isSlowConsumer()) { 672 setSlowConsumer(true); 673 for (Destination dest :destinations) { 674 dest.slowConsumer(context, this); 675 } 676 } 677 } finally { 678 pending.release(); 679 } 680 } 681 } 682 683 protected void setPendingBatchSize(PendingMessageCursor pending, int numberToDispatch) { 684 pending.setMaxBatchSize(numberToDispatch); 685 } 686 687 // called with dispatchLock held 688 protected boolean dispatch(final MessageReference node) throws IOException { 689 final Message message = node.getMessage(); 690 if (message == null) { 691 return false; 692 } 693 694 okForAckAsDispatchDone.countDown(); 695 696 MessageDispatch md = createMessageDispatch(node, message); 697 if (node != QueueMessageReference.NULL_MESSAGE) { 698 dispatchCounter++; 699 dispatched.add(node); 700 } 701 if (getPrefetchSize() == 0) { 702 while (true) { 703 int currentExtension = prefetchExtension.get(); 704 int newExtension = Math.max(0, currentExtension - 1); 705 if (prefetchExtension.compareAndSet(currentExtension, newExtension)) { 706 break; 707 } 708 } 709 } 710 if (info.isDispatchAsync()) { 711 md.setTransmitCallback(new TransmitCallback() { 712 713 @Override 714 public void onSuccess() { 715 // Since the message gets queued up in async dispatch, we don't want to 716 // decrease the reference count until it gets put on the wire. 717 onDispatch(node, message); 718 } 719 720 @Override 721 public void onFailure() { 722 Destination nodeDest = (Destination) node.getRegionDestination(); 723 if (nodeDest != null) { 724 if (node != QueueMessageReference.NULL_MESSAGE) { 725 nodeDest.getDestinationStatistics().getDispatched().increment(); 726 nodeDest.getDestinationStatistics().getInflight().increment(); 727 LOG.trace("{} failed to dispatch: {} - {}, dispatched: {}, inflight: {}", new Object[]{ info.getConsumerId(), message.getMessageId(), message.getDestination(), dispatchCounter, dispatched.size() }); 728 } 729 } 730 if (node instanceof QueueMessageReference) { 731 ((QueueMessageReference) node).unlock(); 732 } 733 } 734 }); 735 context.getConnection().dispatchAsync(md); 736 } else { 737 context.getConnection().dispatchSync(md); 738 onDispatch(node, message); 739 } 740 return true; 741 } 742 743 protected void onDispatch(final MessageReference node, final Message message) { 744 Destination nodeDest = (Destination) node.getRegionDestination(); 745 if (nodeDest != null) { 746 if (node != QueueMessageReference.NULL_MESSAGE) { 747 nodeDest.getDestinationStatistics().getDispatched().increment(); 748 nodeDest.getDestinationStatistics().getInflight().increment(); 749 LOG.trace("{} dispatched: {} - {}, dispatched: {}, inflight: {}", new Object[]{ info.getConsumerId(), message.getMessageId(), message.getDestination(), dispatchCounter, dispatched.size() }); 750 } 751 } 752 753 if (info.isDispatchAsync()) { 754 try { 755 dispatchPending(); 756 } catch (IOException e) { 757 context.getConnection().serviceExceptionAsync(e); 758 } 759 } 760 } 761 762 /** 763 * inform the MessageConsumer on the client to change it's prefetch 764 * 765 * @param newPrefetch 766 */ 767 @Override 768 public void updateConsumerPrefetch(int newPrefetch) { 769 if (context != null && context.getConnection() != null && context.getConnection().isManageable()) { 770 ConsumerControl cc = new ConsumerControl(); 771 cc.setConsumerId(info.getConsumerId()); 772 cc.setPrefetch(newPrefetch); 773 context.getConnection().dispatchAsync(cc); 774 } 775 } 776 777 /** 778 * @param node 779 * @param message 780 * @return MessageDispatch 781 */ 782 protected MessageDispatch createMessageDispatch(MessageReference node, Message message) { 783 MessageDispatch md = new MessageDispatch(); 784 md.setConsumerId(info.getConsumerId()); 785 786 if (node == QueueMessageReference.NULL_MESSAGE) { 787 md.setMessage(null); 788 md.setDestination(null); 789 } else { 790 Destination regionDestination = (Destination) node.getRegionDestination(); 791 md.setDestination(regionDestination.getActiveMQDestination()); 792 md.setMessage(message); 793 md.setRedeliveryCounter(node.getRedeliveryCounter()); 794 } 795 796 return md; 797 } 798 799 /** 800 * Use when a matched message is about to be dispatched to the client. 801 * 802 * @param node 803 * @return false if the message should not be dispatched to the client 804 * (another sub may have already dispatched it for example). 805 * @throws IOException 806 */ 807 protected abstract boolean canDispatch(MessageReference node) throws IOException; 808 809 protected abstract boolean isDropped(MessageReference node); 810 811 /** 812 * Used during acknowledgment to remove the message. 813 * 814 * @throws IOException 815 */ 816 protected abstract void acknowledge(ConnectionContext context, final MessageAck ack, final MessageReference node) throws IOException; 817 818 819 public int getMaxProducersToAudit() { 820 return maxProducersToAudit; 821 } 822 823 public void setMaxProducersToAudit(int maxProducersToAudit) { 824 this.maxProducersToAudit = maxProducersToAudit; 825 if (this.pending != null) { 826 this.pending.setMaxProducersToAudit(maxProducersToAudit); 827 } 828 } 829 830 public int getMaxAuditDepth() { 831 return maxAuditDepth; 832 } 833 834 public void setMaxAuditDepth(int maxAuditDepth) { 835 this.maxAuditDepth = maxAuditDepth; 836 if (this.pending != null) { 837 this.pending.setMaxAuditDepth(maxAuditDepth); 838 } 839 } 840 841 public boolean isUsePrefetchExtension() { 842 return usePrefetchExtension; 843 } 844 845 public void setUsePrefetchExtension(boolean usePrefetchExtension) { 846 this.usePrefetchExtension = usePrefetchExtension; 847 } 848 849 protected int getPrefetchExtension() { 850 return this.prefetchExtension.get(); 851 } 852 853 @Override 854 public void setPrefetchSize(int prefetchSize) { 855 this.info.setPrefetchSize(prefetchSize); 856 try { 857 this.dispatchPending(); 858 } catch (Exception e) { 859 LOG.trace("Caught exception during dispatch after prefetch change.", e); 860 } 861 } 862}