/
readable.js
113 lines (103 loc) · 3.82 KB
/
readable.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
import {Readable} from 'node:stream';
import {callbackify} from 'node:util';
import {BINARY_ENCODINGS} from '../arguments/encoding-option.js';
import {getFromStream} from '../arguments/fd-options.js';
import {iterateOnSubprocessStream, DEFAULT_OBJECT_HIGH_WATER_MARK} from '../io/iterate.js';
import {addConcurrentStream, waitForConcurrentStreams} from './concurrent.js';
import {
createDeferred,
safeWaitForSubprocessStdin,
waitForSubprocessStdout,
waitForSubprocess,
destroyOtherStream,
} from './shared.js';
// Create a `Readable` stream that forwards from `stdout` and awaits the subprocess
export const createReadable = ({subprocess, concurrentStreams, encoding}, {from, binary: binaryOption = true, preserveNewlines = true} = {}) => {
const binary = binaryOption || BINARY_ENCODINGS.has(encoding);
const {subprocessStdout, waitReadableDestroy} = getSubprocessStdout(subprocess, from, concurrentStreams);
const {readableEncoding, readableObjectMode, readableHighWaterMark} = getReadableOptions(subprocessStdout, binary);
const {read, onStdoutDataDone} = getReadableMethods({
subprocessStdout,
subprocess,
binary,
encoding,
preserveNewlines,
});
const readable = new Readable({
read,
destroy: callbackify(onReadableDestroy.bind(undefined, {subprocessStdout, subprocess, waitReadableDestroy})),
highWaterMark: readableHighWaterMark,
objectMode: readableObjectMode,
encoding: readableEncoding,
});
onStdoutFinished({
subprocessStdout,
onStdoutDataDone,
readable,
subprocess,
});
return readable;
};
// Retrieve `stdout` (or other stream depending on `from`)
export const getSubprocessStdout = (subprocess, from, concurrentStreams) => {
const subprocessStdout = getFromStream(subprocess, from);
const waitReadableDestroy = addConcurrentStream(concurrentStreams, subprocessStdout, 'readableDestroy');
return {subprocessStdout, waitReadableDestroy};
};
export const getReadableOptions = ({readableEncoding, readableObjectMode, readableHighWaterMark}, binary) => binary
? {readableEncoding, readableObjectMode, readableHighWaterMark}
: {readableEncoding, readableObjectMode: true, readableHighWaterMark: DEFAULT_OBJECT_HIGH_WATER_MARK};
export const getReadableMethods = ({subprocessStdout, subprocess, binary, encoding, preserveNewlines}) => {
const onStdoutDataDone = createDeferred();
const onStdoutData = iterateOnSubprocessStream({
subprocessStdout,
subprocess,
binary,
shouldEncode: !binary,
encoding,
preserveNewlines,
});
return {
read() {
onRead(this, onStdoutData, onStdoutDataDone);
},
onStdoutDataDone,
};
};
// Forwards data from `stdout` to `readable`
const onRead = async (readable, onStdoutData, onStdoutDataDone) => {
try {
const {value, done} = await onStdoutData.next();
if (done) {
onStdoutDataDone.resolve();
} else {
readable.push(value);
}
} catch {}
};
// When `subprocess.stdout` ends/aborts/errors, do the same on `readable`.
// Await the subprocess, for the same reason as above.
export const onStdoutFinished = async ({subprocessStdout, onStdoutDataDone, readable, subprocess, subprocessStdin}) => {
try {
await waitForSubprocessStdout(subprocessStdout);
await subprocess;
await safeWaitForSubprocessStdin(subprocessStdin);
await onStdoutDataDone;
if (readable.readable) {
readable.push(null);
}
} catch (error) {
await safeWaitForSubprocessStdin(subprocessStdin);
destroyOtherReadable(readable, error);
}
};
// When `readable` aborts/errors, do the same on `subprocess.stdout`
export const onReadableDestroy = async ({subprocessStdout, subprocess, waitReadableDestroy}, error) => {
if (await waitForConcurrentStreams(waitReadableDestroy, subprocess)) {
destroyOtherReadable(subprocessStdout, error);
await waitForSubprocess(subprocess, error);
}
};
const destroyOtherReadable = (stream, error) => {
destroyOtherStream(stream, stream.readable, error);
};