Skip to content

Commit

Permalink
rls: Adding extra debug logs
Browse files Browse the repository at this point in the history
  • Loading branch information
temawi committed Feb 8, 2024
1 parent 4f7ec13 commit e220075
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 1 deletion.
48 changes: 48 additions & 0 deletions rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ static Status convertRlsServerStatus(Status status, String serverName) {

@CheckReturnValue
private ListenableFuture<RouteLookupResponse> asyncRlsCall(RouteLookupRequest request) {
logger.log(ChannelLogLevel.DEBUG, "Making an async call to RLS");
final SettableFuture<RouteLookupResponse> response = SettableFuture.create();
if (throttler.shouldThrottle()) {
logger.log(ChannelLogLevel.DEBUG, "Request is throttled");
Expand Down Expand Up @@ -224,6 +225,7 @@ public void onError(Throwable t) {

@Override
public void onCompleted() {
logger.log(ChannelLogLevel.DEBUG, "routeLookup call completed");
throttler.registerBackendResponse(false);
}
});
Expand All @@ -237,10 +239,13 @@ public void onCompleted() {
*/
@CheckReturnValue
final CachedRouteLookupResponse get(final RouteLookupRequest request) {
logger.log(ChannelLogLevel.DEBUG, "Acquiring lock to get cached entry");
synchronized (lock) {
logger.log(ChannelLogLevel.DEBUG, "Acquired lock to get cached entry");
final CacheEntry cacheEntry;
cacheEntry = linkedHashLruCache.read(request);
if (cacheEntry == null) {
logger.log(ChannelLogLevel.DEBUG, "No cache entry found, making a new lrs request");
return handleNewRequest(request);
}

Expand All @@ -249,10 +254,12 @@ final CachedRouteLookupResponse get(final RouteLookupRequest request) {
logger.log(ChannelLogLevel.DEBUG, "Cache hit for the request");
DataCacheEntry dataEntry = ((DataCacheEntry) cacheEntry);
if (dataEntry.isStaled(ticker.read())) {
logger.log(ChannelLogLevel.DEBUG, "Cache entry is stale");
dataEntry.maybeRefresh();
}
return CachedRouteLookupResponse.dataEntry((DataCacheEntry) cacheEntry);
}
logger.log(ChannelLogLevel.DEBUG, "Cache hit for a backup entry");
return CachedRouteLookupResponse.backoffEntry((BackoffCacheEntry) cacheEntry);
}
}
Expand Down Expand Up @@ -582,22 +589,31 @@ final class DataCacheEntry extends CacheEntry {
* </pre>
*/
void maybeRefresh() {
logger.log(ChannelLogLevel.DEBUG, "Acquiring lock to maybe refresh cache entry");
synchronized (lock) {
logger.log(ChannelLogLevel.DEBUG, "Lock to maybe refresh cache entry acquired");
if (pendingCallCache.containsKey(request)) {
// pending already requested
logger.log(ChannelLogLevel.DEBUG,

Check warning on line 597 in rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java

View check run for this annotation

Codecov / codecov/patch

rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java#L597

Added line #L597 was not covered by tests
"A pending refresh request already created, no need to proceed with refresh");
return;
}
final ListenableFuture<RouteLookupResponse> asyncCall = asyncRlsCall(request);
if (!asyncCall.isDone()) {
logger.log(ChannelLogLevel.DEBUG,
"Async call to rls not yet complete, adding a pending cache entry");
pendingCallCache.put(request, new PendingCacheEntry(request, asyncCall));
} else {
// async call returned finished future is most likely throttled
try {
logger.log(ChannelLogLevel.DEBUG, "Waiting for RLS call to return");

Check warning on line 609 in rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java

View check run for this annotation

Codecov / codecov/patch

rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java#L609

Added line #L609 was not covered by tests
RouteLookupResponse response = asyncCall.get();
logger.log(ChannelLogLevel.DEBUG, "RLS call to returned");

Check warning on line 611 in rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java

View check run for this annotation

Codecov / codecov/patch

rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java#L611

Added line #L611 was not covered by tests
linkedHashLruCache.cacheAndClean(request, new DataCacheEntry(request, response));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
logger.log(ChannelLogLevel.DEBUG, "RLS call failed, adding a backoff entry", e);

Check warning on line 616 in rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java

View check run for this annotation

Codecov / codecov/patch

rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java#L616

Added line #L616 was not covered by tests
BackoffCacheEntry backoffEntry =
new BackoffCacheEntry(request, Status.fromThrowable(e), backoffProvider.get());
linkedHashLruCache.cacheAndClean(request, backoffEntry);
Expand Down Expand Up @@ -710,33 +726,47 @@ public void run() {
delayNanos,
TimeUnit.NANOSECONDS,
scheduledExecutorService);
logger.log(ChannelLogLevel.DEBUG,
"BackoffCacheEntry created with a delay of {0}s",
TimeUnit.NANOSECONDS.toSeconds(delayNanos));
}

/** Forcefully refreshes cache entry by ignoring the backoff timer. */
void forceRefresh() {
logger.log(ChannelLogLevel.DEBUG, "Forcefully refreshing cache entry");

Check warning on line 736 in rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java

View check run for this annotation

Codecov / codecov/patch

rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java#L736

Added line #L736 was not covered by tests
if (scheduledHandle.isPending()) {
scheduledHandle.cancel();
transitionToPending();
}
}

private void transitionToPending() {
logger.log(ChannelLogLevel.DEBUG, "Acquiring lock to transition to pending");
synchronized (lock) {
logger.log(ChannelLogLevel.DEBUG, "Acquired lock to transition to pending");
if (shutdown) {
logger.log(ChannelLogLevel.DEBUG, "Already shut down, not transitioning to pending");

Check warning on line 748 in rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java

View check run for this annotation

Codecov / codecov/patch

rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java#L748

Added line #L748 was not covered by tests
return;
}
logger.log(ChannelLogLevel.DEBUG, "Calling RLS for transition to pending");
ListenableFuture<RouteLookupResponse> call = asyncRlsCall(request);
if (!call.isDone()) {
logger.log(ChannelLogLevel.DEBUG,
"Transition to pending RLS call not done, adding a pending cache entry");
PendingCacheEntry pendingEntry = new PendingCacheEntry(request, call, backoffPolicy);
pendingCallCache.put(request, pendingEntry);
linkedHashLruCache.invalidate(request);
} else {
try {
logger.log(ChannelLogLevel.DEBUG,
"Waiting for transition to pending RLS call response");
RouteLookupResponse response = call.get();
linkedHashLruCache.cacheAndClean(request, new DataCacheEntry(request, response));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
logger.log(ChannelLogLevel.DEBUG,
"Transition to pending RLS call failed, creating a backoff entry", e);
linkedHashLruCache.cacheAndClean(
request,
new BackoffCacheEntry(request, Status.fromThrowable(e), backoffPolicy));
Expand Down Expand Up @@ -939,9 +969,13 @@ private final class BackoffRefreshListener implements ChildLbStatusListener {

@Override
public void onStatusChanged(ConnectivityState newState) {
logger.log(ChannelLogLevel.DEBUG, "LB status changed to: {0}", newState);
if (prevState == ConnectivityState.TRANSIENT_FAILURE
&& newState == ConnectivityState.READY) {
logger.log(ChannelLogLevel.DEBUG, "Transitioning from TRANSIENT_FAILURE to READY");
logger.log(ChannelLogLevel.DEBUG, "Acquiring lock force refresh backoff cache entries");
synchronized (lock) {
logger.log(ChannelLogLevel.DEBUG, "Lock acquired for refreshing backoff cache entries");
for (CacheEntry value : linkedHashLruCache.values()) {
if (value instanceof BackoffCacheEntry) {
((BackoffCacheEntry) value).forceRefresh();
Expand Down Expand Up @@ -978,29 +1012,40 @@ public PickResult pickSubchannel(PickSubchannelArgs args) {
new Object[]{serviceName, methodName, args.getHeaders(), response});

if (response.getHeaderData() != null && !response.getHeaderData().isEmpty()) {
logger.log(ChannelLogLevel.DEBUG, "Updating LRS metadata from the LRS response headers");
Metadata headers = args.getHeaders();
headers.discardAll(RLS_DATA_KEY);
headers.put(RLS_DATA_KEY, response.getHeaderData());
}
String defaultTarget = lbPolicyConfig.getRouteLookupConfig().defaultTarget();
logger.log(ChannelLogLevel.DEBUG, "defaultTarget = {0}", defaultTarget);
boolean hasFallback = defaultTarget != null && !defaultTarget.isEmpty();
if (response.hasData()) {
logger.log(ChannelLogLevel.DEBUG, "LRS response has data, proceed with selecting a picker");
ChildPolicyWrapper childPolicyWrapper = response.getChildPolicyWrapper();
SubchannelPicker picker =
(childPolicyWrapper != null) ? childPolicyWrapper.getPicker() : null;
if (picker == null) {
logger.log(ChannelLogLevel.DEBUG,

Check warning on line 1029 in rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java

View check run for this annotation

Codecov / codecov/patch

rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java#L1029

Added line #L1029 was not covered by tests
"Child policy wrapper didn't return a picker, returning PickResult with no results");
return PickResult.withNoResult();
}
// Happy path
logger.log(ChannelLogLevel.DEBUG, "Returning PickResult");
return picker.pickSubchannel(args);
} else if (response.hasError()) {
logger.log(ChannelLogLevel.DEBUG, "RLS response has errors");
if (hasFallback) {
logger.log(ChannelLogLevel.DEBUG, "Using RLS fallback");
return useFallback(args);
}
logger.log(ChannelLogLevel.DEBUG, "No RLS fallback, returning PickResult with an error");

Check warning on line 1042 in rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java

View check run for this annotation

Codecov / codecov/patch

rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java#L1042

Added line #L1042 was not covered by tests
return PickResult.withError(
convertRlsServerStatus(response.getStatus(),
lbPolicyConfig.getRouteLookupConfig().lookupService()));
} else {
logger.log(ChannelLogLevel.DEBUG,
"RLS response had no data, return a PickResult with no data");
return PickResult.withNoResult();
}
}
Expand All @@ -1021,7 +1066,9 @@ private PickResult useFallback(PickSubchannelArgs args) {
private void startFallbackChildPolicy() {
String defaultTarget = lbPolicyConfig.getRouteLookupConfig().defaultTarget();
logger.log(ChannelLogLevel.DEBUG, "starting fallback to {0}", defaultTarget);
logger.log(ChannelLogLevel.DEBUG, "Acquiring lock to start fallback child policy");
synchronized (lock) {
logger.log(ChannelLogLevel.DEBUG, "Acquired lock for starting fallback child policy");
if (fallbackChildPolicyWrapper != null) {
return;
}
Expand All @@ -1031,6 +1078,7 @@ private void startFallbackChildPolicy() {

// GuardedBy CachingRlsLbClient.lock
void close() {
logger.log(ChannelLogLevel.DEBUG, "Closing RLS picker");
if (fallbackChildPolicyWrapper != null) {
refCountedChildPolicyWrapperFactory.release(fallbackChildPolicyWrapper);
}
Expand Down
11 changes: 10 additions & 1 deletion rls/src/main/java/io/grpc/rls/RlsLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,14 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
logger.log(ChannelLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses);
LbPolicyConfiguration lbPolicyConfiguration =
(LbPolicyConfiguration) resolvedAddresses.getLoadBalancingPolicyConfig();
checkNotNull(lbPolicyConfiguration, "Missing rls lb config");
checkNotNull(lbPolicyConfiguration, "Missing RLS LB config");
if (!lbPolicyConfiguration.equals(this.lbPolicyConfiguration)) {
logger.log(ChannelLogLevel.DEBUG, "A new RLS LB config received");
boolean needToConnect = this.lbPolicyConfiguration == null
|| !this.lbPolicyConfiguration.getRouteLookupConfig().lookupService().equals(
lbPolicyConfiguration.getRouteLookupConfig().lookupService());
if (needToConnect) {
logger.log(ChannelLogLevel.DEBUG, "RLS lookup service changed, need to connect");
if (routeLookupClient != null) {
routeLookupClient.close();
}
Expand All @@ -78,12 +80,15 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
// not required.
this.lbPolicyConfiguration = lbPolicyConfiguration;
}
logger.log(ChannelLogLevel.DEBUG, "RLS LB accepted resolved addresses successfully");
return Status.OK;
}

@Override
public void requestConnection() {
logger.log(ChannelLogLevel.DEBUG, "connection requested from RLS LB");

Check warning on line 89 in rls/src/main/java/io/grpc/rls/RlsLoadBalancer.java

View check run for this annotation

Codecov / codecov/patch

rls/src/main/java/io/grpc/rls/RlsLoadBalancer.java#L89

Added line #L89 was not covered by tests
if (routeLookupClient != null) {
logger.log(ChannelLogLevel.DEBUG, "requesting a connection from the routeLookupClient");

Check warning on line 91 in rls/src/main/java/io/grpc/rls/RlsLoadBalancer.java

View check run for this annotation

Codecov / codecov/patch

rls/src/main/java/io/grpc/rls/RlsLoadBalancer.java#L91

Added line #L91 was not covered by tests
routeLookupClient.requestConnection();
}
}
Expand All @@ -106,17 +111,21 @@ public String toString() {
}

if (routeLookupClient != null) {
logger.log(ChannelLogLevel.DEBUG, "closing the routeLookupClient on a name resolution error");
routeLookupClient.close();
routeLookupClient = null;
lbPolicyConfiguration = null;
}
logger.log(ChannelLogLevel.DEBUG,
"Updating balancing state to TRANSIENT_FAILURE with an error picker");
helper.updateBalancingState(ConnectivityState.TRANSIENT_FAILURE, new ErrorPicker());
}

@Override
public void shutdown() {
logger.log(ChannelLogLevel.DEBUG, "Rls lb shutdown");
if (routeLookupClient != null) {
logger.log(ChannelLogLevel.DEBUG, "closing the routeLookupClient because of RLS LB shutdown");
routeLookupClient.close();
routeLookupClient = null;
}
Expand Down

0 comments on commit e220075

Please sign in to comment.