Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds/xdsclient: ignore resource deletion as per gRFC A53 #6035

Merged
merged 15 commits into from
Apr 4, 2023
5 changes: 3 additions & 2 deletions xds/googledirectpath/googlec2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,9 @@ func (c2pResolverBuilder) Build(t resolver.Target, cc resolver.ClientConn, opts
balancerName = tdURL
}
serverConfig := &bootstrap.ServerConfig{
ServerURI: balancerName,
Creds: grpc.WithCredentialsBundle(google.NewDefaultCredentials()),
ServerURI: balancerName,
Creds: grpc.WithCredentialsBundle(google.NewDefaultCredentials()),
IgnoreResourceDeletion: false,
easwars marked this conversation as resolved.
Show resolved Hide resolved
}
config := &bootstrap.Config{
XDSServer: serverConfig,
Expand Down
31 changes: 25 additions & 6 deletions xds/internal/xdsclient/authority.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ type resourceState struct {
cache xdsresource.ResourceData // Most recent ACKed update for this resource
md xdsresource.UpdateMetadata // Metadata for the most recent update

easwars marked this conversation as resolved.
Show resolved Hide resolved
resourceDeletionIgnored bool // Set if resource deletion was ignored for a prior update
easwars marked this conversation as resolved.
Show resolved Hide resolved

// Common watch state for all watchers of this resource.
wTimer *time.Timer // Expiry timer
wState watchState // State of the watch
Expand Down Expand Up @@ -186,8 +188,13 @@ func (a *authority) updateResourceStateAndScheduleCallbacks(rType xdsresource.Ty
}
continue
}
// If we get here, it means that the update is a valid one. Notify
// watchers only if this is a first time update or it is different
// If we get here, it means that the update is a valid one.
// Check if resourceDeletionIgnored flag was set for a prior response from server.
easwars marked this conversation as resolved.
Show resolved Hide resolved
if state.resourceDeletionIgnored {
state.resourceDeletionIgnored = false
a.logger.Infof("A valid update was received for resource type %q with resource name %q after previously ignoring a deletion", rType.TypeEnum(), name)
easwars marked this conversation as resolved.
Show resolved Hide resolved
}
// Notify watchers only if this is a first time update or it is different
// from the one currently cached.
if state.cache == nil || !state.cache.Equal(uErr.resource) {
for watcher := range state.watchers {
Expand All @@ -214,7 +221,8 @@ func (a *authority) updateResourceStateAndScheduleCallbacks(rType xdsresource.Ty
// If this resource type requires that all resources be present in every
// SotW response from the server, a response that does not include a
// previously seen resource will be interpreted as a deletion of that
// resource.
// resource unless ignore_resource_deletion option was set in the server
// config.
if !rType.AllResourcesRequiredInSotW() {
return
}
Expand Down Expand Up @@ -246,6 +254,16 @@ func (a *authority) updateResourceStateAndScheduleCallbacks(rType xdsresource.Ty
continue
}

// Per A53, resource deletions are ignored (the resource is kept in the cache
// and watchers are not notified) when the ignore_resource_deletion server
// feature is set in the server config.
easwars marked this conversation as resolved.
Show resolved Hide resolved
if a.serverCfg.IgnoreResourceDeletion {
if !state.resourceDeletionIgnored {
state.resourceDeletionIgnored = true
a.logger.Warningf("Ignoring resource deletion for resource type %q with resource name %q ", rType.TypeEnum(), name)
easwars marked this conversation as resolved.
Show resolved Hide resolved
}
// Skip sending updates to watchers.
continue
}
easwars marked this conversation as resolved.
Show resolved Hide resolved
// If resource exists in cache, but not in the new update, delete
// the resource from cache, and also send a resource not found error
// to indicate resource removed. Metadata for the resource is still
Expand Down Expand Up @@ -424,9 +442,10 @@ func (a *authority) watchResource(rType xdsresource.Type, resourceName string, w
if state == nil {
a.logger.Debugf("First watch for type %q, resource name %q", rType.TypeEnum(), resourceName)
state = &resourceState{
watchers: make(map[xdsresource.ResourceWatcher]bool),
md: xdsresource.UpdateMetadata{Status: xdsresource.ServiceStatusRequested},
wState: watchStateStarted,
watchers: make(map[xdsresource.ResourceWatcher]bool),
md: xdsresource.UpdateMetadata{Status: xdsresource.ServiceStatusRequested},
wState: watchStateStarted,
resourceDeletionIgnored: false,
easwars marked this conversation as resolved.
Show resolved Hide resolved
}
resources[resourceName] = state
a.sendDiscoveryRequestLocked(rType, resources)
Expand Down
173 changes: 152 additions & 21 deletions xds/internal/xdsclient/authority_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,21 @@ func (w *testResourceWatcher) OnError(err error) {

func (w *testResourceWatcher) OnResourceDoesNotExist() {}

// BlockOnUpdateCh blocks until the watcher receives a resource update and
// returns nil when it does. It returns a non-nil error if an error is
// delivered on the watcher's error channel first, or if ctx is done before
// either of them arrives.
func (w *testResourceWatcher) BlockOnUpdateCh(ctx context.Context) error {
	select {
	case <-ctx.Done():
		// Include the context error so the failure shows whether the test
		// timed out or was cancelled.
		return fmt.Errorf("test timed out before watcher received the update: %w", ctx.Err())
	case err := <-w.errorCh:
		return fmt.Errorf("watch got an unexpected error update: %q; want: valid update", err)
	case <-w.updateCh:
		return nil
	}
}

func newTestResourceWatcher() *testResourceWatcher {
return &testResourceWatcher{
updateCh: make(chan *xdsresource.ResourceData),
errorCh: make(chan error),
updateCh: make(chan *xdsresource.ResourceData, 1),
errorCh: make(chan error, 1),
}
}

Expand Down Expand Up @@ -149,17 +160,16 @@ func (s) TestTimerAndWatchStateOnSendCallback(t *testing.T) {

// Updating mgmt server with the same lds resource. Blocking on watcher's update
// ch to verify the watch state transition to `watchStateReceived`.
if err := updateResourceInServer(ctx, ms, rn, nodeID); err != nil {
if err := updateResourcesInServer(ctx, ms, []string{rn}, nodeID); err != nil {
t.Fatalf("Failed to update server with resource: %q; err: %q", rn, err)
}
select {
case <-ctx.Done():
t.Fatal("Test timed out before watcher received an update from server.")
case err := <-w.errorCh:
t.Fatalf("Watch got an unexpected error update: %q. Want valid updates.", err)
case <-w.updateCh:
// This means the OnUpdate callback was invoked and the watcher was notified.

// If this call returns without an error it means the OnUpdate callback was
// invoked and the watcher was notified.
if err := w.BlockOnUpdateCh(ctx); err != nil {
t.Fatal(err)
}

if err := compareWatchState(a, rn, watchStateReceived); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -222,18 +232,14 @@ func (s) TestWatchResourceTimerCanRestartOnIgnoredADSRecvError(t *testing.T) {
watcherA := newTestResourceWatcher()
cancelA := a.watchResource(listenerResourceType, nameA, watcherA)

if err := updateResourceInServer(ctx, ms, nameA, nodeID); err != nil {
if err := updateResourcesInServer(ctx, ms, []string{nameA}, nodeID); err != nil {
t.Fatalf("Failed to update server with resource: %q; err: %q", nameA, err)
}

// Blocking on resource A watcher's update channel to verify that more than
// one message has been received on the ADS stream.
select {
case <-ctx.Done():
t.Fatal("Test timed out before watcher received the update.")
case err := <-watcherA.errorCh:
t.Fatalf("Watch got an unexpected error update: %q; want: valid update.", err)
case <-watcherA.updateCh:
if err := watcherA.BlockOnUpdateCh(ctx); err != nil {
t.Fatal(err)
}

nameB := "xdsclient-test-lds-resourceB"
Expand Down Expand Up @@ -267,12 +273,127 @@ func (s) TestWatchResourceTimerCanRestartOnIgnoredADSRecvError(t *testing.T) {

}

// TestResourceDeletionIsIgnored tests the scenario where the
// IgnoreResourceDeletion server feature is set on the bootstrap server config:
// the client ignores the deletion of a resource that is missing from an update.
// This conforms to gRFC A53
// (https://github.com/grpc/proposal/blob/master/A53-xds-ignore-resource-deletion.md).
func (s) TestResourceDeletionIsIgnored(t *testing.T) {
easwars marked this conversation as resolved.
Show resolved Hide resolved
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
a, mgmtServer, nodeID := setupTest(ctx, t, emptyServerOpts, defaultClientWatchExpiryTimeout)
defer mgmtServer.Stop()
defer a.close()

resourceNameA := "xdsclient-test-lds-resourceA"
watcherA := newTestResourceWatcher()
cancelResourceA := a.watchResource(listenerResourceType, resourceNameA, watcherA)
defer cancelResourceA()

resourceNameB := "xdsclient-test-lds-resourceB"
watcherB := newTestResourceWatcher()
cancelResourceB := a.watchResource(listenerResourceType, resourceNameB, watcherB)
defer cancelResourceB()

if err := updateResourcesInServer(ctx, mgmtServer, []string{resourceNameA, resourceNameB}, nodeID); err != nil {
t.Fatalf("Failed to update server with resource: %q; err: %q", []string{resourceNameA, resourceNameB}, err)
}

// Waiting for watcher(s) to receive a resource update.
if err := watcherA.BlockOnUpdateCh(ctx); err != nil {
t.Fatal(err)
}
if err := watcherB.BlockOnUpdateCh(ctx); err != nil {
t.Fatal(err)
}

if ok := checkResourceInCache(a, resourceNameA); !ok {
t.Fatalf("No valid update cached for resource: %q. Want resource to be present in cache.", resourceNameA)
}
if ok := checkResourceInCache(a, resourceNameB); !ok {
t.Fatalf("No valid update cached for resource: %q. Want resource to be present in cache.", resourceNameB)
}

// Sending an update without resource B.
if err := updateResourcesInServer(ctx, mgmtServer, []string{resourceNameA}, nodeID); err != nil {
t.Fatalf("Failed to update server with resource: %q; err: %q", resourceNameA, err)
}

// Waiting for updates to reach the client.
if err := watcherA.BlockOnUpdateCh(ctx); err != nil {
t.Fatal(err)
}

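// Verifying that both resources are still in cache.
// NOTE(review): the test name says the deletion is ignored, so resource B
// would be expected to remain cached, yet the check below fails when B is
// still in the cache. Confirm the intended expectation.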
if ok := checkResourceInCache(a, resourceNameA); !ok {
t.Fatalf("No valid update cached for resource[%q] was found. Want resource to be present in cache.", resourceNameA)
}
if ok := checkResourceInCache(a, resourceNameB); ok {
t.Fatalf("A valid update for resource[%q] was found in cache. Want no resource cache deleted.", resourceNameB)
}
}

// TestResourceDeletionIsNotIgnored tests the scenario where
// IgnoreResourceDeletion is unset on the bootstrap server config: the resource
// deletion is not ignored by the client. Two listener resources are watched
// and served; a second update then omits resource B, and the test verifies
// that resource A stays cached while resource B is removed from the cache.
func (s) TestResourceDeletionIsNotIgnored(t *testing.T) {
	// Use the same timeout as the other tests in this file so that a hung
	// test fails promptly.
	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()
	a, mgmtServer, nodeID := setupTest(ctx, t, emptyServerOpts, defaultClientWatchExpiryTimeout)
	defer mgmtServer.Stop()
	defer a.close()

	resourceNameA := "xdsclient-test-lds-resourceA"
	watcherA := newTestResourceWatcher()
	cancelResourceA := a.watchResource(listenerResourceType, resourceNameA, watcherA)
	defer cancelResourceA()

	resourceNameB := "xdsclient-test-lds-resourceB"
	watcherB := newTestResourceWatcher()
	cancelResourceB := a.watchResource(listenerResourceType, resourceNameB, watcherB)
	defer cancelResourceB()

	if err := updateResourcesInServer(ctx, mgmtServer, []string{resourceNameA, resourceNameB}, nodeID); err != nil {
		t.Fatalf("Failed to update server with resource: %q; err: %q", []string{resourceNameA, resourceNameB}, err)
	}

	// Waiting for watcher(s) to receive a resource update.
	if err := watcherA.BlockOnUpdateCh(ctx); err != nil {
		t.Fatal(err)
	}
	if err := watcherB.BlockOnUpdateCh(ctx); err != nil {
		t.Fatal(err)
	}
	if ok := checkResourceInCache(a, resourceNameA); !ok {
		t.Fatalf("No valid update cached for resource: %q. Want resource to be present in cache.", resourceNameA)
	}
	if ok := checkResourceInCache(a, resourceNameB); !ok {
		t.Fatalf("No valid update cached for resource: %q. Want resource to be present in cache.", resourceNameB)
	}

	// Sending an update without resource B.
	if err := updateResourcesInServer(ctx, mgmtServer, []string{resourceNameA}, nodeID); err != nil {
		t.Fatalf("Failed to update server with resource: %q; err: %q", resourceNameA, err)
	}

	// Waiting for updates to reach the client. Watcher A's update is used as
	// the signal that the response omitting resource B has been processed.
	if err := watcherA.BlockOnUpdateCh(ctx); err != nil {
		t.Fatal(err)
	}

	// Verifying that resource A's cache is still valid, while resource B has been
	// deleted.
	if ok := checkResourceInCache(a, resourceNameA); !ok {
		t.Fatalf("No valid update cached for resource: %q. Want resource to be present in cache.", resourceNameA)
	}
	if ok := checkResourceInCache(a, resourceNameB); ok {
		t.Fatalf("A valid update for resource: %q was found in cache. Want resource to be deleted from cache.", resourceNameB)
	}
}

func compareWatchState(a *authority, rn string, wantState watchState) error {
a.resourcesMu.Lock()
defer a.resourcesMu.Unlock()
gotState := a.resources[listenerResourceType][rn].wState
if gotState != wantState {
return fmt.Errorf("%v. Want: %v", gotState, wantState)
return fmt.Errorf("Got %v. Want: %v", gotState, wantState)
}

wTimer := a.resources[listenerResourceType][rn].wTimer
Expand All @@ -294,11 +415,21 @@ func compareWatchState(a *authority, rn string, wantState watchState) error {
return nil
}

func updateResourceInServer(ctx context.Context, ms *e2e.ManagementServer, rn string, nID string) error {
l := e2e.DefaultClientListener(rn, "new-rds-resource")
// checkResourceInCache reports whether the authority holds a cached update for
// the listener resource named rn. It returns false when no state is tracked
// for rn, rather than dereferencing a nil map entry.
func checkResourceInCache(a *authority, rn string) bool {
	a.resourcesMu.Lock()
	defer a.resourcesMu.Unlock()
	// Indexing a missing key yields a nil *resourceState; guard against it so
	// an unknown resource name reads as "not cached" instead of panicking.
	state := a.resources[listenerResourceType][rn]
	return state != nil && state.cache != nil
}

func updateResourcesInServer(ctx context.Context, ms *e2e.ManagementServer, rns []string, nID string) error {
listeners := []*v3listenerpb.Listener{}
for _, rn := range rns {
listener := e2e.DefaultClientListener(rn, fmt.Sprintf("new-rds-%s", uuid.New().String()))
listeners = append(listeners, listener)
}
resources := e2e.UpdateOptions{
NodeID: nID,
Listeners: []*v3listenerpb.Listener{l},
Listeners: listeners,
SkipValidation: true,
}
return ms.Update(ctx, resources)
Expand Down
22 changes: 19 additions & 3 deletions xds/internal/xdsclient/bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,13 @@ import (

const (
// The "server_features" field in the bootstrap file contains a list of
// features supported by the server. A value of "xds_v3" indicates that the
// server supports the v3 version of the xDS transport protocol.
serverFeaturesV3 = "xds_v3"
// features supported by the server.
// + A value of "xds_v3" indicates that the server supports the v3 version of
easwars marked this conversation as resolved.
Show resolved Hide resolved
// the xDS transport protocol.
// + A value of "ignore_resource_deletion" indicates that the client should
// ignore resource deletion in updates from the server.
easwars marked this conversation as resolved.
Show resolved Hide resolved
serverFeaturesV3 = "xds_v3"
serverFeaturesIgnoreResourceDeletion = "ignore_resource_deletion"

gRPCUserAgentName = "gRPC Go"
clientFeatureNoOverprovisioning = "envoy.lb.does_not_support_overprovisioning"
Expand Down Expand Up @@ -97,6 +101,10 @@ type ServerConfig struct {
Creds grpc.DialOption
// CredsType is the type of the creds. It will be used to dedup servers.
CredsType string
// IgnoreResourceDeletion, if set, means the XdsClient will not invoke the
// watchers' OnResourceDoesNotExist() method when a resource is deleted, nor
// will it remove the existing resource value from its cache.
easwars marked this conversation as resolved.
Show resolved Hide resolved
IgnoreResourceDeletion bool
}

// String returns the string representation of the ServerConfig.
Expand All @@ -120,6 +128,9 @@ func (sc ServerConfig) MarshalJSON() ([]byte, error) {
ChannelCreds: []channelCreds{{Type: sc.CredsType, Config: nil}},
}
server.ServerFeatures = []string{serverFeaturesV3}
if sc.IgnoreResourceDeletion {
server.ServerFeatures = append(server.ServerFeatures, serverFeaturesIgnoreResourceDeletion)
}
return json.Marshal(server)
}

Expand All @@ -144,6 +155,11 @@ func (sc *ServerConfig) UnmarshalJSON(data []byte) error {
sc.Creds = grpc.WithCredentialsBundle(bundle)
break
}
for _, f := range server.ServerFeatures {
if f == serverFeaturesIgnoreResourceDeletion {
sc.IgnoreResourceDeletion = true
}
}
return nil
}

Expand Down
16 changes: 16 additions & 0 deletions xds/internal/xdsclient/bootstrap/bootstrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,22 @@ var (
}
]
}`,
"serverSupportsIgnoreResourceDeletion": `
easwars marked this conversation as resolved.
Show resolved Hide resolved
{
"node": {
"id": "ENVOY_NODE_ID",
"metadata": {
"TRAFFICDIRECTOR_GRPC_HOSTNAME": "trafficdirector"
}
},
"xds_servers" : [{
"server_uri": "trafficdirector.googleapis.com:443",
"channel_creds": [
{ "type": "google_default" }
],
"server_features" : ["foo", "ignore_resource_deletion", "xds_v3"]
}]
}`,
}
metadata = &structpb.Struct{
Fields: map[string]*structpb.Value{
Expand Down