diff --git a/xds/internal/balancer/edsbalancer/eds.go b/xds/internal/balancer/edsbalancer/eds.go index 6b124c27071..a148db0e42e 100644 --- a/xds/internal/balancer/edsbalancer/eds.go +++ b/xds/internal/balancer/edsbalancer/eds.go @@ -177,7 +177,9 @@ func (x *edsBalancer) handleGRPCUpdate(update interface{}) { return } - x.client.handleUpdate(cfg, u.ResolverState.Attributes) + if err := x.client.handleUpdate(cfg, u.ResolverState.Attributes); err != nil { + x.logger.Warningf("failed to update xds clients: %v", err) + } if x.config == nil { x.config = cfg diff --git a/xds/internal/balancer/edsbalancer/eds_impl_test.go b/xds/internal/balancer/edsbalancer/eds_impl_test.go index a56acb7e293..292ea4f80a7 100644 --- a/xds/internal/balancer/edsbalancer/eds_impl_test.go +++ b/xds/internal/balancer/edsbalancer/eds_impl_test.go @@ -688,9 +688,10 @@ func (s) TestEDS_LoadReport(t *testing.T) { // be used. loadStore := load.NewStore() lsWrapper := &loadStoreWrapper{} - lsWrapper.update(loadStore, testClusterNames[0]) + lsWrapper.updateServiceName(testClusterNames[0]) + lsWrapper.updateLoadStore(loadStore) cw := &xdsClientWrapper{ - load: lsWrapper, + loadWrapper: lsWrapper, } cc := testutils.NewTestClientConn(t) diff --git a/xds/internal/balancer/edsbalancer/xds_client_wrapper.go b/xds/internal/balancer/edsbalancer/xds_client_wrapper.go index f22c1624a3b..fe4e996a7b5 100644 --- a/xds/internal/balancer/edsbalancer/xds_client_wrapper.go +++ b/xds/internal/balancer/edsbalancer/xds_client_wrapper.go @@ -19,6 +19,7 @@ package edsbalancer import ( + "fmt" "sync" "google.golang.org/grpc" @@ -35,8 +36,7 @@ import ( // balancer. It's defined so we can override xdsclientNew function in tests. type xdsClientInterface interface { WatchEndpoints(clusterName string, edsCb func(xdsclient.EndpointsUpdate, error)) (cancel func()) - LoadStore() *load.Store - ReportLoad(server string, clusterName string) (cancel func()) + ReportLoad(server string) (loadStore *load.Store, cancel func()) Close() } @@ -48,23 +48,37 @@ var ( ) type loadStoreWrapper struct { - mu sync.RWMutex + mu sync.RWMutex + service string + // Both store and perCluster will be nil if load reporting is disabled (EDS + // response doesn't have LRS server name). Note that methods on Store and + // perCluster all handle nil, so there's no need to check nil before calling + // them. store *load.Store - service string perCluster load.PerClusterReporter } -func (lsw *loadStoreWrapper) update(store *load.Store, service string) { +func (lsw *loadStoreWrapper) updateServiceName(service string) { lsw.mu.Lock() defer lsw.mu.Unlock() - if store == lsw.store && service == lsw.service { + if lsw.service == service { return } - lsw.store = store lsw.service = service lsw.perCluster = lsw.store.PerCluster(lsw.service, "") } +func (lsw *loadStoreWrapper) updateLoadStore(store *load.Store) { + lsw.mu.Lock() + defer lsw.mu.Unlock() + if store == lsw.store { + return + } + lsw.store = store + lsw.perCluster = nil + lsw.perCluster = lsw.store.PerCluster(lsw.service, "") +} + func (lsw *loadStoreWrapper) CallStarted(locality string) { lsw.mu.RLock() defer lsw.mu.RUnlock() @@ -102,7 +116,9 @@ type xdsClientWrapper struct { // xdsClient could come from attributes, or created with balancerName. xdsClient xdsClientInterface - load *loadStoreWrapper + // loadWrapper is a wrapper with loadOriginal, with clusterName and + // edsServiceName. It's used children to report loads. 
+ loadWrapper *loadStoreWrapper // edsServiceName is the edsServiceName currently being watched, not // necessary the edsServiceName from service config. // @@ -127,7 +143,7 @@ func newXDSClientWrapper(newEDSUpdate func(xdsclient.EndpointsUpdate, error), bb logger: logger, newEDSUpdate: newEDSUpdate, bbo: bbo, - load: &loadStoreWrapper{}, + loadWrapper: &loadStoreWrapper{}, } } @@ -168,12 +184,12 @@ func (c *xdsClientWrapper) replaceXDSClient(newClient xdsClientInterface, newBal // the balancerName (from bootstrap file or from service config) changed. // - if balancer names are the same, do nothing, and return false // - if balancer names are different, create new one, and return true -func (c *xdsClientWrapper) updateXDSClient(config *EDSConfig, attr *attributes.Attributes) bool { +func (c *xdsClientWrapper) updateXDSClient(config *EDSConfig, attr *attributes.Attributes) (bool, error) { if attr != nil { if clientFromAttr, _ := attr.Value(xdsinternal.XDSClientID).(xdsClientInterface); clientFromAttr != nil { // This will also clear balancerName, to indicate that client is // from attributes. - return c.replaceXDSClient(clientFromAttr, "") + return c.replaceXDSClient(clientFromAttr, ""), nil } } @@ -184,7 +200,7 @@ func (c *xdsClientWrapper) updateXDSClient(config *EDSConfig, attr *attributes.A } if c.balancerName == clientConfig.BalancerName { - return false + return false, nil } var dopts []grpc.DialOption @@ -192,16 +208,20 @@ func (c *xdsClientWrapper) updateXDSClient(config *EDSConfig, attr *attributes.A dopts = []grpc.DialOption{grpc.WithContextDialer(dialer)} } + // TODO: there's no longer a need to read bootstrap file and create a new + // xds client. The EDS balancer should always get the xds client from + // attributes. Otherwise, this function should just fail. Also, xdsclient + // will be shared by multiple clients, so trying to make an xds client is + // just the wrong move. newClient, err := xdsclientNew(xdsclient.Options{Config: *clientConfig, DialOpts: dopts}) if err != nil { // This should never fail. xdsclientnew does a non-blocking dial, and // all the config passed in should be validated. // // This could leave c.xdsClient as nil if this is the first update. - c.logger.Warningf("eds: failed to create xdsClient, error: %v", err) - return false + return false, fmt.Errorf("eds: failed to create xdsClient, error: %v", err) } - return c.replaceXDSClient(newClient, clientConfig.BalancerName) + return c.replaceXDSClient(newClient, clientConfig.BalancerName), nil } // startEndpointsWatch starts the EDS watch. Caller can call this when the @@ -214,10 +234,6 @@ func (c *xdsClientWrapper) updateXDSClient(config *EDSConfig, attr *attributes.A // This usually means load report needs to be restarted, but this function does // NOT do that. Caller needs to call startLoadReport separately. func (c *xdsClientWrapper) startEndpointsWatch() { - if c.xdsClient == nil { - return - } - if c.cancelEndpointsWatch != nil { c.cancelEndpointsWatch() } @@ -238,31 +254,32 @@ func (c *xdsClientWrapper) startEndpointsWatch() { // Caller can cal this when the loadReportServer name changes, but // edsServiceName doesn't (so we only need to restart load reporting, not EDS // watch). -func (c *xdsClientWrapper) startLoadReport(loadReportServer *string) { - if c.xdsClient == nil { - c.logger.Warningf("xds: xdsClient is nil when trying to start load reporting. 
This means xdsClient wasn't passed in from the resolver, and xdsClient.New failed") - return - } +func (c *xdsClientWrapper) startLoadReport(loadReportServer *string) *load.Store { if c.cancelLoadReport != nil { c.cancelLoadReport() } c.loadReportServer = loadReportServer + var loadStore *load.Store if c.loadReportServer != nil { - c.cancelLoadReport = c.xdsClient.ReportLoad(*c.loadReportServer, c.edsServiceName) + loadStore, c.cancelLoadReport = c.xdsClient.ReportLoad(*c.loadReportServer) } + return loadStore } func (c *xdsClientWrapper) loadStore() load.PerClusterReporter { - if c == nil || c.load.store == nil { + if c == nil { return nil } - return c.load + return c.loadWrapper } // handleUpdate applies the service config and attributes updates to the client, // including updating the xds_client to use, and updating the EDS name to watch. -func (c *xdsClientWrapper) handleUpdate(config *EDSConfig, attr *attributes.Attributes) { - clientChanged := c.updateXDSClient(config, attr) +func (c *xdsClientWrapper) handleUpdate(config *EDSConfig, attr *attributes.Attributes) error { + clientChanged, err := c.updateXDSClient(config, attr) + if err != nil { + return err + } // Need to restart EDS watch when one of the following happens: // - the xds_client is updated @@ -277,14 +294,17 @@ func (c *xdsClientWrapper) handleUpdate(config *EDSConfig, attr *attributes.Attr // // This is OK for now, because we don't actually expect edsServiceName // to change. Fix this (a bigger change) will happen later. - c.load.update(c.xdsClient.LoadStore(), c.edsServiceName) + c.loadWrapper.updateServiceName(c.edsServiceName) } // Only need to restart load reporting when: // - the loadReportServer name changed if !equalStringPointers(c.loadReportServer, config.LrsLoadReportingServerName) { - c.startLoadReport(config.LrsLoadReportingServerName) + loadStore := c.startLoadReport(config.LrsLoadReportingServerName) + c.loadWrapper.updateLoadStore(loadStore) } + + return nil } func (c *xdsClientWrapper) cancelWatch() { diff --git a/xds/internal/balancer/edsbalancer/xds_lrs_test.go b/xds/internal/balancer/edsbalancer/xds_lrs_test.go index 8d888ec6f3b..955f54401c8 100644 --- a/xds/internal/balancer/edsbalancer/xds_lrs_test.go +++ b/xds/internal/balancer/edsbalancer/xds_lrs_test.go @@ -66,7 +66,7 @@ func (s) TestXDSLoadReporting(t *testing.T) { if err != nil { t.Fatalf("xdsClient.ReportLoad failed with error: %v", err) } - if got.Server != "" || got.Cluster != testEDSClusterName { - t.Fatalf("xdsClient.ReportLoad called with {%v, %v}: want {\"\", %v}", got.Server, got.Cluster, testEDSClusterName) + if got.Server != "" { + t.Fatalf("xdsClient.ReportLoad called with {%v}: want {\"\"}", got.Server) } } diff --git a/xds/internal/balancer/lrs/balancer.go b/xds/internal/balancer/lrs/balancer.go index 1361fb15728..f8e7673f7d8 100644 --- a/xds/internal/balancer/lrs/balancer.go +++ b/xds/internal/balancer/lrs/balancer.go @@ -80,7 +80,9 @@ func (b *lrsBalancer) UpdateClientConnState(s balancer.ClientConnState) error { // Update load reporting config or xds client. This needs to be done before // updating the child policy because we need the loadStore from the updated // client to be passed to the ccWrapper. - b.client.update(newConfig, s.ResolverState.Attributes) + if err := b.client.update(newConfig, s.ResolverState.Attributes); err != nil { + return err + } // If child policy is a different type, recreate the sub-balancer. 
if b.config == nil || b.config.ChildPolicy.Name != newConfig.ChildPolicy.Name { @@ -144,31 +146,44 @@ func (ccw *ccWrapper) UpdateState(s balancer.State) { // xdsClientInterface contains only the xds_client methods needed by LRS // balancer. It's defined so we can override xdsclient in tests. type xdsClientInterface interface { - LoadStore() *load.Store - ReportLoad(server string, clusterName string) func() + ReportLoad(server string) (*load.Store, func()) Close() } type loadStoreWrapper struct { mu sync.RWMutex - store *load.Store cluster string edsService string + // Both store and perCluster will be nil if load reporting is disabled (EDS + // response doesn't have LRS server name). Note that methods on Store and + // perCluster all handle nil, so there's no need to check nil before calling + // them. + store *load.Store perCluster load.PerClusterReporter } -func (lsw *loadStoreWrapper) update(store *load.Store, cluster, edsService string) { +func (lsw *loadStoreWrapper) updateClusterAndService(cluster, edsService string) { lsw.mu.Lock() defer lsw.mu.Unlock() - if store == lsw.store && cluster == lsw.cluster && edsService == lsw.edsService { + if cluster == lsw.cluster && edsService == lsw.edsService { return } - lsw.store = store lsw.cluster = cluster lsw.edsService = edsService lsw.perCluster = lsw.store.PerCluster(lsw.cluster, lsw.edsService) } +func (lsw *loadStoreWrapper) updateLoadStore(store *load.Store) { + lsw.mu.Lock() + defer lsw.mu.Unlock() + if store == lsw.store { + return + } + lsw.store = store + lsw.perCluster = nil + lsw.perCluster = lsw.store.PerCluster(lsw.cluster, lsw.edsService) +} + func (lsw *loadStoreWrapper) CallStarted(locality string) { lsw.mu.RLock() defer lsw.mu.RUnlock() @@ -199,44 +214,62 @@ type xdsClientWrapper struct { clusterName string edsServiceName string lrsServerName string - load *loadStoreWrapper + // loadWrapper is a wrapper with loadOriginal, with clusterName and + // edsServiceName. It's used children to report loads. + loadWrapper *loadStoreWrapper } func newXDSClientWrapper() *xdsClientWrapper { return &xdsClientWrapper{ - load: &loadStoreWrapper{}, + loadWrapper: &loadStoreWrapper{}, } } // update checks the config and xdsclient, and decides whether it needs to // restart the load reporting stream. -func (w *xdsClientWrapper) update(newConfig *lbConfig, attr *attributes.Attributes) { +func (w *xdsClientWrapper) update(newConfig *lbConfig, attr *attributes.Attributes) error { var ( - restartLoadReport bool - updateLoadStore bool + restartLoadReport bool + updateLoadClusterAndService bool ) - if attr != nil { - if clientFromAttr, _ := attr.Value(xdsinternal.XDSClientID).(xdsClientInterface); clientFromAttr != nil { - if w.c != clientFromAttr { - // xds client is different, restart. - restartLoadReport = true - updateLoadStore = true - w.c = clientFromAttr - } - } + + if attr == nil { + return fmt.Errorf("lrs: failed to get xdsClient from attributes: attributes is nil") + } + clientFromAttr, _ := attr.Value(xdsinternal.XDSClientID).(xdsClientInterface) + if clientFromAttr == nil { + return fmt.Errorf("lrs: failed to get xdsClient from attributes: xdsClient not found in attributes") + } + + if w.c != clientFromAttr { + // xds client is different, restart. + restartLoadReport = true + w.c = clientFromAttr } // ClusterName is different, restart. ClusterName is from ClusterName and // EdsServiceName. 
if w.clusterName != newConfig.ClusterName { - updateLoadStore = true + updateLoadClusterAndService = true w.clusterName = newConfig.ClusterName } if w.edsServiceName != newConfig.EdsServiceName { - updateLoadStore = true + updateLoadClusterAndService = true w.edsServiceName = newConfig.EdsServiceName } + if updateLoadClusterAndService { + // This updates the clusterName and serviceName that will reported for the + // loads. The update here is too early, the perfect timing is when the + // picker is updated with the new connection. But from this balancer's point + // of view, it's impossible to tell. + // + // On the other hand, this will almost never happen. Each LRS policy + // shouldn't get updated config. The parent should do a graceful switch when + // the clusterName or serviceName is changed. + w.loadWrapper.updateClusterAndService(w.clusterName, w.edsServiceName) + } + if w.lrsServerName != newConfig.LrsLoadReportingServerName { // LrsLoadReportingServerName is different, load should be report to a // different server, restart. @@ -244,34 +277,23 @@ func (w *xdsClientWrapper) update(newConfig *lbConfig, attr *attributes.Attribut w.lrsServerName = newConfig.LrsLoadReportingServerName } - // This updates the clusterName and serviceName that will reported for the - // loads. The update here is too early, the perfect timing is when the - // picker is updated with the new connection. But from this balancer's point - // of view, it's impossible to tell. - // - // On the other hand, this will almost never happen. Each LRS policy - // shouldn't get updated config. The parent should do a graceful switch when - // the clusterName or serviceName is changed. - if updateLoadStore { - w.load.update(w.c.LoadStore(), w.clusterName, w.edsServiceName) - } - if restartLoadReport { if w.cancelLoadReport != nil { w.cancelLoadReport() w.cancelLoadReport = nil } + var loadStore *load.Store if w.c != nil { - w.cancelLoadReport = w.c.ReportLoad(w.lrsServerName, w.clusterName) + loadStore, w.cancelLoadReport = w.c.ReportLoad(w.lrsServerName) } + w.loadWrapper.updateLoadStore(loadStore) } + + return nil } func (w *xdsClientWrapper) loadStore() load.PerClusterReporter { - if w.load.store == nil { - return nil - } - return w.load + return w.loadWrapper } func (w *xdsClientWrapper) close() { diff --git a/xds/internal/balancer/lrs/balancer_test.go b/xds/internal/balancer/lrs/balancer_test.go index 789cfea0c00..38dd573ef14 100644 --- a/xds/internal/balancer/lrs/balancer_test.go +++ b/xds/internal/balancer/lrs/balancer_test.go @@ -84,8 +84,8 @@ func TestLoadReporting(t *testing.T) { if err != nil { t.Fatalf("xdsClient.ReportLoad failed with error: %v", err) } - if got.Server != testLRSServerName || got.Cluster != testClusterName { - t.Fatalf("xdsClient.ReportLoad called with {%q, %q}: want {%q, %q}", got.Server, got.Cluster, testLRSServerName, testClusterName) + if got.Server != testLRSServerName { + t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got.Server, testLRSServerName) } sc1 := <-cc.NewSubConnCh diff --git a/xds/internal/client/client.go b/xds/internal/client/client.go index 0d61963e1b7..34785692797 100644 --- a/xds/internal/client/client.go +++ b/xds/internal/client/client.go @@ -30,6 +30,7 @@ import ( v2corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" "github.com/golang/protobuf/proto" + "google.golang.org/grpc/xds/internal/client/load" "google.golang.org/grpc" "google.golang.org/grpc/internal/backoff" @@ 
-39,7 +40,6 @@ import ( "google.golang.org/grpc/keepalive" "google.golang.org/grpc/xds/internal" "google.golang.org/grpc/xds/internal/client/bootstrap" - "google.golang.org/grpc/xds/internal/client/load" "google.golang.org/grpc/xds/internal/version" ) @@ -78,9 +78,6 @@ type BuildOptions struct { // Backoff returns the amount of time to backoff before retrying broken // streams. Backoff func(int) time.Duration - // LoadStore contains load reports which need to be pushed to the management - // server. - LoadStore *load.Store // Logger provides enhanced logging capabilities. Logger *grpclog.PrefixLogger } @@ -99,6 +96,12 @@ type APIClientBuilder interface { // APIClient represents the functionality provided by transport protocol // version specific implementations of the xDS client. +// +// TODO: unexport this interface and all the methods after the PR to make +// xdsClient sharable by clients. AddWatch and RemoveWatch are exported for +// v2/v3 to override because they need to keep track of LDS name for RDS to use. +// After the share xdsClient change, that's no longer necessary. After that, we +// will still keep this interface for testing purposes. type APIClient interface { // AddWatch adds a watch for an xDS resource given its type and name. AddWatch(ResourceType, string) @@ -107,21 +110,18 @@ type APIClient interface { // given its type and name. RemoveWatch(ResourceType, string) - // ReportLoad starts an LRS stream to periodically report load using the + // reportLoad starts an LRS stream to periodically report load using the // provided ClientConn, which represent a connection to the management // server. - ReportLoad(ctx context.Context, cc *grpc.ClientConn, opts LoadReportingOptions) + reportLoad(ctx context.Context, cc *grpc.ClientConn, opts loadReportingOptions) // Close cleans up resources allocated by the API client. Close() } -// LoadReportingOptions contains configuration knobs for reporting load data. -type LoadReportingOptions struct { - // ClusterName is the cluster name for which load is being reported. - ClusterName string - // TargetName is the target of the parent ClientConn. - TargetName string +// loadReportingOptions contains configuration knobs for reporting load data. +type loadReportingOptions struct { + loadStore *load.Store } // UpdateHandler receives and processes (by taking appropriate actions) xDS @@ -328,7 +328,6 @@ type Client struct { opts Options cc *grpc.ClientConn // Connection to the xDS server apiClient APIClient - loadStore *load.Store logger *grpclog.PrefixLogger @@ -342,6 +341,11 @@ type Client struct { cdsCache map[string]ClusterUpdate edsWatchers map[string]map[*watchInfo]bool edsCache map[string]EndpointsUpdate + + // Changes to map lrsClients and the lrsClient inside the map need to be + // protected by lrsMu. + lrsMu sync.Mutex + lrsClients map[string]*lrsClient } // New returns a new xdsClient configured with opts. @@ -382,9 +386,8 @@ func New(opts Options) (*Client, error) { } c := &Client{ - done: grpcsync.NewEvent(), - opts: opts, - loadStore: load.NewStore(), + done: grpcsync.NewEvent(), + opts: opts, updateCh: buffer.NewUnbounded(), ldsWatchers: make(map[string]map[*watchInfo]bool), @@ -395,6 +398,7 @@ func New(opts Options) (*Client, error) { cdsCache: make(map[string]ClusterUpdate), edsWatchers: make(map[string]map[*watchInfo]bool), edsCache: make(map[string]EndpointsUpdate), + lrsClients: make(map[string]*lrsClient), } cc, err := grpc.Dial(opts.Config.BalancerName, dopts...) 
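The lrsClients map added to Client above is consumed by the new Client.ReportLoad in client_loadreport.go further down: load reporting is now keyed by LRS server name, ref-counted, and shared across callers. A minimal, self-contained sketch of that sharing pattern, using hypothetical names (manager, entry, store) rather than the real Client/lrsClient/load.Store types:

package main

import (
	"fmt"
	"sync"
)

// store stands in for load.Store in this sketch.
type store struct{ server string }

// entry is the per-server state, loosely analogous to lrsClient in the patch.
type entry struct {
	refCount int
	store    *store
	stop     func() // would cancel the LRS stream's context in the real client
}

// manager plays the role of the lrsClients map guarded by lrsMu on Client.
type manager struct {
	mu      sync.Mutex
	entries map[string]*entry
}

func newManager() *manager {
	return &manager{entries: make(map[string]*entry)}
}

// Report mirrors the shape of the new Client.ReportLoad: callers reporting to
// the same server share one entry (one stream, one store); the returned cancel
// func drops a reference and tears the entry down when the last caller leaves.
func (m *manager) Report(server string) (*store, func()) {
	m.mu.Lock()
	defer m.mu.Unlock()
	e, ok := m.entries[server]
	if !ok {
		e = &entry{
			store: &store{server: server},
			stop:  func() { fmt.Println("stream to", server, "stopped") },
		}
		m.entries[server] = e
	}
	e.refCount++
	return e.store, func() {
		m.mu.Lock()
		defer m.mu.Unlock()
		e.refCount--
		if e.refCount == 0 {
			e.stop()
			delete(m.entries, server)
		}
	}
}

func main() {
	m := newManager()
	s1, cancel1 := m.Report("lrs.example.com")
	s2, cancel2 := m.Report("lrs.example.com")
	fmt.Println("same store shared:", s1 == s2) // true: one entry per server
	cancel1()                                   // other caller still holds a ref
	cancel2()                                   // last ref: stream stopped, entry removed
}

The real lrsClient additionally dials a separate ClientConn when the LRS server differs from the xDS server and closes it on the last unref; that detail is left out of this sketch.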
@@ -410,7 +414,6 @@ func New(opts Options) (*Client, error) { Parent: c, NodeProto: opts.Config.NodeProto, Backoff: backoff.DefaultExponential.Backoff, - LoadStore: c.loadStore, Logger: c.logger, }) if err != nil { diff --git a/xds/internal/client/client_loadreport.go b/xds/internal/client/client_loadreport.go index e52c3b93f90..e91316b9fe4 100644 --- a/xds/internal/client/client_loadreport.go +++ b/xds/internal/client/client_loadreport.go @@ -24,53 +24,116 @@ import ( "google.golang.org/grpc/xds/internal/client/load" ) -// NodeMetadataHostnameKey is the metadata key for specifying the target name in -// the node proto of an LRS request. -const NodeMetadataHostnameKey = "PROXYLESS_CLIENT_HOSTNAME" +// ReportLoad starts an load reporting stream to the given server. If the server +// is not an empty string, and is different from the xds server, a new +// ClientConn will be created. +// +// The same options used for creating the Client will be used (including +// NodeProto, and dial options if necessary). +// +// It returns a Store for the user to report loads, a function to cancel the +// load reporting stream. +func (c *Client) ReportLoad(server string) (*load.Store, func()) { + c.lrsMu.Lock() + defer c.lrsMu.Unlock() + + // If there's already a client to this server, use it. Otherwise, create + // one. + lrsC, ok := c.lrsClients[server] + if !ok { + lrsC = newLRSClient(c, server) + c.lrsClients[server] = lrsC + } + + store := lrsC.ref() + return store, func() { + // This is a callback, need to hold lrsMu. + c.lrsMu.Lock() + defer c.lrsMu.Unlock() + if lrsC.unRef() { + // Delete the lrsClient from map if this is the last reference. + delete(c.lrsClients, server) + } + } +} + +// lrsClient maps to one lrsServer. It contains: +// - a ClientConn to this server (only if it's different from the xds server) +// - a load.Store that contains loads only for this server +type lrsClient struct { + parent *Client + server string -// LoadStore returns the underlying load data store used by the xDS client. -func (c *Client) LoadStore() *load.Store { - return c.loadStore + cc *grpc.ClientConn // nil if the server is same as the xds server + refCount int + cancelStream func() + loadStore *load.Store +} + +// newLRSClient creates a new LRS stream to the server. +func newLRSClient(parent *Client, server string) *lrsClient { + return &lrsClient{ + parent: parent, + server: server, + refCount: 0, + } } -// ReportLoad sends the load of the given clusterName to the given server. If -// the server is not an empty string, and is different from the xds server, a -// new ClientConn will be created. +// ref increments the refCount. If this is the first ref, it starts the LRS stream. // -// The same options used for creating the Client will be used (including -// NodeProto, and dial options if necessary). +// Not thread-safe, caller needs to synchronize. +func (lrsC *lrsClient) ref() *load.Store { + lrsC.refCount++ + if lrsC.refCount == 1 { + lrsC.startStream() + } + return lrsC.loadStore +} + +// unRef decrements the refCount, and closes the stream if refCount reaches 0 +// (and close the cc if cc is not xDS cc). It returns whether refCount reached 0 +// after this call. // -// It returns a function to cancel the load reporting stream. If server is -// different from xds server, the ClientConn will also be closed. 
-func (c *Client) ReportLoad(server string, clusterName string) func() { - var ( - cc *grpc.ClientConn - closeCC bool - ) - c.logger.Infof("Starting load report to server: %s", server) - if server == "" || server == c.opts.Config.BalancerName { - cc = c.cc +// Not thread-safe, caller needs to synchronize. +func (lrsC *lrsClient) unRef() (closed bool) { + lrsC.refCount-- + if lrsC.refCount != 0 { + return false + } + lrsC.parent.logger.Infof("Stopping load report to server: %s", lrsC.server) + lrsC.cancelStream() + if lrsC.cc != nil { + lrsC.cc.Close() + } + return true +} + +// startStream starts the LRS stream to the server. If server is not the same +// xDS server from the parent, it also creates a ClientConn. +func (lrsC *lrsClient) startStream() { + var cc *grpc.ClientConn + + lrsC.parent.logger.Infof("Starting load report to server: %s", lrsC.server) + if lrsC.server == "" || lrsC.server == lrsC.parent.opts.Config.BalancerName { + // Reuse the xDS client if server is the same. + cc = lrsC.parent.cc } else { - c.logger.Infof("LRS server is different from xDS server, starting a new ClientConn") - dopts := append([]grpc.DialOption{c.opts.Config.Creds}, c.opts.DialOpts...) - ccNew, err := grpc.Dial(server, dopts...) + lrsC.parent.logger.Infof("LRS server is different from xDS server, starting a new ClientConn") + dopts := append([]grpc.DialOption{lrsC.parent.opts.Config.Creds}, lrsC.parent.opts.DialOpts...) + ccNew, err := grpc.Dial(lrsC.server, dopts...) if err != nil { // An error from a non-blocking dial indicates something serious. - c.logger.Infof("xds: failed to dial load report server {%s}: %v", server, err) - return func() {} + lrsC.parent.logger.Infof("xds: failed to dial load report server {%s}: %v", lrsC.server, err) + return } cc = ccNew - closeCC = true - } - ctx, cancel := context.WithCancel(context.Background()) - go c.apiClient.ReportLoad(ctx, c.cc, LoadReportingOptions{ - ClusterName: clusterName, - TargetName: c.opts.TargetName, - }) - return func() { - cancel() - if closeCC { - cc.Close() - } + lrsC.cc = ccNew } + + var ctx context.Context + ctx, lrsC.cancelStream = context.WithCancel(context.Background()) + + // Create the store and stream. + lrsC.loadStore = load.NewStore() + go lrsC.parent.apiClient.reportLoad(ctx, cc, loadReportingOptions{loadStore: lrsC.loadStore}) } diff --git a/xds/internal/client/client_loadreport_test.go b/xds/internal/client/client_loadreport_test.go new file mode 100644 index 00000000000..d426247c10d --- /dev/null +++ b/xds/internal/client/client_loadreport_test.go @@ -0,0 +1,148 @@ +/* + * + * Copyright 2020 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package client_test + +import ( + "context" + "testing" + "time" + + v2corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" + endpointpb "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint" + lrspb "github.com/envoyproxy/go-control-plane/envoy/service/load_stats/v2" + durationpb "github.com/golang/protobuf/ptypes/duration" + "github.com/google/go-cmp/cmp" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/internal/grpctest" + "google.golang.org/grpc/status" + "google.golang.org/grpc/xds/internal/client" + "google.golang.org/grpc/xds/internal/client/bootstrap" + "google.golang.org/grpc/xds/internal/testutils/fakeserver" + "google.golang.org/grpc/xds/internal/version" + "google.golang.org/protobuf/testing/protocmp" + + _ "google.golang.org/grpc/xds/internal/client/v2" // Register the v2 xDS API client. +) + +const ( + defaultTestTimeout = 5 * time.Second + defaultTestShortTimeout = 10 * time.Millisecond // For events expected to *not* happen. +) + +type s struct { + grpctest.Tester +} + +func Test(t *testing.T) { + grpctest.RunSubTests(t, s{}) +} + +func (s) TestLRSClient(t *testing.T) { + fs, sCleanup, err := fakeserver.StartServer() + if err != nil { + t.Fatalf("failed to start fake xDS server: %v", err) + } + defer sCleanup() + + xdsC, err := client.New(client.Options{ + Config: bootstrap.Config{ + BalancerName: fs.Address, + Creds: grpc.WithInsecure(), + NodeProto: &v2corepb.Node{}, + TransportAPI: version.TransportV2, + }, + }) + if err != nil { + t.Fatalf("failed to create xds client: %v", err) + } + defer xdsC.Close() + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if u, err := fs.NewConnChan.Receive(ctx); err != nil { + t.Errorf("unexpected timeout: %v, %v, want NewConn", u, err) + } + + // Report to the same address should not create new ClientConn. + store1, lrsCancel1 := xdsC.ReportLoad(fs.Address) + defer lrsCancel1() + sCtx, sCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout) + defer sCancel() + if u, err := fs.NewConnChan.Receive(sCtx); err != context.DeadlineExceeded { + t.Errorf("unexpected NewConn: %v, %v, want channel recv timeout", u, err) + } + + fs2, sCleanup2, err := fakeserver.StartServer() + if err != nil { + t.Fatalf("failed to start fake xDS server: %v", err) + } + defer sCleanup2() + + // Report to a different address should create new ClientConn. + store2, lrsCancel2 := xdsC.ReportLoad(fs2.Address) + defer lrsCancel2() + if u, err := fs2.NewConnChan.Receive(ctx); err != nil { + t.Errorf("unexpected timeout: %v, %v, want NewConn", u, err) + } + + if store1 == store2 { + t.Fatalf("got same store for different servers, want different") + } + + if u, err := fs2.LRSRequestChan.Receive(ctx); err != nil { + t.Errorf("unexpected timeout: %v, %v, want NewConn", u, err) + } + store2.PerCluster("cluster", "eds").CallDropped("test") + + // Send one resp to the client. + fs2.LRSResponseChan <- &fakeserver.Response{ + Resp: &lrspb.LoadStatsResponse{ + SendAllClusters: true, + LoadReportingInterval: &durationpb.Duration{Nanos: 50000000}, + }, + } + + // Server should receive a req with the loads. 
+ u, err := fs2.LRSRequestChan.Receive(ctx) + if err != nil { + t.Fatalf("unexpected LRS request: %v, %v, want error canceled", u, err) + } + receivedLoad := u.(*fakeserver.Request).Req.(*lrspb.LoadStatsRequest).ClusterStats + if len(receivedLoad) <= 0 { + t.Fatalf("unexpected load received, want load for cluster, eds, dropped for test") + } + receivedLoad[0].LoadReportInterval = nil + want := (&endpointpb.ClusterStats{ + ClusterName: "cluster", + ClusterServiceName: "eds", + TotalDroppedRequests: 1, + DroppedRequests: []*endpointpb.ClusterStats_DroppedRequests{{Category: "test", DroppedCount: 1}}, + }) + if d := cmp.Diff(want, receivedLoad[0], protocmp.Transform()); d != "" { + t.Fatalf("unexpected load received, want load for cluster, eds, dropped for test, diff (-want +got):\n%s", d) + } + + // Cancel this load reporting stream, server should see error canceled. + lrsCancel2() + + // Server should receive a stream canceled error. + if u, err := fs2.LRSRequestChan.Receive(ctx); err != nil || status.Code(u.(*fakeserver.Request).Err) != codes.Canceled { + t.Errorf("unexpected LRS request: %v, %v, want error canceled", u, err) + } +} diff --git a/xds/internal/client/client_test.go b/xds/internal/client/client_test.go index 4a531384fff..a2a92a24f7f 100644 --- a/xds/internal/client/client_test.go +++ b/xds/internal/client/client_test.go @@ -115,7 +115,7 @@ func (c *testAPIClient) RemoveWatch(resourceType ResourceType, resourceName stri c.removeWatches[resourceType].Send(resourceName) } -func (c *testAPIClient) ReportLoad(ctx context.Context, cc *grpc.ClientConn, opts LoadReportingOptions) { +func (c *testAPIClient) reportLoad(context.Context, *grpc.ClientConn, loadReportingOptions) { } func (c *testAPIClient) Close() {} diff --git a/xds/internal/client/transport_helper.go b/xds/internal/client/transport_helper.go index 607f26fd5e1..b286a61d638 100644 --- a/xds/internal/client/transport_helper.go +++ b/xds/internal/client/transport_helper.go @@ -24,6 +24,7 @@ import ( "time" "github.com/golang/protobuf/proto" + "google.golang.org/grpc/xds/internal/client/load" "google.golang.org/grpc" "google.golang.org/grpc/internal/buffer" @@ -71,19 +72,21 @@ type VersionedClient interface { NewLoadStatsStream(ctx context.Context, cc *grpc.ClientConn) (grpc.ClientStream, error) // SendFirstLoadStatsRequest constructs and sends the first request on the - // LRS stream. This contains the node proto with appropriate metadata - // fields. - SendFirstLoadStatsRequest(s grpc.ClientStream, targetName string) error + // LRS stream. + SendFirstLoadStatsRequest(s grpc.ClientStream) error // HandleLoadStatsResponse receives the first response from the server which // contains the load reporting interval and the clusters for which the // server asks the client to report load for. - HandleLoadStatsResponse(s grpc.ClientStream, clusterName string) (time.Duration, error) + // + // If the response sets SendAllClusters to true, the returned clusters is + // nil. + HandleLoadStatsResponse(s grpc.ClientStream) (clusters []string, _ time.Duration, _ error) // SendLoadStatsRequest will be invoked at regular intervals to send load // report with load data reported since the last time this method was // invoked. 
- SendLoadStatsRequest(s grpc.ClientStream, clusterName string) error + SendLoadStatsRequest(s grpc.ClientStream, loads []*load.Data) error } // TransportHelper contains all xDS transport protocol related functionality @@ -443,9 +446,9 @@ func (t *TransportHelper) processAckInfo(ack *ackAction, stream grpc.ClientStrea return target, rType, version, nonce, send } -// ReportLoad starts an LRS stream to report load data to the management server. +// reportLoad starts an LRS stream to report load data to the management server. // It blocks until the context is cancelled. -func (t *TransportHelper) ReportLoad(ctx context.Context, cc *grpc.ClientConn, opts LoadReportingOptions) { +func (t *TransportHelper) reportLoad(ctx context.Context, cc *grpc.ClientConn, opts loadReportingOptions) { retries := 0 for { if ctx.Err() != nil { @@ -472,23 +475,23 @@ func (t *TransportHelper) ReportLoad(ctx context.Context, cc *grpc.ClientConn, o } logger.Infof("lrs: created LRS stream") - if err := t.vClient.SendFirstLoadStatsRequest(stream, opts.TargetName); err != nil { + if err := t.vClient.SendFirstLoadStatsRequest(stream); err != nil { logger.Warningf("lrs: failed to send first request: %v", err) continue } - interval, err := t.vClient.HandleLoadStatsResponse(stream, opts.ClusterName) + clusters, interval, err := t.vClient.HandleLoadStatsResponse(stream) if err != nil { logger.Warning(err) continue } retries = 0 - t.sendLoads(ctx, stream, opts.ClusterName, interval) + t.sendLoads(ctx, stream, opts.loadStore, clusters, interval) } } -func (t *TransportHelper) sendLoads(ctx context.Context, stream grpc.ClientStream, clusterName string, interval time.Duration) { +func (t *TransportHelper) sendLoads(ctx context.Context, stream grpc.ClientStream, store *load.Store, clusterNames []string, interval time.Duration) { tick := time.NewTicker(interval) defer tick.Stop() for { @@ -497,7 +500,7 @@ func (t *TransportHelper) sendLoads(ctx context.Context, stream grpc.ClientStrea case <-ctx.Done(): return } - if err := t.vClient.SendLoadStatsRequest(stream, clusterName); err != nil { + if err := t.vClient.SendLoadStatsRequest(stream, store.Stats(clusterNames)); err != nil { logger.Warning(err) return } diff --git a/xds/internal/client/v2/client.go b/xds/internal/client/v2/client.go index 7b063ad4f55..433e6907d9f 100644 --- a/xds/internal/client/v2/client.go +++ b/xds/internal/client/v2/client.go @@ -28,7 +28,6 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/internal/grpclog" xdsclient "google.golang.org/grpc/xds/internal/client" - "google.golang.org/grpc/xds/internal/client/load" "google.golang.org/grpc/xds/internal/version" v2xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" @@ -69,7 +68,6 @@ func newClient(cc *grpc.ClientConn, opts xdsclient.BuildOptions) (xdsclient.APIC cc: cc, parent: opts.Parent, nodeProto: nodeProto, - loadStore: opts.LoadStore, logger: opts.Logger, } v2c.ctx, v2c.cancelCtx = context.WithCancel(context.Background()) @@ -88,7 +86,6 @@ type client struct { ctx context.Context cancelCtx context.CancelFunc parent xdsclient.UpdateHandler - loadStore *load.Store logger *grpclog.PrefixLogger // ClientConn to the xDS gRPC server. Owned by the parent xdsClient. 
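With the cluster name gone from the LRS options, TransportHelper.sendLoads now takes the *load.Store carried by loadReportingOptions, snapshots it on every tick, and hands the resulting []*load.Data to the versioned SendLoadStatsRequest. A rough, self-contained sketch of that loop shape, with placeholder data/reporter/stats types standing in for the real load package and versioned client:

package main

import (
	"context"
	"fmt"
	"time"
)

// data stands in for load.Data: one snapshot of a cluster's load.
type data struct {
	cluster string
	drops   uint64
}

// reporter stands in for the versioned client; SendLoadStatsRequest now takes
// the already-snapshotted load data instead of a cluster name.
type reporter interface {
	SendLoadStatsRequest(loads []*data) error
}

type printReporter struct{}

func (printReporter) SendLoadStatsRequest(loads []*data) error {
	for _, l := range loads {
		fmt.Printf("reporting cluster=%s drops=%d\n", l.cluster, l.drops)
	}
	return nil
}

// stats stands in for (*load.Store).Stats: it returns the data accumulated
// since the previous snapshot (filtering by clusterNames is elided here).
func stats(clusterNames []string) []*data {
	return []*data{{cluster: "cluster-a", drops: 1}}
}

// sendLoads mirrors the shape of TransportHelper.sendLoads: tick, snapshot, send.
func sendLoads(ctx context.Context, r reporter, clusterNames []string, interval time.Duration) {
	tick := time.NewTicker(interval)
	defer tick.Stop()
	for {
		select {
		case <-tick.C:
		case <-ctx.Done():
			return
		}
		if err := r.SendLoadStatsRequest(stats(clusterNames)); err != nil {
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Millisecond)
	defer cancel()
	// nil cluster names corresponds to the SendAllClusters case handled below.
	sendLoads(ctx, printReporter{}, nil, 50*time.Millisecond)
}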
diff --git a/xds/internal/client/v2/loadreport.go b/xds/internal/client/v2/loadreport.go index a06dcb8e9f0..69405fcd9ad 100644 --- a/xds/internal/client/v2/loadreport.go +++ b/xds/internal/client/v2/loadreport.go @@ -26,17 +26,18 @@ import ( "github.com/golang/protobuf/proto" "github.com/golang/protobuf/ptypes" + "google.golang.org/grpc/xds/internal/client/load" v2corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" v2endpointpb "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint" lrsgrpc "github.com/envoyproxy/go-control-plane/envoy/service/load_stats/v2" lrspb "github.com/envoyproxy/go-control-plane/envoy/service/load_stats/v2" - structpb "github.com/golang/protobuf/ptypes/struct" "google.golang.org/grpc" "google.golang.org/grpc/xds/internal" - xdsclient "google.golang.org/grpc/xds/internal/client" ) +const clientFeatureLRSSendAllClusters = "envoy.lrs.supports_send_all_clusters" + type lrsStream lrsgrpc.LoadReportingService_StreamLoadStatsClient func (v2c *client) NewLoadStatsStream(ctx context.Context, cc *grpc.ClientConn) (grpc.ClientStream, error) { @@ -44,7 +45,7 @@ func (v2c *client) NewLoadStatsStream(ctx context.Context, cc *grpc.ClientConn) return c.StreamLoadStats(ctx) } -func (v2c *client) SendFirstLoadStatsRequest(s grpc.ClientStream, targetName string) error { +func (v2c *client) SendFirstLoadStatsRequest(s grpc.ClientStream) error { stream, ok := s.(lrsStream) if !ok { return fmt.Errorf("lrs: Attempt to send request on unsupported stream type: %T", s) @@ -53,72 +54,52 @@ func (v2c *client) SendFirstLoadStatsRequest(s grpc.ClientStream, targetName str if node == nil { node = &v2corepb.Node{} } - if node.Metadata == nil { - node.Metadata = &structpb.Struct{} - } - if node.Metadata.Fields == nil { - node.Metadata.Fields = make(map[string]*structpb.Value) - } - node.Metadata.Fields[xdsclient.NodeMetadataHostnameKey] = &structpb.Value{ - Kind: &structpb.Value_StringValue{StringValue: targetName}, - } + node.ClientFeatures = append(node.ClientFeatures, clientFeatureLRSSendAllClusters) req := &lrspb.LoadStatsRequest{Node: node} v2c.logger.Infof("lrs: sending init LoadStatsRequest: %v", req) return stream.Send(req) } -func (v2c *client) HandleLoadStatsResponse(s grpc.ClientStream, clusterName string) (time.Duration, error) { +func (v2c *client) HandleLoadStatsResponse(s grpc.ClientStream) ([]string, time.Duration, error) { stream, ok := s.(lrsStream) if !ok { - return 0, fmt.Errorf("lrs: Attempt to receive response on unsupported stream type: %T", s) + return nil, 0, fmt.Errorf("lrs: Attempt to receive response on unsupported stream type: %T", s) } resp, err := stream.Recv() if err != nil { - return 0, fmt.Errorf("lrs: failed to receive first response: %v", err) + return nil, 0, fmt.Errorf("lrs: failed to receive first response: %v", err) } v2c.logger.Infof("lrs: received first LoadStatsResponse: %+v", resp) interval, err := ptypes.Duration(resp.GetLoadReportingInterval()) if err != nil { - return 0, fmt.Errorf("lrs: failed to convert report interval: %v", err) + return nil, 0, fmt.Errorf("lrs: failed to convert report interval: %v", err) } - // The LRS client should join the clusters it knows with the cluster - // list from response, and send loads for them. - // - // But the LRS client now only supports one cluster. TODO: extend it to - // support multiple clusters. 
- var clusterFoundInResponse bool - for _, c := range resp.Clusters { - if c == clusterName { - clusterFoundInResponse = true - } - } - if !clusterFoundInResponse { - return 0, fmt.Errorf("lrs: received clusters %v does not contain expected {%v}", resp.Clusters, clusterName) - } if resp.ReportEndpointGranularity { // TODO: fixme to support per endpoint loads. - return 0, errors.New("lrs: endpoint loads requested, but not supported by current implementation") + return nil, 0, errors.New("lrs: endpoint loads requested, but not supported by current implementation") } - return interval, nil + clusters := resp.Clusters + if resp.SendAllClusters { + // Return nil to send stats for all clusters. + clusters = nil + } + + return clusters, interval, nil } -func (v2c *client) SendLoadStatsRequest(s grpc.ClientStream, clusterName string) error { +func (v2c *client) SendLoadStatsRequest(s grpc.ClientStream, loads []*load.Data) error { stream, ok := s.(lrsStream) if !ok { return fmt.Errorf("lrs: Attempt to send request on unsupported stream type: %T", s) } - if v2c.loadStore == nil { - return errors.New("lrs: LoadStore is not initialized") - } var clusterStats []*v2endpointpb.ClusterStats - sds := v2c.loadStore.Stats([]string{clusterName}) - for _, sd := range sds { + for _, sd := range loads { var ( droppedReqs []*v2endpointpb.ClusterStats_DroppedRequests localityStats []*v2endpointpb.UpstreamLocalityStats diff --git a/xds/internal/client/v3/client.go b/xds/internal/client/v3/client.go index 5d8d7198ce2..00e74e5974e 100644 --- a/xds/internal/client/v3/client.go +++ b/xds/internal/client/v3/client.go @@ -29,7 +29,6 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/internal/grpclog" xdsclient "google.golang.org/grpc/xds/internal/client" - "google.golang.org/grpc/xds/internal/client/load" "google.golang.org/grpc/xds/internal/version" v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" @@ -69,7 +68,6 @@ func newClient(cc *grpc.ClientConn, opts xdsclient.BuildOptions) (xdsclient.APIC cc: cc, parent: opts.Parent, nodeProto: nodeProto, - loadStore: opts.LoadStore, logger: opts.Logger, } v3c.ctx, v3c.cancelCtx = context.WithCancel(context.Background()) @@ -88,7 +86,6 @@ type client struct { ctx context.Context cancelCtx context.CancelFunc parent xdsclient.UpdateHandler - loadStore *load.Store logger *grpclog.PrefixLogger // ClientConn to the xDS gRPC server. Owned by the parent xdsClient. 
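Instead of matching a single expected cluster name, both loadreport.go implementations now advertise the envoy.lrs.supports_send_all_clusters client feature and translate a response with SendAllClusters set into a nil cluster list, which the transport helper forwards to Store.Stats. A compilable sketch of just that response-handling convention, reusing the v2 protos that appear in the diff (stream plumbing and the endpoint-granularity check omitted):

package main

import (
	"fmt"
	"time"

	lrspb "github.com/envoyproxy/go-control-plane/envoy/service/load_stats/v2"
	"github.com/golang/protobuf/ptypes"
	durationpb "github.com/golang/protobuf/ptypes/duration"
)

// handleFirstResponse mirrors the new HandleLoadStatsResponse contract:
// a nil cluster slice means "send stats for all clusters".
func handleFirstResponse(resp *lrspb.LoadStatsResponse) ([]string, time.Duration, error) {
	interval, err := ptypes.Duration(resp.GetLoadReportingInterval())
	if err != nil {
		return nil, 0, fmt.Errorf("lrs: failed to convert report interval: %v", err)
	}
	if resp.GetSendAllClusters() {
		return nil, interval, nil // nil clusters: report everything in the store.
	}
	return resp.GetClusters(), interval, nil
}

func main() {
	resp := &lrspb.LoadStatsResponse{
		SendAllClusters:       true,
		LoadReportingInterval: &durationpb.Duration{Seconds: 10},
	}
	clusters, interval, err := handleFirstResponse(resp)
	fmt.Println(clusters, interval, err) // [] 10s <nil>
}

On the request side, the patch appends the feature string to node.ClientFeatures in place of the old PROXYLESS_CLIENT_HOSTNAME metadata entry.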
diff --git a/xds/internal/client/v3/loadreport.go b/xds/internal/client/v3/loadreport.go index beca34c49bf..74e18632aa0 100644 --- a/xds/internal/client/v3/loadreport.go +++ b/xds/internal/client/v3/loadreport.go @@ -26,17 +26,18 @@ import ( "github.com/golang/protobuf/proto" "github.com/golang/protobuf/ptypes" + "google.golang.org/grpc/xds/internal/client/load" v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" lrsgrpc "github.com/envoyproxy/go-control-plane/envoy/service/load_stats/v3" lrspb "github.com/envoyproxy/go-control-plane/envoy/service/load_stats/v3" - structpb "github.com/golang/protobuf/ptypes/struct" "google.golang.org/grpc" "google.golang.org/grpc/xds/internal" - xdsclient "google.golang.org/grpc/xds/internal/client" ) +const clientFeatureLRSSendAllClusters = "envoy.lrs.supports_send_all_clusters" + type lrsStream lrsgrpc.LoadReportingService_StreamLoadStatsClient func (v3c *client) NewLoadStatsStream(ctx context.Context, cc *grpc.ClientConn) (grpc.ClientStream, error) { @@ -44,7 +45,7 @@ func (v3c *client) NewLoadStatsStream(ctx context.Context, cc *grpc.ClientConn) return c.StreamLoadStats(ctx) } -func (v3c *client) SendFirstLoadStatsRequest(s grpc.ClientStream, targetName string) error { +func (v3c *client) SendFirstLoadStatsRequest(s grpc.ClientStream) error { stream, ok := s.(lrsStream) if !ok { return fmt.Errorf("lrs: Attempt to send request on unsupported stream type: %T", s) @@ -53,72 +54,52 @@ func (v3c *client) SendFirstLoadStatsRequest(s grpc.ClientStream, targetName str if node == nil { node = &v3corepb.Node{} } - if node.Metadata == nil { - node.Metadata = &structpb.Struct{} - } - if node.Metadata.Fields == nil { - node.Metadata.Fields = make(map[string]*structpb.Value) - } - node.Metadata.Fields[xdsclient.NodeMetadataHostnameKey] = &structpb.Value{ - Kind: &structpb.Value_StringValue{StringValue: targetName}, - } + node.ClientFeatures = append(node.ClientFeatures, clientFeatureLRSSendAllClusters) req := &lrspb.LoadStatsRequest{Node: node} v3c.logger.Infof("lrs: sending init LoadStatsRequest: %v", req) return stream.Send(req) } -func (v3c *client) HandleLoadStatsResponse(s grpc.ClientStream, clusterName string) (time.Duration, error) { +func (v3c *client) HandleLoadStatsResponse(s grpc.ClientStream) ([]string, time.Duration, error) { stream, ok := s.(lrsStream) if !ok { - return 0, fmt.Errorf("lrs: Attempt to receive response on unsupported stream type: %T", s) + return nil, 0, fmt.Errorf("lrs: Attempt to receive response on unsupported stream type: %T", s) } resp, err := stream.Recv() if err != nil { - return 0, fmt.Errorf("lrs: failed to receive first response: %v", err) + return nil, 0, fmt.Errorf("lrs: failed to receive first response: %v", err) } v3c.logger.Infof("lrs: received first LoadStatsResponse: %+v", resp) interval, err := ptypes.Duration(resp.GetLoadReportingInterval()) if err != nil { - return 0, fmt.Errorf("lrs: failed to convert report interval: %v", err) + return nil, 0, fmt.Errorf("lrs: failed to convert report interval: %v", err) } - // The LRS client should join the clusters it knows with the cluster - // list from response, and send loads for them. - // - // But the LRS client now only supports one cluster. TODO: extend it to - // support multiple clusters. 
- var clusterFoundInResponse bool - for _, c := range resp.Clusters { - if c == clusterName { - clusterFoundInResponse = true - } - } - if !clusterFoundInResponse { - return 0, fmt.Errorf("lrs: received clusters %v does not contain expected {%v}", resp.Clusters, clusterName) - } if resp.ReportEndpointGranularity { // TODO: fixme to support per endpoint loads. - return 0, errors.New("lrs: endpoint loads requested, but not supported by current implementation") + return nil, 0, errors.New("lrs: endpoint loads requested, but not supported by current implementation") } - return interval, nil + clusters := resp.Clusters + if resp.SendAllClusters { + // Return nil to send stats for all clusters. + clusters = nil + } + + return clusters, interval, nil } -func (v3c *client) SendLoadStatsRequest(s grpc.ClientStream, clusterName string) error { +func (v3c *client) SendLoadStatsRequest(s grpc.ClientStream, loads []*load.Data) error { stream, ok := s.(lrsStream) if !ok { return fmt.Errorf("lrs: Attempt to send request on unsupported stream type: %T", s) } - if v3c.loadStore == nil { - return errors.New("lrs: LoadStore is not initialized") - } var clusterStats []*v3endpointpb.ClusterStats - sds := v3c.loadStore.Stats([]string{clusterName}) - for _, sd := range sds { + for _, sd := range loads { var ( droppedReqs []*v3endpointpb.ClusterStats_DroppedRequests localityStats []*v3endpointpb.UpstreamLocalityStats diff --git a/xds/internal/testutils/fakeclient/client.go b/xds/internal/testutils/fakeclient/client.go index cd2710e612a..408817c1784 100644 --- a/xds/internal/testutils/fakeclient/client.go +++ b/xds/internal/testutils/fakeclient/client.go @@ -156,14 +156,12 @@ func (xdsC *Client) WaitForCancelEDSWatch(ctx context.Context) error { type ReportLoadArgs struct { // Server is the name of the server to which the load is reported. Server string - // Cluster is the name of the cluster for which load is reported. - Cluster string } // ReportLoad starts reporting load about clusterName to server. -func (xdsC *Client) ReportLoad(server string, clusterName string) (cancel func()) { - xdsC.loadReportCh.Send(ReportLoadArgs{Server: server, Cluster: clusterName}) - return func() {} +func (xdsC *Client) ReportLoad(server string) (loadStore *load.Store, cancel func()) { + xdsC.loadReportCh.Send(ReportLoadArgs{Server: server}) + return xdsC.loadStore, func() {} } // LoadStore returns the underlying load data store. diff --git a/xds/internal/testutils/fakeserver/server.go b/xds/internal/testutils/fakeserver/server.go index 4cff72087ce..994f5308f3a 100644 --- a/xds/internal/testutils/fakeserver/server.go +++ b/xds/internal/testutils/fakeserver/server.go @@ -203,10 +203,10 @@ type lrsServer struct { func (lrsS *lrsServer) StreamLoadStats(s lrsgrpc.LoadReportingService_StreamLoadStatsServer) error { req, err := s.Recv() + lrsS.reqChan.Send(&Request{req, err}) if err != nil { return err } - lrsS.reqChan.Send(&Request{req, err}) select { case r := <-lrsS.respChan: @@ -222,12 +222,12 @@ func (lrsS *lrsServer) StreamLoadStats(s lrsgrpc.LoadReportingService_StreamLoad for { req, err := s.Recv() + lrsS.reqChan.Send(&Request{req, err}) if err != nil { if err == io.EOF { return nil } return err } - lrsS.reqChan.Send(&Request{req, err}) } }
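Both the eds and lrs balancers now split the old loadStoreWrapper.update into updateServiceName/updateClusterAndService and updateLoadStore, caching the PerClusterReporter so that the code recording calls holds one stable object while the underlying store is swapped whenever the LRS server (and therefore the stream) changes. A stripped-down, self-contained sketch of that wrapper pattern, with hypothetical store/perClusterReporter types in place of the real load package:

package main

import (
	"fmt"
	"sync"
)

// perClusterReporter stands in for load.PerClusterReporter.
type perClusterReporter struct{ cluster, service string }

func (p *perClusterReporter) CallDropped(category string) {
	if p == nil { // nil reporter: load reporting disabled, silently ignore.
		return
	}
	fmt.Printf("drop recorded: cluster=%s service=%s category=%s\n", p.cluster, p.service, category)
}

// store stands in for load.Store.
type store struct{}

func (s *store) PerCluster(cluster, service string) *perClusterReporter {
	if s == nil { // mirrors the patch's note that nil stores are handled.
		return nil
	}
	return &perClusterReporter{cluster: cluster, service: service}
}

// loadStoreWrapper mirrors the shape of the wrappers in the eds and lrs
// balancers: updates take the write lock, report methods take the read lock.
type loadStoreWrapper struct {
	mu         sync.RWMutex
	cluster    string
	store      *store
	perCluster *perClusterReporter
}

func (w *loadStoreWrapper) updateClusterName(cluster string) {
	w.mu.Lock()
	defer w.mu.Unlock()
	if w.cluster == cluster {
		return
	}
	w.cluster = cluster
	w.perCluster = w.store.PerCluster(w.cluster, "")
}

func (w *loadStoreWrapper) updateLoadStore(s *store) {
	w.mu.Lock()
	defer w.mu.Unlock()
	if w.store == s {
		return
	}
	w.store = s
	w.perCluster = w.store.PerCluster(w.cluster, "")
}

func (w *loadStoreWrapper) CallDropped(category string) {
	w.mu.RLock()
	defer w.mu.RUnlock()
	w.perCluster.CallDropped(category)
}

func main() {
	w := &loadStoreWrapper{}
	w.updateClusterName("cluster-a")
	w.CallDropped("test") // no-op: store not set yet, cached reporter is nil
	w.updateLoadStore(&store{})
	w.CallDropped("test") // now recorded against cluster-a
}

The read/write lock split keeps the per-call path (CallStarted, CallDropped and friends in the real wrapper) down to an RLock plus one cached method call.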