Skip to content

Commit

Permalink
netty: fix StreamBufferingEncoder GOAWAY bug
Browse files Browse the repository at this point in the history
Fix a bug in StreamBufferingEncoder: when client receives GOWAY while there are pending streams due to MAX_CONCURRENT_STREAMS, we see the following error:
io.netty.handler.codec.http2.Http2Exception$StreamException: Maximum active streams violated for this endpoint.
  • Loading branch information
dapengzhang0 committed Apr 16, 2021
1 parent b4fe07d commit 49f9380
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 13 deletions.
28 changes: 15 additions & 13 deletions netty/src/main/java/io/grpc/netty/NettyClientHandler.java
Expand Up @@ -569,20 +569,22 @@ private void createStream(CreateStreamCommand command, ChannelPromise promise)
}
return;
}
if (connection().goAwayReceived()
&& streamId > connection().local().lastStreamKnownByPeer()) {
// This should only be reachable during onGoAwayReceived, as otherwise
// getShutdownThrowable() != null
command.stream().setNonExistent();
Status s = abruptGoAwayStatus;
if (s == null) {
// Should be impossible, but handle psuedo-gracefully
s = Status.INTERNAL.withDescription(
"Failed due to abrupt GOAWAY, but can't find GOAWAY details");
if (connection().goAwayReceived()) {
if (streamId > connection().local().lastStreamKnownByPeer()
|| connection().local().numActiveStreams() == connection().local().maxActiveStreams()) {
// This should only be reachable during onGoAwayReceived, as otherwise
// getShutdownThrowable() != null
command.stream().setNonExistent();
Status s = abruptGoAwayStatus;
if (s == null) {
// Should be impossible, but handle psuedo-gracefully
s = Status.INTERNAL.withDescription(
"Failed due to abrupt GOAWAY, but can't find GOAWAY details");
}
command.stream().transportReportStatus(s, RpcProgress.REFUSED, true, new Metadata());
promise.setFailure(s.asRuntimeException());
return;
}
command.stream().transportReportStatus(s, RpcProgress.REFUSED, true, new Metadata());
promise.setFailure(s.asRuntimeException());
return;
}

NettyClientStream.TransportState stream = command.stream();
Expand Down
30 changes: 30 additions & 0 deletions netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java
Expand Up @@ -383,6 +383,36 @@ public void receivedAbruptGoAwayShouldFailRacingQueuedStreamid() throws Exceptio
assertTrue(future.isDone());
}

@Test
public void receivedGoAway_shouldFailBufferedStreamsExceedingMaxConcurrentStreams()
throws Exception {
NettyClientStream.TransportState streamTransportState1 = new TransportStateImpl(
handler(),
channel().eventLoop(),
DEFAULT_MAX_MESSAGE_SIZE,
transportTracer);
streamTransportState1.setListener(mock(ClientStreamListener.class));
NettyClientStream.TransportState streamTransportState2 = new TransportStateImpl(
handler(),
channel().eventLoop(),
DEFAULT_MAX_MESSAGE_SIZE,
transportTracer);
streamTransportState2.setListener(mock(ClientStreamListener.class));
receiveMaxConcurrentStreams(1);
ChannelFuture future1 = writeQueue().enqueue(
newCreateStreamCommand(grpcHeaders, streamTransportState1), true);
ChannelFuture future2 = writeQueue().enqueue(
newCreateStreamCommand(grpcHeaders, streamTransportState2), true);

// GOAWAY
channelRead(goAwayFrame(Integer.MAX_VALUE));
assertTrue(future1.isSuccess());
assertTrue(future2.isDone());
assertThat(Status.fromThrowable(future2.cause()).getCode()).isEqualTo(Status.Code.UNAVAILABLE);
assertThat(future2.cause().getMessage()).contains(
"Abrupt GOAWAY closed unsent stream. HTTP/2 error code: NO_ERROR");
}

@Test
public void receivedResetWithRefuseCode() throws Exception {
ChannelFuture future = enqueue(newCreateStreamCommand(grpcHeaders, streamTransportState));
Expand Down

0 comments on commit 49f9380

Please sign in to comment.