3.x: Add onBackpressureReduce operator #7124
Update local fork with latest
Codecov Report
@@             Coverage Diff              @@
##                3.x    #7124      +/-   ##
============================================
- Coverage     99.57%   99.51%   -0.07%
- Complexity     6671     6703      +32
============================================
  Files           742      744       +2
  Lines         47267    47298      +31
  Branches       6374     6373       -1
============================================
+ Hits          47066    47067       +1
- Misses           89      106      +17
- Partials        112      125      +13
Technically a good PR, but I'm not yet convinced such an operation is in high enough demand to be included in RxJava.
In addition, the idea of reducing while being backpressured usually involves two types: the upstream type T and a collection type C that holds the aggregate and is the output type. The most commonly quoted use case involves T and List<T>; this has the drawback that individual Ts always have to be wrapped to be able to reach downstream.
The operator in this PR is limited to T -> T transformations, for which not many standard types can do meaningful reduction. Of course, people could try first mapping T into an ArrayList<T> so that the reducer just adds to this list; however, this can result in unbounded memory usage as the list grows in a backpressured situation.
Consequently, given the demand and usage considerations, this operator doesn't strike me as a prime candidate for RxJava proper, but it may live as a FlowableTransformer in a 3rd party library. There is a similar operator, coalesce, available for example.
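To make the memory concern above concrete, here is a minimal single-threaded sketch in plain Java. It does not use RxJava and is not the operator's implementation; `ListReducerGrowthSketch`, `append` and `simulateStall` are hypothetical names chosen for illustration. It assumes every upstream item is first wrapped into a one-element list and that the reducer appends into the held list while downstream requests nothing.

```java
import java.util.ArrayList;
import java.util.List;

public class ListReducerGrowthSketch {
    /** Reducer of the shape discussed above: append the newer list to the held one. */
    public static List<Integer> append(List<Integer> held, List<Integer> next) {
        held.addAll(next);
        return held;
    }

    /** Simulates n upstream items arriving while downstream requests nothing. */
    public static List<Integer> simulateStall(int n) {
        List<Integer> held = null; // null models "nothing held yet"
        for (int i = 0; i < n; i++) {
            // Each item has to be wrapped so that input and output types match (T -> T).
            List<Integer> wrapped = new ArrayList<>();
            wrapped.add(i);
            held = (held == null) ? wrapped : append(held, wrapped);
        }
        return held;
    }

    public static void main(String[] args) {
        // The held aggregate grows linearly with the length of the stall.
        System.out.println(simulateStall(1_000).size()); // prints 1000
    }
}
```

In this model the held value is only as bounded as the reducer makes it: a reducer that sums or overwrites keeps one small value, while an appending reducer keeps everything.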
...main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableOnBackpressureReduce.java
.../java/io/reactivex/rxjava3/internal/operators/flowable/FlowableOnBackpressureReduceTest.java
Ok, let me explain why I decided to open this PR here.
Real usage example
In our company (and I suppose in other companies as well) we typically have many applications with grids and tables of data.
Why not 3rd party
3rd party is a good point, but experience shows nobody knows about them :) Coalesce is too generic, and I would say it is not a similar operator: it looks more like a backpressure buffer and collects items (so it has the same drawbacks as I described above).
Why only T type
This was done intentionally to keep single responsibility and force the user to return the same type
Update {
and in reduce the user can merge two lists into a single Update with a list of rows.
So those were my thoughts when I raised this PR.
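The grid use case can be sketched as a T -> T reducer over an update type. The author's actual Update type is not shown in this thread, so `GridUpdate`, its `rows` map and the `merge` function below are hypothetical stand-ins, written in plain Java without RxJava. The assumption is that a newer value for a row replaces the older one, which keeps the merged update no larger than the number of distinct rows.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class GridUpdateMergeSketch {
    /** Hypothetical update type: row id -> latest cell text. Not the type from the PR. */
    public static final class GridUpdate {
        public final Map<Integer, String> rows;

        public GridUpdate(Map<Integer, String> rows) {
            this.rows = rows;
        }

        public static GridUpdate of(int rowId, String value) {
            Map<Integer, String> m = new LinkedHashMap<>();
            m.put(rowId, value);
            return new GridUpdate(m);
        }
    }

    /** T -> T reducer: rows from the newer update overwrite rows with the same id. */
    public static GridUpdate merge(GridUpdate older, GridUpdate newer) {
        Map<Integer, String> merged = new LinkedHashMap<>(older.rows);
        merged.putAll(newer.rows);
        return new GridUpdate(merged);
    }

    public static void main(String[] args) {
        GridUpdate u = merge(merge(GridUpdate.of(1, "a"), GridUpdate.of(2, "b")),
                             GridUpdate.of(1, "c"));
        System.out.println(u.rows); // prints {1=c, 2=b}
    }
}
```

Because input and output are the same type, a function of this shape could be passed as the reducer without any wrapping step, which appears to be the point made in the comment above.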
I see. The no-dataloss possibility and the low complexity of the operator are a plus. @vanniktech, @davidmoten, anybody else, any opinions?
I'm happy for this one to be included, especially as the complexity is low. I wouldn't mind seeing a bit more consideration given to the typing, as it is very unusual for an aggregating operator not to offer overloads that support a different downstream type.
I'm fine with adding this new operator.
Shouldn't this be @Experimental?
Yes, also target
Agree, reduce(BiFunction) and reduceWith(Supplier, BiFunction) tend to go together. Here, the
Okay then, we accept this new operator. I'll add some comments about where to adjust things.
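As a rough illustration of the Supplier plus BiFunction pairing mentioned above, the following plain-Java sketch models a seeded variant whose output type R differs from the upstream type T. The names `SeededSlotSketch` and `SeededSlot` are hypothetical, the model is single-threaded, and the signature eventually chosen for the operator is not shown in this thread, so this should be read as one possible shape rather than the accepted API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Supplier;

public class SeededSlotSketch {
    /** Holds one aggregate of type R built from upstream items of type T. */
    public static final class SeededSlot<T, R> {
        private final Supplier<R> seed;
        private final BiFunction<R, T, R> reducer;
        private R held; // null means nothing is held

        public SeededSlot(Supplier<R> seed, BiFunction<R, T, R> reducer) {
            this.seed = seed;
            this.reducer = reducer;
        }

        /** Called when an item arrives and downstream has no outstanding demand. */
        public void offer(T item) {
            // A fresh seed is requested only when the slot is empty.
            R current = (held == null) ? seed.get() : held;
            held = reducer.apply(current, item);
        }

        /** Called when downstream requests: hands over the aggregate and empties the slot. */
        public R poll() {
            R v = held;
            held = null;
            return v;
        }
    }

    public static void main(String[] args) {
        SeededSlot<String, List<String>> slot =
                new SeededSlot<>(ArrayList::new, (list, s) -> { list.add(s); return list; });
        slot.offer("a");
        slot.offer("b");
        System.out.println(slot.poll()); // prints [a, b]
        System.out.println(slot.poll()); // prints null
    }
}
```

With a seed supplier, individual items no longer need to be wrapped before they reach the reducer, which addresses the typing concern, although a collecting reducer can still grow without bound during a long stall.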
Ok, so should I add If yes, could you please advise on the method semantics
The output type is
Ok, got it.
Yes, I would prefer to raise a separate PR; I suppose it would be easier to review a smaller changeset.
Added one more backpressure strategy. It works like backpressure latest, but instead of just dropping values it merges/reduces the excess items.
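The difference from the "latest" strategy can be modelled with a single slot and a pluggable combiner. This is a simplified, single-threaded sketch in plain Java, not the code from this PR; `LatestVsReduceSketch` and `Slot` are hypothetical names, and real operators also have to handle concurrency, demand accounting and terminal events, which are omitted here.

```java
import java.util.function.BinaryOperator;

public class LatestVsReduceSketch {
    /** Single-slot holder for items that arrive while downstream has no demand. */
    public static final class Slot<T> {
        private final BinaryOperator<T> combiner;
        private T held; // null means the slot is empty

        public Slot(BinaryOperator<T> combiner) {
            this.combiner = combiner;
        }

        /** Stores the item, combining it with whatever is already held. */
        public void offer(T item) {
            held = (held == null) ? item : combiner.apply(held, item);
        }

        /** Hands the held value to downstream and empties the slot. */
        public T poll() {
            T v = held;
            held = null;
            return v;
        }
    }

    public static void main(String[] args) {
        // "Latest" behaviour: the newer item simply replaces the older one.
        Slot<Integer> latest = new Slot<>((older, newer) -> newer);
        // "Reduce" behaviour: excess items are merged instead of dropped.
        Slot<Integer> reduce = new Slot<>(Integer::sum);
        for (int i = 1; i <= 4; i++) {
            latest.offer(i);
            reduce.offer(i);
        }
        System.out.println(latest.poll()); // prints 4
        System.out.println(reduce.poll()); // prints 10
    }
}
```

Seen this way, "latest" is the special case where the combiner discards the older value, and both strategies hold at most one value at a time.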