12 Replies Latest reply: Oct 5, 2007 10:00 AM by Matthias Huber RSS

seam and batch/queue-processing

Matthias Huber Newbie

hello everybody !

Is there an easy way in seam to implement batchprocessing ?

On jboss-startup there should be a thread created which performes tasks
in an iterative and synchron way.
As i know @IntervalDuration and @Asynchronous does not wait for the previously started Thread - therefore it's not useful here.

Thanks in advise for your input !
Matthias

  • 1. Re: seam and batch/queue-processing
    Matthias Huber Newbie

    Is there a way to ask seam, whether is is started up ?

    I tried to create an MBean which spawn's a thread doing the recurring job.
    But MBeans are initialized before Seam is up in JBoss ...

    Any ideas ?

  • 2. Re: seam and batch/queue-processing
    Pete Muir Master

    I don't understand why @Asynchronous doesn't work for you?

  • 3. Re: seam and batch/queue-processing
    Matthias Huber Newbie

    If i start an Asynchronous method with an IntervalDuration assigned, the interval will be timed from the last 'start' of the method not from the the last execution-end.

    This causes the Tasks to be launched in parallel if the previous task was not finished because it needs longer than the interval is set.
    In fact i need a sequential execution of tasks in an asynchron fashion (a thread which runs from application-deploy to undeploy)

  • 4. Re: seam and batch/queue-processing
    Pete Muir Master

    So, if we fix the bug with not being able to schedule an async inside an async call, then you can do that right?

  • 5. Re: seam and batch/queue-processing
    Matthias Huber Newbie

    No, i don't think so ...

    I need something like the following
    PseudoCode:

    @Stateful
    @Startup
    @Name("batch")
    @Scope (ScopeType.APPLICATION)
    @TransactionManagement (TransactionManagementType.BEAN)
    public class BatchBean implements Batch {
    
     private AtomicBoolean atomicBoolean = new AtomicBoolean();
    
    // Stateless bean
     @In IncomingMessages incomingMessages;
    // Stateless bean
     @In Task task;
     @Resource SessionContext sessionContext;
    
     @Asynchronous
     @Create
     public void parent () {
    
     atomicBoolean.set(Boolean.TRUE);
     task.init (incomingMessages);
    
     while (atomicBoolean.get()) {
    
     try {
     Thread.sleep(100);
     } catch (InterruptedException ie ) {
     // nothing
     }
    
    
     while (atomicBoolean.get()) {
     UserTransaction tx = sessionContext.getUserTransaction();
     try {
     tx.begin();
     Object o = incomingMessages.next();
     if (o != null) {
     // synchonous call
     task.doTask(o);
     tx.commit();
     } else {
     tx.commit();
     break;
     }
     } catch (Exception e) {
     tx.rollback();
     }
     }
     }
     }
    
     @Shutdown
     public void destroy () {
     task.finishLastMessageAndClose();
     atomicBoolean.set(Boolean.FALSE);
     }
    }
    



    @Shutdown is called before @Destroy and @Remove on undeployment and server-shutdown

    This approach is not possible because an applicationbean cannot be invoked by two different threads at the same time ... (the parent- and the destroy-method)
    Second there is no @Shutdown which waites for the asynchronous 'parent'-method to finish after invokation of the @Shutdown - method ...

    As asked in another thread i had also problems to get BEAN-managed transactions work with seam-components .. For the most components i need CONTAINER-managed Transactions, therefore i've installed
    <transaction:ejb-transaction/>
    in components.xml

    The MBean - approach above (Fri Sep 21, 2007 10:41 AM) creates a simple Thread, initialize it with the beans and have it run, but this has the sideeffect, that the method-invokations incomingMessages.next() and task.doTask() are running both in there own transactions ... that could cause the loss of messages ... ). As described before there is also a problem to synchronize the MBean with the Component-creation (Task, IncomingMessages) because the MBean is earlier up than seam. And second the MBean lives on shutdown and on undeploy longer than seam ..

    Maybe i'm mixing some concepts which should better stand alone ..
    I've not much experience with EJB3, Jboss, etc.
    ;)




  • 6. Re: seam and batch/queue-processing
    Matthias Huber Newbie

    the @Asynchronous inside the @Asynchronous - topic is needed for another task in my application ...

  • 7. Re: seam and batch/queue-processing
    Matthias Huber Newbie

    Have you got the idea what i'm doing ? Is jBPM an option for doing this ?

  • 8. Re: seam and batch/queue-processing
    Pete Muir Master

    My suggestion was to use an @Asynchronous method which schedules another @Asynchronous method when it completes.

    No, don't know what you are trying to do as you've never explained that.

  • 9. Re: seam and batch/queue-processing
    Matthias Huber Newbie

    I tried to explain it by posting pseudo-code ...

    I like to create a simple service which runs in it's own thread from startup of seam until shutdown. The service itselve performs synchron tasks ..

    Hmm. How could i explain it better ?

  • 10. Re: seam and batch/queue-processing
    Pete Muir Master

    No, you're still explaining how you want to solve your problem, not what your problem is.

    Your solution is definitely an "anti-pattern" when used with Seam.

  • 11. Re: seam and batch/queue-processing
    Ken Glover Newbie

    Maybe this will help...

    I needed to schedule a regular recurring job from startup to shutdown. This job polls out LDAP servers and keeps a list of the ones that are up.

    There are 2 components, a controller, and the asynchronous job. The controller runs at startup and schedules the job, keeping a timer handle in the application scope.

    The async job compares the timer handle from the controller and terminates if it does not work (to ensure we only have one job running). Note that a crash or kill of the JBoss server may leave an extra scheduled job, which is why the official one is checked for.

    Here is the controller:

    @Name("LDAPWatcherController")
    @Startup
    @Scope(ScopeType.APPLICATION)
    public class LDAPWatcherController {
    
     @In(create=true)
     private LDAPWatcher LDAPWatcher;
    
     @Logger
     private Log log;
    
     private TimerHandle timerHandle;
    
     @Create
     public void startup() throws Exception
     {
     log.debug("Starting checkLDAP timer");
     Timer timer = LDAPWatcher.checkLDAP( new Long(60000));
     timerHandle = timer.getHandle();
     }
    
     @Destroy
     public void shutdown()
     {
     log.debug("Shutting down timed event " + timerHandle.toString());
     timerHandle.getTimer().cancel();
     }
    
     public Timer getTimerHandle()
     {
     return timerHandle.getTimer();
     }
    
    }
    


    Here is the LDAP Watcher (async job). The LDAPWatcher that is implemented is a simple interface class that defines the public method with an @Asynchronous above it.
    @Stateless
    @Name("LDAPWatcher")
    public class LDAPWatcherImpl implements LDAPWatcher {
     @Logger
     private Log log;
    
     @In
     private Timer timer;
    
     public Timer checkLDAP(Long interval) {
     log.info("Checking LDAPs " + this.toString());
    
     // Make sure this is the "official" job and that we do not have too many of these running.
     LDAPWatcherController appStart = (LDAPWatcherController) Contexts.getApplicationContext().get("LDAPWatcherController");
     log.debug("My Info :" + this.timer.getInfo().toString());
     log.debug("App Info:" + appStart.getTimerHandle().getInfo().toString());
     if (this.timer.getInfo() != appStart.getTimerHandle().getInfo())
     {
     log.info("I am not the timer that should be running. Stopping myself...");
     this.timer.cancel();
     return null;
     }
    
     // Now on with the work
     [... snip ...]
    
     return null;
     }
    
    }
    




  • 12. Re: seam and batch/queue-processing
    Matthias Huber Newbie

    @kenglover: Thanks, but i'm using Quartz not EJB-Timers. I'm sorry, should have told it.

    @pete.muir: I tried the approach by scheduling an asynchronous task at startup, which performs some operations and gives the control for the next operation to a new asynchronous task. This works fine for some time, but it always ends up in a java.lang.StackOverflowError.

    Caused by: java.lang.StackOverflowError
     at java.lang.ClassLoader.findBootstrapClass(Native Method)
     at java.lang.ClassLoader.findBootstrapClass0(ClassLoader.java:891)
     at java.lang.ClassLoader.loadClass(ClassLoader.java:301)
     at java.lang.ClassLoader.loadClass(ClassLoader.java:299)
     at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:268)
     at java.lang.ClassLoader.loadClass(ClassLoader.java:299)
     at java.lang.ClassLoader.loadClass(ClassLoader.java:251)
     at java.lang.ClassLoader.loadClassInternal(ClassLoader.java:319)
     at org.apache.log4j.spi.ThrowableInformation.getThrowableStrRep(ThrowableInformation.java:58)
     at org.apache.log4j.spi.LoggingEvent.getThrowableStrRep(LoggingEvent.java:342)
     at org.apache.log4j.WriterAppender.subAppend(WriterAppender.java:304)
     at org.apache.log4j.RollingFileAppender.subAppend(RollingFileAppender.java:236)
     at org.apache.log4j.WriterAppender.append(WriterAppender.java:159)
     at org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:230)
     at org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:65)
     at org.apache.log4j.Category.callAppenders(Category.java:203)
     at org.apache.log4j.Category.forcedLog(Category.java:388)
     at org.apache.log4j.Category.log(Category.java:853)
     at org.jboss.seam.log.Log4JProvider.error(Log4JProvider.java:57)
     at org.jboss.seam.log.LogImpl.error(LogImpl.java:121)
    


    The new instance is created by calling.

    (Task) Component.getInstance(Task.NAME, ScopeType.STATELESS, true);
    


    I'm a little frustrated about this ..

    I'd like to poll the database for new messages (they are stored in a table) and perform some tasks/checks with it to get it into another sink (another table).
    It's a system where n foreign processes send messages via http/POST. They get serialized into the first table and are processed later in a synchron recurring way by the Task. (because reordering has to be done, etc.)