Skip to content

Commit

Permalink
Do not expire in-flight async loads (fixes #625)
Browse files Browse the repository at this point in the history
Backport of 7d0d903
  • Loading branch information
ben-manes committed Dec 1, 2021
1 parent 7bcfe81 commit 336ef93
Show file tree
Hide file tree
Showing 3 changed files with 245 additions and 2 deletions.
Expand Up @@ -889,6 +889,9 @@ private long getExpirationDelay(long now) {
/** Returns if the entry has expired. */
@SuppressWarnings("ShortCircuitBoolean")
boolean hasExpired(Node<K, V> node, long now) {
if (isComputingAsync(node)) {
return false;
}
return (expiresAfterAccess() && (now - node.getAccessTime() >= expiresAfterAccessNanos()))
| (expiresAfterWrite() && (now - node.getWriteTime() >= expiresAfterWriteNanos()))
| (expiresVariable() && (now - node.getVariableTime() >= 0));
Expand Down
Expand Up @@ -37,6 +37,7 @@
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
Expand Down Expand Up @@ -629,7 +630,22 @@ public void refresh_writerFails(LoadingCache<Integer, Integer> cache, CacheConte
assertThat(cache.asMap(), equalTo(context.original()));
}

/* --------------- AsyncLoadingCache --------------- */
/* --------------- AsyncCache --------------- */

@Test(dataProvider = "caches")
@CacheSpec(population = Population.EMPTY, expiryTime = Expire.ONE_MINUTE,
mustExpireWithAnyOf = { AFTER_ACCESS, AFTER_WRITE, VARIABLE },
expiry = { CacheExpiry.DISABLED, CacheExpiry.CREATE, CacheExpiry.WRITE, CacheExpiry.ACCESS },
expireAfterAccess = {Expire.DISABLED, Expire.ONE_MINUTE},
expireAfterWrite = {Expire.DISABLED, Expire.ONE_MINUTE})
public void getIfPresent_inFlight(AsyncCache<Integer, Integer> cache, CacheContext context) {
CompletableFuture<Integer> future = new CompletableFuture<>();
cache.put(context.absentKey(), future);
assertThat(cache.getIfPresent(context.absentKey()), is(sameInstance(future)));
context.ticker().advance(5, TimeUnit.MINUTES);
assertThat(cache.getIfPresent(context.absentKey()), is(sameInstance(future)));
future.complete(null);
}

@Test(dataProvider = "caches")
@CacheSpec(population = Population.FULL, loader = Loader.IDENTITY,
Expand Down Expand Up @@ -695,6 +711,21 @@ public void get_async(AsyncLoadingCache<Integer, Integer> cache, CacheContext co
verifyWriter(context, verifier -> verifier.deletions(1, RemovalCause.EXPIRED));
}

@Test(dataProvider = "caches")
@CacheSpec(population = Population.EMPTY, expiryTime = Expire.ONE_MINUTE,
mustExpireWithAnyOf = { AFTER_ACCESS, AFTER_WRITE, VARIABLE },
expiry = { CacheExpiry.DISABLED, CacheExpiry.CREATE, CacheExpiry.WRITE, CacheExpiry.ACCESS },
expireAfterAccess = {Expire.DISABLED, Expire.ONE_MINUTE},
expireAfterWrite = {Expire.DISABLED, Expire.ONE_MINUTE})
public void get_inFlight(AsyncCache<Integer, Integer> cache, CacheContext context) {
CompletableFuture<Integer> future = new CompletableFuture<>();
cache.put(context.absentKey(), future);
assertThat(cache.get(context.absentKey(), k -> k), is(sameInstance(future)));
context.ticker().advance(5, TimeUnit.MINUTES);
assertThat(cache.get(context.absentKey(), k -> k), is(sameInstance(future)));
future.complete(null);
}

@Test(dataProvider = "caches")
@CacheSpec(population = Population.SINGLETON, removalListener = Listener.CONSUMING,
mustExpireWithAnyOf = { AFTER_ACCESS, AFTER_WRITE, VARIABLE },
Expand Down Expand Up @@ -815,6 +846,23 @@ public void containsKey(Map<Integer, Integer> map, CacheContext context) {
assertThat(map.containsKey(context.firstKey()), is(false));
}

@Test(dataProvider = "caches")
@CacheSpec(population = Population.EMPTY, expiryTime = Expire.ONE_MINUTE,
mustExpireWithAnyOf = { AFTER_ACCESS, AFTER_WRITE, VARIABLE },
expiry = { CacheExpiry.DISABLED, CacheExpiry.CREATE, CacheExpiry.WRITE, CacheExpiry.ACCESS },
expireAfterAccess = {Expire.DISABLED, Expire.ONE_MINUTE},
expireAfterWrite = {Expire.DISABLED, Expire.ONE_MINUTE})
public void containsKey_inFlight(AsyncCache<Integer, Integer> cache, CacheContext context) {
CompletableFuture<Integer> future = new CompletableFuture<>();
cache.put(context.absentKey(), future);
assertThat(cache.asMap().containsKey(context.absentKey()), is(true));
assertThat(cache.synchronous().asMap().containsKey(context.absentKey()), is(true));
context.ticker().advance(5, TimeUnit.MINUTES);
assertThat(cache.asMap().containsKey(context.absentKey()), is(true));
assertThat(cache.synchronous().asMap().containsKey(context.absentKey()), is(true));
future.complete(null);
}

@Test(dataProvider = "caches")
@CacheSpec(population = Population.FULL, expiryTime = Expire.ONE_MINUTE,
mustExpireWithAnyOf = { AFTER_ACCESS, AFTER_WRITE, VARIABLE },
Expand All @@ -826,6 +874,21 @@ public void containsValue(Map<Integer, Integer> map, CacheContext context) {
assertThat(map.containsValue(context.original().get(context.firstKey())), is(false));
}

@Test(dataProvider = "caches")
@CacheSpec(population = Population.EMPTY, expiryTime = Expire.ONE_MINUTE,
mustExpireWithAnyOf = { AFTER_ACCESS, AFTER_WRITE, VARIABLE },
expiry = { CacheExpiry.DISABLED, CacheExpiry.CREATE, CacheExpiry.WRITE, CacheExpiry.ACCESS },
expireAfterAccess = {Expire.DISABLED, Expire.ONE_MINUTE},
expireAfterWrite = {Expire.DISABLED, Expire.ONE_MINUTE})
public void containsValue_inFlight(AsyncCache<Integer, Integer> cache, CacheContext context) {
CompletableFuture<Integer> future = new CompletableFuture<>();
cache.put(context.absentKey(), future);
assertThat(cache.asMap().containsValue(future), is(true));
context.ticker().advance(5, TimeUnit.MINUTES);
assertThat(cache.asMap().containsValue(future), is(true));
future.complete(null);
}

@Test(dataProvider = "caches")
@CacheSpec(population = Population.FULL, expiryTime = Expire.ONE_MINUTE,
mustExpireWithAnyOf = { AFTER_ACCESS, AFTER_WRITE, VARIABLE },
Expand Down Expand Up @@ -942,6 +1005,23 @@ public void put_writerFails(Map<Integer, Integer> map, CacheContext context) {
}
}

@Test(dataProvider = "caches")
@CacheSpec(population = Population.EMPTY, expiryTime = Expire.ONE_MINUTE,
mustExpireWithAnyOf = { AFTER_ACCESS, AFTER_WRITE, VARIABLE },
expiry = { CacheExpiry.DISABLED, CacheExpiry.CREATE, CacheExpiry.WRITE, CacheExpiry.ACCESS },
expireAfterAccess = {Expire.DISABLED, Expire.ONE_MINUTE},
expireAfterWrite = {Expire.DISABLED, Expire.ONE_MINUTE})
public void put_inFlight(AsyncCache<Integer, Integer> cache, CacheContext context) {
CompletableFuture<Integer> f1 = new CompletableFuture<>();
CompletableFuture<Integer> f2 = new CompletableFuture<>();
CompletableFuture<Integer> f3 = new CompletableFuture<>();
cache.put(context.absentKey(), f1);
assertThat(cache.asMap().put(context.absentKey(), f2), is(sameInstance(f1)));
context.ticker().advance(5, TimeUnit.MINUTES);
assertThat(cache.asMap().put(context.absentKey(), f3), is(sameInstance(f2)));
f3.complete(null);
}

@Test(dataProvider = "caches")
@CacheSpec(population = Population.FULL, expiryTime = Expire.ONE_MINUTE,
mustExpireWithAnyOf = { AFTER_ACCESS, AFTER_WRITE, VARIABLE },
Expand Down Expand Up @@ -978,6 +1058,23 @@ public void replace_updated(Map<Integer, Integer> map, CacheContext context) {
verifyWriter(context, verifier -> verifier.deletions(count));
}

@Test(dataProvider = "caches")
@CacheSpec(population = Population.EMPTY, expiryTime = Expire.ONE_MINUTE,
mustExpireWithAnyOf = { AFTER_ACCESS, AFTER_WRITE, VARIABLE },
expiry = { CacheExpiry.DISABLED, CacheExpiry.CREATE, CacheExpiry.WRITE, CacheExpiry.ACCESS },
expireAfterAccess = {Expire.DISABLED, Expire.ONE_MINUTE},
expireAfterWrite = {Expire.DISABLED, Expire.ONE_MINUTE})
public void replace_inFlight(AsyncCache<Integer, Integer> cache, CacheContext context) {
CompletableFuture<Integer> f1 = new CompletableFuture<>();
CompletableFuture<Integer> f2 = new CompletableFuture<>();
CompletableFuture<Integer> f3 = new CompletableFuture<>();
cache.put(context.absentKey(), f1);
assertThat(cache.asMap().replace(context.absentKey(), f2), is(sameInstance(f1)));
context.ticker().advance(5, TimeUnit.MINUTES);
assertThat(cache.asMap().replace(context.absentKey(), f3), is(sameInstance(f2)));
f3.complete(null);
}

// replace_writerFail: Not needed due to exiting without side-effects

@Test(dataProvider = "caches")
Expand Down Expand Up @@ -1018,6 +1115,24 @@ public void replaceConditionally_updated(Map<Integer, Integer> map, CacheContext
verifyWriter(context, verifier -> verifier.deletions(count));
}

@Test(dataProvider = "caches")
@CacheSpec(population = Population.EMPTY, expiryTime = Expire.ONE_MINUTE,
mustExpireWithAnyOf = { AFTER_ACCESS, AFTER_WRITE, VARIABLE },
expiry = { CacheExpiry.DISABLED, CacheExpiry.CREATE, CacheExpiry.WRITE, CacheExpiry.ACCESS },
expireAfterAccess = {Expire.DISABLED, Expire.ONE_MINUTE},
expireAfterWrite = {Expire.DISABLED, Expire.ONE_MINUTE})
public void replaceConditionally_inFlight(
AsyncCache<Integer, Integer> cache, CacheContext context) {
CompletableFuture<Integer> f1 = new CompletableFuture<>();
CompletableFuture<Integer> f2 = new CompletableFuture<>();
CompletableFuture<Integer> f3 = new CompletableFuture<>();
cache.put(context.absentKey(), f1);
assertThat(cache.asMap().replace(context.absentKey(), f1, f2), is(true));
context.ticker().advance(5, TimeUnit.MINUTES);
assertThat(cache.asMap().replace(context.absentKey(), f2, f3), is(true));
f3.complete(null);
}

// replaceConditionally_writerFail: Not needed due to exiting without side-effects

@Test(dataProvider = "caches")
Expand Down Expand Up @@ -1054,6 +1169,41 @@ public void remove_writerFails(Map<Integer, Integer> map, CacheContext context)
}
}

@Test(dataProvider = "caches")
@CacheSpec(population = Population.EMPTY, expiryTime = Expire.ONE_MINUTE,
mustExpireWithAnyOf = { AFTER_ACCESS, AFTER_WRITE, VARIABLE },
expiry = { CacheExpiry.DISABLED, CacheExpiry.CREATE, CacheExpiry.WRITE, CacheExpiry.ACCESS },
expireAfterAccess = {Expire.DISABLED, Expire.ONE_MINUTE},
expireAfterWrite = {Expire.DISABLED, Expire.ONE_MINUTE})
public void remove_inFlight(AsyncCache<Integer, Integer> cache, CacheContext context) {
CompletableFuture<Integer> f1 = new CompletableFuture<>();
cache.put(context.absentKey(), f1);
assertThat(cache.asMap().remove(context.absentKey()), is(sameInstance(f1)));

CompletableFuture<Integer> f2 = new CompletableFuture<>();
cache.put(context.absentKey(), f2);
context.ticker().advance(5, TimeUnit.MINUTES);
assertThat(cache.asMap().remove(context.absentKey()), is(sameInstance(f2)));
}

@Test(dataProvider = "caches")
@CacheSpec(population = Population.EMPTY, expiryTime = Expire.ONE_MINUTE,
mustExpireWithAnyOf = { AFTER_ACCESS, AFTER_WRITE, VARIABLE },
expiry = { CacheExpiry.DISABLED, CacheExpiry.CREATE, CacheExpiry.WRITE, CacheExpiry.ACCESS },
expireAfterAccess = {Expire.DISABLED, Expire.ONE_MINUTE},
expireAfterWrite = {Expire.DISABLED, Expire.ONE_MINUTE})
public void removeConditionally_inFlight(
AsyncCache<Integer, Integer> cache, CacheContext context) {
CompletableFuture<Integer> f1 = new CompletableFuture<>();
cache.put(context.absentKey(), f1);
assertThat(cache.asMap().remove(context.absentKey(), f1), is(true));

CompletableFuture<Integer> f2 = new CompletableFuture<>();
cache.put(context.absentKey(), f2);
context.ticker().advance(5, TimeUnit.MINUTES);
assertThat(cache.asMap().remove(context.absentKey(), f2), is(true));
}

@Test(dataProvider = "caches")
@CacheSpec(population = Population.FULL, expiryTime = Expire.ONE_MINUTE,
mustExpireWithAnyOf = { AFTER_ACCESS, AFTER_WRITE, VARIABLE },
Expand Down Expand Up @@ -1144,6 +1294,23 @@ public void computeIfAbsent_writerFails(Map<Integer, Integer> map, CacheContext
}
}

@Test(dataProvider = "caches")
@CacheSpec(population = Population.EMPTY, expiryTime = Expire.ONE_MINUTE,
mustExpireWithAnyOf = { AFTER_ACCESS, AFTER_WRITE, VARIABLE },
expiry = { CacheExpiry.DISABLED, CacheExpiry.CREATE, CacheExpiry.WRITE, CacheExpiry.ACCESS },
expireAfterAccess = {Expire.DISABLED, Expire.ONE_MINUTE},
expireAfterWrite = {Expire.DISABLED, Expire.ONE_MINUTE})
public void computeIfAbsent_inFlight(AsyncCache<Integer, Integer> cache, CacheContext context) {
CompletableFuture<Integer> f1 = new CompletableFuture<>();
cache.put(context.absentKey(), f1);
assertThat(cache.asMap().computeIfAbsent(
context.absentKey(), key -> null), is(sameInstance(f1)));
context.ticker().advance(5, TimeUnit.MINUTES);
assertThat(cache.asMap().computeIfAbsent(
context.absentKey(), key -> null), is(sameInstance(f1)));
f1.complete(null);
}

@Test(dataProvider = "caches")
@CacheSpec(population = Population.FULL, expiryTime = Expire.ONE_MINUTE,
mustExpireWithAnyOf = { AFTER_ACCESS, AFTER_WRITE, VARIABLE },
Expand Down Expand Up @@ -1205,6 +1372,30 @@ public void computeIfPresent_writerFails(Map<Integer, Integer> map, CacheContext
}
}

@Test(dataProvider = "caches")
@CacheSpec(population = Population.EMPTY, expiryTime = Expire.ONE_MINUTE,
mustExpireWithAnyOf = { AFTER_ACCESS, AFTER_WRITE, VARIABLE },
expiry = { CacheExpiry.DISABLED, CacheExpiry.CREATE, CacheExpiry.WRITE, CacheExpiry.ACCESS },
expireAfterAccess = {Expire.DISABLED, Expire.ONE_MINUTE},
expireAfterWrite = {Expire.DISABLED, Expire.ONE_MINUTE})
public void computeIfPresent_inFlight(AsyncCache<Integer, Integer> cache, CacheContext context) {
CompletableFuture<Integer> f1 = new CompletableFuture<>();
CompletableFuture<Integer> f2 = new CompletableFuture<>();
cache.put(context.absentKey(), f1);
cache.asMap().computeIfPresent(context.absentKey(), (k, f) -> {
assertThat(f, is(sameInstance(f1)));
return f2;
});

CompletableFuture<Integer> f3 = new CompletableFuture<>();
context.ticker().advance(5, TimeUnit.MINUTES);
cache.asMap().computeIfPresent(context.absentKey(), (k, f) -> {
assertThat(f, is(sameInstance(f2)));
return f3;
});
f3.complete(null);
}

@Test(dataProvider = "caches")
@CacheSpec(population = Population.FULL, expiryTime = Expire.ONE_MINUTE,
mustExpireWithAnyOf = { AFTER_ACCESS, AFTER_WRITE, VARIABLE },
Expand Down Expand Up @@ -1263,6 +1454,30 @@ public void compute_writerFails(Map<Integer, Integer> map, CacheContext context)
}
}

@Test(dataProvider = "caches")
@CacheSpec(population = Population.EMPTY, expiryTime = Expire.ONE_MINUTE,
mustExpireWithAnyOf = { AFTER_ACCESS, AFTER_WRITE, VARIABLE },
expiry = { CacheExpiry.DISABLED, CacheExpiry.CREATE, CacheExpiry.WRITE, CacheExpiry.ACCESS },
expireAfterAccess = {Expire.DISABLED, Expire.ONE_MINUTE},
expireAfterWrite = {Expire.DISABLED, Expire.ONE_MINUTE})
public void compute_inFlight(AsyncCache<Integer, Integer> cache, CacheContext context) {
CompletableFuture<Integer> f1 = new CompletableFuture<>();
CompletableFuture<Integer> f2 = new CompletableFuture<>();
cache.put(context.absentKey(), f1);
cache.asMap().compute(context.absentKey(), (k, f) -> {
assertThat(f, is(sameInstance(f1)));
return f2;
});

CompletableFuture<Integer> f3 = new CompletableFuture<>();
context.ticker().advance(5, TimeUnit.MINUTES);
cache.asMap().compute(context.absentKey(), (k, f) -> {
assertThat(f, is(sameInstance(f2)));
return f3;
});
f3.complete(null);
}

@Test(dataProvider = "caches")
@CacheSpec(population = Population.FULL, expiryTime = Expire.ONE_MINUTE,
mustExpireWithAnyOf = { AFTER_ACCESS, AFTER_WRITE, VARIABLE },
Expand Down Expand Up @@ -1334,6 +1549,28 @@ public void iterators(Map<Integer, Integer> map, CacheContext context) {
assertThat(Iterators.size(map.entrySet().iterator()), is(0));
}

@Test(dataProvider = "caches")
@CacheSpec(population = Population.EMPTY, expiryTime = Expire.ONE_MINUTE,
mustExpireWithAnyOf = { AFTER_ACCESS, AFTER_WRITE, VARIABLE },
expiry = { CacheExpiry.DISABLED, CacheExpiry.CREATE, CacheExpiry.WRITE, CacheExpiry.ACCESS },
expireAfterAccess = {Expire.DISABLED, Expire.ONE_MINUTE},
expireAfterWrite = {Expire.DISABLED, Expire.ONE_MINUTE})
public void iterator_inFlight(AsyncCache<Integer, Integer> cache, CacheContext context) {
CompletableFuture<Integer> future = new CompletableFuture<>();
cache.put(context.absentKey(), future);
assertThat(cache.asMap().keySet().contains(context.absentKey()), is(true));
assertThat(cache.asMap().values().contains(future), is(true));
assertThat(cache.asMap().entrySet().contains(
Maps.immutableEntry(context.absentKey(), future)), is(true));

context.ticker().advance(5, TimeUnit.MINUTES);
assertThat(cache.asMap().keySet().contains(context.absentKey()), is(true));
assertThat(cache.asMap().values().contains(future), is(true));
assertThat(cache.asMap().entrySet().contains(
Maps.immutableEntry(context.absentKey(), future)), is(true));
future.complete(null);
}

/* --------------- Weights --------------- */

@Test(dataProvider = "caches")
Expand Down
Expand Up @@ -15,6 +15,7 @@
*/
package com.github.benmanes.caffeine.cache.issues;

import static com.github.benmanes.caffeine.testing.Awaits.await;
import static com.github.benmanes.caffeine.testing.IsFutureValue.futureOf;
import static java.time.ZoneOffset.UTC;
import static org.hamcrest.MatcherAssert.assertThat;
Expand Down Expand Up @@ -125,7 +126,9 @@ private void firstUpdate(AsyncLoadingCache<String, String> cache,

Thread.sleep(TTL + EPSILON); // sleep until expiration
assertThat("now serve first updated value", cache.get(A_KEY), is(futureOf(A_UPDATE_1)));
assertThat("now serve first updated value", cache.get(B_KEY), is(futureOf(B_UPDATE_1)));
await().untilAsserted(() -> {
assertThat("now serve first updated value", cache.get(B_KEY), is(futureOf(B_UPDATE_1)));
});
}

private void secondUpdate(AsyncLoadingCache<String, String> cache,
Expand Down

0 comments on commit 336ef93

Please sign in to comment.