Skip to content

Commit

Permalink
Fix DoOnEach ASYNC fusion triggering onNext signal twice (#3045)
Browse files Browse the repository at this point in the history
This commit fixes the DoOnEachFuseableSubscriber to interpret any
upstream onNext as a fusion trigger in ASYNC mode (onNext(null)).

Furthermore, this commit fixes the Operators.MonoSubscriber handling of
FUSED case. Since that abstract operator only supports ASYNC fusion,
the FUSED_* states are renamed FUSED_ASYNC_*. In the drain loop, even in
FUSED case the operator emits t downstream where it should emit null
technically (again, because FUSED == ASYNC). This error is also fixed.

Fixes #3044.
  • Loading branch information
simonbasle committed May 23, 2022
1 parent 01a55c9 commit bee3d07
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 25 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -265,23 +265,33 @@ public String toString() {
static class DoOnEachFuseableSubscriber<T> extends DoOnEachSubscriber<T>
implements Fuseable, Fuseable.QueueSubscription<T> {

boolean syncFused;
int fusionMode;

DoOnEachFuseableSubscriber(CoreSubscriber<? super T> actual,
Consumer<? super Signal<T>> onSignal, boolean isMono) {
super(actual, onSignal, isMono);
}

@Override
public void onNext(T t) {
if (this.fusionMode == Fuseable.ASYNC) {
actual.onNext(null);
return;
}
super.onNext(t);
}

@Override
public int requestFusion(int mode) {
QueueSubscription<T> qs = this.qs;
if (qs != null && (mode & Fuseable.THREAD_BARRIER) == 0) {
int m = qs.requestFusion(mode);
if (m != Fuseable.NONE) {
syncFused = m == Fuseable.SYNC;
if (m == Fuseable.SYNC || m == Fuseable.ASYNC) {
this.fusionMode = m;
return m;
}
return m;
}
this.fusionMode = NONE;
return Fuseable.NONE;
}

Expand All @@ -302,7 +312,7 @@ public T poll() {
return null;
}
T v = qs.poll();
if (v == null && syncFused) {
if (v == null && this.fusionMode == SYNC) {
state = STATE_DONE;
try {
onSignal.accept(Signal.complete(cachedContext));
Expand Down
32 changes: 16 additions & 16 deletions reactor-core/src/main/java/reactor/core/publisher/Operators.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -1776,7 +1776,7 @@ public Object scanUnsafe(Attr key) {

@Override
public final void clear() {
STATE.lazySet(this, FUSED_CONSUMED);
STATE.lazySet(this, FUSED_ASYNC_CONSUMED);
this.value = null;
}

Expand All @@ -1790,12 +1790,12 @@ public final void clear() {
public final void complete(@Nullable O v) {
for (; ; ) {
int state = this.state;
if (state == FUSED_EMPTY) {
if (state == FUSED_ASYNC_EMPTY) {
setValue(v);
//sync memory since setValue is non volatile
if (STATE.compareAndSet(this, FUSED_EMPTY, FUSED_READY)) {
if (STATE.compareAndSet(this, FUSED_ASYNC_EMPTY, FUSED_ASYNC_READY)) {
Subscriber<? super O> a = actual;
a.onNext(v);
a.onNext(null);
a.onComplete();
return;
}
Expand Down Expand Up @@ -1850,7 +1850,7 @@ public final boolean isCancelled() {

@Override
public final boolean isEmpty() {
return this.state != FUSED_READY;
return this.state != FUSED_ASYNC_READY;
}

@Override
Expand All @@ -1877,7 +1877,7 @@ public void onSubscribe(Subscription s) {
@Override
@Nullable
public final O poll() {
if (STATE.compareAndSet(this, FUSED_READY, FUSED_CONSUMED)) {
if (STATE.compareAndSet(this, FUSED_ASYNC_READY, FUSED_ASYNC_CONSUMED)) {
O v = value;
value = null;
return v;
Expand Down Expand Up @@ -1918,7 +1918,7 @@ public void request(long n) {
@Override
public int requestFusion(int mode) {
if ((mode & ASYNC) != 0) {
STATE.lazySet(this, FUSED_EMPTY);
STATE.lazySet(this, FUSED_ASYNC_EMPTY);
return ASYNC;
}
return NONE;
Expand Down Expand Up @@ -1963,21 +1963,21 @@ public int size() {
/**
* Indicates the Subscription has been cancelled.
*/
static final int CANCELLED = 4;
static final int CANCELLED = 4;
/**
* Indicates this Subscription is in fusion mode and is currently empty.
* Indicates this Subscription is in ASYNC fusion mode and is currently empty.
*/
static final int FUSED_EMPTY = 8;
static final int FUSED_ASYNC_EMPTY = 8;
/**
* Indicates this Subscription is in fusion mode and has a value.
* Indicates this Subscription is in ASYNC fusion mode and has a value.
*/
static final int FUSED_READY = 16;
static final int FUSED_ASYNC_READY = 16;
/**
* Indicates this Subscription is in fusion mode and its value has been consumed.
* Indicates this Subscription is in ASYNC fusion mode and its value has been consumed.
*/
static final int FUSED_CONSUMED = 32;
static final int FUSED_ASYNC_CONSUMED = 32;
@SuppressWarnings("rawtypes")
static final AtomicIntegerFieldUpdater<MonoSubscriber> STATE =
static final AtomicIntegerFieldUpdater<MonoSubscriber> STATE =
AtomicIntegerFieldUpdater.newUpdater(MonoSubscriber.class, "state");
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -52,6 +52,25 @@

public class FluxDoOnEachTest {


// see https://github.com/reactor/reactor-core/issues/3044
@Test
void doOnEachAsyncFusionDoesntTriggerOnNextTwice() {
List<String> signals = new ArrayList<>();
StepVerifier.create(Flux.just("a", "b", "c")
.collectList()
.doOnEach(sig -> signals.add(sig.toString()))
)
.expectFusion(Fuseable.ASYNC)
.expectNext(Arrays.asList("a", "b", "c"))
.verifyComplete();

assertThat(signals).containsExactly(
"doOnEach_onNext([a, b, c])",
"onComplete()"
);
}

@Test
public void nullSource() {
assertThatExceptionOfType(NullPointerException.class).isThrownBy(() -> {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -50,7 +50,7 @@ public void clear() {

ds.clear();

assertThat(ds.state).isEqualTo(MonoSubscriber.FUSED_CONSUMED);
assertThat(ds.state).isEqualTo(MonoSubscriber.FUSED_ASYNC_CONSUMED);
assertThat(ds.value).isNull();
}

Expand Down

0 comments on commit bee3d07

Please sign in to comment.