Skip to content

Commit

Permalink
Use a single BatchCallback and actually batch the gauge calls
Browse files Browse the repository at this point in the history
  • Loading branch information
temawi committed May 8, 2024
1 parent dfbd986 commit 1350508
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 63 deletions.
22 changes: 6 additions & 16 deletions rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,7 @@ final class CachingRlsLbClient {
private static final LongCounterMetricInstrument FAILED_PICKS_COUNTER;
private static final LongGaugeMetricInstrument CACHE_ENTRIES_GAUGE;
private static final LongGaugeMetricInstrument CACHE_SIZE_GAUGE;
private final Registration cacheEntriesGaugeRegistration;
private final Registration cacheSizeGaugeRegistration;
private final Registration gaugeRegistration;
private final String metricsInstanceUuid = UUID.randomUUID().toString();

// All cache status changes (pending, backoff, success) must be under this lock
Expand Down Expand Up @@ -221,32 +220,24 @@ private CachingRlsLbClient(Builder builder) {
childLbHelperProvider,
new BackoffRefreshListener());

cacheEntriesGaugeRegistration = helper.getMetricRecorder()
gaugeRegistration = helper.getMetricRecorder()
.registerBatchCallback(new BatchCallback() {
@Override
public void accept(BatchRecorder recorder) {
int estimatedSize;
long estimatedSizeBytes;
synchronized (lock) {
estimatedSize = linkedHashLruCache.estimatedSize();
estimatedSizeBytes = linkedHashLruCache.estimatedSizeBytes();
}
recorder.recordLongGauge(CACHE_ENTRIES_GAUGE, estimatedSize,
Arrays.asList(helper.getChannelTarget(), rlsConfig.lookupService(),
metricsInstanceUuid), Collections.emptyList());
}
}, CACHE_ENTRIES_GAUGE);
cacheSizeGaugeRegistration = helper.getMetricRecorder()
.registerBatchCallback(new BatchCallback() {
@Override
public void accept(BatchRecorder recorder) {
long estimatedSizeBytes;
synchronized (lock) {
estimatedSizeBytes = linkedHashLruCache.estimatedSizeBytes();
}
recorder.recordLongGauge(CACHE_SIZE_GAUGE, estimatedSizeBytes,
Arrays.asList(helper.getChannelTarget(), rlsConfig.lookupService(),
metricsInstanceUuid), Collections.emptyList());
}
}, CACHE_SIZE_GAUGE);
}, CACHE_ENTRIES_GAUGE, CACHE_SIZE_GAUGE);

logger.log(ChannelLogLevel.DEBUG, "CachingRlsLbClient created");
}
Expand Down Expand Up @@ -352,8 +343,7 @@ void close() {
pendingCallCache.clear();
rlsChannel.shutdownNow();
rlsPicker.close();
cacheEntriesGaugeRegistration.close();
cacheSizeGaugeRegistration.close();
gaugeRegistration.close();
}
}

Expand Down
45 changes: 13 additions & 32 deletions rls/src/test/java/io/grpc/rls/CachingRlsLbClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static io.grpc.rls.CachingRlsLbClient.RLS_DATA_KEY;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertSame;
import static org.mockito.AdditionalAnswers.delegatesTo;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
Expand Down Expand Up @@ -107,7 +108,6 @@
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.AdditionalAnswers;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatcher;
import org.mockito.Captor;
Expand Down Expand Up @@ -137,13 +137,9 @@ public class CachingRlsLbClientTest {
@Mock
private BatchRecorder mockBatchRecorder;
@Mock
private Registration mockCacheEntriesRegistration;
@Mock
private Registration mockCacheSizeRegistration;
@Captor
private ArgumentCaptor<BatchCallback> cacheEntriesBatchCallbackCaptor;
private Registration mockGaugeRegistration;
@Captor
private ArgumentCaptor<BatchCallback> cacheSizeBatchCallbackCaptor;
private ArgumentCaptor<BatchCallback> gaugeBatchCallbackCaptor;


private final SynchronizationContext syncContext =
Expand All @@ -166,7 +162,7 @@ public void uncaughtException(Thread t, Throwable e) {
private final ChildLoadBalancingPolicy childLbPolicy =
new ChildLoadBalancingPolicy("target", Collections.<String, Object>emptyMap(), lbProvider);
private final Helper helper =
mock(Helper.class, AdditionalAnswers.delegatesTo(new FakeHelper()));
mock(Helper.class, delegatesTo(new FakeHelper()));
private final FakeThrottler fakeThrottler = new FakeThrottler();
private final LbPolicyConfiguration lbPolicyConfiguration =
new LbPolicyConfiguration(ROUTE_LOOKUP_CONFIG, null, childLbPolicy);
Expand All @@ -191,14 +187,7 @@ private void setUpRlsLbClient() {

@Before
public void setUpMockMetricRecorder() {
when(mockMetricRecorder.registerBatchCallback(isA(BatchCallback.class),
argThat((LongGaugeMetricInstrument metricInstruments) -> {
return metricInstruments.getName().equals("grpc.lb.rls.cache_entries");
}))).thenReturn(mockCacheEntriesRegistration);
when(mockMetricRecorder.registerBatchCallback(isA(BatchCallback.class),
argThat((LongGaugeMetricInstrument metricInstruments) -> {
return metricInstruments.getName().equals("grpc.lb.rls.cache_size");
}))).thenReturn(mockCacheSizeRegistration);
when(mockMetricRecorder.registerBatchCallback(any(), any())).thenReturn(mockGaugeRegistration);
}

@After
Expand Down Expand Up @@ -673,22 +662,17 @@ private void setState(ChildPolicyWrapper policyWrapper, ConnectivityState newSta
public void metricGauges() throws ExecutionException, InterruptedException, TimeoutException {
setUpRlsLbClient();

verify(mockMetricRecorder).registerBatchCallback(cacheEntriesBatchCallbackCaptor.capture(),
argThat(new LongGaugeInstrumentArgumentMatcher("grpc.lb.rls.cache_entries")));
BatchCallback cacheEntriesBatchCallback = cacheEntriesBatchCallbackCaptor.getValue();
verify(mockMetricRecorder).registerBatchCallback(cacheSizeBatchCallbackCaptor.capture(),
argThat(new LongGaugeInstrumentArgumentMatcher("grpc.lb.rls.cache_size")));
BatchCallback cacheSizeBatchCallback = cacheSizeBatchCallbackCaptor.getValue();
verify(mockMetricRecorder).registerBatchCallback(gaugeBatchCallbackCaptor.capture(),
any());

BatchCallback gaugeBatchCallback = gaugeBatchCallbackCaptor.getValue();

// Verify the correct cache entries gauge value if requested at this point.
// Verify the correct cache gauge values when requested at this point.
InOrder inOrder = inOrder(mockBatchRecorder);
cacheEntriesBatchCallback.accept(mockBatchRecorder);
gaugeBatchCallback.accept(mockBatchRecorder);
inOrder.verify(mockBatchRecorder).recordLongGauge(
argThat(new LongGaugeInstrumentArgumentMatcher("grpc.lb.rls.cache_entries")), eq(0L),
isA(List.class), isA(List.class));

// Verify the correct cache size gauge value if requested at this point.
cacheSizeBatchCallback.accept(mockBatchRecorder);
inOrder.verify(mockBatchRecorder)
.recordLongGauge(argThat(new LongGaugeInstrumentArgumentMatcher("grpc.lb.rls.cache_size")),
eq(0L), isA(List.class), isA(List.class));
Expand All @@ -706,12 +690,10 @@ public void metricGauges() throws ExecutionException, InterruptedException, Time
assertThat(resp.hasData()).isTrue();

// Gauge values should reflect the new cache entry.
cacheEntriesBatchCallback.accept(mockBatchRecorder);
gaugeBatchCallback.accept(mockBatchRecorder);
inOrder.verify(mockBatchRecorder).recordLongGauge(
argThat(new LongGaugeInstrumentArgumentMatcher("grpc.lb.rls.cache_entries")), eq(1L),
isA(List.class), isA(List.class));

cacheSizeBatchCallback.accept(mockBatchRecorder);
inOrder.verify(mockBatchRecorder)
.recordLongGauge(argThat(new LongGaugeInstrumentArgumentMatcher("grpc.lb.rls.cache_size")),
eq(260L), isA(List.class), isA(List.class));
Expand All @@ -720,8 +702,7 @@ public void metricGauges() throws ExecutionException, InterruptedException, Time

// Shutdown
rlsLbClient.close();
verify(mockCacheEntriesRegistration).close();
verify(mockCacheSizeRegistration).close();
verify(mockGaugeRegistration).close();
}

private static RouteLookupConfig getRouteLookupConfig() {
Expand Down
18 changes: 3 additions & 15 deletions rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -53,15 +52,13 @@
import io.grpc.LoadBalancer.Subchannel;
import io.grpc.LoadBalancer.SubchannelPicker;
import io.grpc.LoadBalancer.SubchannelStateListener;
import io.grpc.LongGaugeMetricInstrument;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.MethodDescriptor.MethodType;
import io.grpc.MetricInstrument;
import io.grpc.MetricRecorder;
import io.grpc.MetricRecorder.BatchCallback;
import io.grpc.MetricRecorder.Registration;
import io.grpc.MetricSink;
import io.grpc.NameResolver.ConfigOrError;
Expand Down Expand Up @@ -133,9 +130,7 @@ public void uncaughtException(Thread t, Throwable e) {
@Mock
private MetricRecorder mockMetricRecorder;
@Mock
private Registration mockCacheEntriesRegistration;
@Mock
private Registration mockCacheSizeRegistration;
private Registration mockGaugeRegistration;
private final FakeHelper helperDelegate = new FakeHelper();
private final Helper helper =
mock(Helper.class, AdditionalAnswers.delegatesTo(helperDelegate));
Expand Down Expand Up @@ -193,16 +188,9 @@ public CachingRlsLbClient.Builder get() {
};

searchSubchannelArgs = newPickSubchannelArgs(fakeSearchMethod);

when(mockMetricRecorder.registerBatchCallback(isA(BatchCallback.class),
argThat((LongGaugeMetricInstrument metricInstruments) -> {
return metricInstruments.getName().equals("grpc.lb.rls.cache_entries");
}))).thenReturn(mockCacheEntriesRegistration);
when(mockMetricRecorder.registerBatchCallback(isA(BatchCallback.class),
argThat((LongGaugeMetricInstrument metricInstruments) -> {
return metricInstruments.getName().equals("grpc.lb.rls.cache_size");
}))).thenReturn(mockCacheSizeRegistration);
rescueSubchannelArgs = newPickSubchannelArgs(fakeRescueMethod);

when(mockMetricRecorder.registerBatchCallback(any(), any())).thenReturn(mockGaugeRegistration);
}

@After
Expand Down

0 comments on commit 1350508

Please sign in to comment.