/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.broker.region.cursors;

import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.QueueMessageReference;
import org.apache.activemq.command.MessageId;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;

/**
 * An abstraction that keeps the correct order of messages that need to be dispatched
 * to consumers, but also hides the fact that there might be redelivered messages that
 * should be dispatched ahead of any other paged in messages.
 * <p>
 * Internally two {@link PendingList}s are kept: one for redelivered messages and one
 * for regular paged-in messages. Iteration (and therefore {@link #values()}) yields
 * every redelivered message first, followed by the paged-in messages.
 * <p>
 * Direct usage of this class is recommended as you can control when redeliveries need
 * to be added vs regular pending messages (the next set of messages that can be dispatched)
 * <p>
 * Created by ceposta
 * <a href="http://christianposta.com/blog">http://christianposta.com/blog</a>.
 */
public class QueueDispatchPendingList implements PendingList {

    /** Regular messages that have been paged in and await dispatch. */
    private PendingList pagedInPendingDispatch = new OrderedPendingList();

    /** Redelivered messages; these are iterated ahead of the paged-in messages. */
    private PendingList redeliveredWaitingDispatch = new OrderedPendingList();

    /**
     * @return {@code true} only when neither the paged-in list nor the redelivered
     *         list holds a message.
     */
    @Override
    public boolean isEmpty() {
        return pagedInPendingDispatch.isEmpty() && redeliveredWaitingDispatch.isEmpty();
    }

    /**
     * Clears both the paged-in list and the redelivered list.
     */
    @Override
    public void clear() {
        pagedInPendingDispatch.clear();
        redeliveredWaitingDispatch.clear();
    }

    /**
     * Messages added are added directly to the pagedInPendingDispatch set of messages. If
     * you're trying to add a message that is marked redelivered add it using
     * {@link #addMessageForRedelivery(QueueMessageReference)}.
     *
     * @param message
     *      The MessageReference that is to be added to this list.
     *
     * @return the node returned by the paged-in list's {@code addMessageFirst}.
     */
    @Override
    public PendingNode addMessageFirst(MessageReference message) {
        return pagedInPendingDispatch.addMessageFirst(message);
    }

    /**
     * Messages added are added directly to the pagedInPendingDispatch set of messages. If
     * you're trying to add a message that is marked redelivered add it using
     * {@link #addMessageForRedelivery(QueueMessageReference)}.
     *
     * @param message
     *      The MessageReference that is to be added to this list.
     *
     * @return the node returned by the paged-in list's {@code addMessageLast}.
     */
    @Override
    public PendingNode addMessageLast(MessageReference message) {
        return pagedInPendingDispatch.addMessageLast(message);
    }

    /**
     * Removes the given message from whichever internal list contains it. The paged-in
     * list is consulted first, then the redelivered list.
     *
     * @param message
     *      The MessageReference to remove.
     *
     * @return the result of the owning list's {@code remove}, or {@code null} when
     *         neither list contains the message.
     */
    @Override
    public PendingNode remove(MessageReference message) {
        if (pagedInPendingDispatch.contains(message)) {
            return pagedInPendingDispatch.remove(message);
        }
        if (redeliveredWaitingDispatch.contains(message)) {
            return redeliveredWaitingDispatch.remove(message);
        }
        return null;
    }

    /**
     * @return the combined number of paged-in and redelivered messages.
     */
    @Override
    public int size() {
        return pagedInPendingDispatch.size() + redeliveredWaitingDispatch.size();
    }

    /**
     * Returns an iterator that walks the redelivered messages first and then the
     * paged-in messages.
     * <p>
     * {@code remove()} is forwarded to the underlying iterator that supplied the most
     * recently returned element, so it stays correct even if {@code hasNext()} has
     * already moved on to the paged-in list.
     *
     * @return an iterator over all messages held by this list.
     */
    @Override
    public Iterator<MessageReference> iterator() {
        return new Iterator<MessageReference>() {

            private final Iterator<MessageReference> redeliveries = redeliveredWaitingDispatch.iterator();
            private final Iterator<MessageReference> pendingDispatch = pagedInPendingDispatch.iterator();

            /** Iterator the next element will be pulled from. */
            private Iterator<MessageReference> current = redeliveries;

            /** Iterator that produced the last element handed out by next(). */
            private Iterator<MessageReference> lastReturnedFrom = redeliveries;

            /** Moves on to the paged-in messages once the redeliveries are used up. */
            private void switchWhenRedeliveriesExhausted() {
                if (current == redeliveries && !redeliveries.hasNext()) {
                    current = pendingDispatch;
                }
            }

            @Override
            public boolean hasNext() {
                switchWhenRedeliveriesExhausted();
                return current.hasNext();
            }

            @Override
            public MessageReference next() {
                // next() must work without a preceding hasNext() call, so the
                // switch to the paged-in iterator is performed here as well.
                switchWhenRedeliveriesExhausted();
                MessageReference reference = current.next();
                lastReturnedFrom = current;
                return reference;
            }

            @Override
            public void remove() {
                // Target the iterator that produced the last element rather than
                // 'current', which hasNext() may already have advanced.
                lastReturnedFrom.remove();
            }
        };
    }

    /**
     * @param message
     *      The MessageReference to look for.
     *
     * @return {@code true} when either the paged-in list or the redelivered list
     *         contains the message.
     */
    @Override
    public boolean contains(MessageReference message) {
        return pagedInPendingDispatch.contains(message) || redeliveredWaitingDispatch.contains(message);
    }

    /**
     * Copies every message into a new collection, in iteration order (redelivered
     * messages first, then paged-in messages).
     *
     * @return a newly allocated collection; changes to it do not affect this list.
     */
    @Override
    public Collection<MessageReference> values() {
        List<MessageReference> messageReferences = new ArrayList<MessageReference>(size());
        for (Iterator<MessageReference> it = iterator(); it.hasNext();) {
            messageReferences.add(it.next());
        }
        return messageReferences;
    }

    /**
     * Adds the content of the given list to the paged-in messages. Nothing is added
     * to the redelivered list.
     *
     * @param pendingList
     *      The list whose messages are to be added.
     */
    @Override
    public void addAll(PendingList pendingList) {
        pagedInPendingDispatch.addAll(pendingList);
    }

    /**
     * Looks a message up by id, checking the paged-in list first and falling back to
     * the redelivered list.
     *
     * @param messageId
     *      The id of the message to find.
     *
     * @return the matching reference, or whatever the redelivered list returns for
     *         the id when the paged-in list has no match.
     */
    @Override
    public MessageReference get(MessageId messageId) {
        MessageReference found = pagedInPendingDispatch.get(messageId);
        if (found != null) {
            return found;
        }
        return redeliveredWaitingDispatch.get(messageId);
    }

    /**
     * Switches both internal lists between plain ordering and priority ordering.
     * <p>
     * When a switch happens both lists are replaced by new, empty instances; messages
     * held at that moment are not carried over. When the lists already match the
     * requested mode nothing is changed, so calling this method repeatedly with the
     * same value is harmless.
     *
     * @param prioritizedMessages
     *      {@code true} to use {@link PrioritizedPendingList}s, {@code false} to use
     *      {@link OrderedPendingList}s.
     */
    public void setPrioritizedMessages(boolean prioritizedMessages) {
        if (prioritizedMessages) {
            if (pagedInPendingDispatch instanceof OrderedPendingList) {
                pagedInPendingDispatch = new PrioritizedPendingList();
                redeliveredWaitingDispatch = new PrioritizedPendingList();
            }
        } else if (pagedInPendingDispatch instanceof PrioritizedPendingList) {
            // Only downgrade when prioritization is being turned off; previously a
            // repeated setPrioritizedMessages(true) also ended up in this branch.
            pagedInPendingDispatch = new OrderedPendingList();
            redeliveredWaitingDispatch = new OrderedPendingList();
        }
    }

    /**
     * Appends a message to the redelivered list, so that it is iterated ahead of the
     * paged-in messages.
     *
     * @param qmr
     *      The redelivered message reference.
     */
    public void addMessageForRedelivery(QueueMessageReference qmr) {
        redeliveredWaitingDispatch.addMessageLast(qmr);
    }

    /**
     * @return {@code true} when at least one redelivered message is waiting.
     */
    public boolean hasRedeliveries() {
        return !redeliveredWaitingDispatch.isEmpty();
    }
}