New operator - delayBetween #3582

Open
yos1p opened this issue Sep 21, 2023 · 9 comments
Labels
for/user-attention This issue needs user attention (feedback, rework, etc...) status/has-workaround This has a known workaround described

Comments

@yos1p

yos1p commented Sep 21, 2023

Summary:

Currently, Flux.delayElements delays every element, including the first one. It would be nice to have another operator that only adds the delay between elements.

Example:

Flux.delayElements(Duration.ofHours(1))
5 elements arrived --> delay 1 hour --> element 1 processed --> delay 1 hour --> element 2 processed ...

Flux.delayBetween(Duration.ofHours(1))
5 elements arrived --> element 1 processed --> delay 1 hour --> element 2 processed ...

@reactorbot reactorbot added the ❓need-triage This issue needs triage, hasn't been looked at by a team member yet label Sep 21, 2023
@chemicL
Member

chemicL commented Sep 21, 2023

Just to be clear on the behaviour, please try this:

Flux.range(0, 10)
	.doFirst(() -> System.out.println(Instant.now() + ": START"))
	.switchOnFirst((i, f) -> {
		if (i.hasValue()) {
			return Flux.concat(
				Flux.just(i.get()),
				f.skip(1).delayElements(Duration.ofSeconds(1)));
		}
		return f;
	})
	.doOnNext(i -> System.out.println(Instant.now() + ": " + i))
	.blockLast();

Output:

2023-09-21T06:18:12.238Z: START
2023-09-21T06:18:12.295Z: 0
2023-09-21T06:18:13.312Z: 1
2023-09-21T06:18:14.317Z: 2
2023-09-21T06:18:15.323Z: 3
2023-09-21T06:18:16.329Z: 4
2023-09-21T06:18:17.335Z: 5
2023-09-21T06:18:18.342Z: 6
2023-09-21T06:18:19.345Z: 7
2023-09-21T06:18:20.351Z: 8
2023-09-21T06:18:21.359Z: 9

Is this behaviour what you are looking for?

@chemicL chemicL added status/need-user-input This needs user input to proceed and removed ❓need-triage This issue needs triage, hasn't been looked at by a team member yet labels Sep 21, 2023
@yos1p
Author

yos1p commented Sep 21, 2023

@chemicL
In the test, with .blockLast(), it works. But when I tried it with flatMap() and subscribe(), the delay still seemed to apply to the first element.

Take a look at the following sample code:

Flux myFlux = ...
myFlux.retry()
       .flatMap(this::getDetails) // Method returns Flux<String>
       .bufferTimeout(throttlePerHourCap, Duration.ofSeconds(10))
       .switchOnFirst((signal, flux) -> {
           if (signal.hasValue()) {
               return Flux.concat(
                     Flux.just(Objects.requireNonNull(signal.get(), "First element invalid...")),
                     flux.skip(1).delayElements(Duration.ofHours(1))
               );
           }
           return flux;
       })
       .flatMap(this::throttlePerHour) // Method returns Mono<Void>
       .subscribe();

@chemicL
Member

chemicL commented Sep 21, 2023

Please provide something I can copy and paste, without any internal details of your project.

@yos1p
Author

yos1p commented Sep 21, 2023

Hmm, it seems to be working on the second try.

I was hoping we could have a built-in operator, though. Is it on the roadmap?

@chemicL chemicL added status/has-workaround This has a known workaround described and removed status/need-user-input This needs user input to proceed labels Sep 25, 2023
@chemicL chemicL changed the title from "New operator for delay elements after first." to "New operator - delayBetween" Sep 27, 2023
@chemicL
Member

chemicL commented Sep 27, 2023

Let's leave the issue open for some time. If there is interest in a new operator and this issue gets some traction, we can consider adding it. However, it seems this behaviour can easily be achieved with existing operators. Another alternative would probably be:

Flux.concat(source.take(1), source.skip(1).delayElements(delay))

@chemicL chemicL added the for/user-attention This issue needs user attention (feedback, rework, etc...) label Sep 27, 2023
@alixroyere

alixroyere commented Oct 26, 2023

It would be handy, even if it is doable another way. Pipelines would be easier to read, too.
I have a use case where I need to space elements out because the next action can be performed at most once every X seconds. In this case, there is no point in waiting before the first element.

@alixroyere

alixroyere commented Oct 26, 2023

Well, thinking more about this, a real delayBetween operator would do more than skip the wait for the first element. That is just a special case.
In fact, by comparing the time elapsed since the last emission with the requested delay, it could emit an element immediately whenever enough time has already passed.

@Test
void delayElement() {
    StepVerifier.withVirtualTime(() -> {
                    var data = merge(
                        Mono.just(0).delayElement(Duration.ofSeconds(0)),
                        Mono.just(40).delayElement(Duration.ofSeconds(40)),
                        Mono.just(50).delayElement(Duration.ofSeconds(50))
                    );
                    return data.delayElements(Duration.ofSeconds(10));
                })// received at 10 - 50 - 60 when it could be 0 - 40 - 50 with delayBetween
                .thenAwait(Duration.ofSeconds(10))
                .expectNextCount(1)
                .thenAwait(Duration.ofSeconds(50 - 10))
                .expectNextCount(1)
                .thenAwait(Duration.ofSeconds(60 - 50))
                .expectNextCount(1)
                .expectComplete()
                .verify(Duration.ofMillis(100));
}

@Test
void delayElementExceptFirst() {
    StepVerifier.withVirtualTime(() -> {
                    var data = merge(
                        Mono.just(0).delayElement(Duration.ofSeconds(0)),
                        Mono.just(40).delayElement(Duration.ofSeconds(40)),
                        Mono.just(50).delayElement(Duration.ofSeconds(50))
                    );

                    return data.switchOnFirst(
                        // Emit the first element itself (not a nested Flux) and subscribe to flux only once.
                        (signal, flux) -> Flux.concat(Mono.justOrEmpty(signal.get()),
                                                      flux.skip(1).delayElements(Duration.ofSeconds(10)))
                    );
                })// received at 0 - 50 - 60 when it could be 0 - 40 - 50 with delayBetween
                .thenAwait(Duration.ofSeconds(0))
                .expectNextCount(1)
                .thenAwait(Duration.ofSeconds(50))
                .expectNextCount(1)
                .thenAwait(Duration.ofSeconds(60 - 50))
                .expectNextCount(1)
                .expectComplete()
                .verify(Duration.ofMillis(100));
}
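
The timings quoted in the comments of the two tests above can be checked with plain arithmetic. What follows is a minimal sketch in plain Java that does not use Reactor at all: it models release times in seconds, assuming subscription happens at time 0, arrival times are sorted, and each delay timer starts only once the previous element has been released. The class and method names are hypothetical and chosen for illustration only.

```java
import java.util.ArrayList;
import java.util.List;

public class DelayTimelines {

    // Model of delayElements: every element waits `delay` after it has arrived
    // and after the previous element has been released.
    static List<Long> delayElements(long[] arrivals, long delay) {
        List<Long> releases = new ArrayList<>();
        long previous = 0; // assumption: subscription happens at time 0
        for (long arrival : arrivals) {
            previous = Math.max(arrival, previous) + delay;
            releases.add(previous);
        }
        return releases;
    }

    // Model of the switchOnFirst workaround: the first element passes through
    // immediately, every later element is delayed as in delayElements.
    static List<Long> delayExceptFirst(long[] arrivals, long delay) {
        List<Long> releases = new ArrayList<>();
        long previous = 0;
        for (int i = 0; i < arrivals.length; i++) {
            previous = (i == 0) ? arrivals[0] : Math.max(arrivals[i], previous) + delay;
            releases.add(previous);
        }
        return releases;
    }

    // Model of the proposed delayBetween: an element is only held back if less
    // than `delay` has elapsed since the previous release.
    static List<Long> delayBetween(long[] arrivals, long delay) {
        List<Long> releases = new ArrayList<>();
        long previous = 0;
        for (int i = 0; i < arrivals.length; i++) {
            previous = (i == 0) ? arrivals[0] : Math.max(arrivals[i], previous + delay);
            releases.add(previous);
        }
        return releases;
    }

    public static void main(String[] args) {
        long[] arrivals = {0, 40, 50};
        System.out.println(delayElements(arrivals, 10));    // [10, 50, 60]
        System.out.println(delayExceptFirst(arrivals, 10)); // [0, 50, 60]
        System.out.println(delayBetween(arrivals, 10));     // [0, 40, 50]
    }
}
```

The three printed lists correspond to the "10 - 50 - 60", "0 - 50 - 60" and "0 - 40 - 50" timelines mentioned in the test comments.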

@chemicL
Member

chemicL commented Oct 31, 2023

@alixroyere in this case, zipping with an interval is probably a good fit. To my eyes, this perspective doesn't strengthen the case for a delayBetween operator; rather, it shows that the expectation isn't obvious, and that the specific requirements of a particular use case can be expressed with existing operators.

Consider this:

Flux.zip(
	Flux.range(0, 10)
	       .doFirst(() -> System.out.println(Instant.now() + ": START")),
	Flux.interval(Duration.ZERO, Duration.ofSeconds(1))
)
    .map(Tuple2::getT1)
    .doOnNext(i -> System.out.println(Instant.now() + ": " + i))
    .blockLast();

With the output:

2023-10-31T12:49:45.960570Z: START
2023-10-31T12:49:45.969872Z: 0
2023-10-31T12:49:46.971268Z: 1
2023-10-31T12:49:47.974015Z: 2
2023-10-31T12:49:48.972947Z: 3
2023-10-31T12:49:49.974434Z: 4
2023-10-31T12:49:50.974538Z: 5
2023-10-31T12:49:51.973992Z: 6
2023-10-31T12:49:52.974480Z: 7
2023-10-31T12:49:53.972106Z: 8
2023-10-31T12:49:54.974418Z: 9

I believe this is even more appropriate for the original case, where it seems the events should happen every hour, regardless of how long each individual task takes.
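
To compare the zip approach with the timelines discussed earlier in the thread, here is a rough model in plain Java (no Reactor involved). It assumes that tick i of the interval fires at i * period, and that ticks firing before the matching source element arrives are buffered by zip rather than dropped. That buffering is an assumption about zip's prefetch behaviour, not something stated in this thread, and the names below are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class ZipIntervalTimeline {

    // Element i is released once both it and tick i are available.
    // Assumes sorted arrival times and subscription at time 0.
    static List<Long> zipWithInterval(long[] arrivals, long period) {
        List<Long> releases = new ArrayList<>();
        for (int i = 0; i < arrivals.length; i++) {
            long tick = i * period; // assumption: tick i fires at i * period
            releases.add(Math.max(arrivals[i], tick));
        }
        return releases;
    }

    public static void main(String[] args) {
        // All elements available at once: one release per period.
        System.out.println(zipWithInterval(new long[]{0, 0, 0, 0, 0}, 1)); // [0, 1, 2, 3, 4]
        // The arrival pattern from the tests earlier in the thread.
        System.out.println(zipWithInterval(new long[]{0, 40, 50}, 10));    // [0, 40, 50]
        // A late burst: the buffered ticks let the elements through back to back.
        System.out.println(zipWithInterval(new long[]{40, 41, 42}, 10));   // [40, 41, 42]
    }
}
```

Under these assumptions the schedule is fixed to the interval rather than to the previous emission, so a source that starts late may not get any spacing between its first elements. Whether that matters depends on the use case.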

@knuclechan

It is useful. I am looking for this too, and this is the usual use case. Delaying every element (including the first) is rarely needed.
You can find a lot of questions on Stack Overflow about how to do this delayBetween operation.

5 participants