Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(retry): Add configurable delay #6421

Merged
merged 3 commits into from Jul 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
246 changes: 245 additions & 1 deletion spec/operators/retry-spec.ts
@@ -1,7 +1,7 @@
/** @prettier */
import { expect } from 'chai';
import { retry, map, take, mergeMap, concat, multicast, refCount } from 'rxjs/operators';
import { Observable, Observer, defer, range, of, throwError, Subject } from 'rxjs';
import { Observable, Observer, defer, range, of, throwError, Subject, timer, EMPTY } from 'rxjs';
import { TestScheduler } from 'rxjs/testing';
import { observableMatcher } from '../helpers/observableMatcher';

Expand Down Expand Up @@ -411,4 +411,248 @@ describe('retry', () => {
expectSubscriptions(source.subscriptions).toBe(subs);
});
});

describe('with delay config', () => {
describe('of a number', () => {
it('should delay the retry by a specified amount of time', () => {
rxTest.run(({ cold, time, expectSubscriptions, expectObservable }) => {
const source = cold('---a---b---#');
const t = time(' ----|');
const subs = [
//
' ^----------!',
' ---------------^----------!',
' ------------------------------^----------!',
' ---------------------------------------------^----!',
];
const unsub = ' ^-------------------------------------------------!';
const expected = ' ---a---b----------a---b----------a---b----------a--';
const result = source.pipe(
retry({
delay: t,
})
);
expectObservable(result, unsub).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});
});

it('should act like a normal retry if delay is set to 0', () => {
rxTest.run(({ cold, expectSubscriptions, expectObservable }) => {
const source = cold('---a---b---#');
const subs = [
//
' ^----------!',
' -----------^----------!',
' ----------------------^----------!',
' ---------------------------------^----!',
];
const unsub = ' ^-------------------------------------!';
const expected = ' ---a---b------a---b------a---b------a--';
const result = source.pipe(
retry({
delay: 0,
})
);
expectObservable(result, unsub).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});
});

it('should act like a normal retry if delay is less than 0', () => {
rxTest.run(({ cold, expectSubscriptions, expectObservable }) => {
const source = cold('---a---b---#');
const subs = [
//
' ^----------!',
' -----------^----------!',
' ----------------------^----------!',
' ---------------------------------^----!',
];
const unsub = ' ^-------------------------------------!';
const expected = ' ---a---b------a---b------a---b------a--';
const result = source.pipe(
retry({
delay: -100,
})
);
expectObservable(result, unsub).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});
});

it('should honor count as the max retries', () => {
rxTest.run(({ cold, time, expectSubscriptions, expectObservable }) => {
const source = cold('---a---b---#');
const t = time(' ----|');
const subs = [
//
' ^----------!',
' ---------------^----------!',
' ------------------------------^----------!',
];
const expected = ' ---a---b----------a---b----------a---b---#';
const result = source.pipe(
retry({
count: 2,
delay: t,
})
);
expectObservable(result).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});
});
});

describe('of a function', () => {
it('should delay the retry with a function that returns a notifier', () => {
rxTest.run(({ cold, expectSubscriptions, expectObservable }) => {
const source = cold('---a---b---#');
const subs = [
//
' ^----------!',
' ------------^----------!',
' -------------------------^----------!',
' ---------------------------------------^----!',
];
const unsub = ' ^-------------------------------------------!';
const expected = ' ---a---b-------a---b--------a---b---------a--';
const result = source.pipe(
retry({
delay: (_err, retryCount) => {
// retryCount will be 1, 2, 3, etc.
return timer(retryCount);
},
})
);
expectObservable(result, unsub).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});
});

it('should delay the retry with a function that returns a hot observable', () => {
rxTest.run(({ cold, hot, expectSubscriptions, expectObservable }) => {
const source = cold(' ---a---b---#');
const notifier = hot('--------------x----------------x----------------x------');
const subs = [
//
' ^----------!',
' --------------^----------!',
' -------------------------------^----------!',
];
const notifierSubs = [
//
' -----------^--!',
' -------------------------^-----!',
' ------------------------------------------^-!',
];
const unsub = ' ^-------------------------------------------!';
const expected = ' ---a---b---------a---b------------a---b------';
const result = source.pipe(
retry({
delay: () => notifier,
})
);
expectObservable(result, unsub).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
expectSubscriptions(notifier.subscriptions).toBe(notifierSubs);
});
});

it('should complete if the notifier completes', () => {
rxTest.run(({ cold, expectSubscriptions, expectObservable }) => {
const source = cold('---a---b---#');
const subs = [
//
' ^----------!',
' ------------^----------!',
' -------------------------^----------!',
' ------------------------------------!',
];
const expected = ' ---a---b-------a---b--------a---b---|';
const result = source.pipe(
retry({
delay: (_err, retryCount) => {
return retryCount <= 2 ? timer(retryCount) : EMPTY;
},
})
);
expectObservable(result).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});
});

it('should error if the notifier errors', () => {
rxTest.run(({ cold, expectSubscriptions, expectObservable }) => {
const source = cold('---a---b---#');
const subs = [
//
' ^----------!',
' ------------^----------!',
' -------------------------^----------!',
' ------------------------------------!',
];
const expected = ' ---a---b-------a---b--------a---b---#';
const result = source.pipe(
retry({
delay: (_err, retryCount) => {
return retryCount <= 2 ? timer(retryCount) : throwError(() => new Error('blah'));
},
})
);
expectObservable(result).toBe(expected, undefined, new Error('blah'));
expectSubscriptions(source.subscriptions).toBe(subs);
});
});

it('should error if the delay function throws', () => {
rxTest.run(({ cold, expectSubscriptions, expectObservable }) => {
const source = cold('---a---b---#');
const subs = [
//
' ^----------!',
' ------------^----------!',
' -------------------------^----------!',
' ------------------------------------!',
];
const expected = ' ---a---b-------a---b--------a---b---#';
const result = source.pipe(
retry({
delay: (_err, retryCount) => {
if (retryCount <= 2) {
return timer(retryCount);
} else {
throw new Error('blah');
}
},
})
);
expectObservable(result).toBe(expected, undefined, new Error('blah'));
expectSubscriptions(source.subscriptions).toBe(subs);
});
});

it('should be usable for exponential backoff', () => {
rxTest.run(({ cold, expectObservable, expectSubscriptions }) => {
const source = cold('---a---#');
const subs = [
//
' ^------!',
' ---------^------!',
' --------------------^------!',
' -----------------------------------^------!',
];
const expected = ' ---a--------a----------a--------------a---#';
const result = source.pipe(
retry({
count: 3,
delay: (_err, retryCount) => timer(2 ** retryCount),
})
);
expectObservable(result).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});
});
});
});
});
78 changes: 68 additions & 10 deletions src/internal/operators/retry.ts
@@ -1,11 +1,28 @@
import { MonoTypeOperatorFunction } from '../types';
import { MonoTypeOperatorFunction, ObservableInput } from '../types';
import { operate } from '../util/lift';
import { Subscription } from '../Subscription';
import { OperatorSubscriber } from './OperatorSubscriber';
import { identity } from '../util/identity';
import { timer } from '../observable/timer';
import { innerFrom } from '../observable/from';

export interface RetryConfig {
count: number;
/**
* The maximum number of times to retry.
*/
count?: number;
/**
* The number of milliseconds to delay before retrying, OR a function to
* return a notifier for delaying. If a function is returned, that function should
* return a notifier that, when it emits will retry the source. If the notifier
* completes _without_ emitting, the resulting observable will complete without error,
* if the notifier errors, the error will be pushed to the result.
*/
delay?: number | ((error: any, retryCount: number) => ObservableInput<any>);
/**
* Whether or not to reset the retry counter when the retried subscription
* emits its first value.
*/
resetOnSuccess?: boolean;
}

Expand Down Expand Up @@ -50,13 +67,22 @@ export interface RetryConfig {
* // "Error!: Retried 2 times then quit!"
* ```
*
* @param {number} count - Number of retry attempts before failing.
* @param {boolean} resetOnSuccess - When set to `true` every successful emission will reset the error count
* @param count - Number of retry attempts before failing.
* @param resetOnSuccess - When set to `true` every successful emission will reset the error count
* @return A function that returns an Observable that will resubscribe to the
* source stream when the source stream errors, at most `count` times.
*/
export function retry<T>(count?: number): MonoTypeOperatorFunction<T>;

/**
* Returns an observable that mirrors the source observable unless it errors. If it errors, the source observable
* will be resubscribed to (or "retried") based on the configuration passed here. See documentation
* for {@link RetryConfig} for more details.
*
* @param config - The retry configuration
*/
export function retry<T>(config: RetryConfig): MonoTypeOperatorFunction<T>;

export function retry<T>(configOrCount: number | RetryConfig = Infinity): MonoTypeOperatorFunction<T> {
let config: RetryConfig;
if (configOrCount && typeof configOrCount === 'object') {
Expand All @@ -66,7 +92,7 @@ export function retry<T>(configOrCount: number | RetryConfig = Infinity): MonoTy
count: configOrCount,
};
}
const { count, resetOnSuccess = false } = config;
const { count = Infinity, delay, resetOnSuccess: resetOnSuccess = false } = config;

return count <= 0
? identity
Expand All @@ -79,6 +105,7 @@ export function retry<T>(configOrCount: number | RetryConfig = Infinity): MonoTy
new OperatorSubscriber(
subscriber,
(value) => {
// If we're resetting on success
if (resetOnSuccess) {
soFar = 0;
}
Expand All @@ -88,14 +115,45 @@ export function retry<T>(configOrCount: number | RetryConfig = Infinity): MonoTy
undefined,
(err) => {
if (soFar++ < count) {
if (innerSub) {
innerSub.unsubscribe();
innerSub = null;
subscribeForRetry();
// We are still under our retry count
const resub = () => {
if (innerSub) {
innerSub.unsubscribe();
innerSub = null;
subscribeForRetry();
} else {
syncUnsub = true;
}
};

if (delay != null) {
// The user specified a retry delay.
// They gave us a number, use a timer, otherwise, it's a function,
// and we're going to call it to get a notifier.
const notifier = typeof delay === 'number' ? timer(delay) : innerFrom(delay(err, soFar));
const notifierSubscriber = new OperatorSubscriber(
subscriber,
() => {
// After we get the first notification, we
// unsubscribe from the notifer, because we don't want anymore
// and we resubscribe to the source.
notifierSubscriber.unsubscribe();
resub();
},
() => {
// The notifier completed without emitting.
// The author is telling us they want to complete.
subscriber.complete();
}
);
notifier.subscribe(notifierSubscriber);
} else {
syncUnsub = true;
// There was no notifier given. Just resub immediately.
resub();
}
} else {
// We're past our maximum number of retries.
// Just send along the error.
subscriber.error(err);
}
}
Expand Down