Too difficult to control how much Reactor buffers internally #3759

Open
RonBarkan opened this issue Mar 16, 2024 · 2 comments
RonBarkan commented Mar 16, 2024

Given the following toy example, and especially when converting it to real code, it is very hard to control how much Reactor buffers internally.
In practice, it will gladly buffer hundreds of thousands of elements when only 800 would be sufficient for an efficient flow. The only apparent way to control this is limitRate(); however, using it is a whack-a-mole solution that does not translate at all from the toy example to real code.
By "buffer" I mean the number of elements that have entered the pipeline but whose processing is still pending, not to be confused with deliberate buffering via .buffer(), .cache(), etc.

Note: I have posted this as a Stack Overflow question, but there were no takers.

Expected Behavior

  1. Reactor makes good use of memory for optimal buffering before/at operators and does not store too many elements in the pipeline.

  2. The developer should get close-to-optimal behavior out of the box, or should be able to easily tune the behavior to be optimal, without whack-a-mole fixes or spending too much time on research.

Actual Behavior

  1. Reactor gladly buffers hundreds of thousands of elements where buffering only 800 would have been enough.
    Not only that: over-buffering has additional implications, such as longer drain time when takeWhile() is used, or a larger number of elements that require reprocessing when a task is rerun because the previous iteration was too greedy.

  2. I'd claim that limitRate() is not sufficient and that tuning the pipeline to do the right thing is too difficult; even if I invested the time to get it right, the result is fragile and easily broken by later changes to the pipeline.
    Worse, the toy example below behaves differently from my real code: the limitRate() tuning did not carry over, leaving me puzzled about what is going on.

Steps to Reproduce

Important notes:

  1. limitRate(2) looks like it is ignored, but buffering still reaches 7,000 to 9,000 items on my machine with this test (which is close to reasonable, BUT see the next bullet).
  2. Unfortunately, using limitRate() as shown below does not carry the behavior over to my real code, which still happily buffers hundreds of thousands of elements, even though both pipelines are very similar.

What happens here is that generate() is called too eagerly while step C processes items slowly.
I am guessing that the replenish watermark for generate sits at 75% (7.5 out of 10), but the level drops below that watermark too quickly.
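
For reference, limitRate() also has a two-argument overload, limitRate(highTide, lowTide), whose second argument controls the replenish amount directly; the single-argument form replenishes at 75% of the prefetch, and as I read the javadoc, a lowTide of 0 disables that optimization and yields strict batches of highTide. A minimal sketch, separate from the reproduction test, just to illustrate the knob (the numbers are illustrative only; it uses the same imports as the test below):

  // Sketch only: strict batches of 800, no early replenish at the 75% mark.
  Flux.range(0, 2_000)
      .limitRate(800, 0)
      .flatMapSequential(i -> Flux.just(i).delayElements(Duration.ofMillis(30)), 4)
      .blockLast();

The reproduction test itself: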

  // Requires: org.junit.jupiter.api.Test, reactor.core.publisher.Flux,
  // java.time.Duration, java.util.List, java.util.function.Function,
  // java.util.stream.IntStream, java.util.concurrent.atomic.AtomicInteger,
  // java.util.concurrent.atomic.AtomicLong
  @Test
  void testTooMuchBuffering() {
    var counter = new AtomicInteger(0);
    var inA = new AtomicLong(0);
    var reqA = new AtomicLong(0);
    var inB = new AtomicLong(0);
    var reqB = new AtomicLong(0);
    var inB1 = new AtomicLong(0);
    var reqB1 = new AtomicLong(0);
    var inB2 = new AtomicLong(0);
    var reqB2 = new AtomicLong(0);
    var inB3 = new AtomicLong(0);
    var reqB3 = new AtomicLong(0);
    var inC = new AtomicLong(0);
    var reqC = new AtomicLong(0);
    var passed = new AtomicLong(0);

    Runnable snapshot = () -> {
      var a = inA.get() - inB.get();
      var b = inB.get() - inB1.get() / 1000;
      var b1 = inB1.get() - inB2.get() * 20;
      var b2 = inB2.get() - inB3.get();
      var b3 = inB3.get() - inC.get() / 20;
     // Requested/buffered (real elements buffered)
      System.out.printf(
          "A: %d/%d (%d) " +
              "B: %d/%d (%d) " +
              "B1: %d/%d " +
              "B2: %d/%d (%d) " +
              "B3: %d/%d (%d) " +
              "C: %d/%d done:%d buffered:%d\n",
      reqA.get(), a, a * 1000,
      reqB.get(), b, b * 1000,
      reqB1.get(), b1,
      reqB2.get(), b2, b2 * 20,
      reqB3.get(), b3, b3 * 20,
      reqC.get(), inC.get() - passed.get(),
      passed.get(),
      counter.get() - passed.get());
    };

    Flux.<List<Integer>>generate(
            sink -> {
              int val = counter.getAndAdd(1000);
              if (val >= 200_000) {
                counter.addAndGet(-1000);
                sink.complete();
                return;
              }
             // Reading from a "database"
              System.out.println("\nGenerating " + (val + 1000));
              snapshot.run();
              sink.next(IntStream.range(val, val + 1000).boxed().toList());
            })
        .doOnRequest(r -> reqA.addAndGet(r))
//        .limitRate(2)
        .doOnNext(i -> inA.incrementAndGet())
        .flatMap(l -> Flux.just(l), 1)  // A

        .doOnRequest(r -> reqB.addAndGet(r))
        .limitRate(2)
        .doOnNext(i -> inB.incrementAndGet())
        .flatMapIterable(Function.identity()) // B

        .doOnRequest(r -> reqB1.addAndGet(r))
        .limitRate(800)
        .doOnNext(i -> inB1.incrementAndGet())
        .buffer(20) // B1
        .doOnRequest(r -> reqB2.addAndGet(r))
        .doOnNext(i -> inB2.incrementAndGet())
        .takeWhile(l -> true) // B2
        .doOnRequest(r -> reqB3.addAndGet(r))
        .doOnNext(i -> inB3.incrementAndGet())
        .flatMapIterable(Function.identity()) // B3

        .doOnRequest(r -> {
          reqC.addAndGet(r);
          System.out.println("C Requesting: " + r);
        })
        .limitRate(800)
        .doOnNext(i -> inC.incrementAndGet())
        .flatMapSequential(i -> Flux.defer(() -> Flux.just(i).delayElements(Duration.ofMillis(30))), 4) // C
        .doOnNext(i -> {
          long val = passed.incrementAndGet();
          if ((val % 250) == 0) {
            snapshot.run();
          }
        })
        .count()
        .block();
  }

Things to try:

  1. Remove all limitRate() calls
  2. Try to keep the pipeline from buffering more than 800 elements (one tuning direction is sketched below)
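
For item 2, one direction I would try (not a verified fix; the prefetch overloads of flatMapIterable and flatMapSequential and the two-argument limitRate are existing Reactor operators, but the concrete numbers are guesses) is to shrink every prefetch in the chain, since each prefetch bounds how many upstream items that operator requests:

  // Rough in-flight budget = sum of (prefetch x elements-per-item) at each stage.
  Flux.<List<Integer>>generate(
          sink -> sink.next(IntStream.range(0, 1000).boxed().toList()))
      .limitRate(1, 0)                            // at most 1 list (1000 elements) outstanding
      .flatMapIterable(Function.identity(), 1)    // prefetch 1 upstream list at a time
      .buffer(20)
      .flatMapIterable(Function.identity(), 8)    // prefetch 8 buffers = 160 elements
      .flatMapSequential(
          i -> Flux.just(i).delayElements(Duration.ofMillis(30)),
          4,                                      // at most 4 inner sequences in flight
          1)                                      // prefetch 1 element from each inner
      .take(200)                                  // keep the sketch short-running
      .count()
      .block();

With 1000-element source lists the floor is still one list in flight, so truly staying under 800 would also require emitting smaller lists from generate().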

Your Environment

  • Reactor version(s) used: 3.5.11
  • Other relevant libraries versions (eg. netty, ...):
  • JVM version (java -version): JDK 21
  • OS and version (eg uname -a): WSL2 / Ubuntu 5.15.146.1-microsoft-standard-WSL2
RonBarkan (Author) commented:

If this is indeed a defect, it is quite severe. It could mean that Reactor cannot be used in some cases where memory consumption is important.

chemicL (Member) commented Apr 11, 2024

Hey, thanks for bringing this up. I didn't find the time to investigate deeply and can't make any prediction as to when I can do so, but I remembered potentially related discussions that might clarify things around this subject:

There might be more related concerns. Please have a look and let me know if this is along the lines of what you are concerned about.

@chemicL chemicL added the status/need-investigation and status/need-triage labels Apr 11, 2024
@chemicL chemicL added this to the 3.6.x Backlog milestone Apr 11, 2024