Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds: change ring_hash LB aggregation rule to handle transient_failures #9084

Merged
merged 7 commits into from Apr 18, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
103 changes: 72 additions & 31 deletions xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java
Expand Up @@ -40,6 +40,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -67,6 +68,9 @@ final class RingHashLoadBalancer extends LoadBalancer {

private List<RingEntry> ring;
private ConnectivityState currentState;
// If we need to proactively start connecting, simply iterate through all the subchannels.
// Alternatively, we can do it more fairly and effectively.
private Iterator<Subchannel> connectionAttemptIterator = subchannels.values().iterator();
YifeiZhuang marked this conversation as resolved.
Show resolved Hide resolved

RingHashLoadBalancer(Helper helper) {
this.helper = checkNotNull(helper, "helper");
Expand Down Expand Up @@ -142,6 +146,7 @@ public void onSubchannelState(ConnectivityStateInfo newState) {
for (EquivalentAddressGroup addr : removedAddrs) {
removedSubchannels.add(subchannels.remove(addr));
}
connectionAttemptIterator = subchannels.values().iterator();
ejona86 marked this conversation as resolved.
Show resolved Hide resolved

// Update the picker before shutting down the subchannels, to reduce the chance of race
// between picking a subchannel and shutting it down.
Expand Down Expand Up @@ -203,53 +208,85 @@ public void shutdown() {
* TRANSIENT_FAILURE</li>
* <li>If there is at least one subchannel in CONNECTING state, overall state is
* CONNECTING</li>
* <li>If there is exactly one subchannel in TRANSIENT_FAILURE state and there is
* more than one subchannel, overall state is CONNECTING</li>
* <li>If there is at least one subchannel in IDLE state, overall state is IDLE</li>
* <li>Otherwise, overall state is TRANSIENT_FAILURE</li>
* </ol>
*/
private void updateBalancingState() {
checkState(!subchannels.isEmpty(), "no subchannel has been created");
int failureCount = 0;
boolean hasConnecting = false;
Subchannel idleSubchannel = null;
ConnectivityState overallState = null;
YifeiZhuang marked this conversation as resolved.
Show resolved Hide resolved
boolean start_connection_attempt = false;
int num_idle_ = 0;
int num_ready_ = 0;
int num_connecting_ = 0;
int num_transient_failure_ = 0;
for (Subchannel subchannel : subchannels.values()) {
ConnectivityState state = getSubchannelStateInfoRef(subchannel).value.getState();
if (state == READY) {
overallState = READY;
num_ready_++;
break;
}
if (state == TRANSIENT_FAILURE) {
failureCount++;
} else if (state == CONNECTING) {
hasConnecting = true;
} else if (state == TRANSIENT_FAILURE) {
num_transient_failure_++;
} else if (state == CONNECTING ) {
num_connecting_++;
} else if (state == IDLE) {
if (idleSubchannel == null) {
idleSubchannel = subchannel;
}
num_idle_++;
}
}
if (overallState == null) {
if (failureCount >= 2) {
// This load balancer may not get any pick requests from the upstream if it's reporting
// TRANSIENT_FAILURE. It needs to recover by itself by attempting to connect to at least
// one subchannel that has not failed at any given time.
if (!hasConnecting && idleSubchannel != null) {
idleSubchannel.requestConnection();
}
overallState = TRANSIENT_FAILURE;
} else if (hasConnecting) {
overallState = CONNECTING;
} else if (idleSubchannel != null) {
overallState = IDLE;
} else {
overallState = TRANSIENT_FAILURE;
}
if (num_ready_ > 0) {
overallState = READY;
} else if (num_transient_failure_ >= 2) {
overallState = TRANSIENT_FAILURE;
start_connection_attempt = true;
} else if (num_connecting_ > 0) {
overallState = CONNECTING;
} else if (num_transient_failure_ == 1 && subchannels.size() > 1) {
overallState = CONNECTING;
start_connection_attempt = true;
} else if (num_idle_ > 0) {
overallState = IDLE;
} else {
overallState = TRANSIENT_FAILURE;
start_connection_attempt = true;
}
RingHashPicker picker = new RingHashPicker(syncContext, ring, subchannels);
// TODO(chengyuanzhang): avoid unnecessary reprocess caused by duplicated server addr updates
helper.updateBalancingState(overallState, picker);
currentState = overallState;
// While the ring_hash policy is reporting TRANSIENT_FAILURE, it will
// not be getting any pick requests from the priority policy.
// However, because the ring_hash policy does not attempt to
// reconnect to subchannels unless it is getting pick requests,
// it will need special handling to ensure that it will eventually
// recover from TRANSIENT_FAILURE state once the problem is resolved.
// Specifically, it will make sure that it is attempting to connect to
// at least one subchannel at any given time. After a given subchannel
// fails a connection attempt, it will move on to the next subchannel
// in the ring. It will keep doing this until one of the subchannels
// successfully connects, at which point it will report READY and stop
// proactively trying to connect. The policy will remain in
// TRANSIENT_FAILURE until at least one subchannel becomes connected,
// even if subchannels are in state CONNECTING during that time.
//
// Note that we do the same thing when the policy is in state
// CONNECTING, just to ensure that we don't remain in CONNECTING state
// indefinitely if there are no new picks coming in.
if (start_connection_attempt) {
if (!connectionAttemptIterator.hasNext()) {
connectionAttemptIterator = subchannels.values().iterator();
}
if (connectionAttemptIterator.hasNext()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be an else clause.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, we do this regardless of whether the iterator was reset.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason to have this condition at all? The only way it would produce results is if subchannels is empty. But it seems things will be pretty broken if that's the case. random.nextInt(0) throws, for example. Are we doing this because we aren't confident the state of things when it is called?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not relying on the assumption that it is protected by syncContext is one reason.
The other is that it is generally best practice to always call hasNext() before next(). I guess it does not hurt, except that it might hide some unexpected bugs.

final Subchannel finalSubchannel = connectionAttemptIterator.next();
syncContext.execute(new Runnable() {
YifeiZhuang marked this conversation as resolved.
Show resolved Hide resolved
@Override
public void run() {
finalSubchannel.requestConnection();
}
});
}
}
}

private void processSubchannelState(Subchannel subchannel, ConnectivityStateInfo stateInfo) {
Expand All @@ -259,18 +296,22 @@ private void processSubchannelState(Subchannel subchannel, ConnectivityStateInfo
if (stateInfo.getState() == TRANSIENT_FAILURE || stateInfo.getState() == IDLE) {
helper.refreshNameResolution();
}
Ref<ConnectivityStateInfo> subchannelStateRef = getSubchannelStateInfoRef(subchannel);
ejona86 marked this conversation as resolved.
Show resolved Hide resolved
updateConnectivityState(subchannel, stateInfo);
updateBalancingState();
}

private void updateConnectivityState(Subchannel subchannel, ConnectivityStateInfo stateInfo) {
Ref<ConnectivityStateInfo> subchannelStateRef = getSubchannelStateInfoRef(subchannel);
ConnectivityState previousConnectivityState = subchannelStateRef.value.getState();
// Don't proactively reconnect if the subchannel enters IDLE, even if previously was connected.
// If the subchannel was previously in TRANSIENT_FAILURE, it is considered to stay in
// TRANSIENT_FAILURE until it becomes READY.
if (subchannelStateRef.value.getState() == TRANSIENT_FAILURE) {
if (previousConnectivityState == TRANSIENT_FAILURE) {
if (stateInfo.getState() == CONNECTING || stateInfo.getState() == IDLE) {
return;
}
}
subchannelStateRef.value = stateInfo;
updateBalancingState();
}

private static void shutdownSubchannel(Subchannel subchannel) {
Expand Down