Skip to content

Commit

Permalink
xds: add server side Listener processing to ClientXdsClient (#7955)
Browse files Browse the repository at this point in the history
* xds: add server side Listener processing to ClientXdsClient

* address review comments

* minor fixes for review comments
  • Loading branch information
sanjaypujare committed Mar 12, 2021
1 parent 6a9c990 commit afe8831
Show file tree
Hide file tree
Showing 7 changed files with 171 additions and 6 deletions.
41 changes: 37 additions & 4 deletions xds/src/main/java/io/grpc/xds/ClientXdsClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -156,12 +156,17 @@ protected void handleLdsResponse(String versionInfo, List<Any> resources, String

// Unpack HttpConnectionManager messages.
Map<String, HttpConnectionManager> httpConnectionManagers = new HashMap<>(listeners.size());
Map<String, Listener> serverSideListeners = new HashMap<>(listeners.size());
try {
for (Listener listener : listeners) {
HttpConnectionManager hcm = unpackCompatibleType(
listener.getApiListener().getApiListener(), HttpConnectionManager.class,
TYPE_URL_HTTP_CONNECTION_MANAGER, TYPE_URL_HTTP_CONNECTION_MANAGER_V2);
httpConnectionManagers.put(listener.getName(), hcm);
if (listener.hasApiListener()) {
HttpConnectionManager hcm = unpackCompatibleType(
listener.getApiListener().getApiListener(), HttpConnectionManager.class,
TYPE_URL_HTTP_CONNECTION_MANAGER, TYPE_URL_HTTP_CONNECTION_MANAGER_V2);
httpConnectionManagers.put(listener.getName(), hcm);
} else {
serverSideListeners.put(listener.getName(), listener);
}
}
} catch (InvalidProtocolBufferException e) {
getLogger().log(
Expand Down Expand Up @@ -239,6 +244,21 @@ protected void handleLdsResponse(String versionInfo, List<Any> resources, String
}
ldsUpdates.put(listenerName, update);
}
// process serverSideListeners if any
for (Map.Entry<String, Listener> entry : serverSideListeners.entrySet()) {
String listenerName = entry.getKey();
Listener listener = entry.getValue();
LdsUpdate update;

StructOrError<EnvoyServerProtoData.Listener> convertedListener =
parseServerSideListener(listener);
if (convertedListener.getErrorDetail() != null) {
nackResponse(ResourceType.LDS, nonce, convertedListener.getErrorDetail());
return;
}
update = new LdsUpdate(convertedListener.getStruct());
ldsUpdates.put(listenerName, update);
}
ackResponse(ResourceType.LDS, versionInfo, nonce);

for (String resource : ldsResourceSubscribers.keySet()) {
Expand All @@ -257,6 +277,19 @@ protected void handleLdsResponse(String versionInfo, List<Any> resources, String
}
}

@VisibleForTesting static StructOrError<EnvoyServerProtoData.Listener> parseServerSideListener(
Listener listener) {
try {
return StructOrError.fromStruct(
EnvoyServerProtoData.Listener.fromEnvoyProtoListener(listener));
} catch (InvalidProtocolBufferException e) {
return StructOrError.fromError(
"Failed to unpack Listener " + listener.getName() + ":" + e.getMessage());
} catch (IllegalArgumentException e) {
return StructOrError.fromError(e.getMessage());
}
}

private static StructOrError<VirtualHost> parseVirtualHost(
io.envoyproxy.envoy.config.route.v3.VirtualHost proto) {
String name = proto.getName();
Expand Down
4 changes: 4 additions & 0 deletions xds/src/main/java/io/grpc/xds/EnvoyServerProtoData.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.protobuf.InvalidProtocolBufferException;
import io.envoyproxy.envoy.config.core.v3.Address;
import io.envoyproxy.envoy.config.core.v3.SocketAddress;
import io.envoyproxy.envoy.config.core.v3.TrafficDirection;
import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.CommonTlsContext;
import io.grpc.Internal;
import java.net.InetAddress;
Expand Down Expand Up @@ -455,6 +456,9 @@ private static String convertEnvoyAddressToString(Address proto) {

static Listener fromEnvoyProtoListener(io.envoyproxy.envoy.config.listener.v3.Listener proto)
throws InvalidProtocolBufferException {
if (!proto.getTrafficDirection().equals(TrafficDirection.INBOUND)) {
throw new IllegalArgumentException("Listener " + proto.getName() + " is not INBOUND");
}
List<FilterChain> filterChains = new ArrayList<>(proto.getFilterChainsCount());
for (io.envoyproxy.envoy.config.listener.v3.FilterChain filterChain :
proto.getFilterChainsList()) {
Expand Down
21 changes: 19 additions & 2 deletions xds/src/main/java/io/grpc/xds/XdsClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ static final class LdsUpdate implements ResourceUpdate {
final boolean hasFaultInjection;
@Nullable // Can be null even if hasFaultInjection is true.
final HttpFault httpFault;
// Server side Listener.
@Nullable
final Listener listener;

LdsUpdate(
long httpMaxStreamDurationNano, String rdsName, boolean hasFaultInjection,
Expand All @@ -82,12 +85,22 @@ private LdsUpdate(
? null : Collections.unmodifiableList(new ArrayList<>(virtualHosts));
this.hasFaultInjection = hasFaultInjection;
this.httpFault = httpFault;
this.listener = null;
}

LdsUpdate(Listener listener) {
this.listener = listener;
this.httpMaxStreamDurationNano = 0L;
this.rdsName = null;
this.virtualHosts = null;
this.hasFaultInjection = false;
this.httpFault = null;
}

@Override
public int hashCode() {
return Objects.hash(
httpMaxStreamDurationNano, rdsName, virtualHosts, hasFaultInjection, httpFault);
httpMaxStreamDurationNano, rdsName, virtualHosts, hasFaultInjection, httpFault, listener);
}

@Override
Expand All @@ -103,7 +116,8 @@ public boolean equals(Object o) {
&& Objects.equals(rdsName, that.rdsName)
&& Objects.equals(virtualHosts, that.virtualHosts)
&& hasFaultInjection == that.hasFaultInjection
&& Objects.equals(httpFault, that.httpFault);
&& Objects.equals(httpFault, that.httpFault)
&& Objects.equals(listener, that.listener);
}

@Override
Expand All @@ -119,6 +133,9 @@ public String toString() {
toStringHelper.add("faultInjectionEnabled", true)
.add("httpFault", httpFault);
}
if (listener != null) {
toStringHelper.add("listener", listener);
}
return toStringHelper.toString();
}
}
Expand Down
14 changes: 14 additions & 0 deletions xds/src/test/java/io/grpc/xds/ClientXdsClientDataTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
import io.envoyproxy.envoy.config.core.v3.Locality;
import io.envoyproxy.envoy.config.core.v3.RuntimeFractionalPercent;
import io.envoyproxy.envoy.config.core.v3.SocketAddress;
import io.envoyproxy.envoy.config.core.v3.TrafficDirection;
import io.envoyproxy.envoy.config.endpoint.v3.Endpoint;
import io.envoyproxy.envoy.config.listener.v3.Listener;
import io.envoyproxy.envoy.config.route.v3.DirectResponseAction;
import io.envoyproxy.envoy.config.route.v3.FilterAction;
import io.envoyproxy.envoy.config.route.v3.RedirectAction;
Expand Down Expand Up @@ -613,4 +615,16 @@ public void parseLocalityLbEndpoints_invalidPriority() {
StructOrError<LocalityLbEndpoints> struct = ClientXdsClient.parseLocalityLbEndpoints(proto);
assertThat(struct.getErrorDetail()).isEqualTo("negative priority");
}

@Test
public void parseServerSideListener_invalidTrafficDirection() {
Listener listener =
Listener.newBuilder()
.setName("listener1")
.setTrafficDirection(TrafficDirection.OUTBOUND)
.build();
StructOrError<io.grpc.xds.EnvoyServerProtoData.Listener> struct =
ClientXdsClient.parseServerSideListener(listener);
assertThat(struct.getErrorDetail()).isEqualTo("Listener listener1 is not INBOUND");
}
}
93 changes: 93 additions & 0 deletions xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,13 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.protobuf.Any;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import com.google.protobuf.StringValue;
import io.envoyproxy.envoy.config.core.v3.SocketAddress;
import io.envoyproxy.envoy.config.core.v3.TrafficDirection;
import io.envoyproxy.envoy.config.listener.v3.FilterChain;
import io.envoyproxy.envoy.config.listener.v3.Listener;
import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.SdsSecretConfig;
import io.grpc.BindableService;
import io.grpc.ManagedChannel;
Expand Down Expand Up @@ -64,6 +69,7 @@
import io.grpc.xds.XdsClient.RdsResourceWatcher;
import io.grpc.xds.XdsClient.RdsUpdate;
import io.grpc.xds.XdsClient.ResourceWatcher;
import io.grpc.xds.internal.sds.CommonTlsContextTestsUtil;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Arrays;
Expand Down Expand Up @@ -1284,6 +1290,71 @@ public void reportLoadStatsToServer() {
// See more test on LoadReportClientTest.java
}

private static final String LISTENER_RESOURCE =
"grpc/server?xds.resource.listening_address=0.0.0.0:7000";

@Test
public void serverSideListenerFound() throws InvalidProtocolBufferException {
Assume.assumeTrue(useProtocolV3());
ClientXdsClientTestBase.DiscoveryRpcCall call =
startResourceWatcher(LDS, LISTENER_RESOURCE, ldsResourceWatcher);
Listener listener =
buildListenerWithFilterChain(
LISTENER_RESOURCE, 7000, "0.0.0.0", "google-sds-config-default", "ROOTCA");
List<Any> listeners = ImmutableList.of(Any.pack(listener));
call.sendResponse(ResourceType.LDS, listeners, "0", "0000");
// Client sends an ACK LDS request.
call.verifyRequest(
ResourceType.LDS, Collections.singletonList(LISTENER_RESOURCE), "0", "0000", NODE);
verify(ldsResourceWatcher).onChanged(ldsUpdateCaptor.capture());
assertThat(ldsUpdateCaptor.getValue().listener)
.isEqualTo(EnvoyServerProtoData.Listener.fromEnvoyProtoListener(listener));

listener =
buildListenerWithFilterChain(
LISTENER_RESOURCE, 7000, "0.0.0.0", "CERT2", "ROOTCA2");
listeners = ImmutableList.of(Any.pack(listener));
call.sendResponse(ResourceType.LDS, listeners, "1", "0001");

// Client sends an ACK LDS request.
call.verifyRequest(
ResourceType.LDS, Collections.singletonList(LISTENER_RESOURCE), "1", "0001", NODE);
verify(ldsResourceWatcher, times(2)).onChanged(ldsUpdateCaptor.capture());
assertThat(ldsUpdateCaptor.getValue().listener)
.isEqualTo(EnvoyServerProtoData.Listener.fromEnvoyProtoListener(listener));

assertThat(fakeClock.getPendingTasks(LDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty();
}

@Test
public void serverSideListenerNotFound() {
Assume.assumeTrue(useProtocolV3());
ClientXdsClientTestBase.DiscoveryRpcCall call =
startResourceWatcher(LDS, LISTENER_RESOURCE, ldsResourceWatcher);
final FilterChain filterChainInbound =
ServerXdsClientNewServerApiTest.buildFilterChain(
ServerXdsClientNewServerApiTest.buildFilterChainMatch("managed-mtls"),
CommonTlsContextTestsUtil.buildTestDownstreamTlsContext(
"google-sds-config-default", "ROOTCA"),
ServerXdsClientNewServerApiTest.buildTestFilter("envoy.http_connection_manager"));
Listener listener =
ServerXdsClientNewServerApiTest.buildListenerWithFilterChain(
"grpc/server?xds.resource.listening_address=0.0.0.0:8000",
7000,
"0.0.0.0",
filterChainInbound);
List<Any> listeners = ImmutableList.of(Any.pack(listener));
call.sendResponse(ResourceType.LDS, listeners, "0", "0000");
// Client sends an ACK LDS request.
call.verifyRequest(
ResourceType.LDS, Collections.singletonList(LISTENER_RESOURCE), "0", "0000", NODE);

verifyNoInteractions(ldsResourceWatcher);
fakeClock.forwardTime(ClientXdsClient.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS);
verify(ldsResourceWatcher).onResourceDoesNotExist(LISTENER_RESOURCE);
assertThat(fakeClock.getPendingTasks(LDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty();
}

private DiscoveryRpcCall startResourceWatcher(
ResourceType type, String name, ResourceWatcher watcher) {
FakeClock.TaskFilter timeoutTaskFilter;
Expand Down Expand Up @@ -1416,4 +1487,26 @@ protected abstract Message buildLbEndpoint(String address, int port, String heal

protected abstract Message buildDropOverload(String category, int dropPerMillion);
}

static Listener buildListenerWithFilterChain(
String name, int portValue, String address, String certName, String validationContextName) {
FilterChain filterChain =
ServerXdsClientNewServerApiTest.buildFilterChain(
ServerXdsClientNewServerApiTest.buildFilterChainMatch(),
CommonTlsContextTestsUtil.buildTestDownstreamTlsContext(
certName, validationContextName),
ServerXdsClientNewServerApiTest.buildTestFilter("envoy.http_connection_manager"));
io.envoyproxy.envoy.config.core.v3.Address listenerAddress =
io.envoyproxy.envoy.config.core.v3.Address.newBuilder()
.setSocketAddress(
SocketAddress.newBuilder().setPortValue(portValue).setAddress(address))
.build();
return Listener.newBuilder()
.setName(name)
.setAddress(listenerAddress)
.setDefaultFilterChain(FilterChain.getDefaultInstance())
.addAllFilterChains(Arrays.asList(filterChain))
.setTrafficDirection(TrafficDirection.INBOUND)
.build();
}
}
2 changes: 2 additions & 0 deletions xds/src/test/java/io/grpc/xds/EnvoyServerProtoDataTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.envoyproxy.envoy.config.core.v3.Address;
import io.envoyproxy.envoy.config.core.v3.CidrRange;
import io.envoyproxy.envoy.config.core.v3.SocketAddress;
import io.envoyproxy.envoy.config.core.v3.TrafficDirection;
import io.envoyproxy.envoy.config.core.v3.TransportSocket;
import io.envoyproxy.envoy.config.listener.v3.Filter;
import io.envoyproxy.envoy.config.listener.v3.FilterChain;
Expand Down Expand Up @@ -58,6 +59,7 @@ public void listener_convertFromListenerProto() throws InvalidProtocolBufferExce
.addFilterChains(createOutFilter())
.addFilterChains(createInFilter())
.setDefaultFilterChain(createDefaultFilterChain())
.setTrafficDirection(TrafficDirection.INBOUND)
.build();

Listener xdsListener = Listener.fromEnvoyProtoListener(listener);
Expand Down
2 changes: 2 additions & 0 deletions xds/src/test/java/io/grpc/xds/ServerXdsClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import io.envoyproxy.envoy.api.v2.auth.DownstreamTlsContext;
import io.envoyproxy.envoy.api.v2.core.CidrRange;
import io.envoyproxy.envoy.api.v2.core.SocketAddress;
import io.envoyproxy.envoy.api.v2.core.TrafficDirection;
import io.envoyproxy.envoy.api.v2.core.TransportSocket;
import io.envoyproxy.envoy.api.v2.listener.Filter;
import io.envoyproxy.envoy.api.v2.listener.FilterChain;
Expand Down Expand Up @@ -775,6 +776,7 @@ static Listener buildListenerWithFilterChain(String name, int portValue, String
.setName(name)
.setAddress(listenerAddress)
.addAllFilterChains(Arrays.asList(filterChains))
.setTrafficDirection(TrafficDirection.INBOUND)
.build();
}

Expand Down

0 comments on commit afe8831

Please sign in to comment.