001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region; 018 019import java.io.IOException; 020import java.util.LinkedList; 021import java.util.concurrent.atomic.AtomicInteger; 022import java.util.concurrent.atomic.AtomicLong; 023 024import javax.jms.JMSException; 025 026import org.apache.activemq.ActiveMQMessageAudit; 027import org.apache.activemq.broker.Broker; 028import org.apache.activemq.broker.ConnectionContext; 029import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor; 030import org.apache.activemq.broker.region.cursors.PendingMessageCursor; 031import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor; 032import org.apache.activemq.broker.region.policy.MessageEvictionStrategy; 033import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy; 034import org.apache.activemq.command.ConsumerControl; 035import org.apache.activemq.command.ConsumerInfo; 036import org.apache.activemq.command.Message; 037import org.apache.activemq.command.MessageAck; 038import org.apache.activemq.command.MessageDispatch; 039import org.apache.activemq.command.MessageDispatchNotification; 040import org.apache.activemq.command.MessagePull; 041import org.apache.activemq.command.Response; 042import org.apache.activemq.thread.Scheduler; 043import org.apache.activemq.transaction.Synchronization; 044import org.apache.activemq.transport.TransmitCallback; 045import org.apache.activemq.usage.SystemUsage; 046import org.slf4j.Logger; 047import org.slf4j.LoggerFactory; 048 049public class TopicSubscription extends AbstractSubscription { 050 051 private static final Logger LOG = LoggerFactory.getLogger(TopicSubscription.class); 052 private static final AtomicLong CURSOR_NAME_COUNTER = new AtomicLong(0); 053 054 protected PendingMessageCursor matched; 055 protected final SystemUsage usageManager; 056 protected AtomicLong dispatchedCounter = new AtomicLong(); 057 058 boolean singleDestination = true; 059 Destination destination; 060 private final Scheduler scheduler; 061 062 private int maximumPendingMessages = -1; 063 private MessageEvictionStrategy messageEvictionStrategy = new OldestMessageEvictionStrategy(); 064 private int discarded; 065 private final Object matchedListMutex = new Object(); 066 private final AtomicLong enqueueCounter = new AtomicLong(0); 067 private final AtomicLong dequeueCounter = new AtomicLong(0); 068 private final AtomicInteger prefetchExtension = new AtomicInteger(0); 069 private int memoryUsageHighWaterMark = 95; 070 // allow duplicate suppression in a ring network of brokers 071 protected int maxProducersToAudit = 1024; 072 protected int maxAuditDepth = 1000; 073 protected boolean enableAudit = false; 074 protected ActiveMQMessageAudit audit; 075 protected boolean active = false; 076 protected boolean discarding = false; 077 078 public TopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, SystemUsage usageManager) throws Exception { 079 super(broker, context, info); 080 this.usageManager = usageManager; 081 String matchedName = "TopicSubscription:" + CURSOR_NAME_COUNTER.getAndIncrement() + "[" + info.getConsumerId().toString() + "]"; 082 if (info.getDestination().isTemporary() || broker.getTempDataStore()==null ) { 083 this.matched = new VMPendingMessageCursor(false); 084 } else { 085 this.matched = new FilePendingMessageCursor(broker,matchedName,false); 086 } 087 088 this.scheduler = broker.getScheduler(); 089 } 090 091 public void init() throws Exception { 092 this.matched.setSystemUsage(usageManager); 093 this.matched.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark()); 094 this.matched.start(); 095 if (enableAudit) { 096 audit= new ActiveMQMessageAudit(maxAuditDepth, maxProducersToAudit); 097 } 098 this.active=true; 099 } 100 101 @Override 102 public void add(MessageReference node) throws Exception { 103 if (isDuplicate(node)) { 104 return; 105 } 106 // Lets use an indirect reference so that we can associate a unique 107 // locator /w the message. 108 node = new IndirectMessageReference(node.getMessage()); 109 enqueueCounter.incrementAndGet(); 110 synchronized (matchedListMutex) { 111 // if this subscriber is already discarding a message, we don't want to add 112 // any more messages to it as those messages can only be advisories generated in the process, 113 // which can trigger the recursive call loop 114 if (discarding) return; 115 116 if (!isFull() && matched.isEmpty()) { 117 // if maximumPendingMessages is set we will only discard messages which 118 // have not been dispatched (i.e. we allow the prefetch buffer to be filled) 119 dispatch(node); 120 setSlowConsumer(false); 121 } else { 122 if (info.getPrefetchSize() > 1 && matched.size() > info.getPrefetchSize()) { 123 // Slow consumers should log and set their state as such. 124 if (!isSlowConsumer()) { 125 LOG.warn("{}: has twice its prefetch limit pending, without an ack; it appears to be slow", toString()); 126 setSlowConsumer(true); 127 for (Destination dest: destinations) { 128 dest.slowConsumer(getContext(), this); 129 } 130 } 131 } 132 if (maximumPendingMessages != 0) { 133 boolean warnedAboutWait = false; 134 while (active) { 135 while (matched.isFull()) { 136 if (getContext().getStopping().get()) { 137 LOG.warn("{}: stopped waiting for space in pendingMessage cursor for: {}", toString(), node.getMessageId()); 138 enqueueCounter.decrementAndGet(); 139 return; 140 } 141 if (!warnedAboutWait) { 142 LOG.info("{}: Pending message cursor [{}] is full, temp usag ({}%) or memory usage ({}%) limit reached, blocking message add() pending the release of resources.", 143 new Object[]{ 144 toString(), 145 matched, 146 matched.getSystemUsage().getTempUsage().getPercentUsage(), 147 matched.getSystemUsage().getMemoryUsage().getPercentUsage() 148 }); 149 warnedAboutWait = true; 150 } 151 matchedListMutex.wait(20); 152 } 153 // Temporary storage could be full - so just try to add the message 154 // see https://issues.apache.org/activemq/browse/AMQ-2475 155 if (matched.tryAddMessageLast(node, 10)) { 156 break; 157 } 158 } 159 if (maximumPendingMessages > 0) { 160 // calculate the high water mark from which point we 161 // will eagerly evict expired messages 162 int max = messageEvictionStrategy.getEvictExpiredMessagesHighWatermark(); 163 if (maximumPendingMessages > 0 && maximumPendingMessages < max) { 164 max = maximumPendingMessages; 165 } 166 if (!matched.isEmpty() && matched.size() > max) { 167 removeExpiredMessages(); 168 } 169 // lets discard old messages as we are a slow consumer 170 while (!matched.isEmpty() && matched.size() > maximumPendingMessages) { 171 int pageInSize = matched.size() - maximumPendingMessages; 172 // only page in a 1000 at a time - else we could blow the memory 173 pageInSize = Math.max(1000, pageInSize); 174 LinkedList<MessageReference> list = null; 175 MessageReference[] oldMessages=null; 176 synchronized(matched){ 177 list = matched.pageInList(pageInSize); 178 oldMessages = messageEvictionStrategy.evictMessages(list); 179 for (MessageReference ref : list) { 180 ref.decrementReferenceCount(); 181 } 182 } 183 int messagesToEvict = 0; 184 if (oldMessages != null){ 185 messagesToEvict = oldMessages.length; 186 for (int i = 0; i < messagesToEvict; i++) { 187 MessageReference oldMessage = oldMessages[i]; 188 discard(oldMessage); 189 } 190 } 191 // lets avoid an infinite loop if we are given a bad eviction strategy 192 // for a bad strategy lets just not evict 193 if (messagesToEvict == 0) { 194 LOG.warn("No messages to evict returned for {} from eviction strategy: {} out of {} candidates", new Object[]{ 195 destination, messageEvictionStrategy, list.size() 196 }); 197 break; 198 } 199 } 200 } 201 dispatchMatched(); 202 } 203 } 204 } 205 } 206 207 private boolean isDuplicate(MessageReference node) { 208 boolean duplicate = false; 209 if (enableAudit && audit != null) { 210 duplicate = audit.isDuplicate(node); 211 if (LOG.isDebugEnabled()) { 212 if (duplicate) { 213 LOG.debug("{}, ignoring duplicate add: {}", this, node.getMessageId()); 214 } 215 } 216 } 217 return duplicate; 218 } 219 220 /** 221 * Discard any expired messages from the matched list. Called from a 222 * synchronized block. 223 * 224 * @throws IOException 225 */ 226 protected void removeExpiredMessages() throws IOException { 227 try { 228 matched.reset(); 229 while (matched.hasNext()) { 230 MessageReference node = matched.next(); 231 node.decrementReferenceCount(); 232 if (broker.isExpired(node)) { 233 matched.remove(); 234 dispatchedCounter.incrementAndGet(); 235 node.decrementReferenceCount(); 236 ((Destination)node.getRegionDestination()).getDestinationStatistics().getExpired().increment(); 237 broker.messageExpired(getContext(), node, this); 238 break; 239 } 240 } 241 } finally { 242 matched.release(); 243 } 244 } 245 246 @Override 247 public void processMessageDispatchNotification(MessageDispatchNotification mdn) { 248 synchronized (matchedListMutex) { 249 try { 250 matched.reset(); 251 while (matched.hasNext()) { 252 MessageReference node = matched.next(); 253 node.decrementReferenceCount(); 254 if (node.getMessageId().equals(mdn.getMessageId())) { 255 matched.remove(); 256 dispatchedCounter.incrementAndGet(); 257 node.decrementReferenceCount(); 258 break; 259 } 260 } 261 } finally { 262 matched.release(); 263 } 264 } 265 } 266 267 @Override 268 public synchronized void acknowledge(final ConnectionContext context, final MessageAck ack) throws Exception { 269 super.acknowledge(context, ack); 270 271 // Handle the standard acknowledgment case. 272 if (ack.isStandardAck() || ack.isPoisonAck() || ack.isIndividualAck()) { 273 if (context.isInTransaction()) { 274 context.getTransaction().addSynchronization(new Synchronization() { 275 276 @Override 277 public void afterCommit() throws Exception { 278 synchronized (TopicSubscription.this) { 279 if (singleDestination && destination != null) { 280 destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount()); 281 } 282 } 283 dequeueCounter.addAndGet(ack.getMessageCount()); 284 dispatchMatched(); 285 } 286 }); 287 } else { 288 if (singleDestination && destination != null) { 289 destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount()); 290 destination.getDestinationStatistics().getInflight().subtract(ack.getMessageCount()); 291 if (info.isNetworkSubscription()) { 292 destination.getDestinationStatistics().getForwards().add(ack.getMessageCount()); 293 } 294 } 295 dequeueCounter.addAndGet(ack.getMessageCount()); 296 } 297 while (true) { 298 int currentExtension = prefetchExtension.get(); 299 int newExtension = Math.max(0, currentExtension - ack.getMessageCount()); 300 if (prefetchExtension.compareAndSet(currentExtension, newExtension)) { 301 break; 302 } 303 } 304 dispatchMatched(); 305 return; 306 } else if (ack.isDeliveredAck()) { 307 // Message was delivered but not acknowledged: update pre-fetch counters. 308 prefetchExtension.addAndGet(ack.getMessageCount()); 309 dispatchMatched(); 310 return; 311 } else if (ack.isExpiredAck()) { 312 if (singleDestination && destination != null) { 313 destination.getDestinationStatistics().getInflight().subtract(ack.getMessageCount()); 314 destination.getDestinationStatistics().getExpired().add(ack.getMessageCount()); 315 destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount()); 316 } 317 dequeueCounter.addAndGet(ack.getMessageCount()); 318 while (true) { 319 int currentExtension = prefetchExtension.get(); 320 int newExtension = Math.max(0, currentExtension - ack.getMessageCount()); 321 if (prefetchExtension.compareAndSet(currentExtension, newExtension)) { 322 break; 323 } 324 } 325 dispatchMatched(); 326 return; 327 } else if (ack.isRedeliveredAck()) { 328 // nothing to do atm 329 return; 330 } 331 throw new JMSException("Invalid acknowledgment: " + ack); 332 } 333 334 @Override 335 public Response pullMessage(ConnectionContext context, final MessagePull pull) throws Exception { 336 337 // The slave should not deliver pull messages. 338 if (getPrefetchSize() == 0) { 339 340 final long currentDispatchedCount = dispatchedCounter.get(); 341 prefetchExtension.set(pull.getQuantity()); 342 dispatchMatched(); 343 344 // If there was nothing dispatched.. we may need to setup a timeout. 345 if (currentDispatchedCount == dispatchedCounter.get() || pull.isAlwaysSignalDone()) { 346 347 // immediate timeout used by receiveNoWait() 348 if (pull.getTimeout() == -1) { 349 // Send a NULL message to signal nothing pending. 350 dispatch(null); 351 prefetchExtension.set(0); 352 } 353 354 if (pull.getTimeout() > 0) { 355 scheduler.executeAfterDelay(new Runnable() { 356 357 @Override 358 public void run() { 359 pullTimeout(currentDispatchedCount, pull.isAlwaysSignalDone()); 360 } 361 }, pull.getTimeout()); 362 } 363 } 364 } 365 return null; 366 } 367 368 /** 369 * Occurs when a pull times out. If nothing has been dispatched since the 370 * timeout was setup, then send the NULL message. 371 */ 372 private final void pullTimeout(long currentDispatchedCount, boolean alwaysSendDone) { 373 synchronized (matchedListMutex) { 374 if (currentDispatchedCount == dispatchedCounter.get() || alwaysSendDone) { 375 try { 376 dispatch(null); 377 } catch (Exception e) { 378 context.getConnection().serviceException(e); 379 } finally { 380 prefetchExtension.set(0); 381 } 382 } 383 } 384 } 385 386 @Override 387 public int getPendingQueueSize() { 388 return matched(); 389 } 390 391 @Override 392 public int getDispatchedQueueSize() { 393 return (int)(dispatchedCounter.get() - prefetchExtension.get() - dequeueCounter.get()); 394 } 395 396 public int getMaximumPendingMessages() { 397 return maximumPendingMessages; 398 } 399 400 @Override 401 public long getDispatchedCounter() { 402 return dispatchedCounter.get(); 403 } 404 405 @Override 406 public long getEnqueueCounter() { 407 return enqueueCounter.get(); 408 } 409 410 @Override 411 public long getDequeueCounter() { 412 return dequeueCounter.get(); 413 } 414 415 /** 416 * @return the number of messages discarded due to being a slow consumer 417 */ 418 public int discarded() { 419 synchronized (matchedListMutex) { 420 return discarded; 421 } 422 } 423 424 /** 425 * @return the number of matched messages (messages targeted for the 426 * subscription but not yet able to be dispatched due to the 427 * prefetch buffer being full). 428 */ 429 public int matched() { 430 synchronized (matchedListMutex) { 431 return matched.size(); 432 } 433 } 434 435 /** 436 * Sets the maximum number of pending messages that can be matched against 437 * this consumer before old messages are discarded. 438 */ 439 public void setMaximumPendingMessages(int maximumPendingMessages) { 440 this.maximumPendingMessages = maximumPendingMessages; 441 } 442 443 public MessageEvictionStrategy getMessageEvictionStrategy() { 444 return messageEvictionStrategy; 445 } 446 447 /** 448 * Sets the eviction strategy used to decide which message to evict when the 449 * slow consumer needs to discard messages 450 */ 451 public void setMessageEvictionStrategy(MessageEvictionStrategy messageEvictionStrategy) { 452 this.messageEvictionStrategy = messageEvictionStrategy; 453 } 454 455 public int getMaxProducersToAudit() { 456 return maxProducersToAudit; 457 } 458 459 public synchronized void setMaxProducersToAudit(int maxProducersToAudit) { 460 this.maxProducersToAudit = maxProducersToAudit; 461 if (audit != null) { 462 audit.setMaximumNumberOfProducersToTrack(maxProducersToAudit); 463 } 464 } 465 466 public int getMaxAuditDepth() { 467 return maxAuditDepth; 468 } 469 470 public synchronized void setMaxAuditDepth(int maxAuditDepth) { 471 this.maxAuditDepth = maxAuditDepth; 472 if (audit != null) { 473 audit.setAuditDepth(maxAuditDepth); 474 } 475 } 476 477 public boolean isEnableAudit() { 478 return enableAudit; 479 } 480 481 public synchronized void setEnableAudit(boolean enableAudit) { 482 this.enableAudit = enableAudit; 483 if (enableAudit && audit == null) { 484 audit = new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit); 485 } 486 } 487 488 // Implementation methods 489 // ------------------------------------------------------------------------- 490 @Override 491 public boolean isFull() { 492 return getDispatchedQueueSize() >= info.getPrefetchSize(); 493 } 494 495 @Override 496 public int getInFlightSize() { 497 return getDispatchedQueueSize(); 498 } 499 500 /** 501 * @return true when 60% or more room is left for dispatching messages 502 */ 503 @Override 504 public boolean isLowWaterMark() { 505 return getDispatchedQueueSize() <= (info.getPrefetchSize() * .4); 506 } 507 508 /** 509 * @return true when 10% or less room is left for dispatching messages 510 */ 511 @Override 512 public boolean isHighWaterMark() { 513 return getDispatchedQueueSize() >= (info.getPrefetchSize() * .9); 514 } 515 516 /** 517 * @param memoryUsageHighWaterMark the memoryUsageHighWaterMark to set 518 */ 519 public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) { 520 this.memoryUsageHighWaterMark = memoryUsageHighWaterMark; 521 } 522 523 /** 524 * @return the memoryUsageHighWaterMark 525 */ 526 public int getMemoryUsageHighWaterMark() { 527 return this.memoryUsageHighWaterMark; 528 } 529 530 /** 531 * @return the usageManager 532 */ 533 public SystemUsage getUsageManager() { 534 return this.usageManager; 535 } 536 537 /** 538 * @return the matched 539 */ 540 public PendingMessageCursor getMatched() { 541 return this.matched; 542 } 543 544 /** 545 * @param matched the matched to set 546 */ 547 public void setMatched(PendingMessageCursor matched) { 548 this.matched = matched; 549 } 550 551 /** 552 * inform the MessageConsumer on the client to change it's prefetch 553 * 554 * @param newPrefetch 555 */ 556 @Override 557 public void updateConsumerPrefetch(int newPrefetch) { 558 if (context != null && context.getConnection() != null && context.getConnection().isManageable()) { 559 ConsumerControl cc = new ConsumerControl(); 560 cc.setConsumerId(info.getConsumerId()); 561 cc.setPrefetch(newPrefetch); 562 context.getConnection().dispatchAsync(cc); 563 } 564 } 565 566 private void dispatchMatched() throws IOException { 567 synchronized (matchedListMutex) { 568 if (!matched.isEmpty() && !isFull()) { 569 try { 570 matched.reset(); 571 572 while (matched.hasNext() && !isFull()) { 573 MessageReference message = matched.next(); 574 message.decrementReferenceCount(); 575 matched.remove(); 576 // Message may have been sitting in the matched list a while 577 // waiting for the consumer to ak the message. 578 if (message.isExpired()) { 579 discard(message); 580 continue; // just drop it. 581 } 582 dispatch(message); 583 } 584 } finally { 585 matched.release(); 586 } 587 } 588 } 589 } 590 591 private void dispatch(final MessageReference node) throws IOException { 592 Message message = node != null ? node.getMessage() : null; 593 if (node != null) { 594 node.incrementReferenceCount(); 595 } 596 // Make sure we can dispatch a message. 597 MessageDispatch md = new MessageDispatch(); 598 md.setMessage(message); 599 md.setConsumerId(info.getConsumerId()); 600 if (node != null) { 601 md.setDestination(((Destination)node.getRegionDestination()).getActiveMQDestination()); 602 dispatchedCounter.incrementAndGet(); 603 // Keep track if this subscription is receiving messages from a single destination. 604 if (singleDestination) { 605 if (destination == null) { 606 destination = (Destination)node.getRegionDestination(); 607 } else { 608 if (destination != node.getRegionDestination()) { 609 singleDestination = false; 610 } 611 } 612 } 613 } 614 if (info.isDispatchAsync()) { 615 if (node != null) { 616 md.setTransmitCallback(new TransmitCallback() { 617 618 @Override 619 public void onSuccess() { 620 Destination regionDestination = (Destination) node.getRegionDestination(); 621 regionDestination.getDestinationStatistics().getDispatched().increment(); 622 regionDestination.getDestinationStatistics().getInflight().increment(); 623 node.decrementReferenceCount(); 624 } 625 626 @Override 627 public void onFailure() { 628 Destination regionDestination = (Destination) node.getRegionDestination(); 629 regionDestination.getDestinationStatistics().getDispatched().increment(); 630 regionDestination.getDestinationStatistics().getInflight().increment(); 631 node.decrementReferenceCount(); 632 } 633 }); 634 } 635 context.getConnection().dispatchAsync(md); 636 } else { 637 context.getConnection().dispatchSync(md); 638 if (node != null) { 639 Destination regionDestination = (Destination) node.getRegionDestination(); 640 regionDestination.getDestinationStatistics().getDispatched().increment(); 641 regionDestination.getDestinationStatistics().getInflight().increment(); 642 node.decrementReferenceCount(); 643 } 644 } 645 } 646 647 private void discard(MessageReference message) { 648 discarding = true; 649 try { 650 message.decrementReferenceCount(); 651 matched.remove(message); 652 discarded++; 653 if (destination != null) { 654 destination.getDestinationStatistics().getDequeues().increment(); 655 } 656 LOG.debug("{}, discarding message {}", this, message); 657 Destination dest = (Destination) message.getRegionDestination(); 658 if (dest != null) { 659 dest.messageDiscarded(getContext(), this, message); 660 } 661 broker.getRoot().sendToDeadLetterQueue(getContext(), message, this, new Throwable("TopicSubDiscard. ID:" + info.getConsumerId())); 662 } finally { 663 discarding = false; 664 } 665 } 666 667 @Override 668 public String toString() { 669 return "TopicSubscription:" + " consumer=" + info.getConsumerId() + ", destinations=" + destinations.size() + ", dispatched=" + getDispatchedQueueSize() + ", delivered=" 670 + getDequeueCounter() + ", matched=" + matched() + ", discarded=" + discarded(); 671 } 672 673 @Override 674 public void destroy() { 675 this.active=false; 676 synchronized (matchedListMutex) { 677 try { 678 matched.destroy(); 679 } catch (Exception e) { 680 LOG.warn("Failed to destroy cursor", e); 681 } 682 } 683 setSlowConsumer(false); 684 } 685 686 @Override 687 public int getPrefetchSize() { 688 return info.getPrefetchSize(); 689 } 690 691 @Override 692 public void setPrefetchSize(int newSize) { 693 info.setPrefetchSize(newSize); 694 try { 695 dispatchMatched(); 696 } catch(Exception e) { 697 LOG.trace("Caught exception on dispatch after prefetch size change."); 698 } 699 } 700}