Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve auto context propagation in lifting and ConnectableFlux intersection #3787

Merged
merged 6 commits into from May 6, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -34,6 +34,13 @@
*/
public abstract class ConnectableFlux<T> extends Flux<T> {

static <T> ConnectableFlux<T> from(ConnectableFlux<T> source) {
if (ContextPropagationSupport.shouldWrapPublisher(source)) {
return new ConnectableFluxRestoringThreadLocals<>(source);
}
return source;
}

/**
* Connects this {@link ConnectableFlux} to the upstream source when the first {@link org.reactivestreams.Subscriber}
* subscribes.
Expand Down
@@ -0,0 +1,42 @@
/*
* Copyright (c) 2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package reactor.core.publisher;

import java.util.Objects;
import java.util.function.Consumer;

import reactor.core.CoreSubscriber;
import reactor.core.Disposable;

class ConnectableFluxRestoringThreadLocals<T> extends ConnectableFlux<T> {

private final ConnectableFlux<T> source;

public ConnectableFluxRestoringThreadLocals(ConnectableFlux<T> source) {
this.source = Objects.requireNonNull(source, "source");
}

@Override
public void connect(Consumer<? super Disposable> cancelSupport) {
source.connect(cancelSupport);
}

@Override
public void subscribe(CoreSubscriber<? super T> actual) {
source.subscribe(Operators.restoreContextOnSubscriber(source, actual));
}
}
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -68,8 +68,7 @@ public String stepName() {

@Override
public final CoreSubscriber<? super I> subscribeOrReturn(CoreSubscriber<? super O> actual) {
CoreSubscriber<? super I> input =
liftFunction.lifter.apply(source, Operators.restoreContextOnSubscriberIfAutoCPEnabled(source, actual));
CoreSubscriber<? super I> input = liftFunction.lifter.apply(source, actual);
chemicL marked this conversation as resolved.
Show resolved Hide resolved

Objects.requireNonNull(input, "Lifted subscriber MUST NOT be null");

Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -70,8 +70,7 @@ public String stepName() {

@Override
public final CoreSubscriber<? super I> subscribeOrReturn(CoreSubscriber<? super O> actual) {
CoreSubscriber<? super I> input =
liftFunction.lifter.apply(source, Operators.restoreContextOnSubscriberIfAutoCPEnabled(source, actual));
CoreSubscriber<? super I> input = liftFunction.lifter.apply(source, actual);
chemicL marked this conversation as resolved.
Show resolved Hide resolved

Objects.requireNonNull(input, "Lifted subscriber MUST NOT be null");

Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2023 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2022-2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -31,6 +31,7 @@

import io.micrometer.context.ContextSnapshotFactory;
import io.micrometer.context.ThreadLocalAccessor;
import reactor.core.Fuseable;
import reactor.core.observability.SignalListener;
import reactor.util.annotation.Nullable;
import reactor.util.context.Context;
Expand Down Expand Up @@ -60,8 +61,10 @@ final class ContextPropagation {
}
}

static <T> Flux<T> fluxRestoreThreadLocals(Flux<? extends T> flux) {
return new FluxContextWriteRestoringThreadLocals<>(flux, Function.identity());
static <T> Flux<T> fluxRestoreThreadLocals(Flux<? extends T> flux, boolean fuseable) {
return fuseable ?
new FluxContextWriteRestoringThreadLocalsFuseable<>(flux, Function.identity())
: new FluxContextWriteRestoringThreadLocals<>(flux, Function.identity());
}

static <T> Mono<T> monoRestoreThreadLocals(Mono<? extends T> mono) {
Expand Down
10 changes: 6 additions & 4 deletions reactor-core/src/main/java/reactor/core/publisher/Flux.java
Expand Up @@ -11132,7 +11132,8 @@ static <I> Flux<I> wrap(Publisher<? extends I> source) {
if (!shouldWrap) {
return (Flux<I>) source;
}
return ContextPropagation.fluxRestoreThreadLocals((Flux<? extends I>) source);
return ContextPropagation.fluxRestoreThreadLocals(
(Flux<? extends I>) source, source instanceof Fuseable);
}

//for scalars we'll instantiate the operators directly to avoid onAssembly
Expand All @@ -11151,19 +11152,20 @@ static <I> Flux<I> wrap(Publisher<? extends I> source) {
}

Flux<I> target;
boolean fuseable = source instanceof Fuseable;
if (source instanceof Mono) {
if (source instanceof Fuseable) {
if (fuseable) {
target = new FluxSourceMonoFuseable<>((Mono<I>) source);
} else {
target = new FluxSourceMono<>((Mono<I>) source);
}
} else if (source instanceof Fuseable) {
} else if (fuseable) {
target = new FluxSourceFuseable<>(source);
} else {
target = new FluxSource<>(source);
}
if (shouldWrap) {
return ContextPropagation.fluxRestoreThreadLocals(target);
return ContextPropagation.fluxRestoreThreadLocals(target, fuseable);
}
return target;
}
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -50,7 +50,7 @@ final class FluxAutoConnect<T> extends Flux<T>
if (n <= 0) {
throw new IllegalArgumentException("n > required but it was " + n);
}
this.source = Objects.requireNonNull(source, "source");
this.source = ConnectableFlux.from(Objects.requireNonNull(source, "source"));
this.cancelSupport = Objects.requireNonNull(cancelSupport, "cancelSupport");
REMAINING.lazySet(this, n);
}
Expand All @@ -75,6 +75,7 @@ public Object scanUnsafe(Attr key) {
if (key == Attr.PARENT) return source;
if (key == Attr.CAPACITY) return remaining;
if (key == Attr.RUN_STYLE) return Attr.RunStyle.SYNC;
if (key == InternalProducerAttr.INSTANCE) return true;

return null;
}
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -51,7 +51,7 @@ final class FluxAutoConnectFuseable<T> extends Flux<T>
if (n <= 0) {
throw new IllegalArgumentException("n > required but it was " + n);
}
this.source = Objects.requireNonNull(source, "source");
this.source = ConnectableFlux.from(Objects.requireNonNull(source, "source"));
this.cancelSupport = Objects.requireNonNull(cancelSupport, "cancelSupport");
REMAINING.lazySet(this, n);
}
Expand All @@ -76,6 +76,7 @@ public Object scanUnsafe(Attr key) {
if (key == Attr.PARENT) return source;
if (key == Attr.CAPACITY) return remaining;
if (key == Attr.RUN_STYLE) return Attr.RunStyle.SYNC;
if (key == InternalProducerAttr.INSTANCE) return true;

return null;
}
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -173,46 +173,4 @@ public void cancel() {
}
}
}

static final class FuseableContextWriteRestoringThreadLocalsSubscriber<T>
extends ContextWriteRestoringThreadLocalsSubscriber<T>
implements Fuseable.QueueSubscription<T> {

FuseableContextWriteRestoringThreadLocalsSubscriber(
CoreSubscriber<? super T> actual, Context context) {
super(actual, context);
}

// Required for
// FuseableBestPracticesTest.coreFuseableSubscribersShouldNotExtendNonFuseableOnNext
@Override
public void onNext(T t) {
super.onNext(t);
}

@Override
public T poll() {
throw new UnsupportedOperationException("Nope");
}

@Override
public int requestFusion(int requestedMode) {
return Fuseable.NONE;
}

@Override
public int size() {
throw new UnsupportedOperationException("Nope");
}

@Override
public boolean isEmpty() {
throw new UnsupportedOperationException("Nope");
}

@Override
public void clear() {
throw new UnsupportedOperationException("Nope");
}
}
}