Skip to content

Commit

Permalink
3.x: Add concatEagerDelayError across (#6899)
Browse files Browse the repository at this point in the history
* 3.x: Add concatEagerDelayError across

* Add more since 3.0.0 tags
  • Loading branch information
akarnokd committed Jan 30, 2020
1 parent 7e4bb8b commit 81f0569
Show file tree
Hide file tree
Showing 12 changed files with 1,238 additions and 192 deletions.
258 changes: 129 additions & 129 deletions docs/Operator-Matrix.md

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions src/main/java/io/reactivex/rxjava3/core/Completable.java
Original file line number Diff line number Diff line change
Expand Up @@ -1050,6 +1050,7 @@ public static Completable mergeDelayError(@NonNull Publisher<@NonNull ? extends
* at a time to the inner {@code CompletableSource}s
* @return the new {@code Completable} instance
* @throws NullPointerException if {@code sources} is {@code null}
* @throws IllegalArgumentException if {@code maxConcurrency} is non-positive
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
Expand Down
171 changes: 162 additions & 9 deletions src/main/java/io/reactivex/rxjava3/core/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -1648,9 +1648,80 @@ public static <T> Flowable<T> concatDelayError(@NonNull Publisher<@NonNull ? ext
return fromPublisher(sources).concatMapDelayError((Function)Functions.identity(), tillTheEnd, prefetch);
}

/**
* Concatenates a sequence of {@link Publisher}s eagerly into a single stream of values.
* <p>
* <img width="640" height="422" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Flowable.concatEager.i.png" alt="">
* <p>
* Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
* source {@code Publisher}s. The operator buffers the values emitted by these {@code Publisher}s and then drains them
* in order, each one after the previous one completes.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>Backpressure is honored towards the downstream and the inner {@code Publisher}s are
* expected to support backpressure. Violating this assumption, the operator will
* signal {@link MissingBackpressureException}.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>This method does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the value type
* @param sources a sequence of {@code Publisher}s that need to be eagerly concatenated
* @return the new {@code Flowable} instance with the specified concatenation behavior
* @throws NullPointerException if {@code sources} is {@code null}
* @since 2.0
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public static <T> Flowable<T> concatEager(@NonNull Iterable<@NonNull ? extends Publisher<@NonNull ? extends T>> sources) {
return concatEager(sources, bufferSize(), bufferSize());
}

/**
* Concatenates a sequence of {@link Publisher}s eagerly into a single stream of values and
* runs a limited number of inner sequences at once.
* <p>
* <img width="640" height="375" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Flowable.concatEager.in.png" alt="">
* <p>
* Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
* source {@code Publisher}s. The operator buffers the values emitted by these {@code Publisher}s and then drains them
* in order, each one after the previous one completes.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>Backpressure is honored towards the downstream and both the outer and inner {@code Publisher}s are
* expected to support backpressure. Violating this assumption, the operator will
* signal {@link MissingBackpressureException}.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>This method does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the value type
* @param sources a sequence of {@code Publisher}s that need to be eagerly concatenated
* @param maxConcurrency the maximum number of concurrently running inner {@code Publisher}s; {@link Integer#MAX_VALUE}
* is interpreted as all inner {@code Publisher}s can be active at the same time
* @param prefetch the number of elements to prefetch from each inner {@code Publisher} source
* @return the new {@code Flowable} instance with the specified concatenation behavior
* @throws NullPointerException if {@code sources} is {@code null}
* @throws IllegalArgumentException if {@code maxConcurrency} or {@code prefetch} is non-positive
* @since 2.0
*/
@CheckReturnValue
@NonNull
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
@SuppressWarnings({ "rawtypes", "unchecked" })
public static <T> Flowable<T> concatEager(@NonNull Iterable<@NonNull ? extends Publisher<@NonNull ? extends T>> sources, int maxConcurrency, int prefetch) {
Objects.requireNonNull(sources, "sources is null");
ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency");
ObjectHelper.verifyPositive(prefetch, "prefetch");
return RxJavaPlugins.onAssembly(new FlowableConcatMapEager(new FlowableFromIterable(sources), Functions.identity(), maxConcurrency, prefetch, ErrorMode.BOUNDARY));
}

/**
* Concatenates a {@link Publisher} sequence of {@code Publisher}s eagerly into a single stream of values.
* <p>
* <img width="640" height="490" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Flowable.concatEager.p.png" alt="">
* <p>
* Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
* emitted source {@code Publisher}s as they are observed. The operator buffers the values emitted by these
* {@code Publisher}s and then drains them in order, each one after the previous one completes.
Expand All @@ -1677,7 +1748,10 @@ public static <T> Flowable<T> concatEager(@NonNull Publisher<@NonNull ? extends
}

/**
* Concatenates a {@link Publisher} sequence of {@code Publisher}s eagerly into a single stream of values.
* Concatenates a {@link Publisher} sequence of {@code Publisher}s eagerly into a single stream of values and
* runs a limited number of inner sequences at once.
* <p>
* <img width="640" height="421" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Flowable.concatEager.pn.png" alt="">
* <p>
* Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
* emitted source {@code Publisher}s as they are observed. The operator buffers the values emitted by these
Expand Down Expand Up @@ -1713,7 +1787,10 @@ public static <T> Flowable<T> concatEager(@NonNull Publisher<@NonNull ? extends
}

/**
* Concatenates a sequence of {@link Publisher}s eagerly into a single stream of values.
* Concatenates a sequence of {@link Publisher}s eagerly into a single stream of values,
* delaying errors until all the inner sequences terminate.
* <p>
* <img width="640" height="428" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Flowable.concatEagerDelayError.i.png" alt="">
* <p>
* Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
* source {@code Publisher}s. The operator buffers the values emitted by these {@code Publisher}s and then drains them
Expand All @@ -1730,18 +1807,22 @@ public static <T> Flowable<T> concatEager(@NonNull Publisher<@NonNull ? extends
* @param sources a sequence of {@code Publisher}s that need to be eagerly concatenated
* @return the new {@code Flowable} instance with the specified concatenation behavior
* @throws NullPointerException if {@code sources} is {@code null}
* @since 2.0
* @since 3.0.0
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public static <T> Flowable<T> concatEager(@NonNull Iterable<@NonNull ? extends Publisher<@NonNull ? extends T>> sources) {
return concatEager(sources, bufferSize(), bufferSize());
public static <T> Flowable<T> concatEagerDelayError(@NonNull Iterable<@NonNull ? extends Publisher<@NonNull ? extends T>> sources) {
return concatEagerDelayError(sources, bufferSize(), bufferSize());
}

/**
* Concatenates a sequence of {@link Publisher}s eagerly into a single stream of values.
* Concatenates a sequence of {@link Publisher}s eagerly into a single stream of values,
* delaying errors until all the inner sequences terminate and runs a limited number
* of inner sequences at once.
* <p>
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Flowable.concatEagerDelayError.in.png" alt="">
* <p>
* Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
* source {@code Publisher}s. The operator buffers the values emitted by these {@code Publisher}s and then drains them
Expand All @@ -1762,18 +1843,89 @@ public static <T> Flowable<T> concatEager(@NonNull Iterable<@NonNull ? extends P
* @return the new {@code Flowable} instance with the specified concatenation behavior
* @throws NullPointerException if {@code sources} is {@code null}
* @throws IllegalArgumentException if {@code maxConcurrency} or {@code prefetch} is non-positive
* @since 2.0
* @since 3.0.0
*/
@CheckReturnValue
@NonNull
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
@SuppressWarnings({ "rawtypes", "unchecked" })
public static <T> Flowable<T> concatEager(@NonNull Iterable<@NonNull ? extends Publisher<@NonNull ? extends T>> sources, int maxConcurrency, int prefetch) {
public static <T> Flowable<T> concatEagerDelayError(@NonNull Iterable<@NonNull ? extends Publisher<@NonNull ? extends T>> sources, int maxConcurrency, int prefetch) {
Objects.requireNonNull(sources, "sources is null");
ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency");
ObjectHelper.verifyPositive(prefetch, "prefetch");
return RxJavaPlugins.onAssembly(new FlowableConcatMapEager(new FlowableFromIterable(sources), Functions.identity(), maxConcurrency, prefetch, ErrorMode.IMMEDIATE));
return RxJavaPlugins.onAssembly(new FlowableConcatMapEager(new FlowableFromIterable(sources), Functions.identity(), maxConcurrency, prefetch, ErrorMode.END));
}

/**
* Concatenates a {@link Publisher} sequence of {@code Publisher}s eagerly into a single stream of values,
* delaying errors until all the inner and the outer sequences terminate.
* <p>
* <img width="640" height="496" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Flowable.concatEagerDelayError.p.png" alt="">
* <p>
* Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
* emitted source {@code Publisher}s as they are observed. The operator buffers the values emitted by these
* {@code Publisher}s and then drains them in order, each one after the previous one completes.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>Backpressure is honored towards the downstream and both the outer and inner {@code Publisher}s are
* expected to support backpressure. Violating this assumption, the operator will
* signal {@link MissingBackpressureException}.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>This method does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the value type
* @param sources a sequence of {@code Publisher}s that need to be eagerly concatenated
* @return the new {@code Flowable} instance with the specified concatenation behavior
* @throws NullPointerException if {@code sources} is {@code null}
* @since 3.0.0
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public static <T> Flowable<T> concatEagerDelayError(@NonNull Publisher<@NonNull ? extends Publisher<@NonNull ? extends T>> sources) {
return concatEagerDelayError(sources, bufferSize(), bufferSize());
}

/**
* Concatenates a {@link Publisher} sequence of {@code Publisher}s eagerly into a single stream of values,
* delaying errors until all the inner and outer sequences terminate and runs a limited number of inner
* sequences at once.
* <p>
* <img width="640" height="421" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Flowable.concatEagerDelayError.pn.png" alt="">
* <p>
* Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
* emitted source {@code Publisher}s as they are observed. The operator buffers the values emitted by these
* {@code Publisher}s and then drains them in order, each one after the previous one completes.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>Backpressure is honored towards the downstream and both the outer and inner {@code Publisher}s are
* expected to support backpressure. Violating this assumption, the operator will
* signal {@link MissingBackpressureException}.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>This method does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the value type
* @param sources a sequence of {@code Publisher}s that need to be eagerly concatenated
* @param maxConcurrency the maximum number of concurrently running inner {@code Publisher}s; {@link Integer#MAX_VALUE}
* is interpreted as all inner {@code Publisher}s can be active at the same time
* @param prefetch the number of elements to prefetch from each inner {@code Publisher} source
* @return the new {@code Flowable} instance with the specified concatenation behavior
* @throws NullPointerException if {@code sources} is {@code null}
* @throws IllegalArgumentException if {@code maxConcurrency} or {@code prefetch} is non-positive
* @since 3.0.0
*/
@CheckReturnValue
@NonNull
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
@SuppressWarnings({ "rawtypes", "unchecked" })
public static <T> Flowable<T> concatEagerDelayError(@NonNull Publisher<@NonNull ? extends Publisher<@NonNull ? extends T>> sources, int maxConcurrency, int prefetch) {
Objects.requireNonNull(sources, "sources is null");
ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency");
ObjectHelper.verifyPositive(prefetch, "prefetch");
return RxJavaPlugins.onAssembly(new FlowableConcatMapEagerPublisher(sources, Functions.identity(), maxConcurrency, prefetch, ErrorMode.END));
}

/**
Expand Down Expand Up @@ -10134,6 +10286,7 @@ public final <R> Flowable<R> flatMap(@NonNull Function<? super T, ? extends Publ
* if {@code false}, the first one signaling an exception will terminate the whole sequence immediately
* @return the new {@code Flowable} instance
* @throws NullPointerException if {@code mapper} is {@code null}
* @throws IllegalArgumentException if {@code maxConcurrency} is non-positive
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
* @since 2.0
*/
Expand Down

0 comments on commit 81f0569

Please sign in to comment.