From 821a953a57b8bfdc7632aaf3c2cb465356e4c57e Mon Sep 17 00:00:00 2001 From: sanjaypujare Date: Wed, 9 Dec 2020 18:32:18 -0800 Subject: [PATCH] xds: fix ServerXdsClient to return subscribed resources only for LDS (#7689) (#7714) --- .../java/io/grpc/xds/ServerXdsClient.java | 36 +++++++++++-------- .../xds/ServerXdsClientNewServerApiTest.java | 8 +++-- .../java/io/grpc/xds/ServerXdsClientTest.java | 7 +++- 3 files changed, 34 insertions(+), 17 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/ServerXdsClient.java b/xds/src/main/java/io/grpc/xds/ServerXdsClient.java index 2f27d62ff1e..1d8f48b33f4 100644 --- a/xds/src/main/java/io/grpc/xds/ServerXdsClient.java +++ b/xds/src/main/java/io/grpc/xds/ServerXdsClient.java @@ -56,20 +56,25 @@ final class ServerXdsClient extends AbstractXdsClient { @Nullable private ListenerWatcher listenerWatcher; private int listenerPort = -1; - private final boolean newServerApi; + private final boolean useNewApiForListenerQuery; @Nullable private final String instanceIp; private String grpcServerResourceId; @Nullable private ScheduledHandle ldsRespTimer; - ServerXdsClient(XdsChannel channel, Node node, ScheduledExecutorService timeService, - BackoffPolicy.Provider backoffPolicyProvider, Supplier stopwatchSupplier, - boolean newServerApi, String instanceIp, String grpcServerResourceId) { + ServerXdsClient( + XdsChannel channel, + Node node, + ScheduledExecutorService timeService, + BackoffPolicy.Provider backoffPolicyProvider, + Supplier stopwatchSupplier, + boolean useNewApiForListenerQuery, + String instanceIp, + String grpcServerResourceId) { super(channel, node, timeService, backoffPolicyProvider, stopwatchSupplier); - this.newServerApi = channel.isUseProtocolV3() && newServerApi; + this.useNewApiForListenerQuery = channel.isUseProtocolV3() && useNewApiForListenerQuery; this.instanceIp = (instanceIp != null ? instanceIp : "0.0.0.0"); - this.grpcServerResourceId = - (grpcServerResourceId != null) ? grpcServerResourceId : "grpc/server"; + this.grpcServerResourceId = grpcServerResourceId != null ? grpcServerResourceId : "grpc/server"; } @Override @@ -78,7 +83,7 @@ void watchListenerData(final int port, final ListenerWatcher watcher) { listenerWatcher = checkNotNull(watcher, "watcher"); checkArgument(port > 0, "port needs to be > 0"); listenerPort = port; - if (newServerApi) { + if (useNewApiForListenerQuery) { String listeningAddress = instanceIp + ":" + listenerPort; grpcServerResourceId = grpcServerResourceId + "?udpa.resource.listening_address=" + listeningAddress; @@ -89,7 +94,7 @@ void watchListenerData(final int port, final ListenerWatcher watcher) { @Override public void run() { getLogger().log(XdsLogLevel.INFO, "Started watching listener for port {0}", port); - if (!newServerApi) { + if (!useNewApiForListenerQuery) { updateNodeMetadataForListenerRequest(port); } adjustResourceSubscription(ResourceType.LDS); @@ -107,7 +112,10 @@ public void run() { @Nullable @Override Collection getSubscribedResources(ResourceType type) { - if (newServerApi) { + if (type != ResourceType.LDS) { + return null; + } + if (useNewApiForListenerQuery) { return ImmutableList.of(grpcServerResourceId); } else { return Collections.emptyList(); @@ -175,17 +183,17 @@ protected void handleLdsResponse(String versionInfo, List resources, String } private boolean isRequestedListener(Listener listener) { - if (newServerApi) { + if (useNewApiForListenerQuery) { return grpcServerResourceId.equals(listener.getName()) - && listener.getTrafficDirection().equals(TrafficDirection.INBOUND) - && isAddressMatching(listener.getAddress(), listenerPort); + && listener.getTrafficDirection().equals(TrafficDirection.INBOUND) + && isAddressMatching(listener.getAddress(), listenerPort); } return isAddressMatching(listener.getAddress(), 15001) && hasMatchingFilter(listener.getFilterChainsList()); } private boolean isAddressMatching(Address address, int portToMatch) { - return address.hasSocketAddress() && (address.getSocketAddress().getPortValue() == portToMatch); + return address.hasSocketAddress() && address.getSocketAddress().getPortValue() == portToMatch; } private boolean hasMatchingFilter(List filterChainsList) { diff --git a/xds/src/test/java/io/grpc/xds/ServerXdsClientNewServerApiTest.java b/xds/src/test/java/io/grpc/xds/ServerXdsClientNewServerApiTest.java index 37b4adb2a79..7534153b99c 100644 --- a/xds/src/test/java/io/grpc/xds/ServerXdsClientNewServerApiTest.java +++ b/xds/src/test/java/io/grpc/xds/ServerXdsClientNewServerApiTest.java @@ -141,7 +141,7 @@ public boolean shouldAccept(Runnable command) { private ListenerWatcher listenerWatcher; private ManagedChannel channel; - private XdsClient xdsClient; + private ServerXdsClient xdsClient; @Before public void setUp() throws IOException { @@ -531,6 +531,7 @@ public void streamClosedAndRetry() { .onNext(eq(buildDiscoveryRequest(NODE, "0", ImmutableList.of("test/value?udpa.resource.listening_address=192.168.3.7:7000"), ResourceType.LDS.typeUrl(), ""))); + verifyNoMoreInteractions(requestObserver); // Management server becomes unreachable. responseObserver.onError(Status.UNAVAILABLE.asException()); @@ -551,6 +552,7 @@ public void streamClosedAndRetry() { .onNext(eq(buildDiscoveryRequest(NODE, "0", ImmutableList.of("test/value?udpa.resource.listening_address=192.168.3.7:7000"), ResourceType.LDS.typeUrl(), ""))); + verifyNoMoreInteractions(requestObserver); // Management server is still not reachable. responseObserver.onError(Status.UNAVAILABLE.asException()); @@ -571,6 +573,7 @@ public void streamClosedAndRetry() { .onNext(eq(buildDiscoveryRequest(NODE, "0", ImmutableList.of("test/value?udpa.resource.listening_address=192.168.3.7:7000"), ResourceType.LDS.typeUrl(), ""))); + verifyNoMoreInteractions(requestObserver); // Management server sends back a LDS response. response = buildDiscoveryResponse("1", listeners, @@ -595,6 +598,7 @@ public void streamClosedAndRetry() { .onNext(eq(buildDiscoveryRequest(NODE, "1", ImmutableList.of("test/value?udpa.resource.listening_address=192.168.3.7:7000"), ResourceType.LDS.typeUrl(), ""))); + verifyNoMoreInteractions(requestObserver); // Management server becomes unreachable again. responseObserver.onError(Status.UNAVAILABLE.asException()); @@ -616,7 +620,7 @@ public void streamClosedAndRetry() { ResourceType.LDS.typeUrl(), ""))); verifyNoMoreInteractions(mockedDiscoveryService, backoffPolicyProvider, backoffPolicy1, - backoffPolicy2); + backoffPolicy2, requestObserver); } static Listener buildListenerWithFilterChain(String name, int portValue, String address, diff --git a/xds/src/test/java/io/grpc/xds/ServerXdsClientTest.java b/xds/src/test/java/io/grpc/xds/ServerXdsClientTest.java index 5730250eac3..2f34e34f896 100644 --- a/xds/src/test/java/io/grpc/xds/ServerXdsClientTest.java +++ b/xds/src/test/java/io/grpc/xds/ServerXdsClientTest.java @@ -636,6 +636,7 @@ public void streamClosedAndRetry() { verify(requestObserver) .onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "", ResourceType.LDS.typeUrlV2(), ""))); + verifyNoMoreInteractions(requestObserver); final FilterChain filterChainOutbound = buildFilterChain(buildFilterChainMatch(8000), null); final FilterChain filterChainInbound = buildFilterChain(buildFilterChainMatch(PORT, @@ -675,6 +676,7 @@ public void streamClosedAndRetry() { verify(requestObserver) .onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "0", ResourceType.LDS.typeUrlV2(), ""))); + verifyNoMoreInteractions(requestObserver); // Management server becomes unreachable. responseObserver.onError(Status.UNAVAILABLE.asException()); @@ -694,6 +696,7 @@ public void streamClosedAndRetry() { verify(requestObserver) .onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "0", ResourceType.LDS.typeUrlV2(), ""))); + verifyNoMoreInteractions(requestObserver); // Management server is still not reachable. responseObserver.onError(Status.UNAVAILABLE.asException()); @@ -713,6 +716,7 @@ public void streamClosedAndRetry() { verify(requestObserver) .onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "0", ResourceType.LDS.typeUrlV2(), ""))); + verifyNoMoreInteractions(requestObserver); // Management server sends back a LDS response. response = buildDiscoveryResponseV2("1", listeners, @@ -736,6 +740,7 @@ public void streamClosedAndRetry() { verify(requestObserver) .onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "1", ResourceType.LDS.typeUrlV2(), ""))); + verifyNoMoreInteractions(requestObserver); // Management server becomes unreachable again. responseObserver.onError(Status.UNAVAILABLE.asException()); @@ -756,7 +761,7 @@ public void streamClosedAndRetry() { ResourceType.LDS.typeUrlV2(), ""))); verifyNoMoreInteractions(mockedDiscoveryService, backoffPolicyProvider, backoffPolicy1, - backoffPolicy2); + backoffPolicy2, requestObserver); } static Listener buildListenerWithFilterChain(String name, int portValue, String address,