Skip to content

Commit

Permalink
Http2Pool handles the lifecycle of the cache with connections (#2257)
Browse files Browse the repository at this point in the history
Cache Http2FrameCodec/Http2MultiplexHandler/H2CUpgradeHandler context.
Obtain the negotiated application level protocol once.

Related to #2151 and #2262
  • Loading branch information
violetagg committed Jun 8, 2022
1 parent 738f5df commit f5e79b7
Show file tree
Hide file tree
Showing 8 changed files with 408 additions and 171 deletions.
Expand Up @@ -89,6 +89,7 @@ public abstract class PooledConnectionProvider<T extends Connection> implements
final Duration poolInactivity;
final Duration disposeTimeout;
final Map<SocketAddress, Integer> maxConnections = new HashMap<>();
Mono<Void> onDispose;

protected PooledConnectionProvider(Builder builder) {
this(builder, null);
Expand All @@ -106,6 +107,7 @@ protected PooledConnectionProvider(Builder builder) {
poolFactoryPerRemoteHost.put(entry.getKey(), new PoolFactory<>(entry.getValue(), builder.disposeTimeout));
maxConnections.put(entry.getKey(), entry.getValue().maxConnections);
}
this.onDispose = Mono.empty();
scheduleInactivePoolsDisposal();
}

Expand Down Expand Up @@ -190,10 +192,10 @@ public final Mono<Void> disposeLater() {
})
.collect(Collectors.toList());
if (pools.isEmpty()) {
return Mono.empty();
return onDispose;
}
channelPools.clear();
return Mono.when(pools);
return onDispose.and(Mono.when(pools));
});
}

Expand Down Expand Up @@ -243,6 +245,10 @@ public String name() {
return name;
}

public void onDispose(Mono<Void> disposeMono) {
onDispose = onDispose.and(disposeMono);
}

protected abstract CoreSubscriber<PooledRef<T>> createDisposableAcquire(
TransportConfig config,
ConnectionObserver connectionObserver,
Expand Down
Expand Up @@ -15,6 +15,7 @@
*/
package reactor.netty.http;

import java.net.SocketAddress;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
Expand Down Expand Up @@ -147,6 +148,15 @@ public static HttpResources set(LoopResources loops) {
http2ConnectionProvider = new AtomicReference<>();
}

@Override
public void disposeWhen(SocketAddress remoteAddress) {
ConnectionProvider provider = http2ConnectionProvider.get();
if (provider != null) {
provider.disposeWhen(remoteAddress);
}
super.disposeWhen(remoteAddress);
}

@Override
public AddressResolverGroup<?> getOrCreateDefaultResolver() {
return super.getOrCreateDefaultResolver();
Expand Down
Expand Up @@ -16,13 +16,12 @@
package reactor.netty.http.client;

import io.netty.channel.Channel;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http2.Http2Connection;
import io.netty.handler.codec.http2.Http2FrameCodec;
import io.netty.handler.codec.http2.Http2LocalFlowController;
import io.netty.handler.codec.http2.Http2StreamChannel;
import io.netty.handler.ssl.ApplicationProtocolNames;
import io.netty.handler.ssl.SslHandler;
import io.netty.resolver.AddressResolverGroup;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.Future;
Expand All @@ -37,7 +36,6 @@
import reactor.core.publisher.Operators;
import reactor.netty.Connection;
import reactor.netty.ConnectionObserver;
import reactor.netty.NettyPipeline;
import reactor.netty.channel.ChannelMetricsRecorder;
import reactor.netty.channel.ChannelOperations;
import reactor.netty.resources.ConnectionProvider;
Expand Down Expand Up @@ -76,6 +74,9 @@ final class Http2ConnectionProvider extends PooledConnectionProvider<Connection>
Http2ConnectionProvider(ConnectionProvider parent) {
super(initConfiguration(parent));
this.parent = parent;
if (parent instanceof PooledConnectionProvider) {
((PooledConnectionProvider<?>) parent).onDispose(disposeLater());
}
}

static Builder initConfiguration(ConnectionProvider parent) {
Expand Down Expand Up @@ -332,11 +333,12 @@ public void onUncaughtException(Connection connection, Throwable error) {
@Override
public void operationComplete(Future<Http2StreamChannel> future) {
Channel channel = pooledRef.poolable().channel();
Http2FrameCodec frameCodec = channel.pipeline().get(Http2FrameCodec.class);
ChannelHandlerContext frameCodec = ((Http2Pool.Http2PooledRef) pooledRef).slot.http2FrameCodecCtx();
if (future.isSuccess()) {
Http2StreamChannel ch = future.getNow();

if (!channel.isActive() || frameCodec == null || !frameCodec.connection().local().canOpenStream()) {
if (!channel.isActive() || frameCodec == null ||
!((Http2FrameCodec) frameCodec.handler()).connection().local().canOpenStream()) {
invalidate(this);
if (!retried) {
if (log.isDebugEnabled()) {
Expand All @@ -358,8 +360,8 @@ public void operationComplete(Future<Http2StreamChannel> future) {
sink.success(ops);
}

Http2Connection.Endpoint<Http2LocalFlowController> localEndpoint = frameCodec.connection().local();
if (log.isDebugEnabled()) {
Http2Connection.Endpoint<Http2LocalFlowController> localEndpoint = ((Http2FrameCodec) frameCodec.handler()).connection().local();
logStreamsState(ch, localEndpoint, "Stream opened");
}
}
Expand All @@ -372,8 +374,8 @@ public void operationComplete(Future<Http2StreamChannel> future) {

boolean isH2cUpgrade() {
Channel channel = pooledRef.poolable().channel();
if (channel.pipeline().get(NettyPipeline.H2CUpgradeHandler) != null &&
channel.pipeline().get(NettyPipeline.H2MultiplexHandler) == null) {
if (((Http2Pool.Http2PooledRef) pooledRef).slot.h2cUpgradeHandlerCtx() != null &&
((Http2Pool.Http2PooledRef) pooledRef).slot.http2MultiplexHandlerCtx() == null) {
ChannelOperations<?, ?> ops = ChannelOperations.get(channel);
if (ops != null) {
sink.success(ops);
Expand All @@ -385,11 +387,9 @@ boolean isH2cUpgrade() {

boolean notHttp2() {
Channel channel = pooledRef.poolable().channel();
ChannelPipeline pipeline = channel.pipeline();
SslHandler handler = pipeline.get(SslHandler.class);
if (handler != null) {
String protocol = handler.applicationProtocol() != null ? handler.applicationProtocol() : ApplicationProtocolNames.HTTP_1_1;
if (ApplicationProtocolNames.HTTP_1_1.equals(protocol)) {
String applicationProtocol = ((Http2Pool.Http2PooledRef) pooledRef).slot.applicationProtocol;
if (applicationProtocol != null) {
if (ApplicationProtocolNames.HTTP_1_1.equals(applicationProtocol)) {
// No information for the negotiated application-level protocol,
// or it is HTTP/1.1, continue as an HTTP/1.1 request
// and remove the connection from this pool.
Expand All @@ -400,15 +400,15 @@ boolean notHttp2() {
return true;
}
}
else if (!ApplicationProtocolNames.HTTP_2.equals(handler.applicationProtocol())) {
else if (!ApplicationProtocolNames.HTTP_2.equals(applicationProtocol)) {
channel.attr(OWNER).set(null);
invalidate(this);
sink.error(new IOException("Unknown protocol [" + protocol + "]."));
sink.error(new IOException("Unknown protocol [" + applicationProtocol + "]."));
return true;
}
}
else if (pipeline.get(NettyPipeline.H2CUpgradeHandler) == null &&
pipeline.get(NettyPipeline.H2MultiplexHandler) == null) {
else if (((Http2Pool.Http2PooledRef) pooledRef).slot.h2cUpgradeHandlerCtx() == null &&
((Http2Pool.Http2PooledRef) pooledRef).slot.http2MultiplexHandlerCtx() == null) {
// It is not H2. There are no handlers for H2C upgrade/H2C prior-knowledge,
// continue as an HTTP/1.1 request and remove the connection from this pool.
ChannelOperations<?, ?> ops = ChannelOperations.get(channel);
Expand Down

0 comments on commit f5e79b7

Please sign in to comment.