Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not open a stream on a connection that received GOAWAY #2408

Merged
merged 1 commit into from Jul 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -338,6 +338,7 @@ public void operationComplete(Future<Http2StreamChannel> future) {
Http2StreamChannel ch = future.getNow();

if (!channel.isActive() || frameCodec == null ||
((Http2FrameCodec) frameCodec.handler()).connection().goAwayReceived() ||
!((Http2FrameCodec) frameCodec.handler()).connection().local().canOpenStream()) {
invalidate(this);
if (!retried) {
Expand Down
Expand Up @@ -318,6 +318,11 @@ else if (poolConfig.evictInBackgroundInterval().isZero()) {
ref.slot.invalidate();
removeSlot(ref.slot);
}
// received GO_AWAY
if (ref.slot.goAwayReceived()) {
ref.slot.invalidate();
removeSlot(ref.slot);
}
// max life reached
else if (maxLifeReached(ref.slot)) {
//"FutureReturnValueIgnored" this is deliberate
Expand Down Expand Up @@ -487,6 +492,17 @@ void evictInBackground() {
continue;
}

if (slot.goAwayReceived()) {
if (log.isDebugEnabled()) {
log.debug(format(slot.connection.channel(), "Channel received GO_AWAY, remove from pool"));
}
recordInteractionTimestamp();
slots.remove();
IDLE_SIZE.decrementAndGet(this);
slot.invalidate();
continue;
}

if (maxLifeReached(slot)) {
if (log.isDebugEnabled()) {
log.debug(format(slot.connection.channel(), "Max life time is reached, remove from pool"));
Expand Down Expand Up @@ -555,6 +571,24 @@ Slot findConnection(ConcurrentLinkedQueue<Slot> resources) {
continue;
}

// check the connection received GO_AWAY
if (slot.goAwayReceived()) {
if (slot.concurrency() > 0) {
if (log.isDebugEnabled()) {
log.debug(format(slot.connection.channel(), "Channel received GO_AWAY, {} active streams"),
slot.concurrency());
}
offerSlot(resources, slot);
}
else {
if (log.isDebugEnabled()) {
log.debug(format(slot.connection.channel(), "Channel received GO_AWAY, remove from pool"));
}
slot.invalidate();
}
continue;
}

// check whether the connection's idle time has been reached
if (maxIdleReached(slot)) {
if (log.isDebugEnabled()) {
Expand Down Expand Up @@ -982,6 +1016,11 @@ int decrementConcurrencyAndGet() {
return concurrency;
}

boolean goAwayReceived() {
ChannelHandlerContext frameCodec = http2FrameCodecCtx();
return frameCodec != null && ((Http2FrameCodec) frameCodec.handler()).connection().goAwayReceived();
}

long idleTime() {
if (concurrency() > 0) {
return 0L;
Expand Down
Expand Up @@ -26,6 +26,8 @@
import io.netty.channel.DefaultChannelPromise;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http2.Http2Connection;
import io.netty.handler.codec.http2.Http2FrameCodec;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.SslProvider;
Expand All @@ -40,9 +42,11 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mockito;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Signal;
import reactor.core.publisher.Sinks;
import reactor.netty.BaseHttpTest;
import reactor.netty.ByteBufMono;
import reactor.netty.ConnectionObserver;
Expand All @@ -63,6 +67,7 @@
import java.net.SocketAddress;
import java.security.cert.CertificateException;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -661,4 +666,73 @@ private double getGaugeValue(String gaugeName, String... tags) {
}
return result;
}

@Test
@SuppressWarnings("FutureReturnValueIgnored")
void testHttp2PoolAndGoAway() {
Http2SslContextSpec serverCtx = Http2SslContextSpec.forServer(ssc.certificate(), ssc.privateKey());
Http2SslContextSpec clientCtx =
Http2SslContextSpec.forClient()
.configure(builder -> builder.trustManager(InsecureTrustManagerFactory.INSTANCE));

Sinks.Empty<Void> startSending = Sinks.empty();
disposableServer =
createServer()
.protocol(HttpProtocol.H2)
.secure(spec -> spec.sslContext(serverCtx))
.route(r -> r.get("/1", (req, res) -> res.sendString(startSending.asMono().then(Mono.just("/1"))))
.get("/2", (req, res) -> {
//"FutureReturnValueIgnored" this is deliberate
req.withConnection(conn -> conn.channel().parent().close());
startSending.tryEmitEmpty();
return res.sendString(Mono.just("/2"));
})
.get("/3", (req, res) -> res.sendString(Mono.just("/3"))))
.bindNow();

ConnectionProvider provider = ConnectionProvider.create("testHttp2PoolAndGoAway", 1);
Sinks.Empty<Void> goAwayReceived = Sinks.empty();
HttpClient client =
createClient(provider, disposableServer.port())
.protocol(HttpProtocol.H2)
.secure(spec -> spec.sslContext(clientCtx))
.doOnChannelInit((observer, channel, address) -> {
Http2FrameCodec http2FrameCodec = channel.pipeline().get(Http2FrameCodec.class);

http2FrameCodec.gracefulShutdownTimeoutMillis(-1);

Http2Connection.Listener goAwayFrameListener = Mockito.mock(Http2Connection.Listener.class);
Mockito.doAnswer(invocation -> {
goAwayReceived.tryEmitEmpty();
return null;
})
.when(goAwayFrameListener)
.onGoAwayReceived(Mockito.anyInt(), Mockito.anyLong(), Mockito.any());
http2FrameCodec.connection().addListener(goAwayFrameListener);
});

try {
Flux.range(1, 3)
.flatMap(i -> {
Mono<String> request = client.get()
.uri("/" + i)
.responseContent()
.aggregate()
.asString();
if (i == 3) {
return goAwayReceived.asMono().then(request);
}
return request;
})
.collectList()
.as(StepVerifier::create)
.expectNext(Arrays.asList("/1", "/2", "/3"))
.expectComplete()
.verify(Duration.ofSeconds(5));
}
finally {
provider.disposeLater()
.block(Duration.ofSeconds(5));
}
}
}