Skip to content

Commit

Permalink
xds: fix the new server API for ServerXdsClient (#7666)
Browse files Browse the repository at this point in the history
  • Loading branch information
sanjaypujare committed Nov 26, 2020
1 parent 192614b commit 3811ef3
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 105 deletions.
31 changes: 17 additions & 14 deletions xds/src/main/java/io/grpc/xds/ServerXdsClient.java
Expand Up @@ -58,7 +58,7 @@ final class ServerXdsClient extends AbstractXdsClient {
private int listenerPort = -1;
private final boolean newServerApi;
@Nullable private final String instanceIp;
private final String grpcServerResourceId;
private String grpcServerResourceId;
@Nullable
private ScheduledHandle ldsRespTimer;

Expand All @@ -78,6 +78,13 @@ void watchListenerData(final int port, final ListenerWatcher watcher) {
listenerWatcher = checkNotNull(watcher, "watcher");
checkArgument(port > 0, "port needs to be > 0");
listenerPort = port;
if (newServerApi) {
String listeningAddress = instanceIp + ":" + listenerPort;
grpcServerResourceId =
grpcServerResourceId + "?udpa.resource.listening_address=" + listeningAddress;
} else {
grpcServerResourceId = ":" + listenerPort;
}
getSyncContext().execute(new Runnable() {
@Override
public void run() {
Expand All @@ -90,7 +97,7 @@ public void run() {
ldsRespTimer =
getSyncContext()
.schedule(
new ListenerResourceFetchTimeoutTask(":" + port),
new ListenerResourceFetchTimeoutTask(grpcServerResourceId),
INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS, getTimeService());
}
}
Expand All @@ -101,10 +108,7 @@ public void run() {
@Override
Collection<String> getSubscribedResources(ResourceType type) {
if (newServerApi) {
String listeningAddress = instanceIp + ":" + listenerPort;
String resourceName =
grpcServerResourceId + "?udpa.resource.listening_address=" + listeningAddress;
return ImmutableList.<String>of(resourceName);
return ImmutableList.<String>of(grpcServerResourceId);
} else {
return Collections.emptyList();
}
Expand Down Expand Up @@ -161,7 +165,7 @@ protected void handleLdsResponse(String versionInfo, List<Any> resources, String
}
} else {
if (ldsRespTimer == null) {
listenerWatcher.onResourceDoesNotExist(":" + listenerPort);
listenerWatcher.onResourceDoesNotExist(grpcServerResourceId);
}
}
ackResponse(ResourceType.LDS, versionInfo, nonce);
Expand All @@ -172,17 +176,16 @@ protected void handleLdsResponse(String versionInfo, List<Any> resources, String

private boolean isRequestedListener(Listener listener) {
if (newServerApi) {
return "TRAFFICDIRECTOR_INBOUND_LISTENER".equals(listener.getName())
return grpcServerResourceId.equals(listener.getName())
&& listener.getTrafficDirection().equals(TrafficDirection.INBOUND)
&& hasMatchingFilter(listener.getFilterChainsList());
&& isAddressMatching(listener.getAddress(), listenerPort);
}
return isAddressMatching(listener.getAddress())
return isAddressMatching(listener.getAddress(), 15001)
&& hasMatchingFilter(listener.getFilterChainsList());
}

private boolean isAddressMatching(Address address) {
return newServerApi || (address.hasSocketAddress()
&& (address.getSocketAddress().getPortValue() == 15001));
private boolean isAddressMatching(Address address, int portToMatch) {
return address.hasSocketAddress() && (address.getSocketAddress().getPortValue() == portToMatch);
}

private boolean hasMatchingFilter(List<FilterChain> filterChainsList) {
Expand Down Expand Up @@ -211,7 +214,7 @@ protected void handleStreamRestarted() {
ldsRespTimer =
getSyncContext()
.schedule(
new ListenerResourceFetchTimeoutTask(":" + listenerPort),
new ListenerResourceFetchTimeoutTask(grpcServerResourceId),
INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS, getTimeService());
}
}
Expand Down
Expand Up @@ -76,6 +76,7 @@ public final class XdsClientWrapperForServerSds {
private final int port;
private ScheduledExecutorService timeService;
private XdsClient.ListenerWatcher listenerWatcher;
private boolean newServerApi;
@VisibleForTesting final Set<ServerWatcher> serverWatchers = new HashSet<>();

/**
Expand Down Expand Up @@ -109,6 +110,7 @@ public void createXdsClientAndStart() throws IOException {
}
Node node = bootstrapInfo.getNode();
timeService = SharedResourceHolder.get(timeServiceResource);
newServerApi = channel.isUseProtocolV3() && experimentalNewServerApiEnvVar;
XdsClient xdsClientImpl =
new ServerXdsClient(
channel,
Expand Down Expand Up @@ -180,7 +182,8 @@ private DownstreamTlsContext getDownstreamTlsContext(InetSocketAddress localInet
FilterChainComparator comparator = new FilterChainComparator(localInetAddr);
FilterChain bestMatch =
filterChains.isEmpty() ? null : Collections.max(filterChains, comparator);
if (bestMatch != null && comparator.isMatching(bestMatch.getFilterChainMatch())) {
if (bestMatch != null
&& (newServerApi || comparator.isMatching(bestMatch.getFilterChainMatch()))) {
return bestMatch.getDownstreamTlsContext();
}
}
Expand Down
173 changes: 83 additions & 90 deletions xds/src/test/java/io/grpc/xds/ServerXdsClientNewServerApiTest.java
Expand Up @@ -222,7 +222,6 @@ public void ldsResponse_nonMatchingFilterChain_notFoundError() {
StreamObserver<DiscoveryResponse> responseObserver = responseObservers.poll();
StreamObserver<DiscoveryRequest> requestObserver = requestObservers.poll();

// Client sends an LDS request with null in lds resource name
verify(requestObserver)
.onNext(eq(XdsClientTestHelper.buildDiscoveryRequest(NODE, "",
ImmutableList.of("test/value?udpa.resource.listening_address=192.168.3.7:7000"),
Expand Down Expand Up @@ -262,7 +261,8 @@ public void ldsResponse_nonMatchingFilterChain_notFoundError() {
verify(listenerWatcher, never()).onResourceDoesNotExist(":" + PORT);
verify(listenerWatcher, never()).onError(any(Status.class));
fakeClock.forwardTime(ServerXdsClient.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS);
verify(listenerWatcher).onResourceDoesNotExist(":" + PORT);
verify(listenerWatcher)
.onResourceDoesNotExist("test/value?udpa.resource.listening_address=192.168.3.7:" + PORT);
assertThat(fakeClock.getPendingTasks(LISTENER_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty();
}

Expand All @@ -273,7 +273,6 @@ public void ldsResponseWith_matchingListenerFound() throws InvalidProtocolBuffer
StreamObserver<DiscoveryResponse> responseObserver = responseObservers.poll();
StreamObserver<DiscoveryRequest> requestObserver = requestObservers.poll();

// Client sends an LDS request with null in lds resource name
verify(requestObserver)
.onNext(
eq(
Expand All @@ -286,27 +285,31 @@ public void ldsResponseWith_matchingListenerFound() throws InvalidProtocolBuffer
"")));
assertThat(fakeClock.getPendingTasks(LISTENER_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).hasSize(1);

final FilterChain filterChainOutbound = buildFilterChain(buildFilterChainMatch(8000), null);
final FilterChain filterChainInbound = buildFilterChain(buildFilterChainMatch(PORT,
CidrRange.newBuilder().setAddressPrefix(LOCAL_IP)
.setPrefixLen(UInt32Value.of(32)).build()),
final FilterChain filterChainInbound = buildFilterChain(buildFilterChainMatch("managed-mtls"),
CommonTlsContextTestsUtil.buildTestDownstreamTlsContext("google-sds-config-default",
"ROOTCA"),
buildTestFilter("envoy.http_connection_manager"));
List<Any> listeners = ImmutableList.of(
Any.pack(buildListener("bar.googleapis.com",
Any.pack(HttpConnectionManager.newBuilder()
.setRouteConfig(
buildRouteConfiguration("route-bar.googleapis.com",
ImmutableList.of(
buildVirtualHost(
ImmutableList.of("bar.googleapis.com"),
"cluster-bar.googleapis.com"))))
.build()))),
Any.pack(buildListenerWithFilterChain(LISTENER_NAME, 15001, "0.0.0.0",
filterChainOutbound,
filterChainInbound
)));
List<Any> listeners =
ImmutableList.of(
Any.pack(
buildListener(
"bar.googleapis.com",
Any.pack(
HttpConnectionManager.newBuilder()
.setRouteConfig(
buildRouteConfiguration(
"route-bar.googleapis.com",
ImmutableList.of(
buildVirtualHost(
ImmutableList.of("bar.googleapis.com"),
"cluster-bar.googleapis.com"))))
.build()))),
Any.pack(
buildListenerWithFilterChain(
"test/value?udpa.resource.listening_address=192.168.3.7:7000",
7000,
"0.0.0.0",
filterChainInbound)));
DiscoveryResponse response =
buildDiscoveryResponse("0", listeners, ResourceType.LDS.typeUrl(), "0000");
responseObserver.onNext(response);
Expand All @@ -327,20 +330,12 @@ public void ldsResponseWith_matchingListenerFound() throws InvalidProtocolBuffer
verify(listenerWatcher, times(1)).onListenerChanged(listenerUpdateCaptor.capture());
ListenerUpdate configUpdate = listenerUpdateCaptor.getValue();
EnvoyServerProtoData.Listener listener = configUpdate.getListener();
assertThat(listener.getName()).isEqualTo(LISTENER_NAME);
assertThat(listener.getAddress()).isEqualTo("0.0.0.0:15001");
assertThat(listener.getFilterChains()).hasSize(2);
EnvoyServerProtoData.FilterChain filterChainOutboundInListenerUpdate
= listener.getFilterChains().get(0);
assertThat(filterChainOutboundInListenerUpdate.getFilterChainMatch().getDestinationPort())
.isEqualTo(8000);
assertThat(listener.getName())
.isEqualTo("test/value?udpa.resource.listening_address=192.168.3.7:7000");
assertThat(listener.getAddress()).isEqualTo("0.0.0.0:7000");
assertThat(listener.getFilterChains()).hasSize(1);
EnvoyServerProtoData.FilterChain filterChainInboundInListenerUpdate
= listener.getFilterChains().get(1);
EnvoyServerProtoData.FilterChainMatch inBoundfilterChainMatch =
filterChainInboundInListenerUpdate.getFilterChainMatch();
assertThat(inBoundfilterChainMatch.getDestinationPort()).isEqualTo(PORT);
assertThat(inBoundfilterChainMatch.getPrefixRanges()).containsExactly(
new EnvoyServerProtoData.CidrRange(LOCAL_IP, 32));
= listener.getFilterChains().get(0);
CommonTlsContext downstreamCommonTlsContext =
filterChainInboundInListenerUpdate.getDownstreamTlsContext().getCommonTlsContext();
assertThat(downstreamCommonTlsContext.getTlsCertificateSdsSecretConfigs(0).getName())
Expand All @@ -357,32 +352,23 @@ public void ldsResponseWith_matchingListenerFound() throws InvalidProtocolBuffer
/** Client receives LDS responses for updating Listener previously received. */
@SuppressWarnings("unchecked")
@Test
public void notifyUpdatedListener() throws InvalidProtocolBufferException {
public void notifyUpdatedListener() {
xdsClient.watchListenerData(PORT, listenerWatcher);
StreamObserver<DiscoveryResponse> responseObserver = responseObservers.poll();
StreamObserver<DiscoveryRequest> requestObserver = requestObservers.poll();

final FilterChain filterChainOutbound = buildFilterChain(buildFilterChainMatch(8000), null);
final FilterChain filterChainInbound = buildFilterChain(buildFilterChainMatch(PORT,
CidrRange.newBuilder().setAddressPrefix(LOCAL_IP)
.setPrefixLen(UInt32Value.of(32)).build()),
final FilterChain filterChainInbound = buildFilterChain(buildFilterChainMatch("managed-mtls"),
CommonTlsContextTestsUtil.buildTestDownstreamTlsContext("google-sds-config-default",
"ROOTCA"),
buildTestFilter("envoy.http_connection_manager"));
List<Any> listeners = ImmutableList.of(
Any.pack(buildListener("bar.googleapis.com",
Any.pack(HttpConnectionManager.newBuilder()
.setRouteConfig(
buildRouteConfiguration("route-bar.googleapis.com",
ImmutableList.of(
buildVirtualHost(
ImmutableList.of("bar.googleapis.com"),
"cluster-bar.googleapis.com"))))
.build()))),
Any.pack(buildListenerWithFilterChain(LISTENER_NAME, 15001, "0.0.0.0",
filterChainOutbound,
filterChainInbound
)));
List<Any> listeners =
ImmutableList.of(
Any.pack(
buildListenerWithFilterChain(
"test/value?udpa.resource.listening_address=192.168.3.7:7000",
7000,
"0.0.0.0",
filterChainInbound)));
DiscoveryResponse response =
buildDiscoveryResponse("0", listeners, ResourceType.LDS.typeUrl(), "0000");
responseObserver.onNext(response);
Expand All @@ -392,17 +378,20 @@ public void notifyUpdatedListener() throws InvalidProtocolBufferException {

reset(requestObserver);
// Management server sends another LDS response with updates for Listener.
final FilterChain filterChainNewInbound = buildFilterChain(buildFilterChainMatch(PORT,
CidrRange.newBuilder().setAddressPrefix(LOCAL_IP)
.setPrefixLen(UInt32Value.of(32)).build()),
CommonTlsContextTestsUtil.buildTestDownstreamTlsContext("google-sds-config-default1",
"ROOTCA2"),
buildTestFilter("envoy.http_connection_manager"));
final FilterChain filterChainNewInbound =
buildFilterChain(
buildFilterChainMatch("managed-mtls"),
CommonTlsContextTestsUtil.buildTestDownstreamTlsContext(
"google-sds-config-default1", "ROOTCA2"),
buildTestFilter("envoy.http_connection_manager"));
List<Any> listeners1 =
ImmutableList.of(
Any.pack(
buildListenerWithFilterChain(
LISTENER_NAME, 15001, "0.0.0.0", filterChainNewInbound)));
"test/value?udpa.resource.listening_address=192.168.3.7:7000",
7000,
"0.0.0.0",
filterChainNewInbound)));
DiscoveryResponse response1 =
buildDiscoveryResponse("1", listeners1, ResourceType.LDS.typeUrl(), "0001");
responseObserver.onNext(response1);
Expand All @@ -424,14 +413,11 @@ public void notifyUpdatedListener() throws InvalidProtocolBufferException {
verify(listenerWatcher, times(2)).onListenerChanged(listenerUpdateCaptor.capture());
ListenerUpdate configUpdate = listenerUpdateCaptor.getValue();
EnvoyServerProtoData.Listener listener = configUpdate.getListener();
assertThat(listener.getName()).isEqualTo(LISTENER_NAME);
assertThat(listener.getName())
.isEqualTo("test/value?udpa.resource.listening_address=192.168.3.7:7000");
assertThat(listener.getFilterChains()).hasSize(1);
EnvoyServerProtoData.FilterChain filterChain =
Iterables.getOnlyElement(listener.getFilterChains());
EnvoyServerProtoData.FilterChainMatch filterChainMatch = filterChain.getFilterChainMatch();
assertThat(filterChainMatch.getDestinationPort()).isEqualTo(PORT);
assertThat(filterChainMatch.getPrefixRanges()).containsExactly(
new EnvoyServerProtoData.CidrRange(LOCAL_IP, 32));
CommonTlsContext downstreamCommonTlsContext =
filterChain.getDownstreamTlsContext().getCommonTlsContext();
assertThat(downstreamCommonTlsContext.getTlsCertificateSdsSecretConfigs(0).getName())
Expand All @@ -444,45 +430,48 @@ public void notifyUpdatedListener() throws InvalidProtocolBufferException {
.isEqualTo("ROOTCA2");
}

/** Client receives LDS response containing non-matching port in the filterMatch. */
/** Client receives LDS response containing non-matching port. */
@Test
public void ldsResponse_nonMatchingPort() {
xdsClient.watchListenerData(PORT, listenerWatcher);
StreamObserver<DiscoveryResponse> responseObserver = responseObservers.poll();
requestObservers.poll();

final FilterChain filterChainInbound = buildFilterChain(buildFilterChainMatch(8000), null);
final FilterChain filterChainOutbound = buildFilterChain(buildFilterChainMatch(
PORT + 1, // add 1 to mismatch
CidrRange.newBuilder().setAddressPrefix(LOCAL_IP)
.setPrefixLen(UInt32Value.of(32)).build()),
final FilterChain filterChainInbound =
buildFilterChain(buildFilterChainMatch("managed-mtls"), null);

CommonTlsContextTestsUtil.buildTestDownstreamTlsContext("google-sds-config-default",
"ROOTCA"),
buildTestFilter("envoy.http_connection_manager"));
List<Any> listeners = ImmutableList.of(
Any.pack(buildListener("bar.googleapis.com",
Any.pack(HttpConnectionManager.newBuilder()
.setRouteConfig(
buildRouteConfiguration("route-bar.googleapis.com",
ImmutableList.of(
buildVirtualHost(
ImmutableList.of("bar.googleapis.com"),
"cluster-bar.googleapis.com"))))
.build()))),
Any.pack(buildListenerWithFilterChain(LISTENER_NAME, PORT, "0.0.0.0",
filterChainInbound,
filterChainOutbound
)));
List<Any> listeners =
ImmutableList.of(
Any.pack(
buildListener(
"bar.googleapis.com",
Any.pack(
HttpConnectionManager.newBuilder()
.setRouteConfig(
buildRouteConfiguration(
"route-bar.googleapis.com",
ImmutableList.of(
buildVirtualHost(
ImmutableList.of("bar.googleapis.com"),
"cluster-bar.googleapis.com"))))
.build()))),
Any.pack(
buildListenerWithFilterChain(
"test/value?udpa.resource.listening_address=192.168.3.7:7000",
PORT + 1,
"0.0.0.0",
filterChainInbound)));
DiscoveryResponse response =
buildDiscoveryResponse("0", listeners, ResourceType.LDS.typeUrl(), "0000");
responseObserver.onNext(response);

verify(listenerWatcher, never()).onListenerChanged(any(ListenerUpdate.class));
verify(listenerWatcher, never()).onResourceDoesNotExist(":" + PORT);
verify(listenerWatcher, never())
.onResourceDoesNotExist("test/value?udpa.resource.listening_address=192.168.3.7:" + PORT);
verify(listenerWatcher, never()).onError(any(Status.class));
fakeClock.forwardTime(ServerXdsClient.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS);
verify(listenerWatcher).onResourceDoesNotExist(":" + PORT);
verify(listenerWatcher)
.onResourceDoesNotExist("test/value?udpa.resource.listening_address=192.168.3.7:" + PORT);
assertThat(fakeClock.getPendingTasks(LISTENER_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty();
}

Expand Down Expand Up @@ -670,6 +659,10 @@ static FilterChainMatch buildFilterChainMatch(int destPort, CidrRange...prefixRa
.build();
}

static FilterChainMatch buildFilterChainMatch(String...values) {
return FilterChainMatch.newBuilder().addAllApplicationProtocols(Arrays.asList(values)).build();
}

static Filter buildTestFilter(String name) {
return
Filter.newBuilder()
Expand Down

0 comments on commit 3811ef3

Please sign in to comment.