From 14d80910112d620a2f12bd2140b42ce9db96a170 Mon Sep 17 00:00:00 2001 From: Jeremy Wells Date: Tue, 25 Oct 2022 16:19:42 -0400 Subject: [PATCH] feat(distinct): distinct's flush supports ObservableInput --- api_guard/dist/types/index.d.ts | 2 +- api_guard/dist/types/operators/index.d.ts | 2 +- spec-dtslint/operators/distinct-spec.ts | 73 ++++++++++++++++++++++- src/internal/operators/distinct.ts | 12 ++-- 4 files changed, 80 insertions(+), 9 deletions(-) diff --git a/api_guard/dist/types/index.d.ts b/api_guard/dist/types/index.d.ts index 387e385a6bd..563c72c5de3 100644 --- a/api_guard/dist/types/index.d.ts +++ b/api_guard/dist/types/index.d.ts @@ -160,7 +160,7 @@ export declare function delayWhen(delayDurationSelector: (value: T, index: nu export declare function dematerialize>(): OperatorFunction>; -export declare function distinct(keySelector?: (value: T) => K, flushes?: Observable): MonoTypeOperatorFunction; +export declare function distinct(keySelector?: (value: T) => K, flushes?: ObservableInput): MonoTypeOperatorFunction; export declare function distinctUntilChanged(comparator?: (previous: T, current: T) => boolean): MonoTypeOperatorFunction; export declare function distinctUntilChanged(comparator: (previous: K, current: K) => boolean, keySelector: (value: T) => K): MonoTypeOperatorFunction; diff --git a/api_guard/dist/types/operators/index.d.ts b/api_guard/dist/types/operators/index.d.ts index 4d673c5b08b..6f3200e9a31 100644 --- a/api_guard/dist/types/operators/index.d.ts +++ b/api_guard/dist/types/operators/index.d.ts @@ -72,7 +72,7 @@ export declare function delayWhen(delayDurationSelector: (value: T, index: nu export declare function dematerialize>(): OperatorFunction>; -export declare function distinct(keySelector?: (value: T) => K, flushes?: Observable): MonoTypeOperatorFunction; +export declare function distinct(keySelector?: (value: T) => K, flushes?: ObservableInput): MonoTypeOperatorFunction; export declare function distinctUntilChanged(comparator?: (previous: T, current: T) => boolean): MonoTypeOperatorFunction; export declare function distinctUntilChanged(comparator: (previous: K, current: K) => boolean, keySelector: (value: T) => K): MonoTypeOperatorFunction; diff --git a/spec-dtslint/operators/distinct-spec.ts b/spec-dtslint/operators/distinct-spec.ts index 3386ce2bd07..f6034aac17e 100644 --- a/spec-dtslint/operators/distinct-spec.ts +++ b/spec-dtslint/operators/distinct-spec.ts @@ -1,5 +1,7 @@ import { of } from 'rxjs'; +import { asInteropObservable } from '../../spec/helpers/interop-helper'; import { distinct } from 'rxjs/operators'; +import { ReadableStreamLike } from '../../src/internal/types'; it('should infer correctly', () => { const o = of(1, 2, 3).pipe(distinct()); // $ExpectType Observable @@ -10,10 +12,79 @@ it('should accept a keySelector', () => { const o = of({ name: 'Tim' } as Person).pipe(distinct(person => person.name)); // $ExpectType Observable }); -it('should accept flushes', () => { +it('should accept observable flush', () => { const o = of(1, 2, 3).pipe(distinct(n => n, of('t', 'i', 'm'))); // $ExpectType Observable }); +it('should accept interop observable flush', () => { + of(1, 2, 3).pipe(distinct(n => n, asInteropObservable(of('t', 'i', 'm')))); // $ExpectType Observable +}); + +it('should accept array-like flush', () => { + of(1, 2, 3).pipe(distinct(n => n, [1,2,3])); // $ExpectType Observable +}); + +it('should accept promise flush', () => { + of(1, 2, 3).pipe(distinct(n => n, Promise.resolve())); // $ExpectType Observable +}); + +it('should accept async iterable flush', () => { + const asyncRange = { + from: 1, + to: 2, + [Symbol.asyncIterator]() { + return { + current: this.from, + last: this.to, + async next() { + await Promise.resolve(); + const done = (this.current > this.last); + return { + done, + value: done ? this.current++ : undefined + }; + } + }; + } + }; + of(1, 2, 3).pipe(distinct(n => n, asyncRange)); // $ExpectType Observable +}); + +it('should accept iterable flush', () => { + const syncRange = { + from: 1, + to: 2, + [Symbol.iterator]() { + return { + current: this.from, + last: this.to, + next() { + const done = (this.current > this.last); + return { + done, + value: done ? this.current++ : undefined + }; + } + }; + } + }; + of(1, 2, 3).pipe(distinct(n => n, syncRange)); // $ExpectType Observable +}); + +it('should accept readable stream flush', () => { + const readable: ReadableStreamLike = new ReadableStream({ + pull(controller) { + controller.enqueue('x'); + controller.close(); + }, + }); + of(1, 2, 3).pipe(distinct(n => n, readable)); // $ExpectType Observable +}); + +it('should error with unsupported flush', () => { + of(1, 2, 3).pipe(distinct(n => n, {})); // $ExpectError +}); + it('should enforce types', () => { const o = of(1, 2, 3).pipe(distinct('F00D')); // $ExpectError }); diff --git a/src/internal/operators/distinct.ts b/src/internal/operators/distinct.ts index 4b4f55b56a0..70ed2c235a5 100644 --- a/src/internal/operators/distinct.ts +++ b/src/internal/operators/distinct.ts @@ -1,8 +1,8 @@ -import { Observable } from '../Observable'; -import { MonoTypeOperatorFunction } from '../types'; +import { MonoTypeOperatorFunction, ObservableInput } from '../types'; import { operate } from '../util/lift'; import { createOperatorSubscriber } from './OperatorSubscriber'; import { noop } from '../util/noop'; +import { innerFrom } from '../observable/innerFrom'; /** * Returns an Observable that emits all items emitted by the source Observable that are distinct by comparison from previous items. @@ -56,12 +56,12 @@ import { noop } from '../util/noop'; * @see {@link distinctUntilChanged} * @see {@link distinctUntilKeyChanged} * - * @param {function} [keySelector] Optional function to select which value you want to check as distinct. - * @param {Observable} [flushes] Optional Observable for flushing the internal HashSet of the operator. + * @param keySelector Optional `function` to select which value you want to check as distinct. + * @param flushes Optional `ObservableInput` for flushing the internal HashSet of the operator. * @return A function that returns an Observable that emits items from the * source Observable with distinct values. */ -export function distinct(keySelector?: (value: T) => K, flushes?: Observable): MonoTypeOperatorFunction { +export function distinct(keySelector?: (value: T) => K, flushes?: ObservableInput): MonoTypeOperatorFunction { return operate((source, subscriber) => { const distinctKeys = new Set(); source.subscribe( @@ -74,6 +74,6 @@ export function distinct(keySelector?: (value: T) => K, flushes?: Observab }) ); - flushes?.subscribe(createOperatorSubscriber(subscriber, () => distinctKeys.clear(), noop)); + flushes && innerFrom(flushes).subscribe(createOperatorSubscriber(subscriber, () => distinctKeys.clear(), noop)); }); }