Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flaky test - FluxBlackboxProcessorVerification #3742

Open
chemicL opened this issue Mar 7, 2024 · 6 comments
Open

Flaky test - FluxBlackboxProcessorVerification #3742

chemicL opened this issue Mar 7, 2024 · 6 comments
Assignees
Milestone

Comments

@chemicL
Copy link
Member

chemicL commented Mar 7, 2024

The following test failed during CI in 3.4.x

Gradle suite > Gradle test > reactor.core.publisher.tck.FluxBlackboxProcessorVerification > required_spec312_cancelMustMakeThePublisherToEventuallyStopSignaling FAILED
    java.lang.AssertionError: Publisher signalled [16] elements, which is more than the signalled demand: 15 expected [true] but found [false]
        at org.testng.Assert.fail(Assert.java:110)
        at org.testng.Assert.failNotEquals(Assert.java:1413)
        at org.testng.Assert.assertTrue(Assert.java:56)
        at org.reactivestreams.tck.PublisherVerification$25.run(PublisherVerification.java:978)
        at org.reactivestreams.tck.PublisherVerification.activePublisherTest(PublisherVerification.java:1135)
        at org.reactivestreams.tck.PublisherVerification.required_spec312_cancelMustMakeThePublisherToEventuallyStopSignaling(PublisherVerification.java:936)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.testng.internal.invokers.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:136)
        at org.testng.internal.invokers.TestInvoker.invokeMethod(TestInvoker.java:658)
        at org.testng.internal.invokers.TestInvoker.invokeTestMethod(TestInvoker.java:219)
        at org.testng.internal.invokers.MethodRunner.runInSequence(MethodRunner.java:50)
        at org.testng.internal.invokers.TestInvoker$MethodInvocationAgent.invoke(TestInvoker.java:923)
        at org.testng.internal.invokers.TestInvoker.invokeTestMethods(TestInvoker.java:192)
        at org.testng.internal.invokers.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:146)
        at org.testng.internal.invokers.TestMethodWorker.run(TestMethodWorker.java:128)
        at java.util.ArrayList.forEach(ArrayList.java:1259)
        at org.testng.TestRunner.privateRun(TestRunner.java:808)
        at org.testng.TestRunner.run(TestRunner.java:603)
        at org.testng.SuiteRunner.runTest(SuiteRunner.java:429)
        at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:423)
        at org.testng.SuiteRunner.privateRun(SuiteRunner.java:383)
        at org.testng.SuiteRunner.run(SuiteRunner.java:326)
        at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:52)
        at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:95)
        at org.testng.TestNG.runSuitesSequentially(TestNG.java:1249)
        at org.testng.TestNG.runSuitesLocally(TestNG.java:1169)
        at org.testng.TestNG.runSuites(TestNG.java:1092)
        at org.testng.TestNG.run(TestNG.java:[1060](https://github.com/reactor/reactor-core/actions/runs/8186085531/job/22383745779#step:5:1062))
        at org.gradle.api.internal.tasks.testing.testng.TestNGTestClassProcessor.runTests(TestNGTestClassProcessor.java:146)
        at org.gradle.api.internal.tasks.testing.testng.TestNGTestClassProcessor.stop(TestNGTestClassProcessor.java:91)
        at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.stop(SuiteTestClassProcessor.java:62)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
        at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
        at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
        at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
        at com.sun.proxy.$Proxy2.stop(Unknown Source)
        at org.gradle.api.internal.tasks.testing.worker.TestWorker$3.run(TestWorker.java:193)
        at org.gradle.api.internal.tasks.testing.worker.TestWorker.executeAndMaintainThreadName(TestWorker.java:129)
        at org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:100)
        at org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:60)
        at org.gradle.process.internal.worker.child.ActionExecutionWorker.execute(ActionExecutionWorker.java:56)
        at org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:113)
        at org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:65)
        at worker.org.gradle.process.internal.worker.GradleWorkerMain.run(GradleWorkerMain.java:69)
        at worker.org.gradle.process.internal.worker.GradleWorkerMain.main(GradleWorkerMain.java:74)
Test class reactor.core.publisher.tck.FluxGenerateVerification
Test class reactor.core.publisher.tck.FluxSwitchOnFirstVerification

Your Environment

  • Reactor version(s) used: 3.4.36-SNAPSHOT
  • JVM version (java -version): Java_Temurin-Hotspot_jdk/8.0.402-6/x64
  • OS and version (eg uname -a): Ubuntu 20.04.6
@chemicL chemicL added this to the 3.4.x Backlog milestone Mar 7, 2024
@injae-kim
Copy link
Contributor

May I handle this issue?

@chemicL
Copy link
Member Author

chemicL commented Mar 11, 2024

Be my guest @injae-kim :)

@injae-kim
Copy link
Contributor

Thank you! I'll investigate the cause of failed test and create fix PR soon :)

@injae-kim
Copy link
Contributor

injae-kim commented Mar 18, 2024

  @Override @Test
  public void required_spec312_cancelMustMakeThePublisherToEventuallyStopSignaling() throws Throwable {
    // the publisher is able to signal more elements than the subscriber will be requesting in total
    final int publisherElements = 20;

    final int demand1 = 10;
    final int demand2 = 5;
    final int totalDemand = demand1 + demand2;

    activePublisherTest(publisherElements, false, new PublisherTestRun<T>() {
      @Override @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
      public void run(Publisher<T> pub) throws Throwable {
        final ManualSubscriber<T> sub = env.newManualSubscriber(pub);

        sub.request(demand1);
        sub.request(demand2);
        sub.nextElement();
        sub.cancel();
...
          // if the Publisher tries to emit more elements than was requested (and/or ignores cancellation) this will throw
          assertTrue(onNextsSignalled <= totalDemand,
                     String.format("Publisher signalled [%d] elements, which is more than the signalled demand: %d",
                                   onNextsSignalled, totalDemand));

I checked that required_spec312_cancelMustMakeThePublisherToEventuallyStopSignaling TCK test requests 15 elements to upstream and then cancel it, finally assertThat(onNextsSignalled <= 15)

image

return f.publishOn(sharedGroup)
.parallel(2)
.groups()
.flatMap(stream -> stream.publishOn(asyncGroup)

(I'm not sure) but on our FluxBlackboxProcessorVerification with parallel, I think publisher can produce element that larger than 15 as you can see on above screen shot.

So I think publisher can produce total 16 items (e.g. 1 -> 3 -> 6 -> .. -> 20 (15th) -> Complete(16th)) and TCK test can be failed!

So I suggest to fix our FluxBlackboxProcessorVerification#ftransformFlux to always produce 15 items properly. WDYT?

@chemicL
Copy link
Member Author

chemicL commented Mar 25, 2024

It looks like it's not the test configuration that is broken but some operator along doesn't honour the demand. If the TCK's Subscriber demands 15 items, any configuration that we come up with should not deliver more items. So it seems that at least one operator along the way is broken with that regard. Instead of changing the pipeline to avoid this problematic outcome, perhaps it's best identifying which operator delivers more than was demanded. Some race occurs which doesn't properly synchronise delivery with demand.

@injae-kim
Copy link
Contributor

injae-kim commented Mar 25, 2024

It's best identifying which operator delivers more than was demanded. Some race occurs which doesn't properly synchronise delivery with demand.

I agree! good point. I think I found some clue on above investigation, so I'll check it more and share the root cause to you soon.

Thank you for your guide!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

2 participants