Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

5439: InProcessTransport optionally adds causal information to status #6968

Merged
merged 6 commits into from May 6, 2020
20 changes: 17 additions & 3 deletions core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java
Expand Up @@ -70,6 +70,7 @@ public static InProcessChannelBuilder forAddress(String name, int port) {
private final String name;
private ScheduledExecutorService scheduledExecutorService;
private int maxInboundMetadataSize = Integer.MAX_VALUE;
private boolean transportIncludeStatusCause = false;
ejona86 marked this conversation as resolved.
Show resolved Hide resolved

private InProcessChannelBuilder(String name) {
super(new InProcessSocketAddress(name), "localhost");
Expand Down Expand Up @@ -157,11 +158,22 @@ public InProcessChannelBuilder maxInboundMetadataSize(int bytes) {
return this;
}

/**
* Sets whether to override the default behaviour of InProcessTransport to include
* the cause of the status.
ejona86 marked this conversation as resolved.
Show resolved Hide resolved
* @param enable whether to include cause in status
ejona86 marked this conversation as resolved.
Show resolved Hide resolved
* @return this
*/
public InProcessChannelBuilder transportIncludeStatusCause(boolean enable) {
ejona86 marked this conversation as resolved.
Show resolved Hide resolved
this.transportIncludeStatusCause = enable;
return this;
}

@Override
@Internal
protected ClientTransportFactory buildTransportFactory() {
return new InProcessClientTransportFactory(
name, scheduledExecutorService, maxInboundMetadataSize);
name, scheduledExecutorService, maxInboundMetadataSize, transportIncludeStatusCause);
}

/**
Expand All @@ -173,16 +185,18 @@ static final class InProcessClientTransportFactory implements ClientTransportFac
private final boolean useSharedTimer;
private final int maxInboundMetadataSize;
private boolean closed;
private boolean includeCauseWithStatus;

private InProcessClientTransportFactory(
String name,
@Nullable ScheduledExecutorService scheduledExecutorService,
int maxInboundMetadataSize) {
int maxInboundMetadataSize, boolean includeCauseWithStatus) {
this.name = name;
useSharedTimer = scheduledExecutorService == null;
timerService = useSharedTimer
? SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE) : scheduledExecutorService;
this.maxInboundMetadataSize = maxInboundMetadataSize;
this.includeCauseWithStatus = includeCauseWithStatus;
}

@Override
Expand All @@ -194,7 +208,7 @@ public ConnectionClientTransport newClientTransport(
// TODO(carl-mastrangelo): Pass channelLogger in.
return new InProcessTransport(
name, maxInboundMetadataSize, options.getAuthority(), options.getUserAgent(),
options.getEagAttributes());
options.getEagAttributes(), includeCauseWithStatus);
}

@Override
Expand Down
37 changes: 30 additions & 7 deletions core/src/main/java/io/grpc/inprocess/InProcessTransport.java
Expand Up @@ -83,6 +83,7 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans
private final String userAgent;
private final Optional<ServerListener> optionalServerListener;
private int serverMaxInboundMetadataSize;
private boolean includeCauseWithStatus;
ejona86 marked this conversation as resolved.
Show resolved Hide resolved
private ObjectPool<ScheduledExecutorService> serverSchedulerPool;
private ScheduledExecutorService serverScheduler;
private ServerTransportListener serverTransportListener;
Expand Down Expand Up @@ -115,7 +116,8 @@ protected void handleNotInUse() {
};

private InProcessTransport(String name, int maxInboundMetadataSize, String authority,
String userAgent, Attributes eagAttrs, Optional<ServerListener> optionalServerListener) {
String userAgent, Attributes eagAttrs,
Optional<ServerListener> optionalServerListener, boolean includeCauseWithStatus) {
this.name = name;
this.clientMaxInboundMetadataSize = maxInboundMetadataSize;
this.authority = authority;
Expand All @@ -129,26 +131,39 @@ private InProcessTransport(String name, int maxInboundMetadataSize, String autho
.build();
this.optionalServerListener = optionalServerListener;
logId = InternalLogId.allocate(getClass(), name);
this.includeCauseWithStatus = includeCauseWithStatus;
}

public InProcessTransport(
ejona86 marked this conversation as resolved.
Show resolved Hide resolved
String name, int maxInboundMetadataSize, String authority, String userAgent,
Attributes eagAttrs) {
this(name, maxInboundMetadataSize, authority, userAgent, eagAttrs,
Optional.<ServerListener>absent());
Optional.<ServerListener>absent(), false);
}

public InProcessTransport(
String name, int maxInboundMetadataSize, String authority, String userAgent,
Attributes eagAttrs, boolean includeCauseWithStatus) {
this(name, maxInboundMetadataSize, authority, userAgent, eagAttrs,
Optional.<ServerListener>absent(), includeCauseWithStatus);
}

InProcessTransport(
String name, int maxInboundMetadataSize, String authority, String userAgent,
Attributes eagAttrs, ObjectPool<ScheduledExecutorService> serverSchedulerPool,
List<ServerStreamTracer.Factory> serverStreamTracerFactories,
ServerListener serverListener) {
this(name, maxInboundMetadataSize, authority, userAgent, eagAttrs, Optional.of(serverListener));
this(name, maxInboundMetadataSize, authority, userAgent, eagAttrs,
Optional.of(serverListener), false);
this.serverMaxInboundMetadataSize = maxInboundMetadataSize;
this.serverSchedulerPool = serverSchedulerPool;
this.serverStreamTracerFactories = serverStreamTracerFactories;
}

public void includeStatusWithCause(boolean enable) {
this.includeCauseWithStatus = enable;
}

@CheckReturnValue
@Override
public synchronized Runnable start(ManagedClientTransport.Listener listener) {
Expand Down Expand Up @@ -323,6 +338,10 @@ public ListenableFuture<SocketStats> getStats() {
return ret;
}

public boolean getIncludeCauseWithStatus() {
ejona86 marked this conversation as resolved.
Show resolved Hide resolved
return includeCauseWithStatus;
}

private synchronized void notifyShutdown(Status s) {
if (shutdown) {
return;
Expand Down Expand Up @@ -564,7 +583,7 @@ public void close(Status status, Metadata trailers) {

/** clientStream.serverClosed() must be called before this method */
private void notifyClientClose(Status status, Metadata trailers) {
Status clientStatus = stripCause(status);
Status clientStatus = stripCause(status, includeCauseWithStatus);
synchronized (this) {
if (closed) {
return;
Expand Down Expand Up @@ -744,7 +763,7 @@ public synchronized boolean isReady() {
// Must be thread-safe for shutdownNow()
@Override
public void cancel(Status reason) {
Status serverStatus = stripCause(reason);
Status serverStatus = stripCause(reason, includeCauseWithStatus);
if (!internalCancel(serverStatus, serverStatus)) {
return;
}
Expand Down Expand Up @@ -849,13 +868,17 @@ public void appendTimeoutInsight(InsightBuilder insight) {
* <p>This is, so that the InProcess transport behaves in the same way as the other transports,
* when exchanging statuses between client and server and vice versa.
*/
private static Status stripCause(Status status) {
private static Status stripCause(Status status, boolean includeCauseWithStatus) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The method name, the argument, and the javadoc do not match.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not quite sure what you mean. Where is the discrepancy?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think he's referring to where it says "but stripped of any other information (i.e. cause)"

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is the discrepancy?

The method name says "strip cause"
The javadoc says "strip something, i.e. cause"
The new argument says "do not strip cause if true".

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok gotcha. I'll update the javadoc and method name

if (status == null) {
return null;
}
return Status
Status clientStatus = Status
.fromCodeValue(status.getCode().value())
.withDescription(status.getDescription());
if (includeCauseWithStatus) {
clientStatus = clientStatus.withCause(status.getCause());
}
return clientStatus;
}

private static class SingleMessageProducer implements StreamListener.MessageProducer {
Expand Down
17 changes: 8 additions & 9 deletions core/src/main/java/io/grpc/internal/ServerImpl.java
Expand Up @@ -161,7 +161,6 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
this.channelz = builder.channelz;
this.serverCallTracer = builder.callTracerFactory.create();
this.ticker = checkNotNull(builder.ticker, "ticker");

channelz.addServer(this);
}

Expand Down Expand Up @@ -759,9 +758,9 @@ void setListener(ServerStreamListener listener) {
/**
* Like {@link ServerCall#close(Status, Metadata)}, but thread-safe for internal use.
*/
private void internalClose() {
private void internalClose(Throwable t) {
// TODO(ejona86): this is not thread-safe :)
stream.close(Status.UNKNOWN, new Metadata());
stream.close(Status.UNKNOWN.withCause(t), new Metadata());
}

@Override
Expand All @@ -782,10 +781,10 @@ public void runInContext() {
try {
getListener().messagesAvailable(producer);
} catch (RuntimeException e) {
internalClose();
internalClose(e);
throw e;
} catch (Error e) {
internalClose();
internalClose(e);
throw e;
} finally {
PerfMark.stopTask("ServerCallListener(app).messagesAvailable", tag);
Expand Down Expand Up @@ -817,10 +816,10 @@ public void runInContext() {
try {
getListener().halfClosed();
} catch (RuntimeException e) {
internalClose();
internalClose(e);
throw e;
} catch (Error e) {
internalClose();
internalClose(e);
throw e;
} finally {
PerfMark.stopTask("ServerCallListener(app).halfClosed", tag);
Expand Down Expand Up @@ -891,10 +890,10 @@ public void runInContext() {
try {
getListener().onReady();
} catch (RuntimeException e) {
internalClose();
internalClose(e);
throw e;
} catch (Error e) {
internalClose();
internalClose(e);
throw e;
} finally {
PerfMark.stopTask("ServerCallListener(app).onReady", tag);
Expand Down
@@ -0,0 +1,44 @@
/*
* Copyright 2020 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc.inprocess;

import static org.junit.Assert.assertEquals;

import io.grpc.Status;
import io.grpc.internal.InternalServer;
import io.grpc.internal.ManagedClientTransport;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
public class InProcessTransportTestWithCause extends InProcessTransportTest {
ejona86 marked this conversation as resolved.
Show resolved Hide resolved

@Override
protected ManagedClientTransport newClientTransport(InternalServer server) {
InProcessTransport transport = (InProcessTransport) super.newClientTransport(server);
ejona86 marked this conversation as resolved.
Show resolved Hide resolved
transport.includeStatusWithCause(true);
return transport;
}

@Override
protected void checkClientStatus(Status expectedStatus, Status clientStreamStatus) {
assertEquals(expectedStatus.getCode(), clientStreamStatus.getCode());
assertEquals(expectedStatus.getDescription(), clientStreamStatus.getDescription());
// Transport has been configured to pass the cause
assertEquals(expectedStatus.getCause(), clientStreamStatus.getCause());
}
}
30 changes: 14 additions & 16 deletions core/src/test/java/io/grpc/internal/AbstractTransportTest.java
Expand Up @@ -1058,9 +1058,7 @@ public void earlyServerClose_withServerHeaders() throws Exception {
Metadata clientStreamTrailers =
clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
assertNotNull(clientStreamTrailers);
assertEquals(status.getCode(), clientStreamStatus.getCode());
assertEquals("Hello. Goodbye.", clientStreamStatus.getDescription());
assertNull(clientStreamStatus.getCause());
checkClientStatus(status, clientStreamStatus);
assertTrue(clientStreamTracer1.getOutboundHeaders());
assertTrue(clientStreamTracer1.getInboundHeaders());
assertSame(clientStreamTrailers, clientStreamTracer1.getInboundTrailers());
Expand Down Expand Up @@ -1097,10 +1095,7 @@ public void earlyServerClose_noServerHeaders() throws Exception {
Status clientStreamStatus = clientStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
Metadata clientStreamTrailers =
clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
assertEquals(status.getCode(), clientStreamStatus.getCode());
assertEquals("Hellogoodbye", clientStreamStatus.getDescription());
// Cause should not be transmitted to the client.
assertNull(clientStreamStatus.getCause());
checkClientStatus(status, clientStreamStatus);
assertEquals(
Lists.newArrayList(trailers.getAll(asciiKey)),
Lists.newArrayList(clientStreamTrailers.getAll(asciiKey)));
Expand Down Expand Up @@ -1138,9 +1133,7 @@ public void earlyServerClose_serverFailure() throws Exception {
Metadata clientStreamTrailers =
clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
assertNotNull(clientStreamTrailers);
assertEquals(status.getCode(), clientStreamStatus.getCode());
assertEquals(status.getDescription(), clientStreamStatus.getDescription());
assertNull(clientStreamStatus.getCause());
checkClientStatus(status, clientStreamStatus);
assertTrue(clientStreamTracer1.getOutboundHeaders());
assertSame(clientStreamTrailers, clientStreamTracer1.getInboundTrailers());
assertSame(clientStreamStatus, clientStreamTracer1.getStatus());
Expand Down Expand Up @@ -1188,9 +1181,7 @@ public void closed(
Metadata clientStreamTrailers =
clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
assertNotNull(clientStreamTrailers);
assertEquals(status.getCode(), clientStreamStatus.getCode());
assertEquals(status.getDescription(), clientStreamStatus.getDescription());
assertNull(clientStreamStatus.getCause());
checkClientStatus(status, clientStreamStatus);
assertTrue(clientStreamTracer1.getOutboundHeaders());
assertSame(clientStreamTrailers, clientStreamTracer1.getInboundTrailers());
assertSame(clientStreamStatus, clientStreamTracer1.getStatus());
Expand Down Expand Up @@ -1219,9 +1210,6 @@ public void clientCancel() throws Exception {
assertNotNull(clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
Status serverStatus = serverStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
assertNotEquals(Status.Code.OK, serverStatus.getCode());
// Cause should not be transmitted between client and server
ejona86 marked this conversation as resolved.
Show resolved Hide resolved
assertNull(serverStatus.getCause());

clientStream.cancel(status);
assertTrue(clientStreamTracer1.getOutboundHeaders());
assertNull(clientStreamTracer1.getInboundTrailers());
Expand Down Expand Up @@ -2072,6 +2060,16 @@ private static void assertStatusEquals(Status expected, Status actual) {
}
}

/**
* Verifies that the client status is as expected. By default, the code and description should
* be present, and the cause should be stripped away.
*/
protected void checkClientStatus(Status expectedStatus, Status clientStreamStatus) {
assertEquals(expectedStatus.getCode(), clientStreamStatus.getCode());
assertEquals(expectedStatus.getDescription(), clientStreamStatus.getDescription());
assertNull(clientStreamStatus.getCause());
}

private static boolean waitForFuture(Future<?> future, long timeout, TimeUnit unit)
throws InterruptedException {
try {
Expand Down
5 changes: 3 additions & 2 deletions core/src/test/java/io/grpc/internal/ServerImplTest.java
Expand Up @@ -1440,8 +1440,9 @@ private void verifyExecutorsReturned() {

private void ensureServerStateNotLeaked() {
verify(stream).close(statusCaptor.capture(), metadataCaptor.capture());
assertEquals(Status.UNKNOWN, statusCaptor.getValue());
assertNull(statusCaptor.getValue().getCause());
assertEquals(Status.UNKNOWN.getCode(), statusCaptor.getValue().getCode());
// Used in InProcessTransport when set to include the cause with the status
assertNotNull(statusCaptor.getValue().getCause());
assertTrue(metadataCaptor.getValue().keys().isEmpty());
}

Expand Down