Skip to content

Commit

Permalink
feat(tap): now supports subscribe, unsubscribe, and finalize handlers (
Browse files Browse the repository at this point in the history
…#6527)

* feat(tap): Adds subscribe, unsubscribe, finalize handlers

This adds a common request/task for RxJS users, which are three new handlers:

- `subscribe`: fires on subscription to the source
- `unsubscribe`: fires when the subscription to the result is unsubscribed from, but NOT if the source completes or errors
- `finalize`: always fires on finalization, (basically equivalent to `finalize`)

* chore: update api_guardian files

* chore: Remove old TODO comment
  • Loading branch information
benlesh committed Jul 28, 2021
1 parent 5f69795 commit eb26cbc
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 7 deletions.
2 changes: 1 addition & 1 deletion api_guard/dist/types/index.d.ts
Expand Up @@ -712,7 +712,7 @@ export declare function takeWhile<T, S extends T>(predicate: (value: T, index: n
export declare function takeWhile<T, S extends T>(predicate: (value: T, index: number) => value is S, inclusive: false): OperatorFunction<T, S>;
export declare function takeWhile<T>(predicate: (value: T, index: number) => boolean, inclusive?: boolean): MonoTypeOperatorFunction<T>;

export declare function tap<T>(observer?: Partial<Observer<T>>): MonoTypeOperatorFunction<T>;
export declare function tap<T>(observer?: Partial<TapObserver<T>>): MonoTypeOperatorFunction<T>;
export declare function tap<T>(next: (value: T) => void): MonoTypeOperatorFunction<T>;
export declare function tap<T>(next?: ((value: T) => void) | null, error?: ((error: any) => void) | null, complete?: (() => void) | null): MonoTypeOperatorFunction<T>;

Expand Down
2 changes: 1 addition & 1 deletion api_guard/dist/types/operators/index.d.ts
Expand Up @@ -283,7 +283,7 @@ export declare function takeWhile<T, S extends T>(predicate: (value: T, index: n
export declare function takeWhile<T, S extends T>(predicate: (value: T, index: number) => value is S, inclusive: false): OperatorFunction<T, S>;
export declare function takeWhile<T>(predicate: (value: T, index: number) => boolean, inclusive?: boolean): MonoTypeOperatorFunction<T>;

export declare function tap<T>(observer?: Partial<Observer<T>>): MonoTypeOperatorFunction<T>;
export declare function tap<T>(observer?: Partial<TapObserver<T>>): MonoTypeOperatorFunction<T>;
export declare function tap<T>(next: (value: T) => void): MonoTypeOperatorFunction<T>;
export declare function tap<T>(next?: ((value: T) => void) | null, error?: ((error: any) => void) | null, complete?: (() => void) | null): MonoTypeOperatorFunction<T>;

Expand Down
86 changes: 85 additions & 1 deletion spec/operators/tap-spec.ts
@@ -1,7 +1,7 @@
/** @prettier */
import { expect } from 'chai';
import { tap, mergeMap, take } from 'rxjs/operators';
import { Subject, of, throwError, Observer, EMPTY, Observable } from 'rxjs';
import { Subject, of, throwError, Observer, EMPTY, Observable, noop } from 'rxjs';
import { TestScheduler } from 'rxjs/testing';
import { observableMatcher } from '../helpers/observableMatcher';

Expand Down Expand Up @@ -310,4 +310,88 @@ describe('tap', () => {

expect(sideEffects).to.deep.equal([0, 1, 2]);
});

describe('lifecycle handlers', () => {
it('should support an unsubscribe event that fires before finalize', () => {
const results: any[] = [];
const subject = new Subject<number>();

const subscription = subject
.pipe(
tap({
subscribe: () => results.push('subscribe'),
next: (value) => results.push(`next ${value}`),
error: (err) => results.push(`error: ${err.message}`),
complete: () => results.push('complete'),
unsubscribe: () => results.push('unsubscribe'),
finalize: () => results.push('finalize'),
})
)
.subscribe();

subject.next(1);
subject.next(2);
expect(results).to.deep.equal(['subscribe', 'next 1', 'next 2']);

subscription.unsubscribe();

expect(results).to.deep.equal(['subscribe', 'next 1', 'next 2', 'unsubscribe', 'finalize']);
});

it('should not call unsubscribe if source completes', () => {
const results: any[] = [];
const subject = new Subject<number>();

const subscription = subject
.pipe(
tap({
subscribe: () => results.push('subscribe'),
next: (value) => results.push(`next ${value}`),
error: (err) => results.push(`error: ${err.message}`),
complete: () => results.push('complete'),
unsubscribe: () => results.push('unsubscribe'),
finalize: () => results.push('finalize'),
})
)
.subscribe();

subject.next(1);
subject.next(2);
expect(results).to.deep.equal(['subscribe', 'next 1', 'next 2']);
subject.complete();
// should have no effect
subscription.unsubscribe();

expect(results).to.deep.equal(['subscribe', 'next 1', 'next 2', 'complete', 'finalize']);
});

it('should not call unsubscribe if source errors', () => {
const results: any[] = [];
const subject = new Subject<number>();

const subscription = subject
.pipe(
tap({
subscribe: () => results.push('subscribe'),
next: (value) => results.push(`next ${value}`),
error: (err) => results.push(`error: ${err.message}`),
complete: () => results.push('complete'),
unsubscribe: () => results.push('unsubscribe'),
finalize: () => results.push('finalize'),
})
)
.subscribe({
error: noop,
});

subject.next(1);
subject.next(2);
expect(results).to.deep.equal(['subscribe', 'next 1', 'next 2']);
subject.error(new Error('bad'));
// should have no effect
subscription.unsubscribe();

expect(results).to.deep.equal(['subscribe', 'next 1', 'next 2', 'error: bad', 'finalize']);
});
});
});
26 changes: 22 additions & 4 deletions src/internal/operators/tap.ts
Expand Up @@ -4,7 +4,13 @@ import { operate } from '../util/lift';
import { OperatorSubscriber } from './OperatorSubscriber';
import { identity } from '../util/identity';

export function tap<T>(observer?: Partial<Observer<T>>): MonoTypeOperatorFunction<T>;
export interface TapObserver<T> extends Observer<T> {
subscribe: () => void;
unsubscribe: () => void;
finalize: () => void;
}

export function tap<T>(observer?: Partial<TapObserver<T>>): MonoTypeOperatorFunction<T>;
export function tap<T>(next: (value: T) => void): MonoTypeOperatorFunction<T>;
/** @deprecated Instead of passing separate callback arguments, use an observer argument. Signatures taking separate callback arguments will be removed in v8. Details: https://rxjs.dev/deprecations/subscribe-arguments */
export function tap<T>(
Expand Down Expand Up @@ -106,19 +112,23 @@ export function tap<T>(
* runs the specified Observer or callback(s) for each item.
*/
export function tap<T>(
observerOrNext?: Partial<Observer<T>> | ((value: T) => void) | null,
observerOrNext?: Partial<TapObserver<T>> | ((value: T) => void) | null,
error?: ((e: any) => void) | null,
complete?: (() => void) | null
): MonoTypeOperatorFunction<T> {
// We have to check to see not only if next is a function,
// but if error or complete were passed. This is because someone
// could technically call tap like `tap(null, fn)` or `tap(null, null, fn)`.
const tapObserver =
isFunction(observerOrNext) || error || complete ? { next: observerOrNext as (value: T) => void, error, complete } : observerOrNext;
isFunction(observerOrNext) || error || complete
? // tslint:disable-next-line: no-object-literal-type-assertion
({ next: observerOrNext as Exclude<typeof observerOrNext, Partial<TapObserver<T>>>, error, complete } as Partial<TapObserver<T>>)
: observerOrNext;

// TODO: Use `operate` function once this PR lands: https://github.com/ReactiveX/rxjs/pull/5742
return tapObserver
? operate((source, subscriber) => {
tapObserver.subscribe?.();
let isUnsub = true;
source.subscribe(
new OperatorSubscriber(
subscriber,
Expand All @@ -127,12 +137,20 @@ export function tap<T>(
subscriber.next(value);
},
() => {
isUnsub = false;
tapObserver.complete?.();
subscriber.complete();
},
(err) => {
isUnsub = false;
tapObserver.error?.(err);
subscriber.error(err);
},
() => {
if (isUnsub) {
tapObserver.unsubscribe?.();
}
tapObserver.finalize?.();
}
)
);
Expand Down

0 comments on commit eb26cbc

Please sign in to comment.