/
duplex.js
69 lines (67 loc) · 2.21 KB
/
duplex.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import {Duplex} from 'node:stream';
import {callbackify} from 'node:util';
import {BINARY_ENCODINGS} from '../arguments/encoding-option.js';
import {
getSubprocessStdout,
getReadableOptions,
getReadableMethods,
onStdoutFinished,
onReadableDestroy,
} from './readable.js';
import {
getSubprocessStdin,
getWritableMethods,
onStdinFinished,
onWritableDestroy,
} from './writable.js';
// Create a `Duplex` stream combining both `subprocess.readable()` and `subprocess.writable()`
export const createDuplex = ({subprocess, concurrentStreams, encoding}, {from, to, binary: binaryOption = true, preserveNewlines = true} = {}) => {
const binary = binaryOption || BINARY_ENCODINGS.has(encoding);
const {subprocessStdout, waitReadableDestroy} = getSubprocessStdout(subprocess, from, concurrentStreams);
const {subprocessStdin, waitWritableFinal, waitWritableDestroy} = getSubprocessStdin(subprocess, to, concurrentStreams);
const {readableEncoding, readableObjectMode, readableHighWaterMark} = getReadableOptions(subprocessStdout, binary);
const {read, onStdoutDataDone} = getReadableMethods({
subprocessStdout,
subprocess,
binary,
encoding,
preserveNewlines,
});
const duplex = new Duplex({
read,
...getWritableMethods(subprocessStdin, subprocess, waitWritableFinal),
destroy: callbackify(onDuplexDestroy.bind(undefined, {
subprocessStdout,
subprocessStdin,
subprocess,
waitReadableDestroy,
waitWritableFinal,
waitWritableDestroy,
})),
readableHighWaterMark,
writableHighWaterMark: subprocessStdin.writableHighWaterMark,
readableObjectMode,
writableObjectMode: subprocessStdin.writableObjectMode,
encoding: readableEncoding,
});
onStdoutFinished({
subprocessStdout,
onStdoutDataDone,
readable: duplex,
subprocess,
subprocessStdin,
});
onStdinFinished(subprocessStdin, duplex, subprocessStdout);
return duplex;
};
const onDuplexDestroy = async ({subprocessStdout, subprocessStdin, subprocess, waitReadableDestroy, waitWritableFinal, waitWritableDestroy}, error) => {
await Promise.all([
onReadableDestroy({subprocessStdout, subprocess, waitReadableDestroy}, error),
onWritableDestroy({
subprocessStdin,
subprocess,
waitWritableFinal,
waitWritableDestroy,
}, error),
]);
};