Skip to content

Commit

Permalink
core: Assure that context cancellationCause is set (grpc#9501)
Browse files Browse the repository at this point in the history
core: Assure that context cancellationCause is set

Makes sure that whenever a context is in a cancelled state, we also have
a cancellationCause.
  • Loading branch information
temawi authored and larry-safran committed Oct 6, 2022
1 parent 66d4b9c commit 0dd1a75
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 8 deletions.
12 changes: 12 additions & 0 deletions api/src/main/java/io/grpc/InternalStatus.java
Expand Up @@ -16,6 +16,8 @@

package io.grpc;

import javax.annotation.Nullable;

/**
* Accesses internal data. Do not use this.
*/
Expand All @@ -34,4 +36,14 @@ private InternalStatus() {}
*/
@Internal
public static final Metadata.Key<Status> CODE_KEY = Status.CODE_KEY;

/**
* Create a new {@link StatusRuntimeException} with the internal option of skipping the filling
* of the stack trace.
*/
@Internal
public static final StatusRuntimeException asRuntimeException(Status status,
@Nullable Metadata trailers, boolean fillInStackTrace) {
return new StatusRuntimeException(status, trailers, fillInStackTrace);
}
}
10 changes: 7 additions & 3 deletions core/src/main/java/io/grpc/internal/ServerCallImpl.java
Expand Up @@ -35,6 +35,7 @@
import io.grpc.Context;
import io.grpc.DecompressorRegistry;
import io.grpc.InternalDecompressorRegistry;
import io.grpc.InternalStatus;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.SecurityLevel;
Expand Down Expand Up @@ -368,19 +369,22 @@ public void closed(Status status) {
}

private void closedInternal(Status status) {
Throwable cancelCause = null;
try {
if (status.isOk()) {
listener.onComplete();
} else {
call.cancelled = true;
listener.onCancel();
// The status will not have a cause in all failure scenarios but we want to make sure
// we always cancel the context with one to keep the context cancelled state consistent.
cancelCause = InternalStatus.asRuntimeException(
Status.CANCELLED.withDescription("RPC cancelled"), null, false);
}
} finally {
// Cancel context after delivering RPC closure notification to allow the application to
// clean up and update any state based on whether onComplete or onCancel was called.
// Note that in failure situations JumpToApplicationThreadServerStreamListener has already
// closed the context. In these situations this cancel() call will be a no-op.
context.cancel(null);
context.cancel(cancelCause);
}
}

Expand Down
12 changes: 11 additions & 1 deletion core/src/main/java/io/grpc/internal/ServerImpl.java
Expand Up @@ -45,6 +45,7 @@
import io.grpc.InternalInstrumented;
import io.grpc.InternalLogId;
import io.grpc.InternalServerInterceptors;
import io.grpc.InternalStatus;
import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.ServerCallExecutorSupplier;
Expand Down Expand Up @@ -894,9 +895,18 @@ private void closedInternal(final Status status) {
// For cancellations, promptly inform any users of the context that their work should be
// aborted. Otherwise, we can wait until pending work is done.
if (!status.isOk()) {
// Since status was not OK we know that the call did not complete and got cancelled. To
// reflect this on the context we need to close it with a cause exception. Since not every
// failed status has an exception we will create one here if needed.
Throwable cancelCause = status.getCause();
if (cancelCause == null) {
cancelCause = InternalStatus.asRuntimeException(
Status.CANCELLED.withDescription("RPC cancelled"), null, false);
}

// The callExecutor might be busy doing user work. To avoid waiting, use an executor that
// is not serializing.
cancelExecutor.execute(new ContextCloser(context, status.getCause()));
cancelExecutor.execute(new ContextCloser(context, cancelCause));
}
final Link link = PerfMark.linkOut();

Expand Down
3 changes: 2 additions & 1 deletion core/src/test/java/io/grpc/internal/ServerCallImplTest.java
Expand Up @@ -20,6 +20,7 @@
import static io.grpc.internal.GrpcUtil.CONTENT_LENGTH_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
Expand Down Expand Up @@ -424,7 +425,7 @@ public void streamListener_closedCancelled() {

verify(callListener).onCancel();
assertTrue(context.isCancelled());
assertNull(context.cancellationCause());
assertNotNull(context.cancellationCause());
}

@Test
Expand Down
9 changes: 6 additions & 3 deletions core/src/test/java/io/grpc/internal/ServerImplTest.java
Expand Up @@ -130,7 +130,7 @@ public class ServerImplTest {
private static final Context.Key<String> SERVER_TRACER_ADDED_KEY = Context.key("tracer-added");
private static final Context.CancellableContext SERVER_CONTEXT =
Context.ROOT.withValue(SERVER_ONLY, "yes").withCancellation();
private static final FakeClock.TaskFilter CONTEXT_CLOSER_TASK_FITLER =
private static final FakeClock.TaskFilter CONTEXT_CLOSER_TASK_FILTER =
new FakeClock.TaskFilter() {
@Override
public boolean shouldAccept(Runnable runnable) {
Expand Down Expand Up @@ -1085,7 +1085,7 @@ private void checkContext() {
assertTrue(onHalfCloseCalled.get());

streamListener.closed(Status.CANCELLED);
assertEquals(1, executor.numPendingTasks(CONTEXT_CLOSER_TASK_FITLER));
assertEquals(1, executor.numPendingTasks(CONTEXT_CLOSER_TASK_FILTER));
assertEquals(2, executor.runDueTasks());
assertTrue(onCancelCalled.get());

Expand Down Expand Up @@ -1179,10 +1179,11 @@ public void testStreamClose_clientCancelTriggersImmediateCancellation() throws E
assertFalse(callReference.get().isCancelled());
assertFalse(context.get().isCancelled());
streamListener.closed(Status.CANCELLED);
assertEquals(1, executor.numPendingTasks(CONTEXT_CLOSER_TASK_FITLER));
assertEquals(1, executor.numPendingTasks(CONTEXT_CLOSER_TASK_FILTER));
assertEquals(2, executor.runDueTasks());
assertTrue(callReference.get().isCancelled());
assertTrue(context.get().isCancelled());
assertThat(context.get().cancellationCause()).isNotNull();
assertTrue(contextCancelled.get());
}

Expand All @@ -1208,6 +1209,7 @@ public void testStreamClose_clientOkTriggersDelayedCancellation() throws Excepti
assertEquals(1, executor.runDueTasks());
assertFalse(callReference.get().isCancelled());
assertTrue(context.get().isCancelled());
assertThat(context.get().cancellationCause()).isNull();
assertTrue(contextCancelled.get());
}

Expand All @@ -1228,6 +1230,7 @@ public void testStreamClose_deadlineExceededTriggersImmediateCancellation() thro

assertTrue(callReference.get().isCancelled());
assertTrue(context.get().isCancelled());
assertThat(context.get().cancellationCause()).isNotNull();
assertTrue(contextCancelled.get());
}

Expand Down

0 comments on commit 0dd1a75

Please sign in to comment.