001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadb;
018
019import java.io.ByteArrayInputStream;
020import java.io.ByteArrayOutputStream;
021import java.io.DataInput;
022import java.io.DataOutput;
023import java.io.EOFException;
024import java.io.File;
025import java.io.IOException;
026import java.io.InputStream;
027import java.io.InterruptedIOException;
028import java.io.ObjectInputStream;
029import java.io.ObjectOutputStream;
030import java.io.OutputStream;
031import java.util.ArrayList;
032import java.util.Arrays;
033import java.util.Collection;
034import java.util.Collections;
035import java.util.Date;
036import java.util.HashMap;
037import java.util.HashSet;
038import java.util.Iterator;
039import java.util.LinkedHashMap;
040import java.util.LinkedHashSet;
041import java.util.LinkedList;
042import java.util.List;
043import java.util.Map;
044import java.util.Map.Entry;
045import java.util.Set;
046import java.util.SortedSet;
047import java.util.TreeMap;
048import java.util.TreeSet;
049import java.util.concurrent.atomic.AtomicBoolean;
050import java.util.concurrent.atomic.AtomicLong;
051import java.util.concurrent.locks.ReentrantReadWriteLock;
052
053import org.apache.activemq.ActiveMQMessageAuditNoSync;
054import org.apache.activemq.broker.BrokerService;
055import org.apache.activemq.broker.BrokerServiceAware;
056import org.apache.activemq.command.MessageAck;
057import org.apache.activemq.command.TransactionId;
058import org.apache.activemq.openwire.OpenWireFormat;
059import org.apache.activemq.protobuf.Buffer;
060import org.apache.activemq.store.kahadb.data.KahaAckMessageFileMapCommand;
061import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
062import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
063import org.apache.activemq.store.kahadb.data.KahaDestination;
064import org.apache.activemq.store.kahadb.data.KahaEntryType;
065import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
066import org.apache.activemq.store.kahadb.data.KahaProducerAuditCommand;
067import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
068import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
069import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
070import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
071import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
072import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
073import org.apache.activemq.store.kahadb.data.KahaUpdateMessageCommand;
074import org.apache.activemq.store.kahadb.disk.index.BTreeIndex;
075import org.apache.activemq.store.kahadb.disk.index.BTreeVisitor;
076import org.apache.activemq.store.kahadb.disk.index.ListIndex;
077import org.apache.activemq.store.kahadb.disk.journal.DataFile;
078import org.apache.activemq.store.kahadb.disk.journal.Journal;
079import org.apache.activemq.store.kahadb.disk.journal.Location;
080import org.apache.activemq.store.kahadb.disk.page.Page;
081import org.apache.activemq.store.kahadb.disk.page.PageFile;
082import org.apache.activemq.store.kahadb.disk.page.Transaction;
083import org.apache.activemq.store.kahadb.disk.util.LocationMarshaller;
084import org.apache.activemq.store.kahadb.disk.util.LongMarshaller;
085import org.apache.activemq.store.kahadb.disk.util.Marshaller;
086import org.apache.activemq.store.kahadb.disk.util.Sequence;
087import org.apache.activemq.store.kahadb.disk.util.SequenceSet;
088import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
089import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller;
090import org.apache.activemq.util.ByteSequence;
091import org.apache.activemq.util.DataByteArrayInputStream;
092import org.apache.activemq.util.DataByteArrayOutputStream;
093import org.apache.activemq.util.IOHelper;
094import org.apache.activemq.util.ServiceStopper;
095import org.apache.activemq.util.ServiceSupport;
096import org.slf4j.Logger;
097import org.slf4j.LoggerFactory;
098
099public abstract class MessageDatabase extends ServiceSupport implements BrokerServiceAware {
100
101    protected BrokerService brokerService;
102
103    public static final String PROPERTY_LOG_SLOW_ACCESS_TIME = "org.apache.activemq.store.kahadb.LOG_SLOW_ACCESS_TIME";
104    public static final int LOG_SLOW_ACCESS_TIME = Integer.getInteger(PROPERTY_LOG_SLOW_ACCESS_TIME, 0);
105    public static final File DEFAULT_DIRECTORY = new File("KahaDB");
106    protected static final Buffer UNMATCHED;
107    static {
108        UNMATCHED = new Buffer(new byte[]{});
109    }
110    private static final Logger LOG = LoggerFactory.getLogger(MessageDatabase.class);
111
112    static final int CLOSED_STATE = 1;
113    static final int OPEN_STATE = 2;
114    static final long NOT_ACKED = -1;
115
116    static final int VERSION = 5;
117
118    protected class Metadata {
119        protected Page<Metadata> page;
120        protected int state;
121        protected BTreeIndex<String, StoredDestination> destinations;
122        protected Location lastUpdate;
123        protected Location firstInProgressTransactionLocation;
124        protected Location producerSequenceIdTrackerLocation = null;
125        protected Location ackMessageFileMapLocation = null;
126        protected transient ActiveMQMessageAuditNoSync producerSequenceIdTracker = new ActiveMQMessageAuditNoSync();
127        protected transient Map<Integer, Set<Integer>> ackMessageFileMap = new HashMap<Integer, Set<Integer>>();
128        protected int version = VERSION;
129        protected int openwireVersion = OpenWireFormat.DEFAULT_VERSION;
130
131        public void read(DataInput is) throws IOException {
132            state = is.readInt();
133            destinations = new BTreeIndex<String, StoredDestination>(pageFile, is.readLong());
134            if (is.readBoolean()) {
135                lastUpdate = LocationMarshaller.INSTANCE.readPayload(is);
136            } else {
137                lastUpdate = null;
138            }
139            if (is.readBoolean()) {
140                firstInProgressTransactionLocation = LocationMarshaller.INSTANCE.readPayload(is);
141            } else {
142                firstInProgressTransactionLocation = null;
143            }
144            try {
145                if (is.readBoolean()) {
146                    producerSequenceIdTrackerLocation = LocationMarshaller.INSTANCE.readPayload(is);
147                } else {
148                    producerSequenceIdTrackerLocation = null;
149                }
150            } catch (EOFException expectedOnUpgrade) {
151            }
152            try {
153                version = is.readInt();
154            } catch (EOFException expectedOnUpgrade) {
155                version = 1;
156            }
157            if (version >= 5 && is.readBoolean()) {
158                ackMessageFileMapLocation = LocationMarshaller.INSTANCE.readPayload(is);
159            } else {
160                ackMessageFileMapLocation = null;
161            }
162            try {
163                openwireVersion = is.readInt();
164            } catch (EOFException expectedOnUpgrade) {
165                openwireVersion = OpenWireFormat.DEFAULT_VERSION;
166            }
167            LOG.info("KahaDB is version " + version);
168        }
169
170        public void write(DataOutput os) throws IOException {
171            os.writeInt(state);
172            os.writeLong(destinations.getPageId());
173
174            if (lastUpdate != null) {
175                os.writeBoolean(true);
176                LocationMarshaller.INSTANCE.writePayload(lastUpdate, os);
177            } else {
178                os.writeBoolean(false);
179            }
180
181            if (firstInProgressTransactionLocation != null) {
182                os.writeBoolean(true);
183                LocationMarshaller.INSTANCE.writePayload(firstInProgressTransactionLocation, os);
184            } else {
185                os.writeBoolean(false);
186            }
187
188            if (producerSequenceIdTrackerLocation != null) {
189                os.writeBoolean(true);
190                LocationMarshaller.INSTANCE.writePayload(producerSequenceIdTrackerLocation, os);
191            } else {
192                os.writeBoolean(false);
193            }
194            os.writeInt(VERSION);
195            if (ackMessageFileMapLocation != null) {
196                os.writeBoolean(true);
197                LocationMarshaller.INSTANCE.writePayload(ackMessageFileMapLocation, os);
198            } else {
199                os.writeBoolean(false);
200            }
201            os.writeInt(this.openwireVersion);
202        }
203    }
204
205    class MetadataMarshaller extends VariableMarshaller<Metadata> {
206        @Override
207        public Metadata readPayload(DataInput dataIn) throws IOException {
208            Metadata rc = createMetadata();
209            rc.read(dataIn);
210            return rc;
211        }
212
213        @Override
214        public void writePayload(Metadata object, DataOutput dataOut) throws IOException {
215            object.write(dataOut);
216        }
217    }
218
219    protected PageFile pageFile;
220    protected Journal journal;
221    protected Metadata metadata = new Metadata();
222
223    protected MetadataMarshaller metadataMarshaller = new MetadataMarshaller();
224
225    protected boolean failIfDatabaseIsLocked;
226
227    protected boolean deleteAllMessages;
228    protected File directory = DEFAULT_DIRECTORY;
229    protected File indexDirectory = null;
230    protected Thread checkpointThread;
231    protected boolean enableJournalDiskSyncs=true;
232    protected boolean archiveDataLogs;
233    protected File directoryArchive;
234    protected AtomicLong journalSize = new AtomicLong(0);
235    long checkpointInterval = 5*1000;
236    long cleanupInterval = 30*1000;
237    int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
238    int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
239    boolean enableIndexWriteAsync = false;
240    int setIndexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
241    private String preallocationScope = Journal.PreallocationScope.ENTIRE_JOURNAL.name();
242    private String preallocationStrategy = Journal.PreallocationStrategy.SPARSE_FILE.name();
243
244    protected AtomicBoolean opened = new AtomicBoolean();
245    private boolean ignoreMissingJournalfiles = false;
246    private int indexCacheSize = 10000;
247    private boolean checkForCorruptJournalFiles = false;
248    private boolean checksumJournalFiles = true;
249    protected boolean forceRecoverIndex = false;
250    private final Object checkpointThreadLock = new Object();
251    private boolean rewriteOnRedelivery = false;
252    private boolean archiveCorruptedIndex = false;
253    private boolean useIndexLFRUEviction = false;
254    private float indexLFUEvictionFactor = 0.2f;
255    private boolean enableIndexDiskSyncs = true;
256    private boolean enableIndexRecoveryFile = true;
257    private boolean enableIndexPageCaching = true;
258    ReentrantReadWriteLock checkpointLock = new ReentrantReadWriteLock();
259
260    @Override
261    public void doStart() throws Exception {
262        load();
263    }
264
265    @Override
266    public void doStop(ServiceStopper stopper) throws Exception {
267        unload();
268    }
269
270    private void loadPageFile() throws IOException {
271        this.indexLock.writeLock().lock();
272        try {
273            final PageFile pageFile = getPageFile();
274            pageFile.load();
275            pageFile.tx().execute(new Transaction.Closure<IOException>() {
276                @Override
277                public void execute(Transaction tx) throws IOException {
278                    if (pageFile.getPageCount() == 0) {
279                        // First time this is created.. Initialize the metadata
280                        Page<Metadata> page = tx.allocate();
281                        assert page.getPageId() == 0;
282                        page.set(metadata);
283                        metadata.page = page;
284                        metadata.state = CLOSED_STATE;
285                        metadata.destinations = new BTreeIndex<String, StoredDestination>(pageFile, tx.allocate().getPageId());
286
287                        tx.store(metadata.page, metadataMarshaller, true);
288                    } else {
289                        Page<Metadata> page = tx.load(0, metadataMarshaller);
290                        metadata = page.get();
291                        metadata.page = page;
292                    }
293                    metadata.destinations.setKeyMarshaller(StringMarshaller.INSTANCE);
294                    metadata.destinations.setValueMarshaller(new StoredDestinationMarshaller());
295                    metadata.destinations.load(tx);
296                }
297            });
298            // Load up all the destinations since we need to scan all the indexes to figure out which journal files can be deleted.
299            // Perhaps we should just keep an index of file
300            storedDestinations.clear();
301            pageFile.tx().execute(new Transaction.Closure<IOException>() {
302                @Override
303                public void execute(Transaction tx) throws IOException {
304                    for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator.hasNext();) {
305                        Entry<String, StoredDestination> entry = iterator.next();
306                        StoredDestination sd = loadStoredDestination(tx, entry.getKey(), entry.getValue().subscriptions!=null);
307                        storedDestinations.put(entry.getKey(), sd);
308
309                        if (checkForCorruptJournalFiles) {
310                            // sanity check the index also
311                            if (!entry.getValue().locationIndex.isEmpty(tx)) {
312                                if (entry.getValue().orderIndex.nextMessageId <= 0) {
313                                    throw new IOException("Detected uninitialized orderIndex nextMessageId with pending messages for " + entry.getKey());
314                                }
315                            }
316                        }
317                    }
318                }
319            });
320            pageFile.flush();
321        } finally {
322            this.indexLock.writeLock().unlock();
323        }
324    }
325
326    private void startCheckpoint() {
327        if (checkpointInterval == 0 &&  cleanupInterval == 0) {
328            LOG.info("periodic checkpoint/cleanup disabled, will ocurr on clean shutdown/restart");
329            return;
330        }
331        synchronized (checkpointThreadLock) {
332            boolean start = false;
333            if (checkpointThread == null) {
334                start = true;
335            } else if (!checkpointThread.isAlive()) {
336                start = true;
337                LOG.info("KahaDB: Recovering checkpoint thread after death");
338            }
339            if (start) {
340                checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") {
341                    @Override
342                    public void run() {
343                        try {
344                            long lastCleanup = System.currentTimeMillis();
345                            long lastCheckpoint = System.currentTimeMillis();
346                            // Sleep for a short time so we can periodically check
347                            // to see if we need to exit this thread.
348                            long sleepTime = Math.min(checkpointInterval > 0 ? checkpointInterval : cleanupInterval, 500);
349                            while (opened.get()) {
350                                Thread.sleep(sleepTime);
351                                long now = System.currentTimeMillis();
352                                if( cleanupInterval > 0 && (now - lastCleanup >= cleanupInterval) ) {
353                                    checkpointCleanup(true);
354                                    lastCleanup = now;
355                                    lastCheckpoint = now;
356                                } else if( checkpointInterval > 0 && (now - lastCheckpoint >= checkpointInterval )) {
357                                    checkpointCleanup(false);
358                                    lastCheckpoint = now;
359                                }
360                            }
361                        } catch (InterruptedException e) {
362                            // Looks like someone really wants us to exit this thread...
363                        } catch (IOException ioe) {
364                            LOG.error("Checkpoint failed", ioe);
365                            brokerService.handleIOException(ioe);
366                        }
367                    }
368                };
369
370                checkpointThread.setDaemon(true);
371                checkpointThread.start();
372            }
373        }
374    }
375
376    public void open() throws IOException {
377        if( opened.compareAndSet(false, true) ) {
378            getJournal().start();
379            try {
380                loadPageFile();
381            } catch (Throwable t) {
382                LOG.warn("Index corrupted. Recovering the index through journal replay. Cause:" + t);
383                if (LOG.isDebugEnabled()) {
384                    LOG.debug("Index load failure", t);
385                }
386                // try to recover index
387                try {
388                    pageFile.unload();
389                } catch (Exception ignore) {}
390                if (archiveCorruptedIndex) {
391                    pageFile.archive();
392                } else {
393                    pageFile.delete();
394                }
395                metadata = createMetadata();
396                pageFile = null;
397                loadPageFile();
398            }
399            startCheckpoint();
400            recover();
401        }
402    }
403
404    public void load() throws IOException {
405        this.indexLock.writeLock().lock();
406        IOHelper.mkdirs(directory);
407        try {
408            if (deleteAllMessages) {
409                getJournal().start();
410                getJournal().delete();
411                getJournal().close();
412                journal = null;
413                getPageFile().delete();
414                LOG.info("Persistence store purged.");
415                deleteAllMessages = false;
416            }
417
418            open();
419            store(new KahaTraceCommand().setMessage("LOADED " + new Date()));
420        } finally {
421            this.indexLock.writeLock().unlock();
422        }
423    }
424
425    public void close() throws IOException, InterruptedException {
426        if( opened.compareAndSet(true, false)) {
427            checkpointLock.writeLock().lock();
428            try {
429                if (metadata.page != null) {
430                    checkpointUpdate(true);
431                }
432                pageFile.unload();
433                metadata = createMetadata();
434            } finally {
435                checkpointLock.writeLock().unlock();
436            }
437            journal.close();
438            synchronized (checkpointThreadLock) {
439                if (checkpointThread != null) {
440                    checkpointThread.join();
441                }
442            }
443        }
444    }
445
446    public void unload() throws IOException, InterruptedException {
447        this.indexLock.writeLock().lock();
448        try {
449            if( pageFile != null && pageFile.isLoaded() ) {
450                metadata.state = CLOSED_STATE;
451                metadata.firstInProgressTransactionLocation = getInProgressTxLocationRange()[0];
452
453                if (metadata.page != null) {
454                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
455                        @Override
456                        public void execute(Transaction tx) throws IOException {
457                            tx.store(metadata.page, metadataMarshaller, true);
458                        }
459                    });
460                }
461            }
462        } finally {
463            this.indexLock.writeLock().unlock();
464        }
465        close();
466    }
467
468    // public for testing
469    @SuppressWarnings("rawtypes")
470    public Location[] getInProgressTxLocationRange() {
471        Location[] range = new Location[]{null, null};
472        synchronized (inflightTransactions) {
473            if (!inflightTransactions.isEmpty()) {
474                for (List<Operation> ops : inflightTransactions.values()) {
475                    if (!ops.isEmpty()) {
476                        trackMaxAndMin(range, ops);
477                    }
478                }
479            }
480            if (!preparedTransactions.isEmpty()) {
481                for (List<Operation> ops : preparedTransactions.values()) {
482                    if (!ops.isEmpty()) {
483                        trackMaxAndMin(range, ops);
484                    }
485                }
486            }
487        }
488        return range;
489    }
490
491    @SuppressWarnings("rawtypes")
492    private void trackMaxAndMin(Location[] range, List<Operation> ops) {
493        Location t = ops.get(0).getLocation();
494        if (range[0]==null || t.compareTo(range[0]) <= 0) {
495            range[0] = t;
496        }
497        t = ops.get(ops.size() -1).getLocation();
498        if (range[1]==null || t.compareTo(range[1]) >= 0) {
499            range[1] = t;
500        }
501    }
502
503    class TranInfo {
504        TransactionId id;
505        Location location;
506
507        class opCount {
508            int add;
509            int remove;
510        }
511        HashMap<KahaDestination, opCount> destinationOpCount = new HashMap<KahaDestination, opCount>();
512
513        @SuppressWarnings("rawtypes")
514        public void track(Operation operation) {
515            if (location == null ) {
516                location = operation.getLocation();
517            }
518            KahaDestination destination;
519            boolean isAdd = false;
520            if (operation instanceof AddOperation) {
521                AddOperation add = (AddOperation) operation;
522                destination = add.getCommand().getDestination();
523                isAdd = true;
524            } else {
525                RemoveOperation removeOpperation = (RemoveOperation) operation;
526                destination = removeOpperation.getCommand().getDestination();
527            }
528            opCount opCount = destinationOpCount.get(destination);
529            if (opCount == null) {
530                opCount = new opCount();
531                destinationOpCount.put(destination, opCount);
532            }
533            if (isAdd) {
534                opCount.add++;
535            } else {
536                opCount.remove++;
537            }
538        }
539
540        @Override
541        public String toString() {
542           StringBuffer buffer = new StringBuffer();
543           buffer.append(location).append(";").append(id).append(";\n");
544           for (Entry<KahaDestination, opCount> op : destinationOpCount.entrySet()) {
545               buffer.append(op.getKey()).append('+').append(op.getValue().add).append(',').append('-').append(op.getValue().remove).append(';');
546           }
547           return buffer.toString();
548        }
549    }
550
551    @SuppressWarnings("rawtypes")
552    public String getTransactions() {
553
554        ArrayList<TranInfo> infos = new ArrayList<TranInfo>();
555        synchronized (inflightTransactions) {
556            if (!inflightTransactions.isEmpty()) {
557                for (Entry<TransactionId, List<Operation>> entry : inflightTransactions.entrySet()) {
558                    TranInfo info = new TranInfo();
559                    info.id = entry.getKey();
560                    for (Operation operation : entry.getValue()) {
561                        info.track(operation);
562                    }
563                    infos.add(info);
564                }
565            }
566        }
567        synchronized (preparedTransactions) {
568            if (!preparedTransactions.isEmpty()) {
569                for (Entry<TransactionId, List<Operation>> entry : preparedTransactions.entrySet()) {
570                    TranInfo info = new TranInfo();
571                    info.id = entry.getKey();
572                    for (Operation operation : entry.getValue()) {
573                        info.track(operation);
574                    }
575                    infos.add(info);
576                }
577            }
578        }
579        return infos.toString();
580    }
581
582    /**
583     * Move all the messages that were in the journal into long term storage. We
584     * just replay and do a checkpoint.
585     *
586     * @throws IOException
587     * @throws IOException
588     * @throws IllegalStateException
589     */
590    private void recover() throws IllegalStateException, IOException {
591        this.indexLock.writeLock().lock();
592        try {
593
594            long start = System.currentTimeMillis();
595            Location producerAuditPosition = recoverProducerAudit();
596            Location ackMessageFileLocation = recoverAckMessageFileMap();
597            Location lastIndoubtPosition = getRecoveryPosition();
598
599            Location recoveryPosition = minimum(producerAuditPosition, ackMessageFileLocation);
600            recoveryPosition = minimum(recoveryPosition, lastIndoubtPosition);
601
602            if (recoveryPosition != null) {
603                int redoCounter = 0;
604                LOG.info("Recovering from the journal @" + recoveryPosition);
605                while (recoveryPosition != null) {
606                    try {
607                        JournalCommand<?> message = load(recoveryPosition);
608                        metadata.lastUpdate = recoveryPosition;
609                        process(message, recoveryPosition, lastIndoubtPosition);
610                        redoCounter++;
611                    } catch (IOException failedRecovery) {
612                        if (isIgnoreMissingJournalfiles()) {
613                            LOG.debug("Failed to recover data at position:" + recoveryPosition, failedRecovery);
614                            // track this dud location
615                            journal.corruptRecoveryLocation(recoveryPosition);
616                        } else {
617                            throw new IOException("Failed to recover data at position:" + recoveryPosition, failedRecovery);
618                        }
619                    }
620                    recoveryPosition = journal.getNextLocation(recoveryPosition);
621                     if (LOG.isInfoEnabled() && redoCounter % 100000 == 0) {
622                         LOG.info("@" + recoveryPosition +  ", "  + redoCounter + " entries recovered ..");
623                     }
624                }
625                if (LOG.isInfoEnabled()) {
626                    long end = System.currentTimeMillis();
627                    LOG.info("Recovery replayed " + redoCounter + " operations from the journal in " + ((end - start) / 1000.0f) + " seconds.");
628                }
629            }
630
631            // We may have to undo some index updates.
632            pageFile.tx().execute(new Transaction.Closure<IOException>() {
633                @Override
634                public void execute(Transaction tx) throws IOException {
635                    recoverIndex(tx);
636                }
637            });
638
639            // rollback any recovered inflight local transactions, and discard any inflight XA transactions.
640            Set<TransactionId> toRollback = new HashSet<TransactionId>();
641            Set<TransactionId> toDiscard = new HashSet<TransactionId>();
642            synchronized (inflightTransactions) {
643                for (Iterator<TransactionId> it = inflightTransactions.keySet().iterator(); it.hasNext(); ) {
644                    TransactionId id = it.next();
645                    if (id.isLocalTransaction()) {
646                        toRollback.add(id);
647                    } else {
648                        toDiscard.add(id);
649                    }
650                }
651                for (TransactionId tx: toRollback) {
652                    if (LOG.isDebugEnabled()) {
653                        LOG.debug("rolling back recovered indoubt local transaction " + tx);
654                    }
655                    store(new KahaRollbackCommand().setTransactionInfo(TransactionIdConversion.convertToLocal(tx)), false, null, null);
656                }
657                for (TransactionId tx: toDiscard) {
658                    if (LOG.isDebugEnabled()) {
659                        LOG.debug("discarding recovered in-flight XA transaction " + tx);
660                    }
661                    inflightTransactions.remove(tx);
662                }
663            }
664
665            synchronized (preparedTransactions) {
666                for (TransactionId txId : preparedTransactions.keySet()) {
667                    LOG.warn("Recovered prepared XA TX: [{}]", txId);
668                }
669            }
670
671        } finally {
672            this.indexLock.writeLock().unlock();
673        }
674    }
675
676    @SuppressWarnings("unused")
677    private KahaTransactionInfo createLocalTransactionInfo(TransactionId tx) {
678        return TransactionIdConversion.convertToLocal(tx);
679    }
680
681    private Location minimum(Location producerAuditPosition,
682            Location lastIndoubtPosition) {
683        Location min = null;
684        if (producerAuditPosition != null) {
685            min = producerAuditPosition;
686            if (lastIndoubtPosition != null && lastIndoubtPosition.compareTo(producerAuditPosition) < 0) {
687                min = lastIndoubtPosition;
688            }
689        } else {
690            min = lastIndoubtPosition;
691        }
692        return min;
693    }
694
695    private Location recoverProducerAudit() throws IOException {
696        if (metadata.producerSequenceIdTrackerLocation != null) {
697            KahaProducerAuditCommand audit = (KahaProducerAuditCommand) load(metadata.producerSequenceIdTrackerLocation);
698            try {
699                ObjectInputStream objectIn = new ObjectInputStream(audit.getAudit().newInput());
700                int maxNumProducers = getMaxFailoverProducersToTrack();
701                int maxAuditDepth = getFailoverProducersAuditDepth();
702                metadata.producerSequenceIdTracker = (ActiveMQMessageAuditNoSync) objectIn.readObject();
703                metadata.producerSequenceIdTracker.setAuditDepth(maxAuditDepth);
704                metadata.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxNumProducers);
705                return journal.getNextLocation(metadata.producerSequenceIdTrackerLocation);
706            } catch (Exception e) {
707                LOG.warn("Cannot recover message audit", e);
708                return journal.getNextLocation(null);
709            }
710        } else {
711            // got no audit stored so got to recreate via replay from start of the journal
712            return journal.getNextLocation(null);
713        }
714    }
715
716    @SuppressWarnings("unchecked")
717    private Location recoverAckMessageFileMap() throws IOException {
718        if (metadata.ackMessageFileMapLocation != null) {
719            KahaAckMessageFileMapCommand audit = (KahaAckMessageFileMapCommand) load(metadata.ackMessageFileMapLocation);
720            try {
721                ObjectInputStream objectIn = new ObjectInputStream(audit.getAckMessageFileMap().newInput());
722                metadata.ackMessageFileMap = (Map<Integer, Set<Integer>>) objectIn.readObject();
723                return journal.getNextLocation(metadata.ackMessageFileMapLocation);
724            } catch (Exception e) {
725                LOG.warn("Cannot recover ackMessageFileMap", e);
726                return journal.getNextLocation(null);
727            }
728        } else {
729            // got no ackMessageFileMap stored so got to recreate via replay from start of the journal
730            return journal.getNextLocation(null);
731        }
732    }
733
734    protected void recoverIndex(Transaction tx) throws IOException {
735        long start = System.currentTimeMillis();
736        // It is possible index updates got applied before the journal updates..
737        // in that case we need to removed references to messages that are not in the journal
738        final Location lastAppendLocation = journal.getLastAppendLocation();
739        long undoCounter=0;
740
741        // Go through all the destinations to see if they have messages past the lastAppendLocation
742        for (StoredDestination sd : storedDestinations.values()) {
743
744            final ArrayList<Long> matches = new ArrayList<Long>();
745            // Find all the Locations that are >= than the last Append Location.
746            sd.locationIndex.visit(tx, new BTreeVisitor.GTEVisitor<Location, Long>(lastAppendLocation) {
747                @Override
748                protected void matched(Location key, Long value) {
749                    matches.add(value);
750                }
751            });
752
753            for (Long sequenceId : matches) {
754                MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
755                sd.locationIndex.remove(tx, keys.location);
756                sd.messageIdIndex.remove(tx, keys.messageId);
757                metadata.producerSequenceIdTracker.rollback(keys.messageId);
758                undoCounter++;
759                // TODO: do we need to modify the ack positions for the pub sub case?
760            }
761        }
762
763        if( undoCounter > 0 ) {
764            // The rolledback operations are basically in flight journal writes.  To avoid getting
765            // these the end user should do sync writes to the journal.
766            if (LOG.isInfoEnabled()) {
767                long end = System.currentTimeMillis();
768                LOG.info("Rolled back " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds.");
769            }
770        }
771
772        undoCounter = 0;
773        start = System.currentTimeMillis();
774
775        // Lets be extra paranoid here and verify that all the datafiles being referenced
776        // by the indexes still exists.
777
778        final SequenceSet ss = new SequenceSet();
779        for (StoredDestination sd : storedDestinations.values()) {
780            // Use a visitor to cut down the number of pages that we load
781            sd.locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
782                int last=-1;
783
784                @Override
785                public boolean isInterestedInKeysBetween(Location first, Location second) {
786                    if( first==null ) {
787                        return !ss.contains(0, second.getDataFileId());
788                    } else if( second==null ) {
789                        return true;
790                    } else {
791                        return !ss.contains(first.getDataFileId(), second.getDataFileId());
792                    }
793                }
794
795                @Override
796                public void visit(List<Location> keys, List<Long> values) {
797                    for (Location l : keys) {
798                        int fileId = l.getDataFileId();
799                        if( last != fileId ) {
800                            ss.add(fileId);
801                            last = fileId;
802                        }
803                    }
804                }
805
806            });
807        }
808        HashSet<Integer> missingJournalFiles = new HashSet<Integer>();
809        while (!ss.isEmpty()) {
810            missingJournalFiles.add((int) ss.removeFirst());
811        }
812        missingJournalFiles.removeAll(journal.getFileMap().keySet());
813
814        if (!missingJournalFiles.isEmpty()) {
815            if (LOG.isInfoEnabled()) {
816                LOG.info("Some journal files are missing: " + missingJournalFiles);
817            }
818        }
819
820        ArrayList<BTreeVisitor.Predicate<Location>> missingPredicates = new ArrayList<BTreeVisitor.Predicate<Location>>();
821        for (Integer missing : missingJournalFiles) {
822            missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(missing, 0), new Location(missing + 1, 0)));
823        }
824
825        if (checkForCorruptJournalFiles) {
826            Collection<DataFile> dataFiles = journal.getFileMap().values();
827            for (DataFile dataFile : dataFiles) {
828                int id = dataFile.getDataFileId();
829                missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, dataFile.getLength()), new Location(id + 1, 0)));
830                Sequence seq = dataFile.getCorruptedBlocks().getHead();
831                while (seq != null) {
832                    missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, (int) seq.getFirst()), new Location(id, (int) seq.getLast() + 1)));
833                    seq = seq.getNext();
834                }
835            }
836        }
837
838        if (!missingPredicates.isEmpty()) {
839            for (Entry<String, StoredDestination> sdEntry : storedDestinations.entrySet()) {
840                final StoredDestination sd = sdEntry.getValue();
841                final ArrayList<Long> matches = new ArrayList<Long>();
842                sd.locationIndex.visit(tx, new BTreeVisitor.OrVisitor<Location, Long>(missingPredicates) {
843                    @Override
844                    protected void matched(Location key, Long value) {
845                        matches.add(value);
846                    }
847                });
848
849                // If somes message references are affected by the missing data files...
850                if (!matches.isEmpty()) {
851
852                    // We either 'gracefully' recover dropping the missing messages or
853                    // we error out.
854                    if( ignoreMissingJournalfiles ) {
855                        // Update the index to remove the references to the missing data
856                        for (Long sequenceId : matches) {
857                            MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
858                            sd.locationIndex.remove(tx, keys.location);
859                            sd.messageIdIndex.remove(tx, keys.messageId);
860                            LOG.info("[" + sdEntry.getKey() + "] dropped: " + keys.messageId + " at corrupt location: " + keys.location);
861                            undoCounter++;
862                            // TODO: do we need to modify the ack positions for the pub sub case?
863                        }
864                    } else {
865                        throw new IOException("Detected missing/corrupt journal files. "+matches.size()+" messages affected.");
866                    }
867                }
868            }
869        }
870
871        if( undoCounter > 0 ) {
872            // The rolledback operations are basically in flight journal writes.  To avoid getting these the end user
873            // should do sync writes to the journal.
874            if (LOG.isInfoEnabled()) {
875                long end = System.currentTimeMillis();
876                LOG.info("Detected missing/corrupt journal files.  Dropped " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds.");
877            }
878        }
879    }
880
881    private Location nextRecoveryPosition;
882    private Location lastRecoveryPosition;
883
884    public void incrementalRecover() throws IOException {
885        this.indexLock.writeLock().lock();
886        try {
887            if( nextRecoveryPosition == null ) {
888                if( lastRecoveryPosition==null ) {
889                    nextRecoveryPosition = getRecoveryPosition();
890                } else {
891                    nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
892                }
893            }
894            while (nextRecoveryPosition != null) {
895                lastRecoveryPosition = nextRecoveryPosition;
896                metadata.lastUpdate = lastRecoveryPosition;
897                JournalCommand<?> message = load(lastRecoveryPosition);
898                process(message, lastRecoveryPosition, (IndexAware) null);
899                nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
900            }
901        } finally {
902            this.indexLock.writeLock().unlock();
903        }
904    }
905
906    public Location getLastUpdatePosition() throws IOException {
907        return metadata.lastUpdate;
908    }
909
910    private Location getRecoveryPosition() throws IOException {
911
912        if (!this.forceRecoverIndex) {
913
914            // If we need to recover the transactions..
915            if (metadata.firstInProgressTransactionLocation != null) {
916                return metadata.firstInProgressTransactionLocation;
917            }
918
919            // Perhaps there were no transactions...
920            if( metadata.lastUpdate!=null) {
921                // Start replay at the record after the last one recorded in the index file.
922                return journal.getNextLocation(metadata.lastUpdate);
923            }
924        }
925        // This loads the first position.
926        return journal.getNextLocation(null);
927    }
928
929    protected void checkpointCleanup(final boolean cleanup) throws IOException {
930        long start;
931        this.indexLock.writeLock().lock();
932        try {
933            start = System.currentTimeMillis();
934            if( !opened.get() ) {
935                return;
936            }
937        } finally {
938            this.indexLock.writeLock().unlock();
939        }
940        checkpointUpdate(cleanup);
941        long end = System.currentTimeMillis();
942        if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) {
943            if (LOG.isInfoEnabled()) {
944                LOG.info("Slow KahaDB access: cleanup took " + (end - start));
945            }
946        }
947    }
948
949    public ByteSequence toByteSequence(JournalCommand<?> data) throws IOException {
950        int size = data.serializedSizeFramed();
951        DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
952        os.writeByte(data.type().getNumber());
953        data.writeFramed(os);
954        return os.toByteSequence();
955    }
956
957    // /////////////////////////////////////////////////////////////////
958    // Methods call by the broker to update and query the store.
959    // /////////////////////////////////////////////////////////////////
960    public Location store(JournalCommand<?> data) throws IOException {
961        return store(data, false, null,null);
962    }
963
964    public Location store(JournalCommand<?> data, Runnable onJournalStoreComplete) throws IOException {
965        return store(data, false, null, null, onJournalStoreComplete);
966    }
967
968    public Location store(JournalCommand<?> data, boolean sync, IndexAware before,Runnable after) throws IOException {
969        return store(data, sync, before, after, null);
970    }
971
972    /**
973     * All updated are are funneled through this method. The updates are converted
974     * to a JournalMessage which is logged to the journal and then the data from
975     * the JournalMessage is used to update the index just like it would be done
976     * during a recovery process.
977     */
978    public Location store(JournalCommand<?> data, boolean sync, IndexAware before, Runnable after, Runnable onJournalStoreComplete) throws IOException {
979        try {
980            ByteSequence sequence = toByteSequence(data);
981
982            Location location;
983            checkpointLock.readLock().lock();
984            try {
985
986                long start = System.currentTimeMillis();
987                location = onJournalStoreComplete == null ? journal.write(sequence, sync) :  journal.write(sequence, onJournalStoreComplete) ;
988                long start2 = System.currentTimeMillis();
989                process(data, location, before);
990
991                long end = System.currentTimeMillis();
992                if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
993                    if (LOG.isInfoEnabled()) {
994                        LOG.info("Slow KahaDB access: Journal append took: "+(start2-start)+" ms, Index update took "+(end-start2)+" ms");
995                    }
996                }
997
998            } finally{
999                checkpointLock.readLock().unlock();
1000            }
1001            if (after != null) {
1002                after.run();
1003            }
1004
1005            if (checkpointThread != null && !checkpointThread.isAlive() && opened.get()) {
1006                startCheckpoint();
1007            }
1008            return location;
1009        } catch (IOException ioe) {
1010            LOG.error("KahaDB failed to store to Journal", ioe);
1011            brokerService.handleIOException(ioe);
1012            throw ioe;
1013        }
1014    }
1015
1016    /**
1017     * Loads a previously stored JournalMessage
1018     *
1019     * @param location
1020     * @return
1021     * @throws IOException
1022     */
1023    public JournalCommand<?> load(Location location) throws IOException {
1024        long start = System.currentTimeMillis();
1025        ByteSequence data = journal.read(location);
1026        long end = System.currentTimeMillis();
1027        if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
1028            if (LOG.isInfoEnabled()) {
1029                LOG.info("Slow KahaDB access: Journal read took: "+(end-start)+" ms");
1030            }
1031        }
1032        DataByteArrayInputStream is = new DataByteArrayInputStream(data);
1033        byte readByte = is.readByte();
1034        KahaEntryType type = KahaEntryType.valueOf(readByte);
1035        if( type == null ) {
1036            try {
1037                is.close();
1038            } catch (IOException e) {}
1039            throw new IOException("Could not load journal record. Invalid location: "+location);
1040        }
1041        JournalCommand<?> message = (JournalCommand<?>)type.createMessage();
1042        message.mergeFramed(is);
1043        return message;
1044    }
1045
1046    /**
1047     * do minimal recovery till we reach the last inDoubtLocation
1048     * @param data
1049     * @param location
1050     * @param inDoubtlocation
1051     * @throws IOException
1052     */
1053    void process(JournalCommand<?> data, final Location location, final Location inDoubtlocation) throws IOException {
1054        if (inDoubtlocation != null && location.compareTo(inDoubtlocation) >= 0) {
1055            if (data instanceof KahaSubscriptionCommand) {
1056                KahaSubscriptionCommand kahaSubscriptionCommand = (KahaSubscriptionCommand)data;
1057                if (kahaSubscriptionCommand.hasSubscriptionInfo()) {
1058                    // needs to be processed via activate and will be replayed on reconnect
1059                    LOG.debug("ignoring add sub command during recovery replay:" + data);
1060                    return;
1061                }
1062            }
1063            process(data, location, (IndexAware) null);
1064        } else {
1065            // just recover producer audit
1066            data.visit(new Visitor() {
1067                @Override
1068                public void visit(KahaAddMessageCommand command) throws IOException {
1069                    metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId());
1070                }
1071            });
1072        }
1073    }
1074
1075    // /////////////////////////////////////////////////////////////////
1076    // Journaled record processing methods. Once the record is journaled,
1077    // these methods handle applying the index updates. These may be called
1078    // from the recovery method too so they need to be idempotent
1079    // /////////////////////////////////////////////////////////////////
1080
1081    void process(JournalCommand<?> data, final Location location, final IndexAware onSequenceAssignedCallback) throws IOException {
1082        data.visit(new Visitor() {
1083            @Override
1084            public void visit(KahaAddMessageCommand command) throws IOException {
1085                process(command, location, onSequenceAssignedCallback);
1086            }
1087
1088            @Override
1089            public void visit(KahaRemoveMessageCommand command) throws IOException {
1090                process(command, location);
1091            }
1092
1093            @Override
1094            public void visit(KahaPrepareCommand command) throws IOException {
1095                process(command, location);
1096            }
1097
1098            @Override
1099            public void visit(KahaCommitCommand command) throws IOException {
1100                process(command, location, onSequenceAssignedCallback);
1101            }
1102
1103            @Override
1104            public void visit(KahaRollbackCommand command) throws IOException {
1105                process(command, location);
1106            }
1107
1108            @Override
1109            public void visit(KahaRemoveDestinationCommand command) throws IOException {
1110                process(command, location);
1111            }
1112
1113            @Override
1114            public void visit(KahaSubscriptionCommand command) throws IOException {
1115                process(command, location);
1116            }
1117
1118            @Override
1119            public void visit(KahaProducerAuditCommand command) throws IOException {
1120                processLocation(location);
1121            }
1122
1123            @Override
1124            public void visit(KahaAckMessageFileMapCommand command) throws IOException {
1125                processLocation(location);
1126            }
1127
1128            @Override
1129            public void visit(KahaTraceCommand command) {
1130                processLocation(location);
1131            }
1132
1133            @Override
1134            public void visit(KahaUpdateMessageCommand command) throws IOException {
1135                process(command, location);
1136            }
1137        });
1138    }
1139
1140    @SuppressWarnings("rawtypes")
1141    protected void process(final KahaAddMessageCommand command, final Location location, final IndexAware runWithIndexLock) throws IOException {
1142        if (command.hasTransactionInfo()) {
1143            List<Operation> inflightTx = getInflightTx(command.getTransactionInfo());
1144            inflightTx.add(new AddOperation(command, location, runWithIndexLock));
1145        } else {
1146            this.indexLock.writeLock().lock();
1147            try {
1148                pageFile.tx().execute(new Transaction.Closure<IOException>() {
1149                    @Override
1150                    public void execute(Transaction tx) throws IOException {
1151                        long assignedIndex = updateIndex(tx, command, location);
1152                        if (runWithIndexLock != null) {
1153                            runWithIndexLock.sequenceAssignedWithIndexLocked(assignedIndex);
1154                        }
1155                    }
1156                });
1157
1158            } finally {
1159                this.indexLock.writeLock().unlock();
1160            }
1161        }
1162    }
1163
1164    @SuppressWarnings("rawtypes")
1165    protected void process(final KahaUpdateMessageCommand command, final Location location) throws IOException {
1166        this.indexLock.writeLock().lock();
1167        try {
1168            pageFile.tx().execute(new Transaction.Closure<IOException>() {
1169                @Override
1170                public void execute(Transaction tx) throws IOException {
1171                    updateIndex(tx, command, location);
1172                }
1173            });
1174        } finally {
1175            this.indexLock.writeLock().unlock();
1176        }
1177    }
1178
1179    @SuppressWarnings("rawtypes")
1180    protected void process(final KahaRemoveMessageCommand command, final Location location) throws IOException {
1181        if (command.hasTransactionInfo()) {
1182           List<Operation> inflightTx = getInflightTx(command.getTransactionInfo());
1183           inflightTx.add(new RemoveOperation(command, location));
1184        } else {
1185            this.indexLock.writeLock().lock();
1186            try {
1187                pageFile.tx().execute(new Transaction.Closure<IOException>() {
1188                    @Override
1189                    public void execute(Transaction tx) throws IOException {
1190                        updateIndex(tx, command, location);
1191                    }
1192                });
1193            } finally {
1194                this.indexLock.writeLock().unlock();
1195            }
1196        }
1197    }
1198
1199    protected void process(final KahaRemoveDestinationCommand command, final Location location) throws IOException {
1200        this.indexLock.writeLock().lock();
1201        try {
1202            pageFile.tx().execute(new Transaction.Closure<IOException>() {
1203                @Override
1204                public void execute(Transaction tx) throws IOException {
1205                    updateIndex(tx, command, location);
1206                }
1207            });
1208        } finally {
1209            this.indexLock.writeLock().unlock();
1210        }
1211    }
1212
1213    protected void process(final KahaSubscriptionCommand command, final Location location) throws IOException {
1214        this.indexLock.writeLock().lock();
1215        try {
1216            pageFile.tx().execute(new Transaction.Closure<IOException>() {
1217                @Override
1218                public void execute(Transaction tx) throws IOException {
1219                    updateIndex(tx, command, location);
1220                }
1221            });
1222        } finally {
1223            this.indexLock.writeLock().unlock();
1224        }
1225    }
1226
1227    protected void processLocation(final Location location) {
1228        this.indexLock.writeLock().lock();
1229        try {
1230            metadata.lastUpdate = location;
1231        } finally {
1232            this.indexLock.writeLock().unlock();
1233        }
1234    }
1235
1236    @SuppressWarnings("rawtypes")
1237    protected void process(KahaCommitCommand command, final Location location, final IndexAware before) throws IOException {
1238        TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo());
1239        List<Operation> inflightTx;
1240        synchronized (inflightTransactions) {
1241            inflightTx = inflightTransactions.remove(key);
1242            if (inflightTx == null) {
1243                inflightTx = preparedTransactions.remove(key);
1244            }
1245        }
1246        if (inflightTx == null) {
1247            // only non persistent messages in this tx
1248            if (before != null) {
1249                before.sequenceAssignedWithIndexLocked(-1);
1250            }
1251            return;
1252        }
1253
1254        final List<Operation> messagingTx = inflightTx;
1255        indexLock.writeLock().lock();
1256        try {
1257            pageFile.tx().execute(new Transaction.Closure<IOException>() {
1258                @Override
1259                public void execute(Transaction tx) throws IOException {
1260                    for (Operation op : messagingTx) {
1261                        op.execute(tx);
1262                    }
1263                }
1264            });
1265            metadata.lastUpdate = location;
1266        } finally {
1267            indexLock.writeLock().unlock();
1268        }
1269    }
1270
1271    @SuppressWarnings("rawtypes")
1272    protected void process(KahaPrepareCommand command, Location location) {
1273        TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo());
1274        synchronized (inflightTransactions) {
1275            List<Operation> tx = inflightTransactions.remove(key);
1276            if (tx != null) {
1277                preparedTransactions.put(key, tx);
1278            }
1279        }
1280    }
1281
1282    @SuppressWarnings("rawtypes")
1283    protected void process(KahaRollbackCommand command, Location location)  throws IOException {
1284        TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo());
1285        List<Operation> updates = null;
1286        synchronized (inflightTransactions) {
1287            updates = inflightTransactions.remove(key);
1288            if (updates == null) {
1289                updates = preparedTransactions.remove(key);
1290            }
1291        }
1292    }
1293
1294    // /////////////////////////////////////////////////////////////////
1295    // These methods do the actual index updates.
1296    // /////////////////////////////////////////////////////////////////
1297
1298    protected final ReentrantReadWriteLock indexLock = new ReentrantReadWriteLock();
1299    private final HashSet<Integer> journalFilesBeingReplicated = new HashSet<Integer>();
1300
1301    long updateIndex(Transaction tx, KahaAddMessageCommand command, Location location) throws IOException {
1302        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1303
1304        // Skip adding the message to the index if this is a topic and there are
1305        // no subscriptions.
1306        if (sd.subscriptions != null && sd.subscriptions.isEmpty(tx)) {
1307            return -1;
1308        }
1309
1310        // Add the message.
1311        int priority = command.getPrioritySupported() ? command.getPriority() : javax.jms.Message.DEFAULT_PRIORITY;
1312        long id = sd.orderIndex.getNextMessageId(priority);
1313        Long previous = sd.locationIndex.put(tx, location, id);
1314        if (previous == null) {
1315            previous = sd.messageIdIndex.put(tx, command.getMessageId(), id);
1316            if (previous == null) {
1317                sd.orderIndex.put(tx, priority, id, new MessageKeys(command.getMessageId(), location));
1318                if (sd.subscriptions != null && !sd.subscriptions.isEmpty(tx)) {
1319                    addAckLocationForNewMessage(tx, sd, id);
1320                }
1321                metadata.lastUpdate = location;
1322            } else {
1323                // If the message ID is indexed, then the broker asked us to store a duplicate before the message was dispatched and acked, we ignore this add attempt
1324                LOG.warn("Duplicate message add attempt rejected. Destination: {}://{}, Message id: {}", command.getDestination().getType(), command.getDestination().getName(), command.getMessageId());
1325                sd.messageIdIndex.put(tx, command.getMessageId(), previous);
1326                sd.locationIndex.remove(tx, location);
1327                id = -1;
1328            }
1329        } else {
1330            // restore the previous value.. Looks like this was a redo of a previously
1331            // added message. We don't want to assign it a new id as the other indexes would
1332            // be wrong..
1333            sd.locationIndex.put(tx, location, previous);
1334            metadata.lastUpdate = location;
1335        }
1336        // record this id in any event, initial send or recovery
1337        metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId());
1338        return id;
1339    }
1340
1341    void trackPendingAdd(KahaDestination destination, Long seq) {
1342        StoredDestination sd = storedDestinations.get(key(destination));
1343        if (sd != null) {
1344            sd.trackPendingAdd(seq);
1345        }
1346    }
1347
1348    void trackPendingAddComplete(KahaDestination destination, Long seq) {
1349        StoredDestination sd = storedDestinations.get(key(destination));
1350        if (sd != null) {
1351            sd.trackPendingAddComplete(seq);
1352        }
1353    }
1354
1355    void updateIndex(Transaction tx, KahaUpdateMessageCommand updateMessageCommand, Location location) throws IOException {
1356        KahaAddMessageCommand command = updateMessageCommand.getMessage();
1357        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1358
1359        Long id = sd.messageIdIndex.get(tx, command.getMessageId());
1360        if (id != null) {
1361            MessageKeys previousKeys = sd.orderIndex.put(
1362                    tx,
1363                    command.getPrioritySupported() ? command.getPriority() : javax.jms.Message.DEFAULT_PRIORITY,
1364                    id,
1365                    new MessageKeys(command.getMessageId(), location)
1366            );
1367            sd.locationIndex.put(tx, location, id);
1368            if(previousKeys != null) {
1369                sd.locationIndex.remove(tx, previousKeys.location);
1370            }
1371            metadata.lastUpdate = location;
1372        } else {
1373            LOG.warn("Non existent message update attempt rejected. Destination: {}://{}, Message id: {}", command.getDestination().getType(), command.getDestination().getName(), command.getMessageId());
1374        }
1375    }
1376
1377    void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location ackLocation) throws IOException {
1378        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1379        if (!command.hasSubscriptionKey()) {
1380
1381            // In the queue case we just remove the message from the index..
1382            Long sequenceId = sd.messageIdIndex.remove(tx, command.getMessageId());
1383            if (sequenceId != null) {
1384                MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
1385                if (keys != null) {
1386                    sd.locationIndex.remove(tx, keys.location);
1387                    recordAckMessageReferenceLocation(ackLocation, keys.location);
1388                    metadata.lastUpdate = ackLocation;
1389                }  else if (LOG.isDebugEnabled()) {
1390                    LOG.debug("message not found in order index: " + sequenceId  + " for: " + command.getMessageId());
1391                }
1392            } else if (LOG.isDebugEnabled()) {
1393                LOG.debug("message not found in sequence id index: " + command.getMessageId());
1394            }
1395        } else {
1396            // In the topic case we need remove the message once it's been acked
1397            // by all the subs
1398            Long sequence = sd.messageIdIndex.get(tx, command.getMessageId());
1399
1400            // Make sure it's a valid message id...
1401            if (sequence != null) {
1402                String subscriptionKey = command.getSubscriptionKey();
1403                if (command.getAck() != UNMATCHED) {
1404                    sd.orderIndex.get(tx, sequence);
1405                    byte priority = sd.orderIndex.lastGetPriority();
1406                    sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(sequence, priority));
1407                }
1408
1409                MessageKeys keys = sd.orderIndex.get(tx, sequence);
1410                if (keys != null) {
1411                    recordAckMessageReferenceLocation(ackLocation, keys.location);
1412                }
1413                // The following method handles deleting un-referenced messages.
1414                removeAckLocation(tx, sd, subscriptionKey, sequence);
1415                metadata.lastUpdate = ackLocation;
1416            } else if (LOG.isDebugEnabled()) {
1417                LOG.debug("no message sequence exists for id: " + command.getMessageId() + " and sub: " + command.getSubscriptionKey());
1418            }
1419
1420        }
1421    }
1422
1423    private void recordAckMessageReferenceLocation(Location ackLocation, Location messageLocation) {
1424        Set<Integer> referenceFileIds = metadata.ackMessageFileMap.get(Integer.valueOf(ackLocation.getDataFileId()));
1425        if (referenceFileIds == null) {
1426            referenceFileIds = new HashSet<Integer>();
1427            referenceFileIds.add(messageLocation.getDataFileId());
1428            metadata.ackMessageFileMap.put(ackLocation.getDataFileId(), referenceFileIds);
1429        } else {
1430            Integer id = Integer.valueOf(messageLocation.getDataFileId());
1431            if (!referenceFileIds.contains(id)) {
1432                referenceFileIds.add(id);
1433            }
1434        }
1435    }
1436
1437    void updateIndex(Transaction tx, KahaRemoveDestinationCommand command, Location location) throws IOException {
1438        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1439        sd.orderIndex.remove(tx);
1440
1441        sd.locationIndex.clear(tx);
1442        sd.locationIndex.unload(tx);
1443        tx.free(sd.locationIndex.getPageId());
1444
1445        sd.messageIdIndex.clear(tx);
1446        sd.messageIdIndex.unload(tx);
1447        tx.free(sd.messageIdIndex.getPageId());
1448
1449        if (sd.subscriptions != null) {
1450            sd.subscriptions.clear(tx);
1451            sd.subscriptions.unload(tx);
1452            tx.free(sd.subscriptions.getPageId());
1453
1454            sd.subscriptionAcks.clear(tx);
1455            sd.subscriptionAcks.unload(tx);
1456            tx.free(sd.subscriptionAcks.getPageId());
1457
1458            sd.ackPositions.clear(tx);
1459            sd.ackPositions.unload(tx);
1460            tx.free(sd.ackPositions.getHeadPageId());
1461
1462            sd.subLocations.clear(tx);
1463            sd.subLocations.unload(tx);
1464            tx.free(sd.subLocations.getHeadPageId());
1465        }
1466
1467        String key = key(command.getDestination());
1468        storedDestinations.remove(key);
1469        metadata.destinations.remove(tx, key);
1470    }
1471
1472    void updateIndex(Transaction tx, KahaSubscriptionCommand command, Location location) throws IOException {
1473        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1474        final String subscriptionKey = command.getSubscriptionKey();
1475
1476        // If set then we are creating it.. otherwise we are destroying the sub
1477        if (command.hasSubscriptionInfo()) {
1478            sd.subscriptions.put(tx, subscriptionKey, command);
1479            sd.subLocations.put(tx, subscriptionKey, location);
1480            long ackLocation=NOT_ACKED;
1481            if (!command.getRetroactive()) {
1482                ackLocation = sd.orderIndex.nextMessageId-1;
1483            } else {
1484                addAckLocationForRetroactiveSub(tx, sd, subscriptionKey);
1485            }
1486            sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(ackLocation));
1487            sd.subscriptionCache.add(subscriptionKey);
1488        } else {
1489            // delete the sub...
1490            sd.subscriptions.remove(tx, subscriptionKey);
1491            sd.subLocations.remove(tx, subscriptionKey);
1492            sd.subscriptionAcks.remove(tx, subscriptionKey);
1493            sd.subscriptionCache.remove(subscriptionKey);
1494            removeAckLocationsForSub(tx, sd, subscriptionKey);
1495
1496            if (sd.subscriptions.isEmpty(tx)) {
1497                // remove the stored destination
1498                KahaRemoveDestinationCommand removeDestinationCommand = new KahaRemoveDestinationCommand();
1499                removeDestinationCommand.setDestination(command.getDestination());
1500                updateIndex(tx, removeDestinationCommand, null);
1501            }
1502        }
1503    }
1504
1505    private void checkpointUpdate(final boolean cleanup) throws IOException {
1506        checkpointLock.writeLock().lock();
1507        try {
1508            this.indexLock.writeLock().lock();
1509            try {
1510                pageFile.tx().execute(new Transaction.Closure<IOException>() {
1511                    @Override
1512                    public void execute(Transaction tx) throws IOException {
1513                        checkpointUpdate(tx, cleanup);
1514                    }
1515                });
1516            } finally {
1517                this.indexLock.writeLock().unlock();
1518            }
1519
1520        } finally {
1521            checkpointLock.writeLock().unlock();
1522        }
1523    }
1524
1525    /**
1526     * @param tx
1527     * @throws IOException
1528     */
1529    void checkpointUpdate(Transaction tx, boolean cleanup) throws IOException {
1530        LOG.debug("Checkpoint started.");
1531
1532        // reflect last update exclusive of current checkpoint
1533        Location lastUpdate = metadata.lastUpdate;
1534
1535        metadata.state = OPEN_STATE;
1536        metadata.producerSequenceIdTrackerLocation = checkpointProducerAudit();
1537        metadata.ackMessageFileMapLocation = checkpointAckMessageFileMap();
1538        Location[] inProgressTxRange = getInProgressTxLocationRange();
1539        metadata.firstInProgressTransactionLocation = inProgressTxRange[0];
1540        tx.store(metadata.page, metadataMarshaller, true);
1541        pageFile.flush();
1542
1543        if( cleanup ) {
1544
1545            final TreeSet<Integer> completeFileSet = new TreeSet<Integer>(journal.getFileMap().keySet());
1546            final TreeSet<Integer> gcCandidateSet = new TreeSet<Integer>(completeFileSet);
1547
1548            if (LOG.isTraceEnabled()) {
1549                LOG.trace("Last update: " + lastUpdate + ", full gc candidates set: " + gcCandidateSet);
1550            }
1551
1552            if (lastUpdate != null) {
1553                gcCandidateSet.remove(lastUpdate.getDataFileId());
1554            }
1555
1556            // Don't GC files under replication
1557            if( journalFilesBeingReplicated!=null ) {
1558                gcCandidateSet.removeAll(journalFilesBeingReplicated);
1559            }
1560
1561            if (metadata.producerSequenceIdTrackerLocation != null) {
1562                int dataFileId = metadata.producerSequenceIdTrackerLocation.getDataFileId();
1563                if (gcCandidateSet.contains(dataFileId) && gcCandidateSet.first() == dataFileId) {
1564                    // rewrite so we don't prevent gc
1565                    metadata.producerSequenceIdTracker.setModified(true);
1566                    if (LOG.isTraceEnabled()) {
1567                        LOG.trace("rewriting producerSequenceIdTracker:" + metadata.producerSequenceIdTrackerLocation);
1568                    }
1569                }
1570                gcCandidateSet.remove(dataFileId);
1571                if (LOG.isTraceEnabled()) {
1572                    LOG.trace("gc candidates after producerSequenceIdTrackerLocation:" + dataFileId + ", " + gcCandidateSet);
1573                }
1574            }
1575
1576            if (metadata.ackMessageFileMapLocation != null) {
1577                int dataFileId = metadata.ackMessageFileMapLocation.getDataFileId();
1578                gcCandidateSet.remove(dataFileId);
1579                if (LOG.isTraceEnabled()) {
1580                    LOG.trace("gc candidates after ackMessageFileMapLocation:" + dataFileId + ", " + gcCandidateSet);
1581                }
1582            }
1583
1584            // Don't GC files referenced by in-progress tx
1585            if (inProgressTxRange[0] != null) {
1586                for (int pendingTx=inProgressTxRange[0].getDataFileId(); pendingTx <= inProgressTxRange[1].getDataFileId(); pendingTx++) {
1587                    gcCandidateSet.remove(pendingTx);
1588                }
1589            }
1590            if (LOG.isTraceEnabled()) {
1591                LOG.trace("gc candidates after tx range:" + Arrays.asList(inProgressTxRange) + ", " + gcCandidateSet);
1592            }
1593
1594            // Go through all the destinations to see if any of them can remove GC candidates.
1595            for (Entry<String, StoredDestination> entry : storedDestinations.entrySet()) {
1596                if( gcCandidateSet.isEmpty() ) {
1597                    break;
1598                }
1599
1600                // Use a visitor to cut down the number of pages that we load
1601                entry.getValue().locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
1602                    int last=-1;
1603                    @Override
1604                    public boolean isInterestedInKeysBetween(Location first, Location second) {
1605                        if( first==null ) {
1606                            SortedSet<Integer> subset = gcCandidateSet.headSet(second.getDataFileId()+1);
1607                            if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
1608                                subset.remove(second.getDataFileId());
1609                            }
1610                            return !subset.isEmpty();
1611                        } else if( second==null ) {
1612                            SortedSet<Integer> subset = gcCandidateSet.tailSet(first.getDataFileId());
1613                            if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
1614                                subset.remove(first.getDataFileId());
1615                            }
1616                            return !subset.isEmpty();
1617                        } else {
1618                            SortedSet<Integer> subset = gcCandidateSet.subSet(first.getDataFileId(), second.getDataFileId()+1);
1619                            if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
1620                                subset.remove(first.getDataFileId());
1621                            }
1622                            if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
1623                                subset.remove(second.getDataFileId());
1624                            }
1625                            return !subset.isEmpty();
1626                        }
1627                    }
1628
1629                    @Override
1630                    public void visit(List<Location> keys, List<Long> values) {
1631                        for (Location l : keys) {
1632                            int fileId = l.getDataFileId();
1633                            if( last != fileId ) {
1634                                gcCandidateSet.remove(fileId);
1635                                last = fileId;
1636                            }
1637                        }
1638                    }
1639                });
1640
1641                // Durable Subscription
1642                if (entry.getValue().subLocations != null) {
1643                    Iterator<Entry<String, Location>> iter = entry.getValue().subLocations.iterator(tx);
1644                    while (iter.hasNext()) {
1645                        Entry<String, Location> subscription = iter.next();
1646                        int dataFileId = subscription.getValue().getDataFileId();
1647
1648                        // Move subscription along if it has no outstanding messages that need ack'd
1649                        // and its in the last log file in the journal.
1650                        if (!gcCandidateSet.isEmpty() && gcCandidateSet.first() == dataFileId) {
1651                            final StoredDestination destination = entry.getValue();
1652                            final String subscriptionKey = subscription.getKey();
1653                            SequenceSet pendingAcks = destination.ackPositions.get(tx, subscriptionKey);
1654
1655                            // When pending is size one that is the next message Id meaning there
1656                            // are no pending messages currently.
1657                            if (pendingAcks == null || pendingAcks.size() <= 1) {
1658                                if (LOG.isTraceEnabled()) {
1659                                    LOG.trace("Found candidate for rewrite: {} from file {}", entry.getKey(), dataFileId);
1660                                }
1661
1662                                final KahaSubscriptionCommand kahaSub =
1663                                    destination.subscriptions.get(tx, subscriptionKey);
1664                                destination.subLocations.put(
1665                                    tx, subscriptionKey, checkpointSubscriptionCommand(kahaSub));
1666
1667                                // Skips the remove from candidates if we rewrote the subscription
1668                                // in order to prevent duplicate subscription commands on recover.
1669                                // If another subscription is on the same file and isn't rewritten
1670                                // than it will remove the file from the set.
1671                                continue;
1672                            }
1673                        }
1674
1675                        gcCandidateSet.remove(dataFileId);
1676                    }
1677                }
1678
1679                if (LOG.isTraceEnabled()) {
1680                    LOG.trace("gc candidates after dest:" + entry.getKey() + ", " + gcCandidateSet);
1681                }
1682            }
1683
1684            // check we are not deleting file with ack for in-use journal files
1685            if (LOG.isTraceEnabled()) {
1686                LOG.trace("gc candidates: " + gcCandidateSet);
1687            }
1688            Iterator<Integer> candidates = gcCandidateSet.iterator();
1689            while (candidates.hasNext()) {
1690                Integer candidate = candidates.next();
1691                Set<Integer> referencedFileIds = metadata.ackMessageFileMap.get(candidate);
1692                if (referencedFileIds != null) {
1693                    for (Integer referencedFileId : referencedFileIds) {
1694                        if (completeFileSet.contains(referencedFileId) && !gcCandidateSet.contains(referencedFileId)) {
1695                            // active file that is not targeted for deletion is referenced so don't delete
1696                            candidates.remove();
1697                            break;
1698                        }
1699                    }
1700                    if (gcCandidateSet.contains(candidate)) {
1701                        metadata.ackMessageFileMap.remove(candidate);
1702                    } else {
1703                        if (LOG.isTraceEnabled()) {
1704                            LOG.trace("not removing data file: " + candidate
1705                                    + " as contained ack(s) refer to referenced file: " + referencedFileIds);
1706                        }
1707                    }
1708                }
1709            }
1710
1711            if (!gcCandidateSet.isEmpty()) {
1712                if (LOG.isDebugEnabled()) {
1713                    LOG.debug("Cleanup removing the data files: " + gcCandidateSet);
1714                }
1715                journal.removeDataFiles(gcCandidateSet);
1716            }
1717        }
1718
1719        LOG.debug("Checkpoint done.");
1720    }
1721
1722    final Runnable nullCompletionCallback = new Runnable() {
1723        @Override
1724        public void run() {
1725        }
1726    };
1727
1728    private Location checkpointProducerAudit() throws IOException {
1729        if (metadata.producerSequenceIdTracker == null || metadata.producerSequenceIdTracker.modified()) {
1730            ByteArrayOutputStream baos = new ByteArrayOutputStream();
1731            ObjectOutputStream oout = new ObjectOutputStream(baos);
1732            oout.writeObject(metadata.producerSequenceIdTracker);
1733            oout.flush();
1734            oout.close();
1735            // using completion callback allows a disk sync to be avoided when enableJournalDiskSyncs = false
1736            Location location = store(new KahaProducerAuditCommand().setAudit(new Buffer(baos.toByteArray())), nullCompletionCallback);
1737            try {
1738                location.getLatch().await();
1739            } catch (InterruptedException e) {
1740                throw new InterruptedIOException(e.toString());
1741            }
1742            return location;
1743        }
1744        return metadata.producerSequenceIdTrackerLocation;
1745    }
1746
1747    private Location checkpointAckMessageFileMap() throws IOException {
1748        ByteArrayOutputStream baos = new ByteArrayOutputStream();
1749        ObjectOutputStream oout = new ObjectOutputStream(baos);
1750        oout.writeObject(metadata.ackMessageFileMap);
1751        oout.flush();
1752        oout.close();
1753        // using completion callback allows a disk sync to be avoided when enableJournalDiskSyncs = false
1754        Location location = store(new KahaAckMessageFileMapCommand().setAckMessageFileMap(new Buffer(baos.toByteArray())), nullCompletionCallback);
1755        try {
1756            location.getLatch().await();
1757        } catch (InterruptedException e) {
1758            throw new InterruptedIOException(e.toString());
1759        }
1760        return location;
1761    }
1762
1763    private Location checkpointSubscriptionCommand(KahaSubscriptionCommand subscription) throws IOException {
1764
1765        ByteSequence sequence = toByteSequence(subscription);
1766        Location location = journal.write(sequence, nullCompletionCallback) ;
1767
1768        try {
1769            location.getLatch().await();
1770        } catch (InterruptedException e) {
1771            throw new InterruptedIOException(e.toString());
1772        }
1773        return location;
1774    }
1775
1776    public HashSet<Integer> getJournalFilesBeingReplicated() {
1777        return journalFilesBeingReplicated;
1778    }
1779
1780    // /////////////////////////////////////////////////////////////////
1781    // StoredDestination related implementation methods.
1782    // /////////////////////////////////////////////////////////////////
1783
1784    protected final HashMap<String, StoredDestination> storedDestinations = new HashMap<String, StoredDestination>();
1785
1786    static class MessageKeys {
1787        final String messageId;
1788        final Location location;
1789
1790        public MessageKeys(String messageId, Location location) {
1791            this.messageId=messageId;
1792            this.location=location;
1793        }
1794
1795        @Override
1796        public String toString() {
1797            return "["+messageId+","+location+"]";
1798        }
1799    }
1800
1801    static protected class MessageKeysMarshaller extends VariableMarshaller<MessageKeys> {
1802        static final MessageKeysMarshaller INSTANCE = new MessageKeysMarshaller();
1803
1804        @Override
1805        public MessageKeys readPayload(DataInput dataIn) throws IOException {
1806            return new MessageKeys(dataIn.readUTF(), LocationMarshaller.INSTANCE.readPayload(dataIn));
1807        }
1808
1809        @Override
1810        public void writePayload(MessageKeys object, DataOutput dataOut) throws IOException {
1811            dataOut.writeUTF(object.messageId);
1812            LocationMarshaller.INSTANCE.writePayload(object.location, dataOut);
1813        }
1814    }
1815
1816    class LastAck {
1817        long lastAckedSequence;
1818        byte priority;
1819
1820        public LastAck(LastAck source) {
1821            this.lastAckedSequence = source.lastAckedSequence;
1822            this.priority = source.priority;
1823        }
1824
1825        public LastAck() {
1826            this.priority = MessageOrderIndex.HI;
1827        }
1828
1829        public LastAck(long ackLocation) {
1830            this.lastAckedSequence = ackLocation;
1831            this.priority = MessageOrderIndex.LO;
1832        }
1833
1834        public LastAck(long ackLocation, byte priority) {
1835            this.lastAckedSequence = ackLocation;
1836            this.priority = priority;
1837        }
1838
1839        @Override
1840        public String toString() {
1841            return "[" + lastAckedSequence + ":" + priority + "]";
1842        }
1843    }
1844
1845    protected class LastAckMarshaller implements Marshaller<LastAck> {
1846
1847        @Override
1848        public void writePayload(LastAck object, DataOutput dataOut) throws IOException {
1849            dataOut.writeLong(object.lastAckedSequence);
1850            dataOut.writeByte(object.priority);
1851        }
1852
1853        @Override
1854        public LastAck readPayload(DataInput dataIn) throws IOException {
1855            LastAck lastAcked = new LastAck();
1856            lastAcked.lastAckedSequence = dataIn.readLong();
1857            if (metadata.version >= 3) {
1858                lastAcked.priority = dataIn.readByte();
1859            }
1860            return lastAcked;
1861        }
1862
1863        @Override
1864        public int getFixedSize() {
1865            return 9;
1866        }
1867
1868        @Override
1869        public LastAck deepCopy(LastAck source) {
1870            return new LastAck(source);
1871        }
1872
1873        @Override
1874        public boolean isDeepCopySupported() {
1875            return true;
1876        }
1877    }
1878
1879    class StoredDestination {
1880
1881        MessageOrderIndex orderIndex = new MessageOrderIndex();
1882        BTreeIndex<Location, Long> locationIndex;
1883        BTreeIndex<String, Long> messageIdIndex;
1884
1885        // These bits are only set for Topics
1886        BTreeIndex<String, KahaSubscriptionCommand> subscriptions;
1887        BTreeIndex<String, LastAck> subscriptionAcks;
1888        HashMap<String, MessageOrderCursor> subscriptionCursors;
1889        ListIndex<String, SequenceSet> ackPositions;
1890        ListIndex<String, Location> subLocations;
1891
1892        // Transient data used to track which Messages are no longer needed.
1893        final TreeMap<Long, Long> messageReferences = new TreeMap<Long, Long>();
1894        final HashSet<String> subscriptionCache = new LinkedHashSet<String>();
1895
1896        public void trackPendingAdd(Long seq) {
1897            orderIndex.trackPendingAdd(seq);
1898        }
1899
1900        public void trackPendingAddComplete(Long seq) {
1901            orderIndex.trackPendingAddComplete(seq);
1902        }
1903
1904        @Override
1905        public String toString() {
1906            return "nextSeq:" + orderIndex.nextMessageId + ",lastRet:" + orderIndex.cursor + ",pending:" + orderIndex.pendingAdditions.size();
1907        }
1908    }
1909
1910    protected class StoredDestinationMarshaller extends VariableMarshaller<StoredDestination> {
1911
1912        @Override
1913        public StoredDestination readPayload(final DataInput dataIn) throws IOException {
1914            final StoredDestination value = new StoredDestination();
1915            value.orderIndex.defaultPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
1916            value.locationIndex = new BTreeIndex<Location, Long>(pageFile, dataIn.readLong());
1917            value.messageIdIndex = new BTreeIndex<String, Long>(pageFile, dataIn.readLong());
1918
1919            if (dataIn.readBoolean()) {
1920                value.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, dataIn.readLong());
1921                value.subscriptionAcks = new BTreeIndex<String, LastAck>(pageFile, dataIn.readLong());
1922                if (metadata.version >= 4) {
1923                    value.ackPositions = new ListIndex<String, SequenceSet>(pageFile, dataIn.readLong());
1924                } else {
1925                    // upgrade
1926                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
1927                        @Override
1928                        public void execute(Transaction tx) throws IOException {
1929                            LinkedHashMap<String, SequenceSet> temp = new LinkedHashMap<String, SequenceSet>();
1930
1931                            if (metadata.version >= 3) {
1932                                // migrate
1933                                BTreeIndex<Long, HashSet<String>> oldAckPositions =
1934                                        new BTreeIndex<Long, HashSet<String>>(pageFile, dataIn.readLong());
1935                                oldAckPositions.setKeyMarshaller(LongMarshaller.INSTANCE);
1936                                oldAckPositions.setValueMarshaller(HashSetStringMarshaller.INSTANCE);
1937                                oldAckPositions.load(tx);
1938
1939
1940                                // Do the initial build of the data in memory before writing into the store
1941                                // based Ack Positions List to avoid a lot of disk thrashing.
1942                                Iterator<Entry<Long, HashSet<String>>> iterator = oldAckPositions.iterator(tx);
1943                                while (iterator.hasNext()) {
1944                                    Entry<Long, HashSet<String>> entry = iterator.next();
1945
1946                                    for(String subKey : entry.getValue()) {
1947                                        SequenceSet pendingAcks = temp.get(subKey);
1948                                        if (pendingAcks == null) {
1949                                            pendingAcks = new SequenceSet();
1950                                            temp.put(subKey, pendingAcks);
1951                                        }
1952
1953                                        pendingAcks.add(entry.getKey());
1954                                    }
1955                                }
1956                            }
1957                            // Now move the pending messages to ack data into the store backed
1958                            // structure.
1959                            value.ackPositions = new ListIndex<String, SequenceSet>(pageFile, tx.allocate());
1960                            value.ackPositions.setKeyMarshaller(StringMarshaller.INSTANCE);
1961                            value.ackPositions.setValueMarshaller(SequenceSet.Marshaller.INSTANCE);
1962                            value.ackPositions.load(tx);
1963                            for(String subscriptionKey : temp.keySet()) {
1964                                value.ackPositions.put(tx, subscriptionKey, temp.get(subscriptionKey));
1965                            }
1966
1967                        }
1968                    });
1969                }
1970
1971                if (metadata.version >= 5) {
1972                    value.subLocations = new ListIndex<String, Location>(pageFile, dataIn.readLong());
1973                } else {
1974                    // upgrade
1975                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
1976                        @Override
1977                        public void execute(Transaction tx) throws IOException {
1978                            value.subLocations = new ListIndex<String, Location>(pageFile, tx.allocate());
1979                            value.subLocations.setKeyMarshaller(StringMarshaller.INSTANCE);
1980                            value.subLocations.setValueMarshaller(LocationMarshaller.INSTANCE);
1981                            value.subLocations.load(tx);
1982                        }
1983                    });
1984                }
1985            }
1986            if (metadata.version >= 2) {
1987                value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
1988                value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
1989            } else {
1990                // upgrade
1991                pageFile.tx().execute(new Transaction.Closure<IOException>() {
1992                    @Override
1993                    public void execute(Transaction tx) throws IOException {
1994                        value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
1995                        value.orderIndex.lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
1996                        value.orderIndex.lowPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
1997                        value.orderIndex.lowPriorityIndex.load(tx);
1998
1999                        value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
2000                        value.orderIndex.highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
2001                        value.orderIndex.highPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
2002                        value.orderIndex.highPriorityIndex.load(tx);
2003                    }
2004                });
2005            }
2006
2007            return value;
2008        }
2009
2010        @Override
2011        public void writePayload(StoredDestination value, DataOutput dataOut) throws IOException {
2012            dataOut.writeLong(value.orderIndex.defaultPriorityIndex.getPageId());
2013            dataOut.writeLong(value.locationIndex.getPageId());
2014            dataOut.writeLong(value.messageIdIndex.getPageId());
2015            if (value.subscriptions != null) {
2016                dataOut.writeBoolean(true);
2017                dataOut.writeLong(value.subscriptions.getPageId());
2018                dataOut.writeLong(value.subscriptionAcks.getPageId());
2019                dataOut.writeLong(value.ackPositions.getHeadPageId());
2020                dataOut.writeLong(value.subLocations.getHeadPageId());
2021            } else {
2022                dataOut.writeBoolean(false);
2023            }
2024            dataOut.writeLong(value.orderIndex.lowPriorityIndex.getPageId());
2025            dataOut.writeLong(value.orderIndex.highPriorityIndex.getPageId());
2026        }
2027    }
2028
2029    static class KahaSubscriptionCommandMarshaller extends VariableMarshaller<KahaSubscriptionCommand> {
2030        final static KahaSubscriptionCommandMarshaller INSTANCE = new KahaSubscriptionCommandMarshaller();
2031
2032        @Override
2033        public KahaSubscriptionCommand readPayload(DataInput dataIn) throws IOException {
2034            KahaSubscriptionCommand rc = new KahaSubscriptionCommand();
2035            rc.mergeFramed((InputStream)dataIn);
2036            return rc;
2037        }
2038
2039        @Override
2040        public void writePayload(KahaSubscriptionCommand object, DataOutput dataOut) throws IOException {
2041            object.writeFramed((OutputStream)dataOut);
2042        }
2043    }
2044
2045    protected StoredDestination getStoredDestination(KahaDestination destination, Transaction tx) throws IOException {
2046        String key = key(destination);
2047        StoredDestination rc = storedDestinations.get(key);
2048        if (rc == null) {
2049            boolean topic = destination.getType() == KahaDestination.DestinationType.TOPIC || destination.getType() == KahaDestination.DestinationType.TEMP_TOPIC;
2050            rc = loadStoredDestination(tx, key, topic);
2051            // Cache it. We may want to remove/unload destinations from the
2052            // cache that are not used for a while
2053            // to reduce memory usage.
2054            storedDestinations.put(key, rc);
2055        }
2056        return rc;
2057    }
2058
2059    protected StoredDestination getExistingStoredDestination(KahaDestination destination, Transaction tx) throws IOException {
2060        String key = key(destination);
2061        StoredDestination rc = storedDestinations.get(key);
2062        if (rc == null && metadata.destinations.containsKey(tx, key)) {
2063            rc = getStoredDestination(destination, tx);
2064        }
2065        return rc;
2066    }
2067
2068    /**
2069     * @param tx
2070     * @param key
2071     * @param topic
2072     * @return
2073     * @throws IOException
2074     */
2075    private StoredDestination loadStoredDestination(Transaction tx, String key, boolean topic) throws IOException {
2076        // Try to load the existing indexes..
2077        StoredDestination rc = metadata.destinations.get(tx, key);
2078        if (rc == null) {
2079            // Brand new destination.. allocate indexes for it.
2080            rc = new StoredDestination();
2081            rc.orderIndex.allocate(tx);
2082            rc.locationIndex = new BTreeIndex<Location, Long>(pageFile, tx.allocate());
2083            rc.messageIdIndex = new BTreeIndex<String, Long>(pageFile, tx.allocate());
2084
2085            if (topic) {
2086                rc.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, tx.allocate());
2087                rc.subscriptionAcks = new BTreeIndex<String, LastAck>(pageFile, tx.allocate());
2088                rc.ackPositions = new ListIndex<String, SequenceSet>(pageFile, tx.allocate());
2089                rc.subLocations = new ListIndex<String, Location>(pageFile, tx.allocate());
2090            }
2091            metadata.destinations.put(tx, key, rc);
2092        }
2093
2094        // Configure the marshalers and load.
2095        rc.orderIndex.load(tx);
2096
2097        // Figure out the next key using the last entry in the destination.
2098        rc.orderIndex.configureLast(tx);
2099
2100        rc.locationIndex.setKeyMarshaller(org.apache.activemq.store.kahadb.disk.util.LocationMarshaller.INSTANCE);
2101        rc.locationIndex.setValueMarshaller(LongMarshaller.INSTANCE);
2102        rc.locationIndex.load(tx);
2103
2104        rc.messageIdIndex.setKeyMarshaller(StringMarshaller.INSTANCE);
2105        rc.messageIdIndex.setValueMarshaller(LongMarshaller.INSTANCE);
2106        rc.messageIdIndex.load(tx);
2107
2108        // If it was a topic...
2109        if (topic) {
2110
2111            rc.subscriptions.setKeyMarshaller(StringMarshaller.INSTANCE);
2112            rc.subscriptions.setValueMarshaller(KahaSubscriptionCommandMarshaller.INSTANCE);
2113            rc.subscriptions.load(tx);
2114
2115            rc.subscriptionAcks.setKeyMarshaller(StringMarshaller.INSTANCE);
2116            rc.subscriptionAcks.setValueMarshaller(new LastAckMarshaller());
2117            rc.subscriptionAcks.load(tx);
2118
2119            rc.ackPositions.setKeyMarshaller(StringMarshaller.INSTANCE);
2120            rc.ackPositions.setValueMarshaller(SequenceSet.Marshaller.INSTANCE);
2121            rc.ackPositions.load(tx);
2122
2123            rc.subLocations.setKeyMarshaller(StringMarshaller.INSTANCE);
2124            rc.subLocations.setValueMarshaller(LocationMarshaller.INSTANCE);
2125            rc.subLocations.load(tx);
2126
2127            rc.subscriptionCursors = new HashMap<String, MessageOrderCursor>();
2128
2129            if (metadata.version < 3) {
2130
2131                // on upgrade need to fill ackLocation with available messages past last ack
2132                for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) {
2133                    Entry<String, LastAck> entry = iterator.next();
2134                    for (Iterator<Entry<Long, MessageKeys>> orderIterator =
2135                            rc.orderIndex.iterator(tx, new MessageOrderCursor(entry.getValue().lastAckedSequence)); orderIterator.hasNext(); ) {
2136                        Long sequence = orderIterator.next().getKey();
2137                        addAckLocation(tx, rc, sequence, entry.getKey());
2138                    }
2139                    // modify so it is upgraded
2140                    rc.subscriptionAcks.put(tx, entry.getKey(), entry.getValue());
2141                }
2142            }
2143
2144            // Configure the message references index
2145            Iterator<Entry<String, SequenceSet>> subscriptions = rc.ackPositions.iterator(tx);
2146            while (subscriptions.hasNext()) {
2147                Entry<String, SequenceSet> subscription = subscriptions.next();
2148                SequenceSet pendingAcks = subscription.getValue();
2149                if (pendingAcks != null && !pendingAcks.isEmpty()) {
2150                    Long lastPendingAck = pendingAcks.getTail().getLast();
2151                    for(Long sequenceId : pendingAcks) {
2152                        Long current = rc.messageReferences.get(sequenceId);
2153                        if (current == null) {
2154                            current = new Long(0);
2155                        }
2156
2157                        // We always add a trailing empty entry for the next position to start from
2158                        // so we need to ensure we don't count that as a message reference on reload.
2159                        if (!sequenceId.equals(lastPendingAck)) {
2160                            current = current.longValue() + 1;
2161                        }
2162
2163                        rc.messageReferences.put(sequenceId, current);
2164                    }
2165                }
2166            }
2167
2168            // Configure the subscription cache
2169            for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) {
2170                Entry<String, LastAck> entry = iterator.next();
2171                rc.subscriptionCache.add(entry.getKey());
2172            }
2173
2174            if (rc.orderIndex.nextMessageId == 0) {
2175                // check for existing durable sub all acked out - pull next seq from acks as messages are gone
2176                if (!rc.subscriptionAcks.isEmpty(tx)) {
2177                    for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext();) {
2178                        Entry<String, LastAck> entry = iterator.next();
2179                        rc.orderIndex.nextMessageId =
2180                                Math.max(rc.orderIndex.nextMessageId, entry.getValue().lastAckedSequence +1);
2181                    }
2182                }
2183            } else {
2184                // update based on ackPositions for unmatched, last entry is always the next
2185                if (!rc.messageReferences.isEmpty()) {
2186                    Long nextMessageId = (Long) rc.messageReferences.keySet().toArray()[rc.messageReferences.size() - 1];
2187                    rc.orderIndex.nextMessageId =
2188                            Math.max(rc.orderIndex.nextMessageId, nextMessageId);
2189                }
2190            }
2191        }
2192
2193        if (metadata.version < VERSION) {
2194            // store again after upgrade
2195            metadata.destinations.put(tx, key, rc);
2196        }
2197        return rc;
2198    }
2199
2200    private void addAckLocation(Transaction tx, StoredDestination sd, Long messageSequence, String subscriptionKey) throws IOException {
2201        SequenceSet sequences = sd.ackPositions.get(tx, subscriptionKey);
2202        if (sequences == null) {
2203            sequences = new SequenceSet();
2204            sequences.add(messageSequence);
2205            sd.ackPositions.add(tx, subscriptionKey, sequences);
2206        } else {
2207            sequences.add(messageSequence);
2208            sd.ackPositions.put(tx, subscriptionKey, sequences);
2209        }
2210
2211        Long count = sd.messageReferences.get(messageSequence);
2212        if (count == null) {
2213            count = Long.valueOf(0L);
2214        }
2215        count = count.longValue() + 1;
2216        sd.messageReferences.put(messageSequence, count);
2217    }
2218
2219    // new sub is interested in potentially all existing messages
2220    private void addAckLocationForRetroactiveSub(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
2221        SequenceSet allOutstanding = new SequenceSet();
2222        Iterator<Map.Entry<String, SequenceSet>> iterator = sd.ackPositions.iterator(tx);
2223        while (iterator.hasNext()) {
2224            SequenceSet set = iterator.next().getValue();
2225            for (Long entry : set) {
2226                allOutstanding.add(entry);
2227            }
2228        }
2229        sd.ackPositions.put(tx, subscriptionKey, allOutstanding);
2230
2231        for (Long ackPosition : allOutstanding) {
2232            Long count = sd.messageReferences.get(ackPosition);
2233            count = count.longValue() + 1;
2234            sd.messageReferences.put(ackPosition, count);
2235        }
2236    }
2237
2238    // on a new message add, all existing subs are interested in this message
2239    private void addAckLocationForNewMessage(Transaction tx, StoredDestination sd, Long messageSequence) throws IOException {
2240        for(String subscriptionKey : sd.subscriptionCache) {
2241            SequenceSet sequences = sd.ackPositions.get(tx, subscriptionKey);
2242            if (sequences == null) {
2243                sequences = new SequenceSet();
2244                sequences.add(new Sequence(messageSequence, messageSequence + 1));
2245                sd.ackPositions.add(tx, subscriptionKey, sequences);
2246            } else {
2247                sequences.add(new Sequence(messageSequence, messageSequence + 1));
2248                sd.ackPositions.put(tx, subscriptionKey, sequences);
2249            }
2250
2251            Long count = sd.messageReferences.get(messageSequence);
2252            if (count == null) {
2253                count = Long.valueOf(0L);
2254            }
2255            count = count.longValue() + 1;
2256            sd.messageReferences.put(messageSequence, count);
2257            sd.messageReferences.put(messageSequence+1, Long.valueOf(0L));
2258        }
2259    }
2260
2261    private void removeAckLocationsForSub(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
2262        if (!sd.ackPositions.isEmpty(tx)) {
2263            SequenceSet sequences = sd.ackPositions.remove(tx, subscriptionKey);
2264            if (sequences == null || sequences.isEmpty()) {
2265                return;
2266            }
2267
2268            ArrayList<Long> unreferenced = new ArrayList<Long>();
2269
2270            for(Long sequenceId : sequences) {
2271                Long references = sd.messageReferences.get(sequenceId);
2272                if (references != null) {
2273                    references = references.longValue() - 1;
2274
2275                    if (references.longValue() > 0) {
2276                        sd.messageReferences.put(sequenceId, references);
2277                    } else {
2278                        sd.messageReferences.remove(sequenceId);
2279                        unreferenced.add(sequenceId);
2280                    }
2281                }
2282            }
2283
2284            for(Long sequenceId : unreferenced) {
2285                // Find all the entries that need to get deleted.
2286                ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long, MessageKeys>>();
2287                sd.orderIndex.getDeleteList(tx, deletes, sequenceId);
2288
2289                // Do the actual deletes.
2290                for (Entry<Long, MessageKeys> entry : deletes) {
2291                    sd.locationIndex.remove(tx, entry.getValue().location);
2292                    sd.messageIdIndex.remove(tx, entry.getValue().messageId);
2293                    sd.orderIndex.remove(tx, entry.getKey());
2294                }
2295            }
2296        }
2297    }
2298
2299    /**
2300     * @param tx
2301     * @param sd
2302     * @param subscriptionKey
2303     * @param messageSequence
2304     * @throws IOException
2305     */
2306    private void removeAckLocation(Transaction tx, StoredDestination sd, String subscriptionKey, Long messageSequence) throws IOException {
2307        // Remove the sub from the previous location set..
2308        if (messageSequence != null) {
2309            SequenceSet range = sd.ackPositions.get(tx, subscriptionKey);
2310            if (range != null && !range.isEmpty()) {
2311                range.remove(messageSequence);
2312                if (!range.isEmpty()) {
2313                    sd.ackPositions.put(tx, subscriptionKey, range);
2314                } else {
2315                    sd.ackPositions.remove(tx, subscriptionKey);
2316                }
2317
2318                // Check if the message is reference by any other subscription.
2319                Long count = sd.messageReferences.get(messageSequence);
2320                if (count != null){
2321                long references = count.longValue() - 1;
2322                    if (references > 0) {
2323                        sd.messageReferences.put(messageSequence, Long.valueOf(references));
2324                        return;
2325                    } else {
2326                        sd.messageReferences.remove(messageSequence);
2327                    }
2328                }
2329
2330                // Find all the entries that need to get deleted.
2331                ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long, MessageKeys>>();
2332                sd.orderIndex.getDeleteList(tx, deletes, messageSequence);
2333
2334                // Do the actual deletes.
2335                for (Entry<Long, MessageKeys> entry : deletes) {
2336                    sd.locationIndex.remove(tx, entry.getValue().location);
2337                    sd.messageIdIndex.remove(tx, entry.getValue().messageId);
2338                    sd.orderIndex.remove(tx, entry.getKey());
2339                }
2340            }
2341        }
2342    }
2343
2344    public LastAck getLastAck(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
2345        return sd.subscriptionAcks.get(tx, subscriptionKey);
2346    }
2347
2348    public long getStoredMessageCount(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
2349        SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey);
2350        if (messageSequences != null) {
2351            long result = messageSequences.rangeSize();
2352            // if there's anything in the range the last value is always the nextMessage marker, so remove 1.
2353            return result > 0 ? result - 1 : 0;
2354        }
2355
2356        return 0;
2357    }
2358
2359    protected String key(KahaDestination destination) {
2360        return destination.getType().getNumber() + ":" + destination.getName();
2361    }
2362
2363    // /////////////////////////////////////////////////////////////////
2364    // Transaction related implementation methods.
2365    // /////////////////////////////////////////////////////////////////
2366    @SuppressWarnings("rawtypes")
2367    private final LinkedHashMap<TransactionId, List<Operation>> inflightTransactions = new LinkedHashMap<TransactionId, List<Operation>>();
2368    @SuppressWarnings("rawtypes")
2369    protected final LinkedHashMap<TransactionId, List<Operation>> preparedTransactions = new LinkedHashMap<TransactionId, List<Operation>>();
2370    protected final Set<String> ackedAndPrepared = new HashSet<String>();
2371    protected final Set<String> rolledBackAcks = new HashSet<String>();
2372
2373    // messages that have prepared (pending) acks cannot be re-dispatched unless the outcome is rollback,
2374    // till then they are skipped by the store.
2375    // 'at most once' XA guarantee
2376    public void trackRecoveredAcks(ArrayList<MessageAck> acks) {
2377        this.indexLock.writeLock().lock();
2378        try {
2379            for (MessageAck ack : acks) {
2380                ackedAndPrepared.add(ack.getLastMessageId().toProducerKey());
2381            }
2382        } finally {
2383            this.indexLock.writeLock().unlock();
2384        }
2385    }
2386
2387    public void forgetRecoveredAcks(ArrayList<MessageAck> acks, boolean rollback) throws IOException {
2388        if (acks != null) {
2389            this.indexLock.writeLock().lock();
2390            try {
2391                for (MessageAck ack : acks) {
2392                    final String id = ack.getLastMessageId().toProducerKey();
2393                    ackedAndPrepared.remove(id);
2394                    if (rollback) {
2395                        rolledBackAcks.add(id);
2396                    }
2397                }
2398            } finally {
2399                this.indexLock.writeLock().unlock();
2400            }
2401        }
2402    }
2403
2404    @SuppressWarnings("rawtypes")
2405    private List<Operation> getInflightTx(KahaTransactionInfo info) {
2406        TransactionId key = TransactionIdConversion.convert(info);
2407        List<Operation> tx;
2408        synchronized (inflightTransactions) {
2409            tx = inflightTransactions.get(key);
2410            if (tx == null) {
2411                tx = Collections.synchronizedList(new ArrayList<Operation>());
2412                inflightTransactions.put(key, tx);
2413            }
2414        }
2415        return tx;
2416    }
2417
2418    @SuppressWarnings("unused")
2419    private TransactionId key(KahaTransactionInfo transactionInfo) {
2420        return TransactionIdConversion.convert(transactionInfo);
2421    }
2422
2423    abstract class Operation <T extends JournalCommand<T>> {
2424        final T command;
2425        final Location location;
2426
2427        public Operation(T command, Location location) {
2428            this.command = command;
2429            this.location = location;
2430        }
2431
2432        public Location getLocation() {
2433            return location;
2434        }
2435
2436        public T getCommand() {
2437            return command;
2438        }
2439
2440        abstract public void execute(Transaction tx) throws IOException;
2441    }
2442
2443    class AddOperation extends Operation<KahaAddMessageCommand> {
2444        final IndexAware runWithIndexLock;
2445        public AddOperation(KahaAddMessageCommand command, Location location, IndexAware runWithIndexLock) {
2446            super(command, location);
2447            this.runWithIndexLock = runWithIndexLock;
2448        }
2449
2450        @Override
2451        public void execute(Transaction tx) throws IOException {
2452            long seq = updateIndex(tx, command, location);
2453            if (runWithIndexLock != null) {
2454                runWithIndexLock.sequenceAssignedWithIndexLocked(seq);
2455            }
2456        }
2457
2458    }
2459
2460    class RemoveOperation extends Operation<KahaRemoveMessageCommand> {
2461
2462        public RemoveOperation(KahaRemoveMessageCommand command, Location location) {
2463            super(command, location);
2464        }
2465
2466        @Override
2467        public void execute(Transaction tx) throws IOException {
2468            updateIndex(tx, command, location);
2469        }
2470    }
2471
2472    // /////////////////////////////////////////////////////////////////
2473    // Initialization related implementation methods.
2474    // /////////////////////////////////////////////////////////////////
2475
2476    private PageFile createPageFile() throws IOException {
2477        if( indexDirectory == null ) {
2478            indexDirectory = directory;
2479        }
2480        IOHelper.mkdirs(indexDirectory);
2481        PageFile index = new PageFile(indexDirectory, "db");
2482        index.setEnableWriteThread(isEnableIndexWriteAsync());
2483        index.setWriteBatchSize(getIndexWriteBatchSize());
2484        index.setPageCacheSize(indexCacheSize);
2485        index.setUseLFRUEviction(isUseIndexLFRUEviction());
2486        index.setLFUEvictionFactor(getIndexLFUEvictionFactor());
2487        index.setEnableDiskSyncs(isEnableIndexDiskSyncs());
2488        index.setEnableRecoveryFile(isEnableIndexRecoveryFile());
2489        index.setEnablePageCaching(isEnableIndexPageCaching());
2490        return index;
2491    }
2492
2493    private Journal createJournal() throws IOException {
2494        Journal manager = new Journal();
2495        manager.setDirectory(directory);
2496        manager.setMaxFileLength(getJournalMaxFileLength());
2497        manager.setCheckForCorruptionOnStartup(checkForCorruptJournalFiles);
2498        manager.setChecksum(checksumJournalFiles || checkForCorruptJournalFiles);
2499        manager.setWriteBatchSize(getJournalMaxWriteBatchSize());
2500        manager.setArchiveDataLogs(isArchiveDataLogs());
2501        manager.setSizeAccumulator(journalSize);
2502        manager.setEnableAsyncDiskSync(isEnableJournalDiskSyncs());
2503        manager.setPreallocationScope(Journal.PreallocationScope.valueOf(preallocationScope.trim().toUpperCase()));
2504        manager.setPreallocationStrategy(
2505                Journal.PreallocationStrategy.valueOf(preallocationStrategy.trim().toUpperCase()));
2506        if (getDirectoryArchive() != null) {
2507            IOHelper.mkdirs(getDirectoryArchive());
2508            manager.setDirectoryArchive(getDirectoryArchive());
2509        }
2510        return manager;
2511    }
2512
2513    private Metadata createMetadata() {
2514        Metadata md = new Metadata();
2515        md.producerSequenceIdTracker.setAuditDepth(getFailoverProducersAuditDepth());
2516        md.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(getMaxFailoverProducersToTrack());
2517        return md;
2518    }
2519
2520    public int getJournalMaxWriteBatchSize() {
2521        return journalMaxWriteBatchSize;
2522    }
2523
2524    public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
2525        this.journalMaxWriteBatchSize = journalMaxWriteBatchSize;
2526    }
2527
2528    public File getDirectory() {
2529        return directory;
2530    }
2531
2532    public void setDirectory(File directory) {
2533        this.directory = directory;
2534    }
2535
2536    public boolean isDeleteAllMessages() {
2537        return deleteAllMessages;
2538    }
2539
2540    public void setDeleteAllMessages(boolean deleteAllMessages) {
2541        this.deleteAllMessages = deleteAllMessages;
2542    }
2543
2544    public void setIndexWriteBatchSize(int setIndexWriteBatchSize) {
2545        this.setIndexWriteBatchSize = setIndexWriteBatchSize;
2546    }
2547
2548    public int getIndexWriteBatchSize() {
2549        return setIndexWriteBatchSize;
2550    }
2551
2552    public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
2553        this.enableIndexWriteAsync = enableIndexWriteAsync;
2554    }
2555
2556    boolean isEnableIndexWriteAsync() {
2557        return enableIndexWriteAsync;
2558    }
2559
2560    public boolean isEnableJournalDiskSyncs() {
2561        return enableJournalDiskSyncs;
2562    }
2563
2564    public void setEnableJournalDiskSyncs(boolean syncWrites) {
2565        this.enableJournalDiskSyncs = syncWrites;
2566    }
2567
2568    public long getCheckpointInterval() {
2569        return checkpointInterval;
2570    }
2571
2572    public void setCheckpointInterval(long checkpointInterval) {
2573        this.checkpointInterval = checkpointInterval;
2574    }
2575
2576    public long getCleanupInterval() {
2577        return cleanupInterval;
2578    }
2579
2580    public void setCleanupInterval(long cleanupInterval) {
2581        this.cleanupInterval = cleanupInterval;
2582    }
2583
2584    public void setJournalMaxFileLength(int journalMaxFileLength) {
2585        this.journalMaxFileLength = journalMaxFileLength;
2586    }
2587
2588    public int getJournalMaxFileLength() {
2589        return journalMaxFileLength;
2590    }
2591
2592    public void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack) {
2593        this.metadata.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxFailoverProducersToTrack);
2594    }
2595
2596    public int getMaxFailoverProducersToTrack() {
2597        return this.metadata.producerSequenceIdTracker.getMaximumNumberOfProducersToTrack();
2598    }
2599
2600    public void setFailoverProducersAuditDepth(int failoverProducersAuditDepth) {
2601        this.metadata.producerSequenceIdTracker.setAuditDepth(failoverProducersAuditDepth);
2602    }
2603
2604    public int getFailoverProducersAuditDepth() {
2605        return this.metadata.producerSequenceIdTracker.getAuditDepth();
2606    }
2607
2608    public PageFile getPageFile() throws IOException {
2609        if (pageFile == null) {
2610            pageFile = createPageFile();
2611        }
2612        return pageFile;
2613    }
2614
2615    public Journal getJournal() throws IOException {
2616        if (journal == null) {
2617            journal = createJournal();
2618        }
2619        return journal;
2620    }
2621
2622    public boolean isFailIfDatabaseIsLocked() {
2623        return failIfDatabaseIsLocked;
2624    }
2625
2626    public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) {
2627        this.failIfDatabaseIsLocked = failIfDatabaseIsLocked;
2628    }
2629
2630    public boolean isIgnoreMissingJournalfiles() {
2631        return ignoreMissingJournalfiles;
2632    }
2633
2634    public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) {
2635        this.ignoreMissingJournalfiles = ignoreMissingJournalfiles;
2636    }
2637
2638    public int getIndexCacheSize() {
2639        return indexCacheSize;
2640    }
2641
2642    public void setIndexCacheSize(int indexCacheSize) {
2643        this.indexCacheSize = indexCacheSize;
2644    }
2645
2646    public boolean isCheckForCorruptJournalFiles() {
2647        return checkForCorruptJournalFiles;
2648    }
2649
2650    public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) {
2651        this.checkForCorruptJournalFiles = checkForCorruptJournalFiles;
2652    }
2653
2654    public boolean isChecksumJournalFiles() {
2655        return checksumJournalFiles;
2656    }
2657
2658    public void setChecksumJournalFiles(boolean checksumJournalFiles) {
2659        this.checksumJournalFiles = checksumJournalFiles;
2660    }
2661
2662    @Override
2663    public void setBrokerService(BrokerService brokerService) {
2664        this.brokerService = brokerService;
2665    }
2666
2667    /**
2668     * @return the archiveDataLogs
2669     */
2670    public boolean isArchiveDataLogs() {
2671        return this.archiveDataLogs;
2672    }
2673
2674    /**
2675     * @param archiveDataLogs the archiveDataLogs to set
2676     */
2677    public void setArchiveDataLogs(boolean archiveDataLogs) {
2678        this.archiveDataLogs = archiveDataLogs;
2679    }
2680
2681    /**
2682     * @return the directoryArchive
2683     */
2684    public File getDirectoryArchive() {
2685        return this.directoryArchive;
2686    }
2687
2688    /**
2689     * @param directoryArchive the directoryArchive to set
2690     */
2691    public void setDirectoryArchive(File directoryArchive) {
2692        this.directoryArchive = directoryArchive;
2693    }
2694
2695    public boolean isArchiveCorruptedIndex() {
2696        return archiveCorruptedIndex;
2697    }
2698
2699    public void setArchiveCorruptedIndex(boolean archiveCorruptedIndex) {
2700        this.archiveCorruptedIndex = archiveCorruptedIndex;
2701    }
2702
2703    public float getIndexLFUEvictionFactor() {
2704        return indexLFUEvictionFactor;
2705    }
2706
2707    public void setIndexLFUEvictionFactor(float indexLFUEvictionFactor) {
2708        this.indexLFUEvictionFactor = indexLFUEvictionFactor;
2709    }
2710
2711    public boolean isUseIndexLFRUEviction() {
2712        return useIndexLFRUEviction;
2713    }
2714
2715    public void setUseIndexLFRUEviction(boolean useIndexLFRUEviction) {
2716        this.useIndexLFRUEviction = useIndexLFRUEviction;
2717    }
2718
2719    public void setEnableIndexDiskSyncs(boolean enableIndexDiskSyncs) {
2720        this.enableIndexDiskSyncs = enableIndexDiskSyncs;
2721    }
2722
2723    public void setEnableIndexRecoveryFile(boolean enableIndexRecoveryFile) {
2724        this.enableIndexRecoveryFile = enableIndexRecoveryFile;
2725    }
2726
2727    public void setEnableIndexPageCaching(boolean enableIndexPageCaching) {
2728        this.enableIndexPageCaching = enableIndexPageCaching;
2729    }
2730
2731    public boolean isEnableIndexDiskSyncs() {
2732        return enableIndexDiskSyncs;
2733    }
2734
2735    public boolean isEnableIndexRecoveryFile() {
2736        return enableIndexRecoveryFile;
2737    }
2738
2739    public boolean isEnableIndexPageCaching() {
2740        return enableIndexPageCaching;
2741    }
2742
2743    // /////////////////////////////////////////////////////////////////
2744    // Internal conversion methods.
2745    // /////////////////////////////////////////////////////////////////
2746
2747    class MessageOrderCursor{
2748        long defaultCursorPosition;
2749        long lowPriorityCursorPosition;
2750        long highPriorityCursorPosition;
2751        MessageOrderCursor(){
2752        }
2753
2754        MessageOrderCursor(long position){
2755            this.defaultCursorPosition=position;
2756            this.lowPriorityCursorPosition=position;
2757            this.highPriorityCursorPosition=position;
2758        }
2759
2760        MessageOrderCursor(MessageOrderCursor other){
2761            this.defaultCursorPosition=other.defaultCursorPosition;
2762            this.lowPriorityCursorPosition=other.lowPriorityCursorPosition;
2763            this.highPriorityCursorPosition=other.highPriorityCursorPosition;
2764        }
2765
2766        MessageOrderCursor copy() {
2767            return new MessageOrderCursor(this);
2768        }
2769
2770        void reset() {
2771            this.defaultCursorPosition=0;
2772            this.highPriorityCursorPosition=0;
2773            this.lowPriorityCursorPosition=0;
2774        }
2775
2776        void increment() {
2777            if (defaultCursorPosition!=0) {
2778                defaultCursorPosition++;
2779            }
2780            if (highPriorityCursorPosition!=0) {
2781                highPriorityCursorPosition++;
2782            }
2783            if (lowPriorityCursorPosition!=0) {
2784                lowPriorityCursorPosition++;
2785            }
2786        }
2787
2788        @Override
2789        public String toString() {
2790           return "MessageOrderCursor:[def:" + defaultCursorPosition
2791                   + ", low:" + lowPriorityCursorPosition
2792                   + ", high:" +  highPriorityCursorPosition + "]";
2793        }
2794
2795        public void sync(MessageOrderCursor other) {
2796            this.defaultCursorPosition=other.defaultCursorPosition;
2797            this.lowPriorityCursorPosition=other.lowPriorityCursorPosition;
2798            this.highPriorityCursorPosition=other.highPriorityCursorPosition;
2799        }
2800    }
2801
2802    class MessageOrderIndex {
2803        static final byte HI = 9;
2804        static final byte LO = 0;
2805        static final byte DEF = 4;
2806
2807        long nextMessageId;
2808        BTreeIndex<Long, MessageKeys> defaultPriorityIndex;
2809        BTreeIndex<Long, MessageKeys> lowPriorityIndex;
2810        BTreeIndex<Long, MessageKeys> highPriorityIndex;
2811        final MessageOrderCursor cursor = new MessageOrderCursor();
2812        Long lastDefaultKey;
2813        Long lastHighKey;
2814        Long lastLowKey;
2815        byte lastGetPriority;
2816        final List<Long> pendingAdditions = new LinkedList<Long>();
2817
2818        MessageKeys remove(Transaction tx, Long key) throws IOException {
2819            MessageKeys result = defaultPriorityIndex.remove(tx, key);
2820            if (result == null && highPriorityIndex!=null) {
2821                result = highPriorityIndex.remove(tx, key);
2822                if (result ==null && lowPriorityIndex!=null) {
2823                    result = lowPriorityIndex.remove(tx, key);
2824                }
2825            }
2826            return result;
2827        }
2828
2829        void load(Transaction tx) throws IOException {
2830            defaultPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
2831            defaultPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
2832            defaultPriorityIndex.load(tx);
2833            lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
2834            lowPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
2835            lowPriorityIndex.load(tx);
2836            highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
2837            highPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
2838            highPriorityIndex.load(tx);
2839        }
2840
2841        void allocate(Transaction tx) throws IOException {
2842            defaultPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
2843            if (metadata.version >= 2) {
2844                lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
2845                highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
2846            }
2847        }
2848
2849        void configureLast(Transaction tx) throws IOException {
2850            // Figure out the next key using the last entry in the destination.
2851            TreeSet<Long> orderedSet = new TreeSet<Long>();
2852
2853            addLast(orderedSet, highPriorityIndex, tx);
2854            addLast(orderedSet, defaultPriorityIndex, tx);
2855            addLast(orderedSet, lowPriorityIndex, tx);
2856
2857            if (!orderedSet.isEmpty()) {
2858                nextMessageId = orderedSet.last() + 1;
2859            }
2860        }
2861
2862        private void addLast(TreeSet<Long> orderedSet, BTreeIndex<Long, MessageKeys> index, Transaction tx) throws IOException {
2863            if (index != null) {
2864                Entry<Long, MessageKeys> lastEntry = index.getLast(tx);
2865                if (lastEntry != null) {
2866                    orderedSet.add(lastEntry.getKey());
2867                }
2868            }
2869        }
2870
2871        void clear(Transaction tx) throws IOException {
2872            this.remove(tx);
2873            this.resetCursorPosition();
2874            this.allocate(tx);
2875            this.load(tx);
2876            this.configureLast(tx);
2877        }
2878
2879        void remove(Transaction tx) throws IOException {
2880            defaultPriorityIndex.clear(tx);
2881            defaultPriorityIndex.unload(tx);
2882            tx.free(defaultPriorityIndex.getPageId());
2883            if (lowPriorityIndex != null) {
2884                lowPriorityIndex.clear(tx);
2885                lowPriorityIndex.unload(tx);
2886
2887                tx.free(lowPriorityIndex.getPageId());
2888            }
2889            if (highPriorityIndex != null) {
2890                highPriorityIndex.clear(tx);
2891                highPriorityIndex.unload(tx);
2892                tx.free(highPriorityIndex.getPageId());
2893            }
2894        }
2895
2896        void resetCursorPosition() {
2897            this.cursor.reset();
2898            lastDefaultKey = null;
2899            lastHighKey = null;
2900            lastLowKey = null;
2901        }
2902
2903        void setBatch(Transaction tx, Long sequence) throws IOException {
2904            if (sequence != null) {
2905                Long nextPosition = new Long(sequence.longValue() + 1);
2906                if (defaultPriorityIndex.containsKey(tx, sequence)) {
2907                    lastDefaultKey = sequence;
2908                    cursor.defaultCursorPosition = nextPosition.longValue();
2909                } else if (highPriorityIndex != null && highPriorityIndex.containsKey(tx, sequence)) {
2910                    lastHighKey = sequence;
2911                    cursor.highPriorityCursorPosition = nextPosition.longValue();
2912                } else if (lowPriorityIndex.containsKey(tx, sequence)) {
2913                    lastLowKey = sequence;
2914                    cursor.lowPriorityCursorPosition = nextPosition.longValue();
2915                } else {
2916                    lastDefaultKey = sequence;
2917                    cursor.defaultCursorPosition = nextPosition.longValue();
2918                }
2919            }
2920        }
2921
2922        void setBatch(Transaction tx, LastAck last) throws IOException {
2923            setBatch(tx, last.lastAckedSequence);
2924            if (cursor.defaultCursorPosition == 0
2925                    && cursor.highPriorityCursorPosition == 0
2926                    && cursor.lowPriorityCursorPosition == 0) {
2927                long next = last.lastAckedSequence + 1;
2928                switch (last.priority) {
2929                    case DEF:
2930                        cursor.defaultCursorPosition = next;
2931                        cursor.highPriorityCursorPosition = next;
2932                        break;
2933                    case HI:
2934                        cursor.highPriorityCursorPosition = next;
2935                        break;
2936                    case LO:
2937                        cursor.lowPriorityCursorPosition = next;
2938                        cursor.defaultCursorPosition = next;
2939                        cursor.highPriorityCursorPosition = next;
2940                        break;
2941                }
2942            }
2943        }
2944
2945        void stoppedIterating() {
2946            if (lastDefaultKey!=null) {
2947                cursor.defaultCursorPosition=lastDefaultKey.longValue()+1;
2948            }
2949            if (lastHighKey!=null) {
2950                cursor.highPriorityCursorPosition=lastHighKey.longValue()+1;
2951            }
2952            if (lastLowKey!=null) {
2953                cursor.lowPriorityCursorPosition=lastLowKey.longValue()+1;
2954            }
2955            lastDefaultKey = null;
2956            lastHighKey = null;
2957            lastLowKey = null;
2958        }
2959
2960        void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>> deletes, Long sequenceId)
2961                throws IOException {
2962            if (defaultPriorityIndex.containsKey(tx, sequenceId)) {
2963                getDeleteList(tx, deletes, defaultPriorityIndex, sequenceId);
2964            } else if (highPriorityIndex != null && highPriorityIndex.containsKey(tx, sequenceId)) {
2965                getDeleteList(tx, deletes, highPriorityIndex, sequenceId);
2966            } else if (lowPriorityIndex != null && lowPriorityIndex.containsKey(tx, sequenceId)) {
2967                getDeleteList(tx, deletes, lowPriorityIndex, sequenceId);
2968            }
2969        }
2970
2971        void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>> deletes,
2972                BTreeIndex<Long, MessageKeys> index, Long sequenceId) throws IOException {
2973
2974            Iterator<Entry<Long, MessageKeys>> iterator = index.iterator(tx, sequenceId, null);
2975            deletes.add(iterator.next());
2976        }
2977
2978        long getNextMessageId(int priority) {
2979            return nextMessageId++;
2980        }
2981
2982        MessageKeys get(Transaction tx, Long key) throws IOException {
2983            MessageKeys result = defaultPriorityIndex.get(tx, key);
2984            if (result == null) {
2985                result = highPriorityIndex.get(tx, key);
2986                if (result == null) {
2987                    result = lowPriorityIndex.get(tx, key);
2988                    lastGetPriority = LO;
2989                } else {
2990                    lastGetPriority = HI;
2991                }
2992            } else {
2993                lastGetPriority = DEF;
2994            }
2995            return result;
2996        }
2997
2998        MessageKeys put(Transaction tx, int priority, Long key, MessageKeys value) throws IOException {
2999            if (priority == javax.jms.Message.DEFAULT_PRIORITY) {
3000                return defaultPriorityIndex.put(tx, key, value);
3001            } else if (priority > javax.jms.Message.DEFAULT_PRIORITY) {
3002                return highPriorityIndex.put(tx, key, value);
3003            } else {
3004                return lowPriorityIndex.put(tx, key, value);
3005            }
3006        }
3007
3008        Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx) throws IOException{
3009            return new MessageOrderIterator(tx,cursor,this);
3010        }
3011
3012        Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx, MessageOrderCursor m) throws IOException{
3013            return new MessageOrderIterator(tx,m,this);
3014        }
3015
3016        public byte lastGetPriority() {
3017            return lastGetPriority;
3018        }
3019
3020        public boolean alreadyDispatched(Long sequence) {
3021            return (cursor.highPriorityCursorPosition > 0 && cursor.highPriorityCursorPosition >= sequence) ||
3022                    (cursor.defaultCursorPosition > 0 && cursor.defaultCursorPosition >= sequence) ||
3023                    (cursor.lowPriorityCursorPosition > 0 && cursor.lowPriorityCursorPosition >= sequence);
3024        }
3025
3026        public void trackPendingAdd(Long seq) {
3027            synchronized (pendingAdditions) {
3028                pendingAdditions.add(seq);
3029            }
3030        }
3031
3032        public void trackPendingAddComplete(Long seq) {
3033            synchronized (pendingAdditions) {
3034                pendingAdditions.remove(seq);
3035            }
3036        }
3037
3038        public Long minPendingAdd() {
3039            synchronized (pendingAdditions) {
3040                if (!pendingAdditions.isEmpty()) {
3041                    return pendingAdditions.get(0);
3042                } else {
3043                    return null;
3044                }
3045            }
3046        }
3047
3048
3049        class MessageOrderIterator implements Iterator<Entry<Long, MessageKeys>>{
3050            Iterator<Entry<Long, MessageKeys>>currentIterator;
3051            final Iterator<Entry<Long, MessageKeys>>highIterator;
3052            final Iterator<Entry<Long, MessageKeys>>defaultIterator;
3053            final Iterator<Entry<Long, MessageKeys>>lowIterator;
3054
3055            MessageOrderIterator(Transaction tx, MessageOrderCursor m, MessageOrderIndex messageOrderIndex) throws IOException {
3056                Long pendingAddLimiter = messageOrderIndex.minPendingAdd();
3057                this.defaultIterator = defaultPriorityIndex.iterator(tx, m.defaultCursorPosition, pendingAddLimiter);
3058                if (highPriorityIndex != null) {
3059                    this.highIterator = highPriorityIndex.iterator(tx, m.highPriorityCursorPosition, pendingAddLimiter);
3060                } else {
3061                    this.highIterator = null;
3062                }
3063                if (lowPriorityIndex != null) {
3064                    this.lowIterator = lowPriorityIndex.iterator(tx, m.lowPriorityCursorPosition, pendingAddLimiter);
3065                } else {
3066                    this.lowIterator = null;
3067                }
3068            }
3069
3070            @Override
3071            public boolean hasNext() {
3072                if (currentIterator == null) {
3073                    if (highIterator != null) {
3074                        if (highIterator.hasNext()) {
3075                            currentIterator = highIterator;
3076                            return currentIterator.hasNext();
3077                        }
3078                        if (defaultIterator.hasNext()) {
3079                            currentIterator = defaultIterator;
3080                            return currentIterator.hasNext();
3081                        }
3082                        if (lowIterator.hasNext()) {
3083                            currentIterator = lowIterator;
3084                            return currentIterator.hasNext();
3085                        }
3086                        return false;
3087                    } else {
3088                        currentIterator = defaultIterator;
3089                        return currentIterator.hasNext();
3090                    }
3091                }
3092                if (highIterator != null) {
3093                    if (currentIterator.hasNext()) {
3094                        return true;
3095                    }
3096                    if (currentIterator == highIterator) {
3097                        if (defaultIterator.hasNext()) {
3098                            currentIterator = defaultIterator;
3099                            return currentIterator.hasNext();
3100                        }
3101                        if (lowIterator.hasNext()) {
3102                            currentIterator = lowIterator;
3103                            return currentIterator.hasNext();
3104                        }
3105                        return false;
3106                    }
3107
3108                    if (currentIterator == defaultIterator) {
3109                        if (lowIterator.hasNext()) {
3110                            currentIterator = lowIterator;
3111                            return currentIterator.hasNext();
3112                        }
3113                        return false;
3114                    }
3115                }
3116                return currentIterator.hasNext();
3117            }
3118
3119            @Override
3120            public Entry<Long, MessageKeys> next() {
3121                Entry<Long, MessageKeys> result = currentIterator.next();
3122                if (result != null) {
3123                    Long key = result.getKey();
3124                    if (highIterator != null) {
3125                        if (currentIterator == defaultIterator) {
3126                            lastDefaultKey = key;
3127                        } else if (currentIterator == highIterator) {
3128                            lastHighKey = key;
3129                        } else {
3130                            lastLowKey = key;
3131                        }
3132                    } else {
3133                        lastDefaultKey = key;
3134                    }
3135                }
3136                return result;
3137            }
3138
3139            @Override
3140            public void remove() {
3141                throw new UnsupportedOperationException();
3142            }
3143
3144        }
3145    }
3146
3147    private static class HashSetStringMarshaller extends VariableMarshaller<HashSet<String>> {
3148        final static HashSetStringMarshaller INSTANCE = new HashSetStringMarshaller();
3149
3150        @Override
3151        public void writePayload(HashSet<String> object, DataOutput dataOut) throws IOException {
3152            ByteArrayOutputStream baos = new ByteArrayOutputStream();
3153            ObjectOutputStream oout = new ObjectOutputStream(baos);
3154            oout.writeObject(object);
3155            oout.flush();
3156            oout.close();
3157            byte[] data = baos.toByteArray();
3158            dataOut.writeInt(data.length);
3159            dataOut.write(data);
3160        }
3161
3162        @Override
3163        @SuppressWarnings("unchecked")
3164        public HashSet<String> readPayload(DataInput dataIn) throws IOException {
3165            int dataLen = dataIn.readInt();
3166            byte[] data = new byte[dataLen];
3167            dataIn.readFully(data);
3168            ByteArrayInputStream bais = new ByteArrayInputStream(data);
3169            ObjectInputStream oin = new ObjectInputStream(bais);
3170            try {
3171                return (HashSet<String>) oin.readObject();
3172            } catch (ClassNotFoundException cfe) {
3173                IOException ioe = new IOException("Failed to read HashSet<String>: " + cfe);
3174                ioe.initCause(cfe);
3175                throw ioe;
3176            }
3177        }
3178    }
3179
3180    public File getIndexDirectory() {
3181        return indexDirectory;
3182    }
3183
3184    public void setIndexDirectory(File indexDirectory) {
3185        this.indexDirectory = indexDirectory;
3186    }
3187
3188    interface IndexAware {
3189        public void sequenceAssignedWithIndexLocked(long index);
3190    }
3191
3192    public String getPreallocationScope() {
3193        return preallocationScope;
3194    }
3195
3196    public void setPreallocationScope(String preallocationScope) {
3197        this.preallocationScope = preallocationScope;
3198    }
3199
3200    public String getPreallocationStrategy() {
3201        return preallocationStrategy;
3202    }
3203
3204    public void setPreallocationStrategy(String preallocationStrategy) {
3205        this.preallocationStrategy = preallocationStrategy;
3206    }
3207
3208}