Skip to content

Commit

Permalink
rls: Add gauge metric recording
Browse files Browse the repository at this point in the history
Adds these gauges:
- grpc.lb.rls.cache_entries
- grpc.lb.rls.cache_size
  • Loading branch information
temawi committed May 8, 2024
1 parent 7a663f6 commit dfbd986
Show file tree
Hide file tree
Showing 3 changed files with 169 additions and 10 deletions.
48 changes: 48 additions & 0 deletions rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,14 @@
import io.grpc.LoadBalancer.ResolvedAddresses;
import io.grpc.LoadBalancer.SubchannelPicker;
import io.grpc.LongCounterMetricInstrument;
import io.grpc.LongGaugeMetricInstrument;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.MetricInstrumentRegistry;
import io.grpc.MetricRecorder.BatchCallback;
import io.grpc.MetricRecorder.BatchRecorder;
import io.grpc.MetricRecorder.Registration;
import io.grpc.Status;
import io.grpc.internal.BackoffPolicy;
import io.grpc.internal.ExponentialBackoffPolicy;
Expand All @@ -65,6 +69,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -94,6 +99,11 @@ final class CachingRlsLbClient {
private static final LongCounterMetricInstrument DEFAULT_TARGET_PICKS_COUNTER;
private static final LongCounterMetricInstrument TARGET_PICKS_COUNTER;
private static final LongCounterMetricInstrument FAILED_PICKS_COUNTER;
private static final LongGaugeMetricInstrument CACHE_ENTRIES_GAUGE;
private static final LongGaugeMetricInstrument CACHE_SIZE_GAUGE;
private final Registration cacheEntriesGaugeRegistration;
private final Registration cacheSizeGaugeRegistration;
private final String metricsInstanceUuid = UUID.randomUUID().toString();

// All cache status changes (pending, backoff, success) must be under this lock
private final Object lock = new Object();
Expand Down Expand Up @@ -138,6 +148,14 @@ final class CachingRlsLbClient {
"Number of LB picks failed due to either a failed RLS request or the RLS channel being "
+ "throttled", "pick", Arrays.asList("grpc.target", "grpc.lb.rls.server_target"),
Collections.emptyList(), true);
CACHE_ENTRIES_GAUGE = metricInstrumentRegistry.registerLongGauge("grpc.lb.rls.cache_entries",
"Number of entries in the RLS cache", "entry",
Arrays.asList("grpc.target", "grpc.lb.rls.server_target", "grpc.lb.rls.instance_id"),
Collections.emptyList(), true);
CACHE_SIZE_GAUGE = metricInstrumentRegistry.registerLongGauge("grpc.lb.rls.cache_size",
"The current size of the RLS cache", "byte",
Arrays.asList("grpc.target", "grpc.lb.rls.server_target", "grpc.lb.rls.instance_id"),
Collections.emptyList(), true);
}

private CachingRlsLbClient(Builder builder) {
Expand Down Expand Up @@ -202,6 +220,34 @@ private CachingRlsLbClient(Builder builder) {
lbPolicyConfig.getLoadBalancingPolicy(), childLbResolvedAddressFactory,
childLbHelperProvider,
new BackoffRefreshListener());

cacheEntriesGaugeRegistration = helper.getMetricRecorder()
.registerBatchCallback(new BatchCallback() {
@Override
public void accept(BatchRecorder recorder) {
int estimatedSize;
synchronized (lock) {
estimatedSize = linkedHashLruCache.estimatedSize();
}
recorder.recordLongGauge(CACHE_ENTRIES_GAUGE, estimatedSize,
Arrays.asList(helper.getChannelTarget(), rlsConfig.lookupService(),
metricsInstanceUuid), Collections.emptyList());
}
}, CACHE_ENTRIES_GAUGE);
cacheSizeGaugeRegistration = helper.getMetricRecorder()
.registerBatchCallback(new BatchCallback() {
@Override
public void accept(BatchRecorder recorder) {
long estimatedSizeBytes;
synchronized (lock) {
estimatedSizeBytes = linkedHashLruCache.estimatedSizeBytes();
}
recorder.recordLongGauge(CACHE_SIZE_GAUGE, estimatedSizeBytes,
Arrays.asList(helper.getChannelTarget(), rlsConfig.lookupService(),
metricsInstanceUuid), Collections.emptyList());
}
}, CACHE_SIZE_GAUGE);

logger.log(ChannelLogLevel.DEBUG, "CachingRlsLbClient created");
}

Expand Down Expand Up @@ -306,6 +352,8 @@ void close() {
pendingCallCache.clear();
rlsChannel.shutdownNow();
rlsPicker.close();
cacheEntriesGaugeRegistration.close();
cacheSizeGaugeRegistration.close();
}
}

Expand Down
103 changes: 103 additions & 0 deletions rls/src/test/java/io/grpc/rls/CachingRlsLbClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,14 @@
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertSame;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.google.common.base.Converter;
import com.google.common.collect.ImmutableList;
Expand All @@ -47,10 +50,14 @@
import io.grpc.LoadBalancer.PickResult;
import io.grpc.LoadBalancer.SubchannelPicker;
import io.grpc.LoadBalancerProvider;
import io.grpc.LongGaugeMetricInstrument;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.MetricRecorder;
import io.grpc.MetricRecorder.BatchCallback;
import io.grpc.MetricRecorder.BatchRecorder;
import io.grpc.MetricRecorder.Registration;
import io.grpc.NameResolver.ConfigOrError;
import io.grpc.Status;
import io.grpc.Status.Code;
Expand Down Expand Up @@ -95,12 +102,15 @@
import java.util.concurrent.TimeoutException;
import javax.annotation.Nonnull;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.AdditionalAnswers;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatcher;
import org.mockito.Captor;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnit;
Expand All @@ -124,6 +134,17 @@ public class CachingRlsLbClientTest {
private SocketAddress socketAddress;
@Mock
private MetricRecorder mockMetricRecorder;
@Mock
private BatchRecorder mockBatchRecorder;
@Mock
private Registration mockCacheEntriesRegistration;
@Mock
private Registration mockCacheSizeRegistration;
@Captor
private ArgumentCaptor<BatchCallback> cacheEntriesBatchCallbackCaptor;
@Captor
private ArgumentCaptor<BatchCallback> cacheSizeBatchCallbackCaptor;


private final SynchronizationContext syncContext =
new SynchronizationContext(new UncaughtExceptionHandler() {
Expand Down Expand Up @@ -168,6 +189,18 @@ private void setUpRlsLbClient() {
.build();
}

@Before
public void setUpMockMetricRecorder() {
when(mockMetricRecorder.registerBatchCallback(isA(BatchCallback.class),
argThat((LongGaugeMetricInstrument metricInstruments) -> {
return metricInstruments.getName().equals("grpc.lb.rls.cache_entries");
}))).thenReturn(mockCacheEntriesRegistration);
when(mockMetricRecorder.registerBatchCallback(isA(BatchCallback.class),
argThat((LongGaugeMetricInstrument metricInstruments) -> {
return metricInstruments.getName().equals("grpc.lb.rls.cache_size");
}))).thenReturn(mockCacheSizeRegistration);
}

@After
public void tearDown() throws Exception {
rlsLbClient.close();
Expand Down Expand Up @@ -636,6 +669,61 @@ private void setState(ChildPolicyWrapper policyWrapper, ConnectivityState newSta
policyWrapper.getHelper().updateBalancingState(newState, policyWrapper.getPicker());
}

@Test
public void metricGauges() throws ExecutionException, InterruptedException, TimeoutException {
setUpRlsLbClient();

verify(mockMetricRecorder).registerBatchCallback(cacheEntriesBatchCallbackCaptor.capture(),
argThat(new LongGaugeInstrumentArgumentMatcher("grpc.lb.rls.cache_entries")));
BatchCallback cacheEntriesBatchCallback = cacheEntriesBatchCallbackCaptor.getValue();
verify(mockMetricRecorder).registerBatchCallback(cacheSizeBatchCallbackCaptor.capture(),
argThat(new LongGaugeInstrumentArgumentMatcher("grpc.lb.rls.cache_size")));
BatchCallback cacheSizeBatchCallback = cacheSizeBatchCallbackCaptor.getValue();

// Verify the correct cache entries gauge value if requested at this point.
InOrder inOrder = inOrder(mockBatchRecorder);
cacheEntriesBatchCallback.accept(mockBatchRecorder);
inOrder.verify(mockBatchRecorder).recordLongGauge(
argThat(new LongGaugeInstrumentArgumentMatcher("grpc.lb.rls.cache_entries")), eq(0L),
isA(List.class), isA(List.class));

// Verify the correct cache size gauge value if requested at this point.
cacheSizeBatchCallback.accept(mockBatchRecorder);
inOrder.verify(mockBatchRecorder)
.recordLongGauge(argThat(new LongGaugeInstrumentArgumentMatcher("grpc.lb.rls.cache_size")),
eq(0L), isA(List.class), isA(List.class));

RouteLookupRequest routeLookupRequest = RouteLookupRequest.create(
ImmutableMap.of("server", "bigtable.googleapis.com", "service-key", "foo", "method-key",
"bar"));
rlsServerImpl.setLookupTable(ImmutableMap.of(routeLookupRequest,
RouteLookupResponse.create(ImmutableList.of("target"), "header")));

// Make a request that will populate the cache with an entry
CachedRouteLookupResponse resp = getInSyncContext(routeLookupRequest);
fakeClock.forwardTime(SERVER_LATENCY_MILLIS, TimeUnit.MILLISECONDS);
resp = getInSyncContext(routeLookupRequest);
assertThat(resp.hasData()).isTrue();

// Gauge values should reflect the new cache entry.
cacheEntriesBatchCallback.accept(mockBatchRecorder);
inOrder.verify(mockBatchRecorder).recordLongGauge(
argThat(new LongGaugeInstrumentArgumentMatcher("grpc.lb.rls.cache_entries")), eq(1L),
isA(List.class), isA(List.class));

cacheSizeBatchCallback.accept(mockBatchRecorder);
inOrder.verify(mockBatchRecorder)
.recordLongGauge(argThat(new LongGaugeInstrumentArgumentMatcher("grpc.lb.rls.cache_size")),
eq(260L), isA(List.class), isA(List.class));

inOrder.verifyNoMoreInteractions();

// Shutdown
rlsLbClient.close();
verify(mockCacheEntriesRegistration).close();
verify(mockCacheSizeRegistration).close();
}

private static RouteLookupConfig getRouteLookupConfig() {
return RouteLookupConfig.builder()
.grpcKeybuilders(ImmutableList.of(
Expand Down Expand Up @@ -667,6 +755,21 @@ public long nextBackoffNanos() {
};
}

private static class LongGaugeInstrumentArgumentMatcher implements
ArgumentMatcher<LongGaugeMetricInstrument> {

private final String instrumentName;

public LongGaugeInstrumentArgumentMatcher(String instrumentName) {
this.instrumentName = instrumentName;
}

@Override
public boolean matches(LongGaugeMetricInstrument instrument) {
return instrument.getName().equals(instrumentName);
}
}

private static final class FakeBackoffProvider implements BackoffPolicy.Provider {

private BackoffPolicy nextPolicy = createBackoffPolicy(100, TimeUnit.MILLISECONDS);
Expand Down
28 changes: 18 additions & 10 deletions rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

import com.google.common.base.Converter;
import com.google.common.collect.ImmutableList;
Expand All @@ -52,13 +53,16 @@
import io.grpc.LoadBalancer.Subchannel;
import io.grpc.LoadBalancer.SubchannelPicker;
import io.grpc.LoadBalancer.SubchannelStateListener;
import io.grpc.LongGaugeMetricInstrument;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.MethodDescriptor.MethodType;
import io.grpc.MetricInstrument;
import io.grpc.MetricRecorder;
import io.grpc.MetricRecorder.BatchCallback;
import io.grpc.MetricRecorder.Registration;
import io.grpc.MetricSink;
import io.grpc.NameResolver.ConfigOrError;
import io.grpc.NoopMetricSink;
Expand Down Expand Up @@ -128,6 +132,10 @@ public void uncaughtException(Thread t, Throwable e) {
});
@Mock
private MetricRecorder mockMetricRecorder;
@Mock
private Registration mockCacheEntriesRegistration;
@Mock
private Registration mockCacheSizeRegistration;
private final FakeHelper helperDelegate = new FakeHelper();
private final Helper helper =
mock(Helper.class, AdditionalAnswers.delegatesTo(helperDelegate));
Expand Down Expand Up @@ -185,6 +193,15 @@ public CachingRlsLbClient.Builder get() {
};

searchSubchannelArgs = newPickSubchannelArgs(fakeSearchMethod);

when(mockMetricRecorder.registerBatchCallback(isA(BatchCallback.class),
argThat((LongGaugeMetricInstrument metricInstruments) -> {
return metricInstruments.getName().equals("grpc.lb.rls.cache_entries");
}))).thenReturn(mockCacheEntriesRegistration);
when(mockMetricRecorder.registerBatchCallback(isA(BatchCallback.class),
argThat((LongGaugeMetricInstrument metricInstruments) -> {
return metricInstruments.getName().equals("grpc.lb.rls.cache_size");
}))).thenReturn(mockCacheSizeRegistration);
rescueSubchannelArgs = newPickSubchannelArgs(fakeRescueMethod);
}

Expand Down Expand Up @@ -226,7 +243,6 @@ public void lb_serverStatusCodeConversion() throws Exception {
assertThat(serverStatus.getDescription()).contains("RLS server returned: ");
assertThat(serverStatus.getDescription()).endsWith("ABORTED: base desc");
assertThat(serverStatus.getDescription()).contains("RLS server conv.test");
verifyNoMoreInteractions(mockMetricRecorder);
}

@Test
Expand Down Expand Up @@ -290,8 +306,6 @@ public void lb_working_withDefaultTarget_rlsResponding() throws Exception {
assertThat(res.getStatus().getCode()).isEqualTo(Status.Code.UNAVAILABLE);
assertThat(subchannelIsReady(res.getSubchannel())).isFalse();
verifyLongCounterAdd("grpc.lb.rls.target_picks", 1, 1, "wilderness", "fail");

verifyNoMoreInteractions(mockMetricRecorder);
}

@Test
Expand Down Expand Up @@ -351,7 +365,6 @@ public void lb_working_withoutDefaultTarget_noRlsResponse() throws Exception {
inOrder.verify(helper).getChannelTarget();
inOrder.verifyNoMoreInteractions();
verifyFailedPicksCounterAdd(1, 1);
verifyNoMoreInteractions(mockMetricRecorder);
}

@Test
Expand All @@ -377,7 +390,6 @@ public void lb_working_withDefaultTarget_noRlsResponse() throws Exception {
int times = PickFirstLoadBalancerProvider.isEnabledNewPickFirst() ? 1 : 2;
verifyLongCounterAdd("grpc.lb.rls.default_target_picks", times, 1,
"defaultTarget", "complete");
verifyNoMoreInteractions(mockMetricRecorder);

Subchannel subchannel = picker.pickSubchannel(searchSubchannelArgs).getSubchannel();
assertThat(subchannelIsReady(subchannel)).isTrue();
Expand Down Expand Up @@ -422,8 +434,6 @@ public void lb_working_withDefaultTarget_noRlsResponse() throws Exception {
assertThat(res.getStatus().getCode()).isEqualTo(Status.Code.UNAVAILABLE);
assertThat(res.getSubchannel()).isNull();
verifyLongCounterAdd("grpc.lb.rls.target_picks", 1, 1, "wilderness", "fail");

verifyNoMoreInteractions(mockMetricRecorder);
}

@Test
Expand Down Expand Up @@ -499,7 +509,6 @@ public void lb_working_withoutDefaultTarget() throws Exception {
inOrder.verify(helper, atLeast(0)).refreshNameResolution();
inOrder.verifyNoMoreInteractions();
verifyLongCounterAdd("grpc.lb.rls.target_picks", 1, 1, "wilderness", "fail");
verifyNoMoreInteractions(mockMetricRecorder);
}

@Test
Expand Down Expand Up @@ -542,7 +551,6 @@ public void lb_nameResolutionFailed() throws Exception {
res = failedPicker.pickSubchannel(newPickSubchannelArgs(fakeSearchMethod));
assertThat(res.getStatus().isOk()).isFalse();
assertThat(subchannelIsReady(res.getSubchannel())).isFalse();
verifyNoMoreInteractions(mockMetricRecorder);
}

private PickResult markReadyAndGetPickResult(InOrder inOrder,
Expand Down

0 comments on commit dfbd986

Please sign in to comment.