A Mono created from a Sinks.one() sporadically turns empty #3257

Closed
anenviousguest opened this issue Oct 27, 2022 · 3 comments · Fixed by #3260
Labels
type/bug A general bug
anenviousguest commented Oct 27, 2022

In our code, we create a Sinks.one() whose value is emitted from the onSuccess/onError handlers of some other Mono. The emitted value is never null.

Expected Behavior

We expect that the Mono view of that sink will never be an empty Mono, because the value emitted by the sink is non-null.

Actual Behavior

Contrary to our expectation, the Mono view of that sink sporadically turns empty. It happens on both Windows 10 and Ubuntu 20 machines, so it is likely not platform-specific.

Steps to Reproduce

Below is the shortest unit test I could come up with that reflects what we're doing in our production code. When this test is run from the IDE with the "repeat until failure" setting, it fails the assertion about a non-null Mono result quite consistently within 10-30 seconds of starting.

I also added some diagnostic logging around tryEmitValue / tryEmitError, and it looks like there is never an error, never a null value being emitted, and never an emission failure.

import java.util.concurrent.*;
import java.util.function.Consumer;

import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.*;
import reactor.core.scheduler.Schedulers;
import reactor.util.Loggers;

import static org.assertj.core.api.AssertionsForClassTypes.assertThat;

@Slf4j
class MonoPlaygroundTest {

    private static ExecutorService executor;

    static {
        executor = Executors.newCachedThreadPool();
        Loggers.useSl4jLoggers();
    }

    @Test
    void shouldConstructNonEmptyMono() {
        var sink = Sinks.<String>one();
        var scheduler = Schedulers.boundedElastic();

        Mono
            .just("value")
            .log(Loggers.getLogger("sink-setter"))
            .subscribeOn(scheduler)
            .doOnSuccess(tryEmitValueWithDebugLogging(sink))
            .doOnError(tryEmitErrorWithDebugLogging(sink))
            .subscribe();

        var result = sink
            .asMono()
            .log(Loggers.getLogger("sink-result-receiver"))
            // more map / flatMap here...
            .subscribeOn(scheduler)
            .block();

        assertThat(result).isNotNull();
    }

    private static Consumer<Throwable> tryEmitErrorWithDebugLogging(Sinks.One<String> sink) {
        return t -> {
            sink.tryEmitError(t);
            log.error("Was emitting an error: {}", t);
        };
    }

    private static Consumer<String> tryEmitValueWithDebugLogging(Sinks.One<String> sink) {
        return value -> {
            var result = sink.tryEmitValue(value);
            if (value == null) {
                log.error("Tried to emit null value");
            }
            if (result.isFailure()) {
                log.error("Failed to emit because of {}", result);
            }
        };
    }
}

The output from a failed run looks like this:

23:48:12.635 [pool-1-thread-1] INFO  sink-setter - | onSubscribe([Fuseable] Operators.MonoSubscriber)
23:48:12.635 [pool-1-thread-1] INFO  sink-setter - | request(unbounded)
23:48:12.635 [boundedElastic-1] INFO  sink-result-receiver - onSubscribe(SinkOneMulticast.NextInner)
23:48:12.635 [pool-1-thread-1] INFO  sink-setter - | onNext(value)
23:48:12.635 [boundedElastic-1] INFO  sink-result-receiver - request(unbounded)
23:48:12.635 [pool-1-thread-1] INFO  sink-setter - | onComplete()
23:48:12.635 [boundedElastic-1] INFO  sink-result-receiver - onComplete()

java.lang.AssertionError: 
Expecting actual not to be null

Possible Solution

Your Environment

  • Reactor version(s) used: 3.4.22
  • JVM version (java -version): openjdk version "11.0.16.1" 2022-07-19 LTS
  • OS and version (eg uname -a): Windows 10 / 21H2 build 19044.2130, also Linux 5.4.0-96-generic SMP x86_64 GNU/Linux
@reactorbot reactorbot added the ❓need-triage This issue needs triage, hasn't been looked at by a team member yet label Oct 27, 2022
@anenviousguest anenviousguest changed the title Mono created from a Sink.one sporadically turns empty A Mono created from a Sinks.one() sporadically turns empty Oct 28, 2022
@anenviousguest
Author

With a debugger, I finally nailed the root cause.

Why

By the time this line inside the sink's tryEmitValue is about to run, the sink is already "terminated" as far as the add implementation is concerned. At this point, however, its value is still null, because the this.value = value; line hasn't been executed yet.

Now, if someone subscribes to this sink's Mono view, there is a chance that this if block inside the subscribe method is bypassed, because the termination has already been performed, as described above.

So the execution path follows the else branch, where the value is still seen as null; hence the Mono view completes as an empty one.
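
The window described above can be modelled without Reactor. The following is a minimal sketch, not Reactor's actual SinkOneMulticast: the class ToyOneSink and its methods markTerminated, storeValue and observeAsLateSubscriber are hypothetical names, and the two emission steps are split into separate methods only so that the problematic interleaving can be replayed deterministically on a single thread.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;

/** Toy model only; NOT Reactor's implementation. All names are hypothetical. */
final class ToyOneSink<T> {
    private final AtomicBoolean terminated = new AtomicBoolean(false);
    private volatile T value;

    /** Emission step 1: flip the terminal flag. Returns false if already terminated. */
    boolean markTerminated() {
        return !terminated.getAndSet(true);
    }

    /** Emission step 2: store the value. In this model it happens AFTER step 1. */
    void storeValue(T v) {
        this.value = v;
    }

    /** A late subscriber: once terminated, read the value and treat null as "empty". */
    Optional<T> observeAsLateSubscriber() {
        if (!terminated.get()) {
            throw new IllegalStateException("not terminated yet; a real subscriber would wait");
        }
        return Optional.ofNullable(value);
    }
}

final class RaceWindowDemo {
    public static void main(String[] args) {
        ToyOneSink<String> sink = new ToyOneSink<>();
        sink.markTerminated();                                   // emitter: step 1
        Optional<String> seen = sink.observeAsLateSubscriber();  // subscriber sneaks in here
        sink.storeValue("value");                                // emitter: step 2
        System.out.println("subscriber saw: " + seen);           // prints "subscriber saw: Optional.empty"
        // A subscriber arriving after step 2 sees the value:
        System.out.println("later subscriber sees: " + sink.observeAsLateSubscriber());
    }
}
```

In the real code the two steps run back to back on the emitting thread, so the window is tiny, which would be consistent with the failures being rare and only showing up when emission and subscription happen on different threads.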

This in principle explains the behavior observed:

  • sporadic, relatively rare failures.
  • no failures with single-threaded execution - that is, if .subscribeOn calls in the test case from the original description are removed

Conclusions / Workarounds

To me this behavior directly contradicts the description given in Sinks documentation - unless this is only applicable to tryEmit... calls, which is also not mentioned directly:

Unless constructed through the unsafe() spec, these sinks are thread safe in the sense that they will detect concurrent access and fail fast on one of the attempts.

So, in the case above, no failure was reported; the Mono was simply empty.

As an immediate workaround, one could of course call .repeatWhenEmpty(...) on the Mono view of the sink, but this is of limited help, because it requires explicit knowledge of whether the values emitted from the sink could/should be empty or not.

I'm not sure what the "proper" fix for this might be. I can think of failing the subscribe call in such scenarios with some very specific error, so that in client code it's at least possible to do a dedicated retry.

Or perhaps make the subscribe call use the same lock as tryAcquire from SinkOneSerialized, which is the actual implementation returned by Sinks.one().
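
To illustrate the shared-guard idea in isolation, here is a minimal sketch under stated assumptions. It is not Reactor's SinkOneSerialized: GuardedOneSink, its betweenSteps hook and GuardedOneSinkDemo are hypothetical, and a blocking ReentrantLock is used purely for brevity. The hook deliberately holds the emitter inside the window between "mark terminated" and "store value" while another thread tries to observe the sink.

```java
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical toy model; not Reactor code. */
final class GuardedOneSink<T> {
    private final ReentrantLock guard = new ReentrantLock();
    private boolean terminated; // guarded by 'guard'
    private T value;            // guarded by 'guard'

    /** Both emission steps run under the guard; 'betweenSteps' is a test hook that widens the window. */
    boolean tryEmitValue(T v, Runnable betweenSteps) {
        guard.lock();
        try {
            if (terminated) {
                return false;
            }
            terminated = true;   // step 1
            betweenSteps.run();
            value = v;           // step 2
            return true;
        } finally {
            guard.unlock();
        }
    }

    /** The subscriber side takes the same guard, so it cannot run between step 1 and step 2. */
    Optional<T> observeAsLateSubscriber() {
        guard.lock();
        try {
            if (!terminated) {
                throw new IllegalStateException("not terminated yet");
            }
            return Optional.ofNullable(value);
        } finally {
            guard.unlock();
        }
    }
}

final class GuardedOneSinkDemo {
    /** Observes the sink while the emitter is parked between its two steps. */
    static Optional<String> observeDuringEmission() {
        GuardedOneSink<String> sink = new GuardedOneSink<>();
        CountDownLatch insideWindow = new CountDownLatch(1);
        Thread emitter = new Thread(() -> sink.tryEmitValue("value", () -> {
            insideWindow.countDown();
            sleepQuietly(50);
        }));
        emitter.start();
        try {
            insideWindow.await();                                    // emitter is now inside the window
            Optional<String> seen = sink.observeAsLateSubscriber();  // waits until the emitter releases the guard
            emitter.join();
            return seen;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("observed: " + observeDuringEmission()); // prints "observed: Optional[value]"
    }
}
```

A blocking lock on the subscribe path may not suit a non-blocking library, so this only shows that sharing one guard closes the window; it says nothing about how the library should actually implement it.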

@UgiR
Contributor

UgiR commented Oct 28, 2022

I was able to reproduce and was about to post something similar!

Essentially, it looks like a race between:

@SuppressWarnings("unchecked")
Inner<O>[] array = SUBSCRIBERS.getAndSet(this, TERMINATED);
if (array == TERMINATED) {
    return EmitResult.FAIL_TERMINATED;
}
this.value = value;

and:

O v = value;
if (v != null) {
    as.complete(v);
}
else {
    as.complete();
}

Even though the state is TERMINATED, nothing guarantees that this.value is written before it is read in the second snippet.

Here is a snippet to make it easier to reproduce:

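@Test
void reproduce3257() {
  var sink = Sinks.<String>one();
  // Run emission and subscription concurrently to hit the window between
  // the state change and the value write.
  RaceTestUtils.race(
    () -> sink.tryEmitValue("value"),
    () -> assertThat(sink.asMono().block()).isNotNull()
  );
}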
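
One way to rule out this kind of race in general is to publish the terminal state and the value in a single atomic step, so that no observer can see one without the other. The sketch below shows the idea on a toy class. It is not Reactor code, and it is not a description of what the linked fix does: AtomicOneSink, AtomicOneSinkDemo and countEmptyObservations are hypothetical names used for illustration only.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical toy sink; not Reactor code and not necessarily how the real fix works. */
final class AtomicOneSink<T> {
    // null means "still open"; non-null means "terminated", carrying the emitted value.
    private final AtomicReference<Optional<T>> terminal = new AtomicReference<>();

    /** Publishes the terminal state and the value in one atomic step. */
    boolean tryEmitValue(T v) {
        return terminal.compareAndSet(null, Optional.ofNullable(v));
    }

    boolean isTerminated() {
        return terminal.get() != null;
    }

    /** A late subscriber can never observe "terminated" without the value. */
    Optional<T> observeAsLateSubscriber() {
        Optional<T> t = terminal.get();
        if (t == null) {
            throw new IllegalStateException("not terminated yet");
        }
        return t;
    }
}

final class AtomicOneSinkDemo {
    /** Races an emitter against an observer; returns how often the observer saw an empty result. */
    static int countEmptyObservations(int rounds) {
        int empty = 0;
        for (int i = 0; i < rounds; i++) {
            AtomicOneSink<String> sink = new AtomicOneSink<>();
            Thread emitter = new Thread(() -> sink.tryEmitValue("value"));
            emitter.start();
            while (!sink.isTerminated()) {
                Thread.yield(); // spin until the terminal state becomes visible
            }
            if (!sink.observeAsLateSubscriber().isPresent()) {
                empty++;
            }
            try {
                emitter.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return empty;
    }

    public static void main(String[] args) {
        System.out.println("empty observations: " + countEmptyObservations(1_000)); // prints "empty observations: 0"
    }
}
```

Because the observer reads a single reference, the "terminated but value not yet written" state from the two snippets above cannot be represented in this model at all.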

@OlegDokuka OlegDokuka added type/bug A general bug and removed ❓need-triage This issue needs triage, hasn't been looked at by a team member yet labels Oct 30, 2022
@OlegDokuka OlegDokuka added this to the 3.4.25 milestone Oct 30, 2022
@simonbasle simonbasle linked a pull request Nov 3, 2022 that will close this issue
@simonbasle
Member

Fixed by #3260 / commit 849be9a
