
+str Add flatmapConcat with parallelism. #32024

Open

He-Pin wants to merge 1 commit into main from flatmapConcatParallelism

Conversation

He-Pin (Member) commented Jul 30, 2023

References #31958

@gael-ft Would you like to take a review? Thanks.

if (isAvailable(out) && inflightSources.isEmpty) {
  push(out, single.elem) // fast path: push the single element straight downstream
} else {
  inflightSources.enqueue(new InflightSingleSource(single.elem)) // slow path: queue it behind the in-flight sources
}
He-Pin (Member Author) commented:

There is some allocation here when the element cannot be pushed directly (it gets wrapped in an InflightSingleSource).

He-Pin (Member Author) commented Jul 30, 2023

before:

[info] Benchmark                                                         Mode  Cnt         Score        Error   Units
[info] FlatMapConcatBenchmark.internalSingleSource                      thrpt   10   9111519.387 ±  183996.447   ops/s
[info] FlatMapConcatBenchmark.mapBaseline                               thrpt   10  20336793.367 ±  887065.039   ops/s
[info] FlatMapConcatBenchmark.oneElementList                            thrpt   10    371981.248 ±    3765.397   ops/s
[info] FlatMapConcatBenchmark.sourceDotSingle                           thrpt   10   4578547.437 ±  479604.201   ops/s

after:

[info] Benchmark                                                         Mode  Cnt          Score            Error   Units
[info] FlatMapConcatBenchmark.internalSingleSource                      thrpt   10    9027659.060 ±     136614.084   ops/s
[info] FlatMapConcatBenchmark.mapBaseline                               thrpt   10   20947850.178 ±    1039800.021   ops/s
[info] FlatMapConcatBenchmark.oneElementList                            thrpt   10    4451591.431 ±      55815.355   ops/s
[info] FlatMapConcatBenchmark.sourceDotSingle                           thrpt   10    4547444.612 ±     136653.128   ops/s

@He-Pin He-Pin marked this pull request as ready for review July 30, 2023 14:12
GreyPlane (Contributor) commented:

I wonder if it is possible to write a Buffer.peek(n: Int) that peeks from the current position up to n elements. With that, would it be easy to implement Merge this way as well, by just changing the inflightSources.peek() eq self check to scan the range from 0 to breadth - 1? (A sketch of such a peek is below.)
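A minimal, self-contained sketch of what such a peek(n) could look like on a bounded ring buffer. This is a hypothetical illustration only, not the stream library's internal Buffer; the RingBuffer name and its methods are made up for the example.

final class RingBuffer[T](capacity: Int) {
  private val array = new Array[AnyRef](capacity)
  private var readIdx = 0
  private var count = 0

  def isEmpty: Boolean = count == 0
  def size: Int = count

  def enqueue(elem: T): Unit = {
    require(count < capacity, "buffer overflow")
    array((readIdx + count) % capacity) = elem.asInstanceOf[AnyRef]
    count += 1
  }

  def dequeue(): T = {
    require(count > 0, "buffer underflow")
    val elem = array(readIdx).asInstanceOf[T]
    array(readIdx) = null // drop the reference so it can be collected
    readIdx = (readIdx + 1) % capacity
    count -= 1
    elem
  }

  // Peek the element n positions past the head without removing it (peek(0) == head).
  def peek(n: Int): T = {
    require(n >= 0 && n < count, s"no element at offset $n")
    array((readIdx + n) % capacity).asInstanceOf[T]
  }
}

With such a method, the single check against the head could become a loop over peek(i) for i in 0 until breadth, as suggested above.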

He-Pin (Member Author) commented Jul 31, 2023

@GreyPlane How about commenting inline on the code? Thanks.

@gael-ft gael-ft left a comment


Thank you for the implementation.
This will allow more than one source to be pulled at a time while keeping elements ordered from the downstream perspective.
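For readers of the thread, a minimal usage sketch of the overload under discussion, assuming the flatMapConcat(parallelism, f) signature exercised in the PR's tests; the ActorSystem name and the element computation are made up for illustration:

import scala.concurrent.Future
import akka.actor.ActorSystem
import akka.stream.scaladsl.{ Sink, Source }

object FlatMapConcatParallelismExample extends App {
  implicit val system: ActorSystem = ActorSystem("example")
  import system.dispatcher

  // Up to 4 substreams are materialized and pulled concurrently,
  // but their elements are still emitted strictly in source order.
  Source(1 to 10)
    .flatMapConcat(4, (i: Int) => Source.future(Future(i * 10)))
    .runWith(Sink.foreach(println))
    .onComplete(_ => system.terminate())
}

This prints 10, 20, ..., 100 in order even though the futures may complete out of order.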

He-Pin (Member Author) commented Jul 31, 2023

It seems we should add another parameter, eager, too. When true, it would complete / fail the stream as soon as an upstream fails.

gael-ft commented Aug 1, 2023

It seems we should add another parameter, eager, too. When true, it would complete / fail the stream as soon as an upstream fails.

That's not obvious to me.
The goal of flatMapConcat is to keep elements ordered.
If ordering is not a concern, you could just use merge (which already has the concept of eager).

So I can't see a scenario where you would want to continue after one of the upstream sources has been cancelled when using this operator.

That being said, if you feel that could be useful, let's go 😉
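As an aside on the "merge has the concept of eager" point: in Akka Streams merge takes an eagerComplete flag, which is about completion rather than errors. A small hedged sketch (the system name and element values are made up):

import akka.actor.ActorSystem
import akka.stream.scaladsl.{ Sink, Source }

object MergeEagerExample extends App {
  implicit val system: ActorSystem = ActorSystem("merge-example")
  import system.dispatcher

  // eagerComplete = true: the merged stream completes as soon as either input completes;
  // ordering across the two inputs is not preserved, unlike flatMapConcat.
  Source(1 to 3)
    .merge(Source(100 to 200), eagerComplete = true)
    .runWith(Sink.foreach(println))
    .onComplete(_ => system.terminate())
}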

He-Pin (Member Author) commented Aug 1, 2023

@gael-ft That's true, but I saw your PR and you can return Source.failed(...) sometimes; by eager I mean eagerError.
If, at materialization time, an upstream is Source.failed, should I fail the stream ASAP when parallelism > 1?

And for most cases, I think we should make eagerError = false, even when there is an in-flight Source.failed(...).

I wanted to optimize for Source.future and Source.failed, which is how a Source can already be in a failed state. There are also Flux.concatMapDelayError / flatMapSequentialDelayError; I think delaying the error should be the default behavior, as this operator only parallelizes materialization, not emitting.

gael-ft commented Aug 1, 2023

@He-Pin I might be a bit too use-case centric (i.e. S3 object upload, where we don't want any missing part).

If, at materialization time, an upstream is Source.failed, should I fail the stream ASAP when parallelism > 1?

So that can probably be a valid question in some cases.

But regarding the default value of eager, I think continuing to process after an error has occurred should be an intentional choice.
As with Flux, using flatMapSequentialDelayError is intentional versus flatMapSequential.

In my understanding, flatMapConcat guarantees that the elements you process will be in order.
If for some reason you can afford to skip element N+1 and process N+2 directly, that should be intentional.

Once again, just my opinion, but as long as this is a parameter, users can use it as they want.

He-Pin (Member Author) commented Aug 1, 2023

@gael-ft Yes, let's just keep it simple for now. I also saw that FailedSource will failStage in preStart(), while FutureSource only triggers failStage after being pulled. In your code you are using FailedSource, which causes the stream to fail very quickly, so I think I need to adjust my code for that too.
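A small sketch of the timing difference described in this comment (the preStart() vs pull behaviour is as stated above; exact internals may vary between versions):

import scala.concurrent.Future
import akka.stream.scaladsl.Source

object FailureTimingSketch {
  // FailedSource fails its stage in preStart(), i.e. as soon as it is materialized.
  val failsAtMaterialization: Source[Int, _] =
    Source.failed[Int](new RuntimeException("boom"))

  // FutureSource only fails the stage after it has been pulled.
  val failsWhenPulled: Source[Int, _] =
    Source.future(Future.failed[Int](new RuntimeException("boom")))
}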

      push(out, sinkIn.grab())
    }
  }
  override def onUpstreamFinish(): Unit = if (!sinkIn.isAvailable) removeSource(self)
He-Pin (Member Author) commented:

Need to take care of upstreams that fail immediately.

@He-Pin He-Pin force-pushed the flatmapConcatParallelism branch 3 times, most recently from 7d4965b to 9813a33 Compare August 10, 2023 16:27
Source(List(2, 3, 4)),
Source.future(Future.successful(5)),
Source.lazyFuture(() => Future.successful(6))))
.flatMapConcat(ThreadLocalRandom.current().nextInt(1, 129), identity)
A reviewer (Member) commented:

Random is good for trying it out while developing but means that we'd have no idea what value it fails for if it fails. Parameterize over a few values or use a single random chosen and logged on test class instantiation instead, so failures can be repeated.
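One hypothetical way to follow this suggestion (the trait and field names are made up): pick the random parallelism once per test class, log it, and reuse it in every test case so a failing run can be reproduced by hard-coding the logged value.

import java.util.concurrent.ThreadLocalRandom

trait RandomParallelismFixture {
  // Chosen once when the test class is instantiated and logged so failures are reproducible.
  final val parallelism: Int = ThreadLocalRandom.current().nextInt(1, 129)
  println(s"[test-setup] flatMapConcat parallelism for this run: $parallelism")
}

Tests would then call .flatMapConcat(parallelism, identity) instead of generating a fresh random value inline.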

@He-Pin He-Pin force-pushed the flatmapConcatParallelism branch 4 times, most recently from bd40615 to 92172b4 Compare August 27, 2023 09:59
@He-Pin He-Pin force-pushed the flatmapConcatParallelism branch 3 times, most recently from a7bf870 to ca4a14b Compare August 27, 2023 10:48
He-Pin (Member Author) commented Jan 8, 2024

@johanandren How about I split the optimization into a dedicated PR, separate from this one? I think this PR is too big.

johanandren (Member) commented:

Yes, I think that was part of my initial feedback, although I also said it might be fine.
