
Multiple in-flight requests for one key #625

Closed
pertu opened this issue Nov 17, 2021 · 9 comments


pertu commented Nov 17, 2021

Demo project: https://github.com/pertu/caffeine-async-loading-in-flight-ignored (mvn test should work).

Test from the demo project (simplified):

AsyncCacheLoader<Integer, Integer> loader =
      /* the future never completes = always "in flight" */
      (key, ignoredExecutor) -> new CompletableFuture<>();

/* I want this cache to reuse simultaneous/in-flight requests
   but never actually cache results */
var cacheInFlight = Caffeine.newBuilder()
      .expireAfterWrite(Duration.ZERO)
      // duration > 1ms makes the test pass:
      //.expireAfterWrite(Duration.ofMillis(1))
      .buildAsync(loader);

var future1 = cacheInFlight.get(1);
// this sleep() also makes the test pass:
//Thread.sleep(1);
var future2 = cacheInFlight.get(1);

// I expected these futures to be the same one future
// but this fails (sometimes):
assertSame(future1, future2);

See also: #297
Edit: tested versions 3.0.4 and 2.9.2; the result is the same.
Edit 2: the test above is timing-dependent and doesn't always fail. The actual test in the demo project runs the code above in a loop and usually fails on the second iteration on my machine.

ben-manes (Owner) commented

I won't get a chance to look at this for a few hours, but fwiw you can use Caffeine.ticker(Ticker) to control time in tests, e.g. by using Guava's FakeTicker. That might help you debug it.
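For illustration, a minimal sketch of that tip applied to the reproducer above (a sketch, assuming guava-testlib's com.google.common.testing.FakeTicker; the class and method names here are just for the sketch). Caffeine's Ticker is a functional interface, so a method reference to FakeTicker.read works:

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import com.github.benmanes.caffeine.cache.AsyncCacheLoader;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.testing.FakeTicker;

class FakeTickerSketch {
  void demo() {
    AsyncCacheLoader<Integer, Integer> loader =
        (key, executor) -> new CompletableFuture<>(); // never completes
    var ticker = new FakeTicker();
    var cache = Caffeine.newBuilder()
        .expireAfterWrite(Duration.ofMillis(1))
        .ticker(ticker::read) // deterministic time source instead of System.nanoTime()
        .buildAsync(loader);

    var future1 = cache.get(1);
    ticker.advance(2, TimeUnit.MILLISECONDS); // move past the expiry window without Thread.sleep()
    var future2 = cache.get(1);
  }
}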


ben-manes commented Nov 17, 2021

The mistake is that the expiration check is performed prior to the async check, so it does not handle the case where the entry is still in-flight. I think this occurs in other read paths too, like getIfPresent, as all of them check for async only with respect to setting the new timestamps.

public @Nullable V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction,
    boolean recordStats, boolean recordLoad) {
  requireNonNull(key);
  requireNonNull(mappingFunction);
  long now = expirationTicker().read();

  // An optimistic fast path to avoid unnecessary locking
  Node<K, V> node = data.get(nodeFactory.newLookupKey(key));
  if (node != null) {
    V value = node.getValue();
    if ((value != null) && !hasExpired(node, now)) {
      if (!isComputingAsync(node)) {
        tryExpireAfterRead(node, key, value, expiry(), now);
        setAccessTime(node, now);
      }
      afterRead(node, now, /* recordHit */ recordStats);
      return value;
    }
  }
  if (recordStats) {
    mappingFunction = statsAware(mappingFunction, recordLoad);
  }
  Object keyRef = nodeFactory.newReferenceKey(key, keyReferenceQueue());
  return doComputeIfAbsent(key, keyRef, mappingFunction, new long[] { now }, recordStats);
}

Previously hasExpired used to check for an in-flight entry, but that seems to have been lost in v2.6.2 when fixing an overflow scenario. The rationale might have been that if the loading time exceeds the duration then it might be stale, but I am not certain without reading the ticket and code changes more thoroughly. Offhand I'd agree with you, and the previous logic did this, so we'll need to understand whether this is a regression or expected behavior.

2.6.1

/** Returns if the entry has expired. */
boolean hasExpired(Node<K, V> node, long now) {
  if (isComputingAsync(node)) {
    return false;
  }
  return (expiresAfterAccess() && (now - node.getAccessTime() >= expiresAfterAccessNanos()))
      || (expiresAfterWrite() && (now - node.getWriteTime() >= expiresAfterWriteNanos()))
      || (expiresVariable() && (now - node.getVariableTime() >= 0));
}

3.0.4

/** Returns if the entry has expired. */
@SuppressWarnings("ShortCircuitBoolean")
boolean hasExpired(Node<K, V> node, long now) {
  return (expiresAfterAccess() && (now - node.getAccessTime() >= expiresAfterAccessNanos()))
      | (expiresAfterWrite() && (now - node.getWriteTime() >= expiresAfterWriteNanos()))
      | (expiresVariable() && (now - node.getVariableTime() >= 0));
}
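Merging the v2.6.1 guard back into the v3.0.4 method would look roughly like this (an illustration based on the two snippets above, not the actual patch):

/** Returns if the entry has expired. */
@SuppressWarnings("ShortCircuitBoolean")
boolean hasExpired(Node<K, V> node, long now) {
  if (isComputingAsync(node)) {
    return false; // an in-flight async load is never treated as expired
  }
  return (expiresAfterAccess() && (now - node.getAccessTime() >= expiresAfterAccessNanos()))
      | (expiresAfterWrite() && (now - node.getWriteTime() >= expiresAfterWriteNanos()))
      | (expiresVariable() && (now - node.getVariableTime() >= 0));
}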

ben-manes (Owner) commented

I believe this was accidentally removed when debugging to simulate the overflow issue in #217. The commit's description, changes, and discussions don't appear relevant to this snippet. The test suite passes when it is restored.

I will add the appropriate tests to catch this from regressing in the future. I hope to work on it this weekend and provide you a release by Monday for both 2.x and 3.x. In the meantime, or if that is delayed, you can use the v3.0.4-SNAPSHOT jar on Sonatype's repository or jitpack the v3.dev branch. Thanks for reporting this issue and providing a test case.


pertu commented Nov 18, 2021

It's a pleasure to report issues when they are answered. Thanks for the quick response and analysis.
I guess there is no urgency, as this issue has been out there for quite some time and no one reported it.


koldat commented Nov 19, 2021

I am not sure if it is related to this problem, but what is happening to me is that sometimes my loader is never called again, because the Node still thinks it is running. The debugger says the future is complete (it also returns immediately when fetched from the cache), as does the job status of my async loader. But the Node in the cache still has its writeTime set to a large value (200 years ahead, com.github.benmanes.caffeine.cache.Async.ASYNC_EXPIRY), which prevents expiration. From what I understand, that far-future time is a marker that the task is still running.

Do you think it is the same issue?

ben-manes (Owner) commented

That would indicate that the callback is never invoked when the future completes. In this issue the future was prematurely replaced with a new execution, but when that replacement completed its handler should successfully update the writeTime. The handler calls cache().replace(key, castedFuture, castedFuture); in the code below to update the metadata. If you can provide a reproducer then I can investigate that in a new issue.

@SuppressWarnings("FutureReturnValueIgnored")
default void handleCompletion(K key, CompletableFuture<? extends V> valueFuture,
    long startTime, boolean recordMiss) {
  var completed = new AtomicBoolean();
  valueFuture.whenComplete((value, error) -> {
    if (!completed.compareAndSet(false, true)) {
      // Ignore multiple invocations due to ForkJoinPool retrying on delays
      return;
    }
    long loadTime = cache().statsTicker().read() - startTime;
    if (value == null) {
      if (error != null) {
        logger.log(Level.WARNING, "Exception thrown during asynchronous load", error);
      }
      cache().remove(key, valueFuture);
      cache().statsCounter().recordLoadFailure(loadTime);
    } else {
      @SuppressWarnings("unchecked")
      var castedFuture = (CompletableFuture<V>) valueFuture;

      // update the weight and expiration timestamps
      cache().replace(key, castedFuture, castedFuture);
      cache().statsCounter().recordLoadSuccess(loadTime);
    }
    if (recordMiss) {
      cache().statsCounter().recordMisses(1);
    }
  });
}


koldat commented Nov 19, 2021

OK, I have found the problem. It actually works fine, but the thread that loads asynchronously is then used to "complete" the future and notify its dependents. In my case I convert the future to a Mono in Reactor, and it steals that thread to execute requests. These go back into the cache again and cycle (I do a retry there). As a result the work actually runs, but in the wrong place, and the thread cannot get back. Caffeine then still thinks the future is working on the result even though it has finished and is still in the notification phase. Sorry for the confusion.
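For anyone bridging cache futures into Reactor the same way, one possible workaround is to hop off the completing thread before doing further work. A minimal sketch, assuming a reactor-core dependency (MonoBridge and toMono are hypothetical names for illustration):

import java.util.concurrent.CompletableFuture;

import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

class MonoBridge {
  /** Wraps a cache future so downstream operators do not run on the completing thread. */
  static <T> Mono<T> toMono(CompletableFuture<T> future) {
    return Mono.fromFuture(future)
        // publishOn moves downstream work to a scheduler; otherwise it may
        // execute on the thread that completed the future, while Caffeine's
        // own whenComplete handler has not yet finished
        .publishOn(Schedulers.boundedElastic());
  }
}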

ben-manes added commits that referenced this issue Nov 21, 2021
This restores the behavior that was accidentally removed in v2.6.2 and
adds unit tests to assert it going forward.
ben-manes (Owner) commented

I did not get to work on this today; the intent was to backport to v2 and release. I will see what I can do over the week, but given the holidays and an ongoing investigation into the root cause of f143764, the goal for a release is delayed until next weekend. The jitpack or snapshot jars pass our CI testing and should be stable if you'd prefer something earlier.

@koldat it sounds like you ran into the surprising fact that CompletableFuture runs dependent actions in LIFO order for efficiency, whereas one might naturally expect FIFO execution. Unfortunately we cannot work around this quirk without worse effects. If we instead returned the whenComplete future, then the cache would return a different instance than was obtained by the loader and stored, but only on the creating call. If we added a whenComplete variant within the computation, to at least return what is stored, then a direct execution could fail due to a recursive write to ConcurrentHashMap. Any caller that depended on the source future instance could then not write concurrent logic using AsyncCache.asMap().replace(key, oldValue, newValue) or call a mutation method like future.cancel(). Therefore the best approach is to be consistent and treat the unspecified ordering of dependent actions as something that we cannot resolve.
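A tiny standalone demonstration of that behavior (the JDK leaves the ordering of dependent actions unspecified, but in practice they are pushed onto a stack, so this usually prints in reverse registration order):

import java.util.concurrent.CompletableFuture;

public class DependentActionOrder {
  public static void main(String[] args) {
    var future = new CompletableFuture<String>();
    future.whenComplete((value, error) -> System.out.println("registered first"));
    future.whenComplete((value, error) -> System.out.println("registered second"));
    future.complete("done"); // typically prints "registered second" then "registered first"
  }
}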

ben-manes added a commit that referenced this issue Dec 1, 2021
@ben-manes
Copy link
Owner

Sorry that this took longer than expected. This is released in v2.9.3 and v3.0.5.
