Skip to content

Commit

Permalink
Merge #3787 into 3.7.0-M2
Browse files Browse the repository at this point in the history
  • Loading branch information
chemicL committed May 6, 2024
2 parents c2425ed + c760a0a commit 1b24e45
Show file tree
Hide file tree
Showing 26 changed files with 519 additions and 112 deletions.
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -34,6 +34,13 @@
*/
public abstract class ConnectableFlux<T> extends Flux<T> {

static <T> ConnectableFlux<T> from(ConnectableFlux<T> source) {
if (ContextPropagationSupport.shouldWrapPublisher(source)) {
return new ConnectableFluxRestoringThreadLocals<>(source);
}
return source;
}

/**
* Connects this {@link ConnectableFlux} to the upstream source when the first {@link org.reactivestreams.Subscriber}
* subscribes.
Expand Down
@@ -0,0 +1,42 @@
/*
* Copyright (c) 2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package reactor.core.publisher;

import java.util.Objects;
import java.util.function.Consumer;

import reactor.core.CoreSubscriber;
import reactor.core.Disposable;

class ConnectableFluxRestoringThreadLocals<T> extends ConnectableFlux<T> {

private final ConnectableFlux<T> source;

public ConnectableFluxRestoringThreadLocals(ConnectableFlux<T> source) {
this.source = Objects.requireNonNull(source, "source");
}

@Override
public void connect(Consumer<? super Disposable> cancelSupport) {
source.connect(cancelSupport);
}

@Override
public void subscribe(CoreSubscriber<? super T> actual) {
source.subscribe(Operators.restoreContextOnSubscriber(source, actual));
}
}
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -68,8 +68,8 @@ public String stepName() {

@Override
public final CoreSubscriber<? super I> subscribeOrReturn(CoreSubscriber<? super O> actual) {
CoreSubscriber<? super I> input =
liftFunction.lifter.apply(source, Operators.restoreContextOnSubscriberIfAutoCPEnabled(source, actual));
// No need to wrap actual for CP, the Operators$LiftFunction handles it.
CoreSubscriber<? super I> input = liftFunction.lifter.apply(source, actual);

Objects.requireNonNull(input, "Lifted subscriber MUST NOT be null");

Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -70,8 +70,8 @@ public String stepName() {

@Override
public final CoreSubscriber<? super I> subscribeOrReturn(CoreSubscriber<? super O> actual) {
CoreSubscriber<? super I> input =
liftFunction.lifter.apply(source, Operators.restoreContextOnSubscriberIfAutoCPEnabled(source, actual));
// No need to wrap actual for CP, the Operators$LiftFunction handles it.
CoreSubscriber<? super I> input = liftFunction.lifter.apply(source, actual);

Objects.requireNonNull(input, "Lifted subscriber MUST NOT be null");

Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2023 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2022-2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -31,6 +31,7 @@

import io.micrometer.context.ContextSnapshotFactory;
import io.micrometer.context.ThreadLocalAccessor;
import reactor.core.Fuseable;
import reactor.core.observability.SignalListener;
import reactor.util.annotation.Nullable;
import reactor.util.context.Context;
Expand Down Expand Up @@ -60,8 +61,10 @@ final class ContextPropagation {
}
}

static <T> Flux<T> fluxRestoreThreadLocals(Flux<? extends T> flux) {
return new FluxContextWriteRestoringThreadLocals<>(flux, Function.identity());
static <T> Flux<T> fluxRestoreThreadLocals(Flux<? extends T> flux, boolean fuseable) {
return fuseable ?
new FluxContextWriteRestoringThreadLocalsFuseable<>(flux, Function.identity())
: new FluxContextWriteRestoringThreadLocals<>(flux, Function.identity());
}

static <T> Mono<T> monoRestoreThreadLocals(Mono<? extends T> mono) {
Expand Down
10 changes: 6 additions & 4 deletions reactor-core/src/main/java/reactor/core/publisher/Flux.java
Expand Up @@ -11132,7 +11132,8 @@ static <I> Flux<I> wrap(Publisher<? extends I> source) {
if (!shouldWrap) {
return (Flux<I>) source;
}
return ContextPropagation.fluxRestoreThreadLocals((Flux<? extends I>) source);
return ContextPropagation.fluxRestoreThreadLocals(
(Flux<? extends I>) source, source instanceof Fuseable);
}

//for scalars we'll instantiate the operators directly to avoid onAssembly
Expand All @@ -11151,19 +11152,20 @@ static <I> Flux<I> wrap(Publisher<? extends I> source) {
}

Flux<I> target;
boolean fuseable = source instanceof Fuseable;
if (source instanceof Mono) {
if (source instanceof Fuseable) {
if (fuseable) {
target = new FluxSourceMonoFuseable<>((Mono<I>) source);
} else {
target = new FluxSourceMono<>((Mono<I>) source);
}
} else if (source instanceof Fuseable) {
} else if (fuseable) {
target = new FluxSourceFuseable<>(source);
} else {
target = new FluxSource<>(source);
}
if (shouldWrap) {
return ContextPropagation.fluxRestoreThreadLocals(target);
return ContextPropagation.fluxRestoreThreadLocals(target, fuseable);
}
return target;
}
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -50,7 +50,7 @@ final class FluxAutoConnect<T> extends Flux<T>
if (n <= 0) {
throw new IllegalArgumentException("n > required but it was " + n);
}
this.source = Objects.requireNonNull(source, "source");
this.source = ConnectableFlux.from(Objects.requireNonNull(source, "source"));
this.cancelSupport = Objects.requireNonNull(cancelSupport, "cancelSupport");
REMAINING.lazySet(this, n);
}
Expand All @@ -75,6 +75,7 @@ public Object scanUnsafe(Attr key) {
if (key == Attr.PARENT) return source;
if (key == Attr.CAPACITY) return remaining;
if (key == Attr.RUN_STYLE) return Attr.RunStyle.SYNC;
if (key == InternalProducerAttr.INSTANCE) return true;

return null;
}
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -51,7 +51,7 @@ final class FluxAutoConnectFuseable<T> extends Flux<T>
if (n <= 0) {
throw new IllegalArgumentException("n > required but it was " + n);
}
this.source = Objects.requireNonNull(source, "source");
this.source = ConnectableFlux.from(Objects.requireNonNull(source, "source"));
this.cancelSupport = Objects.requireNonNull(cancelSupport, "cancelSupport");
REMAINING.lazySet(this, n);
}
Expand All @@ -76,6 +76,7 @@ public Object scanUnsafe(Attr key) {
if (key == Attr.PARENT) return source;
if (key == Attr.CAPACITY) return remaining;
if (key == Attr.RUN_STYLE) return Attr.RunStyle.SYNC;
if (key == InternalProducerAttr.INSTANCE) return true;

return null;
}
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -173,46 +173,4 @@ public void cancel() {
}
}
}

static final class FuseableContextWriteRestoringThreadLocalsSubscriber<T>
extends ContextWriteRestoringThreadLocalsSubscriber<T>
implements Fuseable.QueueSubscription<T> {

FuseableContextWriteRestoringThreadLocalsSubscriber(
CoreSubscriber<? super T> actual, Context context) {
super(actual, context);
}

// Required for
// FuseableBestPracticesTest.coreFuseableSubscribersShouldNotExtendNonFuseableOnNext
@Override
public void onNext(T t) {
super.onNext(t);
}

@Override
public T poll() {
throw new UnsupportedOperationException("Nope");
}

@Override
public int requestFusion(int requestedMode) {
return Fuseable.NONE;
}

@Override
public int size() {
throw new UnsupportedOperationException("Nope");
}

@Override
public boolean isEmpty() {
throw new UnsupportedOperationException("Nope");
}

@Override
public void clear() {
throw new UnsupportedOperationException("Nope");
}
}
}

0 comments on commit 1b24e45

Please sign in to comment.