
Support for a cache operator supporting asynchronous refresh #3573

Open
keddie opened this issue Sep 11, 2023 · 6 comments
Labels
type/enhancement A general enhancement

Comments

@keddie

keddie commented Sep 11, 2023

Caffeine supports a concept called "refresh": after a value is written into the cache, it will be refreshed if it is referenced after a certain duration. While the refresh is in progress, the original value is returned. This allows low-latency reads while still serving a relatively up-to-date value. I propose adding an operator Mono.refreshCache(Duration refreshAfterWrite, Duration expireAfterWrite) with this functionality.
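For reference, this is the Caffeine behavior being mirrored; a minimal sketch (slowFetch is a hypothetical stand-in loader, and the durations match the example below):

```java
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import java.time.Duration;

class CaffeineRefreshExample {
    // Hypothetical stand-in for the expensive upstream call.
    static String slowFetch(String key) { return "value-for-" + key; }

    static final LoadingCache<String, String> CACHE = Caffeine.newBuilder()
        // A read after 30 minutes triggers an asynchronous reload,
        // while the old value keeps being served in the meantime.
        .refreshAfterWrite(Duration.ofMinutes(30))
        // After 2 hours without a write, reads block on a fresh load.
        .expireAfterWrite(Duration.ofHours(2))
        .build(CaffeineRefreshExample::slowFetch);
}
```

Note that the refresh is only scheduled when an entry is actually read, which is the property the proposed operator preserves.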

Motivation

For example, suppose you have a method takes60Minutes() which takes 60 minutes to return. Callers of

Mono.fromSupplier(() -> takes60Minutes()).cache(Duration.ofHours(2))

will wait for an hour every 2 hours when the value expires. An operator supporting refresh, on the other hand,

Mono.fromSupplier(() -> takes60Minutes()).refreshCache(refresh: Duration.ofMinutes(30), expire: Duration.ofHours(2))

will always immediately have a value, as long as there is at least one read per half hour.

Desired solution

Add an implementation of the operator above.
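To make the proposed semantics concrete, here is a minimal plain-JDK sketch of the refresh-ahead behavior (RefreshCache and its internals are hypothetical, not part of Reactor; the real operator would of course be a Mono):

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/** Sketch of refresh-ahead semantics: serve the old value while reloading in the background. */
final class RefreshCache<T> {
    private record Entry<V>(V value, long writtenNanos) {}

    private final Supplier<T> loader;
    private final long refreshNanos;
    private final long expireNanos;
    private final AtomicReference<Entry<T>> current = new AtomicReference<>();
    private final AtomicReference<CompletableFuture<T>> inFlight = new AtomicReference<>();

    RefreshCache(Supplier<T> loader, Duration refreshAfterWrite, Duration expireAfterWrite) {
        this.loader = loader;
        this.refreshNanos = refreshAfterWrite.toNanos();
        this.expireNanos = expireAfterWrite.toNanos();
    }

    T get() {
        long now = System.nanoTime();
        Entry<T> e = current.get();
        if (e == null || now - e.writtenNanos >= expireNanos) {
            return load().join(); // miss or expired: callers block on one shared load
        }
        if (now - e.writtenNanos >= refreshNanos) {
            load(); // stale: kick off at most one background refresh...
        }
        return e.value; // ...but keep serving the old value immediately
    }

    private CompletableFuture<T> load() {
        CompletableFuture<T> f = new CompletableFuture<>();
        CompletableFuture<T> prev = inFlight.compareAndExchange(null, f);
        if (prev != null) return prev; // a load is already in flight; share it
        CompletableFuture.runAsync(() -> {
            try {
                T v = loader.get();
                current.set(new Entry<>(v, System.nanoTime()));
                f.complete(v);
            } catch (Throwable t) {
                f.completeExceptionally(t);
            } finally {
                inFlight.set(null);
            }
        });
        return f;
    }
}
```

Because loads are only ever triggered from get(), the number of upstream fetches can never exceed the number of accesses, which is the key property distinguishing this from periodic polling.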

Considered alternatives

One approach people might take is to just grab the latest value from a Flux periodically polling the slow emitter. One key advantage of the refreshCache() structure is that the call is only made when/if there are further subscriptions, so expensive operations are not re-run excessively. Note that the refresh does rate limit the upstream.

I feel like there ought to be some combination of other operators that could build up this behavior (e.g. two stacked cache() calls), but I don't see it.

Another option would be to provide said operator as a transformation in some suitable extensions library, used like:
sourceMono.transform(Extensions.refreshCache(ofHours(1), ofHours(3)))
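The transform-based packaging would look roughly like this (Extensions is hypothetical, and the body below only wires a plain cache(); the actual refresh-ahead logic the issue proposes would replace it):

```java
import java.time.Duration;
import java.util.function.Function;
import reactor.core.publisher.Mono;

// Hypothetical extensions holder showing only the transform plumbing.
public final class Extensions {
    private Extensions() {}

    public static <T> Function<Mono<T>, Mono<T>> refreshCache(Duration refreshAfterWrite,
                                                              Duration expireAfterWrite) {
        // Placeholder body: a real implementation would serve the cached value
        // and re-subscribe to the source in the background once refreshAfterWrite elapses.
        return source -> source.cache(expireAfterWrite);
    }
}
```

The advantage of this shape is that it composes with the existing transform(...) operator and adds nothing to the Mono API surface.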

A clear downside of adding any new operator is that the proliferation of methods on Mono is already fairly baffling, and comes with a support cost :) This would argue for just providing a transformation.

Additional context

I made a standalone implementation of Mono.refreshCache() and I'm happy to contribute it. But since I made it with a few things specific to a project for my current employer (who are supportive), I thought I'd check on your interest in such an operator before going to the trouble of making a PR. I did not yet implement a corresponding operator for Flux, but it would be straightforward to do, and I think I should do that just for consistency.

@reactorbot reactorbot added the ❓need-triage This issue needs triage, hasn't been looked at by a team member yet label Sep 11, 2023
@OlegDokuka OlegDokuka added type/enhancement A general enhancement and removed ❓need-triage This issue needs triage, hasn't been looked at by a team member yet labels Dec 14, 2023
@MikkelHJuul

Wouldn't it be a lot easier for you to have a single Sinks.many().replay().latest(), return that as Mono.from(sink.asFlux()), and spawn a background process to update it?

Or Flux.interval(howOftenIWantToFetch).map(costlyFetch).cache()

A Mono that changes is not really a Mono.
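A runnable sketch of the interval-based suggestion (costlyFetch is a hypothetical stand-in, and the interval is shortened for demonstration):

```java
import java.time.Duration;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class IntervalCacheExample {
    // Hypothetical stand-in for the expensive upstream call.
    static String costlyFetch() { return "value"; }

    public static void main(String[] args) {
        // Poll on a fixed schedule; cache(1) makes the flux hot and
        // replays the most recent emission to every late subscriber.
        Flux<String> latest = Flux.interval(Duration.ZERO, Duration.ofMillis(50))
                .map(tick -> costlyFetch())
                .cache(1);

        // Each read takes the most recent cached value without re-invoking costlyFetch().
        Mono<String> read = latest.next();
        System.out.println(read.block(Duration.ofSeconds(1)));
    }
}
```

Note that with this approach costlyFetch() runs on every tick regardless of whether anyone is reading, which is the trade-off discussed further down the thread.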

@Akshay-Sundarraj

@MikkelHJuul Can you please provide more info on how to use Sinks.many, or a better way to handle the following situation?
I'm facing a similar issue to the one the OP mentioned. In my case I need to get a bearer token from the server on application startup and refresh it at its half-life to reduce latency. During the refresh, the application should continue to use the cached token; once the refresh completes, the new token should be used and the old one discarded. The refresh should happen in the background and should not affect the main path.
I'm currently using Mono.cache(ttl) to cache the token for a specified duration, and when the cache expires it triggers a call to the server, which increases latency for some of the requests. TIA.

@MikkelHJuul

> @MikkelHJuul Can you please provide more info on how to use Sinks.many? or better way to handle following situation?
>
> I'm facing the similar issue mentioned by op. In my case I need to get bearer token from server on application startup and refresh it at half life to reduce latency. During refresh, application should continue to use the cached token and once refresh completes new token should be used and old token to be discarded. The refresh should happen in the background and should not affect main path.
>
> I'm currently using Mono.cache(ttl) to cache the token for specified duration and when caching expires it is triggering the call to server which is increasing latency for some of the requests. TIA.

So, if I had an app designed around Reactor anyway, I would use a Flux directly, as the value may change over time. If you have other flux-like input data you can choose either source as primary, and use #zip or #combineLatest to combine the fluxes' content.

But depending on your use it may be nice to have the token requested on subscribe, since errors during its use could warrant refreshing the token. But if you can promise a time-based token that is sure not to be rejected, I would use Flux.interval(..).cache(1), which makes it a hot source that, if resubscribed, will re-emit the most recent token (new pipelines making requests will still use the most recent token).

The cache(1) operator is essentially the same as Sinks.many().replay().latest(), by the way. The sink just gives you the inverted control of being able to "send" messages on demand.
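For reference, the sink-based variant mentioned above looks like this (token values are illustrative):

```java
import java.time.Duration;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

public class ReplayLatestExample {
    public static void main(String[] args) {
        // A replay-latest sink: late subscribers immediately receive the most recent value.
        Sinks.Many<String> sink = Sinks.many().replay().latest();

        // The background refresher would call tryEmitNext whenever a new token arrives.
        sink.tryEmitNext("token-1");

        // Mono.from(...) gives each subscriber the first (i.e. latest replayed) element.
        Mono<String> current = Mono.from(sink.asFlux());
        System.out.println(current.block(Duration.ofSeconds(1)));
    }
}
```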

@keddie
Author

keddie commented May 11, 2024

@MikkelHJuul - the semantics are different.

The problem is that background processes are expensive, as is storing a value which isn't actually being accessed. If you make more calls to the backing service to get the data than the number of times the data is accessed, that's a bad thing. If you do either Sinks.many()... or Flux.interval()... then there can be more calls to the source of the data than access attempts. A refresh cache guarantees that the number of upstream fetches is less than or equal to the number of accesses.

As a thought experiment, it might help to imagine creating 10,000 of these, with the signals being 100 MB of data extracted and transformed from different databases.

Obviously, take a look at Caffeine to get a fuller picture. If anyone seems interested I could just open a PR with the implementation I made.

@ben-manes

@keddie is correct. A common misunderstanding with Caffeine's refresh is when users think of it as a periodic reload of the content at a fixed delay. That would be an (unbounded) periodic replica of the source data, whereas a cache is more often a bounded subset of that data. In those cases, users can simply use a ScheduledExecutorService with a periodic task that reloads the data, e.g. building an immutable snapshot and setting a volatile field to make it visible to readers. Your case, @MikkelHJuul, sounds similar, and I believe interval(duration) provides that functionality.
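A sketch of the periodic-reload pattern described above (class and method names are illustrative):

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Periodic reload into a volatile field, as described above: not a cache, a bounded replica. */
final class PeriodicSnapshot<T> {
    // Volatile publish: each completed reload becomes visible to all readers at once.
    private volatile T snapshot;

    PeriodicSnapshot(Supplier<T> loader, long periodMillis, ScheduledExecutorService ses) {
        snapshot = loader.get(); // initial load so readers never observe null
        ses.scheduleAtFixedRate(() -> {
            try {
                // Build the (ideally immutable) snapshot off to the side, then swap the reference.
                snapshot = loader.get();
            } catch (RuntimeException e) {
                // Keep serving the previous snapshot if a reload fails.
            }
        }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    T current() { return snapshot; }
}
```

Readers pay only a volatile read; the reload cost is entirely on the scheduler thread, but it is paid on every period whether or not anyone reads.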

@MikkelHJuul

@keddie / @ben-manes - now I'm not sure if you agree or disagree. It sounds like yes, except that you want the source to be cancelled if no one listens, so as not to cache data when it is unneeded. #refCount springs to mind.

We actually do that as well where all subscribers use the same cached data (Flux#cache) and the cached flux is cancelled with Flux#refCount.

Now it may be nicer to do something more specialized, sure. My initial thought, however, is that a data source that changes like this is logically not a Mono but a Flux. So it should be covered by the Flux API, not the Mono API, if any.

But I guess that is only slightly related to whether to add such a construct or not :)
