Concurrency problems with JBPM 5.4
jkranes Jul 19, 2013 9:30 AM
I am working on a JBPM 5.4 / jBoss 7.1.1 application and have been unable to get it to run reliably using multiple threads.
The basic approach I am taking is as follows:
- Process instances are triggered by an incoming message (AMQP/RabbitMQ) received by a message listener. For optimal performance I need to be able to use multiple listener threads. (Ultimately there will be multiple application server host instances as well, but first things first.)
- I am creating a new StatefulKnowledgeSession for each new ProcessInstance.
- I am using a local HumanTaskService (note that I am able to trigger concurrency problems mentioned below even while using a workflow that has no user tasks in it).
- I am disposing of the session and its associated handlers immediately after starting the process. If I need to respond to an asynchronous signal (e.g. completion of a human task), I load the session by ID from the database. This is part of the scaling strategy: I need to be able to continue the process at a future time on an entirely different machine.
- Mainly using Oracle (11g XE) but have also tested with MySQL and see the exact same issues.
I am using Spring with declarative transactions. I am using JTA transactions through the jBoss platform JTA transaction manager.
Here's what I am experiencing:
- Everything works fine as long as I run a single thread only (i.e. have only one instance of my message listener class). I can run this way for hours without a single exception being thrown.
- With two or more message listeners, I get exceptions (not necessarily right away, but if I run enough iterations it happens pretty quickly).
- I see a variety of different exceptions each time I run the program. The exceptions vary, but all are related to inability to commit, invalid sessions, inability to find a certain piece of data (the session or process instance), deadlocks, concurrent modification exceptions, etc. All the kinds of things you would expect in non-thread-safe code.
- Once an exception is thrown on a given thread, it seems that the thread will just keep throwing the 'Entity Manager is Closed' exception. I am guessing there is something happening here with a ThreadLocal tied to the thread that never gets cleaned up once problems occur on that thread. If I run long enough, all threads end up with this problem and the system basically becomes useless and needs a full restart.
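To illustrate the ThreadLocal guess in the last point, here is a minimal, self-contained sketch of the suspected mechanism. It does not use any jBPM or JPA classes: `FakeEntityManager`, `CURRENT`, and `secondTaskSeesOpenManager` are hypothetical names invented for this illustration, and whether jBPM 5.4 actually behaves this way internally is an assumption, not something I have confirmed.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class StaleThreadLocalDemo {

    /** Hypothetical stand-in for an entity manager; not a JPA or jBPM class. */
    static final class FakeEntityManager {
        private boolean open = true;
        void close() { open = false; }
        boolean isOpen() { return open; }
    }

    // One instance per worker thread, created lazily on first access.
    private static final ThreadLocal<FakeEntityManager> CURRENT =
            ThreadLocal.withInitial(FakeEntityManager::new);

    /**
     * Runs a failing unit of work and then a second one on the same pooled thread.
     * Returns whether the second unit of work saw an open entity manager.
     */
    static boolean secondTaskSeesOpenManager(boolean clearOnFailure) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            // First message: the work fails and the manager ends up closed.
            pool.submit(() -> {
                try {
                    CURRENT.get().close(); // simulates a failure that closes the manager
                } finally {
                    if (clearOnFailure) {
                        CURRENT.remove(); // drop the stale per-thread reference
                    }
                }
            }).get();
            // Second message, handled by the same pooled thread.
            return pool.submit(() -> CURRENT.get().isOpen()).get();
        } finally {
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("without cleanup: open=" + secondTaskSeesOpenManager(false)); // open=false
        System.out.println("with cleanup:    open=" + secondTaskSeesOpenManager(true));  // open=true
    }
}
```

Without the `remove()` call, every later message handled by that thread keeps seeing the closed instance, which would match the "keeps throwing until restart" behavior described above.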
So without trying to debug each and every exception one at a time, I'm wondering if there is anything obvious I am doing in my code that would lead to a non-thread-safe condition. I have read several threads on issues surrounding the StatefulKnowledgeSession, so I have deliberately chosen NOT to share sessions between threads for that reason (and others).
A few main questions also:
- Wrapping the Spring JTA PlatformTransactionManager in the DroolsSpringTransactionManager: I have seen this in several examples but have not seen any good explanation of whether/when it is necessary.
- Use of the DroolsSpringJPAManager: again, have seen this in examples but no real documentation explaining it.
- Should the StatefulKnowledgeSession be created inside a transaction? Must it be the same transaction that it is used in? I know that it has to be disposed outside of the transaction boundaries.
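To make the last question concrete, here is a minimal sketch of one possible ordering: create and use the session inside a single transaction, then dispose only after commit or rollback. `FakeTransaction`, `FakeSession`, and `handleMessage` are hypothetical stand-ins, not the JTA, Spring, or jBPM APIs, and creating the session inside the same transaction is an assumption made for the illustration, not an established requirement.

```java
import java.util.ArrayList;
import java.util.List;

public class DisposeAfterCommitSketch {

    /** Hypothetical stand-in for a transaction; records its lifecycle events. */
    static final class FakeTransaction {
        private final List<String> events;
        FakeTransaction(List<String> events) {
            this.events = events;
            events.add("begin");
        }
        void commit() { events.add("commit"); }
        void rollback() { events.add("rollback"); }
    }

    /** Hypothetical stand-in for a session that must be disposed after use. */
    static final class FakeSession {
        private final List<String> events;
        FakeSession(List<String> events) {
            this.events = events;
            events.add("create");
        }
        void startProcess(boolean fail) {
            events.add("start");
            if (fail) {
                throw new IllegalStateException("simulated failure");
            }
        }
        void dispose() { events.add("dispose"); }
    }

    /** Handles one message and returns the order in which the steps happened. */
    static List<String> handleMessage(boolean fail) {
        List<String> events = new ArrayList<>();
        FakeTransaction tx = new FakeTransaction(events);
        FakeSession session = null;
        try {
            session = new FakeSession(events); // created inside the transaction
            session.startProcess(fail);        // used inside the same transaction
            tx.commit();
        } catch (RuntimeException e) {
            tx.rollback();
        } finally {
            if (session != null) {
                session.dispose();             // always after commit or rollback
            }
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(handleMessage(false)); // [begin, create, start, commit, dispose]
        System.out.println(handleMessage(true));  // [begin, create, start, rollback, dispose]
    }
}
```

Note that the code below does not follow this ordering exactly: createSessionWrapper() and startProcess() are each @Transactional, so the session is created in one transaction and used in another.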
Thanks for any help or suggestions. I'm happy to provide more code as needed; I wanted to keep this limited to what seems relevant rather than zip up an entire complex application and force people to wade through all of it.
-- Jon Kranes
Relevant code:
I have a wrapper class to hold a reference to the StatefulKnowledgeSession and its associated handlers, to make it easier to manage disposal:
public class SessionWrapper {

    private static Logger log = Logger.getLogger(SessionWrapper.class);

    private StatefulKnowledgeSession session;
    private GenericHTWorkItemHandler htHandler;
    private JPAWorkingMemoryDbLogger dbLogger;
    private LocalTaskService localTaskService;

    public void dispose() {
        try {
            log.info("disposing local task service");
            localTaskService.dispose();
        } catch (Exception e) {
            log.error(e);
        }
        try {
            // log.info("disposing db logger");
            // this does not do anything.
            // dbLogger.dispose();
        } catch (Exception e) {
            log.error(e);
        }
        try {
            log.info("disposing ht handler");
            htHandler.dispose();
        } catch (Exception e) {
            log.error(e);
        }
        try {
            log.info("Disposing session ID " + session.getId());
            session.dispose();
        } catch (Exception e) {
            log.error(e);
        }
    }

    public StatefulKnowledgeSession getSession() {
        return session;
    }

    public void setSession(StatefulKnowledgeSession session) {
        this.session = session;
    }

    public GenericHTWorkItemHandler getHtHandler() {
        return htHandler;
    }

    public void setHtHandler(GenericHTWorkItemHandler htHandler) {
        this.htHandler = htHandler;
    }

    public JPAWorkingMemoryDbLogger getDbLogger() {
        return dbLogger;
    }

    public void setDbLogger(JPAWorkingMemoryDbLogger dbLogger) {
        this.dbLogger = dbLogger;
    }

    public LocalTaskService getLocalTaskService() {
        return localTaskService;
    }

    public void setLocalTaskService(LocalTaskService localTaskService) {
        this.localTaskService = localTaskService;
    }
}
The SessionWrapper instance is created here:
public class KnowledgeSessionFactory {

    @Autowired KnowledgeBase kbase;
    @Autowired org.jbpm.task.service.TaskService taskService;
    @Autowired EntityManagerFactory emf;
    @Autowired EventDrivenTaskHandler handler;
    @Autowired DroolsSpringTransactionManager txManager;
    @Autowired AuditService auditService;
    @Autowired AuditTaskHandler auditHandler;

    private static final Logger log = Logger.getLogger(KnowledgeSessionFactory.class);

    public SessionWrapper createKnowledgeSession() {
        log.info("Creating new StatefulKnowledgeSession");
        StatefulKnowledgeSession ksession = null;
        try {
            ksession = JPAKnowledgeService.newStatefulKnowledgeSession(kbase, null, getEnvironment());
        } catch (Exception e) {
            log.error("Unable to create session!");
            log.error(e);
            throw new RuntimeException(e);
        }
        log.info("Session ID: " + ksession.getId());
        return createSessionWrapper(ksession);
    }

    public SessionWrapper createKnowledgeSession(int sessionId) {
        log.info("Loading StatefulKnowledgeSession for ID: " + sessionId);
        StatefulKnowledgeSession ksession =
                JPAKnowledgeService.loadStatefulKnowledgeSession(sessionId, kbase, null, getEnvironment());
        return createSessionWrapper(ksession);
    }

    private SessionWrapper createSessionWrapper(StatefulKnowledgeSession ksession) {
        LocalTaskService taskClient = new LocalTaskService(taskService);
        GenericHTWorkItemHandler htHandler = new LocalHTWorkItemHandler(taskClient, ksession);
        ksession.getWorkItemManager().registerWorkItemHandler("Human Task", htHandler);
        ksession.getWorkItemManager().registerWorkItemHandler("Task", handler);
        ksession.getWorkItemManager().registerWorkItemHandler("Audit", auditHandler);
        ksession.addEventListener(new DroolsListener());
        // This allows access of the global in script tasks
        ksession.setGlobal("auditService", auditService);
        JPAWorkingMemoryDbLogger dbLogger = new JPAWorkingMemoryDbLogger(ksession);

        SessionWrapper wrapper = new SessionWrapper();
        wrapper.setSession(ksession);
        wrapper.setHtHandler(htHandler);
        wrapper.setDbLogger(dbLogger);
        wrapper.setLocalTaskService(taskClient);
        return wrapper;
    }

    private Environment getEnvironment() {
        log.info("creating environment");
        // Declared locally so that every call gets its own Environment instance.
        Environment env = KnowledgeBaseFactory.newEnvironment();
        env.set(EnvironmentName.ENTITY_MANAGER_FACTORY, emf);
        env.set(EnvironmentName.TRANSACTION_MANAGER, txManager);
        PersistenceContextManager persistenceContextManager = new DroolsSpringJpaManager(env);
        env.set(EnvironmentName.PERSISTENCE_CONTEXT_MANAGER, persistenceContextManager);
        log.info("environment created");
        return env;
    }
}
Here is the message driven bean that initiates starting new process instances:
public class StartProcessListener implements MessageListener {

    private static Logger log = Logger.getLogger(StartProcessListener.class);

    @Autowired EventService service;
    @Autowired KnowledgeSessionFactory factory;
    @Autowired SerializerMessageConverter converter;

    public void onMessage(Message message) {
        SessionWrapper wrapper = null;
        try {
            Map<String, Object> map = (Map<String, Object>) converter.fromMessage(message);
            Map<String, Object> params = new HashMap<String, Object>();
            params.put("returnStatus", "VALID");
            params.put("return", map.get("return"));
            String processId = (String) map.get("processId");
            try {
                // Each call is a separate @Transactional method on the service.
                wrapper = service.createSessionWrapper();
                service.startProcess(wrapper, processId, params);
            } catch (Exception e) {
                log.error(e);
            }
        } catch (Exception e) {
            log.error(e);
        } finally {
            // Dispose outside of the transaction boundaries.
            if (wrapper != null) {
                wrapper.dispose();
            }
        }
    }
}
And finally the service class that uses declarative transactions:
public class EventServiceImpl implements EventService {

    private static Logger log = Logger.getLogger(EventServiceImpl.class);

    @Autowired KnowledgeSessionFactory factory;
    @Autowired TaskService taskService;

    @Override
    @Transactional
    public SessionWrapper createSessionWrapper() {
        return factory.createKnowledgeSession();
    }

    @Transactional
    @Override
    public void startProcess(SessionWrapper wrapper, String processId, Map<String, Object> params) {
        // SessionWrapper wrapper = factory.createKnowledgeSession();
        StatefulKnowledgeSession ksession = wrapper.getSession();
        // initializes the human task handler for the session
        // org.jbpm.task.TaskService lts = LocalHumanTaskService.getTaskService(ksession);
        // this allows us to insert the processInstance into the session before starting it.
        ProcessInstance processInstance = ksession.createProcessInstance(processId, params);
        // for drools
        ksession.insert(processInstance);
        log.info("Starting process instance");
        ksession.startProcessInstance(processInstance.getId());
        //return wrapper;
    }

    @Transactional
    @Override
    public SessionWrapper completeWorkItem(int sessionId, long workItemId, Map<String, Object> results) {
        SessionWrapper wrapper = factory.createKnowledgeSession(sessionId);
        StatefulKnowledgeSession ksession = wrapper.getSession();
        completeWorkItem(ksession, workItemId, results);
        return wrapper;
    }

    @Override
    @Transactional
    public SessionWrapper completeTask(int sessionId, Long taskId, Map<String, Object> results) {
        log.info("Completing task: " + taskId);
        for (String key : results.keySet()) {
            log.info(key + "=" + results.get(key));
        }
        // LocalTaskService lts = new LocalTaskService(taskService);
        // StatefulKnowledgeSession ksession = factory.getKnowledgeSession(sessionId);
        SessionWrapper wrapper = factory.createKnowledgeSession(sessionId);
        StatefulKnowledgeSession ksession = wrapper.getSession();
        // org.jbpm.task.TaskService lts = LocalHumanTaskService.getTaskService(ksession);
        org.jbpm.task.TaskService lts = wrapper.getLocalTaskService();
        Task task = lts.getTask(taskId);
        // int sessionId = task.getTaskData().getProcessSessionId();
        // completeWorkItem(sessionId, task.getTaskData().getWorkItemId(), results);
        try {
            lts.start(taskId, "USER");
        } catch (Exception e) {
            log.error(e);
        }
        try {
            ContentData contentData = ContentMarshallerHelper.marshal(results, null);
            lts.complete(taskId, "USER", contentData);
            lts.disconnect(); // does this do anything useful?
        } catch (Exception e) {
            log.error(e);
        }
        // for drools
        ProcessInstance pi = ksession.getProcessInstance(task.getTaskData().getProcessInstanceId());
        ksession.insert(pi);
        completeWorkItem(ksession, task.getTaskData().getWorkItemId(), results);
        return wrapper;
    }

    private void completeWorkItem(StatefulKnowledgeSession ksession, Long workItemId, Map<String, Object> results) {
        log.info("Completing work item: " + workItemId);
        ksession.getWorkItemManager().completeWorkItem(workItemId, results);
    }
}
Spring configuration:
<?xml version="1.0" encoding="UTF-8"?>
<beans ... >

    <context:annotation-config />

    <!-- Loads all classes annotated with @Component or @Service -->
    <context:component-scan base-package="org.mitre.irs.eda.bpm"/>

    <context:property-placeholder location="classpath:eda.properties" order="2"/>

    <jee:jndi-lookup id="emf" jndi-name="persistence/emf"/>

    <!-- Spring JtaTransactionManager -->
    <bean id="jtaTxManager" class="org.springframework.transaction.jta.JtaTransactionManager" />

    <bean id="txManager"
          class="org.drools.container.spring.beans.persistence.DroolsSpringTransactionManager"
          c:ptm-ref="jtaTxManager" />

    <tx:annotation-driven transaction-manager="jtaTxManager" />

    <jpa:repositories base-package="org.mitre.irs.eda.common.repo"
                      entity-manager-factory-ref="emf"
                      transaction-manager-ref="jtaTxManager" />

    <bean id="jpaDialect" class="org.springframework.orm.jpa.vendor.HibernateJpaDialect" />

    <!-- JBPM -->
    <bean id="sel" class="org.drools.SystemEventListenerFactory" factory-method="getSystemEventListener" />

    <bean id="jbpmTaskService" class="org.jbpm.task.service.TaskService"
          c:emf-ref="emf" c:systemEventListener-ref="sel" />

    <bean id="kbaseFactory" class="org.mitre.irs.eda.bpm.jbpm.KnowledgeBaseFactory" />
    <bean id="kbase" factory-bean="kbaseFactory" factory-method="newKnowledgeBase" />

    <!-- Rabbit MQ -->
    <rabbit:connection-factory id="connectionFactory" channel-cache-size="20"
                               username="${amqp.username}" password="${amqp.password}"
                               host="${amqp.host}" port="${amqp.port}" />

    <rabbit:admin id="admin" connection-factory="connectionFactory" />

    <bean id="serializerConverter"
          class="org.springframework.amqp.support.converter.SerializerMessageConverter" />

    <rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
                     message-converter="serializerConverter"/>

    <rabbit:queue id="start" name="process.start" />
    <rabbit:queue id="task.start" name="process.task.start" />
    <rabbit:queue id="task.complete" name="process.task.complete" />

    <bean id="startListener" class="org.mitre.irs.eda.bpm.listener.StartProcessListener" />
    <bean id="taskCompleteListener" class="org.mitre.irs.eda.bpm.listener.TaskCompleteListener" />

    <rabbit:listener-container connection-factory="connectionFactory" concurrency="4">
        <rabbit:listener queues="start" ref="startListener" />
        <rabbit:listener queues="task.complete" ref="taskCompleteListener" />
    </rabbit:listener-container>

</beans>