Chapter 5. Creating continuous queries

Applications can register listeners to receive continual updates about cache entries that match query filters.

5.1. Continuous queries

Continuous queries provide applications with real-time notifications about data in Data Grid caches that are filtered by queries. When entries match the query Data Grid sends the updated data to any listeners, which provides a stream of events instead of applications having to execute the query.

Continuous queries can notify applications about incoming matches, for values that have joined the set; updated matches, for matching values that were modified and continue to match; and outgoing matches, for values that have left the set.

For example, continuous queries can notify applications about all:

  • Persons with an age between 18 and 25, assuming the Person entity has an age property and is updated by the user application.
  • Transactions for dollar amounts larger than $2000.
  • Times where the lap speed of F1 racers were less than 1:45.00 seconds, assuming the cache contains Lap entries and that laps are entered during the race.
Note

Continuous queries can use all query capabilities except for grouping, aggregation, and sorting operations.

How continuous queries work

Continuous queries notify client listeners with the following events:

Join
A cache entry matches the query.
Update
A cache entry that matches the query is updated and still matches the query.
Leave
A cache entry no longer matches the query.

When a client registers a continuous query listener it immediately receives Join events for any entries that match the query. Client listeners receive subsequent events each time a cache operation modifies entries that match the query.

Data Grid determines when to send Join, Update, or Leave events to client listeners as follows:

  • If the query on both the old and new value does not match, Data Grid does not sent an event.
  • If the query on the old value does not match but the new value does, Data Grid sends a Join event.
  • If the query on both the old and new values match, Data Grid sends an Update event.
  • If the query on the old value matches but the new value does not, Data Grid sends a Leave event.
  • If the query on the old value matches and the entry is then deleted or it expires, Data Grid sends a Leave event.

5.1.1. Continuous queries and Data Grid performance

Continuous queries provide a constant stream of updates to applications, which can generate a significant number of events. Data Grid temporarily allocates memory for each event it generates, which can result in memory pressure and potentially lead to OutOfMemoryError exceptions, especially for remote caches. For this reason, you should carefully design your continuous queries to avoid any performance impact.

Data Grid strongly recommends that you limit the scope of your continuous queries to the smallest amount of information that you need. To achieve this, you can use projections and predicates. For example, the following statement provides results about only a subset of fields that match the criteria rather than the entire entry:

SELECT field1, field2 FROM Entity WHERE x AND y

It is also important to ensure that each ContinuousQueryListener you create can quickly process all received events without blocking threads. To achieve this, you should avoid any cache operations that generate events unnecessarily.

5.2. Creating continuous queries

You can create continuous queries for remote and embedded caches.

Procedure

  1. Create a Query object.
  2. Obtain the ContinuousQuery object of your cache by calling the appropriate method:

    • Remote caches: org.infinispan.client.hotrod.Search.getContinuousQuery(RemoteCache<K, V> cache)
    • Embedded caches: org.infinispan.query.Search.getContinuousQuery(Cache<K, V> cache)
  3. Register the query and a ContinuousQueryListener object as follows:

    continuousQuery.addContinuousQueryListener(query, listener);
  4. When you no longer need the continuous query, remove the listener as follows:

    continuousQuery.removeContinuousQueryListener(listener);

Continuous query example

The following code example demonstrates a simple continuous query with an embedded cache.

In this example, the listener receives notifications when any Person instances under the age of 21 are added to the cache. Those Person instances are also added to the "matches" map. When the entries are removed from the cache or their age becomes greater than or equal to 21, they are removed from "matches" map. ⁠

Registering a Continuous Query

import org.infinispan.query.api.continuous.ContinuousQuery;
import org.infinispan.query.api.continuous.ContinuousQueryListener;
import org.infinispan.query.Search;
import org.infinispan.query.dsl.QueryFactory;
import org.infinispan.query.dsl.Query;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

[...]

// We have a cache of Person objects.
Cache<Integer, Person> cache = ...

// Create a ContinuousQuery instance on the cache.
ContinuousQuery<Integer, Person> continuousQuery = Search.getContinuousQuery(cache);

// Define a query.
// In this example, we search for Person instances under 21 years of age.
QueryFactory queryFactory = Search.getQueryFactory(cache);
Query query = queryFactory.create("FROM Person p WHERE p.age < 21");

final Map<Integer, Person> matches = new ConcurrentHashMap<Integer, Person>();

// Define the ContinuousQueryListener.
ContinuousQueryListener<Integer, Person> listener = new ContinuousQueryListener<Integer, Person>() {
    @Override
    public void resultJoining(Integer key, Person value) {
        matches.put(key, value);
    }

    @Override
    public void resultUpdated(Integer key, Person value) {
        // We do not process this event.
    }

    @Override
    public void resultLeaving(Integer key) {
        matches.remove(key);
    }
};

// Add the listener and the query.
continuousQuery.addContinuousQueryListener(query, listener);

[...]

// Remove the listener to stop receiving notifications.
continuousQuery.removeContinuousQueryListener(listener);