Skip to content

Commit

Permalink
xds: add LRS balancing policy (#3799)
Browse files Browse the repository at this point in the history
  • Loading branch information
menghanl committed Aug 13, 2020
1 parent 0baf4c2 commit 1605756
Show file tree
Hide file tree
Showing 9 changed files with 684 additions and 11 deletions.
216 changes: 216 additions & 0 deletions xds/internal/balancer/lrs/balancer.go
@@ -0,0 +1,216 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

// Package lrs implements load reporting balancer for xds.
package lrs

import (
"encoding/json"
"fmt"

"google.golang.org/grpc/attributes"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/xds/internal"
xdsinternal "google.golang.org/grpc/xds/internal"
)

func init() {
balancer.Register(&lrsBB{})
}

const lrsBalancerName = "lrs_experimental"

type lrsBB struct{}

func (l *lrsBB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
b := &lrsBalancer{
cc: cc,
buildOpts: opts,
}
b.loadStore = NewStore()
b.client = newXDSClientWrapper(b.loadStore)
b.logger = prefixLogger(b)
b.logger.Infof("Created")
return b
}

func (l *lrsBB) Name() string {
return lrsBalancerName
}

func (l *lrsBB) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
return parseConfig(c)
}

type lrsBalancer struct {
cc balancer.ClientConn
buildOpts balancer.BuildOptions

logger *grpclog.PrefixLogger
loadStore Store
client *xdsClientWrapper

config *lbConfig
lb balancer.Balancer // The sub balancer.
}

func (b *lrsBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
newConfig, ok := s.BalancerConfig.(*lbConfig)
if !ok {
return fmt.Errorf("unexpected balancer config with type: %T", s.BalancerConfig)
}

// If child policy is a different type, recreate the sub-balancer.
if b.config == nil || b.config.ChildPolicy.Name != newConfig.ChildPolicy.Name {
bb := balancer.Get(newConfig.ChildPolicy.Name)
if bb == nil {
return fmt.Errorf("balancer %q not registered", newConfig.ChildPolicy.Name)
}
if b.lb != nil {
b.lb.Close()
}
b.lb = bb.Build(newCCWrapper(b.cc, b.loadStore, newConfig.Locality), b.buildOpts)
}
// Update load reporting config or xds client.
b.client.update(newConfig, s.ResolverState.Attributes)
b.config = newConfig

// Addresses and sub-balancer config are sent to sub-balancer.
return b.lb.UpdateClientConnState(balancer.ClientConnState{
ResolverState: s.ResolverState,
BalancerConfig: b.config.ChildPolicy.Config,
})
}

func (b *lrsBalancer) ResolverError(err error) {
if b.lb != nil {
b.lb.ResolverError(err)
}
}

func (b *lrsBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.SubConnState) {
if b.lb != nil {
b.lb.UpdateSubConnState(sc, s)
}
}

func (b *lrsBalancer) Close() {
if b.lb != nil {
b.lb.Close()
b.lb = nil
}
b.client.close()
}

type ccWrapper struct {
balancer.ClientConn
loadStore Store
localityID *internal.LocalityID
}

func newCCWrapper(cc balancer.ClientConn, loadStore Store, localityID *internal.LocalityID) *ccWrapper {
return &ccWrapper{
ClientConn: cc,
loadStore: loadStore,
localityID: localityID,
}
}

func (ccw *ccWrapper) UpdateState(s balancer.State) {
s.Picker = newLoadReportPicker(s.Picker, *ccw.localityID, ccw.loadStore)
ccw.ClientConn.UpdateState(s)
}

// xdsClientInterface contains only the xds_client methods needed by LRS
// balancer. It's defined so we can override xdsclient in tests.
type xdsClientInterface interface {
ReportLoad(server string, clusterName string, loadStore Store) (cancel func())
Close()
}

type xdsClientWrapper struct {
loadStore Store

c xdsClientInterface
cancelLoadReport func()
clusterName string
lrsServerName string
}

func newXDSClientWrapper(loadStore Store) *xdsClientWrapper {
return &xdsClientWrapper{
loadStore: loadStore,
}
}

// update checks the config and xdsclient, and decides whether it needs to
// restart the load reporting stream.
//
// TODO: refactor lrs to share one stream instead of one per EDS.
func (w *xdsClientWrapper) update(newConfig *lbConfig, attr *attributes.Attributes) {
var restartLoadReport bool
if attr != nil {
if clientFromAttr, _ := attr.Value(xdsinternal.XDSClientID).(xdsClientInterface); clientFromAttr != nil {
if w.c != clientFromAttr {
// xds client is different, restart.
restartLoadReport = true
w.c = clientFromAttr
}
}
}

// ClusterName is different, restart. ClusterName is from ClusterName and
// EdsServiceName.
//
// TODO: LRS request actually has separate fields from these two values.
// Update lrs.Store to set both.
newClusterName := newConfig.EdsServiceName
if newClusterName == "" {
newClusterName = newConfig.ClusterName
}
if w.clusterName != newClusterName {
restartLoadReport = true
w.clusterName = newClusterName
}

if w.lrsServerName != newConfig.LrsLoadReportingServerName {
// LrsLoadReportingServerName is different, load should be report to a
// different server, restart.
restartLoadReport = true
w.lrsServerName = newConfig.LrsLoadReportingServerName
}

if restartLoadReport {
if w.cancelLoadReport != nil {
w.cancelLoadReport()
w.cancelLoadReport = nil
}
if w.c != nil {
w.cancelLoadReport = w.c.ReportLoad(w.lrsServerName, w.clusterName, w.loadStore)
}
}
}

func (w *xdsClientWrapper) close() {
if w.cancelLoadReport != nil {
w.cancelLoadReport()
w.cancelLoadReport = nil
}
}
175 changes: 175 additions & 0 deletions xds/internal/balancer/lrs/balancer_test.go
@@ -0,0 +1,175 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package lrs

import (
"fmt"
"testing"
"time"

"github.com/google/go-cmp/cmp"
"google.golang.org/grpc/attributes"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/connectivity"
internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
"google.golang.org/grpc/resolver"
xdsinternal "google.golang.org/grpc/xds/internal"
"google.golang.org/grpc/xds/internal/testutils"
)

var (
testBackendAddrs = []resolver.Address{
{Addr: "1.1.1.1:1"},
}
testLocality = &xdsinternal.LocalityID{
Region: "test-region",
Zone: "test-zone",
SubZone: "test-sub-zone",
}
)

// This is a subset of testutils.fakeclient. Cannot use testutils.fakeclient
// because testutils imports package lrs.
//
// TODO: after refactoring xdsclient to support load reporting, the testutils
// package won't need to depend on lrs package for the store. And we can use the
// testutils for this.
type fakeXDSClient struct {
loadReportCh chan *reportLoadArgs
}

func newFakeXDSClient() *fakeXDSClient {
return &fakeXDSClient{
loadReportCh: make(chan *reportLoadArgs, 10),
}
}

// reportLoadArgs wraps the arguments passed to ReportLoad.
type reportLoadArgs struct {
// server is the name of the server to which the load is reported.
server string
// cluster is the name of the cluster for which load is reported.
cluster string
// loadStore is the store where loads are stored.
loadStore interface{}
}

// ReportLoad starts reporting load about clusterName to server.
func (xdsC *fakeXDSClient) ReportLoad(server string, clusterName string, loadStore Store) (cancel func()) {
xdsC.loadReportCh <- &reportLoadArgs{server: server, cluster: clusterName, loadStore: loadStore}
return func() {}
}

// waitForReportLoad waits for ReportLoad to be invoked on this client within a
// reasonable timeout, and returns the arguments passed to it.
func (xdsC *fakeXDSClient) waitForReportLoad() (*reportLoadArgs, error) {
select {
case <-time.After(time.Second):
return nil, fmt.Errorf("timeout")
case a := <-xdsC.loadReportCh:
return a, nil
}
}

// Close closes the xds client.
func (xdsC *fakeXDSClient) Close() {
}

// TestLoadReporting verifies that the lrs balancer starts the loadReport
// stream when the lbConfig passed to it contains a valid value for the LRS
// server (empty string).
func TestLoadReporting(t *testing.T) {
builder := balancer.Get(lrsBalancerName)
cc := testutils.NewTestClientConn(t)
lrsB := builder.Build(cc, balancer.BuildOptions{})
defer lrsB.Close()

xdsC := newFakeXDSClient()
if err := lrsB.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{
Addresses: testBackendAddrs,
Attributes: attributes.New(xdsinternal.XDSClientID, xdsC),
},
BalancerConfig: &lbConfig{
EdsServiceName: testClusterName,
LrsLoadReportingServerName: testLRSServerName,
Locality: testLocality,
ChildPolicy: &internalserviceconfig.BalancerConfig{
Name: roundrobin.Name,
},
},
}); err != nil {
t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
}

got, err := xdsC.waitForReportLoad()
if err != nil {
t.Fatalf("xdsClient.ReportLoad failed with error: %v", err)
}
if got.server != testLRSServerName || got.cluster != testClusterName {
t.Fatalf("xdsClient.ReportLoad called with {%q, %q}: want {%q, %q}", got.server, got.cluster, testLRSServerName, testClusterName)
}

sc1 := <-cc.NewSubConnCh
lrsB.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
lrsB.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})

// Test pick with one backend.
p1 := <-cc.NewPickerCh
const successCount = 5
for i := 0; i < successCount; i++ {
gotSCSt, _ := p1.Pick(balancer.PickInfo{})
if !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc1)
}
gotSCSt.Done(balancer.DoneInfo{})
}
const errorCount = 5
for i := 0; i < errorCount; i++ {
gotSCSt, _ := p1.Pick(balancer.PickInfo{})
if !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc1)
}
gotSCSt.Done(balancer.DoneInfo{Err: fmt.Errorf("error")})
}

loads := make(map[xdsinternal.LocalityID]*rpcCountData)

got.loadStore.(*lrsStore).localityRPCCount.Range(
func(key, value interface{}) bool {
loads[key.(xdsinternal.LocalityID)] = value.(*rpcCountData)
return true
},
)

countData, ok := loads[*testLocality]
if !ok {
t.Fatalf("loads for %v not found in store", testLocality)
}
if *countData.succeeded != successCount {
t.Errorf("got succeeded %v, want %v", *countData.succeeded, successCount)
}
if *countData.errored != errorCount {
t.Errorf("got errord %v, want %v", *countData.errored, errorCount)
}
if *countData.inProgress != 0 {
t.Errorf("got inProgress %v, want %v", *countData.inProgress, 0)
}
}

0 comments on commit 1605756

Please sign in to comment.