001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker;
018
019import java.io.BufferedReader;
020import java.io.File;
021import java.io.IOException;
022import java.io.InputStream;
023import java.io.InputStreamReader;
024import java.net.URI;
025import java.net.URISyntaxException;
026import java.net.UnknownHostException;
027import java.security.Provider;
028import java.security.Security;
029import java.util.ArrayList;
030import java.util.Date;
031import java.util.HashMap;
032import java.util.HashSet;
033import java.util.Iterator;
034import java.util.List;
035import java.util.Locale;
036import java.util.Map;
037import java.util.Set;
038import java.util.concurrent.CopyOnWriteArrayList;
039import java.util.concurrent.CountDownLatch;
040import java.util.concurrent.LinkedBlockingQueue;
041import java.util.concurrent.RejectedExecutionException;
042import java.util.concurrent.RejectedExecutionHandler;
043import java.util.concurrent.SynchronousQueue;
044import java.util.concurrent.ThreadFactory;
045import java.util.concurrent.ThreadPoolExecutor;
046import java.util.concurrent.TimeUnit;
047import java.util.concurrent.atomic.AtomicBoolean;
048import java.util.concurrent.atomic.AtomicInteger;
049import java.util.concurrent.atomic.AtomicLong;
050
051import javax.annotation.PostConstruct;
052import javax.annotation.PreDestroy;
053import javax.management.MalformedObjectNameException;
054import javax.management.ObjectName;
055
056import org.apache.activemq.ActiveMQConnectionMetaData;
057import org.apache.activemq.ConfigurationException;
058import org.apache.activemq.Service;
059import org.apache.activemq.advisory.AdvisoryBroker;
060import org.apache.activemq.broker.cluster.ConnectionSplitBroker;
061import org.apache.activemq.broker.jmx.AnnotatedMBean;
062import org.apache.activemq.broker.jmx.BrokerMBeanSupport;
063import org.apache.activemq.broker.jmx.BrokerView;
064import org.apache.activemq.broker.jmx.ConnectorView;
065import org.apache.activemq.broker.jmx.ConnectorViewMBean;
066import org.apache.activemq.broker.jmx.HealthView;
067import org.apache.activemq.broker.jmx.HealthViewMBean;
068import org.apache.activemq.broker.jmx.JmsConnectorView;
069import org.apache.activemq.broker.jmx.JobSchedulerView;
070import org.apache.activemq.broker.jmx.JobSchedulerViewMBean;
071import org.apache.activemq.broker.jmx.Log4JConfigView;
072import org.apache.activemq.broker.jmx.ManagedRegionBroker;
073import org.apache.activemq.broker.jmx.ManagementContext;
074import org.apache.activemq.broker.jmx.NetworkConnectorView;
075import org.apache.activemq.broker.jmx.NetworkConnectorViewMBean;
076import org.apache.activemq.broker.jmx.ProxyConnectorView;
077import org.apache.activemq.broker.region.CompositeDestinationInterceptor;
078import org.apache.activemq.broker.region.Destination;
079import org.apache.activemq.broker.region.DestinationFactory;
080import org.apache.activemq.broker.region.DestinationFactoryImpl;
081import org.apache.activemq.broker.region.DestinationInterceptor;
082import org.apache.activemq.broker.region.RegionBroker;
083import org.apache.activemq.broker.region.policy.PolicyMap;
084import org.apache.activemq.broker.region.virtual.MirroredQueue;
085import org.apache.activemq.broker.region.virtual.VirtualDestination;
086import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
087import org.apache.activemq.broker.region.virtual.VirtualTopic;
088import org.apache.activemq.broker.scheduler.JobSchedulerStore;
089import org.apache.activemq.broker.scheduler.SchedulerBroker;
090import org.apache.activemq.broker.scheduler.memory.InMemoryJobSchedulerStore;
091import org.apache.activemq.command.ActiveMQDestination;
092import org.apache.activemq.command.ActiveMQQueue;
093import org.apache.activemq.command.BrokerId;
094import org.apache.activemq.command.ProducerInfo;
095import org.apache.activemq.filter.DestinationFilter;
096import org.apache.activemq.network.ConnectionFilter;
097import org.apache.activemq.network.DiscoveryNetworkConnector;
098import org.apache.activemq.network.NetworkConnector;
099import org.apache.activemq.network.jms.JmsConnector;
100import org.apache.activemq.openwire.OpenWireFormat;
101import org.apache.activemq.proxy.ProxyConnector;
102import org.apache.activemq.security.MessageAuthorizationPolicy;
103import org.apache.activemq.selector.SelectorParser;
104import org.apache.activemq.store.JournaledStore;
105import org.apache.activemq.store.PListStore;
106import org.apache.activemq.store.PersistenceAdapter;
107import org.apache.activemq.store.PersistenceAdapterFactory;
108import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
109import org.apache.activemq.thread.Scheduler;
110import org.apache.activemq.thread.TaskRunnerFactory;
111import org.apache.activemq.transport.TransportFactorySupport;
112import org.apache.activemq.transport.TransportServer;
113import org.apache.activemq.transport.vm.VMTransportFactory;
114import org.apache.activemq.usage.SystemUsage;
115import org.apache.activemq.util.BrokerSupport;
116import org.apache.activemq.util.DefaultIOExceptionHandler;
117import org.apache.activemq.util.IOExceptionHandler;
118import org.apache.activemq.util.IOExceptionSupport;
119import org.apache.activemq.util.IOHelper;
120import org.apache.activemq.util.InetAddressUtil;
121import org.apache.activemq.util.ServiceStopper;
122import org.apache.activemq.util.ThreadPoolUtils;
123import org.apache.activemq.util.TimeUtils;
124import org.apache.activemq.util.URISupport;
125import org.slf4j.Logger;
126import org.slf4j.LoggerFactory;
127import org.slf4j.MDC;
128
129/**
130 * Manages the life-cycle of an ActiveMQ Broker. A BrokerService consists of a
131 * number of transport connectors, network connectors and a bunch of properties
132 * which can be used to configure the broker as it is lazily created.
133 *
134 * @org.apache.xbean.XBean
135 */
136public class BrokerService implements Service {
137    public static final String DEFAULT_PORT = "61616";
138    public static final String LOCAL_HOST_NAME;
139    public static final String BROKER_VERSION;
140    public static final String DEFAULT_BROKER_NAME = "localhost";
141    public static final int DEFAULT_MAX_FILE_LENGTH = 1024 * 1024 * 32;
142    public static final long DEFAULT_START_TIMEOUT = 600000L;
143
144    private static final Logger LOG = LoggerFactory.getLogger(BrokerService.class);
145
146    @SuppressWarnings("unused")
147    private static final long serialVersionUID = 7353129142305630237L;
148
149    private boolean useJmx = true;
150    private boolean enableStatistics = true;
151    private boolean persistent = true;
152    private boolean populateJMSXUserID;
153    private boolean useAuthenticatedPrincipalForJMSXUserID;
154    private boolean populateUserNameInMBeans;
155    private long mbeanInvocationTimeout = 0;
156
157    private boolean useShutdownHook = true;
158    private boolean useLoggingForShutdownErrors;
159    private boolean shutdownOnMasterFailure;
160    private boolean shutdownOnSlaveFailure;
161    private boolean waitForSlave;
162    private long waitForSlaveTimeout = DEFAULT_START_TIMEOUT;
163    private boolean passiveSlave;
164    private String brokerName = DEFAULT_BROKER_NAME;
165    private File dataDirectoryFile;
166    private File tmpDataDirectory;
167    private Broker broker;
168    private BrokerView adminView;
169    private ManagementContext managementContext;
170    private ObjectName brokerObjectName;
171    private TaskRunnerFactory taskRunnerFactory;
172    private TaskRunnerFactory persistenceTaskRunnerFactory;
173    private SystemUsage systemUsage;
174    private SystemUsage producerSystemUsage;
175    private SystemUsage consumerSystemUsaage;
176    private PersistenceAdapter persistenceAdapter;
177    private PersistenceAdapterFactory persistenceFactory;
178    protected DestinationFactory destinationFactory;
179    private MessageAuthorizationPolicy messageAuthorizationPolicy;
180    private final List<TransportConnector> transportConnectors = new CopyOnWriteArrayList<TransportConnector>();
181    private final List<NetworkConnector> networkConnectors = new CopyOnWriteArrayList<NetworkConnector>();
182    private final List<ProxyConnector> proxyConnectors = new CopyOnWriteArrayList<ProxyConnector>();
183    private final List<JmsConnector> jmsConnectors = new CopyOnWriteArrayList<JmsConnector>();
184    private final List<Service> services = new ArrayList<Service>();
185    private transient Thread shutdownHook;
186    private String[] transportConnectorURIs;
187    private String[] networkConnectorURIs;
188    private JmsConnector[] jmsBridgeConnectors; // these are Jms to Jms bridges
189    // to other jms messaging systems
190    private boolean deleteAllMessagesOnStartup;
191    private boolean advisorySupport = true;
192    private URI vmConnectorURI;
193    private String defaultSocketURIString;
194    private PolicyMap destinationPolicy;
195    private final AtomicBoolean started = new AtomicBoolean(false);
196    private final AtomicBoolean stopped = new AtomicBoolean(false);
197    private final AtomicBoolean stopping = new AtomicBoolean(false);
198    private BrokerPlugin[] plugins;
199    private boolean keepDurableSubsActive = true;
200    private boolean useVirtualTopics = true;
201    private boolean useMirroredQueues = false;
202    private boolean useTempMirroredQueues = true;
203    private BrokerId brokerId;
204    private volatile DestinationInterceptor[] destinationInterceptors;
205    private ActiveMQDestination[] destinations;
206    private PListStore tempDataStore;
207    private int persistenceThreadPriority = Thread.MAX_PRIORITY;
208    private boolean useLocalHostBrokerName;
209    private final CountDownLatch stoppedLatch = new CountDownLatch(1);
210    private final CountDownLatch startedLatch = new CountDownLatch(1);
211    private Broker regionBroker;
212    private int producerSystemUsagePortion = 60;
213    private int consumerSystemUsagePortion = 40;
214    private boolean splitSystemUsageForProducersConsumers;
215    private boolean monitorConnectionSplits = false;
216    private int taskRunnerPriority = Thread.NORM_PRIORITY;
217    private boolean dedicatedTaskRunner;
218    private boolean cacheTempDestinations = false;// useful for failover
219    private int timeBeforePurgeTempDestinations = 5000;
220    private final List<Runnable> shutdownHooks = new ArrayList<Runnable>();
221    private boolean systemExitOnShutdown;
222    private int systemExitOnShutdownExitCode;
223    private SslContext sslContext;
224    private boolean forceStart = false;
225    private IOExceptionHandler ioExceptionHandler;
226    private boolean schedulerSupport = false;
227    private File schedulerDirectoryFile;
228    private Scheduler scheduler;
229    private ThreadPoolExecutor executor;
230    private int schedulePeriodForDestinationPurge= 0;
231    private int maxPurgedDestinationsPerSweep = 0;
232    private BrokerContext brokerContext;
233    private boolean networkConnectorStartAsync = false;
234    private boolean allowTempAutoCreationOnSend;
235    private JobSchedulerStore jobSchedulerStore;
236    private final AtomicLong totalConnections = new AtomicLong();
237    private final AtomicInteger currentConnections = new AtomicInteger();
238
239    private long offlineDurableSubscriberTimeout = -1;
240    private long offlineDurableSubscriberTaskSchedule = 300000;
241    private DestinationFilter virtualConsumerDestinationFilter;
242
243    private final Object persistenceAdapterLock = new Object();
244    private Throwable startException = null;
245    private boolean startAsync = false;
246    private Date startDate;
247    private boolean slave = true;
248
249    private boolean restartAllowed = true;
250    private boolean restartRequested = false;
251    private boolean rejectDurableConsumers = false;
252
253    private int storeOpenWireVersion = OpenWireFormat.DEFAULT_VERSION;
254
255    static {
256
257        try {
258            ClassLoader loader = BrokerService.class.getClassLoader();
259            Class<?> clazz = loader.loadClass("org.bouncycastle.jce.provider.BouncyCastleProvider");
260            Provider bouncycastle = (Provider) clazz.newInstance();
261            Security.insertProviderAt(bouncycastle, 2);
262            LOG.info("Loaded the Bouncy Castle security provider.");
263        } catch(Throwable e) {
264            // No BouncyCastle found so we use the default Java Security Provider
265        }
266
267        String localHostName = "localhost";
268        try {
269            localHostName =  InetAddressUtil.getLocalHostName();
270        } catch (UnknownHostException e) {
271            LOG.error("Failed to resolve localhost");
272        }
273        LOCAL_HOST_NAME = localHostName;
274
275        InputStream in = null;
276        String version = null;
277        if ((in = BrokerService.class.getResourceAsStream("/org/apache/activemq/version.txt")) != null) {
278            BufferedReader reader = new BufferedReader(new InputStreamReader(in));
279            try {
280                version = reader.readLine();
281            } catch(Exception e) {
282            }
283        }
284        BROKER_VERSION = version;
285    }
286
287    @Override
288    public String toString() {
289        return "BrokerService[" + getBrokerName() + "]";
290    }
291
292    private String getBrokerVersion() {
293        String version = ActiveMQConnectionMetaData.PROVIDER_VERSION;
294        if (version == null) {
295            version = BROKER_VERSION;
296        }
297
298        return version;
299    }
300
301    /**
302     * Adds a new transport connector for the given bind address
303     *
304     * @return the newly created and added transport connector
305     * @throws Exception
306     */
307    public TransportConnector addConnector(String bindAddress) throws Exception {
308        return addConnector(new URI(bindAddress));
309    }
310
311    /**
312     * Adds a new transport connector for the given bind address
313     *
314     * @return the newly created and added transport connector
315     * @throws Exception
316     */
317    public TransportConnector addConnector(URI bindAddress) throws Exception {
318        return addConnector(createTransportConnector(bindAddress));
319    }
320
321    /**
322     * Adds a new transport connector for the given TransportServer transport
323     *
324     * @return the newly created and added transport connector
325     * @throws Exception
326     */
327    public TransportConnector addConnector(TransportServer transport) throws Exception {
328        return addConnector(new TransportConnector(transport));
329    }
330
331    /**
332     * Adds a new transport connector
333     *
334     * @return the transport connector
335     * @throws Exception
336     */
337    public TransportConnector addConnector(TransportConnector connector) throws Exception {
338        transportConnectors.add(connector);
339        return connector;
340    }
341
342    /**
343     * Stops and removes a transport connector from the broker.
344     *
345     * @param connector
346     * @return true if the connector has been previously added to the broker
347     * @throws Exception
348     */
349    public boolean removeConnector(TransportConnector connector) throws Exception {
350        boolean rc = transportConnectors.remove(connector);
351        if (rc) {
352            unregisterConnectorMBean(connector);
353        }
354        return rc;
355    }
356
357    /**
358     * Adds a new network connector using the given discovery address
359     *
360     * @return the newly created and added network connector
361     * @throws Exception
362     */
363    public NetworkConnector addNetworkConnector(String discoveryAddress) throws Exception {
364        return addNetworkConnector(new URI(discoveryAddress));
365    }
366
367    /**
368     * Adds a new proxy connector using the given bind address
369     *
370     * @return the newly created and added network connector
371     * @throws Exception
372     */
373    public ProxyConnector addProxyConnector(String bindAddress) throws Exception {
374        return addProxyConnector(new URI(bindAddress));
375    }
376
377    /**
378     * Adds a new network connector using the given discovery address
379     *
380     * @return the newly created and added network connector
381     * @throws Exception
382     */
383    public NetworkConnector addNetworkConnector(URI discoveryAddress) throws Exception {
384        NetworkConnector connector = new DiscoveryNetworkConnector(discoveryAddress);
385        return addNetworkConnector(connector);
386    }
387
388    /**
389     * Adds a new proxy connector using the given bind address
390     *
391     * @return the newly created and added network connector
392     * @throws Exception
393     */
394    public ProxyConnector addProxyConnector(URI bindAddress) throws Exception {
395        ProxyConnector connector = new ProxyConnector();
396        connector.setBind(bindAddress);
397        connector.setRemote(new URI("fanout:multicast://default"));
398        return addProxyConnector(connector);
399    }
400
401    /**
402     * Adds a new network connector to connect this broker to a federated
403     * network
404     */
405    public NetworkConnector addNetworkConnector(NetworkConnector connector) throws Exception {
406        connector.setBrokerService(this);
407        URI uri = getVmConnectorURI();
408        Map<String, String> map = new HashMap<String, String>(URISupport.parseParameters(uri));
409        map.put("network", "true");
410        uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
411        connector.setLocalUri(uri);
412        // Set a connection filter so that the connector does not establish loop
413        // back connections.
414        connector.setConnectionFilter(new ConnectionFilter() {
415            @Override
416            public boolean connectTo(URI location) {
417                List<TransportConnector> transportConnectors = getTransportConnectors();
418                for (Iterator<TransportConnector> iter = transportConnectors.iterator(); iter.hasNext();) {
419                    try {
420                        TransportConnector tc = iter.next();
421                        if (location.equals(tc.getConnectUri())) {
422                            return false;
423                        }
424                    } catch (Throwable e) {
425                    }
426                }
427                return true;
428            }
429        });
430        networkConnectors.add(connector);
431        return connector;
432    }
433
434    /**
435     * Removes the given network connector without stopping it. The caller
436     * should call {@link NetworkConnector#stop()} to close the connector
437     */
438    public boolean removeNetworkConnector(NetworkConnector connector) {
439        boolean answer = networkConnectors.remove(connector);
440        if (answer) {
441            unregisterNetworkConnectorMBean(connector);
442        }
443        return answer;
444    }
445
446    public ProxyConnector addProxyConnector(ProxyConnector connector) throws Exception {
447        URI uri = getVmConnectorURI();
448        connector.setLocalUri(uri);
449        proxyConnectors.add(connector);
450        if (isUseJmx()) {
451            registerProxyConnectorMBean(connector);
452        }
453        return connector;
454    }
455
456    public JmsConnector addJmsConnector(JmsConnector connector) throws Exception {
457        connector.setBrokerService(this);
458        jmsConnectors.add(connector);
459        if (isUseJmx()) {
460            registerJmsConnectorMBean(connector);
461        }
462        return connector;
463    }
464
465    public JmsConnector removeJmsConnector(JmsConnector connector) {
466        if (jmsConnectors.remove(connector)) {
467            return connector;
468        }
469        return null;
470    }
471
472    public void masterFailed() {
473        if (shutdownOnMasterFailure) {
474            LOG.error("The Master has failed ... shutting down");
475            try {
476                stop();
477            } catch (Exception e) {
478                LOG.error("Failed to stop for master failure", e);
479            }
480        } else {
481            LOG.warn("Master Failed - starting all connectors");
482            try {
483                startAllConnectors();
484                broker.nowMasterBroker();
485            } catch (Exception e) {
486                LOG.error("Failed to startAllConnectors", e);
487            }
488        }
489    }
490
491    public String getUptime() {
492        long delta = getUptimeMillis();
493
494        if (delta == 0) {
495            return "not started";
496        }
497
498        return TimeUtils.printDuration(delta);
499    }
500
501    public long getUptimeMillis() {
502        if (startDate == null) {
503            return 0;
504        }
505
506        return new Date().getTime() - startDate.getTime();
507    }
508
509    public boolean isStarted() {
510        return started.get() && startedLatch.getCount() == 0;
511    }
512
513    /**
514     * Forces a start of the broker.
515     * By default a BrokerService instance that was
516     * previously stopped using BrokerService.stop() cannot be restarted
517     * using BrokerService.start().
518     * This method enforces a restart.
519     * It is not recommended to force a restart of the broker and will not work
520     * for most but some very trivial broker configurations.
521     * For restarting a broker instance we recommend to first call stop() on
522     * the old instance and then recreate a new BrokerService instance.
523     *
524     * @param force - if true enforces a restart.
525     * @throws Exception
526     */
527    public void start(boolean force) throws Exception {
528        forceStart = force;
529        stopped.set(false);
530        started.set(false);
531        start();
532    }
533
534    // Service interface
535    // -------------------------------------------------------------------------
536
537    protected boolean shouldAutostart() {
538        return true;
539    }
540
541    /**
542     * JSR-250 callback wrapper; converts checked exceptions to runtime exceptions
543     *
544     * delegates to autoStart, done to prevent backwards incompatible signature change
545     */
546    @PostConstruct
547    private void postConstruct() {
548        try {
549            autoStart();
550        } catch (Exception ex) {
551            throw new RuntimeException(ex);
552        }
553    }
554
555    /**
556     *
557     * @throws Exception
558     * @org. apache.xbean.InitMethod
559     */
560    public void autoStart() throws Exception {
561        if(shouldAutostart()) {
562            start();
563        }
564    }
565
566    @Override
567    public void start() throws Exception {
568        if (stopped.get() || !started.compareAndSet(false, true)) {
569            // lets just ignore redundant start() calls
570            // as its way too easy to not be completely sure if start() has been
571            // called or not with the gazillion of different configuration
572            // mechanisms
573            // throw new IllegalStateException("Already started.");
574            return;
575        }
576
577        stopping.set(false);
578        startDate = new Date();
579        MDC.put("activemq.broker", brokerName);
580
581        try {
582            if (systemExitOnShutdown && useShutdownHook) {
583                throw new ConfigurationException("'useShutdownHook' property cannot be be used with 'systemExitOnShutdown', please turn it off (useShutdownHook=false)");
584            }
585            processHelperProperties();
586            if (isUseJmx()) {
587                // need to remove MDC during starting JMX, as that would otherwise causes leaks, as spawned threads inheirt the MDC and
588                // we cannot cleanup clear that during shutdown of the broker.
589                MDC.remove("activemq.broker");
590                try {
591                    startManagementContext();
592                    for (NetworkConnector connector : getNetworkConnectors()) {
593                        registerNetworkConnectorMBean(connector);
594                    }
595                } finally {
596                    MDC.put("activemq.broker", brokerName);
597                }
598            }
599
600            // in jvm master slave, lets not publish over existing broker till we get the lock
601            final BrokerRegistry brokerRegistry = BrokerRegistry.getInstance();
602            if (brokerRegistry.lookup(getBrokerName()) == null) {
603                brokerRegistry.bind(getBrokerName(), BrokerService.this);
604            }
605            startPersistenceAdapter(startAsync);
606            startBroker(startAsync);
607            brokerRegistry.bind(getBrokerName(), BrokerService.this);
608        } catch (Exception e) {
609            LOG.error("Failed to start Apache ActiveMQ ({}, {})", new Object[]{ getBrokerName(), brokerId }, e);
610            try {
611                if (!stopped.get()) {
612                    stop();
613                }
614            } catch (Exception ex) {
615                LOG.warn("Failed to stop broker after failure in start. This exception will be ignored.", ex);
616            }
617            throw e;
618        } finally {
619            MDC.remove("activemq.broker");
620        }
621    }
622
623    private void startPersistenceAdapter(boolean async) throws Exception {
624        if (async) {
625            new Thread("Persistence Adapter Starting Thread") {
626                @Override
627                public void run() {
628                    try {
629                        doStartPersistenceAdapter();
630                    } catch (Throwable e) {
631                        startException = e;
632                    } finally {
633                        synchronized (persistenceAdapterLock) {
634                            persistenceAdapterLock.notifyAll();
635                        }
636                    }
637                }
638            }.start();
639        } else {
640            doStartPersistenceAdapter();
641        }
642    }
643
644    private void doStartPersistenceAdapter() throws Exception {
645        getPersistenceAdapter().setUsageManager(getProducerSystemUsage());
646        getPersistenceAdapter().setBrokerName(getBrokerName());
647        LOG.info("Using Persistence Adapter: {}", getPersistenceAdapter());
648        if (deleteAllMessagesOnStartup) {
649            deleteAllMessages();
650        }
651        getPersistenceAdapter().start();
652
653        getJobSchedulerStore();
654        if (jobSchedulerStore != null) {
655            try {
656                jobSchedulerStore.start();
657            } catch (Exception e) {
658                RuntimeException exception = new RuntimeException(
659                        "Failed to start job scheduler store: " + jobSchedulerStore, e);
660                LOG.error(exception.getLocalizedMessage(), e);
661                throw exception;
662            }
663        }
664    }
665
666    private void startBroker(boolean async) throws Exception {
667        if (async) {
668            new Thread("Broker Starting Thread") {
669                @Override
670                public void run() {
671                    try {
672                        synchronized (persistenceAdapterLock) {
673                            persistenceAdapterLock.wait();
674                        }
675                        doStartBroker();
676                    } catch (Throwable t) {
677                        startException = t;
678                    }
679                }
680            }.start();
681        } else {
682            doStartBroker();
683        }
684    }
685
686    private void doStartBroker() throws Exception {
687        if (startException != null) {
688            return;
689        }
690        startDestinations();
691        addShutdownHook();
692
693        broker = getBroker();
694        brokerId = broker.getBrokerId();
695
696        // need to log this after creating the broker so we have its id and name
697        LOG.info("Apache ActiveMQ {} ({}, {}) is starting", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId });
698        broker.start();
699
700        if (isUseJmx()) {
701            if (getManagementContext().isCreateConnector() && !getManagementContext().isConnectorStarted()) {
702                // try to restart management context
703                // typical for slaves that use the same ports as master
704                managementContext.stop();
705                startManagementContext();
706            }
707            ManagedRegionBroker managedBroker = (ManagedRegionBroker) regionBroker;
708            managedBroker.setContextBroker(broker);
709            adminView.setBroker(managedBroker);
710        }
711
712        if (ioExceptionHandler == null) {
713            setIoExceptionHandler(new DefaultIOExceptionHandler());
714        }
715
716        if (isUseJmx() && Log4JConfigView.isLog4JAvailable()) {
717            ObjectName objectName = BrokerMBeanSupport.createLog4JConfigViewName(getBrokerObjectName().toString());
718            Log4JConfigView log4jConfigView = new Log4JConfigView();
719            AnnotatedMBean.registerMBean(getManagementContext(), log4jConfigView, objectName);
720        }
721
722        startAllConnectors();
723
724        LOG.info("Apache ActiveMQ {} ({}, {}) started", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId});
725        LOG.info("For help or more information please see: http://activemq.apache.org");
726
727        getBroker().brokerServiceStarted();
728        checkSystemUsageLimits();
729        startedLatch.countDown();
730        getBroker().nowMasterBroker();
731    }
732
733    /**
734     * JSR-250 callback wrapper; converts checked exceptions to runtime exceptions
735     *
736     * delegates to stop, done to prevent backwards incompatible signature change
737     */
738    @PreDestroy
739    private void preDestroy () {
740        try {
741            stop();
742        } catch (Exception ex) {
743            throw new RuntimeException();
744        }
745    }
746
747    /**
748     *
749     * @throws Exception
750     * @org.apache.xbean.DestroyMethod
751     */
752    @Override
753    public void stop() throws Exception {
754        if (!stopping.compareAndSet(false, true)) {
755            LOG.trace("Broker already stopping/stopped");
756            return;
757        }
758
759        MDC.put("activemq.broker", brokerName);
760
761        if (systemExitOnShutdown) {
762            new Thread() {
763                @Override
764                public void run() {
765                    System.exit(systemExitOnShutdownExitCode);
766                }
767            }.start();
768        }
769
770        LOG.info("Apache ActiveMQ {} ({}, {}) is shutting down", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId} );
771
772        removeShutdownHook();
773        if (this.scheduler != null) {
774            this.scheduler.stop();
775            this.scheduler = null;
776        }
777        ServiceStopper stopper = new ServiceStopper();
778        if (services != null) {
779            for (Service service : services) {
780                stopper.stop(service);
781            }
782        }
783        stopAllConnectors(stopper);
784        this.slave = true;
785        // remove any VMTransports connected
786        // this has to be done after services are stopped,
787        // to avoid timing issue with discovery (spinning up a new instance)
788        BrokerRegistry.getInstance().unbind(getBrokerName());
789        VMTransportFactory.stopped(getBrokerName());
790        if (broker != null) {
791            stopper.stop(broker);
792            broker = null;
793        }
794
795        if (jobSchedulerStore != null) {
796            jobSchedulerStore.stop();
797            jobSchedulerStore = null;
798        }
799        if (tempDataStore != null) {
800            tempDataStore.stop();
801            tempDataStore = null;
802        }
803        try {
804            stopper.stop(persistenceAdapter);
805            persistenceAdapter = null;
806            if (isUseJmx()) {
807                stopper.stop(getManagementContext());
808                managementContext = null;
809            }
810            // Clear SelectorParser cache to free memory
811            SelectorParser.clearCache();
812        } finally {
813            started.set(false);
814            stopped.set(true);
815            stoppedLatch.countDown();
816        }
817
818        if (this.taskRunnerFactory != null) {
819            this.taskRunnerFactory.shutdown();
820            this.taskRunnerFactory = null;
821        }
822        if (this.executor != null) {
823            ThreadPoolUtils.shutdownNow(executor);
824            this.executor = null;
825        }
826
827        this.destinationInterceptors = null;
828        this.destinationFactory = null;
829
830        if (startDate != null) {
831            LOG.info("Apache ActiveMQ {} ({}, {}) uptime {}", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId, getUptime()});
832        }
833        LOG.info("Apache ActiveMQ {} ({}, {}) is shutdown", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId});
834
835        synchronized (shutdownHooks) {
836            for (Runnable hook : shutdownHooks) {
837                try {
838                    hook.run();
839                } catch (Throwable e) {
840                    stopper.onException(hook, e);
841                }
842            }
843        }
844
845        MDC.remove("activemq.broker");
846
847        // and clear start date
848        startDate = null;
849
850        stopper.throwFirstException();
851    }
852
853    public boolean checkQueueSize(String queueName) {
854        long count = 0;
855        long queueSize = 0;
856        Map<ActiveMQDestination, Destination> destinationMap = regionBroker.getDestinationMap();
857        for (Map.Entry<ActiveMQDestination, Destination> entry : destinationMap.entrySet()) {
858            if (entry.getKey().isQueue()) {
859                if (entry.getValue().getName().matches(queueName)) {
860                    queueSize = entry.getValue().getDestinationStatistics().getMessages().getCount();
861                    count += queueSize;
862                    if (queueSize > 0) {
863                        LOG.info("Queue has pending message: {} queueSize is: {}", entry.getValue().getName(), queueSize);
864                    }
865                }
866            }
867        }
868        return count == 0;
869    }
870
871    /**
872     * This method (both connectorName and queueName are using regex to match)
873     * 1. stop the connector (supposed the user input the connector which the
874     * clients connect to) 2. to check whether there is any pending message on
875     * the queues defined by queueName 3. supposedly, after stop the connector,
876     * client should failover to other broker and pending messages should be
877     * forwarded. if no pending messages, the method finally call stop to stop
878     * the broker.
879     *
880     * @param connectorName
881     * @param queueName
882     * @param timeout
883     * @param pollInterval
884     * @throws Exception
885     */
886    public void stopGracefully(String connectorName, String queueName, long timeout, long pollInterval) throws Exception {
887        if (isUseJmx()) {
888            if (connectorName == null || queueName == null || timeout <= 0) {
889                throw new Exception(
890                        "connectorName and queueName cannot be null and timeout should be >0 for stopGracefully.");
891            }
892            if (pollInterval <= 0) {
893                pollInterval = 30;
894            }
895            LOG.info("Stop gracefully with connectorName: {} queueName: {} timeout: {} pollInterval: {}", new Object[]{
896                    connectorName, queueName, timeout, pollInterval
897            });
898            TransportConnector connector;
899            for (int i = 0; i < transportConnectors.size(); i++) {
900                connector = transportConnectors.get(i);
901                if (connector != null && connector.getName() != null && connector.getName().matches(connectorName)) {
902                    connector.stop();
903                }
904            }
905            long start = System.currentTimeMillis();
906            while (System.currentTimeMillis() - start < timeout * 1000) {
907                // check quesize until it gets zero
908                if (checkQueueSize(queueName)) {
909                    stop();
910                    break;
911                } else {
912                    Thread.sleep(pollInterval * 1000);
913                }
914            }
915            if (stopped.get()) {
916                LOG.info("Successfully stop the broker.");
917            } else {
918                LOG.info("There is still pending message on the queue. Please check and stop the broker manually.");
919            }
920        }
921    }
922
923    /**
924     * A helper method to block the caller thread until the broker has been
925     * stopped
926     */
927    public void waitUntilStopped() {
928        while (isStarted() && !stopped.get()) {
929            try {
930                stoppedLatch.await();
931            } catch (InterruptedException e) {
932                // ignore
933            }
934        }
935    }
936
    /**
     * @return the current value of the {@code stopped} flag, which
     *         {@link #stop()} sets to true during its final cleanup
     */
    public boolean isStopped() {
        return stopped.get();
    }
940
941    /**
942     * A helper method to block the caller thread until the broker has fully started
943     * @return boolean true if wait succeeded false if broker was not started or was stopped
944     */
945    public boolean waitUntilStarted() {
946        return waitUntilStarted(DEFAULT_START_TIMEOUT);
947    }
948
949    /**
950     * A helper method to block the caller thread until the broker has fully started
951     *
952     * @param timeout
953     *        the amount of time to wait before giving up and returning false.
954     *
955     * @return boolean true if wait succeeded false if broker was not started or was stopped
956     */
957    public boolean waitUntilStarted(long timeout) {
958        boolean waitSucceeded = isStarted();
959        long expiration = Math.max(0, timeout + System.currentTimeMillis());
960        while (!isStarted() && !stopped.get() && !waitSucceeded && expiration > System.currentTimeMillis()) {
961            try {
962                if (startException != null) {
963                    return waitSucceeded;
964                }
965                waitSucceeded = startedLatch.await(100L, TimeUnit.MILLISECONDS);
966            } catch (InterruptedException ignore) {
967            }
968        }
969        return waitSucceeded;
970    }
971
972    // Properties
973    // -------------------------------------------------------------------------
974    /**
975     * Returns the message broker
976     */
977    public Broker getBroker() throws Exception {
978        if (broker == null) {
979            broker = createBroker();
980        }
981        return broker;
982    }
983
984    /**
985     * Returns the administration view of the broker; used to create and destroy
986     * resources such as queues and topics. Note this method returns null if JMX
987     * is disabled.
988     */
989    public BrokerView getAdminView() throws Exception {
990        if (adminView == null) {
991            // force lazy creation
992            getBroker();
993        }
994        return adminView;
995    }
996
    /**
     * @param adminView
     *            the administration view to be returned by getAdminView()
     */
    public void setAdminView(BrokerView adminView) {
        this.adminView = adminView;
    }
1000
    /**
     * @return the name of this broker
     */
    public String getBrokerName() {
        return brokerName;
    }
1004
1005    /**
1006     * Sets the name of this broker; which must be unique in the network
1007     *
1008     * @param brokerName
1009     */
1010    public void setBrokerName(String brokerName) {
1011        if (brokerName == null) {
1012            throw new NullPointerException("The broker name cannot be null");
1013        }
1014        String str = brokerName.replaceAll("[^a-zA-Z0-9\\.\\_\\-\\:]", "_");
1015        if (!str.equals(brokerName)) {
1016            LOG.error("Broker Name: {} contained illegal characters - replaced with {}", brokerName, str);
1017        }
1018        this.brokerName = str.trim();
1019    }
1020
    /**
     * @return the configured persistence adapter factory; may be null if none
     *         was set
     */
    public PersistenceAdapterFactory getPersistenceFactory() {
        return persistenceFactory;
    }
1024
1025    public File getDataDirectoryFile() {
1026        if (dataDirectoryFile == null) {
1027            dataDirectoryFile = new File(IOHelper.getDefaultDataDirectory());
1028        }
1029        return dataDirectoryFile;
1030    }
1031
    /**
     * @return a new File named after getBrokerName(), located under
     *         getDataDirectoryFile()
     */
    public File getBrokerDataDirectory() {
        String brokerDir = getBrokerName();
        return new File(getDataDirectoryFile(), brokerDir);
    }
1036
1037    /**
1038     * Sets the directory in which the data files will be stored by default for
1039     * the JDBC and Journal persistence adaptors.
1040     *
1041     * @param dataDirectory
1042     *            the directory to store data files
1043     */
1044    public void setDataDirectory(String dataDirectory) {
1045        setDataDirectoryFile(new File(dataDirectory));
1046    }
1047
1048    /**
1049     * Sets the directory in which the data files will be stored by default for
1050     * the JDBC and Journal persistence adaptors.
1051     *
1052     * @param dataDirectoryFile
1053     *            the directory to store data files
1054     */
1055    public void setDataDirectoryFile(File dataDirectoryFile) {
1056        this.dataDirectoryFile = dataDirectoryFile;
1057    }
1058
1059    /**
1060     * @return the tmpDataDirectory
1061     */
1062    public File getTmpDataDirectory() {
1063        if (tmpDataDirectory == null) {
1064            tmpDataDirectory = new File(getBrokerDataDirectory(), "tmp_storage");
1065        }
1066        return tmpDataDirectory;
1067    }
1068
1069    /**
1070     * @param tmpDataDirectory
1071     *            the tmpDataDirectory to set
1072     */
1073    public void setTmpDataDirectory(File tmpDataDirectory) {
1074        this.tmpDataDirectory = tmpDataDirectory;
1075    }
1076
    /**
     * @param persistenceFactory
     *            the persistence adapter factory to store on this service
     */
    public void setPersistenceFactory(PersistenceAdapterFactory persistenceFactory) {
        this.persistenceFactory = persistenceFactory;
    }
1080
    /**
     * @param destinationFactory
     *            the destination factory to store on this service
     */
    public void setDestinationFactory(DestinationFactory destinationFactory) {
        this.destinationFactory = destinationFactory;
    }
1084
    /**
     * @return the value of the {@code persistent} flag
     */
    public boolean isPersistent() {
        return persistent;
    }
1088
1089    /**
1090     * Sets whether or not persistence is enabled or disabled.
1091     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
1092     */
1093    public void setPersistent(boolean persistent) {
1094        this.persistent = persistent;
1095    }
1096
    /**
     * @return the value of the {@code populateJMSXUserID} flag
     */
    public boolean isPopulateJMSXUserID() {
        return populateJMSXUserID;
    }
1100
1101    /**
1102     * Sets whether or not the broker should populate the JMSXUserID header.
1103     */
1104    public void setPopulateJMSXUserID(boolean populateJMSXUserID) {
1105        this.populateJMSXUserID = populateJMSXUserID;
1106    }
1107
1108    public SystemUsage getSystemUsage() {
1109        try {
1110            if (systemUsage == null) {
1111
1112                systemUsage = new SystemUsage("Main", getPersistenceAdapter(), getTempDataStore(), getJobSchedulerStore());
1113                systemUsage.setExecutor(getExecutor());
1114                systemUsage.getMemoryUsage().setLimit(1024L * 1024 * 1024 * 1); // 1 GB
1115                systemUsage.getTempUsage().setLimit(1024L * 1024 * 1024 * 50); // 50 GB
1116                systemUsage.getStoreUsage().setLimit(1024L * 1024 * 1024 * 100); // 100 GB
1117                systemUsage.getJobSchedulerUsage().setLimit(1024L * 1024 * 1024 * 50); // 50 GB
1118                addService(this.systemUsage);
1119            }
1120            return systemUsage;
1121        } catch (IOException e) {
1122            LOG.error("Cannot create SystemUsage", e);
1123            throw new RuntimeException("Fatally failed to create SystemUsage" + e.getMessage(), e);
1124        }
1125    }
1126
1127    public void setSystemUsage(SystemUsage memoryManager) {
1128        if (this.systemUsage != null) {
1129            removeService(this.systemUsage);
1130        }
1131        this.systemUsage = memoryManager;
1132        if (this.systemUsage.getExecutor()==null) {
1133            this.systemUsage.setExecutor(getExecutor());
1134        }
1135        addService(this.systemUsage);
1136    }
1137
1138    /**
1139     * @return the consumerUsageManager
1140     * @throws IOException
1141     */
1142    public SystemUsage getConsumerSystemUsage() throws IOException {
1143        if (this.consumerSystemUsaage == null) {
1144            if (splitSystemUsageForProducersConsumers) {
1145                this.consumerSystemUsaage = new SystemUsage(getSystemUsage(), "Consumer");
1146                float portion = consumerSystemUsagePortion / 100f;
1147                this.consumerSystemUsaage.getMemoryUsage().setUsagePortion(portion);
1148                addService(this.consumerSystemUsaage);
1149            } else {
1150                consumerSystemUsaage = getSystemUsage();
1151            }
1152        }
1153        return this.consumerSystemUsaage;
1154    }
1155
1156    /**
1157     * @param consumerSystemUsaage
1158     *            the storeSystemUsage to set
1159     */
1160    public void setConsumerSystemUsage(SystemUsage consumerSystemUsaage) {
1161        if (this.consumerSystemUsaage != null) {
1162            removeService(this.consumerSystemUsaage);
1163        }
1164        this.consumerSystemUsaage = consumerSystemUsaage;
1165        addService(this.consumerSystemUsaage);
1166    }
1167
1168    /**
1169     * @return the producerUsageManager
1170     * @throws IOException
1171     */
1172    public SystemUsage getProducerSystemUsage() throws IOException {
1173        if (producerSystemUsage == null) {
1174            if (splitSystemUsageForProducersConsumers) {
1175                producerSystemUsage = new SystemUsage(getSystemUsage(), "Producer");
1176                float portion = producerSystemUsagePortion / 100f;
1177                producerSystemUsage.getMemoryUsage().setUsagePortion(portion);
1178                addService(producerSystemUsage);
1179            } else {
1180                producerSystemUsage = getSystemUsage();
1181            }
1182        }
1183        return producerSystemUsage;
1184    }
1185
1186    /**
1187     * @param producerUsageManager
1188     *            the producerUsageManager to set
1189     */
1190    public void setProducerSystemUsage(SystemUsage producerUsageManager) {
1191        if (this.producerSystemUsage != null) {
1192            removeService(this.producerSystemUsage);
1193        }
1194        this.producerSystemUsage = producerUsageManager;
1195        addService(this.producerSystemUsage);
1196    }
1197
1198    public PersistenceAdapter getPersistenceAdapter() throws IOException {
1199        if (persistenceAdapter == null) {
1200            persistenceAdapter = createPersistenceAdapter();
1201            configureService(persistenceAdapter);
1202            this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter);
1203        }
1204        return persistenceAdapter;
1205    }
1206
1207    /**
1208     * Sets the persistence adaptor implementation to use for this broker
1209     *
1210     * @throws IOException
1211     */
1212    public void setPersistenceAdapter(PersistenceAdapter persistenceAdapter) throws IOException {
1213        if (!isPersistent() && ! (persistenceAdapter instanceof MemoryPersistenceAdapter)) {
1214            LOG.warn("persistent=\"false\", ignoring configured persistenceAdapter: {}", persistenceAdapter);
1215            return;
1216        }
1217        this.persistenceAdapter = persistenceAdapter;
1218        configureService(this.persistenceAdapter);
1219        this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter);
1220    }
1221
1222    public TaskRunnerFactory getTaskRunnerFactory() {
1223        if (this.taskRunnerFactory == null) {
1224            this.taskRunnerFactory = new TaskRunnerFactory("ActiveMQ BrokerService["+getBrokerName()+"] Task", getTaskRunnerPriority(), true, 1000,
1225                    isDedicatedTaskRunner());
1226            this.taskRunnerFactory.setThreadClassLoader(this.getClass().getClassLoader());
1227        }
1228        return this.taskRunnerFactory;
1229    }
1230
    /**
     * @param taskRunnerFactory
     *            the task runner factory to use instead of the lazily created
     *            default
     */
    public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
        this.taskRunnerFactory = taskRunnerFactory;
    }
1234
1235    public TaskRunnerFactory getPersistenceTaskRunnerFactory() {
1236        if (taskRunnerFactory == null) {
1237            persistenceTaskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor Task", persistenceThreadPriority,
1238                    true, 1000, isDedicatedTaskRunner());
1239        }
1240        return persistenceTaskRunnerFactory;
1241    }
1242
    /**
     * @param persistenceTaskRunnerFactory
     *            the task runner factory to store for persistence work
     */
    public void setPersistenceTaskRunnerFactory(TaskRunnerFactory persistenceTaskRunnerFactory) {
        this.persistenceTaskRunnerFactory = persistenceTaskRunnerFactory;
    }
1246
    /**
     * @return the value of the {@code useJmx} flag
     */
    public boolean isUseJmx() {
        return useJmx;
    }
1250
    /**
     * @return the value of the {@code enableStatistics} flag
     */
    public boolean isEnableStatistics() {
        return enableStatistics;
    }
1254
1255    /**
1256     * Sets whether or not the Broker's services enable statistics or not.
1257     */
1258    public void setEnableStatistics(boolean enableStatistics) {
1259        this.enableStatistics = enableStatistics;
1260    }
1261
1262    /**
1263     * Sets whether or not the Broker's services should be exposed into JMX or
1264     * not.
1265     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
1266     */
1267    public void setUseJmx(boolean useJmx) {
1268        this.useJmx = useJmx;
1269    }
1270
1271    public ObjectName getBrokerObjectName() throws MalformedObjectNameException {
1272        if (brokerObjectName == null) {
1273            brokerObjectName = createBrokerObjectName();
1274        }
1275        return brokerObjectName;
1276    }
1277
1278    /**
1279     * Sets the JMX ObjectName for this broker
1280     */
1281    public void setBrokerObjectName(ObjectName brokerObjectName) {
1282        this.brokerObjectName = brokerObjectName;
1283    }
1284
1285    public ManagementContext getManagementContext() {
1286        if (managementContext == null) {
1287            managementContext = new ManagementContext();
1288        }
1289        return managementContext;
1290    }
1291
    /**
     * @param managementContext
     *            the management context to use instead of the lazily created
     *            default
     */
    public void setManagementContext(ManagementContext managementContext) {
        this.managementContext = managementContext;
    }
1295
1296    public NetworkConnector getNetworkConnectorByName(String connectorName) {
1297        for (NetworkConnector connector : networkConnectors) {
1298            if (connector.getName().equals(connectorName)) {
1299                return connector;
1300            }
1301        }
1302        return null;
1303    }
1304
    /**
     * @return the stored network connector URI array itself (not a copy)
     */
    public String[] getNetworkConnectorURIs() {
        return networkConnectorURIs;
    }
1308
    /**
     * @param networkConnectorURIs
     *            the network connector URIs; the array is stored as is, not
     *            copied
     */
    public void setNetworkConnectorURIs(String[] networkConnectorURIs) {
        this.networkConnectorURIs = networkConnectorURIs;
    }
1312
1313    public TransportConnector getConnectorByName(String connectorName) {
1314        for (TransportConnector connector : transportConnectors) {
1315            if (connector.getName().equals(connectorName)) {
1316                return connector;
1317            }
1318        }
1319        return null;
1320    }
1321
1322    public Map<String, String> getTransportConnectorURIsAsMap() {
1323        Map<String, String> answer = new HashMap<String, String>();
1324        for (TransportConnector connector : transportConnectors) {
1325            try {
1326                URI uri = connector.getConnectUri();
1327                if (uri != null) {
1328                    String scheme = uri.getScheme();
1329                    if (scheme != null) {
1330                        answer.put(scheme.toLowerCase(Locale.ENGLISH), uri.toString());
1331                    }
1332                }
1333            } catch (Exception e) {
1334                LOG.debug("Failed to read URI to build transportURIsAsMap", e);
1335            }
1336        }
1337        return answer;
1338    }
1339
1340    public ProducerBrokerExchange getProducerBrokerExchange(ProducerInfo producerInfo){
1341        ProducerBrokerExchange result = null;
1342
1343        for (TransportConnector connector : transportConnectors) {
1344            for (TransportConnection tc: connector.getConnections()){
1345                result = tc.getProducerBrokerExchangeIfExists(producerInfo);
1346                if (result !=null){
1347                    return result;
1348                }
1349            }
1350        }
1351        return result;
1352    }
1353
    /**
     * @return the stored transport connector URI array itself (not a copy)
     */
    public String[] getTransportConnectorURIs() {
        return transportConnectorURIs;
    }
1357
    /**
     * @param transportConnectorURIs
     *            the transport connector URIs; the array is stored as is, not
     *            copied
     */
    public void setTransportConnectorURIs(String[] transportConnectorURIs) {
        this.transportConnectorURIs = transportConnectorURIs;
    }
1361
1362    /**
1363     * @return Returns the jmsBridgeConnectors.
1364     */
1365    public JmsConnector[] getJmsBridgeConnectors() {
1366        return jmsBridgeConnectors;
1367    }
1368
1369    /**
1370     * @param jmsConnectors
1371     *            The jmsBridgeConnectors to set.
1372     */
1373    public void setJmsBridgeConnectors(JmsConnector[] jmsConnectors) {
1374        this.jmsBridgeConnectors = jmsConnectors;
1375    }
1376
    /**
     * @return an array holding the services currently associated with this
     *         broker
     */
    public Service[] getServices() {
        return services.toArray(new Service[0]);
    }
1380
1381    /**
1382     * Sets the services associated with this broker.
1383     */
1384    public void setServices(Service[] services) {
1385        this.services.clear();
1386        if (services != null) {
1387            for (int i = 0; i < services.length; i++) {
1388                this.services.add(services[i]);
1389            }
1390        }
1391    }
1392
1393    /**
1394     * Adds a new service so that it will be started as part of the broker
1395     * lifecycle
1396     */
1397    public void addService(Service service) {
1398        services.add(service);
1399    }
1400
    /**
     * Removes a service from the list of services associated with this broker.
     *
     * @param service
     *            the service to remove
     */
    public void removeService(Service service) {
        services.remove(service);
    }
1404
    /**
     * @return the value of the {@code useLoggingForShutdownErrors} flag
     */
    public boolean isUseLoggingForShutdownErrors() {
        return useLoggingForShutdownErrors;
    }
1408
1409    /**
1410     * Sets whether or not we should use commons-logging when reporting errors
1411     * when shutting down the broker
1412     */
1413    public void setUseLoggingForShutdownErrors(boolean useLoggingForShutdownErrors) {
1414        this.useLoggingForShutdownErrors = useLoggingForShutdownErrors;
1415    }
1416
    /**
     * @return the value of the {@code useShutdownHook} flag
     */
    public boolean isUseShutdownHook() {
        return useShutdownHook;
    }
1420
1421    /**
1422     * Sets whether or not we should use a shutdown handler to close down the
1423     * broker cleanly if the JVM is terminated. It is recommended you leave this
1424     * enabled.
1425     */
1426    public void setUseShutdownHook(boolean useShutdownHook) {
1427        this.useShutdownHook = useShutdownHook;
1428    }
1429
    /**
     * @return the value of the {@code advisorySupport} flag
     */
    public boolean isAdvisorySupport() {
        return advisorySupport;
    }
1433
1434    /**
1435     * Allows the support of advisory messages to be disabled for performance
1436     * reasons.
1437     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
1438     */
1439    public void setAdvisorySupport(boolean advisorySupport) {
1440        this.advisorySupport = advisorySupport;
1441    }
1442
    /**
     * @return a new list containing the current transport connectors; changes
     *         to the returned list do not affect the broker
     */
    public List<TransportConnector> getTransportConnectors() {
        return new ArrayList<TransportConnector>(transportConnectors);
    }
1446
1447    /**
1448     * Sets the transport connectors which this broker will listen on for new
1449     * clients
1450     *
1451     * @org.apache.xbean.Property
1452     *                            nestedType="org.apache.activemq.broker.TransportConnector"
1453     */
1454    public void setTransportConnectors(List<TransportConnector> transportConnectors) throws Exception {
1455        for (TransportConnector connector : transportConnectors) {
1456            addConnector(connector);
1457        }
1458    }
1459
1460    public TransportConnector getTransportConnectorByName(String name){
1461        for (TransportConnector transportConnector : transportConnectors){
1462           if (name.equals(transportConnector.getName())){
1463               return transportConnector;
1464           }
1465        }
1466        return null;
1467    }
1468
1469    public TransportConnector getTransportConnectorByScheme(String scheme){
1470        for (TransportConnector transportConnector : transportConnectors){
1471            if (scheme.equals(transportConnector.getUri().getScheme())){
1472                return transportConnector;
1473            }
1474        }
1475        return null;
1476    }
1477
    /**
     * @return a new list containing the current network connectors; changes to
     *         the returned list do not affect the broker
     */
    public List<NetworkConnector> getNetworkConnectors() {
        return new ArrayList<NetworkConnector>(networkConnectors);
    }
1481
    /**
     * @return a new list containing the current proxy connectors; changes to
     *         the returned list do not affect the broker
     */
    public List<ProxyConnector> getProxyConnectors() {
        return new ArrayList<ProxyConnector>(proxyConnectors);
    }
1485
1486    /**
1487     * Sets the network connectors which this broker will use to connect to
1488     * other brokers in a federated network
1489     *
1490     * @org.apache.xbean.Property
1491     *                            nestedType="org.apache.activemq.network.NetworkConnector"
1492     */
1493    public void setNetworkConnectors(List<?> networkConnectors) throws Exception {
1494        for (Object connector : networkConnectors) {
1495            addNetworkConnector((NetworkConnector) connector);
1496        }
1497    }
1498
1499    /**
1500     * Sets the network connectors which this broker will use to connect to
1501     * other brokers in a federated network
1502     */
1503    public void setProxyConnectors(List<?> proxyConnectors) throws Exception {
1504        for (Object connector : proxyConnectors) {
1505            addProxyConnector((ProxyConnector) connector);
1506        }
1507    }
1508
    /**
     * @return the destination policy map; may be null if none was set
     */
    public PolicyMap getDestinationPolicy() {
        return destinationPolicy;
    }
1512
1513    /**
1514     * Sets the destination specific policies available either for exact
1515     * destinations or for wildcard areas of destinations.
1516     */
1517    public void setDestinationPolicy(PolicyMap policyMap) {
1518        this.destinationPolicy = policyMap;
1519    }
1520
    /**
     * @return the stored plugin array itself (not a copy)
     */
    public BrokerPlugin[] getPlugins() {
        return plugins;
    }
1524
1525    /**
1526     * Sets a number of broker plugins to install such as for security
1527     * authentication or authorization
1528     */
1529    public void setPlugins(BrokerPlugin[] plugins) {
1530        this.plugins = plugins;
1531    }
1532
    /**
     * @return the message authorization policy; may be null if none was set
     */
    public MessageAuthorizationPolicy getMessageAuthorizationPolicy() {
        return messageAuthorizationPolicy;
    }
1536
1537    /**
1538     * Sets the policy used to decide if the current connection is authorized to
1539     * consume a given message
1540     */
1541    public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) {
1542        this.messageAuthorizationPolicy = messageAuthorizationPolicy;
1543    }
1544
1545    /**
1546     * Delete all messages from the persistent store
1547     *
1548     * @throws IOException
1549     */
1550    public void deleteAllMessages() throws IOException {
1551        getPersistenceAdapter().deleteAllMessages();
1552    }
1553
    /**
     * @return the value of the {@code deleteAllMessagesOnStartup} flag
     */
    public boolean isDeleteAllMessagesOnStartup() {
        return deleteAllMessagesOnStartup;
    }
1557
1558    /**
1559     * Sets whether or not all messages are deleted on startup - mostly only
1560     * useful for testing.
1561     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
1562     */
1563    public void setDeleteAllMessagesOnStartup(boolean deletePersistentMessagesOnStartup) {
1564        this.deleteAllMessagesOnStartup = deletePersistentMessagesOnStartup;
1565    }
1566
1567    public URI getVmConnectorURI() {
1568        if (vmConnectorURI == null) {
1569            try {
1570                vmConnectorURI = new URI("vm://" + getBrokerName().replaceAll("[^a-zA-Z0-9\\.\\_\\-]", "_"));
1571            } catch (URISyntaxException e) {
1572                LOG.error("Badly formed URI from {}", getBrokerName(), e);
1573            }
1574        }
1575        return vmConnectorURI;
1576    }
1577
    /**
     * @param vmConnectorURI
     *            the VM connector URI to use; null lets getVmConnectorURI()
     *            derive one from the broker name
     */
    public void setVmConnectorURI(URI vmConnectorURI) {
        this.vmConnectorURI = vmConnectorURI;
    }
1581
1582    public String getDefaultSocketURIString() {
1583        if (started.get()) {
1584            if (this.defaultSocketURIString == null) {
1585                for (TransportConnector tc:this.transportConnectors) {
1586                    String result = null;
1587                    try {
1588                        result = tc.getPublishableConnectString();
1589                    } catch (Exception e) {
1590                      LOG.warn("Failed to get the ConnectURI for {}", tc, e);
1591                    }
1592                    if (result != null) {
1593                        // find first publishable uri
1594                        if (tc.isUpdateClusterClients() || tc.isRebalanceClusterClients()) {
1595                            this.defaultSocketURIString = result;
1596                            break;
1597                        } else {
1598                        // or use the first defined
1599                            if (this.defaultSocketURIString == null) {
1600                                this.defaultSocketURIString = result;
1601                            }
1602                        }
1603                    }
1604                }
1605
1606            }
1607            return this.defaultSocketURIString;
1608        }
1609       return null;
1610    }
1611
1612    /**
1613     * @return Returns the shutdownOnMasterFailure.
1614     */
1615    public boolean isShutdownOnMasterFailure() {
1616        return shutdownOnMasterFailure;
1617    }
1618
1619    /**
1620     * @param shutdownOnMasterFailure
1621     *            The shutdownOnMasterFailure to set.
1622     */
1623    public void setShutdownOnMasterFailure(boolean shutdownOnMasterFailure) {
1624        this.shutdownOnMasterFailure = shutdownOnMasterFailure;
1625    }
1626
    /**
     * @return the current value of the {@code keepDurableSubsActive} flag
     */
    public boolean isKeepDurableSubsActive() {
        return keepDurableSubsActive;
    }
1630
    /**
     * @param keepDurableSubsActive
     *            the keepDurableSubsActive value to set
     */
    public void setKeepDurableSubsActive(boolean keepDurableSubsActive) {
        this.keepDurableSubsActive = keepDurableSubsActive;
    }
1634
    /**
     * @return the current value of the {@code useVirtualTopics} flag
     */
    public boolean isUseVirtualTopics() {
        return useVirtualTopics;
    }
1638
1639    /**
1640     * Sets whether or not <a
1641     * href="http://activemq.apache.org/virtual-destinations.html">Virtual
1642     * Topics</a> should be supported by default if they have not been
1643     * explicitly configured.
1644     */
1645    public void setUseVirtualTopics(boolean useVirtualTopics) {
1646        this.useVirtualTopics = useVirtualTopics;
1647    }
1648
    /**
     * Returns the destination interceptor array held by this service. The
     * stored array itself is returned, not a copy.
     *
     * @return the destinationInterceptors field as currently set
     */
    public DestinationInterceptor[] getDestinationInterceptors() {
        return destinationInterceptors;
    }
1652
    /**
     * @return the current value of the {@code useMirroredQueues} flag
     */
    public boolean isUseMirroredQueues() {
        return useMirroredQueues;
    }
1656
1657    /**
1658     * Sets whether or not <a
1659     * href="http://activemq.apache.org/mirrored-queues.html">Mirrored
1660     * Queues</a> should be supported by default if they have not been
1661     * explicitly configured.
1662     */
1663    public void setUseMirroredQueues(boolean useMirroredQueues) {
1664        this.useMirroredQueues = useMirroredQueues;
1665    }
1666
1667    /**
1668     * Sets the destination interceptors to use
1669     */
1670    public void setDestinationInterceptors(DestinationInterceptor[] destinationInterceptors) {
1671        this.destinationInterceptors = destinationInterceptors;
1672    }
1673
    /**
     * Returns the destinations array held by this service. The stored array
     * itself is returned, not a copy.
     *
     * @return the destinations field as currently set
     */
    public ActiveMQDestination[] getDestinations() {
        return destinations;
    }
1677
1678    /**
1679     * Sets the destinations which should be loaded/created on startup
1680     */
1681    public void setDestinations(ActiveMQDestination[] destinations) {
1682        this.destinations = destinations;
1683    }
1684
1685    /**
1686     * @return the tempDataStore
1687     */
1688    public synchronized PListStore getTempDataStore() {
1689        if (tempDataStore == null) {
1690            if (!isPersistent()) {
1691                return null;
1692            }
1693
1694            try {
1695                PersistenceAdapter pa = getPersistenceAdapter();
1696                if( pa!=null && pa instanceof PListStore) {
1697                    return (PListStore) pa;
1698                }
1699            } catch (IOException e) {
1700                throw new RuntimeException(e);
1701            }
1702
1703            boolean result = true;
1704            boolean empty = true;
1705            try {
1706                File directory = getTmpDataDirectory();
1707                if (directory.exists() && directory.isDirectory()) {
1708                    File[] files = directory.listFiles();
1709                    if (files != null && files.length > 0) {
1710                        empty = false;
1711                        for (int i = 0; i < files.length; i++) {
1712                            File file = files[i];
1713                            if (!file.isDirectory()) {
1714                                result &= file.delete();
1715                            }
1716                        }
1717                    }
1718                }
1719                if (!empty) {
1720                    String str = result ? "Successfully deleted" : "Failed to delete";
1721                    LOG.info("{} temporary storage", str);
1722                }
1723
1724                String clazz = "org.apache.activemq.store.kahadb.plist.PListStoreImpl";
1725                this.tempDataStore = (PListStore) getClass().getClassLoader().loadClass(clazz).newInstance();
1726                this.tempDataStore.setDirectory(getTmpDataDirectory());
1727                configureService(tempDataStore);
1728                this.tempDataStore.start();
1729            } catch (Exception e) {
1730                throw new RuntimeException(e);
1731            }
1732        }
1733        return tempDataStore;
1734    }
1735
1736    /**
1737     * @param tempDataStore
1738     *            the tempDataStore to set
1739     */
1740    public void setTempDataStore(PListStore tempDataStore) {
1741        this.tempDataStore = tempDataStore;
1742        configureService(tempDataStore);
1743        try {
1744            tempDataStore.start();
1745        } catch (Exception e) {
1746            RuntimeException exception = new RuntimeException("Failed to start provided temp data store: " + tempDataStore, e);
1747            LOG.error(exception.getLocalizedMessage(), e);
1748            throw exception;
1749        }
1750    }
1751
    /**
     * @return the current value of the {@code persistenceThreadPriority} field
     */
    public int getPersistenceThreadPriority() {
        return persistenceThreadPriority;
    }
1755
    /**
     * @param persistenceThreadPriority
     *            the persistenceThreadPriority value to set
     */
    public void setPersistenceThreadPriority(int persistenceThreadPriority) {
        this.persistenceThreadPriority = persistenceThreadPriority;
    }
1759
1760    /**
1761     * @return the useLocalHostBrokerName
1762     */
1763    public boolean isUseLocalHostBrokerName() {
1764        return this.useLocalHostBrokerName;
1765    }
1766
1767    /**
1768     * @param useLocalHostBrokerName
1769     *            the useLocalHostBrokerName to set
1770     */
1771    public void setUseLocalHostBrokerName(boolean useLocalHostBrokerName) {
1772        this.useLocalHostBrokerName = useLocalHostBrokerName;
1773        if (useLocalHostBrokerName && !started.get() && brokerName == null || brokerName == DEFAULT_BROKER_NAME) {
1774            brokerName = LOCAL_HOST_NAME;
1775        }
1776    }
1777
1778    /**
1779     * Looks up and lazily creates if necessary the destination for the given
1780     * JMS name
1781     */
1782    public Destination getDestination(ActiveMQDestination destination) throws Exception {
1783        return getBroker().addDestination(getAdminConnectionContext(), destination,false);
1784    }
1785
    /**
     * Removes the given destination by delegating to the broker's
     * {@code removeDestination} with the admin connection context.
     *
     * @param destination
     *            the destination to remove
     * @throws Exception
     *             if the broker or admin context cannot be obtained, or the
     *             broker call fails
     */
    public void removeDestination(ActiveMQDestination destination) throws Exception {
        // NOTE(review): the trailing 0 is presumably a timeout - confirm
        // against Broker.removeDestination.
        getBroker().removeDestination(getAdminConnectionContext(), destination, 0);
    }
1789
    /**
     * @return the current value of the {@code producerSystemUsagePortion} field
     */
    public int getProducerSystemUsagePortion() {
        return producerSystemUsagePortion;
    }
1793
    /**
     * @param producerSystemUsagePortion
     *            the producerSystemUsagePortion value to set
     */
    public void setProducerSystemUsagePortion(int producerSystemUsagePortion) {
        this.producerSystemUsagePortion = producerSystemUsagePortion;
    }
1797
    /**
     * @return the current value of the {@code consumerSystemUsagePortion} field
     */
    public int getConsumerSystemUsagePortion() {
        return consumerSystemUsagePortion;
    }
1801
    /**
     * @param consumerSystemUsagePortion
     *            the consumerSystemUsagePortion value to set
     */
    public void setConsumerSystemUsagePortion(int consumerSystemUsagePortion) {
        this.consumerSystemUsagePortion = consumerSystemUsagePortion;
    }
1805
    /**
     * @return the current value of the
     *         {@code splitSystemUsageForProducersConsumers} flag
     */
    public boolean isSplitSystemUsageForProducersConsumers() {
        return splitSystemUsageForProducersConsumers;
    }
1809
    /**
     * @param splitSystemUsageForProducersConsumers
     *            the splitSystemUsageForProducersConsumers value to set
     */
    public void setSplitSystemUsageForProducersConsumers(boolean splitSystemUsageForProducersConsumers) {
        this.splitSystemUsageForProducersConsumers = splitSystemUsageForProducersConsumers;
    }
1813
    /**
     * @return the current value of the {@code monitorConnectionSplits} flag
     */
    public boolean isMonitorConnectionSplits() {
        return monitorConnectionSplits;
    }
1817
    /**
     * @param monitorConnectionSplits
     *            the monitorConnectionSplits value to set
     */
    public void setMonitorConnectionSplits(boolean monitorConnectionSplits) {
        this.monitorConnectionSplits = monitorConnectionSplits;
    }
1821
    /**
     * @return the current value of the {@code taskRunnerPriority} field
     */
    public int getTaskRunnerPriority() {
        return taskRunnerPriority;
    }
1825
    /**
     * @param taskRunnerPriority
     *            the taskRunnerPriority value to set
     */
    public void setTaskRunnerPriority(int taskRunnerPriority) {
        this.taskRunnerPriority = taskRunnerPriority;
    }
1829
    /**
     * @return the current value of the {@code dedicatedTaskRunner} flag
     */
    public boolean isDedicatedTaskRunner() {
        return dedicatedTaskRunner;
    }
1833
    /**
     * @param dedicatedTaskRunner
     *            the dedicatedTaskRunner value to set
     */
    public void setDedicatedTaskRunner(boolean dedicatedTaskRunner) {
        this.dedicatedTaskRunner = dedicatedTaskRunner;
    }
1837
    /**
     * @return the current value of the {@code cacheTempDestinations} flag
     */
    public boolean isCacheTempDestinations() {
        return cacheTempDestinations;
    }
1841
    /**
     * @param cacheTempDestinations
     *            the cacheTempDestinations value to set
     */
    public void setCacheTempDestinations(boolean cacheTempDestinations) {
        this.cacheTempDestinations = cacheTempDestinations;
    }
1845
    /**
     * @return the current value of the {@code timeBeforePurgeTempDestinations}
     *         field
     */
    public int getTimeBeforePurgeTempDestinations() {
        return timeBeforePurgeTempDestinations;
    }
1849
    /**
     * @param timeBeforePurgeTempDestinations
     *            the timeBeforePurgeTempDestinations value to set
     */
    public void setTimeBeforePurgeTempDestinations(int timeBeforePurgeTempDestinations) {
        this.timeBeforePurgeTempDestinations = timeBeforePurgeTempDestinations;
    }
1853
    /**
     * @return the current value of the {@code useTempMirroredQueues} flag
     */
    public boolean isUseTempMirroredQueues() {
        return useTempMirroredQueues;
    }
1857
    /**
     * @param useTempMirroredQueues
     *            the useTempMirroredQueues value to set
     */
    public void setUseTempMirroredQueues(boolean useTempMirroredQueues) {
        this.useTempMirroredQueues = useTempMirroredQueues;
    }
1861
1862    public synchronized JobSchedulerStore getJobSchedulerStore() {
1863
1864        // If support is off don't allow any scheduler even is user configured their own.
1865        if (!isSchedulerSupport()) {
1866            return null;
1867        }
1868
1869        // If the user configured their own we use it even if persistence is disabled since
1870        // we don't know anything about their implementation.
1871        if (jobSchedulerStore == null) {
1872
1873            if (!isPersistent()) {
1874                this.jobSchedulerStore = new InMemoryJobSchedulerStore();
1875                configureService(jobSchedulerStore);
1876                return this.jobSchedulerStore;
1877            }
1878
1879            try {
1880                PersistenceAdapter pa = getPersistenceAdapter();
1881                if (pa != null) {
1882                    this.jobSchedulerStore = pa.createJobSchedulerStore();
1883                    jobSchedulerStore.setDirectory(getSchedulerDirectoryFile());
1884                    configureService(jobSchedulerStore);
1885                    return this.jobSchedulerStore;
1886                }
1887            } catch (IOException e) {
1888                throw new RuntimeException(e);
1889            } catch (UnsupportedOperationException ex) {
1890                // It's ok if the store doesn't implement a scheduler.
1891            } catch (Exception e) {
1892                throw new RuntimeException(e);
1893            }
1894
1895            try {
1896                PersistenceAdapter pa = getPersistenceAdapter();
1897                if (pa != null && pa instanceof JobSchedulerStore) {
1898                    this.jobSchedulerStore = (JobSchedulerStore) pa;
1899                    configureService(jobSchedulerStore);
1900                    return this.jobSchedulerStore;
1901                }
1902            } catch (IOException e) {
1903                throw new RuntimeException(e);
1904            }
1905
1906            // Load the KahaDB store as a last resort, this only works if KahaDB is
1907            // included at runtime, otherwise this will fail.  User should disable
1908            // scheduler support if this fails.
1909            try {
1910                String clazz = "org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter";
1911                PersistenceAdapter adaptor = (PersistenceAdapter)getClass().getClassLoader().loadClass(clazz).newInstance();
1912                jobSchedulerStore = adaptor.createJobSchedulerStore();
1913                jobSchedulerStore.setDirectory(getSchedulerDirectoryFile());
1914                configureService(jobSchedulerStore);
1915                LOG.info("JobScheduler using directory: {}", getSchedulerDirectoryFile());
1916            } catch (Exception e) {
1917                throw new RuntimeException(e);
1918            }
1919        }
1920        return jobSchedulerStore;
1921    }
1922
    /**
     * Stores the given job scheduler store and passes it to
     * {@code configureService}.
     *
     * @param jobSchedulerStore
     *            the job scheduler store to use
     */
    public void setJobSchedulerStore(JobSchedulerStore jobSchedulerStore) {
        this.jobSchedulerStore = jobSchedulerStore;
        configureService(jobSchedulerStore);
    }
1927
1928    //
1929    // Implementation methods
1930    // -------------------------------------------------------------------------
1931    /**
1932     * Handles any lazy-creation helper properties which are added to make
1933     * things easier to configure inside environments such as Spring
1934     *
1935     * @throws Exception
1936     */
1937    protected void processHelperProperties() throws Exception {
1938        if (transportConnectorURIs != null) {
1939            for (int i = 0; i < transportConnectorURIs.length; i++) {
1940                String uri = transportConnectorURIs[i];
1941                addConnector(uri);
1942            }
1943        }
1944        if (networkConnectorURIs != null) {
1945            for (int i = 0; i < networkConnectorURIs.length; i++) {
1946                String uri = networkConnectorURIs[i];
1947                addNetworkConnector(uri);
1948            }
1949        }
1950        if (jmsBridgeConnectors != null) {
1951            for (int i = 0; i < jmsBridgeConnectors.length; i++) {
1952                addJmsConnector(jmsBridgeConnectors[i]);
1953            }
1954        }
1955    }
1956
1957    protected void checkSystemUsageLimits() throws IOException {
1958        SystemUsage usage = getSystemUsage();
1959        long memLimit = usage.getMemoryUsage().getLimit();
1960        long jvmLimit = Runtime.getRuntime().maxMemory();
1961
1962        if (memLimit > jvmLimit) {
1963            usage.getMemoryUsage().setPercentOfJvmHeap(70);
1964            LOG.warn("Memory Usage for the Broker (" + memLimit / (1024 * 1024) +
1965                    " mb) is more than the maximum available for the JVM: " +
1966                    jvmLimit / (1024 * 1024) + " mb - resetting to 70% of maximum available: " + (usage.getMemoryUsage().getLimit() / (1024 * 1024)) + " mb");
1967        }
1968
1969        if (getPersistenceAdapter() != null) {
1970            PersistenceAdapter adapter = getPersistenceAdapter();
1971            File dir = adapter.getDirectory();
1972
1973            if (dir != null) {
1974                String dirPath = dir.getAbsolutePath();
1975                if (!dir.isAbsolute()) {
1976                    dir = new File(dirPath);
1977                }
1978
1979                while (dir != null && !dir.isDirectory()) {
1980                    dir = dir.getParentFile();
1981                }
1982                long storeLimit = usage.getStoreUsage().getLimit();
1983                long storeCurrent = usage.getStoreUsage().getUsage();
1984                long dirFreeSpace = dir.getUsableSpace();
1985                if (storeLimit > (dirFreeSpace + storeCurrent)) {
1986                    LOG.warn("Store limit is " + storeLimit / (1024 * 1024) +
1987                             " mb (current store usage is " + storeCurrent / (1024 * 1024) +
1988                             " mb). The data directory: " + dir.getAbsolutePath() +
1989                             " only has " + dirFreeSpace / (1024 * 1024) +
1990                             " mb of usable space - resetting to maximum available disk space: " +
1991                            (dirFreeSpace + storeCurrent) / (1024 * 1024) + " mb");
1992                    usage.getStoreUsage().setLimit(dirFreeSpace + storeCurrent);
1993                }
1994            }
1995
1996            long maxJournalFileSize = 0;
1997            long storeLimit = usage.getStoreUsage().getLimit();
1998
1999            if (adapter instanceof JournaledStore) {
2000                maxJournalFileSize = ((JournaledStore) adapter).getJournalMaxFileLength();
2001            }
2002
2003            if (storeLimit < maxJournalFileSize) {
2004                LOG.error("Store limit is " + storeLimit / (1024 * 1024) +
2005                          " mb, whilst the max journal file size for the store is: " +
2006                          maxJournalFileSize / (1024 * 1024) + " mb, " +
2007                          "the store will not accept any data when used.");
2008
2009            }
2010        }
2011
2012        File tmpDir = getTmpDataDirectory();
2013        if (tmpDir != null) {
2014
2015            String tmpDirPath = tmpDir.getAbsolutePath();
2016            if (!tmpDir.isAbsolute()) {
2017                tmpDir = new File(tmpDirPath);
2018            }
2019
2020            long storeLimit = usage.getTempUsage().getLimit();
2021            while (tmpDir != null && !tmpDir.isDirectory()) {
2022                tmpDir = tmpDir.getParentFile();
2023            }
2024            long dirFreeSpace = tmpDir.getUsableSpace();
2025            if (storeLimit > dirFreeSpace) {
2026                LOG.warn("Temporary Store limit is " + storeLimit / (1024 * 1024) +
2027                        " mb, whilst the temporary data directory: " + tmpDirPath +
2028                        " only has " + dirFreeSpace / (1024 * 1024) + " mb of usable space - resetting to maximum available " +
2029                        dirFreeSpace / (1024 * 1024) + " mb.");
2030                usage.getTempUsage().setLimit(dirFreeSpace);
2031            }
2032
2033            if (isPersistent()) {
2034                long maxJournalFileSize;
2035
2036                PListStore store = usage.getTempUsage().getStore();
2037                if (store != null && store instanceof JournaledStore) {
2038                    maxJournalFileSize = ((JournaledStore) store).getJournalMaxFileLength();
2039                } else {
2040                    maxJournalFileSize = DEFAULT_MAX_FILE_LENGTH;
2041                }
2042
2043                if (storeLimit < maxJournalFileSize) {
2044                    LOG.error("Temporary Store limit is " + storeLimit / (1024 * 1024) +
2045                              " mb, whilst the max journal file size for the temporary store is: " +
2046                              maxJournalFileSize / (1024 * 1024) + " mb, " +
2047                              "the temp store will not accept any data when used.");
2048                }
2049            }
2050        }
2051
2052        if (getJobSchedulerStore() != null) {
2053            JobSchedulerStore scheduler = getJobSchedulerStore();
2054            File schedulerDir = scheduler.getDirectory();
2055            if (schedulerDir != null) {
2056
2057                String schedulerDirPath = schedulerDir.getAbsolutePath();
2058                if (!schedulerDir.isAbsolute()) {
2059                    schedulerDir = new File(schedulerDirPath);
2060                }
2061
2062                while (schedulerDir != null && !schedulerDir.isDirectory()) {
2063                    schedulerDir = schedulerDir.getParentFile();
2064                }
2065                long schedulerLimit = usage.getJobSchedulerUsage().getLimit();
2066                long dirFreeSpace = schedulerDir.getUsableSpace();
2067                if (schedulerLimit > dirFreeSpace) {
2068                    LOG.warn("Job Scheduler Store limit is " + schedulerLimit / (1024 * 1024) +
2069                             " mb, whilst the data directory: " + schedulerDir.getAbsolutePath() +
2070                             " only has " + dirFreeSpace / (1024 * 1024) + " mb of usable space - resetting to " +
2071                            dirFreeSpace / (1024 * 1024) + " mb.");
2072                    usage.getJobSchedulerUsage().setLimit(dirFreeSpace);
2073                }
2074            }
2075        }
2076    }
2077
2078    public void stopAllConnectors(ServiceStopper stopper) {
2079        for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) {
2080            NetworkConnector connector = iter.next();
2081            unregisterNetworkConnectorMBean(connector);
2082            stopper.stop(connector);
2083        }
2084        for (Iterator<ProxyConnector> iter = getProxyConnectors().iterator(); iter.hasNext();) {
2085            ProxyConnector connector = iter.next();
2086            stopper.stop(connector);
2087        }
2088        for (Iterator<JmsConnector> iter = jmsConnectors.iterator(); iter.hasNext();) {
2089            JmsConnector connector = iter.next();
2090            stopper.stop(connector);
2091        }
2092        for (Iterator<TransportConnector> iter = getTransportConnectors().iterator(); iter.hasNext();) {
2093            TransportConnector connector = iter.next();
2094            try {
2095                unregisterConnectorMBean(connector);
2096            } catch (IOException e) {
2097            }
2098            stopper.stop(connector);
2099        }
2100    }
2101
2102    protected TransportConnector registerConnectorMBean(TransportConnector connector) throws IOException {
2103        try {
2104            ObjectName objectName = createConnectorObjectName(connector);
2105            connector = connector.asManagedConnector(getManagementContext(), objectName);
2106            ConnectorViewMBean view = new ConnectorView(connector);
2107            AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
2108            return connector;
2109        } catch (Throwable e) {
2110            throw IOExceptionSupport.create("Transport Connector could not be registered in JMX: " + e, e);
2111        }
2112    }
2113
2114    protected void unregisterConnectorMBean(TransportConnector connector) throws IOException {
2115        if (isUseJmx()) {
2116            try {
2117                ObjectName objectName = createConnectorObjectName(connector);
2118                getManagementContext().unregisterMBean(objectName);
2119            } catch (Throwable e) {
2120                throw IOExceptionSupport.create(
2121                        "Transport Connector could not be unregistered in JMX: " + e.getMessage(), e);
2122            }
2123        }
2124    }
2125
    /**
     * Hook for exposing a persistence adapter through JMX. This
     * implementation registers nothing and returns the adapter unchanged.
     *
     * @param adaptor
     *            the persistence adapter
     * @return the same adapter instance that was passed in
     * @throws IOException
     *             declared in the signature; never thrown by this body
     */
    protected PersistenceAdapter registerPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException {
        return adaptor;
    }
2129
    /**
     * Hook for removing a persistence adapter's MBean. This implementation
     * only evaluates {@code isUseJmx()} and performs no unregistration.
     *
     * @param adaptor
     *            the persistence adapter (unused by this body)
     * @throws IOException
     *             declared in the signature; never thrown by this body
     */
    protected void unregisterPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException {
        // Intentionally empty branch - NOTE(review): confirm whether
        // unregistration logic is meant to live here.
        if (isUseJmx()) {}
    }
2133
    /**
     * Builds the JMX object name for a transport connector from the broker
     * object name, the {@code "clientConnectors"} type and the connector's
     * name.
     *
     * @param connector
     *            the transport connector to name
     * @return the object name produced by {@code BrokerMBeanSupport}
     * @throws MalformedObjectNameException
     *             if a valid object name cannot be built
     */
    private ObjectName createConnectorObjectName(TransportConnector connector) throws MalformedObjectNameException {
        return BrokerMBeanSupport.createConnectorName(getBrokerObjectName(), "clientConnectors", connector.getName());
    }
2137
2138    public void registerNetworkConnectorMBean(NetworkConnector connector) throws IOException {
2139        NetworkConnectorViewMBean view = new NetworkConnectorView(connector);
2140        try {
2141            ObjectName objectName = createNetworkConnectorObjectName(connector);
2142            connector.setObjectName(objectName);
2143            AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
2144        } catch (Throwable e) {
2145            throw IOExceptionSupport.create("Network Connector could not be registered in JMX: " + e.getMessage(), e);
2146        }
2147    }
2148
    /**
     * Builds the JMX object name for a network connector from the broker
     * object name, the {@code "networkConnectors"} type and the connector's
     * name.
     *
     * @param connector
     *            the network connector to name
     * @return the object name produced by {@code BrokerMBeanSupport}
     * @throws MalformedObjectNameException
     *             if a valid object name cannot be built
     */
    protected ObjectName createNetworkConnectorObjectName(NetworkConnector connector) throws MalformedObjectNameException {
        return BrokerMBeanSupport.createNetworkConnectorName(getBrokerObjectName(), "networkConnectors", connector.getName());
    }
2152
    /**
     * Builds the JMX object name for a duplex network connector from the
     * broker object name, the {@code "duplexNetworkConnectors"} type and the
     * given transport string.
     *
     * @param transport
     *            the value used as the connector name part of the object name
     * @return the object name produced by {@code BrokerMBeanSupport}
     * @throws MalformedObjectNameException
     *             if a valid object name cannot be built
     */
    public ObjectName createDuplexNetworkConnectorObjectName(String transport) throws MalformedObjectNameException {
        return BrokerMBeanSupport.createNetworkConnectorName(getBrokerObjectName(), "duplexNetworkConnectors", transport);
    }
2156
2157    protected void unregisterNetworkConnectorMBean(NetworkConnector connector) {
2158        if (isUseJmx()) {
2159            try {
2160                ObjectName objectName = createNetworkConnectorObjectName(connector);
2161                getManagementContext().unregisterMBean(objectName);
2162            } catch (Exception e) {
2163                LOG.warn("Network Connector could not be unregistered from JMX due " + e.getMessage() + ". This exception is ignored.", e);
2164            }
2165        }
2166    }
2167
2168    protected void registerProxyConnectorMBean(ProxyConnector connector) throws IOException {
2169        ProxyConnectorView view = new ProxyConnectorView(connector);
2170        try {
2171            ObjectName objectName = BrokerMBeanSupport.createNetworkConnectorName(getBrokerObjectName(), "proxyConnectors", connector.getName());
2172            AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
2173        } catch (Throwable e) {
2174            throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e);
2175        }
2176    }
2177
2178    protected void registerJmsConnectorMBean(JmsConnector connector) throws IOException {
2179        JmsConnectorView view = new JmsConnectorView(connector);
2180        try {
2181            ObjectName objectName = BrokerMBeanSupport.createNetworkConnectorName(getBrokerObjectName(), "jmsConnectors", connector.getName());
2182            AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
2183        } catch (Throwable e) {
2184            throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e);
2185        }
2186    }
2187
2188    /**
2189     * Factory method to create a new broker
2190     *
2191     * @throws Exception
2192     * @throws
2193     * @throws
2194     */
2195    protected Broker createBroker() throws Exception {
2196        regionBroker = createRegionBroker();
2197        Broker broker = addInterceptors(regionBroker);
2198        // Add a filter that will stop access to the broker once stopped
2199        broker = new MutableBrokerFilter(broker) {
2200            Broker old;
2201
2202            @Override
2203            public void stop() throws Exception {
2204                old = this.next.getAndSet(new ErrorBroker("Broker has been stopped: " + this) {
2205                    // Just ignore additional stop actions.
2206                    @Override
2207                    public void stop() throws Exception {
2208                    }
2209                });
2210                old.stop();
2211            }
2212
2213            @Override
2214            public void start() throws Exception {
2215                if (forceStart && old != null) {
2216                    this.next.set(old);
2217                }
2218                getNext().start();
2219            }
2220        };
2221        return broker;
2222    }
2223
2224    /**
2225     * Factory method to create the core region broker onto which interceptors
2226     * are added
2227     *
2228     * @throws Exception
2229     */
2230    protected Broker createRegionBroker() throws Exception {
2231        if (destinationInterceptors == null) {
2232            destinationInterceptors = createDefaultDestinationInterceptor();
2233        }
2234        configureServices(destinationInterceptors);
2235        DestinationInterceptor destinationInterceptor = new CompositeDestinationInterceptor(destinationInterceptors);
2236        if (destinationFactory == null) {
2237            destinationFactory = new DestinationFactoryImpl(this, getTaskRunnerFactory(), getPersistenceAdapter());
2238        }
2239        return createRegionBroker(destinationInterceptor);
2240    }
2241
2242    protected Broker createRegionBroker(DestinationInterceptor destinationInterceptor) throws IOException {
2243        RegionBroker regionBroker;
2244        if (isUseJmx()) {
2245            try {
2246                regionBroker = new ManagedRegionBroker(this, getManagementContext(), getBrokerObjectName(),
2247                    getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory, destinationInterceptor,getScheduler(),getExecutor());
2248            } catch(MalformedObjectNameException me){
2249                LOG.warn("Cannot create ManagedRegionBroker due " + me.getMessage(), me);
2250                throw new IOException(me);
2251            }
2252        } else {
2253            regionBroker = new RegionBroker(this, getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory,
2254                    destinationInterceptor,getScheduler(),getExecutor());
2255        }
2256        destinationFactory.setRegionBroker(regionBroker);
2257        regionBroker.setKeepDurableSubsActive(keepDurableSubsActive);
2258        regionBroker.setBrokerName(getBrokerName());
2259        regionBroker.getDestinationStatistics().setEnabled(enableStatistics);
2260        regionBroker.setAllowTempAutoCreationOnSend(isAllowTempAutoCreationOnSend());
2261        if (brokerId != null) {
2262            regionBroker.setBrokerId(brokerId);
2263        }
2264        return regionBroker;
2265    }
2266
2267    /**
2268     * Create the default destination interceptor
2269     */
2270    protected DestinationInterceptor[] createDefaultDestinationInterceptor() {
2271        List<DestinationInterceptor> answer = new ArrayList<DestinationInterceptor>();
2272        if (isUseVirtualTopics()) {
2273            VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor();
2274            VirtualTopic virtualTopic = new VirtualTopic();
2275            virtualTopic.setName("VirtualTopic.>");
2276            VirtualDestination[] virtualDestinations = { virtualTopic };
2277            interceptor.setVirtualDestinations(virtualDestinations);
2278            answer.add(interceptor);
2279        }
2280        if (isUseMirroredQueues()) {
2281            MirroredQueue interceptor = new MirroredQueue();
2282            answer.add(interceptor);
2283        }
2284        DestinationInterceptor[] array = new DestinationInterceptor[answer.size()];
2285        answer.toArray(array);
2286        return array;
2287    }
2288
2289    /**
2290     * Strategy method to add interceptors to the broker
2291     *
2292     * @throws IOException
2293     */
2294    protected Broker addInterceptors(Broker broker) throws Exception {
2295        if (isSchedulerSupport()) {
2296            SchedulerBroker sb = new SchedulerBroker(this, broker, getJobSchedulerStore());
2297            if (isUseJmx()) {
2298                JobSchedulerViewMBean view = new JobSchedulerView(sb.getJobScheduler());
2299                try {
2300                    ObjectName objectName = BrokerMBeanSupport.createJobSchedulerServiceName(getBrokerObjectName());
2301                    AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
2302                    this.adminView.setJMSJobScheduler(objectName);
2303                } catch (Throwable e) {
2304                    throw IOExceptionSupport.create("JobScheduler could not be registered in JMX: "
2305                            + e.getMessage(), e);
2306                }
2307            }
2308            broker = sb;
2309        }
2310        if (isUseJmx()) {
2311            HealthViewMBean statusView = new HealthView((ManagedRegionBroker)getRegionBroker());
2312            try {
2313                ObjectName objectName = BrokerMBeanSupport.createHealthServiceName(getBrokerObjectName());
2314                AnnotatedMBean.registerMBean(getManagementContext(), statusView, objectName);
2315            } catch (Throwable e) {
2316                throw IOExceptionSupport.create("Status MBean could not be registered in JMX: "
2317                        + e.getMessage(), e);
2318            }
2319        }
2320        if (isAdvisorySupport()) {
2321            broker = new AdvisoryBroker(broker);
2322        }
2323        broker = new CompositeDestinationBroker(broker);
2324        broker = new TransactionBroker(broker, getPersistenceAdapter().createTransactionStore());
2325        if (isPopulateJMSXUserID()) {
2326            UserIDBroker userIDBroker = new UserIDBroker(broker);
2327            userIDBroker.setUseAuthenticatePrincipal(isUseAuthenticatedPrincipalForJMSXUserID());
2328            broker = userIDBroker;
2329        }
2330        if (isMonitorConnectionSplits()) {
2331            broker = new ConnectionSplitBroker(broker);
2332        }
2333        if (plugins != null) {
2334            for (int i = 0; i < plugins.length; i++) {
2335                BrokerPlugin plugin = plugins[i];
2336                broker = plugin.installPlugin(broker);
2337            }
2338        }
2339        return broker;
2340    }
2341
2342    protected PersistenceAdapter createPersistenceAdapter() throws IOException {
2343        if (isPersistent()) {
2344            PersistenceAdapterFactory fac = getPersistenceFactory();
2345            if (fac != null) {
2346                return fac.createPersistenceAdapter();
2347            } else {
2348                try {
2349                    String clazz = "org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter";
2350                    PersistenceAdapter adaptor = (PersistenceAdapter)getClass().getClassLoader().loadClass(clazz).newInstance();
2351                    File dir = new File(getBrokerDataDirectory(),"KahaDB");
2352                    adaptor.setDirectory(dir);
2353                    return adaptor;
2354                } catch (Throwable e) {
2355                    throw IOExceptionSupport.create(e);
2356                }
2357            }
2358        } else {
2359            return new MemoryPersistenceAdapter();
2360        }
2361    }
2362
2363    protected ObjectName createBrokerObjectName() throws MalformedObjectNameException  {
2364        return BrokerMBeanSupport.createBrokerObjectName(getManagementContext().getJmxDomainName(), getBrokerName());
2365    }
2366
2367    protected TransportConnector createTransportConnector(URI brokerURI) throws Exception {
2368        TransportServer transport = TransportFactorySupport.bind(this, brokerURI);
2369        return new TransportConnector(transport);
2370    }
2371
2372    /**
2373     * Extracts the port from the options
2374     */
2375    protected Object getPort(Map<?,?> options) {
2376        Object port = options.get("port");
2377        if (port == null) {
2378            port = DEFAULT_PORT;
2379            LOG.warn("No port specified so defaulting to: {}", port);
2380        }
2381        return port;
2382    }
2383
2384    protected void addShutdownHook() {
2385        if (useShutdownHook) {
2386            shutdownHook = new Thread("ActiveMQ ShutdownHook") {
2387                @Override
2388                public void run() {
2389                    containerShutdown();
2390                }
2391            };
2392            Runtime.getRuntime().addShutdownHook(shutdownHook);
2393        }
2394    }
2395
2396    protected void removeShutdownHook() {
2397        if (shutdownHook != null) {
2398            try {
2399                Runtime.getRuntime().removeShutdownHook(shutdownHook);
2400            } catch (Exception e) {
2401                LOG.debug("Caught exception, must be shutting down. This exception is ignored.", e);
2402            }
2403        }
2404    }
2405
2406    /**
2407     * Sets hooks to be executed when broker shut down
2408     *
2409     * @org.apache.xbean.Property
2410     */
2411    public void setShutdownHooks(List<Runnable> hooks) throws Exception {
2412        for (Runnable hook : hooks) {
2413            addShutdownHook(hook);
2414        }
2415    }
2416
2417    /**
2418     * Causes a clean shutdown of the container when the VM is being shut down
2419     */
2420    protected void containerShutdown() {
2421        try {
2422            stop();
2423        } catch (IOException e) {
2424            Throwable linkedException = e.getCause();
2425            if (linkedException != null) {
2426                logError("Failed to shut down: " + e + ". Reason: " + linkedException, linkedException);
2427            } else {
2428                logError("Failed to shut down: " + e, e);
2429            }
2430            if (!useLoggingForShutdownErrors) {
2431                e.printStackTrace(System.err);
2432            }
2433        } catch (Exception e) {
2434            logError("Failed to shut down: " + e, e);
2435        }
2436    }
2437
2438    protected void logError(String message, Throwable e) {
2439        if (useLoggingForShutdownErrors) {
2440            LOG.error("Failed to shut down: " + e);
2441        } else {
2442            System.err.println("Failed to shut down: " + e);
2443        }
2444    }
2445
2446    /**
2447     * Starts any configured destinations on startup
2448     */
2449    protected void startDestinations() throws Exception {
2450        if (destinations != null) {
2451            ConnectionContext adminConnectionContext = getAdminConnectionContext();
2452            for (int i = 0; i < destinations.length; i++) {
2453                ActiveMQDestination destination = destinations[i];
2454                getBroker().addDestination(adminConnectionContext, destination,true);
2455            }
2456        }
2457        if (isUseVirtualTopics()) {
2458            startVirtualConsumerDestinations();
2459        }
2460    }
2461
2462    /**
2463     * Returns the broker's administration connection context used for
2464     * configuring the broker at startup
2465     */
2466    public ConnectionContext getAdminConnectionContext() throws Exception {
2467        return BrokerSupport.getConnectionContext(getBroker());
2468    }
2469
2470    protected void startManagementContext() throws Exception {
2471        getManagementContext().setBrokerName(brokerName);
2472        getManagementContext().start();
2473        adminView = new BrokerView(this, null);
2474        ObjectName objectName = getBrokerObjectName();
2475        AnnotatedMBean.registerMBean(getManagementContext(), adminView, objectName);
2476    }
2477
2478    /**
2479     * Start all transport and network connections, proxies and bridges
2480     *
2481     * @throws Exception
2482     */
2483    public void startAllConnectors() throws Exception {
2484        Set<ActiveMQDestination> durableDestinations = getBroker().getDurableDestinations();
2485        List<TransportConnector> al = new ArrayList<TransportConnector>();
2486        for (Iterator<TransportConnector> iter = getTransportConnectors().iterator(); iter.hasNext();) {
2487            TransportConnector connector = iter.next();
2488            al.add(startTransportConnector(connector));
2489        }
2490        if (al.size() > 0) {
2491            // let's clear the transportConnectors list and replace it with
2492            // the started transportConnector instances
2493            this.transportConnectors.clear();
2494            setTransportConnectors(al);
2495        }
2496        this.slave = false;
2497        URI uri = getVmConnectorURI();
2498        Map<String, String> map = new HashMap<String, String>(URISupport.parseParameters(uri));
2499        map.put("network", "true");
2500        map.put("async", "false");
2501        uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
2502
2503        if (!stopped.get()) {
2504            ThreadPoolExecutor networkConnectorStartExecutor = null;
2505            if (isNetworkConnectorStartAsync()) {
2506                // spin up as many threads as needed
2507                networkConnectorStartExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
2508                    10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
2509                    new ThreadFactory() {
2510                        int count=0;
2511                        @Override
2512                        public Thread newThread(Runnable runnable) {
2513                            Thread thread = new Thread(runnable, "NetworkConnector Start Thread-" +(count++));
2514                            thread.setDaemon(true);
2515                            return thread;
2516                        }
2517                    });
2518            }
2519
2520            for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) {
2521                final NetworkConnector connector = iter.next();
2522                connector.setLocalUri(uri);
2523                connector.setBrokerName(getBrokerName());
2524                connector.setDurableDestinations(durableDestinations);
2525                if (getDefaultSocketURIString() != null) {
2526                    connector.setBrokerURL(getDefaultSocketURIString());
2527                }
2528                if (networkConnectorStartExecutor != null) {
2529                    networkConnectorStartExecutor.execute(new Runnable() {
2530                        @Override
2531                        public void run() {
2532                            try {
2533                                LOG.info("Async start of {}", connector);
2534                                connector.start();
2535                            } catch(Exception e) {
2536                                LOG.error("Async start of network connector: {} failed", connector, e);
2537                            }
2538                        }
2539                    });
2540                } else {
2541                    connector.start();
2542                }
2543            }
2544            if (networkConnectorStartExecutor != null) {
2545                // executor done when enqueued tasks are complete
2546                ThreadPoolUtils.shutdown(networkConnectorStartExecutor);
2547            }
2548
2549            for (Iterator<ProxyConnector> iter = getProxyConnectors().iterator(); iter.hasNext();) {
2550                ProxyConnector connector = iter.next();
2551                connector.start();
2552            }
2553            for (Iterator<JmsConnector> iter = jmsConnectors.iterator(); iter.hasNext();) {
2554                JmsConnector connector = iter.next();
2555                connector.start();
2556            }
2557            for (Service service : services) {
2558                configureService(service);
2559                service.start();
2560            }
2561        }
2562    }
2563
2564    public TransportConnector startTransportConnector(TransportConnector connector) throws Exception {
2565        connector.setBrokerService(this);
2566        connector.setTaskRunnerFactory(getTaskRunnerFactory());
2567        MessageAuthorizationPolicy policy = getMessageAuthorizationPolicy();
2568        if (policy != null) {
2569            connector.setMessageAuthorizationPolicy(policy);
2570        }
2571        if (isUseJmx()) {
2572            connector = registerConnectorMBean(connector);
2573        }
2574        connector.getStatistics().setEnabled(enableStatistics);
2575        connector.start();
2576        return connector;
2577    }
2578
2579    /**
2580     * Perform any custom dependency injection
2581     */
2582    protected void configureServices(Object[] services) {
2583        for (Object service : services) {
2584            configureService(service);
2585        }
2586    }
2587
2588    /**
2589     * Perform any custom dependency injection
2590     */
2591    protected void configureService(Object service) {
2592        if (service instanceof BrokerServiceAware) {
2593            BrokerServiceAware serviceAware = (BrokerServiceAware) service;
2594            serviceAware.setBrokerService(this);
2595        }
2596    }
2597
2598    public void handleIOException(IOException exception) {
2599        if (ioExceptionHandler != null) {
2600            ioExceptionHandler.handle(exception);
2601         } else {
2602            LOG.info("No IOExceptionHandler registered, ignoring IO exception", exception);
2603         }
2604    }
2605
2606    protected void startVirtualConsumerDestinations() throws Exception {
2607        ConnectionContext adminConnectionContext = getAdminConnectionContext();
2608        Set<ActiveMQDestination> destinations = destinationFactory.getDestinations();
2609        DestinationFilter filter = getVirtualTopicConsumerDestinationFilter();
2610        if (!destinations.isEmpty()) {
2611            for (ActiveMQDestination destination : destinations) {
2612                if (filter.matches(destination) == true) {
2613                    broker.addDestination(adminConnectionContext, destination, false);
2614                }
2615            }
2616        }
2617    }
2618
2619    private DestinationFilter getVirtualTopicConsumerDestinationFilter() {
2620        // created at startup, so no sync needed
2621        if (virtualConsumerDestinationFilter == null) {
2622            Set <ActiveMQQueue> consumerDestinations = new HashSet<ActiveMQQueue>();
2623            if (destinationInterceptors != null) {
2624                for (DestinationInterceptor interceptor : destinationInterceptors) {
2625                    if (interceptor instanceof VirtualDestinationInterceptor) {
2626                        VirtualDestinationInterceptor virtualDestinationInterceptor = (VirtualDestinationInterceptor) interceptor;
2627                        for (VirtualDestination virtualDestination: virtualDestinationInterceptor.getVirtualDestinations()) {
2628                            if (virtualDestination instanceof VirtualTopic) {
2629                                consumerDestinations.add(new ActiveMQQueue(((VirtualTopic) virtualDestination).getPrefix() + DestinationFilter.ANY_DESCENDENT));
2630                            }
2631                        }
2632                    }
2633                }
2634            }
2635            ActiveMQQueue filter = new ActiveMQQueue();
2636            filter.setCompositeDestinations(consumerDestinations.toArray(new ActiveMQDestination[]{}));
2637            virtualConsumerDestinationFilter = DestinationFilter.parseFilter(filter);
2638        }
2639        return virtualConsumerDestinationFilter;
2640    }
2641
2642    protected synchronized ThreadPoolExecutor getExecutor() {
2643        if (this.executor == null) {
2644            this.executor = new ThreadPoolExecutor(1, 10, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
2645
2646                private long i = 0;
2647
2648                @Override
2649                public Thread newThread(Runnable runnable) {
2650                    this.i++;
2651                    Thread thread = new Thread(runnable, "ActiveMQ BrokerService.worker." + this.i);
2652                    thread.setDaemon(true);
2653                    thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
2654                        @Override
2655                        public void uncaughtException(final Thread t, final Throwable e) {
2656                            LOG.error("Error in thread '{}'", t.getName(), e);
2657                        }
2658                    });
2659                    return thread;
2660                }
2661            }, new RejectedExecutionHandler() {
2662                @Override
2663                public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) {
2664                    try {
2665                        executor.getQueue().offer(r, 60, TimeUnit.SECONDS);
2666                    } catch (InterruptedException e) {
2667                        throw new RejectedExecutionException("Interrupted waiting for BrokerService.worker");
2668                    }
2669
2670                    throw new RejectedExecutionException("Timed Out while attempting to enqueue Task.");
2671                }
2672            });
2673        }
2674        return this.executor;
2675    }
2676
2677    public synchronized Scheduler getScheduler() {
2678        if (this.scheduler==null) {
2679            this.scheduler = new Scheduler("ActiveMQ Broker["+getBrokerName()+"] Scheduler");
2680            try {
2681                this.scheduler.start();
2682            } catch (Exception e) {
2683               LOG.error("Failed to start Scheduler", e);
2684            }
2685        }
2686        return this.scheduler;
2687    }
2688
2689    public Broker getRegionBroker() {
2690        return regionBroker;
2691    }
2692
    /**
     * @param regionBroker the broker to store in the {@code regionBroker} field
     */
    public void setRegionBroker(Broker regionBroker) {
        this.regionBroker = regionBroker;
    }
2696
2697    public void addShutdownHook(Runnable hook) {
2698        synchronized (shutdownHooks) {
2699            shutdownHooks.add(hook);
2700        }
2701    }
2702
2703    public void removeShutdownHook(Runnable hook) {
2704        synchronized (shutdownHooks) {
2705            shutdownHooks.remove(hook);
2706        }
2707    }
2708
2709    public boolean isSystemExitOnShutdown() {
2710        return systemExitOnShutdown;
2711    }
2712
2713    /**
2714     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
2715     */
2716    public void setSystemExitOnShutdown(boolean systemExitOnShutdown) {
2717        this.systemExitOnShutdown = systemExitOnShutdown;
2718    }
2719
2720    public int getSystemExitOnShutdownExitCode() {
2721        return systemExitOnShutdownExitCode;
2722    }
2723
    /**
     * @param systemExitOnShutdownExitCode the new value of the
     *        {@code systemExitOnShutdownExitCode} field
     */
    public void setSystemExitOnShutdownExitCode(int systemExitOnShutdownExitCode) {
        this.systemExitOnShutdownExitCode = systemExitOnShutdownExitCode;
    }
2727
2728    public SslContext getSslContext() {
2729        return sslContext;
2730    }
2731
    /**
     * @param sslContext the SSL context to store on this broker service
     */
    public void setSslContext(SslContext sslContext) {
        this.sslContext = sslContext;
    }
2735
2736    public boolean isShutdownOnSlaveFailure() {
2737        return shutdownOnSlaveFailure;
2738    }
2739
2740    /**
2741     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
2742     */
2743    public void setShutdownOnSlaveFailure(boolean shutdownOnSlaveFailure) {
2744        this.shutdownOnSlaveFailure = shutdownOnSlaveFailure;
2745    }
2746
2747    public boolean isWaitForSlave() {
2748        return waitForSlave;
2749    }
2750
2751    /**
2752     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
2753     */
2754    public void setWaitForSlave(boolean waitForSlave) {
2755        this.waitForSlave = waitForSlave;
2756    }
2757
2758    public long getWaitForSlaveTimeout() {
2759        return this.waitForSlaveTimeout;
2760    }
2761
    /**
     * @param waitForSlaveTimeout the new value of the {@code waitForSlaveTimeout} field
     */
    public void setWaitForSlaveTimeout(long waitForSlaveTimeout) {
        this.waitForSlaveTimeout = waitForSlaveTimeout;
    }
2765
2766    /**
2767     * Get the passiveSlave
2768     * @return the passiveSlave
2769     */
2770    public boolean isPassiveSlave() {
2771        return this.passiveSlave;
2772    }
2773
2774    /**
2775     * Set the passiveSlave
2776     * @param passiveSlave the passiveSlave to set
2777     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
2778     */
2779    public void setPassiveSlave(boolean passiveSlave) {
2780        this.passiveSlave = passiveSlave;
2781    }
2782
2783    /**
2784     * override the Default IOException handler, called when persistence adapter
2785     * has experiences File or JDBC I/O Exceptions
2786     *
2787     * @param ioExceptionHandler
2788     */
2789    public void setIoExceptionHandler(IOExceptionHandler ioExceptionHandler) {
2790        configureService(ioExceptionHandler);
2791        this.ioExceptionHandler = ioExceptionHandler;
2792    }
2793
2794    public IOExceptionHandler getIoExceptionHandler() {
2795        return ioExceptionHandler;
2796    }
2797
2798    /**
2799     * @return the schedulerSupport
2800     */
2801    public boolean isSchedulerSupport() {
2802        return this.schedulerSupport;
2803    }
2804
2805    /**
2806     * @param schedulerSupport the schedulerSupport to set
2807     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
2808     */
2809    public void setSchedulerSupport(boolean schedulerSupport) {
2810        this.schedulerSupport = schedulerSupport;
2811    }
2812
2813    /**
2814     * @return the schedulerDirectory
2815     */
2816    public File getSchedulerDirectoryFile() {
2817        if (this.schedulerDirectoryFile == null) {
2818            this.schedulerDirectoryFile = new File(getBrokerDataDirectory(), "scheduler");
2819        }
2820        return schedulerDirectoryFile;
2821    }
2822
2823    /**
2824     * @param schedulerDirectory the schedulerDirectory to set
2825     */
2826    public void setSchedulerDirectoryFile(File schedulerDirectory) {
2827        this.schedulerDirectoryFile = schedulerDirectory;
2828    }
2829
2830    public void setSchedulerDirectory(String schedulerDirectory) {
2831        setSchedulerDirectoryFile(new File(schedulerDirectory));
2832    }
2833
2834    public int getSchedulePeriodForDestinationPurge() {
2835        return this.schedulePeriodForDestinationPurge;
2836    }
2837
    /**
     * @param schedulePeriodForDestinationPurge the new value of the
     *        {@code schedulePeriodForDestinationPurge} field
     */
    public void setSchedulePeriodForDestinationPurge(int schedulePeriodForDestinationPurge) {
        this.schedulePeriodForDestinationPurge = schedulePeriodForDestinationPurge;
    }
2841
2842    public int getMaxPurgedDestinationsPerSweep() {
2843        return this.maxPurgedDestinationsPerSweep;
2844    }
2845
    /**
     * @param maxPurgedDestinationsPerSweep the new value of the
     *        {@code maxPurgedDestinationsPerSweep} field
     */
    public void setMaxPurgedDestinationsPerSweep(int maxPurgedDestinationsPerSweep) {
        this.maxPurgedDestinationsPerSweep = maxPurgedDestinationsPerSweep;
    }
2849
2850    public BrokerContext getBrokerContext() {
2851        return brokerContext;
2852    }
2853
    /**
     * @param brokerContext the broker context to store on this broker service
     */
    public void setBrokerContext(BrokerContext brokerContext) {
        this.brokerContext = brokerContext;
    }
2857
2858    public void setBrokerId(String brokerId) {
2859        this.brokerId = new BrokerId(brokerId);
2860    }
2861
2862    public boolean isUseAuthenticatedPrincipalForJMSXUserID() {
2863        return useAuthenticatedPrincipalForJMSXUserID;
2864    }
2865
    /**
     * @param useAuthenticatedPrincipalForJMSXUserID the new value of the
     *        {@code useAuthenticatedPrincipalForJMSXUserID} flag
     */
    public void setUseAuthenticatedPrincipalForJMSXUserID(boolean useAuthenticatedPrincipalForJMSXUserID) {
        this.useAuthenticatedPrincipalForJMSXUserID = useAuthenticatedPrincipalForJMSXUserID;
    }
2869
2870    /**
2871     * Should MBeans that support showing the Authenticated User Name information have this
2872     * value filled in or not.
2873     *
2874     * @return true if user names should be exposed in MBeans
2875     */
2876    public boolean isPopulateUserNameInMBeans() {
2877        return this.populateUserNameInMBeans;
2878    }
2879
2880    /**
2881     * Sets whether Authenticated User Name information is shown in MBeans that support this field.
2882     * @param value if MBeans should expose user name information.
2883     */
2884    public void setPopulateUserNameInMBeans(boolean value) {
2885        this.populateUserNameInMBeans = value;
2886    }
2887
2888    /**
2889     * Gets the time in Milliseconds that an invocation of an MBean method will wait before
2890     * failing.  The default value is to wait forever (zero).
2891     *
2892     * @return timeout in milliseconds before MBean calls fail, (default is 0 or no timeout).
2893     */
2894    public long getMbeanInvocationTimeout() {
2895        return mbeanInvocationTimeout;
2896    }
2897
2898    /**
2899     * Gets the time in Milliseconds that an invocation of an MBean method will wait before
2900     * failing. The default value is to wait forever (zero).
2901     *
2902     * @param mbeanInvocationTimeout
2903     *      timeout in milliseconds before MBean calls fail, (default is 0 or no timeout).
2904     */
2905    public void setMbeanInvocationTimeout(long mbeanInvocationTimeout) {
2906        this.mbeanInvocationTimeout = mbeanInvocationTimeout;
2907    }
2908
2909    public boolean isNetworkConnectorStartAsync() {
2910        return networkConnectorStartAsync;
2911    }
2912
    /**
     * @param networkConnectorStartAsync the new value of the
     *        {@code networkConnectorStartAsync} flag
     */
    public void setNetworkConnectorStartAsync(boolean networkConnectorStartAsync) {
        this.networkConnectorStartAsync = networkConnectorStartAsync;
    }
2916
2917    public boolean isAllowTempAutoCreationOnSend() {
2918        return allowTempAutoCreationOnSend;
2919    }
2920
2921    /**
2922     * enable if temp destinations need to be propagated through a network when
2923     * advisorySupport==false. This is used in conjunction with the policy
2924     * gcInactiveDestinations for matching temps so they can get removed
2925     * when inactive
2926     *
2927     * @param allowTempAutoCreationOnSend
2928     */
2929    public void setAllowTempAutoCreationOnSend(boolean allowTempAutoCreationOnSend) {
2930        this.allowTempAutoCreationOnSend = allowTempAutoCreationOnSend;
2931    }
2932
2933    public long getOfflineDurableSubscriberTimeout() {
2934        return offlineDurableSubscriberTimeout;
2935    }
2936
    /**
     * @param offlineDurableSubscriberTimeout the new value of the
     *        {@code offlineDurableSubscriberTimeout} field
     */
    public void setOfflineDurableSubscriberTimeout(long offlineDurableSubscriberTimeout) {
        this.offlineDurableSubscriberTimeout = offlineDurableSubscriberTimeout;
    }
2940
2941    public long getOfflineDurableSubscriberTaskSchedule() {
2942        return offlineDurableSubscriberTaskSchedule;
2943    }
2944
    /**
     * @param offlineDurableSubscriberTaskSchedule the new value of the
     *        {@code offlineDurableSubscriberTaskSchedule} field
     */
    public void setOfflineDurableSubscriberTaskSchedule(long offlineDurableSubscriberTaskSchedule) {
        this.offlineDurableSubscriberTaskSchedule = offlineDurableSubscriberTaskSchedule;
    }
2948
2949    public boolean shouldRecordVirtualDestination(ActiveMQDestination destination) {
2950        return isUseVirtualTopics() && destination.isQueue() &&
2951               getVirtualTopicConsumerDestinationFilter().matches(destination);
2952    }
2953
2954    public Throwable getStartException() {
2955        return startException;
2956    }
2957
2958    public boolean isStartAsync() {
2959        return startAsync;
2960    }
2961
    /**
     * @param startAsync the new value of the {@code startAsync} flag
     */
    public void setStartAsync(boolean startAsync) {
        this.startAsync = startAsync;
    }
2965
2966    public boolean isSlave() {
2967        return this.slave;
2968    }
2969
2970    public boolean isStopping() {
2971        return this.stopping.get();
2972    }
2973
2974    /**
2975     * @return true if the broker allowed to restart on shutdown.
2976     */
2977    public boolean isRestartAllowed() {
2978        return restartAllowed;
2979    }
2980
2981    /**
2982     * Sets if the broker allowed to restart on shutdown.
2983     * @return
2984     */
2985    public void setRestartAllowed(boolean restartAllowed) {
2986        this.restartAllowed = restartAllowed;
2987    }
2988
2989    /**
2990     * A lifecycle manager of the BrokerService should
2991     * inspect this property after a broker shutdown has occurred
2992     * to find out if the broker needs to be re-created and started
2993     * again.
2994     *
2995     * @return true if the broker wants to be restarted after it shuts down.
2996     */
2997    public boolean isRestartRequested() {
2998        return restartRequested;
2999    }
3000
3001    public void requestRestart() {
3002        this.restartRequested = true;
3003    }
3004
3005    public int getStoreOpenWireVersion() {
3006        return storeOpenWireVersion;
3007    }
3008
    /**
     * @param storeOpenWireVersion the new value of the {@code storeOpenWireVersion} field
     */
    public void setStoreOpenWireVersion(int storeOpenWireVersion) {
        this.storeOpenWireVersion = storeOpenWireVersion;
    }
3012
3013    /**
3014     * @return the current number of connections on this Broker.
3015     */
3016    public int getCurrentConnections() {
3017        return this.currentConnections.get();
3018    }
3019
3020    /**
3021     * @return the total number of connections this broker has handled since startup.
3022     */
3023    public long getTotalConnections() {
3024        return this.totalConnections.get();
3025    }
3026
3027    public void incrementCurrentConnections() {
3028        this.currentConnections.incrementAndGet();
3029    }
3030
3031    public void decrementCurrentConnections() {
3032        this.currentConnections.decrementAndGet();
3033    }
3034
3035    public void incrementTotalConnections() {
3036        this.totalConnections.incrementAndGet();
3037    }
3038
3039    public boolean isRejectDurableConsumers() {
3040        return rejectDurableConsumers;
3041    }
3042
    /**
     * @param rejectDurableConsumers the new value of the {@code rejectDurableConsumers} flag
     */
    public void setRejectDurableConsumers(boolean rejectDurableConsumers) {
        this.rejectDurableConsumers = rejectDurableConsumers;
    }
3046}