From eb26cbc4488c9953cdde565b598b1dbdeeeee9ea Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Wed, 28 Jul 2021 14:58:10 -0500 Subject: [PATCH] feat(tap): now supports subscribe, unsubscribe, and finalize handlers (#6527) * feat(tap): Adds subscribe, unsubscribe, finalize handlers This adds a common request/task for RxJS users, which are three new handlers: - `subscribe`: fires on subscription to the source - `unsubscribe`: fires when the subscription to the result is unsubscribed from, but NOT if the source completes or errors - `finalize`: always fires on finalization, (basically equivalent to `finalize`) * chore: update api_guardian files * chore: Remove old TODO comment --- api_guard/dist/types/index.d.ts | 2 +- api_guard/dist/types/operators/index.d.ts | 2 +- spec/operators/tap-spec.ts | 86 ++++++++++++++++++++++- src/internal/operators/tap.ts | 26 +++++-- 4 files changed, 109 insertions(+), 7 deletions(-) diff --git a/api_guard/dist/types/index.d.ts b/api_guard/dist/types/index.d.ts index 41afe32eb5..e48de9a7bb 100644 --- a/api_guard/dist/types/index.d.ts +++ b/api_guard/dist/types/index.d.ts @@ -712,7 +712,7 @@ export declare function takeWhile(predicate: (value: T, index: n export declare function takeWhile(predicate: (value: T, index: number) => value is S, inclusive: false): OperatorFunction; export declare function takeWhile(predicate: (value: T, index: number) => boolean, inclusive?: boolean): MonoTypeOperatorFunction; -export declare function tap(observer?: Partial>): MonoTypeOperatorFunction; +export declare function tap(observer?: Partial>): MonoTypeOperatorFunction; export declare function tap(next: (value: T) => void): MonoTypeOperatorFunction; export declare function tap(next?: ((value: T) => void) | null, error?: ((error: any) => void) | null, complete?: (() => void) | null): MonoTypeOperatorFunction; diff --git a/api_guard/dist/types/operators/index.d.ts b/api_guard/dist/types/operators/index.d.ts index b08bc94cee..b21a03562f 100644 --- a/api_guard/dist/types/operators/index.d.ts +++ b/api_guard/dist/types/operators/index.d.ts @@ -283,7 +283,7 @@ export declare function takeWhile(predicate: (value: T, index: n export declare function takeWhile(predicate: (value: T, index: number) => value is S, inclusive: false): OperatorFunction; export declare function takeWhile(predicate: (value: T, index: number) => boolean, inclusive?: boolean): MonoTypeOperatorFunction; -export declare function tap(observer?: Partial>): MonoTypeOperatorFunction; +export declare function tap(observer?: Partial>): MonoTypeOperatorFunction; export declare function tap(next: (value: T) => void): MonoTypeOperatorFunction; export declare function tap(next?: ((value: T) => void) | null, error?: ((error: any) => void) | null, complete?: (() => void) | null): MonoTypeOperatorFunction; diff --git a/spec/operators/tap-spec.ts b/spec/operators/tap-spec.ts index 21aef6ba11..f84b199217 100644 --- a/spec/operators/tap-spec.ts +++ b/spec/operators/tap-spec.ts @@ -1,7 +1,7 @@ /** @prettier */ import { expect } from 'chai'; import { tap, mergeMap, take } from 'rxjs/operators'; -import { Subject, of, throwError, Observer, EMPTY, Observable } from 'rxjs'; +import { Subject, of, throwError, Observer, EMPTY, Observable, noop } from 'rxjs'; import { TestScheduler } from 'rxjs/testing'; import { observableMatcher } from '../helpers/observableMatcher'; @@ -310,4 +310,88 @@ describe('tap', () => { expect(sideEffects).to.deep.equal([0, 1, 2]); }); + + describe('lifecycle handlers', () => { + it('should support an unsubscribe event that fires before finalize', () => { + const results: any[] = []; + const subject = new Subject(); + + const subscription = subject + .pipe( + tap({ + subscribe: () => results.push('subscribe'), + next: (value) => results.push(`next ${value}`), + error: (err) => results.push(`error: ${err.message}`), + complete: () => results.push('complete'), + unsubscribe: () => results.push('unsubscribe'), + finalize: () => results.push('finalize'), + }) + ) + .subscribe(); + + subject.next(1); + subject.next(2); + expect(results).to.deep.equal(['subscribe', 'next 1', 'next 2']); + + subscription.unsubscribe(); + + expect(results).to.deep.equal(['subscribe', 'next 1', 'next 2', 'unsubscribe', 'finalize']); + }); + + it('should not call unsubscribe if source completes', () => { + const results: any[] = []; + const subject = new Subject(); + + const subscription = subject + .pipe( + tap({ + subscribe: () => results.push('subscribe'), + next: (value) => results.push(`next ${value}`), + error: (err) => results.push(`error: ${err.message}`), + complete: () => results.push('complete'), + unsubscribe: () => results.push('unsubscribe'), + finalize: () => results.push('finalize'), + }) + ) + .subscribe(); + + subject.next(1); + subject.next(2); + expect(results).to.deep.equal(['subscribe', 'next 1', 'next 2']); + subject.complete(); + // should have no effect + subscription.unsubscribe(); + + expect(results).to.deep.equal(['subscribe', 'next 1', 'next 2', 'complete', 'finalize']); + }); + + it('should not call unsubscribe if source errors', () => { + const results: any[] = []; + const subject = new Subject(); + + const subscription = subject + .pipe( + tap({ + subscribe: () => results.push('subscribe'), + next: (value) => results.push(`next ${value}`), + error: (err) => results.push(`error: ${err.message}`), + complete: () => results.push('complete'), + unsubscribe: () => results.push('unsubscribe'), + finalize: () => results.push('finalize'), + }) + ) + .subscribe({ + error: noop, + }); + + subject.next(1); + subject.next(2); + expect(results).to.deep.equal(['subscribe', 'next 1', 'next 2']); + subject.error(new Error('bad')); + // should have no effect + subscription.unsubscribe(); + + expect(results).to.deep.equal(['subscribe', 'next 1', 'next 2', 'error: bad', 'finalize']); + }); + }); }); diff --git a/src/internal/operators/tap.ts b/src/internal/operators/tap.ts index dba1531279..364d4ca3eb 100644 --- a/src/internal/operators/tap.ts +++ b/src/internal/operators/tap.ts @@ -4,7 +4,13 @@ import { operate } from '../util/lift'; import { OperatorSubscriber } from './OperatorSubscriber'; import { identity } from '../util/identity'; -export function tap(observer?: Partial>): MonoTypeOperatorFunction; +export interface TapObserver extends Observer { + subscribe: () => void; + unsubscribe: () => void; + finalize: () => void; +} + +export function tap(observer?: Partial>): MonoTypeOperatorFunction; export function tap(next: (value: T) => void): MonoTypeOperatorFunction; /** @deprecated Instead of passing separate callback arguments, use an observer argument. Signatures taking separate callback arguments will be removed in v8. Details: https://rxjs.dev/deprecations/subscribe-arguments */ export function tap( @@ -106,7 +112,7 @@ export function tap( * runs the specified Observer or callback(s) for each item. */ export function tap( - observerOrNext?: Partial> | ((value: T) => void) | null, + observerOrNext?: Partial> | ((value: T) => void) | null, error?: ((e: any) => void) | null, complete?: (() => void) | null ): MonoTypeOperatorFunction { @@ -114,11 +120,15 @@ export function tap( // but if error or complete were passed. This is because someone // could technically call tap like `tap(null, fn)` or `tap(null, null, fn)`. const tapObserver = - isFunction(observerOrNext) || error || complete ? { next: observerOrNext as (value: T) => void, error, complete } : observerOrNext; + isFunction(observerOrNext) || error || complete + ? // tslint:disable-next-line: no-object-literal-type-assertion + ({ next: observerOrNext as Exclude>>, error, complete } as Partial>) + : observerOrNext; - // TODO: Use `operate` function once this PR lands: https://github.com/ReactiveX/rxjs/pull/5742 return tapObserver ? operate((source, subscriber) => { + tapObserver.subscribe?.(); + let isUnsub = true; source.subscribe( new OperatorSubscriber( subscriber, @@ -127,12 +137,20 @@ export function tap( subscriber.next(value); }, () => { + isUnsub = false; tapObserver.complete?.(); subscriber.complete(); }, (err) => { + isUnsub = false; tapObserver.error?.(err); subscriber.error(err); + }, + () => { + if (isUnsub) { + tapObserver.unsubscribe?.(); + } + tapObserver.finalize?.(); } ) );