diff --git a/reactor-netty5-core/build.gradle b/reactor-netty5-core/build.gradle index b94c3ca0ab..8122ffdace 100644 --- a/reactor-netty5-core/build.gradle +++ b/reactor-netty5-core/build.gradle @@ -41,10 +41,16 @@ ext { sourceSets { jarFileTest + contextPropagationTest { + compileClasspath += sourceSets.main.output + runtimeClasspath += sourceSets.main.output + } } configurations { shaded + contextPropagationTestImplementation.extendsFrom(implementation) + contextPropagationTestRuntimeOnly.extendsFrom(runtimeOnly) } dependencies { @@ -107,8 +113,6 @@ dependencies { compileOnly "io.micrometer:micrometer-core:$micrometerVersion" compileOnly "io.micrometer:micrometer-tracing:$micrometerTracingVersion" - compileOnly "io.micrometer:context-propagation:${contextPropagationVersion}" - // Logging compileOnly "org.slf4j:slf4j-api:$slf4jVersion" @@ -153,6 +157,12 @@ dependencies { jarFileTestImplementation "org.assertj:assertj-core:$assertJVersion" jarFileTestImplementation "org.junit.jupiter:junit-jupiter-api:$junitVersion" jarFileTestRuntimeOnly "org.junit.jupiter:junit-jupiter-engine:$junitVersion" + + contextPropagationTestImplementation "org.assertj:assertj-core:$assertJVersion" + contextPropagationTestImplementation "org.junit.jupiter:junit-jupiter-api:$junitVersion" + contextPropagationTestImplementation "io.micrometer:micrometer-core:$micrometerVersion" + contextPropagationTestImplementation "io.micrometer:context-propagation:$contextPropagationVersion" + contextPropagationTestRuntimeOnly "org.junit.jupiter:junit-jupiter-engine:$junitVersion" } jar { @@ -431,6 +441,13 @@ task jarFileTest(type: Test) { tasks.check.dependsOn(jarFileTest) +task contextPropagationTest(type: Test) { + testClassesDirs = sourceSets.contextPropagationTest.output.classesDirs + classpath = sourceSets.contextPropagationTest.runtimeClasspath +} + +tasks.check.dependsOn(contextPropagationTest) + task shadedJarTest(type: Test) { testClassesDirs = sourceSets.test.output.classesDirs diff --git a/reactor-netty5-core/src/main/java/reactor/netty5/resources/ContextPropagationUtils.java b/reactor-netty5-core/src/contextPropagationTest/java/reactor/netty5/ContextPropagationTest.java similarity index 54% rename from reactor-netty5-core/src/main/java/reactor/netty5/resources/ContextPropagationUtils.java rename to reactor-netty5-core/src/contextPropagationTest/java/reactor/netty5/ContextPropagationTest.java index e7d3ac4ae3..035d055fd1 100644 --- a/reactor-netty5-core/src/main/java/reactor/netty5/resources/ContextPropagationUtils.java +++ b/reactor-netty5-core/src/contextPropagationTest/java/reactor/netty5/ContextPropagationTest.java @@ -13,24 +13,17 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package reactor.netty5.resources; +package reactor.netty5; -import io.micrometer.context.ContextSnapshot; import io.micrometer.observation.contextpropagation.ObservationThreadLocalAccessor; -import reactor.util.context.Context; -import reactor.util.context.ContextView; +import org.junit.jupiter.api.Test; -import java.util.function.Function; -import java.util.function.Predicate; +import static org.assertj.core.api.Assertions.assertThat; -final class ContextPropagationUtils { +class ContextPropagationTest { - static final Predicate OBSERVATION_KEY = k -> k == ObservationThreadLocalAccessor.KEY; - - static Function captureObservation(ContextView source) { - ContextSnapshot snapshot = ContextSnapshot.captureUsing(OBSERVATION_KEY, source); - return snapshot::updateContext; + @Test + void testObservationKey() { + assertThat(Metrics.OBSERVATION_KEY).isEqualTo(ObservationThreadLocalAccessor.KEY); } - - private ContextPropagationUtils() {} } diff --git a/reactor-netty5-core/src/main/java/reactor/netty5/Metrics.java b/reactor-netty5-core/src/main/java/reactor/netty5/Metrics.java index 787a0c3afe..2c78b7de0e 100644 --- a/reactor-netty5-core/src/main/java/reactor/netty5/Metrics.java +++ b/reactor-netty5-core/src/main/java/reactor/netty5/Metrics.java @@ -17,10 +17,13 @@ import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.observation.DefaultMeterObservationHandler; +import io.micrometer.observation.Observation; import io.micrometer.observation.ObservationHandler; import io.micrometer.observation.ObservationRegistry; import reactor.netty5.observability.ReactorNettyTimerObservationHandler; import reactor.util.annotation.Nullable; +import reactor.util.context.Context; +import reactor.util.context.ContextView; import java.net.InetSocketAddress; import java.net.SocketAddress; @@ -33,6 +36,7 @@ */ public class Metrics { public static final MeterRegistry REGISTRY = io.micrometer.core.instrument.Metrics.globalRegistry; + public static final String OBSERVATION_KEY = "micrometer.observation"; public static final ObservationRegistry OBSERVATION_REGISTRY = ObservationRegistry.create(); static { OBSERVATION_REGISTRY.observationConfig().observationHandler( @@ -268,6 +272,13 @@ public class Metrics { public static final String ERROR = "ERROR"; + @Nullable + public static Observation currentObservation(ContextView contextView) { + if (contextView.hasKey(OBSERVATION_KEY)) { + return contextView.get(OBSERVATION_KEY); + } + return OBSERVATION_REGISTRY.getCurrentObservation(); + } @Nullable public static String formatSocketAddress(@Nullable SocketAddress socketAddress) { @@ -281,4 +292,8 @@ public static String formatSocketAddress(@Nullable SocketAddress socketAddress) } return null; } + + public static Context updateContext(Context context, Object observation) { + return context.hasKey(OBSERVATION_KEY) ? context : context.put(OBSERVATION_KEY, observation); + } } diff --git a/reactor-netty5-core/src/main/java/reactor/netty5/channel/MicrometerChannelMetricsHandler.java b/reactor-netty5-core/src/main/java/reactor/netty5/channel/MicrometerChannelMetricsHandler.java index 9ebc8c6978..b566cebcca 100644 --- a/reactor-netty5-core/src/main/java/reactor/netty5/channel/MicrometerChannelMetricsHandler.java +++ b/reactor-netty5-core/src/main/java/reactor/netty5/channel/MicrometerChannelMetricsHandler.java @@ -18,7 +18,6 @@ import io.micrometer.common.KeyValues; import io.micrometer.core.instrument.Timer; import io.micrometer.observation.Observation; -import io.micrometer.observation.contextpropagation.ObservationThreadLocalAccessor; import io.netty5.channel.ChannelHandler; import io.netty5.channel.ChannelHandlerContext; import io.netty5.handler.ssl.SslHandler; @@ -32,6 +31,7 @@ import static reactor.netty5.Metrics.CONNECT_TIME; import static reactor.netty5.Metrics.ERROR; +import static reactor.netty5.Metrics.OBSERVATION_KEY; import static reactor.netty5.Metrics.OBSERVATION_REGISTRY; import static reactor.netty5.Metrics.SUCCESS; import static reactor.netty5.Metrics.TLS_HANDSHAKE_TIME; @@ -108,8 +108,8 @@ public Future connect(ChannelHandlerContext ctx, SocketAddress remoteAddre Observation observation = Observation.createNotStarted(recorder.name() + CONNECT_TIME, this, OBSERVATION_REGISTRY); if (ctx.channel().hasAttr(CONTEXT_VIEW)) { ContextView contextView = ctx.channel().attr(CONTEXT_VIEW).get(); - if (contextView.hasKey(ObservationThreadLocalAccessor.KEY)) { - observation.parentObservation(contextView.get(ObservationThreadLocalAccessor.KEY)); + if (contextView.hasKey(OBSERVATION_KEY)) { + observation.parentObservation(contextView.get(OBSERVATION_KEY)); } } observation.start(); @@ -165,8 +165,8 @@ public void channelActive(ChannelHandlerContext ctx) { observation = Observation.createNotStarted(recorder.name() + TLS_HANDSHAKE_TIME, this, OBSERVATION_REGISTRY); if (ctx.channel().hasAttr(CONTEXT_VIEW)) { ContextView contextView = ctx.channel().attr(CONTEXT_VIEW).get(); - if (contextView.hasKey(ObservationThreadLocalAccessor.KEY)) { - observation.parentObservation(contextView.get(ObservationThreadLocalAccessor.KEY)); + if (contextView.hasKey(OBSERVATION_KEY)) { + observation.parentObservation(contextView.get(OBSERVATION_KEY)); } } observation.start(); diff --git a/reactor-netty5-core/src/main/java/reactor/netty5/internal/util/Metrics.java b/reactor-netty5-core/src/main/java/reactor/netty5/internal/util/Metrics.java index ba1ee3a687..993ed5fa4d 100644 --- a/reactor-netty5-core/src/main/java/reactor/netty5/internal/util/Metrics.java +++ b/reactor-netty5-core/src/main/java/reactor/netty5/internal/util/Metrics.java @@ -24,7 +24,6 @@ */ public class Metrics { - static final boolean isContextPropagationAvailable; static final boolean isMicrometerAvailable; static final boolean isTracingAvailable; @@ -48,25 +47,6 @@ public class Metrics { tracing = false; } isTracingAvailable = tracing; - - boolean contextPropagation; - try { - io.micrometer.context.ContextRegistry.getInstance(); - contextPropagation = true; - } - catch (Throwable t) { - contextPropagation = false; - } - isContextPropagationAvailable = contextPropagation; - } - - /** - * Check if the current runtime supports context propagation, by verifying if Context Propagation is on the classpath. - * - * @return true if the Micrometer is available - */ - public static boolean isContextPropagationAvailable() { - return isContextPropagationAvailable; } /** diff --git a/reactor-netty5-core/src/main/java/reactor/netty5/resources/DefaultPooledConnectionProvider.java b/reactor-netty5-core/src/main/java/reactor/netty5/resources/DefaultPooledConnectionProvider.java index be6e4c2e35..9f15d31a5f 100644 --- a/reactor-netty5-core/src/main/java/reactor/netty5/resources/DefaultPooledConnectionProvider.java +++ b/reactor-netty5-core/src/main/java/reactor/netty5/resources/DefaultPooledConnectionProvider.java @@ -79,9 +79,9 @@ protected CoreSubscriber> createDisposableAcquire( long pendingAcquireTimeout, InstrumentedPool pool, MonoSink sink, - Function snapshot) { + Context currentContext) { return new DisposableAcquire(connectionObserver, config.channelOperationsProvider(), - pendingAcquireTimeout, pool, sink, snapshot); + pendingAcquireTimeout, pool, sink, currentContext); } @Override @@ -117,9 +117,9 @@ static final class DisposableAcquire long pendingAcquireTimeout, InstrumentedPool pool, MonoSink sink, - Function snapshot) { + Context currentContext) { this.cancellations = Disposables.composite(); - this.currentContext = snapshot.apply(Context.of(sink.contextView())); + this.currentContext = currentContext; this.obs = obs; this.opsFactory = opsFactory; this.pendingAcquireTimeout = pendingAcquireTimeout; diff --git a/reactor-netty5-core/src/main/java/reactor/netty5/resources/NewConnectionProvider.java b/reactor-netty5-core/src/main/java/reactor/netty5/resources/NewConnectionProvider.java index 6635563ed4..99c5a657ea 100644 --- a/reactor-netty5-core/src/main/java/reactor/netty5/resources/NewConnectionProvider.java +++ b/reactor-netty5-core/src/main/java/reactor/netty5/resources/NewConnectionProvider.java @@ -20,7 +20,6 @@ import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.Objects; -import java.util.function.Function; import java.util.function.Supplier; import io.netty5.channel.Channel; @@ -72,10 +71,14 @@ public Mono acquire(TransportConfig config, DisposableConnect disposableConnect = new DisposableConnect(sink, config.bindAddress()); if (remote != null && resolverGroup != null) { ChannelInitializer channelInitializer = config.channelInitializer(connectionObserver, remote, false); - Function snapshot = - Metrics.isMicrometerAvailable() && Metrics.isContextPropagationAvailable() ? - ContextPropagationUtils.captureObservation(sink.contextView()) : Function.identity(); - TransportConnector.connect(config, remote, resolverGroup, channelInitializer, snapshot.apply(Context.of(sink.contextView()))) + Context currentContext = Context.of(sink.contextView()); + if (Metrics.isMicrometerAvailable()) { + Object currentObservation = reactor.netty5.Metrics.currentObservation(currentContext); + if (currentObservation != null) { + currentContext = reactor.netty5.Metrics.updateContext(currentContext, currentObservation); + } + } + TransportConnector.connect(config, remote, resolverGroup, channelInitializer, currentContext) .subscribe(disposableConnect); } else { diff --git a/reactor-netty5-core/src/main/java/reactor/netty5/resources/PooledConnectionProvider.java b/reactor-netty5-core/src/main/java/reactor/netty5/resources/PooledConnectionProvider.java index a6c7178aa1..c6dff2ac44 100644 --- a/reactor-netty5-core/src/main/java/reactor/netty5/resources/PooledConnectionProvider.java +++ b/reactor-netty5-core/src/main/java/reactor/netty5/resources/PooledConnectionProvider.java @@ -168,12 +168,16 @@ else if (Metrics.isMicrometerAvailable()) { if (eventLoop != null) { mono = mono.contextWrite(ctx -> ctx.put(CONTEXT_CALLER_EVENTLOOP, eventLoop)); } - Function snapshot = - Metrics.isMicrometerAvailable() && Metrics.isContextPropagationAvailable() ? - ContextPropagationUtils.captureObservation(sink.contextView()) : Function.identity(); - mono.contextWrite(snapshot) - .subscribe(createDisposableAcquire(config, connectionObserver, - poolFactory.pendingAcquireTimeout, pool, sink, snapshot)); + Context currentContext = Context.of(sink.contextView()); + if (Metrics.isMicrometerAvailable()) { + Object currentObservation = reactor.netty5.Metrics.currentObservation(currentContext); + if (currentObservation != null) { + currentContext = reactor.netty5.Metrics.updateContext(currentContext, currentObservation); + mono = mono.contextWrite(ctx -> reactor.netty5.Metrics.updateContext(ctx, currentObservation)); + } + } + mono.subscribe(createDisposableAcquire(config, connectionObserver, + poolFactory.pendingAcquireTimeout, pool, sink, currentContext)); }); } @@ -260,7 +264,7 @@ protected abstract CoreSubscriber> createDisposableAcquire( long pendingAcquireTimeout, InstrumentedPool pool, MonoSink sink, - Function snapshot); + Context currentContext); protected abstract InstrumentedPool createPool( TransportConfig config, diff --git a/reactor-netty5-core/src/main/java/reactor/netty5/transport/MicrometerAddressResolverGroupMetrics.java b/reactor-netty5-core/src/main/java/reactor/netty5/transport/MicrometerAddressResolverGroupMetrics.java index 5d43904b2d..069173b62a 100644 --- a/reactor-netty5-core/src/main/java/reactor/netty5/transport/MicrometerAddressResolverGroupMetrics.java +++ b/reactor-netty5-core/src/main/java/reactor/netty5/transport/MicrometerAddressResolverGroupMetrics.java @@ -18,7 +18,6 @@ import io.micrometer.common.KeyValues; import io.micrometer.core.instrument.Timer; import io.micrometer.observation.Observation; -import io.micrometer.observation.contextpropagation.ObservationThreadLocalAccessor; import io.netty5.resolver.AddressResolver; import io.netty5.resolver.AddressResolverGroup; import io.netty5.util.concurrent.EventExecutor; @@ -37,6 +36,7 @@ import static reactor.netty5.Metrics.ADDRESS_RESOLVER; import static reactor.netty5.Metrics.ERROR; +import static reactor.netty5.Metrics.OBSERVATION_KEY; import static reactor.netty5.Metrics.OBSERVATION_REGISTRY; import static reactor.netty5.Metrics.SUCCESS; import static reactor.netty5.Metrics.formatSocketAddress; @@ -130,8 +130,8 @@ Future> resolveAllInternal(SocketAddress address, Supplier> createDisposableAcquire( long pendingAcquireTimeout, InstrumentedPool pool, MonoSink sink, - Function snapshot) { + Context currentContext) { boolean acceptGzip = false; ChannelMetricsRecorder metricsRecorder = config.metricsRecorder() != null ? config.metricsRecorder().get() : null; Function uriTagValue = null; @@ -111,7 +111,7 @@ protected CoreSubscriber> createDisposableAcquire( uriTagValue = httpClientConfig.uriTagValue; } return new DisposableAcquire(connectionObserver, config.channelOperationsProvider(), - acceptGzip, metricsRecorder, pendingAcquireTimeout, pool, sink, snapshot, uriTagValue); + acceptGzip, metricsRecorder, pendingAcquireTimeout, pool, sink, currentContext, uriTagValue); } @Override @@ -223,10 +223,10 @@ static final class DisposableAcquire long pendingAcquireTimeout, InstrumentedPool pool, MonoSink sink, - Function snapshot, + Context currentContext, @Nullable Function uriTagValue) { this.cancellations = Disposables.composite(); - this.currentContext = snapshot.apply(Context.of(sink.contextView())); + this.currentContext = currentContext; this.obs = obs; this.opsFactory = opsFactory; this.acceptGzip = acceptGzip; diff --git a/reactor-netty5-http/src/main/java/reactor/netty5/http/client/MicrometerHttpClientMetricsHandler.java b/reactor-netty5-http/src/main/java/reactor/netty5/http/client/MicrometerHttpClientMetricsHandler.java index db7ec53d0b..4d9e4e0ae9 100644 --- a/reactor-netty5-http/src/main/java/reactor/netty5/http/client/MicrometerHttpClientMetricsHandler.java +++ b/reactor-netty5-http/src/main/java/reactor/netty5/http/client/MicrometerHttpClientMetricsHandler.java @@ -18,7 +18,6 @@ import io.micrometer.common.KeyValues; import io.micrometer.core.instrument.Timer; import io.micrometer.observation.Observation; -import io.micrometer.observation.contextpropagation.ObservationThreadLocalAccessor; import io.micrometer.observation.transport.RequestReplySenderContext; import io.netty5.channel.Channel; import io.netty5.handler.codec.http.HttpRequest; @@ -32,6 +31,7 @@ import java.util.Objects; import java.util.function.Function; +import static reactor.netty5.Metrics.OBSERVATION_KEY; import static reactor.netty5.Metrics.OBSERVATION_REGISTRY; import static reactor.netty5.Metrics.RESPONSE_TIME; import static reactor.netty5.Metrics.formatSocketAddress; @@ -113,8 +113,8 @@ protected void startWrite(HttpRequest msg, Channel channel, @Nullable ContextVie responseTimeHandlerContext = new ResponseTimeHandlerContext(recorder, msg, path, channel.remoteAddress()); responseTimeObservation = Observation.createNotStarted(recorder.name() + RESPONSE_TIME, responseTimeHandlerContext, OBSERVATION_REGISTRY); - if (contextView != null && contextView.hasKey(ObservationThreadLocalAccessor.KEY)) { - responseTimeObservation.parentObservation(contextView.get(ObservationThreadLocalAccessor.KEY)); + if (contextView != null && contextView.hasKey(OBSERVATION_KEY)) { + responseTimeObservation.parentObservation(contextView.get(OBSERVATION_KEY)); } responseTimeObservation.start(); }