Skip to content

Commit

Permalink
xds: fix ring-hash-picker behaviour (#9085)
Browse files Browse the repository at this point in the history
  • Loading branch information
YifeiZhuang committed Apr 18, 2022
1 parent a0da558 commit 81c4571
Show file tree
Hide file tree
Showing 2 changed files with 273 additions and 30 deletions.
67 changes: 42 additions & 25 deletions xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java
Expand Up @@ -45,6 +45,7 @@
import java.util.Map;
import java.util.Random;
import java.util.Set;
import javax.annotation.Nullable;

/**
* A {@link LoadBalancer} that provides consistent hashing based load balancing to upstream hosts.
Expand Down Expand Up @@ -399,10 +400,12 @@ public PickResult pickSubchannel(PickSubchannelArgs args) {
// Try finding a READY subchannel. Starting from the ring entry next to the RPC's hash.
// If the one of the first two subchannels is not in TRANSIENT_FAILURE, return result
// based on that subchannel. Otherwise, fail the pick unless a READY subchannel is found.
// Meanwhile, trigger connection for the first subchannel that is in IDLE if no subchannel
// before it is in CONNECTING or READY.
boolean hasPending = false; // true if having subchannel(s) in CONNECTING or IDLE
boolean canBuffer = true; // true if RPCs can be buffered with a pending subchannel
// Meanwhile, trigger connection for the channel and status:
// For the first subchannel that is in IDLE or TRANSIENT_FAILURE;
// And for the second subchannel that is in IDLE or TRANSIENT_FAILURE;
// And for each of the following subchannels that is in TRANSIENT_FAILURE or IDLE,
// stop until we find the first subchannel that is in CONNECTING or IDLE status.
boolean foundFirstNonFailed = false; // true if having subchannel(s) in CONNECTING or IDLE
Subchannel firstSubchannel = null;
Subchannel secondSubchannel = null;
for (int i = 0; i < ring.size(); i++) {
Expand All @@ -417,36 +420,50 @@ public PickResult pickSubchannel(PickSubchannelArgs args) {
// are failed unless there is a READY connection.
if (firstSubchannel == null) {
firstSubchannel = subchannel.subchannel;
} else if (subchannel.subchannel != firstSubchannel) {
if (secondSubchannel == null) {
secondSubchannel = subchannel.subchannel;
} else if (subchannel.subchannel != secondSubchannel) {
canBuffer = false;
PickResult maybeBuffer = pickSubchannelsNonReady(subchannel);
if (maybeBuffer != null) {
return maybeBuffer;
}
}
if (subchannel.stateInfo.getState() == TRANSIENT_FAILURE) {
continue;
}
if (!hasPending) { // first non-failing subchannel
if (subchannel.stateInfo.getState() == IDLE) {
final Subchannel finalSubchannel = subchannel.subchannel;
syncContext.execute(new Runnable() {
@Override
public void run() {
finalSubchannel.requestConnection();
}
});
} else if (subchannel.subchannel != firstSubchannel && secondSubchannel == null) {
secondSubchannel = subchannel.subchannel;
PickResult maybeBuffer = pickSubchannelsNonReady(subchannel);
if (maybeBuffer != null) {
return maybeBuffer;
}
if (canBuffer) { // done if this is the first or second two subchannel
return PickResult.withNoResult(); // queue the pick and re-process later
} else if (subchannel.subchannel != firstSubchannel
&& subchannel.subchannel != secondSubchannel) {
if (!foundFirstNonFailed) {
pickSubchannelsNonReady(subchannel);
if (subchannel.stateInfo.getState() != TRANSIENT_FAILURE) {
foundFirstNonFailed = true;
}
}
hasPending = true;
}
}
// Fail the pick with error status of the original subchannel hit by hash.
SubchannelView originalSubchannel = pickableSubchannels.get(ring.get(mid).addrKey);
return PickResult.withError(originalSubchannel.stateInfo.getStatus());
}

@Nullable
private PickResult pickSubchannelsNonReady(SubchannelView subchannel) {
if (subchannel.stateInfo.getState() == TRANSIENT_FAILURE
|| subchannel.stateInfo.getState() == IDLE ) {
final Subchannel finalSubchannel = subchannel.subchannel;
syncContext.execute(new Runnable() {
@Override
public void run() {
finalSubchannel.requestConnection();
}
});
}
if (subchannel.stateInfo.getState() == CONNECTING
|| subchannel.stateInfo.getState() == IDLE) {
return PickResult.withNoResult();
} else {
return null;
}
}
}

/**
Expand Down
236 changes: 231 additions & 5 deletions xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java
Expand Up @@ -618,9 +618,8 @@ public void skipFailingHosts_firstTwoHostsFailed_pickNextFirstReady() {
assertThat(result.getStatus().getDescription()).isEqualTo("unreachable");
verify(subchannels.get(Collections.singletonList(servers.get(1))))
.requestConnection(); // kickoff connection to server3 (next first non-failing)
// TODO: zivy@
//verify(subchannels.get(Collections.singletonList(servers.get(0)))).requestConnection();
//verify(subchannels.get(Collections.singletonList(servers.get(2)))).requestConnection();
verify(subchannels.get(Collections.singletonList(servers.get(0)))).requestConnection();
verify(subchannels.get(Collections.singletonList(servers.get(2)))).requestConnection();

// Now connecting to server1.
deliverSubchannelState(
Expand Down Expand Up @@ -664,6 +663,7 @@ public void allSubchannelsInTransientFailure() {
}
verify(helper, atLeastOnce())
.updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
verifyConnection(3);

// Picking subchannel triggers connection. RPC hash hits server0.
PickSubchannelArgs args = new PickSubchannelArgsImpl(
Expand All @@ -674,6 +674,233 @@ public void allSubchannelsInTransientFailure() {
assertThat(result.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE);
assertThat(result.getStatus().getDescription())
.isEqualTo("[FakeSocketAddress-server0] unreachable");
verify(subchannels.get(Collections.singletonList(servers.get(0))))
.requestConnection();
verify(subchannels.get(Collections.singletonList(servers.get(1))))
.requestConnection();
verify(subchannels.get(Collections.singletonList(servers.get(2))))
.requestConnection();
}

@Test
public void firstSubchannelIdle() {
// Map each server address to exactly one ring entry.
RingHashConfig config = new RingHashConfig(3, 3);
List<EquivalentAddressGroup> servers = createWeightedServerAddrs(1, 1, 1);
loadBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(servers).setLoadBalancingPolicyConfig(config).build());
verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class));
verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class));

deliverSubchannelState(subchannels.get(Collections.singletonList(servers.get(1))),
ConnectivityStateInfo.forTransientFailure(
Status.UNAVAILABLE.withDescription("unreachable")));
verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
verifyConnection(1);

// Picking subchannel triggers connection. RPC hash hits server0.
PickSubchannelArgs args = new PickSubchannelArgsImpl(
TestMethodDescriptors.voidMethod(), new Metadata(),
CallOptions.DEFAULT.withOption(XdsNameResolver.RPC_HASH_KEY, hashFunc.hashVoid()));
PickResult result = pickerCaptor.getValue().pickSubchannel(args);
assertThat(result.getStatus().isOk()).isTrue();
verify(subchannels.get(Collections.singletonList(servers.get(0))))
.requestConnection();
verify(subchannels.get(Collections.singletonList(servers.get(1))), never())
.requestConnection();
verify(subchannels.get(Collections.singletonList(servers.get(2))), never())
.requestConnection();
}

@Test
public void firstSubchannelConnecting() {
// Map each server address to exactly one ring entry.
RingHashConfig config = new RingHashConfig(3, 3);
List<EquivalentAddressGroup> servers = createWeightedServerAddrs(1, 1, 1);
loadBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(servers).setLoadBalancingPolicyConfig(config).build());
verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class));
verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class));

deliverSubchannelState(subchannels.get(Collections.singletonList(servers.get(0))),
ConnectivityStateInfo.forNonError(CONNECTING));
deliverSubchannelState(subchannels.get(Collections.singletonList(servers.get(1))),
ConnectivityStateInfo.forNonError(CONNECTING));
verify(helper, times(2)).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());

// Picking subchannel triggers connection.
PickSubchannelArgs args = new PickSubchannelArgsImpl(
TestMethodDescriptors.voidMethod(), new Metadata(),
CallOptions.DEFAULT.withOption(XdsNameResolver.RPC_HASH_KEY, hashFunc.hashVoid()));
PickResult result = pickerCaptor.getValue().pickSubchannel(args);
assertThat(result.getStatus().isOk()).isTrue();
verify(subchannels.get(Collections.singletonList(servers.get(0))), never())
.requestConnection();
verify(subchannels.get(Collections.singletonList(servers.get(1))), never())
.requestConnection();
verify(subchannels.get(Collections.singletonList(servers.get(2))), never())
.requestConnection();
}

@Test
public void firstSubchannelFailure() {
// Map each server address to exactly one ring entry.
RingHashConfig config = new RingHashConfig(3, 3);
List<EquivalentAddressGroup> servers = createWeightedServerAddrs(1, 1, 1);
loadBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(servers).setLoadBalancingPolicyConfig(config).build());
verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class));
verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class));
// ring:
// "[FakeSocketAddress-server1]_0"
// "[FakeSocketAddress-server0]_0"
// "[FakeSocketAddress-server2]_0"

deliverSubchannelState(subchannels.get(Collections.singletonList(servers.get(0))),
ConnectivityStateInfo.forTransientFailure(
Status.UNAVAILABLE.withDescription("unreachable")));
verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
verifyConnection(1);

// Picking subchannel triggers connection.
PickSubchannelArgs args = new PickSubchannelArgsImpl(
TestMethodDescriptors.voidMethod(), new Metadata(),
CallOptions.DEFAULT.withOption(XdsNameResolver.RPC_HASH_KEY, hashFunc.hashVoid()));
PickResult result = pickerCaptor.getValue().pickSubchannel(args);
assertThat(result.getStatus().isOk()).isTrue();
verify(subchannels.get(Collections.singletonList(servers.get(0))))
.requestConnection();
verify(subchannels.get(Collections.singletonList(servers.get(2))))
.requestConnection();
verify(subchannels.get(Collections.singletonList(servers.get(1))), never())
.requestConnection();
}

@Test
public void secondSubchannelConnecting() {
// Map each server address to exactly one ring entry.
RingHashConfig config = new RingHashConfig(3, 3);
List<EquivalentAddressGroup> servers = createWeightedServerAddrs(1, 1, 1);
loadBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(servers).setLoadBalancingPolicyConfig(config).build());
verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class));
verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class));
// ring:
// "[FakeSocketAddress-server1]_0"
// "[FakeSocketAddress-server0]_0"
// "[FakeSocketAddress-server2]_0"

Subchannel firstSubchannel = subchannels.get(Collections.singletonList(servers.get(0)));
deliverSubchannelState(firstSubchannel,
ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE.withDescription(
firstSubchannel.getAddresses().getAddresses() + "unreachable")));
deliverSubchannelState(subchannels.get(Collections.singletonList(servers.get(2))),
ConnectivityStateInfo.forNonError(CONNECTING));
verify(helper, times(2)).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
verifyConnection(1);

// Picking subchannel triggers connection.
PickSubchannelArgs args = new PickSubchannelArgsImpl(
TestMethodDescriptors.voidMethod(), new Metadata(),
CallOptions.DEFAULT.withOption(XdsNameResolver.RPC_HASH_KEY, hashFunc.hashVoid()));
PickResult result = pickerCaptor.getValue().pickSubchannel(args);
assertThat(result.getStatus().isOk()).isTrue();
verify(subchannels.get(Collections.singletonList(servers.get(0))))
.requestConnection();
verify(subchannels.get(Collections.singletonList(servers.get(2))), never())
.requestConnection();
verify(subchannels.get(Collections.singletonList(servers.get(1))), never())
.requestConnection();
}

@Test
public void secondSubchannelFailure() {
// Map each server address to exactly one ring entry.
RingHashConfig config = new RingHashConfig(3, 3);
List<EquivalentAddressGroup> servers = createWeightedServerAddrs(1, 1, 1);
loadBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(servers).setLoadBalancingPolicyConfig(config).build());
verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class));
verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class));
// ring:
// "[FakeSocketAddress-server1]_0"
// "[FakeSocketAddress-server0]_0"
// "[FakeSocketAddress-server2]_0"

Subchannel firstSubchannel = subchannels.get(Collections.singletonList(servers.get(0)));
deliverSubchannelState(firstSubchannel,
ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE.withDescription(
firstSubchannel.getAddresses().getAddresses() + " unreachable")));
deliverSubchannelState(subchannels.get(Collections.singletonList(servers.get(2))),
ConnectivityStateInfo.forTransientFailure(
Status.UNAVAILABLE.withDescription("unreachable")));
verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
verifyConnection(2);

// Picking subchannel triggers connection.
PickSubchannelArgs args = new PickSubchannelArgsImpl(
TestMethodDescriptors.voidMethod(), new Metadata(),
CallOptions.DEFAULT.withOption(XdsNameResolver.RPC_HASH_KEY, hashFunc.hashVoid()));
PickResult result = pickerCaptor.getValue().pickSubchannel(args);
assertThat(result.getStatus().isOk()).isFalse();
assertThat(result.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE);
assertThat(result.getStatus().getDescription())
.isEqualTo("[FakeSocketAddress-server0] unreachable");
verify(subchannels.get(Collections.singletonList(servers.get(0))))
.requestConnection();
verify(subchannels.get(Collections.singletonList(servers.get(2))))
.requestConnection();
verify(subchannels.get(Collections.singletonList(servers.get(1))))
.requestConnection();
}

@Test
public void thirdSubchannelConnecting() {
// Map each server address to exactly one ring entry.
RingHashConfig config = new RingHashConfig(3, 3);
List<EquivalentAddressGroup> servers = createWeightedServerAddrs(1, 1, 1);
loadBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(servers).setLoadBalancingPolicyConfig(config).build());
verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class));
verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class));
// ring:
// "[FakeSocketAddress-server1]_0"
// "[FakeSocketAddress-server0]_0"
// "[FakeSocketAddress-server2]_0"

Subchannel firstSubchannel = subchannels.get(Collections.singletonList(servers.get(0)));
deliverSubchannelState(firstSubchannel,
ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE.withDescription(
firstSubchannel.getAddresses().getAddresses() + " unreachable")));
deliverSubchannelState(subchannels.get(Collections.singletonList(servers.get(2))),
ConnectivityStateInfo.forTransientFailure(
Status.UNAVAILABLE.withDescription("unreachable")));
deliverSubchannelState(subchannels.get(Collections.singletonList(servers.get(1))),
ConnectivityStateInfo.forNonError(CONNECTING));
verify(helper, times(2)).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
verifyConnection(3);

// Picking subchannel triggers connection.
PickSubchannelArgs args = new PickSubchannelArgsImpl(
TestMethodDescriptors.voidMethod(), new Metadata(),
CallOptions.DEFAULT.withOption(XdsNameResolver.RPC_HASH_KEY, hashFunc.hashVoid()));
PickResult result = pickerCaptor.getValue().pickSubchannel(args);
assertThat(result.getStatus().isOk()).isFalse();
assertThat(result.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE);
assertThat(result.getStatus().getDescription())
.isEqualTo("[FakeSocketAddress-server0] unreachable");
verify(subchannels.get(Collections.singletonList(servers.get(0))))
.requestConnection();
verify(subchannels.get(Collections.singletonList(servers.get(2))))
.requestConnection();
verify(subchannels.get(Collections.singletonList(servers.get(1))), never())
.requestConnection();
}

@Test
Expand Down Expand Up @@ -706,8 +933,7 @@ public void stickyTransientFailure() {
CallOptions.DEFAULT.withOption(XdsNameResolver.RPC_HASH_KEY, hashFunc.hashVoid()));
PickResult result = pickerCaptor.getValue().pickSubchannel(args);
assertThat(result.getStatus().isOk()).isTrue();
// enabled me. there is a bug in picker behavior
// verify(subchannels.get(Collections.singletonList(servers.get(0)))).requestConnection();
verify(subchannels.get(Collections.singletonList(servers.get(0)))).requestConnection();
verify(subchannels.get(Collections.singletonList(servers.get(2)))).requestConnection();
verify(subchannels.get(Collections.singletonList(servers.get(1))), never())
.requestConnection();
Expand Down

0 comments on commit 81c4571

Please sign in to comment.