Skip to content

Commit

Permalink
[xds_client_nack_part] remove Err from Update struct and add a tuple …
Browse files Browse the repository at this point in the history
…type
  • Loading branch information
menghanl committed Sep 9, 2021
1 parent 9d0596b commit 3f82900
Show file tree
Hide file tree
Showing 18 changed files with 396 additions and 404 deletions.
56 changes: 28 additions & 28 deletions xds/internal/xdsclient/callback.go
Expand Up @@ -76,35 +76,35 @@ func (c *clientImpl) callCallback(wiu *watcherInfoWithUpdate) {
//
// A response can contain multiple resources. They will be parsed and put in a
// map from resource name to the resource content.
func (c *clientImpl) NewListeners(updates map[string]ListenerUpdate, metadata UpdateMetadata) {
func (c *clientImpl) NewListeners(updates map[string]ListenerUpdateErr, metadata UpdateMetadata) {
c.mu.Lock()
defer c.mu.Unlock()

c.ldsVersion = metadata.Version
if metadata.ErrState != nil {
c.ldsVersion = metadata.ErrState.Version
}
for name, update := range updates {
for name, uErr := range updates {
if s, ok := c.ldsWatchers[name]; ok {
if update.Err != nil {
if uErr.Err != nil {
// On error, keep previous version for each resource. But update
// status and error.
mdCopy := c.ldsMD[name]
mdCopy.ErrState = metadata.ErrState
mdCopy.Status = metadata.Status
c.ldsMD[name] = mdCopy
for wi := range s {
wi.newError(update.Err)
wi.newError(uErr.Err)
}
continue
}
// If the resource is valid, send the update.
for wi := range s {
wi.newUpdate(update)
wi.newUpdate(uErr.Update)
}
// Sync cache.
c.logger.Debugf("LDS resource with name %v, value %+v added to cache", name, pretty.ToJSON(update))
c.ldsCache[name] = update
c.logger.Debugf("LDS resource with name %v, value %+v added to cache", name, pretty.ToJSON(uErr))
c.ldsCache[name] = uErr.Update
// Set status to ACK, and clear error state. The metadata might be a
// NACK metadata because some other resources in the same response
// are invalid.
Expand Down Expand Up @@ -141,7 +141,7 @@ func (c *clientImpl) NewListeners(updates map[string]ListenerUpdate, metadata Up
//
// A response can contain multiple resources. They will be parsed and put in a
// map from resource name to the resource content.
func (c *clientImpl) NewRouteConfigs(updates map[string]RouteConfigUpdate, metadata UpdateMetadata) {
func (c *clientImpl) NewRouteConfigs(updates map[string]RouteConfigUpdateErr, metadata UpdateMetadata) {
c.mu.Lock()
defer c.mu.Unlock()

Expand All @@ -150,27 +150,27 @@ func (c *clientImpl) NewRouteConfigs(updates map[string]RouteConfigUpdate, metad
if metadata.ErrState != nil {
c.rdsVersion = metadata.ErrState.Version
}
for name, update := range updates {
for name, uErr := range updates {
if s, ok := c.rdsWatchers[name]; ok {
if update.Err != nil {
if uErr.Err != nil {
// On error, keep previous version for each resource. But update
// status and error.
mdCopy := c.rdsMD[name]
mdCopy.ErrState = metadata.ErrState
mdCopy.Status = metadata.Status
c.rdsMD[name] = mdCopy
for wi := range s {
wi.newError(update.Err)
wi.newError(uErr.Err)
}
continue
}
// If the resource is valid, send the update.
for wi := range s {
wi.newUpdate(update)
wi.newUpdate(uErr.Update)
}
// Sync cache.
c.logger.Debugf("RDS resource with name %v, value %+v added to cache", name, pretty.ToJSON(update))
c.rdsCache[name] = update
c.logger.Debugf("RDS resource with name %v, value %+v added to cache", name, pretty.ToJSON(uErr))
c.rdsCache[name] = uErr.Update
// Set status to ACK, and clear error state. The metadata might be a
// NACK metadata because some other resources in the same response
// are invalid.
Expand All @@ -190,17 +190,17 @@ func (c *clientImpl) NewRouteConfigs(updates map[string]RouteConfigUpdate, metad
//
// A response can contain multiple resources. They will be parsed and put in a
// map from resource name to the resource content.
func (c *clientImpl) NewClusters(updates map[string]ClusterUpdate, metadata UpdateMetadata) {
func (c *clientImpl) NewClusters(updates map[string]ClusterUpdateErr, metadata UpdateMetadata) {
c.mu.Lock()
defer c.mu.Unlock()

c.cdsVersion = metadata.Version
if metadata.ErrState != nil {
c.cdsVersion = metadata.ErrState.Version
}
for name, update := range updates {
for name, uErr := range updates {
if s, ok := c.cdsWatchers[name]; ok {
if update.Err != nil {
if uErr.Err != nil {
// On error, keep previous version for each resource. But update
// status and error.
mdCopy := c.cdsMD[name]
Expand All @@ -210,17 +210,17 @@ func (c *clientImpl) NewClusters(updates map[string]ClusterUpdate, metadata Upda
for wi := range s {
// Send the watcher the individual error, instead of the
// overall combined error from the metadata.ErrState.
wi.newError(update.Err)
wi.newError(uErr.Err)
}
continue
}
// If the resource is valid, send the update.
for wi := range s {
wi.newUpdate(update)
wi.newUpdate(uErr.Update)
}
// Sync cache.
c.logger.Debugf("CDS resource with name %v, value %+v added to cache", name, pretty.ToJSON(update))
c.cdsCache[name] = update
c.logger.Debugf("CDS resource with name %v, value %+v added to cache", name, pretty.ToJSON(uErr))
c.cdsCache[name] = uErr.Update
// Set status to ACK, and clear error state. The metadata might be a
// NACK metadata because some other resources in the same response
// are invalid.
Expand Down Expand Up @@ -257,17 +257,17 @@ func (c *clientImpl) NewClusters(updates map[string]ClusterUpdate, metadata Upda
//
// A response can contain multiple resources. They will be parsed and put in a
// map from resource name to the resource content.
func (c *clientImpl) NewEndpoints(updates map[string]EndpointsUpdate, metadata UpdateMetadata) {
func (c *clientImpl) NewEndpoints(updates map[string]EndpointsUpdateErr, metadata UpdateMetadata) {
c.mu.Lock()
defer c.mu.Unlock()

c.edsVersion = metadata.Version
if metadata.ErrState != nil {
c.edsVersion = metadata.ErrState.Version
}
for name, update := range updates {
for name, uErr := range updates {
if s, ok := c.edsWatchers[name]; ok {
if update.Err != nil {
if uErr.Err != nil {
// On error, keep previous version for each resource. But update
// status and error.
mdCopy := c.edsMD[name]
Expand All @@ -277,17 +277,17 @@ func (c *clientImpl) NewEndpoints(updates map[string]EndpointsUpdate, metadata U
for wi := range s {
// Send the watcher the individual error, instead of the
// overall combined error from the metadata.ErrState.
wi.newError(update.Err)
wi.newError(uErr.Err)
}
continue
}
// If the resource is valid, send the update.
for wi := range s {
wi.newUpdate(update)
wi.newUpdate(uErr.Update)
}
// Sync cache.
c.logger.Debugf("EDS resource with name %v, value %+v added to cache", name, pretty.ToJSON(update))
c.edsCache[name] = update
c.logger.Debugf("EDS resource with name %v, value %+v added to cache", name, pretty.ToJSON(uErr))
c.edsCache[name] = uErr.Update
// Set status to ACK, and clear error state. The metadata might be a
// NACK metadata because some other resources in the same response
// are invalid.
Expand Down
36 changes: 18 additions & 18 deletions xds/internal/xdsclient/cds_test.go
Expand Up @@ -945,7 +945,7 @@ func (s) TestUnmarshalCluster(t *testing.T) {
tests := []struct {
name string
resources []*anypb.Any
wantUpdate map[string]ClusterUpdate
wantUpdate map[string]ClusterUpdateErr
wantMD UpdateMetadata
wantErr bool
}{
Expand Down Expand Up @@ -988,7 +988,7 @@ func (s) TestUnmarshalCluster(t *testing.T) {
ClusterDiscoveryType: &v3clusterpb.Cluster_Type{Type: v3clusterpb.Cluster_STATIC},
}),
},
wantUpdate: map[string]ClusterUpdate{
wantUpdate: map[string]ClusterUpdateErr{
"test": {Err: cmpopts.AnyError},
},
wantMD: UpdateMetadata{
Expand All @@ -1004,12 +1004,12 @@ func (s) TestUnmarshalCluster(t *testing.T) {
{
name: "v2 cluster",
resources: []*anypb.Any{v2ClusterAny},
wantUpdate: map[string]ClusterUpdate{
v2ClusterName: {
wantUpdate: map[string]ClusterUpdateErr{
v2ClusterName: {Update: ClusterUpdate{
ClusterName: v2ClusterName,
EDSServiceName: v2Service, EnableLRS: true,
Raw: v2ClusterAny,
},
}},
},
wantMD: UpdateMetadata{
Status: ServiceStatusACKed,
Expand All @@ -1019,12 +1019,12 @@ func (s) TestUnmarshalCluster(t *testing.T) {
{
name: "v3 cluster",
resources: []*anypb.Any{v3ClusterAny},
wantUpdate: map[string]ClusterUpdate{
v3ClusterName: {
wantUpdate: map[string]ClusterUpdateErr{
v3ClusterName: {Update: ClusterUpdate{
ClusterName: v3ClusterName,
EDSServiceName: v3Service, EnableLRS: true,
Raw: v3ClusterAny,
},
}},
},
wantMD: UpdateMetadata{
Status: ServiceStatusACKed,
Expand All @@ -1034,17 +1034,17 @@ func (s) TestUnmarshalCluster(t *testing.T) {
{
name: "multiple clusters",
resources: []*anypb.Any{v2ClusterAny, v3ClusterAny},
wantUpdate: map[string]ClusterUpdate{
v2ClusterName: {
wantUpdate: map[string]ClusterUpdateErr{
v2ClusterName: {Update: ClusterUpdate{
ClusterName: v2ClusterName,
EDSServiceName: v2Service, EnableLRS: true,
Raw: v2ClusterAny,
},
v3ClusterName: {
}},
v3ClusterName: {Update: ClusterUpdate{
ClusterName: v3ClusterName,
EDSServiceName: v3Service, EnableLRS: true,
Raw: v3ClusterAny,
},
}},
},
wantMD: UpdateMetadata{
Status: ServiceStatusACKed,
Expand All @@ -1063,17 +1063,17 @@ func (s) TestUnmarshalCluster(t *testing.T) {
}),
v3ClusterAny,
},
wantUpdate: map[string]ClusterUpdate{
v2ClusterName: {
wantUpdate: map[string]ClusterUpdateErr{
v2ClusterName: {Update: ClusterUpdate{
ClusterName: v2ClusterName,
EDSServiceName: v2Service, EnableLRS: true,
Raw: v2ClusterAny,
},
v3ClusterName: {
}},
v3ClusterName: {Update: ClusterUpdate{
ClusterName: v3ClusterName,
EDSServiceName: v3Service, EnableLRS: true,
Raw: v3ClusterAny,
},
}},
"bad": {Err: cmpopts.AnyError},
},
wantMD: UpdateMetadata{
Expand Down
20 changes: 4 additions & 16 deletions xds/internal/xdsclient/client.go
Expand Up @@ -134,14 +134,14 @@ type loadReportingOptions struct {
// resource updates from an APIClient for a specific version.
type UpdateHandler interface {
// NewListeners handles updates to xDS listener resources.
NewListeners(map[string]ListenerUpdate, UpdateMetadata)
NewListeners(map[string]ListenerUpdateErr, UpdateMetadata)
// NewRouteConfigs handles updates to xDS RouteConfiguration resources.
NewRouteConfigs(map[string]RouteConfigUpdate, UpdateMetadata)
NewRouteConfigs(map[string]RouteConfigUpdateErr, UpdateMetadata)
// NewClusters handles updates to xDS Cluster resources.
NewClusters(map[string]ClusterUpdate, UpdateMetadata)
NewClusters(map[string]ClusterUpdateErr, UpdateMetadata)
// NewEndpoints handles updates to xDS ClusterLoadAssignment (or tersely
// referred to as Endpoints) resources.
NewEndpoints(map[string]EndpointsUpdate, UpdateMetadata)
NewEndpoints(map[string]EndpointsUpdateErr, UpdateMetadata)
// NewConnectionError handles connection errors from the xDS stream. The
// error will be reported to all the resource watchers.
NewConnectionError(err error)
Expand Down Expand Up @@ -220,9 +220,6 @@ type ListenerUpdate struct {

// Raw is the resource from the xds response.
Raw *anypb.Any
// Err is non-nil if the resource in the response is invalid. It contains
// the unmarshal error message.
Err error
}

// HTTPFilter represents one HTTP filter from an LDS response's HTTP connection
Expand Down Expand Up @@ -257,9 +254,6 @@ type RouteConfigUpdate struct {
VirtualHosts []*VirtualHost
// Raw is the resource from the xds response.
Raw *anypb.Any
// Err is non-nil if the resource in the response is invalid. It contains
// the unmarshal error message.
Err error
}

// VirtualHost contains the routes for a list of Domains.
Expand Down Expand Up @@ -488,9 +482,6 @@ type ClusterUpdate struct {

// Raw is the resource from the xds response.
Raw *anypb.Any
// Err is non-nil if the resource in the response is invalid. It contains
// the unmarshal error message.
Err error
}

// OverloadDropConfig contains the config to drop overloads.
Expand Down Expand Up @@ -540,9 +531,6 @@ type EndpointsUpdate struct {

// Raw is the resource from the xds response.
Raw *anypb.Any
// Err is non-nil if the resource in the response is invalid. It contains
// the unmarshal error message.
Err error
}

// Function to be overridden in tests.
Expand Down

0 comments on commit 3f82900

Please sign in to comment.