3 Replies Latest reply on Nov 4, 2014 1:03 PM by yanduc

    How to properly use Infinispan's distributed execution support?

    yanduc

      Hi all,

       

      Given the docs and what I could find online, I could not come up with a better solution than the one below, which does not work...

       

      Here's what I need to do: I want to distribute tasks over a data grid (composed of Infinispan distributed cache nodes) and have these tasks executed "local" to the cached data at each node in the grid. I do not want to do map-reduce. I simply want to do simple aggregation: each distributed task instance computes prime numbers (out of the numbers kept in the cache) and returns that set of prime numbers to the caller, which aggregates all these sets into a single final set.

       

      In order to perform that little experiment, I've implemented a DistributedCallable class, which I dispatch using the DistributedExecutorService. Note in the code below that upon execution, the task acquires the values from the cache (expecting them to correspond to the values at that node, as mentioned in the javadoc).

       

      public class ComputePrimeTask implements DistributedCallable<String, Integer, Set<Integer>>, Serializable {

         

          static final long serialVersionUID = 1L;

         

          private transient Cache<String, Integer> cache;

          private transient Set<String>              keys;

         

          public void setEnvironment(Cache<String, Integer> cache, Set<String> keys) {

              this.cache = cache;

              this.keys  = keys;

          }

         

          public Set<Integer> call() throws Exception {

              System.out.println("Executing ComputePrimeTask - number of elements in cache: " + cache.size());

              Set<Integer> toReturn = new HashSet<Integer>();

              for (Integer v : cache.values()) {

                  if (isPrime(v)) {

                      toReturn.add(v);

                  }

              }

              System.out.println(String.format("Returning %s prime numbers", toReturn.size()));

              return toReturn;

          }

         

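          private boolean isPrime(int n) {

              // 0, 1 and negative numbers are not prime

              if (n < 2) {

                  return false;

              }

              // 2 is the only even prime

              if (n%2 == 0) {

                  return n == 2;

              }

              // i <= n / i avoids the int overflow that i * i <= n hits for large n

              for(int i = 3; i <= n / i; i += 2) {

                  if (n%i == 0) {

                      return false;

                  }

              }

              return true;

          }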

       

      }
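      The caller-side aggregation described above (one task per node, partial sets merged into one final set) can be sketched without a grid at all. The block below is only a stand-in under stated assumptions: a plain java.util.concurrent thread pool plays the role of the DistributedExecutorService, each list plays the role of one node's local values, and the names PrimeAggregationSketch, primesOf and aggregate are made up for illustration. The value 5 appears in both partitions on purpose, to mimic an entry held by two owners; merging into a Set removes the duplicate.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in: plain threads play the role of grid nodes.
class PrimeAggregationSketch {

    // Task that scans one node's "local" values and keeps the primes.
    static Callable<Set<Integer>> primesOf(final List<Integer> localValues) {
        return new Callable<Set<Integer>>() {
            public Set<Integer> call() {
                Set<Integer> primes = new TreeSet<Integer>();
                for (int v : localValues) {
                    if (isPrime(v)) {
                        primes.add(v);
                    }
                }
                return primes;
            }
        };
    }

    static boolean isPrime(int n) {
        if (n < 2) return false;          // 0, 1 and negatives are not prime
        if (n % 2 == 0) return n == 2;    // 2 is the only even prime
        for (int i = 3; i <= n / i; i += 2) {
            if (n % i == 0) return false;
        }
        return true;
    }

    // Submits one task per partition and merges the partial sets on the caller side.
    static Set<Integer> aggregate(List<List<Integer>> partitions) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, partitions.size()));
        try {
            List<Future<Set<Integer>>> futures = new ArrayList<Future<Set<Integer>>>();
            for (List<Integer> partition : partitions) {
                futures.add(pool.submit(primesOf(partition)));
            }
            Set<Integer> all = new TreeSet<Integer>();
            for (Future<Set<Integer>> f : futures) {
                all.addAll(f.get()); // blocks until that partition's result is available
            }
            return all;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        List<List<Integer>> partitions = new ArrayList<List<Integer>>();
        partitions.add(Arrays.asList(1, 2, 3, 4, 5));
        partitions.add(Arrays.asList(5, 9, 11, 15));
        System.out.println(aggregate(partitions)); // prints [2, 3, 5, 11]
    }
}
```

      With a real grid, the futures would come from the distributed executor instead of a local pool; the merge loop on the caller side would presumably look the same.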

       

      I'm getting the following exception, which hints at a marshalling error when calling cache.values():

       

      java.lang.ClassCastException: [B cannot be cast to java.lang.Integer

          at com.experiment.grid.infinispan.ComputePrimeTask.call(IsComputePrimeTask.java:27)

          at com.experiment.grid.infinispan.ComputePrimeTask.call(IsComputePrimeTask.java:1)

          at org.infinispan.commands.read.DistributedExecuteCommand.perform(DistributedExecuteCommand.java:99)

          at org.infinispan.remoting.InboundInvocationHandlerImpl.handleInternal(InboundInvocationHandlerImpl.java:95)

          at org.infinispan.remoting.InboundInvocationHandlerImpl.access$000(InboundInvocationHandlerImpl.java:50)

          at org.infinispan.remoting.InboundInvocationHandlerImpl$2.run(InboundInvocationHandlerImpl.java:172)

          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

          at java.lang.Thread.run(Thread.java:744)
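      A note on reading this trace: "[B" is the JVM's internal name for byte[], so the exception says that the iteration over cache.values() received a raw byte array where the generic type promised an Integer. The sketch below reproduces that kind of failure with plain collections only. It makes no claim about why the cache would hand back bytes; the class RawBytesCastSketch and its methods are hypothetical and exist only to show where the implicit cast happens.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

class RawBytesCastSketch {

    // Simulates a store whose declared value type is Integer but which
    // actually holds serialized bytes (a made-up situation for illustration).
    @SuppressWarnings({"unchecked", "rawtypes"})
    static Collection<Integer> valuesHoldingBytes() {
        List raw = new ArrayList();
        raw.add(new byte[] {0, 0, 0, 7});
        return (Collection<Integer>) raw; // unchecked: generics are erased at runtime
    }

    // Iterates with the declared type and returns the resulting exception message.
    static String iterate(Collection<Integer> values) {
        try {
            for (Integer v : values) { // the compiler-inserted cast to Integer fails here
                System.out.println(v);
            }
            return "no exception";
        } catch (ClassCastException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        // The exact wording of the message varies between JVM versions.
        System.out.println(iterate(valuesHoldingBytes()));
    }
}
```

      The failure surfaces in the for loop rather than where the bytes were stored, which matches the trace pointing at the line in call() that iterates over the values.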

       

      I'm really not sure, with the information out there, how I should proceed:

       

      - Given the stacktrace I'm seeing, I don't think I'm handling my use-case properly.

      - Also, all the methods giving access to the "local" content of the cache (such as values(), keySet(), entrySet(), etc.) have the following note in their javadoc - Cache (Infinispan Distribution 6.0.2.Final API): "This method should only be used for testing or debugging purposes such as to verify that the cache contains all the values entered. Any other use involving execution of this method on a production system is not recommended." At the same time, I see no other way of accessing cache content to get the "local" data...

      - In another post that I've seen but can't find a reference to anymore, it was mentioned that I could iterate through the keys using keySet(), and then use the CacheManager to determine whether the node actually owns a given key prior to using it "locally". I was a bit worried by this, since it would imply the cache internally pulling keys from the other nodes in order to determine whether it owns given keys... Given my troubles, I preferred to overlook that last post, since it did not make sense to me anyway.
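      On the worry in the last point: in hash-partitioned grids generally, ownership is a function of the key and the current member list, so a node can answer "do I own this key?" without asking anyone. The toy sketch below illustrates only that general idea. It is an assumption that Infinispan behaves along these lines, and the placement rule here (hash modulo cluster size, backups on the next members) is deliberately simplistic and is NOT Infinispan's consistent hash. OwnershipSketch, ownersOf and isLocal are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class OwnershipSketch {

    // Toy placement rule (not Infinispan's consistent hash): the primary owner is
    // hash(key) mod clusterSize, and backups are the next members in list order.
    static List<String> ownersOf(String key, List<String> members, int numOwners) {
        int start = (key.hashCode() & Integer.MAX_VALUE) % members.size();
        List<String> owners = new ArrayList<String>();
        for (int i = 0; i < Math.min(numOwners, members.size()); i++) {
            owners.add(members.get((start + i) % members.size()));
        }
        return owners;
    }

    // Purely local computation: only the key and the member list are needed.
    static boolean isLocal(String key, String self, List<String> members, int numOwners) {
        return ownersOf(key, members, numOwners).contains(self);
    }

    public static void main(String[] args) {
        List<String> members = Arrays.asList("nodeA", "nodeB", "nodeC");
        for (String key : Arrays.asList("k1", "k2", "k3")) {
            System.out.println(key + " -> " + ownersOf(key, members, 2)
                    + ", local to nodeA: " + isLocal(key, "nodeA", members, 2));
        }
    }
}
```

      With numOwners set to 2 and three members, every key is local to exactly two nodes, which is also why a caller that merges per-node results should expect duplicates.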

       

      I'd like to be provided with the right approach, if such an approach exists for my use-case (which is pretty standard and common, for such a piece of infra, and convinces me that it can be done).

       

      Thanks!

        • 1. Re: How to properly use Infinispan's distributed execution support?
          rvansa

          Please share your configuration so that we can check whether you use something like storeAsBinary or compatibility mode.

           

          The use of the values(), keySet() and entrySet() methods is somewhat controversial, as these methods don't scale. They are in the API only because Cache implements java.util.Map, and in order to perform well they return only the local cache contents, although that does not properly fulfill Map's contract. Recently, size() was changed to return the real cache size, and it's possible that in future major versions those methods will be implemented 'properly', throw exceptions, or be removed completely. I would discourage using them.

           

          If you want to iterate over local data, you can dig into cache.getAdvancedCache().getDataContainer(). There you can be sure that the entries are stored locally. However, if the topology changes, some keys can be moved away in the middle of an iteration (this may relate to your third question), or you can miss new keys, etc. The iteration simply does not scale. There are at least some consistency guarantees in Map/Reduce or the entry retriever, but with access as low-level as the data container you have no such guarantees, unless you develop a more complex system that reacts to topology changes.

          • 2. Re: How to properly use Infinispan's distributed execution support?
            nadirx

            Actually, in Infinispan 7.0 entrySet(), values() and keySet() are implemented using distributed iterators and should behave much better. Expect a blog post explaining this in detail in the coming days.

            • 3. Re: How to properly use Infinispan's distributed execution support?
              yanduc

              Hi Radim,

               

              I tried accessing the entries through DataContainer: same problem.

               

               

              Here's the programmatic config:

               

                             EmbeddedCacheManager cm = new DefaultCacheManager("etc/experiment.infinispan.xml");
                             Configuration c = new ConfigurationBuilder()
                                       .storeAsBinary().enabled(false)
                                       .expiration().lifespan(-1L).maxIdle(-1L).wakeUpInterval(-1L)
                                       .clustering()
                                            .cacheMode(CacheMode.DIST_ASYNC).l1()
                                            .hash().numOwners(2)
                                       .build();
                             cm.defineConfiguration("integers", c);
              

               

              And here's the content of the base xml config file:

                                                                 
              

              <infinispan xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

                  xsi:schemaLocation="urn:infinispan:config:6.0 http://www.infinispan.org/schemas/infinispan-config-6.0.xsd"

                  xmlns="urn:infinispan:config:6.0">

               

                  <global>

                      <transport clusterName="demoCluster" />

                      <globalJmxStatistics enabled="true" />

                  </global>

                  <default>

                      <jmxStatistics enabled="true" />

                  </default>

               

              </infinispan>