001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker; 018 019import java.io.BufferedReader; 020import java.io.File; 021import java.io.IOException; 022import java.io.InputStream; 023import java.io.InputStreamReader; 024import java.net.URI; 025import java.net.URISyntaxException; 026import java.net.UnknownHostException; 027import java.security.Provider; 028import java.security.Security; 029import java.util.ArrayList; 030import java.util.Date; 031import java.util.HashMap; 032import java.util.HashSet; 033import java.util.Iterator; 034import java.util.List; 035import java.util.Locale; 036import java.util.Map; 037import java.util.Set; 038import java.util.concurrent.CopyOnWriteArrayList; 039import java.util.concurrent.CountDownLatch; 040import java.util.concurrent.LinkedBlockingQueue; 041import java.util.concurrent.RejectedExecutionException; 042import java.util.concurrent.RejectedExecutionHandler; 043import java.util.concurrent.SynchronousQueue; 044import java.util.concurrent.ThreadFactory; 045import java.util.concurrent.ThreadPoolExecutor; 046import java.util.concurrent.TimeUnit; 047import java.util.concurrent.atomic.AtomicBoolean; 048import java.util.concurrent.atomic.AtomicInteger; 049import java.util.concurrent.atomic.AtomicLong; 050 051import javax.annotation.PostConstruct; 052import javax.annotation.PreDestroy; 053import javax.management.MalformedObjectNameException; 054import javax.management.ObjectName; 055 056import org.apache.activemq.ActiveMQConnectionMetaData; 057import org.apache.activemq.ConfigurationException; 058import org.apache.activemq.Service; 059import org.apache.activemq.advisory.AdvisoryBroker; 060import org.apache.activemq.broker.cluster.ConnectionSplitBroker; 061import org.apache.activemq.broker.jmx.AnnotatedMBean; 062import org.apache.activemq.broker.jmx.BrokerMBeanSupport; 063import org.apache.activemq.broker.jmx.BrokerView; 064import org.apache.activemq.broker.jmx.ConnectorView; 065import org.apache.activemq.broker.jmx.ConnectorViewMBean; 066import org.apache.activemq.broker.jmx.HealthView; 067import org.apache.activemq.broker.jmx.HealthViewMBean; 068import org.apache.activemq.broker.jmx.JmsConnectorView; 069import org.apache.activemq.broker.jmx.JobSchedulerView; 070import org.apache.activemq.broker.jmx.JobSchedulerViewMBean; 071import org.apache.activemq.broker.jmx.Log4JConfigView; 072import org.apache.activemq.broker.jmx.ManagedRegionBroker; 073import org.apache.activemq.broker.jmx.ManagementContext; 074import org.apache.activemq.broker.jmx.NetworkConnectorView; 075import org.apache.activemq.broker.jmx.NetworkConnectorViewMBean; 076import org.apache.activemq.broker.jmx.ProxyConnectorView; 077import org.apache.activemq.broker.region.CompositeDestinationInterceptor; 078import org.apache.activemq.broker.region.Destination; 079import org.apache.activemq.broker.region.DestinationFactory; 080import org.apache.activemq.broker.region.DestinationFactoryImpl; 081import org.apache.activemq.broker.region.DestinationInterceptor; 082import org.apache.activemq.broker.region.RegionBroker; 083import org.apache.activemq.broker.region.policy.PolicyMap; 084import org.apache.activemq.broker.region.virtual.MirroredQueue; 085import org.apache.activemq.broker.region.virtual.VirtualDestination; 086import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor; 087import org.apache.activemq.broker.region.virtual.VirtualTopic; 088import org.apache.activemq.broker.scheduler.JobSchedulerStore; 089import org.apache.activemq.broker.scheduler.SchedulerBroker; 090import org.apache.activemq.broker.scheduler.memory.InMemoryJobSchedulerStore; 091import org.apache.activemq.command.ActiveMQDestination; 092import org.apache.activemq.command.ActiveMQQueue; 093import org.apache.activemq.command.BrokerId; 094import org.apache.activemq.command.ProducerInfo; 095import org.apache.activemq.filter.DestinationFilter; 096import org.apache.activemq.network.ConnectionFilter; 097import org.apache.activemq.network.DiscoveryNetworkConnector; 098import org.apache.activemq.network.NetworkConnector; 099import org.apache.activemq.network.jms.JmsConnector; 100import org.apache.activemq.openwire.OpenWireFormat; 101import org.apache.activemq.proxy.ProxyConnector; 102import org.apache.activemq.security.MessageAuthorizationPolicy; 103import org.apache.activemq.selector.SelectorParser; 104import org.apache.activemq.store.JournaledStore; 105import org.apache.activemq.store.PListStore; 106import org.apache.activemq.store.PersistenceAdapter; 107import org.apache.activemq.store.PersistenceAdapterFactory; 108import org.apache.activemq.store.memory.MemoryPersistenceAdapter; 109import org.apache.activemq.thread.Scheduler; 110import org.apache.activemq.thread.TaskRunnerFactory; 111import org.apache.activemq.transport.TransportFactorySupport; 112import org.apache.activemq.transport.TransportServer; 113import org.apache.activemq.transport.vm.VMTransportFactory; 114import org.apache.activemq.usage.SystemUsage; 115import org.apache.activemq.util.BrokerSupport; 116import org.apache.activemq.util.DefaultIOExceptionHandler; 117import org.apache.activemq.util.IOExceptionHandler; 118import org.apache.activemq.util.IOExceptionSupport; 119import org.apache.activemq.util.IOHelper; 120import org.apache.activemq.util.InetAddressUtil; 121import org.apache.activemq.util.ServiceStopper; 122import org.apache.activemq.util.ThreadPoolUtils; 123import org.apache.activemq.util.TimeUtils; 124import org.apache.activemq.util.URISupport; 125import org.slf4j.Logger; 126import org.slf4j.LoggerFactory; 127import org.slf4j.MDC; 128 129/** 130 * Manages the life-cycle of an ActiveMQ Broker. A BrokerService consists of a 131 * number of transport connectors, network connectors and a bunch of properties 132 * which can be used to configure the broker as its lazily created. 133 * 134 * @org.apache.xbean.XBean 135 */ 136public class BrokerService implements Service { 137 public static final String DEFAULT_PORT = "61616"; 138 public static final String LOCAL_HOST_NAME; 139 public static final String BROKER_VERSION; 140 public static final String DEFAULT_BROKER_NAME = "localhost"; 141 public static final int DEFAULT_MAX_FILE_LENGTH = 1024 * 1024 * 32; 142 public static final long DEFAULT_START_TIMEOUT = 600000L; 143 144 private static final Logger LOG = LoggerFactory.getLogger(BrokerService.class); 145 146 @SuppressWarnings("unused") 147 private static final long serialVersionUID = 7353129142305630237L; 148 149 private boolean useJmx = true; 150 private boolean enableStatistics = true; 151 private boolean persistent = true; 152 private boolean populateJMSXUserID; 153 private boolean useAuthenticatedPrincipalForJMSXUserID; 154 private boolean populateUserNameInMBeans; 155 private long mbeanInvocationTimeout = 0; 156 157 private boolean useShutdownHook = true; 158 private boolean useLoggingForShutdownErrors; 159 private boolean shutdownOnMasterFailure; 160 private boolean shutdownOnSlaveFailure; 161 private boolean waitForSlave; 162 private long waitForSlaveTimeout = DEFAULT_START_TIMEOUT; 163 private boolean passiveSlave; 164 private String brokerName = DEFAULT_BROKER_NAME; 165 private File dataDirectoryFile; 166 private File tmpDataDirectory; 167 private Broker broker; 168 private BrokerView adminView; 169 private ManagementContext managementContext; 170 private ObjectName brokerObjectName; 171 private TaskRunnerFactory taskRunnerFactory; 172 private TaskRunnerFactory persistenceTaskRunnerFactory; 173 private SystemUsage systemUsage; 174 private SystemUsage producerSystemUsage; 175 private SystemUsage consumerSystemUsaage; 176 private PersistenceAdapter persistenceAdapter; 177 private PersistenceAdapterFactory persistenceFactory; 178 protected DestinationFactory destinationFactory; 179 private MessageAuthorizationPolicy messageAuthorizationPolicy; 180 private final List<TransportConnector> transportConnectors = new CopyOnWriteArrayList<TransportConnector>(); 181 private final List<NetworkConnector> networkConnectors = new CopyOnWriteArrayList<NetworkConnector>(); 182 private final List<ProxyConnector> proxyConnectors = new CopyOnWriteArrayList<ProxyConnector>(); 183 private final List<JmsConnector> jmsConnectors = new CopyOnWriteArrayList<JmsConnector>(); 184 private final List<Service> services = new ArrayList<Service>(); 185 private transient Thread shutdownHook; 186 private String[] transportConnectorURIs; 187 private String[] networkConnectorURIs; 188 private JmsConnector[] jmsBridgeConnectors; // these are Jms to Jms bridges 189 // to other jms messaging systems 190 private boolean deleteAllMessagesOnStartup; 191 private boolean advisorySupport = true; 192 private URI vmConnectorURI; 193 private String defaultSocketURIString; 194 private PolicyMap destinationPolicy; 195 private final AtomicBoolean started = new AtomicBoolean(false); 196 private final AtomicBoolean stopped = new AtomicBoolean(false); 197 private final AtomicBoolean stopping = new AtomicBoolean(false); 198 private BrokerPlugin[] plugins; 199 private boolean keepDurableSubsActive = true; 200 private boolean useVirtualTopics = true; 201 private boolean useMirroredQueues = false; 202 private boolean useTempMirroredQueues = true; 203 private BrokerId brokerId; 204 private volatile DestinationInterceptor[] destinationInterceptors; 205 private ActiveMQDestination[] destinations; 206 private PListStore tempDataStore; 207 private int persistenceThreadPriority = Thread.MAX_PRIORITY; 208 private boolean useLocalHostBrokerName; 209 private final CountDownLatch stoppedLatch = new CountDownLatch(1); 210 private final CountDownLatch startedLatch = new CountDownLatch(1); 211 private Broker regionBroker; 212 private int producerSystemUsagePortion = 60; 213 private int consumerSystemUsagePortion = 40; 214 private boolean splitSystemUsageForProducersConsumers; 215 private boolean monitorConnectionSplits = false; 216 private int taskRunnerPriority = Thread.NORM_PRIORITY; 217 private boolean dedicatedTaskRunner; 218 private boolean cacheTempDestinations = false;// useful for failover 219 private int timeBeforePurgeTempDestinations = 5000; 220 private final List<Runnable> shutdownHooks = new ArrayList<Runnable>(); 221 private boolean systemExitOnShutdown; 222 private int systemExitOnShutdownExitCode; 223 private SslContext sslContext; 224 private boolean forceStart = false; 225 private IOExceptionHandler ioExceptionHandler; 226 private boolean schedulerSupport = false; 227 private File schedulerDirectoryFile; 228 private Scheduler scheduler; 229 private ThreadPoolExecutor executor; 230 private int schedulePeriodForDestinationPurge= 0; 231 private int maxPurgedDestinationsPerSweep = 0; 232 private BrokerContext brokerContext; 233 private boolean networkConnectorStartAsync = false; 234 private boolean allowTempAutoCreationOnSend; 235 private JobSchedulerStore jobSchedulerStore; 236 private final AtomicLong totalConnections = new AtomicLong(); 237 private final AtomicInteger currentConnections = new AtomicInteger(); 238 239 private long offlineDurableSubscriberTimeout = -1; 240 private long offlineDurableSubscriberTaskSchedule = 300000; 241 private DestinationFilter virtualConsumerDestinationFilter; 242 243 private final Object persistenceAdapterLock = new Object(); 244 private Throwable startException = null; 245 private boolean startAsync = false; 246 private Date startDate; 247 private boolean slave = true; 248 249 private boolean restartAllowed = true; 250 private boolean restartRequested = false; 251 private boolean rejectDurableConsumers = false; 252 253 private int storeOpenWireVersion = OpenWireFormat.DEFAULT_VERSION; 254 255 static { 256 257 try { 258 ClassLoader loader = BrokerService.class.getClassLoader(); 259 Class<?> clazz = loader.loadClass("org.bouncycastle.jce.provider.BouncyCastleProvider"); 260 Provider bouncycastle = (Provider) clazz.newInstance(); 261 Security.insertProviderAt(bouncycastle, 2); 262 LOG.info("Loaded the Bouncy Castle security provider."); 263 } catch(Throwable e) { 264 // No BouncyCastle found so we use the default Java Security Provider 265 } 266 267 String localHostName = "localhost"; 268 try { 269 localHostName = InetAddressUtil.getLocalHostName(); 270 } catch (UnknownHostException e) { 271 LOG.error("Failed to resolve localhost"); 272 } 273 LOCAL_HOST_NAME = localHostName; 274 275 InputStream in = null; 276 String version = null; 277 if ((in = BrokerService.class.getResourceAsStream("/org/apache/activemq/version.txt")) != null) { 278 BufferedReader reader = new BufferedReader(new InputStreamReader(in)); 279 try { 280 version = reader.readLine(); 281 } catch(Exception e) { 282 } 283 } 284 BROKER_VERSION = version; 285 } 286 287 @Override 288 public String toString() { 289 return "BrokerService[" + getBrokerName() + "]"; 290 } 291 292 private String getBrokerVersion() { 293 String version = ActiveMQConnectionMetaData.PROVIDER_VERSION; 294 if (version == null) { 295 version = BROKER_VERSION; 296 } 297 298 return version; 299 } 300 301 /** 302 * Adds a new transport connector for the given bind address 303 * 304 * @return the newly created and added transport connector 305 * @throws Exception 306 */ 307 public TransportConnector addConnector(String bindAddress) throws Exception { 308 return addConnector(new URI(bindAddress)); 309 } 310 311 /** 312 * Adds a new transport connector for the given bind address 313 * 314 * @return the newly created and added transport connector 315 * @throws Exception 316 */ 317 public TransportConnector addConnector(URI bindAddress) throws Exception { 318 return addConnector(createTransportConnector(bindAddress)); 319 } 320 321 /** 322 * Adds a new transport connector for the given TransportServer transport 323 * 324 * @return the newly created and added transport connector 325 * @throws Exception 326 */ 327 public TransportConnector addConnector(TransportServer transport) throws Exception { 328 return addConnector(new TransportConnector(transport)); 329 } 330 331 /** 332 * Adds a new transport connector 333 * 334 * @return the transport connector 335 * @throws Exception 336 */ 337 public TransportConnector addConnector(TransportConnector connector) throws Exception { 338 transportConnectors.add(connector); 339 return connector; 340 } 341 342 /** 343 * Stops and removes a transport connector from the broker. 344 * 345 * @param connector 346 * @return true if the connector has been previously added to the broker 347 * @throws Exception 348 */ 349 public boolean removeConnector(TransportConnector connector) throws Exception { 350 boolean rc = transportConnectors.remove(connector); 351 if (rc) { 352 unregisterConnectorMBean(connector); 353 } 354 return rc; 355 } 356 357 /** 358 * Adds a new network connector using the given discovery address 359 * 360 * @return the newly created and added network connector 361 * @throws Exception 362 */ 363 public NetworkConnector addNetworkConnector(String discoveryAddress) throws Exception { 364 return addNetworkConnector(new URI(discoveryAddress)); 365 } 366 367 /** 368 * Adds a new proxy connector using the given bind address 369 * 370 * @return the newly created and added network connector 371 * @throws Exception 372 */ 373 public ProxyConnector addProxyConnector(String bindAddress) throws Exception { 374 return addProxyConnector(new URI(bindAddress)); 375 } 376 377 /** 378 * Adds a new network connector using the given discovery address 379 * 380 * @return the newly created and added network connector 381 * @throws Exception 382 */ 383 public NetworkConnector addNetworkConnector(URI discoveryAddress) throws Exception { 384 NetworkConnector connector = new DiscoveryNetworkConnector(discoveryAddress); 385 return addNetworkConnector(connector); 386 } 387 388 /** 389 * Adds a new proxy connector using the given bind address 390 * 391 * @return the newly created and added network connector 392 * @throws Exception 393 */ 394 public ProxyConnector addProxyConnector(URI bindAddress) throws Exception { 395 ProxyConnector connector = new ProxyConnector(); 396 connector.setBind(bindAddress); 397 connector.setRemote(new URI("fanout:multicast://default")); 398 return addProxyConnector(connector); 399 } 400 401 /** 402 * Adds a new network connector to connect this broker to a federated 403 * network 404 */ 405 public NetworkConnector addNetworkConnector(NetworkConnector connector) throws Exception { 406 connector.setBrokerService(this); 407 URI uri = getVmConnectorURI(); 408 Map<String, String> map = new HashMap<String, String>(URISupport.parseParameters(uri)); 409 map.put("network", "true"); 410 uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map)); 411 connector.setLocalUri(uri); 412 // Set a connection filter so that the connector does not establish loop 413 // back connections. 414 connector.setConnectionFilter(new ConnectionFilter() { 415 @Override 416 public boolean connectTo(URI location) { 417 List<TransportConnector> transportConnectors = getTransportConnectors(); 418 for (Iterator<TransportConnector> iter = transportConnectors.iterator(); iter.hasNext();) { 419 try { 420 TransportConnector tc = iter.next(); 421 if (location.equals(tc.getConnectUri())) { 422 return false; 423 } 424 } catch (Throwable e) { 425 } 426 } 427 return true; 428 } 429 }); 430 networkConnectors.add(connector); 431 return connector; 432 } 433 434 /** 435 * Removes the given network connector without stopping it. The caller 436 * should call {@link NetworkConnector#stop()} to close the connector 437 */ 438 public boolean removeNetworkConnector(NetworkConnector connector) { 439 boolean answer = networkConnectors.remove(connector); 440 if (answer) { 441 unregisterNetworkConnectorMBean(connector); 442 } 443 return answer; 444 } 445 446 public ProxyConnector addProxyConnector(ProxyConnector connector) throws Exception { 447 URI uri = getVmConnectorURI(); 448 connector.setLocalUri(uri); 449 proxyConnectors.add(connector); 450 if (isUseJmx()) { 451 registerProxyConnectorMBean(connector); 452 } 453 return connector; 454 } 455 456 public JmsConnector addJmsConnector(JmsConnector connector) throws Exception { 457 connector.setBrokerService(this); 458 jmsConnectors.add(connector); 459 if (isUseJmx()) { 460 registerJmsConnectorMBean(connector); 461 } 462 return connector; 463 } 464 465 public JmsConnector removeJmsConnector(JmsConnector connector) { 466 if (jmsConnectors.remove(connector)) { 467 return connector; 468 } 469 return null; 470 } 471 472 public void masterFailed() { 473 if (shutdownOnMasterFailure) { 474 LOG.error("The Master has failed ... shutting down"); 475 try { 476 stop(); 477 } catch (Exception e) { 478 LOG.error("Failed to stop for master failure", e); 479 } 480 } else { 481 LOG.warn("Master Failed - starting all connectors"); 482 try { 483 startAllConnectors(); 484 broker.nowMasterBroker(); 485 } catch (Exception e) { 486 LOG.error("Failed to startAllConnectors", e); 487 } 488 } 489 } 490 491 public String getUptime() { 492 long delta = getUptimeMillis(); 493 494 if (delta == 0) { 495 return "not started"; 496 } 497 498 return TimeUtils.printDuration(delta); 499 } 500 501 public long getUptimeMillis() { 502 if (startDate == null) { 503 return 0; 504 } 505 506 return new Date().getTime() - startDate.getTime(); 507 } 508 509 public boolean isStarted() { 510 return started.get() && startedLatch.getCount() == 0; 511 } 512 513 /** 514 * Forces a start of the broker. 515 * By default a BrokerService instance that was 516 * previously stopped using BrokerService.stop() cannot be restarted 517 * using BrokerService.start(). 518 * This method enforces a restart. 519 * It is not recommended to force a restart of the broker and will not work 520 * for most but some very trivial broker configurations. 521 * For restarting a broker instance we recommend to first call stop() on 522 * the old instance and then recreate a new BrokerService instance. 523 * 524 * @param force - if true enforces a restart. 525 * @throws Exception 526 */ 527 public void start(boolean force) throws Exception { 528 forceStart = force; 529 stopped.set(false); 530 started.set(false); 531 start(); 532 } 533 534 // Service interface 535 // ------------------------------------------------------------------------- 536 537 protected boolean shouldAutostart() { 538 return true; 539 } 540 541 /** 542 * JSR-250 callback wrapper; converts checked exceptions to runtime exceptions 543 * 544 * delegates to autoStart, done to prevent backwards incompatible signature change 545 */ 546 @PostConstruct 547 private void postConstruct() { 548 try { 549 autoStart(); 550 } catch (Exception ex) { 551 throw new RuntimeException(ex); 552 } 553 } 554 555 /** 556 * 557 * @throws Exception 558 * @org. apache.xbean.InitMethod 559 */ 560 public void autoStart() throws Exception { 561 if(shouldAutostart()) { 562 start(); 563 } 564 } 565 566 @Override 567 public void start() throws Exception { 568 if (stopped.get() || !started.compareAndSet(false, true)) { 569 // lets just ignore redundant start() calls 570 // as its way too easy to not be completely sure if start() has been 571 // called or not with the gazillion of different configuration 572 // mechanisms 573 // throw new IllegalStateException("Already started."); 574 return; 575 } 576 577 stopping.set(false); 578 startDate = new Date(); 579 MDC.put("activemq.broker", brokerName); 580 581 try { 582 if (systemExitOnShutdown && useShutdownHook) { 583 throw new ConfigurationException("'useShutdownHook' property cannot be be used with 'systemExitOnShutdown', please turn it off (useShutdownHook=false)"); 584 } 585 processHelperProperties(); 586 if (isUseJmx()) { 587 // need to remove MDC during starting JMX, as that would otherwise causes leaks, as spawned threads inheirt the MDC and 588 // we cannot cleanup clear that during shutdown of the broker. 589 MDC.remove("activemq.broker"); 590 try { 591 startManagementContext(); 592 for (NetworkConnector connector : getNetworkConnectors()) { 593 registerNetworkConnectorMBean(connector); 594 } 595 } finally { 596 MDC.put("activemq.broker", brokerName); 597 } 598 } 599 600 // in jvm master slave, lets not publish over existing broker till we get the lock 601 final BrokerRegistry brokerRegistry = BrokerRegistry.getInstance(); 602 if (brokerRegistry.lookup(getBrokerName()) == null) { 603 brokerRegistry.bind(getBrokerName(), BrokerService.this); 604 } 605 startPersistenceAdapter(startAsync); 606 startBroker(startAsync); 607 brokerRegistry.bind(getBrokerName(), BrokerService.this); 608 } catch (Exception e) { 609 LOG.error("Failed to start Apache ActiveMQ ({}, {})", new Object[]{ getBrokerName(), brokerId }, e); 610 try { 611 if (!stopped.get()) { 612 stop(); 613 } 614 } catch (Exception ex) { 615 LOG.warn("Failed to stop broker after failure in start. This exception will be ignored.", ex); 616 } 617 throw e; 618 } finally { 619 MDC.remove("activemq.broker"); 620 } 621 } 622 623 private void startPersistenceAdapter(boolean async) throws Exception { 624 if (async) { 625 new Thread("Persistence Adapter Starting Thread") { 626 @Override 627 public void run() { 628 try { 629 doStartPersistenceAdapter(); 630 } catch (Throwable e) { 631 startException = e; 632 } finally { 633 synchronized (persistenceAdapterLock) { 634 persistenceAdapterLock.notifyAll(); 635 } 636 } 637 } 638 }.start(); 639 } else { 640 doStartPersistenceAdapter(); 641 } 642 } 643 644 private void doStartPersistenceAdapter() throws Exception { 645 getPersistenceAdapter().setUsageManager(getProducerSystemUsage()); 646 getPersistenceAdapter().setBrokerName(getBrokerName()); 647 LOG.info("Using Persistence Adapter: {}", getPersistenceAdapter()); 648 if (deleteAllMessagesOnStartup) { 649 deleteAllMessages(); 650 } 651 getPersistenceAdapter().start(); 652 653 getJobSchedulerStore(); 654 if (jobSchedulerStore != null) { 655 try { 656 jobSchedulerStore.start(); 657 } catch (Exception e) { 658 RuntimeException exception = new RuntimeException( 659 "Failed to start job scheduler store: " + jobSchedulerStore, e); 660 LOG.error(exception.getLocalizedMessage(), e); 661 throw exception; 662 } 663 } 664 } 665 666 private void startBroker(boolean async) throws Exception { 667 if (async) { 668 new Thread("Broker Starting Thread") { 669 @Override 670 public void run() { 671 try { 672 synchronized (persistenceAdapterLock) { 673 persistenceAdapterLock.wait(); 674 } 675 doStartBroker(); 676 } catch (Throwable t) { 677 startException = t; 678 } 679 } 680 }.start(); 681 } else { 682 doStartBroker(); 683 } 684 } 685 686 private void doStartBroker() throws Exception { 687 if (startException != null) { 688 return; 689 } 690 startDestinations(); 691 addShutdownHook(); 692 693 broker = getBroker(); 694 brokerId = broker.getBrokerId(); 695 696 // need to log this after creating the broker so we have its id and name 697 LOG.info("Apache ActiveMQ {} ({}, {}) is starting", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId }); 698 broker.start(); 699 700 if (isUseJmx()) { 701 if (getManagementContext().isCreateConnector() && !getManagementContext().isConnectorStarted()) { 702 // try to restart management context 703 // typical for slaves that use the same ports as master 704 managementContext.stop(); 705 startManagementContext(); 706 } 707 ManagedRegionBroker managedBroker = (ManagedRegionBroker) regionBroker; 708 managedBroker.setContextBroker(broker); 709 adminView.setBroker(managedBroker); 710 } 711 712 if (ioExceptionHandler == null) { 713 setIoExceptionHandler(new DefaultIOExceptionHandler()); 714 } 715 716 if (isUseJmx() && Log4JConfigView.isLog4JAvailable()) { 717 ObjectName objectName = BrokerMBeanSupport.createLog4JConfigViewName(getBrokerObjectName().toString()); 718 Log4JConfigView log4jConfigView = new Log4JConfigView(); 719 AnnotatedMBean.registerMBean(getManagementContext(), log4jConfigView, objectName); 720 } 721 722 startAllConnectors(); 723 724 LOG.info("Apache ActiveMQ {} ({}, {}) started", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId}); 725 LOG.info("For help or more information please see: http://activemq.apache.org"); 726 727 getBroker().brokerServiceStarted(); 728 checkSystemUsageLimits(); 729 startedLatch.countDown(); 730 getBroker().nowMasterBroker(); 731 } 732 733 /** 734 * JSR-250 callback wrapper; converts checked exceptions to runtime exceptions 735 * 736 * delegates to stop, done to prevent backwards incompatible signature change 737 */ 738 @PreDestroy 739 private void preDestroy () { 740 try { 741 stop(); 742 } catch (Exception ex) { 743 throw new RuntimeException(); 744 } 745 } 746 747 /** 748 * 749 * @throws Exception 750 * @org.apache .xbean.DestroyMethod 751 */ 752 @Override 753 public void stop() throws Exception { 754 if (!stopping.compareAndSet(false, true)) { 755 LOG.trace("Broker already stopping/stopped"); 756 return; 757 } 758 759 MDC.put("activemq.broker", brokerName); 760 761 if (systemExitOnShutdown) { 762 new Thread() { 763 @Override 764 public void run() { 765 System.exit(systemExitOnShutdownExitCode); 766 } 767 }.start(); 768 } 769 770 LOG.info("Apache ActiveMQ {} ({}, {}) is shutting down", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId} ); 771 772 removeShutdownHook(); 773 if (this.scheduler != null) { 774 this.scheduler.stop(); 775 this.scheduler = null; 776 } 777 ServiceStopper stopper = new ServiceStopper(); 778 if (services != null) { 779 for (Service service : services) { 780 stopper.stop(service); 781 } 782 } 783 stopAllConnectors(stopper); 784 this.slave = true; 785 // remove any VMTransports connected 786 // this has to be done after services are stopped, 787 // to avoid timing issue with discovery (spinning up a new instance) 788 BrokerRegistry.getInstance().unbind(getBrokerName()); 789 VMTransportFactory.stopped(getBrokerName()); 790 if (broker != null) { 791 stopper.stop(broker); 792 broker = null; 793 } 794 795 if (jobSchedulerStore != null) { 796 jobSchedulerStore.stop(); 797 jobSchedulerStore = null; 798 } 799 if (tempDataStore != null) { 800 tempDataStore.stop(); 801 tempDataStore = null; 802 } 803 try { 804 stopper.stop(persistenceAdapter); 805 persistenceAdapter = null; 806 if (isUseJmx()) { 807 stopper.stop(getManagementContext()); 808 managementContext = null; 809 } 810 // Clear SelectorParser cache to free memory 811 SelectorParser.clearCache(); 812 } finally { 813 started.set(false); 814 stopped.set(true); 815 stoppedLatch.countDown(); 816 } 817 818 if (this.taskRunnerFactory != null) { 819 this.taskRunnerFactory.shutdown(); 820 this.taskRunnerFactory = null; 821 } 822 if (this.executor != null) { 823 ThreadPoolUtils.shutdownNow(executor); 824 this.executor = null; 825 } 826 827 this.destinationInterceptors = null; 828 this.destinationFactory = null; 829 830 if (startDate != null) { 831 LOG.info("Apache ActiveMQ {} ({}, {}) uptime {}", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId, getUptime()}); 832 } 833 LOG.info("Apache ActiveMQ {} ({}, {}) is shutdown", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId}); 834 835 synchronized (shutdownHooks) { 836 for (Runnable hook : shutdownHooks) { 837 try { 838 hook.run(); 839 } catch (Throwable e) { 840 stopper.onException(hook, e); 841 } 842 } 843 } 844 845 MDC.remove("activemq.broker"); 846 847 // and clear start date 848 startDate = null; 849 850 stopper.throwFirstException(); 851 } 852 853 public boolean checkQueueSize(String queueName) { 854 long count = 0; 855 long queueSize = 0; 856 Map<ActiveMQDestination, Destination> destinationMap = regionBroker.getDestinationMap(); 857 for (Map.Entry<ActiveMQDestination, Destination> entry : destinationMap.entrySet()) { 858 if (entry.getKey().isQueue()) { 859 if (entry.getValue().getName().matches(queueName)) { 860 queueSize = entry.getValue().getDestinationStatistics().getMessages().getCount(); 861 count += queueSize; 862 if (queueSize > 0) { 863 LOG.info("Queue has pending message: {} queueSize is: {}", entry.getValue().getName(), queueSize); 864 } 865 } 866 } 867 } 868 return count == 0; 869 } 870 871 /** 872 * This method (both connectorName and queueName are using regex to match) 873 * 1. stop the connector (supposed the user input the connector which the 874 * clients connect to) 2. to check whether there is any pending message on 875 * the queues defined by queueName 3. supposedly, after stop the connector, 876 * client should failover to other broker and pending messages should be 877 * forwarded. if no pending messages, the method finally call stop to stop 878 * the broker. 879 * 880 * @param connectorName 881 * @param queueName 882 * @param timeout 883 * @param pollInterval 884 * @throws Exception 885 */ 886 public void stopGracefully(String connectorName, String queueName, long timeout, long pollInterval) throws Exception { 887 if (isUseJmx()) { 888 if (connectorName == null || queueName == null || timeout <= 0) { 889 throw new Exception( 890 "connectorName and queueName cannot be null and timeout should be >0 for stopGracefully."); 891 } 892 if (pollInterval <= 0) { 893 pollInterval = 30; 894 } 895 LOG.info("Stop gracefully with connectorName: {} queueName: {} timeout: {} pollInterval: {}", new Object[]{ 896 connectorName, queueName, timeout, pollInterval 897 }); 898 TransportConnector connector; 899 for (int i = 0; i < transportConnectors.size(); i++) { 900 connector = transportConnectors.get(i); 901 if (connector != null && connector.getName() != null && connector.getName().matches(connectorName)) { 902 connector.stop(); 903 } 904 } 905 long start = System.currentTimeMillis(); 906 while (System.currentTimeMillis() - start < timeout * 1000) { 907 // check quesize until it gets zero 908 if (checkQueueSize(queueName)) { 909 stop(); 910 break; 911 } else { 912 Thread.sleep(pollInterval * 1000); 913 } 914 } 915 if (stopped.get()) { 916 LOG.info("Successfully stop the broker."); 917 } else { 918 LOG.info("There is still pending message on the queue. Please check and stop the broker manually."); 919 } 920 } 921 } 922 923 /** 924 * A helper method to block the caller thread until the broker has been 925 * stopped 926 */ 927 public void waitUntilStopped() { 928 while (isStarted() && !stopped.get()) { 929 try { 930 stoppedLatch.await(); 931 } catch (InterruptedException e) { 932 // ignore 933 } 934 } 935 } 936 937 public boolean isStopped() { 938 return stopped.get(); 939 } 940 941 /** 942 * A helper method to block the caller thread until the broker has fully started 943 * @return boolean true if wait succeeded false if broker was not started or was stopped 944 */ 945 public boolean waitUntilStarted() { 946 return waitUntilStarted(DEFAULT_START_TIMEOUT); 947 } 948 949 /** 950 * A helper method to block the caller thread until the broker has fully started 951 * 952 * @param timeout 953 * the amount of time to wait before giving up and returning false. 954 * 955 * @return boolean true if wait succeeded false if broker was not started or was stopped 956 */ 957 public boolean waitUntilStarted(long timeout) { 958 boolean waitSucceeded = isStarted(); 959 long expiration = Math.max(0, timeout + System.currentTimeMillis()); 960 while (!isStarted() && !stopped.get() && !waitSucceeded && expiration > System.currentTimeMillis()) { 961 try { 962 if (startException != null) { 963 return waitSucceeded; 964 } 965 waitSucceeded = startedLatch.await(100L, TimeUnit.MILLISECONDS); 966 } catch (InterruptedException ignore) { 967 } 968 } 969 return waitSucceeded; 970 } 971 972 // Properties 973 // ------------------------------------------------------------------------- 974 /** 975 * Returns the message broker 976 */ 977 public Broker getBroker() throws Exception { 978 if (broker == null) { 979 broker = createBroker(); 980 } 981 return broker; 982 } 983 984 /** 985 * Returns the administration view of the broker; used to create and destroy 986 * resources such as queues and topics. Note this method returns null if JMX 987 * is disabled. 988 */ 989 public BrokerView getAdminView() throws Exception { 990 if (adminView == null) { 991 // force lazy creation 992 getBroker(); 993 } 994 return adminView; 995 } 996 997 public void setAdminView(BrokerView adminView) { 998 this.adminView = adminView; 999 } 1000 1001 public String getBrokerName() { 1002 return brokerName; 1003 } 1004 1005 /** 1006 * Sets the name of this broker; which must be unique in the network 1007 * 1008 * @param brokerName 1009 */ 1010 public void setBrokerName(String brokerName) { 1011 if (brokerName == null) { 1012 throw new NullPointerException("The broker name cannot be null"); 1013 } 1014 String str = brokerName.replaceAll("[^a-zA-Z0-9\\.\\_\\-\\:]", "_"); 1015 if (!str.equals(brokerName)) { 1016 LOG.error("Broker Name: {} contained illegal characters - replaced with {}", brokerName, str); 1017 } 1018 this.brokerName = str.trim(); 1019 } 1020 1021 public PersistenceAdapterFactory getPersistenceFactory() { 1022 return persistenceFactory; 1023 } 1024 1025 public File getDataDirectoryFile() { 1026 if (dataDirectoryFile == null) { 1027 dataDirectoryFile = new File(IOHelper.getDefaultDataDirectory()); 1028 } 1029 return dataDirectoryFile; 1030 } 1031 1032 public File getBrokerDataDirectory() { 1033 String brokerDir = getBrokerName(); 1034 return new File(getDataDirectoryFile(), brokerDir); 1035 } 1036 1037 /** 1038 * Sets the directory in which the data files will be stored by default for 1039 * the JDBC and Journal persistence adaptors. 1040 * 1041 * @param dataDirectory 1042 * the directory to store data files 1043 */ 1044 public void setDataDirectory(String dataDirectory) { 1045 setDataDirectoryFile(new File(dataDirectory)); 1046 } 1047 1048 /** 1049 * Sets the directory in which the data files will be stored by default for 1050 * the JDBC and Journal persistence adaptors. 1051 * 1052 * @param dataDirectoryFile 1053 * the directory to store data files 1054 */ 1055 public void setDataDirectoryFile(File dataDirectoryFile) { 1056 this.dataDirectoryFile = dataDirectoryFile; 1057 } 1058 1059 /** 1060 * @return the tmpDataDirectory 1061 */ 1062 public File getTmpDataDirectory() { 1063 if (tmpDataDirectory == null) { 1064 tmpDataDirectory = new File(getBrokerDataDirectory(), "tmp_storage"); 1065 } 1066 return tmpDataDirectory; 1067 } 1068 1069 /** 1070 * @param tmpDataDirectory 1071 * the tmpDataDirectory to set 1072 */ 1073 public void setTmpDataDirectory(File tmpDataDirectory) { 1074 this.tmpDataDirectory = tmpDataDirectory; 1075 } 1076 1077 public void setPersistenceFactory(PersistenceAdapterFactory persistenceFactory) { 1078 this.persistenceFactory = persistenceFactory; 1079 } 1080 1081 public void setDestinationFactory(DestinationFactory destinationFactory) { 1082 this.destinationFactory = destinationFactory; 1083 } 1084 1085 public boolean isPersistent() { 1086 return persistent; 1087 } 1088 1089 /** 1090 * Sets whether or not persistence is enabled or disabled. 1091 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" 1092 */ 1093 public void setPersistent(boolean persistent) { 1094 this.persistent = persistent; 1095 } 1096 1097 public boolean isPopulateJMSXUserID() { 1098 return populateJMSXUserID; 1099 } 1100 1101 /** 1102 * Sets whether or not the broker should populate the JMSXUserID header. 1103 */ 1104 public void setPopulateJMSXUserID(boolean populateJMSXUserID) { 1105 this.populateJMSXUserID = populateJMSXUserID; 1106 } 1107 1108 public SystemUsage getSystemUsage() { 1109 try { 1110 if (systemUsage == null) { 1111 1112 systemUsage = new SystemUsage("Main", getPersistenceAdapter(), getTempDataStore(), getJobSchedulerStore()); 1113 systemUsage.setExecutor(getExecutor()); 1114 systemUsage.getMemoryUsage().setLimit(1024L * 1024 * 1024 * 1); // 1 GB 1115 systemUsage.getTempUsage().setLimit(1024L * 1024 * 1024 * 50); // 50 GB 1116 systemUsage.getStoreUsage().setLimit(1024L * 1024 * 1024 * 100); // 100 GB 1117 systemUsage.getJobSchedulerUsage().setLimit(1024L * 1024 * 1024 * 50); // 50 GB 1118 addService(this.systemUsage); 1119 } 1120 return systemUsage; 1121 } catch (IOException e) { 1122 LOG.error("Cannot create SystemUsage", e); 1123 throw new RuntimeException("Fatally failed to create SystemUsage" + e.getMessage(), e); 1124 } 1125 } 1126 1127 public void setSystemUsage(SystemUsage memoryManager) { 1128 if (this.systemUsage != null) { 1129 removeService(this.systemUsage); 1130 } 1131 this.systemUsage = memoryManager; 1132 if (this.systemUsage.getExecutor()==null) { 1133 this.systemUsage.setExecutor(getExecutor()); 1134 } 1135 addService(this.systemUsage); 1136 } 1137 1138 /** 1139 * @return the consumerUsageManager 1140 * @throws IOException 1141 */ 1142 public SystemUsage getConsumerSystemUsage() throws IOException { 1143 if (this.consumerSystemUsaage == null) { 1144 if (splitSystemUsageForProducersConsumers) { 1145 this.consumerSystemUsaage = new SystemUsage(getSystemUsage(), "Consumer"); 1146 float portion = consumerSystemUsagePortion / 100f; 1147 this.consumerSystemUsaage.getMemoryUsage().setUsagePortion(portion); 1148 addService(this.consumerSystemUsaage); 1149 } else { 1150 consumerSystemUsaage = getSystemUsage(); 1151 } 1152 } 1153 return this.consumerSystemUsaage; 1154 } 1155 1156 /** 1157 * @param consumerSystemUsaage 1158 * the storeSystemUsage to set 1159 */ 1160 public void setConsumerSystemUsage(SystemUsage consumerSystemUsaage) { 1161 if (this.consumerSystemUsaage != null) { 1162 removeService(this.consumerSystemUsaage); 1163 } 1164 this.consumerSystemUsaage = consumerSystemUsaage; 1165 addService(this.consumerSystemUsaage); 1166 } 1167 1168 /** 1169 * @return the producerUsageManager 1170 * @throws IOException 1171 */ 1172 public SystemUsage getProducerSystemUsage() throws IOException { 1173 if (producerSystemUsage == null) { 1174 if (splitSystemUsageForProducersConsumers) { 1175 producerSystemUsage = new SystemUsage(getSystemUsage(), "Producer"); 1176 float portion = producerSystemUsagePortion / 100f; 1177 producerSystemUsage.getMemoryUsage().setUsagePortion(portion); 1178 addService(producerSystemUsage); 1179 } else { 1180 producerSystemUsage = getSystemUsage(); 1181 } 1182 } 1183 return producerSystemUsage; 1184 } 1185 1186 /** 1187 * @param producerUsageManager 1188 * the producerUsageManager to set 1189 */ 1190 public void setProducerSystemUsage(SystemUsage producerUsageManager) { 1191 if (this.producerSystemUsage != null) { 1192 removeService(this.producerSystemUsage); 1193 } 1194 this.producerSystemUsage = producerUsageManager; 1195 addService(this.producerSystemUsage); 1196 } 1197 1198 public PersistenceAdapter getPersistenceAdapter() throws IOException { 1199 if (persistenceAdapter == null) { 1200 persistenceAdapter = createPersistenceAdapter(); 1201 configureService(persistenceAdapter); 1202 this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter); 1203 } 1204 return persistenceAdapter; 1205 } 1206 1207 /** 1208 * Sets the persistence adaptor implementation to use for this broker 1209 * 1210 * @throws IOException 1211 */ 1212 public void setPersistenceAdapter(PersistenceAdapter persistenceAdapter) throws IOException { 1213 if (!isPersistent() && ! (persistenceAdapter instanceof MemoryPersistenceAdapter)) { 1214 LOG.warn("persistent=\"false\", ignoring configured persistenceAdapter: {}", persistenceAdapter); 1215 return; 1216 } 1217 this.persistenceAdapter = persistenceAdapter; 1218 configureService(this.persistenceAdapter); 1219 this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter); 1220 } 1221 1222 public TaskRunnerFactory getTaskRunnerFactory() { 1223 if (this.taskRunnerFactory == null) { 1224 this.taskRunnerFactory = new TaskRunnerFactory("ActiveMQ BrokerService["+getBrokerName()+"] Task", getTaskRunnerPriority(), true, 1000, 1225 isDedicatedTaskRunner()); 1226 this.taskRunnerFactory.setThreadClassLoader(this.getClass().getClassLoader()); 1227 } 1228 return this.taskRunnerFactory; 1229 } 1230 1231 public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) { 1232 this.taskRunnerFactory = taskRunnerFactory; 1233 } 1234 1235 public TaskRunnerFactory getPersistenceTaskRunnerFactory() { 1236 if (taskRunnerFactory == null) { 1237 persistenceTaskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor Task", persistenceThreadPriority, 1238 true, 1000, isDedicatedTaskRunner()); 1239 } 1240 return persistenceTaskRunnerFactory; 1241 } 1242 1243 public void setPersistenceTaskRunnerFactory(TaskRunnerFactory persistenceTaskRunnerFactory) { 1244 this.persistenceTaskRunnerFactory = persistenceTaskRunnerFactory; 1245 } 1246 1247 public boolean isUseJmx() { 1248 return useJmx; 1249 } 1250 1251 public boolean isEnableStatistics() { 1252 return enableStatistics; 1253 } 1254 1255 /** 1256 * Sets whether or not the Broker's services enable statistics or not. 1257 */ 1258 public void setEnableStatistics(boolean enableStatistics) { 1259 this.enableStatistics = enableStatistics; 1260 } 1261 1262 /** 1263 * Sets whether or not the Broker's services should be exposed into JMX or 1264 * not. 1265 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" 1266 */ 1267 public void setUseJmx(boolean useJmx) { 1268 this.useJmx = useJmx; 1269 } 1270 1271 public ObjectName getBrokerObjectName() throws MalformedObjectNameException { 1272 if (brokerObjectName == null) { 1273 brokerObjectName = createBrokerObjectName(); 1274 } 1275 return brokerObjectName; 1276 } 1277 1278 /** 1279 * Sets the JMX ObjectName for this broker 1280 */ 1281 public void setBrokerObjectName(ObjectName brokerObjectName) { 1282 this.brokerObjectName = brokerObjectName; 1283 } 1284 1285 public ManagementContext getManagementContext() { 1286 if (managementContext == null) { 1287 managementContext = new ManagementContext(); 1288 } 1289 return managementContext; 1290 } 1291 1292 public void setManagementContext(ManagementContext managementContext) { 1293 this.managementContext = managementContext; 1294 } 1295 1296 public NetworkConnector getNetworkConnectorByName(String connectorName) { 1297 for (NetworkConnector connector : networkConnectors) { 1298 if (connector.getName().equals(connectorName)) { 1299 return connector; 1300 } 1301 } 1302 return null; 1303 } 1304 1305 public String[] getNetworkConnectorURIs() { 1306 return networkConnectorURIs; 1307 } 1308 1309 public void setNetworkConnectorURIs(String[] networkConnectorURIs) { 1310 this.networkConnectorURIs = networkConnectorURIs; 1311 } 1312 1313 public TransportConnector getConnectorByName(String connectorName) { 1314 for (TransportConnector connector : transportConnectors) { 1315 if (connector.getName().equals(connectorName)) { 1316 return connector; 1317 } 1318 } 1319 return null; 1320 } 1321 1322 public Map<String, String> getTransportConnectorURIsAsMap() { 1323 Map<String, String> answer = new HashMap<String, String>(); 1324 for (TransportConnector connector : transportConnectors) { 1325 try { 1326 URI uri = connector.getConnectUri(); 1327 if (uri != null) { 1328 String scheme = uri.getScheme(); 1329 if (scheme != null) { 1330 answer.put(scheme.toLowerCase(Locale.ENGLISH), uri.toString()); 1331 } 1332 } 1333 } catch (Exception e) { 1334 LOG.debug("Failed to read URI to build transportURIsAsMap", e); 1335 } 1336 } 1337 return answer; 1338 } 1339 1340 public ProducerBrokerExchange getProducerBrokerExchange(ProducerInfo producerInfo){ 1341 ProducerBrokerExchange result = null; 1342 1343 for (TransportConnector connector : transportConnectors) { 1344 for (TransportConnection tc: connector.getConnections()){ 1345 result = tc.getProducerBrokerExchangeIfExists(producerInfo); 1346 if (result !=null){ 1347 return result; 1348 } 1349 } 1350 } 1351 return result; 1352 } 1353 1354 public String[] getTransportConnectorURIs() { 1355 return transportConnectorURIs; 1356 } 1357 1358 public void setTransportConnectorURIs(String[] transportConnectorURIs) { 1359 this.transportConnectorURIs = transportConnectorURIs; 1360 } 1361 1362 /** 1363 * @return Returns the jmsBridgeConnectors. 1364 */ 1365 public JmsConnector[] getJmsBridgeConnectors() { 1366 return jmsBridgeConnectors; 1367 } 1368 1369 /** 1370 * @param jmsConnectors 1371 * The jmsBridgeConnectors to set. 1372 */ 1373 public void setJmsBridgeConnectors(JmsConnector[] jmsConnectors) { 1374 this.jmsBridgeConnectors = jmsConnectors; 1375 } 1376 1377 public Service[] getServices() { 1378 return services.toArray(new Service[0]); 1379 } 1380 1381 /** 1382 * Sets the services associated with this broker. 1383 */ 1384 public void setServices(Service[] services) { 1385 this.services.clear(); 1386 if (services != null) { 1387 for (int i = 0; i < services.length; i++) { 1388 this.services.add(services[i]); 1389 } 1390 } 1391 } 1392 1393 /** 1394 * Adds a new service so that it will be started as part of the broker 1395 * lifecycle 1396 */ 1397 public void addService(Service service) { 1398 services.add(service); 1399 } 1400 1401 public void removeService(Service service) { 1402 services.remove(service); 1403 } 1404 1405 public boolean isUseLoggingForShutdownErrors() { 1406 return useLoggingForShutdownErrors; 1407 } 1408 1409 /** 1410 * Sets whether or not we should use commons-logging when reporting errors 1411 * when shutting down the broker 1412 */ 1413 public void setUseLoggingForShutdownErrors(boolean useLoggingForShutdownErrors) { 1414 this.useLoggingForShutdownErrors = useLoggingForShutdownErrors; 1415 } 1416 1417 public boolean isUseShutdownHook() { 1418 return useShutdownHook; 1419 } 1420 1421 /** 1422 * Sets whether or not we should use a shutdown handler to close down the 1423 * broker cleanly if the JVM is terminated. It is recommended you leave this 1424 * enabled. 1425 */ 1426 public void setUseShutdownHook(boolean useShutdownHook) { 1427 this.useShutdownHook = useShutdownHook; 1428 } 1429 1430 public boolean isAdvisorySupport() { 1431 return advisorySupport; 1432 } 1433 1434 /** 1435 * Allows the support of advisory messages to be disabled for performance 1436 * reasons. 1437 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" 1438 */ 1439 public void setAdvisorySupport(boolean advisorySupport) { 1440 this.advisorySupport = advisorySupport; 1441 } 1442 1443 public List<TransportConnector> getTransportConnectors() { 1444 return new ArrayList<TransportConnector>(transportConnectors); 1445 } 1446 1447 /** 1448 * Sets the transport connectors which this broker will listen on for new 1449 * clients 1450 * 1451 * @org.apache.xbean.Property 1452 * nestedType="org.apache.activemq.broker.TransportConnector" 1453 */ 1454 public void setTransportConnectors(List<TransportConnector> transportConnectors) throws Exception { 1455 for (TransportConnector connector : transportConnectors) { 1456 addConnector(connector); 1457 } 1458 } 1459 1460 public TransportConnector getTransportConnectorByName(String name){ 1461 for (TransportConnector transportConnector : transportConnectors){ 1462 if (name.equals(transportConnector.getName())){ 1463 return transportConnector; 1464 } 1465 } 1466 return null; 1467 } 1468 1469 public TransportConnector getTransportConnectorByScheme(String scheme){ 1470 for (TransportConnector transportConnector : transportConnectors){ 1471 if (scheme.equals(transportConnector.getUri().getScheme())){ 1472 return transportConnector; 1473 } 1474 } 1475 return null; 1476 } 1477 1478 public List<NetworkConnector> getNetworkConnectors() { 1479 return new ArrayList<NetworkConnector>(networkConnectors); 1480 } 1481 1482 public List<ProxyConnector> getProxyConnectors() { 1483 return new ArrayList<ProxyConnector>(proxyConnectors); 1484 } 1485 1486 /** 1487 * Sets the network connectors which this broker will use to connect to 1488 * other brokers in a federated network 1489 * 1490 * @org.apache.xbean.Property 1491 * nestedType="org.apache.activemq.network.NetworkConnector" 1492 */ 1493 public void setNetworkConnectors(List<?> networkConnectors) throws Exception { 1494 for (Object connector : networkConnectors) { 1495 addNetworkConnector((NetworkConnector) connector); 1496 } 1497 } 1498 1499 /** 1500 * Sets the network connectors which this broker will use to connect to 1501 * other brokers in a federated network 1502 */ 1503 public void setProxyConnectors(List<?> proxyConnectors) throws Exception { 1504 for (Object connector : proxyConnectors) { 1505 addProxyConnector((ProxyConnector) connector); 1506 } 1507 } 1508 1509 public PolicyMap getDestinationPolicy() { 1510 return destinationPolicy; 1511 } 1512 1513 /** 1514 * Sets the destination specific policies available either for exact 1515 * destinations or for wildcard areas of destinations. 1516 */ 1517 public void setDestinationPolicy(PolicyMap policyMap) { 1518 this.destinationPolicy = policyMap; 1519 } 1520 1521 public BrokerPlugin[] getPlugins() { 1522 return plugins; 1523 } 1524 1525 /** 1526 * Sets a number of broker plugins to install such as for security 1527 * authentication or authorization 1528 */ 1529 public void setPlugins(BrokerPlugin[] plugins) { 1530 this.plugins = plugins; 1531 } 1532 1533 public MessageAuthorizationPolicy getMessageAuthorizationPolicy() { 1534 return messageAuthorizationPolicy; 1535 } 1536 1537 /** 1538 * Sets the policy used to decide if the current connection is authorized to 1539 * consume a given message 1540 */ 1541 public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) { 1542 this.messageAuthorizationPolicy = messageAuthorizationPolicy; 1543 } 1544 1545 /** 1546 * Delete all messages from the persistent store 1547 * 1548 * @throws IOException 1549 */ 1550 public void deleteAllMessages() throws IOException { 1551 getPersistenceAdapter().deleteAllMessages(); 1552 } 1553 1554 public boolean isDeleteAllMessagesOnStartup() { 1555 return deleteAllMessagesOnStartup; 1556 } 1557 1558 /** 1559 * Sets whether or not all messages are deleted on startup - mostly only 1560 * useful for testing. 1561 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" 1562 */ 1563 public void setDeleteAllMessagesOnStartup(boolean deletePersistentMessagesOnStartup) { 1564 this.deleteAllMessagesOnStartup = deletePersistentMessagesOnStartup; 1565 } 1566 1567 public URI getVmConnectorURI() { 1568 if (vmConnectorURI == null) { 1569 try { 1570 vmConnectorURI = new URI("vm://" + getBrokerName().replaceAll("[^a-zA-Z0-9\\.\\_\\-]", "_")); 1571 } catch (URISyntaxException e) { 1572 LOG.error("Badly formed URI from {}", getBrokerName(), e); 1573 } 1574 } 1575 return vmConnectorURI; 1576 } 1577 1578 public void setVmConnectorURI(URI vmConnectorURI) { 1579 this.vmConnectorURI = vmConnectorURI; 1580 } 1581 1582 public String getDefaultSocketURIString() { 1583 if (started.get()) { 1584 if (this.defaultSocketURIString == null) { 1585 for (TransportConnector tc:this.transportConnectors) { 1586 String result = null; 1587 try { 1588 result = tc.getPublishableConnectString(); 1589 } catch (Exception e) { 1590 LOG.warn("Failed to get the ConnectURI for {}", tc, e); 1591 } 1592 if (result != null) { 1593 // find first publishable uri 1594 if (tc.isUpdateClusterClients() || tc.isRebalanceClusterClients()) { 1595 this.defaultSocketURIString = result; 1596 break; 1597 } else { 1598 // or use the first defined 1599 if (this.defaultSocketURIString == null) { 1600 this.defaultSocketURIString = result; 1601 } 1602 } 1603 } 1604 } 1605 1606 } 1607 return this.defaultSocketURIString; 1608 } 1609 return null; 1610 } 1611 1612 /** 1613 * @return Returns the shutdownOnMasterFailure. 1614 */ 1615 public boolean isShutdownOnMasterFailure() { 1616 return shutdownOnMasterFailure; 1617 } 1618 1619 /** 1620 * @param shutdownOnMasterFailure 1621 * The shutdownOnMasterFailure to set. 1622 */ 1623 public void setShutdownOnMasterFailure(boolean shutdownOnMasterFailure) { 1624 this.shutdownOnMasterFailure = shutdownOnMasterFailure; 1625 } 1626 1627 public boolean isKeepDurableSubsActive() { 1628 return keepDurableSubsActive; 1629 } 1630 1631 public void setKeepDurableSubsActive(boolean keepDurableSubsActive) { 1632 this.keepDurableSubsActive = keepDurableSubsActive; 1633 } 1634 1635 public boolean isUseVirtualTopics() { 1636 return useVirtualTopics; 1637 } 1638 1639 /** 1640 * Sets whether or not <a 1641 * href="http://activemq.apache.org/virtual-destinations.html">Virtual 1642 * Topics</a> should be supported by default if they have not been 1643 * explicitly configured. 1644 */ 1645 public void setUseVirtualTopics(boolean useVirtualTopics) { 1646 this.useVirtualTopics = useVirtualTopics; 1647 } 1648 1649 public DestinationInterceptor[] getDestinationInterceptors() { 1650 return destinationInterceptors; 1651 } 1652 1653 public boolean isUseMirroredQueues() { 1654 return useMirroredQueues; 1655 } 1656 1657 /** 1658 * Sets whether or not <a 1659 * href="http://activemq.apache.org/mirrored-queues.html">Mirrored 1660 * Queues</a> should be supported by default if they have not been 1661 * explicitly configured. 1662 */ 1663 public void setUseMirroredQueues(boolean useMirroredQueues) { 1664 this.useMirroredQueues = useMirroredQueues; 1665 } 1666 1667 /** 1668 * Sets the destination interceptors to use 1669 */ 1670 public void setDestinationInterceptors(DestinationInterceptor[] destinationInterceptors) { 1671 this.destinationInterceptors = destinationInterceptors; 1672 } 1673 1674 public ActiveMQDestination[] getDestinations() { 1675 return destinations; 1676 } 1677 1678 /** 1679 * Sets the destinations which should be loaded/created on startup 1680 */ 1681 public void setDestinations(ActiveMQDestination[] destinations) { 1682 this.destinations = destinations; 1683 } 1684 1685 /** 1686 * @return the tempDataStore 1687 */ 1688 public synchronized PListStore getTempDataStore() { 1689 if (tempDataStore == null) { 1690 if (!isPersistent()) { 1691 return null; 1692 } 1693 1694 try { 1695 PersistenceAdapter pa = getPersistenceAdapter(); 1696 if( pa!=null && pa instanceof PListStore) { 1697 return (PListStore) pa; 1698 } 1699 } catch (IOException e) { 1700 throw new RuntimeException(e); 1701 } 1702 1703 boolean result = true; 1704 boolean empty = true; 1705 try { 1706 File directory = getTmpDataDirectory(); 1707 if (directory.exists() && directory.isDirectory()) { 1708 File[] files = directory.listFiles(); 1709 if (files != null && files.length > 0) { 1710 empty = false; 1711 for (int i = 0; i < files.length; i++) { 1712 File file = files[i]; 1713 if (!file.isDirectory()) { 1714 result &= file.delete(); 1715 } 1716 } 1717 } 1718 } 1719 if (!empty) { 1720 String str = result ? "Successfully deleted" : "Failed to delete"; 1721 LOG.info("{} temporary storage", str); 1722 } 1723 1724 String clazz = "org.apache.activemq.store.kahadb.plist.PListStoreImpl"; 1725 this.tempDataStore = (PListStore) getClass().getClassLoader().loadClass(clazz).newInstance(); 1726 this.tempDataStore.setDirectory(getTmpDataDirectory()); 1727 configureService(tempDataStore); 1728 this.tempDataStore.start(); 1729 } catch (Exception e) { 1730 throw new RuntimeException(e); 1731 } 1732 } 1733 return tempDataStore; 1734 } 1735 1736 /** 1737 * @param tempDataStore 1738 * the tempDataStore to set 1739 */ 1740 public void setTempDataStore(PListStore tempDataStore) { 1741 this.tempDataStore = tempDataStore; 1742 configureService(tempDataStore); 1743 try { 1744 tempDataStore.start(); 1745 } catch (Exception e) { 1746 RuntimeException exception = new RuntimeException("Failed to start provided temp data store: " + tempDataStore, e); 1747 LOG.error(exception.getLocalizedMessage(), e); 1748 throw exception; 1749 } 1750 } 1751 1752 public int getPersistenceThreadPriority() { 1753 return persistenceThreadPriority; 1754 } 1755 1756 public void setPersistenceThreadPriority(int persistenceThreadPriority) { 1757 this.persistenceThreadPriority = persistenceThreadPriority; 1758 } 1759 1760 /** 1761 * @return the useLocalHostBrokerName 1762 */ 1763 public boolean isUseLocalHostBrokerName() { 1764 return this.useLocalHostBrokerName; 1765 } 1766 1767 /** 1768 * @param useLocalHostBrokerName 1769 * the useLocalHostBrokerName to set 1770 */ 1771 public void setUseLocalHostBrokerName(boolean useLocalHostBrokerName) { 1772 this.useLocalHostBrokerName = useLocalHostBrokerName; 1773 if (useLocalHostBrokerName && !started.get() && brokerName == null || brokerName == DEFAULT_BROKER_NAME) { 1774 brokerName = LOCAL_HOST_NAME; 1775 } 1776 } 1777 1778 /** 1779 * Looks up and lazily creates if necessary the destination for the given 1780 * JMS name 1781 */ 1782 public Destination getDestination(ActiveMQDestination destination) throws Exception { 1783 return getBroker().addDestination(getAdminConnectionContext(), destination,false); 1784 } 1785 1786 public void removeDestination(ActiveMQDestination destination) throws Exception { 1787 getBroker().removeDestination(getAdminConnectionContext(), destination, 0); 1788 } 1789 1790 public int getProducerSystemUsagePortion() { 1791 return producerSystemUsagePortion; 1792 } 1793 1794 public void setProducerSystemUsagePortion(int producerSystemUsagePortion) { 1795 this.producerSystemUsagePortion = producerSystemUsagePortion; 1796 } 1797 1798 public int getConsumerSystemUsagePortion() { 1799 return consumerSystemUsagePortion; 1800 } 1801 1802 public void setConsumerSystemUsagePortion(int consumerSystemUsagePortion) { 1803 this.consumerSystemUsagePortion = consumerSystemUsagePortion; 1804 } 1805 1806 public boolean isSplitSystemUsageForProducersConsumers() { 1807 return splitSystemUsageForProducersConsumers; 1808 } 1809 1810 public void setSplitSystemUsageForProducersConsumers(boolean splitSystemUsageForProducersConsumers) { 1811 this.splitSystemUsageForProducersConsumers = splitSystemUsageForProducersConsumers; 1812 } 1813 1814 public boolean isMonitorConnectionSplits() { 1815 return monitorConnectionSplits; 1816 } 1817 1818 public void setMonitorConnectionSplits(boolean monitorConnectionSplits) { 1819 this.monitorConnectionSplits = monitorConnectionSplits; 1820 } 1821 1822 public int getTaskRunnerPriority() { 1823 return taskRunnerPriority; 1824 } 1825 1826 public void setTaskRunnerPriority(int taskRunnerPriority) { 1827 this.taskRunnerPriority = taskRunnerPriority; 1828 } 1829 1830 public boolean isDedicatedTaskRunner() { 1831 return dedicatedTaskRunner; 1832 } 1833 1834 public void setDedicatedTaskRunner(boolean dedicatedTaskRunner) { 1835 this.dedicatedTaskRunner = dedicatedTaskRunner; 1836 } 1837 1838 public boolean isCacheTempDestinations() { 1839 return cacheTempDestinations; 1840 } 1841 1842 public void setCacheTempDestinations(boolean cacheTempDestinations) { 1843 this.cacheTempDestinations = cacheTempDestinations; 1844 } 1845 1846 public int getTimeBeforePurgeTempDestinations() { 1847 return timeBeforePurgeTempDestinations; 1848 } 1849 1850 public void setTimeBeforePurgeTempDestinations(int timeBeforePurgeTempDestinations) { 1851 this.timeBeforePurgeTempDestinations = timeBeforePurgeTempDestinations; 1852 } 1853 1854 public boolean isUseTempMirroredQueues() { 1855 return useTempMirroredQueues; 1856 } 1857 1858 public void setUseTempMirroredQueues(boolean useTempMirroredQueues) { 1859 this.useTempMirroredQueues = useTempMirroredQueues; 1860 } 1861 1862 public synchronized JobSchedulerStore getJobSchedulerStore() { 1863 1864 // If support is off don't allow any scheduler even is user configured their own. 1865 if (!isSchedulerSupport()) { 1866 return null; 1867 } 1868 1869 // If the user configured their own we use it even if persistence is disabled since 1870 // we don't know anything about their implementation. 1871 if (jobSchedulerStore == null) { 1872 1873 if (!isPersistent()) { 1874 this.jobSchedulerStore = new InMemoryJobSchedulerStore(); 1875 configureService(jobSchedulerStore); 1876 return this.jobSchedulerStore; 1877 } 1878 1879 try { 1880 PersistenceAdapter pa = getPersistenceAdapter(); 1881 if (pa != null) { 1882 this.jobSchedulerStore = pa.createJobSchedulerStore(); 1883 jobSchedulerStore.setDirectory(getSchedulerDirectoryFile()); 1884 configureService(jobSchedulerStore); 1885 return this.jobSchedulerStore; 1886 } 1887 } catch (IOException e) { 1888 throw new RuntimeException(e); 1889 } catch (UnsupportedOperationException ex) { 1890 // It's ok if the store doesn't implement a scheduler. 1891 } catch (Exception e) { 1892 throw new RuntimeException(e); 1893 } 1894 1895 try { 1896 PersistenceAdapter pa = getPersistenceAdapter(); 1897 if (pa != null && pa instanceof JobSchedulerStore) { 1898 this.jobSchedulerStore = (JobSchedulerStore) pa; 1899 configureService(jobSchedulerStore); 1900 return this.jobSchedulerStore; 1901 } 1902 } catch (IOException e) { 1903 throw new RuntimeException(e); 1904 } 1905 1906 // Load the KahaDB store as a last resort, this only works if KahaDB is 1907 // included at runtime, otherwise this will fail. User should disable 1908 // scheduler support if this fails. 1909 try { 1910 String clazz = "org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter"; 1911 PersistenceAdapter adaptor = (PersistenceAdapter)getClass().getClassLoader().loadClass(clazz).newInstance(); 1912 jobSchedulerStore = adaptor.createJobSchedulerStore(); 1913 jobSchedulerStore.setDirectory(getSchedulerDirectoryFile()); 1914 configureService(jobSchedulerStore); 1915 LOG.info("JobScheduler using directory: {}", getSchedulerDirectoryFile()); 1916 } catch (Exception e) { 1917 throw new RuntimeException(e); 1918 } 1919 } 1920 return jobSchedulerStore; 1921 } 1922 1923 public void setJobSchedulerStore(JobSchedulerStore jobSchedulerStore) { 1924 this.jobSchedulerStore = jobSchedulerStore; 1925 configureService(jobSchedulerStore); 1926 } 1927 1928 // 1929 // Implementation methods 1930 // ------------------------------------------------------------------------- 1931 /** 1932 * Handles any lazy-creation helper properties which are added to make 1933 * things easier to configure inside environments such as Spring 1934 * 1935 * @throws Exception 1936 */ 1937 protected void processHelperProperties() throws Exception { 1938 if (transportConnectorURIs != null) { 1939 for (int i = 0; i < transportConnectorURIs.length; i++) { 1940 String uri = transportConnectorURIs[i]; 1941 addConnector(uri); 1942 } 1943 } 1944 if (networkConnectorURIs != null) { 1945 for (int i = 0; i < networkConnectorURIs.length; i++) { 1946 String uri = networkConnectorURIs[i]; 1947 addNetworkConnector(uri); 1948 } 1949 } 1950 if (jmsBridgeConnectors != null) { 1951 for (int i = 0; i < jmsBridgeConnectors.length; i++) { 1952 addJmsConnector(jmsBridgeConnectors[i]); 1953 } 1954 } 1955 } 1956 1957 protected void checkSystemUsageLimits() throws IOException { 1958 SystemUsage usage = getSystemUsage(); 1959 long memLimit = usage.getMemoryUsage().getLimit(); 1960 long jvmLimit = Runtime.getRuntime().maxMemory(); 1961 1962 if (memLimit > jvmLimit) { 1963 usage.getMemoryUsage().setPercentOfJvmHeap(70); 1964 LOG.warn("Memory Usage for the Broker (" + memLimit / (1024 * 1024) + 1965 " mb) is more than the maximum available for the JVM: " + 1966 jvmLimit / (1024 * 1024) + " mb - resetting to 70% of maximum available: " + (usage.getMemoryUsage().getLimit() / (1024 * 1024)) + " mb"); 1967 } 1968 1969 if (getPersistenceAdapter() != null) { 1970 PersistenceAdapter adapter = getPersistenceAdapter(); 1971 File dir = adapter.getDirectory(); 1972 1973 if (dir != null) { 1974 String dirPath = dir.getAbsolutePath(); 1975 if (!dir.isAbsolute()) { 1976 dir = new File(dirPath); 1977 } 1978 1979 while (dir != null && !dir.isDirectory()) { 1980 dir = dir.getParentFile(); 1981 } 1982 long storeLimit = usage.getStoreUsage().getLimit(); 1983 long storeCurrent = usage.getStoreUsage().getUsage(); 1984 long dirFreeSpace = dir.getUsableSpace(); 1985 if (storeLimit > (dirFreeSpace + storeCurrent)) { 1986 LOG.warn("Store limit is " + storeLimit / (1024 * 1024) + 1987 " mb (current store usage is " + storeCurrent / (1024 * 1024) + 1988 " mb). The data directory: " + dir.getAbsolutePath() + 1989 " only has " + dirFreeSpace / (1024 * 1024) + 1990 " mb of usable space - resetting to maximum available disk space: " + 1991 (dirFreeSpace + storeCurrent) / (1024 * 1024) + " mb"); 1992 usage.getStoreUsage().setLimit(dirFreeSpace + storeCurrent); 1993 } 1994 } 1995 1996 long maxJournalFileSize = 0; 1997 long storeLimit = usage.getStoreUsage().getLimit(); 1998 1999 if (adapter instanceof JournaledStore) { 2000 maxJournalFileSize = ((JournaledStore) adapter).getJournalMaxFileLength(); 2001 } 2002 2003 if (storeLimit < maxJournalFileSize) { 2004 LOG.error("Store limit is " + storeLimit / (1024 * 1024) + 2005 " mb, whilst the max journal file size for the store is: " + 2006 maxJournalFileSize / (1024 * 1024) + " mb, " + 2007 "the store will not accept any data when used."); 2008 2009 } 2010 } 2011 2012 File tmpDir = getTmpDataDirectory(); 2013 if (tmpDir != null) { 2014 2015 String tmpDirPath = tmpDir.getAbsolutePath(); 2016 if (!tmpDir.isAbsolute()) { 2017 tmpDir = new File(tmpDirPath); 2018 } 2019 2020 long storeLimit = usage.getTempUsage().getLimit(); 2021 while (tmpDir != null && !tmpDir.isDirectory()) { 2022 tmpDir = tmpDir.getParentFile(); 2023 } 2024 long dirFreeSpace = tmpDir.getUsableSpace(); 2025 if (storeLimit > dirFreeSpace) { 2026 LOG.warn("Temporary Store limit is " + storeLimit / (1024 * 1024) + 2027 " mb, whilst the temporary data directory: " + tmpDirPath + 2028 " only has " + dirFreeSpace / (1024 * 1024) + " mb of usable space - resetting to maximum available " + 2029 dirFreeSpace / (1024 * 1024) + " mb."); 2030 usage.getTempUsage().setLimit(dirFreeSpace); 2031 } 2032 2033 if (isPersistent()) { 2034 long maxJournalFileSize; 2035 2036 PListStore store = usage.getTempUsage().getStore(); 2037 if (store != null && store instanceof JournaledStore) { 2038 maxJournalFileSize = ((JournaledStore) store).getJournalMaxFileLength(); 2039 } else { 2040 maxJournalFileSize = DEFAULT_MAX_FILE_LENGTH; 2041 } 2042 2043 if (storeLimit < maxJournalFileSize) { 2044 LOG.error("Temporary Store limit is " + storeLimit / (1024 * 1024) + 2045 " mb, whilst the max journal file size for the temporary store is: " + 2046 maxJournalFileSize / (1024 * 1024) + " mb, " + 2047 "the temp store will not accept any data when used."); 2048 } 2049 } 2050 } 2051 2052 if (getJobSchedulerStore() != null) { 2053 JobSchedulerStore scheduler = getJobSchedulerStore(); 2054 File schedulerDir = scheduler.getDirectory(); 2055 if (schedulerDir != null) { 2056 2057 String schedulerDirPath = schedulerDir.getAbsolutePath(); 2058 if (!schedulerDir.isAbsolute()) { 2059 schedulerDir = new File(schedulerDirPath); 2060 } 2061 2062 while (schedulerDir != null && !schedulerDir.isDirectory()) { 2063 schedulerDir = schedulerDir.getParentFile(); 2064 } 2065 long schedulerLimit = usage.getJobSchedulerUsage().getLimit(); 2066 long dirFreeSpace = schedulerDir.getUsableSpace(); 2067 if (schedulerLimit > dirFreeSpace) { 2068 LOG.warn("Job Scheduler Store limit is " + schedulerLimit / (1024 * 1024) + 2069 " mb, whilst the data directory: " + schedulerDir.getAbsolutePath() + 2070 " only has " + dirFreeSpace / (1024 * 1024) + " mb of usable space - resetting to " + 2071 dirFreeSpace / (1024 * 1024) + " mb."); 2072 usage.getJobSchedulerUsage().setLimit(dirFreeSpace); 2073 } 2074 } 2075 } 2076 } 2077 2078 public void stopAllConnectors(ServiceStopper stopper) { 2079 for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) { 2080 NetworkConnector connector = iter.next(); 2081 unregisterNetworkConnectorMBean(connector); 2082 stopper.stop(connector); 2083 } 2084 for (Iterator<ProxyConnector> iter = getProxyConnectors().iterator(); iter.hasNext();) { 2085 ProxyConnector connector = iter.next(); 2086 stopper.stop(connector); 2087 } 2088 for (Iterator<JmsConnector> iter = jmsConnectors.iterator(); iter.hasNext();) { 2089 JmsConnector connector = iter.next(); 2090 stopper.stop(connector); 2091 } 2092 for (Iterator<TransportConnector> iter = getTransportConnectors().iterator(); iter.hasNext();) { 2093 TransportConnector connector = iter.next(); 2094 try { 2095 unregisterConnectorMBean(connector); 2096 } catch (IOException e) { 2097 } 2098 stopper.stop(connector); 2099 } 2100 } 2101 2102 protected TransportConnector registerConnectorMBean(TransportConnector connector) throws IOException { 2103 try { 2104 ObjectName objectName = createConnectorObjectName(connector); 2105 connector = connector.asManagedConnector(getManagementContext(), objectName); 2106 ConnectorViewMBean view = new ConnectorView(connector); 2107 AnnotatedMBean.registerMBean(getManagementContext(), view, objectName); 2108 return connector; 2109 } catch (Throwable e) { 2110 throw IOExceptionSupport.create("Transport Connector could not be registered in JMX: " + e, e); 2111 } 2112 } 2113 2114 protected void unregisterConnectorMBean(TransportConnector connector) throws IOException { 2115 if (isUseJmx()) { 2116 try { 2117 ObjectName objectName = createConnectorObjectName(connector); 2118 getManagementContext().unregisterMBean(objectName); 2119 } catch (Throwable e) { 2120 throw IOExceptionSupport.create( 2121 "Transport Connector could not be unregistered in JMX: " + e.getMessage(), e); 2122 } 2123 } 2124 } 2125 2126 protected PersistenceAdapter registerPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException { 2127 return adaptor; 2128 } 2129 2130 protected void unregisterPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException { 2131 if (isUseJmx()) {} 2132 } 2133 2134 private ObjectName createConnectorObjectName(TransportConnector connector) throws MalformedObjectNameException { 2135 return BrokerMBeanSupport.createConnectorName(getBrokerObjectName(), "clientConnectors", connector.getName()); 2136 } 2137 2138 public void registerNetworkConnectorMBean(NetworkConnector connector) throws IOException { 2139 NetworkConnectorViewMBean view = new NetworkConnectorView(connector); 2140 try { 2141 ObjectName objectName = createNetworkConnectorObjectName(connector); 2142 connector.setObjectName(objectName); 2143 AnnotatedMBean.registerMBean(getManagementContext(), view, objectName); 2144 } catch (Throwable e) { 2145 throw IOExceptionSupport.create("Network Connector could not be registered in JMX: " + e.getMessage(), e); 2146 } 2147 } 2148 2149 protected ObjectName createNetworkConnectorObjectName(NetworkConnector connector) throws MalformedObjectNameException { 2150 return BrokerMBeanSupport.createNetworkConnectorName(getBrokerObjectName(), "networkConnectors", connector.getName()); 2151 } 2152 2153 public ObjectName createDuplexNetworkConnectorObjectName(String transport) throws MalformedObjectNameException { 2154 return BrokerMBeanSupport.createNetworkConnectorName(getBrokerObjectName(), "duplexNetworkConnectors", transport); 2155 } 2156 2157 protected void unregisterNetworkConnectorMBean(NetworkConnector connector) { 2158 if (isUseJmx()) { 2159 try { 2160 ObjectName objectName = createNetworkConnectorObjectName(connector); 2161 getManagementContext().unregisterMBean(objectName); 2162 } catch (Exception e) { 2163 LOG.warn("Network Connector could not be unregistered from JMX due " + e.getMessage() + ". This exception is ignored.", e); 2164 } 2165 } 2166 } 2167 2168 protected void registerProxyConnectorMBean(ProxyConnector connector) throws IOException { 2169 ProxyConnectorView view = new ProxyConnectorView(connector); 2170 try { 2171 ObjectName objectName = BrokerMBeanSupport.createNetworkConnectorName(getBrokerObjectName(), "proxyConnectors", connector.getName()); 2172 AnnotatedMBean.registerMBean(getManagementContext(), view, objectName); 2173 } catch (Throwable e) { 2174 throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e); 2175 } 2176 } 2177 2178 protected void registerJmsConnectorMBean(JmsConnector connector) throws IOException { 2179 JmsConnectorView view = new JmsConnectorView(connector); 2180 try { 2181 ObjectName objectName = BrokerMBeanSupport.createNetworkConnectorName(getBrokerObjectName(), "jmsConnectors", connector.getName()); 2182 AnnotatedMBean.registerMBean(getManagementContext(), view, objectName); 2183 } catch (Throwable e) { 2184 throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e); 2185 } 2186 } 2187 2188 /** 2189 * Factory method to create a new broker 2190 * 2191 * @throws Exception 2192 * @throws 2193 * @throws 2194 */ 2195 protected Broker createBroker() throws Exception { 2196 regionBroker = createRegionBroker(); 2197 Broker broker = addInterceptors(regionBroker); 2198 // Add a filter that will stop access to the broker once stopped 2199 broker = new MutableBrokerFilter(broker) { 2200 Broker old; 2201 2202 @Override 2203 public void stop() throws Exception { 2204 old = this.next.getAndSet(new ErrorBroker("Broker has been stopped: " + this) { 2205 // Just ignore additional stop actions. 2206 @Override 2207 public void stop() throws Exception { 2208 } 2209 }); 2210 old.stop(); 2211 } 2212 2213 @Override 2214 public void start() throws Exception { 2215 if (forceStart && old != null) { 2216 this.next.set(old); 2217 } 2218 getNext().start(); 2219 } 2220 }; 2221 return broker; 2222 } 2223 2224 /** 2225 * Factory method to create the core region broker onto which interceptors 2226 * are added 2227 * 2228 * @throws Exception 2229 */ 2230 protected Broker createRegionBroker() throws Exception { 2231 if (destinationInterceptors == null) { 2232 destinationInterceptors = createDefaultDestinationInterceptor(); 2233 } 2234 configureServices(destinationInterceptors); 2235 DestinationInterceptor destinationInterceptor = new CompositeDestinationInterceptor(destinationInterceptors); 2236 if (destinationFactory == null) { 2237 destinationFactory = new DestinationFactoryImpl(this, getTaskRunnerFactory(), getPersistenceAdapter()); 2238 } 2239 return createRegionBroker(destinationInterceptor); 2240 } 2241 2242 protected Broker createRegionBroker(DestinationInterceptor destinationInterceptor) throws IOException { 2243 RegionBroker regionBroker; 2244 if (isUseJmx()) { 2245 try { 2246 regionBroker = new ManagedRegionBroker(this, getManagementContext(), getBrokerObjectName(), 2247 getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory, destinationInterceptor,getScheduler(),getExecutor()); 2248 } catch(MalformedObjectNameException me){ 2249 LOG.warn("Cannot create ManagedRegionBroker due " + me.getMessage(), me); 2250 throw new IOException(me); 2251 } 2252 } else { 2253 regionBroker = new RegionBroker(this, getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory, 2254 destinationInterceptor,getScheduler(),getExecutor()); 2255 } 2256 destinationFactory.setRegionBroker(regionBroker); 2257 regionBroker.setKeepDurableSubsActive(keepDurableSubsActive); 2258 regionBroker.setBrokerName(getBrokerName()); 2259 regionBroker.getDestinationStatistics().setEnabled(enableStatistics); 2260 regionBroker.setAllowTempAutoCreationOnSend(isAllowTempAutoCreationOnSend()); 2261 if (brokerId != null) { 2262 regionBroker.setBrokerId(brokerId); 2263 } 2264 return regionBroker; 2265 } 2266 2267 /** 2268 * Create the default destination interceptor 2269 */ 2270 protected DestinationInterceptor[] createDefaultDestinationInterceptor() { 2271 List<DestinationInterceptor> answer = new ArrayList<DestinationInterceptor>(); 2272 if (isUseVirtualTopics()) { 2273 VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor(); 2274 VirtualTopic virtualTopic = new VirtualTopic(); 2275 virtualTopic.setName("VirtualTopic.>"); 2276 VirtualDestination[] virtualDestinations = { virtualTopic }; 2277 interceptor.setVirtualDestinations(virtualDestinations); 2278 answer.add(interceptor); 2279 } 2280 if (isUseMirroredQueues()) { 2281 MirroredQueue interceptor = new MirroredQueue(); 2282 answer.add(interceptor); 2283 } 2284 DestinationInterceptor[] array = new DestinationInterceptor[answer.size()]; 2285 answer.toArray(array); 2286 return array; 2287 } 2288 2289 /** 2290 * Strategy method to add interceptors to the broker 2291 * 2292 * @throws IOException 2293 */ 2294 protected Broker addInterceptors(Broker broker) throws Exception { 2295 if (isSchedulerSupport()) { 2296 SchedulerBroker sb = new SchedulerBroker(this, broker, getJobSchedulerStore()); 2297 if (isUseJmx()) { 2298 JobSchedulerViewMBean view = new JobSchedulerView(sb.getJobScheduler()); 2299 try { 2300 ObjectName objectName = BrokerMBeanSupport.createJobSchedulerServiceName(getBrokerObjectName()); 2301 AnnotatedMBean.registerMBean(getManagementContext(), view, objectName); 2302 this.adminView.setJMSJobScheduler(objectName); 2303 } catch (Throwable e) { 2304 throw IOExceptionSupport.create("JobScheduler could not be registered in JMX: " 2305 + e.getMessage(), e); 2306 } 2307 } 2308 broker = sb; 2309 } 2310 if (isUseJmx()) { 2311 HealthViewMBean statusView = new HealthView((ManagedRegionBroker)getRegionBroker()); 2312 try { 2313 ObjectName objectName = BrokerMBeanSupport.createHealthServiceName(getBrokerObjectName()); 2314 AnnotatedMBean.registerMBean(getManagementContext(), statusView, objectName); 2315 } catch (Throwable e) { 2316 throw IOExceptionSupport.create("Status MBean could not be registered in JMX: " 2317 + e.getMessage(), e); 2318 } 2319 } 2320 if (isAdvisorySupport()) { 2321 broker = new AdvisoryBroker(broker); 2322 } 2323 broker = new CompositeDestinationBroker(broker); 2324 broker = new TransactionBroker(broker, getPersistenceAdapter().createTransactionStore()); 2325 if (isPopulateJMSXUserID()) { 2326 UserIDBroker userIDBroker = new UserIDBroker(broker); 2327 userIDBroker.setUseAuthenticatePrincipal(isUseAuthenticatedPrincipalForJMSXUserID()); 2328 broker = userIDBroker; 2329 } 2330 if (isMonitorConnectionSplits()) { 2331 broker = new ConnectionSplitBroker(broker); 2332 } 2333 if (plugins != null) { 2334 for (int i = 0; i < plugins.length; i++) { 2335 BrokerPlugin plugin = plugins[i]; 2336 broker = plugin.installPlugin(broker); 2337 } 2338 } 2339 return broker; 2340 } 2341 2342 protected PersistenceAdapter createPersistenceAdapter() throws IOException { 2343 if (isPersistent()) { 2344 PersistenceAdapterFactory fac = getPersistenceFactory(); 2345 if (fac != null) { 2346 return fac.createPersistenceAdapter(); 2347 } else { 2348 try { 2349 String clazz = "org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter"; 2350 PersistenceAdapter adaptor = (PersistenceAdapter)getClass().getClassLoader().loadClass(clazz).newInstance(); 2351 File dir = new File(getBrokerDataDirectory(),"KahaDB"); 2352 adaptor.setDirectory(dir); 2353 return adaptor; 2354 } catch (Throwable e) { 2355 throw IOExceptionSupport.create(e); 2356 } 2357 } 2358 } else { 2359 return new MemoryPersistenceAdapter(); 2360 } 2361 } 2362 2363 protected ObjectName createBrokerObjectName() throws MalformedObjectNameException { 2364 return BrokerMBeanSupport.createBrokerObjectName(getManagementContext().getJmxDomainName(), getBrokerName()); 2365 } 2366 2367 protected TransportConnector createTransportConnector(URI brokerURI) throws Exception { 2368 TransportServer transport = TransportFactorySupport.bind(this, brokerURI); 2369 return new TransportConnector(transport); 2370 } 2371 2372 /** 2373 * Extracts the port from the options 2374 */ 2375 protected Object getPort(Map<?,?> options) { 2376 Object port = options.get("port"); 2377 if (port == null) { 2378 port = DEFAULT_PORT; 2379 LOG.warn("No port specified so defaulting to: {}", port); 2380 } 2381 return port; 2382 } 2383 2384 protected void addShutdownHook() { 2385 if (useShutdownHook) { 2386 shutdownHook = new Thread("ActiveMQ ShutdownHook") { 2387 @Override 2388 public void run() { 2389 containerShutdown(); 2390 } 2391 }; 2392 Runtime.getRuntime().addShutdownHook(shutdownHook); 2393 } 2394 } 2395 2396 protected void removeShutdownHook() { 2397 if (shutdownHook != null) { 2398 try { 2399 Runtime.getRuntime().removeShutdownHook(shutdownHook); 2400 } catch (Exception e) { 2401 LOG.debug("Caught exception, must be shutting down. This exception is ignored.", e); 2402 } 2403 } 2404 } 2405 2406 /** 2407 * Sets hooks to be executed when broker shut down 2408 * 2409 * @org.apache.xbean.Property 2410 */ 2411 public void setShutdownHooks(List<Runnable> hooks) throws Exception { 2412 for (Runnable hook : hooks) { 2413 addShutdownHook(hook); 2414 } 2415 } 2416 2417 /** 2418 * Causes a clean shutdown of the container when the VM is being shut down 2419 */ 2420 protected void containerShutdown() { 2421 try { 2422 stop(); 2423 } catch (IOException e) { 2424 Throwable linkedException = e.getCause(); 2425 if (linkedException != null) { 2426 logError("Failed to shut down: " + e + ". Reason: " + linkedException, linkedException); 2427 } else { 2428 logError("Failed to shut down: " + e, e); 2429 } 2430 if (!useLoggingForShutdownErrors) { 2431 e.printStackTrace(System.err); 2432 } 2433 } catch (Exception e) { 2434 logError("Failed to shut down: " + e, e); 2435 } 2436 } 2437 2438 protected void logError(String message, Throwable e) { 2439 if (useLoggingForShutdownErrors) { 2440 LOG.error("Failed to shut down: " + e); 2441 } else { 2442 System.err.println("Failed to shut down: " + e); 2443 } 2444 } 2445 2446 /** 2447 * Starts any configured destinations on startup 2448 */ 2449 protected void startDestinations() throws Exception { 2450 if (destinations != null) { 2451 ConnectionContext adminConnectionContext = getAdminConnectionContext(); 2452 for (int i = 0; i < destinations.length; i++) { 2453 ActiveMQDestination destination = destinations[i]; 2454 getBroker().addDestination(adminConnectionContext, destination,true); 2455 } 2456 } 2457 if (isUseVirtualTopics()) { 2458 startVirtualConsumerDestinations(); 2459 } 2460 } 2461 2462 /** 2463 * Returns the broker's administration connection context used for 2464 * configuring the broker at startup 2465 */ 2466 public ConnectionContext getAdminConnectionContext() throws Exception { 2467 return BrokerSupport.getConnectionContext(getBroker()); 2468 } 2469 2470 protected void startManagementContext() throws Exception { 2471 getManagementContext().setBrokerName(brokerName); 2472 getManagementContext().start(); 2473 adminView = new BrokerView(this, null); 2474 ObjectName objectName = getBrokerObjectName(); 2475 AnnotatedMBean.registerMBean(getManagementContext(), adminView, objectName); 2476 } 2477 2478 /** 2479 * Start all transport and network connections, proxies and bridges 2480 * 2481 * @throws Exception 2482 */ 2483 public void startAllConnectors() throws Exception { 2484 Set<ActiveMQDestination> durableDestinations = getBroker().getDurableDestinations(); 2485 List<TransportConnector> al = new ArrayList<TransportConnector>(); 2486 for (Iterator<TransportConnector> iter = getTransportConnectors().iterator(); iter.hasNext();) { 2487 TransportConnector connector = iter.next(); 2488 al.add(startTransportConnector(connector)); 2489 } 2490 if (al.size() > 0) { 2491 // let's clear the transportConnectors list and replace it with 2492 // the started transportConnector instances 2493 this.transportConnectors.clear(); 2494 setTransportConnectors(al); 2495 } 2496 this.slave = false; 2497 URI uri = getVmConnectorURI(); 2498 Map<String, String> map = new HashMap<String, String>(URISupport.parseParameters(uri)); 2499 map.put("network", "true"); 2500 map.put("async", "false"); 2501 uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map)); 2502 2503 if (!stopped.get()) { 2504 ThreadPoolExecutor networkConnectorStartExecutor = null; 2505 if (isNetworkConnectorStartAsync()) { 2506 // spin up as many threads as needed 2507 networkConnectorStartExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 2508 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), 2509 new ThreadFactory() { 2510 int count=0; 2511 @Override 2512 public Thread newThread(Runnable runnable) { 2513 Thread thread = new Thread(runnable, "NetworkConnector Start Thread-" +(count++)); 2514 thread.setDaemon(true); 2515 return thread; 2516 } 2517 }); 2518 } 2519 2520 for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) { 2521 final NetworkConnector connector = iter.next(); 2522 connector.setLocalUri(uri); 2523 connector.setBrokerName(getBrokerName()); 2524 connector.setDurableDestinations(durableDestinations); 2525 if (getDefaultSocketURIString() != null) { 2526 connector.setBrokerURL(getDefaultSocketURIString()); 2527 } 2528 if (networkConnectorStartExecutor != null) { 2529 networkConnectorStartExecutor.execute(new Runnable() { 2530 @Override 2531 public void run() { 2532 try { 2533 LOG.info("Async start of {}", connector); 2534 connector.start(); 2535 } catch(Exception e) { 2536 LOG.error("Async start of network connector: {} failed", connector, e); 2537 } 2538 } 2539 }); 2540 } else { 2541 connector.start(); 2542 } 2543 } 2544 if (networkConnectorStartExecutor != null) { 2545 // executor done when enqueued tasks are complete 2546 ThreadPoolUtils.shutdown(networkConnectorStartExecutor); 2547 } 2548 2549 for (Iterator<ProxyConnector> iter = getProxyConnectors().iterator(); iter.hasNext();) { 2550 ProxyConnector connector = iter.next(); 2551 connector.start(); 2552 } 2553 for (Iterator<JmsConnector> iter = jmsConnectors.iterator(); iter.hasNext();) { 2554 JmsConnector connector = iter.next(); 2555 connector.start(); 2556 } 2557 for (Service service : services) { 2558 configureService(service); 2559 service.start(); 2560 } 2561 } 2562 } 2563 2564 public TransportConnector startTransportConnector(TransportConnector connector) throws Exception { 2565 connector.setBrokerService(this); 2566 connector.setTaskRunnerFactory(getTaskRunnerFactory()); 2567 MessageAuthorizationPolicy policy = getMessageAuthorizationPolicy(); 2568 if (policy != null) { 2569 connector.setMessageAuthorizationPolicy(policy); 2570 } 2571 if (isUseJmx()) { 2572 connector = registerConnectorMBean(connector); 2573 } 2574 connector.getStatistics().setEnabled(enableStatistics); 2575 connector.start(); 2576 return connector; 2577 } 2578 2579 /** 2580 * Perform any custom dependency injection 2581 */ 2582 protected void configureServices(Object[] services) { 2583 for (Object service : services) { 2584 configureService(service); 2585 } 2586 } 2587 2588 /** 2589 * Perform any custom dependency injection 2590 */ 2591 protected void configureService(Object service) { 2592 if (service instanceof BrokerServiceAware) { 2593 BrokerServiceAware serviceAware = (BrokerServiceAware) service; 2594 serviceAware.setBrokerService(this); 2595 } 2596 } 2597 2598 public void handleIOException(IOException exception) { 2599 if (ioExceptionHandler != null) { 2600 ioExceptionHandler.handle(exception); 2601 } else { 2602 LOG.info("No IOExceptionHandler registered, ignoring IO exception", exception); 2603 } 2604 } 2605 2606 protected void startVirtualConsumerDestinations() throws Exception { 2607 ConnectionContext adminConnectionContext = getAdminConnectionContext(); 2608 Set<ActiveMQDestination> destinations = destinationFactory.getDestinations(); 2609 DestinationFilter filter = getVirtualTopicConsumerDestinationFilter(); 2610 if (!destinations.isEmpty()) { 2611 for (ActiveMQDestination destination : destinations) { 2612 if (filter.matches(destination) == true) { 2613 broker.addDestination(adminConnectionContext, destination, false); 2614 } 2615 } 2616 } 2617 } 2618 2619 private DestinationFilter getVirtualTopicConsumerDestinationFilter() { 2620 // created at startup, so no sync needed 2621 if (virtualConsumerDestinationFilter == null) { 2622 Set <ActiveMQQueue> consumerDestinations = new HashSet<ActiveMQQueue>(); 2623 if (destinationInterceptors != null) { 2624 for (DestinationInterceptor interceptor : destinationInterceptors) { 2625 if (interceptor instanceof VirtualDestinationInterceptor) { 2626 VirtualDestinationInterceptor virtualDestinationInterceptor = (VirtualDestinationInterceptor) interceptor; 2627 for (VirtualDestination virtualDestination: virtualDestinationInterceptor.getVirtualDestinations()) { 2628 if (virtualDestination instanceof VirtualTopic) { 2629 consumerDestinations.add(new ActiveMQQueue(((VirtualTopic) virtualDestination).getPrefix() + DestinationFilter.ANY_DESCENDENT)); 2630 } 2631 } 2632 } 2633 } 2634 } 2635 ActiveMQQueue filter = new ActiveMQQueue(); 2636 filter.setCompositeDestinations(consumerDestinations.toArray(new ActiveMQDestination[]{})); 2637 virtualConsumerDestinationFilter = DestinationFilter.parseFilter(filter); 2638 } 2639 return virtualConsumerDestinationFilter; 2640 } 2641 2642 protected synchronized ThreadPoolExecutor getExecutor() { 2643 if (this.executor == null) { 2644 this.executor = new ThreadPoolExecutor(1, 10, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() { 2645 2646 private long i = 0; 2647 2648 @Override 2649 public Thread newThread(Runnable runnable) { 2650 this.i++; 2651 Thread thread = new Thread(runnable, "ActiveMQ BrokerService.worker." + this.i); 2652 thread.setDaemon(true); 2653 thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { 2654 @Override 2655 public void uncaughtException(final Thread t, final Throwable e) { 2656 LOG.error("Error in thread '{}'", t.getName(), e); 2657 } 2658 }); 2659 return thread; 2660 } 2661 }, new RejectedExecutionHandler() { 2662 @Override 2663 public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) { 2664 try { 2665 executor.getQueue().offer(r, 60, TimeUnit.SECONDS); 2666 } catch (InterruptedException e) { 2667 throw new RejectedExecutionException("Interrupted waiting for BrokerService.worker"); 2668 } 2669 2670 throw new RejectedExecutionException("Timed Out while attempting to enqueue Task."); 2671 } 2672 }); 2673 } 2674 return this.executor; 2675 } 2676 2677 public synchronized Scheduler getScheduler() { 2678 if (this.scheduler==null) { 2679 this.scheduler = new Scheduler("ActiveMQ Broker["+getBrokerName()+"] Scheduler"); 2680 try { 2681 this.scheduler.start(); 2682 } catch (Exception e) { 2683 LOG.error("Failed to start Scheduler", e); 2684 } 2685 } 2686 return this.scheduler; 2687 } 2688 2689 public Broker getRegionBroker() { 2690 return regionBroker; 2691 } 2692 2693 public void setRegionBroker(Broker regionBroker) { 2694 this.regionBroker = regionBroker; 2695 } 2696 2697 public void addShutdownHook(Runnable hook) { 2698 synchronized (shutdownHooks) { 2699 shutdownHooks.add(hook); 2700 } 2701 } 2702 2703 public void removeShutdownHook(Runnable hook) { 2704 synchronized (shutdownHooks) { 2705 shutdownHooks.remove(hook); 2706 } 2707 } 2708 2709 public boolean isSystemExitOnShutdown() { 2710 return systemExitOnShutdown; 2711 } 2712 2713 /** 2714 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" 2715 */ 2716 public void setSystemExitOnShutdown(boolean systemExitOnShutdown) { 2717 this.systemExitOnShutdown = systemExitOnShutdown; 2718 } 2719 2720 public int getSystemExitOnShutdownExitCode() { 2721 return systemExitOnShutdownExitCode; 2722 } 2723 2724 public void setSystemExitOnShutdownExitCode(int systemExitOnShutdownExitCode) { 2725 this.systemExitOnShutdownExitCode = systemExitOnShutdownExitCode; 2726 } 2727 2728 public SslContext getSslContext() { 2729 return sslContext; 2730 } 2731 2732 public void setSslContext(SslContext sslContext) { 2733 this.sslContext = sslContext; 2734 } 2735 2736 public boolean isShutdownOnSlaveFailure() { 2737 return shutdownOnSlaveFailure; 2738 } 2739 2740 /** 2741 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" 2742 */ 2743 public void setShutdownOnSlaveFailure(boolean shutdownOnSlaveFailure) { 2744 this.shutdownOnSlaveFailure = shutdownOnSlaveFailure; 2745 } 2746 2747 public boolean isWaitForSlave() { 2748 return waitForSlave; 2749 } 2750 2751 /** 2752 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" 2753 */ 2754 public void setWaitForSlave(boolean waitForSlave) { 2755 this.waitForSlave = waitForSlave; 2756 } 2757 2758 public long getWaitForSlaveTimeout() { 2759 return this.waitForSlaveTimeout; 2760 } 2761 2762 public void setWaitForSlaveTimeout(long waitForSlaveTimeout) { 2763 this.waitForSlaveTimeout = waitForSlaveTimeout; 2764 } 2765 2766 /** 2767 * Get the passiveSlave 2768 * @return the passiveSlave 2769 */ 2770 public boolean isPassiveSlave() { 2771 return this.passiveSlave; 2772 } 2773 2774 /** 2775 * Set the passiveSlave 2776 * @param passiveSlave the passiveSlave to set 2777 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" 2778 */ 2779 public void setPassiveSlave(boolean passiveSlave) { 2780 this.passiveSlave = passiveSlave; 2781 } 2782 2783 /** 2784 * override the Default IOException handler, called when persistence adapter 2785 * has experiences File or JDBC I/O Exceptions 2786 * 2787 * @param ioExceptionHandler 2788 */ 2789 public void setIoExceptionHandler(IOExceptionHandler ioExceptionHandler) { 2790 configureService(ioExceptionHandler); 2791 this.ioExceptionHandler = ioExceptionHandler; 2792 } 2793 2794 public IOExceptionHandler getIoExceptionHandler() { 2795 return ioExceptionHandler; 2796 } 2797 2798 /** 2799 * @return the schedulerSupport 2800 */ 2801 public boolean isSchedulerSupport() { 2802 return this.schedulerSupport; 2803 } 2804 2805 /** 2806 * @param schedulerSupport the schedulerSupport to set 2807 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" 2808 */ 2809 public void setSchedulerSupport(boolean schedulerSupport) { 2810 this.schedulerSupport = schedulerSupport; 2811 } 2812 2813 /** 2814 * @return the schedulerDirectory 2815 */ 2816 public File getSchedulerDirectoryFile() { 2817 if (this.schedulerDirectoryFile == null) { 2818 this.schedulerDirectoryFile = new File(getBrokerDataDirectory(), "scheduler"); 2819 } 2820 return schedulerDirectoryFile; 2821 } 2822 2823 /** 2824 * @param schedulerDirectory the schedulerDirectory to set 2825 */ 2826 public void setSchedulerDirectoryFile(File schedulerDirectory) { 2827 this.schedulerDirectoryFile = schedulerDirectory; 2828 } 2829 2830 public void setSchedulerDirectory(String schedulerDirectory) { 2831 setSchedulerDirectoryFile(new File(schedulerDirectory)); 2832 } 2833 2834 public int getSchedulePeriodForDestinationPurge() { 2835 return this.schedulePeriodForDestinationPurge; 2836 } 2837 2838 public void setSchedulePeriodForDestinationPurge(int schedulePeriodForDestinationPurge) { 2839 this.schedulePeriodForDestinationPurge = schedulePeriodForDestinationPurge; 2840 } 2841 2842 public int getMaxPurgedDestinationsPerSweep() { 2843 return this.maxPurgedDestinationsPerSweep; 2844 } 2845 2846 public void setMaxPurgedDestinationsPerSweep(int maxPurgedDestinationsPerSweep) { 2847 this.maxPurgedDestinationsPerSweep = maxPurgedDestinationsPerSweep; 2848 } 2849 2850 public BrokerContext getBrokerContext() { 2851 return brokerContext; 2852 } 2853 2854 public void setBrokerContext(BrokerContext brokerContext) { 2855 this.brokerContext = brokerContext; 2856 } 2857 2858 public void setBrokerId(String brokerId) { 2859 this.brokerId = new BrokerId(brokerId); 2860 } 2861 2862 public boolean isUseAuthenticatedPrincipalForJMSXUserID() { 2863 return useAuthenticatedPrincipalForJMSXUserID; 2864 } 2865 2866 public void setUseAuthenticatedPrincipalForJMSXUserID(boolean useAuthenticatedPrincipalForJMSXUserID) { 2867 this.useAuthenticatedPrincipalForJMSXUserID = useAuthenticatedPrincipalForJMSXUserID; 2868 } 2869 2870 /** 2871 * Should MBeans that support showing the Authenticated User Name information have this 2872 * value filled in or not. 2873 * 2874 * @return true if user names should be exposed in MBeans 2875 */ 2876 public boolean isPopulateUserNameInMBeans() { 2877 return this.populateUserNameInMBeans; 2878 } 2879 2880 /** 2881 * Sets whether Authenticated User Name information is shown in MBeans that support this field. 2882 * @param value if MBeans should expose user name information. 2883 */ 2884 public void setPopulateUserNameInMBeans(boolean value) { 2885 this.populateUserNameInMBeans = value; 2886 } 2887 2888 /** 2889 * Gets the time in Milliseconds that an invocation of an MBean method will wait before 2890 * failing. The default value is to wait forever (zero). 2891 * 2892 * @return timeout in milliseconds before MBean calls fail, (default is 0 or no timeout). 2893 */ 2894 public long getMbeanInvocationTimeout() { 2895 return mbeanInvocationTimeout; 2896 } 2897 2898 /** 2899 * Gets the time in Milliseconds that an invocation of an MBean method will wait before 2900 * failing. The default value is to wait forever (zero). 2901 * 2902 * @param mbeanInvocationTimeout 2903 * timeout in milliseconds before MBean calls fail, (default is 0 or no timeout). 2904 */ 2905 public void setMbeanInvocationTimeout(long mbeanInvocationTimeout) { 2906 this.mbeanInvocationTimeout = mbeanInvocationTimeout; 2907 } 2908 2909 public boolean isNetworkConnectorStartAsync() { 2910 return networkConnectorStartAsync; 2911 } 2912 2913 public void setNetworkConnectorStartAsync(boolean networkConnectorStartAsync) { 2914 this.networkConnectorStartAsync = networkConnectorStartAsync; 2915 } 2916 2917 public boolean isAllowTempAutoCreationOnSend() { 2918 return allowTempAutoCreationOnSend; 2919 } 2920 2921 /** 2922 * enable if temp destinations need to be propagated through a network when 2923 * advisorySupport==false. This is used in conjunction with the policy 2924 * gcInactiveDestinations for matching temps so they can get removed 2925 * when inactive 2926 * 2927 * @param allowTempAutoCreationOnSend 2928 */ 2929 public void setAllowTempAutoCreationOnSend(boolean allowTempAutoCreationOnSend) { 2930 this.allowTempAutoCreationOnSend = allowTempAutoCreationOnSend; 2931 } 2932 2933 public long getOfflineDurableSubscriberTimeout() { 2934 return offlineDurableSubscriberTimeout; 2935 } 2936 2937 public void setOfflineDurableSubscriberTimeout(long offlineDurableSubscriberTimeout) { 2938 this.offlineDurableSubscriberTimeout = offlineDurableSubscriberTimeout; 2939 } 2940 2941 public long getOfflineDurableSubscriberTaskSchedule() { 2942 return offlineDurableSubscriberTaskSchedule; 2943 } 2944 2945 public void setOfflineDurableSubscriberTaskSchedule(long offlineDurableSubscriberTaskSchedule) { 2946 this.offlineDurableSubscriberTaskSchedule = offlineDurableSubscriberTaskSchedule; 2947 } 2948 2949 public boolean shouldRecordVirtualDestination(ActiveMQDestination destination) { 2950 return isUseVirtualTopics() && destination.isQueue() && 2951 getVirtualTopicConsumerDestinationFilter().matches(destination); 2952 } 2953 2954 public Throwable getStartException() { 2955 return startException; 2956 } 2957 2958 public boolean isStartAsync() { 2959 return startAsync; 2960 } 2961 2962 public void setStartAsync(boolean startAsync) { 2963 this.startAsync = startAsync; 2964 } 2965 2966 public boolean isSlave() { 2967 return this.slave; 2968 } 2969 2970 public boolean isStopping() { 2971 return this.stopping.get(); 2972 } 2973 2974 /** 2975 * @return true if the broker allowed to restart on shutdown. 2976 */ 2977 public boolean isRestartAllowed() { 2978 return restartAllowed; 2979 } 2980 2981 /** 2982 * Sets if the broker allowed to restart on shutdown. 2983 * @return 2984 */ 2985 public void setRestartAllowed(boolean restartAllowed) { 2986 this.restartAllowed = restartAllowed; 2987 } 2988 2989 /** 2990 * A lifecycle manager of the BrokerService should 2991 * inspect this property after a broker shutdown has occurred 2992 * to find out if the broker needs to be re-created and started 2993 * again. 2994 * 2995 * @return true if the broker wants to be restarted after it shuts down. 2996 */ 2997 public boolean isRestartRequested() { 2998 return restartRequested; 2999 } 3000 3001 public void requestRestart() { 3002 this.restartRequested = true; 3003 } 3004 3005 public int getStoreOpenWireVersion() { 3006 return storeOpenWireVersion; 3007 } 3008 3009 public void setStoreOpenWireVersion(int storeOpenWireVersion) { 3010 this.storeOpenWireVersion = storeOpenWireVersion; 3011 } 3012 3013 /** 3014 * @return the current number of connections on this Broker. 3015 */ 3016 public int getCurrentConnections() { 3017 return this.currentConnections.get(); 3018 } 3019 3020 /** 3021 * @return the total number of connections this broker has handled since startup. 3022 */ 3023 public long getTotalConnections() { 3024 return this.totalConnections.get(); 3025 } 3026 3027 public void incrementCurrentConnections() { 3028 this.currentConnections.incrementAndGet(); 3029 } 3030 3031 public void decrementCurrentConnections() { 3032 this.currentConnections.decrementAndGet(); 3033 } 3034 3035 public void incrementTotalConnections() { 3036 this.totalConnections.incrementAndGet(); 3037 } 3038 3039 public boolean isRejectDurableConsumers() { 3040 return rejectDurableConsumers; 3041 } 3042 3043 public void setRejectDurableConsumers(boolean rejectDurableConsumers) { 3044 this.rejectDurableConsumers = rejectDurableConsumers; 3045 } 3046}