Auto-cancelled Sink still accepts emissions #3715

Open
kaqqao opened this issue Feb 6, 2024 · 9 comments · May be fixed by #3789
Labels
help wanted, type/bug

Comments

kaqqao commented Feb 6, 2024

A sink created via Sinks.many().multicast().onBackpressureBuffer() will keep accepting emissions even after it's cancelled by autoCancel (which is implicitly enabled, and unfortunately not mentioned in the method's Javadoc), until the internal buffer is filled. Javadoc on the overload onBackpressureBuffer(int bufferSize, boolean autoCancel) mentions:

@param autoCancel should the sink fully shutdowns (not publishing anymore) when the last subscriber cancels

Expected Behavior

After the sink is shut down (cancelled), sink.tryEmitNext(...) should fail.

Actual Behavior

sink.tryEmitNext(...) succeeds and keeps filling the buffer.

Steps to Reproduce

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import org.junit.jupiter.api.Test;
import reactor.core.Disposable;
import reactor.core.publisher.Sinks;

@Test
public void cancelledSinkStillAcceptsEmissions() {
    Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();
    Disposable subscription1 = sink.asFlux().subscribe(s -> System.out.println("1: " + s));
    assertEquals(1, sink.currentSubscriberCount());
    sink.tryEmitNext("Test1");
    subscription1.dispose();
    assertEquals(0, sink.currentSubscriberCount()); // autoCancel kicks in; new subscriptions are no longer possible
    Disposable subscription2 = sink.asFlux().subscribe(s -> System.out.println("2: " + s));
    assertTrue(subscription2.isDisposed()); // auto-cancelled
    assertEquals(0, sink.currentSubscriberCount());
    // This assertion fails today: the emission is accepted and buffered
    assertTrue(sink.tryEmitNext("Test2").isFailure(), "Emissions on a cancelled sink should fail");
}

In this state, no subscriber will ever be able to subscribe to the sink and consume "Test2" or any future emissions, yet emissions are accepted and buffered.

Possible Solution

Add

if (isCancelled()) {
    return EmitResult.FAIL_CANCELLED;
}

to the beginning of tryEmitNext, tryEmitError and tryEmitComplete in SinkManyEmitterProcessor.java.
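A minimal sketch of what such a guard does, written against a hypothetical stand-alone `ToySink` rather than Reactor's `SinkManyEmitterProcessor` (whose internals are not reproduced here). The class, its `EmitResult` enum, and its `cancel()` method are illustrative assumptions; only the early `FAIL_CANCELLED` return mirrors the suggestion above.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical toy model for illustration only; NOT Reactor's implementation.
class ToySink<T> {
    enum EmitResult { OK, FAIL_OVERFLOW, FAIL_CANCELLED }

    private final Queue<T> buffer = new ArrayDeque<>();
    private final int capacity;
    private boolean cancelled;

    ToySink(int capacity) {
        this.capacity = capacity;
    }

    // Stand-in for autoCancel firing when the last subscriber cancels.
    synchronized void cancel() {
        cancelled = true;
        buffer.clear();
    }

    synchronized boolean isCancelled() {
        return cancelled;
    }

    synchronized EmitResult tryEmitNext(T value) {
        // The proposed guard: reject before touching the buffer.
        if (isCancelled()) {
            return EmitResult.FAIL_CANCELLED;
        }
        if (buffer.size() >= capacity) {
            return EmitResult.FAIL_OVERFLOW;
        }
        buffer.add(value);
        return EmitResult.OK;
    }

    synchronized int buffered() {
        return buffer.size();
    }
}
```

With this guard in place, an emission after `cancel()` is reported as `FAIL_CANCELLED` and nothing accumulates in the buffer. The toy serializes every call with `synchronized`, so it says nothing about the concurrent behaviour of the real class.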

Your Environment

  • Reactor version(s) used: 3.6.2
  • JVM version (java -version): 17, 21
reactorbot added the ❓need-triage label Feb 6, 2024

qnnn commented Feb 15, 2024

Hi, I'm lucky to come across this issue. I'm currently learning Reactor and have also encountered this problem, which has left me confused. Additionally, I have a question related to the EmitResult code, and perhaps it can be addressed together in this issue.

Is it necessary to use a new response code to represent "warm up" and distinguish whether subscribers exist when using tryEmitNext?

Issue #2338 mentions: do we really need a separate fail code (vs reusing FAIL_OVERFLOW)

The solution related to #2338 is to introduce the FAIL_ZERO_SUBSCRIBER status code. However, triggering the FAIL_ZERO_SUBSCRIBER response code requires meeting the following two conditions:

  1. The queue cache is full or there is no queue cache
  2. There are no subscribers
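The two conditions can be read as a small decision function. The sketch below is a hypothetical model of the behaviour described in this thread, not Reactor's code; `EmitDecision`, `decide`, and the `bufferHasRoom` parameter are names invented for illustration.

```java
// Hypothetical model of the described result selection; NOT Reactor's implementation.
class EmitDecision {
    enum Result { OK, FAIL_OVERFLOW, FAIL_ZERO_SUBSCRIBER }

    // bufferHasRoom is false when the queue is full or when the sink has no queue at all.
    static Result decide(boolean bufferHasRoom, int subscriberCount) {
        if (bufferHasRoom) {
            // Covers the warm-up phase too: the item is buffered and the emission succeeds.
            return Result.OK;
        }
        // No room: the result depends on whether anyone is subscribed.
        return subscriberCount == 0 ? Result.FAIL_ZERO_SUBSCRIBER : Result.FAIL_OVERFLOW;
    }
}
```

Under this reading, `decide(true, 0)` is a successful warm-up emission, while `decide(false, 0)` is the only combination that yields `FAIL_ZERO_SUBSCRIBER`.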

Perhaps adjusting or extending the meaning of the EmitResult code can help us better understand the state of Sinks.Many when using tryEmitNext. I'm not sure if the following points are reasonable:

  1. The FAIL_ZERO_SUBSCRIBER response code only indicates that there are no subscribers and does not consider whether the queue cache is full.

  2. When the queue overflows, only the FAIL_OVERFLOW status code is triggered (or a new extended status code, such as "FAIL_OVERFLOW_WITHOUT_SUBSCRIBERS," which means that there are no subscribers when FAIL_OVERFLOW occurs).

  3. A new status code can be used to represent "warm up," as mentioned in the Javadoc of Sinks.MulticastSpec#onBackpressureBuffer:

    "Without Subscriber: warm up. Remembers up to Queues.SMALL_BUFFER_SIZE elements pushed via Sinks.Many.tryEmitNext(Object) before the first Subscriber is registered."

When the "warm up" response code appears, it can indicate the possibility of FAIL_OVERFLOW risk.

chemicL (Member) commented Feb 23, 2024

@kaqqao thanks for the report! I reviewed the other Sinks implementations and do believe your expectations are backed by the fact that SinkManyUnicast and SinkManyUnicastNoBackpressure also reject emissions in case of cancellation. I'll have a look at the PR you opened as well and provide feedback.

chemicL added the type/bug label and removed the ❓need-triage label Feb 23, 2024
chemicL (Member) commented Feb 23, 2024

@qnnn glad to hear you're learning Reactor 🚀 Hope the experience is pleasant so far.

Regarding the suggestion you made:
re 1. FAIL_ZERO_SUBSCRIBER would indicate failure, while the scenario you describe in the buffering cases means the items are buffered and succeed.
re 2. the FAIL_ZERO_SUBSCRIBER in its current meaning is exactly that as far as I can tell.
re 3. the idea is not bad per se, however it's not applicable for two reasons: 1) it would be a breaking change of behaviour; 2) I don't see what value it would provide. The idea for Sinks is essentially to decouple the producer from the consumer so both can make progress. The failure events tell the producer that it needs to worry and deal with backpressure, while the warmup phase is actually a successful scenario until the buffer fills up and requires no indication that nobody is yet receiving.

If you feel this is falling short in some scenarios and there are situations that changing this behaviour would enable new flows, please open a new issue with ideas about the migration path if there's a behaviour change proposed. Let's not mix two concerns in the same issue. Thanks in advance and good luck on your learning journey!


qnnn commented Feb 25, 2024

@chemicL I really appreciate your patient response, and it has been very helpful to me. I apologize for bringing it up in this issue. Best wishes!


kimec commented Feb 27, 2024

Nice to see this discussion finally started

chemicL added the help wanted label Apr 11, 2024
chemicL (Member) commented Apr 11, 2024

This issue is still a valid one and is open for contributions. The attempt in #3725 had some concerns and was closed due to inactivity so there's a chance to build upon that feedback.


bajibalu commented Apr 17, 2024

Hi @chemicL, I am also learning Reactor and am interested in working on this issue. I saw your concern in the other PR. I am not sure whether it is possible or feasible, but would it be a good idea to somehow freeze the queue once the sink is cancelled so that it does not accept any more items?

chemicL (Member) commented Apr 17, 2024

Hey, @bajibalu! Happy to hear you're learning reactor and are interested in contributing 🎉

The queues we have do not have a concept of freezing/closing. However, there are other mechanisms to prevent inserting an item. On the other hand, it's also not an issue if the item is inserted, as long as something later notices that a cancellation has happened and takes care of clearing the queue.

After looking at the code one more time I think it is the case already that an item inserted after cancellation should be removed from the queue due to the drain operation that follows. Unfortunately, I couldn't get it to work because somehow the WIP (work-in-progress) marker is left in an unclean state. Debug the below code to see what's happening:

	SinkManyEmitterProcessor<Integer> processor = new SinkManyEmitterProcessor<>(true, 1);
	processor.tryEmitNext(1);
	processor.asFlux().doOnNext(i -> System.out.println("Received " + i)).subscribe().dispose();

	processor.tryEmitNext(2);

	System.out.println(processor.queue.size());

If nothing was emitted before, then no queue is created:

	SinkManyEmitterProcessor<Integer> processor = new SinkManyEmitterProcessor<>(true, 1);

	processor.asFlux().subscribe().dispose();

	processor.tryEmitNext(1);

	System.out.println(processor.queue == null);

Solving this issue would be the first step to a comprehensive solution.

One potential issue is that in case of an emit/cancel race we wouldn't communicate the cancellation to the caller if the cancellation happens after the checks for cancellation but before the item is inserted into the queue. But perhaps that's not a big deal as long as something discards the item from the queue (so fixing the above nuance is necessary).

What I would love to see in a PR is:

  • unit tests validating the above scenarios provided as code samples
  • a JCStress test that validates racing cancellation with tryEmitNext:
      • an allowed test outcome is an OK result for emission
      • an allowed test outcome is a CANCELLED result
      • in both cases the arbiter validates the queue to check it is indeed empty
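As a rough illustration of what such a stress test would check, here is a plain-Java approximation that does not use JCStress and does not touch Reactor's classes. `RaceSink` and `RaceHarness` are hypothetical: the sink checks for cancellation, offers the item, then runs a WIP-guarded drain that clears the queue once cancellation is visible. Both emission results are accepted; the only thing counted as a violation is a non-empty queue after both actors have finished.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical toy sink; NOT Reactor's SinkManyEmitterProcessor.
class RaceSink {
    enum EmitResult { OK, FAIL_CANCELLED }

    final Queue<Integer> queue = new ConcurrentLinkedQueue<>();
    private final AtomicInteger wip = new AtomicInteger();
    private volatile boolean cancelled;

    EmitResult tryEmitNext(int value) {
        if (cancelled) {
            return EmitResult.FAIL_CANCELLED;
        }
        // A cancellation may slip in right here; the drain below cleans up after it.
        queue.offer(value);
        drain();
        return EmitResult.OK;
    }

    void cancel() {
        cancelled = true;
        drain();
    }

    private void drain() {
        // Only one thread drains at a time; others just record that work was missed.
        if (wip.getAndIncrement() != 0) {
            return;
        }
        int missed = 1;
        do {
            if (cancelled) {
                queue.clear();
            }
            missed = wip.addAndGet(-missed);
        } while (missed != 0);
    }
}

class RaceHarness {
    // Returns how many runs left an item in the queue after emit and cancel both finished.
    static int run(int iterations) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        int violations = 0;
        try {
            for (int i = 0; i < iterations; i++) {
                RaceSink sink = new RaceSink();
                CyclicBarrier start = new CyclicBarrier(2);
                Future<RaceSink.EmitResult> emit = pool.submit(() -> {
                    start.await();
                    return sink.tryEmitNext(1);
                });
                Future<Object> cancel = pool.submit(() -> {
                    start.await();
                    sink.cancel();
                    return null;
                });
                emit.get();   // OK and FAIL_CANCELLED are both acceptable outcomes
                cancel.get();
                if (!sink.queue.isEmpty()) {
                    violations++;  // plays the role of the arbiter's queue check
                }
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return violations;
    }

    public static void main(String[] args) {
        System.out.println("violations: " + run(10_000));
    }
}
```

A real JCStress test would express the two threads as actors and the final queue check as the arbiter, and would explore interleavings far more aggressively than this loop can.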

@bajibalu in case you'd like to work on this, just leave a note here so if anyone else interested in this wants to take over we can re-assign in case you don't find the time. Thanks!

@bajibalu bajibalu linked a pull request Apr 18, 2024 that will close this issue
@bajibalu

Hi @chemicL I pushed a PR to fix this issue. Please take a look at this PR and let me know if this works.

6 participants