diff --git a/xds/src/main/java/io/grpc/xds/BootstrapperImpl.java b/xds/src/main/java/io/grpc/xds/BootstrapperImpl.java index 4796f5e0361..47d85244c16 100644 --- a/xds/src/main/java/io/grpc/xds/BootstrapperImpl.java +++ b/xds/src/main/java/io/grpc/xds/BootstrapperImpl.java @@ -62,6 +62,8 @@ class BootstrapperImpl extends Bootstrapper { static final String CLIENT_FEATURE_DISABLE_OVERPROVISIONING = "envoy.lb.does_not_support_overprovisioning"; @VisibleForTesting + static final String CLIENT_FEATURE_RESOURCE_IN_SOTW = "xds.config.resource-in-sotw"; + @VisibleForTesting static boolean enableFederation = !Strings.isNullOrEmpty(System.getenv("GRPC_EXPERIMENTAL_XDS_FEDERATION")) && Boolean.parseBoolean(System.getenv("GRPC_EXPERIMENTAL_XDS_FEDERATION")); @@ -177,6 +179,7 @@ BootstrapInfo bootstrap(Map rawData) throws XdsInitializationExceptio nodeBuilder.setUserAgentName(buildVersion.getUserAgent()); nodeBuilder.setUserAgentVersion(buildVersion.getImplementationVersion()); nodeBuilder.addClientFeatures(CLIENT_FEATURE_DISABLE_OVERPROVISIONING); + nodeBuilder.addClientFeatures(CLIENT_FEATURE_RESOURCE_IN_SOTW); builder.node(nodeBuilder.build()); Map certProvidersBlob = JsonUtil.getObject(rawData, "certificate_providers"); diff --git a/xds/src/main/java/io/grpc/xds/ClientXdsClient.java b/xds/src/main/java/io/grpc/xds/ClientXdsClient.java index 95b3bbfcfdf..41e80f2fe3d 100644 --- a/xds/src/main/java/io/grpc/xds/ClientXdsClient.java +++ b/xds/src/main/java/io/grpc/xds/ClientXdsClient.java @@ -62,6 +62,7 @@ import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.CertificateValidationContext; import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.CommonTlsContext; import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.DownstreamTlsContext; +import io.envoyproxy.envoy.service.discovery.v3.Resource; import io.envoyproxy.envoy.type.v3.FractionalPercent; import io.envoyproxy.envoy.type.v3.FractionalPercent.DenominatorType; import io.grpc.ChannelCredentials; @@ -188,6 +189,9 @@ final class ClientXdsClient extends XdsClient implements XdsResponseHandler, Res "type.googleapis.com/xds.type.v3.TypedStruct"; private static final String TYPE_URL_FILTER_CONFIG = "type.googleapis.com/envoy.config.route.v3.FilterConfig"; + private static final String TYPE_URL_RESOURCE_V2 = "type.googleapis.com/envoy.api.v2.Resource"; + private static final String TYPE_URL_RESOURCE_V3 = + "type.googleapis.com/envoy.service.discovery.v3.Resource"; // TODO(zdapeng): need to discuss how to handle unsupported values. private static final Set SUPPORTED_RETRYABLE_CODES = Collections.unmodifiableSet(EnumSet.of( @@ -274,6 +278,17 @@ private void maybeCreateXdsChannelWithLrs(ServerInfo serverInfo) { serverLrsClientMap.put(serverInfo, lrsClient); } + private Any maybeUnwrapResources(Any resource) + throws InvalidProtocolBufferException { + if (resource.getTypeUrl().equals(TYPE_URL_RESOURCE_V2) + || resource.getTypeUrl().equals(TYPE_URL_RESOURCE_V3)) { + return unpackCompatibleType(resource, Resource.class, TYPE_URL_RESOURCE_V3, + TYPE_URL_RESOURCE_V2).getResource(); + } else { + return resource; + } + } + @Override public void handleLdsResponse( ServerInfo serverInfo, String versionInfo, List resources, String nonce) { @@ -287,10 +302,12 @@ public void handleLdsResponse( for (int i = 0; i < resources.size(); i++) { Any resource = resources.get(i); - // Unpack the Listener. - boolean isResourceV3 = resource.getTypeUrl().equals(ResourceType.LDS.typeUrl()); + boolean isResourceV3; Listener listener; try { + resource = maybeUnwrapResources(resource); + // Unpack the Listener. + isResourceV3 = resource.getTypeUrl().equals(ResourceType.LDS.typeUrl()); listener = unpackCompatibleType(resource, Listener.class, ResourceType.LDS.typeUrl(), ResourceType.LDS.typeUrlV2()); } catch (InvalidProtocolBufferException e) { @@ -1424,6 +1441,7 @@ public void handleRdsResponse( // Unpack the RouteConfiguration. RouteConfiguration routeConfig; try { + resource = maybeUnwrapResources(resource); routeConfig = unpackCompatibleType(resource, RouteConfiguration.class, ResourceType.RDS.typeUrl(), ResourceType.RDS.typeUrlV2()); } catch (InvalidProtocolBufferException e) { @@ -1552,6 +1570,7 @@ public void handleCdsResponse( // Unpack the Cluster. Cluster cluster; try { + resource = maybeUnwrapResources(resource); cluster = unpackCompatibleType( resource, Cluster.class, ResourceType.CDS.typeUrl(), ResourceType.CDS.typeUrlV2()); } catch (InvalidProtocolBufferException e) { @@ -1801,6 +1820,7 @@ public void handleEdsResponse( // Unpack the ClusterLoadAssignment. ClusterLoadAssignment assignment; try { + resource = maybeUnwrapResources(resource); assignment = unpackCompatibleType(resource, ClusterLoadAssignment.class, ResourceType.EDS.typeUrl(), ResourceType.EDS.typeUrlV2()); diff --git a/xds/src/test/java/io/grpc/xds/BootstrapperImplTest.java b/xds/src/test/java/io/grpc/xds/BootstrapperImplTest.java index 53b52a7bc02..f3a1af3a1a1 100644 --- a/xds/src/test/java/io/grpc/xds/BootstrapperImplTest.java +++ b/xds/src/test/java/io/grpc/xds/BootstrapperImplTest.java @@ -827,6 +827,7 @@ private static Node.Builder getNodeBuilder() { .setBuildVersion(buildVersion.toString()) .setUserAgentName(buildVersion.getUserAgent()) .setUserAgentVersion(buildVersion.getImplementationVersion()) - .addClientFeatures(BootstrapperImpl.CLIENT_FEATURE_DISABLE_OVERPROVISIONING); + .addClientFeatures(BootstrapperImpl.CLIENT_FEATURE_DISABLE_OVERPROVISIONING) + .addClientFeatures(BootstrapperImpl.CLIENT_FEATURE_RESOURCE_IN_SOTW); } } diff --git a/xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java b/xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java index ba182be76d1..5cedcff17fe 100644 --- a/xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java +++ b/xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java @@ -686,6 +686,21 @@ public void ldsResourceFound_containsVirtualHosts() { verifySubscribedResourcesMetadataSizes(1, 0, 0, 0); } + @Test + public void wrappedLdsResource() { + DiscoveryRpcCall call = startResourceWatcher(LDS, LDS_RESOURCE, ldsResourceWatcher); + + // Client sends an ACK LDS request. + call.sendResponse(LDS, mf.buildWrappedResource(testListenerVhosts), VERSION_1, "0000"); + call.verifyRequest(LDS, LDS_RESOURCE, VERSION_1, "0000", NODE); + verify(ldsResourceWatcher).onChanged(ldsUpdateCaptor.capture()); + assertThat(ldsUpdateCaptor.getValue().httpConnectionManager().virtualHosts()) + .hasSize(VHOST_SIZE); + assertThat(fakeClock.getPendingTasks(LDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty(); + verifyResourceMetadataAcked(LDS, LDS_RESOURCE, testListenerVhosts, VERSION_1, TIME_INCREMENT); + verifySubscribedResourcesMetadataSizes(1, 0, 0, 0); + } + @Test public void ldsResourceFound_containsRdsName() { DiscoveryRpcCall call = startResourceWatcher(LDS, LDS_RESOURCE, ldsResourceWatcher); @@ -1165,6 +1180,20 @@ public void rdsResourceFound() { verifySubscribedResourcesMetadataSizes(0, 0, 1, 0); } + @Test + public void wrappedRdsResource() { + DiscoveryRpcCall call = startResourceWatcher(RDS, RDS_RESOURCE, rdsResourceWatcher); + call.sendResponse(RDS, mf.buildWrappedResource(testRouteConfig), VERSION_1, "0000"); + + // Client sends an ACK RDS request. + call.verifyRequest(RDS, RDS_RESOURCE, VERSION_1, "0000", NODE); + verify(rdsResourceWatcher).onChanged(rdsUpdateCaptor.capture()); + assertThat(rdsUpdateCaptor.getValue().virtualHosts).hasSize(VHOST_SIZE); + assertThat(fakeClock.getPendingTasks(RDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty(); + verifyResourceMetadataAcked(RDS, RDS_RESOURCE, testRouteConfig, VERSION_1, TIME_INCREMENT); + verifySubscribedResourcesMetadataSizes(0, 0, 1, 0); + } + @Test public void cachedRdsResource_data() { DiscoveryRpcCall call = startResourceWatcher(RDS, RDS_RESOURCE, rdsResourceWatcher); @@ -1596,6 +1625,28 @@ public void cdsResourceFound() { verifySubscribedResourcesMetadataSizes(0, 1, 0, 0); } + @Test + public void wrappedCdsResource() { + DiscoveryRpcCall call = startResourceWatcher(CDS, CDS_RESOURCE, cdsResourceWatcher); + call.sendResponse(CDS, mf.buildWrappedResource(testClusterRoundRobin), VERSION_1, "0000"); + + // Client sent an ACK CDS request. + call.verifyRequest(CDS, CDS_RESOURCE, VERSION_1, "0000", NODE); + verify(cdsResourceWatcher).onChanged(cdsUpdateCaptor.capture()); + CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue(); + assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE); + assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS); + assertThat(cdsUpdate.edsServiceName()).isNull(); + assertThat(cdsUpdate.lbPolicy()).isEqualTo(LbPolicy.ROUND_ROBIN); + assertThat(cdsUpdate.lrsServerInfo()).isNull(); + assertThat(cdsUpdate.maxConcurrentRequests()).isNull(); + assertThat(cdsUpdate.upstreamTlsContext()).isNull(); + assertThat(fakeClock.getPendingTasks(CDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty(); + verifyResourceMetadataAcked(CDS, CDS_RESOURCE, testClusterRoundRobin, VERSION_1, + TIME_INCREMENT); + verifySubscribedResourcesMetadataSizes(0, 1, 0, 0); + } + @Test public void cdsResourceFound_leastRequestLbPolicy() { DiscoveryRpcCall call = startResourceWatcher(CDS, CDS_RESOURCE, cdsResourceWatcher); @@ -2141,6 +2192,20 @@ public void edsResourceFound() { verifySubscribedResourcesMetadataSizes(0, 0, 0, 1); } + @Test + public void wrappedEdsResourceFound() { + DiscoveryRpcCall call = startResourceWatcher(EDS, EDS_RESOURCE, edsResourceWatcher); + call.sendResponse(EDS, mf.buildWrappedResource(testClusterLoadAssignment), VERSION_1, "0000"); + + // Client sent an ACK EDS request. + call.verifyRequest(EDS, EDS_RESOURCE, VERSION_1, "0000", NODE); + verify(edsResourceWatcher).onChanged(edsUpdateCaptor.capture()); + validateTestClusterLoadAssigment(edsUpdateCaptor.getValue()); + verifyResourceMetadataAcked(EDS, EDS_RESOURCE, testClusterLoadAssignment, VERSION_1, + TIME_INCREMENT); + verifySubscribedResourcesMetadataSizes(0, 0, 0, 1); + } + @Test public void cachedEdsResource_data() { DiscoveryRpcCall call = startResourceWatcher(EDS, EDS_RESOURCE, edsResourceWatcher); @@ -2789,6 +2854,8 @@ protected abstract static class MessageFactory { /** Throws {@link InvalidProtocolBufferException} on {@link Any#unpack(Class)}. */ protected static final Any FAILING_ANY = Any.newBuilder().setTypeUrl("fake").build(); + protected abstract Any buildWrappedResource(Any originalResource); + protected Message buildListenerWithApiListener(String name, Message routeConfiguration) { return buildListenerWithApiListener( name, routeConfiguration, Collections.emptyList()); diff --git a/xds/src/test/java/io/grpc/xds/ClientXdsClientV2Test.java b/xds/src/test/java/io/grpc/xds/ClientXdsClientV2Test.java index 29c7fdc4c01..b47193e35eb 100644 --- a/xds/src/test/java/io/grpc/xds/ClientXdsClientV2Test.java +++ b/xds/src/test/java/io/grpc/xds/ClientXdsClientV2Test.java @@ -45,6 +45,7 @@ import io.envoyproxy.envoy.api.v2.DiscoveryRequest; import io.envoyproxy.envoy.api.v2.DiscoveryResponse; import io.envoyproxy.envoy.api.v2.Listener; +import io.envoyproxy.envoy.api.v2.Resource; import io.envoyproxy.envoy.api.v2.RouteConfiguration; import io.envoyproxy.envoy.api.v2.auth.CommonTlsContext; import io.envoyproxy.envoy.api.v2.auth.SdsSecretConfig; @@ -253,6 +254,13 @@ protected void sendResponse(List clusters, long loadReportIntervalNano) private static class MessageFactoryV2 extends MessageFactory { + @Override + protected Any buildWrappedResource(Any originalResource) { + return Any.pack(Resource.newBuilder() + .setResource(originalResource) + .build()); + } + @SuppressWarnings("unchecked") @Override protected Message buildListenerWithApiListener( diff --git a/xds/src/test/java/io/grpc/xds/ClientXdsClientV3Test.java b/xds/src/test/java/io/grpc/xds/ClientXdsClientV3Test.java index 6a75d9ab068..432cadbb5da 100644 --- a/xds/src/test/java/io/grpc/xds/ClientXdsClientV3Test.java +++ b/xds/src/test/java/io/grpc/xds/ClientXdsClientV3Test.java @@ -86,6 +86,7 @@ import io.envoyproxy.envoy.service.discovery.v3.AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceImplBase; import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest; import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse; +import io.envoyproxy.envoy.service.discovery.v3.Resource; import io.envoyproxy.envoy.service.load_stats.v3.LoadReportingServiceGrpc.LoadReportingServiceImplBase; import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsRequest; import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsResponse; @@ -261,6 +262,13 @@ protected void sendResponse(List clusters, long loadReportIntervalNano) private static class MessageFactoryV3 extends MessageFactory { + @Override + protected Any buildWrappedResource(Any originalResource) { + return Any.pack(Resource.newBuilder() + .setResource(originalResource) + .build()); + } + @SuppressWarnings("unchecked") @Override protected Message buildListenerWithApiListener(