Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix DoOnEach ASYNC fusion triggering onNext signal twice #3045

Merged
merged 1 commit into from May 23, 2022

Conversation

simonbasle
Copy link
Member

This commit fixes the DoOnEachFuseableSubscriber to interpret any
upstream onNext as a fusion trigger in ASYNC mode (onNext(null)).

Furthermore, this commit fixes the Operators.MonoSubscriber handling of
FUSED case. Since that abstract operator only supports ASYNC fusion,
the FUSED_* states are renamed FUSED_ASYNC_*. In the drain loop, even in
FUSED case the operator emits t downstream where it should emit null
technically (again, because FUSED == ASYNC). This error is also fixed.

Fixes #3044.

This commit fixes the DoOnEachFuseableSubscriber to interpret any
upstream onNext as a fusion trigger in ASYNC mode (onNext(null)).

Furthermore, this commit fixes the Operators.MonoSubscriber handling of
FUSED case. Since that abstract operator only supports ASYNC fusion,
the FUSED_* states are renamed FUSED_ASYNC_*. In the drain loop, even in
FUSED case the operator emits t downstream where it should emit null
technically (again, because FUSED == ASYNC). This error is also fixed.

Fixes #3044.
@simonbasle simonbasle requested a review from a team as a code owner May 19, 2022 14:52
@simonbasle simonbasle added this to the 3.4.19 milestone May 19, 2022
@simonbasle simonbasle added the type/bug A general bug label May 19, 2022
@simonbasle simonbasle self-assigned this May 19, 2022
@simonbasle simonbasle merged commit bee3d07 into 3.4.x May 23, 2022
@simonbasle simonbasle deleted the 3044-doOnEachAsyncFusionBug branch May 23, 2022 09:06
simonbasle added a commit that referenced this pull request May 23, 2022
simonbasle added a commit that referenced this pull request Jun 7, 2022
As a follow-up to #3045, this commit introduces an ArchUnit rule to
surface other cases where a Subscriber is a QueueSubscription but
doesn't implement `onNext` (and thus can break ASYNC fusion).

Three additional case have thus been surfaced, two of which cannot be
fixed entirely.

`doFinally`: in the `poll()` path there was no catching of `qs.poll()`
exceptions BUT unfortunately there is no clean way to implement fused
doFinally with thrown exceptions. For instance, assuming the following
is fused:
```
 .doFinally(sig -> signals.add("finally"))
 .doOnError(e -> signals.add("onError"))
```
then the `doFinally` handler is invoked BEFORE the doOnError (which is
contrary to the documented order and the unfused order). This commit
thus removes `Fuseable` trait from doFinally operators.

`doAfterTerminate`: this is similar to the above situation, but fusion
support needs not be entirely removed from `MonoPeekTerminal` if no
"afterXxx" handler is defined. This commit thus forces fusion to be
negotiated to `NONE` in case a `MonoPeekTerminal` has an
`afterTerminateHandler`, and only in that case.

`doOnError`: didn't catch polling exceptions either, so the `doOnError`
handler from the above snippet was not even invoked. This commit fixes
that case.

Lastly, this commit fixes an out-of-order issue in `StepVerifier`'s
handling of fusion: StepVerifierBuilder must cancel the upstream AFTER
having propagated onError in ASYNC mode, otherwise it may hang.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
type/bug A general bug
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants