001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region;
018
019import java.io.IOException;
020import java.util.ArrayList;
021import java.util.Collection;
022import java.util.Collections;
023import java.util.Comparator;
024import java.util.HashSet;
025import java.util.Iterator;
026import java.util.LinkedHashMap;
027import java.util.LinkedHashSet;
028import java.util.LinkedList;
029import java.util.List;
030import java.util.Map;
031import java.util.Set;
032import java.util.concurrent.CancellationException;
033import java.util.concurrent.ConcurrentLinkedQueue;
034import java.util.concurrent.CountDownLatch;
035import java.util.concurrent.DelayQueue;
036import java.util.concurrent.Delayed;
037import java.util.concurrent.ExecutorService;
038import java.util.concurrent.TimeUnit;
039import java.util.concurrent.atomic.AtomicBoolean;
040import java.util.concurrent.atomic.AtomicLong;
041import java.util.concurrent.locks.Lock;
042import java.util.concurrent.locks.ReentrantLock;
043import java.util.concurrent.locks.ReentrantReadWriteLock;
044
045import javax.jms.InvalidSelectorException;
046import javax.jms.JMSException;
047import javax.jms.ResourceAllocationException;
048
049import org.apache.activemq.broker.BrokerService;
050import org.apache.activemq.broker.ConnectionContext;
051import org.apache.activemq.broker.ProducerBrokerExchange;
052import org.apache.activemq.broker.region.cursors.*;
053import org.apache.activemq.broker.region.group.CachedMessageGroupMapFactory;
054import org.apache.activemq.broker.region.group.MessageGroupMap;
055import org.apache.activemq.broker.region.group.MessageGroupMapFactory;
056import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
057import org.apache.activemq.broker.region.policy.DispatchPolicy;
058import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy;
059import org.apache.activemq.broker.util.InsertionCountList;
060import org.apache.activemq.command.ActiveMQDestination;
061import org.apache.activemq.command.ConsumerId;
062import org.apache.activemq.command.ExceptionResponse;
063import org.apache.activemq.command.Message;
064import org.apache.activemq.command.MessageAck;
065import org.apache.activemq.command.MessageDispatchNotification;
066import org.apache.activemq.command.MessageId;
067import org.apache.activemq.command.ProducerAck;
068import org.apache.activemq.command.ProducerInfo;
069import org.apache.activemq.command.RemoveInfo;
070import org.apache.activemq.command.Response;
071import org.apache.activemq.filter.BooleanExpression;
072import org.apache.activemq.filter.MessageEvaluationContext;
073import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
074import org.apache.activemq.selector.SelectorParser;
075import org.apache.activemq.state.ProducerState;
076import org.apache.activemq.store.IndexListener;
077import org.apache.activemq.store.ListenableFuture;
078import org.apache.activemq.store.MessageRecoveryListener;
079import org.apache.activemq.store.MessageStore;
080import org.apache.activemq.thread.Task;
081import org.apache.activemq.thread.TaskRunner;
082import org.apache.activemq.thread.TaskRunnerFactory;
083import org.apache.activemq.transaction.Synchronization;
084import org.apache.activemq.usage.Usage;
085import org.apache.activemq.usage.UsageListener;
086import org.apache.activemq.util.BrokerSupport;
087import org.apache.activemq.util.ThreadPoolUtils;
088import org.slf4j.Logger;
089import org.slf4j.LoggerFactory;
090import org.slf4j.MDC;
091
092/**
093 * The Queue is a List of MessageEntry objects that are dispatched to matching
094 * subscriptions.
095 */
096public class Queue extends BaseDestination implements Task, UsageListener, IndexListener {
097    protected static final Logger LOG = LoggerFactory.getLogger(Queue.class);
098    protected final TaskRunnerFactory taskFactory;
099    protected TaskRunner taskRunner;
100    private final ReentrantReadWriteLock consumersLock = new ReentrantReadWriteLock();
101    protected final List<Subscription> consumers = new ArrayList<Subscription>(50);
102    private final ReentrantReadWriteLock messagesLock = new ReentrantReadWriteLock();
103    protected PendingMessageCursor messages;
104    private final ReentrantReadWriteLock pagedInMessagesLock = new ReentrantReadWriteLock();
105    private final PendingList pagedInMessages = new OrderedPendingList();
106    // Messages that are paged in but have not yet been targeted at a subscription
107    private final ReentrantReadWriteLock pagedInPendingDispatchLock = new ReentrantReadWriteLock();
108    protected QueueDispatchPendingList dispatchPendingList = new QueueDispatchPendingList();
109    private MessageGroupMap messageGroupOwners;
110    private DispatchPolicy dispatchPolicy = new RoundRobinDispatchPolicy();
111    private MessageGroupMapFactory messageGroupMapFactory = new CachedMessageGroupMapFactory();
112    final Lock sendLock = new ReentrantLock();
113    private ExecutorService executor;
114    private final Map<MessageId, Runnable> messagesWaitingForSpace = new LinkedHashMap<MessageId, Runnable>();
115    private boolean useConsumerPriority = true;
116    private boolean strictOrderDispatch = false;
117    private final QueueDispatchSelector dispatchSelector;
118    private boolean optimizedDispatch = false;
119    private boolean iterationRunning = false;
120    private boolean firstConsumer = false;
121    private int timeBeforeDispatchStarts = 0;
122    private int consumersBeforeDispatchStarts = 0;
123    private CountDownLatch consumersBeforeStartsLatch;
124    private final AtomicLong pendingWakeups = new AtomicLong();
125    private boolean allConsumersExclusiveByDefault = false;
126    private final AtomicBoolean started = new AtomicBoolean();
127
128    private boolean resetNeeded;
129
130    private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
131        @Override
132        public void run() {
133            asyncWakeup();
134        }
135    };
136    private final Runnable expireMessagesTask = new Runnable() {
137        @Override
138        public void run() {
139            expireMessages();
140        }
141    };
142
143    private final Object iteratingMutex = new Object();
144
145
146
147    class TimeoutMessage implements Delayed {
148
149        Message message;
150        ConnectionContext context;
151        long trigger;
152
153        public TimeoutMessage(Message message, ConnectionContext context, long delay) {
154            this.message = message;
155            this.context = context;
156            this.trigger = System.currentTimeMillis() + delay;
157        }
158
159        @Override
160        public long getDelay(TimeUnit unit) {
161            long n = trigger - System.currentTimeMillis();
162            return unit.convert(n, TimeUnit.MILLISECONDS);
163        }
164
165        @Override
166        public int compareTo(Delayed delayed) {
167            long other = ((TimeoutMessage) delayed).trigger;
168            int returnValue;
169            if (this.trigger < other) {
170                returnValue = -1;
171            } else if (this.trigger > other) {
172                returnValue = 1;
173            } else {
174                returnValue = 0;
175            }
176            return returnValue;
177        }
178    }
179
180    DelayQueue<TimeoutMessage> flowControlTimeoutMessages = new DelayQueue<TimeoutMessage>();
181
182    class FlowControlTimeoutTask extends Thread {
183
184        @Override
185        public void run() {
186            TimeoutMessage timeout;
187            try {
188                while (true) {
189                    timeout = flowControlTimeoutMessages.take();
190                    if (timeout != null) {
191                        synchronized (messagesWaitingForSpace) {
192                            if (messagesWaitingForSpace.remove(timeout.message.getMessageId()) != null) {
193                                ExceptionResponse response = new ExceptionResponse(
194                                        new ResourceAllocationException(
195                                                "Usage Manager Memory Limit reached. Stopping producer ("
196                                                        + timeout.message.getProducerId()
197                                                        + ") to prevent flooding "
198                                                        + getActiveMQDestination().getQualifiedName()
199                                                        + "."
200                                                        + " See http://activemq.apache.org/producer-flow-control.html for more info"));
201                                response.setCorrelationId(timeout.message.getCommandId());
202                                timeout.context.getConnection().dispatchAsync(response);
203                            }
204                        }
205                    }
206                }
207            } catch (InterruptedException e) {
208                LOG.debug(getName() + "Producer Flow Control Timeout Task is stopping");
209            }
210        }
211    };
212
213    private final FlowControlTimeoutTask flowControlTimeoutTask = new FlowControlTimeoutTask();
214
215    private final Comparator<Subscription> orderedCompare = new Comparator<Subscription>() {
216
217        @Override
218        public int compare(Subscription s1, Subscription s2) {
219            // We want the list sorted in descending order
220            int val = s2.getConsumerInfo().getPriority() - s1.getConsumerInfo().getPriority();
221            if (val == 0 && messageGroupOwners != null) {
222                // then ascending order of assigned message groups to favour less loaded consumers
223                // Long.compare in jdk7
224                long x = s1.getConsumerInfo().getAssignedGroupCount();
225                long y = s2.getConsumerInfo().getAssignedGroupCount();
226                val = (x < y) ? -1 : ((x == y) ? 0 : 1);
227            }
228            return val;
229        }
230    };
231
/**
 * Creates a queue for the given destination.
 *
 * @param brokerService passed through to the base destination
 * @param destination the destination this queue serves; also used to build
 *        the dispatch selector
 * @param store message store backing the queue, or {@code null}; when
 *        present this queue registers itself as the store's index listener
 * @param parentStats passed through to the base destination
 * @param taskFactory factory kept for creating the queue's task runner
 * @throws Exception if the base destination cannot be constructed
 */
public Queue(BrokerService brokerService, final ActiveMQDestination destination, MessageStore store,
        DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception {
    super(brokerService, store, destination, parentStats);
    this.dispatchSelector = new QueueDispatchSelector(destination);
    this.taskFactory = taskFactory;
    if (null != store) {
        store.registerIndexListener(this);
    }
}
241
242    @Override
243    public List<Subscription> getConsumers() {
244        consumersLock.readLock().lock();
245        try {
246            return new ArrayList<Subscription>(consumers);
247        } finally {
248            consumersLock.readLock().unlock();
249        }
250    }
251
252    // make the queue easily visible in the debugger from its task runner
253    // threads
254    final class QueueThread extends Thread {
255        final Queue queue;
256
257        public QueueThread(Runnable runnable, String name, Queue queue) {
258            super(runnable, name);
259            this.queue = queue;
260        }
261    }
262
263    class BatchMessageRecoveryListener implements MessageRecoveryListener {
264        final LinkedList<Message> toExpire = new LinkedList<Message>();
265        final double totalMessageCount;
266        int recoveredAccumulator = 0;
267        int currentBatchCount;
268
269        BatchMessageRecoveryListener(int totalMessageCount) {
270            this.totalMessageCount = totalMessageCount;
271            currentBatchCount = recoveredAccumulator;
272        }
273
274        @Override
275        public boolean recoverMessage(Message message) {
276            recoveredAccumulator++;
277            if ((recoveredAccumulator % 10000) == 0) {
278                LOG.info("cursor for {} has recovered {} messages. {}% complete", new Object[]{ getActiveMQDestination().getQualifiedName(), recoveredAccumulator, new Integer((int) (recoveredAccumulator * 100 / totalMessageCount))});
279            }
280            // Message could have expired while it was being
281            // loaded..
282            if (message.isExpired() && broker.isExpired(message)) {
283                toExpire.add(message);
284                return true;
285            }
286            if (hasSpace()) {
287                message.setRegionDestination(Queue.this);
288                messagesLock.writeLock().lock();
289                try {
290                    try {
291                        messages.addMessageLast(message);
292                    } catch (Exception e) {
293                        LOG.error("Failed to add message to cursor", e);
294                    }
295                } finally {
296                    messagesLock.writeLock().unlock();
297                }
298                destinationStatistics.getMessages().increment();
299                return true;
300            }
301            return false;
302        }
303
304        @Override
305        public boolean recoverMessageReference(MessageId messageReference) throws Exception {
306            throw new RuntimeException("Should not be called.");
307        }
308
309        @Override
310        public boolean hasSpace() {
311            return true;
312        }
313
314        @Override
315        public boolean isDuplicate(MessageId id) {
316            return false;
317        }
318
319        public void reset() {
320            currentBatchCount = recoveredAccumulator;
321        }
322
323        public void processExpired() {
324            for (Message message: toExpire) {
325                messageExpired(createConnectionContext(), createMessageReference(message));
326                // drop message will decrement so counter
327                // balance here
328                destinationStatistics.getMessages().increment();
329            }
330            toExpire.clear();
331        }
332
333        public boolean done() {
334            return currentBatchCount == recoveredAccumulator;
335        }
336    }
337
338    @Override
339    public void setPrioritizedMessages(boolean prioritizedMessages) {
340        super.setPrioritizedMessages(prioritizedMessages);
341        dispatchPendingList.setPrioritizedMessages(prioritizedMessages);
342    }
343
344    @Override
345    public void initialize() throws Exception {
346
347        if (this.messages == null) {
348            if (destination.isTemporary() || broker == null || store == null) {
349                this.messages = new VMPendingMessageCursor(isPrioritizedMessages());
350            } else {
351                this.messages = new StoreQueueCursor(broker, this);
352            }
353        }
354
355        // If a VMPendingMessageCursor don't use the default Producer System
356        // Usage
357        // since it turns into a shared blocking queue which can lead to a
358        // network deadlock.
359        // If we are cursoring to disk..it's not and issue because it does not
360        // block due
361        // to large disk sizes.
362        if (messages instanceof VMPendingMessageCursor) {
363            this.systemUsage = brokerService.getSystemUsage();
364            memoryUsage.setParent(systemUsage.getMemoryUsage());
365        }
366
367        this.taskRunner = taskFactory.createTaskRunner(this, "Queue:" + destination.getPhysicalName());
368
369        super.initialize();
370        if (store != null) {
371            // Restore the persistent messages.
372            messages.setSystemUsage(systemUsage);
373            messages.setEnableAudit(isEnableAudit());
374            messages.setMaxAuditDepth(getMaxAuditDepth());
375            messages.setMaxProducersToAudit(getMaxProducersToAudit());
376            messages.setUseCache(isUseCache());
377            messages.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
378            final int messageCount = store.getMessageCount();
379            if (messageCount > 0 && messages.isRecoveryRequired()) {
380                BatchMessageRecoveryListener listener = new BatchMessageRecoveryListener(messageCount);
381                do {
382                   listener.reset();
383                   store.recoverNextMessages(getMaxPageSize(), listener);
384                   listener.processExpired();
385               } while (!listener.done());
386            } else {
387                destinationStatistics.getMessages().add(messageCount);
388            }
389        }
390    }
391
392    /*
393     * Holder for subscription that needs attention on next iterate browser
394     * needs access to existing messages in the queue that have already been
395     * dispatched
396     */
397    class BrowserDispatch {
398        QueueBrowserSubscription browser;
399
400        public BrowserDispatch(QueueBrowserSubscription browserSubscription) {
401            browser = browserSubscription;
402            browser.incrementQueueRef();
403        }
404
405        void done() {
406            try {
407                browser.decrementQueueRef();
408            } catch (Exception e) {
409                LOG.warn("decrement ref on browser: " + browser, e);
410            }
411        }
412
413        public QueueBrowserSubscription getBrowser() {
414            return browser;
415        }
416    }
417
418    ConcurrentLinkedQueue<BrowserDispatch> browserDispatches = new ConcurrentLinkedQueue<BrowserDispatch>();
419
420    @Override
421    public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
422        LOG.debug("{} add sub: {}, dequeues: {}, dispatched: {}, inflight: {}", new Object[]{ getActiveMQDestination().getQualifiedName(), sub, getDestinationStatistics().getDequeues().getCount(), getDestinationStatistics().getDispatched().getCount(), getDestinationStatistics().getInflight().getCount() });
423
424        super.addSubscription(context, sub);
425        // synchronize with dispatch method so that no new messages are sent
426        // while setting up a subscription. avoid out of order messages,
427        // duplicates, etc.
428        pagedInPendingDispatchLock.writeLock().lock();
429        try {
430
431            sub.add(context, this);
432
433            // needs to be synchronized - so no contention with dispatching
434            // consumersLock.
435            consumersLock.writeLock().lock();
436            try {
437                // set a flag if this is a first consumer
438                if (consumers.size() == 0) {
439                    firstConsumer = true;
440                    if (consumersBeforeDispatchStarts != 0) {
441                        consumersBeforeStartsLatch = new CountDownLatch(consumersBeforeDispatchStarts - 1);
442                    }
443                } else {
444                    if (consumersBeforeStartsLatch != null) {
445                        consumersBeforeStartsLatch.countDown();
446                    }
447                }
448
449                addToConsumerList(sub);
450                if (sub.getConsumerInfo().isExclusive() || isAllConsumersExclusiveByDefault()) {
451                    Subscription exclusiveConsumer = dispatchSelector.getExclusiveConsumer();
452                    if (exclusiveConsumer == null) {
453                        exclusiveConsumer = sub;
454                    } else if (sub.getConsumerInfo().getPriority() == Byte.MAX_VALUE ||
455                        sub.getConsumerInfo().getPriority() > exclusiveConsumer.getConsumerInfo().getPriority()) {
456                        exclusiveConsumer = sub;
457                    }
458                    dispatchSelector.setExclusiveConsumer(exclusiveConsumer);
459                }
460            } finally {
461                consumersLock.writeLock().unlock();
462            }
463
464            if (sub instanceof QueueBrowserSubscription) {
465                // tee up for dispatch in next iterate
466                QueueBrowserSubscription browserSubscription = (QueueBrowserSubscription) sub;
467                BrowserDispatch browserDispatch = new BrowserDispatch(browserSubscription);
468                browserDispatches.add(browserDispatch);
469            }
470
471            if (!this.optimizedDispatch) {
472                wakeup();
473            }
474        } finally {
475            pagedInPendingDispatchLock.writeLock().unlock();
476        }
477        if (this.optimizedDispatch) {
478            // Outside of dispatchLock() to maintain the lock hierarchy of
479            // iteratingMutex -> dispatchLock. - see
480            // https://issues.apache.org/activemq/browse/AMQ-1878
481            wakeup();
482        }
483    }
484
485    @Override
486    public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId)
487            throws Exception {
488        super.removeSubscription(context, sub, lastDeliveredSequenceId);
489        // synchronize with dispatch method so that no new messages are sent
490        // while removing up a subscription.
491        pagedInPendingDispatchLock.writeLock().lock();
492        try {
493            LOG.debug("{} remove sub: {}, lastDeliveredSeqId: {}, dequeues: {}, dispatched: {}, inflight: {}, groups: {}", new Object[]{
494                    getActiveMQDestination().getQualifiedName(),
495                    sub,
496                    lastDeliveredSequenceId,
497                    getDestinationStatistics().getDequeues().getCount(),
498                    getDestinationStatistics().getDispatched().getCount(),
499                    getDestinationStatistics().getInflight().getCount(),
500                    sub.getConsumerInfo().getAssignedGroupCount()
501            });
502            consumersLock.writeLock().lock();
503            try {
504                removeFromConsumerList(sub);
505                if (sub.getConsumerInfo().isExclusive()) {
506                    Subscription exclusiveConsumer = dispatchSelector.getExclusiveConsumer();
507                    if (exclusiveConsumer == sub) {
508                        exclusiveConsumer = null;
509                        for (Subscription s : consumers) {
510                            if (s.getConsumerInfo().isExclusive()
511                                    && (exclusiveConsumer == null || s.getConsumerInfo().getPriority() > exclusiveConsumer
512                                            .getConsumerInfo().getPriority())) {
513                                exclusiveConsumer = s;
514
515                            }
516                        }
517                        dispatchSelector.setExclusiveConsumer(exclusiveConsumer);
518                    }
519                } else if (isAllConsumersExclusiveByDefault()) {
520                    Subscription exclusiveConsumer = null;
521                    for (Subscription s : consumers) {
522                        if (exclusiveConsumer == null
523                                || s.getConsumerInfo().getPriority() > exclusiveConsumer
524                                .getConsumerInfo().getPriority()) {
525                            exclusiveConsumer = s;
526                                }
527                    }
528                    dispatchSelector.setExclusiveConsumer(exclusiveConsumer);
529                }
530                ConsumerId consumerId = sub.getConsumerInfo().getConsumerId();
531                getMessageGroupOwners().removeConsumer(consumerId);
532
533                // redeliver inflight messages
534
535                boolean markAsRedelivered = false;
536                MessageReference lastDeliveredRef = null;
537                List<MessageReference> unAckedMessages = sub.remove(context, this);
538
539                // locate last redelivered in unconsumed list (list in delivery rather than seq order)
540                if (lastDeliveredSequenceId > RemoveInfo.LAST_DELIVERED_UNSET) {
541                    for (MessageReference ref : unAckedMessages) {
542                        if (ref.getMessageId().getBrokerSequenceId() == lastDeliveredSequenceId) {
543                            lastDeliveredRef = ref;
544                            markAsRedelivered = true;
545                            LOG.debug("found lastDeliveredSeqID: {}, message reference: {}", lastDeliveredSequenceId, ref.getMessageId());
546                            break;
547                        }
548                    }
549                }
550
551                for (MessageReference ref : unAckedMessages) {
552                    // AMQ-5107: don't resend if the broker is shutting down
553                    if ( this.brokerService.isStopping() ) {
554                        break;
555                    }
556                    QueueMessageReference qmr = (QueueMessageReference) ref;
557                    if (qmr.getLockOwner() == sub) {
558                        qmr.unlock();
559
560                        // have no delivery information
561                        if (lastDeliveredSequenceId == RemoveInfo.LAST_DELIVERED_UNKNOWN) {
562                            qmr.incrementRedeliveryCounter();
563                        } else {
564                            if (markAsRedelivered) {
565                                qmr.incrementRedeliveryCounter();
566                            }
567                            if (ref == lastDeliveredRef) {
568                                // all that follow were not redelivered
569                                markAsRedelivered = false;
570                            }
571                        }
572                    }
573                    if (!qmr.isDropped()) {
574                        dispatchPendingList.addMessageForRedelivery(qmr);
575                    }
576                }
577                if (sub instanceof QueueBrowserSubscription) {
578                    ((QueueBrowserSubscription)sub).decrementQueueRef();
579                    browserDispatches.remove(sub);
580                }
581                // AMQ-5107: don't resend if the broker is shutting down
582                if (dispatchPendingList.hasRedeliveries() && (! this.brokerService.isStopping())) {
583                    doDispatch(new OrderedPendingList());
584                }
585            } finally {
586                consumersLock.writeLock().unlock();
587            }
588            if (!this.optimizedDispatch) {
589                wakeup();
590            }
591        } finally {
592            pagedInPendingDispatchLock.writeLock().unlock();
593        }
594        if (this.optimizedDispatch) {
595            // Outside of dispatchLock() to maintain the lock hierarchy of
596            // iteratingMutex -> dispatchLock. - see
597            // https://issues.apache.org/activemq/browse/AMQ-1878
598            wakeup();
599        }
600    }
601
602    @Override
603    public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception {
604        final ConnectionContext context = producerExchange.getConnectionContext();
605        // There is delay between the client sending it and it arriving at the
606        // destination.. it may have expired.
607        message.setRegionDestination(this);
608        ProducerState state = producerExchange.getProducerState();
609        if (state == null) {
610            LOG.warn("Send failed for: {}, missing producer state for: {}", message, producerExchange);
611            throw new JMSException("Cannot send message to " + getActiveMQDestination() + " with invalid (null) producer state");
612        }
613        final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo();
614        final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0
615                && !context.isInRecoveryMode();
616        if (message.isExpired()) {
617            // message not stored - or added to stats yet - so chuck here
618            broker.getRoot().messageExpired(context, message, null);
619            if (sendProducerAck) {
620                ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
621                context.getConnection().dispatchAsync(ack);
622            }
623            return;
624        }
625        if (memoryUsage.isFull()) {
626            isFull(context, memoryUsage);
627            fastProducer(context, producerInfo);
628            if (isProducerFlowControl() && context.isProducerFlowControl()) {
629                if (warnOnProducerFlowControl) {
630                    warnOnProducerFlowControl = false;
631                    LOG.info("Usage Manager Memory Limit ({}) reached on {}, size {}. Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info.",
632                                    memoryUsage.getLimit(), getActiveMQDestination().getQualifiedName(), destinationStatistics.getMessages().getCount());
633                }
634
635                if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {
636                    throw new ResourceAllocationException("Usage Manager Memory Limit reached. Stopping producer ("
637                            + message.getProducerId() + ") to prevent flooding "
638                            + getActiveMQDestination().getQualifiedName() + "."
639                            + " See http://activemq.apache.org/producer-flow-control.html for more info");
640                }
641
642                // We can avoid blocking due to low usage if the producer is
643                // sending
644                // a sync message or if it is using a producer window
645                if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
646                    // copy the exchange state since the context will be
647                    // modified while we are waiting
648                    // for space.
649                    final ProducerBrokerExchange producerExchangeCopy = producerExchange.copy();
650                    synchronized (messagesWaitingForSpace) {
651                     // Start flow control timeout task
652                        // Prevent trying to start it multiple times
653                        if (!flowControlTimeoutTask.isAlive()) {
654                            flowControlTimeoutTask.setName(getName()+" Producer Flow Control Timeout Task");
655                            flowControlTimeoutTask.start();
656                        }
657                        messagesWaitingForSpace.put(message.getMessageId(), new Runnable() {
658                            @Override
659                            public void run() {
660
661                                try {
662                                    // While waiting for space to free up... the
663                                    // message may have expired.
664                                    if (message.isExpired()) {
665                                        LOG.error("expired waiting for space..");
666                                        broker.messageExpired(context, message, null);
667                                        destinationStatistics.getExpired().increment();
668                                    } else {
669                                        doMessageSend(producerExchangeCopy, message);
670                                    }
671
672                                    if (sendProducerAck) {
673                                        ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message
674                                                .getSize());
675                                        context.getConnection().dispatchAsync(ack);
676                                    } else {
677                                        Response response = new Response();
678                                        response.setCorrelationId(message.getCommandId());
679                                        context.getConnection().dispatchAsync(response);
680                                    }
681
682                                } catch (Exception e) {
683                                    if (!sendProducerAck && !context.isInRecoveryMode() && !brokerService.isStopping()) {
684                                        ExceptionResponse response = new ExceptionResponse(e);
685                                        response.setCorrelationId(message.getCommandId());
686                                        context.getConnection().dispatchAsync(response);
687                                    } else {
688                                        LOG.debug("unexpected exception on deferred send of: {}", message, e);
689                                    }
690                                }
691                            }
692                        });
693
694                        if (!context.isNetworkConnection() && systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) {
695                            flowControlTimeoutMessages.add(new TimeoutMessage(message, context, systemUsage
696                                    .getSendFailIfNoSpaceAfterTimeout()));
697                        }
698
699                        registerCallbackForNotFullNotification();
700                        context.setDontSendReponse(true);
701                        return;
702                    }
703
704                } else {
705
706                    if (memoryUsage.isFull()) {
707                        waitForSpace(context, producerExchange, memoryUsage, "Usage Manager Memory Limit reached. Producer ("
708                                + message.getProducerId() + ") stopped to prevent flooding "
709                                + getActiveMQDestination().getQualifiedName() + "."
710                                + " See http://activemq.apache.org/producer-flow-control.html for more info");
711                    }
712
713                    // The usage manager could have delayed us by the time
714                    // we unblock the message could have expired..
715                    if (message.isExpired()) {
716                        LOG.debug("Expired message: {}", message);
717                        broker.getRoot().messageExpired(context, message, null);
718                        return;
719                    }
720                }
721            }
722        }
723        doMessageSend(producerExchange, message);
724        if (sendProducerAck) {
725            ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
726            context.getConnection().dispatchAsync(ack);
727        }
728    }
729
730    private void registerCallbackForNotFullNotification() {
731        // If the usage manager is not full, then the task will not
732        // get called..
733        if (!memoryUsage.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)) {
734            // so call it directly here.
735            sendMessagesWaitingForSpaceTask.run();
736        }
737    }
738
739    private final LinkedList<MessageContext> indexOrderedCursorUpdates = new LinkedList<>();
740
741    @Override
742    public void onAdd(MessageContext messageContext) {
743        synchronized (indexOrderedCursorUpdates) {
744            indexOrderedCursorUpdates.addLast(messageContext);
745        }
746    }
747
748    private void doPendingCursorAdditions() throws Exception {
749        LinkedList<MessageContext> orderedUpdates = new LinkedList<>();
750        sendLock.lockInterruptibly();
751        try {
752            synchronized (indexOrderedCursorUpdates) {
753                MessageContext candidate = indexOrderedCursorUpdates.peek();
754                while (candidate != null && candidate.message.getMessageId().getFutureOrSequenceLong() != null) {
755                    candidate = indexOrderedCursorUpdates.removeFirst();
756                    // check for duplicate adds suppressed by the store
757                    if (candidate.message.getMessageId().getFutureOrSequenceLong() instanceof Long && ((Long)candidate.message.getMessageId().getFutureOrSequenceLong()).compareTo(-1l) == 0) {
758                        LOG.warn("{} messageStore indicated duplicate add attempt for {}, suppressing duplicate dispatch", this, candidate.message.getMessageId());
759                    } else {
760                        orderedUpdates.add(candidate);
761                    }
762                    candidate = indexOrderedCursorUpdates.peek();
763                }
764            }
765            messagesLock.writeLock().lock();
766            try {
767                for (MessageContext messageContext : orderedUpdates) {
768                    if (!messages.addMessageLast(messageContext.message)) {
769                        // cursor suppressed a duplicate
770                        messageContext.duplicate = true;
771                    }
772                    if (messageContext.onCompletion != null) {
773                        messageContext.onCompletion.run();
774                    }
775                }
776            } finally {
777                messagesLock.writeLock().unlock();
778            }
779        } finally {
780            sendLock.unlock();
781        }
782        for (MessageContext messageContext : orderedUpdates) {
783            if (!messageContext.duplicate) {
784                messageSent(messageContext.context, messageContext.message);
785            }
786        }
787        orderedUpdates.clear();
788    }
789
790    final class CursorAddSync extends Synchronization {
791
792        private final MessageContext messageContext;
793
794        CursorAddSync(MessageContext messageContext) {
795            this.messageContext = messageContext;
796            this.messageContext.message.incrementReferenceCount();
797        }
798
799        @Override
800        public void afterCommit() throws Exception {
801            if (store != null && messageContext.message.isPersistent()) {
802                doPendingCursorAdditions();
803            } else {
804                cursorAdd(messageContext.message);
805                messageSent(messageContext.context, messageContext.message);
806            }
807            messageContext.message.decrementReferenceCount();
808        }
809
810        @Override
811        public void afterRollback() throws Exception {
812            messageContext.message.decrementReferenceCount();
813        }
814    }
815
816    void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException,
817            Exception {
818        final ConnectionContext context = producerExchange.getConnectionContext();
819        ListenableFuture<Object> result = null;
820
821        producerExchange.incrementSend();
822        checkUsage(context, producerExchange, message);
823        sendLock.lockInterruptibly();
824        try {
825            message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
826            if (store != null && message.isPersistent()) {
827                try {
828                    if (messages.isCacheEnabled()) {
829                        result = store.asyncAddQueueMessage(context, message, isOptimizeStorage());
830                        result.addListener(new PendingMarshalUsageTracker(message));
831                    } else {
832                        store.addMessage(context, message);
833                    }
834                    if (isReduceMemoryFootprint()) {
835                        message.clearMarshalledState();
836                    }
837                } catch (Exception e) {
838                    // we may have a store in inconsistent state, so reset the cursor
839                    // before restarting normal broker operations
840                    resetNeeded = true;
841                    throw e;
842                }
843            }
844            orderedCursorAdd(message, context);
845        } finally {
846            sendLock.unlock();
847        }
848        if (store == null || (!context.isInTransaction() && !message.isPersistent())) {
849            messageSent(context, message);
850        }
851        if (result != null && message.isResponseRequired() && !result.isCancelled()) {
852            try {
853                result.get();
854            } catch (CancellationException e) {
855                // ignore - the task has been cancelled if the message
856                // has already been deleted
857            }
858        }
859    }
860
861    private void orderedCursorAdd(Message message, ConnectionContext context) throws Exception {
862        if (context.isInTransaction()) {
863            context.getTransaction().addSynchronization(new CursorAddSync(new MessageContext(context, message, null)));
864        } else if (store != null && message.isPersistent()) {
865            doPendingCursorAdditions();
866        } else {
867            // no ordering issue with non persistent messages
868            cursorAdd(message);
869        }
870    }
871
872    private void checkUsage(ConnectionContext context,ProducerBrokerExchange producerBrokerExchange, Message message) throws ResourceAllocationException, IOException, InterruptedException {
873        if (message.isPersistent()) {
874            if (store != null && systemUsage.getStoreUsage().isFull(getStoreUsageHighWaterMark())) {
875                final String logMessage = "Persistent store is Full, " + getStoreUsageHighWaterMark() + "% of "
876                    + systemUsage.getStoreUsage().getLimit() + ". Stopping producer ("
877                    + message.getProducerId() + ") to prevent flooding "
878                    + getActiveMQDestination().getQualifiedName() + "."
879                    + " See http://activemq.apache.org/producer-flow-control.html for more info";
880
881                waitForSpace(context, producerBrokerExchange, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage);
882            }
883        } else if (messages.getSystemUsage() != null && systemUsage.getTempUsage().isFull()) {
884            final String logMessage = "Temp Store is Full ("
885                    + systemUsage.getTempUsage().getPercentUsage() + "% of " + systemUsage.getTempUsage().getLimit()
886                    +"). Stopping producer (" + message.getProducerId()
887                + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
888                + " See http://activemq.apache.org/producer-flow-control.html for more info";
889
890            waitForSpace(context, producerBrokerExchange, messages.getSystemUsage().getTempUsage(), logMessage);
891        }
892    }
893
894    private void expireMessages() {
895        LOG.debug("{} expiring messages ..", getActiveMQDestination().getQualifiedName());
896
897        // just track the insertion count
898        List<Message> browsedMessages = new InsertionCountList<Message>();
899        doBrowse(browsedMessages, this.getMaxExpirePageSize());
900        asyncWakeup();
901        LOG.debug("{} expiring messages done.", getActiveMQDestination().getQualifiedName());
902    }
903
904    @Override
905    public void gc() {
906    }
907
908    @Override
909    public void acknowledge(ConnectionContext context, Subscription sub, MessageAck ack, MessageReference node)
910            throws IOException {
911        messageConsumed(context, node);
912        if (store != null && node.isPersistent()) {
913            store.removeAsyncMessage(context, convertToNonRangedAck(ack, node));
914        }
915    }
916
917    Message loadMessage(MessageId messageId) throws IOException {
918        Message msg = null;
919        if (store != null) { // can be null for a temp q
920            msg = store.getMessage(messageId);
921            if (msg != null) {
922                msg.setRegionDestination(this);
923            }
924        }
925        return msg;
926    }
927
928    @Override
929    public String toString() {
930        return destination.getQualifiedName() + ", subscriptions=" + consumers.size()
931                + ", memory=" + memoryUsage.getPercentUsage() + "%, size=" + destinationStatistics.getMessages().getCount() + ", pending="
932                + indexOrderedCursorUpdates.size();
933    }
934
935    @Override
936    public void start() throws Exception {
937        if (started.compareAndSet(false, true)) {
938            if (memoryUsage != null) {
939                memoryUsage.start();
940            }
941            if (systemUsage.getStoreUsage() != null) {
942                systemUsage.getStoreUsage().start();
943            }
944            systemUsage.getMemoryUsage().addUsageListener(this);
945            messages.start();
946            if (getExpireMessagesPeriod() > 0) {
947                scheduler.executePeriodically(expireMessagesTask, getExpireMessagesPeriod());
948            }
949            doPageIn(false);
950        }
951    }
952
953    @Override
954    public void stop() throws Exception {
955        if (started.compareAndSet(true, false)) {
956            if (taskRunner != null) {
957                taskRunner.shutdown();
958            }
959            if (this.executor != null) {
960                ThreadPoolUtils.shutdownNow(executor);
961                executor = null;
962            }
963
964            scheduler.cancel(expireMessagesTask);
965
966            if (flowControlTimeoutTask.isAlive()) {
967                flowControlTimeoutTask.interrupt();
968            }
969
970            if (messages != null) {
971                messages.stop();
972            }
973
974            for (MessageReference messageReference : pagedInMessages.values()) {
975                messageReference.decrementReferenceCount();
976            }
977            pagedInMessages.clear();
978
979            systemUsage.getMemoryUsage().removeUsageListener(this);
980            if (memoryUsage != null) {
981                memoryUsage.stop();
982            }
983            if (store != null) {
984                store.stop();
985            }
986        }
987    }
988
989    // Properties
990    // -------------------------------------------------------------------------
991    @Override
992    public ActiveMQDestination getActiveMQDestination() {
993        return destination;
994    }
995
996    public MessageGroupMap getMessageGroupOwners() {
997        if (messageGroupOwners == null) {
998            messageGroupOwners = getMessageGroupMapFactory().createMessageGroupMap();
999            messageGroupOwners.setDestination(this);
1000        }
1001        return messageGroupOwners;
1002    }
1003
1004    public DispatchPolicy getDispatchPolicy() {
1005        return dispatchPolicy;
1006    }
1007
1008    public void setDispatchPolicy(DispatchPolicy dispatchPolicy) {
1009        this.dispatchPolicy = dispatchPolicy;
1010    }
1011
1012    public MessageGroupMapFactory getMessageGroupMapFactory() {
1013        return messageGroupMapFactory;
1014    }
1015
1016    public void setMessageGroupMapFactory(MessageGroupMapFactory messageGroupMapFactory) {
1017        this.messageGroupMapFactory = messageGroupMapFactory;
1018    }
1019
1020    public PendingMessageCursor getMessages() {
1021        return this.messages;
1022    }
1023
1024    public void setMessages(PendingMessageCursor messages) {
1025        this.messages = messages;
1026    }
1027
1028    public boolean isUseConsumerPriority() {
1029        return useConsumerPriority;
1030    }
1031
1032    public void setUseConsumerPriority(boolean useConsumerPriority) {
1033        this.useConsumerPriority = useConsumerPriority;
1034    }
1035
1036    public boolean isStrictOrderDispatch() {
1037        return strictOrderDispatch;
1038    }
1039
1040    public void setStrictOrderDispatch(boolean strictOrderDispatch) {
1041        this.strictOrderDispatch = strictOrderDispatch;
1042    }
1043
1044    public boolean isOptimizedDispatch() {
1045        return optimizedDispatch;
1046    }
1047
1048    public void setOptimizedDispatch(boolean optimizedDispatch) {
1049        this.optimizedDispatch = optimizedDispatch;
1050    }
1051
1052    public int getTimeBeforeDispatchStarts() {
1053        return timeBeforeDispatchStarts;
1054    }
1055
1056    public void setTimeBeforeDispatchStarts(int timeBeforeDispatchStarts) {
1057        this.timeBeforeDispatchStarts = timeBeforeDispatchStarts;
1058    }
1059
1060    public int getConsumersBeforeDispatchStarts() {
1061        return consumersBeforeDispatchStarts;
1062    }
1063
1064    public void setConsumersBeforeDispatchStarts(int consumersBeforeDispatchStarts) {
1065        this.consumersBeforeDispatchStarts = consumersBeforeDispatchStarts;
1066    }
1067
1068    public void setAllConsumersExclusiveByDefault(boolean allConsumersExclusiveByDefault) {
1069        this.allConsumersExclusiveByDefault = allConsumersExclusiveByDefault;
1070    }
1071
1072    public boolean isAllConsumersExclusiveByDefault() {
1073        return allConsumersExclusiveByDefault;
1074    }
1075
1076    public boolean isResetNeeded() {
1077        return resetNeeded;
1078    }
1079
1080    // Implementation methods
1081    // -------------------------------------------------------------------------
1082    private QueueMessageReference createMessageReference(Message message) {
1083        QueueMessageReference result = new IndirectMessageReference(message);
1084        return result;
1085    }
1086
1087    @Override
1088    public Message[] browse() {
1089        List<Message> browseList = new ArrayList<Message>();
1090        doBrowse(browseList, getMaxBrowsePageSize());
1091        return browseList.toArray(new Message[browseList.size()]);
1092    }
1093
1094    public void doBrowse(List<Message> browseList, int max) {
1095        final ConnectionContext connectionContext = createConnectionContext();
1096        try {
1097            int maxPageInAttempts = 1;
1098            messagesLock.readLock().lock();
1099            try {
1100                maxPageInAttempts += (messages.size() / getMaxPageSize());
1101            } finally {
1102                messagesLock.readLock().unlock();
1103            }
1104
1105            while (shouldPageInMoreForBrowse(max) && maxPageInAttempts-- > 0) {
1106                pageInMessages(!memoryUsage.isFull(110));
1107            };
1108
1109            doBrowseList(browseList, max, dispatchPendingList, pagedInPendingDispatchLock, connectionContext, "redeliveredWaitingDispatch+pagedInPendingDispatch");
1110            doBrowseList(browseList, max, pagedInMessages, pagedInMessagesLock, connectionContext, "pagedInMessages");
1111
1112            // we need a store iterator to walk messages on disk, independent of the cursor which is tracking
1113            // the next message batch
1114        } catch (Exception e) {
1115            LOG.error("Problem retrieving message for browse", e);
1116        }
1117    }
1118
1119    protected void doBrowseList(List<Message> browseList, int max, PendingList list, ReentrantReadWriteLock lock, ConnectionContext connectionContext, String name) throws Exception {
1120        List<MessageReference> toExpire = new ArrayList<MessageReference>();
1121        lock.readLock().lock();
1122        try {
1123            addAll(list.values(), browseList, max, toExpire);
1124        } finally {
1125            lock.readLock().unlock();
1126        }
1127        for (MessageReference ref : toExpire) {
1128            if (broker.isExpired(ref)) {
1129                LOG.debug("expiring from {}: {}", name, ref);
1130                messageExpired(connectionContext, ref);
1131            } else {
1132                lock.writeLock().lock();
1133                try {
1134                    list.remove(ref);
1135                } finally {
1136                    lock.writeLock().unlock();
1137                }
1138                ref.decrementReferenceCount();
1139            }
1140        }
1141    }
1142
1143    private boolean shouldPageInMoreForBrowse(int max) {
1144        int alreadyPagedIn = 0;
1145        pagedInMessagesLock.readLock().lock();
1146        try {
1147            alreadyPagedIn = pagedInMessages.size();
1148        } finally {
1149            pagedInMessagesLock.readLock().unlock();
1150        }
1151        int messagesInQueue = alreadyPagedIn;
1152        messagesLock.readLock().lock();
1153        try {
1154            messagesInQueue += messages.size();
1155        } finally {
1156            messagesLock.readLock().unlock();
1157        }
1158
1159        LOG.trace("max {}, alreadyPagedIn {}, messagesCount {}, memoryUsage {}%", new Object[]{max, alreadyPagedIn, messagesInQueue, memoryUsage.getPercentUsage()});
1160        return (alreadyPagedIn < max)
1161                && (alreadyPagedIn < messagesInQueue)
1162                && messages.hasSpace();
1163    }
1164
1165    private void addAll(Collection<? extends MessageReference> refs, List<Message> l, int max,
1166            List<MessageReference> toExpire) throws Exception {
1167        for (Iterator<? extends MessageReference> i = refs.iterator(); i.hasNext() && l.size() < max;) {
1168            QueueMessageReference ref = (QueueMessageReference) i.next();
1169            if (ref.isExpired() && (ref.getLockOwner() == null)) {
1170                toExpire.add(ref);
1171            } else if (l.contains(ref.getMessage()) == false) {
1172                l.add(ref.getMessage());
1173            }
1174        }
1175    }
1176
1177    public QueueMessageReference getMessage(String id) {
1178        MessageId msgId = new MessageId(id);
1179        pagedInMessagesLock.readLock().lock();
1180        try {
1181            QueueMessageReference ref = (QueueMessageReference)this.pagedInMessages.get(msgId);
1182            if (ref != null) {
1183                return ref;
1184            }
1185        } finally {
1186            pagedInMessagesLock.readLock().unlock();
1187        }
1188        messagesLock.writeLock().lock();
1189        try{
1190            try {
1191                messages.reset();
1192                while (messages.hasNext()) {
1193                    MessageReference mr = messages.next();
1194                    QueueMessageReference qmr = createMessageReference(mr.getMessage());
1195                    qmr.decrementReferenceCount();
1196                    messages.rollback(qmr.getMessageId());
1197                    if (msgId.equals(qmr.getMessageId())) {
1198                        return qmr;
1199                    }
1200                }
1201            } finally {
1202                messages.release();
1203            }
1204        }finally {
1205            messagesLock.writeLock().unlock();
1206        }
1207        return null;
1208    }
1209
1210    public void purge() throws Exception {
1211        ConnectionContext c = createConnectionContext();
1212        List<MessageReference> list = null;
1213        long originalMessageCount = this.destinationStatistics.getMessages().getCount();
1214        do {
1215            doPageIn(true, false);  // signal no expiry processing needed.
1216            pagedInMessagesLock.readLock().lock();
1217            try {
1218                list = new ArrayList<MessageReference>(pagedInMessages.values());
1219            }finally {
1220                pagedInMessagesLock.readLock().unlock();
1221            }
1222
1223            for (MessageReference ref : list) {
1224                try {
1225                    QueueMessageReference r = (QueueMessageReference) ref;
1226                    removeMessage(c, r);
1227                } catch (IOException e) {
1228                }
1229            }
1230            // don't spin/hang if stats are out and there is nothing left in the
1231            // store
1232        } while (!list.isEmpty() && this.destinationStatistics.getMessages().getCount() > 0);
1233
1234        if (this.destinationStatistics.getMessages().getCount() > 0) {
1235            LOG.warn("{} after purge of {} messages, message count stats report: {}", getActiveMQDestination().getQualifiedName(), originalMessageCount, this.destinationStatistics.getMessages().getCount());
1236        }
1237        gc();
1238        this.destinationStatistics.getMessages().setCount(0);
1239        getMessages().clear();
1240    }
1241
1242    @Override
1243    public void clearPendingMessages() {
1244        messagesLock.writeLock().lock();
1245        try {
1246            if (resetNeeded) {
1247                messages.gc();
1248                messages.reset();
1249                resetNeeded = false;
1250            } else {
1251                messages.rebase();
1252            }
1253            asyncWakeup();
1254        } finally {
1255            messagesLock.writeLock().unlock();
1256        }
1257    }
1258
1259    /**
1260     * Removes the message matching the given messageId
1261     */
1262    public boolean removeMessage(String messageId) throws Exception {
1263        return removeMatchingMessages(createMessageIdFilter(messageId), 1) > 0;
1264    }
1265
1266    /**
1267     * Removes the messages matching the given selector
1268     *
1269     * @return the number of messages removed
1270     */
1271    public int removeMatchingMessages(String selector) throws Exception {
1272        return removeMatchingMessages(selector, -1);
1273    }
1274
1275    /**
1276     * Removes the messages matching the given selector up to the maximum number
1277     * of matched messages
1278     *
1279     * @return the number of messages removed
1280     */
1281    public int removeMatchingMessages(String selector, int maximumMessages) throws Exception {
1282        return removeMatchingMessages(createSelectorFilter(selector), maximumMessages);
1283    }
1284
1285    /**
1286     * Removes the messages matching the given filter up to the maximum number
1287     * of matched messages
1288     *
1289     * @return the number of messages removed
1290     */
1291    public int removeMatchingMessages(MessageReferenceFilter filter, int maximumMessages) throws Exception {
1292        int movedCounter = 0;
1293        Set<MessageReference> set = new LinkedHashSet<MessageReference>();
1294        ConnectionContext context = createConnectionContext();
1295        do {
1296            doPageIn(true);
1297            pagedInMessagesLock.readLock().lock();
1298            try {
1299                set.addAll(pagedInMessages.values());
1300            } finally {
1301                pagedInMessagesLock.readLock().unlock();
1302            }
1303            List<MessageReference> list = new ArrayList<MessageReference>(set);
1304            for (MessageReference ref : list) {
1305                IndirectMessageReference r = (IndirectMessageReference) ref;
1306                if (filter.evaluate(context, r)) {
1307
1308                    removeMessage(context, r);
1309                    set.remove(r);
1310                    if (++movedCounter >= maximumMessages && maximumMessages > 0) {
1311                        return movedCounter;
1312                    }
1313                }
1314            }
1315        } while (set.size() < this.destinationStatistics.getMessages().getCount());
1316        return movedCounter;
1317    }
1318
1319    /**
1320     * Copies the message matching the given messageId
1321     */
1322    public boolean copyMessageTo(ConnectionContext context, String messageId, ActiveMQDestination dest)
1323            throws Exception {
1324        return copyMatchingMessages(context, createMessageIdFilter(messageId), dest, 1) > 0;
1325    }
1326
1327    /**
1328     * Copies the messages matching the given selector
1329     *
1330     * @return the number of messages copied
1331     */
1332    public int copyMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest)
1333            throws Exception {
1334        return copyMatchingMessagesTo(context, selector, dest, -1);
1335    }
1336
1337    /**
1338     * Copies the messages matching the given selector up to the maximum number
1339     * of matched messages
1340     *
1341     * @return the number of messages copied
1342     */
1343    public int copyMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest,
1344            int maximumMessages) throws Exception {
1345        return copyMatchingMessages(context, createSelectorFilter(selector), dest, maximumMessages);
1346    }
1347
1348    /**
1349     * Copies the messages matching the given filter up to the maximum number of
1350     * matched messages
1351     *
1352     * @return the number of messages copied
1353     */
1354    public int copyMatchingMessages(ConnectionContext context, MessageReferenceFilter filter, ActiveMQDestination dest,
1355            int maximumMessages) throws Exception {
1356        int movedCounter = 0;
1357        int count = 0;
1358        Set<MessageReference> set = new LinkedHashSet<MessageReference>();
1359        do {
1360            int oldMaxSize = getMaxPageSize();
1361            setMaxPageSize((int) this.destinationStatistics.getMessages().getCount());
1362            doPageIn(true);
1363            setMaxPageSize(oldMaxSize);
1364            pagedInMessagesLock.readLock().lock();
1365            try {
1366                set.addAll(pagedInMessages.values());
1367            } finally {
1368                pagedInMessagesLock.readLock().unlock();
1369            }
1370            List<MessageReference> list = new ArrayList<MessageReference>(set);
1371            for (MessageReference ref : list) {
1372                IndirectMessageReference r = (IndirectMessageReference) ref;
1373                if (filter.evaluate(context, r)) {
1374
1375                    r.incrementReferenceCount();
1376                    try {
1377                        Message m = r.getMessage();
1378                        BrokerSupport.resend(context, m, dest);
1379                        if (++movedCounter >= maximumMessages && maximumMessages > 0) {
1380                            return movedCounter;
1381                        }
1382                    } finally {
1383                        r.decrementReferenceCount();
1384                    }
1385                }
1386                count++;
1387            }
1388        } while (count < this.destinationStatistics.getMessages().getCount());
1389        return movedCounter;
1390    }
1391
1392    /**
1393     * Move a message
1394     *
1395     * @param context
1396     *            connection context
1397     * @param m
1398     *            QueueMessageReference
1399     * @param dest
1400     *            ActiveMQDestination
1401     * @throws Exception
1402     */
1403    public boolean moveMessageTo(ConnectionContext context, QueueMessageReference m, ActiveMQDestination dest) throws Exception {
1404        BrokerSupport.resend(context, m.getMessage(), dest);
1405        removeMessage(context, m);
1406        messagesLock.writeLock().lock();
1407        try {
1408            messages.rollback(m.getMessageId());
1409            if (isDLQ()) {
1410                DeadLetterStrategy stratagy = getDeadLetterStrategy();
1411                stratagy.rollback(m.getMessage());
1412            }
1413        } finally {
1414            messagesLock.writeLock().unlock();
1415        }
1416        return true;
1417    }
1418
1419    /**
1420     * Moves the message matching the given messageId
1421     */
1422    public boolean moveMessageTo(ConnectionContext context, String messageId, ActiveMQDestination dest)
1423            throws Exception {
1424        return moveMatchingMessagesTo(context, createMessageIdFilter(messageId), dest, 1) > 0;
1425    }
1426
1427    /**
1428     * Moves the messages matching the given selector
1429     *
1430     * @return the number of messages removed
1431     */
1432    public int moveMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest)
1433            throws Exception {
1434        return moveMatchingMessagesTo(context, selector, dest, Integer.MAX_VALUE);
1435    }
1436
1437    /**
1438     * Moves the messages matching the given selector up to the maximum number
1439     * of matched messages
1440     */
1441    public int moveMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest,
1442            int maximumMessages) throws Exception {
1443        return moveMatchingMessagesTo(context, createSelectorFilter(selector), dest, maximumMessages);
1444    }
1445
1446    /**
1447     * Moves the messages matching the given filter up to the maximum number of
1448     * matched messages
1449     */
1450    public int moveMatchingMessagesTo(ConnectionContext context, MessageReferenceFilter filter,
1451            ActiveMQDestination dest, int maximumMessages) throws Exception {
1452        int movedCounter = 0;
1453        Set<MessageReference> set = new LinkedHashSet<MessageReference>();
1454        do {
1455            doPageIn(true);
1456            pagedInMessagesLock.readLock().lock();
1457            try {
1458                set.addAll(pagedInMessages.values());
1459            } finally {
1460                pagedInMessagesLock.readLock().unlock();
1461            }
1462            List<MessageReference> list = new ArrayList<MessageReference>(set);
1463            for (MessageReference ref : list) {
1464                if (filter.evaluate(context, ref)) {
1465                    // We should only move messages that can be locked.
1466                    moveMessageTo(context, (QueueMessageReference)ref, dest);
1467                    set.remove(ref);
1468                    if (++movedCounter >= maximumMessages && maximumMessages > 0) {
1469                        return movedCounter;
1470                    }
1471                }
1472            }
1473        } while (set.size() < this.destinationStatistics.getMessages().getCount() && set.size() < maximumMessages);
1474        return movedCounter;
1475    }
1476
1477    public int retryMessages(ConnectionContext context, int maximumMessages) throws Exception {
1478        if (!isDLQ()) {
1479            throw new Exception("Retry of message is only possible on Dead Letter Queues!");
1480        }
1481        int restoredCounter = 0;
1482        Set<MessageReference> set = new LinkedHashSet<MessageReference>();
1483        do {
1484            doPageIn(true);
1485            pagedInMessagesLock.readLock().lock();
1486            try {
1487                set.addAll(pagedInMessages.values());
1488            } finally {
1489                pagedInMessagesLock.readLock().unlock();
1490            }
1491            List<MessageReference> list = new ArrayList<MessageReference>(set);
1492            for (MessageReference ref : list) {
1493                if (ref.getMessage().getOriginalDestination() != null) {
1494
1495                    moveMessageTo(context, (QueueMessageReference)ref, ref.getMessage().getOriginalDestination());
1496                    set.remove(ref);
1497                    if (++restoredCounter >= maximumMessages && maximumMessages > 0) {
1498                        return restoredCounter;
1499                    }
1500                }
1501            }
1502        } while (set.size() < this.destinationStatistics.getMessages().getCount() && set.size() < maximumMessages);
1503        return restoredCounter;
1504    }
1505
1506    /**
1507     * @return true if we would like to iterate again
1508     * @see org.apache.activemq.thread.Task#iterate()
1509     */
1510    @Override
1511    public boolean iterate() {
1512        MDC.put("activemq.destination", getName());
1513        boolean pageInMoreMessages = false;
1514        synchronized (iteratingMutex) {
1515
1516            // If optimize dispatch is on or this is a slave this method could be called recursively
1517            // we set this state value to short-circuit wakeup in those cases to avoid that as it
1518            // could lead to errors.
1519            iterationRunning = true;
1520
1521            // do early to allow dispatch of these waiting messages
1522            synchronized (messagesWaitingForSpace) {
1523                Iterator<Runnable> it = messagesWaitingForSpace.values().iterator();
1524                while (it.hasNext()) {
1525                    if (!memoryUsage.isFull()) {
1526                        Runnable op = it.next();
1527                        it.remove();
1528                        op.run();
1529                    } else {
1530                        registerCallbackForNotFullNotification();
1531                        break;
1532                    }
1533                }
1534            }
1535
1536            if (firstConsumer) {
1537                firstConsumer = false;
1538                try {
1539                    if (consumersBeforeDispatchStarts > 0) {
1540                        int timeout = 1000; // wait one second by default if
1541                                            // consumer count isn't reached
1542                        if (timeBeforeDispatchStarts > 0) {
1543                            timeout = timeBeforeDispatchStarts;
1544                        }
1545                        if (consumersBeforeStartsLatch.await(timeout, TimeUnit.MILLISECONDS)) {
1546                            LOG.debug("{} consumers subscribed. Starting dispatch.", consumers.size());
1547                        } else {
1548                            LOG.debug("{} ms elapsed and {} consumers subscribed. Starting dispatch.", timeout, consumers.size());
1549                        }
1550                    }
1551                    if (timeBeforeDispatchStarts > 0 && consumersBeforeDispatchStarts <= 0) {
1552                        iteratingMutex.wait(timeBeforeDispatchStarts);
1553                        LOG.debug("{} ms elapsed. Starting dispatch.", timeBeforeDispatchStarts);
1554                    }
1555                } catch (Exception e) {
1556                    LOG.error(e.toString());
1557                }
1558            }
1559
1560            messagesLock.readLock().lock();
1561            try{
1562                pageInMoreMessages |= !messages.isEmpty();
1563            } finally {
1564                messagesLock.readLock().unlock();
1565            }
1566
1567            pagedInPendingDispatchLock.readLock().lock();
1568            try {
1569                pageInMoreMessages |= !dispatchPendingList.isEmpty();
1570            } finally {
1571                pagedInPendingDispatchLock.readLock().unlock();
1572            }
1573
1574            // Perhaps we should page always into the pagedInPendingDispatch
1575            // list if
1576            // !messages.isEmpty(), and then if
1577            // !pagedInPendingDispatch.isEmpty()
1578            // then we do a dispatch.
1579            boolean hasBrowsers = browserDispatches.size() > 0;
1580
1581            if (pageInMoreMessages || hasBrowsers || !dispatchPendingList.hasRedeliveries()) {
1582                try {
1583                    pageInMessages(hasBrowsers);
1584                } catch (Throwable e) {
1585                    LOG.error("Failed to page in more queue messages ", e);
1586                }
1587            }
1588
1589            if (hasBrowsers) {
1590                ArrayList<MessageReference> alreadyDispatchedMessages = null;
1591                pagedInMessagesLock.readLock().lock();
1592                try{
1593                    alreadyDispatchedMessages = new ArrayList<MessageReference>(pagedInMessages.values());
1594                }finally {
1595                    pagedInMessagesLock.readLock().unlock();
1596                }
1597
1598                Iterator<BrowserDispatch> browsers = browserDispatches.iterator();
1599                while (browsers.hasNext()) {
1600                    BrowserDispatch browserDispatch = browsers.next();
1601                    try {
1602                        MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
1603                        msgContext.setDestination(destination);
1604
1605                        QueueBrowserSubscription browser = browserDispatch.getBrowser();
1606
1607                        LOG.debug("dispatch to browser: {}, already dispatched/paged count: {}", browser, alreadyDispatchedMessages.size());
1608                        boolean added = false;
1609                        for (MessageReference node : alreadyDispatchedMessages) {
1610                            if (!((QueueMessageReference)node).isAcked() && !browser.isDuplicate(node.getMessageId()) && !browser.atMax()) {
1611                                msgContext.setMessageReference(node);
1612                                if (browser.matches(node, msgContext)) {
1613                                    browser.add(node);
1614                                    added = true;
1615                                }
1616                            }
1617                        }
1618                        // are we done browsing? no new messages paged
1619                        if (!added || browser.atMax()) {
1620                            browser.decrementQueueRef();
1621                            browserDispatches.remove(browserDispatch);
1622                        }
1623                    } catch (Exception e) {
1624                        LOG.warn("exception on dispatch to browser: {}", browserDispatch.getBrowser(), e);
1625                    }
1626                }
1627            }
1628
1629            if (pendingWakeups.get() > 0) {
1630                pendingWakeups.decrementAndGet();
1631            }
1632            MDC.remove("activemq.destination");
1633            iterationRunning = false;
1634
1635            return pendingWakeups.get() > 0;
1636        }
1637    }
1638
1639    public void pauseDispatch() {
1640        dispatchSelector.pause();
1641    }
1642
1643    public void resumeDispatch() {
1644        dispatchSelector.resume();
1645    }
1646
1647    public boolean isDispatchPaused() {
1648        return dispatchSelector.isPaused();
1649    }
1650
1651    protected MessageReferenceFilter createMessageIdFilter(final String messageId) {
1652        return new MessageReferenceFilter() {
1653            @Override
1654            public boolean evaluate(ConnectionContext context, MessageReference r) {
1655                return messageId.equals(r.getMessageId().toString());
1656            }
1657
1658            @Override
1659            public String toString() {
1660                return "MessageIdFilter: " + messageId;
1661            }
1662        };
1663    }
1664
1665    protected MessageReferenceFilter createSelectorFilter(String selector) throws InvalidSelectorException {
1666
1667        if (selector == null || selector.isEmpty()) {
1668            return new MessageReferenceFilter() {
1669
1670                @Override
1671                public boolean evaluate(ConnectionContext context, MessageReference messageReference) throws JMSException {
1672                    return true;
1673                }
1674            };
1675        }
1676
1677        final BooleanExpression selectorExpression = SelectorParser.parse(selector);
1678
1679        return new MessageReferenceFilter() {
1680            @Override
1681            public boolean evaluate(ConnectionContext context, MessageReference r) throws JMSException {
1682                MessageEvaluationContext messageEvaluationContext = context.getMessageEvaluationContext();
1683
1684                messageEvaluationContext.setMessageReference(r);
1685                if (messageEvaluationContext.getDestination() == null) {
1686                    messageEvaluationContext.setDestination(getActiveMQDestination());
1687                }
1688
1689                return selectorExpression.matches(messageEvaluationContext);
1690            }
1691        };
1692    }
1693
1694    protected void removeMessage(ConnectionContext c, QueueMessageReference r) throws IOException {
1695        removeMessage(c, null, r);
1696        pagedInPendingDispatchLock.writeLock().lock();
1697        try {
1698            dispatchPendingList.remove(r);
1699        } finally {
1700            pagedInPendingDispatchLock.writeLock().unlock();
1701        }
1702    }
1703
1704    protected void removeMessage(ConnectionContext c, Subscription subs, QueueMessageReference r) throws IOException {
1705        MessageAck ack = new MessageAck();
1706        ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
1707        ack.setDestination(destination);
1708        ack.setMessageID(r.getMessageId());
1709        removeMessage(c, subs, r, ack);
1710    }
1711
1712    protected void removeMessage(ConnectionContext context, Subscription sub, final QueueMessageReference reference,
1713            MessageAck ack) throws IOException {
1714        LOG.trace("ack of {} with {}", reference.getMessageId(), ack);
1715        // This sends the ack the the journal..
1716        if (!ack.isInTransaction()) {
1717            acknowledge(context, sub, ack, reference);
1718            getDestinationStatistics().getDequeues().increment();
1719            dropMessage(reference);
1720        } else {
1721            try {
1722                acknowledge(context, sub, ack, reference);
1723            } finally {
1724                context.getTransaction().addSynchronization(new Synchronization() {
1725
1726                    @Override
1727                    public void afterCommit() throws Exception {
1728                        getDestinationStatistics().getDequeues().increment();
1729                        dropMessage(reference);
1730                        wakeup();
1731                    }
1732
1733                    @Override
1734                    public void afterRollback() throws Exception {
1735                        reference.setAcked(false);
1736                        wakeup();
1737                    }
1738                });
1739            }
1740        }
1741        if (ack.isPoisonAck() || (sub != null && sub.getConsumerInfo().isNetworkSubscription())) {
1742            // message gone to DLQ, is ok to allow redelivery
1743            messagesLock.writeLock().lock();
1744            try {
1745                messages.rollback(reference.getMessageId());
1746            } finally {
1747                messagesLock.writeLock().unlock();
1748            }
1749            if (sub != null && sub.getConsumerInfo().isNetworkSubscription()) {
1750                getDestinationStatistics().getForwards().increment();
1751            }
1752        }
1753        // after successful store update
1754        reference.setAcked(true);
1755    }
1756
1757    private void dropMessage(QueueMessageReference reference) {
1758        if (!reference.isDropped()) {
1759            reference.drop();
1760            destinationStatistics.getMessages().decrement();
1761            pagedInMessagesLock.writeLock().lock();
1762            try {
1763                pagedInMessages.remove(reference);
1764            } finally {
1765                pagedInMessagesLock.writeLock().unlock();
1766            }
1767        }
1768    }
1769
1770    public void messageExpired(ConnectionContext context, MessageReference reference) {
1771        messageExpired(context, null, reference);
1772    }
1773
1774    @Override
1775    public void messageExpired(ConnectionContext context, Subscription subs, MessageReference reference) {
1776        LOG.debug("message expired: {}", reference);
1777        broker.messageExpired(context, reference, subs);
1778        destinationStatistics.getExpired().increment();
1779        try {
1780            removeMessage(context, subs, (QueueMessageReference) reference);
1781            messagesLock.writeLock().lock();
1782            try {
1783                messages.rollback(reference.getMessageId());
1784            } finally {
1785                messagesLock.writeLock().unlock();
1786            }
1787        } catch (IOException e) {
1788            LOG.error("Failed to remove expired Message from the store ", e);
1789        }
1790    }
1791
1792    final boolean cursorAdd(final Message msg) throws Exception {
1793        messagesLock.writeLock().lock();
1794        try {
1795            return messages.addMessageLast(msg);
1796        } finally {
1797            messagesLock.writeLock().unlock();
1798        }
1799    }
1800
1801    final void messageSent(final ConnectionContext context, final Message msg) throws Exception {
1802        destinationStatistics.getEnqueues().increment();
1803        destinationStatistics.getMessages().increment();
1804        destinationStatistics.getMessageSize().addSize(msg.getSize());
1805        messageDelivered(context, msg);
1806        consumersLock.readLock().lock();
1807        try {
1808            if (consumers.isEmpty()) {
1809                onMessageWithNoConsumers(context, msg);
1810            }
1811        }finally {
1812            consumersLock.readLock().unlock();
1813        }
1814        LOG.debug("{} Message {} sent to {}", new Object[]{ broker.getBrokerName(), msg.getMessageId(), this.destination });
1815        wakeup();
1816    }
1817
1818    @Override
1819    public void wakeup() {
1820        if (optimizedDispatch && !iterationRunning) {
1821            iterate();
1822            pendingWakeups.incrementAndGet();
1823        } else {
1824            asyncWakeup();
1825        }
1826    }
1827
1828    private void asyncWakeup() {
1829        try {
1830            pendingWakeups.incrementAndGet();
1831            this.taskRunner.wakeup();
1832        } catch (InterruptedException e) {
1833            LOG.warn("Async task runner failed to wakeup ", e);
1834        }
1835    }
1836
1837    private void doPageIn(boolean force) throws Exception {
1838        doPageIn(force, true);
1839    }
1840
1841    private void doPageIn(boolean force, boolean processExpired) throws Exception {
1842        PendingList newlyPaged = doPageInForDispatch(force, processExpired);
1843        pagedInPendingDispatchLock.writeLock().lock();
1844        try {
1845            if (dispatchPendingList.isEmpty()) {
1846                dispatchPendingList.addAll(newlyPaged);
1847
1848            } else {
1849                for (MessageReference qmr : newlyPaged) {
1850                    if (!dispatchPendingList.contains(qmr)) {
1851                        dispatchPendingList.addMessageLast(qmr);
1852                    }
1853                }
1854            }
1855        } finally {
1856            pagedInPendingDispatchLock.writeLock().unlock();
1857        }
1858    }
1859
1860    private PendingList doPageInForDispatch(boolean force, boolean processExpired) throws Exception {
1861        List<QueueMessageReference> result = null;
1862        PendingList resultList = null;
1863
1864        int toPageIn = Math.min(getMaxPageSize(), messages.size());
1865        int pagedInPendingSize = 0;
1866        pagedInPendingDispatchLock.readLock().lock();
1867        try {
1868            pagedInPendingSize = dispatchPendingList.size();
1869        } finally {
1870            pagedInPendingDispatchLock.readLock().unlock();
1871        }
1872
1873        LOG.debug("{} toPageIn: {}, Inflight: {}, pagedInMessages.size {}, pagedInPendingDispatch.size {}, enqueueCount: {}, dequeueCount: {}, memUsage:{}",
1874                new Object[]{
1875                        this,
1876                        toPageIn,
1877                        destinationStatistics.getInflight().getCount(),
1878                        pagedInMessages.size(),
1879                        pagedInPendingSize,
1880                        destinationStatistics.getEnqueues().getCount(),
1881                        destinationStatistics.getDequeues().getCount(),
1882                        getMemoryUsage().getUsage()
1883                });
1884        if (isLazyDispatch() && !force) {
1885            // Only page in the minimum number of messages which can be
1886            // dispatched immediately.
1887            toPageIn = Math.min(getConsumerMessageCountBeforeFull(), toPageIn);
1888        }
1889        if (toPageIn > 0 && (force || (!consumers.isEmpty() && pagedInPendingSize < getMaxPageSize()))) {
1890            int count = 0;
1891            result = new ArrayList<QueueMessageReference>(toPageIn);
1892            messagesLock.writeLock().lock();
1893            try {
1894                try {
1895                    messages.setMaxBatchSize(toPageIn);
1896                    messages.reset();
1897                    while (messages.hasNext() && count < toPageIn) {
1898                        MessageReference node = messages.next();
1899                        messages.remove();
1900
1901                        QueueMessageReference ref = createMessageReference(node.getMessage());
1902                        if (processExpired && ref.isExpired()) {
1903                            if (broker.isExpired(ref)) {
1904                                messageExpired(createConnectionContext(), ref);
1905                            } else {
1906                                ref.decrementReferenceCount();
1907                            }
1908                        } else {
1909                            result.add(ref);
1910                            count++;
1911                        }
1912                    }
1913                } finally {
1914                    messages.release();
1915                }
1916            } finally {
1917                messagesLock.writeLock().unlock();
1918            }
1919            // Only add new messages, not already pagedIn to avoid multiple
1920            // dispatch attempts
1921            pagedInMessagesLock.writeLock().lock();
1922            try {
1923                if(isPrioritizedMessages()) {
1924                    resultList = new PrioritizedPendingList();
1925                } else {
1926                    resultList = new OrderedPendingList();
1927                }
1928                for (QueueMessageReference ref : result) {
1929                    if (!pagedInMessages.contains(ref)) {
1930                        pagedInMessages.addMessageLast(ref);
1931                        resultList.addMessageLast(ref);
1932                    } else {
1933                        ref.decrementReferenceCount();
1934                        // store should have trapped duplicate in it's index, also cursor audit
1935                        // we need to remove the duplicate from the store in the knowledge that the original message may be inflight
1936                        // note: jdbc store will not trap unacked messages as a duplicate b/c it gives each message a unique sequence id
1937                        LOG.warn("{}, duplicate message {} paged in, is cursor audit disabled? Removing from store and redirecting to dlq", this, ref.getMessage());
1938                        if (store != null) {
1939                            ConnectionContext connectionContext = createConnectionContext();
1940                            store.removeMessage(connectionContext, new MessageAck(ref.getMessage(), MessageAck.POSION_ACK_TYPE, 1));
1941                            broker.getRoot().sendToDeadLetterQueue(connectionContext, ref.getMessage(), null, new Throwable("duplicate paged in from store for " + destination));
1942                        }
1943                    }
1944                }
1945            } finally {
1946                pagedInMessagesLock.writeLock().unlock();
1947            }
1948        } else {
1949            // Avoid return null list, if condition is not validated
1950            resultList = new OrderedPendingList();
1951        }
1952
1953        return resultList;
1954    }
1955
1956    private void doDispatch(PendingList list) throws Exception {
1957        boolean doWakeUp = false;
1958
1959        pagedInPendingDispatchLock.writeLock().lock();
1960        try {
1961            doActualDispatch(dispatchPendingList);
1962            // and now see if we can dispatch the new stuff.. and append to the pending
1963            // list anything that does not actually get dispatched.
1964            if (list != null && !list.isEmpty()) {
1965                if (dispatchPendingList.isEmpty()) {
1966                    dispatchPendingList.addAll(doActualDispatch(list));
1967                } else {
1968                    for (MessageReference qmr : list) {
1969                        if (!dispatchPendingList.contains(qmr)) {
1970                            dispatchPendingList.addMessageLast(qmr);
1971                        }
1972                    }
1973                    doWakeUp = true;
1974                }
1975            }
1976        } finally {
1977            pagedInPendingDispatchLock.writeLock().unlock();
1978        }
1979
1980        if (doWakeUp) {
1981            // avoid lock order contention
1982            asyncWakeup();
1983        }
1984    }
1985
1986    /**
1987     * @return list of messages that could get dispatched to consumers if they
1988     *         were not full.
1989     */
1990    private PendingList doActualDispatch(PendingList list) throws Exception {
1991        List<Subscription> consumers;
1992        consumersLock.readLock().lock();
1993
1994        try {
1995            if (this.consumers.isEmpty()) {
1996                // slave dispatch happens in processDispatchNotification
1997                return list;
1998            }
1999            consumers = new ArrayList<Subscription>(this.consumers);
2000        } finally {
2001            consumersLock.readLock().unlock();
2002        }
2003
2004        Set<Subscription> fullConsumers = new HashSet<Subscription>(this.consumers.size());
2005
2006        for (Iterator<MessageReference> iterator = list.iterator(); iterator.hasNext();) {
2007
2008            MessageReference node = iterator.next();
2009            Subscription target = null;
2010            for (Subscription s : consumers) {
2011                if (s instanceof QueueBrowserSubscription) {
2012                    continue;
2013                }
2014                if (!fullConsumers.contains(s)) {
2015                    if (!s.isFull()) {
2016                        if (dispatchSelector.canSelect(s, node) && assignMessageGroup(s, (QueueMessageReference)node) && !((QueueMessageReference) node).isAcked() ) {
2017                            // Dispatch it.
2018                            s.add(node);
2019                            LOG.trace("assigned {} to consumer {}", node.getMessageId(), s.getConsumerInfo().getConsumerId());
2020                            iterator.remove();
2021                            target = s;
2022                            break;
2023                        }
2024                    } else {
2025                        // no further dispatch of list to a full consumer to
2026                        // avoid out of order message receipt
2027                        fullConsumers.add(s);
2028                        LOG.trace("Subscription full {}", s);
2029                    }
2030                }
2031            }
2032
2033            if (target == null && node.isDropped()) {
2034                iterator.remove();
2035            }
2036
2037            // return if there are no consumers or all consumers are full
2038            if (target == null && consumers.size() == fullConsumers.size()) {
2039                return list;
2040            }
2041
2042            // If it got dispatched, rotate the consumer list to get round robin
2043            // distribution.
2044            if (target != null && !strictOrderDispatch && consumers.size() > 1
2045                    && !dispatchSelector.isExclusiveConsumer(target)) {
2046                consumersLock.writeLock().lock();
2047                try {
2048                    if (removeFromConsumerList(target)) {
2049                        addToConsumerList(target);
2050                        consumers = new ArrayList<Subscription>(this.consumers);
2051                    }
2052                } finally {
2053                    consumersLock.writeLock().unlock();
2054                }
2055            }
2056        }
2057
2058        return list;
2059    }
2060
2061    protected boolean assignMessageGroup(Subscription subscription, QueueMessageReference node) throws Exception {
2062        boolean result = true;
2063        // Keep message groups together.
2064        String groupId = node.getGroupID();
2065        int sequence = node.getGroupSequence();
2066        if (groupId != null) {
2067
2068            MessageGroupMap messageGroupOwners = getMessageGroupOwners();
2069            // If we can own the first, then no-one else should own the
2070            // rest.
2071            if (sequence == 1) {
2072                assignGroup(subscription, messageGroupOwners, node, groupId);
2073            } else {
2074
2075                // Make sure that the previous owner is still valid, we may
2076                // need to become the new owner.
2077                ConsumerId groupOwner;
2078
2079                groupOwner = messageGroupOwners.get(groupId);
2080                if (groupOwner == null) {
2081                    assignGroup(subscription, messageGroupOwners, node, groupId);
2082                } else {
2083                    if (groupOwner.equals(subscription.getConsumerInfo().getConsumerId())) {
2084                        // A group sequence < 1 is an end of group signal.
2085                        if (sequence < 0) {
2086                            messageGroupOwners.removeGroup(groupId);
2087                            subscription.getConsumerInfo().decrementAssignedGroupCount();
2088                        }
2089                    } else {
2090                        result = false;
2091                    }
2092                }
2093            }
2094        }
2095
2096        return result;
2097    }
2098
2099    protected void assignGroup(Subscription subs, MessageGroupMap messageGroupOwners, MessageReference n, String groupId) throws IOException {
2100        messageGroupOwners.put(groupId, subs.getConsumerInfo().getConsumerId());
2101        Message message = n.getMessage();
2102        message.setJMSXGroupFirstForConsumer(true);
2103        subs.getConsumerInfo().incrementAssignedGroupCount();
2104    }
2105
2106    protected void pageInMessages(boolean force) throws Exception {
2107        doDispatch(doPageInForDispatch(force, true));
2108    }
2109
2110    private void addToConsumerList(Subscription sub) {
2111        if (useConsumerPriority) {
2112            consumers.add(sub);
2113            Collections.sort(consumers, orderedCompare);
2114        } else {
2115            consumers.add(sub);
2116        }
2117    }
2118
2119    private boolean removeFromConsumerList(Subscription sub) {
2120        return consumers.remove(sub);
2121    }
2122
2123    private int getConsumerMessageCountBeforeFull() throws Exception {
2124        int total = 0;
2125        boolean zeroPrefetch = false;
2126        consumersLock.readLock().lock();
2127        try {
2128            for (Subscription s : consumers) {
2129                zeroPrefetch |= s.getPrefetchSize() == 0;
2130                int countBeforeFull = s.countBeforeFull();
2131                total += countBeforeFull;
2132            }
2133        } finally {
2134            consumersLock.readLock().unlock();
2135        }
2136        if (total == 0 && zeroPrefetch) {
2137            total = 1;
2138        }
2139        return total;
2140    }
2141
2142    /*
2143     * In slave mode, dispatch is ignored till we get this notification as the
2144     * dispatch process is non deterministic between master and slave. On a
2145     * notification, the actual dispatch to the subscription (as chosen by the
2146     * master) is completed. (non-Javadoc)
2147     * @see
2148     * org.apache.activemq.broker.region.BaseDestination#processDispatchNotification
2149     * (org.apache.activemq.command.MessageDispatchNotification)
2150     */
2151    @Override
2152    public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
2153        // do dispatch
2154        Subscription sub = getMatchingSubscription(messageDispatchNotification);
2155        if (sub != null) {
2156            MessageReference message = getMatchingMessage(messageDispatchNotification);
2157            sub.add(message);
2158            sub.processMessageDispatchNotification(messageDispatchNotification);
2159        }
2160    }
2161
    /**
     * Locates the message named by a dispatch notification, trying several
     * places in turn and stopping at the first hit:
     * <ol>
     * <li>{@code dispatchPendingList} (the entry is removed on a match),</li>
     * <li>{@code pagedInMessages} (looked up by id, left in place),</li>
     * <li>the {@code messages} cursor (entries are removed while scanning),</li>
     * <li>{@code loadMessage(messageId)}.</li>
     * </ol>
     *
     * @param messageDispatchNotification the notification carrying the message id
     * @return the matching message reference, never null
     * @throws Exception a {@link JMSException} if the message is found in none
     *         of the places above; other failures from the delegated calls
     *         propagate unchanged
     */
    private QueueMessageReference getMatchingMessage(MessageDispatchNotification messageDispatchNotification)
            throws Exception {
        QueueMessageReference message = null;
        MessageId messageId = messageDispatchNotification.getMessageId();

        // Step 1: look among the references pending dispatch; a match is
        // taken out of the list under the write lock.
        pagedInPendingDispatchLock.writeLock().lock();
        try {
            for (MessageReference ref : dispatchPendingList) {
                if (messageId.equals(ref.getMessageId())) {
                    message = (QueueMessageReference)ref;
                    // Removing during iteration is followed immediately by
                    // break, so the iterator is not advanced afterwards.
                    dispatchPendingList.remove(ref);
                    break;
                }
            }
        } finally {
            pagedInPendingDispatchLock.writeLock().unlock();
        }

        // Step 2: look among the paged-in messages; read-only lookup.
        if (message == null) {
            pagedInMessagesLock.readLock().lock();
            try {
                message = (QueueMessageReference)pagedInMessages.get(messageId);
            } finally {
                pagedInMessagesLock.readLock().unlock();
            }
        }

        // Step 3: scan the messages cursor from its start.
        if (message == null) {
            messagesLock.writeLock().lock();
            try {
                try {
                    messages.setMaxBatchSize(getMaxPageSize());
                    messages.reset();
                    while (messages.hasNext()) {
                        MessageReference node = messages.next();
                        // NOTE(review): remove() runs for every visited entry,
                        // not only the matching one - confirm this is intended.
                        messages.remove();
                        if (messageId.equals(node.getMessageId())) {
                            message = this.createMessageReference(node.getMessage());
                            break;
                        }
                    }
                } finally {
                    // Always paired with the reset() above, even on failure.
                    messages.release();
                }
            } finally {
                messagesLock.writeLock().unlock();
            }
        }

        // Step 4: last resort, load the message by id and wrap it.
        if (message == null) {
            Message msg = loadMessage(messageId);
            if (msg != null) {
                message = this.createMessageReference(msg);
            }
        }

        if (message == null) {
            // dispatchPendingList.size() is read here without holding
            // pagedInPendingDispatchLock; it is used for the message text only.
            throw new JMSException("Slave broker out of sync with master - Message: "
                    + messageDispatchNotification.getMessageId() + " on "
                    + messageDispatchNotification.getDestination() + " does not exist among pending("
                    + dispatchPendingList.size() + ") for subscription: "
                    + messageDispatchNotification.getConsumerId());
        }
        return message;
    }
2227
2228    /**
2229     * Find a consumer that matches the id in the message dispatch notification
2230     *
2231     * @param messageDispatchNotification
2232     * @return sub or null if the subscription has been removed before dispatch
2233     * @throws JMSException
2234     */
2235    private Subscription getMatchingSubscription(MessageDispatchNotification messageDispatchNotification)
2236            throws JMSException {
2237        Subscription sub = null;
2238        consumersLock.readLock().lock();
2239        try {
2240            for (Subscription s : consumers) {
2241                if (messageDispatchNotification.getConsumerId().equals(s.getConsumerInfo().getConsumerId())) {
2242                    sub = s;
2243                    break;
2244                }
2245            }
2246        } finally {
2247            consumersLock.readLock().unlock();
2248        }
2249        return sub;
2250    }
2251
2252    @Override
2253    public void onUsageChanged(@SuppressWarnings("rawtypes") Usage usage, int oldPercentUsage, int newPercentUsage) {
2254        if (oldPercentUsage > newPercentUsage) {
2255            asyncWakeup();
2256        }
2257    }
2258
    /**
     * Returns this class's {@code LOG} instance.
     *
     * @return the {@code LOG} constant
     */
    @Override
    protected Logger getLog() {
        return LOG;
    }
2263
2264    protected boolean isOptimizeStorage(){
2265        boolean result = false;
2266        if (isDoOptimzeMessageStorage()){
2267            consumersLock.readLock().lock();
2268            try{
2269                if (consumers.isEmpty()==false){
2270                    result = true;
2271                    for (Subscription s : consumers) {
2272                        if (s.getPrefetchSize()==0){
2273                            result = false;
2274                            break;
2275                        }
2276                        if (s.isSlowConsumer()){
2277                            result = false;
2278                            break;
2279                        }
2280                        if (s.getInFlightUsage() > getOptimizeMessageStoreInFlightLimit()){
2281                            result = false;
2282                            break;
2283                        }
2284                    }
2285                }
2286            } finally {
2287                consumersLock.readLock().unlock();
2288            }
2289        }
2290        return result;
2291    }
2292}