001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region;
018
019import java.io.IOException;
020import java.util.List;
021
022import javax.jms.ResourceAllocationException;
023
024import org.apache.activemq.advisory.AdvisorySupport;
025import org.apache.activemq.broker.Broker;
026import org.apache.activemq.broker.BrokerService;
027import org.apache.activemq.broker.ConnectionContext;
028import org.apache.activemq.broker.ProducerBrokerExchange;
029import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
030import org.apache.activemq.broker.region.policy.SlowConsumerStrategy;
031import org.apache.activemq.command.ActiveMQDestination;
032import org.apache.activemq.command.ActiveMQTopic;
033import org.apache.activemq.command.Message;
034import org.apache.activemq.command.MessageAck;
035import org.apache.activemq.command.MessageDispatchNotification;
036import org.apache.activemq.command.ProducerInfo;
037import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
038import org.apache.activemq.security.SecurityContext;
039import org.apache.activemq.state.ProducerState;
040import org.apache.activemq.store.MessageStore;
041import org.apache.activemq.thread.Scheduler;
042import org.apache.activemq.usage.MemoryUsage;
043import org.apache.activemq.usage.SystemUsage;
044import org.apache.activemq.usage.Usage;
045import org.slf4j.Logger;
046
047/**
048 *
049 */
050public abstract class BaseDestination implements Destination {
051    /**
052     * The maximum number of messages to page in to the destination from
053     * persistent storage
054     */
055    public static final int MAX_PAGE_SIZE = 200;
056    public static final int MAX_BROWSE_PAGE_SIZE = MAX_PAGE_SIZE * 2;
057    public static final long EXPIRE_MESSAGE_PERIOD = 30 * 1000;
058    public static final long DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC = 60 * 1000;
059    public static final int MAX_PRODUCERS_TO_AUDIT = 64;
060    public static final int MAX_AUDIT_DEPTH = 10000;
061
062    protected final ActiveMQDestination destination;
063    protected final Broker broker;
064    protected final MessageStore store;
065    protected SystemUsage systemUsage;
066    protected MemoryUsage memoryUsage;
067    private boolean producerFlowControl = true;
068    private boolean alwaysRetroactive = false;
069    protected boolean warnOnProducerFlowControl = true;
070    protected long blockedProducerWarningInterval = DEFAULT_BLOCKED_PRODUCER_WARNING_INTERVAL;
071
072    private int maxProducersToAudit = 1024;
073    private int maxAuditDepth = 2048;
074    private boolean enableAudit = true;
075    private int maxPageSize = MAX_PAGE_SIZE;
076    private int maxBrowsePageSize = MAX_BROWSE_PAGE_SIZE;
077    private boolean useCache = true;
078    private int minimumMessageSize = 1024;
079    private boolean lazyDispatch = false;
080    private boolean advisoryForSlowConsumers;
081    private boolean advisoryForFastProducers;
082    private boolean advisoryForDiscardingMessages;
083    private boolean advisoryWhenFull;
084    private boolean advisoryForDelivery;
085    private boolean advisoryForConsumed;
086    private boolean sendAdvisoryIfNoConsumers;
087    protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
088    protected final BrokerService brokerService;
089    protected final Broker regionBroker;
090    protected DeadLetterStrategy deadLetterStrategy = DEFAULT_DEAD_LETTER_STRATEGY;
091    protected long expireMessagesPeriod = EXPIRE_MESSAGE_PERIOD;
092    private int maxExpirePageSize = MAX_BROWSE_PAGE_SIZE;
093    protected int cursorMemoryHighWaterMark = 70;
094    protected int storeUsageHighWaterMark = 100;
095    private SlowConsumerStrategy slowConsumerStrategy;
096    private boolean prioritizedMessages;
097    private long inactiveTimeoutBeforeGC = DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC;
098    private boolean gcIfInactive;
099    private boolean gcWithNetworkConsumers;
100    private long lastActiveTime=0l;
101    private boolean reduceMemoryFootprint = false;
102    protected final Scheduler scheduler;
103    private boolean disposed = false;
104    private boolean doOptimzeMessageStorage = true;
105    /*
106     * percentage of in-flight messages above which optimize message store is disabled
107     */
108    private int optimizeMessageStoreInFlightLimit = 10;
109    private boolean persistJMSRedelivered;
110
111    /**
112     * @param brokerService
113     * @param store
114     * @param destination
115     * @param parentStats
116     * @throws Exception
117     */
118    public BaseDestination(BrokerService brokerService, MessageStore store, ActiveMQDestination destination, DestinationStatistics parentStats) throws Exception {
119        this.brokerService = brokerService;
120        this.broker = brokerService.getBroker();
121        this.store = store;
122        this.destination = destination;
123        // let's copy the enabled property from the parent DestinationStatistics
124        this.destinationStatistics.setEnabled(parentStats.isEnabled());
125        this.destinationStatistics.setParent(parentStats);
126        this.systemUsage = new SystemUsage(brokerService.getProducerSystemUsage(), destination.toString());
127        this.memoryUsage = this.systemUsage.getMemoryUsage();
128        this.memoryUsage.setUsagePortion(1.0f);
129        this.regionBroker = brokerService.getRegionBroker();
130        this.scheduler = brokerService.getBroker().getScheduler();
131    }
132
133    /**
134     * initialize the destination
135     *
136     * @throws Exception
137     */
138    public void initialize() throws Exception {
139        // Let the store know what usage manager we are using so that he can
140        // flush messages to disk when usage gets high.
141        if (store != null) {
142            store.setMemoryUsage(this.memoryUsage);
143        }
144    }
145
146    /**
147     * @return the producerFlowControl
148     */
149    @Override
150    public boolean isProducerFlowControl() {
151        return producerFlowControl;
152    }
153
154    /**
155     * @param producerFlowControl the producerFlowControl to set
156     */
157    @Override
158    public void setProducerFlowControl(boolean producerFlowControl) {
159        this.producerFlowControl = producerFlowControl;
160    }
161
162    @Override
163    public boolean isAlwaysRetroactive() {
164        return alwaysRetroactive;
165    }
166
167    @Override
168    public void setAlwaysRetroactive(boolean alwaysRetroactive) {
169        this.alwaysRetroactive = alwaysRetroactive;
170    }
171
172    /**
173     * Set's the interval at which warnings about producers being blocked by
174     * resource usage will be triggered. Values of 0 or less will disable
175     * warnings
176     *
177     * @param blockedProducerWarningInterval the interval at which warning about
178     *            blocked producers will be triggered.
179     */
180    @Override
181    public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval) {
182        this.blockedProducerWarningInterval = blockedProducerWarningInterval;
183    }
184
185    /**
186     *
187     * @return the interval at which warning about blocked producers will be
188     *         triggered.
189     */
190    @Override
191    public long getBlockedProducerWarningInterval() {
192        return blockedProducerWarningInterval;
193    }
194
195    /**
196     * @return the maxProducersToAudit
197     */
198    @Override
199    public int getMaxProducersToAudit() {
200        return maxProducersToAudit;
201    }
202
203    /**
204     * @param maxProducersToAudit the maxProducersToAudit to set
205     */
206    @Override
207    public void setMaxProducersToAudit(int maxProducersToAudit) {
208        this.maxProducersToAudit = maxProducersToAudit;
209    }
210
211    /**
212     * @return the maxAuditDepth
213     */
214    @Override
215    public int getMaxAuditDepth() {
216        return maxAuditDepth;
217    }
218
219    /**
220     * @param maxAuditDepth the maxAuditDepth to set
221     */
222    @Override
223    public void setMaxAuditDepth(int maxAuditDepth) {
224        this.maxAuditDepth = maxAuditDepth;
225    }
226
227    /**
228     * @return the enableAudit
229     */
230    @Override
231    public boolean isEnableAudit() {
232        return enableAudit;
233    }
234
235    /**
236     * @param enableAudit the enableAudit to set
237     */
238    @Override
239    public void setEnableAudit(boolean enableAudit) {
240        this.enableAudit = enableAudit;
241    }
242
243    @Override
244    public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
245        destinationStatistics.getProducers().increment();
246        this.lastActiveTime=0l;
247    }
248
249    @Override
250    public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
251        destinationStatistics.getProducers().decrement();
252    }
253
254    @Override
255    public void addSubscription(ConnectionContext context, Subscription sub) throws Exception{
256        destinationStatistics.getConsumers().increment();
257        this.lastActiveTime=0l;
258    }
259
260    @Override
261    public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception{
262        destinationStatistics.getConsumers().decrement();
263    }
264
265
266    @Override
267    public final MemoryUsage getMemoryUsage() {
268        return memoryUsage;
269    }
270
271    @Override
272    public void setMemoryUsage(MemoryUsage memoryUsage) {
273        this.memoryUsage = memoryUsage;
274    }
275
276    @Override
277    public DestinationStatistics getDestinationStatistics() {
278        return destinationStatistics;
279    }
280
281    @Override
282    public ActiveMQDestination getActiveMQDestination() {
283        return destination;
284    }
285
286    @Override
287    public final String getName() {
288        return getActiveMQDestination().getPhysicalName();
289    }
290
291    @Override
292    public final MessageStore getMessageStore() {
293        return store;
294    }
295
296    @Override
297    public boolean isActive() {
298        boolean isActive = destinationStatistics.getConsumers().getCount() != 0 ||
299                           destinationStatistics.getProducers().getCount() != 0;
300        if (isActive && isGcWithNetworkConsumers() && destinationStatistics.getConsumers().getCount() != 0) {
301            isActive = hasRegularConsumers(getConsumers());
302        }
303        return isActive;
304    }
305
306    @Override
307    public int getMaxPageSize() {
308        return maxPageSize;
309    }
310
311    @Override
312    public void setMaxPageSize(int maxPageSize) {
313        this.maxPageSize = maxPageSize;
314    }
315
316    @Override
317    public int getMaxBrowsePageSize() {
318        return this.maxBrowsePageSize;
319    }
320
321    @Override
322    public void setMaxBrowsePageSize(int maxPageSize) {
323        this.maxBrowsePageSize = maxPageSize;
324    }
325
326    public int getMaxExpirePageSize() {
327        return this.maxExpirePageSize;
328    }
329
330    public void setMaxExpirePageSize(int maxPageSize) {
331        this.maxExpirePageSize = maxPageSize;
332    }
333
334    public void setExpireMessagesPeriod(long expireMessagesPeriod) {
335        this.expireMessagesPeriod = expireMessagesPeriod;
336    }
337
338    public long getExpireMessagesPeriod() {
339        return expireMessagesPeriod;
340    }
341
342    @Override
343    public boolean isUseCache() {
344        return useCache;
345    }
346
347    @Override
348    public void setUseCache(boolean useCache) {
349        this.useCache = useCache;
350    }
351
352    @Override
353    public int getMinimumMessageSize() {
354        return minimumMessageSize;
355    }
356
357    @Override
358    public void setMinimumMessageSize(int minimumMessageSize) {
359        this.minimumMessageSize = minimumMessageSize;
360    }
361
362    @Override
363    public boolean isLazyDispatch() {
364        return lazyDispatch;
365    }
366
367    @Override
368    public void setLazyDispatch(boolean lazyDispatch) {
369        this.lazyDispatch = lazyDispatch;
370    }
371
372    protected long getDestinationSequenceId() {
373        return regionBroker.getBrokerSequenceId();
374    }
375
376    /**
377     * @return the advisoryForSlowConsumers
378     */
379    public boolean isAdvisoryForSlowConsumers() {
380        return advisoryForSlowConsumers;
381    }
382
383    /**
384     * @param advisoryForSlowConsumers the advisoryForSlowConsumers to set
385     */
386    public void setAdvisoryForSlowConsumers(boolean advisoryForSlowConsumers) {
387        this.advisoryForSlowConsumers = advisoryForSlowConsumers;
388    }
389
390    /**
391     * @return the advisoryForDiscardingMessages
392     */
393    public boolean isAdvisoryForDiscardingMessages() {
394        return advisoryForDiscardingMessages;
395    }
396
397    /**
398     * @param advisoryForDiscardingMessages the advisoryForDiscardingMessages to
399     *            set
400     */
401    public void setAdvisoryForDiscardingMessages(boolean advisoryForDiscardingMessages) {
402        this.advisoryForDiscardingMessages = advisoryForDiscardingMessages;
403    }
404
405    /**
406     * @return the advisoryWhenFull
407     */
408    public boolean isAdvisoryWhenFull() {
409        return advisoryWhenFull;
410    }
411
412    /**
413     * @param advisoryWhenFull the advisoryWhenFull to set
414     */
415    public void setAdvisoryWhenFull(boolean advisoryWhenFull) {
416        this.advisoryWhenFull = advisoryWhenFull;
417    }
418
419    /**
420     * @return the advisoryForDelivery
421     */
422    public boolean isAdvisoryForDelivery() {
423        return advisoryForDelivery;
424    }
425
426    /**
427     * @param advisoryForDelivery the advisoryForDelivery to set
428     */
429    public void setAdvisoryForDelivery(boolean advisoryForDelivery) {
430        this.advisoryForDelivery = advisoryForDelivery;
431    }
432
433    /**
434     * @return the advisoryForConsumed
435     */
436    public boolean isAdvisoryForConsumed() {
437        return advisoryForConsumed;
438    }
439
440    /**
441     * @param advisoryForConsumed the advisoryForConsumed to set
442     */
443    public void setAdvisoryForConsumed(boolean advisoryForConsumed) {
444        this.advisoryForConsumed = advisoryForConsumed;
445    }
446
447    /**
448     * @return the advisdoryForFastProducers
449     */
450    public boolean isAdvisoryForFastProducers() {
451        return advisoryForFastProducers;
452    }
453
454    /**
455     * @param advisoryForFastProducers the advisdoryForFastProducers to set
456     */
457    public void setAdvisoryForFastProducers(boolean advisoryForFastProducers) {
458        this.advisoryForFastProducers = advisoryForFastProducers;
459    }
460
461    public boolean isSendAdvisoryIfNoConsumers() {
462        return sendAdvisoryIfNoConsumers;
463    }
464
465    public void setSendAdvisoryIfNoConsumers(boolean sendAdvisoryIfNoConsumers) {
466        this.sendAdvisoryIfNoConsumers = sendAdvisoryIfNoConsumers;
467    }
468
469    /**
470     * @return the dead letter strategy
471     */
472    @Override
473    public DeadLetterStrategy getDeadLetterStrategy() {
474        return deadLetterStrategy;
475    }
476
477    /**
478     * set the dead letter strategy
479     *
480     * @param deadLetterStrategy
481     */
482    public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy) {
483        this.deadLetterStrategy = deadLetterStrategy;
484    }
485
486    @Override
487    public int getCursorMemoryHighWaterMark() {
488        return this.cursorMemoryHighWaterMark;
489    }
490
491    @Override
492    public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) {
493        this.cursorMemoryHighWaterMark = cursorMemoryHighWaterMark;
494    }
495
496    /**
497     * called when message is consumed
498     *
499     * @param context
500     * @param messageReference
501     */
502    @Override
503    public void messageConsumed(ConnectionContext context, MessageReference messageReference) {
504        if (advisoryForConsumed) {
505            broker.messageConsumed(context, messageReference);
506        }
507    }
508
509    /**
510     * Called when message is delivered to the broker
511     *
512     * @param context
513     * @param messageReference
514     */
515    @Override
516    public void messageDelivered(ConnectionContext context, MessageReference messageReference) {
517        if (advisoryForDelivery) {
518            broker.messageDelivered(context, messageReference);
519        }
520    }
521
522    /**
523     * Called when a message is discarded - e.g. running low on memory This will
524     * happen only if the policy is enabled - e.g. non durable topics
525     *
526     * @param context
527     * @param messageReference
528     */
529    @Override
530    public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) {
531        if (advisoryForDiscardingMessages) {
532            broker.messageDiscarded(context, sub, messageReference);
533        }
534    }
535
536    /**
537     * Called when there is a slow consumer
538     *
539     * @param context
540     * @param subs
541     */
542    @Override
543    public void slowConsumer(ConnectionContext context, Subscription subs) {
544        if (advisoryForSlowConsumers) {
545            broker.slowConsumer(context, this, subs);
546        }
547        if (slowConsumerStrategy != null) {
548            slowConsumerStrategy.slowConsumer(context, subs);
549        }
550    }
551
552    /**
553     * Called to notify a producer is too fast
554     *
555     * @param context
556     * @param producerInfo
557     */
558    @Override
559    public void fastProducer(ConnectionContext context, ProducerInfo producerInfo) {
560        if (advisoryForFastProducers) {
561            broker.fastProducer(context, producerInfo, getActiveMQDestination());
562        }
563    }
564
565    /**
566     * Called when a Usage reaches a limit
567     *
568     * @param context
569     * @param usage
570     */
571    @Override
572    public void isFull(ConnectionContext context, Usage<?> usage) {
573        if (advisoryWhenFull) {
574            broker.isFull(context, this, usage);
575        }
576    }
577
578    @Override
579    public void dispose(ConnectionContext context) throws IOException {
580        if (this.store != null) {
581            this.store.removeAllMessages(context);
582            this.store.dispose(context);
583        }
584        this.destinationStatistics.setParent(null);
585        this.memoryUsage.stop();
586        this.disposed = true;
587    }
588
589    @Override
590    public boolean isDisposed() {
591        return this.disposed;
592    }
593
594    /**
595     * Provides a hook to allow messages with no consumer to be processed in
596     * some way - such as to send to a dead letter queue or something..
597     */
598    protected void onMessageWithNoConsumers(ConnectionContext context, Message msg) throws Exception {
599        if (!msg.isPersistent()) {
600            if (isSendAdvisoryIfNoConsumers()) {
601                // allow messages with no consumers to be dispatched to a dead
602                // letter queue
603                if (destination.isQueue() || !AdvisorySupport.isAdvisoryTopic(destination)) {
604
605                    Message message = msg.copy();
606                    // The original destination and transaction id do not get
607                    // filled when the message is first sent,
608                    // it is only populated if the message is routed to another
609                    // destination like the DLQ
610                    if (message.getOriginalDestination() != null) {
611                        message.setOriginalDestination(message.getDestination());
612                    }
613                    if (message.getOriginalTransactionId() != null) {
614                        message.setOriginalTransactionId(message.getTransactionId());
615                    }
616
617                    ActiveMQTopic advisoryTopic;
618                    if (destination.isQueue()) {
619                        advisoryTopic = AdvisorySupport.getNoQueueConsumersAdvisoryTopic(destination);
620                    } else {
621                        advisoryTopic = AdvisorySupport.getNoTopicConsumersAdvisoryTopic(destination);
622                    }
623                    message.setDestination(advisoryTopic);
624                    message.setTransactionId(null);
625
626                    // Disable flow control for this since since we don't want
627                    // to block.
628                    boolean originalFlowControl = context.isProducerFlowControl();
629                    try {
630                        context.setProducerFlowControl(false);
631                        ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
632                        producerExchange.setMutable(false);
633                        producerExchange.setConnectionContext(context);
634                        producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
635                        context.getBroker().send(producerExchange, message);
636                    } finally {
637                        context.setProducerFlowControl(originalFlowControl);
638                    }
639
640                }
641            }
642        }
643    }
644
645    @Override
646    public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
647    }
648
649    public final int getStoreUsageHighWaterMark() {
650        return this.storeUsageHighWaterMark;
651    }
652
653    public void setStoreUsageHighWaterMark(int storeUsageHighWaterMark) {
654        this.storeUsageHighWaterMark = storeUsageHighWaterMark;
655    }
656
657    protected final void waitForSpace(ConnectionContext context,ProducerBrokerExchange producerBrokerExchange, Usage<?> usage, String warning) throws IOException, InterruptedException, ResourceAllocationException {
658        waitForSpace(context, producerBrokerExchange, usage, 100, warning);
659    }
660
661    protected final void waitForSpace(ConnectionContext context, ProducerBrokerExchange producerBrokerExchange, Usage<?> usage, int highWaterMark, String warning) throws IOException, InterruptedException, ResourceAllocationException {
662        if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {
663            getLog().debug("sendFailIfNoSpace, forcing exception on send, usage: {}: {}", usage, warning);
664            throw new ResourceAllocationException(warning);
665        }
666        if (!context.isNetworkConnection() && systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) {
667            if (!usage.waitForSpace(systemUsage.getSendFailIfNoSpaceAfterTimeout(), highWaterMark)) {
668                getLog().debug("sendFailIfNoSpaceAfterTimeout expired, forcing exception on send, usage: {}: {}", usage, warning);
669                throw new ResourceAllocationException(warning);
670            }
671        } else {
672            long start = System.currentTimeMillis();
673            long nextWarn = start;
674            producerBrokerExchange.blockingOnFlowControl(true);
675            destinationStatistics.getBlockedSends().increment();
676            while (!usage.waitForSpace(1000, highWaterMark)) {
677                if (context.getStopping().get()) {
678                    throw new IOException("Connection closed, send aborted.");
679                }
680
681                long now = System.currentTimeMillis();
682                if (now >= nextWarn) {
683                    getLog().info("{}: {} (blocking for: {}s)", new Object[]{ usage, warning, new Long(((now - start) / 1000))});
684                    nextWarn = now + blockedProducerWarningInterval;
685                }
686            }
687            long finish = System.currentTimeMillis();
688            long totalTimeBlocked = finish - start;
689            destinationStatistics.getBlockedTime().addTime(totalTimeBlocked);
690            producerBrokerExchange.incrementTimeBlocked(this,totalTimeBlocked);
691            producerBrokerExchange.blockingOnFlowControl(false);
692        }
693    }
694
695    protected abstract Logger getLog();
696
697    public void setSlowConsumerStrategy(SlowConsumerStrategy slowConsumerStrategy) {
698        this.slowConsumerStrategy = slowConsumerStrategy;
699    }
700
701    @Override
702    public SlowConsumerStrategy getSlowConsumerStrategy() {
703        return this.slowConsumerStrategy;
704    }
705
706
707    @Override
708    public boolean isPrioritizedMessages() {
709        return this.prioritizedMessages;
710    }
711
712    public void setPrioritizedMessages(boolean prioritizedMessages) {
713        this.prioritizedMessages = prioritizedMessages;
714        if (store != null) {
715            store.setPrioritizedMessages(prioritizedMessages);
716        }
717    }
718
719    /**
720     * @return the inactiveTimeoutBeforeGC
721     */
722    @Override
723    public long getInactiveTimeoutBeforeGC() {
724        return this.inactiveTimeoutBeforeGC;
725    }
726
727    /**
728     * @param inactiveTimeoutBeforeGC the inactiveTimeoutBeforeGC to set
729     */
730    public void setInactiveTimeoutBeforeGC(long inactiveTimeoutBeforeGC) {
731        this.inactiveTimeoutBeforeGC = inactiveTimeoutBeforeGC;
732    }
733
734    /**
735     * @return the gcIfInactive
736     */
737    public boolean isGcIfInactive() {
738        return this.gcIfInactive;
739    }
740
741    /**
742     * @param gcIfInactive the gcIfInactive to set
743     */
744    public void setGcIfInactive(boolean gcIfInactive) {
745        this.gcIfInactive = gcIfInactive;
746    }
747
748    /**
749     * Indicate if it is ok to gc destinations that have only network consumers
750     * @param gcWithNetworkConsumers
751     */
752    public void setGcWithNetworkConsumers(boolean gcWithNetworkConsumers) {
753        this.gcWithNetworkConsumers = gcWithNetworkConsumers;
754    }
755
756    public boolean isGcWithNetworkConsumers() {
757        return gcWithNetworkConsumers;
758    }
759
760    @Override
761    public void markForGC(long timeStamp) {
762        if (isGcIfInactive() && this.lastActiveTime == 0 && isActive() == false
763                && destinationStatistics.messages.getCount() == 0 && getInactiveTimeoutBeforeGC() > 0l) {
764            this.lastActiveTime = timeStamp;
765        }
766    }
767
768    @Override
769    public boolean canGC() {
770        boolean result = false;
771        if (isGcIfInactive()&& this.lastActiveTime != 0l) {
772            if ((System.currentTimeMillis() - this.lastActiveTime) >= getInactiveTimeoutBeforeGC()) {
773                result = true;
774            }
775        }
776        return result;
777    }
778
779    public void setReduceMemoryFootprint(boolean reduceMemoryFootprint) {
780        this.reduceMemoryFootprint = reduceMemoryFootprint;
781    }
782
783    protected boolean isReduceMemoryFootprint() {
784        return this.reduceMemoryFootprint;
785    }
786
787    @Override
788    public boolean isDoOptimzeMessageStorage() {
789        return doOptimzeMessageStorage;
790    }
791
792    @Override
793    public void setDoOptimzeMessageStorage(boolean doOptimzeMessageStorage) {
794        this.doOptimzeMessageStorage = doOptimzeMessageStorage;
795    }
796
797    public int getOptimizeMessageStoreInFlightLimit() {
798        return optimizeMessageStoreInFlightLimit;
799    }
800
801    public void setOptimizeMessageStoreInFlightLimit(int optimizeMessageStoreInFlightLimit) {
802        this.optimizeMessageStoreInFlightLimit = optimizeMessageStoreInFlightLimit;
803    }
804
805
806    @Override
807    public abstract List<Subscription> getConsumers();
808
809    protected boolean hasRegularConsumers(List<Subscription> consumers) {
810        boolean hasRegularConsumers = false;
811        for (Subscription subscription: consumers) {
812            if (!subscription.getConsumerInfo().isNetworkSubscription()) {
813                hasRegularConsumers = true;
814                break;
815            }
816        }
817        return hasRegularConsumers;
818    }
819
820    public ConnectionContext createConnectionContext() {
821        ConnectionContext answer = new ConnectionContext(new NonCachedMessageEvaluationContext());
822        answer.setBroker(this.broker);
823        answer.getMessageEvaluationContext().setDestination(getActiveMQDestination());
824        answer.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
825        return answer;
826    }
827
828    protected MessageAck convertToNonRangedAck(MessageAck ack, MessageReference node) {
829        // the original ack may be a ranged ack, but we are trying to delete
830        // a specific
831        // message store here so we need to convert to a non ranged ack.
832        if (ack.getMessageCount() > 0) {
833            // Dup the ack
834            MessageAck a = new MessageAck();
835            ack.copy(a);
836            ack = a;
837            // Convert to non-ranged.
838            ack.setMessageCount(1);
839        }
840        // always use node messageId so we can access entry/data Location
841        ack.setFirstMessageId(node.getMessageId());
842        ack.setLastMessageId(node.getMessageId());
843        return ack;
844    }
845
846    protected boolean isDLQ() {
847        return destination.isDLQ();
848    }
849
850    @Override
851    public void duplicateFromStore(Message message, Subscription durableSub) {
852        ConnectionContext connectionContext = createConnectionContext();
853        getLog().warn("duplicate message from store {}, redirecting for dlq processing", message.getMessageId());
854        Throwable cause = new Throwable("duplicate from store for " + destination);
855        message.setRegionDestination(this);
856        broker.getRoot().sendToDeadLetterQueue(connectionContext, message, null, cause);
857        MessageAck messageAck = new MessageAck(message, MessageAck.POSION_ACK_TYPE, 1);
858        messageAck.setPoisonCause(cause);
859        try {
860            acknowledge(connectionContext, durableSub, messageAck, message);
861        } catch (IOException e) {
862            getLog().error("Failed to acknowledge duplicate message {} from {} with {}", message.getMessageId(), destination, messageAck);
863        }
864    }
865
866    public void setPersistJMSRedelivered(boolean persistJMSRedelivered) {
867        this.persistJMSRedelivered = persistJMSRedelivered;
868    }
869
870    public boolean isPersistJMSRedelivered() {
871        return persistJMSRedelivered;
872    }
873}