Skip to content

Commit

Permalink
xds: manage load stats for all clusters in XdsClient (v1.31.x backpor…
Browse files Browse the repository at this point in the history
…t) (#7299) (#7317)

Move the creation of LoadStatsStore (aka, the stats object) into XdsClient. The XdsClient is responsible for managing the lifetime of stats objects. Creations of LoadStatsStores are reference counted so that multiple EDS policies can retrieve the same stats object for load recording. Counters for recording loads per locality also need to be reference counted, as each EDS policy for the same cluster will receive endpoints for the same group of localities, they will use the same load counters for recording each locality's loads.
  • Loading branch information
voidzcy committed Aug 12, 2020
1 parent 0abff44 commit 417a8f7
Show file tree
Hide file tree
Showing 20 changed files with 589 additions and 576 deletions.
45 changes: 18 additions & 27 deletions xds/src/main/java/io/grpc/xds/ClientLoadCounter.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,27 +54,10 @@ final class ClientLoadCounter {
private final AtomicLong callsIssued = new AtomicLong();
private final MetricRecorder[] metricRecorders = new MetricRecorder[THREAD_BALANCING_FACTOR];

// True if this counter continues to record stats after next snapshot. Otherwise, it will be
// discarded.
private boolean active;

ClientLoadCounter() {
for (int i = 0; i < THREAD_BALANCING_FACTOR; i++) {
metricRecorders[i] = new MetricRecorder();
}
active = true;
}

/**
* Must only be used for testing.
*/
@VisibleForTesting
ClientLoadCounter(long callsSucceeded, long callsInProgress, long callsFailed, long callsIssued) {
this();
this.callsSucceeded.set(callsSucceeded);
this.callsInProgress.set(callsInProgress);
this.callsFailed.set(callsFailed);
this.callsIssued.set(callsIssued);
}

void recordCallStarted() {
Expand All @@ -98,12 +81,8 @@ void recordMetric(String name, double value) {
}

/**
* Generates a snapshot for load stats recorded in this counter. Successive snapshots represent
* load stats recorded for the interval since the previous snapshot. So taking a snapshot clears
* the counter state except for ongoing RPC recordings.
*
* <p>This method is not thread-safe and must be called from {@link
* io.grpc.LoadBalancer.Helper#getSynchronizationContext()}.
* Generates a snapshot for load stats recorded in this counter for the interval between calls
* of this method.
*/
ClientLoadSnapshot snapshot() {
Map<String, MetricValue> aggregatedValues = new HashMap<>();
Expand All @@ -127,12 +106,24 @@ ClientLoadSnapshot snapshot() {
aggregatedValues);
}

void setActive(boolean value) {
active = value;
@VisibleForTesting
void setCallsIssued(long callsIssued) {
this.callsIssued.set(callsIssued);
}

@VisibleForTesting
void setCallsInProgress(long callsInProgress) {
this.callsInProgress.set(callsInProgress);
}

boolean isActive() {
return active;
@VisibleForTesting
void setCallsSucceeded(long callsSucceeded) {
this.callsSucceeded.set(callsSucceeded);
}

@VisibleForTesting
void setCallsFailed(long callsFailed) {
this.callsFailed.set(callsFailed);
}

/**
Expand Down
33 changes: 7 additions & 26 deletions xds/src/main/java/io/grpc/xds/EdsLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import io.grpc.xds.EnvoyProtoData.DropOverload;
import io.grpc.xds.EnvoyProtoData.Locality;
import io.grpc.xds.EnvoyProtoData.LocalityLbEndpoints;
import io.grpc.xds.LoadStatsManager.LoadStatsStore;
import io.grpc.xds.LocalityStore.LocalityStoreFactory;
import io.grpc.xds.XdsClient.EndpointUpdate;
import io.grpc.xds.XdsClient.EndpointWatcher;
Expand Down Expand Up @@ -208,11 +209,9 @@ public void shutdown() {
*/
private final class ClusterEndpointsBalancerFactory extends LoadBalancer.Factory {
@Nullable final String clusterServiceName;
final LoadStatsStore loadStatsStore;

ClusterEndpointsBalancerFactory(@Nullable String clusterServiceName) {
this.clusterServiceName = clusterServiceName;
loadStatsStore = new LoadStatsStoreImpl(clusterName, clusterServiceName);
}

@Override
Expand Down Expand Up @@ -248,6 +247,7 @@ final class ClusterEndpointsBalancer extends LoadBalancer {
ClusterEndpointsBalancer(Helper helper) {
this.helper = helper;
resourceName = clusterServiceName != null ? clusterServiceName : clusterName;
LoadStatsStore loadStatsStore = xdsClient.addClientStats(clusterName, clusterServiceName);
localityStore =
localityStoreFactory.newLocalityStore(logId, helper, lbRegistry, loadStatsStore);
endpointWatcher = new EndpointWatcherImpl();
Expand All @@ -267,22 +267,12 @@ public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
throw new AssertionError("Can only report load to the same management server");
}
if (!isReportingLoad) {
logger.log(
XdsLogLevel.INFO,
"Start reporting loads for cluster: {0}, cluster_service: {1}",
clusterName,
clusterServiceName);
xdsClient.reportClientStats(clusterName, clusterServiceName, loadStatsStore);
xdsClient.reportClientStats();
isReportingLoad = true;
}
} else {
if (isReportingLoad) {
logger.log(
XdsLogLevel.INFO,
"Stop reporting loads for cluster: {0}, cluster_service: {1}",
clusterName,
clusterServiceName);
xdsClient.cancelClientStatsReport(clusterName, clusterServiceName);
xdsClient.cancelClientStatsReport();
isReportingLoad = false;
}
}
Expand All @@ -304,15 +294,11 @@ public boolean canHandleEmptyAddressListFromNameResolution() {
@Override
public void shutdown() {
if (isReportingLoad) {
logger.log(
XdsLogLevel.INFO,
"Stop reporting loads for cluster: {0}, cluster_service: {1}",
clusterName,
clusterServiceName);
xdsClient.cancelClientStatsReport(clusterName, clusterServiceName);
xdsClient.cancelClientStatsReport();
isReportingLoad = false;
}
localityStore.reset();
xdsClient.removeClientStats(clusterName, clusterServiceName);
xdsClient.cancelEndpointDataWatch(resourceName, endpointWatcher);
logger.log(
XdsLogLevel.INFO,
Expand Down Expand Up @@ -365,12 +351,7 @@ public void onEndpointChanged(EndpointUpdate endpointUpdate) {
public void onResourceDoesNotExist(String resourceName) {
logger.log(XdsLogLevel.INFO, "Resource {0} is unavailable", resourceName);
if (isReportingLoad) {
logger.log(
XdsLogLevel.INFO,
"Stop reporting loads for cluster: {0}, cluster_service: {1}",
clusterName,
clusterServiceName);
xdsClient.cancelClientStatsReport(clusterName, clusterServiceName);
xdsClient.cancelClientStatsReport();
isReportingLoad = false;
}
localityStore.reset();
Expand Down

0 comments on commit 417a8f7

Please sign in to comment.