Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

takeUntil(Predicate) tests after element has been emitted #3539

Closed
AramMessdaghi9001 opened this issue Jul 18, 2023 · 3 comments
Closed

takeUntil(Predicate) tests after element has been emitted #3539

AramMessdaghi9001 opened this issue Jul 18, 2023 · 3 comments
Labels
type/enhancement A general enhancement
Milestone

Comments

@AramMessdaghi9001
Copy link
Contributor

AramMessdaghi9001 commented Jul 18, 2023

takeUntil(Predicate) executes the predicate test after the element has been emitted.
If the element is mutable and modified afterwards, this changes the behavior in an unexpected way.

Expected Behavior

The predicate test should be executed before the element has been emitted, not after it has been emitted.
Of course it should still be emitted when the predicate test returns false, in order to match the requirement.

Actual Behavior

The predicate test is executed after the element has been consumed and possibly been modified by following publishers.

Steps to Reproduce

This is a simplified version

    @Test
    fun `takeUntil(Predicate) tests after element has been emitted`() {
        data class Test(var foo: Int)

        StepVerifier.create(Flux.just(Test(1), Test(2), Test(3))
            .takeUntil { it.foo == 0 }
            .doOnNext { it.foo = 0 }
        )
            .expectNext(Test(1))
            .expectNext(Test(2))
            .expectNext(Test(3))
            .verifyComplete()
    }

Here is a more complex scenario:

private fun AsynchronousFileChannel.readBytes(bufferSize: Int, position: Long) : Mono<ByteBuffer> =
    Mono.create { sink ->
        sink.onRequest {
            val byteBuffer = ByteBuffer.allocate(bufferSize)
            read(byteBuffer, position, null, object: CompletionHandler<Int, Nothing?>{
                override fun completed(bytesRead: Int, attachment: Nothing?) = sink.success(byteBuffer.flip())
                override fun failed(exc: Throwable, attachment: Nothing?) = sink.error(exc)
            })
        }
    }

fun Path.readFileAsFlux(bufferSize : Int = 4096) : Flux<ByteBuffer> =
    Flux.using({AsynchronousFileChannel.open(this, StandardOpenOption.READ)}, { fileChannel ->
        Flux.range(0, Int.MAX_VALUE)
            .concatMap { readIndex -> fileChannel.readBytes(bufferSize, readIndex.toLong()*bufferSize) }
            .takeUntil { it.remaining() != bufferSize }
    }, AsynchronousFileChannel::close)

Consuming the Flux will not read the file to the end, as remaining will be 0 at time of the predicate test, due to being read already by the consuming Flux

Possible Solution

Execute the predicate.test in FluxTakeUntil before actual.onNext

Your Environment

  • Reactor version(s) used: 3.4.29
@reactorbot reactorbot added the ❓need-triage This issue needs triage, hasn't been looked at by a team member yet label Jul 18, 2023
@OlegDokuka
Copy link
Contributor

OlegDokuka commented Jul 20, 2023

@AramMessdaghi9001 Did you try to use takeWhile? It does exactly what you need. Also, it is untipattern to rely on something mutable as well as mutating something. The overall recomendation is to make a slice of a buffer (or a copy) and work on it instead of the original one

@OlegDokuka OlegDokuka added for/stackoverflow Questions are best asked on SO or Gitter and removed ❓need-triage This issue needs triage, hasn't been looked at by a team member yet labels Jul 20, 2023
@AramMessdaghi9001
Copy link
Contributor Author

AramMessdaghi9001 commented Jul 20, 2023

@OlegDokuka takeWhile is exclusive. I want the operator to be inclusive.
The buffer is just an example, it is not always possible to change the classes you are dealing with.
In my opinion this behavior is confusing and does not what expected.
Either it should be changed and/or its behavior should be explicitly pointed out in the documentation.

By the way, this is no question for stack overflow, but a bug report due to unexpected behavior.

@OlegDokuka OlegDokuka added type/enhancement A general enhancement and removed for/stackoverflow Questions are best asked on SO or Gitter labels Jul 24, 2023
@OlegDokuka OlegDokuka added this to the 3.5.9 milestone Jul 24, 2023
@OlegDokuka
Copy link
Contributor

I guess we can enhance the implementation so it sends after predicate is executed

AramMessdaghi9001 added a commit to AramMessdaghi9001/reactor-core that referenced this issue Jul 24, 2023
AramMessdaghi9001 added a commit to AramMessdaghi9001/reactor-core that referenced this issue Jul 24, 2023
AramMessdaghi9001 added a commit to AramMessdaghi9001/reactor-core that referenced this issue Jul 24, 2023
AramMessdaghi9001 added a commit to AramMessdaghi9001/reactor-core that referenced this issue Jul 25, 2023
AramMessdaghi9001 added a commit to AramMessdaghi9001/reactor-core that referenced this issue Jul 27, 2023
@OlegDokuka OlegDokuka modified the milestones: 3.5.9, 3.6.0-M2 Aug 3, 2023
@violetagg violetagg modified the milestones: 3.6.0-M2, 3.5.9 Aug 15, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
type/enhancement A general enhancement
Projects
None yet
Development

No branches or pull requests

4 participants