forked from ReactiveX/rxjs
-
Notifications
You must be signed in to change notification settings - Fork 4
/
mergeInternals.ts
108 lines (100 loc) · 4 KB
/
mergeInternals.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
import type { Observable, Subscriber } from '@rxjs/observable';
import { from, operate } from '@rxjs/observable';
import type { ObservableInput } from '../types.js';
/**
 * Shared implementation of the general "merge" strategy: each value from
 * `source` is projected to an inner source, and no more than `concurrent`
 * inner sources are subscribed at the same time. Values that arrive while the
 * limit is reached are queued and started, oldest first, as running inner
 * subscriptions complete. This is used in `mergeMap` and `mergeScan` because
 * their logic is otherwise nearly identical.
 *
 * @param source The original source observable
 * @param destination The consumer subscriber
 * @param project The projection function producing an inner source from a
 * value and a running, zero-based index
 * @param concurrent The maximum number of inner subscriptions active at once
 * @param onBeforeNext Optional hook called with each inner value before that
 * value is emitted (or, when expanding, fed back in)
 * @param expand If `true` this will perform an "expand" strategy: inner values
 * are fed back through `project` as though they were outer values, and every
 * value is emitted to `destination` at the moment its inner subscription starts.
 */
export function mergeInternals<T, R>(
  source: Observable<T>,
  destination: Subscriber<R>,
  project: (value: T, index: number) => ObservableInput<R>,
  concurrent: number,
  onBeforeNext?: (innerValue: R) => void,
  expand?: boolean
) {
  // Values received while at the concurrency limit, oldest first.
  const pending: T[] = [];
  // How many inner subscriptions are currently running.
  let activeCount = 0;
  // Running index handed to `project` as its second argument.
  let projectIndex = 0;
  // Becomes `true` once the outer source has completed.
  let sourceDone = false;

  /**
   * Completes `destination`, but only when the outer source is done, nothing
   * is queued, and no inner subscription is still running.
   */
  function completeIfFinished() {
    if (!sourceDone || pending.length !== 0 || activeCount !== 0) {
      return;
    }
    destination.complete();
  }

  /**
   * Projects `value` and subscribes to the resulting inner source.
   */
  function startInner(value: T) {
    if (expand) {
      // When expanding, the value itself is part of the output. Inner values
      // come back through `accept`, so they are emitted here as well once
      // their own inner subscription is started.
      destination.next(value as unknown as R);
    }
    // Count this subscription before subscribing, so that anything emitted
    // synchronously during the subscribe call sees an accurate count.
    activeCount++;
    const inner = from(project(value, projectIndex++));
    inner.subscribe(
      operate({
        destination,
        next: handleInnerValue,
        complete: handleInnerComplete,
      })
    );
  }

  /**
   * Entry point for every value to be projected: start it right away if we
   * are under the concurrency limit, otherwise queue it for later.
   */
  function accept(value: T) {
    if (activeCount < concurrent) {
      startInner(value);
    } else {
      pending.push(value);
    }
  }

  /**
   * Handles a value emitted by any inner subscription.
   */
  function handleInnerValue(innerValue: R) {
    // `mergeScan` hooks in here, for example to update its state.
    onBeforeNext?.(innerValue);
    if (!expand) {
      // Plain merge: pass the inner value straight through.
      destination.next(innerValue);
      return;
    }
    // Expanding: recurse by treating the inner value as a new outer value.
    // It is emitted when its inner subscription is started.
    accept(innerValue as unknown as T);
  }

  /**
   * Handles completion of any inner subscription.
   */
  function handleInnerComplete() {
    // Free the slot first so the limit check below is accurate.
    activeCount--;
    // Start as many queued values as the limit allows. Each start bumps
    // `activeCount` again before the loop condition is re-evaluated.
    while (pending.length !== 0 && activeCount < concurrent) {
      startInner(pending.shift() as T);
    }
    // With the queue processed, see whether everything is finished.
    completeIfFinished();
  }

  // Subscribe to our source observable.
  source.subscribe(
    operate({
      destination,
      next: accept,
      complete: () => {
        // Record that the outer source is done, then complete right away
        // if no inner work remains.
        sourceDone = true;
        completeIfFinished();
      },
    })
  );
}