From 19274aac360c97ec357a85f9bcd0633107fb0d26 Mon Sep 17 00:00:00 2001 From: ZHANG Dapeng Date: Fri, 16 Apr 2021 14:23:14 -0700 Subject: [PATCH 1/2] netty: fix StreamBufferingEncoder GOAWAY bug Fix a bug in StreamBufferingEncoder: when client receives GOWAY while there are pending streams due to MAX_CONCURRENT_STREAMS, we see the following error: io.netty.handler.codec.http2.Http2Exception$StreamException: Maximum active streams violated for this endpoint. --- .../io/grpc/netty/NettyClientHandler.java | 28 +++++++++-------- .../io/grpc/netty/NettyClientHandlerTest.java | 30 +++++++++++++++++++ 2 files changed, 45 insertions(+), 13 deletions(-) diff --git a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java index a6929e6f81b..4a80a6d52cd 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java @@ -562,20 +562,22 @@ private void createStream(CreateStreamCommand command, ChannelPromise promise) } return; } - if (connection().goAwayReceived() - && streamId > connection().local().lastStreamKnownByPeer()) { - // This should only be reachable during onGoAwayReceived, as otherwise - // getShutdownThrowable() != null - command.stream().setNonExistent(); - Status s = abruptGoAwayStatus; - if (s == null) { - // Should be impossible, but handle psuedo-gracefully - s = Status.INTERNAL.withDescription( - "Failed due to abrupt GOAWAY, but can't find GOAWAY details"); + if (connection().goAwayReceived()) { + if (streamId > connection().local().lastStreamKnownByPeer() + || connection().local().numActiveStreams() == connection().local().maxActiveStreams()) { + // This should only be reachable during onGoAwayReceived, as otherwise + // getShutdownThrowable() != null + command.stream().setNonExistent(); + Status s = abruptGoAwayStatus; + if (s == null) { + // Should be impossible, but handle psuedo-gracefully + s = Status.INTERNAL.withDescription( + "Failed due to abrupt GOAWAY, but can't find GOAWAY details"); + } + command.stream().transportReportStatus(s, RpcProgress.REFUSED, true, new Metadata()); + promise.setFailure(s.asRuntimeException()); + return; } - command.stream().transportReportStatus(s, RpcProgress.REFUSED, true, new Metadata()); - promise.setFailure(s.asRuntimeException()); - return; } NettyClientStream.TransportState stream = command.stream(); diff --git a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java index b708d20931a..0bca75dc9bc 100644 --- a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java @@ -383,6 +383,36 @@ public void receivedAbruptGoAwayShouldFailRacingQueuedStreamid() throws Exceptio assertTrue(future.isDone()); } + @Test + public void receivedGoAway_shouldFailBufferedStreamsExceedingMaxConcurrentStreams() + throws Exception { + NettyClientStream.TransportState streamTransportState1 = new TransportStateImpl( + handler(), + channel().eventLoop(), + DEFAULT_MAX_MESSAGE_SIZE, + transportTracer); + streamTransportState1.setListener(mock(ClientStreamListener.class)); + NettyClientStream.TransportState streamTransportState2 = new TransportStateImpl( + handler(), + channel().eventLoop(), + DEFAULT_MAX_MESSAGE_SIZE, + transportTracer); + streamTransportState2.setListener(mock(ClientStreamListener.class)); + receiveMaxConcurrentStreams(1); + ChannelFuture future1 = writeQueue().enqueue( + newCreateStreamCommand(grpcHeaders, streamTransportState1), true); + ChannelFuture future2 = writeQueue().enqueue( + newCreateStreamCommand(grpcHeaders, streamTransportState2), true); + + // GOAWAY + channelRead(goAwayFrame(Integer.MAX_VALUE)); + assertTrue(future1.isSuccess()); + assertTrue(future2.isDone()); + assertThat(Status.fromThrowable(future2.cause()).getCode()).isEqualTo(Status.Code.UNAVAILABLE); + assertThat(future2.cause().getMessage()).contains( + "Abrupt GOAWAY closed unsent stream. HTTP/2 error code: NO_ERROR"); + } + @Test public void receivedResetWithRefuseCode() throws Exception { ChannelFuture future = enqueue(newCreateStreamCommand(grpcHeaders, streamTransportState)); From 91b67f1387792c84f69cccefab228cd15a7bed8b Mon Sep 17 00:00:00 2001 From: ZHANG Dapeng Date: Fri, 16 Apr 2021 16:10:38 -0700 Subject: [PATCH 2/2] netty: fix status message when GOAWAY at MAX_CONCURRENT_STREAMS limit Resolves #8097 --- .../io/grpc/netty/NettyClientHandler.java | 22 ++++++++++++------- .../io/grpc/netty/NettyClientHandlerTest.java | 4 +++- 2 files changed, 17 insertions(+), 9 deletions(-) diff --git a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java index 4a80a6d52cd..c07cc4bee50 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java @@ -563,17 +563,23 @@ private void createStream(CreateStreamCommand command, ChannelPromise promise) return; } if (connection().goAwayReceived()) { - if (streamId > connection().local().lastStreamKnownByPeer() - || connection().local().numActiveStreams() == connection().local().maxActiveStreams()) { + Status s = abruptGoAwayStatus; + int maxActiveStreams = connection().local().maxActiveStreams(); + int lastStreamId = connection().local().lastStreamKnownByPeer(); + if (s == null) { + // Should be impossible, but handle pseudo-gracefully + s = Status.INTERNAL.withDescription( + "Failed due to abrupt GOAWAY, but can't find GOAWAY details"); + } else if (streamId > lastStreamId) { + s = s.augmentDescription( + "stream id: " + streamId + ", GOAWAY Last-Stream-ID:" + lastStreamId); + } else if (connection().local().numActiveStreams() == maxActiveStreams) { + s = s.augmentDescription("At MAX_CONCURRENT_STREAMS limit. limit: " + maxActiveStreams); + } + if (streamId > lastStreamId || connection().local().numActiveStreams() == maxActiveStreams) { // This should only be reachable during onGoAwayReceived, as otherwise // getShutdownThrowable() != null command.stream().setNonExistent(); - Status s = abruptGoAwayStatus; - if (s == null) { - // Should be impossible, but handle psuedo-gracefully - s = Status.INTERNAL.withDescription( - "Failed due to abrupt GOAWAY, but can't find GOAWAY details"); - } command.stream().transportReportStatus(s, RpcProgress.REFUSED, true, new Metadata()); promise.setFailure(s.asRuntimeException()); return; diff --git a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java index 0bca75dc9bc..b4fcd431837 100644 --- a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java @@ -378,7 +378,7 @@ public void receivedAbruptGoAwayShouldFailRacingQueuedStreamid() throws Exceptio assertEquals(Status.UNAVAILABLE.getCode(), captor.getValue().getCode()); assertEquals( "Abrupt GOAWAY closed unsent stream. HTTP/2 error code: CANCEL, " - + "debug data: this is a test", + + "debug data: this is a test\nstream id: 3, GOAWAY Last-Stream-ID:0", captor.getValue().getDescription()); assertTrue(future.isDone()); } @@ -411,6 +411,8 @@ public void receivedGoAway_shouldFailBufferedStreamsExceedingMaxConcurrentStream assertThat(Status.fromThrowable(future2.cause()).getCode()).isEqualTo(Status.Code.UNAVAILABLE); assertThat(future2.cause().getMessage()).contains( "Abrupt GOAWAY closed unsent stream. HTTP/2 error code: NO_ERROR"); + assertThat(future2.cause().getMessage()).contains( + "At MAX_CONCURRENT_STREAMS limit"); } @Test