Chapter 10. The Notification/Listener API

10.1. The Notification/Listener API

Red Hat JBoss Data Grid provides a listener API that provides notifications for events as they occur. Clients can choose to register with the listener API for relevant notifications. This annotation-driven API operates on cache-level events and cache manager-level events.

10.2. Listener Example

The following example defines a listener in Red Hat JBoss Data Grid that prints some information each time a new entry is added to the cache:

Configuring a Listener

@Listener
public class PrintWhenAdded {
  @CacheEntryCreated
  public void print(CacheEntryCreatedEvent event) {
    System.out.println("New entry " + event.getKey() + " created in the cache");
  }
}

10.3. Listener Notifications

10.3.1. Listener Notifications

Each cache event triggers a notification that is dispatched to listeners. A listener is a simple POJO annotated with @Listener. A Listenable is an interface that denotes that the implementation can have listeners attached to it. Each listener is registered using methods defined in the Listenable.

A listener can be attached to both the cache and Cache Manager to allow them to receive cache-level or cache manager-level notifications.

10.3.2. About Cache-level Notifications

In Red Hat JBoss Data Grid, cache-level events occur on a per-cache basis. Examples of cache-level events include the addition, removal and modification of entries, which trigger notifications to listeners registered on the relevant cache.

10.3.3. Cache Manager-level Notifications

Examples of events that occur in Red Hat JBoss Data Grid at the cache manager-level are:

  • The starting and stopping of caches
  • Nodes joining or leaving a cluster;

Cache manager-level events are located globally and used cluster-wide, but are restricted to events within caches created by a single cache manager.

The first two types of events, CacheStarted and CacheStopped are highly similar, and the following example demonstrates printing out the name of the cache that has started or stopped:

@CacheStarted
public void cacheStarted(CacheStartedEvent event){
    // Print the name of the Cache that started
    log.info("Cache Started: " + event.getCacheName());
}

@CacheStopped
public void cacheStopped(CacheStoppedEvent event){
    // Print the name of the Cache that stopped
    log.info("Cache Stopped: " + event.getCacheName());
}

When receiving a ViewChangedEvent or MergeEvent note that the list of old and new members is from the node that generated the event. For instance, consider the following scenario:

  • A JDG Cluster currently consists of nodes A, B, and C.
  • Node D joins the cluster.
  • Nodes A, B, and C will receive a ViewChangedEvent with [A,B,C] as the list of old members, and [A,B,C,D] as the list of new members.
  • Node D will receive a ViewChangedEvent with [D] as the list of old members, and [A,B,C,D] as the list of new members.

Therefore, a set intersection may be used to determine if a node has recently joined or left a cluster. By using getOldMembers() in conjunction with getNewMembers(), we may determine the set of nodes that have joined or left the cluster, as seen below:

@ViewChanged
public void viewChanged(ViewChangedEvent event){
    HashSet<Address> oldMembers = new HashSet(event.getOldMembers());
    HashSet<Address> newMembers = new HashSet(event.getNewMembers());
    HashSet<Address> oldCopy = (HashSet<Address>)oldMembers.clone();

    // Remove all new nodes from the old view.
    // The resulting set indicates nodes that have left the cluster.
    oldCopy.removeAll(newMembers);
    if(oldCopy.size() > 0){
        for (Address oldAdd : oldCopy){
            log.info("Node left:" + oldAdd.toString());
        }
    }

    // Remove all old nodes from the new view.
    // The resulting set indicates nodes that have joined the cluster.
    newMembers.removeAll(oldMembers);
    if(newMembers.size() > 0){
        for(Address newAdd : newMembers){
            log.info("Node joined: " + newAdd.toString());
        }
    }
}

Similar logic may be used during a MergeEvent to determine the new set of members in the cluster.

10.3.4. About Synchronous and Asynchronous Notifications

By default, notifications in Red Hat JBoss Data Grid are dispatched in the same thread that generates the event. Therefore the listener must be written in a way that does not block or prevent the thread’s progression.

Alternatively, the listener can be annotated as asynchronous, which dispatches notifications in a separate thread and prevents blocking the operations of the original thread.

Annotate listeners using the following:

@Listener (sync = false)
public class MyAsyncListener { .... }

Use the asyncListenerExecutor element in the XML configuration file to tune the thread pool that is used to dispatch asynchronous notifications.

Important

When using a synchronous, non-clustered listener that handles the CacheEntryExpiredEvent ensure that this listener does not block execution, as the expiration reaper is also synchronous in a non-clustered environment.

10.4. Modifying Cache Entries

10.4.1. Modifying Cache Entries

After the cache entry has been created, the cache entry can be modified programmatically.

10.4.2. Cache Entry Modified Listener Configuration

In a cache entry modified listener event, The getValue() method’s behavior is specific to whether the callback is triggered before or after the actual operation has been performed. For example, if event.isPre() is true, then event.getValue() would return the old value, prior to modification. If event.isPre() is false, then event.getValue() would return new value. If the event is creating and inserting a new entry, the old value would be null. For more information about isPre(), see the Red Hat JBoss Data Grid API Documentation's listing for the org.infinispan.notifications.cachelistener.event package.

Listeners can only be configured programmatically by using the methods exposed by the Listenable and FilteringListenable interfaces (which the Cache object implements).

10.4.3. Cache Entry Modified Listener Example

The following example defines a listener in Red Hat JBoss Data Grid that prints some information each time a cache entry is modified:

Modified Listener

@Listener
public class PrintWhenModified {

  @CacheEntryModified
  public void print(CacheEntryModifiedEvent event) {
    System.out.println("Cache entry modified. Details = " + event);
  }

}

10.5. Clustered Listeners

10.5.1. Clustered Listeners

Clustered listeners allow listeners to be used in a distributed cache configuration. In a distributed cache environment, registered local listeners are only notified of events that are local to the node where the event has occurred. Clustered listeners resolve this issue by allowing a single listener to receive any write notification that occurs in the cluster, regardless of where the event occurred. As a result, clustered listeners perform slower than non-clustered listeners, which only provide event notifications for the node on which the event occurs.

When using clustered listeners, client applications are notified when an entry is added, updated, expired, or deleted in a particular cache. The event is cluster-wide so that client applications can access the event regardless of the node on which the application resides or connects with.

The event will always be triggered on the node where the listener was registered, while disregarding where the cache update originated.

10.5.2. Configuring Clustered Listeners

In the following use case, listener stores events as it receives them.

Procedure: Clustered Listener Configuration

@Listener(clustered = true)
  protected static class ClusterListener {
     List<CacheEntryEvent> events = Collections.synchronizedList(new ArrayList<CacheEntryEvent>());

     @CacheEntryCreated
     @CacheEntryModified
     @CacheEntryExpired
     @CacheEntryRemoved
     public void onCacheEvent(CacheEntryEvent event) {
        log.debugf("Adding new cluster event %s", event);
        events.add(event);
     }
  }

  public void addClusterListener(Cache<?, ?> cache) {
     ClusterListener clusterListener = new ClusterListener();
     cache.addListener(clusterListener);
  }

  1. Clustered listeners are enabled by annotating the @Listener class with clustered=true.
  2. The following methods are annotated to allow client applications to be notified when entries are added, modified, expired, or removed.

    • @CacheEntryCreated
    • @CacheEntryModified
    • @CacheEntryExpired
    • @CacheEntryRemoved
  3. The listener is registered with a cache, with the option of passing on a filter or converter.

The following limitations occur when using clustered listeners, that do not apply to non-clustered listeners:

  • A cluster listener can only listen to entries that are created, modified, expired, or removed. No other events are listened to by a clustered listener.
  • Only post events are sent to a clustered listener, pre events are ignored.

10.5.3. The Cache Listener API

Clustered listeners can be added on top of the existing @CacheListener API via the addListener method.

The Cache Listener API

cache.addListener(Object listener, Filter filter, Converter converter);

public @interface Listener {
  boolean clustered() default false;
  boolean includeCurrentState() default false;
  boolean sync() default true;
}
interface CacheEventFilter<K,V> {
  public boolean accept(K key, V oldValue, Metadata oldMetadata, V newValue, Metadata newMetadata, EventType eventType);
}
interface CacheEventConverter<K,V,C> {
  public C convert(K key, V oldValue, Metadata oldMetadata, V newValue, Metadata newMetadata, EventType eventType);
}
The Cache API

The local or clustered listener can be registered with the cache.addListener method, and is active until one of the following events occur.

  • The listener is explicitly unregistered by invoking cache.removeListener.
  • The node on which the listener was registered crashes.
Listener Annotation

The listener annotation is enhanced with three attributes:

  • clustered():This attribute defines whether the annotated listener is clustered or not. Note that clustered listeners can only be notified for @CacheEntryRemoved, @CacheEntryCreated, @CacheEntryExpired, and @CacheEntryModified events. This attribute is false by default.
  • includeCurrentState(): This attribute applies to clustered listeners only, and is false by default. When set to true, the entire existing state within the cluster is evaluated. When being registered, a listener will immediately be sent a CacheCreatedEvent for every entry in the cache.
  • Refer to About Synchronous and Asynchronous Notifications for information regarding sync().
oldValue and oldMetadata
The oldValue and oldMetadata values are extra methods on the accept method of CacheEventFilter and CacheEventConverter classes. They values are provided to any listener, including local listeners. For more information about these values, see the JBoss Data Grid API Documentation .
EventType
The EventType includes the type of event, whether it was a retry, and if it was a pre or post event.

When using clustered listeners, the order in which the cache is updated is reflected in the sequence of notifications received.

The clustered listener does not guarantee that an event is sent only once. The listener implementation must be idempotent in order to prevent situations where the same event is sent more than once. Implementors can expect singularity to be honored for stable clusters and outside of the time span in which synthetic events are generated as a result of includeCurrentState.

10.5.4. Clustered Listener Example

The following use case demonstrates a listener that wants to know when orders are generated that have a destination of New York, NY. The listener requires a Filter that filters all orders that come in and out of New York. The listener also requires a Converter as it does not require the entire order, only the date it is to be delivered.

Use Case: Filtering and Converting the New York orders

class CityStateFilter implements CacheEventFilter<String, Order> {
    private String state;
    private String city;

    public boolean accept(String orderId, Order oldOrder,
                          Metadata oldMetadata, Order newOrder,
                          Metadata newMetadata, EventType eventType) {
        switch (eventType.getType()) {
            // Only send update if the order is going to our city
            case CACHE_ENTRY_CREATED:
                return city.equals(newOrder.getCity()) &&
                        state.equals(newOrder.getState());
            // Only send update if our order has changed from our city to elsewhere or if is now going to our city
            case CACHE_ENTRY_MODIFIED:
                if (city.equals(oldOrder.getCity()) &&
                        state.equals(oldOrder.getState())) {
                    // If old city matches then we have to compare if new order is no longer going to our city
                    return !city.equals(newOrder.getCity()) ||
                            !state.equals(newOrder.getState());
                } else {
                    // If the old city doesn't match ours then only send update if new update does match ours
                    return city.equals(newOrder.getCity()) &&
                            state.equals(newOrder.getState());
                }
                // On remove we have to send update if our order was originally going to city
            case CACHE_ENTRY_REMOVED:
                return city.equals(oldOrder.getCity()) &&
                        state.equals(oldOrder.getState());
        }
        return false;
    }
}

class OrderDateConverter implements CacheEventConverter<String, Order, Date> {
    private String state;
    private String city;

    public Date convert(String orderId, Order oldValue,
                        Metadata oldMetadata, Order newValue,
                        Metadata newMetadata, EventType eventType) {
        // If remove we do not care about date - this tells listener to remove its data
        if (eventType.isRemove()) {
            return null;
        } else if (eventType.isModified()) {
            if (state.equals(newValue.getState()) &&
                    city.equals(newValue.getCity())) {
                // If it is a modification meaning the destination has changed to ours then we allow it
                return newValue.getDate();
            } else {
                // If destination is no longer our city it means it was changed from us so send null
                return null;
            }
        } else {
            // This was a create so we always send date
            return newValue.getDate();
        }
    }
}

10.5.5. Optimized Cache Filter Converter

The example provided in Clustered Listener Example could use the optimized CacheEventFilterConverter, in order to perform the filtering and converting of results into one step.

The CacheEventFilterConverter is an optimization that allows the event filter and conversion to be performed in one step. This can be used when an event filter and converter are most efficiently used as the same object, composing the filtering and conversion in the same method. This can only be used in situations where your conversion will not return a null value, as a returned value of null indicates that the value did not pass the filter. To convert a null value, use the CacheEventFilter and the CacheEventConverter interfaces independently.

The following is an example of the New York orders use case using the CacheEventFilterConverter:

CacheEventFilterConverter

class OrderDateFilterConverter extends AbstractCacheEventFilterConverter<String, Order, Date> {
    private final String state;
    private final String city;

    public Date filterAndConvert(String orderId, Order oldValue,
                                 Metadata oldMetadata, Order newValue,
                                 Metadata newMetadata, EventType eventType) {
        // Remove if the date is not required - this tells listener to remove its data
        if (eventType.isRemove()) {
            return null;
        } else if (eventType.isModified()) {
            if (state.equals(newValue.getState()) &&
                city.equals(newValue.getCity())) {
                // If it is a modification meaning the destination has changed to ours then we allow it
                return newValue.getDate();
            } else {
                // If destination is no longer our city it means it was changed from us so send null
                return null;
            }
        } else {
            // This was a create so we always send date
            return newValue.getDate();
        }
    }
}

When registering the listener, provide the FilterConverter as both arguments to the filter and converter:

OrderDateFilterConverter filterConverter = new OrderDateFilterConverter("NY", "New York");
cache.addListener(listener, filterConveter, filterConverter);

10.6. Remote Event Listeners (Hot Rod)

10.6.1. Remote Event Listeners (Hot Rod)

Event listeners allow Red Hat JBoss Data Grid Hot Rod servers to be able to notify remote clients of events such as CacheEntryCreated, CacheEntryModified, CacheEntryExpired and CacheEntryRemoved. Clients can choose whether or not to listen to these events to avoid flooding connected clients. This assumes that clients maintain persistent connections to the servers.

Client listeners for remote events can be added similarly to clustered listeners in library mode. The following example demonstrates a remote client listener that prints out each event it receives.

Event Print Listener

import org.infinispan.client.hotrod.annotation.*;
import org.infinispan.client.hotrod.event.*;

@ClientListener
public class EventLogListener {

   @ClientCacheEntryCreated
   public void handleCreatedEvent(ClientCacheEntryCreatedEvent e) {
      System.out.println(e);
   }

   @ClientCacheEntryModified
   public void handleModifiedEvent(ClientCacheEntryModifiedEvent e) {
      System.out.println(e);
   }

   @ClientCacheEntryExpired
   public void handleExpiredEvent(ClientCacheEntryExpiredEvent e) {
      System.out.println(e);
   }

   @ClientCacheEntryRemoved
   public void handleRemovedEvent(ClientCacheEntryRemovedEvent e) {
      System.out.println(e);
   }

}

  • ClientCacheEntryCreatedEvent and ClientCacheEntryModifiedEvent instances provide information on the key and version of the entry. This version can be used to invoke conditional operations on the server, such a replaceWithVersion or removeWithVersion.
  • ClientCacheEntryExpiredEvent events are sent when either a get() is called on an expired entry, or when the expiration reaper detects that an entry has expired. Once the entry has expired the cache will nullify the entry, and adjust its size appropriately; however, the event will only be generated in the two scenarios listed.
  • ClientCacheEntryRemovedEvent events are only sent when the remove operation succeeds. If a remove operation is invoked and no entry is found or there are no entries to remove, no event is generated. If users require remove events regardless of whether or not they are successful, a customized event logic can be created.
  • All client cache entry created, modified, and removed events provide a boolean isCommandRetried() method that will return true if the write command that caused it has to be retried due to a topology change. This indicates that the event has been duplicated or that another event was dropped and replaced, such as where a Modified event replaced a Created event.
Important

If the expected workload favors writes over reads it will be necessary to filter the events sent to prevent a large amount of excessive traffic being generated which may cause issues on either the client or the network. For more details on filtering events refer to .

10.6.2. Adding and Removing Event Listeners

Registering an Event Listener with the Server

The following example registers the Event Print Listener with the server. See Event Print Listener .

Adding an Event Listener

RemoteCache<Integer, String> cache = rcm.getCache();
cache.addClientListener(new EventLogListener());

Removing a Client Event Listener

A client event listener can be removed as follows

EventLogListener listener = ...
cache.removeClientListener(listener);

10.6.3. Remote Event Client Listener Example

The following procedure demonstrates the steps required to configure a remote client listener to interact with the remote cache via Hot Rod.

Configuring Remote Event Listeners

  1. Download the Red Hat JBoss Data Grid distribution from the Red Hat Customer Portal

    The latest JBoss Data Grid distribution includes the Hot Rod server with which the client will communicate.

  2. Start the server

    Start the JBoss Data Grid server by using the following command from the root of the server.

    $ ./bin/standalone.sh
  3. Write the application to interact with the Hot Rod server

    1. Maven Users

      Create an application with the following dependency and change the version to 8.5.3.Final-redhat-00002 or later:

      <properties>
        <infinispan.version>8.5.3.Final-redhat-00002</infinispan.version>
      </properties>
      [...]
      <dependency>
        <groupId>org.infinispan</groupId>
        <artifactId>infinispan-remote</artifactId>
        <version>${infinispan.version}</version>
      </dependency>
    2. Non-Maven users, adjust according to your chosen build tool or download the distribution containing all JBoss Data Grid jars.
  4. Write the client application

    The following demonstrates a simple remote event listener that logs all events received.

    import org.infinispan.client.hotrod.annotation.*;
    import org.infinispan.client.hotrod.event.*;
    
    @ClientListener
    public class EventLogListener {
    
     @ClientCacheEntryCreated
     @ClientCacheEntryModified
     @ClientCacheEntryRemoved
     public void handleRemoteEvent(ClientEvent event) {
       System.out.println(event);
      }
    
    }
  5. Use the remote event listener to execute operations against the remote cache

    The following example demonstrates a simple main java class, which adds the remote event listener and executes some operations against the remote cache.

    RemoteCacheManager rcm = new RemoteCacheManager();
    RemoteCache<Integer, String> cache = rcm.getCache();
    EventLogListener listener = new EventLogListener();
    try {
     cache.addClientListener(listener);
     cache.put(1, "one");
     cache.put(1, "new-one");
     cache.remove(1);
    } finally {
     cache.removeClientListener(listener);
    }

Result

Once executed, the console output should appear similar to the following:

ClientCacheEntryCreatedEvent(key=1,dataVersion=1)
ClientCacheEntryModifiedEvent(key=1,dataVersion=2)
ClientCacheEntryRemovedEvent(key=1)

The output indicates that by default, events come with the key and the internal data version associated with current value. The actual value is not sent back to the client for performance reasons. Receiving remote events has a performance impact, which is increased with cache size, as more operations are executed. To avoid inundating Hot Rod clients, filter remote events on the server side, or customize the event contents.

10.6.4. Filtering Remote Events

10.6.4.1. Filtering Remote Events

To prevent clients being inundated with events, Red Hat JBoss Data Grid Hot Rod remote events can be filtered by providing key/value filter factories that create instances that filter which events are sent to clients, and how these filters can act on client provided information.

Sending events to remote clients has a performance cost, which increases with the number of clients with registered remote listeners. The performance impact also increases with the number of modifications that are executed against the cache.

The performance cost can be reduced by filtering the events being sent on the server side. Custom code can be used to exclude certain events from being broadcast to the remote clients to improve performance.

Filtering can be based on either key or value information, or based on cache entry metadata. To enable filtering, a cache event filter factory that produces filter instances must be created. The following is a sample implementation that filters key “2” out of the events sent to clients.

KeyValueFilter

package sample;

import java.io.Serializable;
import org.infinispan.notifications.cachelistener.filter.*;
import org.infinispan.metadata.*;

@NamedFactory(name = "basic-filter-factory")
public class BasicKeyValueFilterFactory implements CacheEventFilterFactory {
  @Override public CacheEventFilter<Integer, String> getFilter(final Object[] params) {
    return new BasicKeyValueFilter();
  }

    static class BasicKeyValueFilter implements CacheEventFilter<Integer, String>, Serializable {
      @Override public boolean accept(Integer key, String oldValue, Metadata oldMetadata, String newValue, Metadata newMetadata, EventType eventType) {
        return !"2".equals(key);
      }
    }
}

In order to register a listener with this key value filter factory, the factory must be given a unique name, and the Hot Rod server must be plugged with the name and the cache event filter factory instance.

10.6.4.2. Custom Filters for Remote Events

Custom filters can improve performance by excluding certain event information from being broadcast to the remote clients.

To plug the JBoss Data Grid Server with a custom filter use the following procedure:

Using a Custom Filter

  1. Create a JAR file with the filter implementation within it. Each factory must have a name assigned to it via the org.infinispan.filter.NamedFactory annotation. The example uses a KeyValueFilterFactory.
  2. Create a META-INF/services/org.infinispan.notifications.cachelistener.filter. CacheEventFilterFactory file within the JAR file, and within it write the fully qualified class name of the filter class implementation.
  3. Deploy the JAR file in the JBoss Data Grid Server by performing any of the following options:

    • Option 1: Deploy the JAR through the deployment scanner

      • Copy the JAR to the $JDG_HOME/standalone/deployments/ directory. The deployment scanner actively monitors this directory and will deploy the newly placed file.
    • Option 2: Deploy the JAR through the CLI

      • Connect to the desired instance with the CLI:

        [$JDG_HOME] $ bin/cli.sh --connect=$IP:$PORT
      • Once connected execute the deploy command:

        deploy /path/to/artifact.jar
    • Option 3: Deploy the JAR as a custom module

      • Connect to the JDG server by running the below command:

        [$JDG_HOME] $ bin/cli.sh --connect=$IP:$PORT
      • The jar containing the Custom Filter must be defined as a module for the Server; to add this substitute the desired name of the module and the .jar name in the below command, adding additional dependencies as necessary for the Custom Filter:

        module add --name=$MODULE-NAME --resources=$JAR-NAME.jar --dependencies=org.infinispan
      • In a different window add the newly added module as a dependency to the org.infinispan module by editing $JDG_HOME/modules/system/layers/base/org/infinispan/main/module.xml. In this file add the following entry:

        <dependencies>
          [...]
          <module name="$MODULE-NAME">
        </dependencies>
      • Restart the JDG server.

Once the server is plugged with the filter, add a remote client listener that will use the filter. The following example extends the EventLogListener implementation provided in Remote Event Client Listener Example (See Remote Event Client Listener Example), and overrides the @ClientListener annotation to indicate the filter factory to use with the listener.

Add Filter Factory to the Listener

@org.infinispan.client.hotrod.annotation.ClientListener(filterFactoryName = "basic-filter-factory")
public class BasicFilteredEventLogListener extends EventLogListener {}

The listener can now be added via the RemoteCacheAPI. The following example demonstrates this, and executes some operations against the remote cache.

Register the Listener with the Server

import org.infinispan.client.hotrod.*;

RemoteCacheManager rcm = new RemoteCacheManager();
RemoteCache<Integer, String> cache = rcm.getCache();
BasicFilteredEventLogListener listener = new BasicFilteredEventLogListener();
try {
  cache.addClientListener(listener);
  cache.putIfAbsent(1, "one");
  cache.replace(1, "new-one");
  cache.putIfAbsent(2, "two");
  cache.replace(2, "new-two");
  cache.putIfAbsent(3, "three");
  cache.replace(3, "new-three");
  cache.remove(1);
  cache.remove(2);
  cache.remove(3);
} finally {
  cache.removeClientListener(listener);
}

The system output shows that the client receives events for all keys except those that have been filtered.

Result

The following demonstrates the resulting system output from the provided example.

ClientCacheEntryCreatedEvent(key=1,dataVersion=1)
ClientCacheEntryModifiedEvent(key=1,dataVersion=2)
ClientCacheEntryCreatedEvent(key=3,dataVersion=5)
ClientCacheEntryModifiedEvent(key=3,dataVersion=6)
ClientCacheEntryRemovedEvent(key=1)
ClientCacheEntryRemovedEvent(key=3)
Important

Filter instances must be marshallable when they are deployed in a cluster in order for filtering to occur where the event is generated, even if the event is generated in a different node to where the listener is registered. To make them marshallable, either make them extend Serializable, Externalizable, or provide a custom Externalizer.

10.6.4.3. Enhanced Filter Factories

When adding client listeners, users can provide parameters to the filter factory in order to generate different filter instances with different behaviors from a single filter factory based on client-side information.

The following configuration demonstrates how to enhance the filter factory so that it can filter dynamically based on the key provided when adding the listener, rather than filtering on a statically given key.

Configuring an Enhanced Filter Factory

package sample;

import java.io.Serializable;
import org.infinispan.notifications.cachelistener.filter.*;
import org.infinispan.metadata.*;

@NamedFactory(name = "basic-filter-factory")
public class BasicKeyValueFilterFactory implements CacheEventFilterFactory {
  @Override public CacheEventFilter<Integer, String> getFilter(final Object[] params) {
    return new BasicKeyValueFilter(params);
}

  static class BasicKeyValueFilter implements CacheEventFilter<Integer, String>, Serializable {
    private final Object[] params;
    public BasicKeyValueFilter(Object[] params) { this.params = params; }
    @Override public boolean accept(Integer key, String oldValue, Metadata oldMetadata, String newValue, Metadata newMetadata, EventType eventType) {
      return !params[0].equals(key);
    }
  }
}

The filter can now filter by “3” instead of “2”:

Running an Enhanced Filter Factory

import org.infinispan.client.hotrod.*;

RemoteCacheManager rcm = new RemoteCacheManager();
RemoteCache<Integer, String> cache = rcm.getCache();
BasicFilteredEventLogListener listener = new BasicFilteredEventLogListener();
try {
  cache.addClientListener(listener, new Object[]{3}, null); // <- Filter parameter passed
  cache.putIfAbsent(1, "one");
  cache.replace(1, "new-one");
  cache.putIfAbsent(2, "two");
  cache.replace(2, "new-two");
  cache.putIfAbsent(3, "three");
  cache.replace(3, "new-three");
  cache.remove(1);
  cache.remove(2);
  cache.remove(3);
} finally {
  cache.removeClientListener(listener);
}

Result

The provided example results in the following output:

ClientCacheEntryCreatedEvent(key=1,dataVersion=1)
ClientCacheEntryModifiedEvent(key=1,dataVersion=2)
ClientCacheEntryCreatedEvent(key=2,dataVersion=3)
ClientCacheEntryModifiedEvent(key=2,dataVersion=4)
ClientCacheEntryRemovedEvent(key=1)
ClientCacheEntryRemovedEvent(key=2)

The amount of information sent to clients can be further reduced or increased by customizing remote events.

10.6.5. Customizing Remote Events

10.6.5.1. Customizing Remote Events

In Red Hat JBoss Data Grid, Hot Rod remote events can be customized to contain the information required to be sent to a client. By default, events contain only a basic set of information, such as a key and type of event, in order to avoid overloading the client, and to reduce the cost of sending them.

The information included in these events can be customized to contain more information, such as values, or contain even less information. Customization is done via CacheEventConverter instances, which are created by implementing a CacheEventConverterFactory class. Each factory must have a name associated to it via the @NamedFactory annotation.

To plug the Red Hat JBoss Data Grid Server with an event converter use the following procedure:

Using a Converter

  1. Create a JAR file with the converter implementation within it. Each factory must have a name assigned to it via the org.infinispan.filter.NamedFactory annotation.
  2. Create a META-INF/services/org.infinispan.notifications.cachelistener.filter.CacheEventConverterFactory file within the JAR file and within it, write the fully qualified class name of the converter class implementation.
  3. Deploy the JAR file in the Red Hat JBoss Data Grid Server by performing any of the following options:

    • Option 1: Deploy the JAR through the deployment scanner

      • Copy the JAR to the $JDG_HOME/standalone/deployments/ directory. The deployment scanner actively monitors this directory and will deploy the newly placed file.
    • Option 2: Deploy the JAR through the CLI

      • Connect to the desired instance with the CLI:

        [$JDG_HOME] $ bin/cli.sh --connect=$IP:$PORT
      • Once connected execute the deploy command:

        deploy /path/to/artifact.jar
    • Option 3: Deploy the JAR as a custom module

      • Connect to the JDG server by running the below command:

        [$JDG_HOME] $ bin/cli.sh --connect=$IP:$PORT
      • The jar containing the Custom Converter must be defined as a module for the Server; to add this substitute the desired name of the module and the .jar name in the below command, adding additional dependencies as necessary for the Custom Converter:

        module add --name=$MODULE-NAME --resources=$JAR-NAME.jar --dependencies=org.infinispan
      • In a different window add the newly added module as a dependency to the org.infinispan module by editing $JDG_HOME/modules/system/layers/base/org/infinispan/main/module.xml. In this file add the following entry:

        <dependencies>
          [...]
          <module name="$MODULE-NAME">
        </dependencies>
      • Restart the JDG server.

Converters can also act on client provided information, allowing converter instances to customize events based on the information provided when the listener was added. The API allows converter parameters to be passed in when the listener is added.

10.6.5.2. Adding a Converter

When a listener is added, the name of a converter factory can be provided to use with the listener. When the listener is added, the server looks up the factory and invokes the getConverter method to get a org.infinispan.filter.Converter class instance to customize events server side.

The following example demonstrates sending custom events containing value information to remote clients for a cache of Integers and Strings. The converter generates a new custom event, which includes the value as well as the key in the event. The custom event has a bigger event payload compared with default events, however if combined with filtering, it can reduce bandwidth cost.

Sending Custom Events

import org.infinispan.notifications.cachelistener.filter.*;

@NamedFactory(name = "value-added-converter-factory")
class ValueAddedConverterFactory implements CacheEventConverterFactory {
  // The following types correspond to the Key, Value, and the returned Event, respectively.
  public CacheEventConverter<Integer, String, ValueAddedEvent> getConverter(final Object[] params) {
    return new ValueAddedConverter();
  }

  static class ValueAddedConverter implements CacheEventConverter<Integer, String, ValueAddedEvent> {
    public ValueAddedEvent convert(Integer key, String oldValue,
                                   Metadata oldMetadata, String newValue,
                                   Metadata newMetadata, EventType eventType) {
      return new ValueAddedEvent(key, newValue);
    }
  }
}

// Must be Serializable or Externalizable.
class ValueAddedEvent implements Serializable {
    final Integer key;
    final String value;
    ValueAddedEvent(Integer key, String value) {
      this.key = key;
      this.value = value;
    }
}

10.6.5.3. Lightweight Events

Other converter implementations are able to send back events that contain no key or event type information, resulting in extremely lightweight events at the expense of having rich information provided by the event.

In order to plug the server with this converter, deploy the converter factory and associated converter class within a JAR file including a service definition inside the META-INF/services/org.infinispan.notifications.cachelistener.filter.CacheEventConverterFactory file as follows:

sample.ValueAddedConverterFactor

The client listener must then be linked with the converter factory by adding the factory name to the @ClientListener annotation.

@ClientListener(converterFactoryName = "value-added-converter-factory")
public class CustomEventLogListener { ... }

10.6.5.4. Dynamic Converter Instances

Dynamic converter instances convert based on parameters provided when the listener is registered. Converters use the parameters received by the converter factories to enable this option. For example:

Dynamic Converter

import org.infinispan.notifications.cachelistener.filter.CacheEventConverterFactory;
import org.infinispan.notifications.cachelistener.filter.CacheEventConverter;

class DynamicCacheEventConverterFactory implements CacheEventConverterFactory {
   // The following types correspond to the Key, Value, and the returned Event, respectively.
   public CacheEventConverter<Integer, String, CustomEvent> getConverter(final Object[] params) {
      return new DynamicCacheEventConverter(params);
   }
}

// Serializable, Externalizable or marshallable with Infinispan Externalizers needed when running in a cluster
class DynamicCacheEventConverter implements CacheEventConverter<Integer, String, CustomEvent>, Serializable {
   final Object[] params;

   DynamicCacheEventConverter(Object[] params) {
      this.params = params;
   }

   public CustomEvent convert(Integer key, String oldValue, Metadata metadata, String newValue, Metadata prevMetadata, EventType eventType) {
      // If the key matches a key given via parameter, only send the key information
      if (params[0].equals(key))
         return new ValueAddedEvent(key, null);

      return new ValueAddedEvent(key, newValue);
   }
}

The dynamic parameters required to do the conversion are provided when the listener is registered:

RemoteCache<Integer, String> cache = rcm.getCache();
cache.addClientListener(new EventLogListener(), null, new Object[]{1});

10.6.5.5. Adding a Remote Client Listener for Custom Events

Implementing a listener for custom events is slightly different to other remote events, as they involve non-default events. The same annotations are used as in other remote client listener implementations, but the callbacks receive instances of ClientCacheEntryCustomEvent<T>, where T is the type of custom event we are sending from the server. For example:

Custom Event Listener Implementation

import org.infinispan.client.hotrod.annotation.*;
import org.infinispan.client.hotrod.event.*;

@ClientListener(converterFactoryName = "value-added-converter-factory")
public class CustomEventLogListener {

    @ClientCacheEntryCreated
    @ClientCacheEntryModified
    @ClientCacheEntryRemoved
    public void handleRemoteEvent(ClientCacheEntryCustomEvent<ValueAddedEvent> event)
    {
        System.out.println(event);
    }
}

To use the remote event listener to execute operations against the remote cache, write a simple main Java class, which adds the remote event listener and executes some operations against the remote cache. For example:

Execute Operations against the Remote Cache

import org.infinispan.client.hotrod.*;

RemoteCacheManager rcm = new RemoteCacheManager();
RemoteCache<Integer, String> cache = rcm.getCache();
CustomEventLogListener listener = new CustomEventLogListener();
try {
  cache.addClientListener(listener);
  cache.put(1, "one");
  cache.put(1, "new-one");
  cache.remove(1);
} finally {
  cache.removeClientListener(listener);
}

Result

Once executed, the console output should appear similar to the following:

ClientCacheEntryCustomEvent(eventData=ValueAddedEvent{key=1, value='one'}, eventType=CLIENT_CACHE_ENTRY_CREATED)
ClientCacheEntryCustomEvent(eventData=ValueAddedEvent{key=1, value='new-one'}, eventType=CLIENT_CACHE_ENTRY_MODIFIED)
ClientCacheEntryCustomEvent(eventData=ValueAddedEvent{key=1, value='null'}, eventType=CLIENT_CACHE_ENTRY_REMOVED
Important

Converter instances must be marshallable when they are deployed in a cluster in order for conversion to occur where the event is generated, even if the event is generated in a different node to where the listener is registered. To make them marshallable, either make them extend Serializable, Externalizable, or provide a custom Externalizer for them. Both client and server need to be aware of any custom event type and be able to marshall it in order to facilitate both server and client writing against type safe APIs. On the client side, this is done by an optional marshaller configurable via the RemoteCacheManager. On the server side, this is done by a marshaller added to the Hot Rod server configuration.

10.6.6. Event Marshalling

When filtering or customizing events, the KeyValueFilter and Converter instances must be marshallable. As the client listener is installed in a cluster, the filter and/or converter instances are sent to other nodes in the cluster in order for filtering and conversion to occur where the event originates, improving efficiency. These classes can be made marshallable by having them extend Serializable or by providing and registering a custom Externalizer.

To deploy a Marshaller instance server-side, use a similar method to that used for filtering and customized events.

Deploying a Marshaller

  1. Create a JAR file with the converter implementation within it. Each factory must have a name assigned to it via the org.infinispan.filter.NamedFactory annotation.
  2. Create a META-INF/services/org.infinispan.commons.marshall.Marshaller file within the JAR file and within it, write the fully qualified class name of the marshaller class implementation
  3. Deploy the JAR file in the Red Hat JBoss Data Grid by performing any of the following options:

    • Option 1: Deploy the JAR through the deployment scanner

      • Copy the JAR to the $JDG_HOME/standalone/deployments/ directory. The deployment scanner actively monitors this directory and will deploy the newly placed file.
    • Option 2: Deploy the JAR through the CLI

      • Connect to the desired instance with the CLI:

        [$JDG_HOME] $ bin/cli.sh --connect=$IP:$PORT
      • Once connected execute the deploy command:

        deploy /path/to/artifact.jar
    • Option 3: Deploy the JAR as a custom module

      • Connect to the JDG server by running the below command:

        [$JDG_HOME] $ bin/cli.sh --connect=$IP:$PORT
      • The jar containing the Custom Marshaller must be defined as a module for the Server; to add this substitute the desired name of the module and the .jar name in the below command, adding additional dependencies as necessary for the Custom Marshaller:

        module add --name=$MODULE-NAME --resources=$JAR-NAME.jar --dependencies=org.infinispan
      • In a different window add the newly added module as a dependency to the org.infinispan module by editing $JDG_HOME/modules/system/layers/base/org/infinispan/main/module.xml. In this file add the following entry:

        <dependencies>
          [...]
          <module name="$MODULE-NAME">
        </dependencies>
      • Restart the JDG server.

The Marshaller can be deployed either in a separate jar, or in the same jar as the CacheEventConverter, and/or CacheEventFilter instances.

Note

Only the deployment of a single Marshaller instance is supported. If multiple marshaller instances are deployed, warning messages will be displayed as a reminder indicating which marshaller instance will be used.

10.6.7. Remote Event Clustering and Failover

When a client adds a remote listener, it is installed in a single node in the cluster, which is in charge of sending events back to the client for all affected operations that occur cluster-wide.

In a clustered environment, when the node containing the listener goes down, the Hot Rod client implementation transparently fails over the client listener registration to a different node. This may result in a gap in event consumption, which can be solved using one of the following solutions.

State Delivery

The @ClientListener annotation has an optional includeCurrentState parameter, which when enabled, has the server send CacheEntryCreatedEvent event instances for all existing cache entries to the client. As this behavior is driven by the client it detects when the node where the listener is registered goes offline and automatically registers the listener on another node in the cluster. By enabling includeCurrentState clients may recompute their state or computation in the event the Hot Rod client transparently fails over registered listeners. The performance of the includeCurrentState parameter is impacted by the cache size, and therefore it is disabled by default.

@ClientCacheFailover

Rather than relying on receiving state, users can define a method with the @ClientCacheFailover annotation, which receives ClientCacheFailoverEvent parameter inside the client listener implementation. If the node where a Hot Rod client has registered a client listener fails, the Hot Rod client detects it transparently, and fails over all listeners registered in the node that failed to another node.

During this failover, the client may miss some events. To avoid this, the includeCurrentState parameter can be set to true. With this enabled a client is able to clear its data, receive all of the CacheEntryCreatedEvent instances, and cache these events with all keys. Alternatively, Hot Rod clients can be made aware of failover events by adding a callback handler. This callback method is an efficient solution to handling cluster topology changes affecting client listeners, and allows the client listener to determine how to behave on a failover. Near Caching takes this approach and clears the near cache upon receiving a ClientCacheFailoverEvent.

@ClientCacheFailover

import org.infinispan.client.hotrod.annotation.*;
import org.infinispan.client.hotrod.event.*;

@ClientListener
public class EventLogListener {
// ...

    @ClientCacheFailover
    public void handleFailover(ClientCacheFailoverEvent e) {
      // Deal with client failover, e.g. clear a near cache.
    }
}

Note

The ClientCacheFailoverEvent is only thrown when the node that has the client listener installed fails.