diff --git a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java index a6929e6f81b..c07cc4bee50 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java @@ -562,20 +562,28 @@ private void createStream(CreateStreamCommand command, ChannelPromise promise) } return; } - if (connection().goAwayReceived() - && streamId > connection().local().lastStreamKnownByPeer()) { - // This should only be reachable during onGoAwayReceived, as otherwise - // getShutdownThrowable() != null - command.stream().setNonExistent(); + if (connection().goAwayReceived()) { Status s = abruptGoAwayStatus; + int maxActiveStreams = connection().local().maxActiveStreams(); + int lastStreamId = connection().local().lastStreamKnownByPeer(); if (s == null) { - // Should be impossible, but handle psuedo-gracefully + // Should be impossible, but handle pseudo-gracefully s = Status.INTERNAL.withDescription( "Failed due to abrupt GOAWAY, but can't find GOAWAY details"); + } else if (streamId > lastStreamId) { + s = s.augmentDescription( + "stream id: " + streamId + ", GOAWAY Last-Stream-ID:" + lastStreamId); + } else if (connection().local().numActiveStreams() == maxActiveStreams) { + s = s.augmentDescription("At MAX_CONCURRENT_STREAMS limit. limit: " + maxActiveStreams); + } + if (streamId > lastStreamId || connection().local().numActiveStreams() == maxActiveStreams) { + // This should only be reachable during onGoAwayReceived, as otherwise + // getShutdownThrowable() != null + command.stream().setNonExistent(); + command.stream().transportReportStatus(s, RpcProgress.REFUSED, true, new Metadata()); + promise.setFailure(s.asRuntimeException()); + return; } - command.stream().transportReportStatus(s, RpcProgress.REFUSED, true, new Metadata()); - promise.setFailure(s.asRuntimeException()); - return; } NettyClientStream.TransportState stream = command.stream(); diff --git a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java index b708d20931a..b4fcd431837 100644 --- a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java @@ -378,11 +378,43 @@ public void receivedAbruptGoAwayShouldFailRacingQueuedStreamid() throws Exceptio assertEquals(Status.UNAVAILABLE.getCode(), captor.getValue().getCode()); assertEquals( "Abrupt GOAWAY closed unsent stream. HTTP/2 error code: CANCEL, " - + "debug data: this is a test", + + "debug data: this is a test\nstream id: 3, GOAWAY Last-Stream-ID:0", captor.getValue().getDescription()); assertTrue(future.isDone()); } + @Test + public void receivedGoAway_shouldFailBufferedStreamsExceedingMaxConcurrentStreams() + throws Exception { + NettyClientStream.TransportState streamTransportState1 = new TransportStateImpl( + handler(), + channel().eventLoop(), + DEFAULT_MAX_MESSAGE_SIZE, + transportTracer); + streamTransportState1.setListener(mock(ClientStreamListener.class)); + NettyClientStream.TransportState streamTransportState2 = new TransportStateImpl( + handler(), + channel().eventLoop(), + DEFAULT_MAX_MESSAGE_SIZE, + transportTracer); + streamTransportState2.setListener(mock(ClientStreamListener.class)); + receiveMaxConcurrentStreams(1); + ChannelFuture future1 = writeQueue().enqueue( + newCreateStreamCommand(grpcHeaders, streamTransportState1), true); + ChannelFuture future2 = writeQueue().enqueue( + newCreateStreamCommand(grpcHeaders, streamTransportState2), true); + + // GOAWAY + channelRead(goAwayFrame(Integer.MAX_VALUE)); + assertTrue(future1.isSuccess()); + assertTrue(future2.isDone()); + assertThat(Status.fromThrowable(future2.cause()).getCode()).isEqualTo(Status.Code.UNAVAILABLE); + assertThat(future2.cause().getMessage()).contains( + "Abrupt GOAWAY closed unsent stream. HTTP/2 error code: NO_ERROR"); + assertThat(future2.cause().getMessage()).contains( + "At MAX_CONCURRENT_STREAMS limit"); + } + @Test public void receivedResetWithRefuseCode() throws Exception { ChannelFuture future = enqueue(newCreateStreamCommand(grpcHeaders, streamTransportState));