10 Replies Latest reply on Jun 28, 2017 11:22 PM by zhangwei22

    Concurrency problems with JBPM 5.4

    jkranes

      I am working on a JBPM 5.4 / jBoss 7.1.1 application and have been unable to get it to run reliably using multiple threads.

       

      The basic approach I am taking is as follows:

       

      • Process instances are triggered by an incoming message (AMQP/ RabbitMQ) received by a message listener.  For optimal performance I need to be able to use multiple listener threads.   (Ultimately there will be multiple application server host instances as well but first things first).
      • I am creating a new StatefulKnowlegeSession for each new ProcessInstance.
      • I am using a local HumanTaskService (note that I am able to trigger concurrency problems mentioned below even while using a workflow that has no user tasks in it).
      • I am disposing of the session and its associated handlers immediately after starting the process.  If I need to respond to an asynch signal (e.g. completion of a human task) I create the session by ID from the database (this is part of the scaling strategy - need to be able to continue the process at a future time on an entirely different machine).
      • Mainly using Oracle (11g XE) but have also tested with MySQL and see the exact same issues.

       

      I am using Spring with declarative transactions.  I am using JTA transactions through the jBoss platform JTA transaction manager.

       

      Here's what I am experiencing:

       

      • Everything works fine as long as as I run a single thread only (i.e. have only one instance of my message listener class).  I can run this way for hours without a single exception being thrown.
      • With two or more message listeners, I get exceptions (not necessarily right away, but if I run enough iterations it happens pretty quickly).
      • I see a variety of different exceptions each time I run the program.  The exceptions vary but all are related to inability to commit, invalid sessions, inability to find a certain piece of data (the session or process instance), deadlocks, concurrent modification exceptions, etc.  All the kinds of things you would expect in non threadsafe code.
      • Once an exception is thrown on a given thread, it seems that the thread will just keep throwing the 'Entity Manager is Closed' exception.  I am guessing there is something happenging here with a ThreadLocal tied to the thread that never gets cleared up once problems occur on that thread.  If I run long enough, all threads end up with this problem and the system basically becomes useless and needs a full restart.

       

      So without trying to debug each and every exception one at a time, I'm wondering if there is anything obvious I am doing in my code that would lead to a non threadsafe condition.  I have read several threads on issues surrounding the StatefulKnowledgeSession so I have deliberately chosen to NOT share sessions between threads for that reason (and others).

       

      A few main questions also:

       

      • Wrapping the Spring JTA PlatformTransactionManager in the DroolsSpringTransactionManager: i have seen this in several examples but have not seen any good explanation of whether/when it is necessary.
      • Use of the DroolsSpringJPAManager: again, have seen this in examples but no real documentation explaining it.
      • Should the StatefulKnowledgeSession be created inside a transaction?  Must it be the same transaction that it is used in?   I know that it has to be disposed outside of the transaction boundaries.

       


      Thanks for any help or suggestions.  I'm happy to provide more code as needed; I wanted to keep this limited to what seems relevant rather than zip up an entire complex application and force people to wade through all of it.

       

      -- Jon Kranes

       

       

       

      Relevant code:

       

      I have a wrapper class to hold a reference to the StatefulKnowledgeSession and its associated handlers, to make it easier to manage disposal:

       


      public class SessionWrapper {
        
       private static Logger log = Logger.getLogger(SessionWrapper.class);
        
       private StatefulKnowledgeSession session;
       private GenericHTWorkItemHandler htHandler;
       private JPAWorkingMemoryDbLogger dbLogger;
       private LocalTaskService localTaskService;
        
       public void dispose() {
       try {
                                                                   log.info("disposing local task service");
       localTaskService.dispose();
       }
       catch (Exception e) {
                                                                   log.error(e);
       }
       try {
                                                                   // log.info("disposing db logger");
                                                                   // this does not do anything.
       // dbLogger.dispose();
       }
       catch (Exception e) {
                                                                   log.error(e);
       }
       try {
                                                                   log.info("disposing ht handler");
                                                                   htHandler.dispose();
       }
       catch (Exception e) {
                                                                   log.error(e);
       }
       try {
                                                                   log.info("Disposing session ID " + session.getId());
                                                                   session.dispose();
       }
       catch (Exception e) {
       log.error(e);
       }
       }
        
        
       public StatefulKnowledgeSession getSession() {
       return session;
       }
       public void setSession(StatefulKnowledgeSession session) {
       this.session = session;
       }
       public GenericHTWorkItemHandler getHtHandler() {
       return htHandler;
       }
       public void setHtHandler(GenericHTWorkItemHandler htHandler) {
       this.htHandler = htHandler;
       }
       public JPAWorkingMemoryDbLogger getDbLogger() {
       return dbLogger;
       }
       public void setDbLogger(JPAWorkingMemoryDbLogger dbLogger) {
       this.dbLogger = dbLogger;
       }
       public LocalTaskService getLocalTaskService() {
       return localTaskService;
       }
       public void setLocalTaskService(LocalTaskService localTaskService) {
       this.localTaskService = localTaskService;
       }
      
      }
      
      

       

      The SessionWrapper instance is created here

       


      public class KnowledgeSessionFactory {
        
       @Autowired
       KnowledgeBase kbase;
        
       @Autowired
       org.jbpm.task.service.TaskService taskService;
        
       @Autowired
       EntityManagerFactory emf;
        
       @Autowired
       EventDrivenTaskHandler handler;
        
       @Autowired
               DroolsSpringTransactionManager txManager;
          
          @Autowired
          AuditService auditService;
          
          @Autowired
          AuditTaskHandler auditHandler;
         
          private static final Logger log = Logger.getLogger(KnowledgeSessionFactory.class);
          
       public SessionWrapper createKnowledgeSession() {
       log.info("Creating new StatefulKnowledgeSession");
       StatefulKnowledgeSession ksession = null;
       try {
                                                                   ksession = JPAKnowledgeService.newStatefulKnowledgeSession( kbase, null, getEnvironment() );
       }
       catch (Exception e) {
                                                                   log.error("Unable to create sesssion!");
                                                                   log.error(e);
                                                                   throw new RuntimeException(e);
       }
       log.info("Session ID: " + ksession.getId());
       return createSessionWrapper(ksession); 
       }
        
       public SessionWrapper createKnowledgeSession(int sessionId) {
       log.info("Loading StatefulKnowledgeSession for ID: " + sessionId);
       StatefulKnowledgeSession ksession = JPAKnowledgeService.loadStatefulKnowledgeSession(sessionId, kbase, null, getEnvironment());
       return createSessionWrapper(ksession); 
       } 
        
       private SessionWrapper createSessionWrapper(StatefulKnowledgeSession ksession) {
       LocalTaskService taskClient = new LocalTaskService(taskService);
       GenericHTWorkItemHandler htHandler = new LocalHTWorkItemHandler(taskClient, ksession);
        
       ksession.getWorkItemManager().registerWorkItemHandler("Human Task", htHandler);
       ksession.getWorkItemManager().registerWorkItemHandler("Task", handler);
       ksession.getWorkItemManager().registerWorkItemHandler("Audit", auditHandler);
       ksession.addEventListener(new DroolsListener());
        
       // This allows access of the global in script tasks
       ksession.setGlobal("auditService", auditService);
        
       JPAWorkingMemoryDbLogger dbLogger = new JPAWorkingMemoryDbLogger(ksession);
       SessionWrapper wrapper = new SessionWrapper();
       wrapper.setSession(ksession);
       wrapper.setHtHandler(htHandler);
       wrapper.setDbLogger(dbLogger);
       wrapper.setLocalTaskService(taskClient);
       return wrapper; 
                     }
        
       private Environment getEnvironment() {
       log.info("creating environment");
       env = KnowledgeBaseFactory.newEnvironment();
       env.set(EnvironmentName.ENTITY_MANAGER_FACTORY, emf );
       env.set(EnvironmentName.TRANSACTION_MANAGER, txManager);
       PersistenceContextManager persistenceContextManager = new DroolsSpringJpaManager(env);
       env.set(EnvironmentName.PERSISTENCE_CONTEXT_MANAGER, persistenceContextManager);
       log.info("environment created"); 
       return env;
       }
       
      }
      
      
      


      Here is the message driven bean that initiates starting new process instances:

       

      public class StartProcessListener implements MessageListener {
        
       private static Logger log = Logger.getLogger(StartProcessListener.class);
        
       @Autowired
       EventService service;
        
       @Autowired
       KnowledgeSessionFactory factory;
        
       @Autowired
       SerializerMessageConverter converter;
        
       public void onMessage(Message message) {
       SessionWrapper wrapper = null;
       try {
                                                                   Map<String, Object> map = (Map<String, Object>)converter.fromMessage(message);
                                                                   Map<String, Object> params = new HashMap<String, Object>();
                                                                   params.put("returnStatus", "VALID");
                                                                   params.put("return", map.get("return"));
        
                                                                   String processId = (String)map.get("processId");
                                                                   try {
                                                                                  wrapper = service.createSessionWrapper();
                                                                                  service.startProcess(wrapper, processId, params);
                                                                   }
                                                                   catch (Exception e) {
                                                                                  log.error(e);
                                                                   }
       }
       catch (Exception e) {
                                                                   log.error(e);
       }
       finally {
                                                                   if (wrapper != null) {
                                                                                  wrapper.dispose(); 
                                                                   }
       }
       }
      
      


      And finally the service class that uses declarative transactions:

       

       

      public class EventServiceImpl implements EventService {
        
       private static Logger log = Logger.getLogger(EventServiceImpl.class);
        
       @Autowired
       KnowledgeSessionFactory factory;
        
       @Autowired
       TaskService taskService;
        
       @Override
       @Transactional
       public SessionWrapper createSessionWrapper() {
       return factory.createKnowledgeSession();
       }
        
       @Transactional
       @Override
       public void startProcess(SessionWrapper wrapper, String processId, Map<String, Object> params) {
       // SessionWrapper wrapper = factory.createKnowledgeSession();
       StatefulKnowledgeSession ksession = wrapper.getSession();
        
       // initializes the human task handler for the session
       // org.jbpm.task.TaskService lts = LocalHumanTaskService.getTaskService(ksession);
        
       // this allows us to insert the processInstance into the session before starting it.
       ProcessInstance processInstance = ksession.createProcessInstance(processId, params); 
       // for drools
       ksession.insert(processInstance);
        
       log.info("Starting process instance");
       ksession.startProcessInstance(processInstance.getId());
       //return wrapper;
       }
        
       @Transactional
       @Override
                     public SessionWrapper completeWorkItem(int sessionId, long workItemId, Map<String, Object> results) {
       SessionWrapper wrapper = factory.createKnowledgeSession(sessionId);
       StatefulKnowledgeSession ksession = wrapper.getSession();
       completeWorkItem(ksession, workItemId, results); 
       return wrapper;
       }
        
       @Override
       @Transactional
       public SessionWrapper completeTask(int sessionId, Long taskId, Map<String, Object> results) {
       log.info("Completing task: " + taskId);
        
       for (String key : results.keySet()) {
                                                                   log.info(key + "=" + results.get(key));
       }
        
       // LocalTaskService lts = new LocalTaskService(taskService);
       // StatefulKnowledgeSession ksession = factory.getKnowledgeSession(sessionId);
       SessionWrapper wrapper = factory.createKnowledgeSession(sessionId);
       StatefulKnowledgeSession ksession = wrapper.getSession();
        
       // org.jbpm.task.TaskService lts = LocalHumanTaskService.getTaskService(ksession);
       org.jbpm.task.TaskService lts = wrapper.getLocalTaskService();
       Task task = lts.getTask(taskId);
       // int sessionId = task.getTaskData().getProcessSessionId();
       // completeWorkItem(sessionId, task.getTaskData().getWorkItemId(), results);
        
       try {
                                                                   lts.start(taskId, "USER");
       }
       catch (Exception e) {
                                                                   log.error(e);
       }
        
       try {
                                                                   ContentData contentData = ContentMarshallerHelper.marshal(results, null);
                                                                   lts.complete(taskId, "USER", contentData);
                                                                   lts.disconnect(); // does this do anything useful?
       }
       catch (Exception e) {
                                                                   log.error(e);
       }
        
       // for drools
       ProcessInstance pi = ksession.getProcessInstance(task.getTaskData().getProcessInstanceId());
       ksession.insert(pi);
        
       completeWorkItem(ksession, task.getTaskData().getWorkItemId(), results);
        
       return wrapper;
        
       }
        
       private void completeWorkItem(StatefulKnowledgeSession ksession, Long workItemId, Map<String, Object> results) {
       log.info("Completing work item: " + workItemId);
       ksession.getWorkItemManager().completeWorkItem(workItemId, results); 
       }
       
       
      }
      
      


       

      Spring configuration:

       

       

      <?xml version="1.0" encoding="UTF-8"?>
      
      
      <beans ... >
                       
          <context:annotation-config />
          
          <!-- Loads all classes annotated with @Component or @Service -->
          <context:component-scan base-package="org.mitre.irs.eda.bpm"/>
         
          <context:property-placeholder location="classpath:eda.properties" order="2"/>     
          
                <jee:jndi-lookup id="emf" jndi-name="persistence/emf"/>
        
                <!-- Spring JtaTransactionManager -->
                <bean id="jtaTxManager" class="org.springframework.transaction.jta.JtaTransactionManager" />
        
                <bean id="txManager" class="org.drools.container.spring.beans.persistence.DroolsSpringTransactionManager"
                          c:ptm-ref="jtaTxManager" />
      
      
                <tx:annotation-driven transaction-manager="jtaTxManager" /> 
        
                <jpa:repositories base-package="org.mitre.irs.eda.common.repo" entity-manager-factory-ref="emf" transaction-manager-ref="jtaTxManager" />
        
                <bean id="jpaDialect" class="org.springframework.orm.jpa.vendor.HibernateJpaDialect" />  
        
                <!-- JBPM -->
                <bean id="sel" class="org.drools.SystemEventListenerFactory"
                          factory-method="getSystemEventListener" />
      
                <bean id="jbpmTaskService" class="org.jbpm.task.service.TaskService"
                          c:emf-ref="emf" c:systemEventListener-ref="sel" /> 
      
               <bean id="kbaseFactory" class="org.mitre.irs.eda.bpm.jbpm.KnowledgeBaseFactory" />
          
               <bean id="kbase" factory-bean="kbaseFactory" factory-method="newKnowledgeBase" />
      
                  
          
                <!-- Rabbit MQ -->
                <rabbit:connection-factory id="connectionFactory" channel-cache-size="20"
                          username="${amqp.username}" password="${amqp.password}" host="${amqp.host}" port="${amqp.port}" />
        
                <rabbit:admin id="admin" connection-factory="connectionFactory" />
                <bean id="serializerConverter" class="org.springframework.amqp.support.converter.SerializerMessageConverter" />
                <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" message-converter="serializerConverter"/>
                <rabbit:queue id="start" name="process.start" />
                <rabbit:queue id="task.start" name="process.task.start" />
                <rabbit:queue id="task.complete" name="process.task.complete" />
                <bean id="startListener" class="org.mitre.irs.eda.bpm.listener.StartProcessListener" />
                <bean id="taskCompleteListener" class="org.mitre.irs.eda.bpm.listener.TaskCompleteListener" />
                <rabbit:listener-container connection-factory="connectionFactory" concurrency="4">
                          <rabbit:listener queues="start" ref="startListener" />
                          <rabbit:listener queues="task.complete" ref="taskCompleteListener" />
                </rabbit:listener-container>   
          
      
                
      </beans>
      
      
        • 1. Re: Concurrency problems with JBPM 5.4
          calca

          I have had problems with concurrency, and it was because LocalTaskService is not thread safe. It was solved by creating a new LocalTaskService per request.

           

          Hope this helps,

           

          Demian

          • 2. Re: Concurrency problems with JBPM 5.4
            jkranes

            Each new processInstance gets its own session, and each session gets its own private LocalTaskService, so neither of these is being shared.  So I don't think that is the problem (at least not in any obvoius way.

             

            Jon

            • 3. Re: Concurrency problems with JBPM 5.4
              thomas.setiabudi

              Hi Jon Kranes,

               

              I got this problem too before.

              I am using JBPM5.4 Final, Spring 3, and MS SQL Server 2008 Db.

               

              few things that I remember:

              1. when you start process or when you do completeTask, remove the @Transactional annotation. And let the jBPM engine handle the transaction.

              (I got a problem where my Web App got hang like endless locking, and this clears the issue)

               

              2. add unique index for "processInstanceId" field in database table "ProcessInstanceLog"

              (I got a problem where the jBPM engine throws deadlock exception when starting task without human task with multiple thread)

               

              Hope this helps.

               

              Regards,

              Thomas Setiabudi

              • 4. Re: Concurrency problems with JBPM 5.4
                jkranes

                Thomas, thanks for your suggestion.

                 

                Unfortunately even I do not use Spring transactions, I still get errors when running multiple threads.

                 

                The exception stack (using JBPM transactions without the Spring @Transactional) is:

                 

                 

                10:38:10,431 WARN  [com.arjuna.ats.arjuna] (SimpleAsyncTaskExecutor-1) ARJUNA012125: TwoPhaseCoordinator.beforeCompletion - failed for SynchronizationImple< 0:ffff0a900c53:275a5d1:51ee948c:2c29, org.hibernate.engine.transaction.synchronization.internal.RegisteredSynchronization@4bd56e0c >: javax.persistence.PersistenceException: error during managed flush
                          at org.hibernate.ejb.AbstractEntityManagerImpl$CallbackExceptionMapperImpl.mapManagedFlushFailure(AbstractEntityManagerImpl.java:1486) [hibernate-entitymanager-4.0.1.Final.jar:4.0.1.Final]
                          at org.hibernate.engine.transaction.synchronization.internal.SynchronizationCallbackCoordinatorImpl.beforeCompletion(SynchronizationCallbackCoordinatorImpl.java:109) [hibernate-core-4.0.1.Final.jar:4.0.1.Final]
                          at org.hibernate.engine.transaction.synchronization.internal.RegisteredSynchronization.beforeCompletion(RegisteredSynchronization.java:53) [hibernate-core-4.0.1.Final.jar:4.0.1.Final]
                          at com.arjuna.ats.internal.jta.resources.arjunacore.SynchronizationImple.beforeCompletion(SynchronizationImple.java:76)
                          at com.arjuna.ats.arjuna.coordinator.TwoPhaseCoordinator.beforeCompletion(TwoPhaseCoordinator.java:273)
                          at com.arjuna.ats.arjuna.coordinator.TwoPhaseCoordinator.end(TwoPhaseCoordinator.java:93)
                          at com.arjuna.ats.arjuna.AtomicAction.commit(AtomicAction.java:164)
                          at com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionImple.commitAndDisassociate(TransactionImple.java:1165)
                          at com.arjuna.ats.internal.jta.transaction.arjunacore.BaseTransaction.commit(BaseTransaction.java:117)
                          at com.arjuna.ats.jbossatx.BaseTransactionManagerDelegate.commit(BaseTransactionManagerDelegate.java:75)
                          at org.jboss.tm.usertx.client.ServerVMClientUserTransaction.commit(ServerVMClientUserTransaction.java:167)
                          at org.springframework.transaction.jta.JtaTransactionManager.doCommit(JtaTransactionManager.java:1011) [spring-tx-3.2.2.RELEASE.jar:3.2.2.RELEASE]
                          at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:755) [spring-tx-3.2.2.RELEASE.jar:3.2.2.RELEASE]
                          at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:724) [spring-tx-3.2.2.RELEASE.jar:3.2.2.RELEASE]
                          at org.drools.container.spring.beans.persistence.DroolsSpringTransactionManager.commit(DroolsSpringTransactionManager.java:65) [drools-spring-5.5.0.Final.jar:5.5.0.Final]
                          at org.drools.persistence.SingleSessionCommandService.<init>(SingleSessionCommandService.java:155) [drools-persistence-jpa-5.5.0.Final.jar:5.5.0.Final]
                          at sun.reflect.GeneratedConstructorAccessor244.newInstance(Unknown Source) [:1.7.0_25]
                          at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) [rt.jar:1.7.0_25]
                          at java.lang.reflect.Constructor.newInstance(Constructor.java:526) [rt.jar:1.7.0_25]
                          at org.drools.persistence.jpa.KnowledgeStoreServiceImpl.buildCommandService(KnowledgeStoreServiceImpl.java:128) [drools-persistence-jpa-5.5.0.Final.jar:5.5.0.Final]
                          at org.drools.persistence.jpa.KnowledgeStoreServiceImpl.newStatefulKnowledgeSession(KnowledgeStoreServiceImpl.java:66) [drools-persistence-jpa-5.5.0.Final.jar:5.5.0.Final]
                          at org.drools.persistence.jpa.JPAKnowledgeService.newStatefulKnowledgeSession(JPAKnowledgeService.java:122) [knowledge-api-5.5.0.Final.jar:5.5.0.Final]
                          at org.mitre.irs.eda.bpm.jbpm.KnowledgeSessionFactory.createSession(KnowledgeSessionFactory.java:70) [classes:]
                          at org.mitre.irs.eda.bpm.jbpm.KnowledgeSessionFactory.createKnowledgeSession(KnowledgeSessionFactory.java:57) [classes:]
                          at org.mitre.irs.eda.bpm.service.EventServiceImpl.startProcess(EventServiceImpl.java:44) [classes:]
                          at sun.reflect.GeneratedMethodAccessor55.invoke(Unknown Source) [:1.7.0_25]
                          at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) [rt.jar:1.7.0_25]
                          at java.lang.reflect.Method.invoke(Method.java:606) [rt.jar:1.7.0_25]
                          at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317) [spring-aop-3.2.2.RELEASE.jar:3.2.2.RELEASE]
                          at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:198) [spring-aop-3.2.2.RELEASE.jar:3.2.2.RELEASE]
                          at com.sun.proxy.$Proxy106.startProcess(Unknown Source)
                          at org.mitre.irs.eda.bpm.listener.StartProcessListener.onMessage(StartProcessListener.java:46) [classes:]
                          at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.onMessage(MessageListenerAdapter.java:328) [spring-rabbit-1.1.4.RELEASE.jar:]
                          at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:546) [spring-rabbit-1.1.4.RELEASE.jar:]
                          at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:472) [spring-rabbit-1.1.4.RELEASE.jar:]
                          at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:58) [spring-rabbit-1.1.4.RELEASE.jar:]
                          at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:107) [spring-rabbit-1.1.4.RELEASE.jar:]
                          at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:608) [spring-rabbit-1.1.4.RELEASE.jar:]
                          at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:454) [spring-rabbit-1.1.4.RELEASE.jar:]
                          at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:471) [spring-rabbit-1.1.4.RELEASE.jar:]
                          at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:455) [spring-rabbit-1.1.4.RELEASE.jar:]
                          at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$300(SimpleMessageListenerContainer.java:58) [spring-rabbit-1.1.4.RELEASE.jar:]
                          at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:548) [spring-rabbit-1.1.4.RELEASE.jar:]
                          at java.lang.Thread.run(Thread.java:724) [rt.jar:1.7.0_25]
                Caused by: java.util.ConcurrentModificationException
                          at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:819) [rt.jar:1.7.0_25]
                          at java.util.ArrayList$Itr.next(ArrayList.java:791) [rt.jar:1.7.0_25]
                          at org.hibernate.engine.spi.ActionQueue.executeActions(ActionQueue.java:263) [hibernate-core-4.0.1.Final.jar:4.0.1.Final]
                          at org.hibernate.engine.spi.ActionQueue.executeActions(ActionQueue.java:186) [hibernate-core-4.0.1.Final.jar:4.0.1.Final]
                          at org.hibernate.event.internal.AbstractFlushingEventListener.performExecutions(AbstractFlushingEventListener.java:326) [hibernate-core-4.0.1.Final.jar:4.0.1.Final]
                          at org.hibernate.event.internal.DefaultFlushEventListener.onFlush(DefaultFlushEventListener.java:52) [hibernate-core-4.0.1.Final.jar:4.0.1.Final]
                          at org.hibernate.internal.SessionImpl.flush(SessionImpl.java:1081) [hibernate-core-4.0.1.Final.jar:4.0.1.Final]
                          at org.hibernate.internal.SessionImpl.managedFlush(SessionImpl.java:315) [hibernate-core-4.0.1.Final.jar:4.0.1.Final]
                          at org.hibernate.engine.transaction.synchronization.internal.SynchronizationCallbackCoordinatorImpl.beforeCompletion(SynchronizationCallbackCoordinatorImpl.java:104) [hibernate-core-4.0.1.Final.jar:4.0.1.Final]
                          ... 42 more
                
                
                • 5. Re: Concurrency problems with JBPM 5.4
                  swiderski.maciej

                  one additional chack that you might do is to make sure that Environment (that is used to create or load ksession) is not shared between different threads as it get's local data during execution such as EntityManager and that make the code not thread safe. Once you have always dedicated environment for single execution you should get rid of concurrency issues. Although this might not be the case for you.

                   

                  HTH

                  1 of 1 people found this helpful
                  • 6. Re: Concurrency problems with JBPM 5.4
                    jkranes

                    I discovered the Environment issue myself earlier but it's a good point to emphasize, as it's not intuitively obvious that it cannot be shared between threads.

                     

                    Recently I've removed the process instance logging (JPAProcessInstanceDbLog) (it was too costly for performance and I did not absolutely need it) along with some other changes (including putting Spring transactions back in after trying to do without them for a while), and the concurrency issues seem much improved.  At some point I'll try to go back and figure out whether it was the logging or some other change that made the difference.

                     

                    Jon

                    • 7. Re: Concurrency problems with JBPM 5.4
                      skumar0105

                      Hi Jon,

                       

                      Good day to you . I am using Jbpm 5 in my current project which has a very similar requirement that you have listed above . I am trying to figure out a way to trigger a process instance based on the incoming AMQP messages . My requirement is to read Idoc xml messages from the Rabbit AMQP and process them via jbpm 5. Though the flow works fine within eclipse, I am unable to get it configured as a long running service deployed within gwt console. My question to you would be , was your project deployed within gwt console  ? and the message listener that you confgured trigger a process instance that showed up within the web console whenever a message got read .?.

                       

                      Your response will be greatly appreciated.

                       

                      Thanks

                       

                      Senthil Sathiya

                      • 8. Re: Concurrency problems with JBPM 5.4
                        veskop86

                        Hello,

                         

                        I have same problem with JBoss 7.1.1, JBPM 5.4 and PostgreSql 9.2. One thread works fine but for more than one there is lots of exceptions but most of them was on completeTask method. I tried to find solution in different examples, on different forums but there was no success.

                        Because of exceptions are throwed mostly on completeTask method I made quick fix with ReentrantLock and that works fine in multithread environment.

                         

                        Spring context

                         

                        <bean name="taskService" class="com.xxx.xxx.TaskSerice" init-method="setUp">
                        ...
                          <property name="lock" ref="reentrantLock"/>
                        </bean>
                        
                        
                        <bean name="reentrantLock" class="java.util.concurrent.locks.ReentrantLock" scope="singleton"/>
                        

                         

                        CompleteTask method looks like this:

                         

                        public StatusTO completeTaskWithParameters(Long taskId, BpmContextTO context, ArrayListTO<KeyValueTO> parameters) {
                          ...
                          getLock().lock();
                          try {
                               status = doCompleteTaskWithParameters(taskId, context, parameters);
                          } catch (Exception e) {
                               ...
                               e.printStackTrace();
                          }finally{
                               getLock().unlock();
                          }
                          return status;
                        }
                        

                         

                        That was quick fix until someone find better solution

                        Sorry for bad English

                        Kind regards

                        --Veselin Perovic

                        • 9. Re: Concurrency problems with JBPM 5.4
                          blazob

                          Great!

                          This was my nightmare for weeks.

                          Thanx Veselin

                          • 10. Re: Concurrency problems with JBPM 5.4
                            zhangwei22

                            I encountered the same problem with him about concurrency. This is the url to this question.

                            I use thread to run drools,and version is 6.5.0.Final. This will cause a calculation error. Looked at your answer, I used threadLocal to create kieSession. The result of my test is not bad. But I would like to know if there is a better solution to solve this problem.

                            Drools concurrent issues · Issue #9 · mswiderski/jbpm-examples · GitHub