Feature/3340 Add best-effort replay sink #3798

Open · wants to merge 4 commits into main
@@ -0,0 +1,68 @@
package reactor.core.publisher;

import org.openjdk.jcstress.annotations.Actor;
import org.openjdk.jcstress.annotations.Arbiter;
import org.openjdk.jcstress.annotations.JCStressTest;
import org.openjdk.jcstress.annotations.Outcome;
import org.openjdk.jcstress.annotations.State;
import org.openjdk.jcstress.infra.results.I_Result;


import static org.openjdk.jcstress.annotations.Expect.ACCEPTABLE;
import static org.openjdk.jcstress.annotations.Expect.ACCEPTABLE_INTERESTING;

public class SinksManyReplayLatestStressTest {

@JCStressTest
@Outcome(id = {"6"}, expect = ACCEPTABLE, desc = "all signals go through")
@Outcome(expect = ACCEPTABLE_INTERESTING, desc = "signals are lost before replay")
@State
public static class FluxReplaySizeBoundWriteStressTest {
final StressSubscriber<String> target = new StressSubscriber<>();

final SinkManyReplayProcessor<String> sink = new SinkManyReplayProcessor<>(
new FluxReplay.ArraySizeBoundReplayBuffer<>(1)
);

public FluxReplaySizeBoundWriteStressTest() {
// subscribe before the actors start emitting
sink.subscribe(target);
}


@Actor
public void one() {
sink.tryEmitNext("Hello");
}

@Actor
public void two() {
sink.tryEmitNext("Hello");
}

@Actor
public void three() {
sink.tryEmitNext("Hello");
}

@Actor
public void four() {
sink.tryEmitNext("Hello");
}

@Actor
public void five() {
sink.tryEmitNext("Hello");
}

@Actor
public void six() {
sink.tryEmitNext("Hello");
}

@Arbiter
public void arbiter(I_Result r) {
r.r1 = target.onNextCalls.get();
}
}
}
252 changes: 251 additions & 1 deletion reactor-core/src/main/java/reactor/core/publisher/FluxReplay.java
@@ -1068,6 +1068,256 @@ public int size() {
}
}

static final class ArraySizeBoundReplayBuffer<T> implements ReplayBuffer<T> {

private static final class StampedBuffer {
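// immutable snapshot: index is the total number of values added so far; arr always has length cap, with the newest value (if any) in the last slot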
final int index;
final Object[] arr;

private StampedBuffer(int index, Object[] arr) {
this.index = index;
this.arr = arr;
}
}

volatile StampedBuffer buffer;

@SuppressWarnings("rawtypes")
static final AtomicReferenceFieldUpdater<ArraySizeBoundReplayBuffer, StampedBuffer> BUFF =
AtomicReferenceFieldUpdater.newUpdater(ArraySizeBoundReplayBuffer.class, StampedBuffer.class, "buffer");

final int indexUpdateLimit;
final int cap;

volatile boolean done;
Throwable error;

ArraySizeBoundReplayBuffer(int size) {
if (size < 0) {
throw new IllegalArgumentException("Limit cannot be negative");
}
this.buffer = new StampedBuffer(0, new Object[size]);
this.indexUpdateLimit = Operators.unboundedOrLimit(size);
this.cap = size;
}


@Override
public void add(T value) {
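// copy-on-write: build a new snapshot shifted left by one slot with the new value appended last, then CAS it in; contended writers retry against the fresh snapshot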
StampedBuffer buff, next;
do {
buff = BUFF.get(this);
next = new StampedBuffer(buff.index + 1, new Object[cap]);
System.arraycopy(buff.arr, 1, next.arr, 0, cap - 1);
next.arr[cap - 1] = value;
} while (!BUFF.compareAndSet(this, buff, next));
}

@Override
public void onError(Throwable ex) {
error = ex;
done = true;
}

@Override
public Throwable getError() {
return error;
}

@Override
public void onComplete() {
done = true;
}

void replayNormal(ReplaySubscription<T> rs) {
final Subscriber<? super T> a = rs.actual();

int missed = 1;
for (; ; ) {

long r = rs.requested();
long e = 0L;

StampedBuffer curr = BUFF.get(this);
int nodeIdx = rs.index();
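// replay from the oldest slot of the snapshot that this subscriber has not seen yet; values that already rolled out of the snapshot are skipped (best effort)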
for (int i = minIndex(nodeIdx, curr); i < cap; i++) {

if (rs.isCancelled()) {
clear(rs);
return;
}

if (e == r) {
break; // lack of request
}

@SuppressWarnings("unchecked") T next = (T) curr.arr[i];
a.onNext(next);

e++;
int logicalNextIdx = curr.index - (cap - i) + 1;
rs.index(logicalNextIdx);

if ((logicalNextIdx) % indexUpdateLimit == 0) {
rs.requestMore(logicalNextIdx);
}
}

if (done && isEmpty(rs)) {
clear(rs);
Throwable ex = error;
if (ex != null) {
a.onError(ex);
} else {
a.onComplete();
}
return;
}

if (e == r) {
if (rs.isCancelled()) {
rs.index(0);
return;
}

if (done && isEmpty(rs)) {
clear(rs);
Throwable ex = error;
if (ex != null) {
a.onError(ex);
} else {
a.onComplete();
}
return;
}
}

if (e != 0L) {
if (r != Long.MAX_VALUE) {
rs.produced(e);
}
}

missed = rs.leave(missed);
if (missed == 0) {
break;
}
}
}

int minIndex(int rsNodeIdx, StampedBuffer buf) {
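// map the subscriber's logical position onto an array slot of the snapshot: cap means there is nothing new to replay, 0 means the subscriber fell behind by at least the full capacity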
if (rsNodeIdx > buf.index) {
return cap;
}
return Math.max(cap - (buf.index - rsNodeIdx), 0);
}

void replayFused(ReplaySubscription<T> rs) {
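// fused mode: a.onNext(null) below only signals the downstream that values are available; the actual values are obtained through poll()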
int missed = 1;

final Subscriber<? super T> a = rs.actual();

for (; ; ) {

if (rs.isCancelled()) {
clear(rs);
return;
}

boolean d = done;

a.onNext(null);

if (d) {
Throwable ex = error;
if (ex != null) {
a.onError(ex);
} else {
a.onComplete();
}
return;
}

missed = rs.leave(missed);
if (missed == 0) {
break;
}
}
}

@Override
public void replay(ReplaySubscription<T> rs) {
if (!rs.enter()) {
return;
}

if (rs.fusionMode() == NONE) {
replayNormal(rs);
} else {
replayFused(rs);
}
}

@Override
public boolean isDone() {
return done;
}

@Override
public T poll(ReplaySubscription<T> rs) {
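// fused drain: return the oldest not-yet-replayed value from the current snapshot, or null when the subscriber is caught up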
StampedBuffer curr = BUFF.get(this);
int nodeIdx = rs.index();

int idx = minIndex(nodeIdx, curr);
if (idx == cap) {
return null;
}
@SuppressWarnings("unchecked") T next = (T) curr.arr[idx];

int logicalNextIdx = curr.index - (cap - idx) + 1;
rs.index(logicalNextIdx);

if ((logicalNextIdx) % indexUpdateLimit == 0) {
rs.requestMore(logicalNextIdx);
}

return next;
}

@Override
public void clear(ReplaySubscription<T> rs) {
rs.index(0);
}

@Override
public boolean isEmpty(ReplaySubscription<T> rs) {
int nodeIdx = rs.index();
StampedBuffer curr = BUFF.get(this);
return nodeIdx == curr.index;
}

@Override
public int size(ReplaySubscription<T> rs) {
return Math.min(capacity(), BUFF.get(this).index - rs.index());
}

@Override
public int size() {
return Math.min(capacity(), BUFF.get(this).index);
}

@Override
public int capacity() {
return cap;
}

@Override
public boolean isExpired() {
return false;
}
}


FluxReplay(CorePublisher<T> source,
int history,
long ttl,
@@ -1209,7 +1459,7 @@ public final CorePublisher<? extends T> source() {

@Override
@Nullable
public Object scanUnsafe(Scannable.Attr key) {
public Object scanUnsafe(Attr key) {
if (key == Attr.PREFETCH) return getPrefetch();
if (key == Attr.PARENT) return source;
if (key == Attr.RUN_ON) return scheduler;
@@ -142,6 +142,30 @@ static <E> SinkManyReplayProcessor<E> create(int historySize, boolean unbounded)
return new SinkManyReplayProcessor<>(buffer);
}


/**
* Creates a {@link SinkManyReplayProcessor} backed by a {@link FluxReplay.ArraySizeBoundReplayBuffer}.
* <p>
* This buffer is similar to the {@link FluxReplay.SizeBoundReplayBuffer}, but is implemented as an atomically
* replaced array instead of an atomic linked list. The linked-list buffer can always replay every item,
* regardless of the limit, to a subscriber that lacks demand, so a lack of request may lead to out-of-memory issues.
* The {@link FluxReplay.ArraySizeBoundReplayBuffer} is deterministically sized and can only ever replay
* {@code historySize} items; subscribers that lack demand will therefore miss any items that rolled out of
* the buffer while they were not requesting.
* <br>
* <strong>A note on concurrency:</strong> the buffer is safe for concurrent use, but a small buffer (like the
* {@link Sinks.MulticastReplayBestEffortSpec#latest() Sinks.unsafe().many().replay().bestEffort().latest()})
* can miss elements, since items may roll over before being replayed. The latest item added is always replayed.
*
* @param historySize the number of items to keep in the buffer
* @param <E> the type of pushed elements
* @return a {@link SinkManyReplayProcessor} with a bounded, best-effort replay buffer
*/
static <E> SinkManyReplayProcessor<E> createArrayBounded(int historySize) {
FluxReplay.ReplayBuffer<E> buffer = new FluxReplay.ArraySizeBoundReplayBuffer<>(historySize);
return new SinkManyReplayProcessor<>(buffer);
}
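
// Illustrative sketch, not part of this change: how the array-bounded, best-effort buffer behaves
// for a late subscriber. The method name and the values used here are hypothetical; only the
// createArrayBounded factory above and the inherited Flux API are relied upon.
static void arrayBoundedReplayExample() {
SinkManyReplayProcessor<Integer> sink = createArrayBounded(2);
sink.tryEmitNext(1);
sink.tryEmitNext(2);
sink.tryEmitNext(3); // value 1 has now rolled out of the two-element buffer
sink.subscribe(System.out::println); // replays 2 and 3, then receives live signals
sink.tryEmitNext(4);
}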

/**
* Creates a time-bounded replay processor.
* <p>