diff --git a/grpc-gcp/src/main/java/com/google/cloud/grpc/GcpMetricsConstants.java b/grpc-gcp/src/main/java/com/google/cloud/grpc/GcpMetricsConstants.java index c8073742..2af7ed5f 100644 --- a/grpc-gcp/src/main/java/com/google/cloud/grpc/GcpMetricsConstants.java +++ b/grpc-gcp/src/main/java/com/google/cloud/grpc/GcpMetricsConstants.java @@ -31,8 +31,8 @@ class GcpMetricsConstants { public static String STATUS_UNAVAILABLE = "UNAVAILABLE"; public static String ME_NAME_LABEL = "me_name"; public static String ME_NAME_LABEL_DESC = "Multi-endpoint name."; - public static String TYPE_LABEL = "type"; - public static String TYPE_LABEL_DESC = "Switch type (fallback/recover/replace)."; + public static String SWITCH_TYPE_LABEL = "switch_type"; + public static String SWITCH_TYPE_LABEL_DESC = "Switch type (fallback/recover/replace)."; public static String TYPE_FALLBACK = "FALLBACK"; public static String TYPE_RECOVER = "RECOVER"; public static String TYPE_REPLACE = "REPLACE"; diff --git a/grpc-gcp/src/main/java/com/google/cloud/grpc/GcpMultiEndpointChannel.java b/grpc-gcp/src/main/java/com/google/cloud/grpc/GcpMultiEndpointChannel.java index d389c054..80a5f3f4 100644 --- a/grpc-gcp/src/main/java/com/google/cloud/grpc/GcpMultiEndpointChannel.java +++ b/grpc-gcp/src/main/java/com/google/cloud/grpc/GcpMultiEndpointChannel.java @@ -29,8 +29,8 @@ import static com.google.cloud.grpc.GcpMetricsConstants.STATUS_LABEL_DESC; import static com.google.cloud.grpc.GcpMetricsConstants.STATUS_UNAVAILABLE; import static com.google.cloud.grpc.GcpMetricsConstants.TYPE_FALLBACK; -import static com.google.cloud.grpc.GcpMetricsConstants.TYPE_LABEL; -import static com.google.cloud.grpc.GcpMetricsConstants.TYPE_LABEL_DESC; +import static com.google.cloud.grpc.GcpMetricsConstants.SWITCH_TYPE_LABEL; +import static com.google.cloud.grpc.GcpMetricsConstants.SWITCH_TYPE_LABEL_DESC; import static com.google.cloud.grpc.GcpMetricsConstants.TYPE_RECOVER; import static com.google.cloud.grpc.GcpMetricsConstants.TYPE_REPLACE; import static java.util.concurrent.TimeUnit.MILLISECONDS; @@ -63,7 +63,7 @@ import java.net.URL; import java.time.Duration; import java.util.ArrayList; -import java.util.Arrays; +import java.util.Collections; import java.util.Comparator; import java.util.HashSet; import java.util.Iterator; @@ -152,6 +152,7 @@ public class GcpMultiEndpointChannel extends ManagedChannel { private MultiEndpoint defaultMultiEndpoint; private final ApiConfig apiConfig; private final GcpManagedChannelOptions gcpManagedChannelOptions; + private final GcpMetricsOptions gcpMetricsOptions; private DerivedLongGauge endpointStateMetric; private DerivedLongCumulative endpointSwitchMetric; private DerivedLongGauge currentEndpointMetric; @@ -179,6 +180,7 @@ public GcpMultiEndpointChannel( GcpManagedChannelOptions gcpManagedChannelOptions) { this.apiConfig = apiConfig; this.gcpManagedChannelOptions = gcpManagedChannelOptions; + this.gcpMetricsOptions = gcpManagedChannelOptions.getMetricsOptions(); createMetrics(); setMultiEndpoints(meOptions); } @@ -202,7 +204,7 @@ private void setUpMetrics() { } endpointStateMetric.createTimeSeries( - Arrays.asList( + appendCommonValues( LabelValue.create(endpoint), LabelValue.create(STATUS_AVAILABLE) ), @@ -210,7 +212,7 @@ private void setUpMetrics() { EndpointStateMonitor::reportAvailable ); endpointStateMetric.createTimeSeries( - Arrays.asList( + appendCommonValues( LabelValue.create(endpoint), LabelValue.create(STATUS_UNAVAILABLE) ), @@ -224,11 +226,11 @@ private void removeMetrics() { return; } - endpointStateMetric.removeTimeSeries(Arrays.asList( + endpointStateMetric.removeTimeSeries(appendCommonValues( LabelValue.create(endpoint), LabelValue.create(STATUS_AVAILABLE) )); - endpointStateMetric.removeTimeSeries(Arrays.asList( + endpointStateMetric.removeTimeSeries(appendCommonValues( LabelValue.create(endpoint), LabelValue.create(STATUS_UNAVAILABLE) )); @@ -336,7 +338,7 @@ private void setUpMetricsForMultiEndpoint(GcpMultiEndpointOptions options, Multi String name = options.getName(); List endpoints = options.getEndpoints(); endpointSwitchMetric.createTimeSeries( - Arrays.asList( + appendCommonValues( LabelValue.create(name), LabelValue.create(TYPE_FALLBACK) ), @@ -344,7 +346,7 @@ private void setUpMetricsForMultiEndpoint(GcpMultiEndpointOptions options, Multi MultiEndpoint::getFallbackCnt ); endpointSwitchMetric.createTimeSeries( - Arrays.asList( + appendCommonValues( LabelValue.create(name), LabelValue.create(TYPE_RECOVER) ), @@ -352,7 +354,7 @@ private void setUpMetricsForMultiEndpoint(GcpMultiEndpointOptions options, Multi MultiEndpoint::getRecoverCnt ); endpointSwitchMetric.createTimeSeries( - Arrays.asList( + appendCommonValues( LabelValue.create(name), LabelValue.create(TYPE_REPLACE) ), @@ -363,7 +365,7 @@ private void setUpMetricsForMultiEndpoint(GcpMultiEndpointOptions options, Multi CurrentEndpointWatcher watcher = new CurrentEndpointWatcher(me, e); currentEndpointWatchers.put(name + ":" + e, watcher); currentEndpointMetric.createTimeSeries( - Arrays.asList( + appendCommonValues( LabelValue.create(name), LabelValue.create(e) ), @@ -379,7 +381,7 @@ private void updateMetricsForMultiEndpoint(GcpMultiEndpointOptions options, Mult for (String e : existingEndpoints) { if (!newEndpoints.contains(e)) { currentEndpointMetric.removeTimeSeries( - Arrays.asList( + appendCommonValues( LabelValue.create(options.getName()), LabelValue.create(e) ) @@ -392,7 +394,7 @@ private void updateMetricsForMultiEndpoint(GcpMultiEndpointOptions options, Mult CurrentEndpointWatcher watcher = new CurrentEndpointWatcher(me, e); currentEndpointWatchers.put(options.getName() + ":" + e, watcher); currentEndpointMetric.createTimeSeries( - Arrays.asList( + appendCommonValues( LabelValue.create(options.getName()), LabelValue.create(e) ), @@ -405,26 +407,26 @@ private void updateMetricsForMultiEndpoint(GcpMultiEndpointOptions options, Mult private void removeMetricsForMultiEndpoint(String name, MultiEndpoint me) { endpointSwitchMetric.removeTimeSeries( - Arrays.asList( + appendCommonValues( LabelValue.create(name), LabelValue.create(TYPE_FALLBACK) ) ); endpointSwitchMetric.removeTimeSeries( - Arrays.asList( + appendCommonValues( LabelValue.create(name), LabelValue.create(TYPE_RECOVER) ) ); endpointSwitchMetric.removeTimeSeries( - Arrays.asList( + appendCommonValues( LabelValue.create(name), LabelValue.create(TYPE_REPLACE) ) ); for (String e : me.getEndpoints()) { currentEndpointMetric.removeTimeSeries( - Arrays.asList( + appendCommonValues( LabelValue.create(name), LabelValue.create(e) ) @@ -559,11 +561,11 @@ private synchronized void maybeCleanupPools(Set endpoints) { } private void createMetrics() { - if (gcpManagedChannelOptions.getMetricsOptions() == null) { + if (gcpMetricsOptions == null) { return; } - MetricRegistry metricRegistry = gcpManagedChannelOptions.getMetricsOptions().getMetricRegistry(); + MetricRegistry metricRegistry = gcpMetricsOptions.getMetricRegistry(); if (metricRegistry == null) { return; } @@ -572,60 +574,64 @@ private void createMetrics() { return; } - String prefix = gcpManagedChannelOptions.getMetricsOptions().getNamePrefix(); - - final List endpointStateKeys = Arrays.asList( - LabelKey.create(ENDPOINT_LABEL, ENDPOINT_LABEL_DESC), - LabelKey.create(STATUS_LABEL, STATUS_LABEL_DESC) - ); + String prefix = gcpMetricsOptions.getNamePrefix(); endpointStateMetric = metricRegistry.addDerivedLongGauge( prefix + METRIC_ENDPOINT_STATE, createMetricOptions( "Reports 1 when endpoint is in the status.", - endpointStateKeys, - COUNT + COUNT, + LabelKey.create(ENDPOINT_LABEL, ENDPOINT_LABEL_DESC), + LabelKey.create(STATUS_LABEL, STATUS_LABEL_DESC) ) ); - final List endpointSwitchKeys = Arrays.asList( - LabelKey.create(ME_NAME_LABEL, ME_NAME_LABEL_DESC), - LabelKey.create(TYPE_LABEL, TYPE_LABEL_DESC) - ); - endpointSwitchMetric = metricRegistry.addDerivedLongCumulative( prefix + METRIC_ENDPOINT_SWITCH, createMetricOptions( "Reports occurrences of changes of current endpoint for a multi-endpoint with " + "the name, specifying change type.", - endpointSwitchKeys, - COUNT + COUNT, + LabelKey.create(ME_NAME_LABEL, ME_NAME_LABEL_DESC), + LabelKey.create(SWITCH_TYPE_LABEL, SWITCH_TYPE_LABEL_DESC) ) ); - final List currentEndpointKeys = Arrays.asList( - LabelKey.create(ME_NAME_LABEL, ME_NAME_LABEL_DESC), - LabelKey.create(ENDPOINT_LABEL, ENDPOINT_LABEL_DESC) - ); - currentEndpointMetric = metricRegistry.addDerivedLongGauge( prefix + METRIC_CURRENT_ENDPOINT, createMetricOptions( "Reports 1 when an endpoint is current for multi-endpoint with the name.", - currentEndpointKeys, - COUNT + COUNT, + LabelKey.create(ME_NAME_LABEL, ME_NAME_LABEL_DESC), + LabelKey.create(ENDPOINT_LABEL, ENDPOINT_LABEL_DESC) ) ); } + private List appendCommonValues(LabelValue ...labelValues) { + final List values = new ArrayList<>(); + Collections.addAll(values, labelValues); + if (gcpMetricsOptions != null && + gcpMetricsOptions.getLabelValues() != null) { + values.addAll(gcpMetricsOptions.getLabelValues()); + } + return values; + } + private MetricOptions createMetricOptions( - String description, List labelKeys, String unit) { + String description, String unit, LabelKey ...labelKeys) { + final List keys = new ArrayList<>(); + Collections.addAll(keys, labelKeys); + if (gcpMetricsOptions != null && + gcpMetricsOptions.getLabelKeys() != null) { + keys.addAll(gcpMetricsOptions.getLabelKeys()); + } return MetricOptions.builder() .setDescription(description) - .setLabelKeys(labelKeys) + .setLabelKeys(keys) .setUnit(unit) .build(); } diff --git a/grpc-gcp/src/test/java/com/google/cloud/grpc/SpannerIntegrationTest.java b/grpc-gcp/src/test/java/com/google/cloud/grpc/SpannerIntegrationTest.java index 542e0717..3c24dbef 100644 --- a/grpc-gcp/src/test/java/com/google/cloud/grpc/SpannerIntegrationTest.java +++ b/grpc-gcp/src/test/java/com/google/cloud/grpc/SpannerIntegrationTest.java @@ -96,6 +96,7 @@ import io.grpc.StatusRuntimeException; import io.grpc.auth.MoreCallCredentials; import io.grpc.stub.StreamObserver; +import io.opencensus.metrics.LabelKey; import io.opencensus.metrics.LabelValue; import java.io.File; import java.io.IOException; @@ -156,6 +157,9 @@ public final class SpannerIntegrationTest { final String leaderME = "leader"; final String followerME = "follower"; + final LabelKey commonKey = LabelKey.create("common_key", "Common key"); + final LabelValue commonValue = LabelValue.create("common_value"); + private void sleep(long millis) throws InterruptedException { Sleeper.DEFAULT.sleep(millis); } @@ -469,12 +473,14 @@ private long getOkCallsCount( List> metric = record.getMetrics().get(GcpMetricsConstants.METRIC_NUM_CALLS_COMPLETED); for (PointWithFunction m : metric) { - assertThat(m.keys().get(0).getKey()).isEqualTo("result"); - assertThat(m.keys().get(1).getKey()).isEqualTo("endpoint"); + assertThat(m.keys().get(0).getKey()).isEqualTo(GcpMetricsConstants.RESULT_LABEL); + assertThat(m.keys().get(1).getKey()).isEqualTo(commonKey.getKey()); + assertThat(m.values().get(1).getValue()).isEqualTo(commonValue.getValue()); + assertThat(m.keys().get(2).getKey()).isEqualTo(GcpMetricsConstants.ENDPOINT_LABEL); if (!m.values().get(0).equals(LabelValue.create(GcpMetricsConstants.RESULT_SUCCESS))) { continue; } - if (!m.values().get(1).equals(LabelValue.create(endpoint))) { + if (!m.values().get(2).equals(LabelValue.create(endpoint))) { continue; } return m.value(); @@ -502,6 +508,8 @@ private String getCurrentEndpoint(FakeMetricRegistry fakeRegistry, String meName for (PointWithFunction m : metric) { assertThat(m.keys().get(0).getKey()).isEqualTo(GcpMetricsConstants.ME_NAME_LABEL); assertThat(m.keys().get(1).getKey()).isEqualTo(GcpMetricsConstants.ENDPOINT_LABEL); + assertThat(m.keys().get(2).getKey()).isEqualTo(commonKey.getKey()); + assertThat(m.values().get(2).getValue()).isEqualTo(commonValue.getValue()); if (!m.values().get(0).getValue().equals(meName)) { continue; } @@ -520,6 +528,8 @@ private String getEndpointState(FakeMetricRegistry fakeRegistry, String endpoint for (PointWithFunction m : metric) { assertThat(m.keys().get(0).getKey()).isEqualTo(GcpMetricsConstants.ENDPOINT_LABEL); assertThat(m.keys().get(1).getKey()).isEqualTo(GcpMetricsConstants.STATUS_LABEL); + assertThat(m.keys().get(2).getKey()).isEqualTo(commonKey.getKey()); + assertThat(m.values().get(2).getValue()).isEqualTo(commonValue.getValue()); if (!m.values().get(0).getValue().equals(endpoint)) { continue; } @@ -537,7 +547,9 @@ private long getSwitchCount(FakeMetricRegistry fakeRegistry, String meName, Stri record.getMetrics().get(GcpMetricsConstants.METRIC_ENDPOINT_SWITCH); for (PointWithFunction m : metric) { assertThat(m.keys().get(0).getKey()).isEqualTo(GcpMetricsConstants.ME_NAME_LABEL); - assertThat(m.keys().get(1).getKey()).isEqualTo(GcpMetricsConstants.TYPE_LABEL); + assertThat(m.keys().get(1).getKey()).isEqualTo(GcpMetricsConstants.SWITCH_TYPE_LABEL); + assertThat(m.keys().get(2).getKey()).isEqualTo(commonKey.getKey()); + assertThat(m.values().get(2).getValue()).isEqualTo(commonValue.getValue()); if (!m.values().get(0).getValue().equals(meName)) { continue; } @@ -618,7 +630,10 @@ public void testSpannerMultiEndpointClient() throws IOException, InterruptedExce .build()) .withMetricsOptions(GcpMetricsOptions.newBuilder() .withMetricRegistry(fakeRegistry) - .build()) + .withLabels( + Collections.singletonList(commonKey), + Collections.singletonList(commonValue) + ).build()) .build()); final int currentIndex = GcpManagedChannel.channelPoolIndex.get(); @@ -650,14 +665,16 @@ public void testSpannerMultiEndpointClient() throws IOException, InterruptedExce // Make sure endpoint is set as a metric label for each pool. assertThat(logRecords.stream().filter(logRecord -> logRecord.getMessage().matches( - leaderPoolIndex + ": Metrics options: \\{namePrefix: \"\", labels: \\[endpoint: " + - "\"" + leaderEndpoint + "\"], metricRegistry: .*" + leaderPoolIndex + ": Metrics options: \\{namePrefix: \"\", labels: \\[" + + commonKey.getKey() + ": \"" + commonValue.getValue() + "\", endpoint: " + + "\"" + leaderEndpoint + "\".*" )).count()).isEqualTo(1); assertThat(logRecords.stream().filter(logRecord -> logRecord.getMessage().matches( - followerPoolIndex + ": Metrics options: \\{namePrefix: \"\", labels: \\[endpoint: " + - "\"" + followerEndpoint + "\"], metricRegistry: .*" + followerPoolIndex + ": Metrics options: \\{namePrefix: \"\", labels: \\[" + + commonKey.getKey() + ": \"" + commonValue.getValue() + "\", endpoint: " + + "\"" + followerEndpoint + "\".*" )).count()).isEqualTo(1); logRecords.clear(); @@ -885,7 +902,10 @@ public void testSpannerMultiEndpointClientWithDelay() throws IOException, Interr .build()) .withMetricsOptions(GcpMetricsOptions.newBuilder() .withMetricRegistry(fakeRegistry) - .build()) + .withLabels( + Collections.singletonList(commonKey), + Collections.singletonList(commonValue) + ).build()) .build()); final int currentIndex = GcpManagedChannel.channelPoolIndex.get();