Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds: add LRS balancing policy #3799

Merged
merged 5 commits into from Aug 13, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
215 changes: 215 additions & 0 deletions xds/internal/balancer/lrs/balancer.go
@@ -0,0 +1,215 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

easwars marked this conversation as resolved.
Show resolved Hide resolved
package lrs

import (
"encoding/json"
"fmt"

"google.golang.org/grpc/attributes"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/xds/internal"
xdsinternal "google.golang.org/grpc/xds/internal"
)

func init() {
balancer.Register(&lrsBB{})
}

const lrsBalancerName = "lrs_experimental"

type lrsBB struct{}

func (l *lrsBB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
b := &lrsBalancer{
cc: cc,
buildOpts: opts,
}
b.loadStore = NewStore()
b.c = newXDSClientWrapper(b.loadStore)
b.logger = prefixLogger(b)
b.logger.Infof("Created")
return b
}

func (l *lrsBB) Name() string {
return lrsBalancerName
}

func (l *lrsBB) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
return parseConfig(c)
}

type lrsBalancer struct {
cc balancer.ClientConn
buildOpts balancer.BuildOptions

logger *grpclog.PrefixLogger
loadStore Store
c *xdsClientWrapper
easwars marked this conversation as resolved.
Show resolved Hide resolved

config *lbConfig
b balancer.Balancer // The sub balancer.
easwars marked this conversation as resolved.
Show resolved Hide resolved
}

func (b *lrsBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
newConfig, ok := s.BalancerConfig.(*lbConfig)
if !ok {
return fmt.Errorf("unexpected balancer config with type: %T", s.BalancerConfig)
}

// If child policy is a different type, recreate the sub-balancer.
if b.config == nil || b.config.ChildPolicy.Name != newConfig.ChildPolicy.Name {
bb := balancer.Get(newConfig.ChildPolicy.Name)
if bb == nil {
return fmt.Errorf("balancer %q not registered", newConfig.ChildPolicy.Name)
}
if b.b != nil {
b.b.Close()
}
b.b = bb.Build(newCCWrapper(b.cc, b.loadStore, newConfig.Locality), b.buildOpts)
}
// Update load reporting config or xds client.
b.c.update(newConfig, s.ResolverState.Attributes)
b.config = newConfig

// Addresses and sub-balancer config are sent to sub-balancer.
return b.b.UpdateClientConnState(balancer.ClientConnState{
ResolverState: s.ResolverState,
BalancerConfig: b.config.ChildPolicy.Config,
})
}

func (b *lrsBalancer) ResolverError(err error) {
if b.b != nil {
b.b.ResolverError(err)
}
}

func (b *lrsBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.SubConnState) {
if b.b != nil {
b.b.UpdateSubConnState(sc, s)
}
}

func (b *lrsBalancer) Close() {
if b.b != nil {
b.b.Close()
b.b = nil
}
b.c.close()
}

type ccWrapper struct {
balancer.ClientConn
loadStore Store
localityID *internal.LocalityID
}

func newCCWrapper(cc balancer.ClientConn, loadStore Store, localityID *internal.LocalityID) *ccWrapper {
return &ccWrapper{
ClientConn: cc,
loadStore: loadStore,
localityID: localityID,
}
}

func (ccw *ccWrapper) UpdateState(s balancer.State) {
s.Picker = newLoadReportPicker(s.Picker, *ccw.localityID, ccw.loadStore)
ccw.ClientConn.UpdateState(s)
}

// xdsClientInterface contains only the xds_client methods needed by LRS
// balancer. It's defined so we can override xdsclient in tests.
type xdsClientInterface interface {
ReportLoad(server string, clusterName string, loadStore Store) (cancel func())
Close()
}

type xdsClientWrapper struct {
loadStore Store

c xdsClientInterface
cancelLoadReport func()
clusterName string
lrsServerName string
}

func newXDSClientWrapper(loadStore Store) *xdsClientWrapper {
return &xdsClientWrapper{
loadStore: loadStore,
}
}

// update checks the config and xdsclient, and decide whether it needs to
easwars marked this conversation as resolved.
Show resolved Hide resolved
// restart the load reporting stream.
//
// TODO: refactor lrs to share one stream instead of one per EDS.
func (w *xdsClientWrapper) update(newConfig *lbConfig, attr *attributes.Attributes) {
var restartLoadReport bool
if attr != nil {
if clientFromAttr, _ := attr.Value(xdsinternal.XDSClientID).(xdsClientInterface); clientFromAttr != nil {
if w.c != clientFromAttr {
// xds client is different, restart.
restartLoadReport = true
w.c = clientFromAttr
}
}
}

// ClusterName is different, restart. ClusterName is from ClusterName and
// EdsServiceName.
//
// TODO: LRS request actually has separate fields from these two values.
// Update lrs.Store to set both.
newClusterName := newConfig.EdsServiceName
if newClusterName == "" {
newClusterName = newConfig.ClusterName
}
if w.clusterName != newClusterName {
restartLoadReport = true
w.clusterName = newClusterName
}

if w.lrsServerName != newConfig.LrsLoadReportingServerName {
// LrsLoadReportingServerName is different, load should be report to a
// different server, restart.
restartLoadReport = true
w.lrsServerName = newConfig.LrsLoadReportingServerName
}

if restartLoadReport {
if w.cancelLoadReport != nil {
w.cancelLoadReport()
w.cancelLoadReport = nil
}
if w.c != nil {
w.cancelLoadReport = w.c.ReportLoad(w.lrsServerName, w.clusterName, w.loadStore)
}
}
}

func (w *xdsClientWrapper) close() {
if w.cancelLoadReport != nil {
w.cancelLoadReport()
w.cancelLoadReport = nil
}
}
44 changes: 44 additions & 0 deletions xds/internal/balancer/lrs/config.go
@@ -0,0 +1,44 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package lrs

import (
"encoding/json"

internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/xds/internal"
)

type lbConfig struct {
serviceconfig.LoadBalancingConfig
ClusterName string
EdsServiceName string
LrsLoadReportingServerName string
Locality *internal.LocalityID
ChildPolicy *internalserviceconfig.BalancerConfig
}

func parseConfig(c json.RawMessage) (*lbConfig, error) {
var cfg lbConfig
if err := json.Unmarshal(c, &cfg); err != nil {
return nil, err
}
return &cfg, nil
}
86 changes: 86 additions & 0 deletions xds/internal/balancer/lrs/config_test.go
@@ -0,0 +1,86 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package lrs

import (
"testing"

"github.com/google/go-cmp/cmp"
"google.golang.org/grpc/balancer/roundrobin"
internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
xdsinternal "google.golang.org/grpc/xds/internal"
)

const (
testClusterName = "test-cluster"
testServiceName = "test-eds-service"
testLRSServerName = "test-lrs-name"
)

func TestParseConfig(t *testing.T) {
tests := []struct {
name string
js string
want *lbConfig
wantErr bool
}{
easwars marked this conversation as resolved.
Show resolved Hide resolved
{
name: "temp",
easwars marked this conversation as resolved.
Show resolved Hide resolved
js: `{
"clusterName": "test-cluster",
"edsServiceName": "test-eds-service",
"lrsLoadReportingServerName": "test-lrs-name",
"locality": {
"region": "test-region",
"zone": "test-zone",
"subZone": "test-sub-zone"
},
"childPolicy":[{"round_robin":{}}]
}
`,
want: &lbConfig{
ClusterName: testClusterName,
EdsServiceName: testServiceName,
LrsLoadReportingServerName: testLRSServerName,
Locality: &xdsinternal.LocalityID{
Region: "test-region",
Zone: "test-zone",
SubZone: "test-sub-zone",
},
ChildPolicy: &internalserviceconfig.BalancerConfig{
Name: roundrobin.Name,
Config: nil,
},
},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := parseConfig([]byte(tt.js))
if (err != nil) != tt.wantErr {
t.Errorf("parseConfig() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !cmp.Equal(got, tt.want) {
easwars marked this conversation as resolved.
Show resolved Hide resolved
t.Errorf("parseConfig() got = %v, want %v, diff: %s", got, tt.want, cmp.Diff(got, tt.want))
}
})
}
}
34 changes: 34 additions & 0 deletions xds/internal/balancer/lrs/logging.go
@@ -0,0 +1,34 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package lrs

import (
"fmt"

"google.golang.org/grpc/grpclog"
internalgrpclog "google.golang.org/grpc/internal/grpclog"
)

const prefix = "[lrs-lb %p] "

var logger = grpclog.Component("xds")

func prefixLogger(p *lrsBalancer) *internalgrpclog.PrefixLogger {
return internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(prefix, p))
}