Skip to content

Commit

Permalink
xds: update xdsclient NACK to keep valid resources (#4743)
Browse files Browse the repository at this point in the history
  • Loading branch information
menghanl committed Sep 10, 2021
1 parent 43e8fd4 commit 16cf656
Show file tree
Hide file tree
Showing 19 changed files with 808 additions and 506 deletions.
49 changes: 29 additions & 20 deletions xds/csds/csds_test.go
Expand Up @@ -211,22 +211,28 @@ func TestCSDS(t *testing.T) {
}

const nackResourceIdx = 0
var (
nackListeners = append([]*v3listenerpb.Listener{}, listeners...)
nackRoutes = append([]*v3routepb.RouteConfiguration{}, routes...)
nackClusters = append([]*v3clusterpb.Cluster{}, clusters...)
nackEndpoints = append([]*v3endpointpb.ClusterLoadAssignment{}, endpoints...)
)
nackListeners[0] = &v3listenerpb.Listener{Name: ldsTargets[nackResourceIdx], ApiListener: &v3listenerpb.ApiListener{}} // 0 will be nacked. 1 will stay the same.
nackRoutes[0] = &v3routepb.RouteConfiguration{
Name: rdsTargets[nackResourceIdx], VirtualHosts: []*v3routepb.VirtualHost{{Routes: []*v3routepb.Route{{}}}},
}
nackClusters[0] = &v3clusterpb.Cluster{
Name: cdsTargets[nackResourceIdx], ClusterDiscoveryType: &v3clusterpb.Cluster_Type{Type: v3clusterpb.Cluster_STATIC},
}
nackEndpoints[0] = &v3endpointpb.ClusterLoadAssignment{
ClusterName: edsTargets[nackResourceIdx], Endpoints: []*v3endpointpb.LocalityLbEndpoints{{}},
}
if err := mgmServer.Update(ctx, e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{
{Name: ldsTargets[nackResourceIdx], ApiListener: &v3listenerpb.ApiListener{}}, // 0 will be nacked. 1 will stay the same.
},
Routes: []*v3routepb.RouteConfiguration{
{Name: rdsTargets[nackResourceIdx], VirtualHosts: []*v3routepb.VirtualHost{{
Routes: []*v3routepb.Route{{}},
}}},
},
Clusters: []*v3clusterpb.Cluster{
{Name: cdsTargets[nackResourceIdx], ClusterDiscoveryType: &v3clusterpb.Cluster_Type{Type: v3clusterpb.Cluster_STATIC}},
},
Endpoints: []*v3endpointpb.ClusterLoadAssignment{
{ClusterName: edsTargets[nackResourceIdx], Endpoints: []*v3endpointpb.LocalityLbEndpoints{{}}},
},
NodeID: nodeID,
Listeners: nackListeners,
Routes: nackRoutes,
Clusters: nackClusters,
Endpoints: nackEndpoints,
SkipValidation: true,
}); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -490,7 +496,6 @@ func checkForNACKed(nackResourceIdx int, stream v3statuspbgrpc.ClientStatusDisco
ackVersion = "1"
nackVersion = "2"
)

if err := stream.Send(&v3statuspb.ClientStatusRequest{Node: nil}); err != nil {
return fmt.Errorf("failed to send: %v", err)
}
Expand All @@ -514,13 +519,14 @@ func checkForNACKed(nackResourceIdx int, stream v3statuspbgrpc.ClientStatusDisco
configDump := &v3adminpb.ListenersConfigDump_DynamicListener{
Name: ldsTargets[i],
ActiveState: &v3adminpb.ListenersConfigDump_DynamicListenerState{
VersionInfo: ackVersion,
VersionInfo: nackVersion,
Listener: listenerAnys[i],
LastUpdated: nil,
},
ClientStatus: v3adminpb.ClientResourceStatus_ACKED,
}
if i == nackResourceIdx {
configDump.ActiveState.VersionInfo = ackVersion
configDump.ClientStatus = v3adminpb.ClientResourceStatus_NACKED
configDump.ErrorState = &v3adminpb.UpdateFailureState{
Details: "blahblah",
Expand All @@ -540,12 +546,13 @@ func checkForNACKed(nackResourceIdx int, stream v3statuspbgrpc.ClientStatusDisco
var wantRoutes []*v3adminpb.RoutesConfigDump_DynamicRouteConfig
for i := range rdsTargets {
configDump := &v3adminpb.RoutesConfigDump_DynamicRouteConfig{
VersionInfo: ackVersion,
VersionInfo: nackVersion,
RouteConfig: routeAnys[i],
LastUpdated: nil,
ClientStatus: v3adminpb.ClientResourceStatus_ACKED,
}
if i == nackResourceIdx {
configDump.VersionInfo = ackVersion
configDump.ClientStatus = v3adminpb.ClientResourceStatus_NACKED
configDump.ErrorState = &v3adminpb.UpdateFailureState{
Details: "blahblah",
Expand All @@ -564,12 +571,13 @@ func checkForNACKed(nackResourceIdx int, stream v3statuspbgrpc.ClientStatusDisco
var wantCluster []*v3adminpb.ClustersConfigDump_DynamicCluster
for i := range cdsTargets {
configDump := &v3adminpb.ClustersConfigDump_DynamicCluster{
VersionInfo: ackVersion,
VersionInfo: nackVersion,
Cluster: clusterAnys[i],
LastUpdated: nil,
ClientStatus: v3adminpb.ClientResourceStatus_ACKED,
}
if i == nackResourceIdx {
configDump.VersionInfo = ackVersion
configDump.ClientStatus = v3adminpb.ClientResourceStatus_NACKED
configDump.ErrorState = &v3adminpb.UpdateFailureState{
Details: "blahblah",
Expand All @@ -589,12 +597,13 @@ func checkForNACKed(nackResourceIdx int, stream v3statuspbgrpc.ClientStatusDisco
var wantEndpoint []*v3adminpb.EndpointsConfigDump_DynamicEndpointConfig
for i := range cdsTargets {
configDump := &v3adminpb.EndpointsConfigDump_DynamicEndpointConfig{
VersionInfo: ackVersion,
VersionInfo: nackVersion,
EndpointConfig: endpointAnys[i],
LastUpdated: nil,
ClientStatus: v3adminpb.ClientResourceStatus_ACKED,
}
if i == nackResourceIdx {
configDump.VersionInfo = ackVersion
configDump.ClientStatus = v3adminpb.ClientResourceStatus_NACKED
configDump.ErrorState = &v3adminpb.UpdateFailureState{
Details: "blahblah",
Expand Down
165 changes: 93 additions & 72 deletions xds/internal/xdsclient/callback.go
Expand Up @@ -76,41 +76,45 @@ func (c *clientImpl) callCallback(wiu *watcherInfoWithUpdate) {
//
// A response can contain multiple resources. They will be parsed and put in a
// map from resource name to the resource content.
func (c *clientImpl) NewListeners(updates map[string]ListenerUpdate, metadata UpdateMetadata) {
func (c *clientImpl) NewListeners(updates map[string]ListenerUpdateErrTuple, metadata UpdateMetadata) {
c.mu.Lock()
defer c.mu.Unlock()

c.ldsVersion = metadata.Version
if metadata.ErrState != nil {
// On NACK, update overall version to the NACKed resp.
c.ldsVersion = metadata.ErrState.Version
for name := range updates {
if s, ok := c.ldsWatchers[name]; ok {
}
for name, uErr := range updates {
if s, ok := c.ldsWatchers[name]; ok {
if uErr.Err != nil {
// On error, keep previous version for each resource. But update
// status and error.
mdCopy := c.ldsMD[name]
mdCopy.ErrState = metadata.ErrState
mdCopy.Status = metadata.Status
c.ldsMD[name] = mdCopy
for wi := range s {
wi.newError(metadata.ErrState.Err)
wi.newError(uErr.Err)
}
continue
}
}
return
}

// If no error received, the status is ACK.
c.ldsVersion = metadata.Version
for name, update := range updates {
if s, ok := c.ldsWatchers[name]; ok {
// Only send the update if this is not an error.
// If the resource is valid, send the update.
for wi := range s {
wi.newUpdate(update)
wi.newUpdate(uErr.Update)
}
// Sync cache.
c.logger.Debugf("LDS resource with name %v, value %+v added to cache", name, pretty.ToJSON(update))
c.ldsCache[name] = update
c.ldsMD[name] = metadata
c.logger.Debugf("LDS resource with name %v, value %+v added to cache", name, pretty.ToJSON(uErr))
c.ldsCache[name] = uErr.Update
// Set status to ACK, and clear error state. The metadata might be a
// NACK metadata because some other resources in the same response
// are invalid.
mdCopy := metadata
mdCopy.Status = ServiceStatusACKed
mdCopy.ErrState = nil
if metadata.ErrState != nil {
mdCopy.Version = metadata.ErrState.Version
}
c.ldsMD[name] = mdCopy
}
}
// Resources not in the new update were removed by the server, so delete
Expand All @@ -137,41 +141,46 @@ func (c *clientImpl) NewListeners(updates map[string]ListenerUpdate, metadata Up
//
// A response can contain multiple resources. They will be parsed and put in a
// map from resource name to the resource content.
func (c *clientImpl) NewRouteConfigs(updates map[string]RouteConfigUpdate, metadata UpdateMetadata) {
func (c *clientImpl) NewRouteConfigs(updates map[string]RouteConfigUpdateErrTuple, metadata UpdateMetadata) {
c.mu.Lock()
defer c.mu.Unlock()

// If no error received, the status is ACK.
c.rdsVersion = metadata.Version
if metadata.ErrState != nil {
// On NACK, update overall version to the NACKed resp.
c.rdsVersion = metadata.ErrState.Version
for name := range updates {
if s, ok := c.rdsWatchers[name]; ok {
}
for name, uErr := range updates {
if s, ok := c.rdsWatchers[name]; ok {
if uErr.Err != nil {
// On error, keep previous version for each resource. But update
// status and error.
mdCopy := c.rdsMD[name]
mdCopy.ErrState = metadata.ErrState
mdCopy.Status = metadata.Status
c.rdsMD[name] = mdCopy
for wi := range s {
wi.newError(metadata.ErrState.Err)
wi.newError(uErr.Err)
}
continue
}
}
return
}

// If no error received, the status is ACK.
c.rdsVersion = metadata.Version
for name, update := range updates {
if s, ok := c.rdsWatchers[name]; ok {
// Only send the update if this is not an error.
// If the resource is valid, send the update.
for wi := range s {
wi.newUpdate(update)
wi.newUpdate(uErr.Update)
}
// Sync cache.
c.logger.Debugf("RDS resource with name %v, value %+v added to cache", name, pretty.ToJSON(update))
c.rdsCache[name] = update
c.rdsMD[name] = metadata
c.logger.Debugf("RDS resource with name %v, value %+v added to cache", name, pretty.ToJSON(uErr))
c.rdsCache[name] = uErr.Update
// Set status to ACK, and clear error state. The metadata might be a
// NACK metadata because some other resources in the same response
// are invalid.
mdCopy := metadata
mdCopy.Status = ServiceStatusACKed
mdCopy.ErrState = nil
if metadata.ErrState != nil {
mdCopy.Version = metadata.ErrState.Version
}
c.rdsMD[name] = mdCopy
}
}
}
Expand All @@ -181,41 +190,47 @@ func (c *clientImpl) NewRouteConfigs(updates map[string]RouteConfigUpdate, metad
//
// A response can contain multiple resources. They will be parsed and put in a
// map from resource name to the resource content.
func (c *clientImpl) NewClusters(updates map[string]ClusterUpdate, metadata UpdateMetadata) {
func (c *clientImpl) NewClusters(updates map[string]ClusterUpdateErrTuple, metadata UpdateMetadata) {
c.mu.Lock()
defer c.mu.Unlock()

c.cdsVersion = metadata.Version
if metadata.ErrState != nil {
// On NACK, update overall version to the NACKed resp.
c.cdsVersion = metadata.ErrState.Version
for name := range updates {
if s, ok := c.cdsWatchers[name]; ok {
}
for name, uErr := range updates {
if s, ok := c.cdsWatchers[name]; ok {
if uErr.Err != nil {
// On error, keep previous version for each resource. But update
// status and error.
mdCopy := c.cdsMD[name]
mdCopy.ErrState = metadata.ErrState
mdCopy.Status = metadata.Status
c.cdsMD[name] = mdCopy
for wi := range s {
wi.newError(metadata.ErrState.Err)
// Send the watcher the individual error, instead of the
// overall combined error from the metadata.ErrState.
wi.newError(uErr.Err)
}
continue
}
}
return
}

// If no error received, the status is ACK.
c.cdsVersion = metadata.Version
for name, update := range updates {
if s, ok := c.cdsWatchers[name]; ok {
// Only send the update if this is not an error.
// If the resource is valid, send the update.
for wi := range s {
wi.newUpdate(update)
wi.newUpdate(uErr.Update)
}
// Sync cache.
c.logger.Debugf("CDS resource with name %v, value %+v added to cache", name, pretty.ToJSON(update))
c.cdsCache[name] = update
c.cdsMD[name] = metadata
c.logger.Debugf("CDS resource with name %v, value %+v added to cache", name, pretty.ToJSON(uErr))
c.cdsCache[name] = uErr.Update
// Set status to ACK, and clear error state. The metadata might be a
// NACK metadata because some other resources in the same response
// are invalid.
mdCopy := metadata
mdCopy.Status = ServiceStatusACKed
mdCopy.ErrState = nil
if metadata.ErrState != nil {
mdCopy.Version = metadata.ErrState.Version
}
c.cdsMD[name] = mdCopy
}
}
// Resources not in the new update were removed by the server, so delete
Expand All @@ -242,41 +257,47 @@ func (c *clientImpl) NewClusters(updates map[string]ClusterUpdate, metadata Upda
//
// A response can contain multiple resources. They will be parsed and put in a
// map from resource name to the resource content.
func (c *clientImpl) NewEndpoints(updates map[string]EndpointsUpdate, metadata UpdateMetadata) {
func (c *clientImpl) NewEndpoints(updates map[string]EndpointsUpdateErrTuple, metadata UpdateMetadata) {
c.mu.Lock()
defer c.mu.Unlock()

c.edsVersion = metadata.Version
if metadata.ErrState != nil {
// On NACK, update overall version to the NACKed resp.
c.edsVersion = metadata.ErrState.Version
for name := range updates {
if s, ok := c.edsWatchers[name]; ok {
}
for name, uErr := range updates {
if s, ok := c.edsWatchers[name]; ok {
if uErr.Err != nil {
// On error, keep previous version for each resource. But update
// status and error.
mdCopy := c.edsMD[name]
mdCopy.ErrState = metadata.ErrState
mdCopy.Status = metadata.Status
c.edsMD[name] = mdCopy
for wi := range s {
wi.newError(metadata.ErrState.Err)
// Send the watcher the individual error, instead of the
// overall combined error from the metadata.ErrState.
wi.newError(uErr.Err)
}
continue
}
}
return
}

// If no error received, the status is ACK.
c.edsVersion = metadata.Version
for name, update := range updates {
if s, ok := c.edsWatchers[name]; ok {
// Only send the update if this is not an error.
// If the resource is valid, send the update.
for wi := range s {
wi.newUpdate(update)
wi.newUpdate(uErr.Update)
}
// Sync cache.
c.logger.Debugf("EDS resource with name %v, value %+v added to cache", name, pretty.ToJSON(update))
c.edsCache[name] = update
c.edsMD[name] = metadata
c.logger.Debugf("EDS resource with name %v, value %+v added to cache", name, pretty.ToJSON(uErr))
c.edsCache[name] = uErr.Update
// Set status to ACK, and clear error state. The metadata might be a
// NACK metadata because some other resources in the same response
// are invalid.
mdCopy := metadata
mdCopy.Status = ServiceStatusACKed
mdCopy.ErrState = nil
if metadata.ErrState != nil {
mdCopy.Version = metadata.ErrState.Version
}
c.edsMD[name] = mdCopy
}
}
}
Expand Down

0 comments on commit 16cf656

Please sign in to comment.