diff --git a/anchored-keys/src/main/java/org/infinispan/anchored/impl/AnchoredDistributionInterceptor.java b/anchored-keys/src/main/java/org/infinispan/anchored/impl/AnchoredDistributionInterceptor.java index 9806f468d64a..56953110e55b 100644 --- a/anchored-keys/src/main/java/org/infinispan/anchored/impl/AnchoredDistributionInterceptor.java +++ b/anchored-keys/src/main/java/org/infinispan/anchored/impl/AnchoredDistributionInterceptor.java @@ -35,6 +35,7 @@ import org.infinispan.remoting.transport.Address; import org.infinispan.remoting.transport.impl.MapResponseCollector; import org.infinispan.commons.util.concurrent.CompletableFutures; +import org.infinispan.util.CacheTopologyUtil; import org.infinispan.util.logging.Log; import org.infinispan.util.logging.LogFactory; @@ -67,7 +68,7 @@ protected Object primaryReturnHandler(InvocationContext ctx, AbstractDataWriteCo command); return localResult; } - LocalizedCacheTopology cacheTopology = checkTopologyId(command); + LocalizedCacheTopology cacheTopology = CacheTopologyUtil.checkTopology(command, distributionManager.getCacheTopology()); DistributionInfo distributionInfo = cacheTopology.getSegmentDistribution(command.getSegment()); Collection
owners = distributionInfo.writeOwners(); diff --git a/core/src/main/java/org/infinispan/interceptors/distribution/BaseDistributionInterceptor.java b/core/src/main/java/org/infinispan/interceptors/distribution/BaseDistributionInterceptor.java index 08b88e48168c..c783a173a9bb 100644 --- a/core/src/main/java/org/infinispan/interceptors/distribution/BaseDistributionInterceptor.java +++ b/core/src/main/java/org/infinispan/interceptors/distribution/BaseDistributionInterceptor.java @@ -66,6 +66,7 @@ import org.infinispan.statetransfer.OutdatedTopologyException; import org.infinispan.transaction.xa.GlobalTransaction; import org.infinispan.commons.util.concurrent.CompletableFutures; +import org.infinispan.util.CacheTopologyUtil; import org.infinispan.util.logging.Log; import org.infinispan.util.logging.LogFactory; @@ -132,7 +133,7 @@ protected DistributionInfo retrieveDistributionInfo(LocalizedCacheTopology topol */ protected CompletionStage remoteGetSingleKey( InvocationContext ctx, C command, Object key, boolean isWrite) { - LocalizedCacheTopology cacheTopology = checkTopologyId(command); + LocalizedCacheTopology cacheTopology = CacheTopologyUtil.checkTopology(command, getCacheTopology()); int topologyId = cacheTopology.getTopologyId(); DistributionInfo info = retrieveDistributionInfo(cacheTopology, command, key); @@ -190,7 +191,7 @@ protected final Object handleNonTxWriteCommand(InvocationContext ctx, AbstractDa return invokeNext(ctx, command); } - LocalizedCacheTopology cacheTopology = checkTopologyId(command); + LocalizedCacheTopology cacheTopology = CacheTopologyUtil.checkTopology(command, getCacheTopology()); DistributionInfo info = cacheTopology.getSegmentDistribution(SegmentSpecificCommand.extractSegment(command, key, keyPartitioner)); @@ -232,7 +233,7 @@ protected Object primaryReturnHandler(InvocationContext ctx, AbstractDataWriteCo if (log.isTraceEnabled()) log.tracef("Skipping the replication of the conditional command as it did not succeed on primary owner (%s).", command); return localResult; } - LocalizedCacheTopology cacheTopology = checkTopologyId(command); + LocalizedCacheTopology cacheTopology = CacheTopologyUtil.checkTopology(command, getCacheTopology()); int segment = SegmentSpecificCommand.extractSegment(command, command.getKey(), keyPartitioner); DistributionInfo distributionInfo = cacheTopology.getSegmentDistribution(segment); Collection
owners = distributionInfo.writeOwners(); @@ -302,7 +303,7 @@ CompletionStage remoteGetMany(InvocationContext ctx, C command, Collection private CompletionStage doRemoteGetMany(InvocationContext ctx, C command, Collection keys, Map> unsureOwners, boolean hasSuspectedOwner) { - LocalizedCacheTopology cacheTopology = checkTopologyId(command); + LocalizedCacheTopology cacheTopology = CacheTopologyUtil.checkTopology(command, getCacheTopology()); Map> requestedKeys = getKeysByOwner(ctx, keys, cacheTopology, null, unsureOwners); if (requestedKeys.isEmpty()) { for (Object key : keys) { @@ -350,7 +351,7 @@ protected Object handl return handleLocalOnlyReadManyCommand(ctx, command, helper.keys(command)); } - LocalizedCacheTopology cacheTopology = checkTopologyId(command); + LocalizedCacheTopology cacheTopology = CacheTopologyUtil.checkTopology(command, getCacheTopology()); Collection keys = helper.keys(command); if (!ctx.isOriginLocal()) { return handleRemoteReadManyCommand(ctx, command, keys, helper); @@ -477,7 +478,7 @@ private void handleMissingResponse(Response response) { for (Object key : keys) { contactedNodes.computeIfAbsent(key, k -> new ArrayList<>(4)).add(target); } - requestedKeys = getKeysByOwner(ctx, keys, checkTopologyId(command), null, contactedNodes); + requestedKeys = getKeysByOwner(ctx, keys, CacheTopologyUtil.checkTopology(command, getCacheTopology()), null, contactedNodes); } int pos = destinationIndex; for (Map.Entry> addressKeys : requestedKeys.entrySet()) { @@ -638,7 +639,7 @@ public Object visitReadOnlyKeyCommand(InvocationContext ctx, ReadOnlyKeyCommand return UnsureResponse.INSTANCE; } if (readNeedsRemoteValue(command)) { - LocalizedCacheTopology cacheTopology = checkTopologyId(command); + LocalizedCacheTopology cacheTopology = CacheTopologyUtil.checkTopology(command, getCacheTopology()); Collection
owners = cacheTopology.getDistribution(key).readOwners(); if (log.isTraceEnabled()) log.tracef("Doing a remote get for key %s in topology %d to %s", key, cacheTopology.getTopologyId(), owners); diff --git a/core/src/main/java/org/infinispan/interceptors/distribution/NonTxDistributionInterceptor.java b/core/src/main/java/org/infinispan/interceptors/distribution/NonTxDistributionInterceptor.java index cb36739e2c8e..7cc27deb5c85 100644 --- a/core/src/main/java/org/infinispan/interceptors/distribution/NonTxDistributionInterceptor.java +++ b/core/src/main/java/org/infinispan/interceptors/distribution/NonTxDistributionInterceptor.java @@ -43,8 +43,7 @@ import org.infinispan.remoting.transport.impl.SingleResponseCollector; import org.infinispan.remoting.transport.impl.SingletonMapResponseCollector; import org.infinispan.statetransfer.OutdatedTopologyException; -import org.infinispan.util.logging.Log; -import org.infinispan.util.logging.LogFactory; +import org.infinispan.util.CacheTopologyUtil; /** * Non-transactional interceptor used by distributed caches that support concurrent writes. @@ -65,8 +64,6 @@ */ public class NonTxDistributionInterceptor extends BaseDistributionInterceptor { - private static Log log = LogFactory.getLog(NonTxDistributionInterceptor.class); - private final PutMapHelper putMapHelper = new PutMapHelper(this::createRemoteCallback); private final ReadWriteManyHelper readWriteManyHelper = new ReadWriteManyHelper(this::createRemoteCallback); private final ReadWriteManyEntriesHelper readWriteManyEntriesHelper = new ReadWriteManyEntriesHelper(this::createRemoteCallback); @@ -189,7 +186,7 @@ private Object handleWriteOnlyManyComm // it is possible that the function will be applied multiple times on some of the nodes. // There is no general solution for this ATM; proper solution will probably record CommandInvocationId // in the entry, and implement some housekeeping - LocalizedCacheTopology cacheTopology = checkTopologyId(command); + LocalizedCacheTopology cacheTopology = CacheTopologyUtil.checkTopology(command, getCacheTopology()); ConsistentHash ch = cacheTopology.getWriteConsistentHash(); if (ctx.isOriginLocal()) { Map segmentMap = primaryOwnersOfSegments(ch); @@ -289,7 +286,7 @@ protected Object handleReadWriteManyCo // it is possible that the function will be applied multiple times on some of the nodes. // There is no general solution for this ATM; proper solution will probably record CommandInvocationId // in the entry, and implement some housekeeping - LocalizedCacheTopology topology = checkTopologyId(command); + LocalizedCacheTopology topology = CacheTopologyUtil.checkTopology(command, getCacheTopology()); ConsistentHash ch = topology.getWriteConsistentHash(); if (ctx.isOriginLocal()) { Map segmentMap = primaryOwnersOfSegments(ch); @@ -496,7 +493,7 @@ private final static class MutableInt { private Object writeManyRemoteCallback(WriteManyCommandHelper helper,InvocationContext ctx, C command, Object rv) { // The node running this method must be primary owner for all the command's keys // Check that the command topology is actual, so we can assume that we really are primary owner - LocalizedCacheTopology topology = checkTopologyId(command); + LocalizedCacheTopology topology = CacheTopologyUtil.checkTopology(command, getCacheTopology()); Map backups = backupOwnersOfSegments(topology, extractCommandSegments(command, topology)); if (backups.isEmpty()) { return rv; diff --git a/core/src/main/java/org/infinispan/interceptors/distribution/TriangleDistributionInterceptor.java b/core/src/main/java/org/infinispan/interceptors/distribution/TriangleDistributionInterceptor.java index aac9a2c1eb86..f6eaf9a300fe 100644 --- a/core/src/main/java/org/infinispan/interceptors/distribution/TriangleDistributionInterceptor.java +++ b/core/src/main/java/org/infinispan/interceptors/distribution/TriangleDistributionInterceptor.java @@ -64,6 +64,7 @@ import org.infinispan.remoting.transport.impl.SingleResponseCollector; import org.infinispan.statetransfer.OutdatedTopologyException; import org.infinispan.statetransfer.StateTransferInterceptor; +import org.infinispan.util.CacheTopologyUtil; import org.infinispan.util.TriangleFunctionsUtil; import org.infinispan.util.concurrent.CommandAckCollector; import org.infinispan.commons.util.concurrent.CompletableFutures; @@ -234,7 +235,7 @@ private Object handleLocalManyKeysCommand(Invocation MultiKeyBackupBuilder backupBuilder) { //local command. we need to split by primary owner to send the command to them - final LocalizedCacheTopology cacheTopology = checkTopologyId(command); + final LocalizedCacheTopology cacheTopology = CacheTopologyUtil.checkTopology(command, getCacheTopology()); final PrimaryOwnerClassifier filter = new PrimaryOwnerClassifier(cacheTopology, command.getAffectedKeys()); return isSynchronous(command) ? @@ -255,7 +256,7 @@ private Object handleRemoteManyKeysCommand(InvocationCo private Object remoteBackupManyKeysWrite(InvocationContext ctx, C command, Set keys) { //backup & remote - final LocalizedCacheTopology cacheTopology = checkTopologyId(command); + final LocalizedCacheTopology cacheTopology = CacheTopologyUtil.checkTopology(command, getCacheTopology()); return asyncInvokeNext(ctx, command, checkRemoteGetIfNeeded(ctx, command, keys, cacheTopology, command.loadType() == OWNER)); } @@ -264,7 +265,7 @@ private Object remotePrimaryManyKeysWrite(InvocationCon Set keys, MultiKeyBackupBuilder backupBuilder) { //primary owner & remote - final LocalizedCacheTopology cacheTopology = checkTopologyId(command); + final LocalizedCacheTopology cacheTopology = CacheTopologyUtil.checkTopology(command, getCacheTopology()); //primary, we need to send the command to the backups ordered! sendToBackups(command, keys, cacheTopology, backupBuilder); return asyncInvokeNext(ctx, command, @@ -405,7 +406,7 @@ private Object handleSingleKeyWriteCommand(Invocati //don't go through the triangle return invokeNext(context, command); } - LocalizedCacheTopology topology = checkTopologyId(command); + LocalizedCacheTopology topology = CacheTopologyUtil.checkTopology(command, getCacheTopology()); DistributionInfo distributionInfo = topology.getDistributionForSegment(command.getSegment()); if (distributionInfo.isPrimary()) { diff --git a/core/src/main/java/org/infinispan/interceptors/distribution/TxDistributionInterceptor.java b/core/src/main/java/org/infinispan/interceptors/distribution/TxDistributionInterceptor.java index 6ab006c3798b..705fb2a8c4f5 100644 --- a/core/src/main/java/org/infinispan/interceptors/distribution/TxDistributionInterceptor.java +++ b/core/src/main/java/org/infinispan/interceptors/distribution/TxDistributionInterceptor.java @@ -76,6 +76,7 @@ import org.infinispan.transaction.impl.LocalTransaction; import org.infinispan.transaction.xa.GlobalTransaction; import org.infinispan.commons.util.concurrent.CompletableFutures; +import org.infinispan.util.CacheTopologyUtil; import org.infinispan.util.concurrent.CompletionStages; import org.infinispan.util.logging.Log; import org.infinispan.util.logging.LogFactory; @@ -171,7 +172,7 @@ public Object visitLockControlCommand(TxInvocationContext ctx, LockControlComman throws Throwable { if (ctx.isOriginLocal()) { TxInvocationContext localTxCtx = (TxInvocationContext) ctx; - Collection
affectedNodes = checkTopologyId(command).getWriteOwners(command.getKeys()); + Collection
affectedNodes = CacheTopologyUtil.checkTopology(command, getCacheTopology()).getWriteOwners(command.getKeys()); localTxCtx.getCacheTransaction().locksAcquired(affectedNodes); log.tracef("Registered remote locks acquired %s", affectedNodes); RpcOptions rpcOptions = rpcManager.getSyncRpcOptions(); @@ -264,7 +265,7 @@ public Object visitPrepareCommand(TxInvocationContext ctx, PrepareCommand comman TxInvocationContext localTxCtx = (TxInvocationContext) rCtx; LocalTransaction localTx = localTxCtx.getCacheTransaction(); - LocalizedCacheTopology cacheTopology = checkTopologyId(rCommand); + LocalizedCacheTopology cacheTopology = CacheTopologyUtil.checkTopology(rCommand, getCacheTopology()); Collection
writeOwners = cacheTopology.getWriteOwners(localTxCtx.getAffectedKeys()); localTx.locksAcquired(writeOwners); Collection
recipients = isReplicated ? null : localTx.getCommitNodes(writeOwners, cacheTopology); @@ -328,7 +329,7 @@ private Object handleSecondPhaseCommand(TxInvocationContext ctx, TransactionBoun private Collection
getCommitNodes(TxInvocationContext ctx, TopologyAffectedCommand command) { LocalTransaction localTx = (LocalTransaction) ctx.getCacheTransaction(); - LocalizedCacheTopology cacheTopology = checkTopologyId(command); + LocalizedCacheTopology cacheTopology = CacheTopologyUtil.checkTopology(command, getCacheTopology()); Collection
affectedNodes = isReplicated ? null : cacheTopology.getWriteOwners(ctx.getAffectedKeys()); return localTx.getCommitNodes(affectedNodes, cacheTopology); @@ -337,7 +338,7 @@ private Collection
getCommitNodes(TxInvocationContext ctx, TopologyAffe protected void checkTxCommandResponses(Map responseMap, TransactionBoundaryCommand command, TxInvocationContext context, Collection
recipients, PrepareResponse prepareResponse) { - LocalizedCacheTopology cacheTopology = checkTopologyId(command); + LocalizedCacheTopology cacheTopology = CacheTopologyUtil.checkTopology(command, getCacheTopology()); for (Map.Entry e : responseMap.entrySet()) { Address recipient = e.getKey(); Response response = e.getValue(); @@ -392,7 +393,7 @@ private Object handleTxWriteCommand(InvocationContext ctx, AbstractDataWriteComm Object key) throws Throwable { try { if (!ctx.isOriginLocal()) { - LocalizedCacheTopology cacheTopology = checkTopologyId(command); + LocalizedCacheTopology cacheTopology = CacheTopologyUtil.checkTopology(command, getCacheTopology()); // Ignore any remote command when we aren't the owner if (!cacheTopology.isSegmentWriteOwner(command.getSegment())) { return null; @@ -428,7 +429,7 @@ private Object handleTxWriteCommand(InvocationContext ctx, AbstractDataWriteComm boolean ignorePreviousValue = command.hasAnyFlag(SKIP_REMOTE_FLAGS) || command.loadType() == VisitableCommand.LoadType.DONT_LOAD; Map filtered = new HashMap<>(entries.size()); Collection remoteKeys = new ArrayList<>(); - LocalizedCacheTopology cacheTopology = checkTopologyId(command); + LocalizedCacheTopology cacheTopology = CacheTopologyUtil.checkTopology(command, getCacheTopology()); for (Map.Entry e : entries.entrySet()) { K key = e.getKey(); if (ctx.isOriginLocal() || cacheTopology.isWriteOwner(key)) { @@ -451,7 +452,7 @@ protected filtered = new ArrayList<>(keys.size()); List remoteKeys = null; - LocalizedCacheTopology cacheTopology = checkTopologyId(command); + LocalizedCacheTopology cacheTopology = CacheTopologyUtil.checkTopology(command, getCacheTopology()); for (K key : keys) { if (ctx.isOriginLocal() || cacheTopology.isWriteOwner(key)) { if (ctx.lookupEntry(key) == null) { @@ -473,6 +474,7 @@ protected Object handleTxFunctionalCommand(InvocationContext ctx, C command) { Object key = command.getKey(); + LocalizedCacheTopology cacheTopology = CacheTopologyUtil.checkTopology(command, getCacheTopology()); if (ctx.isOriginLocal()) { CacheEntry entry = ctx.lookupEntry(key); if (entry == null) { @@ -481,7 +483,6 @@ public Object handleTxF return invokeNext(ctx, command); } - LocalizedCacheTopology cacheTopology = checkTopologyId(command); int segment = command.getSegment(); DistributionInfo distributionInfo = cacheTopology.getSegmentDistribution(segment); @@ -508,7 +509,7 @@ public Object handleTxF // It's possible that this is not an owner, but the entry was loaded from L1 - let the command run return invokeNext(ctx, command); } else { - if (!checkTopologyId(command).isWriteOwner(key)) { + if (!cacheTopology.isWriteOwner(key)) { return null; } CacheEntry entry = ctx.lookupEntry(key); diff --git a/core/src/main/java/org/infinispan/interceptors/impl/AbstractIracLocalSiteInterceptor.java b/core/src/main/java/org/infinispan/interceptors/impl/AbstractIracLocalSiteInterceptor.java index eceb5bea7a4d..7cf7a42a87b5 100644 --- a/core/src/main/java/org/infinispan/interceptors/impl/AbstractIracLocalSiteInterceptor.java +++ b/core/src/main/java/org/infinispan/interceptors/impl/AbstractIracLocalSiteInterceptor.java @@ -22,7 +22,6 @@ import org.infinispan.context.impl.RemoteTxInvocationContext; import org.infinispan.distribution.DistributionInfo; import org.infinispan.distribution.LocalizedCacheTopology; -import org.infinispan.distribution.Ownership; import org.infinispan.distribution.ch.KeyPartitioner; import org.infinispan.factories.annotations.Inject; import org.infinispan.interceptors.DDAsyncInterceptor; @@ -30,6 +29,7 @@ import org.infinispan.interceptors.locking.ClusteringDependentLogic; import org.infinispan.metadata.impl.IracMetadata; import org.infinispan.metadata.impl.PrivateMetadata; +import org.infinispan.util.CacheTopologyUtil; import org.infinispan.util.IracUtils; import org.infinispan.util.logging.Log; import org.infinispan.util.logging.LogFactory; @@ -94,10 +94,6 @@ static void updateCommandMetadata(Object key, WriteCommand command, IracMetadata command.setInternalMetadata(key, interMetadata); } - protected Ownership getOwnership(int segment) { - return getDistributionInfo(segment).writeOwnership(); - } - protected DistributionInfo getDistributionInfo(int segment) { return getCacheTopology().getSegmentDistribution(segment); } @@ -138,16 +134,15 @@ protected Stream streamKeysFromCommand(WriteCommand command) { } protected boolean skipEntryCommit(InvocationContext ctx, WriteCommand command, Object key) { - switch (getOwnership(getSegment(command, key))) { + LocalizedCacheTopology cacheTopology = CacheTopologyUtil.checkTopology(command, getCacheTopology()); + switch (cacheTopology.getSegmentDistribution(getSegment(command, key)).writeOwnership()) { case NON_OWNER: //not a write owner, we do nothing return true; case BACKUP: //if it is local, we do nothing. //the update happens in the remote context after the primary validated the write - if (ctx.isOriginLocal()) { - return true; - } + return ctx.isOriginLocal(); } return false; } @@ -171,8 +166,9 @@ protected Object visitNonTxDataWriteCommand(InvocationContext ctx, DataWriteComm * The primary owner generates a new {@link IracMetadata} and stores it in the {@link WriteCommand}. */ protected void visitNonTxKey(InvocationContext ctx, Object key, WriteCommand command) { + LocalizedCacheTopology cacheTopology = CacheTopologyUtil.checkTopology(command, getCacheTopology()); int segment = getSegment(command, key); - if (getOwnership(segment) != Ownership.PRIMARY) { + if (!cacheTopology.getSegmentDistribution(segment).isPrimary()) { return; } Optional entryMetadata = IracUtils.findIracMetadataFromCacheEntry(ctx.lookupEntry(key)); diff --git a/core/src/main/java/org/infinispan/interceptors/impl/ClusteringInterceptor.java b/core/src/main/java/org/infinispan/interceptors/impl/ClusteringInterceptor.java index 67c274ac2b79..9262f881e70f 100644 --- a/core/src/main/java/org/infinispan/interceptors/impl/ClusteringInterceptor.java +++ b/core/src/main/java/org/infinispan/interceptors/impl/ClusteringInterceptor.java @@ -10,8 +10,6 @@ import java.util.function.Consumer; import org.infinispan.commands.CommandsFactory; -import org.infinispan.commands.FlagAffectedCommand; -import org.infinispan.commands.TopologyAffectedCommand; import org.infinispan.commands.read.SizeCommand; import org.infinispan.commons.CacheException; import org.infinispan.container.impl.EntryFactory; @@ -38,6 +36,7 @@ import org.infinispan.remoting.transport.impl.SingleResponseCollector; import org.infinispan.statetransfer.AllOwnersLostException; import org.infinispan.statetransfer.OutdatedTopologyException; +import org.infinispan.util.CacheTopologyUtil; import org.infinispan.util.concurrent.locks.LockManager; /** @@ -70,21 +69,8 @@ public void init() { } } - protected LocalizedCacheTopology checkTopologyId(TopologyAffectedCommand command) { - LocalizedCacheTopology cacheTopology = distributionManager.getCacheTopology(); - int currentTopologyId = cacheTopology.getTopologyId(); - int cmdTopology = command.getTopologyId(); - if (command instanceof FlagAffectedCommand && ((((FlagAffectedCommand) command).hasAnyFlag(FlagBitSets.SKIP_OWNERSHIP_CHECK | FlagBitSets.CACHE_MODE_LOCAL)))) { - getLog().tracef("Skipping topology check for command %s", command); - return cacheTopology; - } - if (getLog().isTraceEnabled()) { - getLog().tracef("Current topology %d, command topology %d", currentTopologyId, cmdTopology); - } - if (cmdTopology >= 0 && currentTopologyId != cmdTopology) { - throw OutdatedTopologyException.RETRY_NEXT_TOPOLOGY; - } - return cacheTopology; + protected LocalizedCacheTopology getCacheTopology() { + return distributionManager.getCacheTopology(); } private static abstract class AbstractTouchResponseCollector extends ValidResponseCollector { @@ -169,7 +155,7 @@ public Object visitTouchCommand(InvocationContext ctx, TouchCommand command) thr if (command.hasAnyFlag(FlagBitSets.CACHE_MODE_LOCAL | FlagBitSets.SKIP_REMOTE_LOOKUP)) { return invokeNext(ctx, command); } - LocalizedCacheTopology cacheTopology = checkTopologyId(command); + LocalizedCacheTopology cacheTopology = CacheTopologyUtil.checkTopology(command, getCacheTopology()); DistributionInfo info = cacheTopology.getSegmentDistribution(command.getSegment()); // Scattered any node could be a backup, so we have to touch all members List
owners = isScattered ? cacheTopology.getActualMembers() : info.readOwners(); diff --git a/core/src/main/java/org/infinispan/util/CacheTopologyUtil.java b/core/src/main/java/org/infinispan/util/CacheTopologyUtil.java new file mode 100644 index 000000000000..00ba1ef90228 --- /dev/null +++ b/core/src/main/java/org/infinispan/util/CacheTopologyUtil.java @@ -0,0 +1,38 @@ +package org.infinispan.util; + +import org.infinispan.commands.FlagAffectedCommand; +import org.infinispan.commands.TopologyAffectedCommand; +import org.infinispan.context.impl.FlagBitSets; +import org.infinispan.distribution.LocalizedCacheTopology; +import org.infinispan.statetransfer.OutdatedTopologyException; +import org.infinispan.topology.CacheTopology; + +/** + * Utility methods related to {@link CacheTopology}. + * + * @since 14.0 + */ +public enum CacheTopologyUtil { + ; + + private static final long SKIP_TOPOLOGY_FLAGS = FlagBitSets.SKIP_OWNERSHIP_CHECK | FlagBitSets.CACHE_MODE_LOCAL; + + /** + * Check if the current {@link LocalizedCacheTopology} is valid for the {@link TopologyAffectedCommand}. + * + * @param command The {@link TopologyAffectedCommand} that will use the {@link LocalizedCacheTopology}. + * @param current The current {@link LocalizedCacheTopology}. + * @return The current {@link LocalizedCacheTopology}. + */ + public static LocalizedCacheTopology checkTopology(TopologyAffectedCommand command, LocalizedCacheTopology current) { + int currentTopologyId = current.getTopologyId(); + int cmdTopology = command.getTopologyId(); + if (command instanceof FlagAffectedCommand && (((FlagAffectedCommand) command).hasAnyFlag(SKIP_TOPOLOGY_FLAGS))) { + return current; + } + if (cmdTopology >= 0 && currentTopologyId != cmdTopology) { + throw OutdatedTopologyException.RETRY_NEXT_TOPOLOGY; + } + return current; + } +} diff --git a/core/src/test/java/org/infinispan/xsite/irac/IracOwnershipChangeTest.java b/core/src/test/java/org/infinispan/xsite/irac/IracOwnershipChangeTest.java new file mode 100644 index 000000000000..d596761804c6 --- /dev/null +++ b/core/src/test/java/org/infinispan/xsite/irac/IracOwnershipChangeTest.java @@ -0,0 +1,267 @@ +package org.infinispan.xsite.irac; + +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.infinispan.commands.write.PutKeyValueCommand; +import org.infinispan.configuration.cache.BackupConfiguration; +import org.infinispan.configuration.cache.CacheMode; +import org.infinispan.configuration.cache.ConfigurationBuilder; +import org.infinispan.context.InvocationContext; +import org.infinispan.context.impl.FlagBitSets; +import org.infinispan.distribution.LocalizedCacheTopology; +import org.infinispan.interceptors.AsyncInterceptorChain; +import org.infinispan.interceptors.DDAsyncInterceptor; +import org.infinispan.interceptors.impl.NonTxIracLocalSiteInterceptor; +import org.infinispan.test.TestingUtil; +import org.infinispan.util.ControlledConsistentHashFactory; +import org.infinispan.xsite.AbstractMultipleSitesTest; +import org.testng.AssertJUnit; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +/** + * Tests topology change while write command is executing. + * + * @since 14.0 + */ +@Test(groups = "functional", testName = "xsite.irac.IracOwnershipChangeTest") +public class IracOwnershipChangeTest extends AbstractMultipleSitesTest { + + private final ControlledConsistentHashFactory site0CHFactory = new ControlledConsistentHashFactory.Default(0, 1); + private final ControlledConsistentHashFactory site1CHFactory = new ControlledConsistentHashFactory.Default(0, 1); + + + @Override + protected int defaultNumberOfSites() { + return 2; + } + + @Override + protected int defaultNumberOfNodes() { + return 3; + } + + @Override + protected ConfigurationBuilder defaultConfigurationForSite(int siteIndex) { + ConfigurationBuilder builder = getDefaultClusteredCacheConfig(CacheMode.DIST_SYNC); + builder.clustering().hash() + .numSegments(1) + .numOwners(2) + .consistentHashFactory(siteIndex == 0 ? site0CHFactory : site1CHFactory); + builder.sites().addBackup() + .site(siteName(siteIndex == 0 ? 1 : 0)) + .strategy(BackupConfiguration.BackupStrategy.ASYNC); + return builder; + } + + + @BeforeMethod(alwaysRun = true) + @Override + public void createBeforeMethod() { + super.createBeforeMethod(); + // reset consistent hash + site0CHFactory.setOwnerIndexes(0, 1); + site1CHFactory.setOwnerIndexes(0, 1); + + site0CHFactory.triggerRebalance(cache(0, 0)); + site1CHFactory.triggerRebalance(cache(1, 0)); + + site(0).waitForClusterToForm(null); + site(1).waitForClusterToForm(null); + } + + public void testPrimaryOwnerLosesOwnership() throws InterruptedException, ExecutionException, TimeoutException { + String key = "key-1"; + String value = "primary-loses-ownership"; + + assertOwnership(key, 0, 1, 2); + + BlockingInterceptor interceptor = blockingInterceptor(0, 0); + CommandBlocker blocker = interceptor.blockCommand(); + + CompletableFuture stage = cache(0, 0).putAsync(key, value); + + AssertJUnit.assertTrue(blocker.blocked.await(10, TimeUnit.SECONDS)); + + site0CHFactory.setOwnerIndexes(1, 2); + site0CHFactory.triggerRebalance(cache(0, 0)); + site(0).waitForClusterToForm(null); + assertOwnership(key, 1, 2, 0); + + blocker.release(); + stage.get(10, TimeUnit.SECONDS); + + eventuallyAssertInAllSitesAndCaches(cache -> value.equals(cache.get(key))); + } + + public void testBackupOwnerLosesOwnership() throws InterruptedException, ExecutionException, TimeoutException { + String key = "key-2"; + String value = "backup-loses-ownership"; + + assertOwnership(key, 0, 1, 2); + + BlockingInterceptor interceptor = blockingInterceptor(0, 1); + CommandBlocker blocker = interceptor.blockCommand(); + + CompletableFuture stage = cache(0, 1).putAsync(key, value); + + AssertJUnit.assertTrue(blocker.blocked.await(10, TimeUnit.SECONDS)); + + site0CHFactory.setOwnerIndexes(0, 2); + site0CHFactory.triggerRebalance(cache(0, 0)); + site(0).waitForClusterToForm(null); + assertOwnership(key, 0, 2, 1); + + blocker.release(); + stage.get(10, TimeUnit.SECONDS); + + eventuallyAssertInAllSitesAndCaches(cache -> value.equals(cache.get(key))); + } + + public void testPrimaryChangesOwnershipWithBackup() throws InterruptedException, ExecutionException, TimeoutException { + String key = "key-3"; + String value = "primary-backup-swap"; + + assertOwnership(key, 0, 1, 2); + + BlockingInterceptor interceptor = blockingInterceptor(0, 0); + CommandBlocker blocker = interceptor.blockCommand(); + + CompletableFuture stage = cache(0, 0).putAsync(key, value); + + AssertJUnit.assertTrue(blocker.blocked.await(10, TimeUnit.SECONDS)); + + site0CHFactory.setOwnerIndexes(1, 0); + site0CHFactory.triggerRebalance(cache(0, 0)); + site(0).waitForClusterToForm(null); + assertOwnership(key, 1, 0, 2); + + blocker.release(); + stage.get(10, TimeUnit.SECONDS); + + eventuallyAssertInAllSitesAndCaches(cache -> value.equals(cache.get(key))); + } + + public void testNonOwnerBecomesBackup() throws InterruptedException, ExecutionException, TimeoutException { + String key = "key-4"; + String value = "non-owner-to-backup"; + + assertOwnership(key, 0, 1, 2); + + BlockingInterceptor interceptor = blockingInterceptor(0, 2); + CommandBlocker blocker = interceptor.blockCommand(); + + CompletableFuture stage = cache(0, 2).putAsync(key, value); + + AssertJUnit.assertTrue(blocker.blocked.await(10, TimeUnit.SECONDS)); + + site0CHFactory.setOwnerIndexes(0, 2); + site0CHFactory.triggerRebalance(cache(0, 0)); + site(0).waitForClusterToForm(null); + assertOwnership(key, 0, 2, 1); + + blocker.release(); + stage.get(10, TimeUnit.SECONDS); + + eventuallyAssertInAllSitesAndCaches(cache -> value.equals(cache.get(key))); + } + + public void testNonOwnerBecomesPrimary() throws InterruptedException, ExecutionException, TimeoutException { + String key = "key-5"; + String value = "non-owner-to-backup"; + + assertOwnership(key, 0, 1, 2); + + BlockingInterceptor interceptor = blockingInterceptor(0, 2); + CommandBlocker blocker = interceptor.blockCommand(); + + CompletableFuture stage = cache(0, 2).putAsync(key, value); + + AssertJUnit.assertTrue(blocker.blocked.await(10, TimeUnit.SECONDS)); + + site0CHFactory.setOwnerIndexes(2, 1); + site0CHFactory.triggerRebalance(cache(0, 0)); + site(0).waitForClusterToForm(null); + assertOwnership(key, 2, 1, 0); + + blocker.release(); + stage.get(10, TimeUnit.SECONDS); + + eventuallyAssertInAllSitesAndCaches(cache -> value.equals(cache.get(key))); + } + + private LocalizedCacheTopology cacheTopology(int site, int index) { + return TestingUtil.extractCacheTopology(cache(site, index)); + } + + private BlockingInterceptor blockingInterceptor(int site, int index) { + AsyncInterceptorChain interceptorChain = TestingUtil.extractInterceptorChain(cache(site, index)); + BlockingInterceptor interceptor = interceptorChain.findInterceptorExtending(BlockingInterceptor.class); + if (interceptor != null) { + return interceptor; + } + interceptor = new BlockingInterceptor(); + AssertJUnit.assertTrue(interceptorChain.addInterceptorAfter(interceptor, NonTxIracLocalSiteInterceptor.class)); + return interceptor; + } + + private void assertOwnership(String key, int primary, int backup, int nonOwner) { + AssertJUnit.assertTrue(cacheTopology(0, primary).getDistribution(key).isPrimary()); + AssertJUnit.assertTrue(cacheTopology(0, backup).getDistribution(key).isWriteBackup()); + AssertJUnit.assertFalse(cacheTopology(0, nonOwner).getDistribution(key).isWriteOwner()); + } + + public static class BlockingInterceptor extends DDAsyncInterceptor { + + volatile CommandBlocker afterCompleted; + + @Override + public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable { + CommandBlocker blocker = afterCompleted; + if (blocker == null || blocker.delay.isDone() || command.hasAnyFlag(FlagBitSets.PUT_FOR_STATE_TRANSFER)) { + log.tracef("Skipping command %s", command); + return invokeNext(ctx, command); + } + return invokeNextAndHandle(ctx, command, (rCtx, rCommand, rv, throwable) -> delayedValue(blocker.notifyBlocked(rCommand), rv, throwable)); + } + + CommandBlocker blockCommand() { + CommandBlocker existing; + CommandBlocker newBlocker = new CommandBlocker(new CountDownLatch(1), new CompletableFuture<>()); + synchronized (this) { + existing = afterCompleted; + afterCompleted = newBlocker; + } + if (existing != null) { + existing.release(); + } + return newBlocker; + } + } + + private static class CommandBlocker { + final CountDownLatch blocked; + final CompletableFuture delay; + + CommandBlocker(CountDownLatch blocked, CompletableFuture delay) { + this.blocked = Objects.requireNonNull(blocked); + this.delay = Objects.requireNonNull(delay); + } + + CompletableFuture notifyBlocked(Object command) { + log.tracef("Blocking command %s", command); + blocked.countDown(); + return delay.thenRun(() -> log.tracef("Unblocking command %s", command)); + } + + void release() { + blocked.countDown(); + delay.complete(null); + } + } +}