Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rls: Adding extra debug logs #10902

Merged
merged 2 commits into from
Feb 15, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
48 changes: 48 additions & 0 deletions rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@

@CheckReturnValue
private ListenableFuture<RouteLookupResponse> asyncRlsCall(RouteLookupRequest request) {
logger.log(ChannelLogLevel.DEBUG, "Making an async call to RLS");
final SettableFuture<RouteLookupResponse> response = SettableFuture.create();
if (throttler.shouldThrottle()) {
logger.log(ChannelLogLevel.DEBUG, "Request is throttled");
Expand Down Expand Up @@ -224,6 +225,7 @@

@Override
public void onCompleted() {
logger.log(ChannelLogLevel.DEBUG, "routeLookup call completed");
throttler.registerBackendResponse(false);
}
});
Expand All @@ -237,10 +239,13 @@
*/
@CheckReturnValue
final CachedRouteLookupResponse get(final RouteLookupRequest request) {
logger.log(ChannelLogLevel.DEBUG, "Acquiring lock to get cached entry");
synchronized (lock) {
logger.log(ChannelLogLevel.DEBUG, "Acquired lock to get cached entry");
final CacheEntry cacheEntry;
cacheEntry = linkedHashLruCache.read(request);
if (cacheEntry == null) {
logger.log(ChannelLogLevel.DEBUG, "No cache entry found, making a new lrs request");
return handleNewRequest(request);
}

Expand All @@ -249,10 +254,12 @@
logger.log(ChannelLogLevel.DEBUG, "Cache hit for the request");
DataCacheEntry dataEntry = ((DataCacheEntry) cacheEntry);
if (dataEntry.isStaled(ticker.read())) {
logger.log(ChannelLogLevel.DEBUG, "Cache entry is stale");
dataEntry.maybeRefresh();
}
return CachedRouteLookupResponse.dataEntry((DataCacheEntry) cacheEntry);
}
logger.log(ChannelLogLevel.DEBUG, "Cache hit for a backup entry");
return CachedRouteLookupResponse.backoffEntry((BackoffCacheEntry) cacheEntry);
}
}
Expand Down Expand Up @@ -582,22 +589,31 @@
* </pre>
*/
void maybeRefresh() {
logger.log(ChannelLogLevel.DEBUG, "Acquiring lock to maybe refresh cache entry");
synchronized (lock) {
logger.log(ChannelLogLevel.DEBUG, "Lock to maybe refresh cache entry acquired");
if (pendingCallCache.containsKey(request)) {
// pending already requested
logger.log(ChannelLogLevel.DEBUG,

Check warning on line 597 in rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java

View check run for this annotation

Codecov / codecov/patch

rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java#L597

Added line #L597 was not covered by tests
"A pending refresh request already created, no need to proceed with refresh");
return;
}
final ListenableFuture<RouteLookupResponse> asyncCall = asyncRlsCall(request);
if (!asyncCall.isDone()) {
logger.log(ChannelLogLevel.DEBUG,
"Async call to rls not yet complete, adding a pending cache entry");
pendingCallCache.put(request, new PendingCacheEntry(request, asyncCall));
} else {
// async call returned finished future is most likely throttled
try {
logger.log(ChannelLogLevel.DEBUG, "Waiting for RLS call to return");

Check warning on line 609 in rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java

View check run for this annotation

Codecov / codecov/patch

rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java#L609

Added line #L609 was not covered by tests
RouteLookupResponse response = asyncCall.get();
logger.log(ChannelLogLevel.DEBUG, "RLS call to returned");

Check warning on line 611 in rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java

View check run for this annotation

Codecov / codecov/patch

rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java#L611

Added line #L611 was not covered by tests
linkedHashLruCache.cacheAndClean(request, new DataCacheEntry(request, response));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
logger.log(ChannelLogLevel.DEBUG, "RLS call failed, adding a backoff entry", e);

Check warning on line 616 in rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java

View check run for this annotation

Codecov / codecov/patch

rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java#L616

Added line #L616 was not covered by tests
BackoffCacheEntry backoffEntry =
new BackoffCacheEntry(request, Status.fromThrowable(e), backoffProvider.get());
linkedHashLruCache.cacheAndClean(request, backoffEntry);
Expand Down Expand Up @@ -710,33 +726,47 @@
delayNanos,
TimeUnit.NANOSECONDS,
scheduledExecutorService);
logger.log(ChannelLogLevel.DEBUG,
"BackoffCacheEntry created with a delay of {0}s",
TimeUnit.NANOSECONDS.toSeconds(delayNanos));
temawi marked this conversation as resolved.
Show resolved Hide resolved
}

/** Forcefully refreshes cache entry by ignoring the backoff timer. */
void forceRefresh() {
logger.log(ChannelLogLevel.DEBUG, "Forcefully refreshing cache entry");

Check warning on line 736 in rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java

View check run for this annotation

Codecov / codecov/patch

rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java#L736

Added line #L736 was not covered by tests
if (scheduledHandle.isPending()) {
scheduledHandle.cancel();
transitionToPending();
}
}

private void transitionToPending() {
logger.log(ChannelLogLevel.DEBUG, "Acquiring lock to transition to pending");
synchronized (lock) {
logger.log(ChannelLogLevel.DEBUG, "Acquired lock to transition to pending");
if (shutdown) {
logger.log(ChannelLogLevel.DEBUG, "Already shut down, not transitioning to pending");

Check warning on line 748 in rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java

View check run for this annotation

Codecov / codecov/patch

rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java#L748

Added line #L748 was not covered by tests
return;
}
logger.log(ChannelLogLevel.DEBUG, "Calling RLS for transition to pending");
ListenableFuture<RouteLookupResponse> call = asyncRlsCall(request);
if (!call.isDone()) {
logger.log(ChannelLogLevel.DEBUG,
"Transition to pending RLS call not done, adding a pending cache entry");
PendingCacheEntry pendingEntry = new PendingCacheEntry(request, call, backoffPolicy);
pendingCallCache.put(request, pendingEntry);
linkedHashLruCache.invalidate(request);
} else {
try {
logger.log(ChannelLogLevel.DEBUG,
"Waiting for transition to pending RLS call response");
RouteLookupResponse response = call.get();
linkedHashLruCache.cacheAndClean(request, new DataCacheEntry(request, response));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
logger.log(ChannelLogLevel.DEBUG,
"Transition to pending RLS call failed, creating a backoff entry", e);
linkedHashLruCache.cacheAndClean(
request,
new BackoffCacheEntry(request, Status.fromThrowable(e), backoffPolicy));
Expand Down Expand Up @@ -939,9 +969,13 @@

@Override
public void onStatusChanged(ConnectivityState newState) {
logger.log(ChannelLogLevel.DEBUG, "LB status changed to: {0}", newState);
if (prevState == ConnectivityState.TRANSIENT_FAILURE
&& newState == ConnectivityState.READY) {
logger.log(ChannelLogLevel.DEBUG, "Transitioning from TRANSIENT_FAILURE to READY");
logger.log(ChannelLogLevel.DEBUG, "Acquiring lock force refresh backoff cache entries");
synchronized (lock) {
logger.log(ChannelLogLevel.DEBUG, "Lock acquired for refreshing backoff cache entries");
for (CacheEntry value : linkedHashLruCache.values()) {
if (value instanceof BackoffCacheEntry) {
((BackoffCacheEntry) value).forceRefresh();
Expand Down Expand Up @@ -978,29 +1012,40 @@
new Object[]{serviceName, methodName, args.getHeaders(), response});

if (response.getHeaderData() != null && !response.getHeaderData().isEmpty()) {
logger.log(ChannelLogLevel.DEBUG, "Updating LRS metadata from the LRS response headers");
Metadata headers = args.getHeaders();
headers.discardAll(RLS_DATA_KEY);
headers.put(RLS_DATA_KEY, response.getHeaderData());
}
String defaultTarget = lbPolicyConfig.getRouteLookupConfig().defaultTarget();
logger.log(ChannelLogLevel.DEBUG, "defaultTarget = {0}", defaultTarget);
boolean hasFallback = defaultTarget != null && !defaultTarget.isEmpty();
if (response.hasData()) {
logger.log(ChannelLogLevel.DEBUG, "LRS response has data, proceed with selecting a picker");
ChildPolicyWrapper childPolicyWrapper = response.getChildPolicyWrapper();
SubchannelPicker picker =
(childPolicyWrapper != null) ? childPolicyWrapper.getPicker() : null;
if (picker == null) {
logger.log(ChannelLogLevel.DEBUG,

Check warning on line 1029 in rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java

View check run for this annotation

Codecov / codecov/patch

rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java#L1029

Added line #L1029 was not covered by tests
"Child policy wrapper didn't return a picker, returning PickResult with no results");
return PickResult.withNoResult();
}
// Happy path
logger.log(ChannelLogLevel.DEBUG, "Returning PickResult");
return picker.pickSubchannel(args);
} else if (response.hasError()) {
logger.log(ChannelLogLevel.DEBUG, "RLS response has errors");
if (hasFallback) {
logger.log(ChannelLogLevel.DEBUG, "Using RLS fallback");
return useFallback(args);
}
logger.log(ChannelLogLevel.DEBUG, "No RLS fallback, returning PickResult with an error");

Check warning on line 1042 in rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java

View check run for this annotation

Codecov / codecov/patch

rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java#L1042

Added line #L1042 was not covered by tests
return PickResult.withError(
convertRlsServerStatus(response.getStatus(),
lbPolicyConfig.getRouteLookupConfig().lookupService()));
} else {
logger.log(ChannelLogLevel.DEBUG,
"RLS response had no data, return a PickResult with no data");
return PickResult.withNoResult();
}
}
Expand All @@ -1021,7 +1066,9 @@
private void startFallbackChildPolicy() {
String defaultTarget = lbPolicyConfig.getRouteLookupConfig().defaultTarget();
logger.log(ChannelLogLevel.DEBUG, "starting fallback to {0}", defaultTarget);
logger.log(ChannelLogLevel.DEBUG, "Acquiring lock to start fallback child policy");
synchronized (lock) {
logger.log(ChannelLogLevel.DEBUG, "Acquired lock for starting fallback child policy");
if (fallbackChildPolicyWrapper != null) {
return;
}
Expand All @@ -1031,6 +1078,7 @@

// GuardedBy CachingRlsLbClient.lock
void close() {
logger.log(ChannelLogLevel.DEBUG, "Closing RLS picker");
if (fallbackChildPolicyWrapper != null) {
refCountedChildPolicyWrapperFactory.release(fallbackChildPolicyWrapper);
}
Expand Down
11 changes: 10 additions & 1 deletion rls/src/main/java/io/grpc/rls/RlsLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,14 @@
logger.log(ChannelLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses);
LbPolicyConfiguration lbPolicyConfiguration =
(LbPolicyConfiguration) resolvedAddresses.getLoadBalancingPolicyConfig();
checkNotNull(lbPolicyConfiguration, "Missing rls lb config");
checkNotNull(lbPolicyConfiguration, "Missing RLS LB config");
if (!lbPolicyConfiguration.equals(this.lbPolicyConfiguration)) {
logger.log(ChannelLogLevel.DEBUG, "A new RLS LB config received");
boolean needToConnect = this.lbPolicyConfiguration == null
|| !this.lbPolicyConfiguration.getRouteLookupConfig().lookupService().equals(
lbPolicyConfiguration.getRouteLookupConfig().lookupService());
if (needToConnect) {
logger.log(ChannelLogLevel.DEBUG, "RLS lookup service changed, need to connect");
if (routeLookupClient != null) {
routeLookupClient.close();
}
Expand All @@ -78,12 +80,15 @@
// not required.
this.lbPolicyConfiguration = lbPolicyConfiguration;
}
logger.log(ChannelLogLevel.DEBUG, "RLS LB accepted resolved addresses successfully");
return Status.OK;
}

@Override
public void requestConnection() {
logger.log(ChannelLogLevel.DEBUG, "connection requested from RLS LB");

Check warning on line 89 in rls/src/main/java/io/grpc/rls/RlsLoadBalancer.java

View check run for this annotation

Codecov / codecov/patch

rls/src/main/java/io/grpc/rls/RlsLoadBalancer.java#L89

Added line #L89 was not covered by tests
if (routeLookupClient != null) {
logger.log(ChannelLogLevel.DEBUG, "requesting a connection from the routeLookupClient");

Check warning on line 91 in rls/src/main/java/io/grpc/rls/RlsLoadBalancer.java

View check run for this annotation

Codecov / codecov/patch

rls/src/main/java/io/grpc/rls/RlsLoadBalancer.java#L91

Added line #L91 was not covered by tests
routeLookupClient.requestConnection();
}
}
Expand All @@ -106,17 +111,21 @@
}

if (routeLookupClient != null) {
logger.log(ChannelLogLevel.DEBUG, "closing the routeLookupClient on a name resolution error");
routeLookupClient.close();
routeLookupClient = null;
lbPolicyConfiguration = null;
}
logger.log(ChannelLogLevel.DEBUG,
"Updating balancing state to TRANSIENT_FAILURE with an error picker");
helper.updateBalancingState(ConnectivityState.TRANSIENT_FAILURE, new ErrorPicker());
}

@Override
public void shutdown() {
logger.log(ChannelLogLevel.DEBUG, "Rls lb shutdown");
if (routeLookupClient != null) {
logger.log(ChannelLogLevel.DEBUG, "closing the routeLookupClient because of RLS LB shutdown");
routeLookupClient.close();
routeLookupClient = null;
}
Expand Down