Skip to content

Commit

Permalink
Merge pull request #160 from GoogleCloudPlatform/metrics-fix
Browse files Browse the repository at this point in the history
Support common labels for multi-endpoint metrics
  • Loading branch information
nimf committed Jan 30, 2023
2 parents bc4316b + b26ee7d commit 58422f3
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 55 deletions.
Expand Up @@ -31,8 +31,8 @@ class GcpMetricsConstants {
public static String STATUS_UNAVAILABLE = "UNAVAILABLE";
public static String ME_NAME_LABEL = "me_name";
public static String ME_NAME_LABEL_DESC = "Multi-endpoint name.";
public static String TYPE_LABEL = "type";
public static String TYPE_LABEL_DESC = "Switch type (fallback/recover/replace).";
public static String SWITCH_TYPE_LABEL = "switch_type";
public static String SWITCH_TYPE_LABEL_DESC = "Switch type (fallback/recover/replace).";
public static String TYPE_FALLBACK = "FALLBACK";
public static String TYPE_RECOVER = "RECOVER";
public static String TYPE_REPLACE = "REPLACE";
Expand Down
Expand Up @@ -29,8 +29,8 @@
import static com.google.cloud.grpc.GcpMetricsConstants.STATUS_LABEL_DESC;
import static com.google.cloud.grpc.GcpMetricsConstants.STATUS_UNAVAILABLE;
import static com.google.cloud.grpc.GcpMetricsConstants.TYPE_FALLBACK;
import static com.google.cloud.grpc.GcpMetricsConstants.TYPE_LABEL;
import static com.google.cloud.grpc.GcpMetricsConstants.TYPE_LABEL_DESC;
import static com.google.cloud.grpc.GcpMetricsConstants.SWITCH_TYPE_LABEL;
import static com.google.cloud.grpc.GcpMetricsConstants.SWITCH_TYPE_LABEL_DESC;
import static com.google.cloud.grpc.GcpMetricsConstants.TYPE_RECOVER;
import static com.google.cloud.grpc.GcpMetricsConstants.TYPE_REPLACE;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
Expand Down Expand Up @@ -63,7 +63,7 @@
import java.net.URL;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
Expand Down Expand Up @@ -152,6 +152,7 @@ public class GcpMultiEndpointChannel extends ManagedChannel {
private MultiEndpoint defaultMultiEndpoint;
private final ApiConfig apiConfig;
private final GcpManagedChannelOptions gcpManagedChannelOptions;
private final GcpMetricsOptions gcpMetricsOptions;
private DerivedLongGauge endpointStateMetric;
private DerivedLongCumulative endpointSwitchMetric;
private DerivedLongGauge currentEndpointMetric;
Expand Down Expand Up @@ -179,6 +180,7 @@ public GcpMultiEndpointChannel(
GcpManagedChannelOptions gcpManagedChannelOptions) {
this.apiConfig = apiConfig;
this.gcpManagedChannelOptions = gcpManagedChannelOptions;
this.gcpMetricsOptions = gcpManagedChannelOptions.getMetricsOptions();
createMetrics();
setMultiEndpoints(meOptions);
}
Expand All @@ -202,15 +204,15 @@ private void setUpMetrics() {
}

endpointStateMetric.createTimeSeries(
Arrays.asList(
appendCommonValues(
LabelValue.create(endpoint),
LabelValue.create(STATUS_AVAILABLE)
),
this,
EndpointStateMonitor::reportAvailable
);
endpointStateMetric.createTimeSeries(
Arrays.asList(
appendCommonValues(
LabelValue.create(endpoint),
LabelValue.create(STATUS_UNAVAILABLE)
),
Expand All @@ -224,11 +226,11 @@ private void removeMetrics() {
return;
}

endpointStateMetric.removeTimeSeries(Arrays.asList(
endpointStateMetric.removeTimeSeries(appendCommonValues(
LabelValue.create(endpoint),
LabelValue.create(STATUS_AVAILABLE)
));
endpointStateMetric.removeTimeSeries(Arrays.asList(
endpointStateMetric.removeTimeSeries(appendCommonValues(
LabelValue.create(endpoint),
LabelValue.create(STATUS_UNAVAILABLE)
));
Expand Down Expand Up @@ -336,23 +338,23 @@ private void setUpMetricsForMultiEndpoint(GcpMultiEndpointOptions options, Multi
String name = options.getName();
List<String> endpoints = options.getEndpoints();
endpointSwitchMetric.createTimeSeries(
Arrays.asList(
appendCommonValues(
LabelValue.create(name),
LabelValue.create(TYPE_FALLBACK)
),
me,
MultiEndpoint::getFallbackCnt
);
endpointSwitchMetric.createTimeSeries(
Arrays.asList(
appendCommonValues(
LabelValue.create(name),
LabelValue.create(TYPE_RECOVER)
),
me,
MultiEndpoint::getRecoverCnt
);
endpointSwitchMetric.createTimeSeries(
Arrays.asList(
appendCommonValues(
LabelValue.create(name),
LabelValue.create(TYPE_REPLACE)
),
Expand All @@ -363,7 +365,7 @@ private void setUpMetricsForMultiEndpoint(GcpMultiEndpointOptions options, Multi
CurrentEndpointWatcher watcher = new CurrentEndpointWatcher(me, e);
currentEndpointWatchers.put(name + ":" + e, watcher);
currentEndpointMetric.createTimeSeries(
Arrays.asList(
appendCommonValues(
LabelValue.create(name),
LabelValue.create(e)
),
Expand All @@ -379,7 +381,7 @@ private void updateMetricsForMultiEndpoint(GcpMultiEndpointOptions options, Mult
for (String e : existingEndpoints) {
if (!newEndpoints.contains(e)) {
currentEndpointMetric.removeTimeSeries(
Arrays.asList(
appendCommonValues(
LabelValue.create(options.getName()),
LabelValue.create(e)
)
Expand All @@ -392,7 +394,7 @@ private void updateMetricsForMultiEndpoint(GcpMultiEndpointOptions options, Mult
CurrentEndpointWatcher watcher = new CurrentEndpointWatcher(me, e);
currentEndpointWatchers.put(options.getName() + ":" + e, watcher);
currentEndpointMetric.createTimeSeries(
Arrays.asList(
appendCommonValues(
LabelValue.create(options.getName()),
LabelValue.create(e)
),
Expand All @@ -405,26 +407,26 @@ private void updateMetricsForMultiEndpoint(GcpMultiEndpointOptions options, Mult

private void removeMetricsForMultiEndpoint(String name, MultiEndpoint me) {
endpointSwitchMetric.removeTimeSeries(
Arrays.asList(
appendCommonValues(
LabelValue.create(name),
LabelValue.create(TYPE_FALLBACK)
)
);
endpointSwitchMetric.removeTimeSeries(
Arrays.asList(
appendCommonValues(
LabelValue.create(name),
LabelValue.create(TYPE_RECOVER)
)
);
endpointSwitchMetric.removeTimeSeries(
Arrays.asList(
appendCommonValues(
LabelValue.create(name),
LabelValue.create(TYPE_REPLACE)
)
);
for (String e : me.getEndpoints()) {
currentEndpointMetric.removeTimeSeries(
Arrays.asList(
appendCommonValues(
LabelValue.create(name),
LabelValue.create(e)
)
Expand Down Expand Up @@ -559,11 +561,11 @@ private synchronized void maybeCleanupPools(Set<String> endpoints) {
}

private void createMetrics() {
if (gcpManagedChannelOptions.getMetricsOptions() == null) {
if (gcpMetricsOptions == null) {
return;
}

MetricRegistry metricRegistry = gcpManagedChannelOptions.getMetricsOptions().getMetricRegistry();
MetricRegistry metricRegistry = gcpMetricsOptions.getMetricRegistry();
if (metricRegistry == null) {
return;
}
Expand All @@ -572,60 +574,64 @@ private void createMetrics() {
return;
}

String prefix = gcpManagedChannelOptions.getMetricsOptions().getNamePrefix();

final List<LabelKey> endpointStateKeys = Arrays.asList(
LabelKey.create(ENDPOINT_LABEL, ENDPOINT_LABEL_DESC),
LabelKey.create(STATUS_LABEL, STATUS_LABEL_DESC)
);
String prefix = gcpMetricsOptions.getNamePrefix();

endpointStateMetric =
metricRegistry.addDerivedLongGauge(
prefix + METRIC_ENDPOINT_STATE,
createMetricOptions(
"Reports 1 when endpoint is in the status.",
endpointStateKeys,
COUNT
COUNT,
LabelKey.create(ENDPOINT_LABEL, ENDPOINT_LABEL_DESC),
LabelKey.create(STATUS_LABEL, STATUS_LABEL_DESC)
)
);

final List<LabelKey> endpointSwitchKeys = Arrays.asList(
LabelKey.create(ME_NAME_LABEL, ME_NAME_LABEL_DESC),
LabelKey.create(TYPE_LABEL, TYPE_LABEL_DESC)
);

endpointSwitchMetric =
metricRegistry.addDerivedLongCumulative(
prefix + METRIC_ENDPOINT_SWITCH,
createMetricOptions(
"Reports occurrences of changes of current endpoint for a multi-endpoint with " +
"the name, specifying change type.",
endpointSwitchKeys,
COUNT
COUNT,
LabelKey.create(ME_NAME_LABEL, ME_NAME_LABEL_DESC),
LabelKey.create(SWITCH_TYPE_LABEL, SWITCH_TYPE_LABEL_DESC)
)
);

final List<LabelKey> currentEndpointKeys = Arrays.asList(
LabelKey.create(ME_NAME_LABEL, ME_NAME_LABEL_DESC),
LabelKey.create(ENDPOINT_LABEL, ENDPOINT_LABEL_DESC)
);

currentEndpointMetric =
metricRegistry.addDerivedLongGauge(
prefix + METRIC_CURRENT_ENDPOINT,
createMetricOptions(
"Reports 1 when an endpoint is current for multi-endpoint with the name.",
currentEndpointKeys,
COUNT
COUNT,
LabelKey.create(ME_NAME_LABEL, ME_NAME_LABEL_DESC),
LabelKey.create(ENDPOINT_LABEL, ENDPOINT_LABEL_DESC)
)
);
}

private List<LabelValue> appendCommonValues(LabelValue ...labelValues) {
final List<LabelValue> values = new ArrayList<>();
Collections.addAll(values, labelValues);
if (gcpMetricsOptions != null &&
gcpMetricsOptions.getLabelValues() != null) {
values.addAll(gcpMetricsOptions.getLabelValues());
}
return values;
}

private MetricOptions createMetricOptions(
String description, List<LabelKey> labelKeys, String unit) {
String description, String unit, LabelKey ...labelKeys) {
final List<LabelKey> keys = new ArrayList<>();
Collections.addAll(keys, labelKeys);
if (gcpMetricsOptions != null &&
gcpMetricsOptions.getLabelKeys() != null) {
keys.addAll(gcpMetricsOptions.getLabelKeys());
}
return MetricOptions.builder()
.setDescription(description)
.setLabelKeys(labelKeys)
.setLabelKeys(keys)
.setUnit(unit)
.build();
}
Expand Down
Expand Up @@ -96,6 +96,7 @@
import io.grpc.StatusRuntimeException;
import io.grpc.auth.MoreCallCredentials;
import io.grpc.stub.StreamObserver;
import io.opencensus.metrics.LabelKey;
import io.opencensus.metrics.LabelValue;
import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -156,6 +157,9 @@ public final class SpannerIntegrationTest {
final String leaderME = "leader";
final String followerME = "follower";

final LabelKey commonKey = LabelKey.create("common_key", "Common key");
final LabelValue commonValue = LabelValue.create("common_value");

private void sleep(long millis) throws InterruptedException {
Sleeper.DEFAULT.sleep(millis);
}
Expand Down Expand Up @@ -469,12 +473,14 @@ private long getOkCallsCount(
List<PointWithFunction<?>> metric =
record.getMetrics().get(GcpMetricsConstants.METRIC_NUM_CALLS_COMPLETED);
for (PointWithFunction<?> m : metric) {
assertThat(m.keys().get(0).getKey()).isEqualTo("result");
assertThat(m.keys().get(1).getKey()).isEqualTo("endpoint");
assertThat(m.keys().get(0).getKey()).isEqualTo(GcpMetricsConstants.RESULT_LABEL);
assertThat(m.keys().get(1).getKey()).isEqualTo(commonKey.getKey());
assertThat(m.values().get(1).getValue()).isEqualTo(commonValue.getValue());
assertThat(m.keys().get(2).getKey()).isEqualTo(GcpMetricsConstants.ENDPOINT_LABEL);
if (!m.values().get(0).equals(LabelValue.create(GcpMetricsConstants.RESULT_SUCCESS))) {
continue;
}
if (!m.values().get(1).equals(LabelValue.create(endpoint))) {
if (!m.values().get(2).equals(LabelValue.create(endpoint))) {
continue;
}
return m.value();
Expand Down Expand Up @@ -502,6 +508,8 @@ private String getCurrentEndpoint(FakeMetricRegistry fakeRegistry, String meName
for (PointWithFunction<?> m : metric) {
assertThat(m.keys().get(0).getKey()).isEqualTo(GcpMetricsConstants.ME_NAME_LABEL);
assertThat(m.keys().get(1).getKey()).isEqualTo(GcpMetricsConstants.ENDPOINT_LABEL);
assertThat(m.keys().get(2).getKey()).isEqualTo(commonKey.getKey());
assertThat(m.values().get(2).getValue()).isEqualTo(commonValue.getValue());
if (!m.values().get(0).getValue().equals(meName)) {
continue;
}
Expand All @@ -520,6 +528,8 @@ private String getEndpointState(FakeMetricRegistry fakeRegistry, String endpoint
for (PointWithFunction<?> m : metric) {
assertThat(m.keys().get(0).getKey()).isEqualTo(GcpMetricsConstants.ENDPOINT_LABEL);
assertThat(m.keys().get(1).getKey()).isEqualTo(GcpMetricsConstants.STATUS_LABEL);
assertThat(m.keys().get(2).getKey()).isEqualTo(commonKey.getKey());
assertThat(m.values().get(2).getValue()).isEqualTo(commonValue.getValue());
if (!m.values().get(0).getValue().equals(endpoint)) {
continue;
}
Expand All @@ -537,7 +547,9 @@ private long getSwitchCount(FakeMetricRegistry fakeRegistry, String meName, Stri
record.getMetrics().get(GcpMetricsConstants.METRIC_ENDPOINT_SWITCH);
for (PointWithFunction<?> m : metric) {
assertThat(m.keys().get(0).getKey()).isEqualTo(GcpMetricsConstants.ME_NAME_LABEL);
assertThat(m.keys().get(1).getKey()).isEqualTo(GcpMetricsConstants.TYPE_LABEL);
assertThat(m.keys().get(1).getKey()).isEqualTo(GcpMetricsConstants.SWITCH_TYPE_LABEL);
assertThat(m.keys().get(2).getKey()).isEqualTo(commonKey.getKey());
assertThat(m.values().get(2).getValue()).isEqualTo(commonValue.getValue());
if (!m.values().get(0).getValue().equals(meName)) {
continue;
}
Expand Down Expand Up @@ -618,7 +630,10 @@ public void testSpannerMultiEndpointClient() throws IOException, InterruptedExce
.build())
.withMetricsOptions(GcpMetricsOptions.newBuilder()
.withMetricRegistry(fakeRegistry)
.build())
.withLabels(
Collections.singletonList(commonKey),
Collections.singletonList(commonValue)
).build())
.build());

final int currentIndex = GcpManagedChannel.channelPoolIndex.get();
Expand Down Expand Up @@ -650,14 +665,16 @@ public void testSpannerMultiEndpointClient() throws IOException, InterruptedExce
// Make sure endpoint is set as a metric label for each pool.
assertThat(logRecords.stream().filter(logRecord ->
logRecord.getMessage().matches(
leaderPoolIndex + ": Metrics options: \\{namePrefix: \"\", labels: \\[endpoint: " +
"\"" + leaderEndpoint + "\"], metricRegistry: .*"
leaderPoolIndex + ": Metrics options: \\{namePrefix: \"\", labels: \\[" +
commonKey.getKey() + ": \"" + commonValue.getValue() + "\", endpoint: " +
"\"" + leaderEndpoint + "\".*"
)).count()).isEqualTo(1);

assertThat(logRecords.stream().filter(logRecord ->
logRecord.getMessage().matches(
followerPoolIndex + ": Metrics options: \\{namePrefix: \"\", labels: \\[endpoint: " +
"\"" + followerEndpoint + "\"], metricRegistry: .*"
followerPoolIndex + ": Metrics options: \\{namePrefix: \"\", labels: \\[" +
commonKey.getKey() + ": \"" + commonValue.getValue() + "\", endpoint: " +
"\"" + followerEndpoint + "\".*"
)).count()).isEqualTo(1);

logRecords.clear();
Expand Down Expand Up @@ -885,7 +902,10 @@ public void testSpannerMultiEndpointClientWithDelay() throws IOException, Interr
.build())
.withMetricsOptions(GcpMetricsOptions.newBuilder()
.withMetricRegistry(fakeRegistry)
.build())
.withLabels(
Collections.singletonList(commonKey),
Collections.singletonList(commonValue)
).build())
.build());

final int currentIndex = GcpManagedChannel.channelPoolIndex.get();
Expand Down

0 comments on commit 58422f3

Please sign in to comment.