001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region;
018
019import java.io.IOException;
020import java.util.ArrayList;
021import java.util.Iterator;
022import java.util.List;
023import java.util.concurrent.CountDownLatch;
024import java.util.concurrent.TimeUnit;
025import java.util.concurrent.atomic.AtomicInteger;
026
027import javax.jms.JMSException;
028
029import org.apache.activemq.broker.Broker;
030import org.apache.activemq.broker.ConnectionContext;
031import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
032import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
033import org.apache.activemq.command.ConsumerControl;
034import org.apache.activemq.command.ConsumerInfo;
035import org.apache.activemq.command.Message;
036import org.apache.activemq.command.MessageAck;
037import org.apache.activemq.command.MessageDispatch;
038import org.apache.activemq.command.MessageDispatchNotification;
039import org.apache.activemq.command.MessageId;
040import org.apache.activemq.command.MessagePull;
041import org.apache.activemq.command.Response;
042import org.apache.activemq.thread.Scheduler;
043import org.apache.activemq.transaction.Synchronization;
044import org.apache.activemq.transport.TransmitCallback;
045import org.apache.activemq.usage.SystemUsage;
046import org.slf4j.Logger;
047import org.slf4j.LoggerFactory;
048
049/**
050 * A subscription that honors the pre-fetch option of the ConsumerInfo.
051 */
052public abstract class PrefetchSubscription extends AbstractSubscription {
053
    private static final Logger LOG = LoggerFactory.getLogger(PrefetchSubscription.class);
    // Scheduler obtained from the broker; used to run the delayed pull-timeout task.
    protected final Scheduler scheduler;

    // Cursor over messages that were added to this subscription but not dispatched yet.
    protected PendingMessageCursor pending;
    // Messages that have been dispatched and are still tracked by this subscription.
    protected final List<MessageReference> dispatched = new ArrayList<MessageReference>();
    // Extra dispatch allowance applied on top of the configured prefetch size;
    // for a prefetch size of 0 it is the number of messages a pull may still receive.
    protected final AtomicInteger prefetchExtension = new AtomicInteger();
    // When false, acks never grow prefetchExtension for subscriptions with a non-zero prefetch.
    protected boolean usePrefetchExtension = true;
    // Number of messages added to the pending cursor (the NULL_MESSAGE control message is not counted).
    protected long enqueueCounter;
    // Number of messages handed to dispatch (the NULL_MESSAGE control message is not counted).
    protected long dispatchCounter;
    // Number of messages removed from the dispatched list by an ack, poison ack or commit.
    protected long dequeueCounter;
    // NOTE(review): not referenced in the visible part of this class - presumably
    // duplicate-audit settings exposed through accessors further down; confirm.
    private int maxProducersToAudit=32;
    private int maxAuditDepth=2048;
    protected final SystemUsage usageManager;
    // Guards the pending cursor.
    protected final Object pendingLock = new Object();
    // Guards the dispatched list; when both locks are needed, pendingLock is taken first.
    protected final Object dispatchLock = new Object();
    // Counted down on the first dispatch; acks that arrive before that are ignored.
    private final CountDownLatch okForAckAsDispatchDone = new CountDownLatch(1);
070
071    public PrefetchSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, PendingMessageCursor cursor) throws JMSException {
072        super(broker,context, info);
073        this.usageManager=usageManager;
074        pending = cursor;
075        try {
076            pending.start();
077        } catch (Exception e) {
078            throw new JMSException(e.getMessage());
079        }
080        this.scheduler = broker.getScheduler();
081    }
082
    /**
     * Creates a subscription whose pending messages are held in a new
     * {@link VMPendingMessageCursor} constructed with {@code false}.
     *
     * @throws JMSException if the cursor cannot be started
     */
    public PrefetchSubscription(Broker broker,SystemUsage usageManager, ConnectionContext context, ConsumerInfo info) throws JMSException {
        this(broker,usageManager,context, info, new VMPendingMessageCursor(false));
    }
086
087    /**
088     * Allows a message to be pulled on demand by a client
089     */
090    @Override
091    public Response pullMessage(ConnectionContext context, final MessagePull pull) throws Exception {
092        // The slave should not deliver pull messages.
093        // TODO: when the slave becomes a master, He should send a NULL message to all the
094        // consumers to 'wake them up' in case they were waiting for a message.
095        if (getPrefetchSize() == 0) {
096            prefetchExtension.set(pull.getQuantity());
097            final long dispatchCounterBeforePull = dispatchCounter;
098
099            // Have the destination push us some messages.
100            for (Destination dest : destinations) {
101                dest.iterate();
102            }
103            dispatchPending();
104
105            synchronized(this) {
106                // If there was nothing dispatched.. we may need to setup a timeout.
107                if (dispatchCounterBeforePull == dispatchCounter || pull.isAlwaysSignalDone()) {
108                    // immediate timeout used by receiveNoWait()
109                    if (pull.getTimeout() == -1) {
110                        // Null message indicates the pull is done or did not have pending.
111                        prefetchExtension.set(1);
112                        add(QueueMessageReference.NULL_MESSAGE);
113                        dispatchPending();
114                    }
115                    if (pull.getTimeout() > 0) {
116                        scheduler.executeAfterDelay(new Runnable() {
117                            @Override
118                            public void run() {
119                                pullTimeout(dispatchCounterBeforePull, pull.isAlwaysSignalDone());
120                            }
121                        }, pull.getTimeout());
122                    }
123                }
124            }
125        }
126        return null;
127    }
128
129    /**
130     * Occurs when a pull times out. If nothing has been dispatched since the
131     * timeout was setup, then send the NULL message.
132     */
133    final void pullTimeout(long dispatchCounterBeforePull, boolean alwaysSignalDone) {
134        synchronized (pendingLock) {
135            if (dispatchCounterBeforePull == dispatchCounter || alwaysSignalDone) {
136                try {
137                    prefetchExtension.set(1);
138                    add(QueueMessageReference.NULL_MESSAGE);
139                    dispatchPending();
140                } catch (Exception e) {
141                    context.getConnection().serviceException(e);
142                } finally {
143                    prefetchExtension.set(0);
144                }
145            }
146        }
147    }
148
149    @Override
150    public void add(MessageReference node) throws Exception {
151        synchronized (pendingLock) {
152            // The destination may have just been removed...
153            if (!destinations.contains(node.getRegionDestination()) && node != QueueMessageReference.NULL_MESSAGE) {
154                // perhaps we should inform the caller that we are no longer valid to dispatch to?
155                return;
156            }
157
158            // Don't increment for the pullTimeout control message.
159            if (!node.equals(QueueMessageReference.NULL_MESSAGE)) {
160                enqueueCounter++;
161            }
162            pending.addMessageLast(node);
163        }
164        dispatchPending();
165    }
166
    /**
     * Applies a dispatch notification by moving the matching message from the
     * pending cursor to the dispatched list.
     * <p>
     * The pending cursor is scanned from the start; the reference count of every
     * visited node is decremented. The first node whose id equals the notified
     * message id is removed from the cursor, added to {@code dispatched} and
     * passed to {@code onDispatch}.
     *
     * @param mdn the dispatch notification
     * @throws JMSException if no pending message has the notified id
     */
    @Override
    public void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception {
        synchronized(pendingLock) {
            try {
                pending.reset();
                while (pending.hasNext()) {
                    MessageReference node = pending.next();
                    node.decrementReferenceCount();
                    if (node.getMessageId().equals(mdn.getMessageId())) {
                        // Synchronize between dispatched list and removal of messages from pending list
                        // related to remove subscription action
                        synchronized(dispatchLock) {
                            pending.remove();
                            // The created dispatch object is discarded; the call is made for its side effects only.
                            createMessageDispatch(node, node.getMessage());
                            dispatched.add(node);
                            onDispatch(node, node.getMessage());
                        }
                        return;
                    }
                }
            } finally {
                // Always release the cursor, including on the early return above.
                pending.release();
            }
        }
        // Reached only when the scan finished without finding the notified message.
        throw new JMSException(
                "Slave broker out of sync with master: Dispatched message ("
                        + mdn.getMessageId() + ") was not in the pending list for "
                        + mdn.getConsumerId() + " on " + mdn.getDestination().getPhysicalName());
    }
196
    /**
     * Processes an acknowledgment from the consumer against the dispatched list.
     * <p>
     * An ack that arrives before anything has been dispatched by this subscription
     * is logged and ignored. Otherwise, under {@code dispatchLock}, the ack is
     * handled by type:
     * <ul>
     * <li>standard: validated against the dispatched list, then every message in
     * the ack range is acknowledged and removed (removal is deferred to commit
     * when in a transaction);</li>
     * <li>individual: only the message matching the last message id is acknowledged;</li>
     * <li>delivered / expired: expired messages are dropped and the prefetch
     * extension may be raised;</li>
     * <li>redelivered: only checks that the range can be found;</li>
     * <li>poison: every message in the range is sent to the DLQ and removed.</li>
     * </ul>
     * When the ack was matched, the owning destination is woken up and pending
     * messages are dispatched.
     *
     * @param context the connection context of the consumer
     * @param ack the acknowledgment to process
     * @throws Exception a {@link JMSException} when a standard ack does not match
     *         the dispatched list, when a delivered, expired, redelivered or poison
     *         ack cannot be correlated with a dispatched message, or when a poison
     *         ack is transacted; other failures are propagated from the calls made here
     */
    @Override
    public final void acknowledge(final ConnectionContext context,final MessageAck ack) throws Exception {
        // Handle the standard acknowledgment case.
        boolean callDispatchMatched = false;
        Destination destination = null;

        // Zero-timeout await: a non-blocking check that at least one dispatch has happened.
        if (!okForAckAsDispatchDone.await(0l, TimeUnit.MILLISECONDS)) {
            // suppress unexpected ack exception in this expected case
            LOG.warn("Ignoring ack received before dispatch; result of failover with an outstanding ack. Acked messages will be replayed if present on this broker. Ignored ack: {}", ack);
            return;
        }

        LOG.trace("ack: {}", ack);

        synchronized(dispatchLock) {
            if (ack.isStandardAck()) {
                // First check if the ack matches the dispatched. When using failover this might
                // not be the case. We don't ever want to ack the wrong messages.
                assertAckMatchesDispatched(ack);

                // Acknowledge all dispatched messages up till the message id of
                // the acknowledgment.
                boolean inAckRange = false;
                // Removals are collected and applied after the loop so the list is not modified while iterating.
                List<MessageReference> removeList = new ArrayList<MessageReference>();
                for (final MessageReference node : dispatched) {
                    MessageId messageId = node.getMessageId();
                    if (ack.getFirstMessageId() == null
                            || ack.getFirstMessageId().equals(messageId)) {
                        inAckRange = true;
                    }
                    if (inAckRange) {
                        // Don't remove the nodes until we are committed.
                        if (!context.isInTransaction()) {
                            dequeueCounter++;
                            ((Destination)node.getRegionDestination()).getDestinationStatistics().getInflight().decrement();
                            removeList.add(node);
                        } else {
                            registerRemoveSync(context, node);
                        }
                        acknowledge(context, ack, node);
                        if (ack.getLastMessageId().equals(messageId)) {
                            destination = (Destination) node.getRegionDestination();
                            callDispatchMatched = true;
                            break;
                        }
                    }
                }
                for (final MessageReference node : removeList) {
                    dispatched.remove(node);
                }
                // this only happens after a reconnect - get an ack which is not
                // valid
                if (!callDispatchMatched) {
                    LOG.warn("Could not correlate acknowledgment with dispatched message: {}", ack);
                }
            } else if (ack.isIndividualAck()) {
                // Message was delivered and acknowledge - but only delete the
                // individual message
                for (final MessageReference node : dispatched) {
                    MessageId messageId = node.getMessageId();
                    if (ack.getLastMessageId().equals(messageId)) {
                        // Don't remove the nodes until we are committed - immediateAck option
                        if (!context.isInTransaction()) {
                            dequeueCounter++;
                            ((Destination)node.getRegionDestination()).getDestinationStatistics().getInflight().decrement();
                            // Removing inside the for-each: the loop is left via the break below
                            // before the iterator is advanced again.
                            dispatched.remove(node);
                        } else {
                            registerRemoveSync(context, node);
                        }

                        if (usePrefetchExtension && getPrefetchSize() != 0 && ack.isInTransaction()) {
                            // allow transaction batch to exceed prefetch
                            while (true) {
                                int currentExtension = prefetchExtension.get();
                                // Math.max keeps the extension from decreasing if the increment overflows.
                                int newExtension = Math.max(currentExtension, currentExtension + 1);
                                if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
                                    break;
                                }
                            }
                        }

                        acknowledge(context, ack, node);
                        destination = (Destination) node.getRegionDestination();
                        callDispatchMatched = true;
                        break;
                    }
                }
            }else if (ack.isDeliveredAck() || ack.isExpiredAck()) {
                // Message was delivered but not acknowledged: update pre-fetch
                // counters.
                // index counts every node visited so far, including expired ones removed below.
                int index = 0;
                for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext(); index++) {
                    final MessageReference node = iter.next();
                    Destination nodeDest = (Destination) node.getRegionDestination();
                    if (node.isExpired()) {
                        if (broker.isExpired(node)) {
                            Destination regionDestination = nodeDest;
                            regionDestination.messageExpired(context, this, node);
                        }
                        iter.remove();
                        nodeDest.getDestinationStatistics().getInflight().decrement();
                    }
                    if (ack.getLastMessageId().equals(node.getMessageId())) {
                        if (usePrefetchExtension && getPrefetchSize() != 0) {
                            // allow  batch to exceed prefetch
                            while (true) {
                                int currentExtension = prefetchExtension.get();
                                int newExtension = Math.max(currentExtension, index + 1);
                                if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
                                    break;
                                }
                            }
                        }
                        destination = nodeDest;
                        callDispatchMatched = true;
                        break;
                    }
                }
                if (!callDispatchMatched) {
                    throw new JMSException(
                            "Could not correlate acknowledgment with dispatched message: "
                                    + ack);
                }
            } else if (ack.isRedeliveredAck()) {
                // Message was re-delivered but it was not yet considered to be
                // a DLQ message.
                // Nothing is removed here; the loop only verifies the ack range can be located.
                boolean inAckRange = false;
                for (final MessageReference node : dispatched) {
                    MessageId messageId = node.getMessageId();
                    if (ack.getFirstMessageId() == null
                            || ack.getFirstMessageId().equals(messageId)) {
                        inAckRange = true;
                    }
                    if (inAckRange) {
                        if (ack.getLastMessageId().equals(messageId)) {
                            destination = (Destination) node.getRegionDestination();
                            callDispatchMatched = true;
                            break;
                        }
                    }
                }
                if (!callDispatchMatched) {
                    throw new JMSException(
                            "Could not correlate acknowledgment with dispatched message: "
                                    + ack);
                }
            } else if (ack.isPoisonAck()) {
                // TODO: what if the message is already in a DLQ???
                // Handle the poison ACK case: we need to send the message to a
                // DLQ
                if (ack.isInTransaction()) {
                    throw new JMSException("Poison ack cannot be transacted: "
                            + ack);
                }
                // index counts the messages sent to the DLQ so far.
                int index = 0;
                boolean inAckRange = false;
                List<MessageReference> removeList = new ArrayList<MessageReference>();
                for (final MessageReference node : dispatched) {
                    MessageId messageId = node.getMessageId();
                    if (ack.getFirstMessageId() == null
                            || ack.getFirstMessageId().equals(messageId)) {
                        inAckRange = true;
                    }
                    if (inAckRange) {
                        sendToDLQ(context, node, ack.getPoisonCause());
                        Destination nodeDest = (Destination) node.getRegionDestination();
                        nodeDest.getDestinationStatistics()
                                .getInflight().decrement();
                        removeList.add(node);
                        dequeueCounter++;
                        index++;
                        acknowledge(context, ack, node);
                        if (ack.getLastMessageId().equals(messageId)) {
                            while (true) {
                                int currentExtension = prefetchExtension.get();
                                // NOTE(review): index was already incremented above, so this subtracts
                                // one more than the number of poisoned messages - confirm this is intended.
                                int newExtension = Math.max(0, currentExtension - (index + 1));
                                if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
                                    break;
                                }
                            }
                            destination = nodeDest;
                            callDispatchMatched = true;
                            break;
                        }
                    }
                }
                for (final MessageReference node : removeList) {
                    dispatched.remove(node);
                }
                if (!callDispatchMatched) {
                    throw new JMSException(
                            "Could not correlate acknowledgment with dispatched message: "
                                    + ack);
                }
            }
        }
        // Wake-ups and dispatching happen after dispatchLock has been released.
        if (callDispatchMatched && destination != null) {
            destination.wakeup();
            dispatchPending();

            if (pending.isEmpty()) {
                for (Destination dest : destinations) {
                    dest.wakeup();
                }
            }
        } else {
            LOG.debug("Acknowledgment out of sync (Normally occurs when failover connection reconnects): {}", ack);
        }
    }
406
407    private void registerRemoveSync(ConnectionContext context, final MessageReference node) {
408        // setup a Synchronization to remove nodes from the
409        // dispatched list.
410        context.getTransaction().addSynchronization(
411                new Synchronization() {
412
413                    @Override
414                    public void beforeEnd() {
415                        if (usePrefetchExtension && getPrefetchSize() != 0) {
416                            while (true) {
417                                int currentExtension = prefetchExtension.get();
418                                int newExtension = Math.max(0, currentExtension - 1);
419                                if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
420                                    break;
421                                }
422                            }
423                        }
424                    }
425
426                    @Override
427                    public void afterCommit()
428                            throws Exception {
429                        Destination nodeDest = (Destination) node.getRegionDestination();
430                        synchronized(dispatchLock) {
431                            dequeueCounter++;
432                            dispatched.remove(node);
433                            nodeDest.getDestinationStatistics().getInflight().decrement();
434                        }
435                        nodeDest.wakeup();
436                        dispatchPending();
437                    }
438
439                    @Override
440                    public void afterRollback() throws Exception {
441                        synchronized(dispatchLock) {
442                            // poisionAck will decrement - otherwise still inflight on client
443                        }
444                    }
445                });
446    }
447
448    /**
449     * Checks an ack versus the contents of the dispatched list.
450     *  called with dispatchLock held
451     * @param ack
452     * @throws JMSException if it does not match
453     */
454    protected void assertAckMatchesDispatched(MessageAck ack) throws JMSException {
455        MessageId firstAckedMsg = ack.getFirstMessageId();
456        MessageId lastAckedMsg = ack.getLastMessageId();
457        int checkCount = 0;
458        boolean checkFoundStart = false;
459        boolean checkFoundEnd = false;
460        for (MessageReference node : dispatched) {
461
462            if (firstAckedMsg == null) {
463                checkFoundStart = true;
464            } else if (!checkFoundStart && firstAckedMsg.equals(node.getMessageId())) {
465                checkFoundStart = true;
466            }
467
468            if (checkFoundStart) {
469                checkCount++;
470            }
471
472            if (lastAckedMsg != null && lastAckedMsg.equals(node.getMessageId())) {
473                checkFoundEnd = true;
474                break;
475            }
476        }
477        if (!checkFoundStart && firstAckedMsg != null)
478            throw new JMSException("Unmatched acknowledge: " + ack
479                    + "; Could not find Message-ID " + firstAckedMsg
480                    + " in dispatched-list (start of ack)");
481        if (!checkFoundEnd && lastAckedMsg != null)
482            throw new JMSException("Unmatched acknowledge: " + ack
483                    + "; Could not find Message-ID " + lastAckedMsg
484                    + " in dispatched-list (end of ack)");
485        if (ack.getMessageCount() != checkCount && !ack.isInTransaction()) {
486            throw new JMSException("Unmatched acknowledge: " + ack
487                    + "; Expected message count (" + ack.getMessageCount()
488                    + ") differs from count in dispatched-list (" + checkCount
489                    + ")");
490        }
491    }
492
493    /**
494     *
495     * @param context
496     * @param node
497     * @param poisonCause
498     * @throws IOException
499     * @throws Exception
500     */
501    protected void sendToDLQ(final ConnectionContext context, final MessageReference node, Throwable poisonCause) throws IOException, Exception {
502        broker.getRoot().sendToDeadLetterQueue(context, node, this, poisonCause);
503    }
504
    /**
     * @return the current size of the dispatched list (read without holding dispatchLock)
     */
    @Override
    public int getInFlightSize() {
        return dispatched.size();
    }
509
510    /**
511     * Used to determine if the broker can dispatch to the consumer.
512     *
513     * @return
514     */
515    @Override
516    public boolean isFull() {
517        return getPrefetchSize() == 0 ? prefetchExtension.get() == 0 : dispatched.size() - prefetchExtension.get() >= info.getPrefetchSize();
518    }
519
520    /**
521     * @return true when 60% or more room is left for dispatching messages
522     */
523    @Override
524    public boolean isLowWaterMark() {
525        return (dispatched.size() - prefetchExtension.get()) <= (info.getPrefetchSize() * .4);
526    }
527
528    /**
529     * @return true when 10% or less room is left for dispatching messages
530     */
531    @Override
532    public boolean isHighWaterMark() {
533        return (dispatched.size() - prefetchExtension.get()) >= (info.getPrefetchSize() * .9);
534    }
535
536    @Override
537    public int countBeforeFull() {
538        return getPrefetchSize() == 0 ? prefetchExtension.get() : info.getPrefetchSize() + prefetchExtension.get() - dispatched.size();
539    }
540
    /**
     * @return the size reported by the pending cursor
     */
    @Override
    public int getPendingQueueSize() {
        return pending.size();
    }
545
    /**
     * @return the current size of the dispatched list; same value as {@link #getInFlightSize()}
     */
    @Override
    public int getDispatchedQueueSize() {
        return dispatched.size();
    }
550
    /**
     * @return the current value of the dequeue counter
     */
    @Override
    public long getDequeueCounter() {
        return dequeueCounter;
    }
555
    /**
     * @return the current value of the dispatch counter
     */
    @Override
    public long getDispatchedCounter() {
        return dispatchCounter;
    }
560
    /**
     * @return the current value of the enqueue counter
     */
    @Override
    public long getEnqueueCounter() {
        return enqueueCounter;
    }
565
    /**
     * @return whatever the pending cursor reports for {@code isRecoveryRequired()}
     */
    @Override
    public boolean isRecoveryRequired() {
        return pending.isRecoveryRequired();
    }
570
    /**
     * @return the cursor currently holding this subscription's pending messages
     */
    public PendingMessageCursor getPending() {
        return this.pending;
    }
574
575    public void setPending(PendingMessageCursor pending) {
576        this.pending = pending;
577        if (this.pending!=null) {
578            this.pending.setSystemUsage(usageManager);
579            this.pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
580        }
581    }
582
    /**
     * Attaches a destination to this subscription and to its pending cursor,
     * both while holding pendingLock.
     *
     * @param context the connection context of the consumer
     * @param destination the destination being added
     */
    @Override
    public void add(ConnectionContext context, Destination destination) throws Exception {
        synchronized(pendingLock) {
            super.add(context, destination);
            pending.add(context, destination);
        }
    }
590
    /**
     * Detaches a destination from this subscription, using this subscription's
     * own dispatched list.
     *
     * @param context the connection context of the consumer
     * @param destination the destination being removed
     * @return the message references removed for that destination
     */
    @Override
    public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
        return remove(context, destination, dispatched);
    }
595
596    public List<MessageReference> remove(ConnectionContext context, Destination destination, List<MessageReference> dispatched) throws Exception {
597        List<MessageReference> rc = new ArrayList<MessageReference>();
598        synchronized(pendingLock) {
599            super.remove(context, destination);
600            // Here is a potential problem concerning Inflight stat:
601            // Messages not already committed or rolled back may not be removed from dispatched list at the moment
602            // Except if each commit or rollback callback action comes before remove of subscriber.
603            rc.addAll(pending.remove(context, destination));
604
605            if (dispatched == null) {
606                return rc;
607            }
608
609            // Synchronized to DispatchLock if necessary
610            if (dispatched == this.dispatched) {
611                synchronized(dispatchLock) {
612                    updateDestinationStats(rc, destination, dispatched);
613                }
614            } else {
615                updateDestinationStats(rc, destination, dispatched);
616            }
617        }
618        return rc;
619    }
620
621    private void updateDestinationStats(List<MessageReference> rc, Destination destination, List<MessageReference> dispatched) {
622        ArrayList<MessageReference> references = new ArrayList<MessageReference>();
623        for (MessageReference r : dispatched) {
624            if (r.getRegionDestination() == destination) {
625                references.add(r);
626            }
627        }
628        rc.addAll(references);
629        destination.getDestinationStatistics().getInflight().subtract(references.size());
630        dispatched.removeAll(references);
631    }
632
    /**
     * Dispatches messages from the pending cursor until the subscription is full,
     * the cursor is exhausted, or the number of dispatched messages reaches the
     * room available when the call started.
     * <p>
     * When there is no room at all and the subscription is not yet marked as a
     * slow consumer, it is marked as one and every destination is notified.
     *
     * @throws IOException propagated from the dispatch and expiry calls made here
     */
    // made public so it can be used in MQTTProtocolConverter
    public void dispatchPending() throws IOException {
       synchronized(pendingLock) {
            try {
                int numberToDispatch = countBeforeFull();
                if (numberToDispatch > 0) {
                    setSlowConsumer(false);
                    setPendingBatchSize(pending, numberToDispatch);
                    int count = 0;
                    pending.reset();
                    while (pending.hasNext() && !isFull() && count < numberToDispatch) {
                        MessageReference node = pending.next();
                        if (node == null) {
                            break;
                        }

                        // Synchronize between dispatched list and remove of message from pending list
                        // related to remove subscription action
                        synchronized(dispatchLock) {
                            // The node leaves the cursor whether or not it ends up being dispatched.
                            pending.remove();
                            node.decrementReferenceCount();
                            if( !isDropped(node) && canDispatch(node)) {

                                // Message may have been sitting in the pending
                                // list a while waiting for the consumer to ack the message.
                                if (node!=QueueMessageReference.NULL_MESSAGE && node.isExpired()) {
                                    //increment number to dispatch
                                    numberToDispatch++;
                                    if (broker.isExpired(node)) {
                                        ((Destination)node.getRegionDestination()).messageExpired(context, this, node);
                                    }
                                    // Expired messages are not dispatched and not counted.
                                    continue;
                                }
                                dispatch(node);
                                count++;
                            }
                        }
                    }
                } else if (!isSlowConsumer()) {
                    setSlowConsumer(true);
                    for (Destination dest :destinations) {
                        dest.slowConsumer(context, this);
                    }
                }
            } finally {
                // Released on every path, including the one where the cursor was never reset.
                pending.release();
            }
        }
    }
682
    /**
     * Sets the maximum batch size of the given cursor to the number of messages
     * about to be dispatched. Protected, so subclasses can override it.
     *
     * @param pending the cursor to configure
     * @param numberToDispatch the number of messages that may be dispatched
     */
    protected void setPendingBatchSize(PendingMessageCursor pending, int numberToDispatch) {
        pending.setMaxBatchSize(numberToDispatch);
    }
686
687    // called with dispatchLock held
688    protected boolean dispatch(final MessageReference node) throws IOException {
689        final Message message = node.getMessage();
690        if (message == null) {
691            return false;
692        }
693
694        okForAckAsDispatchDone.countDown();
695
696        MessageDispatch md = createMessageDispatch(node, message);
697        if (node != QueueMessageReference.NULL_MESSAGE) {
698            dispatchCounter++;
699            dispatched.add(node);
700        }
701        if (getPrefetchSize() == 0) {
702            while (true) {
703                int currentExtension = prefetchExtension.get();
704                int newExtension = Math.max(0, currentExtension - 1);
705                if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
706                    break;
707                }
708            }
709        }
710        if (info.isDispatchAsync()) {
711            md.setTransmitCallback(new TransmitCallback() {
712
713                @Override
714                public void onSuccess() {
715                    // Since the message gets queued up in async dispatch, we don't want to
716                    // decrease the reference count until it gets put on the wire.
717                    onDispatch(node, message);
718                }
719
720                @Override
721                public void onFailure() {
722                    Destination nodeDest = (Destination) node.getRegionDestination();
723                    if (nodeDest != null) {
724                        if (node != QueueMessageReference.NULL_MESSAGE) {
725                            nodeDest.getDestinationStatistics().getDispatched().increment();
726                            nodeDest.getDestinationStatistics().getInflight().increment();
727                            LOG.trace("{} failed to dispatch: {} - {}, dispatched: {}, inflight: {}", new Object[]{ info.getConsumerId(), message.getMessageId(), message.getDestination(), dispatchCounter, dispatched.size() });
728                        }
729                    }
730                    if (node instanceof QueueMessageReference) {
731                        ((QueueMessageReference) node).unlock();
732                    }
733                }
734            });
735            context.getConnection().dispatchAsync(md);
736        } else {
737            context.getConnection().dispatchSync(md);
738            onDispatch(node, message);
739        }
740        return true;
741    }
742
743    protected void onDispatch(final MessageReference node, final Message message) {
744        Destination nodeDest = (Destination) node.getRegionDestination();
745        if (nodeDest != null) {
746            if (node != QueueMessageReference.NULL_MESSAGE) {
747                nodeDest.getDestinationStatistics().getDispatched().increment();
748                nodeDest.getDestinationStatistics().getInflight().increment();
749                LOG.trace("{} dispatched: {} - {}, dispatched: {}, inflight: {}", new Object[]{ info.getConsumerId(), message.getMessageId(), message.getDestination(), dispatchCounter, dispatched.size() });
750            }
751        }
752
753        if (info.isDispatchAsync()) {
754            try {
755                dispatchPending();
756            } catch (IOException e) {
757                context.getConnection().serviceExceptionAsync(e);
758            }
759        }
760    }
761
762    /**
763     * inform the MessageConsumer on the client to change it's prefetch
764     *
765     * @param newPrefetch
766     */
767    @Override
768    public void updateConsumerPrefetch(int newPrefetch) {
769        if (context != null && context.getConnection() != null && context.getConnection().isManageable()) {
770            ConsumerControl cc = new ConsumerControl();
771            cc.setConsumerId(info.getConsumerId());
772            cc.setPrefetch(newPrefetch);
773            context.getConnection().dispatchAsync(cc);
774        }
775    }
776
777    /**
778     * @param node
779     * @param message
780     * @return MessageDispatch
781     */
782    protected MessageDispatch createMessageDispatch(MessageReference node, Message message) {
783        MessageDispatch md = new MessageDispatch();
784        md.setConsumerId(info.getConsumerId());
785
786        if (node == QueueMessageReference.NULL_MESSAGE) {
787            md.setMessage(null);
788            md.setDestination(null);
789        } else {
790            Destination regionDestination = (Destination) node.getRegionDestination();
791            md.setDestination(regionDestination.getActiveMQDestination());
792            md.setMessage(message);
793            md.setRedeliveryCounter(node.getRedeliveryCounter());
794        }
795
796        return md;
797    }
798
    /**
     * Use when a matched message is about to be dispatched to the client.
     * <p>
     * The pending-dispatch loop in this class only dispatches a node when
     * {@link #isDropped(MessageReference)} returns {@code false} and this method
     * returns {@code true}.
     *
     * @param node the message reference that is about to be dispatched
     * @return false if the message should not be dispatched to the client
     *         (another sub may have already dispatched it for example).
     * @throws IOException if the implementation fails with an I/O error while deciding
     */
    protected abstract boolean canDispatch(MessageReference node) throws IOException;
808
    /**
     * Reports whether the given message reference counts as dropped for this
     * subscription.
     * <p>
     * The pending-dispatch loop in this class does not dispatch a node for which
     * this returns {@code true}.
     *
     * @param node the message reference to check
     * @return {@code true} if the reference is dropped; the exact criteria are
     *         defined by the implementing subclass
     */
    protected abstract boolean isDropped(MessageReference node);
810
    /**
     * Used during acknowledgment to remove the message.
     *
     * @param context the connection context the acknowledgment is processed under
     * @param ack the acknowledgment command being processed
     * @param node the message reference being acknowledged
     * @throws IOException if the implementation fails with an I/O error
     */
    protected abstract void acknowledge(ConnectionContext context, final MessageAck ack, final MessageReference node) throws IOException;
817
818
819    public int getMaxProducersToAudit() {
820        return maxProducersToAudit;
821    }
822
823    public void setMaxProducersToAudit(int maxProducersToAudit) {
824        this.maxProducersToAudit = maxProducersToAudit;
825        if (this.pending != null) {
826            this.pending.setMaxProducersToAudit(maxProducersToAudit);
827        }
828    }
829
830    public int getMaxAuditDepth() {
831        return maxAuditDepth;
832    }
833
834    public void setMaxAuditDepth(int maxAuditDepth) {
835        this.maxAuditDepth = maxAuditDepth;
836        if (this.pending != null) {
837            this.pending.setMaxAuditDepth(maxAuditDepth);
838        }
839    }
840
841    public boolean isUsePrefetchExtension() {
842        return usePrefetchExtension;
843    }
844
845    public void setUsePrefetchExtension(boolean usePrefetchExtension) {
846        this.usePrefetchExtension = usePrefetchExtension;
847    }
848
849    protected int getPrefetchExtension() {
850        return this.prefetchExtension.get();
851    }
852
853    @Override
854    public void setPrefetchSize(int prefetchSize) {
855        this.info.setPrefetchSize(prefetchSize);
856        try {
857            this.dispatchPending();
858        } catch (Exception e) {
859            LOG.trace("Caught exception during dispatch after prefetch change.", e);
860        }
861    }
862}