From 2271a9180131a0becdbf789c1429ef741ace4b2f Mon Sep 17 00:00:00 2001 From: Nicholas Jamieson Date: Fri, 7 May 2021 08:22:56 +1000 Subject: [PATCH] fix(share): propagate closed to firehose sources (#6370) * test: enable firehose test for share * fix: add teardown directly to subscriber * chore: add comment * chore: remove firehose tests for deprecated ops * refactor: move it up even further --- spec/operators/multicast-spec.ts | 43 ------------------------------ spec/operators/refCount-spec.ts | 21 --------------- spec/operators/share-spec.ts | 3 +-- spec/operators/shareReplay-spec.ts | 19 ------------- src/internal/operators/share.ts | 36 ++++++++++++++----------- 5 files changed, 21 insertions(+), 101 deletions(-) diff --git a/spec/operators/multicast-spec.ts b/spec/operators/multicast-spec.ts index fd76a8bc94..f88213f1cf 100644 --- a/spec/operators/multicast-spec.ts +++ b/spec/operators/multicast-spec.ts @@ -821,47 +821,4 @@ describe('multicast', () => { ); }); }); - - // TODO: fix firehose unsubscription - // AFAICT, it's not possible for multicast observables to support ASAP - // unsubscription from synchronous firehose sources. The problem is that the - // chaining of the closed 'signal' is broken by the subject. For example, - // here: - // - // https://github.com/ReactiveX/rxjs/blob/2d5e4d5bd7b684a912485e1c1583ba3d41c8308e/src/internal/operators/multicast.ts#L53 - // - // The subject is passed to subscribe. However, in the subscribe - // implementation a SafeSubscriber is created with the subject as the - // observer: - // - // https://github.com/ReactiveX/rxjs/blob/2d5e4d5bd7b684a912485e1c1583ba3d41c8308e/src/internal/Observable.ts#L210 - // - // That breaks the chaining of closed - i.e. even if the unsubscribe is - // called on the subject, closing it, the SafeSubscriber's closed property - // won't reflect that. - it.skip('should stop listening to a synchronous observable when unsubscribed', () => { - const sideEffects: number[] = []; - const synchronousObservable = new Observable((subscriber) => { - // This will check to see if the subscriber was closed on each loop - // when the unsubscribe hits (from the `take`), it should be closed - for (let i = 0; !subscriber.closed && i < 10; i++) { - sideEffects.push(i); - subscriber.next(i); - } - }); - - synchronousObservable - .pipe( - multicast( - () => new Subject(), - (source) => source - ), - take(3) - ) - .subscribe(() => { - /* noop */ - }); - - expect(sideEffects).to.deep.equal([0, 1, 2]); - }); }); diff --git a/spec/operators/refCount-spec.ts b/spec/operators/refCount-spec.ts index 5c204087bd..170806803e 100644 --- a/spec/operators/refCount-spec.ts +++ b/spec/operators/refCount-spec.ts @@ -114,25 +114,4 @@ describe('refCount', () => { expect(arr[0]).to.equal('the number one'); expect(arr[1]).to.equal('the number two'); }); - - // TODO: fix firehose unsubscription - it.skip('should stop listening to a synchronous observable when unsubscribed', () => { - const sideEffects: number[] = []; - const synchronousObservable = new Observable(subscriber => { - // This will check to see if the subscriber was closed on each loop - // when the unsubscribe hits (from the `take`), it should be closed - for (let i = 0; !subscriber.closed && i < 10; i++) { - sideEffects.push(i); - subscriber.next(i); - } - }); - - synchronousObservable.pipe( - multicast(() => new Subject()), - refCount(), - take(3), - ).subscribe(() => { /* noop */ }); - - expect(sideEffects).to.deep.equal([0, 1, 2]); - }); }); diff --git a/spec/operators/share-spec.ts b/spec/operators/share-spec.ts index cb3b5b3b1b..c13da7e3c3 100644 --- a/spec/operators/share-spec.ts +++ b/spec/operators/share-spec.ts @@ -427,8 +427,7 @@ describe('share', () => { }); }); - // TODO: fix firehose unsubscription - it.skip('should stop listening to a synchronous observable when unsubscribed', () => { + it('should stop listening to a synchronous observable when unsubscribed', () => { const sideEffects: number[] = []; const synchronousObservable = new Observable((subscriber) => { // This will check to see if the subscriber was closed on each loop diff --git a/spec/operators/shareReplay-spec.ts b/spec/operators/shareReplay-spec.ts index d0b77dce15..31a107a826 100644 --- a/spec/operators/shareReplay-spec.ts +++ b/spec/operators/shareReplay-spec.ts @@ -347,25 +347,6 @@ describe('shareReplay', () => { }); }); - // TODO: fix firehose unsubscription - it.skip('should stop listening to a synchronous observable when unsubscribed', () => { - const sideEffects: number[] = []; - const synchronousObservable = new Observable((subscriber) => { - // This will check to see if the subscriber was closed on each loop - // when the unsubscribe hits (from the `take`), it should be closed - for (let i = 0; !subscriber.closed && i < 10; i++) { - sideEffects.push(i); - subscriber.next(i); - } - }); - - synchronousObservable.pipe(shareReplay(), take(3)).subscribe(() => { - /* noop */ - }); - - expect(sideEffects).to.deep.equal([0, 1, 2]); - }); - const FinalizationRegistry = (global as any).FinalizationRegistry; if (FinalizationRegistry) { it('should not leak the subscriber for sync sources', (done) => { diff --git a/src/internal/operators/share.ts b/src/internal/operators/share.ts index ea0c374786..335210fdc8 100644 --- a/src/internal/operators/share.ts +++ b/src/internal/operators/share.ts @@ -112,6 +112,26 @@ export function share(options?: ShareConfig): OperatorFunction { // Create the subject if we don't have one yet. subject = subject ?? connector(); + // Add the teardown directly to the subscriber - instead of returning it - + // so that the handling of the subscriber's unsubscription will be wired + // up _before_ the subscription to the source occurs. This is done so that + // the assignment to the source connection's `closed` property will be seen + // by synchronous firehose sources. + subscriber.add(() => { + refCount--; + + // If we're resetting on refCount === 0, and it's 0, we only want to do + // that on "unsubscribe", really. Resetting on error or completion is a different + // configuration. + if (resetOnRefCountZero && !refCount && !hasErrored && !hasCompleted) { + // We need to capture the connection before + // we reset (if we need to reset). + const conn = connection; + reset(); + conn?.unsubscribe(); + } + }); + // The following line adds the subscription to the subscriber passed. // Basically, `subscriber === subject.subscribe(subscriber)` is `true`. subject.subscribe(subscriber); @@ -147,21 +167,5 @@ export function share(options?: ShareConfig): OperatorFunction { }); from(source).subscribe(connection); } - - // This is also added to `subscriber`, technically. - return () => { - refCount--; - - // If we're resetting on refCount === 0, and it's 0, we only want to do - // that on "unsubscribe", really. Resetting on error or completion is a different - // configuration. - if (resetOnRefCountZero && !refCount && !hasErrored && !hasCompleted) { - // We need to capture the connection before - // we reset (if we need to reset). - const conn = connection; - reset(); - conn?.unsubscribe(); - } - }; }); }