Version 1

    NOTE that this is defunct and will not be implemented.  Refer to DIST - distributed cache mode

    Partitioning - Buddy Replication on Steroids

     

    Partitioning is the concept of breaking up state in the cache and spreading it around a cluster: across enough nodes that a backup always exists somewhere, and with a data gravitation algorithm intelligent enough that the data is always efficiently available on any given node.

     

    Another way to look at it is buddy replication without the need for session affinity.  (See JBossCacheBuddyReplicationDesign)

     

    The motivation

     

    Buddy replication effectively solves the scalability issue encountered when dealing with synchronous state replication, but imposes a very restrictive rule that session affinity must be present.  While this fits in perfectly with usage where session affinity is available anyway (state replication for HTTP sessions and EJB SFSBs), it is still restrictive for other use cases - such as data grids, where data access patterns are not fixed.

     

    The solution

     

    Involves something very similar to Buddy Replication - with a few changes.

     

    • Still use BuddyGroups to define a replication group

      • Except that buddies in a group are treated as peers, rather than being in a DataOwner and Buddy relationship

        • Let's call this a PartitionGroup, for disambiguation

      • Just like with BR, changes are replicated to the entire PartitionGroup

    • Still use data gravitation as a means of querying the cluster and gravitating data back

      • Except that when data is gravitated back, rather than removing data from the sender, the receiver joins the PartitionGroup of the sender

    • A cache instance can be in several PartitionGroups

    • Each cache instance runs a PartitionMaintenanceThread

      • The purpose of this thread is to make sure a "distributed eviction" takes place

      • Based on hard and soft upper and lower limits, defining how many copies of each data element should exist in the cluster

        • Colocated hosts need to be considered when shrinking partitions.  Should not be counted as a "backup".

      • Potentially re-use the eviction algorithms in deciding which cache instances should evict state

    • Regions would be the granularity of tree nodes moved around

    • Metadata - consisting of PartitionGroups, the region they represent, and peers in the group - would exist in a special "Metadata" partition.  More on this later.

      • Used to prevent unnecessary gravitation attempts for data that may not exist
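    The hard/soft limit check that drives the PartitionMaintenanceThread can be sketched as follows.  This is a minimal Python sketch; the function name, limit names and action labels are illustrative, not part of any real API:

```python
def maintenance_action(copies, hard_lower, soft_lower, soft_upper, hard_upper):
    """Decide what the PartitionMaintenanceThread should do for one partition.

    `copies` is the number of distinct backups currently holding the
    partition's state; colocated hosts should already be excluded from
    this count, as they are not real backups.
    """
    if copies < hard_lower:
        return "RECRUIT_URGENT"   # below the hard floor: recruit a peer immediately
    if copies < soft_lower:
        return "RECRUIT"          # below the soft floor: recruit when convenient
    if copies > hard_upper:
        return "EVICT_URGENT"     # above the hard ceiling: start distributed eviction now
    if copies > soft_upper:
        return "EVICT"            # above the soft ceiling: shrink lazily
    return "OK"                   # within limits: nothing to do
```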

     

    Initial partition group for a given region of data

     

     

    Partition grows

     

     

    • Server C attempts to look up node /a/b.

    • Looking at the metadata, it knows that this is a defined region and it attempts to join the region.

      • This is allowed since this is within the upper and lower limits for the number of peers per partition.

      • Which kicks off a partial state transfer event

      • Will involve updated metadata being broadcast

     

    Partition shrinks

     

     

    • Assuming 3 peers is outside the configuration's soft limit, after a period of time one server will be selected for removal from the partition

      • Configurable algorithms

      • Will involve voting and consensus within the partition

      • Will involve updated metadata being broadcast

     

    Proxying calls

     

    Instances wouldn't always join partitions when they are involved in a transaction that spans a partition.  A partition may be configured such that the call is proxied to an instance which is a member of the partition instead.

     

    The instance proxying the call may later decide, based on the frequency of transactions involving a partition and on how that partition is configured, to stop proxying calls and join the partition instead.  This decision is based on:

     

    • The partition proxy behaviour (ALWAYS_PROXY, ALWAYS_JOIN, THRESHOLD)

    • And a threshold which, when crossed, causes the proxying node to stop proxying and join the partition.

      • The threshold is encoded as a frequency of transactions per minute.
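    The proxy-versus-join decision above can be sketched as follows.  The policy names come from the list above; the function shape itself is an assumption:

```python
ALWAYS_PROXY, ALWAYS_JOIN, THRESHOLD = "ALWAYS_PROXY", "ALWAYS_JOIN", "THRESHOLD"

def should_join(policy, tx_per_minute, threshold):
    """Return True when a proxying node should stop proxying and join the partition."""
    if policy == ALWAYS_JOIN:
        return True
    if policy == ALWAYS_PROXY:
        return False
    # THRESHOLD: join once the observed transaction frequency crosses the limit
    return tx_per_minute > threshold
```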

     

    Selecting an instance to proxy to would be based on a round-robin algorithm, although this could be pluggable.
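    A minimal sketch of the round-robin selection, using Python's itertools; the pluggable-policy machinery is omitted:

```python
import itertools

def round_robin(members):
    """Cycle endlessly through partition members, yielding the next proxy target."""
    return itertools.cycle(members)

targets = round_robin(["A", "B", "C"])
picks = [next(targets) for _ in range(4)]  # "A", "B", "C", then wraps to "A"
```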

     

    Evictions

    Never proxy evictions.

     

    Cache loading

    • When proxying a get, don't ever store the results in a cache loader!

    • Partition or proxy interceptor should sit before the CacheLoaderInterceptor to prevent unnecessary lookups in the cache loader.

    • Make sure state is removed from the cache loader when an instance leaves a partition

      • but not if the cache loader is shared!!

     

    The "Metadata" Partition

     

    This is a special partition, set up to hold metadata (MD) of all partitions.  This MD includes partition region and all members of that partition at any given time.  The purpose of MD is to prevent cluster-wide broadcasts to find data and partition membership when a node needs data it doesn't already have locally.

     

    The original design for handling MD was to replicate it to every node in the cluster, and while viable for small clusters with little MD, this can be cumbersome if the cluster is large and/or the partitions defined are very fine-grained (more partitions -> more MD).  Replicating MD can impact scalability.

     

    We are now considering a special MD partition which always includes the coordinator and has a fixed size.  This way, if a node needs MD, it can look it up by asking a member of the special partition.  Lookups can be load balanced by picking a random number between 0 and numMetadataNodes and asking the JGroups View for the corresponding address.
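    The load-balanced lookup could look like this sketch.  It assumes, for illustration, that the metadata members occupy the first numMetadataNodes positions of the JGroups view (the coordinator is the first address by JGroups convention):

```python
import random

def metadata_lookup_target(view, num_metadata_nodes, rng=random):
    """Pick a member of the special metadata partition to query.

    `view` is the ordered list of addresses from the JGroups View; the
    first `num_metadata_nodes` entries are assumed to hold metadata.
    """
    return view[rng.randrange(num_metadata_nodes)]
```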

     

    When is Metadata needed?

     

    MD is needed whenever a node requires data that it doesn't have locally.  This should be infrequent enough to warrant a remote call to request MD.

     

    MD is only updated when a partition changes membership, which again should be infrequent enough to warrant updating the MD remotely.  Depending on the size/membership of the MD partition, this would be more efficient than replicating the changes to MD across the cluster anyway.

     

    Defaults

     

    The defaults for the MD partition would be tuned for small clusters, with MD partition upper hard limit == upper soft limit == lower hard limit == lower soft limit == cluster size, which will result in effectively replicating MD across all nodes.

     

    Metadata Design

     

    Metadata to contain:

    • Partition name

    • List of partition members

    • Join policy (Always join/always proxy/threshold)

    • hU, sU, hL, sL

    • threshold (encoded as a float; a metric based on frequency of partition accesses after which to join the partition.  Needs to take time into account)
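    The record above might be modelled like this sketch (field names are illustrative; the real encoding is undecided):

```python
from dataclasses import dataclass

@dataclass
class PartitionMetadata:
    """Illustrative shape of the per-partition metadata record."""
    name: str               # partition name
    members: list           # addresses of current partition members
    join_policy: str        # "ALWAYS_JOIN" | "ALWAYS_PROXY" | "THRESHOLD"
    hard_upper: int         # hU
    soft_upper: int         # sU
    hard_lower: int         # hL
    soft_lower: int         # sL
    threshold: float = 0.0  # access-frequency metric for the THRESHOLD policy
```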

     

    MD should not exceed 1k in size.

     

    Updating metadata

     

    • Metadata updates should be done asynchronously, and should be multicast to everyone.

    • When new partitions are created or a change in partition "shape" occurs, this is multicast so everyone gets the MD update.

    • Nodes proxying to or otherwise involved in the partition in question can update their MD.  Others - except the special partition members - would discard.

     

    Caching metadata

    • Instances should cache MD for all partitions they are interested in, such as partitions they participate in or proxy to, and listen for MD update multicasts.

    • Handle failed proxy requests by purging MD for the partition in question and retrieving MD from the "special" partition.

     

    Alternative to MetaData

     

    Consistent hashing? -- needs research.  http://weblogs.java.net/blog/tomwhite/archive/2007/11/consistent_hash.html  So far, consistent hashing will not really help.

     

    Data Gravitation

     

    Data gravitation is a concept we introduced when we implemented buddy replication.  Building on what we have here, we'd need to enhance data gravitation to work with partitions as described here.

     

    NOTE This section talks about FLUSH when transferring state.  This may not be the final implementation, as hopefully we'd have a better state transfer mechanism in place by the time we implement partitioning.  See JBossCacheNewStateTransfer

     

    Performing data gravitation

     

    The current buddy replication implementation does the following when a new buddy is assigned to a buddy group.

     

    • Perform a cluster-wide FLUSH to prevent changes to the cache

    • The data owner of the group generates its state (excluding its BUDDYBACKUP_ region)

    • Performs a state transfer of this state to the new buddy

    • The new buddy applies this state to a new, backup sub-root

    • The FLUSH block is released and the network resumes.

     

    This would be changed, since partition groups only contain the state of a specific region rather than the entire state of a data owner, and the concept of buddy subtrees is removed.

     

    • Perform a cluster-wide FLUSH to prevent changes to the cache

    • A single member of the partition group is selected as the state provider

    • The state provider generates state for the required region

    • This is streamed to the new member joining the region

      • Applied directly on the recipient, not in a backup subtree

    • The FLUSH block is released and the network resumes.

     

    Detecting a "global" cache miss versus a "local" cache miss

     

    As is evident from above, gravitation can be expensive and hence should be minimised.  To do this, we rely on region metadata that is replicated across the cluster.

     

    • If a request for tree node N is received by an instance in the cluster,

      • Check region metadata to determine which region this resides in

      • Check if the current instance is a part of this region

        • If so, the tree node should be available locally

        • If not available locally, create this tree node.

      • If the current instance is not a member of the region,

        • Join region

        • Will involve gravitation, as above

        • Create node if necessary
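    The lookup flow above can be sketched as follows (all names are illustrative; `join_region` stands in for the gravitation machinery):

```python
def lookup(node_path, region_metadata, local_regions, local_cache, join_region):
    """Resolve a request for tree node `node_path` using region metadata.

    `region_metadata` maps node paths to region names; `join_region` is a
    callback that joins a region, gravitating its state over.
    """
    region = region_metadata.get(node_path)
    if region not in local_regions:
        # global miss: join the region first, which gravitates its state here
        join_region(region)
    # member (or just joined): the node should now be local; create it if not
    return local_cache.setdefault(node_path, {})
```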

     

    Distributed eviction

     

    As the PartitionMaintenanceThread runs and decides that a partition has too many members (based on the hard and soft limits), a voting protocol commences to determine which instances should be forced to leave the partition.

     

    • The partition member that initiates the distributed eviction broadcasts a "DE" message to partition members.

    • Each partition member reports a single value representing its 'participation weight' in that partition to the initiator.

    • The initiator uses this to select the member or members with the lowest participation weight, based on the number of members that need to leave the partition group.

      • In the event of a tie, the first instance in the tie is selected

    • The initiator then broadcasts its 'decision' on the instances that are elected to leave the partition.

      • This will contain a new PartitionGroup and updated region metadata.

      • The elected members respond with an acknowledgement, evict the partition state from memory and leave the partition.

      • The other instances just respond with an acknowledgement.
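    Selecting the evictees from the reported weights is then straightforward.  A sketch; Python's stable sort gives the "first instance in the tie" rule for free:

```python
def select_evictees(weights, excess):
    """Given {member: participation weight}, pick `excess` members to leave.

    Lowest weights leave first; the stable sort breaks ties in favour of
    the first member encountered.
    """
    ranked = sorted(weights.items(), key=lambda kv: kv[1])
    return [member for member, _ in ranked[:excess]]
```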

     

    Participation weight algorithm, based on half-life decay of participation importance

     

    To calculate the degree of participation in a partition, each node maintains:

     

    • a degree of participation (Dp)

    • a timestamp of when this was calculated (Dt)

    • a list of timestamps when the partition was accessed (At)

    • a half-life decay constant which is used to reduce the importance of accesses over time (H)

        • E.g., an event that occurred H milliseconds ago is deemed half as important as an event that occurs now.

     

    Every time the PartitionMaintenanceThread runs, OR a DE message is received, each instance recalculates its degree of participation, updates Dp and Dt, and clears At.  If a DE message was received, Dp is sent back to the initiator of the DE.

     

    Dp is calculated using the following algorithm:

     

    • Let Tc be the current time, in millis, of the calculation (System.currentTimeMillis())

    • If Dp != 0, re-weight Dp

      • Dp = Dp x (1/2)^((Tc - Dt)/H)

      • Dt = Tc

    • Now add the new access events; for each access timestamp At in the list:

      • Dp = Dp + (1/2)^((Tc - At)/H)

    • Empty the list of At collected

     

    • This gives us a mechanism for maintaining the participation degree (Dp) without retaining every access event, as these are digested periodically.

    • The algorithm above maintains the weightage of accesses, treating more recent accesses as more important than past accesses.

      • Older accesses decay in importance exponentially, using the half-life constant passed in

    • The number of accesses is factored in as well, since Dp = Dp + f(At) adds a contribution for every access

      • so instances that accessed a partition a significantly larger number of times in the past may still have a higher participation weightage than instances that accessed the partition only once, but very recently.

    • H will be configurable, with a sensible default of 8 hours.
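    The digestion step can be written directly from the formulas above (a sketch; times are in milliseconds):

```python
def decay(weight, elapsed_ms, half_life_ms):
    """Half-life decay: the weight halves every `half_life_ms` milliseconds."""
    return weight * 0.5 ** (elapsed_ms / half_life_ms)

def recalculate_dp(dp, dt, access_times, now, half_life_ms):
    """Digest the accumulated access timestamps (At) into Dp.

    Returns the new (Dp, Dt); the caller then clears the access-time list.
    """
    if dp != 0:
        dp = decay(dp, now - dt, half_life_ms)    # re-weight the old Dp
    for at in access_times:
        dp += decay(1.0, now - at, half_life_ms)  # add each new access event
    return dp, now
```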

     

    Transactions

    We should never allow transactions to span > 1 partition (for now, at least) since this can lead to ugly deadlocks, especially if proxying is involved.

    When proxying calls - even get()s, to maintain Repeatable Read - any transaction context should be propagated, and the proxy node should be able to perform commits and potentially broadcast put()s to its partition.

     

     

    New servers joining the cluster

     

    We would provide configurable behaviour for new nodes joining the cluster.  New nodes would have one of the following "default join" policies:

    JoinNone, JoinSpecifiedPartitions, JoinAll, JoinSome.  Some of these may take additional "join options".

     

    JoinNone

     

    Simple - the server joins the cluster but does not join any partition (except the metadata partition, if the metadata partition is configured to be on all instances).  It is assumed that as requests come to this new node, perhaps via a load balancer, it would join partitions as needed.

     

    JoinSpecifiedPartitions

     

    A configuration option - PartitionsToJoin - would provide a comma-delimited list of region names to join on startup.

     

    JoinAll

     

    Does what it says on the can.

     

    JoinSome

     

    This would take a further configuration option - PartitionSelectionAlgorithm - which would be one of JoinLargest, JoinSmallest, JoinMostHeavilyUsed and JoinLeastHeavilyUsed, as well as another option - NumberOfPartitionsToJoin - which defines how many partitions to select based on the chosen PartitionSelectionAlgorithm.

     

    Largest and smallest are calculated based on number of members; usage is calculated based on degree of participation.
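    The selection could be sketched as follows (the tuple shape and helper name are assumptions):

```python
def select_partitions(metadata, algorithm, n):
    """Pick `n` partitions for a JoinSome startup policy.

    `metadata` is a list of (name, member_count, participation) tuples.
    """
    keys = {
        "JoinLargest":          lambda p: -p[1],  # most members first
        "JoinSmallest":         lambda p:  p[1],
        "JoinMostHeavilyUsed":  lambda p: -p[2],  # highest participation first
        "JoinLeastHeavilyUsed": lambda p:  p[2],
    }
    return [name for name, _, _ in sorted(metadata, key=keys[algorithm])[:n]]
```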

     

    Creating and destroying partitions

    • Do we need an explicit API for destroying a partition?

    • Need to clean up metadata when destroying entire partitions.

    • We need to be able to create new partitions programmatically and on the fly.

      • Needed for HTTP Session use case

      • Creation needs to be optimised for reduced weight and fast response time

     

    Striping

     

    Taking the concept of partitioning further, there is more that can be done if the cache were to be used to store very large data items (for example, a DVD - a byte[] of 4GiB).

     

    Using RAID-style error correction algorithms, the byte[] could be broken up into chunks, the chunks stored in various nodes in the tree, with each node defined as a separate partition.
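    As an illustration only, a simple XOR parity scheme in the style of RAID-5 - one possible interpretation of "RAID-style" - can rebuild any single lost chunk:

```python
from functools import reduce

def xor_bytes(a, b):
    """XOR two equal-length byte strings."""
    return bytes(x ^ y for x, y in zip(a, b))

def stripe(data, n_chunks):
    """Split `data` into n_chunks equal chunks plus one XOR parity chunk."""
    size = -(-len(data) // n_chunks)           # ceiling division
    data = data.ljust(size * n_chunks, b"\0")  # zero-pad to a chunk boundary
    chunks = [data[i * size:(i + 1) * size] for i in range(n_chunks)]
    return chunks, reduce(xor_bytes, chunks)

def rebuild(chunks, parity, missing):
    """Reconstruct the chunk at index `missing` from the others plus parity."""
    rest = [c for i, c in enumerate(chunks) if i != missing]
    return reduce(xor_bytes, rest + [parity])
```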

     

    The alternative is to break down the replication granularity of the tree into something smaller than a tree node - which I am not keen on doing as this means implicit knowledge of the relationships between the attributes in a node.