Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds: support load reporting all clusters option and fix actual report interval measurement #7209

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
90 changes: 49 additions & 41 deletions xds/src/main/java/io/grpc/xds/LoadReportClient.java
Expand Up @@ -67,7 +67,7 @@ final class LoadReportClient {
private final BackoffPolicy.Provider backoffPolicyProvider;

// Sources of load stats data for each cluster:cluster_service.
private final Map<String, Map<String, LoadStatsStore>> loadStatsStoreMap = new HashMap<>();
private final Map<String, Map<String, LoadStatsEntity>> loadStatsEntities = new HashMap<>();
private boolean started;

@Nullable
Expand Down Expand Up @@ -148,38 +148,39 @@ void stopLoadReporting() {
void addLoadStatsStore(
String clusterName, @Nullable String clusterServiceName, LoadStatsStore loadStatsStore) {
checkState(
!loadStatsStoreMap.containsKey(clusterName)
|| !loadStatsStoreMap.get(clusterName).containsKey(clusterServiceName),
!loadStatsEntities.containsKey(clusterName)
|| !loadStatsEntities.get(clusterName).containsKey(clusterServiceName),
"load stats for cluster: %s, cluster service: %s already exists",
clusterName, clusterServiceName);
logger.log(
XdsLogLevel.INFO,
"Add load stats for cluster: {0}, cluster_service: {1}", clusterName, clusterServiceName);
if (!loadStatsStoreMap.containsKey(clusterName)) {
loadStatsStoreMap.put(clusterName, new HashMap<String, LoadStatsStore>());
if (!loadStatsEntities.containsKey(clusterName)) {
loadStatsEntities.put(clusterName, new HashMap<String, LoadStatsEntity>());
}
Map<String, LoadStatsStore> clusterLoadStatsStores = loadStatsStoreMap.get(clusterName);
clusterLoadStatsStores.put(clusterServiceName, loadStatsStore);
Map<String, LoadStatsEntity> clusterLoadStatsEntities = loadStatsEntities.get(clusterName);
clusterLoadStatsEntities.put(
clusterServiceName, new LoadStatsEntity(loadStatsStore, stopwatchSupplier.get()));
}

/**
* Stops providing load stats data for the given cluster:cluster_service.
*/
void removeLoadStatsStore(String clusterName, @Nullable String clusterServiceName) {
checkState(
loadStatsStoreMap.containsKey(clusterName)
&& loadStatsStoreMap.get(clusterName).containsKey(clusterServiceName),
loadStatsEntities.containsKey(clusterName)
&& loadStatsEntities.get(clusterName).containsKey(clusterServiceName),
"load stats for cluster: %s, cluster service: %s does not exist",
clusterName, clusterServiceName);
logger.log(
XdsLogLevel.INFO,
"Remove load stats for cluster: {0}, cluster_service: {1}",
clusterName,
clusterServiceName);
Map<String, LoadStatsStore> clusterLoadStatsStores = loadStatsStoreMap.get(clusterName);
clusterLoadStatsStores.remove(clusterServiceName);
if (clusterLoadStatsStores.isEmpty()) {
loadStatsStoreMap.remove(clusterName);
Map<String, LoadStatsEntity> clusterLoadStatsEntities = loadStatsEntities.get(clusterName);
clusterLoadStatsEntities.remove(clusterServiceName);
if (clusterLoadStatsEntities.isEmpty()) {
loadStatsEntities.remove(clusterName);
}
}

Expand Down Expand Up @@ -217,10 +218,8 @@ private void startLrsRpc() {

private class LrsStream implements StreamObserver<LoadStatsResponse> {

// Cluster to report loads for asked by management server.
final Set<String> clusterNames = new HashSet<>();
final LoadReportingServiceGrpc.LoadReportingServiceStub stub;
final Stopwatch reportStopwatch;
StreamObserver<LoadStatsRequest> lrsRequestWriter;
boolean initialResponseReceived;
boolean closed;
Expand All @@ -229,15 +228,10 @@ private class LrsStream implements StreamObserver<LoadStatsResponse> {

LrsStream(LoadReportingServiceGrpc.LoadReportingServiceStub stub, Stopwatch stopwatch) {
this.stub = checkNotNull(stub, "stub");
reportStopwatch = checkNotNull(stopwatch, "stopwatch");
}

void start() {
lrsRequestWriter = stub.withWaitForReady().streamLoadStats(this);
reportStopwatch.reset().start();

// Send an initial LRS request with empty cluster stats. Management server is able to
// infer clusters the gRPC client sending loads to.
LoadStatsRequest initRequest =
LoadStatsRequest.newBuilder()
.setNode(node)
Expand Down Expand Up @@ -278,19 +272,12 @@ public void run() {
}

private void sendLoadReport() {
long interval = reportStopwatch.elapsed(TimeUnit.NANOSECONDS);
reportStopwatch.reset().start();
LoadStatsRequest.Builder requestBuilder = LoadStatsRequest.newBuilder().setNode(node);
for (String name : clusterNames) {
if (loadStatsStoreMap.containsKey(name)) {
Map<String, LoadStatsStore> clusterLoadStatsStores = loadStatsStoreMap.get(name);
for (LoadStatsStore statsStore : clusterLoadStatsStores.values()) {
ClusterStats report =
statsStore.generateLoadReport()
.toBuilder()
.setLoadReportInterval(Durations.fromNanos(interval))
.build();
requestBuilder.addClusterStats(report);
if (loadStatsEntities.containsKey(name)) {
Map<String, LoadStatsEntity> clusterLoadStatsEntities = loadStatsEntities.get(name);
for (LoadStatsEntity entity : clusterLoadStatsEntities.values()) {
requestBuilder.addClusterStats(entity.getLoadStats());
}
}
}
Expand All @@ -317,28 +304,27 @@ private void handleResponse(LoadStatsResponse response) {
if (closed) {
return;
}

if (!initialResponseReceived) {
logger.log(XdsLogLevel.DEBUG, "Received LRS initial response:\n{0}", response);
initialResponseReceived = true;
} else {
logger.log(XdsLogLevel.DEBUG, "Received LRS response:\n{0}", response);
}
long interval = Durations.toNanos(response.getLoadReportingInterval());
if (interval != loadReportIntervalNano) {
logger.log(XdsLogLevel.INFO, "Update load reporting interval to {0} ns", interval);
loadReportIntervalNano = interval;
callback.onReportResponse(loadReportIntervalNano);
}
if (clusterNames.size() != response.getClustersCount()
|| !clusterNames.containsAll(response.getClustersList())) {
clusterNames.clear();
if (response.getSendAllClusters()) {
clusterNames.addAll(loadStatsEntities.keySet());
logger.log(XdsLogLevel.INFO, "Update to report loads for all clusters");
} else {
logger.log(
XdsLogLevel.INFO,
"Update load reporting clusters to {0}", response.getClustersList());
clusterNames.clear();
clusterNames.addAll(response.getClustersList());
}
long interval = Durations.toNanos(response.getLoadReportingInterval());
logger.log(XdsLogLevel.INFO, "Update load reporting interval to {0} ns", interval);
loadReportIntervalNano = interval;
scheduleNextLoadReport();
callback.onReportResponse(loadReportIntervalNano);
}

private void handleStreamClosed(Status status) {
Expand Down Expand Up @@ -401,6 +387,28 @@ private void cleanUp() {
}
}

private static final class LoadStatsEntity {
private final LoadStatsStore loadStatsStore;
private final Stopwatch stopwatch;

private LoadStatsEntity(LoadStatsStore loadStatsStore, Stopwatch stopwatch) {
this.loadStatsStore = loadStatsStore;
this.stopwatch = stopwatch;
stopwatch.reset().start();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better to pass a StopwatchSupplier in constructor, and create a new stopwatch here, so that it's clear that the stopwatch's lifecycle is solely managed by the LoadStatsEntity.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated to take the stopwatch supplier from the LrsClient directly.

}

private ClusterStats getLoadStats() {
ClusterStats stats =
loadStatsStore.generateLoadReport()
.toBuilder()
.setLoadReportInterval(
Durations.fromNanos(stopwatch.elapsed(TimeUnit.NANOSECONDS)))
.build();
stopwatch.reset().start();
return stats;
}
}

/**
* Callbacks for passing information received from client load reporting responses to xDS load
* balancer, such as the load reporting interval requested by the traffic director.
Expand Down
67 changes: 41 additions & 26 deletions xds/src/test/java/io/grpc/xds/LoadReportClientTest.java
Expand Up @@ -56,7 +56,9 @@
import io.grpc.testing.GrpcCleanupRule;
import io.grpc.xds.LoadReportClient.LoadReportCallback;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -178,7 +180,7 @@ public void cancelled(Context context) {
when(backoffPolicy1.nextBackoffNanos())
.thenReturn(TimeUnit.SECONDS.toNanos(1L), TimeUnit.SECONDS.toNanos(10L));
when(backoffPolicy2.nextBackoffNanos())
.thenReturn(TimeUnit.SECONDS.toNanos(1L), TimeUnit.SECONDS.toNanos(10L));
.thenReturn(TimeUnit.SECONDS.toNanos(2L), TimeUnit.SECONDS.toNanos(20L));
lrsClient =
new LoadReportClient(
logId,
Expand Down Expand Up @@ -216,15 +218,17 @@ public void typicalWorkflow() {
responseObserver.onNext(buildLrsResponse(ImmutableList.of(cluster1), 1000));
inOrder.verify(callback).onReportResponse(1000);

ArgumentMatcher<LoadStatsRequest> expectedLoadReportMatcher =
new LoadStatsRequestMatcher(ImmutableList.of(rawStats1), 1000);
ClusterStats expectedStats1 =
rawStats1.toBuilder().setLoadReportInterval(Durations.fromNanos(1000)).build();
fakeClock.forwardNanos(999);
inOrder.verifyNoMoreInteractions();
fakeClock.forwardNanos(1);
inOrder.verify(requestObserver).onNext(argThat(expectedLoadReportMatcher));
inOrder.verify(requestObserver)
.onNext(argThat(new LoadStatsRequestMatcher(Collections.singletonList(expectedStats1))));

fakeClock.forwardNanos(1000);
inOrder.verify(requestObserver).onNext(argThat(expectedLoadReportMatcher));
inOrder.verify(requestObserver)
.onNext(argThat(new LoadStatsRequestMatcher(Collections.singletonList(expectedStats1))));

String cluster2 = "cluster-bar.googleapis.com";
ClusterStats rawStats2 = generateClusterLoadStats(cluster2, null);
Expand All @@ -236,28 +240,38 @@ public void typicalWorkflow() {
responseObserver.onNext(buildLrsResponse(ImmutableList.of(cluster1), 2000));
inOrder.verify(callback).onReportResponse(2000);

expectedStats1 =
rawStats1.toBuilder().setLoadReportInterval(Durations.fromNanos(2000)).build();
fakeClock.forwardNanos(1000);
inOrder.verifyNoMoreInteractions();

fakeClock.forwardNanos(1000);
inOrder.verify(requestObserver)
.onNext(argThat(new LoadStatsRequestMatcher(ImmutableList.of(rawStats1), 2000)));

// Management server asks to report loads for cluster1 and cluster2.
responseObserver.onNext(buildLrsResponse(ImmutableList.of(cluster1, cluster2), 2000));
.onNext(argThat(new LoadStatsRequestMatcher(Collections.singletonList(expectedStats1))));

// Management server asks to report loads for all clusters.
responseObserver.onNext(
LoadStatsResponse.newBuilder()
.setSendAllClusters(true)
.setLoadReportingInterval(Durations.fromNanos(2000))
.build());
inOrder.verify(callback).onReportResponse(2000);

ClusterStats expectedStats2 =
rawStats2.toBuilder().setLoadReportInterval(Durations.fromNanos(2000 + 2000)).build();
fakeClock.forwardNanos(2000);
inOrder.verify(requestObserver)
.onNext(
argThat(
new LoadStatsRequestMatcher(ImmutableList.of(rawStats1, rawStats2), 2000)));
new LoadStatsRequestMatcher(Arrays.asList(expectedStats1, expectedStats2))));

// Load reports for cluster1 is no longer wanted.
responseObserver.onNext(buildLrsResponse(ImmutableList.of(cluster2), 2000));
responseObserver.onNext(buildLrsResponse(Collections.singletonList(cluster2), 2000));

expectedStats2 =
rawStats2.toBuilder().setLoadReportInterval(Durations.fromNanos(2000)).build();
fakeClock.forwardNanos(2000);
inOrder.verify(requestObserver)
.onNext(argThat(new LoadStatsRequestMatcher(ImmutableList.of(rawStats2), 2000)));
.onNext(argThat(new LoadStatsRequestMatcher(Collections.singletonList(expectedStats2))));

// Management server asks loads for a cluster that client has no load data.
responseObserver
Expand Down Expand Up @@ -331,7 +345,7 @@ public void lrsStreamClosedAndRetried() {

// Balancer sends a response asking for loads of the cluster.
responseObserver
.onNext(buildLrsResponse(ImmutableList.of(clusterName), 0));
.onNext(buildLrsResponse(ImmutableList.of(clusterName), 5));

// Then breaks the RPC
responseObserver.onError(Status.UNAVAILABLE.asException());
Expand All @@ -348,12 +362,12 @@ public void lrsStreamClosedAndRetried() {
fakeClock.forwardNanos(4);
responseObserver.onError(Status.UNAVAILABLE.asException());

// Will be on the first retry (1s) of backoff sequence 2.
// Will be on the first retry (2s) of backoff sequence 2.
inOrder.verify(backoffPolicy2).nextBackoffNanos();
assertEquals(1, fakeClock.numPendingTasks(LRS_RPC_RETRY_TASK_FILTER));

// Fast-forward to a moment before the retry, the time spent in the last try is deducted.
fakeClock.forwardNanos(TimeUnit.SECONDS.toNanos(1) - 4 - 1);
fakeClock.forwardNanos(TimeUnit.SECONDS.toNanos(2) - 4 - 1);
verifyNoMoreInteractions(mockLoadReportingService);
// Then time for retry
fakeClock.forwardNanos(1);
Expand All @@ -368,8 +382,13 @@ public void lrsStreamClosedAndRetried() {
responseObserver
.onNext(buildLrsResponse(ImmutableList.of(clusterName), 10));
fakeClock.forwardNanos(10);
ClusterStats expectedStats =
stats.toBuilder()
.setLoadReportInterval(
Durations.add(Durations.fromSeconds(1 + 10 + 2), Durations.fromNanos(10)))
.build();
verify(requestObserver)
.onNext(argThat(new LoadStatsRequestMatcher(ImmutableList.of(stats), 10)));
.onNext(argThat(new LoadStatsRequestMatcher(Collections.singletonList(expectedStats))));

// Wrapping up
verify(backoffPolicyProvider, times(2)).get();
Expand Down Expand Up @@ -466,9 +485,9 @@ private static ClusterStats generateClusterLoadStats(
UpstreamLocalityStats.newBuilder()
.setLocality(
Locality.newBuilder()
.setRegion("region-foo")
.setZone("zone-bar")
.setSubZone("subzone-baz"))
.setRegion(clusterName + "-region-foo")
.setZone(clusterName + "-zone-bar")
.setSubZone(clusterName + "-subzone-baz"))
.setTotalRequestsInProgress(callsInProgress)
.setTotalSuccessfulRequests(callsSucceeded)
.setTotalErrorRequests(callsFailed)
Expand All @@ -491,13 +510,9 @@ private static ClusterStats generateClusterLoadStats(
private static class LoadStatsRequestMatcher implements ArgumentMatcher<LoadStatsRequest> {
private final Map<String, ClusterStats> expectedStats = new HashMap<>();

LoadStatsRequestMatcher(Collection<ClusterStats> clusterStats, long expectedIntervalNano) {
LoadStatsRequestMatcher(Collection<ClusterStats> clusterStats) {
for (ClusterStats stats : clusterStats) {
ClusterStats statsWithInterval =
stats.toBuilder()
.setLoadReportInterval(Durations.fromNanos(expectedIntervalNano))
.build();
expectedStats.put(statsWithInterval.getClusterName(), statsWithInterval);
expectedStats.put(stats.getClusterName(), stats);
}
}

Expand Down