3 Replies Latest reply on Jan 21, 2010 10:03 AM by galder.zamarreno

    Concurrency problems - cache puts lost

    akluge

      Hi,

      While running some concurrency tests in which multiple threads do simultaneous puts
      against a cache, some of the puts appear to be lost. I run a set of five threads to do
      puts, then run a set of five threads to do gets. Often, not all of the keys that were put
      into the cache are present afterwards. I see this with 3.1.0, and also with the newest
      stable version, 3.2.1.

      Running the code appended below produces messages such as:
      24 Nov 2009 14:44:50,135 ERROR [] com.m1.test.local.ConcurrencyTest Missing value for Key1.

      The trace shows that the get was done, and that it returned null.
      24 Nov 2009 14:44:50,134 TRACE [] org.jboss.cache.interceptors.InvocationContextInterceptor Invoked with command GetKeyValueCommand{fqn=/BigNode, key=Key1, sendNodeEvent=true} and InvocationContext [InvocationContext{transaction=null, globalTransaction=null, transactionContext=null, optionOverrides=Option{failSilently=false, cacheModeLocal=false, dataVersion=null, suppressLocking=false, lockAcquisitionTimeout=-1, forceDataGravitation=false, skipDataGravitation=false, forceAsynchronous=false, forceSynchronous=false, suppressPersistence=false, suppressEventNotification=false}, originLocal=true, bypassUnmarshalling=false}]
      24 Nov 2009 14:44:50,134 TRACE [] org.jboss.cache.interceptors.CallInterceptor Executing command: GetKeyValueCommand{fqn=/BigNode, key=Key1, sendNodeEvent=true}.
      24 Nov 2009 14:44:50,134 TRACE [] org.jboss.cache.commands.read.GetKeyValueCommand Found value null

      However, earlier in the run I have put the Key into the cache.

      24 Nov 2009 14:44:40,831 INFO [] com.m1.test.local.ConcurrencyTest Putting Key1

      And the trace indicates that a value was put for the key:

      24 Nov 2009 14:44:40,829 TRACE [] org.jboss.cache.interceptors.InvocationContextInterceptor Invoked with command PutKeyValueCommand{fqn=/BigNode, dataVersion=null, globalTransaction=null, key=Key1, value=This is a test.} and InvocationContext [InvocationContext{transaction=null, globalTransaction=null, transactionContext=null, optionOverrides=Option{failSilently=false, cacheModeLocal=false, dataVersion=null, suppressLocking=false, lockAcquisitionTimeout=-1, forceDataGravitation=false, skipDataGravitation=false, forceAsynchronous=false, forceSynchronous=false, suppressPersistence=false, suppressEventNotification=false}, originLocal=true, bypassUnmarshalling=false}]

      These log records are all from the same run of the test code. Since the get happens after the put, I would expect the corresponding value to be returned.

      I have appended the test, which is a reasonably simple and self-contained case.
      Is there anything else I should try, perhaps in terms of MVCC options?

      Thanks,
      Alex

      package com.m1.test.local;
      
      import org.jboss.cache.Cache;
      import org.jboss.cache.config.Configuration;
      import java.util.concurrent.CyclicBarrier;
      import org.jboss.cache.DefaultCacheFactory;
      import org.jboss.cache.Fqn;
      import java.util.HashSet;
      import org.apache.commons.logging.Log;
      import org.apache.commons.logging.LogFactory;
      
      /**
       * This test will be used to put a specific number of entries into the cache,
       * and measure the amount of memory taken by the cache.
       */
      public class ConcurrencyTest
      {
          private static final Fqn<String> FQN = Fqn.fromString("BigNode");
          private static final Log LOG = LogFactory.getLog(ConcurrencyTest.class);
          private static final int NTHREADS = 5;
          private static final String VALUE = "This is a test.";

          private final CyclicBarrier barrier = new CyclicBarrier(NTHREADS);
          private Thread[] threads = new Thread[NTHREADS];


          public ConcurrencyTest()
              throws Throwable
          {
              Cache<Object, Object> cache = createCache();

              for (int iteration=0; iteration<10; iteration++)
              {
                  for(int j=0; j<NTHREADS; j++)
                  {
                      threads[j] = new WriteThread(cache, barrier, iteration, j);
                      threads[j].start();
                  }

                  for(int j=0; j<NTHREADS; j++)
                  {
                      threads[j].join(500);
                  }
              }

              for (int iteration=0; iteration<10; iteration++)
              {
                  for(int j=0; j<NTHREADS; j++)
                  {
                      threads[j] = new ReadThread(cache, barrier, iteration, j);
                      threads[j].start();
                  }

                  for(int j=0; j<NTHREADS; j++)
                  {
                      threads[j].join(500);
                  }
              }

              while(true)
              {
                  Thread.sleep(3600000);
              }
          }

          public Cache<Object, Object> createCache()
              throws Exception
          {
              Cache<Object, Object> cache = new DefaultCacheFactory().createCache();
              cache.create();
              cache.start();
              return cache;
          }

          private static class WriteThread extends Thread
          {
              private CyclicBarrier barrier;
              private Cache<Object, Object> cache;
              private int iteration;
              private int writer;

              public WriteThread(Cache<Object, Object> cache, CyclicBarrier barrier,
                                 int iteration, int writer)
                  throws Throwable
              {
                  this.cache = cache;
                  this.barrier = barrier;
                  this.iteration = iteration;
                  this.writer = writer;
              }

              /**
               * Run a common set of tests in each thread.
               */
              public void run()
              {
                  try
                  {
                      barrier.await();

                      for (int id=1000*iteration + 100*writer; id<1000*iteration + 100*(writer+1); id++)
                      {
                          String key = "Key" + id;
                          cache.put(FQN, key, VALUE);
                          LOG.info("Putting " + key);
                      }
                  }
                  catch (Exception exception)
                  {
                      LOG.error("Write thread failed.", exception);
                  }
              }
          }


          private static class ReadThread extends Thread
          {
              private CyclicBarrier barrier;
              private Cache<Object, Object> cache;
              private int iteration;
              private int writer;

              public ReadThread(Cache<Object, Object> cache, CyclicBarrier barrier,
                                int iteration, int writer)
                  throws Throwable
              {
                  this.cache = cache;
                  this.barrier = barrier;
                  this.iteration = iteration;
                  this.writer = writer;
              }

              /**
               * Run a common set of tests in each thread.
               */
              public void run()
              {
                  try
                  {
                      barrier.await();

                      for (int id=1000*iteration + 100*writer; id<1000*iteration + 100*(writer+1); id++)
                      {
                          String key = "Key" + id;
                          Object result = cache.get(FQN, key);
                          if (result == null)
                          {
                              LOG.error("Missing value for " + key + ".");
                          }
                      }
                  }
                  catch (Exception exception)
                  {
                      LOG.error("ReadThread failed.", exception);
                  }
              }
          }

          public static void main(String[] args)
              throws Throwable
          {
              ConcurrencyTest test = new ConcurrencyTest();
          }
      }
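
      One way to rule out the harness itself as the source of missing keys is to wait for every writer with an untimed join before any read is done, then count the keys that cannot be found. The sketch below shows only that shape. The class name MissingKeyCheck, the countMissing helper, and the use of a ConcurrentHashMap as a stand-in for the cache are assumptions for illustration; none of them come from the test above or from JBoss Cache.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class MissingKeyCheck
{
    static final int NTHREADS = 5;
    static final int KEYS_PER_THREAD = 100;

    /**
     * Starts NTHREADS writers against the given store (a stand-in for the cache),
     * waits for all of them to finish, then counts the keys that a read cannot find.
     */
    static int countMissing(final Map<String, String> store) throws InterruptedException
    {
        List<Thread> writers = new ArrayList<Thread>();
        for (int w = 0; w < NTHREADS; w++)
        {
            final int writer = w;
            Thread t = new Thread(new Runnable()
            {
                public void run()
                {
                    // Each writer owns a disjoint range of keys, as in the test above.
                    for (int id = KEYS_PER_THREAD * writer; id < KEYS_PER_THREAD * (writer + 1); id++)
                    {
                        store.put("Key" + id, "This is a test.");
                    }
                }
            });
            writers.add(t);
            t.start();
        }

        // Untimed join: every writer has finished before the first read happens.
        for (Thread t : writers)
        {
            t.join();
        }

        int missing = 0;
        for (int id = 0; id < NTHREADS * KEYS_PER_THREAD; id++)
        {
            if (store.get("Key" + id) == null)
            {
                missing++;
            }
        }
        return missing;
    }

    public static void main(String[] args) throws InterruptedException
    {
        int missing = countMissing(new ConcurrentHashMap<String, String>());
        System.out.println("Missing keys: " + missing);
    }
}
```

      With a store that handles concurrent puts correctly, the count is zero, so a non-zero count from the same harness pointed at the cache would suggest the problem is in the store rather than in the test's timing.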
      


      Some other details:

      $ java -version
      java version "1.6.0_16"
      Java(TM) SE Runtime Environment (build 1.6.0_16-b01)
      Java HotSpot(TM) 64-Bit Server VM (build 14.2-b01, mixed mode)

      $ uname -a
      Linux aklugelnx 2.6.28-16-generic #55-Ubuntu SMP Tue Oct 20 19:48:32 UTC 2009 x86_64 GNU/Linux

      The only JVM option I use is -Xmx2g to ensure enough space in the cache.

      I would welcome suggestions, and even requests for more details or some additional tests to further pin down why I don't see what I expect. I'd be happy to find it to be a configuration option I missed.

      Thanks,
      Alex





        • 1. Re: Concurrency problems - cache puts lost
          akluge

          I extended the test a bit by adding transactions.

          A minimal cache.xml:

          <?xml version="1.0" encoding="UTF-8"?>
          <jbosscache xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns="urn:jboss:jbosscache-core:config:3.0">
          
           <!-- Configure the TransactionManager -->
           <transaction transactionManagerLookupClass="org.jboss.cache.transaction.GenericTransactionManagerLookup"/>
          
          </jbosscache>
          


           TransactionManager txmanager = new DummyTransactionManagerLookup().getTransactionManager();
          


          Then I start and end a transaction around the put in the write thread:

          for (int id=1000*iteration + 100*writer; id<1000*iteration + 100*(writer+1); id++)
          {
              String key = "Key" + id;
              txmanager.begin();
              cache.put(FQN, key, VALUE);
              LOG.info("Putting " + key);
              Object result = cache.get(FQN, key);
              if (result == null)
              {
                  LOG.error("Missing value for immediate get of " + key + ".");
              }
              txmanager.commit();
              result = cache.get(FQN, key);
              if (result == null)
              {
                  LOG.error("Missing value for post transaction get of " + key + ".");
              }
          }
          


          This produced some interesting results:
          The immediate get never failed.
          The post-commit get failed sometimes.
          The ReadThread get failed sometimes.
          The ReadThread get failed on items that the post-commit get had successfully returned.

          For example:

          $ grep "post transaction" errors.20091201.log | wc -l
          228
          $ grep "Missing value for Key" errors.20091201.log | wc -l
          3660


          Next, I synchronized a section of the write thread:

          for (int id=1000*iteration + 100*writer; id<1000*iteration + 100*(writer+1); id++)
          {
              String key = "Key" + id;
              synchronized (LOCK_HOLDER)
              {
                  txmanager.begin();
                  cache.put(FQN, key, VALUE);
                  LOG.info("Putting " + key);
                  Object result = cache.get(FQN, key);
                  if (result == null)
                  {
                      LOG.error("Missing value for immediate get of " + key + ".");
                  }
                  txmanager.commit();
                  result = cache.get(FQN, key);
                  if (result == null)
                  {
                      LOG.error("Missing value for post transaction get of " + key + ".");
                  }
              }
          }
          


          With this, I have no missed gets.

          What happens if I move the commit outside of the synchronization block?

          for (int id=1000*iteration + 100*writer; id<1000*iteration + 100*(writer+1); id++)
          {
              String key = "Key" + id;
              synchronized (LOCK_HOLDER)
              {
                  txmanager.begin();
                  cache.put(FQN, key, VALUE);
                  LOG.info("Putting " + key);
                  Object result = cache.get(FQN, key);
                  if (result == null)
                  {
                      LOG.error("Missing value for immediate get of " + key + ".");
                  }
              }
              txmanager.commit();
              Object result = cache.get(FQN, key);
              if (result == null)
              {
                  LOG.error("Missing value for post transaction get of " + key + ".");
              }
          }
          


          Sure enough, the missing-value issue returns when the commit is not synchronized.

          In my production code I have placed the cache writes (puts, removes, etc.) inside
          synchronized blocks using a common lock object, and my data integrity issues
          have vanished. However, the need for synchronization in these cases was
          definitely unexpected.
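
          A minimal sketch of that workaround pattern, with every write funnelled through one common lock object. The SerializedWrites class, its method names, and the ConcurrentHashMap used as a stand-in for the cache are assumptions for illustration and are not taken from the production code or from the JBoss Cache API. The sketch also leaves out the transaction begin and commit that the synchronized block covered in the test above.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class SerializedWrites
{
    // Common lock shared by every writer, mirroring synchronized (LOCK_HOLDER) above.
    private static final Object LOCK_HOLDER = new Object();

    // Stand-in for the cache (assumption): any thread-safe key/value store.
    private final Map<String, Object> store;

    public SerializedWrites(Map<String, Object> store)
    {
        this.store = store;
    }

    /** Writes are serialized: only one thread at a time may modify the store. */
    public void put(String key, Object value)
    {
        synchronized (LOCK_HOLDER)
        {
            store.put(key, value);
        }
    }

    public Object remove(String key)
    {
        synchronized (LOCK_HOLDER)
        {
            return store.remove(key);
        }
    }

    /** Reads are not serialized; they rely on the store's own thread safety. */
    public Object get(String key)
    {
        return store.get(key);
    }

    /** Runs 5 writers of 100 keys each and returns how many keys are readable afterwards. */
    static int runDemo() throws InterruptedException
    {
        final SerializedWrites writes =
            new SerializedWrites(new ConcurrentHashMap<String, Object>());
        Thread[] threads = new Thread[5];
        for (int w = 0; w < threads.length; w++)
        {
            final int writer = w;
            threads[w] = new Thread(new Runnable()
            {
                public void run()
                {
                    for (int id = 100 * writer; id < 100 * (writer + 1); id++)
                    {
                        writes.put("Key" + id, "This is a test.");
                    }
                }
            });
            threads[w].start();
        }
        for (Thread t : threads)
        {
            t.join();
        }

        int found = 0;
        for (int id = 0; id < 500; id++)
        {
            if (writes.get("Key" + id) != null)
            {
                found++;
            }
        }
        return found;
    }

    public static void main(String[] args) throws InterruptedException
    {
        System.out.println("Keys found: " + runDemo());
    }
}
```

          The stand-in store here is already thread-safe, so the lock only illustrates the shape of the workaround. The cost of the pattern is that writers no longer run in parallel.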

          Alex



          • 2. Re: Concurrency problems - cache puts lost
            galder.zamarreno
            I suspect the issue that you're encountering is JBCACHE-1555.
            • 3. Re: Concurrency problems - cache puts lost
              galder.zamarreno
              This should now be fixed in JBossCache 3.2.2.