Skip to content

Commit

Permalink
xds: add data type for ClusterStats (#7335)
Browse files Browse the repository at this point in the history
In preparation for LRS v3 support.
  • Loading branch information
dapengzhang0 committed Aug 18, 2020
1 parent ee9109e commit cb07b0f
Show file tree
Hide file tree
Showing 9 changed files with 664 additions and 78 deletions.
526 changes: 520 additions & 6 deletions xds/src/main/java/io/grpc/xds/EnvoyProtoData.java

Large diffs are not rendered by default.

9 changes: 7 additions & 2 deletions xds/src/main/java/io/grpc/xds/LoadReportClient.java
Expand Up @@ -37,6 +37,7 @@
import io.grpc.SynchronizationContext.ScheduledHandle;
import io.grpc.internal.BackoffPolicy;
import io.grpc.stub.StreamObserver;
import io.grpc.xds.EnvoyProtoData.ClusterStats;
import io.grpc.xds.XdsLogger.XdsLogLevel;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -228,10 +229,14 @@ public void run() {
private void sendLoadReport() {
LoadStatsRequest.Builder requestBuilder = LoadStatsRequest.newBuilder().setNode(node);
if (reportAllClusters) {
requestBuilder.addAllClusterStats(loadStatsManager.getAllLoadReports());
for (ClusterStats clusterStats : loadStatsManager.getAllLoadReports()) {
requestBuilder.addClusterStats(clusterStats.toEnvoyProtoClusterStatsV2());
}
} else {
for (String name : clusterNames) {
requestBuilder.addAllClusterStats(loadStatsManager.getClusterLoadReports(name));
for (ClusterStats clusterStats : loadStatsManager.getClusterLoadReports(name)) {
requestBuilder.addClusterStats(clusterStats.toEnvoyProtoClusterStatsV2());
}
}
}
LoadStatsRequest request = requestBuilder.build();
Expand Down
4 changes: 1 addition & 3 deletions xds/src/main/java/io/grpc/xds/LoadStatsManager.java
Expand Up @@ -19,7 +19,7 @@
import static com.google.common.base.Preconditions.checkState;

import com.google.common.annotations.VisibleForTesting;
import io.envoyproxy.envoy.api.v2.endpoint.ClusterStats;
import io.grpc.xds.EnvoyProtoData.ClusterStats;
import io.grpc.xds.EnvoyProtoData.Locality;
import java.util.ArrayList;
import java.util.HashMap;
Expand Down Expand Up @@ -90,7 +90,6 @@ void removeLoadStats(String cluster, @Nullable String clusterService) {
* the interval between calls of this method or {@link #getAllLoadReports}. A cluster may send
* loads to more than one cluster_service, they are included in separate stats reports.
*/
// TODO(chengyuanzhang): do not use proto type directly.
List<ClusterStats> getClusterLoadReports(String cluster) {
List<ClusterStats> res = new ArrayList<>();
Map<String, ReferenceCounted<LoadStatsStore>> clusterLoadStatsStores =
Expand All @@ -109,7 +108,6 @@ List<ClusterStats> getClusterLoadReports(String cluster) {
* interval between calls of this method or {@link #getClusterLoadReports}. Each report
* includes stats for one cluster:cluster_service.
*/
// TODO(chengyuanzhang): do not use proto type directly.
List<ClusterStats> getAllLoadReports() {
List<ClusterStats> res = new ArrayList<>();
for (Map<String, ReferenceCounted<LoadStatsStore>> clusterLoadStatsStores
Expand Down
28 changes: 13 additions & 15 deletions xds/src/main/java/io/grpc/xds/LoadStatsStoreImpl.java
Expand Up @@ -20,15 +20,14 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import com.google.protobuf.util.Durations;
import io.envoyproxy.envoy.api.v2.endpoint.ClusterStats;
import io.envoyproxy.envoy.api.v2.endpoint.ClusterStats.DroppedRequests;
import io.envoyproxy.envoy.api.v2.endpoint.EndpointLoadMetricStats;
import io.envoyproxy.envoy.api.v2.endpoint.UpstreamLocalityStats;
import io.grpc.internal.GrpcUtil;
import io.grpc.xds.ClientLoadCounter.ClientLoadSnapshot;
import io.grpc.xds.ClientLoadCounter.MetricValue;
import io.grpc.xds.EnvoyProtoData.ClusterStats;
import io.grpc.xds.EnvoyProtoData.ClusterStats.DroppedRequests;
import io.grpc.xds.EnvoyProtoData.EndpointLoadMetricStats;
import io.grpc.xds.EnvoyProtoData.Locality;
import io.grpc.xds.EnvoyProtoData.UpstreamLocalityStats;
import io.grpc.xds.LoadStatsManager.LoadStatsStore;
import io.grpc.xds.LoadStatsManager.LoadStatsStoreFactory;
import java.util.Map;
Expand All @@ -50,7 +49,6 @@
final class LoadStatsStoreImpl implements LoadStatsStore {
private final String clusterName;
@Nullable
@SuppressWarnings("unused")
private final String clusterServiceName;
private final ConcurrentMap<Locality, ReferenceCounted<ClientLoadCounter>> localityLoadCounters
= new ConcurrentHashMap<>();
Expand Down Expand Up @@ -80,12 +78,14 @@ final class LoadStatsStoreImpl implements LoadStatsStore {
public ClusterStats generateLoadReport() {
ClusterStats.Builder statsBuilder = ClusterStats.newBuilder();
statsBuilder.setClusterName(clusterName);
// TODO(chengyuanzhang): also set cluster_service_name if provided.
if (clusterServiceName != null) {
statsBuilder.setClusterServiceName(clusterServiceName);
}
for (Map.Entry<Locality, ReferenceCounted<ClientLoadCounter>> entry
: localityLoadCounters.entrySet()) {
ClientLoadSnapshot snapshot = entry.getValue().get().snapshot();
UpstreamLocalityStats.Builder localityStatsBuilder =
UpstreamLocalityStats.newBuilder().setLocality(entry.getKey().toEnvoyProtoLocalityV2());
UpstreamLocalityStats.newBuilder().setLocality(entry.getKey());
localityStatsBuilder
.setTotalSuccessfulRequests(snapshot.getCallsSucceeded())
.setTotalErrorRequests(snapshot.getCallsFailed())
Expand All @@ -96,9 +96,10 @@ public ClusterStats generateLoadReport() {
EndpointLoadMetricStats.newBuilder()
.setMetricName(metric.getKey())
.setNumRequestsFinishedWithMetric(metric.getValue().getNumReports())
.setTotalMetricValue(metric.getValue().getTotalValue()));
.setTotalMetricValue(metric.getValue().getTotalValue())
.build());
}
statsBuilder.addUpstreamLocalityStats(localityStatsBuilder);
statsBuilder.addUpstreamLocalityStats(localityStatsBuilder.build());
// Discard counters for localities that are no longer exposed by the remote balancer and
// no RPCs ongoing.
if (entry.getValue().getReferenceCount() == 0 && snapshot.getCallsInProgress() == 0) {
Expand All @@ -109,13 +110,10 @@ public ClusterStats generateLoadReport() {
for (Map.Entry<String, AtomicLong> entry : dropCounters.entrySet()) {
long drops = entry.getValue().getAndSet(0);
totalDrops += drops;
statsBuilder.addDroppedRequests(DroppedRequests.newBuilder()
.setCategory(entry.getKey())
.setDroppedCount(drops));
statsBuilder.addDroppedRequests(new DroppedRequests(entry.getKey(),drops));
}
statsBuilder.setTotalDroppedRequests(totalDrops);
statsBuilder.setLoadReportInterval(
Durations.fromNanos(stopwatch.elapsed(TimeUnit.NANOSECONDS)));
statsBuilder.setLoadReportIntervalNanos(stopwatch.elapsed(TimeUnit.NANOSECONDS));
stopwatch.reset().start();
return statsBuilder.build();
}
Expand Down
79 changes: 79 additions & 0 deletions xds/src/test/java/io/grpc/xds/EnvoyProtoDataTest.java
Expand Up @@ -34,12 +34,16 @@
import io.envoyproxy.envoy.type.v3.FractionalPercent;
import io.envoyproxy.envoy.type.v3.Int64Range;
import io.grpc.xds.EnvoyProtoData.Address;
import io.grpc.xds.EnvoyProtoData.ClusterStats;
import io.grpc.xds.EnvoyProtoData.ClusterStats.DroppedRequests;
import io.grpc.xds.EnvoyProtoData.ClusterWeight;
import io.grpc.xds.EnvoyProtoData.EndpointLoadMetricStats;
import io.grpc.xds.EnvoyProtoData.Locality;
import io.grpc.xds.EnvoyProtoData.Node;
import io.grpc.xds.EnvoyProtoData.Route;
import io.grpc.xds.EnvoyProtoData.RouteAction;
import io.grpc.xds.EnvoyProtoData.StructOrError;
import io.grpc.xds.EnvoyProtoData.UpstreamLocalityStats;
import io.grpc.xds.RouteMatch.FractionMatcher;
import io.grpc.xds.RouteMatch.HeaderMatcher;
import io.grpc.xds.RouteMatch.PathMatcher;
Expand Down Expand Up @@ -564,4 +568,79 @@ public void convertClusterWeight() {
assertThat(struct.getName()).isEqualTo("cluster-foo");
assertThat(struct.getWeight()).isEqualTo(30);
}

/**
 * Verifies that an {@link ClusterStats} built through its builder converts to the expected
 * Envoy proto messages via both {@code toEnvoyProtoClusterStats()} (v3) and
 * {@code toEnvoyProtoClusterStatsV2()} (v2).
 *
 * <p>The fixture contains one upstream locality (with a single load metric) and one dropped
 * request category. The load report interval is supplied in nanoseconds and is expected to
 * appear in the protos as {@code Durations.fromNanos(1234)}.
 *
 * <p>NOTE(review): the fixture never sets a cluster service name, so conversion of that field
 * is not exercised by this test.
 */
@Test
public void clusterStats_convertToEnvoyProto() {
// Build the gRPC-internal representation that both conversions below start from.
ClusterStats clusterStats =
ClusterStats.newBuilder()
.setClusterName("cluster1")
.setLoadReportIntervalNanos(1234)
.setTotalDroppedRequests(123)
.addUpstreamLocalityStats(UpstreamLocalityStats.newBuilder()
.setLocality(new Locality("region1", "zone1", "subzone1"))
.setTotalErrorRequests(1)
.setTotalRequestsInProgress(2)
.setTotalSuccessfulRequests(100)
.setTotalIssuedRequests(103)
.addLoadMetricStats(EndpointLoadMetricStats.newBuilder()
.setMetricName("metric1")
.setNumRequestsFinishedWithMetric(1000)
.setTotalMetricValue(0.5D)
.build())
.build())
.addDroppedRequests(new DroppedRequests("category1", 100))
.build();

// v3 conversion: compare against a hand-built v3 proto carrying the same values.
io.envoyproxy.envoy.config.endpoint.v3.ClusterStats clusterStatsProto =
clusterStats.toEnvoyProtoClusterStats();
assertThat(clusterStatsProto).isEqualTo(
io.envoyproxy.envoy.config.endpoint.v3.ClusterStats.newBuilder()
.setClusterName("cluster1")
.setLoadReportInterval(Durations.fromNanos(1234))
.setTotalDroppedRequests(123)
.addUpstreamLocalityStats(
io.envoyproxy.envoy.config.endpoint.v3.UpstreamLocalityStats.newBuilder()
.setLocality(
new Locality("region1", "zone1", "subzone1").toEnvoyProtoLocality())
.setTotalErrorRequests(1)
.setTotalRequestsInProgress(2)
.setTotalSuccessfulRequests(100)
.setTotalIssuedRequests(103)
.addLoadMetricStats(
io.envoyproxy.envoy.config.endpoint.v3.EndpointLoadMetricStats.newBuilder()
.setMetricName("metric1")
.setNumRequestsFinishedWithMetric(1000)
.setTotalMetricValue(0.5D)))
.addDroppedRequests(
io.envoyproxy.envoy.config.endpoint.v3.ClusterStats.DroppedRequests.newBuilder()
.setCategory("category1")
.setDroppedCount(100))
.build());

// v2 conversion: same expectations, expressed with the v2 proto types.
io.envoyproxy.envoy.api.v2.endpoint.ClusterStats clusterStatsProtoV2 =
clusterStats.toEnvoyProtoClusterStatsV2();
assertThat(clusterStatsProtoV2).isEqualTo(
io.envoyproxy.envoy.api.v2.endpoint.ClusterStats.newBuilder()
.setClusterName("cluster1")
.setLoadReportInterval(Durations.fromNanos(1234))
.setTotalDroppedRequests(123)
.addUpstreamLocalityStats(
io.envoyproxy.envoy.api.v2.endpoint.UpstreamLocalityStats.newBuilder()
.setLocality(
new Locality("region1", "zone1", "subzone1").toEnvoyProtoLocalityV2())
.setTotalErrorRequests(1)
.setTotalRequestsInProgress(2)
.setTotalSuccessfulRequests(100)
.setTotalIssuedRequests(103)
.addLoadMetricStats(
io.envoyproxy.envoy.api.v2.endpoint.EndpointLoadMetricStats.newBuilder()
.setMetricName("metric1")
.setNumRequestsFinishedWithMetric(1000)
.setTotalMetricValue(0.5D)))
.addDroppedRequests(
io.envoyproxy.envoy.api.v2.endpoint.ClusterStats.DroppedRequests.newBuilder()
.setCategory("category1")
.setDroppedCount(100))
.build());
}
}
51 changes: 23 additions & 28 deletions xds/src/test/java/io/grpc/xds/LoadReportClientTest.java
Expand Up @@ -35,11 +35,7 @@
import com.google.protobuf.Struct;
import com.google.protobuf.Value;
import com.google.protobuf.util.Durations;
import io.envoyproxy.envoy.api.v2.core.Locality;
import io.envoyproxy.envoy.api.v2.core.Node;
import io.envoyproxy.envoy.api.v2.endpoint.ClusterStats;
import io.envoyproxy.envoy.api.v2.endpoint.ClusterStats.DroppedRequests;
import io.envoyproxy.envoy.api.v2.endpoint.UpstreamLocalityStats;
import io.envoyproxy.envoy.service.load_stats.v2.LoadReportingServiceGrpc;
import io.envoyproxy.envoy.service.load_stats.v2.LoadStatsRequest;
import io.envoyproxy.envoy.service.load_stats.v2.LoadStatsResponse;
Expand All @@ -54,6 +50,10 @@
import io.grpc.internal.FakeClock;
import io.grpc.stub.StreamObserver;
import io.grpc.testing.GrpcCleanupRule;
import io.grpc.xds.EnvoyProtoData.ClusterStats;
import io.grpc.xds.EnvoyProtoData.ClusterStats.DroppedRequests;
import io.grpc.xds.EnvoyProtoData.Locality;
import io.grpc.xds.EnvoyProtoData.UpstreamLocalityStats;
import io.grpc.xds.LoadStatsManager.LoadStatsStore;
import io.grpc.xds.LoadStatsManager.LoadStatsStoreFactory;
import java.util.ArrayDeque;
Expand Down Expand Up @@ -225,15 +225,15 @@ public void periodicLoadReporting() {
fakeClock.forwardNanos(1);
assertThat(loadStatsStore1.reported).hasSize(1);
ClusterStats report1 = loadStatsStore1.reported.poll();
assertThat(Durations.toNanos(report1.getLoadReportInterval())).isEqualTo(1000);
assertThat(report1.getLoadReportIntervalNanos()).isEqualTo(1000);
inOrder.verify(requestObserver)
.onNext(argThat(new LoadStatsRequestMatcher(Collections.singletonList(report1))));

loadStatsStore1.refresh();
fakeClock.forwardNanos(1000);
assertThat(loadStatsStore1.reported).hasSize(1);
report1 = loadStatsStore1.reported.poll();
assertThat(Durations.toNanos(report1.getLoadReportInterval())).isEqualTo(1000);
assertThat(report1.getLoadReportIntervalNanos()).isEqualTo(1000);
inOrder.verify(requestObserver)
.onNext(argThat(new LoadStatsRequestMatcher(Collections.singletonList(report1))));

Expand All @@ -250,7 +250,7 @@ public void periodicLoadReporting() {
fakeClock.forwardNanos(1000);
assertThat(loadStatsStore1.reported).hasSize(1);
report1 = loadStatsStore1.reported.poll();
assertThat(Durations.toNanos(report1.getLoadReportInterval())).isEqualTo(2000);
assertThat(report1.getLoadReportIntervalNanos()).isEqualTo(2000);
assertThat(loadStatsStore2.reported).isEmpty();
inOrder.verify(requestObserver)
.onNext(argThat(new LoadStatsRequestMatcher(Collections.singletonList(report1))));
Expand All @@ -269,8 +269,8 @@ public void periodicLoadReporting() {
report1 = loadStatsStore1.reported.poll();
assertThat(loadStatsStore2.reported).hasSize(1);
ClusterStats report2 = loadStatsStore2.reported.poll();
assertThat(Durations.toNanos(report1.getLoadReportInterval())).isEqualTo(2000);
assertThat(Durations.toNanos(report2.getLoadReportInterval())).isEqualTo(2000 + 2000);
assertThat(report1.getLoadReportIntervalNanos()).isEqualTo(2000);
assertThat(report2.getLoadReportIntervalNanos()).isEqualTo(2000 + 2000);
inOrder.verify(requestObserver)
.onNext(argThat(new LoadStatsRequestMatcher(Arrays.asList(report1, report2))));

Expand All @@ -283,7 +283,7 @@ public void periodicLoadReporting() {
assertThat(loadStatsStore1.reported).isEmpty();
assertThat(loadStatsStore2.reported).hasSize(1);
report2 = loadStatsStore2.reported.poll();
assertThat(Durations.toNanos(report2.getLoadReportInterval())).isEqualTo(2000);
assertThat(report2.getLoadReportIntervalNanos()).isEqualTo(2000);
inOrder.verify(requestObserver)
.onNext(argThat(new LoadStatsRequestMatcher(Collections.singletonList(report2))));

Expand Down Expand Up @@ -399,7 +399,7 @@ public void lrsStreamClosedAndRetried() {
.onNext(buildLrsResponse(ImmutableList.of(clusterName), 10));
fakeClock.forwardNanos(10);
ClusterStats report = Iterables.getOnlyElement(loadStatsStore.reported);
assertThat(Durations.toNanos(report.getLoadReportInterval()))
assertThat(report.getLoadReportIntervalNanos())
.isEqualTo(TimeUnit.SECONDS.toNanos(1 + 10 + 2) + 10);
verify(requestObserver)
.onNext(argThat(new LoadStatsRequestMatcher(Collections.singletonList(report))));
Expand Down Expand Up @@ -500,8 +500,9 @@ public boolean matches(LoadStatsRequest argument) {
if (argument.getClusterStatsCount() != expectedStats.size()) {
return false;
}
for (ClusterStats stats : argument.getClusterStatsList()) {
if (!stats.equals(expectedStats.get(stats.getClusterName()))) {
for (io.envoyproxy.envoy.api.v2.endpoint.ClusterStats stats
: argument.getClusterStatsList()) {
if (!stats.equals(expectedStats.get(stats.getClusterName()).toEnvoyProtoClusterStatsV2())) {
return false;
}
}
Expand All @@ -528,7 +529,7 @@ private FakeLoadStatsStore(String cluster, String clusterService, Stopwatch stop
public ClusterStats generateLoadReport() {
ClusterStats report =
stats.toBuilder()
.setLoadReportInterval(Durations.fromNanos(stopwatch.elapsed(TimeUnit.NANOSECONDS)))
.setLoadReportIntervalNanos(stopwatch.elapsed(TimeUnit.NANOSECONDS))
.build();
stopwatch.reset().start();
reported.offer(report);
Expand Down Expand Up @@ -563,25 +564,19 @@ private void refresh() {
if (clusterService != null) {
clusterStatsBuilder.setClusterServiceName(clusterService);
}
clusterStatsBuilder.addUpstreamLocalityStats(
UpstreamLocalityStats.newBuilder()
.setLocality(
Locality.newBuilder()
.setRegion(cluster + "-region-foo")
.setZone(cluster + "-zone-bar")
.setSubZone(cluster + "-subzone-baz"))
clusterStatsBuilder
.addUpstreamLocalityStats(UpstreamLocalityStats.newBuilder()
.setLocality(new Locality(
cluster + "-region-foo", cluster + "-zone-bar", cluster + "-subzone-baz"))
.setTotalRequestsInProgress(callsInProgress)
.setTotalSuccessfulRequests(callsSucceeded)
.setTotalErrorRequests(callsFailed)
.setTotalIssuedRequests(callsIssued))
.setTotalIssuedRequests(callsIssued)
.build())
.addDroppedRequests(
DroppedRequests.newBuilder()
.setCategory("lb")
.setDroppedCount(numLbDrops))
new DroppedRequests("lb",numLbDrops))
.addDroppedRequests(
DroppedRequests.newBuilder()
.setCategory("throttle")
.setDroppedCount(numThrottleDrops))
new DroppedRequests("throttle", numThrottleDrops))
.setTotalDroppedRequests(numLbDrops + numThrottleDrops);
stats = clusterStatsBuilder.build();
}
Expand Down

0 comments on commit cb07b0f

Please sign in to comment.