Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rls: Adding extra debug logs #10902

Merged
merged 2 commits into from
Feb 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
47 changes: 47 additions & 0 deletions rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ static Status convertRlsServerStatus(Status status, String serverName) {

@CheckReturnValue
private ListenableFuture<RouteLookupResponse> asyncRlsCall(RouteLookupRequest request) {
logger.log(ChannelLogLevel.DEBUG, "Making an async call to RLS");
final SettableFuture<RouteLookupResponse> response = SettableFuture.create();
if (throttler.shouldThrottle()) {
logger.log(ChannelLogLevel.DEBUG, "Request is throttled");
Expand Down Expand Up @@ -224,6 +225,7 @@ public void onError(Throwable t) {

/**
 * Stream-completion callback for the route lookup call: writes a debug log entry and then
 * reports the outcome to the throttler.
 */
@Override
public void onCompleted() {
logger.log(ChannelLogLevel.DEBUG, "routeLookup call completed");
// NOTE(review): the `false` argument presumably marks the response as "not throttled/failed";
// confirm against the throttler's registerBackendResponse contract.
throttler.registerBackendResponse(false);
}
});
Expand All @@ -237,10 +239,13 @@ public void onCompleted() {
*/
@CheckReturnValue
final CachedRouteLookupResponse get(final RouteLookupRequest request) {
logger.log(ChannelLogLevel.DEBUG, "Acquiring lock to get cached entry");
synchronized (lock) {
logger.log(ChannelLogLevel.DEBUG, "Acquired lock to get cached entry");
final CacheEntry cacheEntry;
cacheEntry = linkedHashLruCache.read(request);
if (cacheEntry == null) {
logger.log(ChannelLogLevel.DEBUG, "No cache entry found, making a new lrs request");
return handleNewRequest(request);
}

Expand All @@ -249,10 +254,12 @@ final CachedRouteLookupResponse get(final RouteLookupRequest request) {
logger.log(ChannelLogLevel.DEBUG, "Cache hit for the request");
DataCacheEntry dataEntry = ((DataCacheEntry) cacheEntry);
if (dataEntry.isStaled(ticker.read())) {
logger.log(ChannelLogLevel.DEBUG, "Cache entry is stale");
dataEntry.maybeRefresh();
}
return CachedRouteLookupResponse.dataEntry((DataCacheEntry) cacheEntry);
}
logger.log(ChannelLogLevel.DEBUG, "Cache hit for a backup entry");
return CachedRouteLookupResponse.backoffEntry((BackoffCacheEntry) cacheEntry);
}
}
Expand Down Expand Up @@ -582,22 +589,31 @@ final class DataCacheEntry extends CacheEntry {
* </pre>
*/
void maybeRefresh() {
logger.log(ChannelLogLevel.DEBUG, "Acquiring lock to maybe refresh cache entry");
synchronized (lock) {
logger.log(ChannelLogLevel.DEBUG, "Lock to maybe refresh cache entry acquired");
if (pendingCallCache.containsKey(request)) {
// pending already requested
logger.log(ChannelLogLevel.DEBUG,
"A pending refresh request already created, no need to proceed with refresh");
return;
}
final ListenableFuture<RouteLookupResponse> asyncCall = asyncRlsCall(request);
if (!asyncCall.isDone()) {
logger.log(ChannelLogLevel.DEBUG,
"Async call to rls not yet complete, adding a pending cache entry");
pendingCallCache.put(request, new PendingCacheEntry(request, asyncCall));
} else {
// async call returned finished future is most likely throttled
try {
logger.log(ChannelLogLevel.DEBUG, "Waiting for RLS call to return");
RouteLookupResponse response = asyncCall.get();
logger.log(ChannelLogLevel.DEBUG, "RLS call to returned");
linkedHashLruCache.cacheAndClean(request, new DataCacheEntry(request, response));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
logger.log(ChannelLogLevel.DEBUG, "RLS call failed, adding a backoff entry", e);
BackoffCacheEntry backoffEntry =
new BackoffCacheEntry(request, Status.fromThrowable(e), backoffProvider.get());
linkedHashLruCache.cacheAndClean(request, backoffEntry);
Expand Down Expand Up @@ -710,33 +726,46 @@ public void run() {
delayNanos,
TimeUnit.NANOSECONDS,
scheduledExecutorService);
logger.log(ChannelLogLevel.DEBUG, "BackoffCacheEntry created with a delay of {0} nanos",
delayNanos);
}

/**
 * Refreshes this cache entry right away rather than waiting for the backoff timer.
 *
 * <p>The attempt is always logged. The timer is cancelled and the entry transitions to pending
 * only while the scheduled backoff handle is still pending; otherwise nothing else happens.
 */
void forceRefresh() {
  logger.log(ChannelLogLevel.DEBUG, "Forcefully refreshing cache entry");
  if (!scheduledHandle.isPending()) {
    // The backoff timer is no longer pending, so there is nothing to cancel or restart.
    return;
  }
  scheduledHandle.cancel();
  transitionToPending();
}

private void transitionToPending() {
logger.log(ChannelLogLevel.DEBUG, "Acquiring lock to transition to pending");
synchronized (lock) {
logger.log(ChannelLogLevel.DEBUG, "Acquired lock to transition to pending");
if (shutdown) {
logger.log(ChannelLogLevel.DEBUG, "Already shut down, not transitioning to pending");
return;
}
logger.log(ChannelLogLevel.DEBUG, "Calling RLS for transition to pending");
ListenableFuture<RouteLookupResponse> call = asyncRlsCall(request);
if (!call.isDone()) {
logger.log(ChannelLogLevel.DEBUG,
"Transition to pending RLS call not done, adding a pending cache entry");
PendingCacheEntry pendingEntry = new PendingCacheEntry(request, call, backoffPolicy);
pendingCallCache.put(request, pendingEntry);
linkedHashLruCache.invalidate(request);
} else {
try {
logger.log(ChannelLogLevel.DEBUG,
"Waiting for transition to pending RLS call response");
RouteLookupResponse response = call.get();
linkedHashLruCache.cacheAndClean(request, new DataCacheEntry(request, response));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
logger.log(ChannelLogLevel.DEBUG,
"Transition to pending RLS call failed, creating a backoff entry", e);
linkedHashLruCache.cacheAndClean(
request,
new BackoffCacheEntry(request, Status.fromThrowable(e), backoffPolicy));
Expand Down Expand Up @@ -939,9 +968,13 @@ private final class BackoffRefreshListener implements ChildLbStatusListener {

@Override
public void onStatusChanged(ConnectivityState newState) {
logger.log(ChannelLogLevel.DEBUG, "LB status changed to: {0}", newState);
if (prevState == ConnectivityState.TRANSIENT_FAILURE
&& newState == ConnectivityState.READY) {
logger.log(ChannelLogLevel.DEBUG, "Transitioning from TRANSIENT_FAILURE to READY");
logger.log(ChannelLogLevel.DEBUG, "Acquiring lock force refresh backoff cache entries");
synchronized (lock) {
logger.log(ChannelLogLevel.DEBUG, "Lock acquired for refreshing backoff cache entries");
for (CacheEntry value : linkedHashLruCache.values()) {
if (value instanceof BackoffCacheEntry) {
((BackoffCacheEntry) value).forceRefresh();
Expand Down Expand Up @@ -978,29 +1011,40 @@ public PickResult pickSubchannel(PickSubchannelArgs args) {
new Object[]{serviceName, methodName, args.getHeaders(), response});

if (response.getHeaderData() != null && !response.getHeaderData().isEmpty()) {
logger.log(ChannelLogLevel.DEBUG, "Updating LRS metadata from the LRS response headers");
Metadata headers = args.getHeaders();
headers.discardAll(RLS_DATA_KEY);
headers.put(RLS_DATA_KEY, response.getHeaderData());
}
String defaultTarget = lbPolicyConfig.getRouteLookupConfig().defaultTarget();
logger.log(ChannelLogLevel.DEBUG, "defaultTarget = {0}", defaultTarget);
boolean hasFallback = defaultTarget != null && !defaultTarget.isEmpty();
if (response.hasData()) {
logger.log(ChannelLogLevel.DEBUG, "LRS response has data, proceed with selecting a picker");
ChildPolicyWrapper childPolicyWrapper = response.getChildPolicyWrapper();
SubchannelPicker picker =
(childPolicyWrapper != null) ? childPolicyWrapper.getPicker() : null;
if (picker == null) {
logger.log(ChannelLogLevel.DEBUG,
"Child policy wrapper didn't return a picker, returning PickResult with no results");
return PickResult.withNoResult();
}
// Happy path
logger.log(ChannelLogLevel.DEBUG, "Returning PickResult");
return picker.pickSubchannel(args);
} else if (response.hasError()) {
logger.log(ChannelLogLevel.DEBUG, "RLS response has errors");
if (hasFallback) {
logger.log(ChannelLogLevel.DEBUG, "Using RLS fallback");
return useFallback(args);
}
logger.log(ChannelLogLevel.DEBUG, "No RLS fallback, returning PickResult with an error");
return PickResult.withError(
convertRlsServerStatus(response.getStatus(),
lbPolicyConfig.getRouteLookupConfig().lookupService()));
} else {
logger.log(ChannelLogLevel.DEBUG,
"RLS response had no data, return a PickResult with no data");
return PickResult.withNoResult();
}
}
Expand All @@ -1021,7 +1065,9 @@ private PickResult useFallback(PickSubchannelArgs args) {
private void startFallbackChildPolicy() {
String defaultTarget = lbPolicyConfig.getRouteLookupConfig().defaultTarget();
logger.log(ChannelLogLevel.DEBUG, "starting fallback to {0}", defaultTarget);
logger.log(ChannelLogLevel.DEBUG, "Acquiring lock to start fallback child policy");
synchronized (lock) {
logger.log(ChannelLogLevel.DEBUG, "Acquired lock for starting fallback child policy");
if (fallbackChildPolicyWrapper != null) {
return;
}
Expand All @@ -1031,6 +1077,7 @@ private void startFallbackChildPolicy() {

// GuardedBy CachingRlsLbClient.lock
void close() {
logger.log(ChannelLogLevel.DEBUG, "Closing RLS picker");
if (fallbackChildPolicyWrapper != null) {
refCountedChildPolicyWrapperFactory.release(fallbackChildPolicyWrapper);
}
Expand Down
11 changes: 10 additions & 1 deletion rls/src/main/java/io/grpc/rls/RlsLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,14 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
logger.log(ChannelLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses);
LbPolicyConfiguration lbPolicyConfiguration =
(LbPolicyConfiguration) resolvedAddresses.getLoadBalancingPolicyConfig();
checkNotNull(lbPolicyConfiguration, "Missing rls lb config");
checkNotNull(lbPolicyConfiguration, "Missing RLS LB config");
if (!lbPolicyConfiguration.equals(this.lbPolicyConfiguration)) {
logger.log(ChannelLogLevel.DEBUG, "A new RLS LB config received");
boolean needToConnect = this.lbPolicyConfiguration == null
|| !this.lbPolicyConfiguration.getRouteLookupConfig().lookupService().equals(
lbPolicyConfiguration.getRouteLookupConfig().lookupService());
if (needToConnect) {
logger.log(ChannelLogLevel.DEBUG, "RLS lookup service changed, need to connect");
if (routeLookupClient != null) {
routeLookupClient.close();
}
Expand All @@ -78,12 +80,15 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
// not required.
this.lbPolicyConfiguration = lbPolicyConfiguration;
}
logger.log(ChannelLogLevel.DEBUG, "RLS LB accepted resolved addresses successfully");
return Status.OK;
}

/**
 * Logs the connection request and forwards it to the route lookup client, if one has been
 * created. Without a client the call only produces the first log entry.
 */
@Override
public void requestConnection() {
  logger.log(ChannelLogLevel.DEBUG, "connection requested from RLS LB");
  if (routeLookupClient == null) {
    // No client exists, so there is nothing to forward the request to.
    return;
  }
  logger.log(ChannelLogLevel.DEBUG, "requesting a connection from the routeLookupClient");
  routeLookupClient.requestConnection();
}
Expand All @@ -106,17 +111,21 @@ public String toString() {
}

if (routeLookupClient != null) {
logger.log(ChannelLogLevel.DEBUG, "closing the routeLookupClient on a name resolution error");
routeLookupClient.close();
routeLookupClient = null;
lbPolicyConfiguration = null;
}
logger.log(ChannelLogLevel.DEBUG,
"Updating balancing state to TRANSIENT_FAILURE with an error picker");
helper.updateBalancingState(ConnectivityState.TRANSIENT_FAILURE, new ErrorPicker());
}

@Override
public void shutdown() {
logger.log(ChannelLogLevel.DEBUG, "Rls lb shutdown");
if (routeLookupClient != null) {
logger.log(ChannelLogLevel.DEBUG, "closing the routeLookupClient because of RLS LB shutdown");
routeLookupClient.close();
routeLookupClient = null;
}
Expand Down