From 239db4b381d8a84a03a5e612e3dc41347acd1b4e Mon Sep 17 00:00:00 2001 From: Sergii Tkachenko Date: Thu, 29 Oct 2020 20:02:57 -0400 Subject: [PATCH 01/10] RELEASING.md: Add pre-tagging internal step --- RELEASING.md | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/RELEASING.md b/RELEASING.md index a0852f4216d..500b551ccea 100644 --- a/RELEASING.md +++ b/RELEASING.md @@ -108,7 +108,8 @@ Tagging the Release 1. Verify there are no open issues in the release milestone. Open issues should either be deferred or resolved and the fix backported. -2. For vMajor.Minor.x branch, change `README.md` to refer to the next release +2. Ensure that Google-internal steps completed at go/grpc/java/releasing#before-tagging-a-release. +3. For vMajor.Minor.x branch, change `README.md` to refer to the next release version. _Also_ update the version numbers for protoc if the protobuf library version was updated since the last release. @@ -122,7 +123,7 @@ Tagging the Release $ ${EDITOR:-nano -w} cronet/README.md $ git commit -a -m "Update README etc to reference $MAJOR.$MINOR.$PATCH" ``` -3. Change root build files to remove "-SNAPSHOT" for the next release version +4. Change root build files to remove "-SNAPSHOT" for the next release version (e.g. `0.7.0`). Commit the result and make a tag: ```bash @@ -133,7 +134,7 @@ Tagging the Release $ git commit -a -m "Bump version to $MAJOR.$MINOR.$PATCH" $ git tag -a v$MAJOR.$MINOR.$PATCH -m "Version $MAJOR.$MINOR.$PATCH" ``` -4. Change root build files to the next snapshot version (e.g. `0.7.1-SNAPSHOT`). +5. Change root build files to the next snapshot version (e.g. `0.7.1-SNAPSHOT`). Commit the result: ```bash @@ -145,7 +146,7 @@ Tagging the Release $ ./gradlew build $ git commit -a -m "Bump version to $MAJOR.$MINOR.$((PATCH+1))-SNAPSHOT" ``` -5. Go through PR review and push the release tag and updated release branch to +6. Go through PR review and push the release tag and updated release branch to GitHub: ```bash @@ -154,7 +155,7 @@ Tagging the Release $ git push upstream v$MAJOR.$MINOR.x $ git push upstream v$MAJOR.$MINOR.$PATCH ``` -6. Close the release milestone. +7. Close the release milestone. Build Artifacts --------------- From b956f8852d36adc54eb60649d7b82896b399d723 Mon Sep 17 00:00:00 2001 From: Chengyuan Zhang Date: Wed, 7 Apr 2021 18:06:32 -0700 Subject: [PATCH 02/10] grpclb: include fallback reason in error status of failing to fallback (#8035) Enhance error information reflected by RPC status when failing to fallback (aka, no fallback addresses provided by resolver), by including the original cause of entering fallback. Cases to fallback include: - When the fallback timer fires before we have received the first response from the balancer. - If no fallback addresses are found, RPCs will be failed with status {UNAVAILABLE, description="Unable to fallback, no fallback addresses found\n Timeout waiting for remote balancer", cause=null} - When the balancer RPC finishes before receiving any backend addresses - If no fallback addresses are found, RPCs will be failed with status {UNAVAILABLE, description="Unable to fallback, no fallback addresses found\n ", cause=} - When we get an explicit response from the balancer telling us go into fallback - If no fallback addresses are found, RPCs will be failed with status {UNAVAILABLE, description="Unable to fallback, no fallback addresses found\n Fallback requested by balancer", cause=null} - When the balancer call has finished *and* we cannot connect to any of the backends in the last response we received from the balancer. - Depending on whichever the two happened last, the last happening one is the reason that triggers entering fallback. If no fallback addresses are found, RPCs will be failed with status {UNAVAILABLE, description="Unable to fallback, no fallback addresses found\n ", cause=} or {UNAVAILABLE, description="Unable to fallback, no fallback addresses found\n ", cause=} Note all RPCs will fail with UNAVAILABLE status code, the fallback reason will be attached as description and cause (if any). --- .../main/java/io/grpc/grpclb/GrpclbState.java | 76 +++++++-- .../grpc/grpclb/GrpclbLoadBalancerTest.java | 151 +++++++++++++++++- 2 files changed, 202 insertions(+), 25 deletions(-) diff --git a/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java b/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java index 1c812e14500..c8bf77076c3 100644 --- a/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java +++ b/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java @@ -95,8 +95,20 @@ final class GrpclbState { static final Status NO_AVAILABLE_BACKENDS_STATUS = Status.UNAVAILABLE.withDescription("LoadBalancer responded without any backends"); @VisibleForTesting - static final Status NO_FALLBACK_BACKENDS_FOUND_STATUS = + static final Status BALANCER_TIMEOUT_STATUS = + Status.UNAVAILABLE.withDescription("Timeout waiting for remote balancer"); + @VisibleForTesting + static final Status BALANCER_REQUESTED_FALLBACK_STATUS = + Status.UNAVAILABLE.withDescription("Fallback requested by balancer"); + @VisibleForTesting + static final Status NO_FALLBACK_BACKENDS_STATUS = Status.UNAVAILABLE.withDescription("Unable to fallback, no fallback addresses found"); + // This error status should never be propagated to RPC failures, as "no backend or balancer + // addresses found" should be directly handled as a name resolution error. So in cases of no + // balancer address, fallback should never fail. + private static final Status NO_LB_ADDRESS_PROVIDED_STATUS = + Status.UNAVAILABLE.withDescription("No balancer address found"); + @VisibleForTesting static final RoundRobinEntry BUFFER_ENTRY = new RoundRobinEntry() { @@ -137,6 +149,10 @@ enum Mode { private ScheduledHandle fallbackTimer; private List fallbackBackendList = Collections.emptyList(); private boolean usingFallbackBackends; + // Reason to fallback, will be used as RPC's error message if fail to fallback (e.g., no + // fallback addresses found). + @Nullable + private Status fallbackReason; // True if the current balancer has returned a serverlist. Will be reset to false when lost // connection to a balancer. private boolean balancerWorking; @@ -239,7 +255,7 @@ void handleAddresses( // No balancer address: close existing balancer connection and enter fallback mode // immediately. shutdownLbComm(); - syncContext.execute(new FallbackModeTask()); + syncContext.execute(new FallbackModeTask(NO_LB_ADDRESS_PROVIDED_STATUS)); } else { startLbComm(newLbAddressGroups); // Avoid creating a new RPC just because the addresses were updated, as it can cause a @@ -253,7 +269,8 @@ void handleAddresses( // Start the fallback timer if it's never started if (fallbackTimer == null) { fallbackTimer = syncContext.schedule( - new FallbackModeTask(), FALLBACK_TIMEOUT_MS, TimeUnit.MILLISECONDS, timerService); + new FallbackModeTask(BALANCER_TIMEOUT_STATUS), FALLBACK_TIMEOUT_MS, + TimeUnit.MILLISECONDS, timerService); } } fallbackBackendList = newBackendServers; @@ -275,16 +292,21 @@ void requestConnection() { } private void maybeUseFallbackBackends() { - if (balancerWorking) { - return; - } - if (usingFallbackBackends) { + if (balancerWorking || usingFallbackBackends) { return; } + // Balancer RPC should have either been broken or timed out. + checkState(fallbackReason != null, "no reason to fallback"); for (Subchannel subchannel : subchannels.values()) { - if (subchannel.getAttributes().get(STATE_INFO).get().getState() == READY) { + ConnectivityStateInfo stateInfo = subchannel.getAttributes().get(STATE_INFO).get(); + if (stateInfo.getState() == READY) { return; } + // If we do have balancer-provided backends, use one of its error in the error message if + // fail to fallback. + if (stateInfo.getState() == TRANSIENT_FAILURE) { + fallbackReason = stateInfo.getStatus(); + } } // Fallback conditions met useFallbackBackends(); @@ -401,8 +423,10 @@ void shutdown() { void propagateError(Status status) { logger.log(ChannelLogLevel.DEBUG, "[grpclb-<{0}>] Error: {1}", serviceName, status); if (backendList.isEmpty()) { + Status error = + Status.UNAVAILABLE.withCause(status.getCause()).withDescription(status.getDescription()); maybeUpdatePicker( - TRANSIENT_FAILURE, new RoundRobinPicker(dropList, Arrays.asList(new ErrorEntry(status)))); + TRANSIENT_FAILURE, new RoundRobinPicker(dropList, Arrays.asList(new ErrorEntry(error)))); } } @@ -528,8 +552,17 @@ public void onSubchannelState(ConnectivityStateInfo newState) { @VisibleForTesting class FallbackModeTask implements Runnable { + private final Status reason; + + private FallbackModeTask(Status reason) { + this.reason = reason; + } + @Override public void run() { + // Timer should have been cancelled if entered fallback early. + checkState(!usingFallbackBackends, "already in fallback"); + fallbackReason = reason; maybeUseFallbackBackends(); maybeUpdatePicker(); } @@ -658,7 +691,9 @@ private void handleResponse(LoadBalanceResponse response) { } if (typeCase == LoadBalanceResponseTypeCase.FALLBACK_RESPONSE) { + // Force entering fallback requested by balancer. cancelFallbackTimer(); + fallbackReason = BALANCER_REQUESTED_FALLBACK_STATUS; useFallbackBackends(); maybeUpdatePicker(); return; @@ -690,7 +725,7 @@ private void handleResponse(LoadBalanceResponse response) { } catch (UnknownHostException e) { propagateError( Status.UNAVAILABLE - .withDescription("Host for server not found: " + server) + .withDescription("Invalid backend address: " + server) .withCause(e)); continue; } @@ -701,8 +736,9 @@ private void handleResponse(LoadBalanceResponse response) { newBackendAddrList.add(new BackendAddressGroup(eag, token)); } } - // Stop using fallback backends as soon as a new server list is received from the balancer. + // Exit fallback as soon as a new server list is received from the balancer. usingFallbackBackends = false; + fallbackReason = null; cancelFallbackTimer(); updateServerList(newDropList, newBackendAddrList, loadRecorder); maybeUpdatePicker(); @@ -717,6 +753,8 @@ private void handleStreamClosed(Status error) { cleanUp(); propagateError(error); balancerWorking = false; + fallbackReason = error; + cancelFallbackTimer(); maybeUseFallbackBackends(); maybeUpdatePicker(); @@ -773,15 +811,19 @@ private void maybeUpdatePicker() { List pickList; ConnectivityState state; if (backendList.isEmpty()) { - if (balancerWorking) { - pickList = - Collections.singletonList( - new ErrorEntry(NO_AVAILABLE_BACKENDS_STATUS)); + // Note balancer (is working) may enforce using fallback backends, and that fallback may + // fail. So we should check if currently in fallback first. + if (usingFallbackBackends) { + Status error = + NO_FALLBACK_BACKENDS_STATUS + .withCause(fallbackReason.getCause()) + .augmentDescription(fallbackReason.getDescription()); + pickList = Collections.singletonList(new ErrorEntry(error)); state = TRANSIENT_FAILURE; - } else if (usingFallbackBackends) { + } else if (balancerWorking) { pickList = Collections.singletonList( - new ErrorEntry(NO_FALLBACK_BACKENDS_FOUND_STATUS)); + new ErrorEntry(NO_AVAILABLE_BACKENDS_STATUS)); state = TRANSIENT_FAILURE; } else { // still waiting for balancer pickList = Collections.singletonList(BUFFER_ENTRY); diff --git a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java index edd422a8b76..4fc3d122347 100644 --- a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java +++ b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java @@ -792,14 +792,16 @@ public void nameResolutionFailsThenRecover() { "INFO: [grpclb-] Created", "DEBUG: [grpclb-] Error: " + error, "INFO: [grpclb-] Update balancing state to TRANSIENT_FAILURE: picks=" - + "[Status{code=NOT_FOUND, description=www.google.com not found, cause=null}]," + + "[Status{code=UNAVAILABLE, description=www.google.com not found, cause=null}]," + " drops=[]") .inOrder(); logs.clear(); RoundRobinPicker picker = (RoundRobinPicker) pickerCaptor.getValue(); assertThat(picker.dropList).isEmpty(); - assertThat(picker.pickList).containsExactly(new ErrorEntry(error)); + PickResult result = picker.pickSubchannel(mock(PickSubchannelArgs.class)); + assertThat(result.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE); + assertThat(result.getStatus().getDescription()).isEqualTo(error.getDescription()); // Recover with a subsequent success List grpclbBalancerList = createResolvedBalancerAddresses(1); @@ -832,7 +834,9 @@ public void grpclbThenNameResolutionFails() { inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); RoundRobinPicker picker = (RoundRobinPicker) pickerCaptor.getValue(); assertThat(picker.dropList).isEmpty(); - assertThat(picker.pickList).containsExactly(new ErrorEntry(error)); + PickResult result = picker.pickSubchannel(mock(PickSubchannelArgs.class)); + assertThat(result.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE); + assertThat(result.getStatus().getDescription()).isEqualTo(error.getDescription()); assertFalse(oobChannel.isShutdown()); // Simulate receiving LB response @@ -1284,11 +1288,16 @@ private void subtestGrpclbFallbackInitialTimeout(boolean timerExpires) { for (Subchannel subchannel : mockSubchannels) { verify(subchannelPool).returnSubchannel(eq(subchannel), any(ConnectivityStateInfo.class)); } + + // RPC error status includes message of balancer RPC timeout inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); - RoundRobinPicker picker = (RoundRobinPicker) pickerCaptor.getValue(); - assertThat(picker.dropList).isEmpty(); - assertThat(picker.pickList) - .containsExactly(new ErrorEntry(GrpclbState.NO_FALLBACK_BACKENDS_FOUND_STATUS)); + PickResult result = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class)); + assertThat(result.getStatus().getCode()) + .isEqualTo(Code.UNAVAILABLE); + assertThat(result.getStatus().getDescription()) + .startsWith(GrpclbState.NO_FALLBACK_BACKENDS_STATUS.getDescription()); + assertThat(result.getStatus().getDescription()) + .contains(GrpclbState.BALANCER_TIMEOUT_STATUS.getDescription()); } //////////////////////////////////////////////////////////////// @@ -1396,6 +1405,9 @@ public void grpclbFallback_breakLbStreamBeforeFallbackTimerExpires() { Status streamError = Status.UNAVAILABLE.withDescription("OOB stream broken"); lbResponseObserver.onError(streamError.asException()); + // Fallback time has been short-circuited + assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); + // Fall back to the backends from resolver fallbackTestVerifyUseOfFallbackBackendLists( inOrder, Arrays.asList(backendList.get(0), backendList.get(1))); @@ -1408,6 +1420,24 @@ public void grpclbFallback_breakLbStreamBeforeFallbackTimerExpires() { eq(LoadBalanceRequest.newBuilder().setInitialRequest( InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) .build())); + + ////////////////////////////////////////////////////////////////////// + // Name resolver sends new resolution results without any backend addr + ////////////////////////////////////////////////////////////////////// + deliverResolvedAddresses(Collections.emptyList(), grpclbBalancerList); + + // Still in fallback logic, except that the backend list is empty + for (Subchannel subchannel : mockSubchannels) { + verify(subchannelPool).returnSubchannel(eq(subchannel), any(ConnectivityStateInfo.class)); + } + + // RPC error status includes error of balancer stream + inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); + PickResult result = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class)); + assertThat(result.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE); + assertThat(result.getStatus().getDescription()) + .startsWith(GrpclbState.NO_FALLBACK_BACKENDS_STATUS.getDescription()); + assertThat(result.getStatus().getDescription()).contains(streamError.getDescription()); } @Test @@ -1434,6 +1464,24 @@ public void grpclbFallback_noBalancerAddress() { assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); verify(helper, never()) .createOobChannel(ArgumentMatchers.anyList(), anyString()); + logs.clear(); + + /////////////////////////////////////////////////////////////////////////////////////// + // Name resolver sends new resolution results without any backend addr or balancer addr + /////////////////////////////////////////////////////////////////////////////////////// + deliverResolvedAddresses(Collections.emptyList(), + Collections.emptyList()); + assertThat(logs).containsExactly( + "DEBUG: [grpclb-] Error: Status{code=UNAVAILABLE, " + + "description=No backend or balancer addresses found, cause=null}"); + + // Keep using existing fallback addresses without interruption + for (Subchannel subchannel : mockSubchannels) { + verify(subchannelPool, never()) + .returnSubchannel(eq(subchannel), any(ConnectivityStateInfo.class)); + } + verify(helper, never()) + .updateBalancingState(eq(TRANSIENT_FAILURE), any(SubchannelPicker.class)); } @Test @@ -1531,6 +1579,7 @@ private void subtestGrpclbFallbackConnectionLost( } assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); + // No subchannel to fallback backends should have been created if no fallback happened if (!(balancerBroken && allSubchannelsBroken)) { verify(subchannelPool, never()).takeOrCreateSubchannel( eq(backendList.get(0)), any(Attributes.class)); @@ -1539,6 +1588,72 @@ private void subtestGrpclbFallbackConnectionLost( } } + @Test + public void grpclbFallback_allLost_failToFallback() { + long loadReportIntervalMillis = 1983; + InOrder inOrder = inOrder(helper, mockLbService, subchannelPool); + + // Create balancer and (empty) backend addresses + List grpclbBalancerList = createResolvedBalancerAddresses(1); + deliverResolvedAddresses(Collections.emptyList(), grpclbBalancerList); + + inOrder.verify(helper).createOobChannel(eq(xattr(grpclbBalancerList)), + eq(lbAuthority(0) + NO_USE_AUTHORITY_SUFFIX)); + + // Attempted to connect to balancer + assertEquals(1, fakeOobChannels.size()); + fakeOobChannels.poll(); + inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); + StreamObserver lbResponseObserver = lbResponseObserverCaptor.getValue(); + assertEquals(1, lbRequestObservers.size()); + StreamObserver lbRequestObserver = lbRequestObservers.poll(); + + verify(lbRequestObserver).onNext( + eq(LoadBalanceRequest.newBuilder().setInitialRequest( + InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) + .build())); + lbResponseObserver.onNext(buildInitialResponse(loadReportIntervalMillis)); + // We don't care if these methods have been run. + inOrder.verify(helper, atLeast(0)).getSynchronizationContext(); + inOrder.verify(helper, atLeast(0)).getScheduledExecutorService(); + + inOrder.verifyNoMoreInteractions(); + + // Balancer returns a server list + List serverList = Arrays.asList( + new ServerEntry("127.0.0.1", 2000, "token0001"), + new ServerEntry("127.0.0.1", 2010, "token0002")); + lbResponseObserver.onNext(buildInitialResponse()); + lbResponseObserver.onNext(buildLbResponse(serverList)); + + List subchannels = fallbackTestVerifyUseOfBalancerBackendLists(inOrder, serverList); + + // Break connections + lbResponseObserver.onError(Status.UNAVAILABLE.asException()); + // A new stream to LB is created + inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); + assertEquals(1, lbRequestObservers.size()); + + // Break all subchannel connections + Status error = Status.UNAUTHENTICATED.withDescription("Permission denied"); + for (Subchannel subchannel : subchannels) { + deliverSubchannelState(subchannel, ConnectivityStateInfo.forTransientFailure(error)); + } + + // Recycle all subchannels + for (Subchannel subchannel : subchannels) { + verify(subchannelPool).returnSubchannel(eq(subchannel), any(ConnectivityStateInfo.class)); + } + + // RPC error status includes errors of subchannels to balancer-provided backends + inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); + PickResult result = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class)); + assertThat(result.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE); + assertThat(result.getStatus().getDescription()) + .startsWith(GrpclbState.NO_FALLBACK_BACKENDS_STATUS.getDescription()); + assertThat(result.getStatus().getDescription()).contains(error.getDescription()); + } + private List fallbackTestVerifyUseOfFallbackBackendLists( InOrder inOrder, List addrs) { return fallbackTestVerifyUseOfBackendLists(inOrder, addrs, null); @@ -1958,6 +2073,7 @@ public void grpclbWorking_pickFirstMode_lbSendsEmptyAddress() throws Exception { assertThat(mockSubchannels).isEmpty(); verify(subchannel).shutdown(); + // RPC error status includes message of no backends provided by balancer inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); RoundRobinPicker errorPicker = (RoundRobinPicker) pickerCaptor.getValue(); assertThat(errorPicker.pickList) @@ -2445,7 +2561,7 @@ public void grpclbWorking_lbSendsFallbackMessage() { new BackendEntry(subchannel2, getLoadRecorder(), "token0002")) .inOrder(); - // enter fallback mode + // Balancer forces entering fallback mode lbResponseObserver.onNext(buildLbFallbackResponse()); // existing subchannels must be returned immediately to gracefully shutdown. @@ -2460,6 +2576,25 @@ public void grpclbWorking_lbSendsFallbackMessage() { assertFalse(oobChannel.isShutdown()); verify(lbRequestObserver, never()).onCompleted(); + ////////////////////////////////////////////////////////////////////// + // Name resolver sends new resolution results without any backend addr + ////////////////////////////////////////////////////////////////////// + deliverResolvedAddresses(Collections.emptyList(), grpclbBalancerList); + + // Still in fallback logic, except that the backend list is empty + for (Subchannel subchannel : mockSubchannels) { + verify(subchannelPool).returnSubchannel(eq(subchannel), any(ConnectivityStateInfo.class)); + } + + // RPC error status includes message of fallback requested by balancer + inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); + PickResult result = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class)); + assertThat(result.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE); + assertThat(result.getStatus().getDescription()) + .startsWith(GrpclbState.NO_FALLBACK_BACKENDS_STATUS.getDescription()); + assertThat(result.getStatus().getDescription()) + .contains(GrpclbState.BALANCER_REQUESTED_FALLBACK_STATUS.getDescription()); + // exit fall back by providing two new backends ServerEntry backend2a = new ServerEntry("127.0.0.1", 8000, "token1001"); ServerEntry backend2b = new ServerEntry("127.0.0.1", 8010, "token1002"); From 6ad3f5d9e40c3316b88cf7e3710b1bccd43ceefc Mon Sep 17 00:00:00 2001 From: Sergii Tkachenko Date: Wed, 7 Apr 2021 15:35:30 -0400 Subject: [PATCH 03/10] xds: Fix error prone UnnecessaryJavacSuppressWarnings in a test --- xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java | 1 - 1 file changed, 1 deletion(-) diff --git a/xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java b/xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java index 87cc2ceb16d..692bf9ec9e3 100644 --- a/xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java +++ b/xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java @@ -2127,7 +2127,6 @@ protected final Message buildListener(String name, Message routeConfiguration) { return buildListener(name, routeConfiguration, Collections.emptyList()); } - @SuppressWarnings("unchecked") protected abstract Message buildListener( String name, Message routeConfiguration, List httpFilters); From 1b996b171b47180a861703b8f17ae27adbc6af97 Mon Sep 17 00:00:00 2001 From: Sergii Tkachenko Date: Tue, 6 Apr 2021 20:24:13 -0400 Subject: [PATCH 04/10] Update README etc to reference 1.37.0 --- README.md | 30 ++++++++++++------------ cronet/README.md | 2 +- documentation/android-channel-builder.md | 4 ++-- 3 files changed, 18 insertions(+), 18 deletions(-) diff --git a/README.md b/README.md index db22e97c7dc..412c97172dc 100644 --- a/README.md +++ b/README.md @@ -31,8 +31,8 @@ For a guided tour, take a look at the [quick start guide](https://grpc.io/docs/languages/java/quickstart) or the more explanatory [gRPC basics](https://grpc.io/docs/languages/java/basics). -The [examples](https://github.com/grpc/grpc-java/tree/v1.36.0/examples) and the -[Android example](https://github.com/grpc/grpc-java/tree/v1.36.0/examples/android) +The [examples](https://github.com/grpc/grpc-java/tree/v1.37.0/examples) and the +[Android example](https://github.com/grpc/grpc-java/tree/v1.37.0/examples/android) are standalone projects that showcase the usage of gRPC. Download @@ -43,17 +43,17 @@ Download [the JARs][]. Or for Maven with non-Android, add to your `pom.xml`: io.grpc grpc-netty-shaded - 1.36.0 + 1.37.0 io.grpc grpc-protobuf - 1.36.0 + 1.37.0 io.grpc grpc-stub - 1.36.0 + 1.37.0 org.apache.tomcat @@ -65,23 +65,23 @@ Download [the JARs][]. Or for Maven with non-Android, add to your `pom.xml`: Or for Gradle with non-Android, add to your dependencies: ```gradle -implementation 'io.grpc:grpc-netty-shaded:1.36.0' -implementation 'io.grpc:grpc-protobuf:1.36.0' -implementation 'io.grpc:grpc-stub:1.36.0' +implementation 'io.grpc:grpc-netty-shaded:1.37.0' +implementation 'io.grpc:grpc-protobuf:1.37.0' +implementation 'io.grpc:grpc-stub:1.37.0' compileOnly 'org.apache.tomcat:annotations-api:6.0.53' // necessary for Java 9+ ``` For Android client, use `grpc-okhttp` instead of `grpc-netty-shaded` and `grpc-protobuf-lite` instead of `grpc-protobuf`: ```gradle -implementation 'io.grpc:grpc-okhttp:1.36.0' -implementation 'io.grpc:grpc-protobuf-lite:1.36.0' -implementation 'io.grpc:grpc-stub:1.36.0' +implementation 'io.grpc:grpc-okhttp:1.37.0' +implementation 'io.grpc:grpc-protobuf-lite:1.37.0' +implementation 'io.grpc:grpc-stub:1.37.0' compileOnly 'org.apache.tomcat:annotations-api:6.0.53' // necessary for Java 9+ ``` [the JARs]: -https://search.maven.org/search?q=g:io.grpc%20AND%20v:1.36.0 +https://search.maven.org/search?q=g:io.grpc%20AND%20v:1.37.0 Development snapshots are available in [Sonatypes's snapshot repository](https://oss.sonatype.org/content/repositories/snapshots/). @@ -113,7 +113,7 @@ For protobuf-based codegen integrated with the Maven build system, you can use com.google.protobuf:protoc:3.12.0:exe:${os.detected.classifier} grpc-java - io.grpc:protoc-gen-grpc-java:1.36.0:exe:${os.detected.classifier} + io.grpc:protoc-gen-grpc-java:1.37.0:exe:${os.detected.classifier} @@ -143,7 +143,7 @@ protobuf { } plugins { grpc { - artifact = 'io.grpc:protoc-gen-grpc-java:1.36.0' + artifact = 'io.grpc:protoc-gen-grpc-java:1.37.0' } } generateProtoTasks { @@ -176,7 +176,7 @@ protobuf { } plugins { grpc { - artifact = 'io.grpc:protoc-gen-grpc-java:1.36.0' + artifact = 'io.grpc:protoc-gen-grpc-java:1.37.0' } } generateProtoTasks { diff --git a/cronet/README.md b/cronet/README.md index a2178d71aa7..5a85d37cae5 100644 --- a/cronet/README.md +++ b/cronet/README.md @@ -26,7 +26,7 @@ In your app module's `build.gradle` file, include a dependency on both `grpc-cro Google Play Services Client Library for Cronet ``` -implementation 'io.grpc:grpc-cronet:1.36.0' +implementation 'io.grpc:grpc-cronet:1.37.0' implementation 'com.google.android.gms:play-services-cronet:16.0.0' ``` diff --git a/documentation/android-channel-builder.md b/documentation/android-channel-builder.md index d5dad50e0f2..d516a7db342 100644 --- a/documentation/android-channel-builder.md +++ b/documentation/android-channel-builder.md @@ -36,8 +36,8 @@ In your `build.gradle` file, include a dependency on both `grpc-android` and `grpc-okhttp`: ``` -implementation 'io.grpc:grpc-android:1.36.0' -implementation 'io.grpc:grpc-okhttp:1.36.0' +implementation 'io.grpc:grpc-android:1.37.0' +implementation 'io.grpc:grpc-okhttp:1.37.0' ``` You also need permission to access the device's network state in your From d971fe629ce60e39ce284171b24c5507d04ff4ba Mon Sep 17 00:00:00 2001 From: Sergii Tkachenko Date: Thu, 8 Apr 2021 13:38:03 -0400 Subject: [PATCH 05/10] RELEASING.md: remove JCenter note JFrog has announced that they are shutting down the JCenter: https://jfrog.com/blog/into-the-sunset-bintray-jcenter-gocenter-and-chartcenter/ --- RELEASING.md | 3 --- 1 file changed, 3 deletions(-) diff --git a/RELEASING.md b/RELEASING.md index 500b551ccea..48f3b645144 100644 --- a/RELEASING.md +++ b/RELEASING.md @@ -215,9 +215,6 @@ Finally, document and publicize the release. 2. Post a release announcement to [grpc-io](https://groups.google.com/forum/#!forum/grpc-io) (`grpc-io@googlegroups.com`). The title should be something that clearly identifies the release (e.g.`GRPC-Java Released`). - 1. Check if JCenter has picked up the new release (https://jcenter.bintray.com/io/grpc/) - and include its availability in the release announcement email. JCenter should mirror - everything on Maven Central, but users have reported delays. Update Hosted Javadoc --------------------- From 1b86618ce9db1a6bc2e0ec5fac1b28eb716fc47d Mon Sep 17 00:00:00 2001 From: Sergii Tkachenko Date: Thu, 8 Apr 2021 18:42:33 -0400 Subject: [PATCH 06/10] buildscript: use different xds-k8s cluster In preparation to the Public Preview. --- buildscripts/kokoro/xds-k8s.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/buildscripts/kokoro/xds-k8s.sh b/buildscripts/kokoro/xds-k8s.sh index 8b5bf72f0f7..44f9d77425c 100755 --- a/buildscripts/kokoro/xds-k8s.sh +++ b/buildscripts/kokoro/xds-k8s.sh @@ -4,8 +4,8 @@ set -eo pipefail # Constants readonly GITHUB_REPOSITORY_NAME="grpc-java" # GKE Cluster -readonly GKE_CLUSTER_NAME="interop-test-psm-sec1-us-central1" -readonly GKE_CLUSTER_ZONE="us-central1-a" +readonly GKE_CLUSTER_NAME="interop-test-psm-sec-testing-api" +readonly GKE_CLUSTER_ZONE="us-west1-b" ## xDS test server/client Docker images readonly SERVER_IMAGE_NAME="gcr.io/grpc-testing/xds-interop/java-server" readonly CLIENT_IMAGE_NAME="gcr.io/grpc-testing/xds-interop/java-client" From 95adf968483307783eaff46cd5036e0f66ba4a21 Mon Sep 17 00:00:00 2001 From: Chengyuan Zhang Date: Thu, 8 Apr 2021 17:58:45 -0700 Subject: [PATCH 07/10] xds: implement ring_hash load balancing policy (#7943) Implementation for the ring hash LB policy. A LoadBalancer that provides consistent hashing based load balancing to upstream hosts, with the "Ketama" hashing that maps hosts onto a circle (the "ring") by hashing its addresses. Each request is routed to a host by hashing some property of the request and finding the nearest corresponding host clockwise around the ring. Each host is placed on the ring some number of times proportional to its weight. With the ring partitioned appropriately, the addition or removal of one host from a set of N hosts will affect only 1/N requests. --- .../io/grpc/xds/InternalXdsAttributes.java | 7 + .../io/grpc/xds/RingHashLoadBalancer.java | 460 +++++++++++ .../xds/RingHashLoadBalancerProvider.java | 72 ++ .../services/io.grpc.LoadBalancerProvider | 1 + .../xds/RingHashLoadBalancerProviderTest.java | 121 +++ .../io/grpc/xds/RingHashLoadBalancerTest.java | 728 ++++++++++++++++++ 6 files changed, 1389 insertions(+) create mode 100644 xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java create mode 100644 xds/src/main/java/io/grpc/xds/RingHashLoadBalancerProvider.java create mode 100644 xds/src/test/java/io/grpc/xds/RingHashLoadBalancerProviderTest.java create mode 100644 xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java diff --git a/xds/src/main/java/io/grpc/xds/InternalXdsAttributes.java b/xds/src/main/java/io/grpc/xds/InternalXdsAttributes.java index 6f3062f22b7..efeee0758a3 100644 --- a/xds/src/main/java/io/grpc/xds/InternalXdsAttributes.java +++ b/xds/src/main/java/io/grpc/xds/InternalXdsAttributes.java @@ -68,5 +68,12 @@ public final class InternalXdsAttributes { static final Attributes.Key ATTR_LOCALITY = Attributes.Key.create("io.grpc.xds.InternalXdsAttributes.locality"); + /** + * Endpoint weight for load balancing purposes. + */ + @EquivalentAddressGroup.Attr + static final Attributes.Key ATTR_SERVER_WEIGHT = + Attributes.Key.create("io.grpc.xds.InternalXdsAttributes.serverWeight"); + private InternalXdsAttributes() {} } diff --git a/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java b/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java new file mode 100644 index 00000000000..bdbaa238e21 --- /dev/null +++ b/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java @@ -0,0 +1,460 @@ +/* + * Copyright 2021 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; +import static io.grpc.ConnectivityState.CONNECTING; +import static io.grpc.ConnectivityState.IDLE; +import static io.grpc.ConnectivityState.READY; +import static io.grpc.ConnectivityState.SHUTDOWN; +import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; + +import com.google.common.base.MoreObjects; +import com.google.common.collect.Sets; +import io.grpc.Attributes; +import io.grpc.ConnectivityState; +import io.grpc.ConnectivityStateInfo; +import io.grpc.EquivalentAddressGroup; +import io.grpc.InternalLogId; +import io.grpc.LoadBalancer; +import io.grpc.Status; +import io.grpc.SynchronizationContext; +import io.grpc.xds.XdsLogger.XdsLogLevel; +import io.grpc.xds.XdsSubchannelPickers.ErrorPicker; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * A {@link LoadBalancer} that provides consistent hashing based load balancing to upstream hosts. + * It implements the "Ketama" hashing that maps hosts onto a circle (the "ring") by hashing its + * addresses. Each request is routed to a host by hashing some property of the request and finding + * the nearest corresponding host clockwise around the ring. Each host is placed on the ring some + * number of times proportional to its weight. With the ring partitioned appropriately, the + * addition or removal of one host from a set of N hosts will affect only 1/N requests. + */ +final class RingHashLoadBalancer extends LoadBalancer { + private static final Attributes.Key> STATE_INFO = + Attributes.Key.create("state-info"); + private static final Status RPC_HASH_NOT_FOUND = + Status.INTERNAL.withDescription("RPC hash not found"); + private static final XxHash64 hashFunc = XxHash64.INSTANCE; + + private final XdsLogger logger; + private final SynchronizationContext syncContext; + private final Map subchannels = new HashMap<>(); + private final Helper helper; + + private List ring; + private ConnectivityState currentState; + + RingHashLoadBalancer(Helper helper) { + this.helper = checkNotNull(helper, "helper"); + syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext"); + logger = XdsLogger.withLogId(InternalLogId.allocate("ring_hash_lb", helper.getAuthority())); + logger.log(XdsLogLevel.INFO, "Created"); + } + + @Override + public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) { + logger.log(XdsLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses); + List addrList = resolvedAddresses.getAddresses(); + if (addrList.isEmpty()) { + handleNameResolutionError(Status.UNAVAILABLE.withDescription("No server addresses found")); + return; + } + Map latestAddrs = stripAttrs(addrList); + Set removedAddrs = + Sets.newHashSet(Sets.difference(subchannels.keySet(), latestAddrs.keySet())); + + RingHashConfig config = (RingHashConfig) resolvedAddresses.getLoadBalancingPolicyConfig(); + Map serverWeights = new HashMap<>(); + long totalWeight = 0L; + for (EquivalentAddressGroup eag : addrList) { + Long weight = eag.getAttributes().get(InternalXdsAttributes.ATTR_SERVER_WEIGHT); + // Support two ways of server weighing: either multiple instances of the same address + // or each address contains a per-address weight attribute. If a weight is not provided, + // each occurrence of the address will be counted a weight value of one. + if (weight == null) { + weight = 1L; + } + totalWeight += weight; + EquivalentAddressGroup addrKey = stripAttrs(eag); + if (serverWeights.containsKey(addrKey)) { + serverWeights.put(addrKey, serverWeights.get(addrKey) + weight); + } else { + serverWeights.put(addrKey, weight); + } + + Subchannel existingSubchannel = subchannels.get(addrKey); + if (existingSubchannel != null) { + existingSubchannel.updateAddresses(Collections.singletonList(eag)); + continue; + } + Attributes attr = Attributes.newBuilder().set( + STATE_INFO, new Ref<>(ConnectivityStateInfo.forNonError(IDLE))).build(); + final Subchannel subchannel = helper.createSubchannel( + CreateSubchannelArgs.newBuilder().setAddresses(eag).setAttributes(attr).build()); + subchannel.start(new SubchannelStateListener() { + @Override + public void onSubchannelState(ConnectivityStateInfo newState) { + processSubchannelState(subchannel, newState); + } + }); + subchannels.put(addrKey, subchannel); + } + long minWeight = Collections.min(serverWeights.values()); + double normalizedMinWeight = (double) minWeight / totalWeight; + // Scale up the number of hashes per host such that the least-weighted host gets a whole + // number of hashes on the the ring. Other hosts might not end up with whole numbers, and + // that's fine (the ring-building algorithm can handle this). This preserves the original + // implementation's behavior: when weights aren't provided, all hosts should get an equal + // number of hashes. In the case where this number exceeds the max_ring_size, it's scaled + // back down to fit. + double scale = Math.min( + Math.ceil(normalizedMinWeight * config.minRingSize) / normalizedMinWeight, + (double) config.maxRingSize); + ring = buildRing(serverWeights, totalWeight, scale); + + // Shut down subchannels for delisted addresses. + List removedSubchannels = new ArrayList<>(); + for (EquivalentAddressGroup addr : removedAddrs) { + removedSubchannels.add(subchannels.remove(addr)); + } + + // Update the picker before shutting down the subchannels, to reduce the chance of race + // between picking a subchannel and shutting it down. + updateBalancingState(); + for (Subchannel subchann : removedSubchannels) { + shutdownSubchannel(subchann); + } + } + + private static List buildRing( + Map serverWeights, long totalWeight, double scale) { + List ring = new ArrayList<>(); + double currentHashes = 0.0; + double targetHashes = 0.0; + for (Map.Entry entry : serverWeights.entrySet()) { + EquivalentAddressGroup addrKey = entry.getKey(); + double normalizedWeight = (double) entry.getValue() / totalWeight; + // TODO(chengyuanzhang): is using the list of socket address correct? + StringBuilder sb = new StringBuilder(addrKey.getAddresses().toString()); + sb.append('_'); + targetHashes += scale * normalizedWeight; + long i = 0L; + while (currentHashes < targetHashes) { + sb.append(i); + long hash = hashFunc.hashAsciiString(sb.toString()); + ring.add(new RingEntry(hash, addrKey)); + i++; + currentHashes++; + sb.setLength(sb.length() - 1); + } + } + Collections.sort(ring); + return Collections.unmodifiableList(ring); + } + + @Override + public void handleNameResolutionError(Status error) { + if (currentState != READY) { + helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(error)); + } + } + + @Override + public void shutdown() { + logger.log(XdsLogLevel.INFO, "Shutdown"); + for (Subchannel subchannel : subchannels.values()) { + shutdownSubchannel(subchannel); + } + } + + private void updateBalancingState() { + checkState(!subchannels.isEmpty(), "no subchannel has been created"); + ConnectivityState overallState = aggregateState(subchannels.values()); + RingHashPicker picker = new RingHashPicker(syncContext, ring, subchannels); + // TODO(chengyuanzhang): avoid unnecessary reprocess caused by duplicated server addr updates + helper.updateBalancingState(overallState, picker); + currentState = overallState; + } + + private void processSubchannelState(Subchannel subchannel, ConnectivityStateInfo stateInfo) { + if (subchannels.get(stripAttrs(subchannel.getAddresses())) != subchannel) { + return; + } + Ref subchannelStateRef = getSubchannelStateInfoRef(subchannel); + + // Don't proactively reconnect if the subchannel enters IDLE, even if previously was connected. + // If the subchannel was previously in TRANSIENT_FAILURE, it is considered to stay in + // TRANSIENT_FAILURE until it becomes READY. + if (subchannelStateRef.value.getState() == TRANSIENT_FAILURE) { + if (stateInfo.getState() == CONNECTING || stateInfo.getState() == IDLE) { + return; + } + } + subchannelStateRef.value = stateInfo; + updateBalancingState(); + } + + /** + * Aggregates the connectivity states of a group of subchannels for overall connectivity state. + * + *

Aggregation rules (in order of dominance): + *

    + *
  1. If there is at least one subchannel in READY state, overall state is READY
  2. + *
  3. If there are 2 or more subchannels in TRANSIENT_FAILURE, overall state is + * TRANSIENT_FAILURE
  4. + *
  5. If there is at least one subchannel in CONNECTING state, overall state is + * CONNECTING
  6. + *
  7. If there is at least one subchannel in IDLE state, overall state is IDLE
  8. + *
  9. Otherwise, overall state is TRANSIENT_FAILURE
  10. + *
+ */ + private static ConnectivityState aggregateState(Iterable subchannels) { + int failureCount = 0; + boolean hasIdle = false; + boolean hasConnecting = false; + for (Subchannel subchannel : subchannels) { + ConnectivityState state = getSubchannelStateInfoRef(subchannel).value.getState(); + if (state == READY) { + return state; + } + if (state == TRANSIENT_FAILURE) { + failureCount++; + } else if (state == CONNECTING) { + hasConnecting = true; + } else if (state == IDLE) { + hasIdle = true; + } + } + if (failureCount >= 2) { + return TRANSIENT_FAILURE; + } + if (hasConnecting) { + return CONNECTING; + } + return hasIdle ? IDLE : TRANSIENT_FAILURE; + } + + private static void shutdownSubchannel(Subchannel subchannel) { + subchannel.shutdown(); + getSubchannelStateInfoRef(subchannel).value = ConnectivityStateInfo.forNonError(SHUTDOWN); + } + + /** + * Converts list of {@link EquivalentAddressGroup} to {@link EquivalentAddressGroup} set and + * remove all attributes. The values are the original EAGs. + */ + private static Map stripAttrs( + List groupList) { + Map addrs = + new HashMap<>(groupList.size() * 2); + for (EquivalentAddressGroup group : groupList) { + addrs.put(stripAttrs(group), group); + } + return addrs; + } + + private static EquivalentAddressGroup stripAttrs(EquivalentAddressGroup eag) { + return new EquivalentAddressGroup(eag.getAddresses()); + } + + private static Ref getSubchannelStateInfoRef( + Subchannel subchannel) { + return checkNotNull(subchannel.getAttributes().get(STATE_INFO), "STATE_INFO"); + } + + private static final class RingHashPicker extends SubchannelPicker { + private final SynchronizationContext syncContext; + private final List ring; + // Avoid synchronization between pickSubchannel and subchannel's connectivity state change, + // freeze picker's view of subchannel's connectivity state. + // TODO(chengyuanzhang): can be more performance-friendly with + // IdentityHashMap and RingEntry contains Subchannel. + private final Map pickableSubchannels; // read-only + + private RingHashPicker( + SynchronizationContext syncContext, List ring, + Map subchannels) { + this.syncContext = syncContext; + this.ring = ring; + pickableSubchannels = new HashMap<>(subchannels.size()); + for (Map.Entry entry : subchannels.entrySet()) { + Subchannel subchannel = entry.getValue(); + ConnectivityStateInfo stateInfo = subchannel.getAttributes().get(STATE_INFO).value; + pickableSubchannels.put(entry.getKey(), new SubchannelView(subchannel, stateInfo)); + } + } + + @Override + public PickResult pickSubchannel(PickSubchannelArgs args) { + Long requestHash = args.getCallOptions().getOption(XdsNameResolver.RPC_HASH_KEY); + if (requestHash == null) { + return PickResult.withError(RPC_HASH_NOT_FOUND); + } + + // Find the ring entry with hash next to (clockwise) the RPC's hash. + int low = 0; + int high = ring.size(); + int mid; + while (true) { + mid = (low + high) / 2; + if (mid == ring.size()) { + mid = 0; + break; + } + long midVal = ring.get(mid).hash; + long midValL = mid == 0 ? 0 : ring.get(mid - 1).hash; + if (requestHash <= midVal && requestHash > midValL) { + break; + } + if (midVal < requestHash) { + low = mid + 1; + } else { + high = mid - 1; + } + if (low > high) { + mid = 0; + break; + } + } + + // Try finding a READY subchannel. Starting from the ring entry next to the RPC's hash. + // If the one of the first two subchannels is not in TRANSIENT_FAILURE, return result + // based on that subchannel. Otherwise, fail the pick unless a READY subchannel is found. + // Meanwhile, trigger connection for the first subchannel that is in IDLE if no subchannel + // before it is in CONNECTING or READY. + boolean hasPending = false; // true if having subchannel(s) in CONNECTING or IDLE + boolean canBuffer = true; // true if RPCs can be buffered with a pending subchannel + Subchannel firstSubchannel = null; + Subchannel secondSubchannel = null; + for (int i = 0; i < ring.size(); i++) { + int index = (mid + i) % ring.size(); + EquivalentAddressGroup addrKey = ring.get(index).addrKey; + SubchannelView subchannel = pickableSubchannels.get(addrKey); + if (subchannel.stateInfo.getState() == READY) { + return PickResult.withSubchannel(subchannel.subchannel); + } + + // RPCs can be buffered if any of the first two subchannels is pending. Otherwise, RPCs + // are failed unless there is a READY connection. + if (firstSubchannel == null) { + firstSubchannel = subchannel.subchannel; + } else if (subchannel.subchannel != firstSubchannel) { + if (secondSubchannel == null) { + secondSubchannel = subchannel.subchannel; + } else if (subchannel.subchannel != secondSubchannel) { + canBuffer = false; + } + } + if (subchannel.stateInfo.getState() == TRANSIENT_FAILURE) { + continue; + } + if (!hasPending) { // first non-failing subchannel + if (subchannel.stateInfo.getState() == IDLE) { + final Subchannel finalSubchannel = subchannel.subchannel; + syncContext.execute(new Runnable() { + @Override + public void run() { + finalSubchannel.requestConnection(); + } + }); + } + if (canBuffer) { // done if this is the first or second two subchannel + return PickResult.withNoResult(); // queue the pick and re-process later + } + hasPending = true; + } + } + // Fail the pick with error status of the original subchannel hit by hash. + SubchannelView originalSubchannel = pickableSubchannels.get(ring.get(mid).addrKey); + return PickResult.withError(originalSubchannel.stateInfo.getStatus()); + } + } + + /** + * An unmodifiable view of a subchannel with state not subject to its real connectivity + * state changes. + */ + private static final class SubchannelView { + private final Subchannel subchannel; + private final ConnectivityStateInfo stateInfo; + + private SubchannelView(Subchannel subchannel, ConnectivityStateInfo stateInfo) { + this.subchannel = subchannel; + this.stateInfo = stateInfo; + } + } + + private static final class RingEntry implements Comparable { + private final long hash; + private final EquivalentAddressGroup addrKey; + + private RingEntry(long hash, EquivalentAddressGroup addrKey) { + this.hash = hash; + this.addrKey = addrKey; + } + + @Override + public int compareTo(RingEntry entry) { + return Long.compare(hash, entry.hash); + } + } + + /** + * A lighter weight Reference than AtomicReference. + */ + private static final class Ref { + T value; + + Ref(T value) { + this.value = value; + } + } + + /** + * Configures the ring property. The larger the ring is (that is, the more hashes there are + * for each provided host) the better the request distribution will reflect the desired weights. + */ + static final class RingHashConfig { + final long minRingSize; + final long maxRingSize; + + RingHashConfig(long minRingSize, long maxRingSize) { + checkArgument(minRingSize > 0, "minRingSize <= 0"); + checkArgument(maxRingSize > 0, "maxRingSize <= 0"); + checkArgument(minRingSize <= maxRingSize, "minRingSize > maxRingSize"); + this.minRingSize = minRingSize; + this.maxRingSize = maxRingSize; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("minRingSize", minRingSize) + .add("maxRingSize", maxRingSize) + .toString(); + } + } +} diff --git a/xds/src/main/java/io/grpc/xds/RingHashLoadBalancerProvider.java b/xds/src/main/java/io/grpc/xds/RingHashLoadBalancerProvider.java new file mode 100644 index 00000000000..fcbd527bf5c --- /dev/null +++ b/xds/src/main/java/io/grpc/xds/RingHashLoadBalancerProvider.java @@ -0,0 +1,72 @@ +/* + * Copyright 2021 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds; + +import io.grpc.Internal; +import io.grpc.LoadBalancer; +import io.grpc.LoadBalancer.Helper; +import io.grpc.LoadBalancerProvider; +import io.grpc.NameResolver.ConfigOrError; +import io.grpc.Status; +import io.grpc.internal.JsonUtil; +import io.grpc.xds.RingHashLoadBalancer.RingHashConfig; +import java.util.Map; + +/** + * The provider for the "ring_hash" balancing policy. + */ +@Internal +public final class RingHashLoadBalancerProvider extends LoadBalancerProvider { + + private static final boolean enableRingHash = + Boolean.parseBoolean(System.getenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH")); + + @Override + public LoadBalancer newLoadBalancer(Helper helper) { + return new RingHashLoadBalancer(helper); + } + + @Override + public boolean isAvailable() { + return enableRingHash; + } + + @Override + public int getPriority() { + return 5; + } + + @Override + public String getPolicyName() { + return "ring_hash"; + } + + @Override + public ConfigOrError parseLoadBalancingPolicyConfig(Map rawLoadBalancingPolicyConfig) { + Long minRingSize = JsonUtil.getNumberAsLong(rawLoadBalancingPolicyConfig, "minRingSize"); + Long maxRingSize = JsonUtil.getNumberAsLong(rawLoadBalancingPolicyConfig, "maxRingSize"); + if (minRingSize == null || maxRingSize == null) { + return ConfigOrError.fromError(Status.INVALID_ARGUMENT.withDescription( + "Missing 'mingRingSize'/'maxRingSize'")); + } + if (minRingSize <= 0 || maxRingSize <= 0 || minRingSize > maxRingSize) { + return ConfigOrError.fromError(Status.INVALID_ARGUMENT.withDescription( + "Invalid 'mingRingSize'/'maxRingSize'")); + } + return ConfigOrError.fromConfig(new RingHashConfig(minRingSize, maxRingSize)); + } +} diff --git a/xds/src/main/resources/META-INF/services/io.grpc.LoadBalancerProvider b/xds/src/main/resources/META-INF/services/io.grpc.LoadBalancerProvider index 6929006c103..7ba3dcf22f5 100644 --- a/xds/src/main/resources/META-INF/services/io.grpc.LoadBalancerProvider +++ b/xds/src/main/resources/META-INF/services/io.grpc.LoadBalancerProvider @@ -4,3 +4,4 @@ io.grpc.xds.WeightedTargetLoadBalancerProvider io.grpc.xds.ClusterManagerLoadBalancerProvider io.grpc.xds.ClusterResolverLoadBalancerProvider io.grpc.xds.ClusterImplLoadBalancerProvider +io.grpc.xds.RingHashLoadBalancerProvider diff --git a/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerProviderTest.java b/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerProviderTest.java new file mode 100644 index 00000000000..2d7eb4fd59f --- /dev/null +++ b/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerProviderTest.java @@ -0,0 +1,121 @@ +/* + * Copyright 2021 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds; + +import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import io.grpc.InternalServiceProviders; +import io.grpc.LoadBalancer.Helper; +import io.grpc.LoadBalancerProvider; +import io.grpc.NameResolver.ConfigOrError; +import io.grpc.Status.Code; +import io.grpc.SynchronizationContext; +import io.grpc.internal.JsonParser; +import io.grpc.xds.RingHashLoadBalancer.RingHashConfig; +import java.io.IOException; +import java.lang.Thread.UncaughtExceptionHandler; +import java.util.Map; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Unit tests for {@link RingHashLoadBalancerProvider}. */ +@RunWith(JUnit4.class) +public class RingHashLoadBalancerProviderTest { + private static final String AUTHORITY = "foo.googleapis.com"; + + private final SynchronizationContext syncContext = new SynchronizationContext( + new UncaughtExceptionHandler() { + @Override + public void uncaughtException(Thread t, Throwable e) { + throw new AssertionError(e); + } + }); + private final RingHashLoadBalancerProvider provider = new RingHashLoadBalancerProvider(); + + @Test + public void provided() { + for (LoadBalancerProvider current : InternalServiceProviders.getCandidatesViaServiceLoader( + LoadBalancerProvider.class, getClass().getClassLoader())) { + if (current instanceof RingHashLoadBalancerProvider) { + return; + } + } + fail("RingHashLoadBalancerProvider not registered"); + } + + @Test + public void providesLoadBalancer() { + Helper helper = mock(Helper.class); + when(helper.getSynchronizationContext()).thenReturn(syncContext); + when(helper.getAuthority()).thenReturn(AUTHORITY); + assertThat(provider.newLoadBalancer(helper)) + .isInstanceOf(RingHashLoadBalancer.class); + } + + @Test + public void parseLoadBalancingConfig_valid() throws IOException { + String lbConfig = "{\"minRingSize\" : 10, \"maxRingSize\" : 100}"; + ConfigOrError configOrError = + provider.parseLoadBalancingPolicyConfig(parseJsonObject(lbConfig)); + assertThat(configOrError.getConfig()).isNotNull(); + RingHashConfig config = (RingHashConfig) configOrError.getConfig(); + assertThat(config.minRingSize).isEqualTo(10L); + assertThat(config.maxRingSize).isEqualTo(100L); + } + + @Test + public void parseLoadBalancingConfig_missingRingSize() throws IOException { + String lbConfig = "{\"minRingSize\" : 10}"; + ConfigOrError configOrError = + provider.parseLoadBalancingPolicyConfig(parseJsonObject(lbConfig)); + assertThat(configOrError.getError()).isNotNull(); + assertThat(configOrError.getError().getCode()).isEqualTo(Code.INVALID_ARGUMENT); + assertThat(configOrError.getError().getDescription()) + .isEqualTo("Missing 'mingRingSize'/'maxRingSize'"); + } + + @Test + public void parseLoadBalancingConfig_zeroMinRingSize() throws IOException { + String lbConfig = "{\"minRingSize\" : 0, \"maxRingSize\" : 100}"; + ConfigOrError configOrError = + provider.parseLoadBalancingPolicyConfig(parseJsonObject(lbConfig)); + assertThat(configOrError.getError()).isNotNull(); + assertThat(configOrError.getError().getCode()).isEqualTo(Code.INVALID_ARGUMENT); + assertThat(configOrError.getError().getDescription()) + .isEqualTo("Invalid 'mingRingSize'/'maxRingSize'"); + } + + @Test + public void parseLoadBalancingConfig_minRingSizeGreaterThanMaxRingSize() throws IOException { + String lbConfig = "{\"minRingSize\" : 100, \"maxRingSize\" : 10}"; + ConfigOrError configOrError = + provider.parseLoadBalancingPolicyConfig(parseJsonObject(lbConfig)); + assertThat(configOrError.getError()).isNotNull(); + assertThat(configOrError.getError().getCode()).isEqualTo(Code.INVALID_ARGUMENT); + assertThat(configOrError.getError().getDescription()) + .isEqualTo("Invalid 'mingRingSize'/'maxRingSize'"); + } + + @SuppressWarnings("unchecked") + private static Map parseJsonObject(String json) throws IOException { + return (Map) JsonParser.parse(json); + } +} diff --git a/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java new file mode 100644 index 00000000000..b85b98a089e --- /dev/null +++ b/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java @@ -0,0 +1,728 @@ +/* + * Copyright 2021 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds; + +import static com.google.common.truth.Truth.assertThat; +import static io.grpc.ConnectivityState.CONNECTING; +import static io.grpc.ConnectivityState.IDLE; +import static io.grpc.ConnectivityState.READY; +import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +import com.google.common.collect.Iterables; +import io.grpc.Attributes; +import io.grpc.CallOptions; +import io.grpc.ConnectivityStateInfo; +import io.grpc.EquivalentAddressGroup; +import io.grpc.LoadBalancer.CreateSubchannelArgs; +import io.grpc.LoadBalancer.Helper; +import io.grpc.LoadBalancer.PickResult; +import io.grpc.LoadBalancer.PickSubchannelArgs; +import io.grpc.LoadBalancer.ResolvedAddresses; +import io.grpc.LoadBalancer.Subchannel; +import io.grpc.LoadBalancer.SubchannelPicker; +import io.grpc.LoadBalancer.SubchannelStateListener; +import io.grpc.Metadata; +import io.grpc.Status; +import io.grpc.Status.Code; +import io.grpc.SynchronizationContext; +import io.grpc.internal.PickSubchannelArgsImpl; +import io.grpc.testing.TestMethodDescriptors; +import io.grpc.xds.RingHashLoadBalancer.RingHashConfig; +import java.lang.Thread.UncaughtExceptionHandler; +import java.net.SocketAddress; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.InOrder; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; +import org.mockito.stubbing.Answer; + +/** Unit test for {@link io.grpc.LoadBalancer}. */ +@RunWith(JUnit4.class) +public class RingHashLoadBalancerTest { + private static final String AUTHORITY = "foo.googleapis.com"; + private static final Attributes.Key CUSTOM_KEY = Attributes.Key.create("custom-key"); + + @Rule + public final MockitoRule mocks = MockitoJUnit.rule(); + private final SynchronizationContext syncContext = new SynchronizationContext( + new UncaughtExceptionHandler() { + @Override + public void uncaughtException(Thread t, Throwable e) { + throw new AssertionError(e); + } + }); + private final Map, Subchannel> subchannels = new HashMap<>(); + private final Map subchannelStateListeners = + new HashMap<>(); + private final XxHash64 hashFunc = XxHash64.INSTANCE; + @Mock + private Helper helper; + @Captor + private ArgumentCaptor pickerCaptor; + private RingHashLoadBalancer loadBalancer; + + @Before + public void setUp() { + when(helper.getAuthority()).thenReturn(AUTHORITY); + when(helper.getSynchronizationContext()).thenReturn(syncContext); + when(helper.createSubchannel(any(CreateSubchannelArgs.class))).thenAnswer( + new Answer() { + @Override + public Subchannel answer(InvocationOnMock invocation) throws Throwable { + CreateSubchannelArgs args = (CreateSubchannelArgs) invocation.getArguments()[0]; + final Subchannel subchannel = mock(Subchannel.class); + when(subchannel.getAllAddresses()).thenReturn(args.getAddresses()); + when(subchannel.getAttributes()).thenReturn(args.getAttributes()); + subchannels.put(args.getAddresses(), subchannel); + doAnswer(new Answer() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + subchannelStateListeners.put( + subchannel, (SubchannelStateListener) invocation.getArguments()[0]); + return null; + } + }).when(subchannel).start(any(SubchannelStateListener.class)); + return subchannel; + } + }); + loadBalancer = new RingHashLoadBalancer(helper); + // Skip uninterested interactions. + verify(helper).getAuthority(); + verify(helper).getSynchronizationContext(); + } + + @After + public void tearDown() { + loadBalancer.shutdown(); + for (Subchannel subchannel : subchannels.values()) { + verify(subchannel).shutdown(); + } + } + + @Test + public void subchannelLazyConnectUntilPicked() { + RingHashConfig config = new RingHashConfig(10, 100); + List servers = createWeightedServerAddrs(1); // one server + loadBalancer.handleResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(servers).setLoadBalancingPolicyConfig(config).build()); + verify(helper).createSubchannel(any(CreateSubchannelArgs.class)); + Subchannel subchannel = Iterables.getOnlyElement(subchannels.values()); + verify(subchannel, never()).requestConnection(); + verify(helper).updateBalancingState(eq(IDLE), pickerCaptor.capture()); + + // Picking subchannel triggers connection. + PickSubchannelArgs args = new PickSubchannelArgsImpl( + TestMethodDescriptors.voidMethod(), new Metadata(), + CallOptions.DEFAULT.withOption(XdsNameResolver.RPC_HASH_KEY, hashFunc.hashVoid())); + PickResult result = pickerCaptor.getValue().pickSubchannel(args); + assertThat(result.getStatus().isOk()).isTrue(); + assertThat(result.getSubchannel()).isNull(); + verify(subchannel).requestConnection(); + deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(CONNECTING)); + verify(helper).updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class)); + + // Subchannel becomes ready, triggers pick again. + deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY)); + verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); + result = pickerCaptor.getValue().pickSubchannel(args); + assertThat(result.getSubchannel()).isSameInstanceAs(subchannel); + verifyNoMoreInteractions(helper); + } + + @Test + public void subchannelNotAutoReconnectAfterReenteringIdle() { + RingHashConfig config = new RingHashConfig(10, 100); + List servers = createWeightedServerAddrs(1); // one server + loadBalancer.handleResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(servers).setLoadBalancingPolicyConfig(config).build()); + Subchannel subchannel = Iterables.getOnlyElement(subchannels.values()); + InOrder inOrder = Mockito.inOrder(helper, subchannel); + inOrder.verify(helper).updateBalancingState(eq(IDLE), pickerCaptor.capture()); + inOrder.verify(subchannel, never()).requestConnection(); + + // Picking subchannel triggers connection. + PickSubchannelArgs args = new PickSubchannelArgsImpl( + TestMethodDescriptors.voidMethod(), new Metadata(), + CallOptions.DEFAULT.withOption(XdsNameResolver.RPC_HASH_KEY, hashFunc.hashVoid())); + pickerCaptor.getValue().pickSubchannel(args); + inOrder.verify(subchannel).requestConnection(); + deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY)); + inOrder.verify(helper).updateBalancingState(eq(READY), any(SubchannelPicker.class)); + deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(IDLE)); + inOrder.verify(helper).updateBalancingState(eq(IDLE), pickerCaptor.capture()); + inOrder.verify(subchannel, never()).requestConnection(); + + // Picking again triggers reconnection. + pickerCaptor.getValue().pickSubchannel(args); + inOrder.verify(subchannel).requestConnection(); + } + + @Test + public void aggregateSubchannelStates_connectingReadyIdleFailure() { + RingHashConfig config = new RingHashConfig(10, 100); + List servers = createWeightedServerAddrs(1, 1); + InOrder inOrder = Mockito.inOrder(helper); + loadBalancer.handleResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(servers).setLoadBalancingPolicyConfig(config).build()); + inOrder.verify(helper, times(2)).createSubchannel(any(CreateSubchannelArgs.class)); + inOrder.verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class)); + + // one in CONNECTING, one in IDLE + deliverSubchannelState( + subchannels.get(Collections.singletonList(servers.get(0))), + ConnectivityStateInfo.forNonError(CONNECTING)); + inOrder.verify(helper).updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class)); + + // two in CONNECTING + deliverSubchannelState( + subchannels.get(Collections.singletonList(servers.get(1))), + ConnectivityStateInfo.forNonError(CONNECTING)); + inOrder.verify(helper).updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class)); + + // one in CONNECTING, one in READY + deliverSubchannelState( + subchannels.get(Collections.singletonList(servers.get(1))), + ConnectivityStateInfo.forNonError(READY)); + inOrder.verify(helper).updateBalancingState(eq(READY), any(SubchannelPicker.class)); + + // one in TRANSIENT_FAILURE, one in READY + deliverSubchannelState( + subchannels.get(Collections.singletonList(servers.get(0))), + ConnectivityStateInfo.forTransientFailure( + Status.UNKNOWN.withDescription("unknown failure"))); + inOrder.verify(helper).updateBalancingState(eq(READY), any(SubchannelPicker.class)); + + // one in TRANSIENT_FAILURE, one in IDLE + deliverSubchannelState( + subchannels.get(Collections.singletonList(servers.get(1))), + ConnectivityStateInfo.forNonError(IDLE)); + inOrder.verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class)); + + verifyNoMoreInteractions(helper); + } + + @Test + public void aggregateSubchannelStates_twoOrMoreSubchannelsInTransientFailure() { + RingHashConfig config = new RingHashConfig(10, 100); + List servers = createWeightedServerAddrs(1, 1, 1, 1); + InOrder inOrder = Mockito.inOrder(helper); + loadBalancer.handleResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(servers).setLoadBalancingPolicyConfig(config).build()); + inOrder.verify(helper, times(4)).createSubchannel(any(CreateSubchannelArgs.class)); + inOrder.verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class)); + + // one in TRANSIENT_FAILURE, three in IDLE + deliverSubchannelState( + subchannels.get(Collections.singletonList(servers.get(0))), + ConnectivityStateInfo.forTransientFailure( + Status.UNAVAILABLE.withDescription("not found"))); + inOrder.verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class)); + + // two in TRANSIENT_FAILURE, two in IDLE + deliverSubchannelState( + subchannels.get(Collections.singletonList(servers.get(1))), + ConnectivityStateInfo.forTransientFailure( + Status.UNAVAILABLE.withDescription("also not found"))); + inOrder.verify(helper) + .updateBalancingState(eq(TRANSIENT_FAILURE), any(SubchannelPicker.class)); + + // two in TRANSIENT_FAILURE, one in CONNECTING, one in IDLE + // The overall state is dominated by the two in TRANSIENT_FAILURE. + deliverSubchannelState( + subchannels.get(Collections.singletonList(servers.get(2))), + ConnectivityStateInfo.forNonError(CONNECTING)); + inOrder.verify(helper) + .updateBalancingState(eq(TRANSIENT_FAILURE), any(SubchannelPicker.class)); + + // three in TRANSIENT_FAILURE, one in CONNECTING + deliverSubchannelState( + subchannels.get(Collections.singletonList(servers.get(3))), + ConnectivityStateInfo.forTransientFailure( + Status.UNAVAILABLE.withDescription("connection lost"))); + inOrder.verify(helper) + .updateBalancingState(eq(TRANSIENT_FAILURE), any(SubchannelPicker.class)); + + // three in TRANSIENT_FAILURE, one in READY + deliverSubchannelState( + subchannels.get(Collections.singletonList(servers.get(2))), + ConnectivityStateInfo.forNonError(READY)); + inOrder.verify(helper).updateBalancingState(eq(READY), any(SubchannelPicker.class)); + + verifyNoMoreInteractions(helper); + } + + @Test + public void subchannelStayInTransientFailureUntilBecomeReady() { + RingHashConfig config = new RingHashConfig(10, 100); + List servers = createWeightedServerAddrs(1, 1, 1); + loadBalancer.handleResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(servers).setLoadBalancingPolicyConfig(config).build()); + verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class)); + verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class)); + reset(helper); + + // Simulate picks have taken place and subchannels have requested connection. + for (Subchannel subchannel : subchannels.values()) { + deliverSubchannelState(subchannel, ConnectivityStateInfo.forTransientFailure( + Status.UNAUTHENTICATED.withDescription("Permission denied"))); + } + + // Stays in IDLE when until there are two or more subchannels in TRANSIENT_FAILURE. + verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class)); + verify(helper, times(2)) + .updateBalancingState(eq(TRANSIENT_FAILURE), any(SubchannelPicker.class)); + + verifyNoMoreInteractions(helper); + // Simulate underlying subchannel auto reconnect after backoff. + for (Subchannel subchannel : subchannels.values()) { + deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(CONNECTING)); + } + verifyNoMoreInteractions(helper); + + // Simulate one subchannel enters READY. + deliverSubchannelState( + subchannels.values().iterator().next(), ConnectivityStateInfo.forNonError(READY)); + verify(helper).updateBalancingState(eq(READY), any(SubchannelPicker.class)); + } + + @Test + public void deterministicPickWithHostsPartiallyRemoved() { + RingHashConfig config = new RingHashConfig(10, 100); + List servers = createWeightedServerAddrs(1, 1, 1, 1, 1); + loadBalancer.handleResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(servers).setLoadBalancingPolicyConfig(config).build()); + InOrder inOrder = Mockito.inOrder(helper); + inOrder.verify(helper, times(5)).createSubchannel(any(CreateSubchannelArgs.class)); + inOrder.verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class)); + + // Bring all subchannels to READY so that next pick always succeeds. + for (Subchannel subchannel : subchannels.values()) { + deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY)); + inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); + } + + // Simulate rpc hash hits one ring entry exactly for server1. + long rpcHash = hashFunc.hashAsciiString("[FakeSocketAddress-server1]_0"); + PickSubchannelArgs args = new PickSubchannelArgsImpl( + TestMethodDescriptors.voidMethod(), new Metadata(), + CallOptions.DEFAULT.withOption(XdsNameResolver.RPC_HASH_KEY, rpcHash)); + pickerCaptor.getValue().pickSubchannel(args); + PickResult result = pickerCaptor.getValue().pickSubchannel(args); + Subchannel subchannel = result.getSubchannel(); + assertThat(subchannel.getAddresses()).isEqualTo(servers.get(1)); + + List updatedServers = new ArrayList<>(); + for (EquivalentAddressGroup addr : servers.subList(0, 2)) { // only server0 and server1 left + Attributes attr = addr.getAttributes().toBuilder().set(CUSTOM_KEY, "custom value").build(); + updatedServers.add(new EquivalentAddressGroup(addr.getAddresses(), attr)); + } + loadBalancer.handleResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(updatedServers).setLoadBalancingPolicyConfig(config).build()); + verify(subchannels.get(Collections.singletonList(servers.get(0)))) + .updateAddresses(Collections.singletonList(updatedServers.get(0))); + verify(subchannels.get(Collections.singletonList(servers.get(1)))) + .updateAddresses(Collections.singletonList(updatedServers.get(1))); + inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); + assertThat(pickerCaptor.getValue().pickSubchannel(args).getSubchannel()) + .isSameInstanceAs(subchannel); + verifyNoMoreInteractions(helper); + } + + @Test + public void deterministicPickWithNewHostsAdded() { + RingHashConfig config = new RingHashConfig(10, 100); + List servers = createWeightedServerAddrs(1, 1); // server0 and server1 + loadBalancer.handleResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(servers).setLoadBalancingPolicyConfig(config).build()); + InOrder inOrder = Mockito.inOrder(helper); + inOrder.verify(helper, times(2)).createSubchannel(any(CreateSubchannelArgs.class)); + inOrder.verify(helper).updateBalancingState(eq(IDLE), pickerCaptor.capture()); + + // Bring all subchannels to READY so that next pick always succeeds. + for (Subchannel subchannel : subchannels.values()) { + deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY)); + inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); + } + + // Simulate rpc hash hits one ring entry exactly for server1. + long rpcHash = hashFunc.hashAsciiString("[FakeSocketAddress-server1]_0"); + PickSubchannelArgs args = new PickSubchannelArgsImpl( + TestMethodDescriptors.voidMethod(), new Metadata(), + CallOptions.DEFAULT.withOption(XdsNameResolver.RPC_HASH_KEY, rpcHash)); + pickerCaptor.getValue().pickSubchannel(args); + PickResult result = pickerCaptor.getValue().pickSubchannel(args); + Subchannel subchannel = result.getSubchannel(); + assertThat(subchannel.getAddresses()).isEqualTo(servers.get(1)); + + servers = createWeightedServerAddrs(1, 1, 1, 1, 1); // server2, server3, server4 added + loadBalancer.handleResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(servers).setLoadBalancingPolicyConfig(config).build()); + inOrder.verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class)); + inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); + assertThat(pickerCaptor.getValue().pickSubchannel(args).getSubchannel()) + .isSameInstanceAs(subchannel); + verifyNoMoreInteractions(helper); + } + + @Test + public void skipFailingHosts_pickNextNonFailingHostInFirstTwoHosts() { + // Map each server address to exactly one ring entry. + RingHashConfig config = new RingHashConfig(3, 3); + List servers = createWeightedServerAddrs(1, 1, 1); + loadBalancer.handleResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(servers).setLoadBalancingPolicyConfig(config).build()); + verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class)); + verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class)); // initial IDLE + reset(helper); + // ring: + // "[FakeSocketAddress-server1]_0" + // "[FakeSocketAddress-server0]_0" + // "[FakeSocketAddress-server2]_0" + + long rpcHash = hashFunc.hashAsciiString("[FakeSocketAddress-server0]_0"); + PickSubchannelArgs args = new PickSubchannelArgsImpl( + TestMethodDescriptors.voidMethod(), new Metadata(), + CallOptions.DEFAULT.withOption(XdsNameResolver.RPC_HASH_KEY, rpcHash)); + + // Bring down server0 to force trying server2. + deliverSubchannelState( + subchannels.get(Collections.singletonList(servers.get(0))), + ConnectivityStateInfo.forTransientFailure( + Status.UNAVAILABLE.withDescription("unreachable"))); + verify(helper).updateBalancingState(eq(IDLE), pickerCaptor.capture()); + + PickResult result = pickerCaptor.getValue().pickSubchannel(args); + assertThat(result.getStatus().isOk()).isTrue(); + assertThat(result.getSubchannel()).isNull(); // buffer request + verify(subchannels.get(Collections.singletonList(servers.get(2)))) + .requestConnection(); // kick off connection to server2 + + deliverSubchannelState( + subchannels.get(Collections.singletonList(servers.get(2))), + ConnectivityStateInfo.forNonError(CONNECTING)); + verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); + + result = pickerCaptor.getValue().pickSubchannel(args); + assertThat(result.getStatus().isOk()).isTrue(); + assertThat(result.getSubchannel()).isNull(); // buffer request + + deliverSubchannelState( + subchannels.get(Collections.singletonList(servers.get(2))), + ConnectivityStateInfo.forNonError(READY)); + verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); + + result = pickerCaptor.getValue().pickSubchannel(args); + assertThat(result.getStatus().isOk()).isTrue(); + assertThat(result.getSubchannel().getAddresses()).isEqualTo(servers.get(2)); + } + + @Test + public void skipFailingHosts_firstTwoHostsFailed_pickNextFirstReady() { + // Map each server address to exactly one ring entry. + RingHashConfig config = new RingHashConfig(4, 4); + List servers = createWeightedServerAddrs(1, 1, 1, 1); + loadBalancer.handleResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(servers).setLoadBalancingPolicyConfig(config).build()); + verify(helper, times(4)).createSubchannel(any(CreateSubchannelArgs.class)); + verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class)); // initial IDLE + reset(helper); + // ring: + // "[FakeSocketAddress-server3]_0" + // "[FakeSocketAddress-server1]_0" + // "[FakeSocketAddress-server0]_0" + // "[FakeSocketAddress-server2]_0" + + long rpcHash = hashFunc.hashAsciiString("[FakeSocketAddress-server0]_0"); + PickSubchannelArgs args = new PickSubchannelArgsImpl( + TestMethodDescriptors.voidMethod(), new Metadata(), + CallOptions.DEFAULT.withOption(XdsNameResolver.RPC_HASH_KEY, rpcHash)); + + // Bring down server0 and server2 to force trying other servers. + deliverSubchannelState( + subchannels.get(Collections.singletonList(servers.get(0))), + ConnectivityStateInfo.forTransientFailure( + Status.UNAVAILABLE.withDescription("unreachable"))); + deliverSubchannelState( + subchannels.get(Collections.singletonList(servers.get(2))), + ConnectivityStateInfo.forTransientFailure( + Status.PERMISSION_DENIED.withDescription("permission denied"))); + verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); + + PickResult result = pickerCaptor.getValue().pickSubchannel(args); + assertThat(result.getStatus().isOk()).isFalse(); // fail the RPC + assertThat(result.getStatus().getCode()) + .isEqualTo(Code.UNAVAILABLE); // with error status for the original server hit by hash + assertThat(result.getStatus().getDescription()).isEqualTo("unreachable"); + verify(subchannels.get(Collections.singletonList(servers.get(3)))) + .requestConnection(); // kickoff connection to server3 (next first non-failing) + verify(subchannels.get(Collections.singletonList(servers.get(1))), never()) + .requestConnection(); // no excessive connection + + // Now connecting to server3. + deliverSubchannelState( + subchannels.get(Collections.singletonList(servers.get(3))), + ConnectivityStateInfo.forNonError(CONNECTING)); + verify(helper, times(2)).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); + + result = pickerCaptor.getValue().pickSubchannel(args); + assertThat(result.getStatus().isOk()).isFalse(); // fail the RPC + assertThat(result.getStatus().getCode()) + .isEqualTo(Code.UNAVAILABLE); // with error status for the original server hit by hash + assertThat(result.getStatus().getDescription()).isEqualTo("unreachable"); + verify(subchannels.get(Collections.singletonList(servers.get(1))), never()) + .requestConnection(); // no excessive connection (server3 connection already in progress) + + // Simulate server1 becomes READY. + deliverSubchannelState( + subchannels.get(Collections.singletonList(servers.get(1))), + ConnectivityStateInfo.forNonError(READY)); + verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); + + result = pickerCaptor.getValue().pickSubchannel(args); + assertThat(result.getStatus().isOk()).isTrue(); // succeed + assertThat(result.getSubchannel().getAddresses()).isEqualTo(servers.get(1)); // with server1 + } + + @Test + public void allSubchannelsInTransientFailure() { + // Map each server address to exactly one ring entry. + RingHashConfig config = new RingHashConfig(3, 3); + List servers = createWeightedServerAddrs(1, 1, 1); + loadBalancer.handleResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(servers).setLoadBalancingPolicyConfig(config).build()); + verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class)); + verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class)); + + // Bring all subchannels to TRANSIENT_FAILURE. + for (Subchannel subchannel : subchannels.values()) { + deliverSubchannelState(subchannel, ConnectivityStateInfo.forTransientFailure( + Status.UNAVAILABLE.withDescription( + subchannel.getAddresses().getAddresses() + " unreachable"))); + } + verify(helper, atLeastOnce()) + .updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); + + // Picking subchannel triggers connection. RPC hash hits server0. + PickSubchannelArgs args = new PickSubchannelArgsImpl( + TestMethodDescriptors.voidMethod(), new Metadata(), + CallOptions.DEFAULT.withOption(XdsNameResolver.RPC_HASH_KEY, hashFunc.hashVoid())); + PickResult result = pickerCaptor.getValue().pickSubchannel(args); + assertThat(result.getStatus().isOk()).isFalse(); + assertThat(result.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE); + assertThat(result.getStatus().getDescription()) + .isEqualTo("[FakeSocketAddress-server0] unreachable"); + } + + @Test + public void hostSelectionProportionalToWeights() { + RingHashConfig config = new RingHashConfig(100000, 1000000); // large ring + List servers = createWeightedServerAddrs(1, 10, 100); // 1:10:100 + loadBalancer.handleResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(servers).setLoadBalancingPolicyConfig(config).build()); + verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class)); + verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class)); + + // Bring all subchannels to READY. + Map pickCounts = new HashMap<>(); + for (Subchannel subchannel : subchannels.values()) { + deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY)); + pickCounts.put(subchannel.getAddresses(), 0); + } + verify(helper, times(3)).updateBalancingState(eq(READY), pickerCaptor.capture()); + SubchannelPicker picker = pickerCaptor.getValue(); + + for (int i = 0; i < 10000; i++) { + long hash = hashFunc.hashInt(i); + PickSubchannelArgs args = new PickSubchannelArgsImpl( + TestMethodDescriptors.voidMethod(), new Metadata(), + CallOptions.DEFAULT.withOption(XdsNameResolver.RPC_HASH_KEY, hash)); + Subchannel pickedSubchannel = picker.pickSubchannel(args).getSubchannel(); + EquivalentAddressGroup addr = pickedSubchannel.getAddresses(); + pickCounts.put(addr, pickCounts.get(addr) + 1); + } + + // Actual distribution: server0 = 91, server1 = 866, server2 = 9043 (~0.5% tolerance) + double ratio01 = (double) pickCounts.get(servers.get(0)) / pickCounts.get(servers.get(1)); + double ratio12 = (double) pickCounts.get(servers.get(1)) / pickCounts.get(servers.get(2)); + assertThat(ratio01).isWithin(0.01).of((double) 1 / 10); + assertThat(ratio12).isWithin(0.01).of((double) 10 / 100); + } + + @Test + public void hostSelectionProportionalToRepeatedAddressCount() { + RingHashConfig config = new RingHashConfig(100000, 100000); + List servers = createRepeatedServerAddrs(1, 10, 100); // 1:10:100 + loadBalancer.handleResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(servers).setLoadBalancingPolicyConfig(config).build()); + verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class)); + verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class)); + + // Bring all subchannels to READY. + Map pickCounts = new HashMap<>(); + for (Subchannel subchannel : subchannels.values()) { + deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY)); + pickCounts.put(subchannel.getAddresses(), 0); + } + verify(helper, times(3)).updateBalancingState(eq(READY), pickerCaptor.capture()); + SubchannelPicker picker = pickerCaptor.getValue(); + + for (int i = 0; i < 10000; i++) { + long hash = hashFunc.hashInt(i); + PickSubchannelArgs args = new PickSubchannelArgsImpl( + TestMethodDescriptors.voidMethod(), new Metadata(), + CallOptions.DEFAULT.withOption(XdsNameResolver.RPC_HASH_KEY, hash)); + Subchannel pickedSubchannel = picker.pickSubchannel(args).getSubchannel(); + EquivalentAddressGroup addr = pickedSubchannel.getAddresses(); + pickCounts.put(addr, pickCounts.get(addr) + 1); + } + + // Actual distribution: server0 = 0, server1 = 90, server2 = 9910 + double ratio01 = (double) pickCounts.get(servers.get(0)) / pickCounts.get(servers.get(1)); + double ratio12 = (double) pickCounts.get(servers.get(1)) / pickCounts.get(servers.get(11)); + assertThat(ratio01).isWithin(0.01).of((double) 1 / 10); + assertThat(ratio12).isWithin(0.01).of((double) 10 / 100); + } + + @Test + public void nameResolutionErrorWithNoActiveSubchannels() { + Status error = Status.UNAVAILABLE.withDescription("not reachable"); + loadBalancer.handleNameResolutionError(error); + verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); + PickResult result = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class)); + assertThat(result.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE); + assertThat(result.getStatus().getDescription()).isEqualTo("not reachable"); + assertThat(result.getSubchannel()).isNull(); + verifyNoMoreInteractions(helper); + } + + @Test + public void nameResolutionErrorWithActiveSubchannels() { + RingHashConfig config = new RingHashConfig(10, 100); + List servers = createWeightedServerAddrs(1); + loadBalancer.handleResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(servers).setLoadBalancingPolicyConfig(config).build()); + verify(helper).createSubchannel(any(CreateSubchannelArgs.class)); + verify(helper).updateBalancingState(eq(IDLE), pickerCaptor.capture()); + + // Picking subchannel triggers subchannel creation and connection. + PickSubchannelArgs args = new PickSubchannelArgsImpl( + TestMethodDescriptors.voidMethod(), new Metadata(), + CallOptions.DEFAULT.withOption(XdsNameResolver.RPC_HASH_KEY, hashFunc.hashVoid())); + pickerCaptor.getValue().pickSubchannel(args); + deliverSubchannelState( + Iterables.getOnlyElement(subchannels.values()), ConnectivityStateInfo.forNonError(READY)); + verify(helper).updateBalancingState(eq(READY), any(SubchannelPicker.class)); + + loadBalancer.handleNameResolutionError(Status.NOT_FOUND.withDescription("target not found")); + verifyNoMoreInteractions(helper); + } + + private void deliverSubchannelState(Subchannel subchannel, ConnectivityStateInfo state) { + subchannelStateListeners.get(subchannel).onSubchannelState(state); + } + + private static List createWeightedServerAddrs(long... weights) { + List addrs = new ArrayList<>(); + for (int i = 0; i < weights.length; i++) { + SocketAddress addr = new FakeSocketAddress("server" + i); + Attributes attr = Attributes.newBuilder().set( + InternalXdsAttributes.ATTR_SERVER_WEIGHT, weights[i]).build(); + EquivalentAddressGroup eag = new EquivalentAddressGroup(addr, attr); + addrs.add(eag); + } + return addrs; + } + + private static List createRepeatedServerAddrs(long... weights) { + List addrs = new ArrayList<>(); + for (int i = 0; i < weights.length; i++) { + SocketAddress addr = new FakeSocketAddress("server" + i); + for (int j = 0; j < weights[i]; j++) { + EquivalentAddressGroup eag = new EquivalentAddressGroup(addr); + addrs.add(eag); + } + } + return addrs; + } + + private static class FakeSocketAddress extends SocketAddress { + private final String name; + + FakeSocketAddress(String name) { + this.name = name; + } + + @Override + public int hashCode() { + return name.hashCode(); + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof FakeSocketAddress)) { + return false; + } + return name.equals(((FakeSocketAddress) other).name); + } + + @Override + public String toString() { + return "FakeSocketAddress-" + name; + } + } +} From c113ba10301ba88ac5c11c3344d6266777159641 Mon Sep 17 00:00:00 2001 From: Sergii Tkachenko Date: Thu, 8 Apr 2021 19:53:34 -0400 Subject: [PATCH 08/10] buildscript: add xds-k8s cluster endpoint override Missed this in 1b86618ce9db1a6bc2e0ec5fac1b28eb716fc47d --- buildscripts/kokoro/xds-k8s.sh | 1 + 1 file changed, 1 insertion(+) diff --git a/buildscripts/kokoro/xds-k8s.sh b/buildscripts/kokoro/xds-k8s.sh index 44f9d77425c..bc0da15ba81 100755 --- a/buildscripts/kokoro/xds-k8s.sh +++ b/buildscripts/kokoro/xds-k8s.sh @@ -6,6 +6,7 @@ readonly GITHUB_REPOSITORY_NAME="grpc-java" # GKE Cluster readonly GKE_CLUSTER_NAME="interop-test-psm-sec-testing-api" readonly GKE_CLUSTER_ZONE="us-west1-b" +export CLOUDSDK_API_ENDPOINT_OVERRIDES_CONTAINER="https://test-container.sandbox.googleapis.com/" ## xDS test server/client Docker images readonly SERVER_IMAGE_NAME="gcr.io/grpc-testing/xds-interop/java-server" readonly CLIENT_IMAGE_NAME="gcr.io/grpc-testing/xds-interop/java-client" From 278a336d1fe54547e2848924ccb9df7251c904f9 Mon Sep 17 00:00:00 2001 From: Sergii Tkachenko Date: Thu, 8 Apr 2021 21:44:26 -0400 Subject: [PATCH 09/10] buildscript: xds-k8s increase build timeout --- buildscripts/kokoro/xds-k8s.cfg | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/buildscripts/kokoro/xds-k8s.cfg b/buildscripts/kokoro/xds-k8s.cfg index 61fe825d8ad..09a8e705a4d 100644 --- a/buildscripts/kokoro/xds-k8s.cfg +++ b/buildscripts/kokoro/xds-k8s.cfg @@ -2,7 +2,7 @@ # Location of the continuous shell script in repository. build_file: "grpc-java/buildscripts/kokoro/xds-k8s.sh" -timeout_mins: 90 +timeout_mins: 120 action { define_artifacts { From d4fa0ecc07495097453b0a2848765f076b9e714c Mon Sep 17 00:00:00 2001 From: Chengyuan Zhang Date: Mon, 12 Apr 2021 14:55:31 -0700 Subject: [PATCH 10/10] xds: reduce the size of ring for testing pick distributions (#8079) In the ring hash LB policy, building the ring is computationally heavy. Although using a larger ring can make the RPC distribution closer to the actual weights of hosts, it takes long time to finish the test. Internally, each test class is expected to finish within 1 minute, while each of the test cases for testing pick distribution takes about 30 sec. By reducing the ring size by a factor of 10, the time spent for those test cases reduce to 1-2 seconds. Now we need larger tolerance for the distribution (three hosts with weights 1:10:100): - With a ring size of 100000, the 10000 RPCs distribution is close to 91 : 866 : 9043 - With a ring size of 10000, the 10000 RPCs distribution is close to 104 : 808 : 9088 Roughly, this is still acceptable. --- .../io/grpc/xds/RingHashLoadBalancerTest.java | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java index b85b98a089e..6b70e5974df 100644 --- a/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java @@ -567,7 +567,7 @@ public void allSubchannelsInTransientFailure() { @Test public void hostSelectionProportionalToWeights() { - RingHashConfig config = new RingHashConfig(100000, 1000000); // large ring + RingHashConfig config = new RingHashConfig(10000, 100000); // large ring List servers = createWeightedServerAddrs(1, 10, 100); // 1:10:100 loadBalancer.handleResolvedAddresses( ResolvedAddresses.newBuilder() @@ -594,16 +594,16 @@ public void hostSelectionProportionalToWeights() { pickCounts.put(addr, pickCounts.get(addr) + 1); } - // Actual distribution: server0 = 91, server1 = 866, server2 = 9043 (~0.5% tolerance) + // Actual distribution: server0 = 104, server1 = 808, server2 = 9088 double ratio01 = (double) pickCounts.get(servers.get(0)) / pickCounts.get(servers.get(1)); double ratio12 = (double) pickCounts.get(servers.get(1)) / pickCounts.get(servers.get(2)); - assertThat(ratio01).isWithin(0.01).of((double) 1 / 10); - assertThat(ratio12).isWithin(0.01).of((double) 10 / 100); + assertThat(ratio01).isWithin(0.03).of((double) 1 / 10); + assertThat(ratio12).isWithin(0.03).of((double) 10 / 100); } @Test public void hostSelectionProportionalToRepeatedAddressCount() { - RingHashConfig config = new RingHashConfig(100000, 100000); + RingHashConfig config = new RingHashConfig(10000, 100000); List servers = createRepeatedServerAddrs(1, 10, 100); // 1:10:100 loadBalancer.handleResolvedAddresses( ResolvedAddresses.newBuilder() @@ -630,11 +630,11 @@ public void hostSelectionProportionalToRepeatedAddressCount() { pickCounts.put(addr, pickCounts.get(addr) + 1); } - // Actual distribution: server0 = 0, server1 = 90, server2 = 9910 + // Actual distribution: server0 = 104, server1 = 808, server2 = 9088 double ratio01 = (double) pickCounts.get(servers.get(0)) / pickCounts.get(servers.get(1)); double ratio12 = (double) pickCounts.get(servers.get(1)) / pickCounts.get(servers.get(11)); - assertThat(ratio01).isWithin(0.01).of((double) 1 / 10); - assertThat(ratio12).isWithin(0.01).of((double) 10 / 100); + assertThat(ratio01).isWithin(0.03).of((double) 1 / 10); + assertThat(ratio12).isWithin(0.03).of((double) 10 / 100); } @Test