New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
xdsclient: populate error details for NACK #3975
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -51,7 +51,7 @@ type VersionedClient interface { | |
|
||
// SendRequest constructs and sends out a DiscoveryRequest message specific | ||
// to the underlying transport protocol version. | ||
SendRequest(s grpc.ClientStream, resourceNames []string, rType ResourceType, version string, nonce string) error | ||
SendRequest(s grpc.ClientStream, resourceNames []string, rType ResourceType, version, nonce, errMsg string) error | ||
|
||
// RecvResponse uses the provided stream to receive a response specific to | ||
// the underlying transport protocol version. | ||
|
@@ -246,16 +246,16 @@ func (t *TransportHelper) send(ctx context.Context) { | |
t.sendCh.Load() | ||
|
||
var ( | ||
target []string | ||
rType ResourceType | ||
version, nonce string | ||
send bool | ||
target []string | ||
rType ResourceType | ||
version, nonce, errMsg string | ||
send bool | ||
) | ||
switch update := u.(type) { | ||
case *watchAction: | ||
target, rType, version, nonce = t.processWatchInfo(update) | ||
case *ackAction: | ||
target, rType, version, nonce, send = t.processAckInfo(update, stream) | ||
target, rType, version, nonce, errMsg, send = t.processAckInfo(update, stream) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why does There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Right. This is unnecessary, and |
||
if !send { | ||
continue | ||
} | ||
|
@@ -267,7 +267,7 @@ func (t *TransportHelper) send(ctx context.Context) { | |
// sending response back). | ||
continue | ||
} | ||
if err := t.vClient.SendRequest(stream, target, rType, version, nonce); err != nil { | ||
if err := t.vClient.SendRequest(stream, target, rType, version, nonce, errMsg); err != nil { | ||
t.logger.Warningf("ADS request for {target: %q, type: %v, version: %q, nonce: %q} failed: %v", target, rType, version, nonce, err) | ||
// send failed, clear the current stream. | ||
stream = nil | ||
|
@@ -292,7 +292,7 @@ func (t *TransportHelper) sendExisting(stream grpc.ClientStream) bool { | |
t.nonceMap = make(map[ResourceType]string) | ||
|
||
for rType, s := range t.watchMap { | ||
if err := t.vClient.SendRequest(stream, mapToSlice(s), rType, "", ""); err != nil { | ||
if err := t.vClient.SendRequest(stream, mapToSlice(s), rType, "", "", ""); err != nil { | ||
t.logger.Errorf("ADS request failed: %v", err) | ||
return false | ||
} | ||
|
@@ -321,6 +321,7 @@ func (t *TransportHelper) recv(stream grpc.ClientStream) bool { | |
rType: rType, | ||
version: "", | ||
nonce: nonce, | ||
errMsg: err.Error(), | ||
stream: stream, | ||
}) | ||
t.logger.Warningf("Sending NACK for response type: %v, version: %v, nonce: %v, reason: %v", rType, version, nonce, err) | ||
|
@@ -387,6 +388,7 @@ type ackAction struct { | |
rType ResourceType | ||
version string // NACK if version is an empty string. | ||
nonce string | ||
errMsg string // Empty unless it's a NACK. | ||
// ACK/NACK are tagged with the stream it's for. When the stream is down, | ||
// all the ACK/NACK for this stream will be dropped, and the version/nonce | ||
// won't be updated. | ||
|
@@ -396,13 +398,13 @@ type ackAction struct { | |
// processAckInfo pulls the fields needed by the ack request from a ackAction. | ||
// | ||
// If no active watch is found for this ack, it returns false for send. | ||
func (t *TransportHelper) processAckInfo(ack *ackAction, stream grpc.ClientStream) (target []string, rType ResourceType, version, nonce string, send bool) { | ||
func (t *TransportHelper) processAckInfo(ack *ackAction, stream grpc.ClientStream) (target []string, rType ResourceType, version, nonce, errMsg string, send bool) { | ||
if ack.stream != stream { | ||
// If ACK's stream isn't the current sending stream, this means the ACK | ||
// was pushed to queue before the old stream broke, and a new stream has | ||
// been started since. Return immediately here so we don't update the | ||
// nonce for the new stream. | ||
return nil, UnknownResource, "", "", false | ||
return nil, UnknownResource, "", "", "", false | ||
} | ||
rType = ack.rType | ||
|
||
|
@@ -422,7 +424,7 @@ func (t *TransportHelper) processAckInfo(ack *ackAction, stream grpc.ClientStrea | |
// canceled while the ackAction is in queue), because there's no resource | ||
// name. And if we send a request with empty resource name list, the | ||
// server may treat it as a wild card and send us everything. | ||
return nil, UnknownResource, "", "", false | ||
return nil, UnknownResource, "", "", "", false | ||
} | ||
send = true | ||
target = mapToSlice(s) | ||
|
@@ -437,7 +439,7 @@ func (t *TransportHelper) processAckInfo(ack *ackAction, stream grpc.ClientStrea | |
} else { | ||
t.versionMap[rType] = version | ||
} | ||
return target, rType, version, nonce, send | ||
return target, rType, version, nonce, ack.errMsg, send | ||
} | ||
|
||
// ReportLoad starts an LRS stream to report load data to the management server. | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: We might as well get rid of
errMsg
and useupdate.errMsg
directly.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's a
switch: case
. OnlyackAction
has theerrMsg
field.