Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lrs: handle multiple clusters in LRS stream #3935

Merged
merged 4 commits into from Oct 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 3 additions & 1 deletion xds/internal/balancer/edsbalancer/eds.go
Expand Up @@ -177,7 +177,9 @@ func (x *edsBalancer) handleGRPCUpdate(update interface{}) {
return
}

x.client.handleUpdate(cfg, u.ResolverState.Attributes)
if err := x.client.handleUpdate(cfg, u.ResolverState.Attributes); err != nil {
x.logger.Warningf("failed to update xds clients: %v", err)
}

if x.config == nil {
x.config = cfg
Expand Down
5 changes: 3 additions & 2 deletions xds/internal/balancer/edsbalancer/eds_impl_test.go
Expand Up @@ -688,9 +688,10 @@ func (s) TestEDS_LoadReport(t *testing.T) {
// be used.
loadStore := load.NewStore()
lsWrapper := &loadStoreWrapper{}
lsWrapper.update(loadStore, testClusterNames[0])
lsWrapper.updateServiceName(testClusterNames[0])
lsWrapper.updateLoadStore(loadStore)
cw := &xdsClientWrapper{
load: lsWrapper,
loadWrapper: lsWrapper,
}

cc := testutils.NewTestClientConn(t)
Expand Down
82 changes: 51 additions & 31 deletions xds/internal/balancer/edsbalancer/xds_client_wrapper.go
Expand Up @@ -19,6 +19,7 @@
package edsbalancer

import (
"fmt"
"sync"

"google.golang.org/grpc"
Expand All @@ -35,8 +36,7 @@ import (
// balancer. It's defined so we can override xdsclientNew function in tests.
type xdsClientInterface interface {
WatchEndpoints(clusterName string, edsCb func(xdsclient.EndpointsUpdate, error)) (cancel func())
LoadStore() *load.Store
ReportLoad(server string, clusterName string) (cancel func())
ReportLoad(server string) (loadStore *load.Store, cancel func())
Close()
}

Expand All @@ -48,23 +48,37 @@ var (
)

type loadStoreWrapper struct {
mu sync.RWMutex
mu sync.RWMutex
service string
// Both store and perCluster will be nil if load reporting is disabled (EDS
// response doesn't have LRS server name). Note that methods on Store and
// perCluster all handle nil, so there's no need to check nil before calling
// them.
store *load.Store
service string
perCluster load.PerClusterReporter
}

func (lsw *loadStoreWrapper) update(store *load.Store, service string) {
func (lsw *loadStoreWrapper) updateServiceName(service string) {
lsw.mu.Lock()
defer lsw.mu.Unlock()
if store == lsw.store && service == lsw.service {
if lsw.service == service {
return
}
lsw.store = store
lsw.service = service
lsw.perCluster = lsw.store.PerCluster(lsw.service, "")
}

func (lsw *loadStoreWrapper) updateLoadStore(store *load.Store) {
lsw.mu.Lock()
defer lsw.mu.Unlock()
if store == lsw.store {
return
}
lsw.store = store
lsw.perCluster = nil
lsw.perCluster = lsw.store.PerCluster(lsw.service, "")
}

func (lsw *loadStoreWrapper) CallStarted(locality string) {
lsw.mu.RLock()
defer lsw.mu.RUnlock()
Expand Down Expand Up @@ -102,7 +116,9 @@ type xdsClientWrapper struct {
// xdsClient could come from attributes, or created with balancerName.
xdsClient xdsClientInterface

load *loadStoreWrapper
// loadWrapper is a wrapper with loadOriginal, with clusterName and
// edsServiceName. It's used children to report loads.
loadWrapper *loadStoreWrapper
// edsServiceName is the edsServiceName currently being watched, not
// necessary the edsServiceName from service config.
//
Expand All @@ -127,7 +143,7 @@ func newXDSClientWrapper(newEDSUpdate func(xdsclient.EndpointsUpdate, error), bb
logger: logger,
newEDSUpdate: newEDSUpdate,
bbo: bbo,
load: &loadStoreWrapper{},
loadWrapper: &loadStoreWrapper{},
}
}

Expand Down Expand Up @@ -168,12 +184,12 @@ func (c *xdsClientWrapper) replaceXDSClient(newClient xdsClientInterface, newBal
// the balancerName (from bootstrap file or from service config) changed.
// - if balancer names are the same, do nothing, and return false
// - if balancer names are different, create new one, and return true
func (c *xdsClientWrapper) updateXDSClient(config *EDSConfig, attr *attributes.Attributes) bool {
func (c *xdsClientWrapper) updateXDSClient(config *EDSConfig, attr *attributes.Attributes) (bool, error) {
if attr != nil {
if clientFromAttr, _ := attr.Value(xdsinternal.XDSClientID).(xdsClientInterface); clientFromAttr != nil {
// This will also clear balancerName, to indicate that client is
// from attributes.
return c.replaceXDSClient(clientFromAttr, "")
return c.replaceXDSClient(clientFromAttr, ""), nil
}
}

Expand All @@ -184,24 +200,28 @@ func (c *xdsClientWrapper) updateXDSClient(config *EDSConfig, attr *attributes.A
}

if c.balancerName == clientConfig.BalancerName {
return false
return false, nil
}

var dopts []grpc.DialOption
if dialer := c.bbo.Dialer; dialer != nil {
dopts = []grpc.DialOption{grpc.WithContextDialer(dialer)}
}

// TODO: there's no longer a need to read bootstrap file and create a new
// xds client. The EDS balancer should always get the xds client from
// attributes. Otherwise, this function should just fail. Also, xdsclient
// will be shared by multiple clients, so trying to make an xds client is
// just the wrong move.
newClient, err := xdsclientNew(xdsclient.Options{Config: *clientConfig, DialOpts: dopts})
if err != nil {
// This should never fail. xdsclientnew does a non-blocking dial, and
// all the config passed in should be validated.
//
// This could leave c.xdsClient as nil if this is the first update.
c.logger.Warningf("eds: failed to create xdsClient, error: %v", err)
return false
return false, fmt.Errorf("eds: failed to create xdsClient, error: %v", err)
}
return c.replaceXDSClient(newClient, clientConfig.BalancerName)
return c.replaceXDSClient(newClient, clientConfig.BalancerName), nil
}

// startEndpointsWatch starts the EDS watch. Caller can call this when the
Expand All @@ -214,10 +234,6 @@ func (c *xdsClientWrapper) updateXDSClient(config *EDSConfig, attr *attributes.A
// This usually means load report needs to be restarted, but this function does
// NOT do that. Caller needs to call startLoadReport separately.
func (c *xdsClientWrapper) startEndpointsWatch() {
if c.xdsClient == nil {
return
}

if c.cancelEndpointsWatch != nil {
c.cancelEndpointsWatch()
}
Expand All @@ -238,31 +254,32 @@ func (c *xdsClientWrapper) startEndpointsWatch() {
// Caller can cal this when the loadReportServer name changes, but
// edsServiceName doesn't (so we only need to restart load reporting, not EDS
// watch).
func (c *xdsClientWrapper) startLoadReport(loadReportServer *string) {
if c.xdsClient == nil {
c.logger.Warningf("xds: xdsClient is nil when trying to start load reporting. This means xdsClient wasn't passed in from the resolver, and xdsClient.New failed")
return
}
func (c *xdsClientWrapper) startLoadReport(loadReportServer *string) *load.Store {
if c.cancelLoadReport != nil {
c.cancelLoadReport()
}
c.loadReportServer = loadReportServer
var loadStore *load.Store
if c.loadReportServer != nil {
c.cancelLoadReport = c.xdsClient.ReportLoad(*c.loadReportServer, c.edsServiceName)
loadStore, c.cancelLoadReport = c.xdsClient.ReportLoad(*c.loadReportServer)
}
return loadStore
}

func (c *xdsClientWrapper) loadStore() load.PerClusterReporter {
if c == nil || c.load.store == nil {
if c == nil {
return nil
}
return c.load
return c.loadWrapper
}

// handleUpdate applies the service config and attributes updates to the client,
// including updating the xds_client to use, and updating the EDS name to watch.
func (c *xdsClientWrapper) handleUpdate(config *EDSConfig, attr *attributes.Attributes) {
clientChanged := c.updateXDSClient(config, attr)
func (c *xdsClientWrapper) handleUpdate(config *EDSConfig, attr *attributes.Attributes) error {
clientChanged, err := c.updateXDSClient(config, attr)
if err != nil {
return err
}

// Need to restart EDS watch when one of the following happens:
// - the xds_client is updated
Expand All @@ -277,14 +294,17 @@ func (c *xdsClientWrapper) handleUpdate(config *EDSConfig, attr *attributes.Attr
//
// This is OK for now, because we don't actually expect edsServiceName
// to change. Fix this (a bigger change) will happen later.
c.load.update(c.xdsClient.LoadStore(), c.edsServiceName)
c.loadWrapper.updateServiceName(c.edsServiceName)
}

// Only need to restart load reporting when:
// - the loadReportServer name changed
if !equalStringPointers(c.loadReportServer, config.LrsLoadReportingServerName) {
c.startLoadReport(config.LrsLoadReportingServerName)
loadStore := c.startLoadReport(config.LrsLoadReportingServerName)
c.loadWrapper.updateLoadStore(loadStore)
}

easwars marked this conversation as resolved.
Show resolved Hide resolved
return nil
}

func (c *xdsClientWrapper) cancelWatch() {
Expand Down
4 changes: 2 additions & 2 deletions xds/internal/balancer/edsbalancer/xds_lrs_test.go
Expand Up @@ -66,7 +66,7 @@ func (s) TestXDSLoadReporting(t *testing.T) {
if err != nil {
t.Fatalf("xdsClient.ReportLoad failed with error: %v", err)
}
if got.Server != "" || got.Cluster != testEDSClusterName {
t.Fatalf("xdsClient.ReportLoad called with {%v, %v}: want {\"\", %v}", got.Server, got.Cluster, testEDSClusterName)
if got.Server != "" {
t.Fatalf("xdsClient.ReportLoad called with {%v}: want {\"\"}", got.Server)
}
}