From ada5e8ecf1ab531d6d56ab10afc1dd90d9a2ed3f Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Wed, 21 Oct 2020 14:21:23 -0700 Subject: [PATCH 1/2] [xds_nack_error_details] xdsclient: populate error details for NACK --- xds/internal/client/client_xds.go | 10 +++--- xds/internal/client/transport_helper.go | 26 ++++++++------- xds/internal/client/v2/client.go | 10 ++++-- xds/internal/client/v2/client_ack_test.go | 40 +++++++++++++++-------- xds/internal/client/v3/client.go | 10 ++++-- 5 files changed, 61 insertions(+), 35 deletions(-) diff --git a/xds/internal/client/client_xds.go b/xds/internal/client/client_xds.go index e2a7b13f952..0acfd69cf89 100644 --- a/xds/internal/client/client_xds.go +++ b/xds/internal/client/client_xds.go @@ -47,7 +47,7 @@ func UnmarshalListener(resources []*anypb.Any, logger *grpclog.PrefixLogger) (ma update := make(map[string]ListenerUpdate) for _, r := range resources { if !IsListenerResource(r.GetTypeUrl()) { - return nil, fmt.Errorf("xds: unexpected resource type: %s in LDS response", r.GetTypeUrl()) + return nil, fmt.Errorf("xds: unexpected resource type: %q in LDS response", r.GetTypeUrl()) } lis := &v3listenerpb.Listener{} if err := proto.Unmarshal(r.GetValue(), lis); err != nil { @@ -71,7 +71,7 @@ func getRouteConfigNameFromListener(lis *v3listenerpb.Listener, logger *grpclog. } apiLisAny := lis.GetApiListener().GetApiListener() if !IsHTTPConnManagerResource(apiLisAny.GetTypeUrl()) { - return "", fmt.Errorf("xds: unexpected resource type: %s in LDS response", apiLisAny.GetTypeUrl()) + return "", fmt.Errorf("xds: unexpected resource type: %q in LDS response", apiLisAny.GetTypeUrl()) } apiLis := &v3httppb.HttpConnectionManager{} if err := proto.Unmarshal(apiLisAny.GetValue(), apiLis); err != nil { @@ -108,7 +108,7 @@ func UnmarshalRouteConfig(resources []*anypb.Any, logger *grpclog.PrefixLogger) update := make(map[string]RouteConfigUpdate) for _, r := range resources { if !IsRouteConfigResource(r.GetTypeUrl()) { - return nil, fmt.Errorf("xds: unexpected resource type: %s in RDS response", r.GetTypeUrl()) + return nil, fmt.Errorf("xds: unexpected resource type: %q in RDS response", r.GetTypeUrl()) } rc := &v3routepb.RouteConfiguration{} if err := proto.Unmarshal(r.GetValue(), rc); err != nil { @@ -361,7 +361,7 @@ func UnmarshalCluster(resources []*anypb.Any, logger *grpclog.PrefixLogger) (map update := make(map[string]ClusterUpdate) for _, r := range resources { if !IsClusterResource(r.GetTypeUrl()) { - return nil, fmt.Errorf("xds: unexpected resource type: %s in CDS response", r.GetTypeUrl()) + return nil, fmt.Errorf("xds: unexpected resource type: %q in CDS response", r.GetTypeUrl()) } cluster := &v3clusterpb.Cluster{} @@ -477,7 +477,7 @@ func UnmarshalEndpoints(resources []*anypb.Any, logger *grpclog.PrefixLogger) (m update := make(map[string]EndpointsUpdate) for _, r := range resources { if !IsEndpointsResource(r.GetTypeUrl()) { - return nil, fmt.Errorf("xds: unexpected resource type: %s in EDS response", r.GetTypeUrl()) + return nil, fmt.Errorf("xds: unexpected resource type: %q in EDS response", r.GetTypeUrl()) } cla := &v3endpointpb.ClusterLoadAssignment{} diff --git a/xds/internal/client/transport_helper.go b/xds/internal/client/transport_helper.go index 3ce1f8721b3..94bbd4f462e 100644 --- a/xds/internal/client/transport_helper.go +++ b/xds/internal/client/transport_helper.go @@ -51,7 +51,7 @@ type VersionedClient interface { // SendRequest constructs and sends out a DiscoveryRequest message specific // to the underlying transport protocol version. - SendRequest(s grpc.ClientStream, resourceNames []string, rType ResourceType, version string, nonce string) error + SendRequest(s grpc.ClientStream, resourceNames []string, rType ResourceType, version, nonce, errMsg string) error // RecvResponse uses the provided stream to receive a response specific to // the underlying transport protocol version. @@ -246,16 +246,16 @@ func (t *TransportHelper) send(ctx context.Context) { t.sendCh.Load() var ( - target []string - rType ResourceType - version, nonce string - send bool + target []string + rType ResourceType + version, nonce, errMsg string + send bool ) switch update := u.(type) { case *watchAction: target, rType, version, nonce = t.processWatchInfo(update) case *ackAction: - target, rType, version, nonce, send = t.processAckInfo(update, stream) + target, rType, version, nonce, errMsg, send = t.processAckInfo(update, stream) if !send { continue } @@ -267,7 +267,7 @@ func (t *TransportHelper) send(ctx context.Context) { // sending response back). continue } - if err := t.vClient.SendRequest(stream, target, rType, version, nonce); err != nil { + if err := t.vClient.SendRequest(stream, target, rType, version, nonce, errMsg); err != nil { t.logger.Warningf("ADS request for {target: %q, type: %v, version: %q, nonce: %q} failed: %v", target, rType, version, nonce, err) // send failed, clear the current stream. stream = nil @@ -292,7 +292,7 @@ func (t *TransportHelper) sendExisting(stream grpc.ClientStream) bool { t.nonceMap = make(map[ResourceType]string) for rType, s := range t.watchMap { - if err := t.vClient.SendRequest(stream, mapToSlice(s), rType, "", ""); err != nil { + if err := t.vClient.SendRequest(stream, mapToSlice(s), rType, "", "", ""); err != nil { t.logger.Errorf("ADS request failed: %v", err) return false } @@ -321,6 +321,7 @@ func (t *TransportHelper) recv(stream grpc.ClientStream) bool { rType: rType, version: "", nonce: nonce, + errMsg: err.Error(), stream: stream, }) t.logger.Warningf("Sending NACK for response type: %v, version: %v, nonce: %v, reason: %v", rType, version, nonce, err) @@ -387,6 +388,7 @@ type ackAction struct { rType ResourceType version string // NACK if version is an empty string. nonce string + errMsg string // Empty unless it's a NACK. // ACK/NACK are tagged with the stream it's for. When the stream is down, // all the ACK/NACK for this stream will be dropped, and the version/nonce // won't be updated. @@ -396,13 +398,13 @@ type ackAction struct { // processAckInfo pulls the fields needed by the ack request from a ackAction. // // If no active watch is found for this ack, it returns false for send. -func (t *TransportHelper) processAckInfo(ack *ackAction, stream grpc.ClientStream) (target []string, rType ResourceType, version, nonce string, send bool) { +func (t *TransportHelper) processAckInfo(ack *ackAction, stream grpc.ClientStream) (target []string, rType ResourceType, version, nonce, errMsg string, send bool) { if ack.stream != stream { // If ACK's stream isn't the current sending stream, this means the ACK // was pushed to queue before the old stream broke, and a new stream has // been started since. Return immediately here so we don't update the // nonce for the new stream. - return nil, UnknownResource, "", "", false + return nil, UnknownResource, "", "", "", false } rType = ack.rType @@ -422,7 +424,7 @@ func (t *TransportHelper) processAckInfo(ack *ackAction, stream grpc.ClientStrea // canceled while the ackAction is in queue), because there's no resource // name. And if we send a request with empty resource name list, the // server may treat it as a wild card and send us everything. - return nil, UnknownResource, "", "", false + return nil, UnknownResource, "", "", "", false } send = true target = mapToSlice(s) @@ -437,7 +439,7 @@ func (t *TransportHelper) processAckInfo(ack *ackAction, stream grpc.ClientStrea } else { t.versionMap[rType] = version } - return target, rType, version, nonce, send + return target, rType, version, nonce, ack.errMsg, send } // ReportLoad starts an LRS stream to report load data to the management server. diff --git a/xds/internal/client/v2/client.go b/xds/internal/client/v2/client.go index 674bba4798f..7b063ad4f55 100644 --- a/xds/internal/client/v2/client.go +++ b/xds/internal/client/v2/client.go @@ -25,6 +25,7 @@ import ( "github.com/golang/protobuf/proto" "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/grpc/internal/grpclog" xdsclient "google.golang.org/grpc/xds/internal/client" "google.golang.org/grpc/xds/internal/client/load" @@ -33,6 +34,7 @@ import ( v2xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" v2corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" v2adsgrpc "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2" + statuspb "google.golang.org/genproto/googleapis/rpc/status" ) func init() { @@ -106,7 +108,7 @@ func (v2c *client) NewStream(ctx context.Context) (grpc.ClientStream, error) { // - If this is an ack, version will be the version from the response. // - If this is a nack, version will be the previous acked version (from // versionMap). If there was no ack before, it will be empty. -func (v2c *client) SendRequest(s grpc.ClientStream, resourceNames []string, rType xdsclient.ResourceType, version, nonce string) error { +func (v2c *client) SendRequest(s grpc.ClientStream, resourceNames []string, rType xdsclient.ResourceType, version, nonce, errMsg string) error { stream, ok := s.(adsStream) if !ok { return fmt.Errorf("xds: Attempt to send request on unsupported stream type: %T", s) @@ -117,7 +119,11 @@ func (v2c *client) SendRequest(s grpc.ClientStream, resourceNames []string, rTyp ResourceNames: resourceNames, VersionInfo: version, ResponseNonce: nonce, - // TODO: populate ErrorDetails for nack. + } + if errMsg != "" { + req.ErrorDetail = &statuspb.Status{ + Code: int32(codes.InvalidArgument), Message: errMsg, + } } if err := stream.Send(req); err != nil { return fmt.Errorf("xds: stream.Send(%+v) failed: %v", req, err) diff --git a/xds/internal/client/v2/client_ack_test.go b/xds/internal/client/v2/client_ack_test.go index 2b38d3aa370..87437aa218c 100644 --- a/xds/internal/client/v2/client_ack_test.go +++ b/xds/internal/client/v2/client_ack_test.go @@ -29,6 +29,7 @@ import ( anypb "github.com/golang/protobuf/ptypes/any" "github.com/google/go-cmp/cmp" "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/grpc/internal/testutils" xdsclient "google.golang.org/grpc/xds/internal/client" "google.golang.org/grpc/xds/internal/testutils/fakeserver" @@ -73,7 +74,7 @@ func startXDSV2Client(t *testing.T, cc *grpc.ClientConn) (v2c *client, cbLDS, cb } // compareXDSRequest reads requests from channel, compare it with want. -func compareXDSRequest(ch *testutils.Channel, want *xdspb.DiscoveryRequest, ver, nonce string) error { +func compareXDSRequest(ch *testutils.Channel, want *xdspb.DiscoveryRequest, ver, nonce string, wantErr bool) error { ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() val, err := ch.Receive(ctx) @@ -84,11 +85,22 @@ func compareXDSRequest(ch *testutils.Channel, want *xdspb.DiscoveryRequest, ver, if req.Err != nil { return fmt.Errorf("unexpected error from request: %v", req.Err) } + + xdsReq := req.Req.(*xdspb.DiscoveryRequest) + if (xdsReq.ErrorDetail != nil) != wantErr { + return fmt.Errorf("received request with error details: %v, wantErr: %v", xdsReq.ErrorDetail, wantErr) + } + // All NACK request.ErrorDetails have hardcoded status code InvalidArguments. + if xdsReq.ErrorDetail != nil && xdsReq.ErrorDetail.Code != int32(codes.InvalidArgument) { + return fmt.Errorf("received request with error details: %v, want status with code: %v", xdsReq.ErrorDetail, codes.InvalidArgument) + } + + xdsReq.ErrorDetail = nil // Clear the error details field before comparing. wantClone := proto.Clone(want).(*xdspb.DiscoveryRequest) wantClone.VersionInfo = ver wantClone.ResponseNonce = nonce - if !cmp.Equal(req.Req, wantClone, cmp.Comparer(proto.Equal)) { - return fmt.Errorf("received request different from want, diff: %s", cmp.Diff(req.Req, wantClone)) + if !cmp.Equal(xdsReq, wantClone, cmp.Comparer(proto.Equal)) { + return fmt.Errorf("received request different from want, diff: %s", cmp.Diff(req.Req, wantClone, cmp.Comparer(proto.Equal))) } return nil } @@ -118,7 +130,7 @@ func startXDS(t *testing.T, rType xdsclient.ResourceType, v2c *client, reqChan * } v2c.AddWatch(rType, nameToWatch) - if err := compareXDSRequest(reqChan, req, preVersion, preNonce); err != nil { + if err := compareXDSRequest(reqChan, req, preVersion, preNonce, false); err != nil { t.Fatalf("Failed to receive %v request: %v", rType, err) } t.Logf("FakeServer received %v request...", rType) @@ -133,7 +145,7 @@ func sendGoodResp(t *testing.T, rType xdsclient.ResourceType, fakeServer *fakese nonce := sendXDSRespWithVersion(fakeServer.XDSResponseChan, goodResp, ver) t.Logf("Good %v response pushed to fakeServer...", rType) - if err := compareXDSRequest(fakeServer.XDSRequestChan, wantReq, strconv.Itoa(ver), nonce); err != nil { + if err := compareXDSRequest(fakeServer.XDSRequestChan, wantReq, strconv.Itoa(ver), nonce, false); err != nil { return "", fmt.Errorf("failed to receive %v request: %v", rType, err) } t.Logf("Good %v response acked", rType) @@ -168,7 +180,7 @@ func sendBadResp(t *testing.T, rType xdsclient.ResourceType, fakeServer *fakeser TypeUrl: typeURL, }, ver) t.Logf("Bad %v response pushed to fakeServer...", rType) - if err := compareXDSRequest(fakeServer.XDSRequestChan, wantReq, strconv.Itoa(ver-1), nonce); err != nil { + if err := compareXDSRequest(fakeServer.XDSRequestChan, wantReq, strconv.Itoa(ver-1), nonce, true); err != nil { return fmt.Errorf("failed to receive %v request: %v", rType, err) } t.Logf("Bad %v response nacked", rType) @@ -274,7 +286,7 @@ func (s) TestV2ClientAckFirstIsNack(t *testing.T) { // The expected version string is an empty string, because this is the first // response, and it's nacked (so there's no previous ack version). - if err := compareXDSRequest(fakeServer.XDSRequestChan, goodLDSRequest, "", nonce); err != nil { + if err := compareXDSRequest(fakeServer.XDSRequestChan, goodLDSRequest, "", nonce, true); err != nil { t.Errorf("Failed to receive request: %v", err) } t.Logf("Bad response nacked") @@ -314,7 +326,7 @@ func (s) TestV2ClientAckNackAfterNewWatch(t *testing.T) { t.Logf("Bad response pushed to fakeServer...") // The expected version string is the previous acked version. - if err := compareXDSRequest(fakeServer.XDSRequestChan, goodLDSRequest, strconv.Itoa(versionLDS-1), nonce); err != nil { + if err := compareXDSRequest(fakeServer.XDSRequestChan, goodLDSRequest, strconv.Itoa(versionLDS-1), nonce, true); err != nil { t.Errorf("Failed to receive request: %v", err) } t.Logf("Bad response nacked") @@ -339,7 +351,7 @@ func (s) TestV2ClientAckNewWatchAfterCancel(t *testing.T) { // Start a CDS watch. v2c.AddWatch(xdsclient.ClusterResource, goodClusterName1) - if err := compareXDSRequest(fakeServer.XDSRequestChan, goodCDSRequest, "", ""); err != nil { + if err := compareXDSRequest(fakeServer.XDSRequestChan, goodCDSRequest, "", "", false); err != nil { t.Fatal(err) } t.Logf("FakeServer received %v request...", xdsclient.ClusterResource) @@ -356,12 +368,12 @@ func (s) TestV2ClientAckNewWatchAfterCancel(t *testing.T) { // Wait for a request with no resource names, because the only watch was // removed. emptyReq := &xdspb.DiscoveryRequest{Node: goodNodeProto, TypeUrl: version.V2ClusterURL} - if err := compareXDSRequest(fakeServer.XDSRequestChan, emptyReq, strconv.Itoa(versionCDS), nonce); err != nil { + if err := compareXDSRequest(fakeServer.XDSRequestChan, emptyReq, strconv.Itoa(versionCDS), nonce, false); err != nil { t.Fatalf("Failed to receive %v request: %v", xdsclient.ClusterResource, err) } v2c.AddWatch(xdsclient.ClusterResource, goodClusterName1) // Wait for a request with correct resource names and version. - if err := compareXDSRequest(fakeServer.XDSRequestChan, goodCDSRequest, strconv.Itoa(versionCDS), nonce); err != nil { + if err := compareXDSRequest(fakeServer.XDSRequestChan, goodCDSRequest, strconv.Itoa(versionCDS), nonce, false); err != nil { t.Fatalf("Failed to receive %v request: %v", xdsclient.ClusterResource, err) } versionCDS++ @@ -394,7 +406,7 @@ func (s) TestV2ClientAckCancelResponseRace(t *testing.T) { // Start a CDS watch. v2c.AddWatch(xdsclient.ClusterResource, goodClusterName1) - if err := compareXDSRequest(fakeServer.XDSRequestChan, goodCDSRequest, "", ""); err != nil { + if err := compareXDSRequest(fakeServer.XDSRequestChan, goodCDSRequest, "", "", false); err != nil { t.Fatalf("Failed to receive %v request: %v", xdsclient.ClusterResource, err) } t.Logf("FakeServer received %v request...", xdsclient.ClusterResource) @@ -410,7 +422,7 @@ func (s) TestV2ClientAckCancelResponseRace(t *testing.T) { // Wait for a request with no resource names, because the only watch was // removed. emptyReq := &xdspb.DiscoveryRequest{Node: goodNodeProto, TypeUrl: version.V2ClusterURL} - if err := compareXDSRequest(fakeServer.XDSRequestChan, emptyReq, strconv.Itoa(versionCDS), nonce); err != nil { + if err := compareXDSRequest(fakeServer.XDSRequestChan, emptyReq, strconv.Itoa(versionCDS), nonce, false); err != nil { t.Fatalf("Failed to receive %v request: %v", xdsclient.ClusterResource, err) } versionCDS++ @@ -440,7 +452,7 @@ func (s) TestV2ClientAckCancelResponseRace(t *testing.T) { // Start a new watch. The new watch should have the nonce from the response // above, and version from the first good response. v2c.AddWatch(xdsclient.ClusterResource, goodClusterName1) - if err := compareXDSRequest(fakeServer.XDSRequestChan, goodCDSRequest, strconv.Itoa(versionCDS-1), nonce); err != nil { + if err := compareXDSRequest(fakeServer.XDSRequestChan, goodCDSRequest, strconv.Itoa(versionCDS-1), nonce, false); err != nil { t.Fatalf("Failed to receive %v request: %v", xdsclient.ClusterResource, err) } diff --git a/xds/internal/client/v3/client.go b/xds/internal/client/v3/client.go index 328cd8b9cbe..5d8d7198ce2 100644 --- a/xds/internal/client/v3/client.go +++ b/xds/internal/client/v3/client.go @@ -24,7 +24,9 @@ import ( "fmt" "github.com/golang/protobuf/proto" + statuspb "google.golang.org/genproto/googleapis/rpc/status" "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/grpc/internal/grpclog" xdsclient "google.golang.org/grpc/xds/internal/client" "google.golang.org/grpc/xds/internal/client/load" @@ -106,7 +108,7 @@ func (v3c *client) NewStream(ctx context.Context) (grpc.ClientStream, error) { // - If this is an ack, version will be the version from the response. // - If this is a nack, version will be the previous acked version (from // versionMap). If there was no ack before, it will be empty. -func (v3c *client) SendRequest(s grpc.ClientStream, resourceNames []string, rType xdsclient.ResourceType, version, nonce string) error { +func (v3c *client) SendRequest(s grpc.ClientStream, resourceNames []string, rType xdsclient.ResourceType, version, nonce, errMsg string) error { stream, ok := s.(adsStream) if !ok { return fmt.Errorf("xds: Attempt to send request on unsupported stream type: %T", s) @@ -117,7 +119,11 @@ func (v3c *client) SendRequest(s grpc.ClientStream, resourceNames []string, rTyp ResourceNames: resourceNames, VersionInfo: version, ResponseNonce: nonce, - // TODO: populate ErrorDetails for nack. + } + if errMsg != "" { + req.ErrorDetail = &statuspb.Status{ + Code: int32(codes.InvalidArgument), Message: errMsg, + } } if err := stream.Send(req); err != nil { return fmt.Errorf("xds: stream.Send(%+v) failed: %v", req, err) From 607604c4a69795dea08a20e99beb2358efb11a6c Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Wed, 21 Oct 2020 16:55:35 -0700 Subject: [PATCH 2/2] [xds_nack_error_details] not return errMsg --- xds/internal/client/transport_helper.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/xds/internal/client/transport_helper.go b/xds/internal/client/transport_helper.go index 94bbd4f462e..607f26fd5e1 100644 --- a/xds/internal/client/transport_helper.go +++ b/xds/internal/client/transport_helper.go @@ -255,10 +255,11 @@ func (t *TransportHelper) send(ctx context.Context) { case *watchAction: target, rType, version, nonce = t.processWatchInfo(update) case *ackAction: - target, rType, version, nonce, errMsg, send = t.processAckInfo(update, stream) + target, rType, version, nonce, send = t.processAckInfo(update, stream) if !send { continue } + errMsg = update.errMsg } if stream == nil { // There's no stream yet. Skip the request. This request @@ -398,13 +399,13 @@ type ackAction struct { // processAckInfo pulls the fields needed by the ack request from a ackAction. // // If no active watch is found for this ack, it returns false for send. -func (t *TransportHelper) processAckInfo(ack *ackAction, stream grpc.ClientStream) (target []string, rType ResourceType, version, nonce, errMsg string, send bool) { +func (t *TransportHelper) processAckInfo(ack *ackAction, stream grpc.ClientStream) (target []string, rType ResourceType, version, nonce string, send bool) { if ack.stream != stream { // If ACK's stream isn't the current sending stream, this means the ACK // was pushed to queue before the old stream broke, and a new stream has // been started since. Return immediately here so we don't update the // nonce for the new stream. - return nil, UnknownResource, "", "", "", false + return nil, UnknownResource, "", "", false } rType = ack.rType @@ -424,7 +425,7 @@ func (t *TransportHelper) processAckInfo(ack *ackAction, stream grpc.ClientStrea // canceled while the ackAction is in queue), because there's no resource // name. And if we send a request with empty resource name list, the // server may treat it as a wild card and send us everything. - return nil, UnknownResource, "", "", "", false + return nil, UnknownResource, "", "", false } send = true target = mapToSlice(s) @@ -439,7 +440,7 @@ func (t *TransportHelper) processAckInfo(ack *ackAction, stream grpc.ClientStrea } else { t.versionMap[rType] = version } - return target, rType, version, nonce, ack.errMsg, send + return target, rType, version, nonce, send } // ReportLoad starts an LRS stream to report load data to the management server.