
Ability to create custom schedulers with behavior based on composing operators. #4154

Merged
merged 1 commit into from
Jul 9, 2016

Conversation

abersnaze
Contributor

based on #4115

Allows the use of operators to control the timing of when actions scheduled on workers are actually executed. This makes it possible to layer additional behavior on an existing Scheduler. The main parameter is a function that flattens an Observable<Completable> into just one Completable. There must be a chain of operators connecting the returned Completable to the source Observable; otherwise any work scheduled on the Scheduler will not be executed.

When scheduler.createWorker() is invoked, a Completable is onNext'd into the combinator to be flattened. If the Completable is not immediately subscribed to, any calls to worker.schedule(...) are buffered. Once the Completable is subscribed to, actions are scheduled on the actual Scheduler. When the Worker is unsubscribed, the Completable emits an onCompleted and triggers any behavior in the flattening operator. The Observable<Completable> and all Completables given to the flattening function never onError.
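The buffering described above can be illustrated with a small plain-Java sketch (hypothetical names, not the PR's actual classes): actions scheduled before the gate opens are queued, and opening the gate (the analogue of the worker's Completable being subscribed to) drains them to the real executor.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Executor;

public class GatedWorkerDemo {
    // Hypothetical sketch of the gating behavior: schedule() buffers
    // actions until activate() is called -- the analogue of the worker's
    // Completable being subscribed to by the flattening combinator.
    static class GatedWorker {
        private final Queue<Runnable> buffer = new ArrayDeque<>();
        private final Executor actual;
        private boolean active;

        GatedWorker(Executor actual) { this.actual = actual; }

        synchronized void schedule(Runnable action) {
            if (active) {
                actual.execute(action);  // gate open: run on the real executor
            } else {
                buffer.add(action);      // gate closed: buffer the action
            }
        }

        synchronized void activate() {
            active = true;
            Runnable r;
            while ((r = buffer.poll()) != null) {
                actual.execute(r);       // drain everything buffered so far
            }
        }
    }

    static final List<String> log = new ArrayList<>();

    public static void main(String[] args) {
        GatedWorker w = new GatedWorker(Runnable::run);
        w.schedule(() -> log.add("first"));  // buffered, nothing runs yet
        w.activate();                        // drains the buffer
        w.schedule(() -> log.add("second")); // gate is open, runs immediately
        System.out.println(log);             // prints [first, second]
    }
}
```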

Slowing the rate down to no more than one worker per second:

 Scheduler slowSched = new ScheduleWhen(workers -> {
    // use concat so each worker runs one at a time.
    return Completable.concat(workers.map(worker -> {
        // delay the start of the next worker by 1 second.
        return worker.delay(1, TimeUnit.SECONDS);
    }));
 }, Schedulers.computation());

Limiting concurrency to two at a time without creating a new fixed-size thread pool:

 Scheduler limitSched = new ScheduleWhen(workers -> {
    // use merge with a max concurrency of two
    return Completable.merge(workers, 2);
 }, Schedulers.computation());

};

// enqueue the completable that processes actions put in the reply subject
workerQueue.onNext(completable);
Member


Workers can be created from multiple threads and this will likely mess up the internal state of the PublishSubject+onBackpressureBuffer.

Contributor Author


I'll add some synchronization.

@akarnokd
Member

akarnokd commented Jul 1, 2016

This test changes its timing pattern when run on one of the merging customizations of this scheduler:

@Test
public void fastTasks() {
    final Scheduler sched = new ScheduleWhen(new Func1<Observable<Completable>, Completable>() {
        @Override
        public Completable call(Observable<Completable> workers) {
            return Completable.merge(workers, 2);
        }
    }, Schedulers.computation());


    Observable.range(1, 4).flatMap(new Func1<Integer, Observable<String>>() {
        @Override
        public Observable<String> call(final Integer v) {
            return Observable.interval(500, TimeUnit.MILLISECONDS, sched)
            .take(10).map(new Func1<Long, String>() {
                @Override
                public String call(Long u) {
                    return v + " refresh " + u;
                }
            });
        }
    }
    )
    .timeInterval()
    .toBlocking()
    .subscribe(new Action1<TimeInterval<String>>() {
        @Override
        public void call(TimeInterval<String> v) {
            System.out.println(v);
        }
    });
}

What I get is that for the first 5 seconds I get 1 refresh x and 2 refresh x at 500 ms intervals, but after that, all the 3 refresh x and 4 refresh x entries get printed instantly.

TimeInterval [intervalInMilliseconds=515, value=2 refresh 0]
TimeInterval [intervalInMilliseconds=0, value=1 refresh 0]
TimeInterval [intervalInMilliseconds=500, value=1 refresh 1]
TimeInterval [intervalInMilliseconds=0, value=2 refresh 1]
TimeInterval [intervalInMilliseconds=500, value=2 refresh 2]
TimeInterval [intervalInMilliseconds=0, value=1 refresh 2]
TimeInterval [intervalInMilliseconds=500, value=2 refresh 3]
TimeInterval [intervalInMilliseconds=0, value=1 refresh 3]
TimeInterval [intervalInMilliseconds=485, value=2 refresh 4]
TimeInterval [intervalInMilliseconds=0, value=1 refresh 4]
TimeInterval [intervalInMilliseconds=516, value=2 refresh 5]
TimeInterval [intervalInMilliseconds=0, value=1 refresh 5]
TimeInterval [intervalInMilliseconds=486, value=1 refresh 6]
TimeInterval [intervalInMilliseconds=0, value=2 refresh 6]
TimeInterval [intervalInMilliseconds=500, value=1 refresh 7]
TimeInterval [intervalInMilliseconds=0, value=2 refresh 7]
TimeInterval [intervalInMilliseconds=499, value=2 refresh 8]
TimeInterval [intervalInMilliseconds=0, value=1 refresh 8]
TimeInterval [intervalInMilliseconds=500, value=2 refresh 9]
TimeInterval [intervalInMilliseconds=0, value=1 refresh 9]
TimeInterval [intervalInMilliseconds=516, value=3 refresh 0]
TimeInterval [intervalInMilliseconds=0, value=4 refresh 0]
TimeInterval [intervalInMilliseconds=0, value=3 refresh 1]
TimeInterval [intervalInMilliseconds=0, value=4 refresh 1]
TimeInterval [intervalInMilliseconds=0, value=4 refresh 2]
TimeInterval [intervalInMilliseconds=0, value=4 refresh 3]
TimeInterval [intervalInMilliseconds=0, value=4 refresh 4]
TimeInterval [intervalInMilliseconds=0, value=4 refresh 5]
TimeInterval [intervalInMilliseconds=0, value=4 refresh 6]
TimeInterval [intervalInMilliseconds=0, value=4 refresh 7]
TimeInterval [intervalInMilliseconds=0, value=4 refresh 8]
TimeInterval [intervalInMilliseconds=0, value=3 refresh 2]
TimeInterval [intervalInMilliseconds=0, value=4 refresh 9]
TimeInterval [intervalInMilliseconds=0, value=3 refresh 3]
TimeInterval [intervalInMilliseconds=0, value=3 refresh 4]
TimeInterval [intervalInMilliseconds=0, value=3 refresh 5]
TimeInterval [intervalInMilliseconds=0, value=3 refresh 6]
TimeInterval [intervalInMilliseconds=0, value=3 refresh 7]
TimeInterval [intervalInMilliseconds=0, value=3 refresh 8]
TimeInterval [intervalInMilliseconds=0, value=3 refresh 9]

If I understand the purpose of this scheduler, especially with the merge(workers, 2) case, you want to limit the number of concurrently running tasks. However, the implementation blocks out further workers from executing anything until a previous worker is released.

For example, given a service that accepts 2 connections per client, if I have to periodically execute 3 or more tasks against this service (via the interval() above), only the first two would execute periodically. In addition, the other 2 would slowly pile up tasks in the internal queue, and if the associated sequence uses timeout, this will make it time out.

I think the solution is to use a fixed thread pool instead: it limits the number of concurrent tasks while allowing all workers to make progress.
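For contrast, here is a minimal plain-java.util.concurrent sketch of that alternative (class and variable names are mine): a fixed pool of two threads caps concurrency while every submitted task still makes progress. In RxJava 1.x such a pool can be handed to Schedulers.from(Executor) to obtain a Scheduler with the same property.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class FixedPoolDemo {
    static final AtomicInteger running = new AtomicInteger();
    static final AtomicInteger maxSeen = new AtomicInteger();
    static final AtomicInteger completed = new AtomicInteger();

    public static void main(String[] args) throws InterruptedException {
        // Two pool threads cap concurrency at two, yet all four tasks
        // make progress -- no task is blocked out behind a held worker.
        ExecutorService pool = Executors.newFixedThreadPool(2);
        for (int i = 0; i < 4; i++) {
            pool.submit(() -> {
                int now = running.incrementAndGet();
                maxSeen.accumulateAndGet(now, Math::max); // record peak concurrency
                try {
                    Thread.sleep(100); // simulate work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                running.decrementAndGet();
                completed.incrementAndGet();
            });
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println("peak concurrency: " + maxSeen.get());
    }
}
```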

My second reservation is that since ScheduleWhen is a self-contained class (unlike operators, which need a method on Observable), this could live in any external library (just like AndroidSchedulers).

@abersnaze
Contributor Author

abersnaze commented Jul 1, 2016

This isn't intended for long-running Observables. Your example is a pathological case. Rather than scheduling on the outer timer, if you flatMap and schedule on the inner Observables, it would not block and all three would be able to run.

There are many ways to misuse this, but if used in the controlled setting of a service library it can be used to protect a service from bursts of traffic.

The main point is to be able to reuse an existing scheduler with some additional behavior rather than having to create new thread pools to enforce rate limits.

@abersnaze
Contributor Author

To address your concern I've changed the API from Ob<Comp> -> Comp to Ob<Ob<Comp>> -> Comp. This is a more direct mapping of a Scheduler of Workers of Actions that gives the user complete control over how and when the actions are done. In the example above, the actions from all workers can be merged into one stream of actions, and then merge with max concurrency N could be applied to the actions to better simulate running on N threads.
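Against the RxJava 1.x API as eventually released (where this landed as the @Experimental Scheduler.when instance method), a sketch of the new shape; variable names are mine, not from the PR. Observable.merge flattens the stream-of-workers into one stream of actions, then Completable.merge applies a max concurrency of two. Assumes rxjava 1.2+ on the classpath.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import rx.Completable;
import rx.Observable;
import rx.Scheduler;
import rx.schedulers.Schedulers;

public class WhenDemo {
    public static void main(String[] args) throws InterruptedException {
        // workers is Observable<Observable<Completable>>: a stream of
        // workers, each itself a stream of scheduled actions.
        // Observable.merge flattens it to one stream of actions;
        // Completable.merge(..., 2) runs at most two actions at once.
        Scheduler limitSched = Schedulers.computation().when(workers ->
                Completable.merge(Observable.merge(workers), 2));

        CountDownLatch done = new CountDownLatch(3);
        for (int i = 0; i < 3; i++) {
            Observable.just(i)
                    .subscribeOn(limitSched)
                    .subscribe(v -> done.countDown());
        }
        done.await(5, TimeUnit.SECONDS);
    }
}
```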

@codecov-io

codecov-io commented Jul 1, 2016

Current coverage is 83.12%

Merging #4154 into 1.x will increase coverage by <.01%

@@                1.x      #4154   diff @@
==========================================
  Files           261        262     +1   
  Lines         16918      16986    +68   
  Methods           0          0          
  Messages          0          0          
  Branches       2555       2562     +7   
==========================================
+ Hits          14062      14119    +57   
- Misses         1982       1988     +6   
- Partials        874        879     +5   

Powered by Codecov. Last updated by 97c4e53...f3777df

@abersnaze
Contributor Author

@akarnokd I think I've addressed all the concerns regarding this PR. What can I do to help get this experiment merged?

@akarnokd
Member

akarnokd commented Jul 5, 2016

Oh, sorry; was busy with something else.

Okay, let's continue with the experiment. 👍

/cc @zsxwing @artem-zinnatullin

@abersnaze
Contributor Author

Does 👍 mean the PR can be merged now? Should we wait for a review from @zsxwing and/or @artem-zinnatullin?

@akarnokd
Member

akarnokd commented Jul 5, 2016

Let's wait for at least one of them.

// room for the next worker in the workerQueue.
if (unsubscribed.compareAndSet(false, true)) {
actualWorker.unsubscribe();
synchronized (actionQueue) {
Member


Potential deadlock here, as it will call the user's Observer while holding a lock. You can just use SerializedSubject to wrap actionQueue.
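A sketch of this suggestion, assuming the RxJava 1.x Subject API (rx.subjects.Subject exposes toSerialized()); the class and field names here are mine:

```java
import java.util.ArrayList;
import java.util.List;
import rx.Completable;
import rx.subjects.PublishSubject;
import rx.subjects.Subject;

public class SerializedQueueDemo {
    static final List<Completable> seen = new ArrayList<>();

    public static void main(String[] args) {
        // toSerialized() makes the subject safe for concurrent onNext /
        // onCompleted calls without holding an external lock while the
        // downstream observer runs -- which is where the deadlock risk is.
        Subject<Completable, Completable> actionQueue =
                PublishSubject.<Completable>create().toSerialized();
        actionQueue.subscribe(seen::add);
        actionQueue.onNext(Completable.complete());
    }
}
```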

Contributor


+1

Contributor Author


will do

@zsxwing
Member

zsxwing commented Jul 5, 2016

👍 for this experimental API. Just one comment about synchronized (actionQueue).


tSub.assertValueCount(0);

tSched.advanceTimeBy(0, TimeUnit.SECONDS);
Contributor


nit: static import for SECONDS

@artem-zinnatullin
Contributor

Very interesting API, @abersnaze! A few comments and I'm 👍 with it.

this.workerQueue = PublishSubject.create();
// send it to a custom combinator to pick the order and rate at which
// workers are processed.
combine.call(workerQueue.onBackpressureBuffer()).subscribe();
Collaborator


If the subscribed observable emits an error then it may throw OnErrorNotImplementedException, which could confuse users. Please add tests for the error case.

Collaborator


Do we have 'Scheduler' life cycle concerns with this one? Should this Scheduler have a close method to cancel this subscription so we can always clean up references?

Contributor Author

@abersnaze abersnaze Jul 8, 2016


I was thinking of this as something that might be used a handful of times in a VM. Should Scheduler implement Subscription?

* @param combine
* @return
*/
public <S extends Scheduler & Subscription> S when(Func1<Observable<Observable<Completable>>, Completable> combine) {
Member


Experimental?

Contributor Author


d'oh

@abersnaze
Contributor Author

I don't know how I would draw the marble diagram for this one.

…operators.

• made it private and added an instance method to Scheduler.
• rewrote the test to get a little more coverage.
• wrapped each of the onNext/onCompleted calls to ensure no overlapping calls.
• broke up the Worker-as-Completable into Worker-as-Observable. Now the scheduled actions are the indivisible elements that are subscribed to. The user has additional choice at the cost of making the API more complicated.
@akarnokd
Member

akarnokd commented Jul 9, 2016

@abersnaze If all feedback is resolved, feel free to merge this PR and if you want, post any short description of this addition beyond the ref to this PR.

@abersnaze
Contributor Author

Thanks for the feedback.

@abersnaze abersnaze merged commit e267e02 into ReactiveX:1.x Jul 9, 2016
@davidmoten
Collaborator

As far as I'm concerned, the feedback cycle hadn't really finished on this. I'm hoping that API changes etc. are not out of scope if I raise an issue.

@davidmoten
Collaborator

My comment about the use of subscribe() in the constructor was not addressed. Should be unsafeSubscribe() I think.

@akarnokd
Member

@davidmoten If you have further concerns, don't hesitate to open an issue or post a PR. The file needs some javadoc tidying up anyway...

@akarnokd akarnokd mentioned this pull request Jul 12, 2016
@abersnaze abersnaze deleted the sched branch September 20, 2016 21:13