Skip to content

Commit

Permalink
Improve auto context propagation in lifting and ConnectableFlux inter…
Browse files Browse the repository at this point in the history
…section (#3787)

Combining ConnectableFlux or Fuseable operators and wrapping via Hooks could lead to ClassCastException or lack of ThreadLocal restoration. ConnectableFlux and Fuseable handling has been improved. Also, lifting now avoids unnecessary multiple-wrapping.

Fixes #3762
  • Loading branch information
chemicL committed May 6, 2024
1 parent 055c0e8 commit c760a0a
Show file tree
Hide file tree
Showing 26 changed files with 519 additions and 112 deletions.
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -34,6 +34,13 @@
*/
public abstract class ConnectableFlux<T> extends Flux<T> {

static <T> ConnectableFlux<T> from(ConnectableFlux<T> source) {
if (ContextPropagationSupport.shouldWrapPublisher(source)) {
return new ConnectableFluxRestoringThreadLocals<>(source);
}
return source;
}

/**
* Connects this {@link ConnectableFlux} to the upstream source when the first {@link org.reactivestreams.Subscriber}
* subscribes.
Expand Down
@@ -0,0 +1,42 @@
/*
* Copyright (c) 2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package reactor.core.publisher;

import java.util.Objects;
import java.util.function.Consumer;

import reactor.core.CoreSubscriber;
import reactor.core.Disposable;

class ConnectableFluxRestoringThreadLocals<T> extends ConnectableFlux<T> {

private final ConnectableFlux<T> source;

public ConnectableFluxRestoringThreadLocals(ConnectableFlux<T> source) {
this.source = Objects.requireNonNull(source, "source");
}

@Override
public void connect(Consumer<? super Disposable> cancelSupport) {
source.connect(cancelSupport);
}

@Override
public void subscribe(CoreSubscriber<? super T> actual) {
source.subscribe(Operators.restoreContextOnSubscriber(source, actual));
}
}
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -68,8 +68,8 @@ public String stepName() {

@Override
public final CoreSubscriber<? super I> subscribeOrReturn(CoreSubscriber<? super O> actual) {
CoreSubscriber<? super I> input =
liftFunction.lifter.apply(source, Operators.restoreContextOnSubscriberIfAutoCPEnabled(source, actual));
// No need to wrap actual for CP, the Operators$LiftFunction handles it.
CoreSubscriber<? super I> input = liftFunction.lifter.apply(source, actual);

Objects.requireNonNull(input, "Lifted subscriber MUST NOT be null");

Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -70,8 +70,8 @@ public String stepName() {

@Override
public final CoreSubscriber<? super I> subscribeOrReturn(CoreSubscriber<? super O> actual) {
CoreSubscriber<? super I> input =
liftFunction.lifter.apply(source, Operators.restoreContextOnSubscriberIfAutoCPEnabled(source, actual));
// No need to wrap actual for CP, the Operators$LiftFunction handles it.
CoreSubscriber<? super I> input = liftFunction.lifter.apply(source, actual);

Objects.requireNonNull(input, "Lifted subscriber MUST NOT be null");

Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2023 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2022-2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -31,6 +31,7 @@

import io.micrometer.context.ContextSnapshotFactory;
import io.micrometer.context.ThreadLocalAccessor;
import reactor.core.Fuseable;
import reactor.core.observability.SignalListener;
import reactor.util.annotation.Nullable;
import reactor.util.context.Context;
Expand Down Expand Up @@ -60,8 +61,10 @@ final class ContextPropagation {
}
}

static <T> Flux<T> fluxRestoreThreadLocals(Flux<? extends T> flux) {
return new FluxContextWriteRestoringThreadLocals<>(flux, Function.identity());
static <T> Flux<T> fluxRestoreThreadLocals(Flux<? extends T> flux, boolean fuseable) {
return fuseable ?
new FluxContextWriteRestoringThreadLocalsFuseable<>(flux, Function.identity())
: new FluxContextWriteRestoringThreadLocals<>(flux, Function.identity());
}

static <T> Mono<T> monoRestoreThreadLocals(Mono<? extends T> mono) {
Expand Down
10 changes: 6 additions & 4 deletions reactor-core/src/main/java/reactor/core/publisher/Flux.java
Expand Up @@ -11132,7 +11132,8 @@ static <I> Flux<I> wrap(Publisher<? extends I> source) {
if (!shouldWrap) {
return (Flux<I>) source;
}
return ContextPropagation.fluxRestoreThreadLocals((Flux<? extends I>) source);
return ContextPropagation.fluxRestoreThreadLocals(
(Flux<? extends I>) source, source instanceof Fuseable);
}

//for scalars we'll instantiate the operators directly to avoid onAssembly
Expand All @@ -11151,19 +11152,20 @@ static <I> Flux<I> wrap(Publisher<? extends I> source) {
}

Flux<I> target;
boolean fuseable = source instanceof Fuseable;
if (source instanceof Mono) {
if (source instanceof Fuseable) {
if (fuseable) {
target = new FluxSourceMonoFuseable<>((Mono<I>) source);
} else {
target = new FluxSourceMono<>((Mono<I>) source);
}
} else if (source instanceof Fuseable) {
} else if (fuseable) {
target = new FluxSourceFuseable<>(source);
} else {
target = new FluxSource<>(source);
}
if (shouldWrap) {
return ContextPropagation.fluxRestoreThreadLocals(target);
return ContextPropagation.fluxRestoreThreadLocals(target, fuseable);
}
return target;
}
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -50,7 +50,7 @@ final class FluxAutoConnect<T> extends Flux<T>
if (n <= 0) {
throw new IllegalArgumentException("n > required but it was " + n);
}
this.source = Objects.requireNonNull(source, "source");
this.source = ConnectableFlux.from(Objects.requireNonNull(source, "source"));
this.cancelSupport = Objects.requireNonNull(cancelSupport, "cancelSupport");
REMAINING.lazySet(this, n);
}
Expand All @@ -75,6 +75,7 @@ public Object scanUnsafe(Attr key) {
if (key == Attr.PARENT) return source;
if (key == Attr.CAPACITY) return remaining;
if (key == Attr.RUN_STYLE) return Attr.RunStyle.SYNC;
if (key == InternalProducerAttr.INSTANCE) return true;

return null;
}
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -51,7 +51,7 @@ final class FluxAutoConnectFuseable<T> extends Flux<T>
if (n <= 0) {
throw new IllegalArgumentException("n > required but it was " + n);
}
this.source = Objects.requireNonNull(source, "source");
this.source = ConnectableFlux.from(Objects.requireNonNull(source, "source"));
this.cancelSupport = Objects.requireNonNull(cancelSupport, "cancelSupport");
REMAINING.lazySet(this, n);
}
Expand All @@ -76,6 +76,7 @@ public Object scanUnsafe(Attr key) {
if (key == Attr.PARENT) return source;
if (key == Attr.CAPACITY) return remaining;
if (key == Attr.RUN_STYLE) return Attr.RunStyle.SYNC;
if (key == InternalProducerAttr.INSTANCE) return true;

return null;
}
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -173,46 +173,4 @@ public void cancel() {
}
}
}

static final class FuseableContextWriteRestoringThreadLocalsSubscriber<T>
extends ContextWriteRestoringThreadLocalsSubscriber<T>
implements Fuseable.QueueSubscription<T> {

FuseableContextWriteRestoringThreadLocalsSubscriber(
CoreSubscriber<? super T> actual, Context context) {
super(actual, context);
}

// Required for
// FuseableBestPracticesTest.coreFuseableSubscribersShouldNotExtendNonFuseableOnNext
@Override
public void onNext(T t) {
super.onNext(t);
}

@Override
public T poll() {
throw new UnsupportedOperationException("Nope");
}

@Override
public int requestFusion(int requestedMode) {
return Fuseable.NONE;
}

@Override
public int size() {
throw new UnsupportedOperationException("Nope");
}

@Override
public boolean isEmpty() {
throw new UnsupportedOperationException("Nope");
}

@Override
public void clear() {
throw new UnsupportedOperationException("Nope");
}
}
}

0 comments on commit c760a0a

Please sign in to comment.