From beb3232c0a6c2873292da7d24fb9f678a7f5a650 Mon Sep 17 00:00:00 2001 From: Chengyuan Zhang Date: Fri, 6 Nov 2020 15:59:25 -0800 Subject: [PATCH] xds: immediately update picker when circuit breakers/drop policies change (#7600) Previously the EDS LB policies does not propagate an updated picker that uses the new circuit breaker threshold and drop policies when those values change. The result is new circuit breaker/drop policies are not dynamically applied to new RPCs unless subchannel state has changed. This change fixes this problem. Whenever the EDS LB policy receives an config update, the immediately updates the picker with corresponding circuit breakers and drop policies to the channel so that the channel is alway picking up the latest configuration. --- .../java/io/grpc/xds/EdsLoadBalancer2.java | 7 +++++ .../io/grpc/xds/EdsLoadBalancer2Test.java | 31 ++++++++++++++++++- 2 files changed, 37 insertions(+), 1 deletion(-) diff --git a/xds/src/main/java/io/grpc/xds/EdsLoadBalancer2.java b/xds/src/main/java/io/grpc/xds/EdsLoadBalancer2.java index bfbeb343147..5321fe5f0ca 100644 --- a/xds/src/main/java/io/grpc/xds/EdsLoadBalancer2.java +++ b/xds/src/main/java/io/grpc/xds/EdsLoadBalancer2.java @@ -20,6 +20,7 @@ import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; import static io.grpc.xds.XdsLbPolicies.LRS_POLICY_NAME; import static io.grpc.xds.XdsLbPolicies.PRIORITY_POLICY_NAME; +import static io.grpc.xds.XdsSubchannelPickers.BUFFER_PICKER; import com.google.common.annotations.VisibleForTesting; import io.grpc.Attributes; @@ -370,6 +371,8 @@ private void propagateResourceError(Status error) { private final class RequestLimitingLbHelper extends ForwardingLoadBalancerHelper { private final Helper helper; + private ConnectivityState currentState = ConnectivityState.CONNECTING; + private SubchannelPicker currentPicker = BUFFER_PICKER; private List dropPolicies = Collections.emptyList(); private long maxConcurrentRequests = DEFAULT_PER_CLUSTER_MAX_CONCURRENT_REQUESTS; @@ -380,6 +383,8 @@ private RequestLimitingLbHelper(Helper helper) { @Override public void updateBalancingState( ConnectivityState newState, final SubchannelPicker newPicker) { + currentState = newState; + currentPicker = newPicker; SubchannelPicker picker = new RequestLimitingSubchannelPicker( newPicker, dropPolicies, maxConcurrentRequests); delegate().updateBalancingState(newState, picker); @@ -392,10 +397,12 @@ protected Helper delegate() { private void updateDropPolicies(List dropOverloads) { dropPolicies = dropOverloads; + updateBalancingState(currentState, currentPicker); } private void updateMaxConcurrentRequests(long maxConcurrentRequests) { this.maxConcurrentRequests = maxConcurrentRequests; + updateBalancingState(currentState, currentPicker); } private final class RequestLimitingSubchannelPicker extends SubchannelPicker { diff --git a/xds/src/test/java/io/grpc/xds/EdsLoadBalancer2Test.java b/xds/src/test/java/io/grpc/xds/EdsLoadBalancer2Test.java index 85adbeb2438..f8a60cdea9d 100644 --- a/xds/src/test/java/io/grpc/xds/EdsLoadBalancer2Test.java +++ b/xds/src/test/java/io/grpc/xds/EdsLoadBalancer2Test.java @@ -441,7 +441,7 @@ public void handleDrops() { new EdsConfig(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_NAME, null, weightedTargetSelection, fakeRoundRobinSelection)) .build()); - when(mockRandom.nextInt(anyInt())).thenReturn(499_999, 1_000_000); + when(mockRandom.nextInt(anyInt())).thenReturn(499_999, 999_999, 1_000_000); EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr-1"); LocalityLbEndpoints localityLbEndpoints1 = buildLocalityLbEndpoints(1, 10, Collections.singletonMap(endpoint1, true)); @@ -465,6 +465,18 @@ public void handleDrops() { assertThat(xdsClient.clusterStats.get(EDS_SERVICE_NAME).categorizedDrops.get("throttle")) .isEqualTo(1); + // Dynamically update drop policies. + xdsClient.deliverClusterLoadAssignment( + EDS_SERVICE_NAME, + Collections.singletonList(new DropOverload("lb", 1_000_000)), + Collections.singletonMap(locality1, localityLbEndpoints1)); + result = currentPicker.pickSubchannel(mock(PickSubchannelArgs.class)); + assertThat(result.getStatus().isOk()).isFalse(); + assertThat(result.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE); + assertThat(result.getStatus().getDescription()).isEqualTo("Dropped: lb"); + assertThat(xdsClient.clusterStats.get(EDS_SERVICE_NAME).categorizedDrops.get("lb")) + .isEqualTo(1); + result = currentPicker.pickSubchannel(mock(PickSubchannelArgs.class)); assertThat(result.getStatus().isOk()).isTrue(); assertThat(result.getSubchannel()).isSameInstanceAs(subchannel); @@ -516,6 +528,23 @@ public void maxConcurrentRequests_appliedByLbConfig() { assertThat(result.getStatus().getDescription()) .isEqualTo("Cluster max concurrent requests limit exceeded"); assertThat(xdsClient.clusterStats.get(EDS_SERVICE_NAME).totalDrops).isEqualTo(1L); + + // Dynamically increment circuit breakers max_concurrent_requests threshold. + maxConcurrentRequests = 101L; + loadBalancer.handleResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(Collections.emptyList()) + .setAttributes( + Attributes.newBuilder().set(XdsAttributes.XDS_CLIENT_POOL, xdsClientPool).build()) + .setLoadBalancingPolicyConfig( + new EdsConfig(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_NAME, maxConcurrentRequests, + weightedTargetSelection, fakeRoundRobinSelection)) + .build()); + + result = currentPicker.pickSubchannel(mock(PickSubchannelArgs.class)); + assertThat(result.getStatus().isOk()).isTrue(); + assertThat(result.getSubchannel()).isSameInstanceAs(subchannel); + assertThat(xdsClient.clusterStats.get(EDS_SERVICE_NAME).totalDrops).isEqualTo(1L); } @Test