From dfd95db952a6772d35d11bdd1974f2c4b4d68b25 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mladen=20Jakovljevi=C4=87?= Date: Fri, 16 Dec 2022 00:20:35 +0100 Subject: [PATCH] feat(delayWhen): `delayWhen`'s `delayDurationSelector` should support `ObservableInput` (#7049) * feat(delayWhen): `delayWhen`'s `delayDurationSelector` should support `ObservableInput` * test(delayWhen): support Promises in delayWhen * chore: speed-up the tests --- spec-dtslint/operators/delayWhen-spec.ts | 10 ++++-- spec/operators/audit-spec.ts | 4 +-- spec/operators/delayWhen-spec.ts | 40 +++++++++++++++++++++++- src/internal/operators/delayWhen.ts | 29 +++++++++-------- 4 files changed, 64 insertions(+), 19 deletions(-) diff --git a/spec-dtslint/operators/delayWhen-spec.ts b/spec-dtslint/operators/delayWhen-spec.ts index 2924fd3607..b03de4d103 100644 --- a/spec-dtslint/operators/delayWhen-spec.ts +++ b/spec-dtslint/operators/delayWhen-spec.ts @@ -10,7 +10,7 @@ it('should support an empty notifier', () => { const o = of(1, 2, 3).pipe(delayWhen(() => NEVER)); // $ExpectType Observable }); -it('should support a subscriptiondelayWhen parameter', () => { +it('should support a subscriptionDelay parameter', () => { const o = of(1, 2, 3).pipe(delayWhen(() => of('a', 'b', 'c'), of(new Date()))); // $ExpectType Observable }); @@ -18,12 +18,16 @@ it('should enforce types', () => { const o = of(1, 2, 3).pipe(delayWhen()); // $ExpectError }); -it('should enforce types of delayWhenDurationSelector', () => { +it('should enforce types of delayDurationSelector', () => { const o = of(1, 2, 3).pipe(delayWhen(of('a', 'b', 'c'))); // $ExpectError const p = of(1, 2, 3).pipe(delayWhen((value: string, index) => of('a', 'b', 'c'))); // $ExpectError const q = of(1, 2, 3).pipe(delayWhen((value, index: string) => of('a', 'b', 'c'))); // $ExpectError }); -it('should enforce types of subscriptiondelayWhen', () => { +it('should enforce types of subscriptionDelay', () => { const o = of(1, 2, 3).pipe(delayWhen(() => of('a', 'b', 'c'), 'a')); // $ExpectError }); + +it('should support Promises', () => { + const o = of(1, 2, 3).pipe(delayWhen(() => Promise.resolve('a'))); // $ExpectType Observable +}); diff --git a/spec/operators/audit-spec.ts b/spec/operators/audit-spec.ts index 10f8bf70ed..78de9b0f75 100644 --- a/spec/operators/audit-spec.ts +++ b/spec/operators/audit-spec.ts @@ -395,7 +395,7 @@ describe('audit operator', () => { }); it('should audit by promise resolves', (done) => { - const e1 = interval(10).pipe(take(5)); + const e1 = interval(1).pipe(take(5)); const expected = [0, 1, 2, 3, 4]; e1.pipe(audit(() => Promise.resolve(42))).subscribe({ @@ -413,7 +413,7 @@ describe('audit operator', () => { }); it('should raise error when promise rejects', (done) => { - const e1 = interval(10).pipe(take(10)); + const e1 = interval(1).pipe(take(10)); const expected = [0, 1, 2]; const error = new Error('error'); diff --git a/spec/operators/delayWhen-spec.ts b/spec/operators/delayWhen-spec.ts index 711e6b1e77..cbecde8294 100644 --- a/spec/operators/delayWhen-spec.ts +++ b/spec/operators/delayWhen-spec.ts @@ -1,4 +1,4 @@ -import { of, EMPTY } from 'rxjs'; +import { of, EMPTY, interval, take } from 'rxjs'; import { delayWhen, tap } from 'rxjs/operators'; import { TestScheduler } from 'rxjs/testing'; import { observableMatcher } from '../helpers/observableMatcher'; @@ -338,4 +338,42 @@ describe('delayWhen', () => { expectSubscriptions(e1.subscriptions).toBe(e1subs); }); }); + + it('should delayWhen Promise resolves', (done) => { + const e1 = interval(1).pipe(take(5)); + const expected = [0, 1, 2, 3, 4]; + + e1.pipe(delayWhen(() => Promise.resolve(42))).subscribe({ + next: (x: number) => { + expect(x).to.equal(expected.shift()); + }, + error: () => { + done(new Error('should not be called')); + }, + complete: () => { + expect(expected.length).to.equal(0); + done(); + }, + }); + }); + + it('should raise error when Promise rejects', (done) => { + const e1 = interval(1).pipe(take(10)); + const expected = [0, 1, 2]; + const error = new Error('err'); + + e1.pipe(delayWhen((x) => (x === 3 ? Promise.reject(error) : Promise.resolve(42)))).subscribe({ + next: (x: number) => { + expect(x).to.equal(expected.shift()); + }, + error: (err: any) => { + expect(err).to.be.an('error'); + expect(expected.length).to.equal(0); + done(); + }, + complete: () => { + done(new Error('should not be called')); + }, + }); + }); }); diff --git a/src/internal/operators/delayWhen.ts b/src/internal/operators/delayWhen.ts index bbd1019561..0755507bee 100644 --- a/src/internal/operators/delayWhen.ts +++ b/src/internal/operators/delayWhen.ts @@ -1,17 +1,18 @@ import { Observable } from '../Observable'; -import { MonoTypeOperatorFunction } from '../types'; +import { MonoTypeOperatorFunction, ObservableInput } from '../types'; import { concat } from '../observable/concat'; import { take } from './take'; import { ignoreElements } from './ignoreElements'; import { mapTo } from './mapTo'; import { mergeMap } from './mergeMap'; +import { innerFrom } from '../observable/innerFrom'; /** @deprecated The `subscriptionDelay` parameter will be removed in v8. */ export function delayWhen( - delayDurationSelector: (value: T, index: number) => Observable, + delayDurationSelector: (value: T, index: number) => ObservableInput, subscriptionDelay: Observable ): MonoTypeOperatorFunction; -export function delayWhen(delayDurationSelector: (value: T, index: number) => Observable): MonoTypeOperatorFunction; +export function delayWhen(delayDurationSelector: (value: T, index: number) => ObservableInput): MonoTypeOperatorFunction; /** * Delays the emission of items from the source Observable by a given time span @@ -26,8 +27,9 @@ export function delayWhen(delayDurationSelector: (value: T, index: number) => * a time span determined by another Observable. When the source emits a value, * the `delayDurationSelector` function is called with the value emitted from * the source Observable as the first argument to the `delayDurationSelector`. - * The `delayDurationSelector` function should return an Observable, called - * the "duration" Observable. + * The `delayDurationSelector` function should return an {@link ObservableInput}, + * that is internally converted to an Observable that is called the "duration" + * Observable. * * The source value is emitted on the output Observable only when the "duration" * Observable emits ({@link guide/glossary-and-semantics#next next}s) any value. @@ -76,18 +78,19 @@ export function delayWhen(delayDurationSelector: (value: T, index: number) => * @see {@link audit} * @see {@link auditTime} * - * @param {function(value: T, index: number): Observable} delayDurationSelector A function that - * returns an Observable for each value emitted by the source Observable, which - * is then used to delay the emission of that item on the output Observable - * until the Observable returned from this function emits a value. - * @param {Observable} subscriptionDelay An Observable that triggers the - * subscription to the source Observable once it emits any value. + * @param delayDurationSelector A function that returns an `ObservableInput` for + * each `value` emitted by the source Observable, which is then used to delay the + * emission of that `value` on the output Observable until the `ObservableInput` + * returned from this function emits a next value. When called, beside `value`, + * this function receives a zero-based `index` of the emission order. + * @param subscriptionDelay An Observable that triggers the subscription to the + * source Observable once it emits any value. * @return A function that returns an Observable that delays the emissions of * the source Observable by an amount of time specified by the Observable * returned by `delayDurationSelector`. */ export function delayWhen( - delayDurationSelector: (value: T, index: number) => Observable, + delayDurationSelector: (value: T, index: number) => ObservableInput, subscriptionDelay?: Observable ): MonoTypeOperatorFunction { if (subscriptionDelay) { @@ -96,5 +99,5 @@ export function delayWhen( concat(subscriptionDelay.pipe(take(1), ignoreElements()), source.pipe(delayWhen(delayDurationSelector))); } - return mergeMap((value, index) => delayDurationSelector(value, index).pipe(take(1), mapTo(value))); + return mergeMap((value, index) => innerFrom(delayDurationSelector(value, index)).pipe(take(1), mapTo(value))); }