Skip to content

Commit

Permalink
Merge #2389 into 2.0.0-M2
Browse files Browse the repository at this point in the history
  • Loading branch information
violetagg committed Jul 20, 2022
2 parents 47def9a + e947e39 commit 22d798d
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 199 deletions.
4 changes: 2 additions & 2 deletions reactor-netty5-core/src/main/java/reactor/netty5/Metrics.java
Expand Up @@ -16,7 +16,7 @@
package reactor.netty5;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.observation.TimerObservationHandler;
import io.micrometer.core.instrument.observation.DefaultMeterObservationHandler;
import io.micrometer.observation.ObservationHandler;
import io.micrometer.observation.ObservationRegistry;
import reactor.netty5.observability.ReactorNettyTimerObservationHandler;
Expand All @@ -38,7 +38,7 @@ public class Metrics {
OBSERVATION_REGISTRY.observationConfig().observationHandler(
new ObservationHandler.FirstMatchingCompositeObservationHandler(
new ReactorNettyTimerObservationHandler(REGISTRY),
new TimerObservationHandler(REGISTRY)));
new DefaultMeterObservationHandler(REGISTRY)));
}


Expand Down
Expand Up @@ -19,9 +19,7 @@
import io.micrometer.core.instrument.Timer;
import io.micrometer.observation.Observation;
import io.micrometer.observation.contextpropagation.ObservationThreadLocalAccessor;
import io.micrometer.observation.transport.http.HttpClientRequest;
import io.micrometer.observation.transport.http.HttpClientResponse;
import io.micrometer.observation.transport.http.context.HttpClientContext;
import io.micrometer.observation.transport.RequestReplySenderContext;
import io.netty5.channel.Channel;
import io.netty5.handler.codec.http.HttpRequest;
import io.netty5.handler.codec.http.HttpResponse;
Expand All @@ -31,7 +29,7 @@

import java.net.SocketAddress;
import java.time.Duration;
import java.util.Collection;
import java.util.Objects;
import java.util.function.Function;

import static reactor.netty5.Metrics.OBSERVATION_REGISTRY;
Expand Down Expand Up @@ -104,7 +102,7 @@ protected void reset() {
protected void startRead(HttpResponse msg) {
super.startRead(msg);

responseTimeHandlerContext.setResponse(new ObservationHttpClientResponse(msg));
responseTimeHandlerContext.setResponse(msg);
}

// writing the request
Expand All @@ -113,88 +111,16 @@ protected void startRead(HttpResponse msg) {
protected void startWrite(HttpRequest msg, Channel channel, @Nullable ContextView contextView) {
super.startWrite(msg, channel, contextView);

HttpClientRequest httpClientRequest = new ObservationHttpClientRequest(msg, method, path);
responseTimeHandlerContext = new ResponseTimeHandlerContext(recorder, httpClientRequest, channel.remoteAddress());
responseTimeHandlerContext = new ResponseTimeHandlerContext(recorder, msg, path, channel.remoteAddress());
responseTimeObservation = Observation.createNotStarted(recorder.name() + RESPONSE_TIME, responseTimeHandlerContext, OBSERVATION_REGISTRY);
if (contextView != null && contextView.hasKey(ObservationThreadLocalAccessor.KEY)) {
responseTimeObservation.parentObservation(contextView.get(ObservationThreadLocalAccessor.KEY));
}
responseTimeObservation.start();
}

static final class ObservationHttpClientRequest implements HttpClientRequest {

final String method;
final HttpRequest nettyRequest;
final String path;

ObservationHttpClientRequest(HttpRequest nettyRequest, String method, String path) {
this.method = method;
this.nettyRequest = nettyRequest;
this.path = path;
}

@Override
public String header(String name) {
return nettyRequest.headers().get(name);
}

@Override
public void header(String name, String value) {
nettyRequest.headers().set(name, value);
}

@Override
public Collection<String> headerNames() {
return nettyRequest.headers().names();
}

@Override
public String method() {
return method;
}

@Override
public String path() {
return path;
}

@Override
public Object unwrap() {
return nettyRequest;
}

@Override
public String url() {
return nettyRequest.uri();
}
}

static final class ObservationHttpClientResponse implements HttpClientResponse {

final HttpResponse nettyResponse;

ObservationHttpClientResponse(HttpResponse nettyResponse) {
this.nettyResponse = nettyResponse;
}

@Override
public Collection<String> headerNames() {
return nettyResponse.headers().names();
}

@Override
public int statusCode() {
return nettyResponse.status().code();
}

@Override
public Object unwrap() {
return nettyResponse;
}
}

static final class ResponseTimeHandlerContext extends HttpClientContext implements ReactorNettyHandlerContext {
static final class ResponseTimeHandlerContext extends RequestReplySenderContext<HttpRequest, HttpResponse>
implements ReactorNettyHandlerContext {
static final String TYPE = "client";

final String method;
Expand All @@ -205,13 +131,15 @@ static final class ResponseTimeHandlerContext extends HttpClientContext implemen
// status might not be known beforehand
String status;

ResponseTimeHandlerContext(MicrometerHttpClientMetricsRecorder recorder, HttpClientRequest request, SocketAddress remoteAddress) {
super(request);
ResponseTimeHandlerContext(MicrometerHttpClientMetricsRecorder recorder, HttpRequest request, String path, SocketAddress remoteAddress) {
super((carrier, key, value) -> Objects.requireNonNull(carrier).headers().set(key, value));
this.recorder = recorder;
this.method = request.method();
this.path = request.path();
this.method = request.method().name();
this.path = path;
this.remoteAddress = formatSocketAddress(remoteAddress);
put(HttpClientRequest.class, request);
setCarrier(request);
setContextualName(this.method);
}

@Override
Expand All @@ -232,9 +160,9 @@ public KeyValues getLowCardinalityKeyValues() {
}

@Override
public HttpClientContext setResponse(HttpClientResponse response) {
public void setResponse(HttpResponse response) {
put(HttpClientResponse.class, response);
return super.setResponse(response);
super.setResponse(response);
}
}
}
Expand Up @@ -17,37 +17,39 @@

import io.micrometer.common.KeyValue;
import io.micrometer.observation.Observation;
import io.micrometer.observation.transport.http.HttpClientRequest;
import io.micrometer.observation.transport.http.context.HttpClientContext;
import io.micrometer.observation.transport.ReceiverContext;
import io.micrometer.tracing.Span;
import io.micrometer.tracing.Tracer;
import io.micrometer.tracing.handler.HttpClientTracingObservationHandler;
import io.micrometer.tracing.http.HttpClientHandler;
import io.micrometer.tracing.handler.PropagatingReceiverTracingObservationHandler;
import io.micrometer.tracing.propagation.Propagator;
import io.netty5.handler.codec.http.HttpRequest;
import reactor.netty5.http.client.HttpClientRequest;
import reactor.netty5.observability.ReactorNettyHandlerContext;

import static reactor.netty5.Metrics.REMOTE_ADDRESS;

/**
* Reactor Netty specific {@link HttpClientTracingObservationHandler}
* Reactor Netty specific {@link PropagatingReceiverTracingObservationHandler}.
*
* @author Marcin Grzejszczak
* @author Violeta Georgieva
* @since 1.1.0
*/
public final class ReactorNettyHttpClientTracingObservationHandler extends HttpClientTracingObservationHandler {
public final class ReactorNettyPropagatingReceiverTracingObservationHandler
extends PropagatingReceiverTracingObservationHandler<ReceiverContext<HttpRequest>> {

/**
* Creates a new instance of {@link HttpClientTracingObservationHandler}.
* Creates a new instance of {@link ReactorNettyPropagatingReceiverTracingObservationHandler}.
*
* @param tracer tracer
* @param handler http client handler
* @param tracer tracer
* @param propagator tracing propagator
*/
public ReactorNettyHttpClientTracingObservationHandler(Tracer tracer, HttpClientHandler handler) {
super(tracer, handler);
public ReactorNettyPropagatingReceiverTracingObservationHandler(Tracer tracer, Propagator propagator) {
super(tracer, propagator);
}

@Override
public void tagSpan(HttpClientContext context, Span span) {
public void tagSpan(ReceiverContext<HttpRequest> context, Span span) {
for (KeyValue tag : context.getHighCardinalityKeyValues()) {
span.tag(tag.getKey(), tag.getValue());
}
Expand Down
Expand Up @@ -17,35 +17,37 @@

import io.micrometer.common.KeyValue;
import io.micrometer.observation.Observation;
import io.micrometer.observation.transport.http.HttpServerRequest;
import io.micrometer.observation.transport.http.context.HttpServerContext;
import io.micrometer.observation.transport.SenderContext;
import io.micrometer.tracing.Span;
import io.micrometer.tracing.Tracer;
import io.micrometer.tracing.handler.HttpServerTracingObservationHandler;
import io.micrometer.tracing.http.HttpServerHandler;
import io.micrometer.tracing.handler.PropagatingSenderTracingObservationHandler;
import io.micrometer.tracing.propagation.Propagator;
import io.netty5.handler.codec.http.HttpRequest;
import reactor.netty5.http.server.HttpServerRequest;
import reactor.netty5.observability.ReactorNettyHandlerContext;

/**
* Reactor Netty specific {@link HttpServerTracingObservationHandler}
* Reactor Netty specific {@link PropagatingSenderTracingObservationHandler}
*
* @author Marcin Grzejszczak
* @author Violeta Georgieva
* @since 1.1.0
*/
public final class ReactorNettyHttpServerTracingObservationHandler extends HttpServerTracingObservationHandler {
public final class ReactorNettyPropagatingSenderTracingObservationHandler
extends PropagatingSenderTracingObservationHandler<SenderContext<HttpRequest>> {

/**
* Creates a new instance of {@link HttpServerTracingObservationHandler}.
* Creates a new instance of {@link ReactorNettyPropagatingSenderTracingObservationHandler}.
*
* @param tracer tracer
* @param handler http server handler
* @param tracer tracer
* @param propagator tracing propagator
*/
public ReactorNettyHttpServerTracingObservationHandler(Tracer tracer, HttpServerHandler handler) {
super(tracer, handler);
public ReactorNettyPropagatingSenderTracingObservationHandler(Tracer tracer, Propagator propagator) {
super(tracer, propagator);
}

@Override
public void tagSpan(HttpServerContext context, Span span) {
public void tagSpan(SenderContext<HttpRequest> context, Span span) {
for (KeyValue tag : context.getHighCardinalityKeyValues()) {
span.tag(tag.getKey(), tag.getValue());
}
Expand Down

0 comments on commit 22d798d

Please sign in to comment.