From 12c3716cecbf01f353c980488bf18845177b37b6 Mon Sep 17 00:00:00 2001 From: Oliver Hoff Date: Fri, 21 May 2021 00:11:25 +0200 Subject: [PATCH] feat(share): use another observable to control resets (#6169) --- spec/operators/share-spec.ts | 1219 ++++++++++++++++++++----------- src/internal/operators/share.ts | 134 +++- 2 files changed, 877 insertions(+), 476 deletions(-) diff --git a/spec/operators/share-spec.ts b/spec/operators/share-spec.ts index c13da7e3c3..2ea5132a4b 100644 --- a/spec/operators/share-spec.ts +++ b/spec/operators/share-spec.ts @@ -1,23 +1,42 @@ /** @prettier */ import { expect } from 'chai'; +import { asapScheduler, concat, config, defer, EMPTY, NEVER, Observable, of, scheduled, Subject, throwError } from 'rxjs'; import { - share, - retry, - mergeMapTo, + map, mergeMap, - tap, + mergeMapTo, + onErrorResumeNext, repeat, + retry, + share, + startWith, take, takeUntil, takeWhile, - map, - startWith, + tap, + toArray, withLatestFrom, } from 'rxjs/operators'; -import { Observable, EMPTY, NEVER, of, Subject, defer } from 'rxjs'; import { TestScheduler } from 'rxjs/testing'; import { observableMatcher } from '../helpers/observableMatcher'; -import sinon = require('sinon'); +import { SinonSpy, spy } from 'sinon'; + +const syncNotify = of(1); +const asapNotify = scheduled(syncNotify, asapScheduler); +const syncError = throwError(() => new Error()); + +function spyOnUnhandledError(fn: (spy: SinonSpy) => void): void { + const prevOnUnhandledError = config.onUnhandledError; + + try { + const onUnhandledError = spy(); + config.onUnhandledError = onUnhandledError; + + fn(onUnhandledError); + } finally { + config.onUnhandledError = prevOnUnhandledError; + } +} /** @test {share} */ describe('share', () => { @@ -27,564 +46,876 @@ describe('share', () => { rxTest = new TestScheduler(observableMatcher); }); - describe('share()', () => { - it('should mirror a simple source Observable', () => { - rxTest.run(({ cold, expectObservable, expectSubscriptions }) => { - const source = cold('--1-2---3-4--5-|'); - const sourceSubs = ' ^--------------!'; - const expected = ' --1-2---3-4--5-|'; + for (const { title, options } of [ + { title: 'share()', options: {} }, + { + title: 'share() using sync reset notifiers equivalent to default config', + options: { + resetOnError: () => syncNotify, + resetOnComplete: () => syncNotify, + resetOnRefCountZero: () => syncNotify, + }, + }, + { + title: 'share() using sync reset notifiers equivalent to default config and notifying again after reset is notified', + options: { + resetOnError: () => concat(syncNotify, syncNotify), + resetOnComplete: () => concat(syncNotify, syncNotify), + resetOnRefCountZero: () => concat(syncNotify, syncNotify), + }, + }, + { + title: 'share() using sync reset notifiers equivalent to default config and never completing after reset is notified', + options: { + resetOnError: () => concat(syncNotify, NEVER), + resetOnComplete: () => concat(syncNotify, NEVER), + resetOnRefCountZero: () => concat(syncNotify, NEVER), + }, + }, + { + title: 'share() using sync reset notifiers equivalent to default config and throwing an error after reset is notified', + options: { + resetOnError: () => concat(syncNotify, syncError), + resetOnComplete: () => concat(syncNotify, syncError), + resetOnRefCountZero: () => concat(syncNotify, syncError), + }, + }, + ]) { + describe(title, () => { + it('should mirror a simple source Observable', () => { + rxTest.run(({ cold, expectObservable, expectSubscriptions }) => { + const source = cold('--1-2---3-4--5-|'); + const sourceSubs = ' ^--------------!'; + const expected = ' --1-2---3-4--5-|'; + + const shared = source.pipe(share(options)); + + expectObservable(shared).toBe(expected); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + }); + }); - const shared = source.pipe(share()); + it('should share a single subscription', () => { + let subscriptionCount = 0; + const obs = new Observable(() => { + subscriptionCount++; + }); - expectObservable(shared).toBe(expected); - expectSubscriptions(source.subscriptions).toBe(sourceSubs); + const source = obs.pipe(share(options)); + + expect(subscriptionCount).to.equal(0); + + source.subscribe(); + source.subscribe(); + + expect(subscriptionCount).to.equal(1); }); - }); - it('should share a single subscription', () => { - let subscriptionCount = 0; - const obs = new Observable(() => { - subscriptionCount++; + it('should not change the output of the observable when error', () => { + rxTest.run(({ hot, expectObservable, expectSubscriptions }) => { + const e1 = hot('---a--^--b--c--d--e--#'); + const e1subs = ' ^--------------!'; + const expected = ' ---b--c--d--e--#'; + + expectObservable(e1.pipe(share(options))).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); }); - const source = obs.pipe(share()); + it('should not change the output of the observable when successful with cold observable', () => { + rxTest.run(({ cold, expectObservable, expectSubscriptions }) => { + const e1 = cold(' ---a--b--c--d--e--|'); + const e1subs = ' ^-----------------!'; + const expected = '---a--b--c--d--e--|'; - expect(subscriptionCount).to.equal(0); + expectObservable(e1.pipe(share(options))).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + }); - source.subscribe(); - source.subscribe(); + it('should not change the output of the observable when error with cold observable', () => { + rxTest.run(({ cold, expectObservable, expectSubscriptions }) => { + const e1 = cold(' ---a--b--c--d--e--#'); + const e1subs = ' ^-----------------!'; + const expected = '---a--b--c--d--e--#'; - expect(subscriptionCount).to.equal(1); - }); + expectObservable(e1.pipe(share(options))).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + }); - it('should not change the output of the observable when error', () => { - rxTest.run(({ hot, expectObservable, expectSubscriptions }) => { - const e1 = hot('---a--^--b--c--d--e--#'); - const e1subs = ' ^--------------!'; - const expected = ' ---b--c--d--e--#'; + it('should retry just fine', () => { + rxTest.run(({ cold, expectObservable, expectSubscriptions }) => { + const e1 = cold(' ---a--b--c--d--e--# '); + // prettier-ignore + const e1subs = [ + ' ^-----------------! ', + ' ------------------^-----------------!' + ]; + const expected = '---a--b--c--d--e-----a--b--c--d--e--#'; + + expectObservable(e1.pipe(share(options), retry(1))).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + }); - expectObservable(e1.pipe(share())).toBe(expected); - expectSubscriptions(e1.subscriptions).toBe(e1subs); + it('should share the same values to multiple observers', () => { + rxTest.run(({ cold, hot, expectObservable, expectSubscriptions }) => { + const source = cold(' -1-2-3----4-|'); + const sourceSubs = ' ^-----------!'; + const subscriber1 = hot('a| '); + const expected1 = ' -1-2-3----4-|'; + const subscriber2 = hot('----b| '); + const expected2 = ' -----3----4-|'; + const subscriber3 = hot('--------c| '); + const expected3 = ' ----------4-|'; + + const shared = source.pipe(share(options)); + + expectObservable(subscriber1.pipe(mergeMapTo(shared))).toBe(expected1); + expectObservable(subscriber2.pipe(mergeMapTo(shared))).toBe(expected2); + expectObservable(subscriber3.pipe(mergeMapTo(shared))).toBe(expected3); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + }); }); - }); - it('should not change the output of the observable when successful with cold observable', () => { - rxTest.run(({ cold, expectObservable, expectSubscriptions }) => { - const e1 = cold(' ---a--b--c--d--e--|'); - const e1subs = ' ^-----------------!'; - const expected = '---a--b--c--d--e--|'; + it('should share an error from the source to multiple observers', () => { + rxTest.run(({ cold, hot, expectObservable, expectSubscriptions }) => { + const source = cold(' -1-2-3----4-#'); + const sourceSubs = ' ^-----------!'; + const subscriber1 = hot('a| '); + const expected1 = ' -1-2-3----4-#'; + const subscriber2 = hot('----b| '); + const expected2 = ' -----3----4-#'; + const subscriber3 = hot('--------c| '); + const expected3 = ' ----------4-#'; + + const shared = source.pipe(share(options)); + + expectObservable(subscriber1.pipe(mergeMapTo(shared))).toBe(expected1); + expectObservable(subscriber2.pipe(mergeMapTo(shared))).toBe(expected2); + expectObservable(subscriber3.pipe(mergeMapTo(shared))).toBe(expected3); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + }); + }); - expectObservable(e1.pipe(share())).toBe(expected); - expectSubscriptions(e1.subscriptions).toBe(e1subs); + it('should share the same values to multiple observers, but is unsubscribed explicitly and early', () => { + rxTest.run(({ cold, hot, expectObservable, expectSubscriptions }) => { + const source = cold(' -1-2-3----4-|'); + const sourceSubs = ' ^--------! '; + const unsub = ' ---------! '; + const subscriber1 = hot('a| '); + const expected1 = ' -1-2-3---- '; + const subscriber2 = hot('----b| '); + const expected2 = ' -----3---- '; + const subscriber3 = hot('--------c| '); + const expected3 = ' ---------- '; + + const shared = source.pipe(share(options)); + + expectObservable(subscriber1.pipe(mergeMapTo(shared)), unsub).toBe(expected1); + expectObservable(subscriber2.pipe(mergeMapTo(shared)), unsub).toBe(expected2); + expectObservable(subscriber3.pipe(mergeMapTo(shared)), unsub).toBe(expected3); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + }); }); - }); - it('should not change the output of the observable when error with cold observable', () => { - rxTest.run(({ cold, expectObservable, expectSubscriptions }) => { - const e1 = cold(' ---a--b--c--d--e--#'); - const e1subs = ' ^-----------------!'; - const expected = '---a--b--c--d--e--#'; + it('should share an empty source', () => { + rxTest.run(({ cold, expectObservable, expectSubscriptions }) => { + const source = cold('| '); + const sourceSubs = ' (^!)'; + const expected = ' | '; - expectObservable(e1.pipe(share())).toBe(expected); - expectSubscriptions(e1.subscriptions).toBe(e1subs); + const shared = source.pipe(share(options)); + + expectObservable(shared).toBe(expected); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + }); }); - }); - it('should retry just fine', () => { - rxTest.run(({ cold, expectObservable, expectSubscriptions }) => { - const e1 = cold(' ---a--b--c--d--e--# '); - // prettier-ignore - const e1subs = [ - ' ^-----------------! ', - ' ------------------^-----------------!' - ]; - const expected = '---a--b--c--d--e-----a--b--c--d--e--#'; + it('should share a never source', () => { + rxTest.run(({ cold, expectObservable, expectSubscriptions }) => { + const source = cold('-'); + const sourceSubs = ' ^'; + const expected = ' -'; - expectObservable(e1.pipe(share(), retry(1))).toBe(expected); - expectSubscriptions(e1.subscriptions).toBe(e1subs); + const shared = source.pipe(share(options)); + + expectObservable(shared).toBe(expected); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + }); }); - }); - it('should share the same values to multiple observers', () => { - rxTest.run(({ cold, hot, expectObservable, expectSubscriptions }) => { - const source = cold(' -1-2-3----4-|'); - const sourceSubs = ' ^-----------!'; - const subscriber1 = hot('a| '); - const expected1 = ' -1-2-3----4-|'; - const subscriber2 = hot('----b| '); - const expected2 = ' -----3----4-|'; - const subscriber3 = hot('--------c| '); - const expected3 = ' ----------4-|'; - - const shared = source.pipe(share()); - - expectObservable(subscriber1.pipe(mergeMapTo(shared))).toBe(expected1); - expectObservable(subscriber2.pipe(mergeMapTo(shared))).toBe(expected2); - expectObservable(subscriber3.pipe(mergeMapTo(shared))).toBe(expected3); - expectSubscriptions(source.subscriptions).toBe(sourceSubs); + it('should share a throw source', () => { + rxTest.run(({ cold, expectObservable, expectSubscriptions }) => { + const source = cold('# '); + const sourceSubs = ' (^!)'; + const expected = ' # '; + + const shared = source.pipe(share(options)); + + expectObservable(shared).toBe(expected); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + }); }); - }); - it('should share an error from the source to multiple observers', () => { - rxTest.run(({ cold, hot, expectObservable, expectSubscriptions }) => { - const source = cold(' -1-2-3----4-#'); - const sourceSubs = ' ^-----------!'; - const subscriber1 = hot('a| '); - const expected1 = ' -1-2-3----4-#'; - const subscriber2 = hot('----b| '); - const expected2 = ' -----3----4-#'; - const subscriber3 = hot('--------c| '); - const expected3 = ' ----------4-#'; - - const shared = source.pipe(share()); - - expectObservable(subscriber1.pipe(mergeMapTo(shared))).toBe(expected1); - expectObservable(subscriber2.pipe(mergeMapTo(shared))).toBe(expected2); - expectObservable(subscriber3.pipe(mergeMapTo(shared))).toBe(expected3); - expectSubscriptions(source.subscriptions).toBe(sourceSubs); + it('should connect when first subscriber subscribes', () => { + rxTest.run(({ cold, hot, expectObservable, expectSubscriptions }) => { + const source = cold(' -1-2-3----4-|'); + const sourceSubs = ' ---^-----------!'; + const subscriber1 = hot('---a| '); + const expected1 = ' ----1-2-3----4-|'; + const subscriber2 = hot('-------b| '); + const expected2 = ' --------3----4-|'; + const subscriber3 = hot('-----------c| '); + const expected3 = ' -------------4-|'; + + const shared = source.pipe(share(options)); + + expectObservable(subscriber1.pipe(mergeMapTo(shared))).toBe(expected1); + expectObservable(subscriber2.pipe(mergeMapTo(shared))).toBe(expected2); + expectObservable(subscriber3.pipe(mergeMapTo(shared))).toBe(expected3); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + }); }); - }); - it('should share the same values to multiple observers, but is unsubscribed explicitly and early', () => { - rxTest.run(({ cold, hot, expectObservable, expectSubscriptions }) => { - const source = cold(' -1-2-3----4-|'); - const sourceSubs = ' ^--------! '; - const unsub = ' ---------! '; - const subscriber1 = hot('a| '); - const expected1 = ' -1-2-3---- '; - const subscriber2 = hot('----b| '); - const expected2 = ' -----3---- '; - const subscriber3 = hot('--------c| '); - const expected3 = ' ---------- '; - - const shared = source.pipe(share()); - - expectObservable(subscriber1.pipe(mergeMapTo(shared)), unsub).toBe(expected1); - expectObservable(subscriber2.pipe(mergeMapTo(shared)), unsub).toBe(expected2); - expectObservable(subscriber3.pipe(mergeMapTo(shared)), unsub).toBe(expected3); - expectSubscriptions(source.subscriptions).toBe(sourceSubs); + it('should disconnect when last subscriber unsubscribes', () => { + rxTest.run(({ cold, hot, expectObservable, expectSubscriptions }) => { + const source = cold(' -1-2-3----4-|'); + const sourceSubs = ' ---^--------! '; + const subscriber1 = hot('---a| '); + const unsub1 = ' ----------! '; + const expected1 = ' ----1-2-3-- '; + const subscriber2 = hot('-------b| '); + const unsub2 = ' ------------! '; + const expected2 = ' --------3---- '; + + const shared = source.pipe(share(options)); + + expectObservable(subscriber1.pipe(mergeMapTo(shared)), unsub1).toBe(expected1); + expectObservable(subscriber2.pipe(mergeMapTo(shared)), unsub2).toBe(expected2); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + }); }); - }); - it('should share an empty source', () => { - rxTest.run(({ cold, expectObservable, expectSubscriptions }) => { - const source = cold('| '); - const sourceSubs = ' (^!)'; - const expected = ' | '; + it('should not break unsubscription chain when last subscriber unsubscribes', () => { + rxTest.run(({ cold, hot, expectObservable, expectSubscriptions }) => { + const source = cold(' -1-2-3----4-|'); + const sourceSubs = ' ---^--------! '; + const subscriber1 = hot('---a| '); + const unsub1 = ' ----------! '; + const expected1 = ' ----1-2-3-- '; + const subscriber2 = hot('-------b| '); + const unsub2 = ' ------------! '; + const expected2 = ' --------3---- '; + + const shared = source.pipe( + mergeMap((x: string) => of(x)), + share(options), + mergeMap((x: string) => of(x)) + ); + + expectObservable(subscriber1.pipe(mergeMapTo(shared)), unsub1).toBe(expected1); + expectObservable(subscriber2.pipe(mergeMapTo(shared)), unsub2).toBe(expected2); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + }); + }); - const shared = source.pipe(share()); + it('should be retryable when cold source is synchronous', () => { + rxTest.run(({ cold, hot, expectObservable, expectSubscriptions }) => { + const source = cold('(123#) '); + const subscribe1 = ' s '; + const expected1 = ' (123123#) '; + const subscribe2 = ' -s '; + const expected2 = ' -(123123#)'; + const sourceSubs = [ + ' (^!) ', + ' (^!) ', + ' -(^!) ', + ' -(^!) ', + ]; + + const shared = source.pipe(share(options)); + + expectObservable( + hot(subscribe1).pipe( + tap(() => { + expectObservable(shared.pipe(retry(1))).toBe(expected1); + }) + ) + ).toBe(subscribe1); + + expectObservable( + hot(subscribe2).pipe( + tap(() => { + expectObservable(shared.pipe(retry(1))).toBe(expected2); + }) + ) + ).toBe(subscribe2); + + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + }); + }); - expectObservable(shared).toBe(expected); - expectSubscriptions(source.subscriptions).toBe(sourceSubs); + it('should be repeatable when cold source is synchronous', () => { + rxTest.run(({ cold, hot, expectObservable, expectSubscriptions }) => { + const source = cold('(123|) '); + const subscribe1 = ' s '; + const expected1 = ' (123123|) '; + const subscribe2 = ' -s '; + const expected2 = ' -(123123|)'; + const sourceSubs = [ + ' (^!) ', + ' (^!) ', + ' -(^!) ', + ' -(^!) ', + ]; + + const shared = source.pipe(share(options)); + + expectObservable( + hot(subscribe1).pipe( + tap(() => { + expectObservable(shared.pipe(repeat(2))).toBe(expected1); + }) + ) + ).toBe(subscribe1); + + expectObservable( + hot(subscribe2).pipe( + tap(() => { + expectObservable(shared.pipe(repeat(2))).toBe(expected2); + }) + ) + ).toBe(subscribe2); + + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + }); }); - }); - it('should share a never source', () => { - rxTest.run(({ cold, expectObservable, expectSubscriptions }) => { - const source = cold('-'); - const sourceSubs = ' ^'; - const expected = ' -'; + it('should be retryable', () => { + rxTest.run(({ cold, hot, expectObservable, expectSubscriptions }) => { + const source = cold('-1-2-3----4-# '); + const sourceSubs = [ + ' ^-----------! ', + ' ------------^-----------! ', + ' ------------------------^-----------!', + ]; + const subscribe1 = ' s------------------------------------'; + const expected1 = ' -1-2-3----4--1-2-3----4--1-2-3----4-#'; + const subscribe2 = ' ----s--------------------------------'; + const expected2 = ' -----3----4--1-2-3----4--1-2-3----4-#'; + + const shared = source.pipe(share(options)); + + expectObservable( + hot(subscribe1).pipe( + tap(() => { + expectObservable(shared.pipe(retry(2))).toBe(expected1); + }) + ) + ).toBe(subscribe1); + + expectObservable( + hot(subscribe2).pipe( + tap(() => { + expectObservable(shared.pipe(retry(2))).toBe(expected2); + }) + ) + ).toBe(subscribe2); + + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + }); + }); - const shared = source.pipe(share()); + it('should be repeatable', () => { + rxTest.run(({ cold, hot, expectObservable, expectSubscriptions }) => { + const source = cold('-1-2-3----4-| '); + const sourceSubs = [ + ' ^-----------! ', + ' ------------^-----------! ', + ' ------------------------^-----------!', + ]; + const subscribe1 = ' s------------------------------------'; + const expected1 = ' -1-2-3----4--1-2-3----4--1-2-3----4-|'; + const subscribe2 = ' ----s--------------------------------'; + const expected2 = ' -----3----4--1-2-3----4--1-2-3----4-|'; + + const shared = source.pipe(share(options)); + + expectObservable( + hot(subscribe1).pipe( + tap(() => { + expectObservable(shared.pipe(repeat(3))).toBe(expected1); + }) + ) + ).toBe(subscribe1); + + expectObservable( + hot(subscribe2).pipe( + tap(() => { + expectObservable(shared.pipe(repeat(3))).toBe(expected2); + }) + ) + ).toBe(subscribe2); + + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + }); + }); - expectObservable(shared).toBe(expected); - expectSubscriptions(source.subscriptions).toBe(sourceSubs); + it('should not change the output of the observable when never', () => { + rxTest.run(({ expectObservable }) => { + const e1 = NEVER; + const expected = '-'; + + expectObservable(e1.pipe(share(options))).toBe(expected); + }); }); - }); - it('should share a throw source', () => { - rxTest.run(({ cold, expectObservable, expectSubscriptions }) => { - const source = cold('# '); - const sourceSubs = ' (^!)'; - const expected = ' # '; + it('should not change the output of the observable when empty', () => { + rxTest.run(({ expectObservable }) => { + const e1 = EMPTY; + const expected = '|'; - const shared = source.pipe(share()); + expectObservable(e1.pipe(share(options))).toBe(expected); + }); + }); - expectObservable(shared).toBe(expected); - expectSubscriptions(source.subscriptions).toBe(sourceSubs); + it('should stop listening to a synchronous observable when unsubscribed', () => { + const sideEffects: number[] = []; + const synchronousObservable = new Observable((subscriber) => { + // This will check to see if the subscriber was closed on each loop + // when the unsubscribe hits (from the `take`), it should be closed + for (let i = 0; !subscriber.closed && i < 10; i++) { + sideEffects.push(i); + subscriber.next(i); + } + }); + + synchronousObservable.pipe(share(options), take(3)).subscribe(() => { + /* noop */ + }); + + expect(sideEffects).to.deep.equal([0, 1, 2]); }); - }); - it('should connect when first subscriber subscribes', () => { - rxTest.run(({ cold, hot, expectObservable, expectSubscriptions }) => { - const source = cold(' -1-2-3----4-|'); - const sourceSubs = ' ---^-----------!'; - const subscriber1 = hot('---a| '); - const expected1 = ' ----1-2-3----4-|'; - const subscriber2 = hot('-------b| '); - const expected2 = ' --------3----4-|'; - const subscriber3 = hot('-----------c| '); - const expected3 = ' -------------4-|'; - - const shared = source.pipe(share()); - - expectObservable(subscriber1.pipe(mergeMapTo(shared))).toBe(expected1); - expectObservable(subscriber2.pipe(mergeMapTo(shared))).toBe(expected2); - expectObservable(subscriber3.pipe(mergeMapTo(shared))).toBe(expected3); - expectSubscriptions(source.subscriptions).toBe(sourceSubs); + it('should not fail on reentrant subscription', () => { + rxTest.run(({ cold, expectObservable, expectSubscriptions }) => { + // https://github.com/ReactiveX/rxjs/issues/6144 + const source = cold('(123|)'); + const subs = ' (^!) '; + const expected = ' (136|)'; + + const deferred = defer(() => shared).pipe(startWith(0)); + const shared: Observable = source.pipe( + withLatestFrom(deferred), + map(([a, b]) => String(Number(a) + Number(b))), + share(options) + ); + + expectObservable(shared).toBe(expected); + expectSubscriptions(source.subscriptions).toBe(subs); + }); }); }); + } + + for (const { title, resetOnError, resetOnComplete, resetOnRefCountZero } of [ + { title: 'share(config)', resetOnError: false, resetOnComplete: false, resetOnRefCountZero: false }, + { + title: 'share(config) using EMPTY as sync reset notifier equivalents', + resetOnError: () => EMPTY, + resetOnComplete: () => EMPTY, + resetOnRefCountZero: () => EMPTY, + }, + { + title: 'share(config) using NEVER as sync reset notifier equivalents', + resetOnError: () => NEVER, + resetOnComplete: () => NEVER, + resetOnRefCountZero: () => NEVER, + }, + ]) { + describe(title, () => { + it('should not reset on error if configured to do so', () => { + rxTest.run(({ hot, expectObservable, expectSubscriptions }) => { + const source = hot('---a---b---c---d---e---f----#'); + const expected = ' ---a---b---c---d---e---f----#'; + const sourceSubs = [ + ' ^----------! ', + ' -----------^-----------! ', + ' -----------------------^----!', + ]; + const result = source.pipe( + // takes a, b, c... then repeat causes it to take d, e, f + take(3), + share({ resetOnError }), + repeat() + ); + + expectObservable(result).toBe(expected); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + }); + }); - it('should disconnect when last subscriber unsubscribes', () => { - rxTest.run(({ cold, hot, expectObservable, expectSubscriptions }) => { - const source = cold(' -1-2-3----4-|'); - const sourceSubs = ' ---^--------! '; - const subscriber1 = hot('---a| '); - const unsub1 = ' ----------! '; - const expected1 = ' ----1-2-3-- '; - const subscriber2 = hot('-------b| '); - const unsub2 = ' ------------! '; - const expected2 = ' --------3---- '; - - const shared = source.pipe(share()); - - expectObservable(subscriber1.pipe(mergeMapTo(shared)), unsub1).toBe(expected1); - expectObservable(subscriber2.pipe(mergeMapTo(shared)), unsub2).toBe(expected2); - expectSubscriptions(source.subscriptions).toBe(sourceSubs); + it('should not reset on complete if configured to do so', () => { + rxTest.run(({ cold, expectObservable, expectSubscriptions }) => { + const source = cold('---a---b---c---# '); + const expected = ' ---a---b---c------a---b---c------a---b---|'; + const sourceSubs = [ + ' ^--------------! ', + ' ---------------^--------------! ', + ' ------------------------------^----------!', + ]; + + // Used to trigger the source to complete at a given moment. + const triggerComplete = new Subject(); + + // just used to count how many values have made it through the share. + let count = 0; + + const result = source.pipe( + takeUntil(triggerComplete), + share({ resetOnComplete }), + // Retry on any error. + retry(), + tap(() => { + if (++count === 9) { + // If we see the ninth value, complete the source this time. + triggerComplete.next(); + } + }) + ); + + expectObservable(result).toBe(expected); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + }); + }); + + it('should not reset on refCount 0 if configured to do so', () => { + rxTest.run(({ hot, expectObservable, expectSubscriptions }) => { + const source = hot(' ---v---v---v---E--v---v---v---C---v----v------v---'); + const expected = ' ---v---v---v------v---v---v-------v----v---- '; + const subscription = '^-------------------------------------------! '; + const sourceSubs = [ + ' ^--------------!', + ' ---------------^--------------!', + // Note this last subscription never ends, because refCount hitting zero isn't going to reset. + ' ------------------------------^-------------- ', + ]; + + const result = source.pipe( + tap((value) => { + if (value === 'E') { + throw new Error('E'); + } + }), + takeWhile((value) => value !== 'C'), + share({ resetOnRefCountZero }), + retry(), + repeat() + ); + + expectObservable(result, subscription).toBe(expected); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + }); }); }); + } + + describe('share(config)', () => { + it('should use the connector function provided', () => { + const connector = spy(() => new Subject()); + + rxTest.run(({ hot, expectObservable }) => { + const source = hot(' ---v---v---v---E--v---v---v---C---v----v--------v----v---'); + const subs1 = ' ^-------------------------------------------! '; + const expResult1 = ' ---v---v---v------v---v---v-------v----v----- '; + const subs2 = ' ----------------------------------------------^---------!'; + const expResult2 = ' ------------------------------------------------v----v---'; - it('should not break unsubscription chain when last subscriber unsubscribes', () => { - rxTest.run(({ cold, hot, expectObservable, expectSubscriptions }) => { - const source = cold(' -1-2-3----4-|'); - const sourceSubs = ' ---^--------! '; - const subscriber1 = hot('---a| '); - const unsub1 = ' ----------! '; - const expected1 = ' ----1-2-3-- '; - const subscriber2 = hot('-------b| '); - const unsub2 = ' ------------! '; - const expected2 = ' --------3---- '; - - const shared = source.pipe( - mergeMap((x: string) => of(x)), - share(), - mergeMap((x: string) => of(x)) + const result = source.pipe( + tap((value) => { + if (value === 'E') { + throw new Error('E'); + } + }), + takeWhile((value) => value !== 'C'), + share({ + connector, + }), + retry(), + repeat() ); - expectObservable(subscriber1.pipe(mergeMapTo(shared)), unsub1).toBe(expected1); - expectObservable(subscriber2.pipe(mergeMapTo(shared)), unsub2).toBe(expected2); - expectSubscriptions(source.subscriptions).toBe(sourceSubs); + expectObservable(result, subs1).toBe(expResult1); + expectObservable(result, subs2).toBe(expResult2); + }); + + expect(connector).to.have.callCount(4); + }); + }); + + describe('share(config) with async/deferred reset notifiers', () => { + it('should reset on refCount 0 when synchronously resubscribing to a firehose and using a sync reset notifier', () => { + let subscriptionCount = 0; + const source = new Observable((subscriber) => { + subscriptionCount++; + for (let i = 0; i < 3 && !subscriber.closed; i++) { + subscriber.next(i); + } + if (!subscriber.closed) { + subscriber.complete(); + } }); + + let result; + source + .pipe(share({ resetOnRefCountZero: () => syncNotify }), take(2), repeat(2), toArray()) + .subscribe((numbers) => void (result = numbers)); + + expect(subscriptionCount).to.equal(2); + expect(result).to.deep.equal([0, 1, 0, 1]); }); - it('should be retryable when cold source is synchronous', () => { - rxTest.run(({ cold, hot, expectObservable, expectSubscriptions }) => { - const source = cold('(123#) '); - const subscribe1 = ' s '; - const expected1 = ' (123123#) '; - const subscribe2 = ' -s '; - const expected2 = ' -(123123#)'; + it('should reset on refCount 0 when synchronously resubscribing and using a sync reset notifier', () => { + rxTest.run(({ hot, expectObservable, expectSubscriptions }) => { + const source = hot(' ---1---2---3---(4 )---5---|'); const sourceSubs = [ - ' (^!) ', - ' (^!) ', - ' -(^!) ', - ' -(^!) ', + ' ^------! ', + // break the line, please + ' -------^-------(! ) ', ]; + const expected = ' ---1---2---3---(4|) '; + const subscription = '^--------------(- ) '; - const shared = source.pipe(share()); + const sharedSource = source.pipe(share({ resetOnRefCountZero: () => syncNotify }), take(2)); - expectObservable( - hot(subscribe1).pipe( - tap(() => { - expectObservable(shared.pipe(retry(1))).toBe(expected1); - }) - ) - ).toBe(subscribe1); - - expectObservable( - hot(subscribe2).pipe( - tap(() => { - expectObservable(shared.pipe(retry(1))).toBe(expected2); - }) - ) - ).toBe(subscribe2); + const result = concat(sharedSource, sharedSource); + expectObservable(result, subscription).toBe(expected); expectSubscriptions(source.subscriptions).toBe(sourceSubs); }); }); - it('should be repeatable when cold source is synchronous', () => { - rxTest.run(({ cold, hot, expectObservable, expectSubscriptions }) => { - const source = cold('(123|) '); - const subscribe1 = ' s '; - const expected1 = ' (123123|) '; - const subscribe2 = ' -s '; - const expected2 = ' -(123123|)'; - const sourceSubs = [ - ' (^!) ', - ' (^!) ', - ' -(^!) ', - ' -(^!) ', - ]; + it('should not reset on refCount 0 when synchronously resubscribing and using a deferred reset notifier', () => { + rxTest.run(({ cold, expectObservable, expectSubscriptions }) => { + const source = cold(' ---1---2---3---4---5---|'); + const sourceSubs = ' ^----------------------!'; + const expected = ' ---1---2---3---4---5---|'; + const subscription = '^-----------------------'; - const shared = source.pipe(share()); + const sharedSource = source.pipe(share({ resetOnRefCountZero: () => asapNotify }), take(3)); - expectObservable( - hot(subscribe1).pipe( - tap(() => { - expectObservable(shared.pipe(repeat(2))).toBe(expected1); - }) - ) - ).toBe(subscribe1); - - expectObservable( - hot(subscribe2).pipe( - tap(() => { - expectObservable(shared.pipe(repeat(2))).toBe(expected2); - }) - ) - ).toBe(subscribe2); + const result = concat(sharedSource, sharedSource); + expectObservable(result, subscription).toBe(expected); expectSubscriptions(source.subscriptions).toBe(sourceSubs); }); }); - it('should be retryable', () => { - rxTest.run(({ cold, hot, expectObservable, expectSubscriptions }) => { - const source = cold('-1-2-3----4-# '); + it('should reset on refCount 0 only after reset notifier emitted', () => { + rxTest.run(({ hot, cold, expectObservable, expectSubscriptions }) => { + const source = hot(' ---1---2---3---4---5---|'); const sourceSubs = [ - ' ^-----------! ', - ' ------------^-----------! ', - ' ------------------------^-----------!', + ' ^----------------! ', + // break the line, please + ' ------------------^----!', ]; - const subscribe1 = ' s------------------------------------'; - const expected1 = ' -1-2-3----4--1-2-3----4--1-2-3----4-#'; - const subscribe2 = ' ----s--------------------------------'; - const expected2 = ' -----3----4--1-2-3----4--1-2-3----4-#'; + const expected = ' ---1---2---3---4---5---|'; + const subscription = ' ^-----------------------'; + const firstPause = cold(' -| '); + const reset = cold(' --r '); + const secondPause = cold(' ---| '); + // reset: ' --r ' - const shared = source.pipe(share()); + const sharedSource = source.pipe(share({ resetOnRefCountZero: () => reset }), take(2)); - expectObservable( - hot(subscribe1).pipe( - tap(() => { - expectObservable(shared.pipe(retry(2))).toBe(expected1); - }) - ) - ).toBe(subscribe1); - - expectObservable( - hot(subscribe2).pipe( - tap(() => { - expectObservable(shared.pipe(retry(2))).toBe(expected2); - }) - ) - ).toBe(subscribe2); + const result = concat(sharedSource, firstPause, sharedSource, secondPause, sharedSource); + expectObservable(result, subscription).toBe(expected); expectSubscriptions(source.subscriptions).toBe(sourceSubs); }); }); - it('should be repeatable', () => { - rxTest.run(({ cold, hot, expectObservable, expectSubscriptions }) => { - const source = cold('-1-2-3----4-| '); + it('should reset on error only after reset notifier emitted', () => { + rxTest.run(({ cold, expectObservable, expectSubscriptions }) => { + const source = cold(' ---1---2---# '); + // source: ' ---1---2---# ' const sourceSubs = [ - ' ^-----------! ', - ' ------------^-----------! ', - ' ------------------------^-----------!', + ' ^----------! ', + // break the line, please + ' --------------^----------! ', ]; - const subscribe1 = ' s------------------------------------'; - const expected1 = ' -1-2-3----4--1-2-3----4--1-2-3----4-|'; - const subscribe2 = ' ----s--------------------------------'; - const expected2 = ' -----3----4--1-2-3----4--1-2-3----4-|'; - - const shared = source.pipe(share()); + const expected = ' ---1---2---------1---2----# '; + const subscription = ' ^-------------------------- '; + const firstPause = cold(' -------| '); + const reset = cold(' --r '); + const secondPause = cold(' -----| '); + // reset: ' --r' - expectObservable( - hot(subscribe1).pipe( - tap(() => { - expectObservable(shared.pipe(repeat(3))).toBe(expected1); - }) - ) - ).toBe(subscribe1); + const sharedSource = source.pipe(share({ resetOnError: () => reset, resetOnRefCountZero: false }), take(2)); - expectObservable( - hot(subscribe2).pipe( - tap(() => { - expectObservable(shared.pipe(repeat(3))).toBe(expected2); - }) - ) - ).toBe(subscribe2); + const result = concat(sharedSource, firstPause, sharedSource, secondPause, sharedSource); + expectObservable(result, subscription).toBe(expected); expectSubscriptions(source.subscriptions).toBe(sourceSubs); }); }); - it('should not change the output of the observable when never', () => { - rxTest.run(({ expectObservable }) => { - const e1 = NEVER; - const expected = '-'; + it('should reset on complete only after reset notifier emitted', () => { + rxTest.run(({ cold, expectObservable, expectSubscriptions }) => { + const source = cold(' ---1---2---| '); + // source: ' ---1---2---| ' + const sourceSubs = [ + ' ^----------! ', + // break the line, please + ' --------------^----------! ', + ]; + const expected = ' ---1---2---------1---2----| '; + const subscription = ' ^-------------------------- '; + const firstPause = cold(' -------| '); + const reset = cold(' --r '); + const secondPause = cold(' -----| '); + // reset: ' --r' - expectObservable(e1.pipe(share())).toBe(expected); - }); - }); + const sharedSource = source.pipe(share({ resetOnComplete: () => reset, resetOnRefCountZero: false }), take(2)); - it('should not change the output of the observable when empty', () => { - rxTest.run(({ expectObservable }) => { - const e1 = EMPTY; - const expected = '|'; + const result = concat(sharedSource, firstPause, sharedSource, secondPause, sharedSource); - expectObservable(e1.pipe(share())).toBe(expected); + expectObservable(result, subscription).toBe(expected); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); }); }); - it('should stop listening to a synchronous observable when unsubscribed', () => { - const sideEffects: number[] = []; - const synchronousObservable = new Observable((subscriber) => { - // This will check to see if the subscriber was closed on each loop - // when the unsubscribe hits (from the `take`), it should be closed - for (let i = 0; !subscriber.closed && i < 10; i++) { - sideEffects.push(i); - subscriber.next(i); - } - }); + it('should not reset on refCount 0 if reset notifier errors before emitting any value', () => { + spyOnUnhandledError((onUnhandledError) => { + const error = new Error(); - synchronousObservable.pipe(share(), take(3)).subscribe(() => { - /* noop */ - }); + rxTest.run(({ hot, cold, expectObservable, expectSubscriptions }) => { + const source = hot(' ---1---2---3---4---(5 )---|'); + const sourceSubs = ' ^------------------(- )---!'; + const expected = ' ---1---2-------4---(5|) '; + const subscription = ' ^------------------(- ) '; + const firstPause = cold(' ------| '); + const reset = cold(' --# ', undefined, error); + // reset: ' (- )-# ' - expect(sideEffects).to.deep.equal([0, 1, 2]); - }); + const sharedSource = source.pipe(share({ resetOnRefCountZero: () => reset }), take(2)); - it('should not fail on reentrant subscription', () => { - rxTest.run(({ cold, expectObservable, expectSubscriptions }) => { - // https://github.com/ReactiveX/rxjs/issues/6144 - const source = cold('(123|)'); - const subs = ' (^!) '; - const expected = ' (136|)'; - - const deferred = defer(() => shared).pipe(startWith(0)); - const shared: Observable = source.pipe( - withLatestFrom(deferred), - map(([a, b]) => String(Number(a) + Number(b))), - share() - ); + const result = concat(sharedSource, firstPause, sharedSource); - expectObservable(shared).toBe(expected); - expectSubscriptions(source.subscriptions).toBe(subs); + expectObservable(result, subscription).toBe(expected); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + }); + + expect(onUnhandledError).to.have.been.calledTwice; + expect(onUnhandledError.getCall(0)).to.have.been.calledWithExactly(error); + expect(onUnhandledError.getCall(1)).to.have.been.calledWithExactly(error); }); }); - }); - describe('share(config)', () => { - it('should not reset on error if configured to do so', () => { - rxTest.run(({ hot, expectObservable, expectSubscriptions }) => { - const source = hot('---a---b---c---d---e---f----#'); - const expected = ' ---a---b---c---d---e---f----#'; - const sourceSubs = [ - ' ^----------! ', - ' -----------^-----------! ', - ' -----------------------^----!', - ]; - const result = source.pipe( - // takes a, b, c... then repeat causes it to take d, e, f - take(3), - share({ - resetOnError: false, - }), - repeat() - ); + it('should not reset on error if reset notifier errors before emitting any value', () => { + spyOnUnhandledError((onUnhandledError) => { + const error = new Error(); - expectObservable(result).toBe(expected); - expectSubscriptions(source.subscriptions).toBe(sourceSubs); + rxTest.run(({ cold, expectObservable, expectSubscriptions }) => { + const source = cold(' ---1---2---# '); + const sourceSubs = ' ^----------! '; + const expected = ' ---1---2------#'; + const subscription = ' ^--------------'; + const firstPause = cold(' -------|'); + const reset = cold(' --# ', undefined, error); + + const sharedSource = source.pipe(share({ resetOnError: () => reset, resetOnRefCountZero: false }), take(2)); + + const result = concat(sharedSource, firstPause, sharedSource); + + expectObservable(result, subscription).toBe(expected); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + }); + + expect(onUnhandledError).to.have.been.calledOnce; + expect(onUnhandledError.getCall(0)).to.have.been.calledWithExactly(error); }); }); - it('should not reset on complete if configured to do so', () => { - rxTest.run(({ cold, expectObservable, expectSubscriptions }) => { - const source = cold('---a---b---c---# '); - const expected = ' ---a---b---c------a---b---c------a---b---|'; - const sourceSubs = [ - ' ^--------------! ', - ' ---------------^--------------! ', - ' ------------------------------^----------!', - ]; + it('should not reset on complete if reset notifier errors before emitting any value', () => { + spyOnUnhandledError((onUnhandledError) => { + const error = new Error(); - // Used to trigger the source to complete at a given moment. - const triggerComplete = new Subject(); + rxTest.run(({ cold, expectObservable, expectSubscriptions }) => { + const source = cold(' ---1---2---| '); + const sourceSubs = ' ^----------! '; + const expected = ' ---1---2------|'; + const subscription = ' ^--------------'; + const firstPause = cold(' -------|'); + const reset = cold(' --# ', undefined, error); - // just used to count how many values have made it through the share. - let count = 0; + const sharedSource = source.pipe(share({ resetOnComplete: () => reset, resetOnRefCountZero: false }), take(2)); - const result = source.pipe( - takeUntil(triggerComplete), - share({ - resetOnComplete: false, - }), - // Retry on any error. - retry(), - tap(() => { - if (++count === 9) { - // If we see the ninth value, complete the source this time. - triggerComplete.next(); - } - }) - ); + const result = concat(sharedSource, firstPause, sharedSource); - expectObservable(result).toBe(expected); - expectSubscriptions(source.subscriptions).toBe(sourceSubs); + expectObservable(result, subscription).toBe(expected); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + }); + + expect(onUnhandledError).to.have.been.calledOnce; + expect(onUnhandledError.getCall(0)).to.have.been.calledWithExactly(error); }); }); - it('should not reset on refCount 0 if configured to do so', () => { - rxTest.run(({ hot, expectObservable, expectSubscriptions }) => { - const source = hot(' ---v---v---v---E--v---v---v---C---v----v------v---'); - const expected = ' ---v---v---v------v---v---v-------v----v---- '; - const subscription = '^-------------------------------------------! '; + it('should not call "resetOnRefCountZero" on error', () => { + rxTest.run(({ cold, expectObservable, expectSubscriptions }) => { + const resetOnRefCountZero = spy(() => EMPTY); + + const source = cold(' ---1---(2#) '); + // source: ' ---1---(2#) ' const sourceSubs = [ - ' ^--------------!', - ' ---------------^--------------!', - // Note this last subscription never ends, because refCount hitting zero isn't going to reset. - ' ------------------------------^-------------- ', + ' ^------(! ) ', + // break the line, please + ' -------(- )---^------(! ) ', ]; + const expected = ' ---1---(2 )------1---(2#) '; + const subscription = ' ^------(- )----------(- ) '; + const firstPause = cold(' (- )---| '); + const reset = cold(' (- )-r '); + // reset: ' (- )-r' - const result = source.pipe( - tap((value) => { - if (value === 'E') { - throw new Error('E'); - } - }), - takeWhile((value) => value !== 'C'), - share({ - resetOnRefCountZero: false, - }), - retry(), - repeat() - ); + const sharedSource = source.pipe(share({ resetOnError: () => reset, resetOnRefCountZero })); + + const result = concat(sharedSource.pipe(onErrorResumeNext(firstPause)), sharedSource); expectObservable(result, subscription).toBe(expected); expectSubscriptions(source.subscriptions).toBe(sourceSubs); + expect(resetOnRefCountZero).to.not.have.been.called; }); }); - it('should use the connector function provided', () => { - const connector = sinon.spy(() => new Subject()); + it('should not call "resetOnRefCountZero" on complete', () => { + rxTest.run(({ cold, expectObservable, expectSubscriptions }) => { + const resetOnRefCountZero = spy(() => EMPTY); - rxTest.run(({ hot, expectObservable }) => { - const source = hot(' ---v---v---v---E--v---v---v---C---v----v--------v----v---'); - const subs1 = ' ^-------------------------------------------! '; - const expResult1 = ' ---v---v---v------v---v---v-------v----v----- '; - const subs2 = ' ----------------------------------------------^---------!'; - const expResult2 = ' ------------------------------------------------v----v---'; + const source = cold(' ---1---(2|) '); + // source: ' ---1---(2|) ' + const sourceSubs = [ + ' ^------(! ) ', + // break the line, please + ' -------(- )---^------(! ) ', + ]; + const expected = ' ---1---(2 )------1---(2|) '; + const subscription = ' ^------(- )----------(- ) '; + const firstPause = cold(' (- )---| '); + const reset = cold(' (- )-r '); + // reset: ' (- )-r' - const result = source.pipe( - tap((value) => { - if (value === 'E') { - throw new Error('E'); - } - }), - takeWhile((value) => value !== 'C'), - share({ - connector, - }), - retry(), - repeat() - ); + const sharedSource = source.pipe(share({ resetOnComplete: () => reset, resetOnRefCountZero })); - expectObservable(result, subs1).toBe(expResult1); - expectObservable(result, subs2).toBe(expResult2); - }); + const result = concat(sharedSource, firstPause, sharedSource); - expect(connector).to.have.callCount(4); + expectObservable(result, subscription).toBe(expected); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + expect(resetOnRefCountZero).to.not.have.been.called; + }); }); }); }); diff --git a/src/internal/operators/share.ts b/src/internal/operators/share.ts index 335210fdc8..90f929d9aa 100644 --- a/src/internal/operators/share.ts +++ b/src/internal/operators/share.ts @@ -1,7 +1,10 @@ +import { Observable } from '../Observable'; +import { from } from '../observable/from'; +import { take } from '../operators/take'; import { Subject } from '../Subject'; -import { MonoTypeOperatorFunction, OperatorFunction, SubjectLike } from '../types'; import { SafeSubscriber } from '../Subscriber'; -import { from } from '../observable/from'; +import { Subscription } from '../Subscription'; +import { MonoTypeOperatorFunction, SubjectLike } from '../types'; import { operate } from '../util/lift'; export interface ShareConfig { @@ -17,16 +20,20 @@ export interface ShareConfig { * will remain the connecting subject, meaning the resulting observable will not go "cold" again, and subsequent retries * or resubscriptions will resubscribe to that same subject. In all cases, RxJS subjects will emit the same error again, however * {@link ReplaySubject} will also push its buffered values before pushing the error. + * It is also possible to pass a notifier factory returning an observable instead which grants more fine-grained + * control over how and when the reset should happen. This allows behaviors like conditional or delayed resets. */ - resetOnError?: boolean; + resetOnError?: boolean | ((error: any) => Observable); /** * If true, the resulting observable will reset internal state on completion from source and return to a "cold" state. This * allows the resulting observable to be "repeated" after it is done. * If false, when the source completes, it will push the completion through the connecting subject, and the subject * will remain the connecting subject, meaning the resulting observable will not go "cold" again, and subsequent repeats * or resubscriptions will resubscribe to that same subject. + * It is also possible to pass a notifier factory returning an observable instead which grants more fine-grained + * control over how and when the reset should happen. This allows behaviors like conditional or delayed resets. */ - resetOnComplete?: boolean; + resetOnComplete?: boolean | (() => Observable); /** * If true, when the number of subscribers to the resulting observable reaches zero due to those subscribers unsubscribing, the * internal state will be reset and the resulting observable will return to a "cold" state. This means that the next @@ -34,8 +41,10 @@ export interface ShareConfig { * again. * If false, when the number of subscribers to the resulting observable reaches zero due to unsubscription, the subject * will remain connected to the source, and new subscriptions to the result will be connected through that same subject. + * It is also possible to pass a notifier factory returning an observable instead which grants more fine-grained + * control over how and when the reset should happen. This allows behaviors like conditional or delayed resets. */ - resetOnRefCountZero?: boolean; + resetOnRefCountZero?: boolean | (() => Observable); } export function share(): MonoTypeOperatorFunction; @@ -48,6 +57,14 @@ export function share(options: ShareConfig): MonoTypeOperatorFunction; * unsubscribe from the source Observable. Because the Observable is multicasting it makes the stream `hot`. * This is an alias for `multicast(() => new Subject()), refCount()`. * + * The subscription to the underlying source Observable can be reset (unsubscribe and resubscribe for new subscribers), + * if the subscriber count to the shared observable drops to 0, or if the source Observable errors or completes. It is + * possible to use notifier factories for the resets to allow for behaviors like conditional or delayed resets. Please + * note that resetting on error or complete of the source Observable does not behave like a transparent retry or restart + * of the source because the error or complete will be forwarded to all subscribers and their subscription will be + * closed. Only new subscribers after a reset on error or complete happened will cause a fresh subscription to the + * source. To achieve transparent retries or restarts pipe the source through appropriate operators before sharing. + * * ![](share.png) * * ## Example @@ -84,33 +101,78 @@ export function share(options: ShareConfig): MonoTypeOperatorFunction; * // ... and so on * ``` * + * ## Example with notifier factory: Delayed reset + * ```ts + * import { interval } from 'rxjs'; + * import { share, take, timer } from 'rxjs/operators'; + * + * const source = interval(1000).pipe(take(3), share({ resetOnRefCountZero: () => timer(1000) })); + * + * const subscriptionOne = source.subscribe(x => console.log('subscription 1: ', x)); + * setTimeout(() => subscriptionOne.unsubscribe(), 1300); + * + * setTimeout(() => source.subscribe(x => console.log('subscription 2: ', x)), 1700); + * + * setTimeout(() => source.subscribe(x => console.log('subscription 3: ', x)), 5000); + * + * // Logs: + * // subscription 1: 0 + * // (subscription 1 unsubscribes here) + * // (subscription 2 subscribes here ~400ms later, source was not reset) + * // subscription 2: 1 + * // subscription 2: 2 + * // (subscription 2 unsubscribes here) + * // (subscription 3 subscribes here ~2000ms later, source did reset before) + * // subscription 3: 0 + * // subscription 3: 1 + * // subscription 3: 2 + * ``` + * * @see {@link api/index/function/interval} * @see {@link map} * * @return A function that returns an Observable that mirrors the source. */ -export function share(options?: ShareConfig): OperatorFunction { - options = options || {}; - const { connector = () => new Subject(), resetOnComplete = true, resetOnError = true, resetOnRefCountZero = true } = options; +export function share(options: ShareConfig = {}): MonoTypeOperatorFunction { + const { connector = () => new Subject(), resetOnError = true, resetOnComplete = true, resetOnRefCountZero = true } = options; let connection: SafeSubscriber | null = null; + let resetConnection: Subscription | null = null; let subject: SubjectLike | null = null; let refCount = 0; let hasCompleted = false; let hasErrored = false; + const cancelReset = () => { + resetConnection?.unsubscribe(); + resetConnection = null; + }; // Used to reset the internal state to a "cold" // state, as though it had never been subscribed to. const reset = () => { + cancelReset(); connection = subject = null; hasCompleted = hasErrored = false; }; + const resetAndUnsubscribe = () => { + // We need to capture the connection before + // we reset (if we need to reset). + const conn = connection; + reset(); + conn?.unsubscribe(); + }; return operate((source, subscriber) => { refCount++; + if (!hasErrored && !hasCompleted) { + cancelReset(); + } - // Create the subject if we don't have one yet. - subject = subject ?? connector(); + // Create the subject if we don't have one yet. Grab a local reference to + // it as well, which avoids non-null assertations when using it and, if we + // connect to it now, then error/complete need a reference after it was + // reset. + const dest = (subject = subject ?? connector()); // Add the teardown directly to the subscriber - instead of returning it - // so that the handling of the subscriber's unsubscription will be wired @@ -123,18 +185,14 @@ export function share(options?: ShareConfig): OperatorFunction { // If we're resetting on refCount === 0, and it's 0, we only want to do // that on "unsubscribe", really. Resetting on error or completion is a different // configuration. - if (resetOnRefCountZero && !refCount && !hasErrored && !hasCompleted) { - // We need to capture the connection before - // we reset (if we need to reset). - const conn = connection; - reset(); - conn?.unsubscribe(); + if (refCount === 0 && !hasErrored && !hasCompleted) { + resetConnection = handleReset(resetAndUnsubscribe, resetOnRefCountZero); } }); // The following line adds the subscription to the subscriber passed. - // Basically, `subscriber === subject.subscribe(subscriber)` is `true`. - subject.subscribe(subscriber); + // Basically, `subscriber === dest.subscribe(subscriber)` is `true`. + dest.subscribe(subscriber); if (!connection) { // We need to create a subscriber here - rather than pass an observer and @@ -143,25 +201,17 @@ export function share(options?: ShareConfig): OperatorFunction { // those situations we want connection to be already-assigned so that we // don't create another connection to the source. connection = new SafeSubscriber({ - next: (value: T) => subject!.next(value), - error: (err: any) => { + next: (value) => dest.next(value), + error: (err) => { hasErrored = true; - // We need to capture the subject before - // we reset (if we need to reset). - const dest = subject!; - if (resetOnError) { - reset(); - } + cancelReset(); + resetConnection = handleReset(reset, resetOnError, err); dest.error(err); }, complete: () => { hasCompleted = true; - const dest = subject!; - // We need to capture the subject before - // we reset (if we need to reset). - if (resetOnComplete) { - reset(); - } + cancelReset(); + resetConnection = handleReset(reset, resetOnComplete); dest.complete(); }, }); @@ -169,3 +219,23 @@ export function share(options?: ShareConfig): OperatorFunction { } }); } + +function handleReset( + reset: () => void, + on: boolean | ((...args: T) => Observable), + ...args: T +): Subscription | null { + if (on === true) { + reset(); + + return null; + } + + if (on === false) { + return null; + } + + return on(...args) + .pipe(take(1)) + .subscribe(() => reset()); +}