-
Notifications
You must be signed in to change notification settings - Fork 7.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
1.x: add groupBy overload with evictingMapFactory #3931
Conversation
This PR may be useful still but I've realised I can get what I want with more control using |
Yes, you can |
Yep you're right but I put that requirement in the javadoc and the example On Fri, 13 May 2016 03:24 David Karnok notifications@github.com wrote:
|
I'm happy using |
@akarnokd I think there's a catch with o.groupBy(...)
.flatMap(
g ->
g.operator1()
.timeout(10, TimeUnit.SECONDS)
.onErrorResumeNext(completeOnTimeoutException))
) I guess I'll have to use |
Yes, there is chance values get dropped when an emission and group unsubscribe happens at the same time. You can't know the value made it or not. If your value has its own lifecycle, that's going to be a problem. Rx is not designed for that and it's the next challenge of the field - especially since reactive-io is full of life-cycle managed buffers. |
Your absolutely right that my use case has some special lifecycle that I'm trying to manage with It does seem to me now that if I don't want to lose emissions for my use case (and want to keep the I'll progress the implementation of the use case and see what else turns up. |
* items to the appropriate {@code GroupedObservable}s. The {@code Map} instance must be thread-safe | ||
* and any eviction must trigger a call to the supplied action (synchronously or asynchronously). | ||
* This can be used to limit the size of the map by evicting keys by maximum size or access time for | ||
* instance. If {@code evictingMapFactory} is null then no eviction strategy will be applied (and a suitable default thread-safe |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wouldn't support null
at the surface level. Those who don't wish to have a custom eviction strategy should use the other overload.
If you want to pursue this further, please rebase it. |
Yep I'd like to pursue this one. This PR is not great to put in an external library because it is so coupled to the groupBy implementation and its tests. I'll fix the API and I realize I may also need to put try-catch on I'd also like to use the Guava |
I'm not against adding |
Guava only has a few optional dependencies that I wouldn't include and the current version 19.0 still supports java 6. |
Great! |
0099a1a
to
b163ebd
Compare
Current coverage is 80.32%@@ 1.x #3931 diff @@
==========================================
Files 259 259
Lines 16821 16853 +32
Methods 0 0
Messages 0 0
Branches 2554 2561 +7
==========================================
+ Hits 13500 13537 +37
+ Misses 2408 2402 -6
- Partials 913 914 +1
|
6f7c29f
to
e511b22
Compare
👍 |
Oops, merged in a /cc @artem-zinnatullin @stevegury for review on this |
e511b22
to
085ec5d
Compare
//Can reach here because mapFactory.call() may throw in constructor of GroupBySubscriber | ||
Exceptions.throwOrReport(ex, child); | ||
Subscriber<? super T> parent2 = Subscribers.empty(); | ||
parent2.unsubscribe(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's add Subscribers.unsubscribed()
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No worries, separate PR I'd suggest.
085ec5d
to
2e2bdb6
Compare
Thanks for the review @artem-zinnatullin. |
👍 |
I take it that being an API enhancement this PR still requires one more collaborator approval? |
oh and thanks @stevegury for giving it a look! |
Thanks @davidmoten for the contribution! |
I have a long running stream using
groupBy
that over time will accumulate 10s of millions of keys. If I can specify an evicting map togroupBy
then I'll be able to keep it down to ~10,000 keys.This PR supports this use case and could be used with Guava's
CacheBuilder
like this:I'll enhance this PR with more unit tests if this looks a good direction.