8.4. Clustered Listeners
Clustered listeners allow listeners to be used in a distributed cache configuration. In a distributed cache environment, a registered local listener is notified only of events that are local to the node where the event occurred. A clustered listener resolves this issue by allowing a single listener to receive any write notification that occurs in the cluster, regardless of where the event occurred. Because these notifications must be forwarded between nodes, clustered listeners are slower than non-clustered listeners, which only provide event notifications for the node on which the event occurs.
When using clustered listeners, client applications are notified when an entry is added, updated, expired, or deleted in a particular cache. The event is cluster-wide, so client applications can access the event regardless of the node on which the application resides or to which it connects.
The event is always triggered on the node where the listener was registered, regardless of where the cache update originated.
8.4.1. Configuring Clustered Listeners
In the following use case, the listener stores events as it receives them.
Procedure 8.1. Clustered Listener Configuration
@Listener(clustered = true)
protected static class ClusterListener {
    List<CacheEntryEvent> events = Collections.synchronizedList(new ArrayList<CacheEntryEvent>());

    @CacheEntryCreated
    @CacheEntryModified
    @CacheEntryExpired
    @CacheEntryRemoved
    public void onCacheEvent(CacheEntryEvent event) {
        log.debugf("Adding new cluster event %s", event);
        events.add(event);
    }
}

public void addClusterListener(Cache<?, ?> cache) {
    ClusterListener clusterListener = new ClusterListener();
    cache.addListener(clusterListener);
}
- Clustered listeners are enabled by annotating the @Listener class with clustered=true.
- The listener method carries the following annotations so that client applications are notified when entries are added, modified, expired, or removed:
@CacheEntryCreated
@CacheEntryModified
@CacheEntryExpired
@CacheEntryRemoved
- The listener is registered with a cache, with the option of passing on a filter or converter.
The following limitations apply to clustered listeners but not to non-clustered listeners:
- A clustered listener can only listen for entries that are created, modified, expired, or removed. It receives no other events.
- Only post events are sent to a clustered listener; pre events are ignored.
8.4.2. The Cache Listener API
Clustered listeners can be added on top of the existing @CacheListener API via the addListener method.
Example 8.3. The Cache Listener API
cache.addListener(Object listener, Filter filter, Converter converter);

public @interface Listener {
    boolean clustered() default false;
    boolean includeCurrentState() default false;
    boolean sync() default true;
}

interface CacheEventFilter<K,V> {
    public boolean accept(K key, V oldValue, Metadata oldMetadata,
                          V newValue, Metadata newMetadata, EventType eventType);
}

interface CacheEventConverter<K,V,C> {
    public C convert(K key, V oldValue, Metadata oldMetadata,
                     V newValue, Metadata newMetadata, EventType eventType);
}
- The Cache API
  A local or clustered listener can be registered with the cache.addListener method, and is active until one of the following events occurs:
  - The listener is explicitly unregistered by invoking cache.removeListener.
  - The node on which the listener was registered crashes.
- Listener Annotation
  The listener annotation is enhanced with three attributes:
  - clustered(): This attribute defines whether the annotated listener is clustered or not. Note that clustered listeners can only be notified of @CacheEntryRemoved, @CacheEntryCreated, @CacheEntryExpired, and @CacheEntryModified events. This attribute is false by default.
  - includeCurrentState(): This attribute applies to clustered listeners only, and is false by default. When set to true, the entire existing state within the cluster is evaluated. When being registered, a listener will immediately be sent a CacheEntryCreatedEvent for every entry in the cache.
  - sync(): Refer to Section 8.2.3, “About Synchronous and Asynchronous Notifications” for information regarding this attribute.
- oldValue and oldMetadata
  The oldValue and oldMetadata values are extra parameters on the accept method of CacheEventFilter and the convert method of CacheEventConverter. These values are provided to any listener, including local listeners. For more information about these values, see the JBoss Data Grid API Documentation.
- EventType
  The EventType includes the type of event, whether it was a retry, and whether it was a pre or post event.
When using clustered listeners, the order in which the cache is updated is reflected in the sequence of notifications received.
The clustered listener does not guarantee that an event is sent only once. The listener implementation must be idempotent so that receiving the same event more than once has no adverse effect. Implementors can expect each event to be delivered exactly once in stable clusters and outside of the time span in which synthetic events are generated as a result of includeCurrentState.
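One way to meet the idempotency requirement is to have the listener write each notification into a keyed view rather than append it to a log, so that a repeated notification leaves the view unchanged. The following is a minimal sketch under that assumption. DeliveryDateView and its methods are hypothetical names, not part of the JBoss Data Grid API; the annotated listener method is assumed to delegate to onUpsert or onRemove.

```java
import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper, not part of the JBoss Data Grid API.
final class DeliveryDateView {
    private final Map<String, Date> datesByOrderId = new ConcurrentHashMap<>();

    // For created and modified events. A duplicate notification overwrites
    // the entry with the same value. deliveryDate must be non-null.
    void onUpsert(String orderId, Date deliveryDate) {
        datesByOrderId.put(orderId, deliveryDate);
    }

    // For removed and expired events. Removing an absent key is a no-op,
    // so a duplicate notification has no effect.
    void onRemove(String orderId) {
        datesByOrderId.remove(orderId);
    }

    Date get(String orderId) {
        return datesByOrderId.get(orderId);
    }

    int size() {
        return datesByOrderId.size();
    }
}
```

Because both operations set state instead of accumulating it, the view ends up the same whether a notification arrives once or several times.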
8.4.3. Clustered Listener Example
The following use case demonstrates a listener that needs to know when orders with a destination of New York, NY are generated. The listener requires a filter that selects orders whose destination is, or changes to or from, New York. The listener also requires a converter because it does not need the entire order, only the date on which it is to be delivered.
Example 8.4. Use Case: Filtering and Converting the New York orders
class CityStateFilter implements CacheEventFilter<String, Order> {
    private final String state;
    private final String city;

    CityStateFilter(String state, String city) {
        this.state = state;
        this.city = city;
    }

    public boolean accept(String orderId, Order oldOrder, Metadata oldMetadata,
                          Order newOrder, Metadata newMetadata, EventType eventType) {
        switch (eventType.getType()) {
            // Only send update if the order is going to our city
            case CACHE_ENTRY_CREATED:
                return city.equals(newOrder.getCity()) && state.equals(newOrder.getState());
            // Only send update if our order has changed from our city to elsewhere
            // or if it is now going to our city
            case CACHE_ENTRY_MODIFIED:
                if (city.equals(oldOrder.getCity()) && state.equals(oldOrder.getState())) {
                    // If old city matches then we have to compare if new order is no longer going to our city
                    return !city.equals(newOrder.getCity()) || !state.equals(newOrder.getState());
                } else {
                    // If the old city doesn't match ours then only send update if new update does match ours
                    return city.equals(newOrder.getCity()) && state.equals(newOrder.getState());
                }
            // On remove we have to send update if our order was originally going to city
            case CACHE_ENTRY_REMOVED:
                return city.equals(oldOrder.getCity()) && state.equals(oldOrder.getState());
        }
        return false;
    }
}

class OrderDateConverter implements CacheEventConverter<String, Order, Date> {
    private final String state;
    private final String city;

    OrderDateConverter(String state, String city) {
        this.state = state;
        this.city = city;
    }

    public Date convert(String orderId, Order oldValue, Metadata oldMetadata,
                        Order newValue, Metadata newMetadata, EventType eventType) {
        // If remove we do not care about date - this tells listener to remove its data
        if (eventType.isRemove()) {
            return null;
        } else if (eventType.isModified()) {
            if (state.equals(newValue.getState()) && city.equals(newValue.getCity())) {
                // If it is a modification meaning the destination has changed to ours then we allow it
                return newValue.getDate();
            } else {
                // If destination is no longer our city it means it was changed from us so send null
                return null;
            }
        } else {
            // This was a create so we always send date
            return newValue.getDate();
        }
    }
}
8.4.4. Optimized Cache Filter Converter
The example provided in Section 8.4.3, “Clustered Listener Example” could use the optimized CacheEventFilterConverter to perform the filtering and conversion of results in one step.
The CacheEventFilterConverter is an optimization that allows event filtering and conversion to be performed in one step. Use it when the event filter and converter are most efficiently implemented as the same object, with filtering and conversion composed in the same method. It can only be used in situations where the conversion never returns a null value, because a returned value of null indicates that the value did not pass the filter. To convert to a null value, use the CacheEventFilter and the CacheEventConverter interfaces independently.
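The null convention can be illustrated without the data grid itself. The following sketch uses a hypothetical FilterAndConvert interface and dispatch helper as simplified stand-ins for the real CacheEventFilterConverter machinery (metadata and event type parameters are omitted). It only shows that one call decides both whether an event is delivered and which value is delivered, and that a null result is treated as "did not pass the filter".

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a combined filter and converter; not the real interface.
interface FilterAndConvert<K, V, C> {
    // Returns the converted value, or null if the event does not pass the filter.
    C filterAndConvert(K key, V oldValue, V newValue);
}

final class FilterConvertDemo {
    // Runs the combined step once per update to a single key and collects
    // only the non-null results, which are the values a listener would receive.
    static <K, V, C> List<C> dispatch(FilterAndConvert<K, V, C> fc, K key, List<V> newValues) {
        List<C> delivered = new ArrayList<>();
        V previous = null;
        for (V value : newValues) {
            C converted = fc.filterAndConvert(key, previous, value);
            if (converted != null) {
                delivered.add(converted);
            }
            previous = value;
        }
        return delivered;
    }
}
```

In this model a converter that legitimately needs to deliver null has no way to do so, which is why the separate filter and converter interfaces are needed in that case.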
The following is an example of the New York orders use case using the CacheEventFilterConverter:
Example 8.5. CacheEventFilterConverter
class OrderDateFilterConverter extends AbstractCacheEventFilterConverter<String, Order, Date> {
    private final String state;
    private final String city;

    OrderDateFilterConverter(String state, String city) {
        this.state = state;
        this.city = city;
    }

    public Date filterAndConvert(String orderId, Order oldValue, Metadata oldMetadata,
                                 Order newValue, Metadata newMetadata, EventType eventType) {
        // The date is not required on remove. Returning null means the event
        // does not pass the filter.
        if (eventType.isRemove()) {
            return null;
        } else if (eventType.isModified()) {
            if (state.equals(newValue.getState()) && city.equals(newValue.getCity())) {
                // If it is a modification meaning the destination has changed to ours then we allow it
                return newValue.getDate();
            } else {
                // The destination is no longer our city, so return null to filter the event out
                return null;
            }
        } else {
            // This was a create so we always send date
            return newValue.getDate();
        }
    }
}
When registering the listener, provide the FilterConverter as both the filter argument and the converter argument:
OrderDateFilterConverter filterConverter = new OrderDateFilterConverter("NY", "New York");
cache.addListener(listener, filterConverter, filterConverter);