Skip to content

Commit

Permalink
Merge #2394 into 2.0.0-M2
Browse files Browse the repository at this point in the history
  • Loading branch information
violetagg committed Jul 21, 2022
2 parents 975f94d + 6bf35e7 commit be0147a
Show file tree
Hide file tree
Showing 12 changed files with 79 additions and 70 deletions.
21 changes: 19 additions & 2 deletions reactor-netty5-core/build.gradle
Expand Up @@ -41,10 +41,16 @@ ext {

sourceSets {
jarFileTest
contextPropagationTest {
compileClasspath += sourceSets.main.output
runtimeClasspath += sourceSets.main.output
}
}

configurations {
shaded
contextPropagationTestImplementation.extendsFrom(implementation)
contextPropagationTestRuntimeOnly.extendsFrom(runtimeOnly)
}

dependencies {
Expand Down Expand Up @@ -107,8 +113,6 @@ dependencies {
compileOnly "io.micrometer:micrometer-core:$micrometerVersion"
compileOnly "io.micrometer:micrometer-tracing:$micrometerTracingVersion"

compileOnly "io.micrometer:context-propagation:${contextPropagationVersion}"

// Logging
compileOnly "org.slf4j:slf4j-api:$slf4jVersion"

Expand Down Expand Up @@ -153,6 +157,12 @@ dependencies {
jarFileTestImplementation "org.assertj:assertj-core:$assertJVersion"
jarFileTestImplementation "org.junit.jupiter:junit-jupiter-api:$junitVersion"
jarFileTestRuntimeOnly "org.junit.jupiter:junit-jupiter-engine:$junitVersion"

contextPropagationTestImplementation "org.assertj:assertj-core:$assertJVersion"
contextPropagationTestImplementation "org.junit.jupiter:junit-jupiter-api:$junitVersion"
contextPropagationTestImplementation "io.micrometer:micrometer-core:$micrometerVersion"
contextPropagationTestImplementation "io.micrometer:context-propagation:$contextPropagationVersion"
contextPropagationTestRuntimeOnly "org.junit.jupiter:junit-jupiter-engine:$junitVersion"
}

jar {
Expand Down Expand Up @@ -431,6 +441,13 @@ task jarFileTest(type: Test) {

tasks.check.dependsOn(jarFileTest)

task contextPropagationTest(type: Test) {
testClassesDirs = sourceSets.contextPropagationTest.output.classesDirs
classpath = sourceSets.contextPropagationTest.runtimeClasspath
}

tasks.check.dependsOn(contextPropagationTest)

task shadedJarTest(type: Test) {
testClassesDirs = sourceSets.test.output.classesDirs

Expand Down
Expand Up @@ -13,24 +13,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package reactor.netty5.resources;
package reactor.netty5;

import io.micrometer.context.ContextSnapshot;
import io.micrometer.observation.contextpropagation.ObservationThreadLocalAccessor;
import reactor.util.context.Context;
import reactor.util.context.ContextView;
import org.junit.jupiter.api.Test;

import java.util.function.Function;
import java.util.function.Predicate;
import static org.assertj.core.api.Assertions.assertThat;

final class ContextPropagationUtils {
class ContextPropagationTest {

static final Predicate<Object> OBSERVATION_KEY = k -> k == ObservationThreadLocalAccessor.KEY;

static Function<Context, Context> captureObservation(ContextView source) {
ContextSnapshot snapshot = ContextSnapshot.captureUsing(OBSERVATION_KEY, source);
return snapshot::updateContext;
@Test
void testObservationKey() {
assertThat(Metrics.OBSERVATION_KEY).isEqualTo(ObservationThreadLocalAccessor.KEY);
}

private ContextPropagationUtils() {}
}
15 changes: 15 additions & 0 deletions reactor-netty5-core/src/main/java/reactor/netty5/Metrics.java
Expand Up @@ -17,10 +17,13 @@

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.observation.DefaultMeterObservationHandler;
import io.micrometer.observation.Observation;
import io.micrometer.observation.ObservationHandler;
import io.micrometer.observation.ObservationRegistry;
import reactor.netty5.observability.ReactorNettyTimerObservationHandler;
import reactor.util.annotation.Nullable;
import reactor.util.context.Context;
import reactor.util.context.ContextView;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
Expand All @@ -33,6 +36,7 @@
*/
public class Metrics {
public static final MeterRegistry REGISTRY = io.micrometer.core.instrument.Metrics.globalRegistry;
public static final String OBSERVATION_KEY = "micrometer.observation";
public static final ObservationRegistry OBSERVATION_REGISTRY = ObservationRegistry.create();
static {
OBSERVATION_REGISTRY.observationConfig().observationHandler(
Expand Down Expand Up @@ -268,6 +272,13 @@ public class Metrics {

public static final String ERROR = "ERROR";

@Nullable
public static Observation currentObservation(ContextView contextView) {
if (contextView.hasKey(OBSERVATION_KEY)) {
return contextView.get(OBSERVATION_KEY);
}
return OBSERVATION_REGISTRY.getCurrentObservation();
}

@Nullable
public static String formatSocketAddress(@Nullable SocketAddress socketAddress) {
Expand All @@ -281,4 +292,8 @@ public static String formatSocketAddress(@Nullable SocketAddress socketAddress)
}
return null;
}

public static Context updateContext(Context context, Object observation) {
return context.hasKey(OBSERVATION_KEY) ? context : context.put(OBSERVATION_KEY, observation);
}
}
Expand Up @@ -18,7 +18,6 @@
import io.micrometer.common.KeyValues;
import io.micrometer.core.instrument.Timer;
import io.micrometer.observation.Observation;
import io.micrometer.observation.contextpropagation.ObservationThreadLocalAccessor;
import io.netty5.channel.ChannelHandler;
import io.netty5.channel.ChannelHandlerContext;
import io.netty5.handler.ssl.SslHandler;
Expand All @@ -32,6 +31,7 @@

import static reactor.netty5.Metrics.CONNECT_TIME;
import static reactor.netty5.Metrics.ERROR;
import static reactor.netty5.Metrics.OBSERVATION_KEY;
import static reactor.netty5.Metrics.OBSERVATION_REGISTRY;
import static reactor.netty5.Metrics.SUCCESS;
import static reactor.netty5.Metrics.TLS_HANDSHAKE_TIME;
Expand Down Expand Up @@ -108,8 +108,8 @@ public Future<Void> connect(ChannelHandlerContext ctx, SocketAddress remoteAddre
Observation observation = Observation.createNotStarted(recorder.name() + CONNECT_TIME, this, OBSERVATION_REGISTRY);
if (ctx.channel().hasAttr(CONTEXT_VIEW)) {
ContextView contextView = ctx.channel().attr(CONTEXT_VIEW).get();
if (contextView.hasKey(ObservationThreadLocalAccessor.KEY)) {
observation.parentObservation(contextView.get(ObservationThreadLocalAccessor.KEY));
if (contextView.hasKey(OBSERVATION_KEY)) {
observation.parentObservation(contextView.get(OBSERVATION_KEY));
}
}
observation.start();
Expand Down Expand Up @@ -165,8 +165,8 @@ public void channelActive(ChannelHandlerContext ctx) {
observation = Observation.createNotStarted(recorder.name() + TLS_HANDSHAKE_TIME, this, OBSERVATION_REGISTRY);
if (ctx.channel().hasAttr(CONTEXT_VIEW)) {
ContextView contextView = ctx.channel().attr(CONTEXT_VIEW).get();
if (contextView.hasKey(ObservationThreadLocalAccessor.KEY)) {
observation.parentObservation(contextView.get(ObservationThreadLocalAccessor.KEY));
if (contextView.hasKey(OBSERVATION_KEY)) {
observation.parentObservation(contextView.get(OBSERVATION_KEY));
}
}
observation.start();
Expand Down
Expand Up @@ -24,7 +24,6 @@
*/
public class Metrics {

static final boolean isContextPropagationAvailable;
static final boolean isMicrometerAvailable;
static final boolean isTracingAvailable;

Expand All @@ -48,25 +47,6 @@ public class Metrics {
tracing = false;
}
isTracingAvailable = tracing;

boolean contextPropagation;
try {
io.micrometer.context.ContextRegistry.getInstance();
contextPropagation = true;
}
catch (Throwable t) {
contextPropagation = false;
}
isContextPropagationAvailable = contextPropagation;
}

/**
* Check if the current runtime supports context propagation, by verifying if Context Propagation is on the classpath.
*
* @return true if the Micrometer is available
*/
public static boolean isContextPropagationAvailable() {
return isContextPropagationAvailable;
}

/**
Expand Down
Expand Up @@ -79,9 +79,9 @@ protected CoreSubscriber<PooledRef<PooledConnection>> createDisposableAcquire(
long pendingAcquireTimeout,
InstrumentedPool<PooledConnection> pool,
MonoSink<Connection> sink,
Function<Context, Context> snapshot) {
Context currentContext) {
return new DisposableAcquire(connectionObserver, config.channelOperationsProvider(),
pendingAcquireTimeout, pool, sink, snapshot);
pendingAcquireTimeout, pool, sink, currentContext);
}

@Override
Expand Down Expand Up @@ -117,9 +117,9 @@ static final class DisposableAcquire
long pendingAcquireTimeout,
InstrumentedPool<PooledConnection> pool,
MonoSink<Connection> sink,
Function<Context, Context> snapshot) {
Context currentContext) {
this.cancellations = Disposables.composite();
this.currentContext = snapshot.apply(Context.of(sink.contextView()));
this.currentContext = currentContext;
this.obs = obs;
this.opsFactory = opsFactory;
this.pendingAcquireTimeout = pendingAcquireTimeout;
Expand Down
Expand Up @@ -20,7 +20,6 @@
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Objects;
import java.util.function.Function;
import java.util.function.Supplier;

import io.netty5.channel.Channel;
Expand Down Expand Up @@ -72,10 +71,14 @@ public Mono<? extends Connection> acquire(TransportConfig config,
DisposableConnect disposableConnect = new DisposableConnect(sink, config.bindAddress());
if (remote != null && resolverGroup != null) {
ChannelInitializer<Channel> channelInitializer = config.channelInitializer(connectionObserver, remote, false);
Function<Context, Context> snapshot =
Metrics.isMicrometerAvailable() && Metrics.isContextPropagationAvailable() ?
ContextPropagationUtils.captureObservation(sink.contextView()) : Function.identity();
TransportConnector.connect(config, remote, resolverGroup, channelInitializer, snapshot.apply(Context.of(sink.contextView())))
Context currentContext = Context.of(sink.contextView());
if (Metrics.isMicrometerAvailable()) {
Object currentObservation = reactor.netty5.Metrics.currentObservation(currentContext);
if (currentObservation != null) {
currentContext = reactor.netty5.Metrics.updateContext(currentContext, currentObservation);
}
}
TransportConnector.connect(config, remote, resolverGroup, channelInitializer, currentContext)
.subscribe(disposableConnect);
}
else {
Expand Down
Expand Up @@ -168,12 +168,16 @@ else if (Metrics.isMicrometerAvailable()) {
if (eventLoop != null) {
mono = mono.contextWrite(ctx -> ctx.put(CONTEXT_CALLER_EVENTLOOP, eventLoop));
}
Function<Context, Context> snapshot =
Metrics.isMicrometerAvailable() && Metrics.isContextPropagationAvailable() ?
ContextPropagationUtils.captureObservation(sink.contextView()) : Function.identity();
mono.contextWrite(snapshot)
.subscribe(createDisposableAcquire(config, connectionObserver,
poolFactory.pendingAcquireTimeout, pool, sink, snapshot));
Context currentContext = Context.of(sink.contextView());
if (Metrics.isMicrometerAvailable()) {
Object currentObservation = reactor.netty5.Metrics.currentObservation(currentContext);
if (currentObservation != null) {
currentContext = reactor.netty5.Metrics.updateContext(currentContext, currentObservation);
mono = mono.contextWrite(ctx -> reactor.netty5.Metrics.updateContext(ctx, currentObservation));
}
}
mono.subscribe(createDisposableAcquire(config, connectionObserver,
poolFactory.pendingAcquireTimeout, pool, sink, currentContext));
});
}

Expand Down Expand Up @@ -260,7 +264,7 @@ protected abstract CoreSubscriber<PooledRef<T>> createDisposableAcquire(
long pendingAcquireTimeout,
InstrumentedPool<T> pool,
MonoSink<Connection> sink,
Function<Context, Context> snapshot);
Context currentContext);

protected abstract InstrumentedPool<T> createPool(
TransportConfig config,
Expand Down
Expand Up @@ -18,7 +18,6 @@
import io.micrometer.common.KeyValues;
import io.micrometer.core.instrument.Timer;
import io.micrometer.observation.Observation;
import io.micrometer.observation.contextpropagation.ObservationThreadLocalAccessor;
import io.netty5.resolver.AddressResolver;
import io.netty5.resolver.AddressResolverGroup;
import io.netty5.util.concurrent.EventExecutor;
Expand All @@ -37,6 +36,7 @@

import static reactor.netty5.Metrics.ADDRESS_RESOLVER;
import static reactor.netty5.Metrics.ERROR;
import static reactor.netty5.Metrics.OBSERVATION_KEY;
import static reactor.netty5.Metrics.OBSERVATION_REGISTRY;
import static reactor.netty5.Metrics.SUCCESS;
import static reactor.netty5.Metrics.formatSocketAddress;
Expand Down Expand Up @@ -130,8 +130,8 @@ Future<List<T>> resolveAllInternal(SocketAddress address, Supplier<Future<List<T
String remoteAddress = formatSocketAddress(address);
FutureHandlerContext handlerContext = new FutureHandlerContext((MicrometerChannelMetricsRecorder) recorder, remoteAddress);
Observation sample = Observation.createNotStarted(name + ADDRESS_RESOLVER, handlerContext, OBSERVATION_REGISTRY);
if (contextView.hasKey(ObservationThreadLocalAccessor.KEY)) {
sample.parentObservation(contextView.get(ObservationThreadLocalAccessor.KEY));
if (contextView.hasKey(OBSERVATION_KEY)) {
sample.parentObservation(contextView.get(OBSERVATION_KEY));
}
sample.start();
return resolver.get()
Expand Down
3 changes: 0 additions & 3 deletions reactor-netty5-http/build.gradle
Expand Up @@ -111,8 +111,6 @@ dependencies {
compileOnly "io.micrometer:micrometer-core:$micrometerVersion"
compileOnly "io.micrometer:micrometer-tracing:$micrometerTracingVersion"

compileOnly "io.micrometer:context-propagation:${contextPropagationVersion}"

// Logging
compileOnly "org.slf4j:slf4j-api:$slf4jVersion"

Expand Down Expand Up @@ -144,7 +142,6 @@ dependencies {
testRuntimeOnly "org.junit.jupiter:junit-jupiter-engine:$junitVersion"
testRuntimeOnly "org.slf4j:jcl-over-slf4j:$slf4jVersion"
testRuntimeOnly "ch.qos.logback:logback-classic:$logbackVersion"
testRuntimeOnly "io.micrometer:context-propagation:${contextPropagationVersion}"

// Needed for proxy testing
testRuntimeOnly "io.netty.contrib:netty-handler-proxy:$nettyContribVersion"
Expand Down
Expand Up @@ -102,7 +102,7 @@ protected CoreSubscriber<PooledRef<Connection>> createDisposableAcquire(
long pendingAcquireTimeout,
InstrumentedPool<Connection> pool,
MonoSink<Connection> sink,
Function<Context, Context> snapshot) {
Context currentContext) {
boolean acceptGzip = false;
ChannelMetricsRecorder metricsRecorder = config.metricsRecorder() != null ? config.metricsRecorder().get() : null;
Function<String, String> uriTagValue = null;
Expand All @@ -111,7 +111,7 @@ protected CoreSubscriber<PooledRef<Connection>> createDisposableAcquire(
uriTagValue = httpClientConfig.uriTagValue;
}
return new DisposableAcquire(connectionObserver, config.channelOperationsProvider(),
acceptGzip, metricsRecorder, pendingAcquireTimeout, pool, sink, snapshot, uriTagValue);
acceptGzip, metricsRecorder, pendingAcquireTimeout, pool, sink, currentContext, uriTagValue);
}

@Override
Expand Down Expand Up @@ -223,10 +223,10 @@ static final class DisposableAcquire
long pendingAcquireTimeout,
InstrumentedPool<Connection> pool,
MonoSink<Connection> sink,
Function<Context, Context> snapshot,
Context currentContext,
@Nullable Function<String, String> uriTagValue) {
this.cancellations = Disposables.composite();
this.currentContext = snapshot.apply(Context.of(sink.contextView()));
this.currentContext = currentContext;
this.obs = obs;
this.opsFactory = opsFactory;
this.acceptGzip = acceptGzip;
Expand Down
Expand Up @@ -18,7 +18,6 @@
import io.micrometer.common.KeyValues;
import io.micrometer.core.instrument.Timer;
import io.micrometer.observation.Observation;
import io.micrometer.observation.contextpropagation.ObservationThreadLocalAccessor;
import io.micrometer.observation.transport.RequestReplySenderContext;
import io.netty5.channel.Channel;
import io.netty5.handler.codec.http.HttpRequest;
Expand All @@ -32,6 +31,7 @@
import java.util.Objects;
import java.util.function.Function;

import static reactor.netty5.Metrics.OBSERVATION_KEY;
import static reactor.netty5.Metrics.OBSERVATION_REGISTRY;
import static reactor.netty5.Metrics.RESPONSE_TIME;
import static reactor.netty5.Metrics.formatSocketAddress;
Expand Down Expand Up @@ -113,8 +113,8 @@ protected void startWrite(HttpRequest msg, Channel channel, @Nullable ContextVie

responseTimeHandlerContext = new ResponseTimeHandlerContext(recorder, msg, path, channel.remoteAddress());
responseTimeObservation = Observation.createNotStarted(recorder.name() + RESPONSE_TIME, responseTimeHandlerContext, OBSERVATION_REGISTRY);
if (contextView != null && contextView.hasKey(ObservationThreadLocalAccessor.KEY)) {
responseTimeObservation.parentObservation(contextView.get(ObservationThreadLocalAccessor.KEY));
if (contextView != null && contextView.hasKey(OBSERVATION_KEY)) {
responseTimeObservation.parentObservation(contextView.get(OBSERVATION_KEY));
}
responseTimeObservation.start();
}
Expand Down

0 comments on commit be0147a

Please sign in to comment.