001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.advisory; 018 019import java.util.ArrayList; 020 021import javax.jms.Destination; 022import javax.jms.JMSException; 023 024import org.apache.activemq.ActiveMQMessageTransformation; 025import org.apache.activemq.command.ActiveMQDestination; 026import org.apache.activemq.command.ActiveMQTopic; 027 028public final class AdvisorySupport { 029 public static final String ADVISORY_TOPIC_PREFIX = "ActiveMQ.Advisory."; 030 public static final ActiveMQTopic CONNECTION_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX 031 + "Connection"); 032 public static final ActiveMQTopic QUEUE_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX + "Queue"); 033 public static final ActiveMQTopic TOPIC_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX + "Topic"); 034 public static final ActiveMQTopic TEMP_QUEUE_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX + "TempQueue"); 035 public static final ActiveMQTopic TEMP_TOPIC_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX + "TempTopic"); 036 public static final String PRODUCER_ADVISORY_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Producer."; 037 public static final String QUEUE_PRODUCER_ADVISORY_TOPIC_PREFIX = PRODUCER_ADVISORY_TOPIC_PREFIX + "Queue."; 038 public static final String TOPIC_PRODUCER_ADVISORY_TOPIC_PREFIX = PRODUCER_ADVISORY_TOPIC_PREFIX + "Topic."; 039 public static final String CONSUMER_ADVISORY_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Consumer."; 040 public static final String QUEUE_CONSUMER_ADVISORY_TOPIC_PREFIX = CONSUMER_ADVISORY_TOPIC_PREFIX + "Queue."; 041 public static final String TOPIC_CONSUMER_ADVISORY_TOPIC_PREFIX = CONSUMER_ADVISORY_TOPIC_PREFIX + "Topic."; 042 public static final String EXPIRED_TOPIC_MESSAGES_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Expired.Topic."; 043 public static final String EXPIRED_QUEUE_MESSAGES_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Expired.Queue."; 044 public static final String NO_TOPIC_CONSUMERS_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "NoConsumer.Topic."; 045 public static final String NO_QUEUE_CONSUMERS_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "NoConsumer.Queue."; 046 public static final String SLOW_CONSUMER_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "SlowConsumer."; 047 public static final String FAST_PRODUCER_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "FastProducer."; 048 public static final String MESSAGE_DISCAREDED_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageDiscarded."; 049 public static final String FULL_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "FULL."; 050 public static final String MESSAGE_DELIVERED_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageDelivered."; 051 public static final String MESSAGE_CONSUMED_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageConsumed."; 052 public static final String MESSAGE_DLQ_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageDLQd."; 053 public static final String MASTER_BROKER_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MasterBroker"; 054 public static final String NETWORK_BRIDGE_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "NetworkBridge"; 055 public static final String NETWORK_BRIDGE_FORWARD_FAILURE_TOPIC_PREFIX = NETWORK_BRIDGE_TOPIC_PREFIX + ".ForwardFailure"; 056 public static final String AGENT_TOPIC = "ActiveMQ.Agent"; 057 public static final String ADIVSORY_MESSAGE_TYPE = "Advisory"; 058 public static final String MSG_PROPERTY_ORIGIN_BROKER_ID = "originBrokerId"; 059 public static final String MSG_PROPERTY_ORIGIN_BROKER_NAME = "originBrokerName"; 060 public static final String MSG_PROPERTY_ORIGIN_BROKER_URL = "originBrokerURL"; 061 public static final String MSG_PROPERTY_USAGE_NAME = "usageName"; 062 public static final String MSG_PROPERTY_CONSUMER_ID = "consumerId"; 063 public static final String MSG_PROPERTY_PRODUCER_ID = "producerId"; 064 public static final String MSG_PROPERTY_MESSAGE_ID = "orignalMessageId"; 065 public static final String MSG_PROPERTY_DESTINATION = "orignalDestination"; 066 public static final String MSG_PROPERTY_CONSUMER_COUNT = "consumerCount"; 067 public static final String MSG_PROPERTY_DISCARDED_COUNT = "discardedCount"; 068 069 public static final ActiveMQTopic ALL_DESTINATIONS_COMPOSITE_ADVISORY_TOPIC = new ActiveMQTopic( 070 TOPIC_ADVISORY_TOPIC.getPhysicalName() + "," + QUEUE_ADVISORY_TOPIC.getPhysicalName() + "," + 071 TEMP_QUEUE_ADVISORY_TOPIC.getPhysicalName() + "," + TEMP_TOPIC_ADVISORY_TOPIC.getPhysicalName()); 072 public static final ActiveMQTopic TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC = new ActiveMQTopic( 073 TEMP_QUEUE_ADVISORY_TOPIC.getPhysicalName() + "," + TEMP_TOPIC_ADVISORY_TOPIC.getPhysicalName()); 074 private static final ActiveMQTopic AGENT_TOPIC_DESTINATION = new ActiveMQTopic(AGENT_TOPIC); 075 076 private AdvisorySupport() { 077 } 078 079 public static ActiveMQTopic getConnectionAdvisoryTopic() { 080 return CONNECTION_ADVISORY_TOPIC; 081 } 082 083 public static ActiveMQTopic[] getAllDestinationAdvisoryTopics(Destination destination) throws JMSException { 084 return getAllDestinationAdvisoryTopics(ActiveMQMessageTransformation.transformDestination(destination)); 085 } 086 087 public static ActiveMQTopic[] getAllDestinationAdvisoryTopics(ActiveMQDestination destination) throws JMSException { 088 ArrayList<ActiveMQTopic> result = new ArrayList<ActiveMQTopic>(); 089 090 result.add(getConsumerAdvisoryTopic(destination)); 091 result.add(getProducerAdvisoryTopic(destination)); 092 result.add(getExpiredMessageTopic(destination)); 093 result.add(getNoConsumersAdvisoryTopic(destination)); 094 result.add(getSlowConsumerAdvisoryTopic(destination)); 095 result.add(getFastProducerAdvisoryTopic(destination)); 096 result.add(getMessageDiscardedAdvisoryTopic(destination)); 097 result.add(getMessageDeliveredAdvisoryTopic(destination)); 098 result.add(getMessageConsumedAdvisoryTopic(destination)); 099 result.add(getMessageDLQdAdvisoryTopic(destination)); 100 result.add(getFullAdvisoryTopic(destination)); 101 102 return result.toArray(new ActiveMQTopic[0]); 103 } 104 105 public static ActiveMQTopic getConsumerAdvisoryTopic(Destination destination) throws JMSException { 106 return getConsumerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination)); 107 } 108 109 public static ActiveMQTopic getConsumerAdvisoryTopic(ActiveMQDestination destination) { 110 String prefix; 111 if (destination.isQueue()) { 112 prefix = QUEUE_CONSUMER_ADVISORY_TOPIC_PREFIX; 113 } else { 114 prefix = TOPIC_CONSUMER_ADVISORY_TOPIC_PREFIX; 115 } 116 return getAdvisoryTopic(destination, prefix, true); 117 } 118 119 public static ActiveMQTopic getProducerAdvisoryTopic(Destination destination) throws JMSException { 120 return getProducerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination)); 121 } 122 123 public static ActiveMQTopic getProducerAdvisoryTopic(ActiveMQDestination destination) { 124 String prefix; 125 if (destination.isQueue()) { 126 prefix = QUEUE_PRODUCER_ADVISORY_TOPIC_PREFIX; 127 } else { 128 prefix = TOPIC_PRODUCER_ADVISORY_TOPIC_PREFIX; 129 } 130 return getAdvisoryTopic(destination, prefix, false); 131 } 132 133 private static ActiveMQTopic getAdvisoryTopic(ActiveMQDestination destination, String prefix, boolean consumerTopics) { 134 return new ActiveMQTopic(prefix + destination.getPhysicalName().replaceAll(",", "‚")); 135 } 136 137 public static ActiveMQTopic getExpiredMessageTopic(Destination destination) throws JMSException { 138 return getExpiredMessageTopic(ActiveMQMessageTransformation.transformDestination(destination)); 139 } 140 141 public static ActiveMQTopic getExpiredMessageTopic(ActiveMQDestination destination) { 142 if (destination.isQueue()) { 143 return getExpiredQueueMessageAdvisoryTopic(destination); 144 } 145 return getExpiredTopicMessageAdvisoryTopic(destination); 146 } 147 148 public static ActiveMQTopic getExpiredTopicMessageAdvisoryTopic(ActiveMQDestination destination) { 149 String name = EXPIRED_TOPIC_MESSAGES_TOPIC_PREFIX + destination.getPhysicalName(); 150 return new ActiveMQTopic(name); 151 } 152 153 public static ActiveMQTopic getExpiredQueueMessageAdvisoryTopic(Destination destination) throws JMSException { 154 return getExpiredQueueMessageAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination)); 155 } 156 157 public static ActiveMQTopic getExpiredQueueMessageAdvisoryTopic(ActiveMQDestination destination) { 158 String name = EXPIRED_QUEUE_MESSAGES_TOPIC_PREFIX + destination.getPhysicalName(); 159 return new ActiveMQTopic(name); 160 } 161 162 public static ActiveMQTopic getNoConsumersAdvisoryTopic(Destination destination) throws JMSException { 163 return getExpiredMessageTopic(ActiveMQMessageTransformation.transformDestination(destination)); 164 } 165 166 public static ActiveMQTopic getNoConsumersAdvisoryTopic(ActiveMQDestination destination) { 167 if (destination.isQueue()) { 168 return getNoQueueConsumersAdvisoryTopic(destination); 169 } 170 return getNoTopicConsumersAdvisoryTopic(destination); 171 } 172 173 public static ActiveMQTopic getNoTopicConsumersAdvisoryTopic(Destination destination) throws JMSException { 174 return getNoTopicConsumersAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination)); 175 } 176 177 public static ActiveMQTopic getNoTopicConsumersAdvisoryTopic(ActiveMQDestination destination) { 178 String name = NO_TOPIC_CONSUMERS_TOPIC_PREFIX + destination.getPhysicalName(); 179 return new ActiveMQTopic(name); 180 } 181 182 public static ActiveMQTopic getNoQueueConsumersAdvisoryTopic(Destination destination) throws JMSException { 183 return getNoQueueConsumersAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination)); 184 } 185 186 public static ActiveMQTopic getNoQueueConsumersAdvisoryTopic(ActiveMQDestination destination) { 187 String name = NO_QUEUE_CONSUMERS_TOPIC_PREFIX + destination.getPhysicalName(); 188 return new ActiveMQTopic(name); 189 } 190 191 public static ActiveMQTopic getSlowConsumerAdvisoryTopic(Destination destination) throws JMSException { 192 return getSlowConsumerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination)); 193 } 194 195 public static ActiveMQTopic getSlowConsumerAdvisoryTopic(ActiveMQDestination destination) { 196 String name = SLOW_CONSUMER_TOPIC_PREFIX + destination.getDestinationTypeAsString() + "." 197 + destination.getPhysicalName(); 198 return new ActiveMQTopic(name); 199 } 200 201 public static ActiveMQTopic getFastProducerAdvisoryTopic(Destination destination) throws JMSException { 202 return getFastProducerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination)); 203 } 204 205 public static ActiveMQTopic getFastProducerAdvisoryTopic(ActiveMQDestination destination) { 206 String name = FAST_PRODUCER_TOPIC_PREFIX + destination.getDestinationTypeAsString() + "." 207 + destination.getPhysicalName(); 208 return new ActiveMQTopic(name); 209 } 210 211 public static ActiveMQTopic getMessageDiscardedAdvisoryTopic(Destination destination) throws JMSException { 212 return getMessageDiscardedAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination)); 213 } 214 215 public static ActiveMQTopic getMessageDiscardedAdvisoryTopic(ActiveMQDestination destination) { 216 String name = MESSAGE_DISCAREDED_TOPIC_PREFIX + destination.getDestinationTypeAsString() + "." 217 + destination.getPhysicalName(); 218 return new ActiveMQTopic(name); 219 } 220 221 public static ActiveMQTopic getMessageDeliveredAdvisoryTopic(Destination destination) throws JMSException { 222 return getMessageDeliveredAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination)); 223 } 224 225 public static ActiveMQTopic getMessageDeliveredAdvisoryTopic(ActiveMQDestination destination) { 226 String name = MESSAGE_DELIVERED_TOPIC_PREFIX + destination.getDestinationTypeAsString() + "." 227 + destination.getPhysicalName(); 228 return new ActiveMQTopic(name); 229 } 230 231 public static ActiveMQTopic getMessageConsumedAdvisoryTopic(Destination destination) throws JMSException { 232 return getMessageConsumedAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination)); 233 } 234 235 public static ActiveMQTopic getMessageConsumedAdvisoryTopic(ActiveMQDestination destination) { 236 String name = MESSAGE_CONSUMED_TOPIC_PREFIX + destination.getDestinationTypeAsString() + "." 237 + destination.getPhysicalName(); 238 return new ActiveMQTopic(name); 239 } 240 241 public static ActiveMQTopic getMessageDLQdAdvisoryTopic(ActiveMQDestination destination) { 242 String name = MESSAGE_DLQ_TOPIC_PREFIX + destination.getDestinationTypeAsString() + "." 243 + destination.getPhysicalName(); 244 return new ActiveMQTopic(name); 245 } 246 247 public static ActiveMQTopic getMasterBrokerAdvisoryTopic() { 248 return new ActiveMQTopic(MASTER_BROKER_TOPIC_PREFIX); 249 } 250 251 public static ActiveMQTopic getNetworkBridgeAdvisoryTopic() { 252 return new ActiveMQTopic(NETWORK_BRIDGE_TOPIC_PREFIX); 253 } 254 255 public static ActiveMQTopic getFullAdvisoryTopic(Destination destination) throws JMSException { 256 return getFullAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination)); 257 } 258 259 public static ActiveMQTopic getFullAdvisoryTopic(ActiveMQDestination destination) { 260 String name = FULL_TOPIC_PREFIX + destination.getDestinationTypeAsString() + "." 261 + destination.getPhysicalName(); 262 return new ActiveMQTopic(name); 263 } 264 265 public static ActiveMQTopic getDestinationAdvisoryTopic(Destination destination) throws JMSException { 266 return getDestinationAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination)); 267 } 268 269 public static ActiveMQTopic getDestinationAdvisoryTopic(ActiveMQDestination destination) { 270 switch (destination.getDestinationType()) { 271 case ActiveMQDestination.QUEUE_TYPE: 272 return QUEUE_ADVISORY_TOPIC; 273 case ActiveMQDestination.TOPIC_TYPE: 274 return TOPIC_ADVISORY_TOPIC; 275 case ActiveMQDestination.TEMP_QUEUE_TYPE: 276 return TEMP_QUEUE_ADVISORY_TOPIC; 277 case ActiveMQDestination.TEMP_TOPIC_TYPE: 278 return TEMP_TOPIC_ADVISORY_TOPIC; 279 default: 280 throw new RuntimeException("Unknown destination type: " + destination.getDestinationType()); 281 } 282 } 283 284 public static boolean isDestinationAdvisoryTopic(Destination destination) throws JMSException { 285 return isDestinationAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination)); 286 } 287 288 public static boolean isTempDestinationAdvisoryTopic(ActiveMQDestination destination) { 289 if (destination.isComposite()) { 290 ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations(); 291 for (int i = 0; i < compositeDestinations.length; i++) { 292 if (!isTempDestinationAdvisoryTopic(compositeDestinations[i])) { 293 return false; 294 } 295 } 296 return true; 297 } else { 298 return destination.equals(TEMP_QUEUE_ADVISORY_TOPIC) || destination.equals(TEMP_TOPIC_ADVISORY_TOPIC); 299 } 300 } 301 302 public static boolean isDestinationAdvisoryTopic(ActiveMQDestination destination) { 303 if (destination.isComposite()) { 304 ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations(); 305 for (int i = 0; i < compositeDestinations.length; i++) { 306 if (isDestinationAdvisoryTopic(compositeDestinations[i])) { 307 return true; 308 } 309 } 310 return false; 311 } else { 312 return destination.equals(TEMP_QUEUE_ADVISORY_TOPIC) || destination.equals(TEMP_TOPIC_ADVISORY_TOPIC) 313 || destination.equals(QUEUE_ADVISORY_TOPIC) || destination.equals(TOPIC_ADVISORY_TOPIC); 314 } 315 } 316 317 public static boolean isAdvisoryTopic(Destination destination) throws JMSException { 318 return isAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination)); 319 } 320 321 public static boolean isAdvisoryTopic(ActiveMQDestination destination) { 322 if (destination != null) { 323 if (destination.isComposite()) { 324 ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations(); 325 for (int i = 0; i < compositeDestinations.length; i++) { 326 if (isAdvisoryTopic(compositeDestinations[i])) { 327 return true; 328 } 329 } 330 return false; 331 } else { 332 return destination.isTopic() && destination.getPhysicalName().startsWith(ADVISORY_TOPIC_PREFIX); 333 } 334 } 335 return false; 336 } 337 338 public static boolean isConnectionAdvisoryTopic(Destination destination) throws JMSException { 339 return isConnectionAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination)); 340 } 341 342 public static boolean isConnectionAdvisoryTopic(ActiveMQDestination destination) { 343 if (destination.isComposite()) { 344 ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations(); 345 for (int i = 0; i < compositeDestinations.length; i++) { 346 if (isConnectionAdvisoryTopic(compositeDestinations[i])) { 347 return true; 348 } 349 } 350 return false; 351 } else { 352 return destination.equals(CONNECTION_ADVISORY_TOPIC); 353 } 354 } 355 356 public static boolean isProducerAdvisoryTopic(Destination destination) throws JMSException { 357 return isProducerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination)); 358 } 359 360 public static boolean isProducerAdvisoryTopic(ActiveMQDestination destination) { 361 if (destination.isComposite()) { 362 ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations(); 363 for (int i = 0; i < compositeDestinations.length; i++) { 364 if (isProducerAdvisoryTopic(compositeDestinations[i])) { 365 return true; 366 } 367 } 368 return false; 369 } else { 370 return destination.isTopic() && destination.getPhysicalName().startsWith(PRODUCER_ADVISORY_TOPIC_PREFIX); 371 } 372 } 373 374 public static boolean isConsumerAdvisoryTopic(Destination destination) throws JMSException { 375 return isConsumerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination)); 376 } 377 378 public static boolean isConsumerAdvisoryTopic(ActiveMQDestination destination) { 379 if (destination.isComposite()) { 380 ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations(); 381 for (int i = 0; i < compositeDestinations.length; i++) { 382 if (isConsumerAdvisoryTopic(compositeDestinations[i])) { 383 return true; 384 } 385 } 386 return false; 387 } else { 388 return destination.isTopic() && destination.getPhysicalName().startsWith(CONSUMER_ADVISORY_TOPIC_PREFIX); 389 } 390 } 391 392 public static boolean isSlowConsumerAdvisoryTopic(Destination destination) throws JMSException { 393 return isSlowConsumerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination)); 394 } 395 396 public static boolean isSlowConsumerAdvisoryTopic(ActiveMQDestination destination) { 397 if (destination.isComposite()) { 398 ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations(); 399 for (int i = 0; i < compositeDestinations.length; i++) { 400 if (isSlowConsumerAdvisoryTopic(compositeDestinations[i])) { 401 return true; 402 } 403 } 404 return false; 405 } else { 406 return destination.isTopic() && destination.getPhysicalName().startsWith(SLOW_CONSUMER_TOPIC_PREFIX); 407 } 408 } 409 410 public static boolean isFastProducerAdvisoryTopic(Destination destination) throws JMSException { 411 return isFastProducerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination)); 412 } 413 414 public static boolean isFastProducerAdvisoryTopic(ActiveMQDestination destination) { 415 if (destination.isComposite()) { 416 ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations(); 417 for (int i = 0; i < compositeDestinations.length; i++) { 418 if (isFastProducerAdvisoryTopic(compositeDestinations[i])) { 419 return true; 420 } 421 } 422 return false; 423 } else { 424 return destination.isTopic() && destination.getPhysicalName().startsWith(FAST_PRODUCER_TOPIC_PREFIX); 425 } 426 } 427 428 public static boolean isMessageConsumedAdvisoryTopic(Destination destination) throws JMSException { 429 return isMessageConsumedAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination)); 430 } 431 432 public static boolean isMessageConsumedAdvisoryTopic(ActiveMQDestination destination) { 433 if (destination.isComposite()) { 434 ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations(); 435 for (int i = 0; i < compositeDestinations.length; i++) { 436 if (isMessageConsumedAdvisoryTopic(compositeDestinations[i])) { 437 return true; 438 } 439 } 440 return false; 441 } else { 442 return destination.isTopic() && destination.getPhysicalName().startsWith(MESSAGE_CONSUMED_TOPIC_PREFIX); 443 } 444 } 445 446 public static boolean isMasterBrokerAdvisoryTopic(Destination destination) throws JMSException { 447 return isMasterBrokerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination)); 448 } 449 450 public static boolean isMasterBrokerAdvisoryTopic(ActiveMQDestination destination) { 451 if (destination.isComposite()) { 452 ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations(); 453 for (int i = 0; i < compositeDestinations.length; i++) { 454 if (isMasterBrokerAdvisoryTopic(compositeDestinations[i])) { 455 return true; 456 } 457 } 458 return false; 459 } else { 460 return destination.isTopic() && destination.getPhysicalName().startsWith(MASTER_BROKER_TOPIC_PREFIX); 461 } 462 } 463 464 public static boolean isMessageDeliveredAdvisoryTopic(Destination destination) throws JMSException { 465 return isMessageDeliveredAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination)); 466 } 467 468 public static boolean isMessageDeliveredAdvisoryTopic(ActiveMQDestination destination) { 469 if (destination.isComposite()) { 470 ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations(); 471 for (int i = 0; i < compositeDestinations.length; i++) { 472 if (isMessageDeliveredAdvisoryTopic(compositeDestinations[i])) { 473 return true; 474 } 475 } 476 return false; 477 } else { 478 return destination.isTopic() && destination.getPhysicalName().startsWith(MESSAGE_DELIVERED_TOPIC_PREFIX); 479 } 480 } 481 482 public static boolean isMessageDiscardedAdvisoryTopic(Destination destination) throws JMSException { 483 return isMessageDiscardedAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination)); 484 } 485 486 public static boolean isMessageDiscardedAdvisoryTopic(ActiveMQDestination destination) { 487 if (destination.isComposite()) { 488 ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations(); 489 for (int i = 0; i < compositeDestinations.length; i++) { 490 if (isMessageDiscardedAdvisoryTopic(compositeDestinations[i])) { 491 return true; 492 } 493 } 494 return false; 495 } else { 496 return destination.isTopic() && destination.getPhysicalName().startsWith(MESSAGE_DISCAREDED_TOPIC_PREFIX); 497 } 498 } 499 500 public static boolean isFullAdvisoryTopic(Destination destination) throws JMSException { 501 return isFullAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination)); 502 } 503 504 public static boolean isFullAdvisoryTopic(ActiveMQDestination destination) { 505 if (destination.isComposite()) { 506 ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations(); 507 for (int i = 0; i < compositeDestinations.length; i++) { 508 if (isFullAdvisoryTopic(compositeDestinations[i])) { 509 return true; 510 } 511 } 512 return false; 513 } else { 514 return destination.isTopic() && destination.getPhysicalName().startsWith(FULL_TOPIC_PREFIX); 515 } 516 } 517 518 public static boolean isNetworkBridgeAdvisoryTopic(Destination destination) throws JMSException { 519 return isNetworkBridgeAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination)); 520 } 521 522 public static boolean isNetworkBridgeAdvisoryTopic(ActiveMQDestination destination) { 523 if (destination.isComposite()) { 524 ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations(); 525 for (int i = 0; i < compositeDestinations.length; i++) { 526 if (isNetworkBridgeAdvisoryTopic(compositeDestinations[i])) { 527 return true; 528 } 529 } 530 return false; 531 } else { 532 return destination.isTopic() && destination.getPhysicalName().startsWith(NETWORK_BRIDGE_TOPIC_PREFIX); 533 } 534 } 535 536 /** 537 * Returns the agent topic which is used to send commands to the broker 538 */ 539 public static Destination getAgentDestination() { 540 return AGENT_TOPIC_DESTINATION; 541 } 542 543 public static ActiveMQTopic getNetworkBridgeForwardFailureAdvisoryTopic() { 544 return new ActiveMQTopic(NETWORK_BRIDGE_FORWARD_FAILURE_TOPIC_PREFIX); 545 } 546}