Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fixes incorrect value/error set during tryEmitXXX call #3260

Merged
merged 1 commit into from
Nov 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Copyright (c) 2020-2022 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package reactor.core.publisher;

import org.openjdk.jcstress.annotations.Actor;
import org.openjdk.jcstress.annotations.Arbiter;
import org.openjdk.jcstress.annotations.JCStressTest;
import org.openjdk.jcstress.annotations.Outcome;
import org.openjdk.jcstress.annotations.State;
import org.openjdk.jcstress.infra.results.LLL_Result;
import org.openjdk.jcstress.infra.results.LL_Result;

import static org.openjdk.jcstress.annotations.Expect.ACCEPTABLE;

public class SinkOneStressTest {

@JCStressTest
@Outcome(id = {"foo, 1"}, expect = ACCEPTABLE, desc = "value delivered")
@State
public static class EmitValueSubscribeStressTest extends SinkOneStressTest {

final SinkOneMulticast<String> sink = new SinkOneMulticast<>();
final StressSubscriber<String> subscriber = new StressSubscriber<>();

@Actor
public void value() {
sink.tryEmitValue("foo");
sink.tryEmitEmpty();
sink.tryEmitError(new RuntimeException());
}

@Actor
public void subscriber() {
sink.asMono().subscribe(subscriber);
}

@Arbiter
public void arbiter(LL_Result r) {
r.r1 = subscriber.onNextCalls.get() == 1 ? subscriber.receivedValues.get(0) : null;
r.r2 = subscriber.onCompleteCalls.get();
}
}

@JCStressTest
@Outcome(id = {"null, 1"}, expect = ACCEPTABLE, desc = "value delivered")
@State
public static class EmitEmptySubscribeStressTest extends SinkOneStressTest {

final SinkOneMulticast<String> sink = new SinkOneMulticast<>();
final StressSubscriber<String> subscriber = new StressSubscriber<>();

@Actor
public void value() {
sink.tryEmitEmpty();
sink.tryEmitValue("foo");
sink.tryEmitError(new RuntimeException());
}

@Actor
public void subscriber() {
sink.asMono().subscribe(subscriber);
}

@Arbiter
public void arbiter(LL_Result r) {
r.r1 = subscriber.onNextCalls.get() == 1 ? subscriber.receivedValues.get(0) : null;
r.r2 = subscriber.onCompleteCalls.get();
}
}

@JCStressTest
@Outcome(id = {"null, 0, boo"}, expect = ACCEPTABLE, desc = "value delivered")
@State
public static class EmitErrorSubscribeStressTest extends SinkOneStressTest {

final SinkOneMulticast<String> sink = new SinkOneMulticast<>();
final StressSubscriber<String> subscriber = new StressSubscriber<>();

@Actor
public void value() {
sink.tryEmitError(new RuntimeException("boo"));
sink.tryEmitEmpty();
sink.tryEmitValue("foo");
}

@Actor
public void subscriber() {
sink.asMono().subscribe(subscriber);
}

@Arbiter
public void arbiter(LLL_Result r) {
r.r1 = subscriber.onNextCalls.get() == 1 ? subscriber.receivedValues.get(0) : null;
r.r2 = subscriber.onCompleteCalls.get();
r.r3 = subscriber.onErrorCalls.get() == 1 ? subscriber.error.getMessage() : null;
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2020-2022 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -37,9 +37,14 @@ class SinkEmptyMulticast<T> extends Mono<T> implements InternalEmptySink<T> {

@SuppressWarnings("rawtypes")
static final Inner[] EMPTY = new Inner[0];

@SuppressWarnings("rawtypes")
static final Inner[] TERMINATED = new Inner[0];
static final Inner[] TERMINATED_EMPTY = new Inner[0];
@SuppressWarnings("rawtypes")
static final Inner[] TERMINATED_ERROR = new Inner[0];

static final int STATE_ADDED = 0;
static final int STATE_ERROR = -1;
static final int STATE_EMPTY = -2;

@Nullable
Throwable error;
Expand All @@ -58,12 +63,23 @@ public Mono<T> asMono() {
return this;
}

boolean isTerminated(Inner<?>[] array) {
return array == TERMINATED_EMPTY || array == TERMINATED_ERROR;
}

@Override
public EmitResult tryEmitEmpty() {
Inner<?>[] array = SUBSCRIBERS.getAndSet(this, TERMINATED);
Inner<T>[] array;
for (;;) {
array = this.subscribers;

if (isTerminated(array)) {
return EmitResult.FAIL_TERMINATED;
}

if (array == TERMINATED) {
return Sinks.EmitResult.FAIL_TERMINATED;
if (SUBSCRIBERS.compareAndSet(this, array, TERMINATED_EMPTY)) {
break;
}
}

for (Inner<?> as : array) {
Expand All @@ -77,23 +93,36 @@ public EmitResult tryEmitEmpty() {
public EmitResult tryEmitError(Throwable cause) {
Objects.requireNonNull(cause, "onError cannot be null");

Inner<T>[] prevSubscribers = SUBSCRIBERS.getAndSet(this, TERMINATED);
if (prevSubscribers == TERMINATED) {
Inner<T>[] prevSubscribers = this.subscribers;

if (isTerminated(prevSubscribers)) {
return EmitResult.FAIL_TERMINATED;
}

error = cause;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can there be a race now by which two threads write the value in the following order T1: v1, T2: v2; then T1 manages to win the CAS below, while value is set to v2? The previous behaviour prevented such an interleaving as the CAS happened first, although the publication was a problem, but now I'm uncertain about consistency, can you explain whether it's impossible?

Copy link
Contributor Author

@OlegDokuka OlegDokuka Nov 3, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not the goal of this implementation to cover such cases. Usafe Sink implementation is not covering concurrent invocation of the tryEmit methods


for (;;) {
if (SUBSCRIBERS.compareAndSet(this, prevSubscribers, TERMINATED_ERROR)) {
break;
}

prevSubscribers = this.subscribers;
if (isTerminated(prevSubscribers)) {
return EmitResult.FAIL_TERMINATED;
}
}

for (Inner<T> as : prevSubscribers) {
as.error(cause);
}

return EmitResult.OK;
}

@Override
public Object scanUnsafe(Attr key) {
if (key == Attr.TERMINATED) return subscribers == TERMINATED;
if (key == Attr.ERROR) return error;
if (key == Attr.TERMINATED) return isTerminated(subscribers);
if (key == Attr.ERROR) return subscribers == TERMINATED_ERROR ? error : null;
if (key == Attr.RUN_STYLE) return Attr.RunStyle.SYNC;

return null;
Expand All @@ -104,12 +133,16 @@ public Context currentContext() {
return Operators.multiSubscribersContext(subscribers);
}

boolean add(Inner<T> ps) {
int add(Inner<T> ps) {
for (; ; ) {
Inner<T>[] a = subscribers;

if (a == TERMINATED) {
return false;
if (a == TERMINATED_EMPTY) {
return STATE_EMPTY;
}

if (a == TERMINATED_ERROR) {
return STATE_ERROR;
}

int n = a.length;
Expand All @@ -118,7 +151,7 @@ boolean add(Inner<T> ps) {
b[n] = ps;

if (SUBSCRIBERS.compareAndSet(this, a, b)) {
return true;
return STATE_ADDED;
}
}
}
Expand Down Expand Up @@ -165,20 +198,21 @@ void remove(Inner<T> ps) {
public void subscribe(final CoreSubscriber<? super T> actual) {
Inner<T> as = new VoidInner<>(actual, this);
actual.onSubscribe(as);
if (add(as)) {
final int addedState = add(as);
if (addedState == STATE_ADDED) {
if (as.isCancelled()) {
remove(as);
}
}
else {
else if (addedState == STATE_ERROR) {
Throwable ex = error;
if (ex != null) {
actual.onError(ex);
}
else {
as.complete();
}

actual.onError(ex);
}
else {
as.complete();
}

}

@Override
Expand Down