Skip to content

Commit

Permalink
Merge #2388 into 2.0.0-M2
Browse files Browse the repository at this point in the history
  • Loading branch information
violetagg committed Jul 18, 2022
2 parents 64d9058 + 7a9b894 commit 47def9a
Show file tree
Hide file tree
Showing 19 changed files with 171 additions and 88 deletions.
8 changes: 4 additions & 4 deletions build.gradle
Expand Up @@ -84,11 +84,11 @@ ext {
}

//Metrics
micrometerVersion = '1.10.0-M3' //optional baseline
micrometerTracingVersion = '1.0.0-M6' //optional baseline
micrometerDocsVersion = '1.0.0-M5' //optional baseline
micrometerVersion = '1.10.0-SNAPSHOT' //optional baseline
micrometerTracingVersion = '1.0.0-SNAPSHOT' //optional baseline
micrometerDocsVersion = '1.0.0-SNAPSHOT' //optional baseline

contextPropagationVersion = '1.0.0-M3'
contextPropagationVersion = '1.0.0-SNAPSHOT'

jsr305Version = '3.0.2'

Expand Down
2 changes: 1 addition & 1 deletion reactor-netty5-core/build.gradle
Expand Up @@ -107,7 +107,7 @@ dependencies {
compileOnly "io.micrometer:micrometer-core:$micrometerVersion"
compileOnly "io.micrometer:micrometer-tracing:$micrometerTracingVersion"

api "io.micrometer:context-propagation:${contextPropagationVersion}"
compileOnly "io.micrometer:context-propagation:${contextPropagationVersion}"

// Logging
compileOnly "org.slf4j:slf4j-api:$slf4jVersion"
Expand Down
Expand Up @@ -16,7 +16,6 @@
package reactor.netty5.channel;

import io.micrometer.common.KeyValues;
import io.micrometer.context.ContextSnapshot;
import io.micrometer.core.instrument.Timer;
import io.micrometer.observation.Observation;
import io.micrometer.observation.contextpropagation.ObservationThreadLocalAccessor;
Expand All @@ -27,9 +26,9 @@
import io.netty5.util.concurrent.Future;
import reactor.netty5.observability.ReactorNettyHandlerContext;
import reactor.util.annotation.Nullable;
import reactor.util.context.ContextView;

import java.net.SocketAddress;
import java.util.function.Predicate;

import static reactor.netty5.Metrics.CONNECT_TIME;
import static reactor.netty5.Metrics.ERROR;
Expand All @@ -52,9 +51,7 @@
* @since 1.1.0
*/
public final class MicrometerChannelMetricsHandler extends AbstractChannelMetricsHandler {
static final AttributeKey<ContextSnapshot> CONTEXT_SNAPSHOT = AttributeKey.valueOf("$CONTEXT_SNAPSHOT");

static final Predicate<Object> OBSERVATION_KEY = k -> k == ObservationThreadLocalAccessor.KEY;
static final AttributeKey<ContextView> CONTEXT_VIEW = AttributeKey.valueOf("$CONTEXT_VIEW");

final MicrometerChannelMetricsRecorder recorder;

Expand Down Expand Up @@ -108,12 +105,14 @@ public Future<Void> connect(ChannelHandlerContext ctx, SocketAddress remoteAddre
//
// Move the implementation from the recorder here
this.remoteAddress = formatSocketAddress(remoteAddress);
ContextSnapshot snapshot = ctx.channel().hasAttr(CONTEXT_SNAPSHOT) ?
ctx.channel().attr(CONTEXT_SNAPSHOT).get() : ContextSnapshot.capture();
Observation observation;
try (ContextSnapshot.Scope scope = snapshot.setThreadLocalValues(OBSERVATION_KEY)) {
observation = Observation.start(recorder.name() + CONNECT_TIME, this, OBSERVATION_REGISTRY);
Observation observation = Observation.createNotStarted(recorder.name() + CONNECT_TIME, this, OBSERVATION_REGISTRY);
if (ctx.channel().hasAttr(CONTEXT_VIEW)) {
ContextView contextView = ctx.channel().attr(CONTEXT_VIEW).get();
if (contextView.hasKey(ObservationThreadLocalAccessor.KEY)) {
observation.parentObservation(contextView.get(ObservationThreadLocalAccessor.KEY));
}
}
observation.start();
return ctx.connect(remoteAddress, localAddress)
.addListener(future -> {
ctx.pipeline().remove(this);
Expand Down Expand Up @@ -163,11 +162,14 @@ static final class TlsMetricsHandler extends Observation.Context implements Reac
@SuppressWarnings("try")
public void channelActive(ChannelHandlerContext ctx) {
this.remoteAddress = formatSocketAddress(ctx.channel().remoteAddress());
ContextSnapshot snapshot = ctx.channel().hasAttr(CONTEXT_SNAPSHOT) ?
ctx.channel().attr(CONTEXT_SNAPSHOT).getAndSet(null) : ContextSnapshot.capture();
try (ContextSnapshot.Scope scope = snapshot.setThreadLocalValues(OBSERVATION_KEY)) {
observation = Observation.start(recorder.name() + TLS_HANDSHAKE_TIME, this, OBSERVATION_REGISTRY);
observation = Observation.createNotStarted(recorder.name() + TLS_HANDSHAKE_TIME, this, OBSERVATION_REGISTRY);
if (ctx.channel().hasAttr(CONTEXT_VIEW)) {
ContextView contextView = ctx.channel().attr(CONTEXT_VIEW).get();
if (contextView.hasKey(ObservationThreadLocalAccessor.KEY)) {
observation.parentObservation(contextView.get(ObservationThreadLocalAccessor.KEY));
}
}
observation.start();
ctx.pipeline().get(SslHandler.class)
.handshakeFuture()
.addListener(f -> {
Expand Down
Expand Up @@ -15,8 +15,6 @@
*/
package reactor.netty5.internal.util;

import static io.micrometer.core.instrument.Metrics.globalRegistry;

/**
* Utility class around instrumentation and metrics with Micrometer.
* <p><strong>Note:</strong> This utility class is for internal use only. It can be removed at any time.
Expand All @@ -26,27 +24,66 @@
*/
public class Metrics {

static final boolean isContextPropagationAvailable;
static final boolean isMicrometerAvailable;
static final boolean isTracingAvailable;

static {
boolean micrometer;
try {
globalRegistry.getRegistries();
io.micrometer.core.instrument.Metrics.globalRegistry.getRegistries();
micrometer = true;
}
catch (Throwable t) {
micrometer = false;
}
isMicrometerAvailable = micrometer;

boolean tracing;
try {
Class.forName("io.micrometer.tracing.Tracer");
tracing = true;
}
catch (Throwable t) {
tracing = false;
}
isTracingAvailable = tracing;

boolean contextPropagation;
try {
io.micrometer.context.ContextRegistry.getInstance();
contextPropagation = true;
}
catch (Throwable t) {
contextPropagation = false;
}
isContextPropagationAvailable = contextPropagation;
}

/**
* Check if the current runtime supports metrics / instrumentation, by
* verifying if Micrometer is on the classpath.
* Check if the current runtime supports context propagation, by verifying if Context Propagation is on the classpath.
*
* @return true if the Micrometer instrumentation facade is available
* @return true if the Micrometer is available
*/
public static boolean isInstrumentationAvailable() {
public static boolean isContextPropagationAvailable() {
return isContextPropagationAvailable;
}

/**
* Check if the current runtime supports metrics, by verifying if Micrometer Core is on the classpath.
*
* @return true if the Micrometer Core is available
*/
public static boolean isMicrometerAvailable() {
return isMicrometerAvailable;
}

/**
* Check if the current runtime supports tracing, by verifying if Micrometer Tracing is on the classpath.
*
* @return true if the Micrometer Tracing is available
*/
public static boolean isTracingAvailable() {
return isTracingAvailable;
}
}
Expand Up @@ -604,7 +604,7 @@ public final SPEC maxLifeTime(Duration maxLifeTime) {
*/
public final SPEC metrics(boolean metricsEnabled) {
if (metricsEnabled) {
if (!Metrics.isInstrumentationAvailable()) {
if (!Metrics.isMicrometerAvailable()) {
throw new UnsupportedOperationException(
"To enable metrics, you must add the dependency `io.micrometer:micrometer-core`" +
" to the class path first");
Expand Down
@@ -0,0 +1,36 @@
/*
* Copyright (c) 2022 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package reactor.netty5.resources;

import io.micrometer.context.ContextSnapshot;
import io.micrometer.observation.contextpropagation.ObservationThreadLocalAccessor;
import reactor.util.context.Context;
import reactor.util.context.ContextView;

import java.util.function.Function;
import java.util.function.Predicate;

final class ContextPropagationUtils {

static final Predicate<Object> OBSERVATION_KEY = k -> k == ObservationThreadLocalAccessor.KEY;

static Function<Context, Context> captureObservation(ContextView source) {
ContextSnapshot snapshot = ContextSnapshot.captureUsing(OBSERVATION_KEY, source);
return snapshot::updateContext;
}

private ContextPropagationUtils() {}
}
Expand Up @@ -24,7 +24,6 @@
import java.util.function.BiPredicate;
import java.util.function.Function;

import io.micrometer.context.ContextSnapshot;
import io.netty5.channel.Channel;
import io.netty5.channel.ChannelInitializer;
import io.netty5.channel.EventLoop;
Expand Down Expand Up @@ -80,7 +79,7 @@ protected CoreSubscriber<PooledRef<PooledConnection>> createDisposableAcquire(
long pendingAcquireTimeout,
InstrumentedPool<PooledConnection> pool,
MonoSink<Connection> sink,
ContextSnapshot snapshot) {
Function<Context, Context> snapshot) {
return new DisposableAcquire(connectionObserver, config.channelOperationsProvider(),
pendingAcquireTimeout, pool, sink, snapshot);
}
Expand Down Expand Up @@ -118,9 +117,9 @@ static final class DisposableAcquire
long pendingAcquireTimeout,
InstrumentedPool<PooledConnection> pool,
MonoSink<Connection> sink,
ContextSnapshot snapshot) {
Function<Context, Context> snapshot) {
this.cancellations = Disposables.composite();
this.currentContext = Context.of(snapshot.updateContext(sink.contextView()));
this.currentContext = snapshot.apply(Context.of(sink.contextView()));
this.obs = obs;
this.opsFactory = opsFactory;
this.pendingAcquireTimeout = pendingAcquireTimeout;
Expand Down Expand Up @@ -500,13 +499,12 @@ Publisher<PooledConnection> connectChannel() {
PooledConnectionInitializer initializer = new PooledConnectionInitializer(sink);
EventLoop callerEventLoop = sink.contextView().hasKey(CONTEXT_CALLER_EVENTLOOP) ?
sink.contextView().get(CONTEXT_CALLER_EVENTLOOP) : null;
ContextSnapshot snapshot = ContextSnapshot.capture(sink.contextView());
if (callerEventLoop != null) {
TransportConnector.connect(config, remoteAddress, resolver, initializer, callerEventLoop, snapshot)
TransportConnector.connect(config, remoteAddress, resolver, initializer, callerEventLoop, sink.contextView())
.subscribe(initializer);
}
else {
TransportConnector.connect(config, remoteAddress, resolver, initializer, snapshot).subscribe(initializer);
TransportConnector.connect(config, remoteAddress, resolver, initializer, sink.contextView()).subscribe(initializer);
}
});
}
Expand Down
Expand Up @@ -20,9 +20,9 @@
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Objects;
import java.util.function.Function;
import java.util.function.Supplier;

import io.micrometer.context.ContextSnapshot;
import io.netty5.channel.Channel;
import io.netty5.channel.ChannelInitializer;
import io.netty5.channel.unix.DomainSocketAddress;
Expand All @@ -36,6 +36,7 @@
import reactor.netty5.ChannelBindException;
import reactor.netty5.Connection;
import reactor.netty5.ConnectionObserver;
import reactor.netty5.internal.util.Metrics;
import reactor.netty5.transport.AddressUtils;
import reactor.netty5.transport.TransportConfig;
import reactor.netty5.transport.TransportConnector;
Expand Down Expand Up @@ -71,8 +72,10 @@ public Mono<? extends Connection> acquire(TransportConfig config,
DisposableConnect disposableConnect = new DisposableConnect(sink, config.bindAddress());
if (remote != null && resolverGroup != null) {
ChannelInitializer<Channel> channelInitializer = config.channelInitializer(connectionObserver, remote, false);
ContextSnapshot snapshot = ContextSnapshot.capture(sink.contextView());
TransportConnector.connect(config, remote, resolverGroup, channelInitializer, snapshot)
Function<Context, Context> snapshot =
Metrics.isMicrometerAvailable() && Metrics.isContextPropagationAvailable() ?
ContextPropagationUtils.captureObservation(sink.contextView()) : Function.identity();
TransportConnector.connect(config, remote, resolverGroup, channelInitializer, snapshot.apply(Context.of(sink.contextView())))
.subscribe(disposableConnect);
}
else {
Expand Down
Expand Up @@ -15,7 +15,6 @@
*/
package reactor.netty5.resources;

import io.micrometer.context.ContextSnapshot;
import io.netty5.channel.Channel;
import io.netty5.channel.EventLoop;
import io.netty5.channel.EventLoopGroup;
Expand Down Expand Up @@ -44,6 +43,7 @@
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.annotation.Nullable;
import reactor.util.context.Context;

import java.net.InetAddress;
import java.net.InetSocketAddress;
Expand Down Expand Up @@ -141,7 +141,7 @@ public final Mono<? extends Connection> acquire(
poolFactory.registrar.get().registerMetrics(name, id, remoteAddress,
new DelegatingConnectionPoolMetrics(newPool.metrics()));
}
else if (Metrics.isInstrumentationAvailable()) {
else if (Metrics.isMicrometerAvailable()) {
// work directly with the pool otherwise a weak reference is needed to ConnectionPoolMetrics
// we don't want to keep another map with weak references
registerDefaultMetrics(id, remoteAddress, newPool.metrics());
Expand All @@ -168,8 +168,10 @@ else if (Metrics.isInstrumentationAvailable()) {
if (eventLoop != null) {
mono = mono.contextWrite(ctx -> ctx.put(CONTEXT_CALLER_EVENTLOOP, eventLoop));
}
ContextSnapshot snapshot = ContextSnapshot.capture(sink.contextView());
mono.contextWrite(snapshot::updateContext)
Function<Context, Context> snapshot =
Metrics.isMicrometerAvailable() && Metrics.isContextPropagationAvailable() ?
ContextPropagationUtils.captureObservation(sink.contextView()) : Function.identity();
mono.contextWrite(snapshot)
.subscribe(createDisposableAcquire(config, connectionObserver,
poolFactory.pendingAcquireTimeout, pool, sink, snapshot));
});
Expand Down Expand Up @@ -258,7 +260,7 @@ protected abstract CoreSubscriber<PooledRef<T>> createDisposableAcquire(
long pendingAcquireTimeout,
InstrumentedPool<T> pool,
MonoSink<Connection> sink,
ContextSnapshot snapshot);
Function<Context, Context> snapshot);

protected abstract InstrumentedPool<T> createPool(
TransportConfig config,
Expand Down
Expand Up @@ -18,13 +18,15 @@
import io.micrometer.common.KeyValues;
import io.micrometer.core.instrument.Timer;
import io.micrometer.observation.Observation;
import io.micrometer.observation.contextpropagation.ObservationThreadLocalAccessor;
import io.netty5.resolver.AddressResolver;
import io.netty5.resolver.AddressResolverGroup;
import io.netty5.util.concurrent.EventExecutor;
import io.netty5.util.concurrent.Future;
import reactor.netty5.channel.MicrometerChannelMetricsRecorder;
import reactor.netty5.internal.util.MapUtils;
import reactor.netty5.observability.ReactorNettyHandlerContext;
import reactor.util.context.ContextView;

import java.net.SocketAddress;
import java.util.List;
Expand Down Expand Up @@ -114,8 +116,11 @@ static final class MicrometerDelegatingAddressResolver<T extends SocketAddress>
this.name = recorder.name();
}

@Override
Future<List<T>> resolveAllInternal(SocketAddress address, Supplier<Future<List<T>>> resolver) {
Future<List<T>> resolveAll(SocketAddress address, ContextView contextView) {
return resolveAllInternal(address, () -> resolver.resolveAll(address), contextView);
}

Future<List<T>> resolveAllInternal(SocketAddress address, Supplier<Future<List<T>>> resolver, ContextView contextView) {
// Cannot invoke the recorder anymore:
// 1. The recorder is one instance only, it is invoked for all name resolutions that can happen
// 2. The recorder does not have knowledge about name resolution lifecycle
Expand All @@ -124,7 +129,11 @@ Future<List<T>> resolveAllInternal(SocketAddress address, Supplier<Future<List<T
// Move the implementation from the recorder here
String remoteAddress = formatSocketAddress(address);
FutureHandlerContext handlerContext = new FutureHandlerContext((MicrometerChannelMetricsRecorder) recorder, remoteAddress);
Observation sample = Observation.start(name + ADDRESS_RESOLVER, handlerContext, OBSERVATION_REGISTRY);
Observation sample = Observation.createNotStarted(name + ADDRESS_RESOLVER, handlerContext, OBSERVATION_REGISTRY);
if (contextView.hasKey(ObservationThreadLocalAccessor.KEY)) {
sample.parentObservation(contextView.get(ObservationThreadLocalAccessor.KEY));
}
sample.start();
return resolver.get()
.addListener(future -> {
handlerContext.status = future.isSuccess() ? SUCCESS : ERROR;
Expand Down
Expand Up @@ -135,7 +135,7 @@ public T doOnChannelInit(ChannelPipelineConfigurer doOnChannelInit) {
*/
protected T metrics(boolean enable) {
if (enable) {
if (!Metrics.isInstrumentationAvailable()) {
if (!Metrics.isMicrometerAvailable()) {
throw new UnsupportedOperationException(
"To enable metrics, you must add the dependency `io.micrometer:micrometer-core`" +
" to the class path first");
Expand Down

0 comments on commit 47def9a

Please sign in to comment.