Skip to content

Commit

Permalink
inprocess,core: add ability to pass status cause to client
Browse files Browse the repository at this point in the history
Closes #5439
  • Loading branch information
reggiemcdonald committed May 6, 2020
1 parent a9250c1 commit 16b6145
Show file tree
Hide file tree
Showing 6 changed files with 132 additions and 42 deletions.
28 changes: 25 additions & 3 deletions core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java
Expand Up @@ -70,6 +70,7 @@ public static InProcessChannelBuilder forAddress(String name, int port) {
private final String name;
private ScheduledExecutorService scheduledExecutorService;
private int maxInboundMetadataSize = Integer.MAX_VALUE;
private boolean transportIncludeStatusCause = false;

private InProcessChannelBuilder(String name) {
super(new InProcessSocketAddress(name), "localhost");
Expand Down Expand Up @@ -157,11 +158,30 @@ public InProcessChannelBuilder maxInboundMetadataSize(int bytes) {
return this;
}

/**
* Sets whether to include the cause with the status that is propagated
* forward from the InProcessTransport. This was added to make debugging failing
* tests easier by showing the cause of the status.
*
* <p>By default, this is set to false.
* A default value of false maintains consistency with other transports which strip causal
* information from the status to avoid leaking information to untrusted clients, and
* to avoid sharing language-specific information with the client.
* For the in-process implementation, this is not a concern.
*
* @param enable whether to include cause in status
* @return this
*/
public InProcessChannelBuilder propagateCauseWithStatus(boolean enable) {
this.transportIncludeStatusCause = enable;
return this;
}

@Override
@Internal
protected ClientTransportFactory buildTransportFactory() {
return new InProcessClientTransportFactory(
name, scheduledExecutorService, maxInboundMetadataSize);
name, scheduledExecutorService, maxInboundMetadataSize, transportIncludeStatusCause);
}

/**
Expand All @@ -173,16 +193,18 @@ static final class InProcessClientTransportFactory implements ClientTransportFac
private final boolean useSharedTimer;
private final int maxInboundMetadataSize;
private boolean closed;
private boolean includeCauseWithStatus;

private InProcessClientTransportFactory(
String name,
@Nullable ScheduledExecutorService scheduledExecutorService,
int maxInboundMetadataSize) {
int maxInboundMetadataSize, boolean includeCauseWithStatus) {
this.name = name;
useSharedTimer = scheduledExecutorService == null;
timerService = useSharedTimer
? SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE) : scheduledExecutorService;
this.maxInboundMetadataSize = maxInboundMetadataSize;
this.includeCauseWithStatus = includeCauseWithStatus;
}

@Override
Expand All @@ -194,7 +216,7 @@ public ConnectionClientTransport newClientTransport(
// TODO(carl-mastrangelo): Pass channelLogger in.
return new InProcessTransport(
name, maxInboundMetadataSize, options.getAuthority(), options.getUserAgent(),
options.getEagAttributes());
options.getEagAttributes(), includeCauseWithStatus);
}

@Override
Expand Down
34 changes: 22 additions & 12 deletions core/src/main/java/io/grpc/inprocess/InProcessTransport.java
Expand Up @@ -83,6 +83,7 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans
private final String userAgent;
private final Optional<ServerListener> optionalServerListener;
private int serverMaxInboundMetadataSize;
private final boolean includeCauseWithStatus;
private ObjectPool<ScheduledExecutorService> serverSchedulerPool;
private ScheduledExecutorService serverScheduler;
private ServerTransportListener serverTransportListener;
Expand Down Expand Up @@ -115,7 +116,8 @@ protected void handleNotInUse() {
};

private InProcessTransport(String name, int maxInboundMetadataSize, String authority,
String userAgent, Attributes eagAttrs, Optional<ServerListener> optionalServerListener) {
String userAgent, Attributes eagAttrs,
Optional<ServerListener> optionalServerListener, boolean includeCauseWithStatus) {
this.name = name;
this.clientMaxInboundMetadataSize = maxInboundMetadataSize;
this.authority = authority;
Expand All @@ -129,21 +131,23 @@ private InProcessTransport(String name, int maxInboundMetadataSize, String autho
.build();
this.optionalServerListener = optionalServerListener;
logId = InternalLogId.allocate(getClass(), name);
this.includeCauseWithStatus = includeCauseWithStatus;
}

public InProcessTransport(
String name, int maxInboundMetadataSize, String authority, String userAgent,
Attributes eagAttrs) {
Attributes eagAttrs, boolean includeCauseWithStatus) {
this(name, maxInboundMetadataSize, authority, userAgent, eagAttrs,
Optional.<ServerListener>absent());
Optional.<ServerListener>absent(), includeCauseWithStatus);
}

InProcessTransport(
String name, int maxInboundMetadataSize, String authority, String userAgent,
Attributes eagAttrs, ObjectPool<ScheduledExecutorService> serverSchedulerPool,
List<ServerStreamTracer.Factory> serverStreamTracerFactories,
ServerListener serverListener) {
this(name, maxInboundMetadataSize, authority, userAgent, eagAttrs, Optional.of(serverListener));
this(name, maxInboundMetadataSize, authority, userAgent, eagAttrs,
Optional.of(serverListener), false);
this.serverMaxInboundMetadataSize = maxInboundMetadataSize;
this.serverSchedulerPool = serverSchedulerPool;
this.serverStreamTracerFactories = serverStreamTracerFactories;
Expand Down Expand Up @@ -564,7 +568,7 @@ public void close(Status status, Metadata trailers) {

/** clientStream.serverClosed() must be called before this method */
private void notifyClientClose(Status status, Metadata trailers) {
Status clientStatus = stripCause(status);
Status clientStatus = cleanStatus(status, includeCauseWithStatus);
synchronized (this) {
if (closed) {
return;
Expand Down Expand Up @@ -744,7 +748,7 @@ public synchronized boolean isReady() {
// Must be thread-safe for shutdownNow()
@Override
public void cancel(Status reason) {
Status serverStatus = stripCause(reason);
Status serverStatus = cleanStatus(reason, includeCauseWithStatus);
if (!internalCancel(serverStatus, serverStatus)) {
return;
}
Expand Down Expand Up @@ -843,19 +847,25 @@ public void appendTimeoutInsight(InsightBuilder insight) {
}

/**
* Returns a new status with the same code and description, but stripped of any other information
* (i.e. cause).
* Returns a new status with the same code and description.
* If includeCauseWithStatus is true, cause is also included.
*
* <p>This is, so that the InProcess transport behaves in the same way as the other transports,
* when exchanging statuses between client and server and vice versa.
* <p>For InProcess transport to behave in the same way as the other transports,
* when exchanging statuses between client and server and vice versa,
* the cause should be excluded from the status.
* For easier debugging, the status may be optionally included.
*/
private static Status stripCause(Status status) {
private static Status cleanStatus(Status status, boolean includeCauseWithStatus) {
if (status == null) {
return null;
}
return Status
Status clientStatus = Status
.fromCodeValue(status.getCode().value())
.withDescription(status.getDescription());
if (includeCauseWithStatus) {
clientStatus = clientStatus.withCause(status.getCause());
}
return clientStatus;
}

private static class SingleMessageProducer implements StreamListener.MessageProducer {
Expand Down
17 changes: 8 additions & 9 deletions core/src/main/java/io/grpc/internal/ServerImpl.java
Expand Up @@ -161,7 +161,6 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
this.channelz = builder.channelz;
this.serverCallTracer = builder.callTracerFactory.create();
this.ticker = checkNotNull(builder.ticker, "ticker");

channelz.addServer(this);
}

Expand Down Expand Up @@ -762,9 +761,9 @@ void setListener(ServerStreamListener listener) {
/**
* Like {@link ServerCall#close(Status, Metadata)}, but thread-safe for internal use.
*/
private void internalClose() {
private void internalClose(Throwable t) {
// TODO(ejona86): this is not thread-safe :)
stream.close(Status.UNKNOWN, new Metadata());
stream.close(Status.UNKNOWN.withCause(t), new Metadata());
}

@Override
Expand All @@ -785,10 +784,10 @@ public void runInContext() {
try {
getListener().messagesAvailable(producer);
} catch (RuntimeException e) {
internalClose();
internalClose(e);
throw e;
} catch (Error e) {
internalClose();
internalClose(e);
throw e;
} finally {
PerfMark.stopTask("ServerCallListener(app).messagesAvailable", tag);
Expand Down Expand Up @@ -820,10 +819,10 @@ public void runInContext() {
try {
getListener().halfClosed();
} catch (RuntimeException e) {
internalClose();
internalClose(e);
throw e;
} catch (Error e) {
internalClose();
internalClose(e);
throw e;
} finally {
PerfMark.stopTask("ServerCallListener(app).halfClosed", tag);
Expand Down Expand Up @@ -894,10 +893,10 @@ public void runInContext() {
try {
getListener().onReady();
} catch (RuntimeException e) {
internalClose();
internalClose(e);
throw e;
} catch (Error e) {
internalClose();
internalClose(e);
throw e;
} finally {
PerfMark.stopTask("ServerCallListener(app).onReady", tag);
Expand Down
59 changes: 58 additions & 1 deletion core/src/test/java/io/grpc/inprocess/InProcessTransportTest.java
Expand Up @@ -16,14 +16,30 @@

package io.grpc.inprocess;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

import com.google.common.collect.ImmutableList;
import io.grpc.CallOptions;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.Server;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerServiceDefinition;
import io.grpc.ServerStreamTracer;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.internal.AbstractTransportTest;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.InternalServer;
import io.grpc.internal.ManagedClientTransport;
import io.grpc.stub.ClientCalls;
import io.grpc.testing.GrpcCleanupRule;
import io.grpc.testing.TestMethodDescriptors;
import java.util.List;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
Expand All @@ -35,6 +51,9 @@ public class InProcessTransportTest extends AbstractTransportTest {
private static final String AUTHORITY = "a-testing-authority";
private static final String USER_AGENT = "a-testing-user-agent";

@Rule
public final GrpcCleanupRule grpcCleanupRule = new GrpcCleanupRule();

@Override
protected List<? extends InternalServer> newServer(
List<ServerStreamTracer.Factory> streamTracerFactories) {
Expand All @@ -59,7 +78,7 @@ protected String testAuthority(InternalServer server) {
protected ManagedClientTransport newClientTransport(InternalServer server) {
return new InProcessTransport(
TRANSPORT_NAME, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, testAuthority(server), USER_AGENT,
eagAttrs());
eagAttrs(), false);
}

@Override
Expand All @@ -75,4 +94,42 @@ protected boolean sizesReported() {
public void socketStats() throws Exception {
// test does not apply to in-process
}

@Test
public void causeShouldBePropagatedWithStatus() throws Exception {
server = null;
String failingServerName = "server_foo";
String serviceFoo = "service_foo";
final Status s = Status.INTERNAL.withCause(new Throwable("failing server exception"));
ServerServiceDefinition definition = ServerServiceDefinition.builder(serviceFoo)
.addMethod(TestMethodDescriptors.voidMethod(), new ServerCallHandler<Void, Void>() {
@Override
public ServerCall.Listener<Void> startCall(
ServerCall<Void, Void> call, Metadata headers) {
call.close(s, new Metadata());
return new ServerCall.Listener<Void>() {};
}
})
.build();
Server failingServer = InProcessServerBuilder
.forName(failingServerName)
.addService(definition)
.directExecutor()
.build()
.start();
grpcCleanupRule.register(failingServer);
ManagedChannel channel = InProcessChannelBuilder
.forName(failingServerName)
.propagateCauseWithStatus(true)
.build();
grpcCleanupRule.register(channel);
try {
ClientCalls.blockingUnaryCall(channel, TestMethodDescriptors.voidMethod(),
CallOptions.DEFAULT, null);
fail("exception should have been thrown");
} catch (StatusRuntimeException e) {
// When propagateCauseWithStatus is true, the cause should be sent forward
assertEquals(s.getCause(), e.getCause());
}
}
}
31 changes: 16 additions & 15 deletions core/src/test/java/io/grpc/internal/AbstractTransportTest.java
Expand Up @@ -157,7 +157,7 @@ public void log(ChannelLogLevel level, String messageFormat, Object... args) {}
* {@code serverListener}, otherwise tearDown() can't wait for shutdown which can put following
* tests in an indeterminate state.
*/
private InternalServer server;
protected InternalServer server;
private ServerTransport serverTransport;
private ManagedClientTransport client;
private MethodDescriptor<String, String> methodDescriptor =
Expand Down Expand Up @@ -1058,9 +1058,7 @@ public void earlyServerClose_withServerHeaders() throws Exception {
Metadata clientStreamTrailers =
clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
assertNotNull(clientStreamTrailers);
assertEquals(status.getCode(), clientStreamStatus.getCode());
assertEquals("Hello. Goodbye.", clientStreamStatus.getDescription());
assertNull(clientStreamStatus.getCause());
checkClientStatus(status, clientStreamStatus);
assertTrue(clientStreamTracer1.getOutboundHeaders());
assertTrue(clientStreamTracer1.getInboundHeaders());
assertSame(clientStreamTrailers, clientStreamTracer1.getInboundTrailers());
Expand Down Expand Up @@ -1097,10 +1095,7 @@ public void earlyServerClose_noServerHeaders() throws Exception {
Status clientStreamStatus = clientStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
Metadata clientStreamTrailers =
clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
assertEquals(status.getCode(), clientStreamStatus.getCode());
assertEquals("Hellogoodbye", clientStreamStatus.getDescription());
// Cause should not be transmitted to the client.
assertNull(clientStreamStatus.getCause());
checkClientStatus(status, clientStreamStatus);
assertEquals(
Lists.newArrayList(trailers.getAll(asciiKey)),
Lists.newArrayList(clientStreamTrailers.getAll(asciiKey)));
Expand Down Expand Up @@ -1138,9 +1133,7 @@ public void earlyServerClose_serverFailure() throws Exception {
Metadata clientStreamTrailers =
clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
assertNotNull(clientStreamTrailers);
assertEquals(status.getCode(), clientStreamStatus.getCode());
assertEquals(status.getDescription(), clientStreamStatus.getDescription());
assertNull(clientStreamStatus.getCause());
checkClientStatus(status, clientStreamStatus);
assertTrue(clientStreamTracer1.getOutboundHeaders());
assertSame(clientStreamTrailers, clientStreamTracer1.getInboundTrailers());
assertSame(clientStreamStatus, clientStreamTracer1.getStatus());
Expand Down Expand Up @@ -1188,9 +1181,7 @@ public void closed(
Metadata clientStreamTrailers =
clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
assertNotNull(clientStreamTrailers);
assertEquals(status.getCode(), clientStreamStatus.getCode());
assertEquals(status.getDescription(), clientStreamStatus.getDescription());
assertNull(clientStreamStatus.getCause());
checkClientStatus(status, clientStreamStatus);
assertTrue(clientStreamTracer1.getOutboundHeaders());
assertSame(clientStreamTrailers, clientStreamTracer1.getInboundTrailers());
assertSame(clientStreamStatus, clientStreamTracer1.getStatus());
Expand Down Expand Up @@ -1219,7 +1210,7 @@ public void clientCancel() throws Exception {
assertNotNull(clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
Status serverStatus = serverStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
assertNotEquals(Status.Code.OK, serverStatus.getCode());
// Cause should not be transmitted between client and server
// Cause should not be transmitted between client and server by default
assertNull(serverStatus.getCause());

clientStream.cancel(status);
Expand Down Expand Up @@ -2072,6 +2063,16 @@ private static void assertStatusEquals(Status expected, Status actual) {
}
}

/**
* Verifies that the client status is as expected. By default, the code and description should
* be present, and the cause should be stripped away.
*/
private void checkClientStatus(Status expectedStatus, Status clientStreamStatus) {
assertEquals(expectedStatus.getCode(), clientStreamStatus.getCode());
assertEquals(expectedStatus.getDescription(), clientStreamStatus.getDescription());
assertNull(clientStreamStatus.getCause());
}

private static boolean waitForFuture(Future<?> future, long timeout, TimeUnit unit)
throws InterruptedException {
try {
Expand Down

0 comments on commit 16b6145

Please sign in to comment.