diff --git a/xds/src/main/java/io/grpc/xds/LoadReportClient.java b/xds/src/main/java/io/grpc/xds/LoadReportClient.java
index 525b645d610..1314583cfa7 100644
--- a/xds/src/main/java/io/grpc/xds/LoadReportClient.java
+++ b/xds/src/main/java/io/grpc/xds/LoadReportClient.java
@@ -67,7 +67,7 @@ final class LoadReportClient {
   private final BackoffPolicy.Provider backoffPolicyProvider;
   // Sources of load stats data for each cluster:cluster_service.
-  private final Map<String, Map<String, LoadStatsStore>> loadStatsStoreMap = new HashMap<>();
+  private final Map<String, Map<String, LoadStatsEntity>> loadStatsEntities = new HashMap<>();
 
   private boolean started;
 
   @Nullable
@@ -148,18 +148,18 @@ void stopLoadReporting() {
   void addLoadStatsStore(
       String clusterName, @Nullable String clusterServiceName, LoadStatsStore loadStatsStore) {
     checkState(
-        !loadStatsStoreMap.containsKey(clusterName)
-            || !loadStatsStoreMap.get(clusterName).containsKey(clusterServiceName),
+        !loadStatsEntities.containsKey(clusterName)
+            || !loadStatsEntities.get(clusterName).containsKey(clusterServiceName),
         "load stats for cluster: %s, cluster service: %s already exists",
         clusterName, clusterServiceName);
     logger.log(
         XdsLogLevel.INFO,
         "Add load stats for cluster: {0}, cluster_service: {1}", clusterName, clusterServiceName);
-    if (!loadStatsStoreMap.containsKey(clusterName)) {
-      loadStatsStoreMap.put(clusterName, new HashMap<String, LoadStatsStore>());
+    if (!loadStatsEntities.containsKey(clusterName)) {
+      loadStatsEntities.put(clusterName, new HashMap<String, LoadStatsEntity>());
     }
-    Map<String, LoadStatsStore> clusterLoadStatsStores = loadStatsStoreMap.get(clusterName);
-    clusterLoadStatsStores.put(clusterServiceName, loadStatsStore);
+    Map<String, LoadStatsEntity> clusterLoadStatsEntities = loadStatsEntities.get(clusterName);
+    clusterLoadStatsEntities.put(clusterServiceName, new LoadStatsEntity(loadStatsStore));
   }
 
   /**
@@ -167,8 +167,8 @@ void addLoadStatsStore(
    */
   void removeLoadStatsStore(String clusterName, @Nullable String clusterServiceName) {
     checkState(
-        loadStatsStoreMap.containsKey(clusterName)
-            && loadStatsStoreMap.get(clusterName).containsKey(clusterServiceName),
+        loadStatsEntities.containsKey(clusterName)
+            && loadStatsEntities.get(clusterName).containsKey(clusterServiceName),
         "load stats for cluster: %s, cluster service: %s does not exist",
         clusterName, clusterServiceName);
     logger.log(
@@ -176,10 +176,10 @@ void removeLoadStatsStore(String clusterName, @Nullable String clusterServiceNam
         "Remove load stats for cluster: {0}, cluster_service: {1}",
         clusterName, clusterServiceName);
-    Map<String, LoadStatsStore> clusterLoadStatsStores = loadStatsStoreMap.get(clusterName);
-    clusterLoadStatsStores.remove(clusterServiceName);
-    if (clusterLoadStatsStores.isEmpty()) {
-      loadStatsStoreMap.remove(clusterName);
+    Map<String, LoadStatsEntity> clusterLoadStatsEntities = loadStatsEntities.get(clusterName);
+    clusterLoadStatsEntities.remove(clusterServiceName);
+    if (clusterLoadStatsEntities.isEmpty()) {
+      loadStatsEntities.remove(clusterName);
     }
   }
 
@@ -217,10 +217,8 @@ private void startLrsRpc() {
 
   private class LrsStream implements StreamObserver<LoadStatsResponse> {
 
-    // Cluster to report loads for asked by management server.
     final Set<String> clusterNames = new HashSet<>();
     final LoadReportingServiceGrpc.LoadReportingServiceStub stub;
-    final Stopwatch reportStopwatch;
     StreamObserver<LoadStatsRequest> lrsRequestWriter;
     boolean initialResponseReceived;
     boolean closed;
@@ -229,15 +227,10 @@ private class LrsStream implements StreamObserver<LoadStatsResponse> {
 
     LrsStream(LoadReportingServiceGrpc.LoadReportingServiceStub stub, Stopwatch stopwatch) {
       this.stub = checkNotNull(stub, "stub");
-      reportStopwatch = checkNotNull(stopwatch, "stopwatch");
     }
 
     void start() {
       lrsRequestWriter = stub.withWaitForReady().streamLoadStats(this);
-      reportStopwatch.reset().start();
-
-      // Send an initial LRS request with empty cluster stats. Management server is able to
-      // infer clusters the gRPC client sending loads to.
       LoadStatsRequest initRequest =
           LoadStatsRequest.newBuilder()
               .setNode(node)
@@ -278,19 +271,12 @@ public void run() {
     }
 
     private void sendLoadReport() {
-      long interval = reportStopwatch.elapsed(TimeUnit.NANOSECONDS);
-      reportStopwatch.reset().start();
       LoadStatsRequest.Builder requestBuilder = LoadStatsRequest.newBuilder().setNode(node);
       for (String name : clusterNames) {
-        if (loadStatsStoreMap.containsKey(name)) {
-          Map<String, LoadStatsStore> clusterLoadStatsStores = loadStatsStoreMap.get(name);
-          for (LoadStatsStore statsStore : clusterLoadStatsStores.values()) {
-            ClusterStats report =
-                statsStore.generateLoadReport()
-                    .toBuilder()
-                    .setLoadReportInterval(Durations.fromNanos(interval))
-                    .build();
-            requestBuilder.addClusterStats(report);
+        if (loadStatsEntities.containsKey(name)) {
+          Map<String, LoadStatsEntity> clusterLoadStatsEntities = loadStatsEntities.get(name);
+          for (LoadStatsEntity entity : clusterLoadStatsEntities.values()) {
+            requestBuilder.addClusterStats(entity.getLoadStats());
           }
         }
       }
@@ -317,28 +303,27 @@ private void handleResponse(LoadStatsResponse response) {
       if (closed) {
        return;
       }
-
      if (!initialResponseReceived) {
        logger.log(XdsLogLevel.DEBUG, "Received LRS initial response:\n{0}", response);
        initialResponseReceived = true;
      } else {
        logger.log(XdsLogLevel.DEBUG, "Received LRS response:\n{0}", response);
      }
-      long interval = Durations.toNanos(response.getLoadReportingInterval());
-      if (interval != loadReportIntervalNano) {
-        logger.log(XdsLogLevel.INFO, "Update load reporting interval to {0} ns", interval);
-        loadReportIntervalNano = interval;
-        callback.onReportResponse(loadReportIntervalNano);
-      }
-      if (clusterNames.size() != response.getClustersCount()
-          || !clusterNames.containsAll(response.getClustersList())) {
+      clusterNames.clear();
+      if (response.getSendAllClusters()) {
+        clusterNames.addAll(loadStatsEntities.keySet());
+        logger.log(XdsLogLevel.INFO, "Update to report loads for all clusters");
+      } else {
        logger.log(
            XdsLogLevel.INFO, "Update load reporting clusters to {0}", response.getClustersList());
-        clusterNames.clear();
        clusterNames.addAll(response.getClustersList());
      }
 
+      long interval = Durations.toNanos(response.getLoadReportingInterval());
+      logger.log(XdsLogLevel.INFO, "Update load reporting interval to {0} ns", interval);
+      loadReportIntervalNano = interval;
      scheduleNextLoadReport();
+      callback.onReportResponse(loadReportIntervalNano);
    }
 
    private void handleStreamClosed(Status status) {
@@ -401,6 +386,27 @@ private void cleanUp() {
     }
   }
 
+  private final class LoadStatsEntity {
+    private final LoadStatsStore loadStatsStore;
+    private final Stopwatch stopwatch;
+
+    private LoadStatsEntity(LoadStatsStore loadStatsStore) {
+      this.loadStatsStore = loadStatsStore;
+      this.stopwatch = stopwatchSupplier.get().reset().start();
+    }
+
+    private ClusterStats getLoadStats() {
+      ClusterStats stats =
+          loadStatsStore.generateLoadReport()
+              .toBuilder()
+              .setLoadReportInterval(
+                  Durations.fromNanos(stopwatch.elapsed(TimeUnit.NANOSECONDS)))
+              .build();
+      stopwatch.reset().start();
+      return stats;
+    }
+  }
+
   /**
    * Callbacks for passing information received from client load reporting responses to xDS load
    * balancer, such as the load reporting interval requested by the traffic director.
diff --git a/xds/src/test/java/io/grpc/xds/LoadReportClientTest.java b/xds/src/test/java/io/grpc/xds/LoadReportClientTest.java
index 8c2e6f5a9fc..4b8645a7e93 100644
--- a/xds/src/test/java/io/grpc/xds/LoadReportClientTest.java
+++ b/xds/src/test/java/io/grpc/xds/LoadReportClientTest.java
@@ -56,7 +56,9 @@
 import io.grpc.testing.GrpcCleanupRule;
 import io.grpc.xds.LoadReportClient.LoadReportCallback;
 import java.util.ArrayDeque;
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -178,7 +180,7 @@ public void cancelled(Context context) {
     when(backoffPolicy1.nextBackoffNanos())
         .thenReturn(TimeUnit.SECONDS.toNanos(1L), TimeUnit.SECONDS.toNanos(10L));
     when(backoffPolicy2.nextBackoffNanos())
-        .thenReturn(TimeUnit.SECONDS.toNanos(1L), TimeUnit.SECONDS.toNanos(10L));
+        .thenReturn(TimeUnit.SECONDS.toNanos(2L), TimeUnit.SECONDS.toNanos(20L));
     lrsClient =
         new LoadReportClient(
             logId,
@@ -216,15 +218,17 @@ public void typicalWorkflow() {
     responseObserver.onNext(buildLrsResponse(ImmutableList.of(cluster1), 1000));
     inOrder.verify(callback).onReportResponse(1000);
 
-    ArgumentMatcher<LoadStatsRequest> expectedLoadReportMatcher =
-        new LoadStatsRequestMatcher(ImmutableList.of(rawStats1), 1000);
+    ClusterStats expectedStats1 =
+        rawStats1.toBuilder().setLoadReportInterval(Durations.fromNanos(1000)).build();
 
     fakeClock.forwardNanos(999);
     inOrder.verifyNoMoreInteractions();
     fakeClock.forwardNanos(1);
-    inOrder.verify(requestObserver).onNext(argThat(expectedLoadReportMatcher));
+    inOrder.verify(requestObserver)
+        .onNext(argThat(new LoadStatsRequestMatcher(Collections.singletonList(expectedStats1))));
     fakeClock.forwardNanos(1000);
-    inOrder.verify(requestObserver).onNext(argThat(expectedLoadReportMatcher));
+    inOrder.verify(requestObserver)
+        .onNext(argThat(new LoadStatsRequestMatcher(Collections.singletonList(expectedStats1))));
 
     String cluster2 = "cluster-bar.googleapis.com";
     ClusterStats rawStats2 = generateClusterLoadStats(cluster2, null);
@@ -236,28 +240,38 @@ public void typicalWorkflow() {
     responseObserver.onNext(buildLrsResponse(ImmutableList.of(cluster1), 2000));
     inOrder.verify(callback).onReportResponse(2000);
 
+    expectedStats1 =
+        rawStats1.toBuilder().setLoadReportInterval(Durations.fromNanos(2000)).build();
     fakeClock.forwardNanos(1000);
     inOrder.verifyNoMoreInteractions();
-
     fakeClock.forwardNanos(1000);
     inOrder.verify(requestObserver)
-        .onNext(argThat(new LoadStatsRequestMatcher(ImmutableList.of(rawStats1), 2000)));
-
-    // Management server asks to report loads for cluster1 and cluster2.
-    responseObserver.onNext(buildLrsResponse(ImmutableList.of(cluster1, cluster2), 2000));
+        .onNext(argThat(new LoadStatsRequestMatcher(Collections.singletonList(expectedStats1))));
+
+    // Management server asks to report loads for all clusters.
+    responseObserver.onNext(
+        LoadStatsResponse.newBuilder()
+            .setSendAllClusters(true)
+            .setLoadReportingInterval(Durations.fromNanos(2000))
+            .build());
+    inOrder.verify(callback).onReportResponse(2000);
+    ClusterStats expectedStats2 =
+        rawStats2.toBuilder().setLoadReportInterval(Durations.fromNanos(2000 + 2000)).build();
 
     fakeClock.forwardNanos(2000);
     inOrder.verify(requestObserver)
         .onNext(
             argThat(
-                new LoadStatsRequestMatcher(ImmutableList.of(rawStats1, rawStats2), 2000)));
+                new LoadStatsRequestMatcher(Arrays.asList(expectedStats1, expectedStats2))));
 
     // Load reports for cluster1 is no longer wanted.
-    responseObserver.onNext(buildLrsResponse(ImmutableList.of(cluster2), 2000));
+    responseObserver.onNext(buildLrsResponse(Collections.singletonList(cluster2), 2000));
+    expectedStats2 =
+        rawStats2.toBuilder().setLoadReportInterval(Durations.fromNanos(2000)).build();
 
     fakeClock.forwardNanos(2000);
     inOrder.verify(requestObserver)
-        .onNext(argThat(new LoadStatsRequestMatcher(ImmutableList.of(rawStats2), 2000)));
+        .onNext(argThat(new LoadStatsRequestMatcher(Collections.singletonList(expectedStats2))));
 
     // Management server asks loads for a cluster that client has no load data.
     responseObserver
@@ -331,7 +345,7 @@ public void lrsStreamClosedAndRetried() {
 
     // Balancer sends a response asking for loads of the cluster.
     responseObserver
-        .onNext(buildLrsResponse(ImmutableList.of(clusterName), 0));
+        .onNext(buildLrsResponse(ImmutableList.of(clusterName), 5));
 
     // Then breaks the RPC
     responseObserver.onError(Status.UNAVAILABLE.asException());
@@ -348,12 +362,12 @@ public void lrsStreamClosedAndRetried() {
     fakeClock.forwardNanos(4);
     responseObserver.onError(Status.UNAVAILABLE.asException());
 
-    // Will be on the first retry (1s) of backoff sequence 2.
+    // Will be on the first retry (2s) of backoff sequence 2.
     inOrder.verify(backoffPolicy2).nextBackoffNanos();
     assertEquals(1, fakeClock.numPendingTasks(LRS_RPC_RETRY_TASK_FILTER));
 
     // Fast-forward to a moment before the retry, the time spent in the last try is deducted.
-    fakeClock.forwardNanos(TimeUnit.SECONDS.toNanos(1) - 4 - 1);
+    fakeClock.forwardNanos(TimeUnit.SECONDS.toNanos(2) - 4 - 1);
     verifyNoMoreInteractions(mockLoadReportingService);
     // Then time for retry
     fakeClock.forwardNanos(1);
@@ -368,8 +382,13 @@ public void lrsStreamClosedAndRetried() {
     responseObserver
         .onNext(buildLrsResponse(ImmutableList.of(clusterName), 10));
     fakeClock.forwardNanos(10);
+    ClusterStats expectedStats =
+        stats.toBuilder()
+            .setLoadReportInterval(
+                Durations.add(Durations.fromSeconds(1 + 10 + 2), Durations.fromNanos(10)))
+            .build();
     verify(requestObserver)
-        .onNext(argThat(new LoadStatsRequestMatcher(ImmutableList.of(stats), 10)));
+        .onNext(argThat(new LoadStatsRequestMatcher(Collections.singletonList(expectedStats))));
 
     // Wrapping up
     verify(backoffPolicyProvider, times(2)).get();
@@ -466,9 +485,9 @@ private static ClusterStats generateClusterLoadStats(
             UpstreamLocalityStats.newBuilder()
                 .setLocality(
                     Locality.newBuilder()
-                        .setRegion("region-foo")
-                        .setZone("zone-bar")
-                        .setSubZone("subzone-baz"))
+                        .setRegion(clusterName + "-region-foo")
+                        .setZone(clusterName + "-zone-bar")
+                        .setSubZone(clusterName + "-subzone-baz"))
                 .setTotalRequestsInProgress(callsInProgress)
                 .setTotalSuccessfulRequests(callsSucceeded)
                 .setTotalErrorRequests(callsFailed)
@@ -491,13 +510,9 @@ private static ClusterStats generateClusterLoadStats(
   private static class LoadStatsRequestMatcher implements ArgumentMatcher<LoadStatsRequest> {
     private final Map<String, ClusterStats> expectedStats = new HashMap<>();
 
-    LoadStatsRequestMatcher(Collection<ClusterStats> clusterStats, long expectedIntervalNano) {
+    LoadStatsRequestMatcher(Collection<ClusterStats> clusterStats) {
      for (ClusterStats stats : clusterStats) {
-        ClusterStats statsWithInterval =
-            stats.toBuilder()
-                .setLoadReportInterval(Durations.fromNanos(expectedIntervalNano))
-                .build();
-        expectedStats.put(statsWithInterval.getClusterName(), statsWithInterval);
+        expectedStats.put(stats.getClusterName(), stats);
      }
    }
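
Note (reviewer sketch, not part of the change): the key behavioral shift in this diff is that each cluster:cluster_service entry now owns its own Stopwatch, so the load_report_interval carried in each ClusterStats reflects the time since that particular entry last generated a report, instead of one stream-wide interval. A minimal, hypothetical illustration of the read-then-reset pattern, assuming Guava's Stopwatch and protobuf-util's Durations; the class name IntervalTrackedSource is invented for this sketch and does not exist in the codebase:

import com.google.common.base.Stopwatch;
import com.google.protobuf.Duration;
import com.google.protobuf.util.Durations;
import java.util.concurrent.TimeUnit;

// Pairs a stats source with its own Stopwatch so each snapshot carries the time
// elapsed since the previous snapshot for this source, independent of other clusters.
final class IntervalTrackedSource {
  private final Stopwatch stopwatch = Stopwatch.createStarted();

  // Reads the elapsed time and restarts the stopwatch in one step, mirroring the
  // read-then-reset done by LoadStatsEntity.getLoadStats() in the diff above.
  Duration snapshotInterval() {
    Duration interval = Durations.fromNanos(stopwatch.elapsed(TimeUnit.NANOSECONDS));
    stopwatch.reset().start();
    return interval;
  }
}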