Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core: Assure that context cancellationCause set #9501

Merged
merged 2 commits into from Aug 31, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 12 additions & 0 deletions api/src/main/java/io/grpc/InternalStatus.java
Expand Up @@ -16,6 +16,8 @@

package io.grpc;

import javax.annotation.Nullable;

/**
* Accesses internal data. Do not use this.
*/
Expand All @@ -34,4 +36,14 @@ private InternalStatus() {}
*/
@Internal
public static final Metadata.Key<Status> CODE_KEY = Status.CODE_KEY;

/**
* Create a new {@link StatusRuntimeException} with the internal option of skipping the filling
* of the stack trace.
*/
@Internal
public static final StatusRuntimeException asRuntimeException(Status status,
@Nullable Metadata trailers, boolean fillInStackTrace) {
return new StatusRuntimeException(status, trailers, fillInStackTrace);
}
}
10 changes: 7 additions & 3 deletions core/src/main/java/io/grpc/internal/ServerCallImpl.java
Expand Up @@ -35,6 +35,7 @@
import io.grpc.Context;
import io.grpc.DecompressorRegistry;
import io.grpc.InternalDecompressorRegistry;
import io.grpc.InternalStatus;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.SecurityLevel;
Expand Down Expand Up @@ -368,19 +369,22 @@ public void closed(Status status) {
}

private void closedInternal(Status status) {
Throwable cancelCause = null;
try {
if (status.isOk()) {
listener.onComplete();
} else {
call.cancelled = true;
listener.onCancel();
// The status will not have a cause in all failure scenarios but we want to make sure
// we always cancel the context with one to keep the context cancelled state consistent.
cancelCause = InternalStatus.asRuntimeException(
Status.CANCELLED.withDescription("RPC cancelled"), null, false);
}
} finally {
// Cancel context after delivering RPC closure notification to allow the application to
// clean up and update any state based on whether onComplete or onCancel was called.
// Note that in failure situations JumpToApplicationThreadServerStreamListener has already
// closed the context. In these situations this cancel() call will be a no-op.
context.cancel(null);
context.cancel(cancelCause);
}
}

Expand Down
12 changes: 11 additions & 1 deletion core/src/main/java/io/grpc/internal/ServerImpl.java
Expand Up @@ -45,6 +45,7 @@
import io.grpc.InternalInstrumented;
import io.grpc.InternalLogId;
import io.grpc.InternalServerInterceptors;
import io.grpc.InternalStatus;
import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.ServerCallExecutorSupplier;
Expand Down Expand Up @@ -894,9 +895,18 @@ private void closedInternal(final Status status) {
// For cancellations, promptly inform any users of the context that their work should be
// aborted. Otherwise, we can wait until pending work is done.
if (!status.isOk()) {
// Since status was not OK we know that the call did not complete and got cancelled. To
// reflect this on the context we need to close it with a cause exception. Since not every
// failed status has an exception we will create one here if needed.
Throwable cancelCause = status.getCause();
if (cancelCause == null) {
cancelCause = InternalStatus.asRuntimeException(
Status.CANCELLED.withDescription("RPC cancelled"), null, false);
}

// The callExecutor might be busy doing user work. To avoid waiting, use an executor that
// is not serializing.
cancelExecutor.execute(new ContextCloser(context, status.getCause()));
cancelExecutor.execute(new ContextCloser(context, cancelCause));
}
final Link link = PerfMark.linkOut();

Expand Down
3 changes: 2 additions & 1 deletion core/src/test/java/io/grpc/internal/ServerCallImplTest.java
Expand Up @@ -20,6 +20,7 @@
import static io.grpc.internal.GrpcUtil.CONTENT_LENGTH_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
Expand Down Expand Up @@ -424,7 +425,7 @@ public void streamListener_closedCancelled() {

verify(callListener).onCancel();
assertTrue(context.isCancelled());
assertNull(context.cancellationCause());
assertNotNull(context.cancellationCause());
}

@Test
Expand Down
9 changes: 6 additions & 3 deletions core/src/test/java/io/grpc/internal/ServerImplTest.java
Expand Up @@ -130,7 +130,7 @@ public class ServerImplTest {
private static final Context.Key<String> SERVER_TRACER_ADDED_KEY = Context.key("tracer-added");
private static final Context.CancellableContext SERVER_CONTEXT =
Context.ROOT.withValue(SERVER_ONLY, "yes").withCancellation();
private static final FakeClock.TaskFilter CONTEXT_CLOSER_TASK_FITLER =
private static final FakeClock.TaskFilter CONTEXT_CLOSER_TASK_FILTER =
new FakeClock.TaskFilter() {
@Override
public boolean shouldAccept(Runnable runnable) {
Expand Down Expand Up @@ -1085,7 +1085,7 @@ private void checkContext() {
assertTrue(onHalfCloseCalled.get());

streamListener.closed(Status.CANCELLED);
assertEquals(1, executor.numPendingTasks(CONTEXT_CLOSER_TASK_FITLER));
assertEquals(1, executor.numPendingTasks(CONTEXT_CLOSER_TASK_FILTER));
assertEquals(2, executor.runDueTasks());
assertTrue(onCancelCalled.get());

Expand Down Expand Up @@ -1179,10 +1179,11 @@ public void testStreamClose_clientCancelTriggersImmediateCancellation() throws E
assertFalse(callReference.get().isCancelled());
assertFalse(context.get().isCancelled());
streamListener.closed(Status.CANCELLED);
assertEquals(1, executor.numPendingTasks(CONTEXT_CLOSER_TASK_FITLER));
assertEquals(1, executor.numPendingTasks(CONTEXT_CLOSER_TASK_FILTER));
assertEquals(2, executor.runDueTasks());
assertTrue(callReference.get().isCancelled());
assertTrue(context.get().isCancelled());
assertThat(context.get().cancellationCause()).isNotNull();
assertTrue(contextCancelled.get());
}

Expand All @@ -1208,6 +1209,7 @@ public void testStreamClose_clientOkTriggersDelayedCancellation() throws Excepti
assertEquals(1, executor.runDueTasks());
assertFalse(callReference.get().isCancelled());
assertTrue(context.get().isCancelled());
assertThat(context.get().cancellationCause()).isNull();
assertTrue(contextCancelled.get());
}

Expand All @@ -1228,6 +1230,7 @@ public void testStreamClose_deadlineExceededTriggersImmediateCancellation() thro

assertTrue(callReference.get().isCancelled());
assertTrue(context.get().isCancelled());
assertThat(context.get().cancellationCause()).isNotNull();
assertTrue(contextCancelled.get());
}

Expand Down