Skip to content

Commit

Permalink
lrs: add a layer for clusters in load store (#3880)
Browse files Browse the repository at this point in the history
  • Loading branch information
menghanl committed Sep 22, 2020
1 parent 64c4c37 commit 0dc9986
Show file tree
Hide file tree
Showing 16 changed files with 262 additions and 132 deletions.
16 changes: 5 additions & 11 deletions xds/internal/balancer/balancergroup/balancergroup.go
Expand Up @@ -24,6 +24,7 @@ import (
"time"

orcapb "github.com/cncf/udpa/go/udpa/data/orca/v1"
"google.golang.org/grpc/xds/internal/client/load"

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
Expand All @@ -32,13 +33,6 @@ import (
"google.golang.org/grpc/resolver"
)

// loadReporter wraps the methods from the loadStore that are used here.
// Declaring the interface locally lets this package depend only on the
// methods it calls rather than on a concrete load store type.
type loadReporter interface {
// CallStarted is given a locality identifier.
// NOTE(review): presumably records the start of one call for that
// locality — confirm against the load store implementation.
CallStarted(locality string)
// CallFinished is given a locality identifier and the call's error.
// NOTE(review): presumably a nil err marks a successful call — confirm.
CallFinished(locality string, err error)
// CallServerLoad is given a locality identifier plus a named float64 value.
// NOTE(review): presumably a server-reported load metric — confirm.
CallServerLoad(locality, name string, val float64)
}

// subBalancerWrapper is used to keep the configurations that will be used to start
// the underlying balancer. It can be called to start/stop the underlying
// balancer.
Expand Down Expand Up @@ -186,7 +180,7 @@ func (sbc *subBalancerWrapper) stopBalancer() {
type BalancerGroup struct {
cc balancer.ClientConn
logger *grpclog.PrefixLogger
loadStore loadReporter
loadStore load.PerClusterReporter

// stateAggregator is where the state/picker updates will be sent to. It's
// provided by the parent balancer, to build a picker with all the
Expand Down Expand Up @@ -241,7 +235,7 @@ var DefaultSubBalancerCloseTimeout = 15 * time.Minute

// New creates a new BalancerGroup. Note that the BalancerGroup
// needs to be started to work.
func New(cc balancer.ClientConn, stateAggregator BalancerStateAggregator, loadStore loadReporter, logger *grpclog.PrefixLogger) *BalancerGroup {
func New(cc balancer.ClientConn, stateAggregator BalancerStateAggregator, loadStore load.PerClusterReporter, logger *grpclog.PrefixLogger) *BalancerGroup {
return &BalancerGroup{
cc: cc,
logger: logger,
Expand Down Expand Up @@ -500,10 +494,10 @@ type loadReportPicker struct {
p balancer.Picker

locality string
loadStore loadReporter
loadStore load.PerClusterReporter
}

func newLoadReportPicker(p balancer.Picker, id string, loadStore loadReporter) *loadReportPicker {
func newLoadReportPicker(p balancer.Picker, id string, loadStore load.PerClusterReporter) *loadReportPicker {
return &loadReportPicker{
p: p,
locality: id,
Expand Down
4 changes: 2 additions & 2 deletions xds/internal/balancer/balancergroup/balancergroup_test.go
Expand Up @@ -70,7 +70,7 @@ func subConnFromPicker(p balancer.Picker) func() balancer.SubConn {
}
}

func newTestBalancerGroup(t *testing.T, loadStore *load.Store) (*testutils.TestClientConn, *weightedaggregator.Aggregator, *BalancerGroup) {
func newTestBalancerGroup(t *testing.T, loadStore *load.PerClusterStore) (*testutils.TestClientConn, *weightedaggregator.Aggregator, *BalancerGroup) {
cc := testutils.NewTestClientConn(t)
gator := weightedaggregator.New(cc, nil, testutils.NewTestWRR)
gator.Start()
Expand Down Expand Up @@ -400,7 +400,7 @@ func (s) TestBalancerGroup_TwoRR_ChangeWeight_MoreBackends(t *testing.T) {
}

func (s) TestBalancerGroup_LoadReport(t *testing.T) {
loadStore := &load.Store{}
loadStore := &load.PerClusterStore{}
cc, gator, bg := newTestBalancerGroup(t, loadStore)

backendToBalancerID := make(map[balancer.SubConn]string)
Expand Down
12 changes: 3 additions & 9 deletions xds/internal/balancer/edsbalancer/eds_impl.go
Expand Up @@ -36,6 +36,7 @@ import (
"google.golang.org/grpc/xds/internal/balancer/balancergroup"
"google.golang.org/grpc/xds/internal/balancer/weightedtarget/weightedaggregator"
xdsclient "google.golang.org/grpc/xds/internal/client"
"google.golang.org/grpc/xds/internal/client/load"
)

// TODO: make this an environment variable?
Expand Down Expand Up @@ -450,20 +451,13 @@ func (edsImpl *edsBalancerImpl) close() {
}
}

// dropReporter wraps the single method used by the dropPicker to report dropped
// calls to the load store.
type dropReporter interface {
// CallDropped reports the drop of one RPC with the given category.
CallDropped(category string)
}

type dropPicker struct {
drops []*dropper
p balancer.Picker
loadStore dropReporter
loadStore load.PerClusterReporter
}

func newDropPicker(p balancer.Picker, drops []*dropper, loadStore dropReporter) *dropPicker {
func newDropPicker(p balancer.Picker, drops []*dropper, loadStore load.PerClusterReporter) *dropPicker {
return &dropPicker{
drops: drops,
p: p,
Expand Down
15 changes: 3 additions & 12 deletions xds/internal/balancer/edsbalancer/eds_impl_test.go
Expand Up @@ -682,22 +682,13 @@ func (s) TestDropPicker(t *testing.T) {
}
}

// loadStoreWrapper is a test double: it embeds xdsClientInterface and carries
// a *load.Store in ls, which its LoadStore method returns. The embedded
// interface is never assigned here, so only LoadStore is intended to be
// called on it.
type loadStoreWrapper struct {
xdsClientInterface
ls *load.Store
}

// LoadStore returns the *load.Store held in l.ls.
func (l *loadStoreWrapper) LoadStore() *load.Store {
return l.ls
}

func (s) TestEDS_LoadReport(t *testing.T) {
// We create an xdsClientWrapper with a dummy xdsClientInterface which only
// implements the LoadStore() method to return the underlying load.Store to
// be used.
loadStore := &load.Store{}
loadStore := load.NewStore()
cw := &xdsClientWrapper{
xdsClient: &loadStoreWrapper{ls: loadStore},
load: &loadStoreWrapper{store: loadStore, service: testClusterNames[0]},
}

cc := testutils.NewTestClientConn(t)
Expand Down Expand Up @@ -745,7 +736,7 @@ func (s) TestEDS_LoadReport(t *testing.T) {
}
}

gotStoreData := loadStore.Stats()
gotStoreData := loadStore.PerCluster(testClusterNames[0], "").Stats()
if diff := cmp.Diff(wantStoreData, gotStoreData, cmpopts.EquateEmpty()); diff != "" {
t.Errorf("store.Stats() returned unexpected diff (-want +got):\n%s", diff)
}
Expand Down
98 changes: 63 additions & 35 deletions xds/internal/balancer/edsbalancer/xds_client_wrapper.go
Expand Up @@ -19,6 +19,8 @@
package edsbalancer

import (
"sync"

"google.golang.org/grpc"
"google.golang.org/grpc/attributes"
"google.golang.org/grpc/balancer"
Expand All @@ -45,7 +47,44 @@ var (
bootstrapConfigNew = bootstrap.NewConfig
)

// xdsClientWrapper is responsible for getting the xds client from attributes or
// loadStoreWrapper pairs a *load.Store with a service name and forwards the
// CallStarted/CallFinished/CallServerLoad/CallDropped methods to
// store.PerCluster(service, ""). Both fields can be swapped at runtime via
// update, so every access goes through mu.
type loadStoreWrapper struct {
// mu guards store and service. Reporting methods take the read lock;
// update takes the write lock.
mu sync.RWMutex
// store is the load store that calls are forwarded to.
store *load.Store
// service is passed as the first argument to store.PerCluster.
service string
}

// update replaces the wrapped store and service name together under the write
// lock, so the reporting methods (which hold the read lock) always see a
// matching store/service pair.
func (lsw *loadStoreWrapper) update(store *load.Store, service string) {
	lsw.mu.Lock()
	// Plain field assignments cannot panic, so an explicit unlock is
	// equivalent to a deferred one here.
	lsw.store, lsw.service = store, service
	lsw.mu.Unlock()
}

// CallStarted forwards to CallStarted on the per-cluster store obtained from
// store.PerCluster(service, ""). The read lock is held for the whole call so
// update cannot swap the store or service name midway.
func (lsw *loadStoreWrapper) CallStarted(locality string) {
	lsw.mu.RLock()
	defer lsw.mu.RUnlock()

	perCluster := lsw.store.PerCluster(lsw.service, "")
	perCluster.CallStarted(locality)
}

// CallFinished forwards locality and err to CallFinished on the per-cluster
// store obtained from store.PerCluster(service, ""). The read lock is held
// for the whole call so update cannot swap the store or service name midway.
func (lsw *loadStoreWrapper) CallFinished(locality string, err error) {
	lsw.mu.RLock()
	defer lsw.mu.RUnlock()

	perCluster := lsw.store.PerCluster(lsw.service, "")
	perCluster.CallFinished(locality, err)
}

// CallServerLoad forwards locality, name and val to CallServerLoad on the
// per-cluster store obtained from store.PerCluster(service, ""). The read
// lock is held for the whole call so update cannot swap the store or service
// name midway.
func (lsw *loadStoreWrapper) CallServerLoad(locality, name string, val float64) {
	lsw.mu.RLock()
	defer lsw.mu.RUnlock()

	perCluster := lsw.store.PerCluster(lsw.service, "")
	perCluster.CallServerLoad(locality, name, val)
}

// CallDropped forwards category to CallDropped on the per-cluster store
// obtained from store.PerCluster(service, ""). The read lock is held for the
// whole call so update cannot swap the store or service name midway.
func (lsw *loadStoreWrapper) CallDropped(category string) {
	lsw.mu.RLock()
	defer lsw.mu.RUnlock()

	perCluster := lsw.store.PerCluster(lsw.service, "")
	perCluster.CallDropped(category)
}

// xdsClientWrapper is responsible for getting the xds client from attributes or
// creating a new xds client, and start watching EDS. The given callbacks will
// be called with EDS updates or errors.
type xdsClientWrapper struct {
Expand All @@ -58,6 +97,7 @@ type xdsClientWrapper struct {
// xdsClient could come from attributes, or created with balancerName.
xdsClient xdsClientInterface

load *loadStoreWrapper
// edsServiceName is the edsServiceName currently being watched, not
// necessarily the edsServiceName from service config.
//
Expand All @@ -82,6 +122,7 @@ func newXDSClientWrapper(newEDSUpdate func(xdsclient.EndpointsUpdate, error), bb
logger: logger,
newEDSUpdate: newEDSUpdate,
bbo: bbo,
load: &loadStoreWrapper{},
}
}

Expand Down Expand Up @@ -167,12 +208,11 @@ func (c *xdsClientWrapper) updateXDSClient(config *EDSConfig, attr *attributes.A
//
// This usually means load report needs to be restarted, but this function does
// NOT do that. Caller needs to call startLoadReport separately.
func (c *xdsClientWrapper) startEndpointsWatch(nameToWatch string) {
func (c *xdsClientWrapper) startEndpointsWatch() {
if c.xdsClient == nil {
return
}

c.edsServiceName = nameToWatch
if c.cancelEndpointsWatch != nil {
c.cancelEndpointsWatch()
}
Expand All @@ -193,7 +233,7 @@ func (c *xdsClientWrapper) startEndpointsWatch(nameToWatch string) {
// Caller can call this when the loadReportServer name changes, but
// edsServiceName doesn't (so we only need to restart load reporting, not EDS
// watch).
func (c *xdsClientWrapper) startLoadReport(edsServiceNameBeingWatched string, loadReportServer *string) {
func (c *xdsClientWrapper) startLoadReport(loadReportServer *string) {
if c.xdsClient == nil {
c.logger.Warningf("xds: xdsClient is nil when trying to start load reporting. This means xdsClient wasn't passed in from the resolver, and xdsClient.New failed")
return
Expand All @@ -203,54 +243,42 @@ func (c *xdsClientWrapper) startLoadReport(edsServiceNameBeingWatched string, lo
}
c.loadReportServer = loadReportServer
if c.loadReportServer != nil {
c.cancelLoadReport = c.xdsClient.ReportLoad(*c.loadReportServer, edsServiceNameBeingWatched)
c.cancelLoadReport = c.xdsClient.ReportLoad(*c.loadReportServer, c.edsServiceName)
}
}

func (c *xdsClientWrapper) loadStore() *load.Store {
if c == nil || c.xdsClient == nil {
func (c *xdsClientWrapper) loadStore() load.PerClusterReporter {
if c == nil || c.load.store == nil {
return nil
}
return c.xdsClient.LoadStore()
return c.load
}

// handleUpdate applies the service config and attributes updates to the client,
// including updating the xds_client to use, and updating the EDS name to watch.
func (c *xdsClientWrapper) handleUpdate(config *EDSConfig, attr *attributes.Attributes) {
clientChanged := c.updateXDSClient(config, attr)

var (
restartEndpointsWatch bool
restartLoadReport bool
)

// The clusterName to watch should come from CDS response, via service
// config. If it's an empty string, fallback user's dial target.
nameToWatch := config.EDSServiceName
if nameToWatch == "" {
c.logger.Warningf("eds: cluster name to watch is an empty string. Fallback to user's dial target")
nameToWatch = c.bbo.Target.Endpoint
}

// Need to restart EDS watch when one of the following happens:
// - the xds_client is updated
// - the xds_client didn't change, but the edsServiceName changed
//
// Only need to restart load reporting when:
// - no need to restart EDS, but loadReportServer name changed
if clientChanged || c.edsServiceName != nameToWatch {
restartEndpointsWatch = true
restartLoadReport = true
} else if !equalStringPointers(c.loadReportServer, config.LrsLoadReportingServerName) {
restartLoadReport = true
}

if restartEndpointsWatch {
c.startEndpointsWatch(nameToWatch)
if clientChanged || c.edsServiceName != config.EDSServiceName {
c.edsServiceName = config.EDSServiceName
c.startEndpointsWatch()
// TODO: this update for the LRS service name is too early. It should
// only apply to the new EDS response. But this is applied to the RPCs
// before the new EDS response. To fully fix this, the EDS balancer
// needs to do a graceful switch to another EDS implementation.
//
// This is OK for now, because we don't actually expect edsServiceName
// to change. Fixing this (a bigger change) will happen later.
c.load.update(c.xdsClient.LoadStore(), c.edsServiceName)
}

if restartLoadReport {
c.startLoadReport(nameToWatch, config.LrsLoadReportingServerName)
// Only need to restart load reporting when:
// - the loadReportServer name changed
if !equalStringPointers(c.loadReportServer, config.LrsLoadReportingServerName) {
c.startLoadReport(config.LrsLoadReportingServerName)
}
}

Expand Down
28 changes: 6 additions & 22 deletions xds/internal/balancer/edsbalancer/xds_client_wrapper_test.go
Expand Up @@ -107,36 +107,20 @@ func (s) TestClientWrapperWatchEDS(t *testing.T) {
}
defer func() { bootstrapConfigNew = oldBootstrapConfigNew }()

// Update with an empty edsServiceName should trigger an EDS watch
// for the user's dial target.
cw.handleUpdate(&EDSConfig{
BalancerName: fakeServer.Address,
EDSServiceName: "",
}, nil)
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if _, err := fakeServer.NewConnChan.Receive(ctx); err != nil {
t.Fatal("Failed to connect to fake server")
}
t.Log("Client connection established to fake server...")
if err := verifyExpectedRequests(fakeServer, testServiceName); err != nil {
t.Fatal(err)
}

// Update with a non-empty edsServiceName should trigger an EDS watch for
// the same. The previously registered watch will be cancelled, which will
// result in an EDS request with no resource names being sent to the server.
// the same.
cw.handleUpdate(&EDSConfig{
BalancerName: fakeServer.Address,
EDSServiceName: "foobar-1",
}, nil)
if err := verifyExpectedRequests(fakeServer, "", "foobar-1"); err != nil {
if err := verifyExpectedRequests(fakeServer, "foobar-1"); err != nil {
t.Fatal(err)
}

// Also test the case where the edsServerName changes from one
// non-empty name to another, and make sure a new watch is
// registered.
// Also test the case where the edsServiceName changes from one non-empty
// name to another, and make sure a new watch is registered. The previously
// registered watch will be cancelled, which will result in an EDS request
// with no resource names being sent to the server.
cw.handleUpdate(&EDSConfig{
BalancerName: fakeServer.Address,
EDSServiceName: "foobar-2",
Expand Down
9 changes: 6 additions & 3 deletions xds/internal/balancer/edsbalancer/xds_lrs_test.go
Expand Up @@ -35,16 +35,19 @@ import (
func (s) TestXDSLoadReporting(t *testing.T) {
builder := balancer.Get(edsName)
cc := newNoopTestClientConn()
edsB, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testEDSClusterName}}).(*edsBalancer)
edsB, ok := builder.Build(cc, balancer.BuildOptions{}).(*edsBalancer)
if !ok {
t.Fatalf("builder.Build(%s) returned type {%T}, want {*edsBalancer}", edsName, edsB)
}
defer edsB.Close()

xdsC := fakeclient.NewClient()
if err := edsB.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Attributes: attributes.New(xdsinternal.XDSClientID, xdsC)},
BalancerConfig: &EDSConfig{LrsLoadReportingServerName: new(string)},
ResolverState: resolver.State{Attributes: attributes.New(xdsinternal.XDSClientID, xdsC)},
BalancerConfig: &EDSConfig{
EDSServiceName: testEDSClusterName,
LrsLoadReportingServerName: new(string),
},
}); err != nil {
t.Fatal(err)
}
Expand Down

0 comments on commit 0dc9986

Please sign in to comment.