001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.jmx; 018 019import java.io.IOException; 020import java.util.ArrayList; 021import java.util.HashMap; 022import java.util.Iterator; 023import java.util.List; 024import java.util.Map; 025import java.util.Map.Entry; 026import java.util.Set; 027import java.util.concurrent.ConcurrentHashMap; 028import java.util.concurrent.CopyOnWriteArraySet; 029import java.util.concurrent.ExecutorService; 030import java.util.concurrent.ThreadPoolExecutor; 031 032import javax.management.InstanceNotFoundException; 033import javax.management.MalformedObjectNameException; 034import javax.management.ObjectName; 035import javax.management.openmbean.CompositeData; 036import javax.management.openmbean.CompositeDataSupport; 037import javax.management.openmbean.CompositeType; 038import javax.management.openmbean.OpenDataException; 039import javax.management.openmbean.TabularData; 040import javax.management.openmbean.TabularDataSupport; 041import javax.management.openmbean.TabularType; 042 043import org.apache.activemq.broker.Broker; 044import org.apache.activemq.broker.BrokerService; 045import org.apache.activemq.broker.ConnectionContext; 046import org.apache.activemq.broker.ProducerBrokerExchange; 047import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory; 048import org.apache.activemq.broker.region.Destination; 049import org.apache.activemq.broker.region.DestinationFactory; 050import org.apache.activemq.broker.region.DestinationFactoryImpl; 051import org.apache.activemq.broker.region.DestinationInterceptor; 052import org.apache.activemq.broker.region.Queue; 053import org.apache.activemq.broker.region.Region; 054import org.apache.activemq.broker.region.RegionBroker; 055import org.apache.activemq.broker.region.Subscription; 056import org.apache.activemq.broker.region.Topic; 057import org.apache.activemq.broker.region.TopicRegion; 058import org.apache.activemq.broker.region.TopicSubscription; 059import org.apache.activemq.broker.region.policy.AbortSlowAckConsumerStrategy; 060import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy; 061import org.apache.activemq.command.ActiveMQDestination; 062import org.apache.activemq.command.ActiveMQMessage; 063import org.apache.activemq.command.ActiveMQTopic; 064import org.apache.activemq.command.ConnectionInfo; 065import org.apache.activemq.command.ConsumerInfo; 066import org.apache.activemq.command.Message; 067import org.apache.activemq.command.MessageId; 068import org.apache.activemq.command.ProducerInfo; 069import org.apache.activemq.command.SubscriptionInfo; 070import org.apache.activemq.store.MessageRecoveryListener; 071import org.apache.activemq.store.PersistenceAdapter; 072import org.apache.activemq.store.TopicMessageStore; 073import org.apache.activemq.thread.Scheduler; 074import org.apache.activemq.thread.TaskRunnerFactory; 075import org.apache.activemq.transaction.XATransaction; 076import org.apache.activemq.usage.SystemUsage; 077import org.apache.activemq.util.ServiceStopper; 078import org.apache.activemq.util.SubscriptionKey; 079import org.slf4j.Logger; 080import org.slf4j.LoggerFactory; 081 082public class ManagedRegionBroker extends RegionBroker { 083 private static final Logger LOG = LoggerFactory.getLogger(ManagedRegionBroker.class); 084 private final ManagementContext managementContext; 085 private final ObjectName brokerObjectName; 086 private final Map<ObjectName, DestinationView> topics = new ConcurrentHashMap<ObjectName, DestinationView>(); 087 private final Map<ObjectName, DestinationView> queues = new ConcurrentHashMap<ObjectName, DestinationView>(); 088 private final Map<ObjectName, DestinationView> temporaryQueues = new ConcurrentHashMap<ObjectName, DestinationView>(); 089 private final Map<ObjectName, DestinationView> temporaryTopics = new ConcurrentHashMap<ObjectName, DestinationView>(); 090 private final Map<ObjectName, SubscriptionView> queueSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>(); 091 private final Map<ObjectName, SubscriptionView> topicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>(); 092 private final Map<ObjectName, SubscriptionView> durableTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>(); 093 private final Map<ObjectName, SubscriptionView> inactiveDurableTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>(); 094 private final Map<ObjectName, SubscriptionView> temporaryQueueSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>(); 095 private final Map<ObjectName, SubscriptionView> temporaryTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>(); 096 private final Map<ObjectName, ProducerView> queueProducers = new ConcurrentHashMap<ObjectName, ProducerView>(); 097 private final Map<ObjectName, ProducerView> topicProducers = new ConcurrentHashMap<ObjectName, ProducerView>(); 098 private final Map<ObjectName, ProducerView> temporaryQueueProducers = new ConcurrentHashMap<ObjectName, ProducerView>(); 099 private final Map<ObjectName, ProducerView> temporaryTopicProducers = new ConcurrentHashMap<ObjectName, ProducerView>(); 100 private final Map<ObjectName, ProducerView> dynamicDestinationProducers = new ConcurrentHashMap<ObjectName, ProducerView>(); 101 private final Map<SubscriptionKey, ObjectName> subscriptionKeys = new ConcurrentHashMap<SubscriptionKey, ObjectName>(); 102 private final Map<Subscription, ObjectName> subscriptionMap = new ConcurrentHashMap<Subscription, ObjectName>(); 103 private final Set<ObjectName> registeredMBeans = new CopyOnWriteArraySet<ObjectName>(); 104 /* This is the first broker in the broker interceptor chain. */ 105 private Broker contextBroker; 106 107 private final ExecutorService asyncInvokeService; 108 private final long mbeanTimeout; 109 110 public ManagedRegionBroker(BrokerService brokerService, ManagementContext context, ObjectName brokerObjectName, TaskRunnerFactory taskRunnerFactory, SystemUsage memoryManager, 111 DestinationFactory destinationFactory, DestinationInterceptor destinationInterceptor,Scheduler scheduler,ThreadPoolExecutor executor) throws IOException { 112 super(brokerService, taskRunnerFactory, memoryManager, destinationFactory, destinationInterceptor,scheduler,executor); 113 this.managementContext = context; 114 this.brokerObjectName = brokerObjectName; 115 this.mbeanTimeout = brokerService.getMbeanInvocationTimeout(); 116 this.asyncInvokeService = mbeanTimeout > 0 ? executor : null;; 117 } 118 119 @Override 120 public void start() throws Exception { 121 super.start(); 122 // build all existing durable subscriptions 123 buildExistingSubscriptions(); 124 } 125 126 @Override 127 protected void doStop(ServiceStopper stopper) { 128 super.doStop(stopper); 129 // lets remove any mbeans not yet removed 130 for (Iterator<ObjectName> iter = registeredMBeans.iterator(); iter.hasNext();) { 131 ObjectName name = iter.next(); 132 try { 133 managementContext.unregisterMBean(name); 134 } catch (InstanceNotFoundException e) { 135 LOG.warn("The MBean {} is no longer registered with JMX", name); 136 } catch (Exception e) { 137 stopper.onException(this, e); 138 } 139 } 140 registeredMBeans.clear(); 141 } 142 143 @Override 144 protected Region createQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { 145 return new ManagedQueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); 146 } 147 148 @Override 149 protected Region createTempQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { 150 return new ManagedTempQueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); 151 } 152 153 @Override 154 protected Region createTempTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { 155 return new ManagedTempTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); 156 } 157 158 @Override 159 protected Region createTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { 160 return new ManagedTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); 161 } 162 163 public void register(ActiveMQDestination destName, Destination destination) { 164 // TODO refactor to allow views for custom destinations 165 try { 166 ObjectName objectName = BrokerMBeanSupport.createDestinationName(brokerObjectName, destName); 167 DestinationView view; 168 if (destination instanceof Queue) { 169 view = new QueueView(this, (Queue)destination); 170 } else if (destination instanceof Topic) { 171 view = new TopicView(this, (Topic)destination); 172 } else { 173 view = null; 174 LOG.warn("JMX View is not supported for custom destination {}", destination); 175 } 176 if (view != null) { 177 registerDestination(objectName, destName, view); 178 } 179 } catch (Exception e) { 180 LOG.error("Failed to register destination {}", destName, e); 181 } 182 } 183 184 public void unregister(ActiveMQDestination destName) { 185 try { 186 ObjectName objectName = BrokerMBeanSupport.createDestinationName(brokerObjectName, destName); 187 unregisterDestination(objectName); 188 } catch (Exception e) { 189 LOG.error("Failed to unregister {}", destName, e); 190 } 191 } 192 193 public ObjectName registerSubscription(ConnectionContext context, Subscription sub) { 194 String connectionClientId = context.getClientId(); 195 196 SubscriptionKey key = new SubscriptionKey(context.getClientId(), sub.getConsumerInfo().getSubscriptionName()); 197 try { 198 ObjectName objectName = BrokerMBeanSupport.createSubscriptionName(brokerObjectName, connectionClientId, sub.getConsumerInfo()); 199 SubscriptionView view; 200 if (sub.getConsumerInfo().getConsumerId().getConnectionId().equals("OFFLINE")) { 201 // add offline subscribers to inactive list 202 SubscriptionInfo info = new SubscriptionInfo(); 203 info.setClientId(context.getClientId()); 204 info.setSubscriptionName(sub.getConsumerInfo().getSubscriptionName()); 205 info.setDestination(sub.getConsumerInfo().getDestination()); 206 info.setSelector(sub.getSelector()); 207 addInactiveSubscription(key, info, sub); 208 } else { 209 String userName = brokerService.isPopulateUserNameInMBeans() ? context.getUserName() : null; 210 if (sub.getConsumerInfo().isDurable()) { 211 view = new DurableSubscriptionView(this, brokerService, context.getClientId(), userName, sub); 212 } else { 213 if (sub instanceof TopicSubscription) { 214 view = new TopicSubscriptionView(context.getClientId(), userName, (TopicSubscription) sub); 215 } else { 216 view = new SubscriptionView(context.getClientId(), userName, sub); 217 } 218 } 219 registerSubscription(objectName, sub.getConsumerInfo(), key, view); 220 } 221 subscriptionMap.put(sub, objectName); 222 return objectName; 223 } catch (Exception e) { 224 LOG.error("Failed to register subscription {}", sub, e); 225 return null; 226 } 227 } 228 229 @Override 230 public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception { 231 super.addConnection(context, info); 232 this.contextBroker.getBrokerService().incrementCurrentConnections(); 233 this.contextBroker.getBrokerService().incrementTotalConnections(); 234 } 235 236 @Override 237 public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception { 238 super.removeConnection(context, info, error); 239 this.contextBroker.getBrokerService().decrementCurrentConnections(); 240 } 241 242 @Override 243 public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { 244 Subscription sub = super.addConsumer(context, info); 245 SubscriptionKey subscriptionKey = new SubscriptionKey(sub.getContext().getClientId(), sub.getConsumerInfo().getSubscriptionName()); 246 ObjectName inactiveName = subscriptionKeys.get(subscriptionKey); 247 if (inactiveName != null) { 248 // if it was inactive, register it 249 registerSubscription(context, sub); 250 } 251 return sub; 252 } 253 254 @Override 255 public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { 256 for (Subscription sub : subscriptionMap.keySet()) { 257 if (sub.getConsumerInfo().equals(info)) { 258 // unregister all consumer subs 259 unregisterSubscription(subscriptionMap.get(sub), true); 260 } 261 } 262 super.removeConsumer(context, info); 263 } 264 265 @Override 266 public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception { 267 super.addProducer(context, info); 268 String connectionClientId = context.getClientId(); 269 ObjectName objectName = BrokerMBeanSupport.createProducerName(brokerObjectName, context.getClientId(), info); 270 String userName = brokerService.isPopulateUserNameInMBeans() ? context.getUserName() : null; 271 ProducerView view = new ProducerView(info, connectionClientId, userName, this); 272 registerProducer(objectName, info.getDestination(), view); 273 } 274 275 @Override 276 public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception { 277 ObjectName objectName = BrokerMBeanSupport.createProducerName(brokerObjectName, context.getClientId(), info); 278 unregisterProducer(objectName); 279 super.removeProducer(context, info); 280 } 281 282 @Override 283 public void send(ProducerBrokerExchange exchange, Message message) throws Exception { 284 if (exchange != null && exchange.getProducerState() != null && exchange.getProducerState().getInfo() != null) { 285 ProducerInfo info = exchange.getProducerState().getInfo(); 286 if (info.getDestination() == null && info.getProducerId() != null) { 287 ObjectName objectName = BrokerMBeanSupport.createProducerName(brokerObjectName, exchange.getConnectionContext().getClientId(), info); 288 ProducerView view = this.dynamicDestinationProducers.get(objectName); 289 if (view != null) { 290 ActiveMQDestination dest = message.getDestination(); 291 if (dest != null) { 292 view.setLastUsedDestinationName(dest); 293 } 294 } 295 } 296 } 297 super.send(exchange, message); 298 } 299 300 public void unregisterSubscription(Subscription sub) { 301 ObjectName name = subscriptionMap.remove(sub); 302 if (name != null) { 303 try { 304 SubscriptionKey subscriptionKey = new SubscriptionKey(sub.getContext().getClientId(), sub.getConsumerInfo().getSubscriptionName()); 305 ObjectName inactiveName = subscriptionKeys.remove(subscriptionKey); 306 if (inactiveName != null) { 307 inactiveDurableTopicSubscribers.remove(inactiveName); 308 managementContext.unregisterMBean(inactiveName); 309 } 310 } catch (Exception e) { 311 LOG.error("Failed to unregister subscription {}", sub, e); 312 } 313 } 314 } 315 316 protected void registerDestination(ObjectName key, ActiveMQDestination dest, DestinationView view) throws Exception { 317 if (dest.isQueue()) { 318 if (dest.isTemporary()) { 319 temporaryQueues.put(key, view); 320 } else { 321 queues.put(key, view); 322 } 323 } else { 324 if (dest.isTemporary()) { 325 temporaryTopics.put(key, view); 326 } else { 327 topics.put(key, view); 328 } 329 } 330 try { 331 AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, key); 332 registeredMBeans.add(key); 333 } catch (Throwable e) { 334 LOG.warn("Failed to register MBean {}", key); 335 LOG.debug("Failure reason: ", e); 336 } 337 } 338 339 protected void unregisterDestination(ObjectName key) throws Exception { 340 341 DestinationView view = removeAndRemember(topics, key, null); 342 view = removeAndRemember(queues, key, view); 343 view = removeAndRemember(temporaryQueues, key, view); 344 view = removeAndRemember(temporaryTopics, key, view); 345 if (registeredMBeans.remove(key)) { 346 try { 347 managementContext.unregisterMBean(key); 348 } catch (Throwable e) { 349 LOG.warn("Failed to unregister MBean {}", key); 350 LOG.debug("Failure reason: ", e); 351 } 352 } 353 if (view != null) { 354 key = view.getSlowConsumerStrategy(); 355 if (key!= null && registeredMBeans.remove(key)) { 356 try { 357 managementContext.unregisterMBean(key); 358 } catch (Throwable e) { 359 LOG.warn("Failed to unregister slow consumer strategy MBean {}", key); 360 LOG.debug("Failure reason: ", e); 361 } 362 } 363 } 364 } 365 366 protected void registerProducer(ObjectName key, ActiveMQDestination dest, ProducerView view) throws Exception { 367 368 if (dest != null) { 369 if (dest.isQueue()) { 370 if (dest.isTemporary()) { 371 temporaryQueueProducers.put(key, view); 372 } else { 373 queueProducers.put(key, view); 374 } 375 } else { 376 if (dest.isTemporary()) { 377 temporaryTopicProducers.put(key, view); 378 } else { 379 topicProducers.put(key, view); 380 } 381 } 382 } else { 383 dynamicDestinationProducers.put(key, view); 384 } 385 386 try { 387 AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, key); 388 registeredMBeans.add(key); 389 } catch (Throwable e) { 390 LOG.warn("Failed to register MBean {}", key); 391 LOG.debug("Failure reason: ", e); 392 } 393 } 394 395 protected void unregisterProducer(ObjectName key) throws Exception { 396 queueProducers.remove(key); 397 topicProducers.remove(key); 398 temporaryQueueProducers.remove(key); 399 temporaryTopicProducers.remove(key); 400 dynamicDestinationProducers.remove(key); 401 if (registeredMBeans.remove(key)) { 402 try { 403 managementContext.unregisterMBean(key); 404 } catch (Throwable e) { 405 LOG.warn("Failed to unregister MBean {}", key); 406 LOG.debug("Failure reason: ", e); 407 } 408 } 409 } 410 411 private DestinationView removeAndRemember(Map<ObjectName, DestinationView> map, ObjectName key, DestinationView view) { 412 DestinationView candidate = map.remove(key); 413 if (candidate != null && view == null) { 414 view = candidate; 415 } 416 return candidate != null ? candidate : view; 417 } 418 419 protected void registerSubscription(ObjectName key, ConsumerInfo info, SubscriptionKey subscriptionKey, SubscriptionView view) throws Exception { 420 ActiveMQDestination dest = info.getDestination(); 421 if (dest.isQueue()) { 422 if (dest.isTemporary()) { 423 temporaryQueueSubscribers.put(key, view); 424 } else { 425 queueSubscribers.put(key, view); 426 } 427 } else { 428 if (dest.isTemporary()) { 429 temporaryTopicSubscribers.put(key, view); 430 } else { 431 if (info.isDurable()) { 432 durableTopicSubscribers.put(key, view); 433 // unregister any inactive durable subs 434 try { 435 ObjectName inactiveName = subscriptionKeys.get(subscriptionKey); 436 if (inactiveName != null) { 437 inactiveDurableTopicSubscribers.remove(inactiveName); 438 registeredMBeans.remove(inactiveName); 439 managementContext.unregisterMBean(inactiveName); 440 } 441 } catch (Throwable e) { 442 LOG.error("Unable to unregister inactive durable subscriber {}", subscriptionKey, e); 443 } 444 } else { 445 topicSubscribers.put(key, view); 446 } 447 } 448 } 449 450 try { 451 AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, key); 452 registeredMBeans.add(key); 453 } catch (Throwable e) { 454 LOG.warn("Failed to register MBean {}", key); 455 LOG.debug("Failure reason: ", e); 456 } 457 } 458 459 protected void unregisterSubscription(ObjectName key, boolean addToInactive) throws Exception { 460 queueSubscribers.remove(key); 461 topicSubscribers.remove(key); 462 temporaryQueueSubscribers.remove(key); 463 temporaryTopicSubscribers.remove(key); 464 if (registeredMBeans.remove(key)) { 465 try { 466 managementContext.unregisterMBean(key); 467 } catch (Throwable e) { 468 LOG.warn("Failed to unregister MBean {}", key); 469 LOG.debug("Failure reason: ", e); 470 } 471 } 472 DurableSubscriptionView view = (DurableSubscriptionView)durableTopicSubscribers.remove(key); 473 if (view != null) { 474 // need to put this back in the inactive list 475 SubscriptionKey subscriptionKey = new SubscriptionKey(view.getClientId(), view.getSubscriptionName()); 476 if (addToInactive) { 477 SubscriptionInfo info = new SubscriptionInfo(); 478 info.setClientId(subscriptionKey.getClientId()); 479 info.setSubscriptionName(subscriptionKey.getSubscriptionName()); 480 info.setDestination(new ActiveMQTopic(view.getDestinationName())); 481 info.setSelector(view.getSelector()); 482 addInactiveSubscription(subscriptionKey, info, (brokerService.isKeepDurableSubsActive() ? view.subscription : null)); 483 } 484 } 485 } 486 487 protected void buildExistingSubscriptions() throws Exception { 488 Map<SubscriptionKey, SubscriptionInfo> subscriptions = new HashMap<SubscriptionKey, SubscriptionInfo>(); 489 Set<ActiveMQDestination> destinations = destinationFactory.getDestinations(); 490 if (destinations != null) { 491 for (ActiveMQDestination dest : destinations) { 492 if (dest.isTopic()) { 493 SubscriptionInfo[] infos = destinationFactory.getAllDurableSubscriptions((ActiveMQTopic)dest); 494 if (infos != null) { 495 for (int i = 0; i < infos.length; i++) { 496 SubscriptionInfo info = infos[i]; 497 SubscriptionKey key = new SubscriptionKey(info); 498 if (!alreadyKnown(key)) { 499 LOG.debug("Restoring durable subscription MBean {}", info); 500 subscriptions.put(key, info); 501 } 502 } 503 } 504 } 505 } 506 } 507 508 for (Map.Entry<SubscriptionKey, SubscriptionInfo> entry : subscriptions.entrySet()) { 509 addInactiveSubscription(entry.getKey(), entry.getValue(), null); 510 } 511 } 512 513 private boolean alreadyKnown(SubscriptionKey key) { 514 boolean known = false; 515 known = ((TopicRegion) getTopicRegion()).durableSubscriptionExists(key); 516 LOG.trace("Sub with key: {}, {} already registered", key, (known ? "": "not")); 517 return known; 518 } 519 520 protected void addInactiveSubscription(SubscriptionKey key, SubscriptionInfo info, Subscription subscription) { 521 try { 522 ConsumerInfo offlineConsumerInfo = subscription != null ? subscription.getConsumerInfo() : ((TopicRegion)getTopicRegion()).createInactiveConsumerInfo(info); 523 ObjectName objectName = BrokerMBeanSupport.createSubscriptionName(brokerObjectName, info.getClientId(), offlineConsumerInfo); 524 SubscriptionView view = new InactiveDurableSubscriptionView(this, brokerService, key.getClientId(), info, subscription); 525 526 try { 527 AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, objectName); 528 registeredMBeans.add(objectName); 529 } catch (Throwable e) { 530 LOG.warn("Failed to register MBean {}", key); 531 LOG.debug("Failure reason: ", e); 532 } 533 534 inactiveDurableTopicSubscribers.put(objectName, view); 535 subscriptionKeys.put(key, objectName); 536 } catch (Exception e) { 537 LOG.error("Failed to register subscription {}", info, e); 538 } 539 } 540 541 public CompositeData[] browse(SubscriptionView view) throws OpenDataException { 542 List<Message> messages = getSubscriberMessages(view); 543 CompositeData c[] = new CompositeData[messages.size()]; 544 for (int i = 0; i < c.length; i++) { 545 try { 546 c[i] = OpenTypeSupport.convert(messages.get(i)); 547 } catch (Throwable e) { 548 LOG.error("Failed to browse: {}", view, e); 549 } 550 } 551 return c; 552 } 553 554 public TabularData browseAsTable(SubscriptionView view) throws OpenDataException { 555 OpenTypeFactory factory = OpenTypeSupport.getFactory(ActiveMQMessage.class); 556 List<Message> messages = getSubscriberMessages(view); 557 CompositeType ct = factory.getCompositeType(); 558 TabularType tt = new TabularType("MessageList", "MessageList", ct, new String[] {"JMSMessageID"}); 559 TabularDataSupport rc = new TabularDataSupport(tt); 560 for (int i = 0; i < messages.size(); i++) { 561 rc.put(new CompositeDataSupport(ct, factory.getFields(messages.get(i)))); 562 } 563 return rc; 564 } 565 566 protected List<Message> getSubscriberMessages(SubscriptionView view) { 567 // TODO It is very dangerous operation for big backlogs 568 if (!(destinationFactory instanceof DestinationFactoryImpl)) { 569 throw new RuntimeException("unsupported by " + destinationFactory); 570 } 571 PersistenceAdapter adapter = ((DestinationFactoryImpl)destinationFactory).getPersistenceAdapter(); 572 final List<Message> result = new ArrayList<Message>(); 573 try { 574 ActiveMQTopic topic = new ActiveMQTopic(view.getDestinationName()); 575 TopicMessageStore store = adapter.createTopicMessageStore(topic); 576 store.recover(new MessageRecoveryListener() { 577 @Override 578 public boolean recoverMessage(Message message) throws Exception { 579 result.add(message); 580 return true; 581 } 582 583 @Override 584 public boolean recoverMessageReference(MessageId messageReference) throws Exception { 585 throw new RuntimeException("Should not be called."); 586 } 587 588 @Override 589 public boolean hasSpace() { 590 return true; 591 } 592 593 @Override 594 public boolean isDuplicate(MessageId id) { 595 return false; 596 } 597 }); 598 } catch (Throwable e) { 599 LOG.error("Failed to browse messages for Subscription {}", view, e); 600 } 601 return result; 602 603 } 604 605 protected ObjectName[] getTopics() { 606 Set<ObjectName> set = topics.keySet(); 607 return set.toArray(new ObjectName[set.size()]); 608 } 609 610 protected ObjectName[] getQueues() { 611 Set<ObjectName> set = queues.keySet(); 612 return set.toArray(new ObjectName[set.size()]); 613 } 614 615 protected ObjectName[] getTemporaryTopics() { 616 Set<ObjectName> set = temporaryTopics.keySet(); 617 return set.toArray(new ObjectName[set.size()]); 618 } 619 620 protected ObjectName[] getTemporaryQueues() { 621 Set<ObjectName> set = temporaryQueues.keySet(); 622 return set.toArray(new ObjectName[set.size()]); 623 } 624 625 protected ObjectName[] getTopicSubscribers() { 626 Set<ObjectName> set = topicSubscribers.keySet(); 627 return set.toArray(new ObjectName[set.size()]); 628 } 629 630 protected ObjectName[] getDurableTopicSubscribers() { 631 Set<ObjectName> set = durableTopicSubscribers.keySet(); 632 return set.toArray(new ObjectName[set.size()]); 633 } 634 635 protected ObjectName[] getQueueSubscribers() { 636 Set<ObjectName> set = queueSubscribers.keySet(); 637 return set.toArray(new ObjectName[set.size()]); 638 } 639 640 protected ObjectName[] getTemporaryTopicSubscribers() { 641 Set<ObjectName> set = temporaryTopicSubscribers.keySet(); 642 return set.toArray(new ObjectName[set.size()]); 643 } 644 645 protected ObjectName[] getTemporaryQueueSubscribers() { 646 Set<ObjectName> set = temporaryQueueSubscribers.keySet(); 647 return set.toArray(new ObjectName[set.size()]); 648 } 649 650 protected ObjectName[] getInactiveDurableTopicSubscribers() { 651 Set<ObjectName> set = inactiveDurableTopicSubscribers.keySet(); 652 return set.toArray(new ObjectName[set.size()]); 653 } 654 655 protected ObjectName[] getTopicProducers() { 656 Set<ObjectName> set = topicProducers.keySet(); 657 return set.toArray(new ObjectName[set.size()]); 658 } 659 660 protected ObjectName[] getQueueProducers() { 661 Set<ObjectName> set = queueProducers.keySet(); 662 return set.toArray(new ObjectName[set.size()]); 663 } 664 665 protected ObjectName[] getTemporaryTopicProducers() { 666 Set<ObjectName> set = temporaryTopicProducers.keySet(); 667 return set.toArray(new ObjectName[set.size()]); 668 } 669 670 protected ObjectName[] getTemporaryQueueProducers() { 671 Set<ObjectName> set = temporaryQueueProducers.keySet(); 672 return set.toArray(new ObjectName[set.size()]); 673 } 674 675 protected ObjectName[] getDynamicDestinationProducers() { 676 Set<ObjectName> set = dynamicDestinationProducers.keySet(); 677 return set.toArray(new ObjectName[set.size()]); 678 } 679 680 public Broker getContextBroker() { 681 return contextBroker; 682 } 683 684 public void setContextBroker(Broker contextBroker) { 685 this.contextBroker = contextBroker; 686 } 687 688 public ObjectName registerSlowConsumerStrategy(AbortSlowConsumerStrategy strategy) throws MalformedObjectNameException { 689 ObjectName objectName = null; 690 try { 691 objectName = BrokerMBeanSupport.createAbortSlowConsumerStrategyName(brokerObjectName, strategy); 692 if (!registeredMBeans.contains(objectName)) { 693 694 AbortSlowConsumerStrategyView view = null; 695 if (strategy instanceof AbortSlowAckConsumerStrategy) { 696 view = new AbortSlowAckConsumerStrategyView(this, (AbortSlowAckConsumerStrategy) strategy); 697 } else { 698 view = new AbortSlowConsumerStrategyView(this, strategy); 699 } 700 701 AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, objectName); 702 registeredMBeans.add(objectName); 703 } 704 } catch (Exception e) { 705 LOG.warn("Failed to register MBean {}", strategy); 706 LOG.debug("Failure reason: ", e); 707 } 708 return objectName; 709 } 710 711 public void registerRecoveredTransactionMBean(XATransaction transaction) { 712 try { 713 ObjectName objectName = BrokerMBeanSupport.createXATransactionName(brokerObjectName, transaction); 714 if (!registeredMBeans.contains(objectName)) { 715 RecoveredXATransactionView view = new RecoveredXATransactionView(this, transaction); 716 AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, objectName); 717 registeredMBeans.add(objectName); 718 } 719 } catch (Exception e) { 720 LOG.warn("Failed to register prepared transaction MBean {}", transaction); 721 LOG.debug("Failure reason: ", e); 722 } 723 } 724 725 public void unregister(XATransaction transaction) { 726 try { 727 ObjectName objectName = BrokerMBeanSupport.createXATransactionName(brokerObjectName, transaction); 728 if (registeredMBeans.remove(objectName)) { 729 try { 730 managementContext.unregisterMBean(objectName); 731 } catch (Throwable e) { 732 LOG.warn("Failed to unregister MBean {}", objectName); 733 LOG.debug("Failure reason: ", e); 734 } 735 } 736 } catch (Exception e) { 737 LOG.warn("Failed to create object name to unregister {}", transaction, e); 738 } 739 } 740 741 public ObjectName getSubscriberObjectName(Subscription key) { 742 return subscriptionMap.get(key); 743 } 744 745 public Subscription getSubscriber(ObjectName key) { 746 Subscription sub = null; 747 for (Entry<Subscription, ObjectName> entry: subscriptionMap.entrySet()) { 748 if (entry.getValue().equals(key)) { 749 sub = entry.getKey(); 750 break; 751 } 752 } 753 return sub; 754 } 755 756 public Map<ObjectName, DestinationView> getQueueViews() { 757 return queues; 758 } 759}