Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix passing the same file or stream to both stdout and stderr #1004

Merged
merged 3 commits into from
May 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
26 changes: 26 additions & 0 deletions docs/bash.md
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,32 @@ await $({stdout: {file: 'output.txt'}})`npm run build`;

[More info.](output.md#file-output)

### Piping interleaved stdout and stderr to a file

```sh
# Bash
npm run build &> output.txt
```

```js
// zx
import {createWriteStream} from 'node:fs';

const subprocess = $`npm run build`;
const fileStream = createWriteStream('output.txt');
subprocess.pipe(fileStream);
subprocess.stderr.pipe(fileStream);
await subprocess;
```

```js
// Execa
const output = {file: 'output.txt'};
await $({stdout: output, stderr: output})`npm run build`;
```

[More info.](output.md#file-output)

### Piping stdin from a file

```sh
Expand Down
4 changes: 4 additions & 0 deletions docs/output.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ console.log(stderr); // string with errors
await execa({stdout: {file: 'output.txt'}})`npm run build`;
// Or:
await execa({stdout: new URL('file:///path/to/output.txt')})`npm run build`;

// Redirect interleaved stdout and stderr to same file
const output = {file: 'output.txt'};
await execa({stdout: output, stderr: output})`npm run build`;
```

## Terminal output
Expand Down
20 changes: 10 additions & 10 deletions lib/io/output-async.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import {pipeStreams} from './pipeline.js';
// Handle `input`, `inputFile`, `stdin`, `stdout` and `stderr` options, after spawning, in async mode
// When multiple input streams are used, we merge them to ensure the output stream ends only once each input stream has ended
export const pipeOutputAsync = (subprocess, fileDescriptors, controller) => {
const inputStreamsGroups = {};
const pipeGroups = new Map();

for (const [fdNumber, {stdioItems, direction}] of Object.entries(fileDescriptors)) {
for (const {stream} of stdioItems.filter(({type}) => TRANSFORM_TYPES.has(type))) {
Expand All @@ -20,15 +20,15 @@ export const pipeOutputAsync = (subprocess, fileDescriptors, controller) => {
stream,
direction,
fdNumber,
inputStreamsGroups,
pipeGroups,
controller,
});
}
}

for (const [fdNumber, inputStreams] of Object.entries(inputStreamsGroups)) {
for (const [outputStream, inputStreams] of pipeGroups.entries()) {
const inputStream = inputStreams.length === 1 ? inputStreams[0] : mergeStreams(inputStreams);
pipeStreams(inputStream, subprocess.stdio[fdNumber]);
pipeStreams(inputStream, outputStream);
}
};

Expand All @@ -52,18 +52,18 @@ const SUBPROCESS_STREAM_PROPERTIES = ['stdin', 'stdout', 'stderr'];

// Most `std*` option values involve piping `subprocess.std*` to a stream.
// The stream is either passed by the user or created internally.
const pipeStdioItem = ({subprocess, stream, direction, fdNumber, inputStreamsGroups, controller}) => {
const pipeStdioItem = ({subprocess, stream, direction, fdNumber, pipeGroups, controller}) => {
if (stream === undefined) {
return;
}

setStandardStreamMaxListeners(stream, controller);

if (direction === 'output') {
pipeStreams(subprocess.stdio[fdNumber], stream);
} else {
inputStreamsGroups[fdNumber] = [...(inputStreamsGroups[fdNumber] ?? []), stream];
}
const [inputStream, outputStream] = direction === 'output'
? [stream, subprocess.stdio[fdNumber]]
: [subprocess.stdio[fdNumber], stream];
const outputStreams = pipeGroups.get(inputStream) ?? [];
pipeGroups.set(inputStream, [...outputStreams, outputStream]);
};

// Multiple subprocesses might be piping from/to `process.std*` at the same time.
Expand Down
21 changes: 15 additions & 6 deletions lib/io/output-sync.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {writeFileSync} from 'node:fs';
import {writeFileSync, appendFileSync} from 'node:fs';
import {shouldLogOutput, logLinesSync} from '../verbose/output.js';
import {runGeneratorsSync} from '../transform/generator.js';
import {splitLinesSync} from '../transform/split.js';
Expand All @@ -13,19 +13,24 @@ export const transformOutputSync = ({fileDescriptors, syncResult: {output}, opti
}

const state = {};
const outputFiles = new Set([]);
const transformedOutput = output.map((result, fdNumber) =>
transformOutputResultSync({
result,
fileDescriptors,
fdNumber,
state,
outputFiles,
isMaxBuffer,
verboseInfo,
}, options));
return {output: transformedOutput, ...state};
};

const transformOutputResultSync = ({result, fileDescriptors, fdNumber, state, isMaxBuffer, verboseInfo}, {buffer, encoding, lines, stripFinalNewline, maxBuffer}) => {
const transformOutputResultSync = (
{result, fileDescriptors, fdNumber, state, outputFiles, isMaxBuffer, verboseInfo},
{buffer, encoding, lines, stripFinalNewline, maxBuffer},
) => {
if (result === null) {
return;
}
Expand Down Expand Up @@ -57,7 +62,7 @@ const transformOutputResultSync = ({result, fileDescriptors, fdNumber, state, is

try {
if (state.error === undefined) {
writeToFiles(serializedResult, stdioItems);
writeToFiles(serializedResult, stdioItems, outputFiles);
}

return returnedResult;
Expand Down Expand Up @@ -98,9 +103,13 @@ const serializeChunks = ({chunks, objectMode, encoding, lines, stripFinalNewline
};

// When the `std*` target is a file path/URL or a file descriptor
const writeToFiles = (serializedResult, stdioItems) => {
for (const {type, path} of stdioItems) {
if (FILE_TYPES.has(type)) {
const writeToFiles = (serializedResult, stdioItems, outputFiles) => {
for (const {path} of stdioItems.filter(({type}) => FILE_TYPES.has(type))) {
const pathString = typeof path === 'string' ? path : path.toString();
if (outputFiles.has(pathString)) {
appendFileSync(path, serializedResult);
} else {
outputFiles.add(pathString);
writeFileSync(path, serializedResult);
}
}
Expand Down
2 changes: 1 addition & 1 deletion lib/return/early-error.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import {
Writable,
Duplex,
} from 'node:stream';
import {cleanupCustomStreams} from '../stdio/handle-async.js';
import {cleanupCustomStreams} from '../stdio/handle.js';
import {makeEarlyError} from './result.js';
import {handleResult} from './reject.js';

Expand Down
116 changes: 116 additions & 0 deletions lib/stdio/duplicate.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
import {
SPECIAL_DUPLICATE_TYPES_SYNC,
SPECIAL_DUPLICATE_TYPES,
FORBID_DUPLICATE_TYPES,
TYPE_TO_MESSAGE,
} from './type.js';

// Repeating the same value inside one file descriptor is most likely a user error,
// so we drop the earlier occurrences and keep only the last one.
// Generators are exempt: duplicating those is a legitimate use case.
export const filterDuplicates = stdioItems => stdioItems.filter((currentItem, currentIndex) => {
	if (currentItem.type === 'generator' || currentItem.type === 'asyncGenerator') {
		return true;
	}

	// Drop this item when the same value re-appears later in the list.
	return !stdioItems
		.slice(currentIndex + 1)
		.some(({value}) => value === currentItem.value);
});

// Detect whether another file descriptor already targets the same file or stream.
// For example `{stdout: {file: './output.txt'}, stderr: {file: './output.txt'}}`.
// Returns the stream instance to re-use when one exists, otherwise `undefined`.
export const getDuplicateStream = ({stdioItem: {type, value, optionName}, direction, fileDescriptors, isSync}) => {
	const otherStdioItems = getOtherStdioItems(fileDescriptors, type);
	if (otherStdioItems.length === 0) {
		return;
	}

	const duplicateOptions = {
		otherStdioItems,
		type,
		value,
		optionName,
		direction,
	};

	// Sync mode cannot share stream instances; it only validates for conflicts.
	if (isSync) {
		validateDuplicateStreamSync(duplicateOptions);
		return;
	}

	// Files and streams: re-use the existing stream instance when duplicated.
	if (SPECIAL_DUPLICATE_TYPES.has(type)) {
		return getDuplicateStreamInstance(duplicateOptions);
	}

	// Duplex/TransformStream: duplication is always an error.
	if (FORBID_DUPLICATE_TYPES.has(type)) {
		validateDuplicateTransform({otherStdioItems, type, value, optionName});
	}
};

// Collect the stdio items of the given `type` across all file descriptors,
// tagging each with its file descriptor's direction.
const getOtherStdioItems = (fileDescriptors, type) => {
	const sameTypeItems = [];

	for (const {direction, stdioItems} of fileDescriptors) {
		for (const stdioItem of stdioItems) {
			if (stdioItem.type === type) {
				sameTypeItems.push({...stdioItem, direction});
			}
		}
	}

	return sameTypeItems;
};

// With `execaSync()`, duplicated file paths cannot share a stream instance,
// but we still validate that they are not used both as input and output.
const validateDuplicateStreamSync = ({otherStdioItems, type, value, optionName, direction}) => {
	if (!SPECIAL_DUPLICATE_TYPES_SYNC.has(type)) {
		return;
	}

	getDuplicateStreamInstance({
		otherStdioItems,
		type,
		value,
		optionName,
		direction,
	});
};

// When two file descriptors target the same file or stream, the same underlying
// stream instance must be re-used, otherwise it would be closed twice when piping ends.
// This only matters for output file descriptors.
// Generator functions are unaffected since each file descriptor gets its own instance.
// Sharing one file or stream between an input and an output file descriptor is
// nonsensical, so it is rejected.
const getDuplicateStreamInstance = ({otherStdioItems, type, value, optionName, direction}) => {
	const duplicateStdioItems = otherStdioItems.filter(otherStdioItem => hasSameValue(otherStdioItem, value));
	if (duplicateStdioItems.length === 0) {
		return;
	}

	// Same target used in the opposite direction → invalid configuration.
	const oppositeDirectionItem = duplicateStdioItems.find(duplicateStdioItem => duplicateStdioItem.direction !== direction);
	throwOnDuplicateStream(oppositeDirectionItem, optionName, type);

	if (direction !== 'output') {
		return;
	}

	return duplicateStdioItems[0].stream;
};

// Whether a stdio item points at the same target as `secondValue`.
// File paths compare by `path`, file URLs by `href`, everything else by identity.
const hasSameValue = ({type, value}, secondValue) => {
	switch (type) {
		case 'filePath': {
			return value.path === secondValue.path;
		}

		case 'fileUrl': {
			return value.href === secondValue.href;
		}

		default: {
			return value === secondValue;
		}
	}
};

// Two file descriptors must not share the same Duplex or TransformStream.
// Those are assigned directly to `subprocess.std*`, so sharing one could make
// `subprocess.stdout` and `subprocess.stderr` the very same object, and reading
// from either would then return data from both stdout and stderr.
const validateDuplicateTransform = ({otherStdioItems, type, value, optionName}) => {
	const {transform} = value;
	const duplicateStdioItem = otherStdioItems.find(otherStdioItem => otherStdioItem.value.transform === transform);
	throwOnDuplicateStream(duplicateStdioItem, optionName, type);
};

// Throw a descriptive error when a conflicting duplicate stdio item was found.
// No-op when `stdioItem` is `undefined` (no conflict).
const throwOnDuplicateStream = (stdioItem, optionName, type) => {
	if (stdioItem === undefined) {
		return;
	}

	throw new TypeError(`The \`${stdioItem.optionName}\` and \`${optionName}\` options must not target ${TYPE_TO_MESSAGE[type]} that is the same.`);
};
16 changes: 1 addition & 15 deletions lib/stdio/handle-async.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import {createReadStream, createWriteStream} from 'node:fs';
import {Buffer} from 'node:buffer';
import {Readable, Writable, Duplex} from 'node:stream';
import {isStandardStream} from '../utils/standard-stream.js';
import {generatorToStream} from '../transform/generator.js';
import {handleStdio} from './handle.js';
import {TYPE_TO_MESSAGE} from './type.js';
Expand All @@ -16,6 +15,7 @@ const forbiddenIfAsync = ({type, optionName}) => {
// Create streams used internally for piping when using specific values for the `std*` options, in async mode.
// For example, `stdout: {file}` creates a file stream, which is piped from/to.
const addProperties = {
fileNumber: forbiddenIfAsync,
generator: generatorToStream,
asyncGenerator: generatorToStream,
nodeStream: ({value}) => ({stream: value}),
Expand Down Expand Up @@ -50,17 +50,3 @@ const addPropertiesAsync = {
uint8Array: forbiddenIfAsync,
},
};

// The stream error handling is performed by the piping logic above, which cannot be performed before subprocess spawning.
// If the subprocess spawning fails (e.g. due to an invalid command), the streams need to be manually destroyed.
// We need to create those streams before subprocess spawning, in case their creation fails, e.g. when passing an invalid generator as argument.
// Like this, an exception would be thrown, which would prevent spawning a subprocess.
export const cleanupCustomStreams = fileDescriptors => {
for (const {stdioItems} of fileDescriptors) {
for (const {stream} of stdioItems) {
if (stream !== undefined && !isStandardStream(stream)) {
stream.destroy();
}
}
}
};
1 change: 1 addition & 0 deletions lib/stdio/handle-sync.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ const addPropertiesSync = {
...addProperties,
fileUrl: ({value}) => ({contents: [bufferToUint8Array(readFileSync(value))]}),
filePath: ({value: {file}}) => ({contents: [bufferToUint8Array(readFileSync(file))]}),
fileNumber: forbiddenIfSync,
iterable: ({value}) => ({contents: [...value]}),
string: ({value}) => ({contents: [value]}),
uint8Array: ({value}) => ({contents: [value]}),
Expand Down