8 Replies Latest reply: Jun 15, 2007 10:11 AM by Vladimir Blagojevic RSS

STREAMING_STATE_TRANSFER fails with lots of data.

hernan silberman Newbie

Setup: I'm using Java 5 and JBossCache CR2.0 on Linux.

I have two cache instances with their own BDBJE cache loaders configured to "fetchPersistentState" on startup. This works with a standard STATE_TRANSFER and also works for STREAMING_STATE_TRANSFERs except when I have lots of cache state to replicate (about 500MB in this case).

I start up the first cache and it loads the state from its cache loader, everything looks fine. Then I start the second cache and I can see it contact the first cache to request an initial state transfer. Here's what the first cache sees when the new cache calls up asking for the initial state:

DEBUG 17:26.49 [UpHandler (GMS)] STREAMING_STATE_TRANSFER - GET_DIGEST_STATE_OK: digest is [0 : 5], [0 : 0]
DEBUG 17:26.49 [StateProviderThreadSpawner] STREAMING_STATE_TRANSFER$StateProviderThreadSpawner - StateProviderThreadSpawner listening at
DEBUG 17:26.49 [StateProviderThreadSpawner] STREAMING_STATE_TRANSFER$StateProviderThreadSpawner - StateProviderThreadSpawner listening at
DEBUG 17:26.49 [STREAMING_STATE_TRANSFER.poolid=1] STREAMING_STATE_TRANSFER$StateProviderThreadSpawner$1 - Accepted request for state transfer from / handing of to PooledExecutor thread
DEBUG 17:26.49 [STREAMING_STATE_TRANSFER.poolid=1] STREAMING_STATE_TRANSFER$StateProviderHandler - Running on Thread[STREAMING_STATE_TRANSFER.poolid=1,5,JGroups threads]. Accepted request for state transfer from /, original buffer size was 8192 and was reset to 8192, passing outputstream up...
INFO 17:26.49 [STREAMING_STATE_TRANSFER.poolid=1] StateTransferManager - locking the / subtree to return the in-memory (transient) state
DEBUG 17:26.52 [Timer-1] FD$Monitor - sending are-you-alive msg to (own address=
DEBUG 17:26.52 [UpHandler (FD)] FD - received ack from
DEBUG 17:26.55 [Timer-1] FD$Monitor - sending are-you-alive msg to (own address=
DEBUG 17:26.55 [UpHandler (FD)] FD - received ack from
DEBUG 17:26.58 [Timer-1] FD$Monitor - sending are-you-alive msg to (own address=
DEBUG 17:26.58 [UpHandler (FD)] FD - received ack from
INFO 17:26.59 [STREAMING_STATE_TRANSFER.poolid=1] StateTransferManager - Successfully generated state in 9929 msec
DEBUG 17:26.59 [STREAMING_STATE_TRANSFER.poolid=1] STREAMING_STATE_TRANSFER$StreamingOutputStreamWrapper - State writer Socket[addr=/,port=43102,localport=58357] is closing the socket
DEBUG 17:27.00 [MERGE2.FindSubgroups thread (channel=ACS_CLUSTER)] MERGE2$FindSubgroups - initial_mbrs=[[own_addr=, coord_addr=, is_server=true], [own_addr=, coord_addr=, is_server=true]]
DEBUG 17:27.01 [Timer-1] FD$Monitor - sending are-you-alive msg to (own address=
DEBUG 17:27.01 [UpHandler (FD)] FD - received ack from
DEBUG 17:27.04 [Timer-1] FD$Monitor - sending are-you-alive msg to (own address=
DEBUG 17:27.04 [UpHandler (FD)] FD - received ack from
DEBUG 17:27.07 [Timer-1] FD$Monitor - sending are-you-alive msg to (own address=
DEBUG 17:27.07 [UpHandler (FD)] FD - received ack from
DEBUG 17:27.09 [UpHandler (GMS)] GMS - received LEAVE_REQ for from
DEBUG 17:27.09 [ViewHandler] CoordGmsImpl - new=[], suspected=[], leaving=[], new view: [|2] []
DEBUG 17:27.09 [UpHandler (GMS)] CoordGmsImpl - view=[|2] []
DEBUG 17:27.09 [UpHandler (GMS)] GMS - [local_addr=] view is [|2] []
INFO 17:27.09 [UpHandler (GMS)] CacheImpl$MembershipListenerAdaptor - viewAccepted(): [|2] []
DEBUG 17:27.09 [UpHandler (GMS)] NAKACK - removing from received_msgs (not member anymore)
DEBUG 17:27.09 [DownHandler (FD)] FD$Broadcaster - suspected_mbrs: [], after adjustment: []

The new cache's log appears below, you can see it start up and eventually fail during the initial state transfer:

DEBUG 17:26.42 [WrapperSimpleAppMain] XmlConfigurationParser - Attribute size: 18
DEBUG 17:26.42 [WrapperSimpleAppMain] XmlConfigurationParser - Attribute size: 3
DEBUG 17:26.42 [WrapperSimpleAppMain] CacheImpl - cache mode is REPL_SYNC
INFO 17:26.42 [WrapperSimpleAppMain] JChannel - JGroups version: 2.4.1 SP-3
DEBUG 17:26.42 [WrapperSimpleAppMain] ClassConfigurator - mapping is:
1: class org.jgroups.stack.IpAddress
2: class org.jgroups.protocols.CAUSAL$CausalHeader
3: class org.jgroups.protocols.FD$FdHeader
4: class org.jgroups.protocols.FD_PID$FdHeader
5: class org.jgroups.protocols.FD_PROB$FdHeader
6: class org.jgroups.protocols.FD_SOCK$FdHeader
7: class org.jgroups.protocols.FragHeader
8: class org.jgroups.protocols.MERGE$MergeHeader
9: class org.jgroups.protocols.NakAckHeader
10: class org.jgroups.protocols.PARTITIONER$PartitionerHeader
11: class org.jgroups.protocols.PerfHeader
12: class org.jgroups.protocols.PIGGYBACK$PiggybackHeader
13: class org.jgroups.protocols.PingHeader
14: class org.jgroups.protocols.TcpHeader
15: class org.jgroups.protocols.TOTAL$Header
16: class org.jgroups.protocols.TOTAL_OLD$TotalHeader
17: class org.jgroups.protocols.TOTAL_TOKEN$TotalTokenHeader
18: class org.jgroups.protocols.TOTAL_TOKEN$RingTokenHeader
19: class org.jgroups.protocols.TunnelHeader
20: class org.jgroups.protocols.UdpHeader
21: class org.jgroups.protocols.UNICAST$UnicastHeader
22: class org.jgroups.protocols.VERIFY_SUSPECT$VerifyHeader
23: class org.jgroups.protocols.WANPIPE$WanPipeHeader
24: class org.jgroups.protocols.pbcast.GMS$GmsHeader
25: class org.jgroups.protocols.pbcast.NakAckHeader
26: class org.jgroups.protocols.pbcast.PbcastHeader
27: class org.jgroups.protocols.pbcast.STABLE$StableHeader
28: class org.jgroups.protocols.pbcast.STATE_TRANSFER$StateHeader
29: class org.jgroups.protocols.SMACK$SmackHeader
30: class org.jgroups.Message
31: class org.jgroups.View
32: class org.jgroups.ViewId
33: class org.jgroups.util.List
34: interface org.jgroups.Address
35: class org.jgroups.blocks.RequestCorrelator$Header
36: class org.jgroups.protocols.PingRsp
38: class java.util.Vector
39: class org.jgroups.protocols.pbcast.JoinRsp
40: class org.jgroups.protocols.pbcast.Digest
41: class java.util.Hashtable
53: class org.jgroups.protocols.COMPRESS$CompressHeader
54: class org.jgroups.protocols.FC$FcHeader
55: class org.jgroups.protocols.WanPipeAddress
56: class org.jgroups.protocols.TpHeader
57: class org.jgroups.protocols.ENCRYPT$EncryptHeader
58: class org.jgroups.protocols.SEQUENCER$SequencerHeader
59: class org.jgroups.protocols.FD_SIMPLE$FdHeader
60: class org.jgroups.protocols.VIEW_SYNC$ViewSyncHeader

DEBUG 17:26.42 [WrapperSimpleAppMain] AUTOCONF - frag_size=64000
DEBUG 17:26.42 [WrapperSimpleAppMain] GMS - changed role to org.jgroups.protocols.pbcast.ClientGmsImpl
DEBUG 17:26.42 [WrapperSimpleAppMain] VersionAwareMarshaller - Initialised with version 2.0.0 and versionInt 20
DEBUG 17:26.42 [WrapperSimpleAppMain] VersionAwareMarshaller - Using default marshaller class org.jboss.cache.marshall.CacheMarshaller200
INFO 17:26.42 [WrapperSimpleAppMain] CacheImpl - Using marshaller org.jboss.cache.marshall.VersionAwareMarshaller
INFO 17:26.42 [WrapperSimpleAppMain] InterceptorChainFactory - interceptor chain is:
class org.jboss.cache.interceptors.CallInterceptor
class org.jboss.cache.interceptors.EvictionInterceptor
class org.jboss.cache.interceptors.CacheStoreInterceptor
class org.jboss.cache.interceptors.CacheLoaderInterceptor
class org.jboss.cache.interceptors.UnlockInterceptor
class org.jboss.cache.interceptors.PessimisticLockInterceptor
class org.jboss.cache.interceptors.ReplicationInterceptor
class org.jboss.cache.interceptors.NotificationInterceptor
class org.jboss.cache.interceptors.TxInterceptor
class org.jboss.cache.interceptors.CacheMgmtInterceptor
class org.jboss.cache.interceptors.InvocationContextInterceptor
DEBUG 17:26.47 [WrapperSimpleAppMain] BdbjeCacheLoader - Created JE environment com.sleepycat.je.Environment@44f787 for cache loader org.jboss.cache.loader.bdbje.BdbjeCacheLoader@1d6fbb3
DEBUG 17:26.47 [DownHandler (UDP)] UDP - creating sockets and starting threads
INFO 17:26.47 [DownHandler (UDP)] UDP - sockets will use interface
INFO 17:26.47 [DownHandler (UDP)] UDP - socket information:
local_addr=, mcast_addr=, bind_addr=/, ttl=64
sock: bound to, receive buffer size=80000, send buffer size=131071
mcast_recv_sock: bound to, send buffer size=131071, receive buffer size=80000
mcast_send_sock: bound to, send buffer size=131071, receive buffer size=80000
DEBUG 17:26.47 [DownHandler (UDP)] UDP - created unicast receiver thread
DEBUG 17:26.49 [DownHandler (GMS)] ClientGmsImpl - initial_mbrs are [[own_addr=, coord_addr=, is_server=true]]
DEBUG 17:26.49 [DownHandler (GMS)] ClientGmsImpl - election results: {}
DEBUG 17:26.49 [DownHandler (GMS)] ClientGmsImpl - sending handleJoin( to
DEBUG 17:26.49 [DownHandler (GMS)] ClientGmsImpl - []: JoinRsp=[|1] [,]
DEBUG 17:26.49 [DownHandler (GMS)] ClientGmsImpl - new_view=[|1] [,]
DEBUG 17:26.49 [DownHandler (GMS)] GMS - [local_addr=] view is [|1] [,]
INFO 17:26.49 [DownHandler (GMS)] CacheImpl$MembershipListenerAdaptor - viewAccepted(): [|1] [,]
DEBUG 17:26.49 [DownHandler (FD)] FD$Broadcaster - suspected_mbrs: [], after adjustment: []
DEBUG 17:26.49 [DownHandler (GMS)] GMS - changed role to org.jgroups.protocols.pbcast.ParticipantGmsImpl
INFO 17:26.49 [WrapperSimpleAppMain] CacheImpl - CacheImpl local address is
DEBUG 17:26.49 [WrapperSimpleAppMain] STREAMING_STATE_TRANSFER - Member asking for full state
DEBUG 17:26.49 [DownHandler (GMS)] STABLE - suspending message garbage collection
DEBUG 17:26.49 [DownHandler (GMS)] STABLE - resume task started, max_suspend_time=22000
DEBUG 17:26.49 [UpHandler (GMS)] STREAMING_STATE_TRANSFER - Connecting to state provider /, original buffer size was 43690 and was reset to 8192
DEBUG 17:26.49 [UpHandler (GMS)] STREAMING_STATE_TRANSFER - Connected to state provider, my end of the socket is / passing inputstream up...
DEBUG 17:26.52 [Timer-1] FD$Monitor - sending are-you-alive msg to (own address=
DEBUG 17:26.52 [UpHandler (FD)] FD - received ack from
DEBUG 17:26.55 [Timer-1] FD$Monitor - sending are-you-alive msg to (own address=
DEBUG 17:26.55 [UpHandler (FD)] FD - received ack from
INFO 17:26.56 [STREAMING_STATE_TRANSFER.reader] StateTransferManager - starting state integration at node UnversionedNode[ / data=[] RL]
DEBUG 17:26.58 [Timer-1] FD$Monitor - sending are-you-alive msg to (own address=
DEBUG 17:26.58 [UpHandler (FD)] FD - received ack from
DEBUG 17:27.01 [Timer-1] FD$Monitor - sending are-you-alive msg to (own address=
DEBUG 17:27.01 [UpHandler (FD)] FD - received ack from
DEBUG 17:27.04 [Timer-1] FD$Monitor - sending are-you-alive msg to (own address=
DEBUG 17:27.04 [UpHandler (FD)] FD - received ack from
DEBUG 17:27.07 [Timer-1] FD$Monitor - sending are-you-alive msg to (own address=
DEBUG 17:27.07 [UpHandler (FD)] FD - received ack from
DEBUG 17:27.09 [DownHandler (GMS)] STABLE - resuming message garbage collection
DEBUG 17:27.09 [DownHandler (GMS)] ParticipantGmsImpl - sending LEAVE request to (local_addr=
DEBUG 17:27.09 [DownHandler (GMS)] GMS - changed role to org.jgroups.protocols.pbcast.ClientGmsImpl
DEBUG 17:27.09 [DownHandler (UDP)] UDP - closing sockets and stopping threads
DEBUG 17:27.09 [UDP mcast receiver] UDP - multicast thread terminated
DEBUG 17:27.09 [DownHandler (UDP)] UDP - multicast receive socket closed
DEBUG 17:27.09 [DownHandler (UDP)] UDP - multicast send socket closed
DEBUG 17:27.09 [DownHandler (UDP)] UDP - socket closed
DEBUG 17:27.09 [UDP.UcastReceiverThread] UDP$UcastReceiver - unicast receiver socket is closed, exception=java.net.SocketException: Socket closed
DEBUG 17:27.09 [UDP.UcastReceiverThread] UDP$UcastReceiver - unicast receiver thread terminated
ERROR 17:27.09 [WrapperSimpleAppMain] Main - Error starting ShardServer.
org.jboss.cache.CacheException: Unable to fetch state on startup
 at org.jboss.cache.CacheImpl.start(CacheImpl.java:791)
 at org.jboss.cache.DefaultCacheFactory.createCache(DefaultCacheFactory.java:87)
 at org.jboss.cache.DefaultCacheFactory.createCache(DefaultCacheFactory.java:58)
 at org.jboss.cache.DefaultCacheFactory.createCache(DefaultCacheFactory.java:51)
 at com.scea.scne.acs.shard.cache.JBossCacheShardStore.initialize(JBossCacheShardStore.java:44)
 at com.scea.scne.acs.shard.cache.JBossCacheShardStore.<init>(JBossCacheShardStore.java:35)
 at com.scea.scne.acs.shard.server.Main.buildShardStore(Main.java:65)
 at com.scea.scne.acs.shard.server.Main.main(Main.java:28)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
 at java.lang.reflect.Method.invoke(Method.java:585)
 at org.tanukisoftware.wrapper.WrapperSimpleApp.run(WrapperSimpleApp.java:240)
 at java.lang.Thread.run(Thread.java:595)
Caused by: org.jboss.cache.CacheException: Initial state transfer failed: Channel.getState() returned false
 at org.jboss.cache.CacheImpl.fetchStateOnStartup(CacheImpl.java:1190)
 at org.jboss.cache.CacheImpl.start(CacheImpl.java:783)
 ... 13 more

I've tried changing lots of my settings based on other forum posts like this one and a reading of the JGroups documentation but nothing seems to make a difference--I see the same sequence of events where the new cache decides it's time to leave the cluster:

DEBUG 17:27.09 [DownHandler (GMS)] STABLE - resuming message garbage collection
DEBUG 17:27.09 [DownHandler (GMS)] ParticipantGmsImpl - sending LEAVE request to (local_addr=

I've tried changing my transaction isolation level to make sure there aren't any lock-related issues here and I'm running this test without any cache clients reading or writing to either cache so they should be unencumbered and free to exchange state.

I have tested quite a bit with these two machines. They replicate changes properly and the STREAMING_STATE_TRANSFER works until the data gets too big which makes me think that something is timing out but I've had no luck understanding it.

Here's my cache configuration:
<?xml version="1.0" encoding="UTF-8"?>

<!-- ===================================================================== -->
<!-- -->
<!-- Sample JBoss Cache Service Configuration -->
<!-- -->
<!-- ===================================================================== -->

 <classpath codebase="./lib" archives="jboss-cache.jar, jgroups.jar"/>

 <!-- ==================================================================== -->
 <!-- Defines JBoss Cache configuration -->
 <!-- ==================================================================== -->

 <mbean code="org.jboss.cache.CacheImpl" name="jboss.cache:service=Cache">

 <!-- Configure the TransactionManager -->
 <attribute name="TransactionManagerLookupClass">org.jboss.cache.transaction.GenericTransactionManagerLookup</attribute>
 <!-- Use our custom TxManagerLookup class -->
 <attribute name="TransactionManagerLookupClass">com.scea.scne.acs.shard.tx.CustomTxManagerLookup</attribute>

 Node locking level : SERIALIZABLE
 <attribute name="IsolationLevel">READ_COMMITTED</attribute>

 <!-- Lock parent before doing node additions/removes -->
 <attribute name="LockParentForChildInsertRemove">false</attribute>

 Valid modes are LOCAL
 <attribute name="CacheMode">REPL_SYNC</attribute>

 <!-- Name of cluster. Needs to be the same for all JBoss Cache nodes in a
 cluster in order to find each other. -->

 <attribute name="ClusterName">ACS_CLUSTER</attribute>

 <!--Uncomment next three statements to enable JGroups multiplexer.
 This configuration is dependent on the JGroups multiplexer being
 registered in an MBean server such as JBossAS. -->
 <attribute name="MultiplexerService">jgroups.mux:name=Multiplexer</attribute>
 <attribute name="MultiplexerStack">fc-fast-minimalthreads</attribute>

 <!-- JGroups protocol stack properties.
 ClusterConfig isn't used if the multiplexer is enabled and successfully initialized.
 <attribute name="ClusterConfig">
 <!-- UDP: if you have a multihomed machine,
 set the bind_addr attribute to the appropriate NIC IP address -->
 <!-- UDP: On Windows machines, because of the media sense feature
 being broken with multicast (even after disabling media sense)
 set the loopback attribute to true -->
 <UDP mcast_addr="" mcast_port="48866"
 ip_ttl="64" ip_mcast="true"
 mcast_send_buf_size="150000" mcast_recv_buf_size="80000"
 ucast_send_buf_size="150000" ucast_recv_buf_size="80000"
 <PING timeout="2000" num_initial_members="3"
 up_thread="false" down_thread="false"/>
 <MERGE2 min_interval="10000" max_interval="20000"/>
 <FD shun="true" up_thread="true" down_thread="true"/>
 <VERIFY_SUSPECT timeout="1500"
 up_thread="false" down_thread="false"/>
 <pbcast.NAKACK gc_lag="50" retransmit_timeout="600,1200,2400,4800"
 max_xmit_size="8192" up_thread="false" down_thread="false"/>
 <UNICAST timeout="600,1200,2400" down_thread="false"/>
 <pbcast.STABLE desired_avg_gossip="20000"
 up_thread="false" down_thread="false"/>
 <FRAG frag_size="8192"
 down_thread="false" up_thread="false"/>
 <pbcast.GMS join_timeout="5000" join_retry_timeout="2000"
 shun="true" print_local_addr="true"/>
 <pbcast.STATE_TRANSFER up_thread="false" down_thread="false"/>
 <pbcast.STREAMING_STATE_TRANSFER down_thread="false" up_thread="false"
 use_flush="true" use_reading_thread="true"/>

 The max amount of time (in milliseconds) we wait until the
 initial state (ie. the contents of the cache) are retrieved from
 existing members in a clustered environment
 <attribute name="InitialStateRetrievalTimeout">30000</attribute>

 Number of milliseconds to wait until all responses for a
 synchronous call have been received.
 <attribute name="SyncReplTimeout">20000</attribute>

 <!-- Max number of milliseconds to wait for a lock acquisition -->
 <attribute name="LockAcquisitionTimeout">15000</attribute>

 <!-- Specific eviction policy configurations.-->
 <attribute name="EvictionPolicyConfig">
 <attribute name="wakeUpIntervalSeconds">5</attribute>
 <!-- This defaults to 200000 if not specified -->
 <attribute name="eventQueueSize">200000</attribute>
 <attribute name="policyClass">org.jboss.cache.eviction.FIFOPolicy</attribute>

 <!-- Cache wide default -->
 <region name="/_default_">
 <attribute name="maxNodes">10000</attribute>
 <attribute name="timeToLiveSeconds">0</attribute>
 <attribute name="maxAgeSeconds">0</attribute>

 <attribute name="CacheLoaderConfiguration">


Thanks for reading this far, I appreciate any insights you may have on this issue.