Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix client call close race condition message leak #7106

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -311,7 +311,10 @@ protected final boolean isOutboundClosed() {
* @param headers the parsed headers
*/
protected void inboundHeadersReceived(Metadata headers) {
checkState(!statusReported, "Received headers on closed stream");
// checkState(!statusReported, "Received headers on closed stream");
if (statusReported) {
return; // possible due to call close race
}
statsTraceCtx.clientInboundHeaders();

boolean compressedStream = false;
Expand Down
19 changes: 17 additions & 2 deletions core/src/main/java/io/grpc/internal/ClientCallImpl.java
Expand Up @@ -53,6 +53,7 @@
import java.nio.charset.Charset;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -402,7 +403,11 @@ public void run() {
new LogExceptionRunnable(new DeadlineExceededSendCancelToServerTimer()),
DEADLINE_EXPIRATION_CANCEL_DELAY_NANOS,
TimeUnit.NANOSECONDS);
executeCloseObserverInContext(observer, status);
try {
executeCloseObserverInContext(observer, status);
} catch (RejectedExecutionException ree) {
// call close race
}
}

private void executeCloseObserverInContext(final Listener<RespT> observer, final Status status) {
Expand Down Expand Up @@ -620,6 +625,8 @@ private void runInternal() {

try {
callExecutor.execute(new HeadersRead());
} catch (RejectedExecutionException ree) {
// call close race
} finally {
PerfMark.stopTask("ClientStreamListener.headersRead", tag);
}
Expand Down Expand Up @@ -674,6 +681,8 @@ private void runInternal() {

try {
callExecutor.execute(new MessagesAvailable());
} catch (RejectedExecutionException ree) {
GrpcUtil.closeQuietly(producer); // call close race - don't leak messages
} finally {
PerfMark.stopTask("ClientStreamListener.messagesAvailable", tag);
}
Expand Down Expand Up @@ -752,7 +761,11 @@ private void runInternal() {
}
}

callExecutor.execute(new StreamClosed());
try {
callExecutor.execute(new StreamClosed());
} catch (RejectedExecutionException ree) {
// call close race
}
}

@Override
Expand Down Expand Up @@ -794,6 +807,8 @@ private void runInternal() {

try {
callExecutor.execute(new StreamOnReady());
} catch (RejectedExecutionException ree) {
// call close race
} finally {
PerfMark.stopTask("ClientStreamListener.onReady", tag);
}
Expand Down
Expand Up @@ -35,6 +35,7 @@
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
Expand Down Expand Up @@ -294,13 +295,17 @@ final void reprocess(@Nullable SubchannelPicker picker) {
if (callOptions.getExecutor() != null) {
executor = callOptions.getExecutor();
}
executor.execute(new Runnable() {
try {
executor.execute(new Runnable() {
@Override
public void run() {
stream.createRealStream(transport);
}
});
toRemove.add(stream);
toRemove.add(stream);
} catch (RejectedExecutionException ree) {
// call closed race
}
} // else: stay pending
}

Expand Down
Expand Up @@ -54,6 +54,7 @@
import java.net.SocketAddress;
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
Expand Down Expand Up @@ -202,6 +203,7 @@ public void inboundHeadersReceived_notifiesListener() {
}

@Test
@Ignore // No longer applicable due to deadline/close race condition
public void inboundHeadersReceived_failsIfStatusReported() {
AbstractClientStream stream =
new BaseAbstractClientStream(allocator, statsTraceCtx, transportTracer);
Expand Down
39 changes: 33 additions & 6 deletions stub/src/main/java/io/grpc/stub/ClientCalls.java
Expand Up @@ -39,6 +39,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.locks.LockSupport;
import java.util.logging.Level;
import java.util.logging.Logger;
Expand Down Expand Up @@ -147,6 +148,7 @@ public static <ReqT, RespT> RespT blockingUnaryCall(
// Something very bad happened. All bets are off; it may be dangerous to wait for onClose().
throw cancelThrow(call, e);
} finally {
executor.shutdown();
if (interrupt) {
Thread.currentThread().interrupt();
}
Expand Down Expand Up @@ -607,6 +609,9 @@ private Object waitForNext() {
// Now wait for onClose() to be called, so interceptors can clean up
}
}
if (next == this) {
threadless.shutdown();
}
return next;
}
} finally {
Expand Down Expand Up @@ -690,6 +695,8 @@ private static final class ThreadlessExecutor extends ConcurrentLinkedQueue<Runn
implements Executor {
private static final Logger log = Logger.getLogger(ThreadlessExecutor.class.getName());

private static final Thread SHUTDOWN = new Thread(); // sentinel

private volatile Thread waiter;

// Non private to avoid synthetic class
Expand All @@ -714,14 +721,29 @@ public void waitAndDrain() throws InterruptedException {
}
}
do {
try {
runnable.run();
} catch (Throwable t) {
log.log(Level.WARNING, "Runnable threw exception", t);
}
runQuietly(runnable);
} while ((runnable = poll()) != null);
}

/**
* Called after final call to {@link #waitAndDrain()}, from same thread.
*/
public void shutdown() {
waiter = SHUTDOWN;
Runnable runnable;
while ((runnable = poll()) != null) {
runQuietly(runnable);
}
}

private static void runQuietly(Runnable runnable) {
try {
runnable.run();
} catch (Throwable t) {
log.log(Level.WARNING, "Runnable threw exception", t);
}
}

private static void throwIfInterrupted() throws InterruptedException {
if (Thread.interrupted()) {
throw new InterruptedException();
Expand All @@ -731,7 +753,12 @@ private static void throwIfInterrupted() throws InterruptedException {
@Override
public void execute(Runnable runnable) {
add(runnable);
LockSupport.unpark(waiter); // no-op if null
Thread waiter = this.waiter;
if (waiter != SHUTDOWN) {
LockSupport.unpark(waiter); // no-op if null
} else if (remove(runnable)) {
throw new RejectedExecutionException();
}
}
}

Expand Down