Skip to content

Commit

Permalink
[xds_client_nack_part] ttt
Browse files Browse the repository at this point in the history
[xds_client_nack_part] test not done

[xds_client_nack_part] temp, lds failing

[xds_client_nack_part] ttt

[xds_client_nack_part] 222

[xds_client_nack_part] lll

[xds_client_nack_part] eee

[xds_client_nack_part] cds

[xds_client_nack_part] eds

[xds_client_nack_part] lds

[xds_client_nack_part] rds

[xds_client_nack_part] csds tests failing

[xds_client_nack_part] cds dump

[xds_client_nack_part] eds dump

[xds_client_nack_part] rds dump

[xds_client_nack_part] xdsclient v2 tests

[xds_client_nack_part] fix csds test
  • Loading branch information
menghanl committed Sep 8, 2021
1 parent a6a6317 commit a67bae7
Show file tree
Hide file tree
Showing 19 changed files with 481 additions and 179 deletions.
49 changes: 29 additions & 20 deletions xds/csds/csds_test.go
Expand Up @@ -211,22 +211,28 @@ func TestCSDS(t *testing.T) {
}

const nackResourceIdx = 0
var (
nackListeners = listeners
nackRoutes = routes
nackClusters = clusters
nackEndpoints = endpoints
)
nackListeners[0] = &v3listenerpb.Listener{Name: ldsTargets[nackResourceIdx], ApiListener: &v3listenerpb.ApiListener{}} // 0 will be nacked. 1 will stay the same.
nackRoutes[0] = &v3routepb.RouteConfiguration{
Name: rdsTargets[nackResourceIdx], VirtualHosts: []*v3routepb.VirtualHost{{Routes: []*v3routepb.Route{{}}}},
}
nackClusters[0] = &v3clusterpb.Cluster{
Name: cdsTargets[nackResourceIdx], ClusterDiscoveryType: &v3clusterpb.Cluster_Type{Type: v3clusterpb.Cluster_STATIC},
}
nackEndpoints[0] = &v3endpointpb.ClusterLoadAssignment{
ClusterName: edsTargets[nackResourceIdx], Endpoints: []*v3endpointpb.LocalityLbEndpoints{{}},
}
if err := mgmServer.Update(ctx, e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{
{Name: ldsTargets[nackResourceIdx], ApiListener: &v3listenerpb.ApiListener{}}, // 0 will be nacked. 1 will stay the same.
},
Routes: []*v3routepb.RouteConfiguration{
{Name: rdsTargets[nackResourceIdx], VirtualHosts: []*v3routepb.VirtualHost{{
Routes: []*v3routepb.Route{{}},
}}},
},
Clusters: []*v3clusterpb.Cluster{
{Name: cdsTargets[nackResourceIdx], ClusterDiscoveryType: &v3clusterpb.Cluster_Type{Type: v3clusterpb.Cluster_STATIC}},
},
Endpoints: []*v3endpointpb.ClusterLoadAssignment{
{ClusterName: edsTargets[nackResourceIdx], Endpoints: []*v3endpointpb.LocalityLbEndpoints{{}}},
},
NodeID: nodeID,
Listeners: nackListeners,
Routes: nackRoutes,
Clusters: nackClusters,
Endpoints: nackEndpoints,
SkipValidation: true,
}); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -490,7 +496,6 @@ func checkForNACKed(nackResourceIdx int, stream v3statuspbgrpc.ClientStatusDisco
ackVersion = "1"
nackVersion = "2"
)

if err := stream.Send(&v3statuspb.ClientStatusRequest{Node: nil}); err != nil {
return fmt.Errorf("failed to send: %v", err)
}
Expand All @@ -514,13 +519,14 @@ func checkForNACKed(nackResourceIdx int, stream v3statuspbgrpc.ClientStatusDisco
configDump := &v3adminpb.ListenersConfigDump_DynamicListener{
Name: ldsTargets[i],
ActiveState: &v3adminpb.ListenersConfigDump_DynamicListenerState{
VersionInfo: ackVersion,
VersionInfo: nackVersion,
Listener: listenerAnys[i],
LastUpdated: nil,
},
ClientStatus: v3adminpb.ClientResourceStatus_ACKED,
}
if i == nackResourceIdx {
configDump.ActiveState.VersionInfo = ackVersion
configDump.ClientStatus = v3adminpb.ClientResourceStatus_NACKED
configDump.ErrorState = &v3adminpb.UpdateFailureState{
Details: "blahblah",
Expand All @@ -540,12 +546,13 @@ func checkForNACKed(nackResourceIdx int, stream v3statuspbgrpc.ClientStatusDisco
var wantRoutes []*v3adminpb.RoutesConfigDump_DynamicRouteConfig
for i := range rdsTargets {
configDump := &v3adminpb.RoutesConfigDump_DynamicRouteConfig{
VersionInfo: ackVersion,
VersionInfo: nackVersion,
RouteConfig: routeAnys[i],
LastUpdated: nil,
ClientStatus: v3adminpb.ClientResourceStatus_ACKED,
}
if i == nackResourceIdx {
configDump.VersionInfo = ackVersion
configDump.ClientStatus = v3adminpb.ClientResourceStatus_NACKED
configDump.ErrorState = &v3adminpb.UpdateFailureState{
Details: "blahblah",
Expand All @@ -564,12 +571,13 @@ func checkForNACKed(nackResourceIdx int, stream v3statuspbgrpc.ClientStatusDisco
var wantCluster []*v3adminpb.ClustersConfigDump_DynamicCluster
for i := range cdsTargets {
configDump := &v3adminpb.ClustersConfigDump_DynamicCluster{
VersionInfo: ackVersion,
VersionInfo: nackVersion,
Cluster: clusterAnys[i],
LastUpdated: nil,
ClientStatus: v3adminpb.ClientResourceStatus_ACKED,
}
if i == nackResourceIdx {
configDump.VersionInfo = ackVersion
configDump.ClientStatus = v3adminpb.ClientResourceStatus_NACKED
configDump.ErrorState = &v3adminpb.UpdateFailureState{
Details: "blahblah",
Expand All @@ -589,12 +597,13 @@ func checkForNACKed(nackResourceIdx int, stream v3statuspbgrpc.ClientStatusDisco
var wantEndpoint []*v3adminpb.EndpointsConfigDump_DynamicEndpointConfig
for i := range cdsTargets {
configDump := &v3adminpb.EndpointsConfigDump_DynamicEndpointConfig{
VersionInfo: ackVersion,
VersionInfo: nackVersion,
EndpointConfig: endpointAnys[i],
LastUpdated: nil,
ClientStatus: v3adminpb.ClientResourceStatus_ACKED,
}
if i == nackResourceIdx {
configDump.VersionInfo = ackVersion
configDump.ClientStatus = v3adminpb.ClientResourceStatus_NACKED
configDump.ErrorState = &v3adminpb.UpdateFailureState{
Details: "blahblah",
Expand Down
133 changes: 77 additions & 56 deletions xds/internal/xdsclient/callback.go
Expand Up @@ -80,37 +80,41 @@ func (c *clientImpl) NewListeners(updates map[string]ListenerUpdate, metadata Up
c.mu.Lock()
defer c.mu.Unlock()

c.ldsVersion = metadata.Version
if metadata.ErrState != nil {
// On NACK, update overall version to the NACKed resp.
c.ldsVersion = metadata.ErrState.Version
for name := range updates {
if s, ok := c.ldsWatchers[name]; ok {
}
for name, update := range updates {
if s, ok := c.ldsWatchers[name]; ok {
if update.Err != nil {
// On error, keep previous version for each resource. But update
// status and error.
mdCopy := c.ldsMD[name]
mdCopy.ErrState = metadata.ErrState
mdCopy.Status = metadata.Status
c.ldsMD[name] = mdCopy
for wi := range s {
wi.newError(metadata.ErrState.Err)
wi.newError(update.Err)
}
continue
}
}
return
}

// If no error received, the status is ACK.
c.ldsVersion = metadata.Version
for name, update := range updates {
if s, ok := c.ldsWatchers[name]; ok {
// Only send the update if this is not an error.
// If the resource is valid, send the update.
for wi := range s {
wi.newUpdate(update)
}
// Sync cache.
c.logger.Debugf("LDS resource with name %v, value %+v added to cache", name, pretty.ToJSON(update))
c.ldsCache[name] = update
c.ldsMD[name] = metadata
// Set status to ACK, and clear error state. The metadata might be a
// NACK metadata because some other resources in the same response
// are invalid.
mdCopy := metadata
mdCopy.Status = ServiceStatusACKed
mdCopy.ErrState = nil
if metadata.ErrState != nil {
mdCopy.Version = metadata.ErrState.Version
}
c.ldsMD[name] = mdCopy
}
}
// Resources not in the new update were removed by the server, so delete
Expand Down Expand Up @@ -141,37 +145,42 @@ func (c *clientImpl) NewRouteConfigs(updates map[string]RouteConfigUpdate, metad
c.mu.Lock()
defer c.mu.Unlock()

// If no error received, the status is ACK.
c.rdsVersion = metadata.Version
if metadata.ErrState != nil {
// On NACK, update overall version to the NACKed resp.
c.rdsVersion = metadata.ErrState.Version
for name := range updates {
if s, ok := c.rdsWatchers[name]; ok {
}
for name, update := range updates {
if s, ok := c.rdsWatchers[name]; ok {
if update.Err != nil {
// On error, keep previous version for each resource. But update
// status and error.
mdCopy := c.rdsMD[name]
mdCopy.ErrState = metadata.ErrState
mdCopy.Status = metadata.Status
c.rdsMD[name] = mdCopy
for wi := range s {
wi.newError(metadata.ErrState.Err)
wi.newError(update.Err)
}
continue
}
}
return
}

// If no error received, the status is ACK.
c.rdsVersion = metadata.Version
for name, update := range updates {
if s, ok := c.rdsWatchers[name]; ok {
// Only send the update if this is not an error.
// If the resource is valid, send the update.
for wi := range s {
wi.newUpdate(update)
}
// Sync cache.
c.logger.Debugf("RDS resource with name %v, value %+v added to cache", name, pretty.ToJSON(update))
c.rdsCache[name] = update
c.rdsMD[name] = metadata
// Set status to ACK, and clear error state. The metadata might be a
// NACK metadata because some other resources in the same response
// are invalid.
mdCopy := metadata
mdCopy.Status = ServiceStatusACKed
mdCopy.ErrState = nil
if metadata.ErrState != nil {
mdCopy.Version = metadata.ErrState.Version
}
c.rdsMD[name] = mdCopy
}
}
}
Expand All @@ -185,37 +194,43 @@ func (c *clientImpl) NewClusters(updates map[string]ClusterUpdate, metadata Upda
c.mu.Lock()
defer c.mu.Unlock()

c.cdsVersion = metadata.Version
if metadata.ErrState != nil {
// On NACK, update overall version to the NACKed resp.
c.cdsVersion = metadata.ErrState.Version
for name := range updates {
if s, ok := c.cdsWatchers[name]; ok {
}
for name, update := range updates {
if s, ok := c.cdsWatchers[name]; ok {
if update.Err != nil {
// On error, keep previous version for each resource. But update
// status and error.
mdCopy := c.cdsMD[name]
mdCopy.ErrState = metadata.ErrState
mdCopy.Status = metadata.Status
c.cdsMD[name] = mdCopy
for wi := range s {
wi.newError(metadata.ErrState.Err)
// Send the watcher the individual error, instead of the
// overall combined error from the metadata.ErrState.
wi.newError(update.Err)
}
continue
}
}
return
}

// If no error received, the status is ACK.
c.cdsVersion = metadata.Version
for name, update := range updates {
if s, ok := c.cdsWatchers[name]; ok {
// Only send the update if this is not an error.
// If the resource is valid, send the update.
for wi := range s {
wi.newUpdate(update)
}
// Sync cache.
c.logger.Debugf("CDS resource with name %v, value %+v added to cache", name, pretty.ToJSON(update))
c.cdsCache[name] = update
c.cdsMD[name] = metadata
// Set status to ACK, and clear error state. The metadata might be a
// NACK metadata because some other resources in the same response
// are invalid.
mdCopy := metadata
mdCopy.Status = ServiceStatusACKed
mdCopy.ErrState = nil
if metadata.ErrState != nil {
mdCopy.Version = metadata.ErrState.Version
}
c.cdsMD[name] = mdCopy
}
}
// Resources not in the new update were removed by the server, so delete
Expand Down Expand Up @@ -246,37 +261,43 @@ func (c *clientImpl) NewEndpoints(updates map[string]EndpointsUpdate, metadata U
c.mu.Lock()
defer c.mu.Unlock()

c.edsVersion = metadata.Version
if metadata.ErrState != nil {
// On NACK, update overall version to the NACKed resp.
c.edsVersion = metadata.ErrState.Version
for name := range updates {
if s, ok := c.edsWatchers[name]; ok {
}
for name, update := range updates {
if s, ok := c.edsWatchers[name]; ok {
if update.Err != nil {
// On error, keep previous version for each resource. But update
// status and error.
mdCopy := c.edsMD[name]
mdCopy.ErrState = metadata.ErrState
mdCopy.Status = metadata.Status
c.edsMD[name] = mdCopy
for wi := range s {
wi.newError(metadata.ErrState.Err)
// Send the watcher the individual error, instead of the
// overall combined error from the metadata.ErrState.
wi.newError(update.Err)
}
continue
}
}
return
}

// If no error received, the status is ACK.
c.edsVersion = metadata.Version
for name, update := range updates {
if s, ok := c.edsWatchers[name]; ok {
// Only send the update if this is not an error.
// If the resource is valid, send the update.
for wi := range s {
wi.newUpdate(update)
}
// Sync cache.
c.logger.Debugf("EDS resource with name %v, value %+v added to cache", name, pretty.ToJSON(update))
c.edsCache[name] = update
c.edsMD[name] = metadata
// Set status to ACK, and clear error state. The metadata might be a
// NACK metadata because some other resources in the same response
// are invalid.
mdCopy := metadata
mdCopy.Status = ServiceStatusACKed
mdCopy.ErrState = nil
if metadata.ErrState != nil {
mdCopy.Version = metadata.ErrState.Version
}
c.edsMD[name] = mdCopy
}
}
}
Expand Down
14 changes: 8 additions & 6 deletions xds/internal/xdsclient/cds_test.go
Expand Up @@ -957,7 +957,7 @@ func (s) TestUnmarshalCluster(t *testing.T) {
Version: testVersion,
ErrState: &UpdateErrorMetadata{
Version: testVersion,
Err: errPlaceHolder,
Err: cmpopts.AnyError,
},
},
wantErr: true,
Expand All @@ -975,7 +975,7 @@ func (s) TestUnmarshalCluster(t *testing.T) {
Version: testVersion,
ErrState: &UpdateErrorMetadata{
Version: testVersion,
Err: errPlaceHolder,
Err: cmpopts.AnyError,
},
},
wantErr: true,
Expand All @@ -988,13 +988,15 @@ func (s) TestUnmarshalCluster(t *testing.T) {
ClusterDiscoveryType: &v3clusterpb.Cluster_Type{Type: v3clusterpb.Cluster_STATIC},
}),
},
wantUpdate: map[string]ClusterUpdate{"test": {}},
wantUpdate: map[string]ClusterUpdate{
"test": {Err: cmpopts.AnyError},
},
wantMD: UpdateMetadata{
Status: ServiceStatusNACKed,
Version: testVersion,
ErrState: &UpdateErrorMetadata{
Version: testVersion,
Err: errPlaceHolder,
Err: cmpopts.AnyError,
},
},
wantErr: true,
Expand Down Expand Up @@ -1072,14 +1074,14 @@ func (s) TestUnmarshalCluster(t *testing.T) {
EDSServiceName: v3Service, EnableLRS: true,
Raw: v3ClusterAny,
},
"bad": {},
"bad": {Err: cmpopts.AnyError},
},
wantMD: UpdateMetadata{
Status: ServiceStatusNACKed,
Version: testVersion,
ErrState: &UpdateErrorMetadata{
Version: testVersion,
Err: errPlaceHolder,
Err: cmpopts.AnyError,
},
},
wantErr: true,
Expand Down

0 comments on commit a67bae7

Please sign in to comment.