diff --git a/api_guard/dist/types/index.d.ts b/api_guard/dist/types/index.d.ts index b1a2717d71..25b4bba3a7 100644 --- a/api_guard/dist/types/index.d.ts +++ b/api_guard/dist/types/index.d.ts @@ -559,7 +559,7 @@ export declare function reduce(accumulator: (acc: A | S, value: V, export declare function refCount(): MonoTypeOperatorFunction; -export declare function repeat(count?: number): MonoTypeOperatorFunction; +export declare function repeat(countOrConfig?: number | RepeatConfig): MonoTypeOperatorFunction; export declare function repeatWhen(notifier: (notifications: Observable) => Observable): MonoTypeOperatorFunction; diff --git a/api_guard/dist/types/operators/index.d.ts b/api_guard/dist/types/operators/index.d.ts index 2a63608cf6..e7606660b8 100644 --- a/api_guard/dist/types/operators/index.d.ts +++ b/api_guard/dist/types/operators/index.d.ts @@ -229,7 +229,7 @@ export declare function reduce(accumulator: (acc: A | S, value: V, export declare function refCount(): MonoTypeOperatorFunction; -export declare function repeat(count?: number): MonoTypeOperatorFunction; +export declare function repeat(countOrConfig?: number | RepeatConfig): MonoTypeOperatorFunction; export declare function repeatWhen(notifier: (notifications: Observable) => Observable): MonoTypeOperatorFunction; diff --git a/spec/operators/repeat-spec.ts b/spec/operators/repeat-spec.ts index 67f86f8e74..21517857bc 100644 --- a/spec/operators/repeat-spec.ts +++ b/spec/operators/repeat-spec.ts @@ -1,115 +1,145 @@ +/** @prettier */ import { expect } from 'chai'; -import { cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; import { repeat, mergeMap, map, multicast, refCount, take } from 'rxjs/operators'; import { TestScheduler } from 'rxjs/testing'; -import { of, Subject, Observable } from 'rxjs'; - -declare const rxTestScheduler: TestScheduler; +import { of, Subject, Observable, timer } from 'rxjs'; +import { observableMatcher } from '../helpers/observableMatcher'; /** @test {repeat} */ describe('repeat operator', () => { + let rxTest: TestScheduler; + + beforeEach(() => { + rxTest = new TestScheduler(observableMatcher); + }); + it('should resubscribe count number of times', () => { - const e1 = cold('--a--b--| '); - const subs = ['^ ! ', - ' ^ ! ', - ' ^ !']; - const expected = '--a--b----a--b----a--b--|'; - - expectObservable(e1.pipe(repeat(3))).toBe(expected); - expectSubscriptions(e1.subscriptions).toBe(subs); + rxTest.run(({ cold, expectObservable, expectSubscriptions }) => { + const e1 = cold(' --a--b--| '); + const subs = [ + ' ^-------! ', // + ' --------^-------! ', + ' ----------------^-------!', + ]; + const expected = '--a--b----a--b----a--b--|'; + + expectObservable(e1.pipe(repeat(3))).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); }); it('should resubscribe multiple times', () => { - const e1 = cold('--a--b--| '); - const subs = ['^ ! ', - ' ^ ! ', - ' ^ ! ', - ' ^ !']; - const expected = '--a--b----a--b----a--b----a--b--|'; - - expectObservable(e1.pipe(repeat(2), repeat(2))).toBe(expected); - expectSubscriptions(e1.subscriptions).toBe(subs); + rxTest.run(({ cold, expectObservable, expectSubscriptions }) => { + const e1 = cold(' --a--b--| '); + const subs = [ + ' ^-------! ', + ' --------^-------! ', + ' ----------------^-------! ', + ' ------------------------^-------!', + ]; + const expected = '--a--b----a--b----a--b----a--b--|'; + + expectObservable(e1.pipe(repeat(2), repeat(2))).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); }); it('should complete without emit when count is zero', () => { - const e1 = cold('--a--b--|'); - const subs: string[] = []; - const expected = '|'; + rxTest.run(({ cold, expectObservable, expectSubscriptions }) => { + const e1 = cold('--a--b--|'); + const subs: string[] = []; + const expected = '|'; - expectObservable(e1.pipe(repeat(0))).toBe(expected); - expectSubscriptions(e1.subscriptions).toBe(subs); + expectObservable(e1.pipe(repeat(0))).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); }); it('should emit source once when count is one', () => { - const e1 = cold('--a--b--|'); - const subs = '^ !'; - const expected = '--a--b--|'; + rxTest.run(({ cold, expectObservable, expectSubscriptions }) => { + const e1 = cold(' --a--b--|'); + const subs = ' ^-------!'; + const expected = '--a--b--|'; - expectObservable(e1.pipe(repeat(1))).toBe(expected); - expectSubscriptions(e1.subscriptions).toBe(subs); + expectObservable(e1.pipe(repeat(1))).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); }); it('should repeat until gets unsubscribed', () => { - const e1 = cold('--a--b--| '); - const subs = ['^ ! ', - ' ^ !']; - const unsub = ' !'; - const expected = '--a--b----a--b-'; - - expectObservable(e1.pipe(repeat(10)), unsub).toBe(expected); - expectSubscriptions(e1.subscriptions).toBe(subs); + rxTest.run(({ cold, expectObservable, expectSubscriptions }) => { + const e1 = cold(' --a--b--| '); + const subs = [ + ' ^-------! ', // + ' --------^------!', + ]; + const unsub = ' ---------------!'; + const expected = '--a--b----a--b-'; + + expectObservable(e1.pipe(repeat(10)), unsub).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); }); it('should be able to repeat indefinitely until unsubscribed', () => { - const e1 = cold('--a--b--| '); - const subs = ['^ ! ', - ' ^ ! ', - ' ^ ! ', - ' ^ ! ', - ' ^ ! ', - ' ^ !']; - const unsub = ' !'; - const expected = '--a--b----a--b----a--b----a--b----a--b----a--'; - - expectObservable(e1.pipe(repeat()), unsub).toBe(expected); - expectSubscriptions(e1.subscriptions).toBe(subs); + rxTest.run(({ cold, expectObservable, expectSubscriptions }) => { + const e1 = cold(' --a--b--| '); + const subs = [ + ' ^-------! ', + ' --------^-------! ', + ' ----------------^-------! ', + ' ------------------------^-------! ', + ' --------------------------------^-------! ', + ' ----------------------------------------^---!', + ]; + const unsub = ' --------------------------------------------!'; + const expected = '--a--b----a--b----a--b----a--b----a--b----a--'; + + expectObservable(e1.pipe(repeat()), unsub).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); }); it('should not break unsubscription chain when unsubscribed explicitly', () => { - const e1 = cold('--a--b--| '); - const subs = ['^ ! ', - ' ^ ! ', - ' ^ ! ', - ' ^ ! ', - ' ^ ! ', - ' ^ !']; - const unsub = ' !'; - const expected = '--a--b----a--b----a--b----a--b----a--b----a--'; - - const result = e1.pipe( - mergeMap((x: string) => of(x)), - repeat(), - mergeMap((x: string) => of(x)) - ); - - expectObservable(result, unsub).toBe(expected); - expectSubscriptions(e1.subscriptions).toBe(subs); + rxTest.run(({ cold, expectObservable, expectSubscriptions }) => { + const e1 = cold(' --a--b--| '); + const subs = [ + ' ^-------! ', + ' --------^-------! ', + ' ----------------^-------! ', + ' ------------------------^-------! ', + ' --------------------------------^-------! ', + ' ----------------------------------------^---!', + ]; + const unsub = ' --------------------------------------------!'; + const expected = '--a--b----a--b----a--b----a--b----a--b----a--'; + + const result = e1.pipe( + mergeMap((x: string) => of(x)), + repeat(), + mergeMap((x: string) => of(x)) + ); + + expectObservable(result, unsub).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); }); it('should consider negative count as no repeat, and return EMPTY', () => { - const e1 = cold('--a--b--| '); - const unsub = ' !'; - const expected = '|'; + rxTest.run(({ cold, expectObservable, expectSubscriptions }) => { + const e1 = cold('--a--b--| '); + const expected = '|'; - expectObservable(e1.pipe(repeat(-1)), unsub).toBe(expected); - expectSubscriptions(e1.subscriptions).toBe([]); + expectObservable(e1.pipe(repeat(-1))).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe([]); + }); }); it('should always teardown before starting the next cycle', async () => { const results: any[] = []; - const source = new Observable(subscriber => { + const source = new Observable((subscriber) => { Promise.resolve().then(() => { - subscriber.next(1) + subscriber.next(1); Promise.resolve().then(() => { subscriber.next(2); Promise.resolve().then(() => { @@ -119,171 +149,205 @@ describe('repeat operator', () => { }); return () => { results.push('teardown'); - } + }; }); - await source.pipe(repeat(3)).forEach(value => results.push(value)); - - expect(results).to.deep.equal([1, 2, 'teardown', 1, 2, 'teardown', 1, 2, 'teardown']) + await source.pipe(repeat(3)).forEach((value) => results.push(value)); + + expect(results).to.deep.equal([1, 2, 'teardown', 1, 2, 'teardown', 1, 2, 'teardown']); }); it('should always teardown before starting the next cycle, even when synchronous', () => { const results: any[] = []; - const source = new Observable(subscriber => { + const source = new Observable((subscriber) => { subscriber.next(1); subscriber.next(2); subscriber.complete(); return () => { results.push('teardown'); - } + }; }); const subscription = source.pipe(repeat(3)).subscribe({ - next: value => results.push(value), - complete: () => results.push('complete') + next: (value) => results.push(value), + complete: () => results.push('complete'), }); expect(subscription.closed).to.be.true; - expect(results).to.deep.equal([1, 2, 'teardown', 1, 2, 'teardown', 1, 2, 'complete', 'teardown']) + expect(results).to.deep.equal([1, 2, 'teardown', 1, 2, 'teardown', 1, 2, 'complete', 'teardown']); }); it('should not complete when source never completes', () => { - const e1 = cold('-'); - const e1subs = '^'; - const expected = '-'; + rxTest.run(({ cold, expectObservable, expectSubscriptions }) => { + const e1 = cold('-'); + const e1subs = '^'; + const expected = '-'; - expectObservable(e1.pipe(repeat(3))).toBe(expected); - expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectObservable(e1.pipe(repeat(3))).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); }); it('should not complete when source does not completes', () => { - const e1 = cold('-'); - const unsub = ' !'; - const subs = '^ !'; - const expected = '-'; - - expectObservable(e1.pipe(repeat(3)), unsub).toBe(expected); - expectSubscriptions(e1.subscriptions).toBe(subs); + rxTest.run(({ cold, expectObservable, expectSubscriptions }) => { + const e1 = cold('-'); + const unsub = '------------------------------!'; + const subs = ' ^-----------------------------!'; + const expected = '-'; + + expectObservable(e1.pipe(repeat(3)), unsub).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); }); it('should complete immediately when source does not complete without emit but count is zero', () => { - const e1 = cold('-'); - const subs: string[] = []; - const expected = '|'; + rxTest.run(({ cold, expectObservable, expectSubscriptions }) => { + const e1 = cold('-'); + const subs: string[] = []; + const expected = '|'; - expectObservable(e1.pipe(repeat(0))).toBe(expected); - expectSubscriptions(e1.subscriptions).toBe(subs); + expectObservable(e1.pipe(repeat(0))).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); }); it('should complete immediately when source does not complete but count is zero', () => { - const e1 = cold('--a--b--'); - const subs: string[] = []; - const expected = '|'; + rxTest.run(({ cold, expectObservable, expectSubscriptions }) => { + const e1 = cold('--a--b--'); + const subs: string[] = []; + const expected = '|'; - expectObservable(e1.pipe(repeat(0))).toBe(expected); - expectSubscriptions(e1.subscriptions).toBe(subs); + expectObservable(e1.pipe(repeat(0))).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); }); it('should emit source once and does not complete when source emits but does not complete', () => { - const e1 = cold('--a--b--'); - const subs = ['^ ']; - const expected = '--a--b--'; + rxTest.run(({ cold, expectObservable, expectSubscriptions }) => { + const e1 = cold(' --a--b--'); + const subs = [' ^-------']; + const expected = '--a--b--'; - expectObservable(e1.pipe(repeat(3))).toBe(expected); - expectSubscriptions(e1.subscriptions).toBe(subs); + expectObservable(e1.pipe(repeat(3))).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); }); it('should complete when source is empty', () => { - const e1 = cold('|'); - const e1subs = ['(^!)', '(^!)', '(^!)']; - const expected = '|'; + rxTest.run(({ cold, expectObservable, expectSubscriptions }) => { + const e1 = cold('|'); + const e1subs = ['(^!)', '(^!)', '(^!)']; + const expected = '|'; - expectObservable(e1.pipe(repeat(3))).toBe(expected); - expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectObservable(e1.pipe(repeat(3))).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); }); it('should complete when source does not emit', () => { - const e1 = cold('----| '); - const subs = ['^ ! ', - ' ^ ! ', - ' ^ !']; - const expected = '------------|'; - - expectObservable(e1.pipe(repeat(3))).toBe(expected); - expectSubscriptions(e1.subscriptions).toBe(subs); + rxTest.run(({ cold, expectObservable, expectSubscriptions }) => { + const e1 = cold('----| '); + const subs = [ + ' ^---! ', // + ' ----^---! ', + ' --------^---!', + ]; + const expected = '------------|'; + + expectObservable(e1.pipe(repeat(3))).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); }); it('should complete immediately when source does not emit but count is zero', () => { - const e1 = cold('----|'); - const subs: string[] = []; - const expected = '|'; + rxTest.run(({ cold, expectObservable, expectSubscriptions }) => { + const e1 = cold('----|'); + const subs: string[] = []; + const expected = '|'; - expectObservable(e1.pipe(repeat(0))).toBe(expected); - expectSubscriptions(e1.subscriptions).toBe(subs); + expectObservable(e1.pipe(repeat(0))).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); }); it('should raise error when source raises error', () => { - const e1 = cold('--a--b--#'); - const subs = '^ !'; - const expected = '--a--b--#'; + rxTest.run(({ cold, expectObservable, expectSubscriptions }) => { + const e1 = cold(' --a--b--#'); + const subs = ' ^-------!'; + const expected = '--a--b--#'; - expectObservable(e1.pipe(repeat(2))).toBe(expected); - expectSubscriptions(e1.subscriptions).toBe(subs); + expectObservable(e1.pipe(repeat(2))).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); }); it('should raises error if source throws', () => { - const e1 = cold('#'); - const e1subs = '(^!)'; - const expected = '#'; + rxTest.run(({ cold, expectObservable, expectSubscriptions }) => { + const e1 = cold('#'); + const e1subs = '(^!)'; + const expected = '#'; - expectObservable(e1.pipe(repeat(3))).toBe(expected); - expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectObservable(e1.pipe(repeat(3))).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); }); it('should raises error if source throws when repeating infinitely', () => { - const e1 = cold('#'); - const e1subs = '(^!)'; - const expected = '#'; + rxTest.run(({ cold, expectObservable, expectSubscriptions }) => { + const e1 = cold('#'); + const e1subs = '(^!)'; + const expected = '#'; - expectObservable(e1.pipe(repeat())).toBe(expected); - expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectObservable(e1.pipe(repeat())).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); }); it('should raise error after first emit succeed', () => { - let repeated = false; - - const e1 = cold('--a--|').pipe(map((x: string) => { - if (repeated) { - throw 'error'; - } else { - repeated = true; - return x; - } - })); - const expected = '--a----#'; - - expectObservable(e1.pipe(repeat(2))).toBe(expected); + rxTest.run(({ cold, expectObservable, expectSubscriptions }) => { + let repeated = false; + + const e1 = cold('--a--|').pipe( + map((x: string) => { + if (repeated) { + throw 'error'; + } else { + repeated = true; + return x; + } + }) + ); + const expected = '--a----#'; + + expectObservable(e1.pipe(repeat(2))).toBe(expected); + }); }); it('should repeat a synchronous source (multicasted and refCounted) multiple times', (done) => { const expected = [1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3]; - of(1, 2, 3).pipe( - multicast(() => new Subject()), - refCount(), - repeat(5) - ).subscribe( - (x: number) => { expect(x).to.equal(expected.shift()); }, + of(1, 2, 3) + .pipe( + multicast(() => new Subject()), + refCount(), + repeat(5) + ) + .subscribe( + (x: number) => { + expect(x).to.equal(expected.shift()); + }, (x) => { done(new Error('should not be called')); - }, () => { + }, + () => { expect(expected.length).to.equal(0); done(); - }); + } + ); }); it('should stop listening to a synchronous observable when unsubscribed', () => { const sideEffects: number[] = []; - const synchronousObservable = new Observable(subscriber => { + const synchronousObservable = new Observable((subscriber) => { // This will check to see if the subscriber was closed on each loop // when the unsubscribe hits (from the `take`), it should be closed for (let i = 0; !subscriber.closed && i < 10; i++) { @@ -292,11 +356,98 @@ describe('repeat operator', () => { } }); - synchronousObservable.pipe( - repeat(), - take(3), - ).subscribe(() => { /* noop */ }); + synchronousObservable.pipe(repeat(), take(3)).subscribe(() => { + /* noop */ + }); expect(sideEffects).to.deep.equal([0, 1, 2]); }); + + it('should allow count configuration', () => { + rxTest.run(({ cold, expectObservable, expectSubscriptions }) => { + const e1 = cold(' --a--b--| '); + const subs = [ + ' ^-------! ', // + ' --------^-------! ', + ' ----------------^-------!', + ]; + const expected = '--a--b----a--b----a--b--|'; + + expectObservable(e1.pipe(repeat({ count: 3 }))).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); + }); + + it('should allow delay time configuration', () => { + rxTest.run(({ cold, expectObservable, expectSubscriptions }) => { + const e1 = cold(' --a--b--| '); + const delay = 3; // ---| ---| + const subs = [ + ' ^-------! ', // + ' -----------^-------! ', + ' ----------------------^-------!', + ]; + const expected = '--a--b-------a--b-------a--b--|'; + + expectObservable(e1.pipe(repeat({ count: 3, delay }))).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); + }); + + it('should allow delay function configuration', () => { + rxTest.run(({ cold, expectObservable, expectSubscriptions }) => { + const expectedCounts = [1, 2, 3]; + + const e1 = cold(' --a--b--| '); + const delay = 3; // ---| ---| + const subs = [ + ' ^-------! ', // + ' -----------^-------! ', + ' ----------------------^-------!', + ]; + const expected = '--a--b-------a--b-------a--b--|'; + + expectObservable( + e1.pipe( + repeat({ + count: 3, + delay: (count) => { + expect(count).to.equal(expectedCounts.shift()); + return timer(delay); + }, + }) + ) + ).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); + }); + + it('should handle delay function throwing', () => { + rxTest.run(({ cold, expectObservable, expectSubscriptions }) => { + const expectedCounts = [1, 2, 3]; + + const e1 = cold(' --a--b--| '); + const delay = 3; // ---| ---| + const subs = [ + ' ^-------! ', // + ' -----------^-------! ', + ]; + const expected = '--a--b-------a--b--#'; + + expectObservable( + e1.pipe( + repeat({ + count: 3, + delay: (count) => { + if (count === 2) { + throw 'bad'; + } + return timer(delay); + }, + }) + ) + ).toBe(expected, undefined, 'bad'); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); + }); }); diff --git a/src/internal/operators/repeat.ts b/src/internal/operators/repeat.ts index 09da1190e8..41edff34b8 100644 --- a/src/internal/operators/repeat.ts +++ b/src/internal/operators/repeat.ts @@ -1,26 +1,53 @@ import { Subscription } from '../Subscription'; import { EMPTY } from '../observable/empty'; import { operate } from '../util/lift'; -import { MonoTypeOperatorFunction } from '../types'; +import { MonoTypeOperatorFunction, ObservableInput } from '../types'; import { OperatorSubscriber } from './OperatorSubscriber'; +import { innerFrom } from '../observable/innerFrom'; +import { timer } from '../observable/timer'; + +export interface RepeatConfig { + /** + * The number of times to repeat the source. Defaults to `Infinity`. + */ + count?: number; + + /** + * If a `number`, will delay the repeat of the source by that number of milliseconds. + * If a function, it will provide the number of times the source has been subscribed to, + * and the return value should be a valid observable input that will notify when the source + * should be repeated. If the notifier observable is empty, the result will complete. + */ + delay?: number | ((count: number) => ObservableInput); +} /** - * Returns an Observable that will resubscribe to the source stream when the source stream completes, at most count times. + * Returns an Observable that will resubscribe to the source stream when the source stream completes. * * Repeats all values emitted on the source. It's like {@link retry}, but for non error cases. * * ![](repeat.png) * - * Similar to {@link retry}, this operator repeats the stream of items emitted by the source for non error cases. - * Repeat can be useful for creating observables that are meant to have some repeated pattern or rhythm. + * Repeat will output values from a source until the source completes, then it will resubscribe to the + * source a specified number of times, with a specified delay. Repeat can be particularly useful in + * combination with closing operators like {@link take}, {@link takeUntil}, {@link first}, or {@link takeWhile}, + * as it can be used to restart a source again from scratch. + * + * Repeat is very similar to {@link retry}, where {@link retry} will resubscribe to the source in the error case, but + * `repeat` will resubscribe if the source completes. + * + * Note that `repeat` will _not_ catch errors. Use {@link retry} for that. * - * Note: `repeat(0)` returns an empty observable and `repeat()` will repeat forever + * - `repeat(0)` returns an empty observable + * - `repeat()` will repeat forever + * - `repeat({ delay: 200 })` will repeat forever, with a delay of 200ms between repetitions. + * - `repeat({ count: 2, delay: 400 })` will repeat twice, with a delay of 400ms between repetitions. + * - `repeat({ delay: (count) => timer(count * 1000) })` will repeat forever, but will have a delay that grows by one second for each repetition. * * ## Example * Repeat a message stream * ```ts - * import { of } from 'rxjs'; - * import { repeat } from 'rxjs/operators'; + * import { of, repeat } from 'rxjs'; * * const source = of('Repeat message'); * const example = source.pipe(repeat(3)); @@ -50,29 +77,78 @@ import { OperatorSubscriber } from './OperatorSubscriber'; * // 2 * ``` * + * Defining two complex repeats with delays on the same source. + * Note that the second repeat cannot be called until the first + * repeat as exhausted it's count. + * + * ```ts + * import { defer, of, repeat } from 'rxjs'; + * + * const source = defer(() => { + * return of(`Hello, it is ${new Date()}`) + * }); + * + * source.pipe( + * // Repeat 3 times with a delay of 1 second between repetitions + * repeat({ + * count: 3, + * delay: 1000, + * }), + * + * // *Then* repeat forever, but with an exponential step-back + * // maxing out at 1 minute. + * repeat({ + * delay: (count) => timer(Math.min(60000, 2 ^ count * 1000)) + * }) + * ) + * ``` + * * @see {@link repeatWhen} * @see {@link retry} * - * @param {number} [count] The number of times the source Observable items are repeated, a count of 0 will yield + * @param count The number of times the source Observable items are repeated, a count of 0 will yield * an empty Observable. - * @return A function that returns an Observable that will resubscribe to the - * source stream when the source stream completes, at most `count` times. */ -export function repeat(count = Infinity): MonoTypeOperatorFunction { +export function repeat(countOrConfig?: number | RepeatConfig): MonoTypeOperatorFunction { + let count = Infinity; + let delay: RepeatConfig['delay']; + + if (countOrConfig != null) { + if (typeof countOrConfig === 'object') { + ({ count = Infinity, delay } = countOrConfig); + } else { + count = countOrConfig; + } + } + return count <= 0 ? () => EMPTY : operate((source, subscriber) => { let soFar = 0; - let innerSub: Subscription | null; - const subscribeForRepeat = () => { + let sourceSub: Subscription | null; + + const resubscribe = () => { + sourceSub?.unsubscribe(); + sourceSub = null; + if (delay != null) { + const notifier = typeof delay === 'number' ? timer(delay) : innerFrom(delay(soFar)); + const notifierSubscriber = new OperatorSubscriber(subscriber, () => { + notifierSubscriber.unsubscribe(); + subscribeToSource(); + }); + notifier.subscribe(notifierSubscriber); + } else { + subscribeToSource(); + } + }; + + const subscribeToSource = () => { let syncUnsub = false; - innerSub = source.subscribe( + sourceSub = source.subscribe( new OperatorSubscriber(subscriber, undefined, () => { if (++soFar < count) { - if (innerSub) { - innerSub.unsubscribe(); - innerSub = null; - subscribeForRepeat(); + if (sourceSub) { + resubscribe(); } else { syncUnsub = true; } @@ -83,11 +159,10 @@ export function repeat(count = Infinity): MonoTypeOperatorFunction { ); if (syncUnsub) { - innerSub.unsubscribe(); - innerSub = null; - subscribeForRepeat(); + resubscribe(); } }; - subscribeForRepeat(); + + subscribeToSource(); }); }