Skip to content

Commit

Permalink
feat(delayWhen): delayWhen's delayDurationSelector should support…
Browse files Browse the repository at this point in the history
… `ObservableInput` (#7049)

* feat(delayWhen): `delayWhen`'s `delayDurationSelector` should support `ObservableInput`

* test(delayWhen): support Promises in delayWhen

* chore: speed-up the tests
  • Loading branch information
jakovljevic-mladen committed Dec 15, 2022
1 parent 61b877a commit dfd95db
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 19 deletions.
10 changes: 7 additions & 3 deletions spec-dtslint/operators/delayWhen-spec.ts
Expand Up @@ -10,20 +10,24 @@ it('should support an empty notifier', () => {
const o = of(1, 2, 3).pipe(delayWhen(() => NEVER)); // $ExpectType Observable<number>
});

it('should support a subscriptiondelayWhen parameter', () => {
it('should support a subscriptionDelay parameter', () => {
const o = of(1, 2, 3).pipe(delayWhen(() => of('a', 'b', 'c'), of(new Date()))); // $ExpectType Observable<number>
});

it('should enforce types', () => {
const o = of(1, 2, 3).pipe(delayWhen()); // $ExpectError
});

it('should enforce types of delayWhenDurationSelector', () => {
it('should enforce types of delayDurationSelector', () => {
const o = of(1, 2, 3).pipe(delayWhen(of('a', 'b', 'c'))); // $ExpectError
const p = of(1, 2, 3).pipe(delayWhen((value: string, index) => of('a', 'b', 'c'))); // $ExpectError
const q = of(1, 2, 3).pipe(delayWhen((value, index: string) => of('a', 'b', 'c'))); // $ExpectError
});

it('should enforce types of subscriptiondelayWhen', () => {
it('should enforce types of subscriptionDelay', () => {
const o = of(1, 2, 3).pipe(delayWhen(() => of('a', 'b', 'c'), 'a')); // $ExpectError
});

it('should support Promises', () => {
const o = of(1, 2, 3).pipe(delayWhen(() => Promise.resolve('a'))); // $ExpectType Observable<number>
});
4 changes: 2 additions & 2 deletions spec/operators/audit-spec.ts
Expand Up @@ -395,7 +395,7 @@ describe('audit operator', () => {
});

it('should audit by promise resolves', (done) => {
const e1 = interval(10).pipe(take(5));
const e1 = interval(1).pipe(take(5));
const expected = [0, 1, 2, 3, 4];

e1.pipe(audit(() => Promise.resolve(42))).subscribe({
Expand All @@ -413,7 +413,7 @@ describe('audit operator', () => {
});

it('should raise error when promise rejects', (done) => {
const e1 = interval(10).pipe(take(10));
const e1 = interval(1).pipe(take(10));
const expected = [0, 1, 2];
const error = new Error('error');

Expand Down
40 changes: 39 additions & 1 deletion spec/operators/delayWhen-spec.ts
@@ -1,4 +1,4 @@
import { of, EMPTY } from 'rxjs';
import { of, EMPTY, interval, take } from 'rxjs';
import { delayWhen, tap } from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';
import { observableMatcher } from '../helpers/observableMatcher';
Expand Down Expand Up @@ -338,4 +338,42 @@ describe('delayWhen', () => {
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});
});

it('should delayWhen Promise resolves', (done) => {
const e1 = interval(1).pipe(take(5));
const expected = [0, 1, 2, 3, 4];

e1.pipe(delayWhen(() => Promise.resolve(42))).subscribe({
next: (x: number) => {
expect(x).to.equal(expected.shift());
},
error: () => {
done(new Error('should not be called'));
},
complete: () => {
expect(expected.length).to.equal(0);
done();
},
});
});

it('should raise error when Promise rejects', (done) => {
const e1 = interval(1).pipe(take(10));
const expected = [0, 1, 2];
const error = new Error('err');

e1.pipe(delayWhen((x) => (x === 3 ? Promise.reject(error) : Promise.resolve(42)))).subscribe({
next: (x: number) => {
expect(x).to.equal(expected.shift());
},
error: (err: any) => {
expect(err).to.be.an('error');
expect(expected.length).to.equal(0);
done();
},
complete: () => {
done(new Error('should not be called'));
},
});
});
});
29 changes: 16 additions & 13 deletions src/internal/operators/delayWhen.ts
@@ -1,17 +1,18 @@
import { Observable } from '../Observable';
import { MonoTypeOperatorFunction } from '../types';
import { MonoTypeOperatorFunction, ObservableInput } from '../types';
import { concat } from '../observable/concat';
import { take } from './take';
import { ignoreElements } from './ignoreElements';
import { mapTo } from './mapTo';
import { mergeMap } from './mergeMap';
import { innerFrom } from '../observable/innerFrom';

/** @deprecated The `subscriptionDelay` parameter will be removed in v8. */
export function delayWhen<T>(
delayDurationSelector: (value: T, index: number) => Observable<any>,
delayDurationSelector: (value: T, index: number) => ObservableInput<any>,
subscriptionDelay: Observable<any>
): MonoTypeOperatorFunction<T>;
export function delayWhen<T>(delayDurationSelector: (value: T, index: number) => Observable<any>): MonoTypeOperatorFunction<T>;
export function delayWhen<T>(delayDurationSelector: (value: T, index: number) => ObservableInput<any>): MonoTypeOperatorFunction<T>;

/**
* Delays the emission of items from the source Observable by a given time span
Expand All @@ -26,8 +27,9 @@ export function delayWhen<T>(delayDurationSelector: (value: T, index: number) =>
* a time span determined by another Observable. When the source emits a value,
* the `delayDurationSelector` function is called with the value emitted from
* the source Observable as the first argument to the `delayDurationSelector`.
* The `delayDurationSelector` function should return an Observable, called
* the "duration" Observable.
* The `delayDurationSelector` function should return an {@link ObservableInput},
* that is internally converted to an Observable that is called the "duration"
* Observable.
*
* The source value is emitted on the output Observable only when the "duration"
* Observable emits ({@link guide/glossary-and-semantics#next next}s) any value.
Expand Down Expand Up @@ -76,18 +78,19 @@ export function delayWhen<T>(delayDurationSelector: (value: T, index: number) =>
* @see {@link audit}
* @see {@link auditTime}
*
* @param {function(value: T, index: number): Observable} delayDurationSelector A function that
* returns an Observable for each value emitted by the source Observable, which
* is then used to delay the emission of that item on the output Observable
* until the Observable returned from this function emits a value.
* @param {Observable} subscriptionDelay An Observable that triggers the
* subscription to the source Observable once it emits any value.
* @param delayDurationSelector A function that returns an `ObservableInput` for
* each `value` emitted by the source Observable, which is then used to delay the
* emission of that `value` on the output Observable until the `ObservableInput`
* returned from this function emits a next value. When called, beside `value`,
* this function receives a zero-based `index` of the emission order.
* @param subscriptionDelay An Observable that triggers the subscription to the
* source Observable once it emits any value.
* @return A function that returns an Observable that delays the emissions of
* the source Observable by an amount of time specified by the Observable
* returned by `delayDurationSelector`.
*/
export function delayWhen<T>(
delayDurationSelector: (value: T, index: number) => Observable<any>,
delayDurationSelector: (value: T, index: number) => ObservableInput<any>,
subscriptionDelay?: Observable<any>
): MonoTypeOperatorFunction<T> {
if (subscriptionDelay) {
Expand All @@ -96,5 +99,5 @@ export function delayWhen<T>(
concat(subscriptionDelay.pipe(take(1), ignoreElements()), source.pipe(delayWhen(delayDurationSelector)));
}

return mergeMap((value, index) => delayDurationSelector(value, index).pipe(take(1), mapTo(value)));
return mergeMap((value, index) => innerFrom(delayDurationSelector(value, index)).pipe(take(1), mapTo(value)));
}

0 comments on commit dfd95db

Please sign in to comment.