Skip to content

Commit

Permalink
xds: a myriad of cleanups
Browse files Browse the repository at this point in the history
  • Loading branch information
dfawley committed May 21, 2021
1 parent f2ec724 commit d6f1d66
Show file tree
Hide file tree
Showing 84 changed files with 544 additions and 576 deletions.
38 changes: 19 additions & 19 deletions xds/csds/csds.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,37 +38,37 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/status"
"google.golang.org/grpc/xds/internal/client"
"google.golang.org/grpc/xds/internal/client/bootstrap"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
"google.golang.org/protobuf/types/known/timestamppb"

_ "google.golang.org/grpc/xds/internal/client/v2" // Register v2 xds_client.
_ "google.golang.org/grpc/xds/internal/client/v3" // Register v3 xds_client.
_ "google.golang.org/grpc/xds/internal/xdsclient/v2" // Register v2 xds_client.
_ "google.golang.org/grpc/xds/internal/xdsclient/v3" // Register v3 xds_client.
)

// xdsClientInterface contains methods from xdsClient.Client which are used by
// xdsClient contains methods from xdsClient.Client which are used by
// the server. This is useful for overriding in unit tests.
type xdsClientInterface interface {
DumpLDS() (string, map[string]client.UpdateWithMD)
DumpRDS() (string, map[string]client.UpdateWithMD)
DumpCDS() (string, map[string]client.UpdateWithMD)
DumpEDS() (string, map[string]client.UpdateWithMD)
type xdsClient interface {
DumpLDS() (string, map[string]xdsclient.UpdateWithMD)
DumpRDS() (string, map[string]xdsclient.UpdateWithMD)
DumpCDS() (string, map[string]xdsclient.UpdateWithMD)
DumpEDS() (string, map[string]xdsclient.UpdateWithMD)
BootstrapConfig() *bootstrap.Config
Close()
}

var (
logger = grpclog.Component("xds")
newXDSClient = func() (xdsClientInterface, error) {
return client.New()
newXDSClient = func() (xdsClient, error) {
return xdsclient.New()
}
)

// ClientStatusDiscoveryServer implementations interface ClientStatusDiscoveryServiceServer.
type ClientStatusDiscoveryServer struct {
// xdsClient will always be the same in practise. But we keep a copy in each
// server instance for testing.
xdsClient xdsClientInterface
xdsClient xdsClient
}

// NewClientStatusDiscoveryServer returns an implementation of the CSDS server that can be
Expand Down Expand Up @@ -296,17 +296,17 @@ func (s *ClientStatusDiscoveryServer) buildEDSPerXDSConfig() *v3statuspb.PerXdsC
}
}

func serviceStatusToProto(serviceStatus client.ServiceStatus) v3adminpb.ClientResourceStatus {
func serviceStatusToProto(serviceStatus xdsclient.ServiceStatus) v3adminpb.ClientResourceStatus {
switch serviceStatus {
case client.ServiceStatusUnknown:
case xdsclient.ServiceStatusUnknown:
return v3adminpb.ClientResourceStatus_UNKNOWN
case client.ServiceStatusRequested:
case xdsclient.ServiceStatusRequested:
return v3adminpb.ClientResourceStatus_REQUESTED
case client.ServiceStatusNotExist:
case xdsclient.ServiceStatusNotExist:
return v3adminpb.ClientResourceStatus_DOES_NOT_EXIST
case client.ServiceStatusACKed:
case xdsclient.ServiceStatusACKed:
return v3adminpb.ClientResourceStatus_ACKED
case client.ServiceStatusNACKed:
case xdsclient.ServiceStatusNACKed:
return v3adminpb.ClientResourceStatus_NACKED
default:
return v3adminpb.ClientResourceStatus_UNKNOWN
Expand Down
26 changes: 13 additions & 13 deletions xds/csds/csds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/xds"
"google.golang.org/grpc/xds/internal/client"
_ "google.golang.org/grpc/xds/internal/httpfilter/router"
xtestutils "google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/testutils/e2e"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/protobuf/testing/protocmp"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/timestamppb"
Expand All @@ -59,11 +59,11 @@ const (
defaultTestTimeout = 10 * time.Second
)

type xdsClientInterfaceWithWatch interface {
WatchListener(string, func(client.ListenerUpdate, error)) func()
WatchRouteConfig(string, func(client.RouteConfigUpdate, error)) func()
WatchCluster(string, func(client.ClusterUpdate, error)) func()
WatchEndpoints(string, func(client.EndpointsUpdate, error)) func()
type xdsClientWithWatch interface {
WatchListener(string, func(xdsclient.ListenerUpdate, error)) func()
WatchRouteConfig(string, func(xdsclient.RouteConfigUpdate, error)) func()
WatchCluster(string, func(xdsclient.ClusterUpdate, error)) func()
WatchEndpoints(string, func(xdsclient.EndpointsUpdate, error)) func()
}

var cmpOpts = cmp.Options{
Expand Down Expand Up @@ -174,16 +174,16 @@ func TestCSDS(t *testing.T) {
defer cleanup()

for _, target := range ldsTargets {
xdsC.WatchListener(target, func(client.ListenerUpdate, error) {})
xdsC.WatchListener(target, func(xdsclient.ListenerUpdate, error) {})
}
for _, target := range rdsTargets {
xdsC.WatchRouteConfig(target, func(client.RouteConfigUpdate, error) {})
xdsC.WatchRouteConfig(target, func(xdsclient.RouteConfigUpdate, error) {})
}
for _, target := range cdsTargets {
xdsC.WatchCluster(target, func(client.ClusterUpdate, error) {})
xdsC.WatchCluster(target, func(xdsclient.ClusterUpdate, error) {})
}
for _, target := range edsTargets {
xdsC.WatchEndpoints(target, func(client.EndpointsUpdate, error) {})
xdsC.WatchEndpoints(target, func(xdsclient.EndpointsUpdate, error) {})
}

for i := 0; i < retryCount; i++ {
Expand Down Expand Up @@ -250,7 +250,7 @@ func TestCSDS(t *testing.T) {
}
}

func commonSetup(t *testing.T) (xdsClientInterfaceWithWatch, *e2e.ManagementServer, string, v3statuspbgrpc.ClientStatusDiscoveryService_StreamClientStatusClient, func()) {
func commonSetup(t *testing.T) (xdsClientWithWatch, *e2e.ManagementServer, string, v3statuspbgrpc.ClientStatusDiscoveryService_StreamClientStatusClient, func()) {
t.Helper()

// Spin up a xDS management server on a local port.
Expand All @@ -270,12 +270,12 @@ func commonSetup(t *testing.T) (xdsClientInterfaceWithWatch, *e2e.ManagementServ
t.Fatal(err)
}
// Create xds_client.
xdsC, err := client.New()
xdsC, err := xdsclient.New()
if err != nil {
t.Fatalf("failed to create xds client: %v", err)
}
oldNewXDSClient := newXDSClient
newXDSClient = func() (xdsClientInterface, error) {
newXDSClient = func() (xdsClient, error) {
return xdsC, nil
}

Expand Down
4 changes: 2 additions & 2 deletions xds/googledirectpath/googlec2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ import (
"google.golang.org/grpc/internal/xds/env"
"google.golang.org/grpc/resolver"
_ "google.golang.org/grpc/xds" // To register xds resolvers and balancers.
xdsclient "google.golang.org/grpc/xds/internal/client"
"google.golang.org/grpc/xds/internal/client/bootstrap"
"google.golang.org/grpc/xds/internal/version"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
"google.golang.org/protobuf/types/known/structpb"
)

Expand Down
2 changes: 1 addition & 1 deletion xds/googledirectpath/googlec2p_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/internal/xds/env"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/xds/internal/client/bootstrap"
"google.golang.org/grpc/xds/internal/version"
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
"google.golang.org/protobuf/testing/protocmp"
"google.golang.org/protobuf/types/known/structpb"
)
Expand Down
2 changes: 1 addition & 1 deletion xds/internal/balancer/balancergroup/balancergroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"time"

orcapb "github.com/cncf/udpa/go/udpa/data/orca/v1"
"google.golang.org/grpc/xds/internal/client/load"
"google.golang.org/grpc/xds/internal/xdsclient/load"

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
Expand Down
2 changes: 1 addition & 1 deletion xds/internal/balancer/balancergroup/balancergroup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ import (
"google.golang.org/grpc/internal/balancer/stub"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/xds/internal/balancer/weightedtarget/weightedaggregator"
"google.golang.org/grpc/xds/internal/client/load"
"google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/xdsclient/load"
)

var (
Expand Down
20 changes: 9 additions & 11 deletions xds/internal/balancer/cdsbalancer/cdsbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ import (
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/xds/internal/balancer/edsbalancer"
xdsclient "google.golang.org/grpc/xds/internal/client"
"google.golang.org/grpc/xds/internal/client/bootstrap"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
)

const (
Expand All @@ -59,7 +59,7 @@ var (
// not deal with subConns.
return builder.Build(cc, opts), nil
}
newXDSClient func() (xdsClientInterface, error)
newXDSClient func() (xdsClient, error)
buildProvider = buildProviderFunc
)

Expand All @@ -73,6 +73,8 @@ func init() {
// JSON service config, to be passed to the cdsBalancer.
type cdsBB struct{}

var _ balancer.ConfigParser = cdsBB{}

// Build creates a new CDS balancer with the ClientConn.
func (cdsBB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
b := &cdsBalancer{
Expand Down Expand Up @@ -138,12 +140,11 @@ func (cdsBB) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig,
return &cfg, nil
}

// xdsClientInterface contains methods from xdsClient.Client which are used by
// the cdsBalancer. This will be faked out in unittests.
type xdsClientInterface interface {
// xdsClient contains methods from xdsclient.Client which are used by the
// cdsBalancer. This will be faked out in unittests.
type xdsClient interface {
WatchCluster(string, func(xdsclient.ClusterUpdate, error)) func()
BootstrapConfig() *bootstrap.Config
Close()
}

// ccUpdate wraps a clientConn update received from gRPC (pushed from the
Expand Down Expand Up @@ -185,7 +186,7 @@ type cdsBalancer struct {
ccw *ccWrapper // ClientConn interface passed to child LB.
bOpts balancer.BuildOptions // BuildOptions passed to child LB.
updateCh *buffer.Unbounded // Channel for gRPC and xdsClient updates.
xdsClient xdsClientInterface // xDS client to watch Cluster resource.
xdsClient xdsClient // xDS client to watch Cluster resource.
cancelWatch func() // Cluster watch cancel func.
edsLB balancer.Balancer // EDS child policy.
clusterToWatch string
Expand Down Expand Up @@ -405,9 +406,6 @@ func (b *cdsBalancer) run() {
b.edsLB.Close()
b.edsLB = nil
}
if newXDSClient != nil {
b.xdsClient.Close()
}
if b.cachedRoot != nil {
b.cachedRoot.Close()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ import (
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/xds/matcher"
"google.golang.org/grpc/resolver"
xdsclient "google.golang.org/grpc/xds/internal/client"
"google.golang.org/grpc/xds/internal/client/bootstrap"
xdstestutils "google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/testutils/fakeclient"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
)

const (
Expand Down Expand Up @@ -136,7 +136,7 @@ func setupWithXDSCreds(t *testing.T) (*fakeclient.Client, *cdsBalancer, *testEDS

xdsC := fakeclient.NewClient()
oldNewXDSClient := newXDSClient
newXDSClient = func() (xdsClientInterface, error) { return xdsC, nil }
newXDSClient = func() (xdsClient, error) { return xdsC, nil }

builder := balancer.Get(cdsName)
if builder == nil {
Expand Down
7 changes: 3 additions & 4 deletions xds/internal/balancer/cdsbalancer/cdsbalancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,9 @@ import (
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/xds/internal/balancer/edsbalancer"
"google.golang.org/grpc/xds/internal/client"
xdsclient "google.golang.org/grpc/xds/internal/client"
xdstestutils "google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/testutils/fakeclient"
"google.golang.org/grpc/xds/internal/xdsclient"
)

const (
Expand Down Expand Up @@ -215,7 +214,7 @@ func setup(t *testing.T) (*fakeclient.Client, *cdsBalancer, *testEDSBalancer, *x

xdsC := fakeclient.NewClient()
oldNewXDSClient := newXDSClient
newXDSClient = func() (xdsClientInterface, error) { return xdsC, nil }
newXDSClient = func() (xdsClient, error) { return xdsC, nil }

builder := balancer.Get(cdsName)
if builder == nil {
Expand Down Expand Up @@ -606,7 +605,7 @@ func (s) TestCircuitBreaking(t *testing.T) {

// Since the counter's max requests was set to 1, the first request should
// succeed and the second should fail.
counter := client.GetServiceRequestsCounter(serviceName)
counter := xdsclient.GetServiceRequestsCounter(serviceName)
if err := counter.StartRequest(maxRequests); err != nil {
t.Fatal(err)
}
Expand Down
6 changes: 3 additions & 3 deletions xds/internal/balancer/cdsbalancer/cluster_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"errors"
"sync"

xdsclient "google.golang.org/grpc/xds/internal/client"
"google.golang.org/grpc/xds/internal/xdsclient"
)

var errNotReceivedUpdate = errors.New("tried to construct a cluster update on a cluster that has not received an update")
Expand All @@ -40,7 +40,7 @@ type clusterHandler struct {
// CDS Balancer cares about is the most recent update.
updateChannel chan clusterHandlerUpdate

xdsClient xdsClientInterface
xdsClient xdsClient
}

func (ch *clusterHandler) updateRootCluster(rootClusterName string) {
Expand Down Expand Up @@ -112,7 +112,7 @@ type clusterNode struct {

// CreateClusterNode creates a cluster node from a given clusterName. This will
// also start the watch for that cluster.
func createClusterNode(clusterName string, xdsClient xdsClientInterface, topLevelHandler *clusterHandler) *clusterNode {
func createClusterNode(clusterName string, xdsClient xdsClient, topLevelHandler *clusterHandler) *clusterNode {
c := &clusterNode{
clusterHandler: topLevelHandler,
}
Expand Down
2 changes: 1 addition & 1 deletion xds/internal/balancer/cdsbalancer/cluster_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import (
"testing"

"github.com/google/go-cmp/cmp"
xdsclient "google.golang.org/grpc/xds/internal/client"
"google.golang.org/grpc/xds/internal/testutils/fakeclient"
"google.golang.org/grpc/xds/internal/xdsclient"
)

const (
Expand Down

0 comments on commit d6f1d66

Please sign in to comment.