001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadb;
018
019import java.io.File;
020import java.io.FileFilter;
021import java.io.IOException;
022import java.nio.charset.Charset;
023import java.util.Collections;
024import java.util.HashMap;
025import java.util.HashSet;
026import java.util.List;
027import java.util.Map;
028import java.util.Set;
029import java.util.concurrent.CopyOnWriteArrayList;
030
031import javax.transaction.xa.Xid;
032
033import org.apache.activemq.broker.BrokerService;
034import org.apache.activemq.broker.BrokerServiceAware;
035import org.apache.activemq.broker.ConnectionContext;
036import org.apache.activemq.broker.Lockable;
037import org.apache.activemq.broker.LockableServiceSupport;
038import org.apache.activemq.broker.Locker;
039import org.apache.activemq.broker.scheduler.JobSchedulerStore;
040import org.apache.activemq.command.ActiveMQDestination;
041import org.apache.activemq.command.ActiveMQQueue;
042import org.apache.activemq.command.ActiveMQTopic;
043import org.apache.activemq.command.LocalTransactionId;
044import org.apache.activemq.command.ProducerId;
045import org.apache.activemq.command.TransactionId;
046import org.apache.activemq.command.XATransactionId;
047import org.apache.activemq.filter.AnyDestination;
048import org.apache.activemq.filter.DestinationMap;
049import org.apache.activemq.filter.DestinationMapEntry;
050import org.apache.activemq.store.MessageStore;
051import org.apache.activemq.store.PersistenceAdapter;
052import org.apache.activemq.store.SharedFileLocker;
053import org.apache.activemq.store.TopicMessageStore;
054import org.apache.activemq.store.TransactionIdTransformer;
055import org.apache.activemq.store.TransactionIdTransformerAware;
056import org.apache.activemq.store.TransactionStore;
057import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
058import org.apache.activemq.usage.SystemUsage;
059import org.apache.activemq.util.IOExceptionSupport;
060import org.apache.activemq.util.IOHelper;
061import org.apache.activemq.util.IntrospectionSupport;
062import org.apache.activemq.util.ServiceStopper;
063import org.slf4j.Logger;
064import org.slf4j.LoggerFactory;
065
066/**
067 * An implementation of {@link org.apache.activemq.store.PersistenceAdapter}  that supports
068 * distribution of destinations across multiple kahaDB persistence adapters
069 *
070 * @org.apache.xbean.XBean element="mKahaDB"
071 */
072public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implements PersistenceAdapter, BrokerServiceAware {
073    static final Logger LOG = LoggerFactory.getLogger(MultiKahaDBPersistenceAdapter.class);
074
075    final static ActiveMQDestination matchAll = new AnyDestination(new ActiveMQDestination[]{new ActiveMQQueue(">"), new ActiveMQTopic(">")});
076    final int LOCAL_FORMAT_ID_MAGIC = Integer.valueOf(System.getProperty("org.apache.activemq.store.kahadb.MultiKahaDBTransactionStore.localXaFormatId", "61616"));
077
078    final class DelegateDestinationMap extends DestinationMap {
079        @Override
080        public void setEntries(List<DestinationMapEntry>  entries) {
081            super.setEntries(entries);
082        }
083    };
084    final DelegateDestinationMap destinationMap = new DelegateDestinationMap();
085
086    BrokerService brokerService;
087    List<PersistenceAdapter> adapters = new CopyOnWriteArrayList<PersistenceAdapter>();
088    private File directory = new File(IOHelper.getDefaultDataDirectory() + File.separator + "mKahaDB");
089
090    MultiKahaDBTransactionStore transactionStore = new MultiKahaDBTransactionStore(this);
091
092    // all local store transactions are XA, 2pc if more than one adapter involved
093    TransactionIdTransformer transactionIdTransformer = new TransactionIdTransformer() {
094        @Override
095        public TransactionId transform(TransactionId txid) {
096            if (txid == null) {
097                return null;
098            }
099            if (txid.isLocalTransaction()) {
100                final LocalTransactionId t = (LocalTransactionId) txid;
101                return new XATransactionId(new Xid() {
102                    @Override
103                    public int getFormatId() {
104                        return LOCAL_FORMAT_ID_MAGIC;
105                    }
106
107                    @Override
108                    public byte[] getGlobalTransactionId() {
109                        return t.getConnectionId().getValue().getBytes(Charset.forName("utf-8"));
110                    }
111
112                    @Override
113                    public byte[] getBranchQualifier() {
114                        return Long.toString(t.getValue()).getBytes(Charset.forName("utf-8"));
115                    }
116                });
117            } else {
118                return txid;
119            }
120        }
121    };
122
123    /**
124     * Sets the  FilteredKahaDBPersistenceAdapter entries
125     *
126     * @org.apache.xbean.ElementType class="org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter"
127     */
128    @SuppressWarnings({ "rawtypes", "unchecked" })
129    public void setFilteredPersistenceAdapters(List entries) {
130        for (Object entry : entries) {
131            FilteredKahaDBPersistenceAdapter filteredAdapter = (FilteredKahaDBPersistenceAdapter) entry;
132            PersistenceAdapter adapter = filteredAdapter.getPersistenceAdapter();
133            if (filteredAdapter.getDestination() == null) {
134                filteredAdapter.setDestination(matchAll);
135            }
136
137            if (filteredAdapter.isPerDestination()) {
138                configureDirectory(adapter, null);
139                // per destination adapters will be created on demand or during recovery
140                continue;
141            } else {
142                configureDirectory(adapter, nameFromDestinationFilter(filteredAdapter.getDestination()));
143            }
144
145            configureAdapter(adapter);
146            adapters.add(adapter);
147        }
148        destinationMap.setEntries(entries);
149    }
150
151    private String nameFromDestinationFilter(ActiveMQDestination destination) {
152        if (destination.getQualifiedName().length() > IOHelper.getMaxFileNameLength()) {
153            LOG.warn("Destination name is longer than 'MaximumFileNameLength' system property, " +
154                     "potential problem with recovery can result from name truncation.");
155        }
156
157        return IOHelper.toFileSystemSafeName(destination.getQualifiedName());
158    }
159
160    public boolean isLocalXid(TransactionId xid) {
161        return xid instanceof XATransactionId &&
162                ((XATransactionId)xid).getFormatId() == LOCAL_FORMAT_ID_MAGIC;
163    }
164
165    @Override
166    public void beginTransaction(ConnectionContext context) throws IOException {
167        throw new IllegalStateException();
168    }
169
170    @Override
171    public void checkpoint(final boolean sync) throws IOException {
172        for (PersistenceAdapter persistenceAdapter : adapters) {
173            persistenceAdapter.checkpoint(sync);
174        }
175    }
176
177    @Override
178    public void commitTransaction(ConnectionContext context) throws IOException {
179        throw new IllegalStateException();
180    }
181
182    @Override
183    public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
184        PersistenceAdapter persistenceAdapter = getMatchingPersistenceAdapter(destination);
185        return transactionStore.proxy(persistenceAdapter.createTransactionStore(), persistenceAdapter.createQueueMessageStore(destination));
186    }
187
188    private PersistenceAdapter getMatchingPersistenceAdapter(ActiveMQDestination destination) throws IOException {
189        Object result = destinationMap.chooseValue(destination);
190        if (result == null) {
191            throw new RuntimeException("No matching persistence adapter configured for destination: " + destination + ", options:" + adapters);
192        }
193        FilteredKahaDBPersistenceAdapter filteredAdapter = (FilteredKahaDBPersistenceAdapter) result;
194        if (filteredAdapter.getDestination() == matchAll && filteredAdapter.isPerDestination()) {
195            filteredAdapter = addAdapter(filteredAdapter, destination);
196            if (LOG.isTraceEnabled()) {
197                LOG.info("created per destination adapter for: " + destination  + ", " + result);
198            }
199        }
200        startAdapter(filteredAdapter.getPersistenceAdapter(), destination.getQualifiedName());
201        LOG.debug("destination {} matched persistence adapter {}", new Object[]{destination.getQualifiedName(), filteredAdapter.getPersistenceAdapter()});
202        return filteredAdapter.getPersistenceAdapter();
203    }
204
205    private void startAdapter(PersistenceAdapter kahaDBPersistenceAdapter, String destination) {
206        try {
207            kahaDBPersistenceAdapter.start();
208        } catch (Exception e) {
209            RuntimeException detail = new RuntimeException("Failed to start per destination persistence adapter for destination: " + destination + ", options:" + adapters, e);
210            LOG.error(detail.toString(), e);
211            throw detail;
212        }
213    }
214
215    private void stopAdapter(PersistenceAdapter kahaDBPersistenceAdapter, String destination) {
216        try {
217            kahaDBPersistenceAdapter.stop();
218        } catch (Exception e) {
219            RuntimeException detail = new RuntimeException("Failed to stop per destination persistence adapter for destination: " + destination + ", options:" + adapters, e);
220            LOG.error(detail.toString(), e);
221            throw detail;
222        }
223    }
224
225    @Override
226    public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
227        PersistenceAdapter persistenceAdapter = getMatchingPersistenceAdapter(destination);
228        return transactionStore.proxy(persistenceAdapter.createTransactionStore(), persistenceAdapter.createTopicMessageStore(destination));
229    }
230
231    @Override
232    public TransactionStore createTransactionStore() throws IOException {
233        return transactionStore;
234    }
235
236    @Override
237    public void deleteAllMessages() throws IOException {
238        for (PersistenceAdapter persistenceAdapter : adapters) {
239            persistenceAdapter.deleteAllMessages();
240        }
241        transactionStore.deleteAllMessages();
242        IOHelper.deleteChildren(getDirectory());
243    }
244
245    @Override
246    public Set<ActiveMQDestination> getDestinations() {
247        Set<ActiveMQDestination> results = new HashSet<ActiveMQDestination>();
248        for (PersistenceAdapter persistenceAdapter : adapters) {
249            results.addAll(persistenceAdapter.getDestinations());
250        }
251        return results;
252    }
253
254    @Override
255    public long getLastMessageBrokerSequenceId() throws IOException {
256        long maxId = -1;
257        for (PersistenceAdapter persistenceAdapter : adapters) {
258            maxId = Math.max(maxId, persistenceAdapter.getLastMessageBrokerSequenceId());
259        }
260        return maxId;
261    }
262
263    @Override
264    public long getLastProducerSequenceId(ProducerId id) throws IOException {
265        long maxId = -1;
266        for (PersistenceAdapter persistenceAdapter : adapters) {
267            maxId = Math.max(maxId, persistenceAdapter.getLastProducerSequenceId(id));
268        }
269        return maxId;
270    }
271
272    @Override
273    public void removeQueueMessageStore(ActiveMQQueue destination) {
274        PersistenceAdapter adapter = null;
275        try {
276            adapter = getMatchingPersistenceAdapter(destination);
277        } catch (IOException e) {
278            throw new RuntimeException(e);
279        }
280        if (adapter instanceof PersistenceAdapter) {
281            adapter.removeQueueMessageStore(destination);
282            removeMessageStore(adapter, destination);
283            destinationMap.removeAll(destination);
284        }
285    }
286
287    @Override
288    public void removeTopicMessageStore(ActiveMQTopic destination) {
289        PersistenceAdapter adapter = null;
290        try {
291            adapter = getMatchingPersistenceAdapter(destination);
292        } catch (IOException e) {
293            throw new RuntimeException(e);
294        }
295        if (adapter instanceof PersistenceAdapter) {
296            adapter.removeTopicMessageStore(destination);
297            removeMessageStore(adapter, destination);
298            destinationMap.removeAll(destination);
299        }
300    }
301
302    private void removeMessageStore(PersistenceAdapter adapter, ActiveMQDestination destination) {
303        if (adapter.getDestinations().isEmpty()) {
304            stopAdapter(adapter, destination.toString());
305            File adapterDir = adapter.getDirectory();
306            if (adapterDir != null) {
307                if (IOHelper.deleteFile(adapterDir)) {
308                    if (LOG.isTraceEnabled()) {
309                        LOG.info("deleted per destination adapter directory for: " + destination);
310                    }
311                } else {
312                    if (LOG.isTraceEnabled()) {
313                        LOG.info("failed to deleted per destination adapter directory for: " + destination);
314                    }
315                }
316            }
317        }
318    }
319
320    @Override
321    public void rollbackTransaction(ConnectionContext context) throws IOException {
322        throw new IllegalStateException();
323    }
324
325    @Override
326    public void setBrokerName(String brokerName) {
327        for (PersistenceAdapter persistenceAdapter : adapters) {
328            persistenceAdapter.setBrokerName(brokerName);
329        }
330    }
331
332    @Override
333    public void setUsageManager(SystemUsage usageManager) {
334        for (PersistenceAdapter persistenceAdapter : adapters) {
335            persistenceAdapter.setUsageManager(usageManager);
336        }
337    }
338
339    @Override
340    public long size() {
341        long size = 0;
342        for (PersistenceAdapter persistenceAdapter : adapters) {
343            size += persistenceAdapter.size();
344        }
345        return size;
346    }
347
348    @Override
349    public void doStart() throws Exception {
350        Object result = destinationMap.chooseValue(matchAll);
351        if (result != null) {
352            FilteredKahaDBPersistenceAdapter filteredAdapter = (FilteredKahaDBPersistenceAdapter) result;
353            if (filteredAdapter.getDestination() == matchAll && filteredAdapter.isPerDestination()) {
354                findAndRegisterExistingAdapters(filteredAdapter);
355            }
356        }
357        for (PersistenceAdapter persistenceAdapter : adapters) {
358            persistenceAdapter.start();
359        }
360    }
361
362    private void findAndRegisterExistingAdapters(FilteredKahaDBPersistenceAdapter template) throws IOException {
363        FileFilter destinationNames = new FileFilter() {
364            @Override
365            public boolean accept(File file) {
366                return file.getName().startsWith("queue#") || file.getName().startsWith("topic#");
367            }
368        };
369        File[] candidates = template.getPersistenceAdapter().getDirectory().listFiles(destinationNames);
370        if (candidates != null) {
371            for (File candidate : candidates) {
372                registerExistingAdapter(template, candidate);
373            }
374        }
375    }
376
377    private void registerExistingAdapter(FilteredKahaDBPersistenceAdapter filteredAdapter, File candidate) throws IOException {
378        PersistenceAdapter adapter = adapterFromTemplate(filteredAdapter.getPersistenceAdapter(), candidate.getName());
379        startAdapter(adapter, candidate.getName());
380        Set<ActiveMQDestination> destinations = adapter.getDestinations();
381        if (destinations.size() != 0) {
382            registerAdapter(adapter, destinations.toArray(new ActiveMQDestination[]{})[0]);
383        } else {
384            stopAdapter(adapter, candidate.getName());
385        }
386    }
387
388    private FilteredKahaDBPersistenceAdapter addAdapter(FilteredKahaDBPersistenceAdapter filteredAdapter, ActiveMQDestination destination) throws IOException {
389        PersistenceAdapter adapter = adapterFromTemplate(filteredAdapter.getPersistenceAdapter(), nameFromDestinationFilter(destination));
390        return registerAdapter(adapter, destination);
391    }
392
393    private PersistenceAdapter adapterFromTemplate(PersistenceAdapter template, String destinationName) throws IOException {
394        PersistenceAdapter adapter = kahaDBFromTemplate(template);
395        configureAdapter(adapter);
396        configureDirectory(adapter, destinationName);
397        return adapter;
398    }
399
400    private void configureDirectory(PersistenceAdapter adapter, String fileName) {
401        File directory = null;
402        File defaultDir = MessageDatabase.DEFAULT_DIRECTORY;
403        try {
404            defaultDir = adapter.getClass().newInstance().getDirectory();
405        } catch (Exception e) {
406        }
407        if (defaultDir.equals(adapter.getDirectory())) {
408            // not set so inherit from mkahadb
409            directory = getDirectory();
410        } else {
411            directory = adapter.getDirectory();
412        }
413
414        if (fileName != null) {
415            directory = new File(directory, fileName);
416        }
417        adapter.setDirectory(directory);
418    }
419
420    private FilteredKahaDBPersistenceAdapter registerAdapter(PersistenceAdapter adapter, ActiveMQDestination destination) {
421        adapters.add(adapter);
422        FilteredKahaDBPersistenceAdapter result = new FilteredKahaDBPersistenceAdapter(destination, adapter);
423        destinationMap.put(destination, result);
424        return result;
425    }
426
427    private void configureAdapter(PersistenceAdapter adapter) {
428        // need a per store factory that will put the store in the branch qualifier to disiambiguate xid mbeans
429        ((TransactionIdTransformerAware)adapter).setTransactionIdTransformer(transactionIdTransformer);
430        if (isUseLock()) {
431            if( adapter instanceof Lockable ) {
432                ((Lockable)adapter).setUseLock(false);
433            }
434        }
435        if( adapter instanceof BrokerServiceAware ) {
436            ((BrokerServiceAware)adapter).setBrokerService(getBrokerService());
437        }
438    }
439
440    private PersistenceAdapter kahaDBFromTemplate(PersistenceAdapter template) throws IOException {
441        try {
442            Map<String, Object> configuration = new HashMap<String, Object>();
443            IntrospectionSupport.getProperties(template, configuration, null);
444            PersistenceAdapter adapter = template.getClass().newInstance();
445            IntrospectionSupport.setProperties(adapter, configuration);
446            return adapter;
447        } catch (Exception e) {
448            throw IOExceptionSupport.create(e);
449        }
450    }
451
452    @Override
453    protected void doStop(ServiceStopper stopper) throws Exception {
454        for (PersistenceAdapter persistenceAdapter : adapters) {
455            stopper.stop(persistenceAdapter);
456        }
457    }
458
459    @Override
460    public File getDirectory() {
461        return this.directory;
462    }
463
464    @Override
465    public void setDirectory(File directory) {
466        this.directory = directory;
467    }
468
469    @Override
470    public void init() throws Exception {
471    }
472
473    @Override
474    public void setBrokerService(BrokerService brokerService) {
475        this.brokerService = brokerService;
476        for (PersistenceAdapter persistenceAdapter : adapters) {
477            if( persistenceAdapter instanceof BrokerServiceAware ) {
478                ((BrokerServiceAware)persistenceAdapter).setBrokerService(getBrokerService());
479            }
480        }
481    }
482
483    @Override
484    public BrokerService getBrokerService() {
485        return brokerService;
486    }
487
488    public void setTransactionStore(MultiKahaDBTransactionStore transactionStore) {
489        this.transactionStore = transactionStore;
490    }
491
492    /**
493     * Set the max file length of the transaction journal
494     * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can
495     * be used
496     *
497     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
498     */
499    public void setJournalMaxFileLength(int maxFileLength) {
500        transactionStore.setJournalMaxFileLength(maxFileLength);
501    }
502
503    public int getJournalMaxFileLength() {
504        return transactionStore.getJournalMaxFileLength();
505    }
506
507    /**
508     * Set the max write batch size of  the transaction journal
509     * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can
510     * be used
511     *
512     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
513     */
514    public void setJournalWriteBatchSize(int journalWriteBatchSize) {
515        transactionStore.setJournalMaxWriteBatchSize(journalWriteBatchSize);
516    }
517
518    public int getJournalWriteBatchSize() {
519        return transactionStore.getJournalMaxWriteBatchSize();
520    }
521
522    public List<PersistenceAdapter> getAdapters() {
523        return Collections.unmodifiableList(adapters);
524    }
525
526    @Override
527    public String toString() {
528        String path = getDirectory() != null ? getDirectory().getAbsolutePath() : "DIRECTORY_NOT_SET";
529        return "MultiKahaDBPersistenceAdapter[" + path + "]" + adapters;
530    }
531
532    @Override
533    public Locker createDefaultLocker() throws IOException {
534        SharedFileLocker locker = new SharedFileLocker();
535        locker.configure(this);
536        return locker;
537    }
538
539    @Override
540    public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException {
541        return new JobSchedulerStoreImpl();
542    }
543}