diff --git a/core/src/main/java/io/grpc/internal/ClientCallImpl.java b/core/src/main/java/io/grpc/internal/ClientCallImpl.java index 911860ae176e..0460f0c5e3c0 100644 --- a/core/src/main/java/io/grpc/internal/ClientCallImpl.java +++ b/core/src/main/java/io/grpc/internal/ClientCallImpl.java @@ -584,12 +584,24 @@ public String toString() { private class ClientStreamListenerImpl implements ClientStreamListener { private final Listener observer; - private boolean closed; + private Status exceptionStatus; public ClientStreamListenerImpl(Listener observer) { this.observer = checkNotNull(observer, "observer"); } + /** + * Cancels call and schedules onClose() notification. May only be called from the application + * thread. + */ + private void exceptionThrown(Status status) { + // Since each RPC can have its own executor, we can only call onClose() when we are sure there + // will be no further callbacks. We set the status here and overwrite the onClose() details + // when it arrives. + exceptionStatus = status; + stream.cancel(status); + } + @Override public void headersRead(final Metadata headers) { PerfMark.startTask("ClientStreamListener.headersRead", tag); @@ -612,16 +624,14 @@ public void runInContext() { } private void runInternal() { - if (closed) { + if (exceptionStatus != null) { return; } try { observer.onHeaders(headers); } catch (Throwable t) { - Status status = - Status.CANCELLED.withCause(t).withDescription("Failed to read headers"); - stream.cancel(status); - close(status, new Metadata()); + exceptionThrown( + Status.CANCELLED.withCause(t).withDescription("Failed to read headers")); } } } @@ -655,7 +665,7 @@ public void runInContext() { } private void runInternal() { - if (closed) { + if (exceptionStatus != null) { GrpcUtil.closeQuietly(producer); return; } @@ -672,10 +682,8 @@ private void runInternal() { } } catch (Throwable t) { GrpcUtil.closeQuietly(producer); - Status status = - Status.CANCELLED.withCause(t).withDescription("Failed to read message."); - stream.cancel(status); - close(status, new Metadata()); + exceptionThrown( + Status.CANCELLED.withCause(t).withDescription("Failed to read message.")); } } } @@ -687,20 +695,6 @@ private void runInternal() { } } - /** - * Must be called from application thread. - */ - private void close(Status status, Metadata trailers) { - closed = true; - cancelListenersShouldBeRemoved = true; - try { - closeObserver(observer, status, trailers); - } finally { - removeContextListenerAndCancelDeadlineFuture(); - channelCallsTracer.reportCallEnded(status.isOk()); - } - } - @Override public void closed(Status status, Metadata trailers) { closed(status, RpcProgress.PROCESSED, trailers); @@ -752,11 +746,25 @@ public void runInContext() { } private void runInternal() { - if (closed) { - // We intentionally don't keep the status or metadata from the server. - return; + Status status = savedStatus; + Metadata trailers = savedTrailers; + if (exceptionStatus != null) { + // Ideally exceptionStatus == savedStatus, as exceptionStatus was passed to cancel(). + // However the cancel is racy and this closed() may have already been queued when the + // cancellation occurred. Since other calls like onMessage() will throw away data if + // exceptionStatus != null, it is semantically essential that we _not_ use a status + // provided by the server. + status = exceptionStatus; + // Replace trailers to prevent mixing sources of status and trailers. + trailers = new Metadata(); + } + cancelListenersShouldBeRemoved = true; + try { + closeObserver(observer, status, trailers); + } finally { + removeContextListenerAndCancelDeadlineFuture(); + channelCallsTracer.reportCallEnded(status.isOk()); } - close(savedStatus, savedTrailers); } } @@ -789,13 +797,14 @@ public void runInContext() { } private void runInternal() { + if (exceptionStatus != null) { + return; + } try { observer.onReady(); } catch (Throwable t) { - Status status = - Status.CANCELLED.withCause(t).withDescription("Failed to call onReady."); - stream.cancel(status); - close(status, new Metadata()); + exceptionThrown( + Status.CANCELLED.withCause(t).withDescription("Failed to call onReady.")); } } } diff --git a/core/src/test/java/io/grpc/internal/ClientCallImplTest.java b/core/src/test/java/io/grpc/internal/ClientCallImplTest.java index 091de2cc2656..cf74c2be46ea 100644 --- a/core/src/test/java/io/grpc/internal/ClientCallImplTest.java +++ b/core/src/test/java/io/grpc/internal/ClientCallImplTest.java @@ -256,6 +256,51 @@ public void exceptionInOnHeadersTakesPrecedenceOverServer() { verify(stream).cancel(same(callListenerStatus)); } + @Test + public void exceptionInOnHeadersHasOnCloseQueuedLast() { + class PointOfNoReturnExecutor implements Executor { + boolean rejectNewRunnables; + + @Override public void execute(Runnable command) { + assertThat(rejectNewRunnables).isFalse(); + command.run(); + } + } + + final PointOfNoReturnExecutor executor = new PointOfNoReturnExecutor(); + ClientCallImpl call = new ClientCallImpl<>( + method, + executor, + baseCallOptions, + provider, + deadlineCancellationExecutor, + channelCallTracer, + /* retryEnabled= */ false); + callListener = new NoopClientCall.NoopClientCallListener() { + private final RuntimeException failure = new RuntimeException("bad"); + + @Override public void onHeaders(Metadata metadata) { + throw failure; + } + + @Override public void onClose(Status status, Metadata metadata) { + verify(stream).cancel(same(status)); + assertThat(status.getCode()).isEqualTo(Status.Code.CANCELLED); + assertThat(status.getCause()).isSameInstanceAs(failure); + // At the point onClose() is called the user may shut down the executor, so no further + // Runnables may be scheduled. The only thread-safe way of guaranteeing that is for + // onClose() to be queued last. + executor.rejectNewRunnables = true; + } + }; + call.start(callListener, new Metadata()); + verify(stream).start(listenerArgumentCaptor.capture()); + final ClientStreamListener streamListener = listenerArgumentCaptor.getValue(); + + streamListener.headersRead(new Metadata()); + streamListener.closed(Status.OK, new Metadata()); + } + @Test public void exceptionInOnReadyTakesPrecedenceOverServer() { DelayedExecutor executor = new DelayedExecutor();