-
Notifications
You must be signed in to change notification settings - Fork 7.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
1.x: replay().refCount() avoid leaking items between connections #5181
Conversation
Codecov Report
@@ Coverage Diff @@
## 1.x #5181 +/- ##
============================================
+ Coverage 84.33% 84.36% +0.02%
- Complexity 2879 2882 +3
============================================
Files 290 290
Lines 18107 18115 +8
Branches 2475 2478 +3
============================================
+ Hits 15270 15282 +12
+ Misses 1971 1962 -9
- Partials 866 871 +5
Continue to review full report at Codecov.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
source = Observable.fromCallable(new Callable<Object>() { | ||
@Override | ||
public Object call() throws Exception { | ||
return new byte[100 * 1000 * 1000]; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wouldn't WeakReference
be a more determined way to detect memory leak?
AtomicReference<WeakReference<Object>> ref = new AtomicReference<>();
Observable
.fromCallable {
Object o = new Object();
ref.set(new WeakReference(o));
return o;
}
.replay(1)
.refCount();
…
for (i = 0; i < 10; i++) { System.gc(); Thread.sleep(1); }
assertThat(ref.get().get()).isNull()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the ~100M is almost as apparent.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well, yes and no. I wouldn't trust heap of a process in which I do not control Threads and allocations (both IDE and Gradle start separate JVM with own wrappers around your tests to execute them).
Also, I'm afraid that allocation of such a relatively huge piece of memory as an array may result in OOM on constrained environment like Travis CI.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's why only 100M, the OOMKiller jumps into action around 800 MB. Plus, the scheduler tests already use such large amount of memory and no kills for those.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok then!
This PR updates
replay()
to not leak items between reconnections when run with.refCount()
. Thereplay()
operator is designed to hold onto the buffer even after its completion so late subscribers can still receive the cached data. Only a newconnect()
clears this data which may or may not happen. SincerefCount
ensures that there won't be any latecommers to an already completed connection (because it also drops its current subscribers on termination or when reaching zero), thereplay()
can be reset to an empty state.Reported in #5172