Skip to content

Commit

Permalink
netty: Differentiate GOAWAY closure status descriptions
Browse files Browse the repository at this point in the history
With this, it will be clear if the RPC failed because the server didn't
use a double-GOAWAY or if it failed because of MAX_CONCURRENT_STREAMS or
if it was due to a local race. It also fixes the status code to be
UNAVAILABLE except for the RPCs included in the GOAWAY error (modulo the
Netty bug).

Fixes #5855
  • Loading branch information
ejona86 committed Oct 9, 2020
1 parent 4ad14fe commit 610a8d7
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 46 deletions.
6 changes: 5 additions & 1 deletion core/src/main/java/io/grpc/internal/GrpcUtil.java
Expand Up @@ -345,7 +345,11 @@ private static Http2Error[] buildHttp2CodeMap() {

Http2Error(int code, Status status) {
this.code = code;
this.status = status.augmentDescription("HTTP/2 error code: " + this.name());
String description = "HTTP/2 error code: " + this.name();
if (status.getDescription() != null) {
description += " (" + status.getDescription() + ")";
}
this.status = status.withDescription(description);
}

/**
Expand Down
Expand Up @@ -16,6 +16,7 @@

package io.grpc.netty;

import com.google.errorprone.annotations.CanIgnoreReturnValue;
import io.grpc.Status;
import io.grpc.internal.ManagedClientTransport;

Expand Down Expand Up @@ -55,13 +56,16 @@ public void notifyGracefulShutdown(Status s) {
listener.transportShutdown(s);
}

public void notifyShutdown(Status s) {
/** Returns {@code true} if this was the first shutdown. */
@CanIgnoreReturnValue
public boolean notifyShutdown(Status s) {
notifyGracefulShutdown(s);
if (shutdownStatus != null) {
return;
return false;
}
shutdownStatus = s;
shutdownThrowable = s.asException();
return true;
}

public void notifyInUse(boolean inUse) {
Expand Down
71 changes: 52 additions & 19 deletions netty/src/main/java/io/grpc/netty/NettyClientHandler.java
Expand Up @@ -130,6 +130,7 @@ protected void handleNotInUse() {
private Attributes attributes;
private InternalChannelz.Security securityInfo;
private Status abruptGoAwayStatus;
private Status channelInactiveReason;

static NettyClientHandler newHandler(
ClientTransportLifecycleManager lifecycleManager,
Expand Down Expand Up @@ -278,7 +279,7 @@ private NettyClientHandler(
@Override
public void onGoAwayReceived(int lastStreamId, long errorCode, ByteBuf debugData) {
byte[] debugDataBytes = ByteBufUtil.getBytes(debugData);
goingAway(statusFromGoAway(errorCode, debugDataBytes));
goingAway(errorCode, debugDataBytes);
if (errorCode == Http2Error.ENHANCE_YOUR_CALM.code()) {
String data = new String(debugDataBytes, UTF_8);
logger.log(
Expand Down Expand Up @@ -400,8 +401,7 @@ private void onRstStreamRead(int streamId, long errorCode) {
NettyClientStream.TransportState stream = clientStream(connection().stream(streamId));
if (stream != null) {
PerfMark.event("NettyClientHandler.onRstStreamRead", stream.tag());
Status status = GrpcUtil.Http2Error.statusForCode((int) errorCode)
.augmentDescription("Received Rst Stream");
Status status = statusFromH2Error(null, "RST_STREAM closed stream", errorCode, null);
stream.transportReportStatus(
status,
errorCode == Http2Error.REFUSED_STREAM.code()
Expand Down Expand Up @@ -433,6 +433,12 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
logger.fine("Network channel is closed");
Status status = Status.UNAVAILABLE.withDescription("Network closed for unknown reason");
lifecycleManager.notifyShutdown(status);
final Status streamStatus;
if (channelInactiveReason != null) {
streamStatus = channelInactiveReason;
} else {
streamStatus = lifecycleManager.getShutdownStatus();
}
try {
cancelPing(lifecycleManager.getShutdownThrowable());
// Report status to the application layer for any open streams
Expand All @@ -441,8 +447,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
public boolean visit(Http2Stream stream) throws Http2Exception {
NettyClientStream.TransportState clientStream = clientStream(stream);
if (clientStream != null) {
clientStream.transportReportStatus(
lifecycleManager.getShutdownStatus(), false, new Metadata());
clientStream.transportReportStatus(streamStatus, false, new Metadata());
}
return true;
}
Expand Down Expand Up @@ -630,8 +635,11 @@ public void operationComplete(ChannelFuture future) throws Exception {
if (cause instanceof StreamBufferingEncoder.Http2GoAwayException) {
StreamBufferingEncoder.Http2GoAwayException e =
(StreamBufferingEncoder.Http2GoAwayException) cause;
lifecycleManager.notifyShutdown(statusFromGoAway(e.errorCode(), e.debugData()));
promise.setFailure(lifecycleManager.getShutdownThrowable());
Status status = statusFromH2Error(
Status.Code.UNAVAILABLE, "GOAWAY closed buffered stream",
e.errorCode(), e.debugData());
stream.transportReportStatus(status, RpcProgress.REFUSED, true, new Metadata());
promise.setFailure(status.asRuntimeException());
} else {
promise.setFailure(cause);
}
Expand Down Expand Up @@ -786,9 +794,20 @@ public boolean visit(Http2Stream stream) throws Http2Exception {
* Handler for a GOAWAY being received. Fails any streams created after the
* last known stream. May only be called during a read.
*/
private void goingAway(Status status) {
lifecycleManager.notifyGracefulShutdown(status);
abruptGoAwayStatus = status;
private void goingAway(long errorCode, byte[] debugData) {
Status finalStatus = statusFromH2Error(
Status.Code.UNAVAILABLE, "GOAWAY shut down transport", errorCode, debugData);
lifecycleManager.notifyGracefulShutdown(finalStatus);
abruptGoAwayStatus = statusFromH2Error(
Status.Code.UNAVAILABLE, "Abrupt GOAWAY closed unsent stream", errorCode, debugData);
// While this _should_ be UNAVAILABLE, Netty uses the wrong stream id in the GOAWAY when it
// fails streams due to HPACK failures (e.g., header list too large). To be more conservative,
// we assume any sent streams may be related to the GOAWAY. This should rarely impact users
// since the main time servers should use abrupt GOAWAYs is if there is a protocol error, and if
// there wasn't a protocol error the error code was probably NO_ERROR which is mapped to
// UNAVAILABLE. https://github.com/netty/netty/issues/10670
final Status abruptGoAwayStatusConservative = statusFromH2Error(
null, "Abrupt GOAWAY closed sent stream", errorCode, debugData);
// Try to allocate as many in-flight streams as possible, to reduce race window of
// https://github.com/grpc/grpc-java/issues/2562 . To be of any help, the server has to
// gracefully shut down the connection with two GOAWAYs. gRPC servers generally send a PING
Expand All @@ -798,9 +817,13 @@ private void goingAway(Status status) {
// This can cause reentrancy, but should be minor since it is normal to handle writes in
// response to a read. Also, the call stack is rather shallow at this point
clientWriteQueue.drainNow();
lifecycleManager.notifyShutdown(status);
if (lifecycleManager.notifyShutdown(finalStatus)) {
// This is only for the RPCs that are actually covered by the GOAWAY error code. All other
// RPCs were not observed by the remote and so should be UNAVAILABLE.
channelInactiveReason = statusFromH2Error(
null, "Connection closed after GOAWAY", errorCode, debugData);
}

final Status goAwayStatus = lifecycleManager.getShutdownStatus();
final int lastKnownStream = connection().local().lastStreamKnownByPeer();
try {
connection().forEachActiveStream(new Http2StreamVisitor() {
Expand All @@ -809,8 +832,13 @@ public boolean visit(Http2Stream stream) throws Http2Exception {
if (stream.id() > lastKnownStream) {
NettyClientStream.TransportState clientStream = clientStream(stream);
if (clientStream != null) {
// RpcProgress _should_ be REFUSED, but we are being conservative. See comment for
// abruptGoAwayStatusConservative. This does reduce our ability to perform transparent
// retries, but our main goal of transparent retries is to resolve the local race. We
// still hope/expect servers to use the graceful double-GOAWAY when closing
// connections.
clientStream.transportReportStatus(
goAwayStatus, RpcProgress.REFUSED, false, new Metadata());
abruptGoAwayStatusConservative, RpcProgress.PROCESSED, false, new Metadata());
}
stream.close();
}
Expand All @@ -829,15 +857,20 @@ private void cancelPing(Throwable t) {
}
}

private Status statusFromGoAway(long errorCode, byte[] debugData) {
Status status = GrpcUtil.Http2Error.statusForCode((int) errorCode)
.augmentDescription("Received Goaway");
/** If {@code statusCode} is non-null, it will be used instead of the http2 error code mapping. */
private Status statusFromH2Error(
Status.Code statusCode, String context, long errorCode, byte[] debugData) {
Status status = GrpcUtil.Http2Error.statusForCode((int) errorCode);
if (statusCode == null) {
statusCode = status.getCode();
}
String debugString = "";
if (debugData != null && debugData.length > 0) {
// If a debug message was provided, use it.
String msg = new String(debugData, UTF_8);
status = status.augmentDescription(msg);
debugString = ", debug data: " + new String(debugData, UTF_8);
}
return status;
return statusCode.toStatus()
.withDescription(context + ". " + status.getDescription() + debugString);
}

/**
Expand Down
44 changes: 21 additions & 23 deletions netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java
Expand Up @@ -330,24 +330,13 @@ public void inboundShouldForwardToStream() throws Exception {
assertNull("no additional message expected", streamListenerMessageQueue.poll());
}

@Test
public void receivedGoAwayShouldCancelBufferedStream() throws Exception {
// Force the stream to be buffered.
receiveMaxConcurrentStreams(0);
ChannelFuture future = enqueue(newCreateStreamCommand(grpcHeaders, streamTransportState));
channelRead(goAwayFrame(0));
assertTrue(future.isDone());
assertFalse(future.isSuccess());
Status status = Status.fromThrowable(future.cause());
assertEquals(Status.Code.UNAVAILABLE, status.getCode());
assertEquals("HTTP/2 error code: NO_ERROR\nReceived Goaway", status.getDescription());
}

@Test
public void receivedGoAwayShouldRefuseLaterStreamId() throws Exception {
ChannelFuture future = enqueue(newCreateStreamCommand(grpcHeaders, streamTransportState));
channelRead(goAwayFrame(streamId - 1));
verify(streamListener).closed(any(Status.class), eq(REFUSED), any(Metadata.class));
// This _should_ be REFUSED, but we purposefully use PROCESSED. See comment for
// abruptGoAwayStatusConservative in NettyClientHandler
verify(streamListener).closed(any(Status.class), eq(PROCESSED), any(Metadata.class));
assertTrue(future.isDone());
}

Expand Down Expand Up @@ -386,8 +375,10 @@ public void receivedAbruptGoAwayShouldFailRacingQueuedStreamid() throws Exceptio
ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class);
verify(streamListener).closed(captor.capture(), same(REFUSED),
ArgumentMatchers.<Metadata>notNull());
assertEquals(Status.CANCELLED.getCode(), captor.getValue().getCode());
assertEquals("HTTP/2 error code: CANCEL\nReceived Goaway\nthis is a test",
assertEquals(Status.UNAVAILABLE.getCode(), captor.getValue().getCode());
assertEquals(
"Abrupt GOAWAY closed unsent stream. HTTP/2 error code: CANCEL, "
+ "debug data: this is a test",
captor.getValue().getDescription());
assertTrue(future.isDone());
}
Expand Down Expand Up @@ -415,15 +406,18 @@ public void receivedGoAwayShouldFailUnknownStreams() throws Exception {
// Read a GOAWAY that indicates our stream was never processed by the server.
channelRead(goAwayFrame(0, 8 /* Cancel */, Unpooled.copiedBuffer("this is a test", UTF_8)));
ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class);
verify(streamListener).closed(captor.capture(), same(REFUSED),
// See comment for abruptGoAwayStatusConservative in NettyClientHandler
verify(streamListener).closed(captor.capture(), same(PROCESSED),
ArgumentMatchers.<Metadata>notNull());
assertEquals(Status.CANCELLED.getCode(), captor.getValue().getCode());
assertEquals("HTTP/2 error code: CANCEL\nReceived Goaway\nthis is a test",
assertEquals(
"Abrupt GOAWAY closed sent stream. HTTP/2 error code: CANCEL, "
+ "debug data: this is a test",
captor.getValue().getDescription());
}

@Test
public void receivedGoAwayShouldFailUnknownBufferedStreams() throws Exception {
public void receivedGoAwayShouldFailBufferedStreams() throws Exception {
receiveMaxConcurrentStreams(0);

ChannelFuture future = enqueue(newCreateStreamCommand(grpcHeaders, streamTransportState));
Expand All @@ -433,8 +427,10 @@ public void receivedGoAwayShouldFailUnknownBufferedStreams() throws Exception {
assertTrue(future.isDone());
assertFalse(future.isSuccess());
Status status = Status.fromThrowable(future.cause());
assertEquals(Status.CANCELLED.getCode(), status.getCode());
assertEquals("HTTP/2 error code: CANCEL\nReceived Goaway\nthis is a test",
assertEquals(Status.UNAVAILABLE.getCode(), status.getCode());
assertEquals(
"GOAWAY closed buffered stream. HTTP/2 error code: CANCEL, "
+ "debug data: this is a test",
status.getDescription());
}

Expand All @@ -448,8 +444,10 @@ public void receivedGoAwayShouldFailNewStreams() throws Exception {
assertTrue(future.isDone());
assertFalse(future.isSuccess());
Status status = Status.fromThrowable(future.cause());
assertEquals(Status.CANCELLED.getCode(), status.getCode());
assertEquals("HTTP/2 error code: CANCEL\nReceived Goaway\nthis is a test",
assertEquals(Status.UNAVAILABLE.getCode(), status.getCode());
assertEquals(
"GOAWAY shut down transport. HTTP/2 error code: CANCEL, "
+ "debug data: this is a test",
status.getDescription());
}

Expand Down
Expand Up @@ -529,7 +529,7 @@ public void maxHeaderListSizeShouldBeEnforcedOnClient() throws Exception {
Throwable rootCause = getRootCause(e);
Status status = ((StatusException) rootCause).getStatus();
assertEquals(Status.Code.INTERNAL, status.getCode());
assertEquals("HTTP/2 error code: PROTOCOL_ERROR\nReceived Rst Stream",
assertEquals("RST_STREAM closed stream. HTTP/2 error code: PROTOCOL_ERROR",
status.getDescription());
}
}
Expand Down

0 comments on commit 610a8d7

Please sign in to comment.