Consistency guarantees in Infinispan

Embedded caches

1. Local mode

1.1. Non-transactional

A non-transactional local cache behaves very much like a java.util.ConcurrentHashMap:

  • Operations happen instantaneously at some point between the user-visible start and finish time. The finish time for async operations (e.g. putAsync) is the moment future.isDone() is true.

  • An operation that happens-after a write operation sees the updated value (or a newer one).

  • An operation that happens-after a get operation sees the same value (or a newer one).

  • Happens-after has the same meaning as in the Java Memory Model (JMM).

  • Overlapping operations (those for which there is no happens-before relation) can happen in any order.

Multi-key operations like putAll and clear also behave atomically with respect to other write operations: they lock their keys one by one, so concurrent operations are possible, but there is always an ordering that applies all of a given operation’s modifications in a single step. Just like in ConcurrentHashMap, however, they are not atomic with respect to reads, so the following interleaving is possible:

Thread1: cache.putAll({k1: v1, k2: v2})
         cache.putAll({k1: v11, k2: v22})

Thread2: cache.get(k1) -> v1
         cache.get(k2) -> v22
         cache.get(k1) -> v1

1.1.1. Cache stores

If a cache store is attached and passivation is disabled, every cache write will also cause a write to the store. If the write to the store fails (e.g. because there is no more space on the disk), the user will see an exception and the cache will not store the value in memory either.

If passivation is enabled, cache entries are written to the store only when they are passivated, i.e. evicted from memory - which can happen when another entry is read (activated). If write-behind is disabled, a get operation could fail because of a passivation error, but already-written data will not be removed.

With eviction enabled, when a read operation does not find the key in memory, it will attempt to load the entry from the cache store and then store it in the data container. This cache store read + data container write is atomic.

If write-behind is enabled, writes are asynchronous and there is no feedback on whether writes succeed or fail. The asynchronous writer ensures that cache reads see the in-flight values, but if there is a write failure the in-memory state can get out of sync with the store. On stop, the cache waits for a limited amount of time for the asynchronous writes to be persisted (transaction.cacheStopTimeout()).

On restart, assuming the store was not corrupted (e.g. because of faulty hardware), all the entries that were successfully persisted are available (unless purging on startup is enabled in the store configuration).

Multiple stores: when more than one store is configured for the cache, the stores are updated serially on the application thread. If one of the stores fails, the operation fails and the previous writes are not rolled back. If write-behind is enabled, the stores are updated in parallel; write failures are not reported and they do not affect other writes.
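
As an illustration, here is a minimal sketch of a local cache with a single-file store and write-behind (assuming the Infinispan 7.x ConfigurationBuilder API; the cache name and store location are only examples):

ConfigurationBuilder builder = new ConfigurationBuilder();
builder.persistence()
       .passivation(false)                 // write-through: every cache write also writes to the store
       .addSingleFileStore()
          .location("/tmp/ispn-store")     // example location
          .async().enable();               // write-behind: store writes become asynchronous
cacheManager.defineConfiguration("with-store", builder.build());
Cache<String, String> cache = cacheManager.getCache("with-store");

Without .async().enable(), a failed store write is reported to the caller and the in-memory value is not updated; with it, store failures are not reported, as described above.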

1.2. Transactional

There are 4 transaction modes:

  • BATCH: using a built-in transaction manager, registering as a Synchronization.

  • NON_XA: same as BATCH, but with an external transaction manager.

  • NON_DURABLE_XA: registering with an external transaction manager as a proper XA resource.

  • FULL_XA: same as NON_DURABLE_XA, but also supports recovery (provided the recovery cache is persistent).
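
As a rough illustration, these modes correspond to the following programmatic configuration (a sketch assuming the Infinispan 7.x ConfigurationBuilder API; the lines after the first are alternatives, pick one):

ConfigurationBuilder builder = new ConfigurationBuilder();
builder.transaction().transactionMode(TransactionMode.TRANSACTIONAL);

builder.invocationBatching().enable();                                // BATCH: built-in TM, Synchronization
builder.transaction().useSynchronization(true);                       // NON_XA: external TM, Synchronization
builder.transaction().useSynchronization(false);                      // NON_DURABLE_XA: external TM, XAResource
builder.transaction().useSynchronization(false).recovery().enable();  // FULL_XA: XAResource with recovery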

The only supported isolation levels are READ_COMMITTED and REPEATABLE_READ. Regardless of configuration, transactions only update the values during the commit phase, so it’s not possible to read another transaction’s uncommitted values. Notably, Infinispan does not support Snapshot isolation or Read Atomic isolation: if a transaction T1 writes K1 and K2, an overlapping transaction T2 may see both updates, only the update to K1, only the update to K2, or neither.

Transactional caches also support two different locking modes: pessimistic or optimistic.

  • Optimistic caches will only lock their keys during the prepare phase ("short locks").

  • Pessimistic caches will lock each key as it is touched by a write operation and unlock all of them after the commit ("long locks").

If two overlapping transactions update the same keys, optimistic caches will fail to prepare one of the transactions, throwing a WriteSkewException.

If the write skew check is disabled in the configuration, the last write will always win. This means conditional operations are not reliable, e.g. two overlapping transactions trying to increment a counter with replace(key, counter, counter + 1) could both succeed (incrementing the counter by 1 instead of 2).

In a pessimistic cache, one of the overlapping transactions would block until the other was finished. But this requires all transactions to lock their keys in the same order, otherwise the write/lock operation will time out with a generic TimeoutException. If deadlock detection is enabled in the configuration, the application will instead get a DeadlockDetectedException after a (usually much shorter) spin duration.

Pessimistic caches allow the user to control the lock order by locking keys explicitly, ahead of use (lock(key)) or during a read operation (withFlags(Flag.FORCE_WRITE_LOCK).get(key)).

If needed, serializability can be emulated with pessimistic locking, by locking all the keys read by the transaction at the beginning. With optimistic locking, there is no lock equivalent, so the transaction would have to write the existing value back into each key that it needs to stay unmodified (and handle the resulting WriteSkewExceptions).
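
As an illustration, explicit locking in a pessimistic cache looks roughly like this (a sketch; k1, k2 and the values are placeholders, and Flag.FORCE_WRITE_LOCK is assumed to be the flag behind the read-with-lock variant):

TransactionManager tm = cache.getAdvancedCache().getTransactionManager();
tm.begin();
// Lock both keys up front, always in the same order, to avoid deadlocks
cache.getAdvancedCache().lock(k1, k2);
// Alternatively, acquire the lock while reading a key
Object current = cache.getAdvancedCache().withFlags(Flag.FORCE_WRITE_LOCK).get(k1);
cache.put(k1, newValue);
cache.put(k2, otherValue);
tm.commit();

Locking every key the transaction reads at the beginning is what emulates serializability, at the cost of holding the locks for the whole transaction.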

1.2.1. Cache stores

Interaction with the cache stores happens outside the XA transaction. A store read failure is always propagated directly to the user. A store write failure will always cause the in-memory updates to be rolled back, but the application may or may not see an exception, depending on how the cache is registered with the transaction manager (similar to WriteSkewException).

If a transaction writes to more than one key (or to the same key in more than one store), and one of the updates fails, previous writes to the store will not be rolled back.

Like with stores in a non-transactional cache, a read can undo an overlapping transaction’s modifications.

1.2.2. XA recovery

TODO

1.3. Expiration

When an entry has a lifespan set, read operations will stop seeing the value sometime between write_start_timestamp + lifespan millis and write_finish_timestamp + lifespan millis. The entries will actually be removed only later, depending on how eviction is configured, but they will not be visible.

maxIdle behaves similarly, except the reference timestamps are from the last write OR read operation. However, the access timestamp is not updated for entries in cache stores, making maxIdle behave exactly like lifespan (ISPN-3202).
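
For reference, lifespan and maxIdle are set per entry on the write operation (a sketch; the durations are arbitrary):

cache.put(key, value, 10, TimeUnit.MINUTES);                        // expires ~10 minutes after the write
cache.put(key, value, -1, TimeUnit.SECONDS, 30, TimeUnit.MINUTES);  // no lifespan, expires after 30 idle minutes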

TODO With repeatable read, can a value expire in a transaction after it was read once?

1.4. PutForExternalRead

cache.putForExternalRead(key, value) behaves like cache.putIfAbsent(key, value), except it doesn’t wait for any locks - it will silently fail if another write operation is accessing the same key.

In transactional caches, it also suspends the current transaction for the duration of the operation, and the next read in the transaction will not see the value.

2. Invalidation mode

Invalidation mode is the most lightweight clustered mode, but it allows for missing and/or stale data, so it should only be used in applications that tolerate that.

2.1. Non-transactional mode

In invalidation mode, writes on one node are not automatically replicated to other nodes; instead, a write on one node invalidates the key on all the other nodes. There are three ways to synchronize data across the cluster:

  • Using a shared cache store. Invalidations caused by write operations only remove the in-memory values, forcing the cache to read the value from the store on the next access.

  • Using an external shared data store without a cache store. The application has to update the values in the external store itself, and the values read from the external store must be written to the cache with putForExternalRead() to avoid removing them on other nodes.

  • Using a ClusterLoader. If enabled, this cache loader will request the value from all the other nodes (using multicast, if available).
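
A sketch of an invalidation cache with a ClusterLoader (assuming the 7.x configuration API; the timeout is arbitrary):

ConfigurationBuilder builder = new ConfigurationBuilder();
builder.clustering().cacheMode(CacheMode.INVALIDATION_SYNC);
builder.persistence()
       .addClusterLoader()
          .remoteCallTimeout(500);   // how long to wait for the other nodes' replies, in milliseconds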

2.1.1. Shared cache store

Locks are only acquired on the originating node, so different nodes can execute their writes in parallel.

The cache will stay "consistent", i.e. all nodes will see the same value after both writes are finished. But conditional operations are not reliable, and neither is the return value of non-conditional write operations. E.g. if there are two overlapping put(key, value) operations, both might return null.

An invalidation cache can also be transactional. But locks are still local to each node, so transactions aren’t atomic either.

It’s also possible that two overlapping writes from different nodes will invalidate each other, removing the key on both nodes. However, this will not break the consistency of the cache, it will just require additional reads from the cache store.

Because data is stored in the shared store after the invalidation command was executed on all the nodes, a node might execute the invalidation command first, then read the old value from the shared store, all before the originator managed to update the value in the store. If that happens, the node will keep the stale entry until it expires or it is updated by another write operation. TODO Create bug in JIRA

2.1.2. External data store

The external store may not store the actual cache values, instead it could have just the data from which the cache values can be re-computed.

Just like with a shared cache store, the locks are local to each node. So the application must rely on the external store to provide atomicity and isolation, and Infinispan will only work as a cache.

One particular concern is that putForExternalRead() necessarily uses data that was read from (or computed with data from) the external store some time ago. If another write modified the data in the external store and invalidated the key right before, the PFER operation will write stale data in the cache.
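
The usual read path with an external store looks roughly like this (a sketch; externalStore and its load() method are hypothetical placeholders for the application’s own data access code):

Object value = cache.get(key);
if (value == null) {
   // Not in the cache: read (or recompute) the value from the external store...
   value = externalStore.load(key);
   // ...and cache it without waiting for locks. If another write invalidated the key
   // in the meantime, this can put a stale value in the cache, as described above.
   cache.putForExternalRead(key, value);
}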

2.1.3. ClusterLoader

In this scenario, the values are only stored in the cache. So two overlapping writes from different nodes invalidating each other will really remove the entry from the cache completely. Therefore, the ClusterLoader should only be used in conjunction with either an external store or a shared cache store.

In such a combination, the ClusterLoader is only a performance optimization, and the consistency guarantees stay the same as when using the other mechanism by itself. Note that the ClusterLoader may also slow things down by triggering a storm of ClusteredGetCommand responses.

2.1.4. Handling topology changes

Invalidations triggered by write operations are synchronous and the operation will fail with a SuspectException if another node crashes. The entry will not be updated on the originator, but invalidation will still be performed on the non-crashed nodes. TODO Create an issue in JIRA to ignore suspect exceptions.

2.1.5. Handling network partitions

If the cluster splits, each partition installs its own topology and continues working. Writes in progress when the split happens, before Infinispan detects it, may fail. Writes started after the split was detected will succeed, invalidating the key only in the originator’s partition.

When the partitions merge back, the partition with the highest cache topology id is considered the most up-to-date one, and its topology is used as the merge topology. Nodes not in the merge topology will be wiped, and they will receive the latest entries from one of the nodes in the merge topology.

2.1.6. Asynchronous replication

With asynchronous replication, the write operations will invalidate the value on the remote nodes asynchronously. So reads on other nodes issued after the put operation finished may still see the previous value for a short while.

2.2. Transactional mode

Transactions keep track of already-read values, providing repeatable reads but no other consistency guarantees. The keys modified by a transaction are invalidated with a single remote command. There are no global key locks, and all the consistency problems that can appear in non-transactional mode can also appear in transactional mode.

2.3. Private (non-shared) cache stores

The write invalidations do not remove entries from private cache stores, although I’m not sure this is intentional. This means a private store may keep stale values indefinitely, requiring expiration to remove them.

A private store may also work in passivation mode, with more or less the same behaviour.

2.4. Expiration

In all clustered cache modes, expiration happens independently on each node, based on the time the entry was updated on the local node. So it’s not safe to assume that an entry that expired on one node has also expired on all the other nodes.

2.5. State Transfer

State transfer uses the consistent hash, the same as in a replicated cache. Each node pushes to the joiner the entries for which it is a primary owner. Entries that were already invalidated on the joiner are not revived by state transfer.

3. Replicated mode

In replicated mode, each node has a copy of all the entries, making it very similar to a local cache. Each key has a primary owner node, and write operations need to acquire a lock on that node.

3.1. Non-transactional mode

In non-transactional mode, writes are sent to the primary owner, which acquires the key lock, then broadcasts the update to all the other nodes (including the originator of the write) while holding the lock. If the originator is the primary owner, it acquires the lock and broadcasts the update directly. This approach preserves the consistency guarantees provided by local non-transactional caches as long as the cache topology is stable.

For multi-key operations like putAll(), the operation is sent to all the primary owners of the affected keys. When there is a single primary owner for all the affected keys, the operation behaves like in a local cache. When there are multiple primary owners, the operation is not atomic, not even with respect to other write operations.

Clear does not acquire global key locks; instead, each node clears its in-memory data container and cache stores independently. As such, clear is not atomic.

Read operations are always local, unless the node just joined and hasn’t received all the cache entries yet.

3.1.1. Handling topology changes

When a new node joins, state transfer copies all the entries from the existing nodes to the new one.

If a read operation tries to access a key that is not on the joiner yet, it will fetch it from the other nodes. The clustered get request is made to all the nodes and only uses the fastest response, so the value seen by a thread can go back and forth between the old value of a key and the new value written by a concurrent update:

Thread1: cache.put(key, v1)
         cache.put(key, v2)

Thread2: cache.get(key) -> v2
         cache.get(key) -> v1
         cache.get(key) -> v2

TODO ISPN-5042

Originator crashes

If one of the owners installs a new topology without the originator, it will throw an OutdatedTopologyException and the command will not be retried. The update will not be applied on the primary owner, even if the OutdatedTopologyException was thrown by another node. Other nodes may apply the update, however, so the key will have inconsistent values.

Backup owner is removed/added

The primary owner or another backup may install the new topology and throw an OutdatedTopologyException. The originator will retry the operation after receiving the new topology, and the primary owner will replay the operation on the new backup owners.

The primary owner may also get a SuspectException if the backup owner crashed. The exception will be propagated to the originator, which will retry the operation the same way.

Again, the problem is when some of the nodes successfully update the value without detecting the new topology. If there is an overlapping write operation executing between the initial update and the retry, it will be able to proceed, because the primary owner isn’t holding a key lock waiting for the retry. This means we can’t treat the update as happening everywhere in a single instant between the start and the finish time. In this example, the value read by thread 3 can go back in time (i.e. we don’t have session consistency):

Thread1: cache.put(key, v1)

Thread2: cache.put(key, v2)

Thread3: cache.get(key) -> v1
         cache.get(key) -> v2
         cache.get(key) -> v1

However, the success of conditional write operations and the return values of unconditional write operations only depend on the value on the primary owner, so they are not affected in this case.

Backup owner becomes primary owner

If the primary owner crashed, the originator will get a SuspectException and will retry the command on the new primary owner (after receiving the new topology). If the primary owner didn’t crash, the retry is triggered by one of the nodes installing the new topology and throwing an OutdatedTopologyException.

When executing the retried operation, the new primary owner may or may not have the updated value. A conditional write operation seeing the "new" value would normally fail, but when retrying it will succeed and it will also broadcast the update to the other members.

Because the expected value check relies on the actual value and not on versions, it’s possible for two overlapping conditional operations writing the same value to both succeed (in addition to the session consistency issue in the previous scenario):

Thread1: cache.putIfAbsent(key, v1) -> null

Thread2: cache.putIfAbsent(key, v1) -> null

Originator becomes primary owner

This is a special case of the backup owner becoming primary owner. The behaviour is the same, except the originator forwards the command to the other members directly.

Originator is no longer primary owner

Another special case, the only difference is that the originator may detect the topology change and throw the OutdatedTopologyException itself.

Joiner becomes primary owner

Yet another special case that only differs in implementation details. Most of the time, the joiner will not have received the "new" value from the old primary owner, but it’s possible to have a combination of the primary owner crashing and then another node joining and becoming primary owner before the command is retried.

3.1.2. Handling network partitions

JGroups doesn’t have a special event for a network partitioning, so for Infinispan a network partition is indistinguishable from one or more nodes crashing.

Partition handling disabled

With partition handling disabled, a node that leaves the JGroups view unexpectedly is assumed to be crashed and to only rejoin the cluster when restarted: with a different JGroups address, and without holding any data.

If a split does occur, the two partitions will be able to work separately. Writes on nodes that have not yet detected the split will block waiting for a response from every node, and they will be retried on the originator’s partition when the new cache topology is installed. Writes initiated before the split may be applied on some of the nodes in other partitions, but there is no guarantee.

When the partitions merge, Infinispan does not attempt to merge the different values that each partition might have. The largest partition (i.e. the one with the most nodes) is assumed to be the correct one, and its topology becomes the merge topology. Data from nodes not in the merge CH is wiped, and they will receive the latest data from nodes in the merge CH.

If a node is suspected because of a Full GC, it might go from the initial JGroups view straight to the merge view. If that happens, its topology will be the largest one, and it will not be wiped, neither will it receive new data. Instead, it will keep the (possibly stale) entries it had before the Full GC.

Partition handling enabled

With partition handling enabled, a node that leaves the JGroups view unexpectedly is assumed to merge back without a restart.

The cache will still install a topology without the node, and when it joins back it will receive the latest entries from the other members (wiping its own entries in the process).

After the new topology is installed on all the nodes, it becomes the stable topology. If at least half of the nodes in the stable topology leave in quick succession (i.e. before the stable topology is updated), the cache becomes Degraded and none of the keys are readable or writeable. The assumption is that the leaving nodes could be forming a partition by themselves and updating the entries, so the values in the smaller (i.e. minority) partition could be stale.

If a node leaves gracefully, the cache may become Unavailable instead of Degraded in order to signal that it can not merge back to become Available again. TODO ISPN-5060
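
Partition handling is enabled per cache (a sketch assuming the 7.x configuration API, where it is a single boolean; later versions express the same thing differently):

ConfigurationBuilder builder = new ConfigurationBuilder();
builder.clustering()
       .cacheMode(CacheMode.REPL_SYNC)
       .partitionHandling().enabled(true);   // enter Degraded mode when too many members are lost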

During a merge, there are five possible scenarios:

  • Both (or rather all) partitions are Degraded, and together they have enough nodes to become Available.

    The cache becomes Available, with the current nodes, and the stable topology is also updated.

  • Both partitions are Degraded, and they don’t have enough nodes together to become Available.

    The cache stays Degraded, and the topology with the highest id is installed everywhere.

  • One partition is Available, the other is Degraded.

    The entries on the Degraded partition’s nodes are wiped, and they receive the latest state from the Available partition’s nodes. The stable topology is updated afterwards.

    A special case is if the Available partition didn’t yet update its topology to remove the nodes in the Degraded partition (and possibly become Degraded). Since all the nodes are present in the Available partition’s topology, none of them are wiped. And if some of the nodes in the Available partition’s consistent hash are not really accessible after the merge, the merged partition might stay Degraded.

  • Both partitions are Available.

    This means one partition didn’t detect the split. The topology with the highest topology id is used as the merge topology. The nodes not in the merge topology are wiped, and they receive the latest state from the Available partition nodes.

  • One partition is Unavailable.

    The cache becomes Unavailable everywhere.

While a partition is in Degraded mode, attempting to read or write a key will yield an AvailabilityException. But between a minority partition being split from the rest of the cluster and the cache becoming Degraded, partition nodes are able to read any key, even though the value might have been updated in the majority partition.

Write operations in the minority are blocked on the primary owner until the cache enters Degraded mode, and they will fail with an AvailabilityException when retried. But some of the backup owners in the majority partition could have already updated the value before the split was detected, leaving the cache inconsistent even after the merge.

If only some backup owners in the minority partition updated the value, read operations on that node will see the new value until the cache enters Degraded mode. If the other partition stayed available, the value will be replaced by the old value (or a newer one) on merge. But if the other partition also entered Degraded mode, there is no state transfer and the cache will be inconsistent.

3.1.3. Timeout errors

It is sometimes possible for a remote command invocation to time out without any node being down. For example, the NAKACK2 protocol uses a negative acknowledgement system for retransmitting messages, and in the worst case the time to retransmit a message can be longer than the default replication timeout.

If the request from the primary owner to a backup owner times out, the update will not be applied on the primary owner, but it will still be applied on the other backup owners. If the request from the originator to the primary owner times out, the operation may still be applied successfully on all the owners.

Acquiring the lock on the primary owner can also time out. If that happens with a single-key write operation, the entry is not updated. With a multiple-key operation, the update will still be applied for keys with a different primary owner.

In general, when an application receives a TimeoutException, it can assume that the update was performed on some but not all of the nodes.

3.1.4. Asynchronous replication

With asynchronous replication, write commands are sent asynchronously both from the originator to the primary owner and from the primary owner to the other nodes, and neither gets any response. The value is only updated on the originator when it receives the command forwarded by the primary owner, so a thread may not see its own updates for a short while.

The primary owner initiates the broadcast of the update while holding the key lock, and JGroups guarantees that messages from one owner will be delivered in the same order on all the targets. That means as long as the consistent hash doesn’t change, updates to the same key will be applied in the same order everywhere.

If the primary owner of a key changes, updates from the old primary owner and from the new primary owner will not be ordered, so different nodes may end up with different histories and final values.

If a node joins and becomes a backup owner after a write command was sent from the primary owner to the backups, but before the primary owner updates its own data container, it may receive the value neither as a write command nor via state transfer.

If there is a timeout acquiring the key lock on the primary owner, the change will not be applied anywhere. As with synchronous replication, failure to acquire a lock for one key may not prevent the update of other keys in the same multiple-key write operation.

3.1.5. Shared cache store

With a shared cache store, only the primary owner of each key will write to the cache store. If the primary owner node crashes, the operation is retried and the new primary owner writes the update to the shared store.

When a write to the store fails, it will fail the write operation. The same as in local mode, previous writes to other keys in the same multiple-key write operation will not be rolled back, and neither will be in-memory writes on other nodes.

With write-behind enabled, store write failures are hidden from the user. With asynchronous replication and write-behind disabled, errors are also hidden unless the originator is the primary owner.

Assuming there were no write failures, the cluster can be restarted and it will recover the entries it has saved in its shared store (unless purging on startup is enabled in the store configuration).

Like with stores in a non-transactional cache, a read can undo an overlapping write operation’s modifications in memory.

3.1.6. Private cache store

With a private cache store, each node will write its own entries to its store. This allows the use of passivation, storing each entry only in memory or only in the store.

The behaviour on store write failure is the same as with a shared store.

When the cluster is restarted, the order in which nodes are started back up matters, as state transfer will copy the state from the first node to all the other nodes. If the first node to start up has stale entries in its store, it will overwrite newer entries on the joiners.

Like with stores in a non-transactional cache, a read can undo an overlapping write operation’s modifications in memory.

3.2. Transactional mode

Similar to transactional local caches, transactional replicated caches register with a transaction manager on the originator.

3.2.1. Pessimistic locking

With pessimistic locking, each key is locked on its primary owner as the application writes to it. Lock-only operations also invoke a remote command on the primary owner.

When the transaction is committed, a one-phase prepare command is invoked synchronously on all the nodes. The prepare command updates the transaction modifications on all the nodes. Conditional writes are executed unconditionally at this phase, because the originator already checked the condition.

Key locks are released asynchronously, after all the nodes acknowledged the prepare command. This prevents another transaction from executing between the time the transaction is committed on the primary owner and the time it is committed on the other members.

Handling topology changes

Every node knows about every transaction currently being executed. When a node joins, it receives the currently-prepared transactions from the existing nodes before accepting transactions itself. After processing a prepare command from a previous topology (with a lower topology id), a node will also forward the command to all the other nodes (synchronously).

After a topology change, the new primary owner knows that the transactions started in a previous topology must have acquired their key locks on the previous primary owner. So it doesn’t allow any new transaction to lock a key as long as there are transactions with a lower topology id.

If a node other than the originator of the transaction leaves the cluster (gracefully or not), the transaction will still succeed.

If the originator crashes before all the other nodes have received the one-phase prepare command, however, nodes that have not received the command will assume it was aborted and roll back any local changes. This will leave the cache inconsistent, with only some nodes having applied the modifications (ISPN-5046).

Note: If the subject of a write/lock operation is a key primary-owned by the originator, the lock command is not replicated to the other nodes unless state transfer is in progress.

This can allow a different transaction to execute between the time the lock is acquired on the old primary owner and the time the one-phase prepare command executes on the new primary owner, breaking atomicity and leaving the key value inconsistent (ISPN-5076).

Partition handling: disabled

As in non-transactional caches, the cache will become inconsistent when partition handling is disabled and the cluster does split.

Each partition installs its own consistent hash, so a key will have a primary owner in each partition and both partitions can update the value in parallel. Transactions that sent the one-phase prepare command before the split are also not guaranteed to commit on either all or none of the nodes in a particular partition.

While split, each partition will be able to read its own values. When the partitions merge back, there is no effort to replicate the values from one partition to another. Instead, just like in the non-transactional case, the partition topology with the most nodes becomes the merge topology, nodes not in the merge topology are wiped, and they receive new data from the primary owners in the merge topology.

Partition handling: enabled

Transactional caches use the same algorithm as non-transactional caches to enter or exit degraded mode when nodes crash or partitions merge.

Read or write operations started while the cache is not available will fail with an AvailabilityException. Between the cluster splitting and the minority partition(s) entering degraded mode, read operations will succeed, and read-only transactions will be able to commit. Write operations may block waiting for responses from nodes in the other partition(s), if the primary owner of the written key is not in the local partition. Eventually the partition detects the split and the remote lock commands will fail.

If the primary owners of the keys written by the transaction are all in the local partition, or if the transaction acquired all its locks before the split, the originator will try to commit, and the one-phase prepare command will perform its updates on the nodes in the local partition. The commit will block on the originator while waiting for all the initial members to reply, and it will return successfully when the other partitions' nodes are removed from the JGroups view (partial commit).

If one partition stays available, its entries will replace all the other partitions' entries on merge, undoing partial commits in those partitions. But if all the partitions entered degraded mode, partial writes will not be rolled back, and neither will they be replicated to other nodes.

During a merge, entries and transactions started in the majority partition will be replicated to the nodes in other partitions as if those nodes just joined. However, it’s possible that a partition is only available because it didn’t detect the split yet. If there is another properly-active partition, transactions in both partitions may acquire the same key locks on different primary owners (with the single-local-key optimization) and only commit after the merge, leading to inconsistent values.

Timeout errors

Timeouts acquiring key locks or communicating with the primary owners before the commit cause the transaction to be aborted, without writing any updates.

If the one-phase prepare command times out on any of the nodes, the originator will send a rollback command to all the nodes and will roll back the local changes. The nodes that already executed the one-phase prepare command will not roll back their changes, so the key value is going to be inconsistent.

On the other hand, there is no transaction timeout: once a key is locked, it is only unlocked if the transaction is committed/aborted, or if the originator is removed from the cache topology.

Asynchronous commit

The one-phase prepare command is sent asynchronously when the cache mode is REPL_ASYNC. JGroups does not guarantee that multicast messages are delivered in the same order on all the nodes, so this can easily lead to some nodes writing their updates long after the primary node released the key lock and causing inconsistent values across different nodes.

Asynchronous commands are also buffered on the originator, so if the originator crashes it is likely to lose updates that were reported to the application as successfully committed.

3.2.2. Optimistic locking

The originator broadcasts a synchronous prepare command to all the members, which acquires the key locks for the keys modified by the transaction on each primary owner node. Each primary owner is also responsible for checking for write skew, i.e. another transaction having updated the key between the read and the commit.

After everybody confirmed the prepare, the originator broadcasts a synchronous commit command, which actually performs the update. After everybody confirmed the commit, the originator broadcasts an asynchronous command to release the transaction locks. Same as with pessimistic locking, this is needed to prevent another transaction from slipping between the commit on the primary and the commit on the other nodes.

With no topology changes, the cache will behave just like an optimistic local cache.

Handling topology changes

Exactly like with pessimistic locking, transactions are replicated to joining nodes by transferring transaction information to all joiners in the initial state transfer and by replaying commands with low topology ids received by one node to all the other nodes.

If a node other than the originator of the transaction crashes without confirming the prepare command, the transaction is aborted with a SuspectException. If a node crashes without confirming the commit command, the transaction still succeeds and its updates are performed on all the other nodes.

If the originator of a transaction crashes after sending the commit command, the nodes that have not yet received the commit command will presume it to be aborted, leaving the key with inconsistent values on different nodes.

Unlike with pessimistic locking, there is no optimization when the originator is the primary owner of a key, so the primary owner changing is not an issue.

Partition handling: disabled

Same as with pessimistic and non-transactional caches.

Partition handling: enabled

Same as with pessimistic locking. The only difference is that the problematic step is the commit command, instead of the one-phase prepare command.

Prepare commands need to succeed on all the members of the cache, so partitions that have not detected the split yet are not able to prepare any new transactions. Transactions already prepared, however, will appear to commit successfully, even if the originator is in a minority partition. Depending on when the commit command was issued, the transaction updates may be applied on all the nodes, only in the originator’s partition, or in any super-set of the originator’s partition.

Transaction updates committed only in a minority partition will be undone when merging with an Available partition, as the minority nodes are wiped and re-initialized. Nothing happens on a merge if all partitions are Degraded.

If a minority partition does not detect the split before the merge, and the other partition stays available, the prepare command will only succeed after it is forwarded to the merge topology owners, including the new primary owner. So unlike pessimistic mode, it’s not possible for a transaction in a minority partition to commit after the merge without having the proper key locks.

Timeout errors

If the prepare command times out waiting for one or more confirmations, the transaction is rolled back everywhere. If the commit command times out, a rollback command is sent everywhere, and nodes that haven’t executed the commit yet (including the originator) abort the transaction.

When a transaction needs to acquire more than one key lock with the same primary node, the locks are acquired in the order of the keys' hash codes, so this will not cause a deadlock unless two keys have the same hash code (see ISPN-2491). But if two transactions each need to acquire lock kA on primary owner A and lock kB on primary owner B, a deadlock is possible. Enabling deadlock detection will decrease the time it takes for one of the transactions to fail, signaling it with a DeadlockDetectedException.

Write skew check disabled

In the following example, with the write skew check enabled, the replace operations in the two concurrent transactions both succeed, but one of the transactions will fail to commit with a WriteSkewException:

Thread1: tm.begin()
         cache.get(key) -> v1
         cache.replace(key, v1, v2) -> true
         tm.commit()

Thread2: tm.begin()
         cache.get(key) -> v1
         cache.replace(key, v1, v3) -> true
         tm.commit()

With write skew disabled, both transactions will commit successfully. Still, because of the key locks acquired during the prepare command, the updates will execute in the same order on all the nodes, and the value will be consistent (last write wins).
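
For reference, the write skew check is enabled roughly like this in the 7.x configuration API (a sketch; clustered optimistic caches also need entry versioning for the check):

ConfigurationBuilder builder = new ConfigurationBuilder();
builder.transaction().lockingMode(LockingMode.OPTIMISTIC);
builder.locking()
       .isolationLevel(IsolationLevel.REPEATABLE_READ)
       .writeSkewCheck(true);                                   // fail the prepare on conflicting writes
builder.versioning().enable().scheme(VersioningScheme.SIMPLE);  // version entries so skews can be detected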

Asynchronous commit command

Sending the commit command asynchronously means a transaction T2 started on node B after transaction T1 reported a successful commit on node A may not see T1’s updates. The commit is always synchronous on the originator, so a transaction T3 started on node A after T1 finished will see T1’s updates.

Asynchronous commit also makes it more likely to have a partial commit when the cluster splits into two degraded partitions or when the originator of the transaction crashes.

Asynchronous one-phase commit

Used when the cache mode is REPL_ASYNC. The same as asynchronous commit with pessimistic locking.

Synchronous one-phase commit

Used for implicit transactions, when use1PcForAutoCommitTransactions is enabled. A single one-phase prepare command is sent synchronously to all the nodes, and there are no consistency guarantees as commands from different transactions can execute in a different order on each node.
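
The implicit-transaction behaviour described above corresponds roughly to these settings (a sketch assuming the 7.x configuration API):

builder.transaction()
       .autoCommit(true)                          // wrap non-transactional calls in implicit transactions
       .use1PcForAutoCommitTransactions(true);    // commit them with a single one-phase prepare command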

3.2.3. Cache stores

The write to the attached cache store(s) is performed during the commit command (REPL_SYNC with optimistic locking), or the one-phase prepare command (REPL_ASYNC, pessimistic locking, or use1PcForAutoCommitTransactions). If the cache store is private, every node will write to its own store. If the store is shared, only the primary owner of each key will write to it.

The write to the store happens before the write to the in-memory data container, and a store write failure will trigger a partial rollback. On the node where the write fails, stores that have already been updated are not rolled back. Assuming the commit/one-phase prepare was synchronous, the originator will not write anything. It will send a rollback command to all the nodes, and nodes that execute the rollback command before the commit/one-phase prepare command will not update anything either.

Depending on whether the cache registered with the transaction manager as a full XA resource or as a synchronization, the application may or may not receive an exception.

Like with stores in a non-transactional cache, a read can undo an overlapping transaction’s modifications.

3.2.4. XA Recovery

TODO

3.3. Expiration

Like in all clustered modes, expiration is not atomic across all the members.

3.4. putForExternalRead

PutForExternalRead can be used to populate the cache with data from an external store, just like in invalidation mode.

PutForExternalRead is performed like a non-transactional command, even in transactional caches. Because replication is always asynchronous, it will write its updates without holding the key lock on the primary owner, possibly interacting with regular transactions or write operations. For the same reason, the thread that called putForExternalRead is not guaranteed to see the inserted value immediately.

TODO Should putForExternalRead only write the entry on the originating node, without locking, just like in invalidation mode?

3.5. State transfer disabled

It is possible to disable state transfer for in-memory data (stateTransfer.fetchInMemoryState), for persistent data (store.fetchPersistentState), or for both.

It is only safe to disable any part of transfer if the cache has a shared cache store (persistent data transfer is a no-op in that case). Transaction data is always transferred, so transactions are applied atomically on the joiner, and transactions that run during the join may even fetch some values from the existing nodes. But most of the values will not be transferred, and read operations on the joiner will not see them.

It is possible to use a ClusterLoader to transfer the cache data to the joining node lazily. However, there are no guarantees that an entry will be stored on more than one node, so any one node crashing (or even stopping gracefully) can cause data loss (in addition to regular cache store issues).
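
A sketch of disabling in-memory state transfer while relying on a shared store (assuming the 7.x configuration API; a shared store would typically be a JDBC or remote store, a single-file store is used here only for brevity):

ConfigurationBuilder builder = new ConfigurationBuilder();
builder.clustering()
       .cacheMode(CacheMode.REPL_SYNC)
       .stateTransfer().fetchInMemoryState(false);   // joiners do not receive in-memory entries
builder.persistence()
       .addSingleFileStore()
          .shared(true)                              // the store is shared between all the nodes
          .fetchPersistentState(false);              // persistent state transfer is a no-op for shared stores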

4. Distributed mode

In distributed mode, keys are divided into segments based on their hash code, and each segment is mapped to a primary owner and numOwners-1 backup owners. This mapping is the consistent hash.

When a node leaves, its segment copies will be redistributed between remaining nodes (the operation is called rebalancing). For a while, the cache is able to work with less than numOwners owners: the first backup owner becomes the primary owner. But if all the owners of a segment leave before the initial rebalancing is done, the entries in the segment will be lost. The exact behaviour when attempting to read a lost key depends on whether partition handling is enabled or not.

When a node joins, rebalancing is also performed to give the joiner a fairly equal share of the total segment copies.

During a rebalance, two consistent hashes are used in parallel: the current consistent hash is used for reading (i.e. the ReadCH), and a union of the current and future consistent hashes (i.e. the WriteCH) is used for writing. Outside of a rebalance, the ReadCH and WriteCH are the same.
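
A minimal distributed cache configuration sketch (7.x API; the numbers are only examples):

ConfigurationBuilder builder = new ConfigurationBuilder();
builder.clustering()
       .cacheMode(CacheMode.DIST_SYNC)
       .hash()
          .numOwners(2)       // each key is stored on a primary owner and one backup owner
          .numSegments(60);   // keys are hashed into 60 segments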

4.1. Non-transactional

Similar to replicated mode, the originator of a single-key write operation will send it to the primary owner of the modified key, which then replicates the write to the backup owner(s). After the backup owners confirmed the write, the primary owner updates the value locally, unlocks the key, and replies to the originator.

A multiple-key write operation is forwarded to the primary owners of all the modified keys in parallel, and each primary owner forwards the command to the backup owners of the keys it owns. The operation succeeds after all the backup owners have confirmed the write to their primary owners. When a node receives the command forwarded from a primary owner, it only updates the keys it owns, but those may include keys for which a different node is the primary owner. So using putAll concurrently with any other write operation can lead to an inconsistent cache.

The same as in replicated non-transactional caches, clear operations are executed independently on each node and are not atomic.

When a node tries to read a key it does not own, it sends a remote get command to all its owners in the ReadCH and uses the first successful non-null response. Because of this, distribution mode does not preserve session consistency even when the cache topology is stable:

Thread1: cache.put(key, v1)
         cache.put(key, v2) -> updates key on backup B first and primary A last

Thread2: cache.get(key) -> v1 from A
         cache.get(key) -> v2 from B
         cache.get(key) -> v1 from A

4.1.1. Handling topology changes

Read operations

The ownership of a key can change between the originator sending the remote get command and the owner nodes processing it. (In fact, it’s possible that the new topology was already installed on the owners before the remote get command was sent, but it was delayed on the originator.)

If all the responses from the owners were null, there are three options:

  1. If the topology changed on the originator before receiving all the responses, retry the remote get on the new owners.

  2. If the topology did not change, but there was a rebalance in progress, assume that the other nodes have a new topology and the WriteCH owners became the new ReadCH owners. Repeat the process, but this time send the remote get command to the WriteCH owners that are not ReadCH owners.

  3. If the topology did not change and a rebalance was not in progress, return null.

The algorithm is safe because a rebalance cannot end (and the old owners cannot delete obsolete entries) before all the nodes received the new entries. So a null response from a ReadCH owner means either the key doesn’t exist anywhere, or the WriteCH-only owners have already installed the new topology and become ReadCH owners.

Write operations

Topology changes overlapping with write operations are handled exactly the same as in a replicated non-transactional cache.

There are a few more special cases when one of the owners becomes a non-owner, or the originator becomes an owner (or primary owner), but they are handled the same way: nodes that have a newer topology throw an OutdatedTopologyException, and the originator retries the operation. The retry algorithm only checks the topology id, so a write operation might be retried even if the owners of the affected key stay the same.

Session consistency is not guaranteed even without topology changes, so the only problem from the replicated-mode topology change scenarios worth mentioning is that conditional write operations are no longer atomic (e.g. ISPN-4286).

There is another consistency issue on the nodes that owned a key before rebalance and do not own it after the rebalance. Since the rebalance does not finish on all nodes at once, it’s possible for an owner of a key in the pre-rebalance CH to keep seeing the old value after a write operation finished successfully on the post-rebalance CH owners. TODO ISPN-5021.

4.1.2. Handling network partitions

Partition handling disabled

Nodes removed from the JGroups view are assumed to be crashed and to be restarted before re-joining.

Same as in replicated mode, if the cluster does split, the partitions will keep working separately. Each partition will rebalance its consistent hash so that every entry has numOwners copies. If a partition has at least one of the original owners of the key, the entry will be replicated to numOwners-1 other nodes. If a partition doesn’t contain any of the original owners, the entry will be lost.

Write operations started before the split do not fail because of the split. If some of the owners are in another partition (or just crashed), the operation will be retried after JGroups suspects the node and it is removed from the cache topology. Some backup owners in other partitions may also apply the update, but if there are two backup owners in a partition they may end up with different values.

When the partitions merge, one of the partitions' topologies is elected as the merge topology (the one with the highest topology id). Data in the other partitions is wiped, and entries from the merge topology members are replicated to the other nodes. If the partition with the merge topology lost some entries during the split, those entries will not be recovered during merge.

Partition handling enabled

With partition handling enabled, nodes removed from the JGroups view are assumed to be still active. The same as in replicated mode, the cache will enter degraded mode if at least half of the nodes in the stable topology leave in a short period of time, to ensure that at most one partition stays active at the same time. Because each key is only stored on numOwners nodes, the cache will also enter degraded mode if a segment loses all its owners.

If these two conditions are not met, the partition stays available, and its consistent hash will be rebalanced. After the rebalance, the stable topology is updated, so that the next time a node leaves, the majority is counted against the current members. If one or more other nodes are suspected during the rebalance, however, the majority rule is still checked against the old stable topology.

Same as in replicated mode, if there is a network split but for a while only the majority partition sees it, nodes in the minority partition will be able to read the old values for keys with at least one owner in the minority partition. If the partitions then merge, reads from a non-owner in the minority partition with the old topology may see either the updated value from the majority partition owners or the old value from the minority partition owners, and there is no session consistency.

Unlike replicated mode, keys that are wholly-owned by nodes in one partition are still accessible for reading and for writing even if the partition is in degraded mode.

Same as in replicated mode, write operations started just before the partitions split may be applied only partially. If all the owners of a key are in the same partition as the originator, the operation will succeed (possibly after being retried). If one of the owners is in a different partition, the command is blocked until JGroups detects the split and Infinispan installs the new cache topology. Then the command is retried and fails with an AvailabilityException. However, some owners may have already applied the update, both in the originator’s partition and in the other, making the cache inconsistent.

Write operations overlapping with the merge behave just like operations overlapping with a join: the final value will be the same on all the owners, but the operation will not appear to be atomic.

4.1.3. Asynchronous replication

Same as in replicated mode.

4.1.4. Timeout errors

Same as in replicated mode.

4.1.5. Cache stores

Same as in replicated mode.

4.2. Transactional

Transactions work the same as in replicated mode, except the prepare, one-phase prepare, and commit commands are sent to all the owners of the affected keys instead of all the members of the cache.

The session consistency problems described for non-transactional mode appear with transactions as well. But transactions cache values read from remote nodes, so session consistency is preserved inside transactions. This caching also means that the READ_COMMITTED isolation mode setting sometimes behaves just like REPEATABLE_READ.

4.2.1. Handling topology changes

Topology changes are handled by transferring transaction information during state transfer and forwarding commands with older topology ids, just like in replicated mode.

The cache can become inconsistent if the originator leaves while the commit command is being executed on the affected keys' owners (one-phase prepare with pessimistic locking).

4.2.2. Handling network partitions

Partition handling disabled

Like in replicated mode, transactions with the commit command overlapping with the cluster split may update their entries on only some of the owners.

In addition, the lock information is lost if all the owners of a key leave the cluster. A transaction that acquired its locks before the split can still commit after the split, even though it doesn’t have any locks any more, and another transaction may have acquired the same locks.

Partition handling enabled

Like in replicated mode, transactions with the commit command overlapping with the cluster split may update their entries on only some of the owners and be rolled back on others.

As long as a minority partition does not detect the split and doesn’t enter degraded mode, it will be able to read values that may have been updated in the majority partition. The minority partition only needs one owner to read a value, and the majority partition only needs one owner to stay available (and allow writes).

4.2.3. Asynchronous commit

4.2.4. Timeout errors

4.2.5. Cache stores

Same as in replicated mode.

4.3. L1

With L1 enabled, nodes are able to store entries they do not own in memory. L1 entries are never persisted to a cache store.

Every write invalidates L1 entries, and there are two working modes for L1 invalidation:

  1. Broadcast the invalidation command to every node in the cluster.

  2. Send the invalidation command only to nodes that have previously requested the key.

In both working modes, invalidation is performed by each owner that has seen at least one remote get or write request for the affected keys. Since remote get commands are sent in parallel to all the owners, it’s very likely for a node to receive extra L1 invalidation requests.

The invalidation command is normally sent before the entry is updated in memory and in the stores. If an owner receives a remote get command between sending the L1 invalidation command and updating the entry, it will send another L1 invalidation command. Both L1 invalidation commands are synchronous, so that read operations starting after the write has finished always see the new value.
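
A sketch of enabling L1 (assuming the 7.x configuration API; the threshold and lifespan are arbitrary):

ConfigurationBuilder builder = new ConfigurationBuilder();
builder.clustering()
       .cacheMode(CacheMode.DIST_SYNC)
       .l1()
          .enable()
          .lifespan(60000)                // L1 entries expire after 60 seconds
          .invalidationThreshold(100);    // invalidate per-requestor up to this many nodes, then use a multicast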

5. Total order

TODO

6. Entry iterator

TODO