From 16b6145064fdb0da8428ea11b7df1be45318f77b Mon Sep 17 00:00:00 2001 From: Reginald McDonald <40721169+reggiemcdonald@users.noreply.github.com> Date: Wed, 6 May 2020 12:08:15 -0600 Subject: [PATCH] inprocess,core: add ability to pass status cause to client Closes #5439 --- .../inprocess/InProcessChannelBuilder.java | 28 ++++++++- .../io/grpc/inprocess/InProcessTransport.java | 34 +++++++---- .../java/io/grpc/internal/ServerImpl.java | 17 +++--- .../inprocess/InProcessTransportTest.java | 59 ++++++++++++++++++- .../grpc/internal/AbstractTransportTest.java | 31 +++++----- .../java/io/grpc/internal/ServerImplTest.java | 5 +- 6 files changed, 132 insertions(+), 42 deletions(-) diff --git a/core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java b/core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java index 792257758b0..ba666115117 100644 --- a/core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java +++ b/core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java @@ -70,6 +70,7 @@ public static InProcessChannelBuilder forAddress(String name, int port) { private final String name; private ScheduledExecutorService scheduledExecutorService; private int maxInboundMetadataSize = Integer.MAX_VALUE; + private boolean transportIncludeStatusCause = false; private InProcessChannelBuilder(String name) { super(new InProcessSocketAddress(name), "localhost"); @@ -157,11 +158,30 @@ public InProcessChannelBuilder maxInboundMetadataSize(int bytes) { return this; } + /** + * Sets whether to include the cause with the status that is propagated + * forward from the InProcessTransport. This was added to make debugging failing + * tests easier by showing the cause of the status. + * + *

By default, this is set to false. + * A default value of false maintains consistency with other transports which strip causal + * information from the status to avoid leaking information to untrusted clients, and + * to avoid sharing language-specific information with the client. + * For the in-process implementation, this is not a concern. + * + * @param enable whether to include cause in status + * @return this + */ + public InProcessChannelBuilder propagateCauseWithStatus(boolean enable) { + this.transportIncludeStatusCause = enable; + return this; + } + @Override @Internal protected ClientTransportFactory buildTransportFactory() { return new InProcessClientTransportFactory( - name, scheduledExecutorService, maxInboundMetadataSize); + name, scheduledExecutorService, maxInboundMetadataSize, transportIncludeStatusCause); } /** @@ -173,16 +193,18 @@ static final class InProcessClientTransportFactory implements ClientTransportFac private final boolean useSharedTimer; private final int maxInboundMetadataSize; private boolean closed; + private boolean includeCauseWithStatus; private InProcessClientTransportFactory( String name, @Nullable ScheduledExecutorService scheduledExecutorService, - int maxInboundMetadataSize) { + int maxInboundMetadataSize, boolean includeCauseWithStatus) { this.name = name; useSharedTimer = scheduledExecutorService == null; timerService = useSharedTimer ? SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE) : scheduledExecutorService; this.maxInboundMetadataSize = maxInboundMetadataSize; + this.includeCauseWithStatus = includeCauseWithStatus; } @Override @@ -194,7 +216,7 @@ public ConnectionClientTransport newClientTransport( // TODO(carl-mastrangelo): Pass channelLogger in. return new InProcessTransport( name, maxInboundMetadataSize, options.getAuthority(), options.getUserAgent(), - options.getEagAttributes()); + options.getEagAttributes(), includeCauseWithStatus); } @Override diff --git a/core/src/main/java/io/grpc/inprocess/InProcessTransport.java b/core/src/main/java/io/grpc/inprocess/InProcessTransport.java index 448d6913066..3461eeebc0f 100644 --- a/core/src/main/java/io/grpc/inprocess/InProcessTransport.java +++ b/core/src/main/java/io/grpc/inprocess/InProcessTransport.java @@ -83,6 +83,7 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans private final String userAgent; private final Optional optionalServerListener; private int serverMaxInboundMetadataSize; + private final boolean includeCauseWithStatus; private ObjectPool serverSchedulerPool; private ScheduledExecutorService serverScheduler; private ServerTransportListener serverTransportListener; @@ -115,7 +116,8 @@ protected void handleNotInUse() { }; private InProcessTransport(String name, int maxInboundMetadataSize, String authority, - String userAgent, Attributes eagAttrs, Optional optionalServerListener) { + String userAgent, Attributes eagAttrs, + Optional optionalServerListener, boolean includeCauseWithStatus) { this.name = name; this.clientMaxInboundMetadataSize = maxInboundMetadataSize; this.authority = authority; @@ -129,13 +131,14 @@ private InProcessTransport(String name, int maxInboundMetadataSize, String autho .build(); this.optionalServerListener = optionalServerListener; logId = InternalLogId.allocate(getClass(), name); + this.includeCauseWithStatus = includeCauseWithStatus; } public InProcessTransport( String name, int maxInboundMetadataSize, String authority, String userAgent, - Attributes eagAttrs) { + Attributes eagAttrs, boolean includeCauseWithStatus) { this(name, maxInboundMetadataSize, authority, userAgent, eagAttrs, - Optional.absent()); + Optional.absent(), includeCauseWithStatus); } InProcessTransport( @@ -143,7 +146,8 @@ public InProcessTransport( Attributes eagAttrs, ObjectPool serverSchedulerPool, List serverStreamTracerFactories, ServerListener serverListener) { - this(name, maxInboundMetadataSize, authority, userAgent, eagAttrs, Optional.of(serverListener)); + this(name, maxInboundMetadataSize, authority, userAgent, eagAttrs, + Optional.of(serverListener), false); this.serverMaxInboundMetadataSize = maxInboundMetadataSize; this.serverSchedulerPool = serverSchedulerPool; this.serverStreamTracerFactories = serverStreamTracerFactories; @@ -564,7 +568,7 @@ public void close(Status status, Metadata trailers) { /** clientStream.serverClosed() must be called before this method */ private void notifyClientClose(Status status, Metadata trailers) { - Status clientStatus = stripCause(status); + Status clientStatus = cleanStatus(status, includeCauseWithStatus); synchronized (this) { if (closed) { return; @@ -744,7 +748,7 @@ public synchronized boolean isReady() { // Must be thread-safe for shutdownNow() @Override public void cancel(Status reason) { - Status serverStatus = stripCause(reason); + Status serverStatus = cleanStatus(reason, includeCauseWithStatus); if (!internalCancel(serverStatus, serverStatus)) { return; } @@ -843,19 +847,25 @@ public void appendTimeoutInsight(InsightBuilder insight) { } /** - * Returns a new status with the same code and description, but stripped of any other information - * (i.e. cause). + * Returns a new status with the same code and description. + * If includeCauseWithStatus is true, cause is also included. * - *

This is, so that the InProcess transport behaves in the same way as the other transports, - * when exchanging statuses between client and server and vice versa. + *

For InProcess transport to behave in the same way as the other transports, + * when exchanging statuses between client and server and vice versa, + * the cause should be excluded from the status. + * For easier debugging, the status may be optionally included. */ - private static Status stripCause(Status status) { + private static Status cleanStatus(Status status, boolean includeCauseWithStatus) { if (status == null) { return null; } - return Status + Status clientStatus = Status .fromCodeValue(status.getCode().value()) .withDescription(status.getDescription()); + if (includeCauseWithStatus) { + clientStatus = clientStatus.withCause(status.getCause()); + } + return clientStatus; } private static class SingleMessageProducer implements StreamListener.MessageProducer { diff --git a/core/src/main/java/io/grpc/internal/ServerImpl.java b/core/src/main/java/io/grpc/internal/ServerImpl.java index 5db33a226b2..05c16e45bfc 100644 --- a/core/src/main/java/io/grpc/internal/ServerImpl.java +++ b/core/src/main/java/io/grpc/internal/ServerImpl.java @@ -161,7 +161,6 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume this.channelz = builder.channelz; this.serverCallTracer = builder.callTracerFactory.create(); this.ticker = checkNotNull(builder.ticker, "ticker"); - channelz.addServer(this); } @@ -762,9 +761,9 @@ void setListener(ServerStreamListener listener) { /** * Like {@link ServerCall#close(Status, Metadata)}, but thread-safe for internal use. */ - private void internalClose() { + private void internalClose(Throwable t) { // TODO(ejona86): this is not thread-safe :) - stream.close(Status.UNKNOWN, new Metadata()); + stream.close(Status.UNKNOWN.withCause(t), new Metadata()); } @Override @@ -785,10 +784,10 @@ public void runInContext() { try { getListener().messagesAvailable(producer); } catch (RuntimeException e) { - internalClose(); + internalClose(e); throw e; } catch (Error e) { - internalClose(); + internalClose(e); throw e; } finally { PerfMark.stopTask("ServerCallListener(app).messagesAvailable", tag); @@ -820,10 +819,10 @@ public void runInContext() { try { getListener().halfClosed(); } catch (RuntimeException e) { - internalClose(); + internalClose(e); throw e; } catch (Error e) { - internalClose(); + internalClose(e); throw e; } finally { PerfMark.stopTask("ServerCallListener(app).halfClosed", tag); @@ -894,10 +893,10 @@ public void runInContext() { try { getListener().onReady(); } catch (RuntimeException e) { - internalClose(); + internalClose(e); throw e; } catch (Error e) { - internalClose(); + internalClose(e); throw e; } finally { PerfMark.stopTask("ServerCallListener(app).onReady", tag); diff --git a/core/src/test/java/io/grpc/inprocess/InProcessTransportTest.java b/core/src/test/java/io/grpc/inprocess/InProcessTransportTest.java index 3bd46cd384d..f7e325ad5a9 100644 --- a/core/src/test/java/io/grpc/inprocess/InProcessTransportTest.java +++ b/core/src/test/java/io/grpc/inprocess/InProcessTransportTest.java @@ -16,14 +16,30 @@ package io.grpc.inprocess; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + import com.google.common.collect.ImmutableList; +import io.grpc.CallOptions; +import io.grpc.ManagedChannel; +import io.grpc.Metadata; +import io.grpc.Server; +import io.grpc.ServerCall; +import io.grpc.ServerCallHandler; +import io.grpc.ServerServiceDefinition; import io.grpc.ServerStreamTracer; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; import io.grpc.internal.AbstractTransportTest; import io.grpc.internal.GrpcUtil; import io.grpc.internal.InternalServer; import io.grpc.internal.ManagedClientTransport; +import io.grpc.stub.ClientCalls; +import io.grpc.testing.GrpcCleanupRule; +import io.grpc.testing.TestMethodDescriptors; import java.util.List; import org.junit.Ignore; +import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -35,6 +51,9 @@ public class InProcessTransportTest extends AbstractTransportTest { private static final String AUTHORITY = "a-testing-authority"; private static final String USER_AGENT = "a-testing-user-agent"; + @Rule + public final GrpcCleanupRule grpcCleanupRule = new GrpcCleanupRule(); + @Override protected List newServer( List streamTracerFactories) { @@ -59,7 +78,7 @@ protected String testAuthority(InternalServer server) { protected ManagedClientTransport newClientTransport(InternalServer server) { return new InProcessTransport( TRANSPORT_NAME, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, testAuthority(server), USER_AGENT, - eagAttrs()); + eagAttrs(), false); } @Override @@ -75,4 +94,42 @@ protected boolean sizesReported() { public void socketStats() throws Exception { // test does not apply to in-process } + + @Test + public void causeShouldBePropagatedWithStatus() throws Exception { + server = null; + String failingServerName = "server_foo"; + String serviceFoo = "service_foo"; + final Status s = Status.INTERNAL.withCause(new Throwable("failing server exception")); + ServerServiceDefinition definition = ServerServiceDefinition.builder(serviceFoo) + .addMethod(TestMethodDescriptors.voidMethod(), new ServerCallHandler() { + @Override + public ServerCall.Listener startCall( + ServerCall call, Metadata headers) { + call.close(s, new Metadata()); + return new ServerCall.Listener() {}; + } + }) + .build(); + Server failingServer = InProcessServerBuilder + .forName(failingServerName) + .addService(definition) + .directExecutor() + .build() + .start(); + grpcCleanupRule.register(failingServer); + ManagedChannel channel = InProcessChannelBuilder + .forName(failingServerName) + .propagateCauseWithStatus(true) + .build(); + grpcCleanupRule.register(channel); + try { + ClientCalls.blockingUnaryCall(channel, TestMethodDescriptors.voidMethod(), + CallOptions.DEFAULT, null); + fail("exception should have been thrown"); + } catch (StatusRuntimeException e) { + // When propagateCauseWithStatus is true, the cause should be sent forward + assertEquals(s.getCause(), e.getCause()); + } + } } diff --git a/core/src/test/java/io/grpc/internal/AbstractTransportTest.java b/core/src/test/java/io/grpc/internal/AbstractTransportTest.java index adbefbedef4..43f6273cdeb 100644 --- a/core/src/test/java/io/grpc/internal/AbstractTransportTest.java +++ b/core/src/test/java/io/grpc/internal/AbstractTransportTest.java @@ -157,7 +157,7 @@ public void log(ChannelLogLevel level, String messageFormat, Object... args) {} * {@code serverListener}, otherwise tearDown() can't wait for shutdown which can put following * tests in an indeterminate state. */ - private InternalServer server; + protected InternalServer server; private ServerTransport serverTransport; private ManagedClientTransport client; private MethodDescriptor methodDescriptor = @@ -1058,9 +1058,7 @@ public void earlyServerClose_withServerHeaders() throws Exception { Metadata clientStreamTrailers = clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS); assertNotNull(clientStreamTrailers); - assertEquals(status.getCode(), clientStreamStatus.getCode()); - assertEquals("Hello. Goodbye.", clientStreamStatus.getDescription()); - assertNull(clientStreamStatus.getCause()); + checkClientStatus(status, clientStreamStatus); assertTrue(clientStreamTracer1.getOutboundHeaders()); assertTrue(clientStreamTracer1.getInboundHeaders()); assertSame(clientStreamTrailers, clientStreamTracer1.getInboundTrailers()); @@ -1097,10 +1095,7 @@ public void earlyServerClose_noServerHeaders() throws Exception { Status clientStreamStatus = clientStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS); Metadata clientStreamTrailers = clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS); - assertEquals(status.getCode(), clientStreamStatus.getCode()); - assertEquals("Hellogoodbye", clientStreamStatus.getDescription()); - // Cause should not be transmitted to the client. - assertNull(clientStreamStatus.getCause()); + checkClientStatus(status, clientStreamStatus); assertEquals( Lists.newArrayList(trailers.getAll(asciiKey)), Lists.newArrayList(clientStreamTrailers.getAll(asciiKey))); @@ -1138,9 +1133,7 @@ public void earlyServerClose_serverFailure() throws Exception { Metadata clientStreamTrailers = clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS); assertNotNull(clientStreamTrailers); - assertEquals(status.getCode(), clientStreamStatus.getCode()); - assertEquals(status.getDescription(), clientStreamStatus.getDescription()); - assertNull(clientStreamStatus.getCause()); + checkClientStatus(status, clientStreamStatus); assertTrue(clientStreamTracer1.getOutboundHeaders()); assertSame(clientStreamTrailers, clientStreamTracer1.getInboundTrailers()); assertSame(clientStreamStatus, clientStreamTracer1.getStatus()); @@ -1188,9 +1181,7 @@ public void closed( Metadata clientStreamTrailers = clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS); assertNotNull(clientStreamTrailers); - assertEquals(status.getCode(), clientStreamStatus.getCode()); - assertEquals(status.getDescription(), clientStreamStatus.getDescription()); - assertNull(clientStreamStatus.getCause()); + checkClientStatus(status, clientStreamStatus); assertTrue(clientStreamTracer1.getOutboundHeaders()); assertSame(clientStreamTrailers, clientStreamTracer1.getInboundTrailers()); assertSame(clientStreamStatus, clientStreamTracer1.getStatus()); @@ -1219,7 +1210,7 @@ public void clientCancel() throws Exception { assertNotNull(clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS)); Status serverStatus = serverStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS); assertNotEquals(Status.Code.OK, serverStatus.getCode()); - // Cause should not be transmitted between client and server + // Cause should not be transmitted between client and server by default assertNull(serverStatus.getCause()); clientStream.cancel(status); @@ -2072,6 +2063,16 @@ private static void assertStatusEquals(Status expected, Status actual) { } } + /** + * Verifies that the client status is as expected. By default, the code and description should + * be present, and the cause should be stripped away. + */ + private void checkClientStatus(Status expectedStatus, Status clientStreamStatus) { + assertEquals(expectedStatus.getCode(), clientStreamStatus.getCode()); + assertEquals(expectedStatus.getDescription(), clientStreamStatus.getDescription()); + assertNull(clientStreamStatus.getCause()); + } + private static boolean waitForFuture(Future future, long timeout, TimeUnit unit) throws InterruptedException { try { diff --git a/core/src/test/java/io/grpc/internal/ServerImplTest.java b/core/src/test/java/io/grpc/internal/ServerImplTest.java index d76e714523b..b32833f3439 100644 --- a/core/src/test/java/io/grpc/internal/ServerImplTest.java +++ b/core/src/test/java/io/grpc/internal/ServerImplTest.java @@ -1441,8 +1441,9 @@ private void verifyExecutorsReturned() { private void ensureServerStateNotLeaked() { verify(stream).close(statusCaptor.capture(), metadataCaptor.capture()); - assertEquals(Status.UNKNOWN, statusCaptor.getValue()); - assertNull(statusCaptor.getValue().getCause()); + assertEquals(Status.UNKNOWN.getCode(), statusCaptor.getValue().getCode()); + // Used in InProcessTransport when set to include the cause with the status + assertNotNull(statusCaptor.getValue().getCause()); assertTrue(metadataCaptor.getValue().keys().isEmpty()); }