Skip to content

Commit

Permalink
Micrometer#observation(): avoids scopes, set parentObservation (#3119)
Browse files Browse the repository at this point in the history
This commit removes context-propagation dependency entirely from
the reactor-core-micrometer module.

It also changes the MicrometerObservationListener so that it doesn't
use any Scope at all.

To deal with hierarchical Observations, we instead do the following:
 - read Observation from Context
 - don't open a scope, but rather start new Observation setting old one
  as parent explicitly
 - store the new Observation in the ContextView
  • Loading branch information
simonbasle committed Jul 22, 2022
1 parent 5e9b084 commit f47fc56
Show file tree
Hide file tree
Showing 5 changed files with 224 additions and 45 deletions.
10 changes: 7 additions & 3 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ bytebuddy = "1.12.12"
jmh = "1.35"
junit = "5.8.2"
#note that context-propagation has a different version directly set in libraries
micrometer = "1.10.0-M3"
micrometer = "1.10.0-SNAPSHOT" # was -M3
reactiveStreams = "1.0.4"

[libraries]
Expand All @@ -26,14 +26,18 @@ jmh-annotations-processor = { module = "org.openjdk.jmh:jmh-generator-annprocess
jmh-core = { module = "org.openjdk.jmh:jmh-core", version.ref = "jmh" }
jsr166backport = "io.projectreactor:jsr166:1.0.0.RELEASE"
jsr305 = "com.google.code.findbugs:jsr305:3.0.1"
junit-api = { module = "org.junit.jupiter:junit-jupiter-api" }
junit-bom = { module = "org.junit:junit-bom", version.ref = "junit" }
junit-engine = { module = "org.junit.jupiter:junit-jupiter-engine" }
junit-params = { module = "org.junit.jupiter:junit-jupiter-params" }
junit-platform = { module = "org.junit.platform:junit-platform-launcher" }
logback = "ch.qos.logback:logback-classic:1.2.11"
micrometer-bom = { module = "io.micrometer:micrometer-bom", version.ref = "micrometer" }
micrometer-commons = { module = "io.micrometer:micrometer-commons" }
micrometer-core = { module = "io.micrometer:micrometer-core" }
micrometer-contextPropagation = "io.micrometer:context-propagation:1.0.0-M3"
micrometer-contextPropagation = "io.micrometer:context-propagation:1.0.0-SNAPSHOT" # was -M3
micrometer-observation-test = { module = "io.micrometer:micrometer-observation-test" }
micrometer-tracing-test = "io.micrometer:micrometer-tracing-integration-test:1.0.0-M6"
micrometer-tracing-test = "io.micrometer:micrometer-tracing-integration-test:1.0.0-SNAPSHOT" # was -M6
micrometer-test = { module = "io.micrometer:micrometer-test" }
mockito = "org.mockito:mockito-core:4.6.1"
reactiveStreams = { module = "org.reactivestreams:reactive-streams", version.ref = "reactiveStreams" }
Expand Down
6 changes: 4 additions & 2 deletions reactor-core-micrometer/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ dependencies {

implementation platform(libs.micrometer.bom)
api libs.micrometer.core
implementation libs.micrometer.contextPropagation

testImplementation platform(libs.junit.bom)
testImplementation "org.junit.jupiter:junit-jupiter-api"
Expand All @@ -58,7 +57,10 @@ dependencies {
testImplementation libs.micrometer.core
testImplementation libs.micrometer.test
testImplementation libs.micrometer.observation.test
testImplementation libs.micrometer.tracing.test
testImplementation libs.micrometer.contextPropagation
testImplementation(libs.micrometer.tracing.test) { //brings in context-propagation
exclude group: "io.micrometer", module: "context-propagation"
}

testImplementation(project(":reactor-test")) {
exclude module: 'reactor-core'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package reactor.core.observability.micrometer;

import io.micrometer.context.ContextSnapshot;
import io.micrometer.observation.Observation;

import reactor.core.observability.SignalListener;
Expand Down Expand Up @@ -47,20 +46,27 @@ final class MicrometerObservationListener<T> implements SignalListener<T> {
static final String STATUS_COMPLETED_EMPTY = MicrometerMeterListener.TAG_STATUS_COMPLETED_EMPTY;
static final String STATUS_ERROR = MicrometerMeterListener.TAG_STATUS_ERROR;

/**
* The key to use to store {@link Observation} in context (same as the one from {@code ObservationThreadLocalAccessor}).
*
* @implNote this might be redundant, but we got {@code com.sun.tools.javac.code.Symbol$CompletionFailure: class file for io.micrometer.context.ThreadLocalAccessor not found}
* in reactor-netty while compiling a similar arrangement. A unit test in MicrometerTest acts as a smoke test in case
* micrometer-observation's {@code ObservationThreadLocalAccessor.KEY} changes to something else.
*/
static final String CONTEXT_KEY_OBSERVATION = "micrometer.observation";

/**
* A value for the status tag, to be used when a Mono completes from onNext.
* In production, this is set to {@link #STATUS_COMPLETED}.
* In some tests, this can be overridden as a way to assert {@link #doOnComplete()} is no-op.
*/
final String completedOnNextStatus;
final MicrometerObservationListenerConfiguration configuration;
final ContextView originalContext;
final Observation subscribeToTerminalObservation;
final ContextView originalContext;
final Observation tapObservation;

@Nullable
Context contextWithScope;
@Nullable
Observation.Scope scope = null;
Context contextWithObservation;

boolean valued;

Expand All @@ -78,7 +84,7 @@ final class MicrometerObservationListener<T> implements SignalListener<T> {

//creation of the listener matches subscription (Publisher.subscribe(Subscriber) / doFirst)
//while doOnSubscription matches the moment where the Publisher acknowledges said subscription
subscribeToTerminalObservation = Observation.createNotStarted(
tapObservation = Observation.createNotStarted(
configuration.sequenceName,
configuration.registry
)
Expand All @@ -88,47 +94,68 @@ final class MicrometerObservationListener<T> implements SignalListener<T> {

@Override
public void doFirst() {
ContextSnapshot contextSnapshot = ContextSnapshot.capture(this.originalContext);

try (ContextSnapshot.Scope ignored = contextSnapshot.setThreadLocalValues()) {
this.scope = this.subscribeToTerminalObservation
.start()
.openScope();
//reacquire the scope from ThreadLocal
//tap context hasn't been initialized yet, so addToContext can now use the Scope
ContextSnapshot contextSnapshot2 = ContextSnapshot.capture(this.originalContext);
this.contextWithScope = contextSnapshot2.updateContext(Context.of(this.originalContext));
/* Implementation note on using parentObservation vs openScope:
Opening a Scope is never necessary in this tap listener, because the Observation we create is stored in
the Context the tap operator will expose to upstream, rather than via ThreadLocal population.
We also make a best-effort attempt to discover such an Observation in the context here in doFirst, so that this
can explicitly be used as the parentObservation. At this point, if none is found we take also the opportunity
of checking if the registry has a currentObservation.
As a consequence, fanout (eg. with a `flatMap`) upstream of the tap should be able to see the current Observation
in the context and the inner publishers should inherit it as their parent observation if they also use `tap(Micrometer.observation())`.
Note that Reactor's threading model doesn't generally guarantee that doFirst and doOnNext/doOnComplete/doOnError run
in the same thread, and that's the main reason why Scopes are avoided here (as their sole purpose is to set up
Thread Local variables).
*/

Observation o;
Observation p;
if (this.originalContext.hasKey(CONTEXT_KEY_OBSERVATION)) {
p = this.originalContext.get(CONTEXT_KEY_OBSERVATION);
}
else {
p = this.configuration.registry.getCurrentObservation();
}

if (p != null) {
o = this.tapObservation
.parentObservation(p)
.start();
}
else {
o = this.tapObservation.start();
}
this.contextWithObservation = Context.of(this.originalContext)
.put(CONTEXT_KEY_OBSERVATION, o);
}

@Override
public Context addToContext(Context originalContext) {
if (this.originalContext != originalContext) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("addToContext call on Observation {} with unexpected originalContext {}",
this.subscribeToTerminalObservation, originalContext);
this.tapObservation, originalContext);
}
return originalContext;
}
if (this.contextWithScope == null) {
if (this.contextWithObservation == null) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("addToContext call on Observation {} before contextWithScope is set",
this.subscribeToTerminalObservation);
this.tapObservation);
}
return originalContext;
}
return contextWithScope;
return contextWithObservation;
}

@Override
public void doOnCancel() {
Observation observation = subscribeToTerminalObservation
Observation observation = tapObservation
.lowCardinalityKeyValue(KEY_STATUS, STATUS_CANCELLED);

observation.stop();
if (scope != null) {
scope.close();
}
}

@Override
Expand All @@ -144,40 +171,31 @@ else if (!configuration.isMono) {

// if status == null, recording with OnComplete tag is done directly in onNext for the Mono(valued) case
if (status != null) {
Observation completeObservation = subscribeToTerminalObservation
Observation completeObservation = tapObservation
.lowCardinalityKeyValue(KEY_STATUS, status);

completeObservation.stop();
if (scope != null) {
scope.close();
}
}
}

@Override
public void doOnError(Throwable e) {
Observation errorObservation = subscribeToTerminalObservation
Observation errorObservation = tapObservation
.lowCardinalityKeyValue(KEY_STATUS, STATUS_ERROR)
.error(e);

errorObservation.stop();
if (scope != null) {
scope.close();
}
}

@Override
public void doOnNext(T t) {
valued = true;
if (configuration.isMono) {
//record valued completion directly
Observation completeObservation = subscribeToTerminalObservation
Observation completeObservation = tapObservation
.lowCardinalityKeyValue(KEY_STATUS, completedOnNextStatus);

completeObservation.stop();
if (scope != null) {
scope.close();
}
}
}

Expand Down

0 comments on commit f47fc56

Please sign in to comment.