Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds: change ring_hash LB aggregation rule to handles transient_failures #9084

Merged
merged 7 commits into from Apr 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
104 changes: 72 additions & 32 deletions xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java
Expand Up @@ -40,8 +40,10 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;

/**
Expand All @@ -67,6 +69,8 @@ final class RingHashLoadBalancer extends LoadBalancer {

private List<RingEntry> ring;
private ConnectivityState currentState;
private Iterator<Subchannel> connectionAttemptIterator = subchannels.values().iterator();
YifeiZhuang marked this conversation as resolved.
Show resolved Hide resolved
private final Random random = new Random();

RingHashLoadBalancer(Helper helper) {
this.helper = checkNotNull(helper, "helper");
Expand Down Expand Up @@ -142,6 +146,14 @@ public void onSubchannelState(ConnectivityStateInfo newState) {
for (EquivalentAddressGroup addr : removedAddrs) {
removedSubchannels.add(subchannels.remove(addr));
}
// If we need to proactively start connecting, iterate through all the subchannels, starting
// at a random position.
// Alternatively, we should better start at the same position.
connectionAttemptIterator = subchannels.values().iterator();
ejona86 marked this conversation as resolved.
Show resolved Hide resolved
int randomAdvance = random.nextInt(subchannels.size());
while (randomAdvance-- > 0) {
connectionAttemptIterator.next();
}

// Update the picker before shutting down the subchannels, to reduce the chance of race
// between picking a subchannel and shutting it down.
Expand Down Expand Up @@ -203,53 +215,77 @@ public void shutdown() {
* TRANSIENT_FAILURE</li>
* <li>If there is at least one subchannel in CONNECTING state, overall state is
* CONNECTING</li>
* <li> If there is one subchannel in TRANSIENT_FAILURE state and there is
* more than one subchannel, report CONNECTING </li>
* <li>If there is at least one subchannel in IDLE state, overall state is IDLE</li>
* <li>Otherwise, overall state is TRANSIENT_FAILURE</li>
* </ol>
*/
private void updateBalancingState() {
checkState(!subchannels.isEmpty(), "no subchannel has been created");
int failureCount = 0;
boolean hasConnecting = false;
Subchannel idleSubchannel = null;
ConnectivityState overallState = null;
boolean start_connection_attempt = false;
int num_idle_ = 0;
int num_ready_ = 0;
int num_connecting_ = 0;
int num_transient_failure_ = 0;
for (Subchannel subchannel : subchannels.values()) {
ConnectivityState state = getSubchannelStateInfoRef(subchannel).value.getState();
if (state == READY) {
overallState = READY;
num_ready_++;
break;
}
if (state == TRANSIENT_FAILURE) {
failureCount++;
} else if (state == CONNECTING) {
hasConnecting = true;
} else if (state == TRANSIENT_FAILURE) {
num_transient_failure_++;
} else if (state == CONNECTING ) {
num_connecting_++;
} else if (state == IDLE) {
if (idleSubchannel == null) {
idleSubchannel = subchannel;
}
num_idle_++;
}
}
if (overallState == null) {
if (failureCount >= 2) {
// This load balancer may not get any pick requests from the upstream if it's reporting
// TRANSIENT_FAILURE. It needs to recover by itself by attempting to connect to at least
// one subchannel that has not failed at any given time.
if (!hasConnecting && idleSubchannel != null) {
idleSubchannel.requestConnection();
}
overallState = TRANSIENT_FAILURE;
} else if (hasConnecting) {
overallState = CONNECTING;
} else if (idleSubchannel != null) {
overallState = IDLE;
} else {
overallState = TRANSIENT_FAILURE;
}
ConnectivityState overallState;
if (num_ready_ > 0) {
overallState = READY;
} else if (num_transient_failure_ >= 2) {
overallState = TRANSIENT_FAILURE;
start_connection_attempt = true;
} else if (num_connecting_ > 0) {
overallState = CONNECTING;
} else if (num_transient_failure_ == 1 && subchannels.size() > 1) {
overallState = CONNECTING;
start_connection_attempt = true;
} else if (num_idle_ > 0) {
overallState = IDLE;
} else {
overallState = TRANSIENT_FAILURE;
start_connection_attempt = true;
}
RingHashPicker picker = new RingHashPicker(syncContext, ring, subchannels);
// TODO(chengyuanzhang): avoid unnecessary reprocess caused by duplicated server addr updates
helper.updateBalancingState(overallState, picker);
currentState = overallState;
// While the ring_hash policy is reporting TRANSIENT_FAILURE, it will
// not be getting any pick requests from the priority policy.
// However, because the ring_hash policy does not attempt to
// reconnect to subchannels unless it is getting pick requests,
// it will need special handling to ensure that it will eventually
// recover from TRANSIENT_FAILURE state once the problem is resolved.
// Specifically, it will make sure that it is attempting to connect to
// at least one subchannel at any given time. After a given subchannel
// fails a connection attempt, it will move on to the next subchannel
// in the ring. It will keep doing this until one of the subchannels
// successfully connects, at which point it will report READY and stop
// proactively trying to connect. The policy will remain in
// TRANSIENT_FAILURE until at least one subchannel becomes connected,
// even if subchannels are in state CONNECTING during that time.
//
// Note that we do the same thing when the policy is in state
// CONNECTING, just to ensure that we don't remain in CONNECTING state
// indefinitely if there are no new picks coming in.
if (start_connection_attempt) {
if (!connectionAttemptIterator.hasNext()) {
connectionAttemptIterator = subchannels.values().iterator();
}
connectionAttemptIterator.next().requestConnection();
}
}

private void processSubchannelState(Subchannel subchannel, ConnectivityStateInfo stateInfo) {
Expand All @@ -259,18 +295,22 @@ private void processSubchannelState(Subchannel subchannel, ConnectivityStateInfo
if (stateInfo.getState() == TRANSIENT_FAILURE || stateInfo.getState() == IDLE) {
helper.refreshNameResolution();
}
Ref<ConnectivityStateInfo> subchannelStateRef = getSubchannelStateInfoRef(subchannel);
ejona86 marked this conversation as resolved.
Show resolved Hide resolved
updateConnectivityState(subchannel, stateInfo);
updateBalancingState();
}

private void updateConnectivityState(Subchannel subchannel, ConnectivityStateInfo stateInfo) {
Ref<ConnectivityStateInfo> subchannelStateRef = getSubchannelStateInfoRef(subchannel);
ConnectivityState previousConnectivityState = subchannelStateRef.value.getState();
// Don't proactively reconnect if the subchannel enters IDLE, even if previously was connected.
// If the subchannel was previously in TRANSIENT_FAILURE, it is considered to stay in
// TRANSIENT_FAILURE until it becomes READY.
if (subchannelStateRef.value.getState() == TRANSIENT_FAILURE) {
if (previousConnectivityState == TRANSIENT_FAILURE) {
if (stateInfo.getState() == CONNECTING || stateInfo.getState() == IDLE) {
return;
}
}
subchannelStateRef.value = stateInfo;
updateBalancingState();
}

private static void shutdownSubchannel(Subchannel subchannel) {
Expand Down