public abstract class AbstractJournalStorageManager extends Object implements StorageManager
Using this class also ensures that locks are acquired in the right order, avoiding dead-locks.
Modifier and Type | Class and Description |
---|---|
static class |
AbstractJournalStorageManager.JournalContent |
StorageManager.LargeMessageExtension
Modifier and Type | Field and Description |
---|---|
protected Journal |
bindingsJournal |
protected Configuration |
config |
protected ExecutorFactory |
executorFactory
Used to create Operation Contexts
|
protected BatchingIDGenerator |
idGenerator |
protected ExecutorFactory |
ioExecutors |
protected boolean |
journalLoaded |
protected Set<Long> |
largeMessagesToDelete |
protected Map<SimpleString,PersistedAddressSetting> |
mapPersistedAddressSettings |
protected Map<SimpleString,PersistedRoles> |
mapPersistedRoles |
protected Journal |
messageJournal |
protected Semaphore |
pageMaxConcurrentIO |
protected int |
perfBlastPages |
protected ScheduledExecutorService |
scheduledExecutorService |
protected boolean |
started |
protected ReentrantReadWriteLock |
storageManagerLock |
Constructor and Description |
---|
AbstractJournalStorageManager(Configuration config,
ExecutorFactory executorFactory,
ScheduledExecutorService scheduledExecutorService,
ExecutorFactory ioExecutors) |
AbstractJournalStorageManager(Configuration config,
ExecutorFactory executorFactory,
ScheduledExecutorService scheduledExecutorService,
ExecutorFactory ioExecutors,
IOCriticalErrorListener criticalErrorListener) |
Modifier and Type | Method and Description |
---|---|
void |
addGrouping(GroupBinding groupBinding) |
void |
addQueueBinding(long tx,
Binding binding) |
boolean |
addToPage(PagingStore store,
ServerMessage msg,
Transaction tx,
RouteContextList listCtx)
Write message to page if we are paging.
|
void |
afterCompleteOperations(IOCallback run) |
void |
afterPageRead()
We need a safeguard in place to avoid too much concurrent IO happening on Paging, otherwise
the system may become unresponsive if too many destinations are reading all the same time.
|
void |
afterStoreOperations(IOCallback run)
This is similar to afterComplete, however this only cares about the journal part.
|
void |
beforePageRead()
We need a safeguard in place to avoid too much concurrent IO happening on Paging, otherwise
the system may become unresponsive if too many destinations are reading all the same time.
|
protected abstract void |
beforeStart() |
protected abstract void |
beforeStop() |
void |
clearContext() |
void |
commit(long txID) |
void |
commit(long txID,
boolean lineUpContext) |
void |
commitBindings(long txID) |
protected void |
confirmLargeMessage(LargeServerMessage largeServerMessage) |
void |
confirmPendingLargeMessage(long recordID)
We don't need messageID now but we are likely to need it we ever decide to support a database
|
void |
confirmPendingLargeMessageTX(Transaction tx,
long messageID,
long recordID)
Confirms that a large message was finished
|
void |
criticalError(Throwable error) |
void |
deleteAddressSetting(SimpleString addressMatch) |
void |
deleteCursorAcknowledge(long ackID) |
void |
deleteCursorAcknowledgeTransactional(long txID,
long ackID) |
void |
deleteDuplicateID(long recordID) |
void |
deleteDuplicateIDTransactional(long txID,
long recordID) |
void |
deleteGrouping(long tx,
GroupBinding groupBinding) |
void |
deleteHeuristicCompletion(long id) |
void |
deleteID(long journalD) |
void |
deleteIncrementRecord(long txID,
long recordID) |
void |
deleteMessage(long messageID) |
void |
deletePageComplete(long ackID) |
void |
deletePageCounter(long txID,
long recordID) |
void |
deletePageTransactional(long recordID) |
void |
deletePendingPageCounter(long txID,
long recordID) |
void |
deleteQueueBinding(long tx,
long queueBindingID) |
void |
deleteQueueStatus(long recordID) |
void |
deleteSecurityRoles(SimpleString addressMatch) |
long |
generateID() |
Journal |
getBindingsJournal() |
OperationContext |
getContext()
Get the context associated with the thread for later reuse
|
long |
getCurrentID() |
IDGenerator |
getIDGenerator() |
Journal |
getMessageJournal() |
protected abstract void |
init(Configuration config,
IOCriticalErrorListener criticalErrorListener)
Called during initialization.
|
boolean |
isStarted() |
void |
lineUpContext() |
JournalLoadInformation |
loadBindingJournal(List<QueueBindingInfo> queueBindingInfos,
List<GroupingInfo> groupingInfos) |
JournalLoadInformation[] |
loadInternalOnly()
TODO: Is this still being used ?
|
JournalLoadInformation |
loadMessageJournal(PostOffice postOffice,
PagingManager pagingManager,
ResourceManager resourceManager,
Map<Long,QueueBindingInfo> queueInfos,
Map<SimpleString,List<Pair<byte[],Long>>> duplicateIDMap,
Set<Pair<Long,Long>> pendingLargeMessages,
List<PageCountPending> pendingNonTXPageCounter,
JournalLoader journalLoader) |
static String |
md5(File file) |
protected static PersistentQueueBindingEncoding |
newBindingEncoding(long id,
ActiveMQBuffer buffer) |
OperationContext |
newContext(Executor executor1)
It just creates an OperationContext without associating it
|
protected static QueueStatusEncoding |
newQueueStatusEncoding(long id,
ActiveMQBuffer buffer) |
protected static PersistedRoles |
newSecurityRecord(long id,
ActiveMQBuffer buffer) |
OperationContext |
newSingleThreadContext() |
protected abstract LargeServerMessage |
parseLargeMessage(Map<Long,ServerMessage> messages,
ActiveMQBuffer buff) |
protected abstract void |
performCachedLargeMessageDeletes()
Assumption is that this is only called with a writeLock on the StorageManager.
|
void |
persistIdGenerator()
Closes the
IDGenerator persisting the current record ID. |
void |
prepare(long txID,
Xid xid) |
void |
readLock()
Read lock the StorageManager.
|
void |
readUnLock()
Unlock the manager.
|
List<PersistedAddressSetting> |
recoverAddressSettings() |
List<PersistedRoles> |
recoverPersistedRoles() |
void |
rollback(long txID) |
void |
rollbackBindings(long txID) |
void |
setContext(OperationContext context)
Set the context back to the thread
|
void |
start() |
void |
stop() |
void |
stop(boolean ioCriticalError,
boolean sendFailover) |
void |
storeAcknowledge(long queueID,
long messageID) |
void |
storeAcknowledgeTransactional(long txID,
long queueID,
long messageID) |
void |
storeAddressSetting(PersistedAddressSetting addressSetting) |
void |
storeCursorAcknowledge(long queueID,
PagePosition position) |
void |
storeCursorAcknowledgeTransactional(long txID,
long queueID,
PagePosition position) |
void |
storeDuplicateID(SimpleString address,
byte[] duplID,
long recordID) |
void |
storeDuplicateIDTransactional(long txID,
SimpleString address,
byte[] duplID,
long recordID) |
long |
storeHeuristicCompletion(Xid xid,
boolean isCommit) |
void |
storeID(long journalID,
long id)
Stores the id from IDManager.
|
void |
storeMessage(ServerMessage message) |
void |
storeMessageTransactional(long txID,
ServerMessage message) |
void |
storePageCompleteTransactional(long txID,
long queueID,
PagePosition position) |
long |
storePageCounter(long txID,
long queueID,
long value) |
long |
storePageCounterInc(long queueID,
int value) |
long |
storePageCounterInc(long txID,
long queueID,
int value) |
void |
storePageTransaction(long txID,
PageTransactionInfo pageTransaction) |
long |
storePendingCounter(long queueID,
long pageID,
int inc) |
long |
storeQueueStatus(long queueID,
QueueStatus status) |
void |
storeReference(long queueID,
long messageID,
boolean last) |
void |
storeReferenceTransactional(long txID,
long queueID,
long messageID) |
void |
storeSecurityRoles(PersistedRoles persistedRoles) |
void |
updateDeliveryCount(MessageReference ref) |
void |
updateDuplicateIDTransactional(long txID,
SimpleString address,
byte[] duplID,
long recordID) |
void |
updatePageTransaction(long txID,
PageTransactionInfo pageTransaction,
int depages) |
void |
updatePageTransaction(PageTransactionInfo pageTransaction,
int depages)
FIXME Unused
|
void |
updateScheduledDeliveryTime(MessageReference ref) |
void |
updateScheduledDeliveryTimeTransactional(long txID,
MessageReference ref) |
void |
waitOnOperations()
Block until the operations are done.
|
boolean |
waitOnOperations(long timeout)
Block until the operations are done.
|
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
addBytesToLargeMessage, allocateDirectBuffer, createFileForLargeMessage, createLargeMessage, createLargeMessage, freeDirectBuffer, injectMonitor, pageClosed, pageDeleted, pageWrite, startReplication, stopReplication
protected Semaphore pageMaxConcurrentIO
protected BatchingIDGenerator idGenerator
protected final ExecutorFactory ioExecutors
protected final ScheduledExecutorService scheduledExecutorService
protected final ReentrantReadWriteLock storageManagerLock
protected Journal messageJournal
protected Journal bindingsJournal
protected volatile boolean started
protected final ExecutorFactory executorFactory
protected int perfBlastPages
protected boolean journalLoaded
protected final Configuration config
protected final Map<SimpleString,PersistedRoles> mapPersistedRoles
protected final Map<SimpleString,PersistedAddressSetting> mapPersistedAddressSettings
public AbstractJournalStorageManager(Configuration config, ExecutorFactory executorFactory, ScheduledExecutorService scheduledExecutorService, ExecutorFactory ioExecutors)
public AbstractJournalStorageManager(Configuration config, ExecutorFactory executorFactory, ScheduledExecutorService scheduledExecutorService, ExecutorFactory ioExecutors, IOCriticalErrorListener criticalErrorListener)
protected abstract void init(Configuration config, IOCriticalErrorListener criticalErrorListener)
config
- criticalErrorListener
- public void criticalError(Throwable error)
criticalError
in interface StorageManager
public void clearContext()
clearContext
in interface StorageManager
public IDGenerator getIDGenerator()
public final void waitOnOperations() throws Exception
StorageManager
waitOnOperations
in interface StorageManager
Exception
public final boolean waitOnOperations(long timeout) throws Exception
StorageManager
waitOnOperations
in interface StorageManager
Exception
public OperationContext getContext()
StorageManager
getContext
in interface StorageManager
public void setContext(OperationContext context)
StorageManager
setContext
in interface StorageManager
public OperationContext newSingleThreadContext()
newSingleThreadContext
in interface StorageManager
public OperationContext newContext(Executor executor1)
StorageManager
newContext
in interface StorageManager
public void afterCompleteOperations(IOCallback run)
afterCompleteOperations
in interface StorageManager
public void afterStoreOperations(IOCallback run)
StorageManager
afterStoreOperations
in interface StorageManager
public long generateID()
generateID
in interface IDGenerator
public long getCurrentID()
getCurrentID
in interface IDGenerator
public void confirmPendingLargeMessageTX(Transaction tx, long messageID, long recordID) throws Exception
StorageManager
confirmPendingLargeMessageTX
in interface StorageManager
Exception
public void confirmPendingLargeMessage(long recordID) throws Exception
confirmPendingLargeMessage
in interface StorageManager
Exception
public void storeMessage(ServerMessage message) throws Exception
storeMessage
in interface StorageManager
Exception
public void storeReference(long queueID, long messageID, boolean last) throws Exception
storeReference
in interface StorageManager
Exception
public void readLock()
StorageManager
The main lock is used to write lock the whole manager when starting replication. Sub-systems, say Paging classes, that use locks of their own AND also write through the StorageManager MUST first read lock the storageManager before taking their own locks. Otherwise, we may dead-lock when starting replication sync.
readLock
in interface StorageManager
public void readUnLock()
StorageManager
readUnLock
in interface StorageManager
StorageManager.readLock()
public void storeAcknowledge(long queueID, long messageID) throws Exception
storeAcknowledge
in interface StorageManager
Exception
public void storeCursorAcknowledge(long queueID, PagePosition position) throws Exception
storeCursorAcknowledge
in interface StorageManager
Exception
public void deleteMessage(long messageID) throws Exception
deleteMessage
in interface StorageManager
Exception
public void updateScheduledDeliveryTime(MessageReference ref) throws Exception
updateScheduledDeliveryTime
in interface StorageManager
Exception
public void storeDuplicateID(SimpleString address, byte[] duplID, long recordID) throws Exception
storeDuplicateID
in interface StorageManager
Exception
public void deleteDuplicateID(long recordID) throws Exception
deleteDuplicateID
in interface StorageManager
Exception
public void storeMessageTransactional(long txID, ServerMessage message) throws Exception
storeMessageTransactional
in interface StorageManager
Exception
public void storePageTransaction(long txID, PageTransactionInfo pageTransaction) throws Exception
storePageTransaction
in interface StorageManager
Exception
public void updatePageTransaction(long txID, PageTransactionInfo pageTransaction, int depages) throws Exception
updatePageTransaction
in interface StorageManager
Exception
public void updatePageTransaction(PageTransactionInfo pageTransaction, int depages) throws Exception
StorageManager
updatePageTransaction
in interface StorageManager
Exception
public void storeReferenceTransactional(long txID, long queueID, long messageID) throws Exception
storeReferenceTransactional
in interface StorageManager
Exception
public void storeAcknowledgeTransactional(long txID, long queueID, long messageID) throws Exception
storeAcknowledgeTransactional
in interface StorageManager
Exception
public void storeCursorAcknowledgeTransactional(long txID, long queueID, PagePosition position) throws Exception
storeCursorAcknowledgeTransactional
in interface StorageManager
Exception
public void storePageCompleteTransactional(long txID, long queueID, PagePosition position) throws Exception
storePageCompleteTransactional
in interface StorageManager
Exception
public void deletePageComplete(long ackID) throws Exception
deletePageComplete
in interface StorageManager
Exception
public void deleteCursorAcknowledgeTransactional(long txID, long ackID) throws Exception
deleteCursorAcknowledgeTransactional
in interface StorageManager
Exception
public void deleteCursorAcknowledge(long ackID) throws Exception
deleteCursorAcknowledge
in interface StorageManager
Exception
public long storeHeuristicCompletion(Xid xid, boolean isCommit) throws Exception
storeHeuristicCompletion
in interface StorageManager
Exception
public void deleteHeuristicCompletion(long id) throws Exception
deleteHeuristicCompletion
in interface StorageManager
Exception
public void deletePageTransactional(long recordID) throws Exception
deletePageTransactional
in interface StorageManager
Exception
public void updateScheduledDeliveryTimeTransactional(long txID, MessageReference ref) throws Exception
updateScheduledDeliveryTimeTransactional
in interface StorageManager
Exception
public void prepare(long txID, Xid xid) throws Exception
prepare
in interface StorageManager
Exception
public void commit(long txID) throws Exception
commit
in interface StorageManager
Exception
public void commitBindings(long txID) throws Exception
commitBindings
in interface StorageManager
Exception
public void rollbackBindings(long txID) throws Exception
rollbackBindings
in interface StorageManager
Exception
public void commit(long txID, boolean lineUpContext) throws Exception
commit
in interface StorageManager
Exception
public void rollback(long txID) throws Exception
rollback
in interface StorageManager
Exception
public void storeDuplicateIDTransactional(long txID, SimpleString address, byte[] duplID, long recordID) throws Exception
storeDuplicateIDTransactional
in interface StorageManager
Exception
public void updateDuplicateIDTransactional(long txID, SimpleString address, byte[] duplID, long recordID) throws Exception
updateDuplicateIDTransactional
in interface StorageManager
Exception
public void deleteDuplicateIDTransactional(long txID, long recordID) throws Exception
deleteDuplicateIDTransactional
in interface StorageManager
Exception
public void updateDeliveryCount(MessageReference ref) throws Exception
updateDeliveryCount
in interface StorageManager
Exception
public void storeAddressSetting(PersistedAddressSetting addressSetting) throws Exception
storeAddressSetting
in interface StorageManager
Exception
public List<PersistedAddressSetting> recoverAddressSettings() throws Exception
recoverAddressSettings
in interface StorageManager
Exception
public List<PersistedRoles> recoverPersistedRoles() throws Exception
recoverPersistedRoles
in interface StorageManager
Exception
public void storeSecurityRoles(PersistedRoles persistedRoles) throws Exception
storeSecurityRoles
in interface StorageManager
Exception
public void storeID(long journalID, long id) throws Exception
StorageManager
storeID
in interface StorageManager
Exception
public void deleteID(long journalD) throws Exception
deleteID
in interface StorageManager
Exception
public void deleteAddressSetting(SimpleString addressMatch) throws Exception
deleteAddressSetting
in interface StorageManager
Exception
public void deleteSecurityRoles(SimpleString addressMatch) throws Exception
deleteSecurityRoles
in interface StorageManager
Exception
public JournalLoadInformation loadMessageJournal(PostOffice postOffice, PagingManager pagingManager, ResourceManager resourceManager, Map<Long,QueueBindingInfo> queueInfos, Map<SimpleString,List<Pair<byte[],Long>>> duplicateIDMap, Set<Pair<Long,Long>> pendingLargeMessages, List<PageCountPending> pendingNonTXPageCounter, JournalLoader journalLoader) throws Exception
loadMessageJournal
in interface StorageManager
Exception
public void addGrouping(GroupBinding groupBinding) throws Exception
addGrouping
in interface StorageManager
Exception
public void deleteGrouping(long tx, GroupBinding groupBinding) throws Exception
deleteGrouping
in interface StorageManager
Exception
public void addQueueBinding(long tx, Binding binding) throws Exception
addQueueBinding
in interface StorageManager
Exception
public void deleteQueueBinding(long tx, long queueBindingID) throws Exception
deleteQueueBinding
in interface StorageManager
Exception
public long storeQueueStatus(long queueID, QueueStatus status) throws Exception
storeQueueStatus
in interface StorageManager
queueID
- The id of the queuestatus
- The current status of the queue. (Reserved for future use, ATM we only use this record for PAUSED)Exception
public void deleteQueueStatus(long recordID) throws Exception
deleteQueueStatus
in interface StorageManager
Exception
public long storePageCounterInc(long txID, long queueID, int value) throws Exception
storePageCounterInc
in interface StorageManager
Exception
public long storePageCounterInc(long queueID, int value) throws Exception
storePageCounterInc
in interface StorageManager
Exception
public long storePageCounter(long txID, long queueID, long value) throws Exception
storePageCounter
in interface StorageManager
Exception
public long storePendingCounter(long queueID, long pageID, int inc) throws Exception
storePendingCounter
in interface StorageManager
Exception
public void deleteIncrementRecord(long txID, long recordID) throws Exception
deleteIncrementRecord
in interface StorageManager
Exception
public void deletePageCounter(long txID, long recordID) throws Exception
deletePageCounter
in interface StorageManager
Exception
public void deletePendingPageCounter(long txID, long recordID) throws Exception
deletePendingPageCounter
in interface StorageManager
Exception
public JournalLoadInformation loadBindingJournal(List<QueueBindingInfo> queueBindingInfos, List<GroupingInfo> groupingInfos) throws Exception
loadBindingJournal
in interface StorageManager
Exception
public void lineUpContext()
lineUpContext
in interface StorageManager
public void start() throws Exception
start
in interface ActiveMQComponent
Exception
public void stop() throws Exception
stop
in interface ActiveMQComponent
Exception
public void persistIdGenerator()
StorageManager
IDGenerator
persisting the current record ID.
Effectively a "pre-stop" method. Necessary due to the "stop"-order at
ActiveMQServerImpl
persistIdGenerator
in interface StorageManager
protected abstract void performCachedLargeMessageDeletes()
public void stop(boolean ioCriticalError, boolean sendFailover) throws Exception
stop
in interface StorageManager
ioCriticalError
- is the server being stopped due to an IO critical error.sendFailover
- this is to send the replication stopping in case of replication.Exception
public boolean isStarted()
isStarted
in interface ActiveMQComponent
public JournalLoadInformation[] loadInternalOnly() throws Exception
Exception
public void beforePageRead() throws Exception
StorageManager
beforePageRead
in interface StorageManager
Exception
public void afterPageRead() throws Exception
StorageManager
afterPageRead
in interface StorageManager
Exception
public Journal getMessageJournal()
getMessageJournal
in interface StorageManager
public Journal getBindingsJournal()
getBindingsJournal
in interface StorageManager
protected void confirmLargeMessage(LargeServerMessage largeServerMessage)
protected abstract LargeServerMessage parseLargeMessage(Map<Long,ServerMessage> messages, ActiveMQBuffer buff) throws Exception
Exception
protected static PersistedRoles newSecurityRecord(long id, ActiveMQBuffer buffer)
id
- buffer
- protected static PersistentQueueBindingEncoding newBindingEncoding(long id, ActiveMQBuffer buffer)
id
- buffer
- protected static QueueStatusEncoding newQueueStatusEncoding(long id, ActiveMQBuffer buffer)
id
- buffer
- public boolean addToPage(PagingStore store, ServerMessage msg, Transaction tx, RouteContextList listCtx) throws Exception
StorageManager
This is primarily a PagingStore
call, but as with any other call writing persistent
data, it must go through here. Both for the sake of replication, and also to ensure that it
takes the locks (storage manager and pagingStore) in the right order. Avoiding thus the
creation of dead-locks.
addToPage
in interface StorageManager
true
if we are paging and have handled the data, false
if the data
needs to be sent to the journalException
Copyright © 2017 JBoss by Red Hat. All rights reserved.