-
Language:
English
-
Language:
English
8.5. Remote Event Listeners (Hot Rod)
CacheEntryCreated
, CacheEntryModified
, CacheEntryExpired
and CacheEntryRemoved
. Clients can choose whether or not to listen to these events to avoid flooding connected clients. This assumes that clients maintain persistent connections to the servers.
Example 8.6. Event Print Listener
import org.infinispan.client.hotrod.annotation.*; import org.infinispan.client.hotrod.event.*; @ClientListener public class EventLogListener { @ClientCacheEntryCreated public void handleCreatedEvent(ClientCacheEntryCreatedEvent e) { System.out.println(e); } @ClientCacheEntryModified public void handleModifiedEvent(ClientCacheEntryModifiedEvent e) { System.out.println(e); } @ClientCacheEntryExpired public void handleExpiredEvent(ClientCacheEntryExpiredEvent e) { System.out.println(e); } @ClientCacheEntryRemoved public void handleRemovedEvent(ClientCacheEntryRemovedEvent e) { System.out.println(e); } }
ClientCacheEntryCreatedEvent
andClientCacheEntryModifiedEvent
instances provide information on the key and version of the entry. This version can be used to invoke conditional operations on the server, such areplaceWithVersion
orremoveWithVersion
.ClientCacheEntryExpiredEvent
events are sent when either aget()
is called on an expired entry, or when the expiration reaper detects that an entry has expired. Once the entry has expired the cache will nullify the entry, and adjust its size appropriately; however, the event will only be generated in the two scenarios listed.ClientCacheEntryRemovedEvent
events are only sent when the remove operation succeeds. If a remove operation is invoked and no entry is found or there are no entries to remove, no event is generated. If users require remove events regardless of whether or not they are successful, a customized event logic can be created.- All client cache entry created, modified, and removed events provide a
boolean isCommandRetried()
method that will returntrue
if the write command that caused it has to be retried due to a topology change. This indicates that the event has been duplicated or that another event was dropped and replaced, such as where a Modified event replaced a Created event.
Important
Important
8.5.1. Adding and Removing Event Listeners
The following example registers the Event Print Listener with the server. See Example 8.6, “Event Print Listener”.
Example 8.7. Adding an Event Listener
RemoteCache<Integer, String> cache = rcm.getCache(); cache.addClientListener(new EventLogListener());
A client event listener can be removed as follows
Example 8.8. Removing an Event Listener
EventLogListener listener = ... cache.removeClientListener(listener);
8.5.2. Remote Event Client Listener Example
Procedure 8.2. Configuring Remote Event Listeners
Download the Red Hat JBoss Data Grid Server distribution from the Red Hat Customer Portal
The latest Red Hat JBoss Data Grid distribution includes the Hot Rod server with which the client will communicate.Start the server
Start the JBoss Data Grid server by using the following command from the root of the server.$ ./bin/standalone.sh
Write an application to interact with the Hot Rod server
Maven users
Create an application with the following dependency, changing the version to8.3.0-Final-redhat-1
or better.<properties> <infinispan.version>8.3.0-Final-redhat-1</infinispan.version> </properties> [...] <dependency> <groupId>org.infinispan</groupId> <artifactId>infinispan-remote</artifactId> <version>${infinispan.version}</version> </dependency>
- Non-Maven users, adjust according to your chosen build tool or download the distribution containing all JBoss Data Grid jars.
Write the client application
The following demonstrates a simple remote event listener that logs all events received.import org.infinispan.client.hotrod.annotation.*; import org.infinispan.client.hotrod.event.*; @ClientListener public class EventLogListener { @ClientCacheEntryCreated @ClientCacheEntryModified @ClientCacheEntryRemoved public void handleRemoteEvent(ClientEvent event) { System.out.println(event); } }
Use the remote event listener to execute operations against the remote cache
The following example demonstrates a simple main java class, which adds the remote event listener and executes some operations against the remote cache.import org.infinispan.client.hotrod.*; RemoteCacheManager rcm = new RemoteCacheManager(); RemoteCache<Integer, String> cache = rcm.getCache(); EventLogListener listener = new EventLogListener(); try { cache.addClientListener(listener); cache.put(1, "one"); cache.put(1, "new-one"); cache.remove(1); } finally { cache.removeClientListener(listener); }
Once executed, the console output should appear similar to the following:
ClientCacheEntryCreatedEvent(key=1,dataVersion=1) ClientCacheEntryModifiedEvent(key=1,dataVersion=2) ClientCacheEntryRemovedEvent(key=1)
8.5.3. Filtering Remote Events
Example 8.9. KeyValueFilter
package sample; import java.io.Serializable; import org.infinispan.notifications.cachelistener.filter.*; import org.infinispan.metadata.*; @NamedFactory(name = "basic-filter-factory") public class BasicKeyValueFilterFactory implements CacheEventFilterFactory { @Override public CacheEventFilter<Integer, String> getFilter(final Object[] params) { return new BasicKeyValueFilter(); } static class BasicKeyValueFilter implements CacheEventFilter<Integer, String>, Serializable { @Override public boolean accept(Integer key, String oldValue, Metadata oldMetadata, String newValue, Metadata newMetadata, EventType eventType) { return !"2".equals(key); } } }
8.5.3.1. Custom Filters for Remote Events
Procedure 8.3. Using a Custom Filter
- Create a
JAR
file with the filter implementation within it. Each factory must have a name assigned to it via theorg.infinispan.filter.NamedFactory
annotation. The example uses aKeyValueFilterFactory
. - Create a
META-INF/services/org.infinispan.notifications.cachelistener.filter. CacheEventFilterFactory
file within theJAR
file, and within it write the fully qualified class name of the filter class implementation. - Deploy the
JAR
file in the JBoss Data Grid Server by performing any of the following options:Procedure 8.4. Option 1: Deploy the JAR through the deployment scanner.
- Copy the
JAR
to the$JDG_HOME/standalone/deployments/
directory. The deployment scanner actively monitors this directory and will deploy the newly placed file.
Procedure 8.5. Option 2: Deploy the JAR through the CLI
- Connect to the desired instance with the CLI:
[$JDG_HOME] $ bin/cli.sh --connect=$IP:$PORT
- Once connected execute the
deploy
command:deploy /path/to/artifact.jar
Procedure 8.6. Option 3: Deploy the JAR as a custom module
- Connect to the JDG server by running the below command:
[$JDG_HOME] $ bin/cli.sh --connect=$IP:$PORT
- The jar containing the Custom Filter must be defined as a module for the Server; to add this substitute the desired name of the module and the .jar name in the below command, adding additional dependencies as necessary for the Custom Filter:
module add --name=$MODULE-NAME --resources=$JAR-NAME.jar --dependencies=org.infinispan
- In a different window add the newly added module as a dependency to the
org.infinispan
module by editing$JDG_HOME/modules/system/layers/base/org/infinispan/main/module.xml
. In this file add the following entry:<dependencies> [...] <module name="$MODULE-NAME"> </dependencies>
- Restart the JDG server.
@ClientListener
annotation to indicate the filter factory to use with the listener.
Example 8.10. Add Filter Factory to the Listener
@org.infinispan.client.hotrod.annotation.ClientListener(filterFactoryName = "basic-filter-factory") public class BasicFilteredEventLogListener extends EventLogListener {}
Example 8.11. Register the Listener with the Server
import org.infinispan.client.hotrod.*; RemoteCacheManager rcm = new RemoteCacheManager(); RemoteCache<Integer, String> cache = rcm.getCache(); BasicFilteredEventLogListener listener = new BasicFilteredEventLogListener(); try { cache.addClientListener(listener); cache.putIfAbsent(1, "one"); cache.replace(1, "new-one"); cache.putIfAbsent(2, "two"); cache.replace(2, "new-two"); cache.putIfAbsent(3, "three"); cache.replace(3, "new-three"); cache.remove(1); cache.remove(2); cache.remove(3); } finally { cache.removeClientListener(listener); }
The following demonstrates the resulting system output from the provided example.
ClientCacheEntryCreatedEvent(key=1,dataVersion=1) ClientCacheEntryModifiedEvent(key=1,dataVersion=2) ClientCacheEntryCreatedEvent(key=3,dataVersion=5) ClientCacheEntryModifiedEvent(key=3,dataVersion=6) ClientCacheEntryRemovedEvent(key=1) ClientCacheEntryRemovedEvent(key=3)
Important
8.5.3.2. Enhanced Filter Factories
Example 8.12. Configuring an Enhanced Filter Factory
package sample; import java.io.Serializable; import org.infinispan.notifications.cachelistener.filter.*; import org.infinispan.metadata.*; @NamedFactory(name = "basic-filter-factory") public class BasicKeyValueFilterFactory implements CacheEventFilterFactory { @Override public CacheEventFilter<Integer, String> getFilter(final Object[] params) { return new BasicKeyValueFilter(params); } static class BasicKeyValueFilter implements CacheEventFilter<Integer, String>, Serializable { private final Object[] params; public BasicKeyValueFilter(Object[] params) { this.params = params; } @Override public boolean accept(Integer key, String oldValue, Metadata oldMetadata, String newValue, Metadata newMetadata, EventType eventType) { return !params[0].equals(key); } } }
Example 8.13. Running an Enhanced Filter Factory
import org.infinispan.client.hotrod.*; RemoteCacheManager rcm = new RemoteCacheManager(); RemoteCache<Integer, String> cache = rcm.getCache(); BasicFilteredEventLogListener listener = new BasicFilteredEventLogListener(); try { cache.addClientListener(listener, new Object[]{3}, null); // <- Filter parameter passed cache.putIfAbsent(1, "one"); cache.replace(1, "new-one"); cache.putIfAbsent(2, "two"); cache.replace(2, "new-two"); cache.putIfAbsent(3, "three"); cache.replace(3, "new-three"); cache.remove(1); cache.remove(2); cache.remove(3); } finally { cache.removeClientListener(listener); }
The provided example results in the following output:
ClientCacheEntryCreatedEvent(key=1,dataVersion=1) ClientCacheEntryModifiedEvent(key=1,dataVersion=2) ClientCacheEntryCreatedEvent(key=2,dataVersion=3) ClientCacheEntryModifiedEvent(key=2,dataVersion=4) ClientCacheEntryRemovedEvent(key=1) ClientCacheEntryRemovedEvent(key=2)
8.5.4. Customizing Remote Events
CacheEventConverter
instances, which are created by implementing a CacheEventConverterFactory
class. Each factory must have a name associated to it via the @NamedFactory
annotation.
Procedure 8.7. Using a Converter
- Create a
JAR
file with the converter implementation within it. Each factory must have a name assigned to it via theorg.infinispan.filter.NamedFactory
annotation. - Create a
META-INF/services/org.infinispan.notifications.cachelistener.filter.CacheEventConverterFactory
file within theJAR
file and within it, write the fully qualified class name of the converter class implementation. - Deploy the
JAR
file in the JBoss Data Grid Server by performing any of the following options:Procedure 8.8. Option 1: Deploy the JAR through the deployment scanner.
- Copy the
JAR
to the$JDG_HOME/standalone/deployments/
directory. The deployment scanner actively monitors this directory and will deploy the newly placed file.
Procedure 8.9. Option 2: Deploy the JAR through the CLI
- Connect to the desired instance with the CLI:
[$JDG_HOME] $ bin/cli.sh --connect=$IP:$PORT
- Once connected execute the
deploy
command:deploy /path/to/artifact.jar
Procedure 8.10. Option 3: Deploy the JAR as a custom module
- Connect to the JDG server by running the below command:
[$JDG_HOME] $ bin/cli.sh --connect=$IP:$PORT
- The jar containing the Custom Converter must be defined as a module for the Server; to add this substitute the desired name of the module and the .jar name in the below command, adding additional dependencies as necessary for the Custom Converter:
module add --name=$MODULE-NAME --resources=$JAR-NAME.jar --dependencies=org.infinispan
- In a different window add the newly added module as a dependency to the
org.infinispan
module by editing$JDG_HOME/modules/system/layers/base/org/infinispan/main/module.xml
. In this file add the following entry:<dependencies> [...] <module name="$MODULE-NAME"> </dependencies>
- Restart the JDG server.
8.5.4.1. Adding a Converter
getConverter
method to get a org.infinispan.filter.Converter
class instance to customize events server side.
Example 8.14. Sending Custom Events
import org.infinispan.notifications.cachelistener.filter.*; @NamedFactory(name = "value-added-converter-factory") class ValueAddedConverterFactory implements CacheEventConverterFactory { // The following types correspond to the Key, Value, and the returned Event, respectively. public CacheEventConverter<Integer, String, ValueAddedEvent> getConverter(final Object[] params) { return new ValueAddedConverter(); } static class ValueAddedConverter implements CacheEventConverter<Integer, String, ValueAddedEvent> { public ValueAddedEvent convert(Integer key, String oldValue, Metadata oldMetadata, String newValue, Metadata newMetadata, EventType eventType) { return new ValueAddedEvent(key, newValue); } } } // Must be Serializable or Externalizable. class ValueAddedEvent implements Serializable { final Integer key; final String value; ValueAddedEvent(Integer key, String value) { this.key = key; this.value = value; } }
8.5.4.2. Lightweight Events
JAR
file including a service definition inside the META-INF/services/org.infinispan.notifications.cachelistener.filter.CacheEventConverterFactory
file as follows:
sample.ValueAddedConverterFactor
@ClientListener
annotation.
@ClientListener(converterFactoryName = "value-added-converter-factory") public class CustomEventLogListener { ... }
8.5.4.3. Dynamic Converter Instances
Example 8.15. Dynamic Converter
import org.infinispan.notifications.cachelistener.filter.CacheEventConverterFactory; import org.infinispan.notifications.cachelistener.filter.CacheEventConverter; class DynamicCacheEventConverterFactory implements CacheEventConverterFactory { // The following types correspond to the Key, Value, and the returned Event, respectively. public CacheEventConverter<Integer, String, CustomEvent> getConverter(final Object[] params) { return new DynamicCacheEventConverter(params); } } // Serializable, Externalizable or marshallable with Infinispan Externalizers needed when running in a cluster class DynamicCacheEventConverter implements CacheEventConverter<Integer, String, CustomEvent>, Serializable { final Object[] params; DynamicCacheEventConverter(Object[] params) { this.params = params; } public CustomEvent convert(Integer key, String oldValue, Metadata metadata, String newValue, Metadata prevMetadata, EventType eventType) { // If the key matches a key given via parameter, only send the key information if (params[0].equals(key)) return new ValueAddedEvent(key, null); return new ValueAddedEvent(key, newValue); } }
RemoteCache<Integer, String> cache = rcm.getCache(); cache.addClientListener(new EventLogListener(), null, new Object[]{1});
8.5.4.4. Adding a Remote Client Listener for Custom Events
ClientCacheEntryCustomEvent<T>
, where T
is the type of custom event we are sending from the server. For example:
Example 8.16. Custom Event Listener Implementation
import org.infinispan.client.hotrod.annotation.*; import org.infinispan.client.hotrod.event.*; @ClientListener(converterFactoryName = "value-added-converter-factory") public class CustomEventLogListener { @ClientCacheEntryCreated @ClientCacheEntryModified @ClientCacheEntryRemoved public void handleRemoteEvent(ClientCacheEntryCustomEvent<ValueAddedEvent> event) { System.out.println(event); } }
Example 8.17. Execute Operations against the Remote Cache
import org.infinispan.client.hotrod.*; RemoteCacheManager rcm = new RemoteCacheManager(); RemoteCache<Integer, String> cache = rcm.getCache(); CustomEventLogListener listener = new CustomEventLogListener(); try { cache.addClientListener(listener); cache.put(1, "one"); cache.put(1, "new-one"); cache.remove(1); } finally { cache.removeClientListener(listener); }
Once executed, the console output should appear similar to the following:
ClientCacheEntryCustomEvent(eventData=ValueAddedEvent{key=1, value='one'}, eventType=CLIENT_CACHE_ENTRY_CREATED) ClientCacheEntryCustomEvent(eventData=ValueAddedEvent{key=1, value='new-one'}, eventType=CLIENT_CACHE_ENTRY_MODIFIED) ClientCacheEntryCustomEvent(eventData=ValueAddedEvent{key=1, value='null'}, eventType=CLIENT_CACHE_ENTRY_REMOVED
Important
8.5.5. Event Marshalling
Procedure 8.11. Deploying a Marshaller
- Create a
JAR
file with the converter implementation within it. Each factory must have a name assigned to it via theorg.infinispan.filter.NamedFactory
annotation. - Create a
META-INF/services/org.infinispan.commons.marshall.Marshaller
file within theJAR
file and within it, write the fully qualified class name of the marshaller class implementation - Deploy the
JAR
file in the JBoss Data Grid Server by performing any of the following options:Procedure 8.12. Option 1: Deploy the JAR through the deployment scanner.
- Copy the
JAR
to the$JDG_HOME/standalone/deployments/
directory. The deployment scanner actively monitors this directory and will deploy the newly placed file.
Procedure 8.13. Option 2: Deploy the JAR through the CLI
- Connect to the desired instance with the CLI:
[$JDG_HOME] $ bin/cli.sh --connect=$IP:$PORT
- Once connected execute the
deploy
command:deploy /path/to/artifact.jar
Procedure 8.14. Option 3: Deploy the JAR as a custom module
- Connect to the JDG server by running the below command:
[$JDG_HOME] $ bin/cli.sh --connect=$IP:$PORT
- The jar containing the Custom Marshaller must be defined as a module for the Server; to add this substitute the desired name of the module and the .jar name in the below command, adding additional dependencies as necessary for the Custom Marshaller:
module add --name=$MODULE-NAME --resources=$JAR-NAME.jar --dependencies=org.infinispan
- In a different window add the newly added module as a dependency to the
org.infinispan
module by editing$JDG_HOME/modules/system/layers/base/org/infinispan/main/module.xml
. In this file add the following entry:<dependencies> [...] <module name="$MODULE-NAME"> </dependencies>
- Restart the JDG server.
Note
8.5.6. Remote Event Clustering and Failover
The @ClientListener
annotation has an optional includeCurrentState
parameter, which when enabled, has the server send CacheEntryCreatedEvent
event instances for all existing cache entries to the client. As this behavior is driven by the client it detects when the node where the listener is registered goes offline and automatically registers the listener on another node in the cluster. By enabling includeCurrentState
clients may recompute their state or computation in the event the Hot Rod client transparently fails over registered listeners. The performance of the includeCurrentState
parameter is impacted by the cache size, and therefore it is disabled by default.
Rather than relying on receiving state, users can define a method with the @ClientCacheFailover
annotation, which receives ClientCacheFailoverEvent
parameter inside the client listener implementation. If the node where a Hot Rod client has registered a client listener fails, the Hot Rod client detects it transparently, and fails over all listeners registered in the node that failed to another node.
includeCurrentState
parameter can be set to true. With this enabled a client is able to clear its data, receive all of the CacheEntryCreatedEvent
instances, and cache these events with all keys. Alternatively, Hot Rod clients can be made aware of failover events by adding a callback handler. This callback method is an efficient solution to handling cluster topology changes affecting client listeners, and allows the client listener to determine how to behave on a failover. Near Caching takes this approach and clears the near cache upon receiving a ClientCacheFailoverEvent
.
Example 8.18. @ClientCacheFailover
import org.infinispan.client.hotrod.annotation.*; import org.infinispan.client.hotrod.event.*; @ClientListener public class EventLogListener { // ... @ClientCacheFailover public void handleFailover(ClientCacheFailoverEvent e) { // Deal with client failover, e.g. clear a near cache. } }
Note
ClientCacheFailoverEvent
is only thrown when the node that has the client listener installed fails.