001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region.cursors; 018 019import java.util.Iterator; 020import java.util.LinkedList; 021import java.util.ListIterator; 022import java.util.concurrent.CancellationException; 023import java.util.concurrent.Future; 024import java.util.concurrent.TimeUnit; 025import java.util.concurrent.TimeoutException; 026import org.apache.activemq.broker.region.Destination; 027import org.apache.activemq.broker.region.MessageReference; 028import org.apache.activemq.broker.region.Subscription; 029import org.apache.activemq.command.Message; 030import org.apache.activemq.command.MessageId; 031import org.apache.activemq.store.MessageRecoveryListener; 032import org.slf4j.Logger; 033import org.slf4j.LoggerFactory; 034 035/** 036 * Store based cursor 037 * 038 */ 039public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor implements MessageRecoveryListener { 040 private static final Logger LOG = LoggerFactory.getLogger(AbstractStoreCursor.class); 041 protected final Destination regionDestination; 042 protected final PendingList batchList; 043 private Iterator<MessageReference> iterator = null; 044 protected boolean batchResetNeeded = false; 045 protected int size; 046 private LinkedList<MessageId> pendingCachedIds = new LinkedList<>(); 047 private static int SYNC_ADD = 0; 048 private static int ASYNC_ADD = 1; 049 final MessageId[] lastCachedIds = new MessageId[2]; 050 protected boolean hadSpace = false; 051 052 protected AbstractStoreCursor(Destination destination) { 053 super((destination != null ? destination.isPrioritizedMessages():false)); 054 this.regionDestination=destination; 055 if (this.prioritizedMessages) { 056 this.batchList= new PrioritizedPendingList(); 057 } else { 058 this.batchList = new OrderedPendingList(); 059 } 060 } 061 062 063 public final synchronized void start() throws Exception{ 064 if (!isStarted()) { 065 super.start(); 066 resetBatch(); 067 resetSize(); 068 setCacheEnabled(size==0&&useCache); 069 } 070 } 071 072 protected void resetSize() { 073 this.size = getStoreSize(); 074 } 075 076 @Override 077 public void rebase() { 078 resetSize(); 079 } 080 081 public final synchronized void stop() throws Exception { 082 resetBatch(); 083 super.stop(); 084 gc(); 085 } 086 087 088 public final boolean recoverMessage(Message message) throws Exception { 089 return recoverMessage(message,false); 090 } 091 092 public synchronized boolean recoverMessage(Message message, boolean cached) throws Exception { 093 boolean recovered = false; 094 if (recordUniqueId(message.getMessageId())) { 095 if (!cached) { 096 message.setRegionDestination(regionDestination); 097 if( message.getMemoryUsage()==null ) { 098 message.setMemoryUsage(this.getSystemUsage().getMemoryUsage()); 099 } 100 } 101 message.incrementReferenceCount(); 102 batchList.addMessageLast(message); 103 clearIterator(true); 104 recovered = true; 105 } else if (!cached) { 106 // a duplicate from the store (!cached) - needs to be removed/acked - otherwise it will get re dispatched on restart 107 if (message.isRecievedByDFBridge()) { 108 // expected for messages pending acks with kahadb.concurrentStoreAndDispatchQueues=true 109 if (LOG.isTraceEnabled()) { 110 LOG.trace("{} store replayed pending message due to concurrentStoreAndDispatchQueues {} seq: {}", this, message.getMessageId(), message.getMessageId().getFutureOrSequenceLong()); 111 } 112 } else { 113 LOG.warn("{} - cursor got duplicate from store {} seq: {}", this, message.getMessageId(), message.getMessageId().getFutureOrSequenceLong()); 114 duplicate(message); 115 } 116 } else { 117 LOG.warn("{} - cursor got duplicate send {} seq: {}", this, message.getMessageId(), message.getMessageId().getFutureOrSequenceLong()); 118 if (message.getMessageId().getEntryLocator() instanceof Long) { 119 // JDBC will store a duplicate (with new sequence id) - it needs an ack (AMQ4952Test) 120 duplicate(message); 121 } 122 } 123 return recovered; 124 } 125 126 // track for processing outside of store index lock so we can dlq 127 final LinkedList<Message> duplicatesFromStore = new LinkedList<Message>(); 128 private void duplicate(Message message) { 129 duplicatesFromStore.add(message); 130 } 131 132 void dealWithDuplicates() { 133 for (Message message : duplicatesFromStore) { 134 regionDestination.duplicateFromStore(message, getSubscription()); 135 } 136 duplicatesFromStore.clear(); 137 } 138 139 public final synchronized void reset() { 140 if (batchList.isEmpty()) { 141 try { 142 fillBatch(); 143 } catch (Exception e) { 144 LOG.error("{} - Failed to fill batch", this, e); 145 throw new RuntimeException(e); 146 } 147 } 148 clearIterator(true); 149 size(); 150 } 151 152 153 public synchronized void release() { 154 clearIterator(false); 155 } 156 157 private synchronized void clearIterator(boolean ensureIterator) { 158 boolean haveIterator = this.iterator != null; 159 this.iterator=null; 160 if(haveIterator&&ensureIterator) { 161 ensureIterator(); 162 } 163 } 164 165 private synchronized void ensureIterator() { 166 if(this.iterator==null) { 167 this.iterator=this.batchList.iterator(); 168 } 169 } 170 171 172 public final void finished() { 173 } 174 175 176 public final synchronized boolean hasNext() { 177 if (batchList.isEmpty()) { 178 try { 179 fillBatch(); 180 } catch (Exception e) { 181 LOG.error("{} - Failed to fill batch", this, e); 182 throw new RuntimeException(e); 183 } 184 } 185 ensureIterator(); 186 return this.iterator.hasNext(); 187 } 188 189 190 public final synchronized MessageReference next() { 191 MessageReference result = null; 192 if (!this.batchList.isEmpty()&&this.iterator.hasNext()) { 193 result = this.iterator.next(); 194 } 195 last = result; 196 if (result != null) { 197 result.incrementReferenceCount(); 198 } 199 return result; 200 } 201 202 public synchronized boolean addMessageLast(MessageReference node) throws Exception { 203 boolean disableCache = false; 204 if (hasSpace()) { 205 if (!isCacheEnabled() && size==0 && isStarted() && useCache) { 206 if (LOG.isTraceEnabled()) { 207 LOG.trace("{} - enabling cache for empty store {} {}", this, node.getMessageId(), node.getMessageId().getFutureOrSequenceLong()); 208 } 209 setCacheEnabled(true); 210 } 211 if (isCacheEnabled()) { 212 if (recoverMessage(node.getMessage(),true)) { 213 trackLastCached(node); 214 } else { 215 dealWithDuplicates(); 216 return false; 217 } 218 } 219 } else { 220 disableCache = true; 221 } 222 223 if (disableCache && isCacheEnabled()) { 224 if (LOG.isTraceEnabled()) { 225 LOG.trace("{} - disabling cache on add {} {}", this, node.getMessageId(), node.getMessageId().getFutureOrSequenceLong()); 226 } 227 syncWithStore(node.getMessage()); 228 setCacheEnabled(false); 229 } 230 size++; 231 return true; 232 } 233 234 private void syncWithStore(Message currentAdd) throws Exception { 235 pruneLastCached(); 236 if (lastCachedIds[SYNC_ADD] == null) { 237 // possibly only async adds, lets wait on the potential last add and reset from there 238 for (ListIterator<MessageId> it = pendingCachedIds.listIterator(pendingCachedIds.size()); it.hasPrevious(); ) { 239 MessageId lastPending = it.previous(); 240 Object futureOrLong = lastPending.getFutureOrSequenceLong(); 241 if (futureOrLong instanceof Future) { 242 Future future = (Future) futureOrLong; 243 if (future.isCancelled()) { 244 continue; 245 } 246 try { 247 future.get(5, TimeUnit.SECONDS); 248 setLastCachedId(ASYNC_ADD, lastPending); 249 } catch (CancellationException ok) { 250 continue; 251 } catch (TimeoutException potentialDeadlock) { 252 LOG.debug("{} timed out waiting for async add", this, potentialDeadlock); 253 } catch (Exception worstCaseWeReplay) { 254 LOG.debug("{} exception waiting for async add", this, worstCaseWeReplay); 255 } 256 } else { 257 setLastCachedId(ASYNC_ADD, lastPending); 258 } 259 break; 260 } 261 if (lastCachedIds[ASYNC_ADD] != null) { 262 // ensure we don't skip current possibly sync add b/c we waited on the future 263 if (isAsync(currentAdd) || Long.compare(((Long) currentAdd.getMessageId().getFutureOrSequenceLong()), ((Long) lastCachedIds[ASYNC_ADD].getFutureOrSequenceLong())) > 0) { 264 setBatch(lastCachedIds[ASYNC_ADD]); 265 } 266 } 267 } else { 268 setBatch(lastCachedIds[SYNC_ADD]); 269 } 270 // cleanup 271 lastCachedIds[SYNC_ADD] = lastCachedIds[ASYNC_ADD] = null; 272 pendingCachedIds.clear(); 273 } 274 275 private void trackLastCached(MessageReference node) { 276 if (isAsync(node.getMessage())) { 277 pruneLastCached(); 278 pendingCachedIds.add(node.getMessageId()); 279 } else { 280 setLastCachedId(SYNC_ADD, node.getMessageId()); 281 } 282 } 283 284 private static final boolean isAsync(Message message) { 285 return message.isRecievedByDFBridge() || message.getMessageId().getFutureOrSequenceLong() instanceof Future; 286 } 287 288 private void pruneLastCached() { 289 for (Iterator<MessageId> it = pendingCachedIds.iterator(); it.hasNext(); ) { 290 MessageId candidate = it.next(); 291 final Object futureOrLong = candidate.getFutureOrSequenceLong(); 292 if (futureOrLong instanceof Future) { 293 Future future = (Future) futureOrLong; 294 if (future.isCancelled()) { 295 it.remove(); 296 } else { 297 // we don't want to wait for work to complete 298 break; 299 } 300 } else { 301 // complete 302 setLastCachedId(ASYNC_ADD, candidate); 303 304 // keep lock step with sync adds while order is preserved 305 if (lastCachedIds[SYNC_ADD] != null) { 306 long next = 1 + (Long)lastCachedIds[SYNC_ADD].getFutureOrSequenceLong(); 307 if (Long.compare((Long)futureOrLong, next) == 0) { 308 setLastCachedId(SYNC_ADD, candidate); 309 } 310 } 311 it.remove(); 312 } 313 } 314 } 315 316 private void setLastCachedId(final int index, MessageId candidate) { 317 if (lastCachedIds[index] == null || lastCachedIds[index].getFutureOrSequenceLong() == null) { // possibly null for topics 318 lastCachedIds[index] = candidate; 319 } else if (Long.compare(((Long) candidate.getFutureOrSequenceLong()), ((Long) lastCachedIds[index].getFutureOrSequenceLong())) > 0) { 320 lastCachedIds[index] = candidate; 321 } 322 } 323 324 protected void setBatch(MessageId messageId) throws Exception { 325 } 326 327 328 public synchronized void addMessageFirst(MessageReference node) throws Exception { 329 setCacheEnabled(false); 330 size++; 331 } 332 333 334 public final synchronized void remove() { 335 size--; 336 if (iterator!=null) { 337 iterator.remove(); 338 } 339 if (last != null) { 340 last.decrementReferenceCount(); 341 } 342 } 343 344 345 public final synchronized void remove(MessageReference node) { 346 if (batchList.remove(node) != null) { 347 size--; 348 setCacheEnabled(false); 349 } 350 } 351 352 353 public final synchronized void clear() { 354 gc(); 355 } 356 357 358 public synchronized void gc() { 359 for (MessageReference msg : batchList) { 360 rollback(msg.getMessageId()); 361 msg.decrementReferenceCount(); 362 } 363 batchList.clear(); 364 clearIterator(false); 365 batchResetNeeded = true; 366 setCacheEnabled(false); 367 } 368 369 protected final synchronized void fillBatch() { 370 if (LOG.isTraceEnabled()) { 371 LOG.trace("{} fillBatch", this); 372 } 373 if (batchResetNeeded) { 374 resetSize(); 375 setMaxBatchSize(Math.min(regionDestination.getMaxPageSize(), size)); 376 resetBatch(); 377 this.batchResetNeeded = false; 378 } 379 if (this.batchList.isEmpty() && this.size >0) { 380 try { 381 doFillBatch(); 382 } catch (Exception e) { 383 LOG.error("{} - Failed to fill batch", this, e); 384 throw new RuntimeException(e); 385 } 386 } 387 } 388 389 390 public final synchronized boolean isEmpty() { 391 // negative means more messages added to store through queue.send since last reset 392 return size == 0; 393 } 394 395 396 public final synchronized boolean hasMessagesBufferedToDeliver() { 397 return !batchList.isEmpty(); 398 } 399 400 401 public final synchronized int size() { 402 if (size < 0) { 403 this.size = getStoreSize(); 404 } 405 return size; 406 } 407 408 @Override 409 public String toString() { 410 return super.toString() + ":" + regionDestination.getActiveMQDestination().getPhysicalName() + ",batchResetNeeded=" + batchResetNeeded 411 + ",size=" + this.size + ",cacheEnabled=" + isCacheEnabled() 412 + ",maxBatchSize:" + maxBatchSize + ",hasSpace:" + hasSpace() + ",pendingCachedIds.size:" + pendingCachedIds.size() 413 + ",lastSyncCachedId:" + lastCachedIds[SYNC_ADD] + ",lastSyncCachedId-seq:" + (lastCachedIds[SYNC_ADD] != null ? lastCachedIds[SYNC_ADD].getFutureOrSequenceLong() : "null") 414 + ",lastAsyncCachedId:" + lastCachedIds[ASYNC_ADD] + ",lastAsyncCachedId-seq:" + (lastCachedIds[ASYNC_ADD] != null ? lastCachedIds[ASYNC_ADD].getFutureOrSequenceLong() : "null"); 415 } 416 417 protected abstract void doFillBatch() throws Exception; 418 419 protected abstract void resetBatch(); 420 421 protected abstract int getStoreSize(); 422 423 protected abstract boolean isStoreEmpty(); 424 425 public Subscription getSubscription() { 426 return null; 427 } 428}