001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.kahadb; 018 019import java.io.DataInputStream; 020import java.io.IOException; 021import java.io.InterruptedIOException; 022import java.util.ArrayList; 023import java.util.HashMap; 024import java.util.HashSet; 025import java.util.Iterator; 026import java.util.LinkedList; 027import java.util.List; 028import java.util.Map; 029import java.util.Map.Entry; 030import java.util.Set; 031import java.util.concurrent.BlockingQueue; 032import java.util.concurrent.ExecutorService; 033import java.util.concurrent.FutureTask; 034import java.util.concurrent.LinkedBlockingQueue; 035import java.util.concurrent.Semaphore; 036import java.util.concurrent.ThreadFactory; 037import java.util.concurrent.ThreadPoolExecutor; 038import java.util.concurrent.TimeUnit; 039import java.util.concurrent.TimeoutException; 040import java.util.concurrent.atomic.AtomicBoolean; 041import java.util.concurrent.atomic.AtomicInteger; 042 043import org.apache.activemq.broker.ConnectionContext; 044import org.apache.activemq.broker.region.BaseDestination; 045import org.apache.activemq.broker.scheduler.JobSchedulerStore; 046import org.apache.activemq.command.ActiveMQDestination; 047import org.apache.activemq.command.ActiveMQQueue; 048import org.apache.activemq.command.ActiveMQTempQueue; 049import org.apache.activemq.command.ActiveMQTempTopic; 050import org.apache.activemq.command.ActiveMQTopic; 051import org.apache.activemq.command.Message; 052import org.apache.activemq.command.MessageAck; 053import org.apache.activemq.command.MessageId; 054import org.apache.activemq.command.ProducerId; 055import org.apache.activemq.command.SubscriptionInfo; 056import org.apache.activemq.command.TransactionId; 057import org.apache.activemq.openwire.OpenWireFormat; 058import org.apache.activemq.protobuf.Buffer; 059import org.apache.activemq.store.AbstractMessageStore; 060import org.apache.activemq.store.IndexListener; 061import org.apache.activemq.store.ListenableFuture; 062import org.apache.activemq.store.MessageRecoveryListener; 063import org.apache.activemq.store.MessageStore; 064import org.apache.activemq.store.PersistenceAdapter; 065import org.apache.activemq.store.TopicMessageStore; 066import org.apache.activemq.store.TransactionIdTransformer; 067import org.apache.activemq.store.TransactionStore; 068import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand; 069import org.apache.activemq.store.kahadb.data.KahaDestination; 070import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType; 071import org.apache.activemq.store.kahadb.data.KahaLocation; 072import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand; 073import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand; 074import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand; 075import org.apache.activemq.store.kahadb.data.KahaUpdateMessageCommand; 076import org.apache.activemq.store.kahadb.disk.journal.Location; 077import org.apache.activemq.store.kahadb.disk.page.Transaction; 078import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl; 079import org.apache.activemq.usage.MemoryUsage; 080import org.apache.activemq.usage.SystemUsage; 081import org.apache.activemq.util.ServiceStopper; 082import org.apache.activemq.util.ThreadPoolUtils; 083import org.apache.activemq.wireformat.WireFormat; 084import org.slf4j.Logger; 085import org.slf4j.LoggerFactory; 086 087public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { 088 static final Logger LOG = LoggerFactory.getLogger(KahaDBStore.class); 089 private static final int MAX_ASYNC_JOBS = BaseDestination.MAX_AUDIT_DEPTH; 090 091 public static final String PROPERTY_CANCELED_TASK_MOD_METRIC = "org.apache.activemq.store.kahadb.CANCELED_TASK_MOD_METRIC"; 092 public static final int cancelledTaskModMetric = Integer.parseInt(System.getProperty( 093 PROPERTY_CANCELED_TASK_MOD_METRIC, "0"), 10); 094 public static final String PROPERTY_ASYNC_EXECUTOR_MAX_THREADS = "org.apache.activemq.store.kahadb.ASYNC_EXECUTOR_MAX_THREADS"; 095 private static final int asyncExecutorMaxThreads = Integer.parseInt(System.getProperty( 096 PROPERTY_ASYNC_EXECUTOR_MAX_THREADS, "1"), 10);; 097 098 protected ExecutorService queueExecutor; 099 protected ExecutorService topicExecutor; 100 protected final List<Map<AsyncJobKey, StoreTask>> asyncQueueMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>(); 101 protected final List<Map<AsyncJobKey, StoreTask>> asyncTopicMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>(); 102 final WireFormat wireFormat = new OpenWireFormat(); 103 private SystemUsage usageManager; 104 private LinkedBlockingQueue<Runnable> asyncQueueJobQueue; 105 private LinkedBlockingQueue<Runnable> asyncTopicJobQueue; 106 Semaphore globalQueueSemaphore; 107 Semaphore globalTopicSemaphore; 108 private boolean concurrentStoreAndDispatchQueues = true; 109 // when true, message order may be compromised when cache is exhausted if store is out 110 // or order w.r.t cache 111 private boolean concurrentStoreAndDispatchTopics = false; 112 private final boolean concurrentStoreAndDispatchTransactions = false; 113 private int maxAsyncJobs = MAX_ASYNC_JOBS; 114 private final KahaDBTransactionStore transactionStore; 115 private TransactionIdTransformer transactionIdTransformer; 116 117 public KahaDBStore() { 118 this.transactionStore = new KahaDBTransactionStore(this); 119 this.transactionIdTransformer = new TransactionIdTransformer() { 120 @Override 121 public TransactionId transform(TransactionId txid) { 122 return txid; 123 } 124 }; 125 } 126 127 @Override 128 public String toString() { 129 return "KahaDB:[" + directory.getAbsolutePath() + "]"; 130 } 131 132 @Override 133 public void setBrokerName(String brokerName) { 134 } 135 136 @Override 137 public void setUsageManager(SystemUsage usageManager) { 138 this.usageManager = usageManager; 139 } 140 141 public SystemUsage getUsageManager() { 142 return this.usageManager; 143 } 144 145 /** 146 * @return the concurrentStoreAndDispatch 147 */ 148 public boolean isConcurrentStoreAndDispatchQueues() { 149 return this.concurrentStoreAndDispatchQueues; 150 } 151 152 /** 153 * @param concurrentStoreAndDispatch 154 * the concurrentStoreAndDispatch to set 155 */ 156 public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch) { 157 this.concurrentStoreAndDispatchQueues = concurrentStoreAndDispatch; 158 } 159 160 /** 161 * @return the concurrentStoreAndDispatch 162 */ 163 public boolean isConcurrentStoreAndDispatchTopics() { 164 return this.concurrentStoreAndDispatchTopics; 165 } 166 167 /** 168 * @param concurrentStoreAndDispatch 169 * the concurrentStoreAndDispatch to set 170 */ 171 public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) { 172 this.concurrentStoreAndDispatchTopics = concurrentStoreAndDispatch; 173 } 174 175 public boolean isConcurrentStoreAndDispatchTransactions() { 176 return this.concurrentStoreAndDispatchTransactions; 177 } 178 179 /** 180 * @return the maxAsyncJobs 181 */ 182 public int getMaxAsyncJobs() { 183 return this.maxAsyncJobs; 184 } 185 186 /** 187 * @param maxAsyncJobs 188 * the maxAsyncJobs to set 189 */ 190 public void setMaxAsyncJobs(int maxAsyncJobs) { 191 this.maxAsyncJobs = maxAsyncJobs; 192 } 193 194 @Override 195 public void doStart() throws Exception { 196 if (brokerService != null) { 197 metadata.openwireVersion = brokerService.getStoreOpenWireVersion(); 198 wireFormat.setVersion(metadata.openwireVersion); 199 200 if (LOG.isDebugEnabled()) { 201 LOG.debug("Store OpenWire version configured as: {}", metadata.openwireVersion); 202 } 203 204 } 205 super.doStart(); 206 207 if (brokerService != null) { 208 // In case the recovered store used a different OpenWire version log a warning 209 // to assist in determining why journal reads fail. 210 if (metadata.openwireVersion != brokerService.getStoreOpenWireVersion()) { 211 LOG.warn("Recovered Store uses a different OpenWire version[{}] " + 212 "than the version configured[{}].", 213 metadata.openwireVersion, brokerService.getStoreOpenWireVersion()); 214 } 215 } 216 217 this.globalQueueSemaphore = new Semaphore(getMaxAsyncJobs()); 218 this.globalTopicSemaphore = new Semaphore(getMaxAsyncJobs()); 219 this.asyncQueueJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs()); 220 this.asyncTopicJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs()); 221 this.queueExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS, 222 asyncQueueJobQueue, new ThreadFactory() { 223 @Override 224 public Thread newThread(Runnable runnable) { 225 Thread thread = new Thread(runnable, "ConcurrentQueueStoreAndDispatch"); 226 thread.setDaemon(true); 227 return thread; 228 } 229 }); 230 this.topicExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS, 231 asyncTopicJobQueue, new ThreadFactory() { 232 @Override 233 public Thread newThread(Runnable runnable) { 234 Thread thread = new Thread(runnable, "ConcurrentTopicStoreAndDispatch"); 235 thread.setDaemon(true); 236 return thread; 237 } 238 }); 239 } 240 241 @Override 242 public void doStop(ServiceStopper stopper) throws Exception { 243 // drain down async jobs 244 LOG.info("Stopping async queue tasks"); 245 if (this.globalQueueSemaphore != null) { 246 this.globalQueueSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS); 247 } 248 synchronized (this.asyncQueueMaps) { 249 for (Map<AsyncJobKey, StoreTask> m : asyncQueueMaps) { 250 synchronized (m) { 251 for (StoreTask task : m.values()) { 252 task.cancel(); 253 } 254 } 255 } 256 this.asyncQueueMaps.clear(); 257 } 258 LOG.info("Stopping async topic tasks"); 259 if (this.globalTopicSemaphore != null) { 260 this.globalTopicSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS); 261 } 262 synchronized (this.asyncTopicMaps) { 263 for (Map<AsyncJobKey, StoreTask> m : asyncTopicMaps) { 264 synchronized (m) { 265 for (StoreTask task : m.values()) { 266 task.cancel(); 267 } 268 } 269 } 270 this.asyncTopicMaps.clear(); 271 } 272 if (this.globalQueueSemaphore != null) { 273 this.globalQueueSemaphore.drainPermits(); 274 } 275 if (this.globalTopicSemaphore != null) { 276 this.globalTopicSemaphore.drainPermits(); 277 } 278 if (this.queueExecutor != null) { 279 ThreadPoolUtils.shutdownNow(queueExecutor); 280 queueExecutor = null; 281 } 282 if (this.topicExecutor != null) { 283 ThreadPoolUtils.shutdownNow(topicExecutor); 284 topicExecutor = null; 285 } 286 LOG.info("Stopped KahaDB"); 287 super.doStop(stopper); 288 } 289 290 private Location findMessageLocation(final String key, final KahaDestination destination) throws IOException { 291 return pageFile.tx().execute(new Transaction.CallableClosure<Location, IOException>() { 292 @Override 293 public Location execute(Transaction tx) throws IOException { 294 StoredDestination sd = getStoredDestination(destination, tx); 295 Long sequence = sd.messageIdIndex.get(tx, key); 296 if (sequence == null) { 297 return null; 298 } 299 return sd.orderIndex.get(tx, sequence).location; 300 } 301 }); 302 } 303 304 protected StoreQueueTask removeQueueTask(KahaDBMessageStore store, MessageId id) { 305 StoreQueueTask task = null; 306 synchronized (store.asyncTaskMap) { 307 task = (StoreQueueTask) store.asyncTaskMap.remove(new AsyncJobKey(id, store.getDestination())); 308 } 309 return task; 310 } 311 312 protected void addQueueTask(KahaDBMessageStore store, StoreQueueTask task) throws IOException { 313 synchronized (store.asyncTaskMap) { 314 store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task); 315 } 316 this.queueExecutor.execute(task); 317 } 318 319 protected StoreTopicTask removeTopicTask(KahaDBTopicMessageStore store, MessageId id) { 320 StoreTopicTask task = null; 321 synchronized (store.asyncTaskMap) { 322 task = (StoreTopicTask) store.asyncTaskMap.remove(new AsyncJobKey(id, store.getDestination())); 323 } 324 return task; 325 } 326 327 protected void addTopicTask(KahaDBTopicMessageStore store, StoreTopicTask task) throws IOException { 328 synchronized (store.asyncTaskMap) { 329 store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task); 330 } 331 this.topicExecutor.execute(task); 332 } 333 334 @Override 335 public TransactionStore createTransactionStore() throws IOException { 336 return this.transactionStore; 337 } 338 339 public boolean getForceRecoverIndex() { 340 return this.forceRecoverIndex; 341 } 342 343 public void setForceRecoverIndex(boolean forceRecoverIndex) { 344 this.forceRecoverIndex = forceRecoverIndex; 345 } 346 347 public class KahaDBMessageStore extends AbstractMessageStore { 348 protected final Map<AsyncJobKey, StoreTask> asyncTaskMap = new HashMap<AsyncJobKey, StoreTask>(); 349 protected KahaDestination dest; 350 private final int maxAsyncJobs; 351 private final Semaphore localDestinationSemaphore; 352 353 double doneTasks, canceledTasks = 0; 354 355 public KahaDBMessageStore(ActiveMQDestination destination) { 356 super(destination); 357 this.dest = convert(destination); 358 this.maxAsyncJobs = getMaxAsyncJobs(); 359 this.localDestinationSemaphore = new Semaphore(this.maxAsyncJobs); 360 } 361 362 @Override 363 public ActiveMQDestination getDestination() { 364 return destination; 365 } 366 367 @Override 368 public ListenableFuture<Object> asyncAddQueueMessage(final ConnectionContext context, final Message message) 369 throws IOException { 370 if (isConcurrentStoreAndDispatchQueues()) { 371 StoreQueueTask result = new StoreQueueTask(this, context, message); 372 ListenableFuture<Object> future = result.getFuture(); 373 message.getMessageId().setFutureOrSequenceLong(future); 374 message.setRecievedByDFBridge(true); // flag message as concurrentStoreAndDispatch 375 result.aquireLocks(); 376 addQueueTask(this, result); 377 if (indexListener != null) { 378 // allow concurrent dispatch by setting entry locator, 379 indexListener.onAdd(new IndexListener.MessageContext(context, message, null)); 380 } 381 return future; 382 } else { 383 return super.asyncAddQueueMessage(context, message); 384 } 385 } 386 387 @Override 388 public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException { 389 if (isConcurrentStoreAndDispatchQueues()) { 390 AsyncJobKey key = new AsyncJobKey(ack.getLastMessageId(), getDestination()); 391 StoreQueueTask task = null; 392 synchronized (asyncTaskMap) { 393 task = (StoreQueueTask) asyncTaskMap.get(key); 394 } 395 if (task != null) { 396 if (ack.isInTransaction() || !task.cancel()) { 397 try { 398 task.future.get(); 399 } catch (InterruptedException e) { 400 throw new InterruptedIOException(e.toString()); 401 } catch (Exception ignored) { 402 LOG.debug("removeAsync: cannot cancel, waiting for add resulted in ex", ignored); 403 } 404 removeMessage(context, ack); 405 } else { 406 synchronized (asyncTaskMap) { 407 asyncTaskMap.remove(key); 408 } 409 } 410 } else { 411 removeMessage(context, ack); 412 } 413 } else { 414 removeMessage(context, ack); 415 } 416 } 417 418 @Override 419 public void addMessage(final ConnectionContext context, final Message message) throws IOException { 420 final KahaAddMessageCommand command = new KahaAddMessageCommand(); 421 command.setDestination(dest); 422 command.setMessageId(message.getMessageId().toProducerKey()); 423 command.setTransactionInfo(TransactionIdConversion.convert(transactionIdTransformer.transform(message.getTransactionId()))); 424 command.setPriority(message.getPriority()); 425 command.setPrioritySupported(isPrioritizedMessages()); 426 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message); 427 command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); 428 store(command, isEnableJournalDiskSyncs() && message.isResponseRequired(), new IndexAware() { 429 // sync add? (for async, future present from getFutureOrSequenceLong) 430 Object possibleFuture = message.getMessageId().getFutureOrSequenceLong(); 431 432 @Override 433 public void sequenceAssignedWithIndexLocked(final long sequence) { 434 message.getMessageId().setFutureOrSequenceLong(sequence); 435 if (indexListener != null) { 436 if (possibleFuture == null) { 437 trackPendingAdd(dest, sequence); 438 indexListener.onAdd(new IndexListener.MessageContext(context, message, new Runnable() { 439 @Override 440 public void run() { 441 trackPendingAddComplete(dest, sequence); 442 } 443 })); 444 } 445 } 446 } 447 }, null); 448 } 449 450 @Override 451 public void updateMessage(Message message) throws IOException { 452 if (LOG.isTraceEnabled()) { 453 LOG.trace("updating: " + message.getMessageId() + " with deliveryCount: " + message.getRedeliveryCounter()); 454 } 455 KahaUpdateMessageCommand updateMessageCommand = new KahaUpdateMessageCommand(); 456 KahaAddMessageCommand command = new KahaAddMessageCommand(); 457 command.setDestination(dest); 458 command.setMessageId(message.getMessageId().toProducerKey()); 459 command.setPriority(message.getPriority()); 460 command.setPrioritySupported(prioritizedMessages); 461 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message); 462 command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); 463 updateMessageCommand.setMessage(command); 464 store(updateMessageCommand, isEnableJournalDiskSyncs(), null, null); 465 } 466 467 @Override 468 public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException { 469 KahaRemoveMessageCommand command = new KahaRemoveMessageCommand(); 470 command.setDestination(dest); 471 command.setMessageId(ack.getLastMessageId().toProducerKey()); 472 command.setTransactionInfo(TransactionIdConversion.convert(transactionIdTransformer.transform(ack.getTransactionId()))); 473 474 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(ack); 475 command.setAck(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); 476 store(command, isEnableJournalDiskSyncs() && ack.isResponseRequired(), null, null); 477 } 478 479 @Override 480 public void removeAllMessages(ConnectionContext context) throws IOException { 481 KahaRemoveDestinationCommand command = new KahaRemoveDestinationCommand(); 482 command.setDestination(dest); 483 store(command, true, null, null); 484 } 485 486 @Override 487 public Message getMessage(MessageId identity) throws IOException { 488 final String key = identity.toProducerKey(); 489 490 // Hopefully one day the page file supports concurrent read 491 // operations... but for now we must 492 // externally synchronize... 493 Location location; 494 indexLock.writeLock().lock(); 495 try { 496 location = findMessageLocation(key, dest); 497 } finally { 498 indexLock.writeLock().unlock(); 499 } 500 if (location == null) { 501 return null; 502 } 503 504 return loadMessage(location); 505 } 506 507 @Override 508 public int getMessageCount() throws IOException { 509 try { 510 lockAsyncJobQueue(); 511 indexLock.writeLock().lock(); 512 try { 513 return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() { 514 @Override 515 public Integer execute(Transaction tx) throws IOException { 516 // Iterate through all index entries to get a count 517 // of messages in the destination. 518 StoredDestination sd = getStoredDestination(dest, tx); 519 int rc = 0; 520 for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator.hasNext();) { 521 iterator.next(); 522 rc++; 523 } 524 return rc; 525 } 526 }); 527 } finally { 528 indexLock.writeLock().unlock(); 529 } 530 } finally { 531 unlockAsyncJobQueue(); 532 } 533 } 534 535 @Override 536 public boolean isEmpty() throws IOException { 537 indexLock.writeLock().lock(); 538 try { 539 return pageFile.tx().execute(new Transaction.CallableClosure<Boolean, IOException>() { 540 @Override 541 public Boolean execute(Transaction tx) throws IOException { 542 // Iterate through all index entries to get a count of 543 // messages in the destination. 544 StoredDestination sd = getStoredDestination(dest, tx); 545 return sd.locationIndex.isEmpty(tx); 546 } 547 }); 548 } finally { 549 indexLock.writeLock().unlock(); 550 } 551 } 552 553 @Override 554 public void recover(final MessageRecoveryListener listener) throws Exception { 555 // recovery may involve expiry which will modify 556 indexLock.writeLock().lock(); 557 try { 558 pageFile.tx().execute(new Transaction.Closure<Exception>() { 559 @Override 560 public void execute(Transaction tx) throws Exception { 561 StoredDestination sd = getStoredDestination(dest, tx); 562 recoverRolledBackAcks(sd, tx, Integer.MAX_VALUE, listener); 563 sd.orderIndex.resetCursorPosition(); 564 for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); listener.hasSpace() && iterator 565 .hasNext(); ) { 566 Entry<Long, MessageKeys> entry = iterator.next(); 567 if (ackedAndPrepared.contains(entry.getValue().messageId)) { 568 continue; 569 } 570 Message msg = loadMessage(entry.getValue().location); 571 listener.recoverMessage(msg); 572 } 573 } 574 }); 575 } finally { 576 indexLock.writeLock().unlock(); 577 } 578 } 579 580 @Override 581 public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception { 582 indexLock.writeLock().lock(); 583 try { 584 pageFile.tx().execute(new Transaction.Closure<Exception>() { 585 @Override 586 public void execute(Transaction tx) throws Exception { 587 StoredDestination sd = getStoredDestination(dest, tx); 588 Entry<Long, MessageKeys> entry = null; 589 int counter = recoverRolledBackAcks(sd, tx, maxReturned, listener); 590 for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext(); ) { 591 entry = iterator.next(); 592 if (ackedAndPrepared.contains(entry.getValue().messageId)) { 593 continue; 594 } 595 Message msg = loadMessage(entry.getValue().location); 596 msg.getMessageId().setFutureOrSequenceLong(entry.getKey()); 597 listener.recoverMessage(msg); 598 counter++; 599 if (counter >= maxReturned) { 600 break; 601 } 602 } 603 sd.orderIndex.stoppedIterating(); 604 } 605 }); 606 } finally { 607 indexLock.writeLock().unlock(); 608 } 609 } 610 611 protected int recoverRolledBackAcks(StoredDestination sd, Transaction tx, int maxReturned, MessageRecoveryListener listener) throws Exception { 612 int counter = 0; 613 String id; 614 for (Iterator<String> iterator = rolledBackAcks.iterator(); iterator.hasNext(); ) { 615 id = iterator.next(); 616 iterator.remove(); 617 Long sequence = sd.messageIdIndex.get(tx, id); 618 if (sequence != null) { 619 if (sd.orderIndex.alreadyDispatched(sequence)) { 620 listener.recoverMessage(loadMessage(sd.orderIndex.get(tx, sequence).location)); 621 counter++; 622 if (counter >= maxReturned) { 623 break; 624 } 625 } else { 626 LOG.info("rolledback ack message {} with seq {} will be picked up in future batch {}", id, sequence, sd.orderIndex.cursor); 627 } 628 } else { 629 LOG.warn("Failed to locate rolled back ack message {} in {}", id, sd); 630 } 631 } 632 return counter; 633 } 634 635 636 @Override 637 public void resetBatching() { 638 if (pageFile.isLoaded()) { 639 indexLock.writeLock().lock(); 640 try { 641 pageFile.tx().execute(new Transaction.Closure<Exception>() { 642 @Override 643 public void execute(Transaction tx) throws Exception { 644 StoredDestination sd = getExistingStoredDestination(dest, tx); 645 if (sd != null) { 646 sd.orderIndex.resetCursorPosition();} 647 } 648 }); 649 } catch (Exception e) { 650 LOG.error("Failed to reset batching",e); 651 } finally { 652 indexLock.writeLock().unlock(); 653 } 654 } 655 } 656 657 @Override 658 public void setBatch(final MessageId identity) throws IOException { 659 indexLock.writeLock().lock(); 660 try { 661 pageFile.tx().execute(new Transaction.Closure<IOException>() { 662 @Override 663 public void execute(Transaction tx) throws IOException { 664 StoredDestination sd = getStoredDestination(dest, tx); 665 Long location = (Long) identity.getFutureOrSequenceLong(); 666 Long pending = sd.orderIndex.minPendingAdd(); 667 if (pending != null) { 668 location = Math.min(location, pending-1); 669 } 670 sd.orderIndex.setBatch(tx, location); 671 } 672 }); 673 } finally { 674 indexLock.writeLock().unlock(); 675 } 676 } 677 678 @Override 679 public void setMemoryUsage(MemoryUsage memoryUsage) { 680 } 681 @Override 682 public void start() throws Exception { 683 super.start(); 684 } 685 @Override 686 public void stop() throws Exception { 687 super.stop(); 688 } 689 690 protected void lockAsyncJobQueue() { 691 try { 692 if (!this.localDestinationSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS)) { 693 throw new TimeoutException(this +" timeout waiting for localDestSem:" + this.localDestinationSemaphore); 694 } 695 } catch (Exception e) { 696 LOG.error("Failed to lock async jobs for " + this.destination, e); 697 } 698 } 699 700 protected void unlockAsyncJobQueue() { 701 this.localDestinationSemaphore.release(this.maxAsyncJobs); 702 } 703 704 protected void acquireLocalAsyncLock() { 705 try { 706 this.localDestinationSemaphore.acquire(); 707 } catch (InterruptedException e) { 708 LOG.error("Failed to aquire async lock for " + this.destination, e); 709 } 710 } 711 712 protected void releaseLocalAsyncLock() { 713 this.localDestinationSemaphore.release(); 714 } 715 716 @Override 717 public String toString(){ 718 return "permits:" + this.localDestinationSemaphore.availablePermits() + ",sd=" + storedDestinations.get(key(dest)); 719 } 720 } 721 722 class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore { 723 private final AtomicInteger subscriptionCount = new AtomicInteger(); 724 public KahaDBTopicMessageStore(ActiveMQTopic destination) throws IOException { 725 super(destination); 726 this.subscriptionCount.set(getAllSubscriptions().length); 727 if (isConcurrentStoreAndDispatchTopics()) { 728 asyncTopicMaps.add(asyncTaskMap); 729 } 730 } 731 732 @Override 733 public ListenableFuture<Object> asyncAddTopicMessage(final ConnectionContext context, final Message message) 734 throws IOException { 735 if (isConcurrentStoreAndDispatchTopics()) { 736 StoreTopicTask result = new StoreTopicTask(this, context, message, subscriptionCount.get()); 737 result.aquireLocks(); 738 addTopicTask(this, result); 739 return result.getFuture(); 740 } else { 741 return super.asyncAddTopicMessage(context, message); 742 } 743 } 744 745 @Override 746 public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, 747 MessageId messageId, MessageAck ack) throws IOException { 748 String subscriptionKey = subscriptionKey(clientId, subscriptionName).toString(); 749 if (isConcurrentStoreAndDispatchTopics()) { 750 AsyncJobKey key = new AsyncJobKey(messageId, getDestination()); 751 StoreTopicTask task = null; 752 synchronized (asyncTaskMap) { 753 task = (StoreTopicTask) asyncTaskMap.get(key); 754 } 755 if (task != null) { 756 if (task.addSubscriptionKey(subscriptionKey)) { 757 removeTopicTask(this, messageId); 758 if (task.cancel()) { 759 synchronized (asyncTaskMap) { 760 asyncTaskMap.remove(key); 761 } 762 } 763 } 764 } else { 765 doAcknowledge(context, subscriptionKey, messageId, ack); 766 } 767 } else { 768 doAcknowledge(context, subscriptionKey, messageId, ack); 769 } 770 } 771 772 protected void doAcknowledge(ConnectionContext context, String subscriptionKey, MessageId messageId, MessageAck ack) 773 throws IOException { 774 KahaRemoveMessageCommand command = new KahaRemoveMessageCommand(); 775 command.setDestination(dest); 776 command.setSubscriptionKey(subscriptionKey); 777 command.setMessageId(messageId.toProducerKey()); 778 command.setTransactionInfo(ack != null ? TransactionIdConversion.convert(transactionIdTransformer.transform(ack.getTransactionId())) : null); 779 if (ack != null && ack.isUnmatchedAck()) { 780 command.setAck(UNMATCHED); 781 } else { 782 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(ack); 783 command.setAck(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); 784 } 785 store(command, false, null, null); 786 } 787 788 @Override 789 public void addSubscription(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException { 790 String subscriptionKey = subscriptionKey(subscriptionInfo.getClientId(), subscriptionInfo 791 .getSubscriptionName()); 792 KahaSubscriptionCommand command = new KahaSubscriptionCommand(); 793 command.setDestination(dest); 794 command.setSubscriptionKey(subscriptionKey.toString()); 795 command.setRetroactive(retroactive); 796 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(subscriptionInfo); 797 command.setSubscriptionInfo(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); 798 store(command, isEnableJournalDiskSyncs() && true, null, null); 799 this.subscriptionCount.incrementAndGet(); 800 } 801 802 @Override 803 public void deleteSubscription(String clientId, String subscriptionName) throws IOException { 804 KahaSubscriptionCommand command = new KahaSubscriptionCommand(); 805 command.setDestination(dest); 806 command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName).toString()); 807 store(command, isEnableJournalDiskSyncs() && true, null, null); 808 this.subscriptionCount.decrementAndGet(); 809 } 810 811 @Override 812 public SubscriptionInfo[] getAllSubscriptions() throws IOException { 813 814 final ArrayList<SubscriptionInfo> subscriptions = new ArrayList<SubscriptionInfo>(); 815 indexLock.writeLock().lock(); 816 try { 817 pageFile.tx().execute(new Transaction.Closure<IOException>() { 818 @Override 819 public void execute(Transaction tx) throws IOException { 820 StoredDestination sd = getStoredDestination(dest, tx); 821 for (Iterator<Entry<String, KahaSubscriptionCommand>> iterator = sd.subscriptions.iterator(tx); iterator 822 .hasNext();) { 823 Entry<String, KahaSubscriptionCommand> entry = iterator.next(); 824 SubscriptionInfo info = (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(entry 825 .getValue().getSubscriptionInfo().newInput())); 826 subscriptions.add(info); 827 828 } 829 } 830 }); 831 } finally { 832 indexLock.writeLock().unlock(); 833 } 834 835 SubscriptionInfo[] rc = new SubscriptionInfo[subscriptions.size()]; 836 subscriptions.toArray(rc); 837 return rc; 838 } 839 840 @Override 841 public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException { 842 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 843 indexLock.writeLock().lock(); 844 try { 845 return pageFile.tx().execute(new Transaction.CallableClosure<SubscriptionInfo, IOException>() { 846 @Override 847 public SubscriptionInfo execute(Transaction tx) throws IOException { 848 StoredDestination sd = getStoredDestination(dest, tx); 849 KahaSubscriptionCommand command = sd.subscriptions.get(tx, subscriptionKey); 850 if (command == null) { 851 return null; 852 } 853 return (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(command 854 .getSubscriptionInfo().newInput())); 855 } 856 }); 857 } finally { 858 indexLock.writeLock().unlock(); 859 } 860 } 861 862 @Override 863 public int getMessageCount(String clientId, String subscriptionName) throws IOException { 864 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 865 indexLock.writeLock().lock(); 866 try { 867 return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() { 868 @Override 869 public Integer execute(Transaction tx) throws IOException { 870 StoredDestination sd = getStoredDestination(dest, tx); 871 LastAck cursorPos = getLastAck(tx, sd, subscriptionKey); 872 if (cursorPos == null) { 873 // The subscription might not exist. 874 return 0; 875 } 876 877 return (int) getStoredMessageCount(tx, sd, subscriptionKey); 878 } 879 }); 880 } finally { 881 indexLock.writeLock().unlock(); 882 } 883 } 884 885 @Override 886 public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) 887 throws Exception { 888 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 889 @SuppressWarnings("unused") 890 final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName); 891 indexLock.writeLock().lock(); 892 try { 893 pageFile.tx().execute(new Transaction.Closure<Exception>() { 894 @Override 895 public void execute(Transaction tx) throws Exception { 896 StoredDestination sd = getStoredDestination(dest, tx); 897 LastAck cursorPos = getLastAck(tx, sd, subscriptionKey); 898 sd.orderIndex.setBatch(tx, cursorPos); 899 recoverRolledBackAcks(sd, tx, Integer.MAX_VALUE, listener); 900 for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator 901 .hasNext();) { 902 Entry<Long, MessageKeys> entry = iterator.next(); 903 if (ackedAndPrepared.contains(entry.getValue().messageId)) { 904 continue; 905 } 906 listener.recoverMessage(loadMessage(entry.getValue().location)); 907 } 908 sd.orderIndex.resetCursorPosition(); 909 } 910 }); 911 } finally { 912 indexLock.writeLock().unlock(); 913 } 914 } 915 916 @Override 917 public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned, 918 final MessageRecoveryListener listener) throws Exception { 919 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 920 @SuppressWarnings("unused") 921 final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName); 922 indexLock.writeLock().lock(); 923 try { 924 pageFile.tx().execute(new Transaction.Closure<Exception>() { 925 @Override 926 public void execute(Transaction tx) throws Exception { 927 StoredDestination sd = getStoredDestination(dest, tx); 928 sd.orderIndex.resetCursorPosition(); 929 MessageOrderCursor moc = sd.subscriptionCursors.get(subscriptionKey); 930 if (moc == null) { 931 LastAck pos = getLastAck(tx, sd, subscriptionKey); 932 if (pos == null) { 933 // sub deleted 934 return; 935 } 936 sd.orderIndex.setBatch(tx, pos); 937 moc = sd.orderIndex.cursor; 938 } else { 939 sd.orderIndex.cursor.sync(moc); 940 } 941 942 Entry<Long, MessageKeys> entry = null; 943 int counter = recoverRolledBackAcks(sd, tx, maxReturned, listener); 944 for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, moc); iterator 945 .hasNext();) { 946 entry = iterator.next(); 947 if (ackedAndPrepared.contains(entry.getValue().messageId)) { 948 continue; 949 } 950 if (listener.recoverMessage(loadMessage(entry.getValue().location))) { 951 counter++; 952 } 953 if (counter >= maxReturned || listener.hasSpace() == false) { 954 break; 955 } 956 } 957 sd.orderIndex.stoppedIterating(); 958 if (entry != null) { 959 MessageOrderCursor copy = sd.orderIndex.cursor.copy(); 960 sd.subscriptionCursors.put(subscriptionKey, copy); 961 } 962 } 963 }); 964 } finally { 965 indexLock.writeLock().unlock(); 966 } 967 } 968 969 @Override 970 public void resetBatching(String clientId, String subscriptionName) { 971 try { 972 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 973 indexLock.writeLock().lock(); 974 try { 975 pageFile.tx().execute(new Transaction.Closure<IOException>() { 976 @Override 977 public void execute(Transaction tx) throws IOException { 978 StoredDestination sd = getStoredDestination(dest, tx); 979 sd.subscriptionCursors.remove(subscriptionKey); 980 } 981 }); 982 }finally { 983 indexLock.writeLock().unlock(); 984 } 985 } catch (IOException e) { 986 throw new RuntimeException(e); 987 } 988 } 989 } 990 991 String subscriptionKey(String clientId, String subscriptionName) { 992 return clientId + ":" + subscriptionName; 993 } 994 995 @Override 996 public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException { 997 return this.transactionStore.proxy(new KahaDBMessageStore(destination)); 998 } 999 1000 @Override 1001 public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException { 1002 return this.transactionStore.proxy(new KahaDBTopicMessageStore(destination)); 1003 } 1004 1005 /** 1006 * Cleanup method to remove any state associated with the given destination. 1007 * This method does not stop the message store (it might not be cached). 1008 * 1009 * @param destination 1010 * Destination to forget 1011 */ 1012 @Override 1013 public void removeQueueMessageStore(ActiveMQQueue destination) { 1014 } 1015 1016 /** 1017 * Cleanup method to remove any state associated with the given destination 1018 * This method does not stop the message store (it might not be cached). 1019 * 1020 * @param destination 1021 * Destination to forget 1022 */ 1023 @Override 1024 public void removeTopicMessageStore(ActiveMQTopic destination) { 1025 } 1026 1027 @Override 1028 public void deleteAllMessages() throws IOException { 1029 deleteAllMessages = true; 1030 } 1031 1032 @Override 1033 public Set<ActiveMQDestination> getDestinations() { 1034 try { 1035 final HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>(); 1036 indexLock.writeLock().lock(); 1037 try { 1038 pageFile.tx().execute(new Transaction.Closure<IOException>() { 1039 @Override 1040 public void execute(Transaction tx) throws IOException { 1041 for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator 1042 .hasNext();) { 1043 Entry<String, StoredDestination> entry = iterator.next(); 1044 if (!isEmptyTopic(entry, tx)) { 1045 rc.add(convert(entry.getKey())); 1046 } 1047 } 1048 } 1049 1050 private boolean isEmptyTopic(Entry<String, StoredDestination> entry, Transaction tx) 1051 throws IOException { 1052 boolean isEmptyTopic = false; 1053 ActiveMQDestination dest = convert(entry.getKey()); 1054 if (dest.isTopic()) { 1055 StoredDestination loadedStore = getStoredDestination(convert(dest), tx); 1056 if (loadedStore.subscriptionAcks.isEmpty(tx)) { 1057 isEmptyTopic = true; 1058 } 1059 } 1060 return isEmptyTopic; 1061 } 1062 }); 1063 }finally { 1064 indexLock.writeLock().unlock(); 1065 } 1066 return rc; 1067 } catch (IOException e) { 1068 throw new RuntimeException(e); 1069 } 1070 } 1071 1072 @Override 1073 public long getLastMessageBrokerSequenceId() throws IOException { 1074 return 0; 1075 } 1076 1077 @Override 1078 public long getLastProducerSequenceId(ProducerId id) { 1079 indexLock.readLock().lock(); 1080 try { 1081 return metadata.producerSequenceIdTracker.getLastSeqId(id); 1082 } finally { 1083 indexLock.readLock().unlock(); 1084 } 1085 } 1086 1087 @Override 1088 public long size() { 1089 try { 1090 return journalSize.get() + getPageFile().getDiskSize(); 1091 } catch (IOException e) { 1092 throw new RuntimeException(e); 1093 } 1094 } 1095 1096 @Override 1097 public void beginTransaction(ConnectionContext context) throws IOException { 1098 throw new IOException("Not yet implemented."); 1099 } 1100 @Override 1101 public void commitTransaction(ConnectionContext context) throws IOException { 1102 throw new IOException("Not yet implemented."); 1103 } 1104 @Override 1105 public void rollbackTransaction(ConnectionContext context) throws IOException { 1106 throw new IOException("Not yet implemented."); 1107 } 1108 1109 @Override 1110 public void checkpoint(boolean sync) throws IOException { 1111 super.checkpointCleanup(sync); 1112 } 1113 1114 // ///////////////////////////////////////////////////////////////// 1115 // Internal helper methods. 1116 // ///////////////////////////////////////////////////////////////// 1117 1118 /** 1119 * @param location 1120 * @return 1121 * @throws IOException 1122 */ 1123 Message loadMessage(Location location) throws IOException { 1124 JournalCommand<?> command = load(location); 1125 KahaAddMessageCommand addMessage = null; 1126 switch (command.type()) { 1127 case KAHA_UPDATE_MESSAGE_COMMAND: 1128 addMessage = ((KahaUpdateMessageCommand)command).getMessage(); 1129 break; 1130 default: 1131 addMessage = (KahaAddMessageCommand) command; 1132 } 1133 Message msg = (Message) wireFormat.unmarshal(new DataInputStream(addMessage.getMessage().newInput())); 1134 return msg; 1135 } 1136 1137 // ///////////////////////////////////////////////////////////////// 1138 // Internal conversion methods. 1139 // ///////////////////////////////////////////////////////////////// 1140 1141 KahaLocation convert(Location location) { 1142 KahaLocation rc = new KahaLocation(); 1143 rc.setLogId(location.getDataFileId()); 1144 rc.setOffset(location.getOffset()); 1145 return rc; 1146 } 1147 1148 KahaDestination convert(ActiveMQDestination dest) { 1149 KahaDestination rc = new KahaDestination(); 1150 rc.setName(dest.getPhysicalName()); 1151 switch (dest.getDestinationType()) { 1152 case ActiveMQDestination.QUEUE_TYPE: 1153 rc.setType(DestinationType.QUEUE); 1154 return rc; 1155 case ActiveMQDestination.TOPIC_TYPE: 1156 rc.setType(DestinationType.TOPIC); 1157 return rc; 1158 case ActiveMQDestination.TEMP_QUEUE_TYPE: 1159 rc.setType(DestinationType.TEMP_QUEUE); 1160 return rc; 1161 case ActiveMQDestination.TEMP_TOPIC_TYPE: 1162 rc.setType(DestinationType.TEMP_TOPIC); 1163 return rc; 1164 default: 1165 return null; 1166 } 1167 } 1168 1169 ActiveMQDestination convert(String dest) { 1170 int p = dest.indexOf(":"); 1171 if (p < 0) { 1172 throw new IllegalArgumentException("Not in the valid destination format"); 1173 } 1174 int type = Integer.parseInt(dest.substring(0, p)); 1175 String name = dest.substring(p + 1); 1176 return convert(type, name); 1177 } 1178 1179 private ActiveMQDestination convert(KahaDestination commandDestination) { 1180 return convert(commandDestination.getType().getNumber(), commandDestination.getName()); 1181 } 1182 1183 private ActiveMQDestination convert(int type, String name) { 1184 switch (KahaDestination.DestinationType.valueOf(type)) { 1185 case QUEUE: 1186 return new ActiveMQQueue(name); 1187 case TOPIC: 1188 return new ActiveMQTopic(name); 1189 case TEMP_QUEUE: 1190 return new ActiveMQTempQueue(name); 1191 case TEMP_TOPIC: 1192 return new ActiveMQTempTopic(name); 1193 default: 1194 throw new IllegalArgumentException("Not in the valid destination format"); 1195 } 1196 } 1197 1198 public TransactionIdTransformer getTransactionIdTransformer() { 1199 return transactionIdTransformer; 1200 } 1201 1202 public void setTransactionIdTransformer(TransactionIdTransformer transactionIdTransformer) { 1203 this.transactionIdTransformer = transactionIdTransformer; 1204 } 1205 1206 static class AsyncJobKey { 1207 MessageId id; 1208 ActiveMQDestination destination; 1209 1210 AsyncJobKey(MessageId id, ActiveMQDestination destination) { 1211 this.id = id; 1212 this.destination = destination; 1213 } 1214 1215 @Override 1216 public boolean equals(Object obj) { 1217 if (obj == this) { 1218 return true; 1219 } 1220 return obj instanceof AsyncJobKey && id.equals(((AsyncJobKey) obj).id) 1221 && destination.equals(((AsyncJobKey) obj).destination); 1222 } 1223 1224 @Override 1225 public int hashCode() { 1226 return id.hashCode() + destination.hashCode(); 1227 } 1228 1229 @Override 1230 public String toString() { 1231 return destination.getPhysicalName() + "-" + id; 1232 } 1233 } 1234 1235 public interface StoreTask { 1236 public boolean cancel(); 1237 1238 public void aquireLocks(); 1239 1240 public void releaseLocks(); 1241 } 1242 1243 class StoreQueueTask implements Runnable, StoreTask { 1244 protected final Message message; 1245 protected final ConnectionContext context; 1246 protected final KahaDBMessageStore store; 1247 protected final InnerFutureTask future; 1248 protected final AtomicBoolean done = new AtomicBoolean(); 1249 protected final AtomicBoolean locked = new AtomicBoolean(); 1250 1251 public StoreQueueTask(KahaDBMessageStore store, ConnectionContext context, Message message) { 1252 this.store = store; 1253 this.context = context; 1254 this.message = message; 1255 this.future = new InnerFutureTask(this); 1256 } 1257 1258 public ListenableFuture<Object> getFuture() { 1259 return this.future; 1260 } 1261 1262 @Override 1263 public boolean cancel() { 1264 if (this.done.compareAndSet(false, true)) { 1265 return this.future.cancel(false); 1266 } 1267 return false; 1268 } 1269 1270 @Override 1271 public void aquireLocks() { 1272 if (this.locked.compareAndSet(false, true)) { 1273 try { 1274 globalQueueSemaphore.acquire(); 1275 store.acquireLocalAsyncLock(); 1276 message.incrementReferenceCount(); 1277 } catch (InterruptedException e) { 1278 LOG.warn("Failed to aquire lock", e); 1279 } 1280 } 1281 1282 } 1283 1284 @Override 1285 public void releaseLocks() { 1286 if (this.locked.compareAndSet(true, false)) { 1287 store.releaseLocalAsyncLock(); 1288 globalQueueSemaphore.release(); 1289 message.decrementReferenceCount(); 1290 } 1291 } 1292 1293 @Override 1294 public void run() { 1295 this.store.doneTasks++; 1296 try { 1297 if (this.done.compareAndSet(false, true)) { 1298 this.store.addMessage(context, message); 1299 removeQueueTask(this.store, this.message.getMessageId()); 1300 this.future.complete(); 1301 } else if (cancelledTaskModMetric > 0 && this.store.canceledTasks++ % cancelledTaskModMetric == 0) { 1302 System.err.println(this.store.dest.getName() + " cancelled: " 1303 + (this.store.canceledTasks / this.store.doneTasks) * 100); 1304 this.store.canceledTasks = this.store.doneTasks = 0; 1305 } 1306 } catch (Exception e) { 1307 this.future.setException(e); 1308 } 1309 } 1310 1311 protected Message getMessage() { 1312 return this.message; 1313 } 1314 1315 private class InnerFutureTask extends FutureTask<Object> implements ListenableFuture<Object> { 1316 1317 private Runnable listener; 1318 public InnerFutureTask(Runnable runnable) { 1319 super(runnable, null); 1320 1321 } 1322 1323 public void setException(final Exception e) { 1324 super.setException(e); 1325 } 1326 1327 public void complete() { 1328 super.set(null); 1329 } 1330 1331 @Override 1332 public void done() { 1333 fireListener(); 1334 } 1335 1336 @Override 1337 public void addListener(Runnable listener) { 1338 this.listener = listener; 1339 if (isDone()) { 1340 fireListener(); 1341 } 1342 } 1343 1344 private void fireListener() { 1345 if (listener != null) { 1346 try { 1347 listener.run(); 1348 } catch (Exception ignored) { 1349 LOG.warn("Unexpected exception from future {} listener callback {}", this, listener, ignored); 1350 } 1351 } 1352 } 1353 } 1354 } 1355 1356 class StoreTopicTask extends StoreQueueTask { 1357 private final int subscriptionCount; 1358 private final List<String> subscriptionKeys = new ArrayList<String>(1); 1359 private final KahaDBTopicMessageStore topicStore; 1360 public StoreTopicTask(KahaDBTopicMessageStore store, ConnectionContext context, Message message, 1361 int subscriptionCount) { 1362 super(store, context, message); 1363 this.topicStore = store; 1364 this.subscriptionCount = subscriptionCount; 1365 1366 } 1367 1368 @Override 1369 public void aquireLocks() { 1370 if (this.locked.compareAndSet(false, true)) { 1371 try { 1372 globalTopicSemaphore.acquire(); 1373 store.acquireLocalAsyncLock(); 1374 message.incrementReferenceCount(); 1375 } catch (InterruptedException e) { 1376 LOG.warn("Failed to aquire lock", e); 1377 } 1378 } 1379 } 1380 1381 @Override 1382 public void releaseLocks() { 1383 if (this.locked.compareAndSet(true, false)) { 1384 message.decrementReferenceCount(); 1385 store.releaseLocalAsyncLock(); 1386 globalTopicSemaphore.release(); 1387 } 1388 } 1389 1390 /** 1391 * add a key 1392 * 1393 * @param key 1394 * @return true if all acknowledgements received 1395 */ 1396 public boolean addSubscriptionKey(String key) { 1397 synchronized (this.subscriptionKeys) { 1398 this.subscriptionKeys.add(key); 1399 } 1400 return this.subscriptionKeys.size() >= this.subscriptionCount; 1401 } 1402 1403 @Override 1404 public void run() { 1405 this.store.doneTasks++; 1406 try { 1407 if (this.done.compareAndSet(false, true)) { 1408 this.topicStore.addMessage(context, message); 1409 // apply any acks we have 1410 synchronized (this.subscriptionKeys) { 1411 for (String key : this.subscriptionKeys) { 1412 this.topicStore.doAcknowledge(context, key, this.message.getMessageId(), null); 1413 1414 } 1415 } 1416 removeTopicTask(this.topicStore, this.message.getMessageId()); 1417 this.future.complete(); 1418 } else if (cancelledTaskModMetric > 0 && this.store.canceledTasks++ % cancelledTaskModMetric == 0) { 1419 System.err.println(this.store.dest.getName() + " cancelled: " 1420 + (this.store.canceledTasks / this.store.doneTasks) * 100); 1421 this.store.canceledTasks = this.store.doneTasks = 0; 1422 } 1423 } catch (Exception e) { 1424 this.future.setException(e); 1425 } 1426 } 1427 } 1428 1429 public class StoreTaskExecutor extends ThreadPoolExecutor { 1430 1431 public StoreTaskExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit timeUnit, BlockingQueue<Runnable> queue, ThreadFactory threadFactory) { 1432 super(corePoolSize, maximumPoolSize, keepAliveTime, timeUnit, queue, threadFactory); 1433 } 1434 1435 @Override 1436 protected void afterExecute(Runnable runnable, Throwable throwable) { 1437 super.afterExecute(runnable, throwable); 1438 1439 if (runnable instanceof StoreTask) { 1440 ((StoreTask)runnable).releaseLocks(); 1441 } 1442 } 1443 } 1444 1445 @Override 1446 public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException { 1447 return new JobSchedulerStoreImpl(); 1448 } 1449}