001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region; 018 019import java.io.IOException; 020import java.util.ArrayList; 021import java.util.LinkedList; 022import java.util.List; 023import java.util.Map; 024import java.util.concurrent.CancellationException; 025import java.util.concurrent.ConcurrentHashMap; 026import java.util.concurrent.ConcurrentMap; 027import java.util.concurrent.CopyOnWriteArrayList; 028import java.util.concurrent.Future; 029import java.util.concurrent.locks.ReentrantReadWriteLock; 030 031import org.apache.activemq.advisory.AdvisorySupport; 032import org.apache.activemq.broker.BrokerService; 033import org.apache.activemq.broker.ConnectionContext; 034import org.apache.activemq.broker.ProducerBrokerExchange; 035import org.apache.activemq.broker.region.policy.DispatchPolicy; 036import org.apache.activemq.broker.region.policy.LastImageSubscriptionRecoveryPolicy; 037import org.apache.activemq.broker.region.policy.RetainedMessageSubscriptionRecoveryPolicy; 038import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy; 039import org.apache.activemq.broker.region.policy.SubscriptionRecoveryPolicy; 040import org.apache.activemq.broker.util.InsertionCountList; 041import org.apache.activemq.command.ActiveMQDestination; 042import org.apache.activemq.command.ExceptionResponse; 043import org.apache.activemq.command.Message; 044import org.apache.activemq.command.MessageAck; 045import org.apache.activemq.command.MessageId; 046import org.apache.activemq.command.ProducerAck; 047import org.apache.activemq.command.ProducerInfo; 048import org.apache.activemq.command.Response; 049import org.apache.activemq.command.SubscriptionInfo; 050import org.apache.activemq.filter.MessageEvaluationContext; 051import org.apache.activemq.filter.NonCachedMessageEvaluationContext; 052import org.apache.activemq.store.MessageRecoveryListener; 053import org.apache.activemq.store.TopicMessageStore; 054import org.apache.activemq.thread.Task; 055import org.apache.activemq.thread.TaskRunner; 056import org.apache.activemq.thread.TaskRunnerFactory; 057import org.apache.activemq.transaction.Synchronization; 058import org.apache.activemq.util.SubscriptionKey; 059import org.slf4j.Logger; 060import org.slf4j.LoggerFactory; 061 062/** 063 * The Topic is a destination that sends a copy of a message to every active 064 * Subscription registered. 065 */ 066public class Topic extends BaseDestination implements Task { 067 protected static final Logger LOG = LoggerFactory.getLogger(Topic.class); 068 private final TopicMessageStore topicStore; 069 protected final CopyOnWriteArrayList<Subscription> consumers = new CopyOnWriteArrayList<Subscription>(); 070 private final ReentrantReadWriteLock dispatchLock = new ReentrantReadWriteLock(); 071 private DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy(); 072 private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy; 073 private final ConcurrentMap<SubscriptionKey, DurableTopicSubscription> durableSubscribers = new ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>(); 074 private final TaskRunner taskRunner; 075 private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>(); 076 private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() { 077 @Override 078 public void run() { 079 try { 080 Topic.this.taskRunner.wakeup(); 081 } catch (InterruptedException e) { 082 } 083 }; 084 }; 085 086 public Topic(BrokerService brokerService, ActiveMQDestination destination, TopicMessageStore store, 087 DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception { 088 super(brokerService, store, destination, parentStats); 089 this.topicStore = store; 090 subscriptionRecoveryPolicy = new RetainedMessageSubscriptionRecoveryPolicy(null); 091 this.taskRunner = taskFactory.createTaskRunner(this, "Topic " + destination.getPhysicalName()); 092 } 093 094 @Override 095 public void initialize() throws Exception { 096 super.initialize(); 097 // set non default subscription recovery policy (override policyEntries) 098 if (AdvisorySupport.isMasterBrokerAdvisoryTopic(destination)) { 099 subscriptionRecoveryPolicy = new LastImageSubscriptionRecoveryPolicy(); 100 setAlwaysRetroactive(true); 101 } 102 if (store != null) { 103 // AMQ-2586: Better to leave this stat at zero than to give the user 104 // misleading metrics. 105 // int messageCount = store.getMessageCount(); 106 // destinationStatistics.getMessages().setCount(messageCount); 107 } 108 } 109 110 @Override 111 public List<Subscription> getConsumers() { 112 synchronized (consumers) { 113 return new ArrayList<Subscription>(consumers); 114 } 115 } 116 117 public boolean lock(MessageReference node, LockOwner sub) { 118 return true; 119 } 120 121 @Override 122 public void addSubscription(ConnectionContext context, final Subscription sub) throws Exception { 123 if (!sub.getConsumerInfo().isDurable()) { 124 125 // Do a retroactive recovery if needed. 126 if (sub.getConsumerInfo().isRetroactive() || isAlwaysRetroactive()) { 127 128 // synchronize with dispatch method so that no new messages are sent 129 // while we are recovering a subscription to avoid out of order messages. 130 dispatchLock.writeLock().lock(); 131 try { 132 boolean applyRecovery = false; 133 synchronized (consumers) { 134 if (!consumers.contains(sub)){ 135 sub.add(context, this); 136 consumers.add(sub); 137 applyRecovery=true; 138 super.addSubscription(context, sub); 139 } 140 } 141 if (applyRecovery){ 142 subscriptionRecoveryPolicy.recover(context, this, sub); 143 } 144 } finally { 145 dispatchLock.writeLock().unlock(); 146 } 147 148 } else { 149 synchronized (consumers) { 150 if (!consumers.contains(sub)){ 151 sub.add(context, this); 152 consumers.add(sub); 153 super.addSubscription(context, sub); 154 } 155 } 156 } 157 } else { 158 DurableTopicSubscription dsub = (DurableTopicSubscription) sub; 159 super.addSubscription(context, sub); 160 sub.add(context, this); 161 if(dsub.isActive()) { 162 synchronized (consumers) { 163 boolean hasSubscription = false; 164 165 if (consumers.size() == 0) { 166 hasSubscription = false; 167 } else { 168 for (Subscription currentSub : consumers) { 169 if (currentSub.getConsumerInfo().isDurable()) { 170 DurableTopicSubscription dcurrentSub = (DurableTopicSubscription) currentSub; 171 if (dcurrentSub.getSubscriptionKey().equals(dsub.getSubscriptionKey())) { 172 hasSubscription = true; 173 break; 174 } 175 } 176 } 177 } 178 179 if (!hasSubscription) { 180 consumers.add(sub); 181 } 182 } 183 } 184 durableSubscribers.put(dsub.getSubscriptionKey(), dsub); 185 } 186 } 187 188 @Override 189 public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception { 190 if (!sub.getConsumerInfo().isDurable()) { 191 super.removeSubscription(context, sub, lastDeliveredSequenceId); 192 synchronized (consumers) { 193 consumers.remove(sub); 194 } 195 } 196 sub.remove(context, this); 197 } 198 199 public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws Exception { 200 if (topicStore != null) { 201 topicStore.deleteSubscription(key.clientId, key.subscriptionName); 202 DurableTopicSubscription removed = durableSubscribers.remove(key); 203 if (removed != null) { 204 destinationStatistics.getConsumers().decrement(); 205 // deactivate and remove 206 removed.deactivate(false, 0l); 207 consumers.remove(removed); 208 } 209 } 210 } 211 212 public void activate(ConnectionContext context, final DurableTopicSubscription subscription) throws Exception { 213 // synchronize with dispatch method so that no new messages are sent 214 // while we are recovering a subscription to avoid out of order messages. 215 dispatchLock.writeLock().lock(); 216 try { 217 218 if (topicStore == null) { 219 return; 220 } 221 222 // Recover the durable subscription. 223 String clientId = subscription.getSubscriptionKey().getClientId(); 224 String subscriptionName = subscription.getSubscriptionKey().getSubscriptionName(); 225 String selector = subscription.getConsumerInfo().getSelector(); 226 SubscriptionInfo info = topicStore.lookupSubscription(clientId, subscriptionName); 227 if (info != null) { 228 // Check to see if selector changed. 229 String s1 = info.getSelector(); 230 if (s1 == null ^ selector == null || (s1 != null && !s1.equals(selector))) { 231 // Need to delete the subscription 232 topicStore.deleteSubscription(clientId, subscriptionName); 233 info = null; 234 synchronized (consumers) { 235 consumers.remove(subscription); 236 } 237 } else { 238 synchronized (consumers) { 239 if (!consumers.contains(subscription)) { 240 consumers.add(subscription); 241 } 242 } 243 } 244 } 245 246 // Do we need to create the subscription? 247 if (info == null) { 248 info = new SubscriptionInfo(); 249 info.setClientId(clientId); 250 info.setSelector(selector); 251 info.setSubscriptionName(subscriptionName); 252 info.setDestination(getActiveMQDestination()); 253 // This destination is an actual destination id. 254 info.setSubscribedDestination(subscription.getConsumerInfo().getDestination()); 255 // This destination might be a pattern 256 synchronized (consumers) { 257 consumers.add(subscription); 258 topicStore.addSubscription(info, subscription.getConsumerInfo().isRetroactive()); 259 } 260 } 261 262 final MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext(); 263 msgContext.setDestination(destination); 264 if (subscription.isRecoveryRequired()) { 265 topicStore.recoverSubscription(clientId, subscriptionName, new MessageRecoveryListener() { 266 @Override 267 public boolean recoverMessage(Message message) throws Exception { 268 message.setRegionDestination(Topic.this); 269 try { 270 msgContext.setMessageReference(message); 271 if (subscription.matches(message, msgContext)) { 272 subscription.add(message); 273 } 274 } catch (IOException e) { 275 LOG.error("Failed to recover this message {}", message, e); 276 } 277 return true; 278 } 279 280 @Override 281 public boolean recoverMessageReference(MessageId messageReference) throws Exception { 282 throw new RuntimeException("Should not be called."); 283 } 284 285 @Override 286 public boolean hasSpace() { 287 return true; 288 } 289 290 @Override 291 public boolean isDuplicate(MessageId id) { 292 return false; 293 } 294 }); 295 } 296 } finally { 297 dispatchLock.writeLock().unlock(); 298 } 299 } 300 301 public void deactivate(ConnectionContext context, DurableTopicSubscription sub, List<MessageReference> dispatched) throws Exception { 302 synchronized (consumers) { 303 consumers.remove(sub); 304 } 305 sub.remove(context, this, dispatched); 306 } 307 308 public void recoverRetroactiveMessages(ConnectionContext context, Subscription subscription) throws Exception { 309 if (subscription.getConsumerInfo().isRetroactive()) { 310 subscriptionRecoveryPolicy.recover(context, this, subscription); 311 } 312 } 313 314 @Override 315 public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception { 316 final ConnectionContext context = producerExchange.getConnectionContext(); 317 318 final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo(); 319 producerExchange.incrementSend(); 320 final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0 321 && !context.isInRecoveryMode(); 322 323 // There is delay between the client sending it and it arriving at the 324 // destination.. it may have expired. 325 if (message.isExpired()) { 326 broker.messageExpired(context, message, null); 327 getDestinationStatistics().getExpired().increment(); 328 if (sendProducerAck) { 329 ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize()); 330 context.getConnection().dispatchAsync(ack); 331 } 332 return; 333 } 334 335 if (memoryUsage.isFull()) { 336 isFull(context, memoryUsage); 337 fastProducer(context, producerInfo); 338 339 if (isProducerFlowControl() && context.isProducerFlowControl()) { 340 341 if (warnOnProducerFlowControl) { 342 warnOnProducerFlowControl = false; 343 LOG.info("{}, Usage Manager memory limit reached {}. Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info.", 344 getActiveMQDestination().getQualifiedName(), memoryUsage.getLimit()); 345 } 346 347 if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) { 348 throw new javax.jms.ResourceAllocationException("Usage Manager memory limit (" 349 + memoryUsage.getLimit() + ") reached. Rejecting send for producer (" + message.getProducerId() 350 + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "." 351 + " See http://activemq.apache.org/producer-flow-control.html for more info"); 352 } 353 354 // We can avoid blocking due to low usage if the producer is sending a sync message or 355 // if it is using a producer window 356 if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) { 357 synchronized (messagesWaitingForSpace) { 358 messagesWaitingForSpace.add(new Runnable() { 359 @Override 360 public void run() { 361 try { 362 363 // While waiting for space to free up... the 364 // message may have expired. 365 if (message.isExpired()) { 366 broker.messageExpired(context, message, null); 367 getDestinationStatistics().getExpired().increment(); 368 } else { 369 doMessageSend(producerExchange, message); 370 } 371 372 if (sendProducerAck) { 373 ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message 374 .getSize()); 375 context.getConnection().dispatchAsync(ack); 376 } else { 377 Response response = new Response(); 378 response.setCorrelationId(message.getCommandId()); 379 context.getConnection().dispatchAsync(response); 380 } 381 382 } catch (Exception e) { 383 if (!sendProducerAck && !context.isInRecoveryMode()) { 384 ExceptionResponse response = new ExceptionResponse(e); 385 response.setCorrelationId(message.getCommandId()); 386 context.getConnection().dispatchAsync(response); 387 } 388 } 389 } 390 }); 391 392 registerCallbackForNotFullNotification(); 393 context.setDontSendReponse(true); 394 return; 395 } 396 397 } else { 398 // Producer flow control cannot be used, so we have do the flow control 399 // at the broker by blocking this thread until there is space available. 400 401 if (memoryUsage.isFull()) { 402 if (context.isInTransaction()) { 403 404 int count = 0; 405 while (!memoryUsage.waitForSpace(1000)) { 406 if (context.getStopping().get()) { 407 throw new IOException("Connection closed, send aborted."); 408 } 409 if (count > 2 && context.isInTransaction()) { 410 count = 0; 411 int size = context.getTransaction().size(); 412 LOG.warn("Waiting for space to send transacted message - transaction elements = {} need more space to commit. Message = {}", size, message); 413 } 414 count++; 415 } 416 } else { 417 waitForSpace( 418 context, 419 producerExchange, 420 memoryUsage, 421 "Usage Manager Memory Usage limit reached. Stopping producer (" 422 + message.getProducerId() 423 + ") to prevent flooding " 424 + getActiveMQDestination().getQualifiedName() 425 + "." 426 + " See http://activemq.apache.org/producer-flow-control.html for more info"); 427 } 428 } 429 430 // The usage manager could have delayed us by the time 431 // we unblock the message could have expired.. 432 if (message.isExpired()) { 433 getDestinationStatistics().getExpired().increment(); 434 LOG.debug("Expired message: {}", message); 435 return; 436 } 437 } 438 } 439 } 440 441 doMessageSend(producerExchange, message); 442 messageDelivered(context, message); 443 if (sendProducerAck) { 444 ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize()); 445 context.getConnection().dispatchAsync(ack); 446 } 447 } 448 449 /** 450 * do send the message - this needs to be synchronized to ensure messages 451 * are stored AND dispatched in the right order 452 * 453 * @param producerExchange 454 * @param message 455 * @throws IOException 456 * @throws Exception 457 */ 458 synchronized void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) 459 throws IOException, Exception { 460 final ConnectionContext context = producerExchange.getConnectionContext(); 461 message.setRegionDestination(this); 462 message.getMessageId().setBrokerSequenceId(getDestinationSequenceId()); 463 Future<Object> result = null; 464 465 if (topicStore != null && message.isPersistent() && !canOptimizeOutPersistence()) { 466 if (systemUsage.getStoreUsage().isFull(getStoreUsageHighWaterMark())) { 467 final String logMessage = "Persistent store is Full, " + getStoreUsageHighWaterMark() + "% of " 468 + systemUsage.getStoreUsage().getLimit() + ". Stopping producer (" + message.getProducerId() 469 + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "." 470 + " See http://activemq.apache.org/producer-flow-control.html for more info"; 471 if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) { 472 throw new javax.jms.ResourceAllocationException(logMessage); 473 } 474 475 waitForSpace(context,producerExchange, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage); 476 } 477 result = topicStore.asyncAddTopicMessage(context, message,isOptimizeStorage()); 478 } 479 480 message.incrementReferenceCount(); 481 482 if (context.isInTransaction()) { 483 context.getTransaction().addSynchronization(new Synchronization() { 484 @Override 485 public void afterCommit() throws Exception { 486 // It could take while before we receive the commit 487 // operation.. by that time the message could have 488 // expired.. 489 if (broker.isExpired(message)) { 490 getDestinationStatistics().getExpired().increment(); 491 broker.messageExpired(context, message, null); 492 message.decrementReferenceCount(); 493 return; 494 } 495 try { 496 dispatch(context, message); 497 } finally { 498 message.decrementReferenceCount(); 499 } 500 } 501 502 @Override 503 public void afterRollback() throws Exception { 504 message.decrementReferenceCount(); 505 } 506 }); 507 508 } else { 509 try { 510 dispatch(context, message); 511 } finally { 512 message.decrementReferenceCount(); 513 } 514 } 515 516 if (result != null && !result.isCancelled()) { 517 try { 518 result.get(); 519 } catch (CancellationException e) { 520 // ignore - the task has been cancelled if the message 521 // has already been deleted 522 } 523 } 524 } 525 526 private boolean canOptimizeOutPersistence() { 527 return durableSubscribers.size() == 0; 528 } 529 530 @Override 531 public String toString() { 532 return "Topic: destination=" + destination.getPhysicalName() + ", subscriptions=" + consumers.size(); 533 } 534 535 @Override 536 public void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack, 537 final MessageReference node) throws IOException { 538 if (topicStore != null && node.isPersistent()) { 539 DurableTopicSubscription dsub = (DurableTopicSubscription) sub; 540 SubscriptionKey key = dsub.getSubscriptionKey(); 541 topicStore.acknowledge(context, key.getClientId(), key.getSubscriptionName(), node.getMessageId(), 542 convertToNonRangedAck(ack, node)); 543 } 544 messageConsumed(context, node); 545 } 546 547 @Override 548 public void gc() { 549 } 550 551 public Message loadMessage(MessageId messageId) throws IOException { 552 return topicStore != null ? topicStore.getMessage(messageId) : null; 553 } 554 555 @Override 556 public void start() throws Exception { 557 this.subscriptionRecoveryPolicy.start(); 558 if (memoryUsage != null) { 559 memoryUsage.start(); 560 } 561 562 if (getExpireMessagesPeriod() > 0 && !AdvisorySupport.isAdvisoryTopic(getActiveMQDestination())) { 563 scheduler.executePeriodically(expireMessagesTask, getExpireMessagesPeriod()); 564 } 565 } 566 567 @Override 568 public void stop() throws Exception { 569 if (taskRunner != null) { 570 taskRunner.shutdown(); 571 } 572 this.subscriptionRecoveryPolicy.stop(); 573 if (memoryUsage != null) { 574 memoryUsage.stop(); 575 } 576 if (this.topicStore != null) { 577 this.topicStore.stop(); 578 } 579 580 scheduler.cancel(expireMessagesTask); 581 } 582 583 @Override 584 public Message[] browse() { 585 final List<Message> result = new ArrayList<Message>(); 586 doBrowse(result, getMaxBrowsePageSize()); 587 return result.toArray(new Message[result.size()]); 588 } 589 590 private void doBrowse(final List<Message> browseList, final int max) { 591 try { 592 if (topicStore != null) { 593 final List<Message> toExpire = new ArrayList<Message>(); 594 topicStore.recover(new MessageRecoveryListener() { 595 @Override 596 public boolean recoverMessage(Message message) throws Exception { 597 if (message.isExpired()) { 598 toExpire.add(message); 599 } 600 browseList.add(message); 601 return true; 602 } 603 604 @Override 605 public boolean recoverMessageReference(MessageId messageReference) throws Exception { 606 return true; 607 } 608 609 @Override 610 public boolean hasSpace() { 611 return browseList.size() < max; 612 } 613 614 @Override 615 public boolean isDuplicate(MessageId id) { 616 return false; 617 } 618 }); 619 final ConnectionContext connectionContext = createConnectionContext(); 620 for (Message message : toExpire) { 621 for (DurableTopicSubscription sub : durableSubscribers.values()) { 622 if (!sub.isActive()) { 623 messageExpired(connectionContext, sub, message); 624 } 625 } 626 } 627 Message[] msgs = subscriptionRecoveryPolicy.browse(getActiveMQDestination()); 628 if (msgs != null) { 629 for (int i = 0; i < msgs.length && browseList.size() < max; i++) { 630 browseList.add(msgs[i]); 631 } 632 } 633 } 634 } catch (Throwable e) { 635 LOG.warn("Failed to browse Topic: {}", getActiveMQDestination().getPhysicalName(), e); 636 } 637 } 638 639 @Override 640 public boolean iterate() { 641 synchronized (messagesWaitingForSpace) { 642 while (!memoryUsage.isFull() && !messagesWaitingForSpace.isEmpty()) { 643 Runnable op = messagesWaitingForSpace.removeFirst(); 644 op.run(); 645 } 646 647 if (!messagesWaitingForSpace.isEmpty()) { 648 registerCallbackForNotFullNotification(); 649 } 650 } 651 return false; 652 } 653 654 private void registerCallbackForNotFullNotification() { 655 // If the usage manager is not full, then the task will not 656 // get called.. 657 if (!memoryUsage.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)) { 658 // so call it directly here. 659 sendMessagesWaitingForSpaceTask.run(); 660 } 661 } 662 663 // Properties 664 // ------------------------------------------------------------------------- 665 666 public DispatchPolicy getDispatchPolicy() { 667 return dispatchPolicy; 668 } 669 670 public void setDispatchPolicy(DispatchPolicy dispatchPolicy) { 671 this.dispatchPolicy = dispatchPolicy; 672 } 673 674 public SubscriptionRecoveryPolicy getSubscriptionRecoveryPolicy() { 675 return subscriptionRecoveryPolicy; 676 } 677 678 public void setSubscriptionRecoveryPolicy(SubscriptionRecoveryPolicy recoveryPolicy) { 679 if (this.subscriptionRecoveryPolicy != null && this.subscriptionRecoveryPolicy instanceof RetainedMessageSubscriptionRecoveryPolicy) { 680 // allow users to combine retained message policy with other ActiveMQ policies 681 RetainedMessageSubscriptionRecoveryPolicy policy = (RetainedMessageSubscriptionRecoveryPolicy) this.subscriptionRecoveryPolicy; 682 policy.setWrapped(recoveryPolicy); 683 } else { 684 this.subscriptionRecoveryPolicy = recoveryPolicy; 685 } 686 } 687 688 // Implementation methods 689 // ------------------------------------------------------------------------- 690 691 @Override 692 public final void wakeup() { 693 } 694 695 protected void dispatch(final ConnectionContext context, Message message) throws Exception { 696 // AMQ-2586: Better to leave this stat at zero than to give the user 697 // misleading metrics. 698 // destinationStatistics.getMessages().increment(); 699 destinationStatistics.getEnqueues().increment(); 700 destinationStatistics.getMessageSize().addSize(message.getSize()); 701 MessageEvaluationContext msgContext = null; 702 703 dispatchLock.readLock().lock(); 704 try { 705 if (!subscriptionRecoveryPolicy.add(context, message)) { 706 return; 707 } 708 synchronized (consumers) { 709 if (consumers.isEmpty()) { 710 onMessageWithNoConsumers(context, message); 711 return; 712 } 713 } 714 msgContext = context.getMessageEvaluationContext(); 715 msgContext.setDestination(destination); 716 msgContext.setMessageReference(message); 717 if (!dispatchPolicy.dispatch(message, msgContext, consumers)) { 718 onMessageWithNoConsumers(context, message); 719 } 720 721 } finally { 722 dispatchLock.readLock().unlock(); 723 if (msgContext != null) { 724 msgContext.clear(); 725 } 726 } 727 } 728 729 private final Runnable expireMessagesTask = new Runnable() { 730 @Override 731 public void run() { 732 List<Message> browsedMessages = new InsertionCountList<Message>(); 733 doBrowse(browsedMessages, getMaxExpirePageSize()); 734 } 735 }; 736 737 @Override 738 public void messageExpired(ConnectionContext context, Subscription subs, MessageReference reference) { 739 broker.messageExpired(context, reference, subs); 740 // AMQ-2586: Better to leave this stat at zero than to give the user 741 // misleading metrics. 742 // destinationStatistics.getMessages().decrement(); 743 destinationStatistics.getExpired().increment(); 744 MessageAck ack = new MessageAck(); 745 ack.setAckType(MessageAck.STANDARD_ACK_TYPE); 746 ack.setDestination(destination); 747 ack.setMessageID(reference.getMessageId()); 748 try { 749 if (subs instanceof DurableTopicSubscription) { 750 ((DurableTopicSubscription)subs).removePending(reference); 751 } 752 acknowledge(context, subs, ack, reference); 753 } catch (Exception e) { 754 LOG.error("Failed to remove expired Message from the store ", e); 755 } 756 } 757 758 @Override 759 protected Logger getLog() { 760 return LOG; 761 } 762 763 protected boolean isOptimizeStorage(){ 764 boolean result = false; 765 766 if (isDoOptimzeMessageStorage() && durableSubscribers.isEmpty()==false){ 767 result = true; 768 for (DurableTopicSubscription s : durableSubscribers.values()) { 769 if (s.isActive()== false){ 770 result = false; 771 break; 772 } 773 if (s.getPrefetchSize()==0){ 774 result = false; 775 break; 776 } 777 if (s.isSlowConsumer()){ 778 result = false; 779 break; 780 } 781 if (s.getInFlightUsage() > getOptimizeMessageStoreInFlightLimit()){ 782 result = false; 783 break; 784 } 785 } 786 } 787 return result; 788 } 789 790 /** 791 * force a reread of the store - after transaction recovery completion 792 */ 793 @Override 794 public void clearPendingMessages() { 795 dispatchLock.readLock().lock(); 796 try { 797 for (DurableTopicSubscription durableTopicSubscription : durableSubscribers.values()) { 798 clearPendingAndDispatch(durableTopicSubscription); 799 } 800 } finally { 801 dispatchLock.readLock().unlock(); 802 } 803 } 804 805 private void clearPendingAndDispatch(DurableTopicSubscription durableTopicSubscription) { 806 synchronized (durableTopicSubscription.pendingLock) { 807 durableTopicSubscription.pending.clear(); 808 try { 809 durableTopicSubscription.dispatchPending(); 810 } catch (IOException exception) { 811 LOG.warn("After clear of pending, failed to dispatch to: {}, for: {}, pending: {}", new Object[]{ 812 durableTopicSubscription, 813 destination, 814 durableTopicSubscription.pending }, exception); 815 } 816 } 817 } 818 819 private void rollback(MessageId poisoned) { 820 dispatchLock.readLock().lock(); 821 try { 822 for (DurableTopicSubscription durableTopicSubscription : durableSubscribers.values()) { 823 durableTopicSubscription.getPending().rollback(poisoned); 824 } 825 } finally { 826 dispatchLock.readLock().unlock(); 827 } 828 } 829 830 public Map<SubscriptionKey, DurableTopicSubscription> getDurableTopicSubs() { 831 return durableSubscribers; 832 } 833}