Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optmize interop.flow.StreamSubscriber.onNext #3387

Open
wants to merge 5 commits into
base: main
Choose a base branch
from

Conversation

BalmungSan
Copy link
Contributor

Since the reactive-streams spec mentions that all calls must be done "serially" these changes optimize the tight loop of multiple consequential onNext calls.

@@ -109,6 +109,7 @@ private[flow] final class StreamSubscription[F[_], A] private (
// if we were externally canceled, this is handled below
F.unit
}
.mask
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Related to #3384

Comment on lines +47 to +51
@Param(Array("1024", "5120", "10240"))
var totalElements: Int = _

@Param(Array("1000"))
var iterations: Int = _
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these parameters okay?

Comment on lines +63 to +79
override final def request(n: Long): Unit = {
Future {
var j = 0
while (j < n && i < totalElements && !canceled) {
subscriber.onNext(i)
i += 1
j += 1
}

if (i == totalElements || canceled) {
subscriber.onComplete()
}
}(global.compute)

// Discarding the Future so it runs in the background.
()
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@armanbilge is this close to the idea that you had in mind originally?

Comment on lines +90 to +91
val program =
stream.compile.toVector
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was not sure what was the best way to collect the results.
My other two options were doing something like compile.fold(0)(_ + _) or a .foreach(_ => IO.unit).compile.drain

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

1 participant