Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds: fix ring-hash-picker behaviour #9085

Merged
merged 9 commits into from Apr 18, 2022
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
68 changes: 43 additions & 25 deletions xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java
Expand Up @@ -46,6 +46,8 @@
import java.util.Random;
import java.util.Set;

YifeiZhuang marked this conversation as resolved.
Show resolved Hide resolved
import javax.annotation.Nullable;

/**
* A {@link LoadBalancer} that provides consistent hashing based load balancing to upstream hosts.
* It implements the "Ketama" hashing that maps hosts onto a circle (the "ring") by hashing its
Expand Down Expand Up @@ -399,10 +401,12 @@ public PickResult pickSubchannel(PickSubchannelArgs args) {
// Try finding a READY subchannel. Starting from the ring entry next to the RPC's hash.
// If the one of the first two subchannels is not in TRANSIENT_FAILURE, return result
// based on that subchannel. Otherwise, fail the pick unless a READY subchannel is found.
// Meanwhile, trigger connection for the first subchannel that is in IDLE if no subchannel
// before it is in CONNECTING or READY.
boolean hasPending = false; // true if having subchannel(s) in CONNECTING or IDLE
boolean canBuffer = true; // true if RPCs can be buffered with a pending subchannel
// Meanwhile, trigger connection for the channel and status:
// For the first subchannel that is in IDLE or TRANSIENT_FAILURE;
// And for the second subchannel that is in IDLE or TRANSIENT_FAILURE;
// And for each of the following subchannels that is in TRANSIENT_FAILURE or IDLE,
// stop until we find the first subchannel that is in CONNECTING or IDLE status.
boolean foundFirstNonFailed = false; // true if having subchannel(s) in CONNECTING or IDLE
Subchannel firstSubchannel = null;
Subchannel secondSubchannel = null;
for (int i = 0; i < ring.size(); i++) {
Expand All @@ -417,36 +421,50 @@ public PickResult pickSubchannel(PickSubchannelArgs args) {
// are failed unless there is a READY connection.
if (firstSubchannel == null) {
firstSubchannel = subchannel.subchannel;
} else if (subchannel.subchannel != firstSubchannel) {
if (secondSubchannel == null) {
secondSubchannel = subchannel.subchannel;
} else if (subchannel.subchannel != secondSubchannel) {
canBuffer = false;
PickResult maybeBuffer = pickSubchannelsNonReady(subchannel);
if (maybeBuffer != null) {
return maybeBuffer;
}
}
if (subchannel.stateInfo.getState() == TRANSIENT_FAILURE) {
continue;
}
if (!hasPending) { // first non-failing subchannel
if (subchannel.stateInfo.getState() == IDLE) {
final Subchannel finalSubchannel = subchannel.subchannel;
syncContext.execute(new Runnable() {
@Override
public void run() {
finalSubchannel.requestConnection();
}
});
} else if (subchannel.subchannel != firstSubchannel && secondSubchannel == null) {
secondSubchannel = subchannel.subchannel;
PickResult maybeBuffer = pickSubchannelsNonReady(subchannel);
if (maybeBuffer != null) {
return maybeBuffer;
}
if (canBuffer) { // done if this is the first or second two subchannel
return PickResult.withNoResult(); // queue the pick and re-process later
} else if (subchannel.subchannel != firstSubchannel
&& subchannel.subchannel != secondSubchannel) {
if (!foundFirstNonFailed) {
pickSubchannelsNonReady(subchannel);
if (subchannel.stateInfo.getState() != TRANSIENT_FAILURE) {
foundFirstNonFailed = true;
}
}
hasPending = true;
}
}
// Fail the pick with error status of the original subchannel hit by hash.
SubchannelView originalSubchannel = pickableSubchannels.get(ring.get(mid).addrKey);
return PickResult.withError(originalSubchannel.stateInfo.getStatus());
}

@Nullable
private PickResult pickSubchannelsNonReady(SubchannelView subchannel) {
if (subchannel.stateInfo.getState() == TRANSIENT_FAILURE
|| subchannel.stateInfo.getState() == IDLE ) {
final Subchannel finalSubchannel = subchannel.subchannel;
syncContext.execute(new Runnable() {
@Override
public void run() {
finalSubchannel.requestConnection();
}
});
}
if (subchannel.stateInfo.getState() == CONNECTING
|| subchannel.stateInfo.getState() == IDLE) {
return PickResult.withNoResult();
} else {
return null;
}
}
}

/**
Expand Down
236 changes: 231 additions & 5 deletions xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java
Expand Up @@ -618,9 +618,8 @@ public void skipFailingHosts_firstTwoHostsFailed_pickNextFirstReady() {
assertThat(result.getStatus().getDescription()).isEqualTo("unreachable");
verify(subchannels.get(Collections.singletonList(servers.get(1))))
.requestConnection(); // kickoff connection to server3 (next first non-failing)
// TODO: zivy@
//verify(subchannels.get(Collections.singletonList(servers.get(0)))).requestConnection();
//verify(subchannels.get(Collections.singletonList(servers.get(2)))).requestConnection();
verify(subchannels.get(Collections.singletonList(servers.get(0)))).requestConnection();
verify(subchannels.get(Collections.singletonList(servers.get(2)))).requestConnection();

// Now connecting to server1.
deliverSubchannelState(
Expand Down Expand Up @@ -664,6 +663,7 @@ public void allSubchannelsInTransientFailure() {
}
verify(helper, atLeastOnce())
.updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
verifyConnection(3);

// Picking subchannel triggers connection. RPC hash hits server0.
PickSubchannelArgs args = new PickSubchannelArgsImpl(
Expand All @@ -674,6 +674,233 @@ public void allSubchannelsInTransientFailure() {
assertThat(result.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE);
assertThat(result.getStatus().getDescription())
.isEqualTo("[FakeSocketAddress-server0] unreachable");
verify(subchannels.get(Collections.singletonList(servers.get(0))))
.requestConnection();
verify(subchannels.get(Collections.singletonList(servers.get(1))))
.requestConnection();
verify(subchannels.get(Collections.singletonList(servers.get(2))))
.requestConnection();
}

@Test
public void firstSubchannelIdle() {
// Map each server address to exactly one ring entry.
RingHashConfig config = new RingHashConfig(3, 3);
List<EquivalentAddressGroup> servers = createWeightedServerAddrs(1, 1, 1);
loadBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(servers).setLoadBalancingPolicyConfig(config).build());
verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class));
verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class));

deliverSubchannelState(subchannels.get(Collections.singletonList(servers.get(1))),
ConnectivityStateInfo.forTransientFailure(
Status.UNAVAILABLE.withDescription("unreachable")));
verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
verifyConnection(1);

// Picking subchannel triggers connection. RPC hash hits server0.
PickSubchannelArgs args = new PickSubchannelArgsImpl(
TestMethodDescriptors.voidMethod(), new Metadata(),
CallOptions.DEFAULT.withOption(XdsNameResolver.RPC_HASH_KEY, hashFunc.hashVoid()));
PickResult result = pickerCaptor.getValue().pickSubchannel(args);
assertThat(result.getStatus().isOk()).isTrue();
verify(subchannels.get(Collections.singletonList(servers.get(0))))
.requestConnection();
verify(subchannels.get(Collections.singletonList(servers.get(1))), never())
.requestConnection();
verify(subchannels.get(Collections.singletonList(servers.get(2))), never())
.requestConnection();
}

@Test
public void firstSubchannelConnecting() {
// Map each server address to exactly one ring entry.
RingHashConfig config = new RingHashConfig(3, 3);
List<EquivalentAddressGroup> servers = createWeightedServerAddrs(1, 1, 1);
loadBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(servers).setLoadBalancingPolicyConfig(config).build());
verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class));
verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class));

deliverSubchannelState(subchannels.get(Collections.singletonList(servers.get(0))),
ConnectivityStateInfo.forNonError(CONNECTING));
deliverSubchannelState(subchannels.get(Collections.singletonList(servers.get(1))),
ConnectivityStateInfo.forNonError(CONNECTING));
verify(helper, times(2)).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());

// Picking subchannel triggers connection.
PickSubchannelArgs args = new PickSubchannelArgsImpl(
TestMethodDescriptors.voidMethod(), new Metadata(),
CallOptions.DEFAULT.withOption(XdsNameResolver.RPC_HASH_KEY, hashFunc.hashVoid()));
PickResult result = pickerCaptor.getValue().pickSubchannel(args);
assertThat(result.getStatus().isOk()).isTrue();
verify(subchannels.get(Collections.singletonList(servers.get(0))), never())
.requestConnection();
verify(subchannels.get(Collections.singletonList(servers.get(1))), never())
.requestConnection();
verify(subchannels.get(Collections.singletonList(servers.get(2))), never())
.requestConnection();
}

@Test
public void firstSubchannelFailure() {
// Map each server address to exactly one ring entry.
RingHashConfig config = new RingHashConfig(3, 3);
List<EquivalentAddressGroup> servers = createWeightedServerAddrs(1, 1, 1);
loadBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(servers).setLoadBalancingPolicyConfig(config).build());
verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class));
verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class));
// ring:
// "[FakeSocketAddress-server1]_0"
// "[FakeSocketAddress-server0]_0"
// "[FakeSocketAddress-server2]_0"

deliverSubchannelState(subchannels.get(Collections.singletonList(servers.get(0))),
ConnectivityStateInfo.forTransientFailure(
Status.UNAVAILABLE.withDescription("unreachable")));
verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
verifyConnection(1);

// Picking subchannel triggers connection.
PickSubchannelArgs args = new PickSubchannelArgsImpl(
TestMethodDescriptors.voidMethod(), new Metadata(),
CallOptions.DEFAULT.withOption(XdsNameResolver.RPC_HASH_KEY, hashFunc.hashVoid()));
PickResult result = pickerCaptor.getValue().pickSubchannel(args);
assertThat(result.getStatus().isOk()).isTrue();
verify(subchannels.get(Collections.singletonList(servers.get(0))))
.requestConnection();
verify(subchannels.get(Collections.singletonList(servers.get(2))))
.requestConnection();
verify(subchannels.get(Collections.singletonList(servers.get(1))), never())
.requestConnection();
}

@Test
public void secondSubchannelConnecting() {
// Map each server address to exactly one ring entry.
RingHashConfig config = new RingHashConfig(3, 3);
List<EquivalentAddressGroup> servers = createWeightedServerAddrs(1, 1, 1);
loadBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(servers).setLoadBalancingPolicyConfig(config).build());
verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class));
verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class));
// ring:
// "[FakeSocketAddress-server1]_0"
// "[FakeSocketAddress-server0]_0"
// "[FakeSocketAddress-server2]_0"

Subchannel firstSubchannel = subchannels.get(Collections.singletonList(servers.get(0)));
deliverSubchannelState(firstSubchannel,
ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE.withDescription(
firstSubchannel.getAddresses().getAddresses() + "unreachable")));
deliverSubchannelState(subchannels.get(Collections.singletonList(servers.get(2))),
ConnectivityStateInfo.forNonError(CONNECTING));
verify(helper, times(2)).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
verifyConnection(1);

// Picking subchannel triggers connection.
PickSubchannelArgs args = new PickSubchannelArgsImpl(
TestMethodDescriptors.voidMethod(), new Metadata(),
CallOptions.DEFAULT.withOption(XdsNameResolver.RPC_HASH_KEY, hashFunc.hashVoid()));
PickResult result = pickerCaptor.getValue().pickSubchannel(args);
assertThat(result.getStatus().isOk()).isTrue();
verify(subchannels.get(Collections.singletonList(servers.get(0))))
.requestConnection();
verify(subchannels.get(Collections.singletonList(servers.get(2))), never())
.requestConnection();
verify(subchannels.get(Collections.singletonList(servers.get(1))), never())
.requestConnection();
}

@Test
public void secondSubchannelFailure() {
// Map each server address to exactly one ring entry.
RingHashConfig config = new RingHashConfig(3, 3);
List<EquivalentAddressGroup> servers = createWeightedServerAddrs(1, 1, 1);
loadBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(servers).setLoadBalancingPolicyConfig(config).build());
verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class));
verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class));
// ring:
// "[FakeSocketAddress-server1]_0"
// "[FakeSocketAddress-server0]_0"
// "[FakeSocketAddress-server2]_0"

Subchannel firstSubchannel = subchannels.get(Collections.singletonList(servers.get(0)));
deliverSubchannelState(firstSubchannel,
ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE.withDescription(
firstSubchannel.getAddresses().getAddresses() + " unreachable")));
deliverSubchannelState(subchannels.get(Collections.singletonList(servers.get(2))),
ConnectivityStateInfo.forTransientFailure(
Status.UNAVAILABLE.withDescription("unreachable")));
verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
verifyConnection(2);

// Picking subchannel triggers connection.
PickSubchannelArgs args = new PickSubchannelArgsImpl(
TestMethodDescriptors.voidMethod(), new Metadata(),
CallOptions.DEFAULT.withOption(XdsNameResolver.RPC_HASH_KEY, hashFunc.hashVoid()));
PickResult result = pickerCaptor.getValue().pickSubchannel(args);
assertThat(result.getStatus().isOk()).isFalse();
assertThat(result.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE);
assertThat(result.getStatus().getDescription())
.isEqualTo("[FakeSocketAddress-server0] unreachable");
verify(subchannels.get(Collections.singletonList(servers.get(0))))
.requestConnection();
verify(subchannels.get(Collections.singletonList(servers.get(2))))
.requestConnection();
verify(subchannels.get(Collections.singletonList(servers.get(1))))
.requestConnection();
}

@Test
public void thirdSubchannelConnecting() {
// Map each server address to exactly one ring entry.
RingHashConfig config = new RingHashConfig(3, 3);
List<EquivalentAddressGroup> servers = createWeightedServerAddrs(1, 1, 1);
loadBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(servers).setLoadBalancingPolicyConfig(config).build());
verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class));
verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class));
// ring:
// "[FakeSocketAddress-server1]_0"
// "[FakeSocketAddress-server0]_0"
// "[FakeSocketAddress-server2]_0"

Subchannel firstSubchannel = subchannels.get(Collections.singletonList(servers.get(0)));
deliverSubchannelState(firstSubchannel,
ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE.withDescription(
firstSubchannel.getAddresses().getAddresses() + " unreachable")));
deliverSubchannelState(subchannels.get(Collections.singletonList(servers.get(2))),
ConnectivityStateInfo.forTransientFailure(
Status.UNAVAILABLE.withDescription("unreachable")));
deliverSubchannelState(subchannels.get(Collections.singletonList(servers.get(1))),
ConnectivityStateInfo.forNonError(CONNECTING));
verify(helper, times(2)).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
verifyConnection(3);

// Picking subchannel triggers connection.
PickSubchannelArgs args = new PickSubchannelArgsImpl(
TestMethodDescriptors.voidMethod(), new Metadata(),
CallOptions.DEFAULT.withOption(XdsNameResolver.RPC_HASH_KEY, hashFunc.hashVoid()));
PickResult result = pickerCaptor.getValue().pickSubchannel(args);
assertThat(result.getStatus().isOk()).isFalse();
assertThat(result.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE);
assertThat(result.getStatus().getDescription())
.isEqualTo("[FakeSocketAddress-server0] unreachable");
verify(subchannels.get(Collections.singletonList(servers.get(0))))
.requestConnection();
verify(subchannels.get(Collections.singletonList(servers.get(2))))
.requestConnection();
verify(subchannels.get(Collections.singletonList(servers.get(1))), never())
.requestConnection();
}

@Test
Expand Down Expand Up @@ -706,8 +933,7 @@ public void stickyTransientFailure() {
CallOptions.DEFAULT.withOption(XdsNameResolver.RPC_HASH_KEY, hashFunc.hashVoid()));
PickResult result = pickerCaptor.getValue().pickSubchannel(args);
assertThat(result.getStatus().isOk()).isTrue();
// enabled me. there is a bug in picker behavior
// verify(subchannels.get(Collections.singletonList(servers.get(0)))).requestConnection();
verify(subchannels.get(Collections.singletonList(servers.get(0)))).requestConnection();
verify(subchannels.get(Collections.singletonList(servers.get(2)))).requestConnection();
verify(subchannels.get(Collections.singletonList(servers.get(1))), never())
.requestConnection();
Expand Down