Skip to content

Commit

Permalink
Context propagation: use new APIs from latest snapshots (#3256)
Browse files Browse the repository at this point in the history
This commit upgrades context-propagation to post-RC1 snapshots.
This allows for several polishes and improvements:

ContextSnapshot.setAllThreadLocalsFrom can be used as a way of
restoring thread locasl while avoiding intermediary representation.

With the introduction of removal methods in ContextRegistry, it becomes
more practical to use the ContextRegistry.getInstance global instance.
Following that, two of the three intermediate classes are no longer
needed and can be replaced with lambdas: ContextCaptureFunction and
ContextRestoreHandleConsumer.

Instead of calling the ContextPropagation methods with a subscriber
(which necessitates some mocking in tests), we now take a Supplier
of Context. A CoreSubscriber::currentContext method reference can
do the trick in production code.
  • Loading branch information
simonbasle committed Nov 2, 2022
1 parent b31b331 commit a62e031
Show file tree
Hide file tree
Showing 11 changed files with 72 additions and 118 deletions.
2 changes: 1 addition & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ logback = "ch.qos.logback:logback-classic:1.2.11"
micrometer-bom = { module = "io.micrometer:micrometer-bom", version.ref = "micrometer" }
micrometer-commons = { module = "io.micrometer:micrometer-commons" }
micrometer-core = { module = "io.micrometer:micrometer-core" }
micrometer-contextPropagation = "io.micrometer:context-propagation:1.0.0-RC1"
micrometer-contextPropagation = "io.micrometer:context-propagation:1.0.0-SNAPSHOT"
micrometer-docsGenerator = { module = "io.micrometer:micrometer-docs-generator", version = "1.0.0-RC1"}
micrometer-observation-test = { module = "io.micrometer:micrometer-observation-test" }
micrometer-tracing-test = "io.micrometer:micrometer-tracing-integration-test:1.0.0-RC1"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;

import io.micrometer.context.ContextRegistry;
import io.micrometer.context.ContextSnapshot;

import reactor.core.CoreSubscriber;
import reactor.core.observability.SignalListener;
import reactor.util.Logger;
import reactor.util.Loggers;
Expand Down Expand Up @@ -53,8 +53,9 @@ final class ContextPropagation {
Function<Context, Context> contextCaptureFunction;
boolean contextPropagation;
try {
ContextRegistry registry = ContextRegistry.getInstance();
contextCaptureFunction = new ContextCaptureFunction(PREDICATE_TRUE, registry);
ContextRegistry globalRegistry = ContextRegistry.getInstance();
contextCaptureFunction = target -> ContextSnapshot.captureAllUsing(PREDICATE_TRUE, globalRegistry)
.updateContext(target);
contextPropagation = true;
}
catch (LinkageError t) {
Expand Down Expand Up @@ -120,40 +121,34 @@ static Function<Context, Context> contextCapture(Predicate<Object> captureKeyPre
if (!isContextPropagationAvailable) {
return NO_OP;
}
return new ContextCaptureFunction(captureKeyPredicate, null);
return target -> ContextSnapshot.captureAllUsing(captureKeyPredicate, ContextRegistry.getInstance())
.updateContext(target);
}

static <T, R> BiConsumer<T, SynchronousSink<R>> contextRestoreForHandle(BiConsumer<T, SynchronousSink<R>> handler, CoreSubscriber<? super R> actual) {
if (!ContextPropagation.isContextPropagationAvailable() || actual.currentContext().isEmpty()) {
static <T, R> BiConsumer<T, SynchronousSink<R>> contextRestoreForHandle(BiConsumer<T, SynchronousSink<R>> handler, Supplier<Context> contextSupplier) {
if (!ContextPropagation.isContextPropagationAvailable()) {
return handler;
}
return new ContextRestoreHandleConsumer<>(handler, ContextRegistry.getInstance(), actual.currentContext());
}

static <T> SignalListener<T> contextRestoringSignalListener(final SignalListener<T> original,
CoreSubscriber<? super T> actual) {
if (!ContextPropagation.isContextPropagationAvailable() || actual.currentContext().isEmpty()) {
return original;
final Context ctx = contextSupplier.get();
if (ctx.isEmpty()) {
return handler;
}
return new ContextRestoreSignalListener<T>(original, actual.currentContext(), ContextRegistry.getInstance());
return (v, sink) -> {
try (ContextSnapshot.Scope ignored = ContextSnapshot.setAllThreadLocalsFrom(ctx)) {
handler.accept(v, sink);
}
};
}

//the Function indirection allows tests to directly assert code in this class rather than static methods
static final class ContextCaptureFunction implements Function<Context, Context> {

final Predicate<Object> capturePredicate;
final ContextRegistry registry;

ContextCaptureFunction(Predicate<Object> capturePredicate, @Nullable ContextRegistry registry) {
this.capturePredicate = capturePredicate;
this.registry = registry != null ? registry : ContextRegistry.getInstance();
static <T> SignalListener<T> contextRestoreForTap(final SignalListener<T> original, Supplier<Context> contextSupplier) {
if (!ContextPropagation.isContextPropagationAvailable()) {
return original;
}

@Override
public Context apply(Context target) {
return ContextSnapshot.captureAllUsing(capturePredicate, this.registry)
.updateContext(target);
final Context ctx = contextSupplier.get();
if (ctx.isEmpty()) {
return original;
}
return new ContextRestoreSignalListener<T>(original, ctx, null);
}

//the SignalListener implementation can be tested independently with a test-specific ContextRegistry
Expand All @@ -170,10 +165,7 @@ public ContextRestoreSignalListener(SignalListener<T> original, ContextView cont
}

ContextSnapshot.Scope restoreThreadLocals() {
//TODO for now ContextSnapshot static methods don't allow restoring _all_ TLs without an intermediate ContextSnapshot
return ContextSnapshot
.captureFrom(this.context, k -> true, this.registry)
.setThreadLocals();
return ContextSnapshot.setAllThreadLocalsFrom(this.context, this.registry);
}

@Override
Expand Down Expand Up @@ -288,30 +280,4 @@ public Context addToContext(Context originalContext) {
}
}
}

//the BiConsumer implementation can be tested independently with a test-specific ContextRegistry
static final class ContextRestoreHandleConsumer<T, R> implements BiConsumer<T, SynchronousSink<R>> {

private final BiConsumer<T, SynchronousSink<R>> originalHandler;
private final ContextRegistry registry;
private final ContextView reactorContext;

ContextRestoreHandleConsumer(BiConsumer<T, SynchronousSink<R>> originalHandler, ContextRegistry registry,
ContextView reactorContext) {
this.originalHandler = originalHandler;
this.registry = registry;
this.reactorContext = reactorContext;
}

@Override
public void accept(T t, SynchronousSink<R> sink) {
//TODO for now ContextSnapshot static methods don't allow restoring _all_ TLs without an intermediate ContextSnapshot
final ContextSnapshot snapshot = ContextSnapshot.captureFrom(this.reactorContext, k -> true, this.registry);
try (ContextSnapshot.Scope ignored = snapshot.setThreadLocals(k -> {
return true;
})) {
originalHandler.accept(t, sink);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
import java.util.Objects;
import java.util.function.BiConsumer;

import io.micrometer.context.ContextSnapshot;
import org.reactivestreams.Subscription;

import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;
import reactor.util.annotation.Nullable;
Expand All @@ -45,7 +45,7 @@ final class FluxHandle<T, R> extends InternalFluxOperator<T, R> {

@Override
public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super R> actual) {
BiConsumer<? super T, SynchronousSink<R>> handler2 = ContextPropagation.contextRestoreForHandle(this.handler, actual);
BiConsumer<? super T, SynchronousSink<R>> handler2 = ContextPropagation.contextRestoreForHandle(this.handler, actual::currentContext);
if (actual instanceof Fuseable.ConditionalSubscriber) {
@SuppressWarnings("unchecked")
Fuseable.ConditionalSubscriber<? super R> cs = (Fuseable.ConditionalSubscriber<? super R>) actual;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
import java.util.Objects;
import java.util.function.BiConsumer;

import io.micrometer.context.ContextSnapshot;
import org.reactivestreams.Subscription;

import reactor.core.CoreSubscriber;
import reactor.core.Exceptions;
import reactor.core.Fuseable;
Expand Down Expand Up @@ -58,7 +58,7 @@ final class FluxHandleFuseable<T, R> extends InternalFluxOperator<T, R> implemen

@Override
public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super R> actual) {
BiConsumer<? super T, SynchronousSink<R>> handler2 = ContextPropagation.contextRestoreForHandle(this.handler, actual);
BiConsumer<? super T, SynchronousSink<R>> handler2 = ContextPropagation.contextRestoreForHandle(this.handler, actual::currentContext);
if (actual instanceof ConditionalSubscriber) {
@SuppressWarnings("unchecked")
ConditionalSubscriber<? super R> cs = (ConditionalSubscriber<? super R>) actual;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package reactor.core.publisher;

import io.micrometer.context.ContextSnapshot;
import org.reactivestreams.Subscription;

import reactor.core.CoreSubscriber;
Expand Down Expand Up @@ -59,7 +58,7 @@ public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super T> act
}
// Attempt to wrap the SignalListener with one that restores ThreadLocals from Context on each listener methods
// (only if ContextPropagation.isContextPropagationAvailable() is true)
signalListener = ContextPropagation.contextRestoringSignalListener(signalListener, actual);
signalListener = ContextPropagation.contextRestoreForTap(signalListener, actual::currentContext);

try {
signalListener.doFirst();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
import reactor.core.CoreSubscriber;
import reactor.core.Exceptions;
import reactor.core.Fuseable;
import reactor.util.annotation.Nullable;
import reactor.core.observability.SignalListener;
import reactor.core.observability.SignalListenerFactory;
import reactor.util.annotation.Nullable;

/**
* A {@link reactor.core.Fuseable} generic per-Subscription side effect {@link Flux} that notifies a
Expand Down Expand Up @@ -58,7 +58,7 @@ public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super T> act
}
// Attempt to wrap the SignalListener with one that restores ThreadLocals from Context on each listener methods
// (only if ContextPropagation.isContextPropagationAvailable() is true)
signalListener = ContextPropagation.contextRestoringSignalListener(signalListener, actual);
signalListener = ContextPropagation.contextRestoreForTap(signalListener, actual::currentContext);

try {
signalListener.doFirst();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ final class MonoHandle<T, R> extends InternalMonoOperator<T, R> {

@Override
public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super R> actual) {
BiConsumer<? super T, SynchronousSink<R>> handler2 = ContextPropagation.contextRestoreForHandle(this.handler, actual);
BiConsumer<? super T, SynchronousSink<R>> handler2 = ContextPropagation.contextRestoreForHandle(this.handler, actual::currentContext);
return new FluxHandle.HandleSubscriber<>(actual, handler2);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ final class MonoHandleFuseable<T, R> extends InternalMonoOperator<T, R>

@Override
public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super R> actual) {
BiConsumer<? super T, SynchronousSink<R>> handler2 = ContextPropagation.contextRestoreForHandle(this.handler, actual);
BiConsumer<? super T, SynchronousSink<R>> handler2 = ContextPropagation.contextRestoreForHandle(this.handler, actual::currentContext);
return new FluxHandleFuseable.HandleFuseableSubscriber<>(actual, handler2);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@

import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;
import reactor.core.publisher.FluxTap.TapSubscriber;
import reactor.util.annotation.Nullable;
import reactor.core.observability.SignalListener;
import reactor.core.observability.SignalListenerFactory;
import reactor.core.publisher.FluxTap.TapSubscriber;
import reactor.util.annotation.Nullable;

/**
* A generic per-Subscription side effect {@link Mono} that notifies a {@link SignalListener} of most events.
Expand Down Expand Up @@ -55,7 +55,7 @@ public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super T> act
}
// Attempt to wrap the SignalListener with one that restores ThreadLocals from Context on each listener methods
// (only if ContextPropagation.isContextPropagationAvailable() is true)
signalListener = ContextPropagation.contextRestoringSignalListener(signalListener, actual);
signalListener = ContextPropagation.contextRestoreForTap(signalListener, actual::currentContext);

try {
signalListener.doFirst();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@

import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;
import reactor.util.annotation.Nullable;
import reactor.core.observability.SignalListener;
import reactor.core.observability.SignalListenerFactory;
import reactor.util.annotation.Nullable;

/**
* A {@link Fuseable} generic per-Subscription side effect {@link Mono} that notifies a {@link SignalListener} of most events.
Expand Down Expand Up @@ -54,7 +54,7 @@ public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super T> act
}
// Attempt to wrap the SignalListener with one that restores ThreadLocals from Context on each listener methods
// (only if ContextPropagation.isContextPropagationAvailable() is true)
signalListener = ContextPropagation.contextRestoringSignalListener(signalListener, actual);
signalListener = ContextPropagation.contextRestoreForTap(signalListener, actual::currentContext);

try {
signalListener.doFirst();
Expand Down

0 comments on commit a62e031

Please sign in to comment.