Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clarify intent of rule 1.9 #488

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

NiteshKant
Copy link

Rule 1.9 defines that onSubscribe() on a Subscriber should be called before calling any other method on that Subscriber. However, this does not clarify that if onSubscribe() is signalled asynchronously then there should be a happens-before relationship between subscribe() and signalling of onSubscribe().
In absence of such a guarantee, non-final fields initialized in a constructor have no guarantee to be visible inside onSubscribe().

Considering a simple example:

public class UnsafeSubscriber implements Subscriber {
private boolean duplicateOnSubscribe = false;

@Override
public void onSubscribe(final Subscription s) {
    if (duplicateOnSubscribe) {
        throw new IllegalStateException("Duplicate onSubscribe() calls.");
    }
    duplicateOnSubscribe = true;
}

@Override
public void onNext(final String s) {

}

@Override
public void onError(final Throwable t) {

}

@Override
public void onComplete() {

}

}
If an UnsafeSubscriber instance is created in a different thread than the one that invokes onSubscribe() (true for an asynchronous Publisher), according to the java memory model, this statement inside onSubscribe():

if (duplicateOnSubscribe) {

is guaranteed to compute to false if and only if the instance is published safely between these threads. None of the rules in the specifications establish a happens-before relationship between Publisher#subscribe() and Subscriber#onSubscribe(). So, the usage above can be categorized as unsafe. In a more convoluted form, the assignment:

private boolean duplicateOnSubscribe = false;

can be interleaved with

duplicateOnSubscribe = true; such that duplicateOnSubscribe is set to false later.

Has this been considered before or am I missing something?

Fixes #486

Rule 1.9 defines that `onSubscribe()` on a `Subscriber` should be called before calling any other method on that `Subscriber`. However, this does not clarify that if `onSubscribe()` is signalled asynchronously then there should be a happens-before relationship between `subscribe()` and signalling of `onSubscribe()`.
In absence of such a guarantee, non-final fields initialized in a constructor have no guarantee to be visible inside `onSubscribe()`.

Considering a simple example:

public class UnsafeSubscriber implements Subscriber<String> {
    private boolean duplicateOnSubscribe = false;

    @OverRide
    public void onSubscribe(final Subscription s) {
        if (duplicateOnSubscribe) {
            throw new IllegalStateException("Duplicate onSubscribe() calls.");
        }
        duplicateOnSubscribe = true;
    }

    @OverRide
    public void onNext(final String s) {

    }

    @OverRide
    public void onError(final Throwable t) {

    }

    @OverRide
    public void onComplete() {

    }
}
If an UnsafeSubscriber instance is created in a different thread than the one that invokes onSubscribe() (true for an asynchronous Publisher), according to the java memory model, this statement inside onSubscribe():

if (duplicateOnSubscribe) {

is guaranteed to compute to false if and only if the instance is published safely between these threads. None of the rules in the specifications establish a happens-before relationship between Publisher#subscribe() and Subscriber#onSubscribe(). So, the usage above can be categorized as unsafe. In a more convoluted form, the assignment:

private boolean duplicateOnSubscribe = false;

can be interleaved with

duplicateOnSubscribe = true; such that duplicateOnSubscribe is set to false later.

Has this been considered before or am I missing something?

Fixes reactive-streams#486
@viktorklang
Copy link
Contributor

@NiteshKant Your Subscriber is not a valid subscriber since it violates 2:13: https://github.com/reactive-streams/reactive-streams-jvm#2.13

@NiteshKant
Copy link
Author

@viktorklang yikes, yes. Throwing IllegalArgumentException isn't the crux of the problem though. Once we get to an agreement on #486 about whether this PR make sense, I wil fix the commit message as:

if (duplicateOnSubscribe) {
        System.out.println("Duplicate onSubscribe() calls.");
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Happens-before relationship between Publisher#subscribe() and Subscriber.onSubscribe()?
2 participants