diff --git a/core/src/main/java/org/infinispan/util/concurrent/CompletionStages.java b/core/src/main/java/org/infinispan/util/concurrent/CompletionStages.java index 6bd239632dab..92fa528d2d0f 100644 --- a/core/src/main/java/org/infinispan/util/concurrent/CompletionStages.java +++ b/core/src/main/java/org/infinispan/util/concurrent/CompletionStages.java @@ -6,6 +6,7 @@ import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; @@ -145,8 +146,13 @@ public static CompletionStage handleAndCompose(CompletionStage stag return stage.handle(handleFunction).thenCompose(Function.identity()); } + public static CompletionStage handleAndComposeAsync(CompletionStage stage, + BiFunction> handleFunction, Executor executor) { + return stage.handleAsync(handleFunction, executor).thenCompose(Function.identity()); + } + public static CompletionStage schedule(Runnable command, ScheduledExecutorService executor, - long delay, TimeUnit timeUnit) { + long delay, TimeUnit timeUnit) { CompletableFuture future = new CompletableFuture<>(); executor.schedule(() -> { try { diff --git a/core/src/test/java/org/infinispan/distribution/DistributionTestHelper.java b/core/src/test/java/org/infinispan/distribution/DistributionTestHelper.java index 6e91973d250c..2030ee89901e 100644 --- a/core/src/test/java/org/infinispan/distribution/DistributionTestHelper.java +++ b/core/src/test/java/org/infinispan/distribution/DistributionTestHelper.java @@ -107,6 +107,15 @@ public static Cache getFirstOwner(Object key, List> cac return getOwners(key, caches).iterator().next(); } + public static Cache getFirstBackupOwner(Object key, List> caches) { + for (Cache c : caches) { + if (isOwner(c, key) && !isFirstOwner(c, key)) { + return c; + } + } + return null; + } + public static Collection> getNonOwners(Object key, List> caches) { List> nonOwners = new ArrayList<>(); for (Cache c : caches) diff --git a/server/resp/pom.xml b/server/resp/pom.xml index e6cfdb742679..372114c4bd83 100644 --- a/server/resp/pom.xml +++ b/server/resp/pom.xml @@ -41,6 +41,11 @@ testng test + + org.mockito + mockito-core + test + diff --git a/server/resp/src/main/java/org/infinispan/server/resp/Resp3AuthHandler.java b/server/resp/src/main/java/org/infinispan/server/resp/Resp3AuthHandler.java index c41699c4c3c8..b505523278ca 100644 --- a/server/resp/src/main/java/org/infinispan/server/resp/Resp3AuthHandler.java +++ b/server/resp/src/main/java/org/infinispan/server/resp/Resp3AuthHandler.java @@ -1,22 +1,21 @@ package org.infinispan.server.resp; -import static org.infinispan.server.resp.Resp3Handler.handleThrowable; import static org.infinispan.server.resp.Resp3Handler.statusOK; import java.nio.charset.StandardCharsets; import java.util.List; import java.util.concurrent.CompletionStage; -import java.util.concurrent.ExecutionException; import javax.security.auth.Subject; -import io.netty.channel.ChannelHandlerContext; -import io.netty.util.CharsetUtil; import org.infinispan.AdvancedCache; import org.infinispan.commons.util.Version; import org.infinispan.commons.util.concurrent.CompletableFutures; -public class Resp3AuthHandler implements RespRequestHandler { +import io.netty.channel.ChannelHandlerContext; +import io.netty.util.CharsetUtil; + +public class Resp3AuthHandler extends RespRequestHandler { protected final RespServer respServer; protected AdvancedCache cache; @@ -27,8 +26,8 @@ public Resp3AuthHandler(RespServer server) { } @Override - public RespRequestHandler handleRequest(ChannelHandlerContext ctx, String type, List arguments) { - boolean success = false; + public CompletionStage handleRequest(ChannelHandlerContext ctx, String type, List arguments) { + CompletionStage successStage = null; switch (type) { case "HELLO": byte[] respProtocolBytes = arguments.get(0); @@ -39,53 +38,52 @@ public RespRequestHandler handleRequest(ChannelHandlerContext ctx, String type, } if (arguments.size() == 4) { - success = performAuth(ctx, arguments.get(2), arguments.get(3)); + successStage = performAuth(ctx, arguments.get(2), arguments.get(3)); } else { helloResponse(ctx); } break; case "AUTH": - success = performAuth(ctx, arguments.get(0), arguments.get(1)); + successStage = performAuth(ctx, arguments.get(0), arguments.get(1)); break; case "QUIT": - // TODO: need to close connection - ctx.flush(); + ctx.close(); break; default: - if (isAuthorized()) RespRequestHandler.super.handleRequest(ctx, type, arguments); + if (isAuthorized()) super.handleRequest(ctx, type, arguments); else handleUnauthorized(ctx); } - return success ? respServer.newHandler() : this; + if (successStage != null) { + return stageToReturn(successStage, ctx, + auth -> auth ? respServer.newHandler() : this); + } + + return myStage; } - private boolean performAuth(ChannelHandlerContext ctx, byte[] username, byte[] password) { + private CompletionStage performAuth(ChannelHandlerContext ctx, byte[] username, byte[] password) { return performAuth(ctx, new String(username, StandardCharsets.UTF_8), new String(password, StandardCharsets.UTF_8)); } - private boolean performAuth(ChannelHandlerContext ctx, String username, String password) { + private CompletionStage performAuth(ChannelHandlerContext ctx, String username, String password) { Authenticator authenticator = respServer.getConfiguration().authentication().authenticator(); if (authenticator == null) { - return handleAuthResponse(ctx, null); + return CompletableFutures.booleanStage(handleAuthResponse(ctx, null)); } - CompletionStage cs = authenticator.authenticate(username, password.toCharArray()) - .thenApply(r -> handleAuthResponse(ctx, r)) + return authenticator.authenticate(username, password.toCharArray()) + // Note we have to write to our variables in the event loop (in this case cache) + .thenApplyAsync(r -> handleAuthResponse(ctx, r), ctx.channel().eventLoop()) .exceptionally(t -> { handleUnauthorized(ctx); return false; }); - try { - return CompletableFutures.await(cs.toCompletableFuture()); - } catch (ExecutionException | InterruptedException e) { - handleThrowable(ctx, e); - } - - return false; } private boolean handleAuthResponse(ChannelHandlerContext ctx, Subject subject) { + assert ctx.channel().eventLoop().inEventLoop(); if (subject == null) { - ctx.writeAndFlush(ctx.writeAndFlush(RespRequestHandler.stringToByteBuf("-ERR Client sent AUTH, but no password is set\r\n", ctx.alloc()))); + ctx.writeAndFlush(RespRequestHandler.stringToByteBuf("-ERR Client sent AUTH, but no password is set\r\n", ctx.alloc())); return false; } @@ -95,6 +93,7 @@ private boolean handleAuthResponse(ChannelHandlerContext ctx, Subject subject) { } private void handleUnauthorized(ChannelHandlerContext ctx) { + assert ctx.channel().eventLoop().inEventLoop(); ctx.writeAndFlush(RespRequestHandler.stringToByteBuf("-WRONGPASS invalid username-password pair or user is disabled.\r\n", ctx.alloc())); } diff --git a/server/resp/src/main/java/org/infinispan/server/resp/Resp3Handler.java b/server/resp/src/main/java/org/infinispan/server/resp/Resp3Handler.java index d06b5e9c112b..024a27d2ea3f 100644 --- a/server/resp/src/main/java/org/infinispan/server/resp/Resp3Handler.java +++ b/server/resp/src/main/java/org/infinispan/server/resp/Resp3Handler.java @@ -36,8 +36,7 @@ static ByteBuf statusOK() { } @Override - public RespRequestHandler handleRequest(ChannelHandlerContext ctx, String type, - List arguments) { + public CompletionStage handleRequest(ChannelHandlerContext ctx, String type, List arguments) { switch (type) { case "PING": if (arguments.size() == 0) { @@ -53,170 +52,46 @@ public RespRequestHandler handleRequest(ChannelHandlerContext ctx, String type, ctx.writeAndFlush(bufferToWrite); break; case "SET": - performSet(ctx, cache, arguments.get(0), arguments.get(1), -1, type, statusOK()); - break; + return performSet(ctx, cache, arguments.get(0), arguments.get(1), -1, type, statusOK()); case "GET": byte[] keyBytes = arguments.get(0); - cache.getAsync(keyBytes) - .whenComplete((innerValueBytes, t) -> { - if (t != null) { - log.trace("Exception encountered while performing GET", t); - handleThrowable(ctx, t); - } else if (innerValueBytes != null) { - int length = innerValueBytes.length; - ByteBuf buf = RespRequestHandler.stringToByteBufWithExtra("$" + length + "\r\n", ctx.alloc(), length + 2); - buf.writeBytes(innerValueBytes); - buf.writeByte('\r').writeByte('\n'); - ctx.writeAndFlush(buf); - } else { - ctx.writeAndFlush(RespRequestHandler.stringToByteBuf("$-1\r\n", ctx.alloc())); - } - }); - break; - case "DEL": - int keysToRemove = arguments.size(); - if (keysToRemove == 1) { - keyBytes = arguments.get(0); - cache.removeAsync(keyBytes) - .whenComplete((prev, t) -> { - if (t != null) { - log.trace("Exception encountered while performing DEL", t); - handleThrowable(ctx, t); - return; - } - ctx.writeAndFlush(RespRequestHandler.stringToByteBuf(":" + (prev == null ? "0" : "1") + - "\r\n", ctx.alloc())); - }); - } else if (keysToRemove == 0) { - // TODO: is this an error? - ctx.writeAndFlush(RespRequestHandler.stringToByteBuf(":0\r\n", ctx.alloc())); - } else { - AtomicInteger removes = new AtomicInteger(); - AggregateCompletionStage deleteStages = CompletionStages.aggregateCompletionStage(removes); - for (byte[] keyBytesLoop : arguments) { - deleteStages.dependsOn(cache.removeAsync(keyBytesLoop) - .thenAccept(prev -> { - if (prev != null) { - removes.incrementAndGet(); - } - })); + return stageToReturn(cache.getAsync(keyBytes), ctx, (innerValueBytes, t) -> { + if (t != null) { + log.trace("Exception encountered while performing GET", t); + handleThrowable(ctx, t); + } else if (innerValueBytes != null) { + int length = innerValueBytes.length; + ByteBuf buf = RespRequestHandler.stringToByteBufWithExtra("$" + length + "\r\n", ctx.alloc(), length + 2); + buf.writeBytes(innerValueBytes); + buf.writeByte('\r').writeByte('\n'); + ctx.writeAndFlush(buf); + } else { + ctx.writeAndFlush(RespRequestHandler.stringToByteBuf("$-1\r\n", ctx.alloc())); } - deleteStages.freeze() - .whenComplete((removals, t) -> { - if (t != null) { - log.trace("Exception encountered while performing multiple DEL", t); - handleThrowable(ctx, t); - return; - } - ctx.writeAndFlush(RespRequestHandler.stringToByteBuf(":" + removals.get() + "\r\n", ctx.alloc())); - }); - } - break; + }); + case "DEL": + return performDelete(ctx, cache, arguments); case "MGET": - int keysToRetrieve = arguments.size(); - if (keysToRetrieve == 0) { - ctx.writeAndFlush(RespRequestHandler.stringToByteBuf("*0\r\n", ctx.alloc())); - break; - } - List results = Collections.synchronizedList(Arrays.asList( - new byte[keysToRetrieve][])); - AtomicInteger resultBytesSize = new AtomicInteger(); - AggregateCompletionStage getStage = CompletionStages.aggregateCompletionStage(); - for (int i = 0; i < keysToRetrieve; ++i) { - int innerCount = i; - keyBytes = arguments.get(i); - getStage.dependsOn(cache.getAsync(keyBytes) - .whenComplete((returnValue, t) -> { - if (returnValue != null) { - results.set(innerCount, returnValue); - int length = returnValue.length; - if (length > 0) { - // byte length + digit length (log10 + 1) + $ - resultBytesSize.addAndGet(returnValue.length + (int) Math.log10(length) + 1 + 1); - } else { - // $0 - resultBytesSize.addAndGet(2); - } - } else { - // $-1 - resultBytesSize.addAndGet(3); - } - // /r/n - resultBytesSize.addAndGet(2); - })); - } - getStage.freeze() - .whenComplete((ignore, t) -> { - if (t != null) { - log.trace("Exception encountered while performing multiple DEL", t); - handleThrowable(ctx, t); - return; - } - int elements = results.size(); - // * + digit length (log10 + 1) + \r\n - ByteBuf byteBuf = ctx.alloc().buffer(resultBytesSize.addAndGet(1 + (int) Math.log10(elements) - + 1 + 2)); - byteBuf.writeCharSequence("*" + results.size(), CharsetUtil.UTF_8); - byteBuf.writeByte('\r'); - byteBuf.writeByte('\n'); - for (byte[] value : results) { - if (value == null) { - byteBuf.writeCharSequence("$-1", CharsetUtil.UTF_8); - } else { - byteBuf.writeCharSequence("$" + value.length, CharsetUtil.UTF_8); - byteBuf.writeByte('\r'); - byteBuf.writeByte('\n'); - byteBuf.writeBytes(value); - } - byteBuf.writeByte('\r'); - byteBuf.writeByte('\n'); - } - ctx.writeAndFlush(byteBuf); - }); - break; + return performMget(ctx, cache, arguments); case "MSET": - int keyValuePairCount = arguments.size(); - if ((keyValuePairCount & 1) == 1) { - log.tracef("Received: %s count for keys and values combined, should be even for MSET", keyValuePairCount); - ctx.writeAndFlush(RespRequestHandler.stringToByteBuf("-ERR Missing a value for a key" + "\r\n", ctx.alloc())); - break; - } - AggregateCompletionStage setStage = CompletionStages.aggregateCompletionStage(); - for (int i = 0; i < keyValuePairCount; i += 2) { - keyBytes = arguments.get(i); - byte[] valueBytes = arguments.get(i + 1); - setStage.dependsOn(cache.putAsync(keyBytes, valueBytes)); - } - setStage.freeze().whenComplete((ignore, t) -> { + return performMset(ctx, cache, arguments); + case "INCR": + return stageToReturn(counterIncOrDec(cache, arguments.get(0), true), ctx, (longValue, t) -> { if (t != null) { - log.trace("Exception encountered while performing MSET", t); handleThrowable(ctx, t); } else { - ctx.writeAndFlush(statusOK()); + handleLongResult(ctx, longValue); } }); - break; - case "INCR": - counterIncOrDec(cache, arguments.get(0), true) - .whenComplete((longValue, t) -> { - if (t != null) { - handleThrowable(ctx, t); - } else { - handleLongResult(ctx, longValue); - } - }); - break; case "DECR": - counterIncOrDec(cache, arguments.get(0), false) - .whenComplete((longValue, t) -> { - if (t != null) { - handleThrowable(ctx, t); - } else { - handleLongResult(ctx, longValue); - } - }); - break; + return stageToReturn(counterIncOrDec(cache, arguments.get(0), false), ctx, (longValue, t) -> { + if (t != null) { + handleThrowable(ctx, t); + } else { + handleLongResult(ctx, longValue); + } + }); case "INFO": ctx.writeAndFlush(RespRequestHandler.stringToByteBuf("-ERR not implemented yet\r\n", ctx.alloc())); break; @@ -224,9 +99,8 @@ public RespRequestHandler handleRequest(ChannelHandlerContext ctx, String type, // TODO: should we return the # of subscribers on this node? // We use expiration to remove the event values eventually while preventing them during high periods of // updates - performSet(ctx, cache, SubscriberHandler.keyToChannel(arguments.get(0)), + return performSet(ctx, cache, SubscriberHandler.keyToChannel(arguments.get(0)), arguments.get(1), 3, type, RespRequestHandler.stringToByteBuf(":0\r\n", ctx.alloc())); - break; case "SUBSCRIBE": SubscriberHandler subscriberHandler = new SubscriberHandler(respServer, this); return subscriberHandler.handleRequest(ctx, type, arguments); @@ -239,10 +113,9 @@ public RespRequestHandler handleRequest(ChannelHandlerContext ctx, String type, ctx.writeAndFlush(statusOK()); break; case "RESET": - // TODO: do we need to reset anything in this case? ctx.writeAndFlush(RespRequestHandler.stringToByteBuf("+RESET\r\n", ctx.alloc())); if (respServer.getConfiguration().authentication().enabled()) { - return new Resp3AuthHandler(respServer); + return CompletableFuture.completedFuture(new Resp3AuthHandler(respServer)); } break; case "COMMAND": @@ -277,7 +150,7 @@ public RespRequestHandler handleRequest(ChannelHandlerContext ctx, String type, default: return super.handleRequest(ctx, type, arguments); } - return this; + return myStage; } private static void addCommand(StringBuilder builder, String name, int arity, int firstKeyPos, int lastKeyPos, int steps) { @@ -338,16 +211,139 @@ private static CompletionStage counterIncOrDec(Cache cache }); } - private void performSet(ChannelHandlerContext ctx, Cache cache, byte[] key, byte[] value, + private CompletionStage performSet(ChannelHandlerContext ctx, Cache cache, byte[] key, byte[] value, long lifespan, String type, ByteBuf messageOnSuccess) { - cache.putAsync(key, value, lifespan, TimeUnit.SECONDS) - .whenComplete((ignore, t) -> { - if (t != null) { - log.trace("Exception encountered while performing " + type, t); - handleThrowable(ctx, t); - } else { - ctx.writeAndFlush(messageOnSuccess); - } - }); + return stageToReturn(cache.putAsync(key, value, lifespan, TimeUnit.SECONDS), ctx, (ignore, t) -> { + if (t != null) { + log.trace("Exception encountered while performing " + type, t); + handleThrowable(ctx, t); + } else { + ctx.writeAndFlush(messageOnSuccess); + } + }); + } + + private CompletionStage performDelete(ChannelHandlerContext ctx, Cache cache, List arguments) { + int keysToRemove = arguments.size(); + if (keysToRemove == 1) { + byte[] keyBytes = arguments.get(0); + return stageToReturn(cache.removeAsync(keyBytes), ctx, (prev, t) -> { + if (t != null) { + log.trace("Exception encountered while performing DEL", t); + handleThrowable(ctx, t); + return; + } + ctx.writeAndFlush(RespRequestHandler.stringToByteBuf(":" + (prev == null ? "0" : "1") + + "\r\n", ctx.alloc())); + }); + } else if (keysToRemove == 0) { + // TODO: is this an error? + ctx.writeAndFlush(RespRequestHandler.stringToByteBuf(":0\r\n", ctx.alloc())); + return myStage; + } else { + AtomicInteger removes = new AtomicInteger(); + AggregateCompletionStage deleteStages = CompletionStages.aggregateCompletionStage(removes); + for (byte[] keyBytesLoop : arguments) { + deleteStages.dependsOn(cache.removeAsync(keyBytesLoop) + .thenAccept(prev -> { + if (prev != null) { + removes.incrementAndGet(); + } + })); + } + return stageToReturn(deleteStages.freeze(), ctx, (removals, t) -> { + if (t != null) { + log.trace("Exception encountered while performing multiple DEL", t); + handleThrowable(ctx, t); + return; + } + ctx.writeAndFlush(RespRequestHandler.stringToByteBuf(":" + removals.get() + "\r\n", ctx.alloc())); + }); + } + } + + private CompletionStage performMget(ChannelHandlerContext ctx, Cache cache, List arguments) { + int keysToRetrieve = arguments.size(); + if (keysToRetrieve == 0) { + ctx.writeAndFlush(RespRequestHandler.stringToByteBuf("*0\r\n", ctx.alloc())); + return myStage; + } + List results = Collections.synchronizedList(Arrays.asList( + new byte[keysToRetrieve][])); + AtomicInteger resultBytesSize = new AtomicInteger(); + AggregateCompletionStage getStage = CompletionStages.aggregateCompletionStage(); + for (int i = 0; i < keysToRetrieve; ++i) { + int innerCount = i; + byte[] keyBytes = arguments.get(i); + getStage.dependsOn(cache.getAsync(keyBytes) + .whenComplete((returnValue, t) -> { + if (returnValue != null) { + results.set(innerCount, returnValue); + int length = returnValue.length; + if (length > 0) { + // byte length + digit length (log10 + 1) + $ + resultBytesSize.addAndGet(returnValue.length + (int) Math.log10(length) + 1 + 1); + } else { + // $0 + resultBytesSize.addAndGet(2); + } + } else { + // $-1 + resultBytesSize.addAndGet(3); + } + // /r/n + resultBytesSize.addAndGet(2); + })); + } + return stageToReturn(getStage.freeze(), ctx, (ignore, t) -> { + if (t != null) { + log.trace("Exception encountered while performing multiple GET", t); + handleThrowable(ctx, t); + return; + } + int elements = results.size(); + // * + digit length (log10 + 1) + \r\n + ByteBuf byteBuf = ctx.alloc().buffer(resultBytesSize.addAndGet(1 + (int) Math.log10(elements) + + 1 + 2)); + byteBuf.writeCharSequence("*" + results.size(), CharsetUtil.UTF_8); + byteBuf.writeByte('\r'); + byteBuf.writeByte('\n'); + for (byte[] value : results) { + if (value == null) { + byteBuf.writeCharSequence("$-1", CharsetUtil.UTF_8); + } else { + byteBuf.writeCharSequence("$" + value.length, CharsetUtil.UTF_8); + byteBuf.writeByte('\r'); + byteBuf.writeByte('\n'); + byteBuf.writeBytes(value); + } + byteBuf.writeByte('\r'); + byteBuf.writeByte('\n'); + } + ctx.writeAndFlush(byteBuf); + }); + } + + private CompletionStage performMset(ChannelHandlerContext ctx, Cache cache, List arguments) { + int keyValuePairCount = arguments.size(); + if ((keyValuePairCount & 1) == 1) { + log.tracef("Received: %s count for keys and values combined, should be even for MSET", keyValuePairCount); + ctx.writeAndFlush(RespRequestHandler.stringToByteBuf("-ERR Missing a value for a key" + "\r\n", ctx.alloc())); + return myStage; + } + AggregateCompletionStage setStage = CompletionStages.aggregateCompletionStage(); + for (int i = 0; i < keyValuePairCount; i += 2) { + byte[] keyBytes = arguments.get(i); + byte[] valueBytes = arguments.get(i + 1); + setStage.dependsOn(cache.putAsync(keyBytes, valueBytes)); + } + return stageToReturn(setStage.freeze(), ctx, (ignore, t) -> { + if (t != null) { + log.trace("Exception encountered while performing MSET", t); + handleThrowable(ctx, t); + } else { + ctx.writeAndFlush(statusOK()); + } + }); } } diff --git a/server/resp/src/main/java/org/infinispan/server/resp/RespLettuceHandler.java b/server/resp/src/main/java/org/infinispan/server/resp/RespLettuceHandler.java index 08702b6a048e..03c1f2f585f6 100644 --- a/server/resp/src/main/java/org/infinispan/server/resp/RespLettuceHandler.java +++ b/server/resp/src/main/java/org/infinispan/server/resp/RespLettuceHandler.java @@ -2,9 +2,12 @@ import java.lang.invoke.MethodHandles; import java.util.List; +import java.util.concurrent.CompletionStage; import org.infinispan.commons.logging.LogFactory; +import org.infinispan.commons.util.Util; import org.infinispan.server.resp.logging.Log; +import org.infinispan.util.concurrent.CompletionStages; import io.lettuce.core.codec.ByteArrayCodec; import io.lettuce.core.output.PushOutput; @@ -19,18 +22,60 @@ public class RespLettuceHandler extends ByteToMessageDecoder { private final RedisStateMachine stateMachine = new RedisStateMachine(ByteBufAllocator.DEFAULT); private RespRequestHandler requestHandler; + private boolean disabledRead = false; public RespLettuceHandler(RespRequestHandler initialHandler) { this.requestHandler = initialHandler; } + @Override + public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { + super.channelUnregistered(ctx); + requestHandler.handleChannelDisconnect(ctx); + } + @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) { + // Don't read any of the ByteBuf if we disabled reads + if (disabledRead) { + return; + } PushOutput pushOutput = new PushOutput<>(ByteArrayCodec.INSTANCE); if (stateMachine.decode(in, pushOutput)) { String type = pushOutput.getType().toUpperCase(); List content = pushOutput.getContent(); - requestHandler = requestHandler.handleRequest(ctx, type, content.subList(1, content.size())); + List contentToUse = content.subList(1, content.size()); + log.tracef("Received command: %s with arguments %s", type, Util.toStr(contentToUse)); + CompletionStage stage = requestHandler.handleRequest(ctx, type, contentToUse); + if (CompletionStages.isCompletedSuccessfully(stage)) { + requestHandler = CompletionStages.join(stage); + } else { + log.tracef("Disabling auto read for channel %s until previous command is complete", ctx.channel()); + // Disable reading any more from socket - until command is complete + ctx.channel().config().setAutoRead(false); + disabledRead = true; + stage.whenComplete((handler, t) -> { + assert ctx.channel().eventLoop().inEventLoop(); + log.tracef("Re-enabling auto read for channel %s as previous command is complete", ctx.channel()); + ctx.channel().config().setAutoRead(true); + disabledRead = false; + if (t != null) { + exceptionCaught(ctx, t); + } else { + // Instate the new handler if there was no exception + requestHandler = handler; + } + + // If there is any readable bytes left before we paused make sure to try to decode, just in case + // if a pending message was read before we disabled auto read + ByteBuf buf = internalBuffer(); + if (buf.isReadable()) { + log.tracef("Bytes available from previous read for channel %s, trying decode directly", ctx.channel()); + // callDecode will call us until the ByteBuf is no longer consumed + callDecode(ctx, buf, List.of()); + } + }); + } } } diff --git a/server/resp/src/main/java/org/infinispan/server/resp/RespRequestHandler.java b/server/resp/src/main/java/org/infinispan/server/resp/RespRequestHandler.java index 5a51e1e0ee21..3a928fabe35d 100644 --- a/server/resp/src/main/java/org/infinispan/server/resp/RespRequestHandler.java +++ b/server/resp/src/main/java/org/infinispan/server/resp/RespRequestHandler.java @@ -1,16 +1,120 @@ package org.infinispan.server.resp; import java.util.List; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.Executor; +import java.util.function.BiConsumer; +import java.util.function.Function; + +import org.infinispan.commons.util.concurrent.CompletableFutures; +import org.infinispan.util.concurrent.CompletionStages; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufUtil; import io.netty.channel.ChannelHandlerContext; -public interface RespRequestHandler { - default RespRequestHandler handleRequest(ChannelHandlerContext ctx, String type, List arguments) { +public abstract class RespRequestHandler { + protected final CompletionStage myStage = CompletableFuture.completedStage(this); + + /** + * Handles the RESP request returning a stage that when complete notifies the command has completed as well as + * providing the request handler for subsequent commands. + * + * @param ctx Netty context pipeline for this request + * @param type The command type + * @param arguments The remaining arguments to the command + * @return stage that when complete returns the new handler to instate. This stage must be completed on the event loop + */ + public CompletionStage handleRequest(ChannelHandlerContext ctx, String type, List arguments) { + if ("QUIT".equals(type)) { + ctx.close(); + return myStage; + } ctx.writeAndFlush(stringToByteBuf("-ERR unknown command\r\n", ctx.alloc())); - return this; + return myStage; + } + + public void handleChannelDisconnect(ChannelHandlerContext ctx) { + // Do nothing by default + } + + protected CompletionStage stageToReturn(CompletionStage stage, ChannelHandlerContext ctx, + BiConsumer biConsumer) { + return stageToReturn(stage, ctx, Objects.requireNonNull(biConsumer), null); + } + + protected CompletionStage stageToReturn(CompletionStage stage, ChannelHandlerContext ctx, + Function handlerWhenComplete) { + return stageToReturn(stage, ctx, null, Objects.requireNonNull(handlerWhenComplete)); + } + + /** + * Handles ensuring that a stage BiConsumer is only invoked on the event loop. The BiConsumer can then do things + * such as writing to the underlying channel or update variables in a thread safe manner without additional + * synchronization or volatile variables. + *

+ * If the BiConsumer provided is null the provided stage must be completed only on the event loop of the + * provided ctx. + * This can be done via the Async methods on {@link CompletionStage} such as {@link CompletionStage#thenApplyAsync(Function, Executor)}; + *

+ * If the returned stage was completed exceptionally, any exception is ignored, assumed to be handled in the + * biConsumer or further up the stage. + * The handlerWhenComplete is not invoked if the stage was completed exceptionally as well. + * If the biConsumer or handlerWhenComplete throw an exception when invoked the returned stage will + * complete with that exception. + *

+ * This method can only be invoked on the event loop for the provided ctx. + * + * @param The stage return value + * @param stage The stage that will complete. Note that if biConsumer is null this stage may only be completed on the event loop + * @param ctx The context used for this request, normally the event loop is the thing used + * @param biConsumer The consumer to be ran on + * @param handlerWhenComplete A function to map the result to which handler will be used for future requests. Only ever invoked on the event loop. May be null, if so the current handler is returned + * @return A stage that is only ever completed on the event loop that provides the new handler to use + */ + private CompletionStage stageToReturn(CompletionStage stage, ChannelHandlerContext ctx, BiConsumer biConsumer, + Function handlerWhenComplete) { + assert ctx.channel().eventLoop().inEventLoop(); + // Only one or the other can be null + assert (biConsumer != null && handlerWhenComplete == null) || (biConsumer == null && handlerWhenComplete != null) : + "biConsumer was: " + biConsumer + " and handlerWhenComplete was: " + handlerWhenComplete; + if (CompletionStages.isCompletedSuccessfully(stage)) { + E result = CompletionStages.join(stage); + try { + if (handlerWhenComplete != null) { + return CompletableFuture.completedFuture(handlerWhenComplete.apply(result)); + } + biConsumer.accept(result, null); + } catch (Throwable t) { + return CompletableFutures.completedExceptionFuture(t); + } + return myStage; + } + if (biConsumer != null) { + // Note that this method is only ever invoked in the event loop, so this whenCompleteAsync can never complete + // until this request completes, meaning the thenApply will always be invoked in the event loop as well + return CompletionStages.handleAndComposeAsync(stage, (e, t) -> { + try { + biConsumer.accept(e, t); + return myStage; + } catch (Throwable innerT) { + if (t != null) { + innerT.addSuppressed(t); + } + return CompletableFutures.completedExceptionFuture(innerT); + } + }, ctx.channel().eventLoop()); + } + return stage.handle((value, t) -> { + if (t != null) { + // If exception, never use handler + return this; + } + return handlerWhenComplete.apply(value); + }); } static ByteBuf stringToByteBufWithExtra(CharSequence string, ByteBufAllocator allocator, int extraBytes) { diff --git a/server/resp/src/main/java/org/infinispan/server/resp/RespServer.java b/server/resp/src/main/java/org/infinispan/server/resp/RespServer.java index 68faea935f74..eeae8801c873 100644 --- a/server/resp/src/main/java/org/infinispan/server/resp/RespServer.java +++ b/server/resp/src/main/java/org/infinispan/server/resp/RespServer.java @@ -49,7 +49,7 @@ protected void startInternal() { if (cacheManager.getCacheManagerConfiguration().isClustered()) { // We are running in clustered mode builder.clustering().cacheMode(CacheMode.REPL_SYNC); } - builder.encoding().key().mediaType(MediaType.TEXT_PLAIN_TYPE); + builder.encoding().key().mediaType(MediaType.APPLICATION_OCTET_STREAM_TYPE); builder.encoding().value().mediaType(MediaType.APPLICATION_OCTET_STREAM_TYPE); } cacheManager.defineConfiguration(configuration.defaultCacheName(), builder.build()); diff --git a/server/resp/src/main/java/org/infinispan/server/resp/SubscriberHandler.java b/server/resp/src/main/java/org/infinispan/server/resp/SubscriberHandler.java index 327505020f7d..0546b29dbcd6 100644 --- a/server/resp/src/main/java/org/infinispan/server/resp/SubscriberHandler.java +++ b/server/resp/src/main/java/org/infinispan/server/resp/SubscriberHandler.java @@ -2,7 +2,9 @@ import java.lang.invoke.MethodHandles; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -11,19 +13,21 @@ import org.infinispan.commons.logging.LogFactory; import org.infinispan.commons.marshall.WrappedByteArray; +import org.infinispan.commons.util.concurrent.CompletableFutures; import org.infinispan.notifications.Listener; import org.infinispan.notifications.cachelistener.annotation.CacheEntryCreated; import org.infinispan.notifications.cachelistener.annotation.CacheEntryModified; import org.infinispan.notifications.cachelistener.event.CacheEntryEvent; import org.infinispan.server.resp.logging.Log; -import org.infinispan.commons.util.concurrent.CompletableFutures; +import org.infinispan.util.concurrent.AggregateCompletionStage; +import org.infinispan.util.concurrent.CompletionStages; import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.util.CharsetUtil; -public class SubscriberHandler implements RespRequestHandler { +public class SubscriberHandler extends RespRequestHandler { private static final Log log = LogFactory.getLog(MethodHandles.lookup().lookupClass(), Log.class); // Random bytes to keep listener keys separate from others public static final byte[] PREFIX_CHANNEL_BYTES = new byte[]{-114, 16, 78, -3, 127}; @@ -78,11 +82,17 @@ public CompletionStage onEvent(CacheEntryEvent entryEvent) Map specificChannelSubscribers = new HashMap<>(); @Override - public RespRequestHandler handleRequest(ChannelHandlerContext ctx, String type, - List arguments) { + public void handleChannelDisconnect(ChannelHandlerContext ctx) { + removeAllListeners(); + } + + @Override + public CompletionStage handleRequest(ChannelHandlerContext ctx, String type, + List arguments) { switch (type) { case "SUBSCRIBE": + AggregateCompletionStage aggregateCompletionStage = CompletionStages.aggregateCompletionStage(); for (byte[] keyChannel : arguments) { if (log.isTraceEnabled()) { log.tracef("Subscriber for channel: " + CharsetUtil.UTF_8.decode(ByteBuffer.wrap(keyChannel))); @@ -92,83 +102,101 @@ public RespRequestHandler handleRequest(ChannelHandlerContext ctx, String type, PubSubListener pubSubListener = new PubSubListener(ctx.channel()); specificChannelSubscribers.put(wrappedByteArray, pubSubListener); byte[] channel = keyToChannel(keyChannel); - respServer.getCache().addListenerAsync(pubSubListener, (key, prevValue, prevMetadata, value, metadata, eventType) -> - Arrays.equals(key, channel), null) - .whenComplete((ignore, t) -> { - if (t != null) { - log.exceptionWhileRegisteringListener(t, CharsetUtil.UTF_8.decode(ByteBuffer.wrap(channel))); - ctx.writeAndFlush("-ERR Failure adding client listener"); - } else { - ByteBuf subscribeBuffer = ctx.alloc().buffer(20 + (int) Math.log10(keyChannel.length) + 1 + keyChannel.length + 2); - subscribeBuffer.writeCharSequence("*2\r\n$9\r\nsubscribe\r\n$" + keyChannel.length + "\r\n", CharsetUtil.UTF_8); - subscribeBuffer.writeBytes(keyChannel); - subscribeBuffer.writeCharSequence("\r\n", CharsetUtil.UTF_8); - ctx.writeAndFlush(subscribeBuffer); - } - }); + CompletionStage stage = respServer.getCache().addListenerAsync(pubSubListener, + (key, prevValue, prevMetadata, value, metadata, eventType) -> Arrays.equals(key, channel), null); + aggregateCompletionStage.dependsOn(handleStageListenerError(stage, keyChannel, true)); } } - break; + return sendSubscriptions(ctx, aggregateCompletionStage.freeze(), arguments, true); case "UNSUBSCRIBE": + aggregateCompletionStage = CompletionStages.aggregateCompletionStage(); if (arguments.size() == 0) { - for (Iterator> iterator = specificChannelSubscribers.entrySet().iterator(); iterator.hasNext(); ) { - Map.Entry entry = iterator.next(); - PubSubListener listener = entry.getValue(); - respServer.getCache().removeListenerAsync(listener); - iterator.remove(); - sendUnsubscribe(ctx, entry.getKey().getBytes()); - } + return unsubscribeAll(ctx); } else { for (byte[] keyChannel : arguments) { WrappedByteArray wrappedByteArray = new WrappedByteArray(keyChannel); PubSubListener listener = specificChannelSubscribers.remove(wrappedByteArray); if (listener != null) { - respServer.getCache().removeListenerAsync(listener) - .whenComplete((ignore, t) -> { - if (t != null) { - log.exceptionWhileRemovingListener(t, CharsetUtil.UTF_8.decode(ByteBuffer.wrap(keyChannel))); - ctx.writeAndFlush("-ERR Failure unsubscribing client listener"); - } else { - sendUnsubscribe(ctx, keyChannel); - } - }); - } else { - sendUnsubscribe(ctx, keyChannel); + aggregateCompletionStage.dependsOn(handleStageListenerError(respServer.getCache().removeListenerAsync(listener), keyChannel, false)); } } } - break; + return sendSubscriptions(ctx, aggregateCompletionStage.freeze(), arguments, false); case "PING": - // Note we don't return the handler and just use it to handle the ping + // Note we don't return the handler and just use it to handle the ping - we assume stage is always complete handler.handleRequest(ctx, type, arguments); break; case "RESET": - for (Iterator> iterator = specificChannelSubscribers.entrySet().iterator(); iterator.hasNext(); ) { - Map.Entry entry = iterator.next(); - PubSubListener listener = entry.getValue(); - respServer.getCache().removeListenerAsync(listener); - iterator.remove(); - sendUnsubscribe(ctx, entry.getKey().getBytes()); - } - return handler.handleRequest(ctx, type, arguments); case "QUIT": - ctx.close(); - break; + removeAllListeners(); + return handler.handleRequest(ctx, type, arguments); case "PSUBSCRIBE": case "PUNSUBSCRIBE": ctx.writeAndFlush(RespRequestHandler.stringToByteBuf("-ERR not implemented yet\r\n", ctx.alloc())); break; default: - return RespRequestHandler.super.handleRequest(ctx, type, arguments); + return super.handleRequest(ctx, type, arguments); + } + return myStage; + } + + private CompletionStage handleStageListenerError(CompletionStage stage, byte[] keyChannel, boolean subscribeOrUnsubscribe) { + return stage.whenComplete((__, t) -> { + if (t != null) { + if (subscribeOrUnsubscribe) { + log.exceptionWhileRegisteringListener(t, CharsetUtil.UTF_8.decode(ByteBuffer.wrap(keyChannel))); + } else { + log.exceptionWhileRemovingListener(t, CharsetUtil.UTF_8.decode(ByteBuffer.wrap(keyChannel))); + } + } + }); + } + + private void removeAllListeners() { + for (Iterator> iterator = specificChannelSubscribers.entrySet().iterator(); iterator.hasNext(); ) { + Map.Entry entry = iterator.next(); + PubSubListener listener = entry.getValue(); + respServer.getCache().removeListenerAsync(listener); + iterator.remove(); + } + } + + private CompletionStage unsubscribeAll(ChannelHandlerContext ctx) { + var aggregateCompletionStage = CompletionStages.aggregateCompletionStage(); + List channels = new ArrayList<>(specificChannelSubscribers.size()); + for (Iterator> iterator = specificChannelSubscribers.entrySet().iterator(); iterator.hasNext(); ) { + Map.Entry entry = iterator.next(); + PubSubListener listener = entry.getValue(); + CompletionStage stage = respServer.getCache().removeListenerAsync(listener); + byte[] keyChannel = entry.getKey().getBytes(); + channels.add(keyChannel); + aggregateCompletionStage.dependsOn(handleStageListenerError(stage, keyChannel, false)); + iterator.remove(); } - return this; + return sendSubscriptions(ctx, aggregateCompletionStage.freeze(), channels, false); } - private void sendUnsubscribe(ChannelHandlerContext ctx, byte[] keyChannel) { - ByteBuf subscribeBuffer = ctx.alloc().buffer(22 + (int) Math.log10(keyChannel.length) + 1 + keyChannel.length + 2); - subscribeBuffer.writeCharSequence("*2\r\n$11\r\nunsubscribe\r\n$" + keyChannel.length + "\r\n", CharsetUtil.UTF_8); - subscribeBuffer.writeBytes(keyChannel); - subscribeBuffer.writeCharSequence("\r\n", CharsetUtil.UTF_8); - ctx.writeAndFlush(subscribeBuffer); + private CompletionStage sendSubscriptions(ChannelHandlerContext ctx, CompletionStage stageToWaitFor, + Collection keyChannels, boolean subscribeOrUnsubscribe) { + return stageToReturn(stageToWaitFor, ctx, (__, t) -> { + if (t != null) { + if (subscribeOrUnsubscribe) { + ctx.writeAndFlush("-ERR Failure adding client listener"); + } else { + ctx.writeAndFlush("-ERR Failure unsubscribing client listener"); + } + return; + } + for (byte[] keyChannel : keyChannels) { + int bufferCap = subscribeOrUnsubscribe ? 20 : 22; + String initialCharSeq = subscribeOrUnsubscribe ? "*2\r\n$9\r\nsubscribe\r\n$" : "*2\r\n$11\r\nunsubscribe\r\n$"; + + ByteBuf subscribeBuffer = ctx.alloc().buffer(bufferCap + (int) Math.log10(keyChannel.length) + 1 + keyChannel.length + 2); + subscribeBuffer.writeCharSequence(initialCharSeq + keyChannel.length + "\r\n", CharsetUtil.UTF_8); + subscribeBuffer.writeBytes(keyChannel); + subscribeBuffer.writeCharSequence("\r\n", CharsetUtil.UTF_8); + ctx.writeAndFlush(subscribeBuffer); + } + }); } } diff --git a/server/resp/src/test/java/org/infinispan/server/resp/RespSingleNodeTest.java b/server/resp/src/test/java/org/infinispan/server/resp/RespSingleNodeTest.java index b7b370acdca1..d32960b6415a 100644 --- a/server/resp/src/test/java/org/infinispan/server/resp/RespSingleNodeTest.java +++ b/server/resp/src/test/java/org/infinispan/server/resp/RespSingleNodeTest.java @@ -7,6 +7,7 @@ import static org.testng.AssertJUnit.assertEquals; import static org.testng.AssertJUnit.assertNotNull; import static org.testng.AssertJUnit.assertNull; +import static org.testng.AssertJUnit.assertTrue; import static org.testng.AssertJUnit.fail; import java.util.ArrayList; @@ -144,16 +145,19 @@ private BlockingQueue addPubSubListener(RedisPubSubCommands() { @Override public void message(String channel, String message) { + log.tracef("Received message on channel %s of %s", channel, message); handOffQueue.add("message-" + channel + "-" + message); } @Override public void subscribed(String channel, long count) { + log.tracef("Subscribed to %s with %s", channel, count); handOffQueue.add("subscribed-" + channel + "-" + count); } @Override public void unsubscribed(String channel, long count) { + log.tracef("Unsubscribed to %s with %s", channel, count); handOffQueue.add("unsubscribed-" + channel + "-" + count); } }); @@ -164,11 +168,13 @@ public void unsubscribed(String channel, long count) { @DataProvider(name = "booleans") Object[][] booleans() { // Reset disabled for now as the client isn't sending a reset command to the server - return new Object[][]{/*{true},*/ {false}}; + return new Object[][]{{true}, {false}}; } @Test(dataProvider = "booleans") - public void testPubSubUnsubscribe(boolean reset) throws InterruptedException { + public void testPubSubUnsubscribe(boolean quit) throws InterruptedException { + int listenersBefore = cache.getAdvancedCache().getListeners().size(); + RedisPubSubCommands connection = createPubSubConnection(); BlockingQueue handOffQueue = addPubSubListener(connection); @@ -179,20 +185,32 @@ public void testPubSubUnsubscribe(boolean reset) throws InterruptedException { value = handOffQueue.poll(10, TimeUnit.SECONDS); assertEquals("subscribed-test-0", value); + + // 2 listeners, one for each sub above + assertEquals(listenersBefore + 2, cache.getAdvancedCache().getListeners().size()); // Unsubscribe to all channels - if (reset) { - connection.reset(); + if (quit) { + // Originally wanted to use reset or quit, but they don't do what we expect from lettuce + connection.getStatefulConnection().close(); + + // Have to use eventually as they are removed asynchronously + eventuallyEquals(listenersBefore, () -> cache.getAdvancedCache().getListeners().size()); + + assertTrue(handOffQueue.isEmpty()); } else { connection.unsubscribe(); - } - // Unsubscribed channels can be in different orders - for (int i = 0; i < 2; ++i) { - value = handOffQueue.poll(10, TimeUnit.SECONDS); - assertNotNull("Didn't receive any notifications", value); - if (!value.equals("unsubscribed-channel2-0") && !value.equals("unsubscribed-test-0")) { - fail("Notification doesn't match expected, was: " + value); + // Unsubscribed channels can be in different orders + for (int i = 0; i < 2; ++i) { + value = handOffQueue.poll(10, TimeUnit.SECONDS); + assertNotNull("Didn't receive any notifications", value); + if (!value.equals("unsubscribed-channel2-0") && !value.equals("unsubscribed-test-0")) { + fail("Notification doesn't match expected, was: " + value); + } } + + assertEquals(listenersBefore, cache.getAdvancedCache().getListeners().size()); + assertEquals("PONG", connection.ping()); } } diff --git a/server/resp/src/test/java/org/infinispan/server/resp/RespTwoNodeTest.java b/server/resp/src/test/java/org/infinispan/server/resp/RespTwoNodeTest.java new file mode 100644 index 000000000000..e940df1ed826 --- /dev/null +++ b/server/resp/src/test/java/org/infinispan/server/resp/RespTwoNodeTest.java @@ -0,0 +1,106 @@ +package org.infinispan.server.resp; + +import static org.infinispan.server.resp.test.RespTestingUtil.createClient; +import static org.infinispan.server.resp.test.RespTestingUtil.killClient; +import static org.infinispan.server.resp.test.RespTestingUtil.killServer; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.testng.AssertJUnit.assertEquals; +import static org.testng.AssertJUnit.assertTrue; + +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.infinispan.Cache; +import org.infinispan.commons.test.TestResourceTracker; +import org.infinispan.configuration.cache.CacheMode; +import org.infinispan.configuration.cache.ConfigurationBuilder; +import org.infinispan.distribution.DistributionTestHelper; +import org.infinispan.interceptors.locking.ClusteringDependentLogic; +import org.infinispan.server.resp.configuration.RespServerConfigurationBuilder; +import org.infinispan.server.resp.test.RespTestingUtil; +import org.infinispan.test.Mocks; +import org.infinispan.test.MultipleCacheManagersTest; +import org.infinispan.test.TestingUtil; +import org.infinispan.test.fwk.CheckPoint; +import org.testng.annotations.AfterClass; +import org.testng.annotations.Test; + +import io.lettuce.core.RedisClient; +import io.lettuce.core.RedisFuture; +import io.lettuce.core.api.StatefulRedisConnection; +import io.lettuce.core.api.async.RedisAsyncCommands; + +@Test(groups = "functional", testName = "server.resp.RespTwoNodeTest") +public class RespTwoNodeTest extends MultipleCacheManagersTest { + protected RedisClient client1; + protected RespServer server1; + protected RespServer server2; + protected StatefulRedisConnection redisConnection1; + protected static final int timeout = 60; + + @Override + protected void createCacheManagers() { + ConfigurationBuilder cacheBuilder = getDefaultClusteredCacheConfig(CacheMode.DIST_SYNC, false); + createCluster(cacheBuilder, 2); + waitForClusterToForm(); + + server1 = RespTestingUtil.startServer(cacheManagers.get(0), serverConfiguration(0).build()); + server2 = RespTestingUtil.startServer(cacheManagers.get(1), serverConfiguration(1).build()); + client1 = createClient(30000, server1.getPort()); + redisConnection1 = client1.connect(); + } + + @AfterClass(alwaysRun = true) + @Override + protected void destroy() { + log.debug("Test finished, close resp server"); + killClient(client1); + killServer(server1); + killServer(server2); + super.destroy(); + } + + protected RespServerConfigurationBuilder serverConfiguration(int offset) { + String serverName = TestResourceTracker.getCurrentTestShortName(); + return new RespServerConfigurationBuilder().name(serverName) + .host(RespTestingUtil.HOST) + .port(RespTestingUtil.port() + offset); + } + + public void testConcurrentOperations() throws ExecutionException, InterruptedException, TimeoutException { + CheckPoint checkPoint = new CheckPoint(); + checkPoint.triggerForever(Mocks.AFTER_RELEASE); + + String blockedKey = "foo"; + // We block the non owner, so we know the primary owner of the data's netty thread isn't blocked on accident + Cache nonOwner = DistributionTestHelper.getFirstBackupOwner(blockedKey, caches(server1.getConfiguration().defaultCacheName())); + + var original = Mocks.blockingMock(checkPoint, ClusteringDependentLogic.class, nonOwner, (stubber, clusteringDependentLogic) -> + stubber.when(clusteringDependentLogic).commitEntry(any(), any(), any(), any(), anyBoolean())); + RedisAsyncCommands redis = redisConnection1.async(); + try { + RedisFuture futureSet = redis.set(blockedKey, "bar"); + + checkPoint.awaitStrict(Mocks.BEFORE_INVOCATION, 10, TimeUnit.SECONDS); + + RedisFuture futurePing = redis.ping(); + RedisFuture> futureCommand = redis.command(); + + checkPoint.triggerForever(Mocks.BEFORE_RELEASE); + + String getResponse = futurePing.get(10, TimeUnit.SECONDS); + assertEquals("OK", futureSet.get(10, TimeUnit.SECONDS)); + assertEquals("PONG", getResponse); + List results = futureCommand.get(10, TimeUnit.SECONDS); + assertTrue("Results were: " + results, results.size() > 10); + } finally { + TestingUtil.replaceComponent(nonOwner, ClusteringDependentLogic.class, original, true); + } + + RedisFuture getFuture = redis.get(blockedKey); + assertEquals("bar", getFuture.get(10, TimeUnit.SECONDS)); + } +}