Skip to content

Migration Guides

Dariusz Jędrzejczyk edited this page Mar 15, 2024 · 1 revision

Migrating from 3.4.x to 3.5.0

Previously deprecated APIs that have been removed

As with every release cycle, APIs that have been marked as @Deprecated in the past have been evaluated for removal according to our deprecation policy.

Users should already have taken these deprecations into consideration, but they are listed here again just in case:

  • Kotlin extensions: these now live in the reactor-kotlin-extensions library

  • Mono SuccessOrError side effects

    • The Mono.doOnSuccessOrError recommended alternatives include doOnNext(Consumer), doOnError(Consumer), doOnTerminate(Runnable) and doOnSuccess(Consumer).

    • The Mono.doAfterSuccessOrError recommended alternatives are doAfterTerminate(Runnable) and doFinally(Consumer).

  • Some context-related operators

    • deferWithContext(Function) → use deferContextual(Function) instead.

    • subscriberContext(Context) and subscriberContext(Function) → use contextWrite(ContextView) and contextWrite(Function) instead.

    • Signal::getContext() → use Signal::getContextView() instead.

    • Context::putAll(Context) → use Context::putAll(ContextView) instead.

  • Elastic Scheduler

    • Use BoundedElasticScheduler (Schedulers.boundedElastic()) instead.

Newly introduced deprecations

These will be removed in 3.6.0 at the earliest:

  • FluxSink, MonoSink, SynchronousSink #currentContext(): use #contextView() instead.

  • Scheduler#start(): #init() method should be used instead. Restart capability is discouraged and the new method is allowed to throw in case of initializing a disposed instance. Please create a new Scheduler instance when necessary.

  • Flux#metrics(), Mono#metrics(), Schedulers#enableMetrics(), Metrics: Metrics-related operators have been deprecated in reactor-core in favor of newly introduced reactor-core-micrometer:1.0.0 module (see Notable New Features below)

Other Considerations and Behavior Changes

  • Most Mono source and aggregating operators are now lazier: they defer requesting from the upstream until they have themselves received a request(long).

  • Operators that had an alternative behavior option introduced in the previous release train now use this behavior as the default:

    • switchOnNext,switchMap, concatMap now default to 0-prefetch unless a prefetch amount is explicitly provided.

    • take(n) now only request(n) from the upstream. Use take(n,false) to get the old behavior of requesting an unbounded amount.

  • Windowing operators that receive an error from the upstream main sequence notify their open windows with a SourceException.

    • This exception wraps the original exception as the #getCause().

    • It allows to distinguish between an upstream error and and error in the window consumption logic.

  • Future Cancellation: Converting a CompletionStage that is also a Future to a Mono now ensures that cancelling the Mono also cancels the Future.

    • For CompletableFuture one can opt-out of this by using the Mono.fromFuture(CompletableFuture, boolean) variant.

  • Fusion Negotiation in some Mono operators:

    • A number of Mono operators derived from the Operators.MonoSubscriber have stopped negotiating fusion (even though their Publisher still implements Fuseable).

    • When downstream tries to negotiate fusion by calling requestFusion(int) with any mode, these operators always reply NONE.

    • The highest impact is likely to be on tests that validate the fusion negotiation (eg. StepVerifier’s `expectFusion(…​)).

Notable New Features

New tap Operator and its companion SignalListener

The tap operator is a side-effect operator that deals with all signals / side-effects in a single class, SignalListener. It enables the creation of one SignalListener instance per subscription by providing a factory, and that instance will be reused throughout the lifecycle of the subscription.

SignalListener instances can thus store state and react to new signals differently depending on said state. This is a generalisation of the implementation of the now deprecated Flux#metrics() operator (see below).

Publisher Metrics

The Flux#metrics() and Mono#metrics() operators are now deprecated in favor of using the new reactor-core-micrometer module.

Their replacement builds upon the tap operator introduced above, combined with an off-the-shelf SignalListener factory provided in the module: Micrometer.metrics(MeterRegistry, String). This approach stops relying on an intrinsic dependency on the MeterRegistry#globalRegistry() without any way of using a different registry. Instead, it requires that a MeterRegistry be explicitly provided.

Metrics names and tag names have been reworked to follow conventions and recommendations from the Micrometer team. The name and tag operators are still detected upstream for meter name prefixing and additonal tagging.

See the Exposing Reactor metrics section for more details on how to use this, and Micrometer.metrics() section for a list of meter and tag names.

Schedulers Metrics

The Schedulers.enableMetrics() method is now deprecated in favor of using the new reactor-core-micrometer module (see the Exposing Reactor metrics section). The new paradigm consist in instrumenting Scheduler instance on a case by case basis, using Micrometer.timedScheduler method.

The previous approach was flawed in several ways:

  • intrinsic dependency on the MeterRegistry#globalRegistry() without any way of using a different registry.

  • wrong level of abstraction and limited instrumentation:

    • it was not the schedulers themselves that were instrumented, but individual ExecutorService instances assumed to back the schedulers.

    • schedulers NOT backed by any ExecutorService couldn’t be instrumented.

    • schedulers backed by MULTIPLE ExecutorService (eg. a pool of workers) would produce multiple levels of metrics difficult to aggregate.

    • instrumentation was all-or-nothing, potentially polluting metrics backend with metrics from global or irrelevant schedulers.

A deliberate constraint of the new approach is that each Scheduler must be explicitly wrapped, which ensures that the correct MeterRegistry is used and that metrics are recognizable and aggregated for that particular Scheduler (thanks to the mandatory metricsPrefix).

See Micrometer.timedScheduler() for a list of meter and tag names.

Support for io.micrometer:context-propagation, dealing with Context vs ThreadLocal

The context-propagation library is a new and important part of the Reactor and Micrometer backbone to allow bridging between reactive and ThreadLocal contextual metadata.

Reactor supports this bridging as soon as the library is present at runtime.

Rather than storing and restoring ThreadLocal values in the Reactor Context manually, users are encouraged to rely on context-propagation and the supporting operators in Reactor:

  • register a ThreadLocalAccessor for your relevant custom thread locals

  • capture all registered thread locals via Flux|Mono#contextCapture()

  • upstream in the chain, find captured threadlocal values in the ContextView with the usual context-reading operators

  • alternatively, any handle or tap operator upstream in the chain will restore relevant ThreadLocal around their handling function or SignalListener methods

Read more in the Context-Propagation Support section.

Support for Micrometer’s new Observation feature

This feature allows frameworks and libraries to define a single call site as a point of observation, and Micrometer decouples that observation call site from the actual steps to perform: lightweight metrics, tracing, logging…​

See Micrometer’s documentation and read more in the Observation section. See also Micrometer.observation() for a list of observation meter and tag names.

Notes for implementors of Reactor Scheduler

These notes are relevant if you implement your own custom Scheduler.

Implement void init()

start() is deprecated and Schedulers are encouraged to phase out support for restarting a disposed Scheduler.

The simplest (and default) implementation for the init() method is likely to be the same as start(), but implementors are encouraged to have it detect that the scheduler has been disposed (via dispose or disposeGracefully) and throw an IllegalStateException.

Implement Mono<Void> disposeGracefully()

This new method defaults to just calling Mono.fromRunnable(this::dispose), but implementors are encouraged to add support for true graceful shutdown (eg. calling ExecutorService#shutdown() rather than shutdownNow() on underlying `ExecutorService`s).

Once initiated, the disposal cannot be cancelled and the Scheduler should be considered unusable and MUST reject new tasks.

The returned Mono<Void> SHOULD be the same in case of multiple concurrent calls (ie. all callers get the notification that graceful disposal has succeeded, at the same time).

It MUST support cancellation via timeout(Duration) as well as retry(…​) attempts, but these only relate to the question "has the Scheduler finished shutting down gracefully?" (give the scheduler x seconds to shutdown gracefully, retry to give it a bit more time, fallback to hard shutdown, etc…​).

Dealing with concurrent calls to dipose/disposeGracefully can be tricky, especially given the above constraints. Internally, vanilla Reactor schedulers use a state object SchedulerState and DisposeAwaiter to support this. These classes are private, but you can have a look at the source code for inspiration.