Skip to content

Commit

Permalink
feat(share): use another observable to control resets
Browse files Browse the repository at this point in the history
  • Loading branch information
backbone87 committed Mar 24, 2021
1 parent a94b804 commit d682c57
Showing 1 changed file with 18 additions and 10 deletions.
28 changes: 18 additions & 10 deletions src/internal/operators/share.ts
Expand Up @@ -7,7 +7,6 @@ import { EMPTY } from '../observable/empty';
import { from } from '../observable/from';
import { of } from '../observable/of';
import { mapTo } from '../operators/mapTo';
import { switchAll } from '../operators/switchAll';
import { take } from '../operators/take';
import { operate } from '../util/lift';

Expand Down Expand Up @@ -100,18 +99,21 @@ export function share<T>(options: ShareConfig<T> = {}): MonoTypeOperatorFunction
const { connector = () => new Subject<T>() } = options;

let connection: Subscription | null = null;
let resetConnection: Subscription | null = null;
let subject: SubjectLike<T> | null = null;
let refCount = 0;
let hasCompleted = false;
let hasErrored = false;
const resetSubject = new Subject<Observable<() => void>>();
let resetConnection: Subscription | null = null;

const cancelReset = () => {
resetConnection?.unsubscribe();
resetConnection = null;
};
// Used to reset the internal state to a "cold"
// state, as though it had never been subscribed to.
const reset = () => {
resetConnection?.unsubscribe();
resetConnection = connection = subject = null;
cancelReset();
connection = subject = null;
hasCompleted = hasErrored = false;
};
const resetAndUnsubscribe = () => {
Expand All @@ -129,12 +131,11 @@ export function share<T>(options: ShareConfig<T> = {}): MonoTypeOperatorFunction
return operate((source, subscriber) => {
refCount++;
if (!hasErrored && !hasCompleted) {
resetSubject.next(EMPTY);
cancelReset();
}

// Create the subject if we don't have one yet.
subject = subject ?? connector();
resetConnection = resetConnection ?? resetSubject.pipe(switchAll()).subscribe((fn) => void fn());

// The following line adds the subscription to the subscriber passed.
// Basically, `subscriber === subject.subscribe(subscriber)` is `true`.
Expand All @@ -148,15 +149,17 @@ export function share<T>(options: ShareConfig<T> = {}): MonoTypeOperatorFunction
// We need to capture the subject before
// we reset (if we need to reset).
const dest = subject!;
resetSubject.next(resetOnError(err));
cancelReset();
resetConnection = resetOnError(err).subscribe(invoke);
dest.error(err);
},
complete: () => {
hasCompleted = true;
// We need to capture the subject before
// we reset (if we need to reset).
const dest = subject!;
resetSubject.next(resetOnComplete());
cancelReset();
resetConnection = resetOnComplete().subscribe(invoke);
dest.complete();
},
});
Expand All @@ -170,7 +173,8 @@ export function share<T>(options: ShareConfig<T> = {}): MonoTypeOperatorFunction
// that on "unsubscribe", really. Resetting on error or completion is a different
// configuration.
if (refCount === 0 && !hasErrored && !hasCompleted) {
resetSubject.next(resetOnRefCountZero());
cancelReset(); // paranoia, there should never be a resetConnection, if we reached this point
resetConnection = resetOnRefCountZero().subscribe(invoke);
}
};
});
Expand All @@ -188,3 +192,7 @@ function createResetNotifierFactory<T extends unknown[]>(

return on ? () => of(fn) : () => EMPTY;
}

function invoke(fn: () => void): void {
fn();
}

0 comments on commit d682c57

Please sign in to comment.