diff --git a/spec-dtslint/operators/repeatWhen-spec.ts b/spec-dtslint/operators/repeatWhen-spec.ts index 0290394ff7..6ff52a0a2e 100644 --- a/spec-dtslint/operators/repeatWhen-spec.ts +++ b/spec-dtslint/operators/repeatWhen-spec.ts @@ -1,22 +1,84 @@ import { of } from 'rxjs'; import { repeatWhen } from 'rxjs/operators'; +import { asInteropObservable } from '../../spec/helpers/interop-helper'; it('should infer correctly', () => { - const o = of(1, 2, 3).pipe(repeatWhen(errors => errors)); // $ExpectType Observable + of(1, 2, 3).pipe(repeatWhen(errors => errors)); // $ExpectType Observable }); it('should infer correctly when the error observable has a different type', () => { - const o = of(1, 2, 3).pipe(repeatWhen(repeatWhen(errors => of('a', 'b', 'c')))); // $ExpectType Observable + of(1, 2, 3).pipe(repeatWhen(errors => asInteropObservable(of('a', 'b', 'c')))); // $ExpectType Observable }); it('should enforce types', () => { - const o = of(1, 2, 3).pipe(repeatWhen()); // $ExpectError + of(1, 2, 3).pipe(repeatWhen()); // $ExpectError +}); + +it('should accept interop observable notifier', () => { + of(1, 2, 3).pipe(repeatWhen(() => asInteropObservable(of(true)))); // $ExpectType Observable +}); + +it('should accept promise notifier', () => { + of(1, 2, 3).pipe(repeatWhen(() => Promise.resolve(true))); // $ExpectType Observable +}); + +it('should async iterable notifier', () => { + const asyncRange = { + from: 1, + to: 2, + [Symbol.asyncIterator]() { + return { + current: this.from, + last: this.to, + async next() { + await Promise.resolve(); + const done = (this.current > this.last); + return { + done, + value: done ? this.current++ : undefined + }; + } + }; + } + }; + of(1, 2, 3).pipe(repeatWhen(() => asyncRange)); // $ExpectType Observable +}); + +it('should accept iterable notifier', () => { + const syncRange = { + from: 1, + to: 2, + [Symbol.iterator]() { + return { + current: this.from, + last: this.to, + next() { + const done = (this.current > this.last); + return { + done, + value: done ? this.current++ : undefined + }; + } + }; + } + }; + of(1, 2, 3).pipe(repeatWhen(() => syncRange)); // $ExpectType Observable +}); + +it('should accept readable stream notifier', () => { + const readableStream = new ReadableStream({ + pull(controller) { + controller.enqueue('x'); + controller.close(); + }, + }); + of(1, 2, 3).pipe(repeatWhen(() => readableStream)); // $ExpectType Observable }); it('should enforce types of the notifier', () => { - const o = of(1, 2, 3).pipe(repeatWhen(() => 8)); // $ExpectError + of(1, 2, 3).pipe(repeatWhen(() => 8)); // $ExpectError }); it('should be deprecated', () => { - const o = of(1, 2, 3).pipe(repeatWhen(() => of(true))); // $ExpectDeprecation + of(1, 2, 3).pipe(repeatWhen(() => of(true))); // $ExpectDeprecation }); \ No newline at end of file diff --git a/src/index.ts b/src/index.ts index e267a62548..2a36982347 100644 --- a/src/index.ts +++ b/src/index.ts @@ -164,7 +164,7 @@ export { publishLast } from './internal/operators/publishLast'; export { publishReplay } from './internal/operators/publishReplay'; export { raceWith } from './internal/operators/raceWith'; export { reduce } from './internal/operators/reduce'; -export { repeat } from './internal/operators/repeat'; +export { repeat, RepeatConfig } from './internal/operators/repeat'; export { repeatWhen } from './internal/operators/repeatWhen'; export { retry, RetryConfig } from './internal/operators/retry'; export { retryWhen } from './internal/operators/retryWhen'; diff --git a/src/internal/operators/repeatWhen.ts b/src/internal/operators/repeatWhen.ts index 989e8b079c..5e55ca07f4 100644 --- a/src/internal/operators/repeatWhen.ts +++ b/src/internal/operators/repeatWhen.ts @@ -1,8 +1,9 @@ import { Observable } from '../Observable'; +import { innerFrom } from '../observable/innerFrom'; import { Subject } from '../Subject'; import { Subscription } from '../Subscription'; -import { MonoTypeOperatorFunction } from '../types'; +import { MonoTypeOperatorFunction, ObservableInput } from '../types'; import { operate } from '../util/lift'; import { createOperatorSubscriber } from './OperatorSubscriber'; @@ -33,13 +34,14 @@ import { createOperatorSubscriber } from './OperatorSubscriber'; * @see {@link retry} * @see {@link retryWhen} * - * @param {function(notifications: Observable): Observable} notifier - Receives an Observable of notifications with + * @param notifier Function that receives an Observable of notifications with * which a user can `complete` or `error`, aborting the repetition. - * @return A function that returns an Observable that mirrors the source + * @return A function that returns an `ObservableInput` that mirrors the source * Observable with the exception of a `complete`. - * @deprecated Will be removed in v9 or v10. Use {@link repeat}'s `delay` option instead. + * @deprecated Will be removed in v9 or v10. Use {@link repeat}'s {@link RepeatConfig#delay delay} option instead. + * Instead of `repeatWhen(() => notify$)`, use: `repeat({ delay: () => notify$ })`. */ -export function repeatWhen(notifier: (notifications: Observable) => Observable): MonoTypeOperatorFunction { +export function repeatWhen(notifier: (notifications: Observable) => ObservableInput): MonoTypeOperatorFunction { return operate((source, subscriber) => { let innerSub: Subscription | null; let syncResub = false; @@ -61,7 +63,7 @@ export function repeatWhen(notifier: (notifications: Observable) => Obs // If the call to `notifier` throws, it will be caught by the OperatorSubscriber // In the main subscription -- in `subscribeForRepeatWhen`. - notifier(completions$).subscribe( + innerFrom(notifier(completions$)).subscribe( createOperatorSubscriber( subscriber, () => { diff --git a/src/operators/index.ts b/src/operators/index.ts index ebd6d60254..c07e0559ac 100644 --- a/src/operators/index.ts +++ b/src/operators/index.ts @@ -68,7 +68,7 @@ export { publishReplay } from '../internal/operators/publishReplay'; export { race } from '../internal/operators/race'; export { raceWith } from '../internal/operators/raceWith'; export { reduce } from '../internal/operators/reduce'; -export { repeat } from '../internal/operators/repeat'; +export { repeat, RepeatConfig } from '../internal/operators/repeat'; export { repeatWhen } from '../internal/operators/repeatWhen'; export { retry, RetryConfig } from '../internal/operators/retry'; export { retryWhen } from '../internal/operators/retryWhen';