Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

netty: fix StreamBufferingEncoder GOAWAY bug #8020

Merged
merged 8 commits into from Apr 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
28 changes: 15 additions & 13 deletions netty/src/main/java/io/grpc/netty/NettyClientHandler.java
Expand Up @@ -569,20 +569,22 @@ private void createStream(CreateStreamCommand command, ChannelPromise promise)
}
return;
}
if (connection().goAwayReceived()
&& streamId > connection().local().lastStreamKnownByPeer()) {
// This should only be reachable during onGoAwayReceived, as otherwise
// getShutdownThrowable() != null
command.stream().setNonExistent();
Status s = abruptGoAwayStatus;
if (s == null) {
// Should be impossible, but handle psuedo-gracefully
s = Status.INTERNAL.withDescription(
"Failed due to abrupt GOAWAY, but can't find GOAWAY details");
if (connection().goAwayReceived()) {
if (streamId > connection().local().lastStreamKnownByPeer()
Copy link
Member

@ejona86 ejona86 Apr 16, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ordinarily I'd ask you to add a comment mentioning that checking for numActiveStreams is a workaround until we update to a netty version with netty/netty#11144.

However, I think we may need the numActiveStreams check. The error message "Abrupt GOAWAY closed unsent stream. HTTP/2 error code: NO_ERROR" is misleading when max concurrent streams is hit. Should we construct a special status in the case connection().local().numActiveStreams() == connection().local().maxActiveStreams()? "At MAX_CONCURRENT_STREAMS limit and GOAWAY has been received. limit: " + maxActiveStreams()

I'm fine doing that as follow-up (or making a bug to track it; although that seems probably more work than just making the change).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Filed #8097. Will fix in a follow-up PR.

|| connection().local().numActiveStreams() == connection().local().maxActiveStreams()) {
// This should only be reachable during onGoAwayReceived, as otherwise
// getShutdownThrowable() != null
command.stream().setNonExistent();
Status s = abruptGoAwayStatus;
if (s == null) {
// Should be impossible, but handle psuedo-gracefully
s = Status.INTERNAL.withDescription(
"Failed due to abrupt GOAWAY, but can't find GOAWAY details");
}
command.stream().transportReportStatus(s, RpcProgress.REFUSED, true, new Metadata());
promise.setFailure(s.asRuntimeException());
return;
}
command.stream().transportReportStatus(s, RpcProgress.REFUSED, true, new Metadata());
promise.setFailure(s.asRuntimeException());
return;
}

NettyClientStream.TransportState stream = command.stream();
Expand Down
30 changes: 30 additions & 0 deletions netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java
Expand Up @@ -383,6 +383,36 @@ public void receivedAbruptGoAwayShouldFailRacingQueuedStreamid() throws Exceptio
assertTrue(future.isDone());
}

@Test
public void receivedGoAway_shouldFailBufferedStreamsExceedingMaxConcurrentStreams()
throws Exception {
NettyClientStream.TransportState streamTransportState1 = new TransportStateImpl(
handler(),
channel().eventLoop(),
DEFAULT_MAX_MESSAGE_SIZE,
transportTracer);
streamTransportState1.setListener(mock(ClientStreamListener.class));
NettyClientStream.TransportState streamTransportState2 = new TransportStateImpl(
handler(),
channel().eventLoop(),
DEFAULT_MAX_MESSAGE_SIZE,
transportTracer);
streamTransportState2.setListener(mock(ClientStreamListener.class));
receiveMaxConcurrentStreams(1);
ChannelFuture future1 = writeQueue().enqueue(
newCreateStreamCommand(grpcHeaders, streamTransportState1), true);
ChannelFuture future2 = writeQueue().enqueue(
newCreateStreamCommand(grpcHeaders, streamTransportState2), true);

// GOAWAY
channelRead(goAwayFrame(Integer.MAX_VALUE));
assertTrue(future1.isSuccess());
assertTrue(future2.isDone());
assertThat(Status.fromThrowable(future2.cause()).getCode()).isEqualTo(Status.Code.UNAVAILABLE);
assertThat(future2.cause().getMessage()).contains(
"Abrupt GOAWAY closed unsent stream. HTTP/2 error code: NO_ERROR");
}

@Test
public void receivedResetWithRefuseCode() throws Exception {
ChannelFuture future = enqueue(newCreateStreamCommand(grpcHeaders, streamTransportState));
Expand Down