Skip to content

Commit

Permalink
xds: include node ID in RPC failure status messages from the XdsClient
Browse files Browse the repository at this point in the history
  • Loading branch information
sergiitk committed Apr 21, 2022
1 parent 3a303af commit 33791ca
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 42 deletions.
11 changes: 10 additions & 1 deletion xds/src/main/java/io/grpc/xds/ClientXdsClient.java
Expand Up @@ -2537,8 +2537,17 @@ void onError(Status error) {
respTimer.cancel();
respTimer = null;
}

// Include node ID in RPC failure status messages that originate from XdsClient.
// We expect all watchers to propagate the description to the channel, and expose it
// to the caller.
String description = error.getDescription() == null ? "" : error.getDescription() + " ";
Status errorAugmented = Status.fromCode(error.getCode())
.withDescription(description + "nodeID: " + bootstrapInfo.node().getId())
.withCause(error.getCause());

for (ResourceWatcher watcher : watchers) {
watcher.onError(error);
watcher.onError(errorAugmented);
}
}

Expand Down
89 changes: 48 additions & 41 deletions xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java
Expand Up @@ -126,7 +126,8 @@ public abstract class ClientXdsClientTestBase {
private static final String VERSION_1 = "42";
private static final String VERSION_2 = "43";
private static final String VERSION_3 = "44";
private static final Node NODE = Node.newBuilder().build();
private static final String NODE_ID = "cool-node-id";
private static final Node NODE = Node.newBuilder().setId(NODE_ID).build();
private static final Any FAILING_ANY = MessageFactory.FAILING_ANY;
private static final ChannelCredentials CHANNEL_CREDENTIALS = InsecureChannelCredentials.create();
private final ServerInfo lrsServerInfo =
Expand Down Expand Up @@ -314,7 +315,7 @@ ManagedChannel create(ServerInfo serverInfo) {
Bootstrapper.BootstrapInfo.builder()
.servers(Arrays.asList(
Bootstrapper.ServerInfo.create(SERVER_URI, CHANNEL_CREDENTIALS, useProtocolV3())))
.node(EnvoyProtoData.Node.newBuilder().build())
.node(NODE)
.authorities(ImmutableMap.of(
"authority.xds.com",
AuthorityInfo.create(
Expand Down Expand Up @@ -467,6 +468,15 @@ private ResourceMetadata verifyResourceMetadata(
return metadata;
}

private void verifyStatusWithNodeId(Status status, Code expectedCode, String expectedMsg) {
assertThat(status.getCode()).isEqualTo(expectedCode);
assertThat(status.getCause()).isNull();
// Watcher.onError propagates status description to the channel, and we want to
// augment the description with the node id.
String description = (expectedMsg.isEmpty() ? "" : expectedMsg + " ") + "nodeID: " + NODE_ID;
assertThat(status.getDescription()).isEqualTo(description);
}

/**
* Helper method to validate {@link XdsClient.EdsUpdate} created for the test CDS resource
* {@link ClientXdsClientTestBase#testClusterLoadAssignment}.
Expand Down Expand Up @@ -1840,11 +1850,8 @@ public void cdsResponseErrorHandling_badUpstreamTlsContext() {
+ "io.grpc.xds.ClientXdsClient$ResourceInvalidException: "
+ "ca_certificate_provider_instance is required in upstream-tls-context";
call.verifyRequestNack(CDS, CDS_RESOURCE, "", "0000", NODE, ImmutableList.of(errorMsg));
ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class);
verify(cdsResourceWatcher).onError(captor.capture());
Status errorStatus = captor.getValue();
assertThat(errorStatus.getCode()).isEqualTo(Status.UNAVAILABLE.getCode());
assertThat(errorStatus.getDescription()).isEqualTo(errorMsg);
verify(cdsResourceWatcher).onError(errorCaptor.capture());
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, errorMsg);
}

/**
Expand All @@ -1866,11 +1873,8 @@ public void cdsResponseErrorHandling_badTransportSocketName() {
String errorMsg = "CDS response Cluster 'cluster.googleapis.com' validation error: "
+ "transport-socket with name envoy.transport_sockets.bad not supported.";
call.verifyRequestNack(CDS, CDS_RESOURCE, "", "0000", NODE, ImmutableList.of(errorMsg));
ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class);
verify(cdsResourceWatcher).onError(captor.capture());
Status errorStatus = captor.getValue();
assertThat(errorStatus.getCode()).isEqualTo(Status.UNAVAILABLE.getCode());
assertThat(errorStatus.getDescription()).isEqualTo(errorMsg);
verify(cdsResourceWatcher).onError(errorCaptor.capture());
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, errorMsg);
}

@Test
Expand Down Expand Up @@ -2461,13 +2465,13 @@ public void streamClosedAndRetryWithBackoff() {
// Management server closes the RPC stream with an error.
call.sendError(Status.UNKNOWN.asException());
verify(ldsResourceWatcher).onError(errorCaptor.capture());
assertThat(errorCaptor.getValue().getCode()).isEqualTo(Code.UNKNOWN);
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNKNOWN, "");
verify(rdsResourceWatcher).onError(errorCaptor.capture());
assertThat(errorCaptor.getValue().getCode()).isEqualTo(Code.UNKNOWN);
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNKNOWN, "");
verify(cdsResourceWatcher).onError(errorCaptor.capture());
assertThat(errorCaptor.getValue().getCode()).isEqualTo(Code.UNKNOWN);
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNKNOWN, "");
verify(edsResourceWatcher).onError(errorCaptor.capture());
assertThat(errorCaptor.getValue().getCode()).isEqualTo(Code.UNKNOWN);
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNKNOWN, "");

// Retry after backoff.
inOrder.verify(backoffPolicyProvider).get();
Expand All @@ -2483,15 +2487,16 @@ public void streamClosedAndRetryWithBackoff() {
call.verifyRequest(EDS, EDS_RESOURCE, "", "", NODE);

// Management server becomes unreachable.
call.sendError(Status.UNAVAILABLE.asException());
String errorMsg = "my fault";
call.sendError(Status.UNAVAILABLE.withDescription(errorMsg).asException());
verify(ldsResourceWatcher, times(2)).onError(errorCaptor.capture());
assertThat(errorCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE);
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, errorMsg);
verify(rdsResourceWatcher, times(2)).onError(errorCaptor.capture());
assertThat(errorCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE);
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, errorMsg);
verify(cdsResourceWatcher, times(2)).onError(errorCaptor.capture());
assertThat(errorCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE);
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, errorMsg);
verify(edsResourceWatcher, times(2)).onError(errorCaptor.capture());
assertThat(errorCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE);
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, errorMsg);

// Retry after backoff.
inOrder.verify(backoffPolicy1).nextBackoffNanos();
Expand All @@ -2518,13 +2523,13 @@ public void streamClosedAndRetryWithBackoff() {

call.sendError(Status.DEADLINE_EXCEEDED.asException());
verify(ldsResourceWatcher, times(3)).onError(errorCaptor.capture());
assertThat(errorCaptor.getValue().getCode()).isEqualTo(Code.DEADLINE_EXCEEDED);
verifyStatusWithNodeId(errorCaptor.getValue(), Code.DEADLINE_EXCEEDED, "");
verify(rdsResourceWatcher, times(3)).onError(errorCaptor.capture());
assertThat(errorCaptor.getValue().getCode()).isEqualTo(Code.DEADLINE_EXCEEDED);
verifyStatusWithNodeId(errorCaptor.getValue(), Code.DEADLINE_EXCEEDED, "");
verify(cdsResourceWatcher, times(3)).onError(errorCaptor.capture());
assertThat(errorCaptor.getValue().getCode()).isEqualTo(Code.DEADLINE_EXCEEDED);
verifyStatusWithNodeId(errorCaptor.getValue(), Code.DEADLINE_EXCEEDED, "");
verify(edsResourceWatcher, times(3)).onError(errorCaptor.capture());
assertThat(errorCaptor.getValue().getCode()).isEqualTo(Code.DEADLINE_EXCEEDED);
verifyStatusWithNodeId(errorCaptor.getValue(), Code.DEADLINE_EXCEEDED, "");

// Reset backoff sequence and retry after backoff.
inOrder.verify(backoffPolicyProvider).get();
Expand All @@ -2542,13 +2547,13 @@ public void streamClosedAndRetryWithBackoff() {
// Management server becomes unreachable again.
call.sendError(Status.UNAVAILABLE.asException());
verify(ldsResourceWatcher, times(4)).onError(errorCaptor.capture());
assertThat(errorCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE);
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, "");
verify(rdsResourceWatcher, times(4)).onError(errorCaptor.capture());
assertThat(errorCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE);
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, "");
verify(cdsResourceWatcher, times(4)).onError(errorCaptor.capture());
assertThat(errorCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE);
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, "");
verify(edsResourceWatcher, times(4)).onError(errorCaptor.capture());
assertThat(errorCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE);
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, "");

// Retry after backoff.
inOrder.verify(backoffPolicy2).nextBackoffNanos();
Expand All @@ -2572,9 +2577,9 @@ public void streamClosedAndRetryRaceWithAddRemoveWatchers() {
DiscoveryRpcCall call = resourceDiscoveryCalls.poll();
call.sendError(Status.UNAVAILABLE.asException());
verify(ldsResourceWatcher).onError(errorCaptor.capture());
assertThat(errorCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE);
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, "");
verify(rdsResourceWatcher).onError(errorCaptor.capture());
assertThat(errorCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE);
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, "");
ScheduledTask retryTask =
Iterables.getOnlyElement(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER));
assertThat(retryTask.getDelay(TimeUnit.NANOSECONDS)).isEqualTo(10L);
Expand Down Expand Up @@ -2621,6 +2626,14 @@ public void streamClosedAndRetryRestartsResourceInitialFetchTimerForUnresolvedRe
call.sendError(Status.UNAVAILABLE.asException());
assertThat(cdsResourceTimeout.isCancelled()).isTrue();
assertThat(edsResourceTimeout.isCancelled()).isTrue();
verify(ldsResourceWatcher).onError(errorCaptor.capture());
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, "");
verify(rdsResourceWatcher).onError(errorCaptor.capture());
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, "");
verify(cdsResourceWatcher).onError(errorCaptor.capture());
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, "");
verify(edsResourceWatcher).onError(errorCaptor.capture());
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, "");

fakeClock.forwardNanos(10L);
assertThat(fakeClock.getPendingTasks(LDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).hasSize(0);
Expand Down Expand Up @@ -2740,11 +2753,8 @@ public void serverSideListenerResponseErrorHandling_badDownstreamTlsContext() {
+ "0.0.0.0:7000\' validation error: "
+ "common-tls-context is required in downstream-tls-context";
call.verifyRequestNack(LDS, LISTENER_RESOURCE, "", "0000", NODE, ImmutableList.of(errorMsg));
ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class);
verify(ldsResourceWatcher).onError(captor.capture());
Status errorStatus = captor.getValue();
assertThat(errorStatus.getCode()).isEqualTo(Status.UNAVAILABLE.getCode());
assertThat(errorStatus.getDescription()).isEqualTo(errorMsg);
verify(ldsResourceWatcher).onError(errorCaptor.capture());
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, errorMsg);
}

@Test
Expand All @@ -2770,11 +2780,8 @@ public void serverSideListenerResponseErrorHandling_badTransportSocketName() {
+ "transport-socket with name envoy.transport_sockets.bad1 not supported.";
call.verifyRequestNack(LDS, LISTENER_RESOURCE, "", "0000", NODE, ImmutableList.of(
errorMsg));
ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class);
verify(ldsResourceWatcher).onError(captor.capture());
Status errorStatus = captor.getValue();
assertThat(errorStatus.getCode()).isEqualTo(Status.UNAVAILABLE.getCode());
assertThat(errorStatus.getDescription()).isEqualTo(errorMsg);
verify(ldsResourceWatcher).onError(errorCaptor.capture());
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, errorMsg);
}

private DiscoveryRpcCall startResourceWatcher(
Expand Down

0 comments on commit 33791ca

Please sign in to comment.