From 35446758c2a1b31609abbf78cce5bb89f9d3c1a5 Mon Sep 17 00:00:00 2001 From: Josep M Sobrepere Date: Thu, 6 May 2021 12:47:24 +0200 Subject: [PATCH] fix(shareLatest): properly closing sync observables --- packages/core/src/internal/share-latest.ts | 43 +++++++++++----------- packages/core/src/share-latest.test.ts | 32 +++++++++++----- 2 files changed, 45 insertions(+), 30 deletions(-) diff --git a/packages/core/src/internal/share-latest.ts b/packages/core/src/internal/share-latest.ts index 532382e4..88385501 100644 --- a/packages/core/src/internal/share-latest.ts +++ b/packages/core/src/internal/share-latest.ts @@ -1,4 +1,4 @@ -import { Observable, Subscription, Subject, noop } from "rxjs" +import { Observable, Subscription, Subject, noop, Subscriber } from "rxjs" import { BehaviorObservable } from "./BehaviorObservable" import { EMPTY_VALUE } from "./empty-value" import { SUSPENSE } from "../SUSPENSE" @@ -10,7 +10,7 @@ const shareLatest = ( teardown = noop, ): BehaviorObservable => { let subject: Subject | null - let subscription: Subscription | null + let subscription: Subscriber | null let refCount = 0 let currentValue: T = EMPTY_VALUE let promise: Promise | null @@ -29,15 +29,31 @@ const shareLatest = ( refCount++ let innerSub: Subscription + + subscriber.add(() => { + refCount-- + innerSub.unsubscribe() + if (refCount === 0) { + currentValue = EMPTY_VALUE + if (subscription) { + subscription.unsubscribe() + } + teardown() + subject = null + subscription = null + promise = null + } + }) + if (!subject) { subject = new Subject() innerSub = subject.subscribe(subscriber) subscription = null - subscription = source$.subscribe( - (value) => { + subscription = new Subscriber( + (value: T) => { subject!.next((currentValue = value)) }, - (err) => { + (err: any) => { const _subject = subject subscription = null subject = null @@ -49,7 +65,7 @@ const shareLatest = ( subject!.complete() }, ) - if (subscription.closed) subscription = null + source$.subscribe(subscription) emitIfEmpty() } else { innerSub = subject.subscribe(subscriber) @@ -57,21 +73,6 @@ const shareLatest = ( subscriber.next(currentValue) } } - - return () => { - refCount-- - innerSub.unsubscribe() - if (refCount === 0) { - currentValue = EMPTY_VALUE - if (subscription) { - subscription.unsubscribe() - } - teardown() - subject = null - subscription = null - promise = null - } - } }) as BehaviorObservable let error: any = EMPTY_VALUE diff --git a/packages/core/src/share-latest.test.ts b/packages/core/src/share-latest.test.ts index 256e4540..36d2ea00 100644 --- a/packages/core/src/share-latest.test.ts +++ b/packages/core/src/share-latest.test.ts @@ -1,7 +1,7 @@ import { TestScheduler } from "rxjs/testing" -import { from, merge, defer } from "rxjs" +import { from, merge, defer, Observable, noop } from "rxjs" import { shareLatest } from "./" -import { withLatestFrom, startWith, map } from "rxjs/operators" +import { withLatestFrom, startWith, map, take } from "rxjs/operators" const scheduler = () => new TestScheduler((actual, expected) => { @@ -75,15 +75,29 @@ describe("shareLatest", () => { // prettier-ignore it("should not skip values on a sync source", () => { - scheduler().run(({ expectObservable }) => { - const source = from(['a', 'b', 'c', 'd']) // cold("(abcd|)") - const sub1 = '^'; - const expected1 = " (abcd|)" + scheduler().run(({ expectObservable }) => { + const source = from(['a', 'b', 'c', 'd']) // cold("(abcd|)") + const sub1 = '^'; + const expected1 = " (abcd|)" - const shared = shareLatest()(source); + const shared = shareLatest()(source); - expectObservable(shared, sub1).toBe(expected1); + expectObservable(shared, sub1).toBe(expected1); + }) + }) + + it("should stop listening to a synchronous observable when unsubscribed", () => { + let sideEffects = 0 + const synchronousObservable = new Observable((subscriber) => { + // This will check to see if the subscriber was closed on each loop + // when the unsubscribe hits (from the `take`), it should be closed + for (let i = 0; !subscriber.closed && i < 10; i++) { + sideEffects++ + subscriber.next(i) + } + }) + synchronousObservable.pipe(shareLatest(), take(3)).subscribe(noop) + expect(sideEffects).toBe(3) }) - }) }) })