diff --git a/balancer/weightedtarget/weightedaggregator/aggregator.go b/balancer/weightedtarget/weightedaggregator/aggregator.go index 38bd9b223f8..dffc539b85d 100644 --- a/balancer/weightedtarget/weightedaggregator/aggregator.go +++ b/balancer/weightedtarget/weightedaggregator/aggregator.go @@ -240,20 +240,23 @@ func (wbsa *Aggregator) BuildAndUpdate() { // Caller must hold wbsa.mu. func (wbsa *Aggregator) build() balancer.State { wbsa.logger.Infof("Child pickers with config: %+v", wbsa.idToPickerState) - m := wbsa.idToPickerState // TODO: use balancer.ConnectivityStateEvaluator to calculate the aggregated // state. var readyN, connectingN, idleN int - readyPickerWithWeights := make([]weightedPickerState, 0, len(m)) - for _, ps := range m { + pickerN := len(wbsa.idToPickerState) + readyPickers := make([]weightedPickerState, 0, pickerN) + errorPickers := make([]weightedPickerState, 0, pickerN) + for _, ps := range wbsa.idToPickerState { switch ps.stateToAggregate { case connectivity.Ready: readyN++ - readyPickerWithWeights = append(readyPickerWithWeights, *ps) + readyPickers = append(readyPickers, *ps) case connectivity.Connecting: connectingN++ case connectivity.Idle: idleN++ + case connectivity.TransientFailure: + errorPickers = append(errorPickers, *ps) } } var aggregatedState connectivity.State @@ -272,11 +275,11 @@ func (wbsa *Aggregator) build() balancer.State { var picker balancer.Picker switch aggregatedState { case connectivity.TransientFailure: - picker = base.NewErrPicker(balancer.ErrTransientFailure) + picker = newWeightedPickerGroup(errorPickers, wbsa.newWRR) case connectivity.Connecting: picker = base.NewErrPicker(balancer.ErrNoSubConnAvailable) default: - picker = newWeightedPickerGroup(readyPickerWithWeights, wbsa.newWRR) + picker = newWeightedPickerGroup(readyPickers, wbsa.newWRR) } return balancer.State{ConnectivityState: aggregatedState, Picker: picker} } diff --git a/balancer/weightedtarget/weightedtarget_test.go b/balancer/weightedtarget/weightedtarget_test.go index cc923526422..ea76ea1297c 100644 --- a/balancer/weightedtarget/weightedtarget_test.go +++ b/balancer/weightedtarget/weightedtarget_test.go @@ -20,7 +20,9 @@ package weightedtarget import ( "encoding/json" + "errors" "fmt" + "strings" "testing" "time" @@ -569,7 +571,11 @@ func (s) TestWeightedTarget_TwoSubBalancers_MoreBackends(t *testing.T) { } // Turn sc1's connection down. - wtb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure}) + scConnErr := errors.New("subConn connection error") + wtb.UpdateSubConnState(sc1, balancer.SubConnState{ + ConnectivityState: connectivity.TransientFailure, + ConnectionError: scConnErr, + }) p = <-cc.NewPickerCh want = []balancer.SubConn{sc4} if err := testutils.IsRoundRobin(want, subConnFromPicker(p)); err != nil { @@ -586,11 +592,14 @@ func (s) TestWeightedTarget_TwoSubBalancers_MoreBackends(t *testing.T) { } // Turn all connections down. - wtb.UpdateSubConnState(sc4, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure}) + wtb.UpdateSubConnState(sc4, balancer.SubConnState{ + ConnectivityState: connectivity.TransientFailure, + ConnectionError: scConnErr, + }) p = <-cc.NewPickerCh for i := 0; i < 5; i++ { - if _, err := p.Pick(balancer.PickInfo{}); err != balancer.ErrTransientFailure { - t.Fatalf("want pick error %v, got %v", balancer.ErrTransientFailure, err) + if _, err := p.Pick(balancer.PickInfo{}); err == nil || !strings.Contains(err.Error(), scConnErr.Error()) { + t.Fatalf("want pick error %q, got error %q", scConnErr, err) } } } @@ -793,7 +802,11 @@ func (s) TestWeightedTarget_ThreeSubBalancers_RemoveBalancer(t *testing.T) { } // Move balancer 3 into transient failure. - wtb.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure}) + scConnErr := errors.New("subConn connection error") + wtb.UpdateSubConnState(sc3, balancer.SubConnState{ + ConnectivityState: connectivity.TransientFailure, + ConnectionError: scConnErr, + }) <-cc.NewPickerCh // Remove the first balancer, while the third is transient failure. @@ -827,8 +840,8 @@ func (s) TestWeightedTarget_ThreeSubBalancers_RemoveBalancer(t *testing.T) { t.Fatalf("RemoveSubConn, want %v, got %v", sc1, scRemoved) } for i := 0; i < 5; i++ { - if _, err := p.Pick(balancer.PickInfo{}); err != balancer.ErrTransientFailure { - t.Fatalf("want pick error %v, got %v", balancer.ErrTransientFailure, err) + if _, err := p.Pick(balancer.PickInfo{}); err == nil || !strings.Contains(err.Error(), scConnErr.Error()) { + t.Fatalf("want pick error %q, got error %q", scConnErr, err) } } } @@ -1064,15 +1077,21 @@ func (s) TestBalancerGroup_SubBalancerTurnsConnectingFromTransientFailure(t *tes // Set both subconn to TransientFailure, this will put both sub-balancers in // transient failure. - wtb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure}) + scConnErr := errors.New("subConn connection error") + wtb.UpdateSubConnState(sc1, balancer.SubConnState{ + ConnectivityState: connectivity.TransientFailure, + ConnectionError: scConnErr, + }) <-cc.NewPickerCh - wtb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure}) + wtb.UpdateSubConnState(sc2, balancer.SubConnState{ + ConnectivityState: connectivity.TransientFailure, + ConnectionError: scConnErr, + }) p := <-cc.NewPickerCh for i := 0; i < 5; i++ { - r, err := p.Pick(balancer.PickInfo{}) - if err != balancer.ErrTransientFailure { - t.Fatalf("want pick to fail with %v, got result %v, err %v", balancer.ErrTransientFailure, r, err) + if _, err := p.Pick(balancer.PickInfo{}); err == nil || !strings.Contains(err.Error(), scConnErr.Error()) { + t.Fatalf("want pick error %q, got error %q", scConnErr, err) } } @@ -1086,8 +1105,8 @@ func (s) TestBalancerGroup_SubBalancerTurnsConnectingFromTransientFailure(t *tes for i := 0; i < 5; i++ { r, err := p.Pick(balancer.PickInfo{}) - if err != balancer.ErrTransientFailure { - t.Fatalf("want pick to fail with %v, got result %v, err %v", balancer.ErrTransientFailure, r, err) + if err == nil || !strings.Contains(err.Error(), scConnErr.Error()) { + t.Fatalf("want pick error %q, got result %v, err %q", scConnErr, r, err) } } } diff --git a/xds/internal/balancer/clusterresolver/priority_test.go b/xds/internal/balancer/clusterresolver/priority_test.go index b08b8208989..0ba5e1e8094 100644 --- a/xds/internal/balancer/clusterresolver/priority_test.go +++ b/xds/internal/balancer/clusterresolver/priority_test.go @@ -19,6 +19,7 @@ package clusterresolver import ( "context" + "errors" "fmt" "testing" "time" @@ -247,7 +248,11 @@ func (s) TestEDSPriority_SwitchPriority(t *testing.T) { } // Turn down 1, use 2 - edsb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure}) + scConnErr := errors.New("subConn connection error") + edsb.UpdateSubConnState(sc1, balancer.SubConnState{ + ConnectivityState: connectivity.TransientFailure, + ConnectionError: scConnErr, + }) addrs2 := <-cc.NewSubConnAddrsCh if got, want := addrs2[0].Addr, testEndpointAddrs[2]; got != want { t.Fatalf("sc is created with addr %v, want %v", got, want) @@ -274,7 +279,8 @@ func (s) TestEDSPriority_SwitchPriority(t *testing.T) { } // Should get an update with 1's old picker, to override 2's old picker. - if err := testErrPickerFromCh(cc.NewPickerCh, balancer.ErrTransientFailure); err != nil { + want := errors.New("last connection error: subConn connection error") + if err := testErrPickerFromCh(cc.NewPickerCh, want); err != nil { t.Fatal(err) } @@ -305,10 +311,15 @@ func (s) TestEDSPriority_HigherDownWhileAddingLower(t *testing.T) { } sc1 := <-cc.NewSubConnCh // Turn down 1, pick should error. - edsb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure}) + scConnErr := errors.New("subConn connection error") + edsb.UpdateSubConnState(sc1, balancer.SubConnState{ + ConnectivityState: connectivity.TransientFailure, + ConnectionError: scConnErr, + }) // Test pick failure. - if err := testErrPickerFromCh(cc.NewPickerCh, balancer.ErrTransientFailure); err != nil { + want := errors.New("last connection error: subConn connection error") + if err := testErrPickerFromCh(cc.NewPickerCh, want); err != nil { t.Fatal(err) } @@ -463,8 +474,8 @@ func (s) TestEDSPriority_InitTimeout(t *testing.T) { // Add localities to existing priorities. // -// - start with 2 locality with p0 and p1 -// - add localities to existing p0 and p1 +// - start with 2 locality with p0 and p1 +// - add localities to existing p0 and p1 func (s) TestEDSPriority_MultipleLocalities(t *testing.T) { edsb, cc, xdsC, cleanup := setupTestEDS(t, nil) defer cleanup()