From d8c4e84edb6ce6dfb3d1b31e919d5a31fb7518b3 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Thu, 29 Feb 2024 10:31:40 -0600 Subject: [PATCH 1/3] fix: Ensure teardown happens before notification of complete or error Resolves #7443 --- packages/observable/src/observable.ts | 30 +++--------- .../spec/observables/dom/webSocket-spec.ts | 2 +- .../rxjs/src/internal/observable/forkJoin.ts | 6 +-- .../src/internal/operators/bufferCount.ts | 9 ++-- .../rxjs/src/internal/operators/bufferTime.ts | 6 ++- .../rxjs/src/internal/operators/bufferWhen.ts | 20 ++++---- .../rxjs/src/internal/operators/debounce.ts | 10 ++-- .../src/internal/operators/debounceTime.ts | 8 ++-- .../rxjs/src/internal/operators/groupBy.ts | 21 ++++++--- .../src/internal/operators/mergeInternals.ts | 46 +++++-------------- .../rxjs/src/internal/operators/takeLast.ts | 10 ++-- packages/rxjs/src/internal/operators/tap.ts | 14 +++--- .../rxjs/src/internal/operators/timeout.ts | 16 ++++--- .../src/internal/operators/windowCount.ts | 9 ++-- .../src/internal/operators/windowToggle.ts | 21 +++++---- .../rxjs/src/internal/operators/windowWhen.ts | 15 +++--- 16 files changed, 111 insertions(+), 132 deletions(-) diff --git a/packages/observable/src/observable.ts b/packages/observable/src/observable.ts index 31f8403d15..d0548d0429 100644 --- a/packages/observable/src/observable.ts +++ b/packages/observable/src/observable.ts @@ -220,12 +220,6 @@ export interface SubscriberOverrides { * will be handled and passed to the destination's `error` method. */ complete?: () => void; - /** - * If provided, this function will be called after all teardown has occurred - * for this {@link Subscriber}. This is generally used for cleanup purposes - * during operator development. - */ - finalize?: () => void; } /** @@ -248,8 +242,6 @@ export class Subscriber extends Subscription implements Observer { protected readonly _errorOverride: ((err: any) => void) | null = null; /** @internal */ protected readonly _completeOverride: (() => void) | null = null; - /** @internal */ - protected readonly _onFinalize: (() => void) | null = null; /** * @deprecated Do not create instances of `Subscriber` directly. Use {@link operate} instead. @@ -283,7 +275,6 @@ export class Subscriber extends Subscription implements Observer { this._nextOverride = overrides?.next ?? null; this._errorOverride = overrides?.error ?? null; this._completeOverride = overrides?.complete ?? null; - this._onFinalize = overrides?.finalize ?? null; // It's important - for performance reasons - that all of this class's // members are initialized and that they are always initialized in the same @@ -355,7 +346,6 @@ export class Subscriber extends Subscription implements Observer { if (!this.closed) { this.isStopped = true; super.unsubscribe(); - this._onFinalize?.(); } } @@ -364,19 +354,13 @@ export class Subscriber extends Subscription implements Observer { } protected _error(err: any): void { - try { - this.destination.error(err); - } finally { - this.unsubscribe(); - } + this.unsubscribe(); + this.destination.error(err); } protected _complete(): void { - try { - this.destination.complete(); - } finally { - this.unsubscribe(); - } + this.unsubscribe(); + this.destination.complete(); } } @@ -428,22 +412,20 @@ function overrideNext(this: Subscriber, value: T): void { } function overrideError(this: Subscriber, err: any): void { + this.unsubscribe(); try { this._errorOverride!(err); } catch (error) { this.destination.error(error); - } finally { - this.unsubscribe(); } } function overrideComplete(this: Subscriber): void { + this.unsubscribe(); try { this._completeOverride!(); } catch (error) { this.destination.error(error); - } finally { - this.unsubscribe(); } } diff --git a/packages/rxjs/spec/observables/dom/webSocket-spec.ts b/packages/rxjs/spec/observables/dom/webSocket-spec.ts index 2023b6df7d..5f6f0cfcdc 100644 --- a/packages/rxjs/spec/observables/dom/webSocket-spec.ts +++ b/packages/rxjs/spec/observables/dom/webSocket-spec.ts @@ -669,7 +669,7 @@ describe('webSocket', () => { }); socket.triggerClose({ wasClean: true }); - expect(results).to.deep.equal(['A next', 'A unsub', 'B next', 'B next', 'B next', 'B complete', 'B unsub']); + expect(results).to.deep.equal(['A next', 'A unsub', 'B next', 'B next', 'B next', 'B unsub', 'B complete']); }); it('should not close the socket until all subscriptions complete', () => { diff --git a/packages/rxjs/src/internal/observable/forkJoin.ts b/packages/rxjs/src/internal/observable/forkJoin.ts index adfab7bfe0..c3157335ac 100644 --- a/packages/rxjs/src/internal/observable/forkJoin.ts +++ b/packages/rxjs/src/internal/observable/forkJoin.ts @@ -165,9 +165,9 @@ export function forkJoin(...args: any[]): Observable { } values[sourceIndex] = value; }, - complete: () => remainingCompletions--, - finalize: () => { - if (!remainingCompletions || !hasValue) { + complete: () => { + remainingCompletions--; + if (remainingCompletions <= 0 || !hasValue) { if (remainingEmissions === 0) { destination.next(keys ? createObject(keys, values) : values); destination.complete(); diff --git a/packages/rxjs/src/internal/operators/bufferCount.ts b/packages/rxjs/src/internal/operators/bufferCount.ts index 53f5ba2c1e..315ec162c8 100644 --- a/packages/rxjs/src/internal/operators/bufferCount.ts +++ b/packages/rxjs/src/internal/operators/bufferCount.ts @@ -63,6 +63,11 @@ export function bufferCount(bufferSize: number, startBufferEvery: number | nu let buffers: T[][] = []; let count = 0; + destination.add(() => { + // Clean up our memory when we finalize + buffers = null!; + }); + source.subscribe( operate({ destination, @@ -108,10 +113,6 @@ export function bufferCount(bufferSize: number, startBufferEvery: number | nu } destination.complete(); }, - finalize: () => { - // Clean up our memory when we finalize - buffers = null!; - }, }) ); }); diff --git a/packages/rxjs/src/internal/operators/bufferTime.ts b/packages/rxjs/src/internal/operators/bufferTime.ts index 2476aff693..3b4c58dbde 100644 --- a/packages/rxjs/src/internal/operators/bufferTime.ts +++ b/packages/rxjs/src/internal/operators/bufferTime.ts @@ -83,6 +83,10 @@ export function bufferTime(bufferTimeSpan: number, ...otherArgs: any[]): Oper // this is only really used for when *just* the buffer time span is passed. let restartOnEmit = false; + destination.add(() => { + bufferRecords = null; + }); + /** * Does the work of emitting the buffer from the record, ensuring that the * record is removed before the emission so reentrant code (from some custom scheduling, perhaps) @@ -153,8 +157,6 @@ export function bufferTime(bufferTimeSpan: number, ...otherArgs: any[]): Oper destination.complete(); destination.unsubscribe(); }, - // Clean up - finalize: () => (bufferRecords = null), }); source.subscribe(bufferTimeSubscriber); diff --git a/packages/rxjs/src/internal/operators/bufferWhen.ts b/packages/rxjs/src/internal/operators/bufferWhen.ts index f2b53b7f9a..97927b7210 100644 --- a/packages/rxjs/src/internal/operators/bufferWhen.ts +++ b/packages/rxjs/src/internal/operators/bufferWhen.ts @@ -1,4 +1,4 @@ -import type { Subscriber} from '@rxjs/observable'; +import type { Subscriber } from '@rxjs/observable'; import { operate, Observable, from } from '@rxjs/observable'; import type { ObservableInput, OperatorFunction } from '../types.js'; import { noop } from '../util/noop.js'; @@ -43,7 +43,7 @@ import { noop } from '../util/noop.js'; */ export function bufferWhen(closingSelector: () => ObservableInput): OperatorFunction { return (source) => - new Observable((subscriber) => { + new Observable((destination) => { // The buffer we keep and emit. let buffer: T[] | null = null; // A reference to the subscriber used to subscribe to @@ -51,6 +51,10 @@ export function bufferWhen(closingSelector: () => ObservableInput): Oper // end the subscription after the first notification. let closingSubscriber: Subscriber | null = null; + destination.add(() => { + buffer = closingSubscriber = null!; + }); + // Ends the previous closing notifier subscription, so it // terminates after the first emission, then emits // the current buffer if there is one, starts a new buffer, and starts a @@ -62,12 +66,12 @@ export function bufferWhen(closingSelector: () => ObservableInput): Oper // emit the buffer if we have one, and start a new buffer. const b = buffer; buffer = []; - b && subscriber.next(b); + b && destination.next(b); // Get a new closing notifier and subscribe to it. from(closingSelector()).subscribe( (closingSubscriber = operate({ - destination: subscriber, + destination, next: openBuffer, complete: noop, })) @@ -80,17 +84,15 @@ export function bufferWhen(closingSelector: () => ObservableInput): Oper // Subscribe to our source. source.subscribe( operate({ - destination: subscriber, + destination, // Add every new value to the current buffer. next: (value) => buffer?.push(value), // When we complete, emit the buffer if we have one, // then complete the result. complete: () => { - buffer && subscriber.next(buffer); - subscriber.complete(); + buffer && destination.next(buffer); + destination.complete(); }, - // Release memory on finalization - finalize: () => (buffer = closingSubscriber = null!), }) ); }); diff --git a/packages/rxjs/src/internal/operators/debounce.ts b/packages/rxjs/src/internal/operators/debounce.ts index 7f5c4659a9..1a1d0e82a6 100644 --- a/packages/rxjs/src/internal/operators/debounce.ts +++ b/packages/rxjs/src/internal/operators/debounce.ts @@ -1,4 +1,4 @@ -import type { Subscriber} from '@rxjs/observable'; +import type { Subscriber } from '@rxjs/observable'; import { operate, Observable, from } from '@rxjs/observable'; import type { MonoTypeOperatorFunction, ObservableInput } from '../types.js'; import { noop } from '../util/noop.js'; @@ -69,6 +69,10 @@ export function debounce(durationSelector: (value: T) => ObservableInput // The subscriber/subscription for the current debounce, if there is one. let durationSubscriber: Subscriber | null = null; + destination.add(() => { + lastValue = durationSubscriber = null; + }); + const emit = () => { // Unsubscribe any current debounce subscription we have, // we only cared about the first notification from it, and we @@ -106,10 +110,6 @@ export function debounce(durationSelector: (value: T) => ObservableInput emit(); destination.complete(); }, - finalize: () => { - // Finalization. - lastValue = durationSubscriber = null; - }, }) ); }); diff --git a/packages/rxjs/src/internal/operators/debounceTime.ts b/packages/rxjs/src/internal/operators/debounceTime.ts index a4b0551459..5ee90592ba 100644 --- a/packages/rxjs/src/internal/operators/debounceTime.ts +++ b/packages/rxjs/src/internal/operators/debounceTime.ts @@ -66,6 +66,10 @@ export function debounceTime(dueTime: number, scheduler: SchedulerLike = asyn let lastValue: T; let activeTask: Subscription | void; + destination.add(() => { + lastValue = activeTask = null!; + }); + source.subscribe( operate({ destination, @@ -94,10 +98,6 @@ export function debounceTime(dueTime: number, scheduler: SchedulerLike = asyn } destination.complete(); }, - finalize: () => { - // Finalization. - lastValue = activeTask = null!; - }, }) ); }); diff --git a/packages/rxjs/src/internal/operators/groupBy.ts b/packages/rxjs/src/internal/operators/groupBy.ts index 725fa05aa0..49225ac8ff 100644 --- a/packages/rxjs/src/internal/operators/groupBy.ts +++ b/packages/rxjs/src/internal/operators/groupBy.ts @@ -153,6 +153,11 @@ export function groupBy( // A lookup for the groups that we have so far. const groups = new Map>(); + destination.add(() => { + // Free up memory. + groups.clear(); + }); + // Used for notifying all groups and the subscriber in the same way. const notify = (cb: (group: Observer) => void) => { groups.forEach(cb); @@ -202,9 +207,18 @@ export function groupBy( // Our duration notified! We can complete the group. // The group will be removed from the map in the finalization phase. group!.complete(); + groups.delete(key); + durationSubscriber?.unsubscribe(); + }, + error: (err) => { + group!.error(err); + groups.delete(key); + durationSubscriber?.unsubscribe(); + }, + complete: () => { + groups.delete(key); durationSubscriber?.unsubscribe(); }, - finalize: () => groups.delete(key), }); // Start our duration notifier. @@ -222,11 +236,6 @@ export function groupBy( error: handleError, // Source completes. complete: () => notify((consumer) => consumer.complete()), - // Free up memory. - // When the source subscription is _finally_ torn down, release the subjects and keys - // in our groups Map, they may be quite large and we don't want to keep them around if we - // don't have to. - finalize: () => groups.clear(), }); // Subscribe to the source diff --git a/packages/rxjs/src/internal/operators/mergeInternals.ts b/packages/rxjs/src/internal/operators/mergeInternals.ts index 336cfd5d24..2c74c064e9 100644 --- a/packages/rxjs/src/internal/operators/mergeInternals.ts +++ b/packages/rxjs/src/internal/operators/mergeInternals.ts @@ -57,11 +57,6 @@ export function mergeInternals( // against our concurrency limit later. active++; - // A flag used to show that the inner observable completed. - // This is checked during finalization to see if we should - // move to the next item in the buffer, if there is on. - let innerComplete = false; - // Start our inner subscription. from(project(value, index++)).subscribe( operate({ @@ -81,37 +76,18 @@ export function mergeInternals( } }, complete: () => { - // Flag that we have completed, so we know to check the buffer - // during finalization. - innerComplete = true; - }, - finalize: () => { - // During finalization, if the inner completed (it wasn't errored or - // cancelled), then we want to try the next item in the buffer if - // there is one. - if (innerComplete) { - // We have to wrap this in a try/catch because it happens during - // finalization, possibly asynchronously, and we want to pass - // any errors that happen (like in a projection function) to - // the outer Subscriber. - try { - // INNER SOURCE COMPLETE - // Decrement the active count to ensure that the next time - // we try to call `doInnerSub`, the number is accurate. - active--; - // If we have more values in the buffer, try to process those - // Note that this call will increment `active` ahead of the - // next conditional, if there were any more inner subscriptions - // to start. - while (buffer.length && active < concurrent) { - doInnerSub(buffer.shift()!); - } - // Check to see if we can complete, and complete if so. - checkComplete(); - } catch (err) { - destination.error(err); - } + // Decrement the active count to ensure that the next time + // we try to call `doInnerSub`, the number is accurate. + active--; + // If we have more values in the buffer, try to process those + // Note that this call will increment `active` ahead of the + // next conditional, if there were any more inner subscriptions + // to start. + while (buffer.length && active < concurrent) { + doInnerSub(buffer.shift()!); } + // Check to see if we can complete, and complete if so. + checkComplete(); }, }) ); diff --git a/packages/rxjs/src/internal/operators/takeLast.ts b/packages/rxjs/src/internal/operators/takeLast.ts index 75f7cf656e..7c3e396a7b 100644 --- a/packages/rxjs/src/internal/operators/takeLast.ts +++ b/packages/rxjs/src/internal/operators/takeLast.ts @@ -50,6 +50,12 @@ export function takeLast(count: number): MonoTypeOperatorFunction { let ring = new Array(count); // This counter is how we track where we are at in the ring buffer. let counter = 0; + + destination.add(() => { + // During finalization release the values in our buffer. + ring = null!; + }); + source.subscribe( operate({ destination, @@ -73,10 +79,6 @@ export function takeLast(count: number): MonoTypeOperatorFunction { // All done. This will also trigger clean up. destination.complete(); }, - finalize: () => { - // During finalization release the values in our buffer. - ring = null!; - }, }) ); }); diff --git a/packages/rxjs/src/internal/operators/tap.ts b/packages/rxjs/src/internal/operators/tap.ts index dfe43c5306..d0714dbf84 100644 --- a/packages/rxjs/src/internal/operators/tap.ts +++ b/packages/rxjs/src/internal/operators/tap.ts @@ -162,6 +162,14 @@ export function tap(observerOrNext?: Partial> | ((value: T) => new Observable((destination) => { tapObserver.subscribe?.(); let isUnsub = true; + + destination.add(() => { + if (isUnsub) { + tapObserver.unsubscribe?.(); + } + tapObserver.finalize?.(); + }); + source.subscribe( operate({ destination, @@ -179,12 +187,6 @@ export function tap(observerOrNext?: Partial> | ((value: T) => tapObserver.complete?.(); destination.complete(); }, - finalize: () => { - if (isUnsub) { - tapObserver.unsubscribe?.(); - } - tapObserver.finalize?.(); - }, }) ); }) diff --git a/packages/rxjs/src/internal/operators/timeout.ts b/packages/rxjs/src/internal/operators/timeout.ts index 85b6d10b0e..ea51d39525 100644 --- a/packages/rxjs/src/internal/operators/timeout.ts +++ b/packages/rxjs/src/internal/operators/timeout.ts @@ -1,7 +1,7 @@ import { asyncScheduler } from '../scheduler/async.js'; import type { MonoTypeOperatorFunction, SchedulerLike, OperatorFunction, ObservableInput, ObservedValueOf } from '../types.js'; import { isValidDate } from '../util/isDate.js'; -import type { Subscription} from '@rxjs/observable'; +import type { Subscription } from '@rxjs/observable'; import { Observable, from, operate } from '@rxjs/observable'; import { executeSchedule } from '../util/executeSchedule.js'; @@ -318,6 +318,14 @@ export function timeout, M>( // A bit of state we pass to the with and error factories to // tell how many values we have seen so far. let seen = 0; + + destination.add(() => { + timerSubscription?.unsubscribe(); + // Be sure not to hold the last value in memory after unsubscription + // it could be quite large. + lastValue = null; + }); + const startTimer = (delay: number) => { timerSubscription = executeSchedule( destination, @@ -352,12 +360,6 @@ export function timeout, M>( // null | undefined are both < 0. Thanks, JavaScript. each! > 0 && startTimer(each!); }, - finalize: () => { - timerSubscription?.unsubscribe(); - // Be sure not to hold the last value in memory after unsubscription - // it could be quite large. - lastValue = null; - }, }) ); diff --git a/packages/rxjs/src/internal/operators/windowCount.ts b/packages/rxjs/src/internal/operators/windowCount.ts index 497b78e007..4a94fe6de6 100644 --- a/packages/rxjs/src/internal/operators/windowCount.ts +++ b/packages/rxjs/src/internal/operators/windowCount.ts @@ -69,9 +69,12 @@ export function windowCount(windowSize: number, startWindowEvery: number = 0) return (source) => new Observable((destination) => { let windows = [new Subject()]; - let starts: number[] = []; let count = 0; + destination.add(() => { + windows = null!; + }); + // Open the first window. destination.next(windows[0].asObservable()); @@ -118,10 +121,6 @@ export function windowCount(windowSize: number, startWindowEvery: number = 0) } destination.complete(); }, - finalize: () => { - starts = null!; - windows = null!; - }, }) ); }); diff --git a/packages/rxjs/src/internal/operators/windowToggle.ts b/packages/rxjs/src/internal/operators/windowToggle.ts index a1ae20bc0f..3c87e16da9 100644 --- a/packages/rxjs/src/internal/operators/windowToggle.ts +++ b/packages/rxjs/src/internal/operators/windowToggle.ts @@ -57,6 +57,17 @@ export function windowToggle( new Observable((destination) => { const windows: Subject[] = []; + destination.add(() => { + // Add this finalization so that all window subjects are + // disposed of. This way, if a user tries to subscribe + // to a window *after* the outer subscription has been unsubscribed, + // they will get an error, instead of waiting forever to + // see if a value arrives. + while (0 < windows.length) { + windows.shift()!.unsubscribe(); + } + }); + const handleError = (err: any) => { while (0 < windows.length) { windows.shift()!.error(err); @@ -125,16 +136,6 @@ export function windowToggle( } destination.complete(); }, - finalize: () => { - // Add this finalization so that all window subjects are - // disposed of. This way, if a user tries to subscribe - // to a window *after* the outer subscription has been unsubscribed, - // they will get an error, instead of waiting forever to - // see if a value arrives. - while (0 < windows.length) { - windows.shift()!.unsubscribe(); - } - }, }) ); }); diff --git a/packages/rxjs/src/internal/operators/windowWhen.ts b/packages/rxjs/src/internal/operators/windowWhen.ts index 89d5ce1296..019b32d610 100644 --- a/packages/rxjs/src/internal/operators/windowWhen.ts +++ b/packages/rxjs/src/internal/operators/windowWhen.ts @@ -1,4 +1,4 @@ -import type { Subscriber} from '@rxjs/observable'; +import type { Subscriber } from '@rxjs/observable'; import { operate, Observable, from } from '@rxjs/observable'; import { Subject } from '../Subject.js'; import type { ObservableInput, OperatorFunction } from '../types.js'; @@ -57,6 +57,13 @@ export function windowWhen(closingSelector: () => ObservableInput): Oper let window: Subject | null; let closingSubscriber: Subscriber | undefined; + destination.add(() => { + // Be sure to clean up our closing subscription + // when this tears down. + closingSubscriber?.unsubscribe(); + window = null!; + }); + /** * When we get an error, we have to notify both the * destination subscriber and the window. @@ -121,12 +128,6 @@ export function windowWhen(closingSelector: () => ObservableInput): Oper window!.complete(); destination.complete(); }, - finalize: () => { - // Be sure to clean up our closing subscription - // when this tears down. - closingSubscriber?.unsubscribe(); - window = null!; - }, }) ); }); From 63fff0fc45cbd17ab10ee13a1d32de39ef3172f5 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Thu, 29 Feb 2024 10:34:21 -0600 Subject: [PATCH 2/3] refactor(rxjs): simplify bufferTime --- .../rxjs/src/internal/operators/bufferTime.ts | 57 +++++++++---------- 1 file changed, 28 insertions(+), 29 deletions(-) diff --git a/packages/rxjs/src/internal/operators/bufferTime.ts b/packages/rxjs/src/internal/operators/bufferTime.ts index 3b4c58dbde..6070cf8069 100644 --- a/packages/rxjs/src/internal/operators/bufferTime.ts +++ b/packages/rxjs/src/internal/operators/bufferTime.ts @@ -131,34 +131,33 @@ export function bufferTime(bufferTimeSpan: number, ...otherArgs: any[]): Oper startBuffer(); - const bufferTimeSubscriber = operate({ - destination, - next: (value: T) => { - // Copy the records, so if we need to remove one we - // don't mutate the array. It's hard, but not impossible to - // set up a buffer time that could mutate the array and - // cause issues here. - const recordsCopy = bufferRecords!.slice(); - for (const record of recordsCopy) { - // Loop over all buffers and - const { buffer } = record; - buffer.push(value); - // If the buffer is over the max size, we need to emit it. - maxBufferSize <= buffer.length && emit(record); - } - }, - complete: () => { - // The source completed, emit all of the active - // buffers we have before we complete. - while (bufferRecords?.length) { - destination.next(bufferRecords.shift()!.buffer); - } - bufferTimeSubscriber?.unsubscribe(); - destination.complete(); - destination.unsubscribe(); - }, - }); - - source.subscribe(bufferTimeSubscriber); + source.subscribe( + operate({ + destination, + next: (value: T) => { + // Copy the records, so if we need to remove one we + // don't mutate the array. It's hard, but not impossible to + // set up a buffer time that could mutate the array and + // cause issues here. + const recordsCopy = bufferRecords!.slice(); + for (const record of recordsCopy) { + // Loop over all buffers and + const { buffer } = record; + buffer.push(value); + // If the buffer is over the max size, we need to emit it. + maxBufferSize <= buffer.length && emit(record); + } + }, + complete: () => { + // The source completed, emit all of the active + // buffers we have before we complete. + while (bufferRecords?.length) { + destination.next(bufferRecords.shift()!.buffer); + } + destination.complete(); + destination.unsubscribe(); + }, + }) + ); }); } From 9a05f172ac14e1194fd46b87f6561e9adc6302ba Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Thu, 29 Feb 2024 10:36:12 -0600 Subject: [PATCH 3/3] fix(rxjs): WebSocketSubject multiplex no longer tries to send unsubscribe messages to closed sockets --- .../rxjs/spec/observables/dom/webSocket-spec.ts | 2 +- .../internal/observable/dom/WebSocketSubject.ts | 15 +++++++++++++-- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/packages/rxjs/spec/observables/dom/webSocket-spec.ts b/packages/rxjs/spec/observables/dom/webSocket-spec.ts index 5f6f0cfcdc..2c52134a47 100644 --- a/packages/rxjs/spec/observables/dom/webSocket-spec.ts +++ b/packages/rxjs/spec/observables/dom/webSocket-spec.ts @@ -669,7 +669,7 @@ describe('webSocket', () => { }); socket.triggerClose({ wasClean: true }); - expect(results).to.deep.equal(['A next', 'A unsub', 'B next', 'B next', 'B next', 'B unsub', 'B complete']); + expect(results).to.deep.equal(['A next', 'A unsub', 'B next', 'B next', 'B next', 'B complete']); }); it('should not close the socket until all subscriptions complete', () => { diff --git a/packages/rxjs/src/internal/observable/dom/WebSocketSubject.ts b/packages/rxjs/src/internal/observable/dom/WebSocketSubject.ts index 943e22a51e..a3f93888ee 100644 --- a/packages/rxjs/src/internal/observable/dom/WebSocketSubject.ts +++ b/packages/rxjs/src/internal/observable/dom/WebSocketSubject.ts @@ -1,4 +1,4 @@ -import type { Subscriber, Subscription} from '@rxjs/observable'; +import type { Subscriber, Subscription } from '@rxjs/observable'; import { Observable, operate } from '@rxjs/observable'; import type { NextObserver } from '../../types.js'; @@ -219,8 +219,11 @@ export class WebSocketSubject extends Observable { multiplex(subMsg: () => In, unsubMsg: () => In, messageFilter: (value: Out) => boolean) { return new Observable((destination) => { this.next(subMsg()); + let isUnsub = true; destination.add(() => { - this.next(unsubMsg()); + if (isUnsub) { + this.next(unsubMsg()); + } }); this.subscribe( operate({ @@ -230,6 +233,14 @@ export class WebSocketSubject extends Observable { destination.next(x); } }, + error: (err) => { + isUnsub = false; + destination.error(err); + }, + complete: () => { + isUnsub = false; + destination.complete(); + }, }) ); });