-
Notifications
You must be signed in to change notification settings - Fork 7.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Ability to create custom schedulers with behavior based on composing operators. #4154
Conversation
}; | ||
|
||
// enqueue the completable that process actions put in reply subject | ||
workerQueue.onNext(completable); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Workers can be created from multiple threads and this will likely mess up the internal state of the PublishSubject
+onBackpressureBuffer
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll add some synchronization.
This test changes its timing pattern when run on one of the merging-customization of this scheduler: @Test
public void fastTasks() {
final Scheduler sched = new ScheduleWhen(new Func1<Observable<Completable>, Completable>() {
@Override
public Completable call(Observable<Completable> workers) {
return Completable.merge(workers, 2);
}
}, Schedulers.computation());
Observable.range(1, 4).flatMap(new Func1<Integer, Observable<String>>() {
@Override
public Observable<String> call(final Integer v) {
return Observable.interval(500, TimeUnit.MILLISECONDS, sched)
.take(10).map(new Func1<Long, String>() {
@Override
public String call(Long u) {
return v + " refresh " + u;
}
});
}
}
)
.timeInterval()
.toBlocking()
.subscribe(new Action1<TimeInterval<String>>() {
@Override
public void call(TimeInterval<String> v) {
System.out.println(v);
}
});
} What I get is that for the first 5 seconds, I get
If I understand the purpose of this scheduler, especially with the For example, given a service that accepts 2 connections per client, if I have to periodically execute 3 or more tasks against this service (via the I think the solution is to use a fixed thread pool instead: it limits the number of concurrent tasks while allowing all workers to make progress. My second reservation is that since the |
This isn't intended for long running Observables. Your example is a pathological case. Rather than scheduling on the outer timer if flatMap and schedule on the inner Observables it would not block and all three would be able to run. There are many was to misuse this but if used in a controlled setting of a service library it can be use to protect service from bursts of traffic. The main point is to be able to reuse an existing scheduler with some additional behavior rather than having to create new thread pools to enforce rate limits. |
To address your concern I've changed the API from |
Current coverage is 83.12%@@ 1.x #4154 diff @@
==========================================
Files 261 262 +1
Lines 16918 16986 +68
Methods 0 0
Messages 0 0
Branches 2555 2562 +7
==========================================
+ Hits 14062 14119 +57
- Misses 1982 1988 +6
- Partials 874 879 +5
|
@akarnokd I think I've addressed all the concerns regrading this PR. What can I do help get this experiment merged? |
Oh, sorry; was busy with something else. Okay, let's continue with the experiment. 👍 |
Does 👍 mean the PR is merged now? Should we wait for a review from @zsxwing and/or @artem-zinnatullin? |
Let's wait for at least one of them. |
// room for the next worker in the workerQueue. | ||
if (unsubscribed.compareAndSet(false, true)) { | ||
actualWorker.unsubscribe(); | ||
synchronized (actionQueue) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Potential dead lock here as it will call user's Observer with a lock. You can just use SerializedSubject to wrap actionQueue
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will do
👍 for this experiment api. Just one comment about |
|
||
tSub.assertValueCount(0); | ||
|
||
tSched.advanceTimeBy(0, TimeUnit.SECONDS); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: static import for SECONDS
Very interesting API, @abersnaze! Few comments and I'm 👍 with it. |
this.workerQueue = PublishSubject.create(); | ||
// send it to a custom combinator to pick the order and rate at which | ||
// workers are processed. | ||
combine.call(workerQueue.onBackpressureBuffer()).subscribe(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If subscribed observable emits error then may throw OnErrorNotImplemedException which could confuse. Please add tests for error case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we have 'Scheduler' life cycle concerns with this one? Should this Scheduler have a close method to cancel this subscription so we can always clean up references?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was thinking of this a something might be used handful of times in a VM. Should Scheduler implement Subscription?
* @param combine | ||
* @return | ||
*/ | ||
public <S extends Scheduler & Subscription> S when(Func1<Observable<Observable<Completable>>, Completable> combine) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Experimental?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do'h
I don't know how I would draw the marble diagram for this one. |
…operators. • made it private and added a instance method to Scheduler. • rewrote the test to get a little more coverage. • wrapping each of the onNext/onCompleted to ensure no overlapping calls • Break up the Worker as Completable into Worker as Observable. Now the schedules actions are the indivisible elements that are subscribed to. The user has additional choice at the cost of making the API more complicated.
@abersnaze If all feedback is resolved, feel free to merge this PR and if you want, post any short description of this addition beyond the ref to this PR. |
Thanks for the feed back |
As far as I'm concerned the feedback cycle hadn't really finished on this. I'm hoping that api changes etc are not out of scope if I raise an issue. |
My comment about the use of |
@davidmoten If you have further concerns, don't hesitate to open an issue or post a PR. The file needs some javadoc tidying up anyway... |
based on #4115
Allows the use of operators for controlling the timing around when actions scheduled on workers are actually done. This makes it possible to layer additional behavior on an existing
Scheduler
. The main parameter is a function that flattens anObservable<Completable>
into just oneCompletable
. There must be a chain of operators connecting the returned value to the source Observable otherwise any work scheduled on the Scheduler will not be executed.When
scheduler.createWorker()
is invoked aCompletable
is onNext'd to the combinator to be flattened. If theCompletable
is not immediately subscribed to an calls toworker.schedule(...)
are buffered. Once theCompletable
is subscribed to actions are then scheduled on the actual Scheduler. When theWorker
is unsubscribed theCompletable
emits an onComplete and triggers any behavior in the flattening operator. TheObservable<Completable>
and allCompletable
s give to the flattening function never onError.Slowing down the rate to no more than than 1 a second.
Limit the amount concurrency two at a time without creating a new fix size thread pool: