/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
017package org.apache.activemq.broker.region;
018
019import java.io.IOException;
020import java.util.List;
021
022import javax.jms.InvalidSelectorException;
023import javax.management.ObjectName;
024import org.apache.activemq.broker.ConnectionContext;
025import org.apache.activemq.command.ActiveMQDestination;
026import org.apache.activemq.command.ConsumerInfo;
027import org.apache.activemq.command.MessageAck;
028import org.apache.activemq.command.MessageDispatchNotification;
029import org.apache.activemq.command.MessagePull;
030import org.apache.activemq.command.Response;
031import org.apache.activemq.filter.MessageEvaluationContext;
032
033/**
034 *
035 */
036public interface Subscription extends SubscriptionRecovery {
037
038    /**
039     * Used to add messages that match the subscription.
040     * @param node
041     * @throws Exception
042     * @throws InterruptedException
043     * @throws IOException
044     */
045    void add(MessageReference node) throws Exception;
046
047    /**
048     * Used when client acknowledge receipt of dispatched message.
049     * @throws IOException
050     * @throws Exception
051     */
052    void acknowledge(ConnectionContext context, final MessageAck ack) throws Exception;
053
054    /**
055     * Allows a consumer to pull a message on demand
056     */
057    Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception;
058
059    /**
060     * Returns true if this subscription is a Wildcard subscription.
061     * @return true if wildcard subscription.
062     */
063    boolean isWildcard();
064
065    /**
066     * Is the subscription interested in the message?
067     * @param node
068     * @param context
069     * @return
070     * @throws IOException
071     */
072    boolean matches(MessageReference node, MessageEvaluationContext context) throws IOException;
073
074    /**
075     * Is the subscription interested in messages in the destination?
076     * @param destination
077     * @return
078     */
079    boolean matches(ActiveMQDestination destination);
080
081    /**
082     * The subscription will be receiving messages from the destination.
083     * @param context
084     * @param destination
085     * @throws Exception
086     */
087    void add(ConnectionContext context, Destination destination) throws Exception;
088
089    /**
090     * The subscription will be no longer be receiving messages from the destination.
091     * @param context
092     * @param destination
093     * @return a list of un-acked messages that were added to the subscription.
094     */
095    List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception;
096
097    /**
098     * The ConsumerInfo object that created the subscription.
099     */
100    ConsumerInfo getConsumerInfo();
101
102    /**
103     * The subscription should release as may references as it can to help the garbage collector
104     * reclaim memory.
105     */
106    void gc();
107
108    /**
109     * Used by a Slave Broker to update dispatch infomation
110     * @param mdn
111     * @throws Exception
112     */
113    void processMessageDispatchNotification(MessageDispatchNotification  mdn) throws Exception;
114
115    /**
116     * @return number of messages pending delivery
117     */
118    int getPendingQueueSize();
119
120    /**
121     * @return number of messages dispatched to the client
122     */
123    int getDispatchedQueueSize();
124
125    /**
126     * @return number of messages dispatched to the client
127     */
128    long getDispatchedCounter();
129
130    /**
131     * @return number of messages that matched the subscription
132     */
133    long getEnqueueCounter();
134
135    /**
136     * @return number of messages queued by the client
137     */
138    long getDequeueCounter();
139
140    /**
141     * @return the JMS selector on the current subscription
142     */
143    String getSelector();
144
145    /**
146     * Attempts to change the current active selector on the subscription.
147     * This operation is not supported for persistent topics.
148     */
149    void setSelector(String selector) throws InvalidSelectorException, UnsupportedOperationException;
150
151    /**
152     * @return the JMX object name that this subscription was registered as if applicable
153     */
154    ObjectName getObjectName();
155
156    /**
157     * Set when the subscription is registered in JMX
158     */
159    void setObjectName(ObjectName objectName);
160
161    /**
162     * @return true when 60% or more room is left for dispatching messages
163     */
164    boolean isLowWaterMark();
165
166    /**
167     * @return true when 10% or less room is left for dispatching messages
168     */
169    boolean isHighWaterMark();
170
171    /**
172     * @return true if there is no space to dispatch messages
173     */
174    boolean isFull();
175
176    /**
177     * inform the MessageConsumer on the client to change it's prefetch
178     * @param newPrefetch
179     */
180    void updateConsumerPrefetch(int newPrefetch);
181
182    /**
183     * Called when the subscription is destroyed.
184     */
185    void destroy();
186
187    /**
188     * @return the prefetch size that is configured for the subscription
189     */
190    int getPrefetchSize();
191
192    /**
193     * @return the number of messages awaiting acknowledgement
194     */
195    int getInFlightSize();
196
197    /**
198     * @return the in flight messages as a percentage of the prefetch size
199     */
200    int getInFlightUsage();
201
202    /**
203     * Informs the Broker if the subscription needs to intervention to recover it's state
204     * e.g. DurableTopicSubscriber may do
205     * @see org.apache.activemq.broker.region.cursors.PendingMessageCursor
206     * @return true if recovery required
207     */
208    boolean isRecoveryRequired();
209
210    /**
211     * @return true if a browser
212     */
213    boolean isBrowser();
214
215    /**
216     * @return the number of messages this subscription can accept before its full
217     */
218    int countBeforeFull();
219
220    ConnectionContext getContext();
221
222    public int getCursorMemoryHighWaterMark();
223
224    public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark);
225
226    boolean isSlowConsumer();
227
228    void unmatched(MessageReference node) throws IOException;
229
230    /**
231     * Returns the time since the last Ack message was received by this subscription.
232     *
233     * If there has never been an ack this value should be set to the creation time of the
234     * subscription.
235     *
236     * @return time of last received Ack message or Subscription create time if no Acks.
237     */
238    long getTimeOfLastMessageAck();
239
240    long  getConsumedCount();
241
242    void incrementConsumedCount();
243
244    void resetConsumedCount();
245
246}