8.4. Clustered Listeners

Clustered listeners allow listeners to be used in a distributed cache configuration. In a distributed cache environment, a registered local listener is only notified of events that occur on the node where it is registered. Clustered listeners resolve this issue by allowing a single listener to receive any write notification that occurs in the cluster, regardless of where the event occurred. Because events must be forwarded across the cluster, clustered listeners are slower than non-clustered listeners, which only provide event notifications for the node on which the event occurs.
When using clustered listeners, client applications are notified when an entry is added, updated, expired, or deleted in a particular cache. The event is cluster-wide, so client applications can access the event regardless of the node on which the application resides or to which it connects.
The event is always triggered on the node where the listener was registered, regardless of where the cache update originated.
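The difference between local and clustered delivery can be illustrated with a toy model that uses no data grid classes. Everything in it (ToyCluster, ToyListener, the node numbering) is hypothetical and exists only for illustration; it is a sketch of the visible behavior, not of how events are routed internally.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model, not the Infinispan / JBoss Data Grid API.
class ToyListener {
    final int homeNode;        // node on which the listener was registered
    final boolean clustered;   // mirrors @Listener(clustered = true)
    final List<String> received = new ArrayList<>();

    ToyListener(int homeNode, boolean clustered) {
        this.homeNode = homeNode;
        this.clustered = clustered;
    }
}

class ToyCluster {
    private final List<ToyListener> listeners = new ArrayList<>();

    void register(ToyListener listener) {
        listeners.add(listener);
    }

    // Simulates a write whose event is raised on eventNode.
    void write(int eventNode, String key) {
        for (ToyListener l : listeners) {
            // A local listener only sees events raised on its own node;
            // a clustered listener sees every write in the cluster.
            if (l.clustered || l.homeNode == eventNode) {
                l.received.add(key + "@node" + eventNode);
            }
        }
    }
}
```

In this model, a local listener registered on node 0 records only writes raised on node 0, while a clustered listener registered on the same node records writes from every node.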

8.4.1. Configuring Clustered Listeners

In the following use case, the listener stores events as it receives them.

Procedure 8.1. Clustered Listener Configuration

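import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.infinispan.Cache;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryCreated;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryExpired;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryModified;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryRemoved;
import org.infinispan.notifications.cachelistener.event.CacheEntryEvent;

// The members below belong to an enclosing class that also defines the logger "log".
@Listener(clustered = true)
protected static class ClusterListener {
   // Synchronized because events may arrive on different threads
   List<CacheEntryEvent> events = Collections.synchronizedList(new ArrayList<CacheEntryEvent>());

   @CacheEntryCreated
   @CacheEntryModified
   @CacheEntryExpired
   @CacheEntryRemoved
   public void onCacheEvent(CacheEntryEvent event) {
      log.debugf("Adding new cluster event %s", event);
      events.add(event);
   }
}

public void addClusterListener(Cache<?, ?> cache) {
   ClusterListener clusterListener = new ClusterListener();
   cache.addListener(clusterListener);
}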
  1. Clustered listeners are enabled by annotating the @Listener class with clustered=true.
  2. The listener method is annotated with the following annotations so that client applications are notified when entries are added, modified, expired, or removed.
    • @CacheEntryCreated
    • @CacheEntryModified
    • @CacheEntryExpired
    • @CacheEntryRemoved
  3. The listener is registered with a cache, with the option of passing on a filter or converter.
The following limitations apply to clustered listeners but not to non-clustered listeners:
  • A clustered listener can only listen to entries that are created, modified, expired, or removed. No other events are delivered to a clustered listener.
  • Only post events are sent to a clustered listener; pre events are ignored.

8.4.2. The Cache Listener API

Clustered listeners can be added on top of the existing @Listener API via the addListener method.

Example 8.3. The Cache Listener API

cache.addListener(Object listener, Filter filter, Converter converter);

public @interface Listener {
  boolean clustered() default false;
  boolean includeCurrentState() default false;
  boolean sync() default true;
}


interface CacheEventFilter<K,V> {
  public boolean accept(K key, V oldValue, Metadata oldMetadata, V newValue, Metadata newMetadata, EventType eventType);
}

interface CacheEventConverter<K,V,C> {
  public C convert(K key, V oldValue, Metadata oldMetadata, V newValue, Metadata newMetadata, EventType eventType);
}
The Cache API
A local or clustered listener can be registered with the cache.addListener method, and is active until one of the following events occurs.
  • The listener is explicitly unregistered by invoking cache.removeListener.
  • The node on which the listener was registered crashes.
Listener Annotation
The listener annotation is enhanced with three attributes:
  • clustered(): This attribute defines whether the annotated listener is clustered or not. Note that clustered listeners can only be notified for @CacheEntryRemoved, @CacheEntryCreated, @CacheEntryExpired, and @CacheEntryModified events. This attribute is false by default.
  • includeCurrentState(): This attribute applies to clustered listeners only, and is false by default. When set to true, the entire existing state within the cluster is evaluated. On registration, the listener is immediately sent a CacheEntryCreatedEvent for every entry in the cache.
  • sync(): This attribute defines whether the listener is notified synchronously, in the thread that raised the event, or asynchronously, in a separate thread. This attribute is true by default.
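The effect of includeCurrentState can be sketched with a small stand-in cache. ToyCache and its string-based events are hypothetical and are not part of the data grid API; the sketch only mirrors the behavior described above, where existing entries are replayed to a newly registered listener as created events.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a cache; not the Infinispan / JBoss Data Grid API.
class ToyCache {
    private final Map<String, String> data = new LinkedHashMap<>();
    // Each listener is represented by the list that collects its events.
    private final List<List<String>> listeners = new ArrayList<>();

    void put(String key, String value) {
        boolean created = !data.containsKey(key);
        data.put(key, value);
        for (List<String> sink : listeners) {
            sink.add((created ? "CREATED " : "MODIFIED ") + key);
        }
    }

    void addListener(List<String> sink, boolean includeCurrentState) {
        if (includeCurrentState) {
            // Replay every existing entry as a synthetic "created" event.
            for (String key : data.keySet()) {
                sink.add("CREATED " + key);
            }
        }
        listeners.add(sink);
    }
}
```

A listener registered with includeCurrentState set to true first receives one created event per existing entry and then live events; a listener registered with false receives only the live events.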
oldValue and oldMetadata
The oldValue and oldMetadata values are extra parameters of the accept method of CacheEventFilter and the convert method of CacheEventConverter. These values are provided to any listener, including local listeners. For more information about these values, see the JBoss Data Grid API Documentation.
EventType
The EventType includes the type of event, whether it was a retry, and if it was a pre or post event.
When using clustered listeners, the order in which the cache is updated is reflected in the sequence of notifications received.
A clustered listener does not guarantee that an event is sent only once, so the listener implementation must be idempotent in order to tolerate receiving the same event more than once. Implementors can expect each event to be delivered exactly once while the cluster is stable and outside of the time span in which synthetic events are generated as a result of includeCurrentState.
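One way to meet the idempotency requirement is to keep listener-side state keyed by the cache key, so that applying the same event twice leaves the state unchanged. The following is a minimal sketch under that assumption; IdempotentView and its method names are hypothetical and do not come from the data grid API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical listener-side state; class and method names are illustrative only.
class IdempotentView {
    private final Map<String, String> latest = new HashMap<>();

    // For created and modified events: overwriting by key is idempotent.
    void onUpsert(String key, String value) {
        latest.put(key, value);
    }

    // For removed and expired events: removing an absent key is a no-op.
    void onRemove(String key) {
        latest.remove(key);
    }

    int size() {
        return latest.size();
    }

    String get(String key) {
        return latest.get(key);
    }
}
```

By contrast, a handler that increments a counter or appends to a list for every event counts a redelivered event twice.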

8.4.3. Clustered Listener Example

The following use case demonstrates a listener that wants to know when orders are generated that have a destination of New York, NY. The listener requires a Filter that filters all orders that come in and out of New York. The listener also requires a Converter as it does not require the entire order, only the date it is to be delivered.

Example 8.4. Use Case: Filtering and Converting the New York orders

import org.infinispan.metadata.Metadata;
import org.infinispan.notifications.cachelistener.filter.CacheEventFilter;
import org.infinispan.notifications.cachelistener.filter.EventType;

class CityStateFilter implements CacheEventFilter<String, Order> {
   private final String state;
   private final String city;

   CityStateFilter(String state, String city) {
      this.state = state;
      this.city = city;
   }

   @Override
   public boolean accept(String orderId, Order oldOrder, 
                         Metadata oldMetadata, Order newOrder, 
                         Metadata newMetadata, EventType eventType) {
      // Enum constants in case labels are written without the Type qualifier
      switch (eventType.getType()) {
         // Only send update if the order is going to our city
         case CACHE_ENTRY_CREATED:
            return city.equals(newOrder.getCity()) && 
                   state.equals(newOrder.getState());
         // Only send update if the order has changed from our city to elsewhere or is now going to our city
         case CACHE_ENTRY_MODIFIED:
            if (city.equals(oldOrder.getCity()) && 
                state.equals(oldOrder.getState())) {
               // If the old city matches, send update only if the new order is no longer going to our city
               return !city.equals(newOrder.getCity()) || 
                      !state.equals(newOrder.getState());
            } else {
               // If the old city does not match ours, send update only if the new order does match ours
               return city.equals(newOrder.getCity()) && 
                      state.equals(newOrder.getState());
            }
         // On remove, send update if the order was originally going to our city
         case CACHE_ENTRY_REMOVED:
            return city.equals(oldOrder.getCity()) && 
                   state.equals(oldOrder.getState());
      }
      // Any other event type is not passed to the listener
      return false;
   }
}

import java.util.Date;
import org.infinispan.metadata.Metadata;
import org.infinispan.notifications.cachelistener.filter.CacheEventConverter;
import org.infinispan.notifications.cachelistener.filter.EventType;

class OrderDateConverter implements CacheEventConverter<String, Order, Date> {
   private final String state;
   private final String city;

   OrderDateConverter(String state, String city) {
      this.state = state;
      this.city = city;
   }

   @Override
   public Date convert(String orderId, Order oldValue, 
                       Metadata oldMetadata, Order newValue, 
                       Metadata newMetadata, EventType eventType) {
      // On remove the date is not needed - null tells the listener to remove its data
      if (eventType.isRemove()) {
         return null;
      } else if (eventType.isModified()) {
         if (state.equals(newValue.getState()) && 
             city.equals(newValue.getCity())) {
            // The modification changed the destination to our city, so send the date
            return newValue.getDate();
         } else {
            // The destination is no longer our city, so send null
            return null;
         }
      } else {
         // This was a create so we always send date
         return newValue.getDate();
      }
   }
}

8.4.4. Optimized Cache Filter Converter

The example provided in Section 8.4.3, “Clustered Listener Example” could use the optimized CacheEventFilterConverter to filter and convert results in one step.
The CacheEventFilterConverter is useful when the event filter and converter are most efficiently implemented as the same object, with the filtering and conversion composed in the same method. It can only be used in situations where the conversion never returns a null value, because a returned value of null indicates that the value did not pass the filter. If the conversion must be able to return null, use the CacheEventFilter and the CacheEventConverter interfaces independently.
The following is an example of the New York orders use case using the CacheEventFilterConverter:

Example 8.5. CacheEventFilterConverter

import java.util.Date;
import org.infinispan.metadata.Metadata;
import org.infinispan.notifications.cachelistener.filter.AbstractCacheEventFilterConverter;
import org.infinispan.notifications.cachelistener.filter.EventType;

class OrderDateFilterConverter extends AbstractCacheEventFilterConverter<String, Order, Date> {
    private final String state;
    private final String city;

    OrderDateFilterConverter(String state, String city) {
        this.state = state;
        this.city = city;
    }

    @Override
    public Date filterAndConvert(String orderId, Order oldValue, 
                                 Metadata oldMetadata, Order newValue, 
                                 Metadata newMetadata, EventType eventType) {
        // On remove the date is not needed, so null is returned;
        // in a filter-converter, null means the event does not pass the filter
        if (eventType.isRemove()) {
            return null;
        } else if (eventType.isModified()) {
            if (state.equals(newValue.getState()) && 
                city.equals(newValue.getCity())) {
                // The modification changed the destination to our city, so send the date
                return newValue.getDate();
            } else {
                // The destination is no longer our city, so return null
                return null;
            }
        } else {
            // This was a create so we always send date
            return newValue.getDate();
        }
    }
}
When registering the listener, provide the FilterConverter as both the filter and the converter argument:
OrderDateFilterConverter filterConverter = new OrderDateFilterConverter("NY", "New York");
cache.addListener(listener, filterConverter, filterConverter);
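The rule that a null result means "did not pass the filter" can be illustrated with a stand-in interface. ToyFilterConverter and ToyDispatcher are hypothetical names and are not part of the data grid API; the sketch only shows why a combined filter-converter cannot deliver a null value to the listener.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a combined filter and converter; not the Infinispan interface.
interface ToyFilterConverter<V, C> {
    // Returns the converted value, or null to signal "filtered out".
    C filterAndConvert(V newValue);
}

class ToyDispatcher {
    // Collects the converted values that would reach the listener,
    // skipping every event whose conversion result is null.
    static <V, C> List<C> deliver(List<V> events, ToyFilterConverter<V, C> fc) {
        List<C> delivered = new ArrayList<>();
        for (V event : events) {
            C converted = fc.filterAndConvert(event);
            if (converted != null) {
                delivered.add(converted);
            }
        }
        return delivered;
    }
}
```

Under this rule, a conversion that must send null to the listener, for example to signal a removal, cannot be combined with its filter and needs the separate CacheEventFilter and CacheEventConverter interfaces.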