Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds: add LRS balancing policy #3799

Merged
merged 5 commits into from Aug 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
216 changes: 216 additions & 0 deletions xds/internal/balancer/lrs/balancer.go
@@ -0,0 +1,216 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

easwars marked this conversation as resolved.
Show resolved Hide resolved
// Package lrs implements load reporting balancer for xds.
package lrs

import (
"encoding/json"
"fmt"

"google.golang.org/grpc/attributes"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/xds/internal"
xdsinternal "google.golang.org/grpc/xds/internal"
)

func init() {
balancer.Register(&lrsBB{})
}

const lrsBalancerName = "lrs_experimental"

type lrsBB struct{}

func (l *lrsBB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
b := &lrsBalancer{
cc: cc,
buildOpts: opts,
}
b.loadStore = NewStore()
b.client = newXDSClientWrapper(b.loadStore)
b.logger = prefixLogger(b)
b.logger.Infof("Created")
return b
}

func (l *lrsBB) Name() string {
return lrsBalancerName
}

func (l *lrsBB) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
return parseConfig(c)
}

type lrsBalancer struct {
cc balancer.ClientConn
buildOpts balancer.BuildOptions

logger *grpclog.PrefixLogger
loadStore Store
client *xdsClientWrapper

config *lbConfig
lb balancer.Balancer // The sub balancer.
}

func (b *lrsBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
newConfig, ok := s.BalancerConfig.(*lbConfig)
if !ok {
return fmt.Errorf("unexpected balancer config with type: %T", s.BalancerConfig)
}

// If child policy is a different type, recreate the sub-balancer.
if b.config == nil || b.config.ChildPolicy.Name != newConfig.ChildPolicy.Name {
bb := balancer.Get(newConfig.ChildPolicy.Name)
if bb == nil {
return fmt.Errorf("balancer %q not registered", newConfig.ChildPolicy.Name)
}
if b.lb != nil {
b.lb.Close()
}
b.lb = bb.Build(newCCWrapper(b.cc, b.loadStore, newConfig.Locality), b.buildOpts)
}
// Update load reporting config or xds client.
b.client.update(newConfig, s.ResolverState.Attributes)
b.config = newConfig

// Addresses and sub-balancer config are sent to sub-balancer.
return b.lb.UpdateClientConnState(balancer.ClientConnState{
ResolverState: s.ResolverState,
BalancerConfig: b.config.ChildPolicy.Config,
})
}

func (b *lrsBalancer) ResolverError(err error) {
if b.lb != nil {
b.lb.ResolverError(err)
}
}

func (b *lrsBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.SubConnState) {
if b.lb != nil {
b.lb.UpdateSubConnState(sc, s)
}
}

func (b *lrsBalancer) Close() {
if b.lb != nil {
b.lb.Close()
b.lb = nil
}
b.client.close()
}

type ccWrapper struct {
balancer.ClientConn
loadStore Store
localityID *internal.LocalityID
}

func newCCWrapper(cc balancer.ClientConn, loadStore Store, localityID *internal.LocalityID) *ccWrapper {
return &ccWrapper{
ClientConn: cc,
loadStore: loadStore,
localityID: localityID,
}
}

func (ccw *ccWrapper) UpdateState(s balancer.State) {
s.Picker = newLoadReportPicker(s.Picker, *ccw.localityID, ccw.loadStore)
ccw.ClientConn.UpdateState(s)
}

// xdsClientInterface contains only the xds_client methods needed by LRS
// balancer. It's defined so we can override xdsclient in tests.
type xdsClientInterface interface {
ReportLoad(server string, clusterName string, loadStore Store) (cancel func())
Close()
}

type xdsClientWrapper struct {
loadStore Store

c xdsClientInterface
cancelLoadReport func()
clusterName string
lrsServerName string
}

func newXDSClientWrapper(loadStore Store) *xdsClientWrapper {
return &xdsClientWrapper{
loadStore: loadStore,
}
}

// update checks the config and xdsclient, and decides whether it needs to
// restart the load reporting stream.
//
// TODO: refactor lrs to share one stream instead of one per EDS.
func (w *xdsClientWrapper) update(newConfig *lbConfig, attr *attributes.Attributes) {
var restartLoadReport bool
if attr != nil {
if clientFromAttr, _ := attr.Value(xdsinternal.XDSClientID).(xdsClientInterface); clientFromAttr != nil {
if w.c != clientFromAttr {
// xds client is different, restart.
restartLoadReport = true
w.c = clientFromAttr
}
}
}

// ClusterName is different, restart. ClusterName is from ClusterName and
// EdsServiceName.
//
// TODO: LRS request actually has separate fields from these two values.
// Update lrs.Store to set both.
newClusterName := newConfig.EdsServiceName
if newClusterName == "" {
newClusterName = newConfig.ClusterName
}
if w.clusterName != newClusterName {
restartLoadReport = true
w.clusterName = newClusterName
}

if w.lrsServerName != newConfig.LrsLoadReportingServerName {
// LrsLoadReportingServerName is different, load should be report to a
// different server, restart.
restartLoadReport = true
w.lrsServerName = newConfig.LrsLoadReportingServerName
}

if restartLoadReport {
if w.cancelLoadReport != nil {
w.cancelLoadReport()
w.cancelLoadReport = nil
}
if w.c != nil {
w.cancelLoadReport = w.c.ReportLoad(w.lrsServerName, w.clusterName, w.loadStore)
}
}
}

func (w *xdsClientWrapper) close() {
if w.cancelLoadReport != nil {
w.cancelLoadReport()
w.cancelLoadReport = nil
}
}
175 changes: 175 additions & 0 deletions xds/internal/balancer/lrs/balancer_test.go
@@ -0,0 +1,175 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package lrs

import (
"fmt"
"testing"
"time"

"github.com/google/go-cmp/cmp"
"google.golang.org/grpc/attributes"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/connectivity"
internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
"google.golang.org/grpc/resolver"
xdsinternal "google.golang.org/grpc/xds/internal"
"google.golang.org/grpc/xds/internal/testutils"
)

var (
testBackendAddrs = []resolver.Address{
{Addr: "1.1.1.1:1"},
}
testLocality = &xdsinternal.LocalityID{
Region: "test-region",
Zone: "test-zone",
SubZone: "test-sub-zone",
}
)

// This is a subset of testutils.fakeclient. Cannot use testutils.fakeclient
// because testutils imports package lrs.
//
// TODO: after refactoring xdsclient to support load reporting, the testutils
// package won't need to depend on lrs package for the store. And we can use the
// testutils for this.
type fakeXDSClient struct {
loadReportCh chan *reportLoadArgs
}

func newFakeXDSClient() *fakeXDSClient {
return &fakeXDSClient{
loadReportCh: make(chan *reportLoadArgs, 10),
}
}

// reportLoadArgs wraps the arguments passed to ReportLoad.
type reportLoadArgs struct {
// server is the name of the server to which the load is reported.
server string
// cluster is the name of the cluster for which load is reported.
cluster string
// loadStore is the store where loads are stored.
loadStore interface{}
}

// ReportLoad starts reporting load about clusterName to server.
func (xdsC *fakeXDSClient) ReportLoad(server string, clusterName string, loadStore Store) (cancel func()) {
xdsC.loadReportCh <- &reportLoadArgs{server: server, cluster: clusterName, loadStore: loadStore}
return func() {}
}

// waitForReportLoad waits for ReportLoad to be invoked on this client within a
// reasonable timeout, and returns the arguments passed to it.
func (xdsC *fakeXDSClient) waitForReportLoad() (*reportLoadArgs, error) {
select {
case <-time.After(time.Second):
return nil, fmt.Errorf("timeout")
case a := <-xdsC.loadReportCh:
return a, nil
}
}

// Close closes the xds client.
func (xdsC *fakeXDSClient) Close() {
}

// TestLoadReporting verifies that the lrs balancer starts the loadReport
// stream when the lbConfig passed to it contains a valid value for the LRS
// server (empty string).
func TestLoadReporting(t *testing.T) {
builder := balancer.Get(lrsBalancerName)
cc := testutils.NewTestClientConn(t)
lrsB := builder.Build(cc, balancer.BuildOptions{})
defer lrsB.Close()

xdsC := newFakeXDSClient()
if err := lrsB.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{
Addresses: testBackendAddrs,
Attributes: attributes.New(xdsinternal.XDSClientID, xdsC),
},
BalancerConfig: &lbConfig{
EdsServiceName: testClusterName,
LrsLoadReportingServerName: testLRSServerName,
Locality: testLocality,
ChildPolicy: &internalserviceconfig.BalancerConfig{
Name: roundrobin.Name,
},
},
}); err != nil {
t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
}

got, err := xdsC.waitForReportLoad()
if err != nil {
t.Fatalf("xdsClient.ReportLoad failed with error: %v", err)
}
if got.server != testLRSServerName || got.cluster != testClusterName {
t.Fatalf("xdsClient.ReportLoad called with {%q, %q}: want {%q, %q}", got.server, got.cluster, testLRSServerName, testClusterName)
}

sc1 := <-cc.NewSubConnCh
lrsB.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
lrsB.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})

// Test pick with one backend.
p1 := <-cc.NewPickerCh
const successCount = 5
for i := 0; i < successCount; i++ {
gotSCSt, _ := p1.Pick(balancer.PickInfo{})
if !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc1)
}
gotSCSt.Done(balancer.DoneInfo{})
}
const errorCount = 5
for i := 0; i < errorCount; i++ {
gotSCSt, _ := p1.Pick(balancer.PickInfo{})
if !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc1)
}
gotSCSt.Done(balancer.DoneInfo{Err: fmt.Errorf("error")})
}

loads := make(map[xdsinternal.LocalityID]*rpcCountData)

got.loadStore.(*lrsStore).localityRPCCount.Range(
func(key, value interface{}) bool {
loads[key.(xdsinternal.LocalityID)] = value.(*rpcCountData)
return true
},
)

countData, ok := loads[*testLocality]
if !ok {
t.Fatalf("loads for %v not found in store", testLocality)
}
if *countData.succeeded != successCount {
t.Errorf("got succeeded %v, want %v", *countData.succeeded, successCount)
}
if *countData.errored != errorCount {
t.Errorf("got errord %v, want %v", *countData.errored, errorCount)
}
if *countData.inProgress != 0 {
t.Errorf("got inProgress %v, want %v", *countData.inProgress, 0)
}
}