Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix DoOnEach ASYNC fusion triggering onNext signal twice #3045

Merged
merged 1 commit into from
May 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -265,23 +265,33 @@ public String toString() {
static class DoOnEachFuseableSubscriber<T> extends DoOnEachSubscriber<T>
implements Fuseable, Fuseable.QueueSubscription<T> {

boolean syncFused;
int fusionMode;

DoOnEachFuseableSubscriber(CoreSubscriber<? super T> actual,
Consumer<? super Signal<T>> onSignal, boolean isMono) {
super(actual, onSignal, isMono);
}

@Override
public void onNext(T t) {
if (this.fusionMode == Fuseable.ASYNC) {
actual.onNext(null);
return;
}
super.onNext(t);
}

@Override
public int requestFusion(int mode) {
QueueSubscription<T> qs = this.qs;
if (qs != null && (mode & Fuseable.THREAD_BARRIER) == 0) {
int m = qs.requestFusion(mode);
if (m != Fuseable.NONE) {
syncFused = m == Fuseable.SYNC;
if (m == Fuseable.SYNC || m == Fuseable.ASYNC) {
this.fusionMode = m;
return m;
}
return m;
}
this.fusionMode = NONE;
return Fuseable.NONE;
}

Expand All @@ -302,7 +312,7 @@ public T poll() {
return null;
}
T v = qs.poll();
if (v == null && syncFused) {
if (v == null && this.fusionMode == SYNC) {
state = STATE_DONE;
try {
onSignal.accept(Signal.complete(cachedContext));
Expand Down
32 changes: 16 additions & 16 deletions reactor-core/src/main/java/reactor/core/publisher/Operators.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -1776,7 +1776,7 @@ public Object scanUnsafe(Attr key) {

@Override
public final void clear() {
STATE.lazySet(this, FUSED_CONSUMED);
STATE.lazySet(this, FUSED_ASYNC_CONSUMED);
this.value = null;
}

Expand All @@ -1790,12 +1790,12 @@ public final void clear() {
public final void complete(@Nullable O v) {
for (; ; ) {
int state = this.state;
if (state == FUSED_EMPTY) {
if (state == FUSED_ASYNC_EMPTY) {
setValue(v);
//sync memory since setValue is non volatile
if (STATE.compareAndSet(this, FUSED_EMPTY, FUSED_READY)) {
if (STATE.compareAndSet(this, FUSED_ASYNC_EMPTY, FUSED_ASYNC_READY)) {
Subscriber<? super O> a = actual;
a.onNext(v);
a.onNext(null);
a.onComplete();
return;
}
Expand Down Expand Up @@ -1850,7 +1850,7 @@ public final boolean isCancelled() {

@Override
public final boolean isEmpty() {
return this.state != FUSED_READY;
return this.state != FUSED_ASYNC_READY;
}

@Override
Expand All @@ -1877,7 +1877,7 @@ public void onSubscribe(Subscription s) {
@Override
@Nullable
public final O poll() {
if (STATE.compareAndSet(this, FUSED_READY, FUSED_CONSUMED)) {
if (STATE.compareAndSet(this, FUSED_ASYNC_READY, FUSED_ASYNC_CONSUMED)) {
O v = value;
value = null;
return v;
Expand Down Expand Up @@ -1918,7 +1918,7 @@ public void request(long n) {
@Override
public int requestFusion(int mode) {
if ((mode & ASYNC) != 0) {
STATE.lazySet(this, FUSED_EMPTY);
STATE.lazySet(this, FUSED_ASYNC_EMPTY);
return ASYNC;
}
return NONE;
Expand Down Expand Up @@ -1963,21 +1963,21 @@ public int size() {
/**
* Indicates the Subscription has been cancelled.
*/
static final int CANCELLED = 4;
static final int CANCELLED = 4;
/**
* Indicates this Subscription is in fusion mode and is currently empty.
* Indicates this Subscription is in ASYNC fusion mode and is currently empty.
*/
static final int FUSED_EMPTY = 8;
static final int FUSED_ASYNC_EMPTY = 8;
/**
* Indicates this Subscription is in fusion mode and has a value.
* Indicates this Subscription is in ASYNC fusion mode and has a value.
*/
static final int FUSED_READY = 16;
static final int FUSED_ASYNC_READY = 16;
/**
* Indicates this Subscription is in fusion mode and its value has been consumed.
* Indicates this Subscription is in ASYNC fusion mode and its value has been consumed.
*/
static final int FUSED_CONSUMED = 32;
static final int FUSED_ASYNC_CONSUMED = 32;
@SuppressWarnings("rawtypes")
static final AtomicIntegerFieldUpdater<MonoSubscriber> STATE =
static final AtomicIntegerFieldUpdater<MonoSubscriber> STATE =
AtomicIntegerFieldUpdater.newUpdater(MonoSubscriber.class, "state");
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -52,6 +52,25 @@

public class FluxDoOnEachTest {


// see https://github.com/reactor/reactor-core/issues/3044
@Test
void doOnEachAsyncFusionDoesntTriggerOnNextTwice() {
List<String> signals = new ArrayList<>();
StepVerifier.create(Flux.just("a", "b", "c")
.collectList()
.doOnEach(sig -> signals.add(sig.toString()))
)
.expectFusion(Fuseable.ASYNC)
.expectNext(Arrays.asList("a", "b", "c"))
.verifyComplete();

assertThat(signals).containsExactly(
"doOnEach_onNext([a, b, c])",
"onComplete()"
);
}

@Test
public void nullSource() {
assertThatExceptionOfType(NullPointerException.class).isThrownBy(() -> {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -50,7 +50,7 @@ public void clear() {

ds.clear();

assertThat(ds.state).isEqualTo(MonoSubscriber.FUSED_CONSUMED);
assertThat(ds.state).isEqualTo(MonoSubscriber.FUSED_ASYNC_CONSUMED);
assertThat(ds.value).isNull();
}

Expand Down