001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region; 018 019import java.io.IOException; 020import java.util.List; 021 022import javax.jms.ResourceAllocationException; 023 024import org.apache.activemq.advisory.AdvisorySupport; 025import org.apache.activemq.broker.Broker; 026import org.apache.activemq.broker.BrokerService; 027import org.apache.activemq.broker.ConnectionContext; 028import org.apache.activemq.broker.ProducerBrokerExchange; 029import org.apache.activemq.broker.region.policy.DeadLetterStrategy; 030import org.apache.activemq.broker.region.policy.SlowConsumerStrategy; 031import org.apache.activemq.command.ActiveMQDestination; 032import org.apache.activemq.command.ActiveMQTopic; 033import org.apache.activemq.command.Message; 034import org.apache.activemq.command.MessageAck; 035import org.apache.activemq.command.MessageDispatchNotification; 036import org.apache.activemq.command.ProducerInfo; 037import org.apache.activemq.filter.NonCachedMessageEvaluationContext; 038import org.apache.activemq.security.SecurityContext; 039import org.apache.activemq.state.ProducerState; 040import org.apache.activemq.store.MessageStore; 041import org.apache.activemq.thread.Scheduler; 042import org.apache.activemq.usage.MemoryUsage; 043import org.apache.activemq.usage.SystemUsage; 044import org.apache.activemq.usage.Usage; 045import org.slf4j.Logger; 046 047/** 048 * 049 */ 050public abstract class BaseDestination implements Destination { 051 /** 052 * The maximum number of messages to page in to the destination from 053 * persistent storage 054 */ 055 public static final int MAX_PAGE_SIZE = 200; 056 public static final int MAX_BROWSE_PAGE_SIZE = MAX_PAGE_SIZE * 2; 057 public static final long EXPIRE_MESSAGE_PERIOD = 30 * 1000; 058 public static final long DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC = 60 * 1000; 059 public static final int MAX_PRODUCERS_TO_AUDIT = 64; 060 public static final int MAX_AUDIT_DEPTH = 10000; 061 062 protected final ActiveMQDestination destination; 063 protected final Broker broker; 064 protected final MessageStore store; 065 protected SystemUsage systemUsage; 066 protected MemoryUsage memoryUsage; 067 private boolean producerFlowControl = true; 068 private boolean alwaysRetroactive = false; 069 protected boolean warnOnProducerFlowControl = true; 070 protected long blockedProducerWarningInterval = DEFAULT_BLOCKED_PRODUCER_WARNING_INTERVAL; 071 072 private int maxProducersToAudit = 1024; 073 private int maxAuditDepth = 2048; 074 private boolean enableAudit = true; 075 private int maxPageSize = MAX_PAGE_SIZE; 076 private int maxBrowsePageSize = MAX_BROWSE_PAGE_SIZE; 077 private boolean useCache = true; 078 private int minimumMessageSize = 1024; 079 private boolean lazyDispatch = false; 080 private boolean advisoryForSlowConsumers; 081 private boolean advisoryForFastProducers; 082 private boolean advisoryForDiscardingMessages; 083 private boolean advisoryWhenFull; 084 private boolean advisoryForDelivery; 085 private boolean advisoryForConsumed; 086 private boolean sendAdvisoryIfNoConsumers; 087 protected final DestinationStatistics destinationStatistics = new DestinationStatistics(); 088 protected final BrokerService brokerService; 089 protected final Broker regionBroker; 090 protected DeadLetterStrategy deadLetterStrategy = DEFAULT_DEAD_LETTER_STRATEGY; 091 protected long expireMessagesPeriod = EXPIRE_MESSAGE_PERIOD; 092 private int maxExpirePageSize = MAX_BROWSE_PAGE_SIZE; 093 protected int cursorMemoryHighWaterMark = 70; 094 protected int storeUsageHighWaterMark = 100; 095 private SlowConsumerStrategy slowConsumerStrategy; 096 private boolean prioritizedMessages; 097 private long inactiveTimeoutBeforeGC = DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC; 098 private boolean gcIfInactive; 099 private boolean gcWithNetworkConsumers; 100 private long lastActiveTime=0l; 101 private boolean reduceMemoryFootprint = false; 102 protected final Scheduler scheduler; 103 private boolean disposed = false; 104 private boolean doOptimzeMessageStorage = true; 105 /* 106 * percentage of in-flight messages above which optimize message store is disabled 107 */ 108 private int optimizeMessageStoreInFlightLimit = 10; 109 private boolean persistJMSRedelivered; 110 111 /** 112 * @param brokerService 113 * @param store 114 * @param destination 115 * @param parentStats 116 * @throws Exception 117 */ 118 public BaseDestination(BrokerService brokerService, MessageStore store, ActiveMQDestination destination, DestinationStatistics parentStats) throws Exception { 119 this.brokerService = brokerService; 120 this.broker = brokerService.getBroker(); 121 this.store = store; 122 this.destination = destination; 123 // let's copy the enabled property from the parent DestinationStatistics 124 this.destinationStatistics.setEnabled(parentStats.isEnabled()); 125 this.destinationStatistics.setParent(parentStats); 126 this.systemUsage = new SystemUsage(brokerService.getProducerSystemUsage(), destination.toString()); 127 this.memoryUsage = this.systemUsage.getMemoryUsage(); 128 this.memoryUsage.setUsagePortion(1.0f); 129 this.regionBroker = brokerService.getRegionBroker(); 130 this.scheduler = brokerService.getBroker().getScheduler(); 131 } 132 133 /** 134 * initialize the destination 135 * 136 * @throws Exception 137 */ 138 public void initialize() throws Exception { 139 // Let the store know what usage manager we are using so that he can 140 // flush messages to disk when usage gets high. 141 if (store != null) { 142 store.setMemoryUsage(this.memoryUsage); 143 } 144 } 145 146 /** 147 * @return the producerFlowControl 148 */ 149 @Override 150 public boolean isProducerFlowControl() { 151 return producerFlowControl; 152 } 153 154 /** 155 * @param producerFlowControl the producerFlowControl to set 156 */ 157 @Override 158 public void setProducerFlowControl(boolean producerFlowControl) { 159 this.producerFlowControl = producerFlowControl; 160 } 161 162 @Override 163 public boolean isAlwaysRetroactive() { 164 return alwaysRetroactive; 165 } 166 167 @Override 168 public void setAlwaysRetroactive(boolean alwaysRetroactive) { 169 this.alwaysRetroactive = alwaysRetroactive; 170 } 171 172 /** 173 * Set's the interval at which warnings about producers being blocked by 174 * resource usage will be triggered. Values of 0 or less will disable 175 * warnings 176 * 177 * @param blockedProducerWarningInterval the interval at which warning about 178 * blocked producers will be triggered. 179 */ 180 @Override 181 public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval) { 182 this.blockedProducerWarningInterval = blockedProducerWarningInterval; 183 } 184 185 /** 186 * 187 * @return the interval at which warning about blocked producers will be 188 * triggered. 189 */ 190 @Override 191 public long getBlockedProducerWarningInterval() { 192 return blockedProducerWarningInterval; 193 } 194 195 /** 196 * @return the maxProducersToAudit 197 */ 198 @Override 199 public int getMaxProducersToAudit() { 200 return maxProducersToAudit; 201 } 202 203 /** 204 * @param maxProducersToAudit the maxProducersToAudit to set 205 */ 206 @Override 207 public void setMaxProducersToAudit(int maxProducersToAudit) { 208 this.maxProducersToAudit = maxProducersToAudit; 209 } 210 211 /** 212 * @return the maxAuditDepth 213 */ 214 @Override 215 public int getMaxAuditDepth() { 216 return maxAuditDepth; 217 } 218 219 /** 220 * @param maxAuditDepth the maxAuditDepth to set 221 */ 222 @Override 223 public void setMaxAuditDepth(int maxAuditDepth) { 224 this.maxAuditDepth = maxAuditDepth; 225 } 226 227 /** 228 * @return the enableAudit 229 */ 230 @Override 231 public boolean isEnableAudit() { 232 return enableAudit; 233 } 234 235 /** 236 * @param enableAudit the enableAudit to set 237 */ 238 @Override 239 public void setEnableAudit(boolean enableAudit) { 240 this.enableAudit = enableAudit; 241 } 242 243 @Override 244 public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception { 245 destinationStatistics.getProducers().increment(); 246 this.lastActiveTime=0l; 247 } 248 249 @Override 250 public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception { 251 destinationStatistics.getProducers().decrement(); 252 } 253 254 @Override 255 public void addSubscription(ConnectionContext context, Subscription sub) throws Exception{ 256 destinationStatistics.getConsumers().increment(); 257 this.lastActiveTime=0l; 258 } 259 260 @Override 261 public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception{ 262 destinationStatistics.getConsumers().decrement(); 263 } 264 265 266 @Override 267 public final MemoryUsage getMemoryUsage() { 268 return memoryUsage; 269 } 270 271 @Override 272 public void setMemoryUsage(MemoryUsage memoryUsage) { 273 this.memoryUsage = memoryUsage; 274 } 275 276 @Override 277 public DestinationStatistics getDestinationStatistics() { 278 return destinationStatistics; 279 } 280 281 @Override 282 public ActiveMQDestination getActiveMQDestination() { 283 return destination; 284 } 285 286 @Override 287 public final String getName() { 288 return getActiveMQDestination().getPhysicalName(); 289 } 290 291 @Override 292 public final MessageStore getMessageStore() { 293 return store; 294 } 295 296 @Override 297 public boolean isActive() { 298 boolean isActive = destinationStatistics.getConsumers().getCount() != 0 || 299 destinationStatistics.getProducers().getCount() != 0; 300 if (isActive && isGcWithNetworkConsumers() && destinationStatistics.getConsumers().getCount() != 0) { 301 isActive = hasRegularConsumers(getConsumers()); 302 } 303 return isActive; 304 } 305 306 @Override 307 public int getMaxPageSize() { 308 return maxPageSize; 309 } 310 311 @Override 312 public void setMaxPageSize(int maxPageSize) { 313 this.maxPageSize = maxPageSize; 314 } 315 316 @Override 317 public int getMaxBrowsePageSize() { 318 return this.maxBrowsePageSize; 319 } 320 321 @Override 322 public void setMaxBrowsePageSize(int maxPageSize) { 323 this.maxBrowsePageSize = maxPageSize; 324 } 325 326 public int getMaxExpirePageSize() { 327 return this.maxExpirePageSize; 328 } 329 330 public void setMaxExpirePageSize(int maxPageSize) { 331 this.maxExpirePageSize = maxPageSize; 332 } 333 334 public void setExpireMessagesPeriod(long expireMessagesPeriod) { 335 this.expireMessagesPeriod = expireMessagesPeriod; 336 } 337 338 public long getExpireMessagesPeriod() { 339 return expireMessagesPeriod; 340 } 341 342 @Override 343 public boolean isUseCache() { 344 return useCache; 345 } 346 347 @Override 348 public void setUseCache(boolean useCache) { 349 this.useCache = useCache; 350 } 351 352 @Override 353 public int getMinimumMessageSize() { 354 return minimumMessageSize; 355 } 356 357 @Override 358 public void setMinimumMessageSize(int minimumMessageSize) { 359 this.minimumMessageSize = minimumMessageSize; 360 } 361 362 @Override 363 public boolean isLazyDispatch() { 364 return lazyDispatch; 365 } 366 367 @Override 368 public void setLazyDispatch(boolean lazyDispatch) { 369 this.lazyDispatch = lazyDispatch; 370 } 371 372 protected long getDestinationSequenceId() { 373 return regionBroker.getBrokerSequenceId(); 374 } 375 376 /** 377 * @return the advisoryForSlowConsumers 378 */ 379 public boolean isAdvisoryForSlowConsumers() { 380 return advisoryForSlowConsumers; 381 } 382 383 /** 384 * @param advisoryForSlowConsumers the advisoryForSlowConsumers to set 385 */ 386 public void setAdvisoryForSlowConsumers(boolean advisoryForSlowConsumers) { 387 this.advisoryForSlowConsumers = advisoryForSlowConsumers; 388 } 389 390 /** 391 * @return the advisoryForDiscardingMessages 392 */ 393 public boolean isAdvisoryForDiscardingMessages() { 394 return advisoryForDiscardingMessages; 395 } 396 397 /** 398 * @param advisoryForDiscardingMessages the advisoryForDiscardingMessages to 399 * set 400 */ 401 public void setAdvisoryForDiscardingMessages(boolean advisoryForDiscardingMessages) { 402 this.advisoryForDiscardingMessages = advisoryForDiscardingMessages; 403 } 404 405 /** 406 * @return the advisoryWhenFull 407 */ 408 public boolean isAdvisoryWhenFull() { 409 return advisoryWhenFull; 410 } 411 412 /** 413 * @param advisoryWhenFull the advisoryWhenFull to set 414 */ 415 public void setAdvisoryWhenFull(boolean advisoryWhenFull) { 416 this.advisoryWhenFull = advisoryWhenFull; 417 } 418 419 /** 420 * @return the advisoryForDelivery 421 */ 422 public boolean isAdvisoryForDelivery() { 423 return advisoryForDelivery; 424 } 425 426 /** 427 * @param advisoryForDelivery the advisoryForDelivery to set 428 */ 429 public void setAdvisoryForDelivery(boolean advisoryForDelivery) { 430 this.advisoryForDelivery = advisoryForDelivery; 431 } 432 433 /** 434 * @return the advisoryForConsumed 435 */ 436 public boolean isAdvisoryForConsumed() { 437 return advisoryForConsumed; 438 } 439 440 /** 441 * @param advisoryForConsumed the advisoryForConsumed to set 442 */ 443 public void setAdvisoryForConsumed(boolean advisoryForConsumed) { 444 this.advisoryForConsumed = advisoryForConsumed; 445 } 446 447 /** 448 * @return the advisdoryForFastProducers 449 */ 450 public boolean isAdvisoryForFastProducers() { 451 return advisoryForFastProducers; 452 } 453 454 /** 455 * @param advisoryForFastProducers the advisdoryForFastProducers to set 456 */ 457 public void setAdvisoryForFastProducers(boolean advisoryForFastProducers) { 458 this.advisoryForFastProducers = advisoryForFastProducers; 459 } 460 461 public boolean isSendAdvisoryIfNoConsumers() { 462 return sendAdvisoryIfNoConsumers; 463 } 464 465 public void setSendAdvisoryIfNoConsumers(boolean sendAdvisoryIfNoConsumers) { 466 this.sendAdvisoryIfNoConsumers = sendAdvisoryIfNoConsumers; 467 } 468 469 /** 470 * @return the dead letter strategy 471 */ 472 @Override 473 public DeadLetterStrategy getDeadLetterStrategy() { 474 return deadLetterStrategy; 475 } 476 477 /** 478 * set the dead letter strategy 479 * 480 * @param deadLetterStrategy 481 */ 482 public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy) { 483 this.deadLetterStrategy = deadLetterStrategy; 484 } 485 486 @Override 487 public int getCursorMemoryHighWaterMark() { 488 return this.cursorMemoryHighWaterMark; 489 } 490 491 @Override 492 public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) { 493 this.cursorMemoryHighWaterMark = cursorMemoryHighWaterMark; 494 } 495 496 /** 497 * called when message is consumed 498 * 499 * @param context 500 * @param messageReference 501 */ 502 @Override 503 public void messageConsumed(ConnectionContext context, MessageReference messageReference) { 504 if (advisoryForConsumed) { 505 broker.messageConsumed(context, messageReference); 506 } 507 } 508 509 /** 510 * Called when message is delivered to the broker 511 * 512 * @param context 513 * @param messageReference 514 */ 515 @Override 516 public void messageDelivered(ConnectionContext context, MessageReference messageReference) { 517 if (advisoryForDelivery) { 518 broker.messageDelivered(context, messageReference); 519 } 520 } 521 522 /** 523 * Called when a message is discarded - e.g. running low on memory This will 524 * happen only if the policy is enabled - e.g. non durable topics 525 * 526 * @param context 527 * @param messageReference 528 */ 529 @Override 530 public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) { 531 if (advisoryForDiscardingMessages) { 532 broker.messageDiscarded(context, sub, messageReference); 533 } 534 } 535 536 /** 537 * Called when there is a slow consumer 538 * 539 * @param context 540 * @param subs 541 */ 542 @Override 543 public void slowConsumer(ConnectionContext context, Subscription subs) { 544 if (advisoryForSlowConsumers) { 545 broker.slowConsumer(context, this, subs); 546 } 547 if (slowConsumerStrategy != null) { 548 slowConsumerStrategy.slowConsumer(context, subs); 549 } 550 } 551 552 /** 553 * Called to notify a producer is too fast 554 * 555 * @param context 556 * @param producerInfo 557 */ 558 @Override 559 public void fastProducer(ConnectionContext context, ProducerInfo producerInfo) { 560 if (advisoryForFastProducers) { 561 broker.fastProducer(context, producerInfo, getActiveMQDestination()); 562 } 563 } 564 565 /** 566 * Called when a Usage reaches a limit 567 * 568 * @param context 569 * @param usage 570 */ 571 @Override 572 public void isFull(ConnectionContext context, Usage<?> usage) { 573 if (advisoryWhenFull) { 574 broker.isFull(context, this, usage); 575 } 576 } 577 578 @Override 579 public void dispose(ConnectionContext context) throws IOException { 580 if (this.store != null) { 581 this.store.removeAllMessages(context); 582 this.store.dispose(context); 583 } 584 this.destinationStatistics.setParent(null); 585 this.memoryUsage.stop(); 586 this.disposed = true; 587 } 588 589 @Override 590 public boolean isDisposed() { 591 return this.disposed; 592 } 593 594 /** 595 * Provides a hook to allow messages with no consumer to be processed in 596 * some way - such as to send to a dead letter queue or something.. 597 */ 598 protected void onMessageWithNoConsumers(ConnectionContext context, Message msg) throws Exception { 599 if (!msg.isPersistent()) { 600 if (isSendAdvisoryIfNoConsumers()) { 601 // allow messages with no consumers to be dispatched to a dead 602 // letter queue 603 if (destination.isQueue() || !AdvisorySupport.isAdvisoryTopic(destination)) { 604 605 Message message = msg.copy(); 606 // The original destination and transaction id do not get 607 // filled when the message is first sent, 608 // it is only populated if the message is routed to another 609 // destination like the DLQ 610 if (message.getOriginalDestination() != null) { 611 message.setOriginalDestination(message.getDestination()); 612 } 613 if (message.getOriginalTransactionId() != null) { 614 message.setOriginalTransactionId(message.getTransactionId()); 615 } 616 617 ActiveMQTopic advisoryTopic; 618 if (destination.isQueue()) { 619 advisoryTopic = AdvisorySupport.getNoQueueConsumersAdvisoryTopic(destination); 620 } else { 621 advisoryTopic = AdvisorySupport.getNoTopicConsumersAdvisoryTopic(destination); 622 } 623 message.setDestination(advisoryTopic); 624 message.setTransactionId(null); 625 626 // Disable flow control for this since since we don't want 627 // to block. 628 boolean originalFlowControl = context.isProducerFlowControl(); 629 try { 630 context.setProducerFlowControl(false); 631 ProducerBrokerExchange producerExchange = new ProducerBrokerExchange(); 632 producerExchange.setMutable(false); 633 producerExchange.setConnectionContext(context); 634 producerExchange.setProducerState(new ProducerState(new ProducerInfo())); 635 context.getBroker().send(producerExchange, message); 636 } finally { 637 context.setProducerFlowControl(originalFlowControl); 638 } 639 640 } 641 } 642 } 643 } 644 645 @Override 646 public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception { 647 } 648 649 public final int getStoreUsageHighWaterMark() { 650 return this.storeUsageHighWaterMark; 651 } 652 653 public void setStoreUsageHighWaterMark(int storeUsageHighWaterMark) { 654 this.storeUsageHighWaterMark = storeUsageHighWaterMark; 655 } 656 657 protected final void waitForSpace(ConnectionContext context,ProducerBrokerExchange producerBrokerExchange, Usage<?> usage, String warning) throws IOException, InterruptedException, ResourceAllocationException { 658 waitForSpace(context, producerBrokerExchange, usage, 100, warning); 659 } 660 661 protected final void waitForSpace(ConnectionContext context, ProducerBrokerExchange producerBrokerExchange, Usage<?> usage, int highWaterMark, String warning) throws IOException, InterruptedException, ResourceAllocationException { 662 if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) { 663 getLog().debug("sendFailIfNoSpace, forcing exception on send, usage: {}: {}", usage, warning); 664 throw new ResourceAllocationException(warning); 665 } 666 if (!context.isNetworkConnection() && systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) { 667 if (!usage.waitForSpace(systemUsage.getSendFailIfNoSpaceAfterTimeout(), highWaterMark)) { 668 getLog().debug("sendFailIfNoSpaceAfterTimeout expired, forcing exception on send, usage: {}: {}", usage, warning); 669 throw new ResourceAllocationException(warning); 670 } 671 } else { 672 long start = System.currentTimeMillis(); 673 long nextWarn = start; 674 producerBrokerExchange.blockingOnFlowControl(true); 675 destinationStatistics.getBlockedSends().increment(); 676 while (!usage.waitForSpace(1000, highWaterMark)) { 677 if (context.getStopping().get()) { 678 throw new IOException("Connection closed, send aborted."); 679 } 680 681 long now = System.currentTimeMillis(); 682 if (now >= nextWarn) { 683 getLog().info("{}: {} (blocking for: {}s)", new Object[]{ usage, warning, new Long(((now - start) / 1000))}); 684 nextWarn = now + blockedProducerWarningInterval; 685 } 686 } 687 long finish = System.currentTimeMillis(); 688 long totalTimeBlocked = finish - start; 689 destinationStatistics.getBlockedTime().addTime(totalTimeBlocked); 690 producerBrokerExchange.incrementTimeBlocked(this,totalTimeBlocked); 691 producerBrokerExchange.blockingOnFlowControl(false); 692 } 693 } 694 695 protected abstract Logger getLog(); 696 697 public void setSlowConsumerStrategy(SlowConsumerStrategy slowConsumerStrategy) { 698 this.slowConsumerStrategy = slowConsumerStrategy; 699 } 700 701 @Override 702 public SlowConsumerStrategy getSlowConsumerStrategy() { 703 return this.slowConsumerStrategy; 704 } 705 706 707 @Override 708 public boolean isPrioritizedMessages() { 709 return this.prioritizedMessages; 710 } 711 712 public void setPrioritizedMessages(boolean prioritizedMessages) { 713 this.prioritizedMessages = prioritizedMessages; 714 if (store != null) { 715 store.setPrioritizedMessages(prioritizedMessages); 716 } 717 } 718 719 /** 720 * @return the inactiveTimeoutBeforeGC 721 */ 722 @Override 723 public long getInactiveTimeoutBeforeGC() { 724 return this.inactiveTimeoutBeforeGC; 725 } 726 727 /** 728 * @param inactiveTimeoutBeforeGC the inactiveTimeoutBeforeGC to set 729 */ 730 public void setInactiveTimeoutBeforeGC(long inactiveTimeoutBeforeGC) { 731 this.inactiveTimeoutBeforeGC = inactiveTimeoutBeforeGC; 732 } 733 734 /** 735 * @return the gcIfInactive 736 */ 737 public boolean isGcIfInactive() { 738 return this.gcIfInactive; 739 } 740 741 /** 742 * @param gcIfInactive the gcIfInactive to set 743 */ 744 public void setGcIfInactive(boolean gcIfInactive) { 745 this.gcIfInactive = gcIfInactive; 746 } 747 748 /** 749 * Indicate if it is ok to gc destinations that have only network consumers 750 * @param gcWithNetworkConsumers 751 */ 752 public void setGcWithNetworkConsumers(boolean gcWithNetworkConsumers) { 753 this.gcWithNetworkConsumers = gcWithNetworkConsumers; 754 } 755 756 public boolean isGcWithNetworkConsumers() { 757 return gcWithNetworkConsumers; 758 } 759 760 @Override 761 public void markForGC(long timeStamp) { 762 if (isGcIfInactive() && this.lastActiveTime == 0 && isActive() == false 763 && destinationStatistics.messages.getCount() == 0 && getInactiveTimeoutBeforeGC() > 0l) { 764 this.lastActiveTime = timeStamp; 765 } 766 } 767 768 @Override 769 public boolean canGC() { 770 boolean result = false; 771 if (isGcIfInactive()&& this.lastActiveTime != 0l) { 772 if ((System.currentTimeMillis() - this.lastActiveTime) >= getInactiveTimeoutBeforeGC()) { 773 result = true; 774 } 775 } 776 return result; 777 } 778 779 public void setReduceMemoryFootprint(boolean reduceMemoryFootprint) { 780 this.reduceMemoryFootprint = reduceMemoryFootprint; 781 } 782 783 protected boolean isReduceMemoryFootprint() { 784 return this.reduceMemoryFootprint; 785 } 786 787 @Override 788 public boolean isDoOptimzeMessageStorage() { 789 return doOptimzeMessageStorage; 790 } 791 792 @Override 793 public void setDoOptimzeMessageStorage(boolean doOptimzeMessageStorage) { 794 this.doOptimzeMessageStorage = doOptimzeMessageStorage; 795 } 796 797 public int getOptimizeMessageStoreInFlightLimit() { 798 return optimizeMessageStoreInFlightLimit; 799 } 800 801 public void setOptimizeMessageStoreInFlightLimit(int optimizeMessageStoreInFlightLimit) { 802 this.optimizeMessageStoreInFlightLimit = optimizeMessageStoreInFlightLimit; 803 } 804 805 806 @Override 807 public abstract List<Subscription> getConsumers(); 808 809 protected boolean hasRegularConsumers(List<Subscription> consumers) { 810 boolean hasRegularConsumers = false; 811 for (Subscription subscription: consumers) { 812 if (!subscription.getConsumerInfo().isNetworkSubscription()) { 813 hasRegularConsumers = true; 814 break; 815 } 816 } 817 return hasRegularConsumers; 818 } 819 820 public ConnectionContext createConnectionContext() { 821 ConnectionContext answer = new ConnectionContext(new NonCachedMessageEvaluationContext()); 822 answer.setBroker(this.broker); 823 answer.getMessageEvaluationContext().setDestination(getActiveMQDestination()); 824 answer.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT); 825 return answer; 826 } 827 828 protected MessageAck convertToNonRangedAck(MessageAck ack, MessageReference node) { 829 // the original ack may be a ranged ack, but we are trying to delete 830 // a specific 831 // message store here so we need to convert to a non ranged ack. 832 if (ack.getMessageCount() > 0) { 833 // Dup the ack 834 MessageAck a = new MessageAck(); 835 ack.copy(a); 836 ack = a; 837 // Convert to non-ranged. 838 ack.setMessageCount(1); 839 } 840 // always use node messageId so we can access entry/data Location 841 ack.setFirstMessageId(node.getMessageId()); 842 ack.setLastMessageId(node.getMessageId()); 843 return ack; 844 } 845 846 protected boolean isDLQ() { 847 return destination.isDLQ(); 848 } 849 850 @Override 851 public void duplicateFromStore(Message message, Subscription durableSub) { 852 ConnectionContext connectionContext = createConnectionContext(); 853 getLog().warn("duplicate message from store {}, redirecting for dlq processing", message.getMessageId()); 854 Throwable cause = new Throwable("duplicate from store for " + destination); 855 message.setRegionDestination(this); 856 broker.getRoot().sendToDeadLetterQueue(connectionContext, message, null, cause); 857 MessageAck messageAck = new MessageAck(message, MessageAck.POSION_ACK_TYPE, 1); 858 messageAck.setPoisonCause(cause); 859 try { 860 acknowledge(connectionContext, durableSub, messageAck, message); 861 } catch (IOException e) { 862 getLog().error("Failed to acknowledge duplicate message {} from {} with {}", message.getMessageId(), destination, messageAck); 863 } 864 } 865 866 public void setPersistJMSRedelivered(boolean persistJMSRedelivered) { 867 this.persistJMSRedelivered = persistJMSRedelivered; 868 } 869 870 public boolean isPersistJMSRedelivered() { 871 return persistJMSRedelivered; 872 } 873}