Skip to content

Commit 8c4347c

Browse files
authoredDec 15, 2022
feat(window): windowBoundaries should support ObservableInput (#7088)
1 parent 030b682 commit 8c4347c

File tree

3 files changed

+56
-9
lines changed

3 files changed

+56
-9
lines changed
 

‎spec-dtslint/operators/window-spec.ts

+6-1
Original file line numberDiff line numberDiff line change
@@ -6,5 +6,10 @@ it('should infer correctly', () => {
66
});
77

88
it('should enforce types', () => {
9-
of(1).pipe(window('')); // $ExpectError
9+
of(1).pipe(window()); // $ExpectError
10+
of(1).pipe(window(6)); // $ExpectError
11+
});
12+
13+
it('should support Promises', () => {
14+
of(1, 2, 3).pipe(window(Promise.resolve('foo'))); // $ExpectType Observable<Observable<number>>
1015
});

‎spec/operators/window-spec.ts

+42-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
1-
import { window, mergeMap } from 'rxjs/operators';
1+
import { window, mergeMap, take } from 'rxjs/operators';
22
import { TestScheduler } from 'rxjs/testing';
3-
import { EMPTY, of, Observable } from 'rxjs';
3+
import { EMPTY, of, Observable, interval } from 'rxjs';
44
import { observableMatcher } from '../helpers/observableMatcher';
5+
import { expect } from 'chai';
56

67
/** @test {window} */
78
describe('window', () => {
@@ -280,4 +281,43 @@ describe('window', () => {
280281
expectSubscriptions(closings.subscriptions).toBe(closingSubs);
281282
});
282283
});
284+
285+
it('should window when Promise resolves', (done) => {
286+
const e1 = interval(3).pipe(take(5));
287+
let pos = 0;
288+
const result: number[][] = [[], []];
289+
const expected = [
290+
[0, 1],
291+
[2, 3, 4],
292+
];
293+
294+
e1.pipe(window(new Promise<void>((resolve) => setTimeout(() => resolve(), 8)))).subscribe({
295+
next: (x) => {
296+
x.subscribe({
297+
next: (v) => result[pos].push(v),
298+
complete: () => pos++,
299+
});
300+
},
301+
error: () => done(new Error('should not be called')),
302+
complete: () => {
303+
expect(result).to.deep.equal(expected);
304+
done();
305+
},
306+
});
307+
});
308+
309+
it('should raise error when Promise rejects', (done) => {
310+
const e1 = interval(1).pipe(take(5));
311+
const error = new Error('err');
312+
313+
e1.pipe(window(Promise.reject(error))).subscribe({
314+
error: (err) => {
315+
expect(err).to.be.an('error');
316+
done();
317+
},
318+
complete: () => {
319+
done(new Error('should not be called'));
320+
},
321+
});
322+
});
283323
});

‎src/internal/operators/window.ts

+8-6
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
import { Observable } from '../Observable';
2-
import { OperatorFunction } from '../types';
2+
import { OperatorFunction, ObservableInput } from '../types';
33
import { Subject } from '../Subject';
44
import { operate } from '../util/lift';
55
import { createOperatorSubscriber } from './OperatorSubscriber';
66
import { noop } from '../util/noop';
7+
import { innerFrom } from '../observable/innerFrom';
78

89
/**
910
* Branch out the source Observable values as a nested Observable whenever
@@ -17,8 +18,9 @@ import { noop } from '../util/noop';
1718
* Returns an Observable that emits windows of items it collects from the source
1819
* Observable. The output Observable emits connected, non-overlapping
1920
* windows. It emits the current window and opens a new one whenever the
20-
* Observable `windowBoundaries` emits an item. Because each window is an
21-
* Observable, the output is a higher-order Observable.
21+
* `windowBoundaries` emits an item. `windowBoundaries` can be any type that
22+
* `ObservableInput` accepts. It internally gets converted to an Observable.
23+
* Because each window is an Observable, the output is a higher-order Observable.
2224
*
2325
* ## Example
2426
*
@@ -43,12 +45,12 @@ import { noop } from '../util/noop';
4345
* @see {@link windowWhen}
4446
* @see {@link buffer}
4547
*
46-
* @param {Observable<any>} windowBoundaries An Observable that completes the
48+
* @param windowBoundaries An `ObservableInput` that completes the
4749
* previous window and starts a new window.
4850
* @return A function that returns an Observable of windows, which are
4951
* Observables emitting values of the source Observable.
5052
*/
51-
export function window<T>(windowBoundaries: Observable<any>): OperatorFunction<T, Observable<T>> {
53+
export function window<T>(windowBoundaries: ObservableInput<any>): OperatorFunction<T, Observable<T>> {
5254
return operate((source, subscriber) => {
5355
let windowSubject: Subject<T> = new Subject<T>();
5456

@@ -73,7 +75,7 @@ export function window<T>(windowBoundaries: Observable<any>): OperatorFunction<T
7375
);
7476

7577
// Subscribe to the window boundaries.
76-
windowBoundaries.subscribe(
78+
innerFrom(windowBoundaries).subscribe(
7779
createOperatorSubscriber(
7880
subscriber,
7981
() => {

0 commit comments

Comments
 (0)
Please sign in to comment.