Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core: revert stickiness from round robin #6698

Merged
merged 1 commit into from Feb 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 0 additions & 9 deletions core/src/main/java/io/grpc/internal/ServiceConfigUtil.java
Expand Up @@ -314,15 +314,6 @@ public static List<LbConfig> unwrapLoadBalancingConfigList(List<Map<String, ?>>
return Collections.unmodifiableList(result);
}

/**
* Extracts the stickiness metadata key from a service config, or {@code null}.
*/
@Nullable
public static String getStickinessMetadataKeyFromServiceConfig(
Map<String, ?> serviceConfig) {
return JsonUtil.getString(serviceConfig, "stickinessMetadataKey");
}

/**
* A LoadBalancingConfig that includes the policy name (the key) and its raw config value (parsed
* JSON).
Expand Down
160 changes: 5 additions & 155 deletions core/src/main/java/io/grpc/util/RoundRobinLoadBalancer.java
Expand Up @@ -27,36 +27,24 @@
import com.google.common.base.MoreObjects;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;

import io.grpc.Attributes;
import io.grpc.ChannelLogger.ChannelLogLevel;
import io.grpc.ConnectivityState;
import io.grpc.ConnectivityStateInfo;
import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.SubchannelStateListener;
import io.grpc.Metadata;
import io.grpc.Metadata.Key;
import io.grpc.NameResolver;
import io.grpc.Status;
import io.grpc.internal.GrpcAttributes;
import io.grpc.internal.ServiceConfigUtil;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

/**
* A {@link LoadBalancer} that provides round-robin load-balancing over the {@link
Expand All @@ -66,8 +54,6 @@ final class RoundRobinLoadBalancer extends LoadBalancer {
@VisibleForTesting
static final Attributes.Key<Ref<ConnectivityStateInfo>> STATE_INFO =
Attributes.Key.create("state-info");
// package-private to avoid synthetic access
static final Attributes.Key<Ref<Subchannel>> STICKY_REF = Attributes.Key.create("sticky-ref");

private final Helper helper;
private final Map<EquivalentAddressGroup, Subchannel> subchannels =
Expand All @@ -77,40 +63,18 @@ final class RoundRobinLoadBalancer extends LoadBalancer {
private ConnectivityState currentState;
private RoundRobinPicker currentPicker = new EmptyPicker(EMPTY_OK);

@Nullable
private StickinessState stickinessState;

RoundRobinLoadBalancer(Helper helper) {
this.helper = checkNotNull(helper, "helper");
this.random = new Random();
}

@Override
@SuppressWarnings("deprecation")
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
List<EquivalentAddressGroup> servers = resolvedAddresses.getAddresses();
Attributes attributes = resolvedAddresses.getAttributes();
Set<EquivalentAddressGroup> currentAddrs = subchannels.keySet();
Map<EquivalentAddressGroup, EquivalentAddressGroup> latestAddrs = stripAttrs(servers);
Set<EquivalentAddressGroup> removedAddrs = setsDifference(currentAddrs, latestAddrs.keySet());

Map<String, ?> serviceConfig = attributes.get(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG);
if (serviceConfig != null) {
String stickinessMetadataKey =
ServiceConfigUtil.getStickinessMetadataKeyFromServiceConfig(serviceConfig);
if (stickinessMetadataKey != null) {
if (stickinessMetadataKey.endsWith(Metadata.BINARY_HEADER_SUFFIX)) {
helper.getChannelLogger().log(
ChannelLogLevel.WARNING,
"Binary stickiness header is not supported. The header \"{0}\" will be ignored",
stickinessMetadataKey);
} else if (stickinessState == null
|| !stickinessState.key.name().equals(stickinessMetadataKey)) {
stickinessState = new StickinessState(stickinessMetadataKey);
}
}
}

for (Map.Entry<EquivalentAddressGroup, EquivalentAddressGroup> latestEntry :
latestAddrs.entrySet()) {
EquivalentAddressGroup strippedAddressGroup = latestEntry.getKey();
Expand All @@ -133,11 +97,6 @@ public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
.set(STATE_INFO,
new Ref<>(ConnectivityStateInfo.forNonError(IDLE)));

Ref<Subchannel> stickyRef = null;
if (stickinessState != null) {
subchannelAttrs.set(STICKY_REF, stickyRef = new Ref<>(null));
}

final Subchannel subchannel = checkNotNull(
helper.createSubchannel(CreateSubchannelArgs.newBuilder()
.setAddresses(originalAddressGroup)
Expand All @@ -150,9 +109,6 @@ public void onSubchannelState(ConnectivityStateInfo state) {
processSubchannelState(subchannel, state);
}
});
if (stickyRef != null) {
stickyRef.value = subchannel;
}
subchannels.put(strippedAddressGroup, subchannel);
subchannel.requestConnection();
}
Expand Down Expand Up @@ -183,9 +139,6 @@ private void processSubchannelState(Subchannel subchannel, ConnectivityStateInfo
if (subchannels.get(stripAttrs(subchannel.getAddresses())) != subchannel) {
return;
}
if (stateInfo.getState() == SHUTDOWN && stickinessState != null) {
stickinessState.remove(subchannel);
}
if (stateInfo.getState() == IDLE) {
subchannel.requestConnection();
}
Expand All @@ -197,9 +150,6 @@ private void shutdownSubchannel(Subchannel subchannel) {
subchannel.shutdown();
getSubchannelStateInfoRef(subchannel).value =
ConnectivityStateInfo.forNonError(SHUTDOWN);
if (stickinessState != null) {
stickinessState.remove(subchannel);
}
}

@Override
Expand Down Expand Up @@ -241,7 +191,7 @@ private void updateBalancingState() {
// initialize the Picker to a random start index to ensure that a high frequency of Picker
// churn does not skew subchannel selection.
int startIndex = random.nextInt(activeList.size());
updateBalancingState(READY, new ReadyPicker(activeList, startIndex, stickinessState));
updateBalancingState(READY, new ReadyPicker(activeList, startIndex));
}
}

Expand Down Expand Up @@ -305,90 +255,6 @@ private static <T> Set<T> setsDifference(Set<T> a, Set<T> b) {
return aCopy;
}

Map<String, Ref<Subchannel>> getStickinessMapForTest() {
if (stickinessState == null) {
return null;
}
return stickinessState.stickinessMap;
}

/**
* Holds stickiness related states: The stickiness key, a registry mapping stickiness values to
* the associated Subchannel Ref, and a map from Subchannel to Subchannel Ref.
*/
@VisibleForTesting
static final class StickinessState {
static final int MAX_ENTRIES = 1000;

final Key<String> key;
final ConcurrentMap<String, Ref<Subchannel>> stickinessMap =
new ConcurrentHashMap<>();

final Queue<String> evictionQueue = new ConcurrentLinkedQueue<>();

StickinessState(@Nonnull String stickinessKey) {
this.key = Key.of(stickinessKey, Metadata.ASCII_STRING_MARSHALLER);
}

/**
* Returns the subchannel associated to the stickiness value if available in both the
* registry and the round robin list, otherwise associates the given subchannel with the
* stickiness key in the registry and returns the given subchannel.
*/
@Nonnull
Subchannel maybeRegister(
String stickinessValue, @Nonnull Subchannel subchannel) {
final Ref<Subchannel> newSubchannelRef = subchannel.getAttributes().get(STICKY_REF);
while (true) {
Ref<Subchannel> existingSubchannelRef =
stickinessMap.putIfAbsent(stickinessValue, newSubchannelRef);
if (existingSubchannelRef == null) {
// new entry
addToEvictionQueue(stickinessValue);
return subchannel;
} else {
// existing entry
Subchannel existingSubchannel = existingSubchannelRef.value;
if (existingSubchannel != null && isReady(existingSubchannel)) {
return existingSubchannel;
}
}
// existingSubchannelRef is not null but no longer valid, replace it
if (stickinessMap.replace(stickinessValue, existingSubchannelRef, newSubchannelRef)) {
return subchannel;
}
// another thread concurrently removed or updated the entry, try again
}
}

private void addToEvictionQueue(String value) {
String oldValue;
while (stickinessMap.size() >= MAX_ENTRIES && (oldValue = evictionQueue.poll()) != null) {
stickinessMap.remove(oldValue);
}
evictionQueue.add(value);
}

/**
* Unregister the subchannel from StickinessState.
*/
void remove(Subchannel subchannel) {
subchannel.getAttributes().get(STICKY_REF).value = null;
}

/**
* Gets the subchannel associated with the stickiness value if there is.
*/
@Nullable
Subchannel getSubchannel(String stickinessValue) {
Ref<Subchannel> subchannelRef = stickinessMap.get(stickinessValue);
if (subchannelRef != null) {
return subchannelRef.value;
}
return null;
}
}

// Only subclasses are ReadyPicker or EmptyPicker
private abstract static class RoundRobinPicker extends SubchannelPicker {
abstract boolean isEquivalentTo(RoundRobinPicker picker);
Expand All @@ -400,33 +266,18 @@ static final class ReadyPicker extends RoundRobinPicker {
AtomicIntegerFieldUpdater.newUpdater(ReadyPicker.class, "index");

private final List<Subchannel> list; // non-empty
@Nullable
private final RoundRobinLoadBalancer.StickinessState stickinessState;
@SuppressWarnings("unused")
private volatile int index;

ReadyPicker(List<Subchannel> list, int startIndex,
@Nullable RoundRobinLoadBalancer.StickinessState stickinessState) {
ReadyPicker(List<Subchannel> list, int startIndex) {
Preconditions.checkArgument(!list.isEmpty(), "empty list");
this.list = list;
this.stickinessState = stickinessState;
this.index = startIndex - 1;
}

@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
Subchannel subchannel = null;
if (stickinessState != null) {
String stickinessValue = args.getHeaders().get(stickinessState.key);
if (stickinessValue != null) {
subchannel = stickinessState.getSubchannel(stickinessValue);
if (subchannel == null || !RoundRobinLoadBalancer.isReady(subchannel)) {
subchannel = stickinessState.maybeRegister(stickinessValue, nextSubchannel());
}
}
}

return PickResult.withSubchannel(subchannel != null ? subchannel : nextSubchannel());
return PickResult.withSubchannel(nextSubchannel());
}

@Override
Expand Down Expand Up @@ -457,9 +308,8 @@ boolean isEquivalentTo(RoundRobinPicker picker) {
}
ReadyPicker other = (ReadyPicker) picker;
// the lists cannot contain duplicate subchannels
return other == this || (stickinessState == other.stickinessState
&& list.size() == other.list.size()
&& new HashSet<>(list).containsAll(other.list));
return other == this
|| (list.size() == other.list.size() && new HashSet<>(list).containsAll(other.list));
}
}

Expand Down