Skip to content

Commit

Permalink
xds: refactor xds_client (#3477)
Browse files Browse the repository at this point in the history
This PR refactors xds_client to support multiples watches. Those watches can be for the same type and same resource_name.

There's upper level `Client` and lower level `v2client`. Before this change, all logic was in `v2client`, and `Client` was a thin wrapper.

This PR moves some of the functionality from `v2client` to `Client`. New layers:

- Upper level `Client`
  - keeps a list of watchers
    - provides method `func WatchXXX() (cancel func())`
    - has `WatchService()` which involves `LDS` and `RDS`
  - handles resources from the xDS responses and dispatch to the watchers
    - including multiple watchers for the same resource_name
  - keeps cache
    - and checks cache for new watches

- Lower level `v2client`
  - is a dumb client that
    - manages ADS stream
    - sends a new xDS request when add/remove watch
    - parses xDS responses
      - It doesn't call watchers, but forwards all parsed results to upper Client
    - handles ACK/NACK
  - supports `addWatch(type, name)` and `removeWatch(type, name)`
    - instead of `func watchCDS() func()`, which is now moved up to upper `Client`

Also includes other changes:
 - Corresponding test changes (some tests for `v2client` were moved to `Client`)
 - Method and type renaming
   - CDS/EDS -> Cluster/Endpoints
 - callback functions all accept updates as non-pointers
  • Loading branch information
menghanl committed Apr 15, 2020
1 parent 6e001be commit ff40ef4
Show file tree
Hide file tree
Showing 38 changed files with 2,906 additions and 1,754 deletions.
6 changes: 3 additions & 3 deletions xds/internal/balancer/cdsbalancer/cdsbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (cdsBB) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig,
// xdsClientInterface contains methods from xdsClient.Client which are used by
// the cdsBalancer. This will be faked out in unittests.
type xdsClientInterface interface {
WatchCluster(string, func(xdsclient.CDSUpdate, error)) func()
WatchCluster(string, func(xdsclient.ClusterUpdate, error)) func()
Close()
}

Expand All @@ -132,7 +132,7 @@ type scUpdate struct {
// results in creating a new edsBalancer (if one doesn't already exist) and
// pushing the update to it.
type watchUpdate struct {
cds xdsclient.CDSUpdate
cds xdsclient.ClusterUpdate
err error
}

Expand Down Expand Up @@ -274,7 +274,7 @@ func (b *cdsBalancer) run() {

// handleClusterUpdate is the CDS watch API callback. It simply pushes the
// received information on to the update channel for run() to pick it up.
func (b *cdsBalancer) handleClusterUpdate(cu xdsclient.CDSUpdate, err error) {
func (b *cdsBalancer) handleClusterUpdate(cu xdsclient.ClusterUpdate, err error) {
if b.isClosed() {
b.logger.Warningf("xds: received cluster update {%+v} after cdsBalancer was closed", cu)
return
Expand Down
14 changes: 7 additions & 7 deletions xds/internal/balancer/cdsbalancer/cdsbalancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ type testClientConn struct {

// cdsWatchInfo wraps the update and the error sent in a CDS watch callback.
type cdsWatchInfo struct {
update xdsclient.CDSUpdate
update xdsclient.ClusterUpdate
err error
}

Expand Down Expand Up @@ -369,18 +369,18 @@ func (s) TestHandleClusterUpdate(t *testing.T) {

tests := []struct {
name string
cdsUpdate xdsclient.CDSUpdate
cdsUpdate xdsclient.ClusterUpdate
updateErr error
wantCCS balancer.ClientConnState
}{
{
name: "happy-case-with-lrs",
cdsUpdate: xdsclient.CDSUpdate{ServiceName: serviceName, EnableLRS: true},
cdsUpdate: xdsclient.ClusterUpdate{ServiceName: serviceName, EnableLRS: true},
wantCCS: edsCCS(serviceName, true, xdsC),
},
{
name: "happy-case-without-lrs",
cdsUpdate: xdsclient.CDSUpdate{ServiceName: serviceName},
cdsUpdate: xdsclient.ClusterUpdate{ServiceName: serviceName},
wantCCS: edsCCS(serviceName, false, xdsC),
},
{
Expand Down Expand Up @@ -408,7 +408,7 @@ func (s) TestResolverError(t *testing.T) {
cdsB.Close()
}()

cdsUpdate := xdsclient.CDSUpdate{ServiceName: serviceName}
cdsUpdate := xdsclient.ClusterUpdate{ServiceName: serviceName}
wantCCS := edsCCS(serviceName, false, xdsC)
if err := invokeWatchCbAndWait(xdsC, cdsWatchInfo{cdsUpdate, nil}, wantCCS, edsB); err != nil {
t.Fatal(err)
Expand All @@ -433,7 +433,7 @@ func (s) TestUpdateSubConnState(t *testing.T) {
cdsB.Close()
}()

cdsUpdate := xdsclient.CDSUpdate{ServiceName: serviceName}
cdsUpdate := xdsclient.ClusterUpdate{ServiceName: serviceName}
wantCCS := edsCCS(serviceName, false, xdsC)
if err := invokeWatchCbAndWait(xdsC, cdsWatchInfo{cdsUpdate, nil}, wantCCS, edsB); err != nil {
t.Fatal(err)
Expand All @@ -453,7 +453,7 @@ func (s) TestClose(t *testing.T) {
xdsC, cdsB, edsB, cancel := setupWithWatch(t)
defer cancel()

cdsUpdate := xdsclient.CDSUpdate{ServiceName: serviceName}
cdsUpdate := xdsclient.ClusterUpdate{ServiceName: serviceName}
wantCCS := edsCCS(serviceName, false, xdsC)
if err := invokeWatchCbAndWait(xdsC, cdsWatchInfo{cdsUpdate, nil}, wantCCS, edsB); err != nil {
t.Fatal(err)
Expand Down
8 changes: 4 additions & 4 deletions xds/internal/balancer/edsbalancer/eds.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (b *edsBalancerBuilder) ParseConfig(c json.RawMessage) (serviceconfig.LoadB
// TODO: none of the methods in this interface needs to be exported.
type edsBalancerImplInterface interface {
// HandleEDSResponse passes the received EDS message from traffic director to eds balancer.
HandleEDSResponse(edsResp *xdsclient.EDSUpdate)
HandleEDSResponse(edsResp xdsclient.EndpointsUpdate)
// HandleChildPolicy updates the eds balancer the intra-cluster load balancing policy to use.
HandleChildPolicy(name string, config json.RawMessage)
// HandleSubConnStateChange handles state change for SubConn.
Expand Down Expand Up @@ -196,9 +196,9 @@ func (x *edsBalancer) handleGRPCUpdate(update interface{}) {

func (x *edsBalancer) handleXDSClientUpdate(update interface{}) {
switch u := update.(type) {
// TODO: this func should accept (*xdsclient.EDSUpdate, error), and process
// TODO: this func should accept (xdsclient.EndpointsUpdate, error), and process
// the error, instead of having a separate loseContact signal.
case *xdsclient.EDSUpdate:
case xdsclient.EndpointsUpdate:
x.edsImpl.HandleEDSResponse(u)
case *loseContact:
// loseContact can be useful for going into fallback.
Expand Down Expand Up @@ -246,7 +246,7 @@ func (x *edsBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
return nil
}

func (x *edsBalancer) handleEDSUpdate(resp *xdsclient.EDSUpdate) error {
func (x *edsBalancer) handleEDSUpdate(resp xdsclient.EndpointsUpdate) error {
// TODO: this function should take (resp, error), and send them together on
// the channel. There doesn't need to be a separate `loseContact` function.
select {
Expand Down
2 changes: 1 addition & 1 deletion xds/internal/balancer/edsbalancer/eds_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func (edsImpl *edsBalancerImpl) updateDrops(dropConfig []xdsclient.OverloadDropC
// SubConns. It also handles drops.
//
// HandleChildPolicy and HandleEDSResponse must be called by the same goroutine.
func (edsImpl *edsBalancerImpl) HandleEDSResponse(edsResp *xdsclient.EDSUpdate) {
func (edsImpl *edsBalancerImpl) HandleEDSResponse(edsResp xdsclient.EndpointsUpdate) {
// TODO: Unhandled fields from EDS response:
// - edsResp.GetPolicy().GetOverprovisioningFactor()
// - locality.GetPriority()
Expand Down
8 changes: 4 additions & 4 deletions xds/internal/balancer/edsbalancer/eds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (f *fakeEDSBalancer) HandleChildPolicy(name string, config json.RawMessage)
}

func (f *fakeEDSBalancer) Close() {}
func (f *fakeEDSBalancer) HandleEDSResponse(edsResp *xdsclient.EDSUpdate) {}
func (f *fakeEDSBalancer) HandleEDSResponse(edsResp xdsclient.EndpointsUpdate) {}
func (f *fakeEDSBalancer) updateState(priority priorityType, s balancer.State) {}

func (f *fakeEDSBalancer) waitForChildPolicy(wantPolicy *loadBalancingConfig) error {
Expand Down Expand Up @@ -254,7 +254,7 @@ func (s) TestXDSConfigBalancerNameUpdate(t *testing.T) {
})

xdsC := waitForNewXDSClientWithEDSWatch(t, xdsClientCh, balancerName)
xdsC.InvokeWatchEDSCallback(&xdsclient.EDSUpdate{}, nil)
xdsC.InvokeWatchEDSCallback(xdsclient.EndpointsUpdate{}, nil)
}
}

Expand Down Expand Up @@ -335,7 +335,7 @@ func (s) TestXDSConnfigChildPolicyUpdate(t *testing.T) {
},
})
xdsC := waitForNewXDSClientWithEDSWatch(t, xdsClientCh, testBalancerNameFooBar)
xdsC.InvokeWatchEDSCallback(&xdsclient.EDSUpdate{}, nil)
xdsC.InvokeWatchEDSCallback(xdsclient.EndpointsUpdate{}, nil)
edsLB := waitForNewEDSLB(t, edsLBCh)
edsLB.waitForChildPolicy(&loadBalancingConfig{
Name: string(fakeBalancerA),
Expand Down Expand Up @@ -384,7 +384,7 @@ func (s) TestXDSSubConnStateChange(t *testing.T) {
})

xdsC := waitForNewXDSClientWithEDSWatch(t, xdsClientCh, testBalancerNameFooBar)
xdsC.InvokeWatchEDSCallback(&xdsclient.EDSUpdate{}, nil)
xdsC.InvokeWatchEDSCallback(xdsclient.EndpointsUpdate{}, nil)
edsLB := waitForNewEDSLB(t, edsLBCh)

fsc := &fakeSubConn{}
Expand Down
11 changes: 7 additions & 4 deletions xds/internal/balancer/edsbalancer/xds_client_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
// xdsClientInterface contains only the xds_client methods needed by EDS
// balancer. It's defined so we can override xdsclientNew function in tests.
type xdsClientInterface interface {
WatchEndpoints(clusterName string, edsCb func(*xdsclient.EDSUpdate, error)) (cancel func())
WatchEndpoints(clusterName string, edsCb func(xdsclient.EndpointsUpdate, error)) (cancel func())
ReportLoad(server string, clusterName string, loadStore lrs.Store) (cancel func())
Close()
}
Expand All @@ -50,7 +50,7 @@ var (
type xdsclientWrapper struct {
logger *grpclog.PrefixLogger

newEDSUpdate func(*xdsclient.EDSUpdate) error
newEDSUpdate func(xdsclient.EndpointsUpdate) error
loseContact func()
bbo balancer.BuildOptions
loadStore lrs.Store
Expand Down Expand Up @@ -78,7 +78,7 @@ type xdsclientWrapper struct {
//
// The given callbacks won't be called until the underlying xds_client is
// working and sends updates.
func newXDSClientWrapper(newEDSUpdate func(*xdsclient.EDSUpdate) error, loseContact func(), bbo balancer.BuildOptions, loadStore lrs.Store, logger *grpclog.PrefixLogger) *xdsclientWrapper {
func newXDSClientWrapper(newEDSUpdate func(xdsclient.EndpointsUpdate) error, loseContact func(), bbo balancer.BuildOptions, loadStore lrs.Store, logger *grpclog.PrefixLogger) *xdsclientWrapper {
return &xdsclientWrapper{
logger: logger,
newEDSUpdate: newEDSUpdate,
Expand Down Expand Up @@ -184,7 +184,10 @@ func (c *xdsclientWrapper) startEndpointsWatch(nameToWatch string) {
}

c.edsServiceName = nameToWatch
cancelEDSWatch := c.xdsclient.WatchEndpoints(c.edsServiceName, func(update *xdsclient.EDSUpdate, err error) {
if c.cancelEndpointsWatch != nil {
c.cancelEndpointsWatch()
}
cancelEDSWatch := c.xdsclient.WatchEndpoints(c.edsServiceName, func(update xdsclient.EndpointsUpdate, err error) {
if err != nil {
// TODO: this should trigger a call to `c.loseContact`, when the
// error indicates "lose contact".
Expand Down
27 changes: 22 additions & 5 deletions xds/internal/balancer/edsbalancer/xds_client_wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package edsbalancer
import (
"errors"
"testing"
"time"

xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
Expand Down Expand Up @@ -109,9 +110,25 @@ func (s) TestClientWrapperWatchEDS(t *testing.T) {
EDSServiceName: test.edsServiceName,
}, nil)

req, err := fakeServer.XDSRequestChan.Receive()
if err != nil {
t.Fatalf("EDS RPC failed with err: %v", err)
var req interface{}
for i := 0; i < 2; i++ {
// Each new watch will first cancel the previous watch, and then
// start a new watch. The cancel will trigger a request as well.
// This loop receives the two requests and keeps the last.
r, err := fakeServer.XDSRequestChan.TimedReceive(time.Millisecond * 100)
if err != nil {
t.Fatalf("i: %v, expected xDS request, but got error: %v", i, err)
}
req = r
// If edsServiceName is empty string, the client doesn't cancel
// and resend request. The request from channel was from client
// init, and we don't expect a second request.
if test.edsServiceName == "" {
break
}
}
if r, err := fakeServer.XDSRequestChan.TimedReceive(time.Millisecond * 100); err == nil {
t.Fatalf("Expected req channel recv timeout, but got request: %v", r)
}
edsReq := req.(*fakeserver.Request)
if edsReq.Err != nil {
Expand Down Expand Up @@ -142,7 +159,7 @@ func (s) TestClientWrapperWatchEDS(t *testing.T) {
// edsBalancer with the received error.
func (s) TestClientWrapperHandleUpdateError(t *testing.T) {
edsRespChan := testutils.NewChannel()
newEDS := func(update *xdsclient.EDSUpdate) error {
newEDS := func(update xdsclient.EndpointsUpdate) error {
edsRespChan.Send(update)
return nil
}
Expand All @@ -159,7 +176,7 @@ func (s) TestClientWrapperHandleUpdateError(t *testing.T) {
if gotCluster != testEDSClusterName {
t.Fatalf("xdsClient.WatchEndpoints() called with cluster: %v, want %v", gotCluster, testEDSClusterName)
}
xdsC.InvokeWatchEDSCallback(nil, errors.New("EDS watch callback error"))
xdsC.InvokeWatchEDSCallback(xdsclient.EndpointsUpdate{}, errors.New("EDS watch callback error"))

// The callback is called with an error, expect no update from edsRespChan.
//
Expand Down

0 comments on commit ff40ef4

Please sign in to comment.