Skip to content

Commit

Permalink
resolverWrapper: remove the watcher goroutine (#2446)
Browse files Browse the repository at this point in the history
  • Loading branch information
lyuxuan committed Nov 9, 2018
1 parent a612bb6 commit eb55fa5
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 59 deletions.
23 changes: 9 additions & 14 deletions clientconn.go
Expand Up @@ -288,19 +288,14 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
}

// Build the resolver.
cc.resolverWrapper, err = newCCResolverWrapper(cc)
rWrapper, err := newCCResolverWrapper(cc)
if err != nil {
return nil, fmt.Errorf("failed to build resolver: %v", err)
}
// Start the resolver wrapper goroutine after resolverWrapper is created.
//
// If the goroutine is started before resolverWrapper is ready, the
// following may happen: The goroutine sends updates to cc. cc forwards
// those to balancer. Balancer creates new addrConn. addrConn fails to
// connect, and calls resolveNow(). resolveNow() tries to use the non-ready
// resolverWrapper.
cc.resolverWrapper.start()

cc.mu.Lock()
cc.resolverWrapper = rWrapper
cc.mu.Unlock()
// A blocking dial blocks until the clientConn is ready.
if cc.dopts.block {
for {
Expand Down Expand Up @@ -389,13 +384,13 @@ type ClientConn struct {
csMgr *connectivityStateManager

balancerBuildOpts balancer.BuildOptions
resolverWrapper *ccResolverWrapper
blockingpicker *pickerWrapper

mu sync.RWMutex
sc ServiceConfig
scRaw string
conns map[*addrConn]struct{}
mu sync.RWMutex
resolverWrapper *ccResolverWrapper
sc ServiceConfig
scRaw string
conns map[*addrConn]struct{}
// Keepalive parameter can be updated if a GoAway is received.
mkp keepalive.ClientParameters
curBalancerName string
Expand Down
56 changes: 11 additions & 45 deletions resolver_conn_wrapper.go
Expand Up @@ -93,47 +93,6 @@ func newCCResolverWrapper(cc *ClientConn) (*ccResolverWrapper, error) {
return ccr, nil
}

func (ccr *ccResolverWrapper) start() {
go ccr.watcher()
}

// watcher processes address updates and service config updates sequentially.
// Otherwise, we need to resolve possible races between address and service
// config (e.g. they specify different balancer types).
func (ccr *ccResolverWrapper) watcher() {
for {
select {
case <-ccr.done:
return
default:
}

select {
case addrs := <-ccr.addrCh:
select {
case <-ccr.done:
return
default:
}
grpclog.Infof("ccResolverWrapper: sending new addresses to cc: %v", addrs)
if channelz.IsOn() {
ccr.addChannelzTraceEvent(addrs)
}
ccr.cc.handleResolvedAddrs(addrs, nil)
case sc := <-ccr.scCh:
select {
case <-ccr.done:
return
default:
}
grpclog.Infof("ccResolverWrapper: got new service config: %v", sc)
ccr.cc.handleServiceConfig(sc)
case <-ccr.done:
return
}
}
}

func (ccr *ccResolverWrapper) resolveNow(o resolver.ResolveNowOption) {
ccr.resolver.ResolveNow(o)
}
Expand All @@ -146,20 +105,27 @@ func (ccr *ccResolverWrapper) close() {
// NewAddress is called by the resolver implemenetion to send addresses to gRPC.
func (ccr *ccResolverWrapper) NewAddress(addrs []resolver.Address) {
select {
case <-ccr.addrCh:
case <-ccr.done:
return
default:
}
ccr.addrCh <- addrs
grpclog.Infof("ccResolverWrapper: sending new addresses to cc: %v", addrs)
if channelz.IsOn() {
ccr.addChannelzTraceEvent(addrs)
}
ccr.cc.handleResolvedAddrs(addrs, nil)
}

// NewServiceConfig is called by the resolver implemenetion to send service
// configs to gRPC.
func (ccr *ccResolverWrapper) NewServiceConfig(sc string) {
select {
case <-ccr.scCh:
case <-ccr.done:
return
default:
}
ccr.scCh <- sc
grpclog.Infof("ccResolverWrapper: got new service config: %v", sc)
ccr.cc.handleServiceConfig(sc)
}

func (ccr *ccResolverWrapper) addChannelzTraceEvent(addrs []resolver.Address) {
Expand Down

0 comments on commit eb55fa5

Please sign in to comment.