Skip to content

Commit

Permalink
feat(retry): Now supports configurable delay as a named argument (#6421)
Browse files Browse the repository at this point in the history
* feat(retry): Add configurable delay

- Adds a `delay` configuration that allows the user to create simpler exponential backoff, simple retry delays, and/or other functionality.

* chore: Address comments

* chore: revert breaking change and use resetOnSuccess
  • Loading branch information
benlesh committed Jul 28, 2021
1 parent 69f5bfa commit 5f69795
Show file tree
Hide file tree
Showing 2 changed files with 313 additions and 11 deletions.
246 changes: 245 additions & 1 deletion spec/operators/retry-spec.ts
@@ -1,7 +1,7 @@
/** @prettier */
import { expect } from 'chai';
import { retry, map, take, mergeMap, concat, multicast, refCount } from 'rxjs/operators';
import { Observable, Observer, defer, range, of, throwError, Subject } from 'rxjs';
import { Observable, Observer, defer, range, of, throwError, Subject, timer, EMPTY } from 'rxjs';
import { TestScheduler } from 'rxjs/testing';
import { observableMatcher } from '../helpers/observableMatcher';

Expand Down Expand Up @@ -411,4 +411,248 @@ describe('retry', () => {
expectSubscriptions(source.subscriptions).toBe(subs);
});
});

describe('with delay config', () => {
describe('of a number', () => {
it('should delay the retry by a specified amount of time', () => {
rxTest.run(({ cold, time, expectSubscriptions, expectObservable }) => {
const source = cold('---a---b---#');
const t = time(' ----|');
const subs = [
//
' ^----------!',
' ---------------^----------!',
' ------------------------------^----------!',
' ---------------------------------------------^----!',
];
const unsub = ' ^-------------------------------------------------!';
const expected = ' ---a---b----------a---b----------a---b----------a--';
const result = source.pipe(
retry({
delay: t,
})
);
expectObservable(result, unsub).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});
});

it('should act like a normal retry if delay is set to 0', () => {
rxTest.run(({ cold, expectSubscriptions, expectObservable }) => {
const source = cold('---a---b---#');
const subs = [
//
' ^----------!',
' -----------^----------!',
' ----------------------^----------!',
' ---------------------------------^----!',
];
const unsub = ' ^-------------------------------------!';
const expected = ' ---a---b------a---b------a---b------a--';
const result = source.pipe(
retry({
delay: 0,
})
);
expectObservable(result, unsub).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});
});

it('should act like a normal retry if delay is less than 0', () => {
rxTest.run(({ cold, expectSubscriptions, expectObservable }) => {
const source = cold('---a---b---#');
const subs = [
//
' ^----------!',
' -----------^----------!',
' ----------------------^----------!',
' ---------------------------------^----!',
];
const unsub = ' ^-------------------------------------!';
const expected = ' ---a---b------a---b------a---b------a--';
const result = source.pipe(
retry({
delay: -100,
})
);
expectObservable(result, unsub).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});
});

it('should honor count as the max retries', () => {
rxTest.run(({ cold, time, expectSubscriptions, expectObservable }) => {
const source = cold('---a---b---#');
const t = time(' ----|');
const subs = [
//
' ^----------!',
' ---------------^----------!',
' ------------------------------^----------!',
];
const expected = ' ---a---b----------a---b----------a---b---#';
const result = source.pipe(
retry({
count: 2,
delay: t,
})
);
expectObservable(result).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});
});
});

describe('of a function', () => {
it('should delay the retry with a function that returns a notifier', () => {
rxTest.run(({ cold, expectSubscriptions, expectObservable }) => {
const source = cold('---a---b---#');
const subs = [
//
' ^----------!',
' ------------^----------!',
' -------------------------^----------!',
' ---------------------------------------^----!',
];
const unsub = ' ^-------------------------------------------!';
const expected = ' ---a---b-------a---b--------a---b---------a--';
const result = source.pipe(
retry({
delay: (_err, retryCount) => {
// retryCount will be 1, 2, 3, etc.
return timer(retryCount);
},
})
);
expectObservable(result, unsub).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});
});

it('should delay the retry with a function that returns a hot observable', () => {
rxTest.run(({ cold, hot, expectSubscriptions, expectObservable }) => {
const source = cold(' ---a---b---#');
const notifier = hot('--------------x----------------x----------------x------');
const subs = [
//
' ^----------!',
' --------------^----------!',
' -------------------------------^----------!',
];
const notifierSubs = [
//
' -----------^--!',
' -------------------------^-----!',
' ------------------------------------------^-!',
];
const unsub = ' ^-------------------------------------------!';
const expected = ' ---a---b---------a---b------------a---b------';
const result = source.pipe(
retry({
delay: () => notifier,
})
);
expectObservable(result, unsub).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
expectSubscriptions(notifier.subscriptions).toBe(notifierSubs);
});
});

it('should complete if the notifier completes', () => {
rxTest.run(({ cold, expectSubscriptions, expectObservable }) => {
const source = cold('---a---b---#');
const subs = [
//
' ^----------!',
' ------------^----------!',
' -------------------------^----------!',
' ------------------------------------!',
];
const expected = ' ---a---b-------a---b--------a---b---|';
const result = source.pipe(
retry({
delay: (_err, retryCount) => {
return retryCount <= 2 ? timer(retryCount) : EMPTY;
},
})
);
expectObservable(result).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});
});

it('should error if the notifier errors', () => {
rxTest.run(({ cold, expectSubscriptions, expectObservable }) => {
const source = cold('---a---b---#');
const subs = [
//
' ^----------!',
' ------------^----------!',
' -------------------------^----------!',
' ------------------------------------!',
];
const expected = ' ---a---b-------a---b--------a---b---#';
const result = source.pipe(
retry({
delay: (_err, retryCount) => {
return retryCount <= 2 ? timer(retryCount) : throwError(() => new Error('blah'));
},
})
);
expectObservable(result).toBe(expected, undefined, new Error('blah'));
expectSubscriptions(source.subscriptions).toBe(subs);
});
});

it('should error if the delay function throws', () => {
rxTest.run(({ cold, expectSubscriptions, expectObservable }) => {
const source = cold('---a---b---#');
const subs = [
//
' ^----------!',
' ------------^----------!',
' -------------------------^----------!',
' ------------------------------------!',
];
const expected = ' ---a---b-------a---b--------a---b---#';
const result = source.pipe(
retry({
delay: (_err, retryCount) => {
if (retryCount <= 2) {
return timer(retryCount);
} else {
throw new Error('blah');
}
},
})
);
expectObservable(result).toBe(expected, undefined, new Error('blah'));
expectSubscriptions(source.subscriptions).toBe(subs);
});
});

it('should be usable for exponential backoff', () => {
rxTest.run(({ cold, expectObservable, expectSubscriptions }) => {
const source = cold('---a---#');
const subs = [
//
' ^------!',
' ---------^------!',
' --------------------^------!',
' -----------------------------------^------!',
];
const expected = ' ---a--------a----------a--------------a---#';
const result = source.pipe(
retry({
count: 3,
delay: (_err, retryCount) => timer(2 ** retryCount),
})
);
expectObservable(result).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});
});
});
});
});
78 changes: 68 additions & 10 deletions src/internal/operators/retry.ts
@@ -1,11 +1,28 @@
import { MonoTypeOperatorFunction } from '../types';
import { MonoTypeOperatorFunction, ObservableInput } from '../types';
import { operate } from '../util/lift';
import { Subscription } from '../Subscription';
import { OperatorSubscriber } from './OperatorSubscriber';
import { identity } from '../util/identity';
import { timer } from '../observable/timer';
import { innerFrom } from '../observable/from';

export interface RetryConfig {
count: number;
/**
* The maximum number of times to retry.
*/
count?: number;
/**
* The number of milliseconds to delay before retrying, OR a function to
* return a notifier for delaying. If a function is returned, that function should
* return a notifier that, when it emits will retry the source. If the notifier
* completes _without_ emitting, the resulting observable will complete without error,
* if the notifier errors, the error will be pushed to the result.
*/
delay?: number | ((error: any, retryCount: number) => ObservableInput<any>);
/**
* Whether or not to reset the retry counter when the retried subscription
* emits its first value.
*/
resetOnSuccess?: boolean;
}

Expand Down Expand Up @@ -50,13 +67,22 @@ export interface RetryConfig {
* // "Error!: Retried 2 times then quit!"
* ```
*
* @param {number} count - Number of retry attempts before failing.
* @param {boolean} resetOnSuccess - When set to `true` every successful emission will reset the error count
* @param count - Number of retry attempts before failing.
* @param resetOnSuccess - When set to `true` every successful emission will reset the error count
* @return A function that returns an Observable that will resubscribe to the
* source stream when the source stream errors, at most `count` times.
*/
export function retry<T>(count?: number): MonoTypeOperatorFunction<T>;

/**
* Returns an observable that mirrors the source observable unless it errors. If it errors, the source observable
* will be resubscribed to (or "retried") based on the configuration passed here. See documentation
* for {@link RetryConfig} for more details.
*
* @param config - The retry configuration
*/
export function retry<T>(config: RetryConfig): MonoTypeOperatorFunction<T>;

export function retry<T>(configOrCount: number | RetryConfig = Infinity): MonoTypeOperatorFunction<T> {
let config: RetryConfig;
if (configOrCount && typeof configOrCount === 'object') {
Expand All @@ -66,7 +92,7 @@ export function retry<T>(configOrCount: number | RetryConfig = Infinity): MonoTy
count: configOrCount,
};
}
const { count, resetOnSuccess = false } = config;
const { count = Infinity, delay, resetOnSuccess: resetOnSuccess = false } = config;

return count <= 0
? identity
Expand All @@ -79,6 +105,7 @@ export function retry<T>(configOrCount: number | RetryConfig = Infinity): MonoTy
new OperatorSubscriber(
subscriber,
(value) => {
// If we're resetting on success
if (resetOnSuccess) {
soFar = 0;
}
Expand All @@ -88,14 +115,45 @@ export function retry<T>(configOrCount: number | RetryConfig = Infinity): MonoTy
undefined,
(err) => {
if (soFar++ < count) {
if (innerSub) {
innerSub.unsubscribe();
innerSub = null;
subscribeForRetry();
// We are still under our retry count
const resub = () => {
if (innerSub) {
innerSub.unsubscribe();
innerSub = null;
subscribeForRetry();
} else {
syncUnsub = true;
}
};

if (delay != null) {
// The user specified a retry delay.
// They gave us a number, use a timer, otherwise, it's a function,
// and we're going to call it to get a notifier.
const notifier = typeof delay === 'number' ? timer(delay) : innerFrom(delay(err, soFar));
const notifierSubscriber = new OperatorSubscriber(
subscriber,
() => {
// After we get the first notification, we
// unsubscribe from the notifer, because we don't want anymore
// and we resubscribe to the source.
notifierSubscriber.unsubscribe();
resub();
},
() => {
// The notifier completed without emitting.
// The author is telling us they want to complete.
subscriber.complete();
}
);
notifier.subscribe(notifierSubscriber);
} else {
syncUnsub = true;
// There was no notifier given. Just resub immediately.
resub();
}
} else {
// We're past our maximum number of retries.
// Just send along the error.
subscriber.error(err);
}
}
Expand Down

0 comments on commit 5f69795

Please sign in to comment.