001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.jmx;
018
019import java.io.IOException;
020import java.util.ArrayList;
021import java.util.HashMap;
022import java.util.Iterator;
023import java.util.List;
024import java.util.Map;
025import java.util.Map.Entry;
026import java.util.Set;
027import java.util.concurrent.ConcurrentHashMap;
028import java.util.concurrent.CopyOnWriteArraySet;
029import java.util.concurrent.ExecutorService;
030import java.util.concurrent.ThreadPoolExecutor;
031
032import javax.management.InstanceNotFoundException;
033import javax.management.MalformedObjectNameException;
034import javax.management.ObjectName;
035import javax.management.openmbean.CompositeData;
036import javax.management.openmbean.CompositeDataSupport;
037import javax.management.openmbean.CompositeType;
038import javax.management.openmbean.OpenDataException;
039import javax.management.openmbean.TabularData;
040import javax.management.openmbean.TabularDataSupport;
041import javax.management.openmbean.TabularType;
042
043import org.apache.activemq.broker.Broker;
044import org.apache.activemq.broker.BrokerService;
045import org.apache.activemq.broker.ConnectionContext;
046import org.apache.activemq.broker.ProducerBrokerExchange;
047import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
048import org.apache.activemq.broker.region.Destination;
049import org.apache.activemq.broker.region.DestinationFactory;
050import org.apache.activemq.broker.region.DestinationFactoryImpl;
051import org.apache.activemq.broker.region.DestinationInterceptor;
052import org.apache.activemq.broker.region.Queue;
053import org.apache.activemq.broker.region.Region;
054import org.apache.activemq.broker.region.RegionBroker;
055import org.apache.activemq.broker.region.Subscription;
056import org.apache.activemq.broker.region.Topic;
057import org.apache.activemq.broker.region.TopicRegion;
058import org.apache.activemq.broker.region.TopicSubscription;
059import org.apache.activemq.broker.region.policy.AbortSlowAckConsumerStrategy;
060import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
061import org.apache.activemq.command.ActiveMQDestination;
062import org.apache.activemq.command.ActiveMQMessage;
063import org.apache.activemq.command.ActiveMQTopic;
064import org.apache.activemq.command.ConnectionInfo;
065import org.apache.activemq.command.ConsumerInfo;
066import org.apache.activemq.command.Message;
067import org.apache.activemq.command.MessageId;
068import org.apache.activemq.command.ProducerInfo;
069import org.apache.activemq.command.SubscriptionInfo;
070import org.apache.activemq.store.MessageRecoveryListener;
071import org.apache.activemq.store.PersistenceAdapter;
072import org.apache.activemq.store.TopicMessageStore;
073import org.apache.activemq.thread.Scheduler;
074import org.apache.activemq.thread.TaskRunnerFactory;
075import org.apache.activemq.transaction.XATransaction;
076import org.apache.activemq.usage.SystemUsage;
077import org.apache.activemq.util.ServiceStopper;
078import org.apache.activemq.util.SubscriptionKey;
079import org.slf4j.Logger;
080import org.slf4j.LoggerFactory;
081
082public class ManagedRegionBroker extends RegionBroker {
083    private static final Logger LOG = LoggerFactory.getLogger(ManagedRegionBroker.class);
084    private final ManagementContext managementContext;
085    private final ObjectName brokerObjectName;
086    private final Map<ObjectName, DestinationView> topics = new ConcurrentHashMap<ObjectName, DestinationView>();
087    private final Map<ObjectName, DestinationView> queues = new ConcurrentHashMap<ObjectName, DestinationView>();
088    private final Map<ObjectName, DestinationView> temporaryQueues = new ConcurrentHashMap<ObjectName, DestinationView>();
089    private final Map<ObjectName, DestinationView> temporaryTopics = new ConcurrentHashMap<ObjectName, DestinationView>();
090    private final Map<ObjectName, SubscriptionView> queueSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
091    private final Map<ObjectName, SubscriptionView> topicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
092    private final Map<ObjectName, SubscriptionView> durableTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
093    private final Map<ObjectName, SubscriptionView> inactiveDurableTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
094    private final Map<ObjectName, SubscriptionView> temporaryQueueSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
095    private final Map<ObjectName, SubscriptionView> temporaryTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
096    private final Map<ObjectName, ProducerView> queueProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
097    private final Map<ObjectName, ProducerView> topicProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
098    private final Map<ObjectName, ProducerView> temporaryQueueProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
099    private final Map<ObjectName, ProducerView> temporaryTopicProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
100    private final Map<ObjectName, ProducerView> dynamicDestinationProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
101    private final Map<SubscriptionKey, ObjectName> subscriptionKeys = new ConcurrentHashMap<SubscriptionKey, ObjectName>();
102    private final Map<Subscription, ObjectName> subscriptionMap = new ConcurrentHashMap<Subscription, ObjectName>();
103    private final Set<ObjectName> registeredMBeans = new CopyOnWriteArraySet<ObjectName>();
104    /* This is the first broker in the broker interceptor chain. */
105    private Broker contextBroker;
106
107    private final ExecutorService asyncInvokeService;
108    private final long mbeanTimeout;
109
110    public ManagedRegionBroker(BrokerService brokerService, ManagementContext context, ObjectName brokerObjectName, TaskRunnerFactory taskRunnerFactory, SystemUsage memoryManager,
111                               DestinationFactory destinationFactory, DestinationInterceptor destinationInterceptor,Scheduler scheduler,ThreadPoolExecutor executor) throws IOException {
112        super(brokerService, taskRunnerFactory, memoryManager, destinationFactory, destinationInterceptor,scheduler,executor);
113        this.managementContext = context;
114        this.brokerObjectName = brokerObjectName;
115        this.mbeanTimeout = brokerService.getMbeanInvocationTimeout();
116        this.asyncInvokeService = mbeanTimeout > 0 ? executor : null;;
117    }
118
119    @Override
120    public void start() throws Exception {
121        super.start();
122        // build all existing durable subscriptions
123        buildExistingSubscriptions();
124    }
125
126    @Override
127    protected void doStop(ServiceStopper stopper) {
128        super.doStop(stopper);
129        // lets remove any mbeans not yet removed
130        for (Iterator<ObjectName> iter = registeredMBeans.iterator(); iter.hasNext();) {
131            ObjectName name = iter.next();
132            try {
133                managementContext.unregisterMBean(name);
134            } catch (InstanceNotFoundException e) {
135                LOG.warn("The MBean {} is no longer registered with JMX", name);
136            } catch (Exception e) {
137                stopper.onException(this, e);
138            }
139        }
140        registeredMBeans.clear();
141    }
142
143    @Override
144    protected Region createQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
145        return new ManagedQueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
146    }
147
148    @Override
149    protected Region createTempQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
150        return new ManagedTempQueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
151    }
152
153    @Override
154    protected Region createTempTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
155        return new ManagedTempTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
156    }
157
158    @Override
159    protected Region createTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
160        return new ManagedTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
161    }
162
163    public void register(ActiveMQDestination destName, Destination destination) {
164        // TODO refactor to allow views for custom destinations
165        try {
166            ObjectName objectName = BrokerMBeanSupport.createDestinationName(brokerObjectName, destName);
167            DestinationView view;
168            if (destination instanceof Queue) {
169                view = new QueueView(this, (Queue)destination);
170            } else if (destination instanceof Topic) {
171                view = new TopicView(this, (Topic)destination);
172            } else {
173                view = null;
174                LOG.warn("JMX View is not supported for custom destination {}", destination);
175            }
176            if (view != null) {
177                registerDestination(objectName, destName, view);
178            }
179        } catch (Exception e) {
180            LOG.error("Failed to register destination {}", destName, e);
181        }
182    }
183
184    public void unregister(ActiveMQDestination destName) {
185        try {
186            ObjectName objectName = BrokerMBeanSupport.createDestinationName(brokerObjectName, destName);
187            unregisterDestination(objectName);
188        } catch (Exception e) {
189            LOG.error("Failed to unregister {}", destName, e);
190        }
191    }
192
193    public ObjectName registerSubscription(ConnectionContext context, Subscription sub) {
194        String connectionClientId = context.getClientId();
195
196        SubscriptionKey key = new SubscriptionKey(context.getClientId(), sub.getConsumerInfo().getSubscriptionName());
197        try {
198            ObjectName objectName = BrokerMBeanSupport.createSubscriptionName(brokerObjectName, connectionClientId, sub.getConsumerInfo());
199            SubscriptionView view;
200            if (sub.getConsumerInfo().getConsumerId().getConnectionId().equals("OFFLINE")) {
201                // add offline subscribers to inactive list
202                SubscriptionInfo info = new SubscriptionInfo();
203                info.setClientId(context.getClientId());
204                info.setSubscriptionName(sub.getConsumerInfo().getSubscriptionName());
205                info.setDestination(sub.getConsumerInfo().getDestination());
206                info.setSelector(sub.getSelector());
207                addInactiveSubscription(key, info, sub);
208            } else {
209                String userName = brokerService.isPopulateUserNameInMBeans() ? context.getUserName() : null;
210                if (sub.getConsumerInfo().isDurable()) {
211                    view = new DurableSubscriptionView(this, brokerService, context.getClientId(), userName, sub);
212                } else {
213                    if (sub instanceof TopicSubscription) {
214                        view = new TopicSubscriptionView(context.getClientId(), userName, (TopicSubscription) sub);
215                    } else {
216                        view = new SubscriptionView(context.getClientId(), userName, sub);
217                    }
218                }
219                registerSubscription(objectName, sub.getConsumerInfo(), key, view);
220            }
221            subscriptionMap.put(sub, objectName);
222            return objectName;
223        } catch (Exception e) {
224            LOG.error("Failed to register subscription {}", sub, e);
225            return null;
226        }
227    }
228
229    @Override
230    public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
231        super.addConnection(context, info);
232        this.contextBroker.getBrokerService().incrementCurrentConnections();
233        this.contextBroker.getBrokerService().incrementTotalConnections();
234    }
235
236    @Override
237    public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
238        super.removeConnection(context, info, error);
239        this.contextBroker.getBrokerService().decrementCurrentConnections();
240    }
241
242    @Override
243    public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
244        Subscription sub = super.addConsumer(context, info);
245        SubscriptionKey subscriptionKey = new SubscriptionKey(sub.getContext().getClientId(), sub.getConsumerInfo().getSubscriptionName());
246        ObjectName inactiveName = subscriptionKeys.get(subscriptionKey);
247        if (inactiveName != null) {
248            // if it was inactive, register it
249            registerSubscription(context, sub);
250        }
251        return sub;
252    }
253
254    @Override
255    public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
256        for (Subscription sub : subscriptionMap.keySet()) {
257            if (sub.getConsumerInfo().equals(info)) {
258               // unregister all consumer subs
259               unregisterSubscription(subscriptionMap.get(sub), true);
260            }
261        }
262        super.removeConsumer(context, info);
263    }
264
265    @Override
266    public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
267        super.addProducer(context, info);
268        String connectionClientId = context.getClientId();
269        ObjectName objectName = BrokerMBeanSupport.createProducerName(brokerObjectName, context.getClientId(), info);
270        String userName = brokerService.isPopulateUserNameInMBeans() ? context.getUserName() : null;
271        ProducerView view = new ProducerView(info, connectionClientId, userName, this);
272        registerProducer(objectName, info.getDestination(), view);
273    }
274
275    @Override
276    public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
277        ObjectName objectName = BrokerMBeanSupport.createProducerName(brokerObjectName, context.getClientId(), info);
278        unregisterProducer(objectName);
279        super.removeProducer(context, info);
280    }
281
282    @Override
283    public void send(ProducerBrokerExchange exchange, Message message) throws Exception {
284        if (exchange != null && exchange.getProducerState() != null && exchange.getProducerState().getInfo() != null) {
285            ProducerInfo info = exchange.getProducerState().getInfo();
286            if (info.getDestination() == null && info.getProducerId() != null) {
287                ObjectName objectName = BrokerMBeanSupport.createProducerName(brokerObjectName, exchange.getConnectionContext().getClientId(), info);
288                ProducerView view = this.dynamicDestinationProducers.get(objectName);
289                if (view != null) {
290                    ActiveMQDestination dest = message.getDestination();
291                    if (dest != null) {
292                        view.setLastUsedDestinationName(dest);
293                    }
294                }
295            }
296         }
297        super.send(exchange, message);
298    }
299
300    public void unregisterSubscription(Subscription sub) {
301        ObjectName name = subscriptionMap.remove(sub);
302        if (name != null) {
303            try {
304                SubscriptionKey subscriptionKey = new SubscriptionKey(sub.getContext().getClientId(), sub.getConsumerInfo().getSubscriptionName());
305                ObjectName inactiveName = subscriptionKeys.remove(subscriptionKey);
306                if (inactiveName != null) {
307                    inactiveDurableTopicSubscribers.remove(inactiveName);
308                    managementContext.unregisterMBean(inactiveName);
309                }
310            } catch (Exception e) {
311                LOG.error("Failed to unregister subscription {}", sub, e);
312            }
313        }
314    }
315
316    protected void registerDestination(ObjectName key, ActiveMQDestination dest, DestinationView view) throws Exception {
317        if (dest.isQueue()) {
318            if (dest.isTemporary()) {
319                temporaryQueues.put(key, view);
320            } else {
321                queues.put(key, view);
322            }
323        } else {
324            if (dest.isTemporary()) {
325                temporaryTopics.put(key, view);
326            } else {
327                topics.put(key, view);
328            }
329        }
330        try {
331            AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, key);
332            registeredMBeans.add(key);
333        } catch (Throwable e) {
334            LOG.warn("Failed to register MBean {}", key);
335            LOG.debug("Failure reason: ", e);
336        }
337    }
338
339    protected void unregisterDestination(ObjectName key) throws Exception {
340
341        DestinationView view = removeAndRemember(topics, key, null);
342        view = removeAndRemember(queues, key, view);
343        view = removeAndRemember(temporaryQueues, key, view);
344        view = removeAndRemember(temporaryTopics, key, view);
345        if (registeredMBeans.remove(key)) {
346            try {
347                managementContext.unregisterMBean(key);
348            } catch (Throwable e) {
349                LOG.warn("Failed to unregister MBean {}", key);
350                LOG.debug("Failure reason: ", e);
351            }
352        }
353        if (view != null) {
354            key = view.getSlowConsumerStrategy();
355            if (key!= null && registeredMBeans.remove(key)) {
356                try {
357                    managementContext.unregisterMBean(key);
358                } catch (Throwable e) {
359                    LOG.warn("Failed to unregister slow consumer strategy MBean {}", key);
360                    LOG.debug("Failure reason: ", e);
361                }
362            }
363        }
364    }
365
366    protected void registerProducer(ObjectName key, ActiveMQDestination dest, ProducerView view) throws Exception {
367
368        if (dest != null) {
369            if (dest.isQueue()) {
370                if (dest.isTemporary()) {
371                    temporaryQueueProducers.put(key, view);
372                } else {
373                    queueProducers.put(key, view);
374                }
375            } else {
376                if (dest.isTemporary()) {
377                    temporaryTopicProducers.put(key, view);
378                } else {
379                    topicProducers.put(key, view);
380                }
381            }
382        } else {
383            dynamicDestinationProducers.put(key, view);
384        }
385
386        try {
387            AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, key);
388            registeredMBeans.add(key);
389        } catch (Throwable e) {
390            LOG.warn("Failed to register MBean {}", key);
391            LOG.debug("Failure reason: ", e);
392        }
393    }
394
395    protected void unregisterProducer(ObjectName key) throws Exception {
396        queueProducers.remove(key);
397        topicProducers.remove(key);
398        temporaryQueueProducers.remove(key);
399        temporaryTopicProducers.remove(key);
400        dynamicDestinationProducers.remove(key);
401        if (registeredMBeans.remove(key)) {
402            try {
403                managementContext.unregisterMBean(key);
404            } catch (Throwable e) {
405                LOG.warn("Failed to unregister MBean {}", key);
406                LOG.debug("Failure reason: ", e);
407            }
408        }
409    }
410
411    private DestinationView removeAndRemember(Map<ObjectName, DestinationView> map, ObjectName key, DestinationView view) {
412        DestinationView candidate = map.remove(key);
413        if (candidate != null && view == null) {
414            view = candidate;
415        }
416        return candidate != null ? candidate : view;
417    }
418
419    protected void registerSubscription(ObjectName key, ConsumerInfo info, SubscriptionKey subscriptionKey, SubscriptionView view) throws Exception {
420        ActiveMQDestination dest = info.getDestination();
421        if (dest.isQueue()) {
422            if (dest.isTemporary()) {
423                temporaryQueueSubscribers.put(key, view);
424            } else {
425                queueSubscribers.put(key, view);
426            }
427        } else {
428            if (dest.isTemporary()) {
429                temporaryTopicSubscribers.put(key, view);
430            } else {
431                if (info.isDurable()) {
432                    durableTopicSubscribers.put(key, view);
433                    // unregister any inactive durable subs
434                    try {
435                        ObjectName inactiveName = subscriptionKeys.get(subscriptionKey);
436                        if (inactiveName != null) {
437                            inactiveDurableTopicSubscribers.remove(inactiveName);
438                            registeredMBeans.remove(inactiveName);
439                            managementContext.unregisterMBean(inactiveName);
440                        }
441                    } catch (Throwable e) {
442                        LOG.error("Unable to unregister inactive durable subscriber {}", subscriptionKey, e);
443                    }
444                } else {
445                    topicSubscribers.put(key, view);
446                }
447            }
448        }
449
450        try {
451            AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, key);
452            registeredMBeans.add(key);
453        } catch (Throwable e) {
454            LOG.warn("Failed to register MBean {}", key);
455            LOG.debug("Failure reason: ", e);
456        }
457    }
458
459    protected void unregisterSubscription(ObjectName key, boolean addToInactive) throws Exception {
460        queueSubscribers.remove(key);
461        topicSubscribers.remove(key);
462        temporaryQueueSubscribers.remove(key);
463        temporaryTopicSubscribers.remove(key);
464        if (registeredMBeans.remove(key)) {
465            try {
466                managementContext.unregisterMBean(key);
467            } catch (Throwable e) {
468                LOG.warn("Failed to unregister MBean {}", key);
469                LOG.debug("Failure reason: ", e);
470            }
471        }
472        DurableSubscriptionView view = (DurableSubscriptionView)durableTopicSubscribers.remove(key);
473        if (view != null) {
474            // need to put this back in the inactive list
475            SubscriptionKey subscriptionKey = new SubscriptionKey(view.getClientId(), view.getSubscriptionName());
476            if (addToInactive) {
477                SubscriptionInfo info = new SubscriptionInfo();
478                info.setClientId(subscriptionKey.getClientId());
479                info.setSubscriptionName(subscriptionKey.getSubscriptionName());
480                info.setDestination(new ActiveMQTopic(view.getDestinationName()));
481                info.setSelector(view.getSelector());
482                addInactiveSubscription(subscriptionKey, info, (brokerService.isKeepDurableSubsActive() ? view.subscription : null));
483            }
484        }
485    }
486
487    protected void buildExistingSubscriptions() throws Exception {
488        Map<SubscriptionKey, SubscriptionInfo> subscriptions = new HashMap<SubscriptionKey, SubscriptionInfo>();
489        Set<ActiveMQDestination> destinations = destinationFactory.getDestinations();
490        if (destinations != null) {
491            for (ActiveMQDestination dest : destinations) {
492                if (dest.isTopic()) {
493                    SubscriptionInfo[] infos = destinationFactory.getAllDurableSubscriptions((ActiveMQTopic)dest);
494                    if (infos != null) {
495                        for (int i = 0; i < infos.length; i++) {
496                            SubscriptionInfo info = infos[i];
497                            SubscriptionKey key = new SubscriptionKey(info);
498                            if (!alreadyKnown(key)) {
499                                LOG.debug("Restoring durable subscription MBean {}", info);
500                                subscriptions.put(key, info);
501                            }
502                        }
503                    }
504                }
505            }
506        }
507
508        for (Map.Entry<SubscriptionKey, SubscriptionInfo> entry : subscriptions.entrySet()) {
509            addInactiveSubscription(entry.getKey(), entry.getValue(), null);
510        }
511    }
512
513    private boolean alreadyKnown(SubscriptionKey key) {
514        boolean known = false;
515        known = ((TopicRegion) getTopicRegion()).durableSubscriptionExists(key);
516        LOG.trace("Sub with key: {}, {} already registered", key, (known ? "": "not"));
517        return known;
518    }
519
520    protected void addInactiveSubscription(SubscriptionKey key, SubscriptionInfo info, Subscription subscription) {
521        try {
522            ConsumerInfo offlineConsumerInfo = subscription != null ? subscription.getConsumerInfo() : ((TopicRegion)getTopicRegion()).createInactiveConsumerInfo(info);
523            ObjectName objectName = BrokerMBeanSupport.createSubscriptionName(brokerObjectName, info.getClientId(), offlineConsumerInfo);
524            SubscriptionView view = new InactiveDurableSubscriptionView(this, brokerService, key.getClientId(), info, subscription);
525
526            try {
527                AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, objectName);
528                registeredMBeans.add(objectName);
529            } catch (Throwable e) {
530                LOG.warn("Failed to register MBean {}", key);
531                LOG.debug("Failure reason: ", e);
532            }
533
534            inactiveDurableTopicSubscribers.put(objectName, view);
535            subscriptionKeys.put(key, objectName);
536        } catch (Exception e) {
537            LOG.error("Failed to register subscription {}", info, e);
538        }
539    }
540
541    public CompositeData[] browse(SubscriptionView view) throws OpenDataException {
542        List<Message> messages = getSubscriberMessages(view);
543        CompositeData c[] = new CompositeData[messages.size()];
544        for (int i = 0; i < c.length; i++) {
545            try {
546                c[i] = OpenTypeSupport.convert(messages.get(i));
547            } catch (Throwable e) {
548                LOG.error("Failed to browse: {}", view, e);
549            }
550        }
551        return c;
552    }
553
554    public TabularData browseAsTable(SubscriptionView view) throws OpenDataException {
555        OpenTypeFactory factory = OpenTypeSupport.getFactory(ActiveMQMessage.class);
556        List<Message> messages = getSubscriberMessages(view);
557        CompositeType ct = factory.getCompositeType();
558        TabularType tt = new TabularType("MessageList", "MessageList", ct, new String[] {"JMSMessageID"});
559        TabularDataSupport rc = new TabularDataSupport(tt);
560        for (int i = 0; i < messages.size(); i++) {
561            rc.put(new CompositeDataSupport(ct, factory.getFields(messages.get(i))));
562        }
563        return rc;
564    }
565
566    protected List<Message> getSubscriberMessages(SubscriptionView view) {
567        // TODO It is very dangerous operation for big backlogs
568        if (!(destinationFactory instanceof DestinationFactoryImpl)) {
569            throw new RuntimeException("unsupported by " + destinationFactory);
570        }
571        PersistenceAdapter adapter = ((DestinationFactoryImpl)destinationFactory).getPersistenceAdapter();
572        final List<Message> result = new ArrayList<Message>();
573        try {
574            ActiveMQTopic topic = new ActiveMQTopic(view.getDestinationName());
575            TopicMessageStore store = adapter.createTopicMessageStore(topic);
576            store.recover(new MessageRecoveryListener() {
577                @Override
578                public boolean recoverMessage(Message message) throws Exception {
579                    result.add(message);
580                    return true;
581                }
582
583                @Override
584                public boolean recoverMessageReference(MessageId messageReference) throws Exception {
585                    throw new RuntimeException("Should not be called.");
586                }
587
588                @Override
589                public boolean hasSpace() {
590                    return true;
591                }
592
593                @Override
594                public boolean isDuplicate(MessageId id) {
595                    return false;
596                }
597            });
598        } catch (Throwable e) {
599            LOG.error("Failed to browse messages for Subscription {}", view, e);
600        }
601        return result;
602
603    }
604
605    protected ObjectName[] getTopics() {
606        Set<ObjectName> set = topics.keySet();
607        return set.toArray(new ObjectName[set.size()]);
608    }
609
610    protected ObjectName[] getQueues() {
611        Set<ObjectName> set = queues.keySet();
612        return set.toArray(new ObjectName[set.size()]);
613    }
614
615    protected ObjectName[] getTemporaryTopics() {
616        Set<ObjectName> set = temporaryTopics.keySet();
617        return set.toArray(new ObjectName[set.size()]);
618    }
619
620    protected ObjectName[] getTemporaryQueues() {
621        Set<ObjectName> set = temporaryQueues.keySet();
622        return set.toArray(new ObjectName[set.size()]);
623    }
624
625    protected ObjectName[] getTopicSubscribers() {
626        Set<ObjectName> set = topicSubscribers.keySet();
627        return set.toArray(new ObjectName[set.size()]);
628    }
629
630    protected ObjectName[] getDurableTopicSubscribers() {
631        Set<ObjectName> set = durableTopicSubscribers.keySet();
632        return set.toArray(new ObjectName[set.size()]);
633    }
634
635    protected ObjectName[] getQueueSubscribers() {
636        Set<ObjectName> set = queueSubscribers.keySet();
637        return set.toArray(new ObjectName[set.size()]);
638    }
639
640    protected ObjectName[] getTemporaryTopicSubscribers() {
641        Set<ObjectName> set = temporaryTopicSubscribers.keySet();
642        return set.toArray(new ObjectName[set.size()]);
643    }
644
645    protected ObjectName[] getTemporaryQueueSubscribers() {
646        Set<ObjectName> set = temporaryQueueSubscribers.keySet();
647        return set.toArray(new ObjectName[set.size()]);
648    }
649
650    protected ObjectName[] getInactiveDurableTopicSubscribers() {
651        Set<ObjectName> set = inactiveDurableTopicSubscribers.keySet();
652        return set.toArray(new ObjectName[set.size()]);
653    }
654
655    protected ObjectName[] getTopicProducers() {
656        Set<ObjectName> set = topicProducers.keySet();
657        return set.toArray(new ObjectName[set.size()]);
658    }
659
660    protected ObjectName[] getQueueProducers() {
661        Set<ObjectName> set = queueProducers.keySet();
662        return set.toArray(new ObjectName[set.size()]);
663    }
664
665    protected ObjectName[] getTemporaryTopicProducers() {
666        Set<ObjectName> set = temporaryTopicProducers.keySet();
667        return set.toArray(new ObjectName[set.size()]);
668    }
669
670    protected ObjectName[] getTemporaryQueueProducers() {
671        Set<ObjectName> set = temporaryQueueProducers.keySet();
672        return set.toArray(new ObjectName[set.size()]);
673    }
674
675    protected ObjectName[] getDynamicDestinationProducers() {
676        Set<ObjectName> set = dynamicDestinationProducers.keySet();
677        return set.toArray(new ObjectName[set.size()]);
678    }
679
680    public Broker getContextBroker() {
681        return contextBroker;
682    }
683
684    public void setContextBroker(Broker contextBroker) {
685        this.contextBroker = contextBroker;
686    }
687
688    public ObjectName registerSlowConsumerStrategy(AbortSlowConsumerStrategy strategy) throws MalformedObjectNameException {
689        ObjectName objectName = null;
690        try {
691            objectName = BrokerMBeanSupport.createAbortSlowConsumerStrategyName(brokerObjectName, strategy);
692            if (!registeredMBeans.contains(objectName))  {
693
694                AbortSlowConsumerStrategyView view = null;
695                if (strategy instanceof AbortSlowAckConsumerStrategy) {
696                    view = new AbortSlowAckConsumerStrategyView(this, (AbortSlowAckConsumerStrategy) strategy);
697                } else {
698                    view = new AbortSlowConsumerStrategyView(this, strategy);
699                }
700
701                AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, objectName);
702                registeredMBeans.add(objectName);
703            }
704        } catch (Exception e) {
705            LOG.warn("Failed to register MBean {}", strategy);
706            LOG.debug("Failure reason: ", e);
707        }
708        return objectName;
709    }
710
711    public void registerRecoveredTransactionMBean(XATransaction transaction) {
712        try {
713            ObjectName objectName = BrokerMBeanSupport.createXATransactionName(brokerObjectName, transaction);
714            if (!registeredMBeans.contains(objectName))  {
715                RecoveredXATransactionView view = new RecoveredXATransactionView(this, transaction);
716                AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, objectName);
717                registeredMBeans.add(objectName);
718            }
719        } catch (Exception e) {
720            LOG.warn("Failed to register prepared transaction MBean {}", transaction);
721            LOG.debug("Failure reason: ", e);
722        }
723    }
724
725    public void unregister(XATransaction transaction) {
726        try {
727            ObjectName objectName = BrokerMBeanSupport.createXATransactionName(brokerObjectName, transaction);
728            if (registeredMBeans.remove(objectName)) {
729                try {
730                    managementContext.unregisterMBean(objectName);
731                } catch (Throwable e) {
732                    LOG.warn("Failed to unregister MBean {}", objectName);
733                    LOG.debug("Failure reason: ", e);
734                }
735            }
736        } catch (Exception e) {
737            LOG.warn("Failed to create object name to unregister {}", transaction, e);
738        }
739    }
740
741    public ObjectName getSubscriberObjectName(Subscription key) {
742        return subscriptionMap.get(key);
743    }
744
745    public Subscription getSubscriber(ObjectName key) {
746        Subscription sub = null;
747        for (Entry<Subscription, ObjectName> entry: subscriptionMap.entrySet()) {
748            if (entry.getValue().equals(key)) {
749                sub = entry.getKey();
750                break;
751            }
752        }
753        return sub;
754    }
755
756    public Map<ObjectName, DestinationView> getQueueViews() {
757        return queues;
758    }
759}