
RangePublisher - Possible example improvement? #551

Open
cmhteixeira opened this issue Aug 20, 2023 · 5 comments
cmhteixeira commented Aug 20, 2023

In the examples, could RangePublisher rely on a volatile variable rather than on an AtomicLong?

In the examples, RangePublisher's Subscription extends AtomicLong. It does so because its methods can be run from different threads. The following comment can be read there:

We are using this AtomicLong to make sure that this Subscription doesn't run concurrently with itself, which would violate rule 1.3 among others (no concurrent notifications). The atomic transition from 0L to N > 0L will ensure this.

However, from rule 2.7

A Subscriber MUST ensure that all calls on its Subscription's request and cancel methods are performed serially.

Would this then not mean that, regardless of how the subscriber behaves, the subscription only needs to be concerned with publishing (i.e. making visible across threads) its current index? In which case, a volatile variable would suffice?

Provided I am right, using a less powerful form of synchronization is preferable by the principle of least power. More importantly, for people learning via these examples, usage of volatile would emphasize that the Subscriber must comply with rule 2.7.

What do you think?
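For illustration, here is a rough sketch of what the proposed volatile-based Subscription could look like. This is my own construction, not code from the repository, and the class and field names are invented; it is only safe under the stated assumption that the Subscriber upholds rule 2.7, so that request calls never overlap in time:

```java
import java.util.function.IntConsumer;

// Hypothetical sketch, not the actual RangePublisher code: a range
// Subscription that relies on volatile fields instead of AtomicLong.
// This is only correct if the Subscriber upholds rule 2.7, i.e. all
// request/cancel calls are externally serialized (never concurrent).
class VolatileRangeSubscription {
    private final int end;
    private volatile int index;        // next value to emit; volatile for cross-thread visibility
    private volatile long requested;   // outstanding demand; only mutated by the serial caller
    private volatile boolean emitting; // reentrancy flag for request() called from onNext
    private final IntConsumer onNext;  // stand-in for Subscriber.onNext

    VolatileRangeSubscription(int start, int end, IntConsumer onNext) {
        this.index = start;
        this.end = end;
        this.onNext = onNext;
    }

    // No CAS loop: under rule 2.7 this read-modify-write is never concurrent
    // with itself, and volatile makes the result visible to the next
    // (serialized) caller even if it runs on a different thread.
    public void request(long n) {
        long r = requested + n;
        if (r < 0L) r = Long.MAX_VALUE; // rule 3.17: clamp overflow to "unbounded"
        requested = r;
        if (emitting) return;           // reentrant call from onNext: just add demand
        emitting = true;
        while (index < end && requested > 0L) {
            requested--;
            onNext.accept(index++);     // may reentrantly call request()
        }
        emitting = false;
    }
}
```

With serial calls, request(2) followed by request(3) on a range of 1..5 emits 1, 2 and then 3, 4, 5, with reentrant requests from onNext merely accumulating demand.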

@akarnokd
Contributor

It uses that logic because request may happen on a different thread than the one on which items are currently being emitted. That requires atomics or thread confinement. RangePublisher uses the former.

I have an example, FlowRange, which uses the latter (in fact, a mix of both techniques). The thing is, unless the Executor is guaranteed to be single-threaded, one has to guard against reentrance with atomics anyway.
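To sketch the thread-confinement alternative mentioned here (the class name and structure are my own, not FlowRange's actual code): every touch of the mutable emission state happens inside tasks on one single-threaded Executor, so request can safely be called from any thread without atomics on that state:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.IntConsumer;

// Hypothetical sketch of the thread-confinement approach. The single-threaded
// executor is essential: if the Executor were not guaranteed single-threaded,
// the confinement breaks and atomics would be needed again (the caveat above).
class ConfinedRange {
    private final ExecutorService worker = Executors.newSingleThreadExecutor();
    private int index;        // only accessed on the worker thread
    private final int end;
    private long requested;   // only accessed on the worker thread
    private final IntConsumer onNext;

    ConfinedRange(int start, int end, IntConsumer onNext) {
        this.index = start;
        this.end = end;
        this.onNext = onNext;
    }

    // Safe to call from any thread: the state change is confined to the worker.
    public void request(long n) {
        worker.execute(() -> {
            requested += n;
            if (requested < 0L) requested = Long.MAX_VALUE; // rule 3.17 clamp
            while (index < end && requested > 0L) {
                requested--;
                onNext.accept(index++);
            }
        });
    }

    // For demo purposes only; a real Publisher would manage lifecycle properly.
    public boolean shutdownAndAwait() {
        worker.shutdown();
        try {
            return worker.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Because tasks on a single-threaded executor run in submission order, demand accumulates and items are emitted serially even when request is called from multiple threads.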

cmhteixeira commented Aug 21, 2023

Thank you for the prompt reply.

It uses that logic because request may happen from a different thread than the emission of items are currently happening. That requires atomics or thread confinement. RangePublisher uses the former.

But the Publisher is synchronous: it emits on the same thread as the request calls. Correct? And since all request calls are serialized by rule 2.7, wouldn't that mean we can do it without atomics (but still require volatile for visibility)?

In my mind, the Subscription's calls must follow this pattern:

Thread-1 -----| request() |----------------------------------------------------------
Thread-2 ---------------------| request() |------------------------------------------
Thread-3 -------------------------------------| request() |--------------------------

Within a particular call to request, the subscription calls onNext, which in turn might call request again.
If it does so on the same thread, then we are fine. If it calls it asynchronously, then rule 2.7 must be upheld.
Is my understanding mistaken?

In the current RangePublisher we see:

```java
@Override
public void request(long n) {
    // trimmed
    // Downstream requests are cumulative and may come from any thread
    for (;;) {
        long requested = get();
        long update = requested + n;
        // As governed by rule 3.17, when demand overflows `Long.MAX_VALUE`
        // we treat the signalled demand as "effectively unbounded"
        if (update < 0L) {
            update = Long.MAX_VALUE;
        }
        // atomically update the current requested amount
        if (compareAndSet(requested, update)) {
            // if there was no prior request amount, we start the emission loop
            if (requested == 0L) {
                emit(update);
            }
            break;
        }
    }
}
```

For example, between the long requested = get() and the compareAndSet(requested, update), we can be 100% sure that no other thread could ever have run concurrently. No?

To illustrate my case further, I have set up this other implementation as a gist:

  • No TCK tests failing (27 passed, 11 ignored).
  • It's not "water tight", but should be sufficient to illustrate my point.

It would be interesting to hear your insights.

@akarnokd

Requests can be serialized, but nothing says they have to be serialized with onNexts. RangePublisher has no trouble with that, or with violations of 2.7, because it just explicitly serializes requests on its own anyway.

If you can ensure thread confinement, e.g. by rolling your own framework around RS, your implementation might just work (until it stack overflows due to the recursion).
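For context, the recursion concern is what the AtomicLong-based emit loop is designed to avoid: a reentrant request from inside onNext merely bumps the counter (it observes a non-zero value, so no new emit loop starts), and the single active loop re-reads the counter to pick up the extra demand instead of recursing. A condensed sketch of that pattern (my own reduction, not the repository code):

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.IntConsumer;

// Condensed sketch of the AtomicLong-as-trampoline pattern: only the thread
// that transitions the counter from 0 to n runs the emit loop; reentrant
// request() calls from onNext add demand and return immediately.
class TrampolineRange extends AtomicLong {
    private int index;
    private final int end;
    private final IntConsumer onNext;

    TrampolineRange(int start, int end, IntConsumer onNext) {
        this.index = start;
        this.end = end;
        this.onNext = onNext;
    }

    public void request(long n) {
        for (;;) {
            long current = get();
            long update = current + n;
            if (update < 0L) update = Long.MAX_VALUE;
            if (compareAndSet(current, update)) {
                if (current == 0L) emit(update); // only the 0 -> n winner emits
                return;                          // reentrant calls return at once
            }
        }
    }

    private void emit(long demand) {
        long emitted = 0L;
        while (index < end) {
            if (emitted == demand) {
                // settle the counter; if demand arrived reentrantly, keep going
                demand = addAndGet(-emitted);
                emitted = 0L;
                if (demand == 0L) return; // wait for the next 0 -> n request
            }
            onNext.accept(index++);       // may reentrantly call request()
            emitted++;
        }
        // range exhausted; a full implementation would signal onComplete here
    }
}
```

A Subscriber that calls request(1) from inside each onNext drives this loop iteratively rather than recursively, so the stack stays flat no matter how long the range is.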

cmhteixeira commented Aug 21, 2023

Requests can be serialized [...]

Requests MUST be serialized. Right? Or else the subscriber (any subscriber) would violate 2.7.

Requests can be serialized but nothing says they have to be serialized with onNexts

But because this is a synchronous publisher, one implies the other.

The RangePublisher has no trouble with it, or violations of 2.7 because it just explicitly serializes requests on its own anyway.

I am not saying that; I am saying the opposite: because RangePublisher can expect the subscriber to respect 2.7, it can relax its assumptions and doesn't need to serialize on its own, as it knows for sure that no concurrent requests will occur.

[...] your implementation might just work (until it stack overflows due to the recursion).

As my example implementation (linked above) is passing the required_spec303_mustNotAllowUnboundedRecursion TCK test, I don't think that is a problem.

I might of course be making a glaringly wrong assumption... but I can't see it. Would it be reasonable for me to open an MR for analysis by others?

@akarnokd

I don't know how to explain it better. Try thinking about non-trivial but conforming Subscribers that involve some form of asynchrony, and consider how the request-onNext dance would play out and what serialization mechanisms could ensure correctness.
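To make that concern concrete, here is one hypothetical construction (my own, not repository code): a conforming Subscriber whose onNext only hands the item to a single-threaded executor, with the replenishing request(1) issued later from that executor thread. The Subscriber's request calls are perfectly serial, so rule 2.7 holds, yet they can overlap in time with the publisher's emission loop running on the other thread; the atomic 0 -> n handoff is what keeps the emission itself single-threaded:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.IntConsumer;

// Hypothetical demo: serial request() calls (rule 2.7) that nevertheless
// race with the publisher's emission loop on another thread.
class AsyncRequestDemo {
    // minimal atomically guarded range source, in the spirit of RangePublisher
    static final class Source extends AtomicLong {
        int index = 1;
        final int end;
        final IntConsumer onNext;

        Source(int end, IntConsumer onNext) {
            this.end = end;
            this.onNext = onNext;
        }

        void request(long n) {
            for (;;) {
                long current = get();
                long update = current + n;
                if (update < 0L) update = Long.MAX_VALUE;
                if (compareAndSet(current, update)) {
                    if (current == 0L) emit(update); // 0 -> n winner emits
                    return;
                }
            }
        }

        void emit(long demand) {
            long emitted = 0L;
            while (index < end) {
                if (emitted == demand) {
                    demand = addAndGet(-emitted);
                    emitted = 0L;
                    if (demand == 0L) return; // hand off to the next 0 -> n
                }
                onNext.accept(index++);
                emitted++;
            }
        }
    }

    static List<Integer> run(int count) {
        ExecutorService consumer = Executors.newSingleThreadExecutor();
        List<Integer> received = new CopyOnWriteArrayList<>();
        CountDownLatch all = new CountDownLatch(count);
        Source[] src = new Source[1];
        src[0] = new Source(count + 1, item ->
                consumer.execute(() -> {   // onNext: hand off, never request here
                    received.add(item);
                    all.countDown();
                    src[0].request(1);     // serial request, from another thread
                }));
        src[0].request(1);                 // initial demand from the main thread
        try {
            all.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        consumer.shutdown();
        return received;
    }
}
```

Whether a given emission runs on the main thread or the consumer thread depends on the race, but the CAS handoff guarantees at most one emit loop is active, so items still arrive in order.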
