Skip to content

Commit

Permalink
Fix client call close race condition message leak
Browse files Browse the repository at this point in the history
As reported in grpc#7105. Not sure if this is how you want it done, but it does fix the problem.

Fixes grpc#7105
Fixes grpc#3557
  • Loading branch information
njhill committed Jun 9, 2020
1 parent c777e08 commit 78c7a4d
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 13 deletions.
Expand Up @@ -311,7 +311,10 @@ protected final boolean isOutboundClosed() {
* @param headers the parsed headers
*/
protected void inboundHeadersReceived(Metadata headers) {
checkState(!statusReported, "Received headers on closed stream");
// checkState(!statusReported, "Received headers on closed stream");
if (statusReported) {
return; // possible due to call close race
}
statsTraceCtx.clientInboundHeaders();

boolean compressedStream = false;
Expand Down
20 changes: 17 additions & 3 deletions core/src/main/java/io/grpc/internal/ClientCallImpl.java
Expand Up @@ -53,6 +53,7 @@
import java.nio.charset.Charset;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -402,7 +403,11 @@ public void run() {
new LogExceptionRunnable(new DeadlineExceededSendCancelToServerTimer()),
DEADLINE_EXPIRATION_CANCEL_DELAY_NANOS,
TimeUnit.NANOSECONDS);
executeCloseObserverInContext(observer, status);
try {
executeCloseObserverInContext(observer, status);
} catch (RejectedExecutionException ree) {
// call close race
}
}

private void executeCloseObserverInContext(final Listener<RespT> observer, final Status status) {
Expand Down Expand Up @@ -620,6 +625,8 @@ private void runInternal() {

try {
callExecutor.execute(new HeadersRead());
} catch (RejectedExecutionException ree) {
// call close race
} finally {
PerfMark.stopTask("ClientStreamListener.headersRead", tag);
}
Expand Down Expand Up @@ -674,6 +681,8 @@ private void runInternal() {

try {
callExecutor.execute(new MessagesAvailable());
} catch (RejectedExecutionException ree) {
GrpcUtil.closeQuietly(producer); // call close race - don't leak messages
} finally {
PerfMark.stopTask("ClientStreamListener.messagesAvailable", tag);
}
Expand Down Expand Up @@ -751,8 +760,11 @@ private void runInternal() {
close(savedStatus, savedTrailers);
}
}

callExecutor.execute(new StreamClosed());
try {
callExecutor.execute(new StreamClosed());
} catch (RejectedExecutionException ree) {
// call close race
}
}

@Override
Expand Down Expand Up @@ -794,6 +806,8 @@ private void runInternal() {

try {
callExecutor.execute(new StreamOnReady());
} catch (RejectedExecutionException ree) {
// call close race
} finally {
PerfMark.stopTask("ClientStreamListener.onReady", tag);
}
Expand Down
Expand Up @@ -35,6 +35,7 @@
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
Expand Down Expand Up @@ -294,13 +295,17 @@ final void reprocess(@Nullable SubchannelPicker picker) {
if (callOptions.getExecutor() != null) {
executor = callOptions.getExecutor();
}
executor.execute(new Runnable() {
try {
executor.execute(new Runnable() {
@Override
public void run() {
stream.createRealStream(transport);
}
});
toRemove.add(stream);
toRemove.add(stream);
} catch (RejectedExecutionException ree) {
// call closed race
}
} // else: stay pending
}

Expand Down
Expand Up @@ -201,7 +201,8 @@ public void inboundHeadersReceived_notifiesListener() {
verify(mockListener).headersRead(headers);
}

@Test
//@Test
// No longer applicable due to deadline/close race condition
public void inboundHeadersReceived_failsIfStatusReported() {
AbstractClientStream stream =
new BaseAbstractClientStream(allocator, statsTraceCtx, transportTracer);
Expand Down
39 changes: 33 additions & 6 deletions stub/src/main/java/io/grpc/stub/ClientCalls.java
Expand Up @@ -39,6 +39,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.locks.LockSupport;
import java.util.logging.Level;
import java.util.logging.Logger;
Expand Down Expand Up @@ -147,6 +148,7 @@ public static <ReqT, RespT> RespT blockingUnaryCall(
// Something very bad happened. All bets are off; it may be dangerous to wait for onClose().
throw cancelThrow(call, e);
} finally {
executor.shutdown();
if (interrupt) {
Thread.currentThread().interrupt();
}
Expand Down Expand Up @@ -607,6 +609,9 @@ private Object waitForNext() {
// Now wait for onClose() to be called, so interceptors can clean up
}
}
if (next == this) {
threadless.shutdown();
}
return next;
}
} finally {
Expand Down Expand Up @@ -690,6 +695,8 @@ private static final class ThreadlessExecutor extends ConcurrentLinkedQueue<Runn
implements Executor {
private static final Logger log = Logger.getLogger(ThreadlessExecutor.class.getName());

private static final Thread SHUTDOWN = new Thread(); // sentinel

private volatile Thread waiter;

// Non private to avoid synthetic class
Expand All @@ -714,14 +721,29 @@ public void waitAndDrain() throws InterruptedException {
}
}
do {
try {
runnable.run();
} catch (Throwable t) {
log.log(Level.WARNING, "Runnable threw exception", t);
}
runQuietly(runnable);
} while ((runnable = poll()) != null);
}

/**
* Called after final call to {@link #waitAndDrain()}, from same thread
*/
public void shutdown() {
waiter = SHUTDOWN;
Runnable runnable;
while ((runnable = poll()) != null) {
runQuietly(runnable);
}
}

private static void runQuietly(Runnable runnable) {
try {
runnable.run();
} catch (Throwable t) {
log.log(Level.WARNING, "Runnable threw exception", t);
}
}

private static void throwIfInterrupted() throws InterruptedException {
if (Thread.interrupted()) {
throw new InterruptedException();
Expand All @@ -731,7 +753,12 @@ private static void throwIfInterrupted() throws InterruptedException {
@Override
public void execute(Runnable runnable) {
add(runnable);
LockSupport.unpark(waiter); // no-op if null
Thread waiter = this.waiter;
if (waiter != SHUTDOWN) {
LockSupport.unpark(waiter); // no-op if null
} else if (remove(runnable)) {
throw new RejectedExecutionException();
}
}
}

Expand Down

0 comments on commit 78c7a4d

Please sign in to comment.