Make Mono.cache() forget its source when it terminates #3598

Open
yamass opened this issue Oct 2, 2023 · 6 comments
Labels
status/need-investigation This needs more in-depth investigation

Comments


yamass commented Oct 2, 2023

In our application, we use a chain of Monos, each of which represents the fact that the previous work has been completed. Every time some new work is done, the "previous work completed" Mono is replaced by a new one using then().cache().

// Assumes each unit of work is a Mono<Void>; `var` would infer Mono<Object> here.
Mono<Void> previousWorkCompletedMono = Mono.empty();
...
previousWorkCompletedMono = previousWorkCompletedMono
        .then(someMonoRepresentingWorkToBeDone)
        .cache();

However, doing this often enough will result in an OutOfMemoryError.

Example (Please run with -Xmx100m):

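import org.junit.jupiter.api.Test; // assuming JUnit 5
import reactor.core.publisher.Mono;

class CacheMemoryTest {

	@Test
	void name() {
		Mono<Integer> mono = Mono.empty();

		for (int i = 0; i < 1_000_000; i++) {
			// Each iteration wraps the previous cached Mono in a new then().cache() pair.
			mono = mono.then(Mono.just(i)).cache();
			mono.block();
		}

		System.out.println("Done");
	}
}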

This is due to the fact that the MonoCacheTime does not unset the reference to its source after it emits a terminal signal. Therefore, the whole chain of operators is kept in memory.
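
The retention described above can be illustrated with a minimal plain-Java model. This is only a sketch and not Reactor's code: `Stage` is a hypothetical stand-in for a caching operator that keeps its `source` field after it has produced a value, and `build` and `chainLength` are helpers invented for this illustration.

```java
import java.util.function.Supplier;

class RetainedChainSketch {

    /** Hypothetical stand-in for a caching operator; not Reactor's MonoCacheTime. */
    static final class Stage implements Supplier<Integer> {
        final Stage source;   // upstream reference, never cleared
        final int value;
        Integer cached;       // result remembered after the first get()

        Stage(Stage source, int value) {
            this.source = source;
            this.value = value;
        }

        @Override
        public Integer get() {
            if (cached == null) {
                if (source != null) {
                    source.get(); // wait for the "previous work", similar in spirit to then()
                }
                cached = value;
            }
            return cached;
        }
    }

    /** Builds a chain the same way the loop in the issue does: wrap, then resolve. */
    static Stage build(int stages) {
        Stage head = null;
        for (int i = 0; i < stages; i++) {
            head = new Stage(head, i);
            head.get(); // resolve immediately, similar in spirit to block()
        }
        return head;
    }

    /** Counts how many stages are still reachable from the head. */
    static int chainLength(Stage head) {
        int n = 0;
        for (Stage s = head; s != null; s = s.source) {
            n++;
        }
        return n;
    }

    public static void main(String[] args) {
        System.out.println(chainLength(build(1_000))); // prints 1000
    }
}
```

Although only the head is ever used, every earlier stage stays reachable through the `source` fields, so memory grows with the number of iterations.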

Note that chaining using map() instead of then() also might make sense for similar cases, when the new value depends on the old one.

Expected Behavior

MonoCacheTime should unset its reference to its source as soon as that source emits a terminal signal.

Actual Behavior

See example above.

Possible Solution

Make MonoCacheTime unset its reference to its source as soon as its source emits a terminal signal.
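
The idea can be sketched in plain Java as follows. This is a simplified model under stated assumptions, not a patch for MonoCacheTime: `DetachingStage` and `hasSource` are hypothetical names, and the model only covers a successful value, whereas a real operator would also have to handle error and empty completion as well as concurrent subscribers.

```java
import java.util.function.Supplier;

class DetachOnTerminateSketch {

    /** Hypothetical caching stage that drops its upstream once it has a result. */
    static final class DetachingStage implements Supplier<Integer> {
        private Supplier<Integer> source; // cleared after the upstream has terminated
        private final int value;
        private Integer cached;

        DetachingStage(Supplier<Integer> source, int value) {
            this.source = source;
            this.value = value;
        }

        @Override
        public synchronized Integer get() {
            if (cached == null) {
                if (source != null) {
                    source.get();
                    source = null; // forget the upstream so it can be garbage collected
                }
                cached = value;
            }
            return cached;
        }

        boolean hasSource() {
            return source != null;
        }
    }

    /** Same wrap-then-resolve loop as in the issue, using the detaching stage. */
    static DetachingStage build(int stages) {
        DetachingStage head = null;
        for (int i = 0; i < stages; i++) {
            head = new DetachingStage(head, i);
            head.get();
        }
        return head;
    }

    public static void main(String[] args) {
        DetachingStage head = build(1_000);
        System.out.println(head.hasSource()); // prints false
    }
}
```

With the upstream reference cleared, only the most recent stage is reachable from the head, regardless of how many iterations ran.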

Your Environment

reactor-core-3.4.24

reactorbot added the ❓need-triage label on Oct 2, 2023
OlegDokuka (Contributor) commented

@yamass how about onTerminateDetach, which does exactly that: it forgets the source on termination?

OlegDokuka added the for/stackoverflow label and removed the ❓need-triage label on Oct 4, 2023

yamass commented Oct 4, 2023

@OlegDokuka: Doesn't help.

	@Test
	void testCache() {
		Mono<Integer> mono = Mono.empty();

		for (int i = 0; i < 1_000_000; i++) {
			mono = mono.then(Mono.just(i))
					.onTerminateDetach()
					.cache();
			mono.block();
		}

		System.out.println("Done");
	}

This still runs into an OutOfMemoryError (with -Xmx100m).


kimec commented Oct 4, 2023

I fail to see how this is a reactor-core library problem. You are essentially building a singly linked list-like data structure while only being interested in its head, which you access directly via a pointer. In my view, this is a conscious memory leak on the programmer's part.

Why don't you do:

    @Test
    public void name() {
        Mono<Integer> mono = Mono.empty();

        for (int i = 0; i < 5_000_000; i++) {
            final Sinks.One<Integer> sink = Sinks.one();
            final Mono<Integer> hotMono = (i % 2 == 0) ? Mono.empty() : Mono.just(i);

            mono.then(hotMono).subscribe(sink::tryEmitValue, sink::tryEmitError, sink::tryEmitEmpty);

            mono = sink.asMono();
            System.out.println(mono.block());
        }

        System.out.println("Done");
    }

Or

    @Test
    public void name() {
        Mono<Integer> mono = Mono.empty();

        for (int i = 0; i < 5_000_000; i++) {
            final Sinks.One<Integer> sink = Sinks.one();
            final Mono<Integer> hotMono = (i % 2 == 0) ? Mono.empty() : Mono.just(i);

            mono.then(hotMono).subscribe(new HandcraftedSubscriber<>(sink));

            mono = sink.asMono();
            System.out.println(mono.block());
        }

        System.out.println("Done");
    }

    private static class HandcraftedSubscriber<T> extends BaseSubscriber<T> {

        final Sinks.One<T> sink;

        HandcraftedSubscriber(Sinks.One<T> sink) {
            this.sink = sink;
        }

        @Override
        protected void hookOnNext(T value) {
            sink.tryEmitValue(value);
        }

        @Override
        protected void hookOnError(Throwable throwable) {
            sink.tryEmitError(throwable);
        }

        @Override
        protected void hookOnComplete() {
            sink.tryEmitEmpty();
        }

    }

Sinks.one() has the same semantics you get from cache() (replay of the emission for late subscribers). Also, since emissions cannot fail on Sinks.one() in these examples, there is no need to check the emission result.
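
The hand-off idea behind these workarounds has a rough plain-Java analogue in `java.util.concurrent.CompletableFuture`, which is also a write-once holder that replays its result to late consumers. The sketch below is only an analogy and contains no Reactor code; `ReplayHolderSketch` and `run` are hypothetical names. Each new future is completed from a callback on the previous one, so the new holder keeps no reference back to its predecessor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class ReplayHolderSketch {

    /** Chains `steps` units of work while keeping only the latest holder. */
    static CompletableFuture<Integer> run(int steps) {
        CompletableFuture<Integer> previous = CompletableFuture.completedFuture(-1);
        for (int i = 0; i < steps; i++) {
            final int result = i;
            CompletableFuture<Integer> next = new CompletableFuture<>();
            // Push the outcome into a fresh holder instead of deriving a new stage from `previous`.
            previous.whenComplete((ignored, error) -> {
                if (error != null) {
                    next.completeExceptionally(error);
                } else {
                    next.complete(result);
                }
            });
            previous = next;
        }
        return previous;
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> last = run(1_000);
        List<Integer> seen = new ArrayList<>();
        // Both consumers register after completion and still receive the value.
        last.thenAccept(seen::add);
        last.thenAccept(seen::add);
        System.out.println(seen); // prints [999, 999]
    }
}
```

The design choice mirrors the sink-based examples: the result is pushed into a standalone holder, so the holder does not depend on the chain that produced it.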


yamass commented Oct 4, 2023

@kimec

Yes, I am essentially building up a linked list. Didn't I essentially say that myself? The reason why I regard this as problematic behavior of the library is because it is unnecessary. The library could (and in my opinion should) cap references to previous (terminated) monos, especially in the case of cache.

I clearly described why we need this. I also clearly described how the cache() operator could cap its reference to its upstream source, which is what I would have expected in the first place. And finally, I made clear that the onTerminateDetach() operator does not seem to solve the problem, contrary to what one would expect.

It's all about being able to express solutions to problems in a way that is idiomatic to Reactor. Your workarounds look anything but idiomatic, imho.

By the way, we have obviously also come up with a workaround (since we needed to), but it is not very idiomatic either.


kimec commented Oct 4, 2023

@yamass

Your workarounds look anything but idiomatic, imho.

You are correct. It's probably because it would never occur to me that modifying a reactive chain of Monos the way you do should release any references to their parents.
Aren't you trying to build an infinite sequence by chaining Monos instead of using, you know, an idiomatic infinite sequence like Flux?


yamass commented Oct 4, 2023

@kimec The answer is no.

@OlegDokuka Could you please look into why it does not work with onTerminateDetach()?

OlegDokuka added the status/need-investigation label and removed the for/stackoverflow label on Nov 2, 2023