Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Context propagation: use new APIs from latest snapshots #3256

Merged
merged 3 commits into from
Nov 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ logback = "ch.qos.logback:logback-classic:1.2.11"
micrometer-bom = { module = "io.micrometer:micrometer-bom", version.ref = "micrometer" }
micrometer-commons = { module = "io.micrometer:micrometer-commons" }
micrometer-core = { module = "io.micrometer:micrometer-core" }
micrometer-contextPropagation = "io.micrometer:context-propagation:1.0.0-RC1"
micrometer-contextPropagation = "io.micrometer:context-propagation:1.0.0-SNAPSHOT"
micrometer-docsGenerator = { module = "io.micrometer:micrometer-docs-generator", version = "1.0.0-RC1"}
micrometer-observation-test = { module = "io.micrometer:micrometer-observation-test" }
micrometer-tracing-test = "io.micrometer:micrometer-tracing-integration-test:1.0.0-RC1"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;

import io.micrometer.context.ContextRegistry;
import io.micrometer.context.ContextSnapshot;

import reactor.core.CoreSubscriber;
import reactor.core.observability.SignalListener;
import reactor.util.Logger;
import reactor.util.Loggers;
Expand Down Expand Up @@ -53,8 +53,9 @@ final class ContextPropagation {
Function<Context, Context> contextCaptureFunction;
boolean contextPropagation;
try {
ContextRegistry registry = ContextRegistry.getInstance();
contextCaptureFunction = new ContextCaptureFunction(PREDICATE_TRUE, registry);
ContextRegistry globalRegistry = ContextRegistry.getInstance();
contextCaptureFunction = target -> ContextSnapshot.captureAllUsing(PREDICATE_TRUE, globalRegistry)
.updateContext(target);
contextPropagation = true;
}
catch (LinkageError t) {
Expand Down Expand Up @@ -120,40 +121,34 @@ static Function<Context, Context> contextCapture(Predicate<Object> captureKeyPre
if (!isContextPropagationAvailable) {
return NO_OP;
}
return new ContextCaptureFunction(captureKeyPredicate, null);
return target -> ContextSnapshot.captureAllUsing(captureKeyPredicate, ContextRegistry.getInstance())
.updateContext(target);
}

static <T, R> BiConsumer<T, SynchronousSink<R>> contextRestoreForHandle(BiConsumer<T, SynchronousSink<R>> handler, CoreSubscriber<? super R> actual) {
if (!ContextPropagation.isContextPropagationAvailable() || actual.currentContext().isEmpty()) {
static <T, R> BiConsumer<T, SynchronousSink<R>> contextRestoreForHandle(BiConsumer<T, SynchronousSink<R>> handler, Supplier<Context> contextSupplier) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just a small comment that the supplier can be overhead for the mono case since one extra object is allocated per subscriber * per operator. I guess that the goal was to avoid context retrieval if there is no need, but not sure what brings more impact.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is fine since the Supplier in the hot path is CoreSubscriber::currentContext (ie. a method reference).

if (!ContextPropagation.isContextPropagationAvailable()) {
return handler;
}
return new ContextRestoreHandleConsumer<>(handler, ContextRegistry.getInstance(), actual.currentContext());
}

static <T> SignalListener<T> contextRestoringSignalListener(final SignalListener<T> original,
CoreSubscriber<? super T> actual) {
if (!ContextPropagation.isContextPropagationAvailable() || actual.currentContext().isEmpty()) {
return original;
final Context ctx = contextSupplier.get();
if (ctx.isEmpty()) {
return handler;
}
return new ContextRestoreSignalListener<T>(original, actual.currentContext(), ContextRegistry.getInstance());
return (v, sink) -> {
try (ContextSnapshot.Scope ignored = ContextSnapshot.setAllThreadLocalsFrom(ctx)) {
handler.accept(v, sink);
}
};
}

//the Function indirection allows tests to directly assert code in this class rather than static methods
static final class ContextCaptureFunction implements Function<Context, Context> {

final Predicate<Object> capturePredicate;
final ContextRegistry registry;

ContextCaptureFunction(Predicate<Object> capturePredicate, @Nullable ContextRegistry registry) {
this.capturePredicate = capturePredicate;
this.registry = registry != null ? registry : ContextRegistry.getInstance();
static <T> SignalListener<T> contextRestoreForTap(final SignalListener<T> original, Supplier<Context> contextSupplier) {
if (!ContextPropagation.isContextPropagationAvailable()) {
return original;
}

@Override
public Context apply(Context target) {
return ContextSnapshot.captureAllUsing(capturePredicate, this.registry)
.updateContext(target);
final Context ctx = contextSupplier.get();
if (ctx.isEmpty()) {
return original;
}
return new ContextRestoreSignalListener<T>(original, ctx, null);
}

//the SignalListener implementation can be tested independently with a test-specific ContextRegistry
Expand All @@ -170,10 +165,7 @@ public ContextRestoreSignalListener(SignalListener<T> original, ContextView cont
}

ContextSnapshot.Scope restoreThreadLocals() {
//TODO for now ContextSnapshot static methods don't allow restoring _all_ TLs without an intermediate ContextSnapshot
return ContextSnapshot
.captureFrom(this.context, k -> true, this.registry)
.setThreadLocals();
return ContextSnapshot.setAllThreadLocalsFrom(this.context, this.registry);
}

@Override
Expand Down Expand Up @@ -288,30 +280,4 @@ public Context addToContext(Context originalContext) {
}
}
}

//the BiConsumer implementation can be tested independently with a test-specific ContextRegistry
static final class ContextRestoreHandleConsumer<T, R> implements BiConsumer<T, SynchronousSink<R>> {

private final BiConsumer<T, SynchronousSink<R>> originalHandler;
private final ContextRegistry registry;
private final ContextView reactorContext;

ContextRestoreHandleConsumer(BiConsumer<T, SynchronousSink<R>> originalHandler, ContextRegistry registry,
ContextView reactorContext) {
this.originalHandler = originalHandler;
this.registry = registry;
this.reactorContext = reactorContext;
}

@Override
public void accept(T t, SynchronousSink<R> sink) {
//TODO for now ContextSnapshot static methods don't allow restoring _all_ TLs without an intermediate ContextSnapshot
final ContextSnapshot snapshot = ContextSnapshot.captureFrom(this.reactorContext, k -> true, this.registry);
try (ContextSnapshot.Scope ignored = snapshot.setThreadLocals(k -> {
return true;
})) {
originalHandler.accept(t, sink);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
import java.util.Objects;
import java.util.function.BiConsumer;

import io.micrometer.context.ContextSnapshot;
import org.reactivestreams.Subscription;

import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;
import reactor.util.annotation.Nullable;
Expand All @@ -45,7 +45,7 @@ final class FluxHandle<T, R> extends InternalFluxOperator<T, R> {

@Override
public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super R> actual) {
BiConsumer<? super T, SynchronousSink<R>> handler2 = ContextPropagation.contextRestoreForHandle(this.handler, actual);
BiConsumer<? super T, SynchronousSink<R>> handler2 = ContextPropagation.contextRestoreForHandle(this.handler, actual::currentContext);
if (actual instanceof Fuseable.ConditionalSubscriber) {
@SuppressWarnings("unchecked")
Fuseable.ConditionalSubscriber<? super R> cs = (Fuseable.ConditionalSubscriber<? super R>) actual;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
import java.util.Objects;
import java.util.function.BiConsumer;

import io.micrometer.context.ContextSnapshot;
import org.reactivestreams.Subscription;

import reactor.core.CoreSubscriber;
import reactor.core.Exceptions;
import reactor.core.Fuseable;
Expand Down Expand Up @@ -58,7 +58,7 @@ final class FluxHandleFuseable<T, R> extends InternalFluxOperator<T, R> implemen

@Override
public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super R> actual) {
BiConsumer<? super T, SynchronousSink<R>> handler2 = ContextPropagation.contextRestoreForHandle(this.handler, actual);
BiConsumer<? super T, SynchronousSink<R>> handler2 = ContextPropagation.contextRestoreForHandle(this.handler, actual::currentContext);
if (actual instanceof ConditionalSubscriber) {
@SuppressWarnings("unchecked")
ConditionalSubscriber<? super R> cs = (ConditionalSubscriber<? super R>) actual;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package reactor.core.publisher;

import io.micrometer.context.ContextSnapshot;
import org.reactivestreams.Subscription;

import reactor.core.CoreSubscriber;
Expand Down Expand Up @@ -59,7 +58,7 @@ public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super T> act
}
// Attempt to wrap the SignalListener with one that restores ThreadLocals from Context on each listener methods
// (only if ContextPropagation.isContextPropagationAvailable() is true)
signalListener = ContextPropagation.contextRestoringSignalListener(signalListener, actual);
signalListener = ContextPropagation.contextRestoreForTap(signalListener, actual::currentContext);

try {
signalListener.doFirst();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
import reactor.core.CoreSubscriber;
import reactor.core.Exceptions;
import reactor.core.Fuseable;
import reactor.util.annotation.Nullable;
import reactor.core.observability.SignalListener;
import reactor.core.observability.SignalListenerFactory;
import reactor.util.annotation.Nullable;

/**
* A {@link reactor.core.Fuseable} generic per-Subscription side effect {@link Flux} that notifies a
Expand Down Expand Up @@ -58,7 +58,7 @@ public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super T> act
}
// Attempt to wrap the SignalListener with one that restores ThreadLocals from Context on each listener methods
// (only if ContextPropagation.isContextPropagationAvailable() is true)
signalListener = ContextPropagation.contextRestoringSignalListener(signalListener, actual);
signalListener = ContextPropagation.contextRestoreForTap(signalListener, actual::currentContext);

try {
signalListener.doFirst();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ final class MonoHandle<T, R> extends InternalMonoOperator<T, R> {

@Override
public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super R> actual) {
BiConsumer<? super T, SynchronousSink<R>> handler2 = ContextPropagation.contextRestoreForHandle(this.handler, actual);
BiConsumer<? super T, SynchronousSink<R>> handler2 = ContextPropagation.contextRestoreForHandle(this.handler, actual::currentContext);
return new FluxHandle.HandleSubscriber<>(actual, handler2);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ final class MonoHandleFuseable<T, R> extends InternalMonoOperator<T, R>

@Override
public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super R> actual) {
BiConsumer<? super T, SynchronousSink<R>> handler2 = ContextPropagation.contextRestoreForHandle(this.handler, actual);
BiConsumer<? super T, SynchronousSink<R>> handler2 = ContextPropagation.contextRestoreForHandle(this.handler, actual::currentContext);
return new FluxHandleFuseable.HandleFuseableSubscriber<>(actual, handler2);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@

import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;
import reactor.core.publisher.FluxTap.TapSubscriber;
import reactor.util.annotation.Nullable;
import reactor.core.observability.SignalListener;
import reactor.core.observability.SignalListenerFactory;
import reactor.core.publisher.FluxTap.TapSubscriber;
import reactor.util.annotation.Nullable;

/**
* A generic per-Subscription side effect {@link Mono} that notifies a {@link SignalListener} of most events.
Expand Down Expand Up @@ -55,7 +55,7 @@ public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super T> act
}
// Attempt to wrap the SignalListener with one that restores ThreadLocals from Context on each listener methods
// (only if ContextPropagation.isContextPropagationAvailable() is true)
signalListener = ContextPropagation.contextRestoringSignalListener(signalListener, actual);
signalListener = ContextPropagation.contextRestoreForTap(signalListener, actual::currentContext);

try {
signalListener.doFirst();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@

import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;
import reactor.util.annotation.Nullable;
import reactor.core.observability.SignalListener;
import reactor.core.observability.SignalListenerFactory;
import reactor.util.annotation.Nullable;

/**
* A {@link Fuseable} generic per-Subscription side effect {@link Mono} that notifies a {@link SignalListener} of most events.
Expand Down Expand Up @@ -54,7 +54,7 @@ public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super T> act
}
// Attempt to wrap the SignalListener with one that restores ThreadLocals from Context on each listener methods
// (only if ContextPropagation.isContextPropagationAvailable() is true)
signalListener = ContextPropagation.contextRestoringSignalListener(signalListener, actual);
signalListener = ContextPropagation.contextRestoreForTap(signalListener, actual::currentContext);

try {
signalListener.doFirst();
Expand Down