diff --git a/xds/internal/test/xds_server_integration_test.go b/xds/internal/test/xds_server_integration_test.go new file mode 100644 index 000000000000..5534d311eb81 --- /dev/null +++ b/xds/internal/test/xds_server_integration_test.go @@ -0,0 +1,193 @@ +/* + * + * Copyright 2020 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Package xds_test contains e2e tests for xDS use on the server. +package xds_test + +import ( + "context" + "fmt" + "io/ioutil" + "os" + "path" + "testing" + "time" + + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/anypb" + + v2discoverypb "github.com/envoyproxy/go-control-plane/envoy/api/v2" + v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" + v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" + v3httppb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/internal/grpctest" + testpb "google.golang.org/grpc/test/grpc_testing" + "google.golang.org/grpc/xds" + "google.golang.org/grpc/xds/internal/env" + "google.golang.org/grpc/xds/internal/testutils/fakeserver" + "google.golang.org/grpc/xds/internal/version" +) + +const ( + defaultTestTimeout = 10 * time.Second + localAddress = "localhost:9999" + listenerName = "grpc/server?udpa.resource.listening_address=localhost:9999" +) + +type s struct { + grpctest.Tester +} + +func Test(t *testing.T) { + grpctest.RunSubTests(t, s{}) +} + +func setupListenerResponse(respCh chan *fakeserver.Response, name string) { + respCh <- &fakeserver.Response{ + Resp: &v2discoverypb.DiscoveryResponse{ + Resources: []*anypb.Any{ + { + TypeUrl: version.V2ListenerURL, + Value: func() []byte { + l := &v3listenerpb.Listener{ + // This needs to match the name we are querying for. + Name: listenerName, + ApiListener: &v3listenerpb.ApiListener{ + ApiListener: &anypb.Any{ + TypeUrl: version.V2HTTPConnManagerURL, + Value: func() []byte { + cm := &v3httppb.HttpConnectionManager{ + RouteSpecifier: &v3httppb.HttpConnectionManager_Rds{ + Rds: &v3httppb.Rds{ + ConfigSource: &v3corepb.ConfigSource{ + ConfigSourceSpecifier: &v3corepb.ConfigSource_Ads{Ads: &v3corepb.AggregatedConfigSource{}}, + }, + RouteConfigName: "route-config", + }, + }, + } + mcm, _ := proto.Marshal(cm) + return mcm + }(), + }, + }, + } + ml, _ := proto.Marshal(l) + return ml + }(), + }, + }, + TypeUrl: version.V2ListenerURL, + }, + } +} + +func setupBootstrapFile(t *testing.T, serverURI string) func() { + // Create a bootstrap file in a temporary directory. + tmpdir, err := ioutil.TempDir("", "xds-server-test*") + if err != nil { + t.Fatalf("failed to create tempdir: %v", err) + } + bootstrapContents := fmt.Sprintf(` + { + "node": { + "id": "ENVOY_NODE_ID", + "metadata": { + "TRAFFICDIRECTOR_GRPC_HOSTNAME": "trafficdirector" + } + }, + "xds_servers" : [{ + "server_uri": "%s", + "channel_creds": [ + { "type": "insecure" } + ] + }] + }`, serverURI) + bootstrapFileName := path.Join(tmpdir, "bootstrap") + if err := ioutil.WriteFile(bootstrapFileName, []byte(bootstrapContents), os.ModePerm); err != nil { + t.Fatalf("failed to write bootstrap file: %v", err) + } + + origBootstrapFileName := env.BootstrapFileName + env.BootstrapFileName = bootstrapFileName + t.Logf("Create bootstrap file at %s with contents\n%s", bootstrapFileName, bootstrapContents) + return func() { env.BootstrapFileName = origBootstrapFileName } +} + +// TestServerSideXDS is an e2e tests for xDS use on the server. This does not +// use any xDS features because we have not implemented any on the server side. +func (s) TestServerSideXDS(t *testing.T) { + // Spin up a fake xDS management server on a local port. + // TODO(easwars): Switch to using the server from envoy-go-control-plane. + fs, cleanup, err := fakeserver.StartServer() + if err != nil { + t.Fatalf("failed to start fake xDS server: %v", err) + } + defer cleanup() + t.Logf("Started xDS management server at %s", fs.Address) + + // Setup the fakeserver to respond with a Listener resource. + setupListenerResponse(fs.XDSResponseChan, listenerName) + // Create a bootstrap file in a temporary directory. + defer setupBootstrapFile(t, fs.Address)() + + // Initialize a gRPC server which uses xDS, and register stubServer on it. + server := xds.NewGRPCServer() + testpb.RegisterTestServiceServer(server, &testService{}) + + errCh := make(chan error, 1) + go func() { + defer server.Stop() + + // Create a clientconn and make a successful RPC + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + cc, err := grpc.DialContext(ctx, localAddress, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + errCh <- fmt.Errorf("failed to dial local test server: %v", err) + return + } + defer cc.Close() + + client := testpb.NewTestServiceClient(cc) + if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { + errCh <- fmt.Errorf("rpc EmptyCall() failed: %v", err) + return + } + errCh <- nil + }() + + opts := xds.ServeOptions{Network: "tcp", Address: localAddress} + if err := server.Serve(opts); err != nil { + t.Fatalf("Serve(%+v) failed: %v", opts, err) + } + + if err := <-errCh; err != nil { + t.Fatal(err) + } +} + +type testService struct { + testpb.TestServiceServer +} + +func (*testService) EmptyCall(context.Context, *testpb.Empty) (*testpb.Empty, error) { + return &testpb.Empty{}, nil +} diff --git a/xds/server.go b/xds/server.go new file mode 100644 index 000000000000..0b85527ce96f --- /dev/null +++ b/xds/server.go @@ -0,0 +1,346 @@ +/* + * + * Copyright 2020 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package xds + +import ( + "context" + "fmt" + "net" + "sync" + + "google.golang.org/grpc" + "google.golang.org/grpc/grpclog" + internalgrpclog "google.golang.org/grpc/internal/grpclog" + "google.golang.org/grpc/internal/grpcsync" + xdsclient "google.golang.org/grpc/xds/internal/client" + "google.golang.org/grpc/xds/internal/client/bootstrap" +) + +const ( + serverPrefix = "[xds-server %p] " + + // The resource_name in the LDS request sent by the xDS-enabled gRPC server + // is of this format where the formatting directive at the end is replaced + // with the IP:Port specified by the user application. + listenerResourceNameFormat = "grpc/server?udpa.resource.listening_address=%s" +) + +var ( + // These new functions will be overridden in unit tests. + newXDSClient = func(opts xdsclient.Options) (xdsClientInterface, error) { + return xdsclient.New(opts) + } + newXDSConfig = bootstrap.NewConfig + newGRPCServer = func(opts ...grpc.ServerOption) grpcServerInterface { + return grpc.NewServer(opts...) + } + + logger = grpclog.Component("xds") +) + +func prefixLogger(p *GRPCServer) *internalgrpclog.PrefixLogger { + return internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(serverPrefix, p)) +} + +// xdsClientInterface contains methods from xdsClient.Client which are used by +// the server. This is useful for overriding in unit tests. +type xdsClientInterface interface { + WatchListener(string, func(xdsclient.ListenerUpdate, error)) func() + Close() +} + +// grpcServerInterface contains methods from grpc.Server which are used by the +// GRPCServer type here. This is useful for overriding in unit tests. +type grpcServerInterface interface { + RegisterService(*grpc.ServiceDesc, interface{}) + Serve(net.Listener) error + Stop() + GracefulStop() +} + +// ServeOptions contains parameters to configure the Serve() method. +// +// Experimental +// +// Notice: This type is EXPERIMENTAL and may be changed or removed in a +// later release. +type ServeOptions struct { + // Network identifies the local network to listen on. The network must be + // "tcp", "tcp4", "tcp6". + Network string + // Address contains the local address to listen on. This should be of the + // form "host:port", where the host must be a literal IP address, and port + // must be a literal port number. If the host is a literal IPv6 address it + // must be enclosed in square brackets, as in "[2001:db8::1]:80. The host + // portion can be left unspecified. + Address string +} + +func (so *ServeOptions) validate() error { + if net := so.Network; net != "tcp" && net != "tcp4" && net != "tcp6" { + return fmt.Errorf("xds: unsupported network type %q for server listener", net) + } + if _, _, err := net.SplitHostPort(so.Address); err != nil { + return fmt.Errorf("xds: unsupported address %q for server listener", so.Address) + } + return nil +} + +// GRPCServer wraps a gRPC server and provides server-side xDS functionality, by +// communication with a management server using xDS APIs. It implements the +// grpc.ServiceRegistrar interface and can be passed to service registration +// functions in IDL generated code. +// +// Experimental +// +// Notice: This type is EXPERIMENTAL and may be changed or removed in a +// later release. +type GRPCServer struct { + gs grpcServerInterface + quit *grpcsync.Event + logger *internalgrpclog.PrefixLogger + + // clientMu is used only in initXDSClient(), which is called at the + // beginning of Serve(), where we have to decide if we have to create a + // client or use an existing one. + clientMu sync.Mutex + xdsC xdsClientInterface +} + +// NewGRPCServer creates an xDS-enabled gRPC server using the passed in opts. +// The underlying gRPC server has no service registered and has not started to +// accept requests yet. +// +// Experimental +// +// Notice: This API is EXPERIMENTAL and may be changed or removed in a later +// release. +func NewGRPCServer(opts ...grpc.ServerOption) *GRPCServer { + newOpts := []grpc.ServerOption{ + grpc.ChainUnaryInterceptor(xdsUnaryInterceptor), + grpc.ChainStreamInterceptor(xdsStreamInterceptor), + } + newOpts = append(newOpts, opts...) + s := &GRPCServer{ + gs: newGRPCServer(newOpts...), + quit: grpcsync.NewEvent(), + } + s.logger = prefixLogger(s) + s.logger.Infof("Created xds.GRPCServer") + return s +} + +// RegisterService registers a service and its implementation to the underlying +// gRPC server. It is called from the IDL generated code. This must be called +// before invoking Serve. +func (s *GRPCServer) RegisterService(sd *grpc.ServiceDesc, ss interface{}) { + s.gs.RegisterService(sd, ss) +} + +// initXDSClient creates a new xdsClient if there is no existing one available. +func (s *GRPCServer) initXDSClient() error { + s.clientMu.Lock() + defer s.clientMu.Unlock() + + if s.xdsC != nil { + return nil + } + + // Read the bootstrap file as part of initializing the xdsClient. + config, err := newXDSConfig() + if err != nil { + return fmt.Errorf("xds: failed to read bootstrap file: %v", err) + } + client, err := newXDSClient(xdsclient.Options{Config: *config}) + if err != nil { + return fmt.Errorf("xds: failed to create xds-client: %v", err) + } + s.xdsC = client + s.logger.Infof("Created an xdsClient") + return nil +} + +// Serve gets the underlying gRPC server to accept incoming connections on the +// listening address in opts. A connection to the management server, to receive +// xDS configuration, is initiated here. +// +// Serve will return a non-nil error unless Stop or GracefulStop is called. +func (s *GRPCServer) Serve(opts ServeOptions) error { + s.logger.Infof("Serve() called with options: %+v", opts) + + // Validate the listening address in opts. + if err := opts.validate(); err != nil { + return err + } + + // If this is the first time Serve() is being called, we need to initialize + // our xdsClient. If not, we can use the existing one. + if err := s.initXDSClient(); err != nil { + return err + } + lw, err := s.newListenerWrapper(opts) + if lw == nil { + // Error returned can be nil (when Stop/GracefulStop() is called). So, + // we need to check the returned listenerWrapper instead. + return err + } + return s.gs.Serve(lw) +} + +// newListenerWrapper starts a net.Listener on the address specified in opts. It +// then registers a watch for a Listener resource and blocks until a good +// response is received or the server is stopped by a call to +// Stop/GracefulStop(). +// +// Returns a listenerWrapper, which implements the net.Listener interface, that +// can be passed to grpcServer.Serve(). +func (s *GRPCServer) newListenerWrapper(opts ServeOptions) (*listenerWrapper, error) { + lis, err := net.Listen(opts.Network, opts.Address) + if err != nil { + return nil, fmt.Errorf("xds: failed to listen on %+v: %v", opts, err) + } + lw := &listenerWrapper{Listener: lis} + s.logger.Infof("Started a net.Listener on %s", lis.Addr().String()) + + // This is used to notify that a good update has been received and that + // Serve() can be invoked on the underlying gRPC server. Using a + // grpcsync.Event instead of a vanilla channel simplifies the update handler + // as it need not keep track of whether the received update is the first one + // or not. + goodUpdate := grpcsync.NewEvent() + + // Register an LDS watch using our xdsClient, and specify the listening + // address as the resource name. + // TODO(easwars): Check if literal IPv6 addresses need an enclosing []. + name := fmt.Sprintf(listenerResourceNameFormat, opts.Address) + cancelWatch := s.xdsC.WatchListener(name, func(update xdsclient.ListenerUpdate, err error) { + if err != nil { + // We simply log an error here and hope we get a successful update + // in the future. The error could be because of a timeout or an + // actual error, like the requested resource not found. In any case, + // it is fine for the server to hang indefinitely until Stop() is + // called. + s.logger.Warningf("Received error for resource %q: %+v", name, err) + return + } + + s.logger.Infof("Received update for resource %q: %+v", name, update) + + // TODO(easwars): Handle security configuration, create appropriate + // certificate providers and update the listenerWrapper before firing + // the event. Errors encountered during any of these steps should result + // in an early exit, and the update event should not fire. + goodUpdate.Fire() + }) + + s.logger.Infof("Watch started on resource name %v", name) + lw.cancelWatch = func() { + cancelWatch() + s.logger.Infof("Watch cancelled on resource name %v", name) + } + + // Block until a good LDS response is received or the server is stopped. + select { + case <-s.quit.Done(): + // Since the listener has not yet been handed over to gs.Serve(), we + // need to explicitly close the listener. Cancellation of the xDS watch + // is handled by the listenerWrapper. + lw.Close() + return nil, nil + case <-goodUpdate.Done(): + } + return lw, nil +} + +// Stop stops the underlying gRPC server. It immediately closes all open +// connections. It cancels all active RPCs on the server side and the +// corresponding pending RPCs on the client side will get notified by connection +// errors. +func (s *GRPCServer) Stop() { + s.quit.Fire() + s.gs.Stop() + if s.xdsC != nil { + s.xdsC.Close() + } +} + +// GracefulStop stops the underlying gRPC server gracefully. It stops the server +// from accepting new connections and RPCs and blocks until all the pending RPCs +// are finished. +func (s *GRPCServer) GracefulStop() { + s.quit.Fire() + s.gs.GracefulStop() + if s.xdsC != nil { + s.xdsC.Close() + } +} + +// xdsUnaryInterceptor is the unary interceptor added to the gRPC server to +// perform any xDS specific functionality on unary RPCs. +// +// This is a no-op at this point. +func xdsUnaryInterceptor(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { + return handler(ctx, req) +} + +// xdsStreamInterceptor is the stream interceptor added to the gRPC server to +// perform any xDS specific functionality on streaming RPCs. +// +// This is a no-op at this point. +func xdsStreamInterceptor(srv interface{}, ss grpc.ServerStream, _ *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + return handler(srv, ss) +} + +// listenerWrapper wraps the net.Listener associated with the listening address +// passed to Serve(). It also contains all other state associated with this +// particular invocation of Serve(). +type listenerWrapper struct { + net.Listener + cancelWatch func() + + // TODO(easwars): Add fields for certificate providers. +} + +// Accept blocks on an Accept() on the underlying listener, and wraps the +// returned net.Conn with the configured certificate providers. +func (l *listenerWrapper) Accept() (net.Conn, error) { + c, err := l.Listener.Accept() + if err != nil { + return nil, err + } + return &conn{Conn: c}, nil +} + +// Close closes the underlying listener. It also cancels the xDS watch +// registered in Serve() and closes any certificate provider instances created +// based on security configuration received in the LDS response. +func (l *listenerWrapper) Close() error { + l.Listener.Close() + if l.cancelWatch != nil { + l.cancelWatch() + } + return nil +} + +// conn is a thin wrapper around a net.Conn returned by Accept(). +type conn struct { + net.Conn + + // TODO(easwars): Add fields for certificate providers. +} diff --git a/xds/server_test.go b/xds/server_test.go new file mode 100644 index 000000000000..e95316236979 --- /dev/null +++ b/xds/server_test.go @@ -0,0 +1,391 @@ +/* + * + * Copyright 2020 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package xds + +import ( + "context" + "errors" + "net" + "reflect" + "strings" + "testing" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/internal/grpctest" + "google.golang.org/grpc/internal/testutils" + xdsclient "google.golang.org/grpc/xds/internal/client" + "google.golang.org/grpc/xds/internal/client/bootstrap" + xdstestutils "google.golang.org/grpc/xds/internal/testutils" + "google.golang.org/grpc/xds/internal/testutils/fakeclient" +) + +const ( + defaultTestTimeout = 5 * time.Second + defaultTestShortTimeout = 10 * time.Millisecond +) + +type s struct { + grpctest.Tester +} + +func Test(t *testing.T) { + grpctest.RunSubTests(t, s{}) +} + +func (s) TestServeOptions_Validate(t *testing.T) { + tests := []struct { + desc string + opts ServeOptions + wantErr bool + }{ + { + desc: "empty options", + opts: ServeOptions{}, + wantErr: true, + }, + { + desc: "unsupported network", + opts: ServeOptions{Network: "foo"}, + wantErr: true, + }, + { + desc: "bad address", + opts: ServeOptions{Network: "tcp", Address: "I'm a bad IP address"}, + wantErr: true, + }, + { + desc: "no port", + opts: ServeOptions{Network: "tcp", Address: "1.2.3.4"}, + wantErr: true, + }, + { + desc: "empty hostname", + opts: ServeOptions{Network: "tcp", Address: ":1234"}, + }, + { + desc: "ipv4", + opts: ServeOptions{Network: "tcp", Address: "1.2.3.4:1234"}, + }, + { + desc: "ipv6", + opts: ServeOptions{Network: "tcp", Address: "[1:2::3:4]:1234"}, + }, + } + + for _, test := range tests { + t.Run(test.desc, func(t *testing.T) { + err := test.opts.validate() + if (err != nil) != test.wantErr { + t.Errorf("ServeOptions.validate(%+v) returned err %v, wantErr: %v", test.opts, err, test.wantErr) + } + }) + } +} + +type fakeGRPCServer struct { + done chan struct{} + registerServiceCh *testutils.Channel + serveCh *testutils.Channel + stopCh *testutils.Channel + gracefulStopCh *testutils.Channel +} + +func (f *fakeGRPCServer) RegisterService(*grpc.ServiceDesc, interface{}) { + f.registerServiceCh.Send(nil) +} + +func (f *fakeGRPCServer) Serve(net.Listener) error { + f.serveCh.Send(nil) + <-f.done + return nil +} + +func (f *fakeGRPCServer) Stop() { + close(f.done) + f.stopCh.Send(nil) +} +func (f *fakeGRPCServer) GracefulStop() { + close(f.done) + f.gracefulStopCh.Send(nil) +} + +func newFakeGRPCServer() *fakeGRPCServer { + return &fakeGRPCServer{ + done: make(chan struct{}), + registerServiceCh: testutils.NewChannel(), + serveCh: testutils.NewChannel(), + stopCh: testutils.NewChannel(), + gracefulStopCh: testutils.NewChannel(), + } +} + +func (s) TestNewServer(t *testing.T) { + // The xds package adds a couple of server options (unary and stream + // interceptors) to the server options passed in by the user. + serverOpts := []grpc.ServerOption{grpc.Creds(insecure.NewCredentials())} + wantServerOpts := len(serverOpts) + 2 + + origNewGRPCServer := newGRPCServer + newGRPCServer = func(opts ...grpc.ServerOption) grpcServerInterface { + if got := len(opts); got != wantServerOpts { + t.Fatalf("%d ServerOptions passed to grpc.Server, want %d", got, wantServerOpts) + } + // Verify that the user passed ServerOptions are forwarded as is. + if !reflect.DeepEqual(opts[2:], serverOpts) { + t.Fatalf("got ServerOptions %v, want %v", opts[2:], serverOpts) + } + return newFakeGRPCServer() + } + defer func() { + newGRPCServer = origNewGRPCServer + }() + + s := NewGRPCServer(serverOpts...) + defer s.Stop() +} + +func (s) TestRegisterService(t *testing.T) { + fs := newFakeGRPCServer() + + origNewGRPCServer := newGRPCServer + newGRPCServer = func(opts ...grpc.ServerOption) grpcServerInterface { return fs } + defer func() { newGRPCServer = origNewGRPCServer }() + + s := NewGRPCServer() + defer s.Stop() + + s.RegisterService(&grpc.ServiceDesc{}, nil) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if _, err := fs.registerServiceCh.Receive(ctx); err != nil { + t.Fatalf("timeout when expecting RegisterService() to called on grpc.Server: %v", err) + } +} + +// setupOverrides sets up overrides for bootstrap config, new xdsClient creation +// and new gRPC.Server creation. +func setupOverrides(t *testing.T) (*fakeGRPCServer, *testutils.Channel, func()) { + t.Helper() + + origNewXDSConfig := newXDSConfig + newXDSConfig = func() (*bootstrap.Config, error) { + return &bootstrap.Config{ + BalancerName: "dummyBalancer", + Creds: grpc.WithInsecure(), + NodeProto: xdstestutils.EmptyNodeProtoV3, + }, nil + } + + clientCh := testutils.NewChannel() + origNewXDSClient := newXDSClient + newXDSClient = func(xdsclient.Options) (xdsClientInterface, error) { + c := fakeclient.NewClient() + clientCh.Send(c) + return c, nil + } + + fs := newFakeGRPCServer() + origNewGRPCServer := newGRPCServer + newGRPCServer = func(opts ...grpc.ServerOption) grpcServerInterface { return fs } + + return fs, clientCh, func() { + newXDSConfig = origNewXDSConfig + newXDSClient = origNewXDSClient + newGRPCServer = origNewGRPCServer + } +} + +// TestServeSuccess tests the successful case of calling Serve(). +// The following sequence of events happen: +// 1. Create a new GRPCServer and call Serve() in a goroutine. +// 2. Make sure an xdsClient is created, and an LDS watch is registered. +// 3. Push an error response from the xdsClient, and make sure that Serve() does +// not exit. +// 4. Push a good response from the xdsClient, and make sure that Serve() on the +// underlying grpc.Server is called. +func (s) TestServeSuccess(t *testing.T) { + fs, clientCh, cleanup := setupOverrides(t) + defer cleanup() + + server := NewGRPCServer() + defer server.Stop() + + // Call Serve() in a goroutine, and push on a channel when Serve returns. + serveDone := testutils.NewChannel() + go func() { + server.Serve(ServeOptions{Network: "tcp", Address: "localhost:0"}) + serveDone.Send(nil) + }() + + // Wait for an xdsClient to be created. + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + c, err := clientCh.Receive(ctx) + if err != nil { + t.Fatalf("error when waiting for new xdsClient to be created: %v", err) + } + client := c.(*fakeclient.Client) + + // Wait for a listener watch to be registered on the xdsClient. + name, err := client.WaitForWatchListener(ctx) + if err != nil { + t.Fatalf("error when waiting for a ListenerWatch: %v", err) + } + wantPrefix := "grpc/server?udpa.resource.listening_address=localhost:" + if !strings.HasPrefix(name, wantPrefix) { + t.Fatalf("LDS watch registered for name %q, wantPrefix %q", name, wantPrefix) + } + + // Push an error to the registered listener watch callback and make sure + // that Serve does not return. + client.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{}, errors.New("LDS error")) + sCtx, sCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout) + defer sCancel() + if _, err := serveDone.Receive(sCtx); err != context.DeadlineExceeded { + t.Fatal("Serve() returned after a bad LDS response") + } + + // Push a good LDS response, and wait for Serve() to be invoked on the + // underlying grpc.Server. + client.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{RouteConfigName: "routeconfig"}, nil) + if _, err := fs.serveCh.Receive(ctx); err != nil { + t.Fatalf("error when waiting for Serve() to be invoked on the grpc.Server") + } +} + +// TestServeWithStop tests the case where Stop() is called before an LDS update +// is received. This should cause Serve() to exit before calling Serve() on the +// underlying grpc.Server. +func (s) TestServeWithStop(t *testing.T) { + fs, clientCh, cleanup := setupOverrides(t) + defer cleanup() + + // Note that we are not deferring the Stop() here since we explicitly call + // it after the LDS watch has been registered. + server := NewGRPCServer() + + // Call Serve() in a goroutine, and push on a channel when Serve returns. + serveDone := testutils.NewChannel() + go func() { + server.Serve(ServeOptions{Network: "tcp", Address: "localhost:0"}) + serveDone.Send(nil) + }() + + // Wait for an xdsClient to be created. + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + c, err := clientCh.Receive(ctx) + if err != nil { + t.Fatalf("error when waiting for new xdsClient to be created: %v", err) + } + client := c.(*fakeclient.Client) + + // Wait for a listener watch to be registered on the xdsClient. + name, err := client.WaitForWatchListener(ctx) + if err != nil { + server.Stop() + t.Fatalf("error when waiting for a ListenerWatch: %v", err) + } + wantPrefix := "grpc/server?udpa.resource.listening_address=localhost:" + if !strings.HasPrefix(name, wantPrefix) { + server.Stop() + t.Fatalf("LDS watch registered for name %q, wantPrefix %q", name, wantPrefix) + } + + // Call Stop() on the server before a listener update is received, and + // expect Serve() to exit. + server.Stop() + if _, err := serveDone.Receive(ctx); err != nil { + t.Fatalf("error when waiting for Serve() to exit") + } + + // Make sure that Serve() on the underlying grpc.Server is not called. + sCtx, sCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout) + defer sCancel() + if _, err := fs.serveCh.Receive(sCtx); err != context.DeadlineExceeded { + t.Fatal("Serve() called on underlying grpc.Server") + } +} + +// TestServeBootstrapFailure tests the case where xDS bootstrap fails and +// verifies that Serve() exits with a non-nil error. +func (s) TestServeBootstrapFailure(t *testing.T) { + // Since we have not setup fakes for anything, this will attempt to do real + // xDS bootstrap and that will fail because the bootstrap environment + // variable is not set. + server := NewGRPCServer() + defer server.Stop() + + serveDone := testutils.NewChannel() + go func() { + err := server.Serve(ServeOptions{Network: "tcp", Address: "localhost:0"}) + serveDone.Send(err) + }() + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + v, err := serveDone.Receive(ctx) + if err != nil { + t.Fatalf("error when waiting for Serve() to exit: %v", err) + } + if err, ok := v.(error); !ok || err == nil { + t.Fatal("Serve() did not exit with error") + } +} + +// TestServeNewClientFailure tests the case where xds client creation fails and +// verifies that Server() exits with a non-nil error. +func (s) TestServeNewClientFailure(t *testing.T) { + origNewXDSConfig := newXDSConfig + newXDSConfig = func() (*bootstrap.Config, error) { + return &bootstrap.Config{ + BalancerName: "dummyBalancer", + Creds: grpc.WithInsecure(), + NodeProto: xdstestutils.EmptyNodeProtoV3, + }, nil + } + defer func() { newXDSConfig = origNewXDSConfig }() + + origNewXDSClient := newXDSClient + newXDSClient = func(xdsclient.Options) (xdsClientInterface, error) { + return nil, errors.New("xdsClient creation failed") + } + defer func() { newXDSClient = origNewXDSClient }() + + server := NewGRPCServer() + defer server.Stop() + + serveDone := testutils.NewChannel() + go func() { + err := server.Serve(ServeOptions{Network: "tcp", Address: "localhost:0"}) + serveDone.Send(err) + }() + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + v, err := serveDone.Receive(ctx) + if err != nil { + t.Fatalf("error when waiting for Serve() to exit: %v", err) + } + if err, ok := v.(error); !ok || err == nil { + t.Fatal("Serve() did not exit with error") + } +} diff --git a/xds/xds.go b/xds/xds.go index fa0d699e8734..3cd0fc20b871 100644 --- a/xds/xds.go +++ b/xds/xds.go @@ -16,8 +16,12 @@ * */ -// Package xds contains xds implementation. Users need to import this package to -// get all xds functionality. +// Package xds contains an implementation of the xDS suite of protocols, to be +// used by gRPC client and server applications. +// +// On the client-side, users simply need to import this package to get all xDS +// functionality. On the server-side, users need to use the GRPCServer type +// exported by this package instead of the regular grpc.Server. // // See https://github.com/grpc/grpc-go/tree/master/examples/features/xds for // example.