Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(shareLatest): properly closing sync observables #194

Merged
merged 1 commit into from May 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
43 changes: 22 additions & 21 deletions packages/core/src/internal/share-latest.ts
@@ -1,4 +1,4 @@
import { Observable, Subscription, Subject, noop } from "rxjs"
import { Observable, Subscription, Subject, noop, Subscriber } from "rxjs"
import { BehaviorObservable } from "./BehaviorObservable"
import { EMPTY_VALUE } from "./empty-value"
import { SUSPENSE } from "../SUSPENSE"
Expand All @@ -10,7 +10,7 @@ const shareLatest = <T>(
teardown = noop,
): BehaviorObservable<T> => {
let subject: Subject<T> | null
let subscription: Subscription | null
let subscription: Subscriber<T> | null
let refCount = 0
let currentValue: T = EMPTY_VALUE
let promise: Promise<T> | null
Expand All @@ -29,15 +29,31 @@ const shareLatest = <T>(

refCount++
let innerSub: Subscription

subscriber.add(() => {
refCount--
innerSub.unsubscribe()
if (refCount === 0) {
currentValue = EMPTY_VALUE
if (subscription) {
subscription.unsubscribe()
}
teardown()
subject = null
subscription = null
promise = null
}
})

if (!subject) {
subject = new Subject<T>()
innerSub = subject.subscribe(subscriber)
subscription = null
subscription = source$.subscribe(
(value) => {
subscription = new Subscriber<T>(
(value: T) => {
subject!.next((currentValue = value))
},
(err) => {
(err: any) => {
const _subject = subject
subscription = null
subject = null
Expand All @@ -49,29 +65,14 @@ const shareLatest = <T>(
subject!.complete()
},
)
if (subscription.closed) subscription = null
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No longer needed! 🎉

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good one

source$.subscribe(subscription)
emitIfEmpty()
} else {
innerSub = subject.subscribe(subscriber)
if (currentValue !== EMPTY_VALUE) {
subscriber.next(currentValue)
}
}

return () => {
refCount--
innerSub.unsubscribe()
if (refCount === 0) {
currentValue = EMPTY_VALUE
if (subscription) {
subscription.unsubscribe()
}
teardown()
subject = null
subscription = null
promise = null
}
}
}) as BehaviorObservable<T>

let error: any = EMPTY_VALUE
Expand Down
32 changes: 23 additions & 9 deletions packages/core/src/share-latest.test.ts
@@ -1,7 +1,7 @@
import { TestScheduler } from "rxjs/testing"
import { from, merge, defer } from "rxjs"
import { from, merge, defer, Observable, noop } from "rxjs"
import { shareLatest } from "./"
import { withLatestFrom, startWith, map } from "rxjs/operators"
import { withLatestFrom, startWith, map, take } from "rxjs/operators"

const scheduler = () =>
new TestScheduler((actual, expected) => {
Expand Down Expand Up @@ -75,15 +75,29 @@ describe("shareLatest", () => {

// prettier-ignore
it("should not skip values on a sync source", () => {
scheduler().run(({ expectObservable }) => {
const source = from(['a', 'b', 'c', 'd']) // cold("(abcd|)")
const sub1 = '^';
const expected1 = " (abcd|)"
scheduler().run(({ expectObservable }) => {
const source = from(['a', 'b', 'c', 'd']) // cold("(abcd|)")
const sub1 = '^';
const expected1 = " (abcd|)"

const shared = shareLatest()(source);
const shared = shareLatest()(source);

expectObservable(shared, sub1).toBe(expected1);
expectObservable(shared, sub1).toBe(expected1);
})
})

it("should stop listening to a synchronous observable when unsubscribed", () => {
let sideEffects = 0
const synchronousObservable = new Observable<number>((subscriber) => {
// This will check to see if the subscriber was closed on each loop
// when the unsubscribe hits (from the `take`), it should be closed
for (let i = 0; !subscriber.closed && i < 10; i++) {
sideEffects++
subscriber.next(i)
}
})
synchronousObservable.pipe(shareLatest(), take(3)).subscribe(noop)
expect(sideEffects).toBe(3)
})
})
})
})