001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker;
018
019import java.io.BufferedReader;
020import java.io.File;
021import java.io.IOException;
022import java.io.InputStream;
023import java.io.InputStreamReader;
024import java.net.URI;
025import java.net.URISyntaxException;
026import java.net.UnknownHostException;
027import java.security.Provider;
028import java.security.Security;
029import java.util.ArrayList;
030import java.util.Date;
031import java.util.HashMap;
032import java.util.HashSet;
033import java.util.Iterator;
034import java.util.List;
035import java.util.Locale;
036import java.util.Map;
037import java.util.Set;
038import java.util.concurrent.CopyOnWriteArrayList;
039import java.util.concurrent.CountDownLatch;
040import java.util.concurrent.LinkedBlockingQueue;
041import java.util.concurrent.RejectedExecutionException;
042import java.util.concurrent.RejectedExecutionHandler;
043import java.util.concurrent.SynchronousQueue;
044import java.util.concurrent.ThreadFactory;
045import java.util.concurrent.ThreadPoolExecutor;
046import java.util.concurrent.TimeUnit;
047import java.util.concurrent.atomic.AtomicBoolean;
048import java.util.concurrent.atomic.AtomicInteger;
049import java.util.concurrent.atomic.AtomicLong;
050
051import javax.annotation.PostConstruct;
052import javax.annotation.PreDestroy;
053import javax.management.MalformedObjectNameException;
054import javax.management.ObjectName;
055
056import org.apache.activemq.ActiveMQConnectionMetaData;
057import org.apache.activemq.ConfigurationException;
058import org.apache.activemq.Service;
059import org.apache.activemq.advisory.AdvisoryBroker;
060import org.apache.activemq.broker.cluster.ConnectionSplitBroker;
061import org.apache.activemq.broker.jmx.AnnotatedMBean;
062import org.apache.activemq.broker.jmx.BrokerMBeanSupport;
063import org.apache.activemq.broker.jmx.BrokerView;
064import org.apache.activemq.broker.jmx.ConnectorView;
065import org.apache.activemq.broker.jmx.ConnectorViewMBean;
066import org.apache.activemq.broker.jmx.HealthView;
067import org.apache.activemq.broker.jmx.HealthViewMBean;
068import org.apache.activemq.broker.jmx.JmsConnectorView;
069import org.apache.activemq.broker.jmx.JobSchedulerView;
070import org.apache.activemq.broker.jmx.JobSchedulerViewMBean;
071import org.apache.activemq.broker.jmx.Log4JConfigView;
072import org.apache.activemq.broker.jmx.ManagedRegionBroker;
073import org.apache.activemq.broker.jmx.ManagementContext;
074import org.apache.activemq.broker.jmx.NetworkConnectorView;
075import org.apache.activemq.broker.jmx.NetworkConnectorViewMBean;
076import org.apache.activemq.broker.jmx.ProxyConnectorView;
077import org.apache.activemq.broker.region.CompositeDestinationInterceptor;
078import org.apache.activemq.broker.region.Destination;
079import org.apache.activemq.broker.region.DestinationFactory;
080import org.apache.activemq.broker.region.DestinationFactoryImpl;
081import org.apache.activemq.broker.region.DestinationInterceptor;
082import org.apache.activemq.broker.region.RegionBroker;
083import org.apache.activemq.broker.region.policy.PolicyMap;
084import org.apache.activemq.broker.region.virtual.MirroredQueue;
085import org.apache.activemq.broker.region.virtual.VirtualDestination;
086import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
087import org.apache.activemq.broker.region.virtual.VirtualTopic;
088import org.apache.activemq.broker.scheduler.JobSchedulerStore;
089import org.apache.activemq.broker.scheduler.SchedulerBroker;
090import org.apache.activemq.broker.scheduler.memory.InMemoryJobSchedulerStore;
091import org.apache.activemq.command.ActiveMQDestination;
092import org.apache.activemq.command.ActiveMQQueue;
093import org.apache.activemq.command.BrokerId;
094import org.apache.activemq.command.ProducerInfo;
095import org.apache.activemq.filter.DestinationFilter;
096import org.apache.activemq.network.ConnectionFilter;
097import org.apache.activemq.network.DiscoveryNetworkConnector;
098import org.apache.activemq.network.NetworkConnector;
099import org.apache.activemq.network.jms.JmsConnector;
100import org.apache.activemq.openwire.OpenWireFormat;
101import org.apache.activemq.proxy.ProxyConnector;
102import org.apache.activemq.security.MessageAuthorizationPolicy;
103import org.apache.activemq.selector.SelectorParser;
104import org.apache.activemq.store.JournaledStore;
105import org.apache.activemq.store.PListStore;
106import org.apache.activemq.store.PersistenceAdapter;
107import org.apache.activemq.store.PersistenceAdapterFactory;
108import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
109import org.apache.activemq.thread.Scheduler;
110import org.apache.activemq.thread.TaskRunnerFactory;
111import org.apache.activemq.transport.TransportFactorySupport;
112import org.apache.activemq.transport.TransportServer;
113import org.apache.activemq.transport.vm.VMTransportFactory;
114import org.apache.activemq.usage.StoreUsage;
115import org.apache.activemq.usage.SystemUsage;
116import org.apache.activemq.usage.Usage;
117import org.apache.activemq.util.BrokerSupport;
118import org.apache.activemq.util.DefaultIOExceptionHandler;
119import org.apache.activemq.util.IOExceptionHandler;
120import org.apache.activemq.util.IOExceptionSupport;
121import org.apache.activemq.util.IOHelper;
122import org.apache.activemq.util.InetAddressUtil;
123import org.apache.activemq.util.ServiceStopper;
124import org.apache.activemq.util.ThreadPoolUtils;
125import org.apache.activemq.util.TimeUtils;
126import org.apache.activemq.util.URISupport;
127import org.slf4j.Logger;
128import org.slf4j.LoggerFactory;
129import org.slf4j.MDC;
130
131/**
132 * Manages the life-cycle of an ActiveMQ Broker. A BrokerService consists of a
133 * number of transport connectors, network connectors and a bunch of properties
 * which can be used to configure the broker as it is lazily created.
135 *
136 * @org.apache.xbean.XBean
137 */
138public class BrokerService implements Service {
    // ---------------------------------------------------------------------
    // Public constants
    // ---------------------------------------------------------------------
    public static final String DEFAULT_PORT = "61616";
    // Resolved once in the static initializer; stays "localhost" when the
    // local host name cannot be resolved.
    public static final String LOCAL_HOST_NAME;
    // First line of /org/apache/activemq/version.txt as read by the static
    // initializer; null when the resource is missing or cannot be read.
    public static final String BROKER_VERSION;
    public static final String DEFAULT_BROKER_NAME = "localhost";
    public static final int DEFAULT_MAX_FILE_LENGTH = 1024 * 1024 * 32;
    public static final long DEFAULT_START_TIMEOUT = 600000L;

    private static final Logger LOG = LoggerFactory.getLogger(BrokerService.class);

    // NOTE(review): no Serializable declaration is visible on this class, hence
    // the "unused" suppression — confirm whether this constant is still needed.
    @SuppressWarnings("unused")
    private static final long serialVersionUID = 7353129142305630237L;

    // ---------------------------------------------------------------------
    // Configuration and runtime state
    // ---------------------------------------------------------------------
    private boolean useJmx = true;
    private boolean enableStatistics = true;
    private boolean persistent = true;
    private boolean populateJMSXUserID;
    private boolean useAuthenticatedPrincipalForJMSXUserID;
    private boolean populateUserNameInMBeans;
    private long mbeanInvocationTimeout = 0;

    private boolean useShutdownHook = true;
    private boolean useLoggingForShutdownErrors;
    // Consulted by masterFailed(): true stops this broker, false starts all connectors.
    private boolean shutdownOnMasterFailure;
    private boolean shutdownOnSlaveFailure;
    private boolean waitForSlave;
    private long waitForSlaveTimeout = DEFAULT_START_TIMEOUT;
    private boolean passiveSlave;
    private String brokerName = DEFAULT_BROKER_NAME;
    private File dataDirectoryFile;
    private File tmpDataDirectory;
    private Broker broker;
    private BrokerView adminView;
    private ManagementContext managementContext;
    private ObjectName brokerObjectName;
    private TaskRunnerFactory taskRunnerFactory;
    private TaskRunnerFactory persistenceTaskRunnerFactory;
    private SystemUsage systemUsage;
    private SystemUsage producerSystemUsage;
    // NOTE: the field name is misspelled ("Usaage"); it is left unchanged here
    // because a rename would have to be applied to every reference in the class.
    private SystemUsage consumerSystemUsaage;
    private PersistenceAdapter persistenceAdapter;
    private PersistenceAdapterFactory persistenceFactory;
    protected DestinationFactory destinationFactory;
    private MessageAuthorizationPolicy messageAuthorizationPolicy;
    private final List<TransportConnector> transportConnectors = new CopyOnWriteArrayList<TransportConnector>();
    private final List<NetworkConnector> networkConnectors = new CopyOnWriteArrayList<NetworkConnector>();
    private final List<ProxyConnector> proxyConnectors = new CopyOnWriteArrayList<ProxyConnector>();
    private final List<JmsConnector> jmsConnectors = new CopyOnWriteArrayList<JmsConnector>();
    // Unlike the connector lists above this is a plain ArrayList.
    private final List<Service> services = new ArrayList<Service>();
    private transient Thread shutdownHook;
    private String[] transportConnectorURIs;
    private String[] networkConnectorURIs;
    private JmsConnector[] jmsBridgeConnectors; // these are Jms to Jms bridges
    // to other jms messaging systems
    private boolean deleteAllMessagesOnStartup;
    private boolean advisorySupport = true;
    private URI vmConnectorURI;
    private String defaultSocketURIString;
    private PolicyMap destinationPolicy;
    // Lifecycle flags consulted and updated by start() and stop().
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final AtomicBoolean stopped = new AtomicBoolean(false);
    private final AtomicBoolean stopping = new AtomicBoolean(false);
    private BrokerPlugin[] plugins;
    private boolean keepDurableSubsActive = true;
    private boolean useVirtualTopics = true;
    private boolean useMirroredQueues = false;
    private boolean useTempMirroredQueues = true;
    private BrokerId brokerId;
    private volatile DestinationInterceptor[] destinationInterceptors;
    private ActiveMQDestination[] destinations;
    private PListStore tempDataStore;
    private int persistenceThreadPriority = Thread.MAX_PRIORITY;
    private boolean useLocalHostBrokerName;
    private final CountDownLatch stoppedLatch = new CountDownLatch(1);
    // Counted down at the end of doStartBroker(); isStarted() requires it to be zero.
    private final CountDownLatch startedLatch = new CountDownLatch(1);
    private Broker regionBroker;
    private int producerSystemUsagePortion = 60;
    private int consumerSystemUsagePortion = 40;
    private boolean splitSystemUsageForProducersConsumers;
    private boolean monitorConnectionSplits = false;
    private int taskRunnerPriority = Thread.NORM_PRIORITY;
    private boolean dedicatedTaskRunner;
    private boolean cacheTempDestinations = false;// useful for failover
    private int timeBeforePurgeTempDestinations = 5000;
    private final List<Runnable> shutdownHooks = new ArrayList<Runnable>();
    // start() rejects the combination systemExitOnShutdown && useShutdownHook.
    private boolean systemExitOnShutdown;
    private int systemExitOnShutdownExitCode;
    private SslContext sslContext;
    private boolean forceStart = false;
    private IOExceptionHandler ioExceptionHandler;
    private boolean schedulerSupport = false;
    private File schedulerDirectoryFile;
    private Scheduler scheduler;
    private ThreadPoolExecutor executor;
    private int schedulePeriodForDestinationPurge= 0;
    private int maxPurgedDestinationsPerSweep = 0;
    private boolean adjustUsageLimits = true;
    private BrokerContext brokerContext;
    private boolean networkConnectorStartAsync = false;
    private boolean allowTempAutoCreationOnSend;
    private JobSchedulerStore jobSchedulerStore;
    private final AtomicLong totalConnections = new AtomicLong();
    private final AtomicInteger currentConnections = new AtomicInteger();

    private long offlineDurableSubscriberTimeout = -1;
    private long offlineDurableSubscriberTaskSchedule = 300000;
    private DestinationFilter virtualConsumerDestinationFilter;

    // Monitor used to hand over from the asynchronous persistence adapter start
    // thread to the asynchronous broker start thread.
    private final Object persistenceAdapterLock = new Object();
    // Written through setStartException(); start() clears it and stop() records
    // a BrokerStoppedException.
    private Throwable startException = null;
    // When true, start() runs the persistence adapter and broker start-up on
    // separate threads.
    private boolean startAsync = false;
    // Assigned in start(); getUptimeMillis() reports 0 while it is still null.
    private Date startDate;
    private boolean slave = true;

    private boolean restartAllowed = true;
    private boolean restartRequested = false;
    private boolean rejectDurableConsumers = false;
    private boolean rollbackOnlyOnAsyncException = true;

    private int storeOpenWireVersion = OpenWireFormat.DEFAULT_VERSION;
258
259    static {
260
261        try {
262            ClassLoader loader = BrokerService.class.getClassLoader();
263            Class<?> clazz = loader.loadClass("org.bouncycastle.jce.provider.BouncyCastleProvider");
264            Provider bouncycastle = (Provider) clazz.newInstance();
265            Security.insertProviderAt(bouncycastle, 2);
266            LOG.info("Loaded the Bouncy Castle security provider.");
267        } catch(Throwable e) {
268            // No BouncyCastle found so we use the default Java Security Provider
269        }
270
271        String localHostName = "localhost";
272        try {
273            localHostName =  InetAddressUtil.getLocalHostName();
274        } catch (UnknownHostException e) {
275            LOG.error("Failed to resolve localhost");
276        }
277        LOCAL_HOST_NAME = localHostName;
278
279        InputStream in = null;
280        String version = null;
281        if ((in = BrokerService.class.getResourceAsStream("/org/apache/activemq/version.txt")) != null) {
282            BufferedReader reader = new BufferedReader(new InputStreamReader(in));
283            try {
284                version = reader.readLine();
285            } catch(Exception e) {
286            }
287        }
288        BROKER_VERSION = version;
289    }
290
291    @Override
292    public String toString() {
293        return "BrokerService[" + getBrokerName() + "]";
294    }
295
296    private String getBrokerVersion() {
297        String version = ActiveMQConnectionMetaData.PROVIDER_VERSION;
298        if (version == null) {
299            version = BROKER_VERSION;
300        }
301
302        return version;
303    }
304
305    /**
306     * Adds a new transport connector for the given bind address
307     *
308     * @return the newly created and added transport connector
309     * @throws Exception
310     */
311    public TransportConnector addConnector(String bindAddress) throws Exception {
312        return addConnector(new URI(bindAddress));
313    }
314
315    /**
316     * Adds a new transport connector for the given bind address
317     *
318     * @return the newly created and added transport connector
319     * @throws Exception
320     */
321    public TransportConnector addConnector(URI bindAddress) throws Exception {
322        return addConnector(createTransportConnector(bindAddress));
323    }
324
325    /**
326     * Adds a new transport connector for the given TransportServer transport
327     *
328     * @return the newly created and added transport connector
329     * @throws Exception
330     */
331    public TransportConnector addConnector(TransportServer transport) throws Exception {
332        return addConnector(new TransportConnector(transport));
333    }
334
335    /**
336     * Adds a new transport connector
337     *
338     * @return the transport connector
339     * @throws Exception
340     */
341    public TransportConnector addConnector(TransportConnector connector) throws Exception {
342        transportConnectors.add(connector);
343        return connector;
344    }
345
346    /**
347     * Stops and removes a transport connector from the broker.
348     *
349     * @param connector
350     * @return true if the connector has been previously added to the broker
351     * @throws Exception
352     */
353    public boolean removeConnector(TransportConnector connector) throws Exception {
354        boolean rc = transportConnectors.remove(connector);
355        if (rc) {
356            unregisterConnectorMBean(connector);
357        }
358        return rc;
359    }
360
361    /**
362     * Adds a new network connector using the given discovery address
363     *
364     * @return the newly created and added network connector
365     * @throws Exception
366     */
367    public NetworkConnector addNetworkConnector(String discoveryAddress) throws Exception {
368        return addNetworkConnector(new URI(discoveryAddress));
369    }
370
371    /**
372     * Adds a new proxy connector using the given bind address
373     *
374     * @return the newly created and added network connector
375     * @throws Exception
376     */
377    public ProxyConnector addProxyConnector(String bindAddress) throws Exception {
378        return addProxyConnector(new URI(bindAddress));
379    }
380
381    /**
382     * Adds a new network connector using the given discovery address
383     *
384     * @return the newly created and added network connector
385     * @throws Exception
386     */
387    public NetworkConnector addNetworkConnector(URI discoveryAddress) throws Exception {
388        NetworkConnector connector = new DiscoveryNetworkConnector(discoveryAddress);
389        return addNetworkConnector(connector);
390    }
391
392    /**
393     * Adds a new proxy connector using the given bind address
394     *
395     * @return the newly created and added network connector
396     * @throws Exception
397     */
398    public ProxyConnector addProxyConnector(URI bindAddress) throws Exception {
399        ProxyConnector connector = new ProxyConnector();
400        connector.setBind(bindAddress);
401        connector.setRemote(new URI("fanout:multicast://default"));
402        return addProxyConnector(connector);
403    }
404
405    /**
406     * Adds a new network connector to connect this broker to a federated
407     * network
408     */
409    public NetworkConnector addNetworkConnector(NetworkConnector connector) throws Exception {
410        connector.setBrokerService(this);
411        connector.setLocalUri(getVmConnectorURI());
412        // Set a connection filter so that the connector does not establish loop
413        // back connections.
414        connector.setConnectionFilter(new ConnectionFilter() {
415            @Override
416            public boolean connectTo(URI location) {
417                List<TransportConnector> transportConnectors = getTransportConnectors();
418                for (Iterator<TransportConnector> iter = transportConnectors.iterator(); iter.hasNext();) {
419                    try {
420                        TransportConnector tc = iter.next();
421                        if (location.equals(tc.getConnectUri())) {
422                            return false;
423                        }
424                    } catch (Throwable e) {
425                    }
426                }
427                return true;
428            }
429        });
430        networkConnectors.add(connector);
431        return connector;
432    }
433
434    /**
435     * Removes the given network connector without stopping it. The caller
436     * should call {@link NetworkConnector#stop()} to close the connector
437     */
438    public boolean removeNetworkConnector(NetworkConnector connector) {
439        boolean answer = networkConnectors.remove(connector);
440        if (answer) {
441            unregisterNetworkConnectorMBean(connector);
442        }
443        return answer;
444    }
445
446    public ProxyConnector addProxyConnector(ProxyConnector connector) throws Exception {
447        URI uri = getVmConnectorURI();
448        connector.setLocalUri(uri);
449        proxyConnectors.add(connector);
450        if (isUseJmx()) {
451            registerProxyConnectorMBean(connector);
452        }
453        return connector;
454    }
455
456    public JmsConnector addJmsConnector(JmsConnector connector) throws Exception {
457        connector.setBrokerService(this);
458        jmsConnectors.add(connector);
459        if (isUseJmx()) {
460            registerJmsConnectorMBean(connector);
461        }
462        return connector;
463    }
464
465    public JmsConnector removeJmsConnector(JmsConnector connector) {
466        if (jmsConnectors.remove(connector)) {
467            return connector;
468        }
469        return null;
470    }
471
472    public void masterFailed() {
473        if (shutdownOnMasterFailure) {
474            LOG.error("The Master has failed ... shutting down");
475            try {
476                stop();
477            } catch (Exception e) {
478                LOG.error("Failed to stop for master failure", e);
479            }
480        } else {
481            LOG.warn("Master Failed - starting all connectors");
482            try {
483                startAllConnectors();
484                broker.nowMasterBroker();
485            } catch (Exception e) {
486                LOG.error("Failed to startAllConnectors", e);
487            }
488        }
489    }
490
491    public String getUptime() {
492        long delta = getUptimeMillis();
493
494        if (delta == 0) {
495            return "not started";
496        }
497
498        return TimeUtils.printDuration(delta);
499    }
500
501    public long getUptimeMillis() {
502        if (startDate == null) {
503            return 0;
504        }
505
506        return new Date().getTime() - startDate.getTime();
507    }
508
509    public boolean isStarted() {
510        return started.get() && startedLatch.getCount() == 0;
511    }
512
513    /**
514     * Forces a start of the broker.
515     * By default a BrokerService instance that was
516     * previously stopped using BrokerService.stop() cannot be restarted
517     * using BrokerService.start().
518     * This method enforces a restart.
519     * It is not recommended to force a restart of the broker and will not work
520     * for most but some very trivial broker configurations.
521     * For restarting a broker instance we recommend to first call stop() on
522     * the old instance and then recreate a new BrokerService instance.
523     *
524     * @param force - if true enforces a restart.
525     * @throws Exception
526     */
527    public void start(boolean force) throws Exception {
528        forceStart = force;
529        stopped.set(false);
530        started.set(false);
531        start();
532    }
533
534    // Service interface
535    // -------------------------------------------------------------------------
536
    /**
     * Consulted by {@link #autoStart()} to decide whether {@link #start()} is
     * invoked. This implementation always returns {@code true}.
     *
     * @return {@code true} to have {@link #autoStart()} start the broker
     */
    protected boolean shouldAutostart() {
        return true;
    }
540
541    /**
542     * JSR-250 callback wrapper; converts checked exceptions to runtime exceptions
543     *
544     * delegates to autoStart, done to prevent backwards incompatible signature change
545     */
546    @PostConstruct
547    private void postConstruct() {
548        try {
549            autoStart();
550        } catch (Exception ex) {
551            throw new RuntimeException(ex);
552        }
553    }
554
555    /**
556     *
557     * @throws Exception
558     * @org. apache.xbean.InitMethod
559     */
560    public void autoStart() throws Exception {
561        if(shouldAutostart()) {
562            start();
563        }
564    }
565
566    @Override
567    public void start() throws Exception {
568        if (stopped.get() || !started.compareAndSet(false, true)) {
569            // lets just ignore redundant start() calls
570            // as its way too easy to not be completely sure if start() has been
571            // called or not with the gazillion of different configuration
572            // mechanisms
573            // throw new IllegalStateException("Already started.");
574            return;
575        }
576
577        setStartException(null);
578        stopping.set(false);
579        startDate = new Date();
580        MDC.put("activemq.broker", brokerName);
581
582        try {
583            checkMemorySystemUsageLimits();
584            if (systemExitOnShutdown && useShutdownHook) {
585                throw new ConfigurationException("'useShutdownHook' property cannot be be used with 'systemExitOnShutdown', please turn it off (useShutdownHook=false)");
586            }
587            processHelperProperties();
588            if (isUseJmx()) {
589                // need to remove MDC during starting JMX, as that would otherwise causes leaks, as spawned threads inheirt the MDC and
590                // we cannot cleanup clear that during shutdown of the broker.
591                MDC.remove("activemq.broker");
592                try {
593                    startManagementContext();
594                    for (NetworkConnector connector : getNetworkConnectors()) {
595                        registerNetworkConnectorMBean(connector);
596                    }
597                } finally {
598                    MDC.put("activemq.broker", brokerName);
599                }
600            }
601
602            // in jvm master slave, lets not publish over existing broker till we get the lock
603            final BrokerRegistry brokerRegistry = BrokerRegistry.getInstance();
604            if (brokerRegistry.lookup(getBrokerName()) == null) {
605                brokerRegistry.bind(getBrokerName(), BrokerService.this);
606            }
607            startPersistenceAdapter(startAsync);
608            startBroker(startAsync);
609            brokerRegistry.bind(getBrokerName(), BrokerService.this);
610        } catch (Exception e) {
611            LOG.error("Failed to start Apache ActiveMQ ({}, {})", new Object[]{ getBrokerName(), brokerId }, e);
612            try {
613                if (!stopped.get()) {
614                    stop();
615                }
616            } catch (Exception ex) {
617                LOG.warn("Failed to stop broker after failure in start. This exception will be ignored.", ex);
618            }
619            throw e;
620        } finally {
621            MDC.remove("activemq.broker");
622        }
623    }
624
625    private void startPersistenceAdapter(boolean async) throws Exception {
626        if (async) {
627            new Thread("Persistence Adapter Starting Thread") {
628                @Override
629                public void run() {
630                    try {
631                        doStartPersistenceAdapter();
632                    } catch (Throwable e) {
633                        setStartException(e);
634                    } finally {
635                        synchronized (persistenceAdapterLock) {
636                            persistenceAdapterLock.notifyAll();
637                        }
638                    }
639                }
640            }.start();
641        } else {
642            doStartPersistenceAdapter();
643        }
644    }
645
646    private void doStartPersistenceAdapter() throws Exception {
647        PersistenceAdapter persistenceAdapterToStart = getPersistenceAdapter();
648        if (persistenceAdapterToStart == null) {
649            checkStartException();
650            throw new ConfigurationException("Cannot start null persistence adapter");
651        }
652        persistenceAdapterToStart.setUsageManager(getProducerSystemUsage());
653        persistenceAdapterToStart.setBrokerName(getBrokerName());
654        LOG.info("Using Persistence Adapter: {}", persistenceAdapterToStart);
655        if (deleteAllMessagesOnStartup) {
656            deleteAllMessages();
657        }
658        persistenceAdapterToStart.start();
659
660        getTempDataStore();
661        if (tempDataStore != null) {
662            try {
663                // start after we have the store lock
664                tempDataStore.start();
665            } catch (Exception e) {
666                RuntimeException exception = new RuntimeException(
667                        "Failed to start temp data store: " + tempDataStore, e);
668                LOG.error(exception.getLocalizedMessage(), e);
669                throw exception;
670            }
671        }
672
673        getJobSchedulerStore();
674        if (jobSchedulerStore != null) {
675            try {
676                jobSchedulerStore.start();
677            } catch (Exception e) {
678                RuntimeException exception = new RuntimeException(
679                        "Failed to start job scheduler store: " + jobSchedulerStore, e);
680                LOG.error(exception.getLocalizedMessage(), e);
681                throw exception;
682            }
683        }
684    }
685
686    private void startBroker(boolean async) throws Exception {
687        if (async) {
688            new Thread("Broker Starting Thread") {
689                @Override
690                public void run() {
691                    try {
692                        synchronized (persistenceAdapterLock) {
693                            persistenceAdapterLock.wait();
694                        }
695                        doStartBroker();
696                    } catch (Throwable t) {
697                        setStartException(t);
698                    }
699                }
700            }.start();
701        } else {
702            doStartBroker();
703        }
704    }
705
    /**
     * Performs the actual broker start-up: starts the destinations, installs the
     * shutdown hook, creates and starts the broker, wires up the JMX views when
     * JMX is in use, starts all connectors and finally releases the started
     * latch.
     *
     * @throws Exception if any step of the start-up fails
     */
    private void doStartBroker() throws Exception {
        // presumably aborts when a start failure has already been recorded — confirm
        checkStartException();
        startDestinations();
        addShutdownHook();

        broker = getBroker();
        brokerId = broker.getBrokerId();

        // need to log this after creating the broker so we have its id and name
        LOG.info("Apache ActiveMQ {} ({}, {}) is starting", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId });
        broker.start();

        if (isUseJmx()) {
            if (getManagementContext().isCreateConnector() && !getManagementContext().isConnectorStarted()) {
                // try to restart management context
                // typical for slaves that use the same ports as master
                managementContext.stop();
                startManagementContext();
            }
            // assumes the region broker is a ManagedRegionBroker whenever JMX is in use
            ManagedRegionBroker managedBroker = (ManagedRegionBroker) regionBroker;
            managedBroker.setContextBroker(broker);
            adminView.setBroker(managedBroker);
        }

        // Install a default handler only when none has been configured.
        if (ioExceptionHandler == null) {
            setIoExceptionHandler(new DefaultIOExceptionHandler());
        }

        // Register the Log4J configuration MBean under a name derived from the
        // broker's object name, when JMX is in use and Log4J is available.
        if (isUseJmx() && Log4JConfigView.isLog4JAvailable()) {
            ObjectName objectName = BrokerMBeanSupport.createLog4JConfigViewName(getBrokerObjectName().toString());
            Log4JConfigView log4jConfigView = new Log4JConfigView();
            AnnotatedMBean.registerMBean(getManagementContext(), log4jConfigView, objectName);
        }

        startAllConnectors();

        LOG.info("Apache ActiveMQ {} ({}, {}) started", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId});
        LOG.info("For help or more information please see: http://activemq.apache.org");

        getBroker().brokerServiceStarted();
        checkStoreSystemUsageLimits();
        // From here on isStarted() can return true.
        startedLatch.countDown();
        getBroker().nowMasterBroker();
    }
750
751    /**
752     * JSR-250 callback wrapper; converts checked exceptions to runtime exceptions
753     *
754     * delegates to stop, done to prevent backwards incompatible signature change
755     */
756    @PreDestroy
757    private void preDestroy () {
758        try {
759            stop();
760        } catch (Exception ex) {
761            throw new RuntimeException();
762        }
763    }
764
    /**
     * Stops this broker service: the scheduler, the registered services, the
     * connectors, the broker itself, the stores, the persistence adapter, the
     * management context (when JMX is used), the task runner factory and the
     * executor, in that order, and finally runs the registered shutdown hooks.
     * <p>
     * Only the first invocation does any work; later invocations return
     * immediately.
     *
     * @throws Exception if a component stopped directly by this method fails, or
     *         whatever the {@code ServiceStopper} reports from
     *         {@code throwFirstException()} once shutdown has completed
     * @org.apache .xbean.DestroyMethod
     */
    @Override
    public void stop() throws Exception {
        // only the caller that flips the flag performs the shutdown
        if (!stopping.compareAndSet(false, true)) {
            LOG.trace("Broker already stopping/stopped");
            return;
        }

        // from now on the lazy getters guarded by checkStartException()/hasStartException()
        // refuse to create new components
        setStartException(new BrokerStoppedException("Stop invoked"));
        MDC.put("activemq.broker", brokerName);

        if (systemExitOnShutdown) {
            // System.exit is issued from a separate thread so this thread can go on stopping
            new Thread() {
                @Override
                public void run() {
                    System.exit(systemExitOnShutdownExitCode);
                }
            }.start();
        }

        LOG.info("Apache ActiveMQ {} ({}, {}) is shutting down", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId} );

        removeShutdownHook();
        if (this.scheduler != null) {
            this.scheduler.stop();
            this.scheduler = null;
        }
        // failures of components stopped through the stopper are handed to it instead of
        // propagating immediately; see throwFirstException() at the end
        ServiceStopper stopper = new ServiceStopper();
        if (services != null) {
            for (Service service : services) {
                stopper.stop(service);
            }
        }
        stopAllConnectors(stopper);
        this.slave = true;
        // remove any VMTransports connected
        // this has to be done after services are stopped,
        // to avoid timing issue with discovery (spinning up a new instance)
        BrokerRegistry.getInstance().unbind(getBrokerName());
        VMTransportFactory.stopped(getBrokerName());
        if (broker != null) {
            stopper.stop(broker);
            broker = null;
        }

        // NOTE(review): these two stores are stopped directly, not through the stopper, so an
        // exception here leaves this method before the stopped flag and latch below are updated
        if (jobSchedulerStore != null) {
            jobSchedulerStore.stop();
            jobSchedulerStore = null;
        }
        if (tempDataStore != null) {
            tempDataStore.stop();
            tempDataStore = null;
        }
        try {
            stopper.stop(getPersistenceAdapter());
            persistenceAdapter = null;
            if (isUseJmx()) {
                stopper.stop(managementContext);
                managementContext = null;
            }
            // Clear SelectorParser cache to free memory
            SelectorParser.clearCache();
        } finally {
            // publish the stopped state and release threads blocked on stoppedLatch
            started.set(false);
            stopped.set(true);
            stoppedLatch.countDown();
        }

        if (this.taskRunnerFactory != null) {
            this.taskRunnerFactory.shutdown();
            this.taskRunnerFactory = null;
        }
        if (this.executor != null) {
            ThreadPoolUtils.shutdownNow(executor);
            this.executor = null;
        }

        this.destinationInterceptors = null;
        this.destinationFactory = null;

        if (startDate != null) {
            LOG.info("Apache ActiveMQ {} ({}, {}) uptime {}", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId, getUptime()});
        }
        LOG.info("Apache ActiveMQ {} ({}, {}) is shutdown", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId});

        // run the registered hooks; a failing hook is reported to the stopper and does not
        // prevent the remaining hooks from running
        synchronized (shutdownHooks) {
            for (Runnable hook : shutdownHooks) {
                try {
                    hook.run();
                } catch (Throwable e) {
                    stopper.onException(hook, e);
                }
            }
        }

        MDC.remove("activemq.broker");

        // and clear start date
        startDate = null;

        stopper.throwFirstException();
    }
871
872    public boolean checkQueueSize(String queueName) {
873        long count = 0;
874        long queueSize = 0;
875        Map<ActiveMQDestination, Destination> destinationMap = regionBroker.getDestinationMap();
876        for (Map.Entry<ActiveMQDestination, Destination> entry : destinationMap.entrySet()) {
877            if (entry.getKey().isQueue()) {
878                if (entry.getValue().getName().matches(queueName)) {
879                    queueSize = entry.getValue().getDestinationStatistics().getMessages().getCount();
880                    count += queueSize;
881                    if (queueSize > 0) {
882                        LOG.info("Queue has pending message: {} queueSize is: {}", entry.getValue().getName(), queueSize);
883                    }
884                }
885            }
886        }
887        return count == 0;
888    }
889
890    /**
891     * This method (both connectorName and queueName are using regex to match)
892     * 1. stop the connector (supposed the user input the connector which the
893     * clients connect to) 2. to check whether there is any pending message on
894     * the queues defined by queueName 3. supposedly, after stop the connector,
895     * client should failover to other broker and pending messages should be
896     * forwarded. if no pending messages, the method finally call stop to stop
897     * the broker.
898     *
899     * @param connectorName
900     * @param queueName
901     * @param timeout
902     * @param pollInterval
903     * @throws Exception
904     */
905    public void stopGracefully(String connectorName, String queueName, long timeout, long pollInterval) throws Exception {
906        if (isUseJmx()) {
907            if (connectorName == null || queueName == null || timeout <= 0) {
908                throw new Exception(
909                        "connectorName and queueName cannot be null and timeout should be >0 for stopGracefully.");
910            }
911            if (pollInterval <= 0) {
912                pollInterval = 30;
913            }
914            LOG.info("Stop gracefully with connectorName: {} queueName: {} timeout: {} pollInterval: {}", new Object[]{
915                    connectorName, queueName, timeout, pollInterval
916            });
917            TransportConnector connector;
918            for (int i = 0; i < transportConnectors.size(); i++) {
919                connector = transportConnectors.get(i);
920                if (connector != null && connector.getName() != null && connector.getName().matches(connectorName)) {
921                    connector.stop();
922                }
923            }
924            long start = System.currentTimeMillis();
925            while (System.currentTimeMillis() - start < timeout * 1000) {
926                // check quesize until it gets zero
927                if (checkQueueSize(queueName)) {
928                    stop();
929                    break;
930                } else {
931                    Thread.sleep(pollInterval * 1000);
932                }
933            }
934            if (stopped.get()) {
935                LOG.info("Successfully stop the broker.");
936            } else {
937                LOG.info("There is still pending message on the queue. Please check and stop the broker manually.");
938            }
939        }
940    }
941
942    /**
943     * A helper method to block the caller thread until the broker has been
944     * stopped
945     */
946    public void waitUntilStopped() {
947        while (isStarted() && !stopped.get()) {
948            try {
949                stoppedLatch.await();
950            } catch (InterruptedException e) {
951                // ignore
952            }
953        }
954    }
955
956    public boolean isStopped() {
957        return stopped.get();
958    }
959
960    /**
961     * A helper method to block the caller thread until the broker has fully started
962     * @return boolean true if wait succeeded false if broker was not started or was stopped
963     */
964    public boolean waitUntilStarted() {
965        return waitUntilStarted(DEFAULT_START_TIMEOUT);
966    }
967
968    /**
969     * A helper method to block the caller thread until the broker has fully started
970     *
971     * @param timeout
972     *        the amount of time to wait before giving up and returning false.
973     *
974     * @return boolean true if wait succeeded false if broker was not started or was stopped
975     */
976    public boolean waitUntilStarted(long timeout) {
977        boolean waitSucceeded = isStarted();
978        long expiration = Math.max(0, timeout + System.currentTimeMillis());
979        while (!isStarted() && !stopped.get() && !waitSucceeded && expiration > System.currentTimeMillis()) {
980            try {
981                if (getStartException() != null) {
982                    return waitSucceeded;
983                }
984                waitSucceeded = startedLatch.await(100L, TimeUnit.MILLISECONDS);
985            } catch (InterruptedException ignore) {
986            }
987        }
988        return waitSucceeded;
989    }
990
991    // Properties
992    // -------------------------------------------------------------------------
993    /**
994     * Returns the message broker
995     */
996    public Broker getBroker() throws Exception {
997        if (broker == null) {
998            checkStartException();
999            broker = createBroker();
1000        }
1001        return broker;
1002    }
1003
1004    /**
1005     * Returns the administration view of the broker; used to create and destroy
1006     * resources such as queues and topics. Note this method returns null if JMX
1007     * is disabled.
1008     */
1009    public BrokerView getAdminView() throws Exception {
1010        if (adminView == null) {
1011            // force lazy creation
1012            getBroker();
1013        }
1014        return adminView;
1015    }
1016
1017    public void setAdminView(BrokerView adminView) {
1018        this.adminView = adminView;
1019    }
1020
1021    public String getBrokerName() {
1022        return brokerName;
1023    }
1024
1025    /**
1026     * Sets the name of this broker; which must be unique in the network
1027     *
1028     * @param brokerName
1029     */
1030    private static final String brokerNameReplacedCharsRegExp = "[^a-zA-Z0-9\\.\\_\\-\\:]";
1031    public void setBrokerName(String brokerName) {
1032        if (brokerName == null) {
1033            throw new NullPointerException("The broker name cannot be null");
1034        }
1035        String str = brokerName.replaceAll(brokerNameReplacedCharsRegExp, "_");
1036        if (!str.equals(brokerName)) {
1037            LOG.error("Broker Name: {} contained illegal characters matching regExp: {} - replaced with {}", brokerName, brokerNameReplacedCharsRegExp, str);
1038        }
1039        this.brokerName = str.trim();
1040    }
1041
1042    public PersistenceAdapterFactory getPersistenceFactory() {
1043        return persistenceFactory;
1044    }
1045
1046    public File getDataDirectoryFile() {
1047        if (dataDirectoryFile == null) {
1048            dataDirectoryFile = new File(IOHelper.getDefaultDataDirectory());
1049        }
1050        return dataDirectoryFile;
1051    }
1052
1053    public File getBrokerDataDirectory() {
1054        String brokerDir = getBrokerName();
1055        return new File(getDataDirectoryFile(), brokerDir);
1056    }
1057
1058    /**
1059     * Sets the directory in which the data files will be stored by default for
1060     * the JDBC and Journal persistence adaptors.
1061     *
1062     * @param dataDirectory
1063     *            the directory to store data files
1064     */
1065    public void setDataDirectory(String dataDirectory) {
1066        setDataDirectoryFile(new File(dataDirectory));
1067    }
1068
1069    /**
1070     * Sets the directory in which the data files will be stored by default for
1071     * the JDBC and Journal persistence adaptors.
1072     *
1073     * @param dataDirectoryFile
1074     *            the directory to store data files
1075     */
1076    public void setDataDirectoryFile(File dataDirectoryFile) {
1077        this.dataDirectoryFile = dataDirectoryFile;
1078    }
1079
1080    /**
1081     * @return the tmpDataDirectory
1082     */
1083    public File getTmpDataDirectory() {
1084        if (tmpDataDirectory == null) {
1085            tmpDataDirectory = new File(getBrokerDataDirectory(), "tmp_storage");
1086        }
1087        return tmpDataDirectory;
1088    }
1089
1090    /**
1091     * @param tmpDataDirectory
1092     *            the tmpDataDirectory to set
1093     */
1094    public void setTmpDataDirectory(File tmpDataDirectory) {
1095        this.tmpDataDirectory = tmpDataDirectory;
1096    }
1097
1098    public void setPersistenceFactory(PersistenceAdapterFactory persistenceFactory) {
1099        this.persistenceFactory = persistenceFactory;
1100    }
1101
1102    public void setDestinationFactory(DestinationFactory destinationFactory) {
1103        this.destinationFactory = destinationFactory;
1104    }
1105
1106    public boolean isPersistent() {
1107        return persistent;
1108    }
1109
1110    /**
1111     * Sets whether or not persistence is enabled or disabled.
1112     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
1113     */
1114    public void setPersistent(boolean persistent) {
1115        this.persistent = persistent;
1116    }
1117
1118    public boolean isPopulateJMSXUserID() {
1119        return populateJMSXUserID;
1120    }
1121
1122    /**
1123     * Sets whether or not the broker should populate the JMSXUserID header.
1124     */
1125    public void setPopulateJMSXUserID(boolean populateJMSXUserID) {
1126        this.populateJMSXUserID = populateJMSXUserID;
1127    }
1128
1129    public SystemUsage getSystemUsage() {
1130        try {
1131            if (systemUsage == null) {
1132
1133                systemUsage = new SystemUsage("Main", getPersistenceAdapter(), getTempDataStore(), getJobSchedulerStore());
1134                systemUsage.setExecutor(getExecutor());
1135                systemUsage.getMemoryUsage().setLimit(1024L * 1024 * 1024 * 1); // 1 GB
1136                systemUsage.getTempUsage().setLimit(1024L * 1024 * 1024 * 50); // 50 GB
1137                systemUsage.getStoreUsage().setLimit(1024L * 1024 * 1024 * 100); // 100 GB
1138                systemUsage.getJobSchedulerUsage().setLimit(1024L * 1024 * 1024 * 50); // 50 GB
1139                addService(this.systemUsage);
1140            }
1141            return systemUsage;
1142        } catch (IOException e) {
1143            LOG.error("Cannot create SystemUsage", e);
1144            throw new RuntimeException("Fatally failed to create SystemUsage" + e.getMessage(), e);
1145        }
1146    }
1147
1148    public void setSystemUsage(SystemUsage memoryManager) {
1149        if (this.systemUsage != null) {
1150            removeService(this.systemUsage);
1151        }
1152        this.systemUsage = memoryManager;
1153        if (this.systemUsage.getExecutor()==null) {
1154            this.systemUsage.setExecutor(getExecutor());
1155        }
1156        addService(this.systemUsage);
1157    }
1158
1159    /**
1160     * @return the consumerUsageManager
1161     * @throws IOException
1162     */
1163    public SystemUsage getConsumerSystemUsage() throws IOException {
1164        if (this.consumerSystemUsaage == null) {
1165            if (splitSystemUsageForProducersConsumers) {
1166                this.consumerSystemUsaage = new SystemUsage(getSystemUsage(), "Consumer");
1167                float portion = consumerSystemUsagePortion / 100f;
1168                this.consumerSystemUsaage.getMemoryUsage().setUsagePortion(portion);
1169                addService(this.consumerSystemUsaage);
1170            } else {
1171                consumerSystemUsaage = getSystemUsage();
1172            }
1173        }
1174        return this.consumerSystemUsaage;
1175    }
1176
1177    /**
1178     * @param consumerSystemUsaage
1179     *            the storeSystemUsage to set
1180     */
1181    public void setConsumerSystemUsage(SystemUsage consumerSystemUsaage) {
1182        if (this.consumerSystemUsaage != null) {
1183            removeService(this.consumerSystemUsaage);
1184        }
1185        this.consumerSystemUsaage = consumerSystemUsaage;
1186        addService(this.consumerSystemUsaage);
1187    }
1188
1189    /**
1190     * @return the producerUsageManager
1191     * @throws IOException
1192     */
1193    public SystemUsage getProducerSystemUsage() throws IOException {
1194        if (producerSystemUsage == null) {
1195            if (splitSystemUsageForProducersConsumers) {
1196                producerSystemUsage = new SystemUsage(getSystemUsage(), "Producer");
1197                float portion = producerSystemUsagePortion / 100f;
1198                producerSystemUsage.getMemoryUsage().setUsagePortion(portion);
1199                addService(producerSystemUsage);
1200            } else {
1201                producerSystemUsage = getSystemUsage();
1202            }
1203        }
1204        return producerSystemUsage;
1205    }
1206
1207    /**
1208     * @param producerUsageManager
1209     *            the producerUsageManager to set
1210     */
1211    public void setProducerSystemUsage(SystemUsage producerUsageManager) {
1212        if (this.producerSystemUsage != null) {
1213            removeService(this.producerSystemUsage);
1214        }
1215        this.producerSystemUsage = producerUsageManager;
1216        addService(this.producerSystemUsage);
1217    }
1218
1219    public synchronized PersistenceAdapter getPersistenceAdapter() throws IOException {
1220        if (persistenceAdapter == null && !hasStartException()) {
1221            persistenceAdapter = createPersistenceAdapter();
1222            configureService(persistenceAdapter);
1223            this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter);
1224        }
1225        return persistenceAdapter;
1226    }
1227
1228    /**
1229     * Sets the persistence adaptor implementation to use for this broker
1230     *
1231     * @throws IOException
1232     */
1233    public void setPersistenceAdapter(PersistenceAdapter persistenceAdapter) throws IOException {
1234        if (!isPersistent() && ! (persistenceAdapter instanceof MemoryPersistenceAdapter)) {
1235            LOG.warn("persistent=\"false\", ignoring configured persistenceAdapter: {}", persistenceAdapter);
1236            return;
1237        }
1238        this.persistenceAdapter = persistenceAdapter;
1239        configureService(this.persistenceAdapter);
1240        this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter);
1241    }
1242
1243    public TaskRunnerFactory getTaskRunnerFactory() {
1244        if (this.taskRunnerFactory == null) {
1245            this.taskRunnerFactory = new TaskRunnerFactory("ActiveMQ BrokerService["+getBrokerName()+"] Task", getTaskRunnerPriority(), true, 1000,
1246                    isDedicatedTaskRunner());
1247            this.taskRunnerFactory.setThreadClassLoader(this.getClass().getClassLoader());
1248        }
1249        return this.taskRunnerFactory;
1250    }
1251
1252    public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
1253        this.taskRunnerFactory = taskRunnerFactory;
1254    }
1255
1256    public TaskRunnerFactory getPersistenceTaskRunnerFactory() {
1257        if (taskRunnerFactory == null) {
1258            persistenceTaskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor Task", persistenceThreadPriority,
1259                    true, 1000, isDedicatedTaskRunner());
1260        }
1261        return persistenceTaskRunnerFactory;
1262    }
1263
1264    public void setPersistenceTaskRunnerFactory(TaskRunnerFactory persistenceTaskRunnerFactory) {
1265        this.persistenceTaskRunnerFactory = persistenceTaskRunnerFactory;
1266    }
1267
1268    public boolean isUseJmx() {
1269        return useJmx;
1270    }
1271
1272    public boolean isEnableStatistics() {
1273        return enableStatistics;
1274    }
1275
1276    /**
1277     * Sets whether or not the Broker's services enable statistics or not.
1278     */
1279    public void setEnableStatistics(boolean enableStatistics) {
1280        this.enableStatistics = enableStatistics;
1281    }
1282
1283    /**
1284     * Sets whether or not the Broker's services should be exposed into JMX or
1285     * not.
1286     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
1287     */
1288    public void setUseJmx(boolean useJmx) {
1289        this.useJmx = useJmx;
1290    }
1291
1292    public ObjectName getBrokerObjectName() throws MalformedObjectNameException {
1293        if (brokerObjectName == null) {
1294            brokerObjectName = createBrokerObjectName();
1295        }
1296        return brokerObjectName;
1297    }
1298
1299    /**
1300     * Sets the JMX ObjectName for this broker
1301     */
1302    public void setBrokerObjectName(ObjectName brokerObjectName) {
1303        this.brokerObjectName = brokerObjectName;
1304    }
1305
1306    public ManagementContext getManagementContext() {
1307        if (managementContext == null) {
1308            checkStartException();
1309            managementContext = new ManagementContext();
1310        }
1311        return managementContext;
1312    }
1313
1314    synchronized private void checkStartException() {
1315        if (startException != null) {
1316            throw new BrokerStoppedException(startException);
1317        }
1318    }
1319
1320    synchronized private boolean hasStartException() {
1321        return startException != null;
1322    }
1323
1324    synchronized private void setStartException(Throwable t) {
1325        startException = t;
1326    }
1327
1328    public void setManagementContext(ManagementContext managementContext) {
1329        this.managementContext = managementContext;
1330    }
1331
1332    public NetworkConnector getNetworkConnectorByName(String connectorName) {
1333        for (NetworkConnector connector : networkConnectors) {
1334            if (connector.getName().equals(connectorName)) {
1335                return connector;
1336            }
1337        }
1338        return null;
1339    }
1340
1341    public String[] getNetworkConnectorURIs() {
1342        return networkConnectorURIs;
1343    }
1344
1345    public void setNetworkConnectorURIs(String[] networkConnectorURIs) {
1346        this.networkConnectorURIs = networkConnectorURIs;
1347    }
1348
1349    public TransportConnector getConnectorByName(String connectorName) {
1350        for (TransportConnector connector : transportConnectors) {
1351            if (connector.getName().equals(connectorName)) {
1352                return connector;
1353            }
1354        }
1355        return null;
1356    }
1357
1358    public Map<String, String> getTransportConnectorURIsAsMap() {
1359        Map<String, String> answer = new HashMap<String, String>();
1360        for (TransportConnector connector : transportConnectors) {
1361            try {
1362                URI uri = connector.getConnectUri();
1363                if (uri != null) {
1364                    String scheme = uri.getScheme();
1365                    if (scheme != null) {
1366                        answer.put(scheme.toLowerCase(Locale.ENGLISH), uri.toString());
1367                    }
1368                }
1369            } catch (Exception e) {
1370                LOG.debug("Failed to read URI to build transportURIsAsMap", e);
1371            }
1372        }
1373        return answer;
1374    }
1375
1376    public ProducerBrokerExchange getProducerBrokerExchange(ProducerInfo producerInfo){
1377        ProducerBrokerExchange result = null;
1378
1379        for (TransportConnector connector : transportConnectors) {
1380            for (TransportConnection tc: connector.getConnections()){
1381                result = tc.getProducerBrokerExchangeIfExists(producerInfo);
1382                if (result !=null){
1383                    return result;
1384                }
1385            }
1386        }
1387        return result;
1388    }
1389
1390    public String[] getTransportConnectorURIs() {
1391        return transportConnectorURIs;
1392    }
1393
1394    public void setTransportConnectorURIs(String[] transportConnectorURIs) {
1395        this.transportConnectorURIs = transportConnectorURIs;
1396    }
1397
1398    /**
1399     * @return Returns the jmsBridgeConnectors.
1400     */
1401    public JmsConnector[] getJmsBridgeConnectors() {
1402        return jmsBridgeConnectors;
1403    }
1404
1405    /**
1406     * @param jmsConnectors
1407     *            The jmsBridgeConnectors to set.
1408     */
1409    public void setJmsBridgeConnectors(JmsConnector[] jmsConnectors) {
1410        this.jmsBridgeConnectors = jmsConnectors;
1411    }
1412
1413    public Service[] getServices() {
1414        return services.toArray(new Service[0]);
1415    }
1416
1417    /**
1418     * Sets the services associated with this broker.
1419     */
1420    public void setServices(Service[] services) {
1421        this.services.clear();
1422        if (services != null) {
1423            for (int i = 0; i < services.length; i++) {
1424                this.services.add(services[i]);
1425            }
1426        }
1427    }
1428
1429    /**
1430     * Adds a new service so that it will be started as part of the broker
1431     * lifecycle
1432     */
1433    public void addService(Service service) {
1434        services.add(service);
1435    }
1436
1437    public void removeService(Service service) {
1438        services.remove(service);
1439    }
1440
1441    public boolean isUseLoggingForShutdownErrors() {
1442        return useLoggingForShutdownErrors;
1443    }
1444
1445    /**
1446     * Sets whether or not we should use commons-logging when reporting errors
1447     * when shutting down the broker
1448     */
1449    public void setUseLoggingForShutdownErrors(boolean useLoggingForShutdownErrors) {
1450        this.useLoggingForShutdownErrors = useLoggingForShutdownErrors;
1451    }
1452
1453    public boolean isUseShutdownHook() {
1454        return useShutdownHook;
1455    }
1456
1457    /**
1458     * Sets whether or not we should use a shutdown handler to close down the
1459     * broker cleanly if the JVM is terminated. It is recommended you leave this
1460     * enabled.
1461     */
1462    public void setUseShutdownHook(boolean useShutdownHook) {
1463        this.useShutdownHook = useShutdownHook;
1464    }
1465
1466    public boolean isAdvisorySupport() {
1467        return advisorySupport;
1468    }
1469
1470    /**
1471     * Allows the support of advisory messages to be disabled for performance
1472     * reasons.
1473     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
1474     */
1475    public void setAdvisorySupport(boolean advisorySupport) {
1476        this.advisorySupport = advisorySupport;
1477    }
1478
1479    public List<TransportConnector> getTransportConnectors() {
1480        return new ArrayList<TransportConnector>(transportConnectors);
1481    }
1482
1483    /**
1484     * Sets the transport connectors which this broker will listen on for new
1485     * clients
1486     *
1487     * @org.apache.xbean.Property
1488     *                            nestedType="org.apache.activemq.broker.TransportConnector"
1489     */
1490    public void setTransportConnectors(List<TransportConnector> transportConnectors) throws Exception {
1491        for (TransportConnector connector : transportConnectors) {
1492            addConnector(connector);
1493        }
1494    }
1495
1496    public TransportConnector getTransportConnectorByName(String name){
1497        for (TransportConnector transportConnector : transportConnectors){
1498           if (name.equals(transportConnector.getName())){
1499               return transportConnector;
1500           }
1501        }
1502        return null;
1503    }
1504
1505    public TransportConnector getTransportConnectorByScheme(String scheme){
1506        for (TransportConnector transportConnector : transportConnectors){
1507            if (scheme.equals(transportConnector.getUri().getScheme())){
1508                return transportConnector;
1509            }
1510        }
1511        return null;
1512    }
1513
1514    public List<NetworkConnector> getNetworkConnectors() {
1515        return new ArrayList<NetworkConnector>(networkConnectors);
1516    }
1517
1518    public List<ProxyConnector> getProxyConnectors() {
1519        return new ArrayList<ProxyConnector>(proxyConnectors);
1520    }
1521
1522    /**
1523     * Sets the network connectors which this broker will use to connect to
1524     * other brokers in a federated network
1525     *
1526     * @org.apache.xbean.Property
1527     *                            nestedType="org.apache.activemq.network.NetworkConnector"
1528     */
1529    public void setNetworkConnectors(List<?> networkConnectors) throws Exception {
1530        for (Object connector : networkConnectors) {
1531            addNetworkConnector((NetworkConnector) connector);
1532        }
1533    }
1534
1535    /**
1536     * Sets the network connectors which this broker will use to connect to
1537     * other brokers in a federated network
1538     */
1539    public void setProxyConnectors(List<?> proxyConnectors) throws Exception {
1540        for (Object connector : proxyConnectors) {
1541            addProxyConnector((ProxyConnector) connector);
1542        }
1543    }
1544
1545    public PolicyMap getDestinationPolicy() {
1546        return destinationPolicy;
1547    }
1548
1549    /**
1550     * Sets the destination specific policies available either for exact
1551     * destinations or for wildcard areas of destinations.
1552     */
1553    public void setDestinationPolicy(PolicyMap policyMap) {
1554        this.destinationPolicy = policyMap;
1555    }
1556
1557    public BrokerPlugin[] getPlugins() {
1558        return plugins;
1559    }
1560
1561    /**
1562     * Sets a number of broker plugins to install such as for security
1563     * authentication or authorization
1564     */
1565    public void setPlugins(BrokerPlugin[] plugins) {
1566        this.plugins = plugins;
1567    }
1568
1569    public MessageAuthorizationPolicy getMessageAuthorizationPolicy() {
1570        return messageAuthorizationPolicy;
1571    }
1572
1573    /**
1574     * Sets the policy used to decide if the current connection is authorized to
1575     * consume a given message
1576     */
1577    public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) {
1578        this.messageAuthorizationPolicy = messageAuthorizationPolicy;
1579    }
1580
1581    /**
1582     * Delete all messages from the persistent store
1583     *
1584     * @throws IOException
1585     */
1586    public void deleteAllMessages() throws IOException {
1587        getPersistenceAdapter().deleteAllMessages();
1588    }
1589
1590    public boolean isDeleteAllMessagesOnStartup() {
1591        return deleteAllMessagesOnStartup;
1592    }
1593
1594    /**
1595     * Sets whether or not all messages are deleted on startup - mostly only
1596     * useful for testing.
1597     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
1598     */
1599    public void setDeleteAllMessagesOnStartup(boolean deletePersistentMessagesOnStartup) {
1600        this.deleteAllMessagesOnStartup = deletePersistentMessagesOnStartup;
1601    }
1602
1603    public URI getVmConnectorURI() {
1604        if (vmConnectorURI == null) {
1605            try {
1606                vmConnectorURI = new URI("vm://" + getBrokerName());
1607            } catch (URISyntaxException e) {
1608                LOG.error("Badly formed URI from {}", getBrokerName(), e);
1609            }
1610        }
1611        return vmConnectorURI;
1612    }
1613
1614    public void setVmConnectorURI(URI vmConnectorURI) {
1615        this.vmConnectorURI = vmConnectorURI;
1616    }
1617
1618    public String getDefaultSocketURIString() {
1619        if (started.get()) {
1620            if (this.defaultSocketURIString == null) {
1621                for (TransportConnector tc:this.transportConnectors) {
1622                    String result = null;
1623                    try {
1624                        result = tc.getPublishableConnectString();
1625                    } catch (Exception e) {
1626                      LOG.warn("Failed to get the ConnectURI for {}", tc, e);
1627                    }
1628                    if (result != null) {
1629                        // find first publishable uri
1630                        if (tc.isUpdateClusterClients() || tc.isRebalanceClusterClients()) {
1631                            this.defaultSocketURIString = result;
1632                            break;
1633                        } else {
1634                        // or use the first defined
1635                            if (this.defaultSocketURIString == null) {
1636                                this.defaultSocketURIString = result;
1637                            }
1638                        }
1639                    }
1640                }
1641
1642            }
1643            return this.defaultSocketURIString;
1644        }
1645       return null;
1646    }
1647
1648    /**
1649     * @return Returns the shutdownOnMasterFailure.
1650     */
1651    public boolean isShutdownOnMasterFailure() {
1652        return shutdownOnMasterFailure;
1653    }
1654
1655    /**
1656     * @param shutdownOnMasterFailure
1657     *            The shutdownOnMasterFailure to set.
1658     */
1659    public void setShutdownOnMasterFailure(boolean shutdownOnMasterFailure) {
1660        this.shutdownOnMasterFailure = shutdownOnMasterFailure;
1661    }
1662
1663    public boolean isKeepDurableSubsActive() {
1664        return keepDurableSubsActive;
1665    }
1666
1667    public void setKeepDurableSubsActive(boolean keepDurableSubsActive) {
1668        this.keepDurableSubsActive = keepDurableSubsActive;
1669    }
1670
1671    public boolean isUseVirtualTopics() {
1672        return useVirtualTopics;
1673    }
1674
1675    /**
1676     * Sets whether or not <a
1677     * href="http://activemq.apache.org/virtual-destinations.html">Virtual
1678     * Topics</a> should be supported by default if they have not been
1679     * explicitly configured.
1680     */
1681    public void setUseVirtualTopics(boolean useVirtualTopics) {
1682        this.useVirtualTopics = useVirtualTopics;
1683    }
1684
1685    public DestinationInterceptor[] getDestinationInterceptors() {
1686        return destinationInterceptors;
1687    }
1688
1689    public boolean isUseMirroredQueues() {
1690        return useMirroredQueues;
1691    }
1692
1693    /**
1694     * Sets whether or not <a
1695     * href="http://activemq.apache.org/mirrored-queues.html">Mirrored
1696     * Queues</a> should be supported by default if they have not been
1697     * explicitly configured.
1698     */
1699    public void setUseMirroredQueues(boolean useMirroredQueues) {
1700        this.useMirroredQueues = useMirroredQueues;
1701    }
1702
1703    /**
1704     * Sets the destination interceptors to use
1705     */
1706    public void setDestinationInterceptors(DestinationInterceptor[] destinationInterceptors) {
1707        this.destinationInterceptors = destinationInterceptors;
1708    }
1709
1710    public ActiveMQDestination[] getDestinations() {
1711        return destinations;
1712    }
1713
1714    /**
1715     * Sets the destinations which should be loaded/created on startup
1716     */
1717    public void setDestinations(ActiveMQDestination[] destinations) {
1718        this.destinations = destinations;
1719    }
1720
1721    /**
1722     * @return the tempDataStore
1723     */
1724    public synchronized PListStore getTempDataStore() {
1725        if (tempDataStore == null) {
1726            if (!isPersistent()) {
1727                return null;
1728            }
1729
1730            try {
1731                PersistenceAdapter pa = getPersistenceAdapter();
1732                if( pa!=null && pa instanceof PListStore) {
1733                    return (PListStore) pa;
1734                }
1735            } catch (IOException e) {
1736                throw new RuntimeException(e);
1737            }
1738
1739            try {
1740                String clazz = "org.apache.activemq.store.kahadb.plist.PListStoreImpl";
1741                this.tempDataStore = (PListStore) getClass().getClassLoader().loadClass(clazz).newInstance();
1742                this.tempDataStore.setDirectory(getTmpDataDirectory());
1743                configureService(tempDataStore);
1744            } catch (Exception e) {
1745                throw new RuntimeException(e);
1746            }
1747        }
1748        return tempDataStore;
1749    }
1750
1751    /**
1752     * @param tempDataStore
1753     *            the tempDataStore to set
1754     */
1755    public void setTempDataStore(PListStore tempDataStore) {
1756        this.tempDataStore = tempDataStore;
1757        configureService(tempDataStore);
1758    }
1759
1760    public int getPersistenceThreadPriority() {
1761        return persistenceThreadPriority;
1762    }
1763
1764    public void setPersistenceThreadPriority(int persistenceThreadPriority) {
1765        this.persistenceThreadPriority = persistenceThreadPriority;
1766    }
1767
1768    /**
1769     * @return the useLocalHostBrokerName
1770     */
1771    public boolean isUseLocalHostBrokerName() {
1772        return this.useLocalHostBrokerName;
1773    }
1774
1775    /**
1776     * @param useLocalHostBrokerName
1777     *            the useLocalHostBrokerName to set
1778     */
1779    public void setUseLocalHostBrokerName(boolean useLocalHostBrokerName) {
1780        this.useLocalHostBrokerName = useLocalHostBrokerName;
1781        if (useLocalHostBrokerName && !started.get() && brokerName == null || brokerName == DEFAULT_BROKER_NAME) {
1782            brokerName = LOCAL_HOST_NAME;
1783        }
1784    }
1785
1786    /**
1787     * Looks up and lazily creates if necessary the destination for the given
1788     * JMS name
1789     */
1790    public Destination getDestination(ActiveMQDestination destination) throws Exception {
1791        return getBroker().addDestination(getAdminConnectionContext(), destination,false);
1792    }
1793
1794    public void removeDestination(ActiveMQDestination destination) throws Exception {
1795        getBroker().removeDestination(getAdminConnectionContext(), destination, 0);
1796    }
1797
1798    public int getProducerSystemUsagePortion() {
1799        return producerSystemUsagePortion;
1800    }
1801
1802    public void setProducerSystemUsagePortion(int producerSystemUsagePortion) {
1803        this.producerSystemUsagePortion = producerSystemUsagePortion;
1804    }
1805
1806    public int getConsumerSystemUsagePortion() {
1807        return consumerSystemUsagePortion;
1808    }
1809
1810    public void setConsumerSystemUsagePortion(int consumerSystemUsagePortion) {
1811        this.consumerSystemUsagePortion = consumerSystemUsagePortion;
1812    }
1813
1814    public boolean isSplitSystemUsageForProducersConsumers() {
1815        return splitSystemUsageForProducersConsumers;
1816    }
1817
1818    public void setSplitSystemUsageForProducersConsumers(boolean splitSystemUsageForProducersConsumers) {
1819        this.splitSystemUsageForProducersConsumers = splitSystemUsageForProducersConsumers;
1820    }
1821
1822    public boolean isMonitorConnectionSplits() {
1823        return monitorConnectionSplits;
1824    }
1825
1826    public void setMonitorConnectionSplits(boolean monitorConnectionSplits) {
1827        this.monitorConnectionSplits = monitorConnectionSplits;
1828    }
1829
1830    public int getTaskRunnerPriority() {
1831        return taskRunnerPriority;
1832    }
1833
1834    public void setTaskRunnerPriority(int taskRunnerPriority) {
1835        this.taskRunnerPriority = taskRunnerPriority;
1836    }
1837
1838    public boolean isDedicatedTaskRunner() {
1839        return dedicatedTaskRunner;
1840    }
1841
1842    public void setDedicatedTaskRunner(boolean dedicatedTaskRunner) {
1843        this.dedicatedTaskRunner = dedicatedTaskRunner;
1844    }
1845
1846    public boolean isCacheTempDestinations() {
1847        return cacheTempDestinations;
1848    }
1849
1850    public void setCacheTempDestinations(boolean cacheTempDestinations) {
1851        this.cacheTempDestinations = cacheTempDestinations;
1852    }
1853
1854    public int getTimeBeforePurgeTempDestinations() {
1855        return timeBeforePurgeTempDestinations;
1856    }
1857
1858    public void setTimeBeforePurgeTempDestinations(int timeBeforePurgeTempDestinations) {
1859        this.timeBeforePurgeTempDestinations = timeBeforePurgeTempDestinations;
1860    }
1861
1862    public boolean isUseTempMirroredQueues() {
1863        return useTempMirroredQueues;
1864    }
1865
1866    public void setUseTempMirroredQueues(boolean useTempMirroredQueues) {
1867        this.useTempMirroredQueues = useTempMirroredQueues;
1868    }
1869
1870    public synchronized JobSchedulerStore getJobSchedulerStore() {
1871
1872        // If support is off don't allow any scheduler even is user configured their own.
1873        if (!isSchedulerSupport()) {
1874            return null;
1875        }
1876
1877        // If the user configured their own we use it even if persistence is disabled since
1878        // we don't know anything about their implementation.
1879        if (jobSchedulerStore == null) {
1880
1881            if (!isPersistent()) {
1882                this.jobSchedulerStore = new InMemoryJobSchedulerStore();
1883                configureService(jobSchedulerStore);
1884                return this.jobSchedulerStore;
1885            }
1886
1887            try {
1888                PersistenceAdapter pa = getPersistenceAdapter();
1889                if (pa != null) {
1890                    this.jobSchedulerStore = pa.createJobSchedulerStore();
1891                    jobSchedulerStore.setDirectory(getSchedulerDirectoryFile());
1892                    configureService(jobSchedulerStore);
1893                    return this.jobSchedulerStore;
1894                }
1895            } catch (IOException e) {
1896                throw new RuntimeException(e);
1897            } catch (UnsupportedOperationException ex) {
1898                // It's ok if the store doesn't implement a scheduler.
1899            } catch (Exception e) {
1900                throw new RuntimeException(e);
1901            }
1902
1903            try {
1904                PersistenceAdapter pa = getPersistenceAdapter();
1905                if (pa != null && pa instanceof JobSchedulerStore) {
1906                    this.jobSchedulerStore = (JobSchedulerStore) pa;
1907                    configureService(jobSchedulerStore);
1908                    return this.jobSchedulerStore;
1909                }
1910            } catch (IOException e) {
1911                throw new RuntimeException(e);
1912            }
1913
1914            // Load the KahaDB store as a last resort, this only works if KahaDB is
1915            // included at runtime, otherwise this will fail.  User should disable
1916            // scheduler support if this fails.
1917            try {
1918                String clazz = "org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter";
1919                PersistenceAdapter adaptor = (PersistenceAdapter)getClass().getClassLoader().loadClass(clazz).newInstance();
1920                jobSchedulerStore = adaptor.createJobSchedulerStore();
1921                jobSchedulerStore.setDirectory(getSchedulerDirectoryFile());
1922                configureService(jobSchedulerStore);
1923                LOG.info("JobScheduler using directory: {}", getSchedulerDirectoryFile());
1924            } catch (Exception e) {
1925                throw new RuntimeException(e);
1926            }
1927        }
1928        return jobSchedulerStore;
1929    }
1930
1931    public void setJobSchedulerStore(JobSchedulerStore jobSchedulerStore) {
1932        this.jobSchedulerStore = jobSchedulerStore;
1933        configureService(jobSchedulerStore);
1934    }
1935
1936    //
1937    // Implementation methods
1938    // -------------------------------------------------------------------------
1939    /**
1940     * Handles any lazy-creation helper properties which are added to make
1941     * things easier to configure inside environments such as Spring
1942     *
1943     * @throws Exception
1944     */
1945    protected void processHelperProperties() throws Exception {
1946        if (transportConnectorURIs != null) {
1947            for (int i = 0; i < transportConnectorURIs.length; i++) {
1948                String uri = transportConnectorURIs[i];
1949                addConnector(uri);
1950            }
1951        }
1952        if (networkConnectorURIs != null) {
1953            for (int i = 0; i < networkConnectorURIs.length; i++) {
1954                String uri = networkConnectorURIs[i];
1955                addNetworkConnector(uri);
1956            }
1957        }
1958        if (jmsBridgeConnectors != null) {
1959            for (int i = 0; i < jmsBridgeConnectors.length; i++) {
1960                addJmsConnector(jmsBridgeConnectors[i]);
1961            }
1962        }
1963    }
1964
1965    /**
1966     * Check that the store usage limit is not greater than max usable
1967     * space and adjust if it is
1968     */
1969    protected void checkStoreUsageLimits() throws Exception {
1970        final SystemUsage usage = getSystemUsage();
1971
1972        if (getPersistenceAdapter() != null) {
1973            PersistenceAdapter adapter = getPersistenceAdapter();
1974            File dir = adapter.getDirectory();
1975
1976            if (dir != null) {
1977                String dirPath = dir.getAbsolutePath();
1978                if (!dir.isAbsolute()) {
1979                    dir = new File(dirPath);
1980                }
1981
1982                while (dir != null && !dir.isDirectory()) {
1983                    dir = dir.getParentFile();
1984                }
1985                long storeLimit = usage.getStoreUsage().getLimit();
1986                long storeCurrent = usage.getStoreUsage().getUsage();
1987                long dirFreeSpace = dir.getUsableSpace();
1988                if (storeLimit > (dirFreeSpace + storeCurrent)) {
1989                    final String message = "Store limit is " + storeLimit / (1024 * 1024) +
1990                             " mb (current store usage is " + storeCurrent / (1024 * 1024) +
1991                             " mb). The data directory: " + dir.getAbsolutePath() +
1992                             " only has " + dirFreeSpace / (1024 * 1024) +
1993                             " mb of usable space.";
1994
1995                    if (isAdjustUsageLimits()) {
1996                        LOG.warn(message + ", resetting to maximum available disk space: " +
1997                                (dirFreeSpace + storeCurrent) / (1024 * 1024) + " mb");
1998                        usage.getStoreUsage().setLimit(dirFreeSpace + storeCurrent);
1999                    } else {
2000                        LOG.error(message);
2001                        throw new ConfigurationException(message);
2002                    }
2003                }
2004            }
2005
2006            long maxJournalFileSize = 0;
2007            long storeLimit = usage.getStoreUsage().getLimit();
2008
2009            if (adapter instanceof JournaledStore) {
2010                maxJournalFileSize = ((JournaledStore) adapter).getJournalMaxFileLength();
2011            }
2012
2013            if (storeLimit < maxJournalFileSize) {
2014                LOG.error("Store limit is " + storeLimit / (1024 * 1024) +
2015                          " mb, whilst the max journal file size for the store is: " +
2016                          maxJournalFileSize / (1024 * 1024) + " mb, " +
2017                          "the store will not accept any data when used.");
2018
2019            }
2020        }
2021    }
2022
2023    /**
2024     * Check that temporary usage limit is not greater than max usable
2025     * space and adjust if it is
2026     */
2027    protected void checkTmpStoreUsageLimits() throws Exception {
2028        final SystemUsage usage = getSystemUsage();
2029
2030        File tmpDir = getTmpDataDirectory();
2031        if (tmpDir != null) {
2032
2033            String tmpDirPath = tmpDir.getAbsolutePath();
2034            if (!tmpDir.isAbsolute()) {
2035                tmpDir = new File(tmpDirPath);
2036            }
2037
2038            long storeLimit = usage.getTempUsage().getLimit();
2039            while (tmpDir != null && !tmpDir.isDirectory()) {
2040                tmpDir = tmpDir.getParentFile();
2041            }
2042            long dirFreeSpace = tmpDir.getUsableSpace();
2043            if (storeLimit > dirFreeSpace) {
2044                final String message = "Temporary Store limit is " + storeLimit / (1024 * 1024) +
2045                        " mb, whilst the temporary data directory: " + tmpDirPath +
2046                        " only has " + dirFreeSpace / (1024 * 1024) + " mb of usable space.";
2047
2048                if (isAdjustUsageLimits()) {
2049                    LOG.warn(message + ", resetting to maximum available " +
2050                            dirFreeSpace / (1024 * 1024) + " mb.");
2051                    usage.getTempUsage().setLimit(dirFreeSpace);
2052                } else {
2053                    LOG.error(message);
2054                    throw new ConfigurationException(message);
2055                }
2056            }
2057
2058            if (isPersistent()) {
2059                long maxJournalFileSize;
2060
2061                PListStore store = usage.getTempUsage().getStore();
2062                if (store != null && store instanceof JournaledStore) {
2063                    maxJournalFileSize = ((JournaledStore) store).getJournalMaxFileLength();
2064                } else {
2065                    maxJournalFileSize = DEFAULT_MAX_FILE_LENGTH;
2066                }
2067
2068                if (storeLimit < maxJournalFileSize) {
2069                    LOG.error("Temporary Store limit is " + storeLimit / (1024 * 1024) +
2070                              " mb, whilst the max journal file size for the temporary store is: " +
2071                              maxJournalFileSize / (1024 * 1024) + " mb, " +
2072                              "the temp store will not accept any data when used.");
2073                }
2074            }
2075        }
2076    }
2077
2078    protected void checkMemorySystemUsageLimits() throws Exception {
2079        final SystemUsage usage = getSystemUsage();
2080        long memLimit = usage.getMemoryUsage().getLimit();
2081        long jvmLimit = Runtime.getRuntime().maxMemory();
2082
2083        if (memLimit > jvmLimit) {
2084            final String message = "Memory Usage for the Broker (" + memLimit / (1024 * 1024)
2085                    + "mb) is more than the maximum available for the JVM: " + jvmLimit / (1024 * 1024);
2086
2087            if (adjustUsageLimits) {
2088                usage.getMemoryUsage().setPercentOfJvmHeap(70);
2089                LOG.warn(message + " mb - resetting to 70% of maximum available: " + (usage.getMemoryUsage().getLimit() / (1024 * 1024)) + " mb");
2090            } else {
2091                LOG.error(message);
2092                throw new ConfigurationException(message);
2093            }
2094        }
2095    }
2096
2097    protected void checkStoreSystemUsageLimits() throws Exception {
2098        final SystemUsage usage = getSystemUsage();
2099
2100        //Check the persistent store and temp store limits if they exist
2101        //and schedule a periodic check to update disk limits if
2102        //schedulePeriodForDiskLimitCheck is set
2103        checkStoreUsageLimits();
2104        checkTmpStoreUsageLimits();
2105
2106        if (getJobSchedulerStore() != null) {
2107            JobSchedulerStore scheduler = getJobSchedulerStore();
2108            File schedulerDir = scheduler.getDirectory();
2109            if (schedulerDir != null) {
2110
2111                String schedulerDirPath = schedulerDir.getAbsolutePath();
2112                if (!schedulerDir.isAbsolute()) {
2113                    schedulerDir = new File(schedulerDirPath);
2114                }
2115
2116                while (schedulerDir != null && !schedulerDir.isDirectory()) {
2117                    schedulerDir = schedulerDir.getParentFile();
2118                }
2119                long schedulerLimit = usage.getJobSchedulerUsage().getLimit();
2120                long dirFreeSpace = schedulerDir.getUsableSpace();
2121                if (schedulerLimit > dirFreeSpace) {
2122                    LOG.warn("Job Scheduler Store limit is " + schedulerLimit / (1024 * 1024) +
2123                             " mb, whilst the data directory: " + schedulerDir.getAbsolutePath() +
2124                             " only has " + dirFreeSpace / (1024 * 1024) + " mb of usable space - resetting to " +
2125                            dirFreeSpace / (1024 * 1024) + " mb.");
2126                    usage.getJobSchedulerUsage().setLimit(dirFreeSpace);
2127                }
2128            }
2129        }
2130    }
2131
2132    public void stopAllConnectors(ServiceStopper stopper) {
2133        for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) {
2134            NetworkConnector connector = iter.next();
2135            unregisterNetworkConnectorMBean(connector);
2136            stopper.stop(connector);
2137        }
2138        for (Iterator<ProxyConnector> iter = getProxyConnectors().iterator(); iter.hasNext();) {
2139            ProxyConnector connector = iter.next();
2140            stopper.stop(connector);
2141        }
2142        for (Iterator<JmsConnector> iter = jmsConnectors.iterator(); iter.hasNext();) {
2143            JmsConnector connector = iter.next();
2144            stopper.stop(connector);
2145        }
2146        for (Iterator<TransportConnector> iter = getTransportConnectors().iterator(); iter.hasNext();) {
2147            TransportConnector connector = iter.next();
2148            try {
2149                unregisterConnectorMBean(connector);
2150            } catch (IOException e) {
2151            }
2152            stopper.stop(connector);
2153        }
2154    }
2155
2156    protected TransportConnector registerConnectorMBean(TransportConnector connector) throws IOException {
2157        try {
2158            ObjectName objectName = createConnectorObjectName(connector);
2159            connector = connector.asManagedConnector(getManagementContext(), objectName);
2160            ConnectorViewMBean view = new ConnectorView(connector);
2161            AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
2162            return connector;
2163        } catch (Throwable e) {
2164            throw IOExceptionSupport.create("Transport Connector could not be registered in JMX: " + e, e);
2165        }
2166    }
2167
2168    protected void unregisterConnectorMBean(TransportConnector connector) throws IOException {
2169        if (isUseJmx()) {
2170            try {
2171                ObjectName objectName = createConnectorObjectName(connector);
2172                getManagementContext().unregisterMBean(objectName);
2173            } catch (Throwable e) {
2174                throw IOExceptionSupport.create(
2175                        "Transport Connector could not be unregistered in JMX: " + e.getMessage(), e);
2176            }
2177        }
2178    }
2179
2180    protected PersistenceAdapter registerPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException {
2181        return adaptor;
2182    }
2183
2184    protected void unregisterPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException {
2185        if (isUseJmx()) {}
2186    }
2187
2188    private ObjectName createConnectorObjectName(TransportConnector connector) throws MalformedObjectNameException {
2189        return BrokerMBeanSupport.createConnectorName(getBrokerObjectName(), "clientConnectors", connector.getName());
2190    }
2191
2192    public void registerNetworkConnectorMBean(NetworkConnector connector) throws IOException {
2193        NetworkConnectorViewMBean view = new NetworkConnectorView(connector);
2194        try {
2195            ObjectName objectName = createNetworkConnectorObjectName(connector);
2196            connector.setObjectName(objectName);
2197            AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
2198        } catch (Throwable e) {
2199            throw IOExceptionSupport.create("Network Connector could not be registered in JMX: " + e.getMessage(), e);
2200        }
2201    }
2202
2203    protected ObjectName createNetworkConnectorObjectName(NetworkConnector connector) throws MalformedObjectNameException {
2204        return BrokerMBeanSupport.createNetworkConnectorName(getBrokerObjectName(), "networkConnectors", connector.getName());
2205    }
2206
2207    public ObjectName createDuplexNetworkConnectorObjectName(String transport) throws MalformedObjectNameException {
2208        return BrokerMBeanSupport.createNetworkConnectorName(getBrokerObjectName(), "duplexNetworkConnectors", transport);
2209    }
2210
2211    protected void unregisterNetworkConnectorMBean(NetworkConnector connector) {
2212        if (isUseJmx()) {
2213            try {
2214                ObjectName objectName = createNetworkConnectorObjectName(connector);
2215                getManagementContext().unregisterMBean(objectName);
2216            } catch (Exception e) {
2217                LOG.warn("Network Connector could not be unregistered from JMX due " + e.getMessage() + ". This exception is ignored.", e);
2218            }
2219        }
2220    }
2221
2222    protected void registerProxyConnectorMBean(ProxyConnector connector) throws IOException {
2223        ProxyConnectorView view = new ProxyConnectorView(connector);
2224        try {
2225            ObjectName objectName = BrokerMBeanSupport.createNetworkConnectorName(getBrokerObjectName(), "proxyConnectors", connector.getName());
2226            AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
2227        } catch (Throwable e) {
2228            throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e);
2229        }
2230    }
2231
2232    protected void registerJmsConnectorMBean(JmsConnector connector) throws IOException {
2233        JmsConnectorView view = new JmsConnectorView(connector);
2234        try {
2235            ObjectName objectName = BrokerMBeanSupport.createNetworkConnectorName(getBrokerObjectName(), "jmsConnectors", connector.getName());
2236            AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
2237        } catch (Throwable e) {
2238            throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e);
2239        }
2240    }
2241
2242    /**
2243     * Factory method to create a new broker
2244     *
2245     * @throws Exception
2246     * @throws
2247     * @throws
2248     */
2249    protected Broker createBroker() throws Exception {
2250        regionBroker = createRegionBroker();
2251        Broker broker = addInterceptors(regionBroker);
2252        // Add a filter that will stop access to the broker once stopped
2253        broker = new MutableBrokerFilter(broker) {
2254            Broker old;
2255
2256            @Override
2257            public void stop() throws Exception {
2258                old = this.next.getAndSet(new ErrorBroker("Broker has been stopped: " + this) {
2259                    // Just ignore additional stop actions.
2260                    @Override
2261                    public void stop() throws Exception {
2262                    }
2263                });
2264                old.stop();
2265            }
2266
2267            @Override
2268            public void start() throws Exception {
2269                if (forceStart && old != null) {
2270                    this.next.set(old);
2271                }
2272                getNext().start();
2273            }
2274        };
2275        return broker;
2276    }
2277
2278    /**
2279     * Factory method to create the core region broker onto which interceptors
2280     * are added
2281     *
2282     * @throws Exception
2283     */
2284    protected Broker createRegionBroker() throws Exception {
2285        if (destinationInterceptors == null) {
2286            destinationInterceptors = createDefaultDestinationInterceptor();
2287        }
2288        configureServices(destinationInterceptors);
2289        DestinationInterceptor destinationInterceptor = new CompositeDestinationInterceptor(destinationInterceptors);
2290        if (destinationFactory == null) {
2291            destinationFactory = new DestinationFactoryImpl(this, getTaskRunnerFactory(), getPersistenceAdapter());
2292        }
2293        return createRegionBroker(destinationInterceptor);
2294    }
2295
2296    protected Broker createRegionBroker(DestinationInterceptor destinationInterceptor) throws IOException {
2297        RegionBroker regionBroker;
2298        if (isUseJmx()) {
2299            try {
2300                regionBroker = new ManagedRegionBroker(this, getManagementContext(), getBrokerObjectName(),
2301                    getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory, destinationInterceptor,getScheduler(),getExecutor());
2302            } catch(MalformedObjectNameException me){
2303                LOG.warn("Cannot create ManagedRegionBroker due " + me.getMessage(), me);
2304                throw new IOException(me);
2305            }
2306        } else {
2307            regionBroker = new RegionBroker(this, getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory,
2308                    destinationInterceptor,getScheduler(),getExecutor());
2309        }
2310        destinationFactory.setRegionBroker(regionBroker);
2311        regionBroker.setKeepDurableSubsActive(keepDurableSubsActive);
2312        regionBroker.setBrokerName(getBrokerName());
2313        regionBroker.getDestinationStatistics().setEnabled(enableStatistics);
2314        regionBroker.setAllowTempAutoCreationOnSend(isAllowTempAutoCreationOnSend());
2315        if (brokerId != null) {
2316            regionBroker.setBrokerId(brokerId);
2317        }
2318        return regionBroker;
2319    }
2320
2321    /**
2322     * Create the default destination interceptor
2323     */
2324    protected DestinationInterceptor[] createDefaultDestinationInterceptor() {
2325        List<DestinationInterceptor> answer = new ArrayList<DestinationInterceptor>();
2326        if (isUseVirtualTopics()) {
2327            VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor();
2328            VirtualTopic virtualTopic = new VirtualTopic();
2329            virtualTopic.setName("VirtualTopic.>");
2330            VirtualDestination[] virtualDestinations = { virtualTopic };
2331            interceptor.setVirtualDestinations(virtualDestinations);
2332            answer.add(interceptor);
2333        }
2334        if (isUseMirroredQueues()) {
2335            MirroredQueue interceptor = new MirroredQueue();
2336            answer.add(interceptor);
2337        }
2338        DestinationInterceptor[] array = new DestinationInterceptor[answer.size()];
2339        answer.toArray(array);
2340        return array;
2341    }
2342
2343    /**
2344     * Strategy method to add interceptors to the broker
2345     *
2346     * @throws IOException
2347     */
2348    protected Broker addInterceptors(Broker broker) throws Exception {
2349        if (isSchedulerSupport()) {
2350            SchedulerBroker sb = new SchedulerBroker(this, broker, getJobSchedulerStore());
2351            if (isUseJmx()) {
2352                JobSchedulerViewMBean view = new JobSchedulerView(sb.getJobScheduler());
2353                try {
2354                    ObjectName objectName = BrokerMBeanSupport.createJobSchedulerServiceName(getBrokerObjectName());
2355                    AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
2356                    this.adminView.setJMSJobScheduler(objectName);
2357                } catch (Throwable e) {
2358                    throw IOExceptionSupport.create("JobScheduler could not be registered in JMX: "
2359                            + e.getMessage(), e);
2360                }
2361            }
2362            broker = sb;
2363        }
2364        if (isUseJmx()) {
2365            HealthViewMBean statusView = new HealthView((ManagedRegionBroker)getRegionBroker());
2366            try {
2367                ObjectName objectName = BrokerMBeanSupport.createHealthServiceName(getBrokerObjectName());
2368                AnnotatedMBean.registerMBean(getManagementContext(), statusView, objectName);
2369            } catch (Throwable e) {
2370                throw IOExceptionSupport.create("Status MBean could not be registered in JMX: "
2371                        + e.getMessage(), e);
2372            }
2373        }
2374        if (isAdvisorySupport()) {
2375            broker = new AdvisoryBroker(broker);
2376        }
2377        broker = new CompositeDestinationBroker(broker);
2378        broker = new TransactionBroker(broker, getPersistenceAdapter().createTransactionStore());
2379        if (isPopulateJMSXUserID()) {
2380            UserIDBroker userIDBroker = new UserIDBroker(broker);
2381            userIDBroker.setUseAuthenticatePrincipal(isUseAuthenticatedPrincipalForJMSXUserID());
2382            broker = userIDBroker;
2383        }
2384        if (isMonitorConnectionSplits()) {
2385            broker = new ConnectionSplitBroker(broker);
2386        }
2387        if (plugins != null) {
2388            for (int i = 0; i < plugins.length; i++) {
2389                BrokerPlugin plugin = plugins[i];
2390                broker = plugin.installPlugin(broker);
2391            }
2392        }
2393        return broker;
2394    }
2395
2396    protected PersistenceAdapter createPersistenceAdapter() throws IOException {
2397        if (isPersistent()) {
2398            PersistenceAdapterFactory fac = getPersistenceFactory();
2399            if (fac != null) {
2400                return fac.createPersistenceAdapter();
2401            } else {
2402                try {
2403                    String clazz = "org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter";
2404                    PersistenceAdapter adaptor = (PersistenceAdapter)getClass().getClassLoader().loadClass(clazz).newInstance();
2405                    File dir = new File(getBrokerDataDirectory(),"KahaDB");
2406                    adaptor.setDirectory(dir);
2407                    return adaptor;
2408                } catch (Throwable e) {
2409                    throw IOExceptionSupport.create(e);
2410                }
2411            }
2412        } else {
2413            return new MemoryPersistenceAdapter();
2414        }
2415    }
2416
2417    protected ObjectName createBrokerObjectName() throws MalformedObjectNameException  {
2418        return BrokerMBeanSupport.createBrokerObjectName(getManagementContext().getJmxDomainName(), getBrokerName());
2419    }
2420
2421    protected TransportConnector createTransportConnector(URI brokerURI) throws Exception {
2422        TransportServer transport = TransportFactorySupport.bind(this, brokerURI);
2423        return new TransportConnector(transport);
2424    }
2425
2426    /**
2427     * Extracts the port from the options
2428     */
2429    protected Object getPort(Map<?,?> options) {
2430        Object port = options.get("port");
2431        if (port == null) {
2432            port = DEFAULT_PORT;
2433            LOG.warn("No port specified so defaulting to: {}", port);
2434        }
2435        return port;
2436    }
2437
2438    protected void addShutdownHook() {
2439        if (useShutdownHook) {
2440            shutdownHook = new Thread("ActiveMQ ShutdownHook") {
2441                @Override
2442                public void run() {
2443                    containerShutdown();
2444                }
2445            };
2446            Runtime.getRuntime().addShutdownHook(shutdownHook);
2447        }
2448    }
2449
2450    protected void removeShutdownHook() {
2451        if (shutdownHook != null) {
2452            try {
2453                Runtime.getRuntime().removeShutdownHook(shutdownHook);
2454            } catch (Exception e) {
2455                LOG.debug("Caught exception, must be shutting down. This exception is ignored.", e);
2456            }
2457        }
2458    }
2459
2460    /**
2461     * Sets hooks to be executed when broker shut down
2462     *
2463     * @org.apache.xbean.Property
2464     */
2465    public void setShutdownHooks(List<Runnable> hooks) throws Exception {
2466        for (Runnable hook : hooks) {
2467            addShutdownHook(hook);
2468        }
2469    }
2470
2471    /**
2472     * Causes a clean shutdown of the container when the VM is being shut down
2473     */
2474    protected void containerShutdown() {
2475        try {
2476            stop();
2477        } catch (IOException e) {
2478            Throwable linkedException = e.getCause();
2479            if (linkedException != null) {
2480                logError("Failed to shut down: " + e + ". Reason: " + linkedException, linkedException);
2481            } else {
2482                logError("Failed to shut down: " + e, e);
2483            }
2484            if (!useLoggingForShutdownErrors) {
2485                e.printStackTrace(System.err);
2486            }
2487        } catch (Exception e) {
2488            logError("Failed to shut down: " + e, e);
2489        }
2490    }
2491
2492    protected void logError(String message, Throwable e) {
2493        if (useLoggingForShutdownErrors) {
2494            LOG.error("Failed to shut down: " + e);
2495        } else {
2496            System.err.println("Failed to shut down: " + e);
2497        }
2498    }
2499
2500    /**
2501     * Starts any configured destinations on startup
2502     */
2503    protected void startDestinations() throws Exception {
2504        if (destinations != null) {
2505            ConnectionContext adminConnectionContext = getAdminConnectionContext();
2506            for (int i = 0; i < destinations.length; i++) {
2507                ActiveMQDestination destination = destinations[i];
2508                getBroker().addDestination(adminConnectionContext, destination,true);
2509            }
2510        }
2511        if (isUseVirtualTopics()) {
2512            startVirtualConsumerDestinations();
2513        }
2514    }
2515
2516    /**
2517     * Returns the broker's administration connection context used for
2518     * configuring the broker at startup
2519     */
2520    public ConnectionContext getAdminConnectionContext() throws Exception {
2521        return BrokerSupport.getConnectionContext(getBroker());
2522    }
2523
2524    protected void startManagementContext() throws Exception {
2525        getManagementContext().setBrokerName(brokerName);
2526        getManagementContext().start();
2527        adminView = new BrokerView(this, null);
2528        ObjectName objectName = getBrokerObjectName();
2529        AnnotatedMBean.registerMBean(getManagementContext(), adminView, objectName);
2530    }
2531
2532    /**
2533     * Start all transport and network connections, proxies and bridges
2534     *
2535     * @throws Exception
2536     */
2537    public void startAllConnectors() throws Exception {
2538        Set<ActiveMQDestination> durableDestinations = getBroker().getDurableDestinations();
2539        List<TransportConnector> al = new ArrayList<TransportConnector>();
2540        for (Iterator<TransportConnector> iter = getTransportConnectors().iterator(); iter.hasNext();) {
2541            TransportConnector connector = iter.next();
2542            al.add(startTransportConnector(connector));
2543        }
2544        if (al.size() > 0) {
2545            // let's clear the transportConnectors list and replace it with
2546            // the started transportConnector instances
2547            this.transportConnectors.clear();
2548            setTransportConnectors(al);
2549        }
2550        this.slave = false;
2551        URI uri = getVmConnectorURI();
2552        Map<String, String> map = new HashMap<String, String>(URISupport.parseParameters(uri));
2553        map.put("async", "false");
2554        map.put("create","false");
2555        uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
2556
2557        if (!stopped.get()) {
2558            ThreadPoolExecutor networkConnectorStartExecutor = null;
2559            if (isNetworkConnectorStartAsync()) {
2560                // spin up as many threads as needed
2561                networkConnectorStartExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
2562                    10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
2563                    new ThreadFactory() {
2564                        int count=0;
2565                        @Override
2566                        public Thread newThread(Runnable runnable) {
2567                            Thread thread = new Thread(runnable, "NetworkConnector Start Thread-" +(count++));
2568                            thread.setDaemon(true);
2569                            return thread;
2570                        }
2571                    });
2572            }
2573
2574            for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) {
2575                final NetworkConnector connector = iter.next();
2576                connector.setLocalUri(uri);
2577                connector.setBrokerName(getBrokerName());
2578                connector.setDurableDestinations(durableDestinations);
2579                if (getDefaultSocketURIString() != null) {
2580                    connector.setBrokerURL(getDefaultSocketURIString());
2581                }
2582                if (networkConnectorStartExecutor != null) {
2583                    networkConnectorStartExecutor.execute(new Runnable() {
2584                        @Override
2585                        public void run() {
2586                            try {
2587                                LOG.info("Async start of {}", connector);
2588                                connector.start();
2589                            } catch(Exception e) {
2590                                LOG.error("Async start of network connector: {} failed", connector, e);
2591                            }
2592                        }
2593                    });
2594                } else {
2595                    connector.start();
2596                }
2597            }
2598            if (networkConnectorStartExecutor != null) {
2599                // executor done when enqueued tasks are complete
2600                ThreadPoolUtils.shutdown(networkConnectorStartExecutor);
2601            }
2602
2603            for (Iterator<ProxyConnector> iter = getProxyConnectors().iterator(); iter.hasNext();) {
2604                ProxyConnector connector = iter.next();
2605                connector.start();
2606            }
2607            for (Iterator<JmsConnector> iter = jmsConnectors.iterator(); iter.hasNext();) {
2608                JmsConnector connector = iter.next();
2609                connector.start();
2610            }
2611            for (Service service : services) {
2612                configureService(service);
2613                service.start();
2614            }
2615        }
2616    }
2617
2618    public TransportConnector startTransportConnector(TransportConnector connector) throws Exception {
2619        connector.setBrokerService(this);
2620        connector.setTaskRunnerFactory(getTaskRunnerFactory());
2621        MessageAuthorizationPolicy policy = getMessageAuthorizationPolicy();
2622        if (policy != null) {
2623            connector.setMessageAuthorizationPolicy(policy);
2624        }
2625        if (isUseJmx()) {
2626            connector = registerConnectorMBean(connector);
2627        }
2628        connector.getStatistics().setEnabled(enableStatistics);
2629        connector.start();
2630        return connector;
2631    }
2632
2633    /**
2634     * Perform any custom dependency injection
2635     */
2636    protected void configureServices(Object[] services) {
2637        for (Object service : services) {
2638            configureService(service);
2639        }
2640    }
2641
2642    /**
2643     * Perform any custom dependency injection
2644     */
2645    protected void configureService(Object service) {
2646        if (service instanceof BrokerServiceAware) {
2647            BrokerServiceAware serviceAware = (BrokerServiceAware) service;
2648            serviceAware.setBrokerService(this);
2649        }
2650    }
2651
2652    public void handleIOException(IOException exception) {
2653        if (ioExceptionHandler != null) {
2654            ioExceptionHandler.handle(exception);
2655         } else {
2656            LOG.info("No IOExceptionHandler registered, ignoring IO exception", exception);
2657         }
2658    }
2659
2660    protected void startVirtualConsumerDestinations() throws Exception {
2661        checkStartException();
2662        ConnectionContext adminConnectionContext = getAdminConnectionContext();
2663        Set<ActiveMQDestination> destinations = destinationFactory.getDestinations();
2664        DestinationFilter filter = getVirtualTopicConsumerDestinationFilter();
2665        if (!destinations.isEmpty()) {
2666            for (ActiveMQDestination destination : destinations) {
2667                if (filter.matches(destination) == true) {
2668                    broker.addDestination(adminConnectionContext, destination, false);
2669                }
2670            }
2671        }
2672    }
2673
2674    private DestinationFilter getVirtualTopicConsumerDestinationFilter() {
2675        // created at startup, so no sync needed
2676        if (virtualConsumerDestinationFilter == null) {
2677            Set <ActiveMQQueue> consumerDestinations = new HashSet<ActiveMQQueue>();
2678            if (destinationInterceptors != null) {
2679                for (DestinationInterceptor interceptor : destinationInterceptors) {
2680                    if (interceptor instanceof VirtualDestinationInterceptor) {
2681                        VirtualDestinationInterceptor virtualDestinationInterceptor = (VirtualDestinationInterceptor) interceptor;
2682                        for (VirtualDestination virtualDestination: virtualDestinationInterceptor.getVirtualDestinations()) {
2683                            if (virtualDestination instanceof VirtualTopic) {
2684                                consumerDestinations.add(new ActiveMQQueue(((VirtualTopic) virtualDestination).getPrefix() + DestinationFilter.ANY_DESCENDENT));
2685                            }
2686                        }
2687                    }
2688                }
2689            }
2690            ActiveMQQueue filter = new ActiveMQQueue();
2691            filter.setCompositeDestinations(consumerDestinations.toArray(new ActiveMQDestination[]{}));
2692            virtualConsumerDestinationFilter = DestinationFilter.parseFilter(filter);
2693        }
2694        return virtualConsumerDestinationFilter;
2695    }
2696
2697    protected synchronized ThreadPoolExecutor getExecutor() {
2698        if (this.executor == null) {
2699            this.executor = new ThreadPoolExecutor(1, 10, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
2700
2701                private long i = 0;
2702
2703                @Override
2704                public Thread newThread(Runnable runnable) {
2705                    this.i++;
2706                    Thread thread = new Thread(runnable, "ActiveMQ BrokerService.worker." + this.i);
2707                    thread.setDaemon(true);
2708                    thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
2709                        @Override
2710                        public void uncaughtException(final Thread t, final Throwable e) {
2711                            LOG.error("Error in thread '{}'", t.getName(), e);
2712                        }
2713                    });
2714                    return thread;
2715                }
2716            }, new RejectedExecutionHandler() {
2717                @Override
2718                public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) {
2719                    try {
2720                        executor.getQueue().offer(r, 60, TimeUnit.SECONDS);
2721                    } catch (InterruptedException e) {
2722                        throw new RejectedExecutionException("Interrupted waiting for BrokerService.worker");
2723                    }
2724
2725                    throw new RejectedExecutionException("Timed Out while attempting to enqueue Task.");
2726                }
2727            });
2728        }
2729        return this.executor;
2730    }
2731
2732    public synchronized Scheduler getScheduler() {
2733        if (this.scheduler==null) {
2734            this.scheduler = new Scheduler("ActiveMQ Broker["+getBrokerName()+"] Scheduler");
2735            try {
2736                this.scheduler.start();
2737            } catch (Exception e) {
2738               LOG.error("Failed to start Scheduler", e);
2739            }
2740        }
2741        return this.scheduler;
2742    }
2743
    /**
     * @return the region broker currently held by this service; may be
     *         {@code null} if none has been set
     */
    public Broker getRegionBroker() {
        return regionBroker;
    }
2747
    /**
     * @param regionBroker the region broker to store on this service
     */
    public void setRegionBroker(Broker regionBroker) {
        this.regionBroker = regionBroker;
    }
2751
    /**
     * Adds a hook to the broker's shutdown hook collection, synchronizing on
     * that collection.
     *
     * @param hook the hook to add
     */
    public void addShutdownHook(Runnable hook) {
        synchronized (shutdownHooks) {
            shutdownHooks.add(hook);
        }
    }
2757
    /**
     * Removes a hook from the broker's shutdown hook collection,
     * synchronizing on that collection.
     *
     * @param hook the hook to remove
     */
    public void removeShutdownHook(Runnable hook) {
        synchronized (shutdownHooks) {
            shutdownHooks.remove(hook);
        }
    }
2763
    /**
     * @return the value of the {@code systemExitOnShutdown} flag
     */
    public boolean isSystemExitOnShutdown() {
        return systemExitOnShutdown;
    }
2767
2768    /**
2769     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
2770     */
2771    public void setSystemExitOnShutdown(boolean systemExitOnShutdown) {
2772        this.systemExitOnShutdown = systemExitOnShutdown;
2773    }
2774
    /**
     * @return the configured {@code systemExitOnShutdownExitCode}
     */
    public int getSystemExitOnShutdownExitCode() {
        return systemExitOnShutdownExitCode;
    }
2778
    /**
     * @param systemExitOnShutdownExitCode the exit code to store
     */
    public void setSystemExitOnShutdownExitCode(int systemExitOnShutdownExitCode) {
        this.systemExitOnShutdownExitCode = systemExitOnShutdownExitCode;
    }
2782
    /**
     * @return the SSL context held by this service; may be {@code null} if
     *         none has been set
     */
    public SslContext getSslContext() {
        return sslContext;
    }
2786
    /**
     * @param sslContext the SSL context to store on this service
     */
    public void setSslContext(SslContext sslContext) {
        this.sslContext = sslContext;
    }
2790
    /**
     * @return the value of the {@code shutdownOnSlaveFailure} flag
     */
    public boolean isShutdownOnSlaveFailure() {
        return shutdownOnSlaveFailure;
    }
2794
2795    /**
2796     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
2797     */
2798    public void setShutdownOnSlaveFailure(boolean shutdownOnSlaveFailure) {
2799        this.shutdownOnSlaveFailure = shutdownOnSlaveFailure;
2800    }
2801
    /**
     * @return the value of the {@code waitForSlave} flag
     */
    public boolean isWaitForSlave() {
        return waitForSlave;
    }
2805
2806    /**
2807     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
2808     */
2809    public void setWaitForSlave(boolean waitForSlave) {
2810        this.waitForSlave = waitForSlave;
2811    }
2812
    /**
     * @return the configured {@code waitForSlaveTimeout}
     */
    public long getWaitForSlaveTimeout() {
        return this.waitForSlaveTimeout;
    }
2816
    /**
     * @param waitForSlaveTimeout the timeout value to store
     */
    public void setWaitForSlaveTimeout(long waitForSlaveTimeout) {
        this.waitForSlaveTimeout = waitForSlaveTimeout;
    }
2820
2821    /**
2822     * Get the passiveSlave
2823     * @return the passiveSlave
2824     */
2825    public boolean isPassiveSlave() {
2826        return this.passiveSlave;
2827    }
2828
2829    /**
2830     * Set the passiveSlave
2831     * @param passiveSlave the passiveSlave to set
2832     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
2833     */
2834    public void setPassiveSlave(boolean passiveSlave) {
2835        this.passiveSlave = passiveSlave;
2836    }
2837
2838    /**
2839     * override the Default IOException handler, called when persistence adapter
2840     * has experiences File or JDBC I/O Exceptions
2841     *
2842     * @param ioExceptionHandler
2843     */
2844    public void setIoExceptionHandler(IOExceptionHandler ioExceptionHandler) {
2845        configureService(ioExceptionHandler);
2846        this.ioExceptionHandler = ioExceptionHandler;
2847    }
2848
    /**
     * @return the registered I/O exception handler; may be {@code null} if
     *         none has been set
     */
    public IOExceptionHandler getIoExceptionHandler() {
        return ioExceptionHandler;
    }
2852
2853    /**
2854     * @return the schedulerSupport
2855     */
2856    public boolean isSchedulerSupport() {
2857        return this.schedulerSupport;
2858    }
2859
2860    /**
2861     * @param schedulerSupport the schedulerSupport to set
2862     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
2863     */
2864    public void setSchedulerSupport(boolean schedulerSupport) {
2865        this.schedulerSupport = schedulerSupport;
2866    }
2867
2868    /**
2869     * @return the schedulerDirectory
2870     */
2871    public File getSchedulerDirectoryFile() {
2872        if (this.schedulerDirectoryFile == null) {
2873            this.schedulerDirectoryFile = new File(getBrokerDataDirectory(), "scheduler");
2874        }
2875        return schedulerDirectoryFile;
2876    }
2877
2878    /**
2879     * @param schedulerDirectory the schedulerDirectory to set
2880     */
2881    public void setSchedulerDirectoryFile(File schedulerDirectory) {
2882        this.schedulerDirectoryFile = schedulerDirectory;
2883    }
2884
    /**
     * Sets the scheduler directory from a path string by wrapping it in a
     * {@link File}.
     *
     * @param schedulerDirectory the path of the scheduler directory
     */
    public void setSchedulerDirectory(String schedulerDirectory) {
        setSchedulerDirectoryFile(new File(schedulerDirectory));
    }
2888
    /**
     * @return the configured {@code schedulePeriodForDestinationPurge}
     */
    public int getSchedulePeriodForDestinationPurge() {
        return this.schedulePeriodForDestinationPurge;
    }
2892
    /**
     * @param schedulePeriodForDestinationPurge the purge schedule period to store
     */
    public void setSchedulePeriodForDestinationPurge(int schedulePeriodForDestinationPurge) {
        this.schedulePeriodForDestinationPurge = schedulePeriodForDestinationPurge;
    }
2896
    /**
     * @return the configured {@code maxPurgedDestinationsPerSweep}
     */
    public int getMaxPurgedDestinationsPerSweep() {
        return this.maxPurgedDestinationsPerSweep;
    }
2900
    /**
     * @param maxPurgedDestinationsPerSweep the per-sweep purge limit to store
     */
    public void setMaxPurgedDestinationsPerSweep(int maxPurgedDestinationsPerSweep) {
        this.maxPurgedDestinationsPerSweep = maxPurgedDestinationsPerSweep;
    }
2904
    /**
     * @return the broker context held by this service; may be {@code null}
     *         if none has been set
     */
    public BrokerContext getBrokerContext() {
        return brokerContext;
    }
2908
    /**
     * @param brokerContext the broker context to store on this service
     */
    public void setBrokerContext(BrokerContext brokerContext) {
        this.brokerContext = brokerContext;
    }
2912
    /**
     * Sets the broker id by wrapping the given string in a new
     * {@link BrokerId}.
     *
     * @param brokerId the textual broker id
     */
    public void setBrokerId(String brokerId) {
        this.brokerId = new BrokerId(brokerId);
    }
2916
    /**
     * @return the value of the {@code useAuthenticatedPrincipalForJMSXUserID} flag
     */
    public boolean isUseAuthenticatedPrincipalForJMSXUserID() {
        return useAuthenticatedPrincipalForJMSXUserID;
    }
2920
    /**
     * @param useAuthenticatedPrincipalForJMSXUserID the new value of the flag
     */
    public void setUseAuthenticatedPrincipalForJMSXUserID(boolean useAuthenticatedPrincipalForJMSXUserID) {
        this.useAuthenticatedPrincipalForJMSXUserID = useAuthenticatedPrincipalForJMSXUserID;
    }
2924
2925    /**
2926     * Should MBeans that support showing the Authenticated User Name information have this
2927     * value filled in or not.
2928     *
2929     * @return true if user names should be exposed in MBeans
2930     */
2931    public boolean isPopulateUserNameInMBeans() {
2932        return this.populateUserNameInMBeans;
2933    }
2934
2935    /**
2936     * Sets whether Authenticated User Name information is shown in MBeans that support this field.
2937     * @param value if MBeans should expose user name information.
2938     */
2939    public void setPopulateUserNameInMBeans(boolean value) {
2940        this.populateUserNameInMBeans = value;
2941    }
2942
2943    /**
2944     * Gets the time in Milliseconds that an invocation of an MBean method will wait before
2945     * failing.  The default value is to wait forever (zero).
2946     *
2947     * @return timeout in milliseconds before MBean calls fail, (default is 0 or no timeout).
2948     */
2949    public long getMbeanInvocationTimeout() {
2950        return mbeanInvocationTimeout;
2951    }
2952
2953    /**
2954     * Gets the time in Milliseconds that an invocation of an MBean method will wait before
2955     * failing. The default value is to wait forever (zero).
2956     *
2957     * @param mbeanInvocationTimeout
2958     *      timeout in milliseconds before MBean calls fail, (default is 0 or no timeout).
2959     */
2960    public void setMbeanInvocationTimeout(long mbeanInvocationTimeout) {
2961        this.mbeanInvocationTimeout = mbeanInvocationTimeout;
2962    }
2963
    /**
     * @return the value of the {@code networkConnectorStartAsync} flag
     */
    public boolean isNetworkConnectorStartAsync() {
        return networkConnectorStartAsync;
    }
2967
    /**
     * @param networkConnectorStartAsync the new value of the
     *        {@code networkConnectorStartAsync} flag
     */
    public void setNetworkConnectorStartAsync(boolean networkConnectorStartAsync) {
        this.networkConnectorStartAsync = networkConnectorStartAsync;
    }
2971
    /**
     * @return the value of the {@code allowTempAutoCreationOnSend} flag
     */
    public boolean isAllowTempAutoCreationOnSend() {
        return allowTempAutoCreationOnSend;
    }
2975
2976    /**
2977     * enable if temp destinations need to be propagated through a network when
2978     * advisorySupport==false. This is used in conjunction with the policy
2979     * gcInactiveDestinations for matching temps so they can get removed
2980     * when inactive
2981     *
2982     * @param allowTempAutoCreationOnSend
2983     */
2984    public void setAllowTempAutoCreationOnSend(boolean allowTempAutoCreationOnSend) {
2985        this.allowTempAutoCreationOnSend = allowTempAutoCreationOnSend;
2986    }
2987
    /**
     * @return the configured {@code offlineDurableSubscriberTimeout}
     */
    public long getOfflineDurableSubscriberTimeout() {
        return offlineDurableSubscriberTimeout;
    }
2991
    /**
     * @param offlineDurableSubscriberTimeout the timeout value to store
     */
    public void setOfflineDurableSubscriberTimeout(long offlineDurableSubscriberTimeout) {
        this.offlineDurableSubscriberTimeout = offlineDurableSubscriberTimeout;
    }
2995
    /**
     * @return the configured {@code offlineDurableSubscriberTaskSchedule}
     */
    public long getOfflineDurableSubscriberTaskSchedule() {
        return offlineDurableSubscriberTaskSchedule;
    }
2999
    /**
     * @param offlineDurableSubscriberTaskSchedule the task schedule value to store
     */
    public void setOfflineDurableSubscriberTaskSchedule(long offlineDurableSubscriberTaskSchedule) {
        this.offlineDurableSubscriberTaskSchedule = offlineDurableSubscriberTaskSchedule;
    }
3003
3004    public boolean shouldRecordVirtualDestination(ActiveMQDestination destination) {
3005        return isUseVirtualTopics() && destination.isQueue() &&
3006               getVirtualTopicConsumerDestinationFilter().matches(destination);
3007    }
3008
3009    synchronized public Throwable getStartException() {
3010        return startException;
3011    }
3012
    /**
     * @return the value of the {@code startAsync} flag
     */
    public boolean isStartAsync() {
        return startAsync;
    }
3016
    /**
     * @param startAsync the new value of the {@code startAsync} flag
     */
    public void setStartAsync(boolean startAsync) {
        this.startAsync = startAsync;
    }
3020
    /**
     * @return the value of the {@code slave} flag
     */
    public boolean isSlave() {
        return this.slave;
    }
3024
    /**
     * @return the current value of the {@code stopping} flag
     */
    public boolean isStopping() {
        return this.stopping.get();
    }
3028
3029    /**
3030     * @return true if the broker allowed to restart on shutdown.
3031     */
3032    public boolean isRestartAllowed() {
3033        return restartAllowed;
3034    }
3035
3036    /**
3037     * Sets if the broker allowed to restart on shutdown.
3038     * @return
3039     */
3040    public void setRestartAllowed(boolean restartAllowed) {
3041        this.restartAllowed = restartAllowed;
3042    }
3043
3044    /**
3045     * A lifecycle manager of the BrokerService should
3046     * inspect this property after a broker shutdown has occurred
3047     * to find out if the broker needs to be re-created and started
3048     * again.
3049     *
3050     * @return true if the broker wants to be restarted after it shuts down.
3051     */
3052    public boolean isRestartRequested() {
3053        return restartRequested;
3054    }
3055
    /**
     * Marks this broker as wanting a restart by setting the
     * {@code restartRequested} flag, which {@link #isRestartRequested()}
     * reports.
     */
    public void requestRestart() {
        this.restartRequested = true;
    }
3059
    /**
     * @return the configured {@code storeOpenWireVersion}
     */
    public int getStoreOpenWireVersion() {
        return storeOpenWireVersion;
    }
3063
    /**
     * @param storeOpenWireVersion the OpenWire version value to store
     */
    public void setStoreOpenWireVersion(int storeOpenWireVersion) {
        this.storeOpenWireVersion = storeOpenWireVersion;
    }
3067
3068    /**
3069     * @return the current number of connections on this Broker.
3070     */
3071    public int getCurrentConnections() {
3072        return this.currentConnections.get();
3073    }
3074
3075    /**
3076     * @return the total number of connections this broker has handled since startup.
3077     */
3078    public long getTotalConnections() {
3079        return this.totalConnections.get();
3080    }
3081
    /**
     * Increments the {@code currentConnections} counter by one.
     */
    public void incrementCurrentConnections() {
        this.currentConnections.incrementAndGet();
    }
3085
    /**
     * Decrements the {@code currentConnections} counter by one.
     */
    public void decrementCurrentConnections() {
        this.currentConnections.decrementAndGet();
    }
3089
    /**
     * Increments the {@code totalConnections} counter by one.
     */
    public void incrementTotalConnections() {
        this.totalConnections.incrementAndGet();
    }
3093
    /**
     * @return the value of the {@code rejectDurableConsumers} flag
     */
    public boolean isRejectDurableConsumers() {
        return rejectDurableConsumers;
    }
3097
    /**
     * @param rejectDurableConsumers the new value of the
     *        {@code rejectDurableConsumers} flag
     */
    public void setRejectDurableConsumers(boolean rejectDurableConsumers) {
        this.rejectDurableConsumers = rejectDurableConsumers;
    }
3101
    /**
     * @return the value of the {@code adjustUsageLimits} flag
     */
    public boolean isAdjustUsageLimits() {
        return adjustUsageLimits;
    }
3105
    /**
     * @param adjustUsageLimits the new value of the {@code adjustUsageLimits} flag
     */
    public void setAdjustUsageLimits(boolean adjustUsageLimits) {
        this.adjustUsageLimits = adjustUsageLimits;
    }
3109
    /**
     * @param rollbackOnlyOnAsyncException the new value of the
     *        {@code rollbackOnlyOnAsyncException} flag
     */
    public void setRollbackOnlyOnAsyncException(boolean rollbackOnlyOnAsyncException) {
        this.rollbackOnlyOnAsyncException = rollbackOnlyOnAsyncException;
    }
3113
    /**
     * @return the value of the {@code rollbackOnlyOnAsyncException} flag
     */
    public boolean isRollbackOnlyOnAsyncException() {
        return rollbackOnlyOnAsyncException;
    }
3117}