Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

weightedtarget: return a more meaningful error when no child policy is reporting READY (#5391) #5711

Merged
merged 3 commits into from Oct 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
15 changes: 9 additions & 6 deletions balancer/weightedtarget/weightedaggregator/aggregator.go
Expand Up @@ -240,20 +240,23 @@ func (wbsa *Aggregator) BuildAndUpdate() {
// Caller must hold wbsa.mu.
func (wbsa *Aggregator) build() balancer.State {
wbsa.logger.Infof("Child pickers with config: %+v", wbsa.idToPickerState)
m := wbsa.idToPickerState
// TODO: use balancer.ConnectivityStateEvaluator to calculate the aggregated
// state.
var readyN, connectingN, idleN int
readyPickerWithWeights := make([]weightedPickerState, 0, len(m))
for _, ps := range m {
pickerN := len(wbsa.idToPickerState)
readyPickers := make([]weightedPickerState, 0, pickerN)
errorPickers := make([]weightedPickerState, 0, pickerN)
for _, ps := range wbsa.idToPickerState {
switch ps.stateToAggregate {
case connectivity.Ready:
readyN++
readyPickerWithWeights = append(readyPickerWithWeights, *ps)
readyPickers = append(readyPickers, *ps)
case connectivity.Connecting:
connectingN++
case connectivity.Idle:
idleN++
case connectivity.TransientFailure:
errorPickers = append(errorPickers, *ps)
}
}
var aggregatedState connectivity.State
Expand All @@ -272,11 +275,11 @@ func (wbsa *Aggregator) build() balancer.State {
var picker balancer.Picker
switch aggregatedState {
case connectivity.TransientFailure:
picker = base.NewErrPicker(balancer.ErrTransientFailure)
picker = newWeightedPickerGroup(errorPickers, wbsa.newWRR)
case connectivity.Connecting:
picker = base.NewErrPicker(balancer.ErrNoSubConnAvailable)
default:
picker = newWeightedPickerGroup(readyPickerWithWeights, wbsa.newWRR)
picker = newWeightedPickerGroup(readyPickers, wbsa.newWRR)
}
return balancer.State{ConnectivityState: aggregatedState, Picker: picker}
}
Expand Down
47 changes: 33 additions & 14 deletions balancer/weightedtarget/weightedtarget_test.go
Expand Up @@ -20,7 +20,9 @@ package weightedtarget

import (
"encoding/json"
"errors"
"fmt"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -569,7 +571,11 @@ func (s) TestWeightedTarget_TwoSubBalancers_MoreBackends(t *testing.T) {
}

// Turn sc1's connection down.
wtb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
scConnErr := errors.New("subConn connection error")
wtb.UpdateSubConnState(sc1, balancer.SubConnState{
arvindbr8 marked this conversation as resolved.
Show resolved Hide resolved
ConnectivityState: connectivity.TransientFailure,
ConnectionError: scConnErr,
})
p = <-cc.NewPickerCh
want = []balancer.SubConn{sc4}
if err := testutils.IsRoundRobin(want, subConnFromPicker(p)); err != nil {
Expand All @@ -586,11 +592,14 @@ func (s) TestWeightedTarget_TwoSubBalancers_MoreBackends(t *testing.T) {
}

// Turn all connections down.
wtb.UpdateSubConnState(sc4, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
wtb.UpdateSubConnState(sc4, balancer.SubConnState{
ConnectivityState: connectivity.TransientFailure,
ConnectionError: scConnErr,
})
p = <-cc.NewPickerCh
for i := 0; i < 5; i++ {
if _, err := p.Pick(balancer.PickInfo{}); err != balancer.ErrTransientFailure {
t.Fatalf("want pick error %v, got %v", balancer.ErrTransientFailure, err)
if _, err := p.Pick(balancer.PickInfo{}); err == nil || !strings.Contains(err.Error(), scConnErr.Error()) {
t.Fatalf("want pick error %q, got error %q", scConnErr, err)
}
}
}
Expand Down Expand Up @@ -793,7 +802,11 @@ func (s) TestWeightedTarget_ThreeSubBalancers_RemoveBalancer(t *testing.T) {
}

// Move balancer 3 into transient failure.
wtb.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
scConnErr := errors.New("subConn connection error")
wtb.UpdateSubConnState(sc3, balancer.SubConnState{
ConnectivityState: connectivity.TransientFailure,
ConnectionError: scConnErr,
})
<-cc.NewPickerCh

// Remove the first balancer, while the third is transient failure.
Expand Down Expand Up @@ -827,8 +840,8 @@ func (s) TestWeightedTarget_ThreeSubBalancers_RemoveBalancer(t *testing.T) {
t.Fatalf("RemoveSubConn, want %v, got %v", sc1, scRemoved)
}
for i := 0; i < 5; i++ {
if _, err := p.Pick(balancer.PickInfo{}); err != balancer.ErrTransientFailure {
t.Fatalf("want pick error %v, got %v", balancer.ErrTransientFailure, err)
if _, err := p.Pick(balancer.PickInfo{}); err == nil || !strings.Contains(err.Error(), scConnErr.Error()) {
t.Fatalf("want pick error %q, got error %q", scConnErr, err)
}
}
}
Expand Down Expand Up @@ -1064,15 +1077,21 @@ func (s) TestBalancerGroup_SubBalancerTurnsConnectingFromTransientFailure(t *tes

// Set both subconn to TransientFailure, this will put both sub-balancers in
// transient failure.
wtb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
scConnErr := errors.New("subConn connection error")
wtb.UpdateSubConnState(sc1, balancer.SubConnState{
ConnectivityState: connectivity.TransientFailure,
ConnectionError: scConnErr,
})
<-cc.NewPickerCh
wtb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
wtb.UpdateSubConnState(sc2, balancer.SubConnState{
ConnectivityState: connectivity.TransientFailure,
ConnectionError: scConnErr,
})
p := <-cc.NewPickerCh

for i := 0; i < 5; i++ {
r, err := p.Pick(balancer.PickInfo{})
if err != balancer.ErrTransientFailure {
t.Fatalf("want pick to fail with %v, got result %v, err %v", balancer.ErrTransientFailure, r, err)
if _, err := p.Pick(balancer.PickInfo{}); err == nil || !strings.Contains(err.Error(), scConnErr.Error()) {
t.Fatalf("want pick error %q, got error %q", scConnErr, err)
}
}

Expand All @@ -1086,8 +1105,8 @@ func (s) TestBalancerGroup_SubBalancerTurnsConnectingFromTransientFailure(t *tes

for i := 0; i < 5; i++ {
r, err := p.Pick(balancer.PickInfo{})
if err != balancer.ErrTransientFailure {
t.Fatalf("want pick to fail with %v, got result %v, err %v", balancer.ErrTransientFailure, r, err)
if err == nil || !strings.Contains(err.Error(), scConnErr.Error()) {
t.Fatalf("want pick error %q, got result %v, err %q", scConnErr, r, err)
}
}
}
Expand Down
23 changes: 17 additions & 6 deletions xds/internal/balancer/clusterresolver/priority_test.go
Expand Up @@ -19,6 +19,7 @@ package clusterresolver

import (
"context"
"errors"
"fmt"
"testing"
"time"
Expand Down Expand Up @@ -247,7 +248,11 @@ func (s) TestEDSPriority_SwitchPriority(t *testing.T) {
}

// Turn down 1, use 2
edsb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
scConnErr := errors.New("subConn connection error")
edsb.UpdateSubConnState(sc1, balancer.SubConnState{
ConnectivityState: connectivity.TransientFailure,
ConnectionError: scConnErr,
})
addrs2 := <-cc.NewSubConnAddrsCh
if got, want := addrs2[0].Addr, testEndpointAddrs[2]; got != want {
t.Fatalf("sc is created with addr %v, want %v", got, want)
Expand All @@ -274,7 +279,8 @@ func (s) TestEDSPriority_SwitchPriority(t *testing.T) {
}

// Should get an update with 1's old picker, to override 2's old picker.
if err := testErrPickerFromCh(cc.NewPickerCh, balancer.ErrTransientFailure); err != nil {
want := errors.New("last connection error: subConn connection error")
if err := testErrPickerFromCh(cc.NewPickerCh, want); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was about to make the same comment as I did on the other file for here, but realized that testErrPickerFromCh performs a reflect.DeepEqual() on the got and want errors, instead of doing a strings.Contains(). Would you mind fixing the implementation of testErrPickerFromCh to do the latter instead. It doesn't have to be done in this PR. You can send another PR for that after this is merged. Thanks.

t.Fatal(err)
}

Expand Down Expand Up @@ -305,10 +311,15 @@ func (s) TestEDSPriority_HigherDownWhileAddingLower(t *testing.T) {
}
sc1 := <-cc.NewSubConnCh
// Turn down 1, pick should error.
edsb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
scConnErr := errors.New("subConn connection error")
edsb.UpdateSubConnState(sc1, balancer.SubConnState{
ConnectivityState: connectivity.TransientFailure,
ConnectionError: scConnErr,
})

// Test pick failure.
if err := testErrPickerFromCh(cc.NewPickerCh, balancer.ErrTransientFailure); err != nil {
want := errors.New("last connection error: subConn connection error")
if err := testErrPickerFromCh(cc.NewPickerCh, want); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -463,8 +474,8 @@ func (s) TestEDSPriority_InitTimeout(t *testing.T) {

// Add localities to existing priorities.
//
// - start with 2 locality with p0 and p1
// - add localities to existing p0 and p1
// - start with 2 locality with p0 and p1
// - add localities to existing p0 and p1
func (s) TestEDSPriority_MultipleLocalities(t *testing.T) {
edsb, cc, xdsC, cleanup := setupTestEDS(t, nil)
defer cleanup()
Expand Down