Skip to content

Commit

Permalink
Shutdown the executor and wait for in-flight loads in JCache close
Browse files Browse the repository at this point in the history
This matches the RI's behavior which is best effort.
  • Loading branch information
ben-manes committed Dec 1, 2021
1 parent 58bfc14 commit 7bcfe81
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,13 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Supplier;
Expand Down Expand Up @@ -85,6 +90,7 @@ public class CacheProxy<K, V> implements Cache<K, V> {
final String name;

protected final Optional<CacheLoader<K, V>> cacheLoader;
protected final Set<CompletableFuture<?>> inFlight;
protected final JCacheStatisticsMXBean statistics;
protected final EventDispatcher<K, V> dispatcher;
protected final ExpiryPolicy expiry;
Expand Down Expand Up @@ -117,6 +123,7 @@ public CacheProxy(String name, Executor executor, CacheManager cacheManager,
? configuration.getCacheWriter()
: DisabledCacheWriter.get();
cacheMXBean = new JCacheMXBean(this);
inFlight = ConcurrentHashMap.newKeySet();
}

@Override
Expand Down Expand Up @@ -237,6 +244,7 @@ protected Map<K, Expirable<V>> getAndFilterExpiredEntries(
}

@Override
@SuppressWarnings({"CatchingUnchecked", "FutureReturnValueIgnored"})
public void loadAll(Set<? extends K> keys, boolean replaceExistingValues,
CompletionListener completionListener) {
requireNotClosed();
Expand All @@ -249,7 +257,8 @@ public void loadAll(Set<? extends K> keys, boolean replaceExistingValues,
listener.onCompletion();
return;
}
executor.execute(() -> {

CompletableFuture<?> future = CompletableFuture.runAsync(() -> {
try {
if (replaceExistingValues) {
loadAllAndReplaceExisting(keys);
Expand All @@ -264,7 +273,10 @@ public void loadAll(Set<? extends K> keys, boolean replaceExistingValues,
} finally {
dispatcher.ignoreSynchronous();
}
});
}, executor);

inFlight.add(future);
future.whenComplete((r, e) -> inFlight.remove(future));
}

/** Performs the bulk load where the existing entries are replace. */
Expand Down Expand Up @@ -892,7 +904,7 @@ public void close() {
cacheManager.destroyCache(name);
closed = true;

Throwable thrown = null;
Throwable thrown = shutdownExecutor();
thrown = tryClose(expiry, thrown);
thrown = tryClose(writer, thrown);
thrown = tryClose(cacheLoader.orElse(null), thrown);
Expand All @@ -907,6 +919,24 @@ public void close() {
cache.invalidateAll();
}

@SuppressWarnings("FutureReturnValueIgnored")
private @Nullable Throwable shutdownExecutor() {
Throwable thrown = null;
if (executor instanceof ExecutorService) {
ExecutorService es = (ExecutorService) executor;
es.shutdown();
}
try {
CompletableFuture
.allOf(inFlight.toArray(new CompletableFuture[0]))
.get(10, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
thrown = e;
}
inFlight.clear();
return thrown;
}

/**
* Attempts to close the resource. If an error occurs and an outermost exception is set, then adds
* the error to the suppression list.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -150,6 +151,7 @@ private Map<K, V> getAll(Set<? extends K> keys, boolean updateAccessTime) {
}

@Override
@SuppressWarnings({"CatchingUnchecked", "FutureReturnValueIgnored"})
public void loadAll(Set<? extends K> keys, boolean replaceExistingValues,
CompletionListener completionListener) {
requireNotClosed();
Expand All @@ -158,7 +160,7 @@ public void loadAll(Set<? extends K> keys, boolean replaceExistingValues,
? NullCompletionListener.INSTANCE
: completionListener;

executor.execute(() -> {
CompletableFuture<?> future = CompletableFuture.runAsync(() -> {
try {
if (replaceExistingValues) {
int[] ignored = { 0 };
Expand All @@ -176,6 +178,9 @@ public void loadAll(Set<? extends K> keys, boolean replaceExistingValues,
} finally {
dispatcher.ignoreSynchronous();
}
});
}, executor);

inFlight.add(future);
future.whenComplete((r, e) -> inFlight.remove(future));
}
}

0 comments on commit 7bcfe81

Please sign in to comment.