lrs: add a layer for clusters in load store #3880

Merged · 3 commits · Sep 22, 2020
xds/internal/balancer/balancergroup/balancergroup.go (5 additions, 11 deletions)
@@ -24,6 +24,7 @@ import (
"time"

orcapb "github.com/cncf/udpa/go/udpa/data/orca/v1"
+ "google.golang.org/grpc/xds/internal/client/load"

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
@@ -32,13 +33,6 @@ import (
"google.golang.org/grpc/resolver"
)

- // loadReporter wraps the methods from the loadStore that are used here.
- type loadReporter interface {
- CallStarted(locality string)
- CallFinished(locality string, err error)
- CallServerLoad(locality, name string, val float64)
- }
-
// subBalancerWrapper is used to keep the configurations that will be used to start
// the underlying balancer. It can be called to start/stop the underlying
// balancer.
@@ -186,7 +180,7 @@ func (sbc *subBalancerWrapper) stopBalancer() {
type BalancerGroup struct {
cc balancer.ClientConn
logger *grpclog.PrefixLogger
- loadStore loadReporter
+ loadStore load.PerClusterReporter

// stateAggregator is where the state/picker updates will be sent to. It's
// provided by the parent balancer, to build a picker with all the
@@ -241,7 +235,7 @@ var DefaultSubBalancerCloseTimeout = 15 * time.Minute

// New creates a new BalancerGroup. Note that the BalancerGroup
// needs to be started to work.
- func New(cc balancer.ClientConn, stateAggregator BalancerStateAggregator, loadStore loadReporter, logger *grpclog.PrefixLogger) *BalancerGroup {
+ func New(cc balancer.ClientConn, stateAggregator BalancerStateAggregator, loadStore load.PerClusterReporter, logger *grpclog.PrefixLogger) *BalancerGroup {
return &BalancerGroup{
cc: cc,
logger: logger,
@@ -500,10 +494,10 @@ type loadReportPicker struct {
p balancer.Picker

locality string
- loadStore loadReporter
+ loadStore load.PerClusterReporter
}

- func newLoadReportPicker(p balancer.Picker, id string, loadStore loadReporter) *loadReportPicker {
+ func newLoadReportPicker(p balancer.Picker, id string, loadStore load.PerClusterReporter) *loadReportPicker {
return &loadReportPicker{
p: p,
locality: id,
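For reference, the loadReporter interface deleted above (and the dropReporter interface deleted from eds_impl.go below) is subsumed by load.PerClusterReporter from the new xds/internal/client/load package. Judging from the methods this PR invokes on it, the interface plausibly looks like the following sketch; the authoritative definition lives in that package.

type PerClusterReporter interface {
	CallStarted(locality string)
	CallFinished(locality string, err error)
	CallServerLoad(locality, name string, val float64)
	CallDropped(category string)
}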
xds/internal/balancer/balancergroup/balancergroup_test.go (2 additions, 2 deletions)
@@ -70,7 +70,7 @@ func subConnFromPicker(p balancer.Picker) func() balancer.SubConn {
}
}

- func newTestBalancerGroup(t *testing.T, loadStore *load.Store) (*testutils.TestClientConn, *weightedaggregator.Aggregator, *BalancerGroup) {
+ func newTestBalancerGroup(t *testing.T, loadStore *load.PerClusterStore) (*testutils.TestClientConn, *weightedaggregator.Aggregator, *BalancerGroup) {
cc := testutils.NewTestClientConn(t)
gator := weightedaggregator.New(cc, nil, testutils.NewTestWRR)
gator.Start()
@@ -400,7 +400,7 @@ func (s) TestBalancerGroup_TwoRR_ChangeWeight_MoreBackends(t *testing.T) {
}

func (s) TestBalancerGroup_LoadReport(t *testing.T) {
- loadStore := &load.Store{}
+ loadStore := &load.PerClusterStore{}
cc, gator, bg := newTestBalancerGroup(t, loadStore)

backendToBalancerID := make(map[balancer.SubConn]string)
xds/internal/balancer/edsbalancer/eds_impl.go (3 additions, 9 deletions)
@@ -36,6 +36,7 @@ import (
"google.golang.org/grpc/xds/internal/balancer/balancergroup"
"google.golang.org/grpc/xds/internal/balancer/weightedtarget/weightedaggregator"
xdsclient "google.golang.org/grpc/xds/internal/client"
+ "google.golang.org/grpc/xds/internal/client/load"
)

// TODO: make this a environment variable?
@@ -450,20 +451,13 @@ func (edsImpl *edsBalancerImpl) close() {
}
}

- // dropReporter wraps the single method used by the dropPicker to report dropped
- // calls to the load store.
- type dropReporter interface {
- // CallDropped reports the drop of one RPC with the given category.
- CallDropped(category string)
- }

type dropPicker struct {
drops []*dropper
p balancer.Picker
- loadStore dropReporter
+ loadStore load.PerClusterReporter
}

- func newDropPicker(p balancer.Picker, drops []*dropper, loadStore dropReporter) *dropPicker {
+ func newDropPicker(p balancer.Picker, drops []*dropper, loadStore load.PerClusterReporter) *dropPicker {
return &dropPicker{
drops: drops,
p: p,
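To illustrate the unified reporting path, here is a hedged, self-contained sketch (not code from this PR; pickWithDrop, the category string, and the shouldDrop flag are all illustrative): a picker that decides to drop an RPC records the drop through the reporter before failing the pick.

import (
	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"

	"google.golang.org/grpc/xds/internal/client/load"
)

// pickWithDrop records the drop and fails the pick when shouldDrop is set;
// otherwise it delegates to the wrapped picker.
func pickWithDrop(reporter load.PerClusterReporter, shouldDrop bool, p balancer.Picker, info balancer.PickInfo) (balancer.PickResult, error) {
	if shouldDrop {
		if reporter != nil {
			reporter.CallDropped("lb") // illustrative category
		}
		return balancer.PickResult{}, status.Errorf(codes.Unavailable, "RPC is dropped")
	}
	return p.Pick(info)
}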
xds/internal/balancer/edsbalancer/eds_impl_test.go (3 additions, 12 deletions)
@@ -682,22 +682,13 @@ func (s) TestDropPicker(t *testing.T) {
}
}

- type loadStoreWrapper struct {
- xdsClientInterface
- ls *load.Store
- }
-
- func (l *loadStoreWrapper) LoadStore() *load.Store {
- return l.ls
- }
-
func (s) TestEDS_LoadReport(t *testing.T) {
// We create an xdsClientWrapper with a dummy xdsClientInterface which only
// implements the LoadStore() method to return the underlying load.Store to
// be used.
- loadStore := &load.Store{}
+ loadStore := load.NewStore()
cw := &xdsClientWrapper{
- xdsClient: &loadStoreWrapper{ls: loadStore},
+ load: &loadStoreWrapper{store: loadStore, service: testClusterNames[0]},
}

cc := testutils.NewTestClientConn(t)
@@ -745,7 +736,7 @@
}
}

- gotStoreData := loadStore.Stats()
+ gotStoreData := loadStore.PerCluster(testClusterNames[0], "").Stats()
if diff := cmp.Diff(wantStoreData, gotStoreData, cmpopts.EquateEmpty()); diff != "" {
t.Errorf("store.Stats() returned unexpected diff (-want +got):\n%s", diff)
}
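For context, the new store is keyed per cluster: load.NewStore() returns a Store that hands out per-cluster views via PerCluster(clusterName, edsServiceName), and stats are read back from the same view, as the updated assertion above shows. A short usage sketch assembled from the calls in this PR (the cluster and locality names are illustrative):

store := load.NewStore()
perCluster := store.PerCluster("cluster-a", "") // keyed by cluster and EDS service name
perCluster.CallStarted("locality-1")
perCluster.CallFinished("locality-1", nil)
data := perCluster.Stats() // load recorded for this cluster only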
xds/internal/balancer/edsbalancer/xds_client_wrapper.go (63 additions, 35 deletions)
@@ -19,6 +19,8 @@
package edsbalancer

import (
+ "sync"
+
"google.golang.org/grpc"
"google.golang.org/grpc/attributes"
"google.golang.org/grpc/balancer"
@@ -45,7 +47,44 @@
bootstrapConfigNew = bootstrap.NewConfig
)

- // xdsClientWrapper is responsible for getting the xds client from attributes or
+ type loadStoreWrapper struct {
[Inline review thread]

Contributor: Do we need this type?

If we cached the PerClusterStore in the xdsClientWrapper whenever the LRS stream is to be updated (either the client changed, the LRS server name changed, or the edsServiceName changed), it would simplify things, wouldn't it?

Also, I feel that caching the PerClusterStore would reduce lock contention, and the Store could also be simplified by not requiring a RWLock.

Contributor (Author): When the clusterName/serviceName is updated, the cached PerClusterStore needs to be updated and passed to the balancer group. But there is no way to update the load store in a balancer group.

[End review thread]
+ mu sync.RWMutex
+ store *load.Store
+ service string
+ }

+ func (lsw *loadStoreWrapper) update(store *load.Store, service string) {
+ lsw.mu.Lock()
+ defer lsw.mu.Unlock()
+ lsw.store = store
+ lsw.service = service
+ }
+
+ func (lsw *loadStoreWrapper) CallStarted(locality string) {
+ lsw.mu.RLock()
+ defer lsw.mu.RUnlock()
+ lsw.store.PerCluster(lsw.service, "").CallStarted(locality)
+ }
+
+ func (lsw *loadStoreWrapper) CallFinished(locality string, err error) {
+ lsw.mu.RLock()
+ defer lsw.mu.RUnlock()
+ lsw.store.PerCluster(lsw.service, "").CallFinished(locality, err)
+ }
+
+ func (lsw *loadStoreWrapper) CallServerLoad(locality, name string, val float64) {
+ lsw.mu.RLock()
+ defer lsw.mu.RUnlock()
+ lsw.store.PerCluster(lsw.service, "").CallServerLoad(locality, name, val)
+ }
+
+ func (lsw *loadStoreWrapper) CallDropped(category string) {
+ lsw.mu.RLock()
+ defer lsw.mu.RUnlock()
+ lsw.store.PerCluster(lsw.service, "").CallDropped(category)
+ }
+
+ // xdsclientWrapper is responsible for getting the xds client from attributes or
// creating a new xds client, and start watching EDS. The given callbacks will
// be called with EDS updates or errors.
type xdsClientWrapper struct {
@@ -58,6 +97,7 @@ type xdsClientWrapper struct {
// xdsClient could come from attributes, or created with balancerName.
xdsClient xdsClientInterface

+ load *loadStoreWrapper
// edsServiceName is the edsServiceName currently being watched, not
// necessary the edsServiceName from service config.
//
@@ -82,6 +122,7 @@ func newXDSClientWrapper(newEDSUpdate func(xdsclient.EndpointsUpdate, error), bb
logger: logger,
newEDSUpdate: newEDSUpdate,
bbo: bbo,
+ load: &loadStoreWrapper{},
}
}

@@ -167,12 +208,11 @@ func (c *xdsClientWrapper) updateXDSClient(config *EDSConfig, attr *attributes.A
//
// This usually means load report needs to be restarted, but this function does
// NOT do that. Caller needs to call startLoadReport separately.
- func (c *xdsClientWrapper) startEndpointsWatch(nameToWatch string) {
+ func (c *xdsClientWrapper) startEndpointsWatch() {
if c.xdsClient == nil {
return
}

- c.edsServiceName = nameToWatch
if c.cancelEndpointsWatch != nil {
c.cancelEndpointsWatch()
}
@@ -193,7 +233,7 @@
// Caller can cal this when the loadReportServer name changes, but
// edsServiceName doesn't (so we only need to restart load reporting, not EDS
// watch).
- func (c *xdsClientWrapper) startLoadReport(edsServiceNameBeingWatched string, loadReportServer *string) {
+ func (c *xdsClientWrapper) startLoadReport(loadReportServer *string) {
if c.xdsClient == nil {
c.logger.Warningf("xds: xdsClient is nil when trying to start load reporting. This means xdsClient wasn't passed in from the resolver, and xdsClient.New failed")
return
@@ -203,54 +243,42 @@
}
c.loadReportServer = loadReportServer
if c.loadReportServer != nil {
- c.cancelLoadReport = c.xdsClient.ReportLoad(*c.loadReportServer, edsServiceNameBeingWatched)
+ c.cancelLoadReport = c.xdsClient.ReportLoad(*c.loadReportServer, c.edsServiceName)
}
}

- func (c *xdsClientWrapper) loadStore() *load.Store {
- if c == nil || c.xdsClient == nil {
+ func (c *xdsClientWrapper) loadStore() load.PerClusterReporter {
+ if c == nil || c.load.store == nil {
return nil
}
- return c.xdsClient.LoadStore()
+ return c.load
}

// handleUpdate applies the service config and attributes updates to the client,
// including updating the xds_client to use, and updating the EDS name to watch.
func (c *xdsClientWrapper) handleUpdate(config *EDSConfig, attr *attributes.Attributes) {
clientChanged := c.updateXDSClient(config, attr)

- var (
- restartEndpointsWatch bool
- restartLoadReport bool
- )

- // The clusterName to watch should come from CDS response, via service
- // config. If it's an empty string, fallback user's dial target.
- nameToWatch := config.EDSServiceName
- if nameToWatch == "" {
- c.logger.Warningf("eds: cluster name to watch is an empty string. Fallback to user's dial target")
- nameToWatch = c.bbo.Target.Endpoint
- }

- // Need to restart EDS watch when one of the following happens:
- // - the xds_client is updated
- // - the xds_client didn't change, but the edsServiceName changed
- //
- // Only need to restart load reporting when:
- // - no need to restart EDS, but loadReportServer name changed
- if clientChanged || c.edsServiceName != nameToWatch {
- restartEndpointsWatch = true
- restartLoadReport = true
- } else if !equalStringPointers(c.loadReportServer, config.LrsLoadReportingServerName) {
- restartLoadReport = true
- }

- if restartEndpointsWatch {
- c.startEndpointsWatch(nameToWatch)
+ if clientChanged || c.edsServiceName != config.EDSServiceName {
[Inline review thread]

Contributor: c.edsServiceName is only written to in startEndpointsWatch. Now that it is read from startLoadReport, I think it would be better to write to it here, rather than in startEndpointsWatch.

Contributor (Author): Done

[End review thread]
+ c.edsServiceName = config.EDSServiceName
+ c.startEndpointsWatch()
+ // TODO: this update for the LRS service name is too early. It should
+ // only apply to the new EDS response. But this is applied to the RPCs
+ // before the new EDS response. To fully fix this, the EDS balancer
+ // needs to do a graceful switch to another EDS implementation.
+ //
+ // This is OK for now, because we don't actually expect edsServiceName
+ // to change. Fix this (a bigger change) will happen later.
+ c.load.update(c.xdsClient.LoadStore(), c.edsServiceName)
}

- if restartLoadReport {
- c.startLoadReport(nameToWatch, config.LrsLoadReportingServerName)
+ // Only need to restart load reporting when:
+ // - the loadReportServer name changed
+ if !equalStringPointers(c.loadReportServer, config.LrsLoadReportingServerName) {
+ c.startLoadReport(config.LrsLoadReportingServerName)
}
}
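The restart condition above relies on the equalStringPointers helper that already exists in this file. Its assumed semantics, sketched for readers (the actual implementation is not part of this diff): two *string values are equal if both are nil, or if both are non-nil and the pointed-to strings are equal.

func equalStringPointers(a, b *string) bool {
	if a == nil && b == nil {
		return true
	}
	if a == nil || b == nil {
		return false
	}
	return *a == *b
}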

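A design note on loadStoreWrapper, tying back to the review thread above: the balancer group captures its reporter once, at construction time, so the wrapper exists to give it a stable handle whose backing store and service name can be re-pointed later. A hedged sketch of the flow (cc, stateAggregator, logger, xdsClient, and newEDSServiceName are stand-ins):

// The balancer group is handed the wrapper, which satisfies
// load.PerClusterReporter, rather than a concrete per-cluster store.
lsw := &loadStoreWrapper{}
bg := balancergroup.New(cc, stateAggregator, lsw, logger)

// Later, when the xDS client or the EDS service name changes, the wrapper is
// updated in place; bg keeps reporting through the same handle.
lsw.update(xdsClient.LoadStore(), newEDSServiceName)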
xds/internal/balancer/edsbalancer/xds_client_wrapper_test.go (6 additions, 22 deletions)
@@ -107,36 +107,20 @@ func (s) TestClientWrapperWatchEDS(t *testing.T) {
}
defer func() { bootstrapConfigNew = oldBootstrapConfigNew }()

- // Update with an empty edsServiceName should trigger an EDS watch
- // for the user's dial target.
- cw.handleUpdate(&EDSConfig{
- BalancerName: fakeServer.Address,
- EDSServiceName: "",
- }, nil)
- ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
- defer cancel()
- if _, err := fakeServer.NewConnChan.Receive(ctx); err != nil {
- t.Fatal("Failed to connect to fake server")
- }
- t.Log("Client connection established to fake server...")
- if err := verifyExpectedRequests(fakeServer, testServiceName); err != nil {
- t.Fatal(err)
- }
-
// Update with an non-empty edsServiceName should trigger an EDS watch for
- // the same. The previously registered watch will be cancelled, which will
- // result in an EDS request with no resource names being sent to the server.
+ // the same.
cw.handleUpdate(&EDSConfig{
BalancerName: fakeServer.Address,
EDSServiceName: "foobar-1",
}, nil)
- if err := verifyExpectedRequests(fakeServer, "", "foobar-1"); err != nil {
+ if err := verifyExpectedRequests(fakeServer, "foobar-1"); err != nil {
t.Fatal(err)
}

- // Also test the case where the edsServerName changes from one
- // non-empty name to another, and make sure a new watch is
- // registered.
+ // Also test the case where the edsServerName changes from one non-empty
+ // name to another, and make sure a new watch is registered. The previously
+ // registered watch will be cancelled, which will result in an EDS request
+ // with no resource names being sent to the server.
cw.handleUpdate(&EDSConfig{
BalancerName: fakeServer.Address,
EDSServiceName: "foobar-2",
xds/internal/balancer/edsbalancer/xds_lrs_test.go (6 additions, 3 deletions)
@@ -35,16 +35,19 @@ import (
func (s) TestXDSLoadReporting(t *testing.T) {
builder := balancer.Get(edsName)
cc := newNoopTestClientConn()
- edsB, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testEDSClusterName}}).(*edsBalancer)
+ edsB, ok := builder.Build(cc, balancer.BuildOptions{}).(*edsBalancer)
if !ok {
t.Fatalf("builder.Build(%s) returned type {%T}, want {*edsBalancer}", edsName, edsB)
}
defer edsB.Close()

xdsC := fakeclient.NewClient()
if err := edsB.UpdateClientConnState(balancer.ClientConnState{
- ResolverState: resolver.State{Attributes: attributes.New(xdsinternal.XDSClientID, xdsC)},
- BalancerConfig: &EDSConfig{LrsLoadReportingServerName: new(string)},
+ ResolverState: resolver.State{Attributes: attributes.New(xdsinternal.XDSClientID, xdsC)},
+ BalancerConfig: &EDSConfig{
+ EDSServiceName: testEDSClusterName,
+ LrsLoadReportingServerName: new(string),
+ },
}); err != nil {
t.Fatal(err)
}