-
Notifications
You must be signed in to change notification settings - Fork 591
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Optmize interop.flow.StreamSubscriber.onNext #3387
base: main
Are you sure you want to change the base?
Conversation
72b0be9
to
8234158
Compare
@@ -109,6 +109,7 @@ private[flow] final class StreamSubscription[F[_], A] private ( | |||
// if we were externally canceled, this is handled below | |||
F.unit | |||
} | |||
.mask |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Related to #3384
@Param(Array("1024", "5120", "10240")) | ||
var totalElements: Int = _ | ||
|
||
@Param(Array("1000")) | ||
var iterations: Int = _ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are these parameters okay?
override final def request(n: Long): Unit = { | ||
Future { | ||
var j = 0 | ||
while (j < n && i < totalElements && !canceled) { | ||
subscriber.onNext(i) | ||
i += 1 | ||
j += 1 | ||
} | ||
|
||
if (i == totalElements || canceled) { | ||
subscriber.onComplete() | ||
} | ||
}(global.compute) | ||
|
||
// Discarding the Future so it runs in the background. | ||
() | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@armanbilge is this close to the idea that you had in mind originally?
val program = | ||
stream.compile.toVector |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was not sure what was the best way to collect the results.
My other two options were doing something like compile.fold(0)(_ + _)
or a .foreach(_ => IO.unit).compile.drain
Since the
reactive-streams
spec mentions that all calls must be done "serially" these changes optimize the tight loop of multiple consequentialonNext
calls.