Removal listener not called #859

Closed
mario-schwede-hivemq opened this issue Jan 12, 2023 · 13 comments

@mario-schwede-hivemq

First, thank you for this really great library!

Now to my issue:

My problem is that the removal listener is sometimes not executed in low-throughput situations.
I use the most recent version, 3.1.2. As a workaround, I created a scheduled task that executes cleanUp() every second instead of setting a scheduler.

Here is a reproducer:

package caffeine;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Scheduler;
import org.junit.jupiter.api.RepeatedTest;

import java.time.Duration;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static org.junit.jupiter.api.Assertions.fail;

public class CaffeineReproducer {
    private static final Object VALUE = new Object();
    private static final int NUMBER_OF_RUNS = 100_000;
    private static final int NUMBER_OF_KEYS = 10;
    private static final long WAIT_NANOS = TimeUnit.SECONDS.toNanos(10);

    private final ExecutorService executor = Executors.newCachedThreadPool();
    private final AtomicInteger failedRun = new AtomicInteger(-1);

    @RepeatedTest(10)
    void testRunRemovalListener() throws InterruptedException {
        for (int i = 0; i < NUMBER_OF_RUNS; i++) {
            if (failedRun.get() != -1) {
                if (failedRun.get() == Integer.MIN_VALUE) {
                    fail("Interrupted");
                } else {
                    fail("Removal listener not called on run: " + failedRun.get());
                }
                return;
            }
            final int run = i + 1;
            executor.execute(() -> runTest(run));
            TimeUnit.MILLISECONDS.sleep(1);
        }
    }

    private void runTest(final int run) {
        final CopyOnWriteArrayList<Object> list = new CopyOnWriteArrayList<>();

        final Cache<Object, Object> cache = Caffeine.newBuilder()
                .scheduler(Scheduler.systemScheduler())
                .executor(executor)
                .removalListener((key, value, cause) -> list.add(value))
                .expireAfterWrite(Duration.ofMillis(5))
                .build();

        for (int i = 0; i < NUMBER_OF_KEYS; i++) {
            final int key = i;
            executor.execute(() -> cache.put(key, VALUE));
        }

        final long start = System.nanoTime();
        while (list.size() < NUMBER_OF_KEYS) {
            if ((System.nanoTime() - start) >= WAIT_NANOS) {
                failedRun.set(run);
                return;
            }
            try {
                TimeUnit.MILLISECONDS.sleep(100);
            } catch (final InterruptedException e) {
                failedRun.set(Integer.MIN_VALUE);
                return;
            }
        }
    }
}
@ben-manes
Owner

ben-manes commented Jan 12, 2023

Hi @mario-schwede-hivemq,

When I add a println to your test's removal listener, I see it being called, but the test fails claiming no calls were made.

.removalListener((key, value, cause) -> {
  System.out.printf("#%s - %s: %s=%s%n", run, cause, key, value);
  list.add(value);
})
#23 - EXPIRED: 0=java.lang.Object@6688f5b6
#31 - EXPIRED: 0=java.lang.Object@6688f5b6
#27 - EXPIRED: 0=java.lang.Object@6688f5b6
#8 - EXPIRED: 1=java.lang.Object@6688f5b6
#8 - EXPIRED: 2=java.lang.Object@6688f5b6
org.opentest4j.AssertionFailedError: Removal listener not called on run: 27
	at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38)
	at org.junit.jupiter.api.Assertions.fail(Assertions.java:135)
	at CaffeineReproducer.testRunRemovalListener(CaffeineReproducer.java:32)

@ben-manes
Owner

An issue with the test is that it does not hold a strong reference to the cache. The scheduled cleanup task holds only a weak reference to it, so that if the task outlives the cache's use it can no-op and the cache can be garbage collected. It doesn't make much sense to have a cache that the application discarded but that pending expiration keeps alive; a proper shutdown or a long-lived instance would be more reasonable application logic. So that seems to be halting the accumulation and explains part of it, at least.
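
For illustration, a minimal sketch of one way to keep the cache strongly reachable while waiting on the expiration callbacks (just an example I'm making up here, not the fix used later in this thread; it assumes Java 9+ for Reference.reachabilityFence and the class name is invented):

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Scheduler;

import java.lang.ref.Reference;
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class StrongReferenceSketch {
  public static void main(String[] args) throws InterruptedException {
    CountDownLatch latch = new CountDownLatch(1);
    Cache<Object, Object> cache = Caffeine.newBuilder()
        .expireAfterWrite(Duration.ofMillis(5))
        .scheduler(Scheduler.systemScheduler())
        .removalListener((key, value, cause) -> latch.countDown())
        .build();
    cache.put("key", "value");
    try {
      // Wait for the scheduled expiration to deliver the removal notification.
      latch.await(10, TimeUnit.SECONDS);
    } finally {
      // Keeps `cache` strongly reachable until this point, so the weakly
      // referencing cleanup task cannot observe a collected cache.
      Reference.reachabilityFence(cache);
    }
  }
}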

@mario-schwede-hivemq
Author

Thank you for your investigation, and you are right. The reproducer has the problem that the cache can be garbage collected. I fixed that issue and now the test is green.
This left me a bit confused, because in the real-life code where I observed this behavior, the cache is an instance variable and cannot be collected. And, of course, most of the time it works as expected and the removal listener is called.
I have to dig into that code again. Maybe I overlooked something.

@ben-manes
Owner

When I rewrite your test to capture all of the run state and use a CountDownLatch to track completion, I see cases where the entries are in the data map, the expiration policy's data structures are empty because they have not yet been updated, there is no scheduled future because there is nothing to expire, and the pending additions are waiting in the write buffer. Once another cache operation occurs, like a read or a write, the write buffer is flushed and everything quiesces to the desired state.

As scheduling is documented as best-effort, from that perspective this is acceptable: there will be benign orchestration races which do not result in invalid state or operations. However, if you are relying on strict timing of events for business logic and otherwise leave the cache idle, then this divergence of goals could cause you problems. In that case it might make sense to use a stricter timing subsystem rather than rely on the cache's fuzziness. There may be some tweaks that let us do a bit better, but I am not sure yet whether that would fully resolve this scenario or simply make it less likely.
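
As a purely illustrative sketch of that last suggestion (not something from this codebase; the class, method names, and the explicit invalidation are my own example), the deadline could be owned by a dedicated ScheduledExecutorService so it fires even if the cache stays idle:

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class StrictTimingSketch {
  private final ScheduledExecutorService scheduler =
      Executors.newSingleThreadScheduledExecutor();
  private final Cache<Object, Object> cache = Caffeine.newBuilder()
      .removalListener((key, value, cause) -> {
        // Time-critical logic runs here; the cause is EXPLICIT, not EXPIRED.
      })
      .build();

  public void put(Object key, Object value) {
    cache.put(key, value);
    // The deadline is enforced by the scheduler rather than by the cache's
    // best-effort expiration, so the listener is invoked on schedule.
    scheduler.schedule(() -> cache.invalidate(key), 5, TimeUnit.MILLISECONDS);
  }
}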

In a cache, every read is a write to shared state, such as reordering items on an LRU list. This is why traditional caches do not scale well: every access requires obtaining an exclusive lock to update the policy metadata. The approach that Caffeine uses is to update the ConcurrentHashMap immediately, stage these events into ring buffers, and schedule a non-blocking task to replay them onto the policy. This way we can absorb concurrent reads and writes without blocking or contention, guard the policy with a lock so it can use the best algorithms, and quiesce to the desired outcome.
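
A rough, generic sketch of that record-then-replay idea (a simplification for illustration only, not Caffeine's actual internals, which use bounded striped ring buffers and status flags):

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;

public class ReplayBufferSketch<E> {
  private final Queue<E> buffer = new ConcurrentLinkedQueue<>();
  private final ReentrantLock evictionLock = new ReentrantLock();

  // Hot path: record the event without touching the policy's lock.
  public void record(E event) {
    buffer.offer(event);
  }

  // Maintenance task: replay the buffered events onto the policy under the lock.
  public void drain(Consumer<E> policy) {
    evictionLock.lock();
    try {
      E event;
      while ((event = buffer.poll()) != null) {
        policy.accept(event);
      }
    } finally {
      evictionLock.unlock();
    }
  }
}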

This orchestration means there can be races. Here the maintenance work is running, has flushed the write buffer, and is busy expiring all of the entries that the policy is aware of. A concurrent write adds the data entry and a write event, and updates a flag, but does not schedule a new maintenance task because one is already running. When the maintenance finishes and releases the lock, there is new pending work but nothing scheduled to process it. If you use the default ForkJoinPool then we can safely schedule a new task immediately and the test passes without any hiccups. However, if you use a custom executor we might over-penalize it, e.g. one with a caller-runs policy, so we don't know whether rescheduling immediately is safe / acceptable / friendly. The cache's own logic is very fast, but when we invoke listeners we don't know the cost of that foreign user code. In your test, since the cache is then idle, nothing triggers the next maintenance run and the listener is not notified yet.

Does that make sense? Do you have any suggestions for how this could be improved, etc?

Optimistic rescheduling

/**
 * Performs the maintenance work, blocking until the lock is acquired.
 *
 * @param task an additional pending task to run, or {@code null} if not present
 */
void performCleanUp(@Nullable Runnable task) {
  evictionLock.lock();
  try {
    maintenance(task);
  } finally {
    evictionLock.unlock();
  }
  if ((drainStatusOpaque() == REQUIRED) && (executor == ForkJoinPool.commonPool())) {
    scheduleDrainBuffers();
  }
}

Test

(screenshot of the test run output omitted)
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.RepeatedTest;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Scheduler;

public class CaffeineReproducer {
  private static final Object VALUE = new Object();
  private static final int NUMBER_OF_RUNS = 100_000;
  private static final int NUMBER_OF_KEYS = 10;

  private final ExecutorService cacheExecutor = Executors.newFixedThreadPool(200);
  private final ExecutorService testExecutor = Executors.newCachedThreadPool();
  private final ConcurrentHashMap<Integer, TestRun> runs = new ConcurrentHashMap<>();

  @RepeatedTest(1)
  void testRunRemovalListener() throws InterruptedException {
    System.out.printf("%n%nSTART%n%n");

    for (int i = 1; i <= NUMBER_OF_RUNS; i++) {
      runs.put(i, runTest(i));
    }
    Thread.currentThread().setName(getClass().getSimpleName());
    for (int i = 1; i <= NUMBER_OF_RUNS; i++) {
      System.out.printf("#%s - WAIT%n", i);
      var run = runs.get(i);
      boolean finished = run.latch.await(5, TimeUnit.SECONDS);
      if (!finished) {
        System.out.printf("debug for inspection");
        run.cache.cleanUp();
      }
    }
    System.out.printf("%n%nEND%n%n");
  }

  private TestRun runTest(final int run) {
    var list = Collections.synchronizedList(new ArrayList<>());
    var latch = new CountDownLatch(NUMBER_OF_KEYS);

    Cache<Object, Object> cache = Caffeine.newBuilder()
        .removalListener((key, value, cause) -> {
          //System.out.printf("#%s - %s: %s=%s%n", run, cause, key, value);
          latch.countDown();
          list.add(value);
        })
        .expireAfterWrite(Duration.ofMillis(5))
        .scheduler(Scheduler.systemScheduler())
        .executor(cacheExecutor)
        .build();
    for (int i = 0; i < NUMBER_OF_KEYS; i++) {
      var key = i;
      testExecutor.execute(() -> cache.put(key, VALUE));
    }
    return new TestRun(cache, list, latch);
  }

  class TestRun {
    Cache<Object, Object> cache;
    List<Object> notifications;
    CountDownLatch latch;

    TestRun(Cache<Object, Object> cache, List<Object> notifications, CountDownLatch latch) {
      this.notifications = notifications;
      this.cache = cache;
      this.latch = latch;
    }
  }
}

@ben-manes
Owner

I think that the optimistic rescheduling could safely use the provided scheduler, because we know it should perform the work sometime in the future rather than simply loop back onto the calling thread immediately. The code change would be something like the following, which passes your test.

if (drainStatusOpaque() == REQUIRED) {
  if (executor == ForkJoinPool.commonPool()) {
    scheduleDrainBuffers();
    return;
  }

  var pacer = pacer();
  if ((pacer == null) || (pacer.future != null)) {
    return;
  }
  synchronized (pacer) {
    if (pacer.future == null) {
      pacer.schedule(executor, drainBuffersTask,
          expirationTicker().read(), Pacer.TOLERANCE);
    }
  }
}

@mario-schwede-hivemq
Author

Good that you found the likely cause of the problem and also provided a proposal for a fix. Your explanation and the updated test make total sense.

The commonPool is not an option in my case, because it is shared JVM-wide and I have no control over (or insight into) its other usages.

Your finding also shows me that my workaround does not work. It just changes the cleanup timing a little:
executor.scheduleWithFixedDelay(cache::cleanUp, 1, 1, TimeUnit.SECONDS)
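
Spelled out as a self-contained sketch (with illustrative names), the workaround I had in place looks roughly like this:

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;

import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class PeriodicCleanupSketch {
  private final ScheduledExecutorService executor =
      Executors.newSingleThreadScheduledExecutor();
  private final Cache<Object, Object> cache = Caffeine.newBuilder()
      .expireAfterWrite(Duration.ofMillis(5))
      .removalListener((key, value, cause) -> { /* handle removal */ })
      .build();

  public PeriodicCleanupSketch() {
    // cleanUp() drains pending maintenance work, so expired entries reach
    // the removal listener even while the cache is otherwise idle.
    executor.scheduleWithFixedDelay(cache::cleanUp, 1, 1, TimeUnit.SECONDS);
  }
}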

I thought a little about what you wrote regarding business logic in the callbacks. I totally agree that there cannot be any assumptions about when a callback will be called, but it should at least be called. This is important not only for business logic but also in other cases, such as cleaning up resources.

I also agree that the most common case is ongoing interaction with the cache, but it can always happen that, for whatever reason, the interaction stops. In this case, nothing should hang around in the cache anymore when a scheduler is configured.

I really appreciate your time on this and your detailed explanations.

@ben-manes
Owner

ben-manes commented Jan 13, 2023

Your finding also shows me that my workaround does not work. It just changes the cleanup timing a little

Your workaround is fine because it resolves the issue continuously. My fix would do the same, except that by inspecting the internals it can schedule only when necessary instead of (blindly) periodically.

I totally agree that there cannot be any assumptions about when a callback will be called, but it should at least be called.

Right, and the listener is always called; various tradeoffs just mean there is no strict guarantee about when. That can cause an unexpected delay, as in your findings, but not a missed invocation.

In this case, nothing should hang around in the cache anymore when a scheduler is configured.

I agree. Hopefully, I'll get a chance to work on the unit tests this weekend and release a fix with our proposed changes.

Thanks for the bug report, reproducer, and patience!

ben-manes added a commit that referenced this issue Jan 16, 2023
A benign race can occur when a maintenance cycle is running, writers
insert new entries, and all of the entries that the policy is aware of
have expired and are removed. In this case no cleanup is scheduled by
the expiration policy, the maintenance status is left as "required",
and the entries in the write buffer are waiting to be applied. If the
cache is idle then those entries will expire but the removal listener
will not be notified promptly. Since a configured scheduler communicates
that prompt notification is desired, the cache should use it more
aggressively to ensure this case is handled.
ben-manes added a commit that referenced this issue Jan 17, 2023
ben-manes added a commit that referenced this issue Jan 17, 2023
@ben-manes
Owner

Released in 3.1.3

@sagansfault

Will this change not be implemented for pre-v3 (Java 8) Caffeine?

@ben-manes
Owner

Correct, it’s not planned for 2.x. This is a non-critical quality-of-life improvement, and the old behavior was allowed by the API contract. Java 8 reached EOL for public security fixes, so those users are already accepting a very limited amount of support and have a high risk tolerance.

@sagansfault

Thanks for the fast reply! I apologize for reviving this old thread.
Alright. Currently I provide my LoadingCache with an Executors.newSingleThreadExecutor() in the hope that it regularly cleans up and calls the removal listener as soon as it can. This has not worked. Neither has scheduling another task, outside of that executor, to do cleanups every second, which I guess is what the other person above was doing.
I will have to find another solution.

@ben-manes
Owner

My recollection was that calling cleanUp() manually would flush the pending work and resolve the problem.

@sagansfault

I think the cleanup function is being called, but the listener is just not being notified. I will do some testing and see if I can get some properly reproducible results.
