Skip to content

Commit

Permalink
[lrs_stream_report] c3
Browse files Browse the repository at this point in the history
  • Loading branch information
menghanl committed Oct 22, 2020
1 parent 0e93130 commit e199a5a
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 44 deletions.
2 changes: 1 addition & 1 deletion xds/internal/balancer/edsbalancer/eds.go
Expand Up @@ -178,7 +178,7 @@ func (x *edsBalancer) handleGRPCUpdate(update interface{}) {
}

if err := x.client.handleUpdate(cfg, u.ResolverState.Attributes); err != nil {
x.logger.Infof("failed to update xds clients: %v", err)
x.logger.Warningf("failed to update xds clients: %v", err)
}

if x.config == nil {
Expand Down
29 changes: 6 additions & 23 deletions xds/internal/balancer/edsbalancer/xds_client_wrapper.go
Expand Up @@ -51,7 +51,9 @@ type loadStoreWrapper struct {
mu sync.RWMutex
service string
// Both store and perCluster will be nil if load reporting is disabled (EDS
// response doesn't have LRS server name).
// response doesn't have LRS server name). Note that methods on Store and
// perCluster all handle nil, so there's no need to check nil before calling
// them.
store *load.Store
perCluster load.PerClusterReporter
}
Expand All @@ -63,10 +65,6 @@ func (lsw *loadStoreWrapper) updateServiceName(service string) {
return
}
lsw.service = service

if lsw.store == nil {
return
}
lsw.perCluster = lsw.store.PerCluster(lsw.service, "")
}

Expand All @@ -78,45 +76,30 @@ func (lsw *loadStoreWrapper) updateLoadStore(store *load.Store) {
}
lsw.store = store
lsw.perCluster = nil
if lsw.store != nil {
lsw.perCluster = lsw.store.PerCluster(lsw.service, "")
}

lsw.perCluster = lsw.store.PerCluster(lsw.service, "")
}

func (lsw *loadStoreWrapper) CallStarted(locality string) {
lsw.mu.RLock()
defer lsw.mu.RUnlock()
if lsw.perCluster == nil {
return
}
lsw.perCluster.CallStarted(locality)
}

func (lsw *loadStoreWrapper) CallFinished(locality string, err error) {
lsw.mu.RLock()
defer lsw.mu.RUnlock()
if lsw.perCluster == nil {
return
}
lsw.perCluster.CallFinished(locality, err)
}

func (lsw *loadStoreWrapper) CallServerLoad(locality, name string, val float64) {
lsw.mu.RLock()
defer lsw.mu.RUnlock()
if lsw.perCluster == nil {
return
}
lsw.perCluster.CallServerLoad(locality, name, val)
}

func (lsw *loadStoreWrapper) CallDropped(category string) {
lsw.mu.RLock()
defer lsw.mu.RUnlock()
if lsw.perCluster == nil {
return
}
lsw.perCluster.CallDropped(category)
}

Expand Down Expand Up @@ -225,8 +208,8 @@ func (c *xdsClientWrapper) updateXDSClient(config *EDSConfig, attr *attributes.A
dopts = []grpc.DialOption{grpc.WithContextDialer(dialer)}
}

// TODO: there's no long a need to read bootstrap file and create a new xds
// client. The EDS balancer should always get the xds client from
// TODO: there's no longer a need to read bootstrap file and create a new
// xds client. The EDS balancer should always get the xds client from
// attributes. Otherwise, this function should just fail. Also, xdsclient
// will be shared by multiple clients, so trying to make an xds client is
// just the wrong move.
Expand Down
24 changes: 4 additions & 20 deletions xds/internal/balancer/lrs/balancer.go
Expand Up @@ -155,7 +155,9 @@ type loadStoreWrapper struct {
cluster string
edsService string
// Both store and perCluster will be nil if load reporting is disabled (EDS
// response doesn't have LRS server name).
// response doesn't have LRS server name). Note that methods on Store and
// perCluster all handle nil, so there's no need to check nil before calling
// them.
store *load.Store
perCluster load.PerClusterReporter
}
Expand All @@ -168,10 +170,6 @@ func (lsw *loadStoreWrapper) updateClusterAndService(cluster, edsService string)
}
lsw.cluster = cluster
lsw.edsService = edsService

if lsw.store == nil {
return
}
lsw.perCluster = lsw.store.PerCluster(lsw.cluster, lsw.edsService)
}

Expand All @@ -183,44 +181,30 @@ func (lsw *loadStoreWrapper) updateLoadStore(store *load.Store) {
}
lsw.store = store
lsw.perCluster = nil
if lsw.store != nil {
lsw.perCluster = lsw.store.PerCluster(lsw.cluster, lsw.edsService)
}
lsw.perCluster = lsw.store.PerCluster(lsw.cluster, lsw.edsService)
}

func (lsw *loadStoreWrapper) CallStarted(locality string) {
lsw.mu.RLock()
defer lsw.mu.RUnlock()
if lsw.perCluster == nil {
return
}
lsw.perCluster.CallStarted(locality)
}

func (lsw *loadStoreWrapper) CallFinished(locality string, err error) {
lsw.mu.RLock()
defer lsw.mu.RUnlock()
if lsw.perCluster == nil {
return
}
lsw.perCluster.CallFinished(locality, err)
}

func (lsw *loadStoreWrapper) CallServerLoad(locality, name string, val float64) {
lsw.mu.RLock()
defer lsw.mu.RUnlock()
if lsw.perCluster == nil {
return
}
lsw.perCluster.CallServerLoad(locality, name, val)
}

func (lsw *loadStoreWrapper) CallDropped(category string) {
lsw.mu.RLock()
defer lsw.mu.RUnlock()
if lsw.perCluster == nil {
return
}
lsw.perCluster.CallDropped(category)
}

Expand Down

0 comments on commit e199a5a

Please sign in to comment.