1
- import { Observable } from '../Observable' ;
2
- import { OperatorFunction } from '../types' ;
1
+ import { OperatorFunction , ObservableInput } from '../types' ;
3
2
import { operate } from '../util/lift' ;
4
3
import { noop } from '../util/noop' ;
5
4
import { createOperatorSubscriber } from './OperatorSubscriber' ;
5
+ import { innerFrom } from '../observable/innerFrom' ;
6
6
7
7
/**
8
8
* Buffers the source Observable values until `closingNotifier` emits.
@@ -13,7 +13,8 @@ import { createOperatorSubscriber } from './OperatorSubscriber';
13
13
* 
14
14
*
15
15
* Buffers the incoming Observable values until the given `closingNotifier`
16
- * Observable emits a value, at which point it emits the buffer on the output
16
+ * `ObservableInput` (that internally gets converted to an Observable)
17
+ * emits a value, at which point it emits the buffer on the output
17
18
* Observable and starts a new buffer internally, awaiting the next time
18
19
* `closingNotifier` emits.
19
20
*
@@ -36,12 +37,12 @@ import { createOperatorSubscriber } from './OperatorSubscriber';
36
37
* @see {@link bufferWhen }
37
38
* @see {@link window }
38
39
*
39
- * @param { Observable<any> } closingNotifier An Observable that signals the
40
+ * @param closingNotifier An `ObservableInput` that signals the
40
41
* buffer to be emitted on the output Observable.
41
42
* @return A function that returns an Observable of buffers, which are arrays
42
43
* of values.
43
44
*/
44
- export function buffer < T > ( closingNotifier : Observable < any > ) : OperatorFunction < T , T [ ] > {
45
+ export function buffer < T > ( closingNotifier : ObservableInput < any > ) : OperatorFunction < T , T [ ] > {
45
46
return operate ( ( source , subscriber ) => {
46
47
// The current buffered values.
47
48
let currentBuffer : T [ ] = [ ] ;
@@ -59,7 +60,7 @@ export function buffer<T>(closingNotifier: Observable<any>): OperatorFunction<T,
59
60
) ;
60
61
61
62
// Subscribe to the closing notifier.
62
- closingNotifier . subscribe (
63
+ innerFrom ( closingNotifier ) . subscribe (
63
64
createOperatorSubscriber (
64
65
subscriber ,
65
66
( ) => {
0 commit comments