public class JournalStorageManager extends Object implements StorageManager
Using this class also ensures that locks are acquired in the right order, avoiding dead-locks.
Notice that, turning on and off replication (on the live server side) is _mostly_ a matter of
using ReplicatedJournal
s instead of regular JournalImpl
, and sync the existing
data. For details see the Javadoc of
#startReplication(ReplicationManager, PagingManager, String, boolean)
.
StorageManager.LargeMessageExtension
Constructor and Description |
---|
JournalStorageManager(Configuration config,
ExecutorFactory executorFactory) |
JournalStorageManager(Configuration config,
ExecutorFactory executorFactory,
IOCriticalErrorListener criticalErrorListener) |
Modifier and Type | Method and Description |
---|---|
void |
addBytesToLargeMessage(SequentialFile file,
long messageId,
byte[] bytes) |
void |
addGrouping(GroupBinding groupBinding) |
void |
addQueueBinding(long tx,
Binding binding) |
boolean |
addToPage(PagingStore store,
ServerMessage msg,
Transaction tx,
RouteContextList listCtx)
Write message to page if we are paging.
|
void |
afterCompleteOperations(IOCallback run) |
void |
afterPageRead()
We need a safeguard in place to avoid too much concurrent IO happening on Paging, otherwise
the system may become unresponsive if too many destinations are reading all the same time.
|
ByteBuffer |
allocateDirectBuffer(int size)
AIO has an optimized buffer which has a method to release it
instead of the way NIO will release data based on GC.
|
void |
beforePageRead()
We need a safeguard in place to avoid too much concurrent IO happening on Paging, otherwise
the system may become unresponsive if too many destinations are reading all the same time.
|
void |
clearContext() |
void |
commit(long txID) |
void |
commit(long txID,
boolean lineUpContext) |
void |
commitBindings(long txID) |
void |
confirmPendingLargeMessage(long recordID)
We don't need messageID now but we are likely to need it we ever decide to support a database
|
void |
confirmPendingLargeMessageTX(Transaction tx,
long messageID,
long recordID)
Confirms that a large message was finished
|
SequentialFile |
createFileForLargeMessage(long messageID,
StorageManager.LargeMessageExtension extension)
Instantiates a SequentialFile to be used for storing a
LargeServerMessage . |
LargeServerMessage |
createLargeMessage() |
LargeServerMessage |
createLargeMessage(long id,
MessageInternal message)
Creates a new LargeMessage with the given id.
|
void |
criticalError(Throwable error) |
void |
deleteAddressSetting(SimpleString addressMatch) |
void |
deleteCursorAcknowledge(long ackID) |
void |
deleteCursorAcknowledgeTransactional(long txID,
long ackID) |
void |
deleteDuplicateID(long recordID) |
void |
deleteDuplicateIDTransactional(long txID,
long recordID) |
void |
deleteGrouping(long tx,
GroupBinding groupBinding) |
void |
deleteHeuristicCompletion(long id) |
void |
deleteID(long journalD) |
void |
deleteIncrementRecord(long txID,
long recordID) |
void |
deleteMessage(long messageID) |
void |
deletePageComplete(long ackID) |
void |
deletePageCounter(long txID,
long recordID) |
void |
deletePageTransactional(long recordID) |
void |
deletePendingPageCounter(long txID,
long recordID) |
void |
deleteQueueBinding(long tx,
long queueBindingID) |
void |
deleteSecurityRoles(SimpleString addressMatch) |
void |
freeDirectBuffer(ByteBuffer buffer)
AIO has an optimized buffer which has a method to release it
instead of the way NIO will release data based on GC.
|
long |
generateID() |
Journal |
getBindingsJournal() |
OperationContext |
getContext()
Get the context associated with the thread for later reuse
|
long |
getCurrentID() |
IDGenerator |
getIDGenerator() |
Journal |
getMessageJournal() |
Executor |
getSingleThreadExecutor() |
boolean |
isReplicated() |
boolean |
isStarted() |
void |
lineUpContext() |
JournalLoadInformation |
loadBindingJournal(List<QueueBindingInfo> queueBindingInfos,
List<GroupingInfo> groupingInfos) |
JournalLoadInformation[] |
loadInternalOnly()
TODO: Is this still being used ?
|
JournalLoadInformation |
loadMessageJournal(PostOffice postOffice,
PagingManager pagingManager,
ResourceManager resourceManager,
Map<Long,QueueBindingInfo> queueInfos,
Map<SimpleString,List<Pair<byte[],Long>>> duplicateIDMap,
Set<Pair<Long,Long>> pendingLargeMessages,
List<PageCountPending> pendingNonTXPageCounter,
JournalLoader journalLoader) |
static String |
md5(File file) |
protected static JournalStorageManager.PersistentQueueBindingEncoding |
newBindingEncoding(long id,
ActiveMQBuffer buffer) |
OperationContext |
newContext(Executor executor1)
It just creates an OperationContext without associating it
|
protected static PersistedRoles |
newSecurityRecord(long id,
ActiveMQBuffer buffer) |
OperationContext |
newSingleThreadContext() |
void |
pageClosed(SimpleString storeName,
int pageNumber) |
void |
pageDeleted(SimpleString storeName,
int pageNumber) |
void |
pageWrite(PagedMessage message,
int pageNumber) |
void |
persistIdGenerator()
Closes the
IDGenerator persisting the current record ID. |
void |
prepare(long txID,
Xid xid) |
void |
readLock()
Read lock the StorageManager.
|
void |
readUnLock()
Unlock the manager.
|
List<PersistedAddressSetting> |
recoverAddressSettings() |
List<PersistedRoles> |
recoverPersistedRoles() |
void |
rollback(long txID) |
void |
rollbackBindings(long txID) |
void |
setContext(OperationContext context)
Set the context back to the thread
|
void |
start() |
void |
startReplication(ReplicationManager replicationManager,
PagingManager pagingManager,
String nodeID,
boolean autoFailBack,
long initialReplicationSyncTimeout)
Starts replication at the live-server side.
|
void |
stop() |
void |
stop(boolean ioCriticalError) |
void |
stopReplication()
Stops replication by resetting replication-related fields to their 'unreplicated' state.
|
void |
storeAcknowledge(long queueID,
long messageID) |
void |
storeAcknowledgeTransactional(long txID,
long queueID,
long messageID) |
void |
storeAddressSetting(PersistedAddressSetting addressSetting) |
void |
storeCursorAcknowledge(long queueID,
PagePosition position) |
void |
storeCursorAcknowledgeTransactional(long txID,
long queueID,
PagePosition position) |
void |
storeDuplicateID(SimpleString address,
byte[] duplID,
long recordID) |
void |
storeDuplicateIDTransactional(long txID,
SimpleString address,
byte[] duplID,
long recordID) |
long |
storeHeuristicCompletion(Xid xid,
boolean isCommit) |
void |
storeID(long journalID,
long id)
Stores the id from IDManager.
|
void |
storeMessage(ServerMessage message) |
void |
storeMessageTransactional(long txID,
ServerMessage message) |
void |
storePageCompleteTransactional(long txID,
long queueID,
PagePosition position) |
long |
storePageCounter(long txID,
long queueID,
long value) |
long |
storePageCounterInc(long queueID,
int value) |
long |
storePageCounterInc(long txID,
long queueID,
int value) |
void |
storePageTransaction(long txID,
PageTransactionInfo pageTransaction) |
long |
storePendingCounter(long queueID,
long pageID,
int inc) |
long |
storePendingLargeMessage(long messageID) |
void |
storeReference(long queueID,
long messageID,
boolean last) |
void |
storeReferenceTransactional(long txID,
long queueID,
long messageID) |
void |
storeSecurityRoles(PersistedRoles persistedRoles) |
void |
updateDeliveryCount(MessageReference ref) |
void |
updateDuplicateIDTransactional(long txID,
SimpleString address,
byte[] duplID,
long recordID) |
void |
updatePageTransaction(long txID,
PageTransactionInfo pageTransaction,
int depages) |
void |
updatePageTransaction(PageTransactionInfo pageTransaction,
int depages)
FIXME Unused
|
void |
updateScheduledDeliveryTime(MessageReference ref) |
void |
updateScheduledDeliveryTimeTransactional(long txID,
MessageReference ref) |
void |
waitOnOperations()
Block until the operations are done.
|
boolean |
waitOnOperations(long timeout)
Block until the operations are done.
|
public JournalStorageManager(Configuration config, ExecutorFactory executorFactory)
public JournalStorageManager(Configuration config, ExecutorFactory executorFactory, IOCriticalErrorListener criticalErrorListener)
public void criticalError(Throwable error)
criticalError
in interface StorageManager
public void clearContext()
clearContext
in interface StorageManager
public boolean isReplicated()
public void startReplication(ReplicationManager replicationManager, PagingManager pagingManager, String nodeID, boolean autoFailBack, long initialReplicationSyncTimeout) throws Exception
In practice that means 2 things:
(1) all currently existing data must be sent to the backup.
(2) every new persistent information is replicated (sent) to the backup.
To achieve (1), we lock the entire journal while collecting the list of files to send to the backup. The journal does not remain locked during actual synchronization.
To achieve (2), instead of writing directly to instances of JournalImpl
, we write to
instances of ReplicatedJournal
.
At the backup-side replication is handled by ReplicationEndpoint
.
startReplication
in interface StorageManager
replicationManager
- pagingManager
- ActiveMQException
Exception
startReplication(org.apache.activemq.artemis.core.replication.ReplicationManager, org.apache.activemq.artemis.core.paging.PagingManager, String, boolean, long)
public void stopReplication()
stopReplication
in interface StorageManager
public IDGenerator getIDGenerator()
public final void waitOnOperations() throws Exception
StorageManager
waitOnOperations
in interface StorageManager
Exception
public final boolean waitOnOperations(long timeout) throws Exception
StorageManager
waitOnOperations
in interface StorageManager
Exception
public void pageClosed(SimpleString storeName, int pageNumber)
pageClosed
in interface StorageManager
public void pageDeleted(SimpleString storeName, int pageNumber)
pageDeleted
in interface StorageManager
public void pageWrite(PagedMessage message, int pageNumber)
pageWrite
in interface StorageManager
public OperationContext getContext()
StorageManager
getContext
in interface StorageManager
public void setContext(OperationContext context)
StorageManager
setContext
in interface StorageManager
public Executor getSingleThreadExecutor()
public OperationContext newSingleThreadContext()
newSingleThreadContext
in interface StorageManager
public OperationContext newContext(Executor executor1)
StorageManager
newContext
in interface StorageManager
public void afterCompleteOperations(IOCallback run)
afterCompleteOperations
in interface StorageManager
public long generateID()
generateID
in interface IDGenerator
public long getCurrentID()
getCurrentID
in interface IDGenerator
public LargeServerMessage createLargeMessage()
createLargeMessage
in interface StorageManager
public final void addBytesToLargeMessage(SequentialFile file, long messageId, byte[] bytes) throws Exception
addBytesToLargeMessage
in interface StorageManager
Exception
public LargeServerMessage createLargeMessage(long id, MessageInternal message) throws Exception
StorageManager
createLargeMessage
in interface StorageManager
message
- This is a temporary message that holds the parsed properties. The remoting
layer can't create a ServerMessage directly, then this will be replaced.Exception
public long storePendingLargeMessage(long messageID) throws Exception
Exception
public void confirmPendingLargeMessageTX(Transaction tx, long messageID, long recordID) throws Exception
StorageManager
confirmPendingLargeMessageTX
in interface StorageManager
Exception
public void confirmPendingLargeMessage(long recordID) throws Exception
confirmPendingLargeMessage
in interface StorageManager
Exception
public void storeMessage(ServerMessage message) throws Exception
storeMessage
in interface StorageManager
Exception
public void storeReference(long queueID, long messageID, boolean last) throws Exception
storeReference
in interface StorageManager
Exception
public void readLock()
StorageManager
The main lock is used to write lock the whole manager when starting replication. Sub-systems, say Paging classes, that use locks of their own AND also write through the StorageManager MUST first read lock the storageManager before taking their own locks. Otherwise, we may dead-lock when starting replication sync.
readLock
in interface StorageManager
public void readUnLock()
StorageManager
readUnLock
in interface StorageManager
StorageManager.readLock()
public void storeAcknowledge(long queueID, long messageID) throws Exception
storeAcknowledge
in interface StorageManager
Exception
public void storeCursorAcknowledge(long queueID, PagePosition position) throws Exception
storeCursorAcknowledge
in interface StorageManager
Exception
public void deleteMessage(long messageID) throws Exception
deleteMessage
in interface StorageManager
Exception
public void updateScheduledDeliveryTime(MessageReference ref) throws Exception
updateScheduledDeliveryTime
in interface StorageManager
Exception
public void storeDuplicateID(SimpleString address, byte[] duplID, long recordID) throws Exception
storeDuplicateID
in interface StorageManager
Exception
public void deleteDuplicateID(long recordID) throws Exception
deleteDuplicateID
in interface StorageManager
Exception
public void storeMessageTransactional(long txID, ServerMessage message) throws Exception
storeMessageTransactional
in interface StorageManager
Exception
public void storePageTransaction(long txID, PageTransactionInfo pageTransaction) throws Exception
storePageTransaction
in interface StorageManager
Exception
public void updatePageTransaction(long txID, PageTransactionInfo pageTransaction, int depages) throws Exception
updatePageTransaction
in interface StorageManager
Exception
public void updatePageTransaction(PageTransactionInfo pageTransaction, int depages) throws Exception
StorageManager
updatePageTransaction
in interface StorageManager
Exception
public void storeReferenceTransactional(long txID, long queueID, long messageID) throws Exception
storeReferenceTransactional
in interface StorageManager
Exception
public void storeAcknowledgeTransactional(long txID, long queueID, long messageID) throws Exception
storeAcknowledgeTransactional
in interface StorageManager
Exception
public void storeCursorAcknowledgeTransactional(long txID, long queueID, PagePosition position) throws Exception
storeCursorAcknowledgeTransactional
in interface StorageManager
Exception
public void storePageCompleteTransactional(long txID, long queueID, PagePosition position) throws Exception
storePageCompleteTransactional
in interface StorageManager
Exception
public void deletePageComplete(long ackID) throws Exception
deletePageComplete
in interface StorageManager
Exception
public void deleteCursorAcknowledgeTransactional(long txID, long ackID) throws Exception
deleteCursorAcknowledgeTransactional
in interface StorageManager
Exception
public void deleteCursorAcknowledge(long ackID) throws Exception
deleteCursorAcknowledge
in interface StorageManager
Exception
public long storeHeuristicCompletion(Xid xid, boolean isCommit) throws Exception
storeHeuristicCompletion
in interface StorageManager
Exception
public void deleteHeuristicCompletion(long id) throws Exception
deleteHeuristicCompletion
in interface StorageManager
Exception
public void deletePageTransactional(long recordID) throws Exception
deletePageTransactional
in interface StorageManager
Exception
public void updateScheduledDeliveryTimeTransactional(long txID, MessageReference ref) throws Exception
updateScheduledDeliveryTimeTransactional
in interface StorageManager
Exception
public void prepare(long txID, Xid xid) throws Exception
prepare
in interface StorageManager
Exception
public void commit(long txID) throws Exception
commit
in interface StorageManager
Exception
public void commitBindings(long txID) throws Exception
commitBindings
in interface StorageManager
Exception
public void rollbackBindings(long txID) throws Exception
rollbackBindings
in interface StorageManager
Exception
public void commit(long txID, boolean lineUpContext) throws Exception
commit
in interface StorageManager
Exception
public void rollback(long txID) throws Exception
rollback
in interface StorageManager
Exception
public void storeDuplicateIDTransactional(long txID, SimpleString address, byte[] duplID, long recordID) throws Exception
storeDuplicateIDTransactional
in interface StorageManager
Exception
public void updateDuplicateIDTransactional(long txID, SimpleString address, byte[] duplID, long recordID) throws Exception
updateDuplicateIDTransactional
in interface StorageManager
Exception
public void deleteDuplicateIDTransactional(long txID, long recordID) throws Exception
deleteDuplicateIDTransactional
in interface StorageManager
Exception
public void updateDeliveryCount(MessageReference ref) throws Exception
updateDeliveryCount
in interface StorageManager
Exception
public void storeAddressSetting(PersistedAddressSetting addressSetting) throws Exception
storeAddressSetting
in interface StorageManager
Exception
public List<PersistedAddressSetting> recoverAddressSettings() throws Exception
recoverAddressSettings
in interface StorageManager
Exception
public List<PersistedRoles> recoverPersistedRoles() throws Exception
recoverPersistedRoles
in interface StorageManager
Exception
public void storeSecurityRoles(PersistedRoles persistedRoles) throws Exception
storeSecurityRoles
in interface StorageManager
Exception
public final void storeID(long journalID, long id) throws Exception
StorageManager
storeID
in interface StorageManager
Exception
public void deleteID(long journalD) throws Exception
deleteID
in interface StorageManager
Exception
public void deleteAddressSetting(SimpleString addressMatch) throws Exception
deleteAddressSetting
in interface StorageManager
Exception
public void deleteSecurityRoles(SimpleString addressMatch) throws Exception
deleteSecurityRoles
in interface StorageManager
Exception
public JournalLoadInformation loadMessageJournal(PostOffice postOffice, PagingManager pagingManager, ResourceManager resourceManager, Map<Long,QueueBindingInfo> queueInfos, Map<SimpleString,List<Pair<byte[],Long>>> duplicateIDMap, Set<Pair<Long,Long>> pendingLargeMessages, List<PageCountPending> pendingNonTXPageCounter, JournalLoader journalLoader) throws Exception
loadMessageJournal
in interface StorageManager
Exception
public void addGrouping(GroupBinding groupBinding) throws Exception
addGrouping
in interface StorageManager
Exception
public void deleteGrouping(long tx, GroupBinding groupBinding) throws Exception
deleteGrouping
in interface StorageManager
Exception
public void addQueueBinding(long tx, Binding binding) throws Exception
addQueueBinding
in interface StorageManager
Exception
public void deleteQueueBinding(long tx, long queueBindingID) throws Exception
deleteQueueBinding
in interface StorageManager
Exception
public long storePageCounterInc(long txID, long queueID, int value) throws Exception
storePageCounterInc
in interface StorageManager
Exception
public long storePageCounterInc(long queueID, int value) throws Exception
storePageCounterInc
in interface StorageManager
Exception
public long storePageCounter(long txID, long queueID, long value) throws Exception
storePageCounter
in interface StorageManager
Exception
public long storePendingCounter(long queueID, long pageID, int inc) throws Exception
storePendingCounter
in interface StorageManager
Exception
public void deleteIncrementRecord(long txID, long recordID) throws Exception
deleteIncrementRecord
in interface StorageManager
Exception
public void deletePageCounter(long txID, long recordID) throws Exception
deletePageCounter
in interface StorageManager
Exception
public void deletePendingPageCounter(long txID, long recordID) throws Exception
deletePendingPageCounter
in interface StorageManager
Exception
public JournalLoadInformation loadBindingJournal(List<QueueBindingInfo> queueBindingInfos, List<GroupingInfo> groupingInfos) throws Exception
loadBindingJournal
in interface StorageManager
Exception
public void lineUpContext()
lineUpContext
in interface StorageManager
public void start() throws Exception
start
in interface ActiveMQComponent
Exception
public void stop() throws Exception
stop
in interface ActiveMQComponent
Exception
public void persistIdGenerator()
StorageManager
IDGenerator
persisting the current record ID.
Effectively a "pre-stop" method. Necessary due to the "stop"-order at
ActiveMQServerImpl
persistIdGenerator
in interface StorageManager
public void stop(boolean ioCriticalError) throws Exception
stop
in interface StorageManager
ioCriticalError
- is the server being stopped due to an IO critical errorException
public boolean isStarted()
isStarted
in interface ActiveMQComponent
public JournalLoadInformation[] loadInternalOnly() throws Exception
Exception
public void beforePageRead() throws Exception
StorageManager
beforePageRead
in interface StorageManager
Exception
public void afterPageRead() throws Exception
StorageManager
afterPageRead
in interface StorageManager
Exception
public ByteBuffer allocateDirectBuffer(int size)
StorageManager
allocateDirectBuffer
in interface StorageManager
public void freeDirectBuffer(ByteBuffer buffer)
StorageManager
freeDirectBuffer
in interface StorageManager
public Journal getMessageJournal()
getMessageJournal
in interface StorageManager
public Journal getBindingsJournal()
getBindingsJournal
in interface StorageManager
public SequentialFile createFileForLargeMessage(long messageID, StorageManager.LargeMessageExtension extension)
StorageManager
LargeServerMessage
.createFileForLargeMessage
in interface StorageManager
messageID
- the id of the messageextension
- the extension to add to the fileprotected static PersistedRoles newSecurityRecord(long id, ActiveMQBuffer buffer)
id
- buffer
- protected static JournalStorageManager.PersistentQueueBindingEncoding newBindingEncoding(long id, ActiveMQBuffer buffer)
id
- buffer
- public boolean addToPage(PagingStore store, ServerMessage msg, Transaction tx, RouteContextList listCtx) throws Exception
StorageManager
This is primarily a PagingStore
call, but as with any other call writing persistent
data, it must go through here. Both for the sake of replication, and also to ensure that it
takes the locks (storage manager and pagingStore) in the right order. Avoiding thus the
creation of dead-locks.
addToPage
in interface StorageManager
true
if we are paging and have handled the data, false
if the data
needs to be sent to the journalException
Copyright © 2016 JBoss by Red Hat. All rights reserved.