From 8020a735f943b413795adb675eba7b66af6f45ed Mon Sep 17 00:00:00 2001 From: Chengyuan Zhang Date: Wed, 4 Nov 2020 13:47:27 -0800 Subject: [PATCH] xds: refactor XdsClient test to cover protocol version v2 and v3 (#7577) This change refactors client side XdsClient's unit test. The main testing logic (test cases) will being the abstract class while the extended classes will be providing xDS version-specific services and messages. With this approach, we do not suffer from maintaining two copies of test logics in order to cover both v2 and v3 xDS protocols. So every time making changes to XdsClient's own logic, we only need to modify the corresponding test logic in the abstract class. Also, this approach could be sustainable for future xDS protocol version upgrades without necessity to re-implement test logics. --- ...Test.java => ClientXdsClientTestBase.java} | 1047 ++++++----------- .../io/grpc/xds/ClientXdsClientTestV2.java | 313 ----- .../io/grpc/xds/ClientXdsClientV2Test.java | 505 ++++++++ .../io/grpc/xds/ClientXdsClientV3Test.java | 505 ++++++++ .../java/io/grpc/xds/XdsClientTestHelper.java | 90 +- 5 files changed, 1397 insertions(+), 1063 deletions(-) rename xds/src/test/java/io/grpc/xds/{ClientXdsClientTest.java => ClientXdsClientTestBase.java} (52%) delete mode 100644 xds/src/test/java/io/grpc/xds/ClientXdsClientTestV2.java create mode 100644 xds/src/test/java/io/grpc/xds/ClientXdsClientV2Test.java create mode 100644 xds/src/test/java/io/grpc/xds/ClientXdsClientV3Test.java diff --git a/xds/src/test/java/io/grpc/xds/ClientXdsClientTest.java b/xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java similarity index 52% rename from xds/src/test/java/io/grpc/xds/ClientXdsClientTest.java rename to xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java index 6804f9bf39f..63e0995809d 100644 --- a/xds/src/test/java/io/grpc/xds/ClientXdsClientTest.java +++ b/xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java @@ -17,20 +17,6 @@ package io.grpc.xds; import static com.google.common.truth.Truth.assertThat; -import static io.grpc.xds.XdsClientTestHelper.buildCluster; -import static io.grpc.xds.XdsClientTestHelper.buildClusterLoadAssignment; -import static io.grpc.xds.XdsClientTestHelper.buildDiscoveryRequest; -import static io.grpc.xds.XdsClientTestHelper.buildDiscoveryResponse; -import static io.grpc.xds.XdsClientTestHelper.buildDropOverload; -import static io.grpc.xds.XdsClientTestHelper.buildLbEndpoint; -import static io.grpc.xds.XdsClientTestHelper.buildListener; -import static io.grpc.xds.XdsClientTestHelper.buildLocalityLbEndpoints; -import static io.grpc.xds.XdsClientTestHelper.buildRouteConfiguration; -import static io.grpc.xds.XdsClientTestHelper.buildSecureCluster; -import static io.grpc.xds.XdsClientTestHelper.buildUpstreamTlsContext; -import static io.grpc.xds.XdsClientTestHelper.buildVirtualHosts; -import static org.mockito.ArgumentMatchers.argThat; -import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -40,32 +26,10 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; -import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.Any; -import com.google.protobuf.UInt32Value; -import com.google.protobuf.util.Durations; -import io.envoyproxy.envoy.config.cluster.v3.CircuitBreakers; -import io.envoyproxy.envoy.config.cluster.v3.CircuitBreakers.Thresholds; -import io.envoyproxy.envoy.config.cluster.v3.Cluster; -import io.envoyproxy.envoy.config.core.v3.AggregatedConfigSource; -import io.envoyproxy.envoy.config.core.v3.ConfigSource; -import io.envoyproxy.envoy.config.core.v3.HealthStatus; -import io.envoyproxy.envoy.config.core.v3.RoutingPriority; -import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment; -import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment.Policy; -import io.envoyproxy.envoy.config.endpoint.v3.ClusterStats; -import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager; -import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.Rds; +import com.google.protobuf.Message; import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.SdsSecretConfig; -import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext; -import io.envoyproxy.envoy.service.discovery.v3.AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceImplBase; -import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest; -import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse; -import io.envoyproxy.envoy.service.load_stats.v3.LoadReportingServiceGrpc.LoadReportingServiceImplBase; -import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsRequest; -import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsResponse; -import io.grpc.Context; -import io.grpc.Context.CancellationListener; +import io.grpc.BindableService; import io.grpc.ManagedChannel; import io.grpc.Status; import io.grpc.Status.Code; @@ -75,7 +39,6 @@ import io.grpc.internal.FakeClock; import io.grpc.internal.FakeClock.ScheduledTask; import io.grpc.internal.FakeClock.TaskFilter; -import io.grpc.stub.StreamObserver; import io.grpc.testing.GrpcCleanupRule; import io.grpc.xds.AbstractXdsClient.ResourceType; import io.grpc.xds.EnvoyProtoData.DropOverload; @@ -97,12 +60,11 @@ import java.util.ArrayDeque; import java.util.Arrays; import java.util.Collections; -import java.util.HashSet; import java.util.List; import java.util.Queue; -import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import javax.annotation.Nullable; import org.junit.After; import org.junit.Before; import org.junit.Rule; @@ -110,7 +72,6 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import org.mockito.ArgumentCaptor; -import org.mockito.ArgumentMatcher; import org.mockito.Captor; import org.mockito.InOrder; import org.mockito.Mock; @@ -121,7 +82,7 @@ * Tests for {@link ClientXdsClient}. */ @RunWith(JUnit4.class) -public class ClientXdsClientTest { +public abstract class ClientXdsClientTestBase { private static final String LDS_RESOURCE = "listener.googleapis.com"; private static final String RDS_RESOURCE = "route-configuration.googleapis.com"; private static final String CDS_RESOURCE = "cluster.googleapis.com"; @@ -171,12 +132,11 @@ public boolean shouldAccept(Runnable command) { public final GrpcCleanupRule cleanupRule = new GrpcCleanupRule(); private final FakeClock fakeClock = new FakeClock(); - private final Queue> resourceDiscoveryCalls = - new ArrayDeque<>(); - private final Queue> loadReportCalls = - new ArrayDeque<>(); - private final AtomicBoolean adsEnded = new AtomicBoolean(true); - private final AtomicBoolean lrsEnded = new AtomicBoolean(true); + protected final Queue resourceDiscoveryCalls = new ArrayDeque<>(); + protected final Queue loadReportCalls = new ArrayDeque<>(); + protected final AtomicBoolean adsEnded = new AtomicBoolean(true); + protected final AtomicBoolean lrsEnded = new AtomicBoolean(true); + private final MessageFactory mf = createMessageFactory(); @Captor private ArgumentCaptor ldsUpdateCaptor; @@ -214,55 +174,11 @@ public void setUp() throws IOException { when(backoffPolicy2.nextBackoffNanos()).thenReturn(20L, 200L); final String serverName = InProcessServerBuilder.generateName(); - AggregatedDiscoveryServiceImplBase adsServiceImpl = new AggregatedDiscoveryServiceImplBase() { - @Override - public StreamObserver streamAggregatedResources( - final StreamObserver responseObserver) { - assertThat(adsEnded.get()).isTrue(); // ensure previous call was ended - adsEnded.set(false); - @SuppressWarnings("unchecked") - StreamObserver requestObserver = mock(StreamObserver.class); - RpcCall call = - new RpcCall<>(requestObserver, responseObserver); - resourceDiscoveryCalls.offer(call); - Context.current().addListener( - new CancellationListener() { - @Override - public void cancelled(Context context) { - adsEnded.set(true); - } - }, MoreExecutors.directExecutor()); - return requestObserver; - } - }; - - LoadReportingServiceImplBase lrsServiceImpl = new LoadReportingServiceImplBase() { - @Override - public StreamObserver streamLoadStats( - StreamObserver responseObserver) { - assertThat(lrsEnded.get()).isTrue(); - lrsEnded.set(false); - @SuppressWarnings("unchecked") - StreamObserver requestObserver = mock(StreamObserver.class); - RpcCall call = - new RpcCall<>(requestObserver, responseObserver); - Context.current().addListener( - new CancellationListener() { - @Override - public void cancelled(Context context) { - lrsEnded.set(true); - } - }, MoreExecutors.directExecutor()); - loadReportCalls.offer(call); - return requestObserver; - } - }; - cleanupRule.register( InProcessServerBuilder .forName(serverName) - .addService(adsServiceImpl) - .addService(lrsServiceImpl) + .addService(createAdsService()) + .addService(createLrsService()) .directExecutor() .build() .start()); @@ -271,7 +187,7 @@ public void cancelled(Context context) { xdsClient = new ClientXdsClient( - new XdsChannel(channel, /* useProtocolV3= */ true), + new XdsChannel(channel, useProtocolV3()), EnvoyProtoData.Node.newBuilder().build(), fakeClock.getScheduledExecutorService(), backoffPolicyProvider, @@ -290,24 +206,27 @@ public void tearDown() { assertThat(fakeClock.getPendingTasks()).isEmpty(); } + protected abstract boolean useProtocolV3(); + + protected abstract BindableService createAdsService(); + + protected abstract BindableService createLrsService(); + + protected abstract MessageFactory createMessageFactory(); + @Test public void ldsResourceNotFound() { - RpcCall call = + DiscoveryRpcCall call = startResourceWatcher(ResourceType.LDS, LDS_RESOURCE, ldsResourceWatcher); List listeners = ImmutableList.of( - Any.pack(buildListener("bar.googleapis.com", - Any.pack( - HttpConnectionManager.newBuilder() - .setRouteConfig( - buildRouteConfiguration("route-bar.googleapis.com", buildVirtualHosts(1))) - .build())))); - DiscoveryResponse response = - buildDiscoveryResponse("0", listeners, ResourceType.LDS.typeUrl(), "0000"); - call.responseObserver.onNext(response); + Any.pack(mf.buildListener("bar.googleapis.com", + mf.buildRouteConfiguration("route-bar.googleapis.com", + mf.buildOpaqueVirtualHosts(1))))); + call.sendResponse("0", listeners, ResourceType.LDS, "0000"); // Client sends an ACK LDS request. - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "0", LDS_RESOURCE, ResourceType.LDS.typeUrl(), "0000"))); + call.verifyRequest(NODE, "0", Collections.singletonList(LDS_RESOURCE), ResourceType.LDS, + "0000"); verifyNoInteractions(ldsResourceWatcher); fakeClock.forwardTime(ClientXdsClient.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS); @@ -317,21 +236,16 @@ public void ldsResourceNotFound() { @Test public void ldsResourceFound_containsVirtualHosts() { - RpcCall call = + DiscoveryRpcCall call = startResourceWatcher(ResourceType.LDS, LDS_RESOURCE, ldsResourceWatcher); List listeners = ImmutableList.of( - Any.pack(buildListener(LDS_RESOURCE, - Any.pack( - HttpConnectionManager.newBuilder() - .setRouteConfig(buildRouteConfiguration("do not care", buildVirtualHosts(2))) - .build())))); - DiscoveryResponse response = - buildDiscoveryResponse("0", listeners, ResourceType.LDS.typeUrl(), "0000"); - call.responseObserver.onNext(response); + Any.pack(mf.buildListener(LDS_RESOURCE, + mf.buildRouteConfiguration("do not care", mf.buildOpaqueVirtualHosts(2))))); + call.sendResponse("0", listeners, ResourceType.LDS, "0000"); // Client sends an ACK LDS request. - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "0", LDS_RESOURCE, ResourceType.LDS.typeUrl(), "0000"))); + call.verifyRequest(NODE, "0", Collections.singletonList(LDS_RESOURCE), ResourceType.LDS, + "0000"); verify(ldsResourceWatcher).onChanged(ldsUpdateCaptor.capture()); assertThat(ldsUpdateCaptor.getValue().getVirtualHosts()).hasSize(2); assertThat(fakeClock.getPendingTasks(LDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty(); @@ -339,26 +253,15 @@ public void ldsResourceFound_containsVirtualHosts() { @Test public void ldsResourceFound_containsRdsName() { - RpcCall call = + DiscoveryRpcCall call = startResourceWatcher(ResourceType.LDS, LDS_RESOURCE, ldsResourceWatcher); List listeners = ImmutableList.of( - Any.pack(buildListener(LDS_RESOURCE, - Any.pack( - HttpConnectionManager.newBuilder() - .setRds( - Rds.newBuilder() - .setRouteConfigName(RDS_RESOURCE) - .setConfigSource( - ConfigSource.newBuilder() - .setAds(AggregatedConfigSource.getDefaultInstance()))) - .build())))); - DiscoveryResponse response = - buildDiscoveryResponse("0", listeners, ResourceType.LDS.typeUrl(), "0000"); - call.responseObserver.onNext(response); + Any.pack(mf.buildListenerForRds(LDS_RESOURCE, RDS_RESOURCE))); + call.sendResponse("0", listeners, ResourceType.LDS, "0000"); // Client sends an ACK LDS request. - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "0", LDS_RESOURCE, ResourceType.LDS.typeUrl(), "0000"))); + call.verifyRequest(NODE, "0", Collections.singletonList(LDS_RESOURCE), ResourceType.LDS, + "0000"); verify(ldsResourceWatcher).onChanged(ldsUpdateCaptor.capture()); assertThat(ldsUpdateCaptor.getValue().getRdsName()).isEqualTo(RDS_RESOURCE); assertThat(fakeClock.getPendingTasks(LDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty(); @@ -366,114 +269,80 @@ public void ldsResourceFound_containsRdsName() { @Test public void cachedLdsResource_data() { - RpcCall call = + DiscoveryRpcCall call = startResourceWatcher(ResourceType.LDS, LDS_RESOURCE, ldsResourceWatcher); List listeners = ImmutableList.of( - Any.pack(buildListener(LDS_RESOURCE, - Any.pack( - HttpConnectionManager.newBuilder() - .setRds( - Rds.newBuilder() - .setRouteConfigName(RDS_RESOURCE) - .setConfigSource( - ConfigSource.newBuilder() - .setAds(AggregatedConfigSource.getDefaultInstance()))) - .build())))); - DiscoveryResponse response = - buildDiscoveryResponse("0", listeners, ResourceType.LDS.typeUrl(), "0000"); - call.responseObserver.onNext(response); + Any.pack(mf.buildListenerForRds(LDS_RESOURCE, RDS_RESOURCE))); + call.sendResponse("0", listeners, ResourceType.LDS, "0000"); // Client sends an ACK LDS request. - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "0", LDS_RESOURCE, ResourceType.LDS.typeUrl(), "0000"))); + call.verifyRequest(NODE, "0", Collections.singletonList(LDS_RESOURCE), ResourceType.LDS, + "0000"); LdsResourceWatcher watcher = mock(LdsResourceWatcher.class); xdsClient.watchLdsResource(LDS_RESOURCE, watcher); verify(watcher).onChanged(ldsUpdateCaptor.capture()); assertThat(ldsUpdateCaptor.getValue().getRdsName()).isEqualTo(RDS_RESOURCE); - verifyNoMoreInteractions(call.requestObserver); + call.verifyNoMoreRequest(); } @Test public void cachedLdsResource_absent() { - RpcCall call = + DiscoveryRpcCall call = startResourceWatcher(ResourceType.LDS, LDS_RESOURCE, ldsResourceWatcher); fakeClock.forwardTime(ClientXdsClient.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS); verify(ldsResourceWatcher).onResourceDoesNotExist(LDS_RESOURCE); LdsResourceWatcher watcher = mock(LdsResourceWatcher.class); xdsClient.watchLdsResource(LDS_RESOURCE, watcher); verify(watcher).onResourceDoesNotExist(LDS_RESOURCE); - verifyNoMoreInteractions(call.requestObserver); + call.verifyNoMoreRequest(); } @Test public void ldsResourceUpdated() { - RpcCall call = + DiscoveryRpcCall call = startResourceWatcher(ResourceType.LDS, LDS_RESOURCE, ldsResourceWatcher); List listeners = ImmutableList.of( - Any.pack(buildListener(LDS_RESOURCE, - Any.pack( - HttpConnectionManager.newBuilder() - .setRouteConfig(buildRouteConfiguration("do not care", buildVirtualHosts(2))) - .build())))); - DiscoveryResponse response = - buildDiscoveryResponse("0", listeners, ResourceType.LDS.typeUrl(), "0000"); - call.responseObserver.onNext(response); + Any.pack(mf.buildListener(LDS_RESOURCE, + mf.buildRouteConfiguration("do not care", mf.buildOpaqueVirtualHosts(2))))); + call.sendResponse("0", listeners, ResourceType.LDS, "0000"); // Client sends an ACK LDS request. - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "0", LDS_RESOURCE, ResourceType.LDS.typeUrl(), "0000"))); + call.verifyRequest(NODE, "0", Collections.singletonList(LDS_RESOURCE), ResourceType.LDS, + "0000"); verify(ldsResourceWatcher).onChanged(ldsUpdateCaptor.capture()); assertThat(ldsUpdateCaptor.getValue().getVirtualHosts()).hasSize(2); listeners = ImmutableList.of( - Any.pack(buildListener(LDS_RESOURCE, - Any.pack( - HttpConnectionManager.newBuilder() - .setRds( - Rds.newBuilder() - .setRouteConfigName(RDS_RESOURCE) - .setConfigSource( - ConfigSource.newBuilder() - .setAds(AggregatedConfigSource.getDefaultInstance()))) - .build())))); - response = - buildDiscoveryResponse("1", listeners, ResourceType.LDS.typeUrl(), "0001"); - call.responseObserver.onNext(response); + Any.pack(mf.buildListenerForRds(LDS_RESOURCE, RDS_RESOURCE))); + call.sendResponse("1", listeners, ResourceType.LDS, "0001"); // Client sends an ACK LDS request. - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "1", LDS_RESOURCE, ResourceType.LDS.typeUrl(), "0001"))); + call.verifyRequest(NODE, "1", Collections.singletonList(LDS_RESOURCE), ResourceType.LDS, + "0001"); verify(ldsResourceWatcher, times(2)).onChanged(ldsUpdateCaptor.capture()); assertThat(ldsUpdateCaptor.getValue().getRdsName()).isEqualTo(RDS_RESOURCE); } @Test public void ldsResourceDeleted() { - RpcCall call = + DiscoveryRpcCall call = startResourceWatcher(ResourceType.LDS, LDS_RESOURCE, ldsResourceWatcher); List listeners = ImmutableList.of( - Any.pack(buildListener(LDS_RESOURCE, - Any.pack( - HttpConnectionManager.newBuilder() - .setRouteConfig(buildRouteConfiguration("do not care", buildVirtualHosts(2))) - .build())))); - DiscoveryResponse response = - buildDiscoveryResponse("0", listeners, ResourceType.LDS.typeUrl(), "0000"); - call.responseObserver.onNext(response); + Any.pack(mf.buildListener(LDS_RESOURCE, + mf.buildRouteConfiguration("do not care", mf.buildOpaqueVirtualHosts(2))))); + call.sendResponse("0", listeners, ResourceType.LDS, "0000"); // Client sends an ACK LDS request. - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "0", LDS_RESOURCE, ResourceType.LDS.typeUrl(), "0000"))); + call.verifyRequest(NODE, "0", Collections.singletonList(LDS_RESOURCE), ResourceType.LDS, + "0000"); verify(ldsResourceWatcher).onChanged(ldsUpdateCaptor.capture()); assertThat(ldsUpdateCaptor.getValue().getVirtualHosts()).hasSize(2); - response = buildDiscoveryResponse("1", Collections.emptyList(), - ResourceType.LDS.typeUrl(), "0001"); - call.responseObserver.onNext(response); + call.sendResponse("1", Collections.emptyList(), ResourceType.LDS, "0001"); // Client sends an ACK LDS request. - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "1", LDS_RESOURCE, ResourceType.LDS.typeUrl(), "0001"))); + call.verifyRequest(NODE, "1", Collections.singletonList(LDS_RESOURCE), ResourceType.LDS, + "0001"); verify(ldsResourceWatcher).onResourceDoesNotExist(LDS_RESOURCE); } @@ -485,10 +354,8 @@ public void multipleLdsWatchers() { xdsClient.watchLdsResource(LDS_RESOURCE, ldsResourceWatcher); xdsClient.watchLdsResource(ldsResource, watcher1); xdsClient.watchLdsResource(ldsResource, watcher2); - RpcCall call = resourceDiscoveryCalls.poll(); - verify(call.requestObserver).onNext( - argThat(new DiscoveryRequestMatcher(NODE, "", Arrays.asList(LDS_RESOURCE, ldsResource), - ResourceType.LDS.typeUrl(), ""))); + DiscoveryRpcCall call = resourceDiscoveryCalls.poll(); + call.verifyRequest(NODE, "", Arrays.asList(LDS_RESOURCE, ldsResource), ResourceType.LDS, ""); fakeClock.forwardTime(ClientXdsClient.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS); verify(ldsResourceWatcher).onResourceDoesNotExist(LDS_RESOURCE); @@ -496,19 +363,11 @@ public void multipleLdsWatchers() { verify(watcher2).onResourceDoesNotExist(ldsResource); List listeners = ImmutableList.of( - Any.pack(buildListener(LDS_RESOURCE, - Any.pack( - HttpConnectionManager.newBuilder() - .setRouteConfig(buildRouteConfiguration("do not care", buildVirtualHosts(2))) - .build()))), - Any.pack(buildListener(ldsResource, - Any.pack( - HttpConnectionManager.newBuilder() - .setRouteConfig(buildRouteConfiguration("do not care", buildVirtualHosts(4))) - .build())))); - DiscoveryResponse response = - buildDiscoveryResponse("0", listeners, ResourceType.LDS.typeUrl(), "0000"); - call.responseObserver.onNext(response); + Any.pack(mf.buildListener(LDS_RESOURCE, + mf.buildRouteConfiguration("do not care", mf.buildOpaqueVirtualHosts(2)))), + Any.pack(mf.buildListener(ldsResource, + mf.buildRouteConfiguration("do not care", mf.buildOpaqueVirtualHosts(4))))); + call.sendResponse("0", listeners, ResourceType.LDS, "0000"); verify(ldsResourceWatcher).onChanged(ldsUpdateCaptor.capture()); assertThat(ldsUpdateCaptor.getValue().getVirtualHosts()).hasSize(2); verify(watcher1).onChanged(ldsUpdateCaptor.capture()); @@ -519,17 +378,16 @@ public void multipleLdsWatchers() { @Test public void rdsResourceNotFound() { - RpcCall call = + DiscoveryRpcCall call = startResourceWatcher(ResourceType.RDS, RDS_RESOURCE, rdsResourceWatcher); List routeConfigs = ImmutableList.of( - Any.pack(buildRouteConfiguration("route-bar.googleapis.com", buildVirtualHosts(2)))); - DiscoveryResponse response = - buildDiscoveryResponse("0", routeConfigs, ResourceType.RDS.typeUrl(), "0000"); - call.responseObserver.onNext(response); + Any.pack(mf.buildRouteConfiguration("route-bar.googleapis.com", + mf.buildOpaqueVirtualHosts(2)))); + call.sendResponse("0", routeConfigs, ResourceType.RDS, "0000"); // Client sends an ACK RDS request. - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "0", RDS_RESOURCE, ResourceType.RDS.typeUrl(), "0000"))); + call.verifyRequest(NODE, "0", Collections.singletonList(RDS_RESOURCE), ResourceType.RDS, + "0000"); verifyNoInteractions(rdsResourceWatcher); fakeClock.forwardTime(ClientXdsClient.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS); @@ -539,17 +397,15 @@ public void rdsResourceNotFound() { @Test public void rdsResourceFound() { - RpcCall call = + DiscoveryRpcCall call = startResourceWatcher(ResourceType.RDS, RDS_RESOURCE, rdsResourceWatcher); - List routeConfigs = - ImmutableList.of(Any.pack(buildRouteConfiguration(RDS_RESOURCE, buildVirtualHosts(2)))); - DiscoveryResponse response = - buildDiscoveryResponse("0", routeConfigs, ResourceType.RDS.typeUrl(), "0000"); - call.responseObserver.onNext(response); + List routeConfigs = ImmutableList.of( + Any.pack(mf.buildRouteConfiguration(RDS_RESOURCE, mf.buildOpaqueVirtualHosts(2)))); + call.sendResponse("0", routeConfigs, ResourceType.RDS, "0000"); // Client sends an ACK RDS request. - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "0", RDS_RESOURCE, ResourceType.RDS.typeUrl(), "0000"))); + call.verifyRequest(NODE, "0", Collections.singletonList(RDS_RESOURCE), ResourceType.RDS, + "0000"); verify(rdsResourceWatcher).onChanged(rdsUpdateCaptor.capture()); assertThat(rdsUpdateCaptor.getValue().getVirtualHosts()).hasSize(2); assertThat(fakeClock.getPendingTasks(RDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty(); @@ -557,62 +413,56 @@ public void rdsResourceFound() { @Test public void cachedRdsResource_data() { - RpcCall call = + DiscoveryRpcCall call = startResourceWatcher(ResourceType.RDS, RDS_RESOURCE, rdsResourceWatcher); - List routeConfigs = - ImmutableList.of(Any.pack(buildRouteConfiguration(RDS_RESOURCE, buildVirtualHosts(2)))); - DiscoveryResponse response = - buildDiscoveryResponse("0", routeConfigs, ResourceType.RDS.typeUrl(), "0000"); - call.responseObserver.onNext(response); + List routeConfigs = ImmutableList.of( + Any.pack(mf.buildRouteConfiguration(RDS_RESOURCE, mf.buildOpaqueVirtualHosts(2)))); + call.sendResponse("0", routeConfigs, ResourceType.RDS, "0000"); // Client sends an ACK RDS request. - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "0", RDS_RESOURCE, ResourceType.RDS.typeUrl(), "0000"))); + call.verifyRequest(NODE, "0", Collections.singletonList(RDS_RESOURCE), ResourceType.RDS, + "0000"); RdsResourceWatcher watcher = mock(RdsResourceWatcher.class); xdsClient.watchRdsResource(RDS_RESOURCE, watcher); verify(watcher).onChanged(rdsUpdateCaptor.capture()); assertThat(rdsUpdateCaptor.getValue().getVirtualHosts()).hasSize(2); - verifyNoMoreInteractions(call.requestObserver); + call.verifyNoMoreRequest(); } @Test public void cachedRdsResource_absent() { - RpcCall call = + DiscoveryRpcCall call = startResourceWatcher(ResourceType.RDS, RDS_RESOURCE, rdsResourceWatcher); fakeClock.forwardTime(ClientXdsClient.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS); verify(rdsResourceWatcher).onResourceDoesNotExist(RDS_RESOURCE); RdsResourceWatcher watcher = mock(RdsResourceWatcher.class); xdsClient.watchRdsResource(RDS_RESOURCE, watcher); verify(watcher).onResourceDoesNotExist(RDS_RESOURCE); - verifyNoMoreInteractions(call.requestObserver); + call.verifyNoMoreRequest(); } @Test public void rdsResourceUpdated() { - RpcCall call = + DiscoveryRpcCall call = startResourceWatcher(ResourceType.RDS, RDS_RESOURCE, rdsResourceWatcher); - List routeConfigs = - ImmutableList.of(Any.pack(buildRouteConfiguration(RDS_RESOURCE, buildVirtualHosts(2)))); - DiscoveryResponse response = - buildDiscoveryResponse("0", routeConfigs, ResourceType.RDS.typeUrl(), "0000"); - call.responseObserver.onNext(response); + List routeConfigs = ImmutableList.of( + Any.pack(mf.buildRouteConfiguration(RDS_RESOURCE, mf.buildOpaqueVirtualHosts(2)))); + call.sendResponse("0", routeConfigs, ResourceType.RDS, "0000"); // Client sends an ACK RDS request. - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "0", RDS_RESOURCE, ResourceType.RDS.typeUrl(), "0000"))); + call.verifyRequest(NODE, "0", Collections.singletonList(RDS_RESOURCE), ResourceType.RDS, + "0000"); verify(rdsResourceWatcher).onChanged(rdsUpdateCaptor.capture()); assertThat(rdsUpdateCaptor.getValue().getVirtualHosts()).hasSize(2); - routeConfigs = - ImmutableList.of(Any.pack(buildRouteConfiguration(RDS_RESOURCE, buildVirtualHosts(4)))); - response = - buildDiscoveryResponse("1", routeConfigs, ResourceType.RDS.typeUrl(), "0001"); - call.responseObserver.onNext(response); + routeConfigs = ImmutableList.of( + Any.pack(mf.buildRouteConfiguration(RDS_RESOURCE, mf.buildOpaqueVirtualHosts(4)))); + call.sendResponse("1", routeConfigs, ResourceType.RDS, "0001"); // Client sends an ACK RDS request. - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "1", RDS_RESOURCE, ResourceType.RDS.typeUrl(), "0001"))); + call.verifyRequest(NODE, "1", Collections.singletonList(RDS_RESOURCE), ResourceType.RDS, + "0001"); verify(rdsResourceWatcher, times(2)).onChanged(rdsUpdateCaptor.capture()); assertThat(rdsUpdateCaptor.getValue().getVirtualHosts()).hasSize(4); } @@ -621,39 +471,23 @@ public void rdsResourceUpdated() { public void rdsResourceDeletedByLds() { xdsClient.watchLdsResource(LDS_RESOURCE, ldsResourceWatcher); xdsClient.watchRdsResource(RDS_RESOURCE, rdsResourceWatcher); - RpcCall call = resourceDiscoveryCalls.poll(); + DiscoveryRpcCall call = resourceDiscoveryCalls.poll(); List listeners = ImmutableList.of( - Any.pack(buildListener(LDS_RESOURCE, - Any.pack( - HttpConnectionManager.newBuilder() - .setRds( - Rds.newBuilder() - .setRouteConfigName(RDS_RESOURCE) - .setConfigSource( - ConfigSource.newBuilder() - .setAds(AggregatedConfigSource.getDefaultInstance()))) - .build())))); - DiscoveryResponse response = - buildDiscoveryResponse("0", listeners, ResourceType.LDS.typeUrl(), "0000"); - call.responseObserver.onNext(response); + Any.pack(mf.buildListenerForRds(LDS_RESOURCE, RDS_RESOURCE))); + call.sendResponse("0", listeners, ResourceType.LDS, "0000"); verify(ldsResourceWatcher).onChanged(ldsUpdateCaptor.capture()); assertThat(ldsUpdateCaptor.getValue().getRdsName()).isEqualTo(RDS_RESOURCE); - List routeConfigs = - ImmutableList.of(Any.pack(buildRouteConfiguration(RDS_RESOURCE, buildVirtualHosts(2)))); - response = buildDiscoveryResponse("0", routeConfigs, ResourceType.RDS.typeUrl(), "0000"); - call.responseObserver.onNext(response); + List routeConfigs = ImmutableList.of( + Any.pack(mf.buildRouteConfiguration(RDS_RESOURCE, mf.buildOpaqueVirtualHosts(2)))); + call.sendResponse("0", routeConfigs, ResourceType.RDS, "0000"); verify(rdsResourceWatcher).onChanged(rdsUpdateCaptor.capture()); assertThat(rdsUpdateCaptor.getValue().getVirtualHosts()).hasSize(2); listeners = ImmutableList.of( - Any.pack(buildListener(LDS_RESOURCE, - Any.pack( - HttpConnectionManager.newBuilder() - .setRouteConfig(buildRouteConfiguration("do not care", buildVirtualHosts(5))) - .build())))); - response = buildDiscoveryResponse("1", listeners, ResourceType.LDS.typeUrl(), "0001"); - call.responseObserver.onNext(response); + Any.pack(mf.buildListener(LDS_RESOURCE, + mf.buildRouteConfiguration("do not care", mf.buildOpaqueVirtualHosts(5))))); + call.sendResponse("1", listeners, ResourceType.LDS, "0001"); verify(ldsResourceWatcher, times(2)).onChanged(ldsUpdateCaptor.capture()); assertThat(ldsUpdateCaptor.getValue().getVirtualHosts()).hasSize(5); verify(rdsResourceWatcher).onResourceDoesNotExist(RDS_RESOURCE); @@ -667,31 +501,25 @@ public void multipleRdsWatchers() { xdsClient.watchRdsResource(RDS_RESOURCE, rdsResourceWatcher); xdsClient.watchRdsResource(rdsResource, watcher1); xdsClient.watchRdsResource(rdsResource, watcher2); - RpcCall call = resourceDiscoveryCalls.poll(); - verify(call.requestObserver).onNext( - argThat(new DiscoveryRequestMatcher(NODE, "", Arrays.asList(RDS_RESOURCE, rdsResource), - ResourceType.RDS.typeUrl(), ""))); + DiscoveryRpcCall call = resourceDiscoveryCalls.poll(); + call.verifyRequest(NODE, "", Arrays.asList(RDS_RESOURCE, rdsResource), ResourceType.RDS, ""); fakeClock.forwardTime(ClientXdsClient.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS); verify(rdsResourceWatcher).onResourceDoesNotExist(RDS_RESOURCE); verify(watcher1).onResourceDoesNotExist(rdsResource); verify(watcher2).onResourceDoesNotExist(rdsResource); - List routeConfigs = - ImmutableList.of(Any.pack(buildRouteConfiguration(RDS_RESOURCE, buildVirtualHosts(2)))); - DiscoveryResponse response = - buildDiscoveryResponse("0", routeConfigs, ResourceType.RDS.typeUrl(), "0000"); - call.responseObserver.onNext(response); + List routeConfigs = ImmutableList.of( + Any.pack(mf.buildRouteConfiguration(RDS_RESOURCE, mf.buildOpaqueVirtualHosts(2)))); + call.sendResponse("0", routeConfigs, ResourceType.RDS, "0000"); verify(rdsResourceWatcher).onChanged(rdsUpdateCaptor.capture()); assertThat(rdsUpdateCaptor.getValue().getVirtualHosts()).hasSize(2); verifyNoMoreInteractions(watcher1, watcher2); - routeConfigs = - ImmutableList.of(Any.pack(buildRouteConfiguration(rdsResource, buildVirtualHosts(4)))); - response = - buildDiscoveryResponse("2", routeConfigs, ResourceType.RDS.typeUrl(), "0002"); - call.responseObserver.onNext(response); + routeConfigs = ImmutableList.of(Any.pack( + mf.buildRouteConfiguration(rdsResource, mf.buildOpaqueVirtualHosts(4)))); + call.sendResponse("2", routeConfigs, ResourceType.RDS, "0002"); verify(watcher1).onChanged(rdsUpdateCaptor.capture()); assertThat(rdsUpdateCaptor.getValue().getVirtualHosts()).hasSize(4); @@ -702,19 +530,17 @@ public void multipleRdsWatchers() { @Test public void cdsResourceNotFound() { - RpcCall call = + DiscoveryRpcCall call = startResourceWatcher(ResourceType.CDS, CDS_RESOURCE, cdsResourceWatcher); List clusters = ImmutableList.of( - Any.pack(buildCluster("cluster-bar.googleapis.com", null, false)), - Any.pack(buildCluster("cluster-baz.googleapis.com", null, false))); - DiscoveryResponse response = - buildDiscoveryResponse("0", clusters, ResourceType.CDS.typeUrl(), "0000"); - call.responseObserver.onNext(response); + Any.pack(mf.buildCluster("cluster-bar.googleapis.com", null, false, null, null)), + Any.pack(mf.buildCluster("cluster-baz.googleapis.com", null, false, null, null))); + call.sendResponse("0", clusters, ResourceType.CDS, "0000"); // Client sent an ACK CDS request. - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "0", CDS_RESOURCE, ResourceType.CDS.typeUrl(), "0000"))); + call.verifyRequest(NODE, "0", Collections.singletonList(CDS_RESOURCE), ResourceType.CDS, + "0000"); verifyNoInteractions(cdsResourceWatcher); fakeClock.forwardTime(ClientXdsClient.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS); @@ -724,16 +550,15 @@ public void cdsResourceNotFound() { @Test public void cdsResourceFound() { - RpcCall call = + DiscoveryRpcCall call = startResourceWatcher(ResourceType.CDS, CDS_RESOURCE, cdsResourceWatcher); - List clusters = ImmutableList.of(Any.pack(buildCluster(CDS_RESOURCE, null, false))); - DiscoveryResponse response = - buildDiscoveryResponse("0", clusters, ResourceType.CDS.typeUrl(), "0000"); - call.responseObserver.onNext(response); + List clusters = ImmutableList.of( + Any.pack(mf.buildCluster(CDS_RESOURCE, null, false, null, null))); + call.sendResponse("0", clusters, ResourceType.CDS, "0000"); // Client sent an ACK CDS request. - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "0", CDS_RESOURCE, ResourceType.CDS.typeUrl(), "0000"))); + call.verifyRequest(NODE, "0", Collections.singletonList(CDS_RESOURCE), ResourceType.CDS, + "0000"); verify(cdsResourceWatcher).onChanged(cdsUpdateCaptor.capture()); CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue(); assertThat(cdsUpdate.getClusterName()).isEqualTo(CDS_RESOURCE); @@ -746,27 +571,16 @@ public void cdsResourceFound() { @Test public void cdsResponseWithCircuitBreakers() { - RpcCall call = + DiscoveryRpcCall call = startResourceWatcher(ResourceType.CDS, CDS_RESOURCE, cdsResourceWatcher); - Cluster cluster = buildCluster(CDS_RESOURCE, null, false); - cluster = cluster.toBuilder() - .setCircuitBreakers( - CircuitBreakers.newBuilder() - .addThresholds( - Thresholds.newBuilder() - .setPriority(RoutingPriority.HIGH) - .setMaxRequests(UInt32Value.newBuilder().setValue(50))) - .addThresholds( - Thresholds.newBuilder() - .setPriority(RoutingPriority.DEFAULT) - .setMaxRequests(UInt32Value.newBuilder().setValue(200)))) - .build(); - DiscoveryResponse response = buildDiscoveryResponse("0", - Collections.singletonList(Any.pack(cluster)), ResourceType.CDS.typeUrl(), "0000"); - call.responseObserver.onNext(response); - - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "0", CDS_RESOURCE, ResourceType.CDS.typeUrl(), "0000"))); + List clusters = ImmutableList.of( + Any.pack(mf.buildCluster(CDS_RESOURCE, null, false, null, + mf.buildCircuitBreakers(50, 200)))); + call.sendResponse("0", clusters, ResourceType.CDS, "0000"); + + // Client sent an ACK CDS request. + call.verifyRequest(NODE, "0", Collections.singletonList(CDS_RESOURCE), ResourceType.CDS, + "0000"); verify(cdsResourceWatcher).onChanged(cdsUpdateCaptor.capture()); CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue(); assertThat(cdsUpdate.getClusterName()).isEqualTo(CDS_RESOURCE); @@ -781,24 +595,20 @@ public void cdsResponseWithCircuitBreakers() { */ @Test public void cdsResponseWithUpstreamTlsContext() { - RpcCall call = + DiscoveryRpcCall call = startResourceWatcher(ResourceType.CDS, CDS_RESOURCE, cdsResourceWatcher); // Management server sends back CDS response with UpstreamTlsContext. - UpstreamTlsContext testUpstreamTlsContext = - buildUpstreamTlsContext("secret1", "unix:/var/uds2"); List clusters = ImmutableList.of( - Any.pack(buildCluster("cluster-bar.googleapis.com", null, false)), - Any.pack(buildSecureCluster(CDS_RESOURCE, - "eds-cluster-foo.googleapis.com", true, testUpstreamTlsContext)), - Any.pack(buildCluster("cluster-baz.googleapis.com", null, false))); - DiscoveryResponse response = - buildDiscoveryResponse("0", clusters, ResourceType.CDS.typeUrl(), "0000"); - call.responseObserver.onNext(response); + Any.pack(mf.buildCluster("cluster-bar.googleapis.com", null, false, null, null)), + Any.pack(mf.buildCluster(CDS_RESOURCE, "eds-cluster-foo.googleapis.com", true, + mf.buildUpstreamTlsContext("secret1", "unix:/var/uds2"), null)), + Any.pack(mf.buildCluster("cluster-baz.googleapis.com", null, false, null, null))); + call.sendResponse("0", clusters, ResourceType.CDS, "0000"); // Client sent an ACK CDS request. - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "0", CDS_RESOURCE, ResourceType.CDS.typeUrl(), "0000"))); + call.verifyRequest(NODE, "0", Collections.singletonList(CDS_RESOURCE), ResourceType.CDS, + "0000"); verify(cdsResourceWatcher, times(1)).onChanged(cdsUpdateCaptor.capture()); CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue(); EnvoyServerProtoData.UpstreamTlsContext upstreamTlsContext = cdsUpdate @@ -819,16 +629,15 @@ public void cdsResponseWithUpstreamTlsContext() { @Test public void cachedCdsResource_data() { - RpcCall call = + DiscoveryRpcCall call = startResourceWatcher(ResourceType.CDS, CDS_RESOURCE, cdsResourceWatcher); - List clusters = ImmutableList.of(Any.pack(buildCluster(CDS_RESOURCE, null, false))); - DiscoveryResponse response = - buildDiscoveryResponse("0", clusters, ResourceType.CDS.typeUrl(), "0000"); - call.responseObserver.onNext(response); + List clusters = ImmutableList.of( + Any.pack(mf.buildCluster(CDS_RESOURCE, null, false, null, null))); + call.sendResponse("0", clusters, ResourceType.CDS, "0000"); // Client sends an ACK CDS request. - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "0", CDS_RESOURCE, ResourceType.CDS.typeUrl(), "0000"))); + call.verifyRequest(NODE, "0", Collections.singletonList(CDS_RESOURCE), ResourceType.CDS, + "0000"); CdsResourceWatcher watcher = mock(CdsResourceWatcher.class); xdsClient.watchCdsResource(CDS_RESOURCE, watcher); @@ -838,33 +647,32 @@ public void cachedCdsResource_data() { assertThat(cdsUpdate.getEdsServiceName()).isNull(); assertThat(cdsUpdate.getLbPolicy()).isEqualTo("round_robin"); assertThat(cdsUpdate.getLrsServerName()).isNull(); - verifyNoMoreInteractions(call.requestObserver); + call.verifyNoMoreRequest(); } @Test public void cachedCdsResource_absent() { - RpcCall call = + DiscoveryRpcCall call = startResourceWatcher(ResourceType.CDS, CDS_RESOURCE, cdsResourceWatcher); fakeClock.forwardTime(ClientXdsClient.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS); verify(cdsResourceWatcher).onResourceDoesNotExist(CDS_RESOURCE); CdsResourceWatcher watcher = mock(CdsResourceWatcher.class); xdsClient.watchCdsResource(CDS_RESOURCE, watcher); verify(watcher).onResourceDoesNotExist(CDS_RESOURCE); - verifyNoMoreInteractions(call.requestObserver); + call.verifyNoMoreRequest(); } @Test public void cdsResourceUpdated() { - RpcCall call = + DiscoveryRpcCall call = startResourceWatcher(ResourceType.CDS, CDS_RESOURCE, cdsResourceWatcher); - List clusters = ImmutableList.of(Any.pack(buildCluster(CDS_RESOURCE, null, false))); - DiscoveryResponse response = - buildDiscoveryResponse("0", clusters, ResourceType.CDS.typeUrl(), "0000"); - call.responseObserver.onNext(response); + List clusters = ImmutableList.of( + Any.pack(mf.buildCluster(CDS_RESOURCE, null, false, null, null))); + call.sendResponse("0", clusters, ResourceType.CDS, "0000"); // Client sends an ACK CDS request. - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "0", CDS_RESOURCE, ResourceType.CDS.typeUrl(), "0000"))); + call.verifyRequest(NODE, "0", Collections.singletonList(CDS_RESOURCE), ResourceType.CDS, + "0000"); verify(cdsResourceWatcher).onChanged(cdsUpdateCaptor.capture()); CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue(); assertThat(cdsUpdate.getClusterName()).isEqualTo(CDS_RESOURCE); @@ -873,13 +681,13 @@ public void cdsResourceUpdated() { assertThat(cdsUpdate.getLrsServerName()).isNull(); String edsService = "eds-service-bar.googleapis.com"; - clusters = ImmutableList.of(Any.pack(buildCluster(CDS_RESOURCE, edsService, true))); - response = buildDiscoveryResponse("1", clusters, ResourceType.CDS.typeUrl(), "0001"); - call.responseObserver.onNext(response); + clusters = ImmutableList.of( + Any.pack(mf.buildCluster(CDS_RESOURCE, edsService, true, null, null))); + call.sendResponse("1", clusters, ResourceType.CDS, "0001"); // Client sends an ACK CDS request. - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "1", CDS_RESOURCE, ResourceType.CDS.typeUrl(), "0001"))); + call.verifyRequest(NODE, "1", Collections.singletonList(CDS_RESOURCE), ResourceType.CDS, + "0001"); verify(cdsResourceWatcher, times(2)).onChanged(cdsUpdateCaptor.capture()); cdsUpdate = cdsUpdateCaptor.getValue(); assertThat(cdsUpdate.getClusterName()).isEqualTo(CDS_RESOURCE); @@ -890,16 +698,15 @@ public void cdsResourceUpdated() { @Test public void cdsResourceDeleted() { - RpcCall call = + DiscoveryRpcCall call = startResourceWatcher(ResourceType.CDS, CDS_RESOURCE, cdsResourceWatcher); - List clusters = ImmutableList.of(Any.pack(buildCluster(CDS_RESOURCE, null, false))); - DiscoveryResponse response = - buildDiscoveryResponse("0", clusters, ResourceType.CDS.typeUrl(), "0000"); - call.responseObserver.onNext(response); + List clusters = ImmutableList.of( + Any.pack(mf.buildCluster(CDS_RESOURCE, null, false, null, null))); + call.sendResponse("0", clusters, ResourceType.CDS, "0000"); // Client sends an ACK CDS request. - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "0", CDS_RESOURCE, ResourceType.CDS.typeUrl(), "0000"))); + call.verifyRequest(NODE, "0", Collections.singletonList(CDS_RESOURCE), ResourceType.CDS, + "0000"); verify(cdsResourceWatcher).onChanged(cdsUpdateCaptor.capture()); CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue(); assertThat(cdsUpdate.getClusterName()).isEqualTo(CDS_RESOURCE); @@ -907,13 +714,11 @@ public void cdsResourceDeleted() { assertThat(cdsUpdate.getLbPolicy()).isEqualTo("round_robin"); assertThat(cdsUpdate.getLrsServerName()).isNull(); - response = buildDiscoveryResponse("1", Collections.emptyList(), - ResourceType.CDS.typeUrl(), "0001"); - call.responseObserver.onNext(response); + call.sendResponse("1", Collections.emptyList(), ResourceType.CDS, "0001"); // Client sends an ACK CDS request. - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "1", CDS_RESOURCE, ResourceType.CDS.typeUrl(), "0001"))); + call.verifyRequest(NODE, "1", Collections.singletonList(CDS_RESOURCE), ResourceType.CDS, + "0001"); verify(cdsResourceWatcher).onResourceDoesNotExist(CDS_RESOURCE); } @@ -925,10 +730,8 @@ public void multipleCdsWatchers() { xdsClient.watchCdsResource(CDS_RESOURCE, cdsResourceWatcher); xdsClient.watchCdsResource(cdsResource, watcher1); xdsClient.watchCdsResource(cdsResource, watcher2); - RpcCall call = resourceDiscoveryCalls.poll(); - verify(call.requestObserver).onNext( - argThat(new DiscoveryRequestMatcher(NODE, "", Arrays.asList(CDS_RESOURCE, cdsResource), - ResourceType.CDS.typeUrl(), ""))); + DiscoveryRpcCall call = resourceDiscoveryCalls.poll(); + call.verifyRequest(NODE, "", Arrays.asList(CDS_RESOURCE, cdsResource), ResourceType.CDS, ""); fakeClock.forwardTime(ClientXdsClient.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS); verify(cdsResourceWatcher).onResourceDoesNotExist(CDS_RESOURCE); @@ -937,11 +740,9 @@ public void multipleCdsWatchers() { String edsService = "eds-service-bar.googleapis.com"; List clusters = ImmutableList.of( - Any.pack(buildCluster(CDS_RESOURCE, null, false)), - Any.pack(buildCluster(cdsResource, edsService, true))); - DiscoveryResponse response = - buildDiscoveryResponse("0", clusters, ResourceType.CDS.typeUrl(), "0000"); - call.responseObserver.onNext(response); + Any.pack(mf.buildCluster(CDS_RESOURCE, null, false, null, null)), + Any.pack(mf.buildCluster(cdsResource, edsService, true, null, null))); + call.sendResponse("0", clusters, ResourceType.CDS, "0000"); verify(cdsResourceWatcher).onChanged(cdsUpdateCaptor.capture()); CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue(); assertThat(cdsUpdate.getClusterName()).isEqualTo(CDS_RESOURCE); @@ -964,25 +765,23 @@ public void multipleCdsWatchers() { @Test public void edsResourceNotFound() { - RpcCall call = + DiscoveryRpcCall call = startResourceWatcher(ResourceType.EDS, EDS_RESOURCE, edsResourceWatcher); List clusterLoadAssignments = ImmutableList.of( Any.pack( - buildClusterLoadAssignment("cluster-bar.googleapis.com", + mf.buildClusterLoadAssignment("cluster-bar.googleapis.com", ImmutableList.of( - buildLocalityLbEndpoints("region1", "zone1", "subzone1", + mf.buildLocalityLbEndpoints("region1", "zone1", "subzone1", ImmutableList.of( - buildLbEndpoint("192.168.0.1", 8080, HealthStatus.HEALTHY, 2)), + mf.buildLbEndpoint("192.168.0.1", 8080, "healthy", 2)), 1, 0)), - ImmutableList.of()))); - DiscoveryResponse response = - buildDiscoveryResponse("0", clusterLoadAssignments, ResourceType.EDS.typeUrl(), "0000"); - call.responseObserver.onNext(response); + ImmutableList.of()))); + call.sendResponse("0", clusterLoadAssignments, ResourceType.EDS, "0000"); // Client sent an ACK EDS request. - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "0", EDS_RESOURCE, ResourceType.EDS.typeUrl(), "0000"))); + call.verifyRequest(NODE, "0", Collections.singletonList(EDS_RESOURCE), ResourceType.EDS, + "0000"); verifyNoInteractions(edsResourceWatcher); fakeClock.forwardTime(ClientXdsClient.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS); @@ -992,34 +791,32 @@ public void edsResourceNotFound() { @Test public void edsResourceFound() { - RpcCall call = + DiscoveryRpcCall call = startResourceWatcher(ResourceType.EDS, EDS_RESOURCE, edsResourceWatcher); List clusterLoadAssignments = ImmutableList.of( Any.pack( - buildClusterLoadAssignment(EDS_RESOURCE, + mf.buildClusterLoadAssignment(EDS_RESOURCE, ImmutableList.of( - buildLocalityLbEndpoints("region1", "zone1", "subzone1", + mf.buildLocalityLbEndpoints("region1", "zone1", "subzone1", ImmutableList.of( - buildLbEndpoint("192.168.0.1", 8080, HealthStatus.HEALTHY, 2)), + mf.buildLbEndpoint("192.168.0.1", 8080, "healthy", 2)), 1, 0), - buildLocalityLbEndpoints("region3", "zone3", "subzone3", - ImmutableList.of(), + mf.buildLocalityLbEndpoints("region3", "zone3", "subzone3", + ImmutableList.of(), 2, 1), /* locality with 0 endpoint */ - buildLocalityLbEndpoints("region4", "zone4", "subzone4", + mf.buildLocalityLbEndpoints("region4", "zone4", "subzone4", ImmutableList.of( - buildLbEndpoint("192.168.142.5", 80, HealthStatus.UNKNOWN, 5)), + mf.buildLbEndpoint("192.168.142.5", 80, "unknown", 5)), 0, 2) /* locality with 0 weight */), ImmutableList.of( - buildDropOverload("lb", 200), - buildDropOverload("throttle", 1000))))); - DiscoveryResponse response = - buildDiscoveryResponse("0", clusterLoadAssignments, ResourceType.EDS.typeUrl(), "0000"); - call.responseObserver.onNext(response); + mf.buildDropOverload("lb", 200), + mf.buildDropOverload("throttle", 1000))))); + call.sendResponse("0", clusterLoadAssignments, ResourceType.EDS, "0000"); // Client sent an ACK EDS request. - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "0", EDS_RESOURCE, ResourceType.EDS.typeUrl(), "0000"))); + call.verifyRequest(NODE, "0", Collections.singletonList(EDS_RESOURCE), ResourceType.EDS, + "0000"); verify(edsResourceWatcher).onChanged(edsUpdateCaptor.capture()); EdsUpdate edsUpdate = edsUpdateCaptor.getValue(); assertThat(edsUpdate.getClusterName()).isEqualTo(EDS_RESOURCE); @@ -1040,34 +837,32 @@ public void edsResourceFound() { @Test public void cachedEdsResource_data() { - RpcCall call = + DiscoveryRpcCall call = startResourceWatcher(ResourceType.EDS, EDS_RESOURCE, edsResourceWatcher); List clusterLoadAssignments = ImmutableList.of( Any.pack( - buildClusterLoadAssignment(EDS_RESOURCE, + mf.buildClusterLoadAssignment(EDS_RESOURCE, ImmutableList.of( - buildLocalityLbEndpoints("region1", "zone1", "subzone1", + mf.buildLocalityLbEndpoints("region1", "zone1", "subzone1", ImmutableList.of( - buildLbEndpoint("192.168.0.1", 8080, HealthStatus.HEALTHY, 2)), + mf.buildLbEndpoint("192.168.0.1", 8080, "healthy", 2)), 1, 0), - buildLocalityLbEndpoints("region3", "zone3", "subzone3", - ImmutableList.of(), + mf.buildLocalityLbEndpoints("region3", "zone3", "subzone3", + ImmutableList.of(), 2, 1), /* locality with 0 endpoint */ - buildLocalityLbEndpoints("region4", "zone4", "subzone4", + mf.buildLocalityLbEndpoints("region4", "zone4", "subzone4", ImmutableList.of( - buildLbEndpoint("192.168.142.5", 80, HealthStatus.UNKNOWN, 5)), + mf.buildLbEndpoint("192.168.142.5", 80, "unknown", 5)), 0, 2) /* locality with 0 weight */), ImmutableList.of( - buildDropOverload("lb", 200), - buildDropOverload("throttle", 1000))))); - DiscoveryResponse response = - buildDiscoveryResponse("0", clusterLoadAssignments, ResourceType.EDS.typeUrl(), "0000"); - call.responseObserver.onNext(response); + mf.buildDropOverload("lb", 200), + mf.buildDropOverload("throttle", 1000))))); + call.sendResponse("0", clusterLoadAssignments, ResourceType.EDS, "0000"); - // Client sends an ACK EDS request. - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "0", EDS_RESOURCE, ResourceType.EDS.typeUrl(), "0000"))); + // Client sent an ACK EDS request. + call.verifyRequest(NODE, "0", Collections.singletonList(EDS_RESOURCE), ResourceType.EDS, + "0000"); EdsResourceWatcher watcher = mock(EdsResourceWatcher.class); xdsClient.watchEdsResource(EDS_RESOURCE, watcher); @@ -1087,51 +882,49 @@ public void cachedEdsResource_data() { 2, true)), 1, 0), new Locality("region3", "zone3", "subzone3"), new LocalityLbEndpoints(ImmutableList.of(), 2, 1)); - verifyNoMoreInteractions(call.requestObserver); + call.verifyNoMoreRequest(); } @Test public void cachedEdsResource_absent() { - RpcCall call = + DiscoveryRpcCall call = startResourceWatcher(ResourceType.EDS, EDS_RESOURCE, edsResourceWatcher); fakeClock.forwardTime(ClientXdsClient.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS); verify(edsResourceWatcher).onResourceDoesNotExist(EDS_RESOURCE); EdsResourceWatcher watcher = mock(EdsResourceWatcher.class); xdsClient.watchEdsResource(EDS_RESOURCE, watcher); verify(watcher).onResourceDoesNotExist(EDS_RESOURCE); - verifyNoMoreInteractions(call.requestObserver); + call.verifyNoMoreRequest(); } @Test public void edsResourceUpdated() { - RpcCall call = + DiscoveryRpcCall call = startResourceWatcher(ResourceType.EDS, EDS_RESOURCE, edsResourceWatcher); List clusterLoadAssignments = ImmutableList.of( Any.pack( - buildClusterLoadAssignment(EDS_RESOURCE, + mf.buildClusterLoadAssignment(EDS_RESOURCE, ImmutableList.of( - buildLocalityLbEndpoints("region1", "zone1", "subzone1", + mf.buildLocalityLbEndpoints("region1", "zone1", "subzone1", ImmutableList.of( - buildLbEndpoint("192.168.0.1", 8080, HealthStatus.HEALTHY, 2)), + mf.buildLbEndpoint("192.168.0.1", 8080, "healthy", 2)), 1, 0), - buildLocalityLbEndpoints("region3", "zone3", "subzone3", - ImmutableList.of(), + mf.buildLocalityLbEndpoints("region3", "zone3", "subzone3", + ImmutableList.of(), 2, 1), /* locality with 0 endpoint */ - buildLocalityLbEndpoints("region4", "zone4", "subzone4", + mf.buildLocalityLbEndpoints("region4", "zone4", "subzone4", ImmutableList.of( - buildLbEndpoint("192.168.142.5", 80, HealthStatus.UNKNOWN, 5)), + mf.buildLbEndpoint("192.168.142.5", 80, "unknown", 5)), 0, 2) /* locality with 0 weight */), ImmutableList.of( - buildDropOverload("lb", 200), - buildDropOverload("throttle", 1000))))); - DiscoveryResponse response = - buildDiscoveryResponse("0", clusterLoadAssignments, ResourceType.EDS.typeUrl(), "0000"); - call.responseObserver.onNext(response); + mf.buildDropOverload("lb", 200), + mf.buildDropOverload("throttle", 1000))))); + call.sendResponse("0", clusterLoadAssignments, ResourceType.EDS, "0000"); // Client sent an ACK EDS request. - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "0", EDS_RESOURCE, ResourceType.EDS.typeUrl(), "0000"))); + call.verifyRequest(NODE, "0", Collections.singletonList(EDS_RESOURCE), ResourceType.EDS, + "0000"); verify(edsResourceWatcher).onChanged(edsUpdateCaptor.capture()); EdsUpdate edsUpdate = edsUpdateCaptor.getValue(); assertThat(edsUpdate.getClusterName()).isEqualTo(EDS_RESOURCE); @@ -1151,16 +944,14 @@ public void edsResourceUpdated() { clusterLoadAssignments = ImmutableList.of( Any.pack( - buildClusterLoadAssignment(EDS_RESOURCE, + mf.buildClusterLoadAssignment(EDS_RESOURCE, ImmutableList.of( - buildLocalityLbEndpoints("region2", "zone2", "subzone2", + mf.buildLocalityLbEndpoints("region2", "zone2", "subzone2", ImmutableList.of( - buildLbEndpoint("172.44.2.2", 8000, HealthStatus.HEALTHY, 3)), + mf.buildLbEndpoint("172.44.2.2", 8000, "unknown", 3)), 2, 0)), - ImmutableList.of()))); - response = - buildDiscoveryResponse("1", clusterLoadAssignments, ResourceType.EDS.typeUrl(), "0001"); - call.responseObserver.onNext(response); + ImmutableList.of()))); + call.sendResponse("1", clusterLoadAssignments, ResourceType.EDS, "0001"); verify(edsResourceWatcher, times(2)).onChanged(edsUpdateCaptor.capture()); edsUpdate = edsUpdateCaptor.getValue(); @@ -1178,12 +969,10 @@ public void edsResourceUpdated() { public void edsResourceDeletedByCds() { xdsClient.watchCdsResource(CDS_RESOURCE, cdsResourceWatcher); xdsClient.watchEdsResource(EDS_RESOURCE, edsResourceWatcher); - RpcCall call = resourceDiscoveryCalls.poll(); - List clusters = - ImmutableList.of(Any.pack(buildCluster(CDS_RESOURCE, EDS_RESOURCE, false))); - DiscoveryResponse response = - buildDiscoveryResponse("0", clusters, ResourceType.CDS.typeUrl(), "0000"); - call.responseObserver.onNext(response); + DiscoveryRpcCall call = resourceDiscoveryCalls.poll(); + List clusters = ImmutableList.of( + Any.pack(mf.buildCluster(CDS_RESOURCE, EDS_RESOURCE, false, null, null))); + call.sendResponse("0", clusters, ResourceType.CDS, "0000"); verify(cdsResourceWatcher).onChanged(cdsUpdateCaptor.capture()); assertThat(cdsUpdateCaptor.getValue().getClusterName()).isEqualTo(CDS_RESOURCE); assertThat(cdsUpdateCaptor.getValue().getEdsServiceName()).isEqualTo(EDS_RESOURCE); @@ -1191,26 +980,23 @@ public void edsResourceDeletedByCds() { List clusterLoadAssignments = ImmutableList.of( Any.pack( - buildClusterLoadAssignment(EDS_RESOURCE, + mf.buildClusterLoadAssignment(EDS_RESOURCE, ImmutableList.of( - buildLocalityLbEndpoints("region1", "zone1", "subzone1", + mf.buildLocalityLbEndpoints("region1", "zone1", "subzone1", ImmutableList.of( - buildLbEndpoint("192.168.0.1", 8080, HealthStatus.HEALTHY, 2)), + mf.buildLbEndpoint("192.168.0.1", 8080, "healthy", 2)), 1, 0)), ImmutableList.of( - buildDropOverload("lb", 200), - buildDropOverload("throttle", 1000))))); - response = - buildDiscoveryResponse("0", clusterLoadAssignments, ResourceType.EDS.typeUrl(), "0000"); - call.responseObserver.onNext(response); + mf.buildDropOverload("lb", 200), + mf.buildDropOverload("throttle", 1000))))); + call.sendResponse("0", clusterLoadAssignments, ResourceType.EDS, "0000"); verify(edsResourceWatcher).onChanged(edsUpdateCaptor.capture()); EdsUpdate edsUpdate = edsUpdateCaptor.getValue(); assertThat(edsUpdate.getClusterName()).isEqualTo(EDS_RESOURCE); - clusters = ImmutableList.of(Any.pack(buildCluster(CDS_RESOURCE, null, false))); - response = - buildDiscoveryResponse("1", clusters, ResourceType.CDS.typeUrl(), "0001"); - call.responseObserver.onNext(response); + clusters = ImmutableList.of( + Any.pack(mf.buildCluster(CDS_RESOURCE, null, false, null, null))); + call.sendResponse("1", clusters, ResourceType.CDS, "0001"); verify(cdsResourceWatcher, times(2)).onChanged(cdsUpdateCaptor.capture()); assertThat(cdsUpdateCaptor.getValue().getClusterName()).isEqualTo(CDS_RESOURCE); assertThat(cdsUpdateCaptor.getValue().getEdsServiceName()).isNull(); @@ -1225,10 +1011,8 @@ public void multipleEdsWatchers() { xdsClient.watchEdsResource(EDS_RESOURCE, edsResourceWatcher); xdsClient.watchEdsResource(edsResource, watcher1); xdsClient.watchEdsResource(edsResource, watcher2); - RpcCall call = resourceDiscoveryCalls.poll(); - verify(call.requestObserver).onNext( - argThat(new DiscoveryRequestMatcher(NODE, "", Arrays.asList(EDS_RESOURCE, edsResource), - ResourceType.EDS.typeUrl(), ""))); + DiscoveryRpcCall call = resourceDiscoveryCalls.poll(); + call.verifyRequest(NODE, "", Arrays.asList(EDS_RESOURCE, edsResource), ResourceType.EDS, ""); fakeClock.forwardTime(ClientXdsClient.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS); verify(edsResourceWatcher).onResourceDoesNotExist(EDS_RESOURCE); @@ -1238,25 +1022,23 @@ public void multipleEdsWatchers() { List clusterLoadAssignments = ImmutableList.of( Any.pack( - buildClusterLoadAssignment(EDS_RESOURCE, + mf.buildClusterLoadAssignment(EDS_RESOURCE, ImmutableList.of( - buildLocalityLbEndpoints("region1", "zone1", "subzone1", + mf.buildLocalityLbEndpoints("region1", "zone1", "subzone1", ImmutableList.of( - buildLbEndpoint("192.168.0.1", 8080, HealthStatus.HEALTHY, 2)), + mf.buildLbEndpoint("192.168.0.1", 8080, "healthy", 2)), 1, 0), - buildLocalityLbEndpoints("region3", "zone3", "subzone3", - ImmutableList.of(), + mf.buildLocalityLbEndpoints("region3", "zone3", "subzone3", + ImmutableList.of(), 2, 1), /* locality with 0 endpoint */ - buildLocalityLbEndpoints("region4", "zone4", "subzone4", + mf.buildLocalityLbEndpoints("region4", "zone4", "subzone4", ImmutableList.of( - buildLbEndpoint("192.168.142.5", 80, HealthStatus.UNKNOWN, 5)), + mf.buildLbEndpoint("192.168.142.5", 80, "unknown", 5)), 0, 2) /* locality with 0 weight */), ImmutableList.of( - buildDropOverload("lb", 200), - buildDropOverload("throttle", 1000))))); - DiscoveryResponse response = - buildDiscoveryResponse("0", clusterLoadAssignments, ResourceType.EDS.typeUrl(), "0000"); - call.responseObserver.onNext(response); + mf.buildDropOverload("lb", 200), + mf.buildDropOverload("throttle", 1000))))); + call.sendResponse("0", clusterLoadAssignments, ResourceType.EDS, "0000"); verify(edsResourceWatcher).onChanged(edsUpdateCaptor.capture()); EdsUpdate edsUpdate = edsUpdateCaptor.getValue(); assertThat(edsUpdate.getClusterName()).isEqualTo(EDS_RESOURCE); @@ -1277,16 +1059,14 @@ public void multipleEdsWatchers() { clusterLoadAssignments = ImmutableList.of( Any.pack( - buildClusterLoadAssignment(edsResource, + mf.buildClusterLoadAssignment(edsResource, ImmutableList.of( - buildLocalityLbEndpoints("region2", "zone2", "subzone2", + mf.buildLocalityLbEndpoints("region2", "zone2", "subzone2", ImmutableList.of( - buildLbEndpoint("172.44.2.2", 8000, HealthStatus.HEALTHY, 3)), + mf.buildLbEndpoint("172.44.2.2", 8000, "healthy", 3)), 2, 0)), - ImmutableList.of()))); - response = - buildDiscoveryResponse("1", clusterLoadAssignments, ResourceType.EDS.typeUrl(), "0001"); - call.responseObserver.onNext(response); + ImmutableList.of()))); + call.sendResponse("1", clusterLoadAssignments, ResourceType.EDS, "0001"); verify(watcher1).onChanged(edsUpdateCaptor.capture()); edsUpdate = edsUpdateCaptor.getValue(); @@ -1318,18 +1098,14 @@ public void streamClosedAndRetryWithBackoff() { xdsClient.watchRdsResource(RDS_RESOURCE, rdsResourceWatcher); xdsClient.watchCdsResource(CDS_RESOURCE, cdsResourceWatcher); xdsClient.watchEdsResource(EDS_RESOURCE, edsResourceWatcher); - RpcCall call = resourceDiscoveryCalls.poll(); - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "", LDS_RESOURCE, ResourceType.LDS.typeUrl(), ""))); - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "", RDS_RESOURCE, ResourceType.RDS.typeUrl(), ""))); - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "", CDS_RESOURCE, ResourceType.CDS.typeUrl(), ""))); - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "", EDS_RESOURCE, ResourceType.EDS.typeUrl(), ""))); + DiscoveryRpcCall call = resourceDiscoveryCalls.poll(); + call.verifyRequest(NODE, "", Collections.singletonList(LDS_RESOURCE), ResourceType.LDS, ""); + call.verifyRequest(NODE, "", Collections.singletonList(RDS_RESOURCE), ResourceType.RDS, ""); + call.verifyRequest(NODE, "", Collections.singletonList(CDS_RESOURCE), ResourceType.CDS, ""); + call.verifyRequest(NODE, "", Collections.singletonList(EDS_RESOURCE), ResourceType.EDS, ""); // Management server closes the RPC stream with an error. - call.responseObserver.onError(Status.UNKNOWN.asException()); + call.sendError(Status.UNKNOWN.asException()); verify(ldsResourceWatcher).onError(errorCaptor.capture()); assertThat(errorCaptor.getValue().getCode()).isEqualTo(Code.UNKNOWN); verify(rdsResourceWatcher).onError(errorCaptor.capture()); @@ -1347,17 +1123,13 @@ public void streamClosedAndRetryWithBackoff() { assertThat(retryTask.getDelay(TimeUnit.NANOSECONDS)).isEqualTo(10L); fakeClock.forwardNanos(10L); call = resourceDiscoveryCalls.poll(); - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "", LDS_RESOURCE, ResourceType.LDS.typeUrl(), ""))); - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "", RDS_RESOURCE, ResourceType.RDS.typeUrl(), ""))); - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "", CDS_RESOURCE, ResourceType.CDS.typeUrl(), ""))); - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "", EDS_RESOURCE, ResourceType.EDS.typeUrl(), ""))); + call.verifyRequest(NODE, "", Collections.singletonList(LDS_RESOURCE), ResourceType.LDS, ""); + call.verifyRequest(NODE, "", Collections.singletonList(RDS_RESOURCE), ResourceType.RDS, ""); + call.verifyRequest(NODE, "", Collections.singletonList(CDS_RESOURCE), ResourceType.CDS, ""); + call.verifyRequest(NODE, "", Collections.singletonList(EDS_RESOURCE), ResourceType.EDS, ""); // Management server becomes unreachable. - call.responseObserver.onError(Status.UNAVAILABLE.asException()); + call.sendError(Status.UNAVAILABLE.asException()); verify(ldsResourceWatcher, times(2)).onError(errorCaptor.capture()); assertThat(errorCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE); verify(rdsResourceWatcher, times(2)).onError(errorCaptor.capture()); @@ -1374,36 +1146,25 @@ public void streamClosedAndRetryWithBackoff() { assertThat(retryTask.getDelay(TimeUnit.NANOSECONDS)).isEqualTo(100L); fakeClock.forwardNanos(100L); call = resourceDiscoveryCalls.poll(); - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "", LDS_RESOURCE, ResourceType.LDS.typeUrl(), ""))); - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "", RDS_RESOURCE, ResourceType.RDS.typeUrl(), ""))); - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "", CDS_RESOURCE, ResourceType.CDS.typeUrl(), ""))); - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "", EDS_RESOURCE, ResourceType.EDS.typeUrl(), ""))); + call.verifyRequest(NODE, "", Collections.singletonList(LDS_RESOURCE), ResourceType.LDS, ""); + call.verifyRequest(NODE, "", Collections.singletonList(RDS_RESOURCE), ResourceType.RDS, ""); + call.verifyRequest(NODE, "", Collections.singletonList(CDS_RESOURCE), ResourceType.CDS, ""); + call.verifyRequest(NODE, "", Collections.singletonList(EDS_RESOURCE), ResourceType.EDS, ""); List listeners = ImmutableList.of( - Any.pack(buildListener(LDS_RESOURCE, - Any.pack( - HttpConnectionManager.newBuilder() - .setRouteConfig(buildRouteConfiguration("do not care", buildVirtualHosts(2))) - .build())))); - DiscoveryResponse response = - buildDiscoveryResponse("63", listeners, ResourceType.LDS.typeUrl(), "3242"); - call.responseObserver.onNext(response); - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "63", LDS_RESOURCE, ResourceType.LDS.typeUrl(), "3242"))); - - List routeConfigs = - ImmutableList.of(Any.pack(buildRouteConfiguration(RDS_RESOURCE, buildVirtualHosts(2)))); - response = - buildDiscoveryResponse("5", routeConfigs, ResourceType.RDS.typeUrl(), "6764"); - call.responseObserver.onNext(response); - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "5", RDS_RESOURCE, ResourceType.RDS.typeUrl(), "6764"))); - - call.responseObserver.onError(Status.DEADLINE_EXCEEDED.asException()); + Any.pack(mf.buildListener(LDS_RESOURCE, + mf.buildRouteConfiguration("do not care", mf.buildOpaqueVirtualHosts(2))))); + call.sendResponse("63", listeners, ResourceType.LDS, "3242"); + call.verifyRequest(NODE, "63", Collections.singletonList(LDS_RESOURCE), ResourceType.LDS, + "3242"); + + List routeConfigs = ImmutableList.of( + Any.pack(mf.buildRouteConfiguration(RDS_RESOURCE, mf.buildOpaqueVirtualHosts(2)))); + call.sendResponse("5", routeConfigs, ResourceType.RDS, "6764"); + call.verifyRequest(NODE, "5", Collections.singletonList(RDS_RESOURCE), ResourceType.RDS, + "6764"); + + call.sendError(Status.DEADLINE_EXCEEDED.asException()); verify(ldsResourceWatcher, times(3)).onError(errorCaptor.capture()); assertThat(errorCaptor.getValue().getCode()).isEqualTo(Code.DEADLINE_EXCEEDED); verify(rdsResourceWatcher, times(3)).onError(errorCaptor.capture()); @@ -1417,17 +1178,13 @@ public void streamClosedAndRetryWithBackoff() { inOrder.verify(backoffPolicyProvider).get(); fakeClock.runDueTasks(); call = resourceDiscoveryCalls.poll(); - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "63", LDS_RESOURCE, ResourceType.LDS.typeUrl(), ""))); - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "5", RDS_RESOURCE, ResourceType.RDS.typeUrl(), ""))); - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "", CDS_RESOURCE, ResourceType.CDS.typeUrl(), ""))); - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "", EDS_RESOURCE, ResourceType.EDS.typeUrl(), ""))); + call.verifyRequest(NODE, "63", Collections.singletonList(LDS_RESOURCE), ResourceType.LDS, ""); + call.verifyRequest(NODE, "5", Collections.singletonList(RDS_RESOURCE), ResourceType.RDS, ""); + call.verifyRequest(NODE, "", Collections.singletonList(CDS_RESOURCE), ResourceType.CDS, ""); + call.verifyRequest(NODE, "", Collections.singletonList(EDS_RESOURCE), ResourceType.EDS, ""); // Management server becomes unreachable again. - call.responseObserver.onError(Status.UNAVAILABLE.asException()); + call.sendError(Status.UNAVAILABLE.asException()); verify(ldsResourceWatcher, times(4)).onError(errorCaptor.capture()); assertThat(errorCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE); verify(rdsResourceWatcher, times(4)).onError(errorCaptor.capture()); @@ -1444,14 +1201,10 @@ public void streamClosedAndRetryWithBackoff() { assertThat(retryTask.getDelay(TimeUnit.NANOSECONDS)).isEqualTo(20L); fakeClock.forwardNanos(20L); call = resourceDiscoveryCalls.poll(); - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "63", LDS_RESOURCE, ResourceType.LDS.typeUrl(), ""))); - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "5", RDS_RESOURCE, ResourceType.RDS.typeUrl(), ""))); - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "", CDS_RESOURCE, ResourceType.CDS.typeUrl(), ""))); - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "", EDS_RESOURCE, ResourceType.EDS.typeUrl(), ""))); + call.verifyRequest(NODE, "63", Collections.singletonList(LDS_RESOURCE), ResourceType.LDS, ""); + call.verifyRequest(NODE, "5", Collections.singletonList(RDS_RESOURCE), ResourceType.RDS, ""); + call.verifyRequest(NODE, "", Collections.singletonList(CDS_RESOURCE), ResourceType.CDS, ""); + call.verifyRequest(NODE, "", Collections.singletonList(EDS_RESOURCE), ResourceType.EDS, ""); inOrder.verifyNoMoreInteractions(); } @@ -1460,8 +1213,8 @@ public void streamClosedAndRetryWithBackoff() { public void streamClosedAndRetryRaceWithAddRemoveWatchers() { xdsClient.watchLdsResource(LDS_RESOURCE, ldsResourceWatcher); xdsClient.watchRdsResource(RDS_RESOURCE, rdsResourceWatcher); - RpcCall call = resourceDiscoveryCalls.poll(); - call.responseObserver.onError(Status.UNAVAILABLE.asException()); + DiscoveryRpcCall call = resourceDiscoveryCalls.poll(); + call.sendError(Status.UNAVAILABLE.asException()); verify(ldsResourceWatcher).onError(errorCaptor.capture()); assertThat(errorCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE); verify(rdsResourceWatcher).onError(errorCaptor.capture()); @@ -1476,26 +1229,17 @@ public void streamClosedAndRetryRaceWithAddRemoveWatchers() { xdsClient.watchEdsResource(EDS_RESOURCE, edsResourceWatcher); fakeClock.forwardNanos(10L); call = resourceDiscoveryCalls.poll(); - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "", CDS_RESOURCE, ResourceType.CDS.typeUrl(), ""))); - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "", EDS_RESOURCE, ResourceType.EDS.typeUrl(), ""))); - verifyNoMoreInteractions(call.requestObserver); + call.verifyRequest(NODE, "", Collections.singletonList(CDS_RESOURCE), ResourceType.CDS, ""); + call.verifyRequest(NODE, "", Collections.singletonList(EDS_RESOURCE), ResourceType.EDS, ""); + call.verifyNoMoreRequest(); List listeners = ImmutableList.of( - Any.pack(buildListener(LDS_RESOURCE, - Any.pack( - HttpConnectionManager.newBuilder() - .setRouteConfig(buildRouteConfiguration("do not care", buildVirtualHosts(2))) - .build())))); - DiscoveryResponse response = - buildDiscoveryResponse("0", listeners, ResourceType.LDS.typeUrl(), "0000"); - call.responseObserver.onNext(response); - List routeConfigs = - ImmutableList.of(Any.pack(buildRouteConfiguration(RDS_RESOURCE, buildVirtualHosts(2)))); - response = - buildDiscoveryResponse("0", routeConfigs, ResourceType.RDS.typeUrl(), "0000"); - call.responseObserver.onNext(response); + Any.pack(mf.buildListener(LDS_RESOURCE, + mf.buildRouteConfiguration("do not care", mf.buildOpaqueVirtualHosts(2))))); + call.sendResponse("0", listeners, ResourceType.LDS, "0000"); + List routeConfigs = ImmutableList.of( + Any.pack(mf.buildRouteConfiguration(RDS_RESOURCE, mf.buildOpaqueVirtualHosts(2)))); + call.sendResponse("0", routeConfigs, ResourceType.RDS, "0000"); verifyNoMoreInteractions(ldsResourceWatcher, rdsResourceWatcher); } @@ -1506,7 +1250,7 @@ public void streamClosedAndRetryRestartsResourceInitialFetchTimerForUnresolvedRe xdsClient.watchRdsResource(RDS_RESOURCE, rdsResourceWatcher); xdsClient.watchCdsResource(CDS_RESOURCE, cdsResourceWatcher); xdsClient.watchEdsResource(EDS_RESOURCE, edsResourceWatcher); - RpcCall call = resourceDiscoveryCalls.poll(); + DiscoveryRpcCall call = resourceDiscoveryCalls.poll(); ScheduledTask ldsResourceTimeout = Iterables.getOnlyElement(fakeClock.getPendingTasks(LDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)); ScheduledTask rdsResourceTimeout = @@ -1515,26 +1259,18 @@ public void streamClosedAndRetryRestartsResourceInitialFetchTimerForUnresolvedRe Iterables.getOnlyElement(fakeClock.getPendingTasks(CDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)); ScheduledTask edsResourceTimeout = Iterables.getOnlyElement(fakeClock.getPendingTasks(EDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)); - List listeners = ImmutableList.of( - Any.pack(buildListener(LDS_RESOURCE, - Any.pack( - HttpConnectionManager.newBuilder() - .setRouteConfig(buildRouteConfiguration("do not care", buildVirtualHosts(2))) - .build())))); - DiscoveryResponse response = - buildDiscoveryResponse("0", listeners, ResourceType.LDS.typeUrl(), "0000"); - call.responseObserver.onNext(response); + Any.pack(mf.buildListener(LDS_RESOURCE, mf.buildRouteConfiguration("do not care", + mf.buildOpaqueVirtualHosts(2))))); + call.sendResponse("0", listeners, ResourceType.LDS, "0000"); assertThat(ldsResourceTimeout.isCancelled()).isTrue(); - List routeConfigs = - ImmutableList.of(Any.pack(buildRouteConfiguration(RDS_RESOURCE, buildVirtualHosts(2)))); - response = - buildDiscoveryResponse("0", routeConfigs, ResourceType.RDS.typeUrl(), "0000"); - call.responseObserver.onNext(response); + List routeConfigs = ImmutableList.of( + Any.pack(mf.buildRouteConfiguration(RDS_RESOURCE, mf.buildOpaqueVirtualHosts(2)))); + call.sendResponse("0", routeConfigs, ResourceType.RDS, "0000"); assertThat(rdsResourceTimeout.isCancelled()).isTrue(); - call.responseObserver.onError(Status.UNAVAILABLE.asException()); + call.sendError(Status.UNAVAILABLE.asException()); assertThat(cdsResourceTimeout.isCancelled()).isTrue(); assertThat(edsResourceTimeout.isCancelled()).isTrue(); @@ -1552,32 +1288,21 @@ public void streamClosedAndRetryRestartsResourceInitialFetchTimerForUnresolvedRe public void reportLoadStatsToServer() { String clusterName = "cluster-foo.googleapis.com"; xdsClient.addClientStats(clusterName, null); - ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(null); - RpcCall lrsCall = loadReportCalls.poll(); - verify(lrsCall.requestObserver).onNext(requestCaptor.capture()); - assertThat(requestCaptor.getValue().getClusterStatsCount()) - .isEqualTo(0); // initial request - - lrsCall.responseObserver.onNext( - LoadStatsResponse.newBuilder() - .addClusters(clusterName) - .setLoadReportingInterval(Durations.fromNanos(1000L)) - .build()); + LrsRpcCall lrsCall = loadReportCalls.poll(); + lrsCall.verifyNextReportClusters(Collections.emptyList()); // initial LRS request + + lrsCall.sendResponse(Collections.singletonList(clusterName), 1000L); fakeClock.forwardNanos(1000L); - verify(lrsCall.requestObserver, times(2)).onNext(requestCaptor.capture()); - ClusterStats report = Iterables.getOnlyElement(requestCaptor.getValue().getClusterStatsList()); - assertThat(report.getClusterName()).isEqualTo(clusterName); + lrsCall.verifyNextReportClusters(Collections.singletonList(new String[] {clusterName, null})); xdsClient.removeClientStats(clusterName, null); fakeClock.forwardNanos(1000L); - verify(lrsCall.requestObserver, times(3)).onNext(requestCaptor.capture()); - assertThat(requestCaptor.getValue().getClusterStatsCount()) - .isEqualTo(0); // no more stats reported + lrsCall.verifyNextReportClusters(Collections.emptyList()); // no more stats reported // See more test on LoadReportClientTest.java } - private RpcCall startResourceWatcher( + private DiscoveryRpcCall startResourceWatcher( ResourceType type, String name, ResourceWatcher watcher) { FakeClock.TaskFilter timeoutTaskFilter; switch (type) { @@ -1601,9 +1326,8 @@ private RpcCall startResourceWatcher( default: throw new AssertionError("should never be here"); } - RpcCall call = resourceDiscoveryCalls.poll(); - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "", name, type.typeUrl(), ""))); + DiscoveryRpcCall call = resourceDiscoveryCalls.poll(); + call.verifyRequest(NODE, "", Collections.singletonList(name), type, ""); ScheduledTask timeoutTask = Iterables.getOnlyElement(fakeClock.getPendingTasks(timeoutTaskFilter)); assertThat(timeoutTask.getDelay(TimeUnit.SECONDS)) @@ -1611,59 +1335,60 @@ private RpcCall startResourceWatcher( return call; } - /** - * Matcher for DiscoveryRequest without the comparison of error_details field, which is used for - * management server debugging purposes. - * - *

In general, if you are sure error_details field should not be set in a DiscoveryRequest, - * compare with message equality. Otherwise, this matcher is handy for comparing other fields - * only. - */ - private static class DiscoveryRequestMatcher implements ArgumentMatcher { - private final Node node; - private final String versionInfo; - private final String typeUrl; - private final Set resourceNames; - private final String responseNonce; - - private DiscoveryRequestMatcher( - Node node, String versionInfo, List resourceNames, String typeUrl, - String responseNonce) { - this.node = node; - this.versionInfo = versionInfo; - this.resourceNames = new HashSet<>(resourceNames); - this.typeUrl = typeUrl; - this.responseNonce = responseNonce; - } + protected abstract static class DiscoveryRpcCall { - @Override - public boolean matches(DiscoveryRequest argument) { - if (!typeUrl.equals(argument.getTypeUrl())) { - return false; - } - if (!versionInfo.equals(argument.getVersionInfo())) { - return false; - } - if (!responseNonce.equals(argument.getResponseNonce())) { - return false; - } - if (!node.toEnvoyProtoNode().equals(argument.getNode())) { - return false; - } - if (!resourceNames.equals(new HashSet<>(argument.getResourceNamesList()))) { - return false; - } - return argument.getNode().equals(NODE.toEnvoyProtoNode()); - } + protected abstract void verifyRequest(Node node, String versionInfo, List resources, + ResourceType type, String nonce); + + protected abstract void verifyNoMoreRequest(); + + protected abstract void sendResponse(String versionInfo, List resources, + ResourceType type, String nonce); + + protected abstract void sendError(Throwable t); + + protected abstract void sendCompleted(); } - private static class RpcCall { - private final StreamObserver requestObserver; - private final StreamObserver responseObserver; + protected abstract static class LrsRpcCall { - RpcCall(StreamObserver requestObserver, StreamObserver responseObserver) { - this.requestObserver = requestObserver; - this.responseObserver = responseObserver; - } + /** + * Verifies a LRS request has been sent with ClusterStats of the given list of clusters. + */ + protected abstract void verifyNextReportClusters(List clusters); + + protected abstract void sendResponse(List clusters, long loadReportIntervalNano); + } + + protected abstract static class MessageFactory { + + protected abstract Message buildListener(String name, Message routeConfiguration); + + protected abstract Message buildListenerForRds(String name, String rdsResourceName); + + protected abstract Message buildRouteConfiguration(String name, + List virtualHostList); + + protected abstract List buildOpaqueVirtualHosts(int num); + + protected abstract Message buildCluster(String clusterName, @Nullable String edsServiceName, + boolean enableLrs, @Nullable Message upstreamTlsContext, + @Nullable Message circuitBreakers); + + protected abstract Message buildUpstreamTlsContext(String secretName, String targetUri); + + protected abstract Message buildCircuitBreakers(int highPriorityMaxRequests, + int defaultPriorityMaxRequests); + + protected abstract Message buildClusterLoadAssignment(String cluster, + List localityLbEndpoints, List dropOverloads); + + protected abstract Message buildLocalityLbEndpoints(String region, String zone, String subZone, + List lbEndpointList, int loadBalancingWeight, int priority); + + protected abstract Message buildLbEndpoint(String address, int port, String healthStatus, + int lbWeight); + + protected abstract Message buildDropOverload(String category, int dropPerMillion); } } diff --git a/xds/src/test/java/io/grpc/xds/ClientXdsClientTestV2.java b/xds/src/test/java/io/grpc/xds/ClientXdsClientTestV2.java deleted file mode 100644 index 2495f24ab9e..00000000000 --- a/xds/src/test/java/io/grpc/xds/ClientXdsClientTestV2.java +++ /dev/null @@ -1,313 +0,0 @@ -/* - * Copyright 2020 The gRPC Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.grpc.xds; - -import static com.google.common.truth.Truth.assertThat; -import static io.grpc.xds.XdsClientTestHelper.buildClusterV2; -import static io.grpc.xds.XdsClientTestHelper.buildDiscoveryRequestV2; -import static io.grpc.xds.XdsClientTestHelper.buildDiscoveryResponseV2; -import static io.grpc.xds.XdsClientTestHelper.buildSecureClusterV2; -import static io.grpc.xds.XdsClientTestHelper.buildUpstreamTlsContextV2; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; -import com.google.common.util.concurrent.MoreExecutors; -import com.google.protobuf.Any; -import io.envoyproxy.envoy.api.v2.DiscoveryRequest; -import io.envoyproxy.envoy.api.v2.DiscoveryResponse; -import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.SdsSecretConfig; -import io.envoyproxy.envoy.service.discovery.v2.AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceImplBase; -import io.envoyproxy.envoy.service.load_stats.v3.LoadReportingServiceGrpc.LoadReportingServiceImplBase; -import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsRequest; -import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsResponse; -import io.grpc.Context; -import io.grpc.Context.CancellationListener; -import io.grpc.ManagedChannel; -import io.grpc.inprocess.InProcessChannelBuilder; -import io.grpc.inprocess.InProcessServerBuilder; -import io.grpc.internal.BackoffPolicy; -import io.grpc.internal.FakeClock; -import io.grpc.internal.FakeClock.ScheduledTask; -import io.grpc.internal.FakeClock.TaskFilter; -import io.grpc.stub.StreamObserver; -import io.grpc.testing.GrpcCleanupRule; -import io.grpc.xds.AbstractXdsClient.ResourceType; -import io.grpc.xds.EnvoyProtoData.Node; -import io.grpc.xds.XdsClient.CdsResourceWatcher; -import io.grpc.xds.XdsClient.CdsUpdate; -import io.grpc.xds.XdsClient.EdsResourceWatcher; -import io.grpc.xds.XdsClient.LdsResourceWatcher; -import io.grpc.xds.XdsClient.RdsResourceWatcher; -import io.grpc.xds.XdsClient.ResourceWatcher; -import io.grpc.xds.XdsClient.XdsChannel; -import java.io.IOException; -import java.util.ArrayDeque; -import java.util.List; -import java.util.Queue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.mockito.ArgumentCaptor; -import org.mockito.Captor; -import org.mockito.Mock; -import org.mockito.MockitoAnnotations; - -/** - * Tests for {@link ClientXdsClient} for xDS v2. - */ -@RunWith(JUnit4.class) -public class ClientXdsClientTestV2 { - private static final String CDS_RESOURCE = "cluster.googleapis.com"; - private static final Node NODE = Node.newBuilder().build(); - - private static final TaskFilter LDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER = - new TaskFilter() { - @Override - public boolean shouldAccept(Runnable command) { - return command.toString().contains(ResourceType.LDS.toString()); - } - }; - - private static final TaskFilter RDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER = - new TaskFilter() { - @Override - public boolean shouldAccept(Runnable command) { - return command.toString().contains(ResourceType.RDS.toString()); - } - }; - - private static final TaskFilter CDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER = - new TaskFilter() { - @Override - public boolean shouldAccept(Runnable command) { - return command.toString().contains(ResourceType.CDS.toString()); - } - }; - - private static final TaskFilter EDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER = - new TaskFilter() { - @Override - public boolean shouldAccept(Runnable command) { - return command.toString().contains(ResourceType.EDS.toString()); - } - }; - - @Rule - public final GrpcCleanupRule cleanupRule = new GrpcCleanupRule(); - - private final FakeClock fakeClock = new FakeClock(); - private final Queue> resourceDiscoveryCalls = - new ArrayDeque<>(); - private final Queue> loadReportCalls = - new ArrayDeque<>(); - private final AtomicBoolean adsEnded = new AtomicBoolean(true); - private final AtomicBoolean lrsEnded = new AtomicBoolean(true); - - @Captor - private ArgumentCaptor cdsUpdateCaptor; - @Mock - private BackoffPolicy.Provider backoffPolicyProvider; - @Mock - private BackoffPolicy backoffPolicy1; - @Mock - private BackoffPolicy backoffPolicy2; - @Mock - private CdsResourceWatcher cdsResourceWatcher; - - private ManagedChannel channel; - private ClientXdsClient xdsClient; - - @Before - public void setUp() throws IOException { - MockitoAnnotations.initMocks(this); - when(backoffPolicyProvider.get()).thenReturn(backoffPolicy1, backoffPolicy2); - when(backoffPolicy1.nextBackoffNanos()).thenReturn(10L, 100L); - when(backoffPolicy2.nextBackoffNanos()).thenReturn(20L, 200L); - - final String serverName = InProcessServerBuilder.generateName(); - AggregatedDiscoveryServiceImplBase adsServiceImpl = new AggregatedDiscoveryServiceImplBase() { - @Override - public StreamObserver streamAggregatedResources( - final StreamObserver responseObserver) { - assertThat(adsEnded.get()).isTrue(); // ensure previous call was ended - adsEnded.set(false); - @SuppressWarnings("unchecked") - StreamObserver requestObserver = mock(StreamObserver.class); - RpcCall call = - new RpcCall<>(requestObserver, responseObserver); - resourceDiscoveryCalls.offer(call); - Context.current().addListener( - new CancellationListener() { - @Override - public void cancelled(Context context) { - adsEnded.set(true); - } - }, MoreExecutors.directExecutor()); - return requestObserver; - } - }; - - LoadReportingServiceImplBase lrsServiceImpl = new LoadReportingServiceImplBase() { - @Override - public StreamObserver streamLoadStats( - StreamObserver responseObserver) { - assertThat(lrsEnded.get()).isTrue(); - lrsEnded.set(false); - @SuppressWarnings("unchecked") - StreamObserver requestObserver = mock(StreamObserver.class); - RpcCall call = - new RpcCall<>(requestObserver, responseObserver); - Context.current().addListener( - new CancellationListener() { - @Override - public void cancelled(Context context) { - lrsEnded.set(true); - } - }, MoreExecutors.directExecutor()); - loadReportCalls.offer(call); - return requestObserver; - } - }; - - cleanupRule.register( - InProcessServerBuilder - .forName(serverName) - .addService(adsServiceImpl) - .addService(lrsServiceImpl) - .directExecutor() - .build() - .start()); - channel = - cleanupRule.register(InProcessChannelBuilder.forName(serverName).directExecutor().build()); - - xdsClient = - new ClientXdsClient( - new XdsChannel(channel, /* useProtocolV3= */ false), - EnvoyProtoData.Node.newBuilder().build(), - fakeClock.getScheduledExecutorService(), - backoffPolicyProvider, - fakeClock.getStopwatchSupplier()); - - assertThat(resourceDiscoveryCalls).isEmpty(); - assertThat(loadReportCalls).isEmpty(); - } - - @After - public void tearDown() { - xdsClient.shutdown(); - assertThat(adsEnded.get()).isTrue(); - assertThat(lrsEnded.get()).isTrue(); - assertThat(channel.isShutdown()).isTrue(); - assertThat(fakeClock.getPendingTasks()).isEmpty(); - } - - /** - * CDS response containing UpstreamTlsContext for a cluster. - */ - @Test - public void cdsResponseV2WithUpstreamTlsContext() { - RpcCall call = - startResourceWatcher(ResourceType.CDS, CDS_RESOURCE, cdsResourceWatcher); - - // Management server sends back CDS response with UpstreamTlsContext. - io.envoyproxy.envoy.api.v2.auth.UpstreamTlsContext testUpstreamTlsContext = - buildUpstreamTlsContextV2("secret1", "unix:/var/uds2"); - List clusters = ImmutableList.of( - Any.pack(buildClusterV2("cluster-bar.googleapis.com", null, false)), - Any.pack(buildSecureClusterV2(CDS_RESOURCE, - "eds-cluster-foo.googleapis.com", true, testUpstreamTlsContext)), - Any.pack(buildClusterV2("cluster-baz.googleapis.com", null, false))); - DiscoveryResponse response = - buildDiscoveryResponseV2("0", clusters, AbstractXdsClient.ADS_TYPE_URL_CDS_V2, "0000"); - call.responseObserver.onNext(response); - - // Client sent an ACK CDS request. - verify(call.requestObserver) - .onNext(eq(buildDiscoveryRequestV2(NODE, "0", CDS_RESOURCE, - AbstractXdsClient.ADS_TYPE_URL_CDS_V2, "0000"))); - verify(cdsResourceWatcher, times(1)).onChanged(cdsUpdateCaptor.capture()); - CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue(); - EnvoyServerProtoData.UpstreamTlsContext upstreamTlsContext = cdsUpdate - .getUpstreamTlsContext(); - SdsSecretConfig validationContextSdsSecretConfig = upstreamTlsContext.getCommonTlsContext() - .getValidationContextSdsSecretConfig(); - assertThat(validationContextSdsSecretConfig.getName()).isEqualTo("secret1"); - assertThat( - Iterables.getOnlyElement( - validationContextSdsSecretConfig - .getSdsConfig() - .getApiConfigSource() - .getGrpcServicesList()) - .getGoogleGrpc() - .getTargetUri()) - .isEqualTo("unix:/var/uds2"); - } - - private RpcCall startResourceWatcher( - ResourceType type, String name, ResourceWatcher watcher) { - TaskFilter timeoutTaskFilter; - switch (type) { - case LDS: - timeoutTaskFilter = LDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER; - xdsClient.watchLdsResource(name, (LdsResourceWatcher) watcher); - break; - case RDS: - timeoutTaskFilter = RDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER; - xdsClient.watchRdsResource(name, (RdsResourceWatcher) watcher); - break; - case CDS: - timeoutTaskFilter = CDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER; - xdsClient.watchCdsResource(name, (CdsResourceWatcher) watcher); - break; - case EDS: - timeoutTaskFilter = EDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER; - xdsClient.watchEdsResource(name, (EdsResourceWatcher) watcher); - break; - case UNKNOWN: - default: - throw new AssertionError("should never be here"); - } - RpcCall call = resourceDiscoveryCalls.poll(); - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequestV2(NODE, "", name, type.typeUrlV2(), ""))); - ScheduledTask timeoutTask = - Iterables.getOnlyElement(fakeClock.getPendingTasks(timeoutTaskFilter)); - assertThat(timeoutTask.getDelay(TimeUnit.SECONDS)) - .isEqualTo(ClientXdsClient.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC); - return call; - } - - private static class RpcCall { - private final StreamObserver requestObserver; - private final StreamObserver responseObserver; - - RpcCall(StreamObserver requestObserver, StreamObserver responseObserver) { - this.requestObserver = requestObserver; - this.responseObserver = responseObserver; - } - } -} diff --git a/xds/src/test/java/io/grpc/xds/ClientXdsClientV2Test.java b/xds/src/test/java/io/grpc/xds/ClientXdsClientV2Test.java new file mode 100644 index 00000000000..83f22f04741 --- /dev/null +++ b/xds/src/test/java/io/grpc/xds/ClientXdsClientV2Test.java @@ -0,0 +1,505 @@ +/* + * Copyright 2020 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds; + +import static com.google.common.truth.Truth.assertThat; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; + +import com.google.common.util.concurrent.MoreExecutors; +import com.google.protobuf.Any; +import com.google.protobuf.Message; +import com.google.protobuf.UInt32Value; +import com.google.protobuf.util.Durations; +import io.envoyproxy.envoy.api.v2.Cluster; +import io.envoyproxy.envoy.api.v2.Cluster.DiscoveryType; +import io.envoyproxy.envoy.api.v2.Cluster.EdsClusterConfig; +import io.envoyproxy.envoy.api.v2.Cluster.LbPolicy; +import io.envoyproxy.envoy.api.v2.ClusterLoadAssignment; +import io.envoyproxy.envoy.api.v2.ClusterLoadAssignment.Policy; +import io.envoyproxy.envoy.api.v2.ClusterLoadAssignment.Policy.DropOverload; +import io.envoyproxy.envoy.api.v2.DiscoveryRequest; +import io.envoyproxy.envoy.api.v2.DiscoveryResponse; +import io.envoyproxy.envoy.api.v2.Listener; +import io.envoyproxy.envoy.api.v2.RouteConfiguration; +import io.envoyproxy.envoy.api.v2.auth.CommonTlsContext; +import io.envoyproxy.envoy.api.v2.auth.SdsSecretConfig; +import io.envoyproxy.envoy.api.v2.auth.UpstreamTlsContext; +import io.envoyproxy.envoy.api.v2.cluster.CircuitBreakers; +import io.envoyproxy.envoy.api.v2.cluster.CircuitBreakers.Thresholds; +import io.envoyproxy.envoy.api.v2.core.Address; +import io.envoyproxy.envoy.api.v2.core.AggregatedConfigSource; +import io.envoyproxy.envoy.api.v2.core.ApiConfigSource; +import io.envoyproxy.envoy.api.v2.core.ConfigSource; +import io.envoyproxy.envoy.api.v2.core.GrpcService; +import io.envoyproxy.envoy.api.v2.core.GrpcService.GoogleGrpc; +import io.envoyproxy.envoy.api.v2.core.HealthStatus; +import io.envoyproxy.envoy.api.v2.core.Locality; +import io.envoyproxy.envoy.api.v2.core.Node; +import io.envoyproxy.envoy.api.v2.core.RoutingPriority; +import io.envoyproxy.envoy.api.v2.core.SelfConfigSource; +import io.envoyproxy.envoy.api.v2.core.SocketAddress; +import io.envoyproxy.envoy.api.v2.core.TransportSocket; +import io.envoyproxy.envoy.api.v2.endpoint.ClusterStats; +import io.envoyproxy.envoy.api.v2.endpoint.Endpoint; +import io.envoyproxy.envoy.api.v2.endpoint.LbEndpoint; +import io.envoyproxy.envoy.api.v2.endpoint.LocalityLbEndpoints; +import io.envoyproxy.envoy.api.v2.listener.FilterChain; +import io.envoyproxy.envoy.api.v2.route.Route; +import io.envoyproxy.envoy.api.v2.route.RouteAction; +import io.envoyproxy.envoy.api.v2.route.VirtualHost; +import io.envoyproxy.envoy.config.filter.network.http_connection_manager.v2.HttpConnectionManager; +import io.envoyproxy.envoy.config.filter.network.http_connection_manager.v2.Rds; +import io.envoyproxy.envoy.config.listener.v2.ApiListener; +import io.envoyproxy.envoy.service.discovery.v2.AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceImplBase; +import io.envoyproxy.envoy.service.load_stats.v2.LoadReportingServiceGrpc.LoadReportingServiceImplBase; +import io.envoyproxy.envoy.service.load_stats.v2.LoadStatsRequest; +import io.envoyproxy.envoy.service.load_stats.v2.LoadStatsResponse; +import io.envoyproxy.envoy.type.FractionalPercent; +import io.envoyproxy.envoy.type.FractionalPercent.DenominatorType; +import io.grpc.BindableService; +import io.grpc.Context; +import io.grpc.Context.CancellationListener; +import io.grpc.stub.StreamObserver; +import io.grpc.xds.AbstractXdsClient.ResourceType; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import javax.annotation.Nullable; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.ArgumentMatcher; +import org.mockito.InOrder; + +/** + * Tests for {@link ClientXdsClient} with protocol version v2. + */ +@RunWith(JUnit4.class) +public class ClientXdsClientV2Test extends ClientXdsClientTestBase { + + @Override + protected BindableService createAdsService() { + return new AggregatedDiscoveryServiceImplBase() { + @Override + public StreamObserver streamAggregatedResources( + final StreamObserver responseObserver) { + assertThat(adsEnded.get()).isTrue(); // ensure previous call was ended + adsEnded.set(false); + @SuppressWarnings("unchecked") + StreamObserver requestObserver = mock(StreamObserver.class); + DiscoveryRpcCall call = new DiscoveryRpcCallV2(requestObserver, responseObserver); + resourceDiscoveryCalls.offer(call); + Context.current().addListener( + new CancellationListener() { + @Override + public void cancelled(Context context) { + adsEnded.set(true); + } + }, MoreExecutors.directExecutor()); + return requestObserver; + } + }; + } + + @Override + protected BindableService createLrsService() { + return new LoadReportingServiceImplBase() { + @Override + public StreamObserver streamLoadStats( + StreamObserver responseObserver) { + assertThat(lrsEnded.get()).isTrue(); + lrsEnded.set(false); + @SuppressWarnings("unchecked") + StreamObserver requestObserver = mock(StreamObserver.class); + LrsRpcCall call = new LrsRpcCallV2(requestObserver, responseObserver); + Context.current().addListener( + new CancellationListener() { + @Override + public void cancelled(Context context) { + lrsEnded.set(true); + } + }, MoreExecutors.directExecutor()); + loadReportCalls.offer(call); + return requestObserver; + } + }; + } + + @Override + protected MessageFactory createMessageFactory() { + return new MessageFactoryV2(); + } + + @Override + protected boolean useProtocolV3() { + return false; + } + + private static class DiscoveryRpcCallV2 extends DiscoveryRpcCall { + StreamObserver requestObserver; + StreamObserver responseObserver; + + private DiscoveryRpcCallV2(StreamObserver requestObserver, + StreamObserver responseObserver) { + this.requestObserver = requestObserver; + this.responseObserver = responseObserver; + } + + @Override + protected void verifyRequest(EnvoyProtoData.Node node, String versionInfo, + List resources, ResourceType type, String nonce) { + verify(requestObserver).onNext(argThat(new DiscoveryRequestMatcher( + node.toEnvoyProtoNodeV2(), versionInfo, resources, type.typeUrlV2(), nonce))); + } + + @Override + protected void verifyNoMoreRequest() { + verifyNoMoreInteractions(requestObserver); + } + + @Override + protected void sendResponse(String versionInfo, List resources, ResourceType type, + String nonce) { + DiscoveryResponse response = + DiscoveryResponse.newBuilder() + .setVersionInfo(versionInfo) + .addAllResources(resources) + .setTypeUrl(type.typeUrl()) + .setNonce(nonce) + .build(); + responseObserver.onNext(response); + } + + @Override + protected void sendError(Throwable t) { + responseObserver.onError(t); + } + + @Override + protected void sendCompleted() { + responseObserver.onCompleted(); + } + } + + private static class LrsRpcCallV2 extends LrsRpcCall { + private final StreamObserver requestObserver; + private final StreamObserver responseObserver; + private final InOrder inOrder; + + private LrsRpcCallV2(StreamObserver requestObserver, + StreamObserver responseObserver) { + this.requestObserver = requestObserver; + this.responseObserver = responseObserver; + inOrder = inOrder(requestObserver); + } + + @Override + protected void verifyNextReportClusters(List clusters) { + inOrder.verify(requestObserver).onNext(argThat(new LrsRequestMatcher(clusters))); + } + + @Override + protected void sendResponse(List clusters, long loadReportIntervalNano) { + LoadStatsResponse response = + LoadStatsResponse.newBuilder() + .addAllClusters(clusters) + .setLoadReportingInterval(Durations.fromNanos(loadReportIntervalNano)) + .build(); + responseObserver.onNext(response); + } + } + + private static class MessageFactoryV2 extends MessageFactory { + + @Override + protected Message buildListener(String name, Message routeConfiguration) { + return Listener.newBuilder() + .setName(name) + .setAddress(Address.getDefaultInstance()) + .addFilterChains(FilterChain.getDefaultInstance()) + .setApiListener( + ApiListener.newBuilder().setApiListener(Any.pack( + HttpConnectionManager.newBuilder() + .setRouteConfig((RouteConfiguration) routeConfiguration).build()))) + .build(); + } + + @Override + protected Message buildListenerForRds(String name, String rdsResourceName) { + return Listener.newBuilder() + .setName(name) + .setAddress(Address.getDefaultInstance()) + .addFilterChains(FilterChain.getDefaultInstance()) + .setApiListener( + ApiListener.newBuilder().setApiListener(Any.pack( + HttpConnectionManager.newBuilder() + .setRds( + Rds.newBuilder() + .setRouteConfigName(rdsResourceName) + .setConfigSource( + ConfigSource.newBuilder() + .setAds(AggregatedConfigSource.getDefaultInstance()))) + .build()))) + .build(); + } + + @Override + protected Message buildRouteConfiguration(String name, List virtualHostList) { + RouteConfiguration.Builder builder = RouteConfiguration.newBuilder(); + builder.setName(name); + for (Message virtualHost : virtualHostList) { + builder.addVirtualHosts((VirtualHost) virtualHost); + } + return builder.build(); + } + + @Override + protected List buildOpaqueVirtualHosts(int num) { + List virtualHosts = new ArrayList<>(num); + for (int i = 0; i < num; i++) { + VirtualHost virtualHost = + VirtualHost.newBuilder() + .setName(num + ": do not care") + .addDomains("do not care") + .addRoutes( + Route.newBuilder() + .setRoute(RouteAction.newBuilder().setCluster("do not care")) + .setMatch(io.envoyproxy.envoy.api.v2.route.RouteMatch.newBuilder() + .setPrefix("do not care"))) + .build(); + virtualHosts.add(virtualHost); + } + return virtualHosts; + } + + @Override + protected Message buildCluster(String clusterName, @Nullable String edsServiceName, + boolean enableLrs, @Nullable Message upstreamTlsContext, + @Nullable Message circuitBreakers) { + Cluster.Builder builder = Cluster.newBuilder(); + builder.setName(clusterName); + builder.setType(DiscoveryType.EDS); + EdsClusterConfig.Builder edsClusterConfigBuilder = EdsClusterConfig.newBuilder(); + edsClusterConfigBuilder.setEdsConfig( + ConfigSource.newBuilder() + .setAds(AggregatedConfigSource.getDefaultInstance())); + if (edsServiceName != null) { + edsClusterConfigBuilder.setServiceName(edsServiceName); + } + builder.setEdsClusterConfig(edsClusterConfigBuilder); + builder.setLbPolicy(LbPolicy.ROUND_ROBIN); + if (enableLrs) { + builder.setLrsServer( + ConfigSource.newBuilder() + .setSelf(SelfConfigSource.getDefaultInstance())); + } + if (upstreamTlsContext != null) { + builder.setTransportSocket( + TransportSocket.newBuilder() + .setName("envoy.transport_sockets.tls") + .setTypedConfig(Any.pack(upstreamTlsContext))); + } + if (circuitBreakers != null) { + builder.setCircuitBreakers((CircuitBreakers) circuitBreakers); + } + return builder.build(); + } + + @Override + protected Message buildUpstreamTlsContext(String secretName, String targetUri) { + GrpcService grpcService = + GrpcService.newBuilder() + .setGoogleGrpc(GoogleGrpc.newBuilder().setTargetUri(targetUri)) + .build(); + ConfigSource sdsConfig = + ConfigSource.newBuilder() + .setApiConfigSource(ApiConfigSource.newBuilder().addGrpcServices(grpcService)) + .build(); + SdsSecretConfig validationContextSdsSecretConfig = + SdsSecretConfig.newBuilder() + .setName(secretName) + .setSdsConfig(sdsConfig) + .build(); + return UpstreamTlsContext.newBuilder() + .setCommonTlsContext( + CommonTlsContext.newBuilder() + .setValidationContextSdsSecretConfig(validationContextSdsSecretConfig)) + .build(); + } + + @Override + protected Message buildCircuitBreakers(int highPriorityMaxRequests, + int defaultPriorityMaxRequests) { + return CircuitBreakers.newBuilder() + .addThresholds( + Thresholds.newBuilder() + .setPriority(RoutingPriority.HIGH) + .setMaxRequests(UInt32Value.newBuilder().setValue(highPriorityMaxRequests))) + .addThresholds( + Thresholds.newBuilder() + .setPriority(RoutingPriority.DEFAULT) + .setMaxRequests(UInt32Value.newBuilder().setValue(defaultPriorityMaxRequests))) + .build(); + } + + @Override + protected Message buildClusterLoadAssignment(String cluster, + List localityLbEndpointsList, List dropOverloadList) { + ClusterLoadAssignment.Builder builder = ClusterLoadAssignment.newBuilder(); + builder.setClusterName(cluster); + for (Message localityLbEndpoints : localityLbEndpointsList) { + builder.addEndpoints((LocalityLbEndpoints) localityLbEndpoints); + } + Policy.Builder policyBuilder = Policy.newBuilder(); + for (Message dropOverload : dropOverloadList) { + policyBuilder.addDropOverloads((DropOverload) dropOverload); + } + builder.setPolicy(policyBuilder); + return builder.build(); + } + + @Override + protected Message buildLocalityLbEndpoints(String region, String zone, String subZone, + List lbEndpointList, int loadBalancingWeight, int priority) { + LocalityLbEndpoints.Builder builder = LocalityLbEndpoints.newBuilder(); + builder.setLocality( + Locality.newBuilder().setRegion(region).setZone(zone).setSubZone(subZone)); + for (Message lbEndpoint : lbEndpointList) { + builder.addLbEndpoints((LbEndpoint) lbEndpoint); + } + builder.setLoadBalancingWeight(UInt32Value.of(loadBalancingWeight)); + builder.setPriority(priority); + return builder.build(); + } + + @Override + protected Message buildLbEndpoint(String address, int port, String healthStatus, + int lbWeight) { + HealthStatus status; + switch (healthStatus) { + case "unknown": + status = HealthStatus.UNKNOWN; + break; + case "healthy": + status = HealthStatus.HEALTHY; + break; + case "unhealthy": + status = HealthStatus.UNHEALTHY; + break; + case "draining": + status = HealthStatus.DRAINING; + break; + case "timeout": + status = HealthStatus.TIMEOUT; + break; + case "degraded": + status = HealthStatus.DEGRADED; + break; + default: + status = HealthStatus.UNRECOGNIZED; + } + return LbEndpoint.newBuilder() + .setEndpoint( + Endpoint.newBuilder().setAddress( + Address.newBuilder().setSocketAddress( + SocketAddress.newBuilder().setAddress(address).setPortValue(port)))) + .setHealthStatus(status) + .setLoadBalancingWeight(UInt32Value.of(lbWeight)) + .build(); + } + + @Override + protected Message buildDropOverload(String category, int dropPerMillion) { + return DropOverload.newBuilder() + .setCategory(category) + .setDropPercentage( + FractionalPercent.newBuilder() + .setNumerator(dropPerMillion) + .setDenominator(DenominatorType.MILLION)) + .build(); + } + } + + /** + * Matches a {@link DiscoveryRequest} with the same node metadata, versionInfo, typeUrl, + * response nonce and collection of resource names regardless of order. + */ + private static class DiscoveryRequestMatcher implements ArgumentMatcher { + private final Node node; + private final String versionInfo; + private final String typeUrl; + private final Set resources; + private final String responseNonce; + + private DiscoveryRequestMatcher(Node node, String versionInfo, List resources, + String typeUrl, String responseNonce) { + this.node = node; + this.versionInfo = versionInfo; + this.resources = new HashSet<>(resources); + this.typeUrl = typeUrl; + this.responseNonce = responseNonce; + } + + @Override + public boolean matches(DiscoveryRequest argument) { + if (!typeUrl.equals(argument.getTypeUrl())) { + return false; + } + if (!versionInfo.equals(argument.getVersionInfo())) { + return false; + } + if (!responseNonce.equals(argument.getResponseNonce())) { + return false; + } + if (!resources.equals(new HashSet<>(argument.getResourceNamesList()))) { + return false; + } + return node.equals(argument.getNode()); + } + } + + /** + * Matches a {@link LoadStatsRequest} containing a collection of {@link ClusterStats} with + * the same list of clusterName:clusterServiceName pair. + */ + private static class LrsRequestMatcher implements ArgumentMatcher { + private final List expected; + + private LrsRequestMatcher(List clusterNames) { + expected = new ArrayList<>(); + for (String[] pair : clusterNames) { + expected.add(pair[0] + ":" + (pair[1] == null ? "" : pair[1])); + } + Collections.sort(expected); + } + + @Override + public boolean matches(LoadStatsRequest argument) { + List actual = new ArrayList<>(); + for (ClusterStats clusterStats : argument.getClusterStatsList()) { + actual.add(clusterStats.getClusterName() + ":" + clusterStats.getClusterServiceName()); + } + Collections.sort(actual); + return actual.equals(expected); + } + } +} diff --git a/xds/src/test/java/io/grpc/xds/ClientXdsClientV3Test.java b/xds/src/test/java/io/grpc/xds/ClientXdsClientV3Test.java new file mode 100644 index 00000000000..4ebf1a05ccf --- /dev/null +++ b/xds/src/test/java/io/grpc/xds/ClientXdsClientV3Test.java @@ -0,0 +1,505 @@ +/* + * Copyright 2020 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds; + +import static com.google.common.truth.Truth.assertThat; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; + +import com.google.common.util.concurrent.MoreExecutors; +import com.google.protobuf.Any; +import com.google.protobuf.Message; +import com.google.protobuf.UInt32Value; +import com.google.protobuf.util.Durations; +import io.envoyproxy.envoy.config.cluster.v3.CircuitBreakers; +import io.envoyproxy.envoy.config.cluster.v3.CircuitBreakers.Thresholds; +import io.envoyproxy.envoy.config.cluster.v3.Cluster; +import io.envoyproxy.envoy.config.cluster.v3.Cluster.DiscoveryType; +import io.envoyproxy.envoy.config.cluster.v3.Cluster.EdsClusterConfig; +import io.envoyproxy.envoy.config.cluster.v3.Cluster.LbPolicy; +import io.envoyproxy.envoy.config.core.v3.Address; +import io.envoyproxy.envoy.config.core.v3.AggregatedConfigSource; +import io.envoyproxy.envoy.config.core.v3.ApiConfigSource; +import io.envoyproxy.envoy.config.core.v3.ConfigSource; +import io.envoyproxy.envoy.config.core.v3.GrpcService; +import io.envoyproxy.envoy.config.core.v3.GrpcService.GoogleGrpc; +import io.envoyproxy.envoy.config.core.v3.HealthStatus; +import io.envoyproxy.envoy.config.core.v3.Locality; +import io.envoyproxy.envoy.config.core.v3.Node; +import io.envoyproxy.envoy.config.core.v3.RoutingPriority; +import io.envoyproxy.envoy.config.core.v3.SelfConfigSource; +import io.envoyproxy.envoy.config.core.v3.SocketAddress; +import io.envoyproxy.envoy.config.core.v3.TransportSocket; +import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment; +import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment.Policy; +import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment.Policy.DropOverload; +import io.envoyproxy.envoy.config.endpoint.v3.ClusterStats; +import io.envoyproxy.envoy.config.endpoint.v3.Endpoint; +import io.envoyproxy.envoy.config.endpoint.v3.LbEndpoint; +import io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints; +import io.envoyproxy.envoy.config.listener.v3.ApiListener; +import io.envoyproxy.envoy.config.listener.v3.FilterChain; +import io.envoyproxy.envoy.config.listener.v3.Listener; +import io.envoyproxy.envoy.config.route.v3.Route; +import io.envoyproxy.envoy.config.route.v3.RouteAction; +import io.envoyproxy.envoy.config.route.v3.RouteConfiguration; +import io.envoyproxy.envoy.config.route.v3.RouteMatch; +import io.envoyproxy.envoy.config.route.v3.VirtualHost; +import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager; +import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.Rds; +import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.CommonTlsContext; +import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.SdsSecretConfig; +import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext; +import io.envoyproxy.envoy.service.discovery.v3.AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceImplBase; +import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest; +import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse; +import io.envoyproxy.envoy.service.load_stats.v3.LoadReportingServiceGrpc.LoadReportingServiceImplBase; +import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsRequest; +import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsResponse; +import io.envoyproxy.envoy.type.v3.FractionalPercent; +import io.envoyproxy.envoy.type.v3.FractionalPercent.DenominatorType; +import io.grpc.BindableService; +import io.grpc.Context; +import io.grpc.Context.CancellationListener; +import io.grpc.stub.StreamObserver; +import io.grpc.xds.AbstractXdsClient.ResourceType; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import javax.annotation.Nullable; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.ArgumentMatcher; +import org.mockito.InOrder; + +/** + * Tests for {@link ClientXdsClient} with protocol version v3. + */ +@RunWith(JUnit4.class) +public class ClientXdsClientV3Test extends ClientXdsClientTestBase { + + @Override + protected BindableService createAdsService() { + return new AggregatedDiscoveryServiceImplBase() { + @Override + public StreamObserver streamAggregatedResources( + final StreamObserver responseObserver) { + assertThat(adsEnded.get()).isTrue(); // ensure previous call was ended + adsEnded.set(false); + @SuppressWarnings("unchecked") + StreamObserver requestObserver = mock(StreamObserver.class); + DiscoveryRpcCall call = new DiscoveryRpcCallV3(requestObserver, responseObserver); + resourceDiscoveryCalls.offer(call); + Context.current().addListener( + new CancellationListener() { + @Override + public void cancelled(Context context) { + adsEnded.set(true); + } + }, MoreExecutors.directExecutor()); + return requestObserver; + } + }; + } + + @Override + protected BindableService createLrsService() { + return new LoadReportingServiceImplBase() { + @Override + public StreamObserver streamLoadStats( + StreamObserver responseObserver) { + assertThat(lrsEnded.get()).isTrue(); + lrsEnded.set(false); + @SuppressWarnings("unchecked") + StreamObserver requestObserver = mock(StreamObserver.class); + LrsRpcCall call = new LrsRpcCallV3(requestObserver, responseObserver); + Context.current().addListener( + new CancellationListener() { + @Override + public void cancelled(Context context) { + lrsEnded.set(true); + } + }, MoreExecutors.directExecutor()); + loadReportCalls.offer(call); + return requestObserver; + } + }; + } + + @Override + protected MessageFactory createMessageFactory() { + return new MessageFactoryV3(); + } + + @Override + protected boolean useProtocolV3() { + return true; + } + + private static class DiscoveryRpcCallV3 extends DiscoveryRpcCall { + StreamObserver requestObserver; + StreamObserver responseObserver; + + private DiscoveryRpcCallV3(StreamObserver requestObserver, + StreamObserver responseObserver) { + this.requestObserver = requestObserver; + this.responseObserver = responseObserver; + } + + @Override + protected void verifyRequest(EnvoyProtoData.Node node, String versionInfo, + List resources, ResourceType type, String nonce) { + verify(requestObserver).onNext(argThat(new DiscoveryRequestMatcher( + node.toEnvoyProtoNode(), versionInfo, resources, type.typeUrl(), nonce))); + } + + @Override + protected void verifyNoMoreRequest() { + verifyNoMoreInteractions(requestObserver); + } + + @Override + protected void sendResponse(String versionInfo, List resources, ResourceType type, + String nonce) { + DiscoveryResponse response = + DiscoveryResponse.newBuilder() + .setVersionInfo(versionInfo) + .addAllResources(resources) + .setTypeUrl(type.typeUrl()) + .setNonce(nonce) + .build(); + responseObserver.onNext(response); + } + + @Override + protected void sendError(Throwable t) { + responseObserver.onError(t); + } + + @Override + protected void sendCompleted() { + responseObserver.onCompleted(); + } + } + + private static class LrsRpcCallV3 extends LrsRpcCall { + private final StreamObserver requestObserver; + private final StreamObserver responseObserver; + private final InOrder inOrder; + + private LrsRpcCallV3(StreamObserver requestObserver, + StreamObserver responseObserver) { + this.requestObserver = requestObserver; + this.responseObserver = responseObserver; + inOrder = inOrder(requestObserver); + } + + @Override + protected void verifyNextReportClusters(List clusters) { + inOrder.verify(requestObserver).onNext(argThat(new LrsRequestMatcher(clusters))); + } + + @Override + protected void sendResponse(List clusters, long loadReportIntervalNano) { + LoadStatsResponse response = + LoadStatsResponse.newBuilder() + .addAllClusters(clusters) + .setLoadReportingInterval(Durations.fromNanos(loadReportIntervalNano)) + .build(); + responseObserver.onNext(response); + } + } + + private static class MessageFactoryV3 extends MessageFactory { + + @Override + protected Message buildListener(String name, Message routeConfiguration) { + return Listener.newBuilder() + .setName(name) + .setAddress(Address.getDefaultInstance()) + .addFilterChains(FilterChain.getDefaultInstance()) + .setApiListener( + ApiListener.newBuilder().setApiListener(Any.pack( + HttpConnectionManager.newBuilder() + .setRouteConfig((RouteConfiguration) routeConfiguration).build()))) + .build(); + } + + @Override + protected Message buildListenerForRds(String name, String rdsResourceName) { + return Listener.newBuilder() + .setName(name) + .setAddress(Address.getDefaultInstance()) + .addFilterChains(FilterChain.getDefaultInstance()) + .setApiListener( + ApiListener.newBuilder().setApiListener(Any.pack( + HttpConnectionManager.newBuilder() + .setRds( + Rds.newBuilder() + .setRouteConfigName(rdsResourceName) + .setConfigSource( + ConfigSource.newBuilder() + .setAds(AggregatedConfigSource.getDefaultInstance()))) + .build()))) + .build(); + } + + @Override + protected Message buildRouteConfiguration(String name, List virtualHostList) { + RouteConfiguration.Builder builder = RouteConfiguration.newBuilder(); + builder.setName(name); + for (Message virtualHost : virtualHostList) { + builder.addVirtualHosts((VirtualHost) virtualHost); + } + return builder.build(); + } + + @Override + protected List buildOpaqueVirtualHosts(int num) { + List virtualHosts = new ArrayList<>(num); + for (int i = 0; i < num; i++) { + VirtualHost virtualHost = + VirtualHost.newBuilder() + .setName(num + ": do not care") + .addDomains("do not care") + .addRoutes( + Route.newBuilder() + .setRoute(RouteAction.newBuilder().setCluster("do not care")) + .setMatch(RouteMatch.newBuilder().setPrefix("do not care"))) + .build(); + virtualHosts.add(virtualHost); + } + return virtualHosts; + } + + @Override + protected Message buildCluster(String clusterName, @Nullable String edsServiceName, + boolean enableLrs, @Nullable Message upstreamTlsContext, + @Nullable Message circuitBreakers) { + Cluster.Builder builder = Cluster.newBuilder(); + builder.setName(clusterName); + builder.setType(DiscoveryType.EDS); + EdsClusterConfig.Builder edsClusterConfigBuilder = EdsClusterConfig.newBuilder(); + edsClusterConfigBuilder.setEdsConfig( + ConfigSource.newBuilder() + .setAds(AggregatedConfigSource.getDefaultInstance())); + if (edsServiceName != null) { + edsClusterConfigBuilder.setServiceName(edsServiceName); + } + builder.setEdsClusterConfig(edsClusterConfigBuilder); + builder.setLbPolicy(LbPolicy.ROUND_ROBIN); + if (enableLrs) { + builder.setLrsServer( + ConfigSource.newBuilder() + .setSelf(SelfConfigSource.getDefaultInstance())); + } + if (upstreamTlsContext != null) { + builder.setTransportSocket( + TransportSocket.newBuilder() + .setName("envoy.transport_sockets.tls") + .setTypedConfig(Any.pack(upstreamTlsContext))); + } + if (circuitBreakers != null) { + builder.setCircuitBreakers((CircuitBreakers) circuitBreakers); + } + return builder.build(); + } + + @Override + protected Message buildUpstreamTlsContext(String secretName, String targetUri) { + GrpcService grpcService = + GrpcService.newBuilder() + .setGoogleGrpc(GoogleGrpc.newBuilder().setTargetUri(targetUri)) + .build(); + ConfigSource sdsConfig = + ConfigSource.newBuilder() + .setApiConfigSource(ApiConfigSource.newBuilder().addGrpcServices(grpcService)) + .build(); + SdsSecretConfig validationContextSdsSecretConfig = + SdsSecretConfig.newBuilder() + .setName(secretName) + .setSdsConfig(sdsConfig) + .build(); + return UpstreamTlsContext.newBuilder() + .setCommonTlsContext( + CommonTlsContext.newBuilder() + .setValidationContextSdsSecretConfig(validationContextSdsSecretConfig)) + .build(); + } + + @Override + protected Message buildCircuitBreakers(int highPriorityMaxRequests, + int defaultPriorityMaxRequests) { + return CircuitBreakers.newBuilder() + .addThresholds( + Thresholds.newBuilder() + .setPriority(RoutingPriority.HIGH) + .setMaxRequests(UInt32Value.newBuilder().setValue(highPriorityMaxRequests))) + .addThresholds( + Thresholds.newBuilder() + .setPriority(RoutingPriority.DEFAULT) + .setMaxRequests(UInt32Value.newBuilder().setValue(defaultPriorityMaxRequests))) + .build(); + } + + @Override + protected Message buildClusterLoadAssignment(String cluster, + List localityLbEndpointsList, List dropOverloadList) { + ClusterLoadAssignment.Builder builder = ClusterLoadAssignment.newBuilder(); + builder.setClusterName(cluster); + for (Message localityLbEndpoints : localityLbEndpointsList) { + builder.addEndpoints((LocalityLbEndpoints) localityLbEndpoints); + } + Policy.Builder policyBuilder = Policy.newBuilder(); + for (Message dropOverload : dropOverloadList) { + policyBuilder.addDropOverloads((DropOverload) dropOverload); + } + builder.setPolicy(policyBuilder); + return builder.build(); + } + + @Override + protected Message buildLocalityLbEndpoints(String region, String zone, String subZone, + List lbEndpointList, int loadBalancingWeight, int priority) { + LocalityLbEndpoints.Builder builder = LocalityLbEndpoints.newBuilder(); + builder.setLocality( + Locality.newBuilder().setRegion(region).setZone(zone).setSubZone(subZone)); + for (Message lbEndpoint : lbEndpointList) { + builder.addLbEndpoints((LbEndpoint) lbEndpoint); + } + builder.setLoadBalancingWeight(UInt32Value.of(loadBalancingWeight)); + builder.setPriority(priority); + return builder.build(); + } + + @Override + protected Message buildLbEndpoint(String address, int port, String healthStatus, + int lbWeight) { + HealthStatus status; + switch (healthStatus) { + case "unknown": + status = HealthStatus.UNKNOWN; + break; + case "healthy": + status = HealthStatus.HEALTHY; + break; + case "unhealthy": + status = HealthStatus.UNHEALTHY; + break; + case "draining": + status = HealthStatus.DRAINING; + break; + case "timeout": + status = HealthStatus.TIMEOUT; + break; + case "degraded": + status = HealthStatus.DEGRADED; + break; + default: + status = HealthStatus.UNRECOGNIZED; + } + return LbEndpoint.newBuilder() + .setEndpoint( + Endpoint.newBuilder().setAddress( + Address.newBuilder().setSocketAddress( + SocketAddress.newBuilder().setAddress(address).setPortValue(port)))) + .setHealthStatus(status) + .setLoadBalancingWeight(UInt32Value.of(lbWeight)) + .build(); + } + + @Override + protected Message buildDropOverload(String category, int dropPerMillion) { + return DropOverload.newBuilder() + .setCategory(category) + .setDropPercentage( + FractionalPercent.newBuilder() + .setNumerator(dropPerMillion) + .setDenominator(DenominatorType.MILLION)) + .build(); + } + } + + /** + * Matches a {@link DiscoveryRequest} with the same node metadata, versionInfo, typeUrl, + * response nonce and collection of resource names regardless of order. + */ + private static class DiscoveryRequestMatcher implements ArgumentMatcher { + private final Node node; + private final String versionInfo; + private final String typeUrl; + private final Set resources; + private final String responseNonce; + + private DiscoveryRequestMatcher(Node node, String versionInfo, List resources, + String typeUrl, String responseNonce) { + this.node = node; + this.versionInfo = versionInfo; + this.resources = new HashSet<>(resources); + this.typeUrl = typeUrl; + this.responseNonce = responseNonce; + } + + @Override + public boolean matches(DiscoveryRequest argument) { + if (!typeUrl.equals(argument.getTypeUrl())) { + return false; + } + if (!versionInfo.equals(argument.getVersionInfo())) { + return false; + } + if (!responseNonce.equals(argument.getResponseNonce())) { + return false; + } + if (!resources.equals(new HashSet<>(argument.getResourceNamesList()))) { + return false; + } + return node.equals(argument.getNode()); + } + } + + /** + * Matches a {@link LoadStatsRequest} containing a collection of {@link ClusterStats} with + * the same list of clusterName:clusterServiceName pair. + */ + private static class LrsRequestMatcher implements ArgumentMatcher { + private final List expected; + + private LrsRequestMatcher(List clusterNames) { + expected = new ArrayList<>(); + for (String[] pair : clusterNames) { + expected.add(pair[0] + ":" + (pair[1] == null ? "" : pair[1])); + } + Collections.sort(expected); + } + + @Override + public boolean matches(LoadStatsRequest argument) { + List actual = new ArrayList<>(); + for (ClusterStats clusterStats : argument.getClusterStatsList()) { + actual.add(clusterStats.getClusterName() + ":" + clusterStats.getClusterServiceName()); + } + Collections.sort(actual); + return actual.equals(expected); + } + } +} diff --git a/xds/src/test/java/io/grpc/xds/XdsClientTestHelper.java b/xds/src/test/java/io/grpc/xds/XdsClientTestHelper.java index 2167a842beb..da9d549a129 100644 --- a/xds/src/test/java/io/grpc/xds/XdsClientTestHelper.java +++ b/xds/src/test/java/io/grpc/xds/XdsClientTestHelper.java @@ -25,10 +25,7 @@ import io.envoyproxy.envoy.config.cluster.v3.Cluster.LbPolicy; import io.envoyproxy.envoy.config.core.v3.Address; import io.envoyproxy.envoy.config.core.v3.AggregatedConfigSource; -import io.envoyproxy.envoy.config.core.v3.ApiConfigSource; import io.envoyproxy.envoy.config.core.v3.ConfigSource; -import io.envoyproxy.envoy.config.core.v3.GrpcService; -import io.envoyproxy.envoy.config.core.v3.GrpcService.GoogleGrpc; import io.envoyproxy.envoy.config.core.v3.HealthStatus; import io.envoyproxy.envoy.config.core.v3.Locality; import io.envoyproxy.envoy.config.core.v3.SelfConfigSource; @@ -48,21 +45,19 @@ import io.envoyproxy.envoy.config.route.v3.RouteConfiguration; import io.envoyproxy.envoy.config.route.v3.RouteMatch; import io.envoyproxy.envoy.config.route.v3.VirtualHost; -import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.CommonTlsContext; -import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.SdsSecretConfig; import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext; import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest; import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse; import io.envoyproxy.envoy.type.v3.FractionalPercent; import io.envoyproxy.envoy.type.v3.FractionalPercent.DenominatorType; import io.grpc.xds.EnvoyProtoData.Node; -import java.util.ArrayList; import java.util.List; import javax.annotation.Nullable; /** * Helper methods for building protobuf messages with custom data for xDS protocols. */ +// TODO(chengyuanzhang, sanjaypujare): delete this class, should not dump everything here. class XdsClientTestHelper { static DiscoveryResponse buildDiscoveryResponse(String versionInfo, List resources, String typeUrl, String nonce) { @@ -86,11 +81,6 @@ static io.envoyproxy.envoy.api.v2.DiscoveryResponse buildDiscoveryResponseV2(Str .build(); } - static DiscoveryRequest buildDiscoveryRequest(Node node, String versionInfo, - String resourceName, String typeUrl, String nonce) { - return buildDiscoveryRequest(node, versionInfo, ImmutableList.of(resourceName), typeUrl, nonce); - } - static DiscoveryRequest buildDiscoveryRequest(Node node, String versionInfo, List resourceNames, String typeUrl, String nonce) { return @@ -161,23 +151,6 @@ static io.envoyproxy.envoy.api.v2.RouteConfiguration buildRouteConfigurationV2(S .build(); } - static List buildVirtualHosts(int num) { - List virtualHosts = new ArrayList<>(num); - for (int i = 0; i < num; i++) { - VirtualHost virtualHost = - VirtualHost.newBuilder() - .setName(num + ": do not care") - .addDomains("do not care") - .addRoutes( - Route.newBuilder() - .setRoute(RouteAction.newBuilder().setCluster("do not care")) - .setMatch(RouteMatch.newBuilder().setPrefix("do not care"))) - .build(); - virtualHosts.add(virtualHost); - } - return virtualHosts; - } - static VirtualHost buildVirtualHost(List domains, String clusterName) { return VirtualHost.newBuilder() .setName("virtualhost00.googleapis.com") // don't care @@ -283,22 +256,6 @@ static ClusterLoadAssignment buildClusterLoadAssignment(String clusterName, .build(); } - @SuppressWarnings("deprecation") // disableOverprovisioning is deprecated by needed for v2 - static io.envoyproxy.envoy.api.v2.ClusterLoadAssignment buildClusterLoadAssignmentV2( - String clusterName, - List localityLbEndpoints, - List dropOverloads) { - return - io.envoyproxy.envoy.api.v2.ClusterLoadAssignment.newBuilder() - .setClusterName(clusterName) - .addAllEndpoints(localityLbEndpoints) - .setPolicy( - io.envoyproxy.envoy.api.v2.ClusterLoadAssignment.Policy.newBuilder() - .setDisableOverprovisioning(true) - .addAllDropOverloads(dropOverloads)) - .build(); - } - static DropOverload buildDropOverload(String category, int dropPerMillion) { return DropOverload.newBuilder() @@ -384,49 +341,4 @@ static io.envoyproxy.envoy.api.v2.endpoint.LbEndpoint buildLbEndpointV2( .setLoadBalancingWeight(UInt32Value.of(loadbalancingWeight)) .build(); } - - static UpstreamTlsContext buildUpstreamTlsContext(String secretName, String targetUri) { - GrpcService grpcService = - GrpcService.newBuilder() - .setGoogleGrpc(GoogleGrpc.newBuilder().setTargetUri(targetUri)) - .build(); - ConfigSource sdsConfig = - ConfigSource.newBuilder() - .setApiConfigSource(ApiConfigSource.newBuilder().addGrpcServices(grpcService)) - .build(); - SdsSecretConfig validationContextSdsSecretConfig = - SdsSecretConfig.newBuilder() - .setName(secretName) - .setSdsConfig(sdsConfig) - .build(); - return UpstreamTlsContext.newBuilder() - .setCommonTlsContext( - CommonTlsContext.newBuilder() - .setValidationContextSdsSecretConfig(validationContextSdsSecretConfig)) - .build(); - } - - static io.envoyproxy.envoy.api.v2.auth.UpstreamTlsContext buildUpstreamTlsContextV2( - String secretName, String targetUri) { - io.envoyproxy.envoy.api.v2.core.GrpcService grpcService = - io.envoyproxy.envoy.api.v2.core.GrpcService.newBuilder() - .setGoogleGrpc(io.envoyproxy.envoy.api.v2.core.GrpcService.GoogleGrpc.newBuilder() - .setTargetUri(targetUri)) - .build(); - io.envoyproxy.envoy.api.v2.core.ConfigSource sdsConfig = - io.envoyproxy.envoy.api.v2.core.ConfigSource.newBuilder() - .setApiConfigSource(io.envoyproxy.envoy.api.v2.core.ApiConfigSource.newBuilder() - .addGrpcServices(grpcService)) - .build(); - io.envoyproxy.envoy.api.v2.auth.SdsSecretConfig validationContextSdsSecretConfig = - io.envoyproxy.envoy.api.v2.auth.SdsSecretConfig.newBuilder() - .setName(secretName) - .setSdsConfig(sdsConfig) - .build(); - return io.envoyproxy.envoy.api.v2.auth.UpstreamTlsContext.newBuilder() - .setCommonTlsContext( - io.envoyproxy.envoy.api.v2.auth.CommonTlsContext.newBuilder() - .setValidationContextSdsSecretConfig(validationContextSdsSecretConfig)) - .build(); - } }