diff --git a/connectivity/connectivity.go b/connectivity/connectivity.go index 01015626150..4a89926422b 100644 --- a/connectivity/connectivity.go +++ b/connectivity/connectivity.go @@ -18,7 +18,6 @@ // Package connectivity defines connectivity semantics. // For details, see https://github.com/grpc/grpc/blob/master/doc/connectivity-semantics-and-api.md. -// All APIs in this package are experimental. package connectivity import ( @@ -45,7 +44,7 @@ func (s State) String() string { return "SHUTDOWN" default: logger.Errorf("unknown connectivity state: %d", s) - return "Invalid-State" + return "INVALID_STATE" } } @@ -61,3 +60,35 @@ const ( // Shutdown indicates the ClientConn has started shutting down. Shutdown ) + +// ServingMode indicates the current mode of operation of the server. +// +// Only xDS enabled gRPC servers currently report their serving mode. +type ServingMode int + +const ( + // ServingModeStarting indicates that the server is starting up. + ServingModeStarting ServingMode = iota + // ServingModeServing indicates that the server contains all required + // configuration and is serving RPCs. + ServingModeServing + // ServingModeNotServing indicates that the server is not accepting new + // connections. Existing connections will be closed gracefully, allowing + // in-progress RPCs to complete. A server enters this mode when it does not + // contain the required configuration to serve RPCs. + ServingModeNotServing +) + +func (s ServingMode) String() string { + switch s { + case ServingModeStarting: + return "STARTING" + case ServingModeServing: + return "SERVING" + case ServingModeNotServing: + return "NOT_SERVING" + default: + logger.Errorf("unknown serving mode: %d", s) + return "INVALID_MODE" + } +} diff --git a/credentials/xds/xds.go b/credentials/xds/xds.go index 0243009df64..680ea9cfa10 100644 --- a/credentials/xds/xds.go +++ b/credentials/xds/xds.go @@ -18,11 +18,6 @@ // Package xds provides a transport credentials implementation where the // security configuration is pushed by a management server using xDS APIs. -// -// Experimental -// -// Notice: All APIs in this package are EXPERIMENTAL and may be removed in a -// later release. package xds import ( diff --git a/xds/internal/server/listener_wrapper.go b/xds/internal/server/listener_wrapper.go index 2d30ca1231d..0d1173324bb 100644 --- a/xds/internal/server/listener_wrapper.go +++ b/xds/internal/server/listener_wrapper.go @@ -30,6 +30,7 @@ import ( "unsafe" "google.golang.org/grpc/backoff" + "google.golang.org/grpc/connectivity" "google.golang.org/grpc/grpclog" internalbackoff "google.golang.org/grpc/internal/backoff" internalgrpclog "google.golang.org/grpc/internal/grpclog" @@ -51,41 +52,11 @@ var ( backoffFunc = bs.Backoff ) -// ServingMode indicates the current mode of operation of the server. -// -// This API exactly mirrors the one in the public xds package. We have to -// redefine it here to avoid a cyclic dependency. -type ServingMode int - -const ( - // ServingModeStarting indicates that the serving is starting up. - ServingModeStarting ServingMode = iota - // ServingModeServing indicates the the server contains all required xDS - // configuration is serving RPCs. - ServingModeServing - // ServingModeNotServing indicates that the server is not accepting new - // connections. Existing connections will be closed gracefully, allowing - // in-progress RPCs to complete. A server enters this mode when it does not - // contain the required xDS configuration to serve RPCs. - ServingModeNotServing -) - -func (s ServingMode) String() string { - switch s { - case ServingModeNotServing: - return "not-serving" - case ServingModeServing: - return "serving" - default: - return "starting" - } -} - // ServingModeCallback is the callback that users can register to get notified // about the server's serving mode changes. The callback is invoked with the // address of the listener and its new mode. The err parameter is set to a // non-nil error if the server has transitioned into not-serving mode. -type ServingModeCallback func(addr net.Addr, mode ServingMode, err error) +type ServingModeCallback func(addr net.Addr, mode connectivity.ServingMode, err error) // DrainCallback is the callback that an xDS-enabled server registers to get // notified about updates to the Listener configuration. The server is expected @@ -208,7 +179,7 @@ type listenerWrapper struct { // get a Listener resource update). mu sync.RWMutex // Current serving mode. - mode ServingMode + mode connectivity.ServingMode // Filter chains received as part of the last good update. filterChains *xdsclient.FilterChainManager @@ -267,7 +238,7 @@ func (l *listenerWrapper) Accept() (net.Conn, error) { } l.mu.RLock() - if l.mode == ServingModeNotServing { + if l.mode == connectivity.ServingModeNotServing { // Close connections as soon as we accept them when we are in // "not-serving" mode. Since we accept a net.Listener from the user // in Serve(), we cannot close the listener when we move to @@ -390,7 +361,7 @@ func (l *listenerWrapper) handleRDSUpdate(update rdsHandlerUpdate) { if update.err != nil { l.logger.Warningf("Received error for rds names specified in resource %q: %+v", l.name, update.err) if xdsclient.ErrType(update.err) == xdsclient.ErrorTypeResourceNotFound { - l.switchMode(nil, ServingModeNotServing, update.err) + l.switchMode(nil, connectivity.ServingModeNotServing, update.err) } // For errors which are anything other than "resource-not-found", we // continue to use the old configuration. @@ -398,7 +369,7 @@ func (l *listenerWrapper) handleRDSUpdate(update rdsHandlerUpdate) { } atomic.StorePointer(&l.rdsUpdates, unsafe.Pointer(&update.updates)) - l.switchMode(l.filterChains, ServingModeServing, nil) + l.switchMode(l.filterChains, connectivity.ServingModeServing, nil) l.goodUpdate.Fire() } @@ -406,7 +377,7 @@ func (l *listenerWrapper) handleLDSUpdate(update ldsUpdateWithError) { if update.err != nil { l.logger.Warningf("Received error for resource %q: %+v", l.name, update.err) if xdsclient.ErrType(update.err) == xdsclient.ErrorTypeResourceNotFound { - l.switchMode(nil, ServingModeNotServing, update.err) + l.switchMode(nil, connectivity.ServingModeNotServing, update.err) } // For errors which are anything other than "resource-not-found", we // continue to use the old configuration. @@ -428,7 +399,7 @@ func (l *listenerWrapper) handleLDSUpdate(update ldsUpdateWithError) { // what we have decided to do. See gRPC A36 for more details. ilc := update.update.InboundListenerCfg if ilc.Address != l.addr || ilc.Port != l.port { - l.switchMode(nil, ServingModeNotServing, fmt.Errorf("address (%s:%s) in Listener update does not match listening address: (%s:%s)", ilc.Address, ilc.Port, l.addr, l.port)) + l.switchMode(nil, connectivity.ServingModeNotServing, fmt.Errorf("address (%s:%s) in Listener update does not match listening address: (%s:%s)", ilc.Address, ilc.Port, l.addr, l.port)) return } @@ -447,12 +418,12 @@ func (l *listenerWrapper) handleLDSUpdate(update ldsUpdateWithError) { // from the management server, this listener has all the configuration // needed, and is ready to serve. if len(ilc.FilterChains.RouteConfigNames) == 0 { - l.switchMode(ilc.FilterChains, ServingModeServing, nil) + l.switchMode(ilc.FilterChains, connectivity.ServingModeServing, nil) l.goodUpdate.Fire() } } -func (l *listenerWrapper) switchMode(fcs *xdsclient.FilterChainManager, newMode ServingMode, err error) { +func (l *listenerWrapper) switchMode(fcs *xdsclient.FilterChainManager, newMode connectivity.ServingMode, err error) { l.mu.Lock() defer l.mu.Unlock() diff --git a/xds/internal/test/xds_server_serving_mode_test.go b/xds/internal/test/xds_server_serving_mode_test.go index 90a6aa10388..7282aa94dd2 100644 --- a/xds/internal/test/xds_server_serving_mode_test.go +++ b/xds/internal/test/xds_server_serving_mode_test.go @@ -30,6 +30,7 @@ import ( v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" "google.golang.org/grpc" + "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials/insecure" xdscreds "google.golang.org/grpc/credentials/xds" testpb "google.golang.org/grpc/test/grpc_testing" @@ -64,8 +65,8 @@ func (s) TestServerSideXDS_ServingModeChanges(t *testing.T) { } // Create a couple of channels on which mode updates will be pushed. - updateCh1 := make(chan xds.ServingMode, 1) - updateCh2 := make(chan xds.ServingMode, 1) + updateCh1 := make(chan connectivity.ServingMode, 1) + updateCh2 := make(chan connectivity.ServingMode, 1) // Create a server option to get notified about serving mode changes, and // push the updated mode on the channels created above. @@ -124,16 +125,16 @@ func (s) TestServerSideXDS_ServingModeChanges(t *testing.T) { case <-ctx.Done(): t.Fatalf("timed out waiting for a mode change update: %v", err) case mode := <-updateCh1: - if mode != xds.ServingModeServing { - t.Errorf("listener received new mode %v, want %v", mode, xds.ServingModeServing) + if mode != connectivity.ServingModeServing { + t.Errorf("listener received new mode %v, want %v", mode, connectivity.ServingModeServing) } } select { case <-ctx.Done(): t.Fatalf("timed out waiting for a mode change update: %v", err) case mode := <-updateCh2: - if mode != xds.ServingModeServing { - t.Errorf("listener received new mode %v, want %v", mode, xds.ServingModeServing) + if mode != connectivity.ServingModeServing { + t.Errorf("listener received new mode %v, want %v", mode, connectivity.ServingModeServing) } } @@ -169,16 +170,16 @@ func (s) TestServerSideXDS_ServingModeChanges(t *testing.T) { case <-ctx.Done(): t.Fatalf("timed out waiting for a mode change update: %v", err) case mode := <-updateCh1: - if mode != xds.ServingModeServing { - t.Errorf("listener received new mode %v, want %v", mode, xds.ServingModeServing) + if mode != connectivity.ServingModeServing { + t.Errorf("listener received new mode %v, want %v", mode, connectivity.ServingModeServing) } } select { case <-ctx.Done(): t.Fatalf("timed out waiting for a mode change update: %v", err) case mode := <-updateCh2: - if mode != xds.ServingModeNotServing { - t.Errorf("listener received new mode %v, want %v", mode, xds.ServingModeNotServing) + if mode != connectivity.ServingModeNotServing { + t.Errorf("listener received new mode %v, want %v", mode, connectivity.ServingModeNotServing) } } @@ -203,8 +204,8 @@ func (s) TestServerSideXDS_ServingModeChanges(t *testing.T) { case <-ctx.Done(): t.Fatalf("timed out waiting for a mode change update: %v", err) case mode := <-updateCh1: - if mode != xds.ServingModeNotServing { - t.Errorf("listener received new mode %v, want %v", mode, xds.ServingModeNotServing) + if mode != connectivity.ServingModeNotServing { + t.Errorf("listener received new mode %v, want %v", mode, connectivity.ServingModeNotServing) } } @@ -233,16 +234,16 @@ func (s) TestServerSideXDS_ServingModeChanges(t *testing.T) { case <-ctx.Done(): t.Fatalf("timed out waiting for a mode change update: %v", err) case mode := <-updateCh1: - if mode != xds.ServingModeServing { - t.Errorf("listener received new mode %v, want %v", mode, xds.ServingModeServing) + if mode != connectivity.ServingModeServing { + t.Errorf("listener received new mode %v, want %v", mode, connectivity.ServingModeServing) } } select { case <-ctx.Done(): t.Fatalf("timed out waiting for a mode change update: %v", err) case mode := <-updateCh2: - if mode != xds.ServingModeServing { - t.Errorf("listener received new mode %v, want %v", mode, xds.ServingModeServing) + if mode != connectivity.ServingModeServing { + t.Errorf("listener received new mode %v, want %v", mode, connectivity.ServingModeServing) } } diff --git a/xds/server.go b/xds/server.go index 2014fcf5ec9..33a49095799 100644 --- a/xds/server.go +++ b/xds/server.go @@ -28,6 +28,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials" "google.golang.org/grpc/grpclog" "google.golang.org/grpc/internal" @@ -231,7 +232,7 @@ func (s *GRPCServer) Serve(lis net.Listener) error { ListenerResourceName: name, XDSCredsInUse: s.xdsCredsInUse, XDSClient: s.xdsC, - ModeCallback: func(addr net.Addr, mode server.ServingMode, err error) { + ModeCallback: func(addr net.Addr, mode connectivity.ServingMode, err error) { modeUpdateCh.Put(&modeChangeArgs{ addr: addr, mode: mode, @@ -261,7 +262,7 @@ func (s *GRPCServer) Serve(lis net.Listener) error { // modeChangeArgs wraps argument required for invoking mode change callback. type modeChangeArgs struct { addr net.Addr - mode server.ServingMode + mode connectivity.ServingMode err error } @@ -278,7 +279,7 @@ func (s *GRPCServer) handleServingModeChanges(updateCh *buffer.Unbounded) { case u := <-updateCh.Get(): updateCh.Load() args := u.(*modeChangeArgs) - if args.mode == ServingModeNotServing { + if args.mode == connectivity.ServingModeNotServing { // We type assert our underlying gRPC server to the real // grpc.Server here before trying to initiate the drain // operation. This approach avoids performing the same type diff --git a/xds/server_options.go b/xds/server_options.go index 0918c097a3e..1d46c3adb7b 100644 --- a/xds/server_options.go +++ b/xds/server_options.go @@ -22,7 +22,7 @@ import ( "net" "google.golang.org/grpc" - iserver "google.golang.org/grpc/xds/internal/server" + "google.golang.org/grpc/connectivity" ) type serverOptions struct { @@ -41,20 +41,6 @@ func ServingModeCallback(cb ServingModeCallbackFunc) grpc.ServerOption { return &serverOption{apply: func(o *serverOptions) { o.modeCallback = cb }} } -// ServingMode indicates the current mode of operation of the server. -type ServingMode = iserver.ServingMode - -const ( - // ServingModeServing indicates the the server contains all required xDS - // configuration is serving RPCs. - ServingModeServing = iserver.ServingModeServing - // ServingModeNotServing indicates that the server is not accepting new - // connections. Existing connections will be closed gracefully, allowing - // in-progress RPCs to complete. A server enters this mode when it does not - // contain the required xDS configuration to serve RPCs. - ServingModeNotServing = iserver.ServingModeNotServing -) - // ServingModeCallbackFunc is the callback that users can register to get // notified about the server's serving mode changes. The callback is invoked // with the address of the listener and its new mode. @@ -66,7 +52,7 @@ type ServingModeCallbackFunc func(addr net.Addr, args ServingModeChangeArgs) // function. type ServingModeChangeArgs struct { // Mode is the new serving mode of the server listener. - Mode ServingMode + Mode connectivity.ServingMode // Err is set to a non-nil error if the server has transitioned into // not-serving mode. Err error @@ -80,6 +66,11 @@ type ServingModeChangeArgs struct { // // This function should ONLY be used for testing and may not work with some // other features, including the CSDS service. +// +// Experimental +// +// Notice: This API is EXPERIMENTAL and may be changed or removed in a +// later release. func BootstrapContentsForTesting(contents []byte) grpc.ServerOption { return &serverOption{apply: func(o *serverOptions) { o.bootstrapContents = contents }} } diff --git a/xds/server_test.go b/xds/server_test.go index 680b6562050..0866e0414ae 100644 --- a/xds/server_test.go +++ b/xds/server_test.go @@ -35,6 +35,7 @@ import ( v3tlspb "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/tls/v3" wrapperspb "github.com/golang/protobuf/ptypes/wrappers" "google.golang.org/grpc" + "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/credentials/tls/certprovider" "google.golang.org/grpc/credentials/xds" @@ -435,8 +436,8 @@ func (s) TestServeSuccess(t *testing.T) { if err != nil { t.Fatalf("error when waiting for serving mode to change: %v", err) } - if mode := v.(ServingMode); mode != ServingModeNotServing { - t.Fatalf("server mode is %q, want %q", mode, ServingModeNotServing) + if mode := v.(connectivity.ServingMode); mode != connectivity.ServingModeNotServing { + t.Fatalf("server mode is %q, want %q", mode, connectivity.ServingModeNotServing) } // Push a good LDS response, and wait for Serve() to be invoked on the @@ -463,8 +464,8 @@ func (s) TestServeSuccess(t *testing.T) { if err != nil { t.Fatalf("error when waiting for serving mode to change: %v", err) } - if mode := v.(ServingMode); mode != ServingModeServing { - t.Fatalf("server mode is %q, want %q", mode, ServingModeServing) + if mode := v.(connectivity.ServingMode); mode != connectivity.ServingModeServing { + t.Fatalf("server mode is %q, want %q", mode, connectivity.ServingModeServing) } // Push an update to the registered listener watch callback with a Listener @@ -489,8 +490,8 @@ func (s) TestServeSuccess(t *testing.T) { if err != nil { t.Fatalf("error when waiting for serving mode to change: %v", err) } - if mode := v.(ServingMode); mode != ServingModeNotServing { - t.Fatalf("server mode is %q, want %q", mode, ServingModeNotServing) + if mode := v.(connectivity.ServingMode); mode != connectivity.ServingModeNotServing { + t.Fatalf("server mode is %q, want %q", mode, connectivity.ServingModeNotServing) } } diff --git a/xds/xds.go b/xds/xds.go index 864d2e6a2e3..ec16c9f520b 100644 --- a/xds/xds.go +++ b/xds/xds.go @@ -25,11 +25,6 @@ // // See https://github.com/grpc/grpc-go/tree/master/examples/features/xds for // example. -// -// Experimental -// -// Notice: All APIs in this package are experimental and may be removed in a -// later release. package xds import ( @@ -87,6 +82,11 @@ func init() { // // This function should ONLY be used for testing and may not work with some // other features, including the CSDS service. +// +// Experimental +// +// Notice: This API is EXPERIMENTAL and may be changed or removed in a +// later release. func NewXDSResolverWithConfigForTesting(bootstrapConfig []byte) (resolver.Builder, error) { return xdsresolver.NewBuilder(bootstrapConfig) }