Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ISPN-14219 IRAC: NPE on topology change during write command #10385

Merged
merged 1 commit into from Oct 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -35,6 +35,7 @@
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.impl.MapResponseCollector;
import org.infinispan.commons.util.concurrent.CompletableFutures;
import org.infinispan.util.CacheTopologyUtil;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

Expand Down Expand Up @@ -67,7 +68,7 @@ protected Object primaryReturnHandler(InvocationContext ctx, AbstractDataWriteCo
command);
return localResult;
}
LocalizedCacheTopology cacheTopology = checkTopologyId(command);
LocalizedCacheTopology cacheTopology = CacheTopologyUtil.checkTopology(command, distributionManager.getCacheTopology());
DistributionInfo distributionInfo = cacheTopology.getSegmentDistribution(command.getSegment());

Collection<Address> owners = distributionInfo.writeOwners();
Expand Down
Expand Up @@ -66,6 +66,7 @@
import org.infinispan.statetransfer.OutdatedTopologyException;
import org.infinispan.transaction.xa.GlobalTransaction;
import org.infinispan.commons.util.concurrent.CompletableFutures;
import org.infinispan.util.CacheTopologyUtil;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

Expand Down Expand Up @@ -132,7 +133,7 @@ protected DistributionInfo retrieveDistributionInfo(LocalizedCacheTopology topol
*/
protected <C extends FlagAffectedCommand & TopologyAffectedCommand> CompletionStage<Void> remoteGetSingleKey(
InvocationContext ctx, C command, Object key, boolean isWrite) {
LocalizedCacheTopology cacheTopology = checkTopologyId(command);
LocalizedCacheTopology cacheTopology = CacheTopologyUtil.checkTopology(command, getCacheTopology());
int topologyId = cacheTopology.getTopologyId();

DistributionInfo info = retrieveDistributionInfo(cacheTopology, command, key);
Expand Down Expand Up @@ -190,7 +191,7 @@ protected final Object handleNonTxWriteCommand(InvocationContext ctx, AbstractDa
return invokeNext(ctx, command);
}

LocalizedCacheTopology cacheTopology = checkTopologyId(command);
LocalizedCacheTopology cacheTopology = CacheTopologyUtil.checkTopology(command, getCacheTopology());
DistributionInfo info = cacheTopology.getSegmentDistribution(SegmentSpecificCommand.extractSegment(command, key,
keyPartitioner));

Expand Down Expand Up @@ -232,7 +233,7 @@ protected Object primaryReturnHandler(InvocationContext ctx, AbstractDataWriteCo
if (log.isTraceEnabled()) log.tracef("Skipping the replication of the conditional command as it did not succeed on primary owner (%s).", command);
return localResult;
}
LocalizedCacheTopology cacheTopology = checkTopologyId(command);
LocalizedCacheTopology cacheTopology = CacheTopologyUtil.checkTopology(command, getCacheTopology());
int segment = SegmentSpecificCommand.extractSegment(command, command.getKey(), keyPartitioner);
DistributionInfo distributionInfo = cacheTopology.getSegmentDistribution(segment);
Collection<Address> owners = distributionInfo.writeOwners();
Expand Down Expand Up @@ -302,7 +303,7 @@ CompletionStage<Void> remoteGetMany(InvocationContext ctx, C command, Collection
private <C extends FlagAffectedCommand & TopologyAffectedCommand>
CompletionStage<Void> doRemoteGetMany(InvocationContext ctx, C command, Collection<?> keys,
Map<Object, Collection<Address>> unsureOwners, boolean hasSuspectedOwner) {
LocalizedCacheTopology cacheTopology = checkTopologyId(command);
LocalizedCacheTopology cacheTopology = CacheTopologyUtil.checkTopology(command, getCacheTopology());
Map<Address, List<Object>> requestedKeys = getKeysByOwner(ctx, keys, cacheTopology, null, unsureOwners);
if (requestedKeys.isEmpty()) {
for (Object key : keys) {
Expand Down Expand Up @@ -350,7 +351,7 @@ protected <C extends TopologyAffectedCommand & FlagAffectedCommand> Object handl
return handleLocalOnlyReadManyCommand(ctx, command, helper.keys(command));
}

LocalizedCacheTopology cacheTopology = checkTopologyId(command);
LocalizedCacheTopology cacheTopology = CacheTopologyUtil.checkTopology(command, getCacheTopology());
Collection<?> keys = helper.keys(command);
if (!ctx.isOriginLocal()) {
return handleRemoteReadManyCommand(ctx, command, keys, helper);
Expand Down Expand Up @@ -477,7 +478,7 @@ private void handleMissingResponse(Response response) {
for (Object key : keys) {
contactedNodes.computeIfAbsent(key, k -> new ArrayList<>(4)).add(target);
}
requestedKeys = getKeysByOwner(ctx, keys, checkTopologyId(command), null, contactedNodes);
requestedKeys = getKeysByOwner(ctx, keys, CacheTopologyUtil.checkTopology(command, getCacheTopology()), null, contactedNodes);
}
int pos = destinationIndex;
for (Map.Entry<Address, List<Object>> addressKeys : requestedKeys.entrySet()) {
Expand Down Expand Up @@ -638,7 +639,7 @@ public Object visitReadOnlyKeyCommand(InvocationContext ctx, ReadOnlyKeyCommand
return UnsureResponse.INSTANCE;
}
if (readNeedsRemoteValue(command)) {
LocalizedCacheTopology cacheTopology = checkTopologyId(command);
LocalizedCacheTopology cacheTopology = CacheTopologyUtil.checkTopology(command, getCacheTopology());
Collection<Address> owners = cacheTopology.getDistribution(key).readOwners();
if (log.isTraceEnabled())
log.tracef("Doing a remote get for key %s in topology %d to %s", key, cacheTopology.getTopologyId(), owners);
Expand Down
Expand Up @@ -43,8 +43,7 @@
import org.infinispan.remoting.transport.impl.SingleResponseCollector;
import org.infinispan.remoting.transport.impl.SingletonMapResponseCollector;
import org.infinispan.statetransfer.OutdatedTopologyException;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.infinispan.util.CacheTopologyUtil;

/**
* Non-transactional interceptor used by distributed caches that support concurrent writes.
Expand All @@ -65,8 +64,6 @@
*/
public class NonTxDistributionInterceptor extends BaseDistributionInterceptor {

private static Log log = LogFactory.getLog(NonTxDistributionInterceptor.class);

private final PutMapHelper putMapHelper = new PutMapHelper(this::createRemoteCallback);
private final ReadWriteManyHelper readWriteManyHelper = new ReadWriteManyHelper(this::createRemoteCallback);
private final ReadWriteManyEntriesHelper readWriteManyEntriesHelper = new ReadWriteManyEntriesHelper(this::createRemoteCallback);
Expand Down Expand Up @@ -189,7 +186,7 @@ private <C extends WriteCommand, Container, Item> Object handleWriteOnlyManyComm
// it is possible that the function will be applied multiple times on some of the nodes.
// There is no general solution for this ATM; proper solution will probably record CommandInvocationId
// in the entry, and implement some housekeeping
LocalizedCacheTopology cacheTopology = checkTopologyId(command);
LocalizedCacheTopology cacheTopology = CacheTopologyUtil.checkTopology(command, getCacheTopology());
ConsistentHash ch = cacheTopology.getWriteConsistentHash();
if (ctx.isOriginLocal()) {
Map<Address, IntSet> segmentMap = primaryOwnersOfSegments(ch);
Expand Down Expand Up @@ -289,7 +286,7 @@ protected <C extends WriteCommand, Container, Item> Object handleReadWriteManyCo
// it is possible that the function will be applied multiple times on some of the nodes.
// There is no general solution for this ATM; proper solution will probably record CommandInvocationId
// in the entry, and implement some housekeeping
LocalizedCacheTopology topology = checkTopologyId(command);
LocalizedCacheTopology topology = CacheTopologyUtil.checkTopology(command, getCacheTopology());
ConsistentHash ch = topology.getWriteConsistentHash();
if (ctx.isOriginLocal()) {
Map<Address, IntSet> segmentMap = primaryOwnersOfSegments(ch);
Expand Down Expand Up @@ -496,7 +493,7 @@ private final static class MutableInt {
private <C extends WriteCommand> Object writeManyRemoteCallback(WriteManyCommandHelper<C , ?, ?> helper,InvocationContext ctx, C command, Object rv) {
// The node running this method must be primary owner for all the command's keys
// Check that the command topology is actual, so we can assume that we really are primary owner
LocalizedCacheTopology topology = checkTopologyId(command);
LocalizedCacheTopology topology = CacheTopologyUtil.checkTopology(command, getCacheTopology());
Map<Address, IntSet> backups = backupOwnersOfSegments(topology, extractCommandSegments(command, topology));
if (backups.isEmpty()) {
return rv;
Expand Down
Expand Up @@ -64,6 +64,7 @@
import org.infinispan.remoting.transport.impl.SingleResponseCollector;
import org.infinispan.statetransfer.OutdatedTopologyException;
import org.infinispan.statetransfer.StateTransferInterceptor;
import org.infinispan.util.CacheTopologyUtil;
import org.infinispan.util.TriangleFunctionsUtil;
import org.infinispan.util.concurrent.CommandAckCollector;
import org.infinispan.commons.util.concurrent.CompletableFutures;
Expand Down Expand Up @@ -234,7 +235,7 @@ private <R, C extends WriteCommand> Object handleLocalManyKeysCommand(Invocation
MultiKeyBackupBuilder<C> backupBuilder) {

//local command. we need to split by primary owner to send the command to them
final LocalizedCacheTopology cacheTopology = checkTopologyId(command);
final LocalizedCacheTopology cacheTopology = CacheTopologyUtil.checkTopology(command, getCacheTopology());
final PrimaryOwnerClassifier filter = new PrimaryOwnerClassifier(cacheTopology, command.getAffectedKeys());

return isSynchronous(command) ?
Expand All @@ -255,7 +256,7 @@ private <C extends WriteCommand> Object handleRemoteManyKeysCommand(InvocationCo
private <C extends WriteCommand> Object remoteBackupManyKeysWrite(InvocationContext ctx, C command,
Set<Object> keys) {
//backup & remote
final LocalizedCacheTopology cacheTopology = checkTopologyId(command);
final LocalizedCacheTopology cacheTopology = CacheTopologyUtil.checkTopology(command, getCacheTopology());
return asyncInvokeNext(ctx, command,
checkRemoteGetIfNeeded(ctx, command, keys, cacheTopology, command.loadType() == OWNER));
}
Expand All @@ -264,7 +265,7 @@ private <C extends WriteCommand> Object remotePrimaryManyKeysWrite(InvocationCon
Set<Object> keys,
MultiKeyBackupBuilder<C> backupBuilder) {
//primary owner & remote
final LocalizedCacheTopology cacheTopology = checkTopologyId(command);
final LocalizedCacheTopology cacheTopology = CacheTopologyUtil.checkTopology(command, getCacheTopology());
//primary, we need to send the command to the backups ordered!
sendToBackups(command, keys, cacheTopology, backupBuilder);
return asyncInvokeNext(ctx, command,
Expand Down Expand Up @@ -405,7 +406,7 @@ private <C extends DataWriteCommand> Object handleSingleKeyWriteCommand(Invocati
//don't go through the triangle
return invokeNext(context, command);
}
LocalizedCacheTopology topology = checkTopologyId(command);
LocalizedCacheTopology topology = CacheTopologyUtil.checkTopology(command, getCacheTopology());
DistributionInfo distributionInfo = topology.getDistributionForSegment(command.getSegment());

if (distributionInfo.isPrimary()) {
Expand Down
Expand Up @@ -76,6 +76,7 @@
import org.infinispan.transaction.impl.LocalTransaction;
import org.infinispan.transaction.xa.GlobalTransaction;
import org.infinispan.commons.util.concurrent.CompletableFutures;
import org.infinispan.util.CacheTopologyUtil;
import org.infinispan.util.concurrent.CompletionStages;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
Expand Down Expand Up @@ -171,7 +172,7 @@ public Object visitLockControlCommand(TxInvocationContext ctx, LockControlComman
throws Throwable {
if (ctx.isOriginLocal()) {
TxInvocationContext<LocalTransaction> localTxCtx = (TxInvocationContext<LocalTransaction>) ctx;
Collection<Address> affectedNodes = checkTopologyId(command).getWriteOwners(command.getKeys());
Collection<Address> affectedNodes = CacheTopologyUtil.checkTopology(command, getCacheTopology()).getWriteOwners(command.getKeys());
localTxCtx.getCacheTransaction().locksAcquired(affectedNodes);
log.tracef("Registered remote locks acquired %s", affectedNodes);
RpcOptions rpcOptions = rpcManager.getSyncRpcOptions();
Expand Down Expand Up @@ -264,7 +265,7 @@ public Object visitPrepareCommand(TxInvocationContext ctx, PrepareCommand comman

TxInvocationContext<LocalTransaction> localTxCtx = (TxInvocationContext<LocalTransaction>) rCtx;
LocalTransaction localTx = localTxCtx.getCacheTransaction();
LocalizedCacheTopology cacheTopology = checkTopologyId(rCommand);
LocalizedCacheTopology cacheTopology = CacheTopologyUtil.checkTopology(rCommand, getCacheTopology());
Collection<Address> writeOwners = cacheTopology.getWriteOwners(localTxCtx.getAffectedKeys());
localTx.locksAcquired(writeOwners);
Collection<Address> recipients = isReplicated ? null : localTx.getCommitNodes(writeOwners, cacheTopology);
Expand Down Expand Up @@ -328,7 +329,7 @@ private Object handleSecondPhaseCommand(TxInvocationContext ctx, TransactionBoun

private Collection<Address> getCommitNodes(TxInvocationContext ctx, TopologyAffectedCommand command) {
LocalTransaction localTx = (LocalTransaction) ctx.getCacheTransaction();
LocalizedCacheTopology cacheTopology = checkTopologyId(command);
LocalizedCacheTopology cacheTopology = CacheTopologyUtil.checkTopology(command, getCacheTopology());
Collection<Address> affectedNodes =
isReplicated ? null : cacheTopology.getWriteOwners(ctx.getAffectedKeys());
return localTx.getCommitNodes(affectedNodes, cacheTopology);
Expand All @@ -337,7 +338,7 @@ private Collection<Address> getCommitNodes(TxInvocationContext ctx, TopologyAffe
protected void checkTxCommandResponses(Map<Address, Response> responseMap,
TransactionBoundaryCommand command, TxInvocationContext<LocalTransaction> context,
Collection<Address> recipients, PrepareResponse prepareResponse) {
LocalizedCacheTopology cacheTopology = checkTopologyId(command);
LocalizedCacheTopology cacheTopology = CacheTopologyUtil.checkTopology(command, getCacheTopology());
for (Map.Entry<Address, Response> e : responseMap.entrySet()) {
Address recipient = e.getKey();
Response response = e.getValue();
Expand Down Expand Up @@ -392,7 +393,7 @@ private Object handleTxWriteCommand(InvocationContext ctx, AbstractDataWriteComm
Object key) throws Throwable {
try {
if (!ctx.isOriginLocal()) {
LocalizedCacheTopology cacheTopology = checkTopologyId(command);
LocalizedCacheTopology cacheTopology = CacheTopologyUtil.checkTopology(command, getCacheTopology());
// Ignore any remote command when we aren't the owner
if (!cacheTopology.isSegmentWriteOwner(command.getSegment())) {
return null;
Expand Down Expand Up @@ -428,7 +429,7 @@ private Object handleTxWriteCommand(InvocationContext ctx, AbstractDataWriteComm
boolean ignorePreviousValue = command.hasAnyFlag(SKIP_REMOTE_FLAGS) || command.loadType() == VisitableCommand.LoadType.DONT_LOAD;
Map<K, V> filtered = new HashMap<>(entries.size());
Collection<Object> remoteKeys = new ArrayList<>();
LocalizedCacheTopology cacheTopology = checkTopologyId(command);
LocalizedCacheTopology cacheTopology = CacheTopologyUtil.checkTopology(command, getCacheTopology());
for (Map.Entry<K, V> e : entries.entrySet()) {
K key = e.getKey();
if (ctx.isOriginLocal() || cacheTopology.isWriteOwner(key)) {
Expand All @@ -451,7 +452,7 @@ protected <C extends VisitableCommand & FlagAffectedCommand & TopologyAffectedCo
boolean ignorePreviousValue = command.hasAnyFlag(SKIP_REMOTE_FLAGS) || command.loadType() == VisitableCommand.LoadType.DONT_LOAD;
List<K> filtered = new ArrayList<>(keys.size());
List<Object> remoteKeys = null;
LocalizedCacheTopology cacheTopology = checkTopologyId(command);
LocalizedCacheTopology cacheTopology = CacheTopologyUtil.checkTopology(command, getCacheTopology());
for (K key : keys) {
if (ctx.isOriginLocal() || cacheTopology.isWriteOwner(key)) {
if (ctx.lookupEntry(key) == null) {
Expand All @@ -473,6 +474,7 @@ protected <C extends VisitableCommand & FlagAffectedCommand & TopologyAffectedCo

public <C extends AbstractDataWriteCommand & FunctionalCommand> Object handleTxFunctionalCommand(InvocationContext ctx, C command) {
Object key = command.getKey();
LocalizedCacheTopology cacheTopology = CacheTopologyUtil.checkTopology(command, getCacheTopology());
if (ctx.isOriginLocal()) {
CacheEntry entry = ctx.lookupEntry(key);
if (entry == null) {
Expand All @@ -481,7 +483,6 @@ public <C extends AbstractDataWriteCommand & FunctionalCommand> Object handleTxF
return invokeNext(ctx, command);
}

LocalizedCacheTopology cacheTopology = checkTopologyId(command);
int segment = command.getSegment();
DistributionInfo distributionInfo = cacheTopology.getSegmentDistribution(segment);

Expand All @@ -508,7 +509,7 @@ public <C extends AbstractDataWriteCommand & FunctionalCommand> Object handleTxF
// It's possible that this is not an owner, but the entry was loaded from L1 - let the command run
return invokeNext(ctx, command);
} else {
if (!checkTopologyId(command).isWriteOwner(key)) {
if (!cacheTopology.isWriteOwner(key)) {
return null;
}
CacheEntry entry = ctx.lookupEntry(key);
Expand Down