Skip to content

Commit

Permalink
[ring_hash_policy] c1
Browse files Browse the repository at this point in the history
  • Loading branch information
menghanl committed Sep 14, 2021
1 parent df65977 commit 60afa42
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 10 deletions.
8 changes: 4 additions & 4 deletions xds/internal/balancer/ringhash/picker.go
Expand Up @@ -53,7 +53,7 @@ type handleRICSResult struct {
// The first return value indicates if the state is in Ready, Idle, Connecting
// or Shutdown. If it's true, the PickResult and error should be returned from
// Pick() as is.
func handleRICS(e *ringEntry, logger *grpclog.PrefixLogger) (handleRICSResult, bool) {
func (p *picker) handleRICS(e *ringEntry) (handleRICSResult, bool) {
switch state := e.sc.effectiveState(); state {
case connectivity.Ready:
return handleRICSResult{pr: balancer.PickResult{SubConn: e.sc.sc}}, true
Expand All @@ -73,14 +73,14 @@ func handleRICS(e *ringEntry, logger *grpclog.PrefixLogger) (handleRICSResult, b
default:
// Should never reach this. All the connectivity states are already
// handled in the cases.
logger.Errorf("SubConn has undefined connectivity state: %v", state)
p.logger.Errorf("SubConn has undefined connectivity state: %v", state)
return handleRICSResult{err: status.Errorf(codes.Unavailable, "SubConn has undefined connectivity state: %v", state)}, true
}
}

func (p *picker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
e := p.ring.pick(getRequestHash(info.Ctx))
if hr, ok := handleRICS(e, p.logger); ok {
if hr, ok := p.handleRICS(e); ok {
return hr.pr, hr.err
}
// ok was false, the entry is in transient failure.
Expand All @@ -100,7 +100,7 @@ func (p *picker) handleTransientFailure(e *ringEntry) (balancer.PickResult, erro

// For the second SubConn, also check Ready/Idle/Connecting as if it's the
// first entry.
if hr, ok := handleRICS(e2, p.logger); ok {
if hr, ok := p.handleRICS(e2); ok {
return hr.pr, hr.err
}

Expand Down
14 changes: 8 additions & 6 deletions xds/internal/balancer/ringhash/ringhash.go
Expand Up @@ -180,8 +180,8 @@ type ringhashBalancer struct {
//
// The return value is whether the new address list is different from the
// previous. True if
// - an addresses was added
// - an addresses was removed
// - an address was added
// - an address was removed
// - an address's weight was updated
//
// Note that this function doesn't trigger SubConn connecting, so all the new
Expand Down Expand Up @@ -336,10 +336,12 @@ func (b *ringhashBalancer) UpdateSubConnState(sc balancer.SubConn, state balance
if b.state == connectivity.TransientFailure {
scs.queueConnect()
}
// Resend the picker, there's no need to regenerate the picker because
// the ring didn't change.
sendUpdate = true
case connectivity.Connecting, connectivity.Ready:
// No need to send an update. No queued RPC can be unblocked. If the
// overall state changed because of this, sendUpdate is already true.
case connectivity.Connecting:
// No need to send an update. No queued RPC can be unblocked. If the
// overall state changed because of this, sendUpdate is already true.
case connectivity.Ready:
// Resend the picker, there's no need to regenerate the picker because
// the ring didn't change.
sendUpdate = true
Expand Down

0 comments on commit 60afa42

Please sign in to comment.