001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region.cursors; 018 019import java.io.IOException; 020import java.util.Iterator; 021import java.util.LinkedList; 022import java.util.concurrent.atomic.AtomicBoolean; 023import java.util.concurrent.atomic.AtomicLong; 024import org.apache.activemq.broker.Broker; 025import org.apache.activemq.broker.ConnectionContext; 026import org.apache.activemq.broker.region.Destination; 027import org.apache.activemq.broker.region.IndirectMessageReference; 028import org.apache.activemq.broker.region.MessageReference; 029import org.apache.activemq.broker.region.QueueMessageReference; 030import org.apache.activemq.command.Message; 031import org.apache.activemq.filter.NonCachedMessageEvaluationContext; 032import org.apache.activemq.openwire.OpenWireFormat; 033import org.apache.activemq.store.PList; 034import org.apache.activemq.store.PListStore; 035import org.apache.activemq.store.PListEntry; 036import org.apache.activemq.usage.SystemUsage; 037import org.apache.activemq.usage.Usage; 038import org.apache.activemq.usage.UsageListener; 039import org.apache.activemq.wireformat.WireFormat; 040import org.slf4j.Logger; 041import org.slf4j.LoggerFactory; 042import org.apache.activemq.util.ByteSequence; 043 044/** 045 * persist pending messages pending message (messages awaiting dispatch to a 046 * consumer) cursor 047 * 048 * 049 */ 050public class FilePendingMessageCursor extends AbstractPendingMessageCursor implements UsageListener { 051 static final Logger LOG = LoggerFactory.getLogger(FilePendingMessageCursor.class); 052 private static final AtomicLong NAME_COUNT = new AtomicLong(); 053 protected Broker broker; 054 private final PListStore store; 055 private final String name; 056 private PendingList memoryList; 057 private PList diskList; 058 private Iterator<MessageReference> iter; 059 private Destination regionDestination; 060 private boolean iterating; 061 private boolean flushRequired; 062 private final AtomicBoolean started = new AtomicBoolean(); 063 private final WireFormat wireFormat = new OpenWireFormat(); 064 /** 065 * @param broker 066 * @param name 067 * @param prioritizedMessages 068 */ 069 public FilePendingMessageCursor(Broker broker, String name, boolean prioritizedMessages) { 070 super(prioritizedMessages); 071 if (this.prioritizedMessages) { 072 this.memoryList = new PrioritizedPendingList(); 073 } else { 074 this.memoryList = new OrderedPendingList(); 075 } 076 this.broker = broker; 077 // the store can be null if the BrokerService has persistence 078 // turned off 079 this.store = broker.getTempDataStore(); 080 this.name = NAME_COUNT.incrementAndGet() + "_" + name; 081 } 082 083 @Override 084 public void start() throws Exception { 085 if (started.compareAndSet(false, true)) { 086 if( this.broker != null) { 087 wireFormat.setVersion(this.broker.getBrokerService().getStoreOpenWireVersion()); 088 } 089 super.start(); 090 if (systemUsage != null) { 091 systemUsage.getMemoryUsage().addUsageListener(this); 092 } 093 } 094 } 095 096 @Override 097 public void stop() throws Exception { 098 if (started.compareAndSet(true, false)) { 099 super.stop(); 100 if (systemUsage != null) { 101 systemUsage.getMemoryUsage().removeUsageListener(this); 102 } 103 } 104 } 105 106 /** 107 * @return true if there are no pending messages 108 */ 109 @Override 110 public synchronized boolean isEmpty() { 111 if (memoryList.isEmpty() && isDiskListEmpty()) { 112 return true; 113 } 114 for (Iterator<MessageReference> iterator = memoryList.iterator(); iterator.hasNext();) { 115 MessageReference node = iterator.next(); 116 if (node == QueueMessageReference.NULL_MESSAGE) { 117 continue; 118 } 119 if (!node.isDropped()) { 120 return false; 121 } 122 // We can remove dropped references. 123 iterator.remove(); 124 } 125 return isDiskListEmpty(); 126 } 127 128 /** 129 * reset the cursor 130 */ 131 @Override 132 public synchronized void reset() { 133 iterating = true; 134 last = null; 135 if (isDiskListEmpty()) { 136 this.iter = this.memoryList.iterator(); 137 } else { 138 this.iter = new DiskIterator(); 139 } 140 } 141 142 @Override 143 public synchronized void release() { 144 iterating = false; 145 if (iter instanceof DiskIterator) { 146 ((DiskIterator)iter).release(); 147 }; 148 if (flushRequired) { 149 flushRequired = false; 150 if (!hasSpace()) { 151 flushToDisk(); 152 } 153 } 154 // ensure any memory ref is released 155 iter = null; 156 } 157 158 @Override 159 public synchronized void destroy() throws Exception { 160 stop(); 161 for (Iterator<MessageReference> i = memoryList.iterator(); i.hasNext();) { 162 MessageReference node = i.next(); 163 node.decrementReferenceCount(); 164 } 165 memoryList.clear(); 166 destroyDiskList(); 167 } 168 169 private void destroyDiskList() throws Exception { 170 if (diskList != null) { 171 store.removePList(name); 172 diskList = null; 173 } 174 } 175 176 @Override 177 public synchronized LinkedList<MessageReference> pageInList(int maxItems) { 178 LinkedList<MessageReference> result = new LinkedList<MessageReference>(); 179 int count = 0; 180 for (Iterator<MessageReference> i = memoryList.iterator(); i.hasNext() && count < maxItems;) { 181 MessageReference ref = i.next(); 182 ref.incrementReferenceCount(); 183 result.add(ref); 184 count++; 185 } 186 if (count < maxItems && !isDiskListEmpty()) { 187 for (Iterator<MessageReference> i = new DiskIterator(); i.hasNext() && count < maxItems;) { 188 Message message = (Message) i.next(); 189 message.setRegionDestination(regionDestination); 190 message.setMemoryUsage(this.getSystemUsage().getMemoryUsage()); 191 message.incrementReferenceCount(); 192 result.add(message); 193 count++; 194 } 195 } 196 return result; 197 } 198 199 /** 200 * add message to await dispatch 201 * 202 * @param node 203 * @throws Exception 204 */ 205 @Override 206 public synchronized boolean addMessageLast(MessageReference node) throws Exception { 207 return tryAddMessageLast(node, 0); 208 } 209 210 @Override 211 public synchronized boolean tryAddMessageLast(MessageReference node, long maxWaitTime) throws Exception { 212 if (!node.isExpired()) { 213 try { 214 regionDestination = (Destination) node.getMessage().getRegionDestination(); 215 if (isDiskListEmpty()) { 216 if (hasSpace() || this.store == null) { 217 memoryList.addMessageLast(node); 218 node.incrementReferenceCount(); 219 setCacheEnabled(true); 220 return true; 221 } 222 } 223 if (!hasSpace()) { 224 if (isDiskListEmpty()) { 225 expireOldMessages(); 226 if (hasSpace()) { 227 memoryList.addMessageLast(node); 228 node.incrementReferenceCount(); 229 return true; 230 } else { 231 flushToDisk(); 232 } 233 } 234 } 235 if (systemUsage.getTempUsage().waitForSpace(maxWaitTime)) { 236 ByteSequence bs = getByteSequence(node.getMessage()); 237 getDiskList().addLast(node.getMessageId().toString(), bs); 238 return true; 239 } 240 return false; 241 242 } catch (Exception e) { 243 LOG.error("Caught an Exception adding a message: {} first to FilePendingMessageCursor ", node, e); 244 throw new RuntimeException(e); 245 } 246 } else { 247 discardExpiredMessage(node); 248 } 249 //message expired 250 return true; 251 } 252 253 /** 254 * add message to await dispatch 255 * 256 * @param node 257 */ 258 @Override 259 public synchronized void addMessageFirst(MessageReference node) { 260 if (!node.isExpired()) { 261 try { 262 regionDestination = (Destination) node.getMessage().getRegionDestination(); 263 if (isDiskListEmpty()) { 264 if (hasSpace()) { 265 memoryList.addMessageFirst(node); 266 node.incrementReferenceCount(); 267 setCacheEnabled(true); 268 return; 269 } 270 } 271 if (!hasSpace()) { 272 if (isDiskListEmpty()) { 273 expireOldMessages(); 274 if (hasSpace()) { 275 memoryList.addMessageFirst(node); 276 node.incrementReferenceCount(); 277 return; 278 } else { 279 flushToDisk(); 280 } 281 } 282 } 283 systemUsage.getTempUsage().waitForSpace(); 284 node.decrementReferenceCount(); 285 ByteSequence bs = getByteSequence(node.getMessage()); 286 Object locator = getDiskList().addFirst(node.getMessageId().toString(), bs); 287 node.getMessageId().setPlistLocator(locator); 288 289 } catch (Exception e) { 290 LOG.error("Caught an Exception adding a message: {} first to FilePendingMessageCursor ", node, e); 291 throw new RuntimeException(e); 292 } 293 } else { 294 discardExpiredMessage(node); 295 } 296 } 297 298 /** 299 * @return true if there pending messages to dispatch 300 */ 301 @Override 302 public synchronized boolean hasNext() { 303 return iter.hasNext(); 304 } 305 306 /** 307 * @return the next pending message 308 */ 309 @Override 310 public synchronized MessageReference next() { 311 MessageReference reference = iter.next(); 312 last = reference; 313 if (!isDiskListEmpty()) { 314 // got from disk 315 reference.getMessage().setRegionDestination(regionDestination); 316 reference.getMessage().setMemoryUsage(this.getSystemUsage().getMemoryUsage()); 317 } 318 reference.incrementReferenceCount(); 319 return reference; 320 } 321 322 /** 323 * remove the message at the cursor position 324 */ 325 @Override 326 public synchronized void remove() { 327 iter.remove(); 328 if (last != null) { 329 last.decrementReferenceCount(); 330 } 331 } 332 333 /** 334 * @param node 335 * @see org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor#remove(org.apache.activemq.broker.region.MessageReference) 336 */ 337 @Override 338 public synchronized void remove(MessageReference node) { 339 if (memoryList.remove(node) != null) { 340 node.decrementReferenceCount(); 341 } 342 if (!isDiskListEmpty()) { 343 try { 344 getDiskList().remove(node.getMessageId().getPlistLocator()); 345 } catch (IOException e) { 346 throw new RuntimeException(e); 347 } 348 } 349 } 350 351 /** 352 * @return the number of pending messages 353 */ 354 @Override 355 public synchronized int size() { 356 return memoryList.size() + (isDiskListEmpty() ? 0 : (int)getDiskList().size()); 357 } 358 359 /** 360 * clear all pending messages 361 */ 362 @Override 363 public synchronized void clear() { 364 memoryList.clear(); 365 if (!isDiskListEmpty()) { 366 try { 367 getDiskList().destroy(); 368 } catch (IOException e) { 369 throw new RuntimeException(e); 370 } 371 } 372 last = null; 373 } 374 375 @Override 376 public synchronized boolean isFull() { 377 378 return super.isFull() || (!isDiskListEmpty() && systemUsage != null && systemUsage.getTempUsage().isFull()); 379 380 } 381 382 @Override 383 public boolean hasMessagesBufferedToDeliver() { 384 return !isEmpty(); 385 } 386 387 @Override 388 public void setSystemUsage(SystemUsage usageManager) { 389 super.setSystemUsage(usageManager); 390 } 391 392 public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) { 393 if (newPercentUsage >= getMemoryUsageHighWaterMark()) { 394 synchronized (this) { 395 if (!flushRequired && size() != 0) { 396 flushRequired =true; 397 if (!iterating) { 398 expireOldMessages(); 399 if (!hasSpace()) { 400 flushToDisk(); 401 flushRequired = false; 402 } 403 } 404 } 405 } 406 } 407 } 408 409 @Override 410 public boolean isTransient() { 411 return true; 412 } 413 414 protected synchronized void expireOldMessages() { 415 if (!memoryList.isEmpty()) { 416 for (Iterator<MessageReference> iterator = memoryList.iterator(); iterator.hasNext();) { 417 MessageReference node = iterator.next(); 418 if (node.isExpired()) { 419 node.decrementReferenceCount(); 420 discardExpiredMessage(node); 421 iterator.remove(); 422 } 423 } 424 } 425 } 426 427 protected synchronized void flushToDisk() { 428 if (!memoryList.isEmpty() && store != null) { 429 long start = 0; 430 if (LOG.isTraceEnabled()) { 431 start = System.currentTimeMillis(); 432 LOG.trace("{}, flushToDisk() mem list size: {} {}", new Object[]{ name, memoryList.size(), (systemUsage != null ? systemUsage.getMemoryUsage() : "") }); 433 } 434 for (Iterator<MessageReference> iterator = memoryList.iterator(); iterator.hasNext();) { 435 MessageReference node = iterator.next(); 436 node.decrementReferenceCount(); 437 ByteSequence bs; 438 try { 439 bs = getByteSequence(node.getMessage()); 440 getDiskList().addLast(node.getMessageId().toString(), bs); 441 } catch (IOException e) { 442 LOG.error("Failed to write to disk list", e); 443 throw new RuntimeException(e); 444 } 445 446 } 447 memoryList.clear(); 448 setCacheEnabled(false); 449 LOG.trace("{}, flushToDisk() done - {} ms {}", new Object[]{ name, (System.currentTimeMillis() - start), (systemUsage != null ? systemUsage.getMemoryUsage() : "") }); 450 } 451 } 452 453 protected boolean isDiskListEmpty() { 454 return diskList == null || diskList.isEmpty(); 455 } 456 457 public PList getDiskList() { 458 if (diskList == null) { 459 try { 460 diskList = store.getPList(name); 461 } catch (Exception e) { 462 LOG.error("Caught an IO Exception getting the DiskList {}", name, e); 463 throw new RuntimeException(e); 464 } 465 } 466 return diskList; 467 } 468 469 private void discardExpiredMessage(MessageReference reference) { 470 LOG.debug("Discarding expired message {}", reference); 471 if (broker.isExpired(reference)) { 472 ConnectionContext context = new ConnectionContext(new NonCachedMessageEvaluationContext()); 473 context.setBroker(broker); 474 ((Destination)reference.getRegionDestination()).messageExpired(context, null, new IndirectMessageReference(reference.getMessage())); 475 } 476 } 477 478 protected ByteSequence getByteSequence(Message message) throws IOException { 479 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message); 480 return new ByteSequence(packet.data, packet.offset, packet.length); 481 } 482 483 protected Message getMessage(ByteSequence bs) throws IOException { 484 org.apache.activemq.util.ByteSequence packet = new org.apache.activemq.util.ByteSequence(bs.getData(), bs 485 .getOffset(), bs.getLength()); 486 return (Message) this.wireFormat.unmarshal(packet); 487 488 } 489 490 final class DiskIterator implements Iterator<MessageReference> { 491 private final PList.PListIterator iterator; 492 DiskIterator() { 493 try { 494 iterator = getDiskList().iterator(); 495 } catch (Exception e) { 496 throw new RuntimeException(e); 497 } 498 } 499 500 public boolean hasNext() { 501 return iterator.hasNext(); 502 } 503 504 public MessageReference next() { 505 try { 506 PListEntry entry = iterator.next(); 507 Message message = getMessage(entry.getByteSequence()); 508 message.getMessageId().setPlistLocator(entry.getLocator()); 509 return message; 510 } catch (IOException e) { 511 LOG.error("I/O error", e); 512 throw new RuntimeException(e); 513 } 514 } 515 516 public void remove() { 517 iterator.remove(); 518 } 519 520 public void release() { 521 iterator.release(); 522 } 523 } 524}