
Cache listeners including writes to caches deadlock when used in ForkJoinPool #1065

Closed
monitorjbl opened this issue Jul 11, 2023 · 7 comments

Comments

@monitorjbl
Contributor

When an application uses a cache listener that writes to another Caffeine cache while running from a ForkJoinPool, it will eventually deadlock 100% of the time. This is very much an edge case, but it is very difficult to diagnose when it occurs. It can also manifest if your application includes a listener that uses a library that transitively writes to a Caffeine cache.

The root cause appears to be that the EventDispatcher.awaitSynchronous() method waits on all pending Futures held in a static ThreadLocal. When used outside of a ForkJoinPool, this is fine, as the ThreadLocal isolates any calling thread from others. But when used in a ForkJoinPool, the calling thread may be reused to run the listener, which causes the listener to inadvertently block on its own Future.
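
To make the failure mode concrete, here is a minimal sketch of the pattern described above (illustrative only, not Caffeine's actual source; the names are made up): a static ThreadLocal accumulates pending listener Futures, and awaitSynchronous() joins everything registered by the current thread, so a reused ForkJoinPool thread can end up joining a Future that cannot complete until its own call returns.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class DispatcherSketch {
    // Static: shared across every dispatcher instance, keyed only by thread.
    static final ThreadLocal<List<CompletableFuture<Void>>> PENDING =
            ThreadLocal.withInitial(ArrayList::new);

    void publish(CompletableFuture<Void> listenerFuture) {
        PENDING.get().add(listenerFuture);
    }

    void awaitSynchronous() {
        // Joins every pending Future registered by the current thread. If a
        // listener running on this thread performs a nested cache write, the
        // nested dispatcher joins the outer call's Future as well and blocks.
        List<CompletableFuture<Void>> futures = PENDING.get();
        try {
            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).join();
        } finally {
            futures.clear();
        }
    }
}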

I suspect changing the EventDispatcher.pending field to be non-static will skate around this issue. Here is a sample reproduction that will eventually always fail (though it may take a few hundred iterations to do so):

import javax.cache.Cache;
import javax.cache.Caching;
import javax.cache.configuration.FactoryBuilder;
import javax.cache.configuration.MutableCacheEntryListenerConfiguration;
import javax.cache.configuration.MutableConfiguration;
import javax.cache.event.CacheEntryCreatedListener;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryEventFilter;
import javax.cache.event.CacheEntryListenerException;
import javax.cache.expiry.CreatedExpiryPolicy;
import javax.cache.integration.CacheLoader;
import javax.cache.integration.CacheLoaderException;
import java.io.Serializable;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;

import static java.util.stream.Collectors.toList;

public class Test {

    public static class FallbackCache {
        private final Cache<String, String> fallbackCache;

        public FallbackCache() {
            this.fallbackCache = Caching.getCachingProvider().getCacheManager()
                    .createCache("fallbackCache", new MutableConfiguration<>());
        }

        public String get(String key) {
            return fallbackCache.get(key);
        }

        public void put(String key, String value) {
            fallbackCache.put(key, value);
        }
    }

    public static class PrimaryCacheEntryListener<K, V> implements CacheEntryCreatedListener<K, V>, Serializable {
        FallbackCache fallbackRepo;

        public PrimaryCacheEntryListener(FallbackCache fallbackRepo) {
            this.fallbackRepo = fallbackRepo;
        }

        @Override
        public void onCreated(Iterable<CacheEntryEvent<? extends K, ? extends V>> iterable) throws CacheEntryListenerException {
            for (CacheEntryEvent<? extends K, ? extends V> cacheEntryEvent : iterable) {
                CacheEntryEvent<String, String> nextEvent = (CacheEntryEvent<String, String>) cacheEntryEvent;
                fallbackRepo.put(nextEvent.getKey(), nextEvent.getValue());
            }
        }
    }

    public static class PrimaryCacheEntryEventFilter<K, V> implements CacheEntryEventFilter<K, V>, Serializable {
        @Override
        public boolean evaluate(CacheEntryEvent cacheEntryEvent) throws CacheEntryListenerException {
            return true;
        }
    }

    public static class DummyCacheLoader implements CacheLoader<String, String> {
        private final HttpClient http = HttpClient.newHttpClient();

        @Override
        public String load(String key) throws CacheLoaderException {
            try {
                var resp = http.send(HttpRequest.newBuilder()
                        .uri(URI.create("https://google.com/"))
                        .GET()
                        .build(), HttpResponse.BodyHandlers.ofString());
                return resp.body();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        @Override
        public Map<String, String> loadAll(Iterable<? extends String> keys) throws CacheLoaderException {
            return StreamSupport.stream(keys.spliterator(), false).collect(Collectors.toMap(k -> (String) k, this::load));
        }
    }

    public static void main(String[] args) throws Exception {
        var pool = Executors.newWorkStealingPool(5);
        var fallbackCache = new FallbackCache();
        var listener = new PrimaryCacheEntryListener<>(fallbackCache);
        var filter = new PrimaryCacheEntryEventFilter<>();
        var cache = Caching.getCachingProvider().getCacheManager().createCache("primaryCache",
                new MutableConfiguration<String, String>()
                        .addCacheEntryListenerConfiguration(
                                new MutableCacheEntryListenerConfiguration<>(FactoryBuilder.factoryOf(listener),
                                        FactoryBuilder.factoryOf(filter), true, true))
                        .setReadThrough(true)
                        .setCacheLoaderFactory(new FactoryBuilder.SingletonFactory<>(new DummyCacheLoader()))
                        .setExpiryPolicyFactory(CreatedExpiryPolicy.factoryOf(javax.cache.expiry.Duration.TEN_MINUTES)));

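        // Each get() below runs on a ForkJoinPool worker thread (newWorkStealingPool);
        // the created-entry listener then writes to the fallback cache from that same
        // pool, which is the condition that eventually triggers the deadlock.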
        for (int i = 0; true; i++) {
            var futures = IntStream.range(0, 5).mapToObj(c -> pool.submit(() -> {
                cache.get("fake");
            })).collect(toList());
            for (var f : futures) {
                f.get();
            }
            System.out.println("Iteration " + i);
            cache.removeAll();
        }
    }
}
@ben-manes
Owner

Thank you for the detailed and thoughtful bug report. Your idea of using a non-static thread local sounds like the right solution. I'll try to look into this by the end of the week.

Please let me know if you need/want a release once this is fixed. Until then, you might be able to avoid enabling isSynchronous, or switch from the default executor to a cached thread pool in the CaffeineConfiguration.
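
For example, a sketch of the first workaround, reusing the listener and filter from the repro above (the variable name is illustrative; whether asynchronous delivery is acceptable depends on your consistency requirements):

var asyncListenerConfig = new MutableCacheEntryListenerConfiguration<>(
        FactoryBuilder.factoryOf(listener),   // same listener as in the repro
        FactoryBuilder.factoryOf(filter),     // same filter as in the repro
        /* isOldValueRequired */ true,
        /* isSynchronous */ false);           // don't block the publishing thread on the listener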

@ben-manes
Owner

When an application using a cache listener that writes to another... cache... it will eventually deadlock 100% of the time.

Also, fwiw, it looks like the JCache spec does warn about this in CacheEntryListener:
"A listener that mutates a cache on the CacheManager may cause a deadlock. Detection and response to deadlocks is implementation specific."

I like your solution; I'm not sure if they were implying that implementations should avoid this or simply warning of the danger of undefined behavior.

monitorjbl pushed a commit to monitorjbl/caffeine that referenced this issue Jul 11, 2023
This sidesteps an edge-case wherein a JCache listener in a primary cache
mutates a secondary Caffeine cache. While this is typically not a problem,
if the thread that mutates the primary cache is from a ForkJoinPool, the
EventDispatcher can deadlock as the calling thread may be re-used. Under
this condition, the secondary cache mutation is effectively waiting on its
own Future and will deadlock.
monitorjbl added a commit to monitorjbl/caffeine that referenced this issue Jul 11, 2023
This sidesteps an edge-case wherein a JCache listener in a primary cache
mutates a secondary Caffeine cache. While this is typically not a problem,
if the thread that mutates the primary cache is from a ForkJoinPool, the
EventDispatcher can deadlock as the calling thread may be re-used. Under
this condition, the secondary cache mutation is effectively waiting on its
own Future and will deadlock.
@monitorjbl
Contributor Author

Submitted a PR, as the fix was pretty simple. It took a while to build though; the tests in this repo are very thorough!

@ben-manes
Owner

thanks! I'd like to incorporate your test case though. You can leave that to me and I'll merge it in with your pr.

ben-manes pushed a commit that referenced this issue Jul 17, 2023
This sidesteps an edge-case wherein a JCache listener in a primary cache
mutates a secondary Caffeine cache. While this is typically not a problem,
if the thread that mutates the primary cache is from a ForkJoinPool, the
EventDispatcher can deadlock as the calling thread may be re-used. Under
this condition, the secondary cache mutation is effectively waiting on its
own Future and will deadlock.
ben-manes added a commit that referenced this issue Jul 17, 2023
The unit and integration tests verify that the event dispatcher does
not leak the pending acknowledgements across threads if the executor
decides to run the listener on the same thread as the publisher. This
occurred due to using a static ThreadLocal rather than one per cache
instance, which was a mistake here even though a static field is
typically how ThreadLocals are meant to be declared.

While crafting the tests, it was noticed that CaffeineConfiguration's
setters are not fluent like those of JCache's basic implementation,
MutableConfiguration. This was changed for consistency and convenience.
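
For readers following the thread, the direction described in that commit message looks roughly like the following sketch (only the shape of the change, not the merged code):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class EventDispatcherFixSketch {
    // Now an instance field rather than static: each cache's dispatcher tracks only
    // its own pending futures, so a reused ForkJoinPool thread no longer observes
    // another cache's in-flight listener work.
    private final ThreadLocal<List<CompletableFuture<Void>>> pending =
            ThreadLocal.withInitial(ArrayList::new);
}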
@ben-manes
Owner

Thanks again for the fix, merged with tests. That was certainly a tricky one!

Let me know if you'd like a release and I'll try to get to that soon.

@monitorjbl
Contributor Author

Thanks for merging it! A release would be nice; we've worked around the problem for the moment by not using a ForkJoinPool, but there are other places in our code that might be at risk through a client library using Caffeine without our knowledge.

@ben-manes
Owner

Released in 3.1.7
