Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds: manage load stats for all clusters in XdsClient (v1.31.x backport) #7314

Large diffs are not rendered by default.

87 changes: 24 additions & 63 deletions xds/src/main/java/io/grpc/xds/Bootstrapper.java
Expand Up @@ -17,17 +17,13 @@
package io.grpc.xds;

import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ListValue;
import com.google.protobuf.NullValue;
import com.google.protobuf.Struct;
import com.google.protobuf.Value;
import io.envoyproxy.envoy.api.v2.core.Locality;
import io.envoyproxy.envoy.api.v2.core.Node;
import io.grpc.Internal;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.GrpcUtil.GrpcBuildVersion;
import io.grpc.internal.JsonParser;
import io.grpc.internal.JsonUtil;
import io.grpc.xds.EnvoyProtoData.Locality;
import io.grpc.xds.EnvoyProtoData.Node;
import io.grpc.xds.XdsLogger.XdsLogLevel;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
Expand Down Expand Up @@ -115,7 +111,11 @@ static BootstrapInfo parseConfig(String rawData) throws IOException {
channelCredsOptions.add(creds);
}
}
servers.add(new ServerInfo(serverUri, channelCredsOptions));
List<String> serverFeatures = JsonUtil.getListOfStrings(serverConfig, "server_features");
if (serverFeatures != null) {
logger.log(XdsLogLevel.INFO, "Server features: {0}", serverFeatures);
}
servers.add(new ServerInfo(serverUri, channelCredsOptions, serverFeatures));
}

Node.Builder nodeBuilder = Node.newBuilder();
Expand All @@ -133,34 +133,24 @@ static BootstrapInfo parseConfig(String rawData) throws IOException {
}
Map<String, ?> metadata = JsonUtil.getObject(rawNode, "metadata");
if (metadata != null) {
Struct.Builder structBuilder = Struct.newBuilder();
for (Map.Entry<String, ?> entry : metadata.entrySet()) {
logger.log(
XdsLogLevel.INFO,
"Node metadata field {0}: {1}", entry.getKey(), entry.getValue());
structBuilder.putFields(entry.getKey(), convertToValue(entry.getValue()));
}
nodeBuilder.setMetadata(structBuilder);
nodeBuilder.setMetadata(metadata);
}
Map<String, ?> rawLocality = JsonUtil.getObject(rawNode, "locality");
if (rawLocality != null) {
Locality.Builder localityBuilder = Locality.newBuilder();
if (rawLocality.containsKey("region")) {
String region = JsonUtil.getString(rawLocality, "region");
String region = JsonUtil.getString(rawLocality, "region");
String zone = JsonUtil.getString(rawLocality, "zone");
String subZone = JsonUtil.getString(rawLocality, "sub_zone");
if (region != null) {
logger.log(XdsLogLevel.INFO, "Locality region: {0}", region);
localityBuilder.setRegion(region);
}
if (rawLocality.containsKey("zone")) {
String zone = JsonUtil.getString(rawLocality, "zone");
logger.log(XdsLogLevel.INFO, "Locality zone: {0}", zone);
localityBuilder.setZone(zone);
}
if (rawLocality.containsKey("sub_zone")) {
String subZone = JsonUtil.getString(rawLocality, "sub_zone");
logger.log(XdsLogLevel.INFO, "Locality sub_zone: {0}", subZone);
localityBuilder.setSubZone(subZone);
}
nodeBuilder.setLocality(localityBuilder);
Locality locality = new Locality(region, zone, subZone);
nodeBuilder.setLocality(locality);
}
}
GrpcBuildVersion buildVersion = GrpcUtil.getGrpcBuildVersion();
Expand All @@ -173,43 +163,6 @@ static BootstrapInfo parseConfig(String rawData) throws IOException {
return new BootstrapInfo(servers, nodeBuilder.build());
}

/**
* Converts Java representation of the given JSON value to protobuf's {@link
* com.google.protobuf.Value} representation.
*
* <p>The given {@code rawObject} must be a valid JSON value in Java representation, which is
* either a {@code Map<String, ?>}, {@code List<?>}, {@code String}, {@code Double},
* {@code Boolean}, or {@code null}.
*/
private static Value convertToValue(Object rawObject) {
Value.Builder valueBuilder = Value.newBuilder();
if (rawObject == null) {
valueBuilder.setNullValue(NullValue.NULL_VALUE);
} else if (rawObject instanceof Double) {
valueBuilder.setNumberValue((Double) rawObject);
} else if (rawObject instanceof String) {
valueBuilder.setStringValue((String) rawObject);
} else if (rawObject instanceof Boolean) {
valueBuilder.setBoolValue((Boolean) rawObject);
} else if (rawObject instanceof Map) {
Struct.Builder structBuilder = Struct.newBuilder();
@SuppressWarnings("unchecked")
Map<String, ?> map = (Map<String, ?>) rawObject;
for (Map.Entry<String, ?> entry : map.entrySet()) {
structBuilder.putFields(entry.getKey(), convertToValue(entry.getValue()));
}
valueBuilder.setStructValue(structBuilder);
} else if (rawObject instanceof List) {
ListValue.Builder listBuilder = ListValue.newBuilder();
List<?> list = (List<?>) rawObject;
for (Object obj : list) {
listBuilder.addValues(convertToValue(obj));
}
valueBuilder.setListValue(listBuilder);
}
return valueBuilder.build();
}

/**
* Data class containing channel credentials configurations for xDS protocol communication.
*/
Expand Down Expand Up @@ -247,11 +200,14 @@ String getType() {
static class ServerInfo {
private final String serverUri;
private final List<ChannelCreds> channelCredsList;
@Nullable
private final List<String> serverFeatures;

@VisibleForTesting
ServerInfo(String serverUri, List<ChannelCreds> channelCredsList) {
ServerInfo(String serverUri, List<ChannelCreds> channelCredsList, List<String> serverFeatures) {
this.serverUri = serverUri;
this.channelCredsList = channelCredsList;
this.serverFeatures = serverFeatures;
}

String getServerUri() {
Expand All @@ -261,6 +217,12 @@ String getServerUri() {
List<ChannelCreds> getChannelCredentials() {
return Collections.unmodifiableList(channelCredsList);
}

List<String> getServerFeatures() {
return serverFeatures == null
? Collections.<String>emptyList()
: Collections.unmodifiableList(serverFeatures);
}
}

/**
Expand Down Expand Up @@ -291,6 +253,5 @@ List<ServerInfo> getServers() {
public Node getNode() {
return node;
}

}
}
45 changes: 18 additions & 27 deletions xds/src/main/java/io/grpc/xds/ClientLoadCounter.java
Expand Up @@ -54,27 +54,10 @@ final class ClientLoadCounter {
private final AtomicLong callsIssued = new AtomicLong();
private final MetricRecorder[] metricRecorders = new MetricRecorder[THREAD_BALANCING_FACTOR];

// True if this counter continues to record stats after next snapshot. Otherwise, it will be
// discarded.
private boolean active;

ClientLoadCounter() {
for (int i = 0; i < THREAD_BALANCING_FACTOR; i++) {
metricRecorders[i] = new MetricRecorder();
}
active = true;
}

/**
* Must only be used for testing.
*/
@VisibleForTesting
ClientLoadCounter(long callsSucceeded, long callsInProgress, long callsFailed, long callsIssued) {
this();
this.callsSucceeded.set(callsSucceeded);
this.callsInProgress.set(callsInProgress);
this.callsFailed.set(callsFailed);
this.callsIssued.set(callsIssued);
}

void recordCallStarted() {
Expand All @@ -98,12 +81,8 @@ void recordMetric(String name, double value) {
}

/**
* Generates a snapshot for load stats recorded in this counter. Successive snapshots represent
* load stats recorded for the interval since the previous snapshot. So taking a snapshot clears
* the counter state except for ongoing RPC recordings.
*
* <p>This method is not thread-safe and must be called from {@link
* io.grpc.LoadBalancer.Helper#getSynchronizationContext()}.
* Generates a snapshot for load stats recorded in this counter for the interval between calls
* of this method.
*/
ClientLoadSnapshot snapshot() {
Map<String, MetricValue> aggregatedValues = new HashMap<>();
Expand All @@ -127,12 +106,24 @@ ClientLoadSnapshot snapshot() {
aggregatedValues);
}

void setActive(boolean value) {
active = value;
@VisibleForTesting
void setCallsIssued(long callsIssued) {
this.callsIssued.set(callsIssued);
}

@VisibleForTesting
void setCallsInProgress(long callsInProgress) {
this.callsInProgress.set(callsInProgress);
}

boolean isActive() {
return active;
@VisibleForTesting
void setCallsSucceeded(long callsSucceeded) {
this.callsSucceeded.set(callsSucceeded);
}

@VisibleForTesting
void setCallsFailed(long callsFailed) {
this.callsFailed.set(callsFailed);
}

/**
Expand Down
35 changes: 8 additions & 27 deletions xds/src/main/java/io/grpc/xds/EdsLoadBalancer.java
Expand Up @@ -22,7 +22,6 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.envoyproxy.envoy.api.v2.core.Node;
import io.grpc.Attributes;
import io.grpc.InternalLogId;
import io.grpc.LoadBalancer;
Expand All @@ -38,6 +37,8 @@
import io.grpc.xds.EnvoyProtoData.DropOverload;
import io.grpc.xds.EnvoyProtoData.Locality;
import io.grpc.xds.EnvoyProtoData.LocalityLbEndpoints;
import io.grpc.xds.EnvoyProtoData.Node;
import io.grpc.xds.LoadStatsManager.LoadStatsStore;
import io.grpc.xds.LocalityStore.LocalityStoreFactory;
import io.grpc.xds.XdsClient.EndpointUpdate;
import io.grpc.xds.XdsClient.EndpointWatcher;
Expand Down Expand Up @@ -208,11 +209,9 @@ public void shutdown() {
*/
private final class ClusterEndpointsBalancerFactory extends LoadBalancer.Factory {
@Nullable final String clusterServiceName;
final LoadStatsStore loadStatsStore;

ClusterEndpointsBalancerFactory(@Nullable String clusterServiceName) {
this.clusterServiceName = clusterServiceName;
loadStatsStore = new LoadStatsStoreImpl(clusterName, clusterServiceName);
}

@Override
Expand Down Expand Up @@ -248,6 +247,7 @@ final class ClusterEndpointsBalancer extends LoadBalancer {
ClusterEndpointsBalancer(Helper helper) {
this.helper = helper;
resourceName = clusterServiceName != null ? clusterServiceName : clusterName;
LoadStatsStore loadStatsStore = xdsClient.addClientStats(clusterName, clusterServiceName);
localityStore =
localityStoreFactory.newLocalityStore(logId, helper, lbRegistry, loadStatsStore);
endpointWatcher = new EndpointWatcherImpl();
Expand All @@ -267,22 +267,12 @@ public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
throw new AssertionError("Can only report load to the same management server");
}
if (!isReportingLoad) {
logger.log(
XdsLogLevel.INFO,
"Start reporting loads for cluster: {0}, cluster_service: {1}",
clusterName,
clusterServiceName);
xdsClient.reportClientStats(clusterName, clusterServiceName, loadStatsStore);
xdsClient.reportClientStats();
isReportingLoad = true;
}
} else {
if (isReportingLoad) {
logger.log(
XdsLogLevel.INFO,
"Stop reporting loads for cluster: {0}, cluster_service: {1}",
clusterName,
clusterServiceName);
xdsClient.cancelClientStatsReport(clusterName, clusterServiceName);
xdsClient.cancelClientStatsReport();
isReportingLoad = false;
}
}
Expand All @@ -304,15 +294,11 @@ public boolean canHandleEmptyAddressListFromNameResolution() {
@Override
public void shutdown() {
if (isReportingLoad) {
logger.log(
XdsLogLevel.INFO,
"Stop reporting loads for cluster: {0}, cluster_service: {1}",
clusterName,
clusterServiceName);
xdsClient.cancelClientStatsReport(clusterName, clusterServiceName);
xdsClient.cancelClientStatsReport();
isReportingLoad = false;
}
localityStore.reset();
xdsClient.removeClientStats(clusterName, clusterServiceName);
xdsClient.cancelEndpointDataWatch(resourceName, endpointWatcher);
logger.log(
XdsLogLevel.INFO,
Expand Down Expand Up @@ -365,12 +351,7 @@ public void onEndpointChanged(EndpointUpdate endpointUpdate) {
public void onResourceDoesNotExist(String resourceName) {
logger.log(XdsLogLevel.INFO, "Resource {0} is unavailable", resourceName);
if (isReportingLoad) {
logger.log(
XdsLogLevel.INFO,
"Stop reporting loads for cluster: {0}, cluster_service: {1}",
clusterName,
clusterServiceName);
xdsClient.cancelClientStatsReport(clusterName, clusterServiceName);
xdsClient.cancelClientStatsReport();
isReportingLoad = false;
}
localityStore.reset();
Expand Down