Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "core: delay sending cancel request on client-side when deadline expires (#6328)" #7457

Merged
merged 1 commit into from Sep 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
168 changes: 69 additions & 99 deletions core/src/main/java/io/grpc/internal/ClientCallImpl.java
Expand Up @@ -71,37 +71,29 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
private static final Logger log = Logger.getLogger(ClientCallImpl.class.getName());
private static final byte[] FULL_STREAM_DECOMPRESSION_ENCODINGS
= "gzip".getBytes(Charset.forName("US-ASCII"));
// When a deadline is exceeded, there is a race between the server receiving the cancellation from
// the client and the server cancelling the stream itself. If the client's cancellation is
// received first, then the stream's status will be CANCELLED instead of DEADLINE_EXCEEDED.
// This prevents server monitoring from noticing high rate of DEADLINE_EXCEEDED, a common
// monitoring metric (b/118879795). Mitigate this by delayed sending of the client's cancellation.
@VisibleForTesting
static final long DEADLINE_EXPIRATION_CANCEL_DELAY_NANOS = TimeUnit.SECONDS.toNanos(1);

private final MethodDescriptor<ReqT, RespT> method;
private final Tag tag;
private final Executor callExecutor;
private final boolean callExecutorIsDirect;
private final CallTracer channelCallsTracer;
private final Context context;
private volatile ScheduledFuture<?> deadlineCancellationFuture;
private final boolean unaryRequest;
private CallOptions callOptions;
private ClientStream stream;
private volatile boolean cancelListenersShouldBeRemoved;
private boolean cancelCalled;
private boolean halfCloseCalled;
private final ClientStreamProvider clientStreamProvider;
private ContextCancellationListener cancellationListener;
private final ContextCancellationListener cancellationListener =
new ContextCancellationListener();
private final ScheduledExecutorService deadlineCancellationExecutor;
@Nullable
private final InternalConfigSelector configSelector;
private boolean fullStreamDecompression;
private DecompressorRegistry decompressorRegistry = DecompressorRegistry.getDefaultInstance();
private CompressorRegistry compressorRegistry = CompressorRegistry.getDefaultInstance();
private volatile ScheduledFuture<?> deadlineCancellationNotifyApplicationFuture;
private volatile ScheduledFuture<?> deadlineCancellationSendToServerFuture;
private boolean observerClosed = false;

ClientCallImpl(
MethodDescriptor<ReqT, RespT> method, Executor executor, CallOptions callOptions,
Expand Down Expand Up @@ -135,20 +127,9 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
}

private final class ContextCancellationListener implements CancellationListener {
private Listener<RespT> observer;

private ContextCancellationListener(Listener<RespT> observer) {
this.observer = observer;
}

@Override
public void cancelled(Context context) {
if (context.getDeadline() == null || !context.getDeadline().isExpired()) {
stream.cancel(statusFromCancelled(context));
} else {
Status status = statusFromCancelled(context);
delayedCancelOnDeadlineExceeded(status, observer);
}
stream.cancel(statusFromCancelled(context));
}
}

Expand Down Expand Up @@ -223,7 +204,19 @@ private void startInternal(Listener<RespT> observer, Metadata headers) {
// Context is already cancelled so no need to create a real stream, just notify the observer
// of cancellation via callback on the executor
stream = NoopClientStream.INSTANCE;
executeCloseObserverInContext(observer, statusFromCancelled(context));
final Listener<RespT> finalObserver = observer;
class ClosedByContext extends ContextRunnable {
ClosedByContext() {
super(context);
}

@Override
public void runInContext() {
closeObserver(finalObserver, statusFromCancelled(context), new Metadata());
}
}

callExecutor.execute(new ClosedByContext());
return;
}

Expand Down Expand Up @@ -251,9 +244,23 @@ private void startInternal(Listener<RespT> observer, Metadata headers) {
compressor = compressorRegistry.lookupCompressor(compressorName);
if (compressor == null) {
stream = NoopClientStream.INSTANCE;
Status status = Status.INTERNAL.withDescription(
String.format("Unable to find compressor by name %s", compressorName));
executeCloseObserverInContext(observer, status);
final Listener<RespT> finalObserver = observer;
class ClosedByNotFoundCompressor extends ContextRunnable {
ClosedByNotFoundCompressor() {
super(context);
}

@Override
public void runInContext() {
closeObserver(
finalObserver,
Status.INTERNAL.withDescription(
String.format("Unable to find compressor by name %s", compressorName)),
new Metadata());
}
}

callExecutor.execute(new ClosedByNotFoundCompressor());
return;
}
} else {
Expand Down Expand Up @@ -294,7 +301,6 @@ private void startInternal(Listener<RespT> observer, Metadata headers) {
}
stream.setDecompressorRegistry(decompressorRegistry);
channelCallsTracer.reportCallStarted();
cancellationListener = new ContextCancellationListener(observer);
stream.start(new ClientStreamListenerImpl(observer));

// Delay any sources of cancellation after start(), because most of the transports are broken if
Expand All @@ -306,11 +312,8 @@ private void startInternal(Listener<RespT> observer, Metadata headers) {
// If the context has the effective deadline, we don't need to schedule an extra task.
&& !effectiveDeadline.equals(context.getDeadline())
// If the channel has been terminated, we don't need to schedule an extra task.
&& deadlineCancellationExecutor != null
// if already expired deadline let failing stream handle
&& !(stream instanceof FailingClientStream)) {
deadlineCancellationNotifyApplicationFuture =
startDeadlineNotifyApplicationTimer(effectiveDeadline, observer);
&& deadlineCancellationExecutor != null) {
deadlineCancellationFuture = startDeadlineTimer(effectiveDeadline);
}
if (cancelListenersShouldBeRemoved) {
// Race detected! ClientStreamListener.closed may have been called before
Expand Down Expand Up @@ -410,76 +413,46 @@ private static void logIfContextNarrowedTimeout(

private void removeContextListenerAndCancelDeadlineFuture() {
context.removeListener(cancellationListener);
ScheduledFuture<?> f = deadlineCancellationSendToServerFuture;
if (f != null) {
f.cancel(false);
}

f = deadlineCancellationNotifyApplicationFuture;
ScheduledFuture<?> f = deadlineCancellationFuture;
if (f != null) {
f.cancel(false);
}
}

private ScheduledFuture<?> startDeadlineNotifyApplicationTimer(Deadline deadline,
final Listener<RespT> observer) {
final long remainingNanos = deadline.timeRemaining(TimeUnit.NANOSECONDS);

class DeadlineExceededNotifyApplicationTimer implements Runnable {
@Override
public void run() {
Status status = buildDeadlineExceededStatusWithRemainingNanos(remainingNanos);
delayedCancelOnDeadlineExceeded(status, observer);
}
}

return deadlineCancellationExecutor.schedule(
new LogExceptionRunnable(new DeadlineExceededNotifyApplicationTimer()),
remainingNanos,
TimeUnit.NANOSECONDS);
}

private Status buildDeadlineExceededStatusWithRemainingNanos(long remainingNanos) {
final InsightBuilder insight = new InsightBuilder();
stream.appendTimeoutInsight(insight);
private class DeadlineTimer implements Runnable {
private final long remainingNanos;

long seconds = Math.abs(remainingNanos) / TimeUnit.SECONDS.toNanos(1);
long nanos = Math.abs(remainingNanos) % TimeUnit.SECONDS.toNanos(1);

StringBuilder buf = new StringBuilder();
buf.append("deadline exceeded after ");
if (remainingNanos < 0) {
buf.append('-');
}
buf.append(seconds);
buf.append(String.format(".%09d", nanos));
buf.append("s. ");
buf.append(insight);

return DEADLINE_EXCEEDED.augmentDescription(buf.toString());
}

private void delayedCancelOnDeadlineExceeded(final Status status, Listener<RespT> observer) {
if (deadlineCancellationSendToServerFuture != null) {
return;
DeadlineTimer(long remainingNanos) {
this.remainingNanos = remainingNanos;
}

class DeadlineExceededSendCancelToServerTimer implements Runnable {
@Override
public void run() {
// DelayedStream.cancel() is safe to call from a thread that is different from where the
// stream is created.
stream.cancel(status);
@Override
public void run() {
InsightBuilder insight = new InsightBuilder();
stream.appendTimeoutInsight(insight);
// DelayedStream.cancel() is safe to call from a thread that is different from where the
// stream is created.
long seconds = Math.abs(remainingNanos) / TimeUnit.SECONDS.toNanos(1);
long nanos = Math.abs(remainingNanos) % TimeUnit.SECONDS.toNanos(1);

StringBuilder buf = new StringBuilder();
buf.append("deadline exceeded after ");
if (remainingNanos < 0) {
buf.append('-');
}
buf.append(seconds);
buf.append(String.format(".%09d", nanos));
buf.append("s. ");
buf.append(insight);
stream.cancel(DEADLINE_EXCEEDED.augmentDescription(buf.toString()));
}
}

// This races with removeContextListenerAndCancelDeadlineFuture(). Since calling cancel() on a
// stream multiple time is safe, the race here is fine.
deadlineCancellationSendToServerFuture = deadlineCancellationExecutor.schedule(
new LogExceptionRunnable(new DeadlineExceededSendCancelToServerTimer()),
DEADLINE_EXPIRATION_CANCEL_DELAY_NANOS,
TimeUnit.NANOSECONDS);
executeCloseObserverInContext(observer, status);
private ScheduledFuture<?> startDeadlineTimer(Deadline deadline) {
long remainingNanos = deadline.timeRemaining(TimeUnit.NANOSECONDS);
return deadlineCancellationExecutor.schedule(
new LogExceptionRunnable(
new DeadlineTimer(remainingNanos)), remainingNanos, TimeUnit.NANOSECONDS);
}

private void executeCloseObserverInContext(final Listener<RespT> observer, final Status status) {
Expand All @@ -497,13 +470,6 @@ public void runInContext() {
callExecutor.execute(new CloseInContext());
}

private void closeObserver(Listener<RespT> observer, Status status, Metadata trailers) {
if (!observerClosed) {
observerClosed = true;
observer.onClose(status, trailers);
}
}

@Nullable
private Deadline effectiveDeadline() {
// Call options and context are immutable, so we don't need to cache the deadline.
Expand Down Expand Up @@ -646,6 +612,10 @@ public Attributes getAttributes() {
return Attributes.EMPTY;
}

private void closeObserver(Listener<RespT> observer, Status status, Metadata trailers) {
observer.onClose(status, trailers);
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this).add("method", method).toString();
Expand Down
36 changes: 6 additions & 30 deletions core/src/test/java/io/grpc/internal/ClientCallImplTest.java
Expand Up @@ -17,7 +17,6 @@
package io.grpc.internal;

import static com.google.common.truth.Truth.assertThat;
import static io.grpc.internal.ClientCallImpl.DEADLINE_EXPIRATION_CANCEL_DELAY_NANOS;
import static io.grpc.internal.GrpcUtil.ACCEPT_ENCODING_SPLITTER;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;
Expand Down Expand Up @@ -60,7 +59,6 @@
import io.grpc.MethodDescriptor;
import io.grpc.MethodDescriptor.MethodType;
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.internal.ClientCallImpl.ClientStreamProvider;
import io.grpc.internal.ManagedChannelServiceConfig.MethodInfo;
import io.grpc.internal.testing.SingleMessageProducer;
Expand Down Expand Up @@ -137,9 +135,6 @@ public class ClientCallImplTest {
@Captor
private ArgumentCaptor<Status> statusArgumentCaptor;

@Captor
private ArgumentCaptor<Metadata> metadataArgumentCaptor;

private CallOptions baseCallOptions;

@Before
Expand Down Expand Up @@ -1005,21 +1000,9 @@ public void expiredDeadlineCancelsStream_CallOptions() {

call.start(callListener, new Metadata());

fakeClock.forwardTime(1000, TimeUnit.MILLISECONDS);

// Verify cancel sent to application when deadline just past
verify(callListener).onClose(statusCaptor.capture(), metadataArgumentCaptor.capture());
assertThat(statusCaptor.getValue().getDescription())
.matches("deadline exceeded after [0-9]+\\.[0-9]+s. \\[remote_addr=127\\.0\\.0\\.1:443\\]");
assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.DEADLINE_EXCEEDED);
verify(stream, never()).cancel(statusCaptor.capture());

fakeClock.forwardNanos(DEADLINE_EXPIRATION_CANCEL_DELAY_NANOS - 1);
verify(stream, never()).cancel(any(Status.class));
fakeClock.forwardNanos(TimeUnit.SECONDS.toNanos(1) + 1);

// verify cancel send to server is delayed with DEADLINE_EXPIRATION_CANCEL_DELAY
fakeClock.forwardNanos(1);
verify(stream).cancel(statusCaptor.capture());
verify(stream, times(1)).cancel(statusCaptor.capture());
assertEquals(Status.Code.DEADLINE_EXCEEDED, statusCaptor.getValue().getCode());
assertThat(statusCaptor.getValue().getDescription())
.matches("deadline exceeded after [0-9]+\\.[0-9]+s. \\[remote_addr=127\\.0\\.0\\.1:443\\]");
Expand All @@ -1029,8 +1012,8 @@ public void expiredDeadlineCancelsStream_CallOptions() {
public void expiredDeadlineCancelsStream_Context() {
fakeClock.forwardTime(System.nanoTime(), TimeUnit.NANOSECONDS);

Deadline deadline = Deadline.after(1, TimeUnit.SECONDS, fakeClock.getDeadlineTicker());
Context context = Context.current().withDeadline(deadline, deadlineCancellationExecutor);
Context context = Context.current()
.withDeadlineAfter(1, TimeUnit.SECONDS, deadlineCancellationExecutor);
Context origContext = context.attach();

ClientCallImpl<Void, Void> call = new ClientCallImpl<>(
Expand All @@ -1045,16 +1028,9 @@ public void expiredDeadlineCancelsStream_Context() {

call.start(callListener, new Metadata());

fakeClock.forwardTime(1000, TimeUnit.MILLISECONDS);
verify(stream, never()).cancel(statusCaptor.capture());
// verify app is notified.
verify(callListener).onClose(statusCaptor.capture(), metadataArgumentCaptor.capture());
assertThat(statusCaptor.getValue().getDescription()).contains("context timed out");
assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.DEADLINE_EXCEEDED);
fakeClock.forwardNanos(TimeUnit.SECONDS.toNanos(1) + 1);

// verify cancel send to server is delayed with DEADLINE_EXPIRATION_CANCEL_DELAY
fakeClock.forwardNanos(DEADLINE_EXPIRATION_CANCEL_DELAY_NANOS);
verify(stream).cancel(statusCaptor.capture());
verify(stream, times(1)).cancel(statusCaptor.capture());
assertEquals(Status.Code.DEADLINE_EXCEEDED, statusCaptor.getValue().getCode());
assertThat(statusCaptor.getValue().getDescription()).isEqualTo("context timed out");
}
Expand Down
Expand Up @@ -3622,7 +3622,7 @@ public ClientTransportFactory buildClientTransportFactory() {
CallOptions.DEFAULT.withDeadlineAfter(5, TimeUnit.SECONDS));
ListenableFuture<Void> future2 = ClientCalls.futureUnaryCall(call2, null);

timer.forwardTime(5, TimeUnit.SECONDS);
timer.forwardTime(1234, TimeUnit.SECONDS);

executor.runDueTasks();
try {
Expand All @@ -3633,9 +3633,6 @@ public ClientTransportFactory buildClientTransportFactory() {
}

mychannel.shutdownNow();
// Now for Deadline_exceeded, stream shutdown is delayed, calling shutdownNow() on a open stream
// will add a task to executor. Cleaning that task here.
executor.runDueTasks();
}

@Deprecated
Expand Down