From 010f00a3bec28fc4be18d9c3cc093f70b2aa49d6 Mon Sep 17 00:00:00 2001 From: "Penn (Dapeng) Zhang" Date: Tue, 18 Aug 2020 10:43:53 -0700 Subject: [PATCH 1/4] core: fix a bug for hedging with throttling --- .../io/grpc/internal/RetriableStream.java | 143 ++++++++++-------- .../io/grpc/internal/RetriableStreamTest.java | 34 ++++- 2 files changed, 113 insertions(+), 64 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/RetriableStream.java b/core/src/main/java/io/grpc/internal/RetriableStream.java index cf3c9afe48d..6c2a6bc5d49 100644 --- a/core/src/main/java/io/grpc/internal/RetriableStream.java +++ b/core/src/main/java/io/grpc/internal/RetriableStream.java @@ -783,7 +783,6 @@ public void closed(Status status, RpcProgress rpcProgress, Metadata trailers) { } if (state.winningSubstream == null) { - boolean isFatal = false; if (rpcProgress == RpcProgress.REFUSED && noMoreTransparentRetry.compareAndSet(false, true)) { // transparent retry @@ -837,51 +836,54 @@ public void run() { nextBackoffIntervalNanos = retryPolicy.initialBackoffNanos; } - RetryPlan retryPlan = makeRetryDecision(status, trailers); - if (retryPlan.shouldRetry) { - // The check state.winningSubstream == null, checking if is not already committed, is - // racy, but is still safe b/c the retry will also handle committed/cancellation - FutureCanceller scheduledRetryCopy; + if (!isHedging) { + RetryPlan retryPlan = makeRetryDecision(status, trailers); + if (retryPlan.shouldRetry) { + // The check state.winningSubstream == null, checking if is not already committed, is + // racy, but is still safe b/c the retry will also handle committed/cancellation + FutureCanceller scheduledRetryCopy; + synchronized (lock) { + scheduledRetry = scheduledRetryCopy = new FutureCanceller(lock); + } + scheduledRetryCopy.setFuture( + scheduledExecutorService.schedule( + new Runnable() { + @Override + public void run() { + callExecutor.execute( + new Runnable() { + @Override + public void run() { + // retry + Substream newSubstream = + createSubstream(substream.previousAttemptCount + 1); + drain(newSubstream); + } + }); + } + }, + retryPlan.backoffNanos, + TimeUnit.NANOSECONDS)); + return; + } + } else { + HedgingPlan hedgingPlan = makeHedgingDecision(status, trailers); + if (hedgingPlan.isHedgeable) { + pushbackHedging(hedgingPlan.hedgingPushbackMillis); + } synchronized (lock) { - scheduledRetry = scheduledRetryCopy = new FutureCanceller(lock); + state = state.removeActiveHedge(substream); + // The invariant is whether or not #(Potential Hedge + active hedges) > 0. + // Once hasPotentialHedging(state) is false, it will always be false, and then + // #(state.activeHedges) will be decreasing. This guarantees that even there may be + // multiple concurrent hedges, one of the hedges will end up committed. + if (hedgingPlan.isHedgeable) { + if (hasPotentialHedging(state) || !state.activeHedges.isEmpty()) { + return; + } + // else, no activeHedges, no new hedges possible, try to commit + } // else, fatal, try to commit } - scheduledRetryCopy.setFuture(scheduledExecutorService.schedule( - new Runnable() { - @Override - public void run() { - callExecutor.execute(new Runnable() { - @Override - public void run() { - // retry - Substream newSubstream - = createSubstream(substream.previousAttemptCount + 1); - drain(newSubstream); - } - }); - } - }, - retryPlan.backoffNanos, - TimeUnit.NANOSECONDS)); - return; - } - isFatal = retryPlan.isFatal; - pushbackHedging(retryPlan.hedgingPushbackMillis); - } - - if (isHedging) { - synchronized (lock) { - state = state.removeActiveHedge(substream); - - // The invariant is whether or not #(Potential Hedge + active hedges) > 0. - // Once hasPotentialHedging(state) is false, it will always be false, and then - // #(state.activeHedges) will be decreasing. This guarantees that even there may be - // multiple concurrent hedges, one of the hedges will end up committed. - if (!isFatal) { - if (hasPotentialHedging(state) || !state.activeHedges.isEmpty()) { - return; - } - // else, no activeHedges, no new hedges possible, try to commit - } // else, fatal, try to commit } } } @@ -901,12 +903,6 @@ private RetryPlan makeRetryDecision(Status status, Metadata trailer) { boolean shouldRetry = false; long backoffNanos = 0L; boolean isRetryableStatusCode = retryPolicy.retryableStatusCodes.contains(status.getCode()); - boolean isNonFatalStatusCode = hedgingPolicy.nonFatalStatusCodes.contains(status.getCode()); - if (isHedging && !isNonFatalStatusCode) { - // isFatal is true, no pushback - return new RetryPlan(/* shouldRetry = */ false, /* isFatal = */ true, 0, null); - } - String pushbackStr = trailer.get(GRPC_RETRY_PUSHBACK_MS); Integer pushbackMillis = null; if (pushbackStr != null) { @@ -916,11 +912,9 @@ private RetryPlan makeRetryDecision(Status status, Metadata trailer) { pushbackMillis = -1; } } - boolean isThrottled = false; if (throttle != null) { - if (isRetryableStatusCode || isNonFatalStatusCode - || (pushbackMillis != null && pushbackMillis < 0)) { + if (isRetryableStatusCode || (pushbackMillis != null && pushbackMillis < 0)) { isThrottled = !throttle.onQualifiedFailureThenCheckIsAboveThreshold(); } } @@ -933,7 +927,6 @@ private RetryPlan makeRetryDecision(Status status, Metadata trailer) { nextBackoffIntervalNanos = Math.min( (long) (nextBackoffIntervalNanos * retryPolicy.backoffMultiplier), retryPolicy.maxBackoffNanos); - } // else no retry } else if (pushbackMillis >= 0) { shouldRetry = true; @@ -942,8 +935,27 @@ private RetryPlan makeRetryDecision(Status status, Metadata trailer) { } // else no retry } // else no retry - return new RetryPlan( - shouldRetry, /* isFatal = */ false, backoffNanos, isHedging ? pushbackMillis : null); + return new RetryPlan(shouldRetry, backoffNanos); + } + + private HedgingPlan makeHedgingDecision(Status status, Metadata trailer) { + String pushbackStr = trailer.get(GRPC_RETRY_PUSHBACK_MS); + Integer pushbackMillis = null; + if (pushbackStr != null) { + try { + pushbackMillis = Integer.valueOf(pushbackStr); + } catch (NumberFormatException e) { + pushbackMillis = -1; + } + } + boolean isFatal = !hedgingPolicy.nonFatalStatusCodes.contains(status.getCode()); + boolean isThrottled = false; + if (throttle != null) { + if (!isFatal || (pushbackMillis != null && pushbackMillis < 0)) { + isThrottled = !throttle.onQualifiedFailureThenCheckIsAboveThreshold(); + } + } + return new HedgingPlan(!isFatal && !isThrottled, pushbackMillis); } @Override @@ -1361,17 +1373,22 @@ public int hashCode() { private static final class RetryPlan { final boolean shouldRetry; - final boolean isFatal; // receiving a status not among the nonFatalStatusCodes final long backoffNanos; - @Nullable - final Integer hedgingPushbackMillis; - RetryPlan( - boolean shouldRetry, boolean isFatal, long backoffNanos, - @Nullable Integer hedgingPushbackMillis) { + RetryPlan(boolean shouldRetry, long backoffNanos) { this.shouldRetry = shouldRetry; - this.isFatal = isFatal; this.backoffNanos = backoffNanos; + } + } + + private static final class HedgingPlan { + final boolean isHedgeable; + @Nullable + final Integer hedgingPushbackMillis; + + public HedgingPlan( + boolean isHedgeable, @Nullable Integer hedgingPushbackMillis) { + this.isHedgeable = isHedgeable; this.hedgingPushbackMillis = hedgingPushbackMillis; } } diff --git a/core/src/test/java/io/grpc/internal/RetriableStreamTest.java b/core/src/test/java/io/grpc/internal/RetriableStreamTest.java index 77c5fe7be89..8a69fe501b2 100644 --- a/core/src/test/java/io/grpc/internal/RetriableStreamTest.java +++ b/core/src/test/java/io/grpc/internal/RetriableStreamTest.java @@ -37,6 +37,7 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; @@ -2284,7 +2285,7 @@ public void hedging_transparentRetryNotAllowed() { } @Test - public void hedging_throttled() { + public void hedging_throttledByOtherCall() { Throttle throttle = new Throttle(4f, 0.8f); RetriableStream hedgingStream = newThrottledHedgingStream(throttle); @@ -2313,6 +2314,37 @@ public void hedging_throttled() { assertEquals(0, fakeClock.numPendingTasks()); } + @Test + public void hedging_throttledByHedgingStreams() { + Throttle throttle = new Throttle(4f, 0.8f); + RetriableStream hedgingStream = newThrottledHedgingStream(throttle); + + ClientStream mockStream1 = mock(ClientStream.class); + ClientStream mockStream2 = mock(ClientStream.class); + ClientStream mockStream3 = mock(ClientStream.class); + when(retriableStreamRecorder.newSubstream(anyInt())) + .thenReturn(mockStream1, mockStream2, mockStream3); + + hedgingStream.start(masterListener); + ArgumentCaptor sublistenerCaptor1 = + ArgumentCaptor.forClass(ClientStreamListener.class); + verify(mockStream1).start(sublistenerCaptor1.capture()); + + fakeClock.forwardTime(HEDGING_DELAY_IN_SECONDS, TimeUnit.SECONDS); + ArgumentCaptor sublistenerCaptor2 = + ArgumentCaptor.forClass(ClientStreamListener.class); + verify(mockStream2).start(sublistenerCaptor2.capture()); + + sublistenerCaptor1.getValue().closed(Status.fromCode(NON_FATAL_STATUS_CODE_1), new Metadata()); + assertTrue(throttle.isAboveThreshold()); // count = 3 + sublistenerCaptor2.getValue().closed(Status.fromCode(NON_FATAL_STATUS_CODE_1), new Metadata()); + assertFalse(throttle.isAboveThreshold()); // count = 2 + + verify(masterListener).closed(any(Status.class), any(Metadata.class)); + verifyNoInteractions(mockStream3); + assertEquals(0, fakeClock.numPendingTasks()); + } + /** * Used to stub a retriable stream as well as to record methods of the retriable stream being * called. From aeb1a253d28a56fdd68cf2b70b617fb407834370 Mon Sep 17 00:00:00 2001 From: "Penn (Dapeng) Zhang" Date: Tue, 18 Aug 2020 15:14:10 -0700 Subject: [PATCH 2/4] update outdated comment --- core/src/main/java/io/grpc/internal/RetriableStream.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/io/grpc/internal/RetriableStream.java b/core/src/main/java/io/grpc/internal/RetriableStream.java index 6c2a6bc5d49..58e903cb8cf 100644 --- a/core/src/main/java/io/grpc/internal/RetriableStream.java +++ b/core/src/main/java/io/grpc/internal/RetriableStream.java @@ -882,7 +882,7 @@ public void run() { return; } // else, no activeHedges, no new hedges possible, try to commit - } // else, fatal, try to commit + } // else, isHedgeable is false, try to commit } } } From 90d0a119bc38d4e1895984c49cef44d47ef620dc Mon Sep 17 00:00:00 2001 From: "Penn (Dapeng) Zhang" Date: Tue, 18 Aug 2020 15:22:03 -0700 Subject: [PATCH 3/4] reuse shared code --- .../io/grpc/internal/RetriableStream.java | 32 +++++++++---------- 1 file changed, 15 insertions(+), 17 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/RetriableStream.java b/core/src/main/java/io/grpc/internal/RetriableStream.java index 58e903cb8cf..5be4649c7fd 100644 --- a/core/src/main/java/io/grpc/internal/RetriableStream.java +++ b/core/src/main/java/io/grpc/internal/RetriableStream.java @@ -903,15 +903,7 @@ private RetryPlan makeRetryDecision(Status status, Metadata trailer) { boolean shouldRetry = false; long backoffNanos = 0L; boolean isRetryableStatusCode = retryPolicy.retryableStatusCodes.contains(status.getCode()); - String pushbackStr = trailer.get(GRPC_RETRY_PUSHBACK_MS); - Integer pushbackMillis = null; - if (pushbackStr != null) { - try { - pushbackMillis = Integer.valueOf(pushbackStr); - } catch (NumberFormatException e) { - pushbackMillis = -1; - } - } + Integer pushbackMillis = getPushbackMills(trailer); boolean isThrottled = false; if (throttle != null) { if (isRetryableStatusCode || (pushbackMillis != null && pushbackMillis < 0)) { @@ -939,6 +931,19 @@ private RetryPlan makeRetryDecision(Status status, Metadata trailer) { } private HedgingPlan makeHedgingDecision(Status status, Metadata trailer) { + Integer pushbackMillis = getPushbackMills(trailer); + boolean isFatal = !hedgingPolicy.nonFatalStatusCodes.contains(status.getCode()); + boolean isThrottled = false; + if (throttle != null) { + if (!isFatal || (pushbackMillis != null && pushbackMillis < 0)) { + isThrottled = !throttle.onQualifiedFailureThenCheckIsAboveThreshold(); + } + } + return new HedgingPlan(!isFatal && !isThrottled, pushbackMillis); + } + + @Nullable + private Integer getPushbackMills(Metadata trailer) { String pushbackStr = trailer.get(GRPC_RETRY_PUSHBACK_MS); Integer pushbackMillis = null; if (pushbackStr != null) { @@ -948,14 +953,7 @@ private HedgingPlan makeHedgingDecision(Status status, Metadata trailer) { pushbackMillis = -1; } } - boolean isFatal = !hedgingPolicy.nonFatalStatusCodes.contains(status.getCode()); - boolean isThrottled = false; - if (throttle != null) { - if (!isFatal || (pushbackMillis != null && pushbackMillis < 0)) { - isThrottled = !throttle.onQualifiedFailureThenCheckIsAboveThreshold(); - } - } - return new HedgingPlan(!isFatal && !isThrottled, pushbackMillis); + return pushbackMillis; } @Override From a0a998d019dd50e6b5ed9f5bed93369536fcec0b Mon Sep 17 00:00:00 2001 From: "Penn (Dapeng) Zhang" Date: Tue, 18 Aug 2020 16:31:50 -0700 Subject: [PATCH 4/4] swap if (!isHedging) {} with else {} --- .../io/grpc/internal/RetriableStream.java | 38 +++++++++---------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/RetriableStream.java b/core/src/main/java/io/grpc/internal/RetriableStream.java index 5be4649c7fd..44f194c5d0a 100644 --- a/core/src/main/java/io/grpc/internal/RetriableStream.java +++ b/core/src/main/java/io/grpc/internal/RetriableStream.java @@ -836,7 +836,25 @@ public void run() { nextBackoffIntervalNanos = retryPolicy.initialBackoffNanos; } - if (!isHedging) { + if (isHedging) { + HedgingPlan hedgingPlan = makeHedgingDecision(status, trailers); + if (hedgingPlan.isHedgeable) { + pushbackHedging(hedgingPlan.hedgingPushbackMillis); + } + synchronized (lock) { + state = state.removeActiveHedge(substream); + // The invariant is whether or not #(Potential Hedge + active hedges) > 0. + // Once hasPotentialHedging(state) is false, it will always be false, and then + // #(state.activeHedges) will be decreasing. This guarantees that even there may be + // multiple concurrent hedges, one of the hedges will end up committed. + if (hedgingPlan.isHedgeable) { + if (hasPotentialHedging(state) || !state.activeHedges.isEmpty()) { + return; + } + // else, no activeHedges, no new hedges possible, try to commit + } // else, isHedgeable is false, try to commit + } + } else { RetryPlan retryPlan = makeRetryDecision(status, trailers); if (retryPlan.shouldRetry) { // The check state.winningSubstream == null, checking if is not already committed, is @@ -866,24 +884,6 @@ public void run() { TimeUnit.NANOSECONDS)); return; } - } else { - HedgingPlan hedgingPlan = makeHedgingDecision(status, trailers); - if (hedgingPlan.isHedgeable) { - pushbackHedging(hedgingPlan.hedgingPushbackMillis); - } - synchronized (lock) { - state = state.removeActiveHedge(substream); - // The invariant is whether or not #(Potential Hedge + active hedges) > 0. - // Once hasPotentialHedging(state) is false, it will always be false, and then - // #(state.activeHedges) will be decreasing. This guarantees that even there may be - // multiple concurrent hedges, one of the hedges will end up committed. - if (hedgingPlan.isHedgeable) { - if (hasPotentialHedging(state) || !state.activeHedges.isEmpty()) { - return; - } - // else, no activeHedges, no new hedges possible, try to commit - } // else, isHedgeable is false, try to commit - } } } }