001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.kahadb; 018 019import java.io.ByteArrayInputStream; 020import java.io.ByteArrayOutputStream; 021import java.io.DataInput; 022import java.io.DataOutput; 023import java.io.EOFException; 024import java.io.File; 025import java.io.IOException; 026import java.io.InputStream; 027import java.io.InterruptedIOException; 028import java.io.ObjectInputStream; 029import java.io.ObjectOutputStream; 030import java.io.OutputStream; 031import java.util.ArrayList; 032import java.util.Arrays; 033import java.util.Collection; 034import java.util.Collections; 035import java.util.Date; 036import java.util.HashMap; 037import java.util.HashSet; 038import java.util.Iterator; 039import java.util.LinkedHashMap; 040import java.util.LinkedHashSet; 041import java.util.LinkedList; 042import java.util.List; 043import java.util.Map; 044import java.util.Map.Entry; 045import java.util.Set; 046import java.util.SortedSet; 047import java.util.TreeMap; 048import java.util.TreeSet; 049import java.util.concurrent.atomic.AtomicBoolean; 050import java.util.concurrent.atomic.AtomicLong; 051import 
java.util.concurrent.locks.ReentrantReadWriteLock; 052 053import org.apache.activemq.ActiveMQMessageAuditNoSync; 054import org.apache.activemq.broker.BrokerService; 055import org.apache.activemq.broker.BrokerServiceAware; 056import org.apache.activemq.command.MessageAck; 057import org.apache.activemq.command.TransactionId; 058import org.apache.activemq.openwire.OpenWireFormat; 059import org.apache.activemq.protobuf.Buffer; 060import org.apache.activemq.store.kahadb.data.KahaAckMessageFileMapCommand; 061import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand; 062import org.apache.activemq.store.kahadb.data.KahaCommitCommand; 063import org.apache.activemq.store.kahadb.data.KahaDestination; 064import org.apache.activemq.store.kahadb.data.KahaEntryType; 065import org.apache.activemq.store.kahadb.data.KahaPrepareCommand; 066import org.apache.activemq.store.kahadb.data.KahaProducerAuditCommand; 067import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand; 068import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand; 069import org.apache.activemq.store.kahadb.data.KahaRollbackCommand; 070import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand; 071import org.apache.activemq.store.kahadb.data.KahaTraceCommand; 072import org.apache.activemq.store.kahadb.data.KahaTransactionInfo; 073import org.apache.activemq.store.kahadb.data.KahaUpdateMessageCommand; 074import org.apache.activemq.store.kahadb.disk.index.BTreeIndex; 075import org.apache.activemq.store.kahadb.disk.index.BTreeVisitor; 076import org.apache.activemq.store.kahadb.disk.index.ListIndex; 077import org.apache.activemq.store.kahadb.disk.journal.DataFile; 078import org.apache.activemq.store.kahadb.disk.journal.Journal; 079import org.apache.activemq.store.kahadb.disk.journal.Location; 080import org.apache.activemq.store.kahadb.disk.page.Page; 081import org.apache.activemq.store.kahadb.disk.page.PageFile; 082import org.apache.activemq.store.kahadb.disk.page.Transaction; 
083import org.apache.activemq.store.kahadb.disk.util.LocationMarshaller; 084import org.apache.activemq.store.kahadb.disk.util.LongMarshaller; 085import org.apache.activemq.store.kahadb.disk.util.Marshaller; 086import org.apache.activemq.store.kahadb.disk.util.Sequence; 087import org.apache.activemq.store.kahadb.disk.util.SequenceSet; 088import org.apache.activemq.store.kahadb.disk.util.StringMarshaller; 089import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller; 090import org.apache.activemq.util.ByteSequence; 091import org.apache.activemq.util.DataByteArrayInputStream; 092import org.apache.activemq.util.DataByteArrayOutputStream; 093import org.apache.activemq.util.IOHelper; 094import org.apache.activemq.util.ServiceStopper; 095import org.apache.activemq.util.ServiceSupport; 096import org.slf4j.Logger; 097import org.slf4j.LoggerFactory; 098 099public abstract class MessageDatabase extends ServiceSupport implements BrokerServiceAware { 100 101 protected BrokerService brokerService; 102 103 public static final String PROPERTY_LOG_SLOW_ACCESS_TIME = "org.apache.activemq.store.kahadb.LOG_SLOW_ACCESS_TIME"; 104 public static final int LOG_SLOW_ACCESS_TIME = Integer.getInteger(PROPERTY_LOG_SLOW_ACCESS_TIME, 0); 105 public static final File DEFAULT_DIRECTORY = new File("KahaDB"); 106 protected static final Buffer UNMATCHED; 107 static { 108 UNMATCHED = new Buffer(new byte[]{}); 109 } 110 private static final Logger LOG = LoggerFactory.getLogger(MessageDatabase.class); 111 112 static final int CLOSED_STATE = 1; 113 static final int OPEN_STATE = 2; 114 static final long NOT_ACKED = -1; 115 116 static final int VERSION = 5; 117 118 protected class Metadata { 119 protected Page<Metadata> page; 120 protected int state; 121 protected BTreeIndex<String, StoredDestination> destinations; 122 protected Location lastUpdate; 123 protected Location firstInProgressTransactionLocation; 124 protected Location producerSequenceIdTrackerLocation = null; 125 protected Location 
ackMessageFileMapLocation = null; 126 protected transient ActiveMQMessageAuditNoSync producerSequenceIdTracker = new ActiveMQMessageAuditNoSync(); 127 protected transient Map<Integer, Set<Integer>> ackMessageFileMap = new HashMap<Integer, Set<Integer>>(); 128 protected int version = VERSION; 129 protected int openwireVersion = OpenWireFormat.DEFAULT_VERSION; 130 131 public void read(DataInput is) throws IOException { 132 state = is.readInt(); 133 destinations = new BTreeIndex<String, StoredDestination>(pageFile, is.readLong()); 134 if (is.readBoolean()) { 135 lastUpdate = LocationMarshaller.INSTANCE.readPayload(is); 136 } else { 137 lastUpdate = null; 138 } 139 if (is.readBoolean()) { 140 firstInProgressTransactionLocation = LocationMarshaller.INSTANCE.readPayload(is); 141 } else { 142 firstInProgressTransactionLocation = null; 143 } 144 try { 145 if (is.readBoolean()) { 146 producerSequenceIdTrackerLocation = LocationMarshaller.INSTANCE.readPayload(is); 147 } else { 148 producerSequenceIdTrackerLocation = null; 149 } 150 } catch (EOFException expectedOnUpgrade) { 151 } 152 try { 153 version = is.readInt(); 154 } catch (EOFException expectedOnUpgrade) { 155 version = 1; 156 } 157 if (version >= 5 && is.readBoolean()) { 158 ackMessageFileMapLocation = LocationMarshaller.INSTANCE.readPayload(is); 159 } else { 160 ackMessageFileMapLocation = null; 161 } 162 try { 163 openwireVersion = is.readInt(); 164 } catch (EOFException expectedOnUpgrade) { 165 openwireVersion = OpenWireFormat.DEFAULT_VERSION; 166 } 167 LOG.info("KahaDB is version " + version); 168 } 169 170 public void write(DataOutput os) throws IOException { 171 os.writeInt(state); 172 os.writeLong(destinations.getPageId()); 173 174 if (lastUpdate != null) { 175 os.writeBoolean(true); 176 LocationMarshaller.INSTANCE.writePayload(lastUpdate, os); 177 } else { 178 os.writeBoolean(false); 179 } 180 181 if (firstInProgressTransactionLocation != null) { 182 os.writeBoolean(true); 183 
LocationMarshaller.INSTANCE.writePayload(firstInProgressTransactionLocation, os); 184 } else { 185 os.writeBoolean(false); 186 } 187 188 if (producerSequenceIdTrackerLocation != null) { 189 os.writeBoolean(true); 190 LocationMarshaller.INSTANCE.writePayload(producerSequenceIdTrackerLocation, os); 191 } else { 192 os.writeBoolean(false); 193 } 194 os.writeInt(VERSION); 195 if (ackMessageFileMapLocation != null) { 196 os.writeBoolean(true); 197 LocationMarshaller.INSTANCE.writePayload(ackMessageFileMapLocation, os); 198 } else { 199 os.writeBoolean(false); 200 } 201 os.writeInt(this.openwireVersion); 202 } 203 } 204 205 class MetadataMarshaller extends VariableMarshaller<Metadata> { 206 @Override 207 public Metadata readPayload(DataInput dataIn) throws IOException { 208 Metadata rc = createMetadata(); 209 rc.read(dataIn); 210 return rc; 211 } 212 213 @Override 214 public void writePayload(Metadata object, DataOutput dataOut) throws IOException { 215 object.write(dataOut); 216 } 217 } 218 219 protected PageFile pageFile; 220 protected Journal journal; 221 protected Metadata metadata = new Metadata(); 222 223 protected MetadataMarshaller metadataMarshaller = new MetadataMarshaller(); 224 225 protected boolean failIfDatabaseIsLocked; 226 227 protected boolean deleteAllMessages; 228 protected File directory = DEFAULT_DIRECTORY; 229 protected File indexDirectory = null; 230 protected Thread checkpointThread; 231 protected boolean enableJournalDiskSyncs=true; 232 protected boolean archiveDataLogs; 233 protected File directoryArchive; 234 protected AtomicLong journalSize = new AtomicLong(0); 235 long checkpointInterval = 5*1000; 236 long cleanupInterval = 30*1000; 237 int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH; 238 int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE; 239 boolean enableIndexWriteAsync = false; 240 int setIndexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE; 241 private String preallocationScope = 
Journal.PreallocationScope.ENTIRE_JOURNAL.name(); 242 private String preallocationStrategy = Journal.PreallocationStrategy.SPARSE_FILE.name(); 243 244 protected AtomicBoolean opened = new AtomicBoolean(); 245 private boolean ignoreMissingJournalfiles = false; 246 private int indexCacheSize = 10000; 247 private boolean checkForCorruptJournalFiles = false; 248 private boolean checksumJournalFiles = true; 249 protected boolean forceRecoverIndex = false; 250 private final Object checkpointThreadLock = new Object(); 251 private boolean rewriteOnRedelivery = false; 252 private boolean archiveCorruptedIndex = false; 253 private boolean useIndexLFRUEviction = false; 254 private float indexLFUEvictionFactor = 0.2f; 255 private boolean enableIndexDiskSyncs = true; 256 private boolean enableIndexRecoveryFile = true; 257 private boolean enableIndexPageCaching = true; 258 ReentrantReadWriteLock checkpointLock = new ReentrantReadWriteLock(); 259 260 @Override 261 public void doStart() throws Exception { 262 load(); 263 } 264 265 @Override 266 public void doStop(ServiceStopper stopper) throws Exception { 267 unload(); 268 } 269 270 private void loadPageFile() throws IOException { 271 this.indexLock.writeLock().lock(); 272 try { 273 final PageFile pageFile = getPageFile(); 274 pageFile.load(); 275 pageFile.tx().execute(new Transaction.Closure<IOException>() { 276 @Override 277 public void execute(Transaction tx) throws IOException { 278 if (pageFile.getPageCount() == 0) { 279 // First time this is created.. 
Initialize the metadata 280 Page<Metadata> page = tx.allocate(); 281 assert page.getPageId() == 0; 282 page.set(metadata); 283 metadata.page = page; 284 metadata.state = CLOSED_STATE; 285 metadata.destinations = new BTreeIndex<String, StoredDestination>(pageFile, tx.allocate().getPageId()); 286 287 tx.store(metadata.page, metadataMarshaller, true); 288 } else { 289 Page<Metadata> page = tx.load(0, metadataMarshaller); 290 metadata = page.get(); 291 metadata.page = page; 292 } 293 metadata.destinations.setKeyMarshaller(StringMarshaller.INSTANCE); 294 metadata.destinations.setValueMarshaller(new StoredDestinationMarshaller()); 295 metadata.destinations.load(tx); 296 } 297 }); 298 // Load up all the destinations since we need to scan all the indexes to figure out which journal files can be deleted. 299 // Perhaps we should just keep an index of file 300 storedDestinations.clear(); 301 pageFile.tx().execute(new Transaction.Closure<IOException>() { 302 @Override 303 public void execute(Transaction tx) throws IOException { 304 for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator.hasNext();) { 305 Entry<String, StoredDestination> entry = iterator.next(); 306 StoredDestination sd = loadStoredDestination(tx, entry.getKey(), entry.getValue().subscriptions!=null); 307 storedDestinations.put(entry.getKey(), sd); 308 309 if (checkForCorruptJournalFiles) { 310 // sanity check the index also 311 if (!entry.getValue().locationIndex.isEmpty(tx)) { 312 if (entry.getValue().orderIndex.nextMessageId <= 0) { 313 throw new IOException("Detected uninitialized orderIndex nextMessageId with pending messages for " + entry.getKey()); 314 } 315 } 316 } 317 } 318 } 319 }); 320 pageFile.flush(); 321 } finally { 322 this.indexLock.writeLock().unlock(); 323 } 324 } 325 326 private void startCheckpoint() { 327 if (checkpointInterval == 0 && cleanupInterval == 0) { 328 LOG.info("periodic checkpoint/cleanup disabled, will ocurr on clean 
shutdown/restart"); 329 return; 330 } 331 synchronized (checkpointThreadLock) { 332 boolean start = false; 333 if (checkpointThread == null) { 334 start = true; 335 } else if (!checkpointThread.isAlive()) { 336 start = true; 337 LOG.info("KahaDB: Recovering checkpoint thread after death"); 338 } 339 if (start) { 340 checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") { 341 @Override 342 public void run() { 343 try { 344 long lastCleanup = System.currentTimeMillis(); 345 long lastCheckpoint = System.currentTimeMillis(); 346 // Sleep for a short time so we can periodically check 347 // to see if we need to exit this thread. 348 long sleepTime = Math.min(checkpointInterval > 0 ? checkpointInterval : cleanupInterval, 500); 349 while (opened.get()) { 350 Thread.sleep(sleepTime); 351 long now = System.currentTimeMillis(); 352 if( cleanupInterval > 0 && (now - lastCleanup >= cleanupInterval) ) { 353 checkpointCleanup(true); 354 lastCleanup = now; 355 lastCheckpoint = now; 356 } else if( checkpointInterval > 0 && (now - lastCheckpoint >= checkpointInterval )) { 357 checkpointCleanup(false); 358 lastCheckpoint = now; 359 } 360 } 361 } catch (InterruptedException e) { 362 // Looks like someone really wants us to exit this thread... 363 } catch (IOException ioe) { 364 LOG.error("Checkpoint failed", ioe); 365 brokerService.handleIOException(ioe); 366 } 367 } 368 }; 369 370 checkpointThread.setDaemon(true); 371 checkpointThread.start(); 372 } 373 } 374 } 375 376 public void open() throws IOException { 377 if( opened.compareAndSet(false, true) ) { 378 getJournal().start(); 379 try { 380 loadPageFile(); 381 } catch (Throwable t) { 382 LOG.warn("Index corrupted. Recovering the index through journal replay. 
Cause:" + t); 383 if (LOG.isDebugEnabled()) { 384 LOG.debug("Index load failure", t); 385 } 386 // try to recover index 387 try { 388 pageFile.unload(); 389 } catch (Exception ignore) {} 390 if (archiveCorruptedIndex) { 391 pageFile.archive(); 392 } else { 393 pageFile.delete(); 394 } 395 metadata = createMetadata(); 396 pageFile = null; 397 loadPageFile(); 398 } 399 startCheckpoint(); 400 recover(); 401 } 402 } 403 404 public void load() throws IOException { 405 this.indexLock.writeLock().lock(); 406 IOHelper.mkdirs(directory); 407 try { 408 if (deleteAllMessages) { 409 getJournal().start(); 410 getJournal().delete(); 411 getJournal().close(); 412 journal = null; 413 getPageFile().delete(); 414 LOG.info("Persistence store purged."); 415 deleteAllMessages = false; 416 } 417 418 open(); 419 store(new KahaTraceCommand().setMessage("LOADED " + new Date())); 420 } finally { 421 this.indexLock.writeLock().unlock(); 422 } 423 } 424 425 public void close() throws IOException, InterruptedException { 426 if( opened.compareAndSet(true, false)) { 427 checkpointLock.writeLock().lock(); 428 try { 429 if (metadata.page != null) { 430 checkpointUpdate(true); 431 } 432 pageFile.unload(); 433 metadata = createMetadata(); 434 } finally { 435 checkpointLock.writeLock().unlock(); 436 } 437 journal.close(); 438 synchronized (checkpointThreadLock) { 439 if (checkpointThread != null) { 440 checkpointThread.join(); 441 } 442 } 443 } 444 } 445 446 public void unload() throws IOException, InterruptedException { 447 this.indexLock.writeLock().lock(); 448 try { 449 if( pageFile != null && pageFile.isLoaded() ) { 450 metadata.state = CLOSED_STATE; 451 metadata.firstInProgressTransactionLocation = getInProgressTxLocationRange()[0]; 452 453 if (metadata.page != null) { 454 pageFile.tx().execute(new Transaction.Closure<IOException>() { 455 @Override 456 public void execute(Transaction tx) throws IOException { 457 tx.store(metadata.page, metadataMarshaller, true); 458 } 459 }); 460 } 461 } 462 } 
finally { 463 this.indexLock.writeLock().unlock(); 464 } 465 close(); 466 } 467 468 // public for testing 469 @SuppressWarnings("rawtypes") 470 public Location[] getInProgressTxLocationRange() { 471 Location[] range = new Location[]{null, null}; 472 synchronized (inflightTransactions) { 473 if (!inflightTransactions.isEmpty()) { 474 for (List<Operation> ops : inflightTransactions.values()) { 475 if (!ops.isEmpty()) { 476 trackMaxAndMin(range, ops); 477 } 478 } 479 } 480 if (!preparedTransactions.isEmpty()) { 481 for (List<Operation> ops : preparedTransactions.values()) { 482 if (!ops.isEmpty()) { 483 trackMaxAndMin(range, ops); 484 } 485 } 486 } 487 } 488 return range; 489 } 490 491 @SuppressWarnings("rawtypes") 492 private void trackMaxAndMin(Location[] range, List<Operation> ops) { 493 Location t = ops.get(0).getLocation(); 494 if (range[0]==null || t.compareTo(range[0]) <= 0) { 495 range[0] = t; 496 } 497 t = ops.get(ops.size() -1).getLocation(); 498 if (range[1]==null || t.compareTo(range[1]) >= 0) { 499 range[1] = t; 500 } 501 } 502 503 class TranInfo { 504 TransactionId id; 505 Location location; 506 507 class opCount { 508 int add; 509 int remove; 510 } 511 HashMap<KahaDestination, opCount> destinationOpCount = new HashMap<KahaDestination, opCount>(); 512 513 @SuppressWarnings("rawtypes") 514 public void track(Operation operation) { 515 if (location == null ) { 516 location = operation.getLocation(); 517 } 518 KahaDestination destination; 519 boolean isAdd = false; 520 if (operation instanceof AddOperation) { 521 AddOperation add = (AddOperation) operation; 522 destination = add.getCommand().getDestination(); 523 isAdd = true; 524 } else { 525 RemoveOperation removeOpperation = (RemoveOperation) operation; 526 destination = removeOpperation.getCommand().getDestination(); 527 } 528 opCount opCount = destinationOpCount.get(destination); 529 if (opCount == null) { 530 opCount = new opCount(); 531 destinationOpCount.put(destination, opCount); 532 } 533 if 
(isAdd) { 534 opCount.add++; 535 } else { 536 opCount.remove++; 537 } 538 } 539 540 @Override 541 public String toString() { 542 StringBuffer buffer = new StringBuffer(); 543 buffer.append(location).append(";").append(id).append(";\n"); 544 for (Entry<KahaDestination, opCount> op : destinationOpCount.entrySet()) { 545 buffer.append(op.getKey()).append('+').append(op.getValue().add).append(',').append('-').append(op.getValue().remove).append(';'); 546 } 547 return buffer.toString(); 548 } 549 } 550 551 @SuppressWarnings("rawtypes") 552 public String getTransactions() { 553 554 ArrayList<TranInfo> infos = new ArrayList<TranInfo>(); 555 synchronized (inflightTransactions) { 556 if (!inflightTransactions.isEmpty()) { 557 for (Entry<TransactionId, List<Operation>> entry : inflightTransactions.entrySet()) { 558 TranInfo info = new TranInfo(); 559 info.id = entry.getKey(); 560 for (Operation operation : entry.getValue()) { 561 info.track(operation); 562 } 563 infos.add(info); 564 } 565 } 566 } 567 synchronized (preparedTransactions) { 568 if (!preparedTransactions.isEmpty()) { 569 for (Entry<TransactionId, List<Operation>> entry : preparedTransactions.entrySet()) { 570 TranInfo info = new TranInfo(); 571 info.id = entry.getKey(); 572 for (Operation operation : entry.getValue()) { 573 info.track(operation); 574 } 575 infos.add(info); 576 } 577 } 578 } 579 return infos.toString(); 580 } 581 582 /** 583 * Move all the messages that were in the journal into long term storage. We 584 * just replay and do a checkpoint. 
585 * 586 * @throws IOException 587 * @throws IOException 588 * @throws IllegalStateException 589 */ 590 private void recover() throws IllegalStateException, IOException { 591 this.indexLock.writeLock().lock(); 592 try { 593 594 long start = System.currentTimeMillis(); 595 Location producerAuditPosition = recoverProducerAudit(); 596 Location ackMessageFileLocation = recoverAckMessageFileMap(); 597 Location lastIndoubtPosition = getRecoveryPosition(); 598 599 Location recoveryPosition = minimum(producerAuditPosition, ackMessageFileLocation); 600 recoveryPosition = minimum(recoveryPosition, lastIndoubtPosition); 601 602 if (recoveryPosition != null) { 603 int redoCounter = 0; 604 LOG.info("Recovering from the journal @" + recoveryPosition); 605 while (recoveryPosition != null) { 606 try { 607 JournalCommand<?> message = load(recoveryPosition); 608 metadata.lastUpdate = recoveryPosition; 609 process(message, recoveryPosition, lastIndoubtPosition); 610 redoCounter++; 611 } catch (IOException failedRecovery) { 612 if (isIgnoreMissingJournalfiles()) { 613 LOG.debug("Failed to recover data at position:" + recoveryPosition, failedRecovery); 614 // track this dud location 615 journal.corruptRecoveryLocation(recoveryPosition); 616 } else { 617 throw new IOException("Failed to recover data at position:" + recoveryPosition, failedRecovery); 618 } 619 } 620 recoveryPosition = journal.getNextLocation(recoveryPosition); 621 if (LOG.isInfoEnabled() && redoCounter % 100000 == 0) { 622 LOG.info("@" + recoveryPosition + ", " + redoCounter + " entries recovered .."); 623 } 624 } 625 if (LOG.isInfoEnabled()) { 626 long end = System.currentTimeMillis(); 627 LOG.info("Recovery replayed " + redoCounter + " operations from the journal in " + ((end - start) / 1000.0f) + " seconds."); 628 } 629 } 630 631 // We may have to undo some index updates. 
632 pageFile.tx().execute(new Transaction.Closure<IOException>() { 633 @Override 634 public void execute(Transaction tx) throws IOException { 635 recoverIndex(tx); 636 } 637 }); 638 639 // rollback any recovered inflight local transactions, and discard any inflight XA transactions. 640 Set<TransactionId> toRollback = new HashSet<TransactionId>(); 641 Set<TransactionId> toDiscard = new HashSet<TransactionId>(); 642 synchronized (inflightTransactions) { 643 for (Iterator<TransactionId> it = inflightTransactions.keySet().iterator(); it.hasNext(); ) { 644 TransactionId id = it.next(); 645 if (id.isLocalTransaction()) { 646 toRollback.add(id); 647 } else { 648 toDiscard.add(id); 649 } 650 } 651 for (TransactionId tx: toRollback) { 652 if (LOG.isDebugEnabled()) { 653 LOG.debug("rolling back recovered indoubt local transaction " + tx); 654 } 655 store(new KahaRollbackCommand().setTransactionInfo(TransactionIdConversion.convertToLocal(tx)), false, null, null); 656 } 657 for (TransactionId tx: toDiscard) { 658 if (LOG.isDebugEnabled()) { 659 LOG.debug("discarding recovered in-flight XA transaction " + tx); 660 } 661 inflightTransactions.remove(tx); 662 } 663 } 664 665 synchronized (preparedTransactions) { 666 for (TransactionId txId : preparedTransactions.keySet()) { 667 LOG.warn("Recovered prepared XA TX: [{}]", txId); 668 } 669 } 670 671 } finally { 672 this.indexLock.writeLock().unlock(); 673 } 674 } 675 676 @SuppressWarnings("unused") 677 private KahaTransactionInfo createLocalTransactionInfo(TransactionId tx) { 678 return TransactionIdConversion.convertToLocal(tx); 679 } 680 681 private Location minimum(Location producerAuditPosition, 682 Location lastIndoubtPosition) { 683 Location min = null; 684 if (producerAuditPosition != null) { 685 min = producerAuditPosition; 686 if (lastIndoubtPosition != null && lastIndoubtPosition.compareTo(producerAuditPosition) < 0) { 687 min = lastIndoubtPosition; 688 } 689 } else { 690 min = lastIndoubtPosition; 691 } 692 return min; 693 
} 694 695 private Location recoverProducerAudit() throws IOException { 696 if (metadata.producerSequenceIdTrackerLocation != null) { 697 KahaProducerAuditCommand audit = (KahaProducerAuditCommand) load(metadata.producerSequenceIdTrackerLocation); 698 try { 699 ObjectInputStream objectIn = new ObjectInputStream(audit.getAudit().newInput()); 700 int maxNumProducers = getMaxFailoverProducersToTrack(); 701 int maxAuditDepth = getFailoverProducersAuditDepth(); 702 metadata.producerSequenceIdTracker = (ActiveMQMessageAuditNoSync) objectIn.readObject(); 703 metadata.producerSequenceIdTracker.setAuditDepth(maxAuditDepth); 704 metadata.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxNumProducers); 705 return journal.getNextLocation(metadata.producerSequenceIdTrackerLocation); 706 } catch (Exception e) { 707 LOG.warn("Cannot recover message audit", e); 708 return journal.getNextLocation(null); 709 } 710 } else { 711 // got no audit stored so got to recreate via replay from start of the journal 712 return journal.getNextLocation(null); 713 } 714 } 715 716 @SuppressWarnings("unchecked") 717 private Location recoverAckMessageFileMap() throws IOException { 718 if (metadata.ackMessageFileMapLocation != null) { 719 KahaAckMessageFileMapCommand audit = (KahaAckMessageFileMapCommand) load(metadata.ackMessageFileMapLocation); 720 try { 721 ObjectInputStream objectIn = new ObjectInputStream(audit.getAckMessageFileMap().newInput()); 722 metadata.ackMessageFileMap = (Map<Integer, Set<Integer>>) objectIn.readObject(); 723 return journal.getNextLocation(metadata.ackMessageFileMapLocation); 724 } catch (Exception e) { 725 LOG.warn("Cannot recover ackMessageFileMap", e); 726 return journal.getNextLocation(null); 727 } 728 } else { 729 // got no ackMessageFileMap stored so got to recreate via replay from start of the journal 730 return journal.getNextLocation(null); 731 } 732 } 733 734 protected void recoverIndex(Transaction tx) throws IOException { 735 long start = 
System.currentTimeMillis(); 736 // It is possible index updates got applied before the journal updates.. 737 // in that case we need to removed references to messages that are not in the journal 738 final Location lastAppendLocation = journal.getLastAppendLocation(); 739 long undoCounter=0; 740 741 // Go through all the destinations to see if they have messages past the lastAppendLocation 742 for (StoredDestination sd : storedDestinations.values()) { 743 744 final ArrayList<Long> matches = new ArrayList<Long>(); 745 // Find all the Locations that are >= than the last Append Location. 746 sd.locationIndex.visit(tx, new BTreeVisitor.GTEVisitor<Location, Long>(lastAppendLocation) { 747 @Override 748 protected void matched(Location key, Long value) { 749 matches.add(value); 750 } 751 }); 752 753 for (Long sequenceId : matches) { 754 MessageKeys keys = sd.orderIndex.remove(tx, sequenceId); 755 sd.locationIndex.remove(tx, keys.location); 756 sd.messageIdIndex.remove(tx, keys.messageId); 757 metadata.producerSequenceIdTracker.rollback(keys.messageId); 758 undoCounter++; 759 // TODO: do we need to modify the ack positions for the pub sub case? 760 } 761 } 762 763 if( undoCounter > 0 ) { 764 // The rolledback operations are basically in flight journal writes. To avoid getting 765 // these the end user should do sync writes to the journal. 766 if (LOG.isInfoEnabled()) { 767 long end = System.currentTimeMillis(); 768 LOG.info("Rolled back " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds."); 769 } 770 } 771 772 undoCounter = 0; 773 start = System.currentTimeMillis(); 774 775 // Lets be extra paranoid here and verify that all the datafiles being referenced 776 // by the indexes still exists. 
777 778 final SequenceSet ss = new SequenceSet(); 779 for (StoredDestination sd : storedDestinations.values()) { 780 // Use a visitor to cut down the number of pages that we load 781 sd.locationIndex.visit(tx, new BTreeVisitor<Location, Long>() { 782 int last=-1; 783 784 @Override 785 public boolean isInterestedInKeysBetween(Location first, Location second) { 786 if( first==null ) { 787 return !ss.contains(0, second.getDataFileId()); 788 } else if( second==null ) { 789 return true; 790 } else { 791 return !ss.contains(first.getDataFileId(), second.getDataFileId()); 792 } 793 } 794 795 @Override 796 public void visit(List<Location> keys, List<Long> values) { 797 for (Location l : keys) { 798 int fileId = l.getDataFileId(); 799 if( last != fileId ) { 800 ss.add(fileId); 801 last = fileId; 802 } 803 } 804 } 805 806 }); 807 } 808 HashSet<Integer> missingJournalFiles = new HashSet<Integer>(); 809 while (!ss.isEmpty()) { 810 missingJournalFiles.add((int) ss.removeFirst()); 811 } 812 missingJournalFiles.removeAll(journal.getFileMap().keySet()); 813 814 if (!missingJournalFiles.isEmpty()) { 815 if (LOG.isInfoEnabled()) { 816 LOG.info("Some journal files are missing: " + missingJournalFiles); 817 } 818 } 819 820 ArrayList<BTreeVisitor.Predicate<Location>> missingPredicates = new ArrayList<BTreeVisitor.Predicate<Location>>(); 821 for (Integer missing : missingJournalFiles) { 822 missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(missing, 0), new Location(missing + 1, 0))); 823 } 824 825 if (checkForCorruptJournalFiles) { 826 Collection<DataFile> dataFiles = journal.getFileMap().values(); 827 for (DataFile dataFile : dataFiles) { 828 int id = dataFile.getDataFileId(); 829 missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, dataFile.getLength()), new Location(id + 1, 0))); 830 Sequence seq = dataFile.getCorruptedBlocks().getHead(); 831 while (seq != null) { 832 missingPredicates.add(new 
BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, (int) seq.getFirst()), new Location(id, (int) seq.getLast() + 1))); 833 seq = seq.getNext(); 834 } 835 } 836 } 837 838 if (!missingPredicates.isEmpty()) { 839 for (Entry<String, StoredDestination> sdEntry : storedDestinations.entrySet()) { 840 final StoredDestination sd = sdEntry.getValue(); 841 final ArrayList<Long> matches = new ArrayList<Long>(); 842 sd.locationIndex.visit(tx, new BTreeVisitor.OrVisitor<Location, Long>(missingPredicates) { 843 @Override 844 protected void matched(Location key, Long value) { 845 matches.add(value); 846 } 847 }); 848 849 // If somes message references are affected by the missing data files... 850 if (!matches.isEmpty()) { 851 852 // We either 'gracefully' recover dropping the missing messages or 853 // we error out. 854 if( ignoreMissingJournalfiles ) { 855 // Update the index to remove the references to the missing data 856 for (Long sequenceId : matches) { 857 MessageKeys keys = sd.orderIndex.remove(tx, sequenceId); 858 sd.locationIndex.remove(tx, keys.location); 859 sd.messageIdIndex.remove(tx, keys.messageId); 860 LOG.info("[" + sdEntry.getKey() + "] dropped: " + keys.messageId + " at corrupt location: " + keys.location); 861 undoCounter++; 862 // TODO: do we need to modify the ack positions for the pub sub case? 863 } 864 } else { 865 throw new IOException("Detected missing/corrupt journal files. "+matches.size()+" messages affected."); 866 } 867 } 868 } 869 } 870 871 if( undoCounter > 0 ) { 872 // The rolledback operations are basically in flight journal writes. To avoid getting these the end user 873 // should do sync writes to the journal. 874 if (LOG.isInfoEnabled()) { 875 long end = System.currentTimeMillis(); 876 LOG.info("Detected missing/corrupt journal files. 
Dropped " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds.");
            }
        }
    }

    // Replay cursor state for incremental recovery: the next record to apply
    // and the last record successfully applied to the index.
    private Location nextRecoveryPosition;
    private Location lastRecoveryPosition;

    /**
     * Replays journal records from the current recovery position through the
     * end of the journal, applying each record to the index. Safe to call
     * repeatedly; resumes after the last processed record. Holds the index
     * write lock for the duration of the replay.
     */
    public void incrementalRecover() throws IOException {
        this.indexLock.writeLock().lock();
        try {
            if( nextRecoveryPosition == null ) {
                if( lastRecoveryPosition==null ) {
                    // First invocation: compute the starting point.
                    nextRecoveryPosition = getRecoveryPosition();
                } else {
                    // Subsequent invocation: resume after the last applied record.
                    nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
                }
            }
            while (nextRecoveryPosition != null) {
                lastRecoveryPosition = nextRecoveryPosition;
                metadata.lastUpdate = lastRecoveryPosition;
                JournalCommand<?> message = load(lastRecoveryPosition);
                process(message, lastRecoveryPosition, (IndexAware) null);
                nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
            }
        } finally {
            this.indexLock.writeLock().unlock();
        }
    }

    /** @return the journal location of the last record applied to the index, or null if none. */
    public Location getLastUpdatePosition() throws IOException {
        return metadata.lastUpdate;
    }

    /**
     * Determines where journal replay should begin on recovery: the first
     * in-progress transaction if any, otherwise the record after the last
     * index update, otherwise the very first record in the journal (also the
     * case when a full index rebuild is forced).
     */
    private Location getRecoveryPosition() throws IOException {

        if (!this.forceRecoverIndex) {

            // If we need to recover the transactions..
            if (metadata.firstInProgressTransactionLocation != null) {
                return metadata.firstInProgressTransactionLocation;
            }

            // Perhaps there were no transactions...
            if( metadata.lastUpdate!=null) {
                // Start replay at the record after the last one recorded in the index file.
                return journal.getNextLocation(metadata.lastUpdate);
            }
        }
        // This loads the first position.
        return journal.getNextLocation(null);
    }

    /**
     * Runs a checkpoint (optionally with journal-file cleanup) unless the
     * store has been closed. The open check is performed under the index
     * lock, but the checkpoint itself runs after the lock is released and
     * acquires its own locks.
     *
     * @param cleanup when true, also garbage-collect unreferenced journal files
     */
    protected void checkpointCleanup(final boolean cleanup) throws IOException {
        long start;
        this.indexLock.writeLock().lock();
        try {
            start = System.currentTimeMillis();
            if( !opened.get() ) {
                return;
            }
        } finally {
            this.indexLock.writeLock().unlock();
        }
        checkpointUpdate(cleanup);
        long end = System.currentTimeMillis();
        if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) {
            if (LOG.isInfoEnabled()) {
                LOG.info("Slow KahaDB access: cleanup took " + (end - start));
            }
        }
    }

    /**
     * Serializes a journal command into its on-disk framing: one type byte
     * followed by the framed protobuf payload.
     */
    public ByteSequence toByteSequence(JournalCommand<?> data) throws IOException {
        int size = data.serializedSizeFramed();
        DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
        os.writeByte(data.type().getNumber());
        data.writeFramed(os);
        return os.toByteSequence();
    }

    // /////////////////////////////////////////////////////////////////
    // Methods called by the broker to update and query the store.
    // /////////////////////////////////////////////////////////////////

    /** Stores a command without sync, callbacks, or completion notification. */
    public Location store(JournalCommand<?> data) throws IOException {
        return store(data, false, null,null);
    }

    /** Stores a command with a journal-write completion callback (no disk sync forced). */
    public Location store(JournalCommand<?> data, Runnable onJournalStoreComplete) throws IOException {
        return store(data, false, null, null, onJournalStoreComplete);
    }

    /** Stores a command with optional sync and before/after index callbacks. */
    public Location store(JournalCommand<?> data, boolean sync, IndexAware before,Runnable after) throws IOException {
        return store(data, sync, before, after, null);
    }

    /**
     * All updates are funneled through this method. The updates are converted
     * to a JournalMessage which is logged to the journal and then the data from
     * the JournalMessage is used to update the index just like it would be done
     * during a recovery process.
     */
    public Location store(JournalCommand<?> data, boolean sync, IndexAware before, Runnable after, Runnable onJournalStoreComplete) throws IOException {
        try {
            ByteSequence sequence = toByteSequence(data);

            Location location;
            // The checkpoint read lock allows concurrent stores but blocks
            // while a checkpoint (which takes the write lock) is running.
            checkpointLock.readLock().lock();
            try {

                long start = System.currentTimeMillis();
                // Either a plain (possibly synced) write, or an async write with completion callback.
                location = onJournalStoreComplete == null ? journal.write(sequence, sync) : journal.write(sequence, onJournalStoreComplete) ;
                long start2 = System.currentTimeMillis();
                // Apply the same record to the index, exactly as recovery replay would.
                process(data, location, before);

                long end = System.currentTimeMillis();
                if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
                    if (LOG.isInfoEnabled()) {
                        LOG.info("Slow KahaDB access: Journal append took: "+(start2-start)+" ms, Index update took "+(end-start2)+" ms");
                    }
                }

            } finally{
                checkpointLock.readLock().unlock();
            }
            if (after != null) {
                after.run();
            }

            // Restart the checkpoint thread if it died while the store is still open.
            if (checkpointThread != null && !checkpointThread.isAlive() && opened.get()) {
                startCheckpoint();
            }
            return location;
        } catch (IOException ioe) {
            // Let the broker's IO-exception handler decide (e.g. stop the broker),
            // then rethrow to the caller.
            LOG.error("KahaDB failed to store to Journal", ioe);
            brokerService.handleIOException(ioe);
            throw ioe;
        }
    }

    /**
     * Loads a previously stored JournalMessage.
     *
     * @param location the journal location to read
     * @return the decoded journal command
     * @throws IOException if the record cannot be read or its type byte is invalid
     */
    public JournalCommand<?> load(Location location) throws IOException {
        long start = System.currentTimeMillis();
        ByteSequence data = journal.read(location);
        long end = System.currentTimeMillis();
        if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
            if (LOG.isInfoEnabled()) {
                LOG.info("Slow KahaDB access: Journal read took: "+(end-start)+" ms");
            }
        }
        DataByteArrayInputStream is = new DataByteArrayInputStream(data);
        // First byte is the command type written by toByteSequence().
        byte readByte = is.readByte();
        KahaEntryType type = KahaEntryType.valueOf(readByte);
        if( type == null ) {
            try {
                is.close();
            } catch (IOException e) {}
            throw new IOException("Could not load journal record. Invalid location: "+location);
        }
        JournalCommand<?> message = (JournalCommand<?>)type.createMessage();
        message.mergeFramed(is);
        return message;
    }

    /**
     * Performs minimal recovery until we reach the last in-doubt location:
     * records at or after the in-doubt location are fully replayed; records
     * before it only feed the producer-audit duplicate tracker.
     *
     * @param data the decoded journal command
     * @param location where the command was read from
     * @param inDoubtlocation the first location that may not be reflected in the index
     * @throws IOException on index update failure
     */
    void process(JournalCommand<?> data, final Location location, final Location inDoubtlocation) throws IOException {
        if (inDoubtlocation != null && location.compareTo(inDoubtlocation) >= 0) {
            if (data instanceof KahaSubscriptionCommand) {
                KahaSubscriptionCommand kahaSubscriptionCommand = (KahaSubscriptionCommand)data;
                if (kahaSubscriptionCommand.hasSubscriptionInfo()) {
                    // needs to be processed via activate and will be replayed on reconnect
                    LOG.debug("ignoring add sub command during recovery replay:" + data);
                    return;
                }
            }
            process(data, location, (IndexAware) null);
        } else {
            // just recover producer audit
            data.visit(new Visitor() {
                @Override
                public void visit(KahaAddMessageCommand command) throws IOException {
                    metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId());
                }
            });
        }
    }

    // /////////////////////////////////////////////////////////////////
    // Journaled record processing methods. Once the record is journaled,
    // these methods handle applying the index updates.
    // These may be called
    // from the recovery method too so they need to be idempotent
    // /////////////////////////////////////////////////////////////////

    /**
     * Dispatches a journaled command to its type-specific index-update
     * handler via the visitor pattern.
     */
    void process(JournalCommand<?> data, final Location location, final IndexAware onSequenceAssignedCallback) throws IOException {
        data.visit(new Visitor() {
            @Override
            public void visit(KahaAddMessageCommand command) throws IOException {
                process(command, location, onSequenceAssignedCallback);
            }

            @Override
            public void visit(KahaRemoveMessageCommand command) throws IOException {
                process(command, location);
            }

            @Override
            public void visit(KahaPrepareCommand command) throws IOException {
                process(command, location);
            }

            @Override
            public void visit(KahaCommitCommand command) throws IOException {
                process(command, location, onSequenceAssignedCallback);
            }

            @Override
            public void visit(KahaRollbackCommand command) throws IOException {
                process(command, location);
            }

            @Override
            public void visit(KahaRemoveDestinationCommand command) throws IOException {
                process(command, location);
            }

            @Override
            public void visit(KahaSubscriptionCommand command) throws IOException {
                process(command, location);
            }

            @Override
            public void visit(KahaProducerAuditCommand command) throws IOException {
                // Audit snapshots carry no index changes; just advance lastUpdate.
                processLocation(location);
            }

            @Override
            public void visit(KahaAckMessageFileMapCommand command) throws IOException {
                processLocation(location);
            }

            @Override
            public void visit(KahaTraceCommand command) {
                processLocation(location);
            }

            @Override
            public void visit(KahaUpdateMessageCommand command) throws IOException {
                process(command, location);
            }
        });
    }

    /**
     * Applies an add-message command: transactional adds are buffered as an
     * in-flight operation until commit; non-transactional adds update the
     * index immediately under the index write lock.
     */
    @SuppressWarnings("rawtypes")
    protected void process(final KahaAddMessageCommand command, final Location location, final IndexAware runWithIndexLock) throws IOException {
        if (command.hasTransactionInfo()) {
            List<Operation> inflightTx = getInflightTx(command.getTransactionInfo());
            inflightTx.add(new AddOperation(command, location, runWithIndexLock));
        } else {
            this.indexLock.writeLock().lock();
            try {
                pageFile.tx().execute(new Transaction.Closure<IOException>() {
                    @Override
                    public void execute(Transaction tx) throws IOException {
                        long assignedIndex = updateIndex(tx, command, location);
                        // Notify the caller of the assigned sequence while we still hold the lock.
                        if (runWithIndexLock != null) {
                            runWithIndexLock.sequenceAssignedWithIndexLocked(assignedIndex);
                        }
                    }
                });

            } finally {
                this.indexLock.writeLock().unlock();
            }
        }
    }

    /** Applies an update-message command to the index under the index write lock. */
    @SuppressWarnings("rawtypes")
    protected void process(final KahaUpdateMessageCommand command, final Location location) throws IOException {
        this.indexLock.writeLock().lock();
        try {
            pageFile.tx().execute(new Transaction.Closure<IOException>() {
                @Override
                public void execute(Transaction tx) throws IOException {
                    updateIndex(tx, command, location);
                }
            });
        } finally {
            this.indexLock.writeLock().unlock();
        }
    }

    /**
     * Applies a remove-message (ack) command: transactional removes are
     * buffered until commit; otherwise the index is updated immediately.
     */
    @SuppressWarnings("rawtypes")
    protected void process(final KahaRemoveMessageCommand command, final Location location) throws IOException {
        if (command.hasTransactionInfo()) {
            List<Operation> inflightTx = getInflightTx(command.getTransactionInfo());
            inflightTx.add(new RemoveOperation(command, location));
        } else {
            this.indexLock.writeLock().lock();
            try {
                pageFile.tx().execute(new Transaction.Closure<IOException>() {
                    @Override
                    public void execute(Transaction tx) throws IOException {
                        updateIndex(tx, command, location);
                    }
                });
            } finally {
                this.indexLock.writeLock().unlock();
            }
        }
    }

    /** Applies a remove-destination command under the index write lock. */
    protected void process(final KahaRemoveDestinationCommand command, final Location location) throws IOException {
        this.indexLock.writeLock().lock();
        try {
            pageFile.tx().execute(new Transaction.Closure<IOException>() {
                @Override
                public void execute(Transaction tx) throws IOException {
                    updateIndex(tx, command, location);
                }
            });
        } finally {
            this.indexLock.writeLock().unlock();
        }
    }

    /** Applies a subscription add/remove command under the index write lock. */
    protected void process(final KahaSubscriptionCommand command, final Location location) throws IOException {
        this.indexLock.writeLock().lock();
        try {
            pageFile.tx().execute(new Transaction.Closure<IOException>() {
                @Override
                public void execute(Transaction tx) throws IOException {
                    updateIndex(tx, command, location);
                }
            });
        } finally {
            this.indexLock.writeLock().unlock();
        }
    }

    /** Records the location as the last index update without touching any index. */
    protected void processLocation(final Location location) {
        this.indexLock.writeLock().lock();
        try {
            metadata.lastUpdate = location;
        } finally {
            this.indexLock.writeLock().unlock();
        }
    }

    /**
     * Applies a commit: takes the buffered in-flight (or prepared) operations
     * for the transaction and executes them against the index in one page
     * transaction. A commit with no buffered operations means the tx held
     * only non-persistent messages.
     */
    @SuppressWarnings("rawtypes")
    protected void process(KahaCommitCommand command, final Location location, final IndexAware before) throws IOException {
        TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo());
        List<Operation> inflightTx;
        synchronized (inflightTransactions) {
            inflightTx = inflightTransactions.remove(key);
            if (inflightTx == null) {
                inflightTx = preparedTransactions.remove(key);
            }
        }
        if (inflightTx == null) {
            // only non persistent messages in this tx
            if (before != null) {
                before.sequenceAssignedWithIndexLocked(-1);
            }
            return;
        }

        final List<Operation> messagingTx = inflightTx;
        indexLock.writeLock().lock();
        try {
            pageFile.tx().execute(new Transaction.Closure<IOException>() {
                @Override
                public void execute(Transaction tx) throws IOException {
                    for (Operation op : messagingTx) {
                        op.execute(tx);
                    }
                }
            });
            metadata.lastUpdate = location;
        } finally {
            indexLock.writeLock().unlock();
        }
    }

    /** Moves a transaction's buffered operations from in-flight to prepared (XA prepare). */
    @SuppressWarnings("rawtypes")
    protected void process(KahaPrepareCommand command, Location location) {
        TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo());
        synchronized (inflightTransactions) {
            List<Operation> tx = inflightTransactions.remove(key);
            if (tx != null) {
                preparedTransactions.put(key, tx);
            }
        }
    }

    /**
     * Applies a rollback: simply discards the transaction's buffered
     * operations (from either map) so they are never applied to the index.
     */
    @SuppressWarnings("rawtypes")
    protected void process(KahaRollbackCommand command, Location location) throws IOException {
        TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo());
        List<Operation> updates = null;
        synchronized (inflightTransactions) {
            // The removal itself is the effect; the returned list is intentionally unused.
            updates = inflightTransactions.remove(key);
            if (updates == null) {
                updates = preparedTransactions.remove(key);
            }
        }
    }

    // /////////////////////////////////////////////////////////////////
    // These methods do the actual index updates.
    // /////////////////////////////////////////////////////////////////

    // Guards all index reads/writes; checkpoint and store paths both take it.
    protected final ReentrantReadWriteLock indexLock = new ReentrantReadWriteLock();
    // Journal files currently being replicated must not be garbage collected.
    private final HashSet<Integer> journalFilesBeingReplicated = new HashSet<Integer>();

    /**
     * Adds a message to a destination's indexes and returns its assigned
     * sequence id, or -1 when the add is skipped (topic with no subscribers)
     * or rejected as a duplicate.
     */
    long updateIndex(Transaction tx, KahaAddMessageCommand command, Location location) throws IOException {
        StoredDestination sd = getStoredDestination(command.getDestination(), tx);

        // Skip adding the message to the index if this is a topic and there are
        // no subscriptions.
        if (sd.subscriptions != null && sd.subscriptions.isEmpty(tx)) {
            return -1;
        }

        // Add the message.
        int priority = command.getPrioritySupported() ?
                command.getPriority() : javax.jms.Message.DEFAULT_PRIORITY;
        long id = sd.orderIndex.getNextMessageId(priority);
        Long previous = sd.locationIndex.put(tx, location, id);
        if (previous == null) {
            previous = sd.messageIdIndex.put(tx, command.getMessageId(), id);
            if (previous == null) {
                sd.orderIndex.put(tx, priority, id, new MessageKeys(command.getMessageId(), location));
                if (sd.subscriptions != null && !sd.subscriptions.isEmpty(tx)) {
                    addAckLocationForNewMessage(tx, sd, id);
                }
                metadata.lastUpdate = location;
            } else {
                // If the message ID is indexed, then the broker asked us to store a duplicate before the message was dispatched and acked, we ignore this add attempt
                LOG.warn("Duplicate message add attempt rejected. Destination: {}://{}, Message id: {}", command.getDestination().getType(), command.getDestination().getName(), command.getMessageId());
                // Undo the tentative index entries made above.
                sd.messageIdIndex.put(tx, command.getMessageId(), previous);
                sd.locationIndex.remove(tx, location);
                id = -1;
            }
        } else {
            // restore the previous value.. Looks like this was a redo of a previously
            // added message. We don't want to assign it a new id as the other indexes would
            // be wrong..
            sd.locationIndex.put(tx, location, previous);
            metadata.lastUpdate = location;
        }
        // record this id in any event, initial send or recovery
        metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId());
        return id;
    }

    /** Marks a sequence as a pending (in-flight) add for the destination, if known. */
    void trackPendingAdd(KahaDestination destination, Long seq) {
        StoredDestination sd = storedDestinations.get(key(destination));
        if (sd != null) {
            sd.trackPendingAdd(seq);
        }
    }

    /** Clears a previously tracked pending add for the destination, if known. */
    void trackPendingAddComplete(KahaDestination destination, Long seq) {
        StoredDestination sd = storedDestinations.get(key(destination));
        if (sd != null) {
            sd.trackPendingAddComplete(seq);
        }
    }

    /**
     * Replaces an existing message's payload location in the indexes,
     * keeping its sequence id. Rejected (with a warning) when the message
     * id is not already indexed.
     */
    void updateIndex(Transaction tx, KahaUpdateMessageCommand updateMessageCommand, Location location) throws IOException {
        KahaAddMessageCommand command = updateMessageCommand.getMessage();
        StoredDestination sd = getStoredDestination(command.getDestination(), tx);

        Long id = sd.messageIdIndex.get(tx, command.getMessageId());
        if (id != null) {
            MessageKeys previousKeys = sd.orderIndex.put(
                tx,
                command.getPrioritySupported() ? command.getPriority() : javax.jms.Message.DEFAULT_PRIORITY,
                id,
                new MessageKeys(command.getMessageId(), location)
            );
            sd.locationIndex.put(tx, location, id);
            // Drop the stale location entry so it can be garbage collected.
            if(previousKeys != null) {
                sd.locationIndex.remove(tx, previousKeys.location);
            }
            metadata.lastUpdate = location;
        } else {
            LOG.warn("Non existent message update attempt rejected. Destination: {}://{}, Message id: {}", command.getDestination().getType(), command.getDestination().getName(), command.getMessageId());
        }
    }

    /**
     * Applies a message ack. Queues: the message is removed from all three
     * indexes. Topics: the subscription's last-ack is advanced and the
     * message is deleted only once all subscriptions have acked it.
     */
    void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location ackLocation) throws IOException {
        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
        if (!command.hasSubscriptionKey()) {

            // In the queue case we just remove the message from the index..
            Long sequenceId = sd.messageIdIndex.remove(tx, command.getMessageId());
            if (sequenceId != null) {
                MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
                if (keys != null) {
                    sd.locationIndex.remove(tx, keys.location);
                    recordAckMessageReferenceLocation(ackLocation, keys.location);
                    metadata.lastUpdate = ackLocation;
                } else if (LOG.isDebugEnabled()) {
                    LOG.debug("message not found in order index: " + sequenceId + " for: " + command.getMessageId());
                }
            } else if (LOG.isDebugEnabled()) {
                LOG.debug("message not found in sequence id index: " + command.getMessageId());
            }
        } else {
            // In the topic case we need remove the message once it's been acked
            // by all the subs
            Long sequence = sd.messageIdIndex.get(tx, command.getMessageId());

            // Make sure it's a valid message id...
            if (sequence != null) {
                String subscriptionKey = command.getSubscriptionKey();
                if (command.getAck() != UNMATCHED) {
                    // Advance this subscription's last-ack pointer (with priority).
                    sd.orderIndex.get(tx, sequence);
                    byte priority = sd.orderIndex.lastGetPriority();
                    sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(sequence, priority));
                }

                MessageKeys keys = sd.orderIndex.get(tx, sequence);
                if (keys != null) {
                    recordAckMessageReferenceLocation(ackLocation, keys.location);
                }
                // The following method handles deleting un-referenced messages.
1414 removeAckLocation(tx, sd, subscriptionKey, sequence); 1415 metadata.lastUpdate = ackLocation; 1416 } else if (LOG.isDebugEnabled()) { 1417 LOG.debug("no message sequence exists for id: " + command.getMessageId() + " and sub: " + command.getSubscriptionKey()); 1418 } 1419 1420 } 1421 } 1422 1423 private void recordAckMessageReferenceLocation(Location ackLocation, Location messageLocation) { 1424 Set<Integer> referenceFileIds = metadata.ackMessageFileMap.get(Integer.valueOf(ackLocation.getDataFileId())); 1425 if (referenceFileIds == null) { 1426 referenceFileIds = new HashSet<Integer>(); 1427 referenceFileIds.add(messageLocation.getDataFileId()); 1428 metadata.ackMessageFileMap.put(ackLocation.getDataFileId(), referenceFileIds); 1429 } else { 1430 Integer id = Integer.valueOf(messageLocation.getDataFileId()); 1431 if (!referenceFileIds.contains(id)) { 1432 referenceFileIds.add(id); 1433 } 1434 } 1435 } 1436 1437 void updateIndex(Transaction tx, KahaRemoveDestinationCommand command, Location location) throws IOException { 1438 StoredDestination sd = getStoredDestination(command.getDestination(), tx); 1439 sd.orderIndex.remove(tx); 1440 1441 sd.locationIndex.clear(tx); 1442 sd.locationIndex.unload(tx); 1443 tx.free(sd.locationIndex.getPageId()); 1444 1445 sd.messageIdIndex.clear(tx); 1446 sd.messageIdIndex.unload(tx); 1447 tx.free(sd.messageIdIndex.getPageId()); 1448 1449 if (sd.subscriptions != null) { 1450 sd.subscriptions.clear(tx); 1451 sd.subscriptions.unload(tx); 1452 tx.free(sd.subscriptions.getPageId()); 1453 1454 sd.subscriptionAcks.clear(tx); 1455 sd.subscriptionAcks.unload(tx); 1456 tx.free(sd.subscriptionAcks.getPageId()); 1457 1458 sd.ackPositions.clear(tx); 1459 sd.ackPositions.unload(tx); 1460 tx.free(sd.ackPositions.getHeadPageId()); 1461 1462 sd.subLocations.clear(tx); 1463 sd.subLocations.unload(tx); 1464 tx.free(sd.subLocations.getHeadPageId()); 1465 } 1466 1467 String key = key(command.getDestination()); 1468 storedDestinations.remove(key); 
1469 metadata.destinations.remove(tx, key); 1470 } 1471 1472 void updateIndex(Transaction tx, KahaSubscriptionCommand command, Location location) throws IOException { 1473 StoredDestination sd = getStoredDestination(command.getDestination(), tx); 1474 final String subscriptionKey = command.getSubscriptionKey(); 1475 1476 // If set then we are creating it.. otherwise we are destroying the sub 1477 if (command.hasSubscriptionInfo()) { 1478 sd.subscriptions.put(tx, subscriptionKey, command); 1479 sd.subLocations.put(tx, subscriptionKey, location); 1480 long ackLocation=NOT_ACKED; 1481 if (!command.getRetroactive()) { 1482 ackLocation = sd.orderIndex.nextMessageId-1; 1483 } else { 1484 addAckLocationForRetroactiveSub(tx, sd, subscriptionKey); 1485 } 1486 sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(ackLocation)); 1487 sd.subscriptionCache.add(subscriptionKey); 1488 } else { 1489 // delete the sub... 1490 sd.subscriptions.remove(tx, subscriptionKey); 1491 sd.subLocations.remove(tx, subscriptionKey); 1492 sd.subscriptionAcks.remove(tx, subscriptionKey); 1493 sd.subscriptionCache.remove(subscriptionKey); 1494 removeAckLocationsForSub(tx, sd, subscriptionKey); 1495 1496 if (sd.subscriptions.isEmpty(tx)) { 1497 // remove the stored destination 1498 KahaRemoveDestinationCommand removeDestinationCommand = new KahaRemoveDestinationCommand(); 1499 removeDestinationCommand.setDestination(command.getDestination()); 1500 updateIndex(tx, removeDestinationCommand, null); 1501 } 1502 } 1503 } 1504 1505 private void checkpointUpdate(final boolean cleanup) throws IOException { 1506 checkpointLock.writeLock().lock(); 1507 try { 1508 this.indexLock.writeLock().lock(); 1509 try { 1510 pageFile.tx().execute(new Transaction.Closure<IOException>() { 1511 @Override 1512 public void execute(Transaction tx) throws IOException { 1513 checkpointUpdate(tx, cleanup); 1514 } 1515 }); 1516 } finally { 1517 this.indexLock.writeLock().unlock(); 1518 } 1519 1520 } finally { 1521 
            checkpointLock.writeLock().unlock();
        }
    }

    /**
     * Performs the actual checkpoint: persists the audit and ack-map
     * snapshots plus the metadata page, then (when cleanup is requested)
     * computes which journal data files are no longer referenced by any
     * index, subscription, in-progress transaction or ack, and deletes them.
     *
     * @param tx the page transaction to run index reads/writes in
     * @param cleanup when true, garbage-collect unreferenced journal files
     * @throws IOException on journal or index failure
     */
    void checkpointUpdate(Transaction tx, boolean cleanup) throws IOException {
        LOG.debug("Checkpoint started.");

        // reflect last update exclusive of current checkpoint
        Location lastUpdate = metadata.lastUpdate;

        metadata.state = OPEN_STATE;
        metadata.producerSequenceIdTrackerLocation = checkpointProducerAudit();
        metadata.ackMessageFileMapLocation = checkpointAckMessageFileMap();
        Location[] inProgressTxRange = getInProgressTxLocationRange();
        metadata.firstInProgressTransactionLocation = inProgressTxRange[0];
        tx.store(metadata.page, metadataMarshaller, true);
        pageFile.flush();

        if( cleanup ) {

            // Start with every journal file as a GC candidate, then remove
            // each file something still references.
            final TreeSet<Integer> completeFileSet = new TreeSet<Integer>(journal.getFileMap().keySet());
            final TreeSet<Integer> gcCandidateSet = new TreeSet<Integer>(completeFileSet);

            if (LOG.isTraceEnabled()) {
                LOG.trace("Last update: " + lastUpdate + ", full gc candidates set: " + gcCandidateSet);
            }

            // Keep the file holding the most recent index update.
            if (lastUpdate != null) {
                gcCandidateSet.remove(lastUpdate.getDataFileId());
            }

            // Don't GC files under replication
            if( journalFilesBeingReplicated!=null ) {
                gcCandidateSet.removeAll(journalFilesBeingReplicated);
            }

            if (metadata.producerSequenceIdTrackerLocation != null) {
                int dataFileId = metadata.producerSequenceIdTrackerLocation.getDataFileId();
                if (gcCandidateSet.contains(dataFileId) && gcCandidateSet.first() == dataFileId) {
                    // rewrite so we don't prevent gc
                    metadata.producerSequenceIdTracker.setModified(true);
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("rewriting producerSequenceIdTracker:" + metadata.producerSequenceIdTrackerLocation);
                    }
                }
                gcCandidateSet.remove(dataFileId);
                if (LOG.isTraceEnabled()) {
                    LOG.trace("gc candidates after producerSequenceIdTrackerLocation:" + dataFileId + ", " + gcCandidateSet);
                }
            }

            if (metadata.ackMessageFileMapLocation != null) {
                int dataFileId = metadata.ackMessageFileMapLocation.getDataFileId();
                gcCandidateSet.remove(dataFileId);
                if (LOG.isTraceEnabled()) {
                    LOG.trace("gc candidates after ackMessageFileMapLocation:" + dataFileId + ", " + gcCandidateSet);
                }
            }

            // Don't GC files referenced by in-progress tx
            if (inProgressTxRange[0] != null) {
                for (int pendingTx=inProgressTxRange[0].getDataFileId(); pendingTx <= inProgressTxRange[1].getDataFileId(); pendingTx++) {
                    gcCandidateSet.remove(pendingTx);
                }
            }
            if (LOG.isTraceEnabled()) {
                LOG.trace("gc candidates after tx range:" + Arrays.asList(inProgressTxRange) + ", " + gcCandidateSet);
            }

            // Go through all the destinations to see if any of them can remove GC candidates.
            for (Entry<String, StoredDestination> entry : storedDestinations.entrySet()) {
                if( gcCandidateSet.isEmpty() ) {
                    break;
                }

                // Use a visitor to cut down the number of pages that we load
                entry.getValue().locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
                    int last=-1;
                    @Override
                    public boolean isInterestedInKeysBetween(Location first, Location second) {
                        // Only descend into a key range if it could still contain
                        // a candidate file id strictly between the range endpoints
                        // (endpoint file ids are removed by visit() itself).
                        if( first==null ) {
                            SortedSet<Integer> subset = gcCandidateSet.headSet(second.getDataFileId()+1);
                            if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
                                subset.remove(second.getDataFileId());
                            }
                            return !subset.isEmpty();
                        } else if( second==null ) {
                            SortedSet<Integer> subset = gcCandidateSet.tailSet(first.getDataFileId());
                            if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
                                subset.remove(first.getDataFileId());
                            }
                            return !subset.isEmpty();
                        } else {
                            SortedSet<Integer> subset = gcCandidateSet.subSet(first.getDataFileId(), second.getDataFileId()+1);
                            if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
                                subset.remove(first.getDataFileId());
                            }
                            if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
                                subset.remove(second.getDataFileId());
                            }
                            return !subset.isEmpty();
                        }
                    }

                    @Override
                    public void visit(List<Location> keys, List<Long> values) {
                        // Every file that still holds a live message is not a GC candidate.
                        for (Location l : keys) {
                            int fileId = l.getDataFileId();
                            if( last != fileId ) {
                                gcCandidateSet.remove(fileId);
                                last = fileId;
                            }
                        }
                    }
                });

                // Durable Subscription
                if (entry.getValue().subLocations != null) {
                    Iterator<Entry<String, Location>> iter = entry.getValue().subLocations.iterator(tx);
                    while (iter.hasNext()) {
                        Entry<String, Location> subscription = iter.next();
                        int dataFileId = subscription.getValue().getDataFileId();

                        // Move subscription along if it has no outstanding messages that need ack'd
                        // and its in the last log file in the journal.
                        if (!gcCandidateSet.isEmpty() && gcCandidateSet.first() == dataFileId) {
                            final StoredDestination destination = entry.getValue();
                            final String subscriptionKey = subscription.getKey();
                            SequenceSet pendingAcks = destination.ackPositions.get(tx, subscriptionKey);

                            // When pending is size one that is the next message Id meaning there
                            // are no pending messages currently.
                            if (pendingAcks == null || pendingAcks.size() <= 1) {
                                if (LOG.isTraceEnabled()) {
                                    LOG.trace("Found candidate for rewrite: {} from file {}", entry.getKey(), dataFileId);
                                }

                                final KahaSubscriptionCommand kahaSub =
                                    destination.subscriptions.get(tx, subscriptionKey);
                                destination.subLocations.put(
                                    tx, subscriptionKey, checkpointSubscriptionCommand(kahaSub));

                                // Skips the remove from candidates if we rewrote the subscription
                                // in order to prevent duplicate subscription commands on recover.
                                // If another subscription is on the same file and isn't rewritten
                                // than it will remove the file from the set.
                                continue;
                            }
                        }

                        gcCandidateSet.remove(dataFileId);
                    }
                }

                if (LOG.isTraceEnabled()) {
                    LOG.trace("gc candidates after dest:" + entry.getKey() + ", " + gcCandidateSet);
                }
            }

            // check we are not deleting file with ack for in-use journal files
            if (LOG.isTraceEnabled()) {
                LOG.trace("gc candidates: " + gcCandidateSet);
            }
            Iterator<Integer> candidates = gcCandidateSet.iterator();
            while (candidates.hasNext()) {
                Integer candidate = candidates.next();
                Set<Integer> referencedFileIds = metadata.ackMessageFileMap.get(candidate);
                if (referencedFileIds != null) {
                    for (Integer referencedFileId : referencedFileIds) {
                        if (completeFileSet.contains(referencedFileId) && !gcCandidateSet.contains(referencedFileId)) {
                            // active file that is not targeted for deletion is referenced so don't delete
                            candidates.remove();
                            break;
                        }
                    }
                    if (gcCandidateSet.contains(candidate)) {
                        // Candidate survives: its ack references are moot once the file is gone.
                        metadata.ackMessageFileMap.remove(candidate);
                    } else {
                        if (LOG.isTraceEnabled()) {
                            LOG.trace("not removing data file: " + candidate
                                    + " as contained ack(s) refer to referenced file: " + referencedFileIds);
                        }
                    }
                }
            }

            if (!gcCandidateSet.isEmpty()) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Cleanup removing the data files: " + gcCandidateSet);
                }
                journal.removeDataFiles(gcCandidateSet);
            }
        }

        LOG.debug("Checkpoint done.");
    }

    // No-op journal completion callback; lets a disk sync be skipped when
    // enableJournalDiskSyncs = false.
    final Runnable nullCompletionCallback = new Runnable() {
        @Override
        public void run() {
        }
    };

    /**
     * Persists the producer-audit duplicate tracker to the journal when it
     * has been modified since the last snapshot; otherwise returns the
     * previously stored location.
     */
    private Location checkpointProducerAudit() throws IOException {
        if (metadata.producerSequenceIdTracker == null || metadata.producerSequenceIdTracker.modified()) {
            ByteArrayOutputStream baos = new
            ByteArrayOutputStream();
            ObjectOutputStream oout = new ObjectOutputStream(baos);
            oout.writeObject(metadata.producerSequenceIdTracker);
            oout.flush();
            oout.close();
            // using completion callback allows a disk sync to be avoided when enableJournalDiskSyncs = false
            Location location = store(new KahaProducerAuditCommand().setAudit(new Buffer(baos.toByteArray())), nullCompletionCallback);
            try {
                // Wait until the journal write has actually landed.
                location.getLatch().await();
            } catch (InterruptedException e) {
                // NOTE(review): the interrupt flag is not re-asserted here — confirm callers tolerate that.
                throw new InterruptedIOException(e.toString());
            }
            return location;
        }
        return metadata.producerSequenceIdTrackerLocation;
    }

    /**
     * Serializes the ack-to-message file reference map and stores it in the
     * journal, returning the location of the snapshot.
     */
    private Location checkpointAckMessageFileMap() throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        ObjectOutputStream oout = new ObjectOutputStream(baos);
        oout.writeObject(metadata.ackMessageFileMap);
        oout.flush();
        oout.close();
        // using completion callback allows a disk sync to be avoided when enableJournalDiskSyncs = false
        Location location = store(new KahaAckMessageFileMapCommand().setAckMessageFileMap(new Buffer(baos.toByteArray())), nullCompletionCallback);
        try {
            location.getLatch().await();
        } catch (InterruptedException e) {
            throw new InterruptedIOException(e.toString());
        }
        return location;
    }

    /**
     * Rewrites a subscription command at the head of the journal (so older
     * files holding it can be garbage collected) and returns its new location.
     * Note: writes directly to the journal without updating the index.
     */
    private Location checkpointSubscriptionCommand(KahaSubscriptionCommand subscription) throws IOException {

        ByteSequence sequence = toByteSequence(subscription);
        Location location = journal.write(sequence, nullCompletionCallback) ;

        try {
            location.getLatch().await();
        } catch (InterruptedException e) {
            throw new InterruptedIOException(e.toString());
        }
        return location;
    }

    /**
     * @return the live (mutable) set of journal file ids excluded from GC
     *         because they are being replicated.
     */
    public HashSet<Integer> getJournalFilesBeingReplicated() {
        return journalFilesBeingReplicated;
    }

    // /////////////////////////////////////////////////////////////////
// StoredDestination related implementation methods. 1782 // ///////////////////////////////////////////////////////////////// 1783 1784 protected final HashMap<String, StoredDestination> storedDestinations = new HashMap<String, StoredDestination>(); 1785 1786 static class MessageKeys { 1787 final String messageId; 1788 final Location location; 1789 1790 public MessageKeys(String messageId, Location location) { 1791 this.messageId=messageId; 1792 this.location=location; 1793 } 1794 1795 @Override 1796 public String toString() { 1797 return "["+messageId+","+location+"]"; 1798 } 1799 } 1800 1801 static protected class MessageKeysMarshaller extends VariableMarshaller<MessageKeys> { 1802 static final MessageKeysMarshaller INSTANCE = new MessageKeysMarshaller(); 1803 1804 @Override 1805 public MessageKeys readPayload(DataInput dataIn) throws IOException { 1806 return new MessageKeys(dataIn.readUTF(), LocationMarshaller.INSTANCE.readPayload(dataIn)); 1807 } 1808 1809 @Override 1810 public void writePayload(MessageKeys object, DataOutput dataOut) throws IOException { 1811 dataOut.writeUTF(object.messageId); 1812 LocationMarshaller.INSTANCE.writePayload(object.location, dataOut); 1813 } 1814 } 1815 1816 class LastAck { 1817 long lastAckedSequence; 1818 byte priority; 1819 1820 public LastAck(LastAck source) { 1821 this.lastAckedSequence = source.lastAckedSequence; 1822 this.priority = source.priority; 1823 } 1824 1825 public LastAck() { 1826 this.priority = MessageOrderIndex.HI; 1827 } 1828 1829 public LastAck(long ackLocation) { 1830 this.lastAckedSequence = ackLocation; 1831 this.priority = MessageOrderIndex.LO; 1832 } 1833 1834 public LastAck(long ackLocation, byte priority) { 1835 this.lastAckedSequence = ackLocation; 1836 this.priority = priority; 1837 } 1838 1839 @Override 1840 public String toString() { 1841 return "[" + lastAckedSequence + ":" + priority + "]"; 1842 } 1843 } 1844 1845 protected class LastAckMarshaller implements Marshaller<LastAck> { 1846 1847 
@Override 1848 public void writePayload(LastAck object, DataOutput dataOut) throws IOException { 1849 dataOut.writeLong(object.lastAckedSequence); 1850 dataOut.writeByte(object.priority); 1851 } 1852 1853 @Override 1854 public LastAck readPayload(DataInput dataIn) throws IOException { 1855 LastAck lastAcked = new LastAck(); 1856 lastAcked.lastAckedSequence = dataIn.readLong(); 1857 if (metadata.version >= 3) { 1858 lastAcked.priority = dataIn.readByte(); 1859 } 1860 return lastAcked; 1861 } 1862 1863 @Override 1864 public int getFixedSize() { 1865 return 9; 1866 } 1867 1868 @Override 1869 public LastAck deepCopy(LastAck source) { 1870 return new LastAck(source); 1871 } 1872 1873 @Override 1874 public boolean isDeepCopySupported() { 1875 return true; 1876 } 1877 } 1878 1879 class StoredDestination { 1880 1881 MessageOrderIndex orderIndex = new MessageOrderIndex(); 1882 BTreeIndex<Location, Long> locationIndex; 1883 BTreeIndex<String, Long> messageIdIndex; 1884 1885 // These bits are only set for Topics 1886 BTreeIndex<String, KahaSubscriptionCommand> subscriptions; 1887 BTreeIndex<String, LastAck> subscriptionAcks; 1888 HashMap<String, MessageOrderCursor> subscriptionCursors; 1889 ListIndex<String, SequenceSet> ackPositions; 1890 ListIndex<String, Location> subLocations; 1891 1892 // Transient data used to track which Messages are no longer needed. 
1893 final TreeMap<Long, Long> messageReferences = new TreeMap<Long, Long>(); 1894 final HashSet<String> subscriptionCache = new LinkedHashSet<String>(); 1895 1896 public void trackPendingAdd(Long seq) { 1897 orderIndex.trackPendingAdd(seq); 1898 } 1899 1900 public void trackPendingAddComplete(Long seq) { 1901 orderIndex.trackPendingAddComplete(seq); 1902 } 1903 1904 @Override 1905 public String toString() { 1906 return "nextSeq:" + orderIndex.nextMessageId + ",lastRet:" + orderIndex.cursor + ",pending:" + orderIndex.pendingAdditions.size(); 1907 } 1908 } 1909 1910 protected class StoredDestinationMarshaller extends VariableMarshaller<StoredDestination> { 1911 1912 @Override 1913 public StoredDestination readPayload(final DataInput dataIn) throws IOException { 1914 final StoredDestination value = new StoredDestination(); 1915 value.orderIndex.defaultPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong()); 1916 value.locationIndex = new BTreeIndex<Location, Long>(pageFile, dataIn.readLong()); 1917 value.messageIdIndex = new BTreeIndex<String, Long>(pageFile, dataIn.readLong()); 1918 1919 if (dataIn.readBoolean()) { 1920 value.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, dataIn.readLong()); 1921 value.subscriptionAcks = new BTreeIndex<String, LastAck>(pageFile, dataIn.readLong()); 1922 if (metadata.version >= 4) { 1923 value.ackPositions = new ListIndex<String, SequenceSet>(pageFile, dataIn.readLong()); 1924 } else { 1925 // upgrade 1926 pageFile.tx().execute(new Transaction.Closure<IOException>() { 1927 @Override 1928 public void execute(Transaction tx) throws IOException { 1929 LinkedHashMap<String, SequenceSet> temp = new LinkedHashMap<String, SequenceSet>(); 1930 1931 if (metadata.version >= 3) { 1932 // migrate 1933 BTreeIndex<Long, HashSet<String>> oldAckPositions = 1934 new BTreeIndex<Long, HashSet<String>>(pageFile, dataIn.readLong()); 1935 oldAckPositions.setKeyMarshaller(LongMarshaller.INSTANCE); 1936 
oldAckPositions.setValueMarshaller(HashSetStringMarshaller.INSTANCE); 1937 oldAckPositions.load(tx); 1938 1939 1940 // Do the initial build of the data in memory before writing into the store 1941 // based Ack Positions List to avoid a lot of disk thrashing. 1942 Iterator<Entry<Long, HashSet<String>>> iterator = oldAckPositions.iterator(tx); 1943 while (iterator.hasNext()) { 1944 Entry<Long, HashSet<String>> entry = iterator.next(); 1945 1946 for(String subKey : entry.getValue()) { 1947 SequenceSet pendingAcks = temp.get(subKey); 1948 if (pendingAcks == null) { 1949 pendingAcks = new SequenceSet(); 1950 temp.put(subKey, pendingAcks); 1951 } 1952 1953 pendingAcks.add(entry.getKey()); 1954 } 1955 } 1956 } 1957 // Now move the pending messages to ack data into the store backed 1958 // structure. 1959 value.ackPositions = new ListIndex<String, SequenceSet>(pageFile, tx.allocate()); 1960 value.ackPositions.setKeyMarshaller(StringMarshaller.INSTANCE); 1961 value.ackPositions.setValueMarshaller(SequenceSet.Marshaller.INSTANCE); 1962 value.ackPositions.load(tx); 1963 for(String subscriptionKey : temp.keySet()) { 1964 value.ackPositions.put(tx, subscriptionKey, temp.get(subscriptionKey)); 1965 } 1966 1967 } 1968 }); 1969 } 1970 1971 if (metadata.version >= 5) { 1972 value.subLocations = new ListIndex<String, Location>(pageFile, dataIn.readLong()); 1973 } else { 1974 // upgrade 1975 pageFile.tx().execute(new Transaction.Closure<IOException>() { 1976 @Override 1977 public void execute(Transaction tx) throws IOException { 1978 value.subLocations = new ListIndex<String, Location>(pageFile, tx.allocate()); 1979 value.subLocations.setKeyMarshaller(StringMarshaller.INSTANCE); 1980 value.subLocations.setValueMarshaller(LocationMarshaller.INSTANCE); 1981 value.subLocations.load(tx); 1982 } 1983 }); 1984 } 1985 } 1986 if (metadata.version >= 2) { 1987 value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong()); 1988 
value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong()); 1989 } else { 1990 // upgrade 1991 pageFile.tx().execute(new Transaction.Closure<IOException>() { 1992 @Override 1993 public void execute(Transaction tx) throws IOException { 1994 value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate()); 1995 value.orderIndex.lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE); 1996 value.orderIndex.lowPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE); 1997 value.orderIndex.lowPriorityIndex.load(tx); 1998 1999 value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate()); 2000 value.orderIndex.highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE); 2001 value.orderIndex.highPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE); 2002 value.orderIndex.highPriorityIndex.load(tx); 2003 } 2004 }); 2005 } 2006 2007 return value; 2008 } 2009 2010 @Override 2011 public void writePayload(StoredDestination value, DataOutput dataOut) throws IOException { 2012 dataOut.writeLong(value.orderIndex.defaultPriorityIndex.getPageId()); 2013 dataOut.writeLong(value.locationIndex.getPageId()); 2014 dataOut.writeLong(value.messageIdIndex.getPageId()); 2015 if (value.subscriptions != null) { 2016 dataOut.writeBoolean(true); 2017 dataOut.writeLong(value.subscriptions.getPageId()); 2018 dataOut.writeLong(value.subscriptionAcks.getPageId()); 2019 dataOut.writeLong(value.ackPositions.getHeadPageId()); 2020 dataOut.writeLong(value.subLocations.getHeadPageId()); 2021 } else { 2022 dataOut.writeBoolean(false); 2023 } 2024 dataOut.writeLong(value.orderIndex.lowPriorityIndex.getPageId()); 2025 dataOut.writeLong(value.orderIndex.highPriorityIndex.getPageId()); 2026 } 2027 } 2028 2029 static class KahaSubscriptionCommandMarshaller extends VariableMarshaller<KahaSubscriptionCommand> { 2030 final static KahaSubscriptionCommandMarshaller INSTANCE = new 
KahaSubscriptionCommandMarshaller(); 2031 2032 @Override 2033 public KahaSubscriptionCommand readPayload(DataInput dataIn) throws IOException { 2034 KahaSubscriptionCommand rc = new KahaSubscriptionCommand(); 2035 rc.mergeFramed((InputStream)dataIn); 2036 return rc; 2037 } 2038 2039 @Override 2040 public void writePayload(KahaSubscriptionCommand object, DataOutput dataOut) throws IOException { 2041 object.writeFramed((OutputStream)dataOut); 2042 } 2043 } 2044 2045 protected StoredDestination getStoredDestination(KahaDestination destination, Transaction tx) throws IOException { 2046 String key = key(destination); 2047 StoredDestination rc = storedDestinations.get(key); 2048 if (rc == null) { 2049 boolean topic = destination.getType() == KahaDestination.DestinationType.TOPIC || destination.getType() == KahaDestination.DestinationType.TEMP_TOPIC; 2050 rc = loadStoredDestination(tx, key, topic); 2051 // Cache it. We may want to remove/unload destinations from the 2052 // cache that are not used for a while 2053 // to reduce memory usage. 2054 storedDestinations.put(key, rc); 2055 } 2056 return rc; 2057 } 2058 2059 protected StoredDestination getExistingStoredDestination(KahaDestination destination, Transaction tx) throws IOException { 2060 String key = key(destination); 2061 StoredDestination rc = storedDestinations.get(key); 2062 if (rc == null && metadata.destinations.containsKey(tx, key)) { 2063 rc = getStoredDestination(destination, tx); 2064 } 2065 return rc; 2066 } 2067 2068 /** 2069 * @param tx 2070 * @param key 2071 * @param topic 2072 * @return 2073 * @throws IOException 2074 */ 2075 private StoredDestination loadStoredDestination(Transaction tx, String key, boolean topic) throws IOException { 2076 // Try to load the existing indexes.. 2077 StoredDestination rc = metadata.destinations.get(tx, key); 2078 if (rc == null) { 2079 // Brand new destination.. allocate indexes for it. 
2080 rc = new StoredDestination(); 2081 rc.orderIndex.allocate(tx); 2082 rc.locationIndex = new BTreeIndex<Location, Long>(pageFile, tx.allocate()); 2083 rc.messageIdIndex = new BTreeIndex<String, Long>(pageFile, tx.allocate()); 2084 2085 if (topic) { 2086 rc.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, tx.allocate()); 2087 rc.subscriptionAcks = new BTreeIndex<String, LastAck>(pageFile, tx.allocate()); 2088 rc.ackPositions = new ListIndex<String, SequenceSet>(pageFile, tx.allocate()); 2089 rc.subLocations = new ListIndex<String, Location>(pageFile, tx.allocate()); 2090 } 2091 metadata.destinations.put(tx, key, rc); 2092 } 2093 2094 // Configure the marshalers and load. 2095 rc.orderIndex.load(tx); 2096 2097 // Figure out the next key using the last entry in the destination. 2098 rc.orderIndex.configureLast(tx); 2099 2100 rc.locationIndex.setKeyMarshaller(org.apache.activemq.store.kahadb.disk.util.LocationMarshaller.INSTANCE); 2101 rc.locationIndex.setValueMarshaller(LongMarshaller.INSTANCE); 2102 rc.locationIndex.load(tx); 2103 2104 rc.messageIdIndex.setKeyMarshaller(StringMarshaller.INSTANCE); 2105 rc.messageIdIndex.setValueMarshaller(LongMarshaller.INSTANCE); 2106 rc.messageIdIndex.load(tx); 2107 2108 // If it was a topic... 
2109 if (topic) { 2110 2111 rc.subscriptions.setKeyMarshaller(StringMarshaller.INSTANCE); 2112 rc.subscriptions.setValueMarshaller(KahaSubscriptionCommandMarshaller.INSTANCE); 2113 rc.subscriptions.load(tx); 2114 2115 rc.subscriptionAcks.setKeyMarshaller(StringMarshaller.INSTANCE); 2116 rc.subscriptionAcks.setValueMarshaller(new LastAckMarshaller()); 2117 rc.subscriptionAcks.load(tx); 2118 2119 rc.ackPositions.setKeyMarshaller(StringMarshaller.INSTANCE); 2120 rc.ackPositions.setValueMarshaller(SequenceSet.Marshaller.INSTANCE); 2121 rc.ackPositions.load(tx); 2122 2123 rc.subLocations.setKeyMarshaller(StringMarshaller.INSTANCE); 2124 rc.subLocations.setValueMarshaller(LocationMarshaller.INSTANCE); 2125 rc.subLocations.load(tx); 2126 2127 rc.subscriptionCursors = new HashMap<String, MessageOrderCursor>(); 2128 2129 if (metadata.version < 3) { 2130 2131 // on upgrade need to fill ackLocation with available messages past last ack 2132 for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) { 2133 Entry<String, LastAck> entry = iterator.next(); 2134 for (Iterator<Entry<Long, MessageKeys>> orderIterator = 2135 rc.orderIndex.iterator(tx, new MessageOrderCursor(entry.getValue().lastAckedSequence)); orderIterator.hasNext(); ) { 2136 Long sequence = orderIterator.next().getKey(); 2137 addAckLocation(tx, rc, sequence, entry.getKey()); 2138 } 2139 // modify so it is upgraded 2140 rc.subscriptionAcks.put(tx, entry.getKey(), entry.getValue()); 2141 } 2142 } 2143 2144 // Configure the message references index 2145 Iterator<Entry<String, SequenceSet>> subscriptions = rc.ackPositions.iterator(tx); 2146 while (subscriptions.hasNext()) { 2147 Entry<String, SequenceSet> subscription = subscriptions.next(); 2148 SequenceSet pendingAcks = subscription.getValue(); 2149 if (pendingAcks != null && !pendingAcks.isEmpty()) { 2150 Long lastPendingAck = pendingAcks.getTail().getLast(); 2151 for(Long sequenceId : pendingAcks) { 2152 Long current = 
rc.messageReferences.get(sequenceId); 2153 if (current == null) { 2154 current = new Long(0); 2155 } 2156 2157 // We always add a trailing empty entry for the next position to start from 2158 // so we need to ensure we don't count that as a message reference on reload. 2159 if (!sequenceId.equals(lastPendingAck)) { 2160 current = current.longValue() + 1; 2161 } 2162 2163 rc.messageReferences.put(sequenceId, current); 2164 } 2165 } 2166 } 2167 2168 // Configure the subscription cache 2169 for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) { 2170 Entry<String, LastAck> entry = iterator.next(); 2171 rc.subscriptionCache.add(entry.getKey()); 2172 } 2173 2174 if (rc.orderIndex.nextMessageId == 0) { 2175 // check for existing durable sub all acked out - pull next seq from acks as messages are gone 2176 if (!rc.subscriptionAcks.isEmpty(tx)) { 2177 for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext();) { 2178 Entry<String, LastAck> entry = iterator.next(); 2179 rc.orderIndex.nextMessageId = 2180 Math.max(rc.orderIndex.nextMessageId, entry.getValue().lastAckedSequence +1); 2181 } 2182 } 2183 } else { 2184 // update based on ackPositions for unmatched, last entry is always the next 2185 if (!rc.messageReferences.isEmpty()) { 2186 Long nextMessageId = (Long) rc.messageReferences.keySet().toArray()[rc.messageReferences.size() - 1]; 2187 rc.orderIndex.nextMessageId = 2188 Math.max(rc.orderIndex.nextMessageId, nextMessageId); 2189 } 2190 } 2191 } 2192 2193 if (metadata.version < VERSION) { 2194 // store again after upgrade 2195 metadata.destinations.put(tx, key, rc); 2196 } 2197 return rc; 2198 } 2199 2200 private void addAckLocation(Transaction tx, StoredDestination sd, Long messageSequence, String subscriptionKey) throws IOException { 2201 SequenceSet sequences = sd.ackPositions.get(tx, subscriptionKey); 2202 if (sequences == null) { 2203 sequences = new SequenceSet(); 2204 
sequences.add(messageSequence); 2205 sd.ackPositions.add(tx, subscriptionKey, sequences); 2206 } else { 2207 sequences.add(messageSequence); 2208 sd.ackPositions.put(tx, subscriptionKey, sequences); 2209 } 2210 2211 Long count = sd.messageReferences.get(messageSequence); 2212 if (count == null) { 2213 count = Long.valueOf(0L); 2214 } 2215 count = count.longValue() + 1; 2216 sd.messageReferences.put(messageSequence, count); 2217 } 2218 2219 // new sub is interested in potentially all existing messages 2220 private void addAckLocationForRetroactiveSub(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException { 2221 SequenceSet allOutstanding = new SequenceSet(); 2222 Iterator<Map.Entry<String, SequenceSet>> iterator = sd.ackPositions.iterator(tx); 2223 while (iterator.hasNext()) { 2224 SequenceSet set = iterator.next().getValue(); 2225 for (Long entry : set) { 2226 allOutstanding.add(entry); 2227 } 2228 } 2229 sd.ackPositions.put(tx, subscriptionKey, allOutstanding); 2230 2231 for (Long ackPosition : allOutstanding) { 2232 Long count = sd.messageReferences.get(ackPosition); 2233 count = count.longValue() + 1; 2234 sd.messageReferences.put(ackPosition, count); 2235 } 2236 } 2237 2238 // on a new message add, all existing subs are interested in this message 2239 private void addAckLocationForNewMessage(Transaction tx, StoredDestination sd, Long messageSequence) throws IOException { 2240 for(String subscriptionKey : sd.subscriptionCache) { 2241 SequenceSet sequences = sd.ackPositions.get(tx, subscriptionKey); 2242 if (sequences == null) { 2243 sequences = new SequenceSet(); 2244 sequences.add(new Sequence(messageSequence, messageSequence + 1)); 2245 sd.ackPositions.add(tx, subscriptionKey, sequences); 2246 } else { 2247 sequences.add(new Sequence(messageSequence, messageSequence + 1)); 2248 sd.ackPositions.put(tx, subscriptionKey, sequences); 2249 } 2250 2251 Long count = sd.messageReferences.get(messageSequence); 2252 if (count == null) { 2253 
count = Long.valueOf(0L); 2254 } 2255 count = count.longValue() + 1; 2256 sd.messageReferences.put(messageSequence, count); 2257 sd.messageReferences.put(messageSequence+1, Long.valueOf(0L)); 2258 } 2259 } 2260 2261 private void removeAckLocationsForSub(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException { 2262 if (!sd.ackPositions.isEmpty(tx)) { 2263 SequenceSet sequences = sd.ackPositions.remove(tx, subscriptionKey); 2264 if (sequences == null || sequences.isEmpty()) { 2265 return; 2266 } 2267 2268 ArrayList<Long> unreferenced = new ArrayList<Long>(); 2269 2270 for(Long sequenceId : sequences) { 2271 Long references = sd.messageReferences.get(sequenceId); 2272 if (references != null) { 2273 references = references.longValue() - 1; 2274 2275 if (references.longValue() > 0) { 2276 sd.messageReferences.put(sequenceId, references); 2277 } else { 2278 sd.messageReferences.remove(sequenceId); 2279 unreferenced.add(sequenceId); 2280 } 2281 } 2282 } 2283 2284 for(Long sequenceId : unreferenced) { 2285 // Find all the entries that need to get deleted. 2286 ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long, MessageKeys>>(); 2287 sd.orderIndex.getDeleteList(tx, deletes, sequenceId); 2288 2289 // Do the actual deletes. 2290 for (Entry<Long, MessageKeys> entry : deletes) { 2291 sd.locationIndex.remove(tx, entry.getValue().location); 2292 sd.messageIdIndex.remove(tx, entry.getValue().messageId); 2293 sd.orderIndex.remove(tx, entry.getKey()); 2294 } 2295 } 2296 } 2297 } 2298 2299 /** 2300 * @param tx 2301 * @param sd 2302 * @param subscriptionKey 2303 * @param messageSequence 2304 * @throws IOException 2305 */ 2306 private void removeAckLocation(Transaction tx, StoredDestination sd, String subscriptionKey, Long messageSequence) throws IOException { 2307 // Remove the sub from the previous location set.. 
2308 if (messageSequence != null) { 2309 SequenceSet range = sd.ackPositions.get(tx, subscriptionKey); 2310 if (range != null && !range.isEmpty()) { 2311 range.remove(messageSequence); 2312 if (!range.isEmpty()) { 2313 sd.ackPositions.put(tx, subscriptionKey, range); 2314 } else { 2315 sd.ackPositions.remove(tx, subscriptionKey); 2316 } 2317 2318 // Check if the message is reference by any other subscription. 2319 Long count = sd.messageReferences.get(messageSequence); 2320 if (count != null){ 2321 long references = count.longValue() - 1; 2322 if (references > 0) { 2323 sd.messageReferences.put(messageSequence, Long.valueOf(references)); 2324 return; 2325 } else { 2326 sd.messageReferences.remove(messageSequence); 2327 } 2328 } 2329 2330 // Find all the entries that need to get deleted. 2331 ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long, MessageKeys>>(); 2332 sd.orderIndex.getDeleteList(tx, deletes, messageSequence); 2333 2334 // Do the actual deletes. 2335 for (Entry<Long, MessageKeys> entry : deletes) { 2336 sd.locationIndex.remove(tx, entry.getValue().location); 2337 sd.messageIdIndex.remove(tx, entry.getValue().messageId); 2338 sd.orderIndex.remove(tx, entry.getKey()); 2339 } 2340 } 2341 } 2342 } 2343 2344 public LastAck getLastAck(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException { 2345 return sd.subscriptionAcks.get(tx, subscriptionKey); 2346 } 2347 2348 public long getStoredMessageCount(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException { 2349 SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey); 2350 if (messageSequences != null) { 2351 long result = messageSequences.rangeSize(); 2352 // if there's anything in the range the last value is always the nextMessage marker, so remove 1. 2353 return result > 0 ? 
result - 1 : 0; 2354 } 2355 2356 return 0; 2357 } 2358 2359 protected String key(KahaDestination destination) { 2360 return destination.getType().getNumber() + ":" + destination.getName(); 2361 } 2362 2363 // ///////////////////////////////////////////////////////////////// 2364 // Transaction related implementation methods. 2365 // ///////////////////////////////////////////////////////////////// 2366 @SuppressWarnings("rawtypes") 2367 private final LinkedHashMap<TransactionId, List<Operation>> inflightTransactions = new LinkedHashMap<TransactionId, List<Operation>>(); 2368 @SuppressWarnings("rawtypes") 2369 protected final LinkedHashMap<TransactionId, List<Operation>> preparedTransactions = new LinkedHashMap<TransactionId, List<Operation>>(); 2370 protected final Set<String> ackedAndPrepared = new HashSet<String>(); 2371 protected final Set<String> rolledBackAcks = new HashSet<String>(); 2372 2373 // messages that have prepared (pending) acks cannot be re-dispatched unless the outcome is rollback, 2374 // till then they are skipped by the store. 
    // 'at most once' XA guarantee
    /**
     * Records message ids whose acks were recovered inside prepared XA
     * transactions; such messages must not be re-dispatched until the
     * transaction outcome is known. Takes the index write lock.
     */
    public void trackRecoveredAcks(ArrayList<MessageAck> acks) {
        this.indexLock.writeLock().lock();
        try {
            for (MessageAck ack : acks) {
                ackedAndPrepared.add(ack.getLastMessageId().toProducerKey());
            }
        } finally {
            this.indexLock.writeLock().unlock();
        }
    }

    /**
     * Clears previously tracked prepared acks once the transaction outcome is
     * known. On rollback the ids move to rolledBackAcks so they can be
     * re-dispatched. Takes the index write lock.
     */
    public void forgetRecoveredAcks(ArrayList<MessageAck> acks, boolean rollback) throws IOException {
        if (acks != null) {
            this.indexLock.writeLock().lock();
            try {
                for (MessageAck ack : acks) {
                    final String id = ack.getLastMessageId().toProducerKey();
                    ackedAndPrepared.remove(id);
                    if (rollback) {
                        rolledBackAcks.add(id);
                    }
                }
            } finally {
                this.indexLock.writeLock().unlock();
            }
        }
    }

    /**
     * Returns (creating on demand) the in-flight operation list for the
     * transaction identified by the given command info. The returned list is
     * synchronized; map access is guarded by the inflightTransactions monitor.
     */
    @SuppressWarnings("rawtypes")
    private List<Operation> getInflightTx(KahaTransactionInfo info) {
        TransactionId key = TransactionIdConversion.convert(info);
        List<Operation> tx;
        synchronized (inflightTransactions) {
            tx = inflightTransactions.get(key);
            if (tx == null) {
                tx = Collections.synchronizedList(new ArrayList<Operation>());
                inflightTransactions.put(key, tx);
            }
        }
        return tx;
    }

    // Kept for API symmetry; currently unused.
    @SuppressWarnings("unused")
    private TransactionId key(KahaTransactionInfo transactionInfo) {
        return TransactionIdConversion.convert(transactionInfo);
    }

    /**
     * A journalled command plus its journal location; executed against the
     * index when its transaction commits.
     */
    abstract class Operation <T extends JournalCommand<T>> {
        final T command;
        final Location location;

        public Operation(T command, Location location) {
            this.command = command;
            this.location = location;
        }

        public Location getLocation() {
            return location;
        }

        public T getCommand() {
            return command;
        }

        abstract public void execute(Transaction tx) throws IOException;
    }

    /** Deferred message add; notifies the optional IndexAware callback with the assigned sequence. */
    class AddOperation extends Operation<KahaAddMessageCommand> {
        final IndexAware runWithIndexLock;

        public AddOperation(KahaAddMessageCommand command, Location location, IndexAware runWithIndexLock) {
            super(command, location);
            this.runWithIndexLock = runWithIndexLock;
        }

        @Override
        public void execute(Transaction tx) throws IOException {
            long seq = updateIndex(tx, command, location);
            if (runWithIndexLock != null) {
                // Callback runs while the index lock is still held.
                runWithIndexLock.sequenceAssignedWithIndexLocked(seq);
            }
        }

    }

    /** Deferred message removal, applied to the index on commit. */
    class RemoveOperation extends Operation<KahaRemoveMessageCommand> {

        public RemoveOperation(KahaRemoveMessageCommand command, Location location) {
            super(command, location);
        }

        @Override
        public void execute(Transaction tx) throws IOException {
            updateIndex(tx, command, location);
        }
    }

    // /////////////////////////////////////////////////////////////////
    // Initialization related implementation methods.
    // /////////////////////////////////////////////////////////////////

    /** Creates the index PageFile ("db") configured from this store's settings. */
    private PageFile createPageFile() throws IOException {
        if( indexDirectory == null ) {
            // index co-located with the journal unless configured otherwise
            indexDirectory = directory;
        }
        IOHelper.mkdirs(indexDirectory);
        PageFile index = new PageFile(indexDirectory, "db");
        index.setEnableWriteThread(isEnableIndexWriteAsync());
        index.setWriteBatchSize(getIndexWriteBatchSize());
        index.setPageCacheSize(indexCacheSize);
        index.setUseLFRUEviction(isUseIndexLFRUEviction());
        index.setLFUEvictionFactor(getIndexLFUEvictionFactor());
        index.setEnableDiskSyncs(isEnableIndexDiskSyncs());
        index.setEnableRecoveryFile(isEnableIndexRecoveryFile());
        index.setEnablePageCaching(isEnableIndexPageCaching());
        return index;
    }

    /** Creates the journal configured from this store's settings. */
    private Journal createJournal() throws IOException {
        Journal manager = new Journal();
        manager.setDirectory(directory);
        manager.setMaxFileLength(getJournalMaxFileLength());
        manager.setCheckForCorruptionOnStartup(checkForCorruptJournalFiles);
        // corruption checking requires checksums, so either flag enables them
        manager.setChecksum(checksumJournalFiles || checkForCorruptJournalFiles);
        manager.setWriteBatchSize(getJournalMaxWriteBatchSize());
        manager.setArchiveDataLogs(isArchiveDataLogs());
        manager.setSizeAccumulator(journalSize);
        manager.setEnableAsyncDiskSync(isEnableJournalDiskSyncs());
        manager.setPreallocationScope(Journal.PreallocationScope.valueOf(preallocationScope.trim().toUpperCase()));
        manager.setPreallocationStrategy(
                Journal.PreallocationStrategy.valueOf(preallocationStrategy.trim().toUpperCase()));
        if (getDirectoryArchive() != null) {
            IOHelper.mkdirs(getDirectoryArchive());
            manager.setDirectoryArchive(getDirectoryArchive());
        }
        return manager;
    }

    /** Creates fresh Metadata with the producer audit sized from current settings. */
    private Metadata createMetadata() {
        Metadata md = new Metadata();
        md.producerSequenceIdTracker.setAuditDepth(getFailoverProducersAuditDepth());
        md.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(getMaxFailoverProducersToTrack());
        return md;
    }

    // ----- Simple configuration accessors -----

    public int getJournalMaxWriteBatchSize() {
        return journalMaxWriteBatchSize;
    }

    public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
        this.journalMaxWriteBatchSize = journalMaxWriteBatchSize;
    }

    public File getDirectory() {
        return directory;
    }

    public void setDirectory(File directory) {
        this.directory = directory;
    }

    public boolean isDeleteAllMessages() {
        return deleteAllMessages;
    }

    public void setDeleteAllMessages(boolean deleteAllMessages) {
        this.deleteAllMessages = deleteAllMessages;
    }

    public void setIndexWriteBatchSize(int setIndexWriteBatchSize) {
        this.setIndexWriteBatchSize = setIndexWriteBatchSize;
    }

    public int getIndexWriteBatchSize() {
        return setIndexWriteBatchSize;
    }

    public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
        this.enableIndexWriteAsync = enableIndexWriteAsync;
    }

    boolean isEnableIndexWriteAsync() {
        return enableIndexWriteAsync;
    }

    public boolean isEnableJournalDiskSyncs() {
        return enableJournalDiskSyncs;
    }

    public void setEnableJournalDiskSyncs(boolean syncWrites) {
        this.enableJournalDiskSyncs = syncWrites;
    }

    public long getCheckpointInterval() {
        return checkpointInterval;
    }

    public void setCheckpointInterval(long checkpointInterval) {
        this.checkpointInterval = checkpointInterval;
    }

    public long getCleanupInterval() {
        return cleanupInterval;
    }

    public void setCleanupInterval(long cleanupInterval) {
        this.cleanupInterval = cleanupInterval;
    }

    public void setJournalMaxFileLength(int journalMaxFileLength) {
        this.journalMaxFileLength = journalMaxFileLength;
    }

    public int getJournalMaxFileLength() {
        return journalMaxFileLength;
    }

    // Failover producer audit settings delegate to the metadata tracker.

    public void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack) {
        this.metadata.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxFailoverProducersToTrack);
    }

    public int getMaxFailoverProducersToTrack() {
        return this.metadata.producerSequenceIdTracker.getMaximumNumberOfProducersToTrack();
    }

    public void setFailoverProducersAuditDepth(int failoverProducersAuditDepth) {
        this.metadata.producerSequenceIdTracker.setAuditDepth(failoverProducersAuditDepth);
    }

    public int getFailoverProducersAuditDepth() {
        return this.metadata.producerSequenceIdTracker.getAuditDepth();
    }

    /** Lazily creates and returns the index PageFile. */
    public PageFile getPageFile() throws IOException {
        if (pageFile == null) {
            pageFile = createPageFile();
        }
        return pageFile;
    }

    /** Lazily creates and returns the journal. */
    public Journal getJournal() throws IOException {
        if (journal == null) {
            journal = createJournal();
        }
        return journal;
    }

    public boolean isFailIfDatabaseIsLocked() {
        return failIfDatabaseIsLocked;
    }

    public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) {
        this.failIfDatabaseIsLocked = failIfDatabaseIsLocked;
    }

    public boolean isIgnoreMissingJournalfiles() {
        return ignoreMissingJournalfiles;
    }

    public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) {
        this.ignoreMissingJournalfiles = ignoreMissingJournalfiles;
    }

    public int getIndexCacheSize() {
        return indexCacheSize;
    }

    public void setIndexCacheSize(int indexCacheSize) {
        this.indexCacheSize = indexCacheSize;
    }

    public boolean isCheckForCorruptJournalFiles() {
        return checkForCorruptJournalFiles;
    }

    public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) {
        this.checkForCorruptJournalFiles = checkForCorruptJournalFiles;
    }

    public boolean isChecksumJournalFiles() {
        return checksumJournalFiles;
    }

    public void setChecksumJournalFiles(boolean checksumJournalFiles) {
        this.checksumJournalFiles = checksumJournalFiles;
    }

    @Override
    public void setBrokerService(BrokerService brokerService) {
        this.brokerService = brokerService;
    }

    /**
     * @return the archiveDataLogs
     */
    public boolean isArchiveDataLogs() {
        return this.archiveDataLogs;
    }

    /**
     * @param archiveDataLogs the archiveDataLogs to set
     */
    public void setArchiveDataLogs(boolean archiveDataLogs) {
        this.archiveDataLogs = archiveDataLogs;
    }

    /**
     * @return the directoryArchive
     */
    public File getDirectoryArchive() {
        return this.directoryArchive;
    }

    /**
     * @param directoryArchive the directoryArchive to set
     */
    public void setDirectoryArchive(File directoryArchive) {
        this.directoryArchive = directoryArchive;
    }
public boolean isArchiveCorruptedIndex() { 2696 return archiveCorruptedIndex; 2697 } 2698 2699 public void setArchiveCorruptedIndex(boolean archiveCorruptedIndex) { 2700 this.archiveCorruptedIndex = archiveCorruptedIndex; 2701 } 2702 2703 public float getIndexLFUEvictionFactor() { 2704 return indexLFUEvictionFactor; 2705 } 2706 2707 public void setIndexLFUEvictionFactor(float indexLFUEvictionFactor) { 2708 this.indexLFUEvictionFactor = indexLFUEvictionFactor; 2709 } 2710 2711 public boolean isUseIndexLFRUEviction() { 2712 return useIndexLFRUEviction; 2713 } 2714 2715 public void setUseIndexLFRUEviction(boolean useIndexLFRUEviction) { 2716 this.useIndexLFRUEviction = useIndexLFRUEviction; 2717 } 2718 2719 public void setEnableIndexDiskSyncs(boolean enableIndexDiskSyncs) { 2720 this.enableIndexDiskSyncs = enableIndexDiskSyncs; 2721 } 2722 2723 public void setEnableIndexRecoveryFile(boolean enableIndexRecoveryFile) { 2724 this.enableIndexRecoveryFile = enableIndexRecoveryFile; 2725 } 2726 2727 public void setEnableIndexPageCaching(boolean enableIndexPageCaching) { 2728 this.enableIndexPageCaching = enableIndexPageCaching; 2729 } 2730 2731 public boolean isEnableIndexDiskSyncs() { 2732 return enableIndexDiskSyncs; 2733 } 2734 2735 public boolean isEnableIndexRecoveryFile() { 2736 return enableIndexRecoveryFile; 2737 } 2738 2739 public boolean isEnableIndexPageCaching() { 2740 return enableIndexPageCaching; 2741 } 2742 2743 // ///////////////////////////////////////////////////////////////// 2744 // Internal conversion methods. 
2745 // ///////////////////////////////////////////////////////////////// 2746 2747 class MessageOrderCursor{ 2748 long defaultCursorPosition; 2749 long lowPriorityCursorPosition; 2750 long highPriorityCursorPosition; 2751 MessageOrderCursor(){ 2752 } 2753 2754 MessageOrderCursor(long position){ 2755 this.defaultCursorPosition=position; 2756 this.lowPriorityCursorPosition=position; 2757 this.highPriorityCursorPosition=position; 2758 } 2759 2760 MessageOrderCursor(MessageOrderCursor other){ 2761 this.defaultCursorPosition=other.defaultCursorPosition; 2762 this.lowPriorityCursorPosition=other.lowPriorityCursorPosition; 2763 this.highPriorityCursorPosition=other.highPriorityCursorPosition; 2764 } 2765 2766 MessageOrderCursor copy() { 2767 return new MessageOrderCursor(this); 2768 } 2769 2770 void reset() { 2771 this.defaultCursorPosition=0; 2772 this.highPriorityCursorPosition=0; 2773 this.lowPriorityCursorPosition=0; 2774 } 2775 2776 void increment() { 2777 if (defaultCursorPosition!=0) { 2778 defaultCursorPosition++; 2779 } 2780 if (highPriorityCursorPosition!=0) { 2781 highPriorityCursorPosition++; 2782 } 2783 if (lowPriorityCursorPosition!=0) { 2784 lowPriorityCursorPosition++; 2785 } 2786 } 2787 2788 @Override 2789 public String toString() { 2790 return "MessageOrderCursor:[def:" + defaultCursorPosition 2791 + ", low:" + lowPriorityCursorPosition 2792 + ", high:" + highPriorityCursorPosition + "]"; 2793 } 2794 2795 public void sync(MessageOrderCursor other) { 2796 this.defaultCursorPosition=other.defaultCursorPosition; 2797 this.lowPriorityCursorPosition=other.lowPriorityCursorPosition; 2798 this.highPriorityCursorPosition=other.highPriorityCursorPosition; 2799 } 2800 } 2801 2802 class MessageOrderIndex { 2803 static final byte HI = 9; 2804 static final byte LO = 0; 2805 static final byte DEF = 4; 2806 2807 long nextMessageId; 2808 BTreeIndex<Long, MessageKeys> defaultPriorityIndex; 2809 BTreeIndex<Long, MessageKeys> lowPriorityIndex; 2810 BTreeIndex<Long, 
MessageKeys> highPriorityIndex;
        final MessageOrderCursor cursor = new MessageOrderCursor();
        Long lastDefaultKey;
        Long lastHighKey;
        Long lastLowKey;
        byte lastGetPriority;
        // Sequence ids of adds that are in flight but not yet in the index.
        final List<Long> pendingAdditions = new LinkedList<Long>();

        /**
         * Removes {@code key} from whichever priority index contains it.
         * The low index is only consulted when the high index exists — per
         * allocate(), both are created together for metadata version >= 2.
         *
         * @return the removed MessageKeys, or null if no index contained the key.
         */
        MessageKeys remove(Transaction tx, Long key) throws IOException {
            MessageKeys result = defaultPriorityIndex.remove(tx, key);
            if (result == null && highPriorityIndex != null) {
                result = highPriorityIndex.remove(tx, key);
                if (result == null && lowPriorityIndex != null) {
                    result = lowPriorityIndex.remove(tx, key);
                }
            }
            return result;
        }

        /**
         * Installs marshallers and loads all three indexes.
         * NOTE(review): assumes low/high indexes are non-null, i.e. the store
         * was allocated at metadata version >= 2 — confirm against callers.
         */
        void load(Transaction tx) throws IOException {
            defaultPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
            defaultPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
            defaultPriorityIndex.load(tx);
            lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
            lowPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
            lowPriorityIndex.load(tx);
            highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
            highPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
            highPriorityIndex.load(tx);
        }

        /**
         * Allocates pages for the indexes; low/high priority indexes only
         * exist from metadata version 2 onwards.
         */
        void allocate(Transaction tx) throws IOException {
            defaultPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
            if (metadata.version >= 2) {
                lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
                highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
            }
        }

        /** Derives nextMessageId from the largest key present in any priority index. */
        void configureLast(Transaction tx) throws IOException {
            // Figure out the next key using the last entry in the destination.
            TreeSet<Long> orderedSet = new TreeSet<Long>();

            addLast(orderedSet, highPriorityIndex, tx);
            addLast(orderedSet, defaultPriorityIndex, tx);
            addLast(orderedSet, lowPriorityIndex, tx);

            if (!orderedSet.isEmpty()) {
                nextMessageId = orderedSet.last() + 1;
            }
        }

        /** Adds the last key of {@code index} (if the index exists and is non-empty) to the set. */
        private void addLast(TreeSet<Long> orderedSet, BTreeIndex<Long, MessageKeys> index, Transaction tx) throws IOException {
            if (index != null) {
                Entry<Long, MessageKeys> lastEntry = index.getLast(tx);
                if (lastEntry != null) {
                    orderedSet.add(lastEntry.getKey());
                }
            }
        }

        /** Drops and recreates all indexes, resetting cursors — effectively a purge. */
        void clear(Transaction tx) throws IOException {
            this.remove(tx);
            this.resetCursorPosition();
            this.allocate(tx);
            this.load(tx);
            this.configureLast(tx);
        }

        /** Clears, unloads and frees the pages of every existing priority index. */
        void remove(Transaction tx) throws IOException {
            defaultPriorityIndex.clear(tx);
            defaultPriorityIndex.unload(tx);
            tx.free(defaultPriorityIndex.getPageId());
            if (lowPriorityIndex != null) {
                lowPriorityIndex.clear(tx);
                lowPriorityIndex.unload(tx);
                tx.free(lowPriorityIndex.getPageId());
            }
            if (highPriorityIndex != null) {
                highPriorityIndex.clear(tx);
                highPriorityIndex.unload(tx);
                tx.free(highPriorityIndex.getPageId());
            }
        }

        /** Rewinds the cursor and forgets the last keys handed out by iteration. */
        void resetCursorPosition() {
            this.cursor.reset();
            lastDefaultKey = null;
            lastHighKey = null;
            lastLowKey = null;
        }

        /**
         * Positions the cursor just after {@code sequence}, in whichever
         * priority band contains it; falls back to the default band when no
         * index contains the sequence.
         *
         * FIX: guard lowPriorityIndex against null before containsKey — it is
         * only allocated for metadata version >= 2 (see allocate()), and the
         * highPriorityIndex branch above was already guarded. Also avoids the
         * deprecated new Long(...) boxing constructor.
         */
        void setBatch(Transaction tx, Long sequence) throws IOException {
            if (sequence != null) {
                long nextPosition = sequence.longValue() + 1;
                if (defaultPriorityIndex.containsKey(tx, sequence)) {
                    lastDefaultKey = sequence;
                    cursor.defaultCursorPosition = nextPosition;
                } else if (highPriorityIndex != null && highPriorityIndex.containsKey(tx, sequence)) {
                    lastHighKey = sequence;
                    cursor.highPriorityCursorPosition = nextPosition;
                } else if (lowPriorityIndex != null && lowPriorityIndex.containsKey(tx, sequence)) {
                    lastLowKey = sequence;
                    cursor.lowPriorityCursorPosition = nextPosition;
                } else {
                    lastDefaultKey = sequence;
                    cursor.defaultCursorPosition = nextPosition;
                }
            }
        }

        /**
         * Positions the cursor from a recovered LastAck. When the acked
         * sequence was found in no band (all positions still 0), the ack's
         * recorded priority decides which bands start after it.
         */
        void setBatch(Transaction tx, LastAck last) throws IOException {
            setBatch(tx, last.lastAckedSequence);
            if (cursor.defaultCursorPosition == 0
                    && cursor.highPriorityCursorPosition == 0
                    && cursor.lowPriorityCursorPosition == 0) {
                long next = last.lastAckedSequence + 1;
                switch (last.priority) {
                    case DEF:
                        cursor.defaultCursorPosition = next;
                        cursor.highPriorityCursorPosition = next;
                        break;
                    case HI:
                        cursor.highPriorityCursorPosition = next;
                        break;
                    case LO:
                        cursor.lowPriorityCursorPosition = next;
                        cursor.defaultCursorPosition = next;
                        cursor.highPriorityCursorPosition = next;
                        break;
                }
            }
        }

        /**
         * Commits the per-band last-key bookmarks into the cursor once an
         * iteration finishes, then clears the bookmarks.
         */
        void stoppedIterating() {
            if (lastDefaultKey != null) {
                cursor.defaultCursorPosition = lastDefaultKey.longValue() + 1;
            }
            if (lastHighKey != null) {
                cursor.highPriorityCursorPosition = lastHighKey.longValue() + 1;
            }
            if (lastLowKey != null) {
                cursor.lowPriorityCursorPosition = lastLowKey.longValue() + 1;
            }
            lastDefaultKey = null;
            lastHighKey = null;
            lastLowKey = null;
        }

        /** Collects the entry for {@code sequenceId} from whichever band contains it. */
        void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>> deletes, Long sequenceId)
                throws IOException {
            if (defaultPriorityIndex.containsKey(tx, sequenceId)) {
                getDeleteList(tx, deletes, defaultPriorityIndex, sequenceId);
            } else if (highPriorityIndex != null && highPriorityIndex.containsKey(tx, sequenceId)) {
                getDeleteList(tx, deletes, highPriorityIndex, sequenceId);
            } else if (lowPriorityIndex != null && lowPriorityIndex.containsKey(tx, sequenceId)) {
                getDeleteList(tx, deletes, lowPriorityIndex, sequenceId);
            }
        }
void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>> deletes, 2972 BTreeIndex<Long, MessageKeys> index, Long sequenceId) throws IOException { 2973 2974 Iterator<Entry<Long, MessageKeys>> iterator = index.iterator(tx, sequenceId, null); 2975 deletes.add(iterator.next()); 2976 } 2977 2978 long getNextMessageId(int priority) { 2979 return nextMessageId++; 2980 } 2981 2982 MessageKeys get(Transaction tx, Long key) throws IOException { 2983 MessageKeys result = defaultPriorityIndex.get(tx, key); 2984 if (result == null) { 2985 result = highPriorityIndex.get(tx, key); 2986 if (result == null) { 2987 result = lowPriorityIndex.get(tx, key); 2988 lastGetPriority = LO; 2989 } else { 2990 lastGetPriority = HI; 2991 } 2992 } else { 2993 lastGetPriority = DEF; 2994 } 2995 return result; 2996 } 2997 2998 MessageKeys put(Transaction tx, int priority, Long key, MessageKeys value) throws IOException { 2999 if (priority == javax.jms.Message.DEFAULT_PRIORITY) { 3000 return defaultPriorityIndex.put(tx, key, value); 3001 } else if (priority > javax.jms.Message.DEFAULT_PRIORITY) { 3002 return highPriorityIndex.put(tx, key, value); 3003 } else { 3004 return lowPriorityIndex.put(tx, key, value); 3005 } 3006 } 3007 3008 Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx) throws IOException{ 3009 return new MessageOrderIterator(tx,cursor,this); 3010 } 3011 3012 Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx, MessageOrderCursor m) throws IOException{ 3013 return new MessageOrderIterator(tx,m,this); 3014 } 3015 3016 public byte lastGetPriority() { 3017 return lastGetPriority; 3018 } 3019 3020 public boolean alreadyDispatched(Long sequence) { 3021 return (cursor.highPriorityCursorPosition > 0 && cursor.highPriorityCursorPosition >= sequence) || 3022 (cursor.defaultCursorPosition > 0 && cursor.defaultCursorPosition >= sequence) || 3023 (cursor.lowPriorityCursorPosition > 0 && cursor.lowPriorityCursorPosition >= sequence); 3024 } 3025 3026 public void 
trackPendingAdd(Long seq) { 3027 synchronized (pendingAdditions) { 3028 pendingAdditions.add(seq); 3029 } 3030 } 3031 3032 public void trackPendingAddComplete(Long seq) { 3033 synchronized (pendingAdditions) { 3034 pendingAdditions.remove(seq); 3035 } 3036 } 3037 3038 public Long minPendingAdd() { 3039 synchronized (pendingAdditions) { 3040 if (!pendingAdditions.isEmpty()) { 3041 return pendingAdditions.get(0); 3042 } else { 3043 return null; 3044 } 3045 } 3046 } 3047 3048 3049 class MessageOrderIterator implements Iterator<Entry<Long, MessageKeys>>{ 3050 Iterator<Entry<Long, MessageKeys>>currentIterator; 3051 final Iterator<Entry<Long, MessageKeys>>highIterator; 3052 final Iterator<Entry<Long, MessageKeys>>defaultIterator; 3053 final Iterator<Entry<Long, MessageKeys>>lowIterator; 3054 3055 MessageOrderIterator(Transaction tx, MessageOrderCursor m, MessageOrderIndex messageOrderIndex) throws IOException { 3056 Long pendingAddLimiter = messageOrderIndex.minPendingAdd(); 3057 this.defaultIterator = defaultPriorityIndex.iterator(tx, m.defaultCursorPosition, pendingAddLimiter); 3058 if (highPriorityIndex != null) { 3059 this.highIterator = highPriorityIndex.iterator(tx, m.highPriorityCursorPosition, pendingAddLimiter); 3060 } else { 3061 this.highIterator = null; 3062 } 3063 if (lowPriorityIndex != null) { 3064 this.lowIterator = lowPriorityIndex.iterator(tx, m.lowPriorityCursorPosition, pendingAddLimiter); 3065 } else { 3066 this.lowIterator = null; 3067 } 3068 } 3069 3070 @Override 3071 public boolean hasNext() { 3072 if (currentIterator == null) { 3073 if (highIterator != null) { 3074 if (highIterator.hasNext()) { 3075 currentIterator = highIterator; 3076 return currentIterator.hasNext(); 3077 } 3078 if (defaultIterator.hasNext()) { 3079 currentIterator = defaultIterator; 3080 return currentIterator.hasNext(); 3081 } 3082 if (lowIterator.hasNext()) { 3083 currentIterator = lowIterator; 3084 return currentIterator.hasNext(); 3085 } 3086 return false; 3087 } else { 
3088 currentIterator = defaultIterator; 3089 return currentIterator.hasNext(); 3090 } 3091 } 3092 if (highIterator != null) { 3093 if (currentIterator.hasNext()) { 3094 return true; 3095 } 3096 if (currentIterator == highIterator) { 3097 if (defaultIterator.hasNext()) { 3098 currentIterator = defaultIterator; 3099 return currentIterator.hasNext(); 3100 } 3101 if (lowIterator.hasNext()) { 3102 currentIterator = lowIterator; 3103 return currentIterator.hasNext(); 3104 } 3105 return false; 3106 } 3107 3108 if (currentIterator == defaultIterator) { 3109 if (lowIterator.hasNext()) { 3110 currentIterator = lowIterator; 3111 return currentIterator.hasNext(); 3112 } 3113 return false; 3114 } 3115 } 3116 return currentIterator.hasNext(); 3117 } 3118 3119 @Override 3120 public Entry<Long, MessageKeys> next() { 3121 Entry<Long, MessageKeys> result = currentIterator.next(); 3122 if (result != null) { 3123 Long key = result.getKey(); 3124 if (highIterator != null) { 3125 if (currentIterator == defaultIterator) { 3126 lastDefaultKey = key; 3127 } else if (currentIterator == highIterator) { 3128 lastHighKey = key; 3129 } else { 3130 lastLowKey = key; 3131 } 3132 } else { 3133 lastDefaultKey = key; 3134 } 3135 } 3136 return result; 3137 } 3138 3139 @Override 3140 public void remove() { 3141 throw new UnsupportedOperationException(); 3142 } 3143 3144 } 3145 } 3146 3147 private static class HashSetStringMarshaller extends VariableMarshaller<HashSet<String>> { 3148 final static HashSetStringMarshaller INSTANCE = new HashSetStringMarshaller(); 3149 3150 @Override 3151 public void writePayload(HashSet<String> object, DataOutput dataOut) throws IOException { 3152 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 3153 ObjectOutputStream oout = new ObjectOutputStream(baos); 3154 oout.writeObject(object); 3155 oout.flush(); 3156 oout.close(); 3157 byte[] data = baos.toByteArray(); 3158 dataOut.writeInt(data.length); 3159 dataOut.write(data); 3160 } 3161 3162 @Override 3163 
@SuppressWarnings("unchecked") 3164 public HashSet<String> readPayload(DataInput dataIn) throws IOException { 3165 int dataLen = dataIn.readInt(); 3166 byte[] data = new byte[dataLen]; 3167 dataIn.readFully(data); 3168 ByteArrayInputStream bais = new ByteArrayInputStream(data); 3169 ObjectInputStream oin = new ObjectInputStream(bais); 3170 try { 3171 return (HashSet<String>) oin.readObject(); 3172 } catch (ClassNotFoundException cfe) { 3173 IOException ioe = new IOException("Failed to read HashSet<String>: " + cfe); 3174 ioe.initCause(cfe); 3175 throw ioe; 3176 } 3177 } 3178 } 3179 3180 public File getIndexDirectory() { 3181 return indexDirectory; 3182 } 3183 3184 public void setIndexDirectory(File indexDirectory) { 3185 this.indexDirectory = indexDirectory; 3186 } 3187 3188 interface IndexAware { 3189 public void sequenceAssignedWithIndexLocked(long index); 3190 } 3191 3192 public String getPreallocationScope() { 3193 return preallocationScope; 3194 } 3195 3196 public void setPreallocationScope(String preallocationScope) { 3197 this.preallocationScope = preallocationScope; 3198 } 3199 3200 public String getPreallocationStrategy() { 3201 return preallocationStrategy; 3202 } 3203 3204 public void setPreallocationStrategy(String preallocationStrategy) { 3205 this.preallocationStrategy = preallocationStrategy; 3206 } 3207 3208}