Skip to content

Commit

Permalink
Make some Mono sources and aggregators lazier (#3081)
Browse files Browse the repository at this point in the history
This commit changes 4 broad categories of Mono operators to be lazier
and only start requesting from their upstream once they've been
themselves requested.

Previously, the request to upstream would happen immediately upon
receiving the Subscription (in `onSubscribe`).

The first category of Monos is the one where the upstream is a Flux
that gets aggregated into a Mono, such as `MonoCollectList`, `MonoCount`
but also `MonoAll` or `MonoElementAt`...
These Mono now all inherit a common `BaseFluxToMonoOperator` which track
two states as bits in a single volatile: request status (true/false) and
completed status (true/false). It uses a non-volatile boolean hasRequest
as a shortcut to ignore any request past the first one.

Note that for now these implementations do declare a `Fuseable` trait,
but they always negotiate fusion `NONE`. It is kept as a future possible
improvement that they would all support SYNC fusion.

The second category is Monos that combine multiple sources, like
`MonoZip` or `MonoWhen`.
A single volatile long-based state machine is used to coordinate the
multiple inners and track which inner has made the first request.

The third category is ParallelFlux-to-Mono operators, mixing the above
two aspects: aggregating values and combining multiple sources (rails).
Some of these can benefit from the `BaseFluxToMonoOperator` approach.

The last category is sources like `MonoSupplier` or `MonoCallable`.
Often the functional interface representing the logic would be run in
`onSubscribe` directly, but making it lazier implies the creation of
dedicated `Subscription` implementations now.

Fixes #2913.

Signed-off-by: Oleh Dokuka <odokuka@vmware.com>
Co-authored-by: Simon Baslé <sbasle@vmware.com>
  • Loading branch information
OlegDokuka and simonbasle committed Aug 3, 2022
1 parent 42712b1 commit 88587fc
Show file tree
Hide file tree
Showing 61 changed files with 2,741 additions and 1,452 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright (c) 2022 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package reactor.core.publisher;

import java.util.concurrent.TimeUnit;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;

@BenchmarkMode({Mode.AverageTime})
@Warmup(iterations = 5, time = 5, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 5, time = 5, timeUnit = TimeUnit.SECONDS)
@Fork(value = 1)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@State(Scope.Benchmark)
public class MonoAllBenchmark {

@Param({"0", "10", "1000", "100000"})
int rangeSize;

public static void main(String[] args) throws Exception {
reactor.core.scrabble.ShakespearePlaysScrabbleOpt
s = new reactor.core.scrabble.ShakespearePlaysScrabbleOpt();
s.init();
System.out.println(s.measureThroughput());
}

@SuppressWarnings("unused")
@Benchmark
public void measureThroughput() {
Flux.range(0, rangeSize)
.all(i -> i < Integer.MAX_VALUE)
.block();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright (c) 2022 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package reactor.core.publisher;

import java.util.concurrent.TimeUnit;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;

@BenchmarkMode({Mode.AverageTime})
@Warmup(iterations = 5, time = 5, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 5, time = 5, timeUnit = TimeUnit.SECONDS)
@Fork(value = 1)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@State(Scope.Benchmark)
public class MonoCallableBenchmark {

@Param({"10", "1000", "100000"})
int rangeSize;

public static void main(String[] args) throws Exception {
reactor.core.scrabble.ShakespearePlaysScrabbleOpt
s = new reactor.core.scrabble.ShakespearePlaysScrabbleOpt();
s.init();
System.out.println(s.measureThroughput());
}

@SuppressWarnings("unused")
@Benchmark
public void measureThroughput() {
Flux.range(0, rangeSize)
.all(i -> i < Integer.MAX_VALUE)
.block();
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -37,21 +37,7 @@ final class FluxCallable<T> extends Flux<T> implements Callable<T>, Fuseable, So

@Override
public void subscribe(CoreSubscriber<? super T> actual) {
Operators.MonoSubscriber<T, T> wrapper = new Operators.MonoSubscriber<>(actual);
actual.onSubscribe(wrapper);

try {
T v = callable.call();
if (v == null) {
wrapper.onComplete();
}
else {
wrapper.complete(v);
}
}
catch (Throwable ex) {
actual.onError(Operators.onOperatorError(ex, actual.currentContext()));
}
actual.onSubscribe(new MonoCallable.MonoCallableSubscription<>(actual, callable));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,10 +17,9 @@
package reactor.core.publisher;

import java.util.Objects;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;
import reactor.util.annotation.Nullable;

/**
Expand Down Expand Up @@ -50,75 +49,120 @@ public Object scanUnsafe(Attr key) {
}

static final class DefaultIfEmptySubscriber<T>
extends Operators.MonoSubscriber<T, T> {
extends Operators.BaseFluxToMonoOperator<T, T> {

Subscription s;
boolean done;

boolean hasValue;

DefaultIfEmptySubscriber(CoreSubscriber<? super T> actual, T value) {
volatile T fallbackValue;
@SuppressWarnings("rawtypes")
static final AtomicReferenceFieldUpdater<DefaultIfEmptySubscriber, Object> FALLBACK_VALUE =
AtomicReferenceFieldUpdater.newUpdater(DefaultIfEmptySubscriber.class, Object.class, "fallbackValue");

DefaultIfEmptySubscriber(CoreSubscriber<? super T> actual, T fallbackValue) {
super(actual);
//noinspection deprecation
this.value = value; //we write once, setValue() is NO-OP
FALLBACK_VALUE.lazySet(this, fallbackValue);
}

@Override
@Nullable
public Object scanUnsafe(Attr key) {
if (key == Attr.PARENT) return s;
if (key == Attr.RUN_STYLE) return Attr.RunStyle.SYNC;
if (key == Attr.TERMINATED) return done;

return super.scanUnsafe(key);
}

@Override
public void request(long n) {
super.request(n);
if (!hasRequest) {
hasRequest = true;

final int state = this.state;

if (state != 1 && STATE.compareAndSet(this, state, state | 1)) {
if (state > 1) {
final T fallbackValue = this.fallbackValue;
if (fallbackValue != null && FALLBACK_VALUE.compareAndSet(this,
fallbackValue,
null)) {
// completed before request means source was empty
actual.onNext(fallbackValue);
actual.onComplete();
}
return;
}
}
}

s.request(n);
}

@Override
public void cancel() {
super.cancel();
s.cancel();
}

@Override
public void onSubscribe(Subscription s) {
if (Operators.validate(this.s, s)) {
this.s = s;

actual.onSubscribe(this);
final T fallbackValue = this.fallbackValue;
if (fallbackValue != null && FALLBACK_VALUE.compareAndSet(this, fallbackValue, null)) {
Operators.onDiscard(fallbackValue, actual.currentContext());
}
}

@Override
public void onNext(T t) {
if (!hasValue) {
hasValue = true;

final T fallbackValue = this.fallbackValue;
if (fallbackValue != null && FALLBACK_VALUE.compareAndSet(this, fallbackValue, null)) {
Operators.onDiscard(fallbackValue, actual.currentContext());
}
}

actual.onNext(t);
}

@Override
public void onComplete() {
if (hasValue) {
actual.onComplete();
} else {
complete(this.value);
if (done) {
return;
}

done = true;

if (!hasValue) {
completePossiblyEmpty();

return;
}

actual.onComplete();
}

@Override
public void setValue(T value) {
// value is constant. writes from the base class are redundant, and the constant
// would always be visible in cancel(), so it will safely be discarded.
public void onError(Throwable t) {
if (done) {
return;
}

done = true;
if (!hasValue) {
final T fallbackValue = this.fallbackValue;
if (fallbackValue != null && FALLBACK_VALUE.compareAndSet(this, fallbackValue, null)) {
Operators.onDiscard(t, actual.currentContext());
}
}

actual.onError(t);
}

@Override
public int requestFusion(int requestedMode) {
return Fuseable.NONE; // prevent fusion because of the upstream
T accumulatedValue() {
final T fallbackValue = this.fallbackValue;
if (fallbackValue != null && FALLBACK_VALUE.compareAndSet(this, fallbackValue, null)) {
return fallbackValue;
}
return null;
}
}
}

0 comments on commit 88587fc

Please sign in to comment.