Skip to content

Commit

Permalink
Remove Micrometer#useRegistry, remove notion of Clock (#3128)
Browse files Browse the repository at this point in the history
This commit reworks the `reactor-core-micrometer` `Micrometer` API to
simplify things a bit and make the use of MeterRegistry more explicit:
 - remove `useRegistry` and the notion of a default global registry
 - remove `metrics()` and `metrics(MeterRegistry, Clock)`
 - replace the above by `metrics(MeterRegistry)`

The `Clock` parameter was a bit redundant, as it can always be provided
through the `MeterRegistry#config()`. For example:

```
new SimpleMeterRegistry(SimpleConfig.DEFAULT, myCustomClock);
```

This is a breaking change compared to previous milestones only.
  • Loading branch information
simonbasle committed Jul 27, 2022
1 parent d4bb1f8 commit 687fdf8
Show file tree
Hide file tree
Showing 8 changed files with 34 additions and 190 deletions.
Expand Up @@ -35,63 +35,16 @@
public final class Micrometer {

private static final String SCHEDULERS_DECORATOR_KEY = "reactor.core.observability.micrometer.schedulerDecorator";
private static MeterRegistry registry = Metrics.globalRegistry;

/**
* The default "name" to use as a prefix for meter if the instrumented sequence doesn't define a {@link reactor.core.publisher.Flux#name(String) name}.
*/
public static final String DEFAULT_METER_PREFIX = "reactor";

/**
* Set the registry to use in reactor-core-micrometer for metrics related purposes.
* @return the previously configured registry.
* @deprecated in M4, will be removed in M5 / RC1. prefer your own singleton and explicitly
* passing the registry to {@link #metrics(MeterRegistry, Clock)}
*/
@Deprecated
public static MeterRegistry useRegistry(MeterRegistry newRegistry) {
MeterRegistry previous = registry;
registry = newRegistry;
return previous;
}

/**
* Get the registry used in reactor-core-micrometer for metrics related purposes.
*
* @deprecated in M4, will be removed in M5 / RC1. prefer your own singleton and explicitly
* passing the registry to {@link #metrics(MeterRegistry, Clock)}
*/
@Deprecated
public static MeterRegistry getRegistry() {
return registry;
}

/**
* A {@link SignalListener} factory that will ultimately produce Micrometer metrics
* to the configured default {@link #getRegistry() registry}.
* To be used with either the {@link reactor.core.publisher.Flux#tap(SignalListenerFactory)} or
* {@link reactor.core.publisher.Mono#tap(SignalListenerFactory)} operator.
* <p>
* When used in a {@link reactor.core.publisher.Flux#tap(SignalListenerFactory)} operator, meter names use
* the {@link reactor.core.publisher.Flux#name(String)} set upstream of the tap as id prefix if applicable
* or default to {@link #DEFAULT_METER_PREFIX}. Similarly, upstream tags are gathered and added
* to the default set of tags for meters.
* <p>
* Note that some monitoring systems like Prometheus require to have the exact same set of
* tags for each meter bearing the same name.
*
* @param <T> the type of onNext in the target publisher
* @return a {@link SignalListenerFactory} to record metrics
* @deprecated in M4, will be removed in M5 / RC1. prefer explicitly passing a registry via {@link #metrics(MeterRegistry, Clock)}
*/
@Deprecated
public static <T> SignalListenerFactory<T, ?> metrics() {
return new MicrometerMeterListenerFactory<>();
}

/**
* A {@link SignalListener} factory that will ultimately produce Micrometer metrics
* to the provided {@link MeterRegistry} using the provided {@link Clock} for timings.
* to the provided {@link MeterRegistry} (and using the registry's {@link MeterRegistry.Config#clock() configured}
* {@link Clock} in case additional timings are needed).
* To be used with either the {@link reactor.core.publisher.Flux#tap(SignalListenerFactory)} or
* {@link reactor.core.publisher.Mono#tap(SignalListenerFactory)} operator.
* <p>
Expand All @@ -104,20 +57,11 @@ public static MeterRegistry getRegistry() {
* tags for each meter bearing the same name.
*
* @param <T> the type of onNext in the target publisher
* @param meterRegistry the {@link MeterRegistry} in which to register and publish metrics
* @return a {@link SignalListenerFactory} to record metrics
*/
public static <T> SignalListenerFactory<T, ?> metrics(MeterRegistry registry, Clock clock) {
return new MicrometerMeterListenerFactory<T>() {
@Override
protected Clock useClock() {
return clock;
}

@Override
protected MeterRegistry useRegistry() {
return registry;
}
};
public static <T> SignalListenerFactory<T, ?> metrics(MeterRegistry meterRegistry) {
return new MicrometerMeterListenerFactory<T>(meterRegistry);
}

/**
Expand Down
Expand Up @@ -126,7 +126,7 @@ public void doOnNext(T t) {
}
//record the delay since previous onNext/onSubscribe. This also records the count.
long last = this.lastNextEventNanos;
this.lastNextEventNanos = configuration.clock.monotonicTime();
this.lastNextEventNanos = configuration.registry.config().clock().monotonicTime();
this.onNextIntervalTimer.record(lastNextEventNanos - last, TimeUnit.NANOSECONDS);
}

Expand All @@ -138,8 +138,8 @@ public void doOnMalformedOnNext(T value) {
@Override
public void doOnSubscription() {
recordOnSubscribe(configuration.sequenceName, configuration.commonTags, configuration.registry);
this.subscribeToTerminateSample = Timer.start(configuration.clock);
this.lastNextEventNanos = configuration.clock.monotonicTime();
this.subscribeToTerminateSample = Timer.start(configuration.registry);
this.lastNextEventNanos = configuration.registry.config().clock().monotonicTime();
}

@Override
Expand Down
Expand Up @@ -41,20 +41,20 @@ final class MicrometerMeterListenerConfiguration {

private static final Logger LOGGER = Loggers.getLogger(MicrometerMeterListenerConfiguration.class);

static MicrometerMeterListenerConfiguration fromFlux(Flux<?> source, MeterRegistry meterRegistry, Clock clock) {
static MicrometerMeterListenerConfiguration fromFlux(Flux<?> source, MeterRegistry meterRegistry) {
Tags defaultTags = MicrometerMeterListener.DEFAULT_TAGS_FLUX;
final String name = resolveName(source, LOGGER, Micrometer.DEFAULT_METER_PREFIX);
final Tags tags = resolveTags(source, defaultTags);

return new MicrometerMeterListenerConfiguration(name, tags, meterRegistry, clock, false);
return new MicrometerMeterListenerConfiguration(name, tags, meterRegistry, false);
}

static MicrometerMeterListenerConfiguration fromMono(Mono<?> source, MeterRegistry meterRegistry, Clock clock) {
static MicrometerMeterListenerConfiguration fromMono(Mono<?> source, MeterRegistry meterRegistry) {
Tags defaultTags = MicrometerMeterListener.DEFAULT_TAGS_MONO;
final String name = resolveName(source, LOGGER, Micrometer.DEFAULT_METER_PREFIX);
final Tags tags = resolveTags(source, defaultTags);

return new MicrometerMeterListenerConfiguration(name, tags, meterRegistry, clock, true);
return new MicrometerMeterListenerConfiguration(name, tags, meterRegistry, true);
}

/**
Expand Down Expand Up @@ -103,7 +103,6 @@ static Tags resolveTags(Publisher<?> source, Tags tags) {
return tags;
}

final Clock clock;
final Tags commonTags;
final boolean isMono;
final String sequenceName;
Expand All @@ -112,9 +111,7 @@ static Tags resolveTags(Publisher<?> source, Tags tags) {
// separator is the dot, not camelCase...
final MeterRegistry registry;

MicrometerMeterListenerConfiguration(String sequenceName, Tags tags, MeterRegistry registryCandidate, Clock clock,
boolean isMono) {
this.clock = clock;
MicrometerMeterListenerConfiguration(String sequenceName, Tags tags, MeterRegistry registryCandidate, boolean isMono) {
this.commonTags = tags;
this.isMono = isMono;
this.sequenceName = sequenceName;
Expand Down
Expand Up @@ -17,6 +17,7 @@
package reactor.core.observability.micrometer;

import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.Meter;
import io.micrometer.core.instrument.MeterRegistry;
import org.reactivestreams.Publisher;

Expand All @@ -33,22 +34,19 @@
*/
class MicrometerMeterListenerFactory<T> implements SignalListenerFactory<T, MicrometerMeterListenerConfiguration> {

protected Clock useClock() {
return Clock.SYSTEM;
}
final MeterRegistry registry;

@SuppressWarnings("deprecation")
protected MeterRegistry useRegistry() {
return Micrometer.getRegistry();
MicrometerMeterListenerFactory(MeterRegistry registry) {
this.registry = registry;
}

@Override
public MicrometerMeterListenerConfiguration initializePublisherState(Publisher<? extends T> source) {
if (source instanceof Mono) {
return MicrometerMeterListenerConfiguration.fromMono((Mono<?>) source, useRegistry(), useClock());
return MicrometerMeterListenerConfiguration.fromMono((Mono<?>) source, this.registry);
}
else if (source instanceof Flux) {
return MicrometerMeterListenerConfiguration.fromFlux((Flux<?>) source, useRegistry(), useClock());
return MicrometerMeterListenerConfiguration.fromFlux((Flux<?>) source, this.registry);
}
else {
throw new IllegalArgumentException("MicrometerMeterListenerFactory must only be used via the tap operator / with a Flux or Mono");
Expand Down
Expand Up @@ -58,9 +58,9 @@ void fromFlux(@Nullable String name, @Nullable String tag) {
flux = flux.tag("tag", tag);
}

MicrometerMeterListenerConfiguration configuration = MicrometerMeterListenerConfiguration.fromFlux(flux, expectedRegistry, expectedClock);
MicrometerMeterListenerConfiguration configuration = MicrometerMeterListenerConfiguration.fromFlux(flux, expectedRegistry);

assertThat(configuration.clock).as("clock").isSameAs(expectedClock);
assertThat(configuration.registry.config().clock()).as("clock").isSameAs(expectedClock);
assertThat(configuration.registry).as("registry").isSameAs(expectedRegistry);
assertThat(configuration.isMono).as("isMono").isFalse();

Expand Down Expand Up @@ -100,9 +100,9 @@ void fromMono(@Nullable String name, @Nullable String tag) {
mono = mono.tag("tag", tag);
}

MicrometerMeterListenerConfiguration configuration = MicrometerMeterListenerConfiguration.fromMono(mono, expectedRegistry, expectedClock);
MicrometerMeterListenerConfiguration configuration = MicrometerMeterListenerConfiguration.fromMono(mono, expectedRegistry);

assertThat(configuration.clock).as("clock").isSameAs(expectedClock);
assertThat(configuration.registry.config().clock()).as("clock").isSameAs(expectedClock);
assertThat(configuration.registry).as("registry").isSameAs(expectedRegistry);
assertThat(configuration.isMono).as("isMono").isTrue();

Expand Down
Expand Up @@ -17,7 +17,7 @@
package reactor.core.observability.micrometer;

import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.simple.SimpleConfig;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import org.junit.jupiter.api.Test;
import org.reactivestreams.Publisher;
Expand All @@ -36,34 +36,11 @@
*/
class MicrometerMeterListenerFactoryTest {

@Test
void useClockDefaultsToSystemClock() {
MicrometerMeterListenerFactory<?> factory = new MicrometerMeterListenerFactory<>();

assertThat(factory.useClock()).isSameAs(Clock.SYSTEM);
}

@Test
void useRegistryDefaultsToCommonRegistry() {
SimpleMeterRegistry commonRegistry = new SimpleMeterRegistry();
MeterRegistry defaultCommon = Micrometer.useRegistry(commonRegistry);
try {
MicrometerMeterListenerFactory<?> factory = new MicrometerMeterListenerFactory<>();

assertThat(factory.useRegistry()).isSameAs(Micrometer.getRegistry())
.isSameAs(commonRegistry);
}
finally {
Micrometer.useRegistry(defaultCommon);
}
}

@Test
void configurationFromMono() {
MicrometerMeterListenerConfiguration configuration = CUSTOM_FACTORY.initializePublisherState(Mono.just(1));

assertThat(configuration.registry).as("registry").isSameAs(CUSTOM_REGISTRY);
assertThat(configuration.clock).as("clock").isSameAs(CUSTOM_CLOCK);
assertThat(configuration.isMono).as("isMono").isTrue();
assertThat(configuration.commonTags).map(Object::toString).containsExactly("tag(type=Mono)");
}
Expand All @@ -73,7 +50,6 @@ void configurationFromFlux() {
MicrometerMeterListenerConfiguration configuration = CUSTOM_FACTORY.initializePublisherState(Flux.just(1, 2));

assertThat(configuration.registry).as("registry").isSameAs(CUSTOM_REGISTRY);
assertThat(configuration.clock).as("clock").isSameAs(CUSTOM_CLOCK);
assertThat(configuration.isMono).as("isMono").isFalse();
assertThat(configuration.commonTags).map(Object::toString).containsExactly("tag(type=Flux)");
}
Expand Down Expand Up @@ -106,17 +82,7 @@ public long monotonicTime() {
return 0;
}
};
protected static final SimpleMeterRegistry CUSTOM_REGISTRY = new SimpleMeterRegistry();
protected static final SimpleMeterRegistry CUSTOM_REGISTRY = new SimpleMeterRegistry(SimpleConfig.DEFAULT, CUSTOM_CLOCK);
protected static final MicrometerMeterListenerFactory<Object>
CUSTOM_FACTORY = new MicrometerMeterListenerFactory<Object>() {
@Override
protected Clock useClock() {
return CUSTOM_CLOCK;
}

@Override
protected MeterRegistry useRegistry() {
return CUSTOM_REGISTRY;
}
};
CUSTOM_FACTORY = new MicrometerMeterListenerFactory<Object>(CUSTOM_REGISTRY);
}
Expand Up @@ -23,6 +23,7 @@
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.simple.SimpleConfig;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand All @@ -42,7 +43,6 @@ class MicrometerMeterListenerTest {

@BeforeEach
void initRegistry() {
registry = new SimpleMeterRegistry();
virtualClockTime = new AtomicLong();
virtualClock = new Clock() {
@Override
Expand All @@ -55,11 +55,11 @@ public long monotonicTime() {
return virtualClockTime.get();
}
};
registry = new SimpleMeterRegistry(SimpleConfig.DEFAULT, virtualClock);
configuration = new MicrometerMeterListenerConfiguration(
"testName",
Tags.of("testTag1", "testTagValue1","testTag2", "testTagValue2"),
registry,
virtualClock,
false);
}

Expand All @@ -69,7 +69,6 @@ void initialStateFluxWithDefaultName() {
Micrometer.DEFAULT_METER_PREFIX,
Tags.of("testTag1", "testTagValue1","testTag2", "testTagValue2"),
registry,
virtualClock,
false);

MicrometerMeterListener<Integer> listener = new MicrometerMeterListener<>(configuration);
Expand All @@ -95,7 +94,6 @@ void initialStateFluxWithCustomName() {
"testName",
Tags.of("testTag1", "testTagValue1","testTag2", "testTagValue2"),
registry,
virtualClock,
false);

MicrometerMeterListener<Integer> listener = new MicrometerMeterListener<>(configuration);
Expand Down Expand Up @@ -126,7 +124,6 @@ void initialStateMono() {
Micrometer.DEFAULT_METER_PREFIX,
Tags.of("testTag1", "testTagValue1","testTag2", "testTagValue2"),
registry,
virtualClock,
true);

MicrometerMeterListener<Integer> listener = new MicrometerMeterListener<>(configuration);
Expand Down Expand Up @@ -284,7 +281,7 @@ void doOnNextRecordsInterval() {
@Test
void doOnNextRecordsInterval_defaultName() {
configuration = new MicrometerMeterListenerConfiguration(Micrometer.DEFAULT_METER_PREFIX, Tags.empty(),
registry, virtualClock, false);
registry, false);
MicrometerMeterListener<Integer> listener = new MicrometerMeterListener<>(configuration);
listener.doOnSubscription();

Expand All @@ -309,7 +306,7 @@ void doOnNextRecordsInterval_defaultName() {
@Test
void doOnNext_monoRecordsCompletionOnly() {
configuration = new MicrometerMeterListenerConfiguration("testName", Tags.empty(),
registry, virtualClock, true);
registry, true);
MicrometerMeterListener<Integer> listener = new MicrometerMeterListener<>(configuration);

listener.doOnSubscription();
Expand Down Expand Up @@ -369,15 +366,15 @@ void doOnRequestRecordsTotalDemand() {

@Test
void doOnRequestMonoIgnoresRequest() {
configuration = new MicrometerMeterListenerConfiguration("testName", Tags.empty(), registry, virtualClock, true);
configuration = new MicrometerMeterListenerConfiguration("testName", Tags.empty(), registry, true);
MicrometerMeterListener<Integer> listener = new MicrometerMeterListener<>(configuration);
assertThatCode(() -> listener.doOnRequest(100L)).doesNotThrowAnyException();
assertThat(listener.requestedCounter).isNull();
}

@Test
void doOnRequestDefaultNameIgnoresRequest() {
configuration = new MicrometerMeterListenerConfiguration(Micrometer.DEFAULT_METER_PREFIX, Tags.empty(), registry, virtualClock, false);
configuration = new MicrometerMeterListenerConfiguration(Micrometer.DEFAULT_METER_PREFIX, Tags.empty(), registry, false);
MicrometerMeterListener<Integer> listener = new MicrometerMeterListener<>(configuration);
assertThatCode(() -> listener.doOnRequest(100L)).doesNotThrowAnyException();
assertThat(listener.requestedCounter).isNull();
Expand Down

0 comments on commit 687fdf8

Please sign in to comment.