diff --git a/go.sum b/go.sum
index 293f5490353..518f2b5e8a5 100644
--- a/go.sum
+++ b/go.sum
@@ -71,7 +71,6 @@ google.golang.org/appengine v1.4.0 h1:/wp5JvzpHIxhs/dumFmF7BXTf3Z+dd4uXta4kVyO50
 google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
 google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
 google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
-google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 h1:+kGHl1aib/qcwaRi1CbqBZ1rk19r85MNUf8HaBghugY=
 google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo=
 google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
 google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
@@ -85,7 +84,6 @@ google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzi
 google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
 google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
 google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
-google.golang.org/protobuf v1.25.0 h1:Ejskq+SyPohKW+1uil0JJMtmHCgJPJ/qWTxr8qp+R4c=
 google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
diff --git a/xds/internal/balancer/edsbalancer/eds_impl_test.go b/xds/internal/balancer/edsbalancer/eds_impl_test.go
index 3f46241a086..292ea4f80a7 100644
--- a/xds/internal/balancer/edsbalancer/eds_impl_test.go
+++ b/xds/internal/balancer/edsbalancer/eds_impl_test.go
@@ -688,7 +688,8 @@ func (s) TestEDS_LoadReport(t *testing.T) {
 	// be used.
 	loadStore := load.NewStore()
 	lsWrapper := &loadStoreWrapper{}
-	lsWrapper.update(loadStore, testClusterNames[0])
+	lsWrapper.updateServiceName(testClusterNames[0])
+	lsWrapper.updateLoadStore(loadStore)
 	cw := &xdsClientWrapper{
 		loadWrapper: lsWrapper,
 	}
diff --git a/xds/internal/balancer/edsbalancer/xds_client_wrapper.go b/xds/internal/balancer/edsbalancer/xds_client_wrapper.go
index dfb580c5f16..8d3830f6bc0 100644
--- a/xds/internal/balancer/edsbalancer/xds_client_wrapper.go
+++ b/xds/internal/balancer/edsbalancer/xds_client_wrapper.go
@@ -48,44 +48,75 @@ var (
 )
 
 type loadStoreWrapper struct {
-	mu         sync.RWMutex
+	mu      sync.RWMutex
+	service string
+	// Both store and perCluster will be nil if load reporting is disabled (EDS
+	// response doesn't have LRS server name).
 	store      *load.Store
-	service    string
 	perCluster load.PerClusterReporter
 }
 
-func (lsw *loadStoreWrapper) update(store *load.Store, service string) {
+func (lsw *loadStoreWrapper) updateServiceName(service string) {
 	lsw.mu.Lock()
 	defer lsw.mu.Unlock()
-	if store == lsw.store && service == lsw.service {
+	if lsw.service == service {
 		return
 	}
-	lsw.store = store
 	lsw.service = service
+
+	if lsw.store == nil {
+		return
+	}
 	lsw.perCluster = lsw.store.PerCluster(lsw.service, "")
 }
 
+func (lsw *loadStoreWrapper) updateLoadStore(store *load.Store) {
+	lsw.mu.Lock()
+	defer lsw.mu.Unlock()
+	if store == lsw.store {
+		return
+	}
+	lsw.store = store
+	lsw.perCluster = nil
+	if lsw.store != nil {
+		lsw.perCluster = lsw.store.PerCluster(lsw.service, "")
+	}
+
+}
+
 func (lsw *loadStoreWrapper) CallStarted(locality string) {
 	lsw.mu.RLock()
 	defer lsw.mu.RUnlock()
+	if lsw.perCluster == nil {
+		return
+	}
 	lsw.perCluster.CallStarted(locality)
 }
 
 func (lsw *loadStoreWrapper) CallFinished(locality string, err error) {
 	lsw.mu.RLock()
 	defer lsw.mu.RUnlock()
+	if lsw.perCluster == nil {
+		return
+	}
 	lsw.perCluster.CallFinished(locality, err)
 }
 
 func (lsw *loadStoreWrapper) CallServerLoad(locality, name string, val float64) {
 	lsw.mu.RLock()
 	defer lsw.mu.RUnlock()
+	if lsw.perCluster == nil {
+		return
+	}
 	lsw.perCluster.CallServerLoad(locality, name, val)
 }
 
 func (lsw *loadStoreWrapper) CallDropped(category string) {
 	lsw.mu.RLock()
 	defer lsw.mu.RUnlock()
+	if lsw.perCluster == nil {
+		return
+	}
 	lsw.perCluster.CallDropped(category)
 }
 
@@ -105,9 +136,6 @@ type xdsClientWrapper struct {
 	// loadWrapper is a wrapper with loadOriginal, with clusterName and
 	// edsServiceName. It's used by children to report loads.
 	loadWrapper *loadStoreWrapper
-	// loadOriginal is the load.Store for reporting loads to lrsServerName. It's
-	// returned by the client.
-	loadOriginal *load.Store
 	// edsServiceName is the edsServiceName currently being watched, not
 	// necessarily the edsServiceName from service config.
 	//
@@ -243,18 +271,20 @@ func (c *xdsClientWrapper) startEndpointsWatch() {
 // Caller can call this when the loadReportServer name changes, but
 // edsServiceName doesn't (so we only need to restart load reporting, not EDS
 // watch).
-func (c *xdsClientWrapper) startLoadReport(loadReportServer *string) {
+func (c *xdsClientWrapper) startLoadReport(loadReportServer *string) *load.Store {
 	if c.cancelLoadReport != nil {
 		c.cancelLoadReport()
 	}
 	c.loadReportServer = loadReportServer
+	var loadStore *load.Store
 	if c.loadReportServer != nil {
-		c.loadOriginal, c.cancelLoadReport = c.xdsClient.ReportLoad(*c.loadReportServer)
+		loadStore, c.cancelLoadReport = c.xdsClient.ReportLoad(*c.loadReportServer)
 	}
+	return loadStore
 }
 
 func (c *xdsClientWrapper) loadStore() load.PerClusterReporter {
-	if c == nil || c.loadWrapper.store == nil {
+	if c == nil {
 		return nil
 	}
 	return c.loadWrapper
@@ -268,25 +298,12 @@ func (c *xdsClientWrapper) handleUpdate(config *EDSConfig, attr *attributes.Attr
 		return err
 	}
 
-	var updateLoadStore bool
-
 	// Need to restart EDS watch when one of the following happens:
 	// - the xds_client is updated
 	// - the xds_client didn't change, but the edsServiceName changed
 	if clientChanged || c.edsServiceName != config.EDSServiceName {
 		c.edsServiceName = config.EDSServiceName
 		c.startEndpointsWatch()
-		updateLoadStore = true
-	}
-
-	// Only need to restart load reporting when:
-	// - the loadReportServer name changed
-	if !equalStringPointers(c.loadReportServer, config.LrsLoadReportingServerName) {
-		c.startLoadReport(config.LrsLoadReportingServerName)
-		updateLoadStore = true
-	}
-
-	if updateLoadStore {
 		// TODO: this update for the LRS service name is too early. It should
 		// only apply to the new EDS response. But this is applied to the RPCs
 		// before the new EDS response. To fully fix this, the EDS balancer
@@ -294,8 +311,16 @@ func (c *xdsClientWrapper) handleUpdate(config *EDSConfig, attr *attributes.Attr
 		//
 		// This is OK for now, because we don't actually expect edsServiceName
 		// to change. Fixing this (a bigger change) will happen later.
-		c.loadWrapper.update(c.loadOriginal, c.edsServiceName)
+		c.loadWrapper.updateServiceName(c.edsServiceName)
+	}
+
+	// Only need to restart load reporting when:
+	// - the loadReportServer name changed
+	if !equalStringPointers(c.loadReportServer, config.LrsLoadReportingServerName) {
+		loadStore := c.startLoadReport(config.LrsLoadReportingServerName)
+		c.loadWrapper.updateLoadStore(loadStore)
 	}
+
 	return nil
 }
 
diff --git a/xds/internal/balancer/lrs/balancer.go b/xds/internal/balancer/lrs/balancer.go
index 5e16dbbc463..916a48fc3ff 100644
--- a/xds/internal/balancer/lrs/balancer.go
+++ b/xds/internal/balancer/lrs/balancer.go
@@ -152,45 +152,75 @@ type xdsClientInterface interface {
 
 type loadStoreWrapper struct {
 	mu         sync.RWMutex
-	store      *load.Store
 	cluster    string
 	edsService string
+	// Both store and perCluster will be nil if load reporting is disabled (EDS
+	// response doesn't have LRS server name).
+	store      *load.Store
 	perCluster load.PerClusterReporter
 }
 
-func (lsw *loadStoreWrapper) update(store *load.Store, cluster, edsService string) {
+func (lsw *loadStoreWrapper) updateClusterAndService(cluster, edsService string) {
 	lsw.mu.Lock()
 	defer lsw.mu.Unlock()
-	if store == lsw.store && cluster == lsw.cluster && edsService == lsw.edsService {
+	if cluster == lsw.cluster && edsService == lsw.edsService {
 		return
 	}
-	lsw.store = store
 	lsw.cluster = cluster
 	lsw.edsService = edsService
+
+	if lsw.store == nil {
+		return
+	}
 	lsw.perCluster = lsw.store.PerCluster(lsw.cluster, lsw.edsService)
 }
 
+func (lsw *loadStoreWrapper) updateLoadStore(store *load.Store) {
+	lsw.mu.Lock()
+	defer lsw.mu.Unlock()
+	if store == lsw.store {
+		return
+	}
+	lsw.store = store
+	lsw.perCluster = nil
+	if lsw.store != nil {
+		lsw.perCluster = lsw.store.PerCluster(lsw.cluster, lsw.edsService)
+	}
+}
+
 func (lsw *loadStoreWrapper) CallStarted(locality string) {
 	lsw.mu.RLock()
 	defer lsw.mu.RUnlock()
+	if lsw.perCluster == nil {
+		return
+	}
 	lsw.perCluster.CallStarted(locality)
 }
 
 func (lsw *loadStoreWrapper) CallFinished(locality string, err error) {
 	lsw.mu.RLock()
 	defer lsw.mu.RUnlock()
+	if lsw.perCluster == nil {
+		return
+	}
 	lsw.perCluster.CallFinished(locality, err)
 }
 
 func (lsw *loadStoreWrapper) CallServerLoad(locality, name string, val float64) {
 	lsw.mu.RLock()
 	defer lsw.mu.RUnlock()
+	if lsw.perCluster == nil {
+		return
+	}
 	lsw.perCluster.CallServerLoad(locality, name, val)
 }
 
 func (lsw *loadStoreWrapper) CallDropped(category string) {
 	lsw.mu.RLock()
 	defer lsw.mu.RUnlock()
+	if lsw.perCluster == nil {
+		return
+	}
 	lsw.perCluster.CallDropped(category)
 }
 
@@ -200,9 +230,6 @@ type xdsClientWrapper struct {
 	clusterName    string
 	edsServiceName string
 	lrsServerName  string
-	// loadOriginal is the load.Store for reporting loads to lrsServerName. It's
-	// returned by the client.
-	loadOriginal *load.Store
 	// loadWrapper is a wrapper with loadOriginal, with clusterName and
 	// edsServiceName. It's used by children to report loads.
 	loadWrapper *loadStoreWrapper
@@ -218,16 +245,16 @@ func newXDSClientWrapper() *xdsClientWrapper {
 // restart the load reporting stream.
 func (w *xdsClientWrapper) update(newConfig *lbConfig, attr *attributes.Attributes) error {
 	var (
-		restartLoadReport bool
-		updateLoadStore   bool
+		restartLoadReport           bool
+		updateLoadClusterAndService bool
 	)
 
 	if attr == nil {
-		return fmt.Errorf("failed to get xdsClient from attributes: attributes is nil")
+		return fmt.Errorf("lrs: failed to get xdsClient from attributes: attributes is nil")
 	}
 	clientFromAttr, _ := attr.Value(xdsinternal.XDSClientID).(xdsClientInterface)
 	if clientFromAttr == nil {
-		return fmt.Errorf("failed to get xdsClient from attributes: xdsClient not found in attributes")
+		return fmt.Errorf("lrs: failed to get xdsClient from attributes: xdsClient not found in attributes")
 	}
 
 	if w.c != clientFromAttr {
@@ -239,14 +266,26 @@ func (w *xdsClientWrapper) update(newConfig *lbConfig, attr *attributes.Attribut
 	// ClusterName is different, restart. ClusterName is from ClusterName and
 	// EdsServiceName.
 	if w.clusterName != newConfig.ClusterName {
-		updateLoadStore = true
+		updateLoadClusterAndService = true
 		w.clusterName = newConfig.ClusterName
 	}
 	if w.edsServiceName != newConfig.EdsServiceName {
-		updateLoadStore = true
+		updateLoadClusterAndService = true
 		w.edsServiceName = newConfig.EdsServiceName
 	}
 
+	if updateLoadClusterAndService {
+		// This updates the clusterName and serviceName that will be reported
+		// for the loads. The update here is too early; the perfect timing is
+		// when the picker is updated with the new connection. But from this
+		// balancer's point of view, it's impossible to tell.
+		//
+		// On the other hand, this will almost never happen. Each LRS policy
+		// shouldn't get an updated config. The parent should do a graceful
+		// switch when the clusterName or serviceName is changed.
+		w.loadWrapper.updateClusterAndService(w.clusterName, w.edsServiceName)
+	}
+
 	if w.lrsServerName != newConfig.LrsLoadReportingServerName {
 		// LrsLoadReportingServerName is different, load should be reported to
 		// a different server; restart.
@@ -255,36 +294,21 @@ func (w *xdsClientWrapper) update(newConfig *lbConfig, attr *attributes.Attribut
 	}
 
 	if restartLoadReport {
-		updateLoadStore = true
 		if w.cancelLoadReport != nil {
 			w.cancelLoadReport()
 			w.cancelLoadReport = nil
 		}
+		var loadStore *load.Store
 		if w.c != nil {
-			w.loadOriginal, w.cancelLoadReport = w.c.ReportLoad(w.lrsServerName)
+			loadStore, w.cancelLoadReport = w.c.ReportLoad(w.lrsServerName)
 		}
+		w.loadWrapper.updateLoadStore(loadStore)
 	}
 
-	if updateLoadStore {
-		// This updates the clusterName and serviceName that will reported for the
-		// loads. The update here is too early, the perfect timing is when the
-		// picker is updated with the new connection. But from this balancer's point
-		// of view, it's impossible to tell.
-		//
-		// On the other hand, this will almost never happen. Each LRS policy
-		// shouldn't get updated config. The parent should do a graceful switch when
-		// the clusterName or serviceName is changed.
-		if updateLoadStore {
-			w.loadWrapper.update(w.loadOriginal, w.clusterName, w.edsServiceName)
-		}
-	}
 	return nil
 }
 
 func (w *xdsClientWrapper) loadStore() load.PerClusterReporter {
-	if w.loadWrapper.store == nil {
-		return nil
-	}
 	return w.loadWrapper
 }
 
diff --git a/xds/internal/client/client_loadreport_test.go b/xds/internal/client/client_loadreport_test.go
index 554c6c367a3..d426247c10d 100644
--- a/xds/internal/client/client_loadreport_test.go
+++ b/xds/internal/client/client_loadreport_test.go
@@ -26,8 +26,8 @@ import (
 	v2corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
 	endpointpb "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint"
 	lrspb "github.com/envoyproxy/go-control-plane/envoy/service/load_stats/v2"
-	"github.com/golang/protobuf/proto"
 	durationpb "github.com/golang/protobuf/ptypes/duration"
+	"github.com/google/go-cmp/cmp"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/internal/grpctest"
@@ -36,6 +36,7 @@ import (
 	"google.golang.org/grpc/xds/internal/client/bootstrap"
 	"google.golang.org/grpc/xds/internal/testutils/fakeserver"
 	"google.golang.org/grpc/xds/internal/version"
+	"google.golang.org/protobuf/testing/protocmp"
 
 	_ "google.golang.org/grpc/xds/internal/client/v2" // Register the v2 xDS API client.
 )
@@ -81,9 +82,9 @@ func (s) TestLRSClient(t *testing.T) {
 	// Reporting to the same address should not create a new ClientConn.
 	store1, lrsCancel1 := xdsC.ReportLoad(fs.Address)
 	defer lrsCancel1()
-	ctx, cancel = context.WithTimeout(context.Background(), defaultTestShortTimeout)
-	defer cancel()
-	if u, err := fs.NewConnChan.Receive(ctx); err != context.DeadlineExceeded {
+	sCtx, sCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
+	defer sCancel()
+	if u, err := fs.NewConnChan.Receive(sCtx); err != context.DeadlineExceeded {
 		t.Errorf("unexpected NewConn: %v, %v, want channel recv timeout", u, err)
 	}
 
@@ -96,8 +97,6 @@ func (s) TestLRSClient(t *testing.T) {
 	// Reporting to a different address should create a new ClientConn.
 	store2, lrsCancel2 := xdsC.ReportLoad(fs2.Address)
 	defer lrsCancel2()
-	ctx, cancel = context.WithTimeout(context.Background(), defaultTestTimeout)
-	defer cancel()
 	if u, err := fs2.NewConnChan.Receive(ctx); err != nil {
 		t.Errorf("unexpected timeout: %v, %v, want NewConn", u, err)
 	}
@@ -129,13 +128,14 @@ func (s) TestLRSClient(t *testing.T) {
 		t.Fatalf("unexpected load received, want load for cluster, eds, dropped for test")
 	}
 	receivedLoad[0].LoadReportInterval = nil
-	if want := (&endpointpb.ClusterStats{
+	want := (&endpointpb.ClusterStats{
 		ClusterName:          "cluster",
 		ClusterServiceName:   "eds",
 		TotalDroppedRequests: 1,
 		DroppedRequests:      []*endpointpb.ClusterStats_DroppedRequests{{Category: "test", DroppedCount: 1}},
-	}); !proto.Equal(want, receivedLoad[0]) {
-		t.Fatalf("unexpected load received, want load for cluster, eds, dropped for test")
+	})
+	if d := cmp.Diff(want, receivedLoad[0], protocmp.Transform()); d != "" {
+		t.Fatalf("unexpected load received, want load for cluster, eds, dropped for test, diff (-want +got):\n%s", d)
 	}
 
 	// Cancel this load reporting stream, server should see error canceled.
diff --git a/xds/internal/client/client_test.go b/xds/internal/client/client_test.go
index e032f03b19a..a2a92a24f7f 100644
--- a/xds/internal/client/client_test.go
+++ b/xds/internal/client/client_test.go
@@ -115,7 +115,7 @@ func (c *testAPIClient) RemoveWatch(resourceType ResourceType, resourceName stri
 	c.removeWatches[resourceType].Send(resourceName)
 }
 
-func (c *testAPIClient) reportLoad(_ context.Context, _ *grpc.ClientConn, _ loadReportingOptions) {
+func (c *testAPIClient) reportLoad(context.Context, *grpc.ClientConn, loadReportingOptions) {
 }
 
 func (c *testAPIClient) Close() {}
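
Note: the pattern repeated throughout this change — a wrapper that holds an optional per-cluster reporter behind a sync.RWMutex and silently no-ops while load reporting is disabled — can be exercised in isolation. Below is a minimal, self-contained sketch of that idea. The names here (reporter, countingReporter, reporterWrapper, updateSink) are invented for illustration and are not the grpc-go API.

package main

import (
	"fmt"
	"sync"
)

// reporter stands in for the subset of load.PerClusterReporter used here.
type reporter interface {
	CallDropped(category string)
}

// countingReporter is a toy sink that counts drops per category.
type countingReporter struct {
	mu    sync.Mutex
	drops map[string]int
}

func (r *countingReporter) CallDropped(category string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.drops == nil {
		r.drops = make(map[string]int)
	}
	r.drops[category]++
}

// reporterWrapper mirrors loadStoreWrapper: it is always safe to call, and it
// drops reports on the floor while no sink is configured (sink == nil).
type reporterWrapper struct {
	mu   sync.RWMutex
	sink reporter // nil when load reporting is disabled
}

// updateSink swaps the underlying sink; callers keep the same wrapper handle.
func (w *reporterWrapper) updateSink(sink reporter) {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.sink = sink
}

func (w *reporterWrapper) CallDropped(category string) {
	w.mu.RLock()
	defer w.mu.RUnlock()
	if w.sink == nil {
		return // reporting disabled: no-op instead of a nil-pointer panic
	}
	w.sink.CallDropped(category)
}

func main() {
	w := &reporterWrapper{}
	w.CallDropped("test") // safe even before any sink exists

	sink := &countingReporter{}
	w.updateSink(sink)
	w.CallDropped("test")
	fmt.Println(sink.drops["test"]) // prints 1: only the post-update call counted
}

The indirection exists for the same reason as in the diff: pickers hold one stable reporter, while the balancer can start, stop, or retarget load reporting underneath it without handing out a new reporter.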