Continuous Query with Infinispan

Introduction

Infinispan's Continuous Query (CQ) is a building block for Complex Event Processing (CEP) systems. It is built on top of Drools' live queries, to which it adds fault tolerance: if the node where the query runs crashes, another node picks up its job. In Infinispan such a fault-tolerant query is called a clustered query.

The query is executed on the data within the cache to which the query is associated.

Status

CQ is in "incubation" (technical preview) state and is hosted on GitHub. The API is almost finalised (see next section), but it is not frozen yet, so suggestions are very welcome. It also ships with a demo application to play around with. The only supported cluster mode at the moment is replicated; distribution is to be added in a future release.

 

API

Drools queries

All queries are expressed in the Drools query language. Here is an example of such a query (from the demo application):

 

             import org.infinispan.continuousquery.demo.*
             query tradeWatchdog(String $traderName)
                ourTrader: Trader(name == $traderName)
                ourTrade : Trade(trader.name == ourTrader.name)
                stockInfo : StockInfo(ourTrade.stockInfo.company == company)
             end

In this example the query has one parameter: "$traderName". Queries need to be defined within the system before they can be executed. Executing (or instantiating) a query requires the user to supply values for its parameters.

For more information on querying syntax refer to Drools documentation.

 

Defining queries

 

ContinuousQueryManager cqManager = new ContinuousQueryManager(getCacheManager());
//definition
String query =
    "         import org.infinispan.continuousquery.demo.*\n" +
    "         query listStock(String $profile)\n" +
    "            stockInfo : StockInfo(profile == $profile)\n" +
    "         end";
QueryDefinition queryDefinition =
       new QueryDefinition("listStock", query, Collections.singletonList("stockInfo"));
cqManager.defineQuery(queryDefinition);

 

The ContinuousQueryManager wraps a CacheManager instance. It is the API entry point for defining and running queries. A QueryDefinition holds the query string together with a name and the list of output entries (in this example only one: "stockInfo").

The query needs to be added to the ContinuousQueryManager before it can be instantiated: after defineQuery returns the query can be executed on all nodes of the cluster.

 

Executing queries and listening on changes

After the query is defined it can be executed by giving values to its parameters.

 

//instantiation
ContinuousQuery cq = cqManager.
        executeContinuousQuery(queryDefinition.getQueryName(),
                               true, new Object[]{"Nick"});
//associate behaviour with result set changes
cq.addQueryListener(new ResultSetListener() {
  public void entryAdded(MatchingEntry row) {
    StockInfo stockInfo = (StockInfo) row.get("stockInfo");
    // ...
  }
  public void entryRemoved(MatchingEntry row) {
    // ...
  }
  public void entryUpdated(MatchingEntry row) {
    // ...
  }
});

The code snippet above shows how the query defined in the previous section is instantiated; as a result, a ContinuousQuery object is returned to the user. This object can then be used to associate behaviour with changes in the result set of the query. This is done by attaching result set listeners, which are notified whenever data matching the query is changed, added to or removed from the underlying cache.
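As an illustration of what a listener might do with these callbacks, here is a minimal sketch that keeps an in-memory view of the rows currently matching the query. The MatchingEntry and ResultSetListener interfaces below are simplified stand-ins written for this sketch (only the methods visible in the snippet above), not the real CQ types, and LiveView, LiveViewDemo and the idea of keying rows by one output entry are assumptions made for the example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Stand-in for the CQ type of the same name (assumption: only get(String) is modelled).
interface MatchingEntry {
    Object get(String outputName);
}

// Stand-in for the CQ listener interface, limited to the three callbacks shown above.
interface ResultSetListener {
    void entryAdded(MatchingEntry row);
    void entryRemoved(MatchingEntry row);
    void entryUpdated(MatchingEntry row);
}

// Hypothetical helper: mirrors the query's result set in a map keyed by one output entry.
// Assumes the chosen output entry has a meaningful equals/hashCode.
class LiveView implements ResultSetListener {
    private final String keyOutput;
    private final Map<Object, MatchingEntry> rows = new LinkedHashMap<>();

    LiveView(String keyOutput) {
        this.keyOutput = keyOutput;
    }

    public synchronized void entryAdded(MatchingEntry row) {
        rows.put(row.get(keyOutput), row);
    }

    public synchronized void entryRemoved(MatchingEntry row) {
        rows.remove(row.get(keyOutput));
    }

    public synchronized void entryUpdated(MatchingEntry row) {
        // Replace the stored row with the updated one.
        rows.put(row.get(keyOutput), row);
    }

    public synchronized int size() {
        return rows.size();
    }

    public synchronized boolean contains(Object key) {
        return rows.containsKey(key);
    }
}

class LiveViewDemo {
    // Builds a row exposing a single output entry named "stockInfo".
    static MatchingEntry row(Object stockInfo) {
        return name -> "stockInfo".equals(name) ? stockInfo : null;
    }

    public static void main(String[] args) {
        LiveView view = new LiveView("stockInfo");
        view.entryAdded(row("RHT"));
        view.entryAdded(row("ORCL"));
        view.entryRemoved(row("ORCL"));
        System.out.println(view.size());
    }
}
```

With the real API, such a listener would presumably be registered through cq.addQueryListener, exactly as the anonymous listener is in the snippet above.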

 

 

Replay listener

The ResultSetListener in the previous example is not aware of the data that already existed in the cache at the time it was added. To obtain all the results of a query at a given moment in time (a snapshot), one can do the following:

 

ContinuousQuery cq = cqManager.
            executeContinuousQuery(queryDefinition.getQueryName(),
                                  true, new Object[]{"Nick"});
List<MatchingEntry> result = cq.getQueryResult();
process(result);

 

The returned result set is a static snapshot: if data is removed from the cache while you iterate over the result set, the result set is not modified.

To process the existing data and also be sure not to miss any update, use a ReplayResultSetListener as below:

ContinuousQuery cq = cqManager.
     executeContinuousQuery(queryDefinition.getQueryName(), 
                            true, new Object[]{"Nick"});

/*
 * The query listener implements ReplayResultSetListener (vs. ResultSetListener
 * in the previous example).
 */
cq.addQueryListener(new ReplayResultSetListener() {
  public void replay(Iterator<MatchingEntry> existingQueryResults) {
    //here you can process data existing in cache before the query was instantiated 
  }

  public void entryAdded(MatchingEntry row) {
    StockInfo stockInfo = (StockInfo) row.get("stockInfo");
    // ...
  }

  public void entryRemoved(MatchingEntry row) {
    // ...
  }

  public void entryUpdated(MatchingEntry row) {
    // ...
  }
});

The listener mechanism keeps track of the changes that take place whilst the replay method is being processed, and calls the corresponding listener methods afterwards. E.g. if a stock info is removed whilst entryAdded is running, then entryRemoved is called after the former returns.
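
The buffering behaviour described above can be pictured with a small toy model. This is only a sketch of the idea, not how CQ implements it: ReplayListener, ReplayDispatcher and ReplayDemo are hypothetical names, rows are plain strings, and the model is single-threaded (a change arriving "during" replay is simulated from inside the replay callback).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;

// Hypothetical, simplified listener: rows are plain strings to keep the sketch short.
interface ReplayListener {
    void replay(Iterator<String> existingResults);
    void entryAdded(String row);
    void entryRemoved(String row);
}

// Toy dispatcher: changes arriving while replay() runs are queued,
// then delivered in arrival order once replay() has returned.
class ReplayDispatcher {
    private final ReplayListener listener;
    private final Queue<Runnable> pending = new ArrayDeque<>();
    private boolean replaying;

    ReplayDispatcher(ReplayListener listener) {
        this.listener = listener;
    }

    void start(List<String> snapshot) {
        replaying = true;
        try {
            // Hand the listener a copy so later changes cannot disturb the iteration.
            listener.replay(new ArrayList<>(snapshot).iterator());
        } finally {
            replaying = false;
        }
        // Flush everything that was buffered during replay.
        Runnable next;
        while ((next = pending.poll()) != null) {
            next.run();
        }
    }

    void added(String row) {
        dispatch(() -> listener.entryAdded(row));
    }

    void removed(String row) {
        dispatch(() -> listener.entryRemoved(row));
    }

    private void dispatch(Runnable event) {
        if (replaying) {
            pending.add(event);
        } else {
            event.run();
        }
    }
}

class ReplayDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        ReplayDispatcher[] holder = new ReplayDispatcher[1];
        ReplayListener listener = new ReplayListener() {
            public void replay(Iterator<String> existing) {
                while (existing.hasNext()) {
                    String row = existing.next();
                    log.add("replay " + row);
                    // Simulate a cache change arriving while replay is still in progress.
                    if (row.equals("RHT")) {
                        holder[0].removed("RHT");
                    }
                }
            }
            public void entryAdded(String row) { log.add("added " + row); }
            public void entryRemoved(String row) { log.add("removed " + row); }
        };
        holder[0] = new ReplayDispatcher(listener);
        holder[0].start(Arrays.asList("RHT", "ORCL"));
        holder[0].added("IBM"); // After replay: delivered immediately.
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this model the removal of "RHT" is reported only after both existing rows have been replayed. A real implementation would additionally need proper synchronisation, since cache changes arrive on other threads.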

 

Local vs Clustered Queries

Once a query is defined, it can be executed in two ways: local or clustered. A local query is executed on the same node where it was defined. When that node crashes or is shut down, the listeners associated with that query instance are no longer called.

A clustered query, on the other hand, is executed on one node only, for as long as there is at least one running node in the cluster. If the executing node crashes, the task automatically fails over to another running node.

Whether a query is local or clustered is controlled through the isLocal boolean flag passed to ContinuousQueryManager.executeContinuousQuery (the boolean argument in the examples above).

 

Define queries in XML

Queries can be defined in XML as well. The query XML files are stand-alone, i.e. not included in Infinispan's XML configuration files. Here is an example of a file containing a query:

<cq-definitions>
   <continuous-query name="tradeWatchdog"
        defaultResultSetListener="org.infinispan.continuousquery.demo.StockInfoResultSetListener">
      <query>
       <![CDATA[
             import org.infinispan.continuousquery.demo.*
             query tradeWatchdog(String $traderName)
                ourTrader: Trader(name == $traderName)
                ourTrade : Trade(trader.name == ourTrader.name)
                stockInfo : StockInfo(ourTrade.stockInfo.company == company)
             end
      ]]>
      </query>
      <output>
         <item>ourTrader</item>
         <item>ourTrade</item>
         <item>stockInfo</item>
      </output>
   </continuous-query>
</cq-definitions>

Such a query can then be added to a ContinuousQueryManager: ContinuousQueryManager.defineQueries(String fileName)
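
When writing such a file by hand, it can be handy to check which query names and output items it declares before passing it to the manager. The sketch below does this with the JDK's standard DOM parser only; it does not use or reproduce CQ's own loading code, and CqDefinitionsInspector is a hypothetical helper written for this example. It assumes the element and attribute names shown in the file above.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

// Hypothetical helper: lists the queries declared in a cq-definitions document.
class CqDefinitionsInspector {

    // Returns one line per <continuous-query> element: "name -> [output items]".
    static List<String> describe(String xml) {
        Document doc;
        try {
            doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
                    .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
        } catch (Exception e) {
            throw new IllegalArgumentException("Not a well-formed XML document", e);
        }
        List<String> summary = new ArrayList<>();
        NodeList queries = doc.getElementsByTagName("continuous-query");
        for (int i = 0; i < queries.getLength(); i++) {
            Element query = (Element) queries.item(i);
            List<String> outputs = new ArrayList<>();
            NodeList items = query.getElementsByTagName("item");
            for (int j = 0; j < items.getLength(); j++) {
                outputs.add(items.item(j).getTextContent().trim());
            }
            summary.add(query.getAttribute("name") + " -> " + outputs);
        }
        return summary;
    }

    public static void main(String[] args) {
        String xml = "<cq-definitions>"
                + "<continuous-query name=\"tradeWatchdog\">"
                + "<query><![CDATA[query tradeWatchdog(String $traderName) end]]></query>"
                + "<output><item>ourTrader</item><item>stockInfo</item></output>"
                + "</continuous-query>"
                + "</cq-definitions>";
        describe(xml).forEach(System.out::println);
    }
}
```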

 

 

Download and use

1. Get the code from GitHub.

 

2. Build the project. Maven 2.2+ is required.

    mvn clean install -Dmaven.test.skip.exec=true     

You can find the binaries in target/continuous-query.jar.

Feedback

 

Any suggestions or questions are very welcome! You can post them on the user forums or chat with us on IRC.