Skip to content

Commit

Permalink
lrs: handle multiple clusters in LRS stream (#3935)
Browse files Browse the repository at this point in the history
  • Loading branch information
menghanl committed Oct 26, 2020
1 parent e8e2287 commit a223251
Show file tree
Hide file tree
Showing 17 changed files with 451 additions and 235 deletions.
4 changes: 3 additions & 1 deletion xds/internal/balancer/edsbalancer/eds.go
Expand Up @@ -177,7 +177,9 @@ func (x *edsBalancer) handleGRPCUpdate(update interface{}) {
return
}

x.client.handleUpdate(cfg, u.ResolverState.Attributes)
if err := x.client.handleUpdate(cfg, u.ResolverState.Attributes); err != nil {
x.logger.Warningf("failed to update xds clients: %v", err)
}

if x.config == nil {
x.config = cfg
Expand Down
5 changes: 3 additions & 2 deletions xds/internal/balancer/edsbalancer/eds_impl_test.go
Expand Up @@ -688,9 +688,10 @@ func (s) TestEDS_LoadReport(t *testing.T) {
// be used.
loadStore := load.NewStore()
lsWrapper := &loadStoreWrapper{}
lsWrapper.update(loadStore, testClusterNames[0])
lsWrapper.updateServiceName(testClusterNames[0])
lsWrapper.updateLoadStore(loadStore)
cw := &xdsClientWrapper{
load: lsWrapper,
loadWrapper: lsWrapper,
}

cc := testutils.NewTestClientConn(t)
Expand Down
82 changes: 51 additions & 31 deletions xds/internal/balancer/edsbalancer/xds_client_wrapper.go
Expand Up @@ -19,6 +19,7 @@
package edsbalancer

import (
"fmt"
"sync"

"google.golang.org/grpc"
Expand All @@ -35,8 +36,7 @@ import (
// balancer. It's defined so we can override xdsclientNew function in tests.
type xdsClientInterface interface {
WatchEndpoints(clusterName string, edsCb func(xdsclient.EndpointsUpdate, error)) (cancel func())
LoadStore() *load.Store
ReportLoad(server string, clusterName string) (cancel func())
ReportLoad(server string) (loadStore *load.Store, cancel func())
Close()
}

Expand All @@ -48,23 +48,37 @@ var (
)

type loadStoreWrapper struct {
mu sync.RWMutex
mu sync.RWMutex
service string
// Both store and perCluster will be nil if load reporting is disabled (EDS
// response doesn't have LRS server name). Note that methods on Store and
// perCluster all handle nil, so there's no need to check nil before calling
// them.
store *load.Store
service string
perCluster load.PerClusterReporter
}

func (lsw *loadStoreWrapper) update(store *load.Store, service string) {
func (lsw *loadStoreWrapper) updateServiceName(service string) {
lsw.mu.Lock()
defer lsw.mu.Unlock()
if store == lsw.store && service == lsw.service {
if lsw.service == service {
return
}
lsw.store = store
lsw.service = service
lsw.perCluster = lsw.store.PerCluster(lsw.service, "")
}

func (lsw *loadStoreWrapper) updateLoadStore(store *load.Store) {
lsw.mu.Lock()
defer lsw.mu.Unlock()
if store == lsw.store {
return
}
lsw.store = store
lsw.perCluster = nil
lsw.perCluster = lsw.store.PerCluster(lsw.service, "")
}

func (lsw *loadStoreWrapper) CallStarted(locality string) {
lsw.mu.RLock()
defer lsw.mu.RUnlock()
Expand Down Expand Up @@ -102,7 +116,9 @@ type xdsClientWrapper struct {
// xdsClient could come from attributes, or created with balancerName.
xdsClient xdsClientInterface

load *loadStoreWrapper
// loadWrapper is a wrapper with loadOriginal, with clusterName and
// edsServiceName. It's used children to report loads.
loadWrapper *loadStoreWrapper
// edsServiceName is the edsServiceName currently being watched, not
// necessary the edsServiceName from service config.
//
Expand All @@ -127,7 +143,7 @@ func newXDSClientWrapper(newEDSUpdate func(xdsclient.EndpointsUpdate, error), bb
logger: logger,
newEDSUpdate: newEDSUpdate,
bbo: bbo,
load: &loadStoreWrapper{},
loadWrapper: &loadStoreWrapper{},
}
}

Expand Down Expand Up @@ -168,12 +184,12 @@ func (c *xdsClientWrapper) replaceXDSClient(newClient xdsClientInterface, newBal
// the balancerName (from bootstrap file or from service config) changed.
// - if balancer names are the same, do nothing, and return false
// - if balancer names are different, create new one, and return true
func (c *xdsClientWrapper) updateXDSClient(config *EDSConfig, attr *attributes.Attributes) bool {
func (c *xdsClientWrapper) updateXDSClient(config *EDSConfig, attr *attributes.Attributes) (bool, error) {
if attr != nil {
if clientFromAttr, _ := attr.Value(xdsinternal.XDSClientID).(xdsClientInterface); clientFromAttr != nil {
// This will also clear balancerName, to indicate that client is
// from attributes.
return c.replaceXDSClient(clientFromAttr, "")
return c.replaceXDSClient(clientFromAttr, ""), nil
}
}

Expand All @@ -184,24 +200,28 @@ func (c *xdsClientWrapper) updateXDSClient(config *EDSConfig, attr *attributes.A
}

if c.balancerName == clientConfig.BalancerName {
return false
return false, nil
}

var dopts []grpc.DialOption
if dialer := c.bbo.Dialer; dialer != nil {
dopts = []grpc.DialOption{grpc.WithContextDialer(dialer)}
}

// TODO: there's no longer a need to read bootstrap file and create a new
// xds client. The EDS balancer should always get the xds client from
// attributes. Otherwise, this function should just fail. Also, xdsclient
// will be shared by multiple clients, so trying to make an xds client is
// just the wrong move.
newClient, err := xdsclientNew(xdsclient.Options{Config: *clientConfig, DialOpts: dopts})
if err != nil {
// This should never fail. xdsclientnew does a non-blocking dial, and
// all the config passed in should be validated.
//
// This could leave c.xdsClient as nil if this is the first update.
c.logger.Warningf("eds: failed to create xdsClient, error: %v", err)
return false
return false, fmt.Errorf("eds: failed to create xdsClient, error: %v", err)
}
return c.replaceXDSClient(newClient, clientConfig.BalancerName)
return c.replaceXDSClient(newClient, clientConfig.BalancerName), nil
}

// startEndpointsWatch starts the EDS watch. Caller can call this when the
Expand All @@ -214,10 +234,6 @@ func (c *xdsClientWrapper) updateXDSClient(config *EDSConfig, attr *attributes.A
// This usually means load report needs to be restarted, but this function does
// NOT do that. Caller needs to call startLoadReport separately.
func (c *xdsClientWrapper) startEndpointsWatch() {
if c.xdsClient == nil {
return
}

if c.cancelEndpointsWatch != nil {
c.cancelEndpointsWatch()
}
Expand All @@ -238,31 +254,32 @@ func (c *xdsClientWrapper) startEndpointsWatch() {
// Caller can cal this when the loadReportServer name changes, but
// edsServiceName doesn't (so we only need to restart load reporting, not EDS
// watch).
func (c *xdsClientWrapper) startLoadReport(loadReportServer *string) {
if c.xdsClient == nil {
c.logger.Warningf("xds: xdsClient is nil when trying to start load reporting. This means xdsClient wasn't passed in from the resolver, and xdsClient.New failed")
return
}
func (c *xdsClientWrapper) startLoadReport(loadReportServer *string) *load.Store {
if c.cancelLoadReport != nil {
c.cancelLoadReport()
}
c.loadReportServer = loadReportServer
var loadStore *load.Store
if c.loadReportServer != nil {
c.cancelLoadReport = c.xdsClient.ReportLoad(*c.loadReportServer, c.edsServiceName)
loadStore, c.cancelLoadReport = c.xdsClient.ReportLoad(*c.loadReportServer)
}
return loadStore
}

func (c *xdsClientWrapper) loadStore() load.PerClusterReporter {
if c == nil || c.load.store == nil {
if c == nil {
return nil
}
return c.load
return c.loadWrapper
}

// handleUpdate applies the service config and attributes updates to the client,
// including updating the xds_client to use, and updating the EDS name to watch.
func (c *xdsClientWrapper) handleUpdate(config *EDSConfig, attr *attributes.Attributes) {
clientChanged := c.updateXDSClient(config, attr)
func (c *xdsClientWrapper) handleUpdate(config *EDSConfig, attr *attributes.Attributes) error {
clientChanged, err := c.updateXDSClient(config, attr)
if err != nil {
return err
}

// Need to restart EDS watch when one of the following happens:
// - the xds_client is updated
Expand All @@ -277,14 +294,17 @@ func (c *xdsClientWrapper) handleUpdate(config *EDSConfig, attr *attributes.Attr
//
// This is OK for now, because we don't actually expect edsServiceName
// to change. Fix this (a bigger change) will happen later.
c.load.update(c.xdsClient.LoadStore(), c.edsServiceName)
c.loadWrapper.updateServiceName(c.edsServiceName)
}

// Only need to restart load reporting when:
// - the loadReportServer name changed
if !equalStringPointers(c.loadReportServer, config.LrsLoadReportingServerName) {
c.startLoadReport(config.LrsLoadReportingServerName)
loadStore := c.startLoadReport(config.LrsLoadReportingServerName)
c.loadWrapper.updateLoadStore(loadStore)
}

return nil
}

func (c *xdsClientWrapper) cancelWatch() {
Expand Down
4 changes: 2 additions & 2 deletions xds/internal/balancer/edsbalancer/xds_lrs_test.go
Expand Up @@ -66,7 +66,7 @@ func (s) TestXDSLoadReporting(t *testing.T) {
if err != nil {
t.Fatalf("xdsClient.ReportLoad failed with error: %v", err)
}
if got.Server != "" || got.Cluster != testEDSClusterName {
t.Fatalf("xdsClient.ReportLoad called with {%v, %v}: want {\"\", %v}", got.Server, got.Cluster, testEDSClusterName)
if got.Server != "" {
t.Fatalf("xdsClient.ReportLoad called with {%v}: want {\"\"}", got.Server)
}
}

0 comments on commit a223251

Please sign in to comment.