Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds: Use wrr_locality LB and support load_balancing_policy in Cluster #9141

Merged
merged 6 commits into from May 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 1 addition & 3 deletions xds/src/main/java/io/grpc/xds/ClientXdsClient.java
Expand Up @@ -1635,9 +1635,7 @@ static CdsUpdate processCluster(Cluster cluster, Set<String> retainedEdsResource
}
CdsUpdate.Builder updateBuilder = structOrError.getStruct();

// TODO: If load_balancing_policy is set in Cluster use it for LB config, otherwise fall back
// to using the legacy lb_policy field.
ImmutableMap<String, ?> lbPolicyConfig = LegacyLoadBalancerConfigFactory.newConfig(cluster,
ImmutableMap<String, ?> lbPolicyConfig = LoadBalancerConfigFactory.newConfig(cluster,
enableLeastRequest);

// Validate the LB config by trying to parse it with the corresponding LB provider.
Expand Down
49 changes: 19 additions & 30 deletions xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java
Expand Up @@ -19,7 +19,6 @@
import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
import static io.grpc.xds.XdsLbPolicies.PRIORITY_POLICY_NAME;
import static io.grpc.xds.XdsLbPolicies.WEIGHTED_TARGET_POLICY_NAME;

import com.google.common.annotations.VisibleForTesting;
import io.grpc.Attributes;
Expand Down Expand Up @@ -49,8 +48,6 @@
import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext;
import io.grpc.xds.PriorityLoadBalancerProvider.PriorityLbConfig;
import io.grpc.xds.PriorityLoadBalancerProvider.PriorityLbConfig.PriorityChildConfig;
import io.grpc.xds.WeightedTargetLoadBalancerProvider.WeightedPolicySelection;
import io.grpc.xds.WeightedTargetLoadBalancerProvider.WeightedTargetConfig;
import io.grpc.xds.XdsClient.EdsResourceWatcher;
import io.grpc.xds.XdsClient.EdsUpdate;
import io.grpc.xds.XdsLogger.XdsLogLevel;
Expand Down Expand Up @@ -208,6 +205,8 @@ private void handleEndpointResourceUpdate() {
List<EquivalentAddressGroup> addresses = new ArrayList<>();
Map<String, PriorityChildConfig> priorityChildConfigs = new HashMap<>();
List<String> priorities = new ArrayList<>(); // totally ordered priority list
Map<Locality, Integer> localityWeights = new HashMap<>();

Status endpointNotFound = Status.OK;
for (String cluster : clusters) {
ClusterState state = clusterStates.get(cluster);
Expand All @@ -219,6 +218,7 @@ private void handleEndpointResourceUpdate() {
addresses.addAll(state.result.addresses);
priorityChildConfigs.putAll(state.result.priorityChildConfigs);
priorities.addAll(state.result.priorities);
localityWeights.putAll(state.result.localityWeights);
} else {
endpointNotFound = state.status;
}
Expand Down Expand Up @@ -249,6 +249,9 @@ private void handleEndpointResourceUpdate() {
resolvedAddresses.toBuilder()
.setLoadBalancingPolicyConfig(childConfig)
.setAddresses(Collections.unmodifiableList(addresses))
.setAttributes(resolvedAddresses.getAttributes().toBuilder()
.set(InternalXdsAttributes.ATTR_LOCALITY_WEIGHTS,
Collections.unmodifiableMap(localityWeights)).build())
.build());
}

Expand Down Expand Up @@ -318,6 +321,7 @@ private abstract class ClusterState {
// Most recently resolved addresses and config, or null if resource not exists.
@Nullable
protected ClusterResolutionResult result;

protected boolean shutdown;

private ClusterState(String name, @Nullable ServerInfo lrsServerInfo,
Expand Down Expand Up @@ -377,6 +381,7 @@ public void run() {
}
Map<Locality, LocalityLbEndpoints> localityLbEndpoints =
update.localityLbEndpointsMap;
Map<Locality, Integer> localityWeights = new HashMap<>();
List<DropOverload> dropOverloads = update.dropPolicies;
List<EquivalentAddressGroup> addresses = new ArrayList<>();
Map<String, Map<Locality, Integer>> prioritizedLocalityWeights = new HashMap<>();
Expand Down Expand Up @@ -409,6 +414,7 @@ public void run() {
"Discard locality {0} with 0 healthy endpoints", locality);
continue;
}
localityWeights.put(locality, localityLbInfo.localityWeight());
ejona86 marked this conversation as resolved.
Show resolved Hide resolved
if (!prioritizedLocalityWeights.containsKey(priorityName)) {
prioritizedLocalityWeights.put(priorityName, new HashMap<Locality, Integer>());
}
Expand All @@ -428,7 +434,8 @@ public void run() {
endpointLbPolicy, lbRegistry, prioritizedLocalityWeights, dropOverloads);
status = Status.OK;
resolved = true;
result = new ClusterResolutionResult(addresses, priorityChildConfigs, priorities);
result = new ClusterResolutionResult(addresses, priorityChildConfigs, priorities,
localityWeights);
handleEndpointResourceUpdate();
}
}
Expand Down Expand Up @@ -634,18 +641,23 @@ private static class ClusterResolutionResult {
private final Map<String, PriorityChildConfig> priorityChildConfigs;
// List of priority names ordered in descending priorities.
private final List<String> priorities;
// Most recent view on how localities in the cluster should be wighted. Only set for EDS
// clusters that support the concept.
private final Map<Locality, Integer> localityWeights;

ClusterResolutionResult(List<EquivalentAddressGroup> addresses, String priority,
PriorityChildConfig config) {
this(addresses, Collections.singletonMap(priority, config),
Collections.singletonList(priority));
Collections.singletonList(priority), Collections.emptyMap());
}

ClusterResolutionResult(List<EquivalentAddressGroup> addresses,
Map<String, PriorityChildConfig> configs, List<String> priorities) {
Map<String, PriorityChildConfig> configs, List<String> priorities,
Map<Locality, Integer> localityWeights) {
this.addresses = addresses;
this.priorityChildConfigs = configs;
this.priorities = priorities;
this.localityWeights = localityWeights;
}
}

Expand Down Expand Up @@ -686,32 +698,9 @@ private static Map<String, PriorityChildConfig> generateEdsBasedPriorityChildCon
List<DropOverload> dropOverloads) {
Map<String, PriorityChildConfig> configs = new HashMap<>();
for (String priority : prioritizedLocalityWeights.keySet()) {
PolicySelection leafPolicy = endpointLbPolicy;
// Depending on the endpoint-level load balancing policy, different LB hierarchy may be
// created. If the endpoint-level LB policy is round_robin or least_request_experimental,
// it creates a two-level LB hierarchy: a locality-level LB policy that balances load
// according to locality weights followed by an endpoint-level LB policy that balances load
// between endpoints within the locality. If the endpoint-level LB policy is
// ring_hash_experimental, it creates a unified LB policy that balances load by weighing the
// product of each endpoint's weight and the weight of the locality it belongs to.
if (endpointLbPolicy.getProvider().getPolicyName().equals("round_robin")
|| endpointLbPolicy.getProvider().getPolicyName().equals("least_request_experimental")) {
Map<Locality, Integer> localityWeights = prioritizedLocalityWeights.get(priority);
Map<String, WeightedPolicySelection> targets = new HashMap<>();
for (Locality locality : localityWeights.keySet()) {
int weight = localityWeights.get(locality);
WeightedPolicySelection target = new WeightedPolicySelection(weight, endpointLbPolicy);
targets.put(localityName(locality), target);
}
LoadBalancerProvider weightedTargetLbProvider =
lbRegistry.getProvider(WEIGHTED_TARGET_POLICY_NAME);
WeightedTargetConfig weightedTargetConfig =
new WeightedTargetConfig(Collections.unmodifiableMap(targets));
leafPolicy = new PolicySelection(weightedTargetLbProvider, weightedTargetConfig);
}
ClusterImplConfig clusterImplConfig =
new ClusterImplConfig(cluster, edsServiceName, lrsServerInfo, maxConcurrentRequests,
dropOverloads, leafPolicy, tlsContext);
dropOverloads, endpointLbPolicy, tlsContext);
LoadBalancerProvider clusterImplLbProvider =
lbRegistry.getProvider(XdsLbPolicies.CLUSTER_IMPL_POLICY_NAME);
PolicySelection clusterImplPolicy =
Expand Down
105 changes: 0 additions & 105 deletions xds/src/main/java/io/grpc/xds/LegacyLoadBalancerConfigFactory.java

This file was deleted.