Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(tap): Adds subscribe, unsubscribe, finalize handlers #6527

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion api_guard/dist/types/index.d.ts
Expand Up @@ -708,7 +708,7 @@ export declare function takeWhile<T, S extends T>(predicate: (value: T, index: n
export declare function takeWhile<T, S extends T>(predicate: (value: T, index: number) => value is S, inclusive: false): OperatorFunction<T, S>;
export declare function takeWhile<T>(predicate: (value: T, index: number) => boolean, inclusive?: boolean): MonoTypeOperatorFunction<T>;

export declare function tap<T>(observer?: Partial<Observer<T>>): MonoTypeOperatorFunction<T>;
export declare function tap<T>(observer?: Partial<TapObserver<T>>): MonoTypeOperatorFunction<T>;
export declare function tap<T>(next: (value: T) => void): MonoTypeOperatorFunction<T>;
export declare function tap<T>(next?: ((value: T) => void) | null, error?: ((error: any) => void) | null, complete?: (() => void) | null): MonoTypeOperatorFunction<T>;

Expand Down
2 changes: 1 addition & 1 deletion api_guard/dist/types/operators/index.d.ts
Expand Up @@ -283,7 +283,7 @@ export declare function takeWhile<T, S extends T>(predicate: (value: T, index: n
export declare function takeWhile<T, S extends T>(predicate: (value: T, index: number) => value is S, inclusive: false): OperatorFunction<T, S>;
export declare function takeWhile<T>(predicate: (value: T, index: number) => boolean, inclusive?: boolean): MonoTypeOperatorFunction<T>;

export declare function tap<T>(observer?: Partial<Observer<T>>): MonoTypeOperatorFunction<T>;
export declare function tap<T>(observer?: Partial<TapObserver<T>>): MonoTypeOperatorFunction<T>;
export declare function tap<T>(next: (value: T) => void): MonoTypeOperatorFunction<T>;
export declare function tap<T>(next?: ((value: T) => void) | null, error?: ((error: any) => void) | null, complete?: (() => void) | null): MonoTypeOperatorFunction<T>;

Expand Down
86 changes: 85 additions & 1 deletion spec/operators/tap-spec.ts
@@ -1,7 +1,7 @@
/** @prettier */
import { expect } from 'chai';
import { tap, mergeMap, take } from 'rxjs/operators';
import { Subject, of, throwError, Observer, EMPTY, Observable } from 'rxjs';
import { Subject, of, throwError, Observer, EMPTY, Observable, noop } from 'rxjs';
import { TestScheduler } from 'rxjs/testing';
import { observableMatcher } from '../helpers/observableMatcher';

Expand Down Expand Up @@ -310,4 +310,88 @@ describe('tap', () => {

expect(sideEffects).to.deep.equal([0, 1, 2]);
});

describe('lifecycle handlers', () => {
it('should support an unsubscribe event that fires before finalize', () => {
const results: any[] = [];
const subject = new Subject<number>();

const subscription = subject
.pipe(
tap({
subscribe: () => results.push('subscribe'),
next: (value) => results.push(`next ${value}`),
error: (err) => results.push(`error: ${err.message}`),
complete: () => results.push('complete'),
unsubscribe: () => results.push('unsubscribe'),
finalize: () => results.push('finalize'),
})
)
.subscribe();

subject.next(1);
subject.next(2);
expect(results).to.deep.equal(['subscribe', 'next 1', 'next 2']);

subscription.unsubscribe();

expect(results).to.deep.equal(['subscribe', 'next 1', 'next 2', 'unsubscribe', 'finalize']);
});

it('should not call unsubscribe if source completes', () => {
const results: any[] = [];
const subject = new Subject<number>();

const subscription = subject
.pipe(
tap({
subscribe: () => results.push('subscribe'),
next: (value) => results.push(`next ${value}`),
error: (err) => results.push(`error: ${err.message}`),
complete: () => results.push('complete'),
unsubscribe: () => results.push('unsubscribe'),
finalize: () => results.push('finalize'),
})
)
.subscribe();

subject.next(1);
subject.next(2);
expect(results).to.deep.equal(['subscribe', 'next 1', 'next 2']);
subject.complete();
// should have no effect
subscription.unsubscribe();

expect(results).to.deep.equal(['subscribe', 'next 1', 'next 2', 'complete', 'finalize']);
});

it('should not call unsubscribe if source errors', () => {
const results: any[] = [];
const subject = new Subject<number>();

const subscription = subject
.pipe(
tap({
subscribe: () => results.push('subscribe'),
next: (value) => results.push(`next ${value}`),
error: (err) => results.push(`error: ${err.message}`),
complete: () => results.push('complete'),
unsubscribe: () => results.push('unsubscribe'),
finalize: () => results.push('finalize'),
})
)
.subscribe({
error: noop,
});

subject.next(1);
subject.next(2);
expect(results).to.deep.equal(['subscribe', 'next 1', 'next 2']);
subject.error(new Error('bad'));
// should have no effect
subscription.unsubscribe();

expect(results).to.deep.equal(['subscribe', 'next 1', 'next 2', 'error: bad', 'finalize']);
});
});
});
25 changes: 22 additions & 3 deletions src/internal/operators/tap.ts
Expand Up @@ -4,7 +4,13 @@ import { operate } from '../util/lift';
import { OperatorSubscriber } from './OperatorSubscriber';
import { identity } from '../util/identity';

export function tap<T>(observer?: Partial<Observer<T>>): MonoTypeOperatorFunction<T>;
export interface TapObserver<T> extends Observer<T> {
subscribe: () => void;
unsubscribe: () => void;
finalize: () => void;
}

export function tap<T>(observer?: Partial<TapObserver<T>>): MonoTypeOperatorFunction<T>;
export function tap<T>(next: (value: T) => void): MonoTypeOperatorFunction<T>;
/** @deprecated Instead of passing separate callback arguments, use an observer argument. Signatures taking separate callback arguments will be removed in v8. Details: https://rxjs.dev/deprecations/subscribe-arguments */
export function tap<T>(
Expand Down Expand Up @@ -106,19 +112,24 @@ export function tap<T>(
* runs the specified Observer or callback(s) for each item.
*/
export function tap<T>(
observerOrNext?: Partial<Observer<T>> | ((value: T) => void) | null,
observerOrNext?: Partial<TapObserver<T>> | ((value: T) => void) | null,
error?: ((e: any) => void) | null,
complete?: (() => void) | null
): MonoTypeOperatorFunction<T> {
// We have to check to see not only if next is a function,
// but if error or complete were passed. This is because someone
// could technically call tap like `tap(null, fn)` or `tap(null, null, fn)`.
const tapObserver =
isFunction(observerOrNext) || error || complete ? { next: observerOrNext as (value: T) => void, error, complete } : observerOrNext;
isFunction(observerOrNext) || error || complete
? // tslint:disable-next-line: no-object-literal-type-assertion
({ next: observerOrNext as Exclude<typeof observerOrNext, Partial<TapObserver<T>>>, error, complete } as Partial<TapObserver<T>>)
: observerOrNext;

// TODO: Use `operate` function once this PR lands: https://github.com/ReactiveX/rxjs/pull/5742
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: i think this comment can be removed

return tapObserver
? operate((source, subscriber) => {
tapObserver.subscribe?.();
let isUnsub = true;
source.subscribe(
new OperatorSubscriber(
subscriber,
Expand All @@ -127,12 +138,20 @@ export function tap<T>(
subscriber.next(value);
},
() => {
isUnsub = false;
tapObserver.complete?.();
subscriber.complete();
},
(err) => {
isUnsub = false;
tapObserver.error?.(err);
subscriber.error(err);
},
() => {
if (isUnsub) {
tapObserver.unsubscribe?.();
}
tapObserver.finalize?.();
}
)
);
Expand Down