From cd9f79e024cd8910f01b15b8ad7c2fe9dc60b053 Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Tue, 7 Sep 2021 14:55:52 -0700 Subject: [PATCH] [xds_client_nack_part] ttt [xds_client_nack_part] test not done [xds_client_nack_part] temp, lds failing [xds_client_nack_part] ttt [xds_client_nack_part] 222 [xds_client_nack_part] lll [xds_client_nack_part] eee [xds_client_nack_part] cds [xds_client_nack_part] eds [xds_client_nack_part] lds [xds_client_nack_part] rds [xds_client_nack_part] csds tests failing [xds_client_nack_part] cds dump [xds_client_nack_part] eds dump [xds_client_nack_part] rds dump [xds_client_nack_part] xdsclient v2 tests [xds_client_nack_part] fix csds test --- xds/csds/csds_test.go | 49 ++++--- xds/internal/xdsclient/callback.go | 133 ++++++++++-------- xds/internal/xdsclient/cds_test.go | 14 +- xds/internal/xdsclient/client.go | 12 ++ xds/internal/xdsclient/client_test.go | 10 +- xds/internal/xdsclient/dump_test.go | 44 +++--- xds/internal/xdsclient/eds_test.go | 13 +- xds/internal/xdsclient/lds_test.go | 73 +++++----- xds/internal/xdsclient/rds_test.go | 8 +- xds/internal/xdsclient/v2/cds_test.go | 5 +- xds/internal/xdsclient/v2/client_test.go | 6 +- xds/internal/xdsclient/v2/eds_test.go | 5 +- xds/internal/xdsclient/v2/lds_test.go | 13 +- xds/internal/xdsclient/v2/rds_test.go | 5 +- .../xdsclient/watchers_cluster_test.go | 66 ++++++++- .../xdsclient/watchers_endpoints_test.go | 62 +++++++- .../xdsclient/watchers_listener_test.go | 62 +++++++- xds/internal/xdsclient/watchers_route_test.go | 68 ++++++++- xds/internal/xdsclient/xds.go | 12 +- 19 files changed, 481 insertions(+), 179 deletions(-) diff --git a/xds/csds/csds_test.go b/xds/csds/csds_test.go index ffaae2d739ab..395fb217c338 100644 --- a/xds/csds/csds_test.go +++ b/xds/csds/csds_test.go @@ -211,22 +211,28 @@ func TestCSDS(t *testing.T) { } const nackResourceIdx = 0 + var ( + nackListeners = listeners + nackRoutes = routes + nackClusters = clusters + nackEndpoints = endpoints + ) + nackListeners[0] = &v3listenerpb.Listener{Name: ldsTargets[nackResourceIdx], ApiListener: &v3listenerpb.ApiListener{}} // 0 will be nacked. 1 will stay the same. + nackRoutes[0] = &v3routepb.RouteConfiguration{ + Name: rdsTargets[nackResourceIdx], VirtualHosts: []*v3routepb.VirtualHost{{Routes: []*v3routepb.Route{{}}}}, + } + nackClusters[0] = &v3clusterpb.Cluster{ + Name: cdsTargets[nackResourceIdx], ClusterDiscoveryType: &v3clusterpb.Cluster_Type{Type: v3clusterpb.Cluster_STATIC}, + } + nackEndpoints[0] = &v3endpointpb.ClusterLoadAssignment{ + ClusterName: edsTargets[nackResourceIdx], Endpoints: []*v3endpointpb.LocalityLbEndpoints{{}}, + } if err := mgmServer.Update(ctx, e2e.UpdateOptions{ - NodeID: nodeID, - Listeners: []*v3listenerpb.Listener{ - {Name: ldsTargets[nackResourceIdx], ApiListener: &v3listenerpb.ApiListener{}}, // 0 will be nacked. 1 will stay the same. - }, - Routes: []*v3routepb.RouteConfiguration{ - {Name: rdsTargets[nackResourceIdx], VirtualHosts: []*v3routepb.VirtualHost{{ - Routes: []*v3routepb.Route{{}}, - }}}, - }, - Clusters: []*v3clusterpb.Cluster{ - {Name: cdsTargets[nackResourceIdx], ClusterDiscoveryType: &v3clusterpb.Cluster_Type{Type: v3clusterpb.Cluster_STATIC}}, - }, - Endpoints: []*v3endpointpb.ClusterLoadAssignment{ - {ClusterName: edsTargets[nackResourceIdx], Endpoints: []*v3endpointpb.LocalityLbEndpoints{{}}}, - }, + NodeID: nodeID, + Listeners: nackListeners, + Routes: nackRoutes, + Clusters: nackClusters, + Endpoints: nackEndpoints, SkipValidation: true, }); err != nil { t.Fatal(err) @@ -490,7 +496,6 @@ func checkForNACKed(nackResourceIdx int, stream v3statuspbgrpc.ClientStatusDisco ackVersion = "1" nackVersion = "2" ) - if err := stream.Send(&v3statuspb.ClientStatusRequest{Node: nil}); err != nil { return fmt.Errorf("failed to send: %v", err) } @@ -514,13 +519,14 @@ func checkForNACKed(nackResourceIdx int, stream v3statuspbgrpc.ClientStatusDisco configDump := &v3adminpb.ListenersConfigDump_DynamicListener{ Name: ldsTargets[i], ActiveState: &v3adminpb.ListenersConfigDump_DynamicListenerState{ - VersionInfo: ackVersion, + VersionInfo: nackVersion, Listener: listenerAnys[i], LastUpdated: nil, }, ClientStatus: v3adminpb.ClientResourceStatus_ACKED, } if i == nackResourceIdx { + configDump.ActiveState.VersionInfo = ackVersion configDump.ClientStatus = v3adminpb.ClientResourceStatus_NACKED configDump.ErrorState = &v3adminpb.UpdateFailureState{ Details: "blahblah", @@ -540,12 +546,13 @@ func checkForNACKed(nackResourceIdx int, stream v3statuspbgrpc.ClientStatusDisco var wantRoutes []*v3adminpb.RoutesConfigDump_DynamicRouteConfig for i := range rdsTargets { configDump := &v3adminpb.RoutesConfigDump_DynamicRouteConfig{ - VersionInfo: ackVersion, + VersionInfo: nackVersion, RouteConfig: routeAnys[i], LastUpdated: nil, ClientStatus: v3adminpb.ClientResourceStatus_ACKED, } if i == nackResourceIdx { + configDump.VersionInfo = ackVersion configDump.ClientStatus = v3adminpb.ClientResourceStatus_NACKED configDump.ErrorState = &v3adminpb.UpdateFailureState{ Details: "blahblah", @@ -564,12 +571,13 @@ func checkForNACKed(nackResourceIdx int, stream v3statuspbgrpc.ClientStatusDisco var wantCluster []*v3adminpb.ClustersConfigDump_DynamicCluster for i := range cdsTargets { configDump := &v3adminpb.ClustersConfigDump_DynamicCluster{ - VersionInfo: ackVersion, + VersionInfo: nackVersion, Cluster: clusterAnys[i], LastUpdated: nil, ClientStatus: v3adminpb.ClientResourceStatus_ACKED, } if i == nackResourceIdx { + configDump.VersionInfo = ackVersion configDump.ClientStatus = v3adminpb.ClientResourceStatus_NACKED configDump.ErrorState = &v3adminpb.UpdateFailureState{ Details: "blahblah", @@ -589,12 +597,13 @@ func checkForNACKed(nackResourceIdx int, stream v3statuspbgrpc.ClientStatusDisco var wantEndpoint []*v3adminpb.EndpointsConfigDump_DynamicEndpointConfig for i := range cdsTargets { configDump := &v3adminpb.EndpointsConfigDump_DynamicEndpointConfig{ - VersionInfo: ackVersion, + VersionInfo: nackVersion, EndpointConfig: endpointAnys[i], LastUpdated: nil, ClientStatus: v3adminpb.ClientResourceStatus_ACKED, } if i == nackResourceIdx { + configDump.VersionInfo = ackVersion configDump.ClientStatus = v3adminpb.ClientResourceStatus_NACKED configDump.ErrorState = &v3adminpb.UpdateFailureState{ Details: "blahblah", diff --git a/xds/internal/xdsclient/callback.go b/xds/internal/xdsclient/callback.go index b8ad0ec76362..b765af231d0f 100644 --- a/xds/internal/xdsclient/callback.go +++ b/xds/internal/xdsclient/callback.go @@ -80,11 +80,13 @@ func (c *clientImpl) NewListeners(updates map[string]ListenerUpdate, metadata Up c.mu.Lock() defer c.mu.Unlock() + c.ldsVersion = metadata.Version if metadata.ErrState != nil { - // On NACK, update overall version to the NACKed resp. c.ldsVersion = metadata.ErrState.Version - for name := range updates { - if s, ok := c.ldsWatchers[name]; ok { + } + for name, update := range updates { + if s, ok := c.ldsWatchers[name]; ok { + if update.Err != nil { // On error, keep previous version for each resource. But update // status and error. mdCopy := c.ldsMD[name] @@ -92,25 +94,27 @@ func (c *clientImpl) NewListeners(updates map[string]ListenerUpdate, metadata Up mdCopy.Status = metadata.Status c.ldsMD[name] = mdCopy for wi := range s { - wi.newError(metadata.ErrState.Err) + wi.newError(update.Err) } + continue } - } - return - } - - // If no error received, the status is ACK. - c.ldsVersion = metadata.Version - for name, update := range updates { - if s, ok := c.ldsWatchers[name]; ok { - // Only send the update if this is not an error. + // If the resource is valid, send the update. for wi := range s { wi.newUpdate(update) } // Sync cache. c.logger.Debugf("LDS resource with name %v, value %+v added to cache", name, pretty.ToJSON(update)) c.ldsCache[name] = update - c.ldsMD[name] = metadata + // Set status to ACK, and clear error state. The metadata might be a + // NACK metadata because some other resources in the same response + // are invalid. + mdCopy := metadata + mdCopy.Status = ServiceStatusACKed + mdCopy.ErrState = nil + if metadata.ErrState != nil { + mdCopy.Version = metadata.ErrState.Version + } + c.ldsMD[name] = mdCopy } } // Resources not in the new update were removed by the server, so delete @@ -141,11 +145,14 @@ func (c *clientImpl) NewRouteConfigs(updates map[string]RouteConfigUpdate, metad c.mu.Lock() defer c.mu.Unlock() + // If no error received, the status is ACK. + c.rdsVersion = metadata.Version if metadata.ErrState != nil { - // On NACK, update overall version to the NACKed resp. c.rdsVersion = metadata.ErrState.Version - for name := range updates { - if s, ok := c.rdsWatchers[name]; ok { + } + for name, update := range updates { + if s, ok := c.rdsWatchers[name]; ok { + if update.Err != nil { // On error, keep previous version for each resource. But update // status and error. mdCopy := c.rdsMD[name] @@ -153,25 +160,27 @@ func (c *clientImpl) NewRouteConfigs(updates map[string]RouteConfigUpdate, metad mdCopy.Status = metadata.Status c.rdsMD[name] = mdCopy for wi := range s { - wi.newError(metadata.ErrState.Err) + wi.newError(update.Err) } + continue } - } - return - } - - // If no error received, the status is ACK. - c.rdsVersion = metadata.Version - for name, update := range updates { - if s, ok := c.rdsWatchers[name]; ok { - // Only send the update if this is not an error. + // If the resource is valid, send the update. for wi := range s { wi.newUpdate(update) } // Sync cache. c.logger.Debugf("RDS resource with name %v, value %+v added to cache", name, pretty.ToJSON(update)) c.rdsCache[name] = update - c.rdsMD[name] = metadata + // Set status to ACK, and clear error state. The metadata might be a + // NACK metadata because some other resources in the same response + // are invalid. + mdCopy := metadata + mdCopy.Status = ServiceStatusACKed + mdCopy.ErrState = nil + if metadata.ErrState != nil { + mdCopy.Version = metadata.ErrState.Version + } + c.rdsMD[name] = mdCopy } } } @@ -185,11 +194,13 @@ func (c *clientImpl) NewClusters(updates map[string]ClusterUpdate, metadata Upda c.mu.Lock() defer c.mu.Unlock() + c.cdsVersion = metadata.Version if metadata.ErrState != nil { - // On NACK, update overall version to the NACKed resp. c.cdsVersion = metadata.ErrState.Version - for name := range updates { - if s, ok := c.cdsWatchers[name]; ok { + } + for name, update := range updates { + if s, ok := c.cdsWatchers[name]; ok { + if update.Err != nil { // On error, keep previous version for each resource. But update // status and error. mdCopy := c.cdsMD[name] @@ -197,25 +208,29 @@ func (c *clientImpl) NewClusters(updates map[string]ClusterUpdate, metadata Upda mdCopy.Status = metadata.Status c.cdsMD[name] = mdCopy for wi := range s { - wi.newError(metadata.ErrState.Err) + // Send the watcher the individual error, instead of the + // overall combined error from the metadata.ErrState. + wi.newError(update.Err) } + continue } - } - return - } - - // If no error received, the status is ACK. - c.cdsVersion = metadata.Version - for name, update := range updates { - if s, ok := c.cdsWatchers[name]; ok { - // Only send the update if this is not an error. + // If the resource is valid, send the update. for wi := range s { wi.newUpdate(update) } // Sync cache. c.logger.Debugf("CDS resource with name %v, value %+v added to cache", name, pretty.ToJSON(update)) c.cdsCache[name] = update - c.cdsMD[name] = metadata + // Set status to ACK, and clear error state. The metadata might be a + // NACK metadata because some other resources in the same response + // are invalid. + mdCopy := metadata + mdCopy.Status = ServiceStatusACKed + mdCopy.ErrState = nil + if metadata.ErrState != nil { + mdCopy.Version = metadata.ErrState.Version + } + c.cdsMD[name] = mdCopy } } // Resources not in the new update were removed by the server, so delete @@ -246,11 +261,13 @@ func (c *clientImpl) NewEndpoints(updates map[string]EndpointsUpdate, metadata U c.mu.Lock() defer c.mu.Unlock() + c.edsVersion = metadata.Version if metadata.ErrState != nil { - // On NACK, update overall version to the NACKed resp. c.edsVersion = metadata.ErrState.Version - for name := range updates { - if s, ok := c.edsWatchers[name]; ok { + } + for name, update := range updates { + if s, ok := c.edsWatchers[name]; ok { + if update.Err != nil { // On error, keep previous version for each resource. But update // status and error. mdCopy := c.edsMD[name] @@ -258,25 +275,29 @@ func (c *clientImpl) NewEndpoints(updates map[string]EndpointsUpdate, metadata U mdCopy.Status = metadata.Status c.edsMD[name] = mdCopy for wi := range s { - wi.newError(metadata.ErrState.Err) + // Send the watcher the individual error, instead of the + // overall combined error from the metadata.ErrState. + wi.newError(update.Err) } + continue } - } - return - } - - // If no error received, the status is ACK. - c.edsVersion = metadata.Version - for name, update := range updates { - if s, ok := c.edsWatchers[name]; ok { - // Only send the update if this is not an error. + // If the resource is valid, send the update. for wi := range s { wi.newUpdate(update) } // Sync cache. c.logger.Debugf("EDS resource with name %v, value %+v added to cache", name, pretty.ToJSON(update)) c.edsCache[name] = update - c.edsMD[name] = metadata + // Set status to ACK, and clear error state. The metadata might be a + // NACK metadata because some other resources in the same response + // are invalid. + mdCopy := metadata + mdCopy.Status = ServiceStatusACKed + mdCopy.ErrState = nil + if metadata.ErrState != nil { + mdCopy.Version = metadata.ErrState.Version + } + c.edsMD[name] = mdCopy } } } diff --git a/xds/internal/xdsclient/cds_test.go b/xds/internal/xdsclient/cds_test.go index 88eda33780d2..c43a285727fb 100644 --- a/xds/internal/xdsclient/cds_test.go +++ b/xds/internal/xdsclient/cds_test.go @@ -957,7 +957,7 @@ func (s) TestUnmarshalCluster(t *testing.T) { Version: testVersion, ErrState: &UpdateErrorMetadata{ Version: testVersion, - Err: errPlaceHolder, + Err: cmpopts.AnyError, }, }, wantErr: true, @@ -975,7 +975,7 @@ func (s) TestUnmarshalCluster(t *testing.T) { Version: testVersion, ErrState: &UpdateErrorMetadata{ Version: testVersion, - Err: errPlaceHolder, + Err: cmpopts.AnyError, }, }, wantErr: true, @@ -988,13 +988,15 @@ func (s) TestUnmarshalCluster(t *testing.T) { ClusterDiscoveryType: &v3clusterpb.Cluster_Type{Type: v3clusterpb.Cluster_STATIC}, }), }, - wantUpdate: map[string]ClusterUpdate{"test": {}}, + wantUpdate: map[string]ClusterUpdate{ + "test": {Err: cmpopts.AnyError}, + }, wantMD: UpdateMetadata{ Status: ServiceStatusNACKed, Version: testVersion, ErrState: &UpdateErrorMetadata{ Version: testVersion, - Err: errPlaceHolder, + Err: cmpopts.AnyError, }, }, wantErr: true, @@ -1072,14 +1074,14 @@ func (s) TestUnmarshalCluster(t *testing.T) { EDSServiceName: v3Service, EnableLRS: true, Raw: v3ClusterAny, }, - "bad": {}, + "bad": {Err: cmpopts.AnyError}, }, wantMD: UpdateMetadata{ Status: ServiceStatusNACKed, Version: testVersion, ErrState: &UpdateErrorMetadata{ Version: testVersion, - Err: errPlaceHolder, + Err: cmpopts.AnyError, }, }, wantErr: true, diff --git a/xds/internal/xdsclient/client.go b/xds/internal/xdsclient/client.go index b6310bb70081..32dbf89b694f 100644 --- a/xds/internal/xdsclient/client.go +++ b/xds/internal/xdsclient/client.go @@ -220,6 +220,9 @@ type ListenerUpdate struct { // Raw is the resource from the xds response. Raw *anypb.Any + // Err is non-nil if the resource in the response is invalid. It contains + // the unmarshal error message. + Err error } // HTTPFilter represents one HTTP filter from an LDS response's HTTP connection @@ -254,6 +257,9 @@ type RouteConfigUpdate struct { VirtualHosts []*VirtualHost // Raw is the resource from the xds response. Raw *anypb.Any + // Err is non-nil if the resource in the response is invalid. It contains + // the unmarshal error message. + Err error } // VirtualHost contains the routes for a list of Domains. @@ -482,6 +488,9 @@ type ClusterUpdate struct { // Raw is the resource from the xds response. Raw *anypb.Any + // Err is non-nil if the resource in the response is invalid. It contains + // the unmarshal error message. + Err error } // OverloadDropConfig contains the config to drop overloads. @@ -531,6 +540,9 @@ type EndpointsUpdate struct { // Raw is the resource from the xds response. Raw *anypb.Any + // Err is non-nil if the resource in the response is invalid. It contains + // the unmarshal error message. + Err error } // Function to be overridden in tests. diff --git a/xds/internal/xdsclient/client_test.go b/xds/internal/xdsclient/client_test.go index 9d8086aeb4e1..c89c526debbe 100644 --- a/xds/internal/xdsclient/client_test.go +++ b/xds/internal/xdsclient/client_test.go @@ -62,19 +62,11 @@ const ( var ( cmpOpts = cmp.Options{ cmpopts.EquateEmpty(), + cmp.FilterValues(func(x, y error) bool { return true }, cmpopts.EquateErrors()), cmp.Comparer(func(a, b time.Time) bool { return true }), - cmp.Comparer(func(x, y error) bool { - if x == nil || y == nil { - return x == nil && y == nil - } - return x.Error() == y.Error() - }), protocmp.Transform(), } - // When comparing NACK UpdateMetadata, we only care if error is nil, but not - // the details in error. - errPlaceHolder = fmt.Errorf("error whose details don't matter") cmpOptsIgnoreDetails = cmp.Options{ cmp.Comparer(func(a, b time.Time) bool { return true }), cmp.Comparer(func(x, y error) bool { diff --git a/xds/internal/xdsclient/dump_test.go b/xds/internal/xdsclient/dump_test.go index 83479978d765..6670da581654 100644 --- a/xds/internal/xdsclient/dump_test.go +++ b/xds/internal/xdsclient/dump_test.go @@ -106,11 +106,11 @@ func (s) TestLDSConfigDump(t *testing.T) { for n, r := range listenerRaws { update0[n] = xdsclient.ListenerUpdate{Raw: r} want0[n] = xdsclient.UpdateWithMD{ - MD: xdsclient.UpdateMetadata{Version: testVersion}, + MD: xdsclient.UpdateMetadata{Status: xdsclient.ServiceStatusACKed, Version: testVersion}, Raw: r, } } - updateHandler.NewListeners(update0, xdsclient.UpdateMetadata{Version: testVersion}) + updateHandler.NewListeners(update0, xdsclient.UpdateMetadata{Status: xdsclient.ServiceStatusACKed, Version: testVersion}) // Expect ACK. if err := compareDump(client.DumpLDS, testVersion, want0); err != nil { @@ -121,9 +121,11 @@ func (s) TestLDSConfigDump(t *testing.T) { var nackErr = fmt.Errorf("lds nack error") updateHandler.NewListeners( map[string]xdsclient.ListenerUpdate{ - ldsTargets[0]: {}, + ldsTargets[0]: {Err: nackErr}, + ldsTargets[1]: {Raw: listenerRaws[ldsTargets[1]]}, }, xdsclient.UpdateMetadata{ + Status: xdsclient.ServiceStatusNACKed, ErrState: &xdsclient.UpdateErrorMetadata{ Version: nackVersion, Err: nackErr, @@ -137,6 +139,7 @@ func (s) TestLDSConfigDump(t *testing.T) { // message, as well as the NACK error. wantDump[ldsTargets[0]] = xdsclient.UpdateWithMD{ MD: xdsclient.UpdateMetadata{ + Status: xdsclient.ServiceStatusNACKed, Version: testVersion, ErrState: &xdsclient.UpdateErrorMetadata{ Version: nackVersion, @@ -147,7 +150,7 @@ func (s) TestLDSConfigDump(t *testing.T) { } wantDump[ldsTargets[1]] = xdsclient.UpdateWithMD{ - MD: xdsclient.UpdateMetadata{Version: testVersion}, + MD: xdsclient.UpdateMetadata{Status: xdsclient.ServiceStatusACKed, Version: nackVersion}, Raw: listenerRaws[ldsTargets[1]], } if err := compareDump(client.DumpLDS, nackVersion, wantDump); err != nil { @@ -217,11 +220,11 @@ func (s) TestRDSConfigDump(t *testing.T) { for n, r := range routeRaws { update0[n] = xdsclient.RouteConfigUpdate{Raw: r} want0[n] = xdsclient.UpdateWithMD{ - MD: xdsclient.UpdateMetadata{Version: testVersion}, + MD: xdsclient.UpdateMetadata{Status: xdsclient.ServiceStatusACKed, Version: testVersion}, Raw: r, } } - updateHandler.NewRouteConfigs(update0, xdsclient.UpdateMetadata{Version: testVersion}) + updateHandler.NewRouteConfigs(update0, xdsclient.UpdateMetadata{Status: xdsclient.ServiceStatusACKed, Version: testVersion}) // Expect ACK. if err := compareDump(client.DumpRDS, testVersion, want0); err != nil { @@ -232,9 +235,11 @@ func (s) TestRDSConfigDump(t *testing.T) { var nackErr = fmt.Errorf("rds nack error") updateHandler.NewRouteConfigs( map[string]xdsclient.RouteConfigUpdate{ - rdsTargets[0]: {}, + rdsTargets[0]: {Err: nackErr}, + rdsTargets[1]: {Raw: routeRaws[rdsTargets[1]]}, }, xdsclient.UpdateMetadata{ + Status: xdsclient.ServiceStatusNACKed, ErrState: &xdsclient.UpdateErrorMetadata{ Version: nackVersion, Err: nackErr, @@ -248,6 +253,7 @@ func (s) TestRDSConfigDump(t *testing.T) { // message, as well as the NACK error. wantDump[rdsTargets[0]] = xdsclient.UpdateWithMD{ MD: xdsclient.UpdateMetadata{ + Status: xdsclient.ServiceStatusNACKed, Version: testVersion, ErrState: &xdsclient.UpdateErrorMetadata{ Version: nackVersion, @@ -257,7 +263,7 @@ func (s) TestRDSConfigDump(t *testing.T) { Raw: routeRaws[rdsTargets[0]], } wantDump[rdsTargets[1]] = xdsclient.UpdateWithMD{ - MD: xdsclient.UpdateMetadata{Version: testVersion}, + MD: xdsclient.UpdateMetadata{Status: xdsclient.ServiceStatusACKed, Version: nackVersion}, Raw: routeRaws[rdsTargets[1]], } if err := compareDump(client.DumpRDS, nackVersion, wantDump); err != nil { @@ -328,11 +334,11 @@ func (s) TestCDSConfigDump(t *testing.T) { for n, r := range clusterRaws { update0[n] = xdsclient.ClusterUpdate{Raw: r} want0[n] = xdsclient.UpdateWithMD{ - MD: xdsclient.UpdateMetadata{Version: testVersion}, + MD: xdsclient.UpdateMetadata{Status: xdsclient.ServiceStatusACKed, Version: testVersion}, Raw: r, } } - updateHandler.NewClusters(update0, xdsclient.UpdateMetadata{Version: testVersion}) + updateHandler.NewClusters(update0, xdsclient.UpdateMetadata{Status: xdsclient.ServiceStatusACKed, Version: testVersion}) // Expect ACK. if err := compareDump(client.DumpCDS, testVersion, want0); err != nil { @@ -343,9 +349,11 @@ func (s) TestCDSConfigDump(t *testing.T) { var nackErr = fmt.Errorf("cds nack error") updateHandler.NewClusters( map[string]xdsclient.ClusterUpdate{ - cdsTargets[0]: {}, + cdsTargets[0]: {Err: nackErr}, + cdsTargets[1]: {Raw: clusterRaws[cdsTargets[1]]}, }, xdsclient.UpdateMetadata{ + Status: xdsclient.ServiceStatusNACKed, ErrState: &xdsclient.UpdateErrorMetadata{ Version: nackVersion, Err: nackErr, @@ -359,6 +367,7 @@ func (s) TestCDSConfigDump(t *testing.T) { // message, as well as the NACK error. wantDump[cdsTargets[0]] = xdsclient.UpdateWithMD{ MD: xdsclient.UpdateMetadata{ + Status: xdsclient.ServiceStatusNACKed, Version: testVersion, ErrState: &xdsclient.UpdateErrorMetadata{ Version: nackVersion, @@ -368,7 +377,7 @@ func (s) TestCDSConfigDump(t *testing.T) { Raw: clusterRaws[cdsTargets[0]], } wantDump[cdsTargets[1]] = xdsclient.UpdateWithMD{ - MD: xdsclient.UpdateMetadata{Version: testVersion}, + MD: xdsclient.UpdateMetadata{Status: xdsclient.ServiceStatusACKed, Version: nackVersion}, Raw: clusterRaws[cdsTargets[1]], } if err := compareDump(client.DumpCDS, nackVersion, wantDump); err != nil { @@ -425,11 +434,11 @@ func (s) TestEDSConfigDump(t *testing.T) { for n, r := range endpointRaws { update0[n] = xdsclient.EndpointsUpdate{Raw: r} want0[n] = xdsclient.UpdateWithMD{ - MD: xdsclient.UpdateMetadata{Version: testVersion}, + MD: xdsclient.UpdateMetadata{Status: xdsclient.ServiceStatusACKed, Version: testVersion}, Raw: r, } } - updateHandler.NewEndpoints(update0, xdsclient.UpdateMetadata{Version: testVersion}) + updateHandler.NewEndpoints(update0, xdsclient.UpdateMetadata{Status: xdsclient.ServiceStatusACKed, Version: testVersion}) // Expect ACK. if err := compareDump(client.DumpEDS, testVersion, want0); err != nil { @@ -440,9 +449,11 @@ func (s) TestEDSConfigDump(t *testing.T) { var nackErr = fmt.Errorf("eds nack error") updateHandler.NewEndpoints( map[string]xdsclient.EndpointsUpdate{ - edsTargets[0]: {}, + edsTargets[0]: {Err: nackErr}, + edsTargets[1]: {Raw: endpointRaws[edsTargets[1]]}, }, xdsclient.UpdateMetadata{ + Status: xdsclient.ServiceStatusNACKed, ErrState: &xdsclient.UpdateErrorMetadata{ Version: nackVersion, Err: nackErr, @@ -456,6 +467,7 @@ func (s) TestEDSConfigDump(t *testing.T) { // message, as well as the NACK error. wantDump[edsTargets[0]] = xdsclient.UpdateWithMD{ MD: xdsclient.UpdateMetadata{ + Status: xdsclient.ServiceStatusNACKed, Version: testVersion, ErrState: &xdsclient.UpdateErrorMetadata{ Version: nackVersion, @@ -465,7 +477,7 @@ func (s) TestEDSConfigDump(t *testing.T) { Raw: endpointRaws[edsTargets[0]], } wantDump[edsTargets[1]] = xdsclient.UpdateWithMD{ - MD: xdsclient.UpdateMetadata{Version: testVersion}, + MD: xdsclient.UpdateMetadata{Status: xdsclient.ServiceStatusACKed, Version: nackVersion}, Raw: endpointRaws[edsTargets[1]], } if err := compareDump(client.DumpEDS, nackVersion, wantDump); err != nil { diff --git a/xds/internal/xdsclient/eds_test.go b/xds/internal/xdsclient/eds_test.go index 2fe35989f7d3..ae1499e09e0e 100644 --- a/xds/internal/xdsclient/eds_test.go +++ b/xds/internal/xdsclient/eds_test.go @@ -30,6 +30,7 @@ import ( anypb "github.com/golang/protobuf/ptypes/any" wrapperspb "github.com/golang/protobuf/ptypes/wrappers" "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/xds/internal" "google.golang.org/grpc/xds/internal/version" @@ -149,7 +150,7 @@ func (s) TestUnmarshalEndpoints(t *testing.T) { Version: testVersion, ErrState: &UpdateErrorMetadata{ Version: testVersion, - Err: errPlaceHolder, + Err: cmpopts.AnyError, }, }, wantErr: true, @@ -167,7 +168,7 @@ func (s) TestUnmarshalEndpoints(t *testing.T) { Version: testVersion, ErrState: &UpdateErrorMetadata{ Version: testVersion, - Err: errPlaceHolder, + Err: cmpopts.AnyError, }, }, wantErr: true, @@ -180,13 +181,13 @@ func (s) TestUnmarshalEndpoints(t *testing.T) { clab0.addLocality("locality-2", 1, 2, []string{"addr2:159"}, nil) return clab0.Build() }())}, - wantUpdate: map[string]EndpointsUpdate{"test": {}}, + wantUpdate: map[string]EndpointsUpdate{"test": {Err: cmpopts.AnyError}}, wantMD: UpdateMetadata{ Status: ServiceStatusNACKed, Version: testVersion, ErrState: &UpdateErrorMetadata{ Version: testVersion, - Err: errPlaceHolder, + Err: cmpopts.AnyError, }, }, wantErr: true, @@ -266,14 +267,14 @@ func (s) TestUnmarshalEndpoints(t *testing.T) { }, Raw: v3EndpointsAny, }, - "bad": {}, + "bad": {Err: cmpopts.AnyError}, }, wantMD: UpdateMetadata{ Status: ServiceStatusNACKed, Version: testVersion, ErrState: &UpdateErrorMetadata{ Version: testVersion, - Err: errPlaceHolder, + Err: cmpopts.AnyError, }, }, wantErr: true, diff --git a/xds/internal/xdsclient/lds_test.go b/xds/internal/xdsclient/lds_test.go index 8b9dc7135004..5f3efc94a843 100644 --- a/xds/internal/xdsclient/lds_test.go +++ b/xds/internal/xdsclient/lds_test.go @@ -29,6 +29,7 @@ import ( "github.com/golang/protobuf/proto" spb "github.com/golang/protobuf/ptypes/struct" "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" "google.golang.org/protobuf/types/known/durationpb" "google.golang.org/grpc/internal/testutils" @@ -176,7 +177,7 @@ func (s) TestUnmarshalListener_ClientSide(t *testing.T) { Version: testVersion, ErrState: &UpdateErrorMetadata{ Version: testVersion, - Err: errPlaceHolder, + Err: cmpopts.AnyError, }, } ) @@ -214,7 +215,7 @@ func (s) TestUnmarshalListener_ClientSide(t *testing.T) { }(), }, }, - wantUpdate: map[string]ListenerUpdate{v3LDSTarget: {}}, + wantUpdate: map[string]ListenerUpdate{v3LDSTarget: {Err: cmpopts.AnyError}}, wantMD: errMD, wantErr: true, }, @@ -226,7 +227,7 @@ func (s) TestUnmarshalListener_ClientSide(t *testing.T) { ApiListener: testutils.MarshalAny(&v2xdspb.Listener{}), }, })}, - wantUpdate: map[string]ListenerUpdate{v3LDSTarget: {}}, + wantUpdate: map[string]ListenerUpdate{v3LDSTarget: {Err: cmpopts.AnyError}}, wantMD: errMD, wantErr: true, }, @@ -242,7 +243,7 @@ func (s) TestUnmarshalListener_ClientSide(t *testing.T) { }), }, })}, - wantUpdate: map[string]ListenerUpdate{v3LDSTarget: {}}, + wantUpdate: map[string]ListenerUpdate{v3LDSTarget: {Err: cmpopts.AnyError}}, wantMD: errMD, wantErr: true, }, @@ -256,7 +257,7 @@ func (s) TestUnmarshalListener_ClientSide(t *testing.T) { }), }, })}, - wantUpdate: map[string]ListenerUpdate{v3LDSTarget: {}}, + wantUpdate: map[string]ListenerUpdate{v3LDSTarget: {Err: cmpopts.AnyError}}, wantMD: errMD, wantErr: true, }, @@ -279,7 +280,7 @@ func (s) TestUnmarshalListener_ClientSide(t *testing.T) { }), }, })}, - wantUpdate: map[string]ListenerUpdate{v3LDSTarget: {}}, + wantUpdate: map[string]ListenerUpdate{v3LDSTarget: {Err: cmpopts.AnyError}}, wantMD: errMD, wantErr: true, }, @@ -322,7 +323,7 @@ func (s) TestUnmarshalListener_ClientSide(t *testing.T) { }), }, })}, - wantUpdate: map[string]ListenerUpdate{v3LDSTarget: {}}, + wantUpdate: map[string]ListenerUpdate{v3LDSTarget: {Err: cmpopts.AnyError}}, wantMD: errMD, wantErr: true, }, @@ -395,7 +396,7 @@ func (s) TestUnmarshalListener_ClientSide(t *testing.T) { { name: "v3 with two filters with same name", resources: []*anypb.Any{v3LisWithFilters(customFilter, customFilter)}, - wantUpdate: map[string]ListenerUpdate{v3LDSTarget: {}}, + wantUpdate: map[string]ListenerUpdate{v3LDSTarget: {Err: cmpopts.AnyError}}, wantMD: errMD, wantErr: true, }, @@ -427,7 +428,7 @@ func (s) TestUnmarshalListener_ClientSide(t *testing.T) { { name: "v3 with server-only filter", resources: []*anypb.Any{v3LisWithFilters(serverOnlyCustomFilter)}, - wantUpdate: map[string]ListenerUpdate{v3LDSTarget: {}}, + wantUpdate: map[string]ListenerUpdate{v3LDSTarget: {Err: cmpopts.AnyError}}, wantMD: errMD, wantErr: true, }, @@ -471,21 +472,21 @@ func (s) TestUnmarshalListener_ClientSide(t *testing.T) { { name: "v3 with err filter", resources: []*anypb.Any{v3LisWithFilters(errFilter)}, - wantUpdate: map[string]ListenerUpdate{v3LDSTarget: {}}, + wantUpdate: map[string]ListenerUpdate{v3LDSTarget: {Err: cmpopts.AnyError}}, wantMD: errMD, wantErr: true, }, { name: "v3 with optional err filter", resources: []*anypb.Any{v3LisWithFilters(errOptionalFilter)}, - wantUpdate: map[string]ListenerUpdate{v3LDSTarget: {}}, + wantUpdate: map[string]ListenerUpdate{v3LDSTarget: {Err: cmpopts.AnyError}}, wantMD: errMD, wantErr: true, }, { name: "v3 with unknown filter", resources: []*anypb.Any{v3LisWithFilters(unknownFilter)}, - wantUpdate: map[string]ListenerUpdate{v3LDSTarget: {}}, + wantUpdate: map[string]ListenerUpdate{v3LDSTarget: {Err: cmpopts.AnyError}}, wantMD: errMD, wantErr: true, }, @@ -576,7 +577,7 @@ func (s) TestUnmarshalListener_ClientSide(t *testing.T) { wantUpdate: map[string]ListenerUpdate{ v2LDSTarget: {RouteConfigName: v2RouteConfigName, Raw: v2Lis}, v3LDSTarget: {RouteConfigName: v3RouteConfigName, MaxStreamDuration: time.Second, Raw: v3LisWithFilters(), HTTPFilters: routerFilterList}, - "bad": {}, + "bad": {Err: cmpopts.AnyError}, }, wantMD: errMD, wantErr: true, @@ -757,7 +758,7 @@ func (s) TestUnmarshalListener_ServerSide(t *testing.T) { Version: testVersion, ErrState: &UpdateErrorMetadata{ Version: testVersion, - Err: errPlaceHolder, + Err: cmpopts.AnyError, }, } ) @@ -777,7 +778,7 @@ func (s) TestUnmarshalListener_ServerSide(t *testing.T) { {Name: "listener-filter-1"}, }, })}, - wantUpdate: map[string]ListenerUpdate{v3LDSTarget: {}}, + wantUpdate: map[string]ListenerUpdate{v3LDSTarget: {Err: cmpopts.AnyError}}, wantMD: errMD, wantErr: "unsupported field 'listener_filters'", }, @@ -787,14 +788,14 @@ func (s) TestUnmarshalListener_ServerSide(t *testing.T) { Name: v3LDSTarget, UseOriginalDst: &wrapperspb.BoolValue{Value: true}, })}, - wantUpdate: map[string]ListenerUpdate{v3LDSTarget: {}}, + wantUpdate: map[string]ListenerUpdate{v3LDSTarget: {Err: cmpopts.AnyError}}, wantMD: errMD, wantErr: "unsupported field 'use_original_dst'", }, { name: "no address field", resources: []*anypb.Any{testutils.MarshalAny(&v3listenerpb.Listener{Name: v3LDSTarget})}, - wantUpdate: map[string]ListenerUpdate{v3LDSTarget: {}}, + wantUpdate: map[string]ListenerUpdate{v3LDSTarget: {Err: cmpopts.AnyError}}, wantMD: errMD, wantErr: "no address field in LDS response", }, @@ -804,7 +805,7 @@ func (s) TestUnmarshalListener_ServerSide(t *testing.T) { Name: v3LDSTarget, Address: &v3corepb.Address{}, })}, - wantUpdate: map[string]ListenerUpdate{v3LDSTarget: {}}, + wantUpdate: map[string]ListenerUpdate{v3LDSTarget: {Err: cmpopts.AnyError}}, wantMD: errMD, wantErr: "no socket_address field in LDS response", }, @@ -820,7 +821,7 @@ func (s) TestUnmarshalListener_ServerSide(t *testing.T) { }, }, })}, - wantUpdate: map[string]ListenerUpdate{v3LDSTarget: {}}, + wantUpdate: map[string]ListenerUpdate{v3LDSTarget: {Err: cmpopts.AnyError}}, wantMD: errMD, wantErr: "no supported filter chains and no default filter chain", }, @@ -835,7 +836,7 @@ func (s) TestUnmarshalListener_ServerSide(t *testing.T) { }, }, })}, - wantUpdate: map[string]ListenerUpdate{v3LDSTarget: {}}, + wantUpdate: map[string]ListenerUpdate{v3LDSTarget: {Err: cmpopts.AnyError}}, wantMD: errMD, wantErr: "missing HttpConnectionManager filter", }, @@ -857,7 +858,7 @@ func (s) TestUnmarshalListener_ServerSide(t *testing.T) { }, }, })}, - wantUpdate: map[string]ListenerUpdate{v3LDSTarget: {}}, + wantUpdate: map[string]ListenerUpdate{v3LDSTarget: {Err: cmpopts.AnyError}}, wantMD: errMD, wantErr: "missing name field in filter", }, @@ -896,7 +897,7 @@ func (s) TestUnmarshalListener_ServerSide(t *testing.T) { }, }, })}, - wantUpdate: map[string]ListenerUpdate{v3LDSTarget: {}}, + wantUpdate: map[string]ListenerUpdate{v3LDSTarget: {Err: cmpopts.AnyError}}, wantMD: errMD, wantErr: "duplicate filter name", }, @@ -923,7 +924,7 @@ func (s) TestUnmarshalListener_ServerSide(t *testing.T) { }, }, })}, - wantUpdate: map[string]ListenerUpdate{v3LDSTarget: {}}, + wantUpdate: map[string]ListenerUpdate{v3LDSTarget: {Err: cmpopts.AnyError}}, wantMD: errMD, wantErr: "http filters list is empty", }, @@ -951,7 +952,7 @@ func (s) TestUnmarshalListener_ServerSide(t *testing.T) { }, }, })}, - wantUpdate: map[string]ListenerUpdate{v3LDSTarget: {}}, + wantUpdate: map[string]ListenerUpdate{v3LDSTarget: {Err: cmpopts.AnyError}}, wantMD: errMD, wantErr: "is a terminal filter but it is not last in the filter chain", }, @@ -979,7 +980,7 @@ func (s) TestUnmarshalListener_ServerSide(t *testing.T) { }, }, })}, - wantUpdate: map[string]ListenerUpdate{v3LDSTarget: {}}, + wantUpdate: map[string]ListenerUpdate{v3LDSTarget: {Err: cmpopts.AnyError}}, wantMD: errMD, wantErr: "is not a terminal filter", }, @@ -1000,7 +1001,7 @@ func (s) TestUnmarshalListener_ServerSide(t *testing.T) { }, }, })}, - wantUpdate: map[string]ListenerUpdate{v3LDSTarget: {}}, + wantUpdate: map[string]ListenerUpdate{v3LDSTarget: {Err: cmpopts.AnyError}}, wantMD: errMD, wantErr: "unsupported config_type", }, @@ -1024,7 +1025,7 @@ func (s) TestUnmarshalListener_ServerSide(t *testing.T) { }, }, })}, - wantUpdate: map[string]ListenerUpdate{v3LDSTarget: {}}, + wantUpdate: map[string]ListenerUpdate{v3LDSTarget: {Err: cmpopts.AnyError}}, wantMD: errMD, wantErr: "multiple filter chains with overlapping matching rules are defined", }, @@ -1047,7 +1048,7 @@ func (s) TestUnmarshalListener_ServerSide(t *testing.T) { }, }, })}, - wantUpdate: map[string]ListenerUpdate{v3LDSTarget: {}}, + wantUpdate: map[string]ListenerUpdate{v3LDSTarget: {Err: cmpopts.AnyError}}, wantMD: errMD, wantErr: "unsupported network filter", }, @@ -1073,7 +1074,7 @@ func (s) TestUnmarshalListener_ServerSide(t *testing.T) { }, }, })}, - wantUpdate: map[string]ListenerUpdate{v3LDSTarget: {}}, + wantUpdate: map[string]ListenerUpdate{v3LDSTarget: {Err: cmpopts.AnyError}}, wantMD: errMD, wantErr: "failed unmarshaling of network filter", }, @@ -1092,7 +1093,7 @@ func (s) TestUnmarshalListener_ServerSide(t *testing.T) { }, }, })}, - wantUpdate: map[string]ListenerUpdate{v3LDSTarget: {}}, + wantUpdate: map[string]ListenerUpdate{v3LDSTarget: {Err: cmpopts.AnyError}}, wantMD: errMD, wantErr: "transport_socket field has unexpected name", }, @@ -1114,7 +1115,7 @@ func (s) TestUnmarshalListener_ServerSide(t *testing.T) { }, }, })}, - wantUpdate: map[string]ListenerUpdate{v3LDSTarget: {}}, + wantUpdate: map[string]ListenerUpdate{v3LDSTarget: {Err: cmpopts.AnyError}}, wantMD: errMD, wantErr: "transport_socket field has unexpected typeURL", }, @@ -1139,7 +1140,7 @@ func (s) TestUnmarshalListener_ServerSide(t *testing.T) { }, }, })}, - wantUpdate: map[string]ListenerUpdate{v3LDSTarget: {}}, + wantUpdate: map[string]ListenerUpdate{v3LDSTarget: {Err: cmpopts.AnyError}}, wantMD: errMD, wantErr: "failed to unmarshal DownstreamTlsContext in LDS response", }, @@ -1161,7 +1162,7 @@ func (s) TestUnmarshalListener_ServerSide(t *testing.T) { }, }, })}, - wantUpdate: map[string]ListenerUpdate{v3LDSTarget: {}}, + wantUpdate: map[string]ListenerUpdate{v3LDSTarget: {Err: cmpopts.AnyError}}, wantMD: errMD, wantErr: "DownstreamTlsContext in LDS response does not contain a CommonTlsContext", }, @@ -1191,7 +1192,7 @@ func (s) TestUnmarshalListener_ServerSide(t *testing.T) { }, }, })}, - wantUpdate: map[string]ListenerUpdate{v3LDSTarget: {}}, + wantUpdate: map[string]ListenerUpdate{v3LDSTarget: {Err: cmpopts.AnyError}}, wantMD: errMD, wantErr: "validation context contains unexpected type", }, @@ -1258,7 +1259,7 @@ func (s) TestUnmarshalListener_ServerSide(t *testing.T) { }, }, })}, - wantUpdate: map[string]ListenerUpdate{v3LDSTarget: {}}, + wantUpdate: map[string]ListenerUpdate{v3LDSTarget: {Err: cmpopts.AnyError}}, wantMD: errMD, wantErr: "security configuration on the server-side does not contain root certificate provider instance name, but require_client_cert field is set", }, @@ -1282,7 +1283,7 @@ func (s) TestUnmarshalListener_ServerSide(t *testing.T) { }, }, })}, - wantUpdate: map[string]ListenerUpdate{v3LDSTarget: {}}, + wantUpdate: map[string]ListenerUpdate{v3LDSTarget: {Err: cmpopts.AnyError}}, wantMD: errMD, wantErr: "security configuration on the server-side does not contain identity certificate provider instance name", }, diff --git a/xds/internal/xdsclient/rds_test.go b/xds/internal/xdsclient/rds_test.go index 138e3a0bd2b6..a570e43ebd2c 100644 --- a/xds/internal/xdsclient/rds_test.go +++ b/xds/internal/xdsclient/rds_test.go @@ -680,7 +680,7 @@ func (s) TestUnmarshalRouteConfig(t *testing.T) { Version: testVersion, ErrState: &UpdateErrorMetadata{ Version: testVersion, - Err: errPlaceHolder, + Err: cmpopts.AnyError, }, }, wantErr: true, @@ -698,7 +698,7 @@ func (s) TestUnmarshalRouteConfig(t *testing.T) { Version: testVersion, ErrState: &UpdateErrorMetadata{ Version: testVersion, - Err: errPlaceHolder, + Err: cmpopts.AnyError, }, }, wantErr: true, @@ -857,14 +857,14 @@ func (s) TestUnmarshalRouteConfig(t *testing.T) { }, Raw: v2RouteConfig, }, - "bad": {}, + "bad": {Err: cmpopts.AnyError}, }, wantMD: UpdateMetadata{ Status: ServiceStatusNACKed, Version: testVersion, ErrState: &UpdateErrorMetadata{ Version: testVersion, - Err: errPlaceHolder, + Err: cmpopts.AnyError, }, }, wantErr: true, diff --git a/xds/internal/xdsclient/v2/cds_test.go b/xds/internal/xdsclient/v2/cds_test.go index e15e13074681..b1683db17ad4 100644 --- a/xds/internal/xdsclient/v2/cds_test.go +++ b/xds/internal/xdsclient/v2/cds_test.go @@ -25,6 +25,7 @@ import ( xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" anypb "github.com/golang/protobuf/ptypes/any" + "github.com/google/go-cmp/cmp/cmpopts" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/xds/internal/version" "google.golang.org/grpc/xds/internal/xdsclient" @@ -113,7 +114,7 @@ func (s) TestCDSHandleResponse(t *testing.T) { wantUpdateMD: xdsclient.UpdateMetadata{ Status: xdsclient.ServiceStatusNACKed, ErrState: &xdsclient.UpdateErrorMetadata{ - Err: errPlaceHolder, + Err: cmpopts.AnyError, }, }, wantUpdateErr: false, @@ -127,7 +128,7 @@ func (s) TestCDSHandleResponse(t *testing.T) { wantUpdateMD: xdsclient.UpdateMetadata{ Status: xdsclient.ServiceStatusNACKed, ErrState: &xdsclient.UpdateErrorMetadata{ - Err: errPlaceHolder, + Err: cmpopts.AnyError, }, }, wantUpdateErr: false, diff --git a/xds/internal/xdsclient/v2/client_test.go b/xds/internal/xdsclient/v2/client_test.go index 2a45a52ca1d9..b9e5c4d6a7ba 100644 --- a/xds/internal/xdsclient/v2/client_test.go +++ b/xds/internal/xdsclient/v2/client_test.go @@ -21,7 +21,6 @@ package v2 import ( "context" "errors" - "fmt" "testing" "time" @@ -285,9 +284,6 @@ var ( }, TypeUrl: version.V2RouteConfigURL, } - // An place holder error. When comparing UpdateErrorMetadata, we only check - // if error is nil, and don't compare error content. - errPlaceHolder = fmt.Errorf("err place holder") ) type watchHandleTestcase struct { @@ -437,7 +433,7 @@ func testWatchHandle(t *testing.T, test *watchHandleTestcase) { cmpopts.EquateEmpty(), protocmp.Transform(), cmpopts.IgnoreFields(xdsclient.UpdateMetadata{}, "Timestamp"), cmpopts.IgnoreFields(xdsclient.UpdateErrorMetadata{}, "Timestamp"), - cmp.Comparer(func(x, y error) bool { return (x == nil) == (y == nil) }), + cmp.FilterValues(func(x, y error) bool { return true }, cmpopts.EquateErrors()), } uErr, err := gotUpdateCh.Receive(ctx) if err == context.DeadlineExceeded { diff --git a/xds/internal/xdsclient/v2/eds_test.go b/xds/internal/xdsclient/v2/eds_test.go index 5062dff9c07c..5d26d0a75b8c 100644 --- a/xds/internal/xdsclient/v2/eds_test.go +++ b/xds/internal/xdsclient/v2/eds_test.go @@ -24,6 +24,7 @@ import ( v2xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" anypb "github.com/golang/protobuf/ptypes/any" + "github.com/google/go-cmp/cmp/cmpopts" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/xds/internal" xtestutils "google.golang.org/grpc/xds/internal/testutils" @@ -88,7 +89,7 @@ func (s) TestEDSHandleResponse(t *testing.T) { wantUpdateMD: xdsclient.UpdateMetadata{ Status: xdsclient.ServiceStatusNACKed, ErrState: &xdsclient.UpdateErrorMetadata{ - Err: errPlaceHolder, + Err: cmpopts.AnyError, }, }, wantUpdateErr: false, @@ -102,7 +103,7 @@ func (s) TestEDSHandleResponse(t *testing.T) { wantUpdateMD: xdsclient.UpdateMetadata{ Status: xdsclient.ServiceStatusNACKed, ErrState: &xdsclient.UpdateErrorMetadata{ - Err: errPlaceHolder, + Err: cmpopts.AnyError, }, }, wantUpdateErr: false, diff --git a/xds/internal/xdsclient/v2/lds_test.go b/xds/internal/xdsclient/v2/lds_test.go index db26681fb3d2..3d2199a150d3 100644 --- a/xds/internal/xdsclient/v2/lds_test.go +++ b/xds/internal/xdsclient/v2/lds_test.go @@ -23,6 +23,7 @@ import ( "time" v2xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" + "github.com/google/go-cmp/cmp/cmpopts" "google.golang.org/grpc/xds/internal/xdsclient" ) @@ -48,7 +49,7 @@ func (s) TestLDSHandleResponse(t *testing.T) { wantUpdateMD: xdsclient.UpdateMetadata{ Status: xdsclient.ServiceStatusNACKed, ErrState: &xdsclient.UpdateErrorMetadata{ - Err: errPlaceHolder, + Err: cmpopts.AnyError, }, }, wantUpdateErr: false, @@ -62,7 +63,7 @@ func (s) TestLDSHandleResponse(t *testing.T) { wantUpdateMD: xdsclient.UpdateMetadata{ Status: xdsclient.ServiceStatusNACKed, ErrState: &xdsclient.UpdateErrorMetadata{ - Err: errPlaceHolder, + Err: cmpopts.AnyError, }, }, wantUpdateErr: false, @@ -75,12 +76,12 @@ func (s) TestLDSHandleResponse(t *testing.T) { ldsResponse: noAPIListenerLDSResponse, wantErr: true, wantUpdate: map[string]xdsclient.ListenerUpdate{ - goodLDSTarget1: {}, + goodLDSTarget1: {Err: cmpopts.AnyError}, }, wantUpdateMD: xdsclient.UpdateMetadata{ Status: xdsclient.ServiceStatusNACKed, ErrState: &xdsclient.UpdateErrorMetadata{ - Err: errPlaceHolder, + Err: cmpopts.AnyError, }, }, wantUpdateErr: false, @@ -122,12 +123,12 @@ func (s) TestLDSHandleResponse(t *testing.T) { wantErr: true, wantUpdate: map[string]xdsclient.ListenerUpdate{ goodLDSTarget1: {RouteConfigName: goodRouteName1, Raw: marshaledListener1}, - goodLDSTarget2: {}, + goodLDSTarget2: {Err: cmpopts.AnyError}, }, wantUpdateMD: xdsclient.UpdateMetadata{ Status: xdsclient.ServiceStatusNACKed, ErrState: &xdsclient.UpdateErrorMetadata{ - Err: errPlaceHolder, + Err: cmpopts.AnyError, }, }, wantUpdateErr: false, diff --git a/xds/internal/xdsclient/v2/rds_test.go b/xds/internal/xdsclient/v2/rds_test.go index efc010224778..2b15958d6f2a 100644 --- a/xds/internal/xdsclient/v2/rds_test.go +++ b/xds/internal/xdsclient/v2/rds_test.go @@ -24,6 +24,7 @@ import ( "time" xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" + "github.com/google/go-cmp/cmp/cmpopts" "google.golang.org/grpc/xds/internal/testutils/fakeserver" "google.golang.org/grpc/xds/internal/xdsclient" @@ -62,7 +63,7 @@ func (s) TestRDSHandleResponseWithRouting(t *testing.T) { wantUpdateMD: xdsclient.UpdateMetadata{ Status: xdsclient.ServiceStatusNACKed, ErrState: &xdsclient.UpdateErrorMetadata{ - Err: errPlaceHolder, + Err: cmpopts.AnyError, }, }, wantUpdateErr: false, @@ -76,7 +77,7 @@ func (s) TestRDSHandleResponseWithRouting(t *testing.T) { wantUpdateMD: xdsclient.UpdateMetadata{ Status: xdsclient.ServiceStatusNACKed, ErrState: &xdsclient.UpdateErrorMetadata{ - Err: errPlaceHolder, + Err: cmpopts.AnyError, }, }, wantUpdateErr: false, diff --git a/xds/internal/xdsclient/watchers_cluster_test.go b/xds/internal/xdsclient/watchers_cluster_test.go index 939b7921b0be..805f479c9f9c 100644 --- a/xds/internal/xdsclient/watchers_cluster_test.go +++ b/xds/internal/xdsclient/watchers_cluster_test.go @@ -473,8 +473,70 @@ func (s) TestClusterWatchNACKError(t *testing.T) { } wantError := fmt.Errorf("testing error") - client.NewClusters(map[string]ClusterUpdate{testCDSName: {}}, UpdateMetadata{ErrState: &UpdateErrorMetadata{Err: wantError}}) - if err := verifyClusterUpdate(ctx, clusterUpdateCh, ClusterUpdate{}, nil); err != nil { + client.NewClusters(map[string]ClusterUpdate{testCDSName: { + Err: wantError, + }}, UpdateMetadata{ErrState: &UpdateErrorMetadata{Err: wantError}}) + if err := verifyClusterUpdate(ctx, clusterUpdateCh, ClusterUpdate{}, wantError); err != nil { + t.Fatal(err) + } +} + +// TestClusterWatchPartialValid covers the case that a response contains both +// valid and invalid resources. This response will be NACK'ed by the xdsclient. +// But the watchers with valid resources should receive the update, those with +// invalida resources should receive an error. +func (s) TestClusterWatchPartialValid(t *testing.T) { + apiClientCh, cleanup := overrideNewAPIClient() + defer cleanup() + + client, err := newWithConfig(clientOpts(testXDSServer, false)) + if err != nil { + t.Fatalf("failed to create client: %v", err) + } + defer client.Close() + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + c, err := apiClientCh.Receive(ctx) + if err != nil { + t.Fatalf("timeout when waiting for API client to be created: %v", err) + } + apiClient := c.(*testAPIClient) + + const badResourceName = "bad-resource" + updateChs := make(map[string]*testutils.Channel) + + for _, name := range []string{testCDSName, badResourceName} { + clusterUpdateCh := testutils.NewChannel() + cancelWatch := client.WatchCluster(name, func(update ClusterUpdate, err error) { + clusterUpdateCh.Send(clusterUpdateErr{u: update, err: err}) + }) + defer func() { + cancelWatch() + if _, err := apiClient.removeWatches[ClusterResource].Receive(ctx); err != nil { + t.Fatalf("want watch to be canceled, got err: %v", err) + } + }() + if _, err := apiClient.addWatches[ClusterResource].Receive(ctx); err != nil { + t.Fatalf("want new watch to start, got error %v", err) + } + updateChs[name] = clusterUpdateCh + } + + wantError := fmt.Errorf("testing error") + wantError2 := fmt.Errorf("individual error") + client.NewClusters(map[string]ClusterUpdate{ + testCDSName: {ClusterName: testEDSName}, + badResourceName: {Err: wantError2}, + }, UpdateMetadata{ErrState: &UpdateErrorMetadata{Err: wantError}}) + + // The valid resource should be sent to the watcher. + if err := verifyClusterUpdate(ctx, updateChs[testCDSName], ClusterUpdate{ClusterName: testEDSName}, nil); err != nil { + t.Fatal(err) + } + + // The failed watcher should receive an error. + if err := verifyClusterUpdate(ctx, updateChs[badResourceName], ClusterUpdate{}, wantError2); err != nil { t.Fatal(err) } } diff --git a/xds/internal/xdsclient/watchers_endpoints_test.go b/xds/internal/xdsclient/watchers_endpoints_test.go index 0e46886cc4d3..d0f680a7ec1f 100644 --- a/xds/internal/xdsclient/watchers_endpoints_test.go +++ b/xds/internal/xdsclient/watchers_endpoints_test.go @@ -362,8 +362,68 @@ func (s) TestEndpointsWatchNACKError(t *testing.T) { } wantError := fmt.Errorf("testing error") - client.NewEndpoints(map[string]EndpointsUpdate{testCDSName: {}}, UpdateMetadata{ErrState: &UpdateErrorMetadata{Err: wantError}}) + client.NewEndpoints(map[string]EndpointsUpdate{testCDSName: {Err: wantError}}, UpdateMetadata{ErrState: &UpdateErrorMetadata{Err: wantError}}) if err := verifyEndpointsUpdate(ctx, endpointsUpdateCh, EndpointsUpdate{}, wantError); err != nil { t.Fatal(err) } } + +// TestEndpointsWatchPartialValid covers the case that a response contains both +// valid and invalid resources. This response will be NACK'ed by the xdsclient. +// But the watchers with valid resources should receive the update, those with +// invalida resources should receive an error. +func (s) TestEndpointsWatchPartialValid(t *testing.T) { + apiClientCh, cleanup := overrideNewAPIClient() + defer cleanup() + + client, err := newWithConfig(clientOpts(testXDSServer, false)) + if err != nil { + t.Fatalf("failed to create client: %v", err) + } + defer client.Close() + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + c, err := apiClientCh.Receive(ctx) + if err != nil { + t.Fatalf("timeout when waiting for API client to be created: %v", err) + } + apiClient := c.(*testAPIClient) + + const badResourceName = "bad-resource" + updateChs := make(map[string]*testutils.Channel) + + for _, name := range []string{testCDSName, badResourceName} { + endpointsUpdateCh := testutils.NewChannel() + cancelWatch := client.WatchEndpoints(name, func(update EndpointsUpdate, err error) { + endpointsUpdateCh.Send(endpointsUpdateErr{u: update, err: err}) + }) + defer func() { + cancelWatch() + if _, err := apiClient.removeWatches[EndpointsResource].Receive(ctx); err != nil { + t.Fatalf("want watch to be canceled, got err: %v", err) + } + }() + if _, err := apiClient.addWatches[EndpointsResource].Receive(ctx); err != nil { + t.Fatalf("want new watch to start, got error %v", err) + } + updateChs[name] = endpointsUpdateCh + } + + wantError := fmt.Errorf("testing error") + wantError2 := fmt.Errorf("individual error") + client.NewEndpoints(map[string]EndpointsUpdate{ + testCDSName: {Localities: []Locality{testLocalities[0]}}, + badResourceName: {Err: wantError2}, + }, UpdateMetadata{ErrState: &UpdateErrorMetadata{Err: wantError}}) + + // The valid resource should be sent to the watcher. + if err := verifyEndpointsUpdate(ctx, updateChs[testCDSName], EndpointsUpdate{Localities: []Locality{testLocalities[0]}}, nil); err != nil { + t.Fatal(err) + } + + // The failed watcher should receive an error. + if err := verifyEndpointsUpdate(ctx, updateChs[badResourceName], EndpointsUpdate{}, wantError2); err != nil { + t.Fatal(err) + } +} diff --git a/xds/internal/xdsclient/watchers_listener_test.go b/xds/internal/xdsclient/watchers_listener_test.go index da0255e37a8e..9b555368f350 100644 --- a/xds/internal/xdsclient/watchers_listener_test.go +++ b/xds/internal/xdsclient/watchers_listener_test.go @@ -388,8 +388,68 @@ func (s) TestListenerWatchNACKError(t *testing.T) { } wantError := fmt.Errorf("testing error") - client.NewListeners(map[string]ListenerUpdate{testLDSName: {}}, UpdateMetadata{ErrState: &UpdateErrorMetadata{Err: wantError}}) + client.NewListeners(map[string]ListenerUpdate{testLDSName: {Err: wantError}}, UpdateMetadata{ErrState: &UpdateErrorMetadata{Err: wantError}}) if err := verifyListenerUpdate(ctx, ldsUpdateCh, ListenerUpdate{}, wantError); err != nil { t.Fatal(err) } } + +// TestListenerWatchPartialValid covers the case that a response contains both +// valid and invalid resources. This response will be NACK'ed by the xdsclient. +// But the watchers with valid resources should receive the update, those with +// invalida resources should receive an error. +func (s) TestListenerWatchPartialValid(t *testing.T) { + apiClientCh, cleanup := overrideNewAPIClient() + defer cleanup() + + client, err := newWithConfig(clientOpts(testXDSServer, false)) + if err != nil { + t.Fatalf("failed to create client: %v", err) + } + defer client.Close() + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + c, err := apiClientCh.Receive(ctx) + if err != nil { + t.Fatalf("timeout when waiting for API client to be created: %v", err) + } + apiClient := c.(*testAPIClient) + + const badResourceName = "bad-resource" + updateChs := make(map[string]*testutils.Channel) + + for _, name := range []string{testLDSName, badResourceName} { + ldsUpdateCh := testutils.NewChannel() + cancelWatch := client.WatchListener(name, func(update ListenerUpdate, err error) { + ldsUpdateCh.Send(ldsUpdateErr{u: update, err: err}) + }) + defer func() { + cancelWatch() + if _, err := apiClient.removeWatches[ListenerResource].Receive(ctx); err != nil { + t.Fatalf("want watch to be canceled, got err: %v", err) + } + }() + if _, err := apiClient.addWatches[ListenerResource].Receive(ctx); err != nil { + t.Fatalf("want new watch to start, got error %v", err) + } + updateChs[name] = ldsUpdateCh + } + + wantError := fmt.Errorf("testing error") + wantError2 := fmt.Errorf("individual error") + client.NewListeners(map[string]ListenerUpdate{ + testLDSName: {RouteConfigName: testEDSName}, + badResourceName: {Err: wantError2}, + }, UpdateMetadata{ErrState: &UpdateErrorMetadata{Err: wantError}}) + + // The valid resource should be sent to the watcher. + if err := verifyListenerUpdate(ctx, updateChs[testLDSName], ListenerUpdate{RouteConfigName: testEDSName}, nil); err != nil { + t.Fatal(err) + } + + // The failed watcher should receive an error. + if err := verifyListenerUpdate(ctx, updateChs[badResourceName], ListenerUpdate{}, wantError2); err != nil { + t.Fatal(err) + } +} diff --git a/xds/internal/xdsclient/watchers_route_test.go b/xds/internal/xdsclient/watchers_route_test.go index e569192b510d..8065e48330f1 100644 --- a/xds/internal/xdsclient/watchers_route_test.go +++ b/xds/internal/xdsclient/watchers_route_test.go @@ -340,8 +340,74 @@ func (s) TestRouteWatchNACKError(t *testing.T) { } wantError := fmt.Errorf("testing error") - client.NewRouteConfigs(map[string]RouteConfigUpdate{testCDSName: {}}, UpdateMetadata{ErrState: &UpdateErrorMetadata{Err: wantError}}) + client.NewRouteConfigs(map[string]RouteConfigUpdate{testCDSName: {Err: wantError}}, UpdateMetadata{ErrState: &UpdateErrorMetadata{Err: wantError}}) if err := verifyRouteConfigUpdate(ctx, rdsUpdateCh, RouteConfigUpdate{}, wantError); err != nil { t.Fatal(err) } } + +// TestRouteWatchPartialValid covers the case that a response contains both +// valid and invalid resources. This response will be NACK'ed by the xdsclient. +// But the watchers with valid resources should receive the update, those with +// invalida resources should receive an error. +func (s) TestRouteWatchPartialValid(t *testing.T) { + apiClientCh, cleanup := overrideNewAPIClient() + defer cleanup() + + client, err := newWithConfig(clientOpts(testXDSServer, false)) + if err != nil { + t.Fatalf("failed to create client: %v", err) + } + defer client.Close() + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + c, err := apiClientCh.Receive(ctx) + if err != nil { + t.Fatalf("timeout when waiting for API client to be created: %v", err) + } + apiClient := c.(*testAPIClient) + + const badResourceName = "bad-resource" + updateChs := make(map[string]*testutils.Channel) + + for _, name := range []string{testRDSName, badResourceName} { + rdsUpdateCh := testutils.NewChannel() + cancelWatch := client.WatchRouteConfig(name, func(update RouteConfigUpdate, err error) { + rdsUpdateCh.Send(rdsUpdateErr{u: update, err: err}) + }) + defer func() { + cancelWatch() + if _, err := apiClient.removeWatches[RouteConfigResource].Receive(ctx); err != nil { + t.Fatalf("want watch to be canceled, got err: %v", err) + } + }() + if _, err := apiClient.addWatches[RouteConfigResource].Receive(ctx); err != nil { + t.Fatalf("want new watch to start, got error %v", err) + } + updateChs[name] = rdsUpdateCh + } + + wantError := fmt.Errorf("testing error") + wantError2 := fmt.Errorf("individual error") + client.NewRouteConfigs(map[string]RouteConfigUpdate{ + testRDSName: {VirtualHosts: []*VirtualHost{{ + Domains: []string{testLDSName}, + Routes: []*Route{{Prefix: newStringP(""), WeightedClusters: map[string]WeightedCluster{testCDSName: {Weight: 1}}}}, + }}}, + badResourceName: {Err: wantError2}, + }, UpdateMetadata{ErrState: &UpdateErrorMetadata{Err: wantError}}) + + // The valid resource should be sent to the watcher. + if err := verifyRouteConfigUpdate(ctx, updateChs[testRDSName], RouteConfigUpdate{VirtualHosts: []*VirtualHost{{ + Domains: []string{testLDSName}, + Routes: []*Route{{Prefix: newStringP(""), WeightedClusters: map[string]WeightedCluster{testCDSName: {Weight: 1}}}}, + }}}, nil); err != nil { + t.Fatal(err) + } + + // The failed watcher should receive an error. + if err := verifyRouteConfigUpdate(ctx, updateChs[badResourceName], RouteConfigUpdate{}, wantError2); err != nil { + t.Fatal(err) + } +} diff --git a/xds/internal/xdsclient/xds.go b/xds/internal/xdsclient/xds.go index 3d9148eebcd4..71985daa6174 100644 --- a/xds/internal/xdsclient/xds.go +++ b/xds/internal/xdsclient/xds.go @@ -992,6 +992,10 @@ func parseEDSRespProto(m *v3endpointpb.ClusterLoadAssignment) (EndpointsUpdate, // processAllResources unmarshals and validates the resources, populates the // provided ret (a map), and returns metadata and error. // +// After this function, the ret map will be populated with both valid and +// invalid updates. Invalid resources will have an entry with the key as the +// resource name, value as an empty update. +// // The type of the resource is determined by the type of ret. E.g. // map[string]ListenerUpdate means this is for LDS. func processAllResources(version string, resources []*anypb.Any, logger *grpclog.PrefixLogger, ret interface{}) (UpdateMetadata, error) { @@ -1018,7 +1022,7 @@ func processAllResources(version string, resources []*anypb.Any, logger *grpclog perResourceErrors[name] = err // Add place holder in the map so we know this resource name was in // the response. - ret2[name] = ListenerUpdate{} + ret2[name] = ListenerUpdate{Err: err} case map[string]RouteConfigUpdate: name, update, err := unmarshalRouteConfigResource(r, logger) if err == nil { @@ -1032,7 +1036,7 @@ func processAllResources(version string, resources []*anypb.Any, logger *grpclog perResourceErrors[name] = err // Add place holder in the map so we know this resource name was in // the response. - ret2[name] = RouteConfigUpdate{} + ret2[name] = RouteConfigUpdate{Err: err} case map[string]ClusterUpdate: name, update, err := unmarshalClusterResource(r, logger) if err == nil { @@ -1046,7 +1050,7 @@ func processAllResources(version string, resources []*anypb.Any, logger *grpclog perResourceErrors[name] = err // Add place holder in the map so we know this resource name was in // the response. - ret2[name] = ClusterUpdate{} + ret2[name] = ClusterUpdate{Err: err} case map[string]EndpointsUpdate: name, update, err := unmarshalEndpointsResource(r, logger) if err == nil { @@ -1060,7 +1064,7 @@ func processAllResources(version string, resources []*anypb.Any, logger *grpclog perResourceErrors[name] = err // Add place holder in the map so we know this resource name was in // the response. - ret2[name] = EndpointsUpdate{} + ret2[name] = EndpointsUpdate{Err: err} } }