001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region.cursors;
018
019import org.apache.activemq.broker.region.MessageReference;
020import org.apache.activemq.broker.region.QueueMessageReference;
021import org.apache.activemq.command.MessageId;
022
023import java.util.ArrayList;
024import java.util.Collection;
025import java.util.Iterator;
026import java.util.List;
027
028/**
029 * An abstraction that keeps the correct order of messages that need to be dispatched
030 * to consumers, but also hides the fact that there might be redelivered messages that
031 * should be dispatched ahead of any other paged in messages.
032 *
033 * Direct usage of this class is recommended as you can control when redeliveries need
034 * to be added vs regular pending messages (the next set of messages that can be dispatched)
035 *
036 * Created by ceposta
037 * <a href="http://christianposta.com/blog>http://christianposta.com/blog</a>.
038 */
039public class QueueDispatchPendingList implements PendingList {
040
041    private PendingList pagedInPendingDispatch = new OrderedPendingList();
042    private PendingList redeliveredWaitingDispatch = new OrderedPendingList();
043
044
045    @Override
046    public boolean isEmpty() {
047        return pagedInPendingDispatch.isEmpty() && redeliveredWaitingDispatch.isEmpty();
048    }
049
050    @Override
051    public void clear() {
052        pagedInPendingDispatch.clear();
053        redeliveredWaitingDispatch.clear();
054    }
055
056    /**
057     * Messages added are added directly to the pagedInPendingDispatch set of messages. If
058     * you're trying to add a message that is marked redelivered add it using addMessageForRedelivery()
059     * method
060     * @param message
061     *      The MessageReference that is to be added to this list.
062     *
063     * @return
064     */
065    @Override
066    public PendingNode addMessageFirst(MessageReference message) {
067        return pagedInPendingDispatch.addMessageFirst(message);
068    }
069
070    /**
071     * Messages added are added directly to the pagedInPendingDispatch set of messages. If
072     * you're trying to add a message that is marked redelivered add it using addMessageForRedelivery()
073     * method
074     * @param message
075     *      The MessageReference that is to be added to this list.
076     *
077     * @return
078     */
079    @Override
080    public PendingNode addMessageLast(MessageReference message) {
081        return pagedInPendingDispatch.addMessageLast(message);
082    }
083
084    @Override
085    public PendingNode remove(MessageReference message) {
086        if (pagedInPendingDispatch.contains(message)) {
087            return pagedInPendingDispatch.remove(message);
088        }else if (redeliveredWaitingDispatch.contains(message)) {
089            return redeliveredWaitingDispatch.remove(message);
090        }
091        return null;
092    }
093
094    @Override
095    public int size() {
096        return pagedInPendingDispatch.size() + redeliveredWaitingDispatch.size();
097    }
098
099    @Override
100    public Iterator<MessageReference> iterator() {
101        return new Iterator<MessageReference>() {
102
103            Iterator<MessageReference> redeliveries = redeliveredWaitingDispatch.iterator();
104            Iterator<MessageReference> pendingDispatch = pagedInPendingDispatch.iterator();
105            Iterator<MessageReference> current = redeliveries;
106
107
108            @Override
109            public boolean hasNext() {
110                if (!redeliveries.hasNext() && (current == redeliveries)) {
111                    current = pendingDispatch;
112                }
113                return current.hasNext();
114            }
115
116            @Override
117            public MessageReference next() {
118                return current.next();
119            }
120
121            @Override
122            public void remove() {
123                current.remove();
124            }
125        };
126    }
127
128    @Override
129    public boolean contains(MessageReference message) {
130        return pagedInPendingDispatch.contains(message) || redeliveredWaitingDispatch.contains(message);
131    }
132
133    @Override
134    public Collection<MessageReference> values() {
135        List<MessageReference> messageReferences = new ArrayList<MessageReference>();
136        Iterator<MessageReference> iterator = iterator();
137        while (iterator.hasNext()) {
138            messageReferences.add(iterator.next());
139        }
140        return messageReferences;
141    }
142
143    @Override
144    public void addAll(PendingList pendingList) {
145        pagedInPendingDispatch.addAll(pendingList);
146    }
147
148    @Override
149    public MessageReference get(MessageId messageId) {
150        MessageReference rc = pagedInPendingDispatch.get(messageId);
151        if (rc == null) {
152            return redeliveredWaitingDispatch.get(messageId);
153        }
154        return rc;
155    }
156
157    public void setPrioritizedMessages(boolean prioritizedMessages) {
158        if (prioritizedMessages && this.pagedInPendingDispatch instanceof OrderedPendingList) {
159            pagedInPendingDispatch = new PrioritizedPendingList();
160            redeliveredWaitingDispatch = new PrioritizedPendingList();
161        } else if(pagedInPendingDispatch instanceof PrioritizedPendingList) {
162            pagedInPendingDispatch = new OrderedPendingList();
163            redeliveredWaitingDispatch = new OrderedPendingList();
164        }
165    }
166
167    public void addMessageForRedelivery(QueueMessageReference qmr) {
168        redeliveredWaitingDispatch.addMessageLast(qmr);
169    }
170
171    public boolean hasRedeliveries(){
172        return !redeliveredWaitingDispatch.isEmpty();
173    }
174}