Skip to content

Commit

Permalink
Improve piping processes (#834)
Browse files Browse the repository at this point in the history
  • Loading branch information
ehmicky committed Feb 21, 2024
1 parent 30a662d commit 1a47ea2
Show file tree
Hide file tree
Showing 23 changed files with 1,511 additions and 256 deletions.
10 changes: 6 additions & 4 deletions docs/scripts.md
Original file line number Diff line number Diff line change
Expand Up @@ -588,17 +588,19 @@ console.log('example');
```sh
# Bash
echo example | cat
echo npm run build | sort | head -n2
```
```js
// zx
await $`echo example | cat`;
await $`npm run build | sort | head -n2`;
```
```js
// Execa
await $`echo example`.pipe($({stdin: 'pipe'})`cat`);
await $`npm run build`
.pipe($({stdin: 'pipe'})`sort`)
.pipe($({stdin: 'pipe'})`head -n2`);
```
### Piping stdout and stderr to another command
Expand All @@ -619,7 +621,7 @@ await Promise.all([echo, cat]);
```js
// Execa
await $({all: true})`echo example`.pipe($({stdin: 'pipe'})`cat`, 'all');
await $({all: true})`echo example`.pipe($({from: 'all', stdin: 'pipe'})`cat`);
```
### Piping stdout to a file
Expand Down
55 changes: 39 additions & 16 deletions index.d.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import {type ChildProcess} from 'node:child_process';
import {type Readable, type Writable} from 'node:stream';

type IfAsync<IsSync extends boolean, AsyncValue> = IsSync extends true ? never : AsyncValue;
type IfAsync<IsSync extends boolean, AsyncValue, SyncValue = never> = IsSync extends true ? SyncValue : AsyncValue;

// When the `stdin`/`stdout`/`stderr`/`stdio` option is set to one of those values, no stream is created
type NoStreamStdioOption =
Expand Down Expand Up @@ -669,14 +669,14 @@ type ExecaCommonReturnValue<IsSync extends boolean = boolean, OptionsType extend
/**
The output of the process on `stdout`.
This is `undefined` if the `stdout` option is set to only [`'inherit'`, `'ignore'`, `Stream` or `integer`](https://nodejs.org/api/child_process.html#child_process_options_stdio). This is an array if the `lines` option is `true, or if the `stdout` option is a transform in object mode.
This is `undefined` if the `stdout` option is set to only [`'inherit'`, `'ignore'`, `Stream` or `integer`](https://nodejs.org/api/child_process.html#child_process_options_stdio). This is an array if the `lines` option is `true`, or if the `stdout` option is a transform in object mode.
*/
stdout: StdioOutput<'1', OptionsType>;

/**
The output of the process on `stderr`.
This is `undefined` if the `stderr` option is set to only [`'inherit'`, `'ignore'`, `Stream` or `integer`](https://nodejs.org/api/child_process.html#child_process_options_stdio). This is an array if the `lines` option is `true, or if the `stderr` option is a transform in object mode.
This is `undefined` if the `stderr` option is set to only [`'inherit'`, `'ignore'`, `Stream` or `integer`](https://nodejs.org/api/child_process.html#child_process_options_stdio). This is an array if the `lines` option is `true`, or if the `stderr` option is a transform in object mode.
*/
stderr: StdioOutput<'2', OptionsType>;

Expand Down Expand Up @@ -737,9 +737,16 @@ type ExecaCommonReturnValue<IsSync extends boolean = boolean, OptionsType extend
- the `all` option is `false` (default value)
- both `stdout` and `stderr` options are set to [`'inherit'`, `'ignore'`, `Stream` or `integer`](https://nodejs.org/api/child_process.html#child_process_options_stdio)
This is an array if the `lines` option is `true, or if either the `stdout` or `stderr` option is a transform in object mode.
This is an array if the `lines` option is `true`, or if either the `stdout` or `stderr` option is a transform in object mode.
*/
all: IfAsync<IsSync, AllOutput<OptionsType>>;

/**
Results of the other processes that were piped into this child process. This is useful to inspect a series of child processes piped with each other.
This array is initially empty and is populated each time the `.pipe()` method resolves.
*/
pipedFrom: IfAsync<IsSync, ExecaReturnValue[], []>;
// Workaround for a TypeScript bug: https://github.com/microsoft/TypeScript/issues/57062
} & {};

Expand Down Expand Up @@ -811,6 +818,33 @@ type AllIfStderr<StderrResultIgnored extends boolean> = StderrResultIgnored exte
? undefined
: Readable;

type PipeOptions = {
/**
Which stream to pipe. A file descriptor number can also be passed.
`"all"` pipes both `stdout` and `stderr`. This requires the `all` option to be `true`.
*/
readonly from?: 'stdout' | 'stderr' | 'all' | number;

/**
Unpipe the child process when the signal aborts.
The `.pipe()` method will be rejected with a cancellation error.
*/
readonly signal?: AbortSignal;
};

type PipableProcess = {
/**
[Pipe](https://nodejs.org/api/stream.html#readablepipedestination-options) the child process' `stdout` to a second Execa child process' `stdin`. This resolves with that second process' result. If either process is rejected, this is rejected with that process' error instead.
This can be called multiple times to chain a series of processes.
Multiple child processes can be piped to the same process. Conversely, the same child process can be piped to multiple other processes.
*/
pipe<Destination extends ExecaChildProcess>(destination: Destination, options?: PipeOptions): Promise<Awaited<Destination>> & PipableProcess;
};

export type ExecaChildPromise<OptionsType extends Options = Options> = {
stdin: StreamUnlessIgnored<'0', OptionsType>;

Expand All @@ -831,17 +865,6 @@ export type ExecaChildPromise<OptionsType extends Options = Options> = {
onRejected?: (reason: ExecaError<OptionsType>) => ResultType | PromiseLike<ResultType>
): Promise<ExecaReturnValue<OptionsType> | ResultType>;

/**
[Pipe](https://nodejs.org/api/stream.html#readablepipedestination-options) the child process' `stdout` to another Execa child process' `stdin`.
A `streamName` can be passed to pipe `"stderr"`, `"all"` (both `stdout` and `stderr`) or any another file descriptor instead of `stdout`.
`childProcess.stdout` (and/or `childProcess.stderr` depending on `streamName`) must not be `undefined`. When `streamName` is `"all"`, the `all` option must be set to `true`.
Returns `execaChildProcess`, which allows chaining `.pipe()` then `await`ing the final result.
*/
pipe<Target extends ExecaChildProcess>(target: Target, streamName?: 'stdout' | 'stderr' | 'all' | number): Target;

/**
Sends a [signal](https://nodejs.org/api/os.html#signal-constants) to the child process. The default signal is the `killSignal` option. `killSignal` defaults to `SIGTERM`, which terminates the child process.
Expand All @@ -853,7 +876,7 @@ export type ExecaChildPromise<OptionsType extends Options = Options> = {
*/
kill(signal: Parameters<ChildProcess['kill']>[0], error?: Error): ReturnType<ChildProcess['kill']>;
kill(error?: Error): ReturnType<ChildProcess['kill']>;
};
} & PipableProcess;

export type ExecaChildProcess<OptionsType extends Options = Options> = ChildProcess &
ExecaChildPromise<OptionsType> &
Expand Down
2 changes: 1 addition & 1 deletion index.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import {handleNodeOption} from './lib/node.js';
import {handleInputAsync, pipeOutputAsync, cleanupStdioStreams} from './lib/stdio/async.js';
import {handleInputSync, pipeOutputSync} from './lib/stdio/sync.js';
import {spawnedKill, validateTimeout, normalizeForceKillAfterDelay, cleanupOnExit, isFailedExit} from './lib/kill.js';
import {pipeToProcess} from './lib/pipe.js';
import {pipeToProcess} from './lib/pipe/setup.js';
import {getSpawnedResult, makeAllStream} from './lib/stream.js';
import {mergePromise} from './lib/promise.js';
import {joinCommand} from './lib/escape.js';
Expand Down
35 changes: 23 additions & 12 deletions index.test-d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import * as process from 'node:process';
import {Readable, Writable} from 'node:stream';
import {createWriteStream} from 'node:fs';
import {expectType, expectError, expectAssignable, expectNotAssignable} from 'tsd';
import {expectType, expectNotType, expectError, expectAssignable, expectNotAssignable} from 'tsd';
import {
$,
execa,
Expand Down Expand Up @@ -79,26 +79,34 @@ const asyncFinal = async function * () {

try {
const execaPromise = execa('unicorns', {all: true});
const unicornsResult = await execaPromise;

const execaBufferPromise = execa('unicorns', {encoding: 'buffer', all: true});
const writeStream = createWriteStream('output.txt');
const bufferResult = await execaBufferPromise;

expectType<typeof execaPromise>(execaBufferPromise.pipe(execaPromise));
expectError(execaBufferPromise.pipe(writeStream));
expectError(execaBufferPromise.pipe('output.txt'));
await execaBufferPromise.pipe(execaPromise, 'stdout');
await execaBufferPromise.pipe(execaPromise, 'stderr');
await execaBufferPromise.pipe(execaPromise, 'all');
await execaBufferPromise.pipe(execaPromise, 3);
expectError(execaBufferPromise.pipe(execaPromise, 'other'));
expectType<typeof bufferResult>(await execaPromise.pipe(execaBufferPromise));
expectNotType<typeof bufferResult>(await execaPromise.pipe(execaPromise));
expectType<typeof bufferResult>(await execaPromise.pipe(execaPromise).pipe(execaBufferPromise));
await execaPromise.pipe(execaPromise).pipe(execaBufferPromise, {from: 'stdout'});
expectError(execaPromise.pipe(execaBufferPromise).stdout);
expectError(execaPromise.pipe(createWriteStream('output.txt')));
expectError(execaPromise.pipe('output.txt'));
await execaPromise.pipe(execaBufferPromise, {});
expectError(execaPromise.pipe(execaBufferPromise, 'stdout'));
await execaPromise.pipe(execaBufferPromise, {from: 'stdout'});
await execaPromise.pipe(execaBufferPromise, {from: 'stderr'});
await execaPromise.pipe(execaBufferPromise, {from: 'all'});
await execaPromise.pipe(execaBufferPromise, {from: 3});
expectError(execaPromise.pipe(execaBufferPromise, {from: 'other'}));
await execaPromise.pipe(execaBufferPromise, {signal: new AbortController().signal});
expectError(await execaPromise.pipe(execaBufferPromise, {signal: true}));

expectType<Readable>(execaPromise.all);
const noAllPromise = execa('unicorns');
expectType<undefined>(noAllPromise.all);
const noAllResult = await noAllPromise;
expectType<undefined>(noAllResult.all);

const unicornsResult = await execaPromise;
expectType<string>(unicornsResult.command);
expectType<string>(unicornsResult.escapedCommand);
expectType<number | undefined>(unicornsResult.exitCode);
Expand All @@ -109,6 +117,7 @@ try {
expectType<string | undefined>(unicornsResult.signal);
expectType<string | undefined>(unicornsResult.signalDescription);
expectType<string>(unicornsResult.cwd);
expectType<ExecaReturnValue[]>(unicornsResult.pipedFrom);

expectType<undefined>(unicornsResult.stdio[0]);
expectType<string>(unicornsResult.stdout);
Expand All @@ -122,7 +131,6 @@ try {
expectType<Readable>(execaBufferPromise.stdout);
expectType<Readable>(execaBufferPromise.stderr);
expectType<Readable>(execaBufferPromise.all);
const bufferResult = await execaBufferPromise;
expectType<Uint8Array>(bufferResult.stdout);
expectType<Uint8Array>(bufferResult.stdio[1]);
expectType<Uint8Array>(bufferResult.stderr);
Expand Down Expand Up @@ -423,6 +431,7 @@ try {
expectType<string>(execaError.cwd);
expectType<string>(execaError.shortMessage);
expectType<string | undefined>(execaError.originalMessage);
expectType<ExecaReturnValue[]>(execaError.pipedFrom);

expectType<undefined>(execaError.stdio[0]);

Expand Down Expand Up @@ -568,6 +577,7 @@ try {
expectType<string | undefined>(unicornsResult.signal);
expectType<string | undefined>(unicornsResult.signalDescription);
expectType<string>(unicornsResult.cwd);
expectType<[]>(unicornsResult.pipedFrom);

expectType<undefined>(unicornsResult.stdio[0]);
expectType<string>(unicornsResult.stdout);
Expand Down Expand Up @@ -639,6 +649,7 @@ try {
expectType<string>(execaError.cwd);
expectType<string>(execaError.shortMessage);
expectType<string | undefined>(execaError.originalMessage);
expectType<[]>(execaError.pipedFrom);

expectType<undefined>(execaError.stdio[0]);

Expand Down
3 changes: 3 additions & 0 deletions lib/error.js
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ export const makeError = ({
delete error.bufferedData;
}

error.pipedFrom = [];

return error;
};

Expand Down Expand Up @@ -185,4 +187,5 @@ export const makeSuccessResult = ({
stderr: stdio[2],
all,
stdio,
pipedFrom: [],
});
95 changes: 0 additions & 95 deletions lib/pipe.js

This file was deleted.

13 changes: 13 additions & 0 deletions lib/pipe/abort.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import {aborted} from 'node:util';
import {createNonCommandError} from './validate.js';

export const unpipeOnAbort = (signal, ...args) => signal === undefined
? []
: [unpipeOnSignalAbort(signal, ...args)];

const unpipeOnSignalAbort = async (signal, sourceStream, mergedStream, {stdioStreamsGroups, options}) => {
await aborted(signal, sourceStream);
await mergedStream.remove(sourceStream);
const error = new Error('Pipe cancelled by `signal` option.');
throw createNonCommandError({error, stdioStreamsGroups, options});
};
24 changes: 24 additions & 0 deletions lib/pipe/sequence.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Like Bash, we await both processes. This is unlike some other shells which only await the destination process.
// Like Bash with the `pipefail` option, if either process fails, the whole pipe fails.
// Like Bash, if both processes fail, we return the failure of the destination.
// This ensures both processes' errors are present, using `error.pipedFrom`.
export const waitForBothProcesses = async (source, destination) => {
const [
{status: sourceStatus, reason: sourceReason, value: sourceResult = sourceReason},
{status: destinationStatus, reason: destinationReason, value: destinationResult = destinationReason},
] = await Promise.allSettled([source, destination]);

if (!destinationResult.pipedFrom.includes(sourceResult)) {
destinationResult.pipedFrom.push(sourceResult);
}

if (destinationStatus === 'rejected') {
throw destinationResult;
}

if (sourceStatus === 'rejected') {
throw sourceResult;
}

return destinationResult;
};

0 comments on commit 1a47ea2

Please sign in to comment.