Skip to content

Commit

Permalink
core: fix a bug for hedging with throttling (#7337)
Browse files Browse the repository at this point in the history
Resolves #7222: If a hedging substream fails triggering throttling threshold, the call should be committed.

Refactored RetryPlan to two separate classes RetryPlan and HedgingPlan.
  • Loading branch information
dapengzhang0 committed Aug 19, 2020
1 parent cb07b0f commit a91acec
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 72 deletions.
157 changes: 86 additions & 71 deletions core/src/main/java/io/grpc/internal/RetriableStream.java
Expand Up @@ -783,7 +783,6 @@ public void closed(Status status, RpcProgress rpcProgress, Metadata trailers) {
}

if (state.winningSubstream == null) {
boolean isFatal = false;
if (rpcProgress == RpcProgress.REFUSED
&& noMoreTransparentRetry.compareAndSet(false, true)) {
// transparent retry
Expand Down Expand Up @@ -837,51 +836,54 @@ public void run() {
nextBackoffIntervalNanos = retryPolicy.initialBackoffNanos;
}

RetryPlan retryPlan = makeRetryDecision(status, trailers);
if (retryPlan.shouldRetry) {
// The check state.winningSubstream == null, checking if is not already committed, is
// racy, but is still safe b/c the retry will also handle committed/cancellation
FutureCanceller scheduledRetryCopy;
if (isHedging) {
HedgingPlan hedgingPlan = makeHedgingDecision(status, trailers);
if (hedgingPlan.isHedgeable) {
pushbackHedging(hedgingPlan.hedgingPushbackMillis);
}
synchronized (lock) {
scheduledRetry = scheduledRetryCopy = new FutureCanceller(lock);
state = state.removeActiveHedge(substream);
// The invariant is whether or not #(Potential Hedge + active hedges) > 0.
// Once hasPotentialHedging(state) is false, it will always be false, and then
// #(state.activeHedges) will be decreasing. This guarantees that even there may be
// multiple concurrent hedges, one of the hedges will end up committed.
if (hedgingPlan.isHedgeable) {
if (hasPotentialHedging(state) || !state.activeHedges.isEmpty()) {
return;
}
// else, no activeHedges, no new hedges possible, try to commit
} // else, isHedgeable is false, try to commit
}
scheduledRetryCopy.setFuture(scheduledExecutorService.schedule(
new Runnable() {
@Override
public void run() {
callExecutor.execute(new Runnable() {
@Override
public void run() {
// retry
Substream newSubstream
= createSubstream(substream.previousAttemptCount + 1);
drain(newSubstream);
}
});
}
},
retryPlan.backoffNanos,
TimeUnit.NANOSECONDS));
return;
}
isFatal = retryPlan.isFatal;
pushbackHedging(retryPlan.hedgingPushbackMillis);
}

if (isHedging) {
synchronized (lock) {
state = state.removeActiveHedge(substream);

// The invariant is whether or not #(Potential Hedge + active hedges) > 0.
// Once hasPotentialHedging(state) is false, it will always be false, and then
// #(state.activeHedges) will be decreasing. This guarantees that even there may be
// multiple concurrent hedges, one of the hedges will end up committed.
if (!isFatal) {
if (hasPotentialHedging(state) || !state.activeHedges.isEmpty()) {
return;
} else {
RetryPlan retryPlan = makeRetryDecision(status, trailers);
if (retryPlan.shouldRetry) {
// The check state.winningSubstream == null, checking if is not already committed, is
// racy, but is still safe b/c the retry will also handle committed/cancellation
FutureCanceller scheduledRetryCopy;
synchronized (lock) {
scheduledRetry = scheduledRetryCopy = new FutureCanceller(lock);
}
// else, no activeHedges, no new hedges possible, try to commit
} // else, fatal, try to commit
scheduledRetryCopy.setFuture(
scheduledExecutorService.schedule(
new Runnable() {
@Override
public void run() {
callExecutor.execute(
new Runnable() {
@Override
public void run() {
// retry
Substream newSubstream =
createSubstream(substream.previousAttemptCount + 1);
drain(newSubstream);
}
});
}
},
retryPlan.backoffNanos,
TimeUnit.NANOSECONDS));
return;
}
}
}
}
Expand All @@ -901,26 +903,10 @@ private RetryPlan makeRetryDecision(Status status, Metadata trailer) {
boolean shouldRetry = false;
long backoffNanos = 0L;
boolean isRetryableStatusCode = retryPolicy.retryableStatusCodes.contains(status.getCode());
boolean isNonFatalStatusCode = hedgingPolicy.nonFatalStatusCodes.contains(status.getCode());
if (isHedging && !isNonFatalStatusCode) {
// isFatal is true, no pushback
return new RetryPlan(/* shouldRetry = */ false, /* isFatal = */ true, 0, null);
}

String pushbackStr = trailer.get(GRPC_RETRY_PUSHBACK_MS);
Integer pushbackMillis = null;
if (pushbackStr != null) {
try {
pushbackMillis = Integer.valueOf(pushbackStr);
} catch (NumberFormatException e) {
pushbackMillis = -1;
}
}

Integer pushbackMillis = getPushbackMills(trailer);
boolean isThrottled = false;
if (throttle != null) {
if (isRetryableStatusCode || isNonFatalStatusCode
|| (pushbackMillis != null && pushbackMillis < 0)) {
if (isRetryableStatusCode || (pushbackMillis != null && pushbackMillis < 0)) {
isThrottled = !throttle.onQualifiedFailureThenCheckIsAboveThreshold();
}
}
Expand All @@ -933,7 +919,6 @@ private RetryPlan makeRetryDecision(Status status, Metadata trailer) {
nextBackoffIntervalNanos = Math.min(
(long) (nextBackoffIntervalNanos * retryPolicy.backoffMultiplier),
retryPolicy.maxBackoffNanos);

} // else no retry
} else if (pushbackMillis >= 0) {
shouldRetry = true;
Expand All @@ -942,8 +927,33 @@ private RetryPlan makeRetryDecision(Status status, Metadata trailer) {
} // else no retry
} // else no retry

return new RetryPlan(
shouldRetry, /* isFatal = */ false, backoffNanos, isHedging ? pushbackMillis : null);
return new RetryPlan(shouldRetry, backoffNanos);
}

private HedgingPlan makeHedgingDecision(Status status, Metadata trailer) {
Integer pushbackMillis = getPushbackMills(trailer);
boolean isFatal = !hedgingPolicy.nonFatalStatusCodes.contains(status.getCode());
boolean isThrottled = false;
if (throttle != null) {
if (!isFatal || (pushbackMillis != null && pushbackMillis < 0)) {
isThrottled = !throttle.onQualifiedFailureThenCheckIsAboveThreshold();
}
}
return new HedgingPlan(!isFatal && !isThrottled, pushbackMillis);
}

@Nullable
private Integer getPushbackMills(Metadata trailer) {
String pushbackStr = trailer.get(GRPC_RETRY_PUSHBACK_MS);
Integer pushbackMillis = null;
if (pushbackStr != null) {
try {
pushbackMillis = Integer.valueOf(pushbackStr);
} catch (NumberFormatException e) {
pushbackMillis = -1;
}
}
return pushbackMillis;
}

@Override
Expand Down Expand Up @@ -1361,17 +1371,22 @@ public int hashCode() {

private static final class RetryPlan {
final boolean shouldRetry;
final boolean isFatal; // receiving a status not among the nonFatalStatusCodes
final long backoffNanos;
@Nullable
final Integer hedgingPushbackMillis;

RetryPlan(
boolean shouldRetry, boolean isFatal, long backoffNanos,
@Nullable Integer hedgingPushbackMillis) {
RetryPlan(boolean shouldRetry, long backoffNanos) {
this.shouldRetry = shouldRetry;
this.isFatal = isFatal;
this.backoffNanos = backoffNanos;
}
}

private static final class HedgingPlan {
final boolean isHedgeable;
@Nullable
final Integer hedgingPushbackMillis;

public HedgingPlan(
boolean isHedgeable, @Nullable Integer hedgingPushbackMillis) {
this.isHedgeable = isHedgeable;
this.hedgingPushbackMillis = hedgingPushbackMillis;
}
}
Expand Down
34 changes: 33 additions & 1 deletion core/src/test/java/io/grpc/internal/RetriableStreamTest.java
Expand Up @@ -37,6 +37,7 @@
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -2284,7 +2285,7 @@ public void hedging_transparentRetryNotAllowed() {
}

@Test
public void hedging_throttled() {
public void hedging_throttledByOtherCall() {
Throttle throttle = new Throttle(4f, 0.8f);
RetriableStream<String> hedgingStream = newThrottledHedgingStream(throttle);

Expand Down Expand Up @@ -2313,6 +2314,37 @@ public void hedging_throttled() {
assertEquals(0, fakeClock.numPendingTasks());
}

@Test
public void hedging_throttledByHedgingStreams() {
Throttle throttle = new Throttle(4f, 0.8f);
RetriableStream<String> hedgingStream = newThrottledHedgingStream(throttle);

ClientStream mockStream1 = mock(ClientStream.class);
ClientStream mockStream2 = mock(ClientStream.class);
ClientStream mockStream3 = mock(ClientStream.class);
when(retriableStreamRecorder.newSubstream(anyInt()))
.thenReturn(mockStream1, mockStream2, mockStream3);

hedgingStream.start(masterListener);
ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 =
ArgumentCaptor.forClass(ClientStreamListener.class);
verify(mockStream1).start(sublistenerCaptor1.capture());

fakeClock.forwardTime(HEDGING_DELAY_IN_SECONDS, TimeUnit.SECONDS);
ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 =
ArgumentCaptor.forClass(ClientStreamListener.class);
verify(mockStream2).start(sublistenerCaptor2.capture());

sublistenerCaptor1.getValue().closed(Status.fromCode(NON_FATAL_STATUS_CODE_1), new Metadata());
assertTrue(throttle.isAboveThreshold()); // count = 3
sublistenerCaptor2.getValue().closed(Status.fromCode(NON_FATAL_STATUS_CODE_1), new Metadata());
assertFalse(throttle.isAboveThreshold()); // count = 2

verify(masterListener).closed(any(Status.class), any(Metadata.class));
verifyNoInteractions(mockStream3);
assertEquals(0, fakeClock.numPendingTasks());
}

/**
* Used to stub a retriable stream as well as to record methods of the retriable stream being
* called.
Expand Down

0 comments on commit a91acec

Please sign in to comment.