Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[issues/3340] bugfix concurrency issues FluxReplay.Size/SizeAndTime #3714

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
@@ -0,0 +1,239 @@
package reactor.core.publisher;

import org.openjdk.jcstress.annotations.Actor;
import org.openjdk.jcstress.annotations.Arbiter;
import org.openjdk.jcstress.annotations.JCStressTest;
import org.openjdk.jcstress.annotations.Outcome;
import org.openjdk.jcstress.annotations.State;
import org.openjdk.jcstress.infra.results.II_Result;
import org.openjdk.jcstress.infra.results.I_Result;

import java.util.concurrent.atomic.AtomicInteger;

import static org.openjdk.jcstress.annotations.Expect.ACCEPTABLE;
import static org.openjdk.jcstress.annotations.Expect.ACCEPTABLE_INTERESTING;
import static org.openjdk.jcstress.annotations.Expect.FORBIDDEN;

public class SinksManyReplayLatestStressTest {
final StressSubscriber<String> target = new StressSubscriber<>();

final SinkManyReplayProcessor<String> sink = new SinkManyReplayProcessor<>(
new FluxReplay.SizeBoundReplayBuffer<>(1)
);

@JCStressTest
@Outcome(id = {"1"}, expect = ACCEPTABLE, desc = "single emission")
@Outcome(id = {"2"}, expect = ACCEPTABLE, desc = "two emissions")
@State
public static class TryEmitNextStressTest extends SinksManyReplayLatestStressTest {

@Actor
public void first() {
sink.tryEmitNext("Hello");
}

@Actor
public void second() {
sink.tryEmitNext("Hello");
}

@Actor
public void subscriber() {
sink.subscribe(target);
}

@Arbiter
public void arbiter(I_Result r) {
r.r1 = target.onNextCalls.get();
}
}

@JCStressTest
@Outcome(id = {"6, 63"}, expect = ACCEPTABLE, desc = "six subscribers, 64-1 result")
@State
public static class SubscriberCountStressTest extends SinksManyReplayLatestStressTest {

AtomicInteger count = new AtomicInteger();

@Actor
public void first() {
sink.tryEmitNext("Hello");
}

@Actor
public void one() {
sink.subscribe(s -> count.addAndGet(1));
}

@Actor
public void two() {
sink.subscribe(s -> count.addAndGet(2));
}

@Actor
public void three() {
sink.subscribe(s -> count.addAndGet(4));
}

@Actor
public void four() {
sink.subscribe(s -> count.addAndGet(8));
}

@Actor
public void five() {
sink.subscribe(s -> count.addAndGet(16));
}

@Actor
public void six() {
sink.subscribe(s -> count.addAndGet(32));
}

@Arbiter
public void arbiter(II_Result r) {
r.r1 = sink.currentSubscriberCount();
r.r2 = count.get();
}
}


@JCStressTest
@Outcome(id = {"0, 0"}, expect = ACCEPTABLE, desc = "complete first")
@Outcome(id = {"0, 1"}, expect = ACCEPTABLE, desc = "subscriber 1 before complete")
@Outcome(id = {"0, 2"}, expect = ACCEPTABLE, desc = "subscriber 2 before complete")
@Outcome(id = {"0, 3"}, expect = ACCEPTABLE, desc = "both subscribe before complete")
@State
public static class SubscriberCountCompleteStressTest extends SinksManyReplayLatestStressTest {

AtomicInteger count = new AtomicInteger();

@Actor
public void first() {
sink.tryEmitNext("Hello");
}

@Actor
public void completer() {
sink.tryEmitComplete();
}


@Actor
public void one() {
sink.subscribe(s -> count.addAndGet(1));
}

@Actor
public void two() {
sink.subscribe(s -> count.addAndGet(2));
}

@Arbiter
public void arbiter(II_Result r) {
r.r1 = sink.currentSubscriberCount();
r.r2 = count.get();
}
}

@JCStressTest
@Outcome(id = {"7"}, expect = ACCEPTABLE, desc = "all signals go through")
@State
public static class FluxReplayLatestWriteStressTest extends SinksManyReplayLatestStressTest {
public FluxReplayLatestWriteStressTest() {
// subscribe before start
sink.subscribe(target);
sink.tryEmitNext("initial");
}


@Actor
public void one() {
sink.tryEmitNext("Hello");
}

@Actor
public void two() {
sink.tryEmitNext("Hello");
}

@Actor
public void three() {
sink.tryEmitNext("Hello");
}

@Actor
public void four() {
sink.tryEmitNext("Hello");
}

@Actor
public void five() {
sink.tryEmitNext("Hello");
}

@Actor
public void six() {
sink.tryEmitNext("Hello");
}

@Arbiter
public void arbiter(I_Result r) {
r.r1 = target.onNextCalls.get();
}
}

@JCStressTest
@Outcome(id = {"6"}, expect = ACCEPTABLE, desc = "all signals go through")
@Outcome(id = {"0"}, expect = FORBIDDEN, desc = "at least one signal must go through")
@Outcome( expect = ACCEPTABLE_INTERESTING, desc = "signal lost between add and replay")
@State
public static class FluxReplaySizeBoundWriteStressTest {
final StressSubscriber<String> target = new StressSubscriber<>();

final SinkManyReplayProcessor<String> sink = new SinkManyReplayProcessor<>(
new FluxReplay.SizeBoundReplayBuffer<>(1)
);

public FluxReplaySizeBoundWriteStressTest() {
// subscribe before start
sink.subscribe(target);
}


@Actor
public void one() {
sink.tryEmitNext("Hello");
}

@Actor
public void two() {
sink.tryEmitNext("Hello");
}

@Actor
public void three() {
sink.tryEmitNext("Hello");
}

@Actor
public void four() {
sink.tryEmitNext("Hello");
}

@Actor
public void five() {
sink.tryEmitNext("Hello");
}

@Actor
public void six() {
sink.tryEmitNext("Hello");
}

@Arbiter
public void arbiter(I_Result r) {
r.r1 = target.onNextCalls.get();
}
}
}
71 changes: 51 additions & 20 deletions reactor-core/src/main/java/reactor/core/publisher/FluxReplay.java
Expand Up @@ -17,6 +17,7 @@
package reactor.core.publisher;

import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CancellationException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
Expand Down Expand Up @@ -155,9 +156,13 @@ public String toString() {
final Scheduler scheduler;
int size;

volatile TimedNode<T> head;
final TimedNode<T> head;

TimedNode<T> tail;
volatile TimedNode<T> tail;

@SuppressWarnings("rawtypes")
static final AtomicReferenceFieldUpdater<SizeAndTimeBoundReplayBuffer, TimedNode>
TAIL = AtomicReferenceFieldUpdater.newUpdater(SizeAndTimeBoundReplayBuffer.class, TimedNode.class, "tail");

Throwable error;
static final long NOT_DONE = Long.MIN_VALUE;
Expand Down Expand Up @@ -390,7 +395,6 @@ public void clear(ReplaySubscription<T> rs) {
}

@Override
@SuppressWarnings("unchecked")
public boolean isEmpty(ReplaySubscription<T> rs) {
TimedNode<T> node = latestHead(rs);
return node.get() == null;
Expand Down Expand Up @@ -431,15 +435,26 @@ public int capacity() {

@Override
public void add(T value) {
final TimedNode<T> tail = this.tail;
final TimedNode<T> valueNode = new TimedNode<>(tail.index + 1,
value,
scheduler.now(TimeUnit.NANOSECONDS));
TimedNode<T> tail;
TimedNode<T> valueNode;
do {
tail = this.tail;
valueNode = new TimedNode<>(tail.index + 1,
value,
scheduler.now(TimeUnit.NANOSECONDS));
} while (
!TAIL.compareAndSet(this, tail, valueNode)
);
tail.set(valueNode);
this.tail = valueNode;
int s = size;
if (s == limit) {
head = head.get();
TimedNode<T> cur, next;
do {
cur = head.get();
if (cur == null)
next = null;
else next = cur.get();
} while (next != null && !head.compareAndSet(cur, next));
}
else {
size = s + 1;
Expand All @@ -450,7 +465,7 @@ public void add(T value) {
//we still want to keep the newly added value in order for the immediately following replay
//to propagate it to currently registered subscribers.
if (maxAge == 0) {
head = valueNode;
head.set(valueNode);
return;
}

Expand All @@ -470,7 +485,7 @@ public void add(T value) {
//otherwise we'd skip removal and re-walk the whole linked list on next add, retaining outdated values for nothing.
if (removed != 0) {
size = size - removed;
head = h;
head.set(h.get());
}
break;
}
Expand All @@ -481,7 +496,6 @@ public void add(T value) {
}

@Override
@SuppressWarnings("unchecked")
public void replay(ReplaySubscription<T> rs) {
if (!rs.enter()) {
return;
Expand Down Expand Up @@ -776,9 +790,12 @@ static final class SizeBoundReplayBuffer<T> implements ReplayBuffer<T> {
final int limit;
final int indexUpdateLimit;

volatile Node<T> head;
final Node<T> head;

Node<T> tail;
volatile Node<T> tail;
@SuppressWarnings("rawtypes")
static final AtomicReferenceFieldUpdater<SizeBoundReplayBuffer, Node>
TAIL = AtomicReferenceFieldUpdater.newUpdater(SizeBoundReplayBuffer.class, Node.class, "tail");

int size;

Expand Down Expand Up @@ -809,15 +826,29 @@ public int capacity() {

@Override
public void add(T value) {
final Node<T> tail = this.tail;
final Node<T> n = new Node<>(tail.index + 1, value);
Node<T> tail;
Node<T> n;
do {
tail = this.tail;
n = new Node<>(tail.index + 1, value);
} while (
!TAIL.compareAndSet(this, tail, n)
);
// when tail was successfully updated to 'n'
// tail.set(n) will not lose integrity.
// replaying between these lines' execution is fine
// n will replay on next call to replay, done after this method
tail.set(n);
this.tail = n;
int s = size;
if (s == limit) {
head = head.get();
}
else {
Node<T> cur, next;
do {
cur = head.get();
if (cur == null)
next = null;
else next = cur.get();
} while (next != null && !head.compareAndSet(cur, next));
} else {
size = s + 1;
}
}
Expand Down