Skip to content

Commit 6b7a534

Browse files
authoredDec 27, 2021
feat(repeat): now has configurable delay (#6640)
* feat(repeat): now has configurable delay Adds a feature to `repeat` to match the configurable API of `retry`. `repeat` can now be used in a similar manner to `repeatWhen`, only perhaps in a more developer-friendly way. - Also updates `repeat` tests to use run mode. * chore: remove unnecessary unsub in test
1 parent 5835116 commit 6b7a534

File tree

4 files changed

+421
-195
lines changed

4 files changed

+421
-195
lines changed
 

‎api_guard/dist/types/index.d.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -559,7 +559,7 @@ export declare function reduce<V, A, S = A>(accumulator: (acc: A | S, value: V,
559559

560560
export declare function refCount<T>(): MonoTypeOperatorFunction<T>;
561561

562-
export declare function repeat<T>(count?: number): MonoTypeOperatorFunction<T>;
562+
export declare function repeat<T>(countOrConfig?: number | RepeatConfig): MonoTypeOperatorFunction<T>;
563563

564564
export declare function repeatWhen<T>(notifier: (notifications: Observable<void>) => Observable<any>): MonoTypeOperatorFunction<T>;
565565

‎api_guard/dist/types/operators/index.d.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ export declare function reduce<V, A, S = A>(accumulator: (acc: A | S, value: V,
229229

230230
export declare function refCount<T>(): MonoTypeOperatorFunction<T>;
231231

232-
export declare function repeat<T>(count?: number): MonoTypeOperatorFunction<T>;
232+
export declare function repeat<T>(countOrConfig?: number | RepeatConfig): MonoTypeOperatorFunction<T>;
233233

234234
export declare function repeatWhen<T>(notifier: (notifications: Observable<void>) => Observable<any>): MonoTypeOperatorFunction<T>;
235235

‎spec/operators/repeat-spec.ts

+322-171
Original file line numberDiff line numberDiff line change
@@ -1,115 +1,145 @@
1+
/** @prettier */
12
import { expect } from 'chai';
2-
import { cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
33
import { repeat, mergeMap, map, multicast, refCount, take } from 'rxjs/operators';
44
import { TestScheduler } from 'rxjs/testing';
5-
import { of, Subject, Observable } from 'rxjs';
6-
7-
declare const rxTestScheduler: TestScheduler;
5+
import { of, Subject, Observable, timer } from 'rxjs';
6+
import { observableMatcher } from '../helpers/observableMatcher';
87

98
/** @test {repeat} */
109
describe('repeat operator', () => {
10+
let rxTest: TestScheduler;
11+
12+
beforeEach(() => {
13+
rxTest = new TestScheduler(observableMatcher);
14+
});
15+
1116
it('should resubscribe count number of times', () => {
12-
const e1 = cold('--a--b--| ');
13-
const subs = ['^ ! ',
14-
' ^ ! ',
15-
' ^ !'];
16-
const expected = '--a--b----a--b----a--b--|';
17-
18-
expectObservable(e1.pipe(repeat(3))).toBe(expected);
19-
expectSubscriptions(e1.subscriptions).toBe(subs);
17+
rxTest.run(({ cold, expectObservable, expectSubscriptions }) => {
18+
const e1 = cold(' --a--b--| ');
19+
const subs = [
20+
' ^-------! ', //
21+
' --------^-------! ',
22+
' ----------------^-------!',
23+
];
24+
const expected = '--a--b----a--b----a--b--|';
25+
26+
expectObservable(e1.pipe(repeat(3))).toBe(expected);
27+
expectSubscriptions(e1.subscriptions).toBe(subs);
28+
});
2029
});
2130

2231
it('should resubscribe multiple times', () => {
23-
const e1 = cold('--a--b--| ');
24-
const subs = ['^ ! ',
25-
' ^ ! ',
26-
' ^ ! ',
27-
' ^ !'];
28-
const expected = '--a--b----a--b----a--b----a--b--|';
29-
30-
expectObservable(e1.pipe(repeat(2), repeat(2))).toBe(expected);
31-
expectSubscriptions(e1.subscriptions).toBe(subs);
32+
rxTest.run(({ cold, expectObservable, expectSubscriptions }) => {
33+
const e1 = cold(' --a--b--| ');
34+
const subs = [
35+
' ^-------! ',
36+
' --------^-------! ',
37+
' ----------------^-------! ',
38+
' ------------------------^-------!',
39+
];
40+
const expected = '--a--b----a--b----a--b----a--b--|';
41+
42+
expectObservable(e1.pipe(repeat(2), repeat(2))).toBe(expected);
43+
expectSubscriptions(e1.subscriptions).toBe(subs);
44+
});
3245
});
3346

3447
it('should complete without emit when count is zero', () => {
35-
const e1 = cold('--a--b--|');
36-
const subs: string[] = [];
37-
const expected = '|';
48+
rxTest.run(({ cold, expectObservable, expectSubscriptions }) => {
49+
const e1 = cold('--a--b--|');
50+
const subs: string[] = [];
51+
const expected = '|';
3852

39-
expectObservable(e1.pipe(repeat(0))).toBe(expected);
40-
expectSubscriptions(e1.subscriptions).toBe(subs);
53+
expectObservable(e1.pipe(repeat(0))).toBe(expected);
54+
expectSubscriptions(e1.subscriptions).toBe(subs);
55+
});
4156
});
4257

4358
it('should emit source once when count is one', () => {
44-
const e1 = cold('--a--b--|');
45-
const subs = '^ !';
46-
const expected = '--a--b--|';
59+
rxTest.run(({ cold, expectObservable, expectSubscriptions }) => {
60+
const e1 = cold(' --a--b--|');
61+
const subs = ' ^-------!';
62+
const expected = '--a--b--|';
4763

48-
expectObservable(e1.pipe(repeat(1))).toBe(expected);
49-
expectSubscriptions(e1.subscriptions).toBe(subs);
64+
expectObservable(e1.pipe(repeat(1))).toBe(expected);
65+
expectSubscriptions(e1.subscriptions).toBe(subs);
66+
});
5067
});
5168

5269
it('should repeat until gets unsubscribed', () => {
53-
const e1 = cold('--a--b--| ');
54-
const subs = ['^ ! ',
55-
' ^ !'];
56-
const unsub = ' !';
57-
const expected = '--a--b----a--b-';
58-
59-
expectObservable(e1.pipe(repeat(10)), unsub).toBe(expected);
60-
expectSubscriptions(e1.subscriptions).toBe(subs);
70+
rxTest.run(({ cold, expectObservable, expectSubscriptions }) => {
71+
const e1 = cold(' --a--b--| ');
72+
const subs = [
73+
' ^-------! ', //
74+
' --------^------!',
75+
];
76+
const unsub = ' ---------------!';
77+
const expected = '--a--b----a--b-';
78+
79+
expectObservable(e1.pipe(repeat(10)), unsub).toBe(expected);
80+
expectSubscriptions(e1.subscriptions).toBe(subs);
81+
});
6182
});
6283

6384
it('should be able to repeat indefinitely until unsubscribed', () => {
64-
const e1 = cold('--a--b--| ');
65-
const subs = ['^ ! ',
66-
' ^ ! ',
67-
' ^ ! ',
68-
' ^ ! ',
69-
' ^ ! ',
70-
' ^ !'];
71-
const unsub = ' !';
72-
const expected = '--a--b----a--b----a--b----a--b----a--b----a--';
73-
74-
expectObservable(e1.pipe(repeat()), unsub).toBe(expected);
75-
expectSubscriptions(e1.subscriptions).toBe(subs);
85+
rxTest.run(({ cold, expectObservable, expectSubscriptions }) => {
86+
const e1 = cold(' --a--b--| ');
87+
const subs = [
88+
' ^-------! ',
89+
' --------^-------! ',
90+
' ----------------^-------! ',
91+
' ------------------------^-------! ',
92+
' --------------------------------^-------! ',
93+
' ----------------------------------------^---!',
94+
];
95+
const unsub = ' --------------------------------------------!';
96+
const expected = '--a--b----a--b----a--b----a--b----a--b----a--';
97+
98+
expectObservable(e1.pipe(repeat()), unsub).toBe(expected);
99+
expectSubscriptions(e1.subscriptions).toBe(subs);
100+
});
76101
});
77102

78103
it('should not break unsubscription chain when unsubscribed explicitly', () => {
79-
const e1 = cold('--a--b--| ');
80-
const subs = ['^ ! ',
81-
' ^ ! ',
82-
' ^ ! ',
83-
' ^ ! ',
84-
' ^ ! ',
85-
' ^ !'];
86-
const unsub = ' !';
87-
const expected = '--a--b----a--b----a--b----a--b----a--b----a--';
88-
89-
const result = e1.pipe(
90-
mergeMap((x: string) => of(x)),
91-
repeat(),
92-
mergeMap((x: string) => of(x))
93-
);
94-
95-
expectObservable(result, unsub).toBe(expected);
96-
expectSubscriptions(e1.subscriptions).toBe(subs);
104+
rxTest.run(({ cold, expectObservable, expectSubscriptions }) => {
105+
const e1 = cold(' --a--b--| ');
106+
const subs = [
107+
' ^-------! ',
108+
' --------^-------! ',
109+
' ----------------^-------! ',
110+
' ------------------------^-------! ',
111+
' --------------------------------^-------! ',
112+
' ----------------------------------------^---!',
113+
];
114+
const unsub = ' --------------------------------------------!';
115+
const expected = '--a--b----a--b----a--b----a--b----a--b----a--';
116+
117+
const result = e1.pipe(
118+
mergeMap((x: string) => of(x)),
119+
repeat(),
120+
mergeMap((x: string) => of(x))
121+
);
122+
123+
expectObservable(result, unsub).toBe(expected);
124+
expectSubscriptions(e1.subscriptions).toBe(subs);
125+
});
97126
});
98127

99128
it('should consider negative count as no repeat, and return EMPTY', () => {
100-
const e1 = cold('--a--b--| ');
101-
const unsub = ' !';
102-
const expected = '|';
129+
rxTest.run(({ cold, expectObservable, expectSubscriptions }) => {
130+
const e1 = cold('--a--b--| ');
131+
const expected = '|';
103132

104-
expectObservable(e1.pipe(repeat(-1)), unsub).toBe(expected);
105-
expectSubscriptions(e1.subscriptions).toBe([]);
133+
expectObservable(e1.pipe(repeat(-1))).toBe(expected);
134+
expectSubscriptions(e1.subscriptions).toBe([]);
135+
});
106136
});
107137

108138
it('should always teardown before starting the next cycle', async () => {
109139
const results: any[] = [];
110-
const source = new Observable<number>(subscriber => {
140+
const source = new Observable<number>((subscriber) => {
111141
Promise.resolve().then(() => {
112-
subscriber.next(1)
142+
subscriber.next(1);
113143
Promise.resolve().then(() => {
114144
subscriber.next(2);
115145
Promise.resolve().then(() => {
@@ -119,171 +149,205 @@ describe('repeat operator', () => {
119149
});
120150
return () => {
121151
results.push('teardown');
122-
}
152+
};
123153
});
124154

125-
await source.pipe(repeat(3)).forEach(value => results.push(value));
126-
127-
expect(results).to.deep.equal([1, 2, 'teardown', 1, 2, 'teardown', 1, 2, 'teardown'])
155+
await source.pipe(repeat(3)).forEach((value) => results.push(value));
156+
157+
expect(results).to.deep.equal([1, 2, 'teardown', 1, 2, 'teardown', 1, 2, 'teardown']);
128158
});
129159

130160
it('should always teardown before starting the next cycle, even when synchronous', () => {
131161
const results: any[] = [];
132-
const source = new Observable<number>(subscriber => {
162+
const source = new Observable<number>((subscriber) => {
133163
subscriber.next(1);
134164
subscriber.next(2);
135165
subscriber.complete();
136166
return () => {
137167
results.push('teardown');
138-
}
168+
};
139169
});
140170
const subscription = source.pipe(repeat(3)).subscribe({
141-
next: value => results.push(value),
142-
complete: () => results.push('complete')
171+
next: (value) => results.push(value),
172+
complete: () => results.push('complete'),
143173
});
144174

145175
expect(subscription.closed).to.be.true;
146-
expect(results).to.deep.equal([1, 2, 'teardown', 1, 2, 'teardown', 1, 2, 'complete', 'teardown'])
176+
expect(results).to.deep.equal([1, 2, 'teardown', 1, 2, 'teardown', 1, 2, 'complete', 'teardown']);
147177
});
148178

149179
it('should not complete when source never completes', () => {
150-
const e1 = cold('-');
151-
const e1subs = '^';
152-
const expected = '-';
180+
rxTest.run(({ cold, expectObservable, expectSubscriptions }) => {
181+
const e1 = cold('-');
182+
const e1subs = '^';
183+
const expected = '-';
153184

154-
expectObservable(e1.pipe(repeat(3))).toBe(expected);
155-
expectSubscriptions(e1.subscriptions).toBe(e1subs);
185+
expectObservable(e1.pipe(repeat(3))).toBe(expected);
186+
expectSubscriptions(e1.subscriptions).toBe(e1subs);
187+
});
156188
});
157189

158190
it('should not complete when source does not completes', () => {
159-
const e1 = cold('-');
160-
const unsub = ' !';
161-
const subs = '^ !';
162-
const expected = '-';
163-
164-
expectObservable(e1.pipe(repeat(3)), unsub).toBe(expected);
165-
expectSubscriptions(e1.subscriptions).toBe(subs);
191+
rxTest.run(({ cold, expectObservable, expectSubscriptions }) => {
192+
const e1 = cold('-');
193+
const unsub = '------------------------------!';
194+
const subs = ' ^-----------------------------!';
195+
const expected = '-';
196+
197+
expectObservable(e1.pipe(repeat(3)), unsub).toBe(expected);
198+
expectSubscriptions(e1.subscriptions).toBe(subs);
199+
});
166200
});
167201

168202
it('should complete immediately when source does not complete without emit but count is zero', () => {
169-
const e1 = cold('-');
170-
const subs: string[] = [];
171-
const expected = '|';
203+
rxTest.run(({ cold, expectObservable, expectSubscriptions }) => {
204+
const e1 = cold('-');
205+
const subs: string[] = [];
206+
const expected = '|';
172207

173-
expectObservable(e1.pipe(repeat(0))).toBe(expected);
174-
expectSubscriptions(e1.subscriptions).toBe(subs);
208+
expectObservable(e1.pipe(repeat(0))).toBe(expected);
209+
expectSubscriptions(e1.subscriptions).toBe(subs);
210+
});
175211
});
176212

177213
it('should complete immediately when source does not complete but count is zero', () => {
178-
const e1 = cold('--a--b--');
179-
const subs: string[] = [];
180-
const expected = '|';
214+
rxTest.run(({ cold, expectObservable, expectSubscriptions }) => {
215+
const e1 = cold('--a--b--');
216+
const subs: string[] = [];
217+
const expected = '|';
181218

182-
expectObservable(e1.pipe(repeat(0))).toBe(expected);
183-
expectSubscriptions(e1.subscriptions).toBe(subs);
219+
expectObservable(e1.pipe(repeat(0))).toBe(expected);
220+
expectSubscriptions(e1.subscriptions).toBe(subs);
221+
});
184222
});
185223

186224
it('should emit source once and does not complete when source emits but does not complete', () => {
187-
const e1 = cold('--a--b--');
188-
const subs = ['^ '];
189-
const expected = '--a--b--';
225+
rxTest.run(({ cold, expectObservable, expectSubscriptions }) => {
226+
const e1 = cold(' --a--b--');
227+
const subs = [' ^-------'];
228+
const expected = '--a--b--';
190229

191-
expectObservable(e1.pipe(repeat(3))).toBe(expected);
192-
expectSubscriptions(e1.subscriptions).toBe(subs);
230+
expectObservable(e1.pipe(repeat(3))).toBe(expected);
231+
expectSubscriptions(e1.subscriptions).toBe(subs);
232+
});
193233
});
194234

195235
it('should complete when source is empty', () => {
196-
const e1 = cold('|');
197-
const e1subs = ['(^!)', '(^!)', '(^!)'];
198-
const expected = '|';
236+
rxTest.run(({ cold, expectObservable, expectSubscriptions }) => {
237+
const e1 = cold('|');
238+
const e1subs = ['(^!)', '(^!)', '(^!)'];
239+
const expected = '|';
199240

200-
expectObservable(e1.pipe(repeat(3))).toBe(expected);
201-
expectSubscriptions(e1.subscriptions).toBe(e1subs);
241+
expectObservable(e1.pipe(repeat(3))).toBe(expected);
242+
expectSubscriptions(e1.subscriptions).toBe(e1subs);
243+
});
202244
});
203245

204246
it('should complete when source does not emit', () => {
205-
const e1 = cold('----| ');
206-
const subs = ['^ ! ',
207-
' ^ ! ',
208-
' ^ !'];
209-
const expected = '------------|';
210-
211-
expectObservable(e1.pipe(repeat(3))).toBe(expected);
212-
expectSubscriptions(e1.subscriptions).toBe(subs);
247+
rxTest.run(({ cold, expectObservable, expectSubscriptions }) => {
248+
const e1 = cold('----| ');
249+
const subs = [
250+
' ^---! ', //
251+
' ----^---! ',
252+
' --------^---!',
253+
];
254+
const expected = '------------|';
255+
256+
expectObservable(e1.pipe(repeat(3))).toBe(expected);
257+
expectSubscriptions(e1.subscriptions).toBe(subs);
258+
});
213259
});
214260

215261
it('should complete immediately when source does not emit but count is zero', () => {
216-
const e1 = cold('----|');
217-
const subs: string[] = [];
218-
const expected = '|';
262+
rxTest.run(({ cold, expectObservable, expectSubscriptions }) => {
263+
const e1 = cold('----|');
264+
const subs: string[] = [];
265+
const expected = '|';
219266

220-
expectObservable(e1.pipe(repeat(0))).toBe(expected);
221-
expectSubscriptions(e1.subscriptions).toBe(subs);
267+
expectObservable(e1.pipe(repeat(0))).toBe(expected);
268+
expectSubscriptions(e1.subscriptions).toBe(subs);
269+
});
222270
});
223271

224272
it('should raise error when source raises error', () => {
225-
const e1 = cold('--a--b--#');
226-
const subs = '^ !';
227-
const expected = '--a--b--#';
273+
rxTest.run(({ cold, expectObservable, expectSubscriptions }) => {
274+
const e1 = cold(' --a--b--#');
275+
const subs = ' ^-------!';
276+
const expected = '--a--b--#';
228277

229-
expectObservable(e1.pipe(repeat(2))).toBe(expected);
230-
expectSubscriptions(e1.subscriptions).toBe(subs);
278+
expectObservable(e1.pipe(repeat(2))).toBe(expected);
279+
expectSubscriptions(e1.subscriptions).toBe(subs);
280+
});
231281
});
232282

233283
it('should raises error if source throws', () => {
234-
const e1 = cold('#');
235-
const e1subs = '(^!)';
236-
const expected = '#';
284+
rxTest.run(({ cold, expectObservable, expectSubscriptions }) => {
285+
const e1 = cold('#');
286+
const e1subs = '(^!)';
287+
const expected = '#';
237288

238-
expectObservable(e1.pipe(repeat(3))).toBe(expected);
239-
expectSubscriptions(e1.subscriptions).toBe(e1subs);
289+
expectObservable(e1.pipe(repeat(3))).toBe(expected);
290+
expectSubscriptions(e1.subscriptions).toBe(e1subs);
291+
});
240292
});
241293

242294
it('should raises error if source throws when repeating infinitely', () => {
243-
const e1 = cold('#');
244-
const e1subs = '(^!)';
245-
const expected = '#';
295+
rxTest.run(({ cold, expectObservable, expectSubscriptions }) => {
296+
const e1 = cold('#');
297+
const e1subs = '(^!)';
298+
const expected = '#';
246299

247-
expectObservable(e1.pipe(repeat())).toBe(expected);
248-
expectSubscriptions(e1.subscriptions).toBe(e1subs);
300+
expectObservable(e1.pipe(repeat())).toBe(expected);
301+
expectSubscriptions(e1.subscriptions).toBe(e1subs);
302+
});
249303
});
250304

251305
it('should raise error after first emit succeed', () => {
252-
let repeated = false;
253-
254-
const e1 = cold('--a--|').pipe(map((x: string) => {
255-
if (repeated) {
256-
throw 'error';
257-
} else {
258-
repeated = true;
259-
return x;
260-
}
261-
}));
262-
const expected = '--a----#';
263-
264-
expectObservable(e1.pipe(repeat(2))).toBe(expected);
306+
rxTest.run(({ cold, expectObservable, expectSubscriptions }) => {
307+
let repeated = false;
308+
309+
const e1 = cold('--a--|').pipe(
310+
map((x: string) => {
311+
if (repeated) {
312+
throw 'error';
313+
} else {
314+
repeated = true;
315+
return x;
316+
}
317+
})
318+
);
319+
const expected = '--a----#';
320+
321+
expectObservable(e1.pipe(repeat(2))).toBe(expected);
322+
});
265323
});
266324

267325
it('should repeat a synchronous source (multicasted and refCounted) multiple times', (done) => {
268326
const expected = [1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3];
269327

270-
of(1, 2, 3).pipe(
271-
multicast(() => new Subject<number>()),
272-
refCount(),
273-
repeat(5)
274-
).subscribe(
275-
(x: number) => { expect(x).to.equal(expected.shift()); },
328+
of(1, 2, 3)
329+
.pipe(
330+
multicast(() => new Subject<number>()),
331+
refCount(),
332+
repeat(5)
333+
)
334+
.subscribe(
335+
(x: number) => {
336+
expect(x).to.equal(expected.shift());
337+
},
276338
(x) => {
277339
done(new Error('should not be called'));
278-
}, () => {
340+
},
341+
() => {
279342
expect(expected.length).to.equal(0);
280343
done();
281-
});
344+
}
345+
);
282346
});
283347

284348
it('should stop listening to a synchronous observable when unsubscribed', () => {
285349
const sideEffects: number[] = [];
286-
const synchronousObservable = new Observable<number>(subscriber => {
350+
const synchronousObservable = new Observable<number>((subscriber) => {
287351
// This will check to see if the subscriber was closed on each loop
288352
// when the unsubscribe hits (from the `take`), it should be closed
289353
for (let i = 0; !subscriber.closed && i < 10; i++) {
@@ -292,11 +356,98 @@ describe('repeat operator', () => {
292356
}
293357
});
294358

295-
synchronousObservable.pipe(
296-
repeat(),
297-
take(3),
298-
).subscribe(() => { /* noop */ });
359+
synchronousObservable.pipe(repeat(), take(3)).subscribe(() => {
360+
/* noop */
361+
});
299362

300363
expect(sideEffects).to.deep.equal([0, 1, 2]);
301364
});
365+
366+
it('should allow count configuration', () => {
367+
rxTest.run(({ cold, expectObservable, expectSubscriptions }) => {
368+
const e1 = cold(' --a--b--| ');
369+
const subs = [
370+
' ^-------! ', //
371+
' --------^-------! ',
372+
' ----------------^-------!',
373+
];
374+
const expected = '--a--b----a--b----a--b--|';
375+
376+
expectObservable(e1.pipe(repeat({ count: 3 }))).toBe(expected);
377+
expectSubscriptions(e1.subscriptions).toBe(subs);
378+
});
379+
});
380+
381+
it('should allow delay time configuration', () => {
382+
rxTest.run(({ cold, expectObservable, expectSubscriptions }) => {
383+
const e1 = cold(' --a--b--| ');
384+
const delay = 3; // ---| ---|
385+
const subs = [
386+
' ^-------! ', //
387+
' -----------^-------! ',
388+
' ----------------------^-------!',
389+
];
390+
const expected = '--a--b-------a--b-------a--b--|';
391+
392+
expectObservable(e1.pipe(repeat({ count: 3, delay }))).toBe(expected);
393+
expectSubscriptions(e1.subscriptions).toBe(subs);
394+
});
395+
});
396+
397+
it('should allow delay function configuration', () => {
398+
rxTest.run(({ cold, expectObservable, expectSubscriptions }) => {
399+
const expectedCounts = [1, 2, 3];
400+
401+
const e1 = cold(' --a--b--| ');
402+
const delay = 3; // ---| ---|
403+
const subs = [
404+
' ^-------! ', //
405+
' -----------^-------! ',
406+
' ----------------------^-------!',
407+
];
408+
const expected = '--a--b-------a--b-------a--b--|';
409+
410+
expectObservable(
411+
e1.pipe(
412+
repeat({
413+
count: 3,
414+
delay: (count) => {
415+
expect(count).to.equal(expectedCounts.shift());
416+
return timer(delay);
417+
},
418+
})
419+
)
420+
).toBe(expected);
421+
expectSubscriptions(e1.subscriptions).toBe(subs);
422+
});
423+
});
424+
425+
it('should handle delay function throwing', () => {
426+
rxTest.run(({ cold, expectObservable, expectSubscriptions }) => {
427+
const expectedCounts = [1, 2, 3];
428+
429+
const e1 = cold(' --a--b--| ');
430+
const delay = 3; // ---| ---|
431+
const subs = [
432+
' ^-------! ', //
433+
' -----------^-------! ',
434+
];
435+
const expected = '--a--b-------a--b--#';
436+
437+
expectObservable(
438+
e1.pipe(
439+
repeat({
440+
count: 3,
441+
delay: (count) => {
442+
if (count === 2) {
443+
throw 'bad';
444+
}
445+
return timer(delay);
446+
},
447+
})
448+
)
449+
).toBe(expected, undefined, 'bad');
450+
expectSubscriptions(e1.subscriptions).toBe(subs);
451+
});
452+
});
302453
});

‎src/internal/operators/repeat.ts

+97-22
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,53 @@
11
import { Subscription } from '../Subscription';
22
import { EMPTY } from '../observable/empty';
33
import { operate } from '../util/lift';
4-
import { MonoTypeOperatorFunction } from '../types';
4+
import { MonoTypeOperatorFunction, ObservableInput } from '../types';
55
import { OperatorSubscriber } from './OperatorSubscriber';
6+
import { innerFrom } from '../observable/innerFrom';
7+
import { timer } from '../observable/timer';
8+
9+
export interface RepeatConfig {
10+
/**
11+
* The number of times to repeat the source. Defaults to `Infinity`.
12+
*/
13+
count?: number;
14+
15+
/**
16+
* If a `number`, will delay the repeat of the source by that number of milliseconds.
17+
* If a function, it will provide the number of times the source has been subscribed to,
18+
* and the return value should be a valid observable input that will notify when the source
19+
* should be repeated. If the notifier observable is empty, the result will complete.
20+
*/
21+
delay?: number | ((count: number) => ObservableInput<any>);
22+
}
623

724
/**
8-
* Returns an Observable that will resubscribe to the source stream when the source stream completes, at most count times.
25+
* Returns an Observable that will resubscribe to the source stream when the source stream completes.
926
*
1027
* <span class="informal">Repeats all values emitted on the source. It's like {@link retry}, but for non error cases.</span>
1128
*
1229
* ![](repeat.png)
1330
*
14-
* Similar to {@link retry}, this operator repeats the stream of items emitted by the source for non error cases.
15-
* Repeat can be useful for creating observables that are meant to have some repeated pattern or rhythm.
31+
* Repeat will output values from a source until the source completes, then it will resubscribe to the
32+
* source a specified number of times, with a specified delay. Repeat can be particularly useful in
33+
* combination with closing operators like {@link take}, {@link takeUntil}, {@link first}, or {@link takeWhile},
34+
* as it can be used to restart a source again from scratch.
35+
*
36+
* Repeat is very similar to {@link retry}, where {@link retry} will resubscribe to the source in the error case, but
37+
* `repeat` will resubscribe if the source completes.
38+
*
39+
* Note that `repeat` will _not_ catch errors. Use {@link retry} for that.
1640
*
17-
* Note: `repeat(0)` returns an empty observable and `repeat()` will repeat forever
41+
* - `repeat(0)` returns an empty observable
42+
* - `repeat()` will repeat forever
43+
* - `repeat({ delay: 200 })` will repeat forever, with a delay of 200ms between repetitions.
44+
* - `repeat({ count: 2, delay: 400 })` will repeat twice, with a delay of 400ms between repetitions.
45+
* - `repeat({ delay: (count) => timer(count * 1000) })` will repeat forever, but will have a delay that grows by one second for each repetition.
1846
*
1947
* ## Example
2048
* Repeat a message stream
2149
* ```ts
22-
* import { of } from 'rxjs';
23-
* import { repeat } from 'rxjs/operators';
50+
* import { of, repeat } from 'rxjs';
2451
*
2552
* const source = of('Repeat message');
2653
* const example = source.pipe(repeat(3));
@@ -50,29 +77,78 @@ import { OperatorSubscriber } from './OperatorSubscriber';
5077
* // 2
5178
* ```
5279
*
80+
* Defining two complex repeats with delays on the same source.
81+
* Note that the second repeat cannot be called until the first
82+
* repeat as exhausted it's count.
83+
*
84+
* ```ts
85+
* import { defer, of, repeat } from 'rxjs';
86+
*
87+
* const source = defer(() => {
88+
* return of(`Hello, it is ${new Date()}`)
89+
* });
90+
*
91+
* source.pipe(
92+
* // Repeat 3 times with a delay of 1 second between repetitions
93+
* repeat({
94+
* count: 3,
95+
* delay: 1000,
96+
* }),
97+
*
98+
* // *Then* repeat forever, but with an exponential step-back
99+
* // maxing out at 1 minute.
100+
* repeat({
101+
* delay: (count) => timer(Math.min(60000, 2 ^ count * 1000))
102+
* })
103+
* )
104+
* ```
105+
*
53106
* @see {@link repeatWhen}
54107
* @see {@link retry}
55108
*
56-
* @param {number} [count] The number of times the source Observable items are repeated, a count of 0 will yield
109+
* @param count The number of times the source Observable items are repeated, a count of 0 will yield
57110
* an empty Observable.
58-
* @return A function that returns an Observable that will resubscribe to the
59-
* source stream when the source stream completes, at most `count` times.
60111
*/
61-
export function repeat<T>(count = Infinity): MonoTypeOperatorFunction<T> {
112+
export function repeat<T>(countOrConfig?: number | RepeatConfig): MonoTypeOperatorFunction<T> {
113+
let count = Infinity;
114+
let delay: RepeatConfig['delay'];
115+
116+
if (countOrConfig != null) {
117+
if (typeof countOrConfig === 'object') {
118+
({ count = Infinity, delay } = countOrConfig);
119+
} else {
120+
count = countOrConfig;
121+
}
122+
}
123+
62124
return count <= 0
63125
? () => EMPTY
64126
: operate((source, subscriber) => {
65127
let soFar = 0;
66-
let innerSub: Subscription | null;
67-
const subscribeForRepeat = () => {
128+
let sourceSub: Subscription | null;
129+
130+
const resubscribe = () => {
131+
sourceSub?.unsubscribe();
132+
sourceSub = null;
133+
if (delay != null) {
134+
const notifier = typeof delay === 'number' ? timer(delay) : innerFrom(delay(soFar));
135+
const notifierSubscriber = new OperatorSubscriber(subscriber, () => {
136+
notifierSubscriber.unsubscribe();
137+
subscribeToSource();
138+
});
139+
notifier.subscribe(notifierSubscriber);
140+
} else {
141+
subscribeToSource();
142+
}
143+
};
144+
145+
const subscribeToSource = () => {
68146
let syncUnsub = false;
69-
innerSub = source.subscribe(
147+
sourceSub = source.subscribe(
70148
new OperatorSubscriber(subscriber, undefined, () => {
71149
if (++soFar < count) {
72-
if (innerSub) {
73-
innerSub.unsubscribe();
74-
innerSub = null;
75-
subscribeForRepeat();
150+
if (sourceSub) {
151+
resubscribe();
76152
} else {
77153
syncUnsub = true;
78154
}
@@ -83,11 +159,10 @@ export function repeat<T>(count = Infinity): MonoTypeOperatorFunction<T> {
83159
);
84160

85161
if (syncUnsub) {
86-
innerSub.unsubscribe();
87-
innerSub = null;
88-
subscribeForRepeat();
162+
resubscribe();
89163
}
90164
};
91-
subscribeForRepeat();
165+
166+
subscribeToSource();
92167
});
93168
}

0 commit comments

Comments
 (0)
Please sign in to comment.