From 963ff5e7685be5530bc30996bae5a0607642af5f Mon Sep 17 00:00:00 2001 From: Oliver Hoff Date: Tue, 23 Mar 2021 06:12:34 +0100 Subject: [PATCH] feat(share): use another observable to control resets --- src/internal/operators/share.ts | 68 +++++++++++++++++++++++---------- 1 file changed, 48 insertions(+), 20 deletions(-) diff --git a/src/internal/operators/share.ts b/src/internal/operators/share.ts index e12fbc86927..420c7550b95 100644 --- a/src/internal/operators/share.ts +++ b/src/internal/operators/share.ts @@ -2,7 +2,13 @@ import { Subject } from '../Subject'; import { MonoTypeOperatorFunction, OperatorFunction, SubjectLike } from '../types'; import { Subscription } from '../Subscription'; +import { Observable } from '../Observable'; +import { EMPTY } from '../observable/empty'; import { from } from '../observable/from'; +import { of } from '../observable/of'; +import { mapTo } from '../operators/mapTo'; +import { switchAll } from '../operators/switchAll'; +import { take } from '../operators/take'; import { operate } from '../util/lift'; export interface ShareConfig { @@ -19,7 +25,7 @@ export interface ShareConfig { * or resubscriptions will resubscribe to that same subject. In all cases, RxJS subjects will emit the same error again, however * {@link ReplaySubject} will also push its buffered values before pushing the error. */ - resetOnError?: boolean; + resetOnError?: boolean | ((error: any) => Observable); /** * If true, the resulting observable will reset internal state on completion from source and return to a "cold" state. This * allows the resulting observable to be "repeated" after it is done. @@ -27,7 +33,7 @@ export interface ShareConfig { * will remain the connecting subject, meaning the resulting observable will not go "cold" again, and subsequent repeats * or resubscriptions will resubscribe to that same subject. */ - resetOnComplete?: boolean; + resetOnComplete?: boolean | (() => Observable); /** * If true, when the number of subscribers to the resulting observable reaches zero due to those subscribers unsubscribing, the * internal state will be reset and the resulting observable will return to a "cold" state. This means that the next @@ -36,7 +42,7 @@ export interface ShareConfig { * If false, when the number of subscribers to the resulting observable reaches zero due to unsubscription, the subject * will remain connected to the source, and new subscriptions to the result will be connected through that same subject. */ - resetOnRefCountZero?: boolean; + resetOnRefCountZero?: boolean | (() => Observable); } export function share(): MonoTypeOperatorFunction; @@ -90,28 +96,45 @@ export function share(options: ShareConfig): MonoTypeOperatorFunction; * * @return A function that returns an Observable that mirrors the source. */ -export function share(options?: ShareConfig): OperatorFunction { - options = options || {}; - const { connector = () => new Subject(), resetOnComplete = true, resetOnError = true, resetOnRefCountZero = true } = options; +export function share(options: ShareConfig = {}): MonoTypeOperatorFunction { + const { connector = () => new Subject() } = options; let connection: Subscription | null = null; let subject: SubjectLike | null = null; let refCount = 0; let hasCompleted = false; let hasErrored = false; + const resetSubject = new Subject void>>(); + let resetConnection: Subscription | null = null; // Used to reset the internal state to a "cold" // state, as though it had never been subscribed to. const reset = () => { - connection = subject = null; + resetConnection?.unsubscribe(); + resetConnection = connection = subject = null; hasCompleted = hasErrored = false; }; + const resetAndUnsubscribe = () => { + // We need to capture the connection before + // we reset (if we need to reset). + const conn = connection; + reset(); + conn?.unsubscribe(); + }; + + const resetOnComplete = createResetNotifierFactory(reset, options.resetOnComplete); + const resetOnError = createResetNotifierFactory(reset, options.resetOnError); + const resetOnRefCountZero = createResetNotifierFactory(resetAndUnsubscribe, options.resetOnRefCountZero); return operate((source, subscriber) => { refCount++; + if (!hasErrored && !hasCompleted) { + resetSubject.next(EMPTY); + } // Create the subject if we don't have one yet. subject = subject ?? connector(); + resetConnection = resetConnection ?? resetSubject.pipe(switchAll()).subscribe((fn) => void fn()); // The following line adds the subscription to the subscriber passed. // Basically, `subscriber === subject.subscribe(subscriber)` is `true`. @@ -125,19 +148,15 @@ export function share(options?: ShareConfig): OperatorFunction { // We need to capture the subject before // we reset (if we need to reset). const dest = subject!; - if (resetOnError) { - reset(); - } + resetSubject.next(resetOnError(err)); dest.error(err); }, complete: () => { hasCompleted = true; - const dest = subject!; // We need to capture the subject before // we reset (if we need to reset). - if (resetOnComplete) { - reset(); - } + const dest = subject!; + resetSubject.next(resetOnComplete()); dest.complete(); }, }); @@ -150,13 +169,22 @@ export function share(options?: ShareConfig): OperatorFunction { // If we're resetting on refCount === 0, and it's 0, we only want to do // that on "unsubscribe", really. Resetting on error or completion is a different // configuration. - if (resetOnRefCountZero && !refCount && !hasErrored && !hasCompleted) { - // We need to capture the connection before - // we reset (if we need to reset). - const conn = connection; - reset(); - conn?.unsubscribe(); + if (refCount === 0 && !hasErrored && !hasCompleted) { + resetSubject.next(resetOnRefCountZero()); } }; }); } + +type ResetNotifierFactory = (...args: T) => Observable; + +function createResetNotifierFactory( + fn: () => void, + on: boolean | ResetNotifierFactory = true +): ResetNotifierFactory void> { + if (typeof on !== 'boolean') { + return (...args) => on(...args).pipe(take(1), mapTo(fn)); + } + + return on ? () => of(fn) : () => EMPTY; +}