Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

various minor cleanups #3234

Merged
merged 3 commits into from
Oct 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,6 @@ https://github.com/reactor/head-first-reactive-with-spring-and-reactor/
-------------------------------------
_Powered by [Reactive Streams Commons](https://github.com/reactor/reactive-streams-commons)_

_Licensed under [Apache Software License 2.0](www.apache.org/licenses/LICENSE-2.0)_
_Licensed under [Apache Software License 2.0](https://www.apache.org/licenses/LICENSE-2.0)_

_Sponsored by [VMware](https://tanzu.vmware.com/)_
24 changes: 9 additions & 15 deletions reactor-core/src/main/java/reactor/core/publisher/Flux.java
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ public static <T, V> Flux<V> combineLatest(Iterable<? extends Publisher<? extend
int prefetch,
Function<Object[], V> combinator) {

return onAssembly(new FluxCombineLatest<T, V>(sources,
return onAssembly(new FluxCombineLatest<>(sources,
combinator,
Queues.get(prefetch), prefetch));
}
Expand Down Expand Up @@ -2397,10 +2397,10 @@ public static <O> Flux<O> zip(Iterable<? extends Publisher<?>> sources,
int prefetch,
final Function<? super Object[], ? extends O> combinator) {

return onAssembly(new FluxZip<Object, O>(sources,
combinator,
Queues.get(prefetch),
prefetch));
return onAssembly(new FluxZip<>(sources,
combinator,
Queues.get(prefetch),
prefetch));
}

/**
Expand Down Expand Up @@ -3050,7 +3050,7 @@ public final Flux<List<T>> bufferUntil(Predicate<? super T> predicate, boolean c
*
* @return a microbatched {@link Flux} of {@link List}
*/
public final <V> Flux<List<T>> bufferUntilChanged() {
public final Flux<List<T>> bufferUntilChanged() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

even though I was assuming japicmp would complain if this was an issue, I double checked with the JLS that this is indeed fine. for my own reference: https://docs.oracle.com/javase/specs/jls/se7/html/jls-13.html

13.4.13. Method and Constructor Type Parameters
Adding or removing a type parameter of a method or constructor does not, in itself, have any implications for binary compatibility.
If such a type parameter is used in the type of the method or constructor, that may have the normal implications of changing the aforementioned type.

👍

return bufferUntilChanged(identityFunction());
}

Expand Down Expand Up @@ -3718,7 +3718,6 @@ public final Mono<List<T>> collectSortedList() {
*
* @return a {@link Mono} of a sorted {@link List} of all values from this {@link Flux}
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
public final Mono<List<T>> collectSortedList(@Nullable Comparator<? super T> comparator) {
return collectList().doOnNext(list -> {
// Note: this assumes the list emitted by buffer() is mutable
Expand Down Expand Up @@ -4021,7 +4020,6 @@ public final <R> Flux<R> concatMapIterable(Function<? super T, ? extends Iterabl
*/
public final Flux<T> concatWith(Publisher<? extends T> other) {
if (this instanceof FluxConcatArray) {
@SuppressWarnings({ "unchecked" })
FluxConcatArray<T> fluxConcatArray = (FluxConcatArray<T>) this;

return fluxConcatArray.concatAdditionalSourceLast(other);
Expand Down Expand Up @@ -5990,7 +5988,6 @@ public final Mono<T> last(T defaultValue) {
@SuppressWarnings("unchecked")
Callable<T> thiz = (Callable<T>)this;
if(thiz instanceof Fuseable.ScalarCallable){
@SuppressWarnings("unchecked")
Fuseable.ScalarCallable<T> c = (Fuseable.ScalarCallable<T>)thiz;
T v;
try {
Expand Down Expand Up @@ -9182,7 +9179,6 @@ public final Mono<Void> thenEmpty(Publisher<Void> other) {
*/
public final <V> Flux<V> thenMany(Publisher<V> other) {
if (this instanceof FluxConcatArray) {
@SuppressWarnings({ "unchecked" })
FluxConcatArray<T> fluxConcatArray = (FluxConcatArray<T>) this;
return fluxConcatArray.concatAdditionalIgnoredLast(other);
}
Expand Down Expand Up @@ -9372,7 +9368,7 @@ public final <U, V> Flux<T> timeout(Publisher<U> firstTimeout,
return timeout(firstTimeout, nextTimeoutFactory, "first signal from a Publisher");
}

private final <U, V> Flux<T> timeout(Publisher<U> firstTimeout,
private <U, V> Flux<T> timeout(Publisher<U> firstTimeout,
Function<? super T, ? extends Publisher<V>> nextTimeoutFactory,
String timeoutDescription) {
return onAssembly(new FluxTimeout<>(this, firstTimeout, nextTimeoutFactory, timeoutDescription));
Expand Down Expand Up @@ -10205,7 +10201,7 @@ public final Flux<Flux<T>> windowUntil(Predicate<T> boundaryTrigger, boolean cut
*
* @return a microbatched {@link Flux} of {@link Flux} windows.
*/
public final <V> Flux<Flux<T>> windowUntilChanged() {
public final Flux<Flux<T>> windowUntilChanged() {
return windowUntilChanged(identityFunction());
}

Expand Down Expand Up @@ -10470,7 +10466,6 @@ public final <T2> Flux<Tuple2<T, T2>> zipWith(Publisher<? extends T2> source2) {
public final <T2, V> Flux<V> zipWith(Publisher<? extends T2> source2,
final BiFunction<? super T, ? super T2, ? extends V> combinator) {
if (this instanceof FluxZip) {
@SuppressWarnings("unchecked")
FluxZip<T, V> o = (FluxZip<T, V>) this;
Flux<V> result = o.zipAdditionalSource(source2, combinator);
if (result != null) {
Expand Down Expand Up @@ -10542,7 +10537,6 @@ public final <T2> Flux<Tuple2<T, T2>> zipWith(Publisher<? extends T2> source2, i
* @return a zipped {@link Flux}
*
*/
@SuppressWarnings("unchecked")
public final <T2> Flux<Tuple2<T, T2>> zipWithIterable(Iterable<? extends T2> iterable) {
return zipWithIterable(iterable, tuple2Function());
}
Expand Down Expand Up @@ -10644,7 +10638,6 @@ final <R> Flux<R> flatMapSequential(Function<? super T, ? extends
FluxConcatMap.ErrorMode.IMMEDIATE));
}

@SuppressWarnings("unchecked")
static <T> Flux<T> doOnSignal(Flux<T> source,
@Nullable Consumer<? super Subscription> onSubscribe,
@Nullable Consumer<? super T> onNext,
Expand Down Expand Up @@ -10847,6 +10840,7 @@ static <I> Flux<I> wrap(Publisher<? extends I> source) {
@SuppressWarnings("rawtypes")
static final Supplier SET_SUPPLIER = HashSet::new;
static final BooleanSupplier ALWAYS_BOOLEAN_SUPPLIER = () -> true;
@SuppressWarnings("rawtypes")
static final BiPredicate OBJECT_EQUAL = Object::equals;
@SuppressWarnings("rawtypes")
static final Function IDENTITY_FUNCTION = Function.identity();
Expand Down