Skip to content

Commit

Permalink
xds: accept resources wrapped in a Resource message (grpc#8997)
Browse files Browse the repository at this point in the history
  • Loading branch information
YifeiZhuang authored and temawi committed Apr 8, 2022
1 parent 3c843fb commit 2477548
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 3 deletions.
3 changes: 3 additions & 0 deletions xds/src/main/java/io/grpc/xds/BootstrapperImpl.java
Expand Up @@ -62,6 +62,8 @@ class BootstrapperImpl extends Bootstrapper {
static final String CLIENT_FEATURE_DISABLE_OVERPROVISIONING =
"envoy.lb.does_not_support_overprovisioning";
@VisibleForTesting
static final String CLIENT_FEATURE_RESOURCE_IN_SOTW = "xds.config.resource-in-sotw";
@VisibleForTesting
static boolean enableFederation =
!Strings.isNullOrEmpty(System.getenv("GRPC_EXPERIMENTAL_XDS_FEDERATION"))
&& Boolean.parseBoolean(System.getenv("GRPC_EXPERIMENTAL_XDS_FEDERATION"));
Expand Down Expand Up @@ -177,6 +179,7 @@ BootstrapInfo bootstrap(Map<String, ?> rawData) throws XdsInitializationExceptio
nodeBuilder.setUserAgentName(buildVersion.getUserAgent());
nodeBuilder.setUserAgentVersion(buildVersion.getImplementationVersion());
nodeBuilder.addClientFeatures(CLIENT_FEATURE_DISABLE_OVERPROVISIONING);
nodeBuilder.addClientFeatures(CLIENT_FEATURE_RESOURCE_IN_SOTW);
builder.node(nodeBuilder.build());

Map<String, ?> certProvidersBlob = JsonUtil.getObject(rawData, "certificate_providers");
Expand Down
24 changes: 22 additions & 2 deletions xds/src/main/java/io/grpc/xds/ClientXdsClient.java
Expand Up @@ -62,6 +62,7 @@
import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.CertificateValidationContext;
import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.CommonTlsContext;
import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.DownstreamTlsContext;
import io.envoyproxy.envoy.service.discovery.v3.Resource;
import io.envoyproxy.envoy.type.v3.FractionalPercent;
import io.envoyproxy.envoy.type.v3.FractionalPercent.DenominatorType;
import io.grpc.ChannelCredentials;
Expand Down Expand Up @@ -188,6 +189,9 @@ final class ClientXdsClient extends XdsClient implements XdsResponseHandler, Res
"type.googleapis.com/xds.type.v3.TypedStruct";
private static final String TYPE_URL_FILTER_CONFIG =
"type.googleapis.com/envoy.config.route.v3.FilterConfig";
private static final String TYPE_URL_RESOURCE_V2 = "type.googleapis.com/envoy.api.v2.Resource";
private static final String TYPE_URL_RESOURCE_V3 =
"type.googleapis.com/envoy.service.discovery.v3.Resource";
// TODO(zdapeng): need to discuss how to handle unsupported values.
private static final Set<Code> SUPPORTED_RETRYABLE_CODES =
Collections.unmodifiableSet(EnumSet.of(
Expand Down Expand Up @@ -274,6 +278,17 @@ private void maybeCreateXdsChannelWithLrs(ServerInfo serverInfo) {
serverLrsClientMap.put(serverInfo, lrsClient);
}

private Any maybeUnwrapResources(Any resource)
throws InvalidProtocolBufferException {
if (resource.getTypeUrl().equals(TYPE_URL_RESOURCE_V2)
|| resource.getTypeUrl().equals(TYPE_URL_RESOURCE_V3)) {
return unpackCompatibleType(resource, Resource.class, TYPE_URL_RESOURCE_V3,
TYPE_URL_RESOURCE_V2).getResource();
} else {
return resource;
}
}

@Override
public void handleLdsResponse(
ServerInfo serverInfo, String versionInfo, List<Any> resources, String nonce) {
Expand All @@ -287,10 +302,12 @@ public void handleLdsResponse(
for (int i = 0; i < resources.size(); i++) {
Any resource = resources.get(i);

// Unpack the Listener.
boolean isResourceV3 = resource.getTypeUrl().equals(ResourceType.LDS.typeUrl());
boolean isResourceV3;
Listener listener;
try {
resource = maybeUnwrapResources(resource);
// Unpack the Listener.
isResourceV3 = resource.getTypeUrl().equals(ResourceType.LDS.typeUrl());
listener = unpackCompatibleType(resource, Listener.class, ResourceType.LDS.typeUrl(),
ResourceType.LDS.typeUrlV2());
} catch (InvalidProtocolBufferException e) {
Expand Down Expand Up @@ -1424,6 +1441,7 @@ public void handleRdsResponse(
// Unpack the RouteConfiguration.
RouteConfiguration routeConfig;
try {
resource = maybeUnwrapResources(resource);
routeConfig = unpackCompatibleType(resource, RouteConfiguration.class,
ResourceType.RDS.typeUrl(), ResourceType.RDS.typeUrlV2());
} catch (InvalidProtocolBufferException e) {
Expand Down Expand Up @@ -1552,6 +1570,7 @@ public void handleCdsResponse(
// Unpack the Cluster.
Cluster cluster;
try {
resource = maybeUnwrapResources(resource);
cluster = unpackCompatibleType(
resource, Cluster.class, ResourceType.CDS.typeUrl(), ResourceType.CDS.typeUrlV2());
} catch (InvalidProtocolBufferException e) {
Expand Down Expand Up @@ -1770,6 +1789,7 @@ public void handleEdsResponse(
// Unpack the ClusterLoadAssignment.
ClusterLoadAssignment assignment;
try {
resource = maybeUnwrapResources(resource);
assignment =
unpackCompatibleType(resource, ClusterLoadAssignment.class, ResourceType.EDS.typeUrl(),
ResourceType.EDS.typeUrlV2());
Expand Down
3 changes: 2 additions & 1 deletion xds/src/test/java/io/grpc/xds/BootstrapperImplTest.java
Expand Up @@ -827,6 +827,7 @@ private static Node.Builder getNodeBuilder() {
.setBuildVersion(buildVersion.toString())
.setUserAgentName(buildVersion.getUserAgent())
.setUserAgentVersion(buildVersion.getImplementationVersion())
.addClientFeatures(BootstrapperImpl.CLIENT_FEATURE_DISABLE_OVERPROVISIONING);
.addClientFeatures(BootstrapperImpl.CLIENT_FEATURE_DISABLE_OVERPROVISIONING)
.addClientFeatures(BootstrapperImpl.CLIENT_FEATURE_RESOURCE_IN_SOTW);
}
}
67 changes: 67 additions & 0 deletions xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java
Expand Up @@ -686,6 +686,21 @@ public void ldsResourceFound_containsVirtualHosts() {
verifySubscribedResourcesMetadataSizes(1, 0, 0, 0);
}

@Test
public void wrappedLdsResource() {
DiscoveryRpcCall call = startResourceWatcher(LDS, LDS_RESOURCE, ldsResourceWatcher);

// Client sends an ACK LDS request.
call.sendResponse(LDS, mf.buildWrappedResource(testListenerVhosts), VERSION_1, "0000");
call.verifyRequest(LDS, LDS_RESOURCE, VERSION_1, "0000", NODE);
verify(ldsResourceWatcher).onChanged(ldsUpdateCaptor.capture());
assertThat(ldsUpdateCaptor.getValue().httpConnectionManager().virtualHosts())
.hasSize(VHOST_SIZE);
assertThat(fakeClock.getPendingTasks(LDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty();
verifyResourceMetadataAcked(LDS, LDS_RESOURCE, testListenerVhosts, VERSION_1, TIME_INCREMENT);
verifySubscribedResourcesMetadataSizes(1, 0, 0, 0);
}

@Test
public void ldsResourceFound_containsRdsName() {
DiscoveryRpcCall call = startResourceWatcher(LDS, LDS_RESOURCE, ldsResourceWatcher);
Expand Down Expand Up @@ -1165,6 +1180,20 @@ public void rdsResourceFound() {
verifySubscribedResourcesMetadataSizes(0, 0, 1, 0);
}

@Test
public void wrappedRdsResource() {
DiscoveryRpcCall call = startResourceWatcher(RDS, RDS_RESOURCE, rdsResourceWatcher);
call.sendResponse(RDS, mf.buildWrappedResource(testRouteConfig), VERSION_1, "0000");

// Client sends an ACK RDS request.
call.verifyRequest(RDS, RDS_RESOURCE, VERSION_1, "0000", NODE);
verify(rdsResourceWatcher).onChanged(rdsUpdateCaptor.capture());
assertThat(rdsUpdateCaptor.getValue().virtualHosts).hasSize(VHOST_SIZE);
assertThat(fakeClock.getPendingTasks(RDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty();
verifyResourceMetadataAcked(RDS, RDS_RESOURCE, testRouteConfig, VERSION_1, TIME_INCREMENT);
verifySubscribedResourcesMetadataSizes(0, 0, 1, 0);
}

@Test
public void cachedRdsResource_data() {
DiscoveryRpcCall call = startResourceWatcher(RDS, RDS_RESOURCE, rdsResourceWatcher);
Expand Down Expand Up @@ -1596,6 +1625,28 @@ public void cdsResourceFound() {
verifySubscribedResourcesMetadataSizes(0, 1, 0, 0);
}

@Test
public void wrappedCdsResource() {
DiscoveryRpcCall call = startResourceWatcher(CDS, CDS_RESOURCE, cdsResourceWatcher);
call.sendResponse(CDS, mf.buildWrappedResource(testClusterRoundRobin), VERSION_1, "0000");

// Client sent an ACK CDS request.
call.verifyRequest(CDS, CDS_RESOURCE, VERSION_1, "0000", NODE);
verify(cdsResourceWatcher).onChanged(cdsUpdateCaptor.capture());
CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue();
assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE);
assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS);
assertThat(cdsUpdate.edsServiceName()).isNull();
assertThat(cdsUpdate.lbPolicy()).isEqualTo(LbPolicy.ROUND_ROBIN);
assertThat(cdsUpdate.lrsServerInfo()).isNull();
assertThat(cdsUpdate.maxConcurrentRequests()).isNull();
assertThat(cdsUpdate.upstreamTlsContext()).isNull();
assertThat(fakeClock.getPendingTasks(CDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty();
verifyResourceMetadataAcked(CDS, CDS_RESOURCE, testClusterRoundRobin, VERSION_1,
TIME_INCREMENT);
verifySubscribedResourcesMetadataSizes(0, 1, 0, 0);
}

@Test
public void cdsResourceFound_leastRequestLbPolicy() {
DiscoveryRpcCall call = startResourceWatcher(CDS, CDS_RESOURCE, cdsResourceWatcher);
Expand Down Expand Up @@ -2141,6 +2192,20 @@ public void edsResourceFound() {
verifySubscribedResourcesMetadataSizes(0, 0, 0, 1);
}

@Test
public void wrappedEdsResourceFound() {
DiscoveryRpcCall call = startResourceWatcher(EDS, EDS_RESOURCE, edsResourceWatcher);
call.sendResponse(EDS, mf.buildWrappedResource(testClusterLoadAssignment), VERSION_1, "0000");

// Client sent an ACK EDS request.
call.verifyRequest(EDS, EDS_RESOURCE, VERSION_1, "0000", NODE);
verify(edsResourceWatcher).onChanged(edsUpdateCaptor.capture());
validateTestClusterLoadAssigment(edsUpdateCaptor.getValue());
verifyResourceMetadataAcked(EDS, EDS_RESOURCE, testClusterLoadAssignment, VERSION_1,
TIME_INCREMENT);
verifySubscribedResourcesMetadataSizes(0, 0, 0, 1);
}

@Test
public void cachedEdsResource_data() {
DiscoveryRpcCall call = startResourceWatcher(EDS, EDS_RESOURCE, edsResourceWatcher);
Expand Down Expand Up @@ -2789,6 +2854,8 @@ protected abstract static class MessageFactory {
/** Throws {@link InvalidProtocolBufferException} on {@link Any#unpack(Class)}. */
protected static final Any FAILING_ANY = Any.newBuilder().setTypeUrl("fake").build();

protected abstract Any buildWrappedResource(Any originalResource);

protected Message buildListenerWithApiListener(String name, Message routeConfiguration) {
return buildListenerWithApiListener(
name, routeConfiguration, Collections.<Message>emptyList());
Expand Down
8 changes: 8 additions & 0 deletions xds/src/test/java/io/grpc/xds/ClientXdsClientV2Test.java
Expand Up @@ -45,6 +45,7 @@
import io.envoyproxy.envoy.api.v2.DiscoveryRequest;
import io.envoyproxy.envoy.api.v2.DiscoveryResponse;
import io.envoyproxy.envoy.api.v2.Listener;
import io.envoyproxy.envoy.api.v2.Resource;
import io.envoyproxy.envoy.api.v2.RouteConfiguration;
import io.envoyproxy.envoy.api.v2.auth.CommonTlsContext;
import io.envoyproxy.envoy.api.v2.auth.SdsSecretConfig;
Expand Down Expand Up @@ -253,6 +254,13 @@ protected void sendResponse(List<String> clusters, long loadReportIntervalNano)

private static class MessageFactoryV2 extends MessageFactory {

@Override
protected Any buildWrappedResource(Any originalResource) {
return Any.pack(Resource.newBuilder()
.setResource(originalResource)
.build());
}

@SuppressWarnings("unchecked")
@Override
protected Message buildListenerWithApiListener(
Expand Down
8 changes: 8 additions & 0 deletions xds/src/test/java/io/grpc/xds/ClientXdsClientV3Test.java
Expand Up @@ -86,6 +86,7 @@
import io.envoyproxy.envoy.service.discovery.v3.AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceImplBase;
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest;
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse;
import io.envoyproxy.envoy.service.discovery.v3.Resource;
import io.envoyproxy.envoy.service.load_stats.v3.LoadReportingServiceGrpc.LoadReportingServiceImplBase;
import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsRequest;
import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsResponse;
Expand Down Expand Up @@ -261,6 +262,13 @@ protected void sendResponse(List<String> clusters, long loadReportIntervalNano)

private static class MessageFactoryV3 extends MessageFactory {

@Override
protected Any buildWrappedResource(Any originalResource) {
return Any.pack(Resource.newBuilder()
.setResource(originalResource)
.build());
}

@SuppressWarnings("unchecked")
@Override
protected Message buildListenerWithApiListener(
Expand Down

0 comments on commit 2477548

Please sign in to comment.