diff --git a/spec-dtslint/operators/window-spec.ts b/spec-dtslint/operators/window-spec.ts index 683e6a85e6..1f65abeb1e 100644 --- a/spec-dtslint/operators/window-spec.ts +++ b/spec-dtslint/operators/window-spec.ts @@ -6,5 +6,10 @@ it('should infer correctly', () => { }); it('should enforce types', () => { - of(1).pipe(window('')); // $ExpectError + of(1).pipe(window()); // $ExpectError + of(1).pipe(window(6)); // $ExpectError +}); + +it('should support Promises', () => { + of(1, 2, 3).pipe(window(Promise.resolve('foo'))); // $ExpectType Observable> }); diff --git a/spec/operators/window-spec.ts b/spec/operators/window-spec.ts index 5eb2d9aff9..929977e04c 100644 --- a/spec/operators/window-spec.ts +++ b/spec/operators/window-spec.ts @@ -1,7 +1,8 @@ -import { window, mergeMap } from 'rxjs/operators'; +import { window, mergeMap, take } from 'rxjs/operators'; import { TestScheduler } from 'rxjs/testing'; -import { EMPTY, of, Observable } from 'rxjs'; +import { EMPTY, of, Observable, interval } from 'rxjs'; import { observableMatcher } from '../helpers/observableMatcher'; +import { expect } from 'chai'; /** @test {window} */ describe('window', () => { @@ -280,4 +281,43 @@ describe('window', () => { expectSubscriptions(closings.subscriptions).toBe(closingSubs); }); }); + + it('should window when Promise resolves', (done) => { + const e1 = interval(3).pipe(take(5)); + let pos = 0; + const result: number[][] = [[], []]; + const expected = [ + [0, 1], + [2, 3, 4], + ]; + + e1.pipe(window(new Promise((resolve) => setTimeout(() => resolve(), 8)))).subscribe({ + next: (x) => { + x.subscribe({ + next: (v) => result[pos].push(v), + complete: () => pos++, + }); + }, + error: () => done(new Error('should not be called')), + complete: () => { + expect(result).to.deep.equal(expected); + done(); + }, + }); + }); + + it('should raise error when Promise rejects', (done) => { + const e1 = interval(1).pipe(take(5)); + const error = new Error('err'); + + e1.pipe(window(Promise.reject(error))).subscribe({ + error: (err) => { + expect(err).to.be.an('error'); + done(); + }, + complete: () => { + done(new Error('should not be called')); + }, + }); + }); }); diff --git a/src/internal/operators/window.ts b/src/internal/operators/window.ts index 31ef626f60..b8250cbc2e 100644 --- a/src/internal/operators/window.ts +++ b/src/internal/operators/window.ts @@ -1,9 +1,10 @@ import { Observable } from '../Observable'; -import { OperatorFunction } from '../types'; +import { OperatorFunction, ObservableInput } from '../types'; import { Subject } from '../Subject'; import { operate } from '../util/lift'; import { createOperatorSubscriber } from './OperatorSubscriber'; import { noop } from '../util/noop'; +import { innerFrom } from '../observable/innerFrom'; /** * Branch out the source Observable values as a nested Observable whenever @@ -17,8 +18,9 @@ import { noop } from '../util/noop'; * Returns an Observable that emits windows of items it collects from the source * Observable. The output Observable emits connected, non-overlapping * windows. It emits the current window and opens a new one whenever the - * Observable `windowBoundaries` emits an item. Because each window is an - * Observable, the output is a higher-order Observable. + * `windowBoundaries` emits an item. `windowBoundaries` can be any type that + * `ObservableInput` accepts. It internally gets converted to an Observable. + * Because each window is an Observable, the output is a higher-order Observable. * * ## Example * @@ -43,12 +45,12 @@ import { noop } from '../util/noop'; * @see {@link windowWhen} * @see {@link buffer} * - * @param {Observable} windowBoundaries An Observable that completes the + * @param windowBoundaries An `ObservableInput` that completes the * previous window and starts a new window. * @return A function that returns an Observable of windows, which are * Observables emitting values of the source Observable. */ -export function window(windowBoundaries: Observable): OperatorFunction> { +export function window(windowBoundaries: ObservableInput): OperatorFunction> { return operate((source, subscriber) => { let windowSubject: Subject = new Subject(); @@ -73,7 +75,7 @@ export function window(windowBoundaries: Observable): OperatorFunction {