Skip to content

Commit

Permalink
xds: support load reporting all clusters option and fix actual report…
Browse files Browse the repository at this point in the history
… interval measurement (grpc#7209)

- Add support for the send_all_clusters field in the LRS response. When it is set to true, send load reports for all clusters that the client is currently tracking (i.e., is sending load to).

- The actual load report interval (set in each ClusterStats message, which contains the stats for one cluster:eds_service pair) should be tracked individually for each cluster:eds_service, rather than once for the whole LRS stream.
  • Loading branch information
voidzcy committed Aug 11, 2020
1 parent 72a256f commit 369efa2
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 67 deletions.
88 changes: 47 additions & 41 deletions xds/src/main/java/io/grpc/xds/LoadReportClient.java
Expand Up @@ -67,7 +67,7 @@ final class LoadReportClient {
private final BackoffPolicy.Provider backoffPolicyProvider;

// Sources of load stats data for each cluster:cluster_service.
private final Map<String, Map<String, LoadStatsStore>> loadStatsStoreMap = new HashMap<>();
private final Map<String, Map<String, LoadStatsEntity>> loadStatsEntities = new HashMap<>();
private boolean started;

@Nullable
Expand Down Expand Up @@ -148,38 +148,38 @@ void stopLoadReporting() {
void addLoadStatsStore(
String clusterName, @Nullable String clusterServiceName, LoadStatsStore loadStatsStore) {
checkState(
!loadStatsStoreMap.containsKey(clusterName)
|| !loadStatsStoreMap.get(clusterName).containsKey(clusterServiceName),
!loadStatsEntities.containsKey(clusterName)
|| !loadStatsEntities.get(clusterName).containsKey(clusterServiceName),
"load stats for cluster: %s, cluster service: %s already exists",
clusterName, clusterServiceName);
logger.log(
XdsLogLevel.INFO,
"Add load stats for cluster: {0}, cluster_service: {1}", clusterName, clusterServiceName);
if (!loadStatsStoreMap.containsKey(clusterName)) {
loadStatsStoreMap.put(clusterName, new HashMap<String, LoadStatsStore>());
if (!loadStatsEntities.containsKey(clusterName)) {
loadStatsEntities.put(clusterName, new HashMap<String, LoadStatsEntity>());
}
Map<String, LoadStatsStore> clusterLoadStatsStores = loadStatsStoreMap.get(clusterName);
clusterLoadStatsStores.put(clusterServiceName, loadStatsStore);
Map<String, LoadStatsEntity> clusterLoadStatsEntities = loadStatsEntities.get(clusterName);
clusterLoadStatsEntities.put(clusterServiceName, new LoadStatsEntity(loadStatsStore));
}

/**
* Stops providing load stats data for the given cluster:cluster_service.
*/
void removeLoadStatsStore(String clusterName, @Nullable String clusterServiceName) {
checkState(
loadStatsStoreMap.containsKey(clusterName)
&& loadStatsStoreMap.get(clusterName).containsKey(clusterServiceName),
loadStatsEntities.containsKey(clusterName)
&& loadStatsEntities.get(clusterName).containsKey(clusterServiceName),
"load stats for cluster: %s, cluster service: %s does not exist",
clusterName, clusterServiceName);
logger.log(
XdsLogLevel.INFO,
"Remove load stats for cluster: {0}, cluster_service: {1}",
clusterName,
clusterServiceName);
Map<String, LoadStatsStore> clusterLoadStatsStores = loadStatsStoreMap.get(clusterName);
clusterLoadStatsStores.remove(clusterServiceName);
if (clusterLoadStatsStores.isEmpty()) {
loadStatsStoreMap.remove(clusterName);
Map<String, LoadStatsEntity> clusterLoadStatsEntities = loadStatsEntities.get(clusterName);
clusterLoadStatsEntities.remove(clusterServiceName);
if (clusterLoadStatsEntities.isEmpty()) {
loadStatsEntities.remove(clusterName);
}
}

Expand Down Expand Up @@ -217,10 +217,8 @@ private void startLrsRpc() {

private class LrsStream implements StreamObserver<LoadStatsResponse> {

// Clusters that the management server asked to report loads for.
final Set<String> clusterNames = new HashSet<>();
final LoadReportingServiceGrpc.LoadReportingServiceStub stub;
final Stopwatch reportStopwatch;
StreamObserver<LoadStatsRequest> lrsRequestWriter;
boolean initialResponseReceived;
boolean closed;
Expand All @@ -229,15 +227,10 @@ private class LrsStream implements StreamObserver<LoadStatsResponse> {

LrsStream(LoadReportingServiceGrpc.LoadReportingServiceStub stub, Stopwatch stopwatch) {
this.stub = checkNotNull(stub, "stub");
reportStopwatch = checkNotNull(stopwatch, "stopwatch");
}

void start() {
lrsRequestWriter = stub.withWaitForReady().streamLoadStats(this);
reportStopwatch.reset().start();

// Send an initial LRS request with empty cluster stats. Management server is able to
// infer the clusters the gRPC client is sending loads to.
LoadStatsRequest initRequest =
LoadStatsRequest.newBuilder()
.setNode(node)
Expand Down Expand Up @@ -278,19 +271,12 @@ public void run() {
}

private void sendLoadReport() {
long interval = reportStopwatch.elapsed(TimeUnit.NANOSECONDS);
reportStopwatch.reset().start();
LoadStatsRequest.Builder requestBuilder = LoadStatsRequest.newBuilder().setNode(node);
for (String name : clusterNames) {
if (loadStatsStoreMap.containsKey(name)) {
Map<String, LoadStatsStore> clusterLoadStatsStores = loadStatsStoreMap.get(name);
for (LoadStatsStore statsStore : clusterLoadStatsStores.values()) {
ClusterStats report =
statsStore.generateLoadReport()
.toBuilder()
.setLoadReportInterval(Durations.fromNanos(interval))
.build();
requestBuilder.addClusterStats(report);
if (loadStatsEntities.containsKey(name)) {
Map<String, LoadStatsEntity> clusterLoadStatsEntities = loadStatsEntities.get(name);
for (LoadStatsEntity entity : clusterLoadStatsEntities.values()) {
requestBuilder.addClusterStats(entity.getLoadStats());
}
}
}
Expand All @@ -317,28 +303,27 @@ private void handleResponse(LoadStatsResponse response) {
if (closed) {
return;
}

if (!initialResponseReceived) {
logger.log(XdsLogLevel.DEBUG, "Received LRS initial response:\n{0}", response);
initialResponseReceived = true;
} else {
logger.log(XdsLogLevel.DEBUG, "Received LRS response:\n{0}", response);
}
long interval = Durations.toNanos(response.getLoadReportingInterval());
if (interval != loadReportIntervalNano) {
logger.log(XdsLogLevel.INFO, "Update load reporting interval to {0} ns", interval);
loadReportIntervalNano = interval;
callback.onReportResponse(loadReportIntervalNano);
}
if (clusterNames.size() != response.getClustersCount()
|| !clusterNames.containsAll(response.getClustersList())) {
clusterNames.clear();
if (response.getSendAllClusters()) {
clusterNames.addAll(loadStatsEntities.keySet());
logger.log(XdsLogLevel.INFO, "Update to report loads for all clusters");
} else {
logger.log(
XdsLogLevel.INFO,
"Update load reporting clusters to {0}", response.getClustersList());
clusterNames.clear();
clusterNames.addAll(response.getClustersList());
}
long interval = Durations.toNanos(response.getLoadReportingInterval());
logger.log(XdsLogLevel.INFO, "Update load reporting interval to {0} ns", interval);
loadReportIntervalNano = interval;
scheduleNextLoadReport();
callback.onReportResponse(loadReportIntervalNano);
}

private void handleStreamClosed(Status status) {
Expand Down Expand Up @@ -401,6 +386,27 @@ private void cleanUp() {
}
}

private final class LoadStatsEntity {
private final LoadStatsStore loadStatsStore;
private final Stopwatch stopwatch;

private LoadStatsEntity(LoadStatsStore loadStatsStore) {
this.loadStatsStore = loadStatsStore;
this.stopwatch = stopwatchSupplier.get().reset().start();
}

private ClusterStats getLoadStats() {
ClusterStats stats =
loadStatsStore.generateLoadReport()
.toBuilder()
.setLoadReportInterval(
Durations.fromNanos(stopwatch.elapsed(TimeUnit.NANOSECONDS)))
.build();
stopwatch.reset().start();
return stats;
}
}

/**
* Callbacks for passing information received from client load reporting responses to xDS load
* balancer, such as the load reporting interval requested by the traffic director.
Expand Down
67 changes: 41 additions & 26 deletions xds/src/test/java/io/grpc/xds/LoadReportClientTest.java
Expand Up @@ -56,7 +56,9 @@
import io.grpc.testing.GrpcCleanupRule;
import io.grpc.xds.LoadReportClient.LoadReportCallback;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -178,7 +180,7 @@ public void cancelled(Context context) {
when(backoffPolicy1.nextBackoffNanos())
.thenReturn(TimeUnit.SECONDS.toNanos(1L), TimeUnit.SECONDS.toNanos(10L));
when(backoffPolicy2.nextBackoffNanos())
.thenReturn(TimeUnit.SECONDS.toNanos(1L), TimeUnit.SECONDS.toNanos(10L));
.thenReturn(TimeUnit.SECONDS.toNanos(2L), TimeUnit.SECONDS.toNanos(20L));
lrsClient =
new LoadReportClient(
logId,
Expand Down Expand Up @@ -216,15 +218,17 @@ public void typicalWorkflow() {
responseObserver.onNext(buildLrsResponse(ImmutableList.of(cluster1), 1000));
inOrder.verify(callback).onReportResponse(1000);

ArgumentMatcher<LoadStatsRequest> expectedLoadReportMatcher =
new LoadStatsRequestMatcher(ImmutableList.of(rawStats1), 1000);
ClusterStats expectedStats1 =
rawStats1.toBuilder().setLoadReportInterval(Durations.fromNanos(1000)).build();
fakeClock.forwardNanos(999);
inOrder.verifyNoMoreInteractions();
fakeClock.forwardNanos(1);
inOrder.verify(requestObserver).onNext(argThat(expectedLoadReportMatcher));
inOrder.verify(requestObserver)
.onNext(argThat(new LoadStatsRequestMatcher(Collections.singletonList(expectedStats1))));

fakeClock.forwardNanos(1000);
inOrder.verify(requestObserver).onNext(argThat(expectedLoadReportMatcher));
inOrder.verify(requestObserver)
.onNext(argThat(new LoadStatsRequestMatcher(Collections.singletonList(expectedStats1))));

String cluster2 = "cluster-bar.googleapis.com";
ClusterStats rawStats2 = generateClusterLoadStats(cluster2, null);
Expand All @@ -236,28 +240,38 @@ public void typicalWorkflow() {
responseObserver.onNext(buildLrsResponse(ImmutableList.of(cluster1), 2000));
inOrder.verify(callback).onReportResponse(2000);

expectedStats1 =
rawStats1.toBuilder().setLoadReportInterval(Durations.fromNanos(2000)).build();
fakeClock.forwardNanos(1000);
inOrder.verifyNoMoreInteractions();

fakeClock.forwardNanos(1000);
inOrder.verify(requestObserver)
.onNext(argThat(new LoadStatsRequestMatcher(ImmutableList.of(rawStats1), 2000)));

// Management server asks to report loads for cluster1 and cluster2.
responseObserver.onNext(buildLrsResponse(ImmutableList.of(cluster1, cluster2), 2000));
.onNext(argThat(new LoadStatsRequestMatcher(Collections.singletonList(expectedStats1))));

// Management server asks to report loads for all clusters.
responseObserver.onNext(
LoadStatsResponse.newBuilder()
.setSendAllClusters(true)
.setLoadReportingInterval(Durations.fromNanos(2000))
.build());
inOrder.verify(callback).onReportResponse(2000);

ClusterStats expectedStats2 =
rawStats2.toBuilder().setLoadReportInterval(Durations.fromNanos(2000 + 2000)).build();
fakeClock.forwardNanos(2000);
inOrder.verify(requestObserver)
.onNext(
argThat(
new LoadStatsRequestMatcher(ImmutableList.of(rawStats1, rawStats2), 2000)));
new LoadStatsRequestMatcher(Arrays.asList(expectedStats1, expectedStats2))));

// Load reports for cluster1 are no longer wanted.
responseObserver.onNext(buildLrsResponse(ImmutableList.of(cluster2), 2000));
responseObserver.onNext(buildLrsResponse(Collections.singletonList(cluster2), 2000));

expectedStats2 =
rawStats2.toBuilder().setLoadReportInterval(Durations.fromNanos(2000)).build();
fakeClock.forwardNanos(2000);
inOrder.verify(requestObserver)
.onNext(argThat(new LoadStatsRequestMatcher(ImmutableList.of(rawStats2), 2000)));
.onNext(argThat(new LoadStatsRequestMatcher(Collections.singletonList(expectedStats2))));

// Management server asks for loads of a cluster that the client has no load data for.
responseObserver
Expand Down Expand Up @@ -331,7 +345,7 @@ public void lrsStreamClosedAndRetried() {

// Balancer sends a response asking for loads of the cluster.
responseObserver
.onNext(buildLrsResponse(ImmutableList.of(clusterName), 0));
.onNext(buildLrsResponse(ImmutableList.of(clusterName), 5));

// Then breaks the RPC
responseObserver.onError(Status.UNAVAILABLE.asException());
Expand All @@ -348,12 +362,12 @@ public void lrsStreamClosedAndRetried() {
fakeClock.forwardNanos(4);
responseObserver.onError(Status.UNAVAILABLE.asException());

// Will be on the first retry (1s) of backoff sequence 2.
// Will be on the first retry (2s) of backoff sequence 2.
inOrder.verify(backoffPolicy2).nextBackoffNanos();
assertEquals(1, fakeClock.numPendingTasks(LRS_RPC_RETRY_TASK_FILTER));

// Fast-forward to a moment before the retry; the time spent in the last try is deducted.
fakeClock.forwardNanos(TimeUnit.SECONDS.toNanos(1) - 4 - 1);
fakeClock.forwardNanos(TimeUnit.SECONDS.toNanos(2) - 4 - 1);
verifyNoMoreInteractions(mockLoadReportingService);
// Then time for retry
fakeClock.forwardNanos(1);
Expand All @@ -368,8 +382,13 @@ public void lrsStreamClosedAndRetried() {
responseObserver
.onNext(buildLrsResponse(ImmutableList.of(clusterName), 10));
fakeClock.forwardNanos(10);
ClusterStats expectedStats =
stats.toBuilder()
.setLoadReportInterval(
Durations.add(Durations.fromSeconds(1 + 10 + 2), Durations.fromNanos(10)))
.build();
verify(requestObserver)
.onNext(argThat(new LoadStatsRequestMatcher(ImmutableList.of(stats), 10)));
.onNext(argThat(new LoadStatsRequestMatcher(Collections.singletonList(expectedStats))));

// Wrapping up
verify(backoffPolicyProvider, times(2)).get();
Expand Down Expand Up @@ -466,9 +485,9 @@ private static ClusterStats generateClusterLoadStats(
UpstreamLocalityStats.newBuilder()
.setLocality(
Locality.newBuilder()
.setRegion("region-foo")
.setZone("zone-bar")
.setSubZone("subzone-baz"))
.setRegion(clusterName + "-region-foo")
.setZone(clusterName + "-zone-bar")
.setSubZone(clusterName + "-subzone-baz"))
.setTotalRequestsInProgress(callsInProgress)
.setTotalSuccessfulRequests(callsSucceeded)
.setTotalErrorRequests(callsFailed)
Expand All @@ -491,13 +510,9 @@ private static ClusterStats generateClusterLoadStats(
private static class LoadStatsRequestMatcher implements ArgumentMatcher<LoadStatsRequest> {
private final Map<String, ClusterStats> expectedStats = new HashMap<>();

LoadStatsRequestMatcher(Collection<ClusterStats> clusterStats, long expectedIntervalNano) {
LoadStatsRequestMatcher(Collection<ClusterStats> clusterStats) {
for (ClusterStats stats : clusterStats) {
ClusterStats statsWithInterval =
stats.toBuilder()
.setLoadReportInterval(Durations.fromNanos(expectedIntervalNano))
.build();
expectedStats.put(statsWithInterval.getClusterName(), statsWithInterval);
expectedStats.put(stats.getClusterName(), stats);
}
}

Expand Down

0 comments on commit 369efa2

Please sign in to comment.