Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds: update xdsclient NACK to keep valid resources #4743

Merged
merged 4 commits into from Sep 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
49 changes: 29 additions & 20 deletions xds/csds/csds_test.go
Expand Up @@ -211,22 +211,28 @@ func TestCSDS(t *testing.T) {
}

const nackResourceIdx = 0
var (
nackListeners = append([]*v3listenerpb.Listener{}, listeners...)
nackRoutes = append([]*v3routepb.RouteConfiguration{}, routes...)
nackClusters = append([]*v3clusterpb.Cluster{}, clusters...)
nackEndpoints = append([]*v3endpointpb.ClusterLoadAssignment{}, endpoints...)
)
nackListeners[0] = &v3listenerpb.Listener{Name: ldsTargets[nackResourceIdx], ApiListener: &v3listenerpb.ApiListener{}} // 0 will be nacked. 1 will stay the same.
nackRoutes[0] = &v3routepb.RouteConfiguration{
Name: rdsTargets[nackResourceIdx], VirtualHosts: []*v3routepb.VirtualHost{{Routes: []*v3routepb.Route{{}}}},
}
nackClusters[0] = &v3clusterpb.Cluster{
Name: cdsTargets[nackResourceIdx], ClusterDiscoveryType: &v3clusterpb.Cluster_Type{Type: v3clusterpb.Cluster_STATIC},
}
nackEndpoints[0] = &v3endpointpb.ClusterLoadAssignment{
ClusterName: edsTargets[nackResourceIdx], Endpoints: []*v3endpointpb.LocalityLbEndpoints{{}},
}
if err := mgmServer.Update(ctx, e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{
{Name: ldsTargets[nackResourceIdx], ApiListener: &v3listenerpb.ApiListener{}}, // 0 will be nacked. 1 will stay the same.
},
Routes: []*v3routepb.RouteConfiguration{
{Name: rdsTargets[nackResourceIdx], VirtualHosts: []*v3routepb.VirtualHost{{
Routes: []*v3routepb.Route{{}},
}}},
},
Clusters: []*v3clusterpb.Cluster{
{Name: cdsTargets[nackResourceIdx], ClusterDiscoveryType: &v3clusterpb.Cluster_Type{Type: v3clusterpb.Cluster_STATIC}},
},
Endpoints: []*v3endpointpb.ClusterLoadAssignment{
{ClusterName: edsTargets[nackResourceIdx], Endpoints: []*v3endpointpb.LocalityLbEndpoints{{}}},
},
NodeID: nodeID,
Listeners: nackListeners,
Routes: nackRoutes,
Clusters: nackClusters,
Endpoints: nackEndpoints,
SkipValidation: true,
}); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -490,7 +496,6 @@ func checkForNACKed(nackResourceIdx int, stream v3statuspbgrpc.ClientStatusDisco
ackVersion = "1"
nackVersion = "2"
)

if err := stream.Send(&v3statuspb.ClientStatusRequest{Node: nil}); err != nil {
return fmt.Errorf("failed to send: %v", err)
}
Expand All @@ -514,13 +519,14 @@ func checkForNACKed(nackResourceIdx int, stream v3statuspbgrpc.ClientStatusDisco
configDump := &v3adminpb.ListenersConfigDump_DynamicListener{
Name: ldsTargets[i],
ActiveState: &v3adminpb.ListenersConfigDump_DynamicListenerState{
VersionInfo: ackVersion,
VersionInfo: nackVersion,
Listener: listenerAnys[i],
LastUpdated: nil,
},
ClientStatus: v3adminpb.ClientResourceStatus_ACKED,
}
if i == nackResourceIdx {
configDump.ActiveState.VersionInfo = ackVersion
configDump.ClientStatus = v3adminpb.ClientResourceStatus_NACKED
configDump.ErrorState = &v3adminpb.UpdateFailureState{
Details: "blahblah",
Expand All @@ -540,12 +546,13 @@ func checkForNACKed(nackResourceIdx int, stream v3statuspbgrpc.ClientStatusDisco
var wantRoutes []*v3adminpb.RoutesConfigDump_DynamicRouteConfig
for i := range rdsTargets {
configDump := &v3adminpb.RoutesConfigDump_DynamicRouteConfig{
VersionInfo: ackVersion,
VersionInfo: nackVersion,
RouteConfig: routeAnys[i],
LastUpdated: nil,
ClientStatus: v3adminpb.ClientResourceStatus_ACKED,
}
if i == nackResourceIdx {
configDump.VersionInfo = ackVersion
configDump.ClientStatus = v3adminpb.ClientResourceStatus_NACKED
configDump.ErrorState = &v3adminpb.UpdateFailureState{
Details: "blahblah",
Expand All @@ -564,12 +571,13 @@ func checkForNACKed(nackResourceIdx int, stream v3statuspbgrpc.ClientStatusDisco
var wantCluster []*v3adminpb.ClustersConfigDump_DynamicCluster
for i := range cdsTargets {
configDump := &v3adminpb.ClustersConfigDump_DynamicCluster{
VersionInfo: ackVersion,
VersionInfo: nackVersion,
Cluster: clusterAnys[i],
LastUpdated: nil,
ClientStatus: v3adminpb.ClientResourceStatus_ACKED,
}
if i == nackResourceIdx {
configDump.VersionInfo = ackVersion
configDump.ClientStatus = v3adminpb.ClientResourceStatus_NACKED
configDump.ErrorState = &v3adminpb.UpdateFailureState{
Details: "blahblah",
Expand All @@ -589,12 +597,13 @@ func checkForNACKed(nackResourceIdx int, stream v3statuspbgrpc.ClientStatusDisco
var wantEndpoint []*v3adminpb.EndpointsConfigDump_DynamicEndpointConfig
for i := range cdsTargets {
configDump := &v3adminpb.EndpointsConfigDump_DynamicEndpointConfig{
VersionInfo: ackVersion,
VersionInfo: nackVersion,
EndpointConfig: endpointAnys[i],
LastUpdated: nil,
ClientStatus: v3adminpb.ClientResourceStatus_ACKED,
}
if i == nackResourceIdx {
configDump.VersionInfo = ackVersion
configDump.ClientStatus = v3adminpb.ClientResourceStatus_NACKED
configDump.ErrorState = &v3adminpb.UpdateFailureState{
Details: "blahblah",
Expand Down
165 changes: 93 additions & 72 deletions xds/internal/xdsclient/callback.go
Expand Up @@ -76,41 +76,45 @@ func (c *clientImpl) callCallback(wiu *watcherInfoWithUpdate) {
//
// A response can contain multiple resources. They will be parsed and put in a
// map from resource name to the resource content.
func (c *clientImpl) NewListeners(updates map[string]ListenerUpdate, metadata UpdateMetadata) {
func (c *clientImpl) NewListeners(updates map[string]ListenerUpdateErrTuple, metadata UpdateMetadata) {
c.mu.Lock()
defer c.mu.Unlock()

c.ldsVersion = metadata.Version
if metadata.ErrState != nil {
// On NACK, update overall version to the NACKed resp.
c.ldsVersion = metadata.ErrState.Version
for name := range updates {
if s, ok := c.ldsWatchers[name]; ok {
}
for name, uErr := range updates {
if s, ok := c.ldsWatchers[name]; ok {
if uErr.Err != nil {
// On error, keep previous version for each resource. But update
// status and error.
mdCopy := c.ldsMD[name]
mdCopy.ErrState = metadata.ErrState
mdCopy.Status = metadata.Status
c.ldsMD[name] = mdCopy
for wi := range s {
wi.newError(metadata.ErrState.Err)
wi.newError(uErr.Err)
}
continue
}
}
return
}

// If no error received, the status is ACK.
c.ldsVersion = metadata.Version
for name, update := range updates {
if s, ok := c.ldsWatchers[name]; ok {
// Only send the update if this is not an error.
// If the resource is valid, send the update.
for wi := range s {
wi.newUpdate(update)
wi.newUpdate(uErr.Update)
}
// Sync cache.
c.logger.Debugf("LDS resource with name %v, value %+v added to cache", name, pretty.ToJSON(update))
c.ldsCache[name] = update
c.ldsMD[name] = metadata
c.logger.Debugf("LDS resource with name %v, value %+v added to cache", name, pretty.ToJSON(uErr))
c.ldsCache[name] = uErr.Update
// Set status to ACK, and clear error state. The metadata might be a
// NACK metadata because some other resources in the same response
// are invalid.
mdCopy := metadata
mdCopy.Status = ServiceStatusACKed
mdCopy.ErrState = nil
if metadata.ErrState != nil {
mdCopy.Version = metadata.ErrState.Version
}
c.ldsMD[name] = mdCopy
}
}
// Resources not in the new update were removed by the server, so delete
Expand All @@ -137,41 +141,46 @@ func (c *clientImpl) NewListeners(updates map[string]ListenerUpdate, metadata Up
//
// A response can contain multiple resources. They will be parsed and put in a
// map from resource name to the resource content.
func (c *clientImpl) NewRouteConfigs(updates map[string]RouteConfigUpdate, metadata UpdateMetadata) {
func (c *clientImpl) NewRouteConfigs(updates map[string]RouteConfigUpdateErrTuple, metadata UpdateMetadata) {
c.mu.Lock()
defer c.mu.Unlock()

// If no error received, the status is ACK.
c.rdsVersion = metadata.Version
if metadata.ErrState != nil {
// On NACK, update overall version to the NACKed resp.
c.rdsVersion = metadata.ErrState.Version
for name := range updates {
if s, ok := c.rdsWatchers[name]; ok {
}
for name, uErr := range updates {
if s, ok := c.rdsWatchers[name]; ok {
if uErr.Err != nil {
// On error, keep previous version for each resource. But update
// status and error.
mdCopy := c.rdsMD[name]
mdCopy.ErrState = metadata.ErrState
mdCopy.Status = metadata.Status
c.rdsMD[name] = mdCopy
for wi := range s {
wi.newError(metadata.ErrState.Err)
wi.newError(uErr.Err)
}
continue
}
}
return
}

// If no error received, the status is ACK.
c.rdsVersion = metadata.Version
for name, update := range updates {
if s, ok := c.rdsWatchers[name]; ok {
// Only send the update if this is not an error.
// If the resource is valid, send the update.
for wi := range s {
wi.newUpdate(update)
wi.newUpdate(uErr.Update)
}
// Sync cache.
c.logger.Debugf("RDS resource with name %v, value %+v added to cache", name, pretty.ToJSON(update))
c.rdsCache[name] = update
c.rdsMD[name] = metadata
c.logger.Debugf("RDS resource with name %v, value %+v added to cache", name, pretty.ToJSON(uErr))
c.rdsCache[name] = uErr.Update
// Set status to ACK, and clear error state. The metadata might be a
// NACK metadata because some other resources in the same response
// are invalid.
mdCopy := metadata
mdCopy.Status = ServiceStatusACKed
mdCopy.ErrState = nil
if metadata.ErrState != nil {
mdCopy.Version = metadata.ErrState.Version
}
c.rdsMD[name] = mdCopy
}
}
}
Expand All @@ -181,41 +190,47 @@ func (c *clientImpl) NewRouteConfigs(updates map[string]RouteConfigUpdate, metad
//
// A response can contain multiple resources. They will be parsed and put in a
// map from resource name to the resource content.
func (c *clientImpl) NewClusters(updates map[string]ClusterUpdate, metadata UpdateMetadata) {
func (c *clientImpl) NewClusters(updates map[string]ClusterUpdateErrTuple, metadata UpdateMetadata) {
c.mu.Lock()
defer c.mu.Unlock()

c.cdsVersion = metadata.Version
if metadata.ErrState != nil {
// On NACK, update overall version to the NACKed resp.
c.cdsVersion = metadata.ErrState.Version
for name := range updates {
if s, ok := c.cdsWatchers[name]; ok {
}
for name, uErr := range updates {
if s, ok := c.cdsWatchers[name]; ok {
if uErr.Err != nil {
// On error, keep previous version for each resource. But update
// status and error.
mdCopy := c.cdsMD[name]
mdCopy.ErrState = metadata.ErrState
mdCopy.Status = metadata.Status
c.cdsMD[name] = mdCopy
for wi := range s {
wi.newError(metadata.ErrState.Err)
// Send the watcher the individual error, instead of the
// overall combined error from the metadata.ErrState.
wi.newError(uErr.Err)
}
continue
}
}
return
}

// If no error received, the status is ACK.
c.cdsVersion = metadata.Version
for name, update := range updates {
if s, ok := c.cdsWatchers[name]; ok {
// Only send the update if this is not an error.
// If the resource is valid, send the update.
for wi := range s {
wi.newUpdate(update)
wi.newUpdate(uErr.Update)
}
// Sync cache.
c.logger.Debugf("CDS resource with name %v, value %+v added to cache", name, pretty.ToJSON(update))
c.cdsCache[name] = update
c.cdsMD[name] = metadata
c.logger.Debugf("CDS resource with name %v, value %+v added to cache", name, pretty.ToJSON(uErr))
c.cdsCache[name] = uErr.Update
// Set status to ACK, and clear error state. The metadata might be a
// NACK metadata because some other resources in the same response
// are invalid.
mdCopy := metadata
mdCopy.Status = ServiceStatusACKed
mdCopy.ErrState = nil
if metadata.ErrState != nil {
mdCopy.Version = metadata.ErrState.Version
}
c.cdsMD[name] = mdCopy
}
}
// Resources not in the new update were removed by the server, so delete
Expand All @@ -242,41 +257,47 @@ func (c *clientImpl) NewClusters(updates map[string]ClusterUpdate, metadata Upda
//
// A response can contain multiple resources. They will be parsed and put in a
// map from resource name to the resource content.
func (c *clientImpl) NewEndpoints(updates map[string]EndpointsUpdate, metadata UpdateMetadata) {
func (c *clientImpl) NewEndpoints(updates map[string]EndpointsUpdateErrTuple, metadata UpdateMetadata) {
c.mu.Lock()
defer c.mu.Unlock()

c.edsVersion = metadata.Version
if metadata.ErrState != nil {
// On NACK, update overall version to the NACKed resp.
c.edsVersion = metadata.ErrState.Version
for name := range updates {
if s, ok := c.edsWatchers[name]; ok {
}
for name, uErr := range updates {
if s, ok := c.edsWatchers[name]; ok {
if uErr.Err != nil {
// On error, keep previous version for each resource. But update
// status and error.
mdCopy := c.edsMD[name]
mdCopy.ErrState = metadata.ErrState
mdCopy.Status = metadata.Status
c.edsMD[name] = mdCopy
for wi := range s {
wi.newError(metadata.ErrState.Err)
// Send the watcher the individual error, instead of the
// overall combined error from the metadata.ErrState.
wi.newError(uErr.Err)
}
continue
}
}
return
}

// If no error received, the status is ACK.
c.edsVersion = metadata.Version
for name, update := range updates {
if s, ok := c.edsWatchers[name]; ok {
// Only send the update if this is not an error.
// If the resource is valid, send the update.
for wi := range s {
wi.newUpdate(update)
wi.newUpdate(uErr.Update)
}
// Sync cache.
c.logger.Debugf("EDS resource with name %v, value %+v added to cache", name, pretty.ToJSON(update))
c.edsCache[name] = update
c.edsMD[name] = metadata
c.logger.Debugf("EDS resource with name %v, value %+v added to cache", name, pretty.ToJSON(uErr))
c.edsCache[name] = uErr.Update
// Set status to ACK, and clear error state. The metadata might be a
// NACK metadata because some other resources in the same response
// are invalid.
mdCopy := metadata
mdCopy.Status = ServiceStatusACKed
mdCopy.ErrState = nil
if metadata.ErrState != nil {
mdCopy.Version = metadata.ErrState.Version
}
c.edsMD[name] = mdCopy
}
}
}
Expand Down