Skip to content

Commit

Permalink
[lrs_stream_report] c2
Browse files Browse the repository at this point in the history
  • Loading branch information
menghanl committed Oct 20, 2020
1 parent 6a428b6 commit 13f8352
Show file tree
Hide file tree
Showing 6 changed files with 118 additions and 67 deletions.
1 change: 1 addition & 0 deletions go.mod
Expand Up @@ -13,4 +13,5 @@ require (
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55
google.golang.org/protobuf v1.23.0
)
3 changes: 2 additions & 1 deletion xds/internal/balancer/edsbalancer/eds_impl_test.go
Expand Up @@ -688,7 +688,8 @@ func (s) TestEDS_LoadReport(t *testing.T) {
// be used.
loadStore := load.NewStore()
lsWrapper := &loadStoreWrapper{}
lsWrapper.update(loadStore, testClusterNames[0])
lsWrapper.updateServiceName(testClusterNames[0])
lsWrapper.updateLoadStore(loadStore)
cw := &xdsClientWrapper{
loadWrapper: lsWrapper,
}
Expand Down
75 changes: 50 additions & 25 deletions xds/internal/balancer/edsbalancer/xds_client_wrapper.go
Expand Up @@ -48,44 +48,75 @@ var (
)

type loadStoreWrapper struct {
mu sync.RWMutex
mu sync.RWMutex
service string
// Both store and perCluster will be nil if load reporting is disabled (EDS
// response doesn't have LRS server name).
store *load.Store
service string
perCluster load.PerClusterReporter
}

func (lsw *loadStoreWrapper) update(store *load.Store, service string) {
func (lsw *loadStoreWrapper) updateServiceName(service string) {
lsw.mu.Lock()
defer lsw.mu.Unlock()
if store == lsw.store && service == lsw.service {
if lsw.service == service {
return
}
lsw.store = store
lsw.service = service

if lsw.store == nil {
return
}
lsw.perCluster = lsw.store.PerCluster(lsw.service, "")
}

// updateLoadStore switches the wrapper to the store returned by the latest
// ReportLoad call. A nil store disables load reporting: perCluster is
// cleared so the Call* methods become no-ops. No-op if the store is
// unchanged.
func (lsw *loadStoreWrapper) updateLoadStore(store *load.Store) {
	lsw.mu.Lock()
	defer lsw.mu.Unlock()
	if store == lsw.store {
		return
	}
	lsw.store = store
	lsw.perCluster = nil
	if lsw.store != nil {
		// NOTE(review): second PerCluster argument is empty here while the
		// lrs wrapper passes edsService — presumably intentional for the EDS
		// balancer; confirm against load.Store docs.
		lsw.perCluster = lsw.store.PerCluster(lsw.service, "")
	}
}

// CallStarted reports the start of a call to the given locality. It is a
// no-op while load reporting is disabled (perCluster is nil).
func (lsw *loadStoreWrapper) CallStarted(locality string) {
	lsw.mu.RLock()
	defer lsw.mu.RUnlock()
	if pc := lsw.perCluster; pc != nil {
		pc.CallStarted(locality)
	}
}

// CallFinished reports the completion of a call (with its error, if any)
// for the given locality. It is a no-op while load reporting is disabled.
func (lsw *loadStoreWrapper) CallFinished(locality string, err error) {
	lsw.mu.RLock()
	defer lsw.mu.RUnlock()
	if pc := lsw.perCluster; pc != nil {
		pc.CallFinished(locality, err)
	}
}

// CallServerLoad reports a named server load value for the given locality.
// It is a no-op while load reporting is disabled.
func (lsw *loadStoreWrapper) CallServerLoad(locality, name string, val float64) {
	lsw.mu.RLock()
	defer lsw.mu.RUnlock()
	if pc := lsw.perCluster; pc != nil {
		pc.CallServerLoad(locality, name, val)
	}
}

// CallDropped reports a dropped call with the given category. It is a
// no-op while load reporting is disabled.
func (lsw *loadStoreWrapper) CallDropped(category string) {
	lsw.mu.RLock()
	defer lsw.mu.RUnlock()
	if pc := lsw.perCluster; pc != nil {
		pc.CallDropped(category)
	}
}

Expand All @@ -105,9 +136,6 @@ type xdsClientWrapper struct {
// loadWrapper is a wrapper with loadOriginal, with clusterName and
// edsServiceName. It's used by children to report loads.
loadWrapper *loadStoreWrapper
// loadOriginal is the load.Store for reporting loads to lrsServerName. It's
// returned by the client.
loadOriginal *load.Store
// edsServiceName is the edsServiceName currently being watched, not
// necessary the edsServiceName from service config.
//
Expand Down Expand Up @@ -243,18 +271,20 @@ func (c *xdsClientWrapper) startEndpointsWatch() {
// Caller can call this when the loadReportServer name changes, but
// edsServiceName doesn't (so we only need to restart load reporting, not EDS
// watch).
func (c *xdsClientWrapper) startLoadReport(loadReportServer *string) {
func (c *xdsClientWrapper) startLoadReport(loadReportServer *string) *load.Store {
if c.cancelLoadReport != nil {
c.cancelLoadReport()
}
c.loadReportServer = loadReportServer
var loadStore *load.Store
if c.loadReportServer != nil {
c.loadOriginal, c.cancelLoadReport = c.xdsClient.ReportLoad(*c.loadReportServer)
loadStore, c.cancelLoadReport = c.xdsClient.ReportLoad(*c.loadReportServer)
}
return loadStore
}

func (c *xdsClientWrapper) loadStore() load.PerClusterReporter {
if c == nil || c.loadWrapper.store == nil {
if c == nil {
return nil
}
return c.loadWrapper
Expand All @@ -268,34 +298,29 @@ func (c *xdsClientWrapper) handleUpdate(config *EDSConfig, attr *attributes.Attr
return err
}

var updateLoadStore bool

// Need to restart EDS watch when one of the following happens:
// - the xds_client is updated
// - the xds_client didn't change, but the edsServiceName changed
if clientChanged || c.edsServiceName != config.EDSServiceName {
c.edsServiceName = config.EDSServiceName
c.startEndpointsWatch()
updateLoadStore = true
}

// Only need to restart load reporting when:
// - the loadReportServer name changed
if !equalStringPointers(c.loadReportServer, config.LrsLoadReportingServerName) {
c.startLoadReport(config.LrsLoadReportingServerName)
updateLoadStore = true
}

if updateLoadStore {
// TODO: this update for the LRS service name is too early. It should
// only apply to the new EDS response. But this is applied to the RPCs
// before the new EDS response. To fully fix this, the EDS balancer
// needs to do a graceful switch to another EDS implementation.
//
// This is OK for now, because we don't actually expect edsServiceName
// to change. Fixing this (a bigger change) will happen later.
c.loadWrapper.update(c.loadOriginal, c.edsServiceName)
c.loadWrapper.updateServiceName(c.edsServiceName)
}

// Only need to restart load reporting when:
// - the loadReportServer name changed
if !equalStringPointers(c.loadReportServer, config.LrsLoadReportingServerName) {
loadStore := c.startLoadReport(config.LrsLoadReportingServerName)
c.loadWrapper.updateLoadStore(loadStore)
}

return nil
}

Expand Down
86 changes: 55 additions & 31 deletions xds/internal/balancer/lrs/balancer.go
Expand Up @@ -152,45 +152,75 @@ type xdsClientInterface interface {

type loadStoreWrapper struct {
mu sync.RWMutex
store *load.Store
cluster string
edsService string
// Both store and perCluster will be nil if load reporting is disabled (EDS
// response doesn't have LRS server name).
store *load.Store
perCluster load.PerClusterReporter
}

func (lsw *loadStoreWrapper) update(store *load.Store, cluster, edsService string) {
func (lsw *loadStoreWrapper) updateClusterAndService(cluster, edsService string) {
lsw.mu.Lock()
defer lsw.mu.Unlock()
if store == lsw.store && cluster == lsw.cluster && edsService == lsw.edsService {
if cluster == lsw.cluster && edsService == lsw.edsService {
return
}
lsw.store = store
lsw.cluster = cluster
lsw.edsService = edsService

if lsw.store == nil {
return
}
lsw.perCluster = lsw.store.PerCluster(lsw.cluster, lsw.edsService)
}

// updateLoadStore replaces the underlying load.Store and refreshes the
// cached per-cluster reporter. Passing nil disables load reporting until a
// new store is set. No-op if the store is unchanged.
func (lsw *loadStoreWrapper) updateLoadStore(store *load.Store) {
	lsw.mu.Lock()
	defer lsw.mu.Unlock()
	if lsw.store == store {
		return
	}
	lsw.store = store
	if store == nil {
		lsw.perCluster = nil
		return
	}
	lsw.perCluster = store.PerCluster(lsw.cluster, lsw.edsService)
}

// CallStarted records the start of a call to the given locality. It does
// nothing while load reporting is disabled (perCluster is nil).
func (lsw *loadStoreWrapper) CallStarted(locality string) {
	lsw.mu.RLock()
	defer lsw.mu.RUnlock()
	if pc := lsw.perCluster; pc != nil {
		pc.CallStarted(locality)
	}
}

// CallFinished records the completion of a call (with its error, if any)
// for the given locality. It does nothing while load reporting is disabled.
func (lsw *loadStoreWrapper) CallFinished(locality string, err error) {
	lsw.mu.RLock()
	defer lsw.mu.RUnlock()
	if pc := lsw.perCluster; pc != nil {
		pc.CallFinished(locality, err)
	}
}

// CallServerLoad records a named server load value for the given locality.
// It does nothing while load reporting is disabled.
func (lsw *loadStoreWrapper) CallServerLoad(locality, name string, val float64) {
	lsw.mu.RLock()
	defer lsw.mu.RUnlock()
	if pc := lsw.perCluster; pc != nil {
		pc.CallServerLoad(locality, name, val)
	}
}

// CallDropped records a dropped call with the given category. It does
// nothing while load reporting is disabled.
func (lsw *loadStoreWrapper) CallDropped(category string) {
	lsw.mu.RLock()
	defer lsw.mu.RUnlock()
	if pc := lsw.perCluster; pc != nil {
		pc.CallDropped(category)
	}
}

Expand All @@ -200,9 +230,6 @@ type xdsClientWrapper struct {
clusterName string
edsServiceName string
lrsServerName string
// loadOriginal is the load.Store for reporting loads to lrsServerName. It's
// returned by the client.
loadOriginal *load.Store
// loadWrapper is a wrapper with loadOriginal, with clusterName and
// edsServiceName. It's used by children to report loads.
loadWrapper *loadStoreWrapper
Expand All @@ -218,16 +245,16 @@ func newXDSClientWrapper() *xdsClientWrapper {
// restart the load reporting stream.
func (w *xdsClientWrapper) update(newConfig *lbConfig, attr *attributes.Attributes) error {
var (
restartLoadReport bool
updateLoadStore bool
restartLoadReport bool
updateLoadClusterAndService bool
)

if attr == nil {
return fmt.Errorf("failed to get xdsClient from attributes: attributes is nil")
return fmt.Errorf("lrs: failed to get xdsClient from attributes: attributes is nil")
}
clientFromAttr, _ := attr.Value(xdsinternal.XDSClientID).(xdsClientInterface)
if clientFromAttr == nil {
return fmt.Errorf("failed to get xdsClient from attributes: xdsClient not found in attributes")
return fmt.Errorf("lrs: failed to get xdsClient from attributes: xdsClient not found in attributes")
}

if w.c != clientFromAttr {
Expand All @@ -239,14 +266,26 @@ func (w *xdsClientWrapper) update(newConfig *lbConfig, attr *attributes.Attribut
// ClusterName is different, restart. ClusterName is from ClusterName and
// EdsServiceName.
if w.clusterName != newConfig.ClusterName {
updateLoadStore = true
updateLoadClusterAndService = true
w.clusterName = newConfig.ClusterName
}
if w.edsServiceName != newConfig.EdsServiceName {
updateLoadStore = true
updateLoadClusterAndService = true
w.edsServiceName = newConfig.EdsServiceName
}

if updateLoadClusterAndService {
// This updates the clusterName and serviceName that will be reported for the
// loads. The update here is too early, the perfect timing is when the
// picker is updated with the new connection. But from this balancer's point
// of view, it's impossible to tell.
//
// On the other hand, this will almost never happen. Each LRS policy
// shouldn't get updated config. The parent should do a graceful switch when
// the clusterName or serviceName is changed.
w.loadWrapper.updateClusterAndService(w.clusterName, w.edsServiceName)
}

if w.lrsServerName != newConfig.LrsLoadReportingServerName {
// LrsLoadReportingServerName is different, load should be report to a
// different server, restart.
Expand All @@ -255,36 +294,21 @@ func (w *xdsClientWrapper) update(newConfig *lbConfig, attr *attributes.Attribut
}

if restartLoadReport {
updateLoadStore = true
if w.cancelLoadReport != nil {
w.cancelLoadReport()
w.cancelLoadReport = nil
}
var loadStore *load.Store
if w.c != nil {
w.loadOriginal, w.cancelLoadReport = w.c.ReportLoad(w.lrsServerName)
loadStore, w.cancelLoadReport = w.c.ReportLoad(w.lrsServerName)
}
w.loadWrapper.updateLoadStore(loadStore)
}

if updateLoadStore {
// This updates the clusterName and serviceName that will reported for the
// loads. The update here is too early, the perfect timing is when the
// picker is updated with the new connection. But from this balancer's point
// of view, it's impossible to tell.
//
// On the other hand, this will almost never happen. Each LRS policy
// shouldn't get updated config. The parent should do a graceful switch when
// the clusterName or serviceName is changed.
if updateLoadStore {
w.loadWrapper.update(w.loadOriginal, w.clusterName, w.edsServiceName)
}
}
return nil
}

// loadStore returns the reporter child policies should use to report load.
//
// Previously this read w.loadWrapper.store without holding the wrapper's
// mutex, racing with updateLoadStore (which writes store under lsw.mu).
// The wrapper already no-ops every Call* method while perCluster is nil, so
// it is safe — and race-free — to hand it out unconditionally; this mirrors
// the equivalent change made to the edsbalancer's loadStore in this commit.
// Callers that nil-check the result keep working (it is never nil for a
// valid wrapper).
func (w *xdsClientWrapper) loadStore() load.PerClusterReporter {
	if w == nil || w.loadWrapper == nil {
		return nil
	}
	return w.loadWrapper
}

Expand Down

0 comments on commit 13f8352

Please sign in to comment.