Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds: accept resources wrapped in a Resource message #8997

Merged
merged 3 commits into from Mar 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions xds/src/main/java/io/grpc/xds/BootstrapperImpl.java
Expand Up @@ -62,6 +62,8 @@ class BootstrapperImpl extends Bootstrapper {
static final String CLIENT_FEATURE_DISABLE_OVERPROVISIONING =
"envoy.lb.does_not_support_overprovisioning";
@VisibleForTesting
static final String CLIENT_FEATURE_RESOURCE_IN_SOTW = "xds.config.resource-in-sotw";
@VisibleForTesting
static boolean enableFederation =
!Strings.isNullOrEmpty(System.getenv("GRPC_EXPERIMENTAL_XDS_FEDERATION"))
&& Boolean.parseBoolean(System.getenv("GRPC_EXPERIMENTAL_XDS_FEDERATION"));
Expand Down Expand Up @@ -177,6 +179,7 @@ BootstrapInfo bootstrap(Map<String, ?> rawData) throws XdsInitializationExceptio
nodeBuilder.setUserAgentName(buildVersion.getUserAgent());
nodeBuilder.setUserAgentVersion(buildVersion.getImplementationVersion());
nodeBuilder.addClientFeatures(CLIENT_FEATURE_DISABLE_OVERPROVISIONING);
nodeBuilder.addClientFeatures(CLIENT_FEATURE_RESOURCE_IN_SOTW);
builder.node(nodeBuilder.build());

Map<String, ?> certProvidersBlob = JsonUtil.getObject(rawData, "certificate_providers");
Expand Down
24 changes: 22 additions & 2 deletions xds/src/main/java/io/grpc/xds/ClientXdsClient.java
Expand Up @@ -62,6 +62,7 @@
import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.CertificateValidationContext;
import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.CommonTlsContext;
import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.DownstreamTlsContext;
import io.envoyproxy.envoy.service.discovery.v3.Resource;
import io.envoyproxy.envoy.type.v3.FractionalPercent;
import io.envoyproxy.envoy.type.v3.FractionalPercent.DenominatorType;
import io.grpc.ChannelCredentials;
Expand Down Expand Up @@ -188,6 +189,9 @@ final class ClientXdsClient extends XdsClient implements XdsResponseHandler, Res
"type.googleapis.com/xds.type.v3.TypedStruct";
private static final String TYPE_URL_FILTER_CONFIG =
"type.googleapis.com/envoy.config.route.v3.FilterConfig";
private static final String TYPE_URL_RESOURCE_V2 = "type.googleapis.com/envoy.api.v2.Resource";
private static final String TYPE_URL_RESOURCE_V3 =
"type.googleapis.com/envoy.service.discovery.v3.Resource";
// TODO(zdapeng): need to discuss how to handle unsupported values.
private static final Set<Code> SUPPORTED_RETRYABLE_CODES =
Collections.unmodifiableSet(EnumSet.of(
Expand Down Expand Up @@ -274,6 +278,17 @@ private void maybeCreateXdsChannelWithLrs(ServerInfo serverInfo) {
serverLrsClientMap.put(serverInfo, lrsClient);
}

private Any maybeUnwrapResources(Any resource)
throws InvalidProtocolBufferException {
if (resource.getTypeUrl().equals(TYPE_URL_RESOURCE_V2)
|| resource.getTypeUrl().equals(TYPE_URL_RESOURCE_V3)) {
return unpackCompatibleType(resource, Resource.class, TYPE_URL_RESOURCE_V3,
TYPE_URL_RESOURCE_V2).getResource();
} else {
return resource;
}
}

@Override
public void handleLdsResponse(
ServerInfo serverInfo, String versionInfo, List<Any> resources, String nonce) {
Expand All @@ -287,10 +302,12 @@ public void handleLdsResponse(
for (int i = 0; i < resources.size(); i++) {
Any resource = resources.get(i);

// Unpack the Listener.
boolean isResourceV3 = resource.getTypeUrl().equals(ResourceType.LDS.typeUrl());
boolean isResourceV3;
Listener listener;
try {
resource = maybeUnwrapResources(resource);
// Unpack the Listener.
isResourceV3 = resource.getTypeUrl().equals(ResourceType.LDS.typeUrl());
listener = unpackCompatibleType(resource, Listener.class, ResourceType.LDS.typeUrl(),
ResourceType.LDS.typeUrlV2());
} catch (InvalidProtocolBufferException e) {
Expand Down Expand Up @@ -1424,6 +1441,7 @@ public void handleRdsResponse(
// Unpack the RouteConfiguration.
RouteConfiguration routeConfig;
try {
resource = maybeUnwrapResources(resource);
routeConfig = unpackCompatibleType(resource, RouteConfiguration.class,
ResourceType.RDS.typeUrl(), ResourceType.RDS.typeUrlV2());
} catch (InvalidProtocolBufferException e) {
Expand Down Expand Up @@ -1552,6 +1570,7 @@ public void handleCdsResponse(
// Unpack the Cluster.
Cluster cluster;
try {
resource = maybeUnwrapResources(resource);
cluster = unpackCompatibleType(
resource, Cluster.class, ResourceType.CDS.typeUrl(), ResourceType.CDS.typeUrlV2());
} catch (InvalidProtocolBufferException e) {
Expand Down Expand Up @@ -1801,6 +1820,7 @@ public void handleEdsResponse(
// Unpack the ClusterLoadAssignment.
ClusterLoadAssignment assignment;
try {
resource = maybeUnwrapResources(resource);
assignment =
unpackCompatibleType(resource, ClusterLoadAssignment.class, ResourceType.EDS.typeUrl(),
ResourceType.EDS.typeUrlV2());
Expand Down
3 changes: 2 additions & 1 deletion xds/src/test/java/io/grpc/xds/BootstrapperImplTest.java
Expand Up @@ -827,6 +827,7 @@ private static Node.Builder getNodeBuilder() {
.setBuildVersion(buildVersion.toString())
.setUserAgentName(buildVersion.getUserAgent())
.setUserAgentVersion(buildVersion.getImplementationVersion())
.addClientFeatures(BootstrapperImpl.CLIENT_FEATURE_DISABLE_OVERPROVISIONING);
.addClientFeatures(BootstrapperImpl.CLIENT_FEATURE_DISABLE_OVERPROVISIONING)
.addClientFeatures(BootstrapperImpl.CLIENT_FEATURE_RESOURCE_IN_SOTW);
}
}
67 changes: 67 additions & 0 deletions xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java
Expand Up @@ -686,6 +686,21 @@ public void ldsResourceFound_containsVirtualHosts() {
verifySubscribedResourcesMetadataSizes(1, 0, 0, 0);
}

@Test
public void wrappedLdsResource() {
DiscoveryRpcCall call = startResourceWatcher(LDS, LDS_RESOURCE, ldsResourceWatcher);

// Client sends an ACK LDS request.
call.sendResponse(LDS, mf.buildWrappedResource(testListenerVhosts), VERSION_1, "0000");
call.verifyRequest(LDS, LDS_RESOURCE, VERSION_1, "0000", NODE);
verify(ldsResourceWatcher).onChanged(ldsUpdateCaptor.capture());
assertThat(ldsUpdateCaptor.getValue().httpConnectionManager().virtualHosts())
.hasSize(VHOST_SIZE);
assertThat(fakeClock.getPendingTasks(LDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty();
verifyResourceMetadataAcked(LDS, LDS_RESOURCE, testListenerVhosts, VERSION_1, TIME_INCREMENT);
verifySubscribedResourcesMetadataSizes(1, 0, 0, 0);
}

@Test
public void ldsResourceFound_containsRdsName() {
DiscoveryRpcCall call = startResourceWatcher(LDS, LDS_RESOURCE, ldsResourceWatcher);
Expand Down Expand Up @@ -1165,6 +1180,20 @@ public void rdsResourceFound() {
verifySubscribedResourcesMetadataSizes(0, 0, 1, 0);
}

@Test
public void wrappedRdsResource() {
DiscoveryRpcCall call = startResourceWatcher(RDS, RDS_RESOURCE, rdsResourceWatcher);
call.sendResponse(RDS, mf.buildWrappedResource(testRouteConfig), VERSION_1, "0000");

// Client sends an ACK RDS request.
call.verifyRequest(RDS, RDS_RESOURCE, VERSION_1, "0000", NODE);
verify(rdsResourceWatcher).onChanged(rdsUpdateCaptor.capture());
assertThat(rdsUpdateCaptor.getValue().virtualHosts).hasSize(VHOST_SIZE);
assertThat(fakeClock.getPendingTasks(RDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty();
verifyResourceMetadataAcked(RDS, RDS_RESOURCE, testRouteConfig, VERSION_1, TIME_INCREMENT);
verifySubscribedResourcesMetadataSizes(0, 0, 1, 0);
}

@Test
public void cachedRdsResource_data() {
DiscoveryRpcCall call = startResourceWatcher(RDS, RDS_RESOURCE, rdsResourceWatcher);
Expand Down Expand Up @@ -1596,6 +1625,28 @@ public void cdsResourceFound() {
verifySubscribedResourcesMetadataSizes(0, 1, 0, 0);
}

@Test
public void wrappedCdsResource() {
DiscoveryRpcCall call = startResourceWatcher(CDS, CDS_RESOURCE, cdsResourceWatcher);
call.sendResponse(CDS, mf.buildWrappedResource(testClusterRoundRobin), VERSION_1, "0000");

// Client sent an ACK CDS request.
call.verifyRequest(CDS, CDS_RESOURCE, VERSION_1, "0000", NODE);
verify(cdsResourceWatcher).onChanged(cdsUpdateCaptor.capture());
CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue();
assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE);
assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS);
assertThat(cdsUpdate.edsServiceName()).isNull();
assertThat(cdsUpdate.lbPolicy()).isEqualTo(LbPolicy.ROUND_ROBIN);
assertThat(cdsUpdate.lrsServerInfo()).isNull();
assertThat(cdsUpdate.maxConcurrentRequests()).isNull();
assertThat(cdsUpdate.upstreamTlsContext()).isNull();
assertThat(fakeClock.getPendingTasks(CDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty();
verifyResourceMetadataAcked(CDS, CDS_RESOURCE, testClusterRoundRobin, VERSION_1,
TIME_INCREMENT);
verifySubscribedResourcesMetadataSizes(0, 1, 0, 0);
}

@Test
public void cdsResourceFound_leastRequestLbPolicy() {
DiscoveryRpcCall call = startResourceWatcher(CDS, CDS_RESOURCE, cdsResourceWatcher);
Expand Down Expand Up @@ -2141,6 +2192,20 @@ public void edsResourceFound() {
verifySubscribedResourcesMetadataSizes(0, 0, 0, 1);
}

@Test
public void wrappedEdsResourceFound() {
DiscoveryRpcCall call = startResourceWatcher(EDS, EDS_RESOURCE, edsResourceWatcher);
call.sendResponse(EDS, mf.buildWrappedResource(testClusterLoadAssignment), VERSION_1, "0000");

// Client sent an ACK EDS request.
call.verifyRequest(EDS, EDS_RESOURCE, VERSION_1, "0000", NODE);
verify(edsResourceWatcher).onChanged(edsUpdateCaptor.capture());
validateTestClusterLoadAssigment(edsUpdateCaptor.getValue());
verifyResourceMetadataAcked(EDS, EDS_RESOURCE, testClusterLoadAssignment, VERSION_1,
TIME_INCREMENT);
verifySubscribedResourcesMetadataSizes(0, 0, 0, 1);
}

@Test
public void cachedEdsResource_data() {
DiscoveryRpcCall call = startResourceWatcher(EDS, EDS_RESOURCE, edsResourceWatcher);
Expand Down Expand Up @@ -2789,6 +2854,8 @@ protected abstract static class MessageFactory {
/** Throws {@link InvalidProtocolBufferException} on {@link Any#unpack(Class)}. */
protected static final Any FAILING_ANY = Any.newBuilder().setTypeUrl("fake").build();

protected abstract Any buildWrappedResource(Any originalResource);

protected Message buildListenerWithApiListener(String name, Message routeConfiguration) {
return buildListenerWithApiListener(
name, routeConfiguration, Collections.<Message>emptyList());
Expand Down
8 changes: 8 additions & 0 deletions xds/src/test/java/io/grpc/xds/ClientXdsClientV2Test.java
Expand Up @@ -45,6 +45,7 @@
import io.envoyproxy.envoy.api.v2.DiscoveryRequest;
import io.envoyproxy.envoy.api.v2.DiscoveryResponse;
import io.envoyproxy.envoy.api.v2.Listener;
import io.envoyproxy.envoy.api.v2.Resource;
import io.envoyproxy.envoy.api.v2.RouteConfiguration;
import io.envoyproxy.envoy.api.v2.auth.CommonTlsContext;
import io.envoyproxy.envoy.api.v2.auth.SdsSecretConfig;
Expand Down Expand Up @@ -253,6 +254,13 @@ protected void sendResponse(List<String> clusters, long loadReportIntervalNano)

private static class MessageFactoryV2 extends MessageFactory {

@Override
protected Any buildWrappedResource(Any originalResource) {
return Any.pack(Resource.newBuilder()
.setResource(originalResource)
.build());
}

@SuppressWarnings("unchecked")
@Override
protected Message buildListenerWithApiListener(
Expand Down
8 changes: 8 additions & 0 deletions xds/src/test/java/io/grpc/xds/ClientXdsClientV3Test.java
Expand Up @@ -86,6 +86,7 @@
import io.envoyproxy.envoy.service.discovery.v3.AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceImplBase;
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest;
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse;
import io.envoyproxy.envoy.service.discovery.v3.Resource;
import io.envoyproxy.envoy.service.load_stats.v3.LoadReportingServiceGrpc.LoadReportingServiceImplBase;
import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsRequest;
import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsResponse;
Expand Down Expand Up @@ -261,6 +262,13 @@ protected void sendResponse(List<String> clusters, long loadReportIntervalNano)

private static class MessageFactoryV3 extends MessageFactory {

@Override
protected Any buildWrappedResource(Any originalResource) {
return Any.pack(Resource.newBuilder()
.setResource(originalResource)
.build());
}

@SuppressWarnings("unchecked")
@Override
protected Message buildListenerWithApiListener(
Expand Down