3.x: Add onBackpressureReduce operator #7129

Merged
akarnokd merged 18 commits into ReactiveX:3.x
Dec 9, 2020

ansip (Contributor) commented Dec 9, 2020

Added one more overload of the onBackpressureReduce operator:

@Experimental
@CheckReturnValue
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public final <R> Flowable<R> onBackpressureReduce(@NonNull Supplier<R> supplier, @NonNull BiFunction<R, ? super T, R> reducer) {
    return RxJavaPlugins.onAssembly(new FlowableOnBackpressureReduceWith<>(this, supplier, reducer));
}
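
To make the intended behavior of this overload concrete, here is a minimal single-threaded model written against the JDK only. It is a sketch, not the RxJava implementation: the class `BackpressureReduceModel` and its `request`, `onNext`, and `delivered` methods are hypothetical names, and the model assumes that each upstream item is folded into a supplier-provided aggregate that is handed downstream as soon as demand exists.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Supplier;

// Hypothetical single-threaded model of the operator's semantics.
// It is NOT the RxJava implementation and ignores concurrency and termination.
class BackpressureReduceModel<T, R> {
    private final Supplier<R> supplier;
    private final BiFunction<R, ? super T, R> reducer;
    private final List<R> delivered = new ArrayList<>();
    private R current;      // pending aggregate, null when nothing is waiting
    private long requested; // outstanding downstream demand

    BackpressureReduceModel(Supplier<R> supplier, BiFunction<R, ? super T, R> reducer) {
        this.supplier = supplier;
        this.reducer = reducer;
    }

    // Upstream item: start a fresh aggregate if none is pending, then fold the item in.
    void onNext(T item) {
        R seed = (current != null) ? current : supplier.get();
        current = reducer.apply(seed, item);
        drain();
    }

    // Downstream demand: record it and try to deliver the pending aggregate.
    void request(long n) {
        requested += n;
        drain();
    }

    // Deliver the pending aggregate only if the downstream has asked for a value.
    private void drain() {
        if (current != null && requested > 0) {
            delivered.add(current);
            current = null;
            requested--;
        }
    }

    List<R> delivered() {
        return delivered;
    }

    public static void main(String[] args) {
        BackpressureReduceModel<Integer, List<Integer>> model =
                new BackpressureReduceModel<Integer, List<Integer>>(
                        ArrayList::new,
                        (list, x) -> { list.add(x); return list; });
        model.onNext(1);
        model.onNext(2);
        model.onNext(3);   // no demand yet, so the three items batch up
        model.request(1);  // the batch is delivered as one value
        model.request(1);  // demand is now waiting
        model.onNext(4);   // delivered at once as its own batch
        System.out.println(model.delivered()); // prints [[1, 2, 3], [4]]
    }
}
```

Because the downstream type `R` may differ from the upstream type `T`, every item passes through the reducer in this model, even when demand is already available.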

Changes:

  • Added FlowableOnBackpressureReduceWith
  • AbstractBackpressureThrottlingSubscriber now has two type parameters, T (upstream) and R (downstream), and is extended by three classes: BackpressureLatestSubscriber, BackpressureReduceSubscriber and BackpressureReduceWithSubscriber
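
The effect of splitting the base class into an upstream and a downstream type can be sketched with plain JDK classes. The names below (`ThrottlingBase`, `KeepLatest`, `ReduceSame`, `ReduceWith`, `poll`) are hypothetical stand-ins for the three subscribers, and the sketch deliberately leaves out demand tracking, threading and termination; it only shows how two of the subclasses fix `R` to `T` while the third lets them differ.

```java
import java.util.function.BiFunction;
import java.util.function.Supplier;

// Demo entry point; all classes here are hypothetical stand-ins, not RxJava types.
class ThrottlingDemo {
    public static void main(String[] args) {
        KeepLatest<Integer> latest = new KeepLatest<Integer>();
        ReduceSame<Integer> sum = new ReduceSame<Integer>(Integer::sum);
        ReduceWith<Integer, StringBuilder> joined = new ReduceWith<Integer, StringBuilder>(
                StringBuilder::new, (sb, x) -> sb.append(x).append(';'));
        for (int i = 1; i <= 3; i++) {
            latest.onNext(i);
            sum.onNext(i);
            joined.onNext(i);
        }
        System.out.println(latest.poll()); // prints 3
        System.out.println(sum.poll());    // prints 6
        System.out.println(joined.poll()); // prints 1;2;3;
    }
}

// T is the upstream item type, R is the type of the value held for the downstream.
abstract class ThrottlingBase<T, R> {
    protected R current; // pending downstream value, null when empty

    abstract void onNext(T item);

    // Hands out the pending value and clears the slot.
    R poll() {
        R v = current;
        current = null;
        return v;
    }
}

// Keeps only the most recent item, so R is the same as T.
class KeepLatest<T> extends ThrottlingBase<T, T> {
    @Override
    void onNext(T item) {
        current = item;
    }
}

// Folds items of the same type together, so R is again T.
class ReduceSame<T> extends ThrottlingBase<T, T> {
    private final BiFunction<T, T, T> reducer;

    ReduceSame(BiFunction<T, T, T> reducer) {
        this.reducer = reducer;
    }

    @Override
    void onNext(T item) {
        current = (current == null) ? item : reducer.apply(current, item);
    }
}

// Folds items into a supplier-provided aggregate, so R may differ from T.
class ReduceWith<T, R> extends ThrottlingBase<T, R> {
    private final Supplier<R> supplier;
    private final BiFunction<R, ? super T, R> reducer;

    ReduceWith(Supplier<R> supplier, BiFunction<R, ? super T, R> reducer) {
        this.supplier = supplier;
        this.reducer = reducer;
    }

    @Override
    void onNext(T item) {
        R seed = (current != null) ? current : supplier.get();
        current = reducer.apply(seed, item);
    }
}
```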

I suggest reviewing and merging this one after the linked PR is merged.

akarnokd (Member) commented Dec 9, 2020

Could you please rebase this onto a fresh 3.x?

akarnokd added this to the 3.1 milestone on Dec 9, 2020
codecov bot commented Dec 9, 2020

Codecov Report

Merging #7129 (b94f4cd) into 3.x (7741c59) will decrease coverage by 0.02%.
The diff coverage is 100.00%.

@@             Coverage Diff              @@
##                3.x    #7129      +/-   ##
============================================
- Coverage     99.54%   99.52%   -0.03%     
- Complexity     6703     6708       +5     
============================================
  Files           744      745       +1     
  Lines         47298    47327      +29     
  Branches       6373     6375       +2     
============================================
+ Hits          47083    47100      +17     
- Misses           96      101       +5     
- Partials        119      126       +7     
| Impacted Files | Coverage Δ | Complexity Δ |
|---|---|---|
| ...erators/flowable/FlowableOnBackpressureLatest.java | 100.00% <ø> (ø) | 2.00 <0.00> (ø) |
| .../main/java/io/reactivex/rxjava3/core/Flowable.java | 100.00% <100.00%> (ø) | 597.00 <2.00> (+1.00) |
| ...able/AbstractBackpressureThrottlingSubscriber.java | 100.00% <100.00%> (ø) | 26.00 <0.00> (ø) |
| ...erators/flowable/FlowableOnBackpressureReduce.java | 100.00% <100.00%> (ø) | 2.00 <0.00> (ø) |
| ...ors/flowable/FlowableOnBackpressureReduceWith.java | 100.00% <100.00%> (ø) | 2.00 <2.00> (?) |
| ...erators/completable/CompletableConcatIterable.java | 95.91% <0.00%> (-4.09%) | 2.00% <0.00%> (ø%) |
| ...nternal/operators/parallel/ParallelReduceFull.java | 91.08% <0.00%> (-3.97%) | 2.00% <0.00%> (ø%) |
| .../operators/observable/ObservableFlatMapSingle.java | 93.65% <0.00%> (-3.18%) | 2.00% <0.00%> (ø%) |
| ...vable/ObservableFlatMapCompletableCompletable.java | 98.48% <0.00%> (-1.52%) | 3.00% <0.00%> (ø%) |
| ...ators/observable/ObservableFlatMapCompletable.java | 98.52% <0.00%> (-1.48%) | 2.00% <0.00%> (ø%) |
| ... and 17 more | | |

Δ = absolute <relative> (impact), ø = not affected, ? = missing data

# Conflicts:
#	src/main/java/io/reactivex/rxjava3/core/Flowable.java
#	src/main/java/io/reactivex/rxjava3/internal/operators/flowable/AbstractBackpressureThrottlingSubscriber.java
#	src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableOnBackpressureLatest.java
#	src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableOnBackpressureReduce.java
#	src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableOnBackpressureReduceTest.java
ansip (Contributor, Author) commented Dec 9, 2020

> Could you please rebase this onto a fresh 3.x?

Done.

"The reducer returned a null value"
));
}
drain();
Member

drain() should be outside the try-catch.

Contributor Author

Moved outside.
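
The shape requested in this review thread can be sketched as follows. The review does not state its reasoning, so this sketch assumes the intent is that the try-catch guards only the user-supplied callback, while `drain()` runs after the guard and only when the callback succeeded. `GuardedReduceSketch` and its `events` list are hypothetical names used for illustration, and the `drain()` here merely records what it would emit.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

// Hypothetical sketch of the control flow only; not the RxJava subscriber.
class GuardedReduceSketch {
    final List<String> events = new ArrayList<>();
    private final BinaryOperator<Integer> reducer;
    private Integer current;

    GuardedReduceSketch(BinaryOperator<Integer> reducer) {
        this.reducer = reducer;
    }

    void onNext(Integer item) {
        try {
            // Only the user-supplied reducer runs inside the guard.
            current = (current == null) ? item : reducer.apply(current, item);
        } catch (RuntimeException ex) {
            // A reducer failure is reported and the item is not drained.
            events.add("onError: " + ex.getMessage());
            return;
        }
        // Outside the guard: a failure in here would not be mistaken for a reducer failure.
        drain();
    }

    // Stand-in for emission: just records the value that would be offered downstream.
    private void drain() {
        events.add("drain saw " + current);
    }

    public static void main(String[] args) {
        GuardedReduceSketch ok = new GuardedReduceSketch(Integer::sum);
        ok.onNext(1);
        ok.onNext(2);
        System.out.println(ok.events); // prints [drain saw 1, drain saw 3]

        GuardedReduceSketch failing = new GuardedReduceSketch(
                (a, b) -> { throw new IllegalStateException("boom"); });
        failing.onNext(1); // first item needs no reducer call
        failing.onNext(2); // reducer throws, so drain is skipped
        System.out.println(failing.events); // prints [drain saw 1, onError: boom]
    }
}
```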

akarnokd merged commit e0122a4 into ReactiveX:3.x on Dec 9, 2020