diff --git a/xds/src/main/java/io/grpc/xds/EdsLoadBalancer2.java b/xds/src/main/java/io/grpc/xds/EdsLoadBalancer2.java index bfbeb343147..5321fe5f0ca 100644 --- a/xds/src/main/java/io/grpc/xds/EdsLoadBalancer2.java +++ b/xds/src/main/java/io/grpc/xds/EdsLoadBalancer2.java @@ -20,6 +20,7 @@ import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; import static io.grpc.xds.XdsLbPolicies.LRS_POLICY_NAME; import static io.grpc.xds.XdsLbPolicies.PRIORITY_POLICY_NAME; +import static io.grpc.xds.XdsSubchannelPickers.BUFFER_PICKER; import com.google.common.annotations.VisibleForTesting; import io.grpc.Attributes; @@ -370,6 +371,8 @@ private void propagateResourceError(Status error) { private final class RequestLimitingLbHelper extends ForwardingLoadBalancerHelper { private final Helper helper; + private ConnectivityState currentState = ConnectivityState.CONNECTING; + private SubchannelPicker currentPicker = BUFFER_PICKER; private List dropPolicies = Collections.emptyList(); private long maxConcurrentRequests = DEFAULT_PER_CLUSTER_MAX_CONCURRENT_REQUESTS; @@ -380,6 +383,8 @@ private RequestLimitingLbHelper(Helper helper) { @Override public void updateBalancingState( ConnectivityState newState, final SubchannelPicker newPicker) { + currentState = newState; + currentPicker = newPicker; SubchannelPicker picker = new RequestLimitingSubchannelPicker( newPicker, dropPolicies, maxConcurrentRequests); delegate().updateBalancingState(newState, picker); @@ -392,10 +397,12 @@ protected Helper delegate() { private void updateDropPolicies(List dropOverloads) { dropPolicies = dropOverloads; + updateBalancingState(currentState, currentPicker); } private void updateMaxConcurrentRequests(long maxConcurrentRequests) { this.maxConcurrentRequests = maxConcurrentRequests; + updateBalancingState(currentState, currentPicker); } private final class RequestLimitingSubchannelPicker extends SubchannelPicker { diff --git a/xds/src/test/java/io/grpc/xds/EdsLoadBalancer2Test.java b/xds/src/test/java/io/grpc/xds/EdsLoadBalancer2Test.java index 85adbeb2438..f8a60cdea9d 100644 --- a/xds/src/test/java/io/grpc/xds/EdsLoadBalancer2Test.java +++ b/xds/src/test/java/io/grpc/xds/EdsLoadBalancer2Test.java @@ -441,7 +441,7 @@ public void handleDrops() { new EdsConfig(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_NAME, null, weightedTargetSelection, fakeRoundRobinSelection)) .build()); - when(mockRandom.nextInt(anyInt())).thenReturn(499_999, 1_000_000); + when(mockRandom.nextInt(anyInt())).thenReturn(499_999, 999_999, 1_000_000); EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr-1"); LocalityLbEndpoints localityLbEndpoints1 = buildLocalityLbEndpoints(1, 10, Collections.singletonMap(endpoint1, true)); @@ -465,6 +465,18 @@ public void handleDrops() { assertThat(xdsClient.clusterStats.get(EDS_SERVICE_NAME).categorizedDrops.get("throttle")) .isEqualTo(1); + // Dynamically update drop policies. + xdsClient.deliverClusterLoadAssignment( + EDS_SERVICE_NAME, + Collections.singletonList(new DropOverload("lb", 1_000_000)), + Collections.singletonMap(locality1, localityLbEndpoints1)); + result = currentPicker.pickSubchannel(mock(PickSubchannelArgs.class)); + assertThat(result.getStatus().isOk()).isFalse(); + assertThat(result.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE); + assertThat(result.getStatus().getDescription()).isEqualTo("Dropped: lb"); + assertThat(xdsClient.clusterStats.get(EDS_SERVICE_NAME).categorizedDrops.get("lb")) + .isEqualTo(1); + result = currentPicker.pickSubchannel(mock(PickSubchannelArgs.class)); assertThat(result.getStatus().isOk()).isTrue(); assertThat(result.getSubchannel()).isSameInstanceAs(subchannel); @@ -516,6 +528,23 @@ public void maxConcurrentRequests_appliedByLbConfig() { assertThat(result.getStatus().getDescription()) .isEqualTo("Cluster max concurrent requests limit exceeded"); assertThat(xdsClient.clusterStats.get(EDS_SERVICE_NAME).totalDrops).isEqualTo(1L); + + // Dynamically increment circuit breakers max_concurrent_requests threshold. + maxConcurrentRequests = 101L; + loadBalancer.handleResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(Collections.emptyList()) + .setAttributes( + Attributes.newBuilder().set(XdsAttributes.XDS_CLIENT_POOL, xdsClientPool).build()) + .setLoadBalancingPolicyConfig( + new EdsConfig(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_NAME, maxConcurrentRequests, + weightedTargetSelection, fakeRoundRobinSelection)) + .build()); + + result = currentPicker.pickSubchannel(mock(PickSubchannelArgs.class)); + assertThat(result.getStatus().isOk()).isTrue(); + assertThat(result.getSubchannel()).isSameInstanceAs(subchannel); + assertThat(xdsClient.clusterStats.get(EDS_SERVICE_NAME).totalDrops).isEqualTo(1L); } @Test