diff --git a/spec/operators/retry-spec.ts b/spec/operators/retry-spec.ts index 3d79fce1e7..f42a6a301a 100644 --- a/spec/operators/retry-spec.ts +++ b/spec/operators/retry-spec.ts @@ -1,7 +1,7 @@ /** @prettier */ import { expect } from 'chai'; import { retry, map, take, mergeMap, concat, multicast, refCount } from 'rxjs/operators'; -import { Observable, Observer, defer, range, of, throwError, Subject } from 'rxjs'; +import { Observable, Observer, defer, range, of, throwError, Subject, timer, EMPTY } from 'rxjs'; import { TestScheduler } from 'rxjs/testing'; import { observableMatcher } from '../helpers/observableMatcher'; @@ -411,4 +411,248 @@ describe('retry', () => { expectSubscriptions(source.subscriptions).toBe(subs); }); }); + + describe('with delay config', () => { + describe('of a number', () => { + it('should delay the retry by a specified amount of time', () => { + rxTest.run(({ cold, time, expectSubscriptions, expectObservable }) => { + const source = cold('---a---b---#'); + const t = time(' ----|'); + const subs = [ + // + ' ^----------!', + ' ---------------^----------!', + ' ------------------------------^----------!', + ' ---------------------------------------------^----!', + ]; + const unsub = ' ^-------------------------------------------------!'; + const expected = ' ---a---b----------a---b----------a---b----------a--'; + const result = source.pipe( + retry({ + delay: t, + }) + ); + expectObservable(result, unsub).toBe(expected); + expectSubscriptions(source.subscriptions).toBe(subs); + }); + }); + + it('should act like a normal retry if delay is set to 0', () => { + rxTest.run(({ cold, expectSubscriptions, expectObservable }) => { + const source = cold('---a---b---#'); + const subs = [ + // + ' ^----------!', + ' -----------^----------!', + ' ----------------------^----------!', + ' ---------------------------------^----!', + ]; + const unsub = ' ^-------------------------------------!'; + const expected = ' ---a---b------a---b------a---b------a--'; + const result = source.pipe( + retry({ + delay: 0, + }) + ); + expectObservable(result, unsub).toBe(expected); + expectSubscriptions(source.subscriptions).toBe(subs); + }); + }); + + it('should act like a normal retry if delay is less than 0', () => { + rxTest.run(({ cold, expectSubscriptions, expectObservable }) => { + const source = cold('---a---b---#'); + const subs = [ + // + ' ^----------!', + ' -----------^----------!', + ' ----------------------^----------!', + ' ---------------------------------^----!', + ]; + const unsub = ' ^-------------------------------------!'; + const expected = ' ---a---b------a---b------a---b------a--'; + const result = source.pipe( + retry({ + delay: -100, + }) + ); + expectObservable(result, unsub).toBe(expected); + expectSubscriptions(source.subscriptions).toBe(subs); + }); + }); + + it('should honor count as the max retries', () => { + rxTest.run(({ cold, time, expectSubscriptions, expectObservable }) => { + const source = cold('---a---b---#'); + const t = time(' ----|'); + const subs = [ + // + ' ^----------!', + ' ---------------^----------!', + ' ------------------------------^----------!', + ]; + const expected = ' ---a---b----------a---b----------a---b---#'; + const result = source.pipe( + retry({ + count: 2, + delay: t, + }) + ); + expectObservable(result).toBe(expected); + expectSubscriptions(source.subscriptions).toBe(subs); + }); + }); + }); + + describe('of a function', () => { + it('should delay the retry with a function that returns a notifier', () => { + rxTest.run(({ cold, expectSubscriptions, expectObservable }) => { + const source = cold('---a---b---#'); + const subs = [ + // + ' ^----------!', + ' ------------^----------!', + ' -------------------------^----------!', + ' ---------------------------------------^----!', + ]; + const unsub = ' ^-------------------------------------------!'; + const expected = ' ---a---b-------a---b--------a---b---------a--'; + const result = source.pipe( + retry({ + delay: (_err, retryCount) => { + // retryCount will be 1, 2, 3, etc. + return timer(retryCount); + }, + }) + ); + expectObservable(result, unsub).toBe(expected); + expectSubscriptions(source.subscriptions).toBe(subs); + }); + }); + + it('should delay the retry with a function that returns a hot observable', () => { + rxTest.run(({ cold, hot, expectSubscriptions, expectObservable }) => { + const source = cold(' ---a---b---#'); + const notifier = hot('--------------x----------------x----------------x------'); + const subs = [ + // + ' ^----------!', + ' --------------^----------!', + ' -------------------------------^----------!', + ]; + const notifierSubs = [ + // + ' -----------^--!', + ' -------------------------^-----!', + ' ------------------------------------------^-!', + ]; + const unsub = ' ^-------------------------------------------!'; + const expected = ' ---a---b---------a---b------------a---b------'; + const result = source.pipe( + retry({ + delay: () => notifier, + }) + ); + expectObservable(result, unsub).toBe(expected); + expectSubscriptions(source.subscriptions).toBe(subs); + expectSubscriptions(notifier.subscriptions).toBe(notifierSubs); + }); + }); + + it('should complete if the notifier completes', () => { + rxTest.run(({ cold, expectSubscriptions, expectObservable }) => { + const source = cold('---a---b---#'); + const subs = [ + // + ' ^----------!', + ' ------------^----------!', + ' -------------------------^----------!', + ' ------------------------------------!', + ]; + const expected = ' ---a---b-------a---b--------a---b---|'; + const result = source.pipe( + retry({ + delay: (_err, retryCount) => { + return retryCount <= 2 ? timer(retryCount) : EMPTY; + }, + }) + ); + expectObservable(result).toBe(expected); + expectSubscriptions(source.subscriptions).toBe(subs); + }); + }); + + it('should error if the notifier errors', () => { + rxTest.run(({ cold, expectSubscriptions, expectObservable }) => { + const source = cold('---a---b---#'); + const subs = [ + // + ' ^----------!', + ' ------------^----------!', + ' -------------------------^----------!', + ' ------------------------------------!', + ]; + const expected = ' ---a---b-------a---b--------a---b---#'; + const result = source.pipe( + retry({ + delay: (_err, retryCount) => { + return retryCount <= 2 ? timer(retryCount) : throwError(() => new Error('blah')); + }, + }) + ); + expectObservable(result).toBe(expected, undefined, new Error('blah')); + expectSubscriptions(source.subscriptions).toBe(subs); + }); + }); + + it('should error if the delay function throws', () => { + rxTest.run(({ cold, expectSubscriptions, expectObservable }) => { + const source = cold('---a---b---#'); + const subs = [ + // + ' ^----------!', + ' ------------^----------!', + ' -------------------------^----------!', + ' ------------------------------------!', + ]; + const expected = ' ---a---b-------a---b--------a---b---#'; + const result = source.pipe( + retry({ + delay: (_err, retryCount) => { + if (retryCount <= 2) { + return timer(retryCount); + } else { + throw new Error('blah'); + } + }, + }) + ); + expectObservable(result).toBe(expected, undefined, new Error('blah')); + expectSubscriptions(source.subscriptions).toBe(subs); + }); + }); + + it('should be usable for exponential backoff', () => { + rxTest.run(({ cold, expectObservable, expectSubscriptions }) => { + const source = cold('---a---#'); + const subs = [ + // + ' ^------!', + ' ---------^------!', + ' --------------------^------!', + ' -----------------------------------^------!', + ]; + const expected = ' ---a--------a----------a--------------a---#'; + const result = source.pipe( + retry({ + count: 3, + delay: (_err, retryCount) => timer(2 ** retryCount), + }) + ); + expectObservable(result).toBe(expected); + expectSubscriptions(source.subscriptions).toBe(subs); + }); + }); + }); + }); }); diff --git a/src/internal/operators/retry.ts b/src/internal/operators/retry.ts index 9d41a4d210..cdbe08bbec 100644 --- a/src/internal/operators/retry.ts +++ b/src/internal/operators/retry.ts @@ -1,11 +1,28 @@ -import { MonoTypeOperatorFunction } from '../types'; +import { MonoTypeOperatorFunction, ObservableInput } from '../types'; import { operate } from '../util/lift'; import { Subscription } from '../Subscription'; import { OperatorSubscriber } from './OperatorSubscriber'; import { identity } from '../util/identity'; +import { timer } from '../observable/timer'; +import { innerFrom } from '../observable/from'; export interface RetryConfig { - count: number; + /** + * The maximum number of times to retry. + */ + count?: number; + /** + * The number of milliseconds to delay before retrying, OR a function to + * return a notifier for delaying. If a function is returned, that function should + * return a notifier that, when it emits will retry the source. If the notifier + * completes _without_ emitting, the resulting observable will complete without error, + * if the notifier errors, the error will be pushed to the result. + */ + delay?: number | ((error: any, retryCount: number) => ObservableInput); + /** + * Whether or not to reset the retry counter when the retried subscription + * emits its first value. + */ resetOnSuccess?: boolean; } @@ -50,13 +67,22 @@ export interface RetryConfig { * // "Error!: Retried 2 times then quit!" * ``` * - * @param {number} count - Number of retry attempts before failing. - * @param {boolean} resetOnSuccess - When set to `true` every successful emission will reset the error count + * @param count - Number of retry attempts before failing. + * @param resetOnSuccess - When set to `true` every successful emission will reset the error count * @return A function that returns an Observable that will resubscribe to the * source stream when the source stream errors, at most `count` times. */ export function retry(count?: number): MonoTypeOperatorFunction; + +/** + * Returns an observable that mirrors the source observable unless it errors. If it errors, the source observable + * will be resubscribed to (or "retried") based on the configuration passed here. See documentation + * for {@link RetryConfig} for more details. + * + * @param config - The retry configuration + */ export function retry(config: RetryConfig): MonoTypeOperatorFunction; + export function retry(configOrCount: number | RetryConfig = Infinity): MonoTypeOperatorFunction { let config: RetryConfig; if (configOrCount && typeof configOrCount === 'object') { @@ -66,7 +92,7 @@ export function retry(configOrCount: number | RetryConfig = Infinity): MonoTy count: configOrCount, }; } - const { count, resetOnSuccess = false } = config; + const { count = Infinity, delay, resetOnSuccess: resetOnSuccess = false } = config; return count <= 0 ? identity @@ -79,6 +105,7 @@ export function retry(configOrCount: number | RetryConfig = Infinity): MonoTy new OperatorSubscriber( subscriber, (value) => { + // If we're resetting on success if (resetOnSuccess) { soFar = 0; } @@ -88,14 +115,45 @@ export function retry(configOrCount: number | RetryConfig = Infinity): MonoTy undefined, (err) => { if (soFar++ < count) { - if (innerSub) { - innerSub.unsubscribe(); - innerSub = null; - subscribeForRetry(); + // We are still under our retry count + const resub = () => { + if (innerSub) { + innerSub.unsubscribe(); + innerSub = null; + subscribeForRetry(); + } else { + syncUnsub = true; + } + }; + + if (delay != null) { + // The user specified a retry delay. + // They gave us a number, use a timer, otherwise, it's a function, + // and we're going to call it to get a notifier. + const notifier = typeof delay === 'number' ? timer(delay) : innerFrom(delay(err, soFar)); + const notifierSubscriber = new OperatorSubscriber( + subscriber, + () => { + // After we get the first notification, we + // unsubscribe from the notifer, because we don't want anymore + // and we resubscribe to the source. + notifierSubscriber.unsubscribe(); + resub(); + }, + () => { + // The notifier completed without emitting. + // The author is telling us they want to complete. + subscriber.complete(); + } + ); + notifier.subscribe(notifierSubscriber); } else { - syncUnsub = true; + // There was no notifier given. Just resub immediately. + resub(); } } else { + // We're past our maximum number of retries. + // Just send along the error. subscriber.error(err); } }