1 2 Previous Next 19 Replies Latest reply on Sep 29, 2009 5:54 AM by chtimi2 Go to original post
      • 15. Re: Asynchronicity and transaction context propagation
        marklittle

        For more information about CheckedAction look in the ArjunaCore Programmer's Guide. If that doesn't answer your questions then let us know.

        • 16. Re: Asynchronicity and transaction context propagation

          Are you referring to the TrCoreProgrammersGuide pdf? I think so (was probably renamed) but just to be sure.

          I did find something of interest in it:

          "TrCoreProgrammersGuide.pdf page33 (Checking transactions)" wrote:

          When a thread attempts to terminate the transaction and there are active threads within it, the system will invoke the check method on the transaction’s CheckedAction object.
          The parameters to the check method are:
          - isCommit: indicates whether the transaction is in the process of committing or rolling back.
          - actUid: the transaction identifier.
          - list: a list of all of the threads currently marked as active within this transaction.
          When check returns, the transaction termination will continue. Obviously the state of the transaction at this point may be different from that when check was called, e.g., the transaction may subsequently have been committed.

          As i thought, the check method is passed a list of the active threads.
          If i'm not mistaken, i would just need to implement a BlockingCheckedAction, that calls join() on all those threads, and use it for the global transaction.

          To sum it up:
          1/ Launch the server with -Dcom.arjuna.ats.jts.checkedTransactions=YES to enable checked transactions
          2/ Implement BlockingCheckedAction that does:
          for (Thread t: list) { t.join() }

          3/ Just after the global transaction has started i would need to do
          Current current = OTSManager.get_current().
          current.setCheckedAction ( new BlockingCheckedAction() );



          I thought of a simple unit test to verify the correctness of the transactional behaviour.
          Do you think this would work? I prefer asking before i start spending time integrating another TransactionManager in Spring.

          1/ UnitTest (UT) calls transactional service void updateXY ( int X , int Y , boolean failOnUpdatingY ) synchronously
          2/ The global transaction is initiated and updateXY updates X
          3/ updateXY then calls transactional service updateY ( int Y , boolean failOnUpdatingY ) asynchronously
          4/ updateY executes within the same global transaction; if (failOnUpdatingY) a RuntimeException is thrown, which must rollback the global transaction
          5/ UT sleeps for a second, by then the transaction should have been committed or rolled back
          6/ UT calls getX and getY
          7/ Verifications:
          Case ! failOnUpdatingY: X and Y should have their new values because the global transaction has been committed
          Case failOnUpdatingY: X and Y should have their old values because the global transaction has been rolled back



          • 17. Re: Asynchronicity and transaction context propagation
            marklittle

             

            "chtimi2" wrote:
            Are you referring to the TrCoreProgrammersGuide pdf? I think so (was probably renamed) but just to be sure.


            Yes.


            As i thought, the check method is passed a list of the active threads.
            If i'm not mistaken, i would just need to implement a BlockingCheckedAction, that calls join() on all those threads, and use it for the global transaction.


            That should work.


            To sum it up:
            1/ Launch the server with -Dcom.arjuna.ats.jts.checkedTransactions=YES to enable checked transactions


            Last time I checked it was on by default.


            2/ Implement BlockingCheckedAction that does:
            for (Thread t: list) { t.join() }

            3/ Just after the global transaction has started i would need to do
            Current current = OTSManager.get_current().
            current.setCheckedAction ( new BlockingCheckedAction() );



            I thought of a simple unit test to verify the correctness of the transactional behaviour.
            Do you think this would work? I prefer asking before i start spending time integrating another TransactionManager in Spring.

            1/ UnitTest (UT) calls transactional service void updateXY ( int X , int Y , boolean failOnUpdatingY ) synchronously
            2/ The global transaction is initiated and updateXY updates X
            3/ updateXY then calls transactional service updateY ( int Y , boolean failOnUpdatingY ) asynchronously
            4/ updateY executes within the same global transaction; if (failOnUpdatingY) a RuntimeException is thrown, which must rollback the global transaction
            5/ UT sleeps for a second, by then the transaction should have been committed or rolled back
            6/ UT calls getX and getY
            7/ Verifications:
            Case ! failOnUpdatingY: X and Y should have their new values because the global transaction has been committed
            Case failOnUpdatingY: X and Y should have their old values because the global transaction has been rolled back


            Assuming you're not looking at distributed invocations here then that looks like it should work (but I haven't put a lot of time into thinking about it.)

            • 18. Re: Asynchronicity and transaction context propagation

              Allright, looks like it is worth trying then. I'll let you know how it goes.

              • 19. Re: Asynchronicity and transaction context propagation

                Alright. The first step is to write a unit test that asserts checked transaction semantics.
                1/ atomicitySync is the standard synchronous case, and passes
                2/ atomicityAsync is the asynchronous case, and fails

                I use JBoss Cache (core edition) as the transactional resource.

                Now the goal is to make 2/ pass in steps:
                -same test, same results, but using JBossTS (verify correct JBTS integration)
                -set up JBossTS as explained in the previous post to make it pass

                But first do you agree with my test?


                Here is the test:

                @RunWith(SpringJUnit4ClassRunner.class)
                @ContextConfiguration ( locations={"/application-context_Jdbc_Atomikos.xml"} )
                public class ComportementTransactionnelAsync implements ApplicationContextAware
                {
                 private Cache cache;
                 @Resource ( name="tracksTable" )
                 protected TracksTable table;
                 protected static ApplicationContext applicationContext;
                
                 @Test
                 //@Ignore
                 public void atomicitySync () throws Exception
                 {
                 atomicity ( false );
                 }
                
                 @Test
                 public void atomicityAsync () throws Exception
                 {
                 atomicity ( true );
                 }
                
                 private void atomicity ( boolean async ) throws Exception
                 {
                 final String A_OK="A", B_OK="B", A_KO="C" , B_KO="D";
                 updateAB ( A_OK , B_OK , async , false );
                 assertEquals ( A_OK , table.getA() );
                 assertEquals ( B_OK , table.getB() );
                
                 try
                 {
                 updateAB ( A_KO , B_KO , async , true );
                 fail ();
                 }
                 catch ( Exception e ) {}
                 finally
                 {
                 assertEquals ( B_OK , table.getB() );
                 assertEquals ( A_OK , table.getA() );
                 }
                 }
                
                 private void updateAB(String a, String b, boolean async, boolean failOnB) throws InterruptedException
                 {
                 table.updateAB ( a , b , async , failOnB );
                 if ( async )
                 {
                 /* Commented since getA can be blocked by updateAB since IsolationLevel=Serializable
                 * assertEquals ( "" , table.getA() ); //Transaction should not be committed yet
                 assertEquals ( "" , table.getB() ); //Transaction should not be committed yet*/
                
                 Thread.sleep(1500);
                 }
                 }
                
                 @Before
                 public void before ()
                 {
                 cache = createCoreCache ();
                 cache.start();
                 table.setCache ( cache );
                
                 table.resetAB ();
                 assertEquals ( "" , table.getA() );
                 assertEquals ( "" , table.getB() );
                 }
                
                 @After
                 public void stop ()
                 {
                 table.unsetCache ( cache );
                 cache.stop();
                 }
                
                 private Cache createCoreCache()
                 {
                 CacheFactory factory = new DefaultCacheFactory();
                 Cache cache = factory.createCache("resources/META-INF/replSync-service.xml", false);
                
                 cache.getInvocationContext().getOptionOverrides().setForceSynchronous(true);
                
                 cache.create();
                 return cache;
                 }
                
                 @Override
                 public void setApplicationContext(ApplicationContext applicationContext) throws BeansException
                 {
                 TransactionManager tm = (TransactionManager) applicationContext.getBean ( "atomikosTransactionManager" );
                 AtomikosTransactionManagerLookup.setAtomikosTransactionManager ( tm );
                 }
                }


                And here is the service implementation:
                @Service("tracksTable")
                @Transactional (propagation=Propagation.REQUIRED, isolation=Isolation.SERIALIZABLE, readOnly=false, timeout=10000)
                public class TracksTableImpl implements TracksTable
                {
                 //--------------JBC-----------------
                
                 private Cache coreCache;
                
                 @Override
                 public void updateAB ( String a , String b , boolean async , boolean failOnUpdateB )
                 {
                 setA ( a );
                 Thread t = new Thread ( new SleepAndSetB ( b , failOnUpdateB ) );
                 if ( async ) { t.start(); } else t.run();
                 }
                
                 private class SleepAndSetB implements Runnable
                 {
                 private final String b;
                 private final boolean failOnUpdateB;
                 SleepAndSetB ( String b, boolean failOnUpdateB ) { this.b = b; this.failOnUpdateB = failOnUpdateB; }
                 @Override public void run()
                 {
                 ObjectUtils.sleep(500);
                 if ( failOnUpdateB ) throw new TrackException ();
                 setB ( b );
                 }
                 }
                
                 @Override
                 public String getA()
                 {
                 Node rootNode = coreCache.getRoot();
                 return (String)rootNode.get ( "a" );
                 }
                
                 @Override
                 public String getB()
                 {
                 Node rootNode = coreCache.getRoot();
                 return (String)rootNode.get ( "b" );
                 }
                
                 private void setA ( String a )
                 {
                 System.out.println ( "setA [ a=" + a + " ]" );
                 Node rootNode = coreCache.getRoot();
                 rootNode.put ( "a" , a );
                 }
                
                 private void setB ( String b )
                 {
                 System.out.println ( "setB [ b=" + b + " ]" );
                 Node rootNode = coreCache.getRoot();
                 rootNode.put ( "b" , b );
                 }
                
                 @Override
                 public void resetAB()
                 {
                 setA ( "" );
                 setB ( "" );
                 }
                
                 @Override
                 public void setCache ( Cache cache )
                 {
                 coreCache = cache;
                 }
                
                 @Override
                 public void unsetCache ( Cache cache )
                 {
                 coreCache = null;
                 }
                }
                


                JBossCache conf (replSync-service.xml):
                <?xml version="1.0" encoding="UTF-8"?>
                <server>
                 <mbean code="org.jboss.cache.jmx.CacheJmxWrapper" name="jboss.cache:service=TreeCache">
                
                 <attribute name="TransactionManagerLookupClass">hellotrackworld.impl.srv.AtomikosTransactionManagerLookup</attribute>
                 <attribute name="NodeLockingScheme">PESSIMISTIC</attribute>
                 <attribute name="IsolationLevel">SERIALIZABLE</attribute>
                 <attribute name="CacheMode">LOCAL</attribute>
                 <attribute name="UseReplQueue">false</attribute>
                 <attribute name="ReplQueueInterval">0</attribute>
                 <attribute name="ReplQueueMaxElements">0</attribute>
                 <attribute name="ClusterName">JBossCache-Cluster</attribute>
                
                 <attribute name="ClusterConfig">
                 <config>
                 <UDP mcast_addr="228.10.10.10"
                 mcast_port="45588"
                 tos="8"
                 ucast_recv_buf_size="20000000"
                 ucast_send_buf_size="640000"
                 mcast_recv_buf_size="25000000"
                 mcast_send_buf_size="640000"
                 loopback="false"
                 discard_incompatible_packets="true"
                 max_bundle_size="64000"
                 max_bundle_timeout="30"
                 use_incoming_packet_handler="true"
                 ip_ttl="2"
                 enable_bundling="false"
                 enable_diagnostics="true"
                
                 use_concurrent_stack="true"
                
                 thread_naming_pattern="pl"
                
                 thread_pool.enabled="true"
                 thread_pool.min_threads="1"
                 thread_pool.max_threads="25"
                 thread_pool.keep_alive_time="30000"
                 thread_pool.queue_enabled="true"
                 thread_pool.queue_max_size="10"
                 thread_pool.rejection_policy="Run"
                
                 oob_thread_pool.enabled="true"
                 oob_thread_pool.min_threads="1"
                 oob_thread_pool.max_threads="4"
                 oob_thread_pool.keep_alive_time="10000"
                 oob_thread_pool.queue_enabled="true"
                 oob_thread_pool.queue_max_size="10"
                 oob_thread_pool.rejection_policy="Run"/>
                
                 <PING timeout="2000" num_initial_members="3"/>
                 <MERGE2 max_interval="30000" min_interval="10000"/>
                 <FD_SOCK/>
                 <FD timeout="10000" max_tries="5" shun="true"/>
                 <VERIFY_SUSPECT timeout="1500"/>
                 <pbcast.NAKACK max_xmit_size="60000"
                 use_mcast_xmit="false" gc_lag="0"
                 retransmit_timeout="300,600,1200,2400,4800"
                 discard_delivered_msgs="true"/>
                 <UNICAST timeout="300,600,1200,2400,3600"/>
                 <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
                 max_bytes="400000"/>
                 <pbcast.GMS print_local_addr="true" join_timeout="5000"
                 join_retry_timeout="2000" shun="false"
                 view_bundling="true" view_ack_collection_timeout="5000"/>
                 <FRAG2 frag_size="60000"/>
                 <pbcast.STREAMING_STATE_TRANSFER use_reading_thread="true"/>
                 <!-- <pbcast.STATE_TRANSFER/> -->
                 <pbcast.FLUSH timeout="0"/>
                 </config>
                 </attribute>
                
                
                 <attribute name="FetchInMemoryState">true</attribute>
                 <attribute name="StateRetrievalTimeout">15000</attribute>
                 <attribute name="SyncReplTimeout">15000</attribute>
                 <attribute name="LockAcquisitionTimeout">10000</attribute>
                 <attribute name="UseRegionBasedMarshalling">true</attribute>
                 </mbean>
                </server>
                


                Spring conf (application-context.xml):
                <?xml version="1.0" encoding="UTF-8"?>
                <beans xmlns="http://www.springframework.org/schema/beans"
                 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                 xmlns:context="http://www.springframework.org/schema/context"
                 xmlns:tx="http://www.springframework.org/schema/tx"
                 xmlns:aop="http://www.springframework.org/schema/aop"
                 xsi:schemaLocation="
                http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
                http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd
                http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-2.5.xsd
                http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-2.5.xsd">
                
                 <!--CONF GENERALE vvvvv-->
                 <context:component-scan base-package="hellotrackworld"/>
                 <context:annotation-config/>
                 <!--CONF GENERALE ^^^^^-->
                
                
                 <!--PARAMETRAGE JTA vvvvv-->
                 <bean id="atomikosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager" init-method="init" destroy-method="close">
                 <property name="forceShutdown" value="true"/>
                 </bean>
                 <bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp">
                 <property name="transactionTimeout" value="300"/>
                 </bean>
                
                 <bean id="jtaTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">
                 <property name="transactionManager" ref="atomikosTransactionManager" />
                 <property name="userTransaction" ref="atomikosUserTransaction" />
                 <property name="allowCustomIsolationLevels" value="true" />
                 </bean>
                
                 <tx:annotation-driven transaction-manager="jtaTransactionManager"/>
                 <!--PARAMETRAGE JTA ^^^^^-->
                
                </beans>
                
                



                1 2 Previous Next