Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

1.x: add groupBy overload with evictingMapFactory #3931

Merged
merged 1 commit into from
Jun 25, 2016

Conversation

davidmoten
Copy link
Collaborator

I have a long running stream using groupBy that over time will accumulate 10s of millions of keys. If I can specify an evicting map to groupBy then I'll be able to keep it down to ~10,000 keys.

This PR supports this use case and could be used with Guava's CacheBuilder like this:

Func1<Action1<K>, Map<K, Object>> mapFactory = 
    action -> CacheBuilder.newBuilder()
              .maximumSize(1000)
              .expireAfterAccess(12, TimeUnit.HOUR)
              .removalListener(key -> action.call(key))
              .<K, Object> build().asMap();
observable
    .groupBy(keySelector, elementSelector, mapFactory)
    ...

I'll enhance this PR with more unit tests if this looks a good direction.

@davidmoten
Copy link
Collaborator Author

This PR may be useful still but I've realised I can get what I want with more control using defer, PublishSubject and takeUntil on the grouped observables.

@akarnokd
Copy link
Member

Yes, you can timeout or takeUntil on each group to get it removed from the internal map. I'm not sure about the map factory because we need something concurrently modifiable.

@davidmoten
Copy link
Collaborator Author

Yep you're right but I put that requirement in the javadoc and the example
map from cachebuilder is threadsafe.

On Fri, 13 May 2016 03:24 David Karnok notifications@github.com wrote:

Yes, you can timeout or takeUntil on each group to get it removed from
the internal map. I'm not sure about the map factory because we need
something concurrently modifiable.


You are receiving this because you authored the thread.
Reply to this email directly or view it on GitHub
#3931 (comment)

@davidmoten
Copy link
Collaborator Author

I'm happy using timeout with my use case now but I'll leave this PR up for a few days to see if anyone else has a use case.

@davidmoten
Copy link
Collaborator Author

davidmoten commented May 15, 2016

@akarnokd I think there's a catch with timeout. I'm using it as below but I believe I could lose an emission due to timeout cutting in on say the processing occurring in operator1:

o.groupBy(...)
  .flatMap(
    g -> 
        g.operator1()
         .timeout(10, TimeUnit.SECONDS)
         .onErrorResumeNext(completeOnTimeoutException))
)

I guess I'll have to use timeout or timer in a different way so it doesn't shortcut an in-flight emission. Have I got this right?

@akarnokd
Copy link
Member

Yes, there is chance values get dropped when an emission and group unsubscribe happens at the same time. You can't know the value made it or not. If your value has its own lifecycle, that's going to be a problem. Rx is not designed for that and it's the next challenge of the field - especially since reactive-io is full of life-cycle managed buffers.

@davidmoten
Copy link
Collaborator Author

If your value has its own lifecycle, that's going to be a problem. Rx is not designed for that and it's the next challenge of the field - especially since reactive-io is full of life-cycle managed buffers.

Your absolutely right that my use case has some special lifecycle that I'm trying to manage with groupBy. It's an event sourcing/cqrs use case where emissions upstream of groupBy are persisted (you knew io was part of the scenario you perceptive fellow) and whenever a groupBy emitted GroupedObservable is subscribed to (or resubscribed to) the stream starts with the events already recorded for that key (read from disk) before processing the incoming event from upstream.

It does seem to me now that if I don't want to lose emissions for my use case (and want to keep the groupBy keys down in numbers) then I have to record information about groups upstream of groupBy and possibly modify the upstream to include some sort of keyed sentinel to tell the GroupedObservable to complete based on last access time for a key for instance. OR I just use the modified groupBy in this PR.

I'll progress the implementation of the use case and see what else turns up.

@akarnokd akarnokd changed the title add groupBy overload with evictingMapFactory 1.x: add groupBy overload with evictingMapFactory Jun 19, 2016
* items to the appropriate {@code GroupedObservable}s. The {@code Map} instance must be thread-safe
* and any eviction must trigger a call to the supplied action (synchronously or asynchronously).
* This can be used to limit the size of the map by evicting keys by maximum size or access time for
* instance. If {@code evictingMapFactory} is null then no eviction strategy will be applied (and a suitable default thread-safe
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wouldn't support null at the surface level. Those who don't wish to have a custom eviction strategy should use the other overload.

@akarnokd
Copy link
Member

If you want to pursue this further, please rebase it.

@davidmoten
Copy link
Collaborator Author

davidmoten commented Jun 19, 2016

Yep I'd like to pursue this one. This PR is not great to put in an external library because it is so coupled to the groupBy implementation and its tests. I'll fix the API and I realize I may also need to put try-catch on mapFactory.call and map.get and map.put calls with some specific error handling (catching fatal errors, calling onError etc). I'll have a look at it.

I'd also like to use the Guava CacheBuilder in unit tests. Can I add that test dependency?

@akarnokd
Copy link
Member

I'm not against adding testCompile entries. Hopefully, Guava doesn't drag in too many other libraries.

@davidmoten
Copy link
Collaborator Author

Guava only has a few optional dependencies that I wouldn't include and the current version 19.0 still supports java 6.

@akarnokd
Copy link
Member

Great!

@codecov-io
Copy link

codecov-io commented Jun 23, 2016

Current coverage is 80.32%

Merging #3931 into 1.x will increase coverage by 0.06%

@@                1.x      #3931   diff @@
==========================================
  Files           259        259          
  Lines         16821      16853    +32   
  Methods           0          0          
  Messages          0          0          
  Branches       2554       2561     +7   
==========================================
+ Hits          13500      13537    +37   
+ Misses         2408       2402     -6   
- Partials        913        914     +1   

Powered by Codecov. Last updated by afe3cb0...2e2bdb6

@davidmoten davidmoten force-pushed the groupby-eviction2 branch 2 times, most recently from 6f7c29f to e511b22 Compare June 23, 2016 04:34
@akarnokd
Copy link
Member

👍

@akarnokd
Copy link
Member

Oops, merged in a build.gradle change and broke this. Could you rebase again?

/cc @artem-zinnatullin @stevegury for review on this

//Can reach here because mapFactory.call() may throw in constructor of GroupBySubscriber
Exceptions.throwOrReport(ex, child);
Subscriber<? super T> parent2 = Subscribers.empty();
parent2.unsubscribe();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add Subscribers.unsubscribed()?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No worries, separate PR I'd suggest.

@davidmoten
Copy link
Collaborator Author

davidmoten commented Jun 24, 2016

Thanks for the review @artem-zinnatullin.
I've addressed your comments and there's one little addition which is the rename of one of the test methods to use camel case and a number to differentiate from testGroupByBackpressure (testgroupByBackpressure -> testGroupByBackpressure2).

@stevegury
Copy link
Member

👍

@davidmoten
Copy link
Collaborator Author

I take it that being an API enhancement this PR still requires one more collaborator approval?

@davidmoten
Copy link
Collaborator Author

oh and thanks @stevegury for giving it a look!

@akarnokd akarnokd merged commit d855a29 into ReactiveX:1.x Jun 25, 2016
@akarnokd
Copy link
Member

Thanks @davidmoten for the contribution!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

5 participants