-
15. Re: Transactions: atomicity OK, isolation KO
manik Sep 29, 2009 8:58 AM (in response to chtimi2) I'm going to focus on RR and the test hanging, since this is incorrect behaviour. Do you have a thread dump of this when it hangs?
-
16. Re: Transactions: atomicity OK, isolation KO
chtimi2 Sep 29, 2009 12:28 PM (in response to chtimi2) jconsole's deadlock detection (not that it is 100% reliable...)
findDeadlockedThreads: null
findMonitorDeadlockedThreads: null
jstack output (the thread dump):D:\>jstack 4088 2009-09-29 18:24:16 Full thread dump Java HotSpot(TM) Client VM (14.0-b16 mixed mode): "RMI TCP Connection(5)-10.46.35.83" daemon prio=6 tid=0x0afb8400 nid=0xfa4 runna ble [0x0c62f000] java.lang.Thread.State: RUNNABLE at java.net.SocketInputStream.socketRead0(Native Method) at java.net.SocketInputStream.read(SocketInputStream.java:129) at java.io.BufferedInputStream.fill(BufferedInputStream.java:218) at java.io.BufferedInputStream.read(BufferedInputStream.java:237) - locked <0x031ae1c0> (a java.io.BufferedInputStream) at java.io.FilterInputStream.read(FilterInputStream.java:66) at sun.rmi.transport.tcp.TCPTransport.handleMessages(TCPTransport.java:5 17) at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run0(TCPTranspor t.java:790) at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run(TCPTransport .java:649) at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExec utor.java:886) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor .java:908) at java.lang.Thread.run(Thread.java:619) "RMI TCP Connection(4)-10.46.35.83" daemon prio=6 tid=0x0b349400 nid=0x868 runna ble [0x0c5df000] java.lang.Thread.State: RUNNABLE at java.net.SocketInputStream.socketRead0(Native Method) at java.net.SocketInputStream.read(SocketInputStream.java:129) at java.io.BufferedInputStream.fill(BufferedInputStream.java:218) at java.io.BufferedInputStream.read(BufferedInputStream.java:237) - locked <0x031ae3e0> (a java.io.BufferedInputStream) at java.io.FilterInputStream.read(FilterInputStream.java:66) at sun.rmi.transport.tcp.TCPTransport.handleMessages(TCPTransport.java:5 17) at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run0(TCPTranspor t.java:790) at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run(TCPTransport .java:649) at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExec utor.java:886) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor 
.java:908) at java.lang.Thread.run(Thread.java:619) "JMX server connection timeout 33" daemon prio=6 tid=0x0b4b8400 nid=0xab0 in Obj ect.wait() [0x0c4df000] java.lang.Thread.State: TIMED_WAITING (on object monitor) at java.lang.Object.wait(Native Method) - waiting on <0x0317c3d0> (a [I) at com.sun.jmx.remote.internal.ServerCommunicatorAdmin$Timeout.run(Serve rCommunicatorAdmin.java:150) - locked <0x0317c3d0> (a [I) at java.lang.Thread.run(Thread.java:619) "RMI Scheduler(0)" daemon prio=6 tid=0x0b271400 nid=0xe68 waiting on condition [ 0x0c48f000] java.lang.Thread.State: TIMED_WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x03160d10> (a java.util.concurrent.locks.Abstra ctQueuedSynchronizer$ConditionObject) at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:198 ) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject .awaitNanos(AbstractQueuedSynchronizer.java:1963) at java.util.concurrent.DelayQueue.take(DelayQueue.java:164) at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.tak e(ScheduledThreadPoolExecutor.java:583) at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.tak e(ScheduledThreadPoolExecutor.java:576) at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.ja va:947) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor .java:907) at java.lang.Thread.run(Thread.java:619) "RMI TCP Connection(1)-10.46.35.83" daemon prio=6 tid=0x0ab03400 nid=0xa28 in Ob ject.wait() [0x0c43e000] java.lang.Thread.State: TIMED_WAITING (on object monitor) at java.lang.Object.wait(Native Method) - waiting on <0x031b2aa8> (a com.sun.jmx.remote.internal.ArrayNotificati onBuffer) at com.sun.jmx.remote.internal.ArrayNotificationBuffer.fetchNotification s(ArrayNotificationBuffer.java:417) - locked <0x031b2aa8> (a com.sun.jmx.remote.internal.ArrayNotificationBu ffer) at com.sun.jmx.remote.internal.ArrayNotificationBuffer$ShareBuffer.fetch 
Notifications(ArrayNotificationBuffer.java:209) at com.sun.jmx.remote.internal.ServerNotifForwarder.fetchNotifs(ServerNo tifForwarder.java:258) at javax.management.remote.rmi.RMIConnectionImpl$2.run(RMIConnectionImpl .java:1227) at javax.management.remote.rmi.RMIConnectionImpl$2.run(RMIConnectionImpl .java:1225) at javax.management.remote.rmi.RMIConnectionImpl.fetchNotifications(RMIC onnectionImpl.java:1231) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl. java:39) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAcces sorImpl.java:25) at java.lang.reflect.Method.invoke(Method.java:597) at sun.rmi.server.UnicastServerRef.dispatch(UnicastServerRef.java:305) at sun.rmi.transport.Transport$1.run(Transport.java:159) at java.security.AccessController.doPrivileged(Native Method) at sun.rmi.transport.Transport.serviceCall(Transport.java:155) at sun.rmi.transport.tcp.TCPTransport.handleMessages(TCPTransport.java:5 35) at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run0(TCPTranspor t.java:790) at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run(TCPTransport .java:649) at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExec utor.java:886) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor .java:908) at java.lang.Thread.run(Thread.java:619) "RMI TCP Accept-0" daemon prio=6 tid=0x0aaae400 nid=0xad0 runnable [0x0c3df000] java.lang.Thread.State: RUNNABLE at java.net.PlainSocketImpl.socketAccept(Native Method) at java.net.PlainSocketImpl.accept(PlainSocketImpl.java:390) - locked <0x03160e68> (a java.net.SocksSocketImpl) at java.net.ServerSocket.implAccept(ServerSocket.java:453) at java.net.ServerSocket.accept(ServerSocket.java:421) at sun.management.jmxremote.LocalRMIServerSocketFactory$1.accept(LocalRM IServerSocketFactory.java:34) at sun.rmi.transport.tcp.TCPTransport$AcceptLoop.executeAcceptLoop(TCPTr ansport.java:369) 
at sun.rmi.transport.tcp.TCPTransport$AcceptLoop.run(TCPTransport.java:3 41) at java.lang.Thread.run(Thread.java:619) "ReaderThread" prio=6 tid=0x0b0cd800 nid=0xdd4 runnable [0x0b46f000] java.lang.Thread.State: RUNNABLE at java.net.SocketInputStream.socketRead0(Native Method) at java.net.SocketInputStream.read(SocketInputStream.java:129) at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:264) at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:306) at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:158) - locked <0x02eae848> (a java.io.InputStreamReader) at java.io.InputStreamReader.read(InputStreamReader.java:167) at java.io.BufferedReader.fill(BufferedReader.java:136) at java.io.BufferedReader.readLine(BufferedReader.java:299) - locked <0x02eae848> (a java.io.InputStreamReader) at java.io.BufferedReader.readLine(BufferedReader.java:362) at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner$ReaderThread.r un(RemoteTestRunner.java:140) "Low Memory Detector" daemon prio=6 tid=0x0aaa4400 nid=0xefc runnable [0x0000000 0] java.lang.Thread.State: RUNNABLE "CompilerThread0" daemon prio=10 tid=0x0aa9f800 nid=0xa7c waiting on condition [ 0x00000000] java.lang.Thread.State: RUNNABLE "Attach Listener" daemon prio=10 tid=0x0aa9d400 nid=0xe98 waiting on condition [ 0x00000000] java.lang.Thread.State: RUNNABLE "Signal Dispatcher" daemon prio=10 tid=0x0aa9c000 nid=0xe44 runnable [0x00000000 ] java.lang.Thread.State: RUNNABLE "Finalizer" daemon prio=8 tid=0x0aa8c400 nid=0xb0c in Object.wait() [0x0abff000] java.lang.Thread.State: WAITING (on object monitor) at java.lang.Object.wait(Native Method) - waiting on <0x02e76ec8> (a java.lang.ref.ReferenceQueue$Lock) at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:118) - locked <0x02e76ec8> (a java.lang.ref.ReferenceQueue$Lock) at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:134) at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:159) "Reference Handler" daemon prio=10 tid=0x0aa87800 
nid=0x838 in Object.wait() [0x 0abaf000] java.lang.Thread.State: WAITING (on object monitor) at java.lang.Object.wait(Native Method) - waiting on <0x02e76f50> (a java.lang.ref.Reference$Lock) at java.lang.Object.wait(Object.java:485) at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:116) - locked <0x02e76f50> (a java.lang.ref.Reference$Lock) "main" prio=6 tid=0x0039a000 nid=0xbd4 waiting on condition [0x003ff000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x03147ce8> (a java.util.concurrent.CountDownLat ch$Sync) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158) at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInt errupt(AbstractQueuedSynchronizer.java:747) at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireShared Interruptibly(AbstractQueuedSynchronizer.java:905) at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedIn terruptibly(AbstractQueuedSynchronizer.java:1217) at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:207) 1 at hellotrackworld.test.ComportementTransactionnelTableCoreCache.isolati on_RW_RW(ComportementTransactionnelTableCoreCache.java:50) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl. 
java:39) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAcces sorImpl.java:25) at java.lang.reflect.Method.invoke(Method.java:597) at org.springframework.test.context.junit4.SpringTestMethod.invoke(Sprin gTestMethod.java:163) at org.springframework.test.context.junit4.SpringMethodRoadie.runTestMet hod(SpringMethodRoadie.java:233) at org.springframework.test.context.junit4.SpringMethodRoadie$RunBefores ThenTestThenAfters.run(SpringMethodRoadie.java:333) at org.springframework.test.context.junit4.SpringMethodRoadie.runWithRep etitions(SpringMethodRoadie.java:217) at org.springframework.test.context.junit4.SpringMethodRoadie.runTest(Sp ringMethodRoadie.java:197) at org.springframework.test.context.junit4.SpringMethodRoadie.run(Spring MethodRoadie.java:143) at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.invok eTestMethod(SpringJUnit4ClassRunner.java:142) at org.junit.internal.runners.JUnit4ClassRunner.runMethods(JUnit4ClassRu nner.java:51) at org.junit.internal.runners.JUnit4ClassRunner$1.run(JUnit4ClassRunner. 
java:44) at org.junit.internal.runners.ClassRoadie.runUnprotected(ClassRoadie.jav a:27) at org.junit.internal.runners.ClassRoadie.runProtected(ClassRoadie.java: 37) at org.junit.internal.runners.JUnit4ClassRunner.run(JUnit4ClassRunner.ja va:42) at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4 TestReference.java:45) at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution .java:38) at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(Remot eTestRunner.java:460) at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(Remot eTestRunner.java:673) at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTest Runner.java:386) at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTes tRunner.java:196) "VM Thread" prio=10 tid=0x0aa83800 nid=0xcf0 runnable "VM Periodic Task Thread" prio=10 tid=0x0aab0400 nid=0xfc0 waiting on condition JNI global references: 811
Log output:[org.springframework.test.context.TestContextManager][main] - @TestExecutionListeners is not present for class [class hellotrackworld.test.ComportementTransactionnelTableCoreCache]: using defaults. [org.springframework.beans.factory.xml.XmlBeanDefinitionReader][main] - Loading XML bean definitions from class path resource [application-context_Jdbc_Atomikos.xml] [org.springframework.context.support.GenericApplicationContext][main] - Refreshing org.springframework.context.support.GenericApplicationContext@513cf0: display name [org.springframework.context.support.GenericApplicationContext@513cf0]; startup date [Tue Sep 29 18:08:45 CEST 2009]; root of context hierarchy [org.springframework.context.support.GenericApplicationContext][main] - Bean factory for application context [org.springframework.context.support.GenericApplicationContext@513cf0]: org.springframework.beans.factory.support.DefaultListableBeanFactory@1f1235b [org.springframework.beans.factory.support.DefaultListableBeanFactory][main] - Pre-instantiating singletons in org.springframework.beans.factory.support.DefaultListableBeanFactory@1f1235b: defining beans [tracksTable,org.springframework.context.annotation.internalPersistenceAnnotationProcessor,org.springframework.context.annotation.internalCommonAnnotationProcessor,org.springframework.context.annotation.internalAutowiredAnnotationProcessor,org.springframework.context.annotation.internalRequiredAnnotationProcessor,dataSource,jdbcTemplate,atomikosTransactionManager,atomikosUserTransaction,jtaTransactionManager,org.springframework.aop.config.internalAutoProxyCreator,org.springframework.transaction.annotation.AnnotationTransactionAttributeSource#0,org.springframework.transaction.interceptor.TransactionInterceptor#0,org.springframework.transaction.config.internalTransactionAdvisor]; root of factory hierarchy Using init file: /D:/ff/jta.properties [org.springframework.transaction.jta.JtaTransactionManager][main] - Using JTA UserTransaction: 
com.atomikos.icatch.jta.UserTransactionImp@bf5555 [org.springframework.transaction.jta.JtaTransactionManager][main] - Using JTA TransactionManager: com.atomikos.icatch.jta.UserTransactionManager@17b4703 [org.jboss.cache.config.parsing.RootElementBuilder][main] - Configuration warning: cvc-elt.1: Cannot find the declaration of element 'server'. [org.jboss.cache.config.parsing.RootElementBuilder][main] - org.jboss.cache.config.ConfigurationException: Incorrect configuration file. Use '-Djbosscache.config.validate=false' to disable validation. !!!!!!!!!!!!SpringAtomikosTransactionManagerLookup!!!!!!!!!!!!!! [org.jboss.cache.jmx.PlatformMBeanServerRegistration][main] - JBossCache MBeans were successfully registered to the platform mbean server. [org.jboss.cache.factories.ComponentRegistry][main] - JBoss Cache version: JBossCache 'Naga' 3.0.0.GA [org.jboss.cache.mvcc.RepeatableReadNode][pool-1-thread-2] - Detected write skew on Fqn [/]. Another process has changed the node since we last read it!. Unable to copy node for update. [org.jboss.cache.mvcc.RepeatableReadNode][pool-1-thread-7] - Detected write skew on Fqn [/]. Another process has changed the node since we last read it!. Unable to copy node for update. [org.jboss.cache.mvcc.RepeatableReadNode][pool-1-thread-8] - Detected write skew on Fqn [/]. Another process has changed the node since we last read it!. Unable to copy node for update. [org.jboss.cache.mvcc.RepeatableReadNode][pool-1-thread-6] - Detected write skew on Fqn [/]. Another process has changed the node since we last read it!. Unable to copy node for update. [org.jboss.cache.mvcc.RepeatableReadNode][pool-1-thread-3] - Detected write skew on Fqn [/]. Another process has changed the node since we last read it!. Unable to copy node for update. [org.jboss.cache.mvcc.RepeatableReadNode][pool-1-thread-5] - Detected write skew on Fqn [/]. Another process has changed the node since we last read it!. Unable to copy node for update. 
[org.jboss.cache.mvcc.RepeatableReadNode][pool-1-thread-4] - Detected write skew on Fqn [/]. Another process has changed the node since we last read it!. Unable to copy node for update. [org.jboss.cache.mvcc.RepeatableReadNode][pool-1-thread-10] - Detected write skew on Fqn [/]. Another process has changed the node since we last read it!. Unable to copy node for update.
-
17. Re: Transactions: atomicity OK, isolation KO
chtimi2 Sep 30, 2009 10:48 AM (in response to chtimi2) Shame on me..
The test hung because I had written:
protected class AsyncIncrement implements Runnable {
    private final CountDownLatch latch;

    public AsyncIncrement(CountDownLatch latch) {
        this.latch = latch;
    }

    @Override
    public void run() {
        // If incrementScalar() throws, countDown() is never reached
        // and latch.await() in the test blocks forever.
        table.incrementScalar();
        latch.countDown();
    }
}
instead of:
protected class AsyncIncrement implements Runnable {
    private final CountDownLatch latch;

    public AsyncIncrement(CountDownLatch latch) {
        this.latch = latch;
    }

    @Override
    public void run() {
        try {
            table.incrementScalar();
        } finally {
            // Always release the latch, even when the increment fails.
            latch.countDown();
        }
    }
}
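For readers who want to reproduce the hang in isolation, here is a minimal plain-Java sketch of the same mistake. It assumes nothing about JBoss Cache: `failingIncrement()` is a hypothetical stand-in for `table.incrementScalar()` throwing (for example on a write skew), and the bounded `await` is only there so that the demo itself cannot hang.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class LatchFinallyDemo {

    /**
     * Submits `workers` tasks that all fail, then reports whether the latch
     * reached zero within the timeout. `useFinally` selects the fixed variant.
     */
    static boolean allWorkersReported(int workers, boolean useFinally) {
        CountDownLatch latch = new CountDownLatch(workers);
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        try {
            for (int i = 0; i < workers; i++) {
                pool.submit(() -> {
                    if (useFinally) {
                        try {
                            failingIncrement();
                        } finally {
                            latch.countDown(); // runs even though the increment threw
                        }
                    } else {
                        failingIncrement();
                        latch.countDown(); // skipped because the line above threw
                    }
                });
            }
            // Bounded wait; the original test used an unbounded latch.await().
            return latch.await(500, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdownNow();
        }
    }

    /** Hypothetical stand-in for table.incrementScalar() failing. */
    static void failingIncrement() {
        throw new IllegalStateException("simulated write skew");
    }

    public static void main(String[] args) {
        System.out.println("without finally: " + allWorkersReported(4, false)); // false
        System.out.println("with finally:    " + allWorkersReported(4, true));  // true
    }
}
```

The exception thrown inside a submitted task is captured by the returned Future and never reaches the main thread, which is why the hang looked like a locking problem rather than a failed write.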
-
18. Re: Transactions: atomicity OK, isolation KO
chtimi2 Sep 30, 2009 10:51 AM (in response to chtimi2) Now that this miserable "issue" is resolved, what do you think about my question in the previous post (regarding the SERIALIZABLE isolation level in MVCC and serializable snapshot isolation)?
-
19. Re: Transactions: atomicity OK, isolation KO
jason.greene Sep 30, 2009 6:16 PM (in response to chtimi2) Your test looks incorrect.
If you want to serialize access to something in MVCC, you just need to start a transaction, ensure there is a write lock on the node, perform your operations, and then commit. The lock is held for the length of the transaction, no longer. This is similar to SELECT ... FOR UPDATE, in that you can use a node as a cooperative lock, effectively serializing everything that uses the same node in that manner.
There are two ways to do this:
1. Force a write lock on a read:
tx.begin();
cache.getInvocationContext().getOptionOverrides().setForceWriteLock(true);
count = cache.get("/foo/mynode", "counter");
cache.put("/foo/mynode", "counter", count + 1);
tx.commit();
This has to be done immediately before the read operation. It lasts for only one cache operation. There is also a drawback: it does not work across a cluster, so it is only useful for local cache usage.
2. Write to a key first in the transaction. This forces a write lock to be acquired on the node, so that all subsequent reads are current. This also works across a cluster.
tx.begin();
cache.put("/foo/mynode", "_lockthisplease_", "_lockthisplease_");
count = cache.get("/foo/mynode", "counter");
cache.put("/foo/mynode", "counter", count + 1);
tx.commit();
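As a plain-Java analogy of the cooperative-lock idea (not JBoss Cache code), the sketch below holds one lock for the whole read-modify-write, the way the node's write lock is described as being held until commit. The class `NodeLockAnalogy` and its `counter` key are hypothetical names used only for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReentrantLock;

class NodeLockAnalogy {
    // Hypothetical stand-in for a cache node: a data map guarded by one write lock.
    private final Map<String, Integer> data = new HashMap<>();
    private final ReentrantLock writeLock = new ReentrantLock();

    NodeLockAnalogy() {
        data.put("counter", 0);
    }

    /** Takes the lock before reading, so the read is current when we write. */
    void increment() {
        writeLock.lock();                    // analogous to acquiring the write lock first
        try {
            int count = data.get("counter");
            data.put("counter", count + 1);
        } finally {
            writeLock.unlock();              // analogous to commit releasing the lock
        }
    }

    int read() {
        writeLock.lock();
        try {
            return data.get("counter");
        } finally {
            writeLock.unlock();
        }
    }

    /** Runs `writers` concurrent increments and returns the final counter. */
    static int runWriters(int writers) {
        NodeLockAnalogy node = new NodeLockAnalogy();
        ExecutorService pool = Executors.newFixedThreadPool(writers);
        CountDownLatch done = new CountDownLatch(writers);
        for (int i = 0; i < writers; i++) {
            pool.submit(() -> {
                try {
                    node.increment();
                } finally {
                    done.countDown();
                }
            });
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return node.read();
    }

    public static void main(String[] args) {
        System.out.println(runWriters(10)); // prints 10
    }
}
```

If the read happened before the lock was taken, two writers could both read the same value and one increment would be lost, which is the failure the test in this thread is checking for.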
Hopefully that clears it up.
-
20. Re: Transactions: atomicity OK, isolation KO
jason.greene Sep 30, 2009 6:21 PM (in response to chtimi2) "jason.greene@jboss.com" wrote:
It lasts for only one cache operation.
To clarify, I mean that any invocation flag (such as the write-lock flag) applies only to the next method call. So if you want to lock two different reads, you have to set it before each one. The locks themselves are still held for the transaction.
cache.getInvocationContext().getOptionOverrides().setForceWriteLock(true);
cache.get("/blah1");
cache.getInvocationContext().getOptionOverrides().setForceWriteLock(true);
cache.get("/blah2"); -
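The one-shot scope of an invocation flag can be modelled in a few lines of plain Java. This is only a sketch of the behaviour described in this post, not of how JBoss Cache implements option overrides; `OneShotFlagSketch` and its lock log are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class OneShotFlagSketch {
    // Hypothetical per-thread flag that is consumed by the next call only.
    private final ThreadLocal<Boolean> forceWriteLock =
            ThreadLocal.withInitial(() -> Boolean.FALSE);
    // Records which kind of lock each read would have taken.
    private final List<String> lockLog = new ArrayList<>();

    void setForceWriteLock(boolean force) {
        forceWriteLock.set(force);
    }

    /** Reads the flag, then resets it so that it does not leak into the next call. */
    void get(String fqn) {
        boolean forced = forceWriteLock.get();
        forceWriteLock.set(Boolean.FALSE);
        lockLog.add(fqn + (forced ? ":WRITE" : ":READ"));
    }

    List<String> lockLog() {
        return lockLog;
    }

    /** Two reads; the flag is set again before the second one only if requested. */
    static List<String> demo(boolean repeatFlag) {
        OneShotFlagSketch cache = new OneShotFlagSketch();
        cache.setForceWriteLock(true);
        cache.get("/blah1");
        if (repeatFlag) {
            cache.setForceWriteLock(true);
        }
        cache.get("/blah2");
        return cache.lockLog();
    }

    public static void main(String[] args) {
        System.out.println(demo(false)); // [/blah1:WRITE, /blah2:READ]
        System.out.println(demo(true));  // [/blah1:WRITE, /blah2:WRITE]
    }
}
```

In this model, setting the flag and then calling some other cache method first would spend the flag on that method, so the flag has to come directly before the read it is meant for.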
21. Re: Transactions: atomicity OK, isolation KO
chtimi2 Oct 1, 2009 4:12 AM (in response to chtimi2) I see, I didn't know that about the scope of setForceWriteLock.
Actually doing a write is another idea that I hadn't tried.
But I tried each of them separately and both together, and the test still fails.
Here are my changes to the service implementation:
@Service("tracksTable")
@Transactional(propagation = Propagation.REQUIRED, isolation = Isolation.SERIALIZABLE, readOnly = false, timeout = 10000)
public class TracksTableImpl implements TracksTable {
    private Cache cache;

    @Override
    public int getScalar() {
        return (Integer) cache.get("/foo/mynode", "scalar");
    }

    @Override
    public void incrementScalar() {
        cache.getInvocationContext().getOptionOverrides().setForceWriteLock(true);
        cache.put("/foo/mynode", "_lockthisplease_", "_lockthisplease_");
        int tmp = getScalar();
        tmp++;
        setScalar(tmp);
    }

    @Override
    public void initScalar() {
        setScalar(0);
    }

    private void setScalar(int scalar) {
        cache.put("/foo/mynode", "scalar", scalar);
    }

    @Override
    public void setCache(Cache cache) {
        this.cache = cache;
    }

    @Override
    public void unsetCache() {
        this.cache = null;
    }
}
-
22. Re: Transactions: atomicity OK, isolation KO
chtimi2 Oct 1, 2009 5:09 AM (in response to chtimi2) I have written the same test without using Spring.
The test:
package hellotrackworld.test;

import static org.junit.Assert.assertEquals;

import hellotrackworld.TracksTable;
import hellotrackworld.impl.TracksTableImpl;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import javax.transaction.TransactionManager;

import org.jboss.cache.Cache;
import org.jboss.cache.CacheFactory;
import org.jboss.cache.DefaultCacheFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.springframework.context.ApplicationContext;

public class ComportementTransactionnelTableCoreCache {
    private Cache cache;
    protected TracksTable table = new TracksTableImpl();
    protected final ExecutorService executorService = Executors.newCachedThreadPool();
    protected static ApplicationContext applicationContext;

    @Test
    public void isolation_RW_RW() throws Exception {
        table.initScalar();
        int nbWriters = 10;
        CountDownLatch latch = new CountDownLatch(nbWriters);
        for (int i = 0; i < nbWriters; i++) {
            executorService.submit(new AsyncIncrement(latch));
        }
        latch.await();
        assertEquals(nbWriters, table.getScalar());
    }

    protected class AsyncIncrement implements Runnable {
        private final CountDownLatch latch;

        public AsyncIncrement(CountDownLatch latch) {
            this.latch = latch;
        }

        @Override
        public void run() {
            try {
                table.incrementScalar();
            } finally {
                latch.countDown();
            }
        }
    }

    @Before
    public void before() {
        cache = initPojoCache();
        cache.start();
        table.setCache(cache);
        table.initScalar();
        assertEquals(0, table.getScalar());
    }

    @After
    public void stop() {
        table.unsetCache();
        cache.stop();
    }

    private Cache initPojoCache() {
        CacheFactory factory = new DefaultCacheFactory();
        Cache cache = factory.createCache("resources/META-INF/replSync-service.xml", false);
        TransactionManager tm = new com.atomikos.icatch.jta.UserTransactionManager();
        cache.getConfiguration().getRuntimeConfig().setTransactionManager(tm);
        cache.create();
        return cache;
    }
}
The service implementation:
package hellotrackworld.impl;

import javax.transaction.UserTransaction;

import hellotrackworld.TracksTable;

import org.jboss.cache.Cache;

public class TracksTableImpl implements TracksTable {
    private Cache cache;
    private int scalar;

    @Override
    public void incrementScalar() {
        try {
            UserTransaction tx = getTx();
            tx.begin();
            cache.put("/foo/mynode", "_lockthisplease_", "_lockthisplease_");
            cache.getInvocationContext().getOptionOverrides().setForceWriteLock(true);
            int tmp = (Integer) cache.get("/foo/mynode", "scalar");
            tmp++;
            cache.put("/foo/mynode", "scalar", tmp);
            tx.commit();
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException();
        }
    }

    @Override
    public int getScalar() {
        try {
            UserTransaction tx = getTx();
            tx.begin();
            int ret = (Integer) cache.get("/foo/mynode", "scalar");
            tx.commit();
            return ret;
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException();
        }
    }

    @Override
    public void initScalar() {
        try {
            UserTransaction tx = getTx();
            tx.begin();
            cache.put("/foo/mynode", "scalar", 0);
            tx.commit();
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException();
        }
    }

    @Override
    public void setCache(Cache cache) {
        this.cache = cache;
    }

    @Override
    public void unsetCache() {
        this.cache = null;
    }

    private UserTransaction getTx() {
        return new com.atomikos.icatch.jta.UserTransactionImp();
    }
}
The replSync-service:
<?xml version="1.0" encoding="UTF-8"?>
<server>
  <mbean code="org.jboss.cache.jmx.CacheJmxWrapper" name="jboss.cache:service=TreeCache">
    <!--<attribute name="NodeLockingScheme">PESSIMISTIC</attribute>
    <attribute name="IsolationLevel">SERIALIZABLE</attribute>-->
    <attribute name="CacheMode">LOCAL</attribute>
    <attribute name="UseReplQueue">false</attribute>
    <attribute name="ReplQueueInterval">0</attribute>
    <attribute name="ReplQueueMaxElements">0</attribute>
    <attribute name="ClusterName">JBossCache-Cluster</attribute>
    <attribute name="ClusterConfig">
      <config>
        <UDP mcast_addr="228.10.10.10" mcast_port="45588" tos="8"
             ucast_recv_buf_size="20000000" ucast_send_buf_size="640000"
             mcast_recv_buf_size="25000000" mcast_send_buf_size="640000"
             loopback="false" discard_incompatible_packets="true"
             max_bundle_size="64000" max_bundle_timeout="30"
             use_incoming_packet_handler="true" ip_ttl="2"
             enable_bundling="false" enable_diagnostics="true"
             use_concurrent_stack="true" thread_naming_pattern="pl"
             thread_pool.enabled="true" thread_pool.min_threads="1"
             thread_pool.max_threads="25" thread_pool.keep_alive_time="30000"
             thread_pool.queue_enabled="true" thread_pool.queue_max_size="10"
             thread_pool.rejection_policy="Run"
             oob_thread_pool.enabled="true" oob_thread_pool.min_threads="1"
             oob_thread_pool.max_threads="4" oob_thread_pool.keep_alive_time="10000"
             oob_thread_pool.queue_enabled="true" oob_thread_pool.queue_max_size="10"
             oob_thread_pool.rejection_policy="Run"/>
        <PING timeout="2000" num_initial_members="3"/>
        <MERGE2 max_interval="30000" min_interval="10000"/>
        <FD_SOCK/>
        <FD timeout="10000" max_tries="5" shun="true"/>
        <VERIFY_SUSPECT timeout="1500"/>
        <pbcast.NAKACK max_xmit_size="60000" use_mcast_xmit="false" gc_lag="0"
                       retransmit_timeout="300,600,1200,2400,4800"
                       discard_delivered_msgs="true"/>
        <UNICAST timeout="300,600,1200,2400,3600"/>
        <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000" max_bytes="400000"/>
        <pbcast.GMS print_local_addr="true" join_timeout="5000" join_retry_timeout="2000"
                    shun="false" view_bundling="true" view_ack_collection_timeout="5000"/>
        <FRAG2 frag_size="60000"/>
        <pbcast.STREAMING_STATE_TRANSFER use_reading_thread="true"/>
        <!-- <pbcast.STATE_TRANSFER/> -->
        <pbcast.FLUSH timeout="0"/>
      </config>
    </attribute>
    <attribute name="FetchInMemoryState">true</attribute>
    <attribute name="StateRetrievalTimeout">15000</attribute>
    <attribute name="SyncReplTimeout">15000</attribute>
    <attribute name="LockAcquisitionTimeout">10000</attribute>
    <attribute name="UseRegionBasedMarshalling">true</attribute>
  </mbean>
</server>
With this configuration the outcome is identical: the test passes with pessimistic locking and fails otherwise (incorrect scalar), even when using setForceWriteLock and/or actually writing to the node.
If I add the option
cache.getConfiguration().setWriteSkewCheck(true);
the write fails with
org.jboss.cache.optimistic.DataVersioningException: Detected write skew on Fqn [/foo/mynode]. Another process has changed the node since we last read it!
like it did in the test that uses Spring.
JBC detects a write skew, so the writes were not serialized. -
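To make the "write skew" message concrete, here is a minimal plain-Java sketch of a version check at write time. It is an assumption-laden model, not JBoss Cache's implementation: `WriteSkewSketch`, `Versioned`, and the use of `IllegalStateException` in place of `DataVersioningException` are all hypothetical.

```java
import java.util.concurrent.atomic.AtomicReference;

class WriteSkewSketch {

    /** Immutable value together with the version at which it was written. */
    static final class Versioned {
        final int value;
        final long version;

        Versioned(int value, long version) {
            this.value = value;
            this.version = version;
        }
    }

    private final AtomicReference<Versioned> node =
            new AtomicReference<>(new Versioned(0, 0L));

    /** A transaction's first read: remember exactly what was seen. */
    Versioned read() {
        return node.get();
    }

    /** Succeeds only if the node is still the snapshot this writer read earlier. */
    void write(Versioned snapshot, int newValue) {
        Versioned next = new Versioned(newValue, snapshot.version + 1);
        if (!node.compareAndSet(snapshot, next)) {
            // Stand-in for the DataVersioningException quoted above.
            throw new IllegalStateException(
                    "Detected write skew: node changed since it was last read");
        }
    }

    /** Two writers read the same state; the second write must be rejected. */
    static boolean secondWriterIsRejected() {
        WriteSkewSketch cache = new WriteSkewSketch();
        Versioned seenByA = cache.read();
        Versioned seenByB = cache.read();
        cache.write(seenByA, seenByA.value + 1); // A commits first
        try {
            cache.write(seenByB, seenByB.value + 1); // B still holds the old snapshot
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(secondWriterIsRejected()); // prints true
    }
}
```

Without the check, writer B would silently overwrite A's result with the same value, which matches the "incorrect scalar" outcome reported when the option is off.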
23. Re: Transactions: atomicity OK, isolation KO
chtimi2 Oct 1, 2009 5:12 AM (in response to chtimi2) I can't edit my post, but the Spring import in the test refers to the private UNUSED attribute ApplicationContext.
-
24. Re: Transactions: atomicity OK, isolation KO
chtimi2 Oct 13, 2009 4:32 AM (in response to chtimi2) No ideas? Is there no way to serialize the writes in MVCC?
-
25. Re: Transactions: atomicity OK, isolation KO
jason.greene Oct 15, 2009 12:54 PM (in response to chtimi2)
private UserTransaction getTx() {
    return new com.atomikos.icatch.jta.UserTransactionImp();
}
You have to use the same transaction manager that JBoss Cache is configured to use. This is probably your problem. -
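The general failure mode being suspected here can be sketched with toy classes. Everything below is hypothetical (`ToyTxManager`, `ToyCache`), and it says nothing about whether Atomikos' `UserTransactionImp` and `UserTransactionManager` actually share state; it only shows why a transaction begun on one manager is invisible to a resource that asks a different manager.

```java
class TxContextSketch {

    /** Toy transaction manager: tracks the current transaction per thread. */
    static final class ToyTxManager {
        private final ThreadLocal<String> current = new ThreadLocal<>();

        void begin(String txId) {
            current.set(txId);
        }

        void commit() {
            current.remove();
        }

        String currentTx() {
            return current.get();
        }
    }

    /** Toy cache: only asks the manager it was configured with. */
    static final class ToyCache {
        private final ToyTxManager configured;

        ToyCache(ToyTxManager configured) {
            this.configured = configured;
        }

        /** True if the write would run inside the caller's transaction. */
        boolean writeJoinsTransaction() {
            return configured.currentTx() != null;
        }
    }

    /** Begins a transaction on the cache's manager or on an unrelated one. */
    static boolean cacheSeesTransaction(boolean sameManager) {
        ToyTxManager cacheTm = new ToyTxManager();
        ToyTxManager callerTm = sameManager ? cacheTm : new ToyTxManager();
        ToyCache cache = new ToyCache(cacheTm);
        callerTm.begin("tx-1");
        try {
            return cache.writeJoinsTransaction();
        } finally {
            callerTm.commit();
        }
    }

    public static void main(String[] args) {
        System.out.println(cacheSeesTransaction(true));  // true
        System.out.println(cacheSeesTransaction(false)); // false
    }
}
```

When the cache sees no transaction, each operation would run on its own and any lock would be released right after it, so a read-modify-write sequence could not be serialized.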
26. Re: Transactions: atomicity OK, isolation KO
chtimi2 Oct 22, 2009 11:04 AM (in response to chtimi2) Am I not already using "the same TM that JBC is using"?
I create the cache with the following:
private Cache initPojoCache() {
    CacheFactory factory = new DefaultCacheFactory();
    Cache cache = factory.createCache("resources/META-INF/replSync-service2.xml", false);
    TransactionManager tm = new com.atomikos.icatch.jta.UserTransactionManager();
    cache.getConfiguration().getRuntimeConfig().setTransactionManager(tm);
    cache.getConfiguration().setWriteSkewCheck(true);
    cache.create();
    return cache;
}
And I create a new transaction with:
return new com.atomikos.icatch.jta.UserTransactionImp();
So it looks to me like I'm using the same TM.
But indeed, maybe a good test would be to use JBC's default TM. How do I do that (specifically, how do I create a new UserTransaction)? I can't see anything to that effect in the doc.
By the way, I now use the new JBC config (here with everything default):
<?xml version="1.0" encoding="UTF-8"?>
<jbosscache xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="urn:jboss:jbosscache-core:config:3.2">
  <!--
    isolation levels supported: READ_COMMITTED and REPEATABLE_READ
    nodeLockingSchemes: mvcc, pessimistic (deprecated), optimistic (deprecated)
  -->
  <!--<locking isolationLevel="SERIALIZABLE" lockParentForChildInsertRemove="false" lockAcquisitionTimeout="20000" nodeLockingScheme="pessimistic" writeSkewCheck="false" concurrencyLevel="500"/>-->
  <!-- By not specifying the 'clustering' element, the cache runs in LOCAL mode. -->
</jbosscache>
and the behaviour is identical.
-
27. Re: Transactions: atomicity OK, isolation KO
chtimi2 Nov 3, 2009 4:26 AM (in response to chtimi2) Seriously, how do I instantiate a new UserTransaction with JBC's local transactions? What is the equivalent of the following?
return new com.atomikos.icatch.jta.UserTransactionImp();
I tried
new DummyUserTransaction (new TransactionManager())
but it looks suspicious.
So far, with everything I have tried, I'm still not convinced that it is possible to serialize operations with the MVCC locking scheme.
-
28. Re: Transactions: atomicity OK, isolation KO
manik Nov 3, 2009 4:50 AM (in response to chtimi2) You need to look at how Atomikos binds a UserTransaction to JNDI. Based on that, you retrieve the UserTransaction and call begin().
Have a look at Sun's JavaEE tutorial on JTA: http://java.sun.com/j2ee/tutorial/1_3-fcs/doc/Transaction4.html
-
29. Re: Transactions: atomicity OK, isolation KO
chtimi2 Nov 3, 2009 9:45 AM (in response to chtimi2) I don't get it: why would I need to use JNDI? My test passes without JNDI, using Atomikos and the pessimistic locking scheme.
I don't need JNDI to start and commit a transaction in the same method, like I do:
    UserTransaction tx = getTx();
    tx.begin();
    cache.put("/foo/mynode", "_lockthisplease_", "_lockthisplease_");
    cache.getInvocationContext().getOptionOverrides().setForceWriteLock(true);
    int tmp = (Integer) cache.get("/foo/mynode", "scalar");
    tmp++;
    cache.put("/foo/mynode", "scalar", tmp);
    tx.commit();
}
[...]
private UserTransaction getTx() {
    return new com.atomikos.icatch.jta.UserTransactionImp();
    //return new DummyUserTransaction ( new DummyTransactionManager () );
}
}
I also set JBC's TM at startup:
TransactionManager tm = new com.atomikos.icatch.jta.UserTransactionManager();
cache.getConfiguration().getRuntimeConfig().setTransactionManager(tm);
The idea was to test whether I get the same failure using JBC's default transactions.
If that's meaningless (by analogy with JDBC local transactions, I should call commit on a JBC resource), then what else can I test?
This test is pretty simple: it's just a concurrent write. There should be a way to test whether it works with MVCC, right?
Can you confirm whether it's really possible to make the test pass with MVCC? I'm starting to think it's impossible.