diff --git a/xds/src/test/java/io/grpc/xds/ClientXdsClientTest.java b/xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java similarity index 52% rename from xds/src/test/java/io/grpc/xds/ClientXdsClientTest.java rename to xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java index 6804f9bf39f..63e0995809d 100644 --- a/xds/src/test/java/io/grpc/xds/ClientXdsClientTest.java +++ b/xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java @@ -17,20 +17,6 @@ package io.grpc.xds; import static com.google.common.truth.Truth.assertThat; -import static io.grpc.xds.XdsClientTestHelper.buildCluster; -import static io.grpc.xds.XdsClientTestHelper.buildClusterLoadAssignment; -import static io.grpc.xds.XdsClientTestHelper.buildDiscoveryRequest; -import static io.grpc.xds.XdsClientTestHelper.buildDiscoveryResponse; -import static io.grpc.xds.XdsClientTestHelper.buildDropOverload; -import static io.grpc.xds.XdsClientTestHelper.buildLbEndpoint; -import static io.grpc.xds.XdsClientTestHelper.buildListener; -import static io.grpc.xds.XdsClientTestHelper.buildLocalityLbEndpoints; -import static io.grpc.xds.XdsClientTestHelper.buildRouteConfiguration; -import static io.grpc.xds.XdsClientTestHelper.buildSecureCluster; -import static io.grpc.xds.XdsClientTestHelper.buildUpstreamTlsContext; -import static io.grpc.xds.XdsClientTestHelper.buildVirtualHosts; -import static org.mockito.ArgumentMatchers.argThat; -import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -40,32 +26,10 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; -import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.Any; -import com.google.protobuf.UInt32Value; -import com.google.protobuf.util.Durations; -import io.envoyproxy.envoy.config.cluster.v3.CircuitBreakers; -import io.envoyproxy.envoy.config.cluster.v3.CircuitBreakers.Thresholds; -import io.envoyproxy.envoy.config.cluster.v3.Cluster; -import io.envoyproxy.envoy.config.core.v3.AggregatedConfigSource; -import io.envoyproxy.envoy.config.core.v3.ConfigSource; -import io.envoyproxy.envoy.config.core.v3.HealthStatus; -import io.envoyproxy.envoy.config.core.v3.RoutingPriority; -import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment; -import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment.Policy; -import io.envoyproxy.envoy.config.endpoint.v3.ClusterStats; -import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager; -import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.Rds; +import com.google.protobuf.Message; import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.SdsSecretConfig; -import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext; -import io.envoyproxy.envoy.service.discovery.v3.AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceImplBase; -import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest; -import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse; -import io.envoyproxy.envoy.service.load_stats.v3.LoadReportingServiceGrpc.LoadReportingServiceImplBase; -import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsRequest; -import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsResponse; -import io.grpc.Context; -import io.grpc.Context.CancellationListener; +import io.grpc.BindableService; import io.grpc.ManagedChannel; import io.grpc.Status; import io.grpc.Status.Code; @@ -75,7 +39,6 @@ import io.grpc.internal.FakeClock; import io.grpc.internal.FakeClock.ScheduledTask; import io.grpc.internal.FakeClock.TaskFilter; -import io.grpc.stub.StreamObserver; import io.grpc.testing.GrpcCleanupRule; import io.grpc.xds.AbstractXdsClient.ResourceType; import io.grpc.xds.EnvoyProtoData.DropOverload; @@ -97,12 +60,11 @@ import java.util.ArrayDeque; import java.util.Arrays; import java.util.Collections; -import java.util.HashSet; import java.util.List; import java.util.Queue; -import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import javax.annotation.Nullable; import org.junit.After; import org.junit.Before; import org.junit.Rule; @@ -110,7 +72,6 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import org.mockito.ArgumentCaptor; -import org.mockito.ArgumentMatcher; import org.mockito.Captor; import org.mockito.InOrder; import org.mockito.Mock; @@ -121,7 +82,7 @@ * Tests for {@link ClientXdsClient}. */ @RunWith(JUnit4.class) -public class ClientXdsClientTest { +public abstract class ClientXdsClientTestBase { private static final String LDS_RESOURCE = "listener.googleapis.com"; private static final String RDS_RESOURCE = "route-configuration.googleapis.com"; private static final String CDS_RESOURCE = "cluster.googleapis.com"; @@ -171,12 +132,11 @@ public boolean shouldAccept(Runnable command) { public final GrpcCleanupRule cleanupRule = new GrpcCleanupRule(); private final FakeClock fakeClock = new FakeClock(); - private final Queue> resourceDiscoveryCalls = - new ArrayDeque<>(); - private final Queue> loadReportCalls = - new ArrayDeque<>(); - private final AtomicBoolean adsEnded = new AtomicBoolean(true); - private final AtomicBoolean lrsEnded = new AtomicBoolean(true); + protected final Queue resourceDiscoveryCalls = new ArrayDeque<>(); + protected final Queue loadReportCalls = new ArrayDeque<>(); + protected final AtomicBoolean adsEnded = new AtomicBoolean(true); + protected final AtomicBoolean lrsEnded = new AtomicBoolean(true); + private final MessageFactory mf = createMessageFactory(); @Captor private ArgumentCaptor ldsUpdateCaptor; @@ -214,55 +174,11 @@ public void setUp() throws IOException { when(backoffPolicy2.nextBackoffNanos()).thenReturn(20L, 200L); final String serverName = InProcessServerBuilder.generateName(); - AggregatedDiscoveryServiceImplBase adsServiceImpl = new AggregatedDiscoveryServiceImplBase() { - @Override - public StreamObserver streamAggregatedResources( - final StreamObserver responseObserver) { - assertThat(adsEnded.get()).isTrue(); // ensure previous call was ended - adsEnded.set(false); - @SuppressWarnings("unchecked") - StreamObserver requestObserver = mock(StreamObserver.class); - RpcCall call = - new RpcCall<>(requestObserver, responseObserver); - resourceDiscoveryCalls.offer(call); - Context.current().addListener( - new CancellationListener() { - @Override - public void cancelled(Context context) { - adsEnded.set(true); - } - }, MoreExecutors.directExecutor()); - return requestObserver; - } - }; - - LoadReportingServiceImplBase lrsServiceImpl = new LoadReportingServiceImplBase() { - @Override - public StreamObserver streamLoadStats( - StreamObserver responseObserver) { - assertThat(lrsEnded.get()).isTrue(); - lrsEnded.set(false); - @SuppressWarnings("unchecked") - StreamObserver requestObserver = mock(StreamObserver.class); - RpcCall call = - new RpcCall<>(requestObserver, responseObserver); - Context.current().addListener( - new CancellationListener() { - @Override - public void cancelled(Context context) { - lrsEnded.set(true); - } - }, MoreExecutors.directExecutor()); - loadReportCalls.offer(call); - return requestObserver; - } - }; - cleanupRule.register( InProcessServerBuilder .forName(serverName) - .addService(adsServiceImpl) - .addService(lrsServiceImpl) + .addService(createAdsService()) + .addService(createLrsService()) .directExecutor() .build() .start()); @@ -271,7 +187,7 @@ public void cancelled(Context context) { xdsClient = new ClientXdsClient( - new XdsChannel(channel, /* useProtocolV3= */ true), + new XdsChannel(channel, useProtocolV3()), EnvoyProtoData.Node.newBuilder().build(), fakeClock.getScheduledExecutorService(), backoffPolicyProvider, @@ -290,24 +206,27 @@ public void tearDown() { assertThat(fakeClock.getPendingTasks()).isEmpty(); } + protected abstract boolean useProtocolV3(); + + protected abstract BindableService createAdsService(); + + protected abstract BindableService createLrsService(); + + protected abstract MessageFactory createMessageFactory(); + @Test public void ldsResourceNotFound() { - RpcCall call = + DiscoveryRpcCall call = startResourceWatcher(ResourceType.LDS, LDS_RESOURCE, ldsResourceWatcher); List listeners = ImmutableList.of( - Any.pack(buildListener("bar.googleapis.com", - Any.pack( - HttpConnectionManager.newBuilder() - .setRouteConfig( - buildRouteConfiguration("route-bar.googleapis.com", buildVirtualHosts(1))) - .build())))); - DiscoveryResponse response = - buildDiscoveryResponse("0", listeners, ResourceType.LDS.typeUrl(), "0000"); - call.responseObserver.onNext(response); + Any.pack(mf.buildListener("bar.googleapis.com", + mf.buildRouteConfiguration("route-bar.googleapis.com", + mf.buildOpaqueVirtualHosts(1))))); + call.sendResponse("0", listeners, ResourceType.LDS, "0000"); // Client sends an ACK LDS request. - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "0", LDS_RESOURCE, ResourceType.LDS.typeUrl(), "0000"))); + call.verifyRequest(NODE, "0", Collections.singletonList(LDS_RESOURCE), ResourceType.LDS, + "0000"); verifyNoInteractions(ldsResourceWatcher); fakeClock.forwardTime(ClientXdsClient.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS); @@ -317,21 +236,16 @@ public void ldsResourceNotFound() { @Test public void ldsResourceFound_containsVirtualHosts() { - RpcCall call = + DiscoveryRpcCall call = startResourceWatcher(ResourceType.LDS, LDS_RESOURCE, ldsResourceWatcher); List listeners = ImmutableList.of( - Any.pack(buildListener(LDS_RESOURCE, - Any.pack( - HttpConnectionManager.newBuilder() - .setRouteConfig(buildRouteConfiguration("do not care", buildVirtualHosts(2))) - .build())))); - DiscoveryResponse response = - buildDiscoveryResponse("0", listeners, ResourceType.LDS.typeUrl(), "0000"); - call.responseObserver.onNext(response); + Any.pack(mf.buildListener(LDS_RESOURCE, + mf.buildRouteConfiguration("do not care", mf.buildOpaqueVirtualHosts(2))))); + call.sendResponse("0", listeners, ResourceType.LDS, "0000"); // Client sends an ACK LDS request. - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "0", LDS_RESOURCE, ResourceType.LDS.typeUrl(), "0000"))); + call.verifyRequest(NODE, "0", Collections.singletonList(LDS_RESOURCE), ResourceType.LDS, + "0000"); verify(ldsResourceWatcher).onChanged(ldsUpdateCaptor.capture()); assertThat(ldsUpdateCaptor.getValue().getVirtualHosts()).hasSize(2); assertThat(fakeClock.getPendingTasks(LDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty(); @@ -339,26 +253,15 @@ public void ldsResourceFound_containsVirtualHosts() { @Test public void ldsResourceFound_containsRdsName() { - RpcCall call = + DiscoveryRpcCall call = startResourceWatcher(ResourceType.LDS, LDS_RESOURCE, ldsResourceWatcher); List listeners = ImmutableList.of( - Any.pack(buildListener(LDS_RESOURCE, - Any.pack( - HttpConnectionManager.newBuilder() - .setRds( - Rds.newBuilder() - .setRouteConfigName(RDS_RESOURCE) - .setConfigSource( - ConfigSource.newBuilder() - .setAds(AggregatedConfigSource.getDefaultInstance()))) - .build())))); - DiscoveryResponse response = - buildDiscoveryResponse("0", listeners, ResourceType.LDS.typeUrl(), "0000"); - call.responseObserver.onNext(response); + Any.pack(mf.buildListenerForRds(LDS_RESOURCE, RDS_RESOURCE))); + call.sendResponse("0", listeners, ResourceType.LDS, "0000"); // Client sends an ACK LDS request. - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "0", LDS_RESOURCE, ResourceType.LDS.typeUrl(), "0000"))); + call.verifyRequest(NODE, "0", Collections.singletonList(LDS_RESOURCE), ResourceType.LDS, + "0000"); verify(ldsResourceWatcher).onChanged(ldsUpdateCaptor.capture()); assertThat(ldsUpdateCaptor.getValue().getRdsName()).isEqualTo(RDS_RESOURCE); assertThat(fakeClock.getPendingTasks(LDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty(); @@ -366,114 +269,80 @@ public void ldsResourceFound_containsRdsName() { @Test public void cachedLdsResource_data() { - RpcCall call = + DiscoveryRpcCall call = startResourceWatcher(ResourceType.LDS, LDS_RESOURCE, ldsResourceWatcher); List listeners = ImmutableList.of( - Any.pack(buildListener(LDS_RESOURCE, - Any.pack( - HttpConnectionManager.newBuilder() - .setRds( - Rds.newBuilder() - .setRouteConfigName(RDS_RESOURCE) - .setConfigSource( - ConfigSource.newBuilder() - .setAds(AggregatedConfigSource.getDefaultInstance()))) - .build())))); - DiscoveryResponse response = - buildDiscoveryResponse("0", listeners, ResourceType.LDS.typeUrl(), "0000"); - call.responseObserver.onNext(response); + Any.pack(mf.buildListenerForRds(LDS_RESOURCE, RDS_RESOURCE))); + call.sendResponse("0", listeners, ResourceType.LDS, "0000"); // Client sends an ACK LDS request. - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "0", LDS_RESOURCE, ResourceType.LDS.typeUrl(), "0000"))); + call.verifyRequest(NODE, "0", Collections.singletonList(LDS_RESOURCE), ResourceType.LDS, + "0000"); LdsResourceWatcher watcher = mock(LdsResourceWatcher.class); xdsClient.watchLdsResource(LDS_RESOURCE, watcher); verify(watcher).onChanged(ldsUpdateCaptor.capture()); assertThat(ldsUpdateCaptor.getValue().getRdsName()).isEqualTo(RDS_RESOURCE); - verifyNoMoreInteractions(call.requestObserver); + call.verifyNoMoreRequest(); } @Test public void cachedLdsResource_absent() { - RpcCall call = + DiscoveryRpcCall call = startResourceWatcher(ResourceType.LDS, LDS_RESOURCE, ldsResourceWatcher); fakeClock.forwardTime(ClientXdsClient.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS); verify(ldsResourceWatcher).onResourceDoesNotExist(LDS_RESOURCE); LdsResourceWatcher watcher = mock(LdsResourceWatcher.class); xdsClient.watchLdsResource(LDS_RESOURCE, watcher); verify(watcher).onResourceDoesNotExist(LDS_RESOURCE); - verifyNoMoreInteractions(call.requestObserver); + call.verifyNoMoreRequest(); } @Test public void ldsResourceUpdated() { - RpcCall call = + DiscoveryRpcCall call = startResourceWatcher(ResourceType.LDS, LDS_RESOURCE, ldsResourceWatcher); List listeners = ImmutableList.of( - Any.pack(buildListener(LDS_RESOURCE, - Any.pack( - HttpConnectionManager.newBuilder() - .setRouteConfig(buildRouteConfiguration("do not care", buildVirtualHosts(2))) - .build())))); - DiscoveryResponse response = - buildDiscoveryResponse("0", listeners, ResourceType.LDS.typeUrl(), "0000"); - call.responseObserver.onNext(response); + Any.pack(mf.buildListener(LDS_RESOURCE, + mf.buildRouteConfiguration("do not care", mf.buildOpaqueVirtualHosts(2))))); + call.sendResponse("0", listeners, ResourceType.LDS, "0000"); // Client sends an ACK LDS request. - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "0", LDS_RESOURCE, ResourceType.LDS.typeUrl(), "0000"))); + call.verifyRequest(NODE, "0", Collections.singletonList(LDS_RESOURCE), ResourceType.LDS, + "0000"); verify(ldsResourceWatcher).onChanged(ldsUpdateCaptor.capture()); assertThat(ldsUpdateCaptor.getValue().getVirtualHosts()).hasSize(2); listeners = ImmutableList.of( - Any.pack(buildListener(LDS_RESOURCE, - Any.pack( - HttpConnectionManager.newBuilder() - .setRds( - Rds.newBuilder() - .setRouteConfigName(RDS_RESOURCE) - .setConfigSource( - ConfigSource.newBuilder() - .setAds(AggregatedConfigSource.getDefaultInstance()))) - .build())))); - response = - buildDiscoveryResponse("1", listeners, ResourceType.LDS.typeUrl(), "0001"); - call.responseObserver.onNext(response); + Any.pack(mf.buildListenerForRds(LDS_RESOURCE, RDS_RESOURCE))); + call.sendResponse("1", listeners, ResourceType.LDS, "0001"); // Client sends an ACK LDS request. - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "1", LDS_RESOURCE, ResourceType.LDS.typeUrl(), "0001"))); + call.verifyRequest(NODE, "1", Collections.singletonList(LDS_RESOURCE), ResourceType.LDS, + "0001"); verify(ldsResourceWatcher, times(2)).onChanged(ldsUpdateCaptor.capture()); assertThat(ldsUpdateCaptor.getValue().getRdsName()).isEqualTo(RDS_RESOURCE); } @Test public void ldsResourceDeleted() { - RpcCall call = + DiscoveryRpcCall call = startResourceWatcher(ResourceType.LDS, LDS_RESOURCE, ldsResourceWatcher); List listeners = ImmutableList.of( - Any.pack(buildListener(LDS_RESOURCE, - Any.pack( - HttpConnectionManager.newBuilder() - .setRouteConfig(buildRouteConfiguration("do not care", buildVirtualHosts(2))) - .build())))); - DiscoveryResponse response = - buildDiscoveryResponse("0", listeners, ResourceType.LDS.typeUrl(), "0000"); - call.responseObserver.onNext(response); + Any.pack(mf.buildListener(LDS_RESOURCE, + mf.buildRouteConfiguration("do not care", mf.buildOpaqueVirtualHosts(2))))); + call.sendResponse("0", listeners, ResourceType.LDS, "0000"); // Client sends an ACK LDS request. - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "0", LDS_RESOURCE, ResourceType.LDS.typeUrl(), "0000"))); + call.verifyRequest(NODE, "0", Collections.singletonList(LDS_RESOURCE), ResourceType.LDS, + "0000"); verify(ldsResourceWatcher).onChanged(ldsUpdateCaptor.capture()); assertThat(ldsUpdateCaptor.getValue().getVirtualHosts()).hasSize(2); - response = buildDiscoveryResponse("1", Collections.emptyList(), - ResourceType.LDS.typeUrl(), "0001"); - call.responseObserver.onNext(response); + call.sendResponse("1", Collections.emptyList(), ResourceType.LDS, "0001"); // Client sends an ACK LDS request. - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "1", LDS_RESOURCE, ResourceType.LDS.typeUrl(), "0001"))); + call.verifyRequest(NODE, "1", Collections.singletonList(LDS_RESOURCE), ResourceType.LDS, + "0001"); verify(ldsResourceWatcher).onResourceDoesNotExist(LDS_RESOURCE); } @@ -485,10 +354,8 @@ public void multipleLdsWatchers() { xdsClient.watchLdsResource(LDS_RESOURCE, ldsResourceWatcher); xdsClient.watchLdsResource(ldsResource, watcher1); xdsClient.watchLdsResource(ldsResource, watcher2); - RpcCall call = resourceDiscoveryCalls.poll(); - verify(call.requestObserver).onNext( - argThat(new DiscoveryRequestMatcher(NODE, "", Arrays.asList(LDS_RESOURCE, ldsResource), - ResourceType.LDS.typeUrl(), ""))); + DiscoveryRpcCall call = resourceDiscoveryCalls.poll(); + call.verifyRequest(NODE, "", Arrays.asList(LDS_RESOURCE, ldsResource), ResourceType.LDS, ""); fakeClock.forwardTime(ClientXdsClient.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS); verify(ldsResourceWatcher).onResourceDoesNotExist(LDS_RESOURCE); @@ -496,19 +363,11 @@ public void multipleLdsWatchers() { verify(watcher2).onResourceDoesNotExist(ldsResource); List listeners = ImmutableList.of( - Any.pack(buildListener(LDS_RESOURCE, - Any.pack( - HttpConnectionManager.newBuilder() - .setRouteConfig(buildRouteConfiguration("do not care", buildVirtualHosts(2))) - .build()))), - Any.pack(buildListener(ldsResource, - Any.pack( - HttpConnectionManager.newBuilder() - .setRouteConfig(buildRouteConfiguration("do not care", buildVirtualHosts(4))) - .build())))); - DiscoveryResponse response = - buildDiscoveryResponse("0", listeners, ResourceType.LDS.typeUrl(), "0000"); - call.responseObserver.onNext(response); + Any.pack(mf.buildListener(LDS_RESOURCE, + mf.buildRouteConfiguration("do not care", mf.buildOpaqueVirtualHosts(2)))), + Any.pack(mf.buildListener(ldsResource, + mf.buildRouteConfiguration("do not care", mf.buildOpaqueVirtualHosts(4))))); + call.sendResponse("0", listeners, ResourceType.LDS, "0000"); verify(ldsResourceWatcher).onChanged(ldsUpdateCaptor.capture()); assertThat(ldsUpdateCaptor.getValue().getVirtualHosts()).hasSize(2); verify(watcher1).onChanged(ldsUpdateCaptor.capture()); @@ -519,17 +378,16 @@ public void multipleLdsWatchers() { @Test public void rdsResourceNotFound() { - RpcCall call = + DiscoveryRpcCall call = startResourceWatcher(ResourceType.RDS, RDS_RESOURCE, rdsResourceWatcher); List routeConfigs = ImmutableList.of( - Any.pack(buildRouteConfiguration("route-bar.googleapis.com", buildVirtualHosts(2)))); - DiscoveryResponse response = - buildDiscoveryResponse("0", routeConfigs, ResourceType.RDS.typeUrl(), "0000"); - call.responseObserver.onNext(response); + Any.pack(mf.buildRouteConfiguration("route-bar.googleapis.com", + mf.buildOpaqueVirtualHosts(2)))); + call.sendResponse("0", routeConfigs, ResourceType.RDS, "0000"); // Client sends an ACK RDS request. - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "0", RDS_RESOURCE, ResourceType.RDS.typeUrl(), "0000"))); + call.verifyRequest(NODE, "0", Collections.singletonList(RDS_RESOURCE), ResourceType.RDS, + "0000"); verifyNoInteractions(rdsResourceWatcher); fakeClock.forwardTime(ClientXdsClient.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS); @@ -539,17 +397,15 @@ public void rdsResourceNotFound() { @Test public void rdsResourceFound() { - RpcCall call = + DiscoveryRpcCall call = startResourceWatcher(ResourceType.RDS, RDS_RESOURCE, rdsResourceWatcher); - List routeConfigs = - ImmutableList.of(Any.pack(buildRouteConfiguration(RDS_RESOURCE, buildVirtualHosts(2)))); - DiscoveryResponse response = - buildDiscoveryResponse("0", routeConfigs, ResourceType.RDS.typeUrl(), "0000"); - call.responseObserver.onNext(response); + List routeConfigs = ImmutableList.of( + Any.pack(mf.buildRouteConfiguration(RDS_RESOURCE, mf.buildOpaqueVirtualHosts(2)))); + call.sendResponse("0", routeConfigs, ResourceType.RDS, "0000"); // Client sends an ACK RDS request. - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "0", RDS_RESOURCE, ResourceType.RDS.typeUrl(), "0000"))); + call.verifyRequest(NODE, "0", Collections.singletonList(RDS_RESOURCE), ResourceType.RDS, + "0000"); verify(rdsResourceWatcher).onChanged(rdsUpdateCaptor.capture()); assertThat(rdsUpdateCaptor.getValue().getVirtualHosts()).hasSize(2); assertThat(fakeClock.getPendingTasks(RDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty(); @@ -557,62 +413,56 @@ public void rdsResourceFound() { @Test public void cachedRdsResource_data() { - RpcCall call = + DiscoveryRpcCall call = startResourceWatcher(ResourceType.RDS, RDS_RESOURCE, rdsResourceWatcher); - List routeConfigs = - ImmutableList.of(Any.pack(buildRouteConfiguration(RDS_RESOURCE, buildVirtualHosts(2)))); - DiscoveryResponse response = - buildDiscoveryResponse("0", routeConfigs, ResourceType.RDS.typeUrl(), "0000"); - call.responseObserver.onNext(response); + List routeConfigs = ImmutableList.of( + Any.pack(mf.buildRouteConfiguration(RDS_RESOURCE, mf.buildOpaqueVirtualHosts(2)))); + call.sendResponse("0", routeConfigs, ResourceType.RDS, "0000"); // Client sends an ACK RDS request. - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "0", RDS_RESOURCE, ResourceType.RDS.typeUrl(), "0000"))); + call.verifyRequest(NODE, "0", Collections.singletonList(RDS_RESOURCE), ResourceType.RDS, + "0000"); RdsResourceWatcher watcher = mock(RdsResourceWatcher.class); xdsClient.watchRdsResource(RDS_RESOURCE, watcher); verify(watcher).onChanged(rdsUpdateCaptor.capture()); assertThat(rdsUpdateCaptor.getValue().getVirtualHosts()).hasSize(2); - verifyNoMoreInteractions(call.requestObserver); + call.verifyNoMoreRequest(); } @Test public void cachedRdsResource_absent() { - RpcCall call = + DiscoveryRpcCall call = startResourceWatcher(ResourceType.RDS, RDS_RESOURCE, rdsResourceWatcher); fakeClock.forwardTime(ClientXdsClient.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS); verify(rdsResourceWatcher).onResourceDoesNotExist(RDS_RESOURCE); RdsResourceWatcher watcher = mock(RdsResourceWatcher.class); xdsClient.watchRdsResource(RDS_RESOURCE, watcher); verify(watcher).onResourceDoesNotExist(RDS_RESOURCE); - verifyNoMoreInteractions(call.requestObserver); + call.verifyNoMoreRequest(); } @Test public void rdsResourceUpdated() { - RpcCall call = + DiscoveryRpcCall call = startResourceWatcher(ResourceType.RDS, RDS_RESOURCE, rdsResourceWatcher); - List routeConfigs = - ImmutableList.of(Any.pack(buildRouteConfiguration(RDS_RESOURCE, buildVirtualHosts(2)))); - DiscoveryResponse response = - buildDiscoveryResponse("0", routeConfigs, ResourceType.RDS.typeUrl(), "0000"); - call.responseObserver.onNext(response); + List routeConfigs = ImmutableList.of( + Any.pack(mf.buildRouteConfiguration(RDS_RESOURCE, mf.buildOpaqueVirtualHosts(2)))); + call.sendResponse("0", routeConfigs, ResourceType.RDS, "0000"); // Client sends an ACK RDS request. - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "0", RDS_RESOURCE, ResourceType.RDS.typeUrl(), "0000"))); + call.verifyRequest(NODE, "0", Collections.singletonList(RDS_RESOURCE), ResourceType.RDS, + "0000"); verify(rdsResourceWatcher).onChanged(rdsUpdateCaptor.capture()); assertThat(rdsUpdateCaptor.getValue().getVirtualHosts()).hasSize(2); - routeConfigs = - ImmutableList.of(Any.pack(buildRouteConfiguration(RDS_RESOURCE, buildVirtualHosts(4)))); - response = - buildDiscoveryResponse("1", routeConfigs, ResourceType.RDS.typeUrl(), "0001"); - call.responseObserver.onNext(response); + routeConfigs = ImmutableList.of( + Any.pack(mf.buildRouteConfiguration(RDS_RESOURCE, mf.buildOpaqueVirtualHosts(4)))); + call.sendResponse("1", routeConfigs, ResourceType.RDS, "0001"); // Client sends an ACK RDS request. - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "1", RDS_RESOURCE, ResourceType.RDS.typeUrl(), "0001"))); + call.verifyRequest(NODE, "1", Collections.singletonList(RDS_RESOURCE), ResourceType.RDS, + "0001"); verify(rdsResourceWatcher, times(2)).onChanged(rdsUpdateCaptor.capture()); assertThat(rdsUpdateCaptor.getValue().getVirtualHosts()).hasSize(4); } @@ -621,39 +471,23 @@ public void rdsResourceUpdated() { public void rdsResourceDeletedByLds() { xdsClient.watchLdsResource(LDS_RESOURCE, ldsResourceWatcher); xdsClient.watchRdsResource(RDS_RESOURCE, rdsResourceWatcher); - RpcCall call = resourceDiscoveryCalls.poll(); + DiscoveryRpcCall call = resourceDiscoveryCalls.poll(); List listeners = ImmutableList.of( - Any.pack(buildListener(LDS_RESOURCE, - Any.pack( - HttpConnectionManager.newBuilder() - .setRds( - Rds.newBuilder() - .setRouteConfigName(RDS_RESOURCE) - .setConfigSource( - ConfigSource.newBuilder() - .setAds(AggregatedConfigSource.getDefaultInstance()))) - .build())))); - DiscoveryResponse response = - buildDiscoveryResponse("0", listeners, ResourceType.LDS.typeUrl(), "0000"); - call.responseObserver.onNext(response); + Any.pack(mf.buildListenerForRds(LDS_RESOURCE, RDS_RESOURCE))); + call.sendResponse("0", listeners, ResourceType.LDS, "0000"); verify(ldsResourceWatcher).onChanged(ldsUpdateCaptor.capture()); assertThat(ldsUpdateCaptor.getValue().getRdsName()).isEqualTo(RDS_RESOURCE); - List routeConfigs = - ImmutableList.of(Any.pack(buildRouteConfiguration(RDS_RESOURCE, buildVirtualHosts(2)))); - response = buildDiscoveryResponse("0", routeConfigs, ResourceType.RDS.typeUrl(), "0000"); - call.responseObserver.onNext(response); + List routeConfigs = ImmutableList.of( + Any.pack(mf.buildRouteConfiguration(RDS_RESOURCE, mf.buildOpaqueVirtualHosts(2)))); + call.sendResponse("0", routeConfigs, ResourceType.RDS, "0000"); verify(rdsResourceWatcher).onChanged(rdsUpdateCaptor.capture()); assertThat(rdsUpdateCaptor.getValue().getVirtualHosts()).hasSize(2); listeners = ImmutableList.of( - Any.pack(buildListener(LDS_RESOURCE, - Any.pack( - HttpConnectionManager.newBuilder() - .setRouteConfig(buildRouteConfiguration("do not care", buildVirtualHosts(5))) - .build())))); - response = buildDiscoveryResponse("1", listeners, ResourceType.LDS.typeUrl(), "0001"); - call.responseObserver.onNext(response); + Any.pack(mf.buildListener(LDS_RESOURCE, + mf.buildRouteConfiguration("do not care", mf.buildOpaqueVirtualHosts(5))))); + call.sendResponse("1", listeners, ResourceType.LDS, "0001"); verify(ldsResourceWatcher, times(2)).onChanged(ldsUpdateCaptor.capture()); assertThat(ldsUpdateCaptor.getValue().getVirtualHosts()).hasSize(5); verify(rdsResourceWatcher).onResourceDoesNotExist(RDS_RESOURCE); @@ -667,31 +501,25 @@ public void multipleRdsWatchers() { xdsClient.watchRdsResource(RDS_RESOURCE, rdsResourceWatcher); xdsClient.watchRdsResource(rdsResource, watcher1); xdsClient.watchRdsResource(rdsResource, watcher2); - RpcCall call = resourceDiscoveryCalls.poll(); - verify(call.requestObserver).onNext( - argThat(new DiscoveryRequestMatcher(NODE, "", Arrays.asList(RDS_RESOURCE, rdsResource), - ResourceType.RDS.typeUrl(), ""))); + DiscoveryRpcCall call = resourceDiscoveryCalls.poll(); + call.verifyRequest(NODE, "", Arrays.asList(RDS_RESOURCE, rdsResource), ResourceType.RDS, ""); fakeClock.forwardTime(ClientXdsClient.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS); verify(rdsResourceWatcher).onResourceDoesNotExist(RDS_RESOURCE); verify(watcher1).onResourceDoesNotExist(rdsResource); verify(watcher2).onResourceDoesNotExist(rdsResource); - List routeConfigs = - ImmutableList.of(Any.pack(buildRouteConfiguration(RDS_RESOURCE, buildVirtualHosts(2)))); - DiscoveryResponse response = - buildDiscoveryResponse("0", routeConfigs, ResourceType.RDS.typeUrl(), "0000"); - call.responseObserver.onNext(response); + List routeConfigs = ImmutableList.of( + Any.pack(mf.buildRouteConfiguration(RDS_RESOURCE, mf.buildOpaqueVirtualHosts(2)))); + call.sendResponse("0", routeConfigs, ResourceType.RDS, "0000"); verify(rdsResourceWatcher).onChanged(rdsUpdateCaptor.capture()); assertThat(rdsUpdateCaptor.getValue().getVirtualHosts()).hasSize(2); verifyNoMoreInteractions(watcher1, watcher2); - routeConfigs = - ImmutableList.of(Any.pack(buildRouteConfiguration(rdsResource, buildVirtualHosts(4)))); - response = - buildDiscoveryResponse("2", routeConfigs, ResourceType.RDS.typeUrl(), "0002"); - call.responseObserver.onNext(response); + routeConfigs = ImmutableList.of(Any.pack( + mf.buildRouteConfiguration(rdsResource, mf.buildOpaqueVirtualHosts(4)))); + call.sendResponse("2", routeConfigs, ResourceType.RDS, "0002"); verify(watcher1).onChanged(rdsUpdateCaptor.capture()); assertThat(rdsUpdateCaptor.getValue().getVirtualHosts()).hasSize(4); @@ -702,19 +530,17 @@ public void multipleRdsWatchers() { @Test public void cdsResourceNotFound() { - RpcCall call = + DiscoveryRpcCall call = startResourceWatcher(ResourceType.CDS, CDS_RESOURCE, cdsResourceWatcher); List clusters = ImmutableList.of( - Any.pack(buildCluster("cluster-bar.googleapis.com", null, false)), - Any.pack(buildCluster("cluster-baz.googleapis.com", null, false))); - DiscoveryResponse response = - buildDiscoveryResponse("0", clusters, ResourceType.CDS.typeUrl(), "0000"); - call.responseObserver.onNext(response); + Any.pack(mf.buildCluster("cluster-bar.googleapis.com", null, false, null, null)), + Any.pack(mf.buildCluster("cluster-baz.googleapis.com", null, false, null, null))); + call.sendResponse("0", clusters, ResourceType.CDS, "0000"); // Client sent an ACK CDS request. - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "0", CDS_RESOURCE, ResourceType.CDS.typeUrl(), "0000"))); + call.verifyRequest(NODE, "0", Collections.singletonList(CDS_RESOURCE), ResourceType.CDS, + "0000"); verifyNoInteractions(cdsResourceWatcher); fakeClock.forwardTime(ClientXdsClient.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS); @@ -724,16 +550,15 @@ public void cdsResourceNotFound() { @Test public void cdsResourceFound() { - RpcCall call = + DiscoveryRpcCall call = startResourceWatcher(ResourceType.CDS, CDS_RESOURCE, cdsResourceWatcher); - List clusters = ImmutableList.of(Any.pack(buildCluster(CDS_RESOURCE, null, false))); - DiscoveryResponse response = - buildDiscoveryResponse("0", clusters, ResourceType.CDS.typeUrl(), "0000"); - call.responseObserver.onNext(response); + List clusters = ImmutableList.of( + Any.pack(mf.buildCluster(CDS_RESOURCE, null, false, null, null))); + call.sendResponse("0", clusters, ResourceType.CDS, "0000"); // Client sent an ACK CDS request. - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "0", CDS_RESOURCE, ResourceType.CDS.typeUrl(), "0000"))); + call.verifyRequest(NODE, "0", Collections.singletonList(CDS_RESOURCE), ResourceType.CDS, + "0000"); verify(cdsResourceWatcher).onChanged(cdsUpdateCaptor.capture()); CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue(); assertThat(cdsUpdate.getClusterName()).isEqualTo(CDS_RESOURCE); @@ -746,27 +571,16 @@ public void cdsResourceFound() { @Test public void cdsResponseWithCircuitBreakers() { - RpcCall call = + DiscoveryRpcCall call = startResourceWatcher(ResourceType.CDS, CDS_RESOURCE, cdsResourceWatcher); - Cluster cluster = buildCluster(CDS_RESOURCE, null, false); - cluster = cluster.toBuilder() - .setCircuitBreakers( - CircuitBreakers.newBuilder() - .addThresholds( - Thresholds.newBuilder() - .setPriority(RoutingPriority.HIGH) - .setMaxRequests(UInt32Value.newBuilder().setValue(50))) - .addThresholds( - Thresholds.newBuilder() - .setPriority(RoutingPriority.DEFAULT) - .setMaxRequests(UInt32Value.newBuilder().setValue(200)))) - .build(); - DiscoveryResponse response = buildDiscoveryResponse("0", - Collections.singletonList(Any.pack(cluster)), ResourceType.CDS.typeUrl(), "0000"); - call.responseObserver.onNext(response); - - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "0", CDS_RESOURCE, ResourceType.CDS.typeUrl(), "0000"))); + List clusters = ImmutableList.of( + Any.pack(mf.buildCluster(CDS_RESOURCE, null, false, null, + mf.buildCircuitBreakers(50, 200)))); + call.sendResponse("0", clusters, ResourceType.CDS, "0000"); + + // Client sent an ACK CDS request. + call.verifyRequest(NODE, "0", Collections.singletonList(CDS_RESOURCE), ResourceType.CDS, + "0000"); verify(cdsResourceWatcher).onChanged(cdsUpdateCaptor.capture()); CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue(); assertThat(cdsUpdate.getClusterName()).isEqualTo(CDS_RESOURCE); @@ -781,24 +595,20 @@ public void cdsResponseWithCircuitBreakers() { */ @Test public void cdsResponseWithUpstreamTlsContext() { - RpcCall call = + DiscoveryRpcCall call = startResourceWatcher(ResourceType.CDS, CDS_RESOURCE, cdsResourceWatcher); // Management server sends back CDS response with UpstreamTlsContext. - UpstreamTlsContext testUpstreamTlsContext = - buildUpstreamTlsContext("secret1", "unix:/var/uds2"); List clusters = ImmutableList.of( - Any.pack(buildCluster("cluster-bar.googleapis.com", null, false)), - Any.pack(buildSecureCluster(CDS_RESOURCE, - "eds-cluster-foo.googleapis.com", true, testUpstreamTlsContext)), - Any.pack(buildCluster("cluster-baz.googleapis.com", null, false))); - DiscoveryResponse response = - buildDiscoveryResponse("0", clusters, ResourceType.CDS.typeUrl(), "0000"); - call.responseObserver.onNext(response); + Any.pack(mf.buildCluster("cluster-bar.googleapis.com", null, false, null, null)), + Any.pack(mf.buildCluster(CDS_RESOURCE, "eds-cluster-foo.googleapis.com", true, + mf.buildUpstreamTlsContext("secret1", "unix:/var/uds2"), null)), + Any.pack(mf.buildCluster("cluster-baz.googleapis.com", null, false, null, null))); + call.sendResponse("0", clusters, ResourceType.CDS, "0000"); // Client sent an ACK CDS request. - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "0", CDS_RESOURCE, ResourceType.CDS.typeUrl(), "0000"))); + call.verifyRequest(NODE, "0", Collections.singletonList(CDS_RESOURCE), ResourceType.CDS, + "0000"); verify(cdsResourceWatcher, times(1)).onChanged(cdsUpdateCaptor.capture()); CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue(); EnvoyServerProtoData.UpstreamTlsContext upstreamTlsContext = cdsUpdate @@ -819,16 +629,15 @@ public void cdsResponseWithUpstreamTlsContext() { @Test public void cachedCdsResource_data() { - RpcCall call = + DiscoveryRpcCall call = startResourceWatcher(ResourceType.CDS, CDS_RESOURCE, cdsResourceWatcher); - List clusters = ImmutableList.of(Any.pack(buildCluster(CDS_RESOURCE, null, false))); - DiscoveryResponse response = - buildDiscoveryResponse("0", clusters, ResourceType.CDS.typeUrl(), "0000"); - call.responseObserver.onNext(response); + List clusters = ImmutableList.of( + Any.pack(mf.buildCluster(CDS_RESOURCE, null, false, null, null))); + call.sendResponse("0", clusters, ResourceType.CDS, "0000"); // Client sends an ACK CDS request. - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "0", CDS_RESOURCE, ResourceType.CDS.typeUrl(), "0000"))); + call.verifyRequest(NODE, "0", Collections.singletonList(CDS_RESOURCE), ResourceType.CDS, + "0000"); CdsResourceWatcher watcher = mock(CdsResourceWatcher.class); xdsClient.watchCdsResource(CDS_RESOURCE, watcher); @@ -838,33 +647,32 @@ public void cachedCdsResource_data() { assertThat(cdsUpdate.getEdsServiceName()).isNull(); assertThat(cdsUpdate.getLbPolicy()).isEqualTo("round_robin"); assertThat(cdsUpdate.getLrsServerName()).isNull(); - verifyNoMoreInteractions(call.requestObserver); + call.verifyNoMoreRequest(); } @Test public void cachedCdsResource_absent() { - RpcCall call = + DiscoveryRpcCall call = startResourceWatcher(ResourceType.CDS, CDS_RESOURCE, cdsResourceWatcher); fakeClock.forwardTime(ClientXdsClient.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS); verify(cdsResourceWatcher).onResourceDoesNotExist(CDS_RESOURCE); CdsResourceWatcher watcher = mock(CdsResourceWatcher.class); xdsClient.watchCdsResource(CDS_RESOURCE, watcher); verify(watcher).onResourceDoesNotExist(CDS_RESOURCE); - verifyNoMoreInteractions(call.requestObserver); + call.verifyNoMoreRequest(); } @Test public void cdsResourceUpdated() { - RpcCall call = + DiscoveryRpcCall call = startResourceWatcher(ResourceType.CDS, CDS_RESOURCE, cdsResourceWatcher); - List clusters = ImmutableList.of(Any.pack(buildCluster(CDS_RESOURCE, null, false))); - DiscoveryResponse response = - buildDiscoveryResponse("0", clusters, ResourceType.CDS.typeUrl(), "0000"); - call.responseObserver.onNext(response); + List clusters = ImmutableList.of( + Any.pack(mf.buildCluster(CDS_RESOURCE, null, false, null, null))); + call.sendResponse("0", clusters, ResourceType.CDS, "0000"); // Client sends an ACK CDS request. - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "0", CDS_RESOURCE, ResourceType.CDS.typeUrl(), "0000"))); + call.verifyRequest(NODE, "0", Collections.singletonList(CDS_RESOURCE), ResourceType.CDS, + "0000"); verify(cdsResourceWatcher).onChanged(cdsUpdateCaptor.capture()); CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue(); assertThat(cdsUpdate.getClusterName()).isEqualTo(CDS_RESOURCE); @@ -873,13 +681,13 @@ public void cdsResourceUpdated() { assertThat(cdsUpdate.getLrsServerName()).isNull(); String edsService = "eds-service-bar.googleapis.com"; - clusters = ImmutableList.of(Any.pack(buildCluster(CDS_RESOURCE, edsService, true))); - response = buildDiscoveryResponse("1", clusters, ResourceType.CDS.typeUrl(), "0001"); - call.responseObserver.onNext(response); + clusters = ImmutableList.of( + Any.pack(mf.buildCluster(CDS_RESOURCE, edsService, true, null, null))); + call.sendResponse("1", clusters, ResourceType.CDS, "0001"); // Client sends an ACK CDS request. - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "1", CDS_RESOURCE, ResourceType.CDS.typeUrl(), "0001"))); + call.verifyRequest(NODE, "1", Collections.singletonList(CDS_RESOURCE), ResourceType.CDS, + "0001"); verify(cdsResourceWatcher, times(2)).onChanged(cdsUpdateCaptor.capture()); cdsUpdate = cdsUpdateCaptor.getValue(); assertThat(cdsUpdate.getClusterName()).isEqualTo(CDS_RESOURCE); @@ -890,16 +698,15 @@ public void cdsResourceUpdated() { @Test public void cdsResourceDeleted() { - RpcCall call = + DiscoveryRpcCall call = startResourceWatcher(ResourceType.CDS, CDS_RESOURCE, cdsResourceWatcher); - List clusters = ImmutableList.of(Any.pack(buildCluster(CDS_RESOURCE, null, false))); - DiscoveryResponse response = - buildDiscoveryResponse("0", clusters, ResourceType.CDS.typeUrl(), "0000"); - call.responseObserver.onNext(response); + List clusters = ImmutableList.of( + Any.pack(mf.buildCluster(CDS_RESOURCE, null, false, null, null))); + call.sendResponse("0", clusters, ResourceType.CDS, "0000"); // Client sends an ACK CDS request. - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "0", CDS_RESOURCE, ResourceType.CDS.typeUrl(), "0000"))); + call.verifyRequest(NODE, "0", Collections.singletonList(CDS_RESOURCE), ResourceType.CDS, + "0000"); verify(cdsResourceWatcher).onChanged(cdsUpdateCaptor.capture()); CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue(); assertThat(cdsUpdate.getClusterName()).isEqualTo(CDS_RESOURCE); @@ -907,13 +714,11 @@ public void cdsResourceDeleted() { assertThat(cdsUpdate.getLbPolicy()).isEqualTo("round_robin"); assertThat(cdsUpdate.getLrsServerName()).isNull(); - response = buildDiscoveryResponse("1", Collections.emptyList(), - ResourceType.CDS.typeUrl(), "0001"); - call.responseObserver.onNext(response); + call.sendResponse("1", Collections.emptyList(), ResourceType.CDS, "0001"); // Client sends an ACK CDS request. - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "1", CDS_RESOURCE, ResourceType.CDS.typeUrl(), "0001"))); + call.verifyRequest(NODE, "1", Collections.singletonList(CDS_RESOURCE), ResourceType.CDS, + "0001"); verify(cdsResourceWatcher).onResourceDoesNotExist(CDS_RESOURCE); } @@ -925,10 +730,8 @@ public void multipleCdsWatchers() { xdsClient.watchCdsResource(CDS_RESOURCE, cdsResourceWatcher); xdsClient.watchCdsResource(cdsResource, watcher1); xdsClient.watchCdsResource(cdsResource, watcher2); - RpcCall call = resourceDiscoveryCalls.poll(); - verify(call.requestObserver).onNext( - argThat(new DiscoveryRequestMatcher(NODE, "", Arrays.asList(CDS_RESOURCE, cdsResource), - ResourceType.CDS.typeUrl(), ""))); + DiscoveryRpcCall call = resourceDiscoveryCalls.poll(); + call.verifyRequest(NODE, "", Arrays.asList(CDS_RESOURCE, cdsResource), ResourceType.CDS, ""); fakeClock.forwardTime(ClientXdsClient.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS); verify(cdsResourceWatcher).onResourceDoesNotExist(CDS_RESOURCE); @@ -937,11 +740,9 @@ public void multipleCdsWatchers() { String edsService = "eds-service-bar.googleapis.com"; List clusters = ImmutableList.of( - Any.pack(buildCluster(CDS_RESOURCE, null, false)), - Any.pack(buildCluster(cdsResource, edsService, true))); - DiscoveryResponse response = - buildDiscoveryResponse("0", clusters, ResourceType.CDS.typeUrl(), "0000"); - call.responseObserver.onNext(response); + Any.pack(mf.buildCluster(CDS_RESOURCE, null, false, null, null)), + Any.pack(mf.buildCluster(cdsResource, edsService, true, null, null))); + call.sendResponse("0", clusters, ResourceType.CDS, "0000"); verify(cdsResourceWatcher).onChanged(cdsUpdateCaptor.capture()); CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue(); assertThat(cdsUpdate.getClusterName()).isEqualTo(CDS_RESOURCE); @@ -964,25 +765,23 @@ public void multipleCdsWatchers() { @Test public void edsResourceNotFound() { - RpcCall call = + DiscoveryRpcCall call = startResourceWatcher(ResourceType.EDS, EDS_RESOURCE, edsResourceWatcher); List clusterLoadAssignments = ImmutableList.of( Any.pack( - buildClusterLoadAssignment("cluster-bar.googleapis.com", + mf.buildClusterLoadAssignment("cluster-bar.googleapis.com", ImmutableList.of( - buildLocalityLbEndpoints("region1", "zone1", "subzone1", + mf.buildLocalityLbEndpoints("region1", "zone1", "subzone1", ImmutableList.of( - buildLbEndpoint("192.168.0.1", 8080, HealthStatus.HEALTHY, 2)), + mf.buildLbEndpoint("192.168.0.1", 8080, "healthy", 2)), 1, 0)), - ImmutableList.of()))); - DiscoveryResponse response = - buildDiscoveryResponse("0", clusterLoadAssignments, ResourceType.EDS.typeUrl(), "0000"); - call.responseObserver.onNext(response); + ImmutableList.of()))); + call.sendResponse("0", clusterLoadAssignments, ResourceType.EDS, "0000"); // Client sent an ACK EDS request. - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "0", EDS_RESOURCE, ResourceType.EDS.typeUrl(), "0000"))); + call.verifyRequest(NODE, "0", Collections.singletonList(EDS_RESOURCE), ResourceType.EDS, + "0000"); verifyNoInteractions(edsResourceWatcher); fakeClock.forwardTime(ClientXdsClient.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS); @@ -992,34 +791,32 @@ public void edsResourceNotFound() { @Test public void edsResourceFound() { - RpcCall call = + DiscoveryRpcCall call = startResourceWatcher(ResourceType.EDS, EDS_RESOURCE, edsResourceWatcher); List clusterLoadAssignments = ImmutableList.of( Any.pack( - buildClusterLoadAssignment(EDS_RESOURCE, + mf.buildClusterLoadAssignment(EDS_RESOURCE, ImmutableList.of( - buildLocalityLbEndpoints("region1", "zone1", "subzone1", + mf.buildLocalityLbEndpoints("region1", "zone1", "subzone1", ImmutableList.of( - buildLbEndpoint("192.168.0.1", 8080, HealthStatus.HEALTHY, 2)), + mf.buildLbEndpoint("192.168.0.1", 8080, "healthy", 2)), 1, 0), - buildLocalityLbEndpoints("region3", "zone3", "subzone3", - ImmutableList.of(), + mf.buildLocalityLbEndpoints("region3", "zone3", "subzone3", + ImmutableList.of(), 2, 1), /* locality with 0 endpoint */ - buildLocalityLbEndpoints("region4", "zone4", "subzone4", + mf.buildLocalityLbEndpoints("region4", "zone4", "subzone4", ImmutableList.of( - buildLbEndpoint("192.168.142.5", 80, HealthStatus.UNKNOWN, 5)), + mf.buildLbEndpoint("192.168.142.5", 80, "unknown", 5)), 0, 2) /* locality with 0 weight */), ImmutableList.of( - buildDropOverload("lb", 200), - buildDropOverload("throttle", 1000))))); - DiscoveryResponse response = - buildDiscoveryResponse("0", clusterLoadAssignments, ResourceType.EDS.typeUrl(), "0000"); - call.responseObserver.onNext(response); + mf.buildDropOverload("lb", 200), + mf.buildDropOverload("throttle", 1000))))); + call.sendResponse("0", clusterLoadAssignments, ResourceType.EDS, "0000"); // Client sent an ACK EDS request. - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "0", EDS_RESOURCE, ResourceType.EDS.typeUrl(), "0000"))); + call.verifyRequest(NODE, "0", Collections.singletonList(EDS_RESOURCE), ResourceType.EDS, + "0000"); verify(edsResourceWatcher).onChanged(edsUpdateCaptor.capture()); EdsUpdate edsUpdate = edsUpdateCaptor.getValue(); assertThat(edsUpdate.getClusterName()).isEqualTo(EDS_RESOURCE); @@ -1040,34 +837,32 @@ public void edsResourceFound() { @Test public void cachedEdsResource_data() { - RpcCall call = + DiscoveryRpcCall call = startResourceWatcher(ResourceType.EDS, EDS_RESOURCE, edsResourceWatcher); List clusterLoadAssignments = ImmutableList.of( Any.pack( - buildClusterLoadAssignment(EDS_RESOURCE, + mf.buildClusterLoadAssignment(EDS_RESOURCE, ImmutableList.of( - buildLocalityLbEndpoints("region1", "zone1", "subzone1", + mf.buildLocalityLbEndpoints("region1", "zone1", "subzone1", ImmutableList.of( - buildLbEndpoint("192.168.0.1", 8080, HealthStatus.HEALTHY, 2)), + mf.buildLbEndpoint("192.168.0.1", 8080, "healthy", 2)), 1, 0), - buildLocalityLbEndpoints("region3", "zone3", "subzone3", - ImmutableList.of(), + mf.buildLocalityLbEndpoints("region3", "zone3", "subzone3", + ImmutableList.of(), 2, 1), /* locality with 0 endpoint */ - buildLocalityLbEndpoints("region4", "zone4", "subzone4", + mf.buildLocalityLbEndpoints("region4", "zone4", "subzone4", ImmutableList.of( - buildLbEndpoint("192.168.142.5", 80, HealthStatus.UNKNOWN, 5)), + mf.buildLbEndpoint("192.168.142.5", 80, "unknown", 5)), 0, 2) /* locality with 0 weight */), ImmutableList.of( - buildDropOverload("lb", 200), - buildDropOverload("throttle", 1000))))); - DiscoveryResponse response = - buildDiscoveryResponse("0", clusterLoadAssignments, ResourceType.EDS.typeUrl(), "0000"); - call.responseObserver.onNext(response); + mf.buildDropOverload("lb", 200), + mf.buildDropOverload("throttle", 1000))))); + call.sendResponse("0", clusterLoadAssignments, ResourceType.EDS, "0000"); - // Client sends an ACK EDS request. - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "0", EDS_RESOURCE, ResourceType.EDS.typeUrl(), "0000"))); + // Client sent an ACK EDS request. + call.verifyRequest(NODE, "0", Collections.singletonList(EDS_RESOURCE), ResourceType.EDS, + "0000"); EdsResourceWatcher watcher = mock(EdsResourceWatcher.class); xdsClient.watchEdsResource(EDS_RESOURCE, watcher); @@ -1087,51 +882,49 @@ public void cachedEdsResource_data() { 2, true)), 1, 0), new Locality("region3", "zone3", "subzone3"), new LocalityLbEndpoints(ImmutableList.of(), 2, 1)); - verifyNoMoreInteractions(call.requestObserver); + call.verifyNoMoreRequest(); } @Test public void cachedEdsResource_absent() { - RpcCall call = + DiscoveryRpcCall call = startResourceWatcher(ResourceType.EDS, EDS_RESOURCE, edsResourceWatcher); fakeClock.forwardTime(ClientXdsClient.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS); verify(edsResourceWatcher).onResourceDoesNotExist(EDS_RESOURCE); EdsResourceWatcher watcher = mock(EdsResourceWatcher.class); xdsClient.watchEdsResource(EDS_RESOURCE, watcher); verify(watcher).onResourceDoesNotExist(EDS_RESOURCE); - verifyNoMoreInteractions(call.requestObserver); + call.verifyNoMoreRequest(); } @Test public void edsResourceUpdated() { - RpcCall call = + DiscoveryRpcCall call = startResourceWatcher(ResourceType.EDS, EDS_RESOURCE, edsResourceWatcher); List clusterLoadAssignments = ImmutableList.of( Any.pack( - buildClusterLoadAssignment(EDS_RESOURCE, + mf.buildClusterLoadAssignment(EDS_RESOURCE, ImmutableList.of( - buildLocalityLbEndpoints("region1", "zone1", "subzone1", + mf.buildLocalityLbEndpoints("region1", "zone1", "subzone1", ImmutableList.of( - buildLbEndpoint("192.168.0.1", 8080, HealthStatus.HEALTHY, 2)), + mf.buildLbEndpoint("192.168.0.1", 8080, "healthy", 2)), 1, 0), - buildLocalityLbEndpoints("region3", "zone3", "subzone3", - ImmutableList.of(), + mf.buildLocalityLbEndpoints("region3", "zone3", "subzone3", + ImmutableList.of(), 2, 1), /* locality with 0 endpoint */ - buildLocalityLbEndpoints("region4", "zone4", "subzone4", + mf.buildLocalityLbEndpoints("region4", "zone4", "subzone4", ImmutableList.of( - buildLbEndpoint("192.168.142.5", 80, HealthStatus.UNKNOWN, 5)), + mf.buildLbEndpoint("192.168.142.5", 80, "unknown", 5)), 0, 2) /* locality with 0 weight */), ImmutableList.of( - buildDropOverload("lb", 200), - buildDropOverload("throttle", 1000))))); - DiscoveryResponse response = - buildDiscoveryResponse("0", clusterLoadAssignments, ResourceType.EDS.typeUrl(), "0000"); - call.responseObserver.onNext(response); + mf.buildDropOverload("lb", 200), + mf.buildDropOverload("throttle", 1000))))); + call.sendResponse("0", clusterLoadAssignments, ResourceType.EDS, "0000"); // Client sent an ACK EDS request. - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "0", EDS_RESOURCE, ResourceType.EDS.typeUrl(), "0000"))); + call.verifyRequest(NODE, "0", Collections.singletonList(EDS_RESOURCE), ResourceType.EDS, + "0000"); verify(edsResourceWatcher).onChanged(edsUpdateCaptor.capture()); EdsUpdate edsUpdate = edsUpdateCaptor.getValue(); assertThat(edsUpdate.getClusterName()).isEqualTo(EDS_RESOURCE); @@ -1151,16 +944,14 @@ public void edsResourceUpdated() { clusterLoadAssignments = ImmutableList.of( Any.pack( - buildClusterLoadAssignment(EDS_RESOURCE, + mf.buildClusterLoadAssignment(EDS_RESOURCE, ImmutableList.of( - buildLocalityLbEndpoints("region2", "zone2", "subzone2", + mf.buildLocalityLbEndpoints("region2", "zone2", "subzone2", ImmutableList.of( - buildLbEndpoint("172.44.2.2", 8000, HealthStatus.HEALTHY, 3)), + mf.buildLbEndpoint("172.44.2.2", 8000, "unknown", 3)), 2, 0)), - ImmutableList.of()))); - response = - buildDiscoveryResponse("1", clusterLoadAssignments, ResourceType.EDS.typeUrl(), "0001"); - call.responseObserver.onNext(response); + ImmutableList.of()))); + call.sendResponse("1", clusterLoadAssignments, ResourceType.EDS, "0001"); verify(edsResourceWatcher, times(2)).onChanged(edsUpdateCaptor.capture()); edsUpdate = edsUpdateCaptor.getValue(); @@ -1178,12 +969,10 @@ public void edsResourceUpdated() { public void edsResourceDeletedByCds() { xdsClient.watchCdsResource(CDS_RESOURCE, cdsResourceWatcher); xdsClient.watchEdsResource(EDS_RESOURCE, edsResourceWatcher); - RpcCall call = resourceDiscoveryCalls.poll(); - List clusters = - ImmutableList.of(Any.pack(buildCluster(CDS_RESOURCE, EDS_RESOURCE, false))); - DiscoveryResponse response = - buildDiscoveryResponse("0", clusters, ResourceType.CDS.typeUrl(), "0000"); - call.responseObserver.onNext(response); + DiscoveryRpcCall call = resourceDiscoveryCalls.poll(); + List clusters = ImmutableList.of( + Any.pack(mf.buildCluster(CDS_RESOURCE, EDS_RESOURCE, false, null, null))); + call.sendResponse("0", clusters, ResourceType.CDS, "0000"); verify(cdsResourceWatcher).onChanged(cdsUpdateCaptor.capture()); assertThat(cdsUpdateCaptor.getValue().getClusterName()).isEqualTo(CDS_RESOURCE); assertThat(cdsUpdateCaptor.getValue().getEdsServiceName()).isEqualTo(EDS_RESOURCE); @@ -1191,26 +980,23 @@ public void edsResourceDeletedByCds() { List clusterLoadAssignments = ImmutableList.of( Any.pack( - buildClusterLoadAssignment(EDS_RESOURCE, + mf.buildClusterLoadAssignment(EDS_RESOURCE, ImmutableList.of( - buildLocalityLbEndpoints("region1", "zone1", "subzone1", + mf.buildLocalityLbEndpoints("region1", "zone1", "subzone1", ImmutableList.of( - buildLbEndpoint("192.168.0.1", 8080, HealthStatus.HEALTHY, 2)), + mf.buildLbEndpoint("192.168.0.1", 8080, "healthy", 2)), 1, 0)), ImmutableList.of( - buildDropOverload("lb", 200), - buildDropOverload("throttle", 1000))))); - response = - buildDiscoveryResponse("0", clusterLoadAssignments, ResourceType.EDS.typeUrl(), "0000"); - call.responseObserver.onNext(response); + mf.buildDropOverload("lb", 200), + mf.buildDropOverload("throttle", 1000))))); + call.sendResponse("0", clusterLoadAssignments, ResourceType.EDS, "0000"); verify(edsResourceWatcher).onChanged(edsUpdateCaptor.capture()); EdsUpdate edsUpdate = edsUpdateCaptor.getValue(); assertThat(edsUpdate.getClusterName()).isEqualTo(EDS_RESOURCE); - clusters = ImmutableList.of(Any.pack(buildCluster(CDS_RESOURCE, null, false))); - response = - buildDiscoveryResponse("1", clusters, ResourceType.CDS.typeUrl(), "0001"); - call.responseObserver.onNext(response); + clusters = ImmutableList.of( + Any.pack(mf.buildCluster(CDS_RESOURCE, null, false, null, null))); + call.sendResponse("1", clusters, ResourceType.CDS, "0001"); verify(cdsResourceWatcher, times(2)).onChanged(cdsUpdateCaptor.capture()); assertThat(cdsUpdateCaptor.getValue().getClusterName()).isEqualTo(CDS_RESOURCE); assertThat(cdsUpdateCaptor.getValue().getEdsServiceName()).isNull(); @@ -1225,10 +1011,8 @@ public void multipleEdsWatchers() { xdsClient.watchEdsResource(EDS_RESOURCE, edsResourceWatcher); xdsClient.watchEdsResource(edsResource, watcher1); xdsClient.watchEdsResource(edsResource, watcher2); - RpcCall call = resourceDiscoveryCalls.poll(); - verify(call.requestObserver).onNext( - argThat(new DiscoveryRequestMatcher(NODE, "", Arrays.asList(EDS_RESOURCE, edsResource), - ResourceType.EDS.typeUrl(), ""))); + DiscoveryRpcCall call = resourceDiscoveryCalls.poll(); + call.verifyRequest(NODE, "", Arrays.asList(EDS_RESOURCE, edsResource), ResourceType.EDS, ""); fakeClock.forwardTime(ClientXdsClient.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS); verify(edsResourceWatcher).onResourceDoesNotExist(EDS_RESOURCE); @@ -1238,25 +1022,23 @@ public void multipleEdsWatchers() { List clusterLoadAssignments = ImmutableList.of( Any.pack( - buildClusterLoadAssignment(EDS_RESOURCE, + mf.buildClusterLoadAssignment(EDS_RESOURCE, ImmutableList.of( - buildLocalityLbEndpoints("region1", "zone1", "subzone1", + mf.buildLocalityLbEndpoints("region1", "zone1", "subzone1", ImmutableList.of( - buildLbEndpoint("192.168.0.1", 8080, HealthStatus.HEALTHY, 2)), + mf.buildLbEndpoint("192.168.0.1", 8080, "healthy", 2)), 1, 0), - buildLocalityLbEndpoints("region3", "zone3", "subzone3", - ImmutableList.of(), + mf.buildLocalityLbEndpoints("region3", "zone3", "subzone3", + ImmutableList.of(), 2, 1), /* locality with 0 endpoint */ - buildLocalityLbEndpoints("region4", "zone4", "subzone4", + mf.buildLocalityLbEndpoints("region4", "zone4", "subzone4", ImmutableList.of( - buildLbEndpoint("192.168.142.5", 80, HealthStatus.UNKNOWN, 5)), + mf.buildLbEndpoint("192.168.142.5", 80, "unknown", 5)), 0, 2) /* locality with 0 weight */), ImmutableList.of( - buildDropOverload("lb", 200), - buildDropOverload("throttle", 1000))))); - DiscoveryResponse response = - buildDiscoveryResponse("0", clusterLoadAssignments, ResourceType.EDS.typeUrl(), "0000"); - call.responseObserver.onNext(response); + mf.buildDropOverload("lb", 200), + mf.buildDropOverload("throttle", 1000))))); + call.sendResponse("0", clusterLoadAssignments, ResourceType.EDS, "0000"); verify(edsResourceWatcher).onChanged(edsUpdateCaptor.capture()); EdsUpdate edsUpdate = edsUpdateCaptor.getValue(); assertThat(edsUpdate.getClusterName()).isEqualTo(EDS_RESOURCE); @@ -1277,16 +1059,14 @@ public void multipleEdsWatchers() { clusterLoadAssignments = ImmutableList.of( Any.pack( - buildClusterLoadAssignment(edsResource, + mf.buildClusterLoadAssignment(edsResource, ImmutableList.of( - buildLocalityLbEndpoints("region2", "zone2", "subzone2", + mf.buildLocalityLbEndpoints("region2", "zone2", "subzone2", ImmutableList.of( - buildLbEndpoint("172.44.2.2", 8000, HealthStatus.HEALTHY, 3)), + mf.buildLbEndpoint("172.44.2.2", 8000, "healthy", 3)), 2, 0)), - ImmutableList.of()))); - response = - buildDiscoveryResponse("1", clusterLoadAssignments, ResourceType.EDS.typeUrl(), "0001"); - call.responseObserver.onNext(response); + ImmutableList.of()))); + call.sendResponse("1", clusterLoadAssignments, ResourceType.EDS, "0001"); verify(watcher1).onChanged(edsUpdateCaptor.capture()); edsUpdate = edsUpdateCaptor.getValue(); @@ -1318,18 +1098,14 @@ public void streamClosedAndRetryWithBackoff() { xdsClient.watchRdsResource(RDS_RESOURCE, rdsResourceWatcher); xdsClient.watchCdsResource(CDS_RESOURCE, cdsResourceWatcher); xdsClient.watchEdsResource(EDS_RESOURCE, edsResourceWatcher); - RpcCall call = resourceDiscoveryCalls.poll(); - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "", LDS_RESOURCE, ResourceType.LDS.typeUrl(), ""))); - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "", RDS_RESOURCE, ResourceType.RDS.typeUrl(), ""))); - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "", CDS_RESOURCE, ResourceType.CDS.typeUrl(), ""))); - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "", EDS_RESOURCE, ResourceType.EDS.typeUrl(), ""))); + DiscoveryRpcCall call = resourceDiscoveryCalls.poll(); + call.verifyRequest(NODE, "", Collections.singletonList(LDS_RESOURCE), ResourceType.LDS, ""); + call.verifyRequest(NODE, "", Collections.singletonList(RDS_RESOURCE), ResourceType.RDS, ""); + call.verifyRequest(NODE, "", Collections.singletonList(CDS_RESOURCE), ResourceType.CDS, ""); + call.verifyRequest(NODE, "", Collections.singletonList(EDS_RESOURCE), ResourceType.EDS, ""); // Management server closes the RPC stream with an error. - call.responseObserver.onError(Status.UNKNOWN.asException()); + call.sendError(Status.UNKNOWN.asException()); verify(ldsResourceWatcher).onError(errorCaptor.capture()); assertThat(errorCaptor.getValue().getCode()).isEqualTo(Code.UNKNOWN); verify(rdsResourceWatcher).onError(errorCaptor.capture()); @@ -1347,17 +1123,13 @@ public void streamClosedAndRetryWithBackoff() { assertThat(retryTask.getDelay(TimeUnit.NANOSECONDS)).isEqualTo(10L); fakeClock.forwardNanos(10L); call = resourceDiscoveryCalls.poll(); - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "", LDS_RESOURCE, ResourceType.LDS.typeUrl(), ""))); - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "", RDS_RESOURCE, ResourceType.RDS.typeUrl(), ""))); - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "", CDS_RESOURCE, ResourceType.CDS.typeUrl(), ""))); - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "", EDS_RESOURCE, ResourceType.EDS.typeUrl(), ""))); + call.verifyRequest(NODE, "", Collections.singletonList(LDS_RESOURCE), ResourceType.LDS, ""); + call.verifyRequest(NODE, "", Collections.singletonList(RDS_RESOURCE), ResourceType.RDS, ""); + call.verifyRequest(NODE, "", Collections.singletonList(CDS_RESOURCE), ResourceType.CDS, ""); + call.verifyRequest(NODE, "", Collections.singletonList(EDS_RESOURCE), ResourceType.EDS, ""); // Management server becomes unreachable. - call.responseObserver.onError(Status.UNAVAILABLE.asException()); + call.sendError(Status.UNAVAILABLE.asException()); verify(ldsResourceWatcher, times(2)).onError(errorCaptor.capture()); assertThat(errorCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE); verify(rdsResourceWatcher, times(2)).onError(errorCaptor.capture()); @@ -1374,36 +1146,25 @@ public void streamClosedAndRetryWithBackoff() { assertThat(retryTask.getDelay(TimeUnit.NANOSECONDS)).isEqualTo(100L); fakeClock.forwardNanos(100L); call = resourceDiscoveryCalls.poll(); - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "", LDS_RESOURCE, ResourceType.LDS.typeUrl(), ""))); - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "", RDS_RESOURCE, ResourceType.RDS.typeUrl(), ""))); - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "", CDS_RESOURCE, ResourceType.CDS.typeUrl(), ""))); - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "", EDS_RESOURCE, ResourceType.EDS.typeUrl(), ""))); + call.verifyRequest(NODE, "", Collections.singletonList(LDS_RESOURCE), ResourceType.LDS, ""); + call.verifyRequest(NODE, "", Collections.singletonList(RDS_RESOURCE), ResourceType.RDS, ""); + call.verifyRequest(NODE, "", Collections.singletonList(CDS_RESOURCE), ResourceType.CDS, ""); + call.verifyRequest(NODE, "", Collections.singletonList(EDS_RESOURCE), ResourceType.EDS, ""); List listeners = ImmutableList.of( - Any.pack(buildListener(LDS_RESOURCE, - Any.pack( - HttpConnectionManager.newBuilder() - .setRouteConfig(buildRouteConfiguration("do not care", buildVirtualHosts(2))) - .build())))); - DiscoveryResponse response = - buildDiscoveryResponse("63", listeners, ResourceType.LDS.typeUrl(), "3242"); - call.responseObserver.onNext(response); - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "63", LDS_RESOURCE, ResourceType.LDS.typeUrl(), "3242"))); - - List routeConfigs = - ImmutableList.of(Any.pack(buildRouteConfiguration(RDS_RESOURCE, buildVirtualHosts(2)))); - response = - buildDiscoveryResponse("5", routeConfigs, ResourceType.RDS.typeUrl(), "6764"); - call.responseObserver.onNext(response); - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "5", RDS_RESOURCE, ResourceType.RDS.typeUrl(), "6764"))); - - call.responseObserver.onError(Status.DEADLINE_EXCEEDED.asException()); + Any.pack(mf.buildListener(LDS_RESOURCE, + mf.buildRouteConfiguration("do not care", mf.buildOpaqueVirtualHosts(2))))); + call.sendResponse("63", listeners, ResourceType.LDS, "3242"); + call.verifyRequest(NODE, "63", Collections.singletonList(LDS_RESOURCE), ResourceType.LDS, + "3242"); + + List routeConfigs = ImmutableList.of( + Any.pack(mf.buildRouteConfiguration(RDS_RESOURCE, mf.buildOpaqueVirtualHosts(2)))); + call.sendResponse("5", routeConfigs, ResourceType.RDS, "6764"); + call.verifyRequest(NODE, "5", Collections.singletonList(RDS_RESOURCE), ResourceType.RDS, + "6764"); + + call.sendError(Status.DEADLINE_EXCEEDED.asException()); verify(ldsResourceWatcher, times(3)).onError(errorCaptor.capture()); assertThat(errorCaptor.getValue().getCode()).isEqualTo(Code.DEADLINE_EXCEEDED); verify(rdsResourceWatcher, times(3)).onError(errorCaptor.capture()); @@ -1417,17 +1178,13 @@ public void streamClosedAndRetryWithBackoff() { inOrder.verify(backoffPolicyProvider).get(); fakeClock.runDueTasks(); call = resourceDiscoveryCalls.poll(); - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "63", LDS_RESOURCE, ResourceType.LDS.typeUrl(), ""))); - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "5", RDS_RESOURCE, ResourceType.RDS.typeUrl(), ""))); - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "", CDS_RESOURCE, ResourceType.CDS.typeUrl(), ""))); - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "", EDS_RESOURCE, ResourceType.EDS.typeUrl(), ""))); + call.verifyRequest(NODE, "63", Collections.singletonList(LDS_RESOURCE), ResourceType.LDS, ""); + call.verifyRequest(NODE, "5", Collections.singletonList(RDS_RESOURCE), ResourceType.RDS, ""); + call.verifyRequest(NODE, "", Collections.singletonList(CDS_RESOURCE), ResourceType.CDS, ""); + call.verifyRequest(NODE, "", Collections.singletonList(EDS_RESOURCE), ResourceType.EDS, ""); // Management server becomes unreachable again. - call.responseObserver.onError(Status.UNAVAILABLE.asException()); + call.sendError(Status.UNAVAILABLE.asException()); verify(ldsResourceWatcher, times(4)).onError(errorCaptor.capture()); assertThat(errorCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE); verify(rdsResourceWatcher, times(4)).onError(errorCaptor.capture()); @@ -1444,14 +1201,10 @@ public void streamClosedAndRetryWithBackoff() { assertThat(retryTask.getDelay(TimeUnit.NANOSECONDS)).isEqualTo(20L); fakeClock.forwardNanos(20L); call = resourceDiscoveryCalls.poll(); - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "63", LDS_RESOURCE, ResourceType.LDS.typeUrl(), ""))); - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "5", RDS_RESOURCE, ResourceType.RDS.typeUrl(), ""))); - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "", CDS_RESOURCE, ResourceType.CDS.typeUrl(), ""))); - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "", EDS_RESOURCE, ResourceType.EDS.typeUrl(), ""))); + call.verifyRequest(NODE, "63", Collections.singletonList(LDS_RESOURCE), ResourceType.LDS, ""); + call.verifyRequest(NODE, "5", Collections.singletonList(RDS_RESOURCE), ResourceType.RDS, ""); + call.verifyRequest(NODE, "", Collections.singletonList(CDS_RESOURCE), ResourceType.CDS, ""); + call.verifyRequest(NODE, "", Collections.singletonList(EDS_RESOURCE), ResourceType.EDS, ""); inOrder.verifyNoMoreInteractions(); } @@ -1460,8 +1213,8 @@ public void streamClosedAndRetryWithBackoff() { public void streamClosedAndRetryRaceWithAddRemoveWatchers() { xdsClient.watchLdsResource(LDS_RESOURCE, ldsResourceWatcher); xdsClient.watchRdsResource(RDS_RESOURCE, rdsResourceWatcher); - RpcCall call = resourceDiscoveryCalls.poll(); - call.responseObserver.onError(Status.UNAVAILABLE.asException()); + DiscoveryRpcCall call = resourceDiscoveryCalls.poll(); + call.sendError(Status.UNAVAILABLE.asException()); verify(ldsResourceWatcher).onError(errorCaptor.capture()); assertThat(errorCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE); verify(rdsResourceWatcher).onError(errorCaptor.capture()); @@ -1476,26 +1229,17 @@ public void streamClosedAndRetryRaceWithAddRemoveWatchers() { xdsClient.watchEdsResource(EDS_RESOURCE, edsResourceWatcher); fakeClock.forwardNanos(10L); call = resourceDiscoveryCalls.poll(); - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "", CDS_RESOURCE, ResourceType.CDS.typeUrl(), ""))); - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "", EDS_RESOURCE, ResourceType.EDS.typeUrl(), ""))); - verifyNoMoreInteractions(call.requestObserver); + call.verifyRequest(NODE, "", Collections.singletonList(CDS_RESOURCE), ResourceType.CDS, ""); + call.verifyRequest(NODE, "", Collections.singletonList(EDS_RESOURCE), ResourceType.EDS, ""); + call.verifyNoMoreRequest(); List listeners = ImmutableList.of( - Any.pack(buildListener(LDS_RESOURCE, - Any.pack( - HttpConnectionManager.newBuilder() - .setRouteConfig(buildRouteConfiguration("do not care", buildVirtualHosts(2))) - .build())))); - DiscoveryResponse response = - buildDiscoveryResponse("0", listeners, ResourceType.LDS.typeUrl(), "0000"); - call.responseObserver.onNext(response); - List routeConfigs = - ImmutableList.of(Any.pack(buildRouteConfiguration(RDS_RESOURCE, buildVirtualHosts(2)))); - response = - buildDiscoveryResponse("0", routeConfigs, ResourceType.RDS.typeUrl(), "0000"); - call.responseObserver.onNext(response); + Any.pack(mf.buildListener(LDS_RESOURCE, + mf.buildRouteConfiguration("do not care", mf.buildOpaqueVirtualHosts(2))))); + call.sendResponse("0", listeners, ResourceType.LDS, "0000"); + List routeConfigs = ImmutableList.of( + Any.pack(mf.buildRouteConfiguration(RDS_RESOURCE, mf.buildOpaqueVirtualHosts(2)))); + call.sendResponse("0", routeConfigs, ResourceType.RDS, "0000"); verifyNoMoreInteractions(ldsResourceWatcher, rdsResourceWatcher); } @@ -1506,7 +1250,7 @@ public void streamClosedAndRetryRestartsResourceInitialFetchTimerForUnresolvedRe xdsClient.watchRdsResource(RDS_RESOURCE, rdsResourceWatcher); xdsClient.watchCdsResource(CDS_RESOURCE, cdsResourceWatcher); xdsClient.watchEdsResource(EDS_RESOURCE, edsResourceWatcher); - RpcCall call = resourceDiscoveryCalls.poll(); + DiscoveryRpcCall call = resourceDiscoveryCalls.poll(); ScheduledTask ldsResourceTimeout = Iterables.getOnlyElement(fakeClock.getPendingTasks(LDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)); ScheduledTask rdsResourceTimeout = @@ -1515,26 +1259,18 @@ public void streamClosedAndRetryRestartsResourceInitialFetchTimerForUnresolvedRe Iterables.getOnlyElement(fakeClock.getPendingTasks(CDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)); ScheduledTask edsResourceTimeout = Iterables.getOnlyElement(fakeClock.getPendingTasks(EDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)); - List listeners = ImmutableList.of( - Any.pack(buildListener(LDS_RESOURCE, - Any.pack( - HttpConnectionManager.newBuilder() - .setRouteConfig(buildRouteConfiguration("do not care", buildVirtualHosts(2))) - .build())))); - DiscoveryResponse response = - buildDiscoveryResponse("0", listeners, ResourceType.LDS.typeUrl(), "0000"); - call.responseObserver.onNext(response); + Any.pack(mf.buildListener(LDS_RESOURCE, mf.buildRouteConfiguration("do not care", + mf.buildOpaqueVirtualHosts(2))))); + call.sendResponse("0", listeners, ResourceType.LDS, "0000"); assertThat(ldsResourceTimeout.isCancelled()).isTrue(); - List routeConfigs = - ImmutableList.of(Any.pack(buildRouteConfiguration(RDS_RESOURCE, buildVirtualHosts(2)))); - response = - buildDiscoveryResponse("0", routeConfigs, ResourceType.RDS.typeUrl(), "0000"); - call.responseObserver.onNext(response); + List routeConfigs = ImmutableList.of( + Any.pack(mf.buildRouteConfiguration(RDS_RESOURCE, mf.buildOpaqueVirtualHosts(2)))); + call.sendResponse("0", routeConfigs, ResourceType.RDS, "0000"); assertThat(rdsResourceTimeout.isCancelled()).isTrue(); - call.responseObserver.onError(Status.UNAVAILABLE.asException()); + call.sendError(Status.UNAVAILABLE.asException()); assertThat(cdsResourceTimeout.isCancelled()).isTrue(); assertThat(edsResourceTimeout.isCancelled()).isTrue(); @@ -1552,32 +1288,21 @@ public void streamClosedAndRetryRestartsResourceInitialFetchTimerForUnresolvedRe public void reportLoadStatsToServer() { String clusterName = "cluster-foo.googleapis.com"; xdsClient.addClientStats(clusterName, null); - ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(null); - RpcCall lrsCall = loadReportCalls.poll(); - verify(lrsCall.requestObserver).onNext(requestCaptor.capture()); - assertThat(requestCaptor.getValue().getClusterStatsCount()) - .isEqualTo(0); // initial request - - lrsCall.responseObserver.onNext( - LoadStatsResponse.newBuilder() - .addClusters(clusterName) - .setLoadReportingInterval(Durations.fromNanos(1000L)) - .build()); + LrsRpcCall lrsCall = loadReportCalls.poll(); + lrsCall.verifyNextReportClusters(Collections.emptyList()); // initial LRS request + + lrsCall.sendResponse(Collections.singletonList(clusterName), 1000L); fakeClock.forwardNanos(1000L); - verify(lrsCall.requestObserver, times(2)).onNext(requestCaptor.capture()); - ClusterStats report = Iterables.getOnlyElement(requestCaptor.getValue().getClusterStatsList()); - assertThat(report.getClusterName()).isEqualTo(clusterName); + lrsCall.verifyNextReportClusters(Collections.singletonList(new String[] {clusterName, null})); xdsClient.removeClientStats(clusterName, null); fakeClock.forwardNanos(1000L); - verify(lrsCall.requestObserver, times(3)).onNext(requestCaptor.capture()); - assertThat(requestCaptor.getValue().getClusterStatsCount()) - .isEqualTo(0); // no more stats reported + lrsCall.verifyNextReportClusters(Collections.emptyList()); // no more stats reported // See more test on LoadReportClientTest.java } - private RpcCall startResourceWatcher( + private DiscoveryRpcCall startResourceWatcher( ResourceType type, String name, ResourceWatcher watcher) { FakeClock.TaskFilter timeoutTaskFilter; switch (type) { @@ -1601,9 +1326,8 @@ private RpcCall startResourceWatcher( default: throw new AssertionError("should never be here"); } - RpcCall call = resourceDiscoveryCalls.poll(); - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequest(NODE, "", name, type.typeUrl(), ""))); + DiscoveryRpcCall call = resourceDiscoveryCalls.poll(); + call.verifyRequest(NODE, "", Collections.singletonList(name), type, ""); ScheduledTask timeoutTask = Iterables.getOnlyElement(fakeClock.getPendingTasks(timeoutTaskFilter)); assertThat(timeoutTask.getDelay(TimeUnit.SECONDS)) @@ -1611,59 +1335,60 @@ private RpcCall startResourceWatcher( return call; } - /** - * Matcher for DiscoveryRequest without the comparison of error_details field, which is used for - * management server debugging purposes. - * - *

In general, if you are sure error_details field should not be set in a DiscoveryRequest, - * compare with message equality. Otherwise, this matcher is handy for comparing other fields - * only. - */ - private static class DiscoveryRequestMatcher implements ArgumentMatcher { - private final Node node; - private final String versionInfo; - private final String typeUrl; - private final Set resourceNames; - private final String responseNonce; - - private DiscoveryRequestMatcher( - Node node, String versionInfo, List resourceNames, String typeUrl, - String responseNonce) { - this.node = node; - this.versionInfo = versionInfo; - this.resourceNames = new HashSet<>(resourceNames); - this.typeUrl = typeUrl; - this.responseNonce = responseNonce; - } + protected abstract static class DiscoveryRpcCall { - @Override - public boolean matches(DiscoveryRequest argument) { - if (!typeUrl.equals(argument.getTypeUrl())) { - return false; - } - if (!versionInfo.equals(argument.getVersionInfo())) { - return false; - } - if (!responseNonce.equals(argument.getResponseNonce())) { - return false; - } - if (!node.toEnvoyProtoNode().equals(argument.getNode())) { - return false; - } - if (!resourceNames.equals(new HashSet<>(argument.getResourceNamesList()))) { - return false; - } - return argument.getNode().equals(NODE.toEnvoyProtoNode()); - } + protected abstract void verifyRequest(Node node, String versionInfo, List resources, + ResourceType type, String nonce); + + protected abstract void verifyNoMoreRequest(); + + protected abstract void sendResponse(String versionInfo, List resources, + ResourceType type, String nonce); + + protected abstract void sendError(Throwable t); + + protected abstract void sendCompleted(); } - private static class RpcCall { - private final StreamObserver requestObserver; - private final StreamObserver responseObserver; + protected abstract static class LrsRpcCall { - RpcCall(StreamObserver requestObserver, StreamObserver responseObserver) { - this.requestObserver = requestObserver; - this.responseObserver = responseObserver; - } + /** + * Verifies a LRS request has been sent with ClusterStats of the given list of clusters. + */ + protected abstract void verifyNextReportClusters(List clusters); + + protected abstract void sendResponse(List clusters, long loadReportIntervalNano); + } + + protected abstract static class MessageFactory { + + protected abstract Message buildListener(String name, Message routeConfiguration); + + protected abstract Message buildListenerForRds(String name, String rdsResourceName); + + protected abstract Message buildRouteConfiguration(String name, + List virtualHostList); + + protected abstract List buildOpaqueVirtualHosts(int num); + + protected abstract Message buildCluster(String clusterName, @Nullable String edsServiceName, + boolean enableLrs, @Nullable Message upstreamTlsContext, + @Nullable Message circuitBreakers); + + protected abstract Message buildUpstreamTlsContext(String secretName, String targetUri); + + protected abstract Message buildCircuitBreakers(int highPriorityMaxRequests, + int defaultPriorityMaxRequests); + + protected abstract Message buildClusterLoadAssignment(String cluster, + List localityLbEndpoints, List dropOverloads); + + protected abstract Message buildLocalityLbEndpoints(String region, String zone, String subZone, + List lbEndpointList, int loadBalancingWeight, int priority); + + protected abstract Message buildLbEndpoint(String address, int port, String healthStatus, + int lbWeight); + + protected abstract Message buildDropOverload(String category, int dropPerMillion); } } diff --git a/xds/src/test/java/io/grpc/xds/ClientXdsClientTestV2.java b/xds/src/test/java/io/grpc/xds/ClientXdsClientTestV2.java deleted file mode 100644 index 2495f24ab9e..00000000000 --- a/xds/src/test/java/io/grpc/xds/ClientXdsClientTestV2.java +++ /dev/null @@ -1,313 +0,0 @@ -/* - * Copyright 2020 The gRPC Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.grpc.xds; - -import static com.google.common.truth.Truth.assertThat; -import static io.grpc.xds.XdsClientTestHelper.buildClusterV2; -import static io.grpc.xds.XdsClientTestHelper.buildDiscoveryRequestV2; -import static io.grpc.xds.XdsClientTestHelper.buildDiscoveryResponseV2; -import static io.grpc.xds.XdsClientTestHelper.buildSecureClusterV2; -import static io.grpc.xds.XdsClientTestHelper.buildUpstreamTlsContextV2; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; -import com.google.common.util.concurrent.MoreExecutors; -import com.google.protobuf.Any; -import io.envoyproxy.envoy.api.v2.DiscoveryRequest; -import io.envoyproxy.envoy.api.v2.DiscoveryResponse; -import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.SdsSecretConfig; -import io.envoyproxy.envoy.service.discovery.v2.AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceImplBase; -import io.envoyproxy.envoy.service.load_stats.v3.LoadReportingServiceGrpc.LoadReportingServiceImplBase; -import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsRequest; -import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsResponse; -import io.grpc.Context; -import io.grpc.Context.CancellationListener; -import io.grpc.ManagedChannel; -import io.grpc.inprocess.InProcessChannelBuilder; -import io.grpc.inprocess.InProcessServerBuilder; -import io.grpc.internal.BackoffPolicy; -import io.grpc.internal.FakeClock; -import io.grpc.internal.FakeClock.ScheduledTask; -import io.grpc.internal.FakeClock.TaskFilter; -import io.grpc.stub.StreamObserver; -import io.grpc.testing.GrpcCleanupRule; -import io.grpc.xds.AbstractXdsClient.ResourceType; -import io.grpc.xds.EnvoyProtoData.Node; -import io.grpc.xds.XdsClient.CdsResourceWatcher; -import io.grpc.xds.XdsClient.CdsUpdate; -import io.grpc.xds.XdsClient.EdsResourceWatcher; -import io.grpc.xds.XdsClient.LdsResourceWatcher; -import io.grpc.xds.XdsClient.RdsResourceWatcher; -import io.grpc.xds.XdsClient.ResourceWatcher; -import io.grpc.xds.XdsClient.XdsChannel; -import java.io.IOException; -import java.util.ArrayDeque; -import java.util.List; -import java.util.Queue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.mockito.ArgumentCaptor; -import org.mockito.Captor; -import org.mockito.Mock; -import org.mockito.MockitoAnnotations; - -/** - * Tests for {@link ClientXdsClient} for xDS v2. - */ -@RunWith(JUnit4.class) -public class ClientXdsClientTestV2 { - private static final String CDS_RESOURCE = "cluster.googleapis.com"; - private static final Node NODE = Node.newBuilder().build(); - - private static final TaskFilter LDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER = - new TaskFilter() { - @Override - public boolean shouldAccept(Runnable command) { - return command.toString().contains(ResourceType.LDS.toString()); - } - }; - - private static final TaskFilter RDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER = - new TaskFilter() { - @Override - public boolean shouldAccept(Runnable command) { - return command.toString().contains(ResourceType.RDS.toString()); - } - }; - - private static final TaskFilter CDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER = - new TaskFilter() { - @Override - public boolean shouldAccept(Runnable command) { - return command.toString().contains(ResourceType.CDS.toString()); - } - }; - - private static final TaskFilter EDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER = - new TaskFilter() { - @Override - public boolean shouldAccept(Runnable command) { - return command.toString().contains(ResourceType.EDS.toString()); - } - }; - - @Rule - public final GrpcCleanupRule cleanupRule = new GrpcCleanupRule(); - - private final FakeClock fakeClock = new FakeClock(); - private final Queue> resourceDiscoveryCalls = - new ArrayDeque<>(); - private final Queue> loadReportCalls = - new ArrayDeque<>(); - private final AtomicBoolean adsEnded = new AtomicBoolean(true); - private final AtomicBoolean lrsEnded = new AtomicBoolean(true); - - @Captor - private ArgumentCaptor cdsUpdateCaptor; - @Mock - private BackoffPolicy.Provider backoffPolicyProvider; - @Mock - private BackoffPolicy backoffPolicy1; - @Mock - private BackoffPolicy backoffPolicy2; - @Mock - private CdsResourceWatcher cdsResourceWatcher; - - private ManagedChannel channel; - private ClientXdsClient xdsClient; - - @Before - public void setUp() throws IOException { - MockitoAnnotations.initMocks(this); - when(backoffPolicyProvider.get()).thenReturn(backoffPolicy1, backoffPolicy2); - when(backoffPolicy1.nextBackoffNanos()).thenReturn(10L, 100L); - when(backoffPolicy2.nextBackoffNanos()).thenReturn(20L, 200L); - - final String serverName = InProcessServerBuilder.generateName(); - AggregatedDiscoveryServiceImplBase adsServiceImpl = new AggregatedDiscoveryServiceImplBase() { - @Override - public StreamObserver streamAggregatedResources( - final StreamObserver responseObserver) { - assertThat(adsEnded.get()).isTrue(); // ensure previous call was ended - adsEnded.set(false); - @SuppressWarnings("unchecked") - StreamObserver requestObserver = mock(StreamObserver.class); - RpcCall call = - new RpcCall<>(requestObserver, responseObserver); - resourceDiscoveryCalls.offer(call); - Context.current().addListener( - new CancellationListener() { - @Override - public void cancelled(Context context) { - adsEnded.set(true); - } - }, MoreExecutors.directExecutor()); - return requestObserver; - } - }; - - LoadReportingServiceImplBase lrsServiceImpl = new LoadReportingServiceImplBase() { - @Override - public StreamObserver streamLoadStats( - StreamObserver responseObserver) { - assertThat(lrsEnded.get()).isTrue(); - lrsEnded.set(false); - @SuppressWarnings("unchecked") - StreamObserver requestObserver = mock(StreamObserver.class); - RpcCall call = - new RpcCall<>(requestObserver, responseObserver); - Context.current().addListener( - new CancellationListener() { - @Override - public void cancelled(Context context) { - lrsEnded.set(true); - } - }, MoreExecutors.directExecutor()); - loadReportCalls.offer(call); - return requestObserver; - } - }; - - cleanupRule.register( - InProcessServerBuilder - .forName(serverName) - .addService(adsServiceImpl) - .addService(lrsServiceImpl) - .directExecutor() - .build() - .start()); - channel = - cleanupRule.register(InProcessChannelBuilder.forName(serverName).directExecutor().build()); - - xdsClient = - new ClientXdsClient( - new XdsChannel(channel, /* useProtocolV3= */ false), - EnvoyProtoData.Node.newBuilder().build(), - fakeClock.getScheduledExecutorService(), - backoffPolicyProvider, - fakeClock.getStopwatchSupplier()); - - assertThat(resourceDiscoveryCalls).isEmpty(); - assertThat(loadReportCalls).isEmpty(); - } - - @After - public void tearDown() { - xdsClient.shutdown(); - assertThat(adsEnded.get()).isTrue(); - assertThat(lrsEnded.get()).isTrue(); - assertThat(channel.isShutdown()).isTrue(); - assertThat(fakeClock.getPendingTasks()).isEmpty(); - } - - /** - * CDS response containing UpstreamTlsContext for a cluster. - */ - @Test - public void cdsResponseV2WithUpstreamTlsContext() { - RpcCall call = - startResourceWatcher(ResourceType.CDS, CDS_RESOURCE, cdsResourceWatcher); - - // Management server sends back CDS response with UpstreamTlsContext. - io.envoyproxy.envoy.api.v2.auth.UpstreamTlsContext testUpstreamTlsContext = - buildUpstreamTlsContextV2("secret1", "unix:/var/uds2"); - List clusters = ImmutableList.of( - Any.pack(buildClusterV2("cluster-bar.googleapis.com", null, false)), - Any.pack(buildSecureClusterV2(CDS_RESOURCE, - "eds-cluster-foo.googleapis.com", true, testUpstreamTlsContext)), - Any.pack(buildClusterV2("cluster-baz.googleapis.com", null, false))); - DiscoveryResponse response = - buildDiscoveryResponseV2("0", clusters, AbstractXdsClient.ADS_TYPE_URL_CDS_V2, "0000"); - call.responseObserver.onNext(response); - - // Client sent an ACK CDS request. - verify(call.requestObserver) - .onNext(eq(buildDiscoveryRequestV2(NODE, "0", CDS_RESOURCE, - AbstractXdsClient.ADS_TYPE_URL_CDS_V2, "0000"))); - verify(cdsResourceWatcher, times(1)).onChanged(cdsUpdateCaptor.capture()); - CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue(); - EnvoyServerProtoData.UpstreamTlsContext upstreamTlsContext = cdsUpdate - .getUpstreamTlsContext(); - SdsSecretConfig validationContextSdsSecretConfig = upstreamTlsContext.getCommonTlsContext() - .getValidationContextSdsSecretConfig(); - assertThat(validationContextSdsSecretConfig.getName()).isEqualTo("secret1"); - assertThat( - Iterables.getOnlyElement( - validationContextSdsSecretConfig - .getSdsConfig() - .getApiConfigSource() - .getGrpcServicesList()) - .getGoogleGrpc() - .getTargetUri()) - .isEqualTo("unix:/var/uds2"); - } - - private RpcCall startResourceWatcher( - ResourceType type, String name, ResourceWatcher watcher) { - TaskFilter timeoutTaskFilter; - switch (type) { - case LDS: - timeoutTaskFilter = LDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER; - xdsClient.watchLdsResource(name, (LdsResourceWatcher) watcher); - break; - case RDS: - timeoutTaskFilter = RDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER; - xdsClient.watchRdsResource(name, (RdsResourceWatcher) watcher); - break; - case CDS: - timeoutTaskFilter = CDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER; - xdsClient.watchCdsResource(name, (CdsResourceWatcher) watcher); - break; - case EDS: - timeoutTaskFilter = EDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER; - xdsClient.watchEdsResource(name, (EdsResourceWatcher) watcher); - break; - case UNKNOWN: - default: - throw new AssertionError("should never be here"); - } - RpcCall call = resourceDiscoveryCalls.poll(); - verify(call.requestObserver).onNext( - eq(buildDiscoveryRequestV2(NODE, "", name, type.typeUrlV2(), ""))); - ScheduledTask timeoutTask = - Iterables.getOnlyElement(fakeClock.getPendingTasks(timeoutTaskFilter)); - assertThat(timeoutTask.getDelay(TimeUnit.SECONDS)) - .isEqualTo(ClientXdsClient.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC); - return call; - } - - private static class RpcCall { - private final StreamObserver requestObserver; - private final StreamObserver responseObserver; - - RpcCall(StreamObserver requestObserver, StreamObserver responseObserver) { - this.requestObserver = requestObserver; - this.responseObserver = responseObserver; - } - } -} diff --git a/xds/src/test/java/io/grpc/xds/ClientXdsClientV2Test.java b/xds/src/test/java/io/grpc/xds/ClientXdsClientV2Test.java new file mode 100644 index 00000000000..83f22f04741 --- /dev/null +++ b/xds/src/test/java/io/grpc/xds/ClientXdsClientV2Test.java @@ -0,0 +1,505 @@ +/* + * Copyright 2020 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds; + +import static com.google.common.truth.Truth.assertThat; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; + +import com.google.common.util.concurrent.MoreExecutors; +import com.google.protobuf.Any; +import com.google.protobuf.Message; +import com.google.protobuf.UInt32Value; +import com.google.protobuf.util.Durations; +import io.envoyproxy.envoy.api.v2.Cluster; +import io.envoyproxy.envoy.api.v2.Cluster.DiscoveryType; +import io.envoyproxy.envoy.api.v2.Cluster.EdsClusterConfig; +import io.envoyproxy.envoy.api.v2.Cluster.LbPolicy; +import io.envoyproxy.envoy.api.v2.ClusterLoadAssignment; +import io.envoyproxy.envoy.api.v2.ClusterLoadAssignment.Policy; +import io.envoyproxy.envoy.api.v2.ClusterLoadAssignment.Policy.DropOverload; +import io.envoyproxy.envoy.api.v2.DiscoveryRequest; +import io.envoyproxy.envoy.api.v2.DiscoveryResponse; +import io.envoyproxy.envoy.api.v2.Listener; +import io.envoyproxy.envoy.api.v2.RouteConfiguration; +import io.envoyproxy.envoy.api.v2.auth.CommonTlsContext; +import io.envoyproxy.envoy.api.v2.auth.SdsSecretConfig; +import io.envoyproxy.envoy.api.v2.auth.UpstreamTlsContext; +import io.envoyproxy.envoy.api.v2.cluster.CircuitBreakers; +import io.envoyproxy.envoy.api.v2.cluster.CircuitBreakers.Thresholds; +import io.envoyproxy.envoy.api.v2.core.Address; +import io.envoyproxy.envoy.api.v2.core.AggregatedConfigSource; +import io.envoyproxy.envoy.api.v2.core.ApiConfigSource; +import io.envoyproxy.envoy.api.v2.core.ConfigSource; +import io.envoyproxy.envoy.api.v2.core.GrpcService; +import io.envoyproxy.envoy.api.v2.core.GrpcService.GoogleGrpc; +import io.envoyproxy.envoy.api.v2.core.HealthStatus; +import io.envoyproxy.envoy.api.v2.core.Locality; +import io.envoyproxy.envoy.api.v2.core.Node; +import io.envoyproxy.envoy.api.v2.core.RoutingPriority; +import io.envoyproxy.envoy.api.v2.core.SelfConfigSource; +import io.envoyproxy.envoy.api.v2.core.SocketAddress; +import io.envoyproxy.envoy.api.v2.core.TransportSocket; +import io.envoyproxy.envoy.api.v2.endpoint.ClusterStats; +import io.envoyproxy.envoy.api.v2.endpoint.Endpoint; +import io.envoyproxy.envoy.api.v2.endpoint.LbEndpoint; +import io.envoyproxy.envoy.api.v2.endpoint.LocalityLbEndpoints; +import io.envoyproxy.envoy.api.v2.listener.FilterChain; +import io.envoyproxy.envoy.api.v2.route.Route; +import io.envoyproxy.envoy.api.v2.route.RouteAction; +import io.envoyproxy.envoy.api.v2.route.VirtualHost; +import io.envoyproxy.envoy.config.filter.network.http_connection_manager.v2.HttpConnectionManager; +import io.envoyproxy.envoy.config.filter.network.http_connection_manager.v2.Rds; +import io.envoyproxy.envoy.config.listener.v2.ApiListener; +import io.envoyproxy.envoy.service.discovery.v2.AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceImplBase; +import io.envoyproxy.envoy.service.load_stats.v2.LoadReportingServiceGrpc.LoadReportingServiceImplBase; +import io.envoyproxy.envoy.service.load_stats.v2.LoadStatsRequest; +import io.envoyproxy.envoy.service.load_stats.v2.LoadStatsResponse; +import io.envoyproxy.envoy.type.FractionalPercent; +import io.envoyproxy.envoy.type.FractionalPercent.DenominatorType; +import io.grpc.BindableService; +import io.grpc.Context; +import io.grpc.Context.CancellationListener; +import io.grpc.stub.StreamObserver; +import io.grpc.xds.AbstractXdsClient.ResourceType; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import javax.annotation.Nullable; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.ArgumentMatcher; +import org.mockito.InOrder; + +/** + * Tests for {@link ClientXdsClient} with protocol version v2. + */ +@RunWith(JUnit4.class) +public class ClientXdsClientV2Test extends ClientXdsClientTestBase { + + @Override + protected BindableService createAdsService() { + return new AggregatedDiscoveryServiceImplBase() { + @Override + public StreamObserver streamAggregatedResources( + final StreamObserver responseObserver) { + assertThat(adsEnded.get()).isTrue(); // ensure previous call was ended + adsEnded.set(false); + @SuppressWarnings("unchecked") + StreamObserver requestObserver = mock(StreamObserver.class); + DiscoveryRpcCall call = new DiscoveryRpcCallV2(requestObserver, responseObserver); + resourceDiscoveryCalls.offer(call); + Context.current().addListener( + new CancellationListener() { + @Override + public void cancelled(Context context) { + adsEnded.set(true); + } + }, MoreExecutors.directExecutor()); + return requestObserver; + } + }; + } + + @Override + protected BindableService createLrsService() { + return new LoadReportingServiceImplBase() { + @Override + public StreamObserver streamLoadStats( + StreamObserver responseObserver) { + assertThat(lrsEnded.get()).isTrue(); + lrsEnded.set(false); + @SuppressWarnings("unchecked") + StreamObserver requestObserver = mock(StreamObserver.class); + LrsRpcCall call = new LrsRpcCallV2(requestObserver, responseObserver); + Context.current().addListener( + new CancellationListener() { + @Override + public void cancelled(Context context) { + lrsEnded.set(true); + } + }, MoreExecutors.directExecutor()); + loadReportCalls.offer(call); + return requestObserver; + } + }; + } + + @Override + protected MessageFactory createMessageFactory() { + return new MessageFactoryV2(); + } + + @Override + protected boolean useProtocolV3() { + return false; + } + + private static class DiscoveryRpcCallV2 extends DiscoveryRpcCall { + StreamObserver requestObserver; + StreamObserver responseObserver; + + private DiscoveryRpcCallV2(StreamObserver requestObserver, + StreamObserver responseObserver) { + this.requestObserver = requestObserver; + this.responseObserver = responseObserver; + } + + @Override + protected void verifyRequest(EnvoyProtoData.Node node, String versionInfo, + List resources, ResourceType type, String nonce) { + verify(requestObserver).onNext(argThat(new DiscoveryRequestMatcher( + node.toEnvoyProtoNodeV2(), versionInfo, resources, type.typeUrlV2(), nonce))); + } + + @Override + protected void verifyNoMoreRequest() { + verifyNoMoreInteractions(requestObserver); + } + + @Override + protected void sendResponse(String versionInfo, List resources, ResourceType type, + String nonce) { + DiscoveryResponse response = + DiscoveryResponse.newBuilder() + .setVersionInfo(versionInfo) + .addAllResources(resources) + .setTypeUrl(type.typeUrl()) + .setNonce(nonce) + .build(); + responseObserver.onNext(response); + } + + @Override + protected void sendError(Throwable t) { + responseObserver.onError(t); + } + + @Override + protected void sendCompleted() { + responseObserver.onCompleted(); + } + } + + private static class LrsRpcCallV2 extends LrsRpcCall { + private final StreamObserver requestObserver; + private final StreamObserver responseObserver; + private final InOrder inOrder; + + private LrsRpcCallV2(StreamObserver requestObserver, + StreamObserver responseObserver) { + this.requestObserver = requestObserver; + this.responseObserver = responseObserver; + inOrder = inOrder(requestObserver); + } + + @Override + protected void verifyNextReportClusters(List clusters) { + inOrder.verify(requestObserver).onNext(argThat(new LrsRequestMatcher(clusters))); + } + + @Override + protected void sendResponse(List clusters, long loadReportIntervalNano) { + LoadStatsResponse response = + LoadStatsResponse.newBuilder() + .addAllClusters(clusters) + .setLoadReportingInterval(Durations.fromNanos(loadReportIntervalNano)) + .build(); + responseObserver.onNext(response); + } + } + + private static class MessageFactoryV2 extends MessageFactory { + + @Override + protected Message buildListener(String name, Message routeConfiguration) { + return Listener.newBuilder() + .setName(name) + .setAddress(Address.getDefaultInstance()) + .addFilterChains(FilterChain.getDefaultInstance()) + .setApiListener( + ApiListener.newBuilder().setApiListener(Any.pack( + HttpConnectionManager.newBuilder() + .setRouteConfig((RouteConfiguration) routeConfiguration).build()))) + .build(); + } + + @Override + protected Message buildListenerForRds(String name, String rdsResourceName) { + return Listener.newBuilder() + .setName(name) + .setAddress(Address.getDefaultInstance()) + .addFilterChains(FilterChain.getDefaultInstance()) + .setApiListener( + ApiListener.newBuilder().setApiListener(Any.pack( + HttpConnectionManager.newBuilder() + .setRds( + Rds.newBuilder() + .setRouteConfigName(rdsResourceName) + .setConfigSource( + ConfigSource.newBuilder() + .setAds(AggregatedConfigSource.getDefaultInstance()))) + .build()))) + .build(); + } + + @Override + protected Message buildRouteConfiguration(String name, List virtualHostList) { + RouteConfiguration.Builder builder = RouteConfiguration.newBuilder(); + builder.setName(name); + for (Message virtualHost : virtualHostList) { + builder.addVirtualHosts((VirtualHost) virtualHost); + } + return builder.build(); + } + + @Override + protected List buildOpaqueVirtualHosts(int num) { + List virtualHosts = new ArrayList<>(num); + for (int i = 0; i < num; i++) { + VirtualHost virtualHost = + VirtualHost.newBuilder() + .setName(num + ": do not care") + .addDomains("do not care") + .addRoutes( + Route.newBuilder() + .setRoute(RouteAction.newBuilder().setCluster("do not care")) + .setMatch(io.envoyproxy.envoy.api.v2.route.RouteMatch.newBuilder() + .setPrefix("do not care"))) + .build(); + virtualHosts.add(virtualHost); + } + return virtualHosts; + } + + @Override + protected Message buildCluster(String clusterName, @Nullable String edsServiceName, + boolean enableLrs, @Nullable Message upstreamTlsContext, + @Nullable Message circuitBreakers) { + Cluster.Builder builder = Cluster.newBuilder(); + builder.setName(clusterName); + builder.setType(DiscoveryType.EDS); + EdsClusterConfig.Builder edsClusterConfigBuilder = EdsClusterConfig.newBuilder(); + edsClusterConfigBuilder.setEdsConfig( + ConfigSource.newBuilder() + .setAds(AggregatedConfigSource.getDefaultInstance())); + if (edsServiceName != null) { + edsClusterConfigBuilder.setServiceName(edsServiceName); + } + builder.setEdsClusterConfig(edsClusterConfigBuilder); + builder.setLbPolicy(LbPolicy.ROUND_ROBIN); + if (enableLrs) { + builder.setLrsServer( + ConfigSource.newBuilder() + .setSelf(SelfConfigSource.getDefaultInstance())); + } + if (upstreamTlsContext != null) { + builder.setTransportSocket( + TransportSocket.newBuilder() + .setName("envoy.transport_sockets.tls") + .setTypedConfig(Any.pack(upstreamTlsContext))); + } + if (circuitBreakers != null) { + builder.setCircuitBreakers((CircuitBreakers) circuitBreakers); + } + return builder.build(); + } + + @Override + protected Message buildUpstreamTlsContext(String secretName, String targetUri) { + GrpcService grpcService = + GrpcService.newBuilder() + .setGoogleGrpc(GoogleGrpc.newBuilder().setTargetUri(targetUri)) + .build(); + ConfigSource sdsConfig = + ConfigSource.newBuilder() + .setApiConfigSource(ApiConfigSource.newBuilder().addGrpcServices(grpcService)) + .build(); + SdsSecretConfig validationContextSdsSecretConfig = + SdsSecretConfig.newBuilder() + .setName(secretName) + .setSdsConfig(sdsConfig) + .build(); + return UpstreamTlsContext.newBuilder() + .setCommonTlsContext( + CommonTlsContext.newBuilder() + .setValidationContextSdsSecretConfig(validationContextSdsSecretConfig)) + .build(); + } + + @Override + protected Message buildCircuitBreakers(int highPriorityMaxRequests, + int defaultPriorityMaxRequests) { + return CircuitBreakers.newBuilder() + .addThresholds( + Thresholds.newBuilder() + .setPriority(RoutingPriority.HIGH) + .setMaxRequests(UInt32Value.newBuilder().setValue(highPriorityMaxRequests))) + .addThresholds( + Thresholds.newBuilder() + .setPriority(RoutingPriority.DEFAULT) + .setMaxRequests(UInt32Value.newBuilder().setValue(defaultPriorityMaxRequests))) + .build(); + } + + @Override + protected Message buildClusterLoadAssignment(String cluster, + List localityLbEndpointsList, List dropOverloadList) { + ClusterLoadAssignment.Builder builder = ClusterLoadAssignment.newBuilder(); + builder.setClusterName(cluster); + for (Message localityLbEndpoints : localityLbEndpointsList) { + builder.addEndpoints((LocalityLbEndpoints) localityLbEndpoints); + } + Policy.Builder policyBuilder = Policy.newBuilder(); + for (Message dropOverload : dropOverloadList) { + policyBuilder.addDropOverloads((DropOverload) dropOverload); + } + builder.setPolicy(policyBuilder); + return builder.build(); + } + + @Override + protected Message buildLocalityLbEndpoints(String region, String zone, String subZone, + List lbEndpointList, int loadBalancingWeight, int priority) { + LocalityLbEndpoints.Builder builder = LocalityLbEndpoints.newBuilder(); + builder.setLocality( + Locality.newBuilder().setRegion(region).setZone(zone).setSubZone(subZone)); + for (Message lbEndpoint : lbEndpointList) { + builder.addLbEndpoints((LbEndpoint) lbEndpoint); + } + builder.setLoadBalancingWeight(UInt32Value.of(loadBalancingWeight)); + builder.setPriority(priority); + return builder.build(); + } + + @Override + protected Message buildLbEndpoint(String address, int port, String healthStatus, + int lbWeight) { + HealthStatus status; + switch (healthStatus) { + case "unknown": + status = HealthStatus.UNKNOWN; + break; + case "healthy": + status = HealthStatus.HEALTHY; + break; + case "unhealthy": + status = HealthStatus.UNHEALTHY; + break; + case "draining": + status = HealthStatus.DRAINING; + break; + case "timeout": + status = HealthStatus.TIMEOUT; + break; + case "degraded": + status = HealthStatus.DEGRADED; + break; + default: + status = HealthStatus.UNRECOGNIZED; + } + return LbEndpoint.newBuilder() + .setEndpoint( + Endpoint.newBuilder().setAddress( + Address.newBuilder().setSocketAddress( + SocketAddress.newBuilder().setAddress(address).setPortValue(port)))) + .setHealthStatus(status) + .setLoadBalancingWeight(UInt32Value.of(lbWeight)) + .build(); + } + + @Override + protected Message buildDropOverload(String category, int dropPerMillion) { + return DropOverload.newBuilder() + .setCategory(category) + .setDropPercentage( + FractionalPercent.newBuilder() + .setNumerator(dropPerMillion) + .setDenominator(DenominatorType.MILLION)) + .build(); + } + } + + /** + * Matches a {@link DiscoveryRequest} with the same node metadata, versionInfo, typeUrl, + * response nonce and collection of resource names regardless of order. + */ + private static class DiscoveryRequestMatcher implements ArgumentMatcher { + private final Node node; + private final String versionInfo; + private final String typeUrl; + private final Set resources; + private final String responseNonce; + + private DiscoveryRequestMatcher(Node node, String versionInfo, List resources, + String typeUrl, String responseNonce) { + this.node = node; + this.versionInfo = versionInfo; + this.resources = new HashSet<>(resources); + this.typeUrl = typeUrl; + this.responseNonce = responseNonce; + } + + @Override + public boolean matches(DiscoveryRequest argument) { + if (!typeUrl.equals(argument.getTypeUrl())) { + return false; + } + if (!versionInfo.equals(argument.getVersionInfo())) { + return false; + } + if (!responseNonce.equals(argument.getResponseNonce())) { + return false; + } + if (!resources.equals(new HashSet<>(argument.getResourceNamesList()))) { + return false; + } + return node.equals(argument.getNode()); + } + } + + /** + * Matches a {@link LoadStatsRequest} containing a collection of {@link ClusterStats} with + * the same list of clusterName:clusterServiceName pair. + */ + private static class LrsRequestMatcher implements ArgumentMatcher { + private final List expected; + + private LrsRequestMatcher(List clusterNames) { + expected = new ArrayList<>(); + for (String[] pair : clusterNames) { + expected.add(pair[0] + ":" + (pair[1] == null ? "" : pair[1])); + } + Collections.sort(expected); + } + + @Override + public boolean matches(LoadStatsRequest argument) { + List actual = new ArrayList<>(); + for (ClusterStats clusterStats : argument.getClusterStatsList()) { + actual.add(clusterStats.getClusterName() + ":" + clusterStats.getClusterServiceName()); + } + Collections.sort(actual); + return actual.equals(expected); + } + } +} diff --git a/xds/src/test/java/io/grpc/xds/ClientXdsClientV3Test.java b/xds/src/test/java/io/grpc/xds/ClientXdsClientV3Test.java new file mode 100644 index 00000000000..4ebf1a05ccf --- /dev/null +++ b/xds/src/test/java/io/grpc/xds/ClientXdsClientV3Test.java @@ -0,0 +1,505 @@ +/* + * Copyright 2020 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds; + +import static com.google.common.truth.Truth.assertThat; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; + +import com.google.common.util.concurrent.MoreExecutors; +import com.google.protobuf.Any; +import com.google.protobuf.Message; +import com.google.protobuf.UInt32Value; +import com.google.protobuf.util.Durations; +import io.envoyproxy.envoy.config.cluster.v3.CircuitBreakers; +import io.envoyproxy.envoy.config.cluster.v3.CircuitBreakers.Thresholds; +import io.envoyproxy.envoy.config.cluster.v3.Cluster; +import io.envoyproxy.envoy.config.cluster.v3.Cluster.DiscoveryType; +import io.envoyproxy.envoy.config.cluster.v3.Cluster.EdsClusterConfig; +import io.envoyproxy.envoy.config.cluster.v3.Cluster.LbPolicy; +import io.envoyproxy.envoy.config.core.v3.Address; +import io.envoyproxy.envoy.config.core.v3.AggregatedConfigSource; +import io.envoyproxy.envoy.config.core.v3.ApiConfigSource; +import io.envoyproxy.envoy.config.core.v3.ConfigSource; +import io.envoyproxy.envoy.config.core.v3.GrpcService; +import io.envoyproxy.envoy.config.core.v3.GrpcService.GoogleGrpc; +import io.envoyproxy.envoy.config.core.v3.HealthStatus; +import io.envoyproxy.envoy.config.core.v3.Locality; +import io.envoyproxy.envoy.config.core.v3.Node; +import io.envoyproxy.envoy.config.core.v3.RoutingPriority; +import io.envoyproxy.envoy.config.core.v3.SelfConfigSource; +import io.envoyproxy.envoy.config.core.v3.SocketAddress; +import io.envoyproxy.envoy.config.core.v3.TransportSocket; +import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment; +import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment.Policy; +import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment.Policy.DropOverload; +import io.envoyproxy.envoy.config.endpoint.v3.ClusterStats; +import io.envoyproxy.envoy.config.endpoint.v3.Endpoint; +import io.envoyproxy.envoy.config.endpoint.v3.LbEndpoint; +import io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints; +import io.envoyproxy.envoy.config.listener.v3.ApiListener; +import io.envoyproxy.envoy.config.listener.v3.FilterChain; +import io.envoyproxy.envoy.config.listener.v3.Listener; +import io.envoyproxy.envoy.config.route.v3.Route; +import io.envoyproxy.envoy.config.route.v3.RouteAction; +import io.envoyproxy.envoy.config.route.v3.RouteConfiguration; +import io.envoyproxy.envoy.config.route.v3.RouteMatch; +import io.envoyproxy.envoy.config.route.v3.VirtualHost; +import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager; +import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.Rds; +import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.CommonTlsContext; +import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.SdsSecretConfig; +import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext; +import io.envoyproxy.envoy.service.discovery.v3.AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceImplBase; +import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest; +import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse; +import io.envoyproxy.envoy.service.load_stats.v3.LoadReportingServiceGrpc.LoadReportingServiceImplBase; +import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsRequest; +import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsResponse; +import io.envoyproxy.envoy.type.v3.FractionalPercent; +import io.envoyproxy.envoy.type.v3.FractionalPercent.DenominatorType; +import io.grpc.BindableService; +import io.grpc.Context; +import io.grpc.Context.CancellationListener; +import io.grpc.stub.StreamObserver; +import io.grpc.xds.AbstractXdsClient.ResourceType; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import javax.annotation.Nullable; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.ArgumentMatcher; +import org.mockito.InOrder; + +/** + * Tests for {@link ClientXdsClient} with protocol version v3. + */ +@RunWith(JUnit4.class) +public class ClientXdsClientV3Test extends ClientXdsClientTestBase { + + @Override + protected BindableService createAdsService() { + return new AggregatedDiscoveryServiceImplBase() { + @Override + public StreamObserver streamAggregatedResources( + final StreamObserver responseObserver) { + assertThat(adsEnded.get()).isTrue(); // ensure previous call was ended + adsEnded.set(false); + @SuppressWarnings("unchecked") + StreamObserver requestObserver = mock(StreamObserver.class); + DiscoveryRpcCall call = new DiscoveryRpcCallV3(requestObserver, responseObserver); + resourceDiscoveryCalls.offer(call); + Context.current().addListener( + new CancellationListener() { + @Override + public void cancelled(Context context) { + adsEnded.set(true); + } + }, MoreExecutors.directExecutor()); + return requestObserver; + } + }; + } + + @Override + protected BindableService createLrsService() { + return new LoadReportingServiceImplBase() { + @Override + public StreamObserver streamLoadStats( + StreamObserver responseObserver) { + assertThat(lrsEnded.get()).isTrue(); + lrsEnded.set(false); + @SuppressWarnings("unchecked") + StreamObserver requestObserver = mock(StreamObserver.class); + LrsRpcCall call = new LrsRpcCallV3(requestObserver, responseObserver); + Context.current().addListener( + new CancellationListener() { + @Override + public void cancelled(Context context) { + lrsEnded.set(true); + } + }, MoreExecutors.directExecutor()); + loadReportCalls.offer(call); + return requestObserver; + } + }; + } + + @Override + protected MessageFactory createMessageFactory() { + return new MessageFactoryV3(); + } + + @Override + protected boolean useProtocolV3() { + return true; + } + + private static class DiscoveryRpcCallV3 extends DiscoveryRpcCall { + StreamObserver requestObserver; + StreamObserver responseObserver; + + private DiscoveryRpcCallV3(StreamObserver requestObserver, + StreamObserver responseObserver) { + this.requestObserver = requestObserver; + this.responseObserver = responseObserver; + } + + @Override + protected void verifyRequest(EnvoyProtoData.Node node, String versionInfo, + List resources, ResourceType type, String nonce) { + verify(requestObserver).onNext(argThat(new DiscoveryRequestMatcher( + node.toEnvoyProtoNode(), versionInfo, resources, type.typeUrl(), nonce))); + } + + @Override + protected void verifyNoMoreRequest() { + verifyNoMoreInteractions(requestObserver); + } + + @Override + protected void sendResponse(String versionInfo, List resources, ResourceType type, + String nonce) { + DiscoveryResponse response = + DiscoveryResponse.newBuilder() + .setVersionInfo(versionInfo) + .addAllResources(resources) + .setTypeUrl(type.typeUrl()) + .setNonce(nonce) + .build(); + responseObserver.onNext(response); + } + + @Override + protected void sendError(Throwable t) { + responseObserver.onError(t); + } + + @Override + protected void sendCompleted() { + responseObserver.onCompleted(); + } + } + + private static class LrsRpcCallV3 extends LrsRpcCall { + private final StreamObserver requestObserver; + private final StreamObserver responseObserver; + private final InOrder inOrder; + + private LrsRpcCallV3(StreamObserver requestObserver, + StreamObserver responseObserver) { + this.requestObserver = requestObserver; + this.responseObserver = responseObserver; + inOrder = inOrder(requestObserver); + } + + @Override + protected void verifyNextReportClusters(List clusters) { + inOrder.verify(requestObserver).onNext(argThat(new LrsRequestMatcher(clusters))); + } + + @Override + protected void sendResponse(List clusters, long loadReportIntervalNano) { + LoadStatsResponse response = + LoadStatsResponse.newBuilder() + .addAllClusters(clusters) + .setLoadReportingInterval(Durations.fromNanos(loadReportIntervalNano)) + .build(); + responseObserver.onNext(response); + } + } + + private static class MessageFactoryV3 extends MessageFactory { + + @Override + protected Message buildListener(String name, Message routeConfiguration) { + return Listener.newBuilder() + .setName(name) + .setAddress(Address.getDefaultInstance()) + .addFilterChains(FilterChain.getDefaultInstance()) + .setApiListener( + ApiListener.newBuilder().setApiListener(Any.pack( + HttpConnectionManager.newBuilder() + .setRouteConfig((RouteConfiguration) routeConfiguration).build()))) + .build(); + } + + @Override + protected Message buildListenerForRds(String name, String rdsResourceName) { + return Listener.newBuilder() + .setName(name) + .setAddress(Address.getDefaultInstance()) + .addFilterChains(FilterChain.getDefaultInstance()) + .setApiListener( + ApiListener.newBuilder().setApiListener(Any.pack( + HttpConnectionManager.newBuilder() + .setRds( + Rds.newBuilder() + .setRouteConfigName(rdsResourceName) + .setConfigSource( + ConfigSource.newBuilder() + .setAds(AggregatedConfigSource.getDefaultInstance()))) + .build()))) + .build(); + } + + @Override + protected Message buildRouteConfiguration(String name, List virtualHostList) { + RouteConfiguration.Builder builder = RouteConfiguration.newBuilder(); + builder.setName(name); + for (Message virtualHost : virtualHostList) { + builder.addVirtualHosts((VirtualHost) virtualHost); + } + return builder.build(); + } + + @Override + protected List buildOpaqueVirtualHosts(int num) { + List virtualHosts = new ArrayList<>(num); + for (int i = 0; i < num; i++) { + VirtualHost virtualHost = + VirtualHost.newBuilder() + .setName(num + ": do not care") + .addDomains("do not care") + .addRoutes( + Route.newBuilder() + .setRoute(RouteAction.newBuilder().setCluster("do not care")) + .setMatch(RouteMatch.newBuilder().setPrefix("do not care"))) + .build(); + virtualHosts.add(virtualHost); + } + return virtualHosts; + } + + @Override + protected Message buildCluster(String clusterName, @Nullable String edsServiceName, + boolean enableLrs, @Nullable Message upstreamTlsContext, + @Nullable Message circuitBreakers) { + Cluster.Builder builder = Cluster.newBuilder(); + builder.setName(clusterName); + builder.setType(DiscoveryType.EDS); + EdsClusterConfig.Builder edsClusterConfigBuilder = EdsClusterConfig.newBuilder(); + edsClusterConfigBuilder.setEdsConfig( + ConfigSource.newBuilder() + .setAds(AggregatedConfigSource.getDefaultInstance())); + if (edsServiceName != null) { + edsClusterConfigBuilder.setServiceName(edsServiceName); + } + builder.setEdsClusterConfig(edsClusterConfigBuilder); + builder.setLbPolicy(LbPolicy.ROUND_ROBIN); + if (enableLrs) { + builder.setLrsServer( + ConfigSource.newBuilder() + .setSelf(SelfConfigSource.getDefaultInstance())); + } + if (upstreamTlsContext != null) { + builder.setTransportSocket( + TransportSocket.newBuilder() + .setName("envoy.transport_sockets.tls") + .setTypedConfig(Any.pack(upstreamTlsContext))); + } + if (circuitBreakers != null) { + builder.setCircuitBreakers((CircuitBreakers) circuitBreakers); + } + return builder.build(); + } + + @Override + protected Message buildUpstreamTlsContext(String secretName, String targetUri) { + GrpcService grpcService = + GrpcService.newBuilder() + .setGoogleGrpc(GoogleGrpc.newBuilder().setTargetUri(targetUri)) + .build(); + ConfigSource sdsConfig = + ConfigSource.newBuilder() + .setApiConfigSource(ApiConfigSource.newBuilder().addGrpcServices(grpcService)) + .build(); + SdsSecretConfig validationContextSdsSecretConfig = + SdsSecretConfig.newBuilder() + .setName(secretName) + .setSdsConfig(sdsConfig) + .build(); + return UpstreamTlsContext.newBuilder() + .setCommonTlsContext( + CommonTlsContext.newBuilder() + .setValidationContextSdsSecretConfig(validationContextSdsSecretConfig)) + .build(); + } + + @Override + protected Message buildCircuitBreakers(int highPriorityMaxRequests, + int defaultPriorityMaxRequests) { + return CircuitBreakers.newBuilder() + .addThresholds( + Thresholds.newBuilder() + .setPriority(RoutingPriority.HIGH) + .setMaxRequests(UInt32Value.newBuilder().setValue(highPriorityMaxRequests))) + .addThresholds( + Thresholds.newBuilder() + .setPriority(RoutingPriority.DEFAULT) + .setMaxRequests(UInt32Value.newBuilder().setValue(defaultPriorityMaxRequests))) + .build(); + } + + @Override + protected Message buildClusterLoadAssignment(String cluster, + List localityLbEndpointsList, List dropOverloadList) { + ClusterLoadAssignment.Builder builder = ClusterLoadAssignment.newBuilder(); + builder.setClusterName(cluster); + for (Message localityLbEndpoints : localityLbEndpointsList) { + builder.addEndpoints((LocalityLbEndpoints) localityLbEndpoints); + } + Policy.Builder policyBuilder = Policy.newBuilder(); + for (Message dropOverload : dropOverloadList) { + policyBuilder.addDropOverloads((DropOverload) dropOverload); + } + builder.setPolicy(policyBuilder); + return builder.build(); + } + + @Override + protected Message buildLocalityLbEndpoints(String region, String zone, String subZone, + List lbEndpointList, int loadBalancingWeight, int priority) { + LocalityLbEndpoints.Builder builder = LocalityLbEndpoints.newBuilder(); + builder.setLocality( + Locality.newBuilder().setRegion(region).setZone(zone).setSubZone(subZone)); + for (Message lbEndpoint : lbEndpointList) { + builder.addLbEndpoints((LbEndpoint) lbEndpoint); + } + builder.setLoadBalancingWeight(UInt32Value.of(loadBalancingWeight)); + builder.setPriority(priority); + return builder.build(); + } + + @Override + protected Message buildLbEndpoint(String address, int port, String healthStatus, + int lbWeight) { + HealthStatus status; + switch (healthStatus) { + case "unknown": + status = HealthStatus.UNKNOWN; + break; + case "healthy": + status = HealthStatus.HEALTHY; + break; + case "unhealthy": + status = HealthStatus.UNHEALTHY; + break; + case "draining": + status = HealthStatus.DRAINING; + break; + case "timeout": + status = HealthStatus.TIMEOUT; + break; + case "degraded": + status = HealthStatus.DEGRADED; + break; + default: + status = HealthStatus.UNRECOGNIZED; + } + return LbEndpoint.newBuilder() + .setEndpoint( + Endpoint.newBuilder().setAddress( + Address.newBuilder().setSocketAddress( + SocketAddress.newBuilder().setAddress(address).setPortValue(port)))) + .setHealthStatus(status) + .setLoadBalancingWeight(UInt32Value.of(lbWeight)) + .build(); + } + + @Override + protected Message buildDropOverload(String category, int dropPerMillion) { + return DropOverload.newBuilder() + .setCategory(category) + .setDropPercentage( + FractionalPercent.newBuilder() + .setNumerator(dropPerMillion) + .setDenominator(DenominatorType.MILLION)) + .build(); + } + } + + /** + * Matches a {@link DiscoveryRequest} with the same node metadata, versionInfo, typeUrl, + * response nonce and collection of resource names regardless of order. + */ + private static class DiscoveryRequestMatcher implements ArgumentMatcher { + private final Node node; + private final String versionInfo; + private final String typeUrl; + private final Set resources; + private final String responseNonce; + + private DiscoveryRequestMatcher(Node node, String versionInfo, List resources, + String typeUrl, String responseNonce) { + this.node = node; + this.versionInfo = versionInfo; + this.resources = new HashSet<>(resources); + this.typeUrl = typeUrl; + this.responseNonce = responseNonce; + } + + @Override + public boolean matches(DiscoveryRequest argument) { + if (!typeUrl.equals(argument.getTypeUrl())) { + return false; + } + if (!versionInfo.equals(argument.getVersionInfo())) { + return false; + } + if (!responseNonce.equals(argument.getResponseNonce())) { + return false; + } + if (!resources.equals(new HashSet<>(argument.getResourceNamesList()))) { + return false; + } + return node.equals(argument.getNode()); + } + } + + /** + * Matches a {@link LoadStatsRequest} containing a collection of {@link ClusterStats} with + * the same list of clusterName:clusterServiceName pair. + */ + private static class LrsRequestMatcher implements ArgumentMatcher { + private final List expected; + + private LrsRequestMatcher(List clusterNames) { + expected = new ArrayList<>(); + for (String[] pair : clusterNames) { + expected.add(pair[0] + ":" + (pair[1] == null ? "" : pair[1])); + } + Collections.sort(expected); + } + + @Override + public boolean matches(LoadStatsRequest argument) { + List actual = new ArrayList<>(); + for (ClusterStats clusterStats : argument.getClusterStatsList()) { + actual.add(clusterStats.getClusterName() + ":" + clusterStats.getClusterServiceName()); + } + Collections.sort(actual); + return actual.equals(expected); + } + } +} diff --git a/xds/src/test/java/io/grpc/xds/XdsClientTestHelper.java b/xds/src/test/java/io/grpc/xds/XdsClientTestHelper.java index 2167a842beb..da9d549a129 100644 --- a/xds/src/test/java/io/grpc/xds/XdsClientTestHelper.java +++ b/xds/src/test/java/io/grpc/xds/XdsClientTestHelper.java @@ -25,10 +25,7 @@ import io.envoyproxy.envoy.config.cluster.v3.Cluster.LbPolicy; import io.envoyproxy.envoy.config.core.v3.Address; import io.envoyproxy.envoy.config.core.v3.AggregatedConfigSource; -import io.envoyproxy.envoy.config.core.v3.ApiConfigSource; import io.envoyproxy.envoy.config.core.v3.ConfigSource; -import io.envoyproxy.envoy.config.core.v3.GrpcService; -import io.envoyproxy.envoy.config.core.v3.GrpcService.GoogleGrpc; import io.envoyproxy.envoy.config.core.v3.HealthStatus; import io.envoyproxy.envoy.config.core.v3.Locality; import io.envoyproxy.envoy.config.core.v3.SelfConfigSource; @@ -48,21 +45,19 @@ import io.envoyproxy.envoy.config.route.v3.RouteConfiguration; import io.envoyproxy.envoy.config.route.v3.RouteMatch; import io.envoyproxy.envoy.config.route.v3.VirtualHost; -import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.CommonTlsContext; -import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.SdsSecretConfig; import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext; import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest; import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse; import io.envoyproxy.envoy.type.v3.FractionalPercent; import io.envoyproxy.envoy.type.v3.FractionalPercent.DenominatorType; import io.grpc.xds.EnvoyProtoData.Node; -import java.util.ArrayList; import java.util.List; import javax.annotation.Nullable; /** * Helper methods for building protobuf messages with custom data for xDS protocols. */ +// TODO(chengyuanzhang, sanjaypujare): delete this class, should not dump everything here. class XdsClientTestHelper { static DiscoveryResponse buildDiscoveryResponse(String versionInfo, List resources, String typeUrl, String nonce) { @@ -86,11 +81,6 @@ static io.envoyproxy.envoy.api.v2.DiscoveryResponse buildDiscoveryResponseV2(Str .build(); } - static DiscoveryRequest buildDiscoveryRequest(Node node, String versionInfo, - String resourceName, String typeUrl, String nonce) { - return buildDiscoveryRequest(node, versionInfo, ImmutableList.of(resourceName), typeUrl, nonce); - } - static DiscoveryRequest buildDiscoveryRequest(Node node, String versionInfo, List resourceNames, String typeUrl, String nonce) { return @@ -161,23 +151,6 @@ static io.envoyproxy.envoy.api.v2.RouteConfiguration buildRouteConfigurationV2(S .build(); } - static List buildVirtualHosts(int num) { - List virtualHosts = new ArrayList<>(num); - for (int i = 0; i < num; i++) { - VirtualHost virtualHost = - VirtualHost.newBuilder() - .setName(num + ": do not care") - .addDomains("do not care") - .addRoutes( - Route.newBuilder() - .setRoute(RouteAction.newBuilder().setCluster("do not care")) - .setMatch(RouteMatch.newBuilder().setPrefix("do not care"))) - .build(); - virtualHosts.add(virtualHost); - } - return virtualHosts; - } - static VirtualHost buildVirtualHost(List domains, String clusterName) { return VirtualHost.newBuilder() .setName("virtualhost00.googleapis.com") // don't care @@ -283,22 +256,6 @@ static ClusterLoadAssignment buildClusterLoadAssignment(String clusterName, .build(); } - @SuppressWarnings("deprecation") // disableOverprovisioning is deprecated by needed for v2 - static io.envoyproxy.envoy.api.v2.ClusterLoadAssignment buildClusterLoadAssignmentV2( - String clusterName, - List localityLbEndpoints, - List dropOverloads) { - return - io.envoyproxy.envoy.api.v2.ClusterLoadAssignment.newBuilder() - .setClusterName(clusterName) - .addAllEndpoints(localityLbEndpoints) - .setPolicy( - io.envoyproxy.envoy.api.v2.ClusterLoadAssignment.Policy.newBuilder() - .setDisableOverprovisioning(true) - .addAllDropOverloads(dropOverloads)) - .build(); - } - static DropOverload buildDropOverload(String category, int dropPerMillion) { return DropOverload.newBuilder() @@ -384,49 +341,4 @@ static io.envoyproxy.envoy.api.v2.endpoint.LbEndpoint buildLbEndpointV2( .setLoadBalancingWeight(UInt32Value.of(loadbalancingWeight)) .build(); } - - static UpstreamTlsContext buildUpstreamTlsContext(String secretName, String targetUri) { - GrpcService grpcService = - GrpcService.newBuilder() - .setGoogleGrpc(GoogleGrpc.newBuilder().setTargetUri(targetUri)) - .build(); - ConfigSource sdsConfig = - ConfigSource.newBuilder() - .setApiConfigSource(ApiConfigSource.newBuilder().addGrpcServices(grpcService)) - .build(); - SdsSecretConfig validationContextSdsSecretConfig = - SdsSecretConfig.newBuilder() - .setName(secretName) - .setSdsConfig(sdsConfig) - .build(); - return UpstreamTlsContext.newBuilder() - .setCommonTlsContext( - CommonTlsContext.newBuilder() - .setValidationContextSdsSecretConfig(validationContextSdsSecretConfig)) - .build(); - } - - static io.envoyproxy.envoy.api.v2.auth.UpstreamTlsContext buildUpstreamTlsContextV2( - String secretName, String targetUri) { - io.envoyproxy.envoy.api.v2.core.GrpcService grpcService = - io.envoyproxy.envoy.api.v2.core.GrpcService.newBuilder() - .setGoogleGrpc(io.envoyproxy.envoy.api.v2.core.GrpcService.GoogleGrpc.newBuilder() - .setTargetUri(targetUri)) - .build(); - io.envoyproxy.envoy.api.v2.core.ConfigSource sdsConfig = - io.envoyproxy.envoy.api.v2.core.ConfigSource.newBuilder() - .setApiConfigSource(io.envoyproxy.envoy.api.v2.core.ApiConfigSource.newBuilder() - .addGrpcServices(grpcService)) - .build(); - io.envoyproxy.envoy.api.v2.auth.SdsSecretConfig validationContextSdsSecretConfig = - io.envoyproxy.envoy.api.v2.auth.SdsSecretConfig.newBuilder() - .setName(secretName) - .setSdsConfig(sdsConfig) - .build(); - return io.envoyproxy.envoy.api.v2.auth.UpstreamTlsContext.newBuilder() - .setCommonTlsContext( - io.envoyproxy.envoy.api.v2.auth.CommonTlsContext.newBuilder() - .setValidationContextSdsSecretConfig(validationContextSdsSecretConfig)) - .build(); - } }