Non-Blocking State Transfer V2


TODO: this document supersedes Non-blocking State Transfer. Some parts of the old document might still be applicable and should be copied here.

Introduction

This document describes the changes we need to make in order to implement non-blocking state transfer on top of our current blocking push-based state transfer (ISPN-1424).

State transfer is the process of adjusting the cache’s internal state in response to a view change.

  • The view change can be a join, a leave, a merge, or any combination of joins, leaves and merges.

  • Since we support asymmetric clusters, we deal with each cache independently.

  • The view changes we discuss are "cache view changes" and not JGroups view changes. To make a clear distinction, we’ll introduce the term cache topology from now on to refer to a cache view.

The goals of the non-blocking state transfer design are:

  • Minimize the interval(s) where the entire cluster can’t respond to requests because of a state transfer in progress.

  • Minimize the interval(s) where an existing member stops responding to requests because of a state transfer in progress.

  • Allow the performance of the cluster to drop during state transfer, but do not throw any exceptions because of a state transfer in progress

  • Don’t add a mechanism for resolving data conflicts after a merge, but make sure it will be feasible to add one in the future

State transfer components

In distributed caches, the cache state that we need to disseminate between nodes consists of several components:

  • The cache membership

  • The consistent hash

  • The actual cache entries

  • Current transactions + acquired locks

  • The L1 entries + requestors information (only with L1 enabled) [TODO: not implemented for Alpha release]

  • In-doubt transactions information (only with recovery enabled) [TODO: not implemented for Alpha release]

In replicated caches, every member has the same information, there is no L1, and state can only be transferred to a joiner. So we will use a consistent hash even for replicated caches [TODO: the current implementation uses only one segment, so the joiner fetches all data from one member now. We could use multiple segments even in the replicated case].

In invalidation-mode caches, there isn’t even any state to transfer, and transactions never leave the originator.

In the following sections I will focus on transactional DIST_SYNC caches with synchronous commit. At the end I will discuss how other cache modes differ from DIST_SYNC.

New Consistent Hash Algorithm

The way data was mapped to members by the previous consistent hashing (CH) scheme was not as flexible as we wanted. It did provide the nice property of minimal rehashing during a join/leave, in terms of the number of relocated keys, but it did not provide an easy way to implement incremental transfer of data between existing members and/or to joining nodes. The process of updating the CH during a join/leave was complex and needed to be completed all at once, and extra complications appeared if new nodes joined or left during this process. We would like this process to become incremental, so that we can relocate partial chunks of data between members instead of all the data in one shot. We would also like to keep the cluster active during the rehashing. Finally, it is very desirable to be able to freely relocate data on demand, without a join or leave being involved.

A new CH algorithm will be introduced that has all the advantages of the old CH, but also enables us to establish beforehand to which node each key will be assigned, and allows us to modify this assignment easily when needed (during a join/leave/merge, or even during a manually triggered data rebalancing) and only when we want to. Crucially, this allows us to decouple cache membership updates from data rebalancing. The new CH algorithm is based on a simple routing table that states to which members each value from the hash space maps. Even though the hash space is potentially a lot smaller than the key space, it is still prohibitively big to have one routing entry per hash value. To address this we use a coarser granularity: we divide the hash space into segments.

Segments existed in the previous CH algorithm too, but the new CH algorithm uses a different approach for dividing the hash space. We split the entire hash space (32-bit int values) into a number of equal-sized, non-overlapping segments (the number is immutable at runtime), and each cache member is assigned a dynamic number of segments (an even share, preferably) such that every segment is owned by at least one member. Each key belongs to exactly one hash segment. The mapping of segments to owners is done with a routing table. Each hash segment has an entry in this routing table, consisting of a list of cache member addresses: the segment owners. The first owner of each segment is considered the primary owner; the others are the backup owners (the order of the backup owners is irrelevant). Given that the routing table is known to all members, everybody is able to deterministically compute the CH of any given key. The routing table MUST be automatically updated to reflect membership changes. In the future it could also be updated on demand via JMX, to allow manually moving data segments around independently of membership changes.
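As a minimal illustration of this routing table, here is a hypothetical Java sketch (the names are illustrative, not the actual Infinispan API); it omits the member list, num_owners and the pluggable hash function for brevity:

[source,java]
----
import java.util.List;

final class Address { /* stands in for a JGroups member address */ }

final class SegmentedConsistentHash {
   private final int numSegments;                  // fixed for the cache's lifetime
   private final List<List<Address>> routingTable; // one owner list per segment

   SegmentedConsistentHash(int numSegments, List<List<Address>> routingTable) {
      this.numSegments = numSegments;
      this.routingTable = routingTable;
   }

   // Each key belongs to exactly one segment of the 32-bit hash space.
   int getSegment(Object key) {
      // '& 0x7FFFFFFF' clears the sign bit so the index is never negative.
      return (key.hashCode() & 0x7FFFFFFF) % numSegments;
   }

   // All owners of the key's segment; deterministic on every member,
   // because the routing table is the same everywhere.
   List<Address> locateOwners(Object key) {
      return routingTable.get(getSegment(key));
   }

   List<Address> ownersOfSegment(int segment) {
      return routingTable.get(segment);
   }

   // By convention, the first owner in the list is the primary owner.
   Address locatePrimaryOwner(Object key) {
      return locateOwners(key).get(0);
   }
}
----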

We must note that:

  • The number of segments is fixed during the life of the cluster (it is a parameter of each cache’s CH).

  • The routing table of the CH is maintained by the coordinator (to ensure determinism) and is disseminated to all members so each member can deterministically locate the owners of any key.

  • A segment always has at least one owner. If the last owner leaves, all data that belongs to that segment is lost and the coordinator has to assign a new owner for the segment.

  • If a segment has fewer than num_owners owners, the coordinator must assign more owners to that segment and disseminate the new CH to the cluster.

  • The hash of a node's address no longer participates in the computation of the CH, as was the case with the old algorithm. This eliminates (pseudo)randomness and makes the algorithm easier to follow and reproduce.

  • Equal-sized hash segments provide no guarantee that the actual number of keys per segment is equal at runtime. Thus, a strategy of allocating an equal number of segments to nodes does not necessarily result in perfectly even data distribution, but it is close enough. The results should be similar to those achieved using virtual nodes in the previous CH algorithm. If we want a really even distribution, we need to assign segments to members based on the actual count of keys that fall into each segment, rather than the number of segments. Such a rebalance strategy can easily be implemented with the current approach.

  • The old approach of using virtual nodes to help spread data more evenly is no longer needed, because the same effect is achieved implicitly.

  • Allocation of segments to members could be weighted based on capacity (RAM, CPU) or any arbitrary criteria.

  • As nodes join and leave the cluster the routing table is updated by the JGroups coordinator and a new CH, evolved from the previous one, is installed. We will discuss the rules used for evolving the CH later.

  • In replicated mode we can consider having a degenerate CH in place: one that has only one segment owned by all members, or even more than one segment owned by all members.

If we intend to keep the cache cluster active during a rehash operation we need to take into account that:

  • The installation of a new CH does not reach every cluster member simultaneously. Simultaneity could be forced by blocking incoming and outgoing cache commands until all members install and confirm the new CH but we do not intend to take such drastic measures.

  • Group communication in Infinispan is not truly FIFO (because of OOB and asynchronous marshalling). Since we do not use the FLUSH protocol or halt cluster activity during a rehash, there is a high chance of application messages being delivered out of order with respect to the CH installation messages.

The summary of a potential solution to these issues is:

  • We tag each cache topology with a unique identifier and also tag all cache commands, so the receiver can check whether it is on the same page as the sender (we will discuss later what happens in case of a mismatch). The topology identifier is a monotonically increasing integer, much like a version number. There are important rules for deciding when the id is incremented (explained below).

  • We use one CH during regular cluster operation and two CH functions during rehashing. The single CH is used for both read and write cache operations. During rehashing, one CH is used only for read operations (this is usually the same CH used by the previous cache topology), while write operations use a second one, the pending CH, which will later become the current CH of the cluster once the state transfer is complete (the discussion gets a bit more complicated once we introduce the balanced CH). For convenience, we will also refer to them as the 'read CH' and the 'write CH'. The pending CH reflects recent membership changes and is evolved from the current CH. More detail on this is presented in the request handling section below.

To sum up, the cache topology consists of:

  • a unique, monotonically increasing numeric topology identifier

  • a current CH

  • an optional pending CH, used only for write operations during rehashing. The pending CH is always a superset of the current CH (important!).

A CH consists of:

  • a list of members

  • the number of segments

  • the number of owners per segment (or per key)

  • the routing table

  • a hashing function which maps keys to hash-space values (int)
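A hypothetical sketch of the topology structure summarized above, reusing the SegmentedConsistentHash from the earlier sketch (again, these are not the real Infinispan classes):

[source,java]
----
final class CacheTopology {
   final int topologyId;                     // unique, monotonically increasing
   final SegmentedConsistentHash currentCH;  // always used for reads
   final SegmentedConsistentHash pendingCH;  // null unless rehashing; when present,
                                             // always a superset of currentCH

   CacheTopology(int topologyId, SegmentedConsistentHash currentCH,
                 SegmentedConsistentHash pendingCH) {
      this.topologyId = topologyId;
      this.currentCH = currentCH;
      this.pendingCH = pendingCH;
   }

   SegmentedConsistentHash readCH() {
      return currentCH;
   }

   // During a rehash, writes must reach both the old and the new owners, so
   // they use the pending CH; otherwise reads and writes share the current CH.
   SegmentedConsistentHash writeCH() {
      return pendingCH != null ? pendingCH : currentCH;
   }
}
----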

The basic flow of state transfer will be like this:

Join flow
  1. The user starts a cache on a node (when CacheContainer.getCache() is invoked). The CacheContainer.getCache() method blocks on the joiner until step 4 is completed.

  2. The node sends a join request to the coordinator, containing its consistent hash configuration (CH factory object and factory parameters). This is needed because the coordinator might not be a member of this cache (asymmetric clusters), so these details are not known to it.

  3. The joiner receives from the coordinator the current CH of the cluster. The joiner is not yet included in this CH and it does not hold any data. The topology id is not incremented. The other nodes do not receive an updated CH now (that happens later, when rebalancing kicks in; see below).

  4. The received initial CH is installed on the joiner. The new member now has access to the cache, but it holds no state and no state transfer is needed yet.

Multiple joins can happen at any time but this is not going to interrupt any existing state transfer.
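As a rough illustration of steps 2-4, here is a hypothetical sketch of the join handshake (none of these names are the real Infinispan API):

[source,java]
----
// Carried by the join request; needed because the coordinator might not be a
// member of this cache (asymmetric clusters), so it does not know these details.
final class JoinInfo {
   final String chFactoryClass; // the CH factory and its parameters
   final int numSegments;
   final int numOwners;

   JoinInfo(String chFactoryClass, int numSegments, int numOwners) {
      this.chFactoryClass = chFactoryClass;
      this.numSegments = numSegments;
      this.numOwners = numOwners;
   }
}

interface ClusterTopologyManager {
   // Runs on the coordinator. Registers the joiner, but does NOT add it to the
   // CH and does NOT increment the topology id; rebalancing does that later.
   CacheTopology handleJoin(String cacheName, Address joiner, JoinInfo joinInfo);
}

// On the joiner, inside CacheContainer.getCache(), something like:
//   CacheTopology initial = coordinator.handleJoin("myCache", self, joinInfo);
//   installTopology(initial); // joiner owns no segments yet -> no state transfer,
//                             // and getCache() can now return
----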

Rebalance flow
  1. A separate background thread, running on the coordinator while no topology update is in progress, notices that the consistent hash is unbalanced (some nodes are allocated zero or fewer-than-average data segments, num_owners is not satisfied for some segments, or whatever other criteria apply), so it computes a new "balanced" consistent hash. The CH factory implements the algorithm that is actually responsible for detecting the imbalance and for rebalancing, and it is free to do whatever segment re-assignment it wants. It MUST ensure at least one owner per segment, but ideally it SHOULD ensure num_owners owners for each segment and SHOULD also ensure an even spread of segment ownership between owners. Any leavers must be immediately removed from both the current and the balanced CH, and orphaned segments must receive an owner (random is fine).

  2. The union of the current CH and the balanced CH is computed; we call this the pending CH. The pending CH adds new owners to make the CH more balanced, removes leavers, and, very importantly, the union ensures we do not remove pre-existing owners or change the primary owner. In other words, the pending CH includes the current CH. The current CH (with any leavers removed), the computed pending CH and the incremented topology id are now sent to all members via a topology rebalance command (rebalance is a sub-type of topology update). A sketch of the union computation follows this list.

  3. The balanced CH is 'remembered' and execution continues at step 1 of the topology update flow.
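A sketch of the union computation from step 2 (helper names hypothetical, building on the earlier CH sketch):

[source,java]
----
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// pending = union(current without leavers, balanced): every pre-existing owner
// is kept (so the primary owner never changes), and the new owners introduced
// by the balanced CH are appended at the end of each segment's owner list.
static List<List<Address>> unionRoutingTable(SegmentedConsistentHash current,
                                             SegmentedConsistentHash balanced,
                                             Set<Address> leavers,
                                             int numSegments) {
   List<List<Address>> pendingTable = new ArrayList<>(numSegments);
   for (int s = 0; s < numSegments; s++) {
      List<Address> owners = new ArrayList<>(current.ownersOfSegment(s));
      owners.removeAll(leavers);                    // leavers are removed first
      for (Address a : balanced.ownersOfSegment(s)) {
         if (!owners.contains(a))
            owners.add(a);                          // append-only: existing owners
      }                                             // and the primary stay in place
      pendingTable.add(owners);
   }
   return pendingTable;
}
----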

Important observation: the topology id is incremented only if there are segment ownership changes other than those strictly caused by leavers. Basically, we want to increase the id only if the new CH requires a data transfer to take place.
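A sketch of this rule (names hypothetical): the id is bumped only when the ownership change implies actual data movement.

[source,java]
----
// Increment the topology id only if the new CH changes segment ownership in a
// way that is not strictly caused by leavers, i.e. only if data must be
// transferred. Assumes value-based equality of the routing tables.
static int nextTopologyId(int currentId,
                          SegmentedConsistentHash currentChWithoutLeavers,
                          SegmentedConsistentHash newCh) {
   boolean needsTransfer = !newCh.equals(currentChWithoutLeavers);
   return needsTransfer ? currentId + 1 : currentId;
}
----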

Topology update flow
  1. Each node receives the new topology id, the current CH and an optional pending CH. The new current CH is going to be used for reads. The pending CH is used for writes if available; if it is null, the current CH is used for writes instead. As a rule, the pending CH always includes the segment ownership of the current CH (with the exclusion of leavers and crashed members). This allows the cluster to continue to work properly until the state transfer is completed, as long as we take some precautions in processing commands (see the request handling section below). The following sub-steps are performed, in this order, by each member (the acceptor):

     1.1 If the received pending CH is not null, compute the difference between the segments this member owns in the current CH and in the pending CH, to determine the new segments. If the incoming pending CH is null, compute the difference between the current CH and the previous write CH, to determine the removed segments (a sketch of this computation follows these sub-steps).

     1.2 Temporarily block all incoming commands.

     1.3 For the new segments, the acceptor asks one of the current owners (the donor) for the transactions and locks.

     1.4 The donor replies with the transactions and locks only after it has installed the new topology. We need this to make sure a proper pending CH is also installed on the donor and that request forwarding to the new pending owners is in place (forwarding is explained in the request handling section).

     1.5 The transactions and locks are applied on the acceptor.

     1.6 The acceptor requests the new data segments from the donors asynchronously.

     1.7 Unblock all incoming commands. We are now prepared to process commands, although some data segments are still flowing in.

     1.8 Removed segments are discarded from the data container and the cache store. [TODO: We should try to move the old data to L1, but this introduces some complexity we cannot handle in the first release]

     1.9 When all requested data segments have been received from the donors, the member signals to the coordinator that it has completed the topology update.
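A sketch of the segment-difference computation in sub-step 1.1 (helper names hypothetical, building on the earlier CH sketch):

[source,java]
----
import java.util.HashSet;
import java.util.Set;

// The segments a given member owns under a given CH.
static Set<Integer> segmentsOwnedBy(Address self, SegmentedConsistentHash ch,
                                    int numSegments) {
   Set<Integer> owned = new HashSet<>();
   for (int s = 0; s < numSegments; s++) {
      if (ch.ownersOfSegment(s).contains(self))
         owned.add(s);
   }
   return owned;
}

// On a topology update, something like:
//   Set<Integer> added = segmentsOwnedBy(self, newWriteCH, n);
//   added.removeAll(segmentsOwnedBy(self, oldWriteCH, n));
//   // -> request transactions, locks and data for 'added' (sub-steps 1.3-1.6)
//
//   Set<Integer> removed = segmentsOwnedBy(self, oldWriteCH, n);
//   removed.removeAll(segmentsOwnedBy(self, newWriteCH, n));
//   // -> discard 'removed' from the data container and cache store (sub-step 1.8)
----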

Donors will perform the following steps:

  • If the request for transactions arrives before the installation of the new topology has begun, the donor blocks until it starts.

  • The donor blocks all access to the transaction table while the transactions and locks are retrieved, to be sent to the acceptors that requested them.

  • If there are any data transfers in progress towards members that are no longer present in the new topology, those transfers need to be cancelled.

  2. After the coordinator receives confirmation of topology update completion from all members:

     a) If this was started by a rebalance command, we have a balanced CH that was previously computed and 'remembered' during the rebalance flow: the coordinator sends all members a topology update with the balanced CH as the current CH, no pending CH, and an incremented topology id; processing then continues back at step 1.

     b) Otherwise, if we have a pending CH: the coordinator sends all members a topology update with this pending CH as the current CH, no pending CH, and an incremented topology id; processing then continues back at step 1.

     c) No previously computed balanced CH, no pending CH: the end!
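A sketch of the coordinator's decision (cases a/b/c above; the fields and helper names are hypothetical):

[source,java]
----
// Invoked on the coordinator once every member has confirmed completion.
void onTopologyUpdateConfirmedByAll(String cacheName) {
   if (rememberedBalancedCH != null) {
      // a) a rebalance was in progress: promote the remembered balanced CH
      broadcastTopologyUpdate(cacheName, topologyId + 1, rememberedBalancedCH, null);
      rememberedBalancedCH = null;
   } else if (pendingCH != null) {
      // b) promote the pending CH to current; no pending CH any more
      broadcastTopologyUpdate(cacheName, topologyId + 1, pendingCH, null);
   }
   // c) neither one exists: the state transfer is complete, nothing to send
}
----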

Leave flow

At any point during normal cluster operation, or during the topology update or rebalance flow, some members could leave or crash.

  1. The leaver sends a leave message to the coordinator, or the coordinator notices the member crash via a JGroups view event.

  2. The coordinator removes the leaver(s) from the current CH and from the pending CH (if one exists) of the current topology. If some segments remain without any owners, it assigns them new owners, trying to maintain uniform allocation (see the sketch below). The topology id is not incremented, because the new topology has no state to be transferred with respect to the previous topology.

  3. The coordinator sends all members a topology update command. This does not cause a state transfer; however, if one is already in progress, it must continue with the updated CH(es).
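A sketch of step 2 (names hypothetical): prune the leavers and re-own orphaned segments, leaving the topology id untouched.

[source,java]
----
import java.util.List;
import java.util.Set;

// Applied to both the current CH and the pending CH (if one exists).
static void removeLeavers(List<List<Address>> routingTable,
                          Set<Address> leavers,
                          List<Address> remainingMembers) {
   int next = 0;
   for (List<Address> owners : routingTable) {
      owners.removeAll(leavers);
      if (owners.isEmpty()) {
         // Orphaned segment: its data is lost. Assign a new owner, cycling
         // through the remaining members to keep the allocation roughly uniform.
         owners.add(remainingMembers.get(next++ % remainingMembers.size()));
      }
   }
}
----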

Merge flow
  1. A merge happens at the JGroups cluster level: all members receive a MergeView event. The list of (N>=2) partitions received from JGroups is not always disjoint, so we first compute a list of disjoint partitions: sort the N partitions by JGroups view id (descending, with possible duplicates) and remove the members of P0..Pi-1 from Pi, for every i from 1 to N-1. We obtain a list of disjoint cluster subpartitions (see the sketch after this list).

  2. The coordinator asks all members for their status: topology id, current and pending CH. Now all members know who the new coordinator is. [TODO: how does a change of coordinator impact existing state transfers?]

  3. a) If some subpartitions have a pending state transfer: wait for the state transfer to end, then continue with step 3.b of the merge flow.

     b) If there is no state transfer in progress in any partition: the coordinator computes a pending CH as the union of all the current CHes of all subpartitions and sends a topology update with their current CH and this pending CH to each member (this is the spot where we can add data version conflict resolution in the future). We cannot have multiple primary owners, so the primary owners in the union are those that are primary owners in the oldest subpartition (in the order of subpartitions seen by the new coordinator). The tid of the resulting topology is the maximum of the subpartition topology ids, plus 1. Processing continues at step 1 of the topology update flow.
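A sketch of the disjoint-subpartition computation from step 1 (assuming the partitions are already sorted descending by JGroups view id):

[source,java]
----
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

static List<List<Address>> disjointSubpartitions(List<List<Address>> partitions) {
   List<List<Address>> disjoint = new ArrayList<>();
   Set<Address> seen = new HashSet<>();
   for (List<Address> p : partitions) {
      List<Address> remainder = new ArrayList<>(p);
      remainder.removeAll(seen);  // drop members already claimed by a newer partition
      if (!remainder.isEmpty())
         disjoint.add(remainder);
      seen.addAll(remainder);
   }
   return disjoint;
}
----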

Request handling

Here we discuss how a cache member that has currently installed topology id tid1 handles incoming commands tagged with topology id tid2. We are only interested in read, write and transaction commands.

If a topology update is in progress, we block until the transactions and locks have been received.

  1. tid2 == tid1: Process the command and reply normally. No special command forwarding is needed.

  2. tid2 > tid1:

     a) Read commands can be processed normally.

     b) Other commands cause the receiver to block until tid2 is installed, and then proceed normally.

Here is why: in normal cases (without considering merges for now), tid2 > tid1 is only possible with tid2 == tid1 + 1. This is true because we know every member has already confirmed tid1 (otherwise it would be impossible to see a higher topology id tid2), and during a topology update no further updates with increasing ids are performed (the tid is not incremented for leavers, and joiners are accepted but not included in the topology, so the tid is again not incremented; see step 1 of the join flow), so a greater tid2 can only be tid1 + 1. Since during a state transfer all read commands actually use the CH of the previous topology (tid1 in our case), we are safe to process reads normally. For other commands it might not be safe to proceed until tid2 is installed. For example, a remove command processed before the topology update could be (wrongly) undone by the incoming data segments.

  3. tid2 < tid1:

     3.1 Read of key K:

       a) The node is an owner of K according to its local CH: process the request and reply normally.

       b) The node is not an owner of K according to its local CH: reply with an error status that indicates that data ownership has changed, and include tid1 in the reply. The originator has to wait until tid1 (or higher) is installed on its side and then retry the command.

     3.2 Write of key K, or TX command affecting key K:

       a) The node is an owner of K according to its write CH: process the request and reply normally. Compute the set difference between the tid1 owners of K and the tid2 owners, and forward the command to them.

       b) The node is not an owner of K according to its write CH: forward the command to all owners from tid1.
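A sketch of this dispatch (tid1 is the locally installed topology id, tid2 the id carried by the incoming command; all names hypothetical):

[source,java]
----
void handleIncomingCommand(CacheCommand cmd, int tid1, int tid2) {
   if (tid2 == tid1) {
      process(cmd);                    // case 1: sender and receiver are in sync
   } else if (tid2 > tid1) {
      if (cmd.isRead())
         process(cmd);                 // case 2a: reads use the previous topology's CH anyway
      else
         blockUntilTopologyInstalled(tid2, () -> process(cmd)); // case 2b
   } else {                            // tid2 < tid1
      if (cmd.isRead())
         handleStaleRead(cmd, tid1);   // case 3.1: process if owner, else reply
                                       // with an ownership-changed error + tid1
      else
         handleStaleWrite(cmd, tid1);  // case 3.2: process and/or forward to
                                       // the tid1 owners as described above
   }
}
----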

Command forwarding is performed synchronously if the incoming command was synchronous; otherwise it is performed asynchronously [TODO: the current implementation is always synchronous, due to limitations in detecting how the command was sent initially].

This forwarding scheme ensures that commands reach all new owners, but it has a high potential of producing duplicates, because the forwarding may be done by multiple nodes at the same time. We need to revise the processing of all commands so they can cope with, or discard, these duplicated commands.