12 Replies Latest reply on Oct 5, 2007 10:00 AM by motte1979

    seam and batch/queue-processing

    motte1979

      hello everybody !

      Is there an easy way in seam to implement batchprocessing ?

      On jboss-startup there should be a thread created which performes tasks
      in an iterative and synchron way.
      As i know @IntervalDuration and @Asynchronous does not wait for the previously started Thread - therefore it's not useful here.

      Thanks in advise for your input !
      Matthias

        • 1. Re: seam and batch/queue-processing
          motte1979

          Is there a way to ask seam, whether is is started up ?

          I tried to create an MBean which spawn's a thread doing the recurring job.
          But MBeans are initialized before Seam is up in JBoss ...

          Any ideas ?

          • 2. Re: seam and batch/queue-processing
            pmuir

            I don't understand why @Asynchronous doesn't work for you?

            • 3. Re: seam and batch/queue-processing
              motte1979

              If i start an Asynchronous method with an IntervalDuration assigned, the interval will be timed from the last 'start' of the method not from the the last execution-end.

              This causes the Tasks to be launched in parallel if the previous task was not finished because it needs longer than the interval is set.
              In fact i need a sequential execution of tasks in an asynchron fashion (a thread which runs from application-deploy to undeploy)

              • 4. Re: seam and batch/queue-processing
                pmuir

                So, if we fix the bug with not being able to schedule an async inside an async call, then you can do that right?

                • 5. Re: seam and batch/queue-processing
                  motte1979

                  No, i don't think so ...

                  I need something like the following
                  PseudoCode:

                  @Stateful
                  @Startup
                  @Name("batch")
                  @Scope (ScopeType.APPLICATION)
                  @TransactionManagement (TransactionManagementType.BEAN)
                  public class BatchBean implements Batch {
                  
                   private AtomicBoolean atomicBoolean = new AtomicBoolean();
                  
                  // Stateless bean
                   @In IncomingMessages incomingMessages;
                  // Stateless bean
                   @In Task task;
                   @Resource SessionContext sessionContext;
                  
                   @Asynchronous
                   @Create
                   public void parent () {
                  
                   atomicBoolean.set(Boolean.TRUE);
                   task.init (incomingMessages);
                  
                   while (atomicBoolean.get()) {
                  
                   try {
                   Thread.sleep(100);
                   } catch (InterruptedException ie ) {
                   // nothing
                   }
                  
                  
                   while (atomicBoolean.get()) {
                   UserTransaction tx = sessionContext.getUserTransaction();
                   try {
                   tx.begin();
                   Object o = incomingMessages.next();
                   if (o != null) {
                   // synchonous call
                   task.doTask(o);
                   tx.commit();
                   } else {
                   tx.commit();
                   break;
                   }
                   } catch (Exception e) {
                   tx.rollback();
                   }
                   }
                   }
                   }
                  
                   @Shutdown
                   public void destroy () {
                   task.finishLastMessageAndClose();
                   atomicBoolean.set(Boolean.FALSE);
                   }
                  }
                  



                  @Shutdown is called before @Destroy and @Remove on undeployment and server-shutdown

                  This approach is not possible because an applicationbean cannot be invoked by two different threads at the same time ... (the parent- and the destroy-method)
                  Second there is no @Shutdown which waites for the asynchronous 'parent'-method to finish after invokation of the @Shutdown - method ...

                  As asked in another thread i had also problems to get BEAN-managed transactions work with seam-components .. For the most components i need CONTAINER-managed Transactions, therefore i've installed
                  <transaction:ejb-transaction/>
                  in components.xml

                  The MBean - approach above (Fri Sep 21, 2007 10:41 AM) creates a simple Thread, initialize it with the beans and have it run, but this has the sideeffect, that the method-invokations incomingMessages.next() and task.doTask() are running both in there own transactions ... that could cause the loss of messages ... ). As described before there is also a problem to synchronize the MBean with the Component-creation (Task, IncomingMessages) because the MBean is earlier up than seam. And second the MBean lives on shutdown and on undeploy longer than seam ..

                  Maybe i'm mixing some concepts which should better stand alone ..
                  I've not much experience with EJB3, Jboss, etc.
                  ;)




                  • 6. Re: seam and batch/queue-processing
                    motte1979

                    the @Asynchronous inside the @Asynchronous - topic is needed for another task in my application ...

                    • 7. Re: seam and batch/queue-processing
                      motte1979

                      Have you got the idea what i'm doing ? Is jBPM an option for doing this ?

                      • 8. Re: seam and batch/queue-processing
                        pmuir

                        My suggestion was to use an @Asynchronous method which schedules another @Asynchronous method when it completes.

                        No, don't know what you are trying to do as you've never explained that.

                        • 9. Re: seam and batch/queue-processing
                          motte1979

                          I tried to explain it by posting pseudo-code ...

                          I like to create a simple service which runs in it's own thread from startup of seam until shutdown. The service itselve performs synchron tasks ..

                          Hmm. How could i explain it better ?

                          • 10. Re: seam and batch/queue-processing
                            pmuir

                            No, you're still explaining how you want to solve your problem, not what your problem is.

                            Your solution is definitely an "anti-pattern" when used with Seam.

                            • 11. Re: seam and batch/queue-processing
                              kenglover

                              Maybe this will help...

                              I needed to schedule a regular recurring job from startup to shutdown. This job polls out LDAP servers and keeps a list of the ones that are up.

                              There are 2 components, a controller, and the asynchronous job. The controller runs at startup and schedules the job, keeping a timer handle in the application scope.

                              The async job compares the timer handle from the controller and terminates if it does not work (to ensure we only have one job running). Note that a crash or kill of the JBoss server may leave an extra scheduled job, which is why the official one is checked for.

                              Here is the controller:

                              @Name("LDAPWatcherController")
                              @Startup
                              @Scope(ScopeType.APPLICATION)
                              public class LDAPWatcherController {
                              
                               @In(create=true)
                               private LDAPWatcher LDAPWatcher;
                              
                               @Logger
                               private Log log;
                              
                               private TimerHandle timerHandle;
                              
                               @Create
                               public void startup() throws Exception
                               {
                               log.debug("Starting checkLDAP timer");
                               Timer timer = LDAPWatcher.checkLDAP( new Long(60000));
                               timerHandle = timer.getHandle();
                               }
                              
                               @Destroy
                               public void shutdown()
                               {
                               log.debug("Shutting down timed event " + timerHandle.toString());
                               timerHandle.getTimer().cancel();
                               }
                              
                               public Timer getTimerHandle()
                               {
                               return timerHandle.getTimer();
                               }
                              
                              }
                              


                              Here is the LDAP Watcher (async job). The LDAPWatcher that is implemented is a simple interface class that defines the public method with an @Asynchronous above it.
                              @Stateless
                              @Name("LDAPWatcher")
                              public class LDAPWatcherImpl implements LDAPWatcher {
                               @Logger
                               private Log log;
                              
                               @In
                               private Timer timer;
                              
                               public Timer checkLDAP(Long interval) {
                               log.info("Checking LDAPs " + this.toString());
                              
                               // Make sure this is the "official" job and that we do not have too many of these running.
                               LDAPWatcherController appStart = (LDAPWatcherController) Contexts.getApplicationContext().get("LDAPWatcherController");
                               log.debug("My Info :" + this.timer.getInfo().toString());
                               log.debug("App Info:" + appStart.getTimerHandle().getInfo().toString());
                               if (this.timer.getInfo() != appStart.getTimerHandle().getInfo())
                               {
                               log.info("I am not the timer that should be running. Stopping myself...");
                               this.timer.cancel();
                               return null;
                               }
                              
                               // Now on with the work
                               [... snip ...]
                              
                               return null;
                               }
                              
                              }
                              




                              • 12. Re: seam and batch/queue-processing
                                motte1979

                                @kenglover: Thanks, but i'm using Quartz not EJB-Timers. I'm sorry, should have told it.

                                @pete.muir: I tried the approach by scheduling an asynchronous task at startup, which performs some operations and gives the control for the next operation to a new asynchronous task. This works fine for some time, but it always ends up in a java.lang.StackOverflowError.

                                Caused by: java.lang.StackOverflowError
                                 at java.lang.ClassLoader.findBootstrapClass(Native Method)
                                 at java.lang.ClassLoader.findBootstrapClass0(ClassLoader.java:891)
                                 at java.lang.ClassLoader.loadClass(ClassLoader.java:301)
                                 at java.lang.ClassLoader.loadClass(ClassLoader.java:299)
                                 at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:268)
                                 at java.lang.ClassLoader.loadClass(ClassLoader.java:299)
                                 at java.lang.ClassLoader.loadClass(ClassLoader.java:251)
                                 at java.lang.ClassLoader.loadClassInternal(ClassLoader.java:319)
                                 at org.apache.log4j.spi.ThrowableInformation.getThrowableStrRep(ThrowableInformation.java:58)
                                 at org.apache.log4j.spi.LoggingEvent.getThrowableStrRep(LoggingEvent.java:342)
                                 at org.apache.log4j.WriterAppender.subAppend(WriterAppender.java:304)
                                 at org.apache.log4j.RollingFileAppender.subAppend(RollingFileAppender.java:236)
                                 at org.apache.log4j.WriterAppender.append(WriterAppender.java:159)
                                 at org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:230)
                                 at org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:65)
                                 at org.apache.log4j.Category.callAppenders(Category.java:203)
                                 at org.apache.log4j.Category.forcedLog(Category.java:388)
                                 at org.apache.log4j.Category.log(Category.java:853)
                                 at org.jboss.seam.log.Log4JProvider.error(Log4JProvider.java:57)
                                 at org.jboss.seam.log.LogImpl.error(LogImpl.java:121)
                                


                                The new instance is created by calling.

                                (Task) Component.getInstance(Task.NAME, ScopeType.STATELESS, true);
                                


                                I'm a little frustrated about this ..

                                I'd like to poll the database for new messages (they are stored in a table) and perform some tasks/checks with it to get it into another sink (another table).
                                It's a system where n foreign processes send messages via http/POST. They get serialized into the first table and are processed later in a synchron recurring way by the Task. (because reordering has to be done, etc.)