From 3936daae107110f22cfb98f55e79e99e88555133 Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Wed, 29 Jul 2020 16:26:16 -0700 Subject: [PATCH 1/5] [lrs_balancing_policy] xds: add LRS balancing policy --- xds/internal/balancer/lrs/balancer.go | 215 +++++++ xds/internal/balancer/lrs/config.go | 44 ++ xds/internal/balancer/lrs/config_test.go | 86 +++ xds/internal/balancer/lrs/logging.go | 34 ++ xds/internal/balancer/lrs/lrs_test.go | 560 ++++-------------- xds/internal/balancer/lrs/picker.go | 70 +++ .../balancer/lrs/{lrs.go => store.go} | 3 - xds/internal/balancer/lrs/store_test.go | 517 ++++++++++++++++ xds/internal/internal.go | 10 +- 9 files changed, 1076 insertions(+), 463 deletions(-) create mode 100644 xds/internal/balancer/lrs/balancer.go create mode 100644 xds/internal/balancer/lrs/config.go create mode 100644 xds/internal/balancer/lrs/config_test.go create mode 100644 xds/internal/balancer/lrs/logging.go create mode 100644 xds/internal/balancer/lrs/picker.go rename xds/internal/balancer/lrs/{lrs.go => store.go} (99%) create mode 100644 xds/internal/balancer/lrs/store_test.go diff --git a/xds/internal/balancer/lrs/balancer.go b/xds/internal/balancer/lrs/balancer.go new file mode 100644 index 00000000000..25fe854051e --- /dev/null +++ b/xds/internal/balancer/lrs/balancer.go @@ -0,0 +1,215 @@ +/* + * + * Copyright 2020 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package lrs + +import ( + "encoding/json" + "fmt" + + "google.golang.org/grpc/attributes" + "google.golang.org/grpc/balancer" + "google.golang.org/grpc/internal/grpclog" + "google.golang.org/grpc/serviceconfig" + "google.golang.org/grpc/xds/internal" + xdsinternal "google.golang.org/grpc/xds/internal" +) + +func init() { + balancer.Register(&lrsBB{}) +} + +const lrsBalancerName = "lrs_experimental" + +type lrsBB struct{} + +func (l *lrsBB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer { + b := &lrsBalancer{ + cc: cc, + buildOpts: opts, + } + b.loadStore = NewStore() + b.c = newXDSClientWrapper(b.loadStore) + b.logger = prefixLogger(b) + b.logger.Infof("Created") + return b +} + +func (l *lrsBB) Name() string { + return lrsBalancerName +} + +func (l *lrsBB) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) { + return parseConfig(c) +} + +type lrsBalancer struct { + cc balancer.ClientConn + buildOpts balancer.BuildOptions + + logger *grpclog.PrefixLogger + loadStore Store + c *xdsClientWrapper + + config *lbConfig + b balancer.Balancer // The sub balancer. +} + +func (b *lrsBalancer) UpdateClientConnState(s balancer.ClientConnState) error { + newConfig, ok := s.BalancerConfig.(*lbConfig) + if !ok { + return fmt.Errorf("unexpected balancer config with type: %T", s.BalancerConfig) + } + + // If child policy is a different type, recreate the sub-balancer. 
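+ // The comparison is by registered policy name only: when the name is
+ // unchanged, the existing sub-balancer is reused and receives the new
+ // child config via UpdateClientConnState below; when it changes, the old
+ // sub-balancer is closed and a new one is built against the ccWrapper so
+ // that its pickers keep reporting load for the configured locality.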
+ if b.config == nil || b.config.ChildPolicy.Name != newConfig.ChildPolicy.Name { + bb := balancer.Get(newConfig.ChildPolicy.Name) + if bb == nil { + return fmt.Errorf("balancer %q not registered", newConfig.ChildPolicy.Name) + } + if b.b != nil { + b.b.Close() + } + b.b = bb.Build(newCCWrapper(b.cc, b.loadStore, newConfig.Locality), b.buildOpts) + } + // Update load reporting config or xds client. + b.c.update(newConfig, s.ResolverState.Attributes) + b.config = newConfig + + // Addresses and sub-balancer config are sent to sub-balancer. + return b.b.UpdateClientConnState(balancer.ClientConnState{ + ResolverState: s.ResolverState, + BalancerConfig: b.config.ChildPolicy.Config, + }) +} + +func (b *lrsBalancer) ResolverError(err error) { + if b.b != nil { + b.b.ResolverError(err) + } +} + +func (b *lrsBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.SubConnState) { + if b.b != nil { + b.b.UpdateSubConnState(sc, s) + } +} + +func (b *lrsBalancer) Close() { + if b.b != nil { + b.b.Close() + b.b = nil + } + b.c.close() +} + +type ccWrapper struct { + balancer.ClientConn + loadStore Store + localityID *internal.LocalityID +} + +func newCCWrapper(cc balancer.ClientConn, loadStore Store, localityID *internal.LocalityID) *ccWrapper { + return &ccWrapper{ + ClientConn: cc, + loadStore: loadStore, + localityID: localityID, + } +} + +func (ccw *ccWrapper) UpdateState(s balancer.State) { + s.Picker = newLoadReportPicker(s.Picker, *ccw.localityID, ccw.loadStore) + ccw.ClientConn.UpdateState(s) +} + +// xdsClientInterface contains only the xds_client methods needed by LRS +// balancer. It's defined so we can override xdsclient in tests. +type xdsClientInterface interface { + ReportLoad(server string, clusterName string, loadStore Store) (cancel func()) + Close() +} + +type xdsClientWrapper struct { + loadStore Store + + c xdsClientInterface + cancelLoadReport func() + clusterName string + lrsServerName string +} + +func newXDSClientWrapper(loadStore Store) *xdsClientWrapper { + return &xdsClientWrapper{ + loadStore: loadStore, + } +} + +// update checks the config and xdsclient, and decide whether it needs to +// restart the load reporting stream. +// +// TODO: refactor lrs to share one stream instead of one per EDS. +func (w *xdsClientWrapper) update(newConfig *lbConfig, attr *attributes.Attributes) { + var restartLoadReport bool + if attr != nil { + if clientFromAttr, _ := attr.Value(xdsinternal.XDSClientID).(xdsClientInterface); clientFromAttr != nil { + if w.c != clientFromAttr { + // xds client is different, restart. + restartLoadReport = true + w.c = clientFromAttr + } + } + } + + // ClusterName is different, restart. ClusterName is from ClusterName and + // EdsServiceName. + // + // TODO: LRS request actually has separate fields from these two values. + // Update lrs.Store to set both. + newClusterName := newConfig.EdsServiceName + if newClusterName == "" { + newClusterName = newConfig.ClusterName + } + if w.clusterName != newClusterName { + restartLoadReport = true + w.clusterName = newClusterName + } + + if w.lrsServerName != newConfig.LrsLoadReportingServerName { + // LrsLoadReportingServerName is different, load should be report to a + // different server, restart. 
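+ // (A restart below cancels the previous load reporting stream, if any,
+ // and calls ReportLoad again with the updated server and cluster names.)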
+ restartLoadReport = true + w.lrsServerName = newConfig.LrsLoadReportingServerName + } + + if restartLoadReport { + if w.cancelLoadReport != nil { + w.cancelLoadReport() + w.cancelLoadReport = nil + } + if w.c != nil { + w.cancelLoadReport = w.c.ReportLoad(w.lrsServerName, w.clusterName, w.loadStore) + } + } +} + +func (w *xdsClientWrapper) close() { + if w.cancelLoadReport != nil { + w.cancelLoadReport() + w.cancelLoadReport = nil + } +} diff --git a/xds/internal/balancer/lrs/config.go b/xds/internal/balancer/lrs/config.go new file mode 100644 index 00000000000..1f4e15e2630 --- /dev/null +++ b/xds/internal/balancer/lrs/config.go @@ -0,0 +1,44 @@ +/* + * + * Copyright 2020 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package lrs + +import ( + "encoding/json" + + internalserviceconfig "google.golang.org/grpc/internal/serviceconfig" + "google.golang.org/grpc/serviceconfig" + "google.golang.org/grpc/xds/internal" +) + +type lbConfig struct { + serviceconfig.LoadBalancingConfig + ClusterName string + EdsServiceName string + LrsLoadReportingServerName string + Locality *internal.LocalityID + ChildPolicy *internalserviceconfig.BalancerConfig +} + +func parseConfig(c json.RawMessage) (*lbConfig, error) { + var cfg lbConfig + if err := json.Unmarshal(c, &cfg); err != nil { + return nil, err + } + return &cfg, nil +} diff --git a/xds/internal/balancer/lrs/config_test.go b/xds/internal/balancer/lrs/config_test.go new file mode 100644 index 00000000000..a650db34eb9 --- /dev/null +++ b/xds/internal/balancer/lrs/config_test.go @@ -0,0 +1,86 @@ +/* + * + * Copyright 2020 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package lrs + +import ( + "testing" + + "github.com/google/go-cmp/cmp" + "google.golang.org/grpc/balancer/roundrobin" + internalserviceconfig "google.golang.org/grpc/internal/serviceconfig" + xdsinternal "google.golang.org/grpc/xds/internal" +) + +const ( + testClusterName = "test-cluster" + testServiceName = "test-eds-service" + testLRSServerName = "test-lrs-name" +) + +func TestParseConfig(t *testing.T) { + tests := []struct { + name string + js string + want *lbConfig + wantErr bool + }{ + { + name: "temp", + js: `{ + "clusterName": "test-cluster", + "edsServiceName": "test-eds-service", + "lrsLoadReportingServerName": "test-lrs-name", + "locality": { + "region": "test-region", + "zone": "test-zone", + "subZone": "test-sub-zone" + }, + "childPolicy":[{"round_robin":{}}] +} + `, + want: &lbConfig{ + ClusterName: testClusterName, + EdsServiceName: testServiceName, + LrsLoadReportingServerName: testLRSServerName, + Locality: &xdsinternal.LocalityID{ + Region: "test-region", + Zone: "test-zone", + SubZone: "test-sub-zone", + }, + ChildPolicy: &internalserviceconfig.BalancerConfig{ + Name: roundrobin.Name, + Config: nil, + }, + }, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := parseConfig([]byte(tt.js)) + if (err != nil) != tt.wantErr { + t.Errorf("parseConfig() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !cmp.Equal(got, tt.want) { + t.Errorf("parseConfig() got = %v, want %v, diff: %s", got, tt.want, cmp.Diff(got, tt.want)) + } + }) + } +} diff --git a/xds/internal/balancer/lrs/logging.go b/xds/internal/balancer/lrs/logging.go new file mode 100644 index 00000000000..602dac09959 --- /dev/null +++ b/xds/internal/balancer/lrs/logging.go @@ -0,0 +1,34 @@ +/* + * + * Copyright 2020 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package lrs + +import ( + "fmt" + + "google.golang.org/grpc/grpclog" + internalgrpclog "google.golang.org/grpc/internal/grpclog" +) + +const prefix = "[lrs-lb %p] " + +var logger = grpclog.Component("xds") + +func prefixLogger(p *lrsBalancer) *internalgrpclog.PrefixLogger { + return internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(prefix, p)) +} diff --git a/xds/internal/balancer/lrs/lrs_test.go b/xds/internal/balancer/lrs/lrs_test.go index b18c3d7e218..5952d8c7da0 100644 --- a/xds/internal/balancer/lrs/lrs_test.go +++ b/xds/internal/balancer/lrs/lrs_test.go @@ -13,505 +13,159 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. 
+ * */ package lrs import ( - "context" "fmt" - "io" - "net" - "sort" - "sync" "testing" "time" - corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" - endpointpb "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint" - lrsgrpc "github.com/envoyproxy/go-control-plane/envoy/service/load_stats/v2" - lrspb "github.com/envoyproxy/go-control-plane/envoy/service/load_stats/v2" - "github.com/golang/protobuf/proto" - durationpb "github.com/golang/protobuf/ptypes/duration" - structpb "github.com/golang/protobuf/ptypes/struct" "github.com/google/go-cmp/cmp" - "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" - "google.golang.org/grpc/xds/internal" + "google.golang.org/grpc/attributes" + "google.golang.org/grpc/balancer" + "google.golang.org/grpc/balancer/roundrobin" + "google.golang.org/grpc/connectivity" + internalserviceconfig "google.golang.org/grpc/internal/serviceconfig" + "google.golang.org/grpc/resolver" + xdsinternal "google.golang.org/grpc/xds/internal" "google.golang.org/grpc/xds/internal/testutils" ) -const ( - testService = "grpc.service.test" - testHostname = "grpc.server.name" - nodeMetadataHostnameKey = "PROXYLESS_CLIENT_HOSTNAME" -) - var ( - dropCategories = []string{"drop_for_real", "drop_for_fun"} - localities = []internal.LocalityID{{Region: "a"}, {Region: "b"}} - errTest = fmt.Errorf("test error") + testBackendAddrs = []resolver.Address{ + {Addr: "1.1.1.1:1"}, + } + testLocality = &xdsinternal.LocalityID{ + Region: "test-region", + Zone: "test-zone", + SubZone: "test-sub-zone", + } ) -type rpcCountDataForTest struct { - succeeded uint64 - errored uint64 - inProgress uint64 - serverLoads map[string]float64 +// This is a subset of testutils.fakeclient. Cannot use testutils.fakeclient +// because testutils imports package lrs. +type fakeXDSClient struct { + loadReportCh chan *reportLoadArgs } -func newRPCCountDataForTest(succeeded, errored, inprogress uint64, serverLoads map[string]float64) *rpcCountDataForTest { - return &rpcCountDataForTest{ - succeeded: succeeded, - errored: errored, - inProgress: inprogress, - serverLoads: serverLoads, +func newFakeXDSClient() *fakeXDSClient { + return &fakeXDSClient{ + loadReportCh: make(chan *reportLoadArgs, 10), } } -// Equal() is needed to compare unexported fields. -func (rcd *rpcCountDataForTest) Equal(b *rpcCountDataForTest) bool { - return rcd.inProgress == b.inProgress && - rcd.errored == b.errored && - rcd.succeeded == b.succeeded && - cmp.Equal(rcd.serverLoads, b.serverLoads) +// reportLoadArgs wraps the arguments passed to ReportLoad. +type reportLoadArgs struct { + // server is the name of the server to which the load is reported. + server string + // cluster is the name of the cluster for which load is reported. + cluster string + // loadStore is the store where loads are stored. + loadStore interface{} } -// equalClusterStats sorts requests and clear report internal before comparing. 
-func equalClusterStats(a, b []*endpointpb.ClusterStats) bool { - for _, t := range [][]*endpointpb.ClusterStats{a, b} { - for _, s := range t { - sort.Slice(s.DroppedRequests, func(i, j int) bool { - return s.DroppedRequests[i].Category < s.DroppedRequests[j].Category - }) - sort.Slice(s.UpstreamLocalityStats, func(i, j int) bool { - return s.UpstreamLocalityStats[i].Locality.String() < s.UpstreamLocalityStats[j].Locality.String() - }) - for _, us := range s.UpstreamLocalityStats { - sort.Slice(us.LoadMetricStats, func(i, j int) bool { - return us.LoadMetricStats[i].MetricName < us.LoadMetricStats[j].MetricName - }) - } - s.LoadReportInterval = nil - } - } - return cmp.Equal(a, b, cmp.Comparer(proto.Equal)) +// ReportLoad starts reporting load about clusterName to server. +func (xdsC *fakeXDSClient) ReportLoad(server string, clusterName string, loadStore Store) (cancel func()) { + xdsC.loadReportCh <- &reportLoadArgs{server: server, cluster: clusterName, loadStore: loadStore} + return func() {} } -func Test_lrsStore_buildStats_drops(t *testing.T) { - tests := []struct { - name string - drops []map[string]uint64 - }{ - { - name: "one drop report", - drops: []map[string]uint64{{ - dropCategories[0]: 31, - dropCategories[1]: 41, - }}, - }, - { - name: "two drop reports", - drops: []map[string]uint64{{ - dropCategories[0]: 31, - dropCategories[1]: 41, - }, { - dropCategories[0]: 59, - dropCategories[1]: 26, - }}, - }, - { - name: "no empty report", - drops: []map[string]uint64{{ - dropCategories[0]: 31, - dropCategories[1]: 41, - }, { - dropCategories[0]: 0, // This shouldn't cause an empty report for category[0]. - dropCategories[1]: 26, - }}, - }, +// waitForReportLoad waits for ReportLoad to be invoked on this client within a +// reasonable timeout, and returns the arguments passed to it. +func (xdsC *fakeXDSClient) waitForReportLoad() (*reportLoadArgs, error) { + select { + case <-time.After(time.Second): + return nil, fmt.Errorf("timeout") + case a := <-xdsC.loadReportCh: + return a, nil } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - ls := NewStore().(*lrsStore) - - for _, ds := range tt.drops { - var ( - totalDropped uint64 - droppedReqs []*endpointpb.ClusterStats_DroppedRequests - ) - for cat, count := range ds { - if count == 0 { - continue - } - totalDropped += count - droppedReqs = append(droppedReqs, &endpointpb.ClusterStats_DroppedRequests{ - Category: cat, - DroppedCount: count, - }) - } - want := []*endpointpb.ClusterStats{ - { - ClusterName: testService, - TotalDroppedRequests: totalDropped, - DroppedRequests: droppedReqs, - }, - } - - var wg sync.WaitGroup - for c, count := range ds { - for i := 0; i < int(count); i++ { - wg.Add(1) - go func(i int, c string) { - ls.CallDropped(c) - wg.Done() - }(i, c) - } - } - wg.Wait() +} - if got := ls.buildStats(testService); !equalClusterStats(got, want) { - t.Errorf("lrsStore.buildStats() = %v, want %v", got, want) - t.Errorf("%s", cmp.Diff(got, want)) - } - } - }) - } +// Close closes the xds client. +func (xdsC *fakeXDSClient) Close() { } -func Test_lrsStore_buildStats_rpcCounts(t *testing.T) { - tests := []struct { - name string - rpcs []map[internal.LocalityID]struct { - start, success, failure uint64 - serverData map[string]float64 // Will be reported with successful RPCs. 
- } - }{ - { - name: "one rpcCount report", - rpcs: []map[internal.LocalityID]struct { - start, success, failure uint64 - serverData map[string]float64 - }{{ - localities[0]: {8, 3, 1, nil}, - }}, - }, - { - name: "two localities one rpcCount report", - rpcs: []map[internal.LocalityID]struct { - start, success, failure uint64 - serverData map[string]float64 - }{{ - localities[0]: {8, 3, 1, nil}, - localities[1]: {15, 1, 5, nil}, - }}, - }, - { - name: "three rpcCount reports", - rpcs: []map[internal.LocalityID]struct { - start, success, failure uint64 - serverData map[string]float64 - }{{ - localities[0]: {8, 3, 1, nil}, - localities[1]: {15, 1, 5, nil}, - }, { - localities[0]: {8, 3, 1, nil}, - }, { - localities[1]: {15, 1, 5, nil}, - }}, - }, - { - name: "no empty report", - rpcs: []map[internal.LocalityID]struct { - start, success, failure uint64 - serverData map[string]float64 - }{{ - localities[0]: {4, 3, 1, nil}, - localities[1]: {7, 1, 5, nil}, - }, { - localities[0]: {0, 0, 0, nil}, // This shouldn't cause an empty report for locality[0]. - localities[1]: {1, 1, 0, nil}, - }}, +// TestLoadReporting verifies that the lrs balancer starts the loadReport +// stream when the lbConfig passed to it contains a valid value for the LRS +// server (empty string). +func TestLoadReporting(t *testing.T) { + builder := balancer.Get(lrsBalancerName) + cc := testutils.NewTestClientConn(t) + lrsB := builder.Build(cc, balancer.BuildOptions{}) + defer lrsB.Close() + + xdsC := newFakeXDSClient() + if err := lrsB.UpdateClientConnState(balancer.ClientConnState{ + ResolverState: resolver.State{ + Addresses: testBackendAddrs, + Attributes: attributes.New(xdsinternal.XDSClientID, xdsC), }, - { - name: "two localities one report with server loads", - rpcs: []map[internal.LocalityID]struct { - start, success, failure uint64 - serverData map[string]float64 - }{{ - localities[0]: {8, 3, 1, map[string]float64{"cpu": 15, "mem": 20}}, - localities[1]: {15, 4, 5, map[string]float64{"net": 5, "disk": 0.8}}, - }}, - }, - { - name: "three reports with server loads", - rpcs: []map[internal.LocalityID]struct { - start, success, failure uint64 - serverData map[string]float64 - }{{ - localities[0]: {8, 3, 1, map[string]float64{"cpu": 15, "mem": 20}}, - localities[1]: {15, 4, 5, map[string]float64{"net": 5, "disk": 0.8}}, - }, { - localities[0]: {8, 3, 1, map[string]float64{"cpu": 1, "mem": 2}}, - }, { - localities[1]: {15, 4, 5, map[string]float64{"net": 13, "disk": 1.4}}, - }}, + BalancerConfig: &lbConfig{ + EdsServiceName: testClusterName, + LrsLoadReportingServerName: testLRSServerName, + Locality: testLocality, + ChildPolicy: &internalserviceconfig.BalancerConfig{ + Name: roundrobin.Name, + }, }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - ls := NewStore().(*lrsStore) - - // InProgress count doesn't get cleared at each buildStats, keep - // them to carry over. 
- inProgressCounts := make(map[internal.LocalityID]uint64) - - for _, counts := range tt.rpcs { - var upstreamLocalityStats []*endpointpb.UpstreamLocalityStats - - for l, count := range counts { - tempInProgress := count.start - count.success - count.failure + inProgressCounts[l] - inProgressCounts[l] = tempInProgress - if count.success == 0 && tempInProgress == 0 && count.failure == 0 { - continue - } - var loadMetricStats []*endpointpb.EndpointLoadMetricStats - for n, d := range count.serverData { - loadMetricStats = append(loadMetricStats, - &endpointpb.EndpointLoadMetricStats{ - MetricName: n, - NumRequestsFinishedWithMetric: count.success, - TotalMetricValue: d * float64(count.success), - }, - ) - } - upstreamLocalityStats = append(upstreamLocalityStats, &endpointpb.UpstreamLocalityStats{ - Locality: testutils.LocalityIDToProto(l), - TotalSuccessfulRequests: count.success, - TotalRequestsInProgress: tempInProgress, - TotalErrorRequests: count.failure, - LoadMetricStats: loadMetricStats, - }) - } - // InProgress count doesn't get cleared at each buildStats, and - // needs to be carried over to the next result. - for l, c := range inProgressCounts { - if _, ok := counts[l]; !ok { - upstreamLocalityStats = append(upstreamLocalityStats, &endpointpb.UpstreamLocalityStats{ - Locality: testutils.LocalityIDToProto(l), - TotalRequestsInProgress: c, - }) - } - } - want := []*endpointpb.ClusterStats{ - { - ClusterName: testService, - UpstreamLocalityStats: upstreamLocalityStats, - }, - } - - var wg sync.WaitGroup - for l, count := range counts { - for i := 0; i < int(count.success); i++ { - wg.Add(1) - go func(l internal.LocalityID, serverData map[string]float64) { - ls.CallStarted(l) - ls.CallFinished(l, nil) - for n, d := range serverData { - ls.CallServerLoad(l, n, d) - } - wg.Done() - }(l, count.serverData) - } - for i := 0; i < int(count.failure); i++ { - wg.Add(1) - go func(l internal.LocalityID) { - ls.CallStarted(l) - ls.CallFinished(l, errTest) - wg.Done() - }(l) - } - for i := 0; i < int(count.start-count.success-count.failure); i++ { - wg.Add(1) - go func(l internal.LocalityID) { - ls.CallStarted(l) - wg.Done() - }(l) - } - } - wg.Wait() - - if got := ls.buildStats(testService); !equalClusterStats(got, want) { - t.Errorf("lrsStore.buildStats() = %v, want %v", got, want) - t.Errorf("%s", cmp.Diff(got, want)) - } - } - }) - } -} - -type lrsServer struct { - reportingInterval *durationpb.Duration - - mu sync.Mutex - dropTotal uint64 - drops map[string]uint64 - rpcs map[internal.LocalityID]*rpcCountDataForTest -} - -func (lrss *lrsServer) StreamLoadStats(stream lrsgrpc.LoadReportingService_StreamLoadStatsServer) error { - req, err := stream.Recv() - if err != nil { - return err - } - - if req.GetNode().GetMetadata().GetFields()[nodeMetadataHostnameKey].GetStringValue() != testHostname { - return status.Errorf(codes.FailedPrecondition, "unexpected req: %+v", req) - } - if err := stream.Send(&lrspb.LoadStatsResponse{ - Clusters: []string{testService, "another-cluster"}, - LoadReportingInterval: lrss.reportingInterval, }); err != nil { - return err + t.Fatalf("unexpected error from UpdateClientConnState: %v", err) } - for { - req, err := stream.Recv() - if err != nil { - if err == io.EOF { - return nil - } - return err - } - stats := req.ClusterStats[0] - lrss.mu.Lock() - lrss.dropTotal += stats.TotalDroppedRequests - for _, d := range stats.DroppedRequests { - lrss.drops[d.Category] += d.DroppedCount - } - for _, ss := range stats.UpstreamLocalityStats { - l := internal.LocalityID{ - 
Region: ss.Locality.Region, - Zone: ss.Locality.Zone, - SubZone: ss.Locality.SubZone, - } - counts, ok := lrss.rpcs[l] - if !ok { - counts = newRPCCountDataForTest(0, 0, 0, nil) - lrss.rpcs[l] = counts - } - counts.succeeded += ss.TotalSuccessfulRequests - counts.inProgress = ss.TotalRequestsInProgress - counts.errored += ss.TotalErrorRequests - for _, ts := range ss.LoadMetricStats { - if counts.serverLoads == nil { - counts.serverLoads = make(map[string]float64) - } - counts.serverLoads[ts.MetricName] = ts.TotalMetricValue / float64(ts.NumRequestsFinishedWithMetric) - } - } - lrss.mu.Unlock() - } -} - -func setupServer(t *testing.T, reportingInterval *durationpb.Duration) (addr string, lrss *lrsServer, cleanup func()) { - lis, err := net.Listen("tcp", "localhost:0") + got, err := xdsC.waitForReportLoad() if err != nil { - t.Fatalf("listen failed due to: %v", err) + t.Fatalf("xdsClient.ReportLoad failed with error: %v", err) } - svr := grpc.NewServer() - lrss = &lrsServer{ - reportingInterval: reportingInterval, - drops: make(map[string]uint64), - rpcs: make(map[internal.LocalityID]*rpcCountDataForTest), + if got.server != testLRSServerName || got.cluster != testClusterName { + t.Fatalf("xdsClient.ReportLoad called with {%q, %q}: want {%q, %q}", got.server, got.cluster, testLRSServerName, testClusterName) } - lrsgrpc.RegisterLoadReportingServiceServer(svr, lrss) - go svr.Serve(lis) - return lis.Addr().String(), lrss, func() { - svr.Stop() - lis.Close() - } -} -func Test_lrsStore_ReportTo(t *testing.T) { - const intervalNano = 1000 * 1000 * 50 - addr, lrss, cleanup := setupServer(t, &durationpb.Duration{ - Seconds: 0, - Nanos: intervalNano, - }) - defer cleanup() + sc1 := <-cc.NewSubConnCh + lrsB.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + lrsB.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready}) - ls := NewStore() - cc, err := grpc.Dial(addr, grpc.WithInsecure()) - if err != nil { - t.Fatalf("failed to dial: %v", err) - } - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - done := make(chan struct{}) - go func() { - node := &corepb.Node{ - Metadata: &structpb.Struct{ - Fields: map[string]*structpb.Value{ - nodeMetadataHostnameKey: { - Kind: &structpb.Value_StringValue{StringValue: testHostname}, - }, - }, - }, + // Test pick with one backend. 
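+ // Run a fixed number of successful and failed picks through the
+ // load-report picker, completing each with Done, then read the
+ // per-locality counters back out of the store to verify that both
+ // outcomes were recorded for the test locality.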
+ p1 := <-cc.NewPickerCh + const successCount = 5 + for i := 0; i < successCount; i++ { + gotSCSt, _ := p1.Pick(balancer.PickInfo{}) + if !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) { + t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc1) } - ls.ReportTo(ctx, cc, testService, node) - close(done) - }() - - drops := map[string]uint64{ - dropCategories[0]: 13, - dropCategories[1]: 14, + gotSCSt.Done(balancer.DoneInfo{}) } - for c, d := range drops { - for i := 0; i < int(d); i++ { - ls.CallDropped(c) - time.Sleep(time.Nanosecond * intervalNano / 10) + const errorCount = 5 + for i := 0; i < errorCount; i++ { + gotSCSt, _ := p1.Pick(balancer.PickInfo{}) + if !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) { + t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc1) } + gotSCSt.Done(balancer.DoneInfo{Err: fmt.Errorf("error")}) } - rpcs := map[internal.LocalityID]*rpcCountDataForTest{ - localities[0]: newRPCCountDataForTest(3, 1, 4, nil), - localities[1]: newRPCCountDataForTest(1, 5, 9, map[string]float64{"pi": 3.14, "e": 2.71}), - } - for l, count := range rpcs { - for i := 0; i < int(count.succeeded); i++ { - go func(i int, l internal.LocalityID, count *rpcCountDataForTest) { - ls.CallStarted(l) - ls.CallFinished(l, nil) - for n, d := range count.serverLoads { - ls.CallServerLoad(l, n, d) - } - }(i, l, count) - } - for i := 0; i < int(count.inProgress); i++ { - go func(i int, l internal.LocalityID) { - ls.CallStarted(l) - }(i, l) - } - for i := 0; i < int(count.errored); i++ { - go func(i int, l internal.LocalityID) { - ls.CallStarted(l) - ls.CallFinished(l, errTest) - }(i, l) - } - } + loads := make(map[xdsinternal.LocalityID]*rpcCountData) - time.Sleep(time.Nanosecond * intervalNano * 2) - cancel() - <-done + got.loadStore.(*lrsStore).localityRPCCount.Range( + func(key, value interface{}) bool { + loads[key.(xdsinternal.LocalityID)] = value.(*rpcCountData) + return true + }, + ) - lrss.mu.Lock() - defer lrss.mu.Unlock() - if !cmp.Equal(lrss.drops, drops) { - t.Errorf("different: %v", cmp.Diff(lrss.drops, drops)) + countData, ok := loads[*testLocality] + if !ok { + t.Fatalf("loads for %v not found in store", testLocality) + } + if *countData.succeeded != successCount { + t.Errorf("got succeeded %v, want %v", *countData.succeeded, successCount) + } + if *countData.errored != errorCount { + t.Errorf("got errord %v, want %v", *countData.errored, errorCount) } - if !cmp.Equal(lrss.rpcs, rpcs) { - t.Errorf("different: %v", cmp.Diff(lrss.rpcs, rpcs)) + if *countData.inProgress != 0 { + t.Errorf("got inProgress %v, want %v", *countData.inProgress, 0) } } diff --git a/xds/internal/balancer/lrs/picker.go b/xds/internal/balancer/lrs/picker.go new file mode 100644 index 00000000000..c4d6a4fa395 --- /dev/null +++ b/xds/internal/balancer/lrs/picker.go @@ -0,0 +1,70 @@ +/* + * + * Copyright 2020 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package lrs + +import ( + orcapb "github.com/cncf/udpa/go/udpa/data/orca/v1" + "google.golang.org/grpc/balancer" + "google.golang.org/grpc/xds/internal" +) + +const ( + serverLoadCPUName = "cpu_utilization" + serverLoadMemoryName = "mem_utilization" +) + +type loadReportPicker struct { + p balancer.Picker + + id internal.LocalityID + loadStore Store +} + +func newLoadReportPicker(p balancer.Picker, id internal.LocalityID, loadStore Store) *loadReportPicker { + return &loadReportPicker{ + p: p, + id: id, + loadStore: loadStore, + } +} + +func (lrp *loadReportPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { + res, err := lrp.p.Pick(info) + if lrp.loadStore != nil && err == nil { + lrp.loadStore.CallStarted(lrp.id) + td := res.Done + res.Done = func(info balancer.DoneInfo) { + lrp.loadStore.CallFinished(lrp.id, info.Err) + if load, ok := info.ServerLoad.(*orcapb.OrcaLoadReport); ok { + lrp.loadStore.CallServerLoad(lrp.id, serverLoadCPUName, load.CpuUtilization) + lrp.loadStore.CallServerLoad(lrp.id, serverLoadMemoryName, load.MemUtilization) + for n, d := range load.RequestCost { + lrp.loadStore.CallServerLoad(lrp.id, n, d) + } + for n, d := range load.Utilization { + lrp.loadStore.CallServerLoad(lrp.id, n, d) + } + } + if td != nil { + td(info) + } + } + } + return res, err +} diff --git a/xds/internal/balancer/lrs/lrs.go b/xds/internal/balancer/lrs/store.go similarity index 99% rename from xds/internal/balancer/lrs/lrs.go rename to xds/internal/balancer/lrs/store.go index 4bc20ef6e12..680c6f5a2c1 100644 --- a/xds/internal/balancer/lrs/lrs.go +++ b/xds/internal/balancer/lrs/store.go @@ -29,15 +29,12 @@ import ( lrspb "github.com/envoyproxy/go-control-plane/envoy/service/load_stats/v2" "github.com/golang/protobuf/ptypes" "google.golang.org/grpc" - "google.golang.org/grpc/grpclog" "google.golang.org/grpc/internal/backoff" "google.golang.org/grpc/xds/internal" ) const negativeOneUInt64 = ^uint64(0) -var logger = grpclog.Component("xds") - // Store defines the interface for a load store. It keeps loads and can report // them to a server when requested. type Store interface { diff --git a/xds/internal/balancer/lrs/store_test.go b/xds/internal/balancer/lrs/store_test.go new file mode 100644 index 00000000000..b18c3d7e218 --- /dev/null +++ b/xds/internal/balancer/lrs/store_test.go @@ -0,0 +1,517 @@ +/* + * + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package lrs + +import ( + "context" + "fmt" + "io" + "net" + "sort" + "sync" + "testing" + "time" + + corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" + endpointpb "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint" + lrsgrpc "github.com/envoyproxy/go-control-plane/envoy/service/load_stats/v2" + lrspb "github.com/envoyproxy/go-control-plane/envoy/service/load_stats/v2" + "github.com/golang/protobuf/proto" + durationpb "github.com/golang/protobuf/ptypes/duration" + structpb "github.com/golang/protobuf/ptypes/struct" + "github.com/google/go-cmp/cmp" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/grpc/xds/internal" + "google.golang.org/grpc/xds/internal/testutils" +) + +const ( + testService = "grpc.service.test" + testHostname = "grpc.server.name" + nodeMetadataHostnameKey = "PROXYLESS_CLIENT_HOSTNAME" +) + +var ( + dropCategories = []string{"drop_for_real", "drop_for_fun"} + localities = []internal.LocalityID{{Region: "a"}, {Region: "b"}} + errTest = fmt.Errorf("test error") +) + +type rpcCountDataForTest struct { + succeeded uint64 + errored uint64 + inProgress uint64 + serverLoads map[string]float64 +} + +func newRPCCountDataForTest(succeeded, errored, inprogress uint64, serverLoads map[string]float64) *rpcCountDataForTest { + return &rpcCountDataForTest{ + succeeded: succeeded, + errored: errored, + inProgress: inprogress, + serverLoads: serverLoads, + } +} + +// Equal() is needed to compare unexported fields. +func (rcd *rpcCountDataForTest) Equal(b *rpcCountDataForTest) bool { + return rcd.inProgress == b.inProgress && + rcd.errored == b.errored && + rcd.succeeded == b.succeeded && + cmp.Equal(rcd.serverLoads, b.serverLoads) +} + +// equalClusterStats sorts requests and clear report internal before comparing. +func equalClusterStats(a, b []*endpointpb.ClusterStats) bool { + for _, t := range [][]*endpointpb.ClusterStats{a, b} { + for _, s := range t { + sort.Slice(s.DroppedRequests, func(i, j int) bool { + return s.DroppedRequests[i].Category < s.DroppedRequests[j].Category + }) + sort.Slice(s.UpstreamLocalityStats, func(i, j int) bool { + return s.UpstreamLocalityStats[i].Locality.String() < s.UpstreamLocalityStats[j].Locality.String() + }) + for _, us := range s.UpstreamLocalityStats { + sort.Slice(us.LoadMetricStats, func(i, j int) bool { + return us.LoadMetricStats[i].MetricName < us.LoadMetricStats[j].MetricName + }) + } + s.LoadReportInterval = nil + } + } + return cmp.Equal(a, b, cmp.Comparer(proto.Equal)) +} + +func Test_lrsStore_buildStats_drops(t *testing.T) { + tests := []struct { + name string + drops []map[string]uint64 + }{ + { + name: "one drop report", + drops: []map[string]uint64{{ + dropCategories[0]: 31, + dropCategories[1]: 41, + }}, + }, + { + name: "two drop reports", + drops: []map[string]uint64{{ + dropCategories[0]: 31, + dropCategories[1]: 41, + }, { + dropCategories[0]: 59, + dropCategories[1]: 26, + }}, + }, + { + name: "no empty report", + drops: []map[string]uint64{{ + dropCategories[0]: 31, + dropCategories[1]: 41, + }, { + dropCategories[0]: 0, // This shouldn't cause an empty report for category[0]. 
+ dropCategories[1]: 26, + }}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ls := NewStore().(*lrsStore) + + for _, ds := range tt.drops { + var ( + totalDropped uint64 + droppedReqs []*endpointpb.ClusterStats_DroppedRequests + ) + for cat, count := range ds { + if count == 0 { + continue + } + totalDropped += count + droppedReqs = append(droppedReqs, &endpointpb.ClusterStats_DroppedRequests{ + Category: cat, + DroppedCount: count, + }) + } + want := []*endpointpb.ClusterStats{ + { + ClusterName: testService, + TotalDroppedRequests: totalDropped, + DroppedRequests: droppedReqs, + }, + } + + var wg sync.WaitGroup + for c, count := range ds { + for i := 0; i < int(count); i++ { + wg.Add(1) + go func(i int, c string) { + ls.CallDropped(c) + wg.Done() + }(i, c) + } + } + wg.Wait() + + if got := ls.buildStats(testService); !equalClusterStats(got, want) { + t.Errorf("lrsStore.buildStats() = %v, want %v", got, want) + t.Errorf("%s", cmp.Diff(got, want)) + } + } + }) + } +} + +func Test_lrsStore_buildStats_rpcCounts(t *testing.T) { + tests := []struct { + name string + rpcs []map[internal.LocalityID]struct { + start, success, failure uint64 + serverData map[string]float64 // Will be reported with successful RPCs. + } + }{ + { + name: "one rpcCount report", + rpcs: []map[internal.LocalityID]struct { + start, success, failure uint64 + serverData map[string]float64 + }{{ + localities[0]: {8, 3, 1, nil}, + }}, + }, + { + name: "two localities one rpcCount report", + rpcs: []map[internal.LocalityID]struct { + start, success, failure uint64 + serverData map[string]float64 + }{{ + localities[0]: {8, 3, 1, nil}, + localities[1]: {15, 1, 5, nil}, + }}, + }, + { + name: "three rpcCount reports", + rpcs: []map[internal.LocalityID]struct { + start, success, failure uint64 + serverData map[string]float64 + }{{ + localities[0]: {8, 3, 1, nil}, + localities[1]: {15, 1, 5, nil}, + }, { + localities[0]: {8, 3, 1, nil}, + }, { + localities[1]: {15, 1, 5, nil}, + }}, + }, + { + name: "no empty report", + rpcs: []map[internal.LocalityID]struct { + start, success, failure uint64 + serverData map[string]float64 + }{{ + localities[0]: {4, 3, 1, nil}, + localities[1]: {7, 1, 5, nil}, + }, { + localities[0]: {0, 0, 0, nil}, // This shouldn't cause an empty report for locality[0]. + localities[1]: {1, 1, 0, nil}, + }}, + }, + { + name: "two localities one report with server loads", + rpcs: []map[internal.LocalityID]struct { + start, success, failure uint64 + serverData map[string]float64 + }{{ + localities[0]: {8, 3, 1, map[string]float64{"cpu": 15, "mem": 20}}, + localities[1]: {15, 4, 5, map[string]float64{"net": 5, "disk": 0.8}}, + }}, + }, + { + name: "three reports with server loads", + rpcs: []map[internal.LocalityID]struct { + start, success, failure uint64 + serverData map[string]float64 + }{{ + localities[0]: {8, 3, 1, map[string]float64{"cpu": 15, "mem": 20}}, + localities[1]: {15, 4, 5, map[string]float64{"net": 5, "disk": 0.8}}, + }, { + localities[0]: {8, 3, 1, map[string]float64{"cpu": 1, "mem": 2}}, + }, { + localities[1]: {15, 4, 5, map[string]float64{"net": 13, "disk": 1.4}}, + }}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ls := NewStore().(*lrsStore) + + // InProgress count doesn't get cleared at each buildStats, keep + // them to carry over. 
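+ // (Succeeded/errored counts reset with every report, but in-progress
+ // RPCs stay in progress, so the expected stats must accumulate them
+ // across report rounds.)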
+ inProgressCounts := make(map[internal.LocalityID]uint64) + + for _, counts := range tt.rpcs { + var upstreamLocalityStats []*endpointpb.UpstreamLocalityStats + + for l, count := range counts { + tempInProgress := count.start - count.success - count.failure + inProgressCounts[l] + inProgressCounts[l] = tempInProgress + if count.success == 0 && tempInProgress == 0 && count.failure == 0 { + continue + } + var loadMetricStats []*endpointpb.EndpointLoadMetricStats + for n, d := range count.serverData { + loadMetricStats = append(loadMetricStats, + &endpointpb.EndpointLoadMetricStats{ + MetricName: n, + NumRequestsFinishedWithMetric: count.success, + TotalMetricValue: d * float64(count.success), + }, + ) + } + upstreamLocalityStats = append(upstreamLocalityStats, &endpointpb.UpstreamLocalityStats{ + Locality: testutils.LocalityIDToProto(l), + TotalSuccessfulRequests: count.success, + TotalRequestsInProgress: tempInProgress, + TotalErrorRequests: count.failure, + LoadMetricStats: loadMetricStats, + }) + } + // InProgress count doesn't get cleared at each buildStats, and + // needs to be carried over to the next result. + for l, c := range inProgressCounts { + if _, ok := counts[l]; !ok { + upstreamLocalityStats = append(upstreamLocalityStats, &endpointpb.UpstreamLocalityStats{ + Locality: testutils.LocalityIDToProto(l), + TotalRequestsInProgress: c, + }) + } + } + want := []*endpointpb.ClusterStats{ + { + ClusterName: testService, + UpstreamLocalityStats: upstreamLocalityStats, + }, + } + + var wg sync.WaitGroup + for l, count := range counts { + for i := 0; i < int(count.success); i++ { + wg.Add(1) + go func(l internal.LocalityID, serverData map[string]float64) { + ls.CallStarted(l) + ls.CallFinished(l, nil) + for n, d := range serverData { + ls.CallServerLoad(l, n, d) + } + wg.Done() + }(l, count.serverData) + } + for i := 0; i < int(count.failure); i++ { + wg.Add(1) + go func(l internal.LocalityID) { + ls.CallStarted(l) + ls.CallFinished(l, errTest) + wg.Done() + }(l) + } + for i := 0; i < int(count.start-count.success-count.failure); i++ { + wg.Add(1) + go func(l internal.LocalityID) { + ls.CallStarted(l) + wg.Done() + }(l) + } + } + wg.Wait() + + if got := ls.buildStats(testService); !equalClusterStats(got, want) { + t.Errorf("lrsStore.buildStats() = %v, want %v", got, want) + t.Errorf("%s", cmp.Diff(got, want)) + } + } + }) + } +} + +type lrsServer struct { + reportingInterval *durationpb.Duration + + mu sync.Mutex + dropTotal uint64 + drops map[string]uint64 + rpcs map[internal.LocalityID]*rpcCountDataForTest +} + +func (lrss *lrsServer) StreamLoadStats(stream lrsgrpc.LoadReportingService_StreamLoadStatsServer) error { + req, err := stream.Recv() + if err != nil { + return err + } + + if req.GetNode().GetMetadata().GetFields()[nodeMetadataHostnameKey].GetStringValue() != testHostname { + return status.Errorf(codes.FailedPrecondition, "unexpected req: %+v", req) + } + if err := stream.Send(&lrspb.LoadStatsResponse{ + Clusters: []string{testService, "another-cluster"}, + LoadReportingInterval: lrss.reportingInterval, + }); err != nil { + return err + } + + for { + req, err := stream.Recv() + if err != nil { + if err == io.EOF { + return nil + } + return err + } + stats := req.ClusterStats[0] + lrss.mu.Lock() + lrss.dropTotal += stats.TotalDroppedRequests + for _, d := range stats.DroppedRequests { + lrss.drops[d.Category] += d.DroppedCount + } + for _, ss := range stats.UpstreamLocalityStats { + l := internal.LocalityID{ + Region: ss.Locality.Region, + Zone: ss.Locality.Zone, + SubZone: 
ss.Locality.SubZone, + } + counts, ok := lrss.rpcs[l] + if !ok { + counts = newRPCCountDataForTest(0, 0, 0, nil) + lrss.rpcs[l] = counts + } + counts.succeeded += ss.TotalSuccessfulRequests + counts.inProgress = ss.TotalRequestsInProgress + counts.errored += ss.TotalErrorRequests + for _, ts := range ss.LoadMetricStats { + if counts.serverLoads == nil { + counts.serverLoads = make(map[string]float64) + } + counts.serverLoads[ts.MetricName] = ts.TotalMetricValue / float64(ts.NumRequestsFinishedWithMetric) + } + } + lrss.mu.Unlock() + } +} + +func setupServer(t *testing.T, reportingInterval *durationpb.Duration) (addr string, lrss *lrsServer, cleanup func()) { + lis, err := net.Listen("tcp", "localhost:0") + if err != nil { + t.Fatalf("listen failed due to: %v", err) + } + svr := grpc.NewServer() + lrss = &lrsServer{ + reportingInterval: reportingInterval, + drops: make(map[string]uint64), + rpcs: make(map[internal.LocalityID]*rpcCountDataForTest), + } + lrsgrpc.RegisterLoadReportingServiceServer(svr, lrss) + go svr.Serve(lis) + return lis.Addr().String(), lrss, func() { + svr.Stop() + lis.Close() + } +} + +func Test_lrsStore_ReportTo(t *testing.T) { + const intervalNano = 1000 * 1000 * 50 + addr, lrss, cleanup := setupServer(t, &durationpb.Duration{ + Seconds: 0, + Nanos: intervalNano, + }) + defer cleanup() + + ls := NewStore() + cc, err := grpc.Dial(addr, grpc.WithInsecure()) + if err != nil { + t.Fatalf("failed to dial: %v", err) + } + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + done := make(chan struct{}) + go func() { + node := &corepb.Node{ + Metadata: &structpb.Struct{ + Fields: map[string]*structpb.Value{ + nodeMetadataHostnameKey: { + Kind: &structpb.Value_StringValue{StringValue: testHostname}, + }, + }, + }, + } + ls.ReportTo(ctx, cc, testService, node) + close(done) + }() + + drops := map[string]uint64{ + dropCategories[0]: 13, + dropCategories[1]: 14, + } + for c, d := range drops { + for i := 0; i < int(d); i++ { + ls.CallDropped(c) + time.Sleep(time.Nanosecond * intervalNano / 10) + } + } + + rpcs := map[internal.LocalityID]*rpcCountDataForTest{ + localities[0]: newRPCCountDataForTest(3, 1, 4, nil), + localities[1]: newRPCCountDataForTest(1, 5, 9, map[string]float64{"pi": 3.14, "e": 2.71}), + } + for l, count := range rpcs { + for i := 0; i < int(count.succeeded); i++ { + go func(i int, l internal.LocalityID, count *rpcCountDataForTest) { + ls.CallStarted(l) + ls.CallFinished(l, nil) + for n, d := range count.serverLoads { + ls.CallServerLoad(l, n, d) + } + }(i, l, count) + } + for i := 0; i < int(count.inProgress); i++ { + go func(i int, l internal.LocalityID) { + ls.CallStarted(l) + }(i, l) + } + for i := 0; i < int(count.errored); i++ { + go func(i int, l internal.LocalityID) { + ls.CallStarted(l) + ls.CallFinished(l, errTest) + }(i, l) + } + } + + time.Sleep(time.Nanosecond * intervalNano * 2) + cancel() + <-done + + lrss.mu.Lock() + defer lrss.mu.Unlock() + if !cmp.Equal(lrss.drops, drops) { + t.Errorf("different: %v", cmp.Diff(lrss.drops, drops)) + } + if !cmp.Equal(lrss.rpcs, rpcs) { + t.Errorf("different: %v", cmp.Diff(lrss.rpcs, rpcs)) + } +} diff --git a/xds/internal/internal.go b/xds/internal/internal.go index 8b17cf93024..462a8bac59b 100644 --- a/xds/internal/internal.go +++ b/xds/internal/internal.go @@ -34,14 +34,10 @@ const XDSClientID = clientID("xdsClientID") // keys. // // xds.Locality cannot be map keys because one of the XXX fields is a slice. -// -// This struct should only be used as map keys. 
Use the proto message directly -// in all other places. -// type LocalityID struct { - Region string - Zone string - SubZone string + Region string `json:"region,omitempty"` + Zone string `json:"zone,omitempty"` + SubZone string `json:"subZone,omitempty"` } func (l LocalityID) String() string { From 900e3b38916dcf7bdfff43bf59c1612a39f1e7fa Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Tue, 11 Aug 2020 13:22:39 -0700 Subject: [PATCH 2/5] [lrs_balancing_policy] c1 --- xds/internal/balancer/lrs/balancer.go | 35 ++++++++++++----------- xds/internal/balancer/lrs/picker.go | 41 +++++++++++++++------------ xds/internal/balancer/lrs/store.go | 1 - 3 files changed, 41 insertions(+), 36 deletions(-) diff --git a/xds/internal/balancer/lrs/balancer.go b/xds/internal/balancer/lrs/balancer.go index 25fe854051e..4af91c76498 100644 --- a/xds/internal/balancer/lrs/balancer.go +++ b/xds/internal/balancer/lrs/balancer.go @@ -16,6 +16,7 @@ * */ +// Package lrs implements load reporting balancer for xds. package lrs import ( @@ -44,7 +45,7 @@ func (l *lrsBB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balanc buildOpts: opts, } b.loadStore = NewStore() - b.c = newXDSClientWrapper(b.loadStore) + b.client = newXDSClientWrapper(b.loadStore) b.logger = prefixLogger(b) b.logger.Infof("Created") return b @@ -64,10 +65,10 @@ type lrsBalancer struct { logger *grpclog.PrefixLogger loadStore Store - c *xdsClientWrapper + client *xdsClientWrapper config *lbConfig - b balancer.Balancer // The sub balancer. + lb balancer.Balancer // The sub balancer. } func (b *lrsBalancer) UpdateClientConnState(s balancer.ClientConnState) error { @@ -82,40 +83,40 @@ func (b *lrsBalancer) UpdateClientConnState(s balancer.ClientConnState) error { if bb == nil { return fmt.Errorf("balancer %q not registered", newConfig.ChildPolicy.Name) } - if b.b != nil { - b.b.Close() + if b.lb != nil { + b.lb.Close() } - b.b = bb.Build(newCCWrapper(b.cc, b.loadStore, newConfig.Locality), b.buildOpts) + b.lb = bb.Build(newCCWrapper(b.cc, b.loadStore, newConfig.Locality), b.buildOpts) } // Update load reporting config or xds client. - b.c.update(newConfig, s.ResolverState.Attributes) + b.client.update(newConfig, s.ResolverState.Attributes) b.config = newConfig // Addresses and sub-balancer config are sent to sub-balancer. - return b.b.UpdateClientConnState(balancer.ClientConnState{ + return b.lb.UpdateClientConnState(balancer.ClientConnState{ ResolverState: s.ResolverState, BalancerConfig: b.config.ChildPolicy.Config, }) } func (b *lrsBalancer) ResolverError(err error) { - if b.b != nil { - b.b.ResolverError(err) + if b.lb != nil { + b.lb.ResolverError(err) } } func (b *lrsBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.SubConnState) { - if b.b != nil { - b.b.UpdateSubConnState(sc, s) + if b.lb != nil { + b.lb.UpdateSubConnState(sc, s) } } func (b *lrsBalancer) Close() { - if b.b != nil { - b.b.Close() - b.b = nil + if b.lb != nil { + b.lb.Close() + b.lb = nil } - b.c.close() + b.client.close() } type ccWrapper struct { @@ -159,7 +160,7 @@ func newXDSClientWrapper(loadStore Store) *xdsClientWrapper { } } -// update checks the config and xdsclient, and decide whether it needs to +// update checks the config and xdsclient, and decides whether it needs to // restart the load reporting stream. // // TODO: refactor lrs to share one stream instead of one per EDS. 
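(Reviewer aside: the restart conditions in update() are easiest to see against a stub client. Below is a minimal sketch, not part of the patch, written as if it lived in package lrs so it can reach the unexported types; fakeClient and sketchUpdate are illustrative names, and the imports are the same fmt/attributes/xdsinternal ones balancer.go already uses.)

// fakeClient is a hypothetical stub satisfying xdsClientInterface.
type fakeClient struct{ starts chan string }

func (f *fakeClient) ReportLoad(server, cluster string, _ Store) func() {
	f.starts <- server + "/" + cluster // record each stream start
	return func() {}                   // cancel is a no-op in the stub
}

func (f *fakeClient) Close() {}

func sketchUpdate() {
	w := newXDSClientWrapper(NewStore())
	fc := &fakeClient{starts: make(chan string, 2)}
	attrs := attributes.New(xdsinternal.XDSClientID, fc)
	cfg := &lbConfig{ClusterName: "c", LrsLoadReportingServerName: "lrs.example.com"}
	w.update(cfg, attrs) // new client and new names: starts one stream
	w.update(cfg, attrs) // identical config: no restart expected
	fmt.Println(len(fc.starts)) // 1: only the first update started a stream
}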
diff --git a/xds/internal/balancer/lrs/picker.go b/xds/internal/balancer/lrs/picker.go index c4d6a4fa395..1fcc6e9e5b3 100644 --- a/xds/internal/balancer/lrs/picker.go +++ b/xds/internal/balancer/lrs/picker.go @@ -46,24 +46,29 @@ func newLoadReportPicker(p balancer.Picker, id internal.LocalityID, loadStore St func (lrp *loadReportPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { res, err := lrp.p.Pick(info) - if lrp.loadStore != nil && err == nil { - lrp.loadStore.CallStarted(lrp.id) - td := res.Done - res.Done = func(info balancer.DoneInfo) { - lrp.loadStore.CallFinished(lrp.id, info.Err) - if load, ok := info.ServerLoad.(*orcapb.OrcaLoadReport); ok { - lrp.loadStore.CallServerLoad(lrp.id, serverLoadCPUName, load.CpuUtilization) - lrp.loadStore.CallServerLoad(lrp.id, serverLoadMemoryName, load.MemUtilization) - for n, d := range load.RequestCost { - lrp.loadStore.CallServerLoad(lrp.id, n, d) - } - for n, d := range load.Utilization { - lrp.loadStore.CallServerLoad(lrp.id, n, d) - } - } - if td != nil { - td(info) - } + if err != nil { + return res, err + } + + lrp.loadStore.CallStarted(lrp.id) + oldDone := res.Done + res.Done = func(info balancer.DoneInfo) { + if oldDone != nil { + oldDone(info) + } + lrp.loadStore.CallFinished(lrp.id, info.Err) + + load, ok := info.ServerLoad.(*orcapb.OrcaLoadReport) + if !ok { + return + } + lrp.loadStore.CallServerLoad(lrp.id, serverLoadCPUName, load.CpuUtilization) + lrp.loadStore.CallServerLoad(lrp.id, serverLoadMemoryName, load.MemUtilization) + for n, d := range load.RequestCost { + lrp.loadStore.CallServerLoad(lrp.id, n, d) + } + for n, d := range load.Utilization { + lrp.loadStore.CallServerLoad(lrp.id, n, d) } } return res, err diff --git a/xds/internal/balancer/lrs/store.go b/xds/internal/balancer/lrs/store.go index 680c6f5a2c1..96c85f9cc9c 100644 --- a/xds/internal/balancer/lrs/store.go +++ b/xds/internal/balancer/lrs/store.go @@ -14,7 +14,6 @@ * limitations under the License. */ -// Package lrs implements load reporting service for xds balancer. 
package lrs import ( From 845f4cce01798e6b771b5dcab9fdde3b98289de3 Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Tue, 11 Aug 2020 13:32:23 -0700 Subject: [PATCH 3/5] [lrs_balancing_policy] more config tests --- xds/internal/balancer/lrs/config.go | 10 +++++ xds/internal/balancer/lrs/config_test.go | 47 ++++++++++++++++++++++-- 2 files changed, 54 insertions(+), 3 deletions(-) diff --git a/xds/internal/balancer/lrs/config.go b/xds/internal/balancer/lrs/config.go index 1f4e15e2630..3d39961401b 100644 --- a/xds/internal/balancer/lrs/config.go +++ b/xds/internal/balancer/lrs/config.go @@ -20,6 +20,7 @@ package lrs import ( "encoding/json" + "fmt" internalserviceconfig "google.golang.org/grpc/internal/serviceconfig" "google.golang.org/grpc/serviceconfig" @@ -40,5 +41,14 @@ func parseConfig(c json.RawMessage) (*lbConfig, error) { if err := json.Unmarshal(c, &cfg); err != nil { return nil, err } + if cfg.ClusterName == "" { + return nil, fmt.Errorf("required ClusterName is not set in %+v", cfg) + } + if cfg.LrsLoadReportingServerName == "" { + return nil, fmt.Errorf("required LrsLoadReportingServerName is not set in %+v", cfg) + } + if cfg.Locality == nil { + return nil, fmt.Errorf("required Locality is not set in %+v", cfg) + } return &cfg, nil } diff --git a/xds/internal/balancer/lrs/config_test.go b/xds/internal/balancer/lrs/config_test.go index a650db34eb9..f49430569fe 100644 --- a/xds/internal/balancer/lrs/config_test.go +++ b/xds/internal/balancer/lrs/config_test.go @@ -41,7 +41,48 @@ func TestParseConfig(t *testing.T) { wantErr bool }{ { - name: "temp", + name: "no cluster name", + js: `{ + "edsServiceName": "test-eds-service", + "lrsLoadReportingServerName": "test-lrs-name", + "locality": { + "region": "test-region", + "zone": "test-zone", + "subZone": "test-sub-zone" + }, + "childPolicy":[{"round_robin":{}}] +} + `, + wantErr: true, + }, + { + name: "no LRS server name", + js: `{ + "clusterName": "test-cluster", + "edsServiceName": "test-eds-service", + "locality": { + "region": "test-region", + "zone": "test-zone", + "subZone": "test-sub-zone" + }, + "childPolicy":[{"round_robin":{}}] +} + `, + wantErr: true, + }, + { + name: "no locality", + js: `{ + "clusterName": "test-cluster", + "edsServiceName": "test-eds-service", + "lrsLoadReportingServerName": "test-lrs-name", + "childPolicy":[{"round_robin":{}}] +} + `, + wantErr: true, + }, + { + name: "good", js: `{ "clusterName": "test-cluster", "edsServiceName": "test-eds-service", @@ -78,8 +119,8 @@ func TestParseConfig(t *testing.T) { t.Errorf("parseConfig() error = %v, wantErr %v", err, tt.wantErr) return } - if !cmp.Equal(got, tt.want) { - t.Errorf("parseConfig() got = %v, want %v, diff: %s", got, tt.want, cmp.Diff(got, tt.want)) + if diff := cmp.Diff(got, tt.want); diff != "" { + t.Errorf("parseConfig() got = %v, want %v, diff: %s", got, tt.want, diff) } }) } From 9c9c68a91d2794e11cad9c0a10091effb6a400d2 Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Wed, 12 Aug 2020 15:15:48 -0700 Subject: [PATCH 4/5] [lrs_balancing_policy] rename --- xds/internal/balancer/lrs/{lrs_test.go => balancer_test.go} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename xds/internal/balancer/lrs/{lrs_test.go => balancer_test.go} (100%) diff --git a/xds/internal/balancer/lrs/lrs_test.go b/xds/internal/balancer/lrs/balancer_test.go similarity index 100% rename from xds/internal/balancer/lrs/lrs_test.go rename to xds/internal/balancer/lrs/balancer_test.go From 2af8c5f6c7988d932b2a6b3c107bdc9bd15dcd11 Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Wed, 
12 Aug 2020 15:20:57 -0700
Subject: [PATCH 5/5] [lrs_balancing_policy] todo

---
 xds/internal/balancer/lrs/balancer_test.go | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/xds/internal/balancer/lrs/balancer_test.go b/xds/internal/balancer/lrs/balancer_test.go
index 5952d8c7da0..c57b5e1a127 100644
--- a/xds/internal/balancer/lrs/balancer_test.go
+++ b/xds/internal/balancer/lrs/balancer_test.go
@@ -47,6 +47,10 @@ var (

 // This is a subset of testutils.fakeclient. Cannot use testutils.fakeclient
 // because testutils imports package lrs.
+//
+// TODO: once xdsclient is refactored to support load reporting, the testutils
+// package will no longer need to depend on the lrs package for the store, and
+// these tests can switch to the shared testutils fake client.
 type fakeXDSClient struct {
 	loadReportCh chan *reportLoadArgs
 }
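(Closing reviewer aside: after patch 3, a config must carry clusterName, lrsLoadReportingServerName, and locality to parse at all. A minimal sketch of the new validation, again written as if inside package lrs; sketchValidation is an illustrative name, and it assumes round_robin is registered, as config_test.go arranges by importing it.)

func sketchValidation() {
	// Missing lrsLoadReportingServerName: rejected by the new checks.
	bad := json.RawMessage(`{"clusterName": "c", "locality": {"region": "r"}}`)
	if _, err := parseConfig(bad); err != nil {
		fmt.Println("rejected:", err)
	}
	// All required fields present: parses into an lbConfig.
	good := json.RawMessage(`{
		"clusterName": "c",
		"lrsLoadReportingServerName": "lrs.example.com",
		"locality": {"region": "r", "zone": "z", "subZone": "s"},
		"childPolicy": [{"round_robin": {}}]
	}`)
	if cfg, err := parseConfig(good); err == nil {
		fmt.Println(cfg.ClusterName, "reports to", cfg.LrsLoadReportingServerName)
	}
}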