Skip to content

Commit

Permalink
refactor(SafeSubscriber): optimize perf for ordinary observers
Browse files Browse the repository at this point in the history
  • Loading branch information
benlesh committed Feb 8, 2022
1 parent 7214daa commit d017b3a
Showing 1 changed file with 40 additions and 41 deletions.
81 changes: 40 additions & 41 deletions src/internal/Subscriber.ts
Expand Up @@ -146,55 +146,54 @@ function bind<Fn extends (...args: any[]) => any>(fn: Fn, thisArg: any): Fn {
return _bind.call(fn, thisArg);
}

class ConsumerObserver<T> implements Observer<T> {
constructor(private partialObserver: Partial<Observer<T>>) {}

next(value: T): void {
if (this.partialObserver.next) {
try {
this.partialObserver.next(value);
} catch (error) {
reportUnhandledError(error);
}
}
}

error(err: any): void {
if (this.partialObserver.error) {
try {
this.partialObserver.error(err);
} catch (error) {
reportUnhandledError(error);
}
} else {
reportUnhandledError(err);
}
}

complete(): void {
if (this.partialObserver.complete) {
try {
this.partialObserver.complete();
} catch (error) {
reportUnhandledError(error);
}
}
}
}

export class SafeSubscriber<T> extends Subscriber<T> {
constructor(observerOrNext?: Partial<Observer<T>> | ((value: T) => void) | null) {
super();

let next: ((value: T) => void) | undefined;
let error: ((err: any) => void) | undefined;
let complete: (() => void) | undefined;
if (isFunction(observerOrNext)) {
// The first argument is a function, not an observer. The next
// two arguments *could* be observers, or they could be empty.
next = observerOrNext;
} else if (observerOrNext) {
// The first argument is an observer object, we have to pull the handlers
// off and capture the owner object as the context. That is because we're
// going to put them all in a new destination with ensured methods
// for `next`, `error`, and `complete`. That's part of what makes this
// the "Safe" Subscriber.
({ next, error, complete } = observerOrNext);
next = next && bind(next, observerOrNext);
error = error && bind(error, observerOrNext);
complete = complete && bind(complete, observerOrNext);
}
const partialObserver = !observerOrNext || isFunction(observerOrNext) ? { next: observerOrNext ?? undefined } : observerOrNext;

// Once we set the destination, the superclass `Subscriber` will
// do it's magic in the `_next`, `_error`, and `_complete` methods.
this.destination = {
next: next ? wrapForErrorHandling(next, this) : noop,
error: wrapForErrorHandling(error ?? defaultErrorHandler, this),
complete: complete ? wrapForErrorHandling(complete, this) : noop,
};
// Wrap the partial observer to ensure it's a full observer, and
// make sure proper error handling is accounted for.
this.destination = new ConsumerObserver(partialObserver);
}
}

/**
* Wraps a user-provided handler (or our {@link defaultErrorHandler} in one case) to
* ensure that any thrown errors are caught and handled appropriately.
*
* @param handler The handler to wrap
* @param instance The SafeSubscriber instance we're going to mark if there's an error.
*/
function wrapForErrorHandling(handler: (arg?: any) => void, instance: SafeSubscriber<any>) {
return (...args: any[]) => {
try {
handler(...args);
} catch (err) {
reportUnhandledError(err);
}
};
}
/**
* An error handler used when no error handler was supplied
* to the SafeSubscriber -- meaning no error handler was supplied
Expand Down

0 comments on commit d017b3a

Please sign in to comment.