001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.command;
018
019import java.util.ArrayList;
020import java.util.List;
021
022import org.apache.activemq.filter.BooleanExpression;
023import org.apache.activemq.state.CommandVisitor;
024
025/**
026 * @openwire:marshaller code="5"
027 *
028 */
029public class ConsumerInfo extends BaseCommand {
030
031    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.CONSUMER_INFO;
032
033    public static final byte HIGH_PRIORITY = 10;
034    public static final byte NORMAL_PRIORITY = 0;
035    public static final byte NETWORK_CONSUMER_PRIORITY = -5;
036    public static final byte LOW_PRIORITY = -10;
037
038    protected ConsumerId consumerId;
039    protected ActiveMQDestination destination;
040    protected int prefetchSize;
041    protected int maximumPendingMessageLimit;
042    protected boolean browser;
043    protected boolean dispatchAsync;
044    protected String selector;
045    protected String clientId;
046    protected String subscriptionName;
047    protected boolean noLocal;
048    protected boolean exclusive;
049    protected boolean retroactive;
050    protected byte priority;
051    protected BrokerId[] brokerPath;
052    protected boolean optimizedAcknowledge;
053    // used by the broker
054    protected transient int currentPrefetchSize;
055    // if true, the consumer will not send range
056    protected boolean noRangeAcks;
057    // acks.
058
059    protected BooleanExpression additionalPredicate;
060    protected transient boolean networkSubscription; // this subscription
061    protected transient List<ConsumerId> networkConsumerIds; // the original consumerId
062
063    // not marshalled, populated from RemoveInfo, the last message delivered, used
064    // to suppress redelivery on prefetched messages after close
065    private transient long lastDeliveredSequenceId = RemoveInfo.LAST_DELIVERED_UNSET;
066    private transient long assignedGroupCount;
067    // originated from a
068    // network connection
069
070    public ConsumerInfo() {
071    }
072
073    public ConsumerInfo(ConsumerId consumerId) {
074        this.consumerId = consumerId;
075    }
076
077    public ConsumerInfo(SessionInfo sessionInfo, long consumerId) {
078        this.consumerId = new ConsumerId(sessionInfo.getSessionId(), consumerId);
079    }
080
081    public ConsumerInfo copy() {
082        ConsumerInfo info = new ConsumerInfo();
083        copy(info);
084        return info;
085    }
086
087    public void copy(ConsumerInfo info) {
088        super.copy(info);
089        info.consumerId = consumerId;
090        info.destination = destination;
091        info.prefetchSize = prefetchSize;
092        info.maximumPendingMessageLimit = maximumPendingMessageLimit;
093        info.browser = browser;
094        info.dispatchAsync = dispatchAsync;
095        info.selector = selector;
096        info.clientId = clientId;
097        info.subscriptionName = subscriptionName;
098        info.noLocal = noLocal;
099        info.exclusive = exclusive;
100        info.retroactive = retroactive;
101        info.priority = priority;
102        info.brokerPath = brokerPath;
103        info.networkSubscription = networkSubscription;
104        if (networkConsumerIds != null) {
105            if (info.networkConsumerIds==null){
106                info.networkConsumerIds=new ArrayList<ConsumerId>();
107            }
108            info.networkConsumerIds.addAll(networkConsumerIds);
109        }
110    }
111
112    public boolean isDurable() {
113        return subscriptionName != null;
114    }
115
116    @Override
117    public byte getDataStructureType() {
118        return DATA_STRUCTURE_TYPE;
119    }
120
121    /**
122     * Is used to uniquely identify the consumer to the broker.
123     *
124     * @openwire:property version=1 cache=true
125     */
126    public ConsumerId getConsumerId() {
127        return consumerId;
128    }
129
130    public void setConsumerId(ConsumerId consumerId) {
131        this.consumerId = consumerId;
132    }
133
134    /**
135     * Is this consumer a queue browser?
136     *
137     * @openwire:property version=1
138     */
139    public boolean isBrowser() {
140        return browser;
141    }
142
143    public void setBrowser(boolean browser) {
144        this.browser = browser;
145    }
146
147    /**
148     * The destination that the consumer is interested in receiving messages
149     * from. This destination could be a composite destination.
150     *
151     * @openwire:property version=1 cache=true
152     */
153    public ActiveMQDestination getDestination() {
154        return destination;
155    }
156
157    public void setDestination(ActiveMQDestination destination) {
158        this.destination = destination;
159    }
160
161    /**
162     * How many messages a broker will send to the client without receiving an
163     * ack before he stops dispatching messages to the client.
164     *
165     * @openwire:property version=1
166     */
167    public int getPrefetchSize() {
168        return prefetchSize;
169    }
170
171    public void setPrefetchSize(int prefetchSize) {
172        this.prefetchSize = prefetchSize;
173        this.currentPrefetchSize = prefetchSize;
174    }
175
176    /**
177     * How many messages a broker will keep around, above the prefetch limit,
178     * for non-durable topics before starting to discard older messages.
179     *
180     * @openwire:property version=1
181     */
182    public int getMaximumPendingMessageLimit() {
183        return maximumPendingMessageLimit;
184    }
185
186    public void setMaximumPendingMessageLimit(int maximumPendingMessageLimit) {
187        this.maximumPendingMessageLimit = maximumPendingMessageLimit;
188    }
189
190    /**
191     * Should the broker dispatch a message to the consumer async? If he does it
192     * async, then he uses a more SEDA style of processing while if it is not
193     * done async, then he broker use a STP style of processing. STP is more
194     * appropriate in high bandwidth situations or when being used by and in vm
195     * transport.
196     *
197     * @openwire:property version=1
198     */
199    public boolean isDispatchAsync() {
200        return dispatchAsync;
201    }
202
203    public void setDispatchAsync(boolean dispatchAsync) {
204        this.dispatchAsync = dispatchAsync;
205    }
206
207    /**
208     * The JMS selector used to filter out messages that this consumer is
209     * interested in.
210     *
211     * @openwire:property version=1
212     */
213    public String getSelector() {
214        return selector;
215    }
216
217    public void setSelector(String selector) {
218        this.selector = selector;
219    }
220
221    /**
222     * Used to identify the id of a client connection.
223     *
224     * @openwire:property version=10
225     */
226    public String getClientId() {
227        return clientId;
228    }
229
230    public void setClientId(String clientId) {
231        this.clientId = clientId;
232    }
233
234    /**
235     * Used to identify the name of a durable subscription.
236     *
237     * @openwire:property version=1
238     */
239    public String getSubscriptionName() {
240        return subscriptionName;
241    }
242
243    public void setSubscriptionName(String durableSubscriptionId) {
244        this.subscriptionName = durableSubscriptionId;
245    }
246
247    /**
248     * Set noLocal to true to avoid receiving messages that were published
249     * locally on the same connection.
250     *
251     * @openwire:property version=1
252     */
253    public boolean isNoLocal() {
254        return noLocal;
255    }
256
257    public void setNoLocal(boolean noLocal) {
258        this.noLocal = noLocal;
259    }
260
261    /**
262     * An exclusive consumer locks out other consumers from being able to
263     * receive messages from the destination. If there are multiple exclusive
264     * consumers for a destination, the first one created will be the exclusive
265     * consumer of the destination.
266     *
267     * @openwire:property version=1
268     */
269    public boolean isExclusive() {
270        return exclusive;
271    }
272
273    public void setExclusive(boolean exclusive) {
274        this.exclusive = exclusive;
275    }
276
277    /**
278     * A retroactive consumer only has meaning for Topics. It allows a consumer
279     * to retroactively see messages sent prior to the consumer being created.
280     * If the consumer is not durable, it will be delivered the last message
281     * published to the topic. If the consumer is durable then it will receive
282     * all persistent messages that are still stored in persistent storage for
283     * that topic.
284     *
285     * @openwire:property version=1
286     */
287    public boolean isRetroactive() {
288        return retroactive;
289    }
290
291    public void setRetroactive(boolean retroactive) {
292        this.retroactive = retroactive;
293    }
294
295    public RemoveInfo createRemoveCommand() {
296        RemoveInfo command = new RemoveInfo(getConsumerId());
297        command.setResponseRequired(isResponseRequired());
298        return command;
299    }
300
301    /**
302     * The broker will avoid dispatching to a lower priority consumer if there
303     * are other higher priority consumers available to dispatch to. This allows
304     * letting the broker to have an affinity to higher priority consumers.
305     * Default priority is 0.
306     *
307     * @openwire:property version=1
308     */
309    public byte getPriority() {
310        return priority;
311    }
312
313    public void setPriority(byte priority) {
314        this.priority = priority;
315    }
316
317    /**
318     * The route of brokers the command has moved through.
319     *
320     * @openwire:property version=1 cache=true
321     */
322    public BrokerId[] getBrokerPath() {
323        return brokerPath;
324    }
325
326    public void setBrokerPath(BrokerId[] brokerPath) {
327        this.brokerPath = brokerPath;
328    }
329
330    /**
331     * A transient additional predicate that can be used it inject additional
332     * predicates into the selector on the fly. Handy if if say a Security
333     * Broker interceptor wants to filter out messages based on security level
334     * of the consumer.
335     *
336     * @openwire:property version=1
337     */
338    public BooleanExpression getAdditionalPredicate() {
339        return additionalPredicate;
340    }
341
342    public void setAdditionalPredicate(BooleanExpression additionalPredicate) {
343        this.additionalPredicate = additionalPredicate;
344    }
345
346    @Override
347    public Response visit(CommandVisitor visitor) throws Exception {
348        return visitor.processAddConsumer(this);
349    }
350
351    /**
352     * @openwire:property version=1
353     * @return Returns the networkSubscription.
354     */
355    public boolean isNetworkSubscription() {
356        return networkSubscription;
357    }
358
359    /**
360     * @param networkSubscription The networkSubscription to set.
361     */
362    public void setNetworkSubscription(boolean networkSubscription) {
363        this.networkSubscription = networkSubscription;
364    }
365
366    /**
367     * @openwire:property version=1
368     * @return Returns the optimizedAcknowledge.
369     */
370    public boolean isOptimizedAcknowledge() {
371        return optimizedAcknowledge;
372    }
373
374    /**
375     * @param optimizedAcknowledge The optimizedAcknowledge to set.
376     */
377    public void setOptimizedAcknowledge(boolean optimizedAcknowledge) {
378        this.optimizedAcknowledge = optimizedAcknowledge;
379    }
380
381    /**
382     * @return Returns the currentPrefetchSize.
383     */
384    public int getCurrentPrefetchSize() {
385        return currentPrefetchSize;
386    }
387
388    /**
389     * @param currentPrefetchSize The currentPrefetchSize to set.
390     */
391    public void setCurrentPrefetchSize(int currentPrefetchSize) {
392        this.currentPrefetchSize = currentPrefetchSize;
393    }
394
395    /**
396     * The broker may be able to optimize it's processing or provides better QOS
397     * if it knows the consumer will not be sending ranged acks.
398     *
399     * @return true if the consumer will not send range acks.
400     * @openwire:property version=1
401     */
402    public boolean isNoRangeAcks() {
403        return noRangeAcks;
404    }
405
406    public void setNoRangeAcks(boolean noRangeAcks) {
407        this.noRangeAcks = noRangeAcks;
408    }
409
410    public synchronized void addNetworkConsumerId(ConsumerId networkConsumerId) {
411        if (networkConsumerIds == null) {
412            networkConsumerIds = new ArrayList<ConsumerId>();
413        }
414        networkConsumerIds.add(networkConsumerId);
415    }
416
417    public synchronized void removeNetworkConsumerId(ConsumerId networkConsumerId) {
418        if (networkConsumerIds != null) {
419            networkConsumerIds.remove(networkConsumerId);
420            if (networkConsumerIds.isEmpty()) {
421                networkConsumerIds=null;
422            }
423        }
424    }
425
426    public synchronized boolean isNetworkConsumersEmpty() {
427        return networkConsumerIds == null || networkConsumerIds.isEmpty();
428    }
429
430    public synchronized List<ConsumerId> getNetworkConsumerIds(){
431        List<ConsumerId> result = new ArrayList<ConsumerId>();
432        if (networkConsumerIds != null) {
433            result.addAll(networkConsumerIds);
434        }
435        return result;
436    }
437
438    @Override
439    public int hashCode() {
440        return (consumerId == null) ? 0 : consumerId.hashCode();
441    }
442
443    @Override
444    public boolean equals(Object obj) {
445        if (this == obj) {
446            return true;
447        }
448        if (obj == null) {
449            return false;
450        }
451        if (getClass() != obj.getClass()) {
452            return false;
453        }
454
455        ConsumerInfo other = (ConsumerInfo) obj;
456
457        if (consumerId == null && other.consumerId != null) {
458            return false;
459        } else if (!consumerId.equals(other.consumerId)) {
460            return false;
461        }
462        return true;
463    }
464
465    /**
466     * Tracks the original subscription id that causes a subscription to
467     * percolate through a network when networkTTL > 1. Tracking the original
468     * subscription allows duplicate suppression.
469     *
470     * @return array of the current subscription path
471     * @openwire:property version=4
472     */
473    public ConsumerId[] getNetworkConsumerPath() {
474        ConsumerId[] result = null;
475        if (networkConsumerIds != null) {
476            result = networkConsumerIds.toArray(new ConsumerId[0]);
477        }
478        return result;
479    }
480
481    public void setNetworkConsumerPath(ConsumerId[] consumerPath) {
482        if (consumerPath != null) {
483            for (int i=0; i<consumerPath.length; i++) {
484                addNetworkConsumerId(consumerPath[i]);
485            }
486        }
487    }
488
489    public void setLastDeliveredSequenceId(long lastDeliveredSequenceId) {
490        this.lastDeliveredSequenceId  = lastDeliveredSequenceId;
491    }
492
493    public long getLastDeliveredSequenceId() {
494        return lastDeliveredSequenceId;
495    }
496
497    public void incrementAssignedGroupCount() {
498        this.assignedGroupCount++;
499    }
500
501    public void clearAssignedGroupCount() {
502        this.assignedGroupCount=0;
503    }
504
505    public void decrementAssignedGroupCount() {
506        this.assignedGroupCount--;
507    }
508
509    public long getAssignedGroupCount() {
510        return assignedGroupCount;
511    }
512
513}