diff --git a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java index 07134939673..393b3644961 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java @@ -569,20 +569,22 @@ private void createStream(CreateStreamCommand command, ChannelPromise promise) } return; } - if (connection().goAwayReceived() - && streamId > connection().local().lastStreamKnownByPeer()) { - // This should only be reachable during onGoAwayReceived, as otherwise - // getShutdownThrowable() != null - command.stream().setNonExistent(); - Status s = abruptGoAwayStatus; - if (s == null) { - // Should be impossible, but handle psuedo-gracefully - s = Status.INTERNAL.withDescription( - "Failed due to abrupt GOAWAY, but can't find GOAWAY details"); + if (connection().goAwayReceived()) { + if (streamId > connection().local().lastStreamKnownByPeer() + || connection().local().numActiveStreams() == connection().local().maxActiveStreams()) { + // This should only be reachable during onGoAwayReceived, as otherwise + // getShutdownThrowable() != null + command.stream().setNonExistent(); + Status s = abruptGoAwayStatus; + if (s == null) { + // Should be impossible, but handle psuedo-gracefully + s = Status.INTERNAL.withDescription( + "Failed due to abrupt GOAWAY, but can't find GOAWAY details"); + } + command.stream().transportReportStatus(s, RpcProgress.REFUSED, true, new Metadata()); + promise.setFailure(s.asRuntimeException()); + return; } - command.stream().transportReportStatus(s, RpcProgress.REFUSED, true, new Metadata()); - promise.setFailure(s.asRuntimeException()); - return; } NettyClientStream.TransportState stream = command.stream(); diff --git a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java index 25813621cc6..b901ceeb642 100644 --- a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java @@ -383,6 +383,36 @@ public void receivedAbruptGoAwayShouldFailRacingQueuedStreamid() throws Exceptio assertTrue(future.isDone()); } + @Test + public void receivedGoAway_shouldFailBufferedStreamsExceedingMaxConcurrentStreams() + throws Exception { + NettyClientStream.TransportState streamTransportState1 = new TransportStateImpl( + handler(), + channel().eventLoop(), + DEFAULT_MAX_MESSAGE_SIZE, + transportTracer); + streamTransportState1.setListener(mock(ClientStreamListener.class)); + NettyClientStream.TransportState streamTransportState2 = new TransportStateImpl( + handler(), + channel().eventLoop(), + DEFAULT_MAX_MESSAGE_SIZE, + transportTracer); + streamTransportState2.setListener(mock(ClientStreamListener.class)); + receiveMaxConcurrentStreams(1); + ChannelFuture future1 = writeQueue().enqueue( + newCreateStreamCommand(grpcHeaders, streamTransportState1), true); + ChannelFuture future2 = writeQueue().enqueue( + newCreateStreamCommand(grpcHeaders, streamTransportState2), true); + + // GOAWAY + channelRead(goAwayFrame(Integer.MAX_VALUE)); + assertTrue(future1.isSuccess()); + assertTrue(future2.isDone()); + assertThat(Status.fromThrowable(future2.cause()).getCode()).isEqualTo(Status.Code.UNAVAILABLE); + assertThat(future2.cause().getMessage()).contains( + "Abrupt GOAWAY closed unsent stream. HTTP/2 error code: NO_ERROR"); + } + @Test public void receivedResetWithRefuseCode() throws Exception { ChannelFuture future = enqueue(newCreateStreamCommand(grpcHeaders, streamTransportState));