001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker; 018 019import java.io.BufferedReader; 020import java.io.File; 021import java.io.IOException; 022import java.io.InputStream; 023import java.io.InputStreamReader; 024import java.net.URI; 025import java.net.URISyntaxException; 026import java.net.UnknownHostException; 027import java.security.Provider; 028import java.security.Security; 029import java.util.ArrayList; 030import java.util.Date; 031import java.util.HashMap; 032import java.util.HashSet; 033import java.util.Iterator; 034import java.util.List; 035import java.util.Locale; 036import java.util.Map; 037import java.util.Set; 038import java.util.concurrent.CopyOnWriteArrayList; 039import java.util.concurrent.CountDownLatch; 040import java.util.concurrent.LinkedBlockingQueue; 041import java.util.concurrent.RejectedExecutionException; 042import java.util.concurrent.RejectedExecutionHandler; 043import java.util.concurrent.SynchronousQueue; 044import java.util.concurrent.ThreadFactory; 045import java.util.concurrent.ThreadPoolExecutor; 046import java.util.concurrent.TimeUnit; 047import java.util.concurrent.atomic.AtomicBoolean; 048import java.util.concurrent.atomic.AtomicInteger; 049import java.util.concurrent.atomic.AtomicLong; 050 051import javax.annotation.PostConstruct; 052import javax.annotation.PreDestroy; 053import javax.management.MalformedObjectNameException; 054import javax.management.ObjectName; 055 056import org.apache.activemq.ActiveMQConnectionMetaData; 057import org.apache.activemq.ConfigurationException; 058import org.apache.activemq.Service; 059import org.apache.activemq.advisory.AdvisoryBroker; 060import org.apache.activemq.broker.cluster.ConnectionSplitBroker; 061import org.apache.activemq.broker.jmx.AnnotatedMBean; 062import org.apache.activemq.broker.jmx.BrokerMBeanSupport; 063import org.apache.activemq.broker.jmx.BrokerView; 064import org.apache.activemq.broker.jmx.ConnectorView; 065import org.apache.activemq.broker.jmx.ConnectorViewMBean; 066import org.apache.activemq.broker.jmx.HealthView; 067import org.apache.activemq.broker.jmx.HealthViewMBean; 068import org.apache.activemq.broker.jmx.JmsConnectorView; 069import org.apache.activemq.broker.jmx.JobSchedulerView; 070import org.apache.activemq.broker.jmx.JobSchedulerViewMBean; 071import org.apache.activemq.broker.jmx.Log4JConfigView; 072import org.apache.activemq.broker.jmx.ManagedRegionBroker; 073import org.apache.activemq.broker.jmx.ManagementContext; 074import org.apache.activemq.broker.jmx.NetworkConnectorView; 075import org.apache.activemq.broker.jmx.NetworkConnectorViewMBean; 076import org.apache.activemq.broker.jmx.ProxyConnectorView; 077import org.apache.activemq.broker.region.CompositeDestinationInterceptor; 078import org.apache.activemq.broker.region.Destination; 079import org.apache.activemq.broker.region.DestinationFactory; 080import org.apache.activemq.broker.region.DestinationFactoryImpl; 081import org.apache.activemq.broker.region.DestinationInterceptor; 082import org.apache.activemq.broker.region.RegionBroker; 083import org.apache.activemq.broker.region.policy.PolicyMap; 084import org.apache.activemq.broker.region.virtual.MirroredQueue; 085import org.apache.activemq.broker.region.virtual.VirtualDestination; 086import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor; 087import org.apache.activemq.broker.region.virtual.VirtualTopic; 088import org.apache.activemq.broker.scheduler.JobSchedulerStore; 089import org.apache.activemq.broker.scheduler.SchedulerBroker; 090import org.apache.activemq.broker.scheduler.memory.InMemoryJobSchedulerStore; 091import org.apache.activemq.command.ActiveMQDestination; 092import org.apache.activemq.command.ActiveMQQueue; 093import org.apache.activemq.command.BrokerId; 094import org.apache.activemq.command.ProducerInfo; 095import org.apache.activemq.filter.DestinationFilter; 096import org.apache.activemq.network.ConnectionFilter; 097import org.apache.activemq.network.DiscoveryNetworkConnector; 098import org.apache.activemq.network.NetworkConnector; 099import org.apache.activemq.network.jms.JmsConnector; 100import org.apache.activemq.openwire.OpenWireFormat; 101import org.apache.activemq.proxy.ProxyConnector; 102import org.apache.activemq.security.MessageAuthorizationPolicy; 103import org.apache.activemq.selector.SelectorParser; 104import org.apache.activemq.store.JournaledStore; 105import org.apache.activemq.store.PListStore; 106import org.apache.activemq.store.PersistenceAdapter; 107import org.apache.activemq.store.PersistenceAdapterFactory; 108import org.apache.activemq.store.memory.MemoryPersistenceAdapter; 109import org.apache.activemq.thread.Scheduler; 110import org.apache.activemq.thread.TaskRunnerFactory; 111import org.apache.activemq.transport.TransportFactorySupport; 112import org.apache.activemq.transport.TransportServer; 113import org.apache.activemq.transport.vm.VMTransportFactory; 114import org.apache.activemq.usage.StoreUsage; 115import org.apache.activemq.usage.SystemUsage; 116import org.apache.activemq.usage.Usage; 117import org.apache.activemq.util.BrokerSupport; 118import org.apache.activemq.util.DefaultIOExceptionHandler; 119import org.apache.activemq.util.IOExceptionHandler; 120import org.apache.activemq.util.IOExceptionSupport; 121import org.apache.activemq.util.IOHelper; 122import org.apache.activemq.util.InetAddressUtil; 123import org.apache.activemq.util.ServiceStopper; 124import org.apache.activemq.util.ThreadPoolUtils; 125import org.apache.activemq.util.TimeUtils; 126import org.apache.activemq.util.URISupport; 127import org.slf4j.Logger; 128import org.slf4j.LoggerFactory; 129import org.slf4j.MDC; 130 131/** 132 * Manages the life-cycle of an ActiveMQ Broker. A BrokerService consists of a 133 * number of transport connectors, network connectors and a bunch of properties 134 * which can be used to configure the broker as its lazily created. 135 * 136 * @org.apache.xbean.XBean 137 */ 138public class BrokerService implements Service { 139 public static final String DEFAULT_PORT = "61616"; 140 public static final String LOCAL_HOST_NAME; 141 public static final String BROKER_VERSION; 142 public static final String DEFAULT_BROKER_NAME = "localhost"; 143 public static final int DEFAULT_MAX_FILE_LENGTH = 1024 * 1024 * 32; 144 public static final long DEFAULT_START_TIMEOUT = 600000L; 145 146 private static final Logger LOG = LoggerFactory.getLogger(BrokerService.class); 147 148 @SuppressWarnings("unused") 149 private static final long serialVersionUID = 7353129142305630237L; 150 151 private boolean useJmx = true; 152 private boolean enableStatistics = true; 153 private boolean persistent = true; 154 private boolean populateJMSXUserID; 155 private boolean useAuthenticatedPrincipalForJMSXUserID; 156 private boolean populateUserNameInMBeans; 157 private long mbeanInvocationTimeout = 0; 158 159 private boolean useShutdownHook = true; 160 private boolean useLoggingForShutdownErrors; 161 private boolean shutdownOnMasterFailure; 162 private boolean shutdownOnSlaveFailure; 163 private boolean waitForSlave; 164 private long waitForSlaveTimeout = DEFAULT_START_TIMEOUT; 165 private boolean passiveSlave; 166 private String brokerName = DEFAULT_BROKER_NAME; 167 private File dataDirectoryFile; 168 private File tmpDataDirectory; 169 private Broker broker; 170 private BrokerView adminView; 171 private ManagementContext managementContext; 172 private ObjectName brokerObjectName; 173 private TaskRunnerFactory taskRunnerFactory; 174 private TaskRunnerFactory persistenceTaskRunnerFactory; 175 private SystemUsage systemUsage; 176 private SystemUsage producerSystemUsage; 177 private SystemUsage consumerSystemUsaage; 178 private PersistenceAdapter persistenceAdapter; 179 private PersistenceAdapterFactory persistenceFactory; 180 protected DestinationFactory destinationFactory; 181 private MessageAuthorizationPolicy messageAuthorizationPolicy; 182 private final List<TransportConnector> transportConnectors = new CopyOnWriteArrayList<TransportConnector>(); 183 private final List<NetworkConnector> networkConnectors = new CopyOnWriteArrayList<NetworkConnector>(); 184 private final List<ProxyConnector> proxyConnectors = new CopyOnWriteArrayList<ProxyConnector>(); 185 private final List<JmsConnector> jmsConnectors = new CopyOnWriteArrayList<JmsConnector>(); 186 private final List<Service> services = new ArrayList<Service>(); 187 private transient Thread shutdownHook; 188 private String[] transportConnectorURIs; 189 private String[] networkConnectorURIs; 190 private JmsConnector[] jmsBridgeConnectors; // these are Jms to Jms bridges 191 // to other jms messaging systems 192 private boolean deleteAllMessagesOnStartup; 193 private boolean advisorySupport = true; 194 private URI vmConnectorURI; 195 private String defaultSocketURIString; 196 private PolicyMap destinationPolicy; 197 private final AtomicBoolean started = new AtomicBoolean(false); 198 private final AtomicBoolean stopped = new AtomicBoolean(false); 199 private final AtomicBoolean stopping = new AtomicBoolean(false); 200 private BrokerPlugin[] plugins; 201 private boolean keepDurableSubsActive = true; 202 private boolean useVirtualTopics = true; 203 private boolean useMirroredQueues = false; 204 private boolean useTempMirroredQueues = true; 205 private BrokerId brokerId; 206 private volatile DestinationInterceptor[] destinationInterceptors; 207 private ActiveMQDestination[] destinations; 208 private PListStore tempDataStore; 209 private int persistenceThreadPriority = Thread.MAX_PRIORITY; 210 private boolean useLocalHostBrokerName; 211 private final CountDownLatch stoppedLatch = new CountDownLatch(1); 212 private final CountDownLatch startedLatch = new CountDownLatch(1); 213 private Broker regionBroker; 214 private int producerSystemUsagePortion = 60; 215 private int consumerSystemUsagePortion = 40; 216 private boolean splitSystemUsageForProducersConsumers; 217 private boolean monitorConnectionSplits = false; 218 private int taskRunnerPriority = Thread.NORM_PRIORITY; 219 private boolean dedicatedTaskRunner; 220 private boolean cacheTempDestinations = false;// useful for failover 221 private int timeBeforePurgeTempDestinations = 5000; 222 private final List<Runnable> shutdownHooks = new ArrayList<Runnable>(); 223 private boolean systemExitOnShutdown; 224 private int systemExitOnShutdownExitCode; 225 private SslContext sslContext; 226 private boolean forceStart = false; 227 private IOExceptionHandler ioExceptionHandler; 228 private boolean schedulerSupport = false; 229 private File schedulerDirectoryFile; 230 private Scheduler scheduler; 231 private ThreadPoolExecutor executor; 232 private int schedulePeriodForDestinationPurge= 0; 233 private int maxPurgedDestinationsPerSweep = 0; 234 private boolean adjustUsageLimits = true; 235 private BrokerContext brokerContext; 236 private boolean networkConnectorStartAsync = false; 237 private boolean allowTempAutoCreationOnSend; 238 private JobSchedulerStore jobSchedulerStore; 239 private final AtomicLong totalConnections = new AtomicLong(); 240 private final AtomicInteger currentConnections = new AtomicInteger(); 241 242 private long offlineDurableSubscriberTimeout = -1; 243 private long offlineDurableSubscriberTaskSchedule = 300000; 244 private DestinationFilter virtualConsumerDestinationFilter; 245 246 private final Object persistenceAdapterLock = new Object(); 247 private Throwable startException = null; 248 private boolean startAsync = false; 249 private Date startDate; 250 private boolean slave = true; 251 252 private boolean restartAllowed = true; 253 private boolean restartRequested = false; 254 private boolean rejectDurableConsumers = false; 255 private boolean rollbackOnlyOnAsyncException = true; 256 257 private int storeOpenWireVersion = OpenWireFormat.DEFAULT_VERSION; 258 259 static { 260 261 try { 262 ClassLoader loader = BrokerService.class.getClassLoader(); 263 Class<?> clazz = loader.loadClass("org.bouncycastle.jce.provider.BouncyCastleProvider"); 264 Provider bouncycastle = (Provider) clazz.newInstance(); 265 Security.insertProviderAt(bouncycastle, 2); 266 LOG.info("Loaded the Bouncy Castle security provider."); 267 } catch(Throwable e) { 268 // No BouncyCastle found so we use the default Java Security Provider 269 } 270 271 String localHostName = "localhost"; 272 try { 273 localHostName = InetAddressUtil.getLocalHostName(); 274 } catch (UnknownHostException e) { 275 LOG.error("Failed to resolve localhost"); 276 } 277 LOCAL_HOST_NAME = localHostName; 278 279 InputStream in = null; 280 String version = null; 281 if ((in = BrokerService.class.getResourceAsStream("/org/apache/activemq/version.txt")) != null) { 282 BufferedReader reader = new BufferedReader(new InputStreamReader(in)); 283 try { 284 version = reader.readLine(); 285 } catch(Exception e) { 286 } 287 } 288 BROKER_VERSION = version; 289 } 290 291 @Override 292 public String toString() { 293 return "BrokerService[" + getBrokerName() + "]"; 294 } 295 296 private String getBrokerVersion() { 297 String version = ActiveMQConnectionMetaData.PROVIDER_VERSION; 298 if (version == null) { 299 version = BROKER_VERSION; 300 } 301 302 return version; 303 } 304 305 /** 306 * Adds a new transport connector for the given bind address 307 * 308 * @return the newly created and added transport connector 309 * @throws Exception 310 */ 311 public TransportConnector addConnector(String bindAddress) throws Exception { 312 return addConnector(new URI(bindAddress)); 313 } 314 315 /** 316 * Adds a new transport connector for the given bind address 317 * 318 * @return the newly created and added transport connector 319 * @throws Exception 320 */ 321 public TransportConnector addConnector(URI bindAddress) throws Exception { 322 return addConnector(createTransportConnector(bindAddress)); 323 } 324 325 /** 326 * Adds a new transport connector for the given TransportServer transport 327 * 328 * @return the newly created and added transport connector 329 * @throws Exception 330 */ 331 public TransportConnector addConnector(TransportServer transport) throws Exception { 332 return addConnector(new TransportConnector(transport)); 333 } 334 335 /** 336 * Adds a new transport connector 337 * 338 * @return the transport connector 339 * @throws Exception 340 */ 341 public TransportConnector addConnector(TransportConnector connector) throws Exception { 342 transportConnectors.add(connector); 343 return connector; 344 } 345 346 /** 347 * Stops and removes a transport connector from the broker. 348 * 349 * @param connector 350 * @return true if the connector has been previously added to the broker 351 * @throws Exception 352 */ 353 public boolean removeConnector(TransportConnector connector) throws Exception { 354 boolean rc = transportConnectors.remove(connector); 355 if (rc) { 356 unregisterConnectorMBean(connector); 357 } 358 return rc; 359 } 360 361 /** 362 * Adds a new network connector using the given discovery address 363 * 364 * @return the newly created and added network connector 365 * @throws Exception 366 */ 367 public NetworkConnector addNetworkConnector(String discoveryAddress) throws Exception { 368 return addNetworkConnector(new URI(discoveryAddress)); 369 } 370 371 /** 372 * Adds a new proxy connector using the given bind address 373 * 374 * @return the newly created and added network connector 375 * @throws Exception 376 */ 377 public ProxyConnector addProxyConnector(String bindAddress) throws Exception { 378 return addProxyConnector(new URI(bindAddress)); 379 } 380 381 /** 382 * Adds a new network connector using the given discovery address 383 * 384 * @return the newly created and added network connector 385 * @throws Exception 386 */ 387 public NetworkConnector addNetworkConnector(URI discoveryAddress) throws Exception { 388 NetworkConnector connector = new DiscoveryNetworkConnector(discoveryAddress); 389 return addNetworkConnector(connector); 390 } 391 392 /** 393 * Adds a new proxy connector using the given bind address 394 * 395 * @return the newly created and added network connector 396 * @throws Exception 397 */ 398 public ProxyConnector addProxyConnector(URI bindAddress) throws Exception { 399 ProxyConnector connector = new ProxyConnector(); 400 connector.setBind(bindAddress); 401 connector.setRemote(new URI("fanout:multicast://default")); 402 return addProxyConnector(connector); 403 } 404 405 /** 406 * Adds a new network connector to connect this broker to a federated 407 * network 408 */ 409 public NetworkConnector addNetworkConnector(NetworkConnector connector) throws Exception { 410 connector.setBrokerService(this); 411 connector.setLocalUri(getVmConnectorURI()); 412 // Set a connection filter so that the connector does not establish loop 413 // back connections. 414 connector.setConnectionFilter(new ConnectionFilter() { 415 @Override 416 public boolean connectTo(URI location) { 417 List<TransportConnector> transportConnectors = getTransportConnectors(); 418 for (Iterator<TransportConnector> iter = transportConnectors.iterator(); iter.hasNext();) { 419 try { 420 TransportConnector tc = iter.next(); 421 if (location.equals(tc.getConnectUri())) { 422 return false; 423 } 424 } catch (Throwable e) { 425 } 426 } 427 return true; 428 } 429 }); 430 networkConnectors.add(connector); 431 return connector; 432 } 433 434 /** 435 * Removes the given network connector without stopping it. The caller 436 * should call {@link NetworkConnector#stop()} to close the connector 437 */ 438 public boolean removeNetworkConnector(NetworkConnector connector) { 439 boolean answer = networkConnectors.remove(connector); 440 if (answer) { 441 unregisterNetworkConnectorMBean(connector); 442 } 443 return answer; 444 } 445 446 public ProxyConnector addProxyConnector(ProxyConnector connector) throws Exception { 447 URI uri = getVmConnectorURI(); 448 connector.setLocalUri(uri); 449 proxyConnectors.add(connector); 450 if (isUseJmx()) { 451 registerProxyConnectorMBean(connector); 452 } 453 return connector; 454 } 455 456 public JmsConnector addJmsConnector(JmsConnector connector) throws Exception { 457 connector.setBrokerService(this); 458 jmsConnectors.add(connector); 459 if (isUseJmx()) { 460 registerJmsConnectorMBean(connector); 461 } 462 return connector; 463 } 464 465 public JmsConnector removeJmsConnector(JmsConnector connector) { 466 if (jmsConnectors.remove(connector)) { 467 return connector; 468 } 469 return null; 470 } 471 472 public void masterFailed() { 473 if (shutdownOnMasterFailure) { 474 LOG.error("The Master has failed ... shutting down"); 475 try { 476 stop(); 477 } catch (Exception e) { 478 LOG.error("Failed to stop for master failure", e); 479 } 480 } else { 481 LOG.warn("Master Failed - starting all connectors"); 482 try { 483 startAllConnectors(); 484 broker.nowMasterBroker(); 485 } catch (Exception e) { 486 LOG.error("Failed to startAllConnectors", e); 487 } 488 } 489 } 490 491 public String getUptime() { 492 long delta = getUptimeMillis(); 493 494 if (delta == 0) { 495 return "not started"; 496 } 497 498 return TimeUtils.printDuration(delta); 499 } 500 501 public long getUptimeMillis() { 502 if (startDate == null) { 503 return 0; 504 } 505 506 return new Date().getTime() - startDate.getTime(); 507 } 508 509 public boolean isStarted() { 510 return started.get() && startedLatch.getCount() == 0; 511 } 512 513 /** 514 * Forces a start of the broker. 515 * By default a BrokerService instance that was 516 * previously stopped using BrokerService.stop() cannot be restarted 517 * using BrokerService.start(). 518 * This method enforces a restart. 519 * It is not recommended to force a restart of the broker and will not work 520 * for most but some very trivial broker configurations. 521 * For restarting a broker instance we recommend to first call stop() on 522 * the old instance and then recreate a new BrokerService instance. 523 * 524 * @param force - if true enforces a restart. 525 * @throws Exception 526 */ 527 public void start(boolean force) throws Exception { 528 forceStart = force; 529 stopped.set(false); 530 started.set(false); 531 start(); 532 } 533 534 // Service interface 535 // ------------------------------------------------------------------------- 536 537 protected boolean shouldAutostart() { 538 return true; 539 } 540 541 /** 542 * JSR-250 callback wrapper; converts checked exceptions to runtime exceptions 543 * 544 * delegates to autoStart, done to prevent backwards incompatible signature change 545 */ 546 @PostConstruct 547 private void postConstruct() { 548 try { 549 autoStart(); 550 } catch (Exception ex) { 551 throw new RuntimeException(ex); 552 } 553 } 554 555 /** 556 * 557 * @throws Exception 558 * @org. apache.xbean.InitMethod 559 */ 560 public void autoStart() throws Exception { 561 if(shouldAutostart()) { 562 start(); 563 } 564 } 565 566 @Override 567 public void start() throws Exception { 568 if (stopped.get() || !started.compareAndSet(false, true)) { 569 // lets just ignore redundant start() calls 570 // as its way too easy to not be completely sure if start() has been 571 // called or not with the gazillion of different configuration 572 // mechanisms 573 // throw new IllegalStateException("Already started."); 574 return; 575 } 576 577 setStartException(null); 578 stopping.set(false); 579 startDate = new Date(); 580 MDC.put("activemq.broker", brokerName); 581 582 try { 583 checkMemorySystemUsageLimits(); 584 if (systemExitOnShutdown && useShutdownHook) { 585 throw new ConfigurationException("'useShutdownHook' property cannot be be used with 'systemExitOnShutdown', please turn it off (useShutdownHook=false)"); 586 } 587 processHelperProperties(); 588 if (isUseJmx()) { 589 // need to remove MDC during starting JMX, as that would otherwise causes leaks, as spawned threads inheirt the MDC and 590 // we cannot cleanup clear that during shutdown of the broker. 591 MDC.remove("activemq.broker"); 592 try { 593 startManagementContext(); 594 for (NetworkConnector connector : getNetworkConnectors()) { 595 registerNetworkConnectorMBean(connector); 596 } 597 } finally { 598 MDC.put("activemq.broker", brokerName); 599 } 600 } 601 602 // in jvm master slave, lets not publish over existing broker till we get the lock 603 final BrokerRegistry brokerRegistry = BrokerRegistry.getInstance(); 604 if (brokerRegistry.lookup(getBrokerName()) == null) { 605 brokerRegistry.bind(getBrokerName(), BrokerService.this); 606 } 607 startPersistenceAdapter(startAsync); 608 startBroker(startAsync); 609 brokerRegistry.bind(getBrokerName(), BrokerService.this); 610 } catch (Exception e) { 611 LOG.error("Failed to start Apache ActiveMQ ({}, {})", new Object[]{ getBrokerName(), brokerId }, e); 612 try { 613 if (!stopped.get()) { 614 stop(); 615 } 616 } catch (Exception ex) { 617 LOG.warn("Failed to stop broker after failure in start. This exception will be ignored.", ex); 618 } 619 throw e; 620 } finally { 621 MDC.remove("activemq.broker"); 622 } 623 } 624 625 private void startPersistenceAdapter(boolean async) throws Exception { 626 if (async) { 627 new Thread("Persistence Adapter Starting Thread") { 628 @Override 629 public void run() { 630 try { 631 doStartPersistenceAdapter(); 632 } catch (Throwable e) { 633 setStartException(e); 634 } finally { 635 synchronized (persistenceAdapterLock) { 636 persistenceAdapterLock.notifyAll(); 637 } 638 } 639 } 640 }.start(); 641 } else { 642 doStartPersistenceAdapter(); 643 } 644 } 645 646 private void doStartPersistenceAdapter() throws Exception { 647 PersistenceAdapter persistenceAdapterToStart = getPersistenceAdapter(); 648 if (persistenceAdapterToStart == null) { 649 checkStartException(); 650 throw new ConfigurationException("Cannot start null persistence adapter"); 651 } 652 persistenceAdapterToStart.setUsageManager(getProducerSystemUsage()); 653 persistenceAdapterToStart.setBrokerName(getBrokerName()); 654 LOG.info("Using Persistence Adapter: {}", persistenceAdapterToStart); 655 if (deleteAllMessagesOnStartup) { 656 deleteAllMessages(); 657 } 658 persistenceAdapterToStart.start(); 659 660 getTempDataStore(); 661 if (tempDataStore != null) { 662 try { 663 // start after we have the store lock 664 tempDataStore.start(); 665 } catch (Exception e) { 666 RuntimeException exception = new RuntimeException( 667 "Failed to start temp data store: " + tempDataStore, e); 668 LOG.error(exception.getLocalizedMessage(), e); 669 throw exception; 670 } 671 } 672 673 getJobSchedulerStore(); 674 if (jobSchedulerStore != null) { 675 try { 676 jobSchedulerStore.start(); 677 } catch (Exception e) { 678 RuntimeException exception = new RuntimeException( 679 "Failed to start job scheduler store: " + jobSchedulerStore, e); 680 LOG.error(exception.getLocalizedMessage(), e); 681 throw exception; 682 } 683 } 684 } 685 686 private void startBroker(boolean async) throws Exception { 687 if (async) { 688 new Thread("Broker Starting Thread") { 689 @Override 690 public void run() { 691 try { 692 synchronized (persistenceAdapterLock) { 693 persistenceAdapterLock.wait(); 694 } 695 doStartBroker(); 696 } catch (Throwable t) { 697 setStartException(t); 698 } 699 } 700 }.start(); 701 } else { 702 doStartBroker(); 703 } 704 } 705 706 private void doStartBroker() throws Exception { 707 checkStartException(); 708 startDestinations(); 709 addShutdownHook(); 710 711 broker = getBroker(); 712 brokerId = broker.getBrokerId(); 713 714 // need to log this after creating the broker so we have its id and name 715 LOG.info("Apache ActiveMQ {} ({}, {}) is starting", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId }); 716 broker.start(); 717 718 if (isUseJmx()) { 719 if (getManagementContext().isCreateConnector() && !getManagementContext().isConnectorStarted()) { 720 // try to restart management context 721 // typical for slaves that use the same ports as master 722 managementContext.stop(); 723 startManagementContext(); 724 } 725 ManagedRegionBroker managedBroker = (ManagedRegionBroker) regionBroker; 726 managedBroker.setContextBroker(broker); 727 adminView.setBroker(managedBroker); 728 } 729 730 if (ioExceptionHandler == null) { 731 setIoExceptionHandler(new DefaultIOExceptionHandler()); 732 } 733 734 if (isUseJmx() && Log4JConfigView.isLog4JAvailable()) { 735 ObjectName objectName = BrokerMBeanSupport.createLog4JConfigViewName(getBrokerObjectName().toString()); 736 Log4JConfigView log4jConfigView = new Log4JConfigView(); 737 AnnotatedMBean.registerMBean(getManagementContext(), log4jConfigView, objectName); 738 } 739 740 startAllConnectors(); 741 742 LOG.info("Apache ActiveMQ {} ({}, {}) started", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId}); 743 LOG.info("For help or more information please see: http://activemq.apache.org"); 744 745 getBroker().brokerServiceStarted(); 746 checkStoreSystemUsageLimits(); 747 startedLatch.countDown(); 748 getBroker().nowMasterBroker(); 749 } 750 751 /** 752 * JSR-250 callback wrapper; converts checked exceptions to runtime exceptions 753 * 754 * delegates to stop, done to prevent backwards incompatible signature change 755 */ 756 @PreDestroy 757 private void preDestroy () { 758 try { 759 stop(); 760 } catch (Exception ex) { 761 throw new RuntimeException(); 762 } 763 } 764 765 /** 766 * 767 * @throws Exception 768 * @org.apache .xbean.DestroyMethod 769 */ 770 @Override 771 public void stop() throws Exception { 772 if (!stopping.compareAndSet(false, true)) { 773 LOG.trace("Broker already stopping/stopped"); 774 return; 775 } 776 777 setStartException(new BrokerStoppedException("Stop invoked")); 778 MDC.put("activemq.broker", brokerName); 779 780 if (systemExitOnShutdown) { 781 new Thread() { 782 @Override 783 public void run() { 784 System.exit(systemExitOnShutdownExitCode); 785 } 786 }.start(); 787 } 788 789 LOG.info("Apache ActiveMQ {} ({}, {}) is shutting down", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId} ); 790 791 removeShutdownHook(); 792 if (this.scheduler != null) { 793 this.scheduler.stop(); 794 this.scheduler = null; 795 } 796 ServiceStopper stopper = new ServiceStopper(); 797 if (services != null) { 798 for (Service service : services) { 799 stopper.stop(service); 800 } 801 } 802 stopAllConnectors(stopper); 803 this.slave = true; 804 // remove any VMTransports connected 805 // this has to be done after services are stopped, 806 // to avoid timing issue with discovery (spinning up a new instance) 807 BrokerRegistry.getInstance().unbind(getBrokerName()); 808 VMTransportFactory.stopped(getBrokerName()); 809 if (broker != null) { 810 stopper.stop(broker); 811 broker = null; 812 } 813 814 if (jobSchedulerStore != null) { 815 jobSchedulerStore.stop(); 816 jobSchedulerStore = null; 817 } 818 if (tempDataStore != null) { 819 tempDataStore.stop(); 820 tempDataStore = null; 821 } 822 try { 823 stopper.stop(getPersistenceAdapter()); 824 persistenceAdapter = null; 825 if (isUseJmx()) { 826 stopper.stop(managementContext); 827 managementContext = null; 828 } 829 // Clear SelectorParser cache to free memory 830 SelectorParser.clearCache(); 831 } finally { 832 started.set(false); 833 stopped.set(true); 834 stoppedLatch.countDown(); 835 } 836 837 if (this.taskRunnerFactory != null) { 838 this.taskRunnerFactory.shutdown(); 839 this.taskRunnerFactory = null; 840 } 841 if (this.executor != null) { 842 ThreadPoolUtils.shutdownNow(executor); 843 this.executor = null; 844 } 845 846 this.destinationInterceptors = null; 847 this.destinationFactory = null; 848 849 if (startDate != null) { 850 LOG.info("Apache ActiveMQ {} ({}, {}) uptime {}", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId, getUptime()}); 851 } 852 LOG.info("Apache ActiveMQ {} ({}, {}) is shutdown", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId}); 853 854 synchronized (shutdownHooks) { 855 for (Runnable hook : shutdownHooks) { 856 try { 857 hook.run(); 858 } catch (Throwable e) { 859 stopper.onException(hook, e); 860 } 861 } 862 } 863 864 MDC.remove("activemq.broker"); 865 866 // and clear start date 867 startDate = null; 868 869 stopper.throwFirstException(); 870 } 871 872 public boolean checkQueueSize(String queueName) { 873 long count = 0; 874 long queueSize = 0; 875 Map<ActiveMQDestination, Destination> destinationMap = regionBroker.getDestinationMap(); 876 for (Map.Entry<ActiveMQDestination, Destination> entry : destinationMap.entrySet()) { 877 if (entry.getKey().isQueue()) { 878 if (entry.getValue().getName().matches(queueName)) { 879 queueSize = entry.getValue().getDestinationStatistics().getMessages().getCount(); 880 count += queueSize; 881 if (queueSize > 0) { 882 LOG.info("Queue has pending message: {} queueSize is: {}", entry.getValue().getName(), queueSize); 883 } 884 } 885 } 886 } 887 return count == 0; 888 } 889 890 /** 891 * This method (both connectorName and queueName are using regex to match) 892 * 1. stop the connector (supposed the user input the connector which the 893 * clients connect to) 2. to check whether there is any pending message on 894 * the queues defined by queueName 3. supposedly, after stop the connector, 895 * client should failover to other broker and pending messages should be 896 * forwarded. if no pending messages, the method finally call stop to stop 897 * the broker. 898 * 899 * @param connectorName 900 * @param queueName 901 * @param timeout 902 * @param pollInterval 903 * @throws Exception 904 */ 905 public void stopGracefully(String connectorName, String queueName, long timeout, long pollInterval) throws Exception { 906 if (isUseJmx()) { 907 if (connectorName == null || queueName == null || timeout <= 0) { 908 throw new Exception( 909 "connectorName and queueName cannot be null and timeout should be >0 for stopGracefully."); 910 } 911 if (pollInterval <= 0) { 912 pollInterval = 30; 913 } 914 LOG.info("Stop gracefully with connectorName: {} queueName: {} timeout: {} pollInterval: {}", new Object[]{ 915 connectorName, queueName, timeout, pollInterval 916 }); 917 TransportConnector connector; 918 for (int i = 0; i < transportConnectors.size(); i++) { 919 connector = transportConnectors.get(i); 920 if (connector != null && connector.getName() != null && connector.getName().matches(connectorName)) { 921 connector.stop(); 922 } 923 } 924 long start = System.currentTimeMillis(); 925 while (System.currentTimeMillis() - start < timeout * 1000) { 926 // check quesize until it gets zero 927 if (checkQueueSize(queueName)) { 928 stop(); 929 break; 930 } else { 931 Thread.sleep(pollInterval * 1000); 932 } 933 } 934 if (stopped.get()) { 935 LOG.info("Successfully stop the broker."); 936 } else { 937 LOG.info("There is still pending message on the queue. Please check and stop the broker manually."); 938 } 939 } 940 } 941 942 /** 943 * A helper method to block the caller thread until the broker has been 944 * stopped 945 */ 946 public void waitUntilStopped() { 947 while (isStarted() && !stopped.get()) { 948 try { 949 stoppedLatch.await(); 950 } catch (InterruptedException e) { 951 // ignore 952 } 953 } 954 } 955 956 public boolean isStopped() { 957 return stopped.get(); 958 } 959 960 /** 961 * A helper method to block the caller thread until the broker has fully started 962 * @return boolean true if wait succeeded false if broker was not started or was stopped 963 */ 964 public boolean waitUntilStarted() { 965 return waitUntilStarted(DEFAULT_START_TIMEOUT); 966 } 967 968 /** 969 * A helper method to block the caller thread until the broker has fully started 970 * 971 * @param timeout 972 * the amount of time to wait before giving up and returning false. 973 * 974 * @return boolean true if wait succeeded false if broker was not started or was stopped 975 */ 976 public boolean waitUntilStarted(long timeout) { 977 boolean waitSucceeded = isStarted(); 978 long expiration = Math.max(0, timeout + System.currentTimeMillis()); 979 while (!isStarted() && !stopped.get() && !waitSucceeded && expiration > System.currentTimeMillis()) { 980 try { 981 if (getStartException() != null) { 982 return waitSucceeded; 983 } 984 waitSucceeded = startedLatch.await(100L, TimeUnit.MILLISECONDS); 985 } catch (InterruptedException ignore) { 986 } 987 } 988 return waitSucceeded; 989 } 990 991 // Properties 992 // ------------------------------------------------------------------------- 993 /** 994 * Returns the message broker 995 */ 996 public Broker getBroker() throws Exception { 997 if (broker == null) { 998 checkStartException(); 999 broker = createBroker(); 1000 } 1001 return broker; 1002 } 1003 1004 /** 1005 * Returns the administration view of the broker; used to create and destroy 1006 * resources such as queues and topics. Note this method returns null if JMX 1007 * is disabled. 1008 */ 1009 public BrokerView getAdminView() throws Exception { 1010 if (adminView == null) { 1011 // force lazy creation 1012 getBroker(); 1013 } 1014 return adminView; 1015 } 1016 1017 public void setAdminView(BrokerView adminView) { 1018 this.adminView = adminView; 1019 } 1020 1021 public String getBrokerName() { 1022 return brokerName; 1023 } 1024 1025 /** 1026 * Sets the name of this broker; which must be unique in the network 1027 * 1028 * @param brokerName 1029 */ 1030 private static final String brokerNameReplacedCharsRegExp = "[^a-zA-Z0-9\\.\\_\\-\\:]"; 1031 public void setBrokerName(String brokerName) { 1032 if (brokerName == null) { 1033 throw new NullPointerException("The broker name cannot be null"); 1034 } 1035 String str = brokerName.replaceAll(brokerNameReplacedCharsRegExp, "_"); 1036 if (!str.equals(brokerName)) { 1037 LOG.error("Broker Name: {} contained illegal characters matching regExp: {} - replaced with {}", brokerName, brokerNameReplacedCharsRegExp, str); 1038 } 1039 this.brokerName = str.trim(); 1040 } 1041 1042 public PersistenceAdapterFactory getPersistenceFactory() { 1043 return persistenceFactory; 1044 } 1045 1046 public File getDataDirectoryFile() { 1047 if (dataDirectoryFile == null) { 1048 dataDirectoryFile = new File(IOHelper.getDefaultDataDirectory()); 1049 } 1050 return dataDirectoryFile; 1051 } 1052 1053 public File getBrokerDataDirectory() { 1054 String brokerDir = getBrokerName(); 1055 return new File(getDataDirectoryFile(), brokerDir); 1056 } 1057 1058 /** 1059 * Sets the directory in which the data files will be stored by default for 1060 * the JDBC and Journal persistence adaptors. 1061 * 1062 * @param dataDirectory 1063 * the directory to store data files 1064 */ 1065 public void setDataDirectory(String dataDirectory) { 1066 setDataDirectoryFile(new File(dataDirectory)); 1067 } 1068 1069 /** 1070 * Sets the directory in which the data files will be stored by default for 1071 * the JDBC and Journal persistence adaptors. 1072 * 1073 * @param dataDirectoryFile 1074 * the directory to store data files 1075 */ 1076 public void setDataDirectoryFile(File dataDirectoryFile) { 1077 this.dataDirectoryFile = dataDirectoryFile; 1078 } 1079 1080 /** 1081 * @return the tmpDataDirectory 1082 */ 1083 public File getTmpDataDirectory() { 1084 if (tmpDataDirectory == null) { 1085 tmpDataDirectory = new File(getBrokerDataDirectory(), "tmp_storage"); 1086 } 1087 return tmpDataDirectory; 1088 } 1089 1090 /** 1091 * @param tmpDataDirectory 1092 * the tmpDataDirectory to set 1093 */ 1094 public void setTmpDataDirectory(File tmpDataDirectory) { 1095 this.tmpDataDirectory = tmpDataDirectory; 1096 } 1097 1098 public void setPersistenceFactory(PersistenceAdapterFactory persistenceFactory) { 1099 this.persistenceFactory = persistenceFactory; 1100 } 1101 1102 public void setDestinationFactory(DestinationFactory destinationFactory) { 1103 this.destinationFactory = destinationFactory; 1104 } 1105 1106 public boolean isPersistent() { 1107 return persistent; 1108 } 1109 1110 /** 1111 * Sets whether or not persistence is enabled or disabled. 1112 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" 1113 */ 1114 public void setPersistent(boolean persistent) { 1115 this.persistent = persistent; 1116 } 1117 1118 public boolean isPopulateJMSXUserID() { 1119 return populateJMSXUserID; 1120 } 1121 1122 /** 1123 * Sets whether or not the broker should populate the JMSXUserID header. 1124 */ 1125 public void setPopulateJMSXUserID(boolean populateJMSXUserID) { 1126 this.populateJMSXUserID = populateJMSXUserID; 1127 } 1128 1129 public SystemUsage getSystemUsage() { 1130 try { 1131 if (systemUsage == null) { 1132 1133 systemUsage = new SystemUsage("Main", getPersistenceAdapter(), getTempDataStore(), getJobSchedulerStore()); 1134 systemUsage.setExecutor(getExecutor()); 1135 systemUsage.getMemoryUsage().setLimit(1024L * 1024 * 1024 * 1); // 1 GB 1136 systemUsage.getTempUsage().setLimit(1024L * 1024 * 1024 * 50); // 50 GB 1137 systemUsage.getStoreUsage().setLimit(1024L * 1024 * 1024 * 100); // 100 GB 1138 systemUsage.getJobSchedulerUsage().setLimit(1024L * 1024 * 1024 * 50); // 50 GB 1139 addService(this.systemUsage); 1140 } 1141 return systemUsage; 1142 } catch (IOException e) { 1143 LOG.error("Cannot create SystemUsage", e); 1144 throw new RuntimeException("Fatally failed to create SystemUsage" + e.getMessage(), e); 1145 } 1146 } 1147 1148 public void setSystemUsage(SystemUsage memoryManager) { 1149 if (this.systemUsage != null) { 1150 removeService(this.systemUsage); 1151 } 1152 this.systemUsage = memoryManager; 1153 if (this.systemUsage.getExecutor()==null) { 1154 this.systemUsage.setExecutor(getExecutor()); 1155 } 1156 addService(this.systemUsage); 1157 } 1158 1159 /** 1160 * @return the consumerUsageManager 1161 * @throws IOException 1162 */ 1163 public SystemUsage getConsumerSystemUsage() throws IOException { 1164 if (this.consumerSystemUsaage == null) { 1165 if (splitSystemUsageForProducersConsumers) { 1166 this.consumerSystemUsaage = new SystemUsage(getSystemUsage(), "Consumer"); 1167 float portion = consumerSystemUsagePortion / 100f; 1168 this.consumerSystemUsaage.getMemoryUsage().setUsagePortion(portion); 1169 addService(this.consumerSystemUsaage); 1170 } else { 1171 consumerSystemUsaage = getSystemUsage(); 1172 } 1173 } 1174 return this.consumerSystemUsaage; 1175 } 1176 1177 /** 1178 * @param consumerSystemUsaage 1179 * the storeSystemUsage to set 1180 */ 1181 public void setConsumerSystemUsage(SystemUsage consumerSystemUsaage) { 1182 if (this.consumerSystemUsaage != null) { 1183 removeService(this.consumerSystemUsaage); 1184 } 1185 this.consumerSystemUsaage = consumerSystemUsaage; 1186 addService(this.consumerSystemUsaage); 1187 } 1188 1189 /** 1190 * @return the producerUsageManager 1191 * @throws IOException 1192 */ 1193 public SystemUsage getProducerSystemUsage() throws IOException { 1194 if (producerSystemUsage == null) { 1195 if (splitSystemUsageForProducersConsumers) { 1196 producerSystemUsage = new SystemUsage(getSystemUsage(), "Producer"); 1197 float portion = producerSystemUsagePortion / 100f; 1198 producerSystemUsage.getMemoryUsage().setUsagePortion(portion); 1199 addService(producerSystemUsage); 1200 } else { 1201 producerSystemUsage = getSystemUsage(); 1202 } 1203 } 1204 return producerSystemUsage; 1205 } 1206 1207 /** 1208 * @param producerUsageManager 1209 * the producerUsageManager to set 1210 */ 1211 public void setProducerSystemUsage(SystemUsage producerUsageManager) { 1212 if (this.producerSystemUsage != null) { 1213 removeService(this.producerSystemUsage); 1214 } 1215 this.producerSystemUsage = producerUsageManager; 1216 addService(this.producerSystemUsage); 1217 } 1218 1219 public synchronized PersistenceAdapter getPersistenceAdapter() throws IOException { 1220 if (persistenceAdapter == null && !hasStartException()) { 1221 persistenceAdapter = createPersistenceAdapter(); 1222 configureService(persistenceAdapter); 1223 this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter); 1224 } 1225 return persistenceAdapter; 1226 } 1227 1228 /** 1229 * Sets the persistence adaptor implementation to use for this broker 1230 * 1231 * @throws IOException 1232 */ 1233 public void setPersistenceAdapter(PersistenceAdapter persistenceAdapter) throws IOException { 1234 if (!isPersistent() && ! (persistenceAdapter instanceof MemoryPersistenceAdapter)) { 1235 LOG.warn("persistent=\"false\", ignoring configured persistenceAdapter: {}", persistenceAdapter); 1236 return; 1237 } 1238 this.persistenceAdapter = persistenceAdapter; 1239 configureService(this.persistenceAdapter); 1240 this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter); 1241 } 1242 1243 public TaskRunnerFactory getTaskRunnerFactory() { 1244 if (this.taskRunnerFactory == null) { 1245 this.taskRunnerFactory = new TaskRunnerFactory("ActiveMQ BrokerService["+getBrokerName()+"] Task", getTaskRunnerPriority(), true, 1000, 1246 isDedicatedTaskRunner()); 1247 this.taskRunnerFactory.setThreadClassLoader(this.getClass().getClassLoader()); 1248 } 1249 return this.taskRunnerFactory; 1250 } 1251 1252 public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) { 1253 this.taskRunnerFactory = taskRunnerFactory; 1254 } 1255 1256 public TaskRunnerFactory getPersistenceTaskRunnerFactory() { 1257 if (taskRunnerFactory == null) { 1258 persistenceTaskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor Task", persistenceThreadPriority, 1259 true, 1000, isDedicatedTaskRunner()); 1260 } 1261 return persistenceTaskRunnerFactory; 1262 } 1263 1264 public void setPersistenceTaskRunnerFactory(TaskRunnerFactory persistenceTaskRunnerFactory) { 1265 this.persistenceTaskRunnerFactory = persistenceTaskRunnerFactory; 1266 } 1267 1268 public boolean isUseJmx() { 1269 return useJmx; 1270 } 1271 1272 public boolean isEnableStatistics() { 1273 return enableStatistics; 1274 } 1275 1276 /** 1277 * Sets whether or not the Broker's services enable statistics or not. 1278 */ 1279 public void setEnableStatistics(boolean enableStatistics) { 1280 this.enableStatistics = enableStatistics; 1281 } 1282 1283 /** 1284 * Sets whether or not the Broker's services should be exposed into JMX or 1285 * not. 1286 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" 1287 */ 1288 public void setUseJmx(boolean useJmx) { 1289 this.useJmx = useJmx; 1290 } 1291 1292 public ObjectName getBrokerObjectName() throws MalformedObjectNameException { 1293 if (brokerObjectName == null) { 1294 brokerObjectName = createBrokerObjectName(); 1295 } 1296 return brokerObjectName; 1297 } 1298 1299 /** 1300 * Sets the JMX ObjectName for this broker 1301 */ 1302 public void setBrokerObjectName(ObjectName brokerObjectName) { 1303 this.brokerObjectName = brokerObjectName; 1304 } 1305 1306 public ManagementContext getManagementContext() { 1307 if (managementContext == null) { 1308 checkStartException(); 1309 managementContext = new ManagementContext(); 1310 } 1311 return managementContext; 1312 } 1313 1314 synchronized private void checkStartException() { 1315 if (startException != null) { 1316 throw new BrokerStoppedException(startException); 1317 } 1318 } 1319 1320 synchronized private boolean hasStartException() { 1321 return startException != null; 1322 } 1323 1324 synchronized private void setStartException(Throwable t) { 1325 startException = t; 1326 } 1327 1328 public void setManagementContext(ManagementContext managementContext) { 1329 this.managementContext = managementContext; 1330 } 1331 1332 public NetworkConnector getNetworkConnectorByName(String connectorName) { 1333 for (NetworkConnector connector : networkConnectors) { 1334 if (connector.getName().equals(connectorName)) { 1335 return connector; 1336 } 1337 } 1338 return null; 1339 } 1340 1341 public String[] getNetworkConnectorURIs() { 1342 return networkConnectorURIs; 1343 } 1344 1345 public void setNetworkConnectorURIs(String[] networkConnectorURIs) { 1346 this.networkConnectorURIs = networkConnectorURIs; 1347 } 1348 1349 public TransportConnector getConnectorByName(String connectorName) { 1350 for (TransportConnector connector : transportConnectors) { 1351 if (connector.getName().equals(connectorName)) { 1352 return connector; 1353 } 1354 } 1355 return null; 1356 } 1357 1358 public Map<String, String> getTransportConnectorURIsAsMap() { 1359 Map<String, String> answer = new HashMap<String, String>(); 1360 for (TransportConnector connector : transportConnectors) { 1361 try { 1362 URI uri = connector.getConnectUri(); 1363 if (uri != null) { 1364 String scheme = uri.getScheme(); 1365 if (scheme != null) { 1366 answer.put(scheme.toLowerCase(Locale.ENGLISH), uri.toString()); 1367 } 1368 } 1369 } catch (Exception e) { 1370 LOG.debug("Failed to read URI to build transportURIsAsMap", e); 1371 } 1372 } 1373 return answer; 1374 } 1375 1376 public ProducerBrokerExchange getProducerBrokerExchange(ProducerInfo producerInfo){ 1377 ProducerBrokerExchange result = null; 1378 1379 for (TransportConnector connector : transportConnectors) { 1380 for (TransportConnection tc: connector.getConnections()){ 1381 result = tc.getProducerBrokerExchangeIfExists(producerInfo); 1382 if (result !=null){ 1383 return result; 1384 } 1385 } 1386 } 1387 return result; 1388 } 1389 1390 public String[] getTransportConnectorURIs() { 1391 return transportConnectorURIs; 1392 } 1393 1394 public void setTransportConnectorURIs(String[] transportConnectorURIs) { 1395 this.transportConnectorURIs = transportConnectorURIs; 1396 } 1397 1398 /** 1399 * @return Returns the jmsBridgeConnectors. 1400 */ 1401 public JmsConnector[] getJmsBridgeConnectors() { 1402 return jmsBridgeConnectors; 1403 } 1404 1405 /** 1406 * @param jmsConnectors 1407 * The jmsBridgeConnectors to set. 1408 */ 1409 public void setJmsBridgeConnectors(JmsConnector[] jmsConnectors) { 1410 this.jmsBridgeConnectors = jmsConnectors; 1411 } 1412 1413 public Service[] getServices() { 1414 return services.toArray(new Service[0]); 1415 } 1416 1417 /** 1418 * Sets the services associated with this broker. 1419 */ 1420 public void setServices(Service[] services) { 1421 this.services.clear(); 1422 if (services != null) { 1423 for (int i = 0; i < services.length; i++) { 1424 this.services.add(services[i]); 1425 } 1426 } 1427 } 1428 1429 /** 1430 * Adds a new service so that it will be started as part of the broker 1431 * lifecycle 1432 */ 1433 public void addService(Service service) { 1434 services.add(service); 1435 } 1436 1437 public void removeService(Service service) { 1438 services.remove(service); 1439 } 1440 1441 public boolean isUseLoggingForShutdownErrors() { 1442 return useLoggingForShutdownErrors; 1443 } 1444 1445 /** 1446 * Sets whether or not we should use commons-logging when reporting errors 1447 * when shutting down the broker 1448 */ 1449 public void setUseLoggingForShutdownErrors(boolean useLoggingForShutdownErrors) { 1450 this.useLoggingForShutdownErrors = useLoggingForShutdownErrors; 1451 } 1452 1453 public boolean isUseShutdownHook() { 1454 return useShutdownHook; 1455 } 1456 1457 /** 1458 * Sets whether or not we should use a shutdown handler to close down the 1459 * broker cleanly if the JVM is terminated. It is recommended you leave this 1460 * enabled. 1461 */ 1462 public void setUseShutdownHook(boolean useShutdownHook) { 1463 this.useShutdownHook = useShutdownHook; 1464 } 1465 1466 public boolean isAdvisorySupport() { 1467 return advisorySupport; 1468 } 1469 1470 /** 1471 * Allows the support of advisory messages to be disabled for performance 1472 * reasons. 1473 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" 1474 */ 1475 public void setAdvisorySupport(boolean advisorySupport) { 1476 this.advisorySupport = advisorySupport; 1477 } 1478 1479 public List<TransportConnector> getTransportConnectors() { 1480 return new ArrayList<TransportConnector>(transportConnectors); 1481 } 1482 1483 /** 1484 * Sets the transport connectors which this broker will listen on for new 1485 * clients 1486 * 1487 * @org.apache.xbean.Property 1488 * nestedType="org.apache.activemq.broker.TransportConnector" 1489 */ 1490 public void setTransportConnectors(List<TransportConnector> transportConnectors) throws Exception { 1491 for (TransportConnector connector : transportConnectors) { 1492 addConnector(connector); 1493 } 1494 } 1495 1496 public TransportConnector getTransportConnectorByName(String name){ 1497 for (TransportConnector transportConnector : transportConnectors){ 1498 if (name.equals(transportConnector.getName())){ 1499 return transportConnector; 1500 } 1501 } 1502 return null; 1503 } 1504 1505 public TransportConnector getTransportConnectorByScheme(String scheme){ 1506 for (TransportConnector transportConnector : transportConnectors){ 1507 if (scheme.equals(transportConnector.getUri().getScheme())){ 1508 return transportConnector; 1509 } 1510 } 1511 return null; 1512 } 1513 1514 public List<NetworkConnector> getNetworkConnectors() { 1515 return new ArrayList<NetworkConnector>(networkConnectors); 1516 } 1517 1518 public List<ProxyConnector> getProxyConnectors() { 1519 return new ArrayList<ProxyConnector>(proxyConnectors); 1520 } 1521 1522 /** 1523 * Sets the network connectors which this broker will use to connect to 1524 * other brokers in a federated network 1525 * 1526 * @org.apache.xbean.Property 1527 * nestedType="org.apache.activemq.network.NetworkConnector" 1528 */ 1529 public void setNetworkConnectors(List<?> networkConnectors) throws Exception { 1530 for (Object connector : networkConnectors) { 1531 addNetworkConnector((NetworkConnector) connector); 1532 } 1533 } 1534 1535 /** 1536 * Sets the network connectors which this broker will use to connect to 1537 * other brokers in a federated network 1538 */ 1539 public void setProxyConnectors(List<?> proxyConnectors) throws Exception { 1540 for (Object connector : proxyConnectors) { 1541 addProxyConnector((ProxyConnector) connector); 1542 } 1543 } 1544 1545 public PolicyMap getDestinationPolicy() { 1546 return destinationPolicy; 1547 } 1548 1549 /** 1550 * Sets the destination specific policies available either for exact 1551 * destinations or for wildcard areas of destinations. 1552 */ 1553 public void setDestinationPolicy(PolicyMap policyMap) { 1554 this.destinationPolicy = policyMap; 1555 } 1556 1557 public BrokerPlugin[] getPlugins() { 1558 return plugins; 1559 } 1560 1561 /** 1562 * Sets a number of broker plugins to install such as for security 1563 * authentication or authorization 1564 */ 1565 public void setPlugins(BrokerPlugin[] plugins) { 1566 this.plugins = plugins; 1567 } 1568 1569 public MessageAuthorizationPolicy getMessageAuthorizationPolicy() { 1570 return messageAuthorizationPolicy; 1571 } 1572 1573 /** 1574 * Sets the policy used to decide if the current connection is authorized to 1575 * consume a given message 1576 */ 1577 public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) { 1578 this.messageAuthorizationPolicy = messageAuthorizationPolicy; 1579 } 1580 1581 /** 1582 * Delete all messages from the persistent store 1583 * 1584 * @throws IOException 1585 */ 1586 public void deleteAllMessages() throws IOException { 1587 getPersistenceAdapter().deleteAllMessages(); 1588 } 1589 1590 public boolean isDeleteAllMessagesOnStartup() { 1591 return deleteAllMessagesOnStartup; 1592 } 1593 1594 /** 1595 * Sets whether or not all messages are deleted on startup - mostly only 1596 * useful for testing. 1597 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" 1598 */ 1599 public void setDeleteAllMessagesOnStartup(boolean deletePersistentMessagesOnStartup) { 1600 this.deleteAllMessagesOnStartup = deletePersistentMessagesOnStartup; 1601 } 1602 1603 public URI getVmConnectorURI() { 1604 if (vmConnectorURI == null) { 1605 try { 1606 vmConnectorURI = new URI("vm://" + getBrokerName()); 1607 } catch (URISyntaxException e) { 1608 LOG.error("Badly formed URI from {}", getBrokerName(), e); 1609 } 1610 } 1611 return vmConnectorURI; 1612 } 1613 1614 public void setVmConnectorURI(URI vmConnectorURI) { 1615 this.vmConnectorURI = vmConnectorURI; 1616 } 1617 1618 public String getDefaultSocketURIString() { 1619 if (started.get()) { 1620 if (this.defaultSocketURIString == null) { 1621 for (TransportConnector tc:this.transportConnectors) { 1622 String result = null; 1623 try { 1624 result = tc.getPublishableConnectString(); 1625 } catch (Exception e) { 1626 LOG.warn("Failed to get the ConnectURI for {}", tc, e); 1627 } 1628 if (result != null) { 1629 // find first publishable uri 1630 if (tc.isUpdateClusterClients() || tc.isRebalanceClusterClients()) { 1631 this.defaultSocketURIString = result; 1632 break; 1633 } else { 1634 // or use the first defined 1635 if (this.defaultSocketURIString == null) { 1636 this.defaultSocketURIString = result; 1637 } 1638 } 1639 } 1640 } 1641 1642 } 1643 return this.defaultSocketURIString; 1644 } 1645 return null; 1646 } 1647 1648 /** 1649 * @return Returns the shutdownOnMasterFailure. 1650 */ 1651 public boolean isShutdownOnMasterFailure() { 1652 return shutdownOnMasterFailure; 1653 } 1654 1655 /** 1656 * @param shutdownOnMasterFailure 1657 * The shutdownOnMasterFailure to set. 1658 */ 1659 public void setShutdownOnMasterFailure(boolean shutdownOnMasterFailure) { 1660 this.shutdownOnMasterFailure = shutdownOnMasterFailure; 1661 } 1662 1663 public boolean isKeepDurableSubsActive() { 1664 return keepDurableSubsActive; 1665 } 1666 1667 public void setKeepDurableSubsActive(boolean keepDurableSubsActive) { 1668 this.keepDurableSubsActive = keepDurableSubsActive; 1669 } 1670 1671 public boolean isUseVirtualTopics() { 1672 return useVirtualTopics; 1673 } 1674 1675 /** 1676 * Sets whether or not <a 1677 * href="http://activemq.apache.org/virtual-destinations.html">Virtual 1678 * Topics</a> should be supported by default if they have not been 1679 * explicitly configured. 1680 */ 1681 public void setUseVirtualTopics(boolean useVirtualTopics) { 1682 this.useVirtualTopics = useVirtualTopics; 1683 } 1684 1685 public DestinationInterceptor[] getDestinationInterceptors() { 1686 return destinationInterceptors; 1687 } 1688 1689 public boolean isUseMirroredQueues() { 1690 return useMirroredQueues; 1691 } 1692 1693 /** 1694 * Sets whether or not <a 1695 * href="http://activemq.apache.org/mirrored-queues.html">Mirrored 1696 * Queues</a> should be supported by default if they have not been 1697 * explicitly configured. 1698 */ 1699 public void setUseMirroredQueues(boolean useMirroredQueues) { 1700 this.useMirroredQueues = useMirroredQueues; 1701 } 1702 1703 /** 1704 * Sets the destination interceptors to use 1705 */ 1706 public void setDestinationInterceptors(DestinationInterceptor[] destinationInterceptors) { 1707 this.destinationInterceptors = destinationInterceptors; 1708 } 1709 1710 public ActiveMQDestination[] getDestinations() { 1711 return destinations; 1712 } 1713 1714 /** 1715 * Sets the destinations which should be loaded/created on startup 1716 */ 1717 public void setDestinations(ActiveMQDestination[] destinations) { 1718 this.destinations = destinations; 1719 } 1720 1721 /** 1722 * @return the tempDataStore 1723 */ 1724 public synchronized PListStore getTempDataStore() { 1725 if (tempDataStore == null) { 1726 if (!isPersistent()) { 1727 return null; 1728 } 1729 1730 try { 1731 PersistenceAdapter pa = getPersistenceAdapter(); 1732 if( pa!=null && pa instanceof PListStore) { 1733 return (PListStore) pa; 1734 } 1735 } catch (IOException e) { 1736 throw new RuntimeException(e); 1737 } 1738 1739 try { 1740 String clazz = "org.apache.activemq.store.kahadb.plist.PListStoreImpl"; 1741 this.tempDataStore = (PListStore) getClass().getClassLoader().loadClass(clazz).newInstance(); 1742 this.tempDataStore.setDirectory(getTmpDataDirectory()); 1743 configureService(tempDataStore); 1744 } catch (Exception e) { 1745 throw new RuntimeException(e); 1746 } 1747 } 1748 return tempDataStore; 1749 } 1750 1751 /** 1752 * @param tempDataStore 1753 * the tempDataStore to set 1754 */ 1755 public void setTempDataStore(PListStore tempDataStore) { 1756 this.tempDataStore = tempDataStore; 1757 configureService(tempDataStore); 1758 } 1759 1760 public int getPersistenceThreadPriority() { 1761 return persistenceThreadPriority; 1762 } 1763 1764 public void setPersistenceThreadPriority(int persistenceThreadPriority) { 1765 this.persistenceThreadPriority = persistenceThreadPriority; 1766 } 1767 1768 /** 1769 * @return the useLocalHostBrokerName 1770 */ 1771 public boolean isUseLocalHostBrokerName() { 1772 return this.useLocalHostBrokerName; 1773 } 1774 1775 /** 1776 * @param useLocalHostBrokerName 1777 * the useLocalHostBrokerName to set 1778 */ 1779 public void setUseLocalHostBrokerName(boolean useLocalHostBrokerName) { 1780 this.useLocalHostBrokerName = useLocalHostBrokerName; 1781 if (useLocalHostBrokerName && !started.get() && brokerName == null || brokerName == DEFAULT_BROKER_NAME) { 1782 brokerName = LOCAL_HOST_NAME; 1783 } 1784 } 1785 1786 /** 1787 * Looks up and lazily creates if necessary the destination for the given 1788 * JMS name 1789 */ 1790 public Destination getDestination(ActiveMQDestination destination) throws Exception { 1791 return getBroker().addDestination(getAdminConnectionContext(), destination,false); 1792 } 1793 1794 public void removeDestination(ActiveMQDestination destination) throws Exception { 1795 getBroker().removeDestination(getAdminConnectionContext(), destination, 0); 1796 } 1797 1798 public int getProducerSystemUsagePortion() { 1799 return producerSystemUsagePortion; 1800 } 1801 1802 public void setProducerSystemUsagePortion(int producerSystemUsagePortion) { 1803 this.producerSystemUsagePortion = producerSystemUsagePortion; 1804 } 1805 1806 public int getConsumerSystemUsagePortion() { 1807 return consumerSystemUsagePortion; 1808 } 1809 1810 public void setConsumerSystemUsagePortion(int consumerSystemUsagePortion) { 1811 this.consumerSystemUsagePortion = consumerSystemUsagePortion; 1812 } 1813 1814 public boolean isSplitSystemUsageForProducersConsumers() { 1815 return splitSystemUsageForProducersConsumers; 1816 } 1817 1818 public void setSplitSystemUsageForProducersConsumers(boolean splitSystemUsageForProducersConsumers) { 1819 this.splitSystemUsageForProducersConsumers = splitSystemUsageForProducersConsumers; 1820 } 1821 1822 public boolean isMonitorConnectionSplits() { 1823 return monitorConnectionSplits; 1824 } 1825 1826 public void setMonitorConnectionSplits(boolean monitorConnectionSplits) { 1827 this.monitorConnectionSplits = monitorConnectionSplits; 1828 } 1829 1830 public int getTaskRunnerPriority() { 1831 return taskRunnerPriority; 1832 } 1833 1834 public void setTaskRunnerPriority(int taskRunnerPriority) { 1835 this.taskRunnerPriority = taskRunnerPriority; 1836 } 1837 1838 public boolean isDedicatedTaskRunner() { 1839 return dedicatedTaskRunner; 1840 } 1841 1842 public void setDedicatedTaskRunner(boolean dedicatedTaskRunner) { 1843 this.dedicatedTaskRunner = dedicatedTaskRunner; 1844 } 1845 1846 public boolean isCacheTempDestinations() { 1847 return cacheTempDestinations; 1848 } 1849 1850 public void setCacheTempDestinations(boolean cacheTempDestinations) { 1851 this.cacheTempDestinations = cacheTempDestinations; 1852 } 1853 1854 public int getTimeBeforePurgeTempDestinations() { 1855 return timeBeforePurgeTempDestinations; 1856 } 1857 1858 public void setTimeBeforePurgeTempDestinations(int timeBeforePurgeTempDestinations) { 1859 this.timeBeforePurgeTempDestinations = timeBeforePurgeTempDestinations; 1860 } 1861 1862 public boolean isUseTempMirroredQueues() { 1863 return useTempMirroredQueues; 1864 } 1865 1866 public void setUseTempMirroredQueues(boolean useTempMirroredQueues) { 1867 this.useTempMirroredQueues = useTempMirroredQueues; 1868 } 1869 1870 public synchronized JobSchedulerStore getJobSchedulerStore() { 1871 1872 // If support is off don't allow any scheduler even is user configured their own. 1873 if (!isSchedulerSupport()) { 1874 return null; 1875 } 1876 1877 // If the user configured their own we use it even if persistence is disabled since 1878 // we don't know anything about their implementation. 1879 if (jobSchedulerStore == null) { 1880 1881 if (!isPersistent()) { 1882 this.jobSchedulerStore = new InMemoryJobSchedulerStore(); 1883 configureService(jobSchedulerStore); 1884 return this.jobSchedulerStore; 1885 } 1886 1887 try { 1888 PersistenceAdapter pa = getPersistenceAdapter(); 1889 if (pa != null) { 1890 this.jobSchedulerStore = pa.createJobSchedulerStore(); 1891 jobSchedulerStore.setDirectory(getSchedulerDirectoryFile()); 1892 configureService(jobSchedulerStore); 1893 return this.jobSchedulerStore; 1894 } 1895 } catch (IOException e) { 1896 throw new RuntimeException(e); 1897 } catch (UnsupportedOperationException ex) { 1898 // It's ok if the store doesn't implement a scheduler. 1899 } catch (Exception e) { 1900 throw new RuntimeException(e); 1901 } 1902 1903 try { 1904 PersistenceAdapter pa = getPersistenceAdapter(); 1905 if (pa != null && pa instanceof JobSchedulerStore) { 1906 this.jobSchedulerStore = (JobSchedulerStore) pa; 1907 configureService(jobSchedulerStore); 1908 return this.jobSchedulerStore; 1909 } 1910 } catch (IOException e) { 1911 throw new RuntimeException(e); 1912 } 1913 1914 // Load the KahaDB store as a last resort, this only works if KahaDB is 1915 // included at runtime, otherwise this will fail. User should disable 1916 // scheduler support if this fails. 1917 try { 1918 String clazz = "org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter"; 1919 PersistenceAdapter adaptor = (PersistenceAdapter)getClass().getClassLoader().loadClass(clazz).newInstance(); 1920 jobSchedulerStore = adaptor.createJobSchedulerStore(); 1921 jobSchedulerStore.setDirectory(getSchedulerDirectoryFile()); 1922 configureService(jobSchedulerStore); 1923 LOG.info("JobScheduler using directory: {}", getSchedulerDirectoryFile()); 1924 } catch (Exception e) { 1925 throw new RuntimeException(e); 1926 } 1927 } 1928 return jobSchedulerStore; 1929 } 1930 1931 public void setJobSchedulerStore(JobSchedulerStore jobSchedulerStore) { 1932 this.jobSchedulerStore = jobSchedulerStore; 1933 configureService(jobSchedulerStore); 1934 } 1935 1936 // 1937 // Implementation methods 1938 // ------------------------------------------------------------------------- 1939 /** 1940 * Handles any lazy-creation helper properties which are added to make 1941 * things easier to configure inside environments such as Spring 1942 * 1943 * @throws Exception 1944 */ 1945 protected void processHelperProperties() throws Exception { 1946 if (transportConnectorURIs != null) { 1947 for (int i = 0; i < transportConnectorURIs.length; i++) { 1948 String uri = transportConnectorURIs[i]; 1949 addConnector(uri); 1950 } 1951 } 1952 if (networkConnectorURIs != null) { 1953 for (int i = 0; i < networkConnectorURIs.length; i++) { 1954 String uri = networkConnectorURIs[i]; 1955 addNetworkConnector(uri); 1956 } 1957 } 1958 if (jmsBridgeConnectors != null) { 1959 for (int i = 0; i < jmsBridgeConnectors.length; i++) { 1960 addJmsConnector(jmsBridgeConnectors[i]); 1961 } 1962 } 1963 } 1964 1965 /** 1966 * Check that the store usage limit is not greater than max usable 1967 * space and adjust if it is 1968 */ 1969 protected void checkStoreUsageLimits() throws Exception { 1970 final SystemUsage usage = getSystemUsage(); 1971 1972 if (getPersistenceAdapter() != null) { 1973 PersistenceAdapter adapter = getPersistenceAdapter(); 1974 File dir = adapter.getDirectory(); 1975 1976 if (dir != null) { 1977 String dirPath = dir.getAbsolutePath(); 1978 if (!dir.isAbsolute()) { 1979 dir = new File(dirPath); 1980 } 1981 1982 while (dir != null && !dir.isDirectory()) { 1983 dir = dir.getParentFile(); 1984 } 1985 long storeLimit = usage.getStoreUsage().getLimit(); 1986 long storeCurrent = usage.getStoreUsage().getUsage(); 1987 long dirFreeSpace = dir.getUsableSpace(); 1988 if (storeLimit > (dirFreeSpace + storeCurrent)) { 1989 final String message = "Store limit is " + storeLimit / (1024 * 1024) + 1990 " mb (current store usage is " + storeCurrent / (1024 * 1024) + 1991 " mb). The data directory: " + dir.getAbsolutePath() + 1992 " only has " + dirFreeSpace / (1024 * 1024) + 1993 " mb of usable space."; 1994 1995 if (isAdjustUsageLimits()) { 1996 LOG.warn(message + ", resetting to maximum available disk space: " + 1997 (dirFreeSpace + storeCurrent) / (1024 * 1024) + " mb"); 1998 usage.getStoreUsage().setLimit(dirFreeSpace + storeCurrent); 1999 } else { 2000 LOG.error(message); 2001 throw new ConfigurationException(message); 2002 } 2003 } 2004 } 2005 2006 long maxJournalFileSize = 0; 2007 long storeLimit = usage.getStoreUsage().getLimit(); 2008 2009 if (adapter instanceof JournaledStore) { 2010 maxJournalFileSize = ((JournaledStore) adapter).getJournalMaxFileLength(); 2011 } 2012 2013 if (storeLimit < maxJournalFileSize) { 2014 LOG.error("Store limit is " + storeLimit / (1024 * 1024) + 2015 " mb, whilst the max journal file size for the store is: " + 2016 maxJournalFileSize / (1024 * 1024) + " mb, " + 2017 "the store will not accept any data when used."); 2018 2019 } 2020 } 2021 } 2022 2023 /** 2024 * Check that temporary usage limit is not greater than max usable 2025 * space and adjust if it is 2026 */ 2027 protected void checkTmpStoreUsageLimits() throws Exception { 2028 final SystemUsage usage = getSystemUsage(); 2029 2030 File tmpDir = getTmpDataDirectory(); 2031 if (tmpDir != null) { 2032 2033 String tmpDirPath = tmpDir.getAbsolutePath(); 2034 if (!tmpDir.isAbsolute()) { 2035 tmpDir = new File(tmpDirPath); 2036 } 2037 2038 long storeLimit = usage.getTempUsage().getLimit(); 2039 while (tmpDir != null && !tmpDir.isDirectory()) { 2040 tmpDir = tmpDir.getParentFile(); 2041 } 2042 long dirFreeSpace = tmpDir.getUsableSpace(); 2043 if (storeLimit > dirFreeSpace) { 2044 final String message = "Temporary Store limit is " + storeLimit / (1024 * 1024) + 2045 " mb, whilst the temporary data directory: " + tmpDirPath + 2046 " only has " + dirFreeSpace / (1024 * 1024) + " mb of usable space."; 2047 2048 if (isAdjustUsageLimits()) { 2049 LOG.warn(message + ", resetting to maximum available " + 2050 dirFreeSpace / (1024 * 1024) + " mb."); 2051 usage.getTempUsage().setLimit(dirFreeSpace); 2052 } else { 2053 LOG.error(message); 2054 throw new ConfigurationException(message); 2055 } 2056 } 2057 2058 if (isPersistent()) { 2059 long maxJournalFileSize; 2060 2061 PListStore store = usage.getTempUsage().getStore(); 2062 if (store != null && store instanceof JournaledStore) { 2063 maxJournalFileSize = ((JournaledStore) store).getJournalMaxFileLength(); 2064 } else { 2065 maxJournalFileSize = DEFAULT_MAX_FILE_LENGTH; 2066 } 2067 2068 if (storeLimit < maxJournalFileSize) { 2069 LOG.error("Temporary Store limit is " + storeLimit / (1024 * 1024) + 2070 " mb, whilst the max journal file size for the temporary store is: " + 2071 maxJournalFileSize / (1024 * 1024) + " mb, " + 2072 "the temp store will not accept any data when used."); 2073 } 2074 } 2075 } 2076 } 2077 2078 protected void checkMemorySystemUsageLimits() throws Exception { 2079 final SystemUsage usage = getSystemUsage(); 2080 long memLimit = usage.getMemoryUsage().getLimit(); 2081 long jvmLimit = Runtime.getRuntime().maxMemory(); 2082 2083 if (memLimit > jvmLimit) { 2084 final String message = "Memory Usage for the Broker (" + memLimit / (1024 * 1024) 2085 + "mb) is more than the maximum available for the JVM: " + jvmLimit / (1024 * 1024); 2086 2087 if (adjustUsageLimits) { 2088 usage.getMemoryUsage().setPercentOfJvmHeap(70); 2089 LOG.warn(message + " mb - resetting to 70% of maximum available: " + (usage.getMemoryUsage().getLimit() / (1024 * 1024)) + " mb"); 2090 } else { 2091 LOG.error(message); 2092 throw new ConfigurationException(message); 2093 } 2094 } 2095 } 2096 2097 protected void checkStoreSystemUsageLimits() throws Exception { 2098 final SystemUsage usage = getSystemUsage(); 2099 2100 //Check the persistent store and temp store limits if they exist 2101 //and schedule a periodic check to update disk limits if 2102 //schedulePeriodForDiskLimitCheck is set 2103 checkStoreUsageLimits(); 2104 checkTmpStoreUsageLimits(); 2105 2106 if (getJobSchedulerStore() != null) { 2107 JobSchedulerStore scheduler = getJobSchedulerStore(); 2108 File schedulerDir = scheduler.getDirectory(); 2109 if (schedulerDir != null) { 2110 2111 String schedulerDirPath = schedulerDir.getAbsolutePath(); 2112 if (!schedulerDir.isAbsolute()) { 2113 schedulerDir = new File(schedulerDirPath); 2114 } 2115 2116 while (schedulerDir != null && !schedulerDir.isDirectory()) { 2117 schedulerDir = schedulerDir.getParentFile(); 2118 } 2119 long schedulerLimit = usage.getJobSchedulerUsage().getLimit(); 2120 long dirFreeSpace = schedulerDir.getUsableSpace(); 2121 if (schedulerLimit > dirFreeSpace) { 2122 LOG.warn("Job Scheduler Store limit is " + schedulerLimit / (1024 * 1024) + 2123 " mb, whilst the data directory: " + schedulerDir.getAbsolutePath() + 2124 " only has " + dirFreeSpace / (1024 * 1024) + " mb of usable space - resetting to " + 2125 dirFreeSpace / (1024 * 1024) + " mb."); 2126 usage.getJobSchedulerUsage().setLimit(dirFreeSpace); 2127 } 2128 } 2129 } 2130 } 2131 2132 public void stopAllConnectors(ServiceStopper stopper) { 2133 for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) { 2134 NetworkConnector connector = iter.next(); 2135 unregisterNetworkConnectorMBean(connector); 2136 stopper.stop(connector); 2137 } 2138 for (Iterator<ProxyConnector> iter = getProxyConnectors().iterator(); iter.hasNext();) { 2139 ProxyConnector connector = iter.next(); 2140 stopper.stop(connector); 2141 } 2142 for (Iterator<JmsConnector> iter = jmsConnectors.iterator(); iter.hasNext();) { 2143 JmsConnector connector = iter.next(); 2144 stopper.stop(connector); 2145 } 2146 for (Iterator<TransportConnector> iter = getTransportConnectors().iterator(); iter.hasNext();) { 2147 TransportConnector connector = iter.next(); 2148 try { 2149 unregisterConnectorMBean(connector); 2150 } catch (IOException e) { 2151 } 2152 stopper.stop(connector); 2153 } 2154 } 2155 2156 protected TransportConnector registerConnectorMBean(TransportConnector connector) throws IOException { 2157 try { 2158 ObjectName objectName = createConnectorObjectName(connector); 2159 connector = connector.asManagedConnector(getManagementContext(), objectName); 2160 ConnectorViewMBean view = new ConnectorView(connector); 2161 AnnotatedMBean.registerMBean(getManagementContext(), view, objectName); 2162 return connector; 2163 } catch (Throwable e) { 2164 throw IOExceptionSupport.create("Transport Connector could not be registered in JMX: " + e, e); 2165 } 2166 } 2167 2168 protected void unregisterConnectorMBean(TransportConnector connector) throws IOException { 2169 if (isUseJmx()) { 2170 try { 2171 ObjectName objectName = createConnectorObjectName(connector); 2172 getManagementContext().unregisterMBean(objectName); 2173 } catch (Throwable e) { 2174 throw IOExceptionSupport.create( 2175 "Transport Connector could not be unregistered in JMX: " + e.getMessage(), e); 2176 } 2177 } 2178 } 2179 2180 protected PersistenceAdapter registerPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException { 2181 return adaptor; 2182 } 2183 2184 protected void unregisterPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException { 2185 if (isUseJmx()) {} 2186 } 2187 2188 private ObjectName createConnectorObjectName(TransportConnector connector) throws MalformedObjectNameException { 2189 return BrokerMBeanSupport.createConnectorName(getBrokerObjectName(), "clientConnectors", connector.getName()); 2190 } 2191 2192 public void registerNetworkConnectorMBean(NetworkConnector connector) throws IOException { 2193 NetworkConnectorViewMBean view = new NetworkConnectorView(connector); 2194 try { 2195 ObjectName objectName = createNetworkConnectorObjectName(connector); 2196 connector.setObjectName(objectName); 2197 AnnotatedMBean.registerMBean(getManagementContext(), view, objectName); 2198 } catch (Throwable e) { 2199 throw IOExceptionSupport.create("Network Connector could not be registered in JMX: " + e.getMessage(), e); 2200 } 2201 } 2202 2203 protected ObjectName createNetworkConnectorObjectName(NetworkConnector connector) throws MalformedObjectNameException { 2204 return BrokerMBeanSupport.createNetworkConnectorName(getBrokerObjectName(), "networkConnectors", connector.getName()); 2205 } 2206 2207 public ObjectName createDuplexNetworkConnectorObjectName(String transport) throws MalformedObjectNameException { 2208 return BrokerMBeanSupport.createNetworkConnectorName(getBrokerObjectName(), "duplexNetworkConnectors", transport); 2209 } 2210 2211 protected void unregisterNetworkConnectorMBean(NetworkConnector connector) { 2212 if (isUseJmx()) { 2213 try { 2214 ObjectName objectName = createNetworkConnectorObjectName(connector); 2215 getManagementContext().unregisterMBean(objectName); 2216 } catch (Exception e) { 2217 LOG.warn("Network Connector could not be unregistered from JMX due " + e.getMessage() + ". This exception is ignored.", e); 2218 } 2219 } 2220 } 2221 2222 protected void registerProxyConnectorMBean(ProxyConnector connector) throws IOException { 2223 ProxyConnectorView view = new ProxyConnectorView(connector); 2224 try { 2225 ObjectName objectName = BrokerMBeanSupport.createNetworkConnectorName(getBrokerObjectName(), "proxyConnectors", connector.getName()); 2226 AnnotatedMBean.registerMBean(getManagementContext(), view, objectName); 2227 } catch (Throwable e) { 2228 throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e); 2229 } 2230 } 2231 2232 protected void registerJmsConnectorMBean(JmsConnector connector) throws IOException { 2233 JmsConnectorView view = new JmsConnectorView(connector); 2234 try { 2235 ObjectName objectName = BrokerMBeanSupport.createNetworkConnectorName(getBrokerObjectName(), "jmsConnectors", connector.getName()); 2236 AnnotatedMBean.registerMBean(getManagementContext(), view, objectName); 2237 } catch (Throwable e) { 2238 throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e); 2239 } 2240 } 2241 2242 /** 2243 * Factory method to create a new broker 2244 * 2245 * @throws Exception 2246 * @throws 2247 * @throws 2248 */ 2249 protected Broker createBroker() throws Exception { 2250 regionBroker = createRegionBroker(); 2251 Broker broker = addInterceptors(regionBroker); 2252 // Add a filter that will stop access to the broker once stopped 2253 broker = new MutableBrokerFilter(broker) { 2254 Broker old; 2255 2256 @Override 2257 public void stop() throws Exception { 2258 old = this.next.getAndSet(new ErrorBroker("Broker has been stopped: " + this) { 2259 // Just ignore additional stop actions. 2260 @Override 2261 public void stop() throws Exception { 2262 } 2263 }); 2264 old.stop(); 2265 } 2266 2267 @Override 2268 public void start() throws Exception { 2269 if (forceStart && old != null) { 2270 this.next.set(old); 2271 } 2272 getNext().start(); 2273 } 2274 }; 2275 return broker; 2276 } 2277 2278 /** 2279 * Factory method to create the core region broker onto which interceptors 2280 * are added 2281 * 2282 * @throws Exception 2283 */ 2284 protected Broker createRegionBroker() throws Exception { 2285 if (destinationInterceptors == null) { 2286 destinationInterceptors = createDefaultDestinationInterceptor(); 2287 } 2288 configureServices(destinationInterceptors); 2289 DestinationInterceptor destinationInterceptor = new CompositeDestinationInterceptor(destinationInterceptors); 2290 if (destinationFactory == null) { 2291 destinationFactory = new DestinationFactoryImpl(this, getTaskRunnerFactory(), getPersistenceAdapter()); 2292 } 2293 return createRegionBroker(destinationInterceptor); 2294 } 2295 2296 protected Broker createRegionBroker(DestinationInterceptor destinationInterceptor) throws IOException { 2297 RegionBroker regionBroker; 2298 if (isUseJmx()) { 2299 try { 2300 regionBroker = new ManagedRegionBroker(this, getManagementContext(), getBrokerObjectName(), 2301 getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory, destinationInterceptor,getScheduler(),getExecutor()); 2302 } catch(MalformedObjectNameException me){ 2303 LOG.warn("Cannot create ManagedRegionBroker due " + me.getMessage(), me); 2304 throw new IOException(me); 2305 } 2306 } else { 2307 regionBroker = new RegionBroker(this, getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory, 2308 destinationInterceptor,getScheduler(),getExecutor()); 2309 } 2310 destinationFactory.setRegionBroker(regionBroker); 2311 regionBroker.setKeepDurableSubsActive(keepDurableSubsActive); 2312 regionBroker.setBrokerName(getBrokerName()); 2313 regionBroker.getDestinationStatistics().setEnabled(enableStatistics); 2314 regionBroker.setAllowTempAutoCreationOnSend(isAllowTempAutoCreationOnSend()); 2315 if (brokerId != null) { 2316 regionBroker.setBrokerId(brokerId); 2317 } 2318 return regionBroker; 2319 } 2320 2321 /** 2322 * Create the default destination interceptor 2323 */ 2324 protected DestinationInterceptor[] createDefaultDestinationInterceptor() { 2325 List<DestinationInterceptor> answer = new ArrayList<DestinationInterceptor>(); 2326 if (isUseVirtualTopics()) { 2327 VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor(); 2328 VirtualTopic virtualTopic = new VirtualTopic(); 2329 virtualTopic.setName("VirtualTopic.>"); 2330 VirtualDestination[] virtualDestinations = { virtualTopic }; 2331 interceptor.setVirtualDestinations(virtualDestinations); 2332 answer.add(interceptor); 2333 } 2334 if (isUseMirroredQueues()) { 2335 MirroredQueue interceptor = new MirroredQueue(); 2336 answer.add(interceptor); 2337 } 2338 DestinationInterceptor[] array = new DestinationInterceptor[answer.size()]; 2339 answer.toArray(array); 2340 return array; 2341 } 2342 2343 /** 2344 * Strategy method to add interceptors to the broker 2345 * 2346 * @throws IOException 2347 */ 2348 protected Broker addInterceptors(Broker broker) throws Exception { 2349 if (isSchedulerSupport()) { 2350 SchedulerBroker sb = new SchedulerBroker(this, broker, getJobSchedulerStore()); 2351 if (isUseJmx()) { 2352 JobSchedulerViewMBean view = new JobSchedulerView(sb.getJobScheduler()); 2353 try { 2354 ObjectName objectName = BrokerMBeanSupport.createJobSchedulerServiceName(getBrokerObjectName()); 2355 AnnotatedMBean.registerMBean(getManagementContext(), view, objectName); 2356 this.adminView.setJMSJobScheduler(objectName); 2357 } catch (Throwable e) { 2358 throw IOExceptionSupport.create("JobScheduler could not be registered in JMX: " 2359 + e.getMessage(), e); 2360 } 2361 } 2362 broker = sb; 2363 } 2364 if (isUseJmx()) { 2365 HealthViewMBean statusView = new HealthView((ManagedRegionBroker)getRegionBroker()); 2366 try { 2367 ObjectName objectName = BrokerMBeanSupport.createHealthServiceName(getBrokerObjectName()); 2368 AnnotatedMBean.registerMBean(getManagementContext(), statusView, objectName); 2369 } catch (Throwable e) { 2370 throw IOExceptionSupport.create("Status MBean could not be registered in JMX: " 2371 + e.getMessage(), e); 2372 } 2373 } 2374 if (isAdvisorySupport()) { 2375 broker = new AdvisoryBroker(broker); 2376 } 2377 broker = new CompositeDestinationBroker(broker); 2378 broker = new TransactionBroker(broker, getPersistenceAdapter().createTransactionStore()); 2379 if (isPopulateJMSXUserID()) { 2380 UserIDBroker userIDBroker = new UserIDBroker(broker); 2381 userIDBroker.setUseAuthenticatePrincipal(isUseAuthenticatedPrincipalForJMSXUserID()); 2382 broker = userIDBroker; 2383 } 2384 if (isMonitorConnectionSplits()) { 2385 broker = new ConnectionSplitBroker(broker); 2386 } 2387 if (plugins != null) { 2388 for (int i = 0; i < plugins.length; i++) { 2389 BrokerPlugin plugin = plugins[i]; 2390 broker = plugin.installPlugin(broker); 2391 } 2392 } 2393 return broker; 2394 } 2395 2396 protected PersistenceAdapter createPersistenceAdapter() throws IOException { 2397 if (isPersistent()) { 2398 PersistenceAdapterFactory fac = getPersistenceFactory(); 2399 if (fac != null) { 2400 return fac.createPersistenceAdapter(); 2401 } else { 2402 try { 2403 String clazz = "org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter"; 2404 PersistenceAdapter adaptor = (PersistenceAdapter)getClass().getClassLoader().loadClass(clazz).newInstance(); 2405 File dir = new File(getBrokerDataDirectory(),"KahaDB"); 2406 adaptor.setDirectory(dir); 2407 return adaptor; 2408 } catch (Throwable e) { 2409 throw IOExceptionSupport.create(e); 2410 } 2411 } 2412 } else { 2413 return new MemoryPersistenceAdapter(); 2414 } 2415 } 2416 2417 protected ObjectName createBrokerObjectName() throws MalformedObjectNameException { 2418 return BrokerMBeanSupport.createBrokerObjectName(getManagementContext().getJmxDomainName(), getBrokerName()); 2419 } 2420 2421 protected TransportConnector createTransportConnector(URI brokerURI) throws Exception { 2422 TransportServer transport = TransportFactorySupport.bind(this, brokerURI); 2423 return new TransportConnector(transport); 2424 } 2425 2426 /** 2427 * Extracts the port from the options 2428 */ 2429 protected Object getPort(Map<?,?> options) { 2430 Object port = options.get("port"); 2431 if (port == null) { 2432 port = DEFAULT_PORT; 2433 LOG.warn("No port specified so defaulting to: {}", port); 2434 } 2435 return port; 2436 } 2437 2438 protected void addShutdownHook() { 2439 if (useShutdownHook) { 2440 shutdownHook = new Thread("ActiveMQ ShutdownHook") { 2441 @Override 2442 public void run() { 2443 containerShutdown(); 2444 } 2445 }; 2446 Runtime.getRuntime().addShutdownHook(shutdownHook); 2447 } 2448 } 2449 2450 protected void removeShutdownHook() { 2451 if (shutdownHook != null) { 2452 try { 2453 Runtime.getRuntime().removeShutdownHook(shutdownHook); 2454 } catch (Exception e) { 2455 LOG.debug("Caught exception, must be shutting down. This exception is ignored.", e); 2456 } 2457 } 2458 } 2459 2460 /** 2461 * Sets hooks to be executed when broker shut down 2462 * 2463 * @org.apache.xbean.Property 2464 */ 2465 public void setShutdownHooks(List<Runnable> hooks) throws Exception { 2466 for (Runnable hook : hooks) { 2467 addShutdownHook(hook); 2468 } 2469 } 2470 2471 /** 2472 * Causes a clean shutdown of the container when the VM is being shut down 2473 */ 2474 protected void containerShutdown() { 2475 try { 2476 stop(); 2477 } catch (IOException e) { 2478 Throwable linkedException = e.getCause(); 2479 if (linkedException != null) { 2480 logError("Failed to shut down: " + e + ". Reason: " + linkedException, linkedException); 2481 } else { 2482 logError("Failed to shut down: " + e, e); 2483 } 2484 if (!useLoggingForShutdownErrors) { 2485 e.printStackTrace(System.err); 2486 } 2487 } catch (Exception e) { 2488 logError("Failed to shut down: " + e, e); 2489 } 2490 } 2491 2492 protected void logError(String message, Throwable e) { 2493 if (useLoggingForShutdownErrors) { 2494 LOG.error("Failed to shut down: " + e); 2495 } else { 2496 System.err.println("Failed to shut down: " + e); 2497 } 2498 } 2499 2500 /** 2501 * Starts any configured destinations on startup 2502 */ 2503 protected void startDestinations() throws Exception { 2504 if (destinations != null) { 2505 ConnectionContext adminConnectionContext = getAdminConnectionContext(); 2506 for (int i = 0; i < destinations.length; i++) { 2507 ActiveMQDestination destination = destinations[i]; 2508 getBroker().addDestination(adminConnectionContext, destination,true); 2509 } 2510 } 2511 if (isUseVirtualTopics()) { 2512 startVirtualConsumerDestinations(); 2513 } 2514 } 2515 2516 /** 2517 * Returns the broker's administration connection context used for 2518 * configuring the broker at startup 2519 */ 2520 public ConnectionContext getAdminConnectionContext() throws Exception { 2521 return BrokerSupport.getConnectionContext(getBroker()); 2522 } 2523 2524 protected void startManagementContext() throws Exception { 2525 getManagementContext().setBrokerName(brokerName); 2526 getManagementContext().start(); 2527 adminView = new BrokerView(this, null); 2528 ObjectName objectName = getBrokerObjectName(); 2529 AnnotatedMBean.registerMBean(getManagementContext(), adminView, objectName); 2530 } 2531 2532 /** 2533 * Start all transport and network connections, proxies and bridges 2534 * 2535 * @throws Exception 2536 */ 2537 public void startAllConnectors() throws Exception { 2538 Set<ActiveMQDestination> durableDestinations = getBroker().getDurableDestinations(); 2539 List<TransportConnector> al = new ArrayList<TransportConnector>(); 2540 for (Iterator<TransportConnector> iter = getTransportConnectors().iterator(); iter.hasNext();) { 2541 TransportConnector connector = iter.next(); 2542 al.add(startTransportConnector(connector)); 2543 } 2544 if (al.size() > 0) { 2545 // let's clear the transportConnectors list and replace it with 2546 // the started transportConnector instances 2547 this.transportConnectors.clear(); 2548 setTransportConnectors(al); 2549 } 2550 this.slave = false; 2551 URI uri = getVmConnectorURI(); 2552 Map<String, String> map = new HashMap<String, String>(URISupport.parseParameters(uri)); 2553 map.put("async", "false"); 2554 map.put("create","false"); 2555 uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map)); 2556 2557 if (!stopped.get()) { 2558 ThreadPoolExecutor networkConnectorStartExecutor = null; 2559 if (isNetworkConnectorStartAsync()) { 2560 // spin up as many threads as needed 2561 networkConnectorStartExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 2562 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), 2563 new ThreadFactory() { 2564 int count=0; 2565 @Override 2566 public Thread newThread(Runnable runnable) { 2567 Thread thread = new Thread(runnable, "NetworkConnector Start Thread-" +(count++)); 2568 thread.setDaemon(true); 2569 return thread; 2570 } 2571 }); 2572 } 2573 2574 for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) { 2575 final NetworkConnector connector = iter.next(); 2576 connector.setLocalUri(uri); 2577 connector.setBrokerName(getBrokerName()); 2578 connector.setDurableDestinations(durableDestinations); 2579 if (getDefaultSocketURIString() != null) { 2580 connector.setBrokerURL(getDefaultSocketURIString()); 2581 } 2582 if (networkConnectorStartExecutor != null) { 2583 networkConnectorStartExecutor.execute(new Runnable() { 2584 @Override 2585 public void run() { 2586 try { 2587 LOG.info("Async start of {}", connector); 2588 connector.start(); 2589 } catch(Exception e) { 2590 LOG.error("Async start of network connector: {} failed", connector, e); 2591 } 2592 } 2593 }); 2594 } else { 2595 connector.start(); 2596 } 2597 } 2598 if (networkConnectorStartExecutor != null) { 2599 // executor done when enqueued tasks are complete 2600 ThreadPoolUtils.shutdown(networkConnectorStartExecutor); 2601 } 2602 2603 for (Iterator<ProxyConnector> iter = getProxyConnectors().iterator(); iter.hasNext();) { 2604 ProxyConnector connector = iter.next(); 2605 connector.start(); 2606 } 2607 for (Iterator<JmsConnector> iter = jmsConnectors.iterator(); iter.hasNext();) { 2608 JmsConnector connector = iter.next(); 2609 connector.start(); 2610 } 2611 for (Service service : services) { 2612 configureService(service); 2613 service.start(); 2614 } 2615 } 2616 } 2617 2618 public TransportConnector startTransportConnector(TransportConnector connector) throws Exception { 2619 connector.setBrokerService(this); 2620 connector.setTaskRunnerFactory(getTaskRunnerFactory()); 2621 MessageAuthorizationPolicy policy = getMessageAuthorizationPolicy(); 2622 if (policy != null) { 2623 connector.setMessageAuthorizationPolicy(policy); 2624 } 2625 if (isUseJmx()) { 2626 connector = registerConnectorMBean(connector); 2627 } 2628 connector.getStatistics().setEnabled(enableStatistics); 2629 connector.start(); 2630 return connector; 2631 } 2632 2633 /** 2634 * Perform any custom dependency injection 2635 */ 2636 protected void configureServices(Object[] services) { 2637 for (Object service : services) { 2638 configureService(service); 2639 } 2640 } 2641 2642 /** 2643 * Perform any custom dependency injection 2644 */ 2645 protected void configureService(Object service) { 2646 if (service instanceof BrokerServiceAware) { 2647 BrokerServiceAware serviceAware = (BrokerServiceAware) service; 2648 serviceAware.setBrokerService(this); 2649 } 2650 } 2651 2652 public void handleIOException(IOException exception) { 2653 if (ioExceptionHandler != null) { 2654 ioExceptionHandler.handle(exception); 2655 } else { 2656 LOG.info("No IOExceptionHandler registered, ignoring IO exception", exception); 2657 } 2658 } 2659 2660 protected void startVirtualConsumerDestinations() throws Exception { 2661 checkStartException(); 2662 ConnectionContext adminConnectionContext = getAdminConnectionContext(); 2663 Set<ActiveMQDestination> destinations = destinationFactory.getDestinations(); 2664 DestinationFilter filter = getVirtualTopicConsumerDestinationFilter(); 2665 if (!destinations.isEmpty()) { 2666 for (ActiveMQDestination destination : destinations) { 2667 if (filter.matches(destination) == true) { 2668 broker.addDestination(adminConnectionContext, destination, false); 2669 } 2670 } 2671 } 2672 } 2673 2674 private DestinationFilter getVirtualTopicConsumerDestinationFilter() { 2675 // created at startup, so no sync needed 2676 if (virtualConsumerDestinationFilter == null) { 2677 Set <ActiveMQQueue> consumerDestinations = new HashSet<ActiveMQQueue>(); 2678 if (destinationInterceptors != null) { 2679 for (DestinationInterceptor interceptor : destinationInterceptors) { 2680 if (interceptor instanceof VirtualDestinationInterceptor) { 2681 VirtualDestinationInterceptor virtualDestinationInterceptor = (VirtualDestinationInterceptor) interceptor; 2682 for (VirtualDestination virtualDestination: virtualDestinationInterceptor.getVirtualDestinations()) { 2683 if (virtualDestination instanceof VirtualTopic) { 2684 consumerDestinations.add(new ActiveMQQueue(((VirtualTopic) virtualDestination).getPrefix() + DestinationFilter.ANY_DESCENDENT)); 2685 } 2686 } 2687 } 2688 } 2689 } 2690 ActiveMQQueue filter = new ActiveMQQueue(); 2691 filter.setCompositeDestinations(consumerDestinations.toArray(new ActiveMQDestination[]{})); 2692 virtualConsumerDestinationFilter = DestinationFilter.parseFilter(filter); 2693 } 2694 return virtualConsumerDestinationFilter; 2695 } 2696 2697 protected synchronized ThreadPoolExecutor getExecutor() { 2698 if (this.executor == null) { 2699 this.executor = new ThreadPoolExecutor(1, 10, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() { 2700 2701 private long i = 0; 2702 2703 @Override 2704 public Thread newThread(Runnable runnable) { 2705 this.i++; 2706 Thread thread = new Thread(runnable, "ActiveMQ BrokerService.worker." + this.i); 2707 thread.setDaemon(true); 2708 thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { 2709 @Override 2710 public void uncaughtException(final Thread t, final Throwable e) { 2711 LOG.error("Error in thread '{}'", t.getName(), e); 2712 } 2713 }); 2714 return thread; 2715 } 2716 }, new RejectedExecutionHandler() { 2717 @Override 2718 public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) { 2719 try { 2720 executor.getQueue().offer(r, 60, TimeUnit.SECONDS); 2721 } catch (InterruptedException e) { 2722 throw new RejectedExecutionException("Interrupted waiting for BrokerService.worker"); 2723 } 2724 2725 throw new RejectedExecutionException("Timed Out while attempting to enqueue Task."); 2726 } 2727 }); 2728 } 2729 return this.executor; 2730 } 2731 2732 public synchronized Scheduler getScheduler() { 2733 if (this.scheduler==null) { 2734 this.scheduler = new Scheduler("ActiveMQ Broker["+getBrokerName()+"] Scheduler"); 2735 try { 2736 this.scheduler.start(); 2737 } catch (Exception e) { 2738 LOG.error("Failed to start Scheduler", e); 2739 } 2740 } 2741 return this.scheduler; 2742 } 2743 2744 public Broker getRegionBroker() { 2745 return regionBroker; 2746 } 2747 2748 public void setRegionBroker(Broker regionBroker) { 2749 this.regionBroker = regionBroker; 2750 } 2751 2752 public void addShutdownHook(Runnable hook) { 2753 synchronized (shutdownHooks) { 2754 shutdownHooks.add(hook); 2755 } 2756 } 2757 2758 public void removeShutdownHook(Runnable hook) { 2759 synchronized (shutdownHooks) { 2760 shutdownHooks.remove(hook); 2761 } 2762 } 2763 2764 public boolean isSystemExitOnShutdown() { 2765 return systemExitOnShutdown; 2766 } 2767 2768 /** 2769 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" 2770 */ 2771 public void setSystemExitOnShutdown(boolean systemExitOnShutdown) { 2772 this.systemExitOnShutdown = systemExitOnShutdown; 2773 } 2774 2775 public int getSystemExitOnShutdownExitCode() { 2776 return systemExitOnShutdownExitCode; 2777 } 2778 2779 public void setSystemExitOnShutdownExitCode(int systemExitOnShutdownExitCode) { 2780 this.systemExitOnShutdownExitCode = systemExitOnShutdownExitCode; 2781 } 2782 2783 public SslContext getSslContext() { 2784 return sslContext; 2785 } 2786 2787 public void setSslContext(SslContext sslContext) { 2788 this.sslContext = sslContext; 2789 } 2790 2791 public boolean isShutdownOnSlaveFailure() { 2792 return shutdownOnSlaveFailure; 2793 } 2794 2795 /** 2796 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" 2797 */ 2798 public void setShutdownOnSlaveFailure(boolean shutdownOnSlaveFailure) { 2799 this.shutdownOnSlaveFailure = shutdownOnSlaveFailure; 2800 } 2801 2802 public boolean isWaitForSlave() { 2803 return waitForSlave; 2804 } 2805 2806 /** 2807 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" 2808 */ 2809 public void setWaitForSlave(boolean waitForSlave) { 2810 this.waitForSlave = waitForSlave; 2811 } 2812 2813 public long getWaitForSlaveTimeout() { 2814 return this.waitForSlaveTimeout; 2815 } 2816 2817 public void setWaitForSlaveTimeout(long waitForSlaveTimeout) { 2818 this.waitForSlaveTimeout = waitForSlaveTimeout; 2819 } 2820 2821 /** 2822 * Get the passiveSlave 2823 * @return the passiveSlave 2824 */ 2825 public boolean isPassiveSlave() { 2826 return this.passiveSlave; 2827 } 2828 2829 /** 2830 * Set the passiveSlave 2831 * @param passiveSlave the passiveSlave to set 2832 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" 2833 */ 2834 public void setPassiveSlave(boolean passiveSlave) { 2835 this.passiveSlave = passiveSlave; 2836 } 2837 2838 /** 2839 * override the Default IOException handler, called when persistence adapter 2840 * has experiences File or JDBC I/O Exceptions 2841 * 2842 * @param ioExceptionHandler 2843 */ 2844 public void setIoExceptionHandler(IOExceptionHandler ioExceptionHandler) { 2845 configureService(ioExceptionHandler); 2846 this.ioExceptionHandler = ioExceptionHandler; 2847 } 2848 2849 public IOExceptionHandler getIoExceptionHandler() { 2850 return ioExceptionHandler; 2851 } 2852 2853 /** 2854 * @return the schedulerSupport 2855 */ 2856 public boolean isSchedulerSupport() { 2857 return this.schedulerSupport; 2858 } 2859 2860 /** 2861 * @param schedulerSupport the schedulerSupport to set 2862 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" 2863 */ 2864 public void setSchedulerSupport(boolean schedulerSupport) { 2865 this.schedulerSupport = schedulerSupport; 2866 } 2867 2868 /** 2869 * @return the schedulerDirectory 2870 */ 2871 public File getSchedulerDirectoryFile() { 2872 if (this.schedulerDirectoryFile == null) { 2873 this.schedulerDirectoryFile = new File(getBrokerDataDirectory(), "scheduler"); 2874 } 2875 return schedulerDirectoryFile; 2876 } 2877 2878 /** 2879 * @param schedulerDirectory the schedulerDirectory to set 2880 */ 2881 public void setSchedulerDirectoryFile(File schedulerDirectory) { 2882 this.schedulerDirectoryFile = schedulerDirectory; 2883 } 2884 2885 public void setSchedulerDirectory(String schedulerDirectory) { 2886 setSchedulerDirectoryFile(new File(schedulerDirectory)); 2887 } 2888 2889 public int getSchedulePeriodForDestinationPurge() { 2890 return this.schedulePeriodForDestinationPurge; 2891 } 2892 2893 public void setSchedulePeriodForDestinationPurge(int schedulePeriodForDestinationPurge) { 2894 this.schedulePeriodForDestinationPurge = schedulePeriodForDestinationPurge; 2895 } 2896 2897 public int getMaxPurgedDestinationsPerSweep() { 2898 return this.maxPurgedDestinationsPerSweep; 2899 } 2900 2901 public void setMaxPurgedDestinationsPerSweep(int maxPurgedDestinationsPerSweep) { 2902 this.maxPurgedDestinationsPerSweep = maxPurgedDestinationsPerSweep; 2903 } 2904 2905 public BrokerContext getBrokerContext() { 2906 return brokerContext; 2907 } 2908 2909 public void setBrokerContext(BrokerContext brokerContext) { 2910 this.brokerContext = brokerContext; 2911 } 2912 2913 public void setBrokerId(String brokerId) { 2914 this.brokerId = new BrokerId(brokerId); 2915 } 2916 2917 public boolean isUseAuthenticatedPrincipalForJMSXUserID() { 2918 return useAuthenticatedPrincipalForJMSXUserID; 2919 } 2920 2921 public void setUseAuthenticatedPrincipalForJMSXUserID(boolean useAuthenticatedPrincipalForJMSXUserID) { 2922 this.useAuthenticatedPrincipalForJMSXUserID = useAuthenticatedPrincipalForJMSXUserID; 2923 } 2924 2925 /** 2926 * Should MBeans that support showing the Authenticated User Name information have this 2927 * value filled in or not. 2928 * 2929 * @return true if user names should be exposed in MBeans 2930 */ 2931 public boolean isPopulateUserNameInMBeans() { 2932 return this.populateUserNameInMBeans; 2933 } 2934 2935 /** 2936 * Sets whether Authenticated User Name information is shown in MBeans that support this field. 2937 * @param value if MBeans should expose user name information. 2938 */ 2939 public void setPopulateUserNameInMBeans(boolean value) { 2940 this.populateUserNameInMBeans = value; 2941 } 2942 2943 /** 2944 * Gets the time in Milliseconds that an invocation of an MBean method will wait before 2945 * failing. The default value is to wait forever (zero). 2946 * 2947 * @return timeout in milliseconds before MBean calls fail, (default is 0 or no timeout). 2948 */ 2949 public long getMbeanInvocationTimeout() { 2950 return mbeanInvocationTimeout; 2951 } 2952 2953 /** 2954 * Gets the time in Milliseconds that an invocation of an MBean method will wait before 2955 * failing. The default value is to wait forever (zero). 2956 * 2957 * @param mbeanInvocationTimeout 2958 * timeout in milliseconds before MBean calls fail, (default is 0 or no timeout). 2959 */ 2960 public void setMbeanInvocationTimeout(long mbeanInvocationTimeout) { 2961 this.mbeanInvocationTimeout = mbeanInvocationTimeout; 2962 } 2963 2964 public boolean isNetworkConnectorStartAsync() { 2965 return networkConnectorStartAsync; 2966 } 2967 2968 public void setNetworkConnectorStartAsync(boolean networkConnectorStartAsync) { 2969 this.networkConnectorStartAsync = networkConnectorStartAsync; 2970 } 2971 2972 public boolean isAllowTempAutoCreationOnSend() { 2973 return allowTempAutoCreationOnSend; 2974 } 2975 2976 /** 2977 * enable if temp destinations need to be propagated through a network when 2978 * advisorySupport==false. This is used in conjunction with the policy 2979 * gcInactiveDestinations for matching temps so they can get removed 2980 * when inactive 2981 * 2982 * @param allowTempAutoCreationOnSend 2983 */ 2984 public void setAllowTempAutoCreationOnSend(boolean allowTempAutoCreationOnSend) { 2985 this.allowTempAutoCreationOnSend = allowTempAutoCreationOnSend; 2986 } 2987 2988 public long getOfflineDurableSubscriberTimeout() { 2989 return offlineDurableSubscriberTimeout; 2990 } 2991 2992 public void setOfflineDurableSubscriberTimeout(long offlineDurableSubscriberTimeout) { 2993 this.offlineDurableSubscriberTimeout = offlineDurableSubscriberTimeout; 2994 } 2995 2996 public long getOfflineDurableSubscriberTaskSchedule() { 2997 return offlineDurableSubscriberTaskSchedule; 2998 } 2999 3000 public void setOfflineDurableSubscriberTaskSchedule(long offlineDurableSubscriberTaskSchedule) { 3001 this.offlineDurableSubscriberTaskSchedule = offlineDurableSubscriberTaskSchedule; 3002 } 3003 3004 public boolean shouldRecordVirtualDestination(ActiveMQDestination destination) { 3005 return isUseVirtualTopics() && destination.isQueue() && 3006 getVirtualTopicConsumerDestinationFilter().matches(destination); 3007 } 3008 3009 synchronized public Throwable getStartException() { 3010 return startException; 3011 } 3012 3013 public boolean isStartAsync() { 3014 return startAsync; 3015 } 3016 3017 public void setStartAsync(boolean startAsync) { 3018 this.startAsync = startAsync; 3019 } 3020 3021 public boolean isSlave() { 3022 return this.slave; 3023 } 3024 3025 public boolean isStopping() { 3026 return this.stopping.get(); 3027 } 3028 3029 /** 3030 * @return true if the broker allowed to restart on shutdown. 3031 */ 3032 public boolean isRestartAllowed() { 3033 return restartAllowed; 3034 } 3035 3036 /** 3037 * Sets if the broker allowed to restart on shutdown. 3038 * @return 3039 */ 3040 public void setRestartAllowed(boolean restartAllowed) { 3041 this.restartAllowed = restartAllowed; 3042 } 3043 3044 /** 3045 * A lifecycle manager of the BrokerService should 3046 * inspect this property after a broker shutdown has occurred 3047 * to find out if the broker needs to be re-created and started 3048 * again. 3049 * 3050 * @return true if the broker wants to be restarted after it shuts down. 3051 */ 3052 public boolean isRestartRequested() { 3053 return restartRequested; 3054 } 3055 3056 public void requestRestart() { 3057 this.restartRequested = true; 3058 } 3059 3060 public int getStoreOpenWireVersion() { 3061 return storeOpenWireVersion; 3062 } 3063 3064 public void setStoreOpenWireVersion(int storeOpenWireVersion) { 3065 this.storeOpenWireVersion = storeOpenWireVersion; 3066 } 3067 3068 /** 3069 * @return the current number of connections on this Broker. 3070 */ 3071 public int getCurrentConnections() { 3072 return this.currentConnections.get(); 3073 } 3074 3075 /** 3076 * @return the total number of connections this broker has handled since startup. 3077 */ 3078 public long getTotalConnections() { 3079 return this.totalConnections.get(); 3080 } 3081 3082 public void incrementCurrentConnections() { 3083 this.currentConnections.incrementAndGet(); 3084 } 3085 3086 public void decrementCurrentConnections() { 3087 this.currentConnections.decrementAndGet(); 3088 } 3089 3090 public void incrementTotalConnections() { 3091 this.totalConnections.incrementAndGet(); 3092 } 3093 3094 public boolean isRejectDurableConsumers() { 3095 return rejectDurableConsumers; 3096 } 3097 3098 public void setRejectDurableConsumers(boolean rejectDurableConsumers) { 3099 this.rejectDurableConsumers = rejectDurableConsumers; 3100 } 3101 3102 public boolean isAdjustUsageLimits() { 3103 return adjustUsageLimits; 3104 } 3105 3106 public void setAdjustUsageLimits(boolean adjustUsageLimits) { 3107 this.adjustUsageLimits = adjustUsageLimits; 3108 } 3109 3110 public void setRollbackOnlyOnAsyncException(boolean rollbackOnlyOnAsyncException) { 3111 this.rollbackOnlyOnAsyncException = rollbackOnlyOnAsyncException; 3112 } 3113 3114 public boolean isRollbackOnlyOnAsyncException() { 3115 return rollbackOnlyOnAsyncException; 3116 } 3117}