diff --git a/src/internal/operators/OperatorSubscriber.ts b/src/internal/operators/OperatorSubscriber.ts index d664b2755b..2aac551f5d 100644 --- a/src/internal/operators/OperatorSubscriber.ts +++ b/src/internal/operators/OperatorSubscriber.ts @@ -38,13 +38,18 @@ export class OperatorSubscriber extends Subscriber { * this handler are sent to the `destination` error handler. * @param onFinalize Additional teardown logic here. This will only be called on teardown if the * subscriber itself is not already closed. This is called after all other teardown logic is executed. + * @param shouldUnsubscribe An optional check to see if an unsubscribe call should truly unsubscribe. + * NOTE: This currently **ONLY** exists to support the strange behavior of {@link groupBy}, where unsubscription + * to the resulting observable does not actually disconnect from the source if there are active subscriptions + * to any grouped observable. (DO NOT EXPOSE OR USE EXTERNALLY!!!) */ constructor( destination: Subscriber, onNext?: (value: T) => void, onComplete?: () => void, onError?: (err: any) => void, - private onFinalize?: () => void + private onFinalize?: () => void, + private shouldUnsubscribe?: () => boolean ) { // It's important - for performance reasons - that all of this class's // members are initialized and that they are always initialized in the same @@ -97,9 +102,11 @@ export class OperatorSubscriber extends Subscriber { } unsubscribe() { - const { closed } = this; - super.unsubscribe(); - // Execute additional teardown if we have any and we didn't already do so. - !closed && this.onFinalize?.(); + if (!this.shouldUnsubscribe || this.shouldUnsubscribe()) { + const { closed } = this; + super.unsubscribe(); + // Execute additional teardown if we have any and we didn't already do so. + !closed && this.onFinalize?.(); + } } } diff --git a/src/internal/operators/groupBy.ts b/src/internal/operators/groupBy.ts index 1f85a7a111..87aba89dc1 100644 --- a/src/internal/operators/groupBy.ts +++ b/src/internal/operators/groupBy.ts @@ -165,6 +165,12 @@ export function groupBy( // next call from the source. const handleError = (err: any) => notify((consumer) => consumer.error(err)); + // The number of actively subscribed groups + let activeGroups = 0; + + // Whether or not teardown was attempted on this subscription. + let teardownAttempted = false; + // Capturing a reference to this, because we need a handle to it // in `createGroupedObservable` below. This is what we use to // subscribe to our source observable. This sometimes needs to be unsubscribed @@ -172,7 +178,7 @@ export function groupBy( // in cases where a user unsubscribes from the main resulting subscription, but // still has groups from this subscription subscribed and would expect values from it // Consider: `source.pipe(groupBy(fn), take(2))`. - const groupBySourceSubscriber = new GroupBySubscriber( + const groupBySourceSubscriber = new OperatorSubscriber( subscriber, (value: T) => { // Because we have to notify all groups of any errors that occur in here, @@ -234,7 +240,14 @@ export function groupBy( // When the source subscription is _finally_ torn down, release the subjects and keys // in our groups Map, they may be quite large and we don't want to keep them around if we // don't have to. - () => groups.clear() + () => groups.clear(), + () => { + teardownAttempted = true; + // We only kill our subscription to the source if we have + // no active groups. As stated above, consider this scenario: + // source$.pipe(groupBy(fn), take(2)). + return activeGroups === 0; + } ); // Subscribe to the source @@ -247,16 +260,14 @@ export function groupBy( */ function createGroupedObservable(key: K, groupSubject: SubjectLike) { const result: any = new Observable((groupSubscriber) => { - groupBySourceSubscriber.activeGroups++; + activeGroups++; const innerSub = groupSubject.subscribe(groupSubscriber); return () => { innerSub.unsubscribe(); // We can kill the subscription to our source if we now have no more // active groups subscribed, and a teardown was already attempted on // the source. - --groupBySourceSubscriber.activeGroups === 0 && - groupBySourceSubscriber.teardownAttempted && - groupBySourceSubscriber.unsubscribe(); + --activeGroups === 0 && teardownAttempted && groupBySourceSubscriber.unsubscribe(); }; }); result.key = key; @@ -265,30 +276,6 @@ export function groupBy( }); } -/** - * This was created because groupBy is a bit unique, in that emitted groups that have - * subscriptions have to keep the subscription to the source alive until they - * are torn down. - */ -class GroupBySubscriber extends OperatorSubscriber { - /** - * The number of actively subscribed groups - */ - activeGroups = 0; - /** - * Whether or not teardown was attempted on this subscription. - */ - teardownAttempted = false; - - unsubscribe() { - this.teardownAttempted = true; - // We only kill our subscription to the source if we have - // no active groups. As stated above, consider this scenario: - // source$.pipe(groupBy(fn), take(2)). - this.activeGroups === 0 && super.unsubscribe(); - } -} - /** * An observable of values that is the emitted by the result of a {@link groupBy} operator, * contains a `key` property for the grouping.