001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.advisory;
018
019import java.util.ArrayList;
020
021import javax.jms.Destination;
022import javax.jms.JMSException;
023
024import org.apache.activemq.ActiveMQMessageTransformation;
025import org.apache.activemq.command.ActiveMQDestination;
026import org.apache.activemq.command.ActiveMQTopic;
027
028public final class AdvisorySupport {
029    public static final String ADVISORY_TOPIC_PREFIX = "ActiveMQ.Advisory.";
030    public static final ActiveMQTopic CONNECTION_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX
031            + "Connection");
032    public static final ActiveMQTopic QUEUE_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX + "Queue");
033    public static final ActiveMQTopic TOPIC_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX + "Topic");
034    public static final ActiveMQTopic TEMP_QUEUE_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX + "TempQueue");
035    public static final ActiveMQTopic TEMP_TOPIC_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX + "TempTopic");
036    public static final String PRODUCER_ADVISORY_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Producer.";
037    public static final String QUEUE_PRODUCER_ADVISORY_TOPIC_PREFIX = PRODUCER_ADVISORY_TOPIC_PREFIX + "Queue.";
038    public static final String TOPIC_PRODUCER_ADVISORY_TOPIC_PREFIX = PRODUCER_ADVISORY_TOPIC_PREFIX + "Topic.";
039    public static final String CONSUMER_ADVISORY_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Consumer.";
040    public static final String QUEUE_CONSUMER_ADVISORY_TOPIC_PREFIX = CONSUMER_ADVISORY_TOPIC_PREFIX + "Queue.";
041    public static final String TOPIC_CONSUMER_ADVISORY_TOPIC_PREFIX = CONSUMER_ADVISORY_TOPIC_PREFIX + "Topic.";
042    public static final String EXPIRED_TOPIC_MESSAGES_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Expired.Topic.";
043    public static final String EXPIRED_QUEUE_MESSAGES_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Expired.Queue.";
044    public static final String NO_TOPIC_CONSUMERS_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "NoConsumer.Topic.";
045    public static final String NO_QUEUE_CONSUMERS_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "NoConsumer.Queue.";
046    public static final String SLOW_CONSUMER_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "SlowConsumer.";
047    public static final String FAST_PRODUCER_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "FastProducer.";
048    public static final String MESSAGE_DISCAREDED_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageDiscarded.";
049    public static final String FULL_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "FULL.";
050    public static final String MESSAGE_DELIVERED_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageDelivered.";
051    public static final String MESSAGE_CONSUMED_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageConsumed.";
052    public static final String MESSAGE_DLQ_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageDLQd.";
053    public static final String MASTER_BROKER_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MasterBroker";
054    public static final String NETWORK_BRIDGE_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "NetworkBridge";
055    public static final String NETWORK_BRIDGE_FORWARD_FAILURE_TOPIC_PREFIX = NETWORK_BRIDGE_TOPIC_PREFIX + ".ForwardFailure";
056    public static final String AGENT_TOPIC = "ActiveMQ.Agent";
057    public static final String ADIVSORY_MESSAGE_TYPE = "Advisory";
058    public static final String MSG_PROPERTY_ORIGIN_BROKER_ID = "originBrokerId";
059    public static final String MSG_PROPERTY_ORIGIN_BROKER_NAME = "originBrokerName";
060    public static final String MSG_PROPERTY_ORIGIN_BROKER_URL = "originBrokerURL";
061    public static final String MSG_PROPERTY_USAGE_NAME = "usageName";
062    public static final String MSG_PROPERTY_CONSUMER_ID = "consumerId";
063    public static final String MSG_PROPERTY_PRODUCER_ID = "producerId";
064    public static final String MSG_PROPERTY_MESSAGE_ID = "orignalMessageId";
065    public static final String MSG_PROPERTY_DESTINATION = "orignalDestination";
066    public static final String MSG_PROPERTY_CONSUMER_COUNT = "consumerCount";
067    public static final String MSG_PROPERTY_DISCARDED_COUNT = "discardedCount";
068
069    public static final ActiveMQTopic ALL_DESTINATIONS_COMPOSITE_ADVISORY_TOPIC = new ActiveMQTopic(
070            TOPIC_ADVISORY_TOPIC.getPhysicalName() + "," + QUEUE_ADVISORY_TOPIC.getPhysicalName() + "," +
071                    TEMP_QUEUE_ADVISORY_TOPIC.getPhysicalName() + "," + TEMP_TOPIC_ADVISORY_TOPIC.getPhysicalName());
072    public static final ActiveMQTopic TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC = new ActiveMQTopic(
073            TEMP_QUEUE_ADVISORY_TOPIC.getPhysicalName() + "," + TEMP_TOPIC_ADVISORY_TOPIC.getPhysicalName());
074    private static final ActiveMQTopic AGENT_TOPIC_DESTINATION = new ActiveMQTopic(AGENT_TOPIC);
075
076    private AdvisorySupport() {
077    }
078
079    public static ActiveMQTopic getConnectionAdvisoryTopic() {
080        return CONNECTION_ADVISORY_TOPIC;
081    }
082
083    public static ActiveMQTopic[] getAllDestinationAdvisoryTopics(Destination destination) throws JMSException {
084        return getAllDestinationAdvisoryTopics(ActiveMQMessageTransformation.transformDestination(destination));
085    }
086
087    public static ActiveMQTopic[] getAllDestinationAdvisoryTopics(ActiveMQDestination destination) throws JMSException {
088        ArrayList<ActiveMQTopic> result = new ArrayList<ActiveMQTopic>();
089
090        result.add(getConsumerAdvisoryTopic(destination));
091        result.add(getProducerAdvisoryTopic(destination));
092        result.add(getExpiredMessageTopic(destination));
093        result.add(getNoConsumersAdvisoryTopic(destination));
094        result.add(getSlowConsumerAdvisoryTopic(destination));
095        result.add(getFastProducerAdvisoryTopic(destination));
096        result.add(getMessageDiscardedAdvisoryTopic(destination));
097        result.add(getMessageDeliveredAdvisoryTopic(destination));
098        result.add(getMessageConsumedAdvisoryTopic(destination));
099        result.add(getMessageDLQdAdvisoryTopic(destination));
100        result.add(getFullAdvisoryTopic(destination));
101
102        return result.toArray(new ActiveMQTopic[0]);
103    }
104
105    public static ActiveMQTopic getConsumerAdvisoryTopic(Destination destination) throws JMSException {
106        return getConsumerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
107    }
108
109    public static ActiveMQTopic getConsumerAdvisoryTopic(ActiveMQDestination destination) {
110        String prefix;
111        if (destination.isQueue()) {
112            prefix = QUEUE_CONSUMER_ADVISORY_TOPIC_PREFIX;
113        } else {
114            prefix = TOPIC_CONSUMER_ADVISORY_TOPIC_PREFIX;
115        }
116        return getAdvisoryTopic(destination, prefix, true);
117    }
118
119    public static ActiveMQTopic getProducerAdvisoryTopic(Destination destination) throws JMSException {
120        return getProducerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
121    }
122
123    public static ActiveMQTopic getProducerAdvisoryTopic(ActiveMQDestination destination) {
124        String prefix;
125        if (destination.isQueue()) {
126            prefix = QUEUE_PRODUCER_ADVISORY_TOPIC_PREFIX;
127        } else {
128            prefix = TOPIC_PRODUCER_ADVISORY_TOPIC_PREFIX;
129        }
130        return getAdvisoryTopic(destination, prefix, false);
131    }
132
133    private static ActiveMQTopic getAdvisoryTopic(ActiveMQDestination destination, String prefix, boolean consumerTopics) {
134        return new ActiveMQTopic(prefix + destination.getPhysicalName().replaceAll(",", "&sbquo;"));
135    }
136
137    public static ActiveMQTopic getExpiredMessageTopic(Destination destination) throws JMSException {
138        return getExpiredMessageTopic(ActiveMQMessageTransformation.transformDestination(destination));
139    }
140
141    public static ActiveMQTopic getExpiredMessageTopic(ActiveMQDestination destination) {
142        if (destination.isQueue()) {
143            return getExpiredQueueMessageAdvisoryTopic(destination);
144        }
145        return getExpiredTopicMessageAdvisoryTopic(destination);
146    }
147
148    public static ActiveMQTopic getExpiredTopicMessageAdvisoryTopic(ActiveMQDestination destination) {
149        String name = EXPIRED_TOPIC_MESSAGES_TOPIC_PREFIX + destination.getPhysicalName();
150        return new ActiveMQTopic(name);
151    }
152
153    public static ActiveMQTopic getExpiredQueueMessageAdvisoryTopic(Destination destination) throws JMSException {
154        return getExpiredQueueMessageAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
155    }
156
157    public static ActiveMQTopic getExpiredQueueMessageAdvisoryTopic(ActiveMQDestination destination) {
158        String name = EXPIRED_QUEUE_MESSAGES_TOPIC_PREFIX + destination.getPhysicalName();
159        return new ActiveMQTopic(name);
160    }
161
162    public static ActiveMQTopic getNoConsumersAdvisoryTopic(Destination destination) throws JMSException {
163        return getExpiredMessageTopic(ActiveMQMessageTransformation.transformDestination(destination));
164    }
165
166    public static ActiveMQTopic getNoConsumersAdvisoryTopic(ActiveMQDestination destination) {
167        if (destination.isQueue()) {
168            return getNoQueueConsumersAdvisoryTopic(destination);
169        }
170        return getNoTopicConsumersAdvisoryTopic(destination);
171    }
172
173    public static ActiveMQTopic getNoTopicConsumersAdvisoryTopic(Destination destination) throws JMSException {
174        return getNoTopicConsumersAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
175    }
176
177    public static ActiveMQTopic getNoTopicConsumersAdvisoryTopic(ActiveMQDestination destination) {
178        String name = NO_TOPIC_CONSUMERS_TOPIC_PREFIX + destination.getPhysicalName();
179        return new ActiveMQTopic(name);
180    }
181
182    public static ActiveMQTopic getNoQueueConsumersAdvisoryTopic(Destination destination) throws JMSException {
183        return getNoQueueConsumersAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
184    }
185
186    public static ActiveMQTopic getNoQueueConsumersAdvisoryTopic(ActiveMQDestination destination) {
187        String name = NO_QUEUE_CONSUMERS_TOPIC_PREFIX + destination.getPhysicalName();
188        return new ActiveMQTopic(name);
189    }
190
191    public static ActiveMQTopic getSlowConsumerAdvisoryTopic(Destination destination) throws JMSException {
192        return getSlowConsumerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
193    }
194
195    public static ActiveMQTopic getSlowConsumerAdvisoryTopic(ActiveMQDestination destination) {
196        String name = SLOW_CONSUMER_TOPIC_PREFIX + destination.getDestinationTypeAsString() + "."
197                + destination.getPhysicalName();
198        return new ActiveMQTopic(name);
199    }
200
201    public static ActiveMQTopic getFastProducerAdvisoryTopic(Destination destination) throws JMSException {
202        return getFastProducerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
203    }
204
205    public static ActiveMQTopic getFastProducerAdvisoryTopic(ActiveMQDestination destination) {
206        String name = FAST_PRODUCER_TOPIC_PREFIX + destination.getDestinationTypeAsString() + "."
207                + destination.getPhysicalName();
208        return new ActiveMQTopic(name);
209    }
210
211    public static ActiveMQTopic getMessageDiscardedAdvisoryTopic(Destination destination) throws JMSException {
212        return getMessageDiscardedAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
213    }
214
215    public static ActiveMQTopic getMessageDiscardedAdvisoryTopic(ActiveMQDestination destination) {
216        String name = MESSAGE_DISCAREDED_TOPIC_PREFIX + destination.getDestinationTypeAsString() + "."
217                + destination.getPhysicalName();
218        return new ActiveMQTopic(name);
219    }
220
221    public static ActiveMQTopic getMessageDeliveredAdvisoryTopic(Destination destination) throws JMSException {
222        return getMessageDeliveredAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
223    }
224
225    public static ActiveMQTopic getMessageDeliveredAdvisoryTopic(ActiveMQDestination destination) {
226        String name = MESSAGE_DELIVERED_TOPIC_PREFIX + destination.getDestinationTypeAsString() + "."
227                + destination.getPhysicalName();
228        return new ActiveMQTopic(name);
229    }
230
231    public static ActiveMQTopic getMessageConsumedAdvisoryTopic(Destination destination) throws JMSException {
232        return getMessageConsumedAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
233    }
234
235    public static ActiveMQTopic getMessageConsumedAdvisoryTopic(ActiveMQDestination destination) {
236        String name = MESSAGE_CONSUMED_TOPIC_PREFIX + destination.getDestinationTypeAsString() + "."
237                + destination.getPhysicalName();
238        return new ActiveMQTopic(name);
239    }
240
241    public static ActiveMQTopic getMessageDLQdAdvisoryTopic(ActiveMQDestination destination) {
242        String name = MESSAGE_DLQ_TOPIC_PREFIX + destination.getDestinationTypeAsString() + "."
243                + destination.getPhysicalName();
244        return new ActiveMQTopic(name);
245    }
246
247    public static ActiveMQTopic getMasterBrokerAdvisoryTopic() {
248        return new ActiveMQTopic(MASTER_BROKER_TOPIC_PREFIX);
249    }
250
251    public static ActiveMQTopic getNetworkBridgeAdvisoryTopic() {
252        return new ActiveMQTopic(NETWORK_BRIDGE_TOPIC_PREFIX);
253    }
254
255    public static ActiveMQTopic getFullAdvisoryTopic(Destination destination) throws JMSException {
256        return getFullAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
257    }
258
259    public static ActiveMQTopic getFullAdvisoryTopic(ActiveMQDestination destination) {
260        String name = FULL_TOPIC_PREFIX + destination.getDestinationTypeAsString() + "."
261                + destination.getPhysicalName();
262        return new ActiveMQTopic(name);
263    }
264
265    public static ActiveMQTopic getDestinationAdvisoryTopic(Destination destination) throws JMSException {
266        return getDestinationAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
267    }
268
269    public static ActiveMQTopic getDestinationAdvisoryTopic(ActiveMQDestination destination) {
270        switch (destination.getDestinationType()) {
271            case ActiveMQDestination.QUEUE_TYPE:
272                return QUEUE_ADVISORY_TOPIC;
273            case ActiveMQDestination.TOPIC_TYPE:
274                return TOPIC_ADVISORY_TOPIC;
275            case ActiveMQDestination.TEMP_QUEUE_TYPE:
276                return TEMP_QUEUE_ADVISORY_TOPIC;
277            case ActiveMQDestination.TEMP_TOPIC_TYPE:
278                return TEMP_TOPIC_ADVISORY_TOPIC;
279            default:
280                throw new RuntimeException("Unknown destination type: " + destination.getDestinationType());
281        }
282    }
283
284    public static boolean isDestinationAdvisoryTopic(Destination destination) throws JMSException {
285        return isDestinationAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
286    }
287
288    public static boolean isTempDestinationAdvisoryTopic(ActiveMQDestination destination) {
289        if (destination.isComposite()) {
290            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
291            for (int i = 0; i < compositeDestinations.length; i++) {
292                if (!isTempDestinationAdvisoryTopic(compositeDestinations[i])) {
293                    return false;
294                }
295            }
296            return true;
297        } else {
298            return destination.equals(TEMP_QUEUE_ADVISORY_TOPIC) || destination.equals(TEMP_TOPIC_ADVISORY_TOPIC);
299        }
300    }
301
302    public static boolean isDestinationAdvisoryTopic(ActiveMQDestination destination) {
303        if (destination.isComposite()) {
304            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
305            for (int i = 0; i < compositeDestinations.length; i++) {
306                if (isDestinationAdvisoryTopic(compositeDestinations[i])) {
307                    return true;
308                }
309            }
310            return false;
311        } else {
312            return destination.equals(TEMP_QUEUE_ADVISORY_TOPIC) || destination.equals(TEMP_TOPIC_ADVISORY_TOPIC)
313                    || destination.equals(QUEUE_ADVISORY_TOPIC) || destination.equals(TOPIC_ADVISORY_TOPIC);
314        }
315    }
316
317    public static boolean isAdvisoryTopic(Destination destination) throws JMSException {
318        return isAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
319    }
320
321    public static boolean isAdvisoryTopic(ActiveMQDestination destination) {
322        if (destination != null) {
323            if (destination.isComposite()) {
324                ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
325                for (int i = 0; i < compositeDestinations.length; i++) {
326                    if (isAdvisoryTopic(compositeDestinations[i])) {
327                        return true;
328                    }
329                }
330                return false;
331            } else {
332                return destination.isTopic() && destination.getPhysicalName().startsWith(ADVISORY_TOPIC_PREFIX);
333            }
334        }
335        return false;
336    }
337
338    public static boolean isConnectionAdvisoryTopic(Destination destination) throws JMSException {
339        return isConnectionAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
340    }
341
342    public static boolean isConnectionAdvisoryTopic(ActiveMQDestination destination) {
343        if (destination.isComposite()) {
344            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
345            for (int i = 0; i < compositeDestinations.length; i++) {
346                if (isConnectionAdvisoryTopic(compositeDestinations[i])) {
347                    return true;
348                }
349            }
350            return false;
351        } else {
352            return destination.equals(CONNECTION_ADVISORY_TOPIC);
353        }
354    }
355
356    public static boolean isProducerAdvisoryTopic(Destination destination) throws JMSException {
357        return isProducerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
358    }
359
360    public static boolean isProducerAdvisoryTopic(ActiveMQDestination destination) {
361        if (destination.isComposite()) {
362            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
363            for (int i = 0; i < compositeDestinations.length; i++) {
364                if (isProducerAdvisoryTopic(compositeDestinations[i])) {
365                    return true;
366                }
367            }
368            return false;
369        } else {
370            return destination.isTopic() && destination.getPhysicalName().startsWith(PRODUCER_ADVISORY_TOPIC_PREFIX);
371        }
372    }
373
374    public static boolean isConsumerAdvisoryTopic(Destination destination) throws JMSException {
375        return isConsumerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
376    }
377
378    public static boolean isConsumerAdvisoryTopic(ActiveMQDestination destination) {
379        if (destination.isComposite()) {
380            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
381            for (int i = 0; i < compositeDestinations.length; i++) {
382                if (isConsumerAdvisoryTopic(compositeDestinations[i])) {
383                    return true;
384                }
385            }
386            return false;
387        } else {
388            return destination.isTopic() && destination.getPhysicalName().startsWith(CONSUMER_ADVISORY_TOPIC_PREFIX);
389        }
390    }
391
392    public static boolean isSlowConsumerAdvisoryTopic(Destination destination) throws JMSException {
393        return isSlowConsumerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
394    }
395
396    public static boolean isSlowConsumerAdvisoryTopic(ActiveMQDestination destination) {
397        if (destination.isComposite()) {
398            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
399            for (int i = 0; i < compositeDestinations.length; i++) {
400                if (isSlowConsumerAdvisoryTopic(compositeDestinations[i])) {
401                    return true;
402                }
403            }
404            return false;
405        } else {
406            return destination.isTopic() && destination.getPhysicalName().startsWith(SLOW_CONSUMER_TOPIC_PREFIX);
407        }
408    }
409
410    public static boolean isFastProducerAdvisoryTopic(Destination destination) throws JMSException {
411        return isFastProducerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
412    }
413
414    public static boolean isFastProducerAdvisoryTopic(ActiveMQDestination destination) {
415        if (destination.isComposite()) {
416            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
417            for (int i = 0; i < compositeDestinations.length; i++) {
418                if (isFastProducerAdvisoryTopic(compositeDestinations[i])) {
419                    return true;
420                }
421            }
422            return false;
423        } else {
424            return destination.isTopic() && destination.getPhysicalName().startsWith(FAST_PRODUCER_TOPIC_PREFIX);
425        }
426    }
427
428    public static boolean isMessageConsumedAdvisoryTopic(Destination destination) throws JMSException {
429        return isMessageConsumedAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
430    }
431
432    public static boolean isMessageConsumedAdvisoryTopic(ActiveMQDestination destination) {
433        if (destination.isComposite()) {
434            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
435            for (int i = 0; i < compositeDestinations.length; i++) {
436                if (isMessageConsumedAdvisoryTopic(compositeDestinations[i])) {
437                    return true;
438                }
439            }
440            return false;
441        } else {
442            return destination.isTopic() && destination.getPhysicalName().startsWith(MESSAGE_CONSUMED_TOPIC_PREFIX);
443        }
444    }
445
446    public static boolean isMasterBrokerAdvisoryTopic(Destination destination) throws JMSException {
447        return isMasterBrokerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
448    }
449
450    public static boolean isMasterBrokerAdvisoryTopic(ActiveMQDestination destination) {
451        if (destination.isComposite()) {
452            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
453            for (int i = 0; i < compositeDestinations.length; i++) {
454                if (isMasterBrokerAdvisoryTopic(compositeDestinations[i])) {
455                    return true;
456                }
457            }
458            return false;
459        } else {
460            return destination.isTopic() && destination.getPhysicalName().startsWith(MASTER_BROKER_TOPIC_PREFIX);
461        }
462    }
463
464    public static boolean isMessageDeliveredAdvisoryTopic(Destination destination) throws JMSException {
465        return isMessageDeliveredAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
466    }
467
468    public static boolean isMessageDeliveredAdvisoryTopic(ActiveMQDestination destination) {
469        if (destination.isComposite()) {
470            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
471            for (int i = 0; i < compositeDestinations.length; i++) {
472                if (isMessageDeliveredAdvisoryTopic(compositeDestinations[i])) {
473                    return true;
474                }
475            }
476            return false;
477        } else {
478            return destination.isTopic() && destination.getPhysicalName().startsWith(MESSAGE_DELIVERED_TOPIC_PREFIX);
479        }
480    }
481
482    public static boolean isMessageDiscardedAdvisoryTopic(Destination destination) throws JMSException {
483        return isMessageDiscardedAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
484    }
485
486    public static boolean isMessageDiscardedAdvisoryTopic(ActiveMQDestination destination) {
487        if (destination.isComposite()) {
488            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
489            for (int i = 0; i < compositeDestinations.length; i++) {
490                if (isMessageDiscardedAdvisoryTopic(compositeDestinations[i])) {
491                    return true;
492                }
493            }
494            return false;
495        } else {
496            return destination.isTopic() && destination.getPhysicalName().startsWith(MESSAGE_DISCAREDED_TOPIC_PREFIX);
497        }
498    }
499
500    public static boolean isFullAdvisoryTopic(Destination destination) throws JMSException {
501        return isFullAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
502    }
503
504    public static boolean isFullAdvisoryTopic(ActiveMQDestination destination) {
505        if (destination.isComposite()) {
506            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
507            for (int i = 0; i < compositeDestinations.length; i++) {
508                if (isFullAdvisoryTopic(compositeDestinations[i])) {
509                    return true;
510                }
511            }
512            return false;
513        } else {
514            return destination.isTopic() && destination.getPhysicalName().startsWith(FULL_TOPIC_PREFIX);
515        }
516    }
517
518    public static boolean isNetworkBridgeAdvisoryTopic(Destination destination) throws JMSException {
519        return isNetworkBridgeAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
520    }
521
522    public static boolean isNetworkBridgeAdvisoryTopic(ActiveMQDestination destination) {
523        if (destination.isComposite()) {
524            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
525            for (int i = 0; i < compositeDestinations.length; i++) {
526                if (isNetworkBridgeAdvisoryTopic(compositeDestinations[i])) {
527                    return true;
528                }
529            }
530            return false;
531        } else {
532            return destination.isTopic() && destination.getPhysicalName().startsWith(NETWORK_BRIDGE_TOPIC_PREFIX);
533        }
534    }
535
536    /**
537     * Returns the agent topic which is used to send commands to the broker
538     */
539    public static Destination getAgentDestination() {
540        return AGENT_TOPIC_DESTINATION;
541    }
542
543    public static ActiveMQTopic getNetworkBridgeForwardFailureAdvisoryTopic() {
544        return new ActiveMQTopic(NETWORK_BRIDGE_FORWARD_FAILURE_TOPIC_PREFIX);
545    }
546}