-
Notifications
You must be signed in to change notification settings - Fork 3k
/
bufferWhen.ts
99 lines (93 loc) · 3.38 KB
/
bufferWhen.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
import type { Subscriber } from '@rxjs/observable';
import { operate, Observable, from } from '@rxjs/observable';
import type { ObservableInput, OperatorFunction } from '../types.js';
import { noop } from '../util/noop.js';
/**
 * Buffers the source Observable values, using a factory function of closing
 * Observables to determine when to close, emit, and reset the buffer.
 *
 * <span class="informal">Collects values from the past as an array. When it
 * starts collecting values, it calls a function that returns an Observable that
 * tells when to close the buffer and restart collecting.</span>
 *
 * ![](bufferWhen.svg)
 *
 * A buffer is opened as soon as the result is subscribed to, and
 * `closingSelector` is called to obtain a closing notifier. Each time the
 * current notifier emits, the collected values are emitted as an array, a new
 * empty buffer is started, the notifier subscription is dropped, and
 * `closingSelector` is called again for the next notifier. When the source
 * completes, the buffer collected so far is emitted before the result
 * completes.
 *
 * ## Example
 *
 * Emit an array of the last clicks every [1-5] random seconds
 *
 * ```ts
 * import { fromEvent, bufferWhen, interval } from 'rxjs';
 *
 * const clicks = fromEvent(document, 'click');
 * const buffered = clicks.pipe(
 *   bufferWhen(() => interval(1000 + Math.random() * 4000))
 * );
 * buffered.subscribe(x => console.log(x));
 * ```
 *
 * @see {@link buffer}
 * @see {@link bufferCount}
 * @see {@link bufferTime}
 * @see {@link bufferToggle}
 * @see {@link windowWhen}
 *
 * @param closingSelector A function that takes no arguments and returns an
 * Observable that signals buffer closure.
 * @return A function that returns an Observable of arrays of buffered values.
 */
export function bufferWhen<T>(closingSelector: () => ObservableInput<any>): OperatorFunction<T, T[]> {
  return (source) =>
    new Observable((destination) => {
      // Values gathered since the last flush; stays `null` until the first
      // buffer has been opened and is reset to `null` on teardown.
      let pending: T[] | null = null;

      // Subscriber currently attached to the closing notifier. It is kept so
      // the notifier can be unsubscribed once it has delivered one value.
      let notifierSubscriber: Subscriber<T> | null = null;

      // Drop both references when the destination is finalized.
      destination.add(() => {
        notifierSubscriber = null;
        pending = null;
      });

      // Flushes the current buffer and arms a fresh closing notifier.
      // The statement order is significant: detach the old notifier, swap in
      // a new buffer, emit the old one, and only then ask for a new notifier.
      const rotateBuffer = (): void => {
        // Only the first notification of a notifier is of interest.
        if (notifierSubscriber) {
          notifierSubscriber.unsubscribe();
        }

        // Swap the buffers before emitting, so the new buffer is already in
        // place while the old one is being delivered.
        const flushed = pending;
        pending = [];
        if (flushed) {
          destination.next(flushed);
        }

        // Obtain the next notifier first, then create and remember its
        // subscriber, then subscribe. Any value from the notifier rotates the
        // buffer again; the notifier's completion handler is a no-op.
        const notifier = from(closingSelector());
        notifierSubscriber = operate({
          destination,
          next: rotateBuffer,
          complete: noop,
        });
        notifier.subscribe(notifierSubscriber);
      };

      // Open the initial buffer before listening to the source.
      rotateBuffer();

      source.subscribe(
        operate({
          destination,
          // Each source value is appended to the buffer that is currently open.
          next: (value: T) => {
            if (pending) {
              pending.push(value);
            }
          },
          // On source completion, deliver whatever has been collected and
          // then complete the result.
          complete: () => {
            if (pending) {
              destination.next(pending);
            }
            destination.complete();
          },
        })
      );
    });
}