
1.x: replay().refCount() avoid leaking items between connections #5181

Merged: akarnokd merged 2 commits into ReactiveX:1.x on Mar 15, 2017

Conversation

akarnokd (Member) commented:

This PR updates replay() so that it does not leak items between reconnections when used with .refCount(). The replay() operator is designed to hold onto its buffer even after completion, so late subscribers can still receive the cached data; only a new connect() clears this data, which may or may not ever happen. Since refCount guarantees there won't be any latecomers to an already completed connection (it also drops its current subscribers on termination or when the subscriber count reaches zero), replay() can safely be reset to an empty state.

Reported in #5172
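The reset-on-zero idea above can be sketched in plain Java (no RxJava, and not RxJava's actual implementation): a minimal, hypothetical replay-like cache whose buffer is cleared once the last subscriber disconnects. Since nobody can join an already terminated connection under refCount, clearing the buffer at that point cannot break a late subscriber.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Minimal sketch (hypothetical, not RxJava's OperatorReplay) of the idea
 * behind this PR: a replay-like cache that resets once the last subscriber
 * disconnects, so cached items cannot leak into the next connection.
 */
public class RefCountedReplayCache {
    private final List<Object> buffer = new ArrayList<>();
    private int subscribers;

    /** Records an item so current and late subscribers can replay it. */
    public synchronized void onItem(Object item) {
        buffer.add(item);
    }

    /** Registers a subscriber and hands it the items cached so far. */
    public synchronized List<Object> subscribe() {
        subscribers++;
        return new ArrayList<>(buffer);
    }

    /** Unregisters a subscriber; when the count hits zero, reset the buffer. */
    public synchronized void unsubscribe() {
        if (--subscribers == 0) {
            buffer.clear(); // no one left to replay to: safe to drop items
        }
    }

    public synchronized int cachedItemCount() {
        return buffer.size();
    }
}
```

The essential point mirrored from the PR: the clear happens in the refCount-style bookkeeping (unsubscribe), not in the buffer's completion handling, because completion alone must keep the data for late subscribers of a still-live connection.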


codecov bot commented Mar 13, 2017

Codecov Report

Merging #5181 into 1.x will increase coverage by 0.02%.
The diff coverage is 100%.

@@             Coverage Diff              @@
##                1.x    #5181      +/-   ##
============================================
+ Coverage     84.33%   84.36%   +0.02%     
- Complexity     2879     2882       +3     
============================================
  Files           290      290              
  Lines         18107    18115       +8     
  Branches       2475     2478       +3     
============================================
+ Hits          15270    15282      +12     
+ Misses         1971     1962       -9     
- Partials        866      871       +5
| Impacted Files | Coverage Δ | Complexity Δ |
| --- | --- | --- |
| ...ava/rx/internal/operators/OnSubscribeRefCount.java | 96.42% <100%> (+0.27%) | 6 <0> (ø) |
| ...ain/java/rx/internal/operators/OperatorReplay.java | 82.77% <100%> (+0.13%) | 15 <4> (+4) |
| ...ain/java/rx/internal/schedulers/SchedulerWhen.java | 83.78% <0%> (-4.06%) | 4 <0> (ø) |
| ...n/java/rx/subscriptions/CompositeSubscription.java | 74.02% <0%> (-3.9%) | 23 <0> (-2) |
| ...n/java/rx/internal/operators/CachedObservable.java | 83.69% <0%> (-1.64%) | 6 <0> (ø) |
| .../rx/internal/schedulers/CachedThreadScheduler.java | 87.37% <0%> (-0.98%) | 6 <0> (ø) |
| ...main/java/rx/internal/operators/OperatorMerge.java | 86.6% <0%> (-0.47%) | 7 <0> (ø) |
| ...ternal/operators/OperatorOnBackpressureLatest.java | 82.85% <0%> (+1.9%) | 3 <0> (ø) |
| ...java/rx/internal/schedulers/ExecutorScheduler.java | 81.69% <0%> (+4.22%) | 2 <0> (ø) |
| ...ain/java/rx/internal/operators/OnSubscribeAmb.java | 84.17% <0%> (+5.03%) | 13 <0> (ø) |
| ... and 1 more | | |

Continue to review full report at Codecov.

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 8811831...c4221a6.

@artem-zinnatullin (Contributor) left a comment:

👍

source = Observable.fromCallable(new Callable<Object>() {
    @Override
    public Object call() throws Exception {
        return new byte[100 * 1000 * 1000];
    }
});
@artem-zinnatullin (Contributor) commented on this line:

Wouldn't a WeakReference be a more deterministic way to detect the memory leak?

final AtomicReference<WeakReference<Object>> ref = new AtomicReference<>();

Observable<Object> source = Observable
    .fromCallable(new Callable<Object>() {
        @Override
        public Object call() {
            Object o = new Object();
            ref.set(new WeakReference<Object>(o));
            return o;
        }
    })
    .replay(1)
    .refCount();

…

for (int i = 0; i < 10; i++) { System.gc(); Thread.sleep(1); }

assertThat(ref.get().get()).isNull();
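The WeakReference-plus-System.gc() technique that sketch relies on can be demonstrated with the JDK alone (no RxJava; class and method names here are made up for illustration). System.gc() is only a hint to the JVM, which is why both the sketch above and this demo retry in a loop:

```java
import java.lang.ref.WeakReference;
import java.util.concurrent.atomic.AtomicReference;

/** Stdlib-only demo of leak detection via WeakReference (hypothetical names). */
public class WeakRefLeakDemo {

    /**
     * Drops the strong reference, then nudges the GC until the weak referent
     * is cleared or the retry budget runs out. Returns true if reclaimed.
     */
    public static boolean reclaimedAfterRelease(AtomicReference<Object> strongHolder,
                                                WeakReference<Object> weakRef)
            throws InterruptedException {
        strongHolder.set(null); // release the only strong reference
        for (int i = 0; i < 50 && weakRef.get() != null; i++) {
            System.gc(); // a hint, not a guarantee -- hence the retry loop
            Thread.sleep(1);
        }
        return weakRef.get() == null;
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicReference<Object> holder = new AtomicReference<Object>(new byte[1000]);
        WeakReference<Object> ref = new WeakReference<Object>(holder.get());

        // While the object is strongly reachable, GC must not clear the weak ref.
        System.gc();
        System.out.println("still held: " + (ref.get() != null));

        // After releasing the strong reference, the referent becomes collectable.
        System.out.println("reclaimed: " + reclaimedAfterRelease(holder, ref));
    }
}
```

A leak would show up as the weak reference never clearing, without allocating a huge array; the trade-off, as discussed below, is that the assertion depends on the GC actually running within the retry window.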

@akarnokd (Member, Author) replied:

The ~100 MB allocation makes the leak almost as apparent.

@artem-zinnatullin (Contributor) replied:

Well, yes and no. I wouldn't trust the heap of a process in which I don't control the threads and allocations (both the IDE and Gradle start a separate JVM with their own wrappers around your tests to execute them).

Also, I'm afraid that allocating such a relatively huge piece of memory as one array may result in an OOM in a constrained environment like Travis CI.

@akarnokd (Member, Author) replied:

That's why it's only 100 MB: the OOM killer jumps into action around 800 MB. Plus, the scheduler tests already use that much memory and there are no kills for those.

@artem-zinnatullin (Contributor) replied:

Ok then!

@akarnokd akarnokd merged commit 0f1542d into ReactiveX:1.x Mar 15, 2017
@akarnokd akarnokd deleted the ReplayRefCountNoLeak branch March 24, 2017 13:14
2 participants