Skip to content

Commit

Permalink
xdsclient: populate error details for NACK (#3975)
Browse files Browse the repository at this point in the history
  • Loading branch information
menghanl committed Oct 22, 2020
1 parent 0e8f1cd commit 37b72f9
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 30 deletions.
10 changes: 5 additions & 5 deletions xds/internal/client/client_xds.go
Expand Up @@ -47,7 +47,7 @@ func UnmarshalListener(resources []*anypb.Any, logger *grpclog.PrefixLogger) (ma
update := make(map[string]ListenerUpdate)
for _, r := range resources {
if !IsListenerResource(r.GetTypeUrl()) {
return nil, fmt.Errorf("xds: unexpected resource type: %s in LDS response", r.GetTypeUrl())
return nil, fmt.Errorf("xds: unexpected resource type: %q in LDS response", r.GetTypeUrl())
}
lis := &v3listenerpb.Listener{}
if err := proto.Unmarshal(r.GetValue(), lis); err != nil {
Expand All @@ -71,7 +71,7 @@ func getRouteConfigNameFromListener(lis *v3listenerpb.Listener, logger *grpclog.
}
apiLisAny := lis.GetApiListener().GetApiListener()
if !IsHTTPConnManagerResource(apiLisAny.GetTypeUrl()) {
return "", fmt.Errorf("xds: unexpected resource type: %s in LDS response", apiLisAny.GetTypeUrl())
return "", fmt.Errorf("xds: unexpected resource type: %q in LDS response", apiLisAny.GetTypeUrl())
}
apiLis := &v3httppb.HttpConnectionManager{}
if err := proto.Unmarshal(apiLisAny.GetValue(), apiLis); err != nil {
Expand Down Expand Up @@ -108,7 +108,7 @@ func UnmarshalRouteConfig(resources []*anypb.Any, logger *grpclog.PrefixLogger)
update := make(map[string]RouteConfigUpdate)
for _, r := range resources {
if !IsRouteConfigResource(r.GetTypeUrl()) {
return nil, fmt.Errorf("xds: unexpected resource type: %s in RDS response", r.GetTypeUrl())
return nil, fmt.Errorf("xds: unexpected resource type: %q in RDS response", r.GetTypeUrl())
}
rc := &v3routepb.RouteConfiguration{}
if err := proto.Unmarshal(r.GetValue(), rc); err != nil {
Expand Down Expand Up @@ -361,7 +361,7 @@ func UnmarshalCluster(resources []*anypb.Any, logger *grpclog.PrefixLogger) (map
update := make(map[string]ClusterUpdate)
for _, r := range resources {
if !IsClusterResource(r.GetTypeUrl()) {
return nil, fmt.Errorf("xds: unexpected resource type: %s in CDS response", r.GetTypeUrl())
return nil, fmt.Errorf("xds: unexpected resource type: %q in CDS response", r.GetTypeUrl())
}

cluster := &v3clusterpb.Cluster{}
Expand Down Expand Up @@ -477,7 +477,7 @@ func UnmarshalEndpoints(resources []*anypb.Any, logger *grpclog.PrefixLogger) (m
update := make(map[string]EndpointsUpdate)
for _, r := range resources {
if !IsEndpointsResource(r.GetTypeUrl()) {
return nil, fmt.Errorf("xds: unexpected resource type: %s in EDS response", r.GetTypeUrl())
return nil, fmt.Errorf("xds: unexpected resource type: %q in EDS response", r.GetTypeUrl())
}

cla := &v3endpointpb.ClusterLoadAssignment{}
Expand Down
17 changes: 10 additions & 7 deletions xds/internal/client/transport_helper.go
Expand Up @@ -51,7 +51,7 @@ type VersionedClient interface {

// SendRequest constructs and sends out a DiscoveryRequest message specific
// to the underlying transport protocol version.
SendRequest(s grpc.ClientStream, resourceNames []string, rType ResourceType, version string, nonce string) error
SendRequest(s grpc.ClientStream, resourceNames []string, rType ResourceType, version, nonce, errMsg string) error

// RecvResponse uses the provided stream to receive a response specific to
// the underlying transport protocol version.
Expand Down Expand Up @@ -246,10 +246,10 @@ func (t *TransportHelper) send(ctx context.Context) {
t.sendCh.Load()

var (
target []string
rType ResourceType
version, nonce string
send bool
target []string
rType ResourceType
version, nonce, errMsg string
send bool
)
switch update := u.(type) {
case *watchAction:
Expand All @@ -259,6 +259,7 @@ func (t *TransportHelper) send(ctx context.Context) {
if !send {
continue
}
errMsg = update.errMsg
}
if stream == nil {
// There's no stream yet. Skip the request. This request
Expand All @@ -267,7 +268,7 @@ func (t *TransportHelper) send(ctx context.Context) {
// sending response back).
continue
}
if err := t.vClient.SendRequest(stream, target, rType, version, nonce); err != nil {
if err := t.vClient.SendRequest(stream, target, rType, version, nonce, errMsg); err != nil {
t.logger.Warningf("ADS request for {target: %q, type: %v, version: %q, nonce: %q} failed: %v", target, rType, version, nonce, err)
// send failed, clear the current stream.
stream = nil
Expand All @@ -292,7 +293,7 @@ func (t *TransportHelper) sendExisting(stream grpc.ClientStream) bool {
t.nonceMap = make(map[ResourceType]string)

for rType, s := range t.watchMap {
if err := t.vClient.SendRequest(stream, mapToSlice(s), rType, "", ""); err != nil {
if err := t.vClient.SendRequest(stream, mapToSlice(s), rType, "", "", ""); err != nil {
t.logger.Errorf("ADS request failed: %v", err)
return false
}
Expand Down Expand Up @@ -321,6 +322,7 @@ func (t *TransportHelper) recv(stream grpc.ClientStream) bool {
rType: rType,
version: "",
nonce: nonce,
errMsg: err.Error(),
stream: stream,
})
t.logger.Warningf("Sending NACK for response type: %v, version: %v, nonce: %v, reason: %v", rType, version, nonce, err)
Expand Down Expand Up @@ -387,6 +389,7 @@ type ackAction struct {
rType ResourceType
version string // NACK if version is an empty string.
nonce string
errMsg string // Empty unless it's a NACK.
// ACK/NACK are tagged with the stream it's for. When the stream is down,
// all the ACK/NACK for this stream will be dropped, and the version/nonce
// won't be updated.
Expand Down
10 changes: 8 additions & 2 deletions xds/internal/client/v2/client.go
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/golang/protobuf/proto"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/internal/grpclog"
xdsclient "google.golang.org/grpc/xds/internal/client"
"google.golang.org/grpc/xds/internal/client/load"
Expand All @@ -33,6 +34,7 @@ import (
v2xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
v2corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
v2adsgrpc "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2"
statuspb "google.golang.org/genproto/googleapis/rpc/status"
)

func init() {
Expand Down Expand Up @@ -106,7 +108,7 @@ func (v2c *client) NewStream(ctx context.Context) (grpc.ClientStream, error) {
// - If this is an ack, version will be the version from the response.
// - If this is a nack, version will be the previous acked version (from
// versionMap). If there was no ack before, it will be empty.
func (v2c *client) SendRequest(s grpc.ClientStream, resourceNames []string, rType xdsclient.ResourceType, version, nonce string) error {
func (v2c *client) SendRequest(s grpc.ClientStream, resourceNames []string, rType xdsclient.ResourceType, version, nonce, errMsg string) error {
stream, ok := s.(adsStream)
if !ok {
return fmt.Errorf("xds: Attempt to send request on unsupported stream type: %T", s)
Expand All @@ -117,7 +119,11 @@ func (v2c *client) SendRequest(s grpc.ClientStream, resourceNames []string, rTyp
ResourceNames: resourceNames,
VersionInfo: version,
ResponseNonce: nonce,
// TODO: populate ErrorDetails for nack.
}
if errMsg != "" {
req.ErrorDetail = &statuspb.Status{
Code: int32(codes.InvalidArgument), Message: errMsg,
}
}
if err := stream.Send(req); err != nil {
return fmt.Errorf("xds: stream.Send(%+v) failed: %v", req, err)
Expand Down
40 changes: 26 additions & 14 deletions xds/internal/client/v2/client_ack_test.go
Expand Up @@ -29,6 +29,7 @@ import (
anypb "github.com/golang/protobuf/ptypes/any"
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/internal/testutils"
xdsclient "google.golang.org/grpc/xds/internal/client"
"google.golang.org/grpc/xds/internal/testutils/fakeserver"
Expand Down Expand Up @@ -73,7 +74,7 @@ func startXDSV2Client(t *testing.T, cc *grpc.ClientConn) (v2c *client, cbLDS, cb
}

// compareXDSRequest reads requests from channel, compare it with want.
func compareXDSRequest(ch *testutils.Channel, want *xdspb.DiscoveryRequest, ver, nonce string) error {
func compareXDSRequest(ch *testutils.Channel, want *xdspb.DiscoveryRequest, ver, nonce string, wantErr bool) error {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
val, err := ch.Receive(ctx)
Expand All @@ -84,11 +85,22 @@ func compareXDSRequest(ch *testutils.Channel, want *xdspb.DiscoveryRequest, ver,
if req.Err != nil {
return fmt.Errorf("unexpected error from request: %v", req.Err)
}

xdsReq := req.Req.(*xdspb.DiscoveryRequest)
if (xdsReq.ErrorDetail != nil) != wantErr {
return fmt.Errorf("received request with error details: %v, wantErr: %v", xdsReq.ErrorDetail, wantErr)
}
// All NACK request.ErrorDetails have hardcoded status code InvalidArguments.
if xdsReq.ErrorDetail != nil && xdsReq.ErrorDetail.Code != int32(codes.InvalidArgument) {
return fmt.Errorf("received request with error details: %v, want status with code: %v", xdsReq.ErrorDetail, codes.InvalidArgument)
}

xdsReq.ErrorDetail = nil // Clear the error details field before comparing.
wantClone := proto.Clone(want).(*xdspb.DiscoveryRequest)
wantClone.VersionInfo = ver
wantClone.ResponseNonce = nonce
if !cmp.Equal(req.Req, wantClone, cmp.Comparer(proto.Equal)) {
return fmt.Errorf("received request different from want, diff: %s", cmp.Diff(req.Req, wantClone))
if !cmp.Equal(xdsReq, wantClone, cmp.Comparer(proto.Equal)) {
return fmt.Errorf("received request different from want, diff: %s", cmp.Diff(req.Req, wantClone, cmp.Comparer(proto.Equal)))
}
return nil
}
Expand Down Expand Up @@ -118,7 +130,7 @@ func startXDS(t *testing.T, rType xdsclient.ResourceType, v2c *client, reqChan *
}
v2c.AddWatch(rType, nameToWatch)

if err := compareXDSRequest(reqChan, req, preVersion, preNonce); err != nil {
if err := compareXDSRequest(reqChan, req, preVersion, preNonce, false); err != nil {
t.Fatalf("Failed to receive %v request: %v", rType, err)
}
t.Logf("FakeServer received %v request...", rType)
Expand All @@ -133,7 +145,7 @@ func sendGoodResp(t *testing.T, rType xdsclient.ResourceType, fakeServer *fakese
nonce := sendXDSRespWithVersion(fakeServer.XDSResponseChan, goodResp, ver)
t.Logf("Good %v response pushed to fakeServer...", rType)

if err := compareXDSRequest(fakeServer.XDSRequestChan, wantReq, strconv.Itoa(ver), nonce); err != nil {
if err := compareXDSRequest(fakeServer.XDSRequestChan, wantReq, strconv.Itoa(ver), nonce, false); err != nil {
return "", fmt.Errorf("failed to receive %v request: %v", rType, err)
}
t.Logf("Good %v response acked", rType)
Expand Down Expand Up @@ -168,7 +180,7 @@ func sendBadResp(t *testing.T, rType xdsclient.ResourceType, fakeServer *fakeser
TypeUrl: typeURL,
}, ver)
t.Logf("Bad %v response pushed to fakeServer...", rType)
if err := compareXDSRequest(fakeServer.XDSRequestChan, wantReq, strconv.Itoa(ver-1), nonce); err != nil {
if err := compareXDSRequest(fakeServer.XDSRequestChan, wantReq, strconv.Itoa(ver-1), nonce, true); err != nil {
return fmt.Errorf("failed to receive %v request: %v", rType, err)
}
t.Logf("Bad %v response nacked", rType)
Expand Down Expand Up @@ -274,7 +286,7 @@ func (s) TestV2ClientAckFirstIsNack(t *testing.T) {

// The expected version string is an empty string, because this is the first
// response, and it's nacked (so there's no previous ack version).
if err := compareXDSRequest(fakeServer.XDSRequestChan, goodLDSRequest, "", nonce); err != nil {
if err := compareXDSRequest(fakeServer.XDSRequestChan, goodLDSRequest, "", nonce, true); err != nil {
t.Errorf("Failed to receive request: %v", err)
}
t.Logf("Bad response nacked")
Expand Down Expand Up @@ -314,7 +326,7 @@ func (s) TestV2ClientAckNackAfterNewWatch(t *testing.T) {
t.Logf("Bad response pushed to fakeServer...")

// The expected version string is the previous acked version.
if err := compareXDSRequest(fakeServer.XDSRequestChan, goodLDSRequest, strconv.Itoa(versionLDS-1), nonce); err != nil {
if err := compareXDSRequest(fakeServer.XDSRequestChan, goodLDSRequest, strconv.Itoa(versionLDS-1), nonce, true); err != nil {
t.Errorf("Failed to receive request: %v", err)
}
t.Logf("Bad response nacked")
Expand All @@ -339,7 +351,7 @@ func (s) TestV2ClientAckNewWatchAfterCancel(t *testing.T) {

// Start a CDS watch.
v2c.AddWatch(xdsclient.ClusterResource, goodClusterName1)
if err := compareXDSRequest(fakeServer.XDSRequestChan, goodCDSRequest, "", ""); err != nil {
if err := compareXDSRequest(fakeServer.XDSRequestChan, goodCDSRequest, "", "", false); err != nil {
t.Fatal(err)
}
t.Logf("FakeServer received %v request...", xdsclient.ClusterResource)
Expand All @@ -356,12 +368,12 @@ func (s) TestV2ClientAckNewWatchAfterCancel(t *testing.T) {
// Wait for a request with no resource names, because the only watch was
// removed.
emptyReq := &xdspb.DiscoveryRequest{Node: goodNodeProto, TypeUrl: version.V2ClusterURL}
if err := compareXDSRequest(fakeServer.XDSRequestChan, emptyReq, strconv.Itoa(versionCDS), nonce); err != nil {
if err := compareXDSRequest(fakeServer.XDSRequestChan, emptyReq, strconv.Itoa(versionCDS), nonce, false); err != nil {
t.Fatalf("Failed to receive %v request: %v", xdsclient.ClusterResource, err)
}
v2c.AddWatch(xdsclient.ClusterResource, goodClusterName1)
// Wait for a request with correct resource names and version.
if err := compareXDSRequest(fakeServer.XDSRequestChan, goodCDSRequest, strconv.Itoa(versionCDS), nonce); err != nil {
if err := compareXDSRequest(fakeServer.XDSRequestChan, goodCDSRequest, strconv.Itoa(versionCDS), nonce, false); err != nil {
t.Fatalf("Failed to receive %v request: %v", xdsclient.ClusterResource, err)
}
versionCDS++
Expand Down Expand Up @@ -394,7 +406,7 @@ func (s) TestV2ClientAckCancelResponseRace(t *testing.T) {

// Start a CDS watch.
v2c.AddWatch(xdsclient.ClusterResource, goodClusterName1)
if err := compareXDSRequest(fakeServer.XDSRequestChan, goodCDSRequest, "", ""); err != nil {
if err := compareXDSRequest(fakeServer.XDSRequestChan, goodCDSRequest, "", "", false); err != nil {
t.Fatalf("Failed to receive %v request: %v", xdsclient.ClusterResource, err)
}
t.Logf("FakeServer received %v request...", xdsclient.ClusterResource)
Expand All @@ -410,7 +422,7 @@ func (s) TestV2ClientAckCancelResponseRace(t *testing.T) {
// Wait for a request with no resource names, because the only watch was
// removed.
emptyReq := &xdspb.DiscoveryRequest{Node: goodNodeProto, TypeUrl: version.V2ClusterURL}
if err := compareXDSRequest(fakeServer.XDSRequestChan, emptyReq, strconv.Itoa(versionCDS), nonce); err != nil {
if err := compareXDSRequest(fakeServer.XDSRequestChan, emptyReq, strconv.Itoa(versionCDS), nonce, false); err != nil {
t.Fatalf("Failed to receive %v request: %v", xdsclient.ClusterResource, err)
}
versionCDS++
Expand Down Expand Up @@ -440,7 +452,7 @@ func (s) TestV2ClientAckCancelResponseRace(t *testing.T) {
// Start a new watch. The new watch should have the nonce from the response
// above, and version from the first good response.
v2c.AddWatch(xdsclient.ClusterResource, goodClusterName1)
if err := compareXDSRequest(fakeServer.XDSRequestChan, goodCDSRequest, strconv.Itoa(versionCDS-1), nonce); err != nil {
if err := compareXDSRequest(fakeServer.XDSRequestChan, goodCDSRequest, strconv.Itoa(versionCDS-1), nonce, false); err != nil {
t.Fatalf("Failed to receive %v request: %v", xdsclient.ClusterResource, err)
}

Expand Down
10 changes: 8 additions & 2 deletions xds/internal/client/v3/client.go
Expand Up @@ -24,7 +24,9 @@ import (
"fmt"

"github.com/golang/protobuf/proto"
statuspb "google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/internal/grpclog"
xdsclient "google.golang.org/grpc/xds/internal/client"
"google.golang.org/grpc/xds/internal/client/load"
Expand Down Expand Up @@ -106,7 +108,7 @@ func (v3c *client) NewStream(ctx context.Context) (grpc.ClientStream, error) {
// - If this is an ack, version will be the version from the response.
// - If this is a nack, version will be the previous acked version (from
// versionMap). If there was no ack before, it will be empty.
func (v3c *client) SendRequest(s grpc.ClientStream, resourceNames []string, rType xdsclient.ResourceType, version, nonce string) error {
func (v3c *client) SendRequest(s grpc.ClientStream, resourceNames []string, rType xdsclient.ResourceType, version, nonce, errMsg string) error {
stream, ok := s.(adsStream)
if !ok {
return fmt.Errorf("xds: Attempt to send request on unsupported stream type: %T", s)
Expand All @@ -117,7 +119,11 @@ func (v3c *client) SendRequest(s grpc.ClientStream, resourceNames []string, rTyp
ResourceNames: resourceNames,
VersionInfo: version,
ResponseNonce: nonce,
// TODO: populate ErrorDetails for nack.
}
if errMsg != "" {
req.ErrorDetail = &statuspb.Status{
Code: int32(codes.InvalidArgument), Message: errMsg,
}
}
if err := stream.Send(req); err != nil {
return fmt.Errorf("xds: stream.Send(%+v) failed: %v", req, err)
Expand Down

0 comments on commit 37b72f9

Please sign in to comment.