001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadb;
018
019import java.io.DataInputStream;
020import java.io.IOException;
021import java.io.InterruptedIOException;
022import java.util.ArrayList;
023import java.util.HashMap;
024import java.util.HashSet;
025import java.util.Iterator;
026import java.util.LinkedList;
027import java.util.List;
028import java.util.Map;
029import java.util.Map.Entry;
030import java.util.Set;
031import java.util.concurrent.BlockingQueue;
032import java.util.concurrent.ExecutorService;
033import java.util.concurrent.FutureTask;
034import java.util.concurrent.LinkedBlockingQueue;
035import java.util.concurrent.Semaphore;
036import java.util.concurrent.ThreadFactory;
037import java.util.concurrent.ThreadPoolExecutor;
038import java.util.concurrent.TimeUnit;
039import java.util.concurrent.TimeoutException;
040import java.util.concurrent.atomic.AtomicBoolean;
041import java.util.concurrent.atomic.AtomicInteger;
042
043import org.apache.activemq.broker.ConnectionContext;
044import org.apache.activemq.broker.region.BaseDestination;
045import org.apache.activemq.broker.scheduler.JobSchedulerStore;
046import org.apache.activemq.command.ActiveMQDestination;
047import org.apache.activemq.command.ActiveMQQueue;
048import org.apache.activemq.command.ActiveMQTempQueue;
049import org.apache.activemq.command.ActiveMQTempTopic;
050import org.apache.activemq.command.ActiveMQTopic;
051import org.apache.activemq.command.Message;
052import org.apache.activemq.command.MessageAck;
053import org.apache.activemq.command.MessageId;
054import org.apache.activemq.command.ProducerId;
055import org.apache.activemq.command.SubscriptionInfo;
056import org.apache.activemq.command.TransactionId;
057import org.apache.activemq.openwire.OpenWireFormat;
058import org.apache.activemq.protobuf.Buffer;
059import org.apache.activemq.store.AbstractMessageStore;
060import org.apache.activemq.store.IndexListener;
061import org.apache.activemq.store.ListenableFuture;
062import org.apache.activemq.store.MessageRecoveryListener;
063import org.apache.activemq.store.MessageStore;
064import org.apache.activemq.store.PersistenceAdapter;
065import org.apache.activemq.store.TopicMessageStore;
066import org.apache.activemq.store.TransactionIdTransformer;
067import org.apache.activemq.store.TransactionStore;
068import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
069import org.apache.activemq.store.kahadb.data.KahaDestination;
070import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
071import org.apache.activemq.store.kahadb.data.KahaLocation;
072import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
073import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
074import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
075import org.apache.activemq.store.kahadb.data.KahaUpdateMessageCommand;
076import org.apache.activemq.store.kahadb.disk.journal.Location;
077import org.apache.activemq.store.kahadb.disk.page.Transaction;
078import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
079import org.apache.activemq.usage.MemoryUsage;
080import org.apache.activemq.usage.SystemUsage;
081import org.apache.activemq.util.ServiceStopper;
082import org.apache.activemq.util.ThreadPoolUtils;
083import org.apache.activemq.wireformat.WireFormat;
084import org.slf4j.Logger;
085import org.slf4j.LoggerFactory;
086
087public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
088    static final Logger LOG = LoggerFactory.getLogger(KahaDBStore.class);
089    private static final int MAX_ASYNC_JOBS = BaseDestination.MAX_AUDIT_DEPTH;
090
091    public static final String PROPERTY_CANCELED_TASK_MOD_METRIC = "org.apache.activemq.store.kahadb.CANCELED_TASK_MOD_METRIC";
092    public static final int cancelledTaskModMetric = Integer.parseInt(System.getProperty(
093            PROPERTY_CANCELED_TASK_MOD_METRIC, "0"), 10);
094    public static final String PROPERTY_ASYNC_EXECUTOR_MAX_THREADS = "org.apache.activemq.store.kahadb.ASYNC_EXECUTOR_MAX_THREADS";
095    private static final int asyncExecutorMaxThreads = Integer.parseInt(System.getProperty(
096            PROPERTY_ASYNC_EXECUTOR_MAX_THREADS, "1"), 10);;
097
098    protected ExecutorService queueExecutor;
099    protected ExecutorService topicExecutor;
100    protected final List<Map<AsyncJobKey, StoreTask>> asyncQueueMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>();
101    protected final List<Map<AsyncJobKey, StoreTask>> asyncTopicMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>();
102    final WireFormat wireFormat = new OpenWireFormat();
103    private SystemUsage usageManager;
104    private LinkedBlockingQueue<Runnable> asyncQueueJobQueue;
105    private LinkedBlockingQueue<Runnable> asyncTopicJobQueue;
106    Semaphore globalQueueSemaphore;
107    Semaphore globalTopicSemaphore;
108    private boolean concurrentStoreAndDispatchQueues = true;
109    // when true, message order may be compromised when cache is exhausted if store is out
110    // or order w.r.t cache
111    private boolean concurrentStoreAndDispatchTopics = false;
112    private final boolean concurrentStoreAndDispatchTransactions = false;
113    private int maxAsyncJobs = MAX_ASYNC_JOBS;
114    private final KahaDBTransactionStore transactionStore;
115    private TransactionIdTransformer transactionIdTransformer;
116
117    public KahaDBStore() {
118        this.transactionStore = new KahaDBTransactionStore(this);
119        this.transactionIdTransformer = new TransactionIdTransformer() {
120            @Override
121            public TransactionId transform(TransactionId txid) {
122                return txid;
123            }
124        };
125    }
126
127    @Override
128    public String toString() {
129        return "KahaDB:[" + directory.getAbsolutePath() + "]";
130    }
131
132    @Override
133    public void setBrokerName(String brokerName) {
134    }
135
136    @Override
137    public void setUsageManager(SystemUsage usageManager) {
138        this.usageManager = usageManager;
139    }
140
141    public SystemUsage getUsageManager() {
142        return this.usageManager;
143    }
144
145    /**
146     * @return the concurrentStoreAndDispatch
147     */
148    public boolean isConcurrentStoreAndDispatchQueues() {
149        return this.concurrentStoreAndDispatchQueues;
150    }
151
152    /**
153     * @param concurrentStoreAndDispatch
154     *            the concurrentStoreAndDispatch to set
155     */
156    public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch) {
157        this.concurrentStoreAndDispatchQueues = concurrentStoreAndDispatch;
158    }
159
160    /**
161     * @return the concurrentStoreAndDispatch
162     */
163    public boolean isConcurrentStoreAndDispatchTopics() {
164        return this.concurrentStoreAndDispatchTopics;
165    }
166
167    /**
168     * @param concurrentStoreAndDispatch
169     *            the concurrentStoreAndDispatch to set
170     */
171    public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) {
172        this.concurrentStoreAndDispatchTopics = concurrentStoreAndDispatch;
173    }
174
175    public boolean isConcurrentStoreAndDispatchTransactions() {
176        return this.concurrentStoreAndDispatchTransactions;
177    }
178
179    /**
180     * @return the maxAsyncJobs
181     */
182    public int getMaxAsyncJobs() {
183        return this.maxAsyncJobs;
184    }
185
186    /**
187     * @param maxAsyncJobs
188     *            the maxAsyncJobs to set
189     */
190    public void setMaxAsyncJobs(int maxAsyncJobs) {
191        this.maxAsyncJobs = maxAsyncJobs;
192    }
193
194    @Override
195    public void doStart() throws Exception {
196        if (brokerService != null) {
197            metadata.openwireVersion = brokerService.getStoreOpenWireVersion();
198            wireFormat.setVersion(metadata.openwireVersion);
199
200            if (LOG.isDebugEnabled()) {
201                LOG.debug("Store OpenWire version configured as: {}", metadata.openwireVersion);
202            }
203
204        }
205        super.doStart();
206
207        if (brokerService != null) {
208            // In case the recovered store used a different OpenWire version log a warning
209            // to assist in determining why journal reads fail.
210            if (metadata.openwireVersion != brokerService.getStoreOpenWireVersion()) {
211                LOG.warn("Recovered Store uses a different OpenWire version[{}] " +
212                         "than the version configured[{}].",
213                         metadata.openwireVersion, brokerService.getStoreOpenWireVersion());
214            }
215        }
216
217        this.globalQueueSemaphore = new Semaphore(getMaxAsyncJobs());
218        this.globalTopicSemaphore = new Semaphore(getMaxAsyncJobs());
219        this.asyncQueueJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs());
220        this.asyncTopicJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs());
221        this.queueExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS,
222            asyncQueueJobQueue, new ThreadFactory() {
223                @Override
224                public Thread newThread(Runnable runnable) {
225                    Thread thread = new Thread(runnable, "ConcurrentQueueStoreAndDispatch");
226                    thread.setDaemon(true);
227                    return thread;
228                }
229            });
230        this.topicExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS,
231            asyncTopicJobQueue, new ThreadFactory() {
232                @Override
233                public Thread newThread(Runnable runnable) {
234                    Thread thread = new Thread(runnable, "ConcurrentTopicStoreAndDispatch");
235                    thread.setDaemon(true);
236                    return thread;
237                }
238            });
239    }
240
241    @Override
242    public void doStop(ServiceStopper stopper) throws Exception {
243        // drain down async jobs
244        LOG.info("Stopping async queue tasks");
245        if (this.globalQueueSemaphore != null) {
246            this.globalQueueSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS);
247        }
248        synchronized (this.asyncQueueMaps) {
249            for (Map<AsyncJobKey, StoreTask> m : asyncQueueMaps) {
250                synchronized (m) {
251                    for (StoreTask task : m.values()) {
252                        task.cancel();
253                    }
254                }
255            }
256            this.asyncQueueMaps.clear();
257        }
258        LOG.info("Stopping async topic tasks");
259        if (this.globalTopicSemaphore != null) {
260            this.globalTopicSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS);
261        }
262        synchronized (this.asyncTopicMaps) {
263            for (Map<AsyncJobKey, StoreTask> m : asyncTopicMaps) {
264                synchronized (m) {
265                    for (StoreTask task : m.values()) {
266                        task.cancel();
267                    }
268                }
269            }
270            this.asyncTopicMaps.clear();
271        }
272        if (this.globalQueueSemaphore != null) {
273            this.globalQueueSemaphore.drainPermits();
274        }
275        if (this.globalTopicSemaphore != null) {
276            this.globalTopicSemaphore.drainPermits();
277        }
278        if (this.queueExecutor != null) {
279            ThreadPoolUtils.shutdownNow(queueExecutor);
280            queueExecutor = null;
281        }
282        if (this.topicExecutor != null) {
283            ThreadPoolUtils.shutdownNow(topicExecutor);
284            topicExecutor = null;
285        }
286        LOG.info("Stopped KahaDB");
287        super.doStop(stopper);
288    }
289
290    private Location findMessageLocation(final String key, final KahaDestination destination) throws IOException {
291        return pageFile.tx().execute(new Transaction.CallableClosure<Location, IOException>() {
292            @Override
293            public Location execute(Transaction tx) throws IOException {
294                StoredDestination sd = getStoredDestination(destination, tx);
295                Long sequence = sd.messageIdIndex.get(tx, key);
296                if (sequence == null) {
297                    return null;
298                }
299                return sd.orderIndex.get(tx, sequence).location;
300            }
301        });
302    }
303
304    protected StoreQueueTask removeQueueTask(KahaDBMessageStore store, MessageId id) {
305        StoreQueueTask task = null;
306        synchronized (store.asyncTaskMap) {
307            task = (StoreQueueTask) store.asyncTaskMap.remove(new AsyncJobKey(id, store.getDestination()));
308        }
309        return task;
310    }
311
312    protected void addQueueTask(KahaDBMessageStore store, StoreQueueTask task) throws IOException {
313        synchronized (store.asyncTaskMap) {
314            store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task);
315        }
316        this.queueExecutor.execute(task);
317    }
318
319    protected StoreTopicTask removeTopicTask(KahaDBTopicMessageStore store, MessageId id) {
320        StoreTopicTask task = null;
321        synchronized (store.asyncTaskMap) {
322            task = (StoreTopicTask) store.asyncTaskMap.remove(new AsyncJobKey(id, store.getDestination()));
323        }
324        return task;
325    }
326
327    protected void addTopicTask(KahaDBTopicMessageStore store, StoreTopicTask task) throws IOException {
328        synchronized (store.asyncTaskMap) {
329            store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task);
330        }
331        this.topicExecutor.execute(task);
332    }
333
334    @Override
335    public TransactionStore createTransactionStore() throws IOException {
336        return this.transactionStore;
337    }
338
339    public boolean getForceRecoverIndex() {
340        return this.forceRecoverIndex;
341    }
342
343    public void setForceRecoverIndex(boolean forceRecoverIndex) {
344        this.forceRecoverIndex = forceRecoverIndex;
345    }
346
347    public class KahaDBMessageStore extends AbstractMessageStore {
348        protected final Map<AsyncJobKey, StoreTask> asyncTaskMap = new HashMap<AsyncJobKey, StoreTask>();
349        protected KahaDestination dest;
350        private final int maxAsyncJobs;
351        private final Semaphore localDestinationSemaphore;
352
353        double doneTasks, canceledTasks = 0;
354
355        public KahaDBMessageStore(ActiveMQDestination destination) {
356            super(destination);
357            this.dest = convert(destination);
358            this.maxAsyncJobs = getMaxAsyncJobs();
359            this.localDestinationSemaphore = new Semaphore(this.maxAsyncJobs);
360        }
361
362        @Override
363        public ActiveMQDestination getDestination() {
364            return destination;
365        }
366
367        @Override
368        public ListenableFuture<Object> asyncAddQueueMessage(final ConnectionContext context, final Message message)
369                throws IOException {
370            if (isConcurrentStoreAndDispatchQueues()) {
371                StoreQueueTask result = new StoreQueueTask(this, context, message);
372                ListenableFuture<Object> future = result.getFuture();
373                message.getMessageId().setFutureOrSequenceLong(future);
374                message.setRecievedByDFBridge(true); // flag message as concurrentStoreAndDispatch
375                result.aquireLocks();
376                addQueueTask(this, result);
377                if (indexListener != null) {
378                    // allow concurrent dispatch by setting entry locator,
379                    indexListener.onAdd(new IndexListener.MessageContext(context, message, null));
380                }
381                return future;
382            } else {
383                return super.asyncAddQueueMessage(context, message);
384            }
385        }
386
387        @Override
388        public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
389            if (isConcurrentStoreAndDispatchQueues()) {
390                AsyncJobKey key = new AsyncJobKey(ack.getLastMessageId(), getDestination());
391                StoreQueueTask task = null;
392                synchronized (asyncTaskMap) {
393                    task = (StoreQueueTask) asyncTaskMap.get(key);
394                }
395                if (task != null) {
396                    if (ack.isInTransaction() || !task.cancel()) {
397                        try {
398                            task.future.get();
399                        } catch (InterruptedException e) {
400                            throw new InterruptedIOException(e.toString());
401                        } catch (Exception ignored) {
402                            LOG.debug("removeAsync: cannot cancel, waiting for add resulted in ex", ignored);
403                        }
404                        removeMessage(context, ack);
405                    } else {
406                        synchronized (asyncTaskMap) {
407                            asyncTaskMap.remove(key);
408                        }
409                    }
410                } else {
411                    removeMessage(context, ack);
412                }
413            } else {
414                removeMessage(context, ack);
415            }
416        }
417
418        @Override
419        public void addMessage(final ConnectionContext context, final Message message) throws IOException {
420            final KahaAddMessageCommand command = new KahaAddMessageCommand();
421            command.setDestination(dest);
422            command.setMessageId(message.getMessageId().toProducerKey());
423            command.setTransactionInfo(TransactionIdConversion.convert(transactionIdTransformer.transform(message.getTransactionId())));
424            command.setPriority(message.getPriority());
425            command.setPrioritySupported(isPrioritizedMessages());
426            org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
427            command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
428            store(command, isEnableJournalDiskSyncs() && message.isResponseRequired(), new IndexAware() {
429                // sync add? (for async, future present from getFutureOrSequenceLong)
430                Object possibleFuture = message.getMessageId().getFutureOrSequenceLong();
431
432                @Override
433                public void sequenceAssignedWithIndexLocked(final long sequence) {
434                    message.getMessageId().setFutureOrSequenceLong(sequence);
435                    if (indexListener != null) {
436                        if (possibleFuture == null) {
437                            trackPendingAdd(dest, sequence);
438                            indexListener.onAdd(new IndexListener.MessageContext(context, message, new Runnable() {
439                                @Override
440                                public void run() {
441                                    trackPendingAddComplete(dest, sequence);
442                                }
443                            }));
444                        }
445                    }
446                }
447            }, null);
448        }
449
450        @Override
451        public void updateMessage(Message message) throws IOException {
452            if (LOG.isTraceEnabled()) {
453                LOG.trace("updating: " + message.getMessageId() + " with deliveryCount: " + message.getRedeliveryCounter());
454            }
455            KahaUpdateMessageCommand updateMessageCommand = new KahaUpdateMessageCommand();
456            KahaAddMessageCommand command = new KahaAddMessageCommand();
457            command.setDestination(dest);
458            command.setMessageId(message.getMessageId().toProducerKey());
459            command.setPriority(message.getPriority());
460            command.setPrioritySupported(prioritizedMessages);
461            org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
462            command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
463            updateMessageCommand.setMessage(command);
464            store(updateMessageCommand, isEnableJournalDiskSyncs(), null, null);
465        }
466
467        @Override
468        public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
469            KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
470            command.setDestination(dest);
471            command.setMessageId(ack.getLastMessageId().toProducerKey());
472            command.setTransactionInfo(TransactionIdConversion.convert(transactionIdTransformer.transform(ack.getTransactionId())));
473
474            org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(ack);
475            command.setAck(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
476            store(command, isEnableJournalDiskSyncs() && ack.isResponseRequired(), null, null);
477        }
478
479        @Override
480        public void removeAllMessages(ConnectionContext context) throws IOException {
481            KahaRemoveDestinationCommand command = new KahaRemoveDestinationCommand();
482            command.setDestination(dest);
483            store(command, true, null, null);
484        }
485
486        @Override
487        public Message getMessage(MessageId identity) throws IOException {
488            final String key = identity.toProducerKey();
489
490            // Hopefully one day the page file supports concurrent read
491            // operations... but for now we must
492            // externally synchronize...
493            Location location;
494            indexLock.writeLock().lock();
495            try {
496                location = findMessageLocation(key, dest);
497            } finally {
498                indexLock.writeLock().unlock();
499            }
500            if (location == null) {
501                return null;
502            }
503
504            return loadMessage(location);
505        }
506
507        @Override
508        public int getMessageCount() throws IOException {
509            try {
510                lockAsyncJobQueue();
511                indexLock.writeLock().lock();
512                try {
513                    return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() {
514                        @Override
515                        public Integer execute(Transaction tx) throws IOException {
516                            // Iterate through all index entries to get a count
517                            // of messages in the destination.
518                            StoredDestination sd = getStoredDestination(dest, tx);
519                            int rc = 0;
520                            for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator.hasNext();) {
521                                iterator.next();
522                                rc++;
523                            }
524                            return rc;
525                        }
526                    });
527                } finally {
528                    indexLock.writeLock().unlock();
529                }
530            } finally {
531                unlockAsyncJobQueue();
532            }
533        }
534
535        @Override
536        public boolean isEmpty() throws IOException {
537            indexLock.writeLock().lock();
538            try {
539                return pageFile.tx().execute(new Transaction.CallableClosure<Boolean, IOException>() {
540                    @Override
541                    public Boolean execute(Transaction tx) throws IOException {
542                        // Iterate through all index entries to get a count of
543                        // messages in the destination.
544                        StoredDestination sd = getStoredDestination(dest, tx);
545                        return sd.locationIndex.isEmpty(tx);
546                    }
547                });
548            } finally {
549                indexLock.writeLock().unlock();
550            }
551        }
552
553        @Override
554        public void recover(final MessageRecoveryListener listener) throws Exception {
555            // recovery may involve expiry which will modify
556            indexLock.writeLock().lock();
557            try {
558                pageFile.tx().execute(new Transaction.Closure<Exception>() {
559                    @Override
560                    public void execute(Transaction tx) throws Exception {
561                        StoredDestination sd = getStoredDestination(dest, tx);
562                        recoverRolledBackAcks(sd, tx, Integer.MAX_VALUE, listener);
563                        sd.orderIndex.resetCursorPosition();
564                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); listener.hasSpace() && iterator
565                                .hasNext(); ) {
566                            Entry<Long, MessageKeys> entry = iterator.next();
567                            if (ackedAndPrepared.contains(entry.getValue().messageId)) {
568                                continue;
569                            }
570                            Message msg = loadMessage(entry.getValue().location);
571                            listener.recoverMessage(msg);
572                        }
573                    }
574                });
575            } finally {
576                indexLock.writeLock().unlock();
577            }
578        }
579
580        @Override
581        public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception {
582            indexLock.writeLock().lock();
583            try {
584                pageFile.tx().execute(new Transaction.Closure<Exception>() {
585                    @Override
586                    public void execute(Transaction tx) throws Exception {
587                        StoredDestination sd = getStoredDestination(dest, tx);
588                        Entry<Long, MessageKeys> entry = null;
589                        int counter = recoverRolledBackAcks(sd, tx, maxReturned, listener);
590                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext(); ) {
591                            entry = iterator.next();
592                            if (ackedAndPrepared.contains(entry.getValue().messageId)) {
593                                continue;
594                            }
595                            Message msg = loadMessage(entry.getValue().location);
596                            msg.getMessageId().setFutureOrSequenceLong(entry.getKey());
597                            listener.recoverMessage(msg);
598                            counter++;
599                            if (counter >= maxReturned) {
600                                break;
601                            }
602                        }
603                        sd.orderIndex.stoppedIterating();
604                    }
605                });
606            } finally {
607                indexLock.writeLock().unlock();
608            }
609        }
610
611        protected int recoverRolledBackAcks(StoredDestination sd, Transaction tx, int maxReturned, MessageRecoveryListener listener) throws Exception {
612            int counter = 0;
613            String id;
614            for (Iterator<String> iterator = rolledBackAcks.iterator(); iterator.hasNext(); ) {
615                id = iterator.next();
616                iterator.remove();
617                Long sequence = sd.messageIdIndex.get(tx, id);
618                if (sequence != null) {
619                    if (sd.orderIndex.alreadyDispatched(sequence)) {
620                        listener.recoverMessage(loadMessage(sd.orderIndex.get(tx, sequence).location));
621                        counter++;
622                        if (counter >= maxReturned) {
623                            break;
624                        }
625                    } else {
626                        LOG.info("rolledback ack message {} with seq {} will be picked up in future batch {}", id, sequence, sd.orderIndex.cursor);
627                    }
628                } else {
629                    LOG.warn("Failed to locate rolled back ack message {} in {}", id, sd);
630                }
631            }
632            return counter;
633        }
634
635
636        @Override
637        public void resetBatching() {
638            if (pageFile.isLoaded()) {
639                indexLock.writeLock().lock();
640                try {
641                    pageFile.tx().execute(new Transaction.Closure<Exception>() {
642                        @Override
643                        public void execute(Transaction tx) throws Exception {
644                            StoredDestination sd = getExistingStoredDestination(dest, tx);
645                            if (sd != null) {
646                                sd.orderIndex.resetCursorPosition();}
647                            }
648                        });
649                } catch (Exception e) {
650                    LOG.error("Failed to reset batching",e);
651                } finally {
652                    indexLock.writeLock().unlock();
653                }
654            }
655        }
656
657        @Override
658        public void setBatch(final MessageId identity) throws IOException {
659            indexLock.writeLock().lock();
660            try {
661                pageFile.tx().execute(new Transaction.Closure<IOException>() {
662                    @Override
663                    public void execute(Transaction tx) throws IOException {
664                        StoredDestination sd = getStoredDestination(dest, tx);
665                        Long location = (Long) identity.getFutureOrSequenceLong();
666                        Long pending = sd.orderIndex.minPendingAdd();
667                        if (pending != null) {
668                            location = Math.min(location, pending-1);
669                        }
670                        sd.orderIndex.setBatch(tx, location);
671                    }
672                });
673            } finally {
674                indexLock.writeLock().unlock();
675            }
676        }
677
678        @Override
679        public void setMemoryUsage(MemoryUsage memoryUsage) {
680        }
681        @Override
682        public void start() throws Exception {
683            super.start();
684        }
685        @Override
686        public void stop() throws Exception {
687            super.stop();
688        }
689
690        protected void lockAsyncJobQueue() {
691            try {
692                if (!this.localDestinationSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS)) {
693                    throw new TimeoutException(this +" timeout waiting for localDestSem:" + this.localDestinationSemaphore);
694                }
695            } catch (Exception e) {
696                LOG.error("Failed to lock async jobs for " + this.destination, e);
697            }
698        }
699
700        protected void unlockAsyncJobQueue() {
701            this.localDestinationSemaphore.release(this.maxAsyncJobs);
702        }
703
704        protected void acquireLocalAsyncLock() {
705            try {
706                this.localDestinationSemaphore.acquire();
707            } catch (InterruptedException e) {
708                LOG.error("Failed to aquire async lock for " + this.destination, e);
709            }
710        }
711
712        protected void releaseLocalAsyncLock() {
713            this.localDestinationSemaphore.release();
714        }
715
716        @Override
717        public String toString(){
718            return "permits:" + this.localDestinationSemaphore.availablePermits() + ",sd=" + storedDestinations.get(key(dest));
719        }
720    }
721
722    class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore {
723        private final AtomicInteger subscriptionCount = new AtomicInteger();
724        public KahaDBTopicMessageStore(ActiveMQTopic destination) throws IOException {
725            super(destination);
726            this.subscriptionCount.set(getAllSubscriptions().length);
727            if (isConcurrentStoreAndDispatchTopics()) {
728                asyncTopicMaps.add(asyncTaskMap);
729            }
730        }
731
732        @Override
733        public ListenableFuture<Object> asyncAddTopicMessage(final ConnectionContext context, final Message message)
734                throws IOException {
735            if (isConcurrentStoreAndDispatchTopics()) {
736                StoreTopicTask result = new StoreTopicTask(this, context, message, subscriptionCount.get());
737                result.aquireLocks();
738                addTopicTask(this, result);
739                return result.getFuture();
740            } else {
741                return super.asyncAddTopicMessage(context, message);
742            }
743        }
744
745        @Override
746        public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
747                                MessageId messageId, MessageAck ack) throws IOException {
748            String subscriptionKey = subscriptionKey(clientId, subscriptionName).toString();
749            if (isConcurrentStoreAndDispatchTopics()) {
750                AsyncJobKey key = new AsyncJobKey(messageId, getDestination());
751                StoreTopicTask task = null;
752                synchronized (asyncTaskMap) {
753                    task = (StoreTopicTask) asyncTaskMap.get(key);
754                }
755                if (task != null) {
756                    if (task.addSubscriptionKey(subscriptionKey)) {
757                        removeTopicTask(this, messageId);
758                        if (task.cancel()) {
759                            synchronized (asyncTaskMap) {
760                                asyncTaskMap.remove(key);
761                            }
762                        }
763                    }
764                } else {
765                    doAcknowledge(context, subscriptionKey, messageId, ack);
766                }
767            } else {
768                doAcknowledge(context, subscriptionKey, messageId, ack);
769            }
770        }
771
772        protected void doAcknowledge(ConnectionContext context, String subscriptionKey, MessageId messageId, MessageAck ack)
773                throws IOException {
774            KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
775            command.setDestination(dest);
776            command.setSubscriptionKey(subscriptionKey);
777            command.setMessageId(messageId.toProducerKey());
778            command.setTransactionInfo(ack != null ? TransactionIdConversion.convert(transactionIdTransformer.transform(ack.getTransactionId())) : null);
779            if (ack != null && ack.isUnmatchedAck()) {
780                command.setAck(UNMATCHED);
781            } else {
782                org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(ack);
783                command.setAck(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
784            }
785            store(command, false, null, null);
786        }
787
788        @Override
789        public void addSubscription(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
790            String subscriptionKey = subscriptionKey(subscriptionInfo.getClientId(), subscriptionInfo
791                    .getSubscriptionName());
792            KahaSubscriptionCommand command = new KahaSubscriptionCommand();
793            command.setDestination(dest);
794            command.setSubscriptionKey(subscriptionKey.toString());
795            command.setRetroactive(retroactive);
796            org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(subscriptionInfo);
797            command.setSubscriptionInfo(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
798            store(command, isEnableJournalDiskSyncs() && true, null, null);
799            this.subscriptionCount.incrementAndGet();
800        }
801
802        @Override
803        public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
804            KahaSubscriptionCommand command = new KahaSubscriptionCommand();
805            command.setDestination(dest);
806            command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName).toString());
807            store(command, isEnableJournalDiskSyncs() && true, null, null);
808            this.subscriptionCount.decrementAndGet();
809        }
810
811        @Override
812        public SubscriptionInfo[] getAllSubscriptions() throws IOException {
813
814            final ArrayList<SubscriptionInfo> subscriptions = new ArrayList<SubscriptionInfo>();
815            indexLock.writeLock().lock();
816            try {
817                pageFile.tx().execute(new Transaction.Closure<IOException>() {
818                    @Override
819                    public void execute(Transaction tx) throws IOException {
820                        StoredDestination sd = getStoredDestination(dest, tx);
821                        for (Iterator<Entry<String, KahaSubscriptionCommand>> iterator = sd.subscriptions.iterator(tx); iterator
822                                .hasNext();) {
823                            Entry<String, KahaSubscriptionCommand> entry = iterator.next();
824                            SubscriptionInfo info = (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(entry
825                                    .getValue().getSubscriptionInfo().newInput()));
826                            subscriptions.add(info);
827
828                        }
829                    }
830                });
831            } finally {
832                indexLock.writeLock().unlock();
833            }
834
835            SubscriptionInfo[] rc = new SubscriptionInfo[subscriptions.size()];
836            subscriptions.toArray(rc);
837            return rc;
838        }
839
840        @Override
841        public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
842            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
843            indexLock.writeLock().lock();
844            try {
845                return pageFile.tx().execute(new Transaction.CallableClosure<SubscriptionInfo, IOException>() {
846                    @Override
847                    public SubscriptionInfo execute(Transaction tx) throws IOException {
848                        StoredDestination sd = getStoredDestination(dest, tx);
849                        KahaSubscriptionCommand command = sd.subscriptions.get(tx, subscriptionKey);
850                        if (command == null) {
851                            return null;
852                        }
853                        return (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(command
854                                .getSubscriptionInfo().newInput()));
855                    }
856                });
857            } finally {
858                indexLock.writeLock().unlock();
859            }
860        }
861
862        @Override
863        public int getMessageCount(String clientId, String subscriptionName) throws IOException {
864            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
865            indexLock.writeLock().lock();
866            try {
867                return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() {
868                    @Override
869                    public Integer execute(Transaction tx) throws IOException {
870                        StoredDestination sd = getStoredDestination(dest, tx);
871                        LastAck cursorPos = getLastAck(tx, sd, subscriptionKey);
872                        if (cursorPos == null) {
873                            // The subscription might not exist.
874                            return 0;
875                        }
876
877                        return (int) getStoredMessageCount(tx, sd, subscriptionKey);
878                    }
879                });
880            } finally {
881                indexLock.writeLock().unlock();
882            }
883        }
884
885        @Override
886        public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener)
887                throws Exception {
888            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
889            @SuppressWarnings("unused")
890            final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
891            indexLock.writeLock().lock();
892            try {
893                pageFile.tx().execute(new Transaction.Closure<Exception>() {
894                    @Override
895                    public void execute(Transaction tx) throws Exception {
896                        StoredDestination sd = getStoredDestination(dest, tx);
897                        LastAck cursorPos = getLastAck(tx, sd, subscriptionKey);
898                        sd.orderIndex.setBatch(tx, cursorPos);
899                        recoverRolledBackAcks(sd, tx, Integer.MAX_VALUE, listener);
900                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator
901                                .hasNext();) {
902                            Entry<Long, MessageKeys> entry = iterator.next();
903                            if (ackedAndPrepared.contains(entry.getValue().messageId)) {
904                                continue;
905                            }
906                            listener.recoverMessage(loadMessage(entry.getValue().location));
907                        }
908                        sd.orderIndex.resetCursorPosition();
909                    }
910                });
911            } finally {
912                indexLock.writeLock().unlock();
913            }
914        }
915
916        @Override
917        public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned,
918                final MessageRecoveryListener listener) throws Exception {
919            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
920            @SuppressWarnings("unused")
921            final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
922            indexLock.writeLock().lock();
923            try {
924                pageFile.tx().execute(new Transaction.Closure<Exception>() {
925                    @Override
926                    public void execute(Transaction tx) throws Exception {
927                        StoredDestination sd = getStoredDestination(dest, tx);
928                        sd.orderIndex.resetCursorPosition();
929                        MessageOrderCursor moc = sd.subscriptionCursors.get(subscriptionKey);
930                        if (moc == null) {
931                            LastAck pos = getLastAck(tx, sd, subscriptionKey);
932                            if (pos == null) {
933                                // sub deleted
934                                return;
935                            }
936                            sd.orderIndex.setBatch(tx, pos);
937                            moc = sd.orderIndex.cursor;
938                        } else {
939                            sd.orderIndex.cursor.sync(moc);
940                        }
941
942                        Entry<Long, MessageKeys> entry = null;
943                        int counter = recoverRolledBackAcks(sd, tx, maxReturned, listener);
944                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, moc); iterator
945                                .hasNext();) {
946                            entry = iterator.next();
947                            if (ackedAndPrepared.contains(entry.getValue().messageId)) {
948                                continue;
949                            }
950                            if (listener.recoverMessage(loadMessage(entry.getValue().location))) {
951                                counter++;
952                            }
953                            if (counter >= maxReturned || listener.hasSpace() == false) {
954                                break;
955                            }
956                        }
957                        sd.orderIndex.stoppedIterating();
958                        if (entry != null) {
959                            MessageOrderCursor copy = sd.orderIndex.cursor.copy();
960                            sd.subscriptionCursors.put(subscriptionKey, copy);
961                        }
962                    }
963                });
964            } finally {
965                indexLock.writeLock().unlock();
966            }
967        }
968
969        @Override
970        public void resetBatching(String clientId, String subscriptionName) {
971            try {
972                final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
973                indexLock.writeLock().lock();
974                try {
975                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
976                        @Override
977                        public void execute(Transaction tx) throws IOException {
978                            StoredDestination sd = getStoredDestination(dest, tx);
979                            sd.subscriptionCursors.remove(subscriptionKey);
980                        }
981                    });
982                }finally {
983                    indexLock.writeLock().unlock();
984                }
985            } catch (IOException e) {
986                throw new RuntimeException(e);
987            }
988        }
989    }
990
991    String subscriptionKey(String clientId, String subscriptionName) {
992        return clientId + ":" + subscriptionName;
993    }
994
995    @Override
996    public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
997        return this.transactionStore.proxy(new KahaDBMessageStore(destination));
998    }
999
1000    @Override
1001    public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
1002        return this.transactionStore.proxy(new KahaDBTopicMessageStore(destination));
1003    }
1004
1005    /**
1006     * Cleanup method to remove any state associated with the given destination.
1007     * This method does not stop the message store (it might not be cached).
1008     *
1009     * @param destination
1010     *            Destination to forget
1011     */
1012    @Override
1013    public void removeQueueMessageStore(ActiveMQQueue destination) {
1014    }
1015
1016    /**
1017     * Cleanup method to remove any state associated with the given destination
1018     * This method does not stop the message store (it might not be cached).
1019     *
1020     * @param destination
1021     *            Destination to forget
1022     */
1023    @Override
1024    public void removeTopicMessageStore(ActiveMQTopic destination) {
1025    }
1026
1027    @Override
1028    public void deleteAllMessages() throws IOException {
1029        deleteAllMessages = true;
1030    }
1031
1032    @Override
1033    public Set<ActiveMQDestination> getDestinations() {
1034        try {
1035            final HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
1036            indexLock.writeLock().lock();
1037            try {
1038                pageFile.tx().execute(new Transaction.Closure<IOException>() {
1039                    @Override
1040                    public void execute(Transaction tx) throws IOException {
1041                        for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator
1042                                .hasNext();) {
1043                            Entry<String, StoredDestination> entry = iterator.next();
1044                            if (!isEmptyTopic(entry, tx)) {
1045                                rc.add(convert(entry.getKey()));
1046                            }
1047                        }
1048                    }
1049
1050                    private boolean isEmptyTopic(Entry<String, StoredDestination> entry, Transaction tx)
1051                            throws IOException {
1052                        boolean isEmptyTopic = false;
1053                        ActiveMQDestination dest = convert(entry.getKey());
1054                        if (dest.isTopic()) {
1055                            StoredDestination loadedStore = getStoredDestination(convert(dest), tx);
1056                            if (loadedStore.subscriptionAcks.isEmpty(tx)) {
1057                                isEmptyTopic = true;
1058                            }
1059                        }
1060                        return isEmptyTopic;
1061                    }
1062                });
1063            }finally {
1064                indexLock.writeLock().unlock();
1065            }
1066            return rc;
1067        } catch (IOException e) {
1068            throw new RuntimeException(e);
1069        }
1070    }
1071
1072    @Override
1073    public long getLastMessageBrokerSequenceId() throws IOException {
1074        return 0;
1075    }
1076
1077    @Override
1078    public long getLastProducerSequenceId(ProducerId id) {
1079        indexLock.readLock().lock();
1080        try {
1081            return metadata.producerSequenceIdTracker.getLastSeqId(id);
1082        } finally {
1083            indexLock.readLock().unlock();
1084        }
1085    }
1086
1087    @Override
1088    public long size() {
1089        try {
1090            return journalSize.get() + getPageFile().getDiskSize();
1091        } catch (IOException e) {
1092            throw new RuntimeException(e);
1093        }
1094    }
1095
1096    @Override
1097    public void beginTransaction(ConnectionContext context) throws IOException {
1098        throw new IOException("Not yet implemented.");
1099    }
1100    @Override
1101    public void commitTransaction(ConnectionContext context) throws IOException {
1102        throw new IOException("Not yet implemented.");
1103    }
1104    @Override
1105    public void rollbackTransaction(ConnectionContext context) throws IOException {
1106        throw new IOException("Not yet implemented.");
1107    }
1108
1109    @Override
1110    public void checkpoint(boolean sync) throws IOException {
1111        super.checkpointCleanup(sync);
1112    }
1113
1114    // /////////////////////////////////////////////////////////////////
1115    // Internal helper methods.
1116    // /////////////////////////////////////////////////////////////////
1117
1118    /**
1119     * @param location
1120     * @return
1121     * @throws IOException
1122     */
1123    Message loadMessage(Location location) throws IOException {
1124        JournalCommand<?> command = load(location);
1125        KahaAddMessageCommand addMessage = null;
1126        switch (command.type()) {
1127            case KAHA_UPDATE_MESSAGE_COMMAND:
1128                addMessage = ((KahaUpdateMessageCommand)command).getMessage();
1129                break;
1130            default:
1131                addMessage = (KahaAddMessageCommand) command;
1132        }
1133        Message msg = (Message) wireFormat.unmarshal(new DataInputStream(addMessage.getMessage().newInput()));
1134        return msg;
1135    }
1136
1137    // /////////////////////////////////////////////////////////////////
1138    // Internal conversion methods.
1139    // /////////////////////////////////////////////////////////////////
1140
1141    KahaLocation convert(Location location) {
1142        KahaLocation rc = new KahaLocation();
1143        rc.setLogId(location.getDataFileId());
1144        rc.setOffset(location.getOffset());
1145        return rc;
1146    }
1147
1148    KahaDestination convert(ActiveMQDestination dest) {
1149        KahaDestination rc = new KahaDestination();
1150        rc.setName(dest.getPhysicalName());
1151        switch (dest.getDestinationType()) {
1152        case ActiveMQDestination.QUEUE_TYPE:
1153            rc.setType(DestinationType.QUEUE);
1154            return rc;
1155        case ActiveMQDestination.TOPIC_TYPE:
1156            rc.setType(DestinationType.TOPIC);
1157            return rc;
1158        case ActiveMQDestination.TEMP_QUEUE_TYPE:
1159            rc.setType(DestinationType.TEMP_QUEUE);
1160            return rc;
1161        case ActiveMQDestination.TEMP_TOPIC_TYPE:
1162            rc.setType(DestinationType.TEMP_TOPIC);
1163            return rc;
1164        default:
1165            return null;
1166        }
1167    }
1168
1169    ActiveMQDestination convert(String dest) {
1170        int p = dest.indexOf(":");
1171        if (p < 0) {
1172            throw new IllegalArgumentException("Not in the valid destination format");
1173        }
1174        int type = Integer.parseInt(dest.substring(0, p));
1175        String name = dest.substring(p + 1);
1176        return convert(type, name);
1177    }
1178
1179    private ActiveMQDestination convert(KahaDestination commandDestination) {
1180        return convert(commandDestination.getType().getNumber(), commandDestination.getName());
1181    }
1182
1183    private ActiveMQDestination convert(int type, String name) {
1184        switch (KahaDestination.DestinationType.valueOf(type)) {
1185        case QUEUE:
1186            return new ActiveMQQueue(name);
1187        case TOPIC:
1188            return new ActiveMQTopic(name);
1189        case TEMP_QUEUE:
1190            return new ActiveMQTempQueue(name);
1191        case TEMP_TOPIC:
1192            return new ActiveMQTempTopic(name);
1193        default:
1194            throw new IllegalArgumentException("Not in the valid destination format");
1195        }
1196    }
1197
1198    public TransactionIdTransformer getTransactionIdTransformer() {
1199        return transactionIdTransformer;
1200    }
1201
1202    public void setTransactionIdTransformer(TransactionIdTransformer transactionIdTransformer) {
1203        this.transactionIdTransformer = transactionIdTransformer;
1204    }
1205
1206    static class AsyncJobKey {
1207        MessageId id;
1208        ActiveMQDestination destination;
1209
1210        AsyncJobKey(MessageId id, ActiveMQDestination destination) {
1211            this.id = id;
1212            this.destination = destination;
1213        }
1214
1215        @Override
1216        public boolean equals(Object obj) {
1217            if (obj == this) {
1218                return true;
1219            }
1220            return obj instanceof AsyncJobKey && id.equals(((AsyncJobKey) obj).id)
1221                    && destination.equals(((AsyncJobKey) obj).destination);
1222        }
1223
1224        @Override
1225        public int hashCode() {
1226            return id.hashCode() + destination.hashCode();
1227        }
1228
1229        @Override
1230        public String toString() {
1231            return destination.getPhysicalName() + "-" + id;
1232        }
1233    }
1234
1235    public interface StoreTask {
1236        public boolean cancel();
1237
1238        public void aquireLocks();
1239
1240        public void releaseLocks();
1241    }
1242
1243    class StoreQueueTask implements Runnable, StoreTask {
1244        protected final Message message;
1245        protected final ConnectionContext context;
1246        protected final KahaDBMessageStore store;
1247        protected final InnerFutureTask future;
1248        protected final AtomicBoolean done = new AtomicBoolean();
1249        protected final AtomicBoolean locked = new AtomicBoolean();
1250
1251        public StoreQueueTask(KahaDBMessageStore store, ConnectionContext context, Message message) {
1252            this.store = store;
1253            this.context = context;
1254            this.message = message;
1255            this.future = new InnerFutureTask(this);
1256        }
1257
1258        public ListenableFuture<Object> getFuture() {
1259            return this.future;
1260        }
1261
1262        @Override
1263        public boolean cancel() {
1264            if (this.done.compareAndSet(false, true)) {
1265                return this.future.cancel(false);
1266            }
1267            return false;
1268        }
1269
1270        @Override
1271        public void aquireLocks() {
1272            if (this.locked.compareAndSet(false, true)) {
1273                try {
1274                    globalQueueSemaphore.acquire();
1275                    store.acquireLocalAsyncLock();
1276                    message.incrementReferenceCount();
1277                } catch (InterruptedException e) {
1278                    LOG.warn("Failed to aquire lock", e);
1279                }
1280            }
1281
1282        }
1283
1284        @Override
1285        public void releaseLocks() {
1286            if (this.locked.compareAndSet(true, false)) {
1287                store.releaseLocalAsyncLock();
1288                globalQueueSemaphore.release();
1289                message.decrementReferenceCount();
1290            }
1291        }
1292
1293        @Override
1294        public void run() {
1295            this.store.doneTasks++;
1296            try {
1297                if (this.done.compareAndSet(false, true)) {
1298                    this.store.addMessage(context, message);
1299                    removeQueueTask(this.store, this.message.getMessageId());
1300                    this.future.complete();
1301                } else if (cancelledTaskModMetric > 0 && this.store.canceledTasks++ % cancelledTaskModMetric == 0) {
1302                    System.err.println(this.store.dest.getName() + " cancelled: "
1303                            + (this.store.canceledTasks / this.store.doneTasks) * 100);
1304                    this.store.canceledTasks = this.store.doneTasks = 0;
1305                }
1306            } catch (Exception e) {
1307                this.future.setException(e);
1308            }
1309        }
1310
1311        protected Message getMessage() {
1312            return this.message;
1313        }
1314
1315        private class InnerFutureTask extends FutureTask<Object> implements ListenableFuture<Object>  {
1316
1317            private Runnable listener;
1318            public InnerFutureTask(Runnable runnable) {
1319                super(runnable, null);
1320
1321            }
1322
1323            public void setException(final Exception e) {
1324                super.setException(e);
1325            }
1326
1327            public void complete() {
1328                super.set(null);
1329            }
1330
1331            @Override
1332            public void done() {
1333                fireListener();
1334            }
1335
1336            @Override
1337            public void addListener(Runnable listener) {
1338                this.listener = listener;
1339                if (isDone()) {
1340                    fireListener();
1341                }
1342            }
1343
1344            private void fireListener() {
1345                if (listener != null) {
1346                    try {
1347                        listener.run();
1348                    } catch (Exception ignored) {
1349                        LOG.warn("Unexpected exception from future {} listener callback {}", this, listener, ignored);
1350                    }
1351                }
1352            }
1353        }
1354    }
1355
1356    class StoreTopicTask extends StoreQueueTask {
1357        private final int subscriptionCount;
1358        private final List<String> subscriptionKeys = new ArrayList<String>(1);
1359        private final KahaDBTopicMessageStore topicStore;
1360        public StoreTopicTask(KahaDBTopicMessageStore store, ConnectionContext context, Message message,
1361                int subscriptionCount) {
1362            super(store, context, message);
1363            this.topicStore = store;
1364            this.subscriptionCount = subscriptionCount;
1365
1366        }
1367
1368        @Override
1369        public void aquireLocks() {
1370            if (this.locked.compareAndSet(false, true)) {
1371                try {
1372                    globalTopicSemaphore.acquire();
1373                    store.acquireLocalAsyncLock();
1374                    message.incrementReferenceCount();
1375                } catch (InterruptedException e) {
1376                    LOG.warn("Failed to aquire lock", e);
1377                }
1378            }
1379        }
1380
1381        @Override
1382        public void releaseLocks() {
1383            if (this.locked.compareAndSet(true, false)) {
1384                message.decrementReferenceCount();
1385                store.releaseLocalAsyncLock();
1386                globalTopicSemaphore.release();
1387            }
1388        }
1389
1390        /**
1391         * add a key
1392         *
1393         * @param key
1394         * @return true if all acknowledgements received
1395         */
1396        public boolean addSubscriptionKey(String key) {
1397            synchronized (this.subscriptionKeys) {
1398                this.subscriptionKeys.add(key);
1399            }
1400            return this.subscriptionKeys.size() >= this.subscriptionCount;
1401        }
1402
1403        @Override
1404        public void run() {
1405            this.store.doneTasks++;
1406            try {
1407                if (this.done.compareAndSet(false, true)) {
1408                    this.topicStore.addMessage(context, message);
1409                    // apply any acks we have
1410                    synchronized (this.subscriptionKeys) {
1411                        for (String key : this.subscriptionKeys) {
1412                            this.topicStore.doAcknowledge(context, key, this.message.getMessageId(), null);
1413
1414                        }
1415                    }
1416                    removeTopicTask(this.topicStore, this.message.getMessageId());
1417                    this.future.complete();
1418                } else if (cancelledTaskModMetric > 0 && this.store.canceledTasks++ % cancelledTaskModMetric == 0) {
1419                    System.err.println(this.store.dest.getName() + " cancelled: "
1420                            + (this.store.canceledTasks / this.store.doneTasks) * 100);
1421                    this.store.canceledTasks = this.store.doneTasks = 0;
1422                }
1423            } catch (Exception e) {
1424                this.future.setException(e);
1425            }
1426        }
1427    }
1428
1429    public class StoreTaskExecutor extends ThreadPoolExecutor {
1430
1431        public StoreTaskExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit timeUnit, BlockingQueue<Runnable> queue, ThreadFactory threadFactory) {
1432            super(corePoolSize, maximumPoolSize, keepAliveTime, timeUnit, queue, threadFactory);
1433        }
1434
1435        @Override
1436        protected void afterExecute(Runnable runnable, Throwable throwable) {
1437            super.afterExecute(runnable, throwable);
1438
1439            if (runnable instanceof StoreTask) {
1440               ((StoreTask)runnable).releaseLocks();
1441            }
1442        }
1443    }
1444
1445    @Override
1446    public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException {
1447        return new JobSchedulerStoreImpl();
1448    }
1449}