diff --git a/src/internal/operators/share.ts b/src/internal/operators/share.ts index 335210fdc81..57c9a101487 100644 --- a/src/internal/operators/share.ts +++ b/src/internal/operators/share.ts @@ -1,7 +1,10 @@ +import { Observable } from '../Observable'; +import { from } from '../observable/from'; +import { take } from '../operators/take'; import { Subject } from '../Subject'; -import { MonoTypeOperatorFunction, OperatorFunction, SubjectLike } from '../types'; import { SafeSubscriber } from '../Subscriber'; -import { from } from '../observable/from'; +import { Subscription } from '../Subscription'; +import { MonoTypeOperatorFunction, SubjectLike } from '../types'; import { operate } from '../util/lift'; export interface ShareConfig { @@ -17,16 +20,20 @@ export interface ShareConfig { * will remain the connecting subject, meaning the resulting observable will not go "cold" again, and subsequent retries * or resubscriptions will resubscribe to that same subject. In all cases, RxJS subjects will emit the same error again, however * {@link ReplaySubject} will also push its buffered values before pushing the error. + * It is also possible to pass a notifier factory returning an observable instead which grants more fine-grained + * control over how and when the reset should happen. This allows behaviors like conditional or delayed resets. */ - resetOnError?: boolean; + resetOnError?: boolean | ((error: any) => Observable); /** * If true, the resulting observable will reset internal state on completion from source and return to a "cold" state. This * allows the resulting observable to be "repeated" after it is done. * If false, when the source completes, it will push the completion through the connecting subject, and the subject * will remain the connecting subject, meaning the resulting observable will not go "cold" again, and subsequent repeats * or resubscriptions will resubscribe to that same subject. + * It is also possible to pass a notifier factory returning an observable instead which grants more fine-grained + * control over how and when the reset should happen. This allows behaviors like conditional or delayed resets. */ - resetOnComplete?: boolean; + resetOnComplete?: boolean | (() => Observable); /** * If true, when the number of subscribers to the resulting observable reaches zero due to those subscribers unsubscribing, the * internal state will be reset and the resulting observable will return to a "cold" state. This means that the next @@ -34,8 +41,10 @@ export interface ShareConfig { * again. * If false, when the number of subscribers to the resulting observable reaches zero due to unsubscription, the subject * will remain connected to the source, and new subscriptions to the result will be connected through that same subject. + * It is also possible to pass a notifier factory returning an observable instead which grants more fine-grained + * control over how and when the reset should happen. This allows behaviors like conditional or delayed resets. */ - resetOnRefCountZero?: boolean; + resetOnRefCountZero?: boolean | (() => Observable); } export function share(): MonoTypeOperatorFunction; @@ -48,6 +57,14 @@ export function share(options: ShareConfig): MonoTypeOperatorFunction; * unsubscribe from the source Observable. Because the Observable is multicasting it makes the stream `hot`. * This is an alias for `multicast(() => new Subject()), refCount()`. * + * The subscription to the underlying source Observable can be reset (unsubscribe and resubscribe for new subscribers), + * if the subscriber count to the shared observable drops to 0, or if the source Observable errors or completes. It is + * possible to use notifier factories for the resets to allow for behaviors like conditional or delayed resets. Please + * note that resetting on error or complete of the source Observable does not behave like a transparent retry or restart + * of the source because the error or complete will be forwarded to all subscribers and their subscription will be + * closed. Only new subscribers after a reset on error or complete happened will cause a fresh subscription to the + * source. To achieve transparent retries or restarts pipe the source through appropriate operators before sharing. + * * ![](share.png) * * ## Example @@ -84,30 +101,72 @@ export function share(options: ShareConfig): MonoTypeOperatorFunction; * // ... and so on * ``` * + * ## Example with notifier factory: Delayed reset + * ```ts + * import { interval } from 'rxjs'; + * import { share, take, timer } from 'rxjs/operators'; + * + * const source = interval(1000).pipe(take(3), share({ resetOnRefCountZero: () => timer(1000) })); + * + * const subscriptionOne = source.subscribe(x => console.log('subscription 1: ', x)); + * setTimeout(() => subscriptionOne.unsubscribe(), 1300); + * + * setTimeout(() => source.subscribe(x => console.log('subscription 2: ', x)), 1700); + * + * setTimeout(() => source.subscribe(x => console.log('subscription 3: ', x)), 5000); + * + * // Logs: + * // subscription 1: 0 + * // (subscription 1 unsubscribes here) + * // (subscription 2 subscribes here ~400ms later, source was not reset) + * // subscription 2: 1 + * // subscription 2: 2 + * // (subscription 2 unsubscribes here) + * // (subscription 3 subscribes here ~2000ms later, source did reset before) + * // subscription 3: 0 + * // subscription 3: 1 + * // subscription 3: 2 + * ``` + * * @see {@link api/index/function/interval} * @see {@link map} * * @return A function that returns an Observable that mirrors the source. */ -export function share(options?: ShareConfig): OperatorFunction { - options = options || {}; - const { connector = () => new Subject(), resetOnComplete = true, resetOnError = true, resetOnRefCountZero = true } = options; +export function share(options: ShareConfig = {}): MonoTypeOperatorFunction { + const { connector = () => new Subject(), resetOnError = true, resetOnComplete = true, resetOnRefCountZero = true } = options; let connection: SafeSubscriber | null = null; + let resetConnection: Subscription | null = null; let subject: SubjectLike | null = null; let refCount = 0; let hasCompleted = false; let hasErrored = false; + const cancelReset = () => { + resetConnection?.unsubscribe(); + resetConnection = null; + }; // Used to reset the internal state to a "cold" // state, as though it had never been subscribed to. const reset = () => { + cancelReset(); connection = subject = null; hasCompleted = hasErrored = false; }; + const resetAndUnsubscribe = () => { + // We need to capture the connection before + // we reset (if we need to reset). + const conn = connection; + reset(); + conn?.unsubscribe(); + }; return operate((source, subscriber) => { refCount++; + if (!hasErrored && !hasCompleted) { + cancelReset(); + } // Create the subject if we don't have one yet. subject = subject ?? connector(); @@ -123,12 +182,8 @@ export function share(options?: ShareConfig): OperatorFunction { // If we're resetting on refCount === 0, and it's 0, we only want to do // that on "unsubscribe", really. Resetting on error or completion is a different // configuration. - if (resetOnRefCountZero && !refCount && !hasErrored && !hasCompleted) { - // We need to capture the connection before - // we reset (if we need to reset). - const conn = connection; - reset(); - conn?.unsubscribe(); + if (refCount === 0 && !hasErrored && !hasCompleted) { + resetConnection = handleReset(resetAndUnsubscribe, resetOnRefCountZero); } }); @@ -149,19 +204,17 @@ export function share(options?: ShareConfig): OperatorFunction { // We need to capture the subject before // we reset (if we need to reset). const dest = subject!; - if (resetOnError) { - reset(); - } + cancelReset(); + resetConnection = handleReset(reset, resetOnError, err); dest.error(err); }, complete: () => { hasCompleted = true; - const dest = subject!; // We need to capture the subject before // we reset (if we need to reset). - if (resetOnComplete) { - reset(); - } + const dest = subject!; + cancelReset(); + resetConnection = handleReset(reset, resetOnComplete); dest.complete(); }, }); @@ -169,3 +222,23 @@ export function share(options?: ShareConfig): OperatorFunction { } }); } + +function handleReset( + fn: () => void, + on: boolean | ((...args: T) => Observable), + ...args: T +): Subscription | null { + if (on === true) { + fn(); + + return null; + } + + if (on === false) { + return null; + } + + return on(...args) + .pipe(take(1)) + .subscribe(() => fn()); +}