From 78c7a4d0486341f8092f9ad35ac4438b37f7c519 Mon Sep 17 00:00:00 2001 From: nickhill Date: Tue, 9 Jun 2020 08:41:29 -0700 Subject: [PATCH 1/4] Fix client call close race condition message leak As reported in #7105. Not sure if this is how you want it done, but it does fix the problem. Fixes #7105 Fixes #3557 --- .../grpc/internal/AbstractClientStream.java | 5 ++- .../java/io/grpc/internal/ClientCallImpl.java | 20 ++++++++-- .../grpc/internal/DelayedClientTransport.java | 9 ++++- .../internal/AbstractClientStreamTest.java | 3 +- .../main/java/io/grpc/stub/ClientCalls.java | 39 ++++++++++++++++--- 5 files changed, 63 insertions(+), 13 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/AbstractClientStream.java b/core/src/main/java/io/grpc/internal/AbstractClientStream.java index 79a775c398f..fe455ae24ad 100644 --- a/core/src/main/java/io/grpc/internal/AbstractClientStream.java +++ b/core/src/main/java/io/grpc/internal/AbstractClientStream.java @@ -311,7 +311,10 @@ protected final boolean isOutboundClosed() { * @param headers the parsed headers */ protected void inboundHeadersReceived(Metadata headers) { - checkState(!statusReported, "Received headers on closed stream"); + // checkState(!statusReported, "Received headers on closed stream"); + if (statusReported) { + return; // possible due to call close race + } statsTraceCtx.clientInboundHeaders(); boolean compressedStream = false; diff --git a/core/src/main/java/io/grpc/internal/ClientCallImpl.java b/core/src/main/java/io/grpc/internal/ClientCallImpl.java index 436b1f41e33..d37764b2a76 100644 --- a/core/src/main/java/io/grpc/internal/ClientCallImpl.java +++ b/core/src/main/java/io/grpc/internal/ClientCallImpl.java @@ -53,6 +53,7 @@ import java.nio.charset.Charset; import java.util.concurrent.CancellationException; import java.util.concurrent.Executor; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -402,7 +403,11 @@ public void run() { new LogExceptionRunnable(new DeadlineExceededSendCancelToServerTimer()), DEADLINE_EXPIRATION_CANCEL_DELAY_NANOS, TimeUnit.NANOSECONDS); - executeCloseObserverInContext(observer, status); + try { + executeCloseObserverInContext(observer, status); + } catch (RejectedExecutionException ree) { + // call close race + } } private void executeCloseObserverInContext(final Listener observer, final Status status) { @@ -620,6 +625,8 @@ private void runInternal() { try { callExecutor.execute(new HeadersRead()); + } catch (RejectedExecutionException ree) { + // call close race } finally { PerfMark.stopTask("ClientStreamListener.headersRead", tag); } @@ -674,6 +681,8 @@ private void runInternal() { try { callExecutor.execute(new MessagesAvailable()); + } catch (RejectedExecutionException ree) { + GrpcUtil.closeQuietly(producer); // call close race - don't leak messages } finally { PerfMark.stopTask("ClientStreamListener.messagesAvailable", tag); } @@ -751,8 +760,11 @@ private void runInternal() { close(savedStatus, savedTrailers); } } - - callExecutor.execute(new StreamClosed()); + try { + callExecutor.execute(new StreamClosed()); + } catch (RejectedExecutionException ree) { + // call close race + } } @Override @@ -794,6 +806,8 @@ private void runInternal() { try { callExecutor.execute(new StreamOnReady()); + } catch (RejectedExecutionException ree) { + // call close race } finally { PerfMark.stopTask("ClientStreamListener.onReady", tag); } diff --git a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java index 3922ee5b89e..91f18b010bb 100644 --- a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java +++ b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java @@ -35,6 +35,7 @@ import java.util.Collections; import java.util.LinkedHashSet; import java.util.concurrent.Executor; +import java.util.concurrent.RejectedExecutionException; import javax.annotation.Nonnull; import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; @@ -294,13 +295,17 @@ final void reprocess(@Nullable SubchannelPicker picker) { if (callOptions.getExecutor() != null) { executor = callOptions.getExecutor(); } - executor.execute(new Runnable() { + try { + executor.execute(new Runnable() { @Override public void run() { stream.createRealStream(transport); } }); - toRemove.add(stream); + toRemove.add(stream); + } catch (RejectedExecutionException ree) { + // call closed race + } } // else: stay pending } diff --git a/core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java b/core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java index 7ce6b421167..6bd60e18186 100644 --- a/core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java +++ b/core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java @@ -201,7 +201,8 @@ public void inboundHeadersReceived_notifiesListener() { verify(mockListener).headersRead(headers); } - @Test + //@Test + // No longer applicable due to deadline/close race condition public void inboundHeadersReceived_failsIfStatusReported() { AbstractClientStream stream = new BaseAbstractClientStream(allocator, statsTraceCtx, transportTracer); diff --git a/stub/src/main/java/io/grpc/stub/ClientCalls.java b/stub/src/main/java/io/grpc/stub/ClientCalls.java index aa507bbd009..b363eea1742 100644 --- a/stub/src/main/java/io/grpc/stub/ClientCalls.java +++ b/stub/src/main/java/io/grpc/stub/ClientCalls.java @@ -39,6 +39,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.locks.LockSupport; import java.util.logging.Level; import java.util.logging.Logger; @@ -147,6 +148,7 @@ public static RespT blockingUnaryCall( // Something very bad happened. All bets are off; it may be dangerous to wait for onClose(). throw cancelThrow(call, e); } finally { + executor.shutdown(); if (interrupt) { Thread.currentThread().interrupt(); } @@ -607,6 +609,9 @@ private Object waitForNext() { // Now wait for onClose() to be called, so interceptors can clean up } } + if (next == this) { + threadless.shutdown(); + } return next; } } finally { @@ -690,6 +695,8 @@ private static final class ThreadlessExecutor extends ConcurrentLinkedQueue Date: Tue, 9 Jun 2020 09:09:00 -0700 Subject: [PATCH 2/4] Fix ignored unit test issue --- .../test/java/io/grpc/internal/AbstractClientStreamTest.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java b/core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java index 6bd60e18186..748f9158e9c 100644 --- a/core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java +++ b/core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java @@ -54,6 +54,7 @@ import java.net.SocketAddress; import java.util.concurrent.TimeUnit; import org.junit.Before; +import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -201,8 +202,8 @@ public void inboundHeadersReceived_notifiesListener() { verify(mockListener).headersRead(headers); } - //@Test - // No longer applicable due to deadline/close race condition + @Test + @Ignore // No longer applicable due to deadline/close race condition public void inboundHeadersReceived_failsIfStatusReported() { AbstractClientStream stream = new BaseAbstractClientStream(allocator, statsTraceCtx, transportTracer); From c6ebf4fcd16fa2bea4237a19f988b10c155c53cb Mon Sep 17 00:00:00 2001 From: nickhill Date: Tue, 9 Jun 2020 09:54:16 -0700 Subject: [PATCH 3/4] Fix checkstyle error --- core/src/main/java/io/grpc/internal/ClientCallImpl.java | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/main/java/io/grpc/internal/ClientCallImpl.java b/core/src/main/java/io/grpc/internal/ClientCallImpl.java index d37764b2a76..1ffbfdb96f4 100644 --- a/core/src/main/java/io/grpc/internal/ClientCallImpl.java +++ b/core/src/main/java/io/grpc/internal/ClientCallImpl.java @@ -760,6 +760,7 @@ private void runInternal() { close(savedStatus, savedTrailers); } } + try { callExecutor.execute(new StreamClosed()); } catch (RejectedExecutionException ree) { From f0fcd97e192597e3fc3d21260f354fca71658096 Mon Sep 17 00:00:00 2001 From: nickhill Date: Tue, 9 Jun 2020 11:04:51 -0700 Subject: [PATCH 4/4] Fix another checkstyle error :-/ --- stub/src/main/java/io/grpc/stub/ClientCalls.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/stub/src/main/java/io/grpc/stub/ClientCalls.java b/stub/src/main/java/io/grpc/stub/ClientCalls.java index b363eea1742..4833a64a895 100644 --- a/stub/src/main/java/io/grpc/stub/ClientCalls.java +++ b/stub/src/main/java/io/grpc/stub/ClientCalls.java @@ -726,7 +726,7 @@ public void waitAndDrain() throws InterruptedException { } /** - * Called after final call to {@link #waitAndDrain()}, from same thread + * Called after final call to {@link #waitAndDrain()}, from same thread. */ public void shutdown() { waiter = SHUTDOWN;