diff --git a/reactor-core-micrometer/build.gradle b/reactor-core-micrometer/build.gradle index a53bcca324..d4fade8344 100644 --- a/reactor-core-micrometer/build.gradle +++ b/reactor-core-micrometer/build.gradle @@ -71,6 +71,7 @@ dependencies { testRuntimeOnly libs.logback testImplementation libs.assertj testImplementation libs.mockito + testImplementation libs.awaitility } tasks.withType(Test).all { diff --git a/reactor-core-micrometer/src/main/java/reactor/core/observability/micrometer/Micrometer.java b/reactor-core-micrometer/src/main/java/reactor/core/observability/micrometer/Micrometer.java index 189c7d68bd..9c3b5856ae 100644 --- a/reactor-core-micrometer/src/main/java/reactor/core/observability/micrometer/Micrometer.java +++ b/reactor-core-micrometer/src/main/java/reactor/core/observability/micrometer/Micrometer.java @@ -16,26 +16,22 @@ package reactor.core.observability.micrometer; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ScheduledExecutorService; - import io.micrometer.common.KeyValue; import io.micrometer.common.KeyValues; import io.micrometer.core.instrument.Clock; import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.Metrics; +import io.micrometer.core.instrument.Tag; +import io.micrometer.core.instrument.Tags; import io.micrometer.observation.Observation; import io.micrometer.observation.ObservationRegistry; import reactor.core.observability.SignalListener; import reactor.core.observability.SignalListenerFactory; import reactor.core.scheduler.Scheduler; -import reactor.core.scheduler.Schedulers; public final class Micrometer { - private static final String SCHEDULERS_DECORATOR_KEY = "reactor.core.observability.micrometer.schedulerDecorator"; - /** * The default "name" to use as a prefix for meter if the instrumented sequence doesn't define a {@link reactor.core.publisher.Flux#name(String) name}. */ @@ -94,29 +90,33 @@ public final class Micrometer { return new MicrometerObservationListenerFactory<>(registry); } - //FIXME: remove these and replace with an option to decorate an arbitrary Scheduler - /** - * Set-up a decorator that will instrument any {@link ExecutorService} that backs a reactor-core {@link Scheduler} - * (or scheduler implementations which use {@link Schedulers#decorateExecutorService(Scheduler, ScheduledExecutorService)}). - *

- * The {@link MeterRegistry} to use can be configured via {@link reactor.util.Metrics.MicrometerConfiguration#useRegistry(MeterRegistry)} - * prior to using this method, the default being {@link io.micrometer.core.instrument.Metrics#globalRegistry}. + * Wrap a {@link Scheduler} in an instance that gathers various task-related metrics using + * the provided {@link MeterRegistry} and naming meters using the provided {@code metricsPrefix}. + * Note that no tags are set up for these meters. * - * @implNote Note that this is added as a decorator via Schedulers when enabling metrics for schedulers, - * which doesn't change the Factory. + * @param original the original {@link Scheduler} to decorate with metrics + * @param meterRegistry the {@link MeterRegistry} in which to register the various meters + * @param metricsPrefix the prefix to use in meter names. If needed, a dot is added at the end + * @return a {@link Scheduler} that is instrumented with dedicated metrics */ - @Deprecated - public static void enableSchedulersMetricsDecorator() { - Schedulers.addExecutorServiceDecorator(SCHEDULERS_DECORATOR_KEY, - new MicrometerSchedulerMetricsDecorator(reactor.util.Metrics.MicrometerConfiguration.getRegistry())); + public static Scheduler timedScheduler(Scheduler original, MeterRegistry meterRegistry, String metricsPrefix) { + return new TimedScheduler(original, meterRegistry, metricsPrefix, Tags.empty()); } /** - * If {@link #enableSchedulersMetricsDecorator()} has been previously called, removes the decorator. - * No-op if {@link #enableSchedulersMetricsDecorator()} hasn't been called. + * Wrap a {@link Scheduler} in an instance that gathers various task-related metrics using + * the provided {@link MeterRegistry} and naming meters using the provided {@code metricsPrefix}. + * User-provided collection of {@link Tag} (ie. {@link Tags}) can also be provided to be added to + * all the meters of that timed Scheduler. + * + * @param original the original {@link Scheduler} to decorate with metrics + * @param meterRegistry the {@link MeterRegistry} in which to register the various meters + * @param metricsPrefix the prefix to use in meter names. If needed, a dot is added at the end + * @param tags the tags to put on meters + * @return a {@link Scheduler} that is instrumented with dedicated metrics */ - public static void disableSchedulersMetricsDecorator() { - Schedulers.removeExecutorServiceDecorator(SCHEDULERS_DECORATOR_KEY); + public static Scheduler timedScheduler(Scheduler original, MeterRegistry meterRegistry, String metricsPrefix, Iterable tags) { + return new TimedScheduler(original, meterRegistry, metricsPrefix, tags); } } \ No newline at end of file diff --git a/reactor-core-micrometer/src/main/java/reactor/core/observability/micrometer/MicrometerSchedulerMetricsDecorator.java b/reactor-core-micrometer/src/main/java/reactor/core/observability/micrometer/MicrometerSchedulerMetricsDecorator.java deleted file mode 100644 index 2cad2a1317..0000000000 --- a/reactor-core-micrometer/src/main/java/reactor/core/observability/micrometer/MicrometerSchedulerMetricsDecorator.java +++ /dev/null @@ -1,223 +0,0 @@ -/* - * Copyright (c) 2022 VMware Inc. or its affiliates, All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package reactor.core.observability.micrometer; - -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.WeakHashMap; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.BiFunction; - -import io.micrometer.core.instrument.MeterRegistry; -import io.micrometer.core.instrument.Tag; -import io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics; -import io.micrometer.core.instrument.search.Search; - -import reactor.core.Disposable; -import reactor.core.Scannable; -import reactor.core.scheduler.Scheduler; - -/** - * @author Simon Baslé - */ -final class MicrometerSchedulerMetricsDecorator implements BiFunction, - Disposable { - - //TODO expose keys and tags publicly? - static final String TAG_SCHEDULER_ID = "reactor.scheduler.id"; - - final WeakHashMap seenSchedulers = new WeakHashMap<>(); - final Map schedulerDifferentiator = new HashMap<>(); - final WeakHashMap executorDifferentiator = new WeakHashMap<>(); - final MeterRegistry registry; - - MicrometerSchedulerMetricsDecorator(MeterRegistry registry) { - this.registry = registry; - } - - @Override - public synchronized ScheduledExecutorService apply(Scheduler scheduler, ScheduledExecutorService service) { - //this is equivalent to `toString`, a detailed name like `parallel("foo", 3)` - String schedulerName = Scannable - .from(scheduler) - .scanOrDefault(Scannable.Attr.NAME, scheduler.getClass().getName()); - - //we hope that each NAME is unique enough, but we'll differentiate by Scheduler - String schedulerId = - seenSchedulers.computeIfAbsent(scheduler, s -> { - int schedulerDifferentiator = this.schedulerDifferentiator - .computeIfAbsent(schedulerName, k -> new AtomicInteger(0)) - .getAndIncrement(); - - return (schedulerDifferentiator == 0) ? schedulerName - : schedulerName + "#" + schedulerDifferentiator; - }); - - //we now want an executorId unique to a given scheduler - String executorId = schedulerId + "-" + - executorDifferentiator.computeIfAbsent(scheduler, key -> new AtomicInteger(0)) - .getAndIncrement(); - - Tag[] tags = new Tag[] { Tag.of(TAG_SCHEDULER_ID, schedulerId) }; - - /* - Design note: we assume that a given Scheduler won't apply the decorator twice to the - same ExecutorService. Even though, it would simply create an extraneous meter for - that ExecutorService, which we think is not that bad (compared to paying the price - upfront of also tracking executors instances to deduplicate). The main goal is to - detect Scheduler instances that have already started decorating their executors, - in order to avoid consider two calls in a row as duplicates (yet still being able - to distinguish between two instances with the same name and configuration). - */ - - - return new MetricsRemovingScheduledExecutorService(service, this.registry, executorId, tags); - } - - @Override - public void dispose() { - Search.in(registry) - .tagKeys(TAG_SCHEDULER_ID) - .meters() - .forEach(registry::remove); - - //note default isDisposed (returning false) is good enough, since the cleared - //collections can always be reused even though they probably won't - this.seenSchedulers.clear(); - this.schedulerDifferentiator.clear(); - this.executorDifferentiator.clear(); - } - - static class MetricsRemovingScheduledExecutorService implements ScheduledExecutorService { - - final ScheduledExecutorService scheduledExecutorService; - final MeterRegistry registry; - final String executorId; - - MetricsRemovingScheduledExecutorService(ScheduledExecutorService service, MeterRegistry registry, String executorId, Tag[] tags) { - this.scheduledExecutorService = ExecutorServiceMetrics.monitor(registry, service, executorId, tags); - this.registry = registry; - this.executorId = executorId; - } - - @Override - public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { - return scheduledExecutorService.awaitTermination(timeout, unit); - } - - @Override - public void execute(Runnable command) { - scheduledExecutorService.execute(command); - } - - @Override - public List> invokeAll(Collection> tasks) throws InterruptedException { - return scheduledExecutorService.invokeAll(tasks); - } - - @Override - public List> invokeAll(Collection> tasks, long timeout, TimeUnit unit) - throws InterruptedException { - return scheduledExecutorService.invokeAll(tasks, timeout, unit); - } - - @Override - public T invokeAny(Collection> tasks) throws InterruptedException, - ExecutionException { - return scheduledExecutorService.invokeAny(tasks); - } - - @Override - public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) - throws InterruptedException, ExecutionException, TimeoutException { - return scheduledExecutorService.invokeAny(tasks, timeout, unit); - } - - @Override - public boolean isShutdown() { - return scheduledExecutorService.isShutdown(); - } - - @Override - public boolean isTerminated() { - return scheduledExecutorService.isTerminated(); - } - - @Override - public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { - return scheduledExecutorService.schedule(command, delay, unit); - } - - @Override - public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { - return scheduledExecutorService.schedule(callable, delay, unit); - } - - @Override - public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { - return scheduledExecutorService.scheduleAtFixedRate(command, initialDelay, period, unit); - } - - @Override - public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { - return scheduledExecutorService.scheduleWithFixedDelay(command, initialDelay, delay, unit); - } - - @Override - public List shutdownNow() { - removeMetrics(); - return scheduledExecutorService.shutdownNow(); - } - - @Override - public void shutdown() { - removeMetrics(); - scheduledExecutorService.shutdown(); - } - - @Override - public Future submit(Callable task) { - return scheduledExecutorService.submit(task); - } - - @Override - public Future submit(Runnable task, T result) { - return scheduledExecutorService.submit(task, result); - } - - @Override - public Future submit(Runnable task) { - return scheduledExecutorService.submit(task); - } - - void removeMetrics() { - Search.in(registry) - .tag("name", executorId) - .meters() - .forEach(registry::remove); - } - } -} diff --git a/reactor-core-micrometer/src/main/java/reactor/core/observability/micrometer/TimedScheduler.java b/reactor-core-micrometer/src/main/java/reactor/core/observability/micrometer/TimedScheduler.java new file mode 100644 index 0000000000..5cd2611e2b --- /dev/null +++ b/reactor-core-micrometer/src/main/java/reactor/core/observability/micrometer/TimedScheduler.java @@ -0,0 +1,281 @@ +/* + * Copyright (c) 2022 VMware Inc. or its affiliates, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package reactor.core.observability.micrometer; + +import java.util.concurrent.TimeUnit; + +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.LongTaskTimer; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Tag; +import io.micrometer.core.instrument.Tags; +import io.micrometer.core.instrument.Timer; + +import reactor.core.Disposable; +import reactor.core.scheduler.Scheduler; + +/** + * An instrumented {@link Scheduler} wrapping an original {@link Scheduler} + * and gathering metrics around submitted tasks. + * + * @author Simon Baslé + */ +final class TimedScheduler implements Scheduler { + + //FIXME document all the tags/meters/etc... somewhere public? + + /** + * {@link Timer} reflecting tasks that have finished execution. Note that this reflects all types of + * active tasks, including tasks scheduled {@link #schedule(Runnable, long, TimeUnit) with a delay} + * or {@link #schedulePeriodically(Runnable, long, long, TimeUnit) periodically} (each + * iteration being considered a separate completed task). + */ + static final String METER_TASKS_COMPLETED = "scheduler.tasks.completed"; + /** + * {@link LongTaskTimer} reflecting tasks currently running. Note that this reflects all types of + * active tasks, including tasks scheduled {@link #schedule(Runnable, long, TimeUnit) with a delay} + * or {@link #schedulePeriodically(Runnable, long, long, TimeUnit) periodically} (each + * iteration being considered an active task). + */ + static final String METER_TASKS_ACTIVE = "scheduler.tasks.active"; + /** + * {@link LongTaskTimer} reflecting tasks that were submitted for immediate execution but + * couldn't be started immediately because the scheduler is already at max capacity. + * Note that only immediate submissions via {@link Scheduler#schedule(Runnable)} and + * {@link Worker#schedule(Runnable)} are considered. + */ + static final String METER_TASKS_PENDING = "scheduler.tasks.pending"; + + /** + * The type of submission: + *

+ */ + static final String TAG_SUBMISSION = "submission.type"; + + /** + * {@link Counter} that increments by one each time a task is submitted (via any of the + * schedule methods on both {@link Scheduler} and {@link Worker}). + *

+ * Note that there are actually 4 counters, which can be differentiated by the {@link #TAG_SUBMISSION} tag. + * The sum of all these can thus be compared with the {@link #METER_TASKS_COMPLETED} counter. + */ + static final String METER_SUBMITTED = "scheduler.tasks.submitted"; + + /** + * {@link Counter} that increments by one each time a task is submitted for immediate execution + * (ie. {@link Scheduler#schedule(Runnable)} or {@link Worker#schedule(Runnable)}). + */ + static final String SUBMISSION_DIRECT = "direct"; + /** + * {@link Counter} that increments by one each time a task is submitted with a delay + * (ie. {@link Scheduler#schedule(Runnable, long, TimeUnit)} + * or {@link Worker#schedule(Runnable, long, TimeUnit)}). + */ + static final String SUBMISSION_DELAYED = "delayed"; + /** + * {@link Counter} that increments when a task is initially submitted with a period + * (ie. {@link Scheduler#schedulePeriodically(Runnable, long, long, TimeUnit)} + * or {@link Worker#schedulePeriodically(Runnable, long, long, TimeUnit)}). This isn't + * incremented on further iterations of the periodic task. + */ + static final String SUBMISSION_PERIODIC_INITIAL = "periodic_initial"; + /** + * {@link Counter} that increments by one each time a task is re-executed due to the periodic + * nature of {@link Scheduler#schedulePeriodically(Runnable, long, long, TimeUnit)} + * or {@link Worker#schedulePeriodically(Runnable, long, long, TimeUnit)} (ie. iterations + * past the initial one). + */ + static final String SUBMISSION_PERIODIC_ITERATION = "periodic_iteration"; + + final Scheduler delegate; + + final MeterRegistry registry; + + final Counter submittedDirect; + final Counter submittedDelayed; + final Counter submittedPeriodicInitial; + final Counter submittedPeriodicIteration; + final LongTaskTimer pendingTasks; + final LongTaskTimer activeTasks; + final Timer completedTasks; + + + TimedScheduler(Scheduler delegate, MeterRegistry registry, String metricPrefix, Iterable tagsList) { + this.delegate = delegate; + this.registry = registry; + if (!metricPrefix.endsWith(".")) { + metricPrefix = metricPrefix + "."; + } + Tags tags = Tags.of(tagsList); + + String submittedName = metricPrefix + METER_SUBMITTED; + this.submittedDirect = registry.counter(submittedName, tags.and(TAG_SUBMISSION, SUBMISSION_DIRECT)); + this.submittedDelayed = registry.counter(submittedName, tags.and(TAG_SUBMISSION, SUBMISSION_DELAYED)); + this.submittedPeriodicInitial = registry.counter(submittedName, tags.and(TAG_SUBMISSION, SUBMISSION_PERIODIC_INITIAL)); + this.submittedPeriodicIteration = registry.counter(submittedName, tags.and(TAG_SUBMISSION, SUBMISSION_PERIODIC_ITERATION)); + + this.pendingTasks = LongTaskTimer.builder(metricPrefix + METER_TASKS_PENDING) + .tags(tags).register(registry); + this.activeTasks = LongTaskTimer.builder(metricPrefix + METER_TASKS_ACTIVE) + .tags(tags).register(registry); + this.completedTasks = registry.timer(metricPrefix + METER_TASKS_COMPLETED, tags); + + } + + Runnable wrap(Runnable task) { + return new TimedRunnable(registry, this, task); + } + + Runnable wrapPeriodic(Runnable task) { + return new TimedRunnable(registry, this, task, true); + } + + @Override + public Disposable schedule(Runnable task) { + this.submittedDirect.increment(); + return delegate.schedule(wrap(task)); + } + + @Override + public Disposable schedule(Runnable task, long delay, TimeUnit unit) { + this.submittedDelayed.increment(); + return delegate.schedule(wrap(task), delay, unit); + } + + @Override + public Disposable schedulePeriodically(Runnable task, long initialDelay, long period, TimeUnit unit) { + this.submittedPeriodicInitial.increment(); + return delegate.schedulePeriodically(wrapPeriodic(task), initialDelay, period, unit); + } + + @Override + public Worker createWorker() { + return new TimedWorker(this, delegate.createWorker()); + } + + @Override + public boolean isDisposed() { + return delegate.isDisposed(); + } + + @Override + public long now(TimeUnit unit) { + return delegate.now(unit); + } + + @Override + public void dispose() { + delegate.dispose(); + } + + @Override + public void start() { + delegate.start(); + } + + static final class TimedWorker implements Worker { + + final TimedScheduler parent; + final Worker delegate; + + TimedWorker(TimedScheduler parent, Worker delegate) { + this.parent = parent; + this.delegate = delegate; + } + + @Override + public void dispose() { + delegate.dispose(); + } + + @Override + public boolean isDisposed() { + return delegate.isDisposed(); + } + + @Override + public Disposable schedule(Runnable task) { + parent.submittedDirect.increment(); + return delegate.schedule(parent.wrap(task)); + } + + @Override + public Disposable schedule(Runnable task, long delay, TimeUnit unit) { + parent.submittedDelayed.increment(); + return delegate.schedule(parent.wrap(task), delay, unit); + } + + @Override + public Disposable schedulePeriodically(Runnable task, long initialDelay, long period, TimeUnit unit) { + parent.submittedPeriodicInitial.increment(); + return delegate.schedulePeriodically(parent.wrapPeriodic(task), initialDelay, period, unit); + } + } + + static final class TimedRunnable implements Runnable { + + final MeterRegistry registry; + final TimedScheduler parent; + final Runnable task; + + final LongTaskTimer.Sample pendingSample; + + boolean isRerun; + + TimedRunnable(MeterRegistry registry, TimedScheduler parent, Runnable task) { + this(registry, parent, task, false); + } + + TimedRunnable(MeterRegistry registry, TimedScheduler parent, Runnable task, boolean periodic) { + this.registry = registry; + this.parent = parent; + this.task = task; + + if (periodic) { + this.pendingSample = null; + } + else { + this.pendingSample = parent.pendingTasks.start(); + } + this.isRerun = false; //will be ignored if not periodic + } + + @Override + public void run() { + if (this.pendingSample != null) { + //NOT periodic + this.pendingSample.stop(); + } + else { + if (!isRerun) { + this.isRerun = true; + } + else { + parent.submittedPeriodicIteration.increment(); + } + } + + Runnable completionTrackingTask = parent.completedTasks.wrap(this.task); + this.parent.activeTasks.record(completionTrackingTask); + } + } +} diff --git a/reactor-core-micrometer/src/test/java/reactor/core/observability/micrometer/MicrometerTest.java b/reactor-core-micrometer/src/test/java/reactor/core/observability/micrometer/MicrometerTest.java index f0099ea1f2..1be3adb5d5 100644 --- a/reactor-core-micrometer/src/test/java/reactor/core/observability/micrometer/MicrometerTest.java +++ b/reactor-core-micrometer/src/test/java/reactor/core/observability/micrometer/MicrometerTest.java @@ -16,10 +16,16 @@ package reactor.core.observability.micrometer; +import io.micrometer.core.instrument.Meter; +import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.Metrics; +import io.micrometer.core.instrument.Tags; import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import io.micrometer.observation.contextpropagation.ObservationThreadLocalAccessor; import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import reactor.core.scheduler.Scheduler; import static org.assertj.core.api.Assertions.assertThat; @@ -41,4 +47,29 @@ void observationContextKeySmokeTest() { assertThat(MicrometerObservationListener.CONTEXT_KEY_OBSERVATION) .isEqualTo(ObservationThreadLocalAccessor.KEY); } + + @Test + void timedSchedulerReturnsAConfiguredTimedScheduler() { + Scheduler mockScheduler = Mockito.mock(Scheduler.class); + Scheduler.Worker mockWorker = Mockito.mock(Scheduler.Worker.class); + Mockito.when(mockScheduler.createWorker()).thenReturn(mockWorker); + + final MeterRegistry registry = new SimpleMeterRegistry(); + final Tags tags = Tags.of("1", "A", "2", "B"); + final String prefix = "testSchedulerMetrics"; + + Scheduler test = Micrometer.timedScheduler(mockScheduler, registry, prefix, tags); + + assertThat(test).isInstanceOfSatisfying(TimedScheduler.class, ts -> { + assertThat(ts.delegate).as("delegate").isSameAs(mockScheduler); + assertThat(ts.registry).as("registry").isSameAs(registry); + //we verify the tags and prefix we passed made it to at least one meter. + //this is more about the Micrometer passing down the params than it is about checking _all_ meters in the actual class. + Meter.Id id = ts.submittedDirect.getId(); + assertThat(id.getName()).as("prefix used") + .isEqualTo("testSchedulerMetrics.scheduler.tasks.submitted"); + assertThat(id.getTags()).as("tags") + .containsExactlyElementsOf(tags.and(TimedScheduler.TAG_SUBMISSION, TimedScheduler.SUBMISSION_DIRECT)); + }); + } } \ No newline at end of file diff --git a/reactor-core-micrometer/src/test/java/reactor/core/observability/micrometer/TimedSchedulerTest.java b/reactor-core-micrometer/src/test/java/reactor/core/observability/micrometer/TimedSchedulerTest.java new file mode 100644 index 0000000000..183637b95f --- /dev/null +++ b/reactor-core-micrometer/src/test/java/reactor/core/observability/micrometer/TimedSchedulerTest.java @@ -0,0 +1,322 @@ +/* + * Copyright (c) 2022 VMware Inc. or its affiliates, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package reactor.core.observability.micrometer; + +import java.time.Duration; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import io.micrometer.core.instrument.MockClock; +import io.micrometer.core.instrument.Tags; +import io.micrometer.core.instrument.simple.SimpleConfig; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; +import io.micrometer.core.tck.MeterRegistryAssert; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.mockito.Mockito; + +import reactor.core.Disposable; +import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; +import reactor.test.AutoDisposingExtension; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * @author Simon Baslé + */ +class TimedSchedulerTest { + + @RegisterExtension + AutoDisposingExtension afterTest = new AutoDisposingExtension(); + + private SimpleMeterRegistry registry; + + @BeforeEach + void setUp() { + registry = new SimpleMeterRegistry(); + } + + @AfterEach + void closeRegistry() { + registry.close(); + } + + @Test + void constructorAddsDotToPrefixIfNeeded() { + TimedScheduler test = new TimedScheduler(Schedulers.immediate(), registry, "noDot", Tags.empty()); + + assertThat(registry.getMeters()) + .map(m -> m.getId().getName()) + .allSatisfy(name -> assertThat(name).startsWith("noDot.")); + } + + @Test + void constructorDoesntAddTwoDots() { + TimedScheduler test = new TimedScheduler(Schedulers.immediate(), registry, "dot.", Tags.empty()); + + assertThat(registry.getMeters()) + .map(m -> m.getId().getName()) + .allSatisfy(name -> assertThat(name).doesNotContain("..")); + } + + @Test + void constructorRegistersSevenMetersWithFourSimilarCountersWithSubmissionTypeTag() { + MeterRegistryAssert.assertThat(registry).as("before constructor").hasNoMetrics(); + + new TimedScheduler(Schedulers.immediate(), registry, "test", Tags.empty()); + + assertThat(registry.getMeters()) + .map(m -> { + String name = m.getId().getName(); + String type = m.getId().getTag("submission.type"); + return name + (type == null ? "" : " submission.type=" + type); + }) + .containsExactlyInAnyOrder( + "test.scheduler.tasks.active", + "test.scheduler.tasks.completed", + "test.scheduler.tasks.pending", + //technically 4 different submitted counters + "test.scheduler.tasks.submitted submission.type=direct", + "test.scheduler.tasks.submitted submission.type=delayed", + "test.scheduler.tasks.submitted submission.type=periodic_initial", + "test.scheduler.tasks.submitted submission.type=periodic_iteration" + ); + } + + @Test + void timingOfActiveAndPendingTasks() throws InterruptedException { + MockClock virtualClock = new MockClock(); + SimpleMeterRegistry registryWithVirtualClock = new SimpleMeterRegistry(SimpleConfig.DEFAULT, virtualClock); + afterTest.autoDispose(registryWithVirtualClock::close); + TimedScheduler test = new TimedScheduler(Schedulers.single(), registryWithVirtualClock, "test", Tags.empty()); + + /* + This test schedules two tasks in a Schedulers.single(), using latches to "pause" and "resume" the + tasks at points where we can make predictable assertions. As a result, task2 will be pending until + task1 is un-paused. + + LongTaskTimer only report timings and counts of Runnable that are being executed. Once the Runnable + finishes, LTT won't report any activity. + + Timer on the other hand will report cumulative times and counts AFTER the Runnable has finished. + This is used for the completedTasks metric, which is asserted at the end. + */ + + final CountDownLatch firstTaskPause = new CountDownLatch(1); + final CountDownLatch secondTaskDone = new CountDownLatch(1); + test.schedule(() -> { + try { + firstTaskPause.await(1, TimeUnit.SECONDS); + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + test.schedule(() -> { + try { + virtualClock.addSeconds(1); + } + finally { + secondTaskDone.countDown(); + } + }); + + //there might be a slight hiccup when the registry doesn't see task1 as active + Awaitility.await().atMost(Duration.ofSeconds(1)).untilAsserted( + () -> assertThat(test.activeTasks.activeTasks()).as("one active").isOne()); + assertThat(test.pendingTasks.activeTasks()).as("one idle").isOne(); + + //we advance time by 2s, expecting that pendingTasks and activeTasks both reflect these 2 seconds (for task2 and task1 respectively) + virtualClock.addSeconds(2); + + assertThat(test.pendingTasks.duration(TimeUnit.SECONDS)) + .as("after 1st idle totalTime SECONDS") + .isEqualTo(2); + assertThat(test.activeTasks.duration(TimeUnit.SECONDS)) + .as("after 1st active totalTime SECONDS") + .isEqualTo(2); + + // we "resume" both tasks and let them finish, at which point the LongTaskTimers will stop recording + firstTaskPause.countDown(); + secondTaskDone.await(1, TimeUnit.SECONDS); + + //again, there might be a slight hiccup before registry sees 2nd task as done + Awaitility.await().atMost(Duration.ofSeconds(1)).untilAsserted( + () -> assertThat(test.activeTasks.duration(TimeUnit.SECONDS)) + .as("once 2nd done, no active timing") + .isEqualTo(0)); + assertThat(test.pendingTasks.duration(TimeUnit.SECONDS)) + .as("once 2nd done, no pending timing") + .isEqualTo(0); + assertThat(test.pendingTasks.activeTasks()).as("at end pendingTasks").isZero(); + assertThat(test.activeTasks.activeTasks()).as("at end activeTasks").isZero(); + + //now we assert that the completedTasks timer reflects a history of all Runnable#run + assertThat(test.completedTasks.count()) + .as("#completed") + .isEqualTo(2L); + assertThat(test.completedTasks.totalTime(TimeUnit.MILLISECONDS)) + .as("total duration of tasks") + .isEqualTo(3000); + } + + @Test + void schedulePeriodicallyTimesOneRunInActiveAndAllRunsInCompleted() throws InterruptedException { + MockClock virtualClock = new MockClock(); + SimpleMeterRegistry registryWithVirtualClock = new SimpleMeterRegistry(SimpleConfig.DEFAULT, virtualClock); + TimedScheduler test = new TimedScheduler(Schedulers.single(), registryWithVirtualClock, "test", Tags.empty()); + + //schedule a periodic task for which one run takes 500ms. we cancel after 3 runs + CountDownLatch latch = new CountDownLatch(3); + Disposable d = test.schedulePeriodically( + () -> { + try { + virtualClock.add(Duration.ofMillis(500)); + } + finally { + latch.countDown(); + } + }, + 100, 100, TimeUnit.MILLISECONDS); + latch.await(1, TimeUnit.SECONDS); + d.dispose(); + + //now we assert that the completedTasks timer reflects a history of all Runnable#run + assertThat(test.submittedDirect.count()).as("#submittedDirect").isZero(); + assertThat(test.submittedPeriodicInitial.count()).as("#submittedPeriodicInitial").isOne(); + assertThat(test.submittedPeriodicIteration.count()).as("#submittedPeriodicIteration").isEqualTo(2); + assertThat(test.completedTasks.count()) + .as("#completed") + .isEqualTo(3L); + assertThat(test.completedTasks.totalTime(TimeUnit.MILLISECONDS)) + .as("total duration of tasks") + .isEqualTo(1500); + } + + @Test + void scheduleIncrementDirectCounterOnly() { + TimedScheduler test = new TimedScheduler(Schedulers.immediate(), registry, "test", Tags.empty()); + + test.schedule(() -> {}); + + assertThat(test.submittedDirect.count()).as("submittedDirect.count").isOne(); + assertThat(test.submittedDelayed.count()).as("submittedDelayed.count").isZero(); + assertThat(test.submittedPeriodicInitial.count()).as("submittedPeriodicInitial.count").isZero(); + assertThat(test.submittedPeriodicIteration.count()).as("submittedPeriodicIteration.count").isZero(); + } + + @Test + void scheduleDelayIncrementsDelayedCounter() throws InterruptedException { + TimedScheduler test = new TimedScheduler(Schedulers.single(), registry, "test", Tags.empty()); + + test.schedule(() -> {}, 100, TimeUnit.MILLISECONDS); + + assertThat(test.submittedDirect.count()).as("submittedDirect.count").isZero(); + assertThat(test.submittedDelayed.count()).as("submittedDelayed.count").isOne(); + assertThat(test.submittedPeriodicInitial.count()).as("submittedPeriodicInitial.count").isZero(); + assertThat(test.submittedPeriodicIteration.count()).as("submittedPeriodicIteration.count").isZero(); + } + + @Test + void schedulePeriodicallyIsCorrectlyMetered() throws InterruptedException { + CountDownLatch latch = new CountDownLatch(5); + TimedScheduler test = new TimedScheduler(Schedulers.single(), registry, "test", Tags.empty()); + + Disposable d = test.schedulePeriodically(latch::countDown, 100, 100, TimeUnit.MILLISECONDS); + + latch.await(10, TimeUnit.SECONDS); + d.dispose(); + + assertThat(test.submittedDirect.count()).as("submittedDirect.count").isZero(); + assertThat(test.submittedDelayed.count()).as("submittedDelayed.count").isZero(); + assertThat(test.submittedPeriodicInitial.count()).as("submittedPeriodicInitial.count").isOne(); + assertThat(test.submittedPeriodicIteration.count()).as("submittedPeriodicIteration.count").isEqualTo(4); + assertThat(test.completedTasks.count()) + .as("completed counter tracks all iterations") + .isEqualTo(5) + .matches(l -> l == test.submittedDirect.count() + test.submittedDelayed.count() + test.submittedPeriodicInitial.count() + + test.submittedPeriodicIteration.count(), "completed tasks == sum of all timer counts"); + } + + @Test + void createWorkerDelegatesToAnOriginalWorker() { + Scheduler mockScheduler = Mockito.mock(Scheduler.class); + Scheduler.Worker mockWorker = Mockito.mock(Scheduler.Worker.class); + Mockito.when(mockScheduler.createWorker()).thenReturn(mockWorker); + + TimedScheduler test = new TimedScheduler(mockScheduler, registry, "test", Tags.empty()); + + TimedScheduler.TimedWorker worker = (TimedScheduler.TimedWorker) test.createWorker(); + + assertThat(worker.delegate).as("worker delegate").isSameAs(mockWorker); + } + + @Test + void workerScheduleIncrementsDirectCounterOnly() { + TimedScheduler testScheduler = new TimedScheduler(Schedulers.immediate(), registry, "test", Tags.empty()); + Scheduler.Worker test = testScheduler.createWorker(); + + test.schedule(() -> {}); + + assertThat(testScheduler.submittedDirect.count()).as("submittedDirect.count").isOne(); + assertThat(testScheduler.submittedDelayed.count()).as("submittedDelayed.count").isZero(); + assertThat(testScheduler.submittedPeriodicInitial.count()).as("submittedPeriodicInitial.count").isZero(); + assertThat(testScheduler.submittedPeriodicIteration.count()).as("submittedPeriodicIteration.count").isZero(); + } + + @Test + void workerScheduleDelayIncrementsDelayedCounter() throws InterruptedException { + TimedScheduler testScheduler = new TimedScheduler(Schedulers.single(), registry, "test", Tags.empty()); + Scheduler.Worker test = testScheduler.createWorker(); + + test.schedule(() -> {}, 100, TimeUnit.MILLISECONDS); + + assertThat(testScheduler.submittedDirect.count()).as("submittedDirect.count").isZero(); + assertThat(testScheduler.submittedDelayed.count()).as("submittedDelayed.count").isOne(); + assertThat(testScheduler.submittedPeriodicInitial.count()).as("submittedPeriodicInitial.count").isZero(); + assertThat(testScheduler.submittedPeriodicIteration.count()).as("submittedPeriodicIteration.count").isZero(); + } + + @Test + void workerSchedulePeriodicallyIsCorrectlyMetered() throws InterruptedException { + Scheduler original = Schedulers.single(); + CountDownLatch latch = new CountDownLatch(5); + TimedScheduler testScheduler = new TimedScheduler(original, registry, "test", Tags.empty()); + Scheduler.Worker test = testScheduler.createWorker(); + + Disposable d = test.schedulePeriodically(latch::countDown, 100, 100, TimeUnit.MILLISECONDS); + + latch.await(10, TimeUnit.SECONDS); + d.dispose(); + + assertThat(testScheduler.submittedDirect.count()).as("submittedDirect.count").isZero(); + assertThat(testScheduler.submittedDelayed.count()).as("submittedDelayed.count").isZero(); + assertThat(testScheduler.submittedPeriodicInitial.count()).as("submittedPeriodicInitial.count").isOne(); + assertThat(testScheduler.submittedPeriodicIteration.count()).as("submittedPeriodicIteration.count").isEqualTo(4); + assertThat(testScheduler.completedTasks.count()) + .as("completed counter tracks all iterations") + .isEqualTo(5) + .matches(l -> l == testScheduler.submittedDirect.count() + + testScheduler.submittedDelayed.count() + + testScheduler.submittedPeriodicInitial.count() + + testScheduler.submittedPeriodicIteration.count(), "completed tasks == sum of all timer counts"); + } +} \ No newline at end of file diff --git a/reactor-core/src/main/java/reactor/core/scheduler/SchedulerMetricDecorator.java b/reactor-core/src/main/java/reactor/core/scheduler/SchedulerMetricDecorator.java index adbdd1fac1..b9d32465a9 100644 --- a/reactor-core/src/main/java/reactor/core/scheduler/SchedulerMetricDecorator.java +++ b/reactor-core/src/main/java/reactor/core/scheduler/SchedulerMetricDecorator.java @@ -34,7 +34,7 @@ import reactor.core.Scannable.Attr; import reactor.util.Metrics; -@Deprecated //this class is duplicated in reactor-core-micrometer +@Deprecated final class SchedulerMetricDecorator implements BiFunction, Disposable { diff --git a/reactor-core/src/main/java/reactor/core/scheduler/Schedulers.java b/reactor-core/src/main/java/reactor/core/scheduler/Schedulers.java index 5ba30fdd3c..2b962c1b11 100644 --- a/reactor-core/src/main/java/reactor/core/scheduler/Schedulers.java +++ b/reactor-core/src/main/java/reactor/core/scheduler/Schedulers.java @@ -606,7 +606,7 @@ public static boolean isNonBlockingThread(Thread t) { *

* * @implNote Note that this is added as a decorator via Schedulers when enabling metrics for schedulers, which doesn't change the Factory. - * @deprecated prefer using the equivalent method in reactor-core-micrometer module. To be removed at the earliest in 3.6.0. + * @deprecated prefer using Micrometer#timedScheduler from the reactor-core-micrometer module. To be removed at the earliest in 3.6.0. */ @Deprecated public static void enableMetrics() { @@ -619,7 +619,7 @@ public static void enableMetrics() { * If {@link #enableMetrics()} has been previously called, removes the decorator. * No-op if {@link #enableMetrics()} hasn't been called. * - * @deprecated prefer using the equivalent method in reactor-core-micrometer module. To be removed at the earliest in 3.6.0. + * @deprecated prefer using Micrometer#timedScheduler from the reactor-core-micrometer module. To be removed at the earliest in 3.6.0. */ @Deprecated public static void disableMetrics() { diff --git a/reactor-core/src/test/java/reactor/core/scheduler/SingleSchedulerTest.java b/reactor-core/src/test/java/reactor/core/scheduler/SingleSchedulerTest.java index 56d10a149f..4a17fb50c6 100644 --- a/reactor-core/src/test/java/reactor/core/scheduler/SingleSchedulerTest.java +++ b/reactor-core/src/test/java/reactor/core/scheduler/SingleSchedulerTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2015-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2015-2022 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,14 +17,17 @@ package reactor.core.scheduler; import static java.lang.String.format; -import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.*; import java.lang.management.ManagementFactory; import java.time.Duration; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import org.junit.jupiter.api.Test; + +import reactor.core.Disposable; import reactor.core.Scannable; import reactor.core.publisher.*; import reactor.core.scheduler.Scheduler.Worker; @@ -130,6 +133,57 @@ public void lotsOfTasks() throws Exception { } + @Test + void independentWorkers() throws InterruptedException { + Scheduler timer = afterTest.autoDispose(Schedulers.newSingle("test-timer")); + + Worker w1 = timer.createWorker(); + + Worker w2 = timer.createWorker(); + + CountDownLatch cdl = new CountDownLatch(1); + + w1.dispose(); + + assertThatExceptionOfType(Throwable.class).isThrownBy(() -> { + w1.schedule(() -> { }); + }); + + w2.schedule(cdl::countDown); + + if (!cdl.await(1, TimeUnit.SECONDS)) { + fail("Worker 2 didn't execute in time"); + } + w2.dispose(); + } + + @Test + void massCancel() { + Scheduler timer = afterTest.autoDispose(Schedulers.newSingle("test-timer")); + Worker w1 = timer.createWorker(); + + AtomicInteger counter = new AtomicInteger(); + + Runnable task = counter::getAndIncrement; + + int tasks = 10; + + Disposable[] c = new Disposable[tasks]; + + for (int i = 0; i < tasks; i++) { + c[i] = w1.schedulePeriodically(task, 500, 500, TimeUnit.MILLISECONDS); + } + + w1.dispose(); + + for (int i = 0; i < tasks; i++) { + assertThat(c[i].isDisposed()).isTrue(); + } + + assertThat(counter).hasValue(0); + } + + @Test public void scanName() { Scheduler withNamedFactory = Schedulers.newSingle("scanName"); diff --git a/reactor-core/src/test/java/reactor/core/scheduler/TimedSchedulerTest.java b/reactor-core/src/test/java/reactor/core/scheduler/TimedSchedulerTest.java deleted file mode 100644 index 5f7dd1d3a8..0000000000 --- a/reactor-core/src/test/java/reactor/core/scheduler/TimedSchedulerTest.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Copyright (c) 2017-2021 VMware Inc. or its affiliates, All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package reactor.core.scheduler; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import org.junit.jupiter.api.Test; -import reactor.core.Disposable; -import reactor.core.scheduler.Scheduler.Worker; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatExceptionOfType; -import static org.assertj.core.api.Assertions.fail; - -public class TimedSchedulerTest extends AbstractSchedulerTest { - - @Override - protected Scheduler scheduler() { - return Schedulers.newSingle("TimedSchedulerTest"); - } - - @Test - public void independentWorkers() throws InterruptedException { - Scheduler timer = Schedulers.newSingle("test-timer"); - - try { - Worker w1 = timer.createWorker(); - - Worker w2 = timer.createWorker(); - - CountDownLatch cdl = new CountDownLatch(1); - - w1.dispose(); - - assertThatExceptionOfType(Throwable.class).isThrownBy(() -> { - w1.schedule(() -> { }); - }); - - w2.schedule(cdl::countDown); - - if (!cdl.await(1, TimeUnit.SECONDS)) { - fail("Worker 2 didn't execute in time"); - } - w2.dispose(); - } finally { - timer.dispose(); - } - } - - @Test - public void massCancel() throws InterruptedException { - Scheduler timer = Schedulers.newSingle("test-timer"); - - try { - Worker w1 = timer.createWorker(); - - AtomicInteger counter = new AtomicInteger(); - - Runnable task = counter::getAndIncrement; - - int tasks = 10; - - Disposable[] c = new Disposable[tasks]; - - for (int i = 0; i < tasks; i++) { - c[i] = w1.schedulePeriodically(task, 500, 500, TimeUnit.MILLISECONDS); - } - - w1.dispose(); - - for (int i = 0; i < tasks; i++) { - assertThat(c[i].isDisposed()).isTrue(); - } - - assertThat(counter).hasValue(0); - } - finally { - timer.dispose(); - } - } - -}