diff --git a/core/src/main/java/io/grpc/internal/ClientCallImpl.java b/core/src/main/java/io/grpc/internal/ClientCallImpl.java index db01fad6b3a..3c3a2227780 100644 --- a/core/src/main/java/io/grpc/internal/ClientCallImpl.java +++ b/core/src/main/java/io/grpc/internal/ClientCallImpl.java @@ -71,13 +71,6 @@ final class ClientCallImpl extends ClientCall { private static final Logger log = Logger.getLogger(ClientCallImpl.class.getName()); private static final byte[] FULL_STREAM_DECOMPRESSION_ENCODINGS = "gzip".getBytes(Charset.forName("US-ASCII")); - // When a deadline is exceeded, there is a race between the server receiving the cancellation from - // the client and the server cancelling the stream itself. If the client's cancellation is - // received first, then the stream's status will be CANCELLED instead of DEADLINE_EXCEEDED. - // This prevents server monitoring from noticing high rate of DEADLINE_EXCEEDED, a common - // monitoring metric (b/118879795). Mitigate this by delayed sending of the client's cancellation. - @VisibleForTesting - static final long DEADLINE_EXPIRATION_CANCEL_DELAY_NANOS = TimeUnit.SECONDS.toNanos(1); private final MethodDescriptor method; private final Tag tag; @@ -85,6 +78,7 @@ final class ClientCallImpl extends ClientCall { private final boolean callExecutorIsDirect; private final CallTracer channelCallsTracer; private final Context context; + private volatile ScheduledFuture deadlineCancellationFuture; private final boolean unaryRequest; private CallOptions callOptions; private ClientStream stream; @@ -92,16 +86,14 @@ final class ClientCallImpl extends ClientCall { private boolean cancelCalled; private boolean halfCloseCalled; private final ClientStreamProvider clientStreamProvider; - private ContextCancellationListener cancellationListener; + private final ContextCancellationListener cancellationListener = + new ContextCancellationListener(); private final ScheduledExecutorService deadlineCancellationExecutor; @Nullable private final InternalConfigSelector configSelector; private boolean fullStreamDecompression; private DecompressorRegistry decompressorRegistry = DecompressorRegistry.getDefaultInstance(); private CompressorRegistry compressorRegistry = CompressorRegistry.getDefaultInstance(); - private volatile ScheduledFuture deadlineCancellationNotifyApplicationFuture; - private volatile ScheduledFuture deadlineCancellationSendToServerFuture; - private boolean observerClosed = false; ClientCallImpl( MethodDescriptor method, Executor executor, CallOptions callOptions, @@ -135,20 +127,9 @@ final class ClientCallImpl extends ClientCall { } private final class ContextCancellationListener implements CancellationListener { - private Listener observer; - - private ContextCancellationListener(Listener observer) { - this.observer = observer; - } - @Override public void cancelled(Context context) { - if (context.getDeadline() == null || !context.getDeadline().isExpired()) { - stream.cancel(statusFromCancelled(context)); - } else { - Status status = statusFromCancelled(context); - delayedCancelOnDeadlineExceeded(status, observer); - } + stream.cancel(statusFromCancelled(context)); } } @@ -223,7 +204,19 @@ private void startInternal(Listener observer, Metadata headers) { // Context is already cancelled so no need to create a real stream, just notify the observer // of cancellation via callback on the executor stream = NoopClientStream.INSTANCE; - executeCloseObserverInContext(observer, statusFromCancelled(context)); + final Listener finalObserver = observer; + class ClosedByContext extends ContextRunnable { + ClosedByContext() { + super(context); + } + + @Override + public void runInContext() { + closeObserver(finalObserver, statusFromCancelled(context), new Metadata()); + } + } + + callExecutor.execute(new ClosedByContext()); return; } @@ -251,9 +244,23 @@ private void startInternal(Listener observer, Metadata headers) { compressor = compressorRegistry.lookupCompressor(compressorName); if (compressor == null) { stream = NoopClientStream.INSTANCE; - Status status = Status.INTERNAL.withDescription( - String.format("Unable to find compressor by name %s", compressorName)); - executeCloseObserverInContext(observer, status); + final Listener finalObserver = observer; + class ClosedByNotFoundCompressor extends ContextRunnable { + ClosedByNotFoundCompressor() { + super(context); + } + + @Override + public void runInContext() { + closeObserver( + finalObserver, + Status.INTERNAL.withDescription( + String.format("Unable to find compressor by name %s", compressorName)), + new Metadata()); + } + } + + callExecutor.execute(new ClosedByNotFoundCompressor()); return; } } else { @@ -294,7 +301,6 @@ private void startInternal(Listener observer, Metadata headers) { } stream.setDecompressorRegistry(decompressorRegistry); channelCallsTracer.reportCallStarted(); - cancellationListener = new ContextCancellationListener(observer); stream.start(new ClientStreamListenerImpl(observer)); // Delay any sources of cancellation after start(), because most of the transports are broken if @@ -306,11 +312,8 @@ private void startInternal(Listener observer, Metadata headers) { // If the context has the effective deadline, we don't need to schedule an extra task. && !effectiveDeadline.equals(context.getDeadline()) // If the channel has been terminated, we don't need to schedule an extra task. - && deadlineCancellationExecutor != null - // if already expired deadline let failing stream handle - && !(stream instanceof FailingClientStream)) { - deadlineCancellationNotifyApplicationFuture = - startDeadlineNotifyApplicationTimer(effectiveDeadline, observer); + && deadlineCancellationExecutor != null) { + deadlineCancellationFuture = startDeadlineTimer(effectiveDeadline); } if (cancelListenersShouldBeRemoved) { // Race detected! ClientStreamListener.closed may have been called before @@ -410,76 +413,46 @@ private static void logIfContextNarrowedTimeout( private void removeContextListenerAndCancelDeadlineFuture() { context.removeListener(cancellationListener); - ScheduledFuture f = deadlineCancellationSendToServerFuture; - if (f != null) { - f.cancel(false); - } - - f = deadlineCancellationNotifyApplicationFuture; + ScheduledFuture f = deadlineCancellationFuture; if (f != null) { f.cancel(false); } } - private ScheduledFuture startDeadlineNotifyApplicationTimer(Deadline deadline, - final Listener observer) { - final long remainingNanos = deadline.timeRemaining(TimeUnit.NANOSECONDS); - - class DeadlineExceededNotifyApplicationTimer implements Runnable { - @Override - public void run() { - Status status = buildDeadlineExceededStatusWithRemainingNanos(remainingNanos); - delayedCancelOnDeadlineExceeded(status, observer); - } - } - - return deadlineCancellationExecutor.schedule( - new LogExceptionRunnable(new DeadlineExceededNotifyApplicationTimer()), - remainingNanos, - TimeUnit.NANOSECONDS); - } - - private Status buildDeadlineExceededStatusWithRemainingNanos(long remainingNanos) { - final InsightBuilder insight = new InsightBuilder(); - stream.appendTimeoutInsight(insight); + private class DeadlineTimer implements Runnable { + private final long remainingNanos; - long seconds = Math.abs(remainingNanos) / TimeUnit.SECONDS.toNanos(1); - long nanos = Math.abs(remainingNanos) % TimeUnit.SECONDS.toNanos(1); - - StringBuilder buf = new StringBuilder(); - buf.append("deadline exceeded after "); - if (remainingNanos < 0) { - buf.append('-'); - } - buf.append(seconds); - buf.append(String.format(".%09d", nanos)); - buf.append("s. "); - buf.append(insight); - - return DEADLINE_EXCEEDED.augmentDescription(buf.toString()); - } - - private void delayedCancelOnDeadlineExceeded(final Status status, Listener observer) { - if (deadlineCancellationSendToServerFuture != null) { - return; + DeadlineTimer(long remainingNanos) { + this.remainingNanos = remainingNanos; } - class DeadlineExceededSendCancelToServerTimer implements Runnable { - @Override - public void run() { - // DelayedStream.cancel() is safe to call from a thread that is different from where the - // stream is created. - stream.cancel(status); + @Override + public void run() { + InsightBuilder insight = new InsightBuilder(); + stream.appendTimeoutInsight(insight); + // DelayedStream.cancel() is safe to call from a thread that is different from where the + // stream is created. + long seconds = Math.abs(remainingNanos) / TimeUnit.SECONDS.toNanos(1); + long nanos = Math.abs(remainingNanos) % TimeUnit.SECONDS.toNanos(1); + + StringBuilder buf = new StringBuilder(); + buf.append("deadline exceeded after "); + if (remainingNanos < 0) { + buf.append('-'); } + buf.append(seconds); + buf.append(String.format(".%09d", nanos)); + buf.append("s. "); + buf.append(insight); + stream.cancel(DEADLINE_EXCEEDED.augmentDescription(buf.toString())); } + } - // This races with removeContextListenerAndCancelDeadlineFuture(). Since calling cancel() on a - // stream multiple time is safe, the race here is fine. - deadlineCancellationSendToServerFuture = deadlineCancellationExecutor.schedule( - new LogExceptionRunnable(new DeadlineExceededSendCancelToServerTimer()), - DEADLINE_EXPIRATION_CANCEL_DELAY_NANOS, - TimeUnit.NANOSECONDS); - executeCloseObserverInContext(observer, status); + private ScheduledFuture startDeadlineTimer(Deadline deadline) { + long remainingNanos = deadline.timeRemaining(TimeUnit.NANOSECONDS); + return deadlineCancellationExecutor.schedule( + new LogExceptionRunnable( + new DeadlineTimer(remainingNanos)), remainingNanos, TimeUnit.NANOSECONDS); } private void executeCloseObserverInContext(final Listener observer, final Status status) { @@ -497,13 +470,6 @@ public void runInContext() { callExecutor.execute(new CloseInContext()); } - private void closeObserver(Listener observer, Status status, Metadata trailers) { - if (!observerClosed) { - observerClosed = true; - observer.onClose(status, trailers); - } - } - @Nullable private Deadline effectiveDeadline() { // Call options and context are immutable, so we don't need to cache the deadline. @@ -646,6 +612,10 @@ public Attributes getAttributes() { return Attributes.EMPTY; } + private void closeObserver(Listener observer, Status status, Metadata trailers) { + observer.onClose(status, trailers); + } + @Override public String toString() { return MoreObjects.toStringHelper(this).add("method", method).toString(); diff --git a/core/src/test/java/io/grpc/internal/ClientCallImplTest.java b/core/src/test/java/io/grpc/internal/ClientCallImplTest.java index a84bb225457..4b67454f175 100644 --- a/core/src/test/java/io/grpc/internal/ClientCallImplTest.java +++ b/core/src/test/java/io/grpc/internal/ClientCallImplTest.java @@ -17,7 +17,6 @@ package io.grpc.internal; import static com.google.common.truth.Truth.assertThat; -import static io.grpc.internal.ClientCallImpl.DEADLINE_EXPIRATION_CANCEL_DELAY_NANOS; import static io.grpc.internal.GrpcUtil.ACCEPT_ENCODING_SPLITTER; import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.SECONDS; @@ -60,7 +59,6 @@ import io.grpc.MethodDescriptor; import io.grpc.MethodDescriptor.MethodType; import io.grpc.Status; -import io.grpc.Status.Code; import io.grpc.internal.ClientCallImpl.ClientStreamProvider; import io.grpc.internal.ManagedChannelServiceConfig.MethodInfo; import io.grpc.internal.testing.SingleMessageProducer; @@ -137,9 +135,6 @@ public class ClientCallImplTest { @Captor private ArgumentCaptor statusArgumentCaptor; - @Captor - private ArgumentCaptor metadataArgumentCaptor; - private CallOptions baseCallOptions; @Before @@ -1005,21 +1000,9 @@ public void expiredDeadlineCancelsStream_CallOptions() { call.start(callListener, new Metadata()); - fakeClock.forwardTime(1000, TimeUnit.MILLISECONDS); - - // Verify cancel sent to application when deadline just past - verify(callListener).onClose(statusCaptor.capture(), metadataArgumentCaptor.capture()); - assertThat(statusCaptor.getValue().getDescription()) - .matches("deadline exceeded after [0-9]+\\.[0-9]+s. \\[remote_addr=127\\.0\\.0\\.1:443\\]"); - assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.DEADLINE_EXCEEDED); - verify(stream, never()).cancel(statusCaptor.capture()); - - fakeClock.forwardNanos(DEADLINE_EXPIRATION_CANCEL_DELAY_NANOS - 1); - verify(stream, never()).cancel(any(Status.class)); + fakeClock.forwardNanos(TimeUnit.SECONDS.toNanos(1) + 1); - // verify cancel send to server is delayed with DEADLINE_EXPIRATION_CANCEL_DELAY - fakeClock.forwardNanos(1); - verify(stream).cancel(statusCaptor.capture()); + verify(stream, times(1)).cancel(statusCaptor.capture()); assertEquals(Status.Code.DEADLINE_EXCEEDED, statusCaptor.getValue().getCode()); assertThat(statusCaptor.getValue().getDescription()) .matches("deadline exceeded after [0-9]+\\.[0-9]+s. \\[remote_addr=127\\.0\\.0\\.1:443\\]"); @@ -1029,8 +1012,8 @@ public void expiredDeadlineCancelsStream_CallOptions() { public void expiredDeadlineCancelsStream_Context() { fakeClock.forwardTime(System.nanoTime(), TimeUnit.NANOSECONDS); - Deadline deadline = Deadline.after(1, TimeUnit.SECONDS, fakeClock.getDeadlineTicker()); - Context context = Context.current().withDeadline(deadline, deadlineCancellationExecutor); + Context context = Context.current() + .withDeadlineAfter(1, TimeUnit.SECONDS, deadlineCancellationExecutor); Context origContext = context.attach(); ClientCallImpl call = new ClientCallImpl<>( @@ -1045,16 +1028,9 @@ public void expiredDeadlineCancelsStream_Context() { call.start(callListener, new Metadata()); - fakeClock.forwardTime(1000, TimeUnit.MILLISECONDS); - verify(stream, never()).cancel(statusCaptor.capture()); - // verify app is notified. - verify(callListener).onClose(statusCaptor.capture(), metadataArgumentCaptor.capture()); - assertThat(statusCaptor.getValue().getDescription()).contains("context timed out"); - assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.DEADLINE_EXCEEDED); + fakeClock.forwardNanos(TimeUnit.SECONDS.toNanos(1) + 1); - // verify cancel send to server is delayed with DEADLINE_EXPIRATION_CANCEL_DELAY - fakeClock.forwardNanos(DEADLINE_EXPIRATION_CANCEL_DELAY_NANOS); - verify(stream).cancel(statusCaptor.capture()); + verify(stream, times(1)).cancel(statusCaptor.capture()); assertEquals(Status.Code.DEADLINE_EXCEEDED, statusCaptor.getValue().getCode()); assertThat(statusCaptor.getValue().getDescription()).isEqualTo("context timed out"); } diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java index 65a9ed31e29..74097162618 100644 --- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java +++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java @@ -3622,7 +3622,7 @@ public ClientTransportFactory buildClientTransportFactory() { CallOptions.DEFAULT.withDeadlineAfter(5, TimeUnit.SECONDS)); ListenableFuture future2 = ClientCalls.futureUnaryCall(call2, null); - timer.forwardTime(5, TimeUnit.SECONDS); + timer.forwardTime(1234, TimeUnit.SECONDS); executor.runDueTasks(); try { @@ -3633,9 +3633,6 @@ public ClientTransportFactory buildClientTransportFactory() { } mychannel.shutdownNow(); - // Now for Deadline_exceeded, stream shutdown is delayed, calling shutdownNow() on a open stream - // will add a task to executor. Cleaning that task here. - executor.runDueTasks(); } @Deprecated