Skip to content

Commit

Permalink
fixes incorrect value/error set during tryEmitXXX call(#3260)
Browse files Browse the repository at this point in the history
This commit ensures non-volatile write is available via the happens-before rule. 
The fix assumes serial invocation of all the `tryEmitXXX` methods and does not guarantee correct behavior if this requirement is violated (the safe impl is wrapped by SerialSink and the real impl is available only via unsafe API)
  • Loading branch information
OlegDokuka committed Nov 3, 2022
1 parent b0cc7ca commit 849be9a
Show file tree
Hide file tree
Showing 3 changed files with 227 additions and 65 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Copyright (c) 2020-2022 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package reactor.core.publisher;

import org.openjdk.jcstress.annotations.Actor;
import org.openjdk.jcstress.annotations.Arbiter;
import org.openjdk.jcstress.annotations.JCStressTest;
import org.openjdk.jcstress.annotations.Outcome;
import org.openjdk.jcstress.annotations.State;
import org.openjdk.jcstress.infra.results.LLL_Result;
import org.openjdk.jcstress.infra.results.LL_Result;

import static org.openjdk.jcstress.annotations.Expect.ACCEPTABLE;

public class SinkOneStressTest {

@JCStressTest
@Outcome(id = {"foo, 1"}, expect = ACCEPTABLE, desc = "value delivered")
@State
public static class EmitValueSubscribeStressTest extends SinkOneStressTest {

final SinkOneMulticast<String> sink = new SinkOneMulticast<>();
final StressSubscriber<String> subscriber = new StressSubscriber<>();

@Actor
public void value() {
sink.tryEmitValue("foo");
sink.tryEmitEmpty();
sink.tryEmitError(new RuntimeException());
}

@Actor
public void subscriber() {
sink.asMono().subscribe(subscriber);
}

@Arbiter
public void arbiter(LL_Result r) {
r.r1 = subscriber.onNextCalls.get() == 1 ? subscriber.receivedValues.get(0) : null;
r.r2 = subscriber.onCompleteCalls.get();
}
}

@JCStressTest
@Outcome(id = {"null, 1"}, expect = ACCEPTABLE, desc = "value delivered")
@State
public static class EmitEmptySubscribeStressTest extends SinkOneStressTest {

final SinkOneMulticast<String> sink = new SinkOneMulticast<>();
final StressSubscriber<String> subscriber = new StressSubscriber<>();

@Actor
public void value() {
sink.tryEmitEmpty();
sink.tryEmitValue("foo");
sink.tryEmitError(new RuntimeException());
}

@Actor
public void subscriber() {
sink.asMono().subscribe(subscriber);
}

@Arbiter
public void arbiter(LL_Result r) {
r.r1 = subscriber.onNextCalls.get() == 1 ? subscriber.receivedValues.get(0) : null;
r.r2 = subscriber.onCompleteCalls.get();
}
}

@JCStressTest
@Outcome(id = {"null, 0, boo"}, expect = ACCEPTABLE, desc = "value delivered")
@State
public static class EmitErrorSubscribeStressTest extends SinkOneStressTest {

final SinkOneMulticast<String> sink = new SinkOneMulticast<>();
final StressSubscriber<String> subscriber = new StressSubscriber<>();

@Actor
public void value() {
sink.tryEmitError(new RuntimeException("boo"));
sink.tryEmitEmpty();
sink.tryEmitValue("foo");
}

@Actor
public void subscriber() {
sink.asMono().subscribe(subscriber);
}

@Arbiter
public void arbiter(LLL_Result r) {
r.r1 = subscriber.onNextCalls.get() == 1 ? subscriber.receivedValues.get(0) : null;
r.r2 = subscriber.onCompleteCalls.get();
r.r3 = subscriber.onErrorCalls.get() == 1 ? subscriber.error.getMessage() : null;
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2020-2022 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -37,9 +37,14 @@ class SinkEmptyMulticast<T> extends Mono<T> implements InternalEmptySink<T> {

@SuppressWarnings("rawtypes")
static final Inner[] EMPTY = new Inner[0];

@SuppressWarnings("rawtypes")
static final Inner[] TERMINATED = new Inner[0];
static final Inner[] TERMINATED_EMPTY = new Inner[0];
@SuppressWarnings("rawtypes")
static final Inner[] TERMINATED_ERROR = new Inner[0];

static final int STATE_ADDED = 0;
static final int STATE_ERROR = -1;
static final int STATE_EMPTY = -2;

@Nullable
Throwable error;
Expand All @@ -58,12 +63,23 @@ public Mono<T> asMono() {
return this;
}

boolean isTerminated(Inner<?>[] array) {
return array == TERMINATED_EMPTY || array == TERMINATED_ERROR;
}

@Override
public EmitResult tryEmitEmpty() {
Inner<?>[] array = SUBSCRIBERS.getAndSet(this, TERMINATED);
Inner<T>[] array;
for (;;) {
array = this.subscribers;

if (isTerminated(array)) {
return EmitResult.FAIL_TERMINATED;
}

if (array == TERMINATED) {
return Sinks.EmitResult.FAIL_TERMINATED;
if (SUBSCRIBERS.compareAndSet(this, array, TERMINATED_EMPTY)) {
break;
}
}

for (Inner<?> as : array) {
Expand All @@ -77,23 +93,36 @@ public EmitResult tryEmitEmpty() {
public EmitResult tryEmitError(Throwable cause) {
Objects.requireNonNull(cause, "onError cannot be null");

Inner<T>[] prevSubscribers = SUBSCRIBERS.getAndSet(this, TERMINATED);
if (prevSubscribers == TERMINATED) {
Inner<T>[] prevSubscribers = this.subscribers;

if (isTerminated(prevSubscribers)) {
return EmitResult.FAIL_TERMINATED;
}

error = cause;

for (;;) {
if (SUBSCRIBERS.compareAndSet(this, prevSubscribers, TERMINATED_ERROR)) {
break;
}

prevSubscribers = this.subscribers;
if (isTerminated(prevSubscribers)) {
return EmitResult.FAIL_TERMINATED;
}
}

for (Inner<T> as : prevSubscribers) {
as.error(cause);
}

return EmitResult.OK;
}

@Override
public Object scanUnsafe(Attr key) {
if (key == Attr.TERMINATED) return subscribers == TERMINATED;
if (key == Attr.ERROR) return error;
if (key == Attr.TERMINATED) return isTerminated(subscribers);
if (key == Attr.ERROR) return subscribers == TERMINATED_ERROR ? error : null;
if (key == Attr.RUN_STYLE) return Attr.RunStyle.SYNC;

return null;
Expand All @@ -104,12 +133,16 @@ public Context currentContext() {
return Operators.multiSubscribersContext(subscribers);
}

boolean add(Inner<T> ps) {
int add(Inner<T> ps) {
for (; ; ) {
Inner<T>[] a = subscribers;

if (a == TERMINATED) {
return false;
if (a == TERMINATED_EMPTY) {
return STATE_EMPTY;
}

if (a == TERMINATED_ERROR) {
return STATE_ERROR;
}

int n = a.length;
Expand All @@ -118,7 +151,7 @@ boolean add(Inner<T> ps) {
b[n] = ps;

if (SUBSCRIBERS.compareAndSet(this, a, b)) {
return true;
return STATE_ADDED;
}
}
}
Expand Down Expand Up @@ -165,20 +198,21 @@ void remove(Inner<T> ps) {
public void subscribe(final CoreSubscriber<? super T> actual) {
Inner<T> as = new VoidInner<>(actual, this);
actual.onSubscribe(as);
if (add(as)) {
final int addedState = add(as);
if (addedState == STATE_ADDED) {
if (as.isCancelled()) {
remove(as);
}
}
else {
else if (addedState == STATE_ERROR) {
Throwable ex = error;
if (ex != null) {
actual.onError(ex);
}
else {
as.complete();
}

actual.onError(ex);
}
else {
as.complete();
}

}

@Override
Expand Down

0 comments on commit 849be9a

Please sign in to comment.