-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fixes incorrect value/error set during tryEmitXXX
call
#3260
Conversation
Signed-off-by: Oleh Dokuka <odokuka@vmware.com> Signed-off-by: OlegDokuka <odokuka@vmware.com>
} | ||
return EmitResult.OK; | ||
} | ||
this.value = value; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
More of a question for my own understanding: Does the stashing of this value for future subscribers rely on the fact that the 'safe' entry point to this sink will be wrapped in a serialized sink? In other words, is it OK for the below test to fail?
@RepeatedTest(5)
void test() throws InterruptedException {
final SinkOneMulticast < String > sink = new SinkOneMulticast < > ();
final AtomicReference < String > successfullyEmittedElement = new AtomicReference < > ();
final AtomicReference < String > firstSubscriptionOnNext = new AtomicReference < > ();
final CountDownLatch firstSubscriptionLatch = new CountDownLatch(1);
// first subscription before element is emitted through sink
sink.asMono()
.subscribeOn(Schedulers.boundedElastic())
.subscribe(a -> {
firstSubscriptionOnNext.compareAndSet(null, a);
firstSubscriptionLatch.countDown();
});
// two competing tryEmitValue calls -- I assume this is where I break the spec?
RaceTestUtils.race(
() -> {
if (sink.tryEmitValue("foo").isSuccess()) {
if (!successfullyEmittedElement.compareAndSet(null, "foo")) {
fail("Expected only a single successful emit");
}
}
},
() -> {
if (sink.tryEmitValue("bar").isSuccess()) {
if (!successfullyEmittedElement.compareAndSet(null, "bar")) {
fail("Expected only a single successful emit");
}
}
}
);
firstSubscriptionLatch.await();
assertThat(firstSubscriptionOnNext.get()).isEqualTo(successfullyEmittedElement.get());
// second subscription *after* sink is 'terminated'
final String secondSubscriptionOnNext = sink.asMono().block();
assertThat(secondSubscriptionOnNext).isEqualTo(successfullyEmittedElement.get());
}
I assume it is my responsibility in this case to not break the spec by racing two tryEmitValue
calls, but just want to double check.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Absolutely correct. It is only possible to get there if you use Sinks.unsafe() spec which implies more advanced capabilities but unsafe
tryEmitXXX
call
@OlegDokuka this PR seems to have been merged on a maintenance branch, please ensure the change is merge-forwarded to intermediate maintenance branches and up to |
Signed-off-by: Oleh Dokuka <odokuka@vmware.com> Signed-off-by: OlegDokuka <odokuka@vmware.com>
if (prevSubscribers == TERMINATED) { | ||
Inner<T>[] prevSubscribers = this.subscribers; | ||
|
||
if (isTerminated(prevSubscribers)) { | ||
return EmitResult.FAIL_TERMINATED; | ||
} | ||
|
||
error = cause; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can there be a race now by which two threads write the value in the following order T1: v1, T2: v2; then T1 manages to win the CAS below, while value is set to v2? The previous behaviour prevented such an interleaving as the CAS happened first, although the publication was a problem, but now I'm uncertain about consistency, can you explain whether it's impossible?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not the goal of this implementation to cover such cases. Usafe Sink implementation is not covering concurrent invocation of the tryEmit methods
This commit ensures non-volatile write is available via the happens-before rule. The fix assumes serial invocation of all the `tryEmitXXX` methods and does not guarantee correct behavior if this requirement is violated (the safe impl is wrapped by SerialSink and the real impl is available only via unsafe API)
closes #3257
Signed-off-by: Oleh Dokuka odokuka@vmware.com
Signed-off-by: OlegDokuka odokuka@vmware.com