New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
lrs: add a layer for clusters in load store #3880
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,6 +19,8 @@ | |
package edsbalancer | ||
|
||
import ( | ||
"sync" | ||
|
||
"google.golang.org/grpc" | ||
"google.golang.org/grpc/attributes" | ||
"google.golang.org/grpc/balancer" | ||
|
@@ -45,7 +47,44 @@ var ( | |
bootstrapConfigNew = bootstrap.NewConfig | ||
) | ||
|
||
// xdsClientWrapper is responsible for getting the xds client from attributes or | ||
type loadStoreWrapper struct { | ||
mu sync.RWMutex | ||
store *load.Store | ||
service string | ||
} | ||
|
||
func (lsw *loadStoreWrapper) update(store *load.Store, service string) { | ||
lsw.mu.Lock() | ||
defer lsw.mu.Unlock() | ||
lsw.store = store | ||
lsw.service = service | ||
} | ||
|
||
func (lsw *loadStoreWrapper) CallStarted(locality string) { | ||
lsw.mu.RLock() | ||
defer lsw.mu.RUnlock() | ||
lsw.store.PerCluster(lsw.service, "").CallStarted(locality) | ||
} | ||
|
||
func (lsw *loadStoreWrapper) CallFinished(locality string, err error) { | ||
lsw.mu.RLock() | ||
defer lsw.mu.RUnlock() | ||
lsw.store.PerCluster(lsw.service, "").CallFinished(locality, err) | ||
} | ||
|
||
func (lsw *loadStoreWrapper) CallServerLoad(locality, name string, val float64) { | ||
lsw.mu.RLock() | ||
defer lsw.mu.RUnlock() | ||
lsw.store.PerCluster(lsw.service, "").CallServerLoad(locality, name, val) | ||
} | ||
|
||
func (lsw *loadStoreWrapper) CallDropped(category string) { | ||
lsw.mu.RLock() | ||
defer lsw.mu.RUnlock() | ||
lsw.store.PerCluster(lsw.service, "").CallDropped(category) | ||
} | ||
|
||
// xdsClientWrapper is responsible for getting the xds client from attributes or | ||
// creating a new xds client, and start watching EDS. The given callbacks will | ||
// be called with EDS updates or errors. | ||
type xdsClientWrapper struct { | ||
|
@@ -58,6 +97,7 @@ type xdsClientWrapper struct { | |
// xdsClient could come from attributes, or created with balancerName. | ||
xdsClient xdsClientInterface | ||
|
||
load *loadStoreWrapper | ||
// edsServiceName is the edsServiceName currently being watched, not | ||
// necessary the edsServiceName from service config. | ||
// | ||
|
@@ -82,6 +122,7 @@ func newXDSClientWrapper(newEDSUpdate func(xdsclient.EndpointsUpdate, error), bb | |
logger: logger, | ||
newEDSUpdate: newEDSUpdate, | ||
bbo: bbo, | ||
load: &loadStoreWrapper{}, | ||
} | ||
} | ||
|
||
|
@@ -167,12 +208,11 @@ func (c *xdsClientWrapper) updateXDSClient(config *EDSConfig, attr *attributes.A | |
// | ||
// This usually means load report needs to be restarted, but this function does | ||
// NOT do that. Caller needs to call startLoadReport separately. | ||
func (c *xdsClientWrapper) startEndpointsWatch(nameToWatch string) { | ||
func (c *xdsClientWrapper) startEndpointsWatch() { | ||
if c.xdsClient == nil { | ||
return | ||
} | ||
|
||
c.edsServiceName = nameToWatch | ||
if c.cancelEndpointsWatch != nil { | ||
c.cancelEndpointsWatch() | ||
} | ||
|
@@ -193,7 +233,7 @@ func (c *xdsClientWrapper) startEndpointsWatch(nameToWatch string) { | |
// Caller can call this when the loadReportServer name changes, but | ||
// edsServiceName doesn't (so we only need to restart load reporting, not EDS | ||
// watch). | ||
func (c *xdsClientWrapper) startLoadReport(edsServiceNameBeingWatched string, loadReportServer *string) { | ||
func (c *xdsClientWrapper) startLoadReport(loadReportServer *string) { | ||
if c.xdsClient == nil { | ||
c.logger.Warningf("xds: xdsClient is nil when trying to start load reporting. This means xdsClient wasn't passed in from the resolver, and xdsClient.New failed") | ||
return | ||
|
@@ -203,54 +243,42 @@ func (c *xdsClientWrapper) startLoadReport(edsServiceNameBeingWatched string, lo | |
} | ||
c.loadReportServer = loadReportServer | ||
if c.loadReportServer != nil { | ||
c.cancelLoadReport = c.xdsClient.ReportLoad(*c.loadReportServer, edsServiceNameBeingWatched) | ||
c.cancelLoadReport = c.xdsClient.ReportLoad(*c.loadReportServer, c.edsServiceName) | ||
} | ||
} | ||
|
||
func (c *xdsClientWrapper) loadStore() *load.Store { | ||
if c == nil || c.xdsClient == nil { | ||
func (c *xdsClientWrapper) loadStore() load.PerClusterReporter { | ||
if c == nil || c.load.store == nil { | ||
return nil | ||
} | ||
return c.xdsClient.LoadStore() | ||
return c.load | ||
} | ||
|
||
// handleUpdate applies the service config and attributes updates to the client, | ||
// including updating the xds_client to use, and updating the EDS name to watch. | ||
func (c *xdsClientWrapper) handleUpdate(config *EDSConfig, attr *attributes.Attributes) { | ||
clientChanged := c.updateXDSClient(config, attr) | ||
|
||
var ( | ||
restartEndpointsWatch bool | ||
restartLoadReport bool | ||
) | ||
|
||
// The clusterName to watch should come from CDS response, via service | ||
// config. If it's an empty string, fallback user's dial target. | ||
nameToWatch := config.EDSServiceName | ||
if nameToWatch == "" { | ||
c.logger.Warningf("eds: cluster name to watch is an empty string. Fallback to user's dial target") | ||
nameToWatch = c.bbo.Target.Endpoint | ||
} | ||
|
||
// Need to restart EDS watch when one of the following happens: | ||
// - the xds_client is updated | ||
// - the xds_client didn't change, but the edsServiceName changed | ||
// | ||
// Only need to restart load reporting when: | ||
// - no need to restart EDS, but loadReportServer name changed | ||
if clientChanged || c.edsServiceName != nameToWatch { | ||
restartEndpointsWatch = true | ||
restartLoadReport = true | ||
} else if !equalStringPointers(c.loadReportServer, config.LrsLoadReportingServerName) { | ||
restartLoadReport = true | ||
} | ||
|
||
if restartEndpointsWatch { | ||
c.startEndpointsWatch(nameToWatch) | ||
if clientChanged || c.edsServiceName != config.EDSServiceName { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
c.edsServiceName = config.EDSServiceName | ||
c.startEndpointsWatch() | ||
// TODO: this update for the LRS service name is too early. It should | ||
// only apply to the new EDS response. But this is applied to the RPCs | ||
// before the new EDS response. To fully fix this, the EDS balancer | ||
// needs to do a graceful switch to another EDS implementation. | ||
// | ||
// This is OK for now, because we don't actually expect edsServiceName | ||
// to change. Fix this (a bigger change) will happen later. | ||
c.load.update(c.xdsClient.LoadStore(), c.edsServiceName) | ||
} | ||
|
||
if restartLoadReport { | ||
c.startLoadReport(nameToWatch, config.LrsLoadReportingServerName) | ||
// Only need to restart load reporting when: | ||
// - the loadReportServer name changed | ||
if !equalStringPointers(c.loadReportServer, config.LrsLoadReportingServerName) { | ||
c.startLoadReport(config.LrsLoadReportingServerName) | ||
} | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need this type?
If we cached the
PerClusterStore
in the xdsClientWrapper
whenever the LRS stream is to be updated (either the client changed, the LRS server name changed, or the edsServiceName changed), then it would simplify things, wouldn't it? Also, I feel that caching the
PerClusterStore
would reduce lock contention, and the Store
can also be simplified by not having to require a RWLock.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When clusterName/serviceName is updated, the cached
PerClusterStore
needs to be updated, and passed to balancer group. But there's no way to update the load store in a balancer group.