Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fixes incorrect value/error set during tryEmitXXX call #3260

Merged
merged 1 commit into from Nov 3, 2022

Conversation

OlegDokuka
Copy link
Contributor

closes #3257

Signed-off-by: Oleh Dokuka odokuka@vmware.com
Signed-off-by: OlegDokuka odokuka@vmware.com

Signed-off-by: Oleh Dokuka <odokuka@vmware.com>
Signed-off-by: OlegDokuka <odokuka@vmware.com>
@OlegDokuka OlegDokuka requested a review from a team as a code owner October 30, 2022 11:55
@OlegDokuka OlegDokuka added the type/bug A general bug label Oct 30, 2022
}
return EmitResult.OK;
}
this.value = value;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More of a question for my own understanding: Does the stashing of this value for future subscribers rely on the fact that the 'safe' entry point to this sink will be wrapped in a serialized sink? In other words, is it OK for the below test to fail?

@RepeatedTest(5)
void test() throws InterruptedException {
  final SinkOneMulticast < String > sink = new SinkOneMulticast < > ();
  final AtomicReference < String > successfullyEmittedElement = new AtomicReference < > ();
  final AtomicReference < String > firstSubscriptionOnNext = new AtomicReference < > ();
  final CountDownLatch firstSubscriptionLatch = new CountDownLatch(1);

  // first subscription before element is emitted through sink
  sink.asMono()
    .subscribeOn(Schedulers.boundedElastic())
    .subscribe(a -> {
      firstSubscriptionOnNext.compareAndSet(null, a);
      firstSubscriptionLatch.countDown();
    });

  // two competing tryEmitValue calls -- I assume this is where I break the spec?
  RaceTestUtils.race(
    () -> {
      if (sink.tryEmitValue("foo").isSuccess()) {
        if (!successfullyEmittedElement.compareAndSet(null, "foo")) {
          fail("Expected only a single successful emit");
        }
      }
    },
    () -> {
      if (sink.tryEmitValue("bar").isSuccess()) {
        if (!successfullyEmittedElement.compareAndSet(null, "bar")) {
          fail("Expected only a single successful emit");
        }
      }
    }
  );

  firstSubscriptionLatch.await();
  assertThat(firstSubscriptionOnNext.get()).isEqualTo(successfullyEmittedElement.get());

  // second subscription *after* sink is 'terminated'
  final String secondSubscriptionOnNext = sink.asMono().block();
  assertThat(secondSubscriptionOnNext).isEqualTo(successfullyEmittedElement.get());

}

I assume it is my responsibility in this case to not break the spec by racing two tryEmitValue calls, but just want to double check.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Absolutely correct. It is only possible to get there if you use Sinks.unsafe() spec which implies more advanced capabilities but unsafe

@OlegDokuka OlegDokuka changed the title fixes incorrect values/error set fixes incorrect value/error set during tryEmitXXX call Nov 3, 2022
@OlegDokuka OlegDokuka merged commit 849be9a into 3.4.x Nov 3, 2022
@OlegDokuka OlegDokuka deleted the bugfix/3257-sink branch November 3, 2022 09:14
@reactorbot
Copy link

@OlegDokuka this PR seems to have been merged on a maintenance branch, please ensure the change is merge-forwarded to intermediate maintenance branches and up to main 🙇

OlegDokuka added a commit that referenced this pull request Nov 3, 2022
Signed-off-by: Oleh Dokuka <odokuka@vmware.com>
Signed-off-by: OlegDokuka <odokuka@vmware.com>
if (prevSubscribers == TERMINATED) {
Inner<T>[] prevSubscribers = this.subscribers;

if (isTerminated(prevSubscribers)) {
return EmitResult.FAIL_TERMINATED;
}

error = cause;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can there be a race now by which two threads write the value in the following order T1: v1, T2: v2; then T1 manages to win the CAS below, while value is set to v2? The previous behaviour prevented such an interleaving as the CAS happened first, although the publication was a problem, but now I'm uncertain about consistency, can you explain whether it's impossible?

Copy link
Contributor Author

@OlegDokuka OlegDokuka Nov 3, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not the goal of this implementation to cover such cases. Usafe Sink implementation is not covering concurrent invocation of the tryEmit methods

@simonbasle simonbasle added this to the 3.4.25 milestone Nov 3, 2022
@simonbasle simonbasle linked an issue Nov 3, 2022 that may be closed by this pull request
@simonbasle simonbasle linked an issue Nov 17, 2022 that may be closed by this pull request
chemicL pushed a commit that referenced this pull request Mar 7, 2023
This commit ensures non-volatile write is available via the happens-before rule. 
The fix assumes serial invocation of all the `tryEmitXXX` methods and does not guarantee correct behavior if this requirement is violated (the safe impl is wrapped by SerialSink and the real impl is available only via unsafe API)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
type/bug A general bug
Projects
None yet
Development

Successfully merging this pull request may close these issues.

A Mono created from a Sinks.one() sporadically turns empty
5 participants