Skip to content

Commit

Permalink
pickerwrapper: use atomic instead of locks (grpc#7214)
Browse files Browse the repository at this point in the history
  • Loading branch information
dfawley committed May 16, 2024
1 parent 0020ccf commit e22436a
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 43 deletions.
4 changes: 4 additions & 0 deletions balancer_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,10 @@ func (ccb *ccBalancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resol
func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) {
ccb.cc.mu.Lock()
defer ccb.cc.mu.Unlock()
if ccb.cc.conns == nil {
// The CC has been closed; ignore this update.
return
}

ccb.mu.Lock()
if ccb.closed {
Expand Down
81 changes: 38 additions & 43 deletions picker_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"context"
"fmt"
"io"
"sync"
"sync/atomic"

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/codes"
Expand All @@ -33,35 +33,43 @@ import (
"google.golang.org/grpc/status"
)

// pickerGeneration stores a picker and a channel used to signal that a picker
// newer than this one is available.
type pickerGeneration struct {
// picker is the picker produced by the LB policy. May be nil if a picker
// has never been produced.
picker balancer.Picker
// blockingCh is closed when the picker has been invalidated because there
// is a new one available.
blockingCh chan struct{}
}

// pickerWrapper is a wrapper of balancer.Picker. It blocks on certain pick
// actions and unblock when there's a picker update.
type pickerWrapper struct {
mu sync.Mutex
done bool
blockingCh chan struct{}
picker balancer.Picker
// If pickerGen holds a nil pointer, the pickerWrapper is closed.
pickerGen atomic.Pointer[pickerGeneration]
statsHandlers []stats.Handler // to record blocking picker calls
}

func newPickerWrapper(statsHandlers []stats.Handler) *pickerWrapper {
return &pickerWrapper{
blockingCh: make(chan struct{}),
pw := &pickerWrapper{
statsHandlers: statsHandlers,
}
pw.pickerGen.Store(&pickerGeneration{
blockingCh: make(chan struct{}),
})
return pw
}

// updatePicker is called by UpdateBalancerState. It unblocks all blocked pick.
// updatePicker is called by UpdateState calls from the LB policy. It
// unblocks all blocked pick.
func (pw *pickerWrapper) updatePicker(p balancer.Picker) {
pw.mu.Lock()
if pw.done {
pw.mu.Unlock()
return
}
pw.picker = p
// pw.blockingCh should never be nil.
close(pw.blockingCh)
pw.blockingCh = make(chan struct{})
pw.mu.Unlock()
old := pw.pickerGen.Swap(&pickerGeneration{
picker: p,
blockingCh: make(chan struct{}),
})
close(old.blockingCh)
}

// doneChannelzWrapper performs the following:
Expand Down Expand Up @@ -98,20 +106,17 @@ func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.
var lastPickErr error

for {
pw.mu.Lock()
if pw.done {
pw.mu.Unlock()
pg := pw.pickerGen.Load()
if pg == nil {
return nil, balancer.PickResult{}, ErrClientConnClosing
}

if pw.picker == nil {
ch = pw.blockingCh
if pg.picker == nil {
ch = pg.blockingCh
}
if ch == pw.blockingCh {
if ch == pg.blockingCh {
// This could happen when either:
// - pw.picker is nil (the previous if condition), or
// - has called pick on the current picker.
pw.mu.Unlock()
// - we have already called pick on the current picker.
select {
case <-ctx.Done():
var errStr string
Expand Down Expand Up @@ -145,9 +150,8 @@ func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.
}
}

ch = pw.blockingCh
p := pw.picker
pw.mu.Unlock()
ch = pg.blockingCh
p := pg.picker

pickResult, err := p.Pick(info)
if err != nil {
Expand Down Expand Up @@ -197,24 +201,15 @@ func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.
}

func (pw *pickerWrapper) close() {
pw.mu.Lock()
defer pw.mu.Unlock()
if pw.done {
return
}
pw.done = true
close(pw.blockingCh)
old := pw.pickerGen.Swap(nil)
close(old.blockingCh)
}

// reset clears the pickerWrapper and prepares it for being used again when idle
// mode is exited.
func (pw *pickerWrapper) reset() {
pw.mu.Lock()
defer pw.mu.Unlock()
if pw.done {
return
}
pw.blockingCh = make(chan struct{})
old := pw.pickerGen.Swap(&pickerGeneration{blockingCh: make(chan struct{})})
close(old.blockingCh)
}

// dropError is a wrapper error that indicates the LB policy wishes to drop the
Expand Down

0 comments on commit e22436a

Please sign in to comment.