001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.command; 018 019import java.util.ArrayList; 020import java.util.List; 021 022import org.apache.activemq.filter.BooleanExpression; 023import org.apache.activemq.state.CommandVisitor; 024 025/** 026 * @openwire:marshaller code="5" 027 * 028 */ 029public class ConsumerInfo extends BaseCommand { 030 031 public static final byte DATA_STRUCTURE_TYPE = CommandTypes.CONSUMER_INFO; 032 033 public static final byte HIGH_PRIORITY = 10; 034 public static final byte NORMAL_PRIORITY = 0; 035 public static final byte NETWORK_CONSUMER_PRIORITY = -5; 036 public static final byte LOW_PRIORITY = -10; 037 038 protected ConsumerId consumerId; 039 protected ActiveMQDestination destination; 040 protected int prefetchSize; 041 protected int maximumPendingMessageLimit; 042 protected boolean browser; 043 protected boolean dispatchAsync; 044 protected String selector; 045 protected String clientId; 046 protected String subscriptionName; 047 protected boolean noLocal; 048 protected boolean exclusive; 049 protected boolean retroactive; 050 protected byte priority; 051 protected BrokerId[] brokerPath; 052 protected boolean optimizedAcknowledge; 053 // used by the broker 054 protected transient int currentPrefetchSize; 055 // if true, the consumer will not send range 056 protected boolean noRangeAcks; 057 // acks. 058 059 protected BooleanExpression additionalPredicate; 060 protected transient boolean networkSubscription; // this subscription 061 protected transient List<ConsumerId> networkConsumerIds; // the original consumerId 062 063 // not marshalled, populated from RemoveInfo, the last message delivered, used 064 // to suppress redelivery on prefetched messages after close 065 private transient long lastDeliveredSequenceId = RemoveInfo.LAST_DELIVERED_UNSET; 066 private transient long assignedGroupCount; 067 // originated from a 068 // network connection 069 070 public ConsumerInfo() { 071 } 072 073 public ConsumerInfo(ConsumerId consumerId) { 074 this.consumerId = consumerId; 075 } 076 077 public ConsumerInfo(SessionInfo sessionInfo, long consumerId) { 078 this.consumerId = new ConsumerId(sessionInfo.getSessionId(), consumerId); 079 } 080 081 public ConsumerInfo copy() { 082 ConsumerInfo info = new ConsumerInfo(); 083 copy(info); 084 return info; 085 } 086 087 public void copy(ConsumerInfo info) { 088 super.copy(info); 089 info.consumerId = consumerId; 090 info.destination = destination; 091 info.prefetchSize = prefetchSize; 092 info.maximumPendingMessageLimit = maximumPendingMessageLimit; 093 info.browser = browser; 094 info.dispatchAsync = dispatchAsync; 095 info.selector = selector; 096 info.clientId = clientId; 097 info.subscriptionName = subscriptionName; 098 info.noLocal = noLocal; 099 info.exclusive = exclusive; 100 info.retroactive = retroactive; 101 info.priority = priority; 102 info.brokerPath = brokerPath; 103 info.networkSubscription = networkSubscription; 104 if (networkConsumerIds != null) { 105 if (info.networkConsumerIds==null){ 106 info.networkConsumerIds=new ArrayList<ConsumerId>(); 107 } 108 info.networkConsumerIds.addAll(networkConsumerIds); 109 } 110 } 111 112 public boolean isDurable() { 113 return subscriptionName != null; 114 } 115 116 @Override 117 public byte getDataStructureType() { 118 return DATA_STRUCTURE_TYPE; 119 } 120 121 /** 122 * Is used to uniquely identify the consumer to the broker. 123 * 124 * @openwire:property version=1 cache=true 125 */ 126 public ConsumerId getConsumerId() { 127 return consumerId; 128 } 129 130 public void setConsumerId(ConsumerId consumerId) { 131 this.consumerId = consumerId; 132 } 133 134 /** 135 * Is this consumer a queue browser? 136 * 137 * @openwire:property version=1 138 */ 139 public boolean isBrowser() { 140 return browser; 141 } 142 143 public void setBrowser(boolean browser) { 144 this.browser = browser; 145 } 146 147 /** 148 * The destination that the consumer is interested in receiving messages 149 * from. This destination could be a composite destination. 150 * 151 * @openwire:property version=1 cache=true 152 */ 153 public ActiveMQDestination getDestination() { 154 return destination; 155 } 156 157 public void setDestination(ActiveMQDestination destination) { 158 this.destination = destination; 159 } 160 161 /** 162 * How many messages a broker will send to the client without receiving an 163 * ack before he stops dispatching messages to the client. 164 * 165 * @openwire:property version=1 166 */ 167 public int getPrefetchSize() { 168 return prefetchSize; 169 } 170 171 public void setPrefetchSize(int prefetchSize) { 172 this.prefetchSize = prefetchSize; 173 this.currentPrefetchSize = prefetchSize; 174 } 175 176 /** 177 * How many messages a broker will keep around, above the prefetch limit, 178 * for non-durable topics before starting to discard older messages. 179 * 180 * @openwire:property version=1 181 */ 182 public int getMaximumPendingMessageLimit() { 183 return maximumPendingMessageLimit; 184 } 185 186 public void setMaximumPendingMessageLimit(int maximumPendingMessageLimit) { 187 this.maximumPendingMessageLimit = maximumPendingMessageLimit; 188 } 189 190 /** 191 * Should the broker dispatch a message to the consumer async? If he does it 192 * async, then he uses a more SEDA style of processing while if it is not 193 * done async, then he broker use a STP style of processing. STP is more 194 * appropriate in high bandwidth situations or when being used by and in vm 195 * transport. 196 * 197 * @openwire:property version=1 198 */ 199 public boolean isDispatchAsync() { 200 return dispatchAsync; 201 } 202 203 public void setDispatchAsync(boolean dispatchAsync) { 204 this.dispatchAsync = dispatchAsync; 205 } 206 207 /** 208 * The JMS selector used to filter out messages that this consumer is 209 * interested in. 210 * 211 * @openwire:property version=1 212 */ 213 public String getSelector() { 214 return selector; 215 } 216 217 public void setSelector(String selector) { 218 this.selector = selector; 219 } 220 221 /** 222 * Used to identify the id of a client connection. 223 * 224 * @openwire:property version=10 225 */ 226 public String getClientId() { 227 return clientId; 228 } 229 230 public void setClientId(String clientId) { 231 this.clientId = clientId; 232 } 233 234 /** 235 * Used to identify the name of a durable subscription. 236 * 237 * @openwire:property version=1 238 */ 239 public String getSubscriptionName() { 240 return subscriptionName; 241 } 242 243 public void setSubscriptionName(String durableSubscriptionId) { 244 this.subscriptionName = durableSubscriptionId; 245 } 246 247 /** 248 * Set noLocal to true to avoid receiving messages that were published 249 * locally on the same connection. 250 * 251 * @openwire:property version=1 252 */ 253 public boolean isNoLocal() { 254 return noLocal; 255 } 256 257 public void setNoLocal(boolean noLocal) { 258 this.noLocal = noLocal; 259 } 260 261 /** 262 * An exclusive consumer locks out other consumers from being able to 263 * receive messages from the destination. If there are multiple exclusive 264 * consumers for a destination, the first one created will be the exclusive 265 * consumer of the destination. 266 * 267 * @openwire:property version=1 268 */ 269 public boolean isExclusive() { 270 return exclusive; 271 } 272 273 public void setExclusive(boolean exclusive) { 274 this.exclusive = exclusive; 275 } 276 277 /** 278 * A retroactive consumer only has meaning for Topics. It allows a consumer 279 * to retroactively see messages sent prior to the consumer being created. 280 * If the consumer is not durable, it will be delivered the last message 281 * published to the topic. If the consumer is durable then it will receive 282 * all persistent messages that are still stored in persistent storage for 283 * that topic. 284 * 285 * @openwire:property version=1 286 */ 287 public boolean isRetroactive() { 288 return retroactive; 289 } 290 291 public void setRetroactive(boolean retroactive) { 292 this.retroactive = retroactive; 293 } 294 295 public RemoveInfo createRemoveCommand() { 296 RemoveInfo command = new RemoveInfo(getConsumerId()); 297 command.setResponseRequired(isResponseRequired()); 298 return command; 299 } 300 301 /** 302 * The broker will avoid dispatching to a lower priority consumer if there 303 * are other higher priority consumers available to dispatch to. This allows 304 * letting the broker to have an affinity to higher priority consumers. 305 * Default priority is 0. 306 * 307 * @openwire:property version=1 308 */ 309 public byte getPriority() { 310 return priority; 311 } 312 313 public void setPriority(byte priority) { 314 this.priority = priority; 315 } 316 317 /** 318 * The route of brokers the command has moved through. 319 * 320 * @openwire:property version=1 cache=true 321 */ 322 public BrokerId[] getBrokerPath() { 323 return brokerPath; 324 } 325 326 public void setBrokerPath(BrokerId[] brokerPath) { 327 this.brokerPath = brokerPath; 328 } 329 330 /** 331 * A transient additional predicate that can be used it inject additional 332 * predicates into the selector on the fly. Handy if if say a Security 333 * Broker interceptor wants to filter out messages based on security level 334 * of the consumer. 335 * 336 * @openwire:property version=1 337 */ 338 public BooleanExpression getAdditionalPredicate() { 339 return additionalPredicate; 340 } 341 342 public void setAdditionalPredicate(BooleanExpression additionalPredicate) { 343 this.additionalPredicate = additionalPredicate; 344 } 345 346 @Override 347 public Response visit(CommandVisitor visitor) throws Exception { 348 return visitor.processAddConsumer(this); 349 } 350 351 /** 352 * @openwire:property version=1 353 * @return Returns the networkSubscription. 354 */ 355 public boolean isNetworkSubscription() { 356 return networkSubscription; 357 } 358 359 /** 360 * @param networkSubscription The networkSubscription to set. 361 */ 362 public void setNetworkSubscription(boolean networkSubscription) { 363 this.networkSubscription = networkSubscription; 364 } 365 366 /** 367 * @openwire:property version=1 368 * @return Returns the optimizedAcknowledge. 369 */ 370 public boolean isOptimizedAcknowledge() { 371 return optimizedAcknowledge; 372 } 373 374 /** 375 * @param optimizedAcknowledge The optimizedAcknowledge to set. 376 */ 377 public void setOptimizedAcknowledge(boolean optimizedAcknowledge) { 378 this.optimizedAcknowledge = optimizedAcknowledge; 379 } 380 381 /** 382 * @return Returns the currentPrefetchSize. 383 */ 384 public int getCurrentPrefetchSize() { 385 return currentPrefetchSize; 386 } 387 388 /** 389 * @param currentPrefetchSize The currentPrefetchSize to set. 390 */ 391 public void setCurrentPrefetchSize(int currentPrefetchSize) { 392 this.currentPrefetchSize = currentPrefetchSize; 393 } 394 395 /** 396 * The broker may be able to optimize it's processing or provides better QOS 397 * if it knows the consumer will not be sending ranged acks. 398 * 399 * @return true if the consumer will not send range acks. 400 * @openwire:property version=1 401 */ 402 public boolean isNoRangeAcks() { 403 return noRangeAcks; 404 } 405 406 public void setNoRangeAcks(boolean noRangeAcks) { 407 this.noRangeAcks = noRangeAcks; 408 } 409 410 public synchronized void addNetworkConsumerId(ConsumerId networkConsumerId) { 411 if (networkConsumerIds == null) { 412 networkConsumerIds = new ArrayList<ConsumerId>(); 413 } 414 networkConsumerIds.add(networkConsumerId); 415 } 416 417 public synchronized void removeNetworkConsumerId(ConsumerId networkConsumerId) { 418 if (networkConsumerIds != null) { 419 networkConsumerIds.remove(networkConsumerId); 420 if (networkConsumerIds.isEmpty()) { 421 networkConsumerIds=null; 422 } 423 } 424 } 425 426 public synchronized boolean isNetworkConsumersEmpty() { 427 return networkConsumerIds == null || networkConsumerIds.isEmpty(); 428 } 429 430 public synchronized List<ConsumerId> getNetworkConsumerIds(){ 431 List<ConsumerId> result = new ArrayList<ConsumerId>(); 432 if (networkConsumerIds != null) { 433 result.addAll(networkConsumerIds); 434 } 435 return result; 436 } 437 438 @Override 439 public int hashCode() { 440 return (consumerId == null) ? 0 : consumerId.hashCode(); 441 } 442 443 @Override 444 public boolean equals(Object obj) { 445 if (this == obj) { 446 return true; 447 } 448 if (obj == null) { 449 return false; 450 } 451 if (getClass() != obj.getClass()) { 452 return false; 453 } 454 455 ConsumerInfo other = (ConsumerInfo) obj; 456 457 if (consumerId == null && other.consumerId != null) { 458 return false; 459 } else if (!consumerId.equals(other.consumerId)) { 460 return false; 461 } 462 return true; 463 } 464 465 /** 466 * Tracks the original subscription id that causes a subscription to 467 * percolate through a network when networkTTL > 1. Tracking the original 468 * subscription allows duplicate suppression. 469 * 470 * @return array of the current subscription path 471 * @openwire:property version=4 472 */ 473 public ConsumerId[] getNetworkConsumerPath() { 474 ConsumerId[] result = null; 475 if (networkConsumerIds != null) { 476 result = networkConsumerIds.toArray(new ConsumerId[0]); 477 } 478 return result; 479 } 480 481 public void setNetworkConsumerPath(ConsumerId[] consumerPath) { 482 if (consumerPath != null) { 483 for (int i=0; i<consumerPath.length; i++) { 484 addNetworkConsumerId(consumerPath[i]); 485 } 486 } 487 } 488 489 public void setLastDeliveredSequenceId(long lastDeliveredSequenceId) { 490 this.lastDeliveredSequenceId = lastDeliveredSequenceId; 491 } 492 493 public long getLastDeliveredSequenceId() { 494 return lastDeliveredSequenceId; 495 } 496 497 public void incrementAssignedGroupCount() { 498 this.assignedGroupCount++; 499 } 500 501 public void clearAssignedGroupCount() { 502 this.assignedGroupCount=0; 503 } 504 505 public void decrementAssignedGroupCount() { 506 this.assignedGroupCount--; 507 } 508 509 public long getAssignedGroupCount() { 510 return assignedGroupCount; 511 } 512 513}