diff --git a/xds/internal/balancer/lrs/balancer.go b/xds/internal/balancer/lrs/balancer.go new file mode 100644 index 00000000000..4af91c76498 --- /dev/null +++ b/xds/internal/balancer/lrs/balancer.go @@ -0,0 +1,216 @@ +/* + * + * Copyright 2020 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Package lrs implements load reporting balancer for xds. +package lrs + +import ( + "encoding/json" + "fmt" + + "google.golang.org/grpc/attributes" + "google.golang.org/grpc/balancer" + "google.golang.org/grpc/internal/grpclog" + "google.golang.org/grpc/serviceconfig" + "google.golang.org/grpc/xds/internal" + xdsinternal "google.golang.org/grpc/xds/internal" +) + +func init() { + balancer.Register(&lrsBB{}) +} + +const lrsBalancerName = "lrs_experimental" + +type lrsBB struct{} + +func (l *lrsBB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer { + b := &lrsBalancer{ + cc: cc, + buildOpts: opts, + } + b.loadStore = NewStore() + b.client = newXDSClientWrapper(b.loadStore) + b.logger = prefixLogger(b) + b.logger.Infof("Created") + return b +} + +func (l *lrsBB) Name() string { + return lrsBalancerName +} + +func (l *lrsBB) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) { + return parseConfig(c) +} + +type lrsBalancer struct { + cc balancer.ClientConn + buildOpts balancer.BuildOptions + + logger *grpclog.PrefixLogger + loadStore Store + client *xdsClientWrapper + + config *lbConfig + lb balancer.Balancer // The sub balancer. +} + +func (b *lrsBalancer) UpdateClientConnState(s balancer.ClientConnState) error { + newConfig, ok := s.BalancerConfig.(*lbConfig) + if !ok { + return fmt.Errorf("unexpected balancer config with type: %T", s.BalancerConfig) + } + + // If child policy is a different type, recreate the sub-balancer. + if b.config == nil || b.config.ChildPolicy.Name != newConfig.ChildPolicy.Name { + bb := balancer.Get(newConfig.ChildPolicy.Name) + if bb == nil { + return fmt.Errorf("balancer %q not registered", newConfig.ChildPolicy.Name) + } + if b.lb != nil { + b.lb.Close() + } + b.lb = bb.Build(newCCWrapper(b.cc, b.loadStore, newConfig.Locality), b.buildOpts) + } + // Update load reporting config or xds client. + b.client.update(newConfig, s.ResolverState.Attributes) + b.config = newConfig + + // Addresses and sub-balancer config are sent to sub-balancer. + return b.lb.UpdateClientConnState(balancer.ClientConnState{ + ResolverState: s.ResolverState, + BalancerConfig: b.config.ChildPolicy.Config, + }) +} + +func (b *lrsBalancer) ResolverError(err error) { + if b.lb != nil { + b.lb.ResolverError(err) + } +} + +func (b *lrsBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.SubConnState) { + if b.lb != nil { + b.lb.UpdateSubConnState(sc, s) + } +} + +func (b *lrsBalancer) Close() { + if b.lb != nil { + b.lb.Close() + b.lb = nil + } + b.client.close() +} + +type ccWrapper struct { + balancer.ClientConn + loadStore Store + localityID *internal.LocalityID +} + +func newCCWrapper(cc balancer.ClientConn, loadStore Store, localityID *internal.LocalityID) *ccWrapper { + return &ccWrapper{ + ClientConn: cc, + loadStore: loadStore, + localityID: localityID, + } +} + +func (ccw *ccWrapper) UpdateState(s balancer.State) { + s.Picker = newLoadReportPicker(s.Picker, *ccw.localityID, ccw.loadStore) + ccw.ClientConn.UpdateState(s) +} + +// xdsClientInterface contains only the xds_client methods needed by LRS +// balancer. It's defined so we can override xdsclient in tests. +type xdsClientInterface interface { + ReportLoad(server string, clusterName string, loadStore Store) (cancel func()) + Close() +} + +type xdsClientWrapper struct { + loadStore Store + + c xdsClientInterface + cancelLoadReport func() + clusterName string + lrsServerName string +} + +func newXDSClientWrapper(loadStore Store) *xdsClientWrapper { + return &xdsClientWrapper{ + loadStore: loadStore, + } +} + +// update checks the config and xdsclient, and decides whether it needs to +// restart the load reporting stream. +// +// TODO: refactor lrs to share one stream instead of one per EDS. +func (w *xdsClientWrapper) update(newConfig *lbConfig, attr *attributes.Attributes) { + var restartLoadReport bool + if attr != nil { + if clientFromAttr, _ := attr.Value(xdsinternal.XDSClientID).(xdsClientInterface); clientFromAttr != nil { + if w.c != clientFromAttr { + // xds client is different, restart. + restartLoadReport = true + w.c = clientFromAttr + } + } + } + + // ClusterName is different, restart. ClusterName is from ClusterName and + // EdsServiceName. + // + // TODO: LRS request actually has separate fields from these two values. + // Update lrs.Store to set both. + newClusterName := newConfig.EdsServiceName + if newClusterName == "" { + newClusterName = newConfig.ClusterName + } + if w.clusterName != newClusterName { + restartLoadReport = true + w.clusterName = newClusterName + } + + if w.lrsServerName != newConfig.LrsLoadReportingServerName { + // LrsLoadReportingServerName is different, load should be report to a + // different server, restart. + restartLoadReport = true + w.lrsServerName = newConfig.LrsLoadReportingServerName + } + + if restartLoadReport { + if w.cancelLoadReport != nil { + w.cancelLoadReport() + w.cancelLoadReport = nil + } + if w.c != nil { + w.cancelLoadReport = w.c.ReportLoad(w.lrsServerName, w.clusterName, w.loadStore) + } + } +} + +func (w *xdsClientWrapper) close() { + if w.cancelLoadReport != nil { + w.cancelLoadReport() + w.cancelLoadReport = nil + } +} diff --git a/xds/internal/balancer/lrs/balancer_test.go b/xds/internal/balancer/lrs/balancer_test.go new file mode 100644 index 00000000000..c57b5e1a127 --- /dev/null +++ b/xds/internal/balancer/lrs/balancer_test.go @@ -0,0 +1,175 @@ +/* + * + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package lrs + +import ( + "fmt" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "google.golang.org/grpc/attributes" + "google.golang.org/grpc/balancer" + "google.golang.org/grpc/balancer/roundrobin" + "google.golang.org/grpc/connectivity" + internalserviceconfig "google.golang.org/grpc/internal/serviceconfig" + "google.golang.org/grpc/resolver" + xdsinternal "google.golang.org/grpc/xds/internal" + "google.golang.org/grpc/xds/internal/testutils" +) + +var ( + testBackendAddrs = []resolver.Address{ + {Addr: "1.1.1.1:1"}, + } + testLocality = &xdsinternal.LocalityID{ + Region: "test-region", + Zone: "test-zone", + SubZone: "test-sub-zone", + } +) + +// This is a subset of testutils.fakeclient. Cannot use testutils.fakeclient +// because testutils imports package lrs. +// +// TODO: after refactoring xdsclient to support load reporting, the testutils +// package won't need to depend on lrs package for the store. And we can use the +// testutils for this. +type fakeXDSClient struct { + loadReportCh chan *reportLoadArgs +} + +func newFakeXDSClient() *fakeXDSClient { + return &fakeXDSClient{ + loadReportCh: make(chan *reportLoadArgs, 10), + } +} + +// reportLoadArgs wraps the arguments passed to ReportLoad. +type reportLoadArgs struct { + // server is the name of the server to which the load is reported. + server string + // cluster is the name of the cluster for which load is reported. + cluster string + // loadStore is the store where loads are stored. + loadStore interface{} +} + +// ReportLoad starts reporting load about clusterName to server. +func (xdsC *fakeXDSClient) ReportLoad(server string, clusterName string, loadStore Store) (cancel func()) { + xdsC.loadReportCh <- &reportLoadArgs{server: server, cluster: clusterName, loadStore: loadStore} + return func() {} +} + +// waitForReportLoad waits for ReportLoad to be invoked on this client within a +// reasonable timeout, and returns the arguments passed to it. +func (xdsC *fakeXDSClient) waitForReportLoad() (*reportLoadArgs, error) { + select { + case <-time.After(time.Second): + return nil, fmt.Errorf("timeout") + case a := <-xdsC.loadReportCh: + return a, nil + } +} + +// Close closes the xds client. +func (xdsC *fakeXDSClient) Close() { +} + +// TestLoadReporting verifies that the lrs balancer starts the loadReport +// stream when the lbConfig passed to it contains a valid value for the LRS +// server (empty string). +func TestLoadReporting(t *testing.T) { + builder := balancer.Get(lrsBalancerName) + cc := testutils.NewTestClientConn(t) + lrsB := builder.Build(cc, balancer.BuildOptions{}) + defer lrsB.Close() + + xdsC := newFakeXDSClient() + if err := lrsB.UpdateClientConnState(balancer.ClientConnState{ + ResolverState: resolver.State{ + Addresses: testBackendAddrs, + Attributes: attributes.New(xdsinternal.XDSClientID, xdsC), + }, + BalancerConfig: &lbConfig{ + EdsServiceName: testClusterName, + LrsLoadReportingServerName: testLRSServerName, + Locality: testLocality, + ChildPolicy: &internalserviceconfig.BalancerConfig{ + Name: roundrobin.Name, + }, + }, + }); err != nil { + t.Fatalf("unexpected error from UpdateClientConnState: %v", err) + } + + got, err := xdsC.waitForReportLoad() + if err != nil { + t.Fatalf("xdsClient.ReportLoad failed with error: %v", err) + } + if got.server != testLRSServerName || got.cluster != testClusterName { + t.Fatalf("xdsClient.ReportLoad called with {%q, %q}: want {%q, %q}", got.server, got.cluster, testLRSServerName, testClusterName) + } + + sc1 := <-cc.NewSubConnCh + lrsB.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + lrsB.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready}) + + // Test pick with one backend. + p1 := <-cc.NewPickerCh + const successCount = 5 + for i := 0; i < successCount; i++ { + gotSCSt, _ := p1.Pick(balancer.PickInfo{}) + if !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) { + t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc1) + } + gotSCSt.Done(balancer.DoneInfo{}) + } + const errorCount = 5 + for i := 0; i < errorCount; i++ { + gotSCSt, _ := p1.Pick(balancer.PickInfo{}) + if !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) { + t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc1) + } + gotSCSt.Done(balancer.DoneInfo{Err: fmt.Errorf("error")}) + } + + loads := make(map[xdsinternal.LocalityID]*rpcCountData) + + got.loadStore.(*lrsStore).localityRPCCount.Range( + func(key, value interface{}) bool { + loads[key.(xdsinternal.LocalityID)] = value.(*rpcCountData) + return true + }, + ) + + countData, ok := loads[*testLocality] + if !ok { + t.Fatalf("loads for %v not found in store", testLocality) + } + if *countData.succeeded != successCount { + t.Errorf("got succeeded %v, want %v", *countData.succeeded, successCount) + } + if *countData.errored != errorCount { + t.Errorf("got errord %v, want %v", *countData.errored, errorCount) + } + if *countData.inProgress != 0 { + t.Errorf("got inProgress %v, want %v", *countData.inProgress, 0) + } +} diff --git a/xds/internal/balancer/lrs/config.go b/xds/internal/balancer/lrs/config.go new file mode 100644 index 00000000000..3d39961401b --- /dev/null +++ b/xds/internal/balancer/lrs/config.go @@ -0,0 +1,54 @@ +/* + * + * Copyright 2020 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package lrs + +import ( + "encoding/json" + "fmt" + + internalserviceconfig "google.golang.org/grpc/internal/serviceconfig" + "google.golang.org/grpc/serviceconfig" + "google.golang.org/grpc/xds/internal" +) + +type lbConfig struct { + serviceconfig.LoadBalancingConfig + ClusterName string + EdsServiceName string + LrsLoadReportingServerName string + Locality *internal.LocalityID + ChildPolicy *internalserviceconfig.BalancerConfig +} + +func parseConfig(c json.RawMessage) (*lbConfig, error) { + var cfg lbConfig + if err := json.Unmarshal(c, &cfg); err != nil { + return nil, err + } + if cfg.ClusterName == "" { + return nil, fmt.Errorf("required ClusterName is not set in %+v", cfg) + } + if cfg.LrsLoadReportingServerName == "" { + return nil, fmt.Errorf("required LrsLoadReportingServerName is not set in %+v", cfg) + } + if cfg.Locality == nil { + return nil, fmt.Errorf("required Locality is not set in %+v", cfg) + } + return &cfg, nil +} diff --git a/xds/internal/balancer/lrs/config_test.go b/xds/internal/balancer/lrs/config_test.go new file mode 100644 index 00000000000..f49430569fe --- /dev/null +++ b/xds/internal/balancer/lrs/config_test.go @@ -0,0 +1,127 @@ +/* + * + * Copyright 2020 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package lrs + +import ( + "testing" + + "github.com/google/go-cmp/cmp" + "google.golang.org/grpc/balancer/roundrobin" + internalserviceconfig "google.golang.org/grpc/internal/serviceconfig" + xdsinternal "google.golang.org/grpc/xds/internal" +) + +const ( + testClusterName = "test-cluster" + testServiceName = "test-eds-service" + testLRSServerName = "test-lrs-name" +) + +func TestParseConfig(t *testing.T) { + tests := []struct { + name string + js string + want *lbConfig + wantErr bool + }{ + { + name: "no cluster name", + js: `{ + "edsServiceName": "test-eds-service", + "lrsLoadReportingServerName": "test-lrs-name", + "locality": { + "region": "test-region", + "zone": "test-zone", + "subZone": "test-sub-zone" + }, + "childPolicy":[{"round_robin":{}}] +} + `, + wantErr: true, + }, + { + name: "no LRS server name", + js: `{ + "clusterName": "test-cluster", + "edsServiceName": "test-eds-service", + "locality": { + "region": "test-region", + "zone": "test-zone", + "subZone": "test-sub-zone" + }, + "childPolicy":[{"round_robin":{}}] +} + `, + wantErr: true, + }, + { + name: "no locality", + js: `{ + "clusterName": "test-cluster", + "edsServiceName": "test-eds-service", + "lrsLoadReportingServerName": "test-lrs-name", + "childPolicy":[{"round_robin":{}}] +} + `, + wantErr: true, + }, + { + name: "good", + js: `{ + "clusterName": "test-cluster", + "edsServiceName": "test-eds-service", + "lrsLoadReportingServerName": "test-lrs-name", + "locality": { + "region": "test-region", + "zone": "test-zone", + "subZone": "test-sub-zone" + }, + "childPolicy":[{"round_robin":{}}] +} + `, + want: &lbConfig{ + ClusterName: testClusterName, + EdsServiceName: testServiceName, + LrsLoadReportingServerName: testLRSServerName, + Locality: &xdsinternal.LocalityID{ + Region: "test-region", + Zone: "test-zone", + SubZone: "test-sub-zone", + }, + ChildPolicy: &internalserviceconfig.BalancerConfig{ + Name: roundrobin.Name, + Config: nil, + }, + }, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := parseConfig([]byte(tt.js)) + if (err != nil) != tt.wantErr { + t.Errorf("parseConfig() error = %v, wantErr %v", err, tt.wantErr) + return + } + if diff := cmp.Diff(got, tt.want); diff != "" { + t.Errorf("parseConfig() got = %v, want %v, diff: %s", got, tt.want, diff) + } + }) + } +} diff --git a/xds/internal/balancer/lrs/logging.go b/xds/internal/balancer/lrs/logging.go new file mode 100644 index 00000000000..602dac09959 --- /dev/null +++ b/xds/internal/balancer/lrs/logging.go @@ -0,0 +1,34 @@ +/* + * + * Copyright 2020 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package lrs + +import ( + "fmt" + + "google.golang.org/grpc/grpclog" + internalgrpclog "google.golang.org/grpc/internal/grpclog" +) + +const prefix = "[lrs-lb %p] " + +var logger = grpclog.Component("xds") + +func prefixLogger(p *lrsBalancer) *internalgrpclog.PrefixLogger { + return internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(prefix, p)) +} diff --git a/xds/internal/balancer/lrs/picker.go b/xds/internal/balancer/lrs/picker.go new file mode 100644 index 00000000000..1fcc6e9e5b3 --- /dev/null +++ b/xds/internal/balancer/lrs/picker.go @@ -0,0 +1,75 @@ +/* + * + * Copyright 2020 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package lrs + +import ( + orcapb "github.com/cncf/udpa/go/udpa/data/orca/v1" + "google.golang.org/grpc/balancer" + "google.golang.org/grpc/xds/internal" +) + +const ( + serverLoadCPUName = "cpu_utilization" + serverLoadMemoryName = "mem_utilization" +) + +type loadReportPicker struct { + p balancer.Picker + + id internal.LocalityID + loadStore Store +} + +func newLoadReportPicker(p balancer.Picker, id internal.LocalityID, loadStore Store) *loadReportPicker { + return &loadReportPicker{ + p: p, + id: id, + loadStore: loadStore, + } +} + +func (lrp *loadReportPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { + res, err := lrp.p.Pick(info) + if err != nil { + return res, err + } + + lrp.loadStore.CallStarted(lrp.id) + oldDone := res.Done + res.Done = func(info balancer.DoneInfo) { + if oldDone != nil { + oldDone(info) + } + lrp.loadStore.CallFinished(lrp.id, info.Err) + + load, ok := info.ServerLoad.(*orcapb.OrcaLoadReport) + if !ok { + return + } + lrp.loadStore.CallServerLoad(lrp.id, serverLoadCPUName, load.CpuUtilization) + lrp.loadStore.CallServerLoad(lrp.id, serverLoadMemoryName, load.MemUtilization) + for n, d := range load.RequestCost { + lrp.loadStore.CallServerLoad(lrp.id, n, d) + } + for n, d := range load.Utilization { + lrp.loadStore.CallServerLoad(lrp.id, n, d) + } + } + return res, err +} diff --git a/xds/internal/balancer/lrs/lrs.go b/xds/internal/balancer/lrs/store.go similarity index 98% rename from xds/internal/balancer/lrs/lrs.go rename to xds/internal/balancer/lrs/store.go index 4bc20ef6e12..96c85f9cc9c 100644 --- a/xds/internal/balancer/lrs/lrs.go +++ b/xds/internal/balancer/lrs/store.go @@ -14,7 +14,6 @@ * limitations under the License. */ -// Package lrs implements load reporting service for xds balancer. package lrs import ( @@ -29,15 +28,12 @@ import ( lrspb "github.com/envoyproxy/go-control-plane/envoy/service/load_stats/v2" "github.com/golang/protobuf/ptypes" "google.golang.org/grpc" - "google.golang.org/grpc/grpclog" "google.golang.org/grpc/internal/backoff" "google.golang.org/grpc/xds/internal" ) const negativeOneUInt64 = ^uint64(0) -var logger = grpclog.Component("xds") - // Store defines the interface for a load store. It keeps loads and can report // them to a server when requested. type Store interface { diff --git a/xds/internal/balancer/lrs/lrs_test.go b/xds/internal/balancer/lrs/store_test.go similarity index 100% rename from xds/internal/balancer/lrs/lrs_test.go rename to xds/internal/balancer/lrs/store_test.go diff --git a/xds/internal/internal.go b/xds/internal/internal.go index 8b17cf93024..462a8bac59b 100644 --- a/xds/internal/internal.go +++ b/xds/internal/internal.go @@ -34,14 +34,10 @@ const XDSClientID = clientID("xdsClientID") // keys. // // xds.Locality cannot be map keys because one of the XXX fields is a slice. -// -// This struct should only be used as map keys. Use the proto message directly -// in all other places. -// type LocalityID struct { - Region string - Zone string - SubZone string + Region string `json:"region,omitempty"` + Zone string `json:"zone,omitempty"` + SubZone string `json:"subZone,omitempty"` } func (l LocalityID) String() string {