7 Replies Latest reply on Jun 26, 2012 9:03 AM by mgencur

    Custom DeltaAware implementation

    agrawal24_7

      We are using Infinispan, and our system has a big object to which we push small changes in each transaction. I have implemented the DeltaAware interface for this object, and also the Delta. The problem I am facing is that the changes are not propagated to other nodes; only the initial object state is propagated. Also, the delta() and commit() methods are never called on the big object that implements DeltaAware. Do I need to register this object somewhere, other than simply putting it in the cache?

      Thanks

        • 2. Re: Custom DeltaAware implementation
          galder.zamarreno

          My reply:

          --------------

           

          It's probably better if you simply use an AtomicHashMap, which is a construct within Infinispan. It allows you to group a series of key/value pairs as a single value. Infinispan can detect changes in this AtomicHashMap because it implements the DeltaAware interface. AHM is a higher-level construct than DeltaAware, and one that probably suits you better.

          To give you an example where AtomicHashMaps are used, they're heavily used by JBoss AS7 HTTP session replication, where each session id is mapped to an AtomicHashMap. This means that we can detect when individual session data changes and only replicate that.
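
          The idea of replicating only the entries that changed can be illustrated with a small stand-alone sketch. It does not use any Infinispan classes: TrackingMap, Change and applyTo are hypothetical names chosen for illustration, and the real AtomicHashMap internals may well differ.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TrackingMapSketch {

    /** One recorded modification: a put (value != null) or a remove (value == null). */
    static final class Change {
        final String key;
        final String value;

        Change(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    /** A map wrapper that remembers every modification since the last drain. */
    static final class TrackingMap {
        private final Map<String, String> data = new HashMap<>();
        private final List<Change> pending = new ArrayList<>();

        void put(String key, String value) {
            data.put(key, value);
            pending.add(new Change(key, value));
        }

        void remove(String key) {
            data.remove(key);
            pending.add(new Change(key, null));
        }

        /** Returns the changes made since the previous call and clears the log. */
        List<Change> drainChanges() {
            List<Change> out = new ArrayList<>(pending);
            pending.clear();
            return out;
        }

        Map<String, String> snapshot() {
            return new HashMap<>(data);
        }
    }

    /** Replays a list of changes on a remote copy of the map. */
    static void applyTo(Map<String, String> remote, List<Change> changes) {
        for (Change c : changes) {
            if (c.value == null) {
                remote.remove(c.key);
            } else {
                remote.put(c.key, c.value);
            }
        }
    }

    public static void main(String[] args) {
        TrackingMap session = new TrackingMap();
        Map<String, String> remoteCopy = new HashMap<>();

        session.put("user", "ankur");
        session.put("cart", "3 items");
        applyTo(remoteCopy, session.drainChanges());   // first sync ships both entries

        session.put("cart", "4 items");
        List<Change> delta = session.drainChanges();   // only the modified entry is shipped
        applyTo(remoteCopy, delta);

        System.out.println(delta.size() + " change(s) shipped, copies equal: "
                + remoteCopy.equals(session.snapshot()));
    }
}
```

          The second sync carries a single change even though the map holds two entries, which is the property that makes this kind of structure attractive for session data.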

          Cheers, Galder

          • 3. Re: Custom DeltaAware implementation
            galder.zamarreno

            Ankur's reply:

            ---------------------

             

            Thanks for the response. We tried using AHM, but the performance was not that great, which is when we thought of implementing our own DeltaAware object. I have a distributed cache over 2 machines. When putting data directly into the cache, we get an average time of 72ms per transaction, with 10 puts in a single transaction. When we did the same thing using AHM instead of putting into the cache directly, it took an average of 508ms per transaction, with the same key and value sizes (both strings) and 10 puts per transaction. Is there a way I can improve AHM performance?

            • 4. Re: Custom DeltaAware implementation
              galder.zamarreno

              Ankur, do you have that performance test around? It'd be interesting to see if we can improve the performance of AHM in your particular case.

               

              For DeltaAware, we don't really have any examples other than what's done for AHM, so I'd suggest mimicking what we do there for your object.
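
              As a rough illustration of that contract, here is a minimal stand-alone sketch. The Delta and DeltaAware interfaces below are local stand-ins modelled on the shape of Infinispan's interfaces (delta() and commit() on one side, merge() on the other); that shape is an assumption to verify against the Javadoc of your version. BigObject and BigObjectDelta are hypothetical names. The sketch calls delta(), merge() and commit() by hand; when Infinispan itself invokes them is not shown.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

public class DeltaAwareSketch {

    // Local stand-ins, NOT the Infinispan types (assumption: the real ones have this shape).
    interface Delta {
        DeltaAware merge(DeltaAware target);
    }

    interface DeltaAware {
        Delta delta();
        void commit();
    }

    /** Hypothetical "big object": many fields, only a few change per transaction. */
    static final class BigObject implements DeltaAware {
        final Map<String, String> fields = new TreeMap<>();
        private Map<String, String> changed = new HashMap<>();

        void set(String name, String value) {
            fields.put(name, value);
            changed.put(name, value);   // remember the change for the next delta
        }

        @Override
        public Delta delta() {
            Delta d = new BigObjectDelta(changed);
            changed = new HashMap<>();  // the recorded changes now belong to the delta
            return d;
        }

        @Override
        public void commit() {
            changed.clear();            // nothing is pending once the transaction is done
        }
    }

    /** Carries only the modified fields and merges them into the receiver's copy. */
    static final class BigObjectDelta implements Delta {
        final Map<String, String> changedFields;

        BigObjectDelta(Map<String, String> changedFields) {
            this.changedFields = changedFields;
        }

        @Override
        public DeltaAware merge(DeltaAware target) {
            // The receiving side may not hold the object yet, so create it when needed.
            BigObject o = (target instanceof BigObject) ? (BigObject) target : new BigObject();
            o.fields.putAll(changedFields);
            return o;
        }
    }

    public static void main(String[] args) {
        BigObject local = new BigObject();
        local.set("a", "1");
        local.set("b", "2");
        DeltaAware remote = local.delta().merge(null);   // initial state reaches the other side
        local.commit();

        local.set("b", "3");                             // small change in a later transaction
        BigObjectDelta d = (BigObjectDelta) local.delta();
        remote = d.merge(remote);
        local.commit();

        System.out.println(d.changedFields.size() + " field(s) sent, remote fields: "
                + ((BigObject) remote).fields);
    }
}
```

              The point of the design is that merge() must cope with a null or foreign target, since the first delta effectively carries the initial state.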

               

              Make sure that you provide a way to serialize the DeltaAware impl, i.e. provide an externalizer for it: https://docs.jboss.org/author/x/PwY5
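
              A minimal sketch of what such serialization code might look like, using only java.io. FieldDelta is a hypothetical delta class, and the writeObject/readObject pair merely imitates the write/read shape an externalizer would have; wiring it into Infinispan (externalizer interface, ids, registration) is covered by the linked documentation and is not shown here.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.TreeMap;

public class DeltaExternalizerSketch {

    /** Hypothetical delta: just the fields that changed. */
    static final class FieldDelta {
        final Map<String, String> changedFields;

        FieldDelta(Map<String, String> changedFields) {
            this.changedFields = changedFields;
        }
    }

    /** Writes the entry count followed by each name/value pair. */
    static void writeObject(ObjectOutput out, FieldDelta delta) throws IOException {
        out.writeInt(delta.changedFields.size());
        for (Map.Entry<String, String> e : delta.changedFields.entrySet()) {
            // writeUTF is limited to 65535 encoded bytes per string; enough for this sketch
            out.writeUTF(e.getKey());
            out.writeUTF(e.getValue());
        }
    }

    /** Reads back exactly what writeObject produced, in the same order. */
    static FieldDelta readObject(ObjectInput in) throws IOException {
        int size = in.readInt();
        Map<String, String> fields = new TreeMap<>();
        for (int i = 0; i < size; i++) {
            String name = in.readUTF();
            fields.put(name, in.readUTF());
        }
        return new FieldDelta(fields);
    }

    /** Serializes a delta to bytes and restores it, as would happen between two nodes. */
    static FieldDelta roundTrip(FieldDelta delta) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                writeObject(out, delta);
            }   // closing the stream flushes buffered data into the byte array
            try (ObjectInputStream in =
                         new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return readObject(in);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> changed = new TreeMap<>();
        changed.put("b", "3");
        FieldDelta copy = roundTrip(new FieldDelta(changed));
        System.out.println("restored delta: " + copy.changedFields);
    }
}
```

              A round-trip check like this is a cheap way to catch a write/read mismatch before the delta ever crosses the network.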

              • 5. Re: Custom DeltaAware implementation
                agrawal24_7

                Thanks for taking the time to look into the issue.

                 

                Yes, I have the performance test around. Following are some more results of that test.

                 

                Configuration: {transactionSize=1, lengthOfKey=100, numberOfThreads=10, frequencyOfWrite=1000, lengthOfValue=1000, totalWrites=1000}

                Total writes: 1,001 had delay 16,161, average transaction time: 16

                ------------------------------------------------------------------------------------------------------------------------------

                Configuration: {transactionSize=3, lengthOfKey=100, numberOfThreads=10, frequencyOfWrite=1000, lengthOfValue=1000, totalWrites=1000}

                Total writes: 1,036 had delay 11,189, average transaction time: 32

                ------------------------------------------------------------------------------------------------------------------------------

                Configuration: {transactionSize=10, lengthOfKey=100, numberOfThreads=10, frequencyOfWrite=1000, lengthOfValue=1000, totalWrites=1000}

                Total writes: 1,060 had delay 7,653, average transaction time: 72

                ------------------------------------------------------------------------------------------------------------------------------

                Configuration: {transactionSize=10, lengthOfKey=1000, numberOfThreads=10, frequencyOfWrite=1000, lengthOfValue=10000, totalWrites=1000}

                Total writes: 1,088 had delay 21,932, average transaction time: 201

                ------------------------------------------------------------------------------------------------------------------------------

                Configuration: {transactionSize=1, lengthOfKey=100, numberOfThreads=10, frequencyOfWrite=1000, lengthOfValue=1000, totalWrites=1000}

                Total writes: 1,001 had delay 16,609, average transaction time: 16

                ------------------------------------------------------------------------------------------------------------------------------

                Configuration: {testType=atomicMap, transactionSize=1, lengthOfKey=100, numberOfThreads=10, frequencyOfWrite=1000, lengthOfValue=1000, totalWrites=1000}

                Total writes: 1,001 had delay 86,665, average transaction time: 86

                ------------------------------------------------------------------------------------------------------------------------------

                Configuration: {testType=atomicMap, transactionSize=3, lengthOfKey=100, numberOfThreads=10, frequencyOfWrite=1000, lengthOfValue=1000, totalWrites=1000}

                Total writes: 1,027 had delay 69,830, average transaction time: 203

                ------------------------------------------------------------------------------------------------------------------------------

                Configuration: {testType=atomicMap, transactionSize=10, lengthOfKey=100, numberOfThreads=10, frequencyOfWrite=1000, lengthOfValue=1000, totalWrites=1000}

                Total writes: 1,090 had delay 55,374, average transaction time: 508
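
                To put the figures above side by side, the following sketch computes the time per put and the slowdown of the atomicMap runs relative to the plain-cache runs with the same key and value lengths. It takes the reported average transaction times at face value and assumes they are in milliseconds, since the test measures with System.currentTimeMillis(). SlowdownSketch and slowdown are hypothetical names.

```java
import java.util.Locale;

public class SlowdownSketch {

    /** How many times slower the atomic-map run is than the plain-cache run. */
    static double slowdown(long plainMs, long atomicMapMs) {
        return (double) atomicMapMs / plainMs;
    }

    public static void main(String[] args) {
        // Average transaction times (ms) copied from the results above
        // (lengthOfKey=100, lengthOfValue=1000 in every row).
        int[] transactionSizes = {1, 3, 10};
        long[] plainMs = {16, 32, 72};
        long[] atomicMapMs = {86, 203, 508};

        for (int i = 0; i < transactionSizes.length; i++) {
            int size = transactionSizes[i];
            System.out.printf(Locale.ROOT,
                    "transactionSize=%d: plain %.1f ms/put, atomicMap %.1f ms/put, slowdown %.1fx%n",
                    size,
                    (double) plainMs[i] / size,
                    (double) atomicMapMs[i] / size,
                    slowdown(plainMs[i], atomicMapMs[i]));
        }
    }
}
```

                With these figures the gap widens from roughly 5x at one put per transaction to roughly 7x at ten puts per transaction.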

                 

                Also, the test implementation follows. We have wrapped Infinispan in a wrapper for our own use, which is dumb and simply calls Infinispan's methods.

                 

                 

                 

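                // JDK imports used below; imports for the project-specific classes are omitted

                import java.io.FileInputStream;

                import java.util.Comparator;

                import java.util.HashMap;

                import java.util.Iterator;

                import java.util.Map;

                import java.util.Map.Entry;

                import java.util.Properties;

                import java.util.Set;

                import java.util.TreeSet;

                import java.util.concurrent.ExecutorService;

                import java.util.concurrent.Executors;

                import java.util.concurrent.atomic.AtomicLong;

                /**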

                *

                * @author ankur.goenka

                */

                public class CachePerformanceTest {

                 

                   

                 

                    private ExecutorService executor = null;

                    private int numberOfThreads;

                    private int lengthOfKey;

                    private int lengthOfValue;

                    private int frequencyOfWrite;

                    private long totalWrites;

                    private int frequencyOfRead;

                    private long totalReads;

                    private int readWait;

                    private int writeWait;

                    private long bucketSize;

                 

                    private ICacheManagerFactory cacheManagerFactory;

                 

                    private ICacheManager cacheManager;

                 

                    private IReplicatedCache<TestKey, TestValue> cache;

                 

                    private String keyPrefix;

                 

                    private String valuePrefix;

                 

                    private AtomicLong entryNumber = new AtomicLong();

                 

                    private AtomicLong totalWritingDelay = new AtomicLong();

                    private HashMap<Long, AtomicLong> bucketedDelays = new HashMap<Long, AtomicLong>();

                 

                    private AtomicLong transactionTotalWriteDelay = new AtomicLong();

                    private HashMap<Long, AtomicLong> transactionBucketedDelays = new HashMap<Long, AtomicLong>();

                 

                    private String testType;

                 

                    private int transactionSize;

                 

                    // This method is called to setup the test

                    public void statTest() {

                        logger.info("Going to try Starting Cache Performance Test");

                        Properties properties = new Properties();

                        String cacheTestConfig = System.getProperty("cacheTestConfig");

                        if (StringUtils.isStringNull(cacheTestConfig)) {

                            cacheTestConfig = "cacheTestConfig.properties";

                        }

                        logger.info("Going to try Starting Cache Performance Test with configuration file: {0}", cacheTestConfig);

                 

                        try {

                            properties.load(new FileInputStream(cacheTestConfig));

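                        } catch (Exception e) {

                            // Log the failure instead of silently swallowing it; the properties stay empty and the test exits below

                            logger.error(e);

                        }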

                        logger.info("Configuration: {0}", properties);

                 

                        String startTest = properties.getProperty("startTest");

                        if (startTest == null || !startTest.equals("true")) {

                            logger.info("Exiting Cache Performance Test as it is not set to start");

                            return;

                        }

                        testType = properties.getProperty("testType");

                 

                        transactionSize = Integer.parseInt(properties.getProperty("transactionSize"));

                 

                        numberOfThreads = Integer.parseInt(properties.getProperty("numberOfThreads"));

                        bucketSize = Integer.parseInt(properties.getProperty("bucketSize"));

                 

                        lengthOfKey = Integer.parseInt(properties.getProperty("lengthOfKey"));

                        lengthOfValue = Integer.parseInt(properties.getProperty("lengthOfValue"));

                        keyPrefix = "k";

                        for (int i = 0; i <= lengthOfKey; i++) {

                            keyPrefix = keyPrefix + i;

                        }

                 

                        valuePrefix = "v";

                        for (int i = 1; i <= lengthOfValue; i++) {

                            valuePrefix = valuePrefix + i;

                        }

                 

                        frequencyOfWrite = Integer.parseInt(properties.getProperty("frequencyOfWrite"));

                        totalWrites = Long.parseLong(properties.getProperty("totalWrites"));

                        writeWait = 1000 / frequencyOfWrite;

                 

                        frequencyOfRead = Integer.parseInt(properties.getProperty("frequencyOfRead"));

                        totalReads = Long.parseLong(properties.getProperty("totalReads"));

                        readWait = 1000 / frequencyOfRead;

                 

                        ILabsPlatform.addPlatformListener(this);

                    }

                 

                    @Override

                    public void platformStarted() throws Exception {

                 

                        new Thread(new Runnable() {

                 

                            @Override

                            public void run() {

                                runTest();

                            }

                        }).start();

                    }

                 

                    // This method is called once our system (which is based on OSGi) is up and test starts from here

                    private void runTest() {

                        logger.info("Cache test started with type: {0}", testType);

                        executor = Executors.newFixedThreadPool(numberOfThreads);

                 

                        cacheManagerFactory = CachePlugin.getDefault().getService(ICacheManagerFactory.class);

                 

                        cacheManager = cacheManagerFactory.getCacheManager("PERF_TEST_CACHE");

                 

                        cacheManager.registerCache("testCache");

                 

                        //Here we get the cache

                        cache = cacheManager.getCache("testCache");

                 

                        // If the test is set to use an atomic map, we use the AHM from here on; otherwise we use the regular cache. Both are maps, so the rest of the test does not depend on which one we pick here

                        final Map<TestKey, TestValue> cacheMap = StringUtils.areStringsEqual(testType, "atomicMap") ? cache

                                .getAtomicMap(getTestKey("atomicMap")) : cache;

                        logger.info("CacheClass: {0}", cacheMap.getClass());

                 

                        new Thread(new Runnable() {

                 

                            @Override

                            public void run() {

                                Runnable runnable = new Runnable() {

                 

                                    @Override

                                    public void run() {

                 

                                        try {

                                            boolean shouldPrintStats = false;

                                            long transactionStartTime = System.currentTimeMillis();

                                            // Here we are starting the transaction

                                            cacheManager.beginTransaction();

                                            for (int i = 0; i < transactionSize; i++) {

                                                long entry = entryNumber.incrementAndGet();

                                                String suffix = entry + "";

                                                TestKey testKey = getTestKey(suffix);

                                                TestValue testValue = getTestValue(suffix);

                                                long cachePutStart = System.currentTimeMillis();

                                                cacheMap.put(testKey, testValue);

                                                long delay = System.currentTimeMillis() - cachePutStart;

                                                manageDelay(delay, totalWritingDelay, bucketedDelays);

                                                if (entry == totalWrites) {

                                                    shouldPrintStats = true;

                                                    break;

                                                }

                                            }

                                            // Here we are ending the transaction

                                            cacheManager.commitTransaction();

                                            long transactionDelay = System.currentTimeMillis() - transactionStartTime;

                                            manageDelay(transactionDelay, transactionTotalWriteDelay, transactionBucketedDelays);

                                            if (shouldPrintStats) {

                                                printStats();

                                            }

                 

                                        } catch (TransactionException e) {

                                            logger.error(e);

                                        }

                                    }

                 

                                    private void manageDelay(long delay, AtomicLong cacheTotalWriteDelay,

                                            HashMap<Long, AtomicLong> cacheBucketDelays) {

                                        cacheTotalWriteDelay.addAndGet(delay);

                                        Long bucket = delay / bucketSize;

                                        AtomicLong count;

                                        // HashMap is not thread-safe, so even the lookup has to happen under the lock

                                        synchronized (cacheBucketDelays) {

                                            count = cacheBucketDelays.get(bucket);

                                            if (count == null) {

                                                count = new AtomicLong();

                                                cacheBucketDelays.put(bucket, count);

                                            }

                                        }

                                        count.incrementAndGet();

                                    }

                 

                                };

                 

                                logger.info("Going to do first write");

                                for (long writeNumber = 0; writeNumber <= totalWrites; writeNumber++) {

                                    try {

                                        Thread.sleep(writeWait);

                                    } catch (Exception e) {

                                        logger.error(e);

                                    }

                                    executor.execute(runnable);

                                }

                            }

                 

                        }).start();

                    }

                 

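                    private TestValue getTestValue(String suffix) {

                        // Values are built from the value prefix, whose size is driven by lengthOfValue

                        return new TestValue(valuePrefix + suffix);

                    }

                 

                    private TestKey getTestKey(String suffix) {

                        // Keys are built from the key prefix, whose size is driven by lengthOfKey

                        return new TestKey(keyPrefix + suffix);

                    }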

                 

                    private void printStats() {

                        logger.info("Key length: {0}, value length: {1}, transaction size: {2}", lengthOfKey, lengthOfValue,

                                transactionSize);

                        logger.info("Total writes: {0} had delay {1}, average: {2}", entryNumber.get(), totalWritingDelay.get(),

                                totalWritingDelay.get() / entryNumber.get());

                        Set<Entry<Long, AtomicLong>> entrySet = bucketedDelays.entrySet();

                 

                        TreeSet<Entry<Long, AtomicLong>> sortingSet = new TreeSet<Entry<Long, AtomicLong>>(

                                new Comparator<Entry<Long, AtomicLong>>() {

                 

                                    @Override

                                    public int compare(Entry<Long, AtomicLong> o1, Entry<Long, AtomicLong> o2) {

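                                        // Order by count first, then by bucket, so the comparison is consistent and no entry is dropped from the TreeSet

                                        long count1 = o1.getValue().get();

                                        long count2 = o2.getValue().get();

                                        if (count1 != count2)

                                            return count1 < count2 ? -1 : 1;

                                        return o1.getKey().compareTo(o2.getKey());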

                                    }

                                });

                        sortingSet.addAll(entrySet);

                 

                        for (Iterator<Entry<Long, AtomicLong>> iterator = sortingSet.iterator(); iterator.hasNext();) {

                            Entry<Long, AtomicLong> entry = iterator.next();

                            logger.info("delay: {0} count: {1}", entry.getKey() * bucketSize, entry.getValue().get());

                        }

                 

                        logger.info("Total writes: {0} had delay {1}, average write: {2}, average transaction: {3}", entryNumber.get(),

                                transactionTotalWriteDelay.get(), transactionTotalWriteDelay.get() / (entryNumber.get()),

                                (transactionTotalWriteDelay.get() * transactionSize) / entryNumber.get());

                 

                        sortingSet.clear();

                        sortingSet.addAll(transactionBucketedDelays.entrySet());

                 

                        for (Iterator<Entry<Long, AtomicLong>> iterator = sortingSet.iterator(); iterator.hasNext();) {

                            Entry<Long, AtomicLong> entry = iterator.next();

                            logger.info("Transaction delay: {0} count: {1}", entry.getKey() * bucketSize, entry.getValue().get());

                        }

                 

                    }

                 

                }

                • 6. Re: Custom DeltaAware implementation
                  galder.zamarreno

                  @Ankur, thx a million for providing the test and some sample results. Time's a bit tight right now to look into this, but I've created a JIRA in https://issues.jboss.org/browse/ISPN-1989 to look into this more deeply.

                  • 7. Re: Custom DeltaAware implementation
                    mgencur

                    Ankur, if you're still keen on finding the right solution for DeltaAware implementation, look at http://infinispan.blogspot.cz/2012/06/fine-grained-replication-in-infinispan.html