diff --git a/reactor-netty5-core/src/main/java/reactor/netty5/channel/ChannelOperationsHandler.java b/reactor-netty5-core/src/main/java/reactor/netty5/channel/ChannelOperationsHandler.java index 2d7bd2165d..113cb2bfda 100755 --- a/reactor-netty5-core/src/main/java/reactor/netty5/channel/ChannelOperationsHandler.java +++ b/reactor-netty5-core/src/main/java/reactor/netty5/channel/ChannelOperationsHandler.java @@ -21,6 +21,7 @@ import io.netty5.channel.ChannelHandlerContext; import io.netty5.handler.codec.DecoderResult; import io.netty5.handler.codec.DecoderResultProvider; +import io.netty5.handler.ssl.SslCloseCompletionEvent; import reactor.netty5.Connection; import reactor.netty5.ConnectionObserver; import reactor.netty5.NettyOutbound; @@ -78,6 +79,23 @@ final public void channelInactive(ChannelHandlerContext ctx) { } } + @Override + public void channelInboundEvent(ChannelHandlerContext ctx, Object evt) { + if (evt instanceof SslCloseCompletionEvent sslCloseCompletionEvent) { + // When a close_notify is received, the SSLHandler fires an SslCloseCompletionEvent.SUCCESS event, + // so if the event is success and if the channel is still active (not closing for example), + // then immediately close the channel. + // see https://www.rfc-editor.org/rfc/rfc5246#section-7.2.1, which states that when receiving a close_notify, + // then the connection must be closed down immediately. + if (sslCloseCompletionEvent.isSuccess() && ctx.channel().isActive()) { + if (log.isDebugEnabled()) { + log.debug(format(ctx.channel(), "Received a TLS close_notify, closing the channel now.")); + } + ctx.close(); + } + } + } + @Override final public void channelRead(ChannelHandlerContext ctx, Object msg) { if (msg == null) { diff --git a/reactor-netty5-http/src/test/java/reactor/netty5/http/server/HttpServerTests.java b/reactor-netty5-http/src/test/java/reactor/netty5/http/server/HttpServerTests.java index 4fad5d848c..eef396afbc 100644 --- a/reactor-netty5-http/src/test/java/reactor/netty5/http/server/HttpServerTests.java +++ b/reactor-netty5-http/src/test/java/reactor/netty5/http/server/HttpServerTests.java @@ -74,10 +74,13 @@ import io.netty5.handler.codec.http.HttpServerCodec; import io.netty5.handler.codec.http.HttpVersion; import io.netty5.handler.codec.http.HttpUtil; +import io.netty5.handler.codec.http.LastHttpContent; import io.netty5.handler.codec.http.websocketx.WebSocketCloseStatus; import io.netty5.handler.ssl.SniCompletionEvent; +import io.netty5.handler.ssl.SslCloseCompletionEvent; import io.netty5.handler.ssl.SslContext; import io.netty5.handler.ssl.SslContextBuilder; +import io.netty5.handler.ssl.SslHandler; import io.netty5.handler.ssl.util.InsecureTrustManagerFactory; import io.netty5.handler.ssl.util.SelfSignedCertificate; import io.netty5.util.concurrent.SingleThreadEventExecutor; @@ -137,6 +140,56 @@ class HttpServerTests extends BaseHttpTest { ChannelGroup group; + /** + * Server Handler used to send a TLS close_notify after the server last response has been flushed. + * The close_notify is sent without closing the connection. + */ + final static class SendCloseNotifyAfterLastResponseHandler extends ChannelHandlerAdapter { + final static String NAME = "handler.send_close_notify_after_response"; + final CountDownLatch latch; + + SendCloseNotifyAfterLastResponseHandler(CountDownLatch latch) { + this.latch = latch; + } + + static void register(Connection cnx, CountDownLatch latch) { + SendCloseNotifyAfterLastResponseHandler handler = new SendCloseNotifyAfterLastResponseHandler(latch); + cnx.channel().pipeline().addBefore(NettyPipeline.HttpTrafficHandler, NAME, handler); + } + + @Override + public io.netty5.util.concurrent.Future write(ChannelHandlerContext ctx, Object msg) { + if (msg instanceof LastHttpContent) { + SslHandler sslHandler = ctx.channel().pipeline().get(SslHandler.class); + Objects.requireNonNull(sslHandler, "sslHandler not found from pipeline"); + // closeOutbound sends a close_notify but don't close the connection. + return ctx.write(msg).addListener(future -> sslHandler.closeOutbound().addListener(f -> latch.countDown())); + } + return ctx.write(msg); + } + } + + /** + * Handler used by secured servers which don't want to close client connection when receiving a client close_notify ack. + * The handler is placed just before the ReactiveBridge (ChannelOperationsHandler), and will block + * any received SslCloseCompletionEvent events. Hence, ChannelOperationsHandler won't get the close_notify ack, + * and won't close the channel. + */ + final static class IgnoreCloseNotifyHandler extends ChannelHandlerAdapter { + final static String NAME = "handler.ignore_close_notify"; + + static void register(Connection cnx) { + cnx.channel().pipeline().addBefore(NettyPipeline.ReactiveBridge, NAME, new IgnoreCloseNotifyHandler()); + } + + @Override + public void channelInboundEvent(ChannelHandlerContext ctx, Object evt) { + if (!(evt instanceof SslCloseCompletionEvent) || !((SslCloseCompletionEvent) evt).isSuccess()) { + ctx.fireChannelInboundEvent(evt); + } + } + } + @BeforeAll static void createSelfSignedCertificate() throws CertificateException { ssc = new SelfSignedCertificate(); @@ -2473,4 +2526,181 @@ private void doTestIssue1978( assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); } + + /** + * The test simulates a situation where a connection is idle and available in the client connection pool, + * and then the client receives a close_notify, but the server has not yet closed the connection. + * In this case, the connection should be closed immediately and removed from the pool, in order to avoid + * any "SslClosedEngineException: SSLEngine closed already exception" the next time the connection will be + * acquired and written. + * So, in the test, a secured server responds to a first client request, and when the response is flushed, it sends a + * close_notify to the client without closing the connection. + * The first client should get its response OK, but when receiving the close_notify, it should immediately + * close the connection, which should not re-enter into the pool again. + * The next client request will work because the previous connection should have been closed. + */ + @Test + void test2498_close_notify_after_response_two_clients() throws Exception { + SslContext serverCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()) + .build(); + + // Ensure that the server has sent the close_notify, and the client connection is closed after the 1st response. + CountDownLatch latch = new CountDownLatch(2); + + disposableServer = createServer() + .secure(spec -> spec.sslContext(serverCtx)) + .doOnConnection(cnx -> { + // will send a close_notify after the last response is sent, but won't close the connection + SendCloseNotifyAfterLastResponseHandler.register(cnx, latch); + // avoid closing the connection when the server receives the close_notify ack from the client + IgnoreCloseNotifyHandler.register(cnx); + }) + .handle((req, res) -> { + return res.sendString(Mono.just("test")); + }) + .bindNow(); + + // create the client + SslContext clientCtx = SslContextBuilder.forClient() + .trustManager(InsecureTrustManagerFactory.INSTANCE) + .build(); + + HttpClient client = createClient(disposableServer::address) + .secure(spec -> spec.sslContext(clientCtx)); + + // send a first request + String resp = client + .doOnConnected(cnx -> cnx.channel().closeFuture().addListener(l -> latch.countDown())) + .get() + .uri("/") + .responseContent() + .aggregate() + .asString() + .block(Duration.ofSeconds(30)); + + assertThat(resp).isEqualTo("test"); + + // double check if close_notify was sent by server and if the client channel has been closed + assertThat(latch.await(40, TimeUnit.SECONDS)).as("latch await").isTrue(); + + // send a new request, which should succeed because at reception of previous close_notify we should have + // immediately closed the connection, else, if the connection is reused here, we would then get a + // "SslClosedEngineException: SSLEngine closed already" exception + + String resp2 = client + .get() + .uri("/") + .responseContent() + .aggregate() + .asString() + .block(Duration.ofSeconds(30)); + assertThat(resp2).isEqualTo("test"); + } + + /** + * The test simulates a situation where the client has fully written its request to a connection, + * but while waiting for the response, then a close_notify is received, and the server has not + * closed the connection. + * + * The client should then be aborted with a PrematureCloseException. + */ + @Test + void test2498_close_notify_on_request() throws Exception { + SslContext serverCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()) + .build(); + + CountDownLatch latch = new CountDownLatch(1); + + disposableServer = createServer() + .secure(spec -> spec.sslContext(serverCtx)) + // avoid closing the connection when the server receives the close_notify ack from the client + .doOnConnection(IgnoreCloseNotifyHandler::register) + .handle((req, res) -> { + req.receive() + .aggregate() + .subscribe(request -> req.withConnection(c -> { + SslHandler sslHandler = c.channel().pipeline().get(SslHandler.class); + Objects.requireNonNull(sslHandler, "sslHandler not found from pipeline"); + // send a close_notify but do not close the connection + sslHandler.closeOutbound().addListener(future -> latch.countDown()); + })); + return Mono.never(); + }) + .bindNow(); + + // create the client, which should be aborted since the server responds with a close_notify + Flux postFlux = Flux.just("content1", "content2", "content3", "content4") + .delayElements(Duration.ofMillis(10)); + + SslContext clientCtx = SslContextBuilder.forClient() + .trustManager(InsecureTrustManagerFactory.INSTANCE) + .build(); + + createClient(disposableServer::address) + .secure(spec -> spec.sslContext(clientCtx)) + .post() + .send(BufferFlux.fromString(postFlux)) + .uri("/") + .responseContent() + .aggregate() + .as(StepVerifier::create) + .expectErrorMatches(t -> t instanceof PrematureCloseException || t instanceof AbortedException) + .verify(Duration.ofSeconds(40)); + + // double check if the server has sent its close_notify + assertThat(latch.await(40, TimeUnit.SECONDS)).as("latch await").isTrue(); + } + + /** + * The test simulates a situation where the client is receiving a close_notify while + * writing request body parts to the connection. + * The server immediately sends a close_notify when the client is connecting. + * the client request is not consumed and the client connection is not closed. + * While writing the request body parts, the client should be aborted with a + * PrematureCloseException or an AbortedException, but not with an + * "SslClosedEngineException: SSLEngine closed already" exception. + */ + @Test + void test2498_close_notify_on_connect() throws Exception { + SslContext serverCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()) + .build(); + + CountDownLatch latch = new CountDownLatch(1); + + disposableServer = createServer() + .secure(spec -> spec.sslContext(serverCtx)) + .doOnConnection(cnx -> { + // avoid closing the client connection when receiving the close_notify ack from client + IgnoreCloseNotifyHandler.register(cnx); + // sends a close_notify immediately, without closing the connection + SslHandler sslHandler = cnx.channel().pipeline().get(SslHandler.class); + Objects.requireNonNull(sslHandler, "sslHandler not found from pipeline"); + sslHandler.closeOutbound().addListener(future -> latch.countDown()); + }) + .handle((req, res) -> Mono.never()) + .bindNow(); + + // create the client, which should be aborted since the server responds with a close_notify + Flux postFlux = Flux.range(0, 100) + .map(count -> "content" + count) + .delayElements(Duration.ofMillis(100)); + + SslContext clientCtx = SslContextBuilder.forClient() + .trustManager(InsecureTrustManagerFactory.INSTANCE) + .build(); + + createClient(disposableServer::address) + .secure(spec -> spec.sslContext(clientCtx)) + .post() + .send(BufferFlux.fromString(postFlux)) + .uri("/") + .responseContent() + .aggregate() + .as(StepVerifier::create) + .expectErrorMatches(t -> t instanceof PrematureCloseException || t instanceof AbortedException) + .verify(Duration.ofSeconds(40)); + + // double check if the server has sent its close_notify + assertThat(latch.await(40, TimeUnit.SECONDS)).as("latch await").isTrue(); + } }