Skip to content

Commit

Permalink
fix: Ensure teardown happens before notification of complete or error
Browse files Browse the repository at this point in the history
  • Loading branch information
benlesh committed Feb 29, 2024
1 parent 400e502 commit d8c4e84
Show file tree
Hide file tree
Showing 16 changed files with 111 additions and 132 deletions.
30 changes: 6 additions & 24 deletions packages/observable/src/observable.ts
Expand Up @@ -220,12 +220,6 @@ export interface SubscriberOverrides<T> {
* will be handled and passed to the destination's `error` method.
*/
complete?: () => void;
/**
* If provided, this function will be called after all teardown has occurred
* for this {@link Subscriber}. This is generally used for cleanup purposes
* during operator development.
*/
finalize?: () => void;
}

/**
Expand All @@ -248,8 +242,6 @@ export class Subscriber<T> extends Subscription implements Observer<T> {
protected readonly _errorOverride: ((err: any) => void) | null = null;
/** @internal */
protected readonly _completeOverride: (() => void) | null = null;
/** @internal */
protected readonly _onFinalize: (() => void) | null = null;

/**
* @deprecated Do not create instances of `Subscriber` directly. Use {@link operate} instead.
Expand Down Expand Up @@ -283,7 +275,6 @@ export class Subscriber<T> extends Subscription implements Observer<T> {
this._nextOverride = overrides?.next ?? null;
this._errorOverride = overrides?.error ?? null;
this._completeOverride = overrides?.complete ?? null;
this._onFinalize = overrides?.finalize ?? null;

// It's important - for performance reasons - that all of this class's
// members are initialized and that they are always initialized in the same
Expand Down Expand Up @@ -355,7 +346,6 @@ export class Subscriber<T> extends Subscription implements Observer<T> {
if (!this.closed) {
this.isStopped = true;
super.unsubscribe();
this._onFinalize?.();
}
}

Expand All @@ -364,19 +354,13 @@ export class Subscriber<T> extends Subscription implements Observer<T> {
}

protected _error(err: any): void {
try {
this.destination.error(err);
} finally {
this.unsubscribe();
}
this.unsubscribe();
this.destination.error(err);
}

protected _complete(): void {
try {
this.destination.complete();
} finally {
this.unsubscribe();
}
this.unsubscribe();
this.destination.complete();
}
}

Expand Down Expand Up @@ -428,22 +412,20 @@ function overrideNext<T>(this: Subscriber<T>, value: T): void {
}

function overrideError(this: Subscriber<unknown>, err: any): void {
this.unsubscribe();
try {
this._errorOverride!(err);
} catch (error) {
this.destination.error(error);
} finally {
this.unsubscribe();
}
}

function overrideComplete(this: Subscriber<unknown>): void {
this.unsubscribe();
try {
this._completeOverride!();
} catch (error) {
this.destination.error(error);
} finally {
this.unsubscribe();
}
}

Expand Down
2 changes: 1 addition & 1 deletion packages/rxjs/spec/observables/dom/webSocket-spec.ts
Expand Up @@ -669,7 +669,7 @@ describe('webSocket', () => {
});
socket.triggerClose({ wasClean: true });

expect(results).to.deep.equal(['A next', 'A unsub', 'B next', 'B next', 'B next', 'B complete', 'B unsub']);
expect(results).to.deep.equal(['A next', 'A unsub', 'B next', 'B next', 'B next', 'B unsub', 'B complete']);
});

it('should not close the socket until all subscriptions complete', () => {
Expand Down
6 changes: 3 additions & 3 deletions packages/rxjs/src/internal/observable/forkJoin.ts
Expand Up @@ -165,9 +165,9 @@ export function forkJoin(...args: any[]): Observable<any> {
}
values[sourceIndex] = value;
},
complete: () => remainingCompletions--,
finalize: () => {
if (!remainingCompletions || !hasValue) {
complete: () => {
remainingCompletions--;
if (remainingCompletions <= 0 || !hasValue) {
if (remainingEmissions === 0) {
destination.next(keys ? createObject(keys, values) : values);
destination.complete();
Expand Down
9 changes: 5 additions & 4 deletions packages/rxjs/src/internal/operators/bufferCount.ts
Expand Up @@ -63,6 +63,11 @@ export function bufferCount<T>(bufferSize: number, startBufferEvery: number | nu
let buffers: T[][] = [];
let count = 0;

destination.add(() => {
// Clean up our memory when we finalize
buffers = null!;
});

source.subscribe(
operate({
destination,
Expand Down Expand Up @@ -108,10 +113,6 @@ export function bufferCount<T>(bufferSize: number, startBufferEvery: number | nu
}
destination.complete();
},
finalize: () => {
// Clean up our memory when we finalize
buffers = null!;
},
})
);
});
Expand Down
6 changes: 4 additions & 2 deletions packages/rxjs/src/internal/operators/bufferTime.ts
Expand Up @@ -83,6 +83,10 @@ export function bufferTime<T>(bufferTimeSpan: number, ...otherArgs: any[]): Oper
// this is only really used for when *just* the buffer time span is passed.
let restartOnEmit = false;

destination.add(() => {
bufferRecords = null;
});

/**
* Does the work of emitting the buffer from the record, ensuring that the
* record is removed before the emission so reentrant code (from some custom scheduling, perhaps)
Expand Down Expand Up @@ -153,8 +157,6 @@ export function bufferTime<T>(bufferTimeSpan: number, ...otherArgs: any[]): Oper
destination.complete();
destination.unsubscribe();
},
// Clean up
finalize: () => (bufferRecords = null),
});

source.subscribe(bufferTimeSubscriber);
Expand Down
20 changes: 11 additions & 9 deletions packages/rxjs/src/internal/operators/bufferWhen.ts
@@ -1,4 +1,4 @@
import type { Subscriber} from '@rxjs/observable';
import type { Subscriber } from '@rxjs/observable';
import { operate, Observable, from } from '@rxjs/observable';
import type { ObservableInput, OperatorFunction } from '../types.js';
import { noop } from '../util/noop.js';
Expand Down Expand Up @@ -43,14 +43,18 @@ import { noop } from '../util/noop.js';
*/
export function bufferWhen<T>(closingSelector: () => ObservableInput<any>): OperatorFunction<T, T[]> {
return (source) =>
new Observable((subscriber) => {
new Observable((destination) => {
// The buffer we keep and emit.
let buffer: T[] | null = null;
// A reference to the subscriber used to subscribe to
// the closing notifier. We need to hold this so we can
// end the subscription after the first notification.
let closingSubscriber: Subscriber<T> | null = null;

destination.add(() => {
buffer = closingSubscriber = null!;
});

// Ends the previous closing notifier subscription, so it
// terminates after the first emission, then emits
// the current buffer if there is one, starts a new buffer, and starts a
Expand All @@ -62,12 +66,12 @@ export function bufferWhen<T>(closingSelector: () => ObservableInput<any>): Oper
// emit the buffer if we have one, and start a new buffer.
const b = buffer;
buffer = [];
b && subscriber.next(b);
b && destination.next(b);

// Get a new closing notifier and subscribe to it.
from(closingSelector()).subscribe(
(closingSubscriber = operate({
destination: subscriber,
destination,
next: openBuffer,
complete: noop,
}))
Expand All @@ -80,17 +84,15 @@ export function bufferWhen<T>(closingSelector: () => ObservableInput<any>): Oper
// Subscribe to our source.
source.subscribe(
operate({
destination: subscriber,
destination,
// Add every new value to the current buffer.
next: (value) => buffer?.push(value),
// When we complete, emit the buffer if we have one,
// then complete the result.
complete: () => {
buffer && subscriber.next(buffer);
subscriber.complete();
buffer && destination.next(buffer);
destination.complete();
},
// Release memory on finalization
finalize: () => (buffer = closingSubscriber = null!),
})
);
});
Expand Down
10 changes: 5 additions & 5 deletions packages/rxjs/src/internal/operators/debounce.ts
@@ -1,4 +1,4 @@
import type { Subscriber} from '@rxjs/observable';
import type { Subscriber } from '@rxjs/observable';
import { operate, Observable, from } from '@rxjs/observable';
import type { MonoTypeOperatorFunction, ObservableInput } from '../types.js';
import { noop } from '../util/noop.js';
Expand Down Expand Up @@ -69,6 +69,10 @@ export function debounce<T>(durationSelector: (value: T) => ObservableInput<any>
// The subscriber/subscription for the current debounce, if there is one.
let durationSubscriber: Subscriber<any> | null = null;

destination.add(() => {
lastValue = durationSubscriber = null;
});

const emit = () => {
// Unsubscribe any current debounce subscription we have,
// we only cared about the first notification from it, and we
Expand Down Expand Up @@ -106,10 +110,6 @@ export function debounce<T>(durationSelector: (value: T) => ObservableInput<any>
emit();
destination.complete();
},
finalize: () => {
// Finalization.
lastValue = durationSubscriber = null;
},
})
);
});
Expand Down
8 changes: 4 additions & 4 deletions packages/rxjs/src/internal/operators/debounceTime.ts
Expand Up @@ -66,6 +66,10 @@ export function debounceTime<T>(dueTime: number, scheduler: SchedulerLike = asyn
let lastValue: T;
let activeTask: Subscription | void;

destination.add(() => {
lastValue = activeTask = null!;
});

source.subscribe(
operate({
destination,
Expand Down Expand Up @@ -94,10 +98,6 @@ export function debounceTime<T>(dueTime: number, scheduler: SchedulerLike = asyn
}
destination.complete();
},
finalize: () => {
// Finalization.
lastValue = activeTask = null!;
},
})
);
});
Expand Down
21 changes: 15 additions & 6 deletions packages/rxjs/src/internal/operators/groupBy.ts
Expand Up @@ -153,6 +153,11 @@ export function groupBy<T, K, R>(
// A lookup for the groups that we have so far.
const groups = new Map<K, SubjectLike<any>>();

destination.add(() => {
// Free up memory.
groups.clear();
});

// Used for notifying all groups and the subscriber in the same way.
const notify = (cb: (group: Observer<any>) => void) => {
groups.forEach(cb);
Expand Down Expand Up @@ -202,9 +207,18 @@ export function groupBy<T, K, R>(
// Our duration notified! We can complete the group.
// The group will be removed from the map in the finalization phase.
group!.complete();
groups.delete(key);
durationSubscriber?.unsubscribe();
},
error: (err) => {
group!.error(err);
groups.delete(key);
durationSubscriber?.unsubscribe();
},
complete: () => {
groups.delete(key);
durationSubscriber?.unsubscribe();
},
finalize: () => groups.delete(key),
});

// Start our duration notifier.
Expand All @@ -222,11 +236,6 @@ export function groupBy<T, K, R>(
error: handleError,
// Source completes.
complete: () => notify((consumer) => consumer.complete()),
// Free up memory.
// When the source subscription is _finally_ torn down, release the subjects and keys
// in our groups Map, they may be quite large and we don't want to keep them around if we
// don't have to.
finalize: () => groups.clear(),
});

// Subscribe to the source
Expand Down
46 changes: 11 additions & 35 deletions packages/rxjs/src/internal/operators/mergeInternals.ts
Expand Up @@ -57,11 +57,6 @@ export function mergeInternals<T, R>(
// against our concurrency limit later.
active++;

// A flag used to show that the inner observable completed.
// This is checked during finalization to see if we should
// move to the next item in the buffer, if there is on.
let innerComplete = false;

// Start our inner subscription.
from(project(value, index++)).subscribe(
operate({
Expand All @@ -81,37 +76,18 @@ export function mergeInternals<T, R>(
}
},
complete: () => {
// Flag that we have completed, so we know to check the buffer
// during finalization.
innerComplete = true;
},
finalize: () => {
// During finalization, if the inner completed (it wasn't errored or
// cancelled), then we want to try the next item in the buffer if
// there is one.
if (innerComplete) {
// We have to wrap this in a try/catch because it happens during
// finalization, possibly asynchronously, and we want to pass
// any errors that happen (like in a projection function) to
// the outer Subscriber.
try {
// INNER SOURCE COMPLETE
// Decrement the active count to ensure that the next time
// we try to call `doInnerSub`, the number is accurate.
active--;
// If we have more values in the buffer, try to process those
// Note that this call will increment `active` ahead of the
// next conditional, if there were any more inner subscriptions
// to start.
while (buffer.length && active < concurrent) {
doInnerSub(buffer.shift()!);
}
// Check to see if we can complete, and complete if so.
checkComplete();
} catch (err) {
destination.error(err);
}
// Decrement the active count to ensure that the next time
// we try to call `doInnerSub`, the number is accurate.
active--;
// If we have more values in the buffer, try to process those
// Note that this call will increment `active` ahead of the
// next conditional, if there were any more inner subscriptions
// to start.
while (buffer.length && active < concurrent) {
doInnerSub(buffer.shift()!);
}
// Check to see if we can complete, and complete if so.
checkComplete();
},
})
);
Expand Down
10 changes: 6 additions & 4 deletions packages/rxjs/src/internal/operators/takeLast.ts
Expand Up @@ -50,6 +50,12 @@ export function takeLast<T>(count: number): MonoTypeOperatorFunction<T> {
let ring = new Array<T>(count);
// This counter is how we track where we are at in the ring buffer.
let counter = 0;

destination.add(() => {
// During finalization release the values in our buffer.
ring = null!;
});

source.subscribe(
operate({
destination,
Expand All @@ -73,10 +79,6 @@ export function takeLast<T>(count: number): MonoTypeOperatorFunction<T> {
// All done. This will also trigger clean up.
destination.complete();
},
finalize: () => {
// During finalization release the values in our buffer.
ring = null!;
},
})
);
});
Expand Down

0 comments on commit d8c4e84

Please sign in to comment.