001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region.cursors;
018
019import java.io.IOException;
020import java.util.Iterator;
021import java.util.LinkedList;
022import java.util.concurrent.atomic.AtomicBoolean;
023import java.util.concurrent.atomic.AtomicLong;
024import org.apache.activemq.broker.Broker;
025import org.apache.activemq.broker.ConnectionContext;
026import org.apache.activemq.broker.region.Destination;
027import org.apache.activemq.broker.region.IndirectMessageReference;
028import org.apache.activemq.broker.region.MessageReference;
029import org.apache.activemq.broker.region.QueueMessageReference;
030import org.apache.activemq.command.Message;
031import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
032import org.apache.activemq.openwire.OpenWireFormat;
033import org.apache.activemq.store.PList;
034import org.apache.activemq.store.PListStore;
035import org.apache.activemq.store.PListEntry;
036import org.apache.activemq.usage.SystemUsage;
037import org.apache.activemq.usage.Usage;
038import org.apache.activemq.usage.UsageListener;
039import org.apache.activemq.wireformat.WireFormat;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042import org.apache.activemq.util.ByteSequence;
043
044/**
045 * persist pending messages pending message (messages awaiting dispatch to a
046 * consumer) cursor
047 * 
048 * 
049 */
050public class FilePendingMessageCursor extends AbstractPendingMessageCursor implements UsageListener {
051    static final Logger LOG = LoggerFactory.getLogger(FilePendingMessageCursor.class);
052    private static final AtomicLong NAME_COUNT = new AtomicLong();
053    protected Broker broker;
054    private final PListStore store;
055    private final String name;
056    private PendingList memoryList;
057    private PList diskList;
058    private Iterator<MessageReference> iter;
059    private Destination regionDestination;
060    private boolean iterating;
061    private boolean flushRequired;
062    private final AtomicBoolean started = new AtomicBoolean();
063    private final WireFormat wireFormat = new OpenWireFormat();
064    /**
065     * @param broker
066     * @param name
067     * @param prioritizedMessages
068     */
069    public FilePendingMessageCursor(Broker broker, String name, boolean prioritizedMessages) {
070        super(prioritizedMessages);
071        if (this.prioritizedMessages) {
072            this.memoryList = new PrioritizedPendingList();
073        } else {
074            this.memoryList = new OrderedPendingList();
075        }
076        this.broker = broker;
077        // the store can be null if the BrokerService has persistence
078        // turned off
079        this.store = broker.getTempDataStore();
080        this.name = NAME_COUNT.incrementAndGet() + "_" + name;
081    }
082
083    @Override
084    public void start() throws Exception {
085        if (started.compareAndSet(false, true)) {
086            if( this.broker != null) {
087                wireFormat.setVersion(this.broker.getBrokerService().getStoreOpenWireVersion());
088            }
089            super.start();
090            if (systemUsage != null) {
091                systemUsage.getMemoryUsage().addUsageListener(this);
092            }
093        }
094    }
095
096    @Override
097    public void stop() throws Exception {
098        if (started.compareAndSet(true, false)) {
099            super.stop();
100            if (systemUsage != null) {
101                systemUsage.getMemoryUsage().removeUsageListener(this);
102            }
103        }
104    }
105
106    /**
107     * @return true if there are no pending messages
108     */
109    @Override
110    public synchronized boolean isEmpty() {
111        if (memoryList.isEmpty() && isDiskListEmpty()) {
112            return true;
113        }
114        for (Iterator<MessageReference> iterator = memoryList.iterator(); iterator.hasNext();) {
115            MessageReference node = iterator.next();
116            if (node == QueueMessageReference.NULL_MESSAGE) {
117                continue;
118            }
119            if (!node.isDropped()) {
120                return false;
121            }
122            // We can remove dropped references.
123            iterator.remove();
124        }
125        return isDiskListEmpty();
126    }
127
128    /**
129     * reset the cursor
130     */
131    @Override
132    public synchronized void reset() {
133        iterating = true;
134        last = null;
135        if (isDiskListEmpty()) {
136            this.iter = this.memoryList.iterator();
137        } else {
138            this.iter = new DiskIterator();
139        }
140    }
141
142    @Override
143    public synchronized void release() {
144        iterating = false;
145        if (iter instanceof DiskIterator) {
146           ((DiskIterator)iter).release();
147        };
148        if (flushRequired) {
149            flushRequired = false;
150            if (!hasSpace()) {
151                flushToDisk();
152            }
153        }
154        // ensure any memory ref is released
155        iter = null;
156    }
157
158    @Override
159    public synchronized void destroy() throws Exception {
160        stop();
161        for (Iterator<MessageReference> i = memoryList.iterator(); i.hasNext();) {
162            MessageReference node = i.next();
163            node.decrementReferenceCount();
164        }
165        memoryList.clear();
166        destroyDiskList();
167    }
168
169    private void destroyDiskList() throws Exception {
170        if (diskList != null) {
171            store.removePList(name);
172            diskList = null;
173        }
174    }
175
176    @Override
177    public synchronized LinkedList<MessageReference> pageInList(int maxItems) {
178        LinkedList<MessageReference> result = new LinkedList<MessageReference>();
179        int count = 0;
180        for (Iterator<MessageReference> i = memoryList.iterator(); i.hasNext() && count < maxItems;) {
181            MessageReference ref = i.next();
182            ref.incrementReferenceCount();
183            result.add(ref);
184            count++;
185        }
186        if (count < maxItems && !isDiskListEmpty()) {
187            for (Iterator<MessageReference> i = new DiskIterator(); i.hasNext() && count < maxItems;) {
188                Message message = (Message) i.next();
189                message.setRegionDestination(regionDestination);
190                message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
191                message.incrementReferenceCount();
192                result.add(message);
193                count++;
194            }
195        }
196        return result;
197    }
198
199    /**
200     * add message to await dispatch
201     * 
202     * @param node
203     * @throws Exception 
204     */
205    @Override
206    public synchronized boolean addMessageLast(MessageReference node) throws Exception {
207        return tryAddMessageLast(node, 0);
208    }
209    
210    @Override
211    public synchronized boolean tryAddMessageLast(MessageReference node, long maxWaitTime) throws Exception {
212        if (!node.isExpired()) {
213            try {
214                regionDestination = (Destination) node.getMessage().getRegionDestination();
215                if (isDiskListEmpty()) {
216                    if (hasSpace() || this.store == null) {
217                        memoryList.addMessageLast(node);
218                        node.incrementReferenceCount();
219                        setCacheEnabled(true);
220                        return true;
221                    }
222                }
223                if (!hasSpace()) {
224                    if (isDiskListEmpty()) {
225                        expireOldMessages();
226                        if (hasSpace()) {
227                            memoryList.addMessageLast(node);
228                            node.incrementReferenceCount();
229                            return true;
230                        } else {
231                            flushToDisk();
232                        }
233                    }
234                }
235                if (systemUsage.getTempUsage().waitForSpace(maxWaitTime)) {
236                    ByteSequence bs = getByteSequence(node.getMessage());
237                    getDiskList().addLast(node.getMessageId().toString(), bs);
238                    return true;
239                }
240                return false;
241
242            } catch (Exception e) {
243                LOG.error("Caught an Exception adding a message: {} first to FilePendingMessageCursor ", node, e);
244                throw new RuntimeException(e);
245            }
246        } else {
247            discardExpiredMessage(node);
248        }
249        //message expired
250        return true;
251    }
252
253    /**
254     * add message to await dispatch
255     * 
256     * @param node
257     */
258    @Override
259    public synchronized void addMessageFirst(MessageReference node) {
260        if (!node.isExpired()) {
261            try {
262                regionDestination = (Destination) node.getMessage().getRegionDestination();
263                if (isDiskListEmpty()) {
264                    if (hasSpace()) {
265                        memoryList.addMessageFirst(node);
266                        node.incrementReferenceCount();
267                        setCacheEnabled(true);
268                        return;
269                    }
270                }
271                if (!hasSpace()) {
272                    if (isDiskListEmpty()) {
273                        expireOldMessages();
274                        if (hasSpace()) {
275                            memoryList.addMessageFirst(node);
276                            node.incrementReferenceCount();
277                            return;
278                        } else {
279                            flushToDisk();
280                        }
281                    }
282                }
283                systemUsage.getTempUsage().waitForSpace();
284                node.decrementReferenceCount();
285                ByteSequence bs = getByteSequence(node.getMessage());
286                Object locator = getDiskList().addFirst(node.getMessageId().toString(), bs);
287                node.getMessageId().setPlistLocator(locator);
288
289            } catch (Exception e) {
290                LOG.error("Caught an Exception adding a message: {} first to FilePendingMessageCursor ", node, e);
291                throw new RuntimeException(e);
292            }
293        } else {
294            discardExpiredMessage(node);
295        }
296    }
297
298    /**
299     * @return true if there pending messages to dispatch
300     */
301    @Override
302    public synchronized boolean hasNext() {
303        return iter.hasNext();
304    }
305
306    /**
307     * @return the next pending message
308     */
309    @Override
310    public synchronized MessageReference next() {
311        MessageReference reference = iter.next();
312        last = reference;
313        if (!isDiskListEmpty()) {
314            // got from disk
315            reference.getMessage().setRegionDestination(regionDestination);
316            reference.getMessage().setMemoryUsage(this.getSystemUsage().getMemoryUsage());
317        }
318        reference.incrementReferenceCount();
319        return reference;
320    }
321
322    /**
323     * remove the message at the cursor position
324     */
325    @Override
326    public synchronized void remove() {
327        iter.remove();
328        if (last != null) {
329            last.decrementReferenceCount();
330        }
331    }
332
333    /**
334     * @param node
335     * @see org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor#remove(org.apache.activemq.broker.region.MessageReference)
336     */
337    @Override
338    public synchronized void remove(MessageReference node) {
339        if (memoryList.remove(node) != null) {
340            node.decrementReferenceCount();
341        }
342        if (!isDiskListEmpty()) {
343            try {
344                getDiskList().remove(node.getMessageId().getPlistLocator());
345            } catch (IOException e) {
346                throw new RuntimeException(e);
347            }
348        }
349    }
350
351    /**
352     * @return the number of pending messages
353     */
354    @Override
355    public synchronized int size() {
356        return memoryList.size() + (isDiskListEmpty() ? 0 : (int)getDiskList().size());
357    }
358
359    /**
360     * clear all pending messages
361     */
362    @Override
363    public synchronized void clear() {
364        memoryList.clear();
365        if (!isDiskListEmpty()) {
366            try {
367                getDiskList().destroy();
368            } catch (IOException e) {
369                throw new RuntimeException(e);
370            }
371        }
372        last = null;
373    }
374
375    @Override
376    public synchronized boolean isFull() {
377
378        return super.isFull() || (!isDiskListEmpty() && systemUsage != null && systemUsage.getTempUsage().isFull());
379
380    }
381
382    @Override
383    public boolean hasMessagesBufferedToDeliver() {
384        return !isEmpty();
385    }
386
387    @Override
388    public void setSystemUsage(SystemUsage usageManager) {
389        super.setSystemUsage(usageManager);
390    }
391
392    public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
393        if (newPercentUsage >= getMemoryUsageHighWaterMark()) {
394            synchronized (this) {
395                if (!flushRequired && size() != 0) {
396                    flushRequired =true;
397                    if (!iterating) {
398                        expireOldMessages();
399                        if (!hasSpace()) {
400                            flushToDisk();
401                            flushRequired = false;
402                        }
403                    }
404                }
405            }
406        }
407    }
408
409    @Override
410    public boolean isTransient() {
411        return true;
412    }
413
414    protected synchronized void expireOldMessages() {
415        if (!memoryList.isEmpty()) {
416            for (Iterator<MessageReference> iterator = memoryList.iterator(); iterator.hasNext();) {
417                MessageReference node = iterator.next();
418                if (node.isExpired()) {
419                    node.decrementReferenceCount();
420                    discardExpiredMessage(node);
421                    iterator.remove();
422                }
423            }
424        }
425    }
426
427    protected synchronized void flushToDisk() {
428        if (!memoryList.isEmpty() && store != null) {
429            long start = 0;
430             if (LOG.isTraceEnabled()) {
431                start = System.currentTimeMillis();
432                LOG.trace("{}, flushToDisk() mem list size: {} {}", new Object[]{ name, memoryList.size(), (systemUsage != null ? systemUsage.getMemoryUsage() : "") });
433             }
434            for (Iterator<MessageReference> iterator = memoryList.iterator(); iterator.hasNext();) {
435                MessageReference node = iterator.next();
436                node.decrementReferenceCount();
437                ByteSequence bs;
438                try {
439                    bs = getByteSequence(node.getMessage());
440                    getDiskList().addLast(node.getMessageId().toString(), bs);
441                } catch (IOException e) {
442                    LOG.error("Failed to write to disk list", e);
443                    throw new RuntimeException(e);
444                }
445
446            }
447            memoryList.clear();
448            setCacheEnabled(false);
449            LOG.trace("{}, flushToDisk() done - {} ms {}", new Object[]{ name, (System.currentTimeMillis() - start), (systemUsage != null ? systemUsage.getMemoryUsage() : "") });
450        }
451    }
452
453    protected boolean isDiskListEmpty() {
454        return diskList == null || diskList.isEmpty();
455    }
456
457    public PList getDiskList() {
458        if (diskList == null) {
459            try {
460                diskList = store.getPList(name);
461            } catch (Exception e) {
462                LOG.error("Caught an IO Exception getting the DiskList {}", name, e);
463                throw new RuntimeException(e);
464            }
465        }
466        return diskList;
467    }
468
469    private void discardExpiredMessage(MessageReference reference) {
470        LOG.debug("Discarding expired message {}", reference);
471        if (broker.isExpired(reference)) {
472            ConnectionContext context = new ConnectionContext(new NonCachedMessageEvaluationContext());
473            context.setBroker(broker);
474            ((Destination)reference.getRegionDestination()).messageExpired(context, null, new IndirectMessageReference(reference.getMessage()));
475        }
476    }
477
478    protected ByteSequence getByteSequence(Message message) throws IOException {
479        org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
480        return new ByteSequence(packet.data, packet.offset, packet.length);
481    }
482
483    protected Message getMessage(ByteSequence bs) throws IOException {
484        org.apache.activemq.util.ByteSequence packet = new org.apache.activemq.util.ByteSequence(bs.getData(), bs
485                .getOffset(), bs.getLength());
486        return (Message) this.wireFormat.unmarshal(packet);
487
488    }
489
490    final class DiskIterator implements Iterator<MessageReference> {
491        private final PList.PListIterator iterator;
492        DiskIterator() {
493            try {
494                iterator = getDiskList().iterator();
495            } catch (Exception e) {
496                throw new RuntimeException(e);
497            }
498        }
499
500        public boolean hasNext() {
501            return iterator.hasNext();
502        }
503
504        public MessageReference next() {
505            try {
506                PListEntry entry = iterator.next();
507                Message message = getMessage(entry.getByteSequence());
508                message.getMessageId().setPlistLocator(entry.getLocator());
509                return message;
510            } catch (IOException e) {
511                LOG.error("I/O error", e);
512                throw new RuntimeException(e);
513            }
514        }
515
516        public void remove() {
517            iterator.remove();
518        }
519
520        public void release() {
521            iterator.release();
522        }
523    }
524}