Skip to content

Commit

Permalink
xds: immediately update picker when circuit breakers/drop policies ch…
Browse files Browse the repository at this point in the history
…ange (#7600)

Previously the EDS LB policies does not propagate an updated picker that uses the new circuit breaker threshold and drop policies when those values change. The result is new circuit breaker/drop policies are not dynamically applied to new RPCs unless subchannel state has changed. This change fixes this problem. Whenever the EDS LB policy receives an config update, the immediately updates the picker with corresponding circuit breakers and drop policies to the channel so that the channel is alway picking up the latest configuration.
  • Loading branch information
voidzcy committed Nov 6, 2020
1 parent a589f52 commit beb3232
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 1 deletion.
7 changes: 7 additions & 0 deletions xds/src/main/java/io/grpc/xds/EdsLoadBalancer2.java
Expand Up @@ -20,6 +20,7 @@
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
import static io.grpc.xds.XdsLbPolicies.LRS_POLICY_NAME;
import static io.grpc.xds.XdsLbPolicies.PRIORITY_POLICY_NAME;
import static io.grpc.xds.XdsSubchannelPickers.BUFFER_PICKER;

import com.google.common.annotations.VisibleForTesting;
import io.grpc.Attributes;
Expand Down Expand Up @@ -370,6 +371,8 @@ private void propagateResourceError(Status error) {

private final class RequestLimitingLbHelper extends ForwardingLoadBalancerHelper {
private final Helper helper;
private ConnectivityState currentState = ConnectivityState.CONNECTING;
private SubchannelPicker currentPicker = BUFFER_PICKER;
private List<DropOverload> dropPolicies = Collections.emptyList();
private long maxConcurrentRequests = DEFAULT_PER_CLUSTER_MAX_CONCURRENT_REQUESTS;

Expand All @@ -380,6 +383,8 @@ private RequestLimitingLbHelper(Helper helper) {
@Override
public void updateBalancingState(
ConnectivityState newState, final SubchannelPicker newPicker) {
currentState = newState;
currentPicker = newPicker;
SubchannelPicker picker = new RequestLimitingSubchannelPicker(
newPicker, dropPolicies, maxConcurrentRequests);
delegate().updateBalancingState(newState, picker);
Expand All @@ -392,10 +397,12 @@ protected Helper delegate() {

private void updateDropPolicies(List<DropOverload> dropOverloads) {
dropPolicies = dropOverloads;
updateBalancingState(currentState, currentPicker);
}

private void updateMaxConcurrentRequests(long maxConcurrentRequests) {
this.maxConcurrentRequests = maxConcurrentRequests;
updateBalancingState(currentState, currentPicker);
}

private final class RequestLimitingSubchannelPicker extends SubchannelPicker {
Expand Down
31 changes: 30 additions & 1 deletion xds/src/test/java/io/grpc/xds/EdsLoadBalancer2Test.java
Expand Up @@ -441,7 +441,7 @@ public void handleDrops() {
new EdsConfig(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_NAME, null,
weightedTargetSelection, fakeRoundRobinSelection))
.build());
when(mockRandom.nextInt(anyInt())).thenReturn(499_999, 1_000_000);
when(mockRandom.nextInt(anyInt())).thenReturn(499_999, 999_999, 1_000_000);
EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr-1");
LocalityLbEndpoints localityLbEndpoints1 =
buildLocalityLbEndpoints(1, 10, Collections.singletonMap(endpoint1, true));
Expand All @@ -465,6 +465,18 @@ public void handleDrops() {
assertThat(xdsClient.clusterStats.get(EDS_SERVICE_NAME).categorizedDrops.get("throttle"))
.isEqualTo(1);

// Dynamically update drop policies.
xdsClient.deliverClusterLoadAssignment(
EDS_SERVICE_NAME,
Collections.singletonList(new DropOverload("lb", 1_000_000)),
Collections.singletonMap(locality1, localityLbEndpoints1));
result = currentPicker.pickSubchannel(mock(PickSubchannelArgs.class));
assertThat(result.getStatus().isOk()).isFalse();
assertThat(result.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE);
assertThat(result.getStatus().getDescription()).isEqualTo("Dropped: lb");
assertThat(xdsClient.clusterStats.get(EDS_SERVICE_NAME).categorizedDrops.get("lb"))
.isEqualTo(1);

result = currentPicker.pickSubchannel(mock(PickSubchannelArgs.class));
assertThat(result.getStatus().isOk()).isTrue();
assertThat(result.getSubchannel()).isSameInstanceAs(subchannel);
Expand Down Expand Up @@ -516,6 +528,23 @@ public void maxConcurrentRequests_appliedByLbConfig() {
assertThat(result.getStatus().getDescription())
.isEqualTo("Cluster max concurrent requests limit exceeded");
assertThat(xdsClient.clusterStats.get(EDS_SERVICE_NAME).totalDrops).isEqualTo(1L);

// Dynamically increment circuit breakers max_concurrent_requests threshold.
maxConcurrentRequests = 101L;
loadBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(Collections.<EquivalentAddressGroup>emptyList())
.setAttributes(
Attributes.newBuilder().set(XdsAttributes.XDS_CLIENT_POOL, xdsClientPool).build())
.setLoadBalancingPolicyConfig(
new EdsConfig(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_NAME, maxConcurrentRequests,
weightedTargetSelection, fakeRoundRobinSelection))
.build());

result = currentPicker.pickSubchannel(mock(PickSubchannelArgs.class));
assertThat(result.getStatus().isOk()).isTrue();
assertThat(result.getSubchannel()).isSameInstanceAs(subchannel);
assertThat(xdsClient.clusterStats.get(EDS_SERVICE_NAME).totalDrops).isEqualTo(1L);
}

@Test
Expand Down

0 comments on commit beb3232

Please sign in to comment.