diff --git a/xds/src/main/java/io/grpc/xds/ClientXdsClient.java b/xds/src/main/java/io/grpc/xds/ClientXdsClient.java index bec73d1c71d..ee16bea6952 100644 --- a/xds/src/main/java/io/grpc/xds/ClientXdsClient.java +++ b/xds/src/main/java/io/grpc/xds/ClientXdsClient.java @@ -2537,8 +2537,17 @@ void onError(Status error) { respTimer.cancel(); respTimer = null; } + + // Include node ID in RPC failure status messages that originate from XdsClient. + // We expect all watchers to propagate the description to the channel, and expose it + // to the caller. + String description = error.getDescription() == null ? "" : error.getDescription() + " "; + Status errorAugmented = Status.fromCode(error.getCode()) + .withDescription(description + "nodeID: " + bootstrapInfo.node().getId()) + .withCause(error.getCause()); + for (ResourceWatcher watcher : watchers) { - watcher.onError(error); + watcher.onError(errorAugmented); } } diff --git a/xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java b/xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java index 4fe067752d6..6a8481f5361 100644 --- a/xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java +++ b/xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java @@ -126,7 +126,8 @@ public abstract class ClientXdsClientTestBase { private static final String VERSION_1 = "42"; private static final String VERSION_2 = "43"; private static final String VERSION_3 = "44"; - private static final Node NODE = Node.newBuilder().build(); + private static final String NODE_ID = "cool-node-id"; + private static final Node NODE = Node.newBuilder().setId(NODE_ID).build(); private static final Any FAILING_ANY = MessageFactory.FAILING_ANY; private static final ChannelCredentials CHANNEL_CREDENTIALS = InsecureChannelCredentials.create(); private final ServerInfo lrsServerInfo = @@ -314,7 +315,7 @@ ManagedChannel create(ServerInfo serverInfo) { Bootstrapper.BootstrapInfo.builder() .servers(Arrays.asList( Bootstrapper.ServerInfo.create(SERVER_URI, CHANNEL_CREDENTIALS, useProtocolV3()))) - .node(EnvoyProtoData.Node.newBuilder().build()) + .node(NODE) .authorities(ImmutableMap.of( "authority.xds.com", AuthorityInfo.create( @@ -467,6 +468,15 @@ private ResourceMetadata verifyResourceMetadata( return metadata; } + private void verifyStatusWithNodeId(Status status, Code expectedCode, String expectedMsg) { + assertThat(status.getCode()).isEqualTo(expectedCode); + assertThat(status.getCause()).isNull(); + // Watcher.onError propagates status description to the channel, and we want to + // augment the description with the node id. + String description = (expectedMsg.isEmpty() ? "" : expectedMsg + " ") + "nodeID: " + NODE_ID; + assertThat(status.getDescription()).isEqualTo(description); + } + /** * Helper method to validate {@link XdsClient.EdsUpdate} created for the test CDS resource * {@link ClientXdsClientTestBase#testClusterLoadAssignment}. @@ -1840,11 +1850,8 @@ public void cdsResponseErrorHandling_badUpstreamTlsContext() { + "io.grpc.xds.ClientXdsClient$ResourceInvalidException: " + "ca_certificate_provider_instance is required in upstream-tls-context"; call.verifyRequestNack(CDS, CDS_RESOURCE, "", "0000", NODE, ImmutableList.of(errorMsg)); - ArgumentCaptor captor = ArgumentCaptor.forClass(Status.class); - verify(cdsResourceWatcher).onError(captor.capture()); - Status errorStatus = captor.getValue(); - assertThat(errorStatus.getCode()).isEqualTo(Status.UNAVAILABLE.getCode()); - assertThat(errorStatus.getDescription()).isEqualTo(errorMsg); + verify(cdsResourceWatcher).onError(errorCaptor.capture()); + verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, errorMsg); } /** @@ -1866,11 +1873,8 @@ public void cdsResponseErrorHandling_badTransportSocketName() { String errorMsg = "CDS response Cluster 'cluster.googleapis.com' validation error: " + "transport-socket with name envoy.transport_sockets.bad not supported."; call.verifyRequestNack(CDS, CDS_RESOURCE, "", "0000", NODE, ImmutableList.of(errorMsg)); - ArgumentCaptor captor = ArgumentCaptor.forClass(Status.class); - verify(cdsResourceWatcher).onError(captor.capture()); - Status errorStatus = captor.getValue(); - assertThat(errorStatus.getCode()).isEqualTo(Status.UNAVAILABLE.getCode()); - assertThat(errorStatus.getDescription()).isEqualTo(errorMsg); + verify(cdsResourceWatcher).onError(errorCaptor.capture()); + verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, errorMsg); } @Test @@ -2461,13 +2465,13 @@ public void streamClosedAndRetryWithBackoff() { // Management server closes the RPC stream with an error. call.sendError(Status.UNKNOWN.asException()); verify(ldsResourceWatcher).onError(errorCaptor.capture()); - assertThat(errorCaptor.getValue().getCode()).isEqualTo(Code.UNKNOWN); + verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNKNOWN, ""); verify(rdsResourceWatcher).onError(errorCaptor.capture()); - assertThat(errorCaptor.getValue().getCode()).isEqualTo(Code.UNKNOWN); + verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNKNOWN, ""); verify(cdsResourceWatcher).onError(errorCaptor.capture()); - assertThat(errorCaptor.getValue().getCode()).isEqualTo(Code.UNKNOWN); + verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNKNOWN, ""); verify(edsResourceWatcher).onError(errorCaptor.capture()); - assertThat(errorCaptor.getValue().getCode()).isEqualTo(Code.UNKNOWN); + verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNKNOWN, ""); // Retry after backoff. inOrder.verify(backoffPolicyProvider).get(); @@ -2483,15 +2487,16 @@ public void streamClosedAndRetryWithBackoff() { call.verifyRequest(EDS, EDS_RESOURCE, "", "", NODE); // Management server becomes unreachable. - call.sendError(Status.UNAVAILABLE.asException()); + String errorMsg = "my fault"; + call.sendError(Status.UNAVAILABLE.withDescription(errorMsg).asException()); verify(ldsResourceWatcher, times(2)).onError(errorCaptor.capture()); - assertThat(errorCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE); + verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, errorMsg); verify(rdsResourceWatcher, times(2)).onError(errorCaptor.capture()); - assertThat(errorCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE); + verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, errorMsg); verify(cdsResourceWatcher, times(2)).onError(errorCaptor.capture()); - assertThat(errorCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE); + verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, errorMsg); verify(edsResourceWatcher, times(2)).onError(errorCaptor.capture()); - assertThat(errorCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE); + verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, errorMsg); // Retry after backoff. inOrder.verify(backoffPolicy1).nextBackoffNanos(); @@ -2518,13 +2523,13 @@ public void streamClosedAndRetryWithBackoff() { call.sendError(Status.DEADLINE_EXCEEDED.asException()); verify(ldsResourceWatcher, times(3)).onError(errorCaptor.capture()); - assertThat(errorCaptor.getValue().getCode()).isEqualTo(Code.DEADLINE_EXCEEDED); + verifyStatusWithNodeId(errorCaptor.getValue(), Code.DEADLINE_EXCEEDED, ""); verify(rdsResourceWatcher, times(3)).onError(errorCaptor.capture()); - assertThat(errorCaptor.getValue().getCode()).isEqualTo(Code.DEADLINE_EXCEEDED); + verifyStatusWithNodeId(errorCaptor.getValue(), Code.DEADLINE_EXCEEDED, ""); verify(cdsResourceWatcher, times(3)).onError(errorCaptor.capture()); - assertThat(errorCaptor.getValue().getCode()).isEqualTo(Code.DEADLINE_EXCEEDED); + verifyStatusWithNodeId(errorCaptor.getValue(), Code.DEADLINE_EXCEEDED, ""); verify(edsResourceWatcher, times(3)).onError(errorCaptor.capture()); - assertThat(errorCaptor.getValue().getCode()).isEqualTo(Code.DEADLINE_EXCEEDED); + verifyStatusWithNodeId(errorCaptor.getValue(), Code.DEADLINE_EXCEEDED, ""); // Reset backoff sequence and retry after backoff. inOrder.verify(backoffPolicyProvider).get(); @@ -2542,13 +2547,13 @@ public void streamClosedAndRetryWithBackoff() { // Management server becomes unreachable again. call.sendError(Status.UNAVAILABLE.asException()); verify(ldsResourceWatcher, times(4)).onError(errorCaptor.capture()); - assertThat(errorCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE); + verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, ""); verify(rdsResourceWatcher, times(4)).onError(errorCaptor.capture()); - assertThat(errorCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE); + verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, ""); verify(cdsResourceWatcher, times(4)).onError(errorCaptor.capture()); - assertThat(errorCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE); + verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, ""); verify(edsResourceWatcher, times(4)).onError(errorCaptor.capture()); - assertThat(errorCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE); + verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, ""); // Retry after backoff. inOrder.verify(backoffPolicy2).nextBackoffNanos(); @@ -2572,9 +2577,9 @@ public void streamClosedAndRetryRaceWithAddRemoveWatchers() { DiscoveryRpcCall call = resourceDiscoveryCalls.poll(); call.sendError(Status.UNAVAILABLE.asException()); verify(ldsResourceWatcher).onError(errorCaptor.capture()); - assertThat(errorCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE); + verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, ""); verify(rdsResourceWatcher).onError(errorCaptor.capture()); - assertThat(errorCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE); + verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, ""); ScheduledTask retryTask = Iterables.getOnlyElement(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)); assertThat(retryTask.getDelay(TimeUnit.NANOSECONDS)).isEqualTo(10L); @@ -2621,6 +2626,14 @@ public void streamClosedAndRetryRestartsResourceInitialFetchTimerForUnresolvedRe call.sendError(Status.UNAVAILABLE.asException()); assertThat(cdsResourceTimeout.isCancelled()).isTrue(); assertThat(edsResourceTimeout.isCancelled()).isTrue(); + verify(ldsResourceWatcher).onError(errorCaptor.capture()); + verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, ""); + verify(rdsResourceWatcher).onError(errorCaptor.capture()); + verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, ""); + verify(cdsResourceWatcher).onError(errorCaptor.capture()); + verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, ""); + verify(edsResourceWatcher).onError(errorCaptor.capture()); + verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, ""); fakeClock.forwardNanos(10L); assertThat(fakeClock.getPendingTasks(LDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).hasSize(0); @@ -2740,11 +2753,8 @@ public void serverSideListenerResponseErrorHandling_badDownstreamTlsContext() { + "0.0.0.0:7000\' validation error: " + "common-tls-context is required in downstream-tls-context"; call.verifyRequestNack(LDS, LISTENER_RESOURCE, "", "0000", NODE, ImmutableList.of(errorMsg)); - ArgumentCaptor captor = ArgumentCaptor.forClass(Status.class); - verify(ldsResourceWatcher).onError(captor.capture()); - Status errorStatus = captor.getValue(); - assertThat(errorStatus.getCode()).isEqualTo(Status.UNAVAILABLE.getCode()); - assertThat(errorStatus.getDescription()).isEqualTo(errorMsg); + verify(ldsResourceWatcher).onError(errorCaptor.capture()); + verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, errorMsg); } @Test @@ -2770,11 +2780,8 @@ public void serverSideListenerResponseErrorHandling_badTransportSocketName() { + "transport-socket with name envoy.transport_sockets.bad1 not supported."; call.verifyRequestNack(LDS, LISTENER_RESOURCE, "", "0000", NODE, ImmutableList.of( errorMsg)); - ArgumentCaptor captor = ArgumentCaptor.forClass(Status.class); - verify(ldsResourceWatcher).onError(captor.capture()); - Status errorStatus = captor.getValue(); - assertThat(errorStatus.getCode()).isEqualTo(Status.UNAVAILABLE.getCode()); - assertThat(errorStatus.getDescription()).isEqualTo(errorMsg); + verify(ldsResourceWatcher).onError(errorCaptor.capture()); + verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, errorMsg); } private DiscoveryRpcCall startResourceWatcher(