Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix StreamBufferingEncoder GOAWAY bug #11144

Merged
merged 13 commits into from Apr 19, 2021
Merged
Expand Up @@ -149,10 +149,14 @@ public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2
if (closed) {
return promise.setFailure(new Http2ChannelClosedException());
}
if (isExistingStream(streamId) || connection().goAwayReceived()) {
if (isExistingStream(streamId)) {
return super.writeHeaders(ctx, streamId, headers, streamDependency, weight,
exclusive, padding, endOfStream, promise);
}
if (connection().goAwayReceived()) {
promise.setFailure(new Http2Exception(Http2Error.NO_ERROR, "GOAWAY received"));
ejona86 marked this conversation as resolved.
Show resolved Hide resolved
return promise;
normanmaurer marked this conversation as resolved.
Show resolved Hide resolved
}
if (canCreateStream()) {
return super.writeHeaders(ctx, streamId, headers, streamDependency, weight,
exclusive, padding, endOfStream, promise);
Expand Down
Expand Up @@ -111,6 +111,11 @@ public void setup() throws Exception {
when(writer.writeGoAway(any(ChannelHandlerContext.class), anyInt(), anyLong(), any(ByteBuf.class),
any(ChannelPromise.class)))
.thenAnswer(successAnswer());
when(writer.writeHeaders(any(ChannelHandlerContext.class), anyInt(), any(Http2Headers.class),
normanmaurer marked this conversation as resolved.
Show resolved Hide resolved
anyInt(), anyBoolean(), any(ChannelPromise.class))).thenAnswer(noopAnswer());
when(writer.writeHeaders(any(ChannelHandlerContext.class), anyInt(), any(Http2Headers.class),
anyInt(), anyShort(), anyBoolean(), anyInt(), anyBoolean(), any(ChannelPromise.class)))
.thenAnswer(noopAnswer());

connection = new DefaultHttp2Connection(false);
connection.remote().flowController(new DefaultHttp2RemoteFlowController(connection));
Expand Down Expand Up @@ -167,7 +172,7 @@ public void multipleWritesToActiveStream() {
encoder.writeData(ctx, 3, data(), 0, false, newPromise());
encoderWriteHeaders(3, newPromise());

writeVerifyWriteHeaders(times(2), 3);
writeVerifyWriteHeaders(times(1), 3);
// Contiguous data writes are coalesced
ArgumentCaptor<ByteBuf> bufCaptor = ArgumentCaptor.forClass(ByteBuf.class);
verify(writer, times(1))
Expand Down Expand Up @@ -253,10 +258,24 @@ public void receivingGoAwayFailsBufferedStreams() throws Http2Exception {
int failCount = 0;
for (ChannelFuture f : futures) {
if (f.cause() != null) {
assertTrue(f.cause() instanceof Http2Exception);
Http2Exception e = (Http2Exception) f.cause();
assertEquals(Http2Error.STREAM_CLOSED, e.error());
failCount++;
}
}
assertEquals(9, failCount);
}

@Test
public void receivingGoAwayFailsNewStreams() throws Http2Exception {
encoder.writeSettingsAck(ctx, newPromise());
connection.goAwayReceived(11, 8, EMPTY_BUFFER);
ChannelFuture f = encoderWriteHeaders(3, newPromise());

assertTrue(f.cause() instanceof Http2Exception);
Http2Exception e = (Http2Exception) f.cause();
assertEquals(Http2Error.NO_ERROR, e.error());
assertEquals("GOAWAY received", e.getMessage());
assertEquals(0, encoder.numBufferedStreams());
}

Expand Down Expand Up @@ -533,6 +552,20 @@ public ChannelFuture answer(InvocationOnMock invocation) throws Throwable {
};
}

private Answer<ChannelFuture> noopAnswer() {
return new Answer<ChannelFuture>() {
@Override
public ChannelFuture answer(InvocationOnMock invocation) throws Throwable {
for (Object a : invocation.getArguments()) {
if (a instanceof ChannelPromise) {
return (ChannelFuture) a;
}
}
return newPromise();
}
};
}

private ChannelPromise newPromise() {
return new DefaultChannelPromise(channel, ImmediateEventExecutor.INSTANCE);
}
Expand Down