From 37b72f944a8f18ceb4dc5772ebfe95d0aea66574 Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Thu, 22 Oct 2020 13:20:03 -0700 Subject: [PATCH] xdsclient: populate error details for NACK (#3975) --- xds/internal/client/client_xds.go | 10 +++--- xds/internal/client/transport_helper.go | 17 ++++++---- xds/internal/client/v2/client.go | 10 ++++-- xds/internal/client/v2/client_ack_test.go | 40 +++++++++++++++-------- xds/internal/client/v3/client.go | 10 ++++-- 5 files changed, 57 insertions(+), 30 deletions(-) diff --git a/xds/internal/client/client_xds.go b/xds/internal/client/client_xds.go index e2a7b13f952..0acfd69cf89 100644 --- a/xds/internal/client/client_xds.go +++ b/xds/internal/client/client_xds.go @@ -47,7 +47,7 @@ func UnmarshalListener(resources []*anypb.Any, logger *grpclog.PrefixLogger) (ma update := make(map[string]ListenerUpdate) for _, r := range resources { if !IsListenerResource(r.GetTypeUrl()) { - return nil, fmt.Errorf("xds: unexpected resource type: %s in LDS response", r.GetTypeUrl()) + return nil, fmt.Errorf("xds: unexpected resource type: %q in LDS response", r.GetTypeUrl()) } lis := &v3listenerpb.Listener{} if err := proto.Unmarshal(r.GetValue(), lis); err != nil { @@ -71,7 +71,7 @@ func getRouteConfigNameFromListener(lis *v3listenerpb.Listener, logger *grpclog. } apiLisAny := lis.GetApiListener().GetApiListener() if !IsHTTPConnManagerResource(apiLisAny.GetTypeUrl()) { - return "", fmt.Errorf("xds: unexpected resource type: %s in LDS response", apiLisAny.GetTypeUrl()) + return "", fmt.Errorf("xds: unexpected resource type: %q in LDS response", apiLisAny.GetTypeUrl()) } apiLis := &v3httppb.HttpConnectionManager{} if err := proto.Unmarshal(apiLisAny.GetValue(), apiLis); err != nil { @@ -108,7 +108,7 @@ func UnmarshalRouteConfig(resources []*anypb.Any, logger *grpclog.PrefixLogger) update := make(map[string]RouteConfigUpdate) for _, r := range resources { if !IsRouteConfigResource(r.GetTypeUrl()) { - return nil, fmt.Errorf("xds: unexpected resource type: %s in RDS response", r.GetTypeUrl()) + return nil, fmt.Errorf("xds: unexpected resource type: %q in RDS response", r.GetTypeUrl()) } rc := &v3routepb.RouteConfiguration{} if err := proto.Unmarshal(r.GetValue(), rc); err != nil { @@ -361,7 +361,7 @@ func UnmarshalCluster(resources []*anypb.Any, logger *grpclog.PrefixLogger) (map update := make(map[string]ClusterUpdate) for _, r := range resources { if !IsClusterResource(r.GetTypeUrl()) { - return nil, fmt.Errorf("xds: unexpected resource type: %s in CDS response", r.GetTypeUrl()) + return nil, fmt.Errorf("xds: unexpected resource type: %q in CDS response", r.GetTypeUrl()) } cluster := &v3clusterpb.Cluster{} @@ -477,7 +477,7 @@ func UnmarshalEndpoints(resources []*anypb.Any, logger *grpclog.PrefixLogger) (m update := make(map[string]EndpointsUpdate) for _, r := range resources { if !IsEndpointsResource(r.GetTypeUrl()) { - return nil, fmt.Errorf("xds: unexpected resource type: %s in EDS response", r.GetTypeUrl()) + return nil, fmt.Errorf("xds: unexpected resource type: %q in EDS response", r.GetTypeUrl()) } cla := &v3endpointpb.ClusterLoadAssignment{} diff --git a/xds/internal/client/transport_helper.go b/xds/internal/client/transport_helper.go index 3ce1f8721b3..607f26fd5e1 100644 --- a/xds/internal/client/transport_helper.go +++ b/xds/internal/client/transport_helper.go @@ -51,7 +51,7 @@ type VersionedClient interface { // SendRequest constructs and sends out a DiscoveryRequest message specific // to the underlying transport protocol version. - SendRequest(s grpc.ClientStream, resourceNames []string, rType ResourceType, version string, nonce string) error + SendRequest(s grpc.ClientStream, resourceNames []string, rType ResourceType, version, nonce, errMsg string) error // RecvResponse uses the provided stream to receive a response specific to // the underlying transport protocol version. @@ -246,10 +246,10 @@ func (t *TransportHelper) send(ctx context.Context) { t.sendCh.Load() var ( - target []string - rType ResourceType - version, nonce string - send bool + target []string + rType ResourceType + version, nonce, errMsg string + send bool ) switch update := u.(type) { case *watchAction: @@ -259,6 +259,7 @@ func (t *TransportHelper) send(ctx context.Context) { if !send { continue } + errMsg = update.errMsg } if stream == nil { // There's no stream yet. Skip the request. This request @@ -267,7 +268,7 @@ func (t *TransportHelper) send(ctx context.Context) { // sending response back). continue } - if err := t.vClient.SendRequest(stream, target, rType, version, nonce); err != nil { + if err := t.vClient.SendRequest(stream, target, rType, version, nonce, errMsg); err != nil { t.logger.Warningf("ADS request for {target: %q, type: %v, version: %q, nonce: %q} failed: %v", target, rType, version, nonce, err) // send failed, clear the current stream. stream = nil @@ -292,7 +293,7 @@ func (t *TransportHelper) sendExisting(stream grpc.ClientStream) bool { t.nonceMap = make(map[ResourceType]string) for rType, s := range t.watchMap { - if err := t.vClient.SendRequest(stream, mapToSlice(s), rType, "", ""); err != nil { + if err := t.vClient.SendRequest(stream, mapToSlice(s), rType, "", "", ""); err != nil { t.logger.Errorf("ADS request failed: %v", err) return false } @@ -321,6 +322,7 @@ func (t *TransportHelper) recv(stream grpc.ClientStream) bool { rType: rType, version: "", nonce: nonce, + errMsg: err.Error(), stream: stream, }) t.logger.Warningf("Sending NACK for response type: %v, version: %v, nonce: %v, reason: %v", rType, version, nonce, err) @@ -387,6 +389,7 @@ type ackAction struct { rType ResourceType version string // NACK if version is an empty string. nonce string + errMsg string // Empty unless it's a NACK. // ACK/NACK are tagged with the stream it's for. When the stream is down, // all the ACK/NACK for this stream will be dropped, and the version/nonce // won't be updated. diff --git a/xds/internal/client/v2/client.go b/xds/internal/client/v2/client.go index 674bba4798f..7b063ad4f55 100644 --- a/xds/internal/client/v2/client.go +++ b/xds/internal/client/v2/client.go @@ -25,6 +25,7 @@ import ( "github.com/golang/protobuf/proto" "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/grpc/internal/grpclog" xdsclient "google.golang.org/grpc/xds/internal/client" "google.golang.org/grpc/xds/internal/client/load" @@ -33,6 +34,7 @@ import ( v2xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" v2corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" v2adsgrpc "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2" + statuspb "google.golang.org/genproto/googleapis/rpc/status" ) func init() { @@ -106,7 +108,7 @@ func (v2c *client) NewStream(ctx context.Context) (grpc.ClientStream, error) { // - If this is an ack, version will be the version from the response. // - If this is a nack, version will be the previous acked version (from // versionMap). If there was no ack before, it will be empty. -func (v2c *client) SendRequest(s grpc.ClientStream, resourceNames []string, rType xdsclient.ResourceType, version, nonce string) error { +func (v2c *client) SendRequest(s grpc.ClientStream, resourceNames []string, rType xdsclient.ResourceType, version, nonce, errMsg string) error { stream, ok := s.(adsStream) if !ok { return fmt.Errorf("xds: Attempt to send request on unsupported stream type: %T", s) @@ -117,7 +119,11 @@ func (v2c *client) SendRequest(s grpc.ClientStream, resourceNames []string, rTyp ResourceNames: resourceNames, VersionInfo: version, ResponseNonce: nonce, - // TODO: populate ErrorDetails for nack. + } + if errMsg != "" { + req.ErrorDetail = &statuspb.Status{ + Code: int32(codes.InvalidArgument), Message: errMsg, + } } if err := stream.Send(req); err != nil { return fmt.Errorf("xds: stream.Send(%+v) failed: %v", req, err) diff --git a/xds/internal/client/v2/client_ack_test.go b/xds/internal/client/v2/client_ack_test.go index 2b38d3aa370..87437aa218c 100644 --- a/xds/internal/client/v2/client_ack_test.go +++ b/xds/internal/client/v2/client_ack_test.go @@ -29,6 +29,7 @@ import ( anypb "github.com/golang/protobuf/ptypes/any" "github.com/google/go-cmp/cmp" "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/grpc/internal/testutils" xdsclient "google.golang.org/grpc/xds/internal/client" "google.golang.org/grpc/xds/internal/testutils/fakeserver" @@ -73,7 +74,7 @@ func startXDSV2Client(t *testing.T, cc *grpc.ClientConn) (v2c *client, cbLDS, cb } // compareXDSRequest reads requests from channel, compare it with want. -func compareXDSRequest(ch *testutils.Channel, want *xdspb.DiscoveryRequest, ver, nonce string) error { +func compareXDSRequest(ch *testutils.Channel, want *xdspb.DiscoveryRequest, ver, nonce string, wantErr bool) error { ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() val, err := ch.Receive(ctx) @@ -84,11 +85,22 @@ func compareXDSRequest(ch *testutils.Channel, want *xdspb.DiscoveryRequest, ver, if req.Err != nil { return fmt.Errorf("unexpected error from request: %v", req.Err) } + + xdsReq := req.Req.(*xdspb.DiscoveryRequest) + if (xdsReq.ErrorDetail != nil) != wantErr { + return fmt.Errorf("received request with error details: %v, wantErr: %v", xdsReq.ErrorDetail, wantErr) + } + // All NACK request.ErrorDetails have hardcoded status code InvalidArguments. + if xdsReq.ErrorDetail != nil && xdsReq.ErrorDetail.Code != int32(codes.InvalidArgument) { + return fmt.Errorf("received request with error details: %v, want status with code: %v", xdsReq.ErrorDetail, codes.InvalidArgument) + } + + xdsReq.ErrorDetail = nil // Clear the error details field before comparing. wantClone := proto.Clone(want).(*xdspb.DiscoveryRequest) wantClone.VersionInfo = ver wantClone.ResponseNonce = nonce - if !cmp.Equal(req.Req, wantClone, cmp.Comparer(proto.Equal)) { - return fmt.Errorf("received request different from want, diff: %s", cmp.Diff(req.Req, wantClone)) + if !cmp.Equal(xdsReq, wantClone, cmp.Comparer(proto.Equal)) { + return fmt.Errorf("received request different from want, diff: %s", cmp.Diff(req.Req, wantClone, cmp.Comparer(proto.Equal))) } return nil } @@ -118,7 +130,7 @@ func startXDS(t *testing.T, rType xdsclient.ResourceType, v2c *client, reqChan * } v2c.AddWatch(rType, nameToWatch) - if err := compareXDSRequest(reqChan, req, preVersion, preNonce); err != nil { + if err := compareXDSRequest(reqChan, req, preVersion, preNonce, false); err != nil { t.Fatalf("Failed to receive %v request: %v", rType, err) } t.Logf("FakeServer received %v request...", rType) @@ -133,7 +145,7 @@ func sendGoodResp(t *testing.T, rType xdsclient.ResourceType, fakeServer *fakese nonce := sendXDSRespWithVersion(fakeServer.XDSResponseChan, goodResp, ver) t.Logf("Good %v response pushed to fakeServer...", rType) - if err := compareXDSRequest(fakeServer.XDSRequestChan, wantReq, strconv.Itoa(ver), nonce); err != nil { + if err := compareXDSRequest(fakeServer.XDSRequestChan, wantReq, strconv.Itoa(ver), nonce, false); err != nil { return "", fmt.Errorf("failed to receive %v request: %v", rType, err) } t.Logf("Good %v response acked", rType) @@ -168,7 +180,7 @@ func sendBadResp(t *testing.T, rType xdsclient.ResourceType, fakeServer *fakeser TypeUrl: typeURL, }, ver) t.Logf("Bad %v response pushed to fakeServer...", rType) - if err := compareXDSRequest(fakeServer.XDSRequestChan, wantReq, strconv.Itoa(ver-1), nonce); err != nil { + if err := compareXDSRequest(fakeServer.XDSRequestChan, wantReq, strconv.Itoa(ver-1), nonce, true); err != nil { return fmt.Errorf("failed to receive %v request: %v", rType, err) } t.Logf("Bad %v response nacked", rType) @@ -274,7 +286,7 @@ func (s) TestV2ClientAckFirstIsNack(t *testing.T) { // The expected version string is an empty string, because this is the first // response, and it's nacked (so there's no previous ack version). - if err := compareXDSRequest(fakeServer.XDSRequestChan, goodLDSRequest, "", nonce); err != nil { + if err := compareXDSRequest(fakeServer.XDSRequestChan, goodLDSRequest, "", nonce, true); err != nil { t.Errorf("Failed to receive request: %v", err) } t.Logf("Bad response nacked") @@ -314,7 +326,7 @@ func (s) TestV2ClientAckNackAfterNewWatch(t *testing.T) { t.Logf("Bad response pushed to fakeServer...") // The expected version string is the previous acked version. - if err := compareXDSRequest(fakeServer.XDSRequestChan, goodLDSRequest, strconv.Itoa(versionLDS-1), nonce); err != nil { + if err := compareXDSRequest(fakeServer.XDSRequestChan, goodLDSRequest, strconv.Itoa(versionLDS-1), nonce, true); err != nil { t.Errorf("Failed to receive request: %v", err) } t.Logf("Bad response nacked") @@ -339,7 +351,7 @@ func (s) TestV2ClientAckNewWatchAfterCancel(t *testing.T) { // Start a CDS watch. v2c.AddWatch(xdsclient.ClusterResource, goodClusterName1) - if err := compareXDSRequest(fakeServer.XDSRequestChan, goodCDSRequest, "", ""); err != nil { + if err := compareXDSRequest(fakeServer.XDSRequestChan, goodCDSRequest, "", "", false); err != nil { t.Fatal(err) } t.Logf("FakeServer received %v request...", xdsclient.ClusterResource) @@ -356,12 +368,12 @@ func (s) TestV2ClientAckNewWatchAfterCancel(t *testing.T) { // Wait for a request with no resource names, because the only watch was // removed. emptyReq := &xdspb.DiscoveryRequest{Node: goodNodeProto, TypeUrl: version.V2ClusterURL} - if err := compareXDSRequest(fakeServer.XDSRequestChan, emptyReq, strconv.Itoa(versionCDS), nonce); err != nil { + if err := compareXDSRequest(fakeServer.XDSRequestChan, emptyReq, strconv.Itoa(versionCDS), nonce, false); err != nil { t.Fatalf("Failed to receive %v request: %v", xdsclient.ClusterResource, err) } v2c.AddWatch(xdsclient.ClusterResource, goodClusterName1) // Wait for a request with correct resource names and version. - if err := compareXDSRequest(fakeServer.XDSRequestChan, goodCDSRequest, strconv.Itoa(versionCDS), nonce); err != nil { + if err := compareXDSRequest(fakeServer.XDSRequestChan, goodCDSRequest, strconv.Itoa(versionCDS), nonce, false); err != nil { t.Fatalf("Failed to receive %v request: %v", xdsclient.ClusterResource, err) } versionCDS++ @@ -394,7 +406,7 @@ func (s) TestV2ClientAckCancelResponseRace(t *testing.T) { // Start a CDS watch. v2c.AddWatch(xdsclient.ClusterResource, goodClusterName1) - if err := compareXDSRequest(fakeServer.XDSRequestChan, goodCDSRequest, "", ""); err != nil { + if err := compareXDSRequest(fakeServer.XDSRequestChan, goodCDSRequest, "", "", false); err != nil { t.Fatalf("Failed to receive %v request: %v", xdsclient.ClusterResource, err) } t.Logf("FakeServer received %v request...", xdsclient.ClusterResource) @@ -410,7 +422,7 @@ func (s) TestV2ClientAckCancelResponseRace(t *testing.T) { // Wait for a request with no resource names, because the only watch was // removed. emptyReq := &xdspb.DiscoveryRequest{Node: goodNodeProto, TypeUrl: version.V2ClusterURL} - if err := compareXDSRequest(fakeServer.XDSRequestChan, emptyReq, strconv.Itoa(versionCDS), nonce); err != nil { + if err := compareXDSRequest(fakeServer.XDSRequestChan, emptyReq, strconv.Itoa(versionCDS), nonce, false); err != nil { t.Fatalf("Failed to receive %v request: %v", xdsclient.ClusterResource, err) } versionCDS++ @@ -440,7 +452,7 @@ func (s) TestV2ClientAckCancelResponseRace(t *testing.T) { // Start a new watch. The new watch should have the nonce from the response // above, and version from the first good response. v2c.AddWatch(xdsclient.ClusterResource, goodClusterName1) - if err := compareXDSRequest(fakeServer.XDSRequestChan, goodCDSRequest, strconv.Itoa(versionCDS-1), nonce); err != nil { + if err := compareXDSRequest(fakeServer.XDSRequestChan, goodCDSRequest, strconv.Itoa(versionCDS-1), nonce, false); err != nil { t.Fatalf("Failed to receive %v request: %v", xdsclient.ClusterResource, err) } diff --git a/xds/internal/client/v3/client.go b/xds/internal/client/v3/client.go index 328cd8b9cbe..5d8d7198ce2 100644 --- a/xds/internal/client/v3/client.go +++ b/xds/internal/client/v3/client.go @@ -24,7 +24,9 @@ import ( "fmt" "github.com/golang/protobuf/proto" + statuspb "google.golang.org/genproto/googleapis/rpc/status" "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/grpc/internal/grpclog" xdsclient "google.golang.org/grpc/xds/internal/client" "google.golang.org/grpc/xds/internal/client/load" @@ -106,7 +108,7 @@ func (v3c *client) NewStream(ctx context.Context) (grpc.ClientStream, error) { // - If this is an ack, version will be the version from the response. // - If this is a nack, version will be the previous acked version (from // versionMap). If there was no ack before, it will be empty. -func (v3c *client) SendRequest(s grpc.ClientStream, resourceNames []string, rType xdsclient.ResourceType, version, nonce string) error { +func (v3c *client) SendRequest(s grpc.ClientStream, resourceNames []string, rType xdsclient.ResourceType, version, nonce, errMsg string) error { stream, ok := s.(adsStream) if !ok { return fmt.Errorf("xds: Attempt to send request on unsupported stream type: %T", s) @@ -117,7 +119,11 @@ func (v3c *client) SendRequest(s grpc.ClientStream, resourceNames []string, rTyp ResourceNames: resourceNames, VersionInfo: version, ResponseNonce: nonce, - // TODO: populate ErrorDetails for nack. + } + if errMsg != "" { + req.ErrorDetail = &statuspb.Status{ + Code: int32(codes.InvalidArgument), Message: errMsg, + } } if err := stream.Send(req); err != nil { return fmt.Errorf("xds: stream.Send(%+v) failed: %v", req, err)