1 Reply Latest reply on Sep 21, 2011 4:10 AM by mscetin

    ReceiveTaskHandler handling multiple process intances with the same message-id

    mscetin

      BPMN2 JUnit test for BPMN2-ReceiveTask is implemented as

       

      ksession.getWorkItemManager().registerWorkItemHandler("Receive Task", receiveTaskHandler);

      ...

      receiveTaskHandler.messageReceived("HelloMessage", "Hello john!");

       

      I tried to use ReceiveTaskHandler class in a project to receive a notification from an external system for a specific process-instance and resume the process upon receipt of the message. I could not figure out how to specify the process-instance for which the received message is applicable.

       

      JUinit test works fine as there is only one process instance in the session during the execution of this test. The source code for ReceiveTaskHandler is:

       

      public class ReceiveTaskHandler implements WorkItemHandler {

         

          // TODO: use correlation instead of message id

          private Map<String, Long> waiting = new HashMap<String, Long>();

              ...

          public void executeWorkItem(WorkItem workItem, WorkItemManager manager) {

              String messageId = (String) workItem.getParameter("MessageId");

              waiting.put(messageId, workItem.getId());

              // If waiting map previously contained a mapping for messageId, the old value is replaced !!!

          }

         

          public void messageReceived(String messageId, Object message) {

              Long workItemId = waiting.get(messageId);

              if (workItemId == null) {

                  return;

              }

              Map<String, Object> results = new HashMap<String, Object>();

              results.put("Message", message);

              ksession.getWorkItemManager().completeWorkItem(workItemId, results);

          }

          ...

      }

       

      The comment of // TODO: use correlation instead of message id at the top of the source file  also gave the impression that the implementation is not complete. As an interim solution, I decided to extend ReceiveTaskHandler class as follows:

       

      public class ReceiveTaskHandler extends org.jbpm.bpmn2.handler.ReceiveTaskHandler {

         

          private Map<String, Long> waiting = new HashMap<String, Long>();

          private KnowledgeRuntime ksession;

       

          public ReceiveTaskHandler(KnowledgeRuntime ksession) {

             super(ksession);

             this.ksession = ksession;

          }

       

          private String constructKey(long   processInstanceId, String messageId) {

              return processInstanceId + "|" + messageId;

          }

         

          @Override

          public void executeWorkItem(WorkItem workItem, WorkItemManager manager) {

              long processInstanceId = workItem.getProcessInstanceId();

              String messageId = (String) workItem.getParameter("MessageId");

              waiting.put(constructKey(processInstanceId, messageId), workItem.getId());

              // If waiting map previously contained a mapping for messageId, the old value is replaced !!!

          }

         

          @Override

          public void messageReceived(String messageId, Object message) {

              throw new UnsupportedOperationException(

                        "messageReceived(String messageId, Object message) method is not supported. " +

                        "Instead use messageReceived(long, String, Object) method" );

          }

         

          public void messageReceived(long processInstanceId, String  messageId, Object message) {

              Long workItemId = waiting.get(constructKey(processInstanceId, messageId));

              if (workItemId == null) {

                  return;

              }

              Map<String, Object> results = new HashMap<String, Object>();

              results.put("Message", message);

              ksession.getWorkItemManager().completeWorkItem(workItemId, results);

          }

      }

       

      I was wondering if "Receive Task" is the right task to use for waiting an external notification. If yes, then can any core jBPM contributor review the extension that I used locally and consider adding it to the next release of jBPM.

        • 1. Re: ReceiveTaskHandler handling multiple process intances with the same message-id
          mscetin

          Original implementation did not work as expected after server re-start. This is a an improved version in case someone else finds it useful.

           

          I still would like someone from jBPM team to provide some remarks about this implementation

           

          package com.mycomp.bpm.handler;

           

          import java.util.HashMap;

          import java.util.Map;

           

          import org.apache.log4j.Logger;

          import org.drools.runtime.KnowledgeRuntime;

          import org.drools.runtime.process.NodeInstance;

          import org.drools.runtime.process.ProcessInstance;

          import org.drools.runtime.process.WorkItem;

          import org.drools.runtime.process.WorkItemHandler;

          import org.drools.runtime.process.WorkItemManager;

          import org.drools.runtime.process.WorkflowProcessInstance;

          import org.jbpm.workflow.instance.node.CompositeContextNodeInstance;

          import org.jbpm.workflow.instance.node.WorkItemNodeInstance;

           

          public class ReceiveTaskHandler implements WorkItemHandler {

             

              private static Logger logger = Logger.getLogger(ReceiveTaskHandler.class.getName());

             

              private Map<String, Long> waiting = new HashMap<String, Long>();

              private KnowledgeRuntime ksession;

           

              public ReceiveTaskHandler(KnowledgeRuntime ksession) {

                 this.ksession = ksession;

              }

           

              public void setKnowledgeRuntime(KnowledgeRuntime ksession) {

                  this.ksession = ksession;

              }

           

              private String constructKey(long   processInstanceId, String messageId) {

                  return processInstanceId + "|" + messageId;

              }

             

              @Override

              public void executeWorkItem(WorkItem workItem, WorkItemManager manager) {

                  long processInstanceId = workItem.getProcessInstanceId();

                  String messageId = (String) workItem.getParameter("MessageId");

                  waiting.put(constructKey(processInstanceId, messageId), workItem.getId());

                  // If waiting map previously contained a mapping for messageId, the old value is replaced !!!

              }

             

              public void messageReceived(String messageId, Object message) {

                  throw new UnsupportedOperationException("messageReceived(String messageId, Object message) method is not supported. " +

                                                          "Instead use messageReceived(long, String, Object) method" );

              }

             

              private Long findWorkItemId(NodeInstance prmNodeInstance,

                                          String       prmMessageId) {

                  Long workItemId = null;

                  if (prmNodeInstance instanceof WorkItemNodeInstance) {

                      WorkItemNodeInstance workItemNode = (WorkItemNodeInstance)prmNodeInstance;

                      WorkItem workItem = workItemNode.getWorkItem();

                      if (workItem.getName().equals("Receive Task") &&

                          workItem.getParameter("MessageId").equals(prmMessageId)) {

                          workItemId = workItem.getId();

                      }

                  }

                  if (prmNodeInstance instanceof CompositeContextNodeInstance) {

                      for (NodeInstance ni : ((CompositeContextNodeInstance)prmNodeInstance).getNodeInstances(false)) {

                          workItemId = findWorkItemId(ni, prmMessageId);

                          if (workItemId != null) {

                              break;

                          }

                      }

                  }

                  return workItemId;

              }

             

              public void messageReceived(long processInstanceId, String  messageId, Object message) {

                  Long workItemId = waiting.get(constructKey(processInstanceId, messageId));

                  if (workItemId == null) {

                      // See if this is a work item persisted pefore the re-start of the server

                      ProcessInstance pi = ksession.getProcessInstance(processInstanceId);

                      final WorkflowProcessInstance workflowProcessInstance = ((WorkflowProcessInstance) pi);

                      for (NodeInstance nodeInstance : workflowProcessInstance.getNodeInstances()) {

                          workItemId = findWorkItemId(nodeInstance, messageId);

                          if (workItemId != null) {

                              break;

                          }

                      }

                  }

                  if (workItemId != null) {

                      Map<String, Object> results = new HashMap<String, Object>();

                      results.put("Message", message);

                      ksession.getWorkItemManager().completeWorkItem(workItemId, results);

                  } else {

                      logger.error("No workItem found for receiveTask of process with id:" + processInstanceId);

                  }

              }

             

              @Override

              public void abortWorkItem(WorkItem workItem, WorkItemManager manager) {

                  long processInstanceId = workItem.getProcessInstanceId();

                  String messageId = (String) workItem.getParameter("MessageId");

                  waiting.remove(constructKey(processInstanceId, messageId));

              }

          }