Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

5439: InProcessTransport optionally adds causal information to status #6968

Merged
merged 6 commits into from May 6, 2020
28 changes: 25 additions & 3 deletions core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java
Expand Up @@ -70,6 +70,7 @@ public static InProcessChannelBuilder forAddress(String name, int port) {
private final String name;
private ScheduledExecutorService scheduledExecutorService;
private int maxInboundMetadataSize = Integer.MAX_VALUE;
private boolean transportIncludeStatusCause = false;
ejona86 marked this conversation as resolved.
Show resolved Hide resolved

private InProcessChannelBuilder(String name) {
super(new InProcessSocketAddress(name), "localhost");
Expand Down Expand Up @@ -157,11 +158,30 @@ public InProcessChannelBuilder maxInboundMetadataSize(int bytes) {
return this;
}

/**
* Sets whether to include the cause with the status that is propagated
* forward from the InProcessTransport. This was added to make debugging failing
* tests easier by showing the cause of the status.
*
* <p>By default, this is set to false.
* A default value of false maintains consistency with other transports which strip causal
* information from the status to avoid leaking information to untrusted clients, and
* to avoid sharing language-specific information with the client.
* For the in-process implementation, this is not a concern.
*
* @param enable whether to include cause in status
ejona86 marked this conversation as resolved.
Show resolved Hide resolved
* @return this
*/
public InProcessChannelBuilder propagateCauseWithStatus(boolean enable) {
this.transportIncludeStatusCause = enable;
return this;
}

@Override
@Internal
protected ClientTransportFactory buildTransportFactory() {
return new InProcessClientTransportFactory(
name, scheduledExecutorService, maxInboundMetadataSize);
name, scheduledExecutorService, maxInboundMetadataSize, transportIncludeStatusCause);
}

/**
Expand All @@ -173,16 +193,18 @@ static final class InProcessClientTransportFactory implements ClientTransportFac
private final boolean useSharedTimer;
private final int maxInboundMetadataSize;
private boolean closed;
private boolean includeCauseWithStatus;

private InProcessClientTransportFactory(
String name,
@Nullable ScheduledExecutorService scheduledExecutorService,
int maxInboundMetadataSize) {
int maxInboundMetadataSize, boolean includeCauseWithStatus) {
this.name = name;
useSharedTimer = scheduledExecutorService == null;
timerService = useSharedTimer
? SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE) : scheduledExecutorService;
this.maxInboundMetadataSize = maxInboundMetadataSize;
this.includeCauseWithStatus = includeCauseWithStatus;
}

@Override
Expand All @@ -194,7 +216,7 @@ public ConnectionClientTransport newClientTransport(
// TODO(carl-mastrangelo): Pass channelLogger in.
return new InProcessTransport(
name, maxInboundMetadataSize, options.getAuthority(), options.getUserAgent(),
options.getEagAttributes());
options.getEagAttributes(), includeCauseWithStatus);
}

@Override
Expand Down
24 changes: 16 additions & 8 deletions core/src/main/java/io/grpc/inprocess/InProcessTransport.java
Expand Up @@ -83,6 +83,7 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans
private final String userAgent;
private final Optional<ServerListener> optionalServerListener;
private int serverMaxInboundMetadataSize;
private final boolean includeCauseWithStatus;
private ObjectPool<ScheduledExecutorService> serverSchedulerPool;
private ScheduledExecutorService serverScheduler;
private ServerTransportListener serverTransportListener;
Expand Down Expand Up @@ -115,7 +116,8 @@ protected void handleNotInUse() {
};

private InProcessTransport(String name, int maxInboundMetadataSize, String authority,
String userAgent, Attributes eagAttrs, Optional<ServerListener> optionalServerListener) {
String userAgent, Attributes eagAttrs,
Optional<ServerListener> optionalServerListener, boolean includeCauseWithStatus) {
this.name = name;
this.clientMaxInboundMetadataSize = maxInboundMetadataSize;
this.authority = authority;
Expand All @@ -129,21 +131,23 @@ private InProcessTransport(String name, int maxInboundMetadataSize, String autho
.build();
this.optionalServerListener = optionalServerListener;
logId = InternalLogId.allocate(getClass(), name);
this.includeCauseWithStatus = includeCauseWithStatus;
}

public InProcessTransport(
ejona86 marked this conversation as resolved.
Show resolved Hide resolved
String name, int maxInboundMetadataSize, String authority, String userAgent,
Attributes eagAttrs) {
Attributes eagAttrs, boolean includeCauseWithStatus) {
this(name, maxInboundMetadataSize, authority, userAgent, eagAttrs,
Optional.<ServerListener>absent());
Optional.<ServerListener>absent(), includeCauseWithStatus);
}

InProcessTransport(
String name, int maxInboundMetadataSize, String authority, String userAgent,
Attributes eagAttrs, ObjectPool<ScheduledExecutorService> serverSchedulerPool,
List<ServerStreamTracer.Factory> serverStreamTracerFactories,
ServerListener serverListener) {
this(name, maxInboundMetadataSize, authority, userAgent, eagAttrs, Optional.of(serverListener));
this(name, maxInboundMetadataSize, authority, userAgent, eagAttrs,
Optional.of(serverListener), false);
this.serverMaxInboundMetadataSize = maxInboundMetadataSize;
this.serverSchedulerPool = serverSchedulerPool;
this.serverStreamTracerFactories = serverStreamTracerFactories;
Expand Down Expand Up @@ -564,7 +568,7 @@ public void close(Status status, Metadata trailers) {

/** clientStream.serverClosed() must be called before this method */
private void notifyClientClose(Status status, Metadata trailers) {
Status clientStatus = stripCause(status);
Status clientStatus = stripCause(status, includeCauseWithStatus);
synchronized (this) {
if (closed) {
return;
Expand Down Expand Up @@ -744,7 +748,7 @@ public synchronized boolean isReady() {
// Must be thread-safe for shutdownNow()
@Override
public void cancel(Status reason) {
Status serverStatus = stripCause(reason);
Status serverStatus = stripCause(reason, includeCauseWithStatus);
if (!internalCancel(serverStatus, serverStatus)) {
return;
}
Expand Down Expand Up @@ -849,13 +853,17 @@ public void appendTimeoutInsight(InsightBuilder insight) {
* <p>This is, so that the InProcess transport behaves in the same way as the other transports,
* when exchanging statuses between client and server and vice versa.
*/
private static Status stripCause(Status status) {
private static Status stripCause(Status status, boolean includeCauseWithStatus) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The method name, the argument, and the javadoc do not match.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not quite sure what you mean. Where is the discrepancy?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think he's referring to where it says "but stripped of any other information (i.e. cause)"

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is the discrepancy?

The method name says "strip cause"
The javadoc says "strip something, i.e. cause"
The new argument says "do not strip cause if true".

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok gotcha. I'll update the javadoc and method name

if (status == null) {
return null;
}
return Status
Status clientStatus = Status
.fromCodeValue(status.getCode().value())
.withDescription(status.getDescription());
if (includeCauseWithStatus) {
clientStatus = clientStatus.withCause(status.getCause());
}
return clientStatus;
}

private static class SingleMessageProducer implements StreamListener.MessageProducer {
Expand Down
17 changes: 8 additions & 9 deletions core/src/main/java/io/grpc/internal/ServerImpl.java
Expand Up @@ -161,7 +161,6 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
this.channelz = builder.channelz;
this.serverCallTracer = builder.callTracerFactory.create();
this.ticker = checkNotNull(builder.ticker, "ticker");

channelz.addServer(this);
}

Expand Down Expand Up @@ -759,9 +758,9 @@ void setListener(ServerStreamListener listener) {
/**
* Like {@link ServerCall#close(Status, Metadata)}, but thread-safe for internal use.
*/
private void internalClose() {
private void internalClose(Throwable t) {
// TODO(ejona86): this is not thread-safe :)
stream.close(Status.UNKNOWN, new Metadata());
stream.close(Status.UNKNOWN.withCause(t), new Metadata());
}

@Override
Expand All @@ -782,10 +781,10 @@ public void runInContext() {
try {
getListener().messagesAvailable(producer);
} catch (RuntimeException e) {
internalClose();
internalClose(e);
throw e;
} catch (Error e) {
internalClose();
internalClose(e);
throw e;
} finally {
PerfMark.stopTask("ServerCallListener(app).messagesAvailable", tag);
Expand Down Expand Up @@ -817,10 +816,10 @@ public void runInContext() {
try {
getListener().halfClosed();
} catch (RuntimeException e) {
internalClose();
internalClose(e);
throw e;
} catch (Error e) {
internalClose();
internalClose(e);
throw e;
} finally {
PerfMark.stopTask("ServerCallListener(app).halfClosed", tag);
Expand Down Expand Up @@ -891,10 +890,10 @@ public void runInContext() {
try {
getListener().onReady();
} catch (RuntimeException e) {
internalClose();
internalClose(e);
throw e;
} catch (Error e) {
internalClose();
internalClose(e);
throw e;
} finally {
PerfMark.stopTask("ServerCallListener(app).onReady", tag);
Expand Down
59 changes: 58 additions & 1 deletion core/src/test/java/io/grpc/inprocess/InProcessTransportTest.java
Expand Up @@ -16,14 +16,30 @@

package io.grpc.inprocess;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

import com.google.common.collect.ImmutableList;
import io.grpc.CallOptions;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.Server;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerServiceDefinition;
import io.grpc.ServerStreamTracer;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.internal.AbstractTransportTest;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.InternalServer;
import io.grpc.internal.ManagedClientTransport;
import io.grpc.stub.ClientCalls;
import io.grpc.testing.GrpcCleanupRule;
import io.grpc.testing.TestMethodDescriptors;
import java.util.List;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
Expand All @@ -35,6 +51,9 @@ public class InProcessTransportTest extends AbstractTransportTest {
private static final String AUTHORITY = "a-testing-authority";
private static final String USER_AGENT = "a-testing-user-agent";

@Rule
public final GrpcCleanupRule grpcCleanupRule = new GrpcCleanupRule();

@Override
protected List<? extends InternalServer> newServer(
List<ServerStreamTracer.Factory> streamTracerFactories) {
Expand All @@ -59,7 +78,7 @@ protected String testAuthority(InternalServer server) {
protected ManagedClientTransport newClientTransport(InternalServer server) {
return new InProcessTransport(
TRANSPORT_NAME, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, testAuthority(server), USER_AGENT,
eagAttrs());
eagAttrs(), false);
}

@Override
Expand All @@ -75,4 +94,42 @@ protected boolean sizesReported() {
public void socketStats() throws Exception {
// test does not apply to in-process
}

@Test
public void causeShouldBePropagatedWithStatus() throws Exception {
server = null;
String failingServerName = "server_foo";
String serviceFoo = "service_foo";
final Status s = Status.INTERNAL.withCause(new Throwable("failing server exception"));
ServerServiceDefinition definition = ServerServiceDefinition.builder(serviceFoo)
.addMethod(TestMethodDescriptors.voidMethod(), new ServerCallHandler<Void, Void>() {
@Override
public ServerCall.Listener<Void> startCall(
ServerCall<Void, Void> call, Metadata headers) {
call.close(s, new Metadata());
return new ServerCall.Listener<Void>() {};
}
})
.build();
Server failingServer = InProcessServerBuilder
.forName(failingServerName)
.addService(definition)
.directExecutor()
.build()
.start();
grpcCleanupRule.register(failingServer);
ManagedChannel channel = InProcessChannelBuilder
.forName(failingServerName)
.propagateCauseWithStatus(true)
.build();
grpcCleanupRule.register(channel);
try {
ClientCalls.blockingUnaryCall(channel, TestMethodDescriptors.voidMethod(),
CallOptions.DEFAULT, null);
fail("exception should have been thrown");
} catch (StatusRuntimeException e) {
// When propagateCauseWithStatus is true, the cause should be sent forward
assertEquals(s.getCause(), e.getCause());
}
}
}
32 changes: 15 additions & 17 deletions core/src/test/java/io/grpc/internal/AbstractTransportTest.java
Expand Up @@ -157,7 +157,7 @@ public void log(ChannelLogLevel level, String messageFormat, Object... args) {}
* {@code serverListener}, otherwise tearDown() can't wait for shutdown which can put following
* tests in an indeterminate state.
*/
private InternalServer server;
protected InternalServer server;
private ServerTransport serverTransport;
private ManagedClientTransport client;
private MethodDescriptor<String, String> methodDescriptor =
Expand Down Expand Up @@ -1058,9 +1058,7 @@ public void earlyServerClose_withServerHeaders() throws Exception {
Metadata clientStreamTrailers =
clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
assertNotNull(clientStreamTrailers);
assertEquals(status.getCode(), clientStreamStatus.getCode());
assertEquals("Hello. Goodbye.", clientStreamStatus.getDescription());
assertNull(clientStreamStatus.getCause());
checkClientStatus(status, clientStreamStatus);
assertTrue(clientStreamTracer1.getOutboundHeaders());
assertTrue(clientStreamTracer1.getInboundHeaders());
assertSame(clientStreamTrailers, clientStreamTracer1.getInboundTrailers());
Expand Down Expand Up @@ -1097,10 +1095,7 @@ public void earlyServerClose_noServerHeaders() throws Exception {
Status clientStreamStatus = clientStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
Metadata clientStreamTrailers =
clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
assertEquals(status.getCode(), clientStreamStatus.getCode());
assertEquals("Hellogoodbye", clientStreamStatus.getDescription());
// Cause should not be transmitted to the client.
assertNull(clientStreamStatus.getCause());
checkClientStatus(status, clientStreamStatus);
assertEquals(
Lists.newArrayList(trailers.getAll(asciiKey)),
Lists.newArrayList(clientStreamTrailers.getAll(asciiKey)));
Expand Down Expand Up @@ -1138,9 +1133,7 @@ public void earlyServerClose_serverFailure() throws Exception {
Metadata clientStreamTrailers =
clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
assertNotNull(clientStreamTrailers);
assertEquals(status.getCode(), clientStreamStatus.getCode());
assertEquals(status.getDescription(), clientStreamStatus.getDescription());
assertNull(clientStreamStatus.getCause());
checkClientStatus(status, clientStreamStatus);
assertTrue(clientStreamTracer1.getOutboundHeaders());
assertSame(clientStreamTrailers, clientStreamTracer1.getInboundTrailers());
assertSame(clientStreamStatus, clientStreamTracer1.getStatus());
Expand Down Expand Up @@ -1188,9 +1181,7 @@ public void closed(
Metadata clientStreamTrailers =
clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
assertNotNull(clientStreamTrailers);
assertEquals(status.getCode(), clientStreamStatus.getCode());
assertEquals(status.getDescription(), clientStreamStatus.getDescription());
assertNull(clientStreamStatus.getCause());
checkClientStatus(status, clientStreamStatus);
assertTrue(clientStreamTracer1.getOutboundHeaders());
assertSame(clientStreamTrailers, clientStreamTracer1.getInboundTrailers());
assertSame(clientStreamStatus, clientStreamTracer1.getStatus());
Expand Down Expand Up @@ -1219,9 +1210,6 @@ public void clientCancel() throws Exception {
assertNotNull(clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
Status serverStatus = serverStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
assertNotEquals(Status.Code.OK, serverStatus.getCode());
// Cause should not be transmitted between client and server
ejona86 marked this conversation as resolved.
Show resolved Hide resolved
assertNull(serverStatus.getCause());

clientStream.cancel(status);
assertTrue(clientStreamTracer1.getOutboundHeaders());
assertNull(clientStreamTracer1.getInboundTrailers());
Expand Down Expand Up @@ -2072,6 +2060,16 @@ private static void assertStatusEquals(Status expected, Status actual) {
}
}

/**
* Verifies that the client status is as expected. By default, the code and description should
* be present, and the cause should be stripped away.
*/
private void checkClientStatus(Status expectedStatus, Status clientStreamStatus) {
assertEquals(expectedStatus.getCode(), clientStreamStatus.getCode());
assertEquals(expectedStatus.getDescription(), clientStreamStatus.getDescription());
assertNull(clientStreamStatus.getCause());
}

private static boolean waitForFuture(Future<?> future, long timeout, TimeUnit unit)
throws InterruptedException {
try {
Expand Down
5 changes: 3 additions & 2 deletions core/src/test/java/io/grpc/internal/ServerImplTest.java
Expand Up @@ -1440,8 +1440,9 @@ private void verifyExecutorsReturned() {

private void ensureServerStateNotLeaked() {
verify(stream).close(statusCaptor.capture(), metadataCaptor.capture());
assertEquals(Status.UNKNOWN, statusCaptor.getValue());
assertNull(statusCaptor.getValue().getCause());
assertEquals(Status.UNKNOWN.getCode(), statusCaptor.getValue().getCode());
// Used in InProcessTransport when set to include the cause with the status
assertNotNull(statusCaptor.getValue().getCause());
assertTrue(metadataCaptor.getValue().keys().isEmpty());
}

Expand Down