diff --git a/core/src/main/java/io/grpc/internal/GrpcUtil.java b/core/src/main/java/io/grpc/internal/GrpcUtil.java index bc2fb517945..2ebe001626b 100644 --- a/core/src/main/java/io/grpc/internal/GrpcUtil.java +++ b/core/src/main/java/io/grpc/internal/GrpcUtil.java @@ -345,7 +345,11 @@ private static Http2Error[] buildHttp2CodeMap() { Http2Error(int code, Status status) { this.code = code; - this.status = status.augmentDescription("HTTP/2 error code: " + this.name()); + String description = "HTTP/2 error code: " + this.name(); + if (status.getDescription() != null) { + description += " (" + status.getDescription() + ")"; + } + this.status = status.withDescription(description); } /** diff --git a/netty/src/main/java/io/grpc/netty/ClientTransportLifecycleManager.java b/netty/src/main/java/io/grpc/netty/ClientTransportLifecycleManager.java index e10150cd134..1111904086a 100644 --- a/netty/src/main/java/io/grpc/netty/ClientTransportLifecycleManager.java +++ b/netty/src/main/java/io/grpc/netty/ClientTransportLifecycleManager.java @@ -16,6 +16,7 @@ package io.grpc.netty; +import com.google.errorprone.annotations.CanIgnoreReturnValue; import io.grpc.Status; import io.grpc.internal.ManagedClientTransport; @@ -55,13 +56,16 @@ public void notifyGracefulShutdown(Status s) { listener.transportShutdown(s); } - public void notifyShutdown(Status s) { + /** Returns {@code true} if this was the first shutdown. 
*/ + @CanIgnoreReturnValue + public boolean notifyShutdown(Status s) { notifyGracefulShutdown(s); if (shutdownStatus != null) { - return; + return false; } shutdownStatus = s; shutdownThrowable = s.asException(); + return true; } public void notifyInUse(boolean inUse) { diff --git a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java index 631d9e81863..a6929e6f81b 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java @@ -130,6 +130,7 @@ protected void handleNotInUse() { private Attributes attributes; private InternalChannelz.Security securityInfo; private Status abruptGoAwayStatus; + private Status channelInactiveReason; static NettyClientHandler newHandler( ClientTransportLifecycleManager lifecycleManager, @@ -278,7 +279,7 @@ private NettyClientHandler( @Override public void onGoAwayReceived(int lastStreamId, long errorCode, ByteBuf debugData) { byte[] debugDataBytes = ByteBufUtil.getBytes(debugData); - goingAway(statusFromGoAway(errorCode, debugDataBytes)); + goingAway(errorCode, debugDataBytes); if (errorCode == Http2Error.ENHANCE_YOUR_CALM.code()) { String data = new String(debugDataBytes, UTF_8); logger.log( @@ -400,8 +401,7 @@ private void onRstStreamRead(int streamId, long errorCode) { NettyClientStream.TransportState stream = clientStream(connection().stream(streamId)); if (stream != null) { PerfMark.event("NettyClientHandler.onRstStreamRead", stream.tag()); - Status status = GrpcUtil.Http2Error.statusForCode((int) errorCode) - .augmentDescription("Received Rst Stream"); + Status status = statusFromH2Error(null, "RST_STREAM closed stream", errorCode, null); stream.transportReportStatus( status, errorCode == Http2Error.REFUSED_STREAM.code() @@ -433,6 +433,12 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { logger.fine("Network channel is closed"); Status status = 
Status.UNAVAILABLE.withDescription("Network closed for unknown reason"); lifecycleManager.notifyShutdown(status); + final Status streamStatus; + if (channelInactiveReason != null) { + streamStatus = channelInactiveReason; + } else { + streamStatus = lifecycleManager.getShutdownStatus(); + } try { cancelPing(lifecycleManager.getShutdownThrowable()); // Report status to the application layer for any open streams @@ -441,8 +447,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { public boolean visit(Http2Stream stream) throws Http2Exception { NettyClientStream.TransportState clientStream = clientStream(stream); if (clientStream != null) { - clientStream.transportReportStatus( - lifecycleManager.getShutdownStatus(), false, new Metadata()); + clientStream.transportReportStatus(streamStatus, false, new Metadata()); } return true; } @@ -630,8 +635,11 @@ public void operationComplete(ChannelFuture future) throws Exception { if (cause instanceof StreamBufferingEncoder.Http2GoAwayException) { StreamBufferingEncoder.Http2GoAwayException e = (StreamBufferingEncoder.Http2GoAwayException) cause; - lifecycleManager.notifyShutdown(statusFromGoAway(e.errorCode(), e.debugData())); - promise.setFailure(lifecycleManager.getShutdownThrowable()); + Status status = statusFromH2Error( + Status.Code.UNAVAILABLE, "GOAWAY closed buffered stream", + e.errorCode(), e.debugData()); + stream.transportReportStatus(status, RpcProgress.REFUSED, true, new Metadata()); + promise.setFailure(status.asRuntimeException()); } else { promise.setFailure(cause); } @@ -786,9 +794,20 @@ public boolean visit(Http2Stream stream) throws Http2Exception { * Handler for a GOAWAY being received. Fails any streams created after the * last known stream. May only be called during a read. 
*/ - private void goingAway(Status status) { - lifecycleManager.notifyGracefulShutdown(status); - abruptGoAwayStatus = status; + private void goingAway(long errorCode, byte[] debugData) { + Status finalStatus = statusFromH2Error( + Status.Code.UNAVAILABLE, "GOAWAY shut down transport", errorCode, debugData); + lifecycleManager.notifyGracefulShutdown(finalStatus); + abruptGoAwayStatus = statusFromH2Error( + Status.Code.UNAVAILABLE, "Abrupt GOAWAY closed unsent stream", errorCode, debugData); + // While this _should_ be UNAVAILABLE, Netty uses the wrong stream id in the GOAWAY when it + // fails streams due to HPACK failures (e.g., header list too large). To be more conservative, + // we assume any sent streams may be related to the GOAWAY. This should rarely impact users + // since the main time servers should use abrupt GOAWAYs is if there is a protocol error, and if + // there wasn't a protocol error the error code was probably NO_ERROR which is mapped to + // UNAVAILABLE. https://github.com/netty/netty/issues/10670 + final Status abruptGoAwayStatusConservative = statusFromH2Error( + null, "Abrupt GOAWAY closed sent stream", errorCode, debugData); // Try to allocate as many in-flight streams as possible, to reduce race window of // https://github.com/grpc/grpc-java/issues/2562 . To be of any help, the server has to // gracefully shut down the connection with two GOAWAYs. gRPC servers generally send a PING @@ -798,9 +817,13 @@ private void goingAway(Status status) { // This can cause reentrancy, but should be minor since it is normal to handle writes in // response to a read. Also, the call stack is rather shallow at this point clientWriteQueue.drainNow(); - lifecycleManager.notifyShutdown(status); + if (lifecycleManager.notifyShutdown(finalStatus)) { + // This is for the only RPCs that are actually covered by the GOAWAY error code. All other + // RPCs were not observed by the remote and so should be UNAVAILABLE. 
+ channelInactiveReason = statusFromH2Error( + null, "Connection closed after GOAWAY", errorCode, debugData); + } - final Status goAwayStatus = lifecycleManager.getShutdownStatus(); final int lastKnownStream = connection().local().lastStreamKnownByPeer(); try { connection().forEachActiveStream(new Http2StreamVisitor() { @@ -809,8 +832,13 @@ public boolean visit(Http2Stream stream) throws Http2Exception { if (stream.id() > lastKnownStream) { NettyClientStream.TransportState clientStream = clientStream(stream); if (clientStream != null) { + // RpcProgress _should_ be REFUSED, but we are being conservative. See comment for + // abruptGoAwayStatusConservative. This does reduce our ability to perform transparent + // retries, but our main goal of transparent retries is to resolve the local race. We + // still hope/expect servers to use the graceful double-GOAWAY when closing + // connections. clientStream.transportReportStatus( - goAwayStatus, RpcProgress.REFUSED, false, new Metadata()); + abruptGoAwayStatusConservative, RpcProgress.PROCESSED, false, new Metadata()); } stream.close(); } @@ -829,15 +857,20 @@ private void cancelPing(Throwable t) { } } - private Status statusFromGoAway(long errorCode, byte[] debugData) { - Status status = GrpcUtil.Http2Error.statusForCode((int) errorCode) - .augmentDescription("Received Goaway"); + /** If {@code statusCode} is non-null, it will be used instead of the http2 error code mapping. */ + private Status statusFromH2Error( + Status.Code statusCode, String context, long errorCode, byte[] debugData) { + Status status = GrpcUtil.Http2Error.statusForCode((int) errorCode); + if (statusCode == null) { + statusCode = status.getCode(); + } + String debugString = ""; if (debugData != null && debugData.length > 0) { // If a debug message was provided, use it. 
- String msg = new String(debugData, UTF_8); - status = status.augmentDescription(msg); + debugString = ", debug data: " + new String(debugData, UTF_8); } - return status; + return statusCode.toStatus() + .withDescription(context + ". " + status.getDescription() + debugString); } /** diff --git a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java index 4e469e707bf..b708d20931a 100644 --- a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java @@ -330,24 +330,13 @@ public void inboundShouldForwardToStream() throws Exception { assertNull("no additional message expected", streamListenerMessageQueue.poll()); } - @Test - public void receivedGoAwayShouldCancelBufferedStream() throws Exception { - // Force the stream to be buffered. - receiveMaxConcurrentStreams(0); - ChannelFuture future = enqueue(newCreateStreamCommand(grpcHeaders, streamTransportState)); - channelRead(goAwayFrame(0)); - assertTrue(future.isDone()); - assertFalse(future.isSuccess()); - Status status = Status.fromThrowable(future.cause()); - assertEquals(Status.Code.UNAVAILABLE, status.getCode()); - assertEquals("HTTP/2 error code: NO_ERROR\nReceived Goaway", status.getDescription()); - } - @Test public void receivedGoAwayShouldRefuseLaterStreamId() throws Exception { ChannelFuture future = enqueue(newCreateStreamCommand(grpcHeaders, streamTransportState)); channelRead(goAwayFrame(streamId - 1)); - verify(streamListener).closed(any(Status.class), eq(REFUSED), any(Metadata.class)); + // This _should_ be REFUSED, but we purposefully use PROCESSED. 
See comment for + // abruptGoAwayStatusConservative in NettyClientHandler + verify(streamListener).closed(any(Status.class), eq(PROCESSED), any(Metadata.class)); assertTrue(future.isDone()); } @@ -386,8 +375,10 @@ public void receivedAbruptGoAwayShouldFailRacingQueuedStreamid() throws Exceptio ArgumentCaptor captor = ArgumentCaptor.forClass(Status.class); verify(streamListener).closed(captor.capture(), same(REFUSED), ArgumentMatchers.notNull()); - assertEquals(Status.CANCELLED.getCode(), captor.getValue().getCode()); - assertEquals("HTTP/2 error code: CANCEL\nReceived Goaway\nthis is a test", + assertEquals(Status.UNAVAILABLE.getCode(), captor.getValue().getCode()); + assertEquals( + "Abrupt GOAWAY closed unsent stream. HTTP/2 error code: CANCEL, " + + "debug data: this is a test", captor.getValue().getDescription()); assertTrue(future.isDone()); } @@ -415,15 +406,18 @@ public void receivedGoAwayShouldFailUnknownStreams() throws Exception { // Read a GOAWAY that indicates our stream was never processed by the server. channelRead(goAwayFrame(0, 8 /* Cancel */, Unpooled.copiedBuffer("this is a test", UTF_8))); ArgumentCaptor captor = ArgumentCaptor.forClass(Status.class); - verify(streamListener).closed(captor.capture(), same(REFUSED), + // See comment for abruptGoAwayStatusConservative in NettyClientHandler + verify(streamListener).closed(captor.capture(), same(PROCESSED), ArgumentMatchers.notNull()); assertEquals(Status.CANCELLED.getCode(), captor.getValue().getCode()); - assertEquals("HTTP/2 error code: CANCEL\nReceived Goaway\nthis is a test", + assertEquals( + "Abrupt GOAWAY closed sent stream. 
HTTP/2 error code: CANCEL, " + + "debug data: this is a test", captor.getValue().getDescription()); } @Test - public void receivedGoAwayShouldFailUnknownBufferedStreams() throws Exception { + public void receivedGoAwayShouldFailBufferedStreams() throws Exception { receiveMaxConcurrentStreams(0); ChannelFuture future = enqueue(newCreateStreamCommand(grpcHeaders, streamTransportState)); @@ -433,8 +427,10 @@ public void receivedGoAwayShouldFailUnknownBufferedStreams() throws Exception { assertTrue(future.isDone()); assertFalse(future.isSuccess()); Status status = Status.fromThrowable(future.cause()); - assertEquals(Status.CANCELLED.getCode(), status.getCode()); - assertEquals("HTTP/2 error code: CANCEL\nReceived Goaway\nthis is a test", + assertEquals(Status.UNAVAILABLE.getCode(), status.getCode()); + assertEquals( + "GOAWAY closed buffered stream. HTTP/2 error code: CANCEL, " + + "debug data: this is a test", status.getDescription()); } @@ -448,8 +444,10 @@ public void receivedGoAwayShouldFailNewStreams() throws Exception { assertTrue(future.isDone()); assertFalse(future.isSuccess()); Status status = Status.fromThrowable(future.cause()); - assertEquals(Status.CANCELLED.getCode(), status.getCode()); - assertEquals("HTTP/2 error code: CANCEL\nReceived Goaway\nthis is a test", + assertEquals(Status.UNAVAILABLE.getCode(), status.getCode()); + assertEquals( + "GOAWAY shut down transport. 
HTTP/2 error code: CANCEL, " + + "debug data: this is a test", status.getDescription()); } diff --git a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java index 8762f0b659b..02b24b7c0bc 100644 --- a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java @@ -529,7 +529,7 @@ public void maxHeaderListSizeShouldBeEnforcedOnClient() throws Exception { Throwable rootCause = getRootCause(e); Status status = ((StatusException) rootCause).getStatus(); assertEquals(Status.Code.INTERNAL, status.getCode()); - assertEquals("HTTP/2 error code: PROTOCOL_ERROR\nReceived Rst Stream", + assertEquals("RST_STREAM closed stream. HTTP/2 error code: PROTOCOL_ERROR", status.getDescription()); } }