diff --git a/src/internal/operators/observeOn.ts b/src/internal/operators/observeOn.ts index 70b6411b43..0eeb52becc 100644 --- a/src/internal/operators/observeOn.ts +++ b/src/internal/operators/observeOn.ts @@ -1,3 +1,6 @@ +/** @prettier */ +import { Subscriber } from '../Subscriber'; +import { Subscription } from '../Subscription'; import { MonoTypeOperatorFunction, SchedulerLike } from '../types'; import { operate } from '../util/lift'; import { OperatorSubscriber } from './OperatorSubscriber'; @@ -55,13 +58,38 @@ import { OperatorSubscriber } from './OperatorSubscriber'; * notifications as the source Observable, but with provided scheduler. */ export function observeOn(scheduler: SchedulerLike, delay: number = 0): MonoTypeOperatorFunction { + function dispatch(sub: Subscriber, act: () => void) { + const context: { subscription: Subscription | null; sync: boolean } = { sync: false, subscription: null }; + context.subscription = scheduler.schedule( + (state) => { + act(); + const { subscription } = state; + state.subscription = null; + + if (subscription) { + subscription.unsubscribe(); + sub.remove(subscription); + } else { + state.sync = true; + } + }, + delay, + context + ); + + if (!context.sync) { + sub.add(context.subscription); + } else { + context.subscription.unsubscribe(); + } + } return operate((source, subscriber) => { source.subscribe( new OperatorSubscriber( subscriber, - (value) => subscriber.add(scheduler.schedule(() => subscriber.next(value), delay)), - () => subscriber.add(scheduler.schedule(() => subscriber.complete(), delay)), - (err) => subscriber.add(scheduler.schedule(() => subscriber.error(err), delay)) + (value) => dispatch(subscriber, () => subscriber.next(value)), + () => dispatch(subscriber, () => subscriber.complete()), + (err) => dispatch(subscriber, () => subscriber.error(err)) ) ); }); diff --git a/src/internal/types.ts b/src/internal/types.ts index 62516dd341..690422b134 100644 --- a/src/internal/types.ts +++ b/src/internal/types.ts @@ -180,6 +180,7 @@ export interface SubjectLike extends Observer, Subscribable {} /** SCHEDULER INTERFACES */ export interface SchedulerLike extends TimestampProvider { + schedule(work: (this: SchedulerAction, state: T) => void, delay: number, state: T): Subscription; schedule(work: (this: SchedulerAction, state?: T) => void, delay?: number, state?: T): Subscription; }