Infinispan 6.0 - Distributed execution enhancements

Pluggable task failover policy

 

One of the long-standing requirements for distributed executors and map/reduce is a pluggable task failover policy. A task can fail either because the task itself throws an exception or because the Infinispan node executing it fails (the node crashed or left the cluster). A pluggable failover policy should address both types of failure and allow users to migrate the failed task to another node if required. The failover policy is set per instance of DistributedExecutorService and MapReduceTask. The following interface is the current working proposal, along with a proposal for DistributedExecutionException.

 

 

import java.util.List;

import org.infinispan.remoting.transport.Address;

public interface DistributedTaskFailoverPolicy<T> {

   /**
    * Allows custom failure handling of a remotely executed distributed task.
    *
    * <p>
    * A task can fail because the task itself throws an exception or because of an Infinispan
    * system failure (e.g. a node failed or left the cluster during task execution). Either way,
    * DistributedExecutionException captures the failure context: the relevant task and the
    * exception. The failure exception and the failed task, along with the given list of available
    * execution candidates, should be used for a possible failover and execution of the failed
    * task on another Infinispan node.
    *
    * @param executionTargets
    *           a list of nodes available for execution of the distributed task
    * @param cause
    *           the exception capturing details of the failed task execution
    * @return the result of the re-attempted execution
    */
   T executionFailed(List<Address> executionTargets, DistributedExecutionException cause);
}

import org.infinispan.remoting.transport.Address;

public class DistributedExecutionException extends Exception {

   private static final long serialVersionUID = 1L;

   // Node on which the task was executing when it failed
   private Address addressOfExecution;
   // The distributed task that failed
   private DistributedRunnableFuture<?> task;

   public DistributedExecutionException() {
   }

   public DistributedExecutionException(String message, Throwable cause, Address addressOfExecution,
            DistributedRunnableFuture<?> task) {
      super(message, cause);
      this.addressOfExecution = addressOfExecution;
      this.task = task;
   }

   public Address getExecutionLocation() {
      return addressOfExecution;
   }

   public DistributedRunnableFuture<?> getDistributedTask() {
      return task;
   }
}
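
To make the intent of the proposal concrete, the following is a minimal sketch of one possible failover policy: it re-runs the failed task on the first candidate node that is not the node where the failure occurred. It is an illustration under stated assumptions, not part of the proposal. Address and DistributedRunnableFuture are replaced by small stand-in types so the snippet compiles on its own, and RetryOnOtherNodePolicy, FailoverSketch and the resubmit hook are hypothetical names; the proposal does not define how a failed task is actually re-executed.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

// Stand-in for org.infinispan.remoting.transport.Address so the sketch compiles alone.
final class Address {
   private final String name;
   Address(String name) { this.name = name; }
   @Override public boolean equals(Object o) {
      return o instanceof Address && ((Address) o).name.equals(name);
   }
   @Override public int hashCode() { return name.hashCode(); }
   @Override public String toString() { return name; }
}

// Stand-in for the proposed DistributedRunnableFuture; only a marker type here (assumption).
interface DistributedRunnableFuture<V> { }

// Trimmed copy of the proposed exception.
class DistributedExecutionException extends Exception {
   private final Address addressOfExecution;
   private final DistributedRunnableFuture<?> task;

   DistributedExecutionException(String message, Throwable cause, Address addressOfExecution,
            DistributedRunnableFuture<?> task) {
      super(message, cause);
      this.addressOfExecution = addressOfExecution;
      this.task = task;
   }
   Address getExecutionLocation() { return addressOfExecution; }
   DistributedRunnableFuture<?> getDistributedTask() { return task; }
}

// Copy of the proposed policy interface.
interface DistributedTaskFailoverPolicy<T> {
   T executionFailed(List<Address> executionTargets, DistributedExecutionException cause);
}

// Hypothetical policy: re-run the task once on the first candidate that is not the failed node.
class RetryOnOtherNodePolicy<T> implements DistributedTaskFailoverPolicy<T> {
   // Hypothetical hook that executes a task on a given node and returns its result.
   private final BiFunction<Address, DistributedRunnableFuture<?>, T> resubmit;

   RetryOnOtherNodePolicy(BiFunction<Address, DistributedRunnableFuture<?>, T> resubmit) {
      this.resubmit = resubmit;
   }

   @Override
   public T executionFailed(List<Address> executionTargets, DistributedExecutionException cause) {
      Address failedNode = cause.getExecutionLocation();
      for (Address candidate : executionTargets) {
         if (!candidate.equals(failedNode)) {
            return resubmit.apply(candidate, cause.getDistributedTask());
         }
      }
      // No other node is available, so the failure cannot be recovered by this policy.
      throw new IllegalStateException("No failover target available", cause);
   }
}

class FailoverSketch {
   public static void main(String[] args) {
      Address a = new Address("A");
      Address b = new Address("B");
      DistributedRunnableFuture<String> task = new DistributedRunnableFuture<String>() { };
      RetryOnOtherNodePolicy<String> policy =
            new RetryOnOtherNodePolicy<String>((node, t) -> "re-ran on " + node);
      DistributedExecutionException failure =
            new DistributedExecutionException("node A left", new RuntimeException(), a, task);
      System.out.println(policy.executionFailed(Arrays.asList(a, b), failure)); // prints "re-ran on B"
   }
}
```

Because the policy receives both the failed task and its execution location through the exception, it can exclude the failed node without any extra bookkeeping. A policy that does not want to retry could simply rethrow or return a default value instead.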


 

 

Pluggable task mapping policy

 

Distributed executors and map/reduce should allow users to veto and override the nodes selected for task execution. For example, a user might want to execute tasks exclusively on a local network island instead of a remote backup island. Others might use only a dedicated subset of nodes in the Infinispan cluster for task execution. The task mapping policy is set per instance of DistributedExecutorService and MapReduceTask. The following interface is the current working proposal.

 

 

import java.util.List;
import java.util.Map;

import org.infinispan.remoting.transport.Address;

public interface DistributedTaskMappingPolicy {

   /**
    * Given a set of input keys for a distributed task, an implementation should return a mapping
    * from addresses to lists of keys, where each Address key is a node on which to execute
    * distributed task T and each value is the list of input keys for task T on that node.
    *
    * @param <K>
    *           the type of the input keys
    * @param input
    *           all input keys for a distributed task
    * @return a mapping from addresses to lists of keys
    */
   <K> Map<Address, List<K>> mapKeysToExecutionNodes(K... input);

   /**
    * Gives implementations of DistributedTaskMappingPolicy an opportunity to override the
    * suggested execution node with another node.
    *
    * @param <T>
    *           the result type of the distributed task
    * @param executionTarget
    *           the node suggested for execution of the task
    * @param candidates
    *           the nodes available for execution of the task
    * @param future
    *           the distributed task about to be executed
    * @return the node on which the task should be executed
    */
   <T> Address executionTargetSelected(Address executionTarget, List<Address> candidates,
            DistributedRunnableFuture<T> future);

}
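
As an illustration of the "dedicated subset of nodes" use case, the following is a minimal sketch of a mapping policy that confines execution to a fixed list of nodes. It is a sketch under stated assumptions, not part of the proposal. Address and DistributedRunnableFuture are stand-in types so the snippet compiles on its own, and DedicatedNodesMappingPolicy and MappingSketch are hypothetical names. Keys are spread round-robin purely for simplicity; a real policy would more likely take key ownership into account.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Stand-in for org.infinispan.remoting.transport.Address so the sketch compiles alone.
final class Address {
   private final String name;
   Address(String name) { this.name = name; }
   @Override public boolean equals(Object o) {
      return o instanceof Address && ((Address) o).name.equals(name);
   }
   @Override public int hashCode() { return name.hashCode(); }
   @Override public String toString() { return name; }
}

// Stand-in for the proposed DistributedRunnableFuture; only a marker type here (assumption).
interface DistributedRunnableFuture<V> { }

// Copy of the proposed policy interface.
interface DistributedTaskMappingPolicy {
   <K> Map<Address, List<K>> mapKeysToExecutionNodes(K... input);

   <T> Address executionTargetSelected(Address executionTarget, List<Address> candidates,
            DistributedRunnableFuture<T> future);
}

// Hypothetical policy: only nodes from a fixed, dedicated list may execute tasks.
class DedicatedNodesMappingPolicy implements DistributedTaskMappingPolicy {
   private final List<Address> dedicated;

   DedicatedNodesMappingPolicy(List<Address> dedicated) {
      if (dedicated.isEmpty()) {
         throw new IllegalArgumentException("At least one dedicated node is required");
      }
      this.dedicated = new ArrayList<>(dedicated);
   }

   @Override
   public <K> Map<Address, List<K>> mapKeysToExecutionNodes(K... input) {
      // Round-robin assignment of keys to dedicated nodes (a simplification for this sketch).
      Map<Address, List<K>> mapping = new LinkedHashMap<>();
      for (int i = 0; i < input.length; i++) {
         Address node = dedicated.get(i % dedicated.size());
         mapping.computeIfAbsent(node, n -> new ArrayList<>()).add(input[i]);
      }
      return mapping;
   }

   @Override
   public <T> Address executionTargetSelected(Address executionTarget, List<Address> candidates,
            DistributedRunnableFuture<T> future) {
      // Keep the suggested node if it is dedicated; otherwise veto it and pick a dedicated candidate.
      if (dedicated.contains(executionTarget)) {
         return executionTarget;
      }
      for (Address candidate : candidates) {
         if (dedicated.contains(candidate)) {
            return candidate;
         }
      }
      throw new IllegalStateException("No dedicated node among candidates " + candidates);
   }
}

class MappingSketch {
   public static void main(String[] args) {
      Address a = new Address("A");
      Address b = new Address("B");
      Address c = new Address("C");
      DedicatedNodesMappingPolicy policy = new DedicatedNodesMappingPolicy(Arrays.asList(a, b));
      System.out.println(policy.mapKeysToExecutionNodes("k1", "k2", "k3")); // prints {A=[k1, k3], B=[k2]}
      System.out.println(policy.executionTargetSelected(c, Arrays.asList(c, b), null)); // prints B
   }
}
```

The two methods cover the two hooks described above: mapKeysToExecutionNodes decides up front where keyed tasks run, while executionTargetSelected lets the policy veto a node suggested for an individual task.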