Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BufferTimeout with fair backpressure rework #3634

Open
wants to merge 12 commits into
base: 3.5.x
Choose a base branch
from
Open
Expand Up @@ -18,10 +18,12 @@

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import org.openjdk.jcstress.annotations.Actor;
import org.openjdk.jcstress.annotations.Arbiter;
Expand All @@ -30,23 +32,28 @@
import org.openjdk.jcstress.annotations.Outcome;
import org.openjdk.jcstress.annotations.State;
import org.openjdk.jcstress.infra.results.LLL_Result;
import org.openjdk.jcstress.infra.results.LL_Result;
import reactor.core.util.FastLogger;
import reactor.test.scheduler.VirtualTimeScheduler;

import static java.util.Collections.emptyList;

public class FluxBufferTimeoutStressTest {

@JCStressTest
@Outcome(id = "1, 1, 1", expect = Expect.ACCEPTABLE, desc = "")
@Outcome(id = "2, 1, 1", expect = Expect.ACCEPTABLE, desc = "")
@Outcome(id = "1, 1", expect = Expect.ACCEPTABLE, desc = "")
@Outcome(id = "2, 1", expect = Expect.ACCEPTABLE, desc = "")
@State
public static class FluxBufferTimeoutStressTestRaceDeliveryAndTimeout {

final FastLogger fastLogger = new FastLogger(getClass().getName());

final VirtualTimeScheduler virtualTimeScheduler = VirtualTimeScheduler.create();

final StressSubscriber<List<Long>> subscriber = new StressSubscriber<>();

final FluxBufferTimeout.BufferTimeoutWithBackpressureSubscriber<Long, List<Long>> bufferTimeoutSubscriber =
new FluxBufferTimeout.BufferTimeoutWithBackpressureSubscriber<>(subscriber, 2, 1, TimeUnit.SECONDS, virtualTimeScheduler.createWorker(), bufferSupplier(), null);
new FluxBufferTimeout.BufferTimeoutWithBackpressureSubscriber<>(subscriber, 2, 1, TimeUnit.SECONDS, virtualTimeScheduler.createWorker(), bufferSupplier(), fastLogger);

final StressSubscription<Long> subscription = new StressSubscription<>(bufferTimeoutSubscriber);

Expand All @@ -67,35 +74,39 @@ public void timeout() {
}

@Arbiter
public void arbiter(LLL_Result result) {
public void arbiter(LL_Result result) {
result.r1 = subscriber.onNextCalls.get();
result.r2 = subscriber.onCompleteCalls.get();
result.r3 = subscription.requestsCount.get();
result.r2 = subscription.requestsCount.get();

if (subscriber.onCompleteCalls.get() > 1) {
throw new IllegalStateException("unexpected completion " + subscriber.onCompleteCalls.get());
if (subscriber.onCompleteCalls.get() != 1) {
fail(fastLogger,
"unexpected completion count " + subscriber.onCompleteCalls.get());
}
if (subscriber.concurrentOnComplete.get()) {
throw new IllegalStateException("subscriber concurrent onComplete");
fail(fastLogger, "subscriber concurrent onComplete");
}
if (subscriber.concurrentOnNext.get()) {
throw new IllegalStateException("subscriber concurrent onNext");
fail(fastLogger, "subscriber concurrent onNext");
}
if (!subscriber.discardedValues.isEmpty()) {
fail(fastLogger, "Unexpected discarded values " + subscriber.discardedValues);
}
}
}

@JCStressTest
@Outcome(id = "3, 1, 1", expect = Expect.ACCEPTABLE, desc = "")
@Outcome(id = "4, 1, 1", expect = Expect.ACCEPTABLE, desc = "")
@Outcome(id = "5, 1, 1", expect = Expect.ACCEPTABLE, desc = "")
@Outcome(id = "3, 1", expect = Expect.ACCEPTABLE, desc = "")
@Outcome(id = "4, 1", expect = Expect.ACCEPTABLE, desc = "")
@Outcome(id = "5, 1", expect = Expect.ACCEPTABLE, desc = "")
@State
public static class FluxBufferTimeoutStressTestRaceDeliveryAndMoreTimeouts {

final FastLogger fastLogger = new FastLogger(getClass().getName());

final VirtualTimeScheduler virtualTimeScheduler = VirtualTimeScheduler.create();

final StressSubscriber<List<Long>> subscriber = new StressSubscriber<>();

final FastLogger fastLogger = new FastLogger(getClass().getName());
final FluxBufferTimeout.BufferTimeoutWithBackpressureSubscriber<Long, List<Long>> bufferTimeoutSubscriber =
new FluxBufferTimeout.BufferTimeoutWithBackpressureSubscriber<>(subscriber, 2, 1, TimeUnit.SECONDS, virtualTimeScheduler.createWorker(), bufferSupplier(), fastLogger);

Expand Down Expand Up @@ -126,22 +137,25 @@ public void timeout() {
}

@Arbiter
public void arbiter(LLL_Result result) {
public void arbiter(LL_Result result) {
result.r1 = subscriber.onNextCalls.get();
result.r2 = subscriber.onCompleteCalls.get();
result.r3 = subscription.requestsCount.get();
result.r2 = subscription.requestsCount.get();

if (subscriber.onCompleteCalls.get() != 1) {
throw new IllegalStateException("unexpected completion " + subscriber.onCompleteCalls.get());
fail(fastLogger, "unexpected completion: " + subscriber.onCompleteCalls.get());
}
if (subscriber.concurrentOnComplete.get()) {
throw new IllegalStateException("subscriber concurrent onComplete");
fail(fastLogger, "subscriber concurrent onComplete");
}
if (subscriber.concurrentOnNext.get()) {
throw new IllegalStateException("subscriber concurrent onNext");
fail(fastLogger, "subscriber concurrent onNext");
}
if (subscriber.receivedValues.stream().anyMatch(List::isEmpty)) {
throw new IllegalStateException("received an empty buffer: " + subscriber.receivedValues + "; result=" + result + "\n" + fastLogger);
if (!subscriber.discardedValues.isEmpty()) {
fail(fastLogger, "Unexpected discarded values " + subscriber.discardedValues);
}
if (!allValuesHandled(fastLogger, 5, emptyList(),
subscriber.receivedValues)) {
fail(fastLogger, "not all values delivered; result=" + result);
}
}
}
Expand All @@ -158,16 +172,19 @@ public void arbiter(LLL_Result result) {
@State
public static class FluxBufferTimeoutStressTestRaceDeliveryAndMoreTimeoutsPossiblyIncomplete {

final FastLogger fastLogger = new FastLogger(getClass().getName());

final VirtualTimeScheduler virtualTimeScheduler = VirtualTimeScheduler.create();

final StressSubscriber<List<Long>> subscriber = new StressSubscriber<>(1);

final FastLogger fastLogger = new FastLogger(getClass().getName());
final FluxBufferTimeout.BufferTimeoutWithBackpressureSubscriber<Long, List<Long>> bufferTimeoutSubscriber =
new FluxBufferTimeout.BufferTimeoutWithBackpressureSubscriber<>(subscriber, 2, 1, TimeUnit.SECONDS, virtualTimeScheduler.createWorker(), bufferSupplier(), fastLogger);

Sinks.Many<Long> proxy = Sinks.unsafe().many().unicast().onBackpressureBuffer();

final AtomicLong requested = new AtomicLong();

{
proxy.asFlux()
.doOnRequest(r -> requested.incrementAndGet())
Expand Down Expand Up @@ -213,29 +230,34 @@ public void arbiter(LLL_Result result) {
result.r2 = subscriber.onCompleteCalls.get();
result.r3 = requested.get();

if (!allValuesHandled(fastLogger, 4, emptyList(), subscriber.receivedValues)) {
fail(fastLogger, "minimum set of values not delivered");
}

if (subscriber.onCompleteCalls.get() == 0) {
if (subscriber.receivedValues.stream()
.noneMatch(buf -> buf.size() == 1)) {
throw new IllegalStateException("incomplete but received all two " +
"element buffers. received: " + subscriber.receivedValues + "; result=" + result + "\n" + fastLogger);
if (subscriber.receivedValues.size() == 5 &&
subscriber.receivedValues.stream().noneMatch(buf -> buf.size() == 1)) {
fail(fastLogger, "incomplete but delivered all two " +
"element buffers. received: " + subscriber.receivedValues + "; result=" + result);
}
}

// TODO #onNext < 5 and incomplete => why fail?
if (subscriber.onNextCalls.get() < 5 && subscriber.onCompleteCalls.get() == 0) {
throw new IllegalStateException("incomplete. received: " + subscriber.receivedValues + "; requested=" + requested.get() + "; result=" + result + "\n" + fastLogger);
fail(fastLogger, "incomplete. received: " + subscriber.receivedValues + "; requested=" + requested.get() + "; result=" + result);
}

if (subscriber.onCompleteCalls.get() > 1) {
throw new IllegalStateException("unexpected completion " + subscriber.onCompleteCalls.get());
fail(fastLogger, "unexpected completion " + subscriber.onCompleteCalls.get());
}
if (subscriber.concurrentOnComplete.get()) {
throw new IllegalStateException("subscriber concurrent onComplete");
fail(fastLogger, "subscriber concurrent onComplete");
}
if (subscriber.concurrentOnNext.get()) {
throw new IllegalStateException("subscriber concurrent onNext");
fail(fastLogger, "subscriber concurrent onNext");
}
if (subscriber.receivedValues.stream().anyMatch(List::isEmpty)) {
throw new IllegalStateException("received an empty buffer: " + subscriber.receivedValues + "; result=" + result + "\n" + fastLogger);
fail(fastLogger, "received an empty buffer: " + subscriber.receivedValues + "; result=" + result);
}
}
}
Expand All @@ -247,12 +269,14 @@ public void arbiter(LLL_Result result) {
@State
public static class FluxBufferTimeoutStressTestRaceDeliveryAndCancel {

final FastLogger fastLogger = new FastLogger(getClass().getName());

final VirtualTimeScheduler virtualTimeScheduler = VirtualTimeScheduler.create();

final StressSubscriber<List<Long>> subscriber = new StressSubscriber<>();

final FluxBufferTimeout.BufferTimeoutWithBackpressureSubscriber<Long, List<Long>> bufferTimeoutSubscriber =
new FluxBufferTimeout.BufferTimeoutWithBackpressureSubscriber<>(subscriber, 2, 1, TimeUnit.SECONDS, virtualTimeScheduler.createWorker(), bufferSupplier(), null);
new FluxBufferTimeout.BufferTimeoutWithBackpressureSubscriber<>(subscriber, 2, 1, TimeUnit.SECONDS, virtualTimeScheduler.createWorker(), bufferSupplier(), fastLogger);

final StressSubscription<Long> subscription = new StressSubscription<>(bufferTimeoutSubscriber);

Expand All @@ -279,13 +303,16 @@ public void arbiter(LLL_Result result) {
result.r3 = subscription.requestsCount.get();

if (subscriber.onCompleteCalls.get() > 1) {
throw new IllegalStateException("unexpected completion " + subscriber.onCompleteCalls.get());
fail(fastLogger, "unexpected completion " + subscriber.onCompleteCalls.get());
}
if (subscriber.concurrentOnComplete.get()) {
throw new IllegalStateException("subscriber concurrent onComplete");
fail(fastLogger, "subscriber concurrent onComplete");
}
if (subscriber.concurrentOnNext.get()) {
throw new IllegalStateException("subscriber concurrent onNext");
fail(fastLogger, "subscriber concurrent onNext");
}
if (!allValuesHandled(fastLogger, 2, subscriber.discardedValues, subscriber.receivedValues)) {
fail(fastLogger, "Not all handled!" + "; result=" + result);
}
}
}
Expand All @@ -310,6 +337,7 @@ public static class FluxBufferTimeoutStressTestRaceDeliveryAndCancelWithBackpres
new FluxBufferTimeout.BufferTimeoutWithBackpressureSubscriber<>(subscriber, 2, 1, TimeUnit.SECONDS, virtualTimeScheduler.createWorker(), bufferSupplier(), fastLogger);

Sinks.Many<Long> proxy = Sinks.unsafe().many().unicast().onBackpressureBuffer();
AtomicLong emits = new AtomicLong();
final AtomicLong requested = new AtomicLong();
{
proxy.asFlux()
Expand All @@ -319,11 +347,12 @@ public static class FluxBufferTimeoutStressTestRaceDeliveryAndCancelWithBackpres

@Actor
public void next() {
proxy.tryEmitNext(0L);
proxy.tryEmitNext(1L);

proxy.tryEmitNext(2L);
proxy.tryEmitNext(3L);
for (long i = 0; i < 4; i++) {
if (proxy.tryEmitNext(i) != Sinks.EmitResult.OK) {
return;
}
emits.set(i + 1);
}
proxy.tryEmitComplete();
}

Expand All @@ -344,16 +373,26 @@ public void arbiter(LLL_Result result) {
result.r3 = requested.get();

if (subscriber.onCompleteCalls.get() > 1) {
throw new IllegalStateException("unexpected completion " + subscriber.onCompleteCalls.get());
fail(fastLogger, "unexpected completion " + subscriber.onCompleteCalls.get());
}
if (subscriber.concurrentOnComplete.get()) {
throw new IllegalStateException("subscriber concurrent onComplete");
fail(fastLogger, "subscriber concurrent onComplete");
}
if (subscriber.concurrentOnNext.get()) {
throw new IllegalStateException("subscriber concurrent onNext");
fail(fastLogger, "subscriber concurrent onNext");
}
if (subscriber.receivedValues.stream().anyMatch(List::isEmpty)) {
throw new IllegalStateException("received an empty buffer: " + subscriber.receivedValues + "; result=" + result + "\n" + fastLogger);
int emits = (int) this.emits.get();
if (subscriber.onCompleteCalls.get() == 1 && !allValuesHandled(fastLogger, 4,
emptyList(),
subscriber.receivedValues)) {
fail(fastLogger,
"Completed but not all values handled!" + "; result=" + result);
}

if (subscriber.onNextCalls.get() > 0 && !allValuesHandled(fastLogger, emits,
subscriber.discardedValues,
subscriber.receivedValues)) {
fail(fastLogger, "Not all " + emits + " emits handled!" + "; result=" + result);
}
}
}
Expand All @@ -367,12 +406,14 @@ public void arbiter(LLL_Result result) {
@State
public static class FluxBufferTimeoutStressTestRaceDeliveryAndCancelAndTimeout {

final FastLogger fastLogger = new FastLogger(getClass().getName());

final VirtualTimeScheduler virtualTimeScheduler = VirtualTimeScheduler.create();

final StressSubscriber<List<Long>> subscriber = new StressSubscriber<>();

final FluxBufferTimeout.BufferTimeoutWithBackpressureSubscriber<Long, List<Long>> bufferTimeoutSubscriber =
new FluxBufferTimeout.BufferTimeoutWithBackpressureSubscriber<>(subscriber, 2, 1, TimeUnit.SECONDS, virtualTimeScheduler.createWorker(), bufferSupplier(), null);
new FluxBufferTimeout.BufferTimeoutWithBackpressureSubscriber<>(subscriber, 2, 1, TimeUnit.SECONDS, virtualTimeScheduler.createWorker(), bufferSupplier(), fastLogger);

final StressSubscription<Long> subscription = new StressSubscription<>(bufferTimeoutSubscriber);

Expand Down Expand Up @@ -404,17 +445,71 @@ public void arbiter(LLL_Result result) {
result.r3 = subscription.requestsCount.get();

if (subscriber.onCompleteCalls.get() > 1) {
throw new IllegalStateException("unexpected completion " + subscriber.onCompleteCalls.get());
fail(fastLogger,
"unexpected completion " + subscriber.onCompleteCalls.get());
}
if (subscriber.concurrentOnComplete.get()) {
throw new IllegalStateException("subscriber concurrent onComplete");
fail(fastLogger, "subscriber concurrent onComplete");
}
if (subscriber.concurrentOnNext.get()) {
throw new IllegalStateException("subscriber concurrent onNext");
fail(fastLogger, "subscriber concurrent onNext");
}

if (!allValuesHandled(fastLogger, 2, subscriber.discardedValues, subscriber.receivedValues)) {
fail(fastLogger, "Not all handled!" + "; result=" + result);
}
}
}

private static void fail(FastLogger fastLogger, String msg) {
throw new IllegalStateException(msg + "\n" + fastLogger);
}

private static boolean allValuesHandled(FastLogger logger, int range, List<Object> discarded, List<List<Long>>... delivered) {
if (delivered.length == 0) {
return false;
}

List<Long> discardedValues = discarded.stream()
.map(o -> (Long) o)
.collect(Collectors.toList());

logger.trace("discarded: " + discardedValues);
logger.trace("delivered: " + Arrays.toString(delivered));

boolean[] search = new boolean[range];
for (long l : discardedValues) {
search[(int) l] = true;
}

List<List<Long>> all =
Arrays.stream(delivered)
.flatMap(lists -> lists.stream())
.collect(Collectors.toList());

for (List<Long> buf : all) {
if (buf.isEmpty()) {
fail(logger, "Received empty buffer!");
}
for (long l : buf) {
if (l >= range) {
// just check within the range
continue;
}
if (search[(int) l]) {
fail(logger, "Duplicate value (both discarded " +
"and delivered, or duplicated in multiple buffers)");
}
search[(int) l] = true;
}
}
for (boolean b : search) {
if (!b) {
return false;
}
}
return true;
}
private static Supplier<List<Long>> bufferSupplier() {
return ArrayList::new;
}
Expand Down