1. Re: custom delta aware implementation
galder.zamarreno Apr 11, 2012 12:17 PM (in response to agrawal24_7)
2. Re: custom delta aware implementation
galder.zamarreno Apr 11, 2012 12:18 PM (in response to galder.zamarreno) My reply:
--------------
It's probably better if you simply use an AtomicHashMap, which is a construct within Infinispan. It allows you to group a series of key/value pairs as a single value. Infinispan can detect changes in this AtomicHashMap because it implements the DeltaAware interface. AHM is a higher-level construct than DeltaAware, and one that probably suits you better.
To give you an example of where AtomicHashMaps are used: they're heavily used by JBoss AS7 HTTP session replication, where each session id is mapped to an AtomicHashMap. This means that we can detect when individual session data changes and replicate only that.
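For context, the idea behind AHM/DeltaAware can be sketched in plain Java. This is only an illustration of the delta-tracking pattern (a hypothetical class, not Infinispan's actual implementation): a wrapper remembers which keys changed, so that on replication only those entries need to travel.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Standalone sketch of the DeltaAware idea: track which keys were
// modified so replication can ship only the delta, not the whole map.
public class DeltaTrackingMap<K, V> {
    private final Map<K, V> data = new HashMap<K, V>();
    private final Set<K> dirtyKeys = new HashSet<K>();

    public void put(K key, V value) {
        data.put(key, value);
        dirtyKeys.add(key); // remember what changed
    }

    public V get(K key) {
        return data.get(key);
    }

    // Extract only the modified entries, then reset the dirty set,
    // loosely mirroring DeltaAware's delta()/commit() semantics.
    public Map<K, V> extractDelta() {
        Map<K, V> delta = new HashMap<K, V>();
        for (K k : dirtyKeys) {
            delta.put(k, data.get(k));
        }
        dirtyKeys.clear();
        return delta;
    }
}
```

After a change to a single key, `extractDelta()` returns a one-entry map rather than the full state, which is the property that makes per-key replication cheap.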
Cheers, Galder
3. Re: custom delta aware implementation
galder.zamarreno Apr 11, 2012 12:19 PM (in response to galder.zamarreno) Ankur's reply:
---------------------
Thanks for the response. We tried using AHM, but somehow the performance was not that great; that's when we thought of implementing our own DeltaAware object. I have a distributed cache over 2 machines. When putting data directly into the cache we get an average time of 72 ms per transaction with 10 puts in a single transaction, whereas when we did the same thing through an AHM instead of putting into the cache directly, it took an average of 508 ms per transaction with the same key and value sizes (both strings) and 10 puts per transaction. Is there a way I can improve AHM performance?
4. Re: custom delta aware implementation
galder.zamarreno Apr 11, 2012 12:22 PM (in response to galder.zamarreno) Ankur, do you have that performance test around? It'd be interesting to see if we can improve the performance of AHM in your particular case.
For DeltaAware, we don't really have any other examples than what's done for AHM, so I'd suggest mimicking what we do there for your object.
Make sure that you provide a way to serialize the DeltaAware impl, i.e. provide an externalizer for it: https://docs.jboss.org/author/x/PwY5
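To illustrate the write/read symmetry an externalizer needs, here's a plain-Java sketch using the standard java.io.Externalizable interface. The class and fields are hypothetical; Infinispan's own Externalizer API (described at the link above) is a separate class registered with the marshalling framework, but the serialization logic follows the same pattern: write only the fields you need, and read them back in the same order.

```java
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;

// Hypothetical delta payload showing the write/read symmetry that any
// externalizer-style serialization needs.
public class SessionDelta implements Externalizable {
    private String key;
    private String value;

    public SessionDelta() {
        // public no-arg constructor required by Externalizable
    }

    public SessionDelta(String key, String value) {
        this.key = key;
        this.value = value;
    }

    @Override
    public void writeExternal(ObjectOutput out) throws IOException {
        out.writeUTF(key);   // write only the changed fields
        out.writeUTF(value);
    }

    @Override
    public void readExternal(ObjectInput in) throws IOException {
        key = in.readUTF();  // read back in exactly the same order
        value = in.readUTF();
    }

    public String getKey() {
        return key;
    }

    public String getValue() {
        return value;
    }
}
```

Any mismatch between the write order and the read order corrupts the stream, which is why keeping the two methods side by side is a good habit.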
5. Re: custom delta aware implementation
agrawal24_7 Apr 11, 2012 12:55 PM (in response to galder.zamarreno) Thanks for taking the time to resolve the issue.
Yes, I have the performance test around. Following are some more results of that test.
Configuration: {transactionSize=1, lengthOfKey=100, numberOfThreads=10, frequencyOfWrite=1000, lengthOfValue=1000, totalWrites=1000}
Total writes: 1,001 had delay 16,161, average transaction time: 16
------------------------------------------------------------------------------------------------------------------------------
Configuration: {transactionSize=3, lengthOfKey=100, numberOfThreads=10, frequencyOfWrite=1000, lengthOfValue=1000, totalWrites=1000}
Total writes: 1,036 had delay 11,189, average transaction time: 32
------------------------------------------------------------------------------------------------------------------------------
Configuration: {transactionSize=10, lengthOfKey=100, numberOfThreads=10, frequencyOfWrite=1000, lengthOfValue=1000, totalWrites=1000}
Total writes: 1,060 had delay 7,653, average transaction time: 72
------------------------------------------------------------------------------------------------------------------------------
Configuration: {transactionSize=10, lengthOfKey=1000, numberOfThreads=10, frequencyOfWrite=1000, lengthOfValue=10000, totalWrites=1000}
Total writes: 1,088 had delay 21,932, average transaction time: 201
------------------------------------------------------------------------------------------------------------------------------
Configuration: {transactionSize=1, lengthOfKey=100, numberOfThreads=10, frequencyOfWrite=1000, lengthOfValue=1000, totalWrites=1000}
Total writes: 1,001 had delay 16,609, average transaction time: 16
------------------------------------------------------------------------------------------------------------------------------
Configuration: {testType=atomicMap, transactionSize=1, lengthOfKey=100, numberOfThreads=10, frequencyOfWrite=1000, lengthOfValue=1000, totalWrites=1000}
Total writes: 1,001 had delay 86,665, average transaction time: 86
------------------------------------------------------------------------------------------------------------------------------
Configuration: {testType=atomicMap, transactionSize=3, lengthOfKey=100, numberOfThreads=10, frequencyOfWrite=1000, lengthOfValue=1000, totalWrites=1000}
Total writes: 1,027 had delay 69,830, average transaction time: 203
------------------------------------------------------------------------------------------------------------------------------
Configuration: {testType=atomicMap, transactionSize=10, lengthOfKey=100, numberOfThreads=10, frequencyOfWrite=1000, lengthOfValue=1000, totalWrites=1000}
Total writes: 1,090 had delay 55,374, average transaction time: 508
Also, following is the test implementation. We have wrapped Infinispan with a wrapper for our use, which is dumb and simply calls Infinispan's methods.
/**
*
* @author ankur.goenka
*/
public class CachePerformanceTest {
private ExecutorService executor = null;
private int numberOfThreads;
private int lengthOfKey;
private int lengthOfValue;
private int frequencyOfWrite;
private long totalWrites;
private int frequencyOfRead;
private long totalReads;
private int readWait;
private int writeWait;
private long bucketSize;
private ICacheManagerFactory cacheManagerFactory;
private ICacheManager cacheManager;
private IReplicatedCache<TestKey, TestValue> cache;
private String keyPrefix;
private String valuePrefix;
private AtomicLong entryNumber = new AtomicLong();
private AtomicLong totalWritingDelay = new AtomicLong();
private HashMap<Long, AtomicLong> bucketedDelays = new HashMap<Long, AtomicLong>();
private AtomicLong transactionTotalWriteDelay = new AtomicLong();
private HashMap<Long, AtomicLong> transactionBucketedDelays = new HashMap<Long, AtomicLong>();
private String testType;
private int transactionSize;
// This method is called to set up the test
public void startTest() {
logger.info("Going to try Starting Cache Performance Test");
Properties properties = new Properties();
String cacheTestConfig = System.getProperty("cacheTestConfig");
if (StringUtils.isStringNull(cacheTestConfig)) {
cacheTestConfig = "cacheTestConfig.properties";
}
logger.info("Going to try Starting Cache Performance Test with configuration file: {0}", cacheTestConfig);
try {
properties.load(new FileInputStream(cacheTestConfig));
} catch (Exception e) {
logger.error(e); // don't silently swallow configuration load failures
}
logger.info("Configuration: {0}", properties);
String startTest = properties.getProperty("startTest");
if (startTest == null || !startTest.equals("true")) {
logger.info("Exiting Cache Performance Test as it is not set to start");
return;
}
testType = properties.getProperty("testType");
transactionSize = Integer.parseInt(properties.getProperty("transactionSize"));
numberOfThreads = Integer.parseInt(properties.getProperty("numberOfThreads"));
bucketSize = Integer.parseInt(properties.getProperty("bucketSize"));
lengthOfKey = Integer.parseInt(properties.getProperty("lengthOfKey"));
lengthOfValue = Integer.parseInt(properties.getProperty("lengthOfValue"));
keyPrefix = "k";
for (int i = 1; i <= lengthOfKey; i++) {
keyPrefix = keyPrefix + i;
}
valuePrefix = "v";
for (int i = 1; i <= lengthOfValue; i++) {
valuePrefix = valuePrefix + i;
}
frequencyOfWrite = Integer.parseInt(properties.getProperty("frequencyOfWrite"));
totalWrites = Long.parseLong(properties.getProperty("totalWrites"));
writeWait = 1000 / frequencyOfWrite;
frequencyOfRead = Integer.parseInt(properties.getProperty("frequencyOfRead"));
totalReads = Long.parseLong(properties.getProperty("totalReads"));
readWait = 1000 / frequencyOfRead;
ILabsPlatform.addPlatformListener(this);
}
@Override
public void platformStarted() throws Exception {
new Thread(new Runnable() {
@Override
public void run() {
runTest();
}
}).start();
}
// This method is called once our system (which is based on OSGi) is up and test starts from here
private void runTest() {
logger.info("Cache test started with type: {0}", testType);
executor = Executors.newFixedThreadPool(numberOfThreads);
cacheManagerFactory = CachePlugin.getDefault().getService(ICacheManagerFactory.class);
cacheManager = cacheManagerFactory.getCacheManager("PERF_TEST_CACHE");
cacheManager.registerCache("testCache");
//Here we get the cache
cache = cacheManager.getCache("testCache");
// If the test is set to use the atomic map then we use an AHM, otherwise the regular cache; both are Maps, so the rest of the test does not depend on which we pick here
final Map<TestKey, TestValue> cacheMap = StringUtils.areStringsEqual(testType, "atomicMap") ? cache
.getAtomicMap(getTestKey("atomicMap")) : cache;
logger.info("CacheClass: {0}", cacheMap.getClass());
new Thread(new Runnable() {
@Override
public void run() {
Runnable runnable = new Runnable() {
@Override
public void run() {
try {
boolean shouldPrintStats = false;
long transactionStartTime = System.currentTimeMillis();
// Here we are starting the transaction
cacheManager.beginTransaction();
for (int i = 0; i < transactionSize; i++) {
long entry = entryNumber.incrementAndGet();
String suffix = entry + "";
TestKey testKey = getTestKey(suffix);
TestValue testValue = getTestValue(suffix);
long cachePutStart = System.currentTimeMillis();
cacheMap.put(testKey, testValue);
long delay = System.currentTimeMillis() - cachePutStart;
manageDelay(delay, totalWritingDelay, bucketedDelays);
if (entry == totalWrites) {
shouldPrintStats = true;
break;
}
}
// Here we are ending the transaction
cacheManager.commitTransaction();
long transactionDelay = System.currentTimeMillis() - transactionStartTime;
manageDelay(transactionDelay, transactionTotalWriteDelay, transactionBucketedDelays);
if (shouldPrintStats) {
printStats();
}
} catch (TransactionException e) {
logger.error(e);
}
}
private void manageDelay(long delay, AtomicLong cacheTotalWriteDelay,
HashMap<Long, AtomicLong> cacheBucketDelays) {
cacheTotalWriteDelay.addAndGet(delay);
Long bucket = delay / bucketSize;
AtomicLong count;
// HashMap is not thread-safe: guard the read as well as the write,
// since this runs concurrently on the executor's worker threads
synchronized (cacheBucketDelays) {
count = cacheBucketDelays.get(bucket);
if (count == null) {
count = new AtomicLong();
cacheBucketDelays.put(bucket, count);
}
}
count.incrementAndGet();
}
};
logger.info("Going to do first write");
for (long writeNumber = 0; writeNumber <= totalWrites; writeNumber++) {
try {
Thread.sleep(writeWait);
} catch (Exception e) {
logger.error(e);
}
executor.execute(runnable);
}
}
}).start();
}
private TestValue getTestValue(String suffix) {
// valuePrefix for values (the original had the key/value prefixes swapped)
return new TestValue(valuePrefix + suffix);
}
private TestKey getTestKey(String suffix) {
return new TestKey(keyPrefix + suffix);
}
private void printStats() {
logger.info("Key length: {0}, value length: {1}, transaction size: {2}", lengthOfKey, lengthOfValue,
transactionSize);
logger.info("Total writes: {0} had delay {1}, average: {2}", entryNumber.get(), totalWritingDelay.get(),
totalWritingDelay.get() / entryNumber.get());
Set<Entry<Long, AtomicLong>> entrySet = bucketedDelays.entrySet();
TreeSet<Entry<Long, AtomicLong>> sortingSet = new TreeSet<Entry<Long, AtomicLong>>(
new Comparator<Entry<Long, AtomicLong>>() {
@Override
public int compare(Entry<Long, AtomicLong> o1, Entry<Long, AtomicLong> o2) {
// order buckets by delay; the previous comparator mixed value and
// key comparisons, violating the Comparator contract and breaking TreeSet
return o1.getKey().compareTo(o2.getKey());
}
});
sortingSet.addAll(entrySet);
for (Iterator<Entry<Long, AtomicLong>> iterator = sortingSet.iterator(); iterator.hasNext();) {
Entry<Long, AtomicLong> entry = iterator.next();
logger.info("delay: {0} count: {1}", entry.getKey() * bucketSize, entry.getValue().get());
}
logger.info("Total writes: {0} had delay {1}, average write: {2}, average transaction: {3}", entryNumber.get(),
transactionTotalWriteDelay.get(), transactionTotalWriteDelay.get() / (entryNumber.get()),
(transactionTotalWriteDelay.get() * transactionSize) / entryNumber.get());
sortingSet.clear();
sortingSet.addAll(transactionBucketedDelays.entrySet());
for (Iterator<Entry<Long, AtomicLong>> iterator = sortingSet.iterator(); iterator.hasNext();) {
Entry<Long, AtomicLong> entry = iterator.next();
logger.info("Transaction delay: {0} count: {1}", entry.getKey() * bucketSize, entry.getValue().get());
}
}
}
6. Re: custom delta aware implementation
galder.zamarreno Apr 18, 2012 4:50 AM (in response to agrawal24_7) @Ankur, thanks a million for providing the test and some sample results. Time's a bit tight right now to look into this, but I've created a JIRA at https://issues.jboss.org/browse/ISPN-1989 to look into this more deeply.
7. Re: custom delta aware implementation
mgencur Jun 26, 2012 9:03 AM (in response to galder.zamarreno) Ankur, if you're still keen on finding the right solution for a DeltaAware implementation, look at http://infinispan.blogspot.cz/2012/06/fine-grained-replication-in-infinispan.html
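The gist of that post is that fine-grained replication ships only the keys that changed, instead of the whole map as one coarse delta. The difference can be sketched in standalone Java (hypothetical names, not the Infinispan API):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Illustration of the payload-size difference between coarse and
// fine-grained replication of a map: a coarse delta carries the whole
// state, a fine-grained delta carries only the touched keys.
public class ReplicationPayloads {

    // Coarse: every replication event ships the entire map.
    public static Map<String, String> coarseDelta(Map<String, String> state) {
        return new HashMap<String, String>(state);
    }

    // Fine-grained: only the entries whose keys actually changed travel.
    public static Map<String, String> fineGrainedDelta(Map<String, String> state,
                                                       Set<String> changedKeys) {
        Map<String, String> delta = new HashMap<String, String>();
        for (String k : changedKeys) {
            delta.put(k, state.get(k));
        }
        return delta;
    }
}
```

With a 100-entry map and a single modified key, the coarse payload carries 100 entries while the fine-grained one carries 1, which is why the fine-grained approach matters for workloads like Ankur's (many small puts into a shared map).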