Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds/ringhash: update connectivity state aggregation, and make sure at least one SubConn is connecting in TF #5338

Merged
merged 3 commits into from May 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
27 changes: 27 additions & 0 deletions xds/internal/balancer/ringhash/picker.go
Expand Up @@ -143,6 +143,8 @@ func (p *picker) handleTransientFailure(e *ringEntry) (balancer.PickResult, erro
return balancer.PickResult{}, fmt.Errorf("no connection is Ready")
}

// nextSkippingDuplicates finds the next entry in the ring, with a different
// subconn from the given entry.
func nextSkippingDuplicates(ring *ring, entry *ringEntry) *ringEntry {
for next := ring.next(entry); next != entry; next = ring.next(next) {
if next.sc != entry.sc {
Expand All @@ -152,3 +154,28 @@ func nextSkippingDuplicates(ring *ring, entry *ringEntry) *ringEntry {
// There's no qualifying next entry.
return nil
}

// nextSkippingDuplicatesSubConn returns the subconn that follows sc in the
// ring, skipping over ring entries that share sc's subconn.
//
// If sc is not present in the ring (e.g. it was deleted), the subconn of the
// first ring entry is returned, or nil when the ring has no entries. nil is
// also returned when there is no qualifying next entry after sc.
func nextSkippingDuplicatesSubConn(ring *ring, sc *subConn) *subConn {
	for _, item := range ring.items {
		if item.sc != sc {
			continue
		}
		// item is the first ring entry that belongs to sc; walk on from there.
		if following := nextSkippingDuplicates(ring, item); following != nil {
			return following.sc
		}
		return nil
	}
	// sc is not in the ring: fall back to the first entry, if there is one.
	if len(ring.items) == 0 {
		return nil
	}
	return ring.items[0].sc
}
67 changes: 60 additions & 7 deletions xds/internal/balancer/ringhash/ringhash.go
Expand Up @@ -98,6 +98,10 @@ type subConn struct {
// When connectivity state is updated to Idle for this SubConn, if
// connectQueued is true, Connect() will be called on the SubConn.
connectQueued bool
// attemptingToConnect indicates if this subconn is attempting to connect.
// It's set when queueConnect is called. It's unset when the state is
// changed to Ready/Shutdown, or Idle (and if connectQueued is false).
attemptingToConnect bool
}

// setState updates the state of this SubConn.
Expand All @@ -113,6 +117,8 @@ func (sc *subConn) setState(s connectivity.State) {
if sc.connectQueued {
sc.connectQueued = false
sc.sc.Connect()
} else {
sc.attemptingToConnect = false
}
case connectivity.Connecting:
// Clear connectQueued if the SubConn isn't failing. This state
Expand All @@ -122,11 +128,14 @@ func (sc *subConn) setState(s connectivity.State) {
// Clear connectQueued if the SubConn isn't failing. This state
// transition is unlikely to happen, but handle this just in case.
sc.connectQueued = false
sc.attemptingToConnect = false
// Set to a non-failing state.
sc.failing = false
case connectivity.TransientFailure:
// Set to a failing state.
sc.failing = true
case connectivity.Shutdown:
sc.attemptingToConnect = false
}
sc.state = s
}
Expand All @@ -149,6 +158,7 @@ func (sc *subConn) effectiveState() connectivity.State {
func (sc *subConn) queueConnect() {
sc.mu.Lock()
defer sc.mu.Unlock()
sc.attemptingToConnect = true
if sc.state == connectivity.Idle {
sc.sc.Connect()
return
Expand All @@ -158,6 +168,12 @@ func (sc *subConn) queueConnect() {
sc.connectQueued = true
}

// isAttemptingToConnect reports whether this subConn is currently marked as
// attempting to connect. The flag is read while holding sc.mu.
func (sc *subConn) isAttemptingToConnect() bool {
	sc.mu.Lock()
	attempting := sc.attemptingToConnect
	sc.mu.Unlock()
	return attempting
}

type ringhashBalancer struct {
cc balancer.ClientConn
logger *grpclog.PrefixLogger
Expand Down Expand Up @@ -268,7 +284,8 @@ func (b *ringhashBalancer) UpdateClientConnState(s balancer.ClientConnState) err
var err error
b.ring, err = newRing(b.subConns, b.config.MinRingSize, b.config.MaxRingSize)
if err != nil {
panic(err)
b.ResolverError(fmt.Errorf("ringhash failed to make a new ring: %v", err))
return balancer.ErrBadResolverState
}
b.regeneratePicker()
b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker})
Expand Down Expand Up @@ -334,12 +351,6 @@ func (b *ringhashBalancer) UpdateSubConnState(sc balancer.SubConn, state balance

switch s {
case connectivity.Idle:
// When the overall state is TransientFailure, this will never get picks
// if there's a lower priority. Need to keep the SubConns connecting so
// there's a chance it will recover.
if b.state == connectivity.TransientFailure {
scs.queueConnect()
}
// No need to send an update. No queued RPC can be unblocked. If the
// overall state changed because of this, sendUpdate is already true.
case connectivity.Connecting:
Expand All @@ -364,6 +375,35 @@ func (b *ringhashBalancer) UpdateSubConnState(sc balancer.SubConn, state balance
if sendUpdate {
b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker})
}

switch b.state {
case connectivity.Connecting, connectivity.TransientFailure:
// When overall state is TransientFailure, we need to make sure at least
// one SubConn is attempting to connect, otherwise this balancer may
// never get picks if the parent is priority.
//
// Because we report Connecting as the overall state when only one
// SubConn is in TransientFailure, we do the same check for Connecting
// here.
//
// Note that this check also covers deleting SubConns due to address
// change. E.g. if the SubConn attempting to connect is deleted, and the
// overall state is TF. Since there must be at least one SubConn
// attempting to connect, we need to trigger one. But since the deleted
// SubConn will eventually send a shutdown update, this code will run
// and trigger the next SubConn to connect.
for _, sc := range b.subConns {
if sc.isAttemptingToConnect() {
return
}
}
// Trigger a SubConn (this updated SubConn's next SubConn in the ring)
// to connect if nobody is attempting to connect.
sc := nextSkippingDuplicatesSubConn(b.ring, scs)
if sc != nil {
sc.queueConnect()
}
}
}

// mergeErrors builds an error from the last connection error and the last
Expand Down Expand Up @@ -395,6 +435,7 @@ func (b *ringhashBalancer) Close() {}
//
// It's not thread safe.
type connectivityStateEvaluator struct {
// sum is incremented by recordTransition when the old state is Shutdown and
// decremented when the new state is Shutdown. Because a Shutdown->Idle
// transition is recorded when a new SubConn is created, it appears to track
// the number of SubConns currently not shut down.
sum uint64
// nums holds one counter per connectivity state, indexed by the
// connectivity.State value; recordTransition adjusts the counters for the
// old and new states.
nums [5]uint64
}

Expand All @@ -404,6 +445,7 @@ type connectivityStateEvaluator struct {
// - If there is at least one subchannel in READY state, report READY.
// - If there are 2 or more subchannels in TRANSIENT_FAILURE state, report TRANSIENT_FAILURE.
// - If there is at least one subchannel in CONNECTING state, report CONNECTING.
// - If there is one subchannel in TRANSIENT_FAILURE and there is more than one subchannel, report state CONNECTING.
// - If there is at least one subchannel in Idle state, report Idle.
// - Otherwise, report TRANSIENT_FAILURE.
//
Expand All @@ -417,6 +459,14 @@ func (cse *connectivityStateEvaluator) recordTransition(oldState, newState conne
updateVal := 2*uint64(idx) - 1 // -1 for oldState and +1 for new.
cse.nums[state] += updateVal
}
if oldState == connectivity.Shutdown {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you ever transition from shutdown?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When a new SubConn is created, we record a shutdown->idle transition.
(And I believe we do the same in roundrobin)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yes, that makes sense thanks.

// There's technically no transition from Shutdown. But we record a
// Shutdown->Idle transition when a new SubConn is created.
cse.sum++
}
if newState == connectivity.Shutdown {
cse.sum--
}

if cse.nums[connectivity.Ready] > 0 {
return connectivity.Ready
Expand All @@ -427,6 +477,9 @@ func (cse *connectivityStateEvaluator) recordTransition(oldState, newState conne
if cse.nums[connectivity.Connecting] > 0 {
return connectivity.Connecting
}
if cse.nums[connectivity.TransientFailure] > 0 && cse.sum > 1 {
return connectivity.Connecting
}
if cse.nums[connectivity.Idle] > 0 {
return connectivity.Idle
}
Expand Down
60 changes: 43 additions & 17 deletions xds/internal/balancer/ringhash/ringhash_test.go
Expand Up @@ -365,8 +365,8 @@ func TestAddrWeightChange(t *testing.T) {
}

// TestSubConnToConnectWhenOverallTransientFailure covers the situation when the
// overall state is TransientFailure, the SubConns turning Idle will be
// triggered to Connect(). But not when the overall state is not
// overall state is TransientFailure, the SubConns turning Idle will trigger the
// next SubConn in the ring to Connect(). But not when the overall state is not
// TransientFailure.
func TestSubConnToConnectWhenOverallTransientFailure(t *testing.T) {
wantAddrs := []resolver.Address{
Expand All @@ -377,30 +377,56 @@ func TestSubConnToConnectWhenOverallTransientFailure(t *testing.T) {
_, b, p0 := setupTest(t, wantAddrs)
ring0 := p0.(*picker).ring

// Turn all SubConns to TransientFailure.
for _, it := range ring0.items {
b.UpdateSubConnState(it.sc.sc, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
}

// The next one turning Idle should Connect().
// Turn the first subconn to transient failure.
sc0 := ring0.items[0].sc.sc
b.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
b.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Idle})

// It will trigger the second subconn to connect (because the overall state
// is Connecting when only one subconn is in TF).
sc1 := ring0.items[1].sc.sc
select {
case <-sc1.(*testutils.TestSubConn).ConnectCh:
case <-time.After(defaultTestShortTimeout):
t.Fatalf("timeout waiting for Connect() from SubConn %v", sc1)
}

// Turn the second subconn to TF. This will set the overall state to TF.
b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Idle})

// It will trigger the third subconn to connect.
sc2 := ring0.items[2].sc.sc
select {
case <-sc2.(*testutils.TestSubConn).ConnectCh:
case <-time.After(defaultTestShortTimeout):
t.Fatalf("timeout waiting for Connect() from SubConn %v", sc2)
}

// Turn the third subconn to TF. This will set the overall state to TF.
b.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
b.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Idle})

// It will trigger the first subconn to connect.
select {
case <-sc0.(*testutils.TestSubConn).ConnectCh:
case <-time.After(defaultTestTimeout):
t.Errorf("timeout waiting for Connect() from SubConn %v", sc0)
case <-time.After(defaultTestShortTimeout):
t.Fatalf("timeout waiting for Connect() from SubConn %v", sc0)
}

// If this SubConn is ready. Other SubConns turning Idle will not Connect().
b.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
b.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Ready})
// Turn the third subconn to TF again.
b.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
b.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Idle})

// The third SubConn in the ring should connect.
sc1 := ring0.items[1].sc.sc
b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Idle})
// This will not trigger any new Connect() on the SubConns, because sc0 is
// still attempting to connect, and we only need one SubConn to connect.
select {
case <-sc0.(*testutils.TestSubConn).ConnectCh:
t.Fatalf("unexpected Connect() from SubConn %v", sc0)
case <-sc1.(*testutils.TestSubConn).ConnectCh:
t.Errorf("unexpected Connect() from SubConn %v", sc1)
t.Fatalf("unexpected Connect() from SubConn %v", sc1)
case <-sc2.(*testutils.TestSubConn).ConnectCh:
t.Fatalf("unexpected Connect() from SubConn %v", sc2)
case <-time.After(defaultTestShortTimeout):
}
}
Expand Down