Skip to content

Commit

Permalink
xds: fix xdsClient resource not exist for invalid resource, fix xdsSe…
Browse files Browse the repository at this point in the history
…rverWrapper start on resource not exist (grpc#8660)

Fix bugs:
1. Invalid resource at xdsClient, the watcher should have been delivered an error instead of resource not found.
2. If the resource is properly determined to not exist, it shouldn't cause start() to fail. From A36 xDS for Servers:
"XdsServer's start must not fail due to transient xDS issues, like missing xDS configuration from the xDS server."
  • Loading branch information
YifeiZhuang committed Nov 11, 2021
1 parent 48e8df4 commit a98e0db
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 38 deletions.
10 changes: 6 additions & 4 deletions xds/src/main/java/io/grpc/xds/ClientXdsClient.java
Expand Up @@ -2077,11 +2077,13 @@ private void handleResourceUpdate(
}
retainedResources.add(edsName);
}
continue;
} else if (invalidResources.contains(resourceName)) {
subscriber.onError(Status.UNAVAILABLE.withDescription(errorDetail));
} else {
// For State of the World services, notify watchers when their watched resource is missing
// from the ADS update.
subscriber.onAbsent();
}
// For State of the World services, notify watchers when their watched resource is missing
// from the ADS update.
subscriber.onAbsent();
}
}
// LDS/CDS responses represents the state of the world, RDS/EDS resources not referenced in
Expand Down
4 changes: 0 additions & 4 deletions xds/src/main/java/io/grpc/xds/XdsServerWrapper.java
Expand Up @@ -571,10 +571,6 @@ private void handleConfigNotFound(StatusException exception) {
for (SslContextProviderSupplier s: toRelease) {
s.close();
}
if (!initialStarted) {
initialStarted = true;
initialStartFuture.set(exception);
}
if (restartTimer != null) {
restartTimer.cancel();
}
Expand Down
50 changes: 34 additions & 16 deletions xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java
Expand Up @@ -1555,12 +1555,16 @@ public void cdsResponseErrorHandling_badUpstreamTlsContext() {
call.sendResponse(CDS, clusters, VERSION_1, "0000");

// The response NACKed with errors indicating indices of the failed resources.
call.verifyRequestNack(CDS, CDS_RESOURCE, "", "0000", NODE, ImmutableList.of(
"CDS response Cluster 'cluster.googleapis.com' validation error: "
String errorMsg = "CDS response Cluster 'cluster.googleapis.com' validation error: "
+ "Cluster cluster.googleapis.com: malformed UpstreamTlsContext: "
+ "io.grpc.xds.ClientXdsClient$ResourceInvalidException: "
+ "ca_certificate_provider_instance is required in upstream-tls-context"));
verifyNoInteractions(cdsResourceWatcher);
+ "ca_certificate_provider_instance is required in upstream-tls-context";
call.verifyRequestNack(CDS, CDS_RESOURCE, "", "0000", NODE, ImmutableList.of(errorMsg));
ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class);
verify(cdsResourceWatcher).onError(captor.capture());
Status errorStatus = captor.getValue();
assertThat(errorStatus.getCode()).isEqualTo(Status.UNAVAILABLE.getCode());
assertThat(errorStatus.getDescription()).isEqualTo(errorMsg);
}

/**
Expand All @@ -1579,10 +1583,14 @@ public void cdsResponseErrorHandling_badTransportSocketName() {
call.sendResponse(CDS, clusters, VERSION_1, "0000");

// The response NACKed with errors indicating indices of the failed resources.
call.verifyRequestNack(CDS, CDS_RESOURCE, "", "0000", NODE, ImmutableList.of(
"CDS response Cluster 'cluster.googleapis.com' validation error: "
+ "transport-socket with name envoy.transport_sockets.bad not supported."));
verifyNoInteractions(cdsResourceWatcher);
String errorMsg = "CDS response Cluster 'cluster.googleapis.com' validation error: "
+ "transport-socket with name envoy.transport_sockets.bad not supported.";
call.verifyRequestNack(CDS, CDS_RESOURCE, "", "0000", NODE, ImmutableList.of(errorMsg));
ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class);
verify(cdsResourceWatcher).onError(captor.capture());
Status errorStatus = captor.getValue();
assertThat(errorStatus.getCode()).isEqualTo(Status.UNAVAILABLE.getCode());
assertThat(errorStatus.getDescription()).isEqualTo(errorMsg);
}

@Test
Expand Down Expand Up @@ -2429,10 +2437,15 @@ public void serverSideListenerResponseErrorHandling_badDownstreamTlsContext() {
List<Any> listeners = ImmutableList.of(Any.pack(listener));
call.sendResponse(ResourceType.LDS, listeners, "0", "0000");
// The response NACKed with errors indicating indices of the failed resources.
call.verifyRequestNack(LDS, LISTENER_RESOURCE, "", "0000", NODE, ImmutableList.of(
"LDS response Listener \'grpc/server?xds.resource.listening_address=0.0.0.0:7000\' "
+ "validation error: common-tls-context is required in downstream-tls-context"));
verifyNoInteractions(ldsResourceWatcher);
String errorMsg = "LDS response Listener \'grpc/server?xds.resource.listening_address="
+ "0.0.0.0:7000\' validation error: "
+ "common-tls-context is required in downstream-tls-context";
call.verifyRequestNack(LDS, LISTENER_RESOURCE, "", "0000", NODE, ImmutableList.of(errorMsg));
ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class);
verify(ldsResourceWatcher).onError(captor.capture());
Status errorStatus = captor.getValue();
assertThat(errorStatus.getCode()).isEqualTo(Status.UNAVAILABLE.getCode());
assertThat(errorStatus.getDescription()).isEqualTo(errorMsg);
}

@Test
Expand All @@ -2453,11 +2466,16 @@ public void serverSideListenerResponseErrorHandling_badTransportSocketName() {
List<Any> listeners = ImmutableList.of(Any.pack(listener));
call.sendResponse(ResourceType.LDS, listeners, "0", "0000");
// The response NACKed with errors indicating indices of the failed resources.
String errorMsg = "LDS response Listener \'grpc/server?xds.resource.listening_address="
+ "0.0.0.0:7000\' validation error: "
+ "transport-socket with name envoy.transport_sockets.bad1 not supported.";
call.verifyRequestNack(LDS, LISTENER_RESOURCE, "", "0000", NODE, ImmutableList.of(
"LDS response Listener \'grpc/server?xds.resource.listening_address=0.0.0.0:7000\' "
+ "validation error: "
+ "transport-socket with name envoy.transport_sockets.bad1 not supported."));
verifyNoInteractions(ldsResourceWatcher);
errorMsg));
ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class);
verify(ldsResourceWatcher).onError(captor.capture());
Status errorStatus = captor.getValue();
assertThat(errorStatus.getCode()).isEqualTo(Status.UNAVAILABLE.getCode());
assertThat(errorStatus.getDescription()).isEqualTo(errorMsg);
}

private DiscoveryRpcCall startResourceWatcher(
Expand Down
Expand Up @@ -63,15 +63,15 @@
import io.netty.handler.codec.http2.Http2ConnectionDecoder;
import io.netty.handler.codec.http2.Http2ConnectionEncoder;
import io.netty.handler.codec.http2.Http2Settings;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -190,8 +190,8 @@ public void run() {
try {
start.get(5, TimeUnit.SECONDS);
fail("Start should throw exception");
} catch (ExecutionException ex) {
assertThat(ex.getCause()).isInstanceOf(IOException.class);
} catch (TimeoutException ex) {
assertThat(start.isDone()).isFalse();
}
assertThat(selectorManager.getSelectorToUpdateSelector()).isSameInstanceAs(NO_FILTER_CHAIN);
}
Expand All @@ -214,8 +214,8 @@ public void run() {
try {
start.get(5, TimeUnit.SECONDS);
fail("Start should throw exception");
} catch (ExecutionException ex) {
assertThat(ex.getCause()).isInstanceOf(IOException.class);
} catch (TimeoutException ex) {
assertThat(start.isDone()).isFalse();
}
assertThat(selectorManager.getSelectorToUpdateSelector()).isSameInstanceAs(NO_FILTER_CHAIN);
}
Expand All @@ -238,8 +238,8 @@ public void run() {
try {
start.get(5, TimeUnit.SECONDS);
fail("Start should throw exception");
} catch (ExecutionException ex) {
assertThat(ex.getCause()).isInstanceOf(IOException.class);
} catch (TimeoutException ex) {
assertThat(start.isDone()).isFalse();
}
assertThat(selectorManager.getSelectorToUpdateSelector()).isSameInstanceAs(NO_FILTER_CHAIN);
}
Expand Down
22 changes: 16 additions & 6 deletions xds/src/test/java/io/grpc/xds/XdsServerWrapperTest.java
Expand Up @@ -74,6 +74,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Before;
import org.junit.Rule;
Expand Down Expand Up @@ -261,9 +262,10 @@ public void run() {
xdsClient.ldsWatcher.onResourceDoesNotExist(ldsResource);
try {
start.get(5000, TimeUnit.MILLISECONDS);
fail("Start should throw exception");
} catch (ExecutionException ex) {
assertThat(ex.getCause()).isInstanceOf(IOException.class);
fail("server should not start() successfully.");
} catch (TimeoutException ex) {
// expect to block here.
assertThat(start.isDone()).isFalse();
}
verify(mockBuilder, times(1)).build();
verify(mockServer, never()).start();
Expand Down Expand Up @@ -602,9 +604,10 @@ public void run() {
xdsClient.ldsWatcher.onResourceDoesNotExist(ldsResource);
try {
start.get(5000, TimeUnit.MILLISECONDS);
fail("Start should throw exception");
} catch (ExecutionException ex) {
assertThat(ex.getCause()).isInstanceOf(IOException.class);
fail("server should not start()");
} catch (TimeoutException ex) {
// expect to block here.
assertThat(start.isDone()).isFalse();
}
verify(listener, times(1)).onNotServing(any(StatusException.class));
verify(mockBuilder, times(1)).build();
Expand All @@ -627,6 +630,13 @@ public void run() {
assertThat(sslSupplier0.isShutdown()).isTrue();
xdsClient.deliverRdsUpdate("rds",
Collections.singletonList(createVirtualHost("virtual-host-1")));
try {
start.get(5000, TimeUnit.MILLISECONDS);
fail("Start should throw exception");
} catch (ExecutionException ex) {
assertThat(ex.getCause()).isInstanceOf(IOException.class);
assertThat(ex.getCause().getMessage()).isEqualTo("error!");
}
RdsResourceWatcher saveRdsWatcher = xdsClient.rdsWatchers.get("rds");
assertThat(executor.forwardNanos(RETRY_DELAY_NANOS)).isEqualTo(1);
verify(mockBuilder, times(1)).build();
Expand Down

0 comments on commit a98e0db

Please sign in to comment.