001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region;
018
019import java.io.IOException;
020import java.util.ArrayList;
021import java.util.LinkedList;
022import java.util.List;
023import java.util.Map;
024import java.util.concurrent.CancellationException;
025import java.util.concurrent.ConcurrentHashMap;
026import java.util.concurrent.ConcurrentMap;
027import java.util.concurrent.CopyOnWriteArrayList;
028import java.util.concurrent.Future;
029import java.util.concurrent.locks.ReentrantReadWriteLock;
030
031import org.apache.activemq.advisory.AdvisorySupport;
032import org.apache.activemq.broker.BrokerService;
033import org.apache.activemq.broker.ConnectionContext;
034import org.apache.activemq.broker.ProducerBrokerExchange;
035import org.apache.activemq.broker.region.policy.DispatchPolicy;
036import org.apache.activemq.broker.region.policy.LastImageSubscriptionRecoveryPolicy;
037import org.apache.activemq.broker.region.policy.RetainedMessageSubscriptionRecoveryPolicy;
038import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy;
039import org.apache.activemq.broker.region.policy.SubscriptionRecoveryPolicy;
040import org.apache.activemq.broker.util.InsertionCountList;
041import org.apache.activemq.command.ActiveMQDestination;
042import org.apache.activemq.command.ExceptionResponse;
043import org.apache.activemq.command.Message;
044import org.apache.activemq.command.MessageAck;
045import org.apache.activemq.command.MessageId;
046import org.apache.activemq.command.ProducerAck;
047import org.apache.activemq.command.ProducerInfo;
048import org.apache.activemq.command.Response;
049import org.apache.activemq.command.SubscriptionInfo;
050import org.apache.activemq.filter.MessageEvaluationContext;
051import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
052import org.apache.activemq.store.MessageRecoveryListener;
053import org.apache.activemq.store.TopicMessageStore;
054import org.apache.activemq.thread.Task;
055import org.apache.activemq.thread.TaskRunner;
056import org.apache.activemq.thread.TaskRunnerFactory;
057import org.apache.activemq.transaction.Synchronization;
058import org.apache.activemq.util.SubscriptionKey;
059import org.slf4j.Logger;
060import org.slf4j.LoggerFactory;
061
062/**
063 * The Topic is a destination that sends a copy of a message to every active
064 * Subscription registered.
065 */
066public class Topic extends BaseDestination implements Task {
067    protected static final Logger LOG = LoggerFactory.getLogger(Topic.class);
068    private final TopicMessageStore topicStore;
069    protected final CopyOnWriteArrayList<Subscription> consumers = new CopyOnWriteArrayList<Subscription>();
070    private final ReentrantReadWriteLock dispatchLock = new ReentrantReadWriteLock();
071    private DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy();
072    private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy;
073    private final ConcurrentMap<SubscriptionKey, DurableTopicSubscription> durableSubscribers = new ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>();
074    private final TaskRunner taskRunner;
075    private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
076    private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
077        @Override
078        public void run() {
079            try {
080                Topic.this.taskRunner.wakeup();
081            } catch (InterruptedException e) {
082            }
083        };
084    };
085
086    public Topic(BrokerService brokerService, ActiveMQDestination destination, TopicMessageStore store,
087            DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception {
088        super(brokerService, store, destination, parentStats);
089        this.topicStore = store;
090        subscriptionRecoveryPolicy = new RetainedMessageSubscriptionRecoveryPolicy(null);
091        this.taskRunner = taskFactory.createTaskRunner(this, "Topic  " + destination.getPhysicalName());
092    }
093
094    @Override
095    public void initialize() throws Exception {
096        super.initialize();
097        // set non default subscription recovery policy (override policyEntries)
098        if (AdvisorySupport.isMasterBrokerAdvisoryTopic(destination)) {
099            subscriptionRecoveryPolicy = new LastImageSubscriptionRecoveryPolicy();
100            setAlwaysRetroactive(true);
101        }
102        if (store != null) {
103            // AMQ-2586: Better to leave this stat at zero than to give the user
104            // misleading metrics.
105            // int messageCount = store.getMessageCount();
106            // destinationStatistics.getMessages().setCount(messageCount);
107        }
108    }
109
110    @Override
111    public List<Subscription> getConsumers() {
112        synchronized (consumers) {
113            return new ArrayList<Subscription>(consumers);
114        }
115    }
116
117    public boolean lock(MessageReference node, LockOwner sub) {
118        return true;
119    }
120
121    @Override
122    public void addSubscription(ConnectionContext context, final Subscription sub) throws Exception {
123        if (!sub.getConsumerInfo().isDurable()) {
124
125            // Do a retroactive recovery if needed.
126            if (sub.getConsumerInfo().isRetroactive() || isAlwaysRetroactive()) {
127
128                // synchronize with dispatch method so that no new messages are sent
129                // while we are recovering a subscription to avoid out of order messages.
130                dispatchLock.writeLock().lock();
131                try {
132                    boolean applyRecovery = false;
133                    synchronized (consumers) {
134                        if (!consumers.contains(sub)){
135                            sub.add(context, this);
136                            consumers.add(sub);
137                            applyRecovery=true;
138                            super.addSubscription(context, sub);
139                        }
140                    }
141                    if (applyRecovery){
142                        subscriptionRecoveryPolicy.recover(context, this, sub);
143                    }
144                } finally {
145                    dispatchLock.writeLock().unlock();
146                }
147
148            } else {
149                synchronized (consumers) {
150                    if (!consumers.contains(sub)){
151                        sub.add(context, this);
152                        consumers.add(sub);
153                        super.addSubscription(context, sub);
154                    }
155                }
156            }
157        } else {
158            DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
159            super.addSubscription(context, sub);
160            sub.add(context, this);
161            if(dsub.isActive()) {
162                synchronized (consumers) {
163                    boolean hasSubscription = false;
164
165                    if (consumers.size() == 0) {
166                        hasSubscription = false;
167                    } else {
168                        for (Subscription currentSub : consumers) {
169                            if (currentSub.getConsumerInfo().isDurable()) {
170                                DurableTopicSubscription dcurrentSub = (DurableTopicSubscription) currentSub;
171                                if (dcurrentSub.getSubscriptionKey().equals(dsub.getSubscriptionKey())) {
172                                    hasSubscription = true;
173                                    break;
174                                }
175                            }
176                        }
177                    }
178
179                    if (!hasSubscription) {
180                        consumers.add(sub);
181                    }
182                }
183            }
184            durableSubscribers.put(dsub.getSubscriptionKey(), dsub);
185        }
186    }
187
188    @Override
189    public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception {
190        if (!sub.getConsumerInfo().isDurable()) {
191            super.removeSubscription(context, sub, lastDeliveredSequenceId);
192            synchronized (consumers) {
193                consumers.remove(sub);
194            }
195        }
196        sub.remove(context, this);
197    }
198
199    public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws Exception {
200        if (topicStore != null) {
201            topicStore.deleteSubscription(key.clientId, key.subscriptionName);
202            DurableTopicSubscription removed = durableSubscribers.remove(key);
203            if (removed != null) {
204                destinationStatistics.getConsumers().decrement();
205                // deactivate and remove
206                removed.deactivate(false, 0l);
207                consumers.remove(removed);
208            }
209        }
210    }
211
212    public void activate(ConnectionContext context, final DurableTopicSubscription subscription) throws Exception {
213        // synchronize with dispatch method so that no new messages are sent
214        // while we are recovering a subscription to avoid out of order messages.
215        dispatchLock.writeLock().lock();
216        try {
217
218            if (topicStore == null) {
219                return;
220            }
221
222            // Recover the durable subscription.
223            String clientId = subscription.getSubscriptionKey().getClientId();
224            String subscriptionName = subscription.getSubscriptionKey().getSubscriptionName();
225            String selector = subscription.getConsumerInfo().getSelector();
226            SubscriptionInfo info = topicStore.lookupSubscription(clientId, subscriptionName);
227            if (info != null) {
228                // Check to see if selector changed.
229                String s1 = info.getSelector();
230                if (s1 == null ^ selector == null || (s1 != null && !s1.equals(selector))) {
231                    // Need to delete the subscription
232                    topicStore.deleteSubscription(clientId, subscriptionName);
233                    info = null;
234                    synchronized (consumers) {
235                        consumers.remove(subscription);
236                    }
237                } else {
238                    synchronized (consumers) {
239                        if (!consumers.contains(subscription)) {
240                            consumers.add(subscription);
241                        }
242                    }
243                }
244            }
245
246            // Do we need to create the subscription?
247            if (info == null) {
248                info = new SubscriptionInfo();
249                info.setClientId(clientId);
250                info.setSelector(selector);
251                info.setSubscriptionName(subscriptionName);
252                info.setDestination(getActiveMQDestination());
253                // This destination is an actual destination id.
254                info.setSubscribedDestination(subscription.getConsumerInfo().getDestination());
255                // This destination might be a pattern
256                synchronized (consumers) {
257                    consumers.add(subscription);
258                    topicStore.addSubscription(info, subscription.getConsumerInfo().isRetroactive());
259                }
260            }
261
262            final MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
263            msgContext.setDestination(destination);
264            if (subscription.isRecoveryRequired()) {
265                topicStore.recoverSubscription(clientId, subscriptionName, new MessageRecoveryListener() {
266                    @Override
267                    public boolean recoverMessage(Message message) throws Exception {
268                        message.setRegionDestination(Topic.this);
269                        try {
270                            msgContext.setMessageReference(message);
271                            if (subscription.matches(message, msgContext)) {
272                                subscription.add(message);
273                            }
274                        } catch (IOException e) {
275                            LOG.error("Failed to recover this message {}", message, e);
276                        }
277                        return true;
278                    }
279
280                    @Override
281                    public boolean recoverMessageReference(MessageId messageReference) throws Exception {
282                        throw new RuntimeException("Should not be called.");
283                    }
284
285                    @Override
286                    public boolean hasSpace() {
287                        return true;
288                    }
289
290                    @Override
291                    public boolean isDuplicate(MessageId id) {
292                        return false;
293                    }
294                });
295            }
296        } finally {
297            dispatchLock.writeLock().unlock();
298        }
299    }
300
301    public void deactivate(ConnectionContext context, DurableTopicSubscription sub, List<MessageReference> dispatched) throws Exception {
302        synchronized (consumers) {
303            consumers.remove(sub);
304        }
305        sub.remove(context, this, dispatched);
306    }
307
308    public void recoverRetroactiveMessages(ConnectionContext context, Subscription subscription) throws Exception {
309        if (subscription.getConsumerInfo().isRetroactive()) {
310            subscriptionRecoveryPolicy.recover(context, this, subscription);
311        }
312    }
313
314    @Override
315    public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception {
316        final ConnectionContext context = producerExchange.getConnectionContext();
317
318        final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo();
319        producerExchange.incrementSend();
320        final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0
321                && !context.isInRecoveryMode();
322
323        // There is delay between the client sending it and it arriving at the
324        // destination.. it may have expired.
325        if (message.isExpired()) {
326            broker.messageExpired(context, message, null);
327            getDestinationStatistics().getExpired().increment();
328            if (sendProducerAck) {
329                ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
330                context.getConnection().dispatchAsync(ack);
331            }
332            return;
333        }
334
335        if (memoryUsage.isFull()) {
336            isFull(context, memoryUsage);
337            fastProducer(context, producerInfo);
338
339            if (isProducerFlowControl() && context.isProducerFlowControl()) {
340
341                if (warnOnProducerFlowControl) {
342                    warnOnProducerFlowControl = false;
343                    LOG.info("{}, Usage Manager memory limit reached {}. Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info.",
344                            getActiveMQDestination().getQualifiedName(), memoryUsage.getLimit());
345                }
346
347                if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {
348                    throw new javax.jms.ResourceAllocationException("Usage Manager memory limit ("
349                            + memoryUsage.getLimit() + ") reached. Rejecting send for producer (" + message.getProducerId()
350                            + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
351                            + " See http://activemq.apache.org/producer-flow-control.html for more info");
352                }
353
354                // We can avoid blocking due to low usage if the producer is sending a sync message or
355                // if it is using a producer window
356                if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
357                    synchronized (messagesWaitingForSpace) {
358                        messagesWaitingForSpace.add(new Runnable() {
359                            @Override
360                            public void run() {
361                                try {
362
363                                    // While waiting for space to free up... the
364                                    // message may have expired.
365                                    if (message.isExpired()) {
366                                        broker.messageExpired(context, message, null);
367                                        getDestinationStatistics().getExpired().increment();
368                                    } else {
369                                        doMessageSend(producerExchange, message);
370                                    }
371
372                                    if (sendProducerAck) {
373                                        ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message
374                                                .getSize());
375                                        context.getConnection().dispatchAsync(ack);
376                                    } else {
377                                        Response response = new Response();
378                                        response.setCorrelationId(message.getCommandId());
379                                        context.getConnection().dispatchAsync(response);
380                                    }
381
382                                } catch (Exception e) {
383                                    if (!sendProducerAck && !context.isInRecoveryMode()) {
384                                        ExceptionResponse response = new ExceptionResponse(e);
385                                        response.setCorrelationId(message.getCommandId());
386                                        context.getConnection().dispatchAsync(response);
387                                    }
388                                }
389                            }
390                        });
391
392                        registerCallbackForNotFullNotification();
393                        context.setDontSendReponse(true);
394                        return;
395                    }
396
397                } else {
398                    // Producer flow control cannot be used, so we have do the flow control
399                    // at the broker by blocking this thread until there is space available.
400
401                    if (memoryUsage.isFull()) {
402                        if (context.isInTransaction()) {
403
404                            int count = 0;
405                            while (!memoryUsage.waitForSpace(1000)) {
406                                if (context.getStopping().get()) {
407                                    throw new IOException("Connection closed, send aborted.");
408                                }
409                                if (count > 2 && context.isInTransaction()) {
410                                    count = 0;
411                                    int size = context.getTransaction().size();
412                                    LOG.warn("Waiting for space to send transacted message - transaction elements = {} need more space to commit. Message = {}", size, message);
413                                }
414                                count++;
415                            }
416                        } else {
417                            waitForSpace(
418                                    context,
419                                    producerExchange,
420                                    memoryUsage,
421                                    "Usage Manager Memory Usage limit reached. Stopping producer ("
422                                            + message.getProducerId()
423                                            + ") to prevent flooding "
424                                            + getActiveMQDestination().getQualifiedName()
425                                            + "."
426                                            + " See http://activemq.apache.org/producer-flow-control.html for more info");
427                        }
428                    }
429
430                    // The usage manager could have delayed us by the time
431                    // we unblock the message could have expired..
432                    if (message.isExpired()) {
433                        getDestinationStatistics().getExpired().increment();
434                        LOG.debug("Expired message: {}", message);
435                        return;
436                    }
437                }
438            }
439        }
440
441        doMessageSend(producerExchange, message);
442        messageDelivered(context, message);
443        if (sendProducerAck) {
444            ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
445            context.getConnection().dispatchAsync(ack);
446        }
447    }
448
449    /**
450     * do send the message - this needs to be synchronized to ensure messages
451     * are stored AND dispatched in the right order
452     *
453     * @param producerExchange
454     * @param message
455     * @throws IOException
456     * @throws Exception
457     */
458    synchronized void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message)
459            throws IOException, Exception {
460        final ConnectionContext context = producerExchange.getConnectionContext();
461        message.setRegionDestination(this);
462        message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
463        Future<Object> result = null;
464
465        if (topicStore != null && message.isPersistent() && !canOptimizeOutPersistence()) {
466            if (systemUsage.getStoreUsage().isFull(getStoreUsageHighWaterMark())) {
467                final String logMessage = "Persistent store is Full, " + getStoreUsageHighWaterMark() + "% of "
468                        + systemUsage.getStoreUsage().getLimit() + ". Stopping producer (" + message.getProducerId()
469                        + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
470                        + " See http://activemq.apache.org/producer-flow-control.html for more info";
471                if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {
472                    throw new javax.jms.ResourceAllocationException(logMessage);
473                }
474
475                waitForSpace(context,producerExchange, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage);
476            }
477            result = topicStore.asyncAddTopicMessage(context, message,isOptimizeStorage());
478        }
479
480        message.incrementReferenceCount();
481
482        if (context.isInTransaction()) {
483            context.getTransaction().addSynchronization(new Synchronization() {
484                @Override
485                public void afterCommit() throws Exception {
486                    // It could take while before we receive the commit
487                    // operation.. by that time the message could have
488                    // expired..
489                    if (broker.isExpired(message)) {
490                        getDestinationStatistics().getExpired().increment();
491                        broker.messageExpired(context, message, null);
492                        message.decrementReferenceCount();
493                        return;
494                    }
495                    try {
496                        dispatch(context, message);
497                    } finally {
498                        message.decrementReferenceCount();
499                    }
500                }
501
502                @Override
503                public void afterRollback() throws Exception {
504                    message.decrementReferenceCount();
505                }
506            });
507
508        } else {
509            try {
510                dispatch(context, message);
511            } finally {
512                message.decrementReferenceCount();
513            }
514        }
515
516        if (result != null && !result.isCancelled()) {
517            try {
518                result.get();
519            } catch (CancellationException e) {
520                // ignore - the task has been cancelled if the message
521                // has already been deleted
522            }
523        }
524    }
525
526    private boolean canOptimizeOutPersistence() {
527        return durableSubscribers.size() == 0;
528    }
529
530    @Override
531    public String toString() {
532        return "Topic: destination=" + destination.getPhysicalName() + ", subscriptions=" + consumers.size();
533    }
534
535    @Override
536    public void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack,
537            final MessageReference node) throws IOException {
538        if (topicStore != null && node.isPersistent()) {
539            DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
540            SubscriptionKey key = dsub.getSubscriptionKey();
541            topicStore.acknowledge(context, key.getClientId(), key.getSubscriptionName(), node.getMessageId(),
542                    convertToNonRangedAck(ack, node));
543        }
544        messageConsumed(context, node);
545    }
546
547    @Override
548    public void gc() {
549    }
550
551    public Message loadMessage(MessageId messageId) throws IOException {
552        return topicStore != null ? topicStore.getMessage(messageId) : null;
553    }
554
555    @Override
556    public void start() throws Exception {
557        this.subscriptionRecoveryPolicy.start();
558        if (memoryUsage != null) {
559            memoryUsage.start();
560        }
561
562        if (getExpireMessagesPeriod() > 0 && !AdvisorySupport.isAdvisoryTopic(getActiveMQDestination())) {
563            scheduler.executePeriodically(expireMessagesTask, getExpireMessagesPeriod());
564        }
565    }
566
567    @Override
568    public void stop() throws Exception {
569        if (taskRunner != null) {
570            taskRunner.shutdown();
571        }
572        this.subscriptionRecoveryPolicy.stop();
573        if (memoryUsage != null) {
574            memoryUsage.stop();
575        }
576        if (this.topicStore != null) {
577            this.topicStore.stop();
578        }
579
580         scheduler.cancel(expireMessagesTask);
581    }
582
583    @Override
584    public Message[] browse() {
585        final List<Message> result = new ArrayList<Message>();
586        doBrowse(result, getMaxBrowsePageSize());
587        return result.toArray(new Message[result.size()]);
588    }
589
590    private void doBrowse(final List<Message> browseList, final int max) {
591        try {
592            if (topicStore != null) {
593                final List<Message> toExpire = new ArrayList<Message>();
594                topicStore.recover(new MessageRecoveryListener() {
595                    @Override
596                    public boolean recoverMessage(Message message) throws Exception {
597                        if (message.isExpired()) {
598                            toExpire.add(message);
599                        }
600                        browseList.add(message);
601                        return true;
602                    }
603
604                    @Override
605                    public boolean recoverMessageReference(MessageId messageReference) throws Exception {
606                        return true;
607                    }
608
609                    @Override
610                    public boolean hasSpace() {
611                        return browseList.size() < max;
612                    }
613
614                    @Override
615                    public boolean isDuplicate(MessageId id) {
616                        return false;
617                    }
618                });
619                final ConnectionContext connectionContext = createConnectionContext();
620                for (Message message : toExpire) {
621                    for (DurableTopicSubscription sub : durableSubscribers.values()) {
622                        if (!sub.isActive()) {
623                            messageExpired(connectionContext, sub, message);
624                        }
625                    }
626                }
627                Message[] msgs = subscriptionRecoveryPolicy.browse(getActiveMQDestination());
628                if (msgs != null) {
629                    for (int i = 0; i < msgs.length && browseList.size() < max; i++) {
630                        browseList.add(msgs[i]);
631                    }
632                }
633            }
634        } catch (Throwable e) {
635            LOG.warn("Failed to browse Topic: {}", getActiveMQDestination().getPhysicalName(), e);
636        }
637    }
638
639    @Override
640    public boolean iterate() {
641        synchronized (messagesWaitingForSpace) {
642            while (!memoryUsage.isFull() && !messagesWaitingForSpace.isEmpty()) {
643                Runnable op = messagesWaitingForSpace.removeFirst();
644                op.run();
645            }
646
647            if (!messagesWaitingForSpace.isEmpty()) {
648                registerCallbackForNotFullNotification();
649            }
650        }
651        return false;
652    }
653
654    private void registerCallbackForNotFullNotification() {
655        // If the usage manager is not full, then the task will not
656        // get called..
657        if (!memoryUsage.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)) {
658            // so call it directly here.
659            sendMessagesWaitingForSpaceTask.run();
660        }
661    }
662
663    // Properties
664    // -------------------------------------------------------------------------
665
666    public DispatchPolicy getDispatchPolicy() {
667        return dispatchPolicy;
668    }
669
670    public void setDispatchPolicy(DispatchPolicy dispatchPolicy) {
671        this.dispatchPolicy = dispatchPolicy;
672    }
673
674    public SubscriptionRecoveryPolicy getSubscriptionRecoveryPolicy() {
675        return subscriptionRecoveryPolicy;
676    }
677
678    public void setSubscriptionRecoveryPolicy(SubscriptionRecoveryPolicy recoveryPolicy) {
679        if (this.subscriptionRecoveryPolicy != null && this.subscriptionRecoveryPolicy instanceof RetainedMessageSubscriptionRecoveryPolicy) {
680            // allow users to combine retained message policy with other ActiveMQ policies
681            RetainedMessageSubscriptionRecoveryPolicy policy = (RetainedMessageSubscriptionRecoveryPolicy) this.subscriptionRecoveryPolicy;
682            policy.setWrapped(recoveryPolicy);
683        } else {
684            this.subscriptionRecoveryPolicy = recoveryPolicy;
685        }
686    }
687
688    // Implementation methods
689    // -------------------------------------------------------------------------
690
691    @Override
692    public final void wakeup() {
693    }
694
695    protected void dispatch(final ConnectionContext context, Message message) throws Exception {
696        // AMQ-2586: Better to leave this stat at zero than to give the user
697        // misleading metrics.
698        // destinationStatistics.getMessages().increment();
699        destinationStatistics.getEnqueues().increment();
700        destinationStatistics.getMessageSize().addSize(message.getSize());
701        MessageEvaluationContext msgContext = null;
702
703        dispatchLock.readLock().lock();
704        try {
705            if (!subscriptionRecoveryPolicy.add(context, message)) {
706                return;
707            }
708            synchronized (consumers) {
709                if (consumers.isEmpty()) {
710                    onMessageWithNoConsumers(context, message);
711                    return;
712                }
713            }
714            msgContext = context.getMessageEvaluationContext();
715            msgContext.setDestination(destination);
716            msgContext.setMessageReference(message);
717            if (!dispatchPolicy.dispatch(message, msgContext, consumers)) {
718                onMessageWithNoConsumers(context, message);
719            }
720
721        } finally {
722            dispatchLock.readLock().unlock();
723            if (msgContext != null) {
724                msgContext.clear();
725            }
726        }
727    }
728
729    private final Runnable expireMessagesTask = new Runnable() {
730        @Override
731        public void run() {
732            List<Message> browsedMessages = new InsertionCountList<Message>();
733            doBrowse(browsedMessages, getMaxExpirePageSize());
734        }
735    };
736
737    @Override
738    public void messageExpired(ConnectionContext context, Subscription subs, MessageReference reference) {
739        broker.messageExpired(context, reference, subs);
740        // AMQ-2586: Better to leave this stat at zero than to give the user
741        // misleading metrics.
742        // destinationStatistics.getMessages().decrement();
743        destinationStatistics.getExpired().increment();
744        MessageAck ack = new MessageAck();
745        ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
746        ack.setDestination(destination);
747        ack.setMessageID(reference.getMessageId());
748        try {
749            if (subs instanceof DurableTopicSubscription) {
750                ((DurableTopicSubscription)subs).removePending(reference);
751            }
752            acknowledge(context, subs, ack, reference);
753        } catch (Exception e) {
754            LOG.error("Failed to remove expired Message from the store ", e);
755        }
756    }
757
758    @Override
759    protected Logger getLog() {
760        return LOG;
761    }
762
763    protected boolean isOptimizeStorage(){
764        boolean result = false;
765
766        if (isDoOptimzeMessageStorage() && durableSubscribers.isEmpty()==false){
767                result = true;
768                for (DurableTopicSubscription s : durableSubscribers.values()) {
769                    if (s.isActive()== false){
770                        result = false;
771                        break;
772                    }
773                    if (s.getPrefetchSize()==0){
774                        result = false;
775                        break;
776                    }
777                    if (s.isSlowConsumer()){
778                        result = false;
779                        break;
780                    }
781                    if (s.getInFlightUsage() > getOptimizeMessageStoreInFlightLimit()){
782                        result = false;
783                        break;
784                    }
785                }
786        }
787        return result;
788    }
789
790    /**
791     * force a reread of the store - after transaction recovery completion
792     */
793    @Override
794    public void clearPendingMessages() {
795        dispatchLock.readLock().lock();
796        try {
797            for (DurableTopicSubscription durableTopicSubscription : durableSubscribers.values()) {
798                clearPendingAndDispatch(durableTopicSubscription);
799            }
800        } finally {
801            dispatchLock.readLock().unlock();
802        }
803    }
804
805    private void clearPendingAndDispatch(DurableTopicSubscription durableTopicSubscription) {
806        synchronized (durableTopicSubscription.pendingLock) {
807            durableTopicSubscription.pending.clear();
808            try {
809                durableTopicSubscription.dispatchPending();
810            } catch (IOException exception) {
811                LOG.warn("After clear of pending, failed to dispatch to: {}, for: {}, pending: {}", new Object[]{
812                        durableTopicSubscription,
813                        destination,
814                        durableTopicSubscription.pending }, exception);
815            }
816        }
817    }
818
819    private void rollback(MessageId poisoned) {
820        dispatchLock.readLock().lock();
821        try {
822            for (DurableTopicSubscription durableTopicSubscription : durableSubscribers.values()) {
823                durableTopicSubscription.getPending().rollback(poisoned);
824            }
825        } finally {
826            dispatchLock.readLock().unlock();
827        }
828    }
829
830    public Map<SubscriptionKey, DurableTopicSubscription> getDurableTopicSubs() {
831        return durableSubscribers;
832    }
833}