From 4c916c4ed1d9f1a0c6e2422c97ced137750d2f7a Mon Sep 17 00:00:00 2001 From: Terry Wilson Date: Thu, 28 Apr 2022 13:51:46 -0700 Subject: [PATCH] xds: A new wrr_locality load balancer. (#9103) This LB is the parent for weighted_target and will configure it based on the child policy it gets in its configuration and locality weights that come in a ResolvedAddresses attribute. Described in [A52: gRPC xDS Custom Load Balancer Configuration](https://github.com/grpc/proposal/pull/298) --- .../io/grpc/xds/InternalXdsAttributes.java | 8 + .../io/grpc/xds/WrrLocalityLoadBalancer.java | 134 +++++++++++++ .../xds/WrrLocalityLoadBalancerProvider.java | 85 ++++++++ .../main/java/io/grpc/xds/XdsLbPolicies.java | 1 + .../services/io.grpc.LoadBalancerProvider | 1 + .../WrrLocalityLoadBalancerProviderTest.java | 69 +++++++ .../grpc/xds/WrrLocalityLoadBalancerTest.java | 182 ++++++++++++++++++ 7 files changed, 480 insertions(+) create mode 100644 xds/src/main/java/io/grpc/xds/WrrLocalityLoadBalancer.java create mode 100644 xds/src/main/java/io/grpc/xds/WrrLocalityLoadBalancerProvider.java create mode 100644 xds/src/test/java/io/grpc/xds/WrrLocalityLoadBalancerProviderTest.java create mode 100644 xds/src/test/java/io/grpc/xds/WrrLocalityLoadBalancerTest.java diff --git a/xds/src/main/java/io/grpc/xds/InternalXdsAttributes.java b/xds/src/main/java/io/grpc/xds/InternalXdsAttributes.java index 410a64df9ca..1750fbaf731 100644 --- a/xds/src/main/java/io/grpc/xds/InternalXdsAttributes.java +++ b/xds/src/main/java/io/grpc/xds/InternalXdsAttributes.java @@ -24,6 +24,7 @@ import io.grpc.internal.ObjectPool; import io.grpc.xds.XdsNameResolverProvider.CallCounterProvider; import io.grpc.xds.internal.sds.SslContextProviderSupplier; +import java.util.Map; /** * Internal attributes used for xDS implementation. Do not use. @@ -53,6 +54,13 @@ public final class InternalXdsAttributes { static final Attributes.Key CALL_COUNTER_PROVIDER = Attributes.Key.create("io.grpc.xds.InternalXdsAttributes.callCounterProvider"); + /** + * Map from localities to their weights. + */ + @NameResolver.ResolutionResultAttr + static final Attributes.Key> ATTR_LOCALITY_WEIGHTS = + Attributes.Key.create("io.grpc.xds.InternalXdsAttributes.localityWeights"); + /** * Name of the cluster that provides this EquivalentAddressGroup. */ diff --git a/xds/src/main/java/io/grpc/xds/WrrLocalityLoadBalancer.java b/xds/src/main/java/io/grpc/xds/WrrLocalityLoadBalancer.java new file mode 100644 index 00000000000..097aa23fd6a --- /dev/null +++ b/xds/src/main/java/io/grpc/xds/WrrLocalityLoadBalancer.java @@ -0,0 +1,134 @@ +/* + * Copyright 2022 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds; + +import static com.google.common.base.Preconditions.checkNotNull; +import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; + +import com.google.common.base.MoreObjects; +import io.grpc.InternalLogId; +import io.grpc.LoadBalancer; +import io.grpc.Status; +import io.grpc.internal.ServiceConfigUtil.PolicySelection; +import io.grpc.util.GracefulSwitchLoadBalancer; +import io.grpc.xds.WeightedTargetLoadBalancerProvider.WeightedPolicySelection; +import io.grpc.xds.WeightedTargetLoadBalancerProvider.WeightedTargetConfig; +import io.grpc.xds.XdsLogger.XdsLogLevel; +import io.grpc.xds.XdsSubchannelPickers.ErrorPicker; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +/** + * This load balancer acts as a parent for the {@link WeightedTargetLoadBalancer} and configures + * it with a child policy in its configuration and locality weights it gets from an attribute in + * {@link io.grpc.LoadBalancer.ResolvedAddresses}. + */ +final class WrrLocalityLoadBalancer extends LoadBalancer { + + private final XdsLogger logger; + private final Helper helper; + private final GracefulSwitchLoadBalancer switchLb; + + WrrLocalityLoadBalancer(Helper helper) { + this.helper = checkNotNull(helper, "helper"); + switchLb = new GracefulSwitchLoadBalancer(helper); + logger = XdsLogger.withLogId( + InternalLogId.allocate("xds-wrr-locality-lb", helper.getAuthority())); + logger.log(XdsLogLevel.INFO, "Created"); + } + + @Override + public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) { + logger.log(XdsLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses); + + // The configuration with the child policy is combined with the locality weights + // to produce the weighted target LB config. + WrrLocalityConfig wrrLocalityConfig + = (WrrLocalityConfig) resolvedAddresses.getLoadBalancingPolicyConfig(); + Map localityWeights = resolvedAddresses.getAttributes() + .get(InternalXdsAttributes.ATTR_LOCALITY_WEIGHTS); + + // Not having locality weights is a misconfiguration, and we have to return with an error. + if (localityWeights == null) { + Status unavailable = + Status.UNAVAILABLE.withDescription("wrr_locality error: no locality weights provided"); + helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(unavailable)); + return; + } + + // Weighted target LB expects a WeightedPolicySelection for each locality as it will create a + // child LB for each. + Map weightedPolicySelections = new HashMap<>(); + for (Locality locality : localityWeights.keySet()) { + weightedPolicySelections.put(locality.toString(), + new WeightedPolicySelection(localityWeights.get(locality), + wrrLocalityConfig.childPolicy)); + } + + switchLb.switchTo(wrrLocalityConfig.childPolicy.getProvider()); + switchLb.handleResolvedAddresses( + resolvedAddresses.toBuilder() + .setLoadBalancingPolicyConfig(new WeightedTargetConfig(weightedPolicySelections)) + .build()); + } + + @Override + public void handleNameResolutionError(Status error) { + logger.log(XdsLogLevel.WARNING, "Received name resolution error: {0}", error); + switchLb.handleNameResolutionError(error); + } + + @Override + public void shutdown() { + switchLb.shutdown(); + } + + /** + * The LB config for {@link WrrLocalityLoadBalancer}. + */ + static final class WrrLocalityConfig { + + final PolicySelection childPolicy; + + WrrLocalityConfig(PolicySelection childPolicy) { + this.childPolicy = childPolicy; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + WrrLocalityConfig that = (WrrLocalityConfig) o; + return Objects.equals(childPolicy, that.childPolicy); + } + + @Override + public int hashCode() { + return Objects.hashCode(childPolicy); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this).add("childPolicy", childPolicy).toString(); + } + } +} diff --git a/xds/src/main/java/io/grpc/xds/WrrLocalityLoadBalancerProvider.java b/xds/src/main/java/io/grpc/xds/WrrLocalityLoadBalancerProvider.java new file mode 100644 index 00000000000..f4429b62af7 --- /dev/null +++ b/xds/src/main/java/io/grpc/xds/WrrLocalityLoadBalancerProvider.java @@ -0,0 +1,85 @@ +/* + * Copyright 2022 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds; + +import io.grpc.Internal; +import io.grpc.LoadBalancer; +import io.grpc.LoadBalancer.Helper; +import io.grpc.LoadBalancerProvider; +import io.grpc.LoadBalancerRegistry; +import io.grpc.NameResolver.ConfigOrError; +import io.grpc.Status; +import io.grpc.internal.JsonUtil; +import io.grpc.internal.ServiceConfigUtil; +import io.grpc.internal.ServiceConfigUtil.LbConfig; +import io.grpc.internal.ServiceConfigUtil.PolicySelection; +import io.grpc.xds.WrrLocalityLoadBalancer.WrrLocalityConfig; +import java.util.List; +import java.util.Map; + +/** + * The provider for {@link WrrLocalityLoadBalancer}. An instance of this class should be acquired + * through {@link LoadBalancerRegistry#getProvider} by using the name + * "xds_wrr_locality_experimental". + */ +@Internal +public final class WrrLocalityLoadBalancerProvider extends LoadBalancerProvider { + + @Override + public LoadBalancer newLoadBalancer(Helper helper) { + return new WrrLocalityLoadBalancer(helper); + } + + @Override + public boolean isAvailable() { + return true; + } + + @Override + public int getPriority() { + return 5; + } + + @Override + public String getPolicyName() { + return XdsLbPolicies.WRR_LOCALITY_POLICY_NAME; + } + + @Override + public ConfigOrError parseLoadBalancingPolicyConfig(Map rawConfig) { + try { + List childConfigCandidates = ServiceConfigUtil.unwrapLoadBalancingConfigList( + JsonUtil.getListOfObjects(rawConfig, "childPolicy")); + if (childConfigCandidates == null || childConfigCandidates.isEmpty()) { + return ConfigOrError.fromError(Status.INTERNAL.withDescription( + "No child policy in wrr_locality LB policy: " + + rawConfig)); + } + ConfigOrError selectedConfig = + ServiceConfigUtil.selectLbPolicyFromList(childConfigCandidates, + LoadBalancerRegistry.getDefaultRegistry()); + if (selectedConfig.getError() != null) { + return selectedConfig; + } + PolicySelection policySelection = (PolicySelection) selectedConfig.getConfig(); + return ConfigOrError.fromConfig(new WrrLocalityConfig(policySelection)); + } catch (RuntimeException e) { + return ConfigOrError.fromError(Status.fromThrowable(e) + .withDescription("Failed to parse wrr_locality LB config: " + rawConfig)); + } + } +} diff --git a/xds/src/main/java/io/grpc/xds/XdsLbPolicies.java b/xds/src/main/java/io/grpc/xds/XdsLbPolicies.java index b11c7853473..dcca2fbfff3 100644 --- a/xds/src/main/java/io/grpc/xds/XdsLbPolicies.java +++ b/xds/src/main/java/io/grpc/xds/XdsLbPolicies.java @@ -23,6 +23,7 @@ final class XdsLbPolicies { static final String PRIORITY_POLICY_NAME = "priority_experimental"; static final String CLUSTER_IMPL_POLICY_NAME = "cluster_impl_experimental"; static final String WEIGHTED_TARGET_POLICY_NAME = "weighted_target_experimental"; + static final String WRR_LOCALITY_POLICY_NAME = "wrr_locality_experimental"; private XdsLbPolicies() {} } diff --git a/xds/src/main/resources/META-INF/services/io.grpc.LoadBalancerProvider b/xds/src/main/resources/META-INF/services/io.grpc.LoadBalancerProvider index 8e5c2dd1c6a..6b6e3a392a9 100644 --- a/xds/src/main/resources/META-INF/services/io.grpc.LoadBalancerProvider +++ b/xds/src/main/resources/META-INF/services/io.grpc.LoadBalancerProvider @@ -6,3 +6,4 @@ io.grpc.xds.ClusterResolverLoadBalancerProvider io.grpc.xds.ClusterImplLoadBalancerProvider io.grpc.xds.LeastRequestLoadBalancerProvider io.grpc.xds.RingHashLoadBalancerProvider +io.grpc.xds.WrrLocalityLoadBalancerProvider diff --git a/xds/src/test/java/io/grpc/xds/WrrLocalityLoadBalancerProviderTest.java b/xds/src/test/java/io/grpc/xds/WrrLocalityLoadBalancerProviderTest.java new file mode 100644 index 00000000000..d251f3677d8 --- /dev/null +++ b/xds/src/test/java/io/grpc/xds/WrrLocalityLoadBalancerProviderTest.java @@ -0,0 +1,69 @@ +/* + * Copyright 2022 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds; + +import static com.google.common.truth.Truth.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import io.grpc.LoadBalancer; +import io.grpc.LoadBalancer.Helper; +import io.grpc.LoadBalancerProvider; +import io.grpc.LoadBalancerRegistry; +import io.grpc.NameResolver; +import io.grpc.xds.WrrLocalityLoadBalancer.WrrLocalityConfig; +import java.util.Map; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link WrrLocalityLoadBalancerProvider}. + */ +@RunWith(JUnit4.class) +public class WrrLocalityLoadBalancerProviderTest { + + @Test + public void provided() { + LoadBalancerProvider provider = + LoadBalancerRegistry.getDefaultRegistry().getProvider( + XdsLbPolicies.WRR_LOCALITY_POLICY_NAME); + assertThat(provider).isInstanceOf(WrrLocalityLoadBalancerProvider.class); + } + + @Test + public void providesLoadBalancer() { + Helper helper = mock(Helper.class); + when(helper.getAuthority()).thenReturn("api.google.com"); + LoadBalancerProvider provider = new WrrLocalityLoadBalancerProvider(); + LoadBalancer loadBalancer = provider.newLoadBalancer(helper); + assertThat(loadBalancer).isInstanceOf(WrrLocalityLoadBalancer.class); + } + + @Test + public void parseConfig() { + Map rawConfig = ImmutableMap.of("childPolicy", + ImmutableList.of(ImmutableMap.of("round_robin", ImmutableMap.of()))); + + WrrLocalityLoadBalancerProvider provider = new WrrLocalityLoadBalancerProvider(); + NameResolver.ConfigOrError configOrError = provider.parseLoadBalancingPolicyConfig(rawConfig); + WrrLocalityConfig config = (WrrLocalityConfig) configOrError.getConfig(); + assertThat(config.childPolicy.getProvider().getPolicyName()).isEqualTo("round_robin"); + } +} diff --git a/xds/src/test/java/io/grpc/xds/WrrLocalityLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/WrrLocalityLoadBalancerTest.java new file mode 100644 index 00000000000..3df55eac07a --- /dev/null +++ b/xds/src/test/java/io/grpc/xds/WrrLocalityLoadBalancerTest.java @@ -0,0 +1,182 @@ +/* + * Copyright 2022 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds; + +import static com.google.common.truth.Truth.assertThat; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.isA; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.testing.EqualsTester; +import io.grpc.Attributes; +import io.grpc.ConnectivityState; +import io.grpc.EquivalentAddressGroup; +import io.grpc.LoadBalancer; +import io.grpc.LoadBalancer.Helper; +import io.grpc.LoadBalancer.ResolvedAddresses; +import io.grpc.LoadBalancer.SubchannelPicker; +import io.grpc.LoadBalancerProvider; +import io.grpc.Status; +import io.grpc.internal.ServiceConfigUtil.PolicySelection; +import io.grpc.xds.WeightedTargetLoadBalancerProvider.WeightedPolicySelection; +import io.grpc.xds.WeightedTargetLoadBalancerProvider.WeightedTargetConfig; +import io.grpc.xds.WrrLocalityLoadBalancer.WrrLocalityConfig; +import io.grpc.xds.XdsSubchannelPickers.ErrorPicker; +import java.net.SocketAddress; +import java.util.Map; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +/** + * Tests for {@link WrrLocalityLoadBalancerProvider}. + */ +@RunWith(JUnit4.class) +public class WrrLocalityLoadBalancerTest { + + @Mock + private LoadBalancerProvider mockProvider; + @Mock + private LoadBalancer mockChildLb; + @Mock + private Helper mockHelper; + @Mock + private SocketAddress mockSocketAddress; + + @Captor + private ArgumentCaptor resolvedAddressesCaptor; + @Captor + private ArgumentCaptor connectivityStateCaptor; + @Captor + private ArgumentCaptor errorPickerCaptor; + + private EquivalentAddressGroup eag = new EquivalentAddressGroup(mockSocketAddress); + + private WrrLocalityLoadBalancer loadBalancer; + + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + when(mockProvider.newLoadBalancer(isA(Helper.class))).thenReturn(mockChildLb); + when(mockProvider.getPolicyName()).thenReturn("round_robin"); + loadBalancer = new WrrLocalityLoadBalancer(mockHelper); + } + + @Test + public void handleResolvedAddresses() { + // A two locality cluster with a mock child LB policy. + Locality localityOne = Locality.create("region1", "zone1", "subzone1"); + Locality localityTwo = Locality.create("region2", "zone2", "subzone2"); + PolicySelection childPolicy = new PolicySelection(mockProvider, null); + + // The child config is delivered wrapped in the wrr_locality config and the locality weights + // in a ResolvedAddresses attribute. + WrrLocalityConfig wlConfig = new WrrLocalityConfig(childPolicy); + Map localityWeights = ImmutableMap.of(localityOne, 1, localityTwo, 2); + deliverAddresses(wlConfig, localityWeights); + + // Assert that the child policy and the locality weights were correctly mapped to a + // WeightedTargetConfig. + verify(mockChildLb).handleResolvedAddresses(resolvedAddressesCaptor.capture()); + Object config = resolvedAddressesCaptor.getValue().getLoadBalancingPolicyConfig(); + assertThat(config).isInstanceOf(WeightedTargetConfig.class); + WeightedTargetConfig wtConfig = (WeightedTargetConfig) config; + assertThat(wtConfig.targets).hasSize(2); + assertThat(wtConfig.targets).containsEntry(localityOne.toString(), + new WeightedPolicySelection(1, childPolicy)); + assertThat(wtConfig.targets).containsEntry(localityTwo.toString(), + new WeightedPolicySelection(2, childPolicy)); + + } + + @Test + public void handleResolvedAddresses_noLocalityWeights() { + // A two locality cluster with a mock child LB policy. + PolicySelection childPolicy = new PolicySelection(mockProvider, null); + + // The child config is delivered wrapped in the wrr_locality config and the locality weights + // in a ResolvedAddresses attribute. + WrrLocalityConfig wlConfig = new WrrLocalityConfig(childPolicy); + deliverAddresses(wlConfig, null); + + // With no locality weights, we should get a TRANSIENT_FAILURE. + verify(mockHelper).getAuthority(); + verify(mockHelper).updateBalancingState(eq(ConnectivityState.TRANSIENT_FAILURE), + isA(ErrorPicker.class)); + } + + @Test + public void handleNameResolutionError_noChildLb() { + loadBalancer.handleNameResolutionError(Status.DEADLINE_EXCEEDED); + + verify(mockHelper).updateBalancingState(connectivityStateCaptor.capture(), + errorPickerCaptor.capture()); + assertThat(connectivityStateCaptor.getValue()).isEqualTo(ConnectivityState.TRANSIENT_FAILURE); + assertThat(errorPickerCaptor.getValue().toString()).isEqualTo( + new ErrorPicker(Status.DEADLINE_EXCEEDED).toString()); + } + + @Test + public void handleNameResolutionError_withChildLb() { + deliverAddresses(new WrrLocalityConfig(new PolicySelection(mockProvider, null)), + ImmutableMap.of( + Locality.create("region", "zone", "subzone"), 1)); + loadBalancer.handleNameResolutionError(Status.DEADLINE_EXCEEDED); + + verify(mockHelper, never()).updateBalancingState(isA(ConnectivityState.class), + isA(ErrorPicker.class)); + verify(mockChildLb).handleNameResolutionError(Status.DEADLINE_EXCEEDED); + } + + @Test + public void shutdown() { + deliverAddresses(new WrrLocalityConfig(new PolicySelection(mockProvider, null)), + ImmutableMap.of( + Locality.create("region", "zone", "subzone"), 1)); + loadBalancer.shutdown(); + + verify(mockChildLb).shutdown(); + } + + @Test + public void configEquality() { + WrrLocalityConfig configOne = new WrrLocalityConfig(new PolicySelection(mockProvider, null)); + WrrLocalityConfig configTwo = new WrrLocalityConfig(new PolicySelection(mockProvider, null)); + WrrLocalityConfig differentConfig = new WrrLocalityConfig( + new PolicySelection(mockProvider, "config")); + + new EqualsTester().addEqualityGroup(configOne, configTwo).addEqualityGroup(differentConfig) + .testEquals(); + } + + private void deliverAddresses(WrrLocalityConfig config, Map localityWeights) { + loadBalancer.handleResolvedAddresses( + ResolvedAddresses.newBuilder().setAddresses(ImmutableList.of(eag)).setAttributes( + Attributes.newBuilder() + .set(InternalXdsAttributes.ATTR_LOCALITY_WEIGHTS, localityWeights).build()) + .setLoadBalancingPolicyConfig(config).build()); + } +}