// shareReplay.ts
import { ReplaySubject } from '../ReplaySubject';
import { MonoTypeOperatorFunction, SchedulerLike } from '../types';
import { share } from './share';
/**
 * Options accepted by the configuration-object overload of `shareReplay`.
 */
export interface ShareReplayConfig {
/** Maximum element count of the replay buffer. `shareReplay` uses `Infinity` when this is omitted. */
bufferSize?: number;
/** Maximum time length of the replay buffer in milliseconds. `shareReplay` uses `Infinity` when this is omitted. */
windowTime?: number;
/**
 * Forwarded by `shareReplay` to `share` as `resetOnRefCountZero`: when `true`, the source is
 * unsubscribed from once the reference count drops to zero; when `false`, it is kept running.
 */
refCount: boolean;
/** Scheduler handed to the inner `ReplaySubject` as its third constructor argument. */
scheduler?: SchedulerLike;
}
/** Overload that takes all options as a single {@link ShareReplayConfig} object. */
export function shareReplay<T>(config: ShareReplayConfig): MonoTypeOperatorFunction<T>;
/** Overload that takes the replay-buffer options (`bufferSize`, `windowTime`, `scheduler`) positionally. */
export function shareReplay<T>(bufferSize?: number, windowTime?: number, scheduler?: SchedulerLike): MonoTypeOperatorFunction<T>;
/**
* Share source and replay specified number of emissions on subscription.
*
* This operator is a specialization of `replay` that connects to a source observable
* and multicasts through a `ReplaySubject` constructed with the specified arguments.
* A successfully completed source will stay cached in the observable returned by `shareReplay` forever,
* but an errored source can be retried.
*
* ## Why use shareReplay?
* You generally want to use `shareReplay` when you have side-effects or taxing computations
* that you do not wish to be executed amongst multiple subscribers.
* It may also be valuable in situations where you know you will have late subscribers to
* a stream that need access to previously emitted values.
* This ability to replay values on subscription is what differentiates {@link share} and `shareReplay`.
*
* ![](shareReplay.png)
*
* ## Reference counting
* By default `shareReplay` will use `refCount` of false, meaning that it will _not_ unsubscribe the
* source when the reference counter drops to zero, i.e. the inner `ReplaySubject` will _not_ be unsubscribed
* (and will potentially run forever).
* This is the default as it is expected that `shareReplay` is often used to keep around expensive to setup
* observables which we want to keep running instead of having to do the expensive setup again.
*
* As of RxJS version 6.4.0 a new overload signature was added to allow for manual control over what
* happens when the operator's internal reference counter drops to zero.
* If `refCount` is true, the source will be unsubscribed from once the reference count drops to zero, i.e.
* the inner `ReplaySubject` will be unsubscribed. All new subscribers will receive value emissions from a
* new `ReplaySubject` which in turn will cause a new subscription to the source observable.
*
* ## Examples
*
* Example with a third subscriber coming late to the party
*
* ```ts
* import { interval, take, shareReplay } from 'rxjs';
*
* const shared$ = interval(2000).pipe(
* take(6),
* shareReplay(3)
* );
*
* shared$.subscribe(x => console.log('sub A: ', x));
* shared$.subscribe(y => console.log('sub B: ', y));
*
* setTimeout(() => {
* shared$.subscribe(y => console.log('sub C: ', y));
* }, 11000);
*
* // Logs:
* // (after ~2000 ms)
* // sub A: 0
* // sub B: 0
* // (after ~4000 ms)
* // sub A: 1
* // sub B: 1
* // (after ~6000 ms)
* // sub A: 2
* // sub B: 2
* // (after ~8000 ms)
* // sub A: 3
* // sub B: 3
* // (after ~10000 ms)
* // sub A: 4
* // sub B: 4
* // (after ~11000 ms, sub C gets the last 3 values)
* // sub C: 2
* // sub C: 3
* // sub C: 4
* // (after ~12000 ms)
* // sub A: 5
* // sub B: 5
* // sub C: 5
* ```
*
* Example for `refCount` usage
*
* ```ts
* import { Observable, tap, interval, shareReplay, take } from 'rxjs';
*
* const log = <T>(name: string, source: Observable<T>) => source.pipe(
* tap({
* subscribe: () => console.log(`${ name }: subscribed`),
* next: value => console.log(`${ name }: ${ value }`),
* complete: () => console.log(`${ name }: completed`),
* finalize: () => console.log(`${ name }: unsubscribed`)
* })
* );
*
* const obs$ = log('source', interval(1000));
*
* const shared$ = log('shared', obs$.pipe(
* shareReplay({ bufferSize: 1, refCount: true }),
* take(2)
* ));
*
* shared$.subscribe(x => console.log('sub A: ', x));
* shared$.subscribe(y => console.log('sub B: ', y));
*
* // PRINTS:
* // shared: subscribed <-- reference count = 1
* // source: subscribed
* // shared: subscribed <-- reference count = 2
* // source: 0
* // shared: 0
* // sub A: 0
* // shared: 0
* // sub B: 0
* // source: 1
* // shared: 1
* // sub A: 1
* // shared: completed <-- take(2) completes the subscription for sub A
* // shared: unsubscribed <-- reference count = 1
* // shared: 1
* // sub B: 1
* // shared: completed <-- take(2) completes the subscription for sub B
* // shared: unsubscribed <-- reference count = 0
* // source: unsubscribed <-- replaySubject unsubscribes from source observable because the reference count dropped to 0 and refCount is true
*
* // In case of refCount being false, the unsubscribe is never called on the source and the source would keep on emitting, even if no subscribers
* // are listening.
* // source: 2
* // source: 3
* // source: 4
* // ...
* ```
*
* @see {@link publish}
* @see {@link share}
* @see {@link publishReplay}
*
* @param {number} [bufferSize=Infinity] Maximum element count of the replay buffer.
* @param {number} [windowTime=Infinity] Maximum time length of the replay buffer in milliseconds.
* @param {SchedulerLike} [scheduler] Scheduler that is passed on to the inner `ReplaySubject`
* used for multicasting.
* @return A function that returns an Observable that shares a single subscription
* to the source and replays the buffered emissions of the source to each new
* subscriber.
*/
export function shareReplay<T>(
  configOrBufferSize?: ShareReplayConfig | number,
  windowTime?: number,
  scheduler?: SchedulerLike
): MonoTypeOperatorFunction<T> {
  // Start from the positional-argument form; a config object, if given, overrides all of these.
  let replayCount = Infinity;
  let replayWindow = windowTime;
  let replayScheduler = scheduler;
  let resetWhenUnused = false;

  if (typeof configOrBufferSize === 'object' && configOrBufferSize !== null) {
    // Config form: destructuring defaults only kick in for `undefined` properties.
    const {
      bufferSize = Infinity,
      windowTime: configWindow = Infinity,
      refCount = false,
      scheduler: configScheduler,
    } = configOrBufferSize;
    replayCount = bufferSize;
    replayWindow = configWindow;
    resetWhenUnused = refCount;
    replayScheduler = configScheduler;
  } else if (configOrBufferSize != null) {
    // Positional form with an explicit buffer size (a missing or null size keeps `Infinity`).
    replayCount = configOrBufferSize as number;
  }

  // Each (re)connection gets a fresh ReplaySubject. Errors reset the shared state so the
  // source can be retried, while completion keeps the buffered values cached.
  return share<T>({
    connector: () => new ReplaySubject(replayCount, replayWindow, replayScheduler),
    resetOnError: true,
    resetOnComplete: false,
    resetOnRefCountZero: resetWhenUnused,
  });
}