diff --git a/xds/src/main/java/io/grpc/xds/ServerXdsClient.java b/xds/src/main/java/io/grpc/xds/ServerXdsClient.java index 5d6fe798ec3..2f27d62ff1e 100644 --- a/xds/src/main/java/io/grpc/xds/ServerXdsClient.java +++ b/xds/src/main/java/io/grpc/xds/ServerXdsClient.java @@ -58,7 +58,7 @@ final class ServerXdsClient extends AbstractXdsClient { private int listenerPort = -1; private final boolean newServerApi; @Nullable private final String instanceIp; - private final String grpcServerResourceId; + private String grpcServerResourceId; @Nullable private ScheduledHandle ldsRespTimer; @@ -78,6 +78,13 @@ void watchListenerData(final int port, final ListenerWatcher watcher) { listenerWatcher = checkNotNull(watcher, "watcher"); checkArgument(port > 0, "port needs to be > 0"); listenerPort = port; + if (newServerApi) { + String listeningAddress = instanceIp + ":" + listenerPort; + grpcServerResourceId = + grpcServerResourceId + "?udpa.resource.listening_address=" + listeningAddress; + } else { + grpcServerResourceId = ":" + listenerPort; + } getSyncContext().execute(new Runnable() { @Override public void run() { @@ -90,7 +97,7 @@ public void run() { ldsRespTimer = getSyncContext() .schedule( - new ListenerResourceFetchTimeoutTask(":" + port), + new ListenerResourceFetchTimeoutTask(grpcServerResourceId), INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS, getTimeService()); } } @@ -101,10 +108,7 @@ public void run() { @Override Collection getSubscribedResources(ResourceType type) { if (newServerApi) { - String listeningAddress = instanceIp + ":" + listenerPort; - String resourceName = - grpcServerResourceId + "?udpa.resource.listening_address=" + listeningAddress; - return ImmutableList.of(resourceName); + return ImmutableList.of(grpcServerResourceId); } else { return Collections.emptyList(); } @@ -161,7 +165,7 @@ protected void handleLdsResponse(String versionInfo, List resources, String } } else { if (ldsRespTimer == null) { - listenerWatcher.onResourceDoesNotExist(":" + listenerPort); + listenerWatcher.onResourceDoesNotExist(grpcServerResourceId); } } ackResponse(ResourceType.LDS, versionInfo, nonce); @@ -172,17 +176,16 @@ protected void handleLdsResponse(String versionInfo, List resources, String private boolean isRequestedListener(Listener listener) { if (newServerApi) { - return "TRAFFICDIRECTOR_INBOUND_LISTENER".equals(listener.getName()) + return grpcServerResourceId.equals(listener.getName()) && listener.getTrafficDirection().equals(TrafficDirection.INBOUND) - && hasMatchingFilter(listener.getFilterChainsList()); + && isAddressMatching(listener.getAddress(), listenerPort); } - return isAddressMatching(listener.getAddress()) + return isAddressMatching(listener.getAddress(), 15001) && hasMatchingFilter(listener.getFilterChainsList()); } - private boolean isAddressMatching(Address address) { - return newServerApi || (address.hasSocketAddress() - && (address.getSocketAddress().getPortValue() == 15001)); + private boolean isAddressMatching(Address address, int portToMatch) { + return address.hasSocketAddress() && (address.getSocketAddress().getPortValue() == portToMatch); } private boolean hasMatchingFilter(List filterChainsList) { @@ -211,7 +214,7 @@ protected void handleStreamRestarted() { ldsRespTimer = getSyncContext() .schedule( - new ListenerResourceFetchTimeoutTask(":" + listenerPort), + new ListenerResourceFetchTimeoutTask(grpcServerResourceId), INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS, getTimeService()); } } diff --git a/xds/src/main/java/io/grpc/xds/XdsClientWrapperForServerSds.java b/xds/src/main/java/io/grpc/xds/XdsClientWrapperForServerSds.java index 6a03a02fa25..80a514a943d 100644 --- a/xds/src/main/java/io/grpc/xds/XdsClientWrapperForServerSds.java +++ b/xds/src/main/java/io/grpc/xds/XdsClientWrapperForServerSds.java @@ -76,6 +76,7 @@ public final class XdsClientWrapperForServerSds { private final int port; private ScheduledExecutorService timeService; private XdsClient.ListenerWatcher listenerWatcher; + private boolean newServerApi; @VisibleForTesting final Set serverWatchers = new HashSet<>(); /** @@ -109,6 +110,7 @@ public void createXdsClientAndStart() throws IOException { } Node node = bootstrapInfo.getNode(); timeService = SharedResourceHolder.get(timeServiceResource); + newServerApi = channel.isUseProtocolV3() && experimentalNewServerApiEnvVar; XdsClient xdsClientImpl = new ServerXdsClient( channel, @@ -180,7 +182,8 @@ private DownstreamTlsContext getDownstreamTlsContext(InetSocketAddress localInet FilterChainComparator comparator = new FilterChainComparator(localInetAddr); FilterChain bestMatch = filterChains.isEmpty() ? null : Collections.max(filterChains, comparator); - if (bestMatch != null && comparator.isMatching(bestMatch.getFilterChainMatch())) { + if (bestMatch != null + && (newServerApi || comparator.isMatching(bestMatch.getFilterChainMatch()))) { return bestMatch.getDownstreamTlsContext(); } } diff --git a/xds/src/test/java/io/grpc/xds/ServerXdsClientNewServerApiTest.java b/xds/src/test/java/io/grpc/xds/ServerXdsClientNewServerApiTest.java index 8f353e58f82..37b4adb2a79 100644 --- a/xds/src/test/java/io/grpc/xds/ServerXdsClientNewServerApiTest.java +++ b/xds/src/test/java/io/grpc/xds/ServerXdsClientNewServerApiTest.java @@ -222,7 +222,6 @@ public void ldsResponse_nonMatchingFilterChain_notFoundError() { StreamObserver responseObserver = responseObservers.poll(); StreamObserver requestObserver = requestObservers.poll(); - // Client sends an LDS request with null in lds resource name verify(requestObserver) .onNext(eq(XdsClientTestHelper.buildDiscoveryRequest(NODE, "", ImmutableList.of("test/value?udpa.resource.listening_address=192.168.3.7:7000"), @@ -262,7 +261,8 @@ public void ldsResponse_nonMatchingFilterChain_notFoundError() { verify(listenerWatcher, never()).onResourceDoesNotExist(":" + PORT); verify(listenerWatcher, never()).onError(any(Status.class)); fakeClock.forwardTime(ServerXdsClient.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS); - verify(listenerWatcher).onResourceDoesNotExist(":" + PORT); + verify(listenerWatcher) + .onResourceDoesNotExist("test/value?udpa.resource.listening_address=192.168.3.7:" + PORT); assertThat(fakeClock.getPendingTasks(LISTENER_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty(); } @@ -273,7 +273,6 @@ public void ldsResponseWith_matchingListenerFound() throws InvalidProtocolBuffer StreamObserver responseObserver = responseObservers.poll(); StreamObserver requestObserver = requestObservers.poll(); - // Client sends an LDS request with null in lds resource name verify(requestObserver) .onNext( eq( @@ -286,27 +285,31 @@ public void ldsResponseWith_matchingListenerFound() throws InvalidProtocolBuffer ""))); assertThat(fakeClock.getPendingTasks(LISTENER_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).hasSize(1); - final FilterChain filterChainOutbound = buildFilterChain(buildFilterChainMatch(8000), null); - final FilterChain filterChainInbound = buildFilterChain(buildFilterChainMatch(PORT, - CidrRange.newBuilder().setAddressPrefix(LOCAL_IP) - .setPrefixLen(UInt32Value.of(32)).build()), + final FilterChain filterChainInbound = buildFilterChain(buildFilterChainMatch("managed-mtls"), CommonTlsContextTestsUtil.buildTestDownstreamTlsContext("google-sds-config-default", "ROOTCA"), buildTestFilter("envoy.http_connection_manager")); - List listeners = ImmutableList.of( - Any.pack(buildListener("bar.googleapis.com", - Any.pack(HttpConnectionManager.newBuilder() - .setRouteConfig( - buildRouteConfiguration("route-bar.googleapis.com", - ImmutableList.of( - buildVirtualHost( - ImmutableList.of("bar.googleapis.com"), - "cluster-bar.googleapis.com")))) - .build()))), - Any.pack(buildListenerWithFilterChain(LISTENER_NAME, 15001, "0.0.0.0", - filterChainOutbound, - filterChainInbound - ))); + List listeners = + ImmutableList.of( + Any.pack( + buildListener( + "bar.googleapis.com", + Any.pack( + HttpConnectionManager.newBuilder() + .setRouteConfig( + buildRouteConfiguration( + "route-bar.googleapis.com", + ImmutableList.of( + buildVirtualHost( + ImmutableList.of("bar.googleapis.com"), + "cluster-bar.googleapis.com")))) + .build()))), + Any.pack( + buildListenerWithFilterChain( + "test/value?udpa.resource.listening_address=192.168.3.7:7000", + 7000, + "0.0.0.0", + filterChainInbound))); DiscoveryResponse response = buildDiscoveryResponse("0", listeners, ResourceType.LDS.typeUrl(), "0000"); responseObserver.onNext(response); @@ -327,20 +330,12 @@ public void ldsResponseWith_matchingListenerFound() throws InvalidProtocolBuffer verify(listenerWatcher, times(1)).onListenerChanged(listenerUpdateCaptor.capture()); ListenerUpdate configUpdate = listenerUpdateCaptor.getValue(); EnvoyServerProtoData.Listener listener = configUpdate.getListener(); - assertThat(listener.getName()).isEqualTo(LISTENER_NAME); - assertThat(listener.getAddress()).isEqualTo("0.0.0.0:15001"); - assertThat(listener.getFilterChains()).hasSize(2); - EnvoyServerProtoData.FilterChain filterChainOutboundInListenerUpdate - = listener.getFilterChains().get(0); - assertThat(filterChainOutboundInListenerUpdate.getFilterChainMatch().getDestinationPort()) - .isEqualTo(8000); + assertThat(listener.getName()) + .isEqualTo("test/value?udpa.resource.listening_address=192.168.3.7:7000"); + assertThat(listener.getAddress()).isEqualTo("0.0.0.0:7000"); + assertThat(listener.getFilterChains()).hasSize(1); EnvoyServerProtoData.FilterChain filterChainInboundInListenerUpdate - = listener.getFilterChains().get(1); - EnvoyServerProtoData.FilterChainMatch inBoundfilterChainMatch = - filterChainInboundInListenerUpdate.getFilterChainMatch(); - assertThat(inBoundfilterChainMatch.getDestinationPort()).isEqualTo(PORT); - assertThat(inBoundfilterChainMatch.getPrefixRanges()).containsExactly( - new EnvoyServerProtoData.CidrRange(LOCAL_IP, 32)); + = listener.getFilterChains().get(0); CommonTlsContext downstreamCommonTlsContext = filterChainInboundInListenerUpdate.getDownstreamTlsContext().getCommonTlsContext(); assertThat(downstreamCommonTlsContext.getTlsCertificateSdsSecretConfigs(0).getName()) @@ -357,32 +352,23 @@ public void ldsResponseWith_matchingListenerFound() throws InvalidProtocolBuffer /** Client receives LDS responses for updating Listener previously received. */ @SuppressWarnings("unchecked") @Test - public void notifyUpdatedListener() throws InvalidProtocolBufferException { + public void notifyUpdatedListener() { xdsClient.watchListenerData(PORT, listenerWatcher); StreamObserver responseObserver = responseObservers.poll(); StreamObserver requestObserver = requestObservers.poll(); - final FilterChain filterChainOutbound = buildFilterChain(buildFilterChainMatch(8000), null); - final FilterChain filterChainInbound = buildFilterChain(buildFilterChainMatch(PORT, - CidrRange.newBuilder().setAddressPrefix(LOCAL_IP) - .setPrefixLen(UInt32Value.of(32)).build()), + final FilterChain filterChainInbound = buildFilterChain(buildFilterChainMatch("managed-mtls"), CommonTlsContextTestsUtil.buildTestDownstreamTlsContext("google-sds-config-default", "ROOTCA"), buildTestFilter("envoy.http_connection_manager")); - List listeners = ImmutableList.of( - Any.pack(buildListener("bar.googleapis.com", - Any.pack(HttpConnectionManager.newBuilder() - .setRouteConfig( - buildRouteConfiguration("route-bar.googleapis.com", - ImmutableList.of( - buildVirtualHost( - ImmutableList.of("bar.googleapis.com"), - "cluster-bar.googleapis.com")))) - .build()))), - Any.pack(buildListenerWithFilterChain(LISTENER_NAME, 15001, "0.0.0.0", - filterChainOutbound, - filterChainInbound - ))); + List listeners = + ImmutableList.of( + Any.pack( + buildListenerWithFilterChain( + "test/value?udpa.resource.listening_address=192.168.3.7:7000", + 7000, + "0.0.0.0", + filterChainInbound))); DiscoveryResponse response = buildDiscoveryResponse("0", listeners, ResourceType.LDS.typeUrl(), "0000"); responseObserver.onNext(response); @@ -392,17 +378,20 @@ public void notifyUpdatedListener() throws InvalidProtocolBufferException { reset(requestObserver); // Management server sends another LDS response with updates for Listener. - final FilterChain filterChainNewInbound = buildFilterChain(buildFilterChainMatch(PORT, - CidrRange.newBuilder().setAddressPrefix(LOCAL_IP) - .setPrefixLen(UInt32Value.of(32)).build()), - CommonTlsContextTestsUtil.buildTestDownstreamTlsContext("google-sds-config-default1", - "ROOTCA2"), - buildTestFilter("envoy.http_connection_manager")); + final FilterChain filterChainNewInbound = + buildFilterChain( + buildFilterChainMatch("managed-mtls"), + CommonTlsContextTestsUtil.buildTestDownstreamTlsContext( + "google-sds-config-default1", "ROOTCA2"), + buildTestFilter("envoy.http_connection_manager")); List listeners1 = ImmutableList.of( Any.pack( buildListenerWithFilterChain( - LISTENER_NAME, 15001, "0.0.0.0", filterChainNewInbound))); + "test/value?udpa.resource.listening_address=192.168.3.7:7000", + 7000, + "0.0.0.0", + filterChainNewInbound))); DiscoveryResponse response1 = buildDiscoveryResponse("1", listeners1, ResourceType.LDS.typeUrl(), "0001"); responseObserver.onNext(response1); @@ -424,14 +413,11 @@ public void notifyUpdatedListener() throws InvalidProtocolBufferException { verify(listenerWatcher, times(2)).onListenerChanged(listenerUpdateCaptor.capture()); ListenerUpdate configUpdate = listenerUpdateCaptor.getValue(); EnvoyServerProtoData.Listener listener = configUpdate.getListener(); - assertThat(listener.getName()).isEqualTo(LISTENER_NAME); + assertThat(listener.getName()) + .isEqualTo("test/value?udpa.resource.listening_address=192.168.3.7:7000"); assertThat(listener.getFilterChains()).hasSize(1); EnvoyServerProtoData.FilterChain filterChain = Iterables.getOnlyElement(listener.getFilterChains()); - EnvoyServerProtoData.FilterChainMatch filterChainMatch = filterChain.getFilterChainMatch(); - assertThat(filterChainMatch.getDestinationPort()).isEqualTo(PORT); - assertThat(filterChainMatch.getPrefixRanges()).containsExactly( - new EnvoyServerProtoData.CidrRange(LOCAL_IP, 32)); CommonTlsContext downstreamCommonTlsContext = filterChain.getDownstreamTlsContext().getCommonTlsContext(); assertThat(downstreamCommonTlsContext.getTlsCertificateSdsSecretConfigs(0).getName()) @@ -444,45 +430,48 @@ public void notifyUpdatedListener() throws InvalidProtocolBufferException { .isEqualTo("ROOTCA2"); } - /** Client receives LDS response containing non-matching port in the filterMatch. */ + /** Client receives LDS response containing non-matching port. */ @Test public void ldsResponse_nonMatchingPort() { xdsClient.watchListenerData(PORT, listenerWatcher); StreamObserver responseObserver = responseObservers.poll(); requestObservers.poll(); - final FilterChain filterChainInbound = buildFilterChain(buildFilterChainMatch(8000), null); - final FilterChain filterChainOutbound = buildFilterChain(buildFilterChainMatch( - PORT + 1, // add 1 to mismatch - CidrRange.newBuilder().setAddressPrefix(LOCAL_IP) - .setPrefixLen(UInt32Value.of(32)).build()), + final FilterChain filterChainInbound = + buildFilterChain(buildFilterChainMatch("managed-mtls"), null); - CommonTlsContextTestsUtil.buildTestDownstreamTlsContext("google-sds-config-default", - "ROOTCA"), - buildTestFilter("envoy.http_connection_manager")); - List listeners = ImmutableList.of( - Any.pack(buildListener("bar.googleapis.com", - Any.pack(HttpConnectionManager.newBuilder() - .setRouteConfig( - buildRouteConfiguration("route-bar.googleapis.com", - ImmutableList.of( - buildVirtualHost( - ImmutableList.of("bar.googleapis.com"), - "cluster-bar.googleapis.com")))) - .build()))), - Any.pack(buildListenerWithFilterChain(LISTENER_NAME, PORT, "0.0.0.0", - filterChainInbound, - filterChainOutbound - ))); + List listeners = + ImmutableList.of( + Any.pack( + buildListener( + "bar.googleapis.com", + Any.pack( + HttpConnectionManager.newBuilder() + .setRouteConfig( + buildRouteConfiguration( + "route-bar.googleapis.com", + ImmutableList.of( + buildVirtualHost( + ImmutableList.of("bar.googleapis.com"), + "cluster-bar.googleapis.com")))) + .build()))), + Any.pack( + buildListenerWithFilterChain( + "test/value?udpa.resource.listening_address=192.168.3.7:7000", + PORT + 1, + "0.0.0.0", + filterChainInbound))); DiscoveryResponse response = buildDiscoveryResponse("0", listeners, ResourceType.LDS.typeUrl(), "0000"); responseObserver.onNext(response); verify(listenerWatcher, never()).onListenerChanged(any(ListenerUpdate.class)); - verify(listenerWatcher, never()).onResourceDoesNotExist(":" + PORT); + verify(listenerWatcher, never()) + .onResourceDoesNotExist("test/value?udpa.resource.listening_address=192.168.3.7:" + PORT); verify(listenerWatcher, never()).onError(any(Status.class)); fakeClock.forwardTime(ServerXdsClient.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS); - verify(listenerWatcher).onResourceDoesNotExist(":" + PORT); + verify(listenerWatcher) + .onResourceDoesNotExist("test/value?udpa.resource.listening_address=192.168.3.7:" + PORT); assertThat(fakeClock.getPendingTasks(LISTENER_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty(); } @@ -670,6 +659,10 @@ static FilterChainMatch buildFilterChainMatch(int destPort, CidrRange...prefixRa .build(); } + static FilterChainMatch buildFilterChainMatch(String...values) { + return FilterChainMatch.newBuilder().addAllApplicationProtocols(Arrays.asList(values)).build(); + } + static Filter buildTestFilter(String name) { return Filter.newBuilder()