New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
xds/ringhash: update connectivity state aggregation, and make sure at least one SubConn is connecting in TF #5338
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -98,6 +98,10 @@ type subConn struct { | |
// When connectivity state is updated to Idle for this SubConn, if | ||
// connectQueued is true, Connect() will be called on the SubConn. | ||
connectQueued bool | ||
// attemptingToConnect indicates if this subconn is attempting to connect. | ||
// It's set when queueConnect is called. It's unset when the state is | ||
// changed to Ready/Shutdown, or Idle (and if connectQueued is false). | ||
attemptingToConnect bool | ||
} | ||
|
||
// setState updates the state of this SubConn. | ||
|
@@ -113,6 +117,8 @@ func (sc *subConn) setState(s connectivity.State) { | |
if sc.connectQueued { | ||
sc.connectQueued = false | ||
sc.sc.Connect() | ||
} else { | ||
sc.attemptingToConnect = false | ||
} | ||
case connectivity.Connecting: | ||
// Clear connectQueued if the SubConn isn't failing. This state | ||
|
@@ -122,11 +128,14 @@ func (sc *subConn) setState(s connectivity.State) { | |
// Clear connectQueued if the SubConn isn't failing. This state | ||
// transition is unlikely to happen, but handle this just in case. | ||
sc.connectQueued = false | ||
sc.attemptingToConnect = false | ||
// Set to a non-failing state. | ||
sc.failing = false | ||
case connectivity.TransientFailure: | ||
// Set to a failing state. | ||
sc.failing = true | ||
case connectivity.Shutdown: | ||
sc.attemptingToConnect = false | ||
} | ||
sc.state = s | ||
} | ||
|
@@ -147,8 +156,10 @@ func (sc *subConn) effectiveState() connectivity.State { | |
// it's Connect() will be triggered. If the SubConn state is already Idle, it | ||
// will just call Connect(). | ||
func (sc *subConn) queueConnect() { | ||
fmt.Printf(" ===== queue connect for %v\n", sc) | ||
sc.mu.Lock() | ||
defer sc.mu.Unlock() | ||
sc.attemptingToConnect = true | ||
if sc.state == connectivity.Idle { | ||
sc.sc.Connect() | ||
return | ||
|
@@ -158,6 +169,12 @@ func (sc *subConn) queueConnect() { | |
sc.connectQueued = true | ||
} | ||
|
||
// isAttemptingToConnect returns the current value of attemptingToConnect, | ||
// reading it while holding sc.mu. | ||
func (sc *subConn) isAttemptingToConnect() bool { | ||
sc.mu.Lock() | ||
defer sc.mu.Unlock() | ||
return sc.attemptingToConnect | ||
} | ||
|
||
type ringhashBalancer struct { | ||
cc balancer.ClientConn | ||
logger *grpclog.PrefixLogger | ||
|
@@ -268,7 +285,8 @@ func (b *ringhashBalancer) UpdateClientConnState(s balancer.ClientConnState) err | |
var err error | ||
b.ring, err = newRing(b.subConns, b.config.MinRingSize, b.config.MaxRingSize) | ||
if err != nil { | ||
panic(err) | ||
b.ResolverError(fmt.Errorf("ringhash failed to make a new ring: %v", err)) | ||
return balancer.ErrBadResolverState | ||
} | ||
b.regeneratePicker() | ||
b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker}) | ||
|
@@ -334,12 +352,6 @@ func (b *ringhashBalancer) UpdateSubConnState(sc balancer.SubConn, state balance | |
|
||
switch s { | ||
case connectivity.Idle: | ||
// When the overall state is TransientFailure, this will never get picks | ||
// if there's a lower priority. Need to keep the SubConns connecting so | ||
// there's a chance it will recover. | ||
if b.state == connectivity.TransientFailure { | ||
scs.queueConnect() | ||
} | ||
// No need to send an update. No queued RPC can be unblocked. If the | ||
// overall state changed because of this, sendUpdate is already true. | ||
case connectivity.Connecting: | ||
|
@@ -364,6 +376,38 @@ func (b *ringhashBalancer) UpdateSubConnState(sc balancer.SubConn, state balance | |
if sendUpdate { | ||
b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker}) | ||
} | ||
|
||
fmt.Println(" ---- overall state", b.state) | ||
|
||
switch b.state { | ||
case connectivity.Connecting, connectivity.TransientFailure: | ||
// When overall state is TransientFailure, we need to make sure at least | ||
// one SubConn is attempting to connect, otherwise this balancer may | ||
// never get picks if the parent is priority. | ||
// | ||
// Because we report Connecting as the overall state when only one | ||
// SubConn is in TransientFailure, we do the same check for Connecting | ||
// here. | ||
// | ||
// Note that this check also covers deleting SubConns due to address | ||
// change. E.g. if the SubConn attempting to connect is deleted, and the | ||
// overall state is TF. Since there must be at least one SubConn | ||
// attempting to connect, we need to trigger one. But since the deleted | ||
// SubConn will eventually send a shutdown update, this code will run | ||
// and trigger the next SubConn to connect. | ||
for _, sc := range b.subConns { | ||
if sc.isAttemptingToConnect() { | ||
fmt.Printf(" +++++ %v is attempting to connect\n", sc) | ||
return | ||
} | ||
} | ||
// Trigger a SubConn (this updated SubConn's next SubConn in the ring) | ||
// to connect if nobody is attempting to connect. | ||
sc := nextSkippingDuplicatesSubConn(b.ring, scs) | ||
if sc != nil { | ||
sc.queueConnect() | ||
} | ||
} | ||
} | ||
|
||
// mergeErrors builds an error from the last connection error and the last | ||
|
@@ -395,6 +439,7 @@ func (b *ringhashBalancer) Close() {} | |
// | ||
// It's not thread safe. | ||
type connectivityStateEvaluator struct { | ||
// sum is incremented when a recorded transition leaves Shutdown and | ||
// decremented when one enters Shutdown. | ||
sum uint64 | ||
// nums holds per-state counters, indexed by connectivity.State. | ||
nums [5]uint64 | ||
} | ||
|
||
|
@@ -404,6 +449,7 @@ type connectivityStateEvaluator struct { | |
// - If there is at least one subchannel in READY state, report READY. | ||
// - If there are 2 or more subchannels in TRANSIENT_FAILURE state, report TRANSIENT_FAILURE. | ||
// - If there is at least one subchannel in CONNECTING state, report CONNECTING. | ||
// - If there is one subchannel in TRANSIENT_FAILURE and there is more than one subchannel, report state CONNECTING. | ||
// - If there is at least one subchannel in Idle state, report Idle. | ||
// - Otherwise, report TRANSIENT_FAILURE. | ||
// | ||
|
@@ -417,6 +463,12 @@ func (cse *connectivityStateEvaluator) recordTransition(oldState, newState conne | |
updateVal := 2*uint64(idx) - 1 // -1 for oldState and +1 for new. | ||
cse.nums[state] += updateVal | ||
} | ||
if oldState == connectivity.Shutdown { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Can you ever transition from shutdown? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. When a new SubConn is created, we record a shutdown->idle transition. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Ah yes, that makes sense thanks. |
||
cse.sum++ | ||
} | ||
if newState == connectivity.Shutdown { | ||
cse.sum-- | ||
} | ||
|
||
if cse.nums[connectivity.Ready] > 0 { | ||
return connectivity.Ready | ||
|
@@ -427,6 +479,9 @@ func (cse *connectivityStateEvaluator) recordTransition(oldState, newState conne | |
if cse.nums[connectivity.Connecting] > 0 { | ||
return connectivity.Connecting | ||
} | ||
if cse.nums[connectivity.TransientFailure] > 0 && cse.sum > 1 { | ||
return connectivity.Connecting | ||
} | ||
if cse.nums[connectivity.Idle] > 0 { | ||
return connectivity.Idle | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Delete (there are at least a few of these in here)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done