Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xdsclient: populate error details for NACK #3975

Merged
merged 2 commits into from Oct 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 5 additions & 5 deletions xds/internal/client/client_xds.go
Expand Up @@ -47,7 +47,7 @@ func UnmarshalListener(resources []*anypb.Any, logger *grpclog.PrefixLogger) (ma
update := make(map[string]ListenerUpdate)
for _, r := range resources {
if !IsListenerResource(r.GetTypeUrl()) {
return nil, fmt.Errorf("xds: unexpected resource type: %s in LDS response", r.GetTypeUrl())
return nil, fmt.Errorf("xds: unexpected resource type: %q in LDS response", r.GetTypeUrl())
}
lis := &v3listenerpb.Listener{}
if err := proto.Unmarshal(r.GetValue(), lis); err != nil {
Expand All @@ -71,7 +71,7 @@ func getRouteConfigNameFromListener(lis *v3listenerpb.Listener, logger *grpclog.
}
apiLisAny := lis.GetApiListener().GetApiListener()
if !IsHTTPConnManagerResource(apiLisAny.GetTypeUrl()) {
return "", fmt.Errorf("xds: unexpected resource type: %s in LDS response", apiLisAny.GetTypeUrl())
return "", fmt.Errorf("xds: unexpected resource type: %q in LDS response", apiLisAny.GetTypeUrl())
}
apiLis := &v3httppb.HttpConnectionManager{}
if err := proto.Unmarshal(apiLisAny.GetValue(), apiLis); err != nil {
Expand Down Expand Up @@ -108,7 +108,7 @@ func UnmarshalRouteConfig(resources []*anypb.Any, logger *grpclog.PrefixLogger)
update := make(map[string]RouteConfigUpdate)
for _, r := range resources {
if !IsRouteConfigResource(r.GetTypeUrl()) {
return nil, fmt.Errorf("xds: unexpected resource type: %s in RDS response", r.GetTypeUrl())
return nil, fmt.Errorf("xds: unexpected resource type: %q in RDS response", r.GetTypeUrl())
}
rc := &v3routepb.RouteConfiguration{}
if err := proto.Unmarshal(r.GetValue(), rc); err != nil {
Expand Down Expand Up @@ -361,7 +361,7 @@ func UnmarshalCluster(resources []*anypb.Any, logger *grpclog.PrefixLogger) (map
update := make(map[string]ClusterUpdate)
for _, r := range resources {
if !IsClusterResource(r.GetTypeUrl()) {
return nil, fmt.Errorf("xds: unexpected resource type: %s in CDS response", r.GetTypeUrl())
return nil, fmt.Errorf("xds: unexpected resource type: %q in CDS response", r.GetTypeUrl())
}

cluster := &v3clusterpb.Cluster{}
Expand Down Expand Up @@ -477,7 +477,7 @@ func UnmarshalEndpoints(resources []*anypb.Any, logger *grpclog.PrefixLogger) (m
update := make(map[string]EndpointsUpdate)
for _, r := range resources {
if !IsEndpointsResource(r.GetTypeUrl()) {
return nil, fmt.Errorf("xds: unexpected resource type: %s in EDS response", r.GetTypeUrl())
return nil, fmt.Errorf("xds: unexpected resource type: %q in EDS response", r.GetTypeUrl())
}

cla := &v3endpointpb.ClusterLoadAssignment{}
Expand Down
17 changes: 10 additions & 7 deletions xds/internal/client/transport_helper.go
Expand Up @@ -51,7 +51,7 @@ type VersionedClient interface {

// SendRequest constructs and sends out a DiscoveryRequest message specific
// to the underlying transport protocol version.
SendRequest(s grpc.ClientStream, resourceNames []string, rType ResourceType, version string, nonce string) error
SendRequest(s grpc.ClientStream, resourceNames []string, rType ResourceType, version, nonce, errMsg string) error

// RecvResponse uses the provided stream to receive a response specific to
// the underlying transport protocol version.
Expand Down Expand Up @@ -246,10 +246,10 @@ func (t *TransportHelper) send(ctx context.Context) {
t.sendCh.Load()

var (
target []string
rType ResourceType
version, nonce string
send bool
target []string
rType ResourceType
version, nonce, errMsg string
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: We might as well get rid of errMsg and use update.errMsg directly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a switch: case. Only ackAction has the errMsg field.

send bool
)
switch update := u.(type) {
case *watchAction:
Expand All @@ -259,6 +259,7 @@ func (t *TransportHelper) send(ctx context.Context) {
if !send {
continue
}
errMsg = update.errMsg
}
if stream == nil {
// There's no stream yet. Skip the request. This request
Expand All @@ -267,7 +268,7 @@ func (t *TransportHelper) send(ctx context.Context) {
// sending response back).
continue
}
if err := t.vClient.SendRequest(stream, target, rType, version, nonce); err != nil {
if err := t.vClient.SendRequest(stream, target, rType, version, nonce, errMsg); err != nil {
t.logger.Warningf("ADS request for {target: %q, type: %v, version: %q, nonce: %q} failed: %v", target, rType, version, nonce, err)
// send failed, clear the current stream.
stream = nil
Expand All @@ -292,7 +293,7 @@ func (t *TransportHelper) sendExisting(stream grpc.ClientStream) bool {
t.nonceMap = make(map[ResourceType]string)

for rType, s := range t.watchMap {
if err := t.vClient.SendRequest(stream, mapToSlice(s), rType, "", ""); err != nil {
if err := t.vClient.SendRequest(stream, mapToSlice(s), rType, "", "", ""); err != nil {
t.logger.Errorf("ADS request failed: %v", err)
return false
}
Expand Down Expand Up @@ -321,6 +322,7 @@ func (t *TransportHelper) recv(stream grpc.ClientStream) bool {
rType: rType,
version: "",
nonce: nonce,
errMsg: err.Error(),
stream: stream,
})
t.logger.Warningf("Sending NACK for response type: %v, version: %v, nonce: %v, reason: %v", rType, version, nonce, err)
Expand Down Expand Up @@ -387,6 +389,7 @@ type ackAction struct {
rType ResourceType
version string // NACK if version is an empty string.
nonce string
errMsg string // Empty unless it's a NACK.
// ACK/NACK are tagged with the stream it's for. When the stream is down,
// all the ACK/NACK for this stream will be dropped, and the version/nonce
// won't be updated.
Expand Down
10 changes: 8 additions & 2 deletions xds/internal/client/v2/client.go
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/golang/protobuf/proto"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/internal/grpclog"
xdsclient "google.golang.org/grpc/xds/internal/client"
"google.golang.org/grpc/xds/internal/client/load"
Expand All @@ -33,6 +34,7 @@ import (
v2xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
v2corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
v2adsgrpc "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2"
statuspb "google.golang.org/genproto/googleapis/rpc/status"
)

func init() {
Expand Down Expand Up @@ -106,7 +108,7 @@ func (v2c *client) NewStream(ctx context.Context) (grpc.ClientStream, error) {
// - If this is an ack, version will be the version from the response.
// - If this is a nack, version will be the previous acked version (from
// versionMap). If there was no ack before, it will be empty.
func (v2c *client) SendRequest(s grpc.ClientStream, resourceNames []string, rType xdsclient.ResourceType, version, nonce string) error {
func (v2c *client) SendRequest(s grpc.ClientStream, resourceNames []string, rType xdsclient.ResourceType, version, nonce, errMsg string) error {
stream, ok := s.(adsStream)
if !ok {
return fmt.Errorf("xds: Attempt to send request on unsupported stream type: %T", s)
Expand All @@ -117,7 +119,11 @@ func (v2c *client) SendRequest(s grpc.ClientStream, resourceNames []string, rTyp
ResourceNames: resourceNames,
VersionInfo: version,
ResponseNonce: nonce,
// TODO: populate ErrorDetails for nack.
}
if errMsg != "" {
req.ErrorDetail = &statuspb.Status{
Code: int32(codes.InvalidArgument), Message: errMsg,
}
}
if err := stream.Send(req); err != nil {
return fmt.Errorf("xds: stream.Send(%+v) failed: %v", req, err)
Expand Down
40 changes: 26 additions & 14 deletions xds/internal/client/v2/client_ack_test.go
Expand Up @@ -29,6 +29,7 @@ import (
anypb "github.com/golang/protobuf/ptypes/any"
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/internal/testutils"
xdsclient "google.golang.org/grpc/xds/internal/client"
"google.golang.org/grpc/xds/internal/testutils/fakeserver"
Expand Down Expand Up @@ -73,7 +74,7 @@ func startXDSV2Client(t *testing.T, cc *grpc.ClientConn) (v2c *client, cbLDS, cb
}

// compareXDSRequest reads requests from channel, compare it with want.
func compareXDSRequest(ch *testutils.Channel, want *xdspb.DiscoveryRequest, ver, nonce string) error {
func compareXDSRequest(ch *testutils.Channel, want *xdspb.DiscoveryRequest, ver, nonce string, wantErr bool) error {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
val, err := ch.Receive(ctx)
Expand All @@ -84,11 +85,22 @@ func compareXDSRequest(ch *testutils.Channel, want *xdspb.DiscoveryRequest, ver,
if req.Err != nil {
return fmt.Errorf("unexpected error from request: %v", req.Err)
}

xdsReq := req.Req.(*xdspb.DiscoveryRequest)
if (xdsReq.ErrorDetail != nil) != wantErr {
return fmt.Errorf("received request with error details: %v, wantErr: %v", xdsReq.ErrorDetail, wantErr)
}
// All NACK request.ErrorDetails have hardcoded status code InvalidArguments.
if xdsReq.ErrorDetail != nil && xdsReq.ErrorDetail.Code != int32(codes.InvalidArgument) {
return fmt.Errorf("received request with error details: %v, want status with code: %v", xdsReq.ErrorDetail, codes.InvalidArgument)
}

xdsReq.ErrorDetail = nil // Clear the error details field before comparing.
wantClone := proto.Clone(want).(*xdspb.DiscoveryRequest)
wantClone.VersionInfo = ver
wantClone.ResponseNonce = nonce
if !cmp.Equal(req.Req, wantClone, cmp.Comparer(proto.Equal)) {
return fmt.Errorf("received request different from want, diff: %s", cmp.Diff(req.Req, wantClone))
if !cmp.Equal(xdsReq, wantClone, cmp.Comparer(proto.Equal)) {
return fmt.Errorf("received request different from want, diff: %s", cmp.Diff(req.Req, wantClone, cmp.Comparer(proto.Equal)))
}
return nil
}
Expand Down Expand Up @@ -118,7 +130,7 @@ func startXDS(t *testing.T, rType xdsclient.ResourceType, v2c *client, reqChan *
}
v2c.AddWatch(rType, nameToWatch)

if err := compareXDSRequest(reqChan, req, preVersion, preNonce); err != nil {
if err := compareXDSRequest(reqChan, req, preVersion, preNonce, false); err != nil {
t.Fatalf("Failed to receive %v request: %v", rType, err)
}
t.Logf("FakeServer received %v request...", rType)
Expand All @@ -133,7 +145,7 @@ func sendGoodResp(t *testing.T, rType xdsclient.ResourceType, fakeServer *fakese
nonce := sendXDSRespWithVersion(fakeServer.XDSResponseChan, goodResp, ver)
t.Logf("Good %v response pushed to fakeServer...", rType)

if err := compareXDSRequest(fakeServer.XDSRequestChan, wantReq, strconv.Itoa(ver), nonce); err != nil {
if err := compareXDSRequest(fakeServer.XDSRequestChan, wantReq, strconv.Itoa(ver), nonce, false); err != nil {
return "", fmt.Errorf("failed to receive %v request: %v", rType, err)
}
t.Logf("Good %v response acked", rType)
Expand Down Expand Up @@ -168,7 +180,7 @@ func sendBadResp(t *testing.T, rType xdsclient.ResourceType, fakeServer *fakeser
TypeUrl: typeURL,
}, ver)
t.Logf("Bad %v response pushed to fakeServer...", rType)
if err := compareXDSRequest(fakeServer.XDSRequestChan, wantReq, strconv.Itoa(ver-1), nonce); err != nil {
if err := compareXDSRequest(fakeServer.XDSRequestChan, wantReq, strconv.Itoa(ver-1), nonce, true); err != nil {
return fmt.Errorf("failed to receive %v request: %v", rType, err)
}
t.Logf("Bad %v response nacked", rType)
Expand Down Expand Up @@ -274,7 +286,7 @@ func (s) TestV2ClientAckFirstIsNack(t *testing.T) {

// The expected version string is an empty string, because this is the first
// response, and it's nacked (so there's no previous ack version).
if err := compareXDSRequest(fakeServer.XDSRequestChan, goodLDSRequest, "", nonce); err != nil {
if err := compareXDSRequest(fakeServer.XDSRequestChan, goodLDSRequest, "", nonce, true); err != nil {
t.Errorf("Failed to receive request: %v", err)
}
t.Logf("Bad response nacked")
Expand Down Expand Up @@ -314,7 +326,7 @@ func (s) TestV2ClientAckNackAfterNewWatch(t *testing.T) {
t.Logf("Bad response pushed to fakeServer...")

// The expected version string is the previous acked version.
if err := compareXDSRequest(fakeServer.XDSRequestChan, goodLDSRequest, strconv.Itoa(versionLDS-1), nonce); err != nil {
if err := compareXDSRequest(fakeServer.XDSRequestChan, goodLDSRequest, strconv.Itoa(versionLDS-1), nonce, true); err != nil {
t.Errorf("Failed to receive request: %v", err)
}
t.Logf("Bad response nacked")
Expand All @@ -339,7 +351,7 @@ func (s) TestV2ClientAckNewWatchAfterCancel(t *testing.T) {

// Start a CDS watch.
v2c.AddWatch(xdsclient.ClusterResource, goodClusterName1)
if err := compareXDSRequest(fakeServer.XDSRequestChan, goodCDSRequest, "", ""); err != nil {
if err := compareXDSRequest(fakeServer.XDSRequestChan, goodCDSRequest, "", "", false); err != nil {
t.Fatal(err)
}
t.Logf("FakeServer received %v request...", xdsclient.ClusterResource)
Expand All @@ -356,12 +368,12 @@ func (s) TestV2ClientAckNewWatchAfterCancel(t *testing.T) {
// Wait for a request with no resource names, because the only watch was
// removed.
emptyReq := &xdspb.DiscoveryRequest{Node: goodNodeProto, TypeUrl: version.V2ClusterURL}
if err := compareXDSRequest(fakeServer.XDSRequestChan, emptyReq, strconv.Itoa(versionCDS), nonce); err != nil {
if err := compareXDSRequest(fakeServer.XDSRequestChan, emptyReq, strconv.Itoa(versionCDS), nonce, false); err != nil {
t.Fatalf("Failed to receive %v request: %v", xdsclient.ClusterResource, err)
}
v2c.AddWatch(xdsclient.ClusterResource, goodClusterName1)
// Wait for a request with correct resource names and version.
if err := compareXDSRequest(fakeServer.XDSRequestChan, goodCDSRequest, strconv.Itoa(versionCDS), nonce); err != nil {
if err := compareXDSRequest(fakeServer.XDSRequestChan, goodCDSRequest, strconv.Itoa(versionCDS), nonce, false); err != nil {
t.Fatalf("Failed to receive %v request: %v", xdsclient.ClusterResource, err)
}
versionCDS++
Expand Down Expand Up @@ -394,7 +406,7 @@ func (s) TestV2ClientAckCancelResponseRace(t *testing.T) {

// Start a CDS watch.
v2c.AddWatch(xdsclient.ClusterResource, goodClusterName1)
if err := compareXDSRequest(fakeServer.XDSRequestChan, goodCDSRequest, "", ""); err != nil {
if err := compareXDSRequest(fakeServer.XDSRequestChan, goodCDSRequest, "", "", false); err != nil {
t.Fatalf("Failed to receive %v request: %v", xdsclient.ClusterResource, err)
}
t.Logf("FakeServer received %v request...", xdsclient.ClusterResource)
Expand All @@ -410,7 +422,7 @@ func (s) TestV2ClientAckCancelResponseRace(t *testing.T) {
// Wait for a request with no resource names, because the only watch was
// removed.
emptyReq := &xdspb.DiscoveryRequest{Node: goodNodeProto, TypeUrl: version.V2ClusterURL}
if err := compareXDSRequest(fakeServer.XDSRequestChan, emptyReq, strconv.Itoa(versionCDS), nonce); err != nil {
if err := compareXDSRequest(fakeServer.XDSRequestChan, emptyReq, strconv.Itoa(versionCDS), nonce, false); err != nil {
t.Fatalf("Failed to receive %v request: %v", xdsclient.ClusterResource, err)
}
versionCDS++
Expand Down Expand Up @@ -440,7 +452,7 @@ func (s) TestV2ClientAckCancelResponseRace(t *testing.T) {
// Start a new watch. The new watch should have the nonce from the response
// above, and version from the first good response.
v2c.AddWatch(xdsclient.ClusterResource, goodClusterName1)
if err := compareXDSRequest(fakeServer.XDSRequestChan, goodCDSRequest, strconv.Itoa(versionCDS-1), nonce); err != nil {
if err := compareXDSRequest(fakeServer.XDSRequestChan, goodCDSRequest, strconv.Itoa(versionCDS-1), nonce, false); err != nil {
t.Fatalf("Failed to receive %v request: %v", xdsclient.ClusterResource, err)
}

Expand Down
10 changes: 8 additions & 2 deletions xds/internal/client/v3/client.go
Expand Up @@ -24,7 +24,9 @@ import (
"fmt"

"github.com/golang/protobuf/proto"
statuspb "google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/internal/grpclog"
xdsclient "google.golang.org/grpc/xds/internal/client"
"google.golang.org/grpc/xds/internal/client/load"
Expand Down Expand Up @@ -106,7 +108,7 @@ func (v3c *client) NewStream(ctx context.Context) (grpc.ClientStream, error) {
// - If this is an ack, version will be the version from the response.
// - If this is a nack, version will be the previous acked version (from
// versionMap). If there was no ack before, it will be empty.
func (v3c *client) SendRequest(s grpc.ClientStream, resourceNames []string, rType xdsclient.ResourceType, version, nonce string) error {
func (v3c *client) SendRequest(s grpc.ClientStream, resourceNames []string, rType xdsclient.ResourceType, version, nonce, errMsg string) error {
stream, ok := s.(adsStream)
if !ok {
return fmt.Errorf("xds: Attempt to send request on unsupported stream type: %T", s)
Expand All @@ -117,7 +119,11 @@ func (v3c *client) SendRequest(s grpc.ClientStream, resourceNames []string, rTyp
ResourceNames: resourceNames,
VersionInfo: version,
ResponseNonce: nonce,
// TODO: populate ErrorDetails for nack.
}
if errMsg != "" {
req.ErrorDetail = &statuspb.Status{
Code: int32(codes.InvalidArgument), Message: errMsg,
}
}
if err := stream.Send(req); err != nil {
return fmt.Errorf("xds: stream.Send(%+v) failed: %v", req, err)
Expand Down