From 2ab46280fa65dfd926a3cdcce08a567bec5b272e Mon Sep 17 00:00:00 2001 From: yifeizhuang Date: Mon, 18 Apr 2022 12:16:08 -0700 Subject: [PATCH] xds: fix ring-hash-picker behaviour (#9085) --- .../io/grpc/xds/RingHashLoadBalancer.java | 67 +++-- .../io/grpc/xds/RingHashLoadBalancerTest.java | 236 +++++++++++++++++- 2 files changed, 273 insertions(+), 30 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java b/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java index f4f8ee2e6ee..34d5e2efcbc 100644 --- a/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java @@ -45,6 +45,7 @@ import java.util.Map; import java.util.Random; import java.util.Set; +import javax.annotation.Nullable; /** * A {@link LoadBalancer} that provides consistent hashing based load balancing to upstream hosts. @@ -399,10 +400,12 @@ public PickResult pickSubchannel(PickSubchannelArgs args) { // Try finding a READY subchannel. Starting from the ring entry next to the RPC's hash. // If the one of the first two subchannels is not in TRANSIENT_FAILURE, return result // based on that subchannel. Otherwise, fail the pick unless a READY subchannel is found. - // Meanwhile, trigger connection for the first subchannel that is in IDLE if no subchannel - // before it is in CONNECTING or READY. - boolean hasPending = false; // true if having subchannel(s) in CONNECTING or IDLE - boolean canBuffer = true; // true if RPCs can be buffered with a pending subchannel + // Meanwhile, trigger connection for the following subchannels: + // The first subchannel, if it is in IDLE or TRANSIENT_FAILURE; + // The second subchannel, if it is in IDLE or TRANSIENT_FAILURE; + // Each of the subchannels after that which is in TRANSIENT_FAILURE or IDLE, + // stopping after the first of them that is not in TRANSIENT_FAILURE.
+ boolean foundFirstNonFailed = false; // true once a subchannel after the first two is not in TRANSIENT_FAILURE Subchannel firstSubchannel = null; Subchannel secondSubchannel = null; for (int i = 0; i < ring.size(); i++) { @@ -417,36 +420,50 @@ public PickResult pickSubchannel(PickSubchannelArgs args) { // are failed unless there is a READY connection. if (firstSubchannel == null) { firstSubchannel = subchannel.subchannel; - } else if (subchannel.subchannel != firstSubchannel) { - if (secondSubchannel == null) { - secondSubchannel = subchannel.subchannel; - } else if (subchannel.subchannel != secondSubchannel) { - canBuffer = false; + PickResult maybeBuffer = pickSubchannelsNonReady(subchannel); + if (maybeBuffer != null) { + return maybeBuffer; } - } - if (subchannel.stateInfo.getState() == TRANSIENT_FAILURE) { - continue; - } - if (!hasPending) { // first non-failing subchannel - if (subchannel.stateInfo.getState() == IDLE) { - final Subchannel finalSubchannel = subchannel.subchannel; - syncContext.execute(new Runnable() { - @Override - public void run() { - finalSubchannel.requestConnection(); - } - }); + } else if (subchannel.subchannel != firstSubchannel && secondSubchannel == null) { + secondSubchannel = subchannel.subchannel; + PickResult maybeBuffer = pickSubchannelsNonReady(subchannel); + if (maybeBuffer != null) { + return maybeBuffer; } - if (canBuffer) { // done if this is the first or second two subchannel - return PickResult.withNoResult(); // queue the pick and re-process later + } else if (subchannel.subchannel != firstSubchannel + && subchannel.subchannel != secondSubchannel) { + if (!foundFirstNonFailed) { + pickSubchannelsNonReady(subchannel); + if (subchannel.stateInfo.getState() != TRANSIENT_FAILURE) { + foundFirstNonFailed = true; + } } - hasPending = true; } } // Fail the pick with error status of the original subchannel hit by hash.
SubchannelView originalSubchannel = pickableSubchannels.get(ring.get(mid).addrKey); return PickResult.withError(originalSubchannel.stateInfo.getStatus()); } + + @Nullable + private PickResult pickSubchannelsNonReady(SubchannelView subchannel) { + if (subchannel.stateInfo.getState() == TRANSIENT_FAILURE + || subchannel.stateInfo.getState() == IDLE ) { + final Subchannel finalSubchannel = subchannel.subchannel; + syncContext.execute(new Runnable() { + @Override + public void run() { + finalSubchannel.requestConnection(); + } + }); + } + if (subchannel.stateInfo.getState() == CONNECTING + || subchannel.stateInfo.getState() == IDLE) { + return PickResult.withNoResult(); + } else { + return null; + } + } } /** diff --git a/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java index 9edbe02f098..8b205e8ad85 100644 --- a/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java @@ -618,9 +618,8 @@ public void skipFailingHosts_firstTwoHostsFailed_pickNextFirstReady() { assertThat(result.getStatus().getDescription()).isEqualTo("unreachable"); verify(subchannels.get(Collections.singletonList(servers.get(1)))) .requestConnection(); // kickoff connection to server3 (next first non-failing) - // TODO: zivy@ - //verify(subchannels.get(Collections.singletonList(servers.get(0)))).requestConnection(); - //verify(subchannels.get(Collections.singletonList(servers.get(2)))).requestConnection(); + verify(subchannels.get(Collections.singletonList(servers.get(0)))).requestConnection(); + verify(subchannels.get(Collections.singletonList(servers.get(2)))).requestConnection(); // Now connecting to server1. 
deliverSubchannelState( @@ -664,6 +663,7 @@ public void allSubchannelsInTransientFailure() { } verify(helper, atLeastOnce()) .updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); + verifyConnection(3); // Picking subchannel triggers connection. RPC hash hits server0. PickSubchannelArgs args = new PickSubchannelArgsImpl( @@ -674,6 +674,233 @@ public void allSubchannelsInTransientFailure() { assertThat(result.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE); assertThat(result.getStatus().getDescription()) .isEqualTo("[FakeSocketAddress-server0] unreachable"); + verify(subchannels.get(Collections.singletonList(servers.get(0)))) + .requestConnection(); + verify(subchannels.get(Collections.singletonList(servers.get(1)))) + .requestConnection(); + verify(subchannels.get(Collections.singletonList(servers.get(2)))) + .requestConnection(); + } + + @Test + public void firstSubchannelIdle() { + // Map each server address to exactly one ring entry. + RingHashConfig config = new RingHashConfig(3, 3); + List servers = createWeightedServerAddrs(1, 1, 1); + loadBalancer.handleResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(servers).setLoadBalancingPolicyConfig(config).build()); + verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class)); + verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class)); + + deliverSubchannelState(subchannels.get(Collections.singletonList(servers.get(1))), + ConnectivityStateInfo.forTransientFailure( + Status.UNAVAILABLE.withDescription("unreachable"))); + verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); + verifyConnection(1); + + // Picking subchannel triggers connection. RPC hash hits server0. 
+ PickSubchannelArgs args = new PickSubchannelArgsImpl( + TestMethodDescriptors.voidMethod(), new Metadata(), + CallOptions.DEFAULT.withOption(XdsNameResolver.RPC_HASH_KEY, hashFunc.hashVoid())); + PickResult result = pickerCaptor.getValue().pickSubchannel(args); + assertThat(result.getStatus().isOk()).isTrue(); + verify(subchannels.get(Collections.singletonList(servers.get(0)))) + .requestConnection(); + verify(subchannels.get(Collections.singletonList(servers.get(1))), never()) + .requestConnection(); + verify(subchannels.get(Collections.singletonList(servers.get(2))), never()) + .requestConnection(); + } + + @Test + public void firstSubchannelConnecting() { + // Map each server address to exactly one ring entry. + RingHashConfig config = new RingHashConfig(3, 3); + List servers = createWeightedServerAddrs(1, 1, 1); + loadBalancer.handleResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(servers).setLoadBalancingPolicyConfig(config).build()); + verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class)); + verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class)); + + deliverSubchannelState(subchannels.get(Collections.singletonList(servers.get(0))), + ConnectivityStateInfo.forNonError(CONNECTING)); + deliverSubchannelState(subchannels.get(Collections.singletonList(servers.get(1))), + ConnectivityStateInfo.forNonError(CONNECTING)); + verify(helper, times(2)).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); + + // Picking subchannel triggers connection. 
+ PickSubchannelArgs args = new PickSubchannelArgsImpl( + TestMethodDescriptors.voidMethod(), new Metadata(), + CallOptions.DEFAULT.withOption(XdsNameResolver.RPC_HASH_KEY, hashFunc.hashVoid())); + PickResult result = pickerCaptor.getValue().pickSubchannel(args); + assertThat(result.getStatus().isOk()).isTrue(); + verify(subchannels.get(Collections.singletonList(servers.get(0))), never()) + .requestConnection(); + verify(subchannels.get(Collections.singletonList(servers.get(1))), never()) + .requestConnection(); + verify(subchannels.get(Collections.singletonList(servers.get(2))), never()) + .requestConnection(); + } + + @Test + public void firstSubchannelFailure() { + // Map each server address to exactly one ring entry. + RingHashConfig config = new RingHashConfig(3, 3); + List servers = createWeightedServerAddrs(1, 1, 1); + loadBalancer.handleResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(servers).setLoadBalancingPolicyConfig(config).build()); + verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class)); + verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class)); + // ring: + // "[FakeSocketAddress-server1]_0" + // "[FakeSocketAddress-server0]_0" + // "[FakeSocketAddress-server2]_0" + + deliverSubchannelState(subchannels.get(Collections.singletonList(servers.get(0))), + ConnectivityStateInfo.forTransientFailure( + Status.UNAVAILABLE.withDescription("unreachable"))); + verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); + verifyConnection(1); + + // Picking subchannel triggers connection. 
+ PickSubchannelArgs args = new PickSubchannelArgsImpl( + TestMethodDescriptors.voidMethod(), new Metadata(), + CallOptions.DEFAULT.withOption(XdsNameResolver.RPC_HASH_KEY, hashFunc.hashVoid())); + PickResult result = pickerCaptor.getValue().pickSubchannel(args); + assertThat(result.getStatus().isOk()).isTrue(); + verify(subchannels.get(Collections.singletonList(servers.get(0)))) + .requestConnection(); + verify(subchannels.get(Collections.singletonList(servers.get(2)))) + .requestConnection(); + verify(subchannels.get(Collections.singletonList(servers.get(1))), never()) + .requestConnection(); + } + + @Test + public void secondSubchannelConnecting() { + // Map each server address to exactly one ring entry. + RingHashConfig config = new RingHashConfig(3, 3); + List servers = createWeightedServerAddrs(1, 1, 1); + loadBalancer.handleResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(servers).setLoadBalancingPolicyConfig(config).build()); + verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class)); + verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class)); + // ring: + // "[FakeSocketAddress-server1]_0" + // "[FakeSocketAddress-server0]_0" + // "[FakeSocketAddress-server2]_0" + + Subchannel firstSubchannel = subchannels.get(Collections.singletonList(servers.get(0))); + deliverSubchannelState(firstSubchannel, + ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE.withDescription( + firstSubchannel.getAddresses().getAddresses() + "unreachable"))); + deliverSubchannelState(subchannels.get(Collections.singletonList(servers.get(2))), + ConnectivityStateInfo.forNonError(CONNECTING)); + verify(helper, times(2)).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); + verifyConnection(1); + + // Picking subchannel triggers connection. 
+ PickSubchannelArgs args = new PickSubchannelArgsImpl( + TestMethodDescriptors.voidMethod(), new Metadata(), + CallOptions.DEFAULT.withOption(XdsNameResolver.RPC_HASH_KEY, hashFunc.hashVoid())); + PickResult result = pickerCaptor.getValue().pickSubchannel(args); + assertThat(result.getStatus().isOk()).isTrue(); + verify(subchannels.get(Collections.singletonList(servers.get(0)))) + .requestConnection(); + verify(subchannels.get(Collections.singletonList(servers.get(2))), never()) + .requestConnection(); + verify(subchannels.get(Collections.singletonList(servers.get(1))), never()) + .requestConnection(); + } + + @Test + public void secondSubchannelFailure() { + // Map each server address to exactly one ring entry. + RingHashConfig config = new RingHashConfig(3, 3); + List servers = createWeightedServerAddrs(1, 1, 1); + loadBalancer.handleResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(servers).setLoadBalancingPolicyConfig(config).build()); + verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class)); + verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class)); + // ring: + // "[FakeSocketAddress-server1]_0" + // "[FakeSocketAddress-server0]_0" + // "[FakeSocketAddress-server2]_0" + + Subchannel firstSubchannel = subchannels.get(Collections.singletonList(servers.get(0))); + deliverSubchannelState(firstSubchannel, + ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE.withDescription( + firstSubchannel.getAddresses().getAddresses() + " unreachable"))); + deliverSubchannelState(subchannels.get(Collections.singletonList(servers.get(2))), + ConnectivityStateInfo.forTransientFailure( + Status.UNAVAILABLE.withDescription("unreachable"))); + verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); + verifyConnection(2); + + // Picking subchannel triggers connection. 
+ PickSubchannelArgs args = new PickSubchannelArgsImpl( + TestMethodDescriptors.voidMethod(), new Metadata(), + CallOptions.DEFAULT.withOption(XdsNameResolver.RPC_HASH_KEY, hashFunc.hashVoid())); + PickResult result = pickerCaptor.getValue().pickSubchannel(args); + assertThat(result.getStatus().isOk()).isFalse(); + assertThat(result.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE); + assertThat(result.getStatus().getDescription()) + .isEqualTo("[FakeSocketAddress-server0] unreachable"); + verify(subchannels.get(Collections.singletonList(servers.get(0)))) + .requestConnection(); + verify(subchannels.get(Collections.singletonList(servers.get(2)))) + .requestConnection(); + verify(subchannels.get(Collections.singletonList(servers.get(1)))) + .requestConnection(); + } + + @Test + public void thirdSubchannelConnecting() { + // Map each server address to exactly one ring entry. + RingHashConfig config = new RingHashConfig(3, 3); + List servers = createWeightedServerAddrs(1, 1, 1); + loadBalancer.handleResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(servers).setLoadBalancingPolicyConfig(config).build()); + verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class)); + verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class)); + // ring: + // "[FakeSocketAddress-server1]_0" + // "[FakeSocketAddress-server0]_0" + // "[FakeSocketAddress-server2]_0" + + Subchannel firstSubchannel = subchannels.get(Collections.singletonList(servers.get(0))); + deliverSubchannelState(firstSubchannel, + ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE.withDescription( + firstSubchannel.getAddresses().getAddresses() + " unreachable"))); + deliverSubchannelState(subchannels.get(Collections.singletonList(servers.get(2))), + ConnectivityStateInfo.forTransientFailure( + Status.UNAVAILABLE.withDescription("unreachable"))); + deliverSubchannelState(subchannels.get(Collections.singletonList(servers.get(1))), + 
ConnectivityStateInfo.forNonError(CONNECTING)); + verify(helper, times(2)).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); + verifyConnection(3); + + // Picking subchannel triggers connection. + PickSubchannelArgs args = new PickSubchannelArgsImpl( + TestMethodDescriptors.voidMethod(), new Metadata(), + CallOptions.DEFAULT.withOption(XdsNameResolver.RPC_HASH_KEY, hashFunc.hashVoid())); + PickResult result = pickerCaptor.getValue().pickSubchannel(args); + assertThat(result.getStatus().isOk()).isFalse(); + assertThat(result.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE); + assertThat(result.getStatus().getDescription()) + .isEqualTo("[FakeSocketAddress-server0] unreachable"); + verify(subchannels.get(Collections.singletonList(servers.get(0)))) + .requestConnection(); + verify(subchannels.get(Collections.singletonList(servers.get(2)))) + .requestConnection(); + verify(subchannels.get(Collections.singletonList(servers.get(1))), never()) + .requestConnection(); } @Test @@ -706,8 +933,7 @@ public void stickyTransientFailure() { CallOptions.DEFAULT.withOption(XdsNameResolver.RPC_HASH_KEY, hashFunc.hashVoid())); PickResult result = pickerCaptor.getValue().pickSubchannel(args); assertThat(result.getStatus().isOk()).isTrue(); - // enabled me. there is a bug in picker behavior - // verify(subchannels.get(Collections.singletonList(servers.get(0)))).requestConnection(); + verify(subchannels.get(Collections.singletonList(servers.get(0)))).requestConnection(); verify(subchannels.get(Collections.singletonList(servers.get(2)))).requestConnection(); verify(subchannels.get(Collections.singletonList(servers.get(1))), never()) .requestConnection();