Skip to content

Commit

Permalink
core: revert stickiness from round robin (#6698)
Browse files Browse the repository at this point in the history
  • Loading branch information
creamsoup committed Feb 11, 2020
1 parent 774f276 commit 6599871
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 538 deletions.
9 changes: 0 additions & 9 deletions core/src/main/java/io/grpc/internal/ServiceConfigUtil.java
Expand Up @@ -314,15 +314,6 @@ public static List<LbConfig> unwrapLoadBalancingConfigList(List<Map<String, ?>>
return Collections.unmodifiableList(result);
}

/**
* Extracts the stickiness metadata key from a service config, or {@code null}.
*/
@Nullable
public static String getStickinessMetadataKeyFromServiceConfig(
Map<String, ?> serviceConfig) {
return JsonUtil.getString(serviceConfig, "stickinessMetadataKey");
}

/**
* A LoadBalancingConfig that includes the policy name (the key) and its raw config value (parsed
* JSON).
Expand Down
160 changes: 5 additions & 155 deletions core/src/main/java/io/grpc/util/RoundRobinLoadBalancer.java
Expand Up @@ -27,36 +27,24 @@
import com.google.common.base.MoreObjects;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;

import io.grpc.Attributes;
import io.grpc.ChannelLogger.ChannelLogLevel;
import io.grpc.ConnectivityState;
import io.grpc.ConnectivityStateInfo;
import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.SubchannelStateListener;
import io.grpc.Metadata;
import io.grpc.Metadata.Key;
import io.grpc.NameResolver;
import io.grpc.Status;
import io.grpc.internal.GrpcAttributes;
import io.grpc.internal.ServiceConfigUtil;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

/**
* A {@link LoadBalancer} that provides round-robin load-balancing over the {@link
Expand All @@ -66,8 +54,6 @@ final class RoundRobinLoadBalancer extends LoadBalancer {
@VisibleForTesting
static final Attributes.Key<Ref<ConnectivityStateInfo>> STATE_INFO =
Attributes.Key.create("state-info");
// package-private to avoid synthetic access
static final Attributes.Key<Ref<Subchannel>> STICKY_REF = Attributes.Key.create("sticky-ref");

private final Helper helper;
private final Map<EquivalentAddressGroup, Subchannel> subchannels =
Expand All @@ -77,40 +63,18 @@ final class RoundRobinLoadBalancer extends LoadBalancer {
private ConnectivityState currentState;
private RoundRobinPicker currentPicker = new EmptyPicker(EMPTY_OK);

@Nullable
private StickinessState stickinessState;

RoundRobinLoadBalancer(Helper helper) {
this.helper = checkNotNull(helper, "helper");
this.random = new Random();
}

@Override
@SuppressWarnings("deprecation")
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
List<EquivalentAddressGroup> servers = resolvedAddresses.getAddresses();
Attributes attributes = resolvedAddresses.getAttributes();
Set<EquivalentAddressGroup> currentAddrs = subchannels.keySet();
Map<EquivalentAddressGroup, EquivalentAddressGroup> latestAddrs = stripAttrs(servers);
Set<EquivalentAddressGroup> removedAddrs = setsDifference(currentAddrs, latestAddrs.keySet());

Map<String, ?> serviceConfig = attributes.get(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG);
if (serviceConfig != null) {
String stickinessMetadataKey =
ServiceConfigUtil.getStickinessMetadataKeyFromServiceConfig(serviceConfig);
if (stickinessMetadataKey != null) {
if (stickinessMetadataKey.endsWith(Metadata.BINARY_HEADER_SUFFIX)) {
helper.getChannelLogger().log(
ChannelLogLevel.WARNING,
"Binary stickiness header is not supported. The header \"{0}\" will be ignored",
stickinessMetadataKey);
} else if (stickinessState == null
|| !stickinessState.key.name().equals(stickinessMetadataKey)) {
stickinessState = new StickinessState(stickinessMetadataKey);
}
}
}

for (Map.Entry<EquivalentAddressGroup, EquivalentAddressGroup> latestEntry :
latestAddrs.entrySet()) {
EquivalentAddressGroup strippedAddressGroup = latestEntry.getKey();
Expand All @@ -133,11 +97,6 @@ public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
.set(STATE_INFO,
new Ref<>(ConnectivityStateInfo.forNonError(IDLE)));

Ref<Subchannel> stickyRef = null;
if (stickinessState != null) {
subchannelAttrs.set(STICKY_REF, stickyRef = new Ref<>(null));
}

final Subchannel subchannel = checkNotNull(
helper.createSubchannel(CreateSubchannelArgs.newBuilder()
.setAddresses(originalAddressGroup)
Expand All @@ -150,9 +109,6 @@ public void onSubchannelState(ConnectivityStateInfo state) {
processSubchannelState(subchannel, state);
}
});
if (stickyRef != null) {
stickyRef.value = subchannel;
}
subchannels.put(strippedAddressGroup, subchannel);
subchannel.requestConnection();
}
Expand Down Expand Up @@ -183,9 +139,6 @@ private void processSubchannelState(Subchannel subchannel, ConnectivityStateInfo
if (subchannels.get(stripAttrs(subchannel.getAddresses())) != subchannel) {
return;
}
if (stateInfo.getState() == SHUTDOWN && stickinessState != null) {
stickinessState.remove(subchannel);
}
if (stateInfo.getState() == IDLE) {
subchannel.requestConnection();
}
Expand All @@ -197,9 +150,6 @@ private void shutdownSubchannel(Subchannel subchannel) {
subchannel.shutdown();
getSubchannelStateInfoRef(subchannel).value =
ConnectivityStateInfo.forNonError(SHUTDOWN);
if (stickinessState != null) {
stickinessState.remove(subchannel);
}
}

@Override
Expand Down Expand Up @@ -241,7 +191,7 @@ private void updateBalancingState() {
// initialize the Picker to a random start index to ensure that a high frequency of Picker
// churn does not skew subchannel selection.
int startIndex = random.nextInt(activeList.size());
updateBalancingState(READY, new ReadyPicker(activeList, startIndex, stickinessState));
updateBalancingState(READY, new ReadyPicker(activeList, startIndex));
}
}

Expand Down Expand Up @@ -305,90 +255,6 @@ private static <T> Set<T> setsDifference(Set<T> a, Set<T> b) {
return aCopy;
}

Map<String, Ref<Subchannel>> getStickinessMapForTest() {
if (stickinessState == null) {
return null;
}
return stickinessState.stickinessMap;
}

/**
* Holds stickiness related states: The stickiness key, a registry mapping stickiness values to
* the associated Subchannel Ref, and a map from Subchannel to Subchannel Ref.
*/
@VisibleForTesting
static final class StickinessState {
static final int MAX_ENTRIES = 1000;

final Key<String> key;
final ConcurrentMap<String, Ref<Subchannel>> stickinessMap =
new ConcurrentHashMap<>();

final Queue<String> evictionQueue = new ConcurrentLinkedQueue<>();

StickinessState(@Nonnull String stickinessKey) {
this.key = Key.of(stickinessKey, Metadata.ASCII_STRING_MARSHALLER);
}

/**
* Returns the subchannel associated to the stickiness value if available in both the
* registry and the round robin list, otherwise associates the given subchannel with the
* stickiness key in the registry and returns the given subchannel.
*/
@Nonnull
Subchannel maybeRegister(
String stickinessValue, @Nonnull Subchannel subchannel) {
final Ref<Subchannel> newSubchannelRef = subchannel.getAttributes().get(STICKY_REF);
while (true) {
Ref<Subchannel> existingSubchannelRef =
stickinessMap.putIfAbsent(stickinessValue, newSubchannelRef);
if (existingSubchannelRef == null) {
// new entry
addToEvictionQueue(stickinessValue);
return subchannel;
} else {
// existing entry
Subchannel existingSubchannel = existingSubchannelRef.value;
if (existingSubchannel != null && isReady(existingSubchannel)) {
return existingSubchannel;
}
}
// existingSubchannelRef is not null but no longer valid, replace it
if (stickinessMap.replace(stickinessValue, existingSubchannelRef, newSubchannelRef)) {
return subchannel;
}
// another thread concurrently removed or updated the entry, try again
}
}

private void addToEvictionQueue(String value) {
String oldValue;
while (stickinessMap.size() >= MAX_ENTRIES && (oldValue = evictionQueue.poll()) != null) {
stickinessMap.remove(oldValue);
}
evictionQueue.add(value);
}

/**
* Unregister the subchannel from StickinessState.
*/
void remove(Subchannel subchannel) {
subchannel.getAttributes().get(STICKY_REF).value = null;
}

/**
* Gets the subchannel associated with the stickiness value if there is.
*/
@Nullable
Subchannel getSubchannel(String stickinessValue) {
Ref<Subchannel> subchannelRef = stickinessMap.get(stickinessValue);
if (subchannelRef != null) {
return subchannelRef.value;
}
return null;
}
}

// Only subclasses are ReadyPicker or EmptyPicker
private abstract static class RoundRobinPicker extends SubchannelPicker {
abstract boolean isEquivalentTo(RoundRobinPicker picker);
Expand All @@ -400,33 +266,18 @@ static final class ReadyPicker extends RoundRobinPicker {
AtomicIntegerFieldUpdater.newUpdater(ReadyPicker.class, "index");

private final List<Subchannel> list; // non-empty
@Nullable
private final RoundRobinLoadBalancer.StickinessState stickinessState;
@SuppressWarnings("unused")
private volatile int index;

ReadyPicker(List<Subchannel> list, int startIndex,
@Nullable RoundRobinLoadBalancer.StickinessState stickinessState) {
ReadyPicker(List<Subchannel> list, int startIndex) {
Preconditions.checkArgument(!list.isEmpty(), "empty list");
this.list = list;
this.stickinessState = stickinessState;
this.index = startIndex - 1;
}

@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
Subchannel subchannel = null;
if (stickinessState != null) {
String stickinessValue = args.getHeaders().get(stickinessState.key);
if (stickinessValue != null) {
subchannel = stickinessState.getSubchannel(stickinessValue);
if (subchannel == null || !RoundRobinLoadBalancer.isReady(subchannel)) {
subchannel = stickinessState.maybeRegister(stickinessValue, nextSubchannel());
}
}
}

return PickResult.withSubchannel(subchannel != null ? subchannel : nextSubchannel());
return PickResult.withSubchannel(nextSubchannel());
}

@Override
Expand Down Expand Up @@ -457,9 +308,8 @@ boolean isEquivalentTo(RoundRobinPicker picker) {
}
ReadyPicker other = (ReadyPicker) picker;
// the lists cannot contain duplicate subchannels
return other == this || (stickinessState == other.stickinessState
&& list.size() == other.list.size()
&& new HashSet<>(list).containsAll(other.list));
return other == this
|| (list.size() == other.list.size() && new HashSet<>(list).containsAll(other.list));
}
}

Expand Down

0 comments on commit 6599871

Please sign in to comment.