investigate flaky FluxRefCountGraceTest > raceSubscribeAndCancelNoTimeout raceTest #3639

Open
OlegDokuka opened this issue Nov 13, 2023 · 5 comments · May be fixed by #3707
Labels
status/need-investigation This needs more in-depth investigation type/bug A general bug type/test/flaky

Comments

@OlegDokuka
Contributor

FluxRefCountGraceTest > raceSubscribeAndCancelNoTimeout() FAILED
    java.lang.AssertionError: [signalCount2] 
    Expecting AtomicInteger(99999) to have value:
      100000
    but did not.
        at reactor.core.publisher.FluxRefCountGraceTest.raceSubscribeAndCancelNoTimeout(FluxRefCountGraceTest.java:299)
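
To help read the assertion above: the expected value of 100000 suggests the test races two subscribers repeatedly and counts the signals each one receives. Below is a minimal JDK-only sketch of that shape. It is not the implementation of RaceTestUtils, and `RaceHarness`, `race` and `countSignals` are hypothetical names. Here the two tasks only increment their counters, so the counts always match the number of rounds; in the real test each task subscribes to the shared Flux, which is where a signal can apparently go missing.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of a two-task race harness; not reactor-test code.
final class RaceHarness {

    // Releases both tasks from a shared latch so they start as close together
    // as the scheduler allows, then waits for both to finish.
    static void race(ExecutorService pool, Runnable first, Runnable second) {
        CountDownLatch start = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(2);
        for (Runnable task : new Runnable[] {first, second}) {
            pool.execute(() -> {
                try {
                    start.await();
                    task.run();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    finished.countDown();
                }
            });
        }
        start.countDown();
        try {
            if (!finished.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("race timed out");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while racing", e);
        }
    }

    // Races two counting tasks for the given number of rounds and returns
    // {signalCount1, signalCount2}. Each counter should equal `rounds`.
    static int[] countSignals(int rounds) {
        AtomicInteger signalCount1 = new AtomicInteger();
        AtomicInteger signalCount2 = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            for (int i = 0; i < rounds; i++) {
                race(pool, signalCount1::incrementAndGet, signalCount2::incrementAndGet);
            }
        } finally {
            pool.shutdownNow();
        }
        return new int[] {signalCount1.get(), signalCount2.get()};
    }
}
```

A count of 99999 in this shape would mean that exactly one round ended without the corresponding subscriber observing its signal.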
@chemicL
Member

chemicL commented Jan 11, 2024

Interestingly, the combination of replay with refCount might itself be problematic, as Simon pointed out in one summary. That reflection came a few years after this test was introduced to address #1260.

For reference, the output when the test fails:

14:14:32.349 [boundedElastic-1039] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.util.concurrent.CancellationException: Disconnected
Caused by: java.util.concurrent.CancellationException: Disconnected
	at reactor.core.publisher.FluxReplay$ReplaySubscriber.dispose(FluxReplay.java:1390)
	at reactor.core.publisher.FluxRefCountGrace.cancel(FluxRefCountGrace.java:133)
	at reactor.core.publisher.FluxRefCountGrace$RefCountInner.cancel(FluxRefCountGrace.java:314)
	at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:81)
	at reactor.core.publisher.FluxRefCountGrace$RefCountInner.onNext(FluxRefCountGrace.java:255)
	at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replayNormal(FluxReplay.java:877)
	at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replay(FluxReplay.java:965)
	at reactor.core.publisher.FluxReplay$ReplaySubscriber.dispose(FluxReplay.java:1395)
	at reactor.core.publisher.FluxRefCountGrace.cancel(FluxRefCountGrace.java:133)
	at reactor.core.publisher.FluxRefCountGrace$RefCountInner.cancel(FluxRefCountGrace.java:314)
	at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:81)
	at reactor.core.publisher.FluxRefCountGrace$RefCountInner.onNext(FluxRefCountGrace.java:255)
	at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replayNormal(FluxReplay.java:877)
	at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replay(FluxReplay.java:965)
	at reactor.core.publisher.FluxReplay$ReplaySubscriber.onNext(FluxReplay.java:1344)
	at reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:878)
	at reactor.core.publisher.FluxCreate$BufferAsyncSink.next(FluxCreate.java:803)
	at reactor.core.publisher.FluxCreate$SerializedFluxSink.next(FluxCreate.java:161)
	at reactor.core.publisher.FluxRefCountGraceTest.lambda$raceSubscribeAndCancelNoTimeout$25(FluxRefCountGraceTest.java:277)
	at reactor.core.publisher.FluxCreate.subscribe(FluxCreate.java:95)
	at reactor.core.publisher.FluxReplay.connect(FluxReplay.java:1141)
	at reactor.core.publisher.FluxRefCountGrace.subscribe(FluxRefCountGrace.java:100)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4490)
	at reactor.core.publisher.Mono.subscribeWith(Mono.java:4605)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4318)
	at reactor.core.publisher.FluxRefCountGraceTest.lambda$raceSubscribeAndCancelNoTimeout$27(FluxRefCountGraceTest.java:287)
	at reactor.test.util.RaceTestUtils.lambda$race$2(RaceTestUtils.java:168)
	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1583)
14:14:32.349 [boundedElastic-1040] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.util.concurrent.CancellationException: Disconnected
Caused by: java.util.concurrent.CancellationException: Disconnected
	at reactor.core.publisher.FluxReplay$ReplaySubscriber.dispose(FluxReplay.java:1390)
	at reactor.core.publisher.FluxRefCountGrace.cancel(FluxRefCountGrace.java:133)
	at reactor.core.publisher.FluxRefCountGrace$RefCountInner.cancel(FluxRefCountGrace.java:314)
	at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:81)
	at reactor.core.publisher.FluxRefCountGrace$RefCountInner.onNext(FluxRefCountGrace.java:255)
	at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replayNormal(FluxReplay.java:877)
	at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replay(FluxReplay.java:965)
	at reactor.core.publisher.FluxReplay$ReplaySubscriber.dispose(FluxReplay.java:1395)
	at reactor.core.publisher.FluxRefCountGrace.cancel(FluxRefCountGrace.java:133)
	at reactor.core.publisher.FluxRefCountGrace$RefCountInner.cancel(FluxRefCountGrace.java:314)
	at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:81)
	at reactor.core.publisher.FluxRefCountGrace$RefCountInner.onNext(FluxRefCountGrace.java:255)
	at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replayNormal(FluxReplay.java:877)
	at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replay(FluxReplay.java:965)
	at reactor.core.publisher.FluxReplay$ReplaySubscriber.onNext(FluxReplay.java:1344)
	at reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:878)
	at reactor.core.publisher.FluxCreate$BufferAsyncSink.next(FluxCreate.java:803)
	at reactor.core.publisher.FluxCreate$SerializedFluxSink.next(FluxCreate.java:161)
	at reactor.core.publisher.FluxRefCountGraceTest.lambda$raceSubscribeAndCancelNoTimeout$25(FluxRefCountGraceTest.java:277)
	at reactor.core.publisher.FluxCreate.subscribe(FluxCreate.java:95)
	at reactor.core.publisher.FluxReplay.connect(FluxReplay.java:1141)
	at reactor.core.publisher.FluxRefCountGrace.subscribe(FluxRefCountGrace.java:100)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4490)
	at reactor.core.publisher.Mono.subscribeWith(Mono.java:4605)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4318)
	at reactor.core.publisher.FluxRefCountGraceTest.lambda$raceSubscribeAndCancelNoTimeout$27(FluxRefCountGraceTest.java:287)
	at reactor.test.util.RaceTestUtils.lambda$race$2(RaceTestUtils.java:168)
	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1583)

java.lang.AssertionError: [signalCount2] 
Expecting AtomicInteger(99998) to have value:
  100000
but did not.

	at reactor.core.publisher.FluxRefCountGraceTest.raceSubscribeAndCancelNoTimeout(FluxRefCountGraceTest.java:299)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)

It looks like, in some racy interleaving, both Subscribers of FluxRefCountGrace receive the same dropped error caused by cancellation. What is unexpected, though, is that both trigger FluxReplay.connect, while the assumption is that only one subscriber makes that call.
Perhaps the race is this: while the first subscriber is already cancelling and the FluxReplay subscription is being disposed, the next subscriber arrives, notices that the connection reports it is disposed, and initiates a new connection, which causes the issue.
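
To make the suspected interleaving concrete, here is a simplified JDK-only model of a check-then-act race. It is only a sketch under assumptions: `ConnectionModel`, `RefCountModel` and `RaceModelDemo` are hypothetical and do not mirror the actual FluxRefCountGrace or FluxReplay code, and the root cause has not been confirmed. The model replays one fixed ordering in which two subscribers observe the same connection before it is disposed, then compares an unguarded reconnect with one guarded by compareAndSet.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for a shared connection; not a reactor-core type.
final class ConnectionModel {
    private volatile boolean disposed;

    void dispose() { disposed = true; }

    boolean isDisposed() { return disposed; }
}

// Hypothetical stand-in for the operator that owns the current connection.
final class RefCountModel {
    final AtomicReference<ConnectionModel> current =
            new AtomicReference<>(new ConnectionModel());
    final AtomicInteger connects = new AtomicInteger();

    // Step 1 of a subscribe: read the shared connection.
    ConnectionModel observe() { return current.get(); }

    // Step 2, unguarded: every caller that saw a disposed connection reconnects.
    void reconnectUnguarded(ConnectionModel observed) {
        if (observed.isDisposed()) {
            current.set(new ConnectionModel());
            connects.incrementAndGet();
        }
    }

    // Step 2, guarded: only the caller whose compareAndSet succeeds reconnects.
    void reconnectGuarded(ConnectionModel observed) {
        if (observed.isDisposed()
                && current.compareAndSet(observed, new ConnectionModel())) {
            connects.incrementAndGet();
        }
    }
}

final class RaceModelDemo {
    // Replays one fixed interleaving: both subscribers observe the connection,
    // a cancellation disposes it, then both attempt to reconnect.
    static int connectsFor(boolean guarded) {
        RefCountModel model = new RefCountModel();
        ConnectionModel seenByFirst = model.observe();
        ConnectionModel seenBySecond = model.observe();
        seenByFirst.dispose(); // same instance as seenBySecond
        if (guarded) {
            model.reconnectGuarded(seenByFirst);
            model.reconnectGuarded(seenBySecond);
        } else {
            model.reconnectUnguarded(seenByFirst);
            model.reconnectUnguarded(seenBySecond);
        }
        return model.connects.get();
    }

    public static void main(String[] args) {
        System.out.println("unguarded connects: " + connectsFor(false));
        System.out.println("guarded connects: " + connectsFor(true));
    }
}
```

In this model the unguarded variant connects twice and the guarded variant connects once. Whether the operator's real state handling has an equivalent gap is exactly what needs investigating.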

@chemicL
Member

chemicL commented Jan 12, 2024

For debugging locally, I added

new RuntimeException("onError called!").printStackTrace();

to FluxRefCountGrace$RefCountInner.onError and got this during a failing run:

java.lang.RuntimeException: onError called!
	at reactor.core.publisher.FluxRefCountGrace$RefCountInner.onError(FluxRefCountGrace.java:270)
	at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replayNormal(FluxReplay.java:869)
	at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replay(FluxReplay.java:970)
	at reactor.core.publisher.FluxReplay$ReplaySubscriber.dispose(FluxReplay.java:1405)
	at reactor.core.publisher.FluxRefCountGrace.cancel(FluxRefCountGrace.java:143)
	at reactor.core.publisher.FluxRefCountGrace$RefCountInner.cancel(FluxRefCountGrace.java:325)
	at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:81)
	at reactor.core.publisher.FluxRefCountGrace$RefCountInner.onNext(FluxRefCountGrace.java:265)
	at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replayNormal(FluxReplay.java:881)
	at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replay(FluxReplay.java:970)
	at reactor.core.publisher.FluxReplay$ReplaySubscriber.dispose(FluxReplay.java:1405)
	at reactor.core.publisher.FluxRefCountGrace.cancel(FluxRefCountGrace.java:143)
	at reactor.core.publisher.FluxRefCountGrace$RefCountInner.cancel(FluxRefCountGrace.java:325)
	at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:81)
	at reactor.core.publisher.FluxRefCountGrace$RefCountInner.onNext(FluxRefCountGrace.java:265)
	at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replayNormal(FluxReplay.java:881)
	at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replay(FluxReplay.java:970)
	at reactor.core.publisher.FluxReplay$ReplaySubscriber.onNext(FluxReplay.java:1353)
	at reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:880)
	at reactor.core.publisher.FluxCreate$BufferAsyncSink.next(FluxCreate.java:805)
	at reactor.core.publisher.FluxCreate$SerializedFluxSink.next(FluxCreate.java:163)
	at reactor.core.publisher.FluxRefCountGraceTest.lambda$raceSubscribeAndCancelNoTimeout$25(FluxRefCountGraceTest.java:277)
	at reactor.core.publisher.FluxCreate.subscribe(FluxCreate.java:97)
	at reactor.core.publisher.FluxReplay.connect(FluxReplay.java:1149)
	at reactor.core.publisher.FluxRefCountGrace.subscribe(FluxRefCountGrace.java:102)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4512)
	at reactor.core.publisher.Mono.subscribeWith(Mono.java:4578)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4339)
	at reactor.core.publisher.FluxRefCountGraceTest.lambda$raceSubscribeAndCancelNoTimeout$29(FluxRefCountGraceTest.java:293)
	at reactor.test.util.RaceTestUtils.lambda$race$2(RaceTestUtils.java:180)
	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1583)
java.lang.RuntimeException: onError called!
	at reactor.core.publisher.FluxRefCountGrace$RefCountInner.onError(FluxRefCountGrace.java:270)
	at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replayNormal(FluxReplay.java:869)
	at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replay(FluxReplay.java:970)
	at reactor.core.publisher.FluxReplay$ReplayInner.request(FluxReplay.java:1721)
	at reactor.core.publisher.FluxRefCountGrace$RefCountInner.request(FluxRefCountGrace.java:311)
	at reactor.core.publisher.MonoNext$NextSubscriber.request(MonoNext.java:108)
	at reactor.core.publisher.FluxPeek$PeekSubscriber.request(FluxPeek.java:138)
	at reactor.core.publisher.LambdaMonoSubscriber.onSubscribe(LambdaMonoSubscriber.java:121)
	at reactor.core.publisher.FluxPeek$PeekSubscriber.onSubscribe(FluxPeek.java:171)
	at reactor.core.publisher.MonoNext$NextSubscriber.onSubscribe(MonoNext.java:70)
	at reactor.core.publisher.FluxRefCountGrace$RefCountInner.setRefConnection(FluxRefCountGrace.java:236)
	at reactor.core.publisher.FluxRefCountGrace.subscribe(FluxRefCountGrace.java:98)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4512)
	at reactor.core.publisher.Mono.subscribeWith(Mono.java:4578)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4339)
	at reactor.core.publisher.FluxRefCountGraceTest.lambda$raceSubscribeAndCancelNoTimeout$27(FluxRefCountGraceTest.java:287)
	at reactor.test.util.RaceTestUtils.lambda$race$2(RaceTestUtils.java:180)
	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1583)
09:12:32.364 [boundedElastic-71] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.util.concurrent.CancellationException: Disconnected
Caused by: java.util.concurrent.CancellationException: Disconnected
	at reactor.core.publisher.FluxReplay$ReplaySubscriber.dispose(FluxReplay.java:1399)
	at reactor.core.publisher.FluxRefCountGrace.cancel(FluxRefCountGrace.java:143)
	at reactor.core.publisher.FluxRefCountGrace$RefCountInner.cancel(FluxRefCountGrace.java:325)
	at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:81)
	at reactor.core.publisher.FluxRefCountGrace$RefCountInner.onNext(FluxRefCountGrace.java:265)
	at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replayNormal(FluxReplay.java:881)
	at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replay(FluxReplay.java:970)
	at reactor.core.publisher.FluxReplay$ReplaySubscriber.dispose(FluxReplay.java:1405)
	at reactor.core.publisher.FluxRefCountGrace.cancel(FluxRefCountGrace.java:143)
	at reactor.core.publisher.FluxRefCountGrace$RefCountInner.cancel(FluxRefCountGrace.java:325)
	at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:81)
	at reactor.core.publisher.FluxRefCountGrace$RefCountInner.onNext(FluxRefCountGrace.java:265)
	at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replayNormal(FluxReplay.java:881)
	at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replay(FluxReplay.java:970)
	at reactor.core.publisher.FluxReplay$ReplaySubscriber.onNext(FluxReplay.java:1353)
	at reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:880)
	at reactor.core.publisher.FluxCreate$BufferAsyncSink.next(FluxCreate.java:805)
	at reactor.core.publisher.FluxCreate$SerializedFluxSink.next(FluxCreate.java:163)
	at reactor.core.publisher.FluxRefCountGraceTest.lambda$raceSubscribeAndCancelNoTimeout$25(FluxRefCountGraceTest.java:277)
	at reactor.core.publisher.FluxCreate.subscribe(FluxCreate.java:97)
	at reactor.core.publisher.FluxReplay.connect(FluxReplay.java:1149)
	at reactor.core.publisher.FluxRefCountGrace.subscribe(FluxRefCountGrace.java:102)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4512)
	at reactor.core.publisher.Mono.subscribeWith(Mono.java:4578)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4339)
	at reactor.core.publisher.FluxRefCountGraceTest.lambda$raceSubscribeAndCancelNoTimeout$29(FluxRefCountGraceTest.java:293)
	at reactor.test.util.RaceTestUtils.lambda$race$2(RaceTestUtils.java:180)
	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1583)
java.lang.RuntimeException: onError called!
09:12:32.364 [boundedElastic-72] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.util.concurrent.CancellationException: Disconnected
Caused by: java.util.concurrent.CancellationException: Disconnected
	at reactor.core.publisher.FluxReplay$ReplaySubscriber.dispose(FluxReplay.java:1399)
	at reactor.core.publisher.FluxRefCountGrace.cancel(FluxRefCountGrace.java:143)
	at reactor.core.publisher.FluxRefCountGrace$RefCountInner.cancel(FluxRefCountGrace.java:325)
	at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:81)
	at reactor.core.publisher.FluxRefCountGrace$RefCountInner.onNext(FluxRefCountGrace.java:265)
	at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replayNormal(FluxReplay.java:881)
	at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replay(FluxReplay.java:970)
	at reactor.core.publisher.FluxReplay$ReplaySubscriber.dispose(FluxReplay.java:1405)
	at reactor.core.publisher.FluxRefCountGrace.cancel(FluxRefCountGrace.java:143)
	at reactor.core.publisher.FluxRefCountGrace$RefCountInner.cancel(FluxRefCountGrace.java:325)
	at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:81)
	at reactor.core.publisher.FluxRefCountGrace$RefCountInner.onNext(FluxRefCountGrace.java:265)
	at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replayNormal(FluxReplay.java:881)
	at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replay(FluxReplay.java:970)
	at reactor.core.publisher.FluxReplay$ReplaySubscriber.onNext(FluxReplay.java:1353)
	at reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:880)
	at reactor.core.publisher.FluxCreate$BufferAsyncSink.next(FluxCreate.java:805)
	at reactor.core.publisher.FluxCreate$SerializedFluxSink.next(FluxCreate.java:163)
	at reactor.core.publisher.FluxRefCountGraceTest.lambda$raceSubscribeAndCancelNoTimeout$25(FluxRefCountGraceTest.java:277)
	at reactor.core.publisher.FluxCreate.subscribe(FluxCreate.java:97)
	at reactor.core.publisher.FluxReplay.connect(FluxReplay.java:1149)
	at reactor.core.publisher.FluxRefCountGrace.subscribe(FluxRefCountGrace.java:102)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4512)
	at reactor.core.publisher.Mono.subscribeWith(Mono.java:4578)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4339)
	at reactor.core.publisher.FluxRefCountGraceTest.lambda$raceSubscribeAndCancelNoTimeout$29(FluxRefCountGraceTest.java:293)
	at reactor.test.util.RaceTestUtils.lambda$race$2(RaceTestUtils.java:180)
	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1583)
	at reactor.core.publisher.FluxRefCountGrace$RefCountInner.onError(FluxRefCountGrace.java:270)
	at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replayNormal(FluxReplay.java:869)
	at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replay(FluxReplay.java:970)
	at reactor.core.publisher.FluxReplay$ReplaySubscriber.dispose(FluxReplay.java:1405)
	at reactor.core.publisher.FluxRefCountGrace.cancel(FluxRefCountGrace.java:143)
	at reactor.core.publisher.FluxRefCountGrace$RefCountInner.cancel(FluxRefCountGrace.java:325)
	at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:81)
	at reactor.core.publisher.FluxRefCountGrace$RefCountInner.onNext(FluxRefCountGrace.java:265)
	at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replayNormal(FluxReplay.java:881)
	at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replay(FluxReplay.java:970)
	at reactor.core.publisher.FluxReplay$ReplaySubscriber.dispose(FluxReplay.java:1405)
	at reactor.core.publisher.FluxRefCountGrace.cancel(FluxRefCountGrace.java:143)
	at reactor.core.publisher.FluxRefCountGrace$RefCountInner.cancel(FluxRefCountGrace.java:325)
	at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:81)
	at reactor.core.publisher.FluxRefCountGrace$RefCountInner.onNext(FluxRefCountGrace.java:265)
	at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replayNormal(FluxReplay.java:881)
	at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replay(FluxReplay.java:970)
	at reactor.core.publisher.FluxReplay$ReplaySubscriber.onNext(FluxReplay.java:1353)
	at reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:880)
	at reactor.core.publisher.FluxCreate$BufferAsyncSink.next(FluxCreate.java:805)
	at reactor.core.publisher.FluxCreate$SerializedFluxSink.next(FluxCreate.java:163)
	at reactor.core.publisher.FluxRefCountGraceTest.lambda$raceSubscribeAndCancelNoTimeout$25(FluxRefCountGraceTest.java:277)
	at reactor.core.publisher.FluxCreate.subscribe(FluxCreate.java:97)
	at reactor.core.publisher.FluxReplay.connect(FluxReplay.java:1149)
	at reactor.core.publisher.FluxRefCountGrace.subscribe(FluxRefCountGrace.java:102)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4512)
	at reactor.core.publisher.Mono.subscribeWith(Mono.java:4578)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4339)
	at reactor.core.publisher.FluxRefCountGraceTest.lambda$raceSubscribeAndCancelNoTimeout$29(FluxRefCountGraceTest.java:293)
	at reactor.test.util.RaceTestUtils.lambda$race$2(RaceTestUtils.java:180)
	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1583)
09:12:32.366 [boundedElastic-72] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.util.concurrent.CancellationException: Disconnected
Caused by: java.util.concurrent.CancellationException: Disconnected
	at reactor.core.publisher.FluxReplay$ReplaySubscriber.dispose(FluxReplay.java:1399)
	at reactor.core.publisher.FluxRefCountGrace.cancel(FluxRefCountGrace.java:143)
	at reactor.core.publisher.FluxRefCountGrace$RefCountInner.cancel(FluxRefCountGrace.java:325)
	at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:81)
	at reactor.core.publisher.FluxRefCountGrace$RefCountInner.onNext(FluxRefCountGrace.java:265)
	at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replayNormal(FluxReplay.java:881)
	at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replay(FluxReplay.java:970)
	at reactor.core.publisher.FluxReplay$ReplaySubscriber.dispose(FluxReplay.java:1405)
	at reactor.core.publisher.FluxRefCountGrace.cancel(FluxRefCountGrace.java:143)
	at reactor.core.publisher.FluxRefCountGrace$RefCountInner.cancel(FluxRefCountGrace.java:325)
	at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:81)
	at reactor.core.publisher.FluxRefCountGrace$RefCountInner.onNext(FluxRefCountGrace.java:265)
	at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replayNormal(FluxReplay.java:881)
	at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replay(FluxReplay.java:970)
	at reactor.core.publisher.FluxReplay$ReplaySubscriber.onNext(FluxReplay.java:1353)
	at reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:880)
	at reactor.core.publisher.FluxCreate$BufferAsyncSink.next(FluxCreate.java:805)
	at reactor.core.publisher.FluxCreate$SerializedFluxSink.next(FluxCreate.java:163)
	at reactor.core.publisher.FluxRefCountGraceTest.lambda$raceSubscribeAndCancelNoTimeout$25(FluxRefCountGraceTest.java:277)
	at reactor.core.publisher.FluxCreate.subscribe(FluxCreate.java:97)
	at reactor.core.publisher.FluxReplay.connect(FluxReplay.java:1149)
	at reactor.core.publisher.FluxRefCountGrace.subscribe(FluxRefCountGrace.java:102)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4512)
	at reactor.core.publisher.Mono.subscribeWith(Mono.java:4578)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4339)
	at reactor.core.publisher.FluxRefCountGraceTest.lambda$raceSubscribeAndCancelNoTimeout$29(FluxRefCountGraceTest.java:293)
	at reactor.test.util.RaceTestUtils.lambda$race$2(RaceTestUtils.java:180)
	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1583)

java.lang.AssertionError: [signalCount1] 
Expecting AtomicInteger(99997) to have value:
  100000
but did not.

	at reactor.core.publisher.FluxRefCountGraceTest.raceSubscribeAndCancelNoTimeout(FluxRefCountGraceTest.java:298)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
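
Since the probe output above is interleaved with the logger output, a variant of the one-line probe that tags each trace with the calling thread may make such runs easier to attribute. The following is a minimal JDK-only sketch; `TraceProbe`, `capture` and `dump` are hypothetical helper names, not part of reactor-core. Note that it does not by itself prevent interleaving with output written by a logging framework.

```java
import java.io.PrintWriter;
import java.io.StringWriter;

// Hypothetical debugging helper; not part of reactor-core.
final class TraceProbe {

    // Renders the current call stack into one string, prefixed with the
    // name of the calling thread and the given label.
    static String capture(String label) {
        StringWriter buffer = new StringWriter();
        PrintWriter out = new PrintWriter(buffer);
        out.println("[" + Thread.currentThread().getName() + "] " + label);
        new RuntimeException(label).printStackTrace(out);
        out.flush();
        return buffer.toString();
    }

    // Writes the whole rendered trace to stderr with a single print call.
    static void dump(String label) {
        System.err.print(capture(label));
    }
}
```

Calling `TraceProbe.dump("onError called!")` in place of the original one-liner would produce the same stack, preceded by a line naming the thread that hit the probe.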


@MikkelHJuul

I just wanted to chip in.

While making my changes to the replay buffer, I found a small detail that, when not implemented properly in the ReplayBuffer, causes these tests (this one and similar ones across the project) to fail.

See L950:
https://github.com/MikkelHJuul/reactor-core/blob/issues/3340-object-array/reactor-core/src/test/java/reactor/core/publisher/FluxReplayTest.java

@chemicL
Member

chemicL commented Feb 19, 2024

Thanks @MikkelHJuul. Please have a look at the linked PR (#3707). I suppose it incorporates the same cancellation validation, although it uses Flux#next(), which implicitly cancels the source.
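
For readers unfamiliar with the "implicitly cancels" part: a take-the-first-element subscriber cancels its upstream from inside onNext, which is also visible in the traces above (MonoNext$NextSubscriber.onNext calling RefCountInner.cancel). Here is a rough sketch of that pattern using the JDK's java.util.concurrent.Flow interfaces rather than Reactor. `FirstOnlySubscriber` and `CountingSource` are hypothetical, the unbounded request is an assumption, and the toy source ignores the requested amount for brevity.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical "first element only" subscriber; not Reactor's MonoNext.
final class FirstOnlySubscriber<T> implements Flow.Subscriber<T> {
    final AtomicReference<T> first = new AtomicReference<>();
    private Flow.Subscription upstream;
    private boolean done;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        upstream = subscription;
        subscription.request(Long.MAX_VALUE); // assumption: unbounded demand
    }

    @Override
    public void onNext(T item) {
        if (done) {
            return;
        }
        done = true;
        first.set(item);
        upstream.cancel(); // the implicit cancellation of the source
    }

    @Override
    public void onError(Throwable error) { done = true; }

    @Override
    public void onComplete() { done = true; }
}

// Toy synchronous source that tries to emit 1, 2, 3 and records cancellation.
final class CountingSource implements Flow.Publisher<Integer> {
    final AtomicBoolean cancelled = new AtomicBoolean();
    final AtomicInteger emitted = new AtomicInteger();

    @Override
    public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
        subscriber.onSubscribe(new Flow.Subscription() {
            private boolean started;

            @Override
            public void request(long n) {
                if (started) {
                    return;
                }
                started = true;
                // Simplification: the requested amount n is not tracked.
                for (int i = 1; i <= 3 && !cancelled.get(); i++) {
                    emitted.incrementAndGet();
                    subscriber.onNext(i);
                }
                if (!cancelled.get()) {
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() { cancelled.set(true); }
        });
    }
}
```

Subscribing a `FirstOnlySubscriber` to a `CountingSource` leaves the source cancelled after a single emission, so any cancellation handling upstream runs re-entrantly while the source is still inside its emission loop.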

@MikkelHJuul

@chemicL - I'm not that deeply invested in this part of these interactions, but it looks and sounds reasonable.


3 participants