Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove Micrometer#useRegistry, remove notion of Clock #3128

Merged
merged 1 commit into from Jul 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -35,63 +35,16 @@
public final class Micrometer {

private static final String SCHEDULERS_DECORATOR_KEY = "reactor.core.observability.micrometer.schedulerDecorator";
private static MeterRegistry registry = Metrics.globalRegistry;

/**
* The default "name" to use as a prefix for meter if the instrumented sequence doesn't define a {@link reactor.core.publisher.Flux#name(String) name}.
*/
public static final String DEFAULT_METER_PREFIX = "reactor";

/**
* Set the registry to use in reactor-core-micrometer for metrics related purposes.
* @return the previously configured registry.
* @deprecated in M4, will be removed in M5 / RC1. prefer your own singleton and explicitly
* passing the registry to {@link #metrics(MeterRegistry, Clock)}
*/
@Deprecated
public static MeterRegistry useRegistry(MeterRegistry newRegistry) {
MeterRegistry previous = registry;
registry = newRegistry;
return previous;
}

/**
* Get the registry used in reactor-core-micrometer for metrics related purposes.
*
* @deprecated in M4, will be removed in M5 / RC1. prefer your own singleton and explicitly
* passing the registry to {@link #metrics(MeterRegistry, Clock)}
*/
@Deprecated
public static MeterRegistry getRegistry() {
return registry;
}

/**
* A {@link SignalListener} factory that will ultimately produce Micrometer metrics
* to the configured default {@link #getRegistry() registry}.
* To be used with either the {@link reactor.core.publisher.Flux#tap(SignalListenerFactory)} or
* {@link reactor.core.publisher.Mono#tap(SignalListenerFactory)} operator.
* <p>
* When used in a {@link reactor.core.publisher.Flux#tap(SignalListenerFactory)} operator, meter names use
* the {@link reactor.core.publisher.Flux#name(String)} set upstream of the tap as id prefix if applicable
* or default to {@link #DEFAULT_METER_PREFIX}. Similarly, upstream tags are gathered and added
* to the default set of tags for meters.
* <p>
* Note that some monitoring systems like Prometheus require to have the exact same set of
* tags for each meter bearing the same name.
*
* @param <T> the type of onNext in the target publisher
* @return a {@link SignalListenerFactory} to record metrics
* @deprecated in M4, will be removed in M5 / RC1. prefer explicitly passing a registry via {@link #metrics(MeterRegistry, Clock)}
*/
@Deprecated
public static <T> SignalListenerFactory<T, ?> metrics() {
return new MicrometerMeterListenerFactory<>();
}

/**
* A {@link SignalListener} factory that will ultimately produce Micrometer metrics
* to the provided {@link MeterRegistry} using the provided {@link Clock} for timings.
* to the provided {@link MeterRegistry} (and using the registry's {@link MeterRegistry.Config#clock() configured}
* {@link Clock} in case additional timings are needed).
* To be used with either the {@link reactor.core.publisher.Flux#tap(SignalListenerFactory)} or
* {@link reactor.core.publisher.Mono#tap(SignalListenerFactory)} operator.
* <p>
Expand All @@ -104,20 +57,11 @@ public static MeterRegistry getRegistry() {
* tags for each meter bearing the same name.
*
* @param <T> the type of onNext in the target publisher
* @param meterRegistry the {@link MeterRegistry} in which to register and publish metrics
* @return a {@link SignalListenerFactory} to record metrics
*/
public static <T> SignalListenerFactory<T, ?> metrics(MeterRegistry registry, Clock clock) {
return new MicrometerMeterListenerFactory<T>() {
@Override
protected Clock useClock() {
return clock;
}

@Override
protected MeterRegistry useRegistry() {
return registry;
}
};
public static <T> SignalListenerFactory<T, ?> metrics(MeterRegistry meterRegistry) {
return new MicrometerMeterListenerFactory<T>(meterRegistry);
}

/**
Expand Down
Expand Up @@ -126,7 +126,7 @@ public void doOnNext(T t) {
}
//record the delay since previous onNext/onSubscribe. This also records the count.
long last = this.lastNextEventNanos;
this.lastNextEventNanos = configuration.clock.monotonicTime();
this.lastNextEventNanos = configuration.registry.config().clock().monotonicTime();
this.onNextIntervalTimer.record(lastNextEventNanos - last, TimeUnit.NANOSECONDS);
}

Expand All @@ -138,8 +138,8 @@ public void doOnMalformedOnNext(T value) {
@Override
public void doOnSubscription() {
recordOnSubscribe(configuration.sequenceName, configuration.commonTags, configuration.registry);
this.subscribeToTerminateSample = Timer.start(configuration.clock);
this.lastNextEventNanos = configuration.clock.monotonicTime();
this.subscribeToTerminateSample = Timer.start(configuration.registry);
this.lastNextEventNanos = configuration.registry.config().clock().monotonicTime();
}

@Override
Expand Down
Expand Up @@ -41,20 +41,20 @@ final class MicrometerMeterListenerConfiguration {

private static final Logger LOGGER = Loggers.getLogger(MicrometerMeterListenerConfiguration.class);

static MicrometerMeterListenerConfiguration fromFlux(Flux<?> source, MeterRegistry meterRegistry, Clock clock) {
static MicrometerMeterListenerConfiguration fromFlux(Flux<?> source, MeterRegistry meterRegistry) {
Tags defaultTags = MicrometerMeterListener.DEFAULT_TAGS_FLUX;
final String name = resolveName(source, LOGGER, Micrometer.DEFAULT_METER_PREFIX);
final Tags tags = resolveTags(source, defaultTags);

return new MicrometerMeterListenerConfiguration(name, tags, meterRegistry, clock, false);
return new MicrometerMeterListenerConfiguration(name, tags, meterRegistry, false);
}

static MicrometerMeterListenerConfiguration fromMono(Mono<?> source, MeterRegistry meterRegistry, Clock clock) {
static MicrometerMeterListenerConfiguration fromMono(Mono<?> source, MeterRegistry meterRegistry) {
Tags defaultTags = MicrometerMeterListener.DEFAULT_TAGS_MONO;
final String name = resolveName(source, LOGGER, Micrometer.DEFAULT_METER_PREFIX);
final Tags tags = resolveTags(source, defaultTags);

return new MicrometerMeterListenerConfiguration(name, tags, meterRegistry, clock, true);
return new MicrometerMeterListenerConfiguration(name, tags, meterRegistry, true);
}

/**
Expand Down Expand Up @@ -103,7 +103,6 @@ static Tags resolveTags(Publisher<?> source, Tags tags) {
return tags;
}

final Clock clock;
final Tags commonTags;
final boolean isMono;
final String sequenceName;
Expand All @@ -112,9 +111,7 @@ static Tags resolveTags(Publisher<?> source, Tags tags) {
// separator is the dot, not camelCase...
final MeterRegistry registry;

MicrometerMeterListenerConfiguration(String sequenceName, Tags tags, MeterRegistry registryCandidate, Clock clock,
boolean isMono) {
this.clock = clock;
MicrometerMeterListenerConfiguration(String sequenceName, Tags tags, MeterRegistry registryCandidate, boolean isMono) {
this.commonTags = tags;
this.isMono = isMono;
this.sequenceName = sequenceName;
Expand Down
Expand Up @@ -17,6 +17,7 @@
package reactor.core.observability.micrometer;

import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.Meter;
import io.micrometer.core.instrument.MeterRegistry;
import org.reactivestreams.Publisher;

Expand All @@ -33,22 +34,19 @@
*/
class MicrometerMeterListenerFactory<T> implements SignalListenerFactory<T, MicrometerMeterListenerConfiguration> {

protected Clock useClock() {
return Clock.SYSTEM;
}
final MeterRegistry registry;

@SuppressWarnings("deprecation")
protected MeterRegistry useRegistry() {
return Micrometer.getRegistry();
MicrometerMeterListenerFactory(MeterRegistry registry) {
this.registry = registry;
}

@Override
public MicrometerMeterListenerConfiguration initializePublisherState(Publisher<? extends T> source) {
if (source instanceof Mono) {
return MicrometerMeterListenerConfiguration.fromMono((Mono<?>) source, useRegistry(), useClock());
return MicrometerMeterListenerConfiguration.fromMono((Mono<?>) source, this.registry);
}
else if (source instanceof Flux) {
return MicrometerMeterListenerConfiguration.fromFlux((Flux<?>) source, useRegistry(), useClock());
return MicrometerMeterListenerConfiguration.fromFlux((Flux<?>) source, this.registry);
}
else {
throw new IllegalArgumentException("MicrometerMeterListenerFactory must only be used via the tap operator / with a Flux or Mono");
Expand Down
Expand Up @@ -58,9 +58,9 @@ void fromFlux(@Nullable String name, @Nullable String tag) {
flux = flux.tag("tag", tag);
}

MicrometerMeterListenerConfiguration configuration = MicrometerMeterListenerConfiguration.fromFlux(flux, expectedRegistry, expectedClock);
MicrometerMeterListenerConfiguration configuration = MicrometerMeterListenerConfiguration.fromFlux(flux, expectedRegistry);

assertThat(configuration.clock).as("clock").isSameAs(expectedClock);
assertThat(configuration.registry.config().clock()).as("clock").isSameAs(expectedClock);
assertThat(configuration.registry).as("registry").isSameAs(expectedRegistry);
assertThat(configuration.isMono).as("isMono").isFalse();

Expand Down Expand Up @@ -100,9 +100,9 @@ void fromMono(@Nullable String name, @Nullable String tag) {
mono = mono.tag("tag", tag);
}

MicrometerMeterListenerConfiguration configuration = MicrometerMeterListenerConfiguration.fromMono(mono, expectedRegistry, expectedClock);
MicrometerMeterListenerConfiguration configuration = MicrometerMeterListenerConfiguration.fromMono(mono, expectedRegistry);

assertThat(configuration.clock).as("clock").isSameAs(expectedClock);
assertThat(configuration.registry.config().clock()).as("clock").isSameAs(expectedClock);
assertThat(configuration.registry).as("registry").isSameAs(expectedRegistry);
assertThat(configuration.isMono).as("isMono").isTrue();

Expand Down
Expand Up @@ -17,7 +17,7 @@
package reactor.core.observability.micrometer;

import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.simple.SimpleConfig;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import org.junit.jupiter.api.Test;
import org.reactivestreams.Publisher;
Expand All @@ -36,34 +36,11 @@
*/
class MicrometerMeterListenerFactoryTest {

@Test
void useClockDefaultsToSystemClock() {
MicrometerMeterListenerFactory<?> factory = new MicrometerMeterListenerFactory<>();

assertThat(factory.useClock()).isSameAs(Clock.SYSTEM);
}

@Test
void useRegistryDefaultsToCommonRegistry() {
SimpleMeterRegistry commonRegistry = new SimpleMeterRegistry();
MeterRegistry defaultCommon = Micrometer.useRegistry(commonRegistry);
try {
MicrometerMeterListenerFactory<?> factory = new MicrometerMeterListenerFactory<>();

assertThat(factory.useRegistry()).isSameAs(Micrometer.getRegistry())
.isSameAs(commonRegistry);
}
finally {
Micrometer.useRegistry(defaultCommon);
}
}

@Test
void configurationFromMono() {
MicrometerMeterListenerConfiguration configuration = CUSTOM_FACTORY.initializePublisherState(Mono.just(1));

assertThat(configuration.registry).as("registry").isSameAs(CUSTOM_REGISTRY);
assertThat(configuration.clock).as("clock").isSameAs(CUSTOM_CLOCK);
assertThat(configuration.isMono).as("isMono").isTrue();
assertThat(configuration.commonTags).map(Object::toString).containsExactly("tag(type=Mono)");
}
Expand All @@ -73,7 +50,6 @@ void configurationFromFlux() {
MicrometerMeterListenerConfiguration configuration = CUSTOM_FACTORY.initializePublisherState(Flux.just(1, 2));

assertThat(configuration.registry).as("registry").isSameAs(CUSTOM_REGISTRY);
assertThat(configuration.clock).as("clock").isSameAs(CUSTOM_CLOCK);
assertThat(configuration.isMono).as("isMono").isFalse();
assertThat(configuration.commonTags).map(Object::toString).containsExactly("tag(type=Flux)");
}
Expand Down Expand Up @@ -106,17 +82,7 @@ public long monotonicTime() {
return 0;
}
};
protected static final SimpleMeterRegistry CUSTOM_REGISTRY = new SimpleMeterRegistry();
protected static final SimpleMeterRegistry CUSTOM_REGISTRY = new SimpleMeterRegistry(SimpleConfig.DEFAULT, CUSTOM_CLOCK);
protected static final MicrometerMeterListenerFactory<Object>
CUSTOM_FACTORY = new MicrometerMeterListenerFactory<Object>() {
@Override
protected Clock useClock() {
return CUSTOM_CLOCK;
}

@Override
protected MeterRegistry useRegistry() {
return CUSTOM_REGISTRY;
}
};
CUSTOM_FACTORY = new MicrometerMeterListenerFactory<Object>(CUSTOM_REGISTRY);
}
Expand Up @@ -23,6 +23,7 @@
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.simple.SimpleConfig;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand All @@ -42,7 +43,6 @@ class MicrometerMeterListenerTest {

@BeforeEach
void initRegistry() {
registry = new SimpleMeterRegistry();
virtualClockTime = new AtomicLong();
virtualClock = new Clock() {
@Override
Expand All @@ -55,11 +55,11 @@ public long monotonicTime() {
return virtualClockTime.get();
}
};
registry = new SimpleMeterRegistry(SimpleConfig.DEFAULT, virtualClock);
configuration = new MicrometerMeterListenerConfiguration(
"testName",
Tags.of("testTag1", "testTagValue1","testTag2", "testTagValue2"),
registry,
virtualClock,
false);
}

Expand All @@ -69,7 +69,6 @@ void initialStateFluxWithDefaultName() {
Micrometer.DEFAULT_METER_PREFIX,
Tags.of("testTag1", "testTagValue1","testTag2", "testTagValue2"),
registry,
virtualClock,
false);

MicrometerMeterListener<Integer> listener = new MicrometerMeterListener<>(configuration);
Expand All @@ -95,7 +94,6 @@ void initialStateFluxWithCustomName() {
"testName",
Tags.of("testTag1", "testTagValue1","testTag2", "testTagValue2"),
registry,
virtualClock,
false);

MicrometerMeterListener<Integer> listener = new MicrometerMeterListener<>(configuration);
Expand Down Expand Up @@ -126,7 +124,6 @@ void initialStateMono() {
Micrometer.DEFAULT_METER_PREFIX,
Tags.of("testTag1", "testTagValue1","testTag2", "testTagValue2"),
registry,
virtualClock,
true);

MicrometerMeterListener<Integer> listener = new MicrometerMeterListener<>(configuration);
Expand Down Expand Up @@ -284,7 +281,7 @@ void doOnNextRecordsInterval() {
@Test
void doOnNextRecordsInterval_defaultName() {
configuration = new MicrometerMeterListenerConfiguration(Micrometer.DEFAULT_METER_PREFIX, Tags.empty(),
registry, virtualClock, false);
registry, false);
MicrometerMeterListener<Integer> listener = new MicrometerMeterListener<>(configuration);
listener.doOnSubscription();

Expand All @@ -309,7 +306,7 @@ void doOnNextRecordsInterval_defaultName() {
@Test
void doOnNext_monoRecordsCompletionOnly() {
configuration = new MicrometerMeterListenerConfiguration("testName", Tags.empty(),
registry, virtualClock, true);
registry, true);
MicrometerMeterListener<Integer> listener = new MicrometerMeterListener<>(configuration);

listener.doOnSubscription();
Expand Down Expand Up @@ -369,15 +366,15 @@ void doOnRequestRecordsTotalDemand() {

@Test
void doOnRequestMonoIgnoresRequest() {
configuration = new MicrometerMeterListenerConfiguration("testName", Tags.empty(), registry, virtualClock, true);
configuration = new MicrometerMeterListenerConfiguration("testName", Tags.empty(), registry, true);
MicrometerMeterListener<Integer> listener = new MicrometerMeterListener<>(configuration);
assertThatCode(() -> listener.doOnRequest(100L)).doesNotThrowAnyException();
assertThat(listener.requestedCounter).isNull();
}

@Test
void doOnRequestDefaultNameIgnoresRequest() {
configuration = new MicrometerMeterListenerConfiguration(Micrometer.DEFAULT_METER_PREFIX, Tags.empty(), registry, virtualClock, false);
configuration = new MicrometerMeterListenerConfiguration(Micrometer.DEFAULT_METER_PREFIX, Tags.empty(), registry, false);
MicrometerMeterListener<Integer> listener = new MicrometerMeterListener<>(configuration);
assertThatCode(() -> listener.doOnRequest(100L)).doesNotThrowAnyException();
assertThat(listener.requestedCounter).isNull();
Expand Down