Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

provides clarification on the 1.9 rule #494

Open
wants to merge 2 commits into
base: master
Choose a base branch
from

Conversation

OlegDokuka
Copy link
Member

@OlegDokuka OlegDokuka commented Jul 24, 2020

This PR changes:

  1. Spec wording to clearly state that onSubscribe must be called synchronously within the subscribe method
  2. Fixes TCK tests to ensure subscription is delivered present right after the subscribe method call
  3. Fixes AsyncIterablePublisher to cal onSubscribe immediately with no offloading to the scheduler

closes #486

@OlegDokuka OlegDokuka force-pushed the master branch 2 times, most recently from 43eb7f9 to 0e3ca22 Compare July 24, 2020 09:40
@OlegDokuka OlegDokuka force-pushed the master branch 2 times, most recently from 9e727f9 to 352b397 Compare July 24, 2020 13:11
@OlegDokuka
Copy link
Member Author

It seems that SubmissionPublisher violates rule 1.9 :D

@DougLea
Copy link
Contributor

DougLea commented Jul 24, 2020

@OlegDokuka Can you describe the failed test?

@OlegDokuka
Copy link
Member Author

OlegDokuka commented Jul 24, 2020

The idea is to mandate calling onSubscribe within the subscribe method synchronously (on the caller thread).

Due to my findings, SubmissionPublisher always offload that call to the executor, which makes that process asynchronous

final void onSubscribe() {
   startOnSignal(RUN | ACTIVE);
}

final void startOnSignal(int bits) {
   if ((ctl & bits) != bits &&
          (getAndBitwiseOrCtl(bits) & (RUN | CLOSED)) == 0)
       tryStart();
}

/**
 * Tries to start consumer task. Sets error state on failure.
 */
final void tryStart() {
   try {
      Executor e;
      ConsumerTask<T> task = new ConsumerTask<T>(this);
      if ((e = executor) != null)   // skip if disabled on error
         e.execute(task);
   } catch (RuntimeException | Error ex) {
      getAndBitwiseOrCtl(ERROR | CLOSED);
      throw ex;
   }
}

@akarnokd
Copy link
Contributor

This is a drastic change and I don't understand why you'd want to add this limitation.

@OlegDokuka
Copy link
Member Author

OlegDokuka commented Jul 24, 2020

@akarnokd I guess that was a drastic misunderstanding of how it should work.

Imagine that .subscribe(subscriber) now offloads onSubscribe as a task to a scheduler which never executes this task because it hanged for any reason. How do you cancel that one?

The correct answer - nohow. Because we neither know which worker responsible for the execution, nor allowed to terminate the whole associated Pool

Along, Subscriber will never be called with onSubscribe even though the subscribe method was called so we have the assumption that Publisher is subscribed now. That would lead to the fact that we expect that subscription happened but now there is no ability to cancel execution (e.g. .timeout logic) and terminate associated resources (right we do not have Subscription, because it was offloaded as task which is hanged). Also, detecting such bugs - is a headache.

From my standpoint that is a clear violation of the spec and reactive-streams should not work that way.

This PR just clarifies the 1.9 rule and fixes such misbehavior

@akarnokd
Copy link
Contributor

If this is a failure mode for your use case, don't use an operator that schedules onSubscribe. Also thete is a limit on how much infrastructure failures we can work aroud, i.e. can't do much when trying to schedule with a shut down executor for example.

I don't support this change to the spec/tck.

Instead let's look at the original issue and have a workaround/solution there.

@OlegDokuka
Copy link
Member Author

OlegDokuka commented Jul 24, 2020

Well. Sounds like solving symptoms rather than solving the root cause.
We have already fixed that. My intent is to get rid of such a mistake in the future.

And yeah - the workaround is simple - do not offload anything related to the subscription phase at all. Call onSubscribe synchronously - that is it

@akarnokd
Copy link
Contributor

Your solution breaks at least one established implementation that passed before. I'd also think this could break Akka Stream as they tend to schedule all sorts of signals.

@OlegDokuka
Copy link
Member Author

OlegDokuka commented Jul 24, 2020

I'm not pushing to merge it right now. That discussion started at #486 and ended saying that we do not have to offload the onSubscribe call.

If we know that there is an extensive number of libraries using that - let's discuss how to bring that breaking clarification up, so all the vendors will have time to fix this issue.

@DougLea
Copy link
Contributor

DougLea commented Jul 24, 2020

@OlegDokuka SubmissionPublisher calls Subscriber.onSubscribe synchronously. It does not trap exceptions from the call, so they will be rethrown. I suppose this could be changed, but whatever is done would be outside of spec because onSubscribe should not throw. See code at http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/main/java/util/concurrent/SubmissionPublisher.java?view=log

@OlegDokuka
Copy link
Member Author

OlegDokuka commented Jul 24, 2020

@DougLea

SubmissionPublisher calls Subscriber.onSubscribe synchronously.

That is not true. Subscriber#onSubscribe is scheduled on the executor service. So the call is offloaded and asynchronous. Please find the meaning of the word synchronously here -> https://github.com/reactive-streams/reactive-streams-jvm#term_sync

The execution stack is the following

  1. Sync_ Thread 1: [ 313]SubmissionPublisher#subscribe(subscriber)
  2. Sync_ Thread 1: [ 331]BufferedSubscription#onSubscribe()
  3. Sync_ Thread 1: [1217]BufferedSubscription#startOnSignal(RUN \| ACTIVE)
  4. Sync_ Thread 1: [1213]BufferedSubscription#tryStart()
  5. Sync_ Thread 1: [1197]Executor#execute.
  6. Async Thread 2: [1263]BufferedSubscription#subscribeOnOpen
  7. Async Thread 2: [1349]BufferedSubscription#consumeSubscribe
  8. Async Thread 2: [1357]Subscriber#onSubscribe(Subscription)

Please, notice, at point 5, the tasks is offloaded to the Executor so the execution is not synchronous anymore.

It does not trap exceptions from the call, so they will be rethrown. I suppose this could be changed, but whatever is done would be outside of spec because onSubscribe should not throw

Nobody is talking about exceptions in onSubscribe method.

The PR here makes sure every Publisher implementation calls onSubscribe immediately (a.k.a synchronously) (a.k.a on the caller Thread within the same stack) within the Publisher#subscriber method.

It means that the following is ✅

class CorrectPublisher<T> implements Publisher<T> {

 public void subscribe(Subscriber<T> s) {
    s.onSubscribe(new Subscription()); ✅

    ...
 }

}

whiles the next is ❌

class IncorrectPublisher<T> implements Publisher<T> {

 public void subscribe(Subscriber<T> s) {
    ForkJoinPool.execute(() -> s.onSubscribe(new Subscription())); ❌❌❌❌❌❌❌

    ...
 }

}

@DougLea
Copy link
Contributor

DougLea commented Jul 24, 2020

@OlegDokuka Here is the code. It looks synchro9nous to me:

    public void subscribe(Subscriber<? super T> subscriber) {
        if (subscriber == null) throw new NullPointerException();
        ReentrantLock lock = this.lock;
        int max = maxBufferCapacity; // allocate initial array
        Object[] array = new Object[max < INITIAL_CAPACITY ?
                                    max : INITIAL_CAPACITY];
        BufferedSubscription<T> subscription =
            new BufferedSubscription<T>(subscriber, executor, onNextHandler,
                                        array, max);
        lock.lock();
        try {
            if (!subscribed) {
                subscribed = true;
                owner = Thread.currentThread();
            }
            for (BufferedSubscription<T> b = clients, pred = null;;) {
                if (b == null) {
                    Throwable ex;
                    **subscription.onSubscribe();**
                    if ((ex = closedException) != null)
                        subscription.onError(ex);
                    else if (closed)
                        subscription.onComplete();
                    else if (pred == null)
                        clients = subscription;
                    else
                        pred.next = subscription;
                    break;
                }
                BufferedSubscription<T> next = b.next;
                if (b.isClosed()) {   // remove
                    b.next = null;    // detach
                    if (pred == null)
                        clients = next;
                    else
                        pred.next = next;
                }
                else if (subscriber.equals(b.subscriber)) {
                    b.onError(new IllegalStateException("Duplicate subscribe"));
                    break;
                }
                else
                    pred = b;
                b = next;
            }
        } finally {
            lock.unlock();
        }
    }

@OlegDokuka
Copy link
Member Author

OlegDokuka commented Jul 24, 2020

@DougLea You have all the references to the execution stack, we can debate forever, but it is better to check it first before.

Also the main difference between

subscription.onSubscribe();

is that it is not equal to

subscriber.onSubscribe(subscription).

subscription.onSubscribe() is a method of the BufferedSubscription class. It is a delegate which schedules real Subscriber.onSubscribe(Subscription) method execution on a different thread

@DougLea
Copy link
Contributor

DougLea commented Jul 25, 2020

@OlegDokuka Sorry for the confusion. You are right that user.onSubscribe method can be performed async.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Happens-before relationship between Publisher#subscribe() and Subscriber.onSubscribe()?
3 participants