diff --git a/core/src/main/java/io/grpc/internal/RetriableStream.java b/core/src/main/java/io/grpc/internal/RetriableStream.java index cf3c9afe48d..44f194c5d0a 100644 --- a/core/src/main/java/io/grpc/internal/RetriableStream.java +++ b/core/src/main/java/io/grpc/internal/RetriableStream.java @@ -783,7 +783,6 @@ public void closed(Status status, RpcProgress rpcProgress, Metadata trailers) { } if (state.winningSubstream == null) { - boolean isFatal = false; if (rpcProgress == RpcProgress.REFUSED && noMoreTransparentRetry.compareAndSet(false, true)) { // transparent retry @@ -837,51 +836,54 @@ public void run() { nextBackoffIntervalNanos = retryPolicy.initialBackoffNanos; } - RetryPlan retryPlan = makeRetryDecision(status, trailers); - if (retryPlan.shouldRetry) { - // The check state.winningSubstream == null, checking if is not already committed, is - // racy, but is still safe b/c the retry will also handle committed/cancellation - FutureCanceller scheduledRetryCopy; + if (isHedging) { + HedgingPlan hedgingPlan = makeHedgingDecision(status, trailers); + if (hedgingPlan.isHedgeable) { + pushbackHedging(hedgingPlan.hedgingPushbackMillis); + } synchronized (lock) { - scheduledRetry = scheduledRetryCopy = new FutureCanceller(lock); + state = state.removeActiveHedge(substream); + // The invariant is whether or not #(Potential Hedge + active hedges) > 0. + // Once hasPotentialHedging(state) is false, it will always be false, and then + // #(state.activeHedges) will be decreasing. This guarantees that even there may be + // multiple concurrent hedges, one of the hedges will end up committed. + if (hedgingPlan.isHedgeable) { + if (hasPotentialHedging(state) || !state.activeHedges.isEmpty()) { + return; + } + // else, no activeHedges, no new hedges possible, try to commit + } // else, isHedgeable is false, try to commit } - scheduledRetryCopy.setFuture(scheduledExecutorService.schedule( - new Runnable() { - @Override - public void run() { - callExecutor.execute(new Runnable() { - @Override - public void run() { - // retry - Substream newSubstream - = createSubstream(substream.previousAttemptCount + 1); - drain(newSubstream); - } - }); - } - }, - retryPlan.backoffNanos, - TimeUnit.NANOSECONDS)); - return; - } - isFatal = retryPlan.isFatal; - pushbackHedging(retryPlan.hedgingPushbackMillis); - } - - if (isHedging) { - synchronized (lock) { - state = state.removeActiveHedge(substream); - - // The invariant is whether or not #(Potential Hedge + active hedges) > 0. - // Once hasPotentialHedging(state) is false, it will always be false, and then - // #(state.activeHedges) will be decreasing. This guarantees that even there may be - // multiple concurrent hedges, one of the hedges will end up committed. - if (!isFatal) { - if (hasPotentialHedging(state) || !state.activeHedges.isEmpty()) { - return; + } else { + RetryPlan retryPlan = makeRetryDecision(status, trailers); + if (retryPlan.shouldRetry) { + // The check state.winningSubstream == null, checking if is not already committed, is + // racy, but is still safe b/c the retry will also handle committed/cancellation + FutureCanceller scheduledRetryCopy; + synchronized (lock) { + scheduledRetry = scheduledRetryCopy = new FutureCanceller(lock); } - // else, no activeHedges, no new hedges possible, try to commit - } // else, fatal, try to commit + scheduledRetryCopy.setFuture( + scheduledExecutorService.schedule( + new Runnable() { + @Override + public void run() { + callExecutor.execute( + new Runnable() { + @Override + public void run() { + // retry + Substream newSubstream = + createSubstream(substream.previousAttemptCount + 1); + drain(newSubstream); + } + }); + } + }, + retryPlan.backoffNanos, + TimeUnit.NANOSECONDS)); + return; + } } } } @@ -901,26 +903,10 @@ private RetryPlan makeRetryDecision(Status status, Metadata trailer) { boolean shouldRetry = false; long backoffNanos = 0L; boolean isRetryableStatusCode = retryPolicy.retryableStatusCodes.contains(status.getCode()); - boolean isNonFatalStatusCode = hedgingPolicy.nonFatalStatusCodes.contains(status.getCode()); - if (isHedging && !isNonFatalStatusCode) { - // isFatal is true, no pushback - return new RetryPlan(/* shouldRetry = */ false, /* isFatal = */ true, 0, null); - } - - String pushbackStr = trailer.get(GRPC_RETRY_PUSHBACK_MS); - Integer pushbackMillis = null; - if (pushbackStr != null) { - try { - pushbackMillis = Integer.valueOf(pushbackStr); - } catch (NumberFormatException e) { - pushbackMillis = -1; - } - } - + Integer pushbackMillis = getPushbackMills(trailer); boolean isThrottled = false; if (throttle != null) { - if (isRetryableStatusCode || isNonFatalStatusCode - || (pushbackMillis != null && pushbackMillis < 0)) { + if (isRetryableStatusCode || (pushbackMillis != null && pushbackMillis < 0)) { isThrottled = !throttle.onQualifiedFailureThenCheckIsAboveThreshold(); } } @@ -933,7 +919,6 @@ private RetryPlan makeRetryDecision(Status status, Metadata trailer) { nextBackoffIntervalNanos = Math.min( (long) (nextBackoffIntervalNanos * retryPolicy.backoffMultiplier), retryPolicy.maxBackoffNanos); - } // else no retry } else if (pushbackMillis >= 0) { shouldRetry = true; @@ -942,8 +927,33 @@ private RetryPlan makeRetryDecision(Status status, Metadata trailer) { } // else no retry } // else no retry - return new RetryPlan( - shouldRetry, /* isFatal = */ false, backoffNanos, isHedging ? pushbackMillis : null); + return new RetryPlan(shouldRetry, backoffNanos); + } + + private HedgingPlan makeHedgingDecision(Status status, Metadata trailer) { + Integer pushbackMillis = getPushbackMills(trailer); + boolean isFatal = !hedgingPolicy.nonFatalStatusCodes.contains(status.getCode()); + boolean isThrottled = false; + if (throttle != null) { + if (!isFatal || (pushbackMillis != null && pushbackMillis < 0)) { + isThrottled = !throttle.onQualifiedFailureThenCheckIsAboveThreshold(); + } + } + return new HedgingPlan(!isFatal && !isThrottled, pushbackMillis); + } + + @Nullable + private Integer getPushbackMills(Metadata trailer) { + String pushbackStr = trailer.get(GRPC_RETRY_PUSHBACK_MS); + Integer pushbackMillis = null; + if (pushbackStr != null) { + try { + pushbackMillis = Integer.valueOf(pushbackStr); + } catch (NumberFormatException e) { + pushbackMillis = -1; + } + } + return pushbackMillis; } @Override @@ -1361,17 +1371,22 @@ public int hashCode() { private static final class RetryPlan { final boolean shouldRetry; - final boolean isFatal; // receiving a status not among the nonFatalStatusCodes final long backoffNanos; - @Nullable - final Integer hedgingPushbackMillis; - RetryPlan( - boolean shouldRetry, boolean isFatal, long backoffNanos, - @Nullable Integer hedgingPushbackMillis) { + RetryPlan(boolean shouldRetry, long backoffNanos) { this.shouldRetry = shouldRetry; - this.isFatal = isFatal; this.backoffNanos = backoffNanos; + } + } + + private static final class HedgingPlan { + final boolean isHedgeable; + @Nullable + final Integer hedgingPushbackMillis; + + public HedgingPlan( + boolean isHedgeable, @Nullable Integer hedgingPushbackMillis) { + this.isHedgeable = isHedgeable; this.hedgingPushbackMillis = hedgingPushbackMillis; } } diff --git a/core/src/test/java/io/grpc/internal/RetriableStreamTest.java b/core/src/test/java/io/grpc/internal/RetriableStreamTest.java index 77c5fe7be89..8a69fe501b2 100644 --- a/core/src/test/java/io/grpc/internal/RetriableStreamTest.java +++ b/core/src/test/java/io/grpc/internal/RetriableStreamTest.java @@ -37,6 +37,7 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; @@ -2284,7 +2285,7 @@ public void hedging_transparentRetryNotAllowed() { } @Test - public void hedging_throttled() { + public void hedging_throttledByOtherCall() { Throttle throttle = new Throttle(4f, 0.8f); RetriableStream hedgingStream = newThrottledHedgingStream(throttle); @@ -2313,6 +2314,37 @@ public void hedging_throttled() { assertEquals(0, fakeClock.numPendingTasks()); } + @Test + public void hedging_throttledByHedgingStreams() { + Throttle throttle = new Throttle(4f, 0.8f); + RetriableStream hedgingStream = newThrottledHedgingStream(throttle); + + ClientStream mockStream1 = mock(ClientStream.class); + ClientStream mockStream2 = mock(ClientStream.class); + ClientStream mockStream3 = mock(ClientStream.class); + when(retriableStreamRecorder.newSubstream(anyInt())) + .thenReturn(mockStream1, mockStream2, mockStream3); + + hedgingStream.start(masterListener); + ArgumentCaptor sublistenerCaptor1 = + ArgumentCaptor.forClass(ClientStreamListener.class); + verify(mockStream1).start(sublistenerCaptor1.capture()); + + fakeClock.forwardTime(HEDGING_DELAY_IN_SECONDS, TimeUnit.SECONDS); + ArgumentCaptor sublistenerCaptor2 = + ArgumentCaptor.forClass(ClientStreamListener.class); + verify(mockStream2).start(sublistenerCaptor2.capture()); + + sublistenerCaptor1.getValue().closed(Status.fromCode(NON_FATAL_STATUS_CODE_1), new Metadata()); + assertTrue(throttle.isAboveThreshold()); // count = 3 + sublistenerCaptor2.getValue().closed(Status.fromCode(NON_FATAL_STATUS_CODE_1), new Metadata()); + assertFalse(throttle.isAboveThreshold()); // count = 2 + + verify(masterListener).closed(any(Status.class), any(Metadata.class)); + verifyNoInteractions(mockStream3); + assertEquals(0, fakeClock.numPendingTasks()); + } + /** * Used to stub a retriable stream as well as to record methods of the retriable stream being * called.