Skip to content

Commit

Permalink
Establishing approach - #3593
Browse files Browse the repository at this point in the history
  • Loading branch information
Desislav-Petrov committed Oct 1, 2023
1 parent 323a1c7 commit ab0d08f
Show file tree
Hide file tree
Showing 10 changed files with 242 additions and 74 deletions.
3 changes: 2 additions & 1 deletion reactor-core/src/main/java/reactor/core/publisher/Flux.java
Expand Up @@ -81,6 +81,7 @@
import reactor.util.function.Tuples;
import reactor.core.observability.SignalListener;
import reactor.core.observability.SignalListenerFactory;
import reactor.util.repeat.Repeat;
import reactor.util.retry.Retry;

/**
Expand Down Expand Up @@ -7789,7 +7790,7 @@ public final Flux<T> repeat(long numRepeat, BooleanSupplier predicate) {
* @return a {@link Flux} that repeats on onComplete when the companion {@link Publisher} produces an
* onNext signal
*/
public final Flux<T> repeatWhen(Function<Flux<Long>, ? extends Publisher<?>> repeatFactory) {
public final Flux<T> repeatWhen(Repeat repeatFactory) {
return onAssembly(new FluxRepeatWhen<>(this, repeatFactory));
}

Expand Down
Expand Up @@ -18,7 +18,6 @@

import java.util.Objects;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.Function;
import java.util.stream.Stream;

import org.reactivestreams.Publisher;
Expand All @@ -30,6 +29,7 @@
import reactor.util.annotation.Nullable;
import reactor.util.context.Context;
import reactor.util.context.ContextView;
import reactor.util.repeat.Repeat;

/**
* Repeats a source when a companion sequence signals an item in response to the main's
Expand All @@ -45,62 +45,68 @@
*/
final class FluxRepeatWhen<T> extends InternalFluxOperator<T, T> {

final Function<? super Flux<Long>, ? extends Publisher<?>> whenSourceFactory;
final Repeat whenSourceFactory;

FluxRepeatWhen(Flux<? extends T> source,
Function<? super Flux<Long>, ? extends Publisher<?>> whenSourceFactory) {
Repeat whenSourceFactory) {
super(source);
this.whenSourceFactory =
Objects.requireNonNull(whenSourceFactory, "whenSourceFactory");
}

@Override
public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super T> actual) {
static <T> void subscribe(CoreSubscriber<? super T> actual,
Repeat whenSourceFactory,
CorePublisher<? extends T> source) {
RepeatWhenOtherSubscriber other = new RepeatWhenOtherSubscriber();
CoreSubscriber<T> serial = Operators.serialize(actual);

RepeatWhenMainSubscriber<T> main =
new RepeatWhenMainSubscriber<>(serial, other.completionSignal, source);
RepeatWhenMainSubscriber<T> main = new RepeatWhenMainSubscriber<>(
serial, other.completionSignal, source, whenSourceFactory.getRepeatContext());
other.main = main;

serial.onSubscribe(main);

Publisher<?> p;

try {
p = Objects.requireNonNull(whenSourceFactory.apply(other),
p = Objects.requireNonNull(whenSourceFactory.generateCompanion(other),
"The whenSourceFactory returned a null Publisher");
}
catch (Throwable e) {
actual.onError(Operators.onOperatorError(e, actual.currentContext()));
return null;
return;
}

p.subscribe(other);

if (!main.cancelled) {
return main;
}
else {
return null;
source.subscribe(main);
}
}

@Override
public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super T> actual) {
subscribe(actual, whenSourceFactory, source);
return null;
}

@Override
public Object scanUnsafe(Attr key) {
if (key == Attr.RUN_STYLE) return Attr.RunStyle.SYNC;
return super.scanUnsafe(key);
}

static final class RepeatWhenMainSubscriber<T>
extends Operators.MultiSubscriptionSubscriber<T, T> {
extends Operators.MultiSubscriptionSubscriber<T, T>
implements Repeat.RepeatSignal {

final Operators.DeferredSubscription otherArbiter;

final Sinks.Many<Long> signaller;
final Sinks.Many<Repeat.RepeatSignal> signaller;

final CorePublisher<? extends T> source;

long totalRetriesSoFar = 0L;
final ContextView repeatContext;
volatile int wip;
static final AtomicIntegerFieldUpdater<RepeatWhenMainSubscriber> WIP =
AtomicIntegerFieldUpdater.newUpdater(RepeatWhenMainSubscriber.class,
Expand All @@ -110,11 +116,12 @@ static final class RepeatWhenMainSubscriber<T>
long produced;

RepeatWhenMainSubscriber(CoreSubscriber<? super T> actual,
Sinks.Many<Long> signaller,
CorePublisher<? extends T> source) {
Sinks.Many<Repeat.RepeatSignal> signaller,
CorePublisher<? extends T> source, ContextView repeatContext) {
super(actual);
this.signaller = signaller;
this.source = source;
this.repeatContext = repeatContext;
this.otherArbiter = new Operators.DeferredSubscription();
this.context = actual.currentContext();
}
Expand All @@ -139,6 +146,7 @@ public void cancel() {

@Override
public void onNext(T t) {
totalRetriesSoFar++;
actual.onNext(t);

produced++;
Expand All @@ -159,7 +167,7 @@ public void onComplete() {
produced(p);
}

signaller.emitNext(p, Sinks.EmitFailureHandler.FAIL_FAST);
signaller.emitNext(this, Sinks.EmitFailureHandler.FAIL_FAST);
// request after signalling, otherwise it may race
otherArbiter.request(1);
}
Expand All @@ -182,7 +190,6 @@ void resubscribe(Object trigger) {
}

source.subscribe(this);

}
while (WIP.decrementAndGet(this) != 0);
}
Expand All @@ -205,14 +212,30 @@ public Object scanUnsafe(Attr key) {
if (key == Attr.RUN_STYLE) return Attr.RunStyle.SYNC;
return super.scanUnsafe(key);
}

@Override
public Throwable failure() {
return null;
}

@Override
public ContextView retryContextView() {
return repeatContext;
}

@Override
public long getRepeatsSoFar() {
return totalRetriesSoFar;
}
}

static final class RepeatWhenOtherSubscriber extends Flux<Long>
implements InnerConsumer<Object>, OptimizableOperator<Long, Long> {
static final class RepeatWhenOtherSubscriber extends Flux<Repeat.RepeatSignal>
implements InnerConsumer<Object>, OptimizableOperator<Repeat.RepeatSignal, Repeat.RepeatSignal>
{

RepeatWhenMainSubscriber<?> main;

final Sinks.Many<Long> completionSignal = Sinks.many().multicast().onBackpressureBuffer();
final Sinks.Many<Repeat.RepeatSignal> completionSignal = Sinks.many().multicast().onBackpressureBuffer();

@Override
public Context currentContext() {
Expand All @@ -236,7 +259,11 @@ public void onSubscribe(Subscription s) {

@Override
public void onNext(Object t) {
main.resubscribe(t);
if (t.equals(-1L)) {
onComplete();
} else {
main.resubscribe(t);
}
}

@Override
Expand All @@ -250,22 +277,22 @@ public void onComplete() {
}

@Override
public void subscribe(CoreSubscriber<? super Long> actual) {
public void subscribe(CoreSubscriber<? super Repeat.RepeatSignal> actual) {
completionSignal.asFlux().subscribe(actual);
}

@Override
public CoreSubscriber<? super Long> subscribeOrReturn(CoreSubscriber<? super Long> actual) {
public CoreSubscriber<? super Repeat.RepeatSignal> subscribeOrReturn(CoreSubscriber<? super Repeat.RepeatSignal> actual) {
return actual;
}

@Override
public Flux<Long> source() {
public Flux<Repeat.RepeatSignal> source() {
return completionSignal.asFlux();
}

@Override
public OptimizableOperator<?, ? extends Long> nextOptimizableSource() {
public OptimizableOperator<?, ? extends Repeat.RepeatSignal> nextOptimizableSource() {
return null;
}
}
Expand Down
10 changes: 8 additions & 2 deletions reactor-core/src/main/java/reactor/core/publisher/Mono.java
Expand Up @@ -75,6 +75,7 @@
import reactor.util.function.Tuples;
import reactor.core.observability.SignalListener;
import reactor.core.observability.SignalListenerFactory;
import reactor.util.repeat.Repeat;
import reactor.util.retry.Retry;

/**
Expand Down Expand Up @@ -4066,8 +4067,9 @@ public final Flux<T> repeat(long numRepeat, BooleanSupplier predicate) {
* @return a {@link Flux} that repeats on onComplete when the companion {@link Publisher} produces an
* onNext signal
*/
public final Flux<T> repeatWhen(Function<Flux<Long>, ? extends Publisher<?>> repeatFactory) {
return Flux.onAssembly(new MonoRepeatWhen<>(this, repeatFactory));
public final Flux<T> repeatWhen(Repeat repeatFactory) {
//return Flux.onAssembly(new MonoRepeatWhen<>(this, repeatFactory));
return null;
}

/**
Expand Down Expand Up @@ -4110,6 +4112,7 @@ public final Mono<T> repeatWhenEmpty(Function<Flux<Long>, ? extends Publisher<?>
* as long as the companion {@link Publisher} produces an onNext signal and the maximum number of repeats isn't exceeded.
*/
public final Mono<T> repeatWhenEmpty(int maxRepeat, Function<Flux<Long>, ? extends Publisher<?>> repeatFactory) {
/*
return Mono.defer(() -> this.repeatWhen(o -> {
if (maxRepeat == Integer.MAX_VALUE) {
return repeatFactory.apply(o.index().map(Tuple2::getT1));
Expand All @@ -4122,6 +4125,9 @@ public final Mono<T> repeatWhenEmpty(int maxRepeat, Function<Flux<Long>, ? exten
.concatWith(Flux.error(() -> new IllegalStateException("Exceeded maximum number of repeats"))));
}
}).next());
*/
return null;
}


Expand Down
Expand Up @@ -17,11 +17,10 @@
package reactor.core.publisher;

import java.util.Objects;
import java.util.function.Function;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.core.CoreSubscriber;
import reactor.util.repeat.Repeat;

/**
* Repeats a source when a companion sequence signals an item in response to the main's
Expand All @@ -37,10 +36,9 @@
*/
final class MonoRepeatWhen<T> extends FluxFromMonoOperator<T, T> {

final Function<? super Flux<Long>, ? extends Publisher<?>> whenSourceFactory;
final Repeat whenSourceFactory;

MonoRepeatWhen(Mono<? extends T> source,
Function<? super Flux<Long>, ? extends Publisher<?>> whenSourceFactory) {
MonoRepeatWhen(Mono<? extends T> source, Repeat whenSourceFactory) {
super(source);
this.whenSourceFactory =
Objects.requireNonNull(whenSourceFactory, "whenSourceFactory");
Expand All @@ -54,16 +52,16 @@ public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super T> act
CoreSubscriber<T> serial = Operators.serialize(actual);

FluxRepeatWhen.RepeatWhenMainSubscriber<T> main =
new FluxRepeatWhen.RepeatWhenMainSubscriber<>(serial, other.completionSignal, source);
new FluxRepeatWhen.RepeatWhenMainSubscriber<>(serial, other.completionSignal, source, whenSourceFactory.getRepeatContext());
other.main = main;

serial.onSubscribe(main);

Publisher<?> p;
Publisher<?> p = null;

try {
p = Objects.requireNonNull(whenSourceFactory.apply(other),
"The whenSourceFactory returned a null Publisher");
//p = Objects.requireNonNull(whenSourceFactory.apply(other),
// "The whenSourceFactory returned a null Publisher");
}
catch (Throwable e) {
actual.onError(Operators.onOperatorError(e, actual.currentContext()));
Expand Down
@@ -0,0 +1,25 @@
package reactor.util.repeat;

import reactor.util.context.ContextView;

public class ImmutableRepeatSignal implements Repeat.RepeatSignal {
final Throwable failure;
final ContextView retryContext;
final long repeatsSoFar;

public ImmutableRepeatSignal(Throwable failure, ContextView retryContext, long repeatsSoFar) {
this.failure = failure;
this.retryContext = retryContext;
this.repeatsSoFar = repeatsSoFar;
}

@Override
public Throwable failure() {
return failure;
}

@Override
public long getRepeatsSoFar() {
return repeatsSoFar;
}
}

0 comments on commit ab0d08f

Please sign in to comment.