public abstract class AbstractJournalStorageManager extends CriticalComponentImpl implements StorageManager
Using this class also ensures that locks are acquired in the right order, avoiding dead-locks.
Modifier and Type | Class and Description |
---|---|
static class | AbstractJournalStorageManager.JournalContent |

Nested classes/interfaces inherited from interface StorageManager: StorageManager.LargeMessageExtension
Modifier and Type | Field and Description |
---|---|
protected Journal | bindingsJournal |
protected Configuration | config |
protected static int | CRITICAL_PATHS |
protected static int | CRITICAL_STOP |
protected static int | CRITICAL_STOP_2 |
protected static int | CRITICAL_STORE |
protected ExecutorFactory | executorFactory - Used to create Operation Contexts |
protected BatchingIDGenerator | idGenerator |
protected ExecutorFactory | ioExecutorFactory |
protected boolean | journalLoaded |
protected Set<Long> | largeMessagesToDelete |
protected Map<SimpleString,PersistedAddressSetting> | mapPersistedAddressSettings |
protected Map<SimpleString,PersistedRoles> | mapPersistedRoles |
protected Journal | messageJournal |
protected Semaphore | pageMaxConcurrentIO |
protected ScheduledExecutorService | scheduledExecutorService |
protected boolean | started |
protected ReentrantReadWriteLock | storageManagerLock |
Constructor and Description |
---|
AbstractJournalStorageManager(Configuration config, CriticalAnalyzer analyzer, ExecutorFactory executorFactory, ScheduledExecutorService scheduledExecutorService, ExecutorFactory ioExecutorFactory) |
AbstractJournalStorageManager(Configuration config, CriticalAnalyzer analyzer, ExecutorFactory executorFactory, ScheduledExecutorService scheduledExecutorService, ExecutorFactory ioExecutorFactory, IOCriticalErrorListener criticalErrorListener) |
Modifier and Type | Method and Description |
---|---|
void | addAddressBinding(long tx, AddressInfo addressInfo) |
void | addGrouping(GroupBinding groupBinding) |
void | addQueueBinding(long tx, Binding binding) |
boolean | addToPage(PagingStore store, Message msg, Transaction tx, RouteContextList listCtx) - Write message to page if we are paging. |
void | afterCompleteOperations(IOCallback run) |
void | afterPageRead() - We need a safeguard in place to avoid too much concurrent IO happening on Paging, otherwise the system may become unresponsive if too many destinations are reading all at the same time. |
void | afterStoreOperations(IOCallback run) - This is similar to afterComplete, however this only cares about the journal part. |
void | beforePageRead() - We need a safeguard in place to avoid too much concurrent IO happening on Paging, otherwise the system may become unresponsive if too many destinations are reading all at the same time. |
protected abstract void | beforeStart() |
protected abstract void | beforeStop() |
void | checkInvalidPageTransactions(PagingManager pagingManager, Set<PageTransactionInfo> invalidPageTransactions) |
void | clearContext() |
void | commit(long txID) |
void | commit(long txID, boolean lineUpContext) |
void | commitBindings(long txID) |
protected void | confirmLargeMessage(LargeServerMessage largeServerMessage) |
void | confirmPendingLargeMessage(long recordID) - We don't need messageID now, but we are likely to need it if we ever decide to support a database. |
void | confirmPendingLargeMessageTX(Transaction tx, long messageID, long recordID) - Confirms that a large message was finished. |
void | criticalError(Throwable error) |
void | deleteAddressBinding(long tx, long addressBindingID) |
void | deleteAddressSetting(SimpleString addressMatch) |
void | deleteCursorAcknowledge(long ackID) |
void | deleteCursorAcknowledgeTransactional(long txID, long ackID) |
void | deleteDuplicateID(long recordID) |
void | deleteDuplicateIDTransactional(long txID, long recordID) |
void | deleteGrouping(long tx, GroupBinding groupBinding) |
void | deleteHeuristicCompletion(long id) |
void | deleteID(long journalD) |
void | deleteIncrementRecord(long txID, long recordID) |
void | deleteMessage(long messageID) |
void | deletePageComplete(long ackID) |
void | deletePageCounter(long txID, long recordID) |
void | deletePageTransactional(long recordID) |
void | deletePendingPageCounter(long txID, long recordID) |
void | deleteQueueBinding(long tx, long queueBindingID) |
void | deleteQueueStatus(long recordID) |
void | deleteSecurityRoles(SimpleString addressMatch) |
long | generateID() |
Journal | getBindingsJournal() |
OperationContext | getContext() - Get the context associated with the thread for later reuse. |
long | getCurrentID() |
IDGenerator | getIDGenerator() |
long | getMaxRecordSize() |
Journal | getMessageJournal() |
protected abstract void | init(Configuration config, IOCriticalErrorListener criticalErrorListener) - Called during initialization. |
boolean | isStarted() |
void | lineUpContext() |
JournalLoadInformation | loadBindingJournal(List<QueueBindingInfo> queueBindingInfos, List<GroupingInfo> groupingInfos, List<AddressBindingInfo> addressBindingInfos) |
JournalLoadInformation[] | loadInternalOnly() - TODO: Is this still being used? |
JournalLoadInformation | loadMessageJournal(PostOffice postOffice, PagingManager pagingManager, ResourceManager resourceManager, Map<Long,QueueBindingInfo> queueInfos, Map<SimpleString,List<Pair<byte[],Long>>> duplicateIDMap, Set<Pair<Long,Long>> pendingLargeMessages, List<PageCountPending> pendingNonTXPageCounter, JournalLoader journalLoader) |
static String | md5(File file) |
protected static PersistentAddressBindingEncoding | newAddressBindingEncoding(long id, ActiveMQBuffer buffer) |
OperationContext | newContext(Executor executor1) - It just creates an OperationContext without associating it. |
protected static PersistentQueueBindingEncoding | newQueueBindingEncoding(long id, ActiveMQBuffer buffer) |
protected static QueueStatusEncoding | newQueueStatusEncoding(long id, ActiveMQBuffer buffer) |
protected static PersistedRoles | newSecurityRecord(long id, ActiveMQBuffer buffer) |
OperationContext | newSingleThreadContext() |
protected abstract LargeServerMessage | parseLargeMessage(Map<Long,Message> messages, ActiveMQBuffer buff) |
protected abstract void | performCachedLargeMessageDeletes() - Assumption is that this is only called with a writeLock on the StorageManager. |
void | persistIdGenerator() - Closes the IDGenerator, persisting the current record ID. |
void | prepare(long txID, Xid xid) |
void | readLock() - Read lock the StorageManager. |
void | readUnLock() - Unlock the manager. |
List<PersistedAddressSetting> | recoverAddressSettings() |
List<PersistedRoles> | recoverPersistedRoles() |
void | rollback(long txID) |
void | rollbackBindings(long txID) |
void | setContext(OperationContext context) - Set the context back to the thread. |
void | start() |
void | stop() |
void | stop(boolean ioCriticalError, boolean sendFailover) |
void | storeAcknowledge(long queueID, long messageID) |
void | storeAcknowledgeTransactional(long txID, long queueID, long messageID) |
void | storeAddressSetting(PersistedAddressSetting addressSetting) |
void | storeCursorAcknowledge(long queueID, PagePosition position) |
void | storeCursorAcknowledgeTransactional(long txID, long queueID, PagePosition position) |
void | storeDuplicateID(SimpleString address, byte[] duplID, long recordID) |
void | storeDuplicateIDTransactional(long txID, SimpleString address, byte[] duplID, long recordID) |
long | storeHeuristicCompletion(Xid xid, boolean isCommit) |
void | storeID(long journalID, long id) - Stores the id from IDManager. |
void | storeMessage(Message message) |
void | storeMessageTransactional(long txID, Message message) |
void | storePageCompleteTransactional(long txID, long queueID, PagePosition position) |
long | storePageCounter(long txID, long queueID, long value, long persistentSize) |
long | storePageCounterInc(long queueID, int value, long persistentSize) |
long | storePageCounterInc(long txID, long queueID, int value, long persistentSize) |
void | storePageTransaction(long txID, PageTransactionInfo pageTransaction) |
long | storePendingCounter(long queueID, long pageID) |
long | storeQueueStatus(long queueID, QueueStatus status) |
void | storeReference(long queueID, long messageID, boolean last) |
void | storeReferenceTransactional(long txID, long queueID, long messageID) |
void | storeSecurityRoles(PersistedRoles persistedRoles) |
void | updateDeliveryCount(MessageReference ref) |
void | updateDuplicateIDTransactional(long txID, SimpleString address, byte[] duplID, long recordID) |
void | updatePageTransaction(long txID, PageTransactionInfo pageTransaction, int depages) |
void | updateQueueBinding(long tx, Binding binding) |
void | updateScheduledDeliveryTime(MessageReference ref) |
void | updateScheduledDeliveryTimeTransactional(long txID, MessageReference ref) |
void | waitOnOperations() - Block until the operations are done. |
boolean | waitOnOperations(long timeout) - Block until the operations are done. |
void | writeLock() - For internal use and the test suite; do not use it outside of tests. |
void | writeUnlock() - For internal use and the test suite; do not use it outside of tests. |
Methods inherited from class CriticalComponentImpl: enterCritical, getCriticalAnalyzer, isExpired, leaveCritical
Methods inherited from class java.lang.Object: clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface StorageManager: addBytesToLargeMessage, allocateDirectBuffer, createFileForLargeMessage, createLargeMessage, createLargeMessage, freeDirectBuffer, getJournalSequentialFileFactory, injectMonitor, pageClosed, pageDeleted, pageWrite, startReplication, stopReplication
protected static final int CRITICAL_PATHS
protected static final int CRITICAL_STORE
protected static final int CRITICAL_STOP
protected static final int CRITICAL_STOP_2
protected Semaphore pageMaxConcurrentIO
protected BatchingIDGenerator idGenerator
protected final ExecutorFactory ioExecutorFactory
protected final ScheduledExecutorService scheduledExecutorService
protected final ReentrantReadWriteLock storageManagerLock
protected Journal messageJournal
protected Journal bindingsJournal
protected volatile boolean started
protected final ExecutorFactory executorFactory
protected boolean journalLoaded
protected final Configuration config
protected final Map<SimpleString,PersistedRoles> mapPersistedRoles
protected final Map<SimpleString,PersistedAddressSetting> mapPersistedAddressSettings
public AbstractJournalStorageManager(Configuration config, CriticalAnalyzer analyzer, ExecutorFactory executorFactory, ScheduledExecutorService scheduledExecutorService, ExecutorFactory ioExecutorFactory)
public AbstractJournalStorageManager(Configuration config, CriticalAnalyzer analyzer, ExecutorFactory executorFactory, ScheduledExecutorService scheduledExecutorService, ExecutorFactory ioExecutorFactory, IOCriticalErrorListener criticalErrorListener)
public long getMaxRecordSize()
getMaxRecordSize
in interface StorageManager
protected abstract void init(Configuration config, IOCriticalErrorListener criticalErrorListener)
config -
criticalErrorListener -
public void criticalError(Throwable error)
criticalError
in interface StorageManager
public void clearContext()
clearContext
in interface StorageManager
public IDGenerator getIDGenerator()
public final void waitOnOperations() throws Exception
StorageManager
waitOnOperations
in interface StorageManager
Exception
public final boolean waitOnOperations(long timeout) throws Exception
StorageManager
waitOnOperations
in interface StorageManager
Exception
public OperationContext getContext()
StorageManager
getContext
in interface StorageManager
public void setContext(OperationContext context)
StorageManager
setContext
in interface StorageManager
public OperationContext newSingleThreadContext()
newSingleThreadContext
in interface StorageManager
public OperationContext newContext(Executor executor1)
StorageManager
newContext
in interface StorageManager
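The relationship between getContext, setContext, clearContext and newContext can be pictured as a per-thread slot. The sketch below is only an illustration under the assumption that the context is held in a ThreadLocal; the class ContextHolderSketch and its nested Context type are hypothetical stand-ins, not the OperationContext API.

```java
import java.util.concurrent.Executor;

// Hypothetical illustration: a per-thread context slot, assuming a ThreadLocal is used.
class ContextHolderSketch {
    // Stand-in for OperationContext; it only remembers the executor it was built with.
    static final class Context {
        final Executor executor;
        Context(Executor executor) { this.executor = executor; }
    }

    private static final ThreadLocal<Context> CURRENT = new ThreadLocal<>();

    // Creates a context without associating it with the calling thread.
    static Context newContext(Executor executor) {
        return new Context(executor);
    }

    // Returns the context bound to the calling thread, creating one on first use.
    static Context getContext() {
        Context context = CURRENT.get();
        if (context == null) {
            context = newContext(Runnable::run); // direct executor, chosen for the sketch
            CURRENT.set(context);
        }
        return context;
    }

    // Binds a previously obtained context back to the calling thread.
    static void setContext(Context context) { CURRENT.set(context); }

    // Drops the association for the calling thread.
    static void clearContext() { CURRENT.remove(); }

    public static void main(String[] args) {
        Context first = getContext();
        System.out.println(first == getContext());
    }
}
```

In this reading, a caller that hops between threads would call getContext() on the first thread and setContext(...) on the second to keep using the same context.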
public void afterCompleteOperations(IOCallback run)
afterCompleteOperations
in interface StorageManager
public void afterStoreOperations(IOCallback run)
StorageManager
afterStoreOperations
in interface StorageManager
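As a rough mental model for afterCompleteOperations, a callback is deferred until every lined-up operation has finished. The sketch below assumes a single pending counter; PendingOperationsSketch and its Callback interface are hypothetical and much simpler than IOCallback, and the sketch does not model the journal-only distinction that afterStoreOperations makes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of "run this once all pending operations are done".
class PendingOperationsSketch {
    // Reduced stand-in for a completion callback.
    interface Callback { void done(); }

    private int pending;
    private final List<Callback> waiting = new ArrayList<>();

    // Registers one more operation that must finish before callbacks fire.
    synchronized void lineUp() { pending++; }

    // Marks one operation as finished and fires callbacks when none remain.
    void operationDone() {
        List<Callback> ready = new ArrayList<>();
        synchronized (this) {
            if (pending > 0) {
                pending--;
            }
            if (pending == 0) {
                ready.addAll(waiting);
                waiting.clear();
            }
        }
        // Callbacks run outside the monitor so they cannot block other callers.
        for (Callback callback : ready) {
            callback.done();
        }
    }

    // Runs the callback immediately if nothing is pending, otherwise defers it.
    void afterCompleteOperations(Callback callback) {
        synchronized (this) {
            if (pending > 0) {
                waiting.add(callback);
                return;
            }
        }
        callback.done();
    }

    public static void main(String[] args) {
        PendingOperationsSketch ops = new PendingOperationsSketch();
        ops.lineUp();
        ops.afterCompleteOperations(() -> System.out.println("all operations done"));
        ops.operationDone();
    }
}
```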
public long generateID()
generateID
in interface IDGenerator
public long getCurrentID()
getCurrentID
in interface IDGenerator
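The idGenerator field is a BatchingIDGenerator, and storeID and persistIdGenerator persist its state. One plausible reading of "batching" is that IDs are handed out from memory and only a high-water mark is persisted once per batch. The sketch below illustrates that idea only; BatchingIdSketch, its checkpointSize parameter and the LongConsumer persistence hook are assumptions, not the real class.

```java
import java.util.function.LongConsumer;

// Hypothetical illustration: persist a high-water mark once per batch of IDs.
class BatchingIdSketch {
    private final long checkpointSize;
    private final LongConsumer store; // assumed persistence hook (e.g. a journal write)
    private long current;
    private long reservedUpTo;

    BatchingIdSketch(long start, long checkpointSize, LongConsumer store) {
        this.current = start;
        this.reservedUpTo = start;
        this.checkpointSize = checkpointSize;
        this.store = store;
    }

    // Hands out the next ID, reserving and persisting a new batch when needed.
    synchronized long generateID() {
        if (current >= reservedUpTo) {
            reservedUpTo = current + checkpointSize;
            // Persist before using the batch, so a restart from the stored
            // value can never hand out an ID that was already used.
            store.accept(reservedUpTo);
        }
        return ++current;
    }

    synchronized long getCurrentID() { return current; }

    // "Pre-stop" step: persist the exact current ID.
    synchronized void persistCurrentID() { store.accept(current); }

    public static void main(String[] args) {
        BatchingIdSketch generator = new BatchingIdSketch(0, 3, v -> System.out.println("stored " + v));
        for (int i = 0; i < 4; i++) {
            System.out.println("id " + generator.generateID());
        }
        generator.persistCurrentID();
    }
}
```

The trade-off in this sketch is that a crash may skip the unused remainder of a batch, in exchange for far fewer persistence writes.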
public void confirmPendingLargeMessageTX(Transaction tx, long messageID, long recordID) throws Exception
StorageManager
confirmPendingLargeMessageTX
in interface StorageManager
Exception
public void confirmPendingLargeMessage(long recordID) throws Exception
confirmPendingLargeMessage
in interface StorageManager
Exception
public void storeMessage(Message message) throws Exception
storeMessage
in interface StorageManager
Exception
public void storeReference(long queueID, long messageID, boolean last) throws Exception
storeReference
in interface StorageManager
Exception
public void readLock()
StorageManager
The main lock is used to write lock the whole manager when starting replication. Sub-systems, say Paging classes, that use locks of their own AND also write through the StorageManager MUST first read lock the storageManager before taking their own locks. Otherwise, we may dead-lock when starting replication sync.
readLock
in interface StorageManager
public void readUnLock()
StorageManager
readUnLock
in interface StorageManager
StorageManager.readLock()
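The ordering rule described for readLock (read lock the storage manager first, then take the sub-system's own lock) can be sketched as follows. LockOrderSketch is a hypothetical stand-in for a sub-system such as a paging class; its two locks only play the roles of storageManagerLock and the sub-system's private lock.

```java
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sub-system that has a lock of its own and also writes through storage.
class LockOrderSketch {
    // Plays the role of the storage manager's main lock.
    private final ReentrantReadWriteLock storageManagerLock = new ReentrantReadWriteLock();
    // The sub-system's private lock.
    private final ReentrantLock subSystemLock = new ReentrantLock();
    private int writes;

    int writeThroughStorage() {
        storageManagerLock.readLock().lock();       // 1. read lock the manager first
        try {
            subSystemLock.lock();                   // 2. only then take the local lock
            try {
                return ++writes;                    // the guarded write
            } finally {
                subSystemLock.unlock();
            }
        } finally {
            storageManagerLock.readLock().unlock(); // released last
        }
    }

    // Number of read holds currently outstanding on the manager lock.
    int readHolds() { return storageManagerLock.getReadLockCount(); }

    public static void main(String[] args) {
        LockOrderSketch sketch = new LockOrderSketch();
        System.out.println(sketch.writeThroughStorage());
    }
}
```

Because every writer takes the manager's read lock before its own lock, a thread that takes the write lock (as when replication starts) never waits on a thread that is itself blocked behind it.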
public void writeLock()
public void writeUnlock()
public void storeAcknowledge(long queueID, long messageID) throws Exception
storeAcknowledge
in interface StorageManager
Exception
public void storeCursorAcknowledge(long queueID, PagePosition position) throws Exception
storeCursorAcknowledge
in interface StorageManager
Exception
public void deleteMessage(long messageID) throws Exception
deleteMessage
in interface StorageManager
Exception
public void updateScheduledDeliveryTime(MessageReference ref) throws Exception
updateScheduledDeliveryTime
in interface StorageManager
Exception
public void storeDuplicateID(SimpleString address, byte[] duplID, long recordID) throws Exception
storeDuplicateID
in interface StorageManager
Exception
public void deleteDuplicateID(long recordID) throws Exception
deleteDuplicateID
in interface StorageManager
Exception
public void storeMessageTransactional(long txID, Message message) throws Exception
storeMessageTransactional
in interface StorageManager
Exception
public void storePageTransaction(long txID, PageTransactionInfo pageTransaction) throws Exception
storePageTransaction
in interface StorageManager
Exception
public void updatePageTransaction(long txID, PageTransactionInfo pageTransaction, int depages) throws Exception
updatePageTransaction
in interface StorageManager
Exception
public void storeReferenceTransactional(long txID, long queueID, long messageID) throws Exception
storeReferenceTransactional
in interface StorageManager
Exception
public void storeAcknowledgeTransactional(long txID, long queueID, long messageID) throws Exception
storeAcknowledgeTransactional
in interface StorageManager
Exception
public void storeCursorAcknowledgeTransactional(long txID, long queueID, PagePosition position) throws Exception
storeCursorAcknowledgeTransactional
in interface StorageManager
Exception
public void storePageCompleteTransactional(long txID, long queueID, PagePosition position) throws Exception
storePageCompleteTransactional
in interface StorageManager
Exception
public void deletePageComplete(long ackID) throws Exception
deletePageComplete
in interface StorageManager
Exception
public void deleteCursorAcknowledgeTransactional(long txID, long ackID) throws Exception
deleteCursorAcknowledgeTransactional
in interface StorageManager
Exception
public void deleteCursorAcknowledge(long ackID) throws Exception
deleteCursorAcknowledge
in interface StorageManager
Exception
public long storeHeuristicCompletion(Xid xid, boolean isCommit) throws Exception
storeHeuristicCompletion
in interface StorageManager
Exception
public void deleteHeuristicCompletion(long id) throws Exception
deleteHeuristicCompletion
in interface StorageManager
Exception
public void deletePageTransactional(long recordID) throws Exception
deletePageTransactional
in interface StorageManager
Exception
public void updateScheduledDeliveryTimeTransactional(long txID, MessageReference ref) throws Exception
updateScheduledDeliveryTimeTransactional
in interface StorageManager
Exception
public void prepare(long txID, Xid xid) throws Exception
prepare
in interface StorageManager
Exception
public void commit(long txID) throws Exception
commit
in interface StorageManager
Exception
public void commitBindings(long txID) throws Exception
commitBindings
in interface StorageManager
Exception
public void rollbackBindings(long txID) throws Exception
rollbackBindings
in interface StorageManager
Exception
public void commit(long txID, boolean lineUpContext) throws Exception
commit
in interface StorageManager
Exception
public void rollback(long txID) throws Exception
rollback
in interface StorageManager
Exception
public void storeDuplicateIDTransactional(long txID, SimpleString address, byte[] duplID, long recordID) throws Exception
storeDuplicateIDTransactional
in interface StorageManager
Exception
public void updateDuplicateIDTransactional(long txID, SimpleString address, byte[] duplID, long recordID) throws Exception
updateDuplicateIDTransactional
in interface StorageManager
Exception
public void deleteDuplicateIDTransactional(long txID, long recordID) throws Exception
deleteDuplicateIDTransactional
in interface StorageManager
Exception
public void updateDeliveryCount(MessageReference ref) throws Exception
updateDeliveryCount
in interface StorageManager
Exception
public void storeAddressSetting(PersistedAddressSetting addressSetting) throws Exception
storeAddressSetting
in interface StorageManager
Exception
public List<PersistedAddressSetting> recoverAddressSettings() throws Exception
recoverAddressSettings
in interface StorageManager
Exception
public List<PersistedRoles> recoverPersistedRoles() throws Exception
recoverPersistedRoles
in interface StorageManager
Exception
public void storeSecurityRoles(PersistedRoles persistedRoles) throws Exception
storeSecurityRoles
in interface StorageManager
Exception
public void storeID(long journalID, long id) throws Exception
StorageManager
storeID
in interface StorageManager
Exception
public void deleteID(long journalD) throws Exception
deleteID
in interface StorageManager
Exception
public void deleteAddressSetting(SimpleString addressMatch) throws Exception
deleteAddressSetting
in interface StorageManager
Exception
public void deleteSecurityRoles(SimpleString addressMatch) throws Exception
deleteSecurityRoles
in interface StorageManager
Exception
public JournalLoadInformation loadMessageJournal(PostOffice postOffice, PagingManager pagingManager, ResourceManager resourceManager, Map<Long,QueueBindingInfo> queueInfos, Map<SimpleString,List<Pair<byte[],Long>>> duplicateIDMap, Set<Pair<Long,Long>> pendingLargeMessages, List<PageCountPending> pendingNonTXPageCounter, JournalLoader journalLoader) throws Exception
loadMessageJournal
in interface StorageManager
Exception
public void checkInvalidPageTransactions(PagingManager pagingManager, Set<PageTransactionInfo> invalidPageTransactions)
public void addGrouping(GroupBinding groupBinding) throws Exception
addGrouping
in interface StorageManager
Exception
public void deleteGrouping(long tx, GroupBinding groupBinding) throws Exception
deleteGrouping
in interface StorageManager
Exception
public void updateQueueBinding(long tx, Binding binding) throws Exception
updateQueueBinding
in interface StorageManager
Exception
public void addQueueBinding(long tx, Binding binding) throws Exception
addQueueBinding
in interface StorageManager
Exception
public void deleteQueueBinding(long tx, long queueBindingID) throws Exception
deleteQueueBinding
in interface StorageManager
Exception
public long storeQueueStatus(long queueID, QueueStatus status) throws Exception
storeQueueStatus
in interface StorageManager
queueID - The id of the queue
status - The current status of the queue. (Reserved for future use; at the moment we only use this record for PAUSED)
Exception
public void deleteQueueStatus(long recordID) throws Exception
deleteQueueStatus
in interface StorageManager
Exception
public void addAddressBinding(long tx, AddressInfo addressInfo) throws Exception
addAddressBinding
in interface StorageManager
Exception
public void deleteAddressBinding(long tx, long addressBindingID) throws Exception
deleteAddressBinding
in interface StorageManager
Exception
public long storePageCounterInc(long txID, long queueID, int value, long persistentSize) throws Exception
storePageCounterInc
in interface StorageManager
Exception
public long storePageCounterInc(long queueID, int value, long persistentSize) throws Exception
storePageCounterInc
in interface StorageManager
Exception
public long storePageCounter(long txID, long queueID, long value, long persistentSize) throws Exception
storePageCounter
in interface StorageManager
Exception
public long storePendingCounter(long queueID, long pageID) throws Exception
storePendingCounter
in interface StorageManager
Exception
public void deleteIncrementRecord(long txID, long recordID) throws Exception
deleteIncrementRecord
in interface StorageManager
Exception
public void deletePageCounter(long txID, long recordID) throws Exception
deletePageCounter
in interface StorageManager
Exception
public void deletePendingPageCounter(long txID, long recordID) throws Exception
deletePendingPageCounter
in interface StorageManager
Exception
public JournalLoadInformation loadBindingJournal(List<QueueBindingInfo> queueBindingInfos, List<GroupingInfo> groupingInfos, List<AddressBindingInfo> addressBindingInfos) throws Exception
loadBindingJournal
in interface StorageManager
Exception
public void lineUpContext()
lineUpContext
in interface StorageManager
public void start() throws Exception
start
in interface ActiveMQComponent
Exception
public void stop() throws Exception
stop
in interface ActiveMQComponent
Exception
public void persistIdGenerator()
StorageManager
Closes the IDGenerator, persisting the current record ID.
Effectively a "pre-stop" method. Necessary due to the "stop" order at ActiveMQServerImpl.
persistIdGenerator
in interface StorageManager
protected abstract void performCachedLargeMessageDeletes()
public void stop(boolean ioCriticalError, boolean sendFailover) throws Exception
stop
in interface StorageManager
ioCriticalError - is the server being stopped due to an IO critical error.
sendFailover - this is to send the replication stopping in case of replication.
Exception
public boolean isStarted()
isStarted
in interface ActiveMQComponent
public JournalLoadInformation[] loadInternalOnly() throws Exception
Exception
public void beforePageRead() throws Exception
StorageManager
beforePageRead
in interface StorageManager
Exception
public void afterPageRead() throws Exception
StorageManager
afterPageRead
in interface StorageManager
Exception
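The safeguard described for beforePageRead and afterPageRead pairs naturally with the pageMaxConcurrentIO Semaphore field: acquire a permit before a page read, release it afterwards. The sketch below is an assumption about how such a gate could look; PageReadGateSketch, the timeout parameter and the boolean return value are hypothetical and differ from the documented void signatures.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical gate limiting how many page reads may run concurrently.
class PageReadGateSketch {
    private final Semaphore pageMaxConcurrentIO;

    PageReadGateSketch(int maxConcurrentReads) {
        this.pageMaxConcurrentIO = new Semaphore(maxConcurrentReads);
    }

    // Tries to take a permit; returns false if none was free within the timeout.
    boolean beforePageRead(long timeoutMillis) {
        try {
            return pageMaxConcurrentIO.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    // Gives the permit back once the page read has finished.
    void afterPageRead() { pageMaxConcurrentIO.release(); }

    int available() { return pageMaxConcurrentIO.availablePermits(); }

    public static void main(String[] args) {
        PageReadGateSketch gate = new PageReadGateSketch(2);
        if (gate.beforePageRead(100)) {
            try {
                System.out.println("reading page, permits left: " + gate.available());
            } finally {
                gate.afterPageRead();
            }
        }
    }
}
```

Callers should place afterPageRead in a finally block, as in main above, so a failed read does not leak a permit.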
public Journal getMessageJournal()
getMessageJournal
in interface StorageManager
public Journal getBindingsJournal()
getBindingsJournal
in interface StorageManager
protected void confirmLargeMessage(LargeServerMessage largeServerMessage)
protected abstract LargeServerMessage parseLargeMessage(Map<Long,Message> messages, ActiveMQBuffer buff) throws Exception
Exception
protected static PersistedRoles newSecurityRecord(long id, ActiveMQBuffer buffer)
id -
buffer -
protected static PersistentQueueBindingEncoding newQueueBindingEncoding(long id, ActiveMQBuffer buffer)
id -
buffer -
protected static QueueStatusEncoding newQueueStatusEncoding(long id, ActiveMQBuffer buffer)
id -
buffer -
protected static PersistentAddressBindingEncoding newAddressBindingEncoding(long id, ActiveMQBuffer buffer)
public boolean addToPage(PagingStore store, Message msg, Transaction tx, RouteContextList listCtx) throws Exception
StorageManager
This is primarily a PagingStore call, but as with any other call writing persistent data, it must go through here, both for the sake of replication and to ensure that it takes the locks (storage manager and pagingStore) in the right order, thus avoiding dead-locks.
addToPage
in interface StorageManager
true if we are paging and have handled the data, false if the data needs to be sent to the journal
Exception
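The return-value contract of addToPage implies a simple branch on the caller's side: a false result means the message still has to go to the journal. The sketch below illustrates that branch only; AddToPageSketch, its String messages and in-memory lists are hypothetical stand-ins, and it leaves out locking and replication.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the addToPage contract from the caller's side.
class AddToPageSketch {
    final List<String> page = new ArrayList<>();    // stand-in for the paging store
    final List<String> journal = new ArrayList<>(); // stand-in for the message journal
    boolean paging;

    // true: paging handled the message; false: the caller must send it to the journal.
    boolean addToPage(String message) {
        if (!paging) {
            return false;
        }
        page.add(message);
        return true;
    }

    // Caller-side routing that follows the documented contract.
    void route(String message) {
        if (!addToPage(message)) {
            journal.add(message);
        }
    }

    public static void main(String[] args) {
        AddToPageSketch sketch = new AddToPageSketch();
        sketch.route("first");
        sketch.paging = true;
        sketch.route("second");
        System.out.println(sketch.journal + " " + sketch.page);
    }
}
```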
Copyright © 2018 JBoss by Red Hat. All rights reserved.