Skip to content

Commit

Permalink
3.x: [Java 8] Add flattenStreamAsX to Maybe/Single (#6805)
Browse files Browse the repository at this point in the history
* 3.x: [Java 8] Add flattenStreamAsX to Maybe/Single

* Add RS TCK tests for flattenStreamAsFlowable
  • Loading branch information
akarnokd committed Dec 28, 2019
1 parent 79f8e6d commit 34f381c
Show file tree
Hide file tree
Showing 15 changed files with 2,647 additions and 7 deletions.
85 changes: 85 additions & 0 deletions src/main/java/io/reactivex/rxjava3/core/Maybe.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import java.util.*;
import java.util.concurrent.*;
import java.util.stream.*;

import org.reactivestreams.*;

Expand Down Expand Up @@ -3152,6 +3153,7 @@ public final <U, R> Maybe<R> flatMap(@NonNull Function<? super T, ? extends Mayb
* source Maybe
* @return the new Flowable instance
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
* @see #flattenStreamAsFlowable(Function)
*/
@BackpressureSupport(BackpressureKind.FULL)
@CheckReturnValue
Expand Down Expand Up @@ -5004,4 +5006,87 @@ public final CompletionStage<T> toCompletionStage() {
public final CompletionStage<T> toCompletionStage(@Nullable T defaultItem) {
return subscribeWith(new CompletionStageConsumer<>(true, defaultItem));
}

/**
* Maps the upstream succecss value into a Java {@link Stream} and emits its
* items to the downstream consumer as a {@link Flowable}.
* <p>
* <img width="640" height="247" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/flattenStreamAsFlowable.m.png" alt="">
* <p>
* The operator closes the {@code Stream} upon cancellation and when it terminates. Exceptions raised when
* closing a {@code Stream} are routed to the global error handler ({@link RxJavaPlugins#onError(Throwable)}.
* If a {@code Stream} should not be closed, turn it into an {@link Iterable} and use {@link #flattenAsFlowable(Function)}:
* <pre><code>
* source.flattenAsFlowable(item -&gt; createStream(item)::iterator);
* </code></pre>
* <p>
* Primitive streams are not supported and items have to be boxed manually (e.g., via {@link IntStream#boxed()}):
* <pre><code>
* source.flattenStreamAsFlowable(item -&gt; IntStream.rangeClosed(1, 10).boxed());
* </code></pre>
* <p>
* {@code Stream} does not support concurrent usage so creating and/or consuming the same instance multiple times
* from multiple threads can lead to undefined behavior.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator honors backpressure from downstream and iterates the given {@code Stream}
* on demand (i.e., when requested).</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code flattenStreamAsFlowable} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <R> the element type of the {@code Stream} and the output {@code Flowable}
* @param mapper the function that receives the upstream success item and should
* return a {@code Stream} of values to emit.
* @return the new Flowable instance
* @since 3.0.0
* @see #flattenAsFlowable(Function)
* @see #flattenStreamAsObservable(Function)
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.FULL)
@NonNull
public final <R> Flowable<R> flattenStreamAsFlowable(@NonNull Function<? super T, ? extends Stream<? extends R>> mapper) {
Objects.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new MaybeFlattenStreamAsFlowable<>(this, mapper));
}

/**
* Maps the upstream succecss value into a Java {@link Stream} and emits its
* items to the downstream consumer as an {@link Observable}.
* <img width="640" height="247" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/flattenStreamAsObservable.m.png" alt="">
* <p>
* The operator closes the {@code Stream} upon cancellation and when it terminates. Exceptions raised when
* closing a {@code Stream} are routed to the global error handler ({@link RxJavaPlugins#onError(Throwable)}.
* If a {@code Stream} should not be closed, turn it into an {@link Iterable} and use {@link #flattenAsObservable(Function)}:
* <pre><code>
* source.flattenAsObservable(item -&gt; createStream(item)::iterator);
* </code></pre>
* <p>
* Primitive streams are not supported and items have to be boxed manually (e.g., via {@link IntStream#boxed()}):
* <pre><code>
* source.flattenStreamAsObservable(item -&gt; IntStream.rangeClosed(1, 10).boxed());
* </code></pre>
* <p>
* {@code Stream} does not support concurrent usage so creating and/or consuming the same instance multiple times
* from multiple threads can lead to undefined behavior.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code flattenStreamAsObservable} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <R> the element type of the {@code Stream} and the output {@code Observable}
* @param mapper the function that receives the upstream success item and should
* return a {@code Stream} of values to emit.
* @return the new Observable instance
* @since 3.0.0
* @see #flattenAsObservable(Function)
* @see #flattenStreamAsFlowable(Function)
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public final <R> Observable<R> flattenStreamAsObservable(@NonNull Function<? super T, ? extends Stream<? extends R>> mapper) {
Objects.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new MaybeFlattenStreamAsObservable<>(this, mapper));
}
}
1 change: 1 addition & 0 deletions src/main/java/io/reactivex/rxjava3/core/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -6517,6 +6517,7 @@ public final <R> Observable<R> concatMap(@NonNull Function<? super T, ? extends
* the scheduler where the {@code mapper} function will be executed
* @return an Observable that emits the result of applying the transformation function to each item emitted
* by the source ObservableSource and concatenating the ObservableSources obtained from this transformation
* @since 3.0.0
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
*/
@CheckReturnValue
Expand Down
87 changes: 87 additions & 0 deletions src/main/java/io/reactivex/rxjava3/core/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@

import java.util.*;
import java.util.concurrent.*;
import java.util.stream.*;

import org.reactivestreams.Publisher;

import io.reactivex.rxjava3.annotations.*;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.functions.*;
Expand Down Expand Up @@ -2799,6 +2801,7 @@ public final <R> Flowable<R> flatMapPublisher(@NonNull Function<? super T, ? ext
* source Single
* @return the new Flowable instance
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
* @see #flattenStreamAsFlowable(Function)
*/
@BackpressureSupport(BackpressureKind.FULL)
@CheckReturnValue
Expand Down Expand Up @@ -2826,6 +2829,7 @@ public final <U> Flowable<U> flattenAsFlowable(@NonNull Function<? super T, ? ex
* source Single
* @return the new Observable instance
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
* @see #flattenStreamAsObservable(Function)
*/
@CheckReturnValue
@NonNull
Expand Down Expand Up @@ -4308,4 +4312,87 @@ private static <T> Single<T> toSingle(@NonNull Flowable<T> source) {
public final CompletionStage<T> toCompletionStage() {
return subscribeWith(new CompletionStageConsumer<>(false, null));
}

/**
* Maps the upstream succecss value into a Java {@link Stream} and emits its
* items to the downstream consumer as a {@link Flowable}.
* <img width="640" height="247" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/flattenStreamAsFlowable.s.png" alt="">
* <p>
* The operator closes the {@code Stream} upon cancellation and when it terminates. Exceptions raised when
* closing a {@code Stream} are routed to the global error handler ({@link RxJavaPlugins#onError(Throwable)}.
* If a {@code Stream} should not be closed, turn it into an {@link Iterable} and use {@link #flattenAsFlowable(Function)}:
* <pre><code>
* source.flattenAsFlowable(item -&gt; createStream(item)::iterator);
* </code></pre>
* <p>
* Primitive streams are not supported and items have to be boxed manually (e.g., via {@link IntStream#boxed()}):
* <pre><code>
* source.flattenStreamAsFlowable(item -&gt; IntStream.rangeClosed(1, 10).boxed());
* </code></pre>
* <p>
* {@code Stream} does not support concurrent usage so creating and/or consuming the same instance multiple times
* from multiple threads can lead to undefined behavior.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator honors backpressure from downstream and iterates the given {@code Stream}
* on demand (i.e., when requested).</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code flattenStreamAsFlowable} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <R> the element type of the {@code Stream} and the output {@code Flowable}
* @param mapper the function that receives the upstream success item and should
* return a {@code Stream} of values to emit.
* @return the new Flowable instance
* @since 3.0.0
* @see #flattenAsFlowable(Function)
* @see #flattenStreamAsObservable(Function)
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.FULL)
@NonNull
public final <R> Flowable<R> flattenStreamAsFlowable(@NonNull Function<? super T, ? extends Stream<? extends R>> mapper) {
Objects.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new SingleFlattenStreamAsFlowable<>(this, mapper));
}

/**
* Maps the upstream succecss value into a Java {@link Stream} and emits its
* items to the downstream consumer as an {@link Observable}.
* <p>
* <img width="640" height="247" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/flattenStreamAsObservable.s.png" alt="">
* <p>
* The operator closes the {@code Stream} upon cancellation and when it terminates. Exceptions raised when
* closing a {@code Stream} are routed to the global error handler ({@link RxJavaPlugins#onError(Throwable)}.
* If a {@code Stream} should not be closed, turn it into an {@link Iterable} and use {@link #flattenAsFlowable(Function)}:
* <pre><code>
* source.flattenAsObservable(item -&gt; createStream(item)::iterator);
* </code></pre>
* <p>
* Primitive streams are not supported and items have to be boxed manually (e.g., via {@link IntStream#boxed()}):
* <pre><code>
* source.flattenStreamAsObservable(item -&gt; IntStream.rangeClosed(1, 10).boxed());
* </code></pre>
* <p>
* {@code Stream} does not support concurrent usage so creating and/or consuming the same instance multiple times
* from multiple threads can lead to undefined behavior.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code flattenStreamAsObservable} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <R> the element type of the {@code Stream} and the output {@code Observable}
* @param mapper the function that receives the upstream success item and should
* return a {@code Stream} of values to emit.
* @return the new Observable instance
* @since 3.0.0
* @see #flattenAsObservable(Function)
* @see #flattenStreamAsFlowable(Function)
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public final <R> Observable<R> flattenStreamAsObservable(@NonNull Function<? super T, ? extends Stream<? extends R>> mapper) {
Objects.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new SingleFlattenStreamAsObservable<>(this, mapper));
}
}

0 comments on commit 34f381c

Please sign in to comment.