Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds/server: fix RDS handling for non-inline route configs #6915

Merged
merged 6 commits into from Jan 17, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
17 changes: 16 additions & 1 deletion internal/testutils/xds/e2e/clientresources.go
Expand Up @@ -788,8 +788,23 @@ func DefaultServerListenerWithRouteConfigName(host string, port uint32, secLevel
return defaultServerListenerCommon(host, port, secLevel, routeName, false)
}

// RouteConfigNoRouteMatch returns an xDS RouteConfig resource which a route
// with no route match. This will be NACKed by the xDS Client.
func RouteConfigNoRouteMatch(routeName string) *v3routepb.RouteConfiguration {
return &v3routepb.RouteConfiguration{
Name: routeName,
VirtualHosts: []*v3routepb.VirtualHost{{
// This "*" string matches on any incoming authority. This is to ensure any
// incoming RPC matches to Route_NonForwardingAction and will proceed as
// normal.
Domains: []string{"*"},
Routes: []*v3routepb.Route{{
Action: &v3routepb.Route_NonForwardingAction{},
}}}}}
}

// RouteConfigNonForwardingAction returns an xDS RouteConfig resource which
// specifies to route to a route specfying non forwarding action. This is
// specifies to route to a route specifying non forwarding action. This is
// intended to be used on the server side for RDS requests, and corresponds to
// the inline route configuration in DefaultServerListener.
func RouteConfigNonForwardingAction(routeName string) *v3routepb.RouteConfiguration {
Expand Down
6 changes: 5 additions & 1 deletion internal/transport/http2_server.go
Expand Up @@ -1290,7 +1290,11 @@ func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, eo

// CallbackConn is a conn with a callback function.
type CallbackConn interface {
Callback(ServerTransport)
// PassServerTransport passes a ServerTransport to the callback Conn. This
// is called in the grpc layer after a ServerTransport for a connection has
// successfully been created, if this method exists on the accepted
// connection.
PassServerTransport(ServerTransport)
}

func (t *http2Server) Drain(debugData string) {
Expand Down
2 changes: 1 addition & 1 deletion server.go
Expand Up @@ -930,7 +930,7 @@ func (s *Server) handleRawConn(lisAddr string, rawConn net.Conn) {
}

if cc, ok := rawConn.(transport.CallbackConn); ok {
cc.Callback(st)
cc.PassServerTransport(st)
}

if !s.addConn(lisAddr, st) {
Expand Down
63 changes: 63 additions & 0 deletions test/xds/xds_server_test.go
Expand Up @@ -171,6 +171,69 @@ func waitForFailedRPCWithStatusCode(ctx context.Context, t *testing.T, cc *grpc.
}
}

// TestResourceNotFoundRDS tests the case where an LDS points to an RDS which
// returns an RDS Resource which is NACKed. This should trigger server should
// move to serving, successfully Accept Connections, and fail at the L7 level
// with a certain error message.
func (s) TestRDSNack(t *testing.T) {
managementServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
defer cleanup()
lis, err := testutils.LocalTCPListener()
if err != nil {
t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
}
// Setup the management server to respond with a listener resource that
// specifies a route name to watch, and no RDS resource corresponding to
// this route name.
host, port, err := hostPortFromListener(lis)
if err != nil {
t.Fatalf("failed to retrieve host and port of server: %v", err)
}

listener := e2e.DefaultServerListenerWithRouteConfigName(host, port, e2e.SecurityLevelNone, "routeName")
routeConfig := e2e.RouteConfigNoRouteMatch("routeName")
resources := e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{listener},
Routes: []*v3routepb.RouteConfiguration{routeConfig},
SkipValidation: true,
}

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := managementServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
serving := grpcsync.NewEvent()
modeChangeOpt := xds.ServingModeCallback(func(addr net.Addr, args xds.ServingModeChangeArgs) {
t.Logf("serving mode for listener %q changed to %q, err: %v", addr.String(), args.Mode, args.Err)
if args.Mode == connectivity.ServingModeServing {
serving.Fire()
}
})

server, err := xds.NewGRPCServer(grpc.Creds(insecure.NewCredentials()), modeChangeOpt, xds.BootstrapContentsForTesting(bootstrapContents))
if err != nil {
t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err)
}
defer server.Stop()
testgrpc.RegisterTestServiceServer(server, &testService{})
go func() {
if err := server.Serve(lis); err != nil {
t.Errorf("Serve() failed: %v", err)
}
}()

cc, err := grpc.Dial(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("failed to dial local test server: %v", err)
}
defer cc.Close()

<-serving.Done()
waitForFailedRPCWithStatusCode(ctx, t, cc, status.New(codes.Unavailable, "error from xDS configuration for matched route configuration"))
}

// TestResourceNotFoundRDS tests the case where an LDS points to an RDS which
// returns resource not found. Before getting the resource not found, the xDS
// Server has not received all configuration needed, so it should Accept and
Expand Down
14 changes: 8 additions & 6 deletions xds/internal/server/conn_wrapper.go
Expand Up @@ -24,7 +24,6 @@
"sync"
"sync/atomic"
"time"
"unsafe"

"google.golang.org/grpc/credentials/tls/certprovider"
xdsinternal "google.golang.org/grpc/internal/credentials/xds"
Expand Down Expand Up @@ -68,14 +67,13 @@

// The virtual hosts with matchable routes and instantiated HTTP Filters per
// route, or an error.
urc *unsafe.Pointer // *xdsresource.UsableRouteConfiguration
urc *atomic.Pointer[xdsresource.UsableRouteConfiguration]
}

// UsableRouteConfiguration returns the UsableRouteConfiguration to be used for
// server side routing.
func (c *connWrapper) UsableRouteConfiguration() xdsresource.UsableRouteConfiguration {
uPtr := atomic.LoadPointer(c.urc)
return *(*xdsresource.UsableRouteConfiguration)(uPtr)
return *c.urc.Load()
}

// SetDeadline makes a copy of the passed in deadline and forwards the call to
Expand Down Expand Up @@ -131,17 +129,21 @@
return xdsinternal.NewHandshakeInfo(c.rootProvider, c.identityProvider, nil, secCfg.RequireClientCert), nil
}

func (c *connWrapper) Callback(st transport.ServerTransport) {
// PassServerTransport drains the passed in ServerTransportif draining is set,
dfawley marked this conversation as resolved.
Show resolved Hide resolved
// or persists it to be drained once drained is called.
func (c *connWrapper) PassServerTransport(st transport.ServerTransport) {
c.mu.Lock()
defer c.mu.Unlock()
if c.draining {
st.Drain("draining")

Check warning on line 138 in xds/internal/server/conn_wrapper.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/server/conn_wrapper.go#L138

Added line #L138 was not covered by tests
} else {
c.st = st
}
}

func (c *connWrapper) drain() {
// Drain drains the associated ServerTransport, or sets draining to true so it
// will be drained after it is created.
func (c *connWrapper) Drain() {
c.mu.Lock()
defer c.mu.Unlock()
if c.st == nil {
Expand Down
40 changes: 21 additions & 19 deletions xds/internal/server/listener_wrapper.go
Expand Up @@ -24,9 +24,7 @@
"fmt"
"net"
"sync"
"sync/atomic"
"time"
"unsafe"

"google.golang.org/grpc/backoff"
"google.golang.org/grpc/connectivity"
Expand Down Expand Up @@ -171,11 +169,11 @@
// the server into a "not serving" mode. This is not ideal, but this is
// what we have decided to do.
if ilc.Address != l.addr || ilc.Port != l.port {
l.mu.Lock()
l.switchModeLocked(connectivity.ServingModeNotServing, fmt.Errorf("address (%s:%s) in Listener update does not match listening address: (%s:%s)", ilc.Address, ilc.Port, l.addr, l.port))
l.mu.Unlock()
return
}

Check warning on line 176 in xds/internal/server/listener_wrapper.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/server/listener_wrapper.go#L172-L176

Added lines #L172 - L176 were not covered by tests

l.pendingFilterChainManager = ilc.FilterChains
l.rdsHandler.updateRouteNamesToWatch(ilc.FilterChains.RouteConfigNames)
Expand All @@ -187,7 +185,7 @@

// maybeUpdateFilterChains swaps in the pending filter chain manager to the
// active one if the pending filter chain manager is present. If a swap occurs,
// it also drains (gracefully stops) any connections that were accepted on the
// it also drains (gracefully stops) any connections that were accepted on the
// old active filter chain manager, and puts this listener in state SERVING.
// Must be called within an xDS Client Callback.
func (l *listenerWrapper) maybeUpdateFilterChains() {
Expand All @@ -202,17 +200,20 @@
// gracefully shut down with a grace period of 10 minutes for long-lived
// RPC's, such that clients will reconnect and have the updated
// configuration apply." - A36
var connsToClose []net.Conn
var connsToClose []xdsresource.DrainConn
if l.activeFilterChainManager != nil { // If there is a filter chain manager to clean up.
connsToClose = l.activeFilterChainManager.Conns()
}
l.activeFilterChainManager = l.pendingFilterChainManager
l.pendingFilterChainManager = nil
l.instantiateFilterChainRoutingConfigurationsLocked()
l.mu.Unlock()
for _, conn := range connsToClose {
conn.(*connWrapper).drain()
}
go func() {
for _, conn := range connsToClose {
conn.Drain()
}
}()

}

// handleRDSUpdate rebuilds any routing configuration server side for any filter
Expand All @@ -221,17 +222,17 @@
func (l *listenerWrapper) handleRDSUpdate(routeName string, rcu rdsWatcherUpdate) {
// Update any filter chains that point to this route configuration.
if l.activeFilterChainManager != nil {
for _, fc := range l.activeFilterChainManager.FilterChains() { // v4 and v6 filter chains...why doesn't this update the first time?
for _, fc := range l.activeFilterChainManager.FilterChains() {
if fc.RouteConfigName != routeName {
continue

Check warning on line 227 in xds/internal/server/listener_wrapper.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/server/listener_wrapper.go#L227

Added line #L227 was not covered by tests
}
if rcu.err != nil && rcu.update == nil { // Either NACK before update, or resource not found triggers this conditional.
atomic.StorePointer(fc.UsableRouteConfiguration, unsafe.Pointer(&xdsresource.UsableRouteConfiguration{
fc.UsableRouteConfiguration.Store(&xdsresource.UsableRouteConfiguration{
Err: rcu.err,
}))
})
continue
}
atomic.StorePointer(fc.UsableRouteConfiguration, unsafe.Pointer(fc.ConstructUsableRouteConfiguration(*rcu.update)))
fc.UsableRouteConfiguration.Store(fc.ConstructUsableRouteConfiguration(*rcu.update))
}
}
if l.rdsHandler.determineRouteConfigurationReady() {
Expand All @@ -246,17 +247,15 @@
func (l *listenerWrapper) instantiateFilterChainRoutingConfigurationsLocked() {
for _, fc := range l.activeFilterChainManager.FilterChains() {
if fc.InlineRouteConfig != nil {
atomic.StorePointer(fc.UsableRouteConfiguration, unsafe.Pointer(fc.ConstructUsableRouteConfiguration(*fc.InlineRouteConfig))) // Can't race with an RPC coming in but no harm making atomic.
fc.UsableRouteConfiguration.Store(fc.ConstructUsableRouteConfiguration(*fc.InlineRouteConfig)) // Can't race with an RPC coming in but no harm making atomic.
continue
} // Inline configuration constructed once here, will remain for lifetime of filter chain.
rcu := l.rdsHandler.updates[fc.RouteConfigName]
if rcu.err != nil && rcu.update == nil {
atomic.StorePointer(fc.UsableRouteConfiguration, unsafe.Pointer(&xdsresource.UsableRouteConfiguration{
Err: rcu.err,
}))
fc.UsableRouteConfiguration.Store(&xdsresource.UsableRouteConfiguration{Err: rcu.err})
continue
}
atomic.StorePointer(fc.UsableRouteConfiguration, unsafe.Pointer(fc.ConstructUsableRouteConfiguration(*rcu.update))) // Can't race with an RPC coming in but no harm making atomic.
fc.UsableRouteConfiguration.Store(fc.ConstructUsableRouteConfiguration(*rcu.update)) // Can't race with an RPC coming in but no harm making atomic.
}
}

Expand Down Expand Up @@ -322,7 +321,7 @@
SourcePort: srcAddr.Port,
})
if err != nil {
l.mu.RUnlock()

Check warning on line 324 in xds/internal/server/listener_wrapper.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/server/listener_wrapper.go#L324

Added line #L324 was not covered by tests
// When a matching filter chain is not found, we close the
// connection right away, but do not return an error back to
// `grpc.Serve()` from where this Accept() was invoked. Returning an
Expand Down Expand Up @@ -373,9 +372,12 @@
}
l.mode = newMode
if l.mode == connectivity.ServingModeNotServing {
for _, conn := range l.activeFilterChainManager.Conns() {
conn.(*connWrapper).drain()
}
connsToClose := l.activeFilterChainManager.Conns()
go func() {
for _, conn := range connsToClose {
conn.Drain()
}
}()
}
// The XdsServer API will allow applications to register a "serving state"
// callback to be invoked when the server begins serving and when the
Expand Down
22 changes: 9 additions & 13 deletions xds/internal/server/rds_handler.go
Expand Up @@ -76,10 +76,10 @@
w := &rdsWatcher{parent: rh, routeName: routeName}
cancel := xdsresource.WatchRouteConfig(rh.xdsC, routeName, w)
// Set bit on cancel function to eat any RouteConfiguration calls
// for this watcher after it has been cancelled.
// for this watcher after it has been canceled.
rh.cancels[routeName] = func() {
w.mu.Lock()
w.cancelled = true
w.canceled = true
w.mu.Unlock()
cancel()
}
Expand Down Expand Up @@ -107,10 +107,7 @@

// Must be called from an xDS Client Callback.
func (rh *rdsHandler) handleRouteUpdate(routeName string, update rdsWatcherUpdate) {
rwu, ok := rh.updates[routeName]
if !ok {
rwu = rdsWatcherUpdate{}
}
rwu := rh.updates[routeName]

if update.err != nil {
if xdsresource.ErrType(update.err) == xdsresource.ErrorTypeResourceNotFound {
Expand All @@ -121,8 +118,7 @@
// Write error.
rwu.err = update.err
} else {
rwu.update = update.update
rwu.err = nil
rwu = update
}
rh.updates[routeName] = rwu
rh.callback(routeName, rwu)
Expand Down Expand Up @@ -151,16 +147,16 @@
logger *igrpclog.PrefixLogger
routeName string

mu sync.Mutex
cancelled bool // eats callbacks if true
mu sync.Mutex
canceled bool // eats callbacks if true
}

func (rw *rdsWatcher) OnUpdate(update *xdsresource.RouteConfigResourceData) {
rw.mu.Lock()
if rw.cancelled {
if rw.canceled {
rw.mu.Unlock()
return
}

Check warning on line 159 in xds/internal/server/rds_handler.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/server/rds_handler.go#L157-L159

Added lines #L157 - L159 were not covered by tests
rw.mu.Unlock()
if rw.logger.V(2) {
rw.logger.Infof("RDS watch for resource %q received update: %#v", rw.routeName, update.Resource)
Expand All @@ -170,10 +166,10 @@

func (rw *rdsWatcher) OnError(err error) {
rw.mu.Lock()
if rw.cancelled {
if rw.canceled {
rw.mu.Unlock()
return
}

Check warning on line 172 in xds/internal/server/rds_handler.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/server/rds_handler.go#L170-L172

Added lines #L170 - L172 were not covered by tests
rw.mu.Unlock()
if rw.logger.V(2) {
rw.logger.Infof("RDS watch for resource %q reported error: %v", rw.routeName, err)
Expand All @@ -183,10 +179,10 @@

func (rw *rdsWatcher) OnResourceDoesNotExist() {
rw.mu.Lock()
if rw.cancelled {
if rw.canceled {
rw.mu.Unlock()
return
}

Check warning on line 185 in xds/internal/server/rds_handler.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/server/rds_handler.go#L183-L185

Added lines #L183 - L185 were not covered by tests
rw.mu.Unlock()
if rw.logger.V(2) {
rw.logger.Infof("RDS watch for resource %q reported resource-does-not-exist error: %v", rw.routeName)
Expand Down
18 changes: 5 additions & 13 deletions xds/internal/xdsclient/client_new.go
Expand Up @@ -24,6 +24,7 @@ import (
"encoding/json"
"fmt"
"sync"
"sync/atomic"
"time"

"google.golang.org/grpc/internal"
Expand Down Expand Up @@ -106,15 +107,10 @@ func init() {
internal.TriggerXDSResourceNameNotFoundClient = triggerXDSResourceNameNotFoundClient
}

var (
singletonClientForTestingMu sync.Mutex
singletonClientForTesting *clientRefCounted
)
var singletonClientForTesting = atomic.Pointer[clientRefCounted]{}

func triggerXDSResourceNameNotFoundClient(resourceType, resourceName string) error {
singletonClientForTestingMu.Lock()
c := singletonClientForTesting
singletonClientForTestingMu.Unlock()
c := singletonClientForTesting.Load()
return internal.TriggerXDSResourceNameNotFoundForTesting.(func(func(xdsresource.Type, string) error, string, string) error)(c.clientImpl.triggerResourceNotFoundForTesting, resourceType, resourceName)
}

Expand All @@ -141,18 +137,14 @@ func NewWithBootstrapContentsForTesting(contents []byte) (XDSClient, func(), err
if err != nil {
return nil, nil, err
}
singletonClientForTestingMu.Lock()
singletonClientForTesting = c
singletonClientForTestingMu.Unlock()
singletonClientForTesting.Store(c)
return c, grpcsync.OnceFunc(func() {
clientsMu.Lock()
defer clientsMu.Unlock()
if c.decrRef() == 0 {
c.close()
delete(clients, string(contents))
singletonClientForTestingMu.Lock()
singletonClientForTesting = nil
singletonClientForTestingMu.Unlock()
singletonClientForTesting.Store(nil)
}
}), nil
}
Expand Down