From 492f610e932964a4370577e634ea90a948fd3445 Mon Sep 17 00:00:00 2001 From: hypnoce Date: Wed, 3 May 2023 07:43:13 +0200 Subject: [PATCH] servlet: force always sending end_stream in trailer frame (Fixes #10124) Signed-off-by: hypnoce --- .../ForceTrailersServerInterceptor.java | 124 ++++++++++++++++++ .../io/grpc/servlet/ServletServerBuilder.java | 27 ++++ .../io/grpc/servlet/TomcatInteropTest.java | 28 +--- 3 files changed, 154 insertions(+), 25 deletions(-) create mode 100644 servlet/src/main/java/io/grpc/servlet/ForceTrailersServerInterceptor.java diff --git a/servlet/src/main/java/io/grpc/servlet/ForceTrailersServerInterceptor.java b/servlet/src/main/java/io/grpc/servlet/ForceTrailersServerInterceptor.java new file mode 100644 index 000000000000..7314c575368b --- /dev/null +++ b/servlet/src/main/java/io/grpc/servlet/ForceTrailersServerInterceptor.java @@ -0,0 +1,124 @@ +/* + * Copyright 2023 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.servlet; + +import io.grpc.ForwardingServerCall; +import io.grpc.ForwardingServerCallListener; +import io.grpc.Metadata; +import io.grpc.ServerCall; +import io.grpc.ServerCallHandler; +import io.grpc.ServerInterceptor; +import io.grpc.Status; + +class ForceTrailersServerInterceptor implements ServerInterceptor { + @Override + public ServerCall.Listener interceptCall( + ServerCall call, + Metadata headers, + ServerCallHandler next) { + ForceTrailersServerCall interceptedCall = new ForceTrailersServerCall<>(call); + try { + return new ForwardingServerCallListener.SimpleForwardingServerCallListener(next.startCall(interceptedCall, headers)) { + @Override + public void onMessage(ReqT message) { + try { + super.onMessage(message); + } catch (Throwable t) { + closeWithThrowable(t); + throw t; + } + } + + @Override + public void onHalfClose() { + try { + super.onHalfClose(); + } catch (Throwable t) { + closeWithThrowable(t); + throw t; + } + } + + @Override + public void onCancel() { + try { + super.onCancel(); + } catch (Throwable t) { + closeWithThrowable(t); + throw t; + } + } + + @Override + public void onComplete() { + try { + super.onComplete(); + } catch (Throwable t) { + closeWithThrowable(t); + throw t; + } + } + + @Override + public void onReady() { + try { + super.onReady(); + } catch (Throwable t) { + closeWithThrowable(t); + throw t; + } + } + + private void closeWithThrowable(Throwable t) { + Metadata metadata = Status.trailersFromThrowable(t); + if (metadata == null) { + metadata = new Metadata(); + } + interceptedCall.close(Status.fromThrowable(t), metadata); + } + }; + } catch (RuntimeException e) { + if (!interceptedCall.headersSent) { + call.sendHeaders(new Metadata()); + } + throw e; + } + } + + static class ForceTrailersServerCall extends ForwardingServerCall.SimpleForwardingServerCall { + + boolean headersSent = false; + + ForceTrailersServerCall(ServerCall delegate) { + super(delegate); + } + + @Override + public void sendHeaders(Metadata headers) { + super.sendHeaders(headers); + headersSent = true; + } + + @Override + public void close(Status status, Metadata trailers) { + if (!headersSent) { + sendHeaders(new Metadata()); + } + super.close(status, trailers); + } + } +} diff --git a/servlet/src/main/java/io/grpc/servlet/ServletServerBuilder.java b/servlet/src/main/java/io/grpc/servlet/ServletServerBuilder.java index 3e852ea3c090..4163cdd19510 100644 --- a/servlet/src/main/java/io/grpc/servlet/ServletServerBuilder.java +++ b/servlet/src/main/java/io/grpc/servlet/ServletServerBuilder.java @@ -33,6 +33,7 @@ import io.grpc.Metadata; import io.grpc.Server; import io.grpc.ServerBuilder; +import io.grpc.ServerInterceptor; import io.grpc.ServerStreamTracer; import io.grpc.Status; import io.grpc.internal.GrpcUtil; @@ -46,6 +47,7 @@ import java.io.File; import java.io.IOException; import java.net.SocketAddress; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.ScheduledExecutorService; @@ -72,6 +74,8 @@ public final class ServletServerBuilder extends ForwardingServerBuilder interceptors = new ArrayList<>(); + private boolean forceTrailers; public ServletServerBuilder() { serverImplBuilder = new ServerImplBuilder(this::buildTransportServers); @@ -102,6 +106,10 @@ public ServletAdapter buildServletAdapter() { } private ServerTransportListener buildAndStart() { + interceptors.forEach(serverImplBuilder::intercept); + if (forceTrailers) { + serverImplBuilder.intercept(new ForceTrailersServerInterceptor()); + } Server server; try { internalCaller = true; @@ -176,6 +184,25 @@ public ServletServerBuilder maxInboundMessageSize(int bytes) { return this; } + /** + * Some servlet containers don't support sending trailers only (Tomcat). + * They send an empty data frame with an end_stream flag. + * This is not supported by gRPC as is expects end_stream flag in trailer or trailer-only frame + * To avoid this empty data frame, force the servlet container to either + * - send a header frame, an empty data frame and a trailer frame with end_stream (Tomcat) + * - send a header frame and a trailer frame with end_stream (Jetty, Undertow) + */ + public ServletServerBuilder forceTrailers(boolean forceTrailers) { + this.forceTrailers = forceTrailers; + return this; + } + + @Override + public ServletServerBuilder intercept(ServerInterceptor interceptor) { + interceptors.add(checkNotNull(interceptor, "interceptor")); + return this; + } + /** * Provides a custom scheduled executor service to the server builder. * diff --git a/servlet/src/tomcatTest/java/io/grpc/servlet/TomcatInteropTest.java b/servlet/src/tomcatTest/java/io/grpc/servlet/TomcatInteropTest.java index 1422b5388fd7..786a3a10fd27 100644 --- a/servlet/src/tomcatTest/java/io/grpc/servlet/TomcatInteropTest.java +++ b/servlet/src/tomcatTest/java/io/grpc/servlet/TomcatInteropTest.java @@ -60,7 +60,9 @@ public static void cleanUp() throws Exception { @Override protected ServerBuilder getServerBuilder() { - return new ServletServerBuilder().maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE); + return new ServletServerBuilder() + .forceTrailers(true) + .maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE); } @Override @@ -112,28 +114,4 @@ protected boolean metricsExpected() { @Ignore("Tomcat is broken on client GOAWAY") @Test public void gracefulShutdown() {} - - // FIXME - @Override - @Ignore("Tomcat is not able to send trailer only") - @Test - public void specialStatusMessage() {} - - // FIXME - @Override - @Ignore("Tomcat is not able to send trailer only") - @Test - public void unimplementedMethod() {} - - // FIXME - @Override - @Ignore("Tomcat is not able to send trailer only") - @Test - public void statusCodeAndMessage() {} - - // FIXME - @Override - @Ignore("Tomcat is not able to send trailer only") - @Test - public void emptyStream() {} }