Skip to content

Commit

Permalink
xds: implement WeightedTargetLoadBalancer
Browse files Browse the repository at this point in the history
  • Loading branch information
dapengzhang0 committed Feb 20, 2020
1 parent 557da62 commit 829babc
Show file tree
Hide file tree
Showing 9 changed files with 1,015 additions and 30 deletions.
30 changes: 12 additions & 18 deletions xds/src/main/java/io/grpc/xds/LocalityStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,10 @@
import io.grpc.xds.EnvoyProtoData.LbEndpoint;
import io.grpc.xds.EnvoyProtoData.Locality;
import io.grpc.xds.EnvoyProtoData.LocalityLbEndpoints;
import io.grpc.xds.InterLocalityPicker.WeightedChildPicker;
import io.grpc.xds.OrcaOobUtil.OrcaReportingConfig;
import io.grpc.xds.OrcaOobUtil.OrcaReportingHelperWrapper;
import io.grpc.xds.RandomWeightedPicker.WeightedChildPicker;
import io.grpc.xds.RandomWeightedPicker.WeightedPickerFactory;
import io.grpc.xds.XdsSubchannelPickers.ErrorPicker;
import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -100,7 +101,7 @@ final class LocalityStoreImpl implements LocalityStore {
private static final long DELAYED_DELETION_TIMEOUT_MINUTES = 15L;

private final Helper helper;
private final PickerFactory pickerFactory;
private final WeightedPickerFactory pickerFactory;
private final LoadBalancerProvider loadBalancerProvider;
private final ThreadSafeRandom random;
private final LoadStatsStore loadStatsStore;
Expand All @@ -115,14 +116,20 @@ final class LocalityStoreImpl implements LocalityStore {

LocalityStoreImpl(
Helper helper, LoadBalancerRegistry lbRegistry, LoadStatsStore loadStatsStore) {
this(helper, pickerFactoryImpl, lbRegistry, ThreadSafeRandom.ThreadSafeRandomImpl.instance,
loadStatsStore, OrcaPerRequestUtil.getInstance(), OrcaOobUtil.getInstance());
this(
helper,
WeightedPickerFactory.RANDOM_PICKER_FACTORY,
lbRegistry,
ThreadSafeRandom.ThreadSafeRandomImpl.instance,
loadStatsStore,
OrcaPerRequestUtil.getInstance(),
OrcaOobUtil.getInstance());
}

@VisibleForTesting
LocalityStoreImpl(
Helper helper,
PickerFactory pickerFactory,
WeightedPickerFactory pickerFactory,
LoadBalancerRegistry lbRegistry,
ThreadSafeRandom random,
LoadStatsStore loadStatsStore,
Expand All @@ -139,11 +146,6 @@ final class LocalityStoreImpl implements LocalityStore {
this.orcaOobUtil = checkNotNull(orcaOobUtil, "orcaOobUtil");
}

@VisibleForTesting // Introduced for testing only.
interface PickerFactory {
SubchannelPicker picker(List<WeightedChildPicker> childPickers);
}

private static final class DroppablePicker extends SubchannelPicker {

final List<DropOverload> dropOverloads;
Expand Down Expand Up @@ -182,14 +184,6 @@ public String toString() {
}
}

private static final PickerFactory pickerFactoryImpl =
new PickerFactory() {
@Override
public SubchannelPicker picker(List<WeightedChildPicker> childPickers) {
return new InterLocalityPicker(childPickers);
}
};

@Override
public void reset() {
for (Locality locality : localityMap.keySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import io.grpc.LoadBalancer.SubchannelPicker;
import java.util.List;

final class InterLocalityPicker extends SubchannelPicker {
final class RandomWeightedPicker extends SubchannelPicker {

private final List<WeightedChildPicker> weightedChildPickers;
private final ThreadSafeRandom random;
Expand Down Expand Up @@ -62,12 +62,12 @@ public String toString() {
}
}

InterLocalityPicker(List<WeightedChildPicker> weightedChildPickers) {
RandomWeightedPicker(List<WeightedChildPicker> weightedChildPickers) {
this(weightedChildPickers, ThreadSafeRandom.ThreadSafeRandomImpl.instance);
}

@VisibleForTesting
InterLocalityPicker(List<WeightedChildPicker> weightedChildPickers, ThreadSafeRandom random) {
RandomWeightedPicker(List<WeightedChildPicker> weightedChildPickers, ThreadSafeRandom random) {
checkNotNull(weightedChildPickers, "weightedChildPickers in null");
checkArgument(!weightedChildPickers.isEmpty(), "weightedChildPickers is empty");

Expand Down Expand Up @@ -116,4 +116,17 @@ public String toString() {
.add("totalWeight", totalWeight)
.toString();
}

/** Factory that creates a SubchannelPicker for a given list of weighted child pickers. */
interface WeightedPickerFactory {
WeightedPickerFactory RANDOM_PICKER_FACTORY =
new WeightedPickerFactory() {
@Override
public SubchannelPicker picker(List<WeightedChildPicker> childPickers) {
return new RandomWeightedPicker(childPickers);
}
};

SubchannelPicker picker(List<WeightedChildPicker> childPickers);
}
}
221 changes: 221 additions & 0 deletions xds/src/main/java/io/grpc/xds/WeightedTargetLoadBalancer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
/*
* Copyright 2020 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc.xds;

import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.ConnectivityState.CONNECTING;
import static io.grpc.ConnectivityState.IDLE;
import static io.grpc.ConnectivityState.READY;
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
import static io.grpc.xds.XdsSubchannelPickers.BUFFER_PICKER;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import io.grpc.ChannelLogger;
import io.grpc.ChannelLogger.ChannelLogLevel;
import io.grpc.ConnectivityState;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancerRegistry;
import io.grpc.Status;
import io.grpc.util.ForwardingLoadBalancerHelper;
import io.grpc.util.GracefulSwitchLoadBalancer;
import io.grpc.xds.RandomWeightedPicker.WeightedChildPicker;
import io.grpc.xds.RandomWeightedPicker.WeightedPickerFactory;
import io.grpc.xds.WeightedTargetLoadBalancerProvider.WeightedChildLbConfig;
import io.grpc.xds.WeightedTargetLoadBalancerProvider.WeightedTargetConfig;
import io.grpc.xds.XdsSubchannelPickers.ErrorPicker;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;

/** Load balancer for weighted_target policy. */
final class WeightedTargetLoadBalancer extends LoadBalancer {

private final ChannelLogger channelLogger;
private final LoadBalancerRegistry lbRegistry;
private final Map<String, GracefulSwitchLoadBalancer> childBalancers = new HashMap<>();
private final Map<String, ChildHelper> childHelpers = new HashMap<>();
private final Helper helper;
private final WeightedPickerFactory weightedPickerFactory;

private Map<String, WeightedChildLbConfig> targets = ImmutableMap.of();

/**
* Constructs a WeightedTargetLoadBalancer with the given weighted list of child balancer configs.
* The list must not be empty and must not contain duplicate lb configs.
*/
WeightedTargetLoadBalancer(Helper helper, LoadBalancerRegistry lbRegistry) {
this(
checkNotNull(helper, "helper"),
checkNotNull(lbRegistry, "lbRegistry"),
WeightedPickerFactory.RANDOM_PICKER_FACTORY);
}

@VisibleForTesting
WeightedTargetLoadBalancer(
Helper helper, LoadBalancerRegistry lbRegistry, WeightedPickerFactory weightedPickerFactory) {
this.helper = helper;
this.lbRegistry = lbRegistry;
this.weightedPickerFactory = weightedPickerFactory;
channelLogger = helper.getChannelLogger();
}

@Override
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
channelLogger.log(ChannelLogLevel.DEBUG, "Received ResolvedAddresses {0}", resolvedAddresses);
Object lbConfig = resolvedAddresses.getLoadBalancingPolicyConfig();
if (!(lbConfig instanceof WeightedTargetConfig)) {
helper.updateBalancingState(
TRANSIENT_FAILURE,
new ErrorPicker(Status.UNAVAILABLE.withDescription(
"Load balancing config '" + lbConfig + "' is not a CdsConfig")));
return;
}

WeightedTargetConfig weightedTargetConfig = (WeightedTargetConfig) lbConfig;
Map<String, WeightedChildLbConfig> newTargets = weightedTargetConfig.targets;

for (String targetName : newTargets.keySet()) {
WeightedChildLbConfig weightedChildLbConfig = newTargets.get(targetName);
if (!targets.containsKey(targetName)) {
// Create child balancers for new names.
ChildHelper childHelper = new ChildHelper();
GracefulSwitchLoadBalancer childBalancer = new GracefulSwitchLoadBalancer(childHelper);
childBalancer.switchTo(lbRegistry.getProvider(weightedChildLbConfig.policyName));
childHelpers.put(targetName, childHelper);
childBalancers.put(targetName, childBalancer);
} else if (!weightedChildLbConfig.policyName.equals(targets.get(targetName).policyName)) {
// Policy name change for the same target is not a typical usecase, but we should support
// it. Switch child policy.
childBalancers.get(targetName)
.switchTo(lbRegistry.getProvider(weightedChildLbConfig.policyName));
}
}

// Update new config map.
targets = newTargets;

// Call handleResolvedAddresses() for each child balancer.
for (String targetName : targets.keySet()) {
childBalancers.get(targetName).handleResolvedAddresses(
resolvedAddresses.toBuilder()
.setLoadBalancingPolicyConfig(targets.get(targetName).config)
.build());
}

// Cleanup removed targets.
// TODO(zdapeng): cache removed target for 15 minutes.
for (String targetName : childBalancers.keySet()) {
if (!targets.containsKey(targetName)) {
childBalancers.get(targetName).shutdown();
}
}
childBalancers.keySet().retainAll(targets.keySet());
childHelpers.keySet().retainAll(targets.keySet());
}

@Override
public void handleNameResolutionError(Status error) {
channelLogger.log(ChannelLogLevel.ERROR, "Name resolution error: {0}", error);
if (childBalancers.isEmpty()) {
helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(error));
}
for (LoadBalancer childBalancer : childBalancers.values()) {
childBalancer.handleNameResolutionError(error);
}
}

@Override
public boolean canHandleEmptyAddressListFromNameResolution() {
return true;
}

@Override
public void shutdown() {
channelLogger.log(ChannelLogLevel.DEBUG, "weighted_target load balancer is shutting down");
for (LoadBalancer childBalancer : childBalancers.values()) {
childBalancer.shutdown();
}
}

private void updateBalancingState() {
List<WeightedChildPicker> childPickers = new ArrayList<>();

ConnectivityState overallState = null;
for (String name : targets.keySet()) {
ChildHelper childHelper = childHelpers.get(name);
ConnectivityState childState = childHelper.currentState;
overallState = aggregateState(overallState, childState);
if (READY == childState) {
int weight = targets.get(name).weight;
childPickers.add(new WeightedChildPicker(weight, childHelper.currentPicker));
}
}

SubchannelPicker picker;
if (childPickers.isEmpty()) {
if (overallState == TRANSIENT_FAILURE) {
picker = new ErrorPicker(Status.UNAVAILABLE); // TODO: more details in status
} else {
picker = XdsSubchannelPickers.BUFFER_PICKER;
}
} else {
picker = weightedPickerFactory.picker(childPickers);
}

if (overallState != null) {
helper.updateBalancingState(overallState, picker);
}
}

@Nullable
private ConnectivityState aggregateState(
@Nullable ConnectivityState overallState, ConnectivityState childState) {
if (overallState == null) {
return childState;
}
if (overallState == READY || childState == READY) {
return READY;
}
if (overallState == CONNECTING || childState == CONNECTING) {
return CONNECTING;
}
if (overallState == IDLE || childState == IDLE) {
return IDLE;
}
return overallState;
}

private final class ChildHelper extends ForwardingLoadBalancerHelper {
ConnectivityState currentState = CONNECTING;
SubchannelPicker currentPicker = BUFFER_PICKER;

@Override
public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) {
currentState = newState;
currentPicker = newPicker;
WeightedTargetLoadBalancer.this.updateBalancingState();
}

@Override
protected Helper delegate() {
return helper;
}
}
}

0 comments on commit 829babc

Please sign in to comment.