Skip to content

Commit

Permalink
feat(share): use another observable to control resets
Browse files Browse the repository at this point in the history
  • Loading branch information
backbone87 committed Mar 23, 2021
1 parent 3330ff3 commit 963ff5e
Showing 1 changed file with 48 additions and 20 deletions.
68 changes: 48 additions & 20 deletions src/internal/operators/share.ts
Expand Up @@ -2,7 +2,13 @@ import { Subject } from '../Subject';

import { MonoTypeOperatorFunction, OperatorFunction, SubjectLike } from '../types';
import { Subscription } from '../Subscription';
import { Observable } from '../Observable';
import { EMPTY } from '../observable/empty';
import { from } from '../observable/from';
import { of } from '../observable/of';
import { mapTo } from '../operators/mapTo';
import { switchAll } from '../operators/switchAll';
import { take } from '../operators/take';
import { operate } from '../util/lift';

export interface ShareConfig<T> {
Expand All @@ -19,15 +25,15 @@ export interface ShareConfig<T> {
* or resubscriptions will resubscribe to that same subject. In all cases, RxJS subjects will emit the same error again, however
* {@link ReplaySubject} will also push its buffered values before pushing the error.
*/
resetOnError?: boolean;
resetOnError?: boolean | ((error: any) => Observable<any>);
/**
* If true, the resulting observable will reset internal state on completion from source and return to a "cold" state. This
* allows the resulting observable to be "repeated" after it is done.
* If false, when the source completes, it will push the completion through the connecting subject, and the subject
* will remain the connecting subject, meaning the resulting observable will not go "cold" again, and subsequent repeats
* or resubscriptions will resubscribe to that same subject.
*/
resetOnComplete?: boolean;
resetOnComplete?: boolean | (() => Observable<any>);
/**
* If true, when the number of subscribers to the resulting observable reaches zero due to those subscribers unsubscribing, the
* internal state will be reset and the resulting observable will return to a "cold" state. This means that the next
Expand All @@ -36,7 +42,7 @@ export interface ShareConfig<T> {
* If false, when the number of subscribers to the resulting observable reaches zero due to unsubscription, the subject
* will remain connected to the source, and new subscriptions to the result will be connected through that same subject.
*/
resetOnRefCountZero?: boolean;
resetOnRefCountZero?: boolean | (() => Observable<any>);
}

export function share<T>(): MonoTypeOperatorFunction<T>;
Expand Down Expand Up @@ -90,28 +96,45 @@ export function share<T>(options: ShareConfig<T>): MonoTypeOperatorFunction<T>;
*
* @return A function that returns an Observable that mirrors the source.
*/
export function share<T>(options?: ShareConfig<T>): OperatorFunction<T, T> {
options = options || {};
const { connector = () => new Subject<T>(), resetOnComplete = true, resetOnError = true, resetOnRefCountZero = true } = options;
export function share<T>(options: ShareConfig<T> = {}): MonoTypeOperatorFunction<T> {
const { connector = () => new Subject<T>() } = options;

let connection: Subscription | null = null;
let subject: SubjectLike<T> | null = null;
let refCount = 0;
let hasCompleted = false;
let hasErrored = false;
const resetSubject = new Subject<Observable<() => void>>();
let resetConnection: Subscription | null = null;

// Used to reset the internal state to a "cold"
// state, as though it had never been subscribed to.
const reset = () => {
connection = subject = null;
resetConnection?.unsubscribe();
resetConnection = connection = subject = null;
hasCompleted = hasErrored = false;
};
const resetAndUnsubscribe = () => {
// We need to capture the connection before
// we reset (if we need to reset).
const conn = connection;
reset();
conn?.unsubscribe();
};

const resetOnComplete = createResetNotifierFactory(reset, options.resetOnComplete);
const resetOnError = createResetNotifierFactory(reset, options.resetOnError);
const resetOnRefCountZero = createResetNotifierFactory(resetAndUnsubscribe, options.resetOnRefCountZero);

return operate((source, subscriber) => {
refCount++;
if (!hasErrored && !hasCompleted) {
resetSubject.next(EMPTY);
}

// Create the subject if we don't have one yet.
subject = subject ?? connector();
resetConnection = resetConnection ?? resetSubject.pipe(switchAll()).subscribe((fn) => void fn());

// The following line adds the subscription to the subscriber passed.
// Basically, `subscriber === subject.subscribe(subscriber)` is `true`.
Expand All @@ -125,19 +148,15 @@ export function share<T>(options?: ShareConfig<T>): OperatorFunction<T, T> {
// We need to capture the subject before
// we reset (if we need to reset).
const dest = subject!;
if (resetOnError) {
reset();
}
resetSubject.next(resetOnError(err));
dest.error(err);
},
complete: () => {
hasCompleted = true;
const dest = subject!;
// We need to capture the subject before
// we reset (if we need to reset).
if (resetOnComplete) {
reset();
}
const dest = subject!;
resetSubject.next(resetOnComplete());
dest.complete();
},
});
Expand All @@ -150,13 +169,22 @@ export function share<T>(options?: ShareConfig<T>): OperatorFunction<T, T> {
// If we're resetting on refCount === 0, and it's 0, we only want to do
// that on "unsubscribe", really. Resetting on error or completion is a different
// configuration.
if (resetOnRefCountZero && !refCount && !hasErrored && !hasCompleted) {
// We need to capture the connection before
// we reset (if we need to reset).
const conn = connection;
reset();
conn?.unsubscribe();
if (refCount === 0 && !hasErrored && !hasCompleted) {
resetSubject.next(resetOnRefCountZero());
}
};
});
}

type ResetNotifierFactory<T extends unknown[] = [], R = any> = (...args: T) => Observable<R>;

function createResetNotifierFactory<T extends unknown[]>(
fn: () => void,
on: boolean | ResetNotifierFactory<T> = true
): ResetNotifierFactory<T, () => void> {
if (typeof on !== 'boolean') {
return (...args) => on(...args).pipe(take(1), mapTo(fn));
}

return on ? () => of(fn) : () => EMPTY;
}

0 comments on commit 963ff5e

Please sign in to comment.