001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region;
018
019import java.io.IOException;
020import java.util.Collections;
021import java.util.List;
022import java.util.concurrent.CopyOnWriteArrayList;
023import java.util.concurrent.atomic.AtomicLong;
024import javax.jms.InvalidSelectorException;
025import javax.jms.JMSException;
026import javax.management.ObjectName;
027
028import org.apache.activemq.broker.Broker;
029import org.apache.activemq.broker.ConnectionContext;
030import org.apache.activemq.command.ActiveMQDestination;
031import org.apache.activemq.command.ConsumerId;
032import org.apache.activemq.command.ConsumerInfo;
033import org.apache.activemq.command.MessageAck;
034import org.apache.activemq.filter.BooleanExpression;
035import org.apache.activemq.filter.DestinationFilter;
036import org.apache.activemq.filter.LogicExpression;
037import org.apache.activemq.filter.MessageEvaluationContext;
038import org.apache.activemq.filter.NoLocalExpression;
039import org.apache.activemq.selector.SelectorParser;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043public abstract class AbstractSubscription implements Subscription {
044
045    private static final Logger LOG = LoggerFactory.getLogger(AbstractSubscription.class);
046    protected Broker broker;
047    protected ConnectionContext context;
048    protected ConsumerInfo info;
049    protected final DestinationFilter destinationFilter;
050    protected final CopyOnWriteArrayList<Destination> destinations = new CopyOnWriteArrayList<Destination>();
051    private BooleanExpression selectorExpression;
052    private ObjectName objectName;
053    private int cursorMemoryHighWaterMark = 70;
054    private boolean slowConsumer;
055    private long lastAckTime;
056    private AtomicLong consumedCount = new AtomicLong();
057
058    public AbstractSubscription(Broker broker,ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
059        this.broker = broker;
060        this.context = context;
061        this.info = info;
062        this.destinationFilter = DestinationFilter.parseFilter(info.getDestination());
063        this.selectorExpression = parseSelector(info);
064        this.lastAckTime = System.currentTimeMillis();
065    }
066
067    private static BooleanExpression parseSelector(ConsumerInfo info) throws InvalidSelectorException {
068        BooleanExpression rc = null;
069        if (info.getSelector() != null) {
070            rc = SelectorParser.parse(info.getSelector());
071        }
072        if (info.isNoLocal()) {
073            if (rc == null) {
074                rc = new NoLocalExpression(info.getConsumerId().getConnectionId());
075            } else {
076                rc = LogicExpression.createAND(new NoLocalExpression(info.getConsumerId().getConnectionId()), rc);
077            }
078        }
079        if (info.getAdditionalPredicate() != null) {
080            if (rc == null) {
081                rc = info.getAdditionalPredicate();
082            } else {
083                rc = LogicExpression.createAND(info.getAdditionalPredicate(), rc);
084            }
085        }
086        return rc;
087    }
088
089    @Override
090    public synchronized void acknowledge(final ConnectionContext context, final MessageAck ack) throws Exception {
091        this.lastAckTime = System.currentTimeMillis();
092        this.consumedCount.incrementAndGet();
093    }
094
095    @Override
096    public boolean matches(MessageReference node, MessageEvaluationContext context) throws IOException {
097        ConsumerId targetConsumerId = node.getTargetConsumerId();
098        if (targetConsumerId != null) {
099            if (!targetConsumerId.equals(info.getConsumerId())) {
100                return false;
101            }
102        }
103        try {
104            return (selectorExpression == null || selectorExpression.matches(context)) && this.context.isAllowedToConsume(node);
105        } catch (JMSException e) {
106            LOG.info("Selector failed to evaluate: {}", e.getMessage(), e);
107            return false;
108        }
109    }
110
111    @Override
112    public boolean isWildcard() {
113        return destinationFilter.isWildcard();
114    }
115
116    @Override
117    public boolean matches(ActiveMQDestination destination) {
118        return destinationFilter.matches(destination);
119    }
120
121    @Override
122    public void add(ConnectionContext context, Destination destination) throws Exception {
123        destinations.add(destination);
124    }
125
126    @Override
127    public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
128        destinations.remove(destination);
129        return Collections.EMPTY_LIST;
130    }
131
132    @Override
133    public ConsumerInfo getConsumerInfo() {
134        return info;
135    }
136
137    @Override
138    public void gc() {
139    }
140
141    @Override
142    public ConnectionContext getContext() {
143        return context;
144    }
145
146    public ConsumerInfo getInfo() {
147        return info;
148    }
149
150    public BooleanExpression getSelectorExpression() {
151        return selectorExpression;
152    }
153
154    @Override
155    public String getSelector() {
156        return info.getSelector();
157    }
158
159    @Override
160    public void setSelector(String selector) throws InvalidSelectorException {
161        ConsumerInfo copy = info.copy();
162        copy.setSelector(selector);
163        BooleanExpression newSelector = parseSelector(copy);
164        // its valid so lets actually update it now
165        info.setSelector(selector);
166        this.selectorExpression = newSelector;
167    }
168
169    @Override
170    public ObjectName getObjectName() {
171        return objectName;
172    }
173
174    @Override
175    public void setObjectName(ObjectName objectName) {
176        this.objectName = objectName;
177    }
178
179    @Override
180    public int getPrefetchSize() {
181        return info.getPrefetchSize();
182    }
183    public void setPrefetchSize(int newSize) {
184        info.setPrefetchSize(newSize);
185    }
186
187    @Override
188    public boolean isRecoveryRequired() {
189        return true;
190    }
191
192    @Override
193    public boolean isSlowConsumer() {
194        return slowConsumer;
195    }
196
197    public void setSlowConsumer(boolean val) {
198        slowConsumer = val;
199    }
200
201    @Override
202    public boolean addRecoveredMessage(ConnectionContext context, MessageReference message) throws Exception {
203        boolean result = false;
204        MessageEvaluationContext msgContext = context.getMessageEvaluationContext();
205        try {
206            Destination regionDestination = (Destination) message.getRegionDestination();
207            msgContext.setDestination(regionDestination.getActiveMQDestination());
208            msgContext.setMessageReference(message);
209            result = matches(message, msgContext);
210            if (result) {
211                doAddRecoveredMessage(message);
212            }
213
214        } finally {
215            msgContext.clear();
216        }
217        return result;
218    }
219
220    @Override
221    public ActiveMQDestination getActiveMQDestination() {
222        return info != null ? info.getDestination() : null;
223    }
224
225    @Override
226    public boolean isBrowser() {
227        return info != null && info.isBrowser();
228    }
229
230    @Override
231    public int getInFlightUsage() {
232        if (info.getPrefetchSize() > 0) {
233        return (getInFlightSize() * 100)/info.getPrefetchSize();
234        }
235        return Integer.MAX_VALUE;
236    }
237
238    /**
239     * Add a destination
240     * @param destination
241     */
242    public void addDestination(Destination destination) {
243
244    }
245
246    /**
247     * Remove a destination
248     * @param destination
249     */
250    public void removeDestination(Destination destination) {
251
252    }
253
254    @Override
255    public int getCursorMemoryHighWaterMark(){
256        return this.cursorMemoryHighWaterMark;
257    }
258
259    @Override
260    public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark){
261        this.cursorMemoryHighWaterMark=cursorMemoryHighWaterMark;
262    }
263
264    @Override
265    public int countBeforeFull() {
266        return getDispatchedQueueSize() - info.getPrefetchSize();
267    }
268
269    @Override
270    public void unmatched(MessageReference node) throws IOException {
271        // only durable topic subs have something to do here
272    }
273
274    protected void doAddRecoveredMessage(MessageReference message) throws Exception {
275        add(message);
276    }
277
278    @Override
279    public long getTimeOfLastMessageAck() {
280        return lastAckTime;
281    }
282
283    public void setTimeOfLastMessageAck(long value) {
284        this.lastAckTime = value;
285    }
286
287    public long getConsumedCount(){
288        return consumedCount.get();
289    }
290
291    public void incrementConsumedCount(){
292        consumedCount.incrementAndGet();
293    }
294
295    public void resetConsumedCount(){
296        consumedCount.set(0);
297    }
298}