3.x: Add onBackpressureReduce operator #7124

Merged
merged 14 commits into from
Dec 9, 2020

Conversation

ansip
Contributor

@ansip ansip commented Dec 5, 2020

Added one more backpressure strategy. It works like onBackpressureLatest, but instead of just dropping values it merges/reduces the excess items.
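
The strategy described above can be pictured as a single holding slot. The following is a minimal sketch under stated assumptions, not the RxJava implementation: the class name `ReduceSlot` and its `offer`/`poll` methods are hypothetical, and threading and request accounting are deliberately ignored.

```java
import java.util.function.BinaryOperator;

/**
 * Hypothetical single-slot model of a "reduce while backpressured" strategy.
 * Not thread-safe and not the RxJava code; it only illustrates the idea.
 */
final class ReduceSlot<T> {
    private final BinaryOperator<T> reducer;
    private T pending; // null means nothing is waiting

    ReduceSlot(BinaryOperator<T> reducer) {
        this.reducer = reducer;
    }

    /** Called for each upstream item the downstream cannot take yet. */
    void offer(T item) {
        // First item is stored as-is; later items are merged into it.
        pending = (pending == null) ? item : reducer.apply(pending, item);
    }

    /** Called when the downstream is ready again; returns null if empty. */
    T poll() {
        T out = pending;
        pending = null;
        return out;
    }
}
```

With the reducer `(prev, next) -> next` the slot behaves like a latest strategy, while with `Integer::sum` the items 1, 2, 3 offered before a poll come out as the single value 6, so nothing is silently discarded.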

@codecov

codecov bot commented Dec 5, 2020

Codecov Report

Merging #7124 (5fdbf1a) into 3.x (67c1a36) will decrease coverage by 0.06%.
The diff coverage is 100.00%.

@@             Coverage Diff              @@
##                3.x    #7124      +/-   ##
============================================
- Coverage     99.57%   99.51%   -0.07%     
- Complexity     6671     6703      +32     
============================================
  Files           742      744       +2     
  Lines         47267    47298      +31     
  Branches       6374     6373       -1     
============================================
+ Hits          47066    47067       +1     
- Misses           89      106      +17     
- Partials        112      125      +13     
| Impacted Files | Coverage Δ | Complexity Δ |
|---|---|---|
| .../main/java/io/reactivex/rxjava3/core/Flowable.java | 100.00% <100.00%> (ø) | 596.00 <1.00> (+1.00) |
| ...able/AbstractBackpressureThrottlingSubscriber.java | 100.00% <100.00%> (ø) | 26.00 <26.00> (?) |
| ...erators/flowable/FlowableOnBackpressureLatest.java | 100.00% <100.00%> (ø) | 2.00 <0.00> (ø) |
| ...erators/flowable/FlowableOnBackpressureReduce.java | 100.00% <100.00%> (ø) | 2.00 <2.00> (?) |
| ...l/operators/observable/ObservableFlatMapMaybe.java | 89.43% <0.00%> (-7.75%) | 2.00% <0.00%> (ø%) |
| ...xjava3/internal/observers/FutureMultiObserver.java | 96.61% <0.00%> (-3.39%) | 27.00% <0.00%> (-1.00%) |
| ...eactivex/rxjava3/processors/BehaviorProcessor.java | 97.48% <0.00%> (-2.52%) | 50.00% <0.00%> (ø%) |
| ...va/io/reactivex/rxjava3/subjects/AsyncSubject.java | 97.87% <0.00%> (-2.13%) | 44.00% <0.00%> (-1.00%) |
| ...ernal/operators/flowable/FlowableFlatMapMaybe.java | 95.85% <0.00%> (-2.08%) | 2.00% <0.00%> (ø%) |
| ...nternal/operators/parallel/ParallelReduceFull.java | 91.08% <0.00%> (-1.99%) | 2.00% <0.00%> (ø%) |
| ... and 13 more | | |

Δ = absolute <relative> (impact), ø = not affected, ? = missing data

Member

@akarnokd akarnokd left a comment

Technically a good PR, but I'm not yet convinced such an operation is in high enough demand to be included in RxJava.

In addition, the idea of reducing while being backpressured usually involves two types: the upstream type T and a collection type C that holds the aggregate and is the output type. The most commonly quoted use case involves T and List<T>; this has the drawback that individual Ts always have to be wrapped to be able to reach downstream.

The operator in this PR is limited to T -> T transformations, for which not many standard types can do meaningful reduction. Of course, people could try first mapping T into an ArrayList<T> and then having the reducer just add to this list; however, this can result in unbounded memory usage as the list grows in a backpressured situation.

Consequently, given the demand and usage considerations, this operator doesn't strike me as a prime candidate for RxJava proper, but it may live as a FlowableTransformer in a 3rd party library. There is a similar operator, coalesce, available for example.

@ansip
Contributor Author

ansip commented Dec 5, 2020

Ok, let me explain why I decided to open this PR here.

Real usage example in our company (and, I suppose, in other companies as well): typically we have many applications with grids and tables of data.
The tables have live updates that arrive after a snapshot has been received on the client side.
So the first update is the snapshot, and after it the server sends only deltas (row updates).
That means we cannot afford to lose any data, so operators like latest and drop are useless here.
The only operator from the standard library that we can use in this case is backpressure buffer.
But the problem with this operator shows up when we have a slow consumer (client/UI), a slow network, etc.:
we can simply get a MissingBackpressureException (in the best case) or an OOM (in the worst case),
because this operator collects data, and that is what we don't want to do.

Why not 3rd party

3rd party is a good point, but experience shows nobody knows about them :)
As a result, I have seen many times how developers reinvent the wheel trying to solve this problem.

Coalesce is too generic, and I would say it is not a similar operator; it looks more like backpressure buffer and collects items (so it has the same drawbacks I described above).
Yes, it can solve our problem, but as you mentioned there are additional allocations for the list and increased GC pressure, so it does not look like a good choice.
And if we think this way, coalesce can replace latest and drop as well... You can just keep a single first element in the list, and this would be a drop implementation,
or you can replace the first element in the list, and this would be a latest implementation.
Does it mean that latest and drop should be moved from RxJava to extensions as well?
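
The keep-first and replace-first variants mentioned above can be written as two tiny merge functions. This is only a sketch of the comment's characterization: `DropVsLatest`, `keepFirst`, `keepLast`, and `collapse` are hypothetical names, and the real drop operator holds no item at all while the consumer is busy, so the keep-first variant is an approximation.

```java
import java.util.List;
import java.util.function.BinaryOperator;

/** Hypothetical helpers; they model a burst that arrived while the consumer was busy. */
final class DropVsLatest {
    private DropVsLatest() { }

    /** Keeps the item already held and ignores newcomers (drop-like). */
    static <T> BinaryOperator<T> keepFirst() {
        return (held, next) -> held;
    }

    /** Replaces the held item with the newcomer (latest-like). */
    static <T> BinaryOperator<T> keepLast() {
        return (held, next) -> next;
    }

    /** Folds the burst into a single item, or null for an empty burst. */
    static <T> T collapse(List<T> burst, BinaryOperator<T> merge) {
        return burst.stream().reduce(merge).orElse(null);
    }
}
```

Collapsing the burst `["a", "b", "c"]` gives `"a"` with `keepFirst()` and `"c"` with `keepLast()`, which is the sense in which a general merge function subsumes both behaviors.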

Why only T type

This was done intentionally to keep a single responsibility and force the user to return the same type.
This operator is not a mapper; it just lets the user choose what to do with updates: keep the first, keep the second, or calculate a new item.
If the user needs to collect updates, nothing prevents making T a container itself:

Update {
List rows = ....
}

and in the reducer the user can merge two lists into a single Update with a rows list.
If the user needs mapping, it can be done right after this operator:
.onBackpressureReduce()
.map()
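
A minimal sketch of the container approach described above, assuming rows are plain strings. The `Update` class body, its `rows()` accessor, and the `merge` method are hypothetical names chosen for illustration; the original comment only outlines the shape of the type.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical container type: one Update carries any number of row deltas. */
final class Update {
    private final List<String> rows;

    Update(List<String> rows) {
        // Defensive copy so a merged Update never aliases its inputs.
        this.rows = Collections.unmodifiableList(new ArrayList<>(rows));
    }

    List<String> rows() {
        return rows;
    }

    /** Reducer body: combine two pending updates into one, oldest rows first. */
    Update merge(Update newer) {
        List<String> merged = new ArrayList<>(rows);
        merged.addAll(newer.rows);
        return new Update(merged);
    }
}
```

With RxJava on the classpath, a method like this could presumably be passed as `.onBackpressureReduce(Update::merge)`, since it has the T, T -> T shape. The merged list still grows for as long as the consumer stays stalled, which is the memory concern raised in the review.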

So those were my thoughts when I raised this PR.
Of course, I think you have more experience with RxJava than I do and a clearer view of how it should develop.
I don't mind moving this to a 3rd party lib or extensions, or having it declined.
We already have an implementation inside our company, so this problem is solved for us.
This PR was just an attempt to share my experience.

@akarnokd
Member

akarnokd commented Dec 5, 2020

I see. The no-dataloss possibility and the low complexity of the operator is a plus. @vanniktech, @davidmoten, anybody else, any opinions?

@davidmoten
Collaborator

I'm happy for this one to be included especially as the complexity is low. I wouldn't mind seeing a bit more consideration given to the typing as it is very unusual for an aggregating operator to not offer overloads that support a different downstream type.

Collaborator

@vanniktech vanniktech left a comment

I'm fine with adding this new operator.

Should not this be @experimental?

@akarnokd
Member

akarnokd commented Dec 8, 2020

Should not this be @experimental?

Yes, also target 3.0.9.

I wouldn't mind seeing a bit more consideration given to the typing as it is very unusual for an aggregating operator to not offer overloads that support a different downstream type.

Agree, reduce(BiFunction) and reduceWith(Supplier, BiFunction) tend to go together. Here, the seed variant wouldn't make much sense so both can be called onBackpressureReduce.

Okay then, we accept this new operator. I'll add some comments about where to adjust things.

@ansip
Contributor Author

ansip commented Dec 8, 2020

Ok, so should I add onBackpressureReduce(Supplier<R> seedSupplier, BiFunction<R, ? super T, R> reducer)?

If yes, could you please advise on the method semantics?
Should seedSupplier be called for each new T coming from upstream when there is no backpressure (downstream requested MAX_VALUE)? Or should it be called once, with the value stored inside the operator and passed to the reducer each time?

@akarnokd
Member

akarnokd commented Dec 8, 2020

The output type is R thus you can't relay Ts anymore. Consequently, even if the downstream is ready, you have to call the supplier and the reducer.
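
One way to read the semantics stated above is sketched below, as a model only and not the RxJava implementation: because the output type is R, every upstream T goes through the reducer, and a fresh seed is requested whenever nothing is pending, even if the downstream takes each item immediately. `SeededReduceSlot`, `offer`, and `poll` are hypothetical names, and threading is ignored.

```java
import java.util.function.BiFunction;
import java.util.function.Supplier;

/** Hypothetical model of a seeded reduce-on-backpressure slot. */
final class SeededReduceSlot<T, R> {
    private final Supplier<R> seedSupplier;
    private final BiFunction<R, ? super T, R> reducer;
    private R pending; // null means nothing is waiting

    SeededReduceSlot(Supplier<R> seedSupplier, BiFunction<R, ? super T, R> reducer) {
        this.seedSupplier = seedSupplier;
        this.reducer = reducer;
    }

    /** Every T passes through the reducer, because only R can go downstream. */
    void offer(T item) {
        // Assumption: a new seed is created per aggregate, never reused.
        R base = (pending == null) ? seedSupplier.get() : pending;
        pending = reducer.apply(base, item);
    }

    /** Hands the current aggregate to the downstream; returns null if empty. */
    R poll() {
        R out = pending;
        pending = null;
        return out;
    }
}
```

With `ArrayList::new` as the seed and a reducer that appends, offering 1 and polling yields `[1]`; offering 2 and 3 before the next poll yields a separate list `[2, 3]`.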

@akarnokd
Member

akarnokd commented Dec 8, 2020

Ok, so should I add onBackpressureReduce(Supplier<R> seedSupplier, BiFunction<R, ? super T, R> reducer)?

Yes, you can do it in this PR or in a fresh PR after this gets merged.

Image: https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Flowable.onBackpressureReduce.ff.png (640 x 315)

@ansip
Contributor Author

ansip commented Dec 8, 2020

[...] you have to call the supplier and the reducer.

Ok got it

@ansip
Contributor Author

ansip commented Dec 8, 2020

Ok, so should I add onBackpressureReduce(Supplier<R> seedSupplier, BiFunction<R, ? super T, R> reducer)?

Yes, you can do it in this PR or in a fresh PR after this gets merged.

Yes, I would prefer to raise a separate PR; I suppose a smaller changeset would be easier to review.
I will do this today or tomorrow.

@akarnokd akarnokd changed the title from "onBackpressureReduce operator" to "3.x: Add onBackpressureReduce operator" Dec 8, 2020
@akarnokd akarnokd merged commit 7741c59 into ReactiveX:3.x Dec 9, 2020
@akarnokd akarnokd added this to the 3.1 milestone Dec 9, 2020