Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core: fix a bug for hedging with throttling #7337

Merged
merged 4 commits into from Aug 19, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
143 changes: 80 additions & 63 deletions core/src/main/java/io/grpc/internal/RetriableStream.java
Expand Up @@ -783,7 +783,6 @@ public void closed(Status status, RpcProgress rpcProgress, Metadata trailers) {
}

if (state.winningSubstream == null) {
boolean isFatal = false;
if (rpcProgress == RpcProgress.REFUSED
&& noMoreTransparentRetry.compareAndSet(false, true)) {
// transparent retry
Expand Down Expand Up @@ -837,51 +836,54 @@ public void run() {
nextBackoffIntervalNanos = retryPolicy.initialBackoffNanos;
}

RetryPlan retryPlan = makeRetryDecision(status, trailers);
if (retryPlan.shouldRetry) {
// The check state.winningSubstream == null, checking if is not already committed, is
// racy, but is still safe b/c the retry will also handle committed/cancellation
FutureCanceller scheduledRetryCopy;
if (!isHedging) {
ericgribkoff marked this conversation as resolved.
Show resolved Hide resolved
RetryPlan retryPlan = makeRetryDecision(status, trailers);
if (retryPlan.shouldRetry) {
// The check state.winningSubstream == null, checking if is not already committed, is
// racy, but is still safe b/c the retry will also handle committed/cancellation
FutureCanceller scheduledRetryCopy;
synchronized (lock) {
scheduledRetry = scheduledRetryCopy = new FutureCanceller(lock);
}
scheduledRetryCopy.setFuture(
scheduledExecutorService.schedule(
new Runnable() {
@Override
public void run() {
callExecutor.execute(
new Runnable() {
@Override
public void run() {
// retry
Substream newSubstream =
createSubstream(substream.previousAttemptCount + 1);
drain(newSubstream);
}
});
}
},
retryPlan.backoffNanos,
TimeUnit.NANOSECONDS));
return;
}
} else {
HedgingPlan hedgingPlan = makeHedgingDecision(status, trailers);
if (hedgingPlan.isHedgeable) {
pushbackHedging(hedgingPlan.hedgingPushbackMillis);
}
synchronized (lock) {
scheduledRetry = scheduledRetryCopy = new FutureCanceller(lock);
state = state.removeActiveHedge(substream);
// The invariant is whether or not #(Potential Hedge + active hedges) > 0.
// Once hasPotentialHedging(state) is false, it will always be false, and then
// #(state.activeHedges) will be decreasing. This guarantees that even there may be
// multiple concurrent hedges, one of the hedges will end up committed.
if (hedgingPlan.isHedgeable) {
if (hasPotentialHedging(state) || !state.activeHedges.isEmpty()) {
return;
}
// else, no activeHedges, no new hedges possible, try to commit
} // else, fatal, try to commit
ericgribkoff marked this conversation as resolved.
Show resolved Hide resolved
}
scheduledRetryCopy.setFuture(scheduledExecutorService.schedule(
new Runnable() {
@Override
public void run() {
callExecutor.execute(new Runnable() {
@Override
public void run() {
// retry
Substream newSubstream
= createSubstream(substream.previousAttemptCount + 1);
drain(newSubstream);
}
});
}
},
retryPlan.backoffNanos,
TimeUnit.NANOSECONDS));
return;
}
isFatal = retryPlan.isFatal;
pushbackHedging(retryPlan.hedgingPushbackMillis);
}

if (isHedging) {
synchronized (lock) {
state = state.removeActiveHedge(substream);

// The invariant is whether or not #(Potential Hedge + active hedges) > 0.
// Once hasPotentialHedging(state) is false, it will always be false, and then
// #(state.activeHedges) will be decreasing. This guarantees that even there may be
// multiple concurrent hedges, one of the hedges will end up committed.
if (!isFatal) {
if (hasPotentialHedging(state) || !state.activeHedges.isEmpty()) {
return;
}
// else, no activeHedges, no new hedges possible, try to commit
} // else, fatal, try to commit
}
}
}
Expand All @@ -901,12 +903,6 @@ private RetryPlan makeRetryDecision(Status status, Metadata trailer) {
boolean shouldRetry = false;
long backoffNanos = 0L;
boolean isRetryableStatusCode = retryPolicy.retryableStatusCodes.contains(status.getCode());
boolean isNonFatalStatusCode = hedgingPolicy.nonFatalStatusCodes.contains(status.getCode());
if (isHedging && !isNonFatalStatusCode) {
// isFatal is true, no pushback
return new RetryPlan(/* shouldRetry = */ false, /* isFatal = */ true, 0, null);
}

String pushbackStr = trailer.get(GRPC_RETRY_PUSHBACK_MS);
Integer pushbackMillis = null;
if (pushbackStr != null) {
Expand All @@ -916,11 +912,9 @@ private RetryPlan makeRetryDecision(Status status, Metadata trailer) {
pushbackMillis = -1;
}
}

boolean isThrottled = false;
if (throttle != null) {
if (isRetryableStatusCode || isNonFatalStatusCode
|| (pushbackMillis != null && pushbackMillis < 0)) {
if (isRetryableStatusCode || (pushbackMillis != null && pushbackMillis < 0)) {
isThrottled = !throttle.onQualifiedFailureThenCheckIsAboveThreshold();
}
}
Expand All @@ -933,7 +927,6 @@ private RetryPlan makeRetryDecision(Status status, Metadata trailer) {
nextBackoffIntervalNanos = Math.min(
(long) (nextBackoffIntervalNanos * retryPolicy.backoffMultiplier),
retryPolicy.maxBackoffNanos);

} // else no retry
} else if (pushbackMillis >= 0) {
shouldRetry = true;
Expand All @@ -942,8 +935,27 @@ private RetryPlan makeRetryDecision(Status status, Metadata trailer) {
} // else no retry
} // else no retry

return new RetryPlan(
shouldRetry, /* isFatal = */ false, backoffNanos, isHedging ? pushbackMillis : null);
return new RetryPlan(shouldRetry, backoffNanos);
}

private HedgingPlan makeHedgingDecision(Status status, Metadata trailer) {
String pushbackStr = trailer.get(GRPC_RETRY_PUSHBACK_MS);
Integer pushbackMillis = null;
if (pushbackStr != null) {
try {
pushbackMillis = Integer.valueOf(pushbackStr);
} catch (NumberFormatException e) {
pushbackMillis = -1;
}
}
ericgribkoff marked this conversation as resolved.
Show resolved Hide resolved
boolean isFatal = !hedgingPolicy.nonFatalStatusCodes.contains(status.getCode());
boolean isThrottled = false;
if (throttle != null) {
if (!isFatal || (pushbackMillis != null && pushbackMillis < 0)) {
ericgribkoff marked this conversation as resolved.
Show resolved Hide resolved
isThrottled = !throttle.onQualifiedFailureThenCheckIsAboveThreshold();
}
}
return new HedgingPlan(!isFatal && !isThrottled, pushbackMillis);
}

@Override
Expand Down Expand Up @@ -1361,17 +1373,22 @@ public int hashCode() {

private static final class RetryPlan {
final boolean shouldRetry;
final boolean isFatal; // receiving a status not among the nonFatalStatusCodes
final long backoffNanos;
@Nullable
final Integer hedgingPushbackMillis;

RetryPlan(
boolean shouldRetry, boolean isFatal, long backoffNanos,
@Nullable Integer hedgingPushbackMillis) {
RetryPlan(boolean shouldRetry, long backoffNanos) {
this.shouldRetry = shouldRetry;
this.isFatal = isFatal;
this.backoffNanos = backoffNanos;
}
}

private static final class HedgingPlan {
final boolean isHedgeable;
@Nullable
final Integer hedgingPushbackMillis;

public HedgingPlan(
boolean isHedgeable, @Nullable Integer hedgingPushbackMillis) {
this.isHedgeable = isHedgeable;
this.hedgingPushbackMillis = hedgingPushbackMillis;
}
}
Expand Down
34 changes: 33 additions & 1 deletion core/src/test/java/io/grpc/internal/RetriableStreamTest.java
Expand Up @@ -37,6 +37,7 @@
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -2284,7 +2285,7 @@ public void hedging_transparentRetryNotAllowed() {
}

@Test
public void hedging_throttled() {
public void hedging_throttledByOtherCall() {
Throttle throttle = new Throttle(4f, 0.8f);
RetriableStream<String> hedgingStream = newThrottledHedgingStream(throttle);

Expand Down Expand Up @@ -2313,6 +2314,37 @@ public void hedging_throttled() {
assertEquals(0, fakeClock.numPendingTasks());
}

@Test
public void hedging_throttledByHedgingStreams() {
Throttle throttle = new Throttle(4f, 0.8f);
RetriableStream<String> hedgingStream = newThrottledHedgingStream(throttle);

ClientStream mockStream1 = mock(ClientStream.class);
ClientStream mockStream2 = mock(ClientStream.class);
ClientStream mockStream3 = mock(ClientStream.class);
when(retriableStreamRecorder.newSubstream(anyInt()))
.thenReturn(mockStream1, mockStream2, mockStream3);

hedgingStream.start(masterListener);
ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 =
ArgumentCaptor.forClass(ClientStreamListener.class);
verify(mockStream1).start(sublistenerCaptor1.capture());

fakeClock.forwardTime(HEDGING_DELAY_IN_SECONDS, TimeUnit.SECONDS);
ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 =
ArgumentCaptor.forClass(ClientStreamListener.class);
verify(mockStream2).start(sublistenerCaptor2.capture());

sublistenerCaptor1.getValue().closed(Status.fromCode(NON_FATAL_STATUS_CODE_1), new Metadata());
assertTrue(throttle.isAboveThreshold()); // count = 3
sublistenerCaptor2.getValue().closed(Status.fromCode(NON_FATAL_STATUS_CODE_1), new Metadata());
assertFalse(throttle.isAboveThreshold()); // count = 2

verify(masterListener).closed(any(Status.class), any(Metadata.class));
verifyNoInteractions(mockStream3);
assertEquals(0, fakeClock.numPendingTasks());
}

/**
* Used to stub a retriable stream as well as to record methods of the retriable stream being
* called.
Expand Down