FluxSampleTimeout Javadoc concerns #3689

Open
MikkelHJuul opened this issue Jan 9, 2024 · 3 comments
Labels
status/need-user-input This needs user input to proceed

Comments

@MikkelHJuul

First of all, the Javadoc has a FIXME, which only serves to confuse the reader.

I will answer the question in the FIXME, though: maxConcurrency is not a terrible name for the variable. Emissions from the source are added to a queue and, it seems, read from it shortly afterwards. The queue/buffer therefore guards against concurrency, mitigating data loss. (It's not clear to me why this has not been passed along on the stack instead, but that is a bigger discussion; the same goes for the ReplayBuffer interfaces between add and replay.) I think the default buffer size may even be a bit beefy (who has 32 cores to overwhelm the buffer?). In any case, the buffer size here really does concern concurrency, and nothing else.

Now onto what is actually important for me to have cleared; discard support.

The operator discards elements in the queue when downstream cancels or when upstream errors. Concurrently overwhelming the buffer results in items simply losing their reference, which could result in untimely triggering. These items are neither discarded nor dropped, and their companion Publishers are not cancelled.

But the misleading wording tells the reader that items not part of the sampling are discarded; they are not. The trigger that a client needs to implement themselves to handle "discard" is cancellation of the companion Publisher. The marble diagram hints at this, but I think the docs could be clearer here.

I can also report a possible bug: it seems to me that the companion Publisher of an element that is successfully emitted to a downstream that cancels on next (cancelOnNext) will itself be cancelled, causing the normal/successful element to appear "discarded". I will investigate/recreate later.

@reactorbot reactorbot added the ❓need-triage This issue needs triage, hasn't been looked at by a team member yet label Jan 9, 2024
@MikkelHJuul
Author

Okay, I have now done some groundwork on the operator, but I have reached a standstill.

My work is in the branch https://github.com/MikkelHJuul/reactor-core/tree/issues/3689

The implementation simply ditches the queue.

(This seems OK, since the queue was used in an unsafe way and would only really result in caching more Publishers than needed: you only need one, and can only propagate values from that one. Overflowing would also drop elements silently.)

I'm well aware that some corner cases are not covered by the implementation I link. My issue now is that I disagree with some of the tests and with how the operator works/should work.

This may obviously be something where I simply have to stay completely backwards compatible (so we cannot change this now), but here are the cases I disagree with:

  • When the source completes, the most recent value is played immediately.
    I think this behavior breaks the contract of "sampling". IMO the element, followed by the completion, should be propagated only after the last companion publisher emits, i.e. this flux should never emit the source value:
      Flux.just(5).sampleTimeout(i -> Mono.never())
    and
      Flux.just(5).sampleTimeout(i -> Mono.delay(Duration.ofDays(1)))
    should send 5 after 1 day.
  • A Subscriber that requests late from a sampled flux should have that element emitted, even if the source has already completed.
    Right now, source completion only plays out if the subscriber requests in #onSubscribe. I believe the events should play out in order on a later request as well. (This could be a bit hard to implement as is: since we essentially drop the Publisher once we have subscribed, we do not know whether the item should play (i.e. whether the publisher emitted) after request is called. I would have to reintroduce something like an index (updated on Other#next) for this test.)
    On a related note, I believe the promise that the last element is always emitted is broken in this situation, as the last element was not played.

test:

import org.junit.jupiter.api.Test;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Test
void lateSubscriber() {
    Flux<Integer> input = Flux.just(4);
    int[] discards = {-1};
    Flux<Integer> sampled = input.sampleTimeout(i -> Mono.empty()) // instant playback
                                 .doOnDiscard(Integer.class, i -> discards[0] = i);

    ZeroRequestOnSubscribeSubscriber testSub = new ZeroRequestOnSubscribeSubscriber();
    sampled.subscribe(testSub);

    assert testSub.next == 0;
    testSub.subs.request(1); // late request, after the source has completed
    assert testSub.next == 0; // value still not played
    Integer val = sampled.blockFirst();
    assert testSub.next == 0; // no value appeared here
    assert val == 4; // 4 is available if you requested eagerly
    assert discards[0] == -1; // and nothing was discarded
}

static class ZeroRequestOnSubscribeSubscriber extends BaseSubscriber<Integer> {
    Subscription subs; // kept so the test can request later
    int next;

    @Override
    protected void hookOnSubscribe(Subscription s) {
        subs = s; // deliberately does not request here
    }

    @Override
    protected void hookOnNext(Integer i) {
        next = i;
    }
}

I disagree with:

    @Test
    public void scanOther() {
        ...
        assertThat(test.scan(Scannable.Attr.PARENT)).isSameAs(main.other);
        ...
    }

This test verifies that both are null; they should be verified separately. (I break this test, since main.other is set at startup, but I also think it's incorrect to test that two nulls are the same.)

A possible way around doing too much here is to simply document this behavior in the Javadoc.

  • I think not propagating at all on a late request is fine, but it should be documented (especially since the Javadoc states otherwise: the last element is always sent).
  • I think propagation after upstream completion should be reworked to honor throttling, but it should at least be documented.

@MikkelHJuul MikkelHJuul changed the title FluxTimeout Javadoc concerns FluxSampleTimeout Javadoc concerns Jan 13, 2024
@chemicL
Member

chemicL commented Jan 25, 2024

Hi, thanks for the report and the work you did on your branch. I spent some time looking into the existing implementation and tried to frame your concerns around what I learned.
Before considering any possible alteration of the existing behaviour, let's first clarify the expectations and concerns. From what I understood, the complaints you bring are the following:

  1. The Javadoc has a "FIXME" note which is confusing.
  2. Discard support and unclear wording.
  3. Handling of cancellation of a companion - possible bug.
  4. The last value should be delivered at the end of the timeout.

Before addressing these concerns, I believe there is some misunderstanding with regards to what ends up in the queue.

Whenever a new item is emitted from the source, the implementation handling timeout companions cancels the current timeout companion and loses its reference. There is always at most one reference to a companion.

The queue is used by the companion wrapper class (SampleTimeoutOther). Upon creation, the wrapper gets a reference to the SampleTimeoutMain (the upstream subscriber), and the queue is used only when the cancellation races with the timeout signal. Therefore, maxConcurrency limits the number of simultaneous timeouts of already canceled companions plus the actual current valid one. Picking too low a value might mean a valid sample is missed on occasions when cancellations do not trigger fast enough and, at the same time, multiple stale items time out together with the valid most recent one. There is a check of whether the wrapper in the queue has the index of the last item actually provided by the source.
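
To make the index check easier to picture, here is a minimal single-threaded sketch of the idea. It is not the Reactor implementation: StaleCompanionModel and its Entry, onSourceNext, onCompanionFired and drain members are hypothetical names, the real operator is concurrent, and the assumption that a full queue simply rejects the signal is taken from the silent drops reported in this thread.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical, single-threaded model of the stale-companion check.
// This is NOT Reactor's code; all names here are made up for illustration.
final class StaleCompanionModel {

    // Stand-in for a companion wrapper: the sampled value plus the index it was created with.
    static final class Entry {
        final long index;
        final int value;

        Entry(long index, int value) {
            this.index = index;
            this.value = value;
        }
    }

    private final int capacity;                 // plays the role of maxConcurrency
    private final Queue<Entry> fired = new ArrayDeque<>();
    private long latestIndex;                   // index of the most recent source item

    final List<Integer> emitted = new ArrayList<>();
    final List<Integer> discarded = new ArrayList<>();

    StaleCompanionModel(int capacity) {
        this.capacity = capacity;
    }

    // A new source item arrives: bumping the index makes every older entry stale.
    Entry onSourceNext(int value) {
        return new Entry(++latestIndex, value);
    }

    // A companion fired. Returns false when the bounded queue is full and the signal is lost
    // (assumption: this mirrors the silent drop reported in this thread).
    boolean onCompanionFired(Entry entry) {
        if (fired.size() >= capacity) {
            return false;
        }
        return fired.offer(entry);
    }

    // Drain the queue: only the entry matching the latest index is emitted, stale ones are discarded.
    void drain() {
        Entry entry;
        while ((entry = fired.poll()) != null) {
            if (entry.index == latestIndex) {
                emitted.add(entry.value);
            } else {
                discarded.add(entry.value);
            }
        }
    }

    public static void main(String[] args) {
        StaleCompanionModel model = new StaleCompanionModel(2);
        Entry first = model.onSourceNext(1);
        Entry second = model.onSourceNext(2);
        model.onCompanionFired(first);   // stale: item 2 has already arrived
        model.onCompanionFired(second);  // current
        model.drain();
        System.out.println("emitted=" + model.emitted + " discarded=" + model.discarded);
    }
}
```

In this model, a capacity of 2 with two stale firings queued ahead of the current one means the current entry is rejected, which is the "valid sample is missed" case described above.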

Now to the actual raised points:

  1. The FIXME is meant for contributors, not users. It is a comment that is not visible when viewing the documentation for this operator, so it should not confuse users unless they dive into the code. We can certainly improve this situation; let's focus on that, with further clarification of what is expected.
  2. I don't think we have tests for this case, but I think you are correct: the discard support for items whose companions are being canceled does not seem to implement any logic for discarding the actual item. Surprisingly, the implementation for racing stale items does use the discard hook for the actual value type. Perhaps the implementation is missing that, instead of relying on the cancel hook to implement discarding, especially combined with the point below and the queue handling.
  3. I can confirm this is actually happening:
	@Test
	void deliverAndCancel() {
		Sinks.Many<Integer> source = Sinks.unsafe().many().multicast().directBestEffort();
		Sinks.Many<Integer> companion = Sinks.unsafe().many().multicast().directBestEffort();

		AssertSubscriber<Integer> ts = AssertSubscriber.create();

		AtomicBoolean canceled = new AtomicBoolean(false);

		source.asFlux()
		   .sampleTimeout(v -> companion.asFlux().doOnCancel(() -> canceled.set(true)))
		      .subscribe(ts);

		source.emitNext(1, FAIL_FAST);
		companion.emitNext(1, FAIL_FAST);
		source.emitComplete(FAIL_FAST);

		ts.assertValues(1).assertNoError().assertComplete();
		assertThat(canceled.get()).isFalse();
	}

However, it doesn't look incorrect per se; it does make sense to cancel the timeout Publisher. The assumption that the item is discarded upon cancellation might not hold true, though.

  4. I disagree here. The requirement that the value is delivered unless any other item appears within the timeout makes the timing aspect unnecessary upon source completion: we are certain no value will be delivered from the source during the timeout, so the requirement is satisfied for the delivery.
  5. Upon subscription, the source gets an unlimited demand, while the downstream might have zero demand. The upstream demand should be initiated only when downstream is able to consume anything. It also looks as if there is no backpressure support in this operator, do I get this right?
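
As a rough illustration of the completion argument (concern 4, whether the last value has to wait for its timeout), here is a toy timeline model. It assumes one fixed timeout for every item and treats an item arriving exactly when the companion fires as not superseding the previous value. SampleTimeoutTimeline and run are hypothetical names, and the sketch does not use Reactor at all.

```java
import java.util.ArrayList;
import java.util.List;

// Toy timeline model of sampleTimeout with one fixed timeout for every item.
// Hypothetical illustration only; it does not use or reproduce Reactor's code.
final class SampleTimeoutTimeline {

    // times:      ascending emission times of the source items (ms)
    // values:     the emitted values, same length as times
    // timeout:    fixed companion delay (ms)
    // completeAt: time at which the source completes (>= last emission time)
    static List<String> run(long[] times, int[] values, long timeout, long completeAt) {
        List<String> delivered = new ArrayList<>();
        for (int i = 0; i < times.length; i++) {
            long fireAt = times[i] + timeout;
            boolean isLast = i == times.length - 1;
            if (!isLast) {
                // Assumption: an item arriving before the companion fires supersedes the value.
                if (times[i + 1] >= fireAt) {
                    delivered.add("t=" + fireAt + " v=" + values[i]);
                }
            } else {
                // After completion nothing can supersede the last value,
                // so it is delivered at completion if its companion has not fired yet.
                long at = Math.min(fireAt, completeAt);
                delivered.add("t=" + at + " v=" + values[i]);
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        // Items at 0, 10 and 100 ms, timeout 50 ms, source completes at 110 ms.
        System.out.println(run(new long[] {0, 10, 100}, new int[] {1, 2, 3}, 50, 110));
    }
}
```

In this model, value 1 is superseded by value 2, value 2 is delivered when its timeout elapses at 60 ms, and value 3 is delivered at completion (110 ms) rather than at 150 ms.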

Please let me know if I got this right and we can decide what to do about these.

@chemicL chemicL added status/need-user-input This needs user input to proceed and removed ❓need-triage This issue needs triage, hasn't been looked at by a team member yet labels Jan 25, 2024
@MikkelHJuul
Author

Hi @chemicL, thanks for your help.

I'm not entirely sure what you are asking me to provide on each point, but I will try:

  1. You are right, I was just reading from the sources. Sorry, I sometimes come across too negative.
  2. I did test that if there are too many incoming items, the queue just silently drops them.
  3. I don't know what to reply here. I guess, as you note, either case is fine, whether or not it is cancelled when the source completes.
  4. I'm not that concerned about that specific situation anyway; I can see how emitting instantly on completion is fine.
  5. Yes, there is no backpressure, but there is a sort of caching.

I just noticed another thing: in the test above, I didn't explicitly state that the subscriber was not completed. I just checked, and it is not. So if you subscribe to a "completed" FluxSampleTimeout and do not request in onSubscribe, the subscription will hang indefinitely, since no signal can ever end/complete the subscription. That is why it should either emit and complete on late requests, or complete instantly without emitting.

I'm still not entirely sure the queue is needed, and I don't really understand the race you are describing (but it has been several weeks since I looked at the code).

Please let me know if there is anything else you need me to answer or discuss.
