Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[14.0.x] ISPN-14242 RESP server can send results in incorrect order #10409

Merged
merged 3 commits into from Oct 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -6,6 +6,7 @@
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
Expand Down Expand Up @@ -145,8 +146,13 @@ public static <T, U> CompletionStage<U> handleAndCompose(CompletionStage<T> stag
return stage.handle(handleFunction).thenCompose(Function.identity());
}

public static <T, U> CompletionStage<U> handleAndComposeAsync(CompletionStage<T> stage,
BiFunction<T, Throwable, CompletionStage<U>> handleFunction, Executor executor) {
return stage.handleAsync(handleFunction, executor).thenCompose(Function.identity());
}

public static CompletionStage<Void> schedule(Runnable command, ScheduledExecutorService executor,
long delay, TimeUnit timeUnit) {
long delay, TimeUnit timeUnit) {
CompletableFuture<Void> future = new CompletableFuture<>();
executor.schedule(() -> {
try {
Expand Down
Expand Up @@ -107,6 +107,15 @@ public static <K, V> Cache<K, V> getFirstOwner(Object key, List<Cache<K, V>> cac
return getOwners(key, caches).iterator().next();
}

public static <K, V> Cache<K, V> getFirstBackupOwner(Object key, List<Cache<K, V>> caches) {
for (Cache<K, V> c : caches) {
if (isOwner(c, key) && !isFirstOwner(c, key)) {
return c;
}
}
return null;
}

public static <K, V> Collection<Cache<K, V>> getNonOwners(Object key, List<Cache<K, V>> caches) {
List<Cache<K, V>> nonOwners = new ArrayList<>();
for (Cache<K, V> c : caches)
Expand Down
5 changes: 5 additions & 0 deletions server/resp/pom.xml
Expand Up @@ -41,6 +41,11 @@
<artifactId>testng</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
@@ -1,22 +1,21 @@
package org.infinispan.server.resp;

import static org.infinispan.server.resp.Resp3Handler.handleThrowable;
import static org.infinispan.server.resp.Resp3Handler.statusOK;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;

import javax.security.auth.Subject;

import io.netty.channel.ChannelHandlerContext;
import io.netty.util.CharsetUtil;
import org.infinispan.AdvancedCache;
import org.infinispan.commons.util.Version;
import org.infinispan.commons.util.concurrent.CompletableFutures;

public class Resp3AuthHandler implements RespRequestHandler {
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.CharsetUtil;

public class Resp3AuthHandler extends RespRequestHandler {

protected final RespServer respServer;
protected AdvancedCache<byte[], byte[]> cache;
Expand All @@ -27,8 +26,8 @@ public Resp3AuthHandler(RespServer server) {
}

@Override
public RespRequestHandler handleRequest(ChannelHandlerContext ctx, String type, List<byte[]> arguments) {
boolean success = false;
public CompletionStage<RespRequestHandler> handleRequest(ChannelHandlerContext ctx, String type, List<byte[]> arguments) {
CompletionStage<Boolean> successStage = null;
switch (type) {
case "HELLO":
byte[] respProtocolBytes = arguments.get(0);
Expand All @@ -39,53 +38,52 @@ public RespRequestHandler handleRequest(ChannelHandlerContext ctx, String type,
}

if (arguments.size() == 4) {
success = performAuth(ctx, arguments.get(2), arguments.get(3));
successStage = performAuth(ctx, arguments.get(2), arguments.get(3));
} else {
helloResponse(ctx);
}
break;
case "AUTH":
success = performAuth(ctx, arguments.get(0), arguments.get(1));
successStage = performAuth(ctx, arguments.get(0), arguments.get(1));
break;
case "QUIT":
// TODO: need to close connection
ctx.flush();
ctx.close();
break;
default:
if (isAuthorized()) RespRequestHandler.super.handleRequest(ctx, type, arguments);
if (isAuthorized()) super.handleRequest(ctx, type, arguments);
else handleUnauthorized(ctx);
}

return success ? respServer.newHandler() : this;
if (successStage != null) {
return stageToReturn(successStage, ctx,
auth -> auth ? respServer.newHandler() : this);
}

return myStage;
}

private boolean performAuth(ChannelHandlerContext ctx, byte[] username, byte[] password) {
private CompletionStage<Boolean> performAuth(ChannelHandlerContext ctx, byte[] username, byte[] password) {
return performAuth(ctx, new String(username, StandardCharsets.UTF_8), new String(password, StandardCharsets.UTF_8));
}

private boolean performAuth(ChannelHandlerContext ctx, String username, String password) {
private CompletionStage<Boolean> performAuth(ChannelHandlerContext ctx, String username, String password) {
Authenticator authenticator = respServer.getConfiguration().authentication().authenticator();
if (authenticator == null) {
return handleAuthResponse(ctx, null);
return CompletableFutures.booleanStage(handleAuthResponse(ctx, null));
}
CompletionStage<Boolean> cs = authenticator.authenticate(username, password.toCharArray())
.thenApply(r -> handleAuthResponse(ctx, r))
return authenticator.authenticate(username, password.toCharArray())
// Note we have to write to our variables in the event loop (in this case cache)
.thenApplyAsync(r -> handleAuthResponse(ctx, r), ctx.channel().eventLoop())
.exceptionally(t -> {
handleUnauthorized(ctx);
return false;
});
try {
return CompletableFutures.await(cs.toCompletableFuture());
} catch (ExecutionException | InterruptedException e) {
handleThrowable(ctx, e);
}

return false;
}

private boolean handleAuthResponse(ChannelHandlerContext ctx, Subject subject) {
assert ctx.channel().eventLoop().inEventLoop();
if (subject == null) {
ctx.writeAndFlush(ctx.writeAndFlush(RespRequestHandler.stringToByteBuf("-ERR Client sent AUTH, but no password is set\r\n", ctx.alloc())));
ctx.writeAndFlush(RespRequestHandler.stringToByteBuf("-ERR Client sent AUTH, but no password is set\r\n", ctx.alloc()));
return false;
}

Expand All @@ -95,6 +93,7 @@ private boolean handleAuthResponse(ChannelHandlerContext ctx, Subject subject) {
}

private void handleUnauthorized(ChannelHandlerContext ctx) {
assert ctx.channel().eventLoop().inEventLoop();
ctx.writeAndFlush(RespRequestHandler.stringToByteBuf("-WRONGPASS invalid username-password pair or user is disabled.\r\n", ctx.alloc()));
}

Expand Down