diff --git a/go.mod b/go.mod
index f4a728fa21b1..529129fed2a6 100644
--- a/go.mod
+++ b/go.mod
@@ -13,4 +13,5 @@ require (
 	golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be
 	golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a
 	google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55
+	google.golang.org/protobuf v1.23.0
 )
diff --git a/xds/internal/balancer/edsbalancer/eds_impl_test.go b/xds/internal/balancer/edsbalancer/eds_impl_test.go
index 3f46241a0866..292ea4f80a70 100644
--- a/xds/internal/balancer/edsbalancer/eds_impl_test.go
+++ b/xds/internal/balancer/edsbalancer/eds_impl_test.go
@@ -688,7 +688,8 @@ func (s) TestEDS_LoadReport(t *testing.T) {
 	// be used.
 	loadStore := load.NewStore()
 	lsWrapper := &loadStoreWrapper{}
-	lsWrapper.update(loadStore, testClusterNames[0])
+	lsWrapper.updateServiceName(testClusterNames[0])
+	lsWrapper.updateLoadStore(loadStore)
 	cw := &xdsClientWrapper{
 		loadWrapper: lsWrapper,
 	}
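The test above now seeds the wrapper in two separate calls instead of the old combined update(). Below is a minimal, self-contained sketch of why the two setters compose in either order; every identifier in it is an illustrative stand-in for the PR's real types (load.Store, load.PerClusterReporter), not code from the PR.

	package main

	import "fmt"

	// perClusterReporter stands in for load.PerClusterReporter.
	type perClusterReporter struct{ service string }

	// loadStore stands in for load.Store.
	type loadStore struct{}

	func (s *loadStore) PerCluster(service string) *perClusterReporter {
		return &perClusterReporter{service: service}
	}

	type wrapper struct {
		service    string
		store      *loadStore
		perCluster *perClusterReporter
	}

	// Whichever piece of state arrives second completes the pairing, so the
	// two setters can run in either order.
	func (w *wrapper) updateServiceName(service string) {
		w.service = service
		if w.store == nil {
			return // No store yet; updateLoadStore will build the reporter.
		}
		w.perCluster = w.store.PerCluster(w.service)
	}

	func (w *wrapper) updateLoadStore(store *loadStore) {
		w.store = store
		w.perCluster = nil
		if w.store != nil {
			w.perCluster = w.store.PerCluster(w.service)
		}
	}

	func main() {
		w := &wrapper{}
		w.updateServiceName("cluster-1")  // store is nil, so no reporter yet
		w.updateLoadStore(&loadStore{})   // pairing completes here
		fmt.Println(w.perCluster.service) // prints "cluster-1"
	}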
diff --git a/xds/internal/balancer/edsbalancer/xds_client_wrapper.go b/xds/internal/balancer/edsbalancer/xds_client_wrapper.go
index dfb580c5f160..8d3830f6bc07 100644
--- a/xds/internal/balancer/edsbalancer/xds_client_wrapper.go
+++ b/xds/internal/balancer/edsbalancer/xds_client_wrapper.go
@@ -48,44 +48,75 @@ var (
 )
 
 type loadStoreWrapper struct {
-	mu         sync.RWMutex
+	mu      sync.RWMutex
+	service string
+	// Both store and perCluster will be nil if load reporting is disabled (EDS
+	// response doesn't have LRS server name).
 	store      *load.Store
-	service    string
 	perCluster load.PerClusterReporter
 }
 
-func (lsw *loadStoreWrapper) update(store *load.Store, service string) {
+func (lsw *loadStoreWrapper) updateServiceName(service string) {
 	lsw.mu.Lock()
 	defer lsw.mu.Unlock()
-	if store == lsw.store && service == lsw.service {
+	if lsw.service == service {
 		return
 	}
-	lsw.store = store
 	lsw.service = service
+
+	if lsw.store == nil {
+		return
+	}
 	lsw.perCluster = lsw.store.PerCluster(lsw.service, "")
 }
 
+func (lsw *loadStoreWrapper) updateLoadStore(store *load.Store) {
+	lsw.mu.Lock()
+	defer lsw.mu.Unlock()
+	if store == lsw.store {
+		return
+	}
+	lsw.store = store
+	lsw.perCluster = nil
+	if lsw.store != nil {
+		lsw.perCluster = lsw.store.PerCluster(lsw.service, "")
+	}
+}
+
 func (lsw *loadStoreWrapper) CallStarted(locality string) {
 	lsw.mu.RLock()
 	defer lsw.mu.RUnlock()
+	if lsw.perCluster == nil {
+		return
+	}
 	lsw.perCluster.CallStarted(locality)
 }
 
 func (lsw *loadStoreWrapper) CallFinished(locality string, err error) {
 	lsw.mu.RLock()
 	defer lsw.mu.RUnlock()
+	if lsw.perCluster == nil {
+		return
+	}
 	lsw.perCluster.CallFinished(locality, err)
 }
 
 func (lsw *loadStoreWrapper) CallServerLoad(locality, name string, val float64) {
 	lsw.mu.RLock()
 	defer lsw.mu.RUnlock()
+	if lsw.perCluster == nil {
+		return
+	}
 	lsw.perCluster.CallServerLoad(locality, name, val)
 }
 
 func (lsw *loadStoreWrapper) CallDropped(category string) {
 	lsw.mu.RLock()
 	defer lsw.mu.RUnlock()
+	if lsw.perCluster == nil {
+		return
+	}
 	lsw.perCluster.CallDropped(category)
 }
 
@@ -105,9 +136,6 @@ type xdsClientWrapper struct {
 	// loadWrapper is a wrapper with loadOriginal, with clusterName and
 	// edsServiceName. It's used children to report loads.
 	loadWrapper *loadStoreWrapper
-	// loadOriginal is the load.Store for reporting loads to lrsServerName. It's
-	// returned by the client.
-	loadOriginal *load.Store
 	// edsServiceName is the edsServiceName currently being watched, not
 	// necessary the edsServiceName from service config.
 	//
@@ -243,18 +271,20 @@ func (c *xdsClientWrapper) startEndpointsWatch() {
 // Caller can cal this when the loadReportServer name changes, but
 // edsServiceName doesn't (so we only need to restart load reporting, not EDS
 // watch).
-func (c *xdsClientWrapper) startLoadReport(loadReportServer *string) {
+func (c *xdsClientWrapper) startLoadReport(loadReportServer *string) *load.Store {
 	if c.cancelLoadReport != nil {
 		c.cancelLoadReport()
 	}
 	c.loadReportServer = loadReportServer
+	var loadStore *load.Store
 	if c.loadReportServer != nil {
-		c.loadOriginal, c.cancelLoadReport = c.xdsClient.ReportLoad(*c.loadReportServer)
+		loadStore, c.cancelLoadReport = c.xdsClient.ReportLoad(*c.loadReportServer)
 	}
+	return loadStore
 }
 
 func (c *xdsClientWrapper) loadStore() load.PerClusterReporter {
-	if c == nil || c.loadWrapper.store == nil {
+	if c == nil {
 		return nil
 	}
 	return c.loadWrapper
@@ -268,25 +298,12 @@ func (c *xdsClientWrapper) handleUpdate(config *EDSConfig, attr *attributes.Attr
 		return err
 	}
 
-	var updateLoadStore bool
-
 	// Need to restart EDS watch when one of the following happens:
 	// - the xds_client is updated
 	// - the xds_client didn't change, but the edsServiceName changed
 	if clientChanged || c.edsServiceName != config.EDSServiceName {
 		c.edsServiceName = config.EDSServiceName
 		c.startEndpointsWatch()
-		updateLoadStore = true
-	}
-
-	// Only need to restart load reporting when:
-	// - the loadReportServer name changed
-	if !equalStringPointers(c.loadReportServer, config.LrsLoadReportingServerName) {
-		c.startLoadReport(config.LrsLoadReportingServerName)
-		updateLoadStore = true
-	}
-
-	if updateLoadStore {
 		// TODO: this update for the LRS service name is too early. It should
 		// only apply to the new EDS response. But this is applied to the RPCs
 		// before the new EDS response. To fully fix this, the EDS balancer
@@ -294,8 +311,16 @@ func (c *xdsClientWrapper) handleUpdate(config *EDSConfig, attr *attributes.Attr
 		//
 		// This is OK for now, because we don't actually expect edsServiceName
 		// to change. Fix this (a bigger change) will happen later.
-		c.loadWrapper.update(c.loadOriginal, c.edsServiceName)
+		c.loadWrapper.updateServiceName(c.edsServiceName)
+	}
+
+	// Only need to restart load reporting when:
+	// - the loadReportServer name changed
+	if !equalStringPointers(c.loadReportServer, config.LrsLoadReportingServerName) {
+		loadStore := c.startLoadReport(config.LrsLoadReportingServerName)
+		c.loadWrapper.updateLoadStore(loadStore)
 	}
+
 	return nil
 }
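handleUpdate above gates the load-report restart on equalStringPointers. That helper already exists in this package; as a reference for reviewers, a sketch with the semantics the call site assumes (nil equals nil, nil never equals non-nil, and non-nil pointers compare by pointee value) would look like this:

	// equalStringPointersSketch mirrors the contract assumed by handleUpdate.
	func equalStringPointersSketch(a, b *string) bool {
		if a == nil && b == nil {
			return true
		}
		if a == nil || b == nil {
			return false
		}
		return *a == *b
	}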
diff --git a/xds/internal/balancer/lrs/balancer.go b/xds/internal/balancer/lrs/balancer.go
index 5e16dbbc4630..916a48fc3ffc 100644
--- a/xds/internal/balancer/lrs/balancer.go
+++ b/xds/internal/balancer/lrs/balancer.go
@@ -152,45 +152,75 @@ type xdsClientInterface interface {
 
 type loadStoreWrapper struct {
 	mu         sync.RWMutex
-	store      *load.Store
 	cluster    string
 	edsService string
+	// Both store and perCluster will be nil if load reporting is disabled (EDS
+	// response doesn't have LRS server name).
+	store      *load.Store
 	perCluster load.PerClusterReporter
 }
 
-func (lsw *loadStoreWrapper) update(store *load.Store, cluster, edsService string) {
+func (lsw *loadStoreWrapper) updateClusterAndService(cluster, edsService string) {
 	lsw.mu.Lock()
 	defer lsw.mu.Unlock()
-	if store == lsw.store && cluster == lsw.cluster && edsService == lsw.edsService {
+	if cluster == lsw.cluster && edsService == lsw.edsService {
 		return
 	}
-	lsw.store = store
 	lsw.cluster = cluster
 	lsw.edsService = edsService
+
+	if lsw.store == nil {
+		return
+	}
 	lsw.perCluster = lsw.store.PerCluster(lsw.cluster, lsw.edsService)
 }
 
+func (lsw *loadStoreWrapper) updateLoadStore(store *load.Store) {
+	lsw.mu.Lock()
+	defer lsw.mu.Unlock()
+	if store == lsw.store {
+		return
+	}
+	lsw.store = store
+	lsw.perCluster = nil
+	if lsw.store != nil {
+		lsw.perCluster = lsw.store.PerCluster(lsw.cluster, lsw.edsService)
+	}
+}
+
 func (lsw *loadStoreWrapper) CallStarted(locality string) {
 	lsw.mu.RLock()
 	defer lsw.mu.RUnlock()
+	if lsw.perCluster == nil {
+		return
+	}
 	lsw.perCluster.CallStarted(locality)
 }
 
 func (lsw *loadStoreWrapper) CallFinished(locality string, err error) {
 	lsw.mu.RLock()
 	defer lsw.mu.RUnlock()
+	if lsw.perCluster == nil {
+		return
+	}
 	lsw.perCluster.CallFinished(locality, err)
 }
 
 func (lsw *loadStoreWrapper) CallServerLoad(locality, name string, val float64) {
 	lsw.mu.RLock()
 	defer lsw.mu.RUnlock()
+	if lsw.perCluster == nil {
+		return
+	}
 	lsw.perCluster.CallServerLoad(locality, name, val)
 }
 
 func (lsw *loadStoreWrapper) CallDropped(category string) {
 	lsw.mu.RLock()
 	defer lsw.mu.RUnlock()
+	if lsw.perCluster == nil {
+		return
+	}
 	lsw.perCluster.CallDropped(category)
 }
 
@@ -200,9 +230,6 @@ type xdsClientWrapper struct {
 	clusterName    string
 	edsServiceName string
 	lrsServerName  string
-	// loadOriginal is the load.Store for reporting loads to lrsServerName. It's
-	// returned by the client.
-	loadOriginal *load.Store
 	// loadWrapper is a wrapper with loadOriginal, with clusterName and
 	// edsServiceName. It's used children to report loads.
 	loadWrapper *loadStoreWrapper
@@ -218,16 +245,16 @@ func newXDSClientWrapper() *xdsClientWrapper {
 // restart the load reporting stream.
 func (w *xdsClientWrapper) update(newConfig *lbConfig, attr *attributes.Attributes) error {
 	var (
-		restartLoadReport bool
-		updateLoadStore   bool
+		restartLoadReport           bool
+		updateLoadClusterAndService bool
 	)
 	if attr == nil {
-		return fmt.Errorf("failed to get xdsClient from attributes: attributes is nil")
+		return fmt.Errorf("lrs: failed to get xdsClient from attributes: attributes is nil")
 	}
 	clientFromAttr, _ := attr.Value(xdsinternal.XDSClientID).(xdsClientInterface)
 	if clientFromAttr == nil {
-		return fmt.Errorf("failed to get xdsClient from attributes: xdsClient not found in attributes")
+		return fmt.Errorf("lrs: failed to get xdsClient from attributes: xdsClient not found in attributes")
 	}
 
 	if w.c != clientFromAttr {
@@ -239,14 +266,26 @@ func (w *xdsClientWrapper) update(newConfig *lbConfig, attr *attributes.Attribut
 	// ClusterName is different, restart. ClusterName is from ClusterName and
 	// EdsServiceName.
 	if w.clusterName != newConfig.ClusterName {
-		updateLoadStore = true
+		updateLoadClusterAndService = true
 		w.clusterName = newConfig.ClusterName
 	}
 	if w.edsServiceName != newConfig.EdsServiceName {
-		updateLoadStore = true
+		updateLoadClusterAndService = true
 		w.edsServiceName = newConfig.EdsServiceName
 	}
+	if updateLoadClusterAndService {
+		// This updates the clusterName and serviceName that will be reported
+		// for the loads. The update here is too early; the perfect timing is
+		// when the picker is updated with the new connection. But from this
+		// balancer's point of view, it's impossible to tell.
+		//
+		// On the other hand, this will almost never happen. Each LRS policy
+		// shouldn't get updated config. The parent should do a graceful switch
+		// when the clusterName or serviceName is changed.
+		w.loadWrapper.updateClusterAndService(w.clusterName, w.edsServiceName)
+	}
+
 	if w.lrsServerName != newConfig.LrsLoadReportingServerName {
 		// LrsLoadReportingServerName is different, load should be report to a
 		// different server, restart.
@@ -255,36 +294,21 @@ func (w *xdsClientWrapper) update(newConfig *lbConfig, attr *attributes.Attribut
 	}
 
 	if restartLoadReport {
-		updateLoadStore = true
 		if w.cancelLoadReport != nil {
 			w.cancelLoadReport()
 			w.cancelLoadReport = nil
 		}
+		var loadStore *load.Store
 		if w.c != nil {
-			w.loadOriginal, w.cancelLoadReport = w.c.ReportLoad(w.lrsServerName)
+			loadStore, w.cancelLoadReport = w.c.ReportLoad(w.lrsServerName)
 		}
+		w.loadWrapper.updateLoadStore(loadStore)
 	}
 
-	if updateLoadStore {
-		// This updates the clusterName and serviceName that will reported for the
-		// loads. The update here is too early, the perfect timing is when the
-		// picker is updated with the new connection. But from this balancer's point
-		// of view, it's impossible to tell.
-		//
-		// On the other hand, this will almost never happen. Each LRS policy
-		// shouldn't get updated config. The parent should do a graceful switch when
-		// the clusterName or serviceName is changed.
-		if updateLoadStore {
-			w.loadWrapper.update(w.loadOriginal, w.clusterName, w.edsServiceName)
-		}
-	}
 	return nil
 }
 
 func (w *xdsClientWrapper) loadStore() load.PerClusterReporter {
-	if w.loadWrapper.store == nil {
-		return nil
-	}
 	return w.loadWrapper
 }
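Both loadStoreWrapper variants now tolerate a nil perCluster, which is the steady state whenever load reporting is disabled. A hypothetical in-package regression test (a sketch, not part of this change; it assumes only the standard testing package) could pin the no-panic guarantee down:

	func TestLoadWrapperWithoutStoreIsNoop(t *testing.T) {
		lsw := &loadStoreWrapper{}
		lsw.updateClusterAndService("cluster", "eds-service")

		// With no load store attached, perCluster stays nil, and every
		// reporter method must return early instead of dereferencing a nil
		// interface.
		lsw.CallStarted("locality")
		lsw.CallFinished("locality", nil)
		lsw.CallServerLoad("locality", "cpu", 1.0)
		lsw.CallDropped("throttle")
	}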
diff --git a/xds/internal/client/client_loadreport_test.go b/xds/internal/client/client_loadreport_test.go
index 554c6c367a3b..d426247c10d9 100644
--- a/xds/internal/client/client_loadreport_test.go
+++ b/xds/internal/client/client_loadreport_test.go
@@ -26,8 +26,8 @@ import (
 	v2corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
 	endpointpb "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint"
 	lrspb "github.com/envoyproxy/go-control-plane/envoy/service/load_stats/v2"
-	"github.com/golang/protobuf/proto"
 	durationpb "github.com/golang/protobuf/ptypes/duration"
+	"github.com/google/go-cmp/cmp"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/internal/grpctest"
@@ -36,6 +36,7 @@ import (
 	"google.golang.org/grpc/xds/internal/client/bootstrap"
 	"google.golang.org/grpc/xds/internal/testutils/fakeserver"
 	"google.golang.org/grpc/xds/internal/version"
+	"google.golang.org/protobuf/testing/protocmp"
 
 	_ "google.golang.org/grpc/xds/internal/client/v2" // Register the v2 xDS API client.
 )
@@ -81,9 +82,9 @@ func (s) TestLRSClient(t *testing.T) {
 	// Report to the same address should not create new ClientConn.
 	store1, lrsCancel1 := xdsC.ReportLoad(fs.Address)
 	defer lrsCancel1()
-	ctx, cancel = context.WithTimeout(context.Background(), defaultTestShortTimeout)
-	defer cancel()
-	if u, err := fs.NewConnChan.Receive(ctx); err != context.DeadlineExceeded {
+	sCtx, sCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
+	defer sCancel()
+	if u, err := fs.NewConnChan.Receive(sCtx); err != context.DeadlineExceeded {
 		t.Errorf("unexpected NewConn: %v, %v, want channel recv timeout", u, err)
 	}
 
@@ -96,8 +97,6 @@ func (s) TestLRSClient(t *testing.T) {
 	// Report to a different address should create new ClientConn.
 	store2, lrsCancel2 := xdsC.ReportLoad(fs2.Address)
 	defer lrsCancel2()
-	ctx, cancel = context.WithTimeout(context.Background(), defaultTestTimeout)
-	defer cancel()
 	if u, err := fs2.NewConnChan.Receive(ctx); err != nil {
 		t.Errorf("unexpected timeout: %v, %v, want NewConn", u, err)
 	}
@@ -129,13 +128,14 @@ func (s) TestLRSClient(t *testing.T) {
 		t.Fatalf("unexpected load received, want load for cluster, eds, dropped for test")
 	}
 	receivedLoad[0].LoadReportInterval = nil
-	if want := (&endpointpb.ClusterStats{
+	want := (&endpointpb.ClusterStats{
 		ClusterName:          "cluster",
 		ClusterServiceName:   "eds",
 		TotalDroppedRequests: 1,
 		DroppedRequests:      []*endpointpb.ClusterStats_DroppedRequests{{Category: "test", DroppedCount: 1}},
-	}); !proto.Equal(want, receivedLoad[0]) {
-		t.Fatalf("unexpected load received, want load for cluster, eds, dropped for test")
+	})
+	if d := cmp.Diff(want, receivedLoad[0], protocmp.Transform()); d != "" {
+		t.Fatalf("unexpected load received, want load for cluster, eds, dropped for test, diff (-want +got):\n%s", d)
 	}
 
 	// Cancel this load reporting stream, server should see error canceled.
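The comparison change above swaps proto.Equal's bare boolean for cmp.Diff with protocmp.Transform(), which prints a field-by-field diff on failure. A standalone illustration of the idiom follows; durationpb is used here only as a convenient well-known message type, it is not what the test compares:

	package main

	import (
		"fmt"

		"github.com/google/go-cmp/cmp"
		"google.golang.org/protobuf/testing/protocmp"
		durationpb "google.golang.org/protobuf/types/known/durationpb"
	)

	func main() {
		want := &durationpb.Duration{Seconds: 1}
		got := &durationpb.Duration{Seconds: 2}
		// protocmp.Transform teaches cmp to compare protobuf messages by
		// field; without it, cmp panics on the messages' unexported internals.
		if d := cmp.Diff(want, got, protocmp.Transform()); d != "" {
			fmt.Printf("mismatch (-want +got):\n%s", d)
		}
	}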
diff --git a/xds/internal/client/client_test.go b/xds/internal/client/client_test.go
index e032f03b19a8..a2a92a24f7fb 100644
--- a/xds/internal/client/client_test.go
+++ b/xds/internal/client/client_test.go
@@ -115,7 +115,7 @@ func (c *testAPIClient) RemoveWatch(resourceType ResourceType, resourceName stri
 	c.removeWatches[resourceType].Send(resourceName)
 }
 
-func (c *testAPIClient) reportLoad(_ context.Context, _ *grpc.ClientConn, _ loadReportingOptions) {
+func (c *testAPIClient) reportLoad(context.Context, *grpc.ClientConn, loadReportingOptions) {
 }
 
 func (c *testAPIClient) Close() {}
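The final hunk drops the underscore placeholders from reportLoad. Go allows parameters to be left entirely unnamed when the body ignores them, which is slightly tidier than discarding each one with _. A tiny standalone example (illustrative names only):

	package main

	type apiClient struct{}

	// Unnamed parameters: the signature still satisfies whatever interface the
	// type implements, but the body never touches the arguments.
	func (apiClient) reportLoad(string, int) {}

	func main() {
		apiClient{}.reportLoad("ignored", 42)
	}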