001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region;
018
019import java.io.IOException;
020import java.util.LinkedList;
021import java.util.concurrent.atomic.AtomicInteger;
022import java.util.concurrent.atomic.AtomicLong;
023
024import javax.jms.JMSException;
025
026import org.apache.activemq.ActiveMQMessageAudit;
027import org.apache.activemq.broker.Broker;
028import org.apache.activemq.broker.ConnectionContext;
029import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
030import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
031import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
032import org.apache.activemq.broker.region.policy.MessageEvictionStrategy;
033import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy;
034import org.apache.activemq.command.ConsumerControl;
035import org.apache.activemq.command.ConsumerInfo;
036import org.apache.activemq.command.Message;
037import org.apache.activemq.command.MessageAck;
038import org.apache.activemq.command.MessageDispatch;
039import org.apache.activemq.command.MessageDispatchNotification;
040import org.apache.activemq.command.MessagePull;
041import org.apache.activemq.command.Response;
042import org.apache.activemq.thread.Scheduler;
043import org.apache.activemq.transaction.Synchronization;
044import org.apache.activemq.transport.TransmitCallback;
045import org.apache.activemq.usage.SystemUsage;
046import org.slf4j.Logger;
047import org.slf4j.LoggerFactory;
048
049public class TopicSubscription extends AbstractSubscription {
050
051    private static final Logger LOG = LoggerFactory.getLogger(TopicSubscription.class);
052    private static final AtomicLong CURSOR_NAME_COUNTER = new AtomicLong(0);
053
054    protected PendingMessageCursor matched;
055    protected final SystemUsage usageManager;
056    protected AtomicLong dispatchedCounter = new AtomicLong();
057
058    boolean singleDestination = true;
059    Destination destination;
060    private final Scheduler scheduler;
061
062    private int maximumPendingMessages = -1;
063    private MessageEvictionStrategy messageEvictionStrategy = new OldestMessageEvictionStrategy();
064    private int discarded;
065    private final Object matchedListMutex = new Object();
066    private final AtomicLong enqueueCounter = new AtomicLong(0);
067    private final AtomicLong dequeueCounter = new AtomicLong(0);
068    private final AtomicInteger prefetchExtension = new AtomicInteger(0);
069    private int memoryUsageHighWaterMark = 95;
070    // allow duplicate suppression in a ring network of brokers
071    protected int maxProducersToAudit = 1024;
072    protected int maxAuditDepth = 1000;
073    protected boolean enableAudit = false;
074    protected ActiveMQMessageAudit audit;
075    protected boolean active = false;
076    protected boolean discarding = false;
077
078    public TopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, SystemUsage usageManager) throws Exception {
079        super(broker, context, info);
080        this.usageManager = usageManager;
081        String matchedName = "TopicSubscription:" + CURSOR_NAME_COUNTER.getAndIncrement() + "[" + info.getConsumerId().toString() + "]";
082        if (info.getDestination().isTemporary() || broker.getTempDataStore()==null ) {
083            this.matched = new VMPendingMessageCursor(false);
084        } else {
085            this.matched = new FilePendingMessageCursor(broker,matchedName,false);
086        }
087
088        this.scheduler = broker.getScheduler();
089    }
090
091    public void init() throws Exception {
092        this.matched.setSystemUsage(usageManager);
093        this.matched.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
094        this.matched.start();
095        if (enableAudit) {
096            audit= new ActiveMQMessageAudit(maxAuditDepth, maxProducersToAudit);
097        }
098        this.active=true;
099    }
100
101    @Override
102    public void add(MessageReference node) throws Exception {
103        if (isDuplicate(node)) {
104            return;
105        }
106        // Lets use an indirect reference so that we can associate a unique
107        // locator /w the message.
108        node = new IndirectMessageReference(node.getMessage());
109        enqueueCounter.incrementAndGet();
110        synchronized (matchedListMutex) {
111            // if this subscriber is already discarding a message, we don't want to add
112            // any more messages to it as those messages can only be advisories generated in the process,
113            // which can trigger the recursive call loop
114            if (discarding) return;
115
116            if (!isFull() && matched.isEmpty()) {
117                // if maximumPendingMessages is set we will only discard messages which
118                // have not been dispatched (i.e. we allow the prefetch buffer to be filled)
119                dispatch(node);
120                setSlowConsumer(false);
121            } else {
122                if (info.getPrefetchSize() > 1 && matched.size() > info.getPrefetchSize()) {
123                    // Slow consumers should log and set their state as such.
124                    if (!isSlowConsumer()) {
125                        LOG.warn("{}: has twice its prefetch limit pending, without an ack; it appears to be slow", toString());
126                        setSlowConsumer(true);
127                        for (Destination dest: destinations) {
128                            dest.slowConsumer(getContext(), this);
129                        }
130                    }
131                }
132                if (maximumPendingMessages != 0) {
133                    boolean warnedAboutWait = false;
134                    while (active) {
135                        while (matched.isFull()) {
136                            if (getContext().getStopping().get()) {
137                                LOG.warn("{}: stopped waiting for space in pendingMessage cursor for: {}", toString(), node.getMessageId());
138                                enqueueCounter.decrementAndGet();
139                                return;
140                            }
141                            if (!warnedAboutWait) {
142                                LOG.info("{}: Pending message cursor [{}] is full, temp usag ({}%) or memory usage ({}%) limit reached, blocking message add() pending the release of resources.",
143                                        new Object[]{
144                                                toString(),
145                                                matched,
146                                                matched.getSystemUsage().getTempUsage().getPercentUsage(),
147                                                matched.getSystemUsage().getMemoryUsage().getPercentUsage()
148                                        });
149                                warnedAboutWait = true;
150                            }
151                            matchedListMutex.wait(20);
152                        }
153                        // Temporary storage could be full - so just try to add the message
154                        // see https://issues.apache.org/activemq/browse/AMQ-2475
155                        if (matched.tryAddMessageLast(node, 10)) {
156                            break;
157                        }
158                    }
159                    if (maximumPendingMessages > 0) {
160                        // calculate the high water mark from which point we
161                        // will eagerly evict expired messages
162                        int max = messageEvictionStrategy.getEvictExpiredMessagesHighWatermark();
163                        if (maximumPendingMessages > 0 && maximumPendingMessages < max) {
164                            max = maximumPendingMessages;
165                        }
166                        if (!matched.isEmpty() && matched.size() > max) {
167                            removeExpiredMessages();
168                        }
169                        // lets discard old messages as we are a slow consumer
170                        while (!matched.isEmpty() && matched.size() > maximumPendingMessages) {
171                            int pageInSize = matched.size() - maximumPendingMessages;
172                            // only page in a 1000 at a time - else we could blow the memory
173                            pageInSize = Math.max(1000, pageInSize);
174                            LinkedList<MessageReference> list = null;
175                            MessageReference[] oldMessages=null;
176                            synchronized(matched){
177                                list = matched.pageInList(pageInSize);
178                                oldMessages = messageEvictionStrategy.evictMessages(list);
179                                for (MessageReference ref : list) {
180                                    ref.decrementReferenceCount();
181                                }
182                            }
183                            int messagesToEvict = 0;
184                            if (oldMessages != null){
185                                messagesToEvict = oldMessages.length;
186                                for (int i = 0; i < messagesToEvict; i++) {
187                                    MessageReference oldMessage = oldMessages[i];
188                                    discard(oldMessage);
189                                }
190                            }
191                            // lets avoid an infinite loop if we are given a bad eviction strategy
192                            // for a bad strategy lets just not evict
193                            if (messagesToEvict == 0) {
194                                LOG.warn("No messages to evict returned for {} from eviction strategy: {} out of {} candidates", new Object[]{
195                                        destination, messageEvictionStrategy, list.size()
196                                });
197                                break;
198                            }
199                        }
200                    }
201                    dispatchMatched();
202                }
203            }
204        }
205    }
206
207    private boolean isDuplicate(MessageReference node) {
208        boolean duplicate = false;
209        if (enableAudit && audit != null) {
210            duplicate = audit.isDuplicate(node);
211            if (LOG.isDebugEnabled()) {
212                if (duplicate) {
213                    LOG.debug("{}, ignoring duplicate add: {}", this, node.getMessageId());
214                }
215            }
216        }
217        return duplicate;
218    }
219
220    /**
221     * Discard any expired messages from the matched list. Called from a
222     * synchronized block.
223     *
224     * @throws IOException
225     */
226    protected void removeExpiredMessages() throws IOException {
227        try {
228            matched.reset();
229            while (matched.hasNext()) {
230                MessageReference node = matched.next();
231                node.decrementReferenceCount();
232                if (broker.isExpired(node)) {
233                    matched.remove();
234                    dispatchedCounter.incrementAndGet();
235                    node.decrementReferenceCount();
236                    ((Destination)node.getRegionDestination()).getDestinationStatistics().getExpired().increment();
237                    broker.messageExpired(getContext(), node, this);
238                    break;
239                }
240            }
241        } finally {
242            matched.release();
243        }
244    }
245
246    @Override
247    public void processMessageDispatchNotification(MessageDispatchNotification mdn) {
248        synchronized (matchedListMutex) {
249            try {
250                matched.reset();
251                while (matched.hasNext()) {
252                    MessageReference node = matched.next();
253                    node.decrementReferenceCount();
254                    if (node.getMessageId().equals(mdn.getMessageId())) {
255                        matched.remove();
256                        dispatchedCounter.incrementAndGet();
257                        node.decrementReferenceCount();
258                        break;
259                    }
260                }
261            } finally {
262                matched.release();
263            }
264        }
265    }
266
267    @Override
268    public synchronized void acknowledge(final ConnectionContext context, final MessageAck ack) throws Exception {
269        super.acknowledge(context, ack);
270
271        // Handle the standard acknowledgment case.
272        if (ack.isStandardAck() || ack.isPoisonAck() || ack.isIndividualAck()) {
273            if (context.isInTransaction()) {
274                context.getTransaction().addSynchronization(new Synchronization() {
275
276                    @Override
277                    public void afterCommit() throws Exception {
278                       synchronized (TopicSubscription.this) {
279                            if (singleDestination && destination != null) {
280                                destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
281                            }
282                        }
283                        dequeueCounter.addAndGet(ack.getMessageCount());
284                        dispatchMatched();
285                    }
286                });
287            } else {
288                if (singleDestination && destination != null) {
289                    destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
290                    destination.getDestinationStatistics().getInflight().subtract(ack.getMessageCount());
291                    if (info.isNetworkSubscription()) {
292                        destination.getDestinationStatistics().getForwards().add(ack.getMessageCount());
293                    }
294                }
295                dequeueCounter.addAndGet(ack.getMessageCount());
296            }
297            while (true) {
298                int currentExtension = prefetchExtension.get();
299                int newExtension = Math.max(0, currentExtension - ack.getMessageCount());
300                if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
301                    break;
302                }
303            }
304            dispatchMatched();
305            return;
306        } else if (ack.isDeliveredAck()) {
307            // Message was delivered but not acknowledged: update pre-fetch counters.
308            prefetchExtension.addAndGet(ack.getMessageCount());
309            dispatchMatched();
310            return;
311        } else if (ack.isExpiredAck()) {
312            if (singleDestination && destination != null) {
313                destination.getDestinationStatistics().getInflight().subtract(ack.getMessageCount());
314                destination.getDestinationStatistics().getExpired().add(ack.getMessageCount());
315                destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
316            }
317            dequeueCounter.addAndGet(ack.getMessageCount());
318            while (true) {
319                int currentExtension = prefetchExtension.get();
320                int newExtension = Math.max(0, currentExtension - ack.getMessageCount());
321                if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
322                    break;
323                }
324            }
325            dispatchMatched();
326            return;
327        } else if (ack.isRedeliveredAck()) {
328            // nothing to do atm
329            return;
330        }
331        throw new JMSException("Invalid acknowledgment: " + ack);
332    }
333
334    @Override
335    public Response pullMessage(ConnectionContext context, final MessagePull pull) throws Exception {
336
337        // The slave should not deliver pull messages.
338        if (getPrefetchSize() == 0) {
339
340            final long currentDispatchedCount = dispatchedCounter.get();
341            prefetchExtension.set(pull.getQuantity());
342            dispatchMatched();
343
344            // If there was nothing dispatched.. we may need to setup a timeout.
345            if (currentDispatchedCount == dispatchedCounter.get() || pull.isAlwaysSignalDone()) {
346
347                // immediate timeout used by receiveNoWait()
348                if (pull.getTimeout() == -1) {
349                    // Send a NULL message to signal nothing pending.
350                    dispatch(null);
351                    prefetchExtension.set(0);
352                }
353
354                if (pull.getTimeout() > 0) {
355                    scheduler.executeAfterDelay(new Runnable() {
356
357                        @Override
358                        public void run() {
359                            pullTimeout(currentDispatchedCount, pull.isAlwaysSignalDone());
360                        }
361                    }, pull.getTimeout());
362                }
363            }
364        }
365        return null;
366    }
367
368    /**
369     * Occurs when a pull times out. If nothing has been dispatched since the
370     * timeout was setup, then send the NULL message.
371     */
372    private final void pullTimeout(long currentDispatchedCount, boolean alwaysSendDone) {
373        synchronized (matchedListMutex) {
374            if (currentDispatchedCount == dispatchedCounter.get() || alwaysSendDone) {
375                try {
376                    dispatch(null);
377                } catch (Exception e) {
378                    context.getConnection().serviceException(e);
379                } finally {
380                    prefetchExtension.set(0);
381                }
382            }
383        }
384    }
385
386    @Override
387    public int getPendingQueueSize() {
388        return matched();
389    }
390
391    @Override
392    public int getDispatchedQueueSize() {
393        return (int)(dispatchedCounter.get() - prefetchExtension.get() - dequeueCounter.get());
394    }
395
396    public int getMaximumPendingMessages() {
397        return maximumPendingMessages;
398    }
399
400    @Override
401    public long getDispatchedCounter() {
402        return dispatchedCounter.get();
403    }
404
405    @Override
406    public long getEnqueueCounter() {
407        return enqueueCounter.get();
408    }
409
410    @Override
411    public long getDequeueCounter() {
412        return dequeueCounter.get();
413    }
414
415    /**
416     * @return the number of messages discarded due to being a slow consumer
417     */
418    public int discarded() {
419        synchronized (matchedListMutex) {
420            return discarded;
421        }
422    }
423
424    /**
425     * @return the number of matched messages (messages targeted for the
426     *         subscription but not yet able to be dispatched due to the
427     *         prefetch buffer being full).
428     */
429    public int matched() {
430        synchronized (matchedListMutex) {
431            return matched.size();
432        }
433    }
434
435    /**
436     * Sets the maximum number of pending messages that can be matched against
437     * this consumer before old messages are discarded.
438     */
439    public void setMaximumPendingMessages(int maximumPendingMessages) {
440        this.maximumPendingMessages = maximumPendingMessages;
441    }
442
443    public MessageEvictionStrategy getMessageEvictionStrategy() {
444        return messageEvictionStrategy;
445    }
446
447    /**
448     * Sets the eviction strategy used to decide which message to evict when the
449     * slow consumer needs to discard messages
450     */
451    public void setMessageEvictionStrategy(MessageEvictionStrategy messageEvictionStrategy) {
452        this.messageEvictionStrategy = messageEvictionStrategy;
453    }
454
455    public int getMaxProducersToAudit() {
456        return maxProducersToAudit;
457    }
458
459    public synchronized void setMaxProducersToAudit(int maxProducersToAudit) {
460        this.maxProducersToAudit = maxProducersToAudit;
461        if (audit != null) {
462            audit.setMaximumNumberOfProducersToTrack(maxProducersToAudit);
463        }
464    }
465
466    public int getMaxAuditDepth() {
467        return maxAuditDepth;
468    }
469
470    public synchronized void setMaxAuditDepth(int maxAuditDepth) {
471        this.maxAuditDepth = maxAuditDepth;
472        if (audit != null) {
473            audit.setAuditDepth(maxAuditDepth);
474        }
475    }
476
477    public boolean isEnableAudit() {
478        return enableAudit;
479    }
480
481    public synchronized void setEnableAudit(boolean enableAudit) {
482        this.enableAudit = enableAudit;
483        if (enableAudit && audit == null) {
484            audit = new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
485        }
486    }
487
488    // Implementation methods
489    // -------------------------------------------------------------------------
490    @Override
491    public boolean isFull() {
492        return getDispatchedQueueSize() >= info.getPrefetchSize();
493    }
494
495    @Override
496    public int getInFlightSize() {
497        return getDispatchedQueueSize();
498    }
499
500    /**
501     * @return true when 60% or more room is left for dispatching messages
502     */
503    @Override
504    public boolean isLowWaterMark() {
505        return getDispatchedQueueSize() <= (info.getPrefetchSize() * .4);
506    }
507
508    /**
509     * @return true when 10% or less room is left for dispatching messages
510     */
511    @Override
512    public boolean isHighWaterMark() {
513        return getDispatchedQueueSize() >= (info.getPrefetchSize() * .9);
514    }
515
516    /**
517     * @param memoryUsageHighWaterMark the memoryUsageHighWaterMark to set
518     */
519    public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) {
520        this.memoryUsageHighWaterMark = memoryUsageHighWaterMark;
521    }
522
523    /**
524     * @return the memoryUsageHighWaterMark
525     */
526    public int getMemoryUsageHighWaterMark() {
527        return this.memoryUsageHighWaterMark;
528    }
529
530    /**
531     * @return the usageManager
532     */
533    public SystemUsage getUsageManager() {
534        return this.usageManager;
535    }
536
537    /**
538     * @return the matched
539     */
540    public PendingMessageCursor getMatched() {
541        return this.matched;
542    }
543
544    /**
545     * @param matched the matched to set
546     */
547    public void setMatched(PendingMessageCursor matched) {
548        this.matched = matched;
549    }
550
551    /**
552     * inform the MessageConsumer on the client to change it's prefetch
553     *
554     * @param newPrefetch
555     */
556    @Override
557    public void updateConsumerPrefetch(int newPrefetch) {
558        if (context != null && context.getConnection() != null && context.getConnection().isManageable()) {
559            ConsumerControl cc = new ConsumerControl();
560            cc.setConsumerId(info.getConsumerId());
561            cc.setPrefetch(newPrefetch);
562            context.getConnection().dispatchAsync(cc);
563        }
564    }
565
566    private void dispatchMatched() throws IOException {
567        synchronized (matchedListMutex) {
568            if (!matched.isEmpty() && !isFull()) {
569                try {
570                    matched.reset();
571
572                    while (matched.hasNext() && !isFull()) {
573                        MessageReference message = matched.next();
574                        message.decrementReferenceCount();
575                        matched.remove();
576                        // Message may have been sitting in the matched list a while
577                        // waiting for the consumer to ak the message.
578                        if (message.isExpired()) {
579                            discard(message);
580                            continue; // just drop it.
581                        }
582                        dispatch(message);
583                    }
584                } finally {
585                    matched.release();
586                }
587            }
588        }
589    }
590
591    private void dispatch(final MessageReference node) throws IOException {
592        Message message = node != null ? node.getMessage() : null;
593        if (node != null) {
594            node.incrementReferenceCount();
595        }
596        // Make sure we can dispatch a message.
597        MessageDispatch md = new MessageDispatch();
598        md.setMessage(message);
599        md.setConsumerId(info.getConsumerId());
600        if (node != null) {
601            md.setDestination(((Destination)node.getRegionDestination()).getActiveMQDestination());
602            dispatchedCounter.incrementAndGet();
603            // Keep track if this subscription is receiving messages from a single destination.
604            if (singleDestination) {
605                if (destination == null) {
606                    destination = (Destination)node.getRegionDestination();
607                } else {
608                    if (destination != node.getRegionDestination()) {
609                        singleDestination = false;
610                    }
611                }
612            }
613        }
614        if (info.isDispatchAsync()) {
615            if (node != null) {
616                md.setTransmitCallback(new TransmitCallback() {
617
618                    @Override
619                    public void onSuccess() {
620                        Destination regionDestination = (Destination) node.getRegionDestination();
621                        regionDestination.getDestinationStatistics().getDispatched().increment();
622                        regionDestination.getDestinationStatistics().getInflight().increment();
623                        node.decrementReferenceCount();
624                    }
625
626                    @Override
627                    public void onFailure() {
628                        Destination regionDestination = (Destination) node.getRegionDestination();
629                        regionDestination.getDestinationStatistics().getDispatched().increment();
630                        regionDestination.getDestinationStatistics().getInflight().increment();
631                        node.decrementReferenceCount();
632                    }
633                });
634            }
635            context.getConnection().dispatchAsync(md);
636        } else {
637            context.getConnection().dispatchSync(md);
638            if (node != null) {
639                Destination regionDestination = (Destination) node.getRegionDestination();
640                regionDestination.getDestinationStatistics().getDispatched().increment();
641                regionDestination.getDestinationStatistics().getInflight().increment();
642                node.decrementReferenceCount();
643            }
644        }
645    }
646
647    private void discard(MessageReference message) {
648        discarding = true;
649        try {
650            message.decrementReferenceCount();
651            matched.remove(message);
652            discarded++;
653            if (destination != null) {
654                destination.getDestinationStatistics().getDequeues().increment();
655            }
656            LOG.debug("{}, discarding message {}", this, message);
657            Destination dest = (Destination) message.getRegionDestination();
658            if (dest != null) {
659                dest.messageDiscarded(getContext(), this, message);
660            }
661            broker.getRoot().sendToDeadLetterQueue(getContext(), message, this, new Throwable("TopicSubDiscard. ID:" + info.getConsumerId()));
662        } finally {
663            discarding = false;
664        }
665    }
666
667    @Override
668    public String toString() {
669        return "TopicSubscription:" + " consumer=" + info.getConsumerId() + ", destinations=" + destinations.size() + ", dispatched=" + getDispatchedQueueSize() + ", delivered="
670               + getDequeueCounter() + ", matched=" + matched() + ", discarded=" + discarded();
671    }
672
673    @Override
674    public void destroy() {
675        this.active=false;
676        synchronized (matchedListMutex) {
677            try {
678                matched.destroy();
679            } catch (Exception e) {
680                LOG.warn("Failed to destroy cursor", e);
681            }
682        }
683        setSlowConsumer(false);
684    }
685
686    @Override
687    public int getPrefetchSize() {
688        return info.getPrefetchSize();
689    }
690
691    @Override
692    public void setPrefetchSize(int newSize) {
693        info.setPrefetchSize(newSize);
694        try {
695            dispatchMatched();
696        } catch(Exception e) {
697            LOG.trace("Caught exception on dispatch after prefetch size change.");
698        }
699    }
700}